diff --git a/src/services/source_suggester.py b/src/services/source_suggester.py index 683fc95..09061e5 100644 --- a/src/services/source_suggester.py +++ b/src/services/source_suggester.py @@ -1,261 +1,261 @@ -"""KI-gestützte Quellen-Vorschläge via Haiku.""" -import json -import logging -import re - -import aiosqlite - -from agents.claude_client import call_claude -from config import CLAUDE_MODEL_FAST - -logger = logging.getLogger("osint.source_suggester") - - -async def generate_suggestions(db: aiosqlite.Connection) -> int: - """Generiert Quellen-Vorschläge basierend auf Health-Checks und Lückenanalyse.""" - logger.info("Starte Quellen-Vorschläge via Haiku...") - - # 1. Aktuelle Quellen laden - cursor = await db.execute( - "SELECT id, name, url, domain, source_type, category, status, " - "article_count, last_seen_at " - "FROM sources WHERE tenant_id IS NULL ORDER BY category, name" - ) - sources = [dict(row) for row in await cursor.fetchall()] - - # 2. Health-Check-Probleme laden - cursor = await db.execute(""" - SELECT h.source_id, s.name, s.domain, s.url, - h.check_type, h.status, h.message - FROM source_health_checks h - JOIN sources s ON s.id = h.source_id - WHERE h.status IN ('error', 'warning') - """) - issues = [dict(row) for row in await cursor.fetchall()] - - # 3. Alte pending-Vorschläge entfernen (älter als 30 Tage) - await db.execute( - "DELETE FROM source_suggestions " - "WHERE status = 'pending' AND created_at < datetime('now', '-30 days')" - ) - - # 4. Quellen-Zusammenfassung für Haiku - categories = {} - for s in sources: - cat = s["category"] - if cat not in categories: - categories[cat] = [] - categories[cat].append(s) - - source_summary = "" - for cat, cat_sources in sorted(categories.items()): - active = [ - s for s in cat_sources - if s["status"] == "active" and s["source_type"] != "excluded" - ] - source_summary += f"\n{cat} ({len(active)} aktiv): " - source_summary += ", ".join(s["name"] for s in active[:10]) - if len(active) > 10: - source_summary += f" ... (+{len(active) - 10} weitere)" - - issues_summary = "" - if issues: - issues_summary = "\n\nProbleme gefunden:\n" - for issue in issues[:20]: - issues_summary += ( - f"- {issue['name']} ({issue['domain']}): " - f"{issue['check_type']} = {issue['status']} - {issue['message']}\n" - ) - - prompt = f"""Du bist ein OSINT-Analyst und verwaltest die Quellensammlung eines Lagebildmonitors für Sicherheitsbehörden. - -Aktuelle Quellensammlung:{source_summary}{issues_summary} - -Aufgabe: Analysiere die Quellensammlung und schlage Verbesserungen vor. - -Beachte: -1. Bei Problemen (nicht erreichbar, leere Feeds): Schlage "deactivate_source" vor mit der source_id -2. Fehlende wichtige OSINT-Quellen: Schlage "add_source" mit konkreter RSS-Feed-URL vor -3. Fokus auf deutschsprachige + wichtige internationale Nachrichtenquellen -4. Nur Quellen vorschlagen, die NICHT bereits vorhanden sind -5. Maximal 5 Vorschläge - -Antworte NUR mit einem JSON-Array. Jedes Element: -{{ - "type": "add_source|deactivate_source|fix_url|remove_source", - "title": "Kurzer Titel", - "description": "Begründung", - "priority": "low|medium|high", - "source_id": null, - "data": {{ - "name": "Anzeigename", - "url": "https://...", - "domain": "example.de", - "category": "international|nachrichtenagentur|qualitaetszeitung|behoerde|fachmedien|think-tank|regional|sonstige" - }} -}} - -Nur das JSON-Array, kein anderer Text.""" - - try: - response, usage = await call_claude( - prompt, tools=None, model=CLAUDE_MODEL_FAST, - ) - - json_match = re.search(r'\[.*\]', response, re.DOTALL) - if not json_match: - logger.warning("Keine Vorschläge von Haiku erhalten (kein JSON)") - return 0 - - suggestions = json.loads(json_match.group(0)) - - count = 0 - for suggestion in suggestions[:5]: - stype = suggestion.get("type", "add_source") - title = suggestion.get("title", "") - desc = suggestion.get("description", "") - priority = suggestion.get("priority", "medium") - source_id = suggestion.get("source_id") - data = json.dumps( - suggestion.get("data", {}), ensure_ascii=False, - ) - - # source_id validieren (muss existieren oder None sein) - if source_id is not None: - cursor = await db.execute( - "SELECT id FROM sources WHERE id = ?", (source_id,), - ) - if not await cursor.fetchone(): - source_id = None - - # Duplikat-Check - cursor = await db.execute( - "SELECT id FROM source_suggestions " - "WHERE title = ? AND status = 'pending'", - (title,), - ) - if await cursor.fetchone(): - continue - - await db.execute( - "INSERT INTO source_suggestions " - "(suggestion_type, title, description, source_id, " - "suggested_data, priority, status) " - "VALUES (?, ?, ?, ?, ?, ?, 'pending')", - (stype, title, desc, source_id, data, priority), - ) - count += 1 - - await db.commit() - logger.info( - f"Quellen-Vorschläge: {count} neue Vorschläge generiert " - f"(Haiku: {usage.input_tokens} in / {usage.output_tokens} out / " - f"${usage.cost_usd:.4f})" - ) - return count - - except Exception as e: - logger.error(f"Fehler bei Quellen-Vorschlägen: {e}", exc_info=True) - return 0 - - -async def apply_suggestion( - db: aiosqlite.Connection, suggestion_id: int, accept: bool, -) -> dict: - """Wendet einen Vorschlag an oder lehnt ihn ab.""" - cursor = await db.execute( - "SELECT * FROM source_suggestions WHERE id = ?", (suggestion_id,), - ) - suggestion = await cursor.fetchone() - if not suggestion: - raise ValueError("Vorschlag nicht gefunden") - - suggestion = dict(suggestion) - - if suggestion["status"] != "pending": - raise ValueError(f"Vorschlag bereits {suggestion['status']}") - - new_status = "accepted" if accept else "rejected" - result = {"status": new_status, "action": None} - - if accept: - stype = suggestion["suggestion_type"] - data = ( - json.loads(suggestion["suggested_data"]) - if suggestion["suggested_data"] - else {} - ) - - if stype == "add_source": - name = data.get("name", "Unbenannt") - url = data.get("url") - domain = data.get("domain", "") - category = data.get("category", "sonstige") - source_type = "rss_feed" if url and any( - x in (url or "").lower() - for x in ("rss", "feed", "xml", "atom") - ) else "web_source" - - if url: - cursor = await db.execute( - "SELECT id FROM sources WHERE url = ? AND tenant_id IS NULL", - (url,), - ) - if await cursor.fetchone(): - result["action"] = "übersprungen (URL bereits vorhanden)" - new_status = "rejected" - else: - await db.execute( - "INSERT INTO sources " - "(name, url, domain, source_type, category, status, " - "added_by, tenant_id) " - "VALUES (?, ?, ?, ?, ?, 'active', 'haiku-vorschlag', NULL)", - (name, url, domain, source_type, category), - ) - result["action"] = f"Quelle '{name}' angelegt" - else: - result["action"] = "übersprungen (keine URL)" - new_status = "rejected" - - elif stype == "deactivate_source": - source_id = suggestion["source_id"] - if source_id: - await db.execute( - "UPDATE sources SET status = 'inactive' WHERE id = ?", - (source_id,), - ) - result["action"] = "Quelle deaktiviert" - else: - result["action"] = "übersprungen (keine source_id)" - - elif stype == "remove_source": - source_id = suggestion["source_id"] - if source_id: - await db.execute( - "DELETE FROM sources WHERE id = ?", (source_id,), - ) - result["action"] = "Quelle gelöscht" - else: - result["action"] = "übersprungen (keine source_id)" - - elif stype == "fix_url": - source_id = suggestion["source_id"] - new_url = data.get("url") - if source_id and new_url: - await db.execute( - "UPDATE sources SET url = ? WHERE id = ?", - (new_url, source_id), - ) - result["action"] = f"URL aktualisiert auf {new_url}" - else: - result["action"] = "übersprungen (keine source_id oder URL)" - - await db.execute( - "UPDATE source_suggestions SET status = ?, reviewed_at = CURRENT_TIMESTAMP " - "WHERE id = ?", - (new_status, suggestion_id), - ) - await db.commit() - - result["status"] = new_status - return result +"""KI-gestützte Quellen-Vorschläge via Haiku.""" +import json +import logging +import re + +import aiosqlite + +from agents.claude_client import call_claude +from config import CLAUDE_MODEL_FAST + +logger = logging.getLogger("osint.source_suggester") + + +async def generate_suggestions(db: aiosqlite.Connection) -> int: + """Generiert Quellen-Vorschläge basierend auf Health-Checks und Lückenanalyse.""" + logger.info("Starte Quellen-Vorschläge via Haiku...") + + # 1. Aktuelle Quellen laden + cursor = await db.execute( + "SELECT id, name, url, domain, source_type, category, status, " + "article_count, last_seen_at " + "FROM sources WHERE tenant_id IS NULL ORDER BY category, name" + ) + sources = [dict(row) for row in await cursor.fetchall()] + + # 2. Health-Check-Probleme laden + cursor = await db.execute(""" + SELECT h.source_id, s.name, s.domain, s.url, + h.check_type, h.status, h.message + FROM source_health_checks h + JOIN sources s ON s.id = h.source_id + WHERE h.status IN ('error', 'warning') + """) + issues = [dict(row) for row in await cursor.fetchall()] + + # 3. Alte pending-Vorschläge entfernen (älter als 30 Tage) + await db.execute( + "DELETE FROM source_suggestions " + "WHERE status = 'pending' AND created_at < datetime('now', '-30 days')" + ) + + # 4. Quellen-Zusammenfassung für Haiku + categories = {} + for s in sources: + cat = s["category"] + if cat not in categories: + categories[cat] = [] + categories[cat].append(s) + + source_summary = "" + for cat, cat_sources in sorted(categories.items()): + active = [ + s for s in cat_sources + if s["status"] == "active" and s["source_type"] != "excluded" + ] + source_summary += f"\n{cat} ({len(active)} aktiv): " + source_summary += ", ".join(s["name"] for s in active[:10]) + if len(active) > 10: + source_summary += f" ... (+{len(active) - 10} weitere)" + + issues_summary = "" + if issues: + issues_summary = "\n\nProbleme gefunden:\n" + for issue in issues[:20]: + issues_summary += ( + f"- [source_id={issue['source_id']}] {issue['name']} ({issue['domain']}): " + f"{issue['check_type']} = {issue['status']} - {issue['message']}\n" + ) + + prompt = f"""Du bist ein OSINT-Analyst und verwaltest die Quellensammlung eines Lagebildmonitors für Sicherheitsbehörden. + +Aktuelle Quellensammlung:{source_summary}{issues_summary} + +Aufgabe: Analysiere die Quellensammlung und schlage Verbesserungen vor. + +Beachte: +1. Bei Problemen (nicht erreichbar, leere Feeds): Schlage "deactivate_source" vor und setze "source_id" auf die ID aus [source_id=X] in der Problemliste +2. Fehlende wichtige OSINT-Quellen: Schlage "add_source" mit konkreter RSS-Feed-URL vor +3. Fokus auf deutschsprachige + wichtige internationale Nachrichtenquellen +4. Nur Quellen vorschlagen, die NICHT bereits vorhanden sind +5. Maximal 5 Vorschläge + +Antworte NUR mit einem JSON-Array. Jedes Element: +{{ + "type": "add_source|deactivate_source|fix_url|remove_source", + "title": "Kurzer Titel", + "description": "Begründung", + "priority": "low|medium|high", + "source_id": null, + "data": {{ + "name": "Anzeigename", + "url": "https://...", + "domain": "example.de", + "category": "international|nachrichtenagentur|qualitaetszeitung|behoerde|fachmedien|think-tank|regional|sonstige" + }} +}} + +Nur das JSON-Array, kein anderer Text.""" + + try: + response, usage = await call_claude( + prompt, tools=None, model=CLAUDE_MODEL_FAST, + ) + + json_match = re.search(r'\[.*\]', response, re.DOTALL) + if not json_match: + logger.warning("Keine Vorschläge von Haiku erhalten (kein JSON)") + return 0 + + suggestions = json.loads(json_match.group(0)) + + count = 0 + for suggestion in suggestions[:5]: + stype = suggestion.get("type", "add_source") + title = suggestion.get("title", "") + desc = suggestion.get("description", "") + priority = suggestion.get("priority", "medium") + source_id = suggestion.get("source_id") + data = json.dumps( + suggestion.get("data", {}), ensure_ascii=False, + ) + + # source_id validieren (muss existieren oder None sein) + if source_id is not None: + cursor = await db.execute( + "SELECT id FROM sources WHERE id = ?", (source_id,), + ) + if not await cursor.fetchone(): + source_id = None + + # Duplikat-Check + cursor = await db.execute( + "SELECT id FROM source_suggestions " + "WHERE title = ? AND status = 'pending'", + (title,), + ) + if await cursor.fetchone(): + continue + + await db.execute( + "INSERT INTO source_suggestions " + "(suggestion_type, title, description, source_id, " + "suggested_data, priority, status) " + "VALUES (?, ?, ?, ?, ?, ?, 'pending')", + (stype, title, desc, source_id, data, priority), + ) + count += 1 + + await db.commit() + logger.info( + f"Quellen-Vorschläge: {count} neue Vorschläge generiert " + f"(Haiku: {usage.input_tokens} in / {usage.output_tokens} out / " + f"${usage.cost_usd:.4f})" + ) + return count + + except Exception as e: + logger.error(f"Fehler bei Quellen-Vorschlägen: {e}", exc_info=True) + return 0 + + +async def apply_suggestion( + db: aiosqlite.Connection, suggestion_id: int, accept: bool, +) -> dict: + """Wendet einen Vorschlag an oder lehnt ihn ab.""" + cursor = await db.execute( + "SELECT * FROM source_suggestions WHERE id = ?", (suggestion_id,), + ) + suggestion = await cursor.fetchone() + if not suggestion: + raise ValueError("Vorschlag nicht gefunden") + + suggestion = dict(suggestion) + + if suggestion["status"] != "pending": + raise ValueError(f"Vorschlag bereits {suggestion['status']}") + + new_status = "accepted" if accept else "rejected" + result = {"status": new_status, "action": None} + + if accept: + stype = suggestion["suggestion_type"] + data = ( + json.loads(suggestion["suggested_data"]) + if suggestion["suggested_data"] + else {} + ) + + if stype == "add_source": + name = data.get("name", "Unbenannt") + url = data.get("url") + domain = data.get("domain", "") + category = data.get("category", "sonstige") + source_type = "rss_feed" if url and any( + x in (url or "").lower() + for x in ("rss", "feed", "xml", "atom") + ) else "web_source" + + if url: + cursor = await db.execute( + "SELECT id FROM sources WHERE url = ? AND tenant_id IS NULL", + (url,), + ) + if await cursor.fetchone(): + result["action"] = "übersprungen (URL bereits vorhanden)" + new_status = "rejected" + else: + await db.execute( + "INSERT INTO sources " + "(name, url, domain, source_type, category, status, " + "added_by, tenant_id) " + "VALUES (?, ?, ?, ?, ?, 'active', 'haiku-vorschlag', NULL)", + (name, url, domain, source_type, category), + ) + result["action"] = f"Quelle '{name}' angelegt" + else: + result["action"] = "übersprungen (keine URL)" + new_status = "rejected" + + elif stype == "deactivate_source": + source_id = suggestion["source_id"] + if source_id: + await db.execute( + "UPDATE sources SET status = 'inactive' WHERE id = ?", + (source_id,), + ) + result["action"] = "Quelle deaktiviert" + else: + result["action"] = "übersprungen (keine source_id)" + + elif stype == "remove_source": + source_id = suggestion["source_id"] + if source_id: + await db.execute( + "DELETE FROM sources WHERE id = ?", (source_id,), + ) + result["action"] = "Quelle gelöscht" + else: + result["action"] = "übersprungen (keine source_id)" + + elif stype == "fix_url": + source_id = suggestion["source_id"] + new_url = data.get("url") + if source_id and new_url: + await db.execute( + "UPDATE sources SET url = ? WHERE id = ?", + (new_url, source_id), + ) + result["action"] = f"URL aktualisiert auf {new_url}" + else: + result["action"] = "übersprungen (keine source_id oder URL)" + + await db.execute( + "UPDATE source_suggestions SET status = ?, reviewed_at = CURRENT_TIMESTAMP " + "WHERE id = ?", + (new_status, suggestion_id), + ) + await db.commit() + + result["status"] = new_status + return result