"""Post-Refresh Quality Check via Haiku. Prueft nach jedem Refresh: 1. Semantische Faktencheck-Duplikate (Haiku-Clustering mit Fuzzy-Vorfilter) 2. Falsch kategorisierte Karten-Locations (Haiku bewertet Kontext der Lage) 3. Umlaut-Normalisierung in summary + latest_developments (deterministisch) Regelbasierte Listen dienen als Fallback falls Haiku fehlschlaegt. """ import json import logging import os import re from difflib import SequenceMatcher from agents.claude_client import call_claude from config import CLAUDE_MODEL_FAST logger = logging.getLogger("osint.post_refresh_qc") STATUS_PRIORITY = { "confirmed": 5, "established": 5, "contradicted": 4, "disputed": 4, "unconfirmed": 3, "unverified": 3, "developing": 1, } # --------------------------------------------------------------------------- # 1. Faktencheck-Duplikate # --------------------------------------------------------------------------- _DEDUP_PROMPT = """\ Du bist ein Deduplizierungs-Agent fuer Faktenchecks eines OSINT-Monitors. LAGE: {incident_title} Unten stehen Faktenchecks (ID + Status + Claim). Finde Gruppen von Fakten, die INHALTLICH DASSELBE aussagen, auch wenn sie unterschiedlich formuliert sind. REGELN: - Gleicher Sachverhalt = gleiche Gruppe (z.B. "Trump fordert Kapitulation" und "US-Praesident verlangt bedingungslose Aufgabe") - Unterschiedliche Detailtiefe zum SELBEN Fakt = gleiche Gruppe - VERSCHIEDENE Sachverhalte = VERSCHIEDENE Gruppen (z.B. "Angriff auf Isfahan" vs "Angriff auf Teheran" sind NICHT dasselbe) - Eine Gruppe muss mindestens 2 Eintraege haben Antworte NUR als JSON-Array von Gruppen. Jede Gruppe ist ein Array von IDs: [[1,5,12], [3,8]] Wenn keine Duplikate: antworte mit [] FAKTEN: {facts_text}""" async def _haiku_find_duplicate_clusters( facts: list[dict], incident_title: str ) -> list[list[int]]: """Fragt Haiku welche Fakten semantische Duplikate sind.""" facts_text = "\n".join( f'ID={f["id"]} [{f["status"]}]: {f["claim"]}' for f in facts ) prompt = _DEDUP_PROMPT.format( incident_title=incident_title, facts_text=facts_text ) try: result, _usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST) data = json.loads(result) if isinstance(data, list) and all(isinstance(g, list) for g in data): return data except json.JSONDecodeError: match = re.search(r'\[.*\]', result, re.DOTALL) if match: try: data = json.loads(match.group()) if isinstance(data, list): return data except json.JSONDecodeError: pass except Exception as e: logger.warning("Haiku Duplikat-Clustering fehlgeschlagen: %s", e) return [] def _fuzzy_prefilter(all_facts: list[dict], max_candidates: int = 80) -> list[dict]: """Waehlt Kandidaten fuer Haiku-Check per Fuzzy-Vorfilter aus. Findet Paare mit Aehnlichkeit >= 0.60 und gibt die betroffenen Fakten zurueck. Begrenzt auf max_candidates um Haiku-Tokens zu sparen. """ from agents.factchecker import normalize_claim, _keyword_set if len(all_facts) <= max_candidates: return all_facts normalized = [] for f in all_facts: nc = normalize_claim(f["claim"]) kw = _keyword_set(f["claim"]) normalized.append((f, nc, kw)) candidate_ids = set() recent = normalized[:60] for i, (fact_a, norm_a, kw_a) in enumerate(recent): for j, (fact_b, norm_b, kw_b) in enumerate(normalized): if i >= j or fact_b["id"] == fact_a["id"]: continue if not norm_a or not norm_b: continue len_ratio = len(norm_a) / len(norm_b) if norm_b else 0 if len_ratio > 2.5 or len_ratio < 0.4: continue seq_ratio = SequenceMatcher(None, norm_a, norm_b).ratio() kw_union = kw_a | kw_b jaccard = len(kw_a & kw_b) / len(kw_union) if kw_union else 0.0 combined = 0.7 * seq_ratio + 0.3 * jaccard if combined >= 0.60: candidate_ids.add(fact_a["id"]) candidate_ids.add(fact_b["id"]) if len(candidate_ids) >= max_candidates: break if len(candidate_ids) >= max_candidates: break candidates = [f for f in all_facts if f["id"] in candidate_ids] logger.info( "Fuzzy-Vorfilter: %d/%d Fakten als Duplikat-Kandidaten identifiziert", len(candidates), len(all_facts), ) return candidates async def check_fact_duplicates(db, incident_id: int, incident_title: str) -> int: """Prueft auf semantische Faktencheck-Duplikate via Haiku. 1. Fuzzy-Vorfilter reduziert auf relevante Kandidaten 2. Haiku clustert semantische Duplikate 3. Pro Cluster: behalte besten Fakt, loesche Rest Returns: Anzahl entfernter Duplikate. """ cursor = await db.execute( "SELECT id, claim, status, sources_count, evidence, checked_at " "FROM fact_checks WHERE incident_id = ? ORDER BY checked_at DESC", (incident_id,), ) all_facts = [dict(row) for row in await cursor.fetchall()] if len(all_facts) < 2: return 0 # Schritt 1: Fuzzy-Vorfilter candidates = _fuzzy_prefilter(all_facts) if len(candidates) < 2: return 0 # Schritt 2: Haiku-Clustering (in Batches von max 80) all_clusters = [] batch_size = 80 for i in range(0, len(candidates), batch_size): batch = candidates[i:i + batch_size] clusters = await _haiku_find_duplicate_clusters(batch, incident_title) all_clusters.extend(clusters) if not all_clusters: logger.info("QC Fakten: Haiku fand keine Duplikate") return 0 # Schritt 3: Pro Cluster besten behalten, Rest loeschen facts_by_id = {f["id"]: f for f in all_facts} ids_to_delete = set() for cluster_ids in all_clusters: valid_ids = [cid for cid in cluster_ids if cid in facts_by_id] if len(valid_ids) <= 1: continue cluster_facts = [facts_by_id[cid] for cid in valid_ids] best = max(cluster_facts, key=lambda f: ( STATUS_PRIORITY.get(f["status"], 0), f.get("sources_count", 0), f.get("checked_at", ""), )) for fact in cluster_facts: if fact["id"] != best["id"]: ids_to_delete.add(fact["id"]) logger.info( "QC Duplikat: ID %d entfernt, behalte ID %d ('%s')", fact["id"], best["id"], best["claim"][:60], ) if ids_to_delete: placeholders = ",".join("?" * len(ids_to_delete)) await db.execute( f"DELETE FROM fact_checks WHERE id IN ({placeholders})", list(ids_to_delete), ) logger.info( "QC: %d Faktencheck-Duplikate entfernt fuer Incident %d", len(ids_to_delete), incident_id, ) return len(ids_to_delete) # --------------------------------------------------------------------------- # 2. Karten-Location-Kategorien # --------------------------------------------------------------------------- _LOCATION_PROMPT = """\ Du bist ein Geopolitik-Experte fuer einen OSINT-Monitor. LAGE: {incident_title} BESCHREIBUNG: {incident_desc} {labels_context} Unten stehen Orte, die auf der Karte als "primary" (Hauptgeschehen) markiert sind. Pruefe fuer jeden Ort, ob die Kategorie "primary" korrekt ist. KATEGORIEN: - primary: {label_primary} — Wo das Hauptgeschehen stattfindet - secondary: {label_secondary} — Direkte Reaktionen/Gegenmassnahmen - tertiary: {label_tertiary} — Entscheidungstraeger/Beteiligte - mentioned: {label_mentioned} — Nur erwaehnt REGELN: - Nur Orte die DIREKT vom Hauptgeschehen betroffen sind = "primary" - Orte mit Reaktionen/Gegenmassnahmen = "secondary" - Orte von Entscheidungstraegern (z.B. Hauptstaedte) = "tertiary" - Nur erwaehnte Orte = "mentioned" - Im Zweifel: "mentioned" Antworte als JSON-Array mit Korrekturen. Nur Eintraege die GEAENDERT werden muessen: [{{"id": 123, "category": "mentioned"}}, {{"id": 456, "category": "tertiary"}}] Wenn alle Kategorien korrekt sind: antworte mit [] ORTE (aktuell alle als "primary" markiert): {locations_text}""" async def check_location_categories( db, incident_id: int, incident_title: str, incident_desc: str ) -> int: """Prueft Karten-Location-Kategorien via Haiku. Returns: Anzahl korrigierter Eintraege. """ cursor = await db.execute( "SELECT id, location_name, latitude, longitude, category " "FROM article_locations WHERE incident_id = ? AND category = 'primary'", (incident_id,), ) targets = [dict(row) for row in await cursor.fetchall()] if not targets: return 0 # Category-Labels aus DB laden (fuer kontextabhaengige Prompt-Beschreibungen) cursor = await db.execute( "SELECT category_labels FROM incidents WHERE id = ?", (incident_id,) ) inc_row = await cursor.fetchone() labels = {} if inc_row and inc_row["category_labels"]: try: labels = json.loads(inc_row["category_labels"]) except (json.JSONDecodeError, TypeError): pass label_primary = labels.get("primary") or "Hauptgeschehen" label_secondary = labels.get("secondary") or "Reaktionen" label_tertiary = labels.get("tertiary") or "Beteiligte" label_mentioned = labels.get("mentioned") or "Erwaehnt" labels_context = "" if labels: labels_context = f"KATEGORIE-LABELS: primary={label_primary}, secondary={label_secondary}, tertiary={label_tertiary}, mentioned={label_mentioned}\n" # Dedupliziere nach location_name fuer den Prompt (spart Tokens) unique_names = {} ids_by_name = {} for loc in targets: name = loc["location_name"] if name not in unique_names: unique_names[name] = loc ids_by_name[name] = [] ids_by_name[name].append(loc["id"]) locations_text = "\n".join( f'ID={loc["id"]} | {loc["location_name"]} ({loc["latitude"]:.2f}, {loc["longitude"]:.2f})' for loc in unique_names.values() ) prompt = _LOCATION_PROMPT.format( incident_title=incident_title, incident_desc=incident_desc[:500] if incident_desc else "(keine Beschreibung)", labels_context=labels_context, label_primary=label_primary, label_secondary=label_secondary, label_tertiary=label_tertiary, label_mentioned=label_mentioned, locations_text=locations_text, ) fixes = [] try: result, _usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST) data = json.loads(result) if isinstance(data, list): fixes = data except json.JSONDecodeError: match = re.search(r'\[.*\]', result, re.DOTALL) if match: try: data = json.loads(match.group()) if isinstance(data, list): fixes = data except json.JSONDecodeError: pass except Exception as e: logger.warning("Haiku Location-Check fehlgeschlagen: %s", e) return 0 if not fixes: logger.info("QC Locations: Haiku fand keine falschen Kategorien") return 0 # Korrekturen anwenden (auch auf alle IDs mit gleichem Namen) total_fixed = 0 representative_ids = {loc["id"]: name for name, loc in unique_names.items()} for fix in fixes: fix_id = fix.get("id") new_cat = fix.get("category") if not fix_id or not new_cat: continue if new_cat not in ("primary", "secondary", "tertiary", "mentioned"): continue # Finde den location_name fuer diese ID loc_name = representative_ids.get(fix_id) if not loc_name: continue # Korrigiere ALLE Eintraege mit diesem Namen all_ids = ids_by_name.get(loc_name, [fix_id]) placeholders = ",".join("?" * len(all_ids)) await db.execute( f"UPDATE article_locations SET category = ? " f"WHERE id IN ({placeholders}) AND category = 'primary'", [new_cat] + all_ids, ) total_fixed += len(all_ids) logger.info( "QC Location: '%s' (%d Eintraege): primary -> %s", loc_name, len(all_ids), new_cat, ) if total_fixed > 0: logger.info( "QC: %d Karten-Location-Kategorien korrigiert fuer Incident %d", total_fixed, incident_id, ) return total_fixed # --------------------------------------------------------------------------- # Hauptfunktion # --------------------------------------------------------------------------- async def run_post_refresh_qc(db, incident_id: int) -> dict: """Fuehrt den kompletten Post-Refresh Quality Check via Haiku durch. Returns: Dict mit Ergebnissen {facts_removed, locations_fixed}. """ try: # Lage-Titel und Beschreibung laden cursor = await db.execute( "SELECT title, description FROM incidents WHERE id = ?", (incident_id,), ) row = await cursor.fetchone() if not row: return {"facts_removed": 0, "locations_fixed": 0} incident_title = row["title"] or "" incident_desc = row["description"] or "" facts_removed = await check_fact_duplicates(db, incident_id, incident_title) locations_fixed = await check_location_categories( db, incident_id, incident_title, incident_desc ) umlauts_fixed = await normalize_umlaut_fields(db, incident_id) if facts_removed > 0 or locations_fixed > 0 or umlauts_fixed > 0: await db.commit() logger.info( "Post-Refresh QC fuer Incident %d: %d Duplikate entfernt, %d Locations korrigiert, %d Umlaute normalisiert", incident_id, facts_removed, locations_fixed, umlauts_fixed, ) return { "facts_removed": facts_removed, "locations_fixed": locations_fixed, "umlauts_fixed": umlauts_fixed, } except Exception as e: logger.error( "Post-Refresh QC Fehler fuer Incident %d: %s", incident_id, e, exc_info=True, ) return {"facts_removed": 0, "locations_fixed": 0, "umlauts_fixed": 0, "error": str(e)} # --------------------------------------------------------------------------- # 3. Umlaut-Normalisierung (deterministisch, Sicherheitsnetz gegen LLM-Drift) # --------------------------------------------------------------------------- # Das grosse Mapping wird aus umlaut_dict.json geladen. Das JSON wird einmalig # aus hunspell-de-de erzeugt (siehe scripts/build_umlaut_dict.py) und enthaelt # >150.000 deutsche Umlaut-Woerter inklusive Flexionsformen. Mehrdeutigkeiten # (z. B. "dass"/"daß", "Masse"/"Maße") sind bereits ausgefiltert. _DICT_PATH = os.path.join(os.path.dirname(__file__), "umlaut_dict.json") try: with open(_DICT_PATH, encoding="utf-8") as _dict_file: _UMLAUT_REPLACEMENTS = json.load(_dict_file) logger.info("Umlaut-Dict geladen: %d Eintraege aus %s", len(_UMLAUT_REPLACEMENTS), _DICT_PATH) except FileNotFoundError: logger.warning("umlaut_dict.json nicht gefunden – Umlaut-Normalisierung laeuft mit leerem Dict") _UMLAUT_REPLACEMENTS = {} # _MANUAL_SUPPLEMENT: Lueckenfueller fuer Woerter, die hunspell-de-de nicht abdeckt # (primaer Komposita und seltene Konjunktiv-Formen). Wird ueber das Korpus-Dict gelegt. _MANUAL_SUPPLEMENT = { # Konjunktiv I von "saeen" (selten, aber kommt vor) "saee": "säe", "saeen": "säen", "gesaet": "gesät", # Komposita mit Amtstitel, die hunspell als Teile kennt aber nicht kombiniert "aussenminister": "außenminister", "aussenministerin": "außenministerin", "aussenministern": "außenministern", "aussenpolitik": "außenpolitik", "aussenpolitisch": "außenpolitisch", "aussenpolitische": "außenpolitische", "aussenpolitischer": "außenpolitischer", "aussenpolitischen": "außenpolitischen", "vizepraesident": "vizepräsident", "vizepraesidenten": "vizepräsidenten", "vizepraesidentin": "vizepräsidentin", "parlamentspraesident": "parlamentspräsident", "parlamentspraesidenten": "parlamentspräsidenten", "parlamentspraesidentin": "parlamentspräsidentin", "generalsekretaer": "generalsekretär", "generalsekretaerin": "generalsekretärin", "generalsekretaers": "generalsekretärs", "staatssekretaer": "staatssekretär", "staatssekretaerin": "staatssekretärin", # Strassen-Komposita "wasserstrasse": "wasserstraße", "wasserstrassen": "wasserstraßen", "hauptstrasse": "hauptstraße", "autostrasse": "autostraße", "bundesstrasse": "bundesstraße", "landstrasse": "landstraße", # Militaer-Komposita (haeufig in OSINT-Kontext) "militaerkommando": "militärkommando", "militaerbasis": "militärbasis", "militaerschlag": "militärschlag", "militaerschlaege": "militärschläge", # Suedeutsch-Doppel-D-Spezialfall (haendisch korrigierbar) "suedeutsch": "süddeutsch", "suedeutsche": "süddeutsche", "suedeutschen": "süddeutschen", # Fuehrungs- und Oeffnungs-Komposita (hunspell kennt die Stamm-Woerter, nicht die Komposita) "wiedereroeffnung": "wiedereröffnung", "wiedereroeffnungen": "wiedereröffnungen", "kriegsfuehrung": "kriegsführung", "kriegsfuehrer": "kriegsführer", "fuehrungsebene": "führungsebene", "fuehrungsebenen": "führungsebenen", "fuehrungskraft": "führungskraft", "fuehrungskraefte": "führungskräfte", "fuehrungsposition": "führungsposition", "fuehrungspositionen": "führungspositionen", "fuehrungsrolle": "führungsrolle", "geschaeftsfuehrer": "geschäftsführer", "geschaeftsfuehrung": "geschäftsführung", "staatsfuehrung": "staatsführung", "parteifuehrung": "parteiführung", "militaerfuehrung": "militärführung", } # Capitalize-Varianten fuer das Supplement (hunspell-Korpus hat sie schon eingebaut) _MANUAL_SUPPLEMENT_FULL = {} for _k, _v in _MANUAL_SUPPLEMENT.items(): _MANUAL_SUPPLEMENT_FULL[_k] = _v if _k[:1].islower(): _MANUAL_SUPPLEMENT_FULL[_k[:1].upper() + _k[1:]] = _v[:1].upper() + _v[1:] # Supplement ueber das Korpus-Dict legen (Supplement hat Vorrang bei Kollision) _UMLAUT_REPLACEMENTS = {**_UMLAUT_REPLACEMENTS, **_MANUAL_SUPPLEMENT_FULL} # Whitelist: Tokens, die trotz Dict-Match NIE ersetzt werden (Eigennamen, # englische Fremdwoerter, Fachbegriffe). Greift vor dem Dict-Lookup. _UMLAUT_WHITELIST = frozenset({ # Englische Fremdwoerter "Boeing", "Business", "Access", "Process", "Message", "Password", "Miss", "Boss", "Goethe", "Yahoo", # Eigennamen, die zufaellig "ss" enthalten und nicht umgeschrieben werden sollen "Israel", "Israels", }) # Tokenizer: matcht Woerter aus Buchstaben (inkl. deutschen Umlauten). # Performanter als ein alternierendes Regex ueber 150k Keys — O(1) Dict-Lookup pro Wort. _WORD_PATTERN = re.compile(r"[A-Za-zÄÖÜäöüß]+") def normalize_german_umlauts(text: str) -> tuple[str, int]: """Ersetzt typische deutsche Umschreibungen durch echte Umlaute. Deterministisch, wortgrenzen-basiert, case-preserving. Sicher gegen englische Wortbestandteile (Boeing, Business, Access) weil nur explizit gelistete deutsche Woerter ersetzt werden. Rueckgabe: (normalisierter_text, anzahl_ersetzungen) """ if not text: return text, 0 count = [0] def _replace(match: re.Match) -> str: word = match.group(0) if word in _UMLAUT_WHITELIST: return word replacement = _UMLAUT_REPLACEMENTS.get(word) if replacement is None: return word count[0] += 1 return replacement new_text = _WORD_PATTERN.sub(_replace, text) return new_text, count[0] async def normalize_umlaut_fields(db, incident_id: int) -> int: """Liest summary + latest_developments eines Incidents, normalisiert Umlaute, schreibt bei tatsaechlichen Aenderungen zurueck. Rueckgabe: Anzahl der Ersetzungen insgesamt (summary + latest_developments). """ cursor = await db.execute( "SELECT summary, latest_developments FROM incidents WHERE id = ?", (incident_id,), ) row = await cursor.fetchone() if not row: return 0 orig_summary = row["summary"] or "" orig_dev = row["latest_developments"] or "" new_summary, count_summary = normalize_german_umlauts(orig_summary) new_dev, count_dev = normalize_german_umlauts(orig_dev) total = count_summary + count_dev if total == 0: return 0 await db.execute( "UPDATE incidents SET summary = ?, latest_developments = ? WHERE id = ?", ( new_summary if count_summary > 0 else orig_summary, new_dev if count_dev > 0 else orig_dev, incident_id, ), ) logger.info( "Umlaut-Normalisierung Incident %d: %d in summary, %d in latest_developments", incident_id, count_summary, count_dev, ) return total