diff --git a/src/agents/factchecker.py b/src/agents/factchecker.py index a269b42..bda5fb5 100644 --- a/src/agents/factchecker.py +++ b/src/agents/factchecker.py @@ -168,49 +168,120 @@ Jedes Element hat: Antworte NUR mit dem JSON-Array.""" +# --- Stopwords fuer Keyword-Extraktion --- +_STOPWORDS = frozenset({ + "der", "die", "das", "ein", "eine", "und", "oder", "von", "nach", "bei", "mit", + "wurde", "wird", "haben", "sein", "dass", "ist", "sind", "hat", "vor", "fuer", + "den", "dem", "des", "sich", "auf", "als", "auch", "noch", "nicht", "aber", + "ueber", "durch", "einer", "einem", "eines", "werden", "wurde", "waren", + "the", "and", "was", "has", "been", "have", "that", "with", "from", "for", + "are", "were", "this", "which", "into", "their", "than", "about", +}) + +STATUS_PRIORITY = { + "confirmed": 5, "established": 5, + "contradicted": 4, "disputed": 4, + "unconfirmed": 3, "unverified": 3, + "developing": 1, +} + + def normalize_claim(claim: str) -> str: - """Normalisiert einen Claim für Ähnlichkeitsvergleich.""" + """Normalisiert einen Claim fuer Aehnlichkeitsvergleich.""" c = claim.lower().strip() - # Umlaute normalisieren - c = c.replace("ä", "ae").replace("ö", "oe").replace("ü", "ue").replace("ß", "ss") + c = c.replace("\u00e4", "ae").replace("\u00f6", "oe").replace("\u00fc", "ue").replace("\u00df", "ss") c = re.sub(r'[^\w\s]', '', c) c = re.sub(r'\s+', ' ', c).strip() return c -def find_matching_claim(new_claim: str, existing_claims: list[dict], threshold: float = 0.7) -> dict | None: - """Findet den besten passenden bestehenden Claim per Fuzzy-Matching. 
+def _keyword_set(text: str) -> set[str]: + """Extrahiert signifikante Woerter fuer Overlap-Vergleich.""" + words = set(normalize_claim(text).split()) + return {w for w in words if len(w) >= 4 and w not in _STOPWORDS} - Args: - new_claim: Der neue Claim-Text - existing_claims: Liste von Dicts mit mindestens {"id", "claim", "status"} - threshold: Mindest-Ähnlichkeit (0.0-1.0), Standard 0.7 - Returns: - Das passende Dict oder None wenn kein Match über dem Schwellwert +def find_matching_claim(new_claim: str, existing_claims: list[dict], threshold: float = 0.75) -> dict | None: + """Findet den besten passenden bestehenden Claim per kombiniertem Scoring. + + Verwendet SequenceMatcher (70%) + Jaccard-Keyword-Overlap (30%) fuer robusteres Matching. """ norm_new = normalize_claim(new_claim) if not norm_new: return None + kw_new = _keyword_set(new_claim) best_match = None - best_ratio = 0.0 + best_score = 0.0 for existing in existing_claims: norm_existing = normalize_claim(existing.get("claim", "")) if not norm_existing: continue - ratio = SequenceMatcher(None, norm_new, norm_existing).ratio() - if ratio > best_ratio: - best_ratio = ratio + + # Fruehzeitiger Abbruch bei grossem Laengenunterschied + len_ratio = len(norm_new) / len(norm_existing) if norm_existing else 0 + if len_ratio > 2.5 or len_ratio < 0.4: + continue + + seq_ratio = SequenceMatcher(None, norm_new, norm_existing).ratio() + + kw_existing = _keyword_set(existing.get("claim", "")) + kw_union = kw_new | kw_existing + jaccard = len(kw_new & kw_existing) / len(kw_union) if kw_union else 0.0 + + combined = 0.7 * seq_ratio + 0.3 * jaccard + + if combined > best_score: + best_score = combined best_match = existing - if best_ratio >= threshold: - logger.debug(f"Claim-Match ({best_ratio:.2f}): '{new_claim[:50]}...' → '{best_match['claim'][:50]}...'") + if best_score >= threshold: + logger.debug( + f"Claim-Match ({best_score:.2f}): " + f"'{new_claim[:50]}...' 
-> '{best_match['claim'][:50]}...'" + ) return best_match return None +def deduplicate_new_facts(facts: list[dict], threshold: float = 0.70) -> list[dict]: + """Dedupliziert Fakten aus einer einzelnen LLM-Antwort vor dem DB-Insert. + + Clustert aehnliche Claims und behaelt pro Cluster den mit dem + hoechsten Status und den meisten Quellen. + """ + if not facts: + return [] + + clusters: list[list[dict]] = [] + for fact in facts: + matched_cluster = None + for cluster in clusters: + if find_matching_claim(fact.get("claim", ""), cluster, threshold=threshold): + matched_cluster = cluster + break + if matched_cluster is not None: + matched_cluster.append(fact) + else: + clusters.append([fact]) + + result = [] + for cluster in clusters: + best = max(cluster, key=lambda f: ( + STATUS_PRIORITY.get(f.get("status", "developing"), 0), + f.get("sources_count", 0), + )) + result.append(best) + + if len(result) < len(facts): + logger.info( + f"Fakten-Dedup: {len(facts)} -> {len(result)} " + f"(-{len(facts) - len(result)} Duplikate)" + ) + return result + + class FactCheckerAgent: """Prüft Fakten über Claude CLI gegen unabhängige Quellen.""" diff --git a/src/agents/orchestrator.py b/src/agents/orchestrator.py index 25c8022..82c7734 100644 --- a/src/agents/orchestrator.py +++ b/src/agents/orchestrator.py @@ -9,7 +9,7 @@ from typing import Optional from urllib.parse import urlparse, urlunparse from agents.claude_client import UsageAccumulator -from agents.factchecker import find_matching_claim +from agents.factchecker import find_matching_claim, deduplicate_new_facts from source_rules import ( _detect_category, _extract_domain, @@ -890,6 +890,9 @@ class AgentOrchestrator: all_articles_for_fc = [dict(row) for row in await cursor.fetchall()] fact_checks, fc_usage = await factchecker.check(title, all_articles_for_fc, incident_type) + # Pre-Dedup: Duplikate aus LLM-Antwort entfernen + fact_checks = deduplicate_new_facts(fact_checks) + if fc_usage: usage_acc.add(fc_usage) diff --git 
# ---------------------------------------------------------------------------
# src/services/fact_consolidation.py (new module)
#
# NOTE(review): the same patch adds
# ``from services.fact_consolidation import consolidate_fact_checks`` to
# src/main.py, after the ``services.source_suggester`` import.
# ---------------------------------------------------------------------------
"""Periodic fact-check consolidation via Haiku.

