diff --git a/src/services/source_health.py b/src/services/source_health.py
index e6ee799..0f073c9 100644
--- a/src/services/source_health.py
+++ b/src/services/source_health.py
@@ -1,282 +1,282 @@
+"""Quellen-Health-Check Engine - prüft Erreichbarkeit, Feed-Validität, Duplikate."""
+import asyncio
+import logging
+import json
+from urllib.parse import urlparse
+
+import httpx
+import feedparser
+import aiosqlite
+
+logger = logging.getLogger("osint.source_health")
+
+
+async def run_health_checks(db: aiosqlite.Connection) -> dict:
+    """Führt alle Health-Checks für aktive Grundquellen durch."""
+    logger.info("Starte Quellen-Health-Check...")
+
+    # Alle aktiven Grundquellen laden
+    cursor = await db.execute(
+        "SELECT id, name, url, domain, source_type, article_count, last_seen_at "
+        "FROM sources WHERE status = 'active' AND tenant_id IS NULL"
+    )
+    sources = [dict(row) for row in await cursor.fetchall()]
+
+    # Aktuelle Health-Check-Ergebnisse löschen (werden neu geschrieben)
+    await db.execute("DELETE FROM source_health_checks")
+    await db.commit()
+
+    checks_done = 0
+    issues_found = 0
+
+    # 1. Erreichbarkeit + Feed-Validität (nur Quellen mit URL)
+    sources_with_url = [s for s in sources if s["url"]]
+
+    async with httpx.AsyncClient(
+        timeout=15.0,
+        follow_redirects=True,
+        headers={"User-Agent": "Mozilla/5.0 (compatible; OSINT-Monitor/1.0)"},
+    ) as client:
+        for i in range(0, len(sources_with_url), 5):
+            batch = sources_with_url[i:i + 5]
+            tasks = [_check_source_reachability(client, s) for s in batch]
+            results = await asyncio.gather(*tasks, return_exceptions=True)
+
+            for source, result in zip(batch, results):
+                if isinstance(result, Exception):
+                    await _save_check(
+                        db, source["id"], "reachability", "error",
+                        f"Prüfung fehlgeschlagen: {result}",
+                    )
+                    issues_found += 1
+                else:
+                    for check in result:
+                        await _save_check(
+                            db, source["id"], check["type"], check["status"],
+                            check["message"], check.get("details"),
+                        )
+                        if check["status"] != "ok":
+                            issues_found += 1
+                checks_done += 1
+
+    # 2. Veraltete Quellen (kein Artikel seit >30 Tagen)
+    for source in sources:
+        if source["source_type"] in ("excluded", "web_source"):
+            continue
+        stale_check = _check_stale(source)
+        if stale_check:
+            await _save_check(
+                db, source["id"], stale_check["type"],
+                stale_check["status"], stale_check["message"],
+            )
+            if stale_check["status"] != "ok":
+                issues_found += 1
+
+    # 3. Duplikate erkennen
+    duplicates = _find_duplicates(sources)
+    for dup in duplicates:
+        await _save_check(
+            db, dup["source_id"], "duplicate", "warning",
+            dup["message"], json.dumps(dup.get("details", {})),
+        )
+        issues_found += 1
+
+    await db.commit()
+    logger.info(
+        f"Health-Check abgeschlossen: {checks_done} Quellen geprüft, "
+        f"{issues_found} Probleme gefunden"
+    )
+    return {"checked": checks_done, "issues": issues_found}
+
+
+async def _check_source_reachability(
+    client: httpx.AsyncClient, source: dict,
+) -> list[dict]:
+    """Prüft Erreichbarkeit und Feed-Validität einer Quelle."""
+    checks = []
+    url = source["url"]
+
+    try:
+        resp = await client.get(url)
+
+        if resp.status_code >= 400:
+            checks.append({
+                "type": "reachability",
+                "status": "error",
+                "message": f"HTTP {resp.status_code} - nicht erreichbar",
+                "details": json.dumps({"status_code": resp.status_code, "url": url}),
+            })
+            return checks
+
+        if resp.status_code >= 300:
+            checks.append({
+                "type": "reachability",
+                "status": "warning",
+                "message": f"HTTP {resp.status_code} - Weiterleitung",
+                "details": json.dumps({
+                    "status_code": resp.status_code,
+                    "final_url": str(resp.url),
+                }),
+            })
+        else:
+            checks.append({
+                "type": "reachability",
+                "status": "ok",
+                "message": "Erreichbar",
+            })
+
+        # Feed-Validität nur für RSS-Feeds
+        if source["source_type"] == "rss_feed":
+            text = resp.text[:20000]
+            if "
[…]
+
+
+def _check_stale(source: dict) -> dict | None:
+    """Prüft ob eine Quelle veraltet ist (keine Artikel seit >30 Tagen)."""
+    if source["source_type"] == "excluded":
+        return None
+
+    article_count = source.get("article_count") or 0
+    last_seen = source.get("last_seen_at")
+
+    if article_count == 0:
+        return {
+            "type": "stale",
+            "status": "warning",
+            "message": "Noch nie Artikel geliefert",
+        }
+
+    if last_seen:
+        try:
+            from datetime import datetime
+            last_dt = datetime.fromisoformat(last_seen)
+            now = datetime.now()
+            age_days = (now - last_dt).days
+            if age_days > 30:
+                return {
+                    "type": "stale",
+                    "status": "warning",
+                    "message": f"Letzter Artikel vor {age_days} Tagen",
+                }
+        except (ValueError, TypeError):
+            pass
+
+    return None
+
+
+def _find_duplicates(sources: list[dict]) -> list[dict]:
+    """Findet doppelte Quellen (gleiche URL)."""
+    duplicates = []
+    url_map = {}
+
+    for s in sources:
+        if not s["url"]:
+            continue
+        url_norm = s["url"].lower().rstrip("/")
+        if url_norm in url_map:
+            existing = url_map[url_norm]
+            duplicates.append({
+                "source_id": s["id"],
+                "message": f"Doppelte URL wie '{existing['name']}' (ID {existing['id']})",
+                "details": {"duplicate_of": existing["id"], "type": "url"},
+            })
+        else:
+            url_map[url_norm] = s
+
+    return duplicates
+
+
+async def _save_check(
+    db: aiosqlite.Connection, source_id: int, check_type: str,
+    status: str, message: str, details: str = None,
+):
+    """Speichert ein Health-Check-Ergebnis."""
+    await db.execute(
+        "INSERT INTO source_health_checks "
+        "(source_id, check_type, status, message, details) "
+        "VALUES (?, ?, ?, ?, ?)",
+        (source_id, check_type, status, message, details),
+    )
+
+
+async def get_health_summary(db: aiosqlite.Connection) -> dict:
+    """Gibt eine Zusammenfassung der letzten Health-Check-Ergebnisse zurück."""
+    cursor = await db.execute("""
+        SELECT
+            h.id, h.source_id, s.name, s.domain, s.url, s.source_type,
+            h.check_type, h.status, h.message, h.details, h.checked_at
+        FROM source_health_checks h
+        JOIN sources s ON s.id = h.source_id
+        ORDER BY
+            CASE h.status WHEN 'error' THEN 0 WHEN 'warning' THEN 1 ELSE 2 END,
+            s.name
+    """)
+    checks = [dict(row) for row in await cursor.fetchall()]
+
+    error_count = sum(1 for c in checks if c["status"] == "error")
+    warning_count = sum(1 for c in checks if c["status"] == "warning")
+    ok_count = sum(1 for c in checks if c["status"] == "ok")
+
+    cursor = await db.execute(
+        "SELECT MAX(checked_at) as last_check FROM source_health_checks"
+    )
+    row = await cursor.fetchone()
+    last_check = row["last_check"] if row else None
+
+    return {
+        "last_check": last_check,
+        "total_checks": len(checks),
+        "errors": error_count,
+        "warnings": warning_count,
+        "ok": ok_count,
+        "checks": checks,
+    }
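For orientation, a minimal sketch of how the two entry points in source_health.py might be driven from the surrounding application. This is not part of the diff: the database path, the aiosqlite.Row row factory and the standalone invocation are assumptions, and the real wiring (scheduler, web layer) lives elsewhere in the repository.

# Sketch only: run the health-check engine against an existing SQLite database.
# DB_PATH is hypothetical; the schema (sources, source_health_checks) is assumed to exist.
import asyncio

import aiosqlite

from src.services.source_health import get_health_summary, run_health_checks

DB_PATH = "osint.db"  # assumption - the real path comes from the app's configuration


async def main() -> None:
    async with aiosqlite.connect(DB_PATH) as db:
        # run_health_checks() and get_health_summary() build dict(row) from rows,
        # so name-based row access has to be enabled on the connection.
        db.row_factory = aiosqlite.Row
        result = await run_health_checks(db)
        summary = await get_health_summary(db)
        print(f"checked={result['checked']} issues={result['issues']} "
              f"errors={summary['errors']} warnings={summary['warnings']}")


if __name__ == "__main__":
    asyncio.run(main())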
diff --git a/src/services/source_suggester.py b/src/services/source_suggester.py
index ed7be67..2e41937 100644
--- a/src/services/source_suggester.py
+++ b/src/services/source_suggester.py
@@ -1,4 +1,4 @@
-"""KI-gestützte Quellen-Vorschläge via Haiku."""
+"""KI-gestützte Quellen-Vorschläge via Haiku."""
 import json
 import logging
 import re
@@ -12,8 +12,8 @@ logger = logging.getLogger("osint.source_suggester")


 async def generate_suggestions(db: aiosqlite.Connection) -> int:
-    """Generiert Quellen-Vorschläge basierend auf Health-Checks und Lückenanalyse."""
-    logger.info("Starte Quellen-Vorschläge via Haiku...")
+    """Generiert Quellen-Vorschläge basierend auf Health-Checks und Lückenanalyse."""
+    logger.info("Starte Quellen-Vorschläge via Haiku...")

     # 1. Aktuelle Quellen laden
     cursor = await db.execute(
@@ -33,13 +33,13 @@ async def generate_suggestions(db: aiosqlite.Connection) -> int:
     """)
     issues = [dict(row) for row in await cursor.fetchall()]

-    # 3. Alte pending-Vorschläge entfernen (älter als 30 Tage)
+    # 3. Alte pending-Vorschläge entfernen (älter als 30 Tage)
     await db.execute(
         "DELETE FROM source_suggestions "
         "WHERE status = 'pending' AND created_at < datetime('now', '-30 days')"
     )

-    # 4. Quellen-Zusammenfassung für Haiku
+    # 4. Quellen-Zusammenfassung für Haiku
     categories = {}
     for s in sources:
         cat = s["category"]
@@ -67,7 +67,7 @@ async def generate_suggestions(db: aiosqlite.Connection) -> int:
             f"{issue['check_type']} = {issue['status']} - {issue['message']}\n"
         )

-    prompt = f"""Du bist ein OSINT-Analyst und verwaltest die Quellensammlung eines Lagebildmonitors für Sicherheitsbehörden.
+    prompt = f"""Du bist ein OSINT-Analyst und verwaltest die Quellensammlung eines Lagebildmonitors für Sicherheitsbehörden.

 Aktuelle Quellensammlung:{source_summary}{issues_summary}

@@ -78,13 +78,13 @@ Beachte:
 2. Fehlende wichtige OSINT-Quellen: Schlage "add_source" mit konkreter RSS-Feed-URL vor
 3. Fokus auf deutschsprachige + wichtige internationale Nachrichtenquellen
 4. Nur Quellen vorschlagen, die NICHT bereits vorhanden sind
-5. Maximal 5 Vorschläge
+5. Maximal 5 Vorschläge

 Antworte NUR mit einem JSON-Array. Jedes Element:
 {{
   "type": "add_source|deactivate_source|fix_url|remove_source",
   "title": "Kurzer Titel",
-  "description": "Begründung",
+  "description": "Begründung",
   "priority": "low|medium|high",
   "source_id": null,
   "data": {{
@@ -104,7 +104,7 @@ Nur das JSON-Array, kein anderer Text."""

         json_match = re.search(r'\[.*\]', response, re.DOTALL)
         if not json_match:
-            logger.warning("Keine Vorschläge von Haiku erhalten (kein JSON)")
+            logger.warning("Keine Vorschläge von Haiku erhalten (kein JSON)")
             return 0

         suggestions = json.loads(json_match.group(0))
@@ -164,14 +164,14 @@ Nur das JSON-Array, kein anderer Text."""

         await db.commit()
         logger.info(
-            f"Quellen-Vorschläge: {count} neue Vorschläge generiert "
+            f"Quellen-Vorschläge: {count} neue Vorschläge generiert "
             f"(Haiku: {usage.input_tokens} in / {usage.output_tokens} out / "
             f"${usage.cost_usd:.4f})"
         )
         return count

     except Exception as e:
-        logger.error(f"Fehler bei Quellen-Vorschlägen: {e}", exc_info=True)
+        logger.error(f"Fehler bei Quellen-Vorschlägen: {e}", exc_info=True)
         return 0


@@ -218,7 +218,7 @@ async def apply_suggestion(
                 (url,),
             )
             if await cursor.fetchone():
-                result["action"] = "übersprungen (URL bereits vorhanden)"
+                result["action"] = "übersprungen (URL bereits vorhanden)"
                 new_status = "rejected"
             else:
                 await db.execute(
@@ -230,7 +230,7 @@ async def apply_suggestion(
                 )
                 result["action"] = f"Quelle '{name}' angelegt"
         else:
-            result["action"] = "übersprungen (keine URL)"
+            result["action"] = "übersprungen (keine URL)"
             new_status = "rejected"

     elif stype == "deactivate_source":
@@ -242,7 +242,7 @@ async def apply_suggestion(
             )
             result["action"] = "Quelle deaktiviert"
         else:
-            result["action"] = "übersprungen (keine source_id)"
+            result["action"] = "übersprungen (keine source_id)"

     elif stype == "remove_source":
         source_id = suggestion["source_id"]
@@ -250,9 +250,9 @@ async def apply_suggestion(
             await db.execute(
                 "DELETE FROM sources WHERE id = ?", (source_id,),
             )
-            result["action"] = "Quelle gelöscht"
+            result["action"] = "Quelle gelöscht"
         else:
-            result["action"] = "übersprungen (keine source_id)"
+            result["action"] = "übersprungen (keine source_id)"

     elif stype == "fix_url":
         source_id = suggestion["source_id"]
@@ -264,7 +264,7 @@ async def apply_suggestion(
             )
             result["action"] = f"URL aktualisiert auf {new_url}"
         else:
-            result["action"] = "übersprungen (keine source_id oder URL)"
+            result["action"] = "übersprungen (keine source_id oder URL)"

     await db.execute(
         "UPDATE source_suggestions SET status = ?, reviewed_at = CURRENT_TIMESTAMP "
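The suggestion pipeline relies on the model returning a bare JSON array, which generate_suggestions pulls out of the raw response with a greedy regex before handing it to json.loads. Below is a small standalone sketch of that extraction step; the sample response text and the contents of "data" are invented, only the keys follow the schema given in the prompt above.

# Sketch only: the JSON-extraction step from generate_suggestions, shown in isolation.
# The response text below is a made-up example of a model reply with extra prose around it.
import json
import re

response = """Hier sind meine Vorschläge:
[
  {"type": "add_source", "title": "Beispiel-Feed", "description": "Begründung",
   "priority": "medium", "source_id": null, "data": {"url": "https://example.org/feed"}}
]
Nur das JSON-Array, kein anderer Text."""

# Greedy match from the first '[' to the last ']' so prose around the array is ignored.
json_match = re.search(r"\[.*\]", response, re.DOTALL)
suggestions = json.loads(json_match.group(0)) if json_match else []
for s in suggestions:
    print(s["type"], "-", s["title"])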