Dateien
AegisSight-Monitor-Verwaltung/src/routers/sources.py
claude-dev 2001815e19 Phase 4: Admin-Übersicht erweitern (Stats-Bar + Health-Badge inline + Letzter Treffer)
Backend
- routers/sources.py:
  - GET /api/sources/global/stats NEU: aggregierte Counter
    nach Typ, Total-Articles, Health-Bilanz (errors/warnings/ok)
  - GET /api/sources/global liefert pro Quelle health_status
    (worst-case error > warning > ok, NULL wenn nie gecheckt)

Frontend
- dashboard.html sub-global-sources: Stats-Bar Container oben.
  Tabellenkopf bekommt zwei neue Spalten: Letzter Treffer + Health.
- style.css: .sources-stats-bar (analog Monitor-Style),
  .health-badge mit Varianten error/warning/ok/unknown.
- sources.js:
  - loadGlobalSources lädt parallel /global + /global/stats
  - renderGlobalStats: rendert Stats-Bar mit Total-Quellen,
    Counts pro Typ (aus META), Total-Articles, Health-Counters
  - renderGlobalSources: 9 Spalten statt 7, Letzter-Treffer + Health-Badge,
    typeLabel statt TYPE_LABELS-Direktzugriff
2026-05-09 03:12:30 +00:00

979 Zeilen
35 KiB
Python

"""Grundquellen-Verwaltung und Kundenquellen-Übersicht."""
import logging
import uuid
from fastapi import APIRouter, Depends, HTTPException, status, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from typing import Optional
import aiosqlite
from auth import get_current_admin
from database import db_dependency
from audit import log_action, get_client_ip, row_to_dict
from source_meta import get_meta
from config import HEALTH_CHECK_USER_AGENT, HEALTH_CHECK_TIMEOUT_S
from shared.source_rules import (
discover_source,
discover_all_feeds,
evaluate_feeds_with_claude,
_extract_domain,
_detect_category,
domain_to_display_name,
)
# Logger used by the endpoints in this module.
logger = logging.getLogger("verwaltung.sources")
# Every route registered on this router is served under the /api/sources prefix.
router = APIRouter(prefix="/api/sources", tags=["sources"])
# Names of the `sources` columns that admins may edit.
# NOTE(review): presumably the whitelist for dynamically built UPDATE statements — confirm.
SOURCE_UPDATE_COLUMNS = {"name", "url", "domain", "source_type", "category", "status", "notes"}
@router.get("/meta")
async def get_sources_meta(admin: dict = Depends(get_current_admin)):
    """Return categories and source types as the single source of truth.

    The frontend loads this payload on init and uses it to fill its filter
    dropdowns and label lookups, so no lists are hard-coded in JS/HTML.
    The payload is produced entirely by ``get_meta()``.
    """
    meta = get_meta()
    return meta
class GlobalSourceCreate(BaseModel):
    """Request body for creating a global source."""

    # Display name; between 1 and 200 characters.
    name: str = Field(min_length=1, max_length=200)
    # Optional URL; used for the duplicate check on creation.
    url: Optional[str] = None
    # Optional domain of the source.
    domain: Optional[str] = None
    # One of the source types permitted by the pattern; defaults to "rss_feed".
    source_type: str = Field(default="rss_feed", pattern="^(rss_feed|web_source|excluded|telegram_channel|podcast_feed)$")
    # Free-form category key; defaults to "sonstige".
    category: str = Field(default="sonstige")
    # Either "active" or "inactive"; defaults to "active".
    status: str = Field(default="active", pattern="^(active|inactive)$")
    # Optional free-text notes.
    notes: Optional[str] = None
class GlobalSourceUpdate(BaseModel):
    """Request body for a partial update of a global source.

    Every field is optional; fields left as ``None`` are not touched by the
    update endpoint (it dumps the model with ``exclude_none=True``).
    """

    # Same bounds as GlobalSourceCreate.name, so an update cannot blank out a
    # name that could never have been created empty.
    name: Optional[str] = Field(default=None, min_length=1, max_length=200)
    # New URL, or None to leave it unchanged.
    url: Optional[str] = None
    # New domain, or None to leave it unchanged.
    domain: Optional[str] = None
    # New source type; must match one of the permitted values when given.
    source_type: Optional[str] = Field(default=None, pattern="^(rss_feed|web_source|excluded|telegram_channel|podcast_feed)$")
    # New category key, or None to leave it unchanged.
    category: Optional[str] = None
    # "active" or "inactive" when given.
    status: Optional[str] = Field(default=None, pattern="^(active|inactive)$")
    # New notes text, or None to leave it unchanged.
    notes: Optional[str] = None
