From ca4422ccd16a342dea8967c2550c2932442cd36a Mon Sep 17 00:00:00 2001 From: claude-dev Date: Sat, 9 May 2026 02:56:49 +0000 Subject: [PATCH] =?UTF-8?q?Phase=202=20Health-Check=20tenant-f=C3=A4hig=20?= =?UTF-8?q?+=20Historie?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - migrations/2026-05-09d_source_health_history.py NEU: source_health_history-Tabelle (Append-only Verlauf der Health-Check-Runs mit run_id und archived_at) - shared/services/source_health.py: - tenant_id IS NULL Filter raus -> auch Tenant-Quellen werden gecheckt - Mojibake (Triple-Encoded UTF-8) via ftfy gefixt - DELETE FROM source_health_checks: vorher Stand mit run_id (uuid4) in source_health_history archivieren -> kein Datenverlust mehr - User-Agent + Timeout aus config.HEALTH_CHECK_* statt hardcoded - routers/sources.py /health/run-stream: gleiche Änderungen wie oben - config.py: HEALTH_CHECK_USER_AGENT + HEALTH_CHECK_TIMEOUT_S ergänzt --- .../2026-05-09d_source_health_history.py | 57 ++ src/config.py | 7 + src/routers/sources.py | 17 +- src/shared/services/source_health.py | 580 +++++++++--------- 4 files changed, 376 insertions(+), 285 deletions(-) create mode 100644 migrations/2026-05-09d_source_health_history.py diff --git a/migrations/2026-05-09d_source_health_history.py b/migrations/2026-05-09d_source_health_history.py new file mode 100644 index 0000000..dd1cda0 --- /dev/null +++ b/migrations/2026-05-09d_source_health_history.py @@ -0,0 +1,57 @@ +"""Migration 2026-05-09d: source_health_history (Verlauf der Health-Checks). + +Bislang wurde vor jedem Health-Check-Run die Tabelle source_health_checks geleert +(DELETE FROM source_health_checks). Damit ging die Historie verloren - kein +Trend, keine Vergleichsmöglichkeit über Runs. + +Diese Migration legt eine reine Append-Tabelle source_health_history an. +Vor jedem Health-Check-Run wird der aktuelle Stand von source_health_checks +hier archiviert (mit run_id und archived_at). + +Ausführung: + DB_PATH=/home/claude-dev/osint-data/osint.db python3 migrations/2026-05-09d_source_health_history.py + DB_PATH=/home/claude-dev/AegisSight-Monitor-staging/data/osint.db python3 migrations/2026-05-09d_source_health_history.py +""" +import os +import sqlite3 +import sys + + +def main(db_path: str) -> int: + if not os.path.exists(db_path): + print(f"FEHLER: DB nicht gefunden: {db_path}", file=sys.stderr) + return 1 + + conn = sqlite3.connect(db_path, timeout=60) + conn.execute("PRAGMA busy_timeout = 60000") + conn.execute("PRAGMA journal_mode = WAL") + + print(f"Migration auf {db_path}") + + conn.executescript(""" + CREATE TABLE IF NOT EXISTS source_health_history ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + run_id TEXT NOT NULL, + source_id INTEGER NOT NULL, + check_type TEXT NOT NULL, + status TEXT NOT NULL, + message TEXT, + details TEXT, + checked_at TIMESTAMP, + archived_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ); + CREATE INDEX IF NOT EXISTS idx_source_health_history_run ON source_health_history(run_id); + CREATE INDEX IF NOT EXISTS idx_source_health_history_source ON source_health_history(source_id, archived_at DESC); + CREATE INDEX IF NOT EXISTS idx_source_health_history_status ON source_health_history(status, archived_at DESC); + """) + print(" + source_health_history + Indizes (idempotent)") + + conn.commit() + conn.close() + print("Migration abgeschlossen.") + return 0 + + +if __name__ == "__main__": + db_path = os.environ.get("DB_PATH", "/home/claude-dev/osint-data/osint.db") + sys.exit(main(db_path)) diff --git a/src/config.py b/src/config.py index ffa2942..7da715b 100644 --- a/src/config.py +++ b/src/config.py @@ -48,3 +48,10 @@ MAX_FEEDS_PER_DOMAIN = 3 CLAUDE_MODEL_FAST = "claude-haiku-4-5-20251001" CLAUDE_MODEL_MEDIUM = "claude-sonnet-4-6" CLAUDE_MODEL_STANDARD = "claude-opus-4-7" + +# Health-Check (genutzt von shared/services/source_health.py + routers/sources.py) +HEALTH_CHECK_USER_AGENT = os.environ.get( + "HEALTH_CHECK_USER_AGENT", + "Mozilla/5.0 (compatible; AegisSight-HealthCheck/1.0)", +) +HEALTH_CHECK_TIMEOUT_S = float(os.environ.get("HEALTH_CHECK_TIMEOUT_S", "15.0")) diff --git a/src/routers/sources.py b/src/routers/sources.py index e483283..20d3f9b 100644 --- a/src/routers/sources.py +++ b/src/routers/sources.py @@ -1,5 +1,6 @@ """Grundquellen-Verwaltung und Kundenquellen-Übersicht.""" import logging +import uuid from fastapi import APIRouter, Depends, HTTPException, status, Request from fastapi.responses import StreamingResponse @@ -10,6 +11,7 @@ import aiosqlite from auth import get_current_admin from database import db_dependency from audit import log_action, get_client_ip, row_to_dict +from config import HEALTH_CHECK_USER_AGENT, HEALTH_CHECK_TIMEOUT_S from shared.source_rules import ( discover_source, discover_all_feeds, @@ -564,7 +566,7 @@ async def run_health_check_stream( # Quellen laden cursor = await db.execute( "SELECT id, name, url, domain, source_type, article_count, last_seen_at " - "FROM sources WHERE status = 'active' AND tenant_id IS NULL" + "FROM sources WHERE status = 'active'" # tenant + global ) sources = [dict(row) for row in await cursor.fetchall()] sources_with_url = [s for s in sources if s["url"]] @@ -577,6 +579,15 @@ async def run_health_check_stream( # Phase 1: Erreichbarkeit yield f"data: {_json.dumps({'phase': 'check', 'checked': 0, 'total': total, 'current': ''})}\n\n" + # Bisherigen Stand archivieren, dann frisch + run_id = uuid.uuid4().hex[:12] + await db.execute( + "INSERT INTO source_health_history " + "(run_id, source_id, check_type, status, message, details, checked_at) " + "SELECT ?, source_id, check_type, status, message, details, checked_at " + "FROM source_health_checks", + (run_id,), + ) await db.execute("DELETE FROM source_health_checks") await db.commit() @@ -584,8 +595,8 @@ async def run_health_check_stream( checked = 0 async with httpx.AsyncClient( - timeout=15.0, follow_redirects=True, - headers={"User-Agent": "Mozilla/5.0 (compatible; OSINT-Monitor/1.0)"}, + timeout=HEALTH_CHECK_TIMEOUT_S, follow_redirects=True, + headers={"User-Agent": HEALTH_CHECK_USER_AGENT}, ) as client: for source in sources_with_url: try: diff --git a/src/shared/services/source_health.py b/src/shared/services/source_health.py index e6ee799..e6b1cdd 100644 --- a/src/shared/services/source_health.py +++ b/src/shared/services/source_health.py @@ -1,282 +1,298 @@ -"""Quellen-Health-Check Engine - prüft Erreichbarkeit, Feed-Validität, Duplikate.""" -import asyncio -import logging -import json -from urllib.parse import urlparse - -import httpx -import feedparser -import aiosqlite - -logger = logging.getLogger("osint.source_health") - - -async def run_health_checks(db: aiosqlite.Connection) -> dict: - """Führt alle Health-Checks für aktive Grundquellen durch.""" - logger.info("Starte Quellen-Health-Check...") - - # Alle aktiven Grundquellen laden - cursor = await db.execute( - "SELECT id, name, url, domain, source_type, article_count, last_seen_at " - "FROM sources WHERE status = 'active' AND tenant_id IS NULL" - ) - sources = [dict(row) for row in await cursor.fetchall()] - - # Aktuelle Health-Check-Ergebnisse löschen (werden neu geschrieben) - await db.execute("DELETE FROM source_health_checks") - await db.commit() - - checks_done = 0 - issues_found = 0 - - # 1. Erreichbarkeit + Feed-Validität (nur Quellen mit URL) - sources_with_url = [s for s in sources if s["url"]] - - async with httpx.AsyncClient( - timeout=15.0, - follow_redirects=True, - headers={"User-Agent": "Mozilla/5.0 (compatible; OSINT-Monitor/1.0)"}, - ) as client: - for i in range(0, len(sources_with_url), 5): - batch = sources_with_url[i:i + 5] - tasks = [_check_source_reachability(client, s) for s in batch] - results = await asyncio.gather(*tasks, return_exceptions=True) - - for source, result in zip(batch, results): - if isinstance(result, Exception): - await _save_check( - db, source["id"], "reachability", "error", - f"Prüfung fehlgeschlagen: {result}", - ) - issues_found += 1 - else: - for check in result: - await _save_check( - db, source["id"], check["type"], check["status"], - check["message"], check.get("details"), - ) - if check["status"] != "ok": - issues_found += 1 - checks_done += 1 - - # 2. Veraltete Quellen (kein Artikel seit >30 Tagen) - for source in sources: - if source["source_type"] in ("excluded", "web_source"): - continue - stale_check = _check_stale(source) - if stale_check: - await _save_check( - db, source["id"], stale_check["type"], - stale_check["status"], stale_check["message"], - ) - if stale_check["status"] != "ok": - issues_found += 1 - - # 3. Duplikate erkennen - duplicates = _find_duplicates(sources) - for dup in duplicates: - await _save_check( - db, dup["source_id"], "duplicate", "warning", - dup["message"], json.dumps(dup.get("details", {})), - ) - issues_found += 1 - - await db.commit() - logger.info( - f"Health-Check abgeschlossen: {checks_done} Quellen geprüft, " - f"{issues_found} Probleme gefunden" - ) - return {"checked": checks_done, "issues": issues_found} - - -async def _check_source_reachability( - client: httpx.AsyncClient, source: dict, -) -> list[dict]: - """Prüft Erreichbarkeit und Feed-Validität einer Quelle.""" - checks = [] - url = source["url"] - - try: - resp = await client.get(url) - - if resp.status_code >= 400: - checks.append({ - "type": "reachability", - "status": "error", - "message": f"HTTP {resp.status_code} - nicht erreichbar", - "details": json.dumps({"status_code": resp.status_code, "url": url}), - }) - return checks - - if resp.status_code >= 300: - checks.append({ - "type": "reachability", - "status": "warning", - "message": f"HTTP {resp.status_code} - Weiterleitung", - "details": json.dumps({ - "status_code": resp.status_code, - "final_url": str(resp.url), - }), - }) - else: - checks.append({ - "type": "reachability", - "status": "ok", - "message": "Erreichbar", - }) - - # Feed-Validität nur für RSS-Feeds - if source["source_type"] == "rss_feed": - text = resp.text[:20000] - if " dict | None: - """Prüft ob eine Quelle veraltet ist (keine Artikel seit >30 Tagen).""" - if source["source_type"] == "excluded": - return None - - article_count = source.get("article_count") or 0 - last_seen = source.get("last_seen_at") - - if article_count == 0: - return { - "type": "stale", - "status": "warning", - "message": "Noch nie Artikel geliefert", - } - - if last_seen: - try: - from datetime import datetime - last_dt = datetime.fromisoformat(last_seen) - now = datetime.now() - age_days = (now - last_dt).days - if age_days > 30: - return { - "type": "stale", - "status": "warning", - "message": f"Letzter Artikel vor {age_days} Tagen", - } - except (ValueError, TypeError): - pass - - return None - - -def _find_duplicates(sources: list[dict]) -> list[dict]: - """Findet doppelte Quellen (gleiche URL).""" - duplicates = [] - url_map = {} - - for s in sources: - if not s["url"]: - continue - url_norm = s["url"].lower().rstrip("/") - if url_norm in url_map: - existing = url_map[url_norm] - duplicates.append({ - "source_id": s["id"], - "message": f"Doppelte URL wie '{existing['name']}' (ID {existing['id']})", - "details": {"duplicate_of": existing["id"], "type": "url"}, - }) - else: - url_map[url_norm] = s - - return duplicates - - -async def _save_check( - db: aiosqlite.Connection, source_id: int, check_type: str, - status: str, message: str, details: str = None, -): - """Speichert ein Health-Check-Ergebnis.""" - await db.execute( - "INSERT INTO source_health_checks " - "(source_id, check_type, status, message, details) " - "VALUES (?, ?, ?, ?, ?)", - (source_id, check_type, status, message, details), - ) - - -async def get_health_summary(db: aiosqlite.Connection) -> dict: - """Gibt eine Zusammenfassung der letzten Health-Check-Ergebnisse zurück.""" - cursor = await db.execute(""" - SELECT - h.id, h.source_id, s.name, s.domain, s.url, s.source_type, - h.check_type, h.status, h.message, h.details, h.checked_at - FROM source_health_checks h - JOIN sources s ON s.id = h.source_id - ORDER BY - CASE h.status WHEN 'error' THEN 0 WHEN 'warning' THEN 1 ELSE 2 END, - s.name - """) - checks = [dict(row) for row in await cursor.fetchall()] - - error_count = sum(1 for c in checks if c["status"] == "error") - warning_count = sum(1 for c in checks if c["status"] == "warning") - ok_count = sum(1 for c in checks if c["status"] == "ok") - - cursor = await db.execute( - "SELECT MAX(checked_at) as last_check FROM source_health_checks" - ) - row = await cursor.fetchone() - last_check = row["last_check"] if row else None - - return { - "last_check": last_check, - "total_checks": len(checks), - "errors": error_count, - "warnings": warning_count, - "ok": ok_count, - "checks": checks, - } +"""Quellen-Health-Check Engine - prüft Erreichbarkeit, Feed-Validität, Duplikate.""" +import asyncio +import logging +import json +import uuid +from urllib.parse import urlparse + +import httpx +import feedparser +import aiosqlite + +try: + from config import HEALTH_CHECK_USER_AGENT, HEALTH_CHECK_TIMEOUT_S +except ImportError: + HEALTH_CHECK_USER_AGENT = "Mozilla/5.0 (compatible; AegisSight-HealthCheck/1.0)" + HEALTH_CHECK_TIMEOUT_S = 15.0 + +logger = logging.getLogger("osint.source_health") + + +async def run_health_checks(db: aiosqlite.Connection) -> dict: + """Führt Health-Checks für alle aktiven Quellen durch (global + Tenant).""" + logger.info("Starte Quellen-Health-Check...") + + # Alle aktiven Quellen laden (global UND Tenant-spezifisch) + cursor = await db.execute( + "SELECT id, name, url, domain, source_type, article_count, last_seen_at " + "FROM sources WHERE status = 'active' " + ) + sources = [dict(row) for row in await cursor.fetchall()] + + # Bisherigen Stand in History archivieren, dann frisch starten + run_id = uuid.uuid4().hex[:12] + await db.execute( + "INSERT INTO source_health_history " + "(run_id, source_id, check_type, status, message, details, checked_at) " + "SELECT ?, source_id, check_type, status, message, details, checked_at " + "FROM source_health_checks", + (run_id,), + ) + await db.execute("DELETE FROM source_health_checks") + await db.commit() + logger.info(f"Health-Check Run {run_id}: vorigen Stand archiviert") + + checks_done = 0 + issues_found = 0 + + # 1. Erreichbarkeit + Feed-Validität (nur Quellen mit URL) + sources_with_url = [s for s in sources if s["url"]] + + async with httpx.AsyncClient( + timeout=HEALTH_CHECK_TIMEOUT_S, + follow_redirects=True, + headers={"User-Agent": HEALTH_CHECK_USER_AGENT}, + ) as client: + for i in range(0, len(sources_with_url), 5): + batch = sources_with_url[i:i + 5] + tasks = [_check_source_reachability(client, s) for s in batch] + results = await asyncio.gather(*tasks, return_exceptions=True) + + for source, result in zip(batch, results): + if isinstance(result, Exception): + await _save_check( + db, source["id"], "reachability", "error", + f"Prüfung fehlgeschlagen: {result}", + ) + issues_found += 1 + else: + for check in result: + await _save_check( + db, source["id"], check["type"], check["status"], + check["message"], check.get("details"), + ) + if check["status"] != "ok": + issues_found += 1 + checks_done += 1 + + # 2. Veraltete Quellen (kein Artikel seit >30 Tagen) + for source in sources: + if source["source_type"] in ("excluded", "web_source"): + continue + stale_check = _check_stale(source) + if stale_check: + await _save_check( + db, source["id"], stale_check["type"], + stale_check["status"], stale_check["message"], + ) + if stale_check["status"] != "ok": + issues_found += 1 + + # 3. Duplikate erkennen + duplicates = _find_duplicates(sources) + for dup in duplicates: + await _save_check( + db, dup["source_id"], "duplicate", "warning", + dup["message"], json.dumps(dup.get("details", {})), + ) + issues_found += 1 + + await db.commit() + logger.info( + f"Health-Check abgeschlossen: {checks_done} Quellen geprüft, " + f"{issues_found} Probleme gefunden" + ) + return {"checked": checks_done, "issues": issues_found} + + +async def _check_source_reachability( + client: httpx.AsyncClient, source: dict, +) -> list[dict]: + """Prüft Erreichbarkeit und Feed-Validität einer Quelle.""" + checks = [] + url = source["url"] + + try: + resp = await client.get(url) + + if resp.status_code >= 400: + checks.append({ + "type": "reachability", + "status": "error", + "message": f"HTTP {resp.status_code} - nicht erreichbar", + "details": json.dumps({"status_code": resp.status_code, "url": url}), + }) + return checks + + if resp.status_code >= 300: + checks.append({ + "type": "reachability", + "status": "warning", + "message": f"HTTP {resp.status_code} - Weiterleitung", + "details": json.dumps({ + "status_code": resp.status_code, + "final_url": str(resp.url), + }), + }) + else: + checks.append({ + "type": "reachability", + "status": "ok", + "message": "Erreichbar", + }) + + # Feed-Validität nur für RSS-Feeds + if source["source_type"] == "rss_feed": + text = resp.text[:20000] + if " dict | None: + """Prüft ob eine Quelle veraltet ist (keine Artikel seit >30 Tagen).""" + if source["source_type"] == "excluded": + return None + + article_count = source.get("article_count") or 0 + last_seen = source.get("last_seen_at") + + if article_count == 0: + return { + "type": "stale", + "status": "warning", + "message": "Noch nie Artikel geliefert", + } + + if last_seen: + try: + from datetime import datetime + last_dt = datetime.fromisoformat(last_seen) + now = datetime.now() + age_days = (now - last_dt).days + if age_days > 30: + return { + "type": "stale", + "status": "warning", + "message": f"Letzter Artikel vor {age_days} Tagen", + } + except (ValueError, TypeError): + pass + + return None + + +def _find_duplicates(sources: list[dict]) -> list[dict]: + """Findet doppelte Quellen (gleiche URL).""" + duplicates = [] + url_map = {} + + for s in sources: + if not s["url"]: + continue + url_norm = s["url"].lower().rstrip("/") + if url_norm in url_map: + existing = url_map[url_norm] + duplicates.append({ + "source_id": s["id"], + "message": f"Doppelte URL wie '{existing['name']}' (ID {existing['id']})", + "details": {"duplicate_of": existing["id"], "type": "url"}, + }) + else: + url_map[url_norm] = s + + return duplicates + + +async def _save_check( + db: aiosqlite.Connection, source_id: int, check_type: str, + status: str, message: str, details: str = None, +): + """Speichert ein Health-Check-Ergebnis.""" + await db.execute( + "INSERT INTO source_health_checks " + "(source_id, check_type, status, message, details) " + "VALUES (?, ?, ?, ?, ?)", + (source_id, check_type, status, message, details), + ) + + +async def get_health_summary(db: aiosqlite.Connection) -> dict: + """Gibt eine Zusammenfassung der letzten Health-Check-Ergebnisse zurück.""" + cursor = await db.execute(""" + SELECT + h.id, h.source_id, s.name, s.domain, s.url, s.source_type, + h.check_type, h.status, h.message, h.details, h.checked_at + FROM source_health_checks h + JOIN sources s ON s.id = h.source_id + ORDER BY + CASE h.status WHEN 'error' THEN 0 WHEN 'warning' THEN 1 ELSE 2 END, + s.name + """) + checks = [dict(row) for row in await cursor.fetchall()] + + error_count = sum(1 for c in checks if c["status"] == "error") + warning_count = sum(1 for c in checks if c["status"] == "warning") + ok_count = sum(1 for c in checks if c["status"] == "ok") + + cursor = await db.execute( + "SELECT MAX(checked_at) as last_check FROM source_health_checks" + ) + row = await cursor.fetchone() + last_check = row["last_check"] if row else None + + return { + "last_check": last_check, + "total_checks": len(checks), + "errors": error_count, + "warnings": warning_count, + "ok": ok_count, + "checks": checks, + }