Dateien
AegisSight-Monitor/src/agents/factchecker.py

782 Zeilen
31 KiB
Python

"""Factchecker-Agent: Prüft Fakten gegen mehrere unabhängige Quellen."""
import asyncio
import json
import logging
import re
from difflib import SequenceMatcher
from agents.claude_client import call_claude, ClaudeUsage, UsageAccumulator
logger = logging.getLogger("osint.factchecker")
FACTCHECK_PROMPT_TEMPLATE = """Du bist ein Faktencheck-Agent für ein OSINT-Lagemonitoring-System.
AUSGABESPRACHE: {output_language}
WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschreibungen (ae, oe, ue, ss).
VORFALL: {title}
VORLIEGENDE MELDUNGEN:
{articles_text}
STRENGE REGELN - KEINE HALLUZINATIONEN:
- Du darfst NUR Fakten bewerten, die direkt aus den oben übergebenen Meldungen stammen
- KEINE Fakten aus deinem Trainingskorpus - NUR aus den übergebenen Meldungen + WebSearch
- Nutze WebSearch um jeden Claim gegen mindestens 1 weitere unabhängige Quelle zu prüfen
- Rufe die gefundenen URLs per WebFetch ab um den Inhalt zu verifizieren
- Nur wenn du den Claim in der tatsächlich abgerufenen Quelle findest, darfst du ihn als bestätigt markieren
- Jeder Claim MUSS eine konkrete Quellen-URL als Beleg enthalten
- "confirmed" erst bei 2+ unabhängigen Quellen mit überprüfbarer URL
- Lieber "unconfirmed" als falsch bestätigt
AUFTRAG:
1. Identifiziere die 5-10 wichtigsten Faktenaussagen aus den Meldungen
2. Prüfe jeden Claim aktiv per WebSearch gegen mindestens eine weitere unabhängige Quelle
3. Kategorisiere jede Aussage:
- "confirmed": Durch 2+ unabhängige seriöse Quellen mit überprüfbarer URL bestätigt
- "unconfirmed": Nur 1 Quelle oder nicht unabhängig verifizierbar
- "contradicted": Widersprüchliche Informationen aus verschiedenen Quellen
- "developing": Situation noch unklar, entwickelt sich
4. Markiere WICHTIGE NEUE Entwicklungen mit is_notification: true
Antworte AUSSCHLIESSLICH als JSON-Array. Jedes Element hat:
- "claim": Die Faktenaussage auf {output_language}
- "status": "confirmed" | "unconfirmed" | "contradicted" | "developing"
- "sources_count": Anzahl unabhängiger Quellen mit überprüfbarer URL
- "evidence": Begründung MIT konkreten Quellen-URLs als Beleg (z.B. "Bestätigt durch: tagesschau.de (URL), Reuters (URL)")
- "is_notification": true/false (nur bei wichtigen Entwicklungen true)
Antworte NUR mit dem JSON-Array. Keine Einleitung, keine Erklärung."""
RESEARCH_FACTCHECK_PROMPT_TEMPLATE = """Du bist ein Faktencheck-Agent für eine Hintergrundrecherche in einem OSINT-Lagemonitoring-System.
AUSGABESPRACHE: {output_language}
WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschreibungen (ae, oe, ue, ss).
THEMA: {title}
VORLIEGENDE QUELLEN:
{articles_text}
STRENGE REGELN - KEINE HALLUZINATIONEN:
- Du darfst NUR Fakten bewerten, die direkt aus den oben übergebenen Quellen stammen
- KEINE Fakten aus deinem Trainingskorpus - NUR aus den übergebenen Quellen + WebSearch
- Nutze WebSearch um jeden Claim gegen mindestens 1 weitere unabhängige Quelle zu prüfen
- Rufe die gefundenen URLs per WebFetch ab um den Inhalt zu verifizieren
- Nur wenn du den Claim in der tatsächlich abgerufenen Quelle findest, darfst du ihn als gesichert markieren
- Jeder Claim MUSS eine konkrete Quellen-URL als Beleg enthalten
- Lieber "unverified" als falsch bestätigt
AUFTRAG:
Fokus: "Was sind die gesicherten Fakten zu diesem Thema?"
1. Identifiziere die 5-10 wichtigsten Faktenaussagen aus den Quellen
2. Prüfe jeden Claim aktiv per WebSearch gegen weitere unabhängige Quellen
3. Kategorisiere jede Aussage:
- "established": Breit dokumentierter, gesicherter Fakt (3+ unabhängige Quellen mit URL)
- "disputed": Umstrittener Sachverhalt, verschiedene Positionen dokumentiert
- "unverified": Einzelbehauptung, nicht unabhängig verifizierbar
- "developing": Aktuelle Entwicklung, Faktenlage noch im Fluss
4. Markiere WICHTIGE Erkenntnisse mit is_notification: true
Antworte AUSSCHLIESSLICH als JSON-Array. Jedes Element hat:
- "claim": Die Faktenaussage auf {output_language}
- "status": "established" | "disputed" | "unverified" | "developing"
- "sources_count": Anzahl unabhängiger Quellen mit überprüfbarer URL
- "evidence": Begründung MIT konkreten Quellen-URLs als Beleg
- "is_notification": true/false
Antworte NUR mit dem JSON-Array. Keine Einleitung, keine Erklärung."""
# --- Inkrementelle Faktencheck-Prompts (für Folge-Refreshes) ---
INCREMENTAL_FACTCHECK_PROMPT_TEMPLATE = """Du bist ein Faktencheck-Agent für ein OSINT-Lagemonitoring-System.
AUSGABESPRACHE: {output_language}
WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschreibungen (ae, oe, ue, ss).
VORFALL: {title}
BEREITS GEPRÜFTE FAKTEN:
{existing_facts_text}
NEUE MELDUNGEN:
{articles_text}
STRENGE REGELN - KEINE HALLUZINATIONEN:
- Du darfst NUR Fakten bewerten, die aus den Meldungen oder bereits geprüften Fakten stammen
- KEINE Fakten aus deinem Trainingskorpus
- Nutze WebSearch zur Verifikation
- Rufe gefundene URLs per WebFetch ab
AUFTRAG:
1. Prüfe ob die neuen Meldungen bereits geprüfte Fakten BESTÄTIGEN, WIDERLEGEN oder ERGÄNZEN
2. Aktualisiere den Status bestehender Fakten wenn nötig (z.B. "unconfirmed""confirmed")
3. Identifiziere 3-5 NEUE Faktenaussagen aus den neuen Meldungen
4. Prüfe neue Claims per WebSearch gegen unabhängige Quellen
5. Markiere wichtige Statusänderungen und neue Entwicklungen mit is_notification: true
Status-Kategorien:
- "confirmed": 2+ unabhängige seriöse Quellen mit URL
- "unconfirmed": Nur 1 Quelle
- "contradicted": Widersprüchliche Informationen
- "developing": Situation unklar
Antworte AUSSCHLIESSLICH als JSON-Array mit ALLEN Fakten (bestehende aktualisiert + neue).
Jedes Element hat:
- "claim": Die Faktenaussage auf {output_language}
- "status": "confirmed" | "unconfirmed" | "contradicted" | "developing"
- "sources_count": Anzahl unabhängiger Quellen
- "evidence": Begründung MIT konkreten Quellen-URLs
- "is_notification": true/false
Antworte NUR mit dem JSON-Array."""
INCREMENTAL_RESEARCH_FACTCHECK_PROMPT_TEMPLATE = """Du bist ein Faktencheck-Agent für eine Hintergrundrecherche in einem OSINT-Lagemonitoring-System.
AUSGABESPRACHE: {output_language}
WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschreibungen (ae, oe, ue, ss).
THEMA: {title}
BEREITS GEPRÜFTE FAKTEN:
{existing_facts_text}
NEUE QUELLEN:
{articles_text}
STRENGE REGELN - KEINE HALLUZINATIONEN:
- Du darfst NUR Fakten bewerten, die aus den Quellen oder bereits geprüften Fakten stammen
- KEINE Fakten aus deinem Trainingskorpus
- Nutze WebSearch zur Verifikation
- Rufe gefundene URLs per WebFetch ab
AUFTRAG:
1. Prüfe ob die neuen Quellen bereits geprüfte Fakten bestätigen, widerlegen oder ergänzen
2. Aktualisiere den Status bestehender Fakten wenn nötig
3. Identifiziere 3-5 NEUE Faktenaussagen aus den neuen Quellen
4. Prüfe neue Claims per WebSearch
Status-Kategorien:
- "established": 3+ unabhängige Quellen mit URL
- "disputed": Verschiedene Positionen dokumentiert
- "unverified": Nicht unabhängig verifizierbar
- "developing": Faktenlage im Fluss
Antworte AUSSCHLIESSLICH als JSON-Array mit ALLEN Fakten (bestehende aktualisiert + neue).
Jedes Element hat:
- "claim": Die Faktenaussage auf {output_language}
- "status": "established" | "disputed" | "unverified" | "developing"
- "sources_count": Anzahl unabhängiger Quellen
- "evidence": Begründung MIT konkreten Quellen-URLs
- "is_notification": true/false
Antworte NUR mit dem JSON-Array."""
# --- Zwei-Phasen-Faktencheck: Prompt-Templates ---
TRIAGE_PROMPT_TEMPLATE = """Du bist ein Triage-System für Faktenchecks eines OSINT-Lagemonitoring-Systems.
AUSGABESPRACHE: {output_language}
BESTEHENDE FAKTENAUSSAGEN ({fact_count} Stück):
{existing_facts_text}
NEUE NACHRICHTENARTIKEL ({article_count} Stück):
{articles_text}
AUFGABE:
Analysiere die neuen Artikel und identifiziere:
1. BETROFFENE BESTEHENDE FAKTEN: Welche der bestehenden Fakten könnten durch die neuen Artikel
betroffen sein? (neue Bestätigung, Widerlegung, neue Evidenz, Status-Update nötig)
2. NEUE FAKTENAUSSAGEN: Welche neuen prüfbaren Faktenaussagen enthalten die Artikel,
die noch nicht in den bestehenden Fakten erfasst sind? (3-5 neue Claims)
3. GRUPPIERUNG: Gruppiere verwandte Fakten thematisch für die parallele Batch-Verarbeitung.
Verwandte Fakten MÜSSEN in derselben Gruppe sein. Max {max_per_group} Fakten pro Gruppe.
WICHTIG:
- Sei GRÜNDLICH — übersehe keine semantischen Verbindungen
- Auch indirekte Verbindungen beachten (diplomatisch <-> militärisch <-> wirtschaftlich)
- Alle developing/unconfirmed/unverified Fakten IMMER einbeziehen
- Max {max_per_group} Fakten pro Gruppe (teile große Gruppen auf)
Antworte AUSSCHLIESSLICH als JSON:
{{{{
"affected_fact_ids": [int, ...],
"new_claims": ["Prüfbare Faktenaussage als vollständiger Satz", ...],
"groups": [
{{{{
"id": 1,
"theme": "Themenbezeichnung",
"fact_ids": [int, ...],
"new_claim_indices": [int, ...]
}}}}
],
"total_affected": int,
"reasoning": "Kurze Begründung"
}}}}"""
VERIFY_GROUP_PROMPT_TEMPLATE = """Du prüfst Faktenaussagen gegen unabhängige Quellen in einem OSINT-Lagemonitoring-System.
AUSGABESPRACHE: {output_language}
WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschreibungen (ae, oe, ue, ss).
THEMA DIESER GRUPPE: {theme}
ZU PRÜFENDE BESTEHENDE FAKTEN:
{facts_text}
NEUE CLAIMS ZU PRÜFEN:
{new_claims_text}
KONTEXT (neue Nachrichtenartikel):
{articles_text}
ANWEISUNGEN:
Für JEDE Faktenaussage (bestehend UND neu):
1. Suche per WebSearch nach unabhängigen Bestätigungen oder Widerlegungen
2. Bewerte den Status:
- "confirmed": Mindestens 2 unabhängige Quellen mit verifizierter URL bestätigen
- "unconfirmed": Nicht genug unabhängige Bestätigung
- "contradicted": Glaubwürdige Quellen widersprechen
- "developing": Lage noch im Fluss
3. Dokumentiere die Evidenz mit konkreten, verifizierten URLs
QUALITÄTSREGELN:
- "confirmed" NUR bei mindestens 2 unabhängigen Quellen mit ECHTER URL
- KEINE Halluzinationen — nur tatsächlich per WebSearch gefundene Quellen
- Bestehende Evidenz BEIBEHALTEN, nur neue ergänzen
- Jeder Claim MUSS eine konkrete Quellen-URL als Beleg enthalten
Antworte AUSSCHLIESSLICH als JSON-Array:
[
{{{{
"id": 123,
"claim": "Die Faktenaussage...",
"status": "confirmed|unconfirmed|contradicted|developing",
"sources_count": 3,
"evidence": "Bestätigt durch: Quelle1 (URL)\\nQuelle2 (URL)",
"is_notification": false
}}}}
]
Für NEUE Fakten setze id auf null."""
VERIFY_GROUP_RESEARCH_PROMPT_TEMPLATE = """Du prüfst Faktenaussagen gegen unabhängige Quellen in einem OSINT-Lagemonitoring-System.
AUSGABESPRACHE: {output_language}
WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschreibungen (ae, oe, ue, ss).
THEMA DIESER GRUPPE: {theme}
ZU PRÜFENDE BESTEHENDE FAKTEN:
{facts_text}
NEUE CLAIMS ZU PRÜFEN:
{new_claims_text}
KONTEXT (neue Quellen):
{articles_text}
ANWEISUNGEN:
Für JEDE Faktenaussage (bestehend UND neu):
1. Suche per WebSearch nach unabhängigen Bestätigungen oder Widerlegungen
2. Bewerte den Status:
- "established": 3+ unabhängige Quellen mit verifizierter URL
- "unverified": Nicht unabhängig verifizierbar
- "disputed": Verschiedene Positionen dokumentiert
- "developing": Faktenlage im Fluss
3. Dokumentiere die Evidenz mit konkreten, verifizierten URLs
QUALITÄTSREGELN:
- "established" NUR bei mindestens 3 unabhängigen Quellen mit ECHTER URL
- KEINE Halluzinationen — nur tatsächlich per WebSearch gefundene Quellen
- Bestehende Evidenz BEIBEHALTEN, nur neue ergänzen
Antworte AUSSCHLIESSLICH als JSON-Array:
[
{{{{
"id": 123,
"claim": "Die Faktenaussage...",
"status": "established|unverified|disputed|developing",
"sources_count": 3,
"evidence": "Bestätigt durch: Quelle1 (URL)\\nQuelle2 (URL)",
"is_notification": false
}}}}
]
Für NEUE Fakten setze id auf null."""
# --- Stopwords fuer Keyword-Extraktion ---
_STOPWORDS = frozenset({
"der", "die", "das", "ein", "eine", "und", "oder", "von", "nach", "bei", "mit",
"wurde", "wird", "haben", "sein", "dass", "ist", "sind", "hat", "vor", "fuer",
"den", "dem", "des", "sich", "auf", "als", "auch", "noch", "nicht", "aber",
"ueber", "durch", "einer", "einem", "eines", "werden", "wurde", "waren",
"the", "and", "was", "has", "been", "have", "that", "with", "from", "for",
"are", "were", "this", "which", "into", "their", "than", "about",
})
STATUS_PRIORITY = {
"confirmed": 5, "established": 5,
"contradicted": 4, "disputed": 4,
"unconfirmed": 3, "unverified": 3,
"developing": 1,
}
# Zwei-Phasen-Faktencheck: Konfiguration
MAX_FACTS_PER_VERIFY_GROUP = 8 # Max Fakten pro Verifikationsgruppe
TWOPHASE_MIN_FACTS = 25 # Ab dieser Anzahl bestehender Fakten wird der Zwei-Phasen-Ansatz genutzt
def normalize_claim(claim: str) -> str:
"""Normalisiert einen Claim fuer Aehnlichkeitsvergleich."""
c = claim.lower().strip()
c = c.replace("\u00e4", "ae").replace("\u00f6", "oe").replace("\u00fc", "ue").replace("\u00df", "ss")
c = re.sub(r'[^\w\s]', '', c)
c = re.sub(r'\s+', ' ', c).strip()
return c
def _keyword_set(text: str) -> set[str]:
"""Extrahiert signifikante Woerter fuer Overlap-Vergleich."""
words = set(normalize_claim(text).split())
return {w for w in words if len(w) >= 4 and w not in _STOPWORDS}
def find_matching_claim(new_claim: str, existing_claims: list[dict], threshold: float = 0.75) -> dict | None:
"""Findet den besten passenden bestehenden Claim per kombiniertem Scoring.
Verwendet SequenceMatcher (70%) + Jaccard-Keyword-Overlap (30%) fuer robusteres Matching.
"""
norm_new = normalize_claim(new_claim)
if not norm_new:
return None
kw_new = _keyword_set(new_claim)
best_match = None
best_score = 0.0
for existing in existing_claims:
norm_existing = normalize_claim(existing.get("claim", ""))
if not norm_existing:
continue
# Fruehzeitiger Abbruch bei grossem Laengenunterschied
len_ratio = len(norm_new) / len(norm_existing) if norm_existing else 0
if len_ratio > 2.5 or len_ratio < 0.4:
continue
seq_ratio = SequenceMatcher(None, norm_new, norm_existing).ratio()
kw_existing = _keyword_set(existing.get("claim", ""))
kw_union = kw_new | kw_existing
jaccard = len(kw_new & kw_existing) / len(kw_union) if kw_union else 0.0
combined = 0.7 * seq_ratio + 0.3 * jaccard
if combined > best_score:
best_score = combined
best_match = existing
if best_score >= threshold:
logger.debug(
f"Claim-Match ({best_score:.2f}): "
f"'{new_claim[:50]}...' -> '{best_match['claim'][:50]}...'"
)
return best_match
return None
def deduplicate_new_facts(facts: list[dict], threshold: float = 0.70) -> list[dict]:
"""Dedupliziert Fakten aus einer einzelnen LLM-Antwort vor dem DB-Insert.
Clustert aehnliche Claims und behaelt pro Cluster den mit dem
hoechsten Status und den meisten Quellen.
"""
if not facts:
return []
clusters: list[list[dict]] = []
for fact in facts:
matched_cluster = None
for cluster in clusters:
if find_matching_claim(fact.get("claim", ""), cluster, threshold=threshold):
matched_cluster = cluster
break
if matched_cluster is not None:
matched_cluster.append(fact)
else:
clusters.append([fact])
result = []
for cluster in clusters:
best = max(cluster, key=lambda f: (
STATUS_PRIORITY.get(f.get("status", "developing"), 0),
f.get("sources_count", 0),
))
result.append(best)
if len(result) < len(facts):
logger.info(
f"Fakten-Dedup: {len(facts)} -> {len(result)} "
f"(-{len(facts) - len(result)} Duplikate)"
)
return result
class FactCheckerAgent:
"""Prüft Fakten über Claude CLI gegen unabhängige Quellen."""
def _format_articles_text(self, articles: list[dict], max_articles: int = 20) -> str:
"""Formatiert Artikel als Text für den Prompt."""
articles_text = ""
for i, article in enumerate(articles[:max_articles]):
articles_text += f"\n--- Meldung {i+1} ---\n"
articles_text += f"Quelle: {article.get('source', 'Unbekannt')}\n"
source_url = article.get('source_url', '')
if source_url:
articles_text += f"URL: {source_url}\n"
headline = article.get('headline_de') or article.get('headline', '')
articles_text += f"Überschrift: {headline}\n"
content = article.get('content_de') or article.get('content_original', '')
if content:
articles_text += f"Inhalt: {content[:500]}\n"
return articles_text
def _format_existing_facts(self, facts: list[dict]) -> str:
"""Formatiert bestehende Fakten als Text für den inkrementellen Prompt."""
if not facts:
return "Keine bisherigen Fakten"
lines = []
for fc in facts:
status = fc.get("status", "developing")
claim = fc.get("claim", "")
sources = fc.get("sources_count", 0)
lines.append(f"- [{status}] ({sources} Quellen) {claim}")
return "\n".join(lines)
async def check(self, title: str, articles: list[dict], incident_type: str = "adhoc") -> tuple[list[dict], ClaudeUsage | None]:
"""Führt vollständigen Faktencheck durch (erster Refresh)."""
if not articles:
return [], None
articles_text = self._format_articles_text(articles)
from config import OUTPUT_LANGUAGE
template = RESEARCH_FACTCHECK_PROMPT_TEMPLATE if incident_type == "research" else FACTCHECK_PROMPT_TEMPLATE
prompt = template.format(
title=title,
articles_text=articles_text,
output_language=OUTPUT_LANGUAGE,
)
try:
result, usage = await call_claude(prompt)
facts = self._parse_response(result, articles=articles)
logger.info(f"Faktencheck: {len(facts)} Fakten geprüft")
return facts, usage
except TimeoutError:
raise # Timeout nach oben durchreichen fuer Retry im Orchestrator
except Exception as e:
logger.error(f"Faktencheck-Fehler: {e}")
return [], None
async def check_incremental(
self,
title: str,
new_articles: list[dict],
existing_facts: list[dict],
incident_type: str = "adhoc",
) -> tuple[list[dict], ClaudeUsage | None]:
"""Inkrementeller Faktencheck: Prüft nur neue Artikel gegen bestehende Fakten.
Spart Tokens, da nur neue Artikel + Zusammenfassung der bestehenden Fakten gesendet werden.
"""
if not new_articles:
logger.info("Inkrementeller Faktencheck übersprungen: keine neuen Artikel")
return [], None
articles_text = self._format_articles_text(new_articles, max_articles=15)
existing_facts_text = self._format_existing_facts(existing_facts)
from config import OUTPUT_LANGUAGE
if incident_type == "research":
template = INCREMENTAL_RESEARCH_FACTCHECK_PROMPT_TEMPLATE
else:
template = INCREMENTAL_FACTCHECK_PROMPT_TEMPLATE
prompt = template.format(
title=title,
articles_text=articles_text,
existing_facts_text=existing_facts_text,
output_language=OUTPUT_LANGUAGE,
)
try:
result, usage = await call_claude(prompt)
facts = self._parse_response(result, articles=new_articles)
logger.info(f"Inkrementeller Faktencheck: {len(facts)} Fakten (neu + aktualisiert)")
return facts, usage
except TimeoutError:
raise # Timeout nach oben durchreichen fuer Retry im Orchestrator
except Exception as e:
logger.error(f"Inkrementeller Faktencheck-Fehler: {e}")
return [], None
async def check_incremental_twophase(
self,
title: str,
new_articles: list[dict],
existing_facts: list[dict],
incident_type: str = "adhoc",
) -> tuple[list[dict], ClaudeUsage | None]:
"""Zwei-Phasen inkrementeller Faktencheck: Haiku-Triage + parallele Opus-Verifikation.
Phase 1: Haiku identifiziert betroffene Fakten und neue Claims (schnell, günstig)
Phase 2: Opus verifiziert nur die betroffenen Fakten parallel per WebSearch
Fällt bei Triage-Fehler auf den Standard-check_incremental zurück.
"""
if not new_articles:
logger.info("Zwei-Phasen-Faktencheck übersprungen: keine neuen Artikel")
return [], None
usage_acc = UsageAccumulator()
# --- Phase 1: Triage (Haiku) ---
logger.info(f"Zwei-Phasen-Faktencheck Phase 1: Triage ({len(existing_facts)} Fakten, {len(new_articles)} Artikel)")
triage_facts_text = self._format_facts_for_triage(existing_facts)
articles_text = self._format_articles_text(new_articles, max_articles=15)
from config import OUTPUT_LANGUAGE, CLAUDE_MODEL_FAST
triage_prompt = TRIAGE_PROMPT_TEMPLATE.format(
output_language=OUTPUT_LANGUAGE,
fact_count=len(existing_facts),
existing_facts_text=triage_facts_text,
article_count=len(new_articles),
articles_text=articles_text,
max_per_group=MAX_FACTS_PER_VERIFY_GROUP,
)
try:
triage_result, triage_usage = await call_claude(
triage_prompt, tools=None, model=CLAUDE_MODEL_FAST
)
if triage_usage:
usage_acc.add(triage_usage)
except Exception as e:
logger.warning(f"Triage fehlgeschlagen, Fallback auf Standard-Faktencheck: {e}")
return await self.check_incremental(title, new_articles, existing_facts, incident_type)
# Triage-Ergebnis parsen
triage = self._parse_triage_response(triage_result)
if not triage:
logger.warning("Triage-Antwort nicht parsbar, Fallback auf Standard-Faktencheck")
return await self.check_incremental(title, new_articles, existing_facts, incident_type)
affected_ids = set(triage.get("affected_fact_ids", []))
new_claims = triage.get("new_claims", [])
groups = triage.get("groups", [])
if not groups:
logger.warning("Triage hat keine Gruppen identifiziert, Fallback auf Standard-Faktencheck")
return await self.check_incremental(title, new_articles, existing_facts, incident_type)
logger.info(
f"Triage: {len(affected_ids)} betroffene Fakten, {len(new_claims)} neue Claims, {len(groups)} Gruppen"
)
# --- Phase 2: Parallele Verifikation (Opus) ---
fact_lookup = {f["id"]: f for f in existing_facts}
async def _verify_one_group(group: dict) -> tuple[list[dict], ClaudeUsage | None]:
"""Verifiziert eine einzelne Faktengruppe."""
group_fact_ids = group.get("fact_ids", [])
group_new_indices = group.get("new_claim_indices", [])
theme = group.get("theme", "Allgemein")
group_facts = [fact_lookup[fid] for fid in group_fact_ids if fid in fact_lookup]
group_claims = [new_claims[i] for i in group_new_indices if i < len(new_claims)]
if not group_facts and not group_claims:
return [], None
# Fakten formatieren (mit Evidenz für Kontext)
facts_text = self._format_facts_for_verify(group_facts) if group_facts else "Keine bestehenden Fakten in dieser Gruppe."
new_claims_text = "\n".join(f"- {c}" for c in group_claims) if group_claims else "Keine neuen Claims."
if incident_type == "research":
template = VERIFY_GROUP_RESEARCH_PROMPT_TEMPLATE
else:
template = VERIFY_GROUP_PROMPT_TEMPLATE
prompt = template.format(
output_language=OUTPUT_LANGUAGE,
theme=theme,
facts_text=facts_text,
new_claims_text=new_claims_text,
articles_text=articles_text,
)
try:
result, v_usage = await call_claude(prompt)
facts = self._parse_response(result)
logger.info(f"Gruppe '{theme}': {len(facts)} Fakten geprüft")
return facts, v_usage
except TimeoutError:
logger.error(f"Gruppe '{theme}': Timeout")
return [], None
except Exception as e:
logger.error(f"Gruppe '{theme}': Fehler: {e}")
return [], None
logger.info(f"Zwei-Phasen-Faktencheck Phase 2: {len(groups)} Gruppen parallel verifizieren")
# Alle Gruppen parallel starten
group_results = await asyncio.gather(
*(_verify_one_group(g) for g in groups)
)
# Ergebnisse zusammenführen
all_facts = []
for facts, v_usage in group_results:
all_facts.extend(facts)
if v_usage:
usage_acc.add(v_usage)
# Kombinierte Usage erstellen
combined_usage = ClaudeUsage(
input_tokens=usage_acc.input_tokens,
output_tokens=usage_acc.output_tokens,
cache_creation_tokens=usage_acc.cache_creation_tokens,
cache_read_tokens=usage_acc.cache_read_tokens,
cost_usd=usage_acc.total_cost_usd,
duration_ms=0,
)
logger.info(
f"Zwei-Phasen-Faktencheck abgeschlossen: {len(all_facts)} Fakten "
f"({usage_acc.call_count} API-Calls, ${usage_acc.total_cost_usd:.4f})"
)
return all_facts, combined_usage
def _format_facts_for_triage(self, facts: list[dict]) -> str:
"""Formatiert Fakten kompakt mit IDs für die Triage."""
lines = []
for f in facts:
status = f.get("status", "developing")
claim = f.get("claim", "")
sources = f.get("sources_count", 0)
fid = f.get("id", "?")
lines.append(f"- [ID:{fid}] [{status}] ({sources} Quellen) {claim}")
return "\n".join(lines)
def _format_facts_for_verify(self, facts: list[dict]) -> str:
"""Formatiert Fakten detailliert für die Verifikation."""
parts = []
for f in facts:
evidence = (f.get("evidence") or "")[:300]
parts.append(
f"ID: {f.get('id', '?')}\n"
f"Status: {f.get('status', 'developing')}\n"
f"Claim: {f.get('claim', '')}\n"
f"Bisherige Evidenz: {evidence}\n"
f"Quellen: {f.get('sources_count', 0)}"
)
return "\n---\n".join(parts)
def _parse_triage_response(self, response: str) -> dict | None:
"""Parst die Triage-Antwort als JSON-Objekt."""
try:
data = json.loads(response)
if isinstance(data, dict) and "groups" in data:
return data
except json.JSONDecodeError:
pass
match = re.search(r'\{.*\}', response, re.DOTALL)
if match:
try:
data = json.loads(match.group())
if isinstance(data, dict) and "groups" in data:
return data
except json.JSONDecodeError:
pass
logger.warning("Konnte Triage-Antwort nicht als JSON parsen")
return None
def _validate_facts(self, facts: list[dict], articles: list[dict] = None) -> list[dict]:
"""Validiert Fakten: Bei fehlender URL werden Ursprungsquellen aus den Artikeln ergaenzt."""
url_pattern = re.compile(r'https?://')
# Verfuegbare Artikel-URLs sammeln
article_sources = []
if articles:
for a in articles:
url = a.get("source_url", "")
source = a.get("source", "")
headline = a.get("headline_de") or a.get("headline", "")
if url:
article_sources.append({"url": url, "source": source, "headline": headline})
for fact in facts:
status = fact.get("status", "")
evidence = fact.get("evidence") or ""
if status in ("confirmed", "established") and not url_pattern.search(evidence):
# Passende Ursprungsquellen finden (Keyword-Match auf Claim)
claim_lower = (fact.get("claim") or "").lower()
claim_words = [w for w in claim_lower.split() if len(w) >= 4][:8]
matched_sources = []
for src in article_sources:
src_text = (src["headline"] + " " + src["source"]).lower()
matches = sum(1 for w in claim_words if w in src_text)
if matches >= max(1, len(claim_words) // 4):
matched_sources.append(src)
if len(matched_sources) >= 3:
break
if matched_sources:
# Ursprungsquellen anhaengen statt herabstufen
source_refs = "; ".join(
f"{s['source']} ({s['url']})" for s in matched_sources
)
fact["evidence"] = (
evidence.rstrip(". ") +
". [Ursprungsquellen: " + source_refs +
" — Quellenlinks zum Zeitpunkt der Recherche moeglicherweise nicht mehr verfuegbar]"
)
logger.info(
f"Fakt '{fact.get('claim', '')[:50]}...' ergaenzt mit "
f"{len(matched_sources)} Ursprungsquelle(n)"
)
else:
# Keine passende Quelle gefunden -> herabstufen
old_status = status
fact["status"] = "unconfirmed" if status == "confirmed" else "unverified"
logger.warning(
f"Fakt herabgestuft ({old_status} -> {fact['status']}): "
f"keine URL in Evidenz und keine passende Ursprungsquelle: "
f"'{fact.get('claim', '')[:60]}...'"
)
return facts
def _parse_response(self, response: str, articles: list[dict] = None) -> list[dict]:
"""Parst die Claude-Antwort als JSON-Array."""
try:
data = json.loads(response)
if isinstance(data, list):
return self._validate_facts(data, articles=articles)
except json.JSONDecodeError:
pass
match = re.search(r'\[.*\]', response, re.DOTALL)
if match:
try:
data = json.loads(match.group())
if isinstance(data, list):
return self._validate_facts(data)
except json.JSONDecodeError:
pass
logger.warning("Konnte Faktencheck-Antwort nicht als JSON parsen")
return []