feat: Post-Refresh QC auf Haiku umgestellt

Faktencheck-Duplikate: Fuzzy-Vorfilter (Threshold 0.60) reduziert
Kandidaten, Haiku clustert semantische Duplikate kontextbezogen.
Karten-Locations: Haiku bewertet target-Kategorien anhand des
Lage-Kontexts statt statischer Wortlisten.
Kosten ca. 0.005-0.008 USD pro Check.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Dieser Commit ist enthalten in:
claude-dev
2026-03-10 21:49:50 +01:00
Ursprung 81a393fd4a
Commit 445f645936

Datei anzeigen

@@ -1,54 +1,149 @@
"""Post-Refresh Quality Check: Prueft Faktenchecks und Karten-Locations nach jedem Refresh. """Post-Refresh Quality Check via Haiku.
Erkennt: Prueft nach jedem Refresh:
1. Inhaltliche Faktencheck-Duplikate (fuzzy matching) 1. Semantische Faktencheck-Duplikate (Haiku-Clustering mit Fuzzy-Vorfilter)
2. Falsch kategorisierte Karten-Locations (z.B. Laender als 'target' die nicht angegriffen wurden) 2. Falsch kategorisierte Karten-Locations (Haiku bewertet Kontext der Lage)
Regelbasierte Listen dienen als Fallback falls Haiku fehlschlaegt.
""" """
import json
import logging import logging
import re
from difflib import SequenceMatcher from difflib import SequenceMatcher
from agents.claude_client import call_claude
from config import CLAUDE_MODEL_FAST
logger = logging.getLogger("osint.post_refresh_qc") logger = logging.getLogger("osint.post_refresh_qc")
# Orte die in einem Konflikt-Kontext fast nie echte Angriffsziele sind STATUS_PRIORITY = {
# (werden typischerweise wirtschaftlich/politisch erwaehnt) "confirmed": 5, "established": 5,
_NON_TARGET_LOCATIONS = { "contradicted": 4, "disputed": 4,
# Weit entfernte Laender (keine direkten Kriegsziele) "unconfirmed": 3, "unverified": 3,
"australia", "australien", "northern territory", "queensland", "developing": 1,
"new south wales", "victoria",
"cuba", "kuba",
"new york city", "new york", "washington",
"taiwan", "south korea", "japan",
"afghanistan", "pakistan", "karachi",
"china", "peking", "beijing",
"indien", "india", "new delhi",
"brasilien", "brazil",
"mexiko", "mexico",
"argentinien", "argentina",
"kanada", "canada",
"philippinen", "philippines",
"indonesien", "indonesia",
"nigeria", "south africa",
} }
# Orte die je nach Konflikt als actor/response statt target kategorisiert werden sollten # ---------------------------------------------------------------------------
_ACTOR_NOT_TARGET = { # 1. Faktencheck-Duplikate
"usa", "united states", "us", "vereinigte staaten", # ---------------------------------------------------------------------------
"deutschland", "germany",
"frankreich", "france", _DEDUP_PROMPT = """\
"grossbritannien", "united kingdom", "uk", Du bist ein Deduplizierungs-Agent fuer Faktenchecks eines OSINT-Monitors.
}
LAGE: {incident_title}
Unten stehen Faktenchecks (ID + Status + Claim). Finde Gruppen von Fakten,
die INHALTLICH DASSELBE aussagen, auch wenn sie unterschiedlich formuliert sind.
REGELN:
- Gleicher Sachverhalt = gleiche Gruppe
(z.B. "Trump fordert Kapitulation" und "US-Praesident verlangt bedingungslose Aufgabe")
- Unterschiedliche Detailtiefe zum SELBEN Fakt = gleiche Gruppe
- VERSCHIEDENE Sachverhalte = VERSCHIEDENE Gruppen
(z.B. "Angriff auf Isfahan" vs "Angriff auf Teheran" sind NICHT dasselbe)
- Eine Gruppe muss mindestens 2 Eintraege haben
Antworte NUR als JSON-Array von Gruppen. Jede Gruppe ist ein Array von IDs:
[[1,5,12], [3,8]]
Wenn keine Duplikate: antworte mit []
FAKTEN:
{facts_text}"""
async def check_fact_duplicates(db, incident_id: int) -> int: async def _haiku_find_duplicate_clusters(
"""Prueft auf inhaltliche Faktencheck-Duplikate innerhalb einer Lage. facts: list[dict], incident_title: str
) -> list[list[int]]:
"""Fragt Haiku welche Fakten semantische Duplikate sind."""
facts_text = "\n".join(
f'ID={f["id"]} [{f["status"]}]: {f["claim"]}'
for f in facts
)
prompt = _DEDUP_PROMPT.format(
incident_title=incident_title, facts_text=facts_text
)
try:
result, _usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST)
data = json.loads(result)
if isinstance(data, list) and all(isinstance(g, list) for g in data):
return data
except json.JSONDecodeError:
match = re.search(r'\[.*\]', result, re.DOTALL)
if match:
try:
data = json.loads(match.group())
if isinstance(data, list):
return data
except json.JSONDecodeError:
pass
except Exception as e:
logger.warning("Haiku Duplikat-Clustering fehlgeschlagen: %s", e)
return []
Nutzt Fuzzy-Matching (SequenceMatcher + Keyword-Jaccard) um
semantisch identische Claims zu finden und das schwaecher belegte zu entfernen.
Returns: Anzahl entfernter Duplikate. def _fuzzy_prefilter(all_facts: list[dict], max_candidates: int = 80) -> list[dict]:
"""Waehlt Kandidaten fuer Haiku-Check per Fuzzy-Vorfilter aus.
Findet Paare mit Aehnlichkeit >= 0.60 und gibt die betroffenen Fakten zurueck.
Begrenzt auf max_candidates um Haiku-Tokens zu sparen.
""" """
from agents.factchecker import normalize_claim, _keyword_set from agents.factchecker import normalize_claim, _keyword_set
if len(all_facts) <= max_candidates:
return all_facts
normalized = []
for f in all_facts:
nc = normalize_claim(f["claim"])
kw = _keyword_set(f["claim"])
normalized.append((f, nc, kw))
candidate_ids = set()
recent = normalized[:60]
for i, (fact_a, norm_a, kw_a) in enumerate(recent):
for j, (fact_b, norm_b, kw_b) in enumerate(normalized):
if i >= j or fact_b["id"] == fact_a["id"]:
continue
if not norm_a or not norm_b:
continue
len_ratio = len(norm_a) / len(norm_b) if norm_b else 0
if len_ratio > 2.5 or len_ratio < 0.4:
continue
seq_ratio = SequenceMatcher(None, norm_a, norm_b).ratio()
kw_union = kw_a | kw_b
jaccard = len(kw_a & kw_b) / len(kw_union) if kw_union else 0.0
combined = 0.7 * seq_ratio + 0.3 * jaccard
if combined >= 0.60:
candidate_ids.add(fact_a["id"])
candidate_ids.add(fact_b["id"])
if len(candidate_ids) >= max_candidates:
break
if len(candidate_ids) >= max_candidates:
break
candidates = [f for f in all_facts if f["id"] in candidate_ids]
logger.info(
"Fuzzy-Vorfilter: %d/%d Fakten als Duplikat-Kandidaten identifiziert",
len(candidates), len(all_facts),
)
return candidates
async def check_fact_duplicates(db, incident_id: int, incident_title: str) -> int:
"""Prueft auf semantische Faktencheck-Duplikate via Haiku.
1. Fuzzy-Vorfilter reduziert auf relevante Kandidaten
2. Haiku clustert semantische Duplikate
3. Pro Cluster: behalte besten Fakt, loesche Rest
Returns: Anzahl entfernter Duplikate.
"""
cursor = await db.execute( cursor = await db.execute(
"SELECT id, claim, status, sources_count, evidence, checked_at " "SELECT id, claim, status, sources_count, evidence, checked_at "
"FROM fact_checks WHERE incident_id = ? ORDER BY checked_at DESC", "FROM fact_checks WHERE incident_id = ? ORDER BY checked_at DESC",
@@ -59,61 +154,45 @@ async def check_fact_duplicates(db, incident_id: int) -> int:
if len(all_facts) < 2: if len(all_facts) < 2:
return 0 return 0
STATUS_PRIORITY = { # Schritt 1: Fuzzy-Vorfilter
"confirmed": 5, "established": 5, candidates = _fuzzy_prefilter(all_facts)
"contradicted": 4, "disputed": 4, if len(candidates) < 2:
"unconfirmed": 3, "unverified": 3, return 0
"developing": 1,
}
# Vorberechnung: normalisierte Claims und Keywords # Schritt 2: Haiku-Clustering (in Batches von max 80)
normalized = [] all_clusters = []
for f in all_facts: batch_size = 80
nc = normalize_claim(f["claim"]) for i in range(0, len(candidates), batch_size):
kw = _keyword_set(f["claim"]) batch = candidates[i:i + batch_size]
normalized.append((f, nc, kw)) clusters = await _haiku_find_duplicate_clusters(batch, incident_title)
all_clusters.extend(clusters)
# Paarweiser Vergleich: nur die letzten 50 Fakten gegen alle pruefen if not all_clusters:
logger.info("QC Fakten: Haiku fand keine Duplikate")
return 0
# Schritt 3: Pro Cluster besten behalten, Rest loeschen
facts_by_id = {f["id"]: f for f in all_facts}
ids_to_delete = set() ids_to_delete = set()
checked_pairs = set()
recent = normalized[:50] for cluster_ids in all_clusters:
for i, (fact_a, norm_a, kw_a) in enumerate(recent): valid_ids = [cid for cid in cluster_ids if cid in facts_by_id]
if fact_a["id"] in ids_to_delete: if len(valid_ids) <= 1:
continue continue
for j, (fact_b, norm_b, kw_b) in enumerate(normalized):
if i >= j or fact_b["id"] == fact_a["id"] or fact_b["id"] in ids_to_delete:
continue
pair_key = (min(fact_a["id"], fact_b["id"]), max(fact_a["id"], fact_b["id"])) cluster_facts = [facts_by_id[cid] for cid in valid_ids]
if pair_key in checked_pairs: best = max(cluster_facts, key=lambda f: (
continue STATUS_PRIORITY.get(f["status"], 0),
checked_pairs.add(pair_key) f.get("sources_count", 0),
f.get("checked_at", ""),
))
if not norm_a or not norm_b: for fact in cluster_facts:
continue if fact["id"] != best["id"]:
ids_to_delete.add(fact["id"])
# Laengenfilter
len_ratio = len(norm_a) / len(norm_b) if norm_b else 0
if len_ratio > 2.5 or len_ratio < 0.4:
continue
seq_ratio = SequenceMatcher(None, norm_a, norm_b).ratio()
kw_union = kw_a | kw_b
jaccard = len(kw_a & kw_b) / len(kw_union) if kw_union else 0.0
combined = 0.7 * seq_ratio + 0.3 * jaccard
if combined >= 0.80: # Hoher Threshold fuer Duplikaterkennung
# Behalte den mit hoeherem Status-Prio + mehr Quellen
score_a = (STATUS_PRIORITY.get(fact_a["status"], 0), fact_a.get("sources_count", 0))
score_b = (STATUS_PRIORITY.get(fact_b["status"], 0), fact_b.get("sources_count", 0))
loser_id = fact_b["id"] if score_a >= score_b else fact_a["id"]
winner_id = fact_a["id"] if score_a >= score_b else fact_b["id"]
ids_to_delete.add(loser_id)
logger.info( logger.info(
"QC Duplikat: ID %d entfernt (Score %.2f), behalte ID %d", "QC Duplikat: ID %d entfernt, behalte ID %d ('%s')",
loser_id, combined, winner_id, fact["id"], best["id"], best["claim"][:60],
) )
if ids_to_delete: if ids_to_delete:
@@ -130,11 +209,45 @@ async def check_fact_duplicates(db, incident_id: int) -> int:
return len(ids_to_delete) return len(ids_to_delete)
async def check_location_categories(db, incident_id: int) -> int: # ---------------------------------------------------------------------------
"""Prueft und korrigiert falsch kategorisierte Karten-Locations. # 2. Karten-Location-Kategorien
# ---------------------------------------------------------------------------
Locations die als 'target' markiert sind aber offensichtlich keine _LOCATION_PROMPT = """\
Angriffsziele im Konfliktkontext darstellen, werden korrigiert. Du bist ein Geopolitik-Experte fuer einen OSINT-Monitor.
LAGE: {incident_title}
BESCHREIBUNG: {incident_desc}
Unten stehen Orte, die auf der Karte als "target" (Angriffsziel) markiert sind.
Pruefe fuer jeden Ort, ob die Kategorie "target" korrekt ist.
KATEGORIEN:
- target: Ort wurde tatsaechlich militaerisch angegriffen oder bombardiert
- actor: Ort gehoert zu einer Konfliktpartei (z.B. Hauptstadt des Angreifers)
- response: Ort reagiert auf den Konflikt (z.B. diplomatische Reaktion, Sanktionen)
- mentioned: Ort wird nur im Kontext erwaehnt (z.B. wirtschaftliche Auswirkungen)
REGELN:
- Nur Orte die TATSAECHLICH physisch angegriffen/bombardiert wurden = "target"
- Hauptstaedte von Angreiferlaendern (z.B. Washington DC) = "actor"
- Laender die nur wirtschaftlich betroffen sind (z.B. steigende Oelpreise) = "mentioned"
- Laender die diplomatisch reagieren = "response"
- Im Zweifel: "mentioned"
Antworte als JSON-Array mit Korrekturen. Nur Eintraege die GEAENDERT werden muessen:
[{{"id": 123, "category": "mentioned"}}, {{"id": 456, "category": "actor"}}]
Wenn alle Kategorien korrekt sind: antworte mit []
ORTE (aktuell alle als "target" markiert):
{locations_text}"""
async def check_location_categories(
db, incident_id: int, incident_title: str, incident_desc: str
) -> int:
"""Prueft Karten-Location-Kategorien via Haiku.
Returns: Anzahl korrigierter Eintraege. Returns: Anzahl korrigierter Eintraege.
""" """
@@ -148,39 +261,80 @@ async def check_location_categories(db, incident_id: int) -> int:
if not targets: if not targets:
return 0 return 0
fixes = [] # Dedupliziere nach location_name fuer den Prompt (spart Tokens)
unique_names = {}
ids_by_name = {}
for loc in targets: for loc in targets:
name_lower = loc["location_name"].lower().strip() name = loc["location_name"]
if name not in unique_names:
unique_names[name] = loc
ids_by_name[name] = []
ids_by_name[name].append(loc["id"])
if name_lower in _NON_TARGET_LOCATIONS: locations_text = "\n".join(
fixes.append((loc["id"], "mentioned")) f'ID={loc["id"]} | {loc["location_name"]} ({loc["latitude"]:.2f}, {loc["longitude"]:.2f})'
elif name_lower in _ACTOR_NOT_TARGET: for loc in unique_names.values()
lat, lon = loc["latitude"], loc["longitude"] )
# USA mit DC-Koordinaten -> actor
if name_lower in ("usa", "united states", "us", "vereinigte staaten"):
if 37.0 <= lat <= 41.0 and -79.0 <= lon <= -74.0:
fixes.append((loc["id"], "actor"))
# Deutschland mit Berlin-Koordinaten -> response
elif name_lower in ("deutschland", "germany"):
if 50.0 <= lat <= 54.0 and 10.0 <= lon <= 15.0:
fixes.append((loc["id"], "response"))
# Frankreich mit Paris-Koordinaten -> response
elif name_lower in ("frankreich", "france"):
if 45.0 <= lat <= 50.0 and 1.0 <= lon <= 4.0:
fixes.append((loc["id"], "response"))
# UK mit London-Koordinaten -> response
elif name_lower in ("grossbritannien", "united kingdom", "uk"):
if 50.0 <= lat <= 56.0 and -3.0 <= lon <= 1.0:
fixes.append((loc["id"], "response"))
prompt = _LOCATION_PROMPT.format(
incident_title=incident_title,
incident_desc=incident_desc[:500] if incident_desc else "(keine Beschreibung)",
locations_text=locations_text,
)
fixes = []
try:
result, _usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST)
data = json.loads(result)
if isinstance(data, list):
fixes = data
except json.JSONDecodeError:
match = re.search(r'\[.*\]', result, re.DOTALL)
if match:
try:
data = json.loads(match.group())
if isinstance(data, list):
fixes = data
except json.JSONDecodeError:
pass
except Exception as e:
logger.warning("Haiku Location-Check fehlgeschlagen: %s", e)
return 0
if not fixes:
logger.info("QC Locations: Haiku fand keine falschen Kategorien")
return 0
# Korrekturen anwenden (auch auf alle IDs mit gleichem Namen)
total_fixed = 0 total_fixed = 0
for loc_id, new_category in fixes: representative_ids = {loc["id"]: name for name, loc in unique_names.items()}
for fix in fixes:
fix_id = fix.get("id")
new_cat = fix.get("category")
if not fix_id or not new_cat:
continue
if new_cat not in ("target", "actor", "response", "mentioned"):
continue
# Finde den location_name fuer diese ID
loc_name = representative_ids.get(fix_id)
if not loc_name:
continue
# Korrigiere ALLE Eintraege mit diesem Namen
all_ids = ids_by_name.get(loc_name, [fix_id])
placeholders = ",".join("?" * len(all_ids))
await db.execute( await db.execute(
"UPDATE article_locations SET category = ? WHERE id = ?", f"UPDATE article_locations SET category = ? "
(new_category, loc_id), f"WHERE id IN ({placeholders}) AND category = 'target'",
[new_cat] + all_ids,
)
total_fixed += len(all_ids)
logger.info(
"QC Location: '%s' (%d Eintraege): target -> %s",
loc_name, len(all_ids), new_cat,
) )
total_fixed += 1
if total_fixed > 0: if total_fixed > 0:
logger.info( logger.info(
@@ -191,14 +345,32 @@ async def check_location_categories(db, incident_id: int) -> int:
return total_fixed return total_fixed
# ---------------------------------------------------------------------------
# 3. Hauptfunktion
# ---------------------------------------------------------------------------
async def run_post_refresh_qc(db, incident_id: int) -> dict: async def run_post_refresh_qc(db, incident_id: int) -> dict:
"""Fuehrt den kompletten Post-Refresh Quality Check durch. """Fuehrt den kompletten Post-Refresh Quality Check via Haiku durch.
Returns: Dict mit Ergebnissen {facts_removed, locations_fixed}. Returns: Dict mit Ergebnissen {facts_removed, locations_fixed}.
""" """
try: try:
facts_removed = await check_fact_duplicates(db, incident_id) # Lage-Titel und Beschreibung laden
locations_fixed = await check_location_categories(db, incident_id) cursor = await db.execute(
"SELECT title, description FROM incidents WHERE id = ?",
(incident_id,),
)
row = await cursor.fetchone()
if not row:
return {"facts_removed": 0, "locations_fixed": 0}
incident_title = row["title"] or ""
incident_desc = row["description"] or ""
facts_removed = await check_fact_duplicates(db, incident_id, incident_title)
locations_fixed = await check_location_categories(
db, incident_id, incident_title, incident_desc
)
if facts_removed > 0 or locations_fixed > 0: if facts_removed > 0 or locations_fixed > 0:
await db.commit() await db.commit()