@router.get("/global")
async def list_global_sources(
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """List all global sources (rows with ``tenant_id IS NULL``).

    Each row carries a ``health_status`` column holding the worst status found
    in ``source_health_checks`` for that source (error > warning > ok), or NULL
    when the source has no check rows. This lets the frontend render an inline
    badge per row without a separate health query.

    Returns:
        A list of dicts, ordered by category, source type and name.
    """
    # Same guard as the stats and health endpoints: the health table may not
    # exist yet, in which case every source is simply reported as unchecked.
    cursor = await db.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name='source_health_checks'"
    )
    if not await cursor.fetchone():
        cursor = await db.execute("""
            SELECT s.*, NULL AS health_status
            FROM sources s
            WHERE s.tenant_id IS NULL
            ORDER BY s.category, s.source_type, s.name
        """)
        return [dict(row) for row in await cursor.fetchall()]

    # Rank each check (error=3, warning=2, ok=1, anything else=0), take the
    # maximum per source and map it back to its label. MAX over zero rows is
    # NULL, which falls through to the ELSE branch.
    cursor = await db.execute("""
        SELECT s.*,
               (
                   SELECT CASE MAX(CASE h.status
                                       WHEN 'error' THEN 3
                                       WHEN 'warning' THEN 2
                                       WHEN 'ok' THEN 1
                                       ELSE 0
                                   END)
                              WHEN 3 THEN 'error'
                              WHEN 2 THEN 'warning'
                              WHEN 1 THEN 'ok'
                              ELSE NULL
                          END
                   FROM source_health_checks h
                   WHERE h.source_id = s.id
               ) AS health_status
        FROM sources s
        WHERE s.tenant_id IS NULL
        ORDER BY s.category, s.source_type, s.name
    """)
    return [dict(row) for row in await cursor.fetchall()]
@router.post("/global", status_code=201)
async def create_global_source(
    data: GlobalSourceCreate,
    request: Request,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Create a new global source (stored with ``tenant_id`` NULL).

    The new row is committed, re-read, written to the audit log and returned.

    Raises:
        HTTPException: 409 when ``data.url`` is set and another global source
            already uses that URL.
    """
    if data.url:
        dup_cursor = await db.execute(
            "SELECT id, name FROM sources WHERE url = ? AND tenant_id IS NULL",
            (data.url,),
        )
        duplicate = await dup_cursor.fetchone()
        if duplicate:
            raise HTTPException(
                status_code=409,
                detail=f"URL bereits vorhanden: {duplicate['name']}",
            )

    insert_params = (
        data.name,
        data.url,
        data.domain,
        data.source_type,
        data.category,
        data.status,
        data.notes,
    )
    insert_cursor = await db.execute(
        """INSERT INTO sources (name, url, domain, source_type, category, status, notes, added_by, tenant_id)
        VALUES (?, ?, ?, ?, ?, ?, ?, 'system', NULL)""",
        insert_params,
    )
    src_id = insert_cursor.lastrowid
    await db.commit()

    # Re-read the row so the response and the audit entry include DB defaults.
    select_cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (src_id,))
    created = dict(await select_cursor.fetchone())

    await log_action(
        db, admin, get_client_ip(request),
        action="create", resource_type="source", resource_id=src_id,
        after=created,
    )
    return created
@router.put("/global/{source_id}")
async def update_global_source(
    source_id: int,
    data: GlobalSourceUpdate,
    request: Request,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Update a global source with the non-None fields of ``data``.

    Returns the unchanged row when there is nothing to update, otherwise the
    row as it looks after the update. The change is written to the audit log.

    Raises:
        HTTPException: 404 when no global source has ``source_id``;
            409 when the new URL is already used by another global source.
    """
    cursor = await db.execute(
        "SELECT * FROM sources WHERE id = ? AND tenant_id IS NULL", (source_id,)
    )
    row = await cursor.fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="Grundquelle nicht gefunden")
    before = dict(row)

    # Column names are interpolated into the SQL below, so only names from the
    # module-level whitelist are ever allowed into the SET clause.
    updates = {
        column: value
        for column, value in data.model_dump(exclude_none=True).items()
        if column in SOURCE_UPDATE_COLUMNS
    }
    if not updates:
        return before

    # Same uniqueness rule as on create/promote: a URL may belong to only one
    # global source.
    new_url = updates.get("url")
    if new_url and new_url != before.get("url"):
        cursor = await db.execute(
            "SELECT id, name FROM sources WHERE url = ? AND tenant_id IS NULL AND id != ?",
            (new_url, source_id),
        )
        clash = await cursor.fetchone()
        if clash:
            raise HTTPException(
                status_code=409,
                detail=f"URL bereits vorhanden: {clash['name']}",
            )

    set_clause = ", ".join(f"{column} = ?" for column in updates)
    values = [*updates.values(), source_id]
    await db.execute(f"UPDATE sources SET {set_clause} WHERE id = ?", values)
    await db.commit()

    cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,))
    after = dict(await cursor.fetchone())
    await log_action(
        db, admin, get_client_ip(request),
        action="update", resource_type="source", resource_id=source_id,
        before=before, after=after,
    )
    return after
