From 40f2954811515e47da0e0ea65218badee5adc2f1 Mon Sep 17 00:00:00 2001 From: claude-dev Date: Sun, 8 Mar 2026 15:26:24 +0100 Subject: [PATCH] =?UTF-8?q?T=C3=A4glicher=20Quellen-Health-Check=20+=20Hai?= =?UTF-8?q?ku-Vorschl=C3=A4ge?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Neue Tabellen: source_health_checks, source_suggestions - source_health.py: Prüft Erreichbarkeit, Feed-Validität, Aktualität, Duplikate - source_suggester.py: KI-gestützte Vorschläge via Claude Haiku - APScheduler Job: Automatischer Check täglich um 04:00 Co-Authored-By: Claude Opus 4.6 --- src/database.py | 24 +++ src/main.py | 18 ++ src/services/source_health.py | 282 +++++++++++++++++++++++++++++++ src/services/source_suggester.py | 261 ++++++++++++++++++++++++++++ 4 files changed, 585 insertions(+) create mode 100644 src/services/source_health.py create mode 100644 src/services/source_suggester.py diff --git a/src/database.py b/src/database.py index 00e2935..0b62b6e 100644 --- a/src/database.py +++ b/src/database.py @@ -184,6 +184,30 @@ CREATE TABLE IF NOT EXISTS article_locations ( CREATE INDEX IF NOT EXISTS idx_article_locations_incident ON article_locations(incident_id); CREATE INDEX IF NOT EXISTS idx_article_locations_article ON article_locations(article_id); + +CREATE TABLE IF NOT EXISTS source_health_checks ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + source_id INTEGER NOT NULL REFERENCES sources(id) ON DELETE CASCADE, + check_type TEXT NOT NULL, + status TEXT NOT NULL, + message TEXT, + details TEXT, + checked_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP +); +CREATE INDEX IF NOT EXISTS idx_source_health_source ON source_health_checks(source_id); + +CREATE TABLE IF NOT EXISTS source_suggestions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + suggestion_type TEXT NOT NULL, + title TEXT NOT NULL, + description TEXT, + source_id INTEGER REFERENCES sources(id) ON DELETE SET NULL, + suggested_data TEXT, + priority TEXT DEFAULT 'medium', + status TEXT 
DEFAULT 'pending', + reviewed_at TIMESTAMP, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP +); CREATE TABLE IF NOT EXISTS user_excluded_domains ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE, diff --git a/src/main.py b/src/main.py index a43dfad..275e733 100644 --- a/src/main.py +++ b/src/main.py @@ -19,6 +19,8 @@ from config import STATIC_DIR, LOG_DIR, DATA_DIR, TIMEZONE from database import init_db, get_db from auth import decode_token from agents.orchestrator import orchestrator +from services.source_health import run_health_checks, get_health_summary +from services.source_suggester import generate_suggestions # Logging os.makedirs(LOG_DIR, exist_ok=True) @@ -140,6 +142,21 @@ async def check_auto_refresh(): await db.close() + +async def daily_source_health_check(): + """Täglicher Quellen-Health-Check + KI-Vorschläge.""" + db = await get_db() + try: + result = await run_health_checks(db) + logger.info(f"Täglicher Health-Check: {result['checked']} geprüft, {result['issues']} Probleme") + + suggestion_count = await generate_suggestions(db) + logger.info(f"Tägliche Vorschläge: {suggestion_count} neue Vorschläge") + except Exception as e: + logger.error(f"Täglicher Health-Check Fehler: {e}", exc_info=True) + finally: + await db.close() + async def cleanup_expired(): """Bereinigt abgelaufene Lagen basierend auf retention_days.""" db = await get_db() @@ -218,6 +235,7 @@ async def lifespan(app: FastAPI): scheduler.add_job(check_auto_refresh, "interval", minutes=1, id="auto_refresh") scheduler.add_job(cleanup_expired, "interval", hours=1, id="cleanup") + scheduler.add_job(daily_source_health_check, "cron", hour=4, minute=0, id="source_health") scheduler.start() logger.info("OSINT Lagemonitor gestartet") diff --git a/src/services/source_health.py b/src/services/source_health.py new file mode 100644 index 0000000..1a6bcf9 --- /dev/null +++ b/src/services/source_health.py @@ -0,0 +1,282 @@ +"""Quellen-Health-Check 
"""Source health-check engine.

Verifies reachability, feed validity, staleness and duplicate URLs for the
configured base sources and persists the results in the
``source_health_checks`` table (one snapshot per run).
"""
import asyncio
import json
import logging
from datetime import datetime
from urllib.parse import urlparse

