Dateien
AegisSight-Monitor/src/agents/factchecker.py
claude-dev e2ea4eaaa0 Faktencheck-Deduplizierung und Auto-Resolve implementiert
3-Ebenen-System gegen Duplikate:
1. Pre-Dedup: LLM-Antwort wird vor DB-Insert dedupliziert (deduplicate_new_facts)
2. Auto-Resolve: Bestaetigte Fakten loesen automatisch stale developing/unconfirmed Fakten auf
3. Periodische Konsolidierung: Haiku clustert alle 6h semantische Duplikate und entfernt sie

Verbessertes Claim-Matching: SequenceMatcher (70%) + Jaccard-Keyword-Overlap (30%)
statt reinem SequenceMatcher. Threshold von 0.7 auf 0.75 erhoeht.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-08 21:59:50 +01:00

404 Zeilen
16 KiB
Python

"""Factchecker-Agent: Prüft Fakten gegen mehrere unabhängige Quellen."""
import json
import logging
import re
from difflib import SequenceMatcher
from agents.claude_client import call_claude, ClaudeUsage
logger = logging.getLogger("osint.factchecker")
FACTCHECK_PROMPT_TEMPLATE = """Du bist ein Faktencheck-Agent für ein OSINT-Lagemonitoring-System.
AUSGABESPRACHE: {output_language}
WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschreibungen (ae, oe, ue, ss).
VORFALL: {title}
VORLIEGENDE MELDUNGEN:
{articles_text}
STRENGE REGELN - KEINE HALLUZINATIONEN:
- Du darfst NUR Fakten bewerten, die direkt aus den oben übergebenen Meldungen stammen
- KEINE Fakten aus deinem Trainingskorpus - NUR aus den übergebenen Meldungen + WebSearch
- Nutze WebSearch um jeden Claim gegen mindestens 1 weitere unabhängige Quelle zu prüfen
- Rufe die gefundenen URLs per WebFetch ab um den Inhalt zu verifizieren
- Nur wenn du den Claim in der tatsächlich abgerufenen Quelle findest, darfst du ihn als bestätigt markieren
- Jeder Claim MUSS eine konkrete Quellen-URL als Beleg enthalten
- "confirmed" erst bei 2+ unabhängigen Quellen mit überprüfbarer URL
- Lieber "unconfirmed" als falsch bestätigt
AUFTRAG:
1. Identifiziere die 5-10 wichtigsten Faktenaussagen aus den Meldungen
2. Prüfe jeden Claim aktiv per WebSearch gegen mindestens eine weitere unabhängige Quelle
3. Kategorisiere jede Aussage:
- "confirmed": Durch 2+ unabhängige seriöse Quellen mit überprüfbarer URL bestätigt
- "unconfirmed": Nur 1 Quelle oder nicht unabhängig verifizierbar
- "contradicted": Widersprüchliche Informationen aus verschiedenen Quellen
- "developing": Situation noch unklar, entwickelt sich
4. Markiere WICHTIGE NEUE Entwicklungen mit is_notification: true
Antworte AUSSCHLIESSLICH als JSON-Array. Jedes Element hat:
- "claim": Die Faktenaussage auf {output_language}
- "status": "confirmed" | "unconfirmed" | "contradicted" | "developing"
- "sources_count": Anzahl unabhängiger Quellen mit überprüfbarer URL
- "evidence": Begründung MIT konkreten Quellen-URLs als Beleg (z.B. "Bestätigt durch: tagesschau.de (URL), Reuters (URL)")
- "is_notification": true/false (nur bei wichtigen Entwicklungen true)
Antworte NUR mit dem JSON-Array. Keine Einleitung, keine Erklärung."""
RESEARCH_FACTCHECK_PROMPT_TEMPLATE = """Du bist ein Faktencheck-Agent für eine Hintergrundrecherche in einem OSINT-Lagemonitoring-System.
AUSGABESPRACHE: {output_language}
WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschreibungen (ae, oe, ue, ss).
THEMA: {title}
VORLIEGENDE QUELLEN:
{articles_text}
STRENGE REGELN - KEINE HALLUZINATIONEN:
- Du darfst NUR Fakten bewerten, die direkt aus den oben übergebenen Quellen stammen
- KEINE Fakten aus deinem Trainingskorpus - NUR aus den übergebenen Quellen + WebSearch
- Nutze WebSearch um jeden Claim gegen mindestens 1 weitere unabhängige Quelle zu prüfen
- Rufe die gefundenen URLs per WebFetch ab um den Inhalt zu verifizieren
- Nur wenn du den Claim in der tatsächlich abgerufenen Quelle findest, darfst du ihn als gesichert markieren
- Jeder Claim MUSS eine konkrete Quellen-URL als Beleg enthalten
- Lieber "unverified" als falsch bestätigt
AUFTRAG:
Fokus: "Was sind die gesicherten Fakten zu diesem Thema?"
1. Identifiziere die 5-10 wichtigsten Faktenaussagen aus den Quellen
2. Prüfe jeden Claim aktiv per WebSearch gegen weitere unabhängige Quellen
3. Kategorisiere jede Aussage:
- "established": Breit dokumentierter, gesicherter Fakt (3+ unabhängige Quellen mit URL)
- "disputed": Umstrittener Sachverhalt, verschiedene Positionen dokumentiert
- "unverified": Einzelbehauptung, nicht unabhängig verifizierbar
- "developing": Aktuelle Entwicklung, Faktenlage noch im Fluss
4. Markiere WICHTIGE Erkenntnisse mit is_notification: true
Antworte AUSSCHLIESSLICH als JSON-Array. Jedes Element hat:
- "claim": Die Faktenaussage auf {output_language}
- "status": "established" | "disputed" | "unverified" | "developing"
- "sources_count": Anzahl unabhängiger Quellen mit überprüfbarer URL
- "evidence": Begründung MIT konkreten Quellen-URLs als Beleg
- "is_notification": true/false
Antworte NUR mit dem JSON-Array. Keine Einleitung, keine Erklärung."""
# --- Inkrementelle Faktencheck-Prompts (für Folge-Refreshes) ---
INCREMENTAL_FACTCHECK_PROMPT_TEMPLATE = """Du bist ein Faktencheck-Agent für ein OSINT-Lagemonitoring-System.
AUSGABESPRACHE: {output_language}
WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschreibungen (ae, oe, ue, ss).
VORFALL: {title}
BEREITS GEPRÜFTE FAKTEN:
{existing_facts_text}
NEUE MELDUNGEN:
{articles_text}
STRENGE REGELN - KEINE HALLUZINATIONEN:
- Du darfst NUR Fakten bewerten, die aus den Meldungen oder bereits geprüften Fakten stammen
- KEINE Fakten aus deinem Trainingskorpus
- Nutze WebSearch zur Verifikation
- Rufe gefundene URLs per WebFetch ab
AUFTRAG:
1. Prüfe ob die neuen Meldungen bereits geprüfte Fakten BESTÄTIGEN, WIDERLEGEN oder ERGÄNZEN
2. Aktualisiere den Status bestehender Fakten wenn nötig (z.B. "unconfirmed""confirmed")
3. Identifiziere 3-5 NEUE Faktenaussagen aus den neuen Meldungen
4. Prüfe neue Claims per WebSearch gegen unabhängige Quellen
5. Markiere wichtige Statusänderungen und neue Entwicklungen mit is_notification: true
Status-Kategorien:
- "confirmed": 2+ unabhängige seriöse Quellen mit URL
- "unconfirmed": Nur 1 Quelle
- "contradicted": Widersprüchliche Informationen
- "developing": Situation unklar
Antworte AUSSCHLIESSLICH als JSON-Array mit ALLEN Fakten (bestehende aktualisiert + neue).
Jedes Element hat:
- "claim": Die Faktenaussage auf {output_language}
- "status": "confirmed" | "unconfirmed" | "contradicted" | "developing"
- "sources_count": Anzahl unabhängiger Quellen
- "evidence": Begründung MIT konkreten Quellen-URLs
- "is_notification": true/false
Antworte NUR mit dem JSON-Array."""
INCREMENTAL_RESEARCH_FACTCHECK_PROMPT_TEMPLATE = """Du bist ein Faktencheck-Agent für eine Hintergrundrecherche in einem OSINT-Lagemonitoring-System.
AUSGABESPRACHE: {output_language}
WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschreibungen (ae, oe, ue, ss).
THEMA: {title}
BEREITS GEPRÜFTE FAKTEN:
{existing_facts_text}
NEUE QUELLEN:
{articles_text}
STRENGE REGELN - KEINE HALLUZINATIONEN:
- Du darfst NUR Fakten bewerten, die aus den Quellen oder bereits geprüften Fakten stammen
- KEINE Fakten aus deinem Trainingskorpus
- Nutze WebSearch zur Verifikation
- Rufe gefundene URLs per WebFetch ab
AUFTRAG:
1. Prüfe ob die neuen Quellen bereits geprüfte Fakten bestätigen, widerlegen oder ergänzen
2. Aktualisiere den Status bestehender Fakten wenn nötig
3. Identifiziere 3-5 NEUE Faktenaussagen aus den neuen Quellen
4. Prüfe neue Claims per WebSearch
Status-Kategorien:
- "established": 3+ unabhängige Quellen mit URL
- "disputed": Verschiedene Positionen dokumentiert
- "unverified": Nicht unabhängig verifizierbar
- "developing": Faktenlage im Fluss
Antworte AUSSCHLIESSLICH als JSON-Array mit ALLEN Fakten (bestehende aktualisiert + neue).
Jedes Element hat:
- "claim": Die Faktenaussage auf {output_language}
- "status": "established" | "disputed" | "unverified" | "developing"
- "sources_count": Anzahl unabhängiger Quellen
- "evidence": Begründung MIT konkreten Quellen-URLs
- "is_notification": true/false
Antworte NUR mit dem JSON-Array."""
# --- Stopwords fuer Keyword-Extraktion ---
_STOPWORDS = frozenset({
"der", "die", "das", "ein", "eine", "und", "oder", "von", "nach", "bei", "mit",
"wurde", "wird", "haben", "sein", "dass", "ist", "sind", "hat", "vor", "fuer",
"den", "dem", "des", "sich", "auf", "als", "auch", "noch", "nicht", "aber",
"ueber", "durch", "einer", "einem", "eines", "werden", "wurde", "waren",
"the", "and", "was", "has", "been", "have", "that", "with", "from", "for",
"are", "were", "this", "which", "into", "their", "than", "about",
})
STATUS_PRIORITY = {
"confirmed": 5, "established": 5,
"contradicted": 4, "disputed": 4,
"unconfirmed": 3, "unverified": 3,
"developing": 1,
}
def normalize_claim(claim: str) -> str:
"""Normalisiert einen Claim fuer Aehnlichkeitsvergleich."""
c = claim.lower().strip()
c = c.replace("\u00e4", "ae").replace("\u00f6", "oe").replace("\u00fc", "ue").replace("\u00df", "ss")
c = re.sub(r'[^\w\s]', '', c)
c = re.sub(r'\s+', ' ', c).strip()
return c
def _keyword_set(text: str) -> set[str]:
"""Extrahiert signifikante Woerter fuer Overlap-Vergleich."""
words = set(normalize_claim(text).split())
return {w for w in words if len(w) >= 4 and w not in _STOPWORDS}
def find_matching_claim(new_claim: str, existing_claims: list[dict], threshold: float = 0.75) -> dict | None:
"""Findet den besten passenden bestehenden Claim per kombiniertem Scoring.
Verwendet SequenceMatcher (70%) + Jaccard-Keyword-Overlap (30%) fuer robusteres Matching.
"""
norm_new = normalize_claim(new_claim)
if not norm_new:
return None
kw_new = _keyword_set(new_claim)
best_match = None
best_score = 0.0
for existing in existing_claims:
norm_existing = normalize_claim(existing.get("claim", ""))
if not norm_existing:
continue
# Fruehzeitiger Abbruch bei grossem Laengenunterschied
len_ratio = len(norm_new) / len(norm_existing) if norm_existing else 0
if len_ratio > 2.5 or len_ratio < 0.4:
continue
seq_ratio = SequenceMatcher(None, norm_new, norm_existing).ratio()
kw_existing = _keyword_set(existing.get("claim", ""))
kw_union = kw_new | kw_existing
jaccard = len(kw_new & kw_existing) / len(kw_union) if kw_union else 0.0
combined = 0.7 * seq_ratio + 0.3 * jaccard
if combined > best_score:
best_score = combined
best_match = existing
if best_score >= threshold:
logger.debug(
f"Claim-Match ({best_score:.2f}): "
f"'{new_claim[:50]}...' -> '{best_match['claim'][:50]}...'"
)
return best_match
return None
def deduplicate_new_facts(facts: list[dict], threshold: float = 0.70) -> list[dict]:
"""Dedupliziert Fakten aus einer einzelnen LLM-Antwort vor dem DB-Insert.
Clustert aehnliche Claims und behaelt pro Cluster den mit dem
hoechsten Status und den meisten Quellen.
"""
if not facts:
return []
clusters: list[list[dict]] = []
for fact in facts:
matched_cluster = None
for cluster in clusters:
if find_matching_claim(fact.get("claim", ""), cluster, threshold=threshold):
matched_cluster = cluster
break
if matched_cluster is not None:
matched_cluster.append(fact)
else:
clusters.append([fact])
result = []
for cluster in clusters:
best = max(cluster, key=lambda f: (
STATUS_PRIORITY.get(f.get("status", "developing"), 0),
f.get("sources_count", 0),
))
result.append(best)
if len(result) < len(facts):
logger.info(
f"Fakten-Dedup: {len(facts)} -> {len(result)} "
f"(-{len(facts) - len(result)} Duplikate)"
)
return result
class FactCheckerAgent:
"""Prüft Fakten über Claude CLI gegen unabhängige Quellen."""
def _format_articles_text(self, articles: list[dict], max_articles: int = 20) -> str:
"""Formatiert Artikel als Text für den Prompt."""
articles_text = ""
for i, article in enumerate(articles[:max_articles]):
articles_text += f"\n--- Meldung {i+1} ---\n"
articles_text += f"Quelle: {article.get('source', 'Unbekannt')}\n"
source_url = article.get('source_url', '')
if source_url:
articles_text += f"URL: {source_url}\n"
headline = article.get('headline_de') or article.get('headline', '')
articles_text += f"Überschrift: {headline}\n"
content = article.get('content_de') or article.get('content_original', '')
if content:
articles_text += f"Inhalt: {content[:500]}\n"
return articles_text
def _format_existing_facts(self, facts: list[dict]) -> str:
"""Formatiert bestehende Fakten als Text für den inkrementellen Prompt."""
if not facts:
return "Keine bisherigen Fakten"
lines = []
for fc in facts:
status = fc.get("status", "developing")
claim = fc.get("claim", "")
sources = fc.get("sources_count", 0)
lines.append(f"- [{status}] ({sources} Quellen) {claim}")
return "\n".join(lines)
async def check(self, title: str, articles: list[dict], incident_type: str = "adhoc") -> tuple[list[dict], ClaudeUsage | None]:
"""Führt vollständigen Faktencheck durch (erster Refresh)."""
if not articles:
return [], None
articles_text = self._format_articles_text(articles)
from config import OUTPUT_LANGUAGE
template = RESEARCH_FACTCHECK_PROMPT_TEMPLATE if incident_type == "research" else FACTCHECK_PROMPT_TEMPLATE
prompt = template.format(
title=title,
articles_text=articles_text,
output_language=OUTPUT_LANGUAGE,
)
try:
result, usage = await call_claude(prompt)
facts = self._parse_response(result)
logger.info(f"Faktencheck: {len(facts)} Fakten geprüft")
return facts, usage
except TimeoutError:
raise # Timeout nach oben durchreichen fuer Retry im Orchestrator
except Exception as e:
logger.error(f"Faktencheck-Fehler: {e}")
return [], None
async def check_incremental(
self,
title: str,
new_articles: list[dict],
existing_facts: list[dict],
incident_type: str = "adhoc",
) -> tuple[list[dict], ClaudeUsage | None]:
"""Inkrementeller Faktencheck: Prüft nur neue Artikel gegen bestehende Fakten.
Spart Tokens, da nur neue Artikel + Zusammenfassung der bestehenden Fakten gesendet werden.
"""
if not new_articles:
logger.info("Inkrementeller Faktencheck übersprungen: keine neuen Artikel")
return [], None
articles_text = self._format_articles_text(new_articles, max_articles=15)
existing_facts_text = self._format_existing_facts(existing_facts)
from config import OUTPUT_LANGUAGE
if incident_type == "research":
template = INCREMENTAL_RESEARCH_FACTCHECK_PROMPT_TEMPLATE
else:
template = INCREMENTAL_FACTCHECK_PROMPT_TEMPLATE
prompt = template.format(
title=title,
articles_text=articles_text,
existing_facts_text=existing_facts_text,
output_language=OUTPUT_LANGUAGE,
)
try:
result, usage = await call_claude(prompt)
facts = self._parse_response(result)
logger.info(f"Inkrementeller Faktencheck: {len(facts)} Fakten (neu + aktualisiert)")
return facts, usage
except TimeoutError:
raise # Timeout nach oben durchreichen fuer Retry im Orchestrator
except Exception as e:
logger.error(f"Inkrementeller Faktencheck-Fehler: {e}")
return [], None
def _parse_response(self, response: str) -> list[dict]:
"""Parst die Claude-Antwort als JSON-Array."""
try:
data = json.loads(response)
if isinstance(data, list):
return data
except json.JSONDecodeError:
pass
match = re.search(r'\[.*\]', response, re.DOTALL)
if match:
try:
data = json.loads(match.group())
if isinstance(data, list):
return data
except json.JSONDecodeError:
pass
logger.warning("Konnte Faktencheck-Antwort nicht als JSON parsen")
return []