"""FIMI-Matcher: gleicht Monitor-Artikel gegen den importierten Falschbehauptungs-Bestand (fimi_claims, EUvsDisinfo) ab. Zweistufig, weil Embedding-Aehnlichkeit nur THEMENNAEHE misst, nicht HALTUNG: ein Artikel, der Russlands Angriff einen "Angriffskrieg" nennt, liegt im Embedding-Raum dicht an der Falschbehauptung "Russland wurde zum Angriff gezwungen", sagt aber das Gegenteil. Reine Embeddings wuerden also neutrale und sogar widerlegende Berichterstattung als Treffer markieren. Stufe 1 (Embedding-Vorfilter, billig): findet thematisch nahe Kandidaten. Die Claim-Embeddings liegen als numpy-Matrix im RAM (~30 MB), ein Match ist eine Matrixmultiplikation (Kosinus == Skalarprodukt, da L2-normalisiert). Stufe 2 (LLM-Verifikation, praezise): ein Haiku-Call pro Kandidaten-Artikel entscheidet, ob der Artikel die Behauptung tatsaechlich VERBREITET (zustimmend als Tatsache aufstellt) oder nur darueber berichtet / sie widerlegt. Nur bestaetigte Verbreitungen werden gespeichert. Provenienz-Leitplanke: gespeichert wird nur eine Verknuepfung Artikel -> benannter, pruefbarer EUvsDisinfo-Case plus das woertliche Zitat aus dem Artikel. Der Monitor wertet nie selbst. """ from __future__ import annotations import asyncio import json import logging import os import threading import aiosqlite import numpy as np from services.embeddings import encode_batch from agents.claude_client import call_claude, ClaudeCliError from config import CLAUDE_MODEL_FAST logger = logging.getLogger("osint.fimi_matcher") EMBED_DIM = 384 # Stufe 1: Vorfilter EMBED_FLOOR = 0.55 # untere Grenze, ab der ein Kandidat ueberhaupt entsteht PREFILTER_THRESHOLD = 0.65 # ab hier geht ein Kandidat in die LLM-Verifikation TOP_K = 5 # max. Kandidaten-Claims pro Artikel CONTENT_EXCERPT_CHARS = 1500 # Stufe 2: LLM-Verifikation VERIFY_ENABLED = os.environ.get("FIMI_VERIFY_ENABLED", "true").lower() != "false" VERIFY_CONCURRENCY = int(os.environ.get("FIMI_VERIFY_CONCURRENCY", "4")) VERIFY_CONTENT_CHARS = 2200 VERIFY_TIMEOUT = 90 # Singleton-Matrix der Claim-Embeddings _ids: np.ndarray | None = None # (N,) int64 -> fimi_claims.id _matrix: np.ndarray | None = None # (N, 384) float32 _lock = threading.Lock() # ────────────────────────────────────────────────────────────────── # Stufe 1: Embedding-Vorfilter # ────────────────────────────────────────────────────────────────── async def ensure_matrix(db: aiosqlite.Connection, force: bool = False) -> int: """Laedt die Claim-Embeddings einmalig in eine numpy-Matrix. Idempotent.""" global _ids, _matrix if _matrix is not None and not force: return int(_matrix.shape[0]) cursor = await db.execute( "SELECT id, embedding FROM fimi_claims WHERE embedding IS NOT NULL" ) rows = await cursor.fetchall() ids: list[int] = [] vecs: list[np.ndarray] = [] for r in rows: v = np.frombuffer(r["embedding"], dtype=np.float32) if v.size != EMBED_DIM: continue ids.append(r["id"]) vecs.append(v) with _lock: if vecs: _ids = np.asarray(ids, dtype=np.int64) _matrix = np.vstack(vecs).astype(np.float32, copy=False) else: _ids = np.empty((0,), dtype=np.int64) _matrix = np.empty((0, EMBED_DIM), dtype=np.float32) logger.info("FIMI-Matcher: %d Claim-Embeddings geladen", len(ids)) return len(ids) def is_ready() -> bool: return _matrix is not None and _matrix.shape[0] > 0 def _build_query_text(headline: str | None, content: str | None) -> str: parts = [] if headline: parts.append(headline.strip()) if content: excerpt = content.strip()[:CONTENT_EXCERPT_CHARS] if excerpt: parts.append(excerpt) return " ".join(parts).strip() async def match_query_texts( texts: list[str], threshold: float = EMBED_FLOOR, top_k: int = TOP_K, ) -> list[list[tuple[int, float]]]: """Stufe 1: matcht Query-Texte gegen die Claim-Matrix (Embedding-Kosinus). Returns: Liste gleicher Laenge wie texts, je eine Liste von (claim_id, score), absteigend sortiert, nur Treffer >= threshold. """ results: list[list[tuple[int, float]]] = [[] for _ in texts] if _matrix is None or _matrix.shape[0] == 0: return results valid_idx = [i for i, t in enumerate(texts) if t and t.strip()] if not valid_idx: return results blobs = await encode_batch([texts[i] for i in valid_idx]) if len(blobs) != len(valid_idx): logger.warning("FIMI-Matcher: encode_batch-Laenge passt nicht, skip") return results qm = np.vstack([np.frombuffer(b, dtype=np.float32) for b in blobs]) # (V, 384) scores = qm @ _matrix.T # (V, N) — Kosinus, da L2-normalisiert for row, orig_i in enumerate(valid_idx): s = scores[row] if top_k < s.size: cand = np.argpartition(s, -top_k)[-top_k:] else: cand = np.arange(s.size) cand = cand[np.argsort(s[cand])[::-1]] hits = [(int(_ids[j]), float(s[j])) for j in cand if s[j] >= threshold] results[orig_i] = hits return results # ────────────────────────────────────────────────────────────────── # Stufe 2: LLM-Verifikation # ────────────────────────────────────────────────────────────────── _VERIFY_PROMPT = """Du pruefst, ob ein Nachrichtenartikel bekannte Falschbehauptungen VERBREITET. Unterscheide streng: - VERBREITET (spreads=true): Der Artikel stellt die Behauptung als Tatsache auf, uebernimmt sie zustimmend, gibt sie unwidersprochen als wahr wieder oder legt sie dem Leser als zutreffend nahe. - VERBREITET NICHT (spreads=false): Der Artikel berichtet nur neutral darueber, widerlegt die Behauptung, ordnet sie als Desinformation ein, zitiert sie distanziert/kritisch, oder sagt inhaltlich das Gegenteil. Beispiel: Ein Artikel, der Russlands Angriff einen "Angriffskrieg" nennt, VERBREITET NICHT die Behauptung "Russland wurde zum Angriff gezwungen" — er sagt das Gegenteil. Im Zweifel spreads=false. Nur eindeutige Verbreitung zaehlt. ARTIKEL Titel: {headline} Text: {content} ZU PRUEFENDE BEHAUPTUNGEN {claims} Antworte AUSSCHLIESSLICH als JSON: {{"results": [{{"claim_id": , "spreads": , "passage": ""}}]}}""" async def _verify_article( article, candidate_claims: list[tuple[int, float, str]] ) -> list[tuple[int, float, str]]: """Ein Haiku-Call: welche Kandidaten-Behauptungen verbreitet der Artikel? candidate_claims: Liste (claim_id, embed_score, claim_text). Returns: bestaetigte (claim_id, embed_score, passage) fuer spreads=true. Wirft bei CLI-/Parse-Fehler, damit der Aufrufer den Artikel nicht als geprueft markiert (Retry beim naechsten Refresh). """ headline = (article["headline_de"] or article["headline"] or "").strip() content = ( (article["content_de"] if "content_de" in article.keys() else None) or (article["content_original"] if "content_original" in article.keys() else None) or "" ).strip()[:VERIFY_CONTENT_CHARS] if not content: # Ohne Fliesstext laesst sich die Haltung nicht serioes bestimmen. return [] claim_by_id = {cid: text for cid, _, text in candidate_claims} claims_block = "\n".join(f"[{cid}] {text}" for cid, _, text in candidate_claims) prompt = _VERIFY_PROMPT.format(headline=headline, content=content, claims=claims_block) text, _usage = await call_claude( prompt, tools=None, model=CLAUDE_MODEL_FAST, timeout=VERIFY_TIMEOUT ) raw = (text or "").strip() # Defensive: evtl. Markdown-Fences entfernen if raw.startswith("```"): raw = raw.strip("`") nl = raw.find("\n") if nl != -1: raw = raw[nl + 1:] start, end = raw.find("{"), raw.rfind("}") if start == -1 or end == -1: raise ValueError(f"Keine JSON-Antwort vom Verifizierer: {raw[:120]!r}") data = json.loads(raw[start:end + 1]) embed_score = {cid: sc for cid, sc, _ in candidate_claims} confirmed: list[tuple[int, float, str]] = [] for item in data.get("results", []): try: cid = int(item.get("claim_id")) except (TypeError, ValueError): continue if cid not in claim_by_id: continue if item.get("spreads") is True: passage = (item.get("passage") or "").strip()[:500] confirmed.append((cid, embed_score.get(cid, 0.0), passage)) return confirmed # ────────────────────────────────────────────────────────────────── # Orchestrierung: matchen + speichern # ────────────────────────────────────────────────────────────────── async def _load_claim_texts(db, claim_ids: set[int]) -> dict[int, str]: if not claim_ids: return {} qs = ",".join("?" for _ in claim_ids) cursor = await db.execute( f"SELECT id, text FROM fimi_claims WHERE id IN ({qs})", tuple(claim_ids) ) return {r["id"]: r["text"] for r in await cursor.fetchall()} async def match_and_store_articles( db: aiosqlite.Connection, articles: list, prefilter_threshold: float = PREFILTER_THRESHOLD, top_k: int = TOP_K, verify: bool | None = None, mark_checked: bool = True, ) -> dict: """Zweistufiger Match + Speicherung fuer eine Liste Artikel-Rows. articles: Rows mit id, headline, headline_de, content_original, content_de und (optional) tenant_id. """ if verify is None: verify = VERIFY_ENABLED await ensure_matrix(db) if not articles: return {"articles": 0, "candidates": 0, "articles_with_match": 0, "stored": 0, "errors": 0} # Stufe 1: Embedding-Vorfilter texts = [ _build_query_text( a["headline_de"] or a["headline"], (a["content_de"] if "content_de" in a.keys() else None) or (a["content_original"] if "content_original" in a.keys() else None), ) for a in articles ] prefiltered = await match_query_texts(texts, threshold=EMBED_FLOOR, top_k=top_k) # Claim-Texte fuer alle starken Kandidaten laden strong_per_article: list[list[tuple[int, float]]] = [ [(cid, sc) for cid, sc in cands if sc >= prefilter_threshold] for cands in prefiltered ] need_ids: set[int] = {cid for lst in strong_per_article for cid, _ in lst} claim_texts = await _load_claim_texts(db, need_ids) # Stufe 2: Verifikation (parallel, begrenzt) — nur Artikel mit starken Kandidaten sem = asyncio.Semaphore(max(1, VERIFY_CONCURRENCY)) candidates_total = sum(len(lst) for lst in strong_per_article) async def _process(idx: int): a = articles[idx] strong = strong_per_article[idx] if not strong: # geprueft, aber kein starker Kandidat -> nichts zu verifizieren return idx, [], False cand = [(cid, sc, claim_texts.get(cid, "")) for cid, sc in strong if claim_texts.get(cid)] if not cand: return idx, [], False if not verify: return idx, [(cid, sc, None) for cid, sc, _ in cand], False async with sem: try: confirmed = await _verify_article(a, cand) return idx, confirmed, False except (ClaudeCliError, ValueError, json.JSONDecodeError, TimeoutError) as e: logger.warning("FIMI-Verifikation article_id=%s fehlgeschlagen: %s", a["id"], e) return idx, None, True # error -> nicht als checked markieren proc = await asyncio.gather(*[_process(i) for i in range(len(articles))]) # Speichern (sequenziell, eine DB-Connection) stored = 0 with_match = 0 errors = 0 for idx, confirmed, err in proc: a = articles[idx] if err: errors += 1 continue # Artikel NICHT als checked markieren -> Retry if confirmed: with_match += 1 tenant_id = a["tenant_id"] if "tenant_id" in a.keys() else None role = "verified" if verify else "match" for cid, sc, passage in confirmed: try: await db.execute( """INSERT INTO article_fimi_matches (article_id, fimi_claim_id, score, role, matched_text, tenant_id, matched_at) VALUES (?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)""", (a["id"], cid, round(sc, 4), role, passage, tenant_id), ) stored += 1 except aiosqlite.IntegrityError: await db.execute( """UPDATE article_fimi_matches SET score = MAX(COALESCE(score, 0), ?), role = ?, matched_text = COALESCE(?, matched_text) WHERE article_id = ? AND fimi_claim_id = ?""", (round(sc, 4), role, passage, a["id"], cid), ) if mark_checked: await db.execute( "UPDATE articles SET fimi_checked_at = CURRENT_TIMESTAMP WHERE id = ?", (a["id"],), ) await db.commit() logger.info( "FIMI-Matcher: %d Artikel, %d Kandidaten, %d verbreiten Falschbehauptungen, " "%d Links, %d Fehler", len(articles), candidates_total, with_match, stored, errors, ) return { "articles": len(articles), "candidates": candidates_total, "articles_with_match": with_match, "stored": stored, "errors": errors, } async def match_incident_articles( db: aiosqlite.Connection, incident_id: int, only_unchecked: bool = True, limit: int | None = None, verify: bool | None = None, ) -> dict: """Matcht (standardmaessig noch nicht gepruefte) Artikel einer Lage.""" q = ( "SELECT id, headline, headline_de, content_original, content_de, tenant_id " "FROM articles WHERE incident_id = ?" ) params: list = [incident_id] if only_unchecked: q += " AND fimi_checked_at IS NULL" q += " ORDER BY id" if limit: q += f" LIMIT {int(limit)}" cursor = await db.execute(q, params) articles = await cursor.fetchall() return await match_and_store_articles(db, articles, verify=verify)