import httpx
import feedparser
import aiosqlite

logger = logging.getLogger("osint.source_health")

# How many sources are probed concurrently per batch.
_BATCH_SIZE = 5
# A source counts as stale when its newest article is older than this (days).
_STALE_DAYS = 30


async def run_health_checks(db: aiosqlite.Connection) -> dict:
    """Run every health check for all active base sources.

    The ``source_health_checks`` table only keeps the *latest* run: previous
    rows are deleted before new results are written.

    Args:
        db: open aiosqlite connection.

    Returns:
        ``{"checked": <sources probed>, "issues": <non-ok results>}``.
    """
    logger.info("Starte Quellen-Health-Check...")

    # Load all active base sources (tenant_id IS NULL => global sources).
    cursor = await db.execute(
        "SELECT id, name, url, domain, source_type, article_count, last_seen_at "
        "FROM sources WHERE status = 'active' AND tenant_id IS NULL"
    )
    # NOTE(review): assumes the connection's row_factory yields mapping rows
    # (aiosqlite.Row) -- confirm in get_db().
    sources = [dict(row) for row in await cursor.fetchall()]

    # Snapshot semantics: clear the previous run's results.
    await db.execute("DELETE FROM source_health_checks")
    await db.commit()

    checks_done = 0
    issues_found = 0

    # 1. Reachability + feed validity (only sources that actually have a URL).
    sources_with_url = [s for s in sources if s["url"]]

    async with httpx.AsyncClient(
        timeout=15.0,
        follow_redirects=True,
        headers={"User-Agent": "Mozilla/5.0 (compatible; OSINT-Monitor/1.0)"},
    ) as client:
        for i in range(0, len(sources_with_url), _BATCH_SIZE):
            batch = sources_with_url[i:i + _BATCH_SIZE]
            tasks = [_check_source_reachability(client, s) for s in batch]
            results = await asyncio.gather(*tasks, return_exceptions=True)

            for source, result in zip(batch, results):
                if isinstance(result, Exception):
                    # gather() swallowed the exception; record it as a check.
                    await _save_check(
                        db, source["id"], "reachability", "error",
                        f"Prüfung fehlgeschlagen: {result}",
                    )
                    issues_found += 1
                else:
                    for check in result:
                        await _save_check(
                            db, source["id"], check["type"], check["status"],
                            check["message"], check.get("details"),
                        )
                        if check["status"] != "ok":
                            issues_found += 1
                checks_done += 1

    # 2. Stale sources (no article for more than _STALE_DAYS days).
    for source in sources:
        if source["source_type"] == "excluded":
            continue
        stale_check = _check_stale(source)
        if stale_check:
            await _save_check(
                db, source["id"], stale_check["type"],
                stale_check["status"], stale_check["message"],
            )
            if stale_check["status"] != "ok":
                issues_found += 1

    # 3. Duplicate detection (same normalized URL).
    duplicates = _find_duplicates(sources)
    for dup in duplicates:
        await _save_check(
            db, dup["source_id"], "duplicate", "warning",
            dup["message"], json.dumps(dup.get("details", {})),
        )
        issues_found += 1

    await db.commit()
    logger.info(
        f"Health-Check abgeschlossen: {checks_done} Quellen geprüft, "
        f"{issues_found} Probleme gefunden"
    )
    return {"checked": checks_done, "issues": issues_found}