Detects semantic duplicates among fact checks and removes all but the best
entry of each duplicate group. Per the original module notes it is meant to
run as a scheduler job every 6 hours (the scheduling itself is not part of
this module).
"""
import json
import logging
import re
from datetime import datetime

from config import CLAUDE_MODEL_FAST, TIMEZONE
from database import get_db
from agents.claude_client import call_claude

logger = logging.getLogger("osint.fact_consolidation")

# Ranking used to pick the survivor of a duplicate cluster (higher wins).
STATUS_PRIORITY = {
    "confirmed": 5, "established": 5,
    "contradicted": 4, "disputed": 4,
    "unconfirmed": 3, "unverified": 3,
    "developing": 1,
}

# Prompt sent to the model; ``{facts_text}`` is filled with one
# "ID=<id> [<status>]: <claim>" line per fact.
CONSOLIDATION_PROMPT = (
    "Du bist ein Deduplizierungs-Agent. Du bekommst eine Liste von Faktenchecks (ID + Claim + Status).\n"
    "Finde Gruppen von Fakten, die inhaltlich DASSELBE aussagen (auch bei unterschiedlicher Formulierung).\n\n"
    "REGELN:\n"
    '- Gleicher Sachverhalt = gleiche Gruppe (z.B. "Khamenei wurde getoetet" und "Chamenei bei Angriff ums Leben gekommen")\n'
    "- Unterschiedliche Detailtiefe zum SELBEN Fakt = gleiche Gruppe\n"
    '- VERSCHIEDENE Sachverhalte = verschiedene Gruppen (z.B. "Angriff auf Isfahan" vs "Angriff auf Teheran")\n'
    "- Eine Gruppe muss mindestens 2 Eintraege haben\n\n"
    "Antworte NUR als JSON-Array von Gruppen. Jede Gruppe ist ein Array von IDs:\n"
    "[[1,5,12], [3,8], [20,25,30]]\n\n"
    "Wenn keine Duplikate: antworte mit []\n\n"
    "FAKTEN:\n{facts_text}"
)


def _sanitize_clusters(data: object) -> list[list[int]]:
    """Coerce a decoded model answer into a list of integer-ID groups.

    Anything that is not a list of lists is dropped; IDs given as digit
    strings are converted, other non-integer entries are ignored, and groups
    left with fewer than two IDs are discarded.
    """
    if not isinstance(data, list):
        return []

    clusters: list[list[int]] = []
    for group in data:
        if not isinstance(group, list):
            continue
        ids: list[int] = []
        for raw in group:
            if isinstance(raw, bool):
                continue  # bool is an int subclass but never a valid ID
            if isinstance(raw, int):
                ids.append(raw)
            elif isinstance(raw, str) and raw.strip().isdigit():
                ids.append(int(raw.strip()))
        if len(ids) >= 2:
            clusters.append(ids)
    return clusters


async def _ask_haiku_for_clusters(facts: list[dict]) -> list[list[int]]:
    """Ask the fast model which facts are semantic duplicates.

    Args:
        facts: Fact dicts with ``"id"``, ``"status"`` and ``"claim"``.

    Returns:
        Groups of fact IDs judged to be duplicates. An empty list is returned
        when the call fails or the answer cannot be parsed; this function
        does not raise.
    """
    facts_text = "\n".join(
        f'ID={f["id"]} [{f["status"]}]: {f["claim"]}'
        for f in facts
    )
    prompt = CONSOLIDATION_PROMPT.format(facts_text=facts_text)

    try:
        result, _usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST)
        try:
            data = json.loads(result)
        except json.JSONDecodeError:
            # The model may wrap the array in prose: retry on the outermost
            # bracketed span.
            match = re.search(r'\[.*\]', result, re.DOTALL)
            data = json.loads(match.group()) if match else []
        # Both parse paths are validated, so callers only ever see
        # list[list[int]].
        return _sanitize_clusters(data)
    except json.JSONDecodeError:
        logger.warning("Haiku-Cluster-Antwort nicht als JSON lesbar")
    except Exception as e:
        logger.error(f"Haiku-Cluster-Anfrage fehlgeschlagen: {e}")

    return []


def _fact_rank(fact: dict) -> tuple:
    """Sort key for the cluster survivor: status, source count, recency.

    NULL columns rank lowest instead of raising ``TypeError`` when compared.
    """
    return (
        STATUS_PRIORITY.get(fact["status"], 0),
        fact.get("sources_count") or 0,
        fact.get("checked_at") or "",
    )


def _select_ids_to_delete(facts_by_id: dict, clusters: list[list[int]]) -> list[int]:
    """Return the IDs to delete, keeping the best-ranked fact of each cluster.

    An ID is only ever considered in the first cluster that mentions it.
    Overlapping clusters with tied ranks (e.g. [1,2], [3,1], [2,3]) could
    otherwise mark every copy of a fact for deletion.
    """
    handled: set = set()
    doomed: list[int] = []

    for cluster_ids in clusters:
        members: list[int] = []
        for cid in cluster_ids:
            if cid in facts_by_id and cid not in handled and cid not in members:
                members.append(cid)
        if len(members) < 2:
            continue

        handled.update(members)
        keeper = max(members, key=lambda cid: _fact_rank(facts_by_id[cid]))
        doomed.extend(cid for cid in members if cid != keeper)

    return doomed


async def consolidate_fact_checks(max_per_incident: int = 25, batch_size: int = 80) -> int:
    """Consolidate duplicate fact checks via model-based clustering.

    Only incidents with more than *max_per_incident* fact checks are
    processed; incidents with a ``running`` entry in ``refresh_log`` are
    skipped. Facts are sent to the model in batches, so duplicates that land
    in different batches are not detected. All deletions are committed once
    at the end.

    Args:
        max_per_incident: Fact-check count above which an incident is
            considered bloated.
        batch_size: Number of facts per model request (minimum 1).

    Returns:
        Number of deleted fact checks; ``0`` if nothing was removed or an
        error occurred (errors are logged, not raised).
    """
    db = await get_db()
    try:
        cursor = await db.execute(
            "SELECT incident_id, COUNT(*) as cnt FROM fact_checks "
            "GROUP BY incident_id HAVING cnt > ?",
            (max_per_incident,),
        )
        bloated = [dict(row) for row in await cursor.fetchall()]

        if not bloated:
            logger.info("Faktencheck-Konsolidierung: keine aufgeblaehten Incidents gefunden")
            return 0

        step = max(1, batch_size)
        total_removed = 0

        for row in bloated:
            incident_id = row["incident_id"]

            # Skip incidents that are currently being refreshed.
            cursor_rl = await db.execute(
                "SELECT COUNT(*) as cnt FROM refresh_log "
                "WHERE incident_id = ? AND status = 'running'",
                (incident_id,),
            )
            rl_row = await cursor_rl.fetchone()
            if rl_row and rl_row["cnt"] > 0:
                logger.info(
                    f"Incident {incident_id} hat laufenden Refresh, ueberspringe"
                )
                continue

            cursor2 = await db.execute(
                "SELECT id, claim, status, sources_count, evidence, "
                "checked_at, status_history "
                "FROM fact_checks WHERE incident_id = ? "
                "ORDER BY checked_at DESC",
                (incident_id,),
            )
            all_facts = [dict(r) for r in await cursor2.fetchall()]

            if len(all_facts) <= max_per_incident:
                continue

            # Ask the model batch by batch.
            all_clusters: list[list[int]] = []
            for start in range(0, len(all_facts), step):
                all_clusters.extend(
                    await _ask_haiku_for_clusters(all_facts[start:start + step])
                )

            # Per cluster: keep the best fact, delete the rest.
            facts_by_id = {f["id"]: f for f in all_facts}
            unique_ids = _select_ids_to_delete(facts_by_id, all_clusters)

            if unique_ids:
                placeholders = ",".join("?" * len(unique_ids))
                await db.execute(
                    f"DELETE FROM fact_checks WHERE id IN ({placeholders})",
                    unique_ids,
                )
                total_removed += len(unique_ids)
                logger.info(
                    f"Incident {incident_id}: {len(unique_ids)} Duplikate entfernt, "
                    f"{len(all_facts) - len(unique_ids)} verbleiben"
                )

        await db.commit()
        if total_removed > 0:
            logger.info(
                f"Faktencheck-Konsolidierung: {total_removed} Duplikate entfernt"
            )
        return total_removed

    except Exception as e:
        logger.error(
            f"Faktencheck-Konsolidierung Fehler: {e}", exc_info=True
        )
        return 0
    finally:
        await db.close()


async def auto_resolve_stale_facts(incident_id: int, confirmed_claims: list[dict], db):
    """Resolve stale developing/unconfirmed/unverified facts automatically.

    Every stale fact of the incident that matches one of *confirmed_claims*
    (``find_matching_claim`` with threshold 0.65) is set to ``"confirmed"``
    when the matching claim's status is ``"confirmed"`` and to
    ``"established"`` otherwise. A note is appended to its evidence and an
    ``auto-resolved`` entry to its status history. The function issues the
    UPDATEs but does not commit.

    Per the original notes this is intended to be called by the orchestrator
    after each fact check (the call site is not visible here).

    Args:
        incident_id: Incident whose stale facts are examined.
        confirmed_claims: Dicts with ``"claim"`` and optionally ``"status"``.
        db: Open database connection used for the query and updates.

    Returns:
        Number of facts that were resolved.
    """
    if not confirmed_claims:
        return 0

    # Imported here rather than at module level, as in the original.
    # Presumably this avoids an import cycle; confirm before moving it.
    from agents.factchecker import find_matching_claim

    now = datetime.now(TIMEZONE).strftime('%Y-%m-%d %H:%M:%S')

    cursor = await db.execute(
        "SELECT id, claim, status, status_history FROM fact_checks "
        "WHERE incident_id = ? "
        "AND status IN ('developing', 'unconfirmed', 'unverified')",
        (incident_id,),
    )
    stale_facts = [dict(row) for row in await cursor.fetchall()]

    if not stale_facts:
        return 0

    resolved_count = 0
    resolved_ids = set()

    for confirmed_fc in confirmed_claims:
        confirmed_claim_text = confirmed_fc.get("claim", "")
        for stale in stale_facts:
            if stale["id"] in resolved_ids:
                continue
            # Lower threshold (0.65) for a more aggressive auto-resolve.
            if not find_matching_claim(
                confirmed_claim_text, [stale], threshold=0.65
            ):
                continue

            try:
                history = json.loads(
                    stale.get("status_history") or "[]"
                )
            except (ValueError, TypeError):
                history = []
            if not isinstance(history, list):
                # e.g. a stored "null" or JSON object: start a fresh history
                # instead of failing on .append().
                history = []

            new_status = (
                "confirmed"
                if confirmed_fc.get("status") == "confirmed"
                else "established"
            )
            history.append({
                "status": new_status,
                "at": now,
                "reason": "auto-resolved",
            })

            await db.execute(
                "UPDATE fact_checks SET status = ?, "
                "evidence = COALESCE(evidence, '') "
                "|| ' [Auto-aufgeloest: uebereinstimmender Fakt bestaetigt]', "
                "status_history = ?, checked_at = ? WHERE id = ?",
                (new_status, json.dumps(history), now, stale["id"]),
            )
            resolved_ids.add(stale["id"])
            resolved_count += 1
            logger.info(
                f"Auto-resolved Fakt #{stale['id']}: "
                f"'{stale['claim'][:60]}...' -> {new_status}"
            )

    if resolved_count > 0:
        logger.info(
            f"Auto-Resolve: {resolved_count} veraltete Fakten "
            f"fuer Incident {incident_id} aufgeloest"
        )

    return resolved_count