"""KI-gestützte Quellen-Vorschläge via Haiku + deterministische Karteileichen-Heuristik.""" import json import logging import re import aiosqlite from agents.claude_client import call_claude from config import CLAUDE_MODEL_FAST logger = logging.getLogger("osint.source_suggester") # Schwelle für "stumm seit": eine Quelle, die seit mehr als so vielen Tagen # keinen Artikel mehr geliefert hat, gilt als Karteileichen-Kandidat. STALE_DEACTIVATE_THRESHOLD_DAYS = 60 async def generate_stale_deactivation_suggestions( db: aiosqlite.Connection, days_threshold: int = STALE_DEACTIVATE_THRESHOLD_DAYS, ) -> int: """Erzeugt deactivate_source-Vorschläge für Karteileichen-Quellen. Karteileiche = aktive Quelle, die entweder noch nie einen Artikel geliefert hat (article_count = 0) oder seit mehr als days_threshold Tagen stumm ist (last_seen_at älter als die Schwelle). Reine SQL-Heuristik, kein KI-Aufruf. Doppel-Vermeidung: existiert bereits ein pending deactivate-Vorschlag für dieselbe source_id, wird kein neuer erzeugt. Returns: Anzahl neu erstellter Vorschläge. """ cursor = await db.execute( f""" SELECT id, name, url, domain, article_count, last_seen_at FROM sources WHERE status = 'active' AND ( COALESCE(article_count, 0) = 0 OR (last_seen_at IS NOT NULL AND last_seen_at < datetime('now', '-{int(days_threshold)} days')) ) """ ) candidates = [dict(row) for row in await cursor.fetchall()] if not candidates: return 0 cursor = await db.execute( "SELECT DISTINCT source_id FROM source_suggestions " "WHERE status = 'pending' AND suggestion_type = 'deactivate_source' " "AND source_id IS NOT NULL" ) already_pending = {row["source_id"] for row in await cursor.fetchall()} created = 0 for c in candidates: sid = c["id"] if sid in already_pending: continue if (c["article_count"] or 0) == 0: reason = "Hat seit Anlage noch nie einen Artikel geliefert." else: reason = ( f"Letzter Artikel vor mehr als {days_threshold} Tagen " f"(last_seen_at={c['last_seen_at']})." ) title = f"{c['name']} (ID {sid}) - Karteileiche, deaktivieren?" description = ( f"Quelle: {c['name']} | URL: {c['url']} | Domain: {c['domain'] or '-'}\n" f"Begründung: {reason}\n" f"article_count={c['article_count'] or 0}, " f"last_seen_at={c['last_seen_at'] or 'NULL'}\n" "Hinweis: Quelle wurde automatisch als inaktiv erkannt. " "Bitte vor Annahme prüfen, ob sie wirklich nicht mehr gebraucht wird." ) suggested_data = json.dumps( {"action": "deactivate", "source_id": sid}, ensure_ascii=False ) await db.execute( "INSERT INTO source_suggestions " "(suggestion_type, title, description, source_id, suggested_data, " " priority, status) VALUES " "('deactivate_source', ?, ?, ?, ?, 'medium', 'pending')", (title, description, sid, suggested_data), ) created += 1 if created > 0: await db.commit() logger.info( "Karteileichen-Heuristik: %d neue deactivate-Vorschläge erstellt " "(%d Kandidaten, %d bereits pending)", created, len(candidates), len(already_pending), ) else: logger.info( "Karteileichen-Heuristik: keine neuen Vorschläge " "(%d Kandidaten, alle bereits pending)", len(candidates), ) return created async def generate_strategy_escalation_suggestions(db: aiosqlite.Connection) -> int: """Erzeugt deactivate_source-Vorschläge für Quellen, bei denen die fetch_strategy bereits eskaliert wurde (googlebot oder paywall) und der Reachability-Check trotzdem error meldet. Beispiel: Rheinische Post hat fetch_strategy=googlebot, kriegt aber HTTP 403. -> Strategie greift nicht, Quelle ist faktisch nicht abrufbar. Vorschlag: deaktivieren. Doppel-Vermeidung wie in der Karteileichen-Heuristik: nur wenn noch kein pending deactivate-Vorschlag für die source_id existiert. Returns: Anzahl neu erstellter Vorschläge. """ cursor = await db.execute( """ SELECT s.id, s.name, s.url, s.domain, s.fetch_strategy, h.message FROM sources s JOIN source_health_checks h ON h.source_id = s.id WHERE s.status = 'active' AND s.fetch_strategy IN ('googlebot', 'paywall') AND h.check_type = 'reachability' AND h.status = 'error' """ ) candidates = [dict(row) for row in await cursor.fetchall()] if not candidates: return 0 cursor = await db.execute( "SELECT DISTINCT source_id FROM source_suggestions " "WHERE status = 'pending' AND suggestion_type = 'deactivate_source' " "AND source_id IS NOT NULL" ) already_pending = {row["source_id"] for row in await cursor.fetchall()} created = 0 for c in candidates: sid = c["id"] if sid in already_pending: continue title = f"{c['name']} (ID {sid}) - Strategie greift nicht" description = ( f"Quelle: {c['name']} | URL: {c['url']} | Domain: {c['domain'] or '-'}\n" f"fetch_strategy='{c['fetch_strategy']}' wurde bereits zur Eskalation gesetzt, " f"liefert beim Health-Check aber weiter einen Fehler:\n" f" {c['message']}\n" "Vorschlag: deaktivieren oder fetch_strategy='skip' setzen, damit die Quelle " "den Health-Check nicht weiter verfälscht.\n" "Hinweis: Quelle wurde automatisch erkannt. Bitte vor Annahme prüfen." ) suggested_data = json.dumps( {"action": "deactivate", "source_id": sid, "reason": "fetch_strategy_failed", "current_strategy": c["fetch_strategy"]}, ensure_ascii=False, ) await db.execute( "INSERT INTO source_suggestions " "(suggestion_type, title, description, source_id, suggested_data, " " priority, status) VALUES " "('deactivate_source', ?, ?, ?, ?, 'high', 'pending')", (title, description, sid, suggested_data), ) created += 1 if created > 0: await db.commit() logger.info( "Strategie-Eskalations-Heuristik: %d neue deactivate-Vorschläge " "(%d Kandidaten, %d bereits pending)", created, len(candidates), len(already_pending), ) return created async def generate_suggestions(db: aiosqlite.Connection) -> int: """Generiert Quellen-Vorschläge basierend auf Health-Checks und Lückenanalyse. Drei Stufen, in dieser Reihenfolge ausgeführt (spezifisch -> generisch -> KI): 1. Deterministisch: Strategie-Eskalations-Heuristik (fetch_strategy=googlebot oder paywall, aber Reachability weiter error) erzeugt deactivate_source- Vorschläge mit Priorität 'high'. Spezifischste Diagnose: "Workaround greift nicht". Läuft ZUERST, damit diese Sources nicht von der generischeren Karteileichen-Stufe weggefangen werden. 2. Deterministisch: Karteileichen-Heuristik (article_count=0 oder >60d stumm) erzeugt sofort deactivate_source-Vorschläge für alle übrigen toten Quellen ohne KI-Aufruf. 3. KI-basiert: Haiku schaut sich Quellensammlung + Health-Probleme an und schlägt weitere Verbesserungen vor (add_source, deactivate_source, fix_url, ...). Rückgabe ist die Gesamtzahl neu erzeugter Vorschläge aller Stufen. """ strategy_count = await generate_strategy_escalation_suggestions(db) stale_count = await generate_stale_deactivation_suggestions(db) logger.info("Starte Quellen-Vorschläge via Haiku...") # 1. Aktuelle Quellen laden cursor = await db.execute( "SELECT id, name, url, domain, source_type, category, status, " "article_count, last_seen_at " "FROM sources WHERE tenant_id IS NULL ORDER BY category, name" ) sources = [dict(row) for row in await cursor.fetchall()] # 2. Health-Check-Probleme laden cursor = await db.execute(""" SELECT h.source_id, s.name, s.domain, s.url, h.check_type, h.status, h.message FROM source_health_checks h JOIN sources s ON s.id = h.source_id WHERE h.status IN ('error', 'warning') """) issues = [dict(row) for row in await cursor.fetchall()] # 3. Alte pending-Vorschläge entfernen (älter als 30 Tage) await db.execute( "DELETE FROM source_suggestions " "WHERE status = 'pending' AND created_at < datetime('now', '-30 days')" ) # 4. Quellen-Zusammenfassung für Haiku categories = {} for s in sources: cat = s["category"] if cat not in categories: categories[cat] = [] categories[cat].append(s) source_summary = "" for cat, cat_sources in sorted(categories.items()): active = [ s for s in cat_sources if s["status"] == "active" and s["source_type"] != "excluded" ] source_summary += f"\n{cat} ({len(active)} aktiv): " source_summary += ", ".join(s["name"] for s in active[:10]) if len(active) > 10: source_summary += f" ... (+{len(active) - 10} weitere)" issues_summary = "" if issues: issues_summary = "\n\nProbleme gefunden:\n" for issue in issues[:20]: issues_summary += ( f"- [source_id={issue['source_id']}] {issue['name']} ({issue['domain']}): " f"{issue['check_type']} = {issue['status']} - {issue['message']}\n" ) prompt = f"""Du bist ein OSINT-Analyst und verwaltest die Quellensammlung eines Lagebildmonitors für Sicherheitsbehörden. Aktuelle Quellensammlung:{source_summary}{issues_summary} Aufgabe: Analysiere die Quellensammlung und schlage Verbesserungen vor. Beachte: 1. Bei Problemen (nicht erreichbar, leere Feeds): Schlage "deactivate_source" vor und setze "source_id" auf die ID aus [source_id=X] in der Problemliste 2. Fehlende wichtige OSINT-Quellen: Schlage "add_source" mit konkreter RSS-Feed-URL vor 3. Fokus auf deutschsprachige + wichtige internationale Nachrichtenquellen 4. Nur Quellen vorschlagen, die NICHT bereits vorhanden sind 5. Maximal 5 Vorschläge Antworte NUR mit einem JSON-Array. Jedes Element: {{ "type": "add_source|deactivate_source|fix_url|remove_source", "title": "Kurzer Titel", "description": "Begründung", "priority": "low|medium|high", "source_id": null, "data": {{ "name": "Anzeigename", "url": "https://...", "domain": "example.de", "category": "international|nachrichtenagentur|qualitaetszeitung|behoerde|fachmedien|think-tank|regional|sonstige" }} }} Nur das JSON-Array, kein anderer Text.""" try: response, usage = await call_claude( prompt, tools=None, model=CLAUDE_MODEL_FAST, ) json_match = re.search(r'\[.*\]', response, re.DOTALL) if not json_match: logger.warning("Keine Vorschläge von Haiku erhalten (kein JSON)") return 0 suggestions = json.loads(json_match.group(0)) count = 0 for suggestion in suggestions[:5]: stype = suggestion.get("type", "add_source") title = suggestion.get("title", "") desc = suggestion.get("description", "") priority = suggestion.get("priority", "medium") source_id = suggestion.get("source_id") data = json.dumps( suggestion.get("data", {}), ensure_ascii=False, ) # source_id validieren (muss existieren oder None sein) if source_id is not None: cursor = await db.execute( "SELECT id FROM sources WHERE id = ?", (source_id,), ) if not await cursor.fetchone(): source_id = None # Duplikat-Check: gleicher Typ + gleiche source_id oder gleiche Domain pending? if source_id is not None: cursor = await db.execute( "SELECT id FROM source_suggestions " "WHERE suggestion_type = ? AND source_id = ? AND status = 'pending'", (stype, source_id), ) else: # Bei add_source ohne source_id: Domain aus suggested_data prüfen check_domain = suggestion.get('data', {}).get('domain', '') if check_domain: cursor = await db.execute( "SELECT id FROM source_suggestions " "WHERE suggestion_type = ? AND suggested_data LIKE ? AND status = 'pending'", (stype, f'%{check_domain}%'), ) else: cursor = await db.execute( "SELECT id FROM source_suggestions " "WHERE title = ? AND status = 'pending'", (title,), ) if await cursor.fetchone(): continue await db.execute( "INSERT INTO source_suggestions " "(suggestion_type, title, description, source_id, " "suggested_data, priority, status) " "VALUES (?, ?, ?, ?, ?, ?, 'pending')", (stype, title, desc, source_id, data, priority), ) count += 1 await db.commit() logger.info( f"Quellen-Vorschläge: {count} neue Vorschläge generiert via Haiku " f"(+{stale_count} Karteileichen, +{strategy_count} Strategie-Eskalation) " f"(Haiku: {usage.input_tokens} in / {usage.output_tokens} out / " f"${usage.cost_usd:.4f})" ) return count + stale_count + strategy_count except Exception as e: logger.error(f"Fehler bei Quellen-Vorschlägen: {e}", exc_info=True) return stale_count + strategy_count async def apply_suggestion( db: aiosqlite.Connection, suggestion_id: int, accept: bool, ) -> dict: """Wendet einen Vorschlag an oder lehnt ihn ab.""" cursor = await db.execute( "SELECT * FROM source_suggestions WHERE id = ?", (suggestion_id,), ) suggestion = await cursor.fetchone() if not suggestion: raise ValueError("Vorschlag nicht gefunden") suggestion = dict(suggestion) if suggestion["status"] != "pending": raise ValueError(f"Vorschlag bereits {suggestion['status']}") new_status = "accepted" if accept else "rejected" result = {"status": new_status, "action": None} if accept: stype = suggestion["suggestion_type"] data = ( json.loads(suggestion["suggested_data"]) if suggestion["suggested_data"] else {} ) if stype == "add_source": name = data.get("name", "Unbenannt") url = data.get("url") domain = data.get("domain", "") category = data.get("category", "sonstige") source_type = "rss_feed" if url and any( x in (url or "").lower() for x in ("rss", "feed", "xml", "atom") ) else "web_source" if url: cursor = await db.execute( "SELECT id FROM sources WHERE url = ? AND tenant_id IS NULL", (url,), ) if await cursor.fetchone(): result["action"] = "übersprungen (URL bereits vorhanden)" new_status = "rejected" else: await db.execute( "INSERT INTO sources " "(name, url, domain, source_type, category, status, " "added_by, tenant_id) " "VALUES (?, ?, ?, ?, ?, 'active', 'haiku-vorschlag', NULL)", (name, url, domain, source_type, category), ) result["action"] = f"Quelle '{name}' angelegt" else: result["action"] = "übersprungen (keine URL)" new_status = "rejected" elif stype == "deactivate_source": source_id = suggestion["source_id"] if source_id: await db.execute( "UPDATE sources SET status = 'inactive' WHERE id = ?", (source_id,), ) result["action"] = "Quelle deaktiviert" else: result["action"] = "übersprungen (keine source_id)" elif stype == "remove_source": source_id = suggestion["source_id"] if source_id: await db.execute( "DELETE FROM sources WHERE id = ?", (source_id,), ) result["action"] = "Quelle gelöscht" else: result["action"] = "übersprungen (keine source_id)" elif stype == "fix_url": source_id = suggestion["source_id"] new_url = data.get("url") if source_id and new_url: await db.execute( "UPDATE sources SET url = ? WHERE id = ?", (new_url, source_id), ) result["action"] = f"URL aktualisiert auf {new_url}" else: result["action"] = "übersprungen (keine source_id oder URL)" await db.execute( "UPDATE source_suggestions SET status = ?, reviewed_at = CURRENT_TIMESTAMP " "WHERE id = ?", (new_status, suggestion_id), ) await db.commit() result["status"] = new_status return result