async def _check_source_reachability(
    client: httpx.AsyncClient, source: dict,
) -> list[dict]:
    """Probe one source: HTTP reachability, plus feed validity for RSS feeds.

    Returns a list of check dicts with keys ``type``, ``status``, ``message``
    and optionally ``details`` (JSON string).
    """
    checks = []
    url = source["url"]

    try:
        resp = await client.get(url)

        if resp.status_code >= 400:
            checks.append({
                "type": "reachability",
                "status": "error",
                "message": f"HTTP {resp.status_code} - nicht erreichbar",
                "details": json.dumps({"status_code": resp.status_code, "url": url}),
            })
            return checks  # no point probing the feed of an unreachable source

        if resp.status_code >= 300:
            # With follow_redirects=True a 3xx here means redirects were not
            # fully resolved; surface it as a warning with the final URL.
            checks.append({
                "type": "reachability",
                "status": "warning",
                "message": f"HTTP {resp.status_code} - Weiterleitung",
                "details": json.dumps({
                    "status_code": resp.status_code,
                    "final_url": str(resp.url),
                }),
            })
        else:
            checks.append({
                "type": "reachability",
                "status": "ok",
                "message": "Erreichbar",
            })

        # Feed validity is only meaningful for RSS feed sources.
        if source["source_type"] == "rss_feed":
            text = resp.text[:20000]
            # NOTE(review): this section was corrupted in the patch (content
            # between angle brackets was stripped); reconstructed to the most
            # plausible original -- verify statuses/messages against the UI.
            if "<rss" in text or "<feed" in text or "<rdf" in text:
                parsed = feedparser.parse(resp.text)
                if parsed.entries:
                    checks.append({
                        "type": "feed_validity",
                        "status": "ok",
                        "message": f"Gültiger Feed ({len(parsed.entries)} Einträge)",
                    })
                else:
                    checks.append({
                        "type": "feed_validity",
                        "status": "warning",
                        "message": "Feed ohne Einträge",
                    })
            else:
                checks.append({
                    "type": "feed_validity",
                    "status": "error",
                    "message": "Kein gültiger RSS/Atom-Feed",
                })

    except httpx.TimeoutException:
        checks.append({
            "type": "reachability",
            "status": "error",
            "message": "Timeout nach 15s",
        })
    except httpx.HTTPError as e:
        checks.append({
            "type": "reachability",
            "status": "error",
            "message": f"Verbindungsfehler: {e}",
        })

    return checks


def _check_stale(source: dict) -> dict | None:
    """Return a stale-warning check for *source*, or ``None`` if it is fresh.

    A source is stale when it never delivered an article, or when its last
    article is older than ``_STALE_DAYS`` days.
    """
    if source["source_type"] == "excluded":
        return None  # excluded sources are never expected to deliver articles

    article_count = source.get("article_count") or 0
    last_seen = source.get("last_seen_at")

    if article_count == 0:
        return {
            "type": "stale",
            "status": "warning",
            "message": "Noch nie Artikel geliefert",
        }

    if last_seen:
        try:
            last_dt = datetime.fromisoformat(last_seen)
            # NOTE(review): naive local "now" is compared against the stored
            # timestamp -- confirm both sides use the same timezone convention.
            age_days = (datetime.now() - last_dt).days
            if age_days > _STALE_DAYS:
                return {
                    "type": "stale",
                    "status": "warning",
                    "message": f"Letzter Artikel vor {age_days} Tagen",
                }
        except (ValueError, TypeError):
            pass  # unparsable timestamp: treat as fresh rather than crash

    return None


def _find_duplicates(sources: list[dict]) -> list[dict]:
    """Find sources sharing the same URL (case-insensitive, trailing-slash-insensitive).

    The first source seen for a URL is the "original"; every later one is
    reported as a duplicate of it.
    """
    duplicates = []
    url_map = {}

    for s in sources:
        if not s["url"]:
            continue
        url_norm = s["url"].lower().rstrip("/")
        if url_norm in url_map:
            existing = url_map[url_norm]
            duplicates.append({
                "source_id": s["id"],
                "message": f"Doppelte URL wie '{existing['name']}' (ID {existing['id']})",
                "details": {"duplicate_of": existing["id"], "type": "url"},
            })
        else:
            url_map[url_norm] = s

    return duplicates