@router.delete("/global/{source_id}", status_code=204)
async def delete_global_source(
    source_id: int,
    request: Request,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Delete a global source and record its previous state in the audit log.

    Raises:
        HTTPException: 404 when no global source has ``source_id``.
    """
    lookup = await db.execute(
        "SELECT * FROM sources WHERE id = ? AND tenant_id IS NULL", (source_id,)
    )
    existing = await lookup.fetchone()
    if existing is None or not existing:
        raise HTTPException(status_code=404, detail="Grundquelle nicht gefunden")

    # Snapshot taken before the delete so the audit entry shows what was removed.
    snapshot = dict(existing)

    await db.execute("DELETE FROM sources WHERE id = ?", (source_id,))
    await db.commit()

    client_ip = get_client_ip(request)
    await log_action(
        db, admin, client_ip,
        action="delete", resource_type="source", resource_id=source_id,
        before=snapshot,
    )
@router.get("/global/stats")
async def get_global_stats(
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return aggregated counters for the global-sources stats bar.

    Only active global sources (``tenant_id IS NULL``) are counted.

    Returns:
        A dict with
        ``by_type``: per source type ``{"count", "articles"}``,
        ``total``: number of sources,
        ``total_articles``: sum of ``article_count``,
        ``health``: number of sources whose worst check status is
        error / warning / ok. Each source is counted in exactly one bucket,
        matching the per-row ``health_status`` of the list endpoint.
    """
    cur = await db.execute("""
        SELECT source_type, COUNT(*) AS count, COALESCE(SUM(article_count), 0) AS articles
        FROM sources
        WHERE tenant_id IS NULL AND status = 'active'
        GROUP BY source_type
    """)
    by_type = {}
    total = 0
    total_articles = 0
    for r in await cur.fetchall():
        d = dict(r)
        by_type[d["source_type"]] = {"count": d["count"], "articles": d["articles"]}
        total += d["count"]
        total_articles += d["articles"]

    # Health counters; stay at zero when the health table does not exist yet.
    health = {"errors": 0, "warnings": 0, "ok": 0}
    cur = await db.execute("""
        SELECT name FROM sqlite_master WHERE type='table' AND name='source_health_checks'
    """)
    if await cur.fetchone():
        # Inner query: worst rank per source (error=3, warning=2, ok=1).
        # Outer query: how many sources ended up with each rank. Grouping by
        # raw status instead would count a source with mixed results in
        # several buckets at once.
        cur = await db.execute("""
            SELECT worst, COUNT(*) AS cnt
            FROM (
                SELECT MAX(CASE h.status
                               WHEN 'error' THEN 3
                               WHEN 'warning' THEN 2
                               WHEN 'ok' THEN 1
                               ELSE 0
                           END) AS worst
                FROM source_health_checks h
                JOIN sources s ON s.id = h.source_id
                WHERE s.tenant_id IS NULL AND s.status = 'active'
                GROUP BY h.source_id
            )
            GROUP BY worst
        """)
        bucket_for_rank = {3: "errors", 2: "warnings", 1: "ok"}
        for r in await cur.fetchall():
            d = dict(r)
            bucket = bucket_for_rank.get(d["worst"])
            if bucket:
                health[bucket] = d["cnt"]

    return {
        "by_type": by_type,
        "total": total,
        "total_articles": total_articles,
        "health": health,
    }
@router.get("/tenant")
async def list_tenant_sources(
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """List every tenant-specific source together with its organization name.

    Rows are those with a non-NULL ``tenant_id``; ``org_name`` comes from a
    LEFT JOIN on ``organizations`` and is NULL when no organization matches.
    """
    query = """
        SELECT s.*, o.name as org_name
        FROM sources s
        LEFT JOIN organizations o ON o.id = s.tenant_id
        WHERE s.tenant_id IS NOT NULL
        ORDER BY o.name, s.category, s.name
    """
    result = await db.execute(query)
    rows = await result.fetchall()
    return [dict(entry) for entry in rows]
@router.post("/tenant/{source_id}/promote")
async def promote_to_global(
    source_id: int,
    request: Request,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Promote a tenant source to a global source.

    Clears ``tenant_id``, sets ``added_by`` to ``'system'``, writes an audit
    entry and returns the updated row.

    Raises:
        HTTPException: 404 when the source does not exist; 400 when it is
            already global; 409 when its URL is already used by a global source.
    """
    lookup = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,))
    current = await lookup.fetchone()
    if not current:
        raise HTTPException(status_code=404, detail="Quelle nicht gefunden")
    if current["tenant_id"] is None:
        raise HTTPException(status_code=400, detail="Bereits eine Grundquelle")

    state_before = dict(current)

    source_url = current["url"]
    if source_url:
        dup_lookup = await db.execute(
            "SELECT id FROM sources WHERE url = ? AND tenant_id IS NULL",
            (source_url,),
        )
        already_global = await dup_lookup.fetchone()
        if already_global:
            raise HTTPException(status_code=409, detail="URL bereits als Grundquelle vorhanden")

    await db.execute(
        "UPDATE sources SET tenant_id = NULL, added_by = 'system' WHERE id = ?",
        (source_id,),
    )
    await db.commit()

    reread = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,))
    state_after = dict(await reread.fetchone())

    await log_action(
        db, admin, get_client_ip(request),
        action="update", resource_type="source", resource_id=source_id,
        before=state_before, after=state_after,
    )
    return state_after
class BulkPromoteRequest(BaseModel):
    """Request body for the bulk-promote endpoint."""

    # IDs of the tenant sources to promote, processed in the given order.
    source_ids: list[int]
@router.post("/tenant/bulk-promote")
async def bulk_promote_tenant_sources(
    data: BulkPromoteRequest,
    request: Request,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Promote several tenant sources to global sources in one request.

    Each ID is handled independently; an exception for one ID is recorded and
    processing continues. A single commit is issued after the loop.

    Returns:
        promoted: int  - number of sources promoted
        skipped: list  - ``{id, name, reason}`` for sources left untouched
        failed: list   - ``{id, error}`` for missing sources or exceptions
    """
    outcome = {"promoted": 0, "skipped": [], "failed": []}

    for sid in data.source_ids:
        try:
            lookup = await db.execute("SELECT * FROM sources WHERE id = ?", (sid,))
            source_row = await lookup.fetchone()
            if not source_row:
                outcome["failed"].append({"id": sid, "error": "nicht gefunden"})
                continue
            if source_row["tenant_id"] is None:
                outcome["skipped"].append(
                    {"id": sid, "name": source_row["name"], "reason": "bereits Grundquelle"}
                )
                continue

            state_before = dict(source_row)

            if source_row["url"]:
                dup_lookup = await db.execute(
                    "SELECT id FROM sources WHERE url = ? AND tenant_id IS NULL",
                    (source_row["url"],),
                )
                if await dup_lookup.fetchone():
                    outcome["skipped"].append({
                        "id": sid,
                        "name": source_row["name"],
                        "reason": "URL bereits als Grundquelle vorhanden",
                    })
                    continue

            await db.execute(
                "UPDATE sources SET tenant_id = NULL, added_by = 'system' WHERE id = ?",
                (sid,),
            )
            reread = await db.execute("SELECT * FROM sources WHERE id = ?", (sid,))
            state_after = dict(await reread.fetchone())
            await log_action(
                db, admin, get_client_ip(request),
                action="update", resource_type="source", resource_id=sid,
                before=state_before, after=state_after,
            )
            outcome["promoted"] += 1
        except Exception as e:
            outcome["failed"].append({"id": sid, "error": str(e)})

    await db.commit()
    return {
        "promoted": outcome["promoted"],
        "skipped": outcome["skipped"],
        "failed": outcome["failed"],
    }
@router.post("/discover")
async def discover_source_endpoint(
    url: str,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Analyse a URL and detect its domain, category and RSS feeds.

    Steps: discover all feeds of the domain, fall back to single-source
    discovery when none are found, let Claude pick the relevant feeds
    (falling back to the first three on failure) and split the result into
    feeds that are new and feeds that already exist as global sources.

    Returns:
        A dict with ``domain``, ``category``, ``feeds`` (new) and ``existing``;
        when nothing was found it additionally carries a ``message``.

    Raises:
        HTTPException: 500 when the initial multi-feed discovery fails.
    """
    try:
        multi = await discover_all_feeds(url)
    except Exception as e:
        logger.error("Discovery fehlgeschlagen: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail="Discovery fehlgeschlagen") from e

    domain = multi["domain"]
    category = multi["category"]
    feeds = multi.get("feeds", [])

    # Fall back to single-source discovery when no feeds were found.
    if not feeds:
        try:
            single = await discover_source(url)
            if single.get("rss_url"):
                feeds = [{"name": single["name"], "url": single["rss_url"]}]
                domain = single.get("domain", domain)
                category = single.get("category", category)
        except Exception:
            # Best effort: the fallback must not fail the request, but the
            # reason should be visible in the logs.
            logger.warning("Einzel-Discovery fehlgeschlagen", exc_info=True)

    if not feeds:
        return {
            "domain": domain,
            "category": category,
            "feeds": [],
            "existing": [],
            "message": "Keine RSS-Feeds gefunden",
        }

    # Let Claude rate the feeds; on failure use the first three unrated.
    try:
        relevant_feeds = await evaluate_feeds_with_claude(domain, feeds)
    except Exception:
        logger.warning("Feed-Bewertung fehlgeschlagen, nutze die ersten 3 Feeds", exc_info=True)
        relevant_feeds = feeds[:3]

    # Check which URLs already exist as global sources.
    cursor = await db.execute(
        "SELECT url FROM sources WHERE tenant_id IS NULL AND url IS NOT NULL"
    )
    existing_urls = {row["url"] for row in await cursor.fetchall()}

    result_feeds = []
    existing = []
    for feed in relevant_feeds:
        feed_url = feed.get("url")
        if not feed_url:
            # A feed entry without a URL cannot be added or matched; skip it
            # instead of failing the whole request with a KeyError.
            continue
        info = {
            "name": feed.get("name") or domain_to_display_name(domain),
            "url": feed_url,
            "domain": domain,
            "category": category,
        }
        if feed_url in existing_urls:
            existing.append(info)
        else:
            result_feeds.append(info)

    return {
        "domain": domain,
        "category": category,
        "feeds": result_feeds,
        "existing": existing,
    }
@router.post("/discover/add")
async def add_discovered_sources(
    feeds: list[dict],
    request: Request,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Create discovered feeds as global sources.

    Expects a list of ``{name, url, domain, category}`` dicts. Entries without
    a URL are ignored; URLs that already exist as global sources are skipped.
    When the first entry has a domain and no global ``web_source`` exists for
    it yet, one is created as well. All created rows (feeds and the web
    source) are recorded in a single audit entry.

    Returns:
        ``{"added": int, "skipped": int}``
    """
    cursor = await db.execute(
        "SELECT url FROM sources WHERE tenant_id IS NULL AND url IS NOT NULL"
    )
    existing_urls = {row["url"] for row in await cursor.fetchall()}

    added = 0
    skipped = 0
    added_ids = []
    for feed in feeds:
        feed_url = feed.get("url")
        if not feed_url:
            continue
        if feed_url in existing_urls:
            skipped += 1
            continue
        domain = feed.get("domain", "")
        # A missing name must not abort the whole batch; derive one from the
        # domain, or fall back to the URL itself.
        name = feed.get("name") or (domain_to_display_name(domain) if domain else feed_url)
        cur = await db.execute(
            """INSERT INTO sources (name, url, domain, source_type, category, status, added_by, tenant_id)
            VALUES (?, ?, ?, 'rss_feed', ?, 'active', 'system', NULL)""",
            (name, feed_url, domain, feed.get("category", "sonstige")),
        )
        added_ids.append(cur.lastrowid)
        existing_urls.add(feed_url)
        added += 1

    # Create a web source for the domain when none exists yet.
    if feeds and feeds[0].get("domain"):
        domain = feeds[0]["domain"]
        cursor = await db.execute(
            "SELECT id FROM sources WHERE LOWER(domain) = ? AND source_type = 'web_source' AND tenant_id IS NULL",
            (domain.lower(),),
        )
        if not await cursor.fetchone():
            cur = await db.execute(
                """INSERT INTO sources (name, url, domain, source_type, category, status, added_by, tenant_id)
                VALUES (?, ?, ?, 'web_source', ?, 'active', 'system', NULL)""",
                (domain_to_display_name(domain), f"https://{domain}", domain,
                 feeds[0].get("category", "sonstige")),
            )
            # Track the web source too, so the audit entry lists every created
            # row and is also written when only the web source was added.
            added_ids.append(cur.lastrowid)
            added += 1

    await db.commit()

    if added_ids:
        await log_action(
            db, admin, get_client_ip(request),
            action="create", resource_type="source",
            after={"discovered_add": {"count": added, "ids": added_ids,
                                      "domain": feeds[0].get("domain") if feeds else None}},
        )
    return {"added": added, "skipped": skipped}
# --- Health-Check & Vorschläge ---
@router.get("/health")
async def get_health(
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return the stored health-check results with summary counters.

    When the ``source_health_checks`` table does not exist, an empty result
    with zeroed counters is returned. Otherwise all check rows are returned
    (errors first, then warnings, then the rest, each ordered by source name)
    together with per-status counts and the most recent ``checked_at``.
    """
    # Check whether the table exists.
    table_probe = await db.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name='source_health_checks'"
    )
    table_exists = await table_probe.fetchone()
    if not table_exists:
        return {"last_check": None, "total_checks": 0, "errors": 0, "warnings": 0, "ok": 0, "checks": []}

    result = await db.execute("""
        SELECT
            h.id, h.source_id, s.name, s.domain, s.url, s.source_type,
            h.check_type, h.status, h.message, h.details, h.checked_at
        FROM source_health_checks h
        JOIN sources s ON s.id = h.source_id
        ORDER BY
            CASE h.status WHEN 'error' THEN 0 WHEN 'warning' THEN 1 ELSE 2 END,
            s.name
    """)
    checks = [dict(entry) for entry in await result.fetchall()]

    # Tally the three known statuses in one pass; other values are ignored.
    tally = {"error": 0, "warning": 0, "ok": 0}
    for entry in checks:
        state = entry["status"]
        if state in tally:
            tally[state] += 1

    newest = await db.execute("SELECT MAX(checked_at) as last_check FROM source_health_checks")
    newest_row = await newest.fetchone()
    if newest_row:
        last_check = newest_row["last_check"]
    else:
        last_check = None

    return {
        "last_check": last_check,
        "total_checks": len(checks),
        "errors": tally["error"],
        "warnings": tally["warning"],
        "ok": tally["ok"],
        "checks": checks,
    }
@router.get("/suggestions")
async def get_suggestions(
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return up to 50 suggestions, pending ones first, newest first.

    Returns an empty list when the ``source_suggestions`` table does not exist.
    """
    table_probe = await db.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name='source_suggestions'"
    )
    table_exists = await table_probe.fetchone()
    if not table_exists:
        return []

    query = """
        SELECT * FROM source_suggestions
        ORDER BY
            CASE status WHEN 'pending' THEN 0 ELSE 1 END,
            created_at DESC
        LIMIT 50
    """
    result = await db.execute(query)
    rows = await result.fetchall()
    return [dict(entry) for entry in rows]
class SuggestionAction(BaseModel):
    """Request body for reviewing a suggestion."""

    # True applies the suggestion and marks it accepted; False marks it rejected.
    accept: bool
@router.put("/suggestions/{suggestion_id}")
async def update_suggestion(
    suggestion_id: int,
    action: SuggestionAction,
    request: Request,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Accept or reject a pending suggestion.

    On accept, the suggestion's effect is applied according to its
    ``suggestion_type`` (add_source, deactivate_source, remove_source,
    fix_url). An add_source suggestion without a URL, or whose URL already
    exists as a global source, is stored as rejected instead. The new status
    is persisted, committed and written to the audit log.

    Returns:
        ``{"status": <new status>, "action": <description or None>}``

    Raises:
        HTTPException: 404 when the suggestion does not exist; 400 when it is
            no longer pending.
    """
    import json as _json

    lookup = await db.execute(
        "SELECT * FROM source_suggestions WHERE id = ?", (suggestion_id,)
    )
    found = await lookup.fetchone()
    if not found:
        raise HTTPException(status_code=404, detail="Vorschlag nicht gefunden")
    suggestion = dict(found)
    if suggestion["status"] != "pending":
        raise HTTPException(status_code=400, detail=f"Vorschlag bereits {suggestion['status']}")

    new_status = "rejected"
    result_action = None

    if action.accept:
        new_status = "accepted"
        stype = suggestion["suggestion_type"]
        raw_payload = suggestion["suggested_data"]
        data = _json.loads(raw_payload) if raw_payload else {}

        if stype == "add_source":
            name = data.get("name", "Unbenannt")
            url = data.get("url")
            domain = data.get("domain", "")
            category = data.get("category", "sonstige")
            # Heuristic: URLs mentioning rss/feed/xml/atom are treated as feeds.
            feed_markers = ("rss", "feed", "xml", "atom")
            if url and any(marker in (url or "").lower() for marker in feed_markers):
                source_type = "rss_feed"
            else:
                source_type = "web_source"

            if not url:
                result_action = "übersprungen (keine URL)"
                new_status = "rejected"
            else:
                dup_lookup = await db.execute(
                    "SELECT id FROM sources WHERE url = ? AND tenant_id IS NULL", (url,)
                )
                if await dup_lookup.fetchone():
                    result_action = "übersprungen (URL bereits vorhanden)"
                    new_status = "rejected"
                else:
                    await db.execute(
                        "INSERT INTO sources (name, url, domain, source_type, category, status, added_by, tenant_id) "
                        "VALUES (?, ?, ?, ?, ?, 'active', 'haiku-vorschlag', NULL)",
                        (name, url, domain, source_type, category),
                    )
                    result_action = f"Quelle '{name}' angelegt"
        elif stype == "deactivate_source":
            target_id = suggestion["source_id"]
            if target_id:
                await db.execute("UPDATE sources SET status = 'inactive' WHERE id = ?", (target_id,))
                result_action = "Quelle deaktiviert"
        elif stype == "remove_source":
            target_id = suggestion["source_id"]
            if target_id:
                await db.execute("DELETE FROM sources WHERE id = ?", (target_id,))
                result_action = "Quelle gelöscht"
        elif stype == "fix_url":
            target_id = suggestion["source_id"]
            replacement_url = data.get("url")
            if target_id and replacement_url:
                await db.execute("UPDATE sources SET url = ? WHERE id = ?", (replacement_url, target_id))
                result_action = "URL aktualisiert"

        # Auto-reject: once a fix_url or add_source suggestion is accepted,
        # pending deactivate_source suggestions for the same source are rejected.
        supersedes_deactivation = stype in ("fix_url", "add_source")
        if supersedes_deactivation and suggestion.get("source_id"):
            await db.execute(
                "UPDATE source_suggestions SET status = 'rejected', reviewed_at = CURRENT_TIMESTAMP "
                "WHERE source_id = ? AND suggestion_type = 'deactivate_source' AND status = 'pending' AND id != ?",
                (suggestion["source_id"], suggestion_id),
            )

    await db.execute(
        "UPDATE source_suggestions SET status = ?, reviewed_at = CURRENT_TIMESTAMP WHERE id = ?",
        (new_status, suggestion_id),
    )
    await db.commit()

    await log_action(
        db, admin, get_client_ip(request),
        action="update", resource_type="source",
        resource_id=suggestion.get("source_id"),
        before={"suggestion_id": suggestion_id, "status": "pending"},
        after={"suggestion_id": suggestion_id, "status": new_status,
               "result_action": result_action},
    )
    return {"status": new_status, "action": result_action}
@router.post("/health/run")
async def run_health_check_now(
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Run the health check and the suggestion generator on demand.

    Returns:
        ``checked`` and ``issues`` from the health-check result, plus
        ``suggestions`` as returned by the suggestion generator.
    """
    # Imported here rather than at module level, as in the original code.
    from shared.services.source_health import run_health_checks
    from shared.services.source_suggester import generate_suggestions

    health_result = await run_health_checks(db)
    suggestion_count = await generate_suggestions(db)

    summary = {
        "checked": health_result["checked"],
        "issues": health_result["issues"],
        "suggestions": suggestion_count,
    }
    return summary
@router.post("/health/run-stream")
async def run_health_check_stream(
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Run the health check and stream its progress as server-sent events.

    All active sources (tenant and global) are loaded up front. The stream
    then archives the current ``source_health_checks`` rows into
    ``source_health_history``, clears the table and

    1. probes every source that has a URL (reachability, plus feed validity
       for RSS/podcast feeds), emitting one ``check`` event per source,
    2. records stale sources (no articles yet, or last article older than
       30 days) and duplicate URLs,
    3. generates suggestions and emits a final ``done`` event.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.
    """
    import json as _json
    import asyncio
    from datetime import datetime

    # Load sources.
    cursor = await db.execute(
        "SELECT id, name, url, domain, source_type, article_count, last_seen_at "
        "FROM sources WHERE status = 'active'"  # tenant + global
    )
    sources = [dict(row) for row in await cursor.fetchall()]
    sources_with_url = [s for s in sources if s["url"]]
    total = len(sources_with_url)

    def sse(payload):
        """Serialize one payload as a server-sent-event data frame."""
        return f"data: {_json.dumps(payload)}\n\n"

    async def generate():
        import httpx
        import feedparser

        # Phase 1: reachability.
        yield sse({'phase': 'check', 'checked': 0, 'total': total, 'current': ''})

        # Archive the previous state under a fresh run id, then start clean.
        run_id = uuid.uuid4().hex[:12]
        await db.execute(
            "INSERT INTO source_health_history "
            "(run_id, source_id, check_type, status, message, details, checked_at) "
            "SELECT ?, source_id, check_type, status, message, details, checked_at "
            "FROM source_health_checks",
            (run_id,),
        )
        await db.execute("DELETE FROM source_health_checks")
        await db.commit()

        issues_found = 0
        checked = 0

        async with httpx.AsyncClient(
            timeout=HEALTH_CHECK_TIMEOUT_S, follow_redirects=True,
            headers={"User-Agent": HEALTH_CHECK_USER_AGENT},
        ) as client:
            for source in sources_with_url:
                checks = []
                try:
                    try:
                        resp = await client.get(source["url"])
                        if resp.status_code >= 400:
                            checks.append({"type": "reachability", "status": "error",
                                           "message": f"HTTP {resp.status_code} - nicht erreichbar"})
                        else:
                            checks.append({"type": "reachability", "status": "ok", "message": "Erreichbar"})
                            if source["source_type"] in ("rss_feed", "podcast_feed"):
                                # Only the first 20000 characters are inspected.
                                text = resp.text[:20000]
                                if "<rss" not in text and "<feed" not in text and "<channel" not in text:
                                    checks.append({"type": "feed_validity", "status": "error",
                                                   "message": "Kein RSS/Atom-Feed"})
                                else:
                                    # feedparser is synchronous; run it off the event loop.
                                    feed = await asyncio.to_thread(feedparser.parse, text)
                                    if feed.get("bozo") and not feed.entries:
                                        checks.append({"type": "feed_validity", "status": "error",
                                                       "message": "Feed fehlerhaft"})
                                    elif not feed.entries:
                                        checks.append({"type": "feed_validity", "status": "warning",
                                                       "message": "Feed leer"})
                                    else:
                                        checks.append({"type": "feed_validity", "status": "ok",
                                                       "message": f"Feed OK ({len(feed.entries)} Eintr.)"})
                    except httpx.TimeoutException:
                        # Report the configured timeout rather than a hard-coded value.
                        checks.append({"type": "reachability", "status": "error",
                                       "message": f"Timeout ({HEALTH_CHECK_TIMEOUT_S}s)"})
                    except httpx.ConnectError:
                        checks.append({"type": "reachability", "status": "error",
                                       "message": "Verbindung fehlgeschlagen"})
                    except Exception as e:
                        checks.append({"type": "reachability", "status": "error",
                                       "message": f"{type(e).__name__}"})

                    for c in checks:
                        await db.execute(
                            "INSERT INTO source_health_checks (source_id, check_type, status, message) VALUES (?, ?, ?, ?)",
                            (source["id"], c["type"], c["status"], c["message"]))
                        if c["status"] != "ok":
                            issues_found += 1
                except Exception:
                    # Keep the stream alive, but do not lose the failure silently.
                    logger.warning(
                        "Health-Check-Ergebnis fuer Quelle %s konnte nicht gespeichert werden",
                        source["id"], exc_info=True,
                    )

                checked += 1
                status_icon = "ok"
                if any(c["status"] == "error" for c in checks):
                    status_icon = "error"
                elif any(c["status"] == "warning" for c in checks):
                    status_icon = "warning"
                yield sse({'phase': 'check', 'checked': checked, 'total': total,
                           'current': source['name'], 'status': status_icon})

        # Stale sources and duplicates (fast, no progress events needed).
        for source in sources:
            if source["source_type"] in ("excluded", "web_source"):
                continue
            article_count = source.get("article_count") or 0
            if article_count == 0:
                await db.execute(
                    "INSERT INTO source_health_checks (source_id, check_type, status, message) VALUES (?, 'stale', 'warning', 'Noch nie Artikel geliefert')",
                    (source["id"],))
                issues_found += 1
            elif source.get("last_seen_at"):
                try:
                    last_dt = datetime.fromisoformat(source["last_seen_at"])
                    # Compare like with like: an aware timestamp needs an aware
                    # "now", otherwise the subtraction raises TypeError and the
                    # source would never be flagged as stale.
                    if last_dt.tzinfo is not None:
                        now = datetime.now(last_dt.tzinfo)
                    else:
                        now = datetime.now()
                    age = (now - last_dt).days
                    if age > 30:
                        await db.execute(
                            "INSERT INTO source_health_checks (source_id, check_type, status, message) VALUES (?, 'stale', 'warning', ?)",
                            (source["id"], f"Letzter Artikel vor {age} Tagen"))
                        issues_found += 1
                except (ValueError, TypeError):
                    # Unparseable timestamp: skip the staleness check for this source.
                    pass

        # Duplicates: the first source seen with a normalized URL is the
        # reference; every later one gets a warning pointing at it.
        url_map = {}
        for s in sources:
            if not s["url"]:
                continue
            url_norm = s["url"].lower().rstrip("/")
            if url_norm in url_map:
                existing = url_map[url_norm]
                await db.execute(
                    "INSERT INTO source_health_checks (source_id, check_type, status, message) VALUES (?, 'duplicate', 'warning', ?)",
                    (s["id"], f"Doppelte URL wie '{existing['name']}' (ID {existing['id']})"))
                issues_found += 1
            else:
                url_map[url_norm] = s
        await db.commit()

        # Phase 2: suggestions.
        yield sse({'phase': 'suggestions', 'checked': checked, 'total': total})
        from shared.services.source_suggester import generate_suggestions
        suggestion_count = await generate_suggestions(db)

        # Done.
        yield sse({'phase': 'done', 'checked': checked, 'total': total,
                   'issues': issues_found, 'suggestions': suggestion_count})

    return StreamingResponse(generate(), media_type="text/event-stream")
@router.post("/health/search-fix/{source_id}")
async def search_fix_for_source(
    source_id: int,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Ask Claude (with WebSearch/WebFetch tools) for fixes for a broken source.

    The source and its current health-check rows are put into a prompt; the
    JSON object found in the answer is parsed and each proposed solution is
    stored as a high-priority pending suggestion, unless a pending suggestion
    of the same type already exists for this source.

    Returns:
        The parsed result dict, extended with ``cost_usd`` and ``tokens``.

    Raises:
        HTTPException: 404 when the source does not exist; 500 when the
            research or the processing of its answer fails.
    """
    import json as _json
    import re

    cursor = await db.execute(
        "SELECT id, name, url, domain, source_type, category FROM sources WHERE id = ?",
        (source_id,),
    )
    source = await cursor.fetchone()
    if not source:
        raise HTTPException(status_code=404, detail="Quelle nicht gefunden")
    source = dict(source)

    # Load the health-check problems recorded for this source.
    cursor = await db.execute(
        "SELECT check_type, status, message FROM source_health_checks WHERE source_id = ?",
        (source_id,),
    )
    issues = [dict(row) for row in await cursor.fetchall()]
    issues_text = "\n".join(f"- {i['check_type']}: {i['status']} - {i['message']}" for i in issues)

    prompt = f"""Du bist ein OSINT-Analyst. Folgende Quelle ist nicht mehr erreichbar:
Name: {source['name']}
URL: {source['url'] or 'keine'}
Domain: {source['domain'] or 'unbekannt'}
Typ: {source['source_type']}
Kategorie: {source['category']}
Probleme:
{issues_text}
Aufgabe: Suche im Internet nach funktionierenden Alternativen für diese Quelle.
- Finde konkrete RSS-Feed-URLs die tatsächlich funktionieren
- Prüfe ob es alternative Zugangswege gibt (andere Subdomains, Feed-Aggregatoren, alternative URLs)
- Gibt es eine Lösung oder ist die Quelle nur noch per WebSearch erreichbar?
Regeln:
- Maximal 3 Lösungen vorschlagen (die besten)
- Verwende echte deutsche Umlaute (ü, ä, ö, ß), keine Umschreibungen (ue, ae, oe, ss)
Antworte NUR mit einem JSON-Objekt:
{{
"fixable": true/false,
"solutions": [
{{
"type": "replace_url|add_feed|deactivate",
"name": "Anzeigename",
"url": "https://...",
"description": "Kurze Begründung"
}}
],
"summary": "Zusammenfassung in 1-2 Sätzen"
}}
Nur das JSON, kein anderer Text."""

    from shared.agents.claude_client import call_claude

    # Maps the solution types used in the prompt to suggestion types.
    suggestion_type_for = {
        "replace_url": "fix_url",
        "add_feed": "add_source",
        "deactivate": "deactivate_source",
    }

    try:
        response, usage = await call_claude(prompt, tools="WebSearch,WebFetch")

        # Take everything from the first "{" to the last "}" as the JSON answer.
        json_match = re.search(r'\{.*\}', response, re.DOTALL)
        if json_match:
            result = _json.loads(json_match.group(0))
        else:
            result = {"fixable": False, "solutions": [], "summary": response[:500]}

        # Store the solutions as suggestions.
        for sol in result.get("solutions") or []:
            if not isinstance(sol, dict):
                # The model answer is untrusted; ignore malformed entries.
                continue
            sol_type = sol.get("type", "add_feed")
            suggestion_type = suggestion_type_for.get(sol_type, "add_source")

            description = sol.get("description", sol_type)
            if description is None:
                description = sol_type
            title = f"{source['name']}: {str(description)[:120]}"

            # Duplicate check: same type + same source already pending?
            cursor = await db.execute(
                "SELECT id FROM source_suggestions WHERE suggestion_type = ? AND source_id = ? AND status = 'pending'",
                (suggestion_type, source_id),
            )
            if await cursor.fetchone():
                continue

            data = _json.dumps({
                "name": sol.get("name", source["name"]),
                "url": sol.get("url", ""),
                "domain": source["domain"] or "",
                "category": source["category"],
            }, ensure_ascii=False)
            await db.execute(
                "INSERT INTO source_suggestions "
                "(suggestion_type, title, description, source_id, suggested_data, priority, status) "
                "VALUES (?, ?, ?, ?, ?, 'high', 'pending')",
                (suggestion_type, title, sol.get("description", ""), source_id, data),
            )
        await db.commit()

        result["cost_usd"] = usage.cost_usd
        result["tokens"] = {"input": usage.input_tokens, "output": usage.output_tokens}
        return result
    except Exception as e:
        # Log the full traceback; the client only receives the short message.
        logger.error("Recherche fuer Quelle %s fehlgeschlagen: %s", source_id, e, exc_info=True)
        raise HTTPException(status_code=500, detail=f"Recherche fehlgeschlagen: {e}") from e