From 13143b944708bd09f48794ce4aa08ad3ffbf27a6 Mon Sep 17 00:00:00 2001 From: claude-dev Date: Sun, 8 Mar 2026 19:05:45 +0100 Subject: [PATCH] =?UTF-8?q?Fix:=20Duplikat-Vorschl=C3=A4ge=20+=20Stale-Che?= =?UTF-8?q?ck=20nur=20f=C3=BCr=20RSS-Feeds?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Duplikat-Check basiert auf source_id+type statt exaktem Titel - add_source ohne source_id prüft per Domain-Match - Stale-Check überspringt web_sources (nur RSS-Feeds prüfen) Co-Authored-By: Claude Opus 4.6 --- src/services/source_health.py | 26 +- src/services/source_suggester.py | 538 ++++++++++++++++--------------- 2 files changed, 290 insertions(+), 274 deletions(-) diff --git a/src/services/source_health.py b/src/services/source_health.py index 1a6bcf9..e6ee799 100644 --- a/src/services/source_health.py +++ b/src/services/source_health.py @@ -1,4 +1,4 @@ -"""Quellen-Health-Check Engine - prüft Erreichbarkeit, Feed-Validität, Duplikate.""" +"""Quellen-Health-Check Engine - prüft Erreichbarkeit, Feed-Validität, Duplikate.""" import asyncio import logging import json @@ -12,7 +12,7 @@ logger = logging.getLogger("osint.source_health") async def run_health_checks(db: aiosqlite.Connection) -> dict: - """Führt alle Health-Checks für aktive Grundquellen durch.""" + """Führt alle Health-Checks für aktive Grundquellen durch.""" logger.info("Starte Quellen-Health-Check...") # Alle aktiven Grundquellen laden @@ -22,14 +22,14 @@ async def run_health_checks(db: aiosqlite.Connection) -> dict: ) sources = [dict(row) for row in await cursor.fetchall()] - # Aktuelle Health-Check-Ergebnisse löschen (werden neu geschrieben) + # Aktuelle Health-Check-Ergebnisse löschen (werden neu geschrieben) await db.execute("DELETE FROM source_health_checks") await db.commit() checks_done = 0 issues_found = 0 - # 1. Erreichbarkeit + Feed-Validität (nur Quellen mit URL) + # 1. Erreichbarkeit + Feed-Validität (nur Quellen mit URL) sources_with_url = [s for s in sources if s["url"]] async with httpx.AsyncClient( @@ -46,7 +46,7 @@ async def run_health_checks(db: aiosqlite.Connection) -> dict: if isinstance(result, Exception): await _save_check( db, source["id"], "reachability", "error", - f"Prüfung fehlgeschlagen: {result}", + f"Prüfung fehlgeschlagen: {result}", ) issues_found += 1 else: @@ -61,7 +61,7 @@ async def run_health_checks(db: aiosqlite.Connection) -> dict: # 2. Veraltete Quellen (kein Artikel seit >30 Tagen) for source in sources: - if source["source_type"] == "excluded": + if source["source_type"] in ("excluded", "web_source"): continue stale_check = _check_stale(source) if stale_check: @@ -83,7 +83,7 @@ async def run_health_checks(db: aiosqlite.Connection) -> dict: await db.commit() logger.info( - f"Health-Check abgeschlossen: {checks_done} Quellen geprüft, " + f"Health-Check abgeschlossen: {checks_done} Quellen geprüft, " f"{issues_found} Probleme gefunden" ) return {"checked": checks_done, "issues": issues_found} @@ -92,7 +92,7 @@ async def run_health_checks(db: aiosqlite.Connection) -> dict: async def _check_source_reachability( client: httpx.AsyncClient, source: dict, ) -> list[dict]: - """Prüft Erreichbarkeit und Feed-Validität einer Quelle.""" + """Prüft Erreichbarkeit und Feed-Validität einer Quelle.""" checks = [] url = source["url"] @@ -125,14 +125,14 @@ async def _check_source_reachability( "message": "Erreichbar", }) - # Feed-Validität nur für RSS-Feeds + # Feed-Validität nur für RSS-Feeds if source["source_type"] == "rss_feed": text = resp.text[:20000] if " dict | None: - """Prüft ob eine Quelle veraltet ist (keine Artikel seit >30 Tagen).""" + """Prüft ob eine Quelle veraltet ist (keine Artikel seit >30 Tagen).""" if source["source_type"] == "excluded": return None @@ -249,7 +249,7 @@ async def _save_check( async def get_health_summary(db: aiosqlite.Connection) -> dict: - """Gibt eine Zusammenfassung der letzten Health-Check-Ergebnisse zurück.""" + """Gibt eine Zusammenfassung der letzten Health-Check-Ergebnisse zurück.""" cursor = await db.execute(""" SELECT h.id, h.source_id, s.name, s.domain, s.url, s.source_type, diff --git a/src/services/source_suggester.py b/src/services/source_suggester.py index 09061e5..ed7be67 100644 --- a/src/services/source_suggester.py +++ b/src/services/source_suggester.py @@ -1,261 +1,277 @@ -"""KI-gestützte Quellen-Vorschläge via Haiku.""" -import json -import logging -import re - -import aiosqlite - -from agents.claude_client import call_claude -from config import CLAUDE_MODEL_FAST - -logger = logging.getLogger("osint.source_suggester") - - -async def generate_suggestions(db: aiosqlite.Connection) -> int: - """Generiert Quellen-Vorschläge basierend auf Health-Checks und Lückenanalyse.""" - logger.info("Starte Quellen-Vorschläge via Haiku...") - - # 1. Aktuelle Quellen laden - cursor = await db.execute( - "SELECT id, name, url, domain, source_type, category, status, " - "article_count, last_seen_at " - "FROM sources WHERE tenant_id IS NULL ORDER BY category, name" - ) - sources = [dict(row) for row in await cursor.fetchall()] - - # 2. Health-Check-Probleme laden - cursor = await db.execute(""" - SELECT h.source_id, s.name, s.domain, s.url, - h.check_type, h.status, h.message - FROM source_health_checks h - JOIN sources s ON s.id = h.source_id - WHERE h.status IN ('error', 'warning') - """) - issues = [dict(row) for row in await cursor.fetchall()] - - # 3. Alte pending-Vorschläge entfernen (älter als 30 Tage) - await db.execute( - "DELETE FROM source_suggestions " - "WHERE status = 'pending' AND created_at < datetime('now', '-30 days')" - ) - - # 4. Quellen-Zusammenfassung für Haiku - categories = {} - for s in sources: - cat = s["category"] - if cat not in categories: - categories[cat] = [] - categories[cat].append(s) - - source_summary = "" - for cat, cat_sources in sorted(categories.items()): - active = [ - s for s in cat_sources - if s["status"] == "active" and s["source_type"] != "excluded" - ] - source_summary += f"\n{cat} ({len(active)} aktiv): " - source_summary += ", ".join(s["name"] for s in active[:10]) - if len(active) > 10: - source_summary += f" ... (+{len(active) - 10} weitere)" - - issues_summary = "" - if issues: - issues_summary = "\n\nProbleme gefunden:\n" - for issue in issues[:20]: - issues_summary += ( - f"- [source_id={issue['source_id']}] {issue['name']} ({issue['domain']}): " - f"{issue['check_type']} = {issue['status']} - {issue['message']}\n" - ) - - prompt = f"""Du bist ein OSINT-Analyst und verwaltest die Quellensammlung eines Lagebildmonitors für Sicherheitsbehörden. - -Aktuelle Quellensammlung:{source_summary}{issues_summary} - -Aufgabe: Analysiere die Quellensammlung und schlage Verbesserungen vor. - -Beachte: -1. Bei Problemen (nicht erreichbar, leere Feeds): Schlage "deactivate_source" vor und setze "source_id" auf die ID aus [source_id=X] in der Problemliste -2. Fehlende wichtige OSINT-Quellen: Schlage "add_source" mit konkreter RSS-Feed-URL vor -3. Fokus auf deutschsprachige + wichtige internationale Nachrichtenquellen -4. Nur Quellen vorschlagen, die NICHT bereits vorhanden sind -5. Maximal 5 Vorschläge - -Antworte NUR mit einem JSON-Array. Jedes Element: -{{ - "type": "add_source|deactivate_source|fix_url|remove_source", - "title": "Kurzer Titel", - "description": "Begründung", - "priority": "low|medium|high", - "source_id": null, - "data": {{ - "name": "Anzeigename", - "url": "https://...", - "domain": "example.de", - "category": "international|nachrichtenagentur|qualitaetszeitung|behoerde|fachmedien|think-tank|regional|sonstige" - }} -}} - -Nur das JSON-Array, kein anderer Text.""" - - try: - response, usage = await call_claude( - prompt, tools=None, model=CLAUDE_MODEL_FAST, - ) - - json_match = re.search(r'\[.*\]', response, re.DOTALL) - if not json_match: - logger.warning("Keine Vorschläge von Haiku erhalten (kein JSON)") - return 0 - - suggestions = json.loads(json_match.group(0)) - - count = 0 - for suggestion in suggestions[:5]: - stype = suggestion.get("type", "add_source") - title = suggestion.get("title", "") - desc = suggestion.get("description", "") - priority = suggestion.get("priority", "medium") - source_id = suggestion.get("source_id") - data = json.dumps( - suggestion.get("data", {}), ensure_ascii=False, - ) - - # source_id validieren (muss existieren oder None sein) - if source_id is not None: - cursor = await db.execute( - "SELECT id FROM sources WHERE id = ?", (source_id,), - ) - if not await cursor.fetchone(): - source_id = None - - # Duplikat-Check - cursor = await db.execute( - "SELECT id FROM source_suggestions " - "WHERE title = ? AND status = 'pending'", - (title,), - ) - if await cursor.fetchone(): - continue - - await db.execute( - "INSERT INTO source_suggestions " - "(suggestion_type, title, description, source_id, " - "suggested_data, priority, status) " - "VALUES (?, ?, ?, ?, ?, ?, 'pending')", - (stype, title, desc, source_id, data, priority), - ) - count += 1 - - await db.commit() - logger.info( - f"Quellen-Vorschläge: {count} neue Vorschläge generiert " - f"(Haiku: {usage.input_tokens} in / {usage.output_tokens} out / " - f"${usage.cost_usd:.4f})" - ) - return count - - except Exception as e: - logger.error(f"Fehler bei Quellen-Vorschlägen: {e}", exc_info=True) - return 0 - - -async def apply_suggestion( - db: aiosqlite.Connection, suggestion_id: int, accept: bool, -) -> dict: - """Wendet einen Vorschlag an oder lehnt ihn ab.""" - cursor = await db.execute( - "SELECT * FROM source_suggestions WHERE id = ?", (suggestion_id,), - ) - suggestion = await cursor.fetchone() - if not suggestion: - raise ValueError("Vorschlag nicht gefunden") - - suggestion = dict(suggestion) - - if suggestion["status"] != "pending": - raise ValueError(f"Vorschlag bereits {suggestion['status']}") - - new_status = "accepted" if accept else "rejected" - result = {"status": new_status, "action": None} - - if accept: - stype = suggestion["suggestion_type"] - data = ( - json.loads(suggestion["suggested_data"]) - if suggestion["suggested_data"] - else {} - ) - - if stype == "add_source": - name = data.get("name", "Unbenannt") - url = data.get("url") - domain = data.get("domain", "") - category = data.get("category", "sonstige") - source_type = "rss_feed" if url and any( - x in (url or "").lower() - for x in ("rss", "feed", "xml", "atom") - ) else "web_source" - - if url: - cursor = await db.execute( - "SELECT id FROM sources WHERE url = ? AND tenant_id IS NULL", - (url,), - ) - if await cursor.fetchone(): - result["action"] = "übersprungen (URL bereits vorhanden)" - new_status = "rejected" - else: - await db.execute( - "INSERT INTO sources " - "(name, url, domain, source_type, category, status, " - "added_by, tenant_id) " - "VALUES (?, ?, ?, ?, ?, 'active', 'haiku-vorschlag', NULL)", - (name, url, domain, source_type, category), - ) - result["action"] = f"Quelle '{name}' angelegt" - else: - result["action"] = "übersprungen (keine URL)" - new_status = "rejected" - - elif stype == "deactivate_source": - source_id = suggestion["source_id"] - if source_id: - await db.execute( - "UPDATE sources SET status = 'inactive' WHERE id = ?", - (source_id,), - ) - result["action"] = "Quelle deaktiviert" - else: - result["action"] = "übersprungen (keine source_id)" - - elif stype == "remove_source": - source_id = suggestion["source_id"] - if source_id: - await db.execute( - "DELETE FROM sources WHERE id = ?", (source_id,), - ) - result["action"] = "Quelle gelöscht" - else: - result["action"] = "übersprungen (keine source_id)" - - elif stype == "fix_url": - source_id = suggestion["source_id"] - new_url = data.get("url") - if source_id and new_url: - await db.execute( - "UPDATE sources SET url = ? WHERE id = ?", - (new_url, source_id), - ) - result["action"] = f"URL aktualisiert auf {new_url}" - else: - result["action"] = "übersprungen (keine source_id oder URL)" - - await db.execute( - "UPDATE source_suggestions SET status = ?, reviewed_at = CURRENT_TIMESTAMP " - "WHERE id = ?", - (new_status, suggestion_id), - ) - await db.commit() - - result["status"] = new_status - return result +"""KI-gestützte Quellen-Vorschläge via Haiku.""" +import json +import logging +import re + +import aiosqlite + +from agents.claude_client import call_claude +from config import CLAUDE_MODEL_FAST + +logger = logging.getLogger("osint.source_suggester") + + +async def generate_suggestions(db: aiosqlite.Connection) -> int: + """Generiert Quellen-Vorschläge basierend auf Health-Checks und Lückenanalyse.""" + logger.info("Starte Quellen-Vorschläge via Haiku...") + + # 1. Aktuelle Quellen laden + cursor = await db.execute( + "SELECT id, name, url, domain, source_type, category, status, " + "article_count, last_seen_at " + "FROM sources WHERE tenant_id IS NULL ORDER BY category, name" + ) + sources = [dict(row) for row in await cursor.fetchall()] + + # 2. Health-Check-Probleme laden + cursor = await db.execute(""" + SELECT h.source_id, s.name, s.domain, s.url, + h.check_type, h.status, h.message + FROM source_health_checks h + JOIN sources s ON s.id = h.source_id + WHERE h.status IN ('error', 'warning') + """) + issues = [dict(row) for row in await cursor.fetchall()] + + # 3. Alte pending-Vorschläge entfernen (älter als 30 Tage) + await db.execute( + "DELETE FROM source_suggestions " + "WHERE status = 'pending' AND created_at < datetime('now', '-30 days')" + ) + + # 4. Quellen-Zusammenfassung für Haiku + categories = {} + for s in sources: + cat = s["category"] + if cat not in categories: + categories[cat] = [] + categories[cat].append(s) + + source_summary = "" + for cat, cat_sources in sorted(categories.items()): + active = [ + s for s in cat_sources + if s["status"] == "active" and s["source_type"] != "excluded" + ] + source_summary += f"\n{cat} ({len(active)} aktiv): " + source_summary += ", ".join(s["name"] for s in active[:10]) + if len(active) > 10: + source_summary += f" ... (+{len(active) - 10} weitere)" + + issues_summary = "" + if issues: + issues_summary = "\n\nProbleme gefunden:\n" + for issue in issues[:20]: + issues_summary += ( + f"- [source_id={issue['source_id']}] {issue['name']} ({issue['domain']}): " + f"{issue['check_type']} = {issue['status']} - {issue['message']}\n" + ) + + prompt = f"""Du bist ein OSINT-Analyst und verwaltest die Quellensammlung eines Lagebildmonitors für Sicherheitsbehörden. + +Aktuelle Quellensammlung:{source_summary}{issues_summary} + +Aufgabe: Analysiere die Quellensammlung und schlage Verbesserungen vor. + +Beachte: +1. Bei Problemen (nicht erreichbar, leere Feeds): Schlage "deactivate_source" vor und setze "source_id" auf die ID aus [source_id=X] in der Problemliste +2. Fehlende wichtige OSINT-Quellen: Schlage "add_source" mit konkreter RSS-Feed-URL vor +3. Fokus auf deutschsprachige + wichtige internationale Nachrichtenquellen +4. Nur Quellen vorschlagen, die NICHT bereits vorhanden sind +5. Maximal 5 Vorschläge + +Antworte NUR mit einem JSON-Array. Jedes Element: +{{ + "type": "add_source|deactivate_source|fix_url|remove_source", + "title": "Kurzer Titel", + "description": "Begründung", + "priority": "low|medium|high", + "source_id": null, + "data": {{ + "name": "Anzeigename", + "url": "https://...", + "domain": "example.de", + "category": "international|nachrichtenagentur|qualitaetszeitung|behoerde|fachmedien|think-tank|regional|sonstige" + }} +}} + +Nur das JSON-Array, kein anderer Text.""" + + try: + response, usage = await call_claude( + prompt, tools=None, model=CLAUDE_MODEL_FAST, + ) + + json_match = re.search(r'\[.*\]', response, re.DOTALL) + if not json_match: + logger.warning("Keine Vorschläge von Haiku erhalten (kein JSON)") + return 0 + + suggestions = json.loads(json_match.group(0)) + + count = 0 + for suggestion in suggestions[:5]: + stype = suggestion.get("type", "add_source") + title = suggestion.get("title", "") + desc = suggestion.get("description", "") + priority = suggestion.get("priority", "medium") + source_id = suggestion.get("source_id") + data = json.dumps( + suggestion.get("data", {}), ensure_ascii=False, + ) + + # source_id validieren (muss existieren oder None sein) + if source_id is not None: + cursor = await db.execute( + "SELECT id FROM sources WHERE id = ?", (source_id,), + ) + if not await cursor.fetchone(): + source_id = None + + # Duplikat-Check: gleicher Typ + gleiche source_id oder gleiche Domain pending? + if source_id is not None: + cursor = await db.execute( + "SELECT id FROM source_suggestions " + "WHERE suggestion_type = ? AND source_id = ? AND status = 'pending'", + (stype, source_id), + ) + else: + # Bei add_source ohne source_id: Domain aus suggested_data prüfen + check_domain = suggestion.get('data', {}).get('domain', '') + if check_domain: + cursor = await db.execute( + "SELECT id FROM source_suggestions " + "WHERE suggestion_type = ? AND suggested_data LIKE ? AND status = 'pending'", + (stype, f'%{check_domain}%'), + ) + else: + cursor = await db.execute( + "SELECT id FROM source_suggestions " + "WHERE title = ? AND status = 'pending'", + (title,), + ) + if await cursor.fetchone(): + continue + + await db.execute( + "INSERT INTO source_suggestions " + "(suggestion_type, title, description, source_id, " + "suggested_data, priority, status) " + "VALUES (?, ?, ?, ?, ?, ?, 'pending')", + (stype, title, desc, source_id, data, priority), + ) + count += 1 + + await db.commit() + logger.info( + f"Quellen-Vorschläge: {count} neue Vorschläge generiert " + f"(Haiku: {usage.input_tokens} in / {usage.output_tokens} out / " + f"${usage.cost_usd:.4f})" + ) + return count + + except Exception as e: + logger.error(f"Fehler bei Quellen-Vorschlägen: {e}", exc_info=True) + return 0 + + +async def apply_suggestion( + db: aiosqlite.Connection, suggestion_id: int, accept: bool, +) -> dict: + """Wendet einen Vorschlag an oder lehnt ihn ab.""" + cursor = await db.execute( + "SELECT * FROM source_suggestions WHERE id = ?", (suggestion_id,), + ) + suggestion = await cursor.fetchone() + if not suggestion: + raise ValueError("Vorschlag nicht gefunden") + + suggestion = dict(suggestion) + + if suggestion["status"] != "pending": + raise ValueError(f"Vorschlag bereits {suggestion['status']}") + + new_status = "accepted" if accept else "rejected" + result = {"status": new_status, "action": None} + + if accept: + stype = suggestion["suggestion_type"] + data = ( + json.loads(suggestion["suggested_data"]) + if suggestion["suggested_data"] + else {} + ) + + if stype == "add_source": + name = data.get("name", "Unbenannt") + url = data.get("url") + domain = data.get("domain", "") + category = data.get("category", "sonstige") + source_type = "rss_feed" if url and any( + x in (url or "").lower() + for x in ("rss", "feed", "xml", "atom") + ) else "web_source" + + if url: + cursor = await db.execute( + "SELECT id FROM sources WHERE url = ? AND tenant_id IS NULL", + (url,), + ) + if await cursor.fetchone(): + result["action"] = "übersprungen (URL bereits vorhanden)" + new_status = "rejected" + else: + await db.execute( + "INSERT INTO sources " + "(name, url, domain, source_type, category, status, " + "added_by, tenant_id) " + "VALUES (?, ?, ?, ?, ?, 'active', 'haiku-vorschlag', NULL)", + (name, url, domain, source_type, category), + ) + result["action"] = f"Quelle '{name}' angelegt" + else: + result["action"] = "übersprungen (keine URL)" + new_status = "rejected" + + elif stype == "deactivate_source": + source_id = suggestion["source_id"] + if source_id: + await db.execute( + "UPDATE sources SET status = 'inactive' WHERE id = ?", + (source_id,), + ) + result["action"] = "Quelle deaktiviert" + else: + result["action"] = "übersprungen (keine source_id)" + + elif stype == "remove_source": + source_id = suggestion["source_id"] + if source_id: + await db.execute( + "DELETE FROM sources WHERE id = ?", (source_id,), + ) + result["action"] = "Quelle gelöscht" + else: + result["action"] = "übersprungen (keine source_id)" + + elif stype == "fix_url": + source_id = suggestion["source_id"] + new_url = data.get("url") + if source_id and new_url: + await db.execute( + "UPDATE sources SET url = ? WHERE id = ?", + (new_url, source_id), + ) + result["action"] = f"URL aktualisiert auf {new_url}" + else: + result["action"] = "übersprungen (keine source_id oder URL)" + + await db.execute( + "UPDATE source_suggestions SET status = ?, reviewed_at = CURRENT_TIMESTAMP " + "WHERE id = ?", + (new_status, suggestion_id), + ) + await db.commit() + + result["status"] = new_status + return result