async def _save_check(
    db: aiosqlite.Connection, source_id: int, check_type: str,
    status: str, message: str, details: str | None = None,
):
    """Insert one health-check result row (the caller is responsible for commit)."""
    await db.execute(
        "INSERT INTO source_health_checks "
        "(source_id, check_type, status, message, details) "
        "VALUES (?, ?, ?, ?, ?)",
        (source_id, check_type, status, message, details),
    )


async def get_health_summary(db: aiosqlite.Connection) -> dict:
    """Return a summary of the most recent health-check run.

    Checks are ordered errors first, then warnings, then ok, each group
    sorted by source name.
    """
    cursor = await db.execute("""
        SELECT
            h.id, h.source_id, s.name, s.domain, s.url, s.source_type,
            h.check_type, h.status, h.message, h.details, h.checked_at
        FROM source_health_checks h
        JOIN sources s ON s.id = h.source_id
        ORDER BY
            CASE h.status WHEN 'error' THEN 0 WHEN 'warning' THEN 1 ELSE 2 END,
            s.name
    """)
    checks = [dict(row) for row in await cursor.fetchall()]

    error_count = sum(1 for c in checks if c["status"] == "error")
    warning_count = sum(1 for c in checks if c["status"] == "warning")
    ok_count = sum(1 for c in checks if c["status"] == "ok")

    cursor = await db.execute(
        "SELECT MAX(checked_at) as last_check FROM source_health_checks"
    )
    row = await cursor.fetchone()
    last_check = row["last_check"] if row else None

    return {
        "last_check": last_check,
        "total_checks": len(checks),
        "errors": error_count,
        "warnings": warning_count,
        "ok": ok_count,
        "checks": checks,
    }
"""AI-assisted source suggestions via Claude Haiku."""
import json
import logging
import re

import aiosqlite

from agents.claude_client import call_claude
from config import CLAUDE_MODEL_FAST

logger = logging.getLogger("osint.source_suggester")

# Closed enums expected by the DB schema / review UI; model output outside
# these sets is coerced to a safe default instead of being stored verbatim.
_VALID_TYPES = {"add_source", "deactivate_source", "fix_url", "remove_source"}
_VALID_PRIORITIES = {"low", "medium", "high"}


