"""Quellen-Health-Check Engine - prüft Erreichbarkeit, Feed-Validität, Duplikate.""" import asyncio import logging import json from urllib.parse import urlparse import httpx import feedparser import aiosqlite logger = logging.getLogger("osint.source_health") async def run_health_checks(db: aiosqlite.Connection) -> dict: """Führt alle Health-Checks für aktive Grundquellen durch.""" logger.info("Starte Quellen-Health-Check...") # Alle aktiven Grundquellen laden cursor = await db.execute( "SELECT id, name, url, domain, source_type, article_count, last_seen_at " "FROM sources WHERE status = 'active' AND tenant_id IS NULL" ) sources = [dict(row) for row in await cursor.fetchall()] # Aktuelle Health-Check-Ergebnisse löschen (werden neu geschrieben) await db.execute("DELETE FROM source_health_checks") await db.commit() checks_done = 0 issues_found = 0 # 1. Erreichbarkeit + Feed-Validität (nur Quellen mit URL) sources_with_url = [s for s in sources if s["url"]] async with httpx.AsyncClient( timeout=15.0, follow_redirects=True, headers={"User-Agent": "Mozilla/5.0 (compatible; OSINT-Monitor/1.0)"}, ) as client: for i in range(0, len(sources_with_url), 5): batch = sources_with_url[i:i + 5] tasks = [_check_source_reachability(client, s) for s in batch] results = await asyncio.gather(*tasks, return_exceptions=True) for source, result in zip(batch, results): if isinstance(result, Exception): await _save_check( db, source["id"], "reachability", "error", f"Prüfung fehlgeschlagen: {result}", ) issues_found += 1 else: for check in result: await _save_check( db, source["id"], check["type"], check["status"], check["message"], check.get("details"), ) if check["status"] != "ok": issues_found += 1 checks_done += 1 # 2. Veraltete Quellen (kein Artikel seit >30 Tagen) for source in sources: if source["source_type"] in ("excluded", "web_source"): continue stale_check = _check_stale(source) if stale_check: await _save_check( db, source["id"], stale_check["type"], stale_check["status"], stale_check["message"], ) if stale_check["status"] != "ok": issues_found += 1 # 3. Duplikate erkennen duplicates = _find_duplicates(sources) for dup in duplicates: await _save_check( db, dup["source_id"], "duplicate", "warning", dup["message"], json.dumps(dup.get("details", {})), ) issues_found += 1 await db.commit() logger.info( f"Health-Check abgeschlossen: {checks_done} Quellen geprüft, " f"{issues_found} Probleme gefunden" ) return {"checked": checks_done, "issues": issues_found} async def _check_source_reachability( client: httpx.AsyncClient, source: dict, ) -> list[dict]: """Prüft Erreichbarkeit und Feed-Validität einer Quelle.""" checks = [] url = source["url"] try: resp = await client.get(url) if resp.status_code >= 400: checks.append({ "type": "reachability", "status": "error", "message": f"HTTP {resp.status_code} - nicht erreichbar", "details": json.dumps({"status_code": resp.status_code, "url": url}), }) return checks if resp.status_code >= 300: checks.append({ "type": "reachability", "status": "warning", "message": f"HTTP {resp.status_code} - Weiterleitung", "details": json.dumps({ "status_code": resp.status_code, "final_url": str(resp.url), }), }) else: checks.append({ "type": "reachability", "status": "ok", "message": "Erreichbar", }) # Feed-Validität nur für RSS-Feeds if source["source_type"] == "rss_feed": text = resp.text[:20000] if " dict | None: """Prüft ob eine Quelle veraltet ist (keine Artikel seit >30 Tagen).""" if source["source_type"] == "excluded": return None article_count = source.get("article_count") or 0 last_seen = source.get("last_seen_at") if article_count == 0: return { "type": "stale", "status": "warning", "message": "Noch nie Artikel geliefert", } if last_seen: try: from datetime import datetime last_dt = datetime.fromisoformat(last_seen) now = datetime.now() age_days = (now - last_dt).days if age_days > 30: return { "type": "stale", "status": "warning", "message": f"Letzter Artikel vor {age_days} Tagen", } except (ValueError, TypeError): pass return None def _find_duplicates(sources: list[dict]) -> list[dict]: """Findet doppelte Quellen (gleiche URL).""" duplicates = [] url_map = {} for s in sources: if not s["url"]: continue url_norm = s["url"].lower().rstrip("/") if url_norm in url_map: existing = url_map[url_norm] duplicates.append({ "source_id": s["id"], "message": f"Doppelte URL wie '{existing['name']}' (ID {existing['id']})", "details": {"duplicate_of": existing["id"], "type": "url"}, }) else: url_map[url_norm] = s return duplicates async def _save_check( db: aiosqlite.Connection, source_id: int, check_type: str, status: str, message: str, details: str = None, ): """Speichert ein Health-Check-Ergebnis.""" await db.execute( "INSERT INTO source_health_checks " "(source_id, check_type, status, message, details) " "VALUES (?, ?, ?, ?, ?)", (source_id, check_type, status, message, details), ) async def get_health_summary(db: aiosqlite.Connection) -> dict: """Gibt eine Zusammenfassung der letzten Health-Check-Ergebnisse zurück.""" cursor = await db.execute(""" SELECT h.id, h.source_id, s.name, s.domain, s.url, s.source_type, h.check_type, h.status, h.message, h.details, h.checked_at FROM source_health_checks h JOIN sources s ON s.id = h.source_id ORDER BY CASE h.status WHEN 'error' THEN 0 WHEN 'warning' THEN 1 ELSE 2 END, s.name """) checks = [dict(row) for row in await cursor.fetchall()] error_count = sum(1 for c in checks if c["status"] == "error") warning_count = sum(1 for c in checks if c["status"] == "warning") ok_count = sum(1 for c in checks if c["status"] == "ok") cursor = await db.execute( "SELECT MAX(checked_at) as last_check FROM source_health_checks" ) row = await cursor.fetchone() last_check = row["last_check"] if row else None return { "last_check": last_check, "total_checks": len(checks), "errors": error_count, "warnings": warning_count, "ok": ok_count, "checks": checks, }