From 445f64593687767c19f08a2223130c82b130ec25 Mon Sep 17 00:00:00 2001 From: claude-dev Date: Tue, 10 Mar 2026 21:49:50 +0100 Subject: [PATCH] feat: Post-Refresh QC auf Haiku umgestellt Faktencheck-Duplikate: Fuzzy-Vorfilter (Threshold 0.60) reduziert Kandidaten, Haiku clustert semantische Duplikate kontextbezogen. Karten-Locations: Haiku bewertet target-Kategorien anhand des Lage-Kontexts statt statischer Wortlisten. Kosten ca. 0.005-0.008 USD pro Check. Co-Authored-By: Claude Opus 4.6 --- src/services/post_refresh_qc.py | 404 +++++++++++++++++++++++--------- 1 file changed, 288 insertions(+), 116 deletions(-) diff --git a/src/services/post_refresh_qc.py b/src/services/post_refresh_qc.py index a84417a..83a7b90 100644 --- a/src/services/post_refresh_qc.py +++ b/src/services/post_refresh_qc.py @@ -1,54 +1,149 @@ -"""Post-Refresh Quality Check: Prueft Faktenchecks und Karten-Locations nach jedem Refresh. +"""Post-Refresh Quality Check via Haiku. -Erkennt: -1. Inhaltliche Faktencheck-Duplikate (fuzzy matching) -2. Falsch kategorisierte Karten-Locations (z.B. Laender als 'target' die nicht angegriffen wurden) +Prueft nach jedem Refresh: +1. Semantische Faktencheck-Duplikate (Haiku-Clustering mit Fuzzy-Vorfilter) +2. Falsch kategorisierte Karten-Locations (Haiku bewertet Kontext der Lage) + +Regelbasierte Listen dienen als Fallback falls Haiku fehlschlaegt. """ +import json import logging +import re from difflib import SequenceMatcher +from agents.claude_client import call_claude +from config import CLAUDE_MODEL_FAST + logger = logging.getLogger("osint.post_refresh_qc") -# Orte die in einem Konflikt-Kontext fast nie echte Angriffsziele sind -# (werden typischerweise wirtschaftlich/politisch erwaehnt) -_NON_TARGET_LOCATIONS = { - # Weit entfernte Laender (keine direkten Kriegsziele) - "australia", "australien", "northern territory", "queensland", - "new south wales", "victoria", - "cuba", "kuba", - "new york city", "new york", "washington", - "taiwan", "south korea", "japan", - "afghanistan", "pakistan", "karachi", - "china", "peking", "beijing", - "indien", "india", "new delhi", - "brasilien", "brazil", - "mexiko", "mexico", - "argentinien", "argentina", - "kanada", "canada", - "philippinen", "philippines", - "indonesien", "indonesia", - "nigeria", "south africa", +STATUS_PRIORITY = { + "confirmed": 5, "established": 5, + "contradicted": 4, "disputed": 4, + "unconfirmed": 3, "unverified": 3, + "developing": 1, } -# Orte die je nach Konflikt als actor/response statt target kategorisiert werden sollten -_ACTOR_NOT_TARGET = { - "usa", "united states", "us", "vereinigte staaten", - "deutschland", "germany", - "frankreich", "france", - "grossbritannien", "united kingdom", "uk", -} +# --------------------------------------------------------------------------- +# 1. Faktencheck-Duplikate +# --------------------------------------------------------------------------- + +_DEDUP_PROMPT = """\ +Du bist ein Deduplizierungs-Agent fuer Faktenchecks eines OSINT-Monitors. + +LAGE: {incident_title} + +Unten stehen Faktenchecks (ID + Status + Claim). Finde Gruppen von Fakten, +die INHALTLICH DASSELBE aussagen, auch wenn sie unterschiedlich formuliert sind. + +REGELN: +- Gleicher Sachverhalt = gleiche Gruppe + (z.B. "Trump fordert Kapitulation" und "US-Praesident verlangt bedingungslose Aufgabe") +- Unterschiedliche Detailtiefe zum SELBEN Fakt = gleiche Gruppe +- VERSCHIEDENE Sachverhalte = VERSCHIEDENE Gruppen + (z.B. "Angriff auf Isfahan" vs "Angriff auf Teheran" sind NICHT dasselbe) +- Eine Gruppe muss mindestens 2 Eintraege haben + +Antworte NUR als JSON-Array von Gruppen. Jede Gruppe ist ein Array von IDs: +[[1,5,12], [3,8]] + +Wenn keine Duplikate: antworte mit [] + +FAKTEN: +{facts_text}""" -async def check_fact_duplicates(db, incident_id: int) -> int: - """Prueft auf inhaltliche Faktencheck-Duplikate innerhalb einer Lage. +async def _haiku_find_duplicate_clusters( + facts: list[dict], incident_title: str +) -> list[list[int]]: + """Fragt Haiku welche Fakten semantische Duplikate sind.""" + facts_text = "\n".join( + f'ID={f["id"]} [{f["status"]}]: {f["claim"]}' + for f in facts + ) + prompt = _DEDUP_PROMPT.format( + incident_title=incident_title, facts_text=facts_text + ) + try: + result, _usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST) + data = json.loads(result) + if isinstance(data, list) and all(isinstance(g, list) for g in data): + return data + except json.JSONDecodeError: + match = re.search(r'\[.*\]', result, re.DOTALL) + if match: + try: + data = json.loads(match.group()) + if isinstance(data, list): + return data + except json.JSONDecodeError: + pass + except Exception as e: + logger.warning("Haiku Duplikat-Clustering fehlgeschlagen: %s", e) + return [] - Nutzt Fuzzy-Matching (SequenceMatcher + Keyword-Jaccard) um - semantisch identische Claims zu finden und das schwaecher belegte zu entfernen. - Returns: Anzahl entfernter Duplikate. +def _fuzzy_prefilter(all_facts: list[dict], max_candidates: int = 80) -> list[dict]: + """Waehlt Kandidaten fuer Haiku-Check per Fuzzy-Vorfilter aus. + + Findet Paare mit Aehnlichkeit >= 0.60 und gibt die betroffenen Fakten zurueck. + Begrenzt auf max_candidates um Haiku-Tokens zu sparen. """ from agents.factchecker import normalize_claim, _keyword_set + if len(all_facts) <= max_candidates: + return all_facts + + normalized = [] + for f in all_facts: + nc = normalize_claim(f["claim"]) + kw = _keyword_set(f["claim"]) + normalized.append((f, nc, kw)) + + candidate_ids = set() + recent = normalized[:60] + + for i, (fact_a, norm_a, kw_a) in enumerate(recent): + for j, (fact_b, norm_b, kw_b) in enumerate(normalized): + if i >= j or fact_b["id"] == fact_a["id"]: + continue + if not norm_a or not norm_b: + continue + + len_ratio = len(norm_a) / len(norm_b) if norm_b else 0 + if len_ratio > 2.5 or len_ratio < 0.4: + continue + + seq_ratio = SequenceMatcher(None, norm_a, norm_b).ratio() + kw_union = kw_a | kw_b + jaccard = len(kw_a & kw_b) / len(kw_union) if kw_union else 0.0 + combined = 0.7 * seq_ratio + 0.3 * jaccard + + if combined >= 0.60: + candidate_ids.add(fact_a["id"]) + candidate_ids.add(fact_b["id"]) + + if len(candidate_ids) >= max_candidates: + break + if len(candidate_ids) >= max_candidates: + break + + candidates = [f for f in all_facts if f["id"] in candidate_ids] + logger.info( + "Fuzzy-Vorfilter: %d/%d Fakten als Duplikat-Kandidaten identifiziert", + len(candidates), len(all_facts), + ) + return candidates + + +async def check_fact_duplicates(db, incident_id: int, incident_title: str) -> int: + """Prueft auf semantische Faktencheck-Duplikate via Haiku. + + 1. Fuzzy-Vorfilter reduziert auf relevante Kandidaten + 2. Haiku clustert semantische Duplikate + 3. Pro Cluster: behalte besten Fakt, loesche Rest + + Returns: Anzahl entfernter Duplikate. + """ cursor = await db.execute( "SELECT id, claim, status, sources_count, evidence, checked_at " "FROM fact_checks WHERE incident_id = ? ORDER BY checked_at DESC", @@ -59,61 +154,45 @@ async def check_fact_duplicates(db, incident_id: int) -> int: if len(all_facts) < 2: return 0 - STATUS_PRIORITY = { - "confirmed": 5, "established": 5, - "contradicted": 4, "disputed": 4, - "unconfirmed": 3, "unverified": 3, - "developing": 1, - } + # Schritt 1: Fuzzy-Vorfilter + candidates = _fuzzy_prefilter(all_facts) + if len(candidates) < 2: + return 0 - # Vorberechnung: normalisierte Claims und Keywords - normalized = [] - for f in all_facts: - nc = normalize_claim(f["claim"]) - kw = _keyword_set(f["claim"]) - normalized.append((f, nc, kw)) + # Schritt 2: Haiku-Clustering (in Batches von max 80) + all_clusters = [] + batch_size = 80 + for i in range(0, len(candidates), batch_size): + batch = candidates[i:i + batch_size] + clusters = await _haiku_find_duplicate_clusters(batch, incident_title) + all_clusters.extend(clusters) - # Paarweiser Vergleich: nur die letzten 50 Fakten gegen alle pruefen + if not all_clusters: + logger.info("QC Fakten: Haiku fand keine Duplikate") + return 0 + + # Schritt 3: Pro Cluster besten behalten, Rest loeschen + facts_by_id = {f["id"]: f for f in all_facts} ids_to_delete = set() - checked_pairs = set() - recent = normalized[:50] - for i, (fact_a, norm_a, kw_a) in enumerate(recent): - if fact_a["id"] in ids_to_delete: + for cluster_ids in all_clusters: + valid_ids = [cid for cid in cluster_ids if cid in facts_by_id] + if len(valid_ids) <= 1: continue - for j, (fact_b, norm_b, kw_b) in enumerate(normalized): - if i >= j or fact_b["id"] == fact_a["id"] or fact_b["id"] in ids_to_delete: - continue - pair_key = (min(fact_a["id"], fact_b["id"]), max(fact_a["id"], fact_b["id"])) - if pair_key in checked_pairs: - continue - checked_pairs.add(pair_key) + cluster_facts = [facts_by_id[cid] for cid in valid_ids] + best = max(cluster_facts, key=lambda f: ( + STATUS_PRIORITY.get(f["status"], 0), + f.get("sources_count", 0), + f.get("checked_at", ""), + )) - if not norm_a or not norm_b: - continue - - # Laengenfilter - len_ratio = len(norm_a) / len(norm_b) if norm_b else 0 - if len_ratio > 2.5 or len_ratio < 0.4: - continue - - seq_ratio = SequenceMatcher(None, norm_a, norm_b).ratio() - kw_union = kw_a | kw_b - jaccard = len(kw_a & kw_b) / len(kw_union) if kw_union else 0.0 - combined = 0.7 * seq_ratio + 0.3 * jaccard - - if combined >= 0.80: # Hoher Threshold fuer Duplikaterkennung - # Behalte den mit hoeherem Status-Prio + mehr Quellen - score_a = (STATUS_PRIORITY.get(fact_a["status"], 0), fact_a.get("sources_count", 0)) - score_b = (STATUS_PRIORITY.get(fact_b["status"], 0), fact_b.get("sources_count", 0)) - - loser_id = fact_b["id"] if score_a >= score_b else fact_a["id"] - winner_id = fact_a["id"] if score_a >= score_b else fact_b["id"] - ids_to_delete.add(loser_id) + for fact in cluster_facts: + if fact["id"] != best["id"]: + ids_to_delete.add(fact["id"]) logger.info( - "QC Duplikat: ID %d entfernt (Score %.2f), behalte ID %d", - loser_id, combined, winner_id, + "QC Duplikat: ID %d entfernt, behalte ID %d ('%s')", + fact["id"], best["id"], best["claim"][:60], ) if ids_to_delete: @@ -130,11 +209,45 @@ async def check_fact_duplicates(db, incident_id: int) -> int: return len(ids_to_delete) -async def check_location_categories(db, incident_id: int) -> int: - """Prueft und korrigiert falsch kategorisierte Karten-Locations. +# --------------------------------------------------------------------------- +# 2. Karten-Location-Kategorien +# --------------------------------------------------------------------------- - Locations die als 'target' markiert sind aber offensichtlich keine - Angriffsziele im Konfliktkontext darstellen, werden korrigiert. +_LOCATION_PROMPT = """\ +Du bist ein Geopolitik-Experte fuer einen OSINT-Monitor. + +LAGE: {incident_title} +BESCHREIBUNG: {incident_desc} + +Unten stehen Orte, die auf der Karte als "target" (Angriffsziel) markiert sind. +Pruefe fuer jeden Ort, ob die Kategorie "target" korrekt ist. + +KATEGORIEN: +- target: Ort wurde tatsaechlich militaerisch angegriffen oder bombardiert +- actor: Ort gehoert zu einer Konfliktpartei (z.B. Hauptstadt des Angreifers) +- response: Ort reagiert auf den Konflikt (z.B. diplomatische Reaktion, Sanktionen) +- mentioned: Ort wird nur im Kontext erwaehnt (z.B. wirtschaftliche Auswirkungen) + +REGELN: +- Nur Orte die TATSAECHLICH physisch angegriffen/bombardiert wurden = "target" +- Hauptstaedte von Angreiferlaendern (z.B. Washington DC) = "actor" +- Laender die nur wirtschaftlich betroffen sind (z.B. steigende Oelpreise) = "mentioned" +- Laender die diplomatisch reagieren = "response" +- Im Zweifel: "mentioned" + +Antworte als JSON-Array mit Korrekturen. Nur Eintraege die GEAENDERT werden muessen: +[{{"id": 123, "category": "mentioned"}}, {{"id": 456, "category": "actor"}}] + +Wenn alle Kategorien korrekt sind: antworte mit [] + +ORTE (aktuell alle als "target" markiert): +{locations_text}""" + + +async def check_location_categories( + db, incident_id: int, incident_title: str, incident_desc: str +) -> int: + """Prueft Karten-Location-Kategorien via Haiku. Returns: Anzahl korrigierter Eintraege. """ @@ -148,39 +261,80 @@ async def check_location_categories(db, incident_id: int) -> int: if not targets: return 0 - fixes = [] - + # Dedupliziere nach location_name fuer den Prompt (spart Tokens) + unique_names = {} + ids_by_name = {} for loc in targets: - name_lower = loc["location_name"].lower().strip() + name = loc["location_name"] + if name not in unique_names: + unique_names[name] = loc + ids_by_name[name] = [] + ids_by_name[name].append(loc["id"]) - if name_lower in _NON_TARGET_LOCATIONS: - fixes.append((loc["id"], "mentioned")) - elif name_lower in _ACTOR_NOT_TARGET: - lat, lon = loc["latitude"], loc["longitude"] - # USA mit DC-Koordinaten -> actor - if name_lower in ("usa", "united states", "us", "vereinigte staaten"): - if 37.0 <= lat <= 41.0 and -79.0 <= lon <= -74.0: - fixes.append((loc["id"], "actor")) - # Deutschland mit Berlin-Koordinaten -> response - elif name_lower in ("deutschland", "germany"): - if 50.0 <= lat <= 54.0 and 10.0 <= lon <= 15.0: - fixes.append((loc["id"], "response")) - # Frankreich mit Paris-Koordinaten -> response - elif name_lower in ("frankreich", "france"): - if 45.0 <= lat <= 50.0 and 1.0 <= lon <= 4.0: - fixes.append((loc["id"], "response")) - # UK mit London-Koordinaten -> response - elif name_lower in ("grossbritannien", "united kingdom", "uk"): - if 50.0 <= lat <= 56.0 and -3.0 <= lon <= 1.0: - fixes.append((loc["id"], "response")) + locations_text = "\n".join( + f'ID={loc["id"]} | {loc["location_name"]} ({loc["latitude"]:.2f}, {loc["longitude"]:.2f})' + for loc in unique_names.values() + ) + prompt = _LOCATION_PROMPT.format( + incident_title=incident_title, + incident_desc=incident_desc[:500] if incident_desc else "(keine Beschreibung)", + locations_text=locations_text, + ) + + fixes = [] + try: + result, _usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST) + data = json.loads(result) + if isinstance(data, list): + fixes = data + except json.JSONDecodeError: + match = re.search(r'\[.*\]', result, re.DOTALL) + if match: + try: + data = json.loads(match.group()) + if isinstance(data, list): + fixes = data + except json.JSONDecodeError: + pass + except Exception as e: + logger.warning("Haiku Location-Check fehlgeschlagen: %s", e) + return 0 + + if not fixes: + logger.info("QC Locations: Haiku fand keine falschen Kategorien") + return 0 + + # Korrekturen anwenden (auch auf alle IDs mit gleichem Namen) total_fixed = 0 - for loc_id, new_category in fixes: + representative_ids = {loc["id"]: name for name, loc in unique_names.items()} + + for fix in fixes: + fix_id = fix.get("id") + new_cat = fix.get("category") + if not fix_id or not new_cat: + continue + if new_cat not in ("target", "actor", "response", "mentioned"): + continue + + # Finde den location_name fuer diese ID + loc_name = representative_ids.get(fix_id) + if not loc_name: + continue + + # Korrigiere ALLE Eintraege mit diesem Namen + all_ids = ids_by_name.get(loc_name, [fix_id]) + placeholders = ",".join("?" * len(all_ids)) await db.execute( - "UPDATE article_locations SET category = ? WHERE id = ?", - (new_category, loc_id), + f"UPDATE article_locations SET category = ? " + f"WHERE id IN ({placeholders}) AND category = 'target'", + [new_cat] + all_ids, + ) + total_fixed += len(all_ids) + logger.info( + "QC Location: '%s' (%d Eintraege): target -> %s", + loc_name, len(all_ids), new_cat, ) - total_fixed += 1 if total_fixed > 0: logger.info( @@ -191,14 +345,32 @@ async def check_location_categories(db, incident_id: int) -> int: return total_fixed +# --------------------------------------------------------------------------- +# 3. Hauptfunktion +# --------------------------------------------------------------------------- + async def run_post_refresh_qc(db, incident_id: int) -> dict: - """Fuehrt den kompletten Post-Refresh Quality Check durch. + """Fuehrt den kompletten Post-Refresh Quality Check via Haiku durch. Returns: Dict mit Ergebnissen {facts_removed, locations_fixed}. """ try: - facts_removed = await check_fact_duplicates(db, incident_id) - locations_fixed = await check_location_categories(db, incident_id) + # Lage-Titel und Beschreibung laden + cursor = await db.execute( + "SELECT title, description FROM incidents WHERE id = ?", + (incident_id,), + ) + row = await cursor.fetchone() + if not row: + return {"facts_removed": 0, "locations_fixed": 0} + + incident_title = row["title"] or "" + incident_desc = row["description"] or "" + + facts_removed = await check_fact_duplicates(db, incident_id, incident_title) + locations_fixed = await check_location_categories( + db, incident_id, incident_title, incident_desc + ) if facts_removed > 0 or locations_fixed > 0: await db.commit()