async def generate_suggestions(db: aiosqlite.Connection) -> int:
    """Generate source suggestions from health checks and a gap analysis.

    Builds a compact summary of the current source collection plus any
    error/warning health-check results, asks Haiku for up to five
    improvement proposals, and stores them as ``pending`` rows in
    ``source_suggestions``.

    Returns:
        Number of newly inserted suggestions (0 on any failure).
    """
    logger.info("Starte Quellen-Vorschläge via Haiku...")

    # 1. Load the current source collection.
    cursor = await db.execute(
        "SELECT id, name, url, domain, source_type, category, status, "
        "article_count, last_seen_at "
        "FROM sources WHERE tenant_id IS NULL ORDER BY category, name"
    )
    sources = [dict(row) for row in await cursor.fetchall()]

    # 2. Load current health-check problems (errors and warnings only).
    cursor = await db.execute("""
        SELECT h.source_id, s.name, s.domain, s.url,
               h.check_type, h.status, h.message
        FROM source_health_checks h
        JOIN sources s ON s.id = h.source_id
        WHERE h.status IN ('error', 'warning')
    """)
    issues = [dict(row) for row in await cursor.fetchall()]

    # 3. Drop stale pending suggestions (older than 30 days).
    await db.execute(
        "DELETE FROM source_suggestions "
        "WHERE status = 'pending' AND created_at < datetime('now', '-30 days')"
    )

    # 4. Build a per-category summary of active sources for the prompt.
    # NOTE(review): sorted() will raise if category can be NULL -- confirm
    # the column is NOT NULL or always populated.
    categories = {}
    for s in sources:
        categories.setdefault(s["category"], []).append(s)

    source_summary = ""
    for cat, cat_sources in sorted(categories.items()):
        active = [
            s for s in cat_sources
            if s["status"] == "active" and s["source_type"] != "excluded"
        ]
        source_summary += f"\n{cat} ({len(active)} aktiv): "
        source_summary += ", ".join(s["name"] for s in active[:10])
        if len(active) > 10:
            source_summary += f" ... (+{len(active) - 10} weitere)"

    issues_summary = ""
    if issues:
        issues_summary = "\n\nProbleme gefunden:\n"
        for issue in issues[:20]:  # cap to keep the prompt small
            issues_summary += (
                f"- {issue['name']} ({issue['domain']}): "
                f"{issue['check_type']} = {issue['status']} - {issue['message']}\n"
            )

    prompt = f"""Du bist ein OSINT-Analyst und verwaltest die Quellensammlung eines Lagebildmonitors für Sicherheitsbehörden.

Aktuelle Quellensammlung:{source_summary}{issues_summary}

Aufgabe: Analysiere die Quellensammlung und schlage Verbesserungen vor.

Beachte:
1. Bei Problemen (nicht erreichbar, leere Feeds): Schlage "deactivate_source" vor mit der source_id
2. Fehlende wichtige OSINT-Quellen: Schlage "add_source" mit konkreter RSS-Feed-URL vor
3. Fokus auf deutschsprachige + wichtige internationale Nachrichtenquellen
4. Nur Quellen vorschlagen, die NICHT bereits vorhanden sind
5. Maximal 5 Vorschläge

Antworte NUR mit einem JSON-Array. Jedes Element:
{{
  "type": "add_source|deactivate_source|fix_url|remove_source",
  "title": "Kurzer Titel",
  "description": "Begründung",
  "priority": "low|medium|high",
  "source_id": null,
  "data": {{
    "name": "Anzeigename",
    "url": "https://...",
    "domain": "example.de",
    "category": "international|nachrichtenagentur|qualitaetszeitung|behoerde|fachmedien|think-tank|regional|sonstige"
  }}
}}

Nur das JSON-Array, kein anderer Text."""

    try:
        response, usage = await call_claude(
            prompt, tools=None, model=CLAUDE_MODEL_FAST,
        )

        # The model is instructed to answer with a bare JSON array; grab the
        # outermost bracketed span to tolerate stray surrounding text.
        json_match = re.search(r'\[.*\]', response, re.DOTALL)
        if not json_match:
            logger.warning("Keine Vorschläge von Haiku erhalten (kein JSON)")
            return 0

        suggestions = json.loads(json_match.group(0))

        count = 0
        for suggestion in suggestions[:5]:
            stype = suggestion.get("type", "add_source")
            if stype not in _VALID_TYPES:
                stype = "add_source"  # coerce unknown model output
            title = suggestion.get("title", "")
            desc = suggestion.get("description", "")
            priority = suggestion.get("priority", "medium")
            if priority not in _VALID_PRIORITIES:
                priority = "medium"  # coerce unknown model output
            source_id = suggestion.get("source_id")
            data = json.dumps(
                suggestion.get("data", {}), ensure_ascii=False,
            )

            # Validate source_id: it must reference an existing source or be NULL.
            if source_id is not None:
                cursor = await db.execute(
                    "SELECT id FROM sources WHERE id = ?", (source_id,),
                )
                if not await cursor.fetchone():
                    source_id = None

            # Skip if an identically-titled suggestion is already pending.
            cursor = await db.execute(
                "SELECT id FROM source_suggestions "
                "WHERE title = ? AND status = 'pending'",
                (title,),
            )
            if await cursor.fetchone():
                continue

            await db.execute(
                "INSERT INTO source_suggestions "
                "(suggestion_type, title, description, source_id, "
                "suggested_data, priority, status) "
                "VALUES (?, ?, ?, ?, ?, ?, 'pending')",
                (stype, title, desc, source_id, data, priority),
            )
            count += 1

        await db.commit()
        logger.info(
            f"Quellen-Vorschläge: {count} neue Vorschläge generiert "
            f"(Haiku: {usage.input_tokens} in / {usage.output_tokens} out / "
            f"${usage.cost_usd:.4f})"
        )
        return count

    except Exception as e:
        # Best-effort job: never let a model/parsing failure crash the scheduler.
        logger.error(f"Fehler bei Quellen-Vorschlägen: {e}", exc_info=True)
        return 0


