diff --git a/src/agents/factchecker.py b/src/agents/factchecker.py index 804f094..d21059d 100644 --- a/src/agents/factchecker.py +++ b/src/agents/factchecker.py @@ -1,418 +1,742 @@ -"""Factchecker-Agent: Prüft Fakten gegen mehrere unabhängige Quellen.""" -import json -import logging -import re -from difflib import SequenceMatcher -from agents.claude_client import call_claude, ClaudeUsage - -logger = logging.getLogger("osint.factchecker") - -FACTCHECK_PROMPT_TEMPLATE = """Du bist ein Faktencheck-Agent für ein OSINT-Lagemonitoring-System. -AUSGABESPRACHE: {output_language} -WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschreibungen (ae, oe, ue, ss). - -VORFALL: {title} - -VORLIEGENDE MELDUNGEN: -{articles_text} - -STRENGE REGELN - KEINE HALLUZINATIONEN: -- Du darfst NUR Fakten bewerten, die direkt aus den oben übergebenen Meldungen stammen -- KEINE Fakten aus deinem Trainingskorpus - NUR aus den übergebenen Meldungen + WebSearch -- Nutze WebSearch um jeden Claim gegen mindestens 1 weitere unabhängige Quelle zu prüfen -- Rufe die gefundenen URLs per WebFetch ab um den Inhalt zu verifizieren -- Nur wenn du den Claim in der tatsächlich abgerufenen Quelle findest, darfst du ihn als bestätigt markieren -- Jeder Claim MUSS eine konkrete Quellen-URL als Beleg enthalten -- "confirmed" erst bei 2+ unabhängigen Quellen mit überprüfbarer URL -- Lieber "unconfirmed" als falsch bestätigt - -AUFTRAG: -1. Identifiziere die 5-10 wichtigsten Faktenaussagen aus den Meldungen -2. Prüfe jeden Claim aktiv per WebSearch gegen mindestens eine weitere unabhängige Quelle -3. Kategorisiere jede Aussage: - - "confirmed": Durch 2+ unabhängige seriöse Quellen mit überprüfbarer URL bestätigt - - "unconfirmed": Nur 1 Quelle oder nicht unabhängig verifizierbar - - "contradicted": Widersprüchliche Informationen aus verschiedenen Quellen - - "developing": Situation noch unklar, entwickelt sich -4. Markiere WICHTIGE NEUE Entwicklungen mit is_notification: true - -Antworte AUSSCHLIESSLICH als JSON-Array. Jedes Element hat: -- "claim": Die Faktenaussage auf {output_language} -- "status": "confirmed" | "unconfirmed" | "contradicted" | "developing" -- "sources_count": Anzahl unabhängiger Quellen mit überprüfbarer URL -- "evidence": Begründung MIT konkreten Quellen-URLs als Beleg (z.B. "Bestätigt durch: tagesschau.de (URL), Reuters (URL)") -- "is_notification": true/false (nur bei wichtigen Entwicklungen true) - -Antworte NUR mit dem JSON-Array. Keine Einleitung, keine Erklärung.""" - -RESEARCH_FACTCHECK_PROMPT_TEMPLATE = """Du bist ein Faktencheck-Agent für eine Hintergrundrecherche in einem OSINT-Lagemonitoring-System. -AUSGABESPRACHE: {output_language} -WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschreibungen (ae, oe, ue, ss). - -THEMA: {title} - -VORLIEGENDE QUELLEN: -{articles_text} - -STRENGE REGELN - KEINE HALLUZINATIONEN: -- Du darfst NUR Fakten bewerten, die direkt aus den oben übergebenen Quellen stammen -- KEINE Fakten aus deinem Trainingskorpus - NUR aus den übergebenen Quellen + WebSearch -- Nutze WebSearch um jeden Claim gegen mindestens 1 weitere unabhängige Quelle zu prüfen -- Rufe die gefundenen URLs per WebFetch ab um den Inhalt zu verifizieren -- Nur wenn du den Claim in der tatsächlich abgerufenen Quelle findest, darfst du ihn als gesichert markieren -- Jeder Claim MUSS eine konkrete Quellen-URL als Beleg enthalten -- Lieber "unverified" als falsch bestätigt - -AUFTRAG: -Fokus: "Was sind die gesicherten Fakten zu diesem Thema?" - -1. Identifiziere die 5-10 wichtigsten Faktenaussagen aus den Quellen -2. Prüfe jeden Claim aktiv per WebSearch gegen weitere unabhängige Quellen -3. Kategorisiere jede Aussage: - - "established": Breit dokumentierter, gesicherter Fakt (3+ unabhängige Quellen mit URL) - - "disputed": Umstrittener Sachverhalt, verschiedene Positionen dokumentiert - - "unverified": Einzelbehauptung, nicht unabhängig verifizierbar - - "developing": Aktuelle Entwicklung, Faktenlage noch im Fluss -4. Markiere WICHTIGE Erkenntnisse mit is_notification: true - -Antworte AUSSCHLIESSLICH als JSON-Array. Jedes Element hat: -- "claim": Die Faktenaussage auf {output_language} -- "status": "established" | "disputed" | "unverified" | "developing" -- "sources_count": Anzahl unabhängiger Quellen mit überprüfbarer URL -- "evidence": Begründung MIT konkreten Quellen-URLs als Beleg -- "is_notification": true/false - -Antworte NUR mit dem JSON-Array. Keine Einleitung, keine Erklärung.""" - -# --- Inkrementelle Faktencheck-Prompts (für Folge-Refreshes) --- - -INCREMENTAL_FACTCHECK_PROMPT_TEMPLATE = """Du bist ein Faktencheck-Agent für ein OSINT-Lagemonitoring-System. -AUSGABESPRACHE: {output_language} -WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschreibungen (ae, oe, ue, ss). - -VORFALL: {title} - -BEREITS GEPRÜFTE FAKTEN: -{existing_facts_text} - -NEUE MELDUNGEN: -{articles_text} - -STRENGE REGELN - KEINE HALLUZINATIONEN: -- Du darfst NUR Fakten bewerten, die aus den Meldungen oder bereits geprüften Fakten stammen -- KEINE Fakten aus deinem Trainingskorpus -- Nutze WebSearch zur Verifikation -- Rufe gefundene URLs per WebFetch ab - -AUFTRAG: -1. Prüfe ob die neuen Meldungen bereits geprüfte Fakten BESTÄTIGEN, WIDERLEGEN oder ERGÄNZEN -2. Aktualisiere den Status bestehender Fakten wenn nötig (z.B. "unconfirmed" → "confirmed") -3. Identifiziere 3-5 NEUE Faktenaussagen aus den neuen Meldungen -4. Prüfe neue Claims per WebSearch gegen unabhängige Quellen -5. Markiere wichtige Statusänderungen und neue Entwicklungen mit is_notification: true - -Status-Kategorien: -- "confirmed": 2+ unabhängige seriöse Quellen mit URL -- "unconfirmed": Nur 1 Quelle -- "contradicted": Widersprüchliche Informationen -- "developing": Situation unklar - -Antworte AUSSCHLIESSLICH als JSON-Array mit ALLEN Fakten (bestehende aktualisiert + neue). -Jedes Element hat: -- "claim": Die Faktenaussage auf {output_language} -- "status": "confirmed" | "unconfirmed" | "contradicted" | "developing" -- "sources_count": Anzahl unabhängiger Quellen -- "evidence": Begründung MIT konkreten Quellen-URLs -- "is_notification": true/false - -Antworte NUR mit dem JSON-Array.""" - -INCREMENTAL_RESEARCH_FACTCHECK_PROMPT_TEMPLATE = """Du bist ein Faktencheck-Agent für eine Hintergrundrecherche in einem OSINT-Lagemonitoring-System. -AUSGABESPRACHE: {output_language} -WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschreibungen (ae, oe, ue, ss). - -THEMA: {title} - -BEREITS GEPRÜFTE FAKTEN: -{existing_facts_text} - -NEUE QUELLEN: -{articles_text} - -STRENGE REGELN - KEINE HALLUZINATIONEN: -- Du darfst NUR Fakten bewerten, die aus den Quellen oder bereits geprüften Fakten stammen -- KEINE Fakten aus deinem Trainingskorpus -- Nutze WebSearch zur Verifikation -- Rufe gefundene URLs per WebFetch ab - -AUFTRAG: -1. Prüfe ob die neuen Quellen bereits geprüfte Fakten bestätigen, widerlegen oder ergänzen -2. Aktualisiere den Status bestehender Fakten wenn nötig -3. Identifiziere 3-5 NEUE Faktenaussagen aus den neuen Quellen -4. Prüfe neue Claims per WebSearch - -Status-Kategorien: -- "established": 3+ unabhängige Quellen mit URL -- "disputed": Verschiedene Positionen dokumentiert -- "unverified": Nicht unabhängig verifizierbar -- "developing": Faktenlage im Fluss - -Antworte AUSSCHLIESSLICH als JSON-Array mit ALLEN Fakten (bestehende aktualisiert + neue). -Jedes Element hat: -- "claim": Die Faktenaussage auf {output_language} -- "status": "established" | "disputed" | "unverified" | "developing" -- "sources_count": Anzahl unabhängiger Quellen -- "evidence": Begründung MIT konkreten Quellen-URLs -- "is_notification": true/false - -Antworte NUR mit dem JSON-Array.""" - - -# --- Stopwords fuer Keyword-Extraktion --- -_STOPWORDS = frozenset({ - "der", "die", "das", "ein", "eine", "und", "oder", "von", "nach", "bei", "mit", - "wurde", "wird", "haben", "sein", "dass", "ist", "sind", "hat", "vor", "fuer", - "den", "dem", "des", "sich", "auf", "als", "auch", "noch", "nicht", "aber", - "ueber", "durch", "einer", "einem", "eines", "werden", "wurde", "waren", - "the", "and", "was", "has", "been", "have", "that", "with", "from", "for", - "are", "were", "this", "which", "into", "their", "than", "about", -}) - -STATUS_PRIORITY = { - "confirmed": 5, "established": 5, - "contradicted": 4, "disputed": 4, - "unconfirmed": 3, "unverified": 3, - "developing": 1, -} - - -def normalize_claim(claim: str) -> str: - """Normalisiert einen Claim fuer Aehnlichkeitsvergleich.""" - c = claim.lower().strip() - c = c.replace("\u00e4", "ae").replace("\u00f6", "oe").replace("\u00fc", "ue").replace("\u00df", "ss") - c = re.sub(r'[^\w\s]', '', c) - c = re.sub(r'\s+', ' ', c).strip() - return c - - -def _keyword_set(text: str) -> set[str]: - """Extrahiert signifikante Woerter fuer Overlap-Vergleich.""" - words = set(normalize_claim(text).split()) - return {w for w in words if len(w) >= 4 and w not in _STOPWORDS} - - -def find_matching_claim(new_claim: str, existing_claims: list[dict], threshold: float = 0.75) -> dict | None: - """Findet den besten passenden bestehenden Claim per kombiniertem Scoring. - - Verwendet SequenceMatcher (70%) + Jaccard-Keyword-Overlap (30%) fuer robusteres Matching. - """ - norm_new = normalize_claim(new_claim) - if not norm_new: - return None - - kw_new = _keyword_set(new_claim) - best_match = None - best_score = 0.0 - - for existing in existing_claims: - norm_existing = normalize_claim(existing.get("claim", "")) - if not norm_existing: - continue - - # Fruehzeitiger Abbruch bei grossem Laengenunterschied - len_ratio = len(norm_new) / len(norm_existing) if norm_existing else 0 - if len_ratio > 2.5 or len_ratio < 0.4: - continue - - seq_ratio = SequenceMatcher(None, norm_new, norm_existing).ratio() - - kw_existing = _keyword_set(existing.get("claim", "")) - kw_union = kw_new | kw_existing - jaccard = len(kw_new & kw_existing) / len(kw_union) if kw_union else 0.0 - - combined = 0.7 * seq_ratio + 0.3 * jaccard - - if combined > best_score: - best_score = combined - best_match = existing - - if best_score >= threshold: - logger.debug( - f"Claim-Match ({best_score:.2f}): " - f"'{new_claim[:50]}...' -> '{best_match['claim'][:50]}...'" - ) - return best_match - return None - - -def deduplicate_new_facts(facts: list[dict], threshold: float = 0.70) -> list[dict]: - """Dedupliziert Fakten aus einer einzelnen LLM-Antwort vor dem DB-Insert. - - Clustert aehnliche Claims und behaelt pro Cluster den mit dem - hoechsten Status und den meisten Quellen. - """ - if not facts: - return [] - - clusters: list[list[dict]] = [] - for fact in facts: - matched_cluster = None - for cluster in clusters: - if find_matching_claim(fact.get("claim", ""), cluster, threshold=threshold): - matched_cluster = cluster - break - if matched_cluster is not None: - matched_cluster.append(fact) - else: - clusters.append([fact]) - - result = [] - for cluster in clusters: - best = max(cluster, key=lambda f: ( - STATUS_PRIORITY.get(f.get("status", "developing"), 0), - f.get("sources_count", 0), - )) - result.append(best) - - if len(result) < len(facts): - logger.info( - f"Fakten-Dedup: {len(facts)} -> {len(result)} " - f"(-{len(facts) - len(result)} Duplikate)" - ) - return result - - -class FactCheckerAgent: - """Prüft Fakten über Claude CLI gegen unabhängige Quellen.""" - - def _format_articles_text(self, articles: list[dict], max_articles: int = 20) -> str: - """Formatiert Artikel als Text für den Prompt.""" - articles_text = "" - for i, article in enumerate(articles[:max_articles]): - articles_text += f"\n--- Meldung {i+1} ---\n" - articles_text += f"Quelle: {article.get('source', 'Unbekannt')}\n" - source_url = article.get('source_url', '') - if source_url: - articles_text += f"URL: {source_url}\n" - headline = article.get('headline_de') or article.get('headline', '') - articles_text += f"Überschrift: {headline}\n" - content = article.get('content_de') or article.get('content_original', '') - if content: - articles_text += f"Inhalt: {content[:500]}\n" - return articles_text - - def _format_existing_facts(self, facts: list[dict]) -> str: - """Formatiert bestehende Fakten als Text für den inkrementellen Prompt.""" - if not facts: - return "Keine bisherigen Fakten" - lines = [] - for fc in facts: - status = fc.get("status", "developing") - claim = fc.get("claim", "") - sources = fc.get("sources_count", 0) - lines.append(f"- [{status}] ({sources} Quellen) {claim}") - return "\n".join(lines) - - async def check(self, title: str, articles: list[dict], incident_type: str = "adhoc") -> tuple[list[dict], ClaudeUsage | None]: - """Führt vollständigen Faktencheck durch (erster Refresh).""" - if not articles: - return [], None - - articles_text = self._format_articles_text(articles) - - from config import OUTPUT_LANGUAGE - template = RESEARCH_FACTCHECK_PROMPT_TEMPLATE if incident_type == "research" else FACTCHECK_PROMPT_TEMPLATE - prompt = template.format( - title=title, - articles_text=articles_text, - output_language=OUTPUT_LANGUAGE, - ) - - try: - result, usage = await call_claude(prompt) - facts = self._parse_response(result) - logger.info(f"Faktencheck: {len(facts)} Fakten geprüft") - return facts, usage - except TimeoutError: - raise # Timeout nach oben durchreichen fuer Retry im Orchestrator - except Exception as e: - logger.error(f"Faktencheck-Fehler: {e}") - return [], None - - async def check_incremental( - self, - title: str, - new_articles: list[dict], - existing_facts: list[dict], - incident_type: str = "adhoc", - ) -> tuple[list[dict], ClaudeUsage | None]: - """Inkrementeller Faktencheck: Prüft nur neue Artikel gegen bestehende Fakten. - - Spart Tokens, da nur neue Artikel + Zusammenfassung der bestehenden Fakten gesendet werden. - """ - if not new_articles: - logger.info("Inkrementeller Faktencheck übersprungen: keine neuen Artikel") - return [], None - - articles_text = self._format_articles_text(new_articles, max_articles=15) - existing_facts_text = self._format_existing_facts(existing_facts) - - from config import OUTPUT_LANGUAGE - if incident_type == "research": - template = INCREMENTAL_RESEARCH_FACTCHECK_PROMPT_TEMPLATE - else: - template = INCREMENTAL_FACTCHECK_PROMPT_TEMPLATE - - prompt = template.format( - title=title, - articles_text=articles_text, - existing_facts_text=existing_facts_text, - output_language=OUTPUT_LANGUAGE, - ) - - try: - result, usage = await call_claude(prompt) - facts = self._parse_response(result) - logger.info(f"Inkrementeller Faktencheck: {len(facts)} Fakten (neu + aktualisiert)") - return facts, usage - except TimeoutError: - raise # Timeout nach oben durchreichen fuer Retry im Orchestrator - except Exception as e: - logger.error(f"Inkrementeller Faktencheck-Fehler: {e}") - return [], None - - def _validate_facts(self, facts: list[dict]) -> list[dict]: - """Validiert Fakten: confirmed/established ohne URL wird herabgestuft.""" - url_pattern = re.compile(r'https?://') - for fact in facts: - status = fact.get("status", "") - evidence = fact.get("evidence") or "" - if status in ("confirmed", "established") and not url_pattern.search(evidence): - old_status = status - fact["status"] = "unconfirmed" if status == "confirmed" else "unverified" - logger.warning( - f"Fakt herabgestuft ({old_status} -> {fact['status']}): " - f"keine URL in Evidenz: '{fact.get('claim', '')[:60]}...'" - ) - return facts - - def _parse_response(self, response: str) -> list[dict]: - """Parst die Claude-Antwort als JSON-Array.""" - try: - data = json.loads(response) - if isinstance(data, list): - return self._validate_facts(data) - except json.JSONDecodeError: - pass - - match = re.search(r'\[.*\]', response, re.DOTALL) - if match: - try: - data = json.loads(match.group()) - if isinstance(data, list): - return self._validate_facts(data) - except json.JSONDecodeError: - pass - - logger.warning("Konnte Faktencheck-Antwort nicht als JSON parsen") - return [] +"""Factchecker-Agent: Prüft Fakten gegen mehrere unabhängige Quellen.""" +import asyncio +import json +import logging +import re +from difflib import SequenceMatcher +from agents.claude_client import call_claude, ClaudeUsage, UsageAccumulator + +logger = logging.getLogger("osint.factchecker") + +FACTCHECK_PROMPT_TEMPLATE = """Du bist ein Faktencheck-Agent für ein OSINT-Lagemonitoring-System. +AUSGABESPRACHE: {output_language} +WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschreibungen (ae, oe, ue, ss). + +VORFALL: {title} + +VORLIEGENDE MELDUNGEN: +{articles_text} + +STRENGE REGELN - KEINE HALLUZINATIONEN: +- Du darfst NUR Fakten bewerten, die direkt aus den oben übergebenen Meldungen stammen +- KEINE Fakten aus deinem Trainingskorpus - NUR aus den übergebenen Meldungen + WebSearch +- Nutze WebSearch um jeden Claim gegen mindestens 1 weitere unabhängige Quelle zu prüfen +- Rufe die gefundenen URLs per WebFetch ab um den Inhalt zu verifizieren +- Nur wenn du den Claim in der tatsächlich abgerufenen Quelle findest, darfst du ihn als bestätigt markieren +- Jeder Claim MUSS eine konkrete Quellen-URL als Beleg enthalten +- "confirmed" erst bei 2+ unabhängigen Quellen mit überprüfbarer URL +- Lieber "unconfirmed" als falsch bestätigt + +AUFTRAG: +1. Identifiziere die 5-10 wichtigsten Faktenaussagen aus den Meldungen +2. Prüfe jeden Claim aktiv per WebSearch gegen mindestens eine weitere unabhängige Quelle +3. Kategorisiere jede Aussage: + - "confirmed": Durch 2+ unabhängige seriöse Quellen mit überprüfbarer URL bestätigt + - "unconfirmed": Nur 1 Quelle oder nicht unabhängig verifizierbar + - "contradicted": Widersprüchliche Informationen aus verschiedenen Quellen + - "developing": Situation noch unklar, entwickelt sich +4. Markiere WICHTIGE NEUE Entwicklungen mit is_notification: true + +Antworte AUSSCHLIESSLICH als JSON-Array. Jedes Element hat: +- "claim": Die Faktenaussage auf {output_language} +- "status": "confirmed" | "unconfirmed" | "contradicted" | "developing" +- "sources_count": Anzahl unabhängiger Quellen mit überprüfbarer URL +- "evidence": Begründung MIT konkreten Quellen-URLs als Beleg (z.B. "Bestätigt durch: tagesschau.de (URL), Reuters (URL)") +- "is_notification": true/false (nur bei wichtigen Entwicklungen true) + +Antworte NUR mit dem JSON-Array. Keine Einleitung, keine Erklärung.""" + +RESEARCH_FACTCHECK_PROMPT_TEMPLATE = """Du bist ein Faktencheck-Agent für eine Hintergrundrecherche in einem OSINT-Lagemonitoring-System. +AUSGABESPRACHE: {output_language} +WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschreibungen (ae, oe, ue, ss). + +THEMA: {title} + +VORLIEGENDE QUELLEN: +{articles_text} + +STRENGE REGELN - KEINE HALLUZINATIONEN: +- Du darfst NUR Fakten bewerten, die direkt aus den oben übergebenen Quellen stammen +- KEINE Fakten aus deinem Trainingskorpus - NUR aus den übergebenen Quellen + WebSearch +- Nutze WebSearch um jeden Claim gegen mindestens 1 weitere unabhängige Quelle zu prüfen +- Rufe die gefundenen URLs per WebFetch ab um den Inhalt zu verifizieren +- Nur wenn du den Claim in der tatsächlich abgerufenen Quelle findest, darfst du ihn als gesichert markieren +- Jeder Claim MUSS eine konkrete Quellen-URL als Beleg enthalten +- Lieber "unverified" als falsch bestätigt + +AUFTRAG: +Fokus: "Was sind die gesicherten Fakten zu diesem Thema?" + +1. Identifiziere die 5-10 wichtigsten Faktenaussagen aus den Quellen +2. Prüfe jeden Claim aktiv per WebSearch gegen weitere unabhängige Quellen +3. Kategorisiere jede Aussage: + - "established": Breit dokumentierter, gesicherter Fakt (3+ unabhängige Quellen mit URL) + - "disputed": Umstrittener Sachverhalt, verschiedene Positionen dokumentiert + - "unverified": Einzelbehauptung, nicht unabhängig verifizierbar + - "developing": Aktuelle Entwicklung, Faktenlage noch im Fluss +4. Markiere WICHTIGE Erkenntnisse mit is_notification: true + +Antworte AUSSCHLIESSLICH als JSON-Array. Jedes Element hat: +- "claim": Die Faktenaussage auf {output_language} +- "status": "established" | "disputed" | "unverified" | "developing" +- "sources_count": Anzahl unabhängiger Quellen mit überprüfbarer URL +- "evidence": Begründung MIT konkreten Quellen-URLs als Beleg +- "is_notification": true/false + +Antworte NUR mit dem JSON-Array. Keine Einleitung, keine Erklärung.""" + +# --- Inkrementelle Faktencheck-Prompts (für Folge-Refreshes) --- + +INCREMENTAL_FACTCHECK_PROMPT_TEMPLATE = """Du bist ein Faktencheck-Agent für ein OSINT-Lagemonitoring-System. +AUSGABESPRACHE: {output_language} +WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschreibungen (ae, oe, ue, ss). + +VORFALL: {title} + +BEREITS GEPRÜFTE FAKTEN: +{existing_facts_text} + +NEUE MELDUNGEN: +{articles_text} + +STRENGE REGELN - KEINE HALLUZINATIONEN: +- Du darfst NUR Fakten bewerten, die aus den Meldungen oder bereits geprüften Fakten stammen +- KEINE Fakten aus deinem Trainingskorpus +- Nutze WebSearch zur Verifikation +- Rufe gefundene URLs per WebFetch ab + +AUFTRAG: +1. Prüfe ob die neuen Meldungen bereits geprüfte Fakten BESTÄTIGEN, WIDERLEGEN oder ERGÄNZEN +2. Aktualisiere den Status bestehender Fakten wenn nötig (z.B. "unconfirmed" → "confirmed") +3. Identifiziere 3-5 NEUE Faktenaussagen aus den neuen Meldungen +4. Prüfe neue Claims per WebSearch gegen unabhängige Quellen +5. Markiere wichtige Statusänderungen und neue Entwicklungen mit is_notification: true + +Status-Kategorien: +- "confirmed": 2+ unabhängige seriöse Quellen mit URL +- "unconfirmed": Nur 1 Quelle +- "contradicted": Widersprüchliche Informationen +- "developing": Situation unklar + +Antworte AUSSCHLIESSLICH als JSON-Array mit ALLEN Fakten (bestehende aktualisiert + neue). +Jedes Element hat: +- "claim": Die Faktenaussage auf {output_language} +- "status": "confirmed" | "unconfirmed" | "contradicted" | "developing" +- "sources_count": Anzahl unabhängiger Quellen +- "evidence": Begründung MIT konkreten Quellen-URLs +- "is_notification": true/false + +Antworte NUR mit dem JSON-Array.""" + +INCREMENTAL_RESEARCH_FACTCHECK_PROMPT_TEMPLATE = """Du bist ein Faktencheck-Agent für eine Hintergrundrecherche in einem OSINT-Lagemonitoring-System. +AUSGABESPRACHE: {output_language} +WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschreibungen (ae, oe, ue, ss). + +THEMA: {title} + +BEREITS GEPRÜFTE FAKTEN: +{existing_facts_text} + +NEUE QUELLEN: +{articles_text} + +STRENGE REGELN - KEINE HALLUZINATIONEN: +- Du darfst NUR Fakten bewerten, die aus den Quellen oder bereits geprüften Fakten stammen +- KEINE Fakten aus deinem Trainingskorpus +- Nutze WebSearch zur Verifikation +- Rufe gefundene URLs per WebFetch ab + +AUFTRAG: +1. Prüfe ob die neuen Quellen bereits geprüfte Fakten bestätigen, widerlegen oder ergänzen +2. Aktualisiere den Status bestehender Fakten wenn nötig +3. Identifiziere 3-5 NEUE Faktenaussagen aus den neuen Quellen +4. Prüfe neue Claims per WebSearch + +Status-Kategorien: +- "established": 3+ unabhängige Quellen mit URL +- "disputed": Verschiedene Positionen dokumentiert +- "unverified": Nicht unabhängig verifizierbar +- "developing": Faktenlage im Fluss + +Antworte AUSSCHLIESSLICH als JSON-Array mit ALLEN Fakten (bestehende aktualisiert + neue). +Jedes Element hat: +- "claim": Die Faktenaussage auf {output_language} +- "status": "established" | "disputed" | "unverified" | "developing" +- "sources_count": Anzahl unabhängiger Quellen +- "evidence": Begründung MIT konkreten Quellen-URLs +- "is_notification": true/false + +Antworte NUR mit dem JSON-Array.""" + +# --- Zwei-Phasen-Faktencheck: Prompt-Templates --- + +TRIAGE_PROMPT_TEMPLATE = """Du bist ein Triage-System für Faktenchecks eines OSINT-Lagemonitoring-Systems. +AUSGABESPRACHE: {output_language} + +BESTEHENDE FAKTENAUSSAGEN ({fact_count} Stück): +{existing_facts_text} + +NEUE NACHRICHTENARTIKEL ({article_count} Stück): +{articles_text} + +AUFGABE: +Analysiere die neuen Artikel und identifiziere: + +1. BETROFFENE BESTEHENDE FAKTEN: Welche der bestehenden Fakten könnten durch die neuen Artikel + betroffen sein? (neue Bestätigung, Widerlegung, neue Evidenz, Status-Update nötig) + +2. NEUE FAKTENAUSSAGEN: Welche neuen prüfbaren Faktenaussagen enthalten die Artikel, + die noch nicht in den bestehenden Fakten erfasst sind? (3-5 neue Claims) + +3. GRUPPIERUNG: Gruppiere verwandte Fakten thematisch für die parallele Batch-Verarbeitung. + Verwandte Fakten MÜSSEN in derselben Gruppe sein. Max {max_per_group} Fakten pro Gruppe. + +WICHTIG: +- Sei GRÜNDLICH — übersehe keine semantischen Verbindungen +- Auch indirekte Verbindungen beachten (diplomatisch <-> militärisch <-> wirtschaftlich) +- Alle developing/unconfirmed/unverified Fakten IMMER einbeziehen +- Max {max_per_group} Fakten pro Gruppe (teile große Gruppen auf) + +Antworte AUSSCHLIESSLICH als JSON: +{{{{ + "affected_fact_ids": [int, ...], + "new_claims": ["Prüfbare Faktenaussage als vollständiger Satz", ...], + "groups": [ + {{{{ + "id": 1, + "theme": "Themenbezeichnung", + "fact_ids": [int, ...], + "new_claim_indices": [int, ...] + }}}} + ], + "total_affected": int, + "reasoning": "Kurze Begründung" +}}}}""" + +VERIFY_GROUP_PROMPT_TEMPLATE = """Du prüfst Faktenaussagen gegen unabhängige Quellen in einem OSINT-Lagemonitoring-System. +AUSGABESPRACHE: {output_language} +WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschreibungen (ae, oe, ue, ss). + +THEMA DIESER GRUPPE: {theme} + +ZU PRÜFENDE BESTEHENDE FAKTEN: +{facts_text} + +NEUE CLAIMS ZU PRÜFEN: +{new_claims_text} + +KONTEXT (neue Nachrichtenartikel): +{articles_text} + +ANWEISUNGEN: +Für JEDE Faktenaussage (bestehend UND neu): +1. Suche per WebSearch nach unabhängigen Bestätigungen oder Widerlegungen +2. Bewerte den Status: + - "confirmed": Mindestens 2 unabhängige Quellen mit verifizierter URL bestätigen + - "unconfirmed": Nicht genug unabhängige Bestätigung + - "contradicted": Glaubwürdige Quellen widersprechen + - "developing": Lage noch im Fluss +3. Dokumentiere die Evidenz mit konkreten, verifizierten URLs + +QUALITÄTSREGELN: +- "confirmed" NUR bei mindestens 2 unabhängigen Quellen mit ECHTER URL +- KEINE Halluzinationen — nur tatsächlich per WebSearch gefundene Quellen +- Bestehende Evidenz BEIBEHALTEN, nur neue ergänzen +- Jeder Claim MUSS eine konkrete Quellen-URL als Beleg enthalten + +Antworte AUSSCHLIESSLICH als JSON-Array: +[ + {{{{ + "id": 123, + "claim": "Die Faktenaussage...", + "status": "confirmed|unconfirmed|contradicted|developing", + "sources_count": 3, + "evidence": "Bestätigt durch: Quelle1 (URL)\\nQuelle2 (URL)", + "is_notification": false + }}}} +] + +Für NEUE Fakten setze id auf null.""" + +VERIFY_GROUP_RESEARCH_PROMPT_TEMPLATE = """Du prüfst Faktenaussagen gegen unabhängige Quellen in einem OSINT-Lagemonitoring-System. +AUSGABESPRACHE: {output_language} +WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschreibungen (ae, oe, ue, ss). + +THEMA DIESER GRUPPE: {theme} + +ZU PRÜFENDE BESTEHENDE FAKTEN: +{facts_text} + +NEUE CLAIMS ZU PRÜFEN: +{new_claims_text} + +KONTEXT (neue Quellen): +{articles_text} + +ANWEISUNGEN: +Für JEDE Faktenaussage (bestehend UND neu): +1. Suche per WebSearch nach unabhängigen Bestätigungen oder Widerlegungen +2. Bewerte den Status: + - "established": 3+ unabhängige Quellen mit verifizierter URL + - "unverified": Nicht unabhängig verifizierbar + - "disputed": Verschiedene Positionen dokumentiert + - "developing": Faktenlage im Fluss +3. Dokumentiere die Evidenz mit konkreten, verifizierten URLs + +QUALITÄTSREGELN: +- "established" NUR bei mindestens 3 unabhängigen Quellen mit ECHTER URL +- KEINE Halluzinationen — nur tatsächlich per WebSearch gefundene Quellen +- Bestehende Evidenz BEIBEHALTEN, nur neue ergänzen + +Antworte AUSSCHLIESSLICH als JSON-Array: +[ + {{{{ + "id": 123, + "claim": "Die Faktenaussage...", + "status": "established|unverified|disputed|developing", + "sources_count": 3, + "evidence": "Bestätigt durch: Quelle1 (URL)\\nQuelle2 (URL)", + "is_notification": false + }}}} +] + +Für NEUE Fakten setze id auf null.""" + + +# --- Stopwords fuer Keyword-Extraktion --- +_STOPWORDS = frozenset({ + "der", "die", "das", "ein", "eine", "und", "oder", "von", "nach", "bei", "mit", + "wurde", "wird", "haben", "sein", "dass", "ist", "sind", "hat", "vor", "fuer", + "den", "dem", "des", "sich", "auf", "als", "auch", "noch", "nicht", "aber", + "ueber", "durch", "einer", "einem", "eines", "werden", "wurde", "waren", + "the", "and", "was", "has", "been", "have", "that", "with", "from", "for", + "are", "were", "this", "which", "into", "their", "than", "about", +}) + +STATUS_PRIORITY = { + "confirmed": 5, "established": 5, + "contradicted": 4, "disputed": 4, + "unconfirmed": 3, "unverified": 3, + "developing": 1, +} + +# Zwei-Phasen-Faktencheck: Konfiguration +MAX_FACTS_PER_VERIFY_GROUP = 8 # Max Fakten pro Verifikationsgruppe +TWOPHASE_MIN_FACTS = 25 # Ab dieser Anzahl bestehender Fakten wird der Zwei-Phasen-Ansatz genutzt + + +def normalize_claim(claim: str) -> str: + """Normalisiert einen Claim fuer Aehnlichkeitsvergleich.""" + c = claim.lower().strip() + c = c.replace("\u00e4", "ae").replace("\u00f6", "oe").replace("\u00fc", "ue").replace("\u00df", "ss") + c = re.sub(r'[^\w\s]', '', c) + c = re.sub(r'\s+', ' ', c).strip() + return c + + +def _keyword_set(text: str) -> set[str]: + """Extrahiert signifikante Woerter fuer Overlap-Vergleich.""" + words = set(normalize_claim(text).split()) + return {w for w in words if len(w) >= 4 and w not in _STOPWORDS} + + +def find_matching_claim(new_claim: str, existing_claims: list[dict], threshold: float = 0.75) -> dict | None: + """Findet den besten passenden bestehenden Claim per kombiniertem Scoring. + + Verwendet SequenceMatcher (70%) + Jaccard-Keyword-Overlap (30%) fuer robusteres Matching. + """ + norm_new = normalize_claim(new_claim) + if not norm_new: + return None + + kw_new = _keyword_set(new_claim) + best_match = None + best_score = 0.0 + + for existing in existing_claims: + norm_existing = normalize_claim(existing.get("claim", "")) + if not norm_existing: + continue + + # Fruehzeitiger Abbruch bei grossem Laengenunterschied + len_ratio = len(norm_new) / len(norm_existing) if norm_existing else 0 + if len_ratio > 2.5 or len_ratio < 0.4: + continue + + seq_ratio = SequenceMatcher(None, norm_new, norm_existing).ratio() + + kw_existing = _keyword_set(existing.get("claim", "")) + kw_union = kw_new | kw_existing + jaccard = len(kw_new & kw_existing) / len(kw_union) if kw_union else 0.0 + + combined = 0.7 * seq_ratio + 0.3 * jaccard + + if combined > best_score: + best_score = combined + best_match = existing + + if best_score >= threshold: + logger.debug( + f"Claim-Match ({best_score:.2f}): " + f"'{new_claim[:50]}...' -> '{best_match['claim'][:50]}...'" + ) + return best_match + return None + + +def deduplicate_new_facts(facts: list[dict], threshold: float = 0.70) -> list[dict]: + """Dedupliziert Fakten aus einer einzelnen LLM-Antwort vor dem DB-Insert. + + Clustert aehnliche Claims und behaelt pro Cluster den mit dem + hoechsten Status und den meisten Quellen. + """ + if not facts: + return [] + + clusters: list[list[dict]] = [] + for fact in facts: + matched_cluster = None + for cluster in clusters: + if find_matching_claim(fact.get("claim", ""), cluster, threshold=threshold): + matched_cluster = cluster + break + if matched_cluster is not None: + matched_cluster.append(fact) + else: + clusters.append([fact]) + + result = [] + for cluster in clusters: + best = max(cluster, key=lambda f: ( + STATUS_PRIORITY.get(f.get("status", "developing"), 0), + f.get("sources_count", 0), + )) + result.append(best) + + if len(result) < len(facts): + logger.info( + f"Fakten-Dedup: {len(facts)} -> {len(result)} " + f"(-{len(facts) - len(result)} Duplikate)" + ) + return result + + +class FactCheckerAgent: + """Prüft Fakten über Claude CLI gegen unabhängige Quellen.""" + + def _format_articles_text(self, articles: list[dict], max_articles: int = 20) -> str: + """Formatiert Artikel als Text für den Prompt.""" + articles_text = "" + for i, article in enumerate(articles[:max_articles]): + articles_text += f"\n--- Meldung {i+1} ---\n" + articles_text += f"Quelle: {article.get('source', 'Unbekannt')}\n" + source_url = article.get('source_url', '') + if source_url: + articles_text += f"URL: {source_url}\n" + headline = article.get('headline_de') or article.get('headline', '') + articles_text += f"Überschrift: {headline}\n" + content = article.get('content_de') or article.get('content_original', '') + if content: + articles_text += f"Inhalt: {content[:500]}\n" + return articles_text + + def _format_existing_facts(self, facts: list[dict]) -> str: + """Formatiert bestehende Fakten als Text für den inkrementellen Prompt.""" + if not facts: + return "Keine bisherigen Fakten" + lines = [] + for fc in facts: + status = fc.get("status", "developing") + claim = fc.get("claim", "") + sources = fc.get("sources_count", 0) + lines.append(f"- [{status}] ({sources} Quellen) {claim}") + return "\n".join(lines) + + async def check(self, title: str, articles: list[dict], incident_type: str = "adhoc") -> tuple[list[dict], ClaudeUsage | None]: + """Führt vollständigen Faktencheck durch (erster Refresh).""" + if not articles: + return [], None + + articles_text = self._format_articles_text(articles) + + from config import OUTPUT_LANGUAGE + template = RESEARCH_FACTCHECK_PROMPT_TEMPLATE if incident_type == "research" else FACTCHECK_PROMPT_TEMPLATE + prompt = template.format( + title=title, + articles_text=articles_text, + output_language=OUTPUT_LANGUAGE, + ) + + try: + result, usage = await call_claude(prompt) + facts = self._parse_response(result) + logger.info(f"Faktencheck: {len(facts)} Fakten geprüft") + return facts, usage + except TimeoutError: + raise # Timeout nach oben durchreichen fuer Retry im Orchestrator + except Exception as e: + logger.error(f"Faktencheck-Fehler: {e}") + return [], None + + async def check_incremental( + self, + title: str, + new_articles: list[dict], + existing_facts: list[dict], + incident_type: str = "adhoc", + ) -> tuple[list[dict], ClaudeUsage | None]: + """Inkrementeller Faktencheck: Prüft nur neue Artikel gegen bestehende Fakten. + + Spart Tokens, da nur neue Artikel + Zusammenfassung der bestehenden Fakten gesendet werden. + """ + if not new_articles: + logger.info("Inkrementeller Faktencheck übersprungen: keine neuen Artikel") + return [], None + + articles_text = self._format_articles_text(new_articles, max_articles=15) + existing_facts_text = self._format_existing_facts(existing_facts) + + from config import OUTPUT_LANGUAGE + if incident_type == "research": + template = INCREMENTAL_RESEARCH_FACTCHECK_PROMPT_TEMPLATE + else: + template = INCREMENTAL_FACTCHECK_PROMPT_TEMPLATE + + prompt = template.format( + title=title, + articles_text=articles_text, + existing_facts_text=existing_facts_text, + output_language=OUTPUT_LANGUAGE, + ) + + try: + result, usage = await call_claude(prompt) + facts = self._parse_response(result) + logger.info(f"Inkrementeller Faktencheck: {len(facts)} Fakten (neu + aktualisiert)") + return facts, usage + except TimeoutError: + raise # Timeout nach oben durchreichen fuer Retry im Orchestrator + except Exception as e: + logger.error(f"Inkrementeller Faktencheck-Fehler: {e}") + return [], None + + async def check_incremental_twophase( + self, + title: str, + new_articles: list[dict], + existing_facts: list[dict], + incident_type: str = "adhoc", + ) -> tuple[list[dict], ClaudeUsage | None]: + """Zwei-Phasen inkrementeller Faktencheck: Haiku-Triage + parallele Opus-Verifikation. + + Phase 1: Haiku identifiziert betroffene Fakten und neue Claims (schnell, günstig) + Phase 2: Opus verifiziert nur die betroffenen Fakten parallel per WebSearch + + Fällt bei Triage-Fehler auf den Standard-check_incremental zurück. + """ + if not new_articles: + logger.info("Zwei-Phasen-Faktencheck übersprungen: keine neuen Artikel") + return [], None + + usage_acc = UsageAccumulator() + + # --- Phase 1: Triage (Haiku) --- + logger.info(f"Zwei-Phasen-Faktencheck Phase 1: Triage ({len(existing_facts)} Fakten, {len(new_articles)} Artikel)") + + triage_facts_text = self._format_facts_for_triage(existing_facts) + articles_text = self._format_articles_text(new_articles, max_articles=15) + + from config import OUTPUT_LANGUAGE, CLAUDE_MODEL_FAST + triage_prompt = TRIAGE_PROMPT_TEMPLATE.format( + output_language=OUTPUT_LANGUAGE, + fact_count=len(existing_facts), + existing_facts_text=triage_facts_text, + article_count=len(new_articles), + articles_text=articles_text, + max_per_group=MAX_FACTS_PER_VERIFY_GROUP, + ) + + try: + triage_result, triage_usage = await call_claude( + triage_prompt, tools=None, model=CLAUDE_MODEL_FAST + ) + if triage_usage: + usage_acc.add(triage_usage) + except Exception as e: + logger.warning(f"Triage fehlgeschlagen, Fallback auf Standard-Faktencheck: {e}") + return await self.check_incremental(title, new_articles, existing_facts, incident_type) + + # Triage-Ergebnis parsen + triage = self._parse_triage_response(triage_result) + if not triage: + logger.warning("Triage-Antwort nicht parsbar, Fallback auf Standard-Faktencheck") + return await self.check_incremental(title, new_articles, existing_facts, incident_type) + + affected_ids = set(triage.get("affected_fact_ids", [])) + new_claims = triage.get("new_claims", []) + groups = triage.get("groups", []) + + if not groups: + logger.warning("Triage hat keine Gruppen identifiziert, Fallback auf Standard-Faktencheck") + return await self.check_incremental(title, new_articles, existing_facts, incident_type) + + logger.info( + f"Triage: {len(affected_ids)} betroffene Fakten, {len(new_claims)} neue Claims, {len(groups)} Gruppen" + ) + + # --- Phase 2: Parallele Verifikation (Opus) --- + fact_lookup = {f["id"]: f for f in existing_facts} + + async def _verify_one_group(group: dict) -> tuple[list[dict], ClaudeUsage | None]: + """Verifiziert eine einzelne Faktengruppe.""" + group_fact_ids = group.get("fact_ids", []) + group_new_indices = group.get("new_claim_indices", []) + theme = group.get("theme", "Allgemein") + + group_facts = [fact_lookup[fid] for fid in group_fact_ids if fid in fact_lookup] + group_claims = [new_claims[i] for i in group_new_indices if i < len(new_claims)] + + if not group_facts and not group_claims: + return [], None + + # Fakten formatieren (mit Evidenz für Kontext) + facts_text = self._format_facts_for_verify(group_facts) if group_facts else "Keine bestehenden Fakten in dieser Gruppe." + new_claims_text = "\n".join(f"- {c}" for c in group_claims) if group_claims else "Keine neuen Claims." + + if incident_type == "research": + template = VERIFY_GROUP_RESEARCH_PROMPT_TEMPLATE + else: + template = VERIFY_GROUP_PROMPT_TEMPLATE + + prompt = template.format( + output_language=OUTPUT_LANGUAGE, + theme=theme, + facts_text=facts_text, + new_claims_text=new_claims_text, + articles_text=articles_text, + ) + + try: + result, v_usage = await call_claude(prompt) + facts = self._parse_response(result) + logger.info(f"Gruppe '{theme}': {len(facts)} Fakten geprüft") + return facts, v_usage + except TimeoutError: + logger.error(f"Gruppe '{theme}': Timeout") + return [], None + except Exception as e: + logger.error(f"Gruppe '{theme}': Fehler: {e}") + return [], None + + logger.info(f"Zwei-Phasen-Faktencheck Phase 2: {len(groups)} Gruppen parallel verifizieren") + + # Alle Gruppen parallel starten + group_results = await asyncio.gather( + *(_verify_one_group(g) for g in groups) + ) + + # Ergebnisse zusammenführen + all_facts = [] + for facts, v_usage in group_results: + all_facts.extend(facts) + if v_usage: + usage_acc.add(v_usage) + + # Kombinierte Usage erstellen + combined_usage = ClaudeUsage( + input_tokens=usage_acc.input_tokens, + output_tokens=usage_acc.output_tokens, + cache_creation_tokens=usage_acc.cache_creation_tokens, + cache_read_tokens=usage_acc.cache_read_tokens, + cost_usd=usage_acc.total_cost_usd, + duration_ms=0, + ) + + logger.info( + f"Zwei-Phasen-Faktencheck abgeschlossen: {len(all_facts)} Fakten " + f"({usage_acc.call_count} API-Calls, ${usage_acc.total_cost_usd:.4f})" + ) + + return all_facts, combined_usage + + def _format_facts_for_triage(self, facts: list[dict]) -> str: + """Formatiert Fakten kompakt mit IDs für die Triage.""" + lines = [] + for f in facts: + status = f.get("status", "developing") + claim = f.get("claim", "") + sources = f.get("sources_count", 0) + fid = f.get("id", "?") + lines.append(f"- [ID:{fid}] [{status}] ({sources} Quellen) {claim}") + return "\n".join(lines) + + def _format_facts_for_verify(self, facts: list[dict]) -> str: + """Formatiert Fakten detailliert für die Verifikation.""" + parts = [] + for f in facts: + evidence = (f.get("evidence") or "")[:300] + parts.append( + f"ID: {f.get('id', '?')}\n" + f"Status: {f.get('status', 'developing')}\n" + f"Claim: {f.get('claim', '')}\n" + f"Bisherige Evidenz: {evidence}\n" + f"Quellen: {f.get('sources_count', 0)}" + ) + return "\n---\n".join(parts) + + def _parse_triage_response(self, response: str) -> dict | None: + """Parst die Triage-Antwort als JSON-Objekt.""" + try: + data = json.loads(response) + if isinstance(data, dict) and "groups" in data: + return data + except json.JSONDecodeError: + pass + + match = re.search(r'\{.*\}', response, re.DOTALL) + if match: + try: + data = json.loads(match.group()) + if isinstance(data, dict) and "groups" in data: + return data + except json.JSONDecodeError: + pass + + logger.warning("Konnte Triage-Antwort nicht als JSON parsen") + return None + + def _validate_facts(self, facts: list[dict]) -> list[dict]: + """Validiert Fakten: confirmed/established ohne URL wird herabgestuft.""" + url_pattern = re.compile(r'https?://') + for fact in facts: + status = fact.get("status", "") + evidence = fact.get("evidence") or "" + if status in ("confirmed", "established") and not url_pattern.search(evidence): + old_status = status + fact["status"] = "unconfirmed" if status == "confirmed" else "unverified" + logger.warning( + f"Fakt herabgestuft ({old_status} -> {fact['status']}): " + f"keine URL in Evidenz: '{fact.get('claim', '')[:60]}...'" + ) + return facts + + def _parse_response(self, response: str) -> list[dict]: + """Parst die Claude-Antwort als JSON-Array.""" + try: + data = json.loads(response) + if isinstance(data, list): + return self._validate_facts(data) + except json.JSONDecodeError: + pass + + match = re.search(r'\[.*\]', response, re.DOTALL) + if match: + try: + data = json.loads(match.group()) + if isinstance(data, list): + return self._validate_facts(data) + except json.JSONDecodeError: + pass + + logger.warning("Konnte Faktencheck-Antwort nicht als JSON parsen") + return [] diff --git a/src/agents/orchestrator.py b/src/agents/orchestrator.py index 82c7734..6415ee6 100644 --- a/src/agents/orchestrator.py +++ b/src/agents/orchestrator.py @@ -9,7 +9,7 @@ from typing import Optional from urllib.parse import urlparse, urlunparse from agents.claude_client import UsageAccumulator -from agents.factchecker import find_matching_claim, deduplicate_new_facts +from agents.factchecker import find_matching_claim, deduplicate_new_facts, TWOPHASE_MIN_FACTS from source_rules import ( _detect_category, _extract_domain, @@ -764,58 +764,108 @@ class AgentOrchestrator: except Exception as e: logger.warning(f"Quellen-Statistiken konnten nicht aktualisiert werden: {e}") - # Schritt 3: Analyse und Zusammenfassung + # Schritt 3+4: Analyse und Faktencheck PARALLEL if new_count > 0 or not previous_summary: - analyzer = AnalyzerAgent() + is_first_summary = not previous_summary - # Inkrementelle Analyse wenn Lagebild bereits existiert und neue Artikel vorhanden - if previous_summary and new_count > 0: - logger.info(f"Inkrementelle Analyse: {new_count} neue Artikel zum bestehenden Lagebild") - analysis, analysis_usage = await analyzer.analyze_incremental( - title, description, new_articles_for_analysis, - previous_summary, previous_sources_json, incident_type, + # Snapshot des alten Lagebilds sichern BEVOR parallele Verarbeitung startet + if previous_summary: + cursor = await db.execute( + "SELECT COUNT(*) as cnt FROM articles WHERE incident_id = ?", + (incident_id,), ) - else: - # Erstanalyse: Alle Artikel laden - logger.info("Erstanalyse: Alle Artikel werden analysiert") + snap_articles = (await cursor.fetchone())["cnt"] + cursor = await db.execute( + "SELECT COUNT(*) as cnt FROM fact_checks WHERE incident_id = ?", + (incident_id,), + ) + snap_fcs = (await cursor.fetchone())["cnt"] + await db.execute( + """INSERT INTO incident_snapshots + (incident_id, summary, sources_json, + article_count, fact_check_count, refresh_log_id, created_at, tenant_id) + VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", + (incident_id, previous_summary, previous_sources_json, + snap_articles, snap_fcs, log_id, now, tenant_id), + ) + await db.commit() + + # Bestehende Fakten und alle Artikel vorladen (für parallele Tasks) + cursor = await db.execute( + "SELECT id, claim, status, sources_count FROM fact_checks WHERE incident_id = ?", + (incident_id,), + ) + existing_facts = [dict(row) for row in await cursor.fetchall()] + + # Alle Artikel vorladen für Erstanalyse/Erstcheck + all_articles_preloaded = None + if not previous_summary or new_count == 0: cursor = await db.execute( "SELECT * FROM articles WHERE incident_id = ? ORDER BY collected_at DESC", (incident_id,), ) - all_articles = [dict(row) for row in await cursor.fetchall()] - analysis, analysis_usage = await analyzer.analyze(title, description, all_articles, incident_type) + all_articles_preloaded = [dict(row) for row in await cursor.fetchall()] + if self._ws_manager: + await self._ws_manager.broadcast_for_incident({ + "type": "status_update", + "incident_id": incident_id, + "data": {"status": "analyzing", "detail": "Analyse und Faktencheck laufen parallel...", "started_at": now_utc}, + }, visibility, created_by, tenant_id) + + # --- Analyse-Task --- + async def _do_analysis(): + analyzer = AnalyzerAgent() + if previous_summary and new_count > 0: + logger.info(f"Inkrementelle Analyse: {new_count} neue Artikel zum bestehenden Lagebild") + return await analyzer.analyze_incremental( + title, description, new_articles_for_analysis, + previous_summary, previous_sources_json, incident_type, + ) + else: + logger.info("Erstanalyse: Alle Artikel werden analysiert") + return await analyzer.analyze(title, description, all_articles_preloaded, incident_type) + + # --- Faktencheck-Task --- + async def _do_factcheck(): + factchecker = FactCheckerAgent() + if existing_facts and new_count > 0: + if len(existing_facts) >= TWOPHASE_MIN_FACTS: + logger.info( + f"Zwei-Phasen-Faktencheck: {new_count} neue Artikel, " + f"{len(existing_facts)} bestehende Fakten" + ) + return await factchecker.check_incremental_twophase( + title, new_articles_for_analysis, existing_facts, incident_type, + ) + else: + logger.info( + f"Inkrementeller Faktencheck: {new_count} neue Artikel, " + f"{len(existing_facts)} bestehende Fakten" + ) + return await factchecker.check_incremental( + title, new_articles_for_analysis, existing_facts, incident_type, + ) + else: + return await factchecker.check(title, all_articles_preloaded or [], incident_type) + + # Beide Tasks PARALLEL starten + logger.info("Starte Analyse und Faktencheck parallel...") + analysis_result, factcheck_result = await asyncio.gather( + _do_analysis(), + _do_factcheck(), + ) + + analysis, analysis_usage = analysis_result + fact_checks, fc_usage = factcheck_result + + # --- Analyse-Ergebnisse verarbeiten --- if analysis_usage: usage_acc.add(analysis_usage) if analysis: - is_first_summary = not previous_summary - - # Snapshot des alten Lagebilds sichern (nur wenn schon eins existiert) - if previous_summary: - cursor = await db.execute( - "SELECT COUNT(*) as cnt FROM articles WHERE incident_id = ?", - (incident_id,), - ) - snap_articles = (await cursor.fetchone())["cnt"] - cursor = await db.execute( - "SELECT COUNT(*) as cnt FROM fact_checks WHERE incident_id = ?", - (incident_id,), - ) - snap_fcs = (await cursor.fetchone())["cnt"] - await db.execute( - """INSERT INTO incident_snapshots - (incident_id, summary, sources_json, - article_count, fact_check_count, refresh_log_id, created_at, tenant_id) - VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", - (incident_id, previous_summary, previous_sources_json, - snap_articles, snap_fcs, log_id, now, tenant_id), - ) - - # sources_json aus der Analyse extrahieren und speichern sources = analysis.get("sources", []) sources_json = json.dumps(sources, ensure_ascii=False) if sources else None - new_summary = analysis.get("summary", "") await db.execute( @@ -855,50 +905,16 @@ class AgentOrchestrator: await db.commit() - # Checkpoint 2: Cancel prüfen nach Analyse + # Cancel-Check nach paralleler Verarbeitung self._check_cancelled(incident_id) - if self._ws_manager: - await self._ws_manager.broadcast_for_incident({ - "type": "status_update", - "incident_id": incident_id, - "data": {"status": "factchecking", "detail": "Prüft Fakten gegen unabhängige Quellen...", "started_at": now_utc}, - }, visibility, created_by, tenant_id) - - # Schritt 4: Faktencheck - factchecker = FactCheckerAgent() - - # Bestehende Fakten laden für inkrementellen Check - cursor = await db.execute( - "SELECT id, claim, status, sources_count FROM fact_checks WHERE incident_id = ?", - (incident_id,), - ) - existing_facts = [dict(row) for row in await cursor.fetchall()] - - if existing_facts and new_count > 0: - # Inkrementeller Faktencheck: nur neue Artikel + bestehende Fakten - logger.info(f"Inkrementeller Faktencheck: {new_count} neue Artikel, {len(existing_facts)} bestehende Fakten") - fact_checks, fc_usage = await factchecker.check_incremental( - title, new_articles_for_analysis, existing_facts, incident_type, - ) - else: - # Erstcheck: alle Artikel - cursor = await db.execute( - "SELECT * FROM articles WHERE incident_id = ? ORDER BY collected_at DESC", - (incident_id,), - ) - all_articles_for_fc = [dict(row) for row in await cursor.fetchall()] - fact_checks, fc_usage = await factchecker.check(title, all_articles_for_fc, incident_type) - + # --- Faktencheck-Ergebnisse verarbeiten --- # Pre-Dedup: Duplikate aus LLM-Antwort entfernen fact_checks = deduplicate_new_facts(fact_checks) if fc_usage: usage_acc.add(fc_usage) - # Checkpoint 3: Cancel prüfen nach Faktencheck - self._check_cancelled(incident_id) - # Prüfen ob dies der erste Refresh ist (keine vorherigen Faktenchecks) is_first_refresh = len(existing_facts) == 0