"""Quellen-Health-Check Engine - prüft Erreichbarkeit, Feed-Validität, Duplikate.""" import asyncio import logging import json import uuid from urllib.parse import urlparse import httpx import feedparser import aiosqlite try: from config import HEALTH_CHECK_USER_AGENT, HEALTH_CHECK_TIMEOUT_S except ImportError: HEALTH_CHECK_USER_AGENT = "Mozilla/5.0 (compatible; AegisSight-HealthCheck/1.0)" HEALTH_CHECK_TIMEOUT_S = 15.0 # Phase 18: alternative User-Agents fuer Bot-Block-Bypass USER_AGENT_GOOGLEBOT = "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)" USER_AGENT_BROWSER = ( "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 " "(KHTML, like Gecko) Chrome/120.0 Safari/537.36" ) REMOVEPAYWALLS_PREFIX = "https://www.removepaywall.com/search?url=" # HTTP-Codes, die einen Retry mit anderem UA rechtfertigen RETRY_ON_STATUS = {403, 406, 429} logger = logging.getLogger("osint.source_health") async def run_health_checks(db: aiosqlite.Connection) -> dict: """Führt Health-Checks für alle aktiven Quellen durch (global + Tenant).""" logger.info("Starte Quellen-Health-Check...") # Alle aktiven Quellen laden (global UND Tenant-spezifisch) cursor = await db.execute( "SELECT id, name, url, domain, source_type, article_count, last_seen_at, " "COALESCE(fetch_strategy, 'default') AS fetch_strategy " "FROM sources WHERE status = 'active' " ) sources = [dict(row) for row in await cursor.fetchall()] # Bisherigen Stand in History archivieren, dann frisch starten run_id = uuid.uuid4().hex[:12] await db.execute( "INSERT INTO source_health_history " "(run_id, source_id, check_type, status, message, details, checked_at) " "SELECT ?, source_id, check_type, status, message, details, checked_at " "FROM source_health_checks", (run_id,), ) await db.execute("DELETE FROM source_health_checks") await db.commit() logger.info(f"Health-Check Run {run_id}: vorigen Stand archiviert") checks_done = 0 issues_found = 0 # 1. Erreichbarkeit + Feed-Validität (nur Quellen mit URL) sources_with_url = [s for s in sources if s["url"]] async with httpx.AsyncClient( timeout=HEALTH_CHECK_TIMEOUT_S, follow_redirects=True, headers={"User-Agent": HEALTH_CHECK_USER_AGENT}, ) as client: for i in range(0, len(sources_with_url), 5): batch = sources_with_url[i:i + 5] tasks = [_check_source_reachability(client, s) for s in batch] results = await asyncio.gather(*tasks, return_exceptions=True) for source, result in zip(batch, results): if isinstance(result, Exception): await _save_check( db, source["id"], "reachability", "error", f"Prüfung fehlgeschlagen: {result}", ) issues_found += 1 else: for check in result: await _save_check( db, source["id"], check["type"], check["status"], check["message"], check.get("details"), ) if check["status"] != "ok": issues_found += 1 checks_done += 1 # 2. Veraltete Quellen (kein Artikel seit >30 Tagen) for source in sources: if source["source_type"] in ("excluded", "web_source"): continue stale_check = _check_stale(source) if stale_check: await _save_check( db, source["id"], stale_check["type"], stale_check["status"], stale_check["message"], ) if stale_check["status"] != "ok": issues_found += 1 # 3. Duplikate erkennen duplicates = _find_duplicates(sources) for dup in duplicates: await _save_check( db, dup["source_id"], "duplicate", "warning", dup["message"], json.dumps(dup.get("details", {})), ) issues_found += 1 await db.commit() logger.info( f"Health-Check abgeschlossen: {checks_done} Quellen geprüft, " f"{issues_found} Probleme gefunden" ) return {"checked": checks_done, "issues": issues_found} async def _check_source_reachability( client: httpx.AsyncClient, source: dict, ) -> list[dict]: """Prüft Erreichbarkeit und Feed-Validität einer Quelle. Phase 18: pro Quelle eine fetch_strategy ('default' | 'googlebot' | 'paywall' | 'skip'). Bei 'default' wird im Fehlerfall (403/406/429) ein Retry mit Googlebot-UA gemacht. Bei 'paywall' wird auf removepaywall.com umgeleitet. Bei 'skip' wird kein Check ausgeführt. """ checks = [] url = source["url"] strategy = source.get("fetch_strategy") or "default" # 'skip' -> kein Check (bekannte unerreichbare Quellen, z.B. Login-only) if strategy == "skip": checks.append({ "type": "reachability", "status": "ok", "message": "Health-Check uebersprungen (fetch_strategy=skip)", }) return checks # URL-Schema sicherstellen if url and not url.startswith(("http://", "https://")): url = "https://" + url.lstrip("/") # Initialen UA waehlen initial_ua = HEALTH_CHECK_USER_AGENT initial_url = url if strategy == "googlebot": initial_ua = USER_AGENT_GOOGLEBOT elif strategy == "paywall": # Paywall-Quellen: Feed-URL direkt laden, aber mit Browser-UA (versucht Bot-Detection zu umgehen). # removepaywall.com ist fuer Article-URLs, NICHT fuer RSS-Feed-Validity-Checks # (gibt HTML statt XML zurueck). Researcher-Pipeline nutzt removepaywall fuer Inhalte. initial_ua = USER_AGENT_BROWSER try: resp = await client.get(initial_url, headers={"User-Agent": initial_ua}) # Paywall-Quellen: 4xx ist erwartbar (Bot-Detection), als warning markieren statt error if strategy == "paywall" and resp.status_code in RETRY_ON_STATUS: checks.append({ "type": "reachability", "status": "warning", "message": f"Paywall-Quelle, Direkt-Zugang HTTP {resp.status_code} (Researcher-Pipeline nutzt removepaywall.com fuer Inhalte)", }) return checks # Feed-Validity-Check skippen (Paywall liefert kein RSS) # Bot-Block-Retry nur bei strategy='default' if ( strategy == "default" and resp.status_code in RETRY_ON_STATUS ): retry = await client.get(url, headers={"User-Agent": USER_AGENT_GOOGLEBOT}) if retry.status_code < 400: resp = retry # Retry hat geholfen checks.append({ "type": "reachability", "status": "warning", "message": f"Erreichbar nur mit Googlebot-UA (Standard-UA bekam HTTP {initial_url and 'unknown' or 'XXX'})", }) if resp.status_code >= 400: checks.append({ "type": "reachability", "status": "error", "message": f"HTTP {resp.status_code} - nicht erreichbar", "details": json.dumps({"status_code": resp.status_code, "url": url}), }) return checks if resp.status_code >= 300: checks.append({ "type": "reachability", "status": "warning", "message": f"HTTP {resp.status_code} - Weiterleitung", "details": json.dumps({ "status_code": resp.status_code, "final_url": str(resp.url), }), }) else: checks.append({ "type": "reachability", "status": "ok", "message": "Erreichbar", }) # Feed-Validität nur für RSS-Feeds if source["source_type"] == "rss_feed": text = resp.text[:20000] if " dict | None: """Prüft ob eine Quelle veraltet ist (keine Artikel seit >30 Tagen).""" if source["source_type"] == "excluded": return None article_count = source.get("article_count") or 0 last_seen = source.get("last_seen_at") if article_count == 0: return { "type": "stale", "status": "warning", "message": "Noch nie Artikel geliefert", } if last_seen: try: from datetime import datetime last_dt = datetime.fromisoformat(last_seen) now = datetime.now() age_days = (now - last_dt).days if age_days > 30: return { "type": "stale", "status": "warning", "message": f"Letzter Artikel vor {age_days} Tagen", } except (ValueError, TypeError): pass return None def _find_duplicates(sources: list[dict]) -> list[dict]: """Findet doppelte Quellen (gleiche URL).""" duplicates = [] url_map = {} for s in sources: if not s["url"]: continue url_norm = s["url"].lower().rstrip("/") if url_norm in url_map: existing = url_map[url_norm] duplicates.append({ "source_id": s["id"], "message": f"Doppelte URL wie '{existing['name']}' (ID {existing['id']})", "details": {"duplicate_of": existing["id"], "type": "url"}, }) else: url_map[url_norm] = s return duplicates async def _save_check( db: aiosqlite.Connection, source_id: int, check_type: str, status: str, message: str, details: str = None, ): """Speichert ein Health-Check-Ergebnis.""" await db.execute( "INSERT INTO source_health_checks " "(source_id, check_type, status, message, details) " "VALUES (?, ?, ?, ?, ?)", (source_id, check_type, status, message, details), ) async def get_health_summary(db: aiosqlite.Connection) -> dict: """Gibt eine Zusammenfassung der letzten Health-Check-Ergebnisse zurück.""" cursor = await db.execute(""" SELECT h.id, h.source_id, s.name, s.domain, s.url, s.source_type, h.check_type, h.status, h.message, h.details, h.checked_at FROM source_health_checks h JOIN sources s ON s.id = h.source_id ORDER BY CASE h.status WHEN 'error' THEN 0 WHEN 'warning' THEN 1 ELSE 2 END, s.name """) checks = [dict(row) for row in await cursor.fetchall()] error_count = sum(1 for c in checks if c["status"] == "error") warning_count = sum(1 for c in checks if c["status"] == "warning") ok_count = sum(1 for c in checks if c["status"] == "ok") cursor = await db.execute( "SELECT MAX(checked_at) as last_check FROM source_health_checks" ) row = await cursor.fetchone() last_check = row["last_check"] if row else None return { "last_check": last_check, "total_checks": len(checks), "errors": error_count, "warnings": warning_count, "ok": ok_count, "checks": checks, }