async def apply_suggestion(
    db: aiosqlite.Connection, suggestion_id: int, accept: bool,
) -> dict:
    """Accept or reject a pending suggestion and apply its side effect.

    Args:
        db: open aiosqlite connection.
        suggestion_id: primary key in ``source_suggestions``.
        accept: True to apply the suggested change, False to reject it.

    Returns:
        ``{"status": "accepted"|"rejected", "action": <human-readable result>}``.

    Raises:
        ValueError: if the suggestion does not exist or is not pending.
    """
    cursor = await db.execute(
        "SELECT * FROM source_suggestions WHERE id = ?", (suggestion_id,),
    )
    suggestion = await cursor.fetchone()
    if not suggestion:
        raise ValueError("Vorschlag nicht gefunden")

    suggestion = dict(suggestion)

    if suggestion["status"] != "pending":
        raise ValueError(f"Vorschlag bereits {suggestion['status']}")

    new_status = "accepted" if accept else "rejected"
    result = {"status": new_status, "action": None}

    if accept:
        stype = suggestion["suggestion_type"]
        data = (
            json.loads(suggestion["suggested_data"])
            if suggestion["suggested_data"]
            else {}
        )

        if stype == "add_source":
            name = data.get("name", "Unbenannt")
            url = data.get("url")
            domain = data.get("domain", "")
            category = data.get("category", "sonstige")
            # Heuristic: URLs that look like feeds become rss_feed sources.
            source_type = "rss_feed" if url and any(
                x in (url or "").lower()
                for x in ("rss", "feed", "xml", "atom")
            ) else "web_source"

            if url:
                cursor = await db.execute(
                    "SELECT id FROM sources WHERE url = ? AND tenant_id IS NULL",
                    (url,),
                )
                if await cursor.fetchone():
                    # Accepting a duplicate would violate uniqueness intent;
                    # downgrade to rejected instead.
                    result["action"] = "übersprungen (URL bereits vorhanden)"
                    new_status = "rejected"
                else:
                    await db.execute(
                        "INSERT INTO sources "
                        "(name, url, domain, source_type, category, status, "
                        "added_by, tenant_id) "
                        "VALUES (?, ?, ?, ?, ?, 'active', 'haiku-vorschlag', NULL)",
                        (name, url, domain, source_type, category),
                    )
                    result["action"] = f"Quelle '{name}' angelegt"
            else:
                result["action"] = "übersprungen (keine URL)"
                new_status = "rejected"

        elif stype == "deactivate_source":
            source_id = suggestion["source_id"]
            if source_id:
                await db.execute(
                    "UPDATE sources SET status = 'inactive' WHERE id = ?",
                    (source_id,),
                )
                result["action"] = "Quelle deaktiviert"
            else:
                result["action"] = "übersprungen (keine source_id)"

        elif stype == "remove_source":
            source_id = suggestion["source_id"]
            if source_id:
                await db.execute(
                    "DELETE FROM sources WHERE id = ?", (source_id,),
                )
                result["action"] = "Quelle gelöscht"
            else:
                result["action"] = "übersprungen (keine source_id)"

        elif stype == "fix_url":
            source_id = suggestion["source_id"]
            new_url = data.get("url")
            if source_id and new_url:
                await db.execute(
                    "UPDATE sources SET url = ? WHERE id = ?",
                    (new_url, source_id),
                )
                result["action"] = f"URL aktualisiert auf {new_url}"
            else:
                result["action"] = "übersprungen (keine source_id oder URL)"

    await db.execute(
        "UPDATE source_suggestions SET status = ?, reviewed_at = CURRENT_TIMESTAMP "
        "WHERE id = ?",
        (new_status, suggestion_id),
    )
    await db.commit()

    # new_status may have been downgraded above (e.g. duplicate URL).
    result["status"] = new_status
    return result