diff --git a/requirements.txt b/requirements.txt index a3d6515..f994aa9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -23,3 +23,10 @@ pdfplumber>=0.11 pytesseract>=0.3 pdf2image>=1.17 Pillow>=10.0 +# FIMI / Counter-Disinformation: Embedding-Match gegen EUvsDisinfo-Falschbehauptungen +# (services/embeddings.py, services/fimi_matcher.py). Modell-Cache wird mit Vigil +# geteilt (~/.cache/huggingface). Versionen wie Vigil-venv fuer Kompatibilitaet. +torch==2.12.0 +sentence-transformers==3.4.1 +transformers==4.57.6 +numpy==2.4.5 diff --git a/scripts/import_fimi_claims.py b/scripts/import_fimi_claims.py new file mode 100755 index 0000000..44ab07c --- /dev/null +++ b/scripts/import_fimi_claims.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python3 +"""Einmal-/Sync-Import des EUvsDisinfo-Falschbehauptungsbestands in den Monitor. + +Kopiert die Claims (Text, Verdict, Widerlegung, Quell-Referenz, Embedding-BLOB) +aus der Vigil-Datenbank in die Monitor-Tabelle fimi_claims. Die Embeddings +werden als BLOB 1:1 uebernommen (384-dim float32, L2-normalisiert) und im +Monitor mit demselben Modell (paraphrase-multilingual-MiniLM-L12-v2) gematcht. + +Idempotent: UPSERT auf der stabilen Vigil-claim.id. Bestehende Treffer in +article_fimi_matches bleiben dadurch gueltig. + +Aufruf (Staging): + python scripts/import_fimi_claims.py \ + --vigil-db /home/claude-dev/vigil-data/vigil.db \ + --osint-db /home/claude-dev/AegisSight-Monitor-staging/data/osint.db +""" +from __future__ import annotations + +import argparse +import sqlite3 +import sys + +EUVSDISINFO_REPORT_BASE = "https://euvsdisinfo.eu/report/" + + +def case_url_from_source_ref(source_ref: str | None) -> str | None: + """Leitet die EUvsDisinfo-Case-URL aus 'euvsdisinfo:' ab.""" + if not source_ref: + return None + prefix = "euvsdisinfo:" + if source_ref.startswith(prefix): + slug = source_ref[len(prefix):].strip().strip("/") + if slug: + return f"{EUVSDISINFO_REPORT_BASE}{slug}/" + return None + + +def main() -> int: + ap = argparse.ArgumentParser(description=__doc__) + ap.add_argument("--vigil-db", required=True, help="Pfad zur Vigil-SQLite-DB (Quelle)") + ap.add_argument("--osint-db", required=True, help="Pfad zur Monitor-SQLite-DB (Ziel)") + ap.add_argument("--limit", type=int, default=0, help="Optional: nur N Claims importieren (Test)") + args = ap.parse_args() + + src = sqlite3.connect(args.vigil_db) + src.row_factory = sqlite3.Row + q = ( + "SELECT id, text, text_normalized, language, verdict, verdict_summary, " + "source_id, embedding, first_seen_at FROM claims WHERE embedding IS NOT NULL" + ) + if args.limit: + q += f" LIMIT {int(args.limit)}" + rows = src.execute(q).fetchall() + src.close() + print(f"Vigil: {len(rows)} Claims mit Embedding gelesen", flush=True) + + dst = sqlite3.connect(args.osint_db) + dst.execute("PRAGMA busy_timeout=10000") + + # Sicherstellen, dass die Zieltabelle existiert (falls Skript vor init_db laeuft) + dst.execute( + """CREATE TABLE IF NOT EXISTS fimi_claims ( + id INTEGER PRIMARY KEY, + text TEXT NOT NULL, + text_normalized TEXT, + language TEXT, + verdict TEXT NOT NULL DEFAULT 'false', + verdict_summary TEXT, + source_ref TEXT, + case_url TEXT, + embedding BLOB, + first_seen_at TIMESTAMP, + imported_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + )""" + ) + dst.execute("CREATE INDEX IF NOT EXISTS idx_fimi_claims_source_ref ON fimi_claims(source_ref)") + + inserted = 0 + with_url = 0 + for r in rows: + case_url = case_url_from_source_ref(r["source_id"]) + if case_url: + with_url += 1 + dst.execute( + """INSERT INTO fimi_claims + (id, text, text_normalized, language, verdict, verdict_summary, + source_ref, case_url, embedding, first_seen_at, imported_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP) + ON CONFLICT(id) DO UPDATE SET + text=excluded.text, + text_normalized=excluded.text_normalized, + language=excluded.language, + verdict=excluded.verdict, + verdict_summary=excluded.verdict_summary, + source_ref=excluded.source_ref, + case_url=excluded.case_url, + embedding=excluded.embedding, + first_seen_at=excluded.first_seen_at, + imported_at=CURRENT_TIMESTAMP""", + ( + r["id"], r["text"], r["text_normalized"], r["language"], + r["verdict"] or "false", r["verdict_summary"], r["source_id"], + case_url, r["embedding"], r["first_seen_at"], + ), + ) + inserted += 1 + dst.commit() + total = dst.execute("SELECT COUNT(*) FROM fimi_claims").fetchone()[0] + dst.close() + print(f"Monitor: {inserted} Claims upserted ({with_url} mit Case-URL), " + f"fimi_claims enthaelt jetzt {total} Eintraege", flush=True) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/src/database.py b/src/database.py index 9f3ae88..12b9eea 100644 --- a/src/database.py +++ b/src/database.py @@ -355,6 +355,41 @@ CREATE TABLE IF NOT EXISTS organization_settings ( updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, UNIQUE(organization_id, key) ); + +-- FIMI / Counter-Disinformation: importierter Falschbehauptungs-Bestand +-- (EUvsDisinfo). Read-only Referenz, befuellt per scripts/import_fimi_claims.py. +-- Die id entspricht der Vigil-claim.id (stabil fuer Re-Sync via UPSERT). +CREATE TABLE IF NOT EXISTS fimi_claims ( + id INTEGER PRIMARY KEY, + text TEXT NOT NULL, + text_normalized TEXT, + language TEXT, + verdict TEXT NOT NULL DEFAULT 'false', + verdict_summary TEXT, + source_ref TEXT, + case_url TEXT, + embedding BLOB, + first_seen_at TIMESTAMP, + imported_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP +); +CREATE INDEX IF NOT EXISTS idx_fimi_claims_source_ref ON fimi_claims(source_ref); + +-- FIMI: Treffer zwischen Monitor-Artikeln und Falschbehauptungen. +-- Bewusst KEIN harter FK auf fimi_claims, damit ein Claim-Re-Sync die +-- bestehenden Treffer nicht kaskadierend loescht. +CREATE TABLE IF NOT EXISTS article_fimi_matches ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + article_id INTEGER NOT NULL REFERENCES articles(id) ON DELETE CASCADE, + fimi_claim_id INTEGER NOT NULL, + score REAL NOT NULL, + role TEXT DEFAULT 'match', + matched_text TEXT, + matched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + tenant_id INTEGER REFERENCES organizations(id), + UNIQUE(article_id, fimi_claim_id) +); +CREATE INDEX IF NOT EXISTS idx_afm_article ON article_fimi_matches(article_id); +CREATE INDEX IF NOT EXISTS idx_afm_claim ON article_fimi_matches(fimi_claim_id); """ @@ -606,6 +641,14 @@ async def init_db(): await db.execute("ALTER TABLE articles ADD COLUMN tenant_id INTEGER REFERENCES organizations(id)") await db.commit() + # Migration: FIMI-Match-Marker fuer articles (wann zuletzt gegen den + # Falschbehauptungs-Bestand geprueft; verhindert Re-Encoding bereits + # gepruefter Artikel bei jedem Refresh) + if "fimi_checked_at" not in art_columns: + await db.execute("ALTER TABLE articles ADD COLUMN fimi_checked_at TIMESTAMP") + await db.commit() + logger.info("Migration: fimi_checked_at zu articles hinzugefuegt") + # Migration: tenant_id fuer fact_checks cursor = await db.execute("PRAGMA table_info(fact_checks)") fc_columns = [row[1] for row in await cursor.fetchall()] diff --git a/src/services/embeddings.py b/src/services/embeddings.py new file mode 100644 index 0000000..31729c3 --- /dev/null +++ b/src/services/embeddings.py @@ -0,0 +1,127 @@ +"""Embedding-Service für den Claim-Matcher. + +Lädt ein multilinguales SentenceTransformer-Modell als Singleton. +Erzeugt L2-normalisierte 384-dim Vektoren, sodass Kosinus-Ähnlichkeit +einem einfachen Skalarprodukt entspricht. +""" +from __future__ import annotations + +import asyncio +import logging +import threading +from typing import Iterable + +import numpy as np + +logger = logging.getLogger("osint.embeddings") + +MODEL_NAME = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2" +EMBED_DIM = 384 +DTYPE = np.float32 + +# Threshold-Empfehlungen (empirisch aus Sanity-Tests): +# >= 0.85 -> sehr wahrscheinlich identische Behauptung +# >= 0.75 -> ähnliche Behauptung, dem User zur Auswahl vorschlagen +# < 0.60 -> wahrscheinlich verschiedene Behauptungen +DEFAULT_MATCH_THRESHOLD = 0.75 # fuer Duplikat-Warnung beim Anlegen +LIVE_SEARCH_THRESHOLD = 0.55 # fuer Live-Suche im Modal, mehr Recall + +_model = None +_model_lock = threading.Lock() + + +def _get_model(): + """Lädt das Modell einmalig (lazy) und gibt es zurück.""" + global _model + if _model is None: + with _model_lock: + if _model is None: + from sentence_transformers import SentenceTransformer + logger.info("Lade Embedding-Modell %s ...", MODEL_NAME) + _model = SentenceTransformer(MODEL_NAME) + logger.info("Embedding-Modell geladen, dim=%d", EMBED_DIM) + return _model + + +def _encode_sync(texts: list[str]) -> np.ndarray: + """Synchroner Encode (CPU-bound, sollte im Executor laufen).""" + model = _get_model() + vecs = model.encode( + texts, + normalize_embeddings=True, + convert_to_numpy=True, + show_progress_bar=False, + ) + return vecs.astype(DTYPE, copy=False) + + +async def encode_text(text: str) -> bytes: + """Encodet einen Text und gibt das Embedding als Bytes (BLOB-tauglich) zurück.""" + if not text or not text.strip(): + raise ValueError("Leerer Text kann nicht embedded werden") + loop = asyncio.get_running_loop() + vec = await loop.run_in_executor(None, _encode_sync, [text]) + return vec[0].tobytes() + + +async def encode_batch(texts: list[str]) -> list[bytes]: + """Encodet mehrere Texte in einem Batch (effizienter als einzeln).""" + texts = [t for t in texts if t and t.strip()] + if not texts: + return [] + loop = asyncio.get_running_loop() + vecs = await loop.run_in_executor(None, _encode_sync, texts) + return [v.tobytes() for v in vecs] + + +def decode_embedding(blob: bytes | None) -> np.ndarray | None: + """Decodet einen BLOB zurück in einen numpy-Vektor.""" + if blob is None or len(blob) == 0: + return None + return np.frombuffer(blob, dtype=DTYPE) + + +def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float: + """Kosinus-Ähnlichkeit zweier Vektoren. + + Da wir L2-normalisiert encoden, reicht das Skalarprodukt. + Defensiv: wenn ein Vektor nicht normalisiert ist, fängt diese Variante das ab. + """ + na = float(np.linalg.norm(a)) + nb = float(np.linalg.norm(b)) + if na == 0.0 or nb == 0.0: + return 0.0 + return float(np.dot(a, b) / (na * nb)) + + +def find_similar( + query: np.ndarray, + candidates: Iterable[tuple[int, np.ndarray]], + top_k: int = 5, + threshold: float = DEFAULT_MATCH_THRESHOLD, +) -> list[tuple[int, float]]: + """Sucht in einer Kandidaten-Menge die top_k ähnlichsten Embeddings. + + Args: + query: L2-normalisierter Query-Vektor. + candidates: Iterable von (id, embedding-Vektor)-Tupeln. + top_k: maximale Anzahl Treffer. + threshold: minimaler Score, alles darunter wird verworfen. + + Returns: + Liste von (id, score), absteigend sortiert. + """ + scored: list[tuple[int, float]] = [] + for cid, vec in candidates: + if vec is None: + continue + score = cosine_similarity(query, vec) + if score >= threshold: + scored.append((cid, score)) + scored.sort(key=lambda x: x[1], reverse=True) + return scored[:top_k] + + +def warm_up() -> None: + """Lädt das Modell vor (kann beim App-Start in einem Thread aufgerufen werden).""" + _get_model() diff --git a/src/services/fimi_matcher.py b/src/services/fimi_matcher.py new file mode 100644 index 0000000..27c19cd --- /dev/null +++ b/src/services/fimi_matcher.py @@ -0,0 +1,371 @@ +"""FIMI-Matcher: gleicht Monitor-Artikel gegen den importierten +Falschbehauptungs-Bestand (fimi_claims, EUvsDisinfo) ab. + +Zweistufig, weil Embedding-Aehnlichkeit nur THEMENNAEHE misst, nicht HALTUNG: +ein Artikel, der Russlands Angriff einen "Angriffskrieg" nennt, liegt im +Embedding-Raum dicht an der Falschbehauptung "Russland wurde zum Angriff +gezwungen", sagt aber das Gegenteil. Reine Embeddings wuerden also neutrale +und sogar widerlegende Berichterstattung als Treffer markieren. + + Stufe 1 (Embedding-Vorfilter, billig): findet thematisch nahe Kandidaten. + Die Claim-Embeddings liegen als numpy-Matrix im RAM (~30 MB), ein + Match ist eine Matrixmultiplikation (Kosinus == Skalarprodukt, da + L2-normalisiert). + Stufe 2 (LLM-Verifikation, praezise): ein Haiku-Call pro Kandidaten-Artikel + entscheidet, ob der Artikel die Behauptung tatsaechlich VERBREITET + (zustimmend als Tatsache aufstellt) oder nur darueber berichtet / + sie widerlegt. Nur bestaetigte Verbreitungen werden gespeichert. + +Provenienz-Leitplanke: gespeichert wird nur eine Verknuepfung Artikel -> +benannter, pruefbarer EUvsDisinfo-Case plus das woertliche Zitat aus dem +Artikel. Der Monitor wertet nie selbst. +""" +from __future__ import annotations + +import asyncio +import json +import logging +import os +import threading + +import aiosqlite +import numpy as np + +from services.embeddings import encode_batch +from agents.claude_client import call_claude, ClaudeCliError +from config import CLAUDE_MODEL_FAST + +logger = logging.getLogger("osint.fimi_matcher") + +EMBED_DIM = 384 +# Stufe 1: Vorfilter +EMBED_FLOOR = 0.55 # untere Grenze, ab der ein Kandidat ueberhaupt entsteht +PREFILTER_THRESHOLD = 0.65 # ab hier geht ein Kandidat in die LLM-Verifikation +TOP_K = 5 # max. Kandidaten-Claims pro Artikel +CONTENT_EXCERPT_CHARS = 1500 +# Stufe 2: LLM-Verifikation +VERIFY_ENABLED = os.environ.get("FIMI_VERIFY_ENABLED", "true").lower() != "false" +VERIFY_CONCURRENCY = int(os.environ.get("FIMI_VERIFY_CONCURRENCY", "4")) +VERIFY_CONTENT_CHARS = 2200 +VERIFY_TIMEOUT = 90 + +# Singleton-Matrix der Claim-Embeddings +_ids: np.ndarray | None = None # (N,) int64 -> fimi_claims.id +_matrix: np.ndarray | None = None # (N, 384) float32 +_lock = threading.Lock() + + +# ────────────────────────────────────────────────────────────────── +# Stufe 1: Embedding-Vorfilter +# ────────────────────────────────────────────────────────────────── + +async def ensure_matrix(db: aiosqlite.Connection, force: bool = False) -> int: + """Laedt die Claim-Embeddings einmalig in eine numpy-Matrix. Idempotent.""" + global _ids, _matrix + if _matrix is not None and not force: + return int(_matrix.shape[0]) + + cursor = await db.execute( + "SELECT id, embedding FROM fimi_claims WHERE embedding IS NOT NULL" + ) + rows = await cursor.fetchall() + ids: list[int] = [] + vecs: list[np.ndarray] = [] + for r in rows: + v = np.frombuffer(r["embedding"], dtype=np.float32) + if v.size != EMBED_DIM: + continue + ids.append(r["id"]) + vecs.append(v) + + with _lock: + if vecs: + _ids = np.asarray(ids, dtype=np.int64) + _matrix = np.vstack(vecs).astype(np.float32, copy=False) + else: + _ids = np.empty((0,), dtype=np.int64) + _matrix = np.empty((0, EMBED_DIM), dtype=np.float32) + logger.info("FIMI-Matcher: %d Claim-Embeddings geladen", len(ids)) + return len(ids) + + +def is_ready() -> bool: + return _matrix is not None and _matrix.shape[0] > 0 + + +def _build_query_text(headline: str | None, content: str | None) -> str: + parts = [] + if headline: + parts.append(headline.strip()) + if content: + excerpt = content.strip()[:CONTENT_EXCERPT_CHARS] + if excerpt: + parts.append(excerpt) + return " ".join(parts).strip() + + +async def match_query_texts( + texts: list[str], + threshold: float = EMBED_FLOOR, + top_k: int = TOP_K, +) -> list[list[tuple[int, float]]]: + """Stufe 1: matcht Query-Texte gegen die Claim-Matrix (Embedding-Kosinus). + + Returns: Liste gleicher Laenge wie texts, je eine Liste von + (claim_id, score), absteigend sortiert, nur Treffer >= threshold. + """ + results: list[list[tuple[int, float]]] = [[] for _ in texts] + if _matrix is None or _matrix.shape[0] == 0: + return results + + valid_idx = [i for i, t in enumerate(texts) if t and t.strip()] + if not valid_idx: + return results + blobs = await encode_batch([texts[i] for i in valid_idx]) + if len(blobs) != len(valid_idx): + logger.warning("FIMI-Matcher: encode_batch-Laenge passt nicht, skip") + return results + + qm = np.vstack([np.frombuffer(b, dtype=np.float32) for b in blobs]) # (V, 384) + scores = qm @ _matrix.T # (V, N) — Kosinus, da L2-normalisiert + + for row, orig_i in enumerate(valid_idx): + s = scores[row] + if top_k < s.size: + cand = np.argpartition(s, -top_k)[-top_k:] + else: + cand = np.arange(s.size) + cand = cand[np.argsort(s[cand])[::-1]] + hits = [(int(_ids[j]), float(s[j])) for j in cand if s[j] >= threshold] + results[orig_i] = hits + return results + + +# ────────────────────────────────────────────────────────────────── +# Stufe 2: LLM-Verifikation +# ────────────────────────────────────────────────────────────────── + +_VERIFY_PROMPT = """Du pruefst, ob ein Nachrichtenartikel bekannte Falschbehauptungen VERBREITET. + +Unterscheide streng: +- VERBREITET (spreads=true): Der Artikel stellt die Behauptung als Tatsache auf, uebernimmt sie zustimmend, gibt sie unwidersprochen als wahr wieder oder legt sie dem Leser als zutreffend nahe. +- VERBREITET NICHT (spreads=false): Der Artikel berichtet nur neutral darueber, widerlegt die Behauptung, ordnet sie als Desinformation ein, zitiert sie distanziert/kritisch, oder sagt inhaltlich das Gegenteil. + +Beispiel: Ein Artikel, der Russlands Angriff einen "Angriffskrieg" nennt, VERBREITET NICHT die Behauptung "Russland wurde zum Angriff gezwungen" — er sagt das Gegenteil. + +Im Zweifel spreads=false. Nur eindeutige Verbreitung zaehlt. + +ARTIKEL +Titel: {headline} +Text: {content} + +ZU PRUEFENDE BEHAUPTUNGEN +{claims} + +Antworte AUSSCHLIESSLICH als JSON: +{{"results": [{{"claim_id": , "spreads": , "passage": ""}}]}}""" + + +async def _verify_article( + article, candidate_claims: list[tuple[int, float, str]] +) -> list[tuple[int, float, str]]: + """Ein Haiku-Call: welche Kandidaten-Behauptungen verbreitet der Artikel? + + candidate_claims: Liste (claim_id, embed_score, claim_text). + Returns: bestaetigte (claim_id, embed_score, passage) fuer spreads=true. + Wirft bei CLI-/Parse-Fehler, damit der Aufrufer den Artikel nicht als + geprueft markiert (Retry beim naechsten Refresh). + """ + headline = (article["headline_de"] or article["headline"] or "").strip() + content = ( + (article["content_de"] if "content_de" in article.keys() else None) + or (article["content_original"] if "content_original" in article.keys() else None) + or "" + ).strip()[:VERIFY_CONTENT_CHARS] + if not content: + # Ohne Fliesstext laesst sich die Haltung nicht serioes bestimmen. + return [] + + claim_by_id = {cid: text for cid, _, text in candidate_claims} + claims_block = "\n".join(f"[{cid}] {text}" for cid, _, text in candidate_claims) + prompt = _VERIFY_PROMPT.format(headline=headline, content=content, claims=claims_block) + + text, _usage = await call_claude( + prompt, tools=None, model=CLAUDE_MODEL_FAST, timeout=VERIFY_TIMEOUT + ) + raw = (text or "").strip() + # Defensive: evtl. Markdown-Fences entfernen + if raw.startswith("```"): + raw = raw.strip("`") + nl = raw.find("\n") + if nl != -1: + raw = raw[nl + 1:] + start, end = raw.find("{"), raw.rfind("}") + if start == -1 or end == -1: + raise ValueError(f"Keine JSON-Antwort vom Verifizierer: {raw[:120]!r}") + data = json.loads(raw[start:end + 1]) + + embed_score = {cid: sc for cid, sc, _ in candidate_claims} + confirmed: list[tuple[int, float, str]] = [] + for item in data.get("results", []): + try: + cid = int(item.get("claim_id")) + except (TypeError, ValueError): + continue + if cid not in claim_by_id: + continue + if item.get("spreads") is True: + passage = (item.get("passage") or "").strip()[:500] + confirmed.append((cid, embed_score.get(cid, 0.0), passage)) + return confirmed + + +# ────────────────────────────────────────────────────────────────── +# Orchestrierung: matchen + speichern +# ────────────────────────────────────────────────────────────────── + +async def _load_claim_texts(db, claim_ids: set[int]) -> dict[int, str]: + if not claim_ids: + return {} + qs = ",".join("?" for _ in claim_ids) + cursor = await db.execute( + f"SELECT id, text FROM fimi_claims WHERE id IN ({qs})", tuple(claim_ids) + ) + return {r["id"]: r["text"] for r in await cursor.fetchall()} + + +async def match_and_store_articles( + db: aiosqlite.Connection, + articles: list, + prefilter_threshold: float = PREFILTER_THRESHOLD, + top_k: int = TOP_K, + verify: bool | None = None, + mark_checked: bool = True, +) -> dict: + """Zweistufiger Match + Speicherung fuer eine Liste Artikel-Rows. + + articles: Rows mit id, headline, headline_de, content_original, content_de + und (optional) tenant_id. + """ + if verify is None: + verify = VERIFY_ENABLED + await ensure_matrix(db) + if not articles: + return {"articles": 0, "candidates": 0, "articles_with_match": 0, "stored": 0, "errors": 0} + + # Stufe 1: Embedding-Vorfilter + texts = [ + _build_query_text( + a["headline_de"] or a["headline"], + (a["content_de"] if "content_de" in a.keys() else None) + or (a["content_original"] if "content_original" in a.keys() else None), + ) + for a in articles + ] + prefiltered = await match_query_texts(texts, threshold=EMBED_FLOOR, top_k=top_k) + + # Claim-Texte fuer alle starken Kandidaten laden + strong_per_article: list[list[tuple[int, float]]] = [ + [(cid, sc) for cid, sc in cands if sc >= prefilter_threshold] + for cands in prefiltered + ] + need_ids: set[int] = {cid for lst in strong_per_article for cid, _ in lst} + claim_texts = await _load_claim_texts(db, need_ids) + + # Stufe 2: Verifikation (parallel, begrenzt) — nur Artikel mit starken Kandidaten + sem = asyncio.Semaphore(max(1, VERIFY_CONCURRENCY)) + candidates_total = sum(len(lst) for lst in strong_per_article) + + async def _process(idx: int): + a = articles[idx] + strong = strong_per_article[idx] + if not strong: + # geprueft, aber kein starker Kandidat -> nichts zu verifizieren + return idx, [], False + cand = [(cid, sc, claim_texts.get(cid, "")) for cid, sc in strong if claim_texts.get(cid)] + if not cand: + return idx, [], False + if not verify: + return idx, [(cid, sc, None) for cid, sc, _ in cand], False + async with sem: + try: + confirmed = await _verify_article(a, cand) + return idx, confirmed, False + except (ClaudeCliError, ValueError, json.JSONDecodeError, TimeoutError) as e: + logger.warning("FIMI-Verifikation article_id=%s fehlgeschlagen: %s", + a["id"], e) + return idx, None, True # error -> nicht als checked markieren + + proc = await asyncio.gather(*[_process(i) for i in range(len(articles))]) + + # Speichern (sequenziell, eine DB-Connection) + stored = 0 + with_match = 0 + errors = 0 + for idx, confirmed, err in proc: + a = articles[idx] + if err: + errors += 1 + continue # Artikel NICHT als checked markieren -> Retry + if confirmed: + with_match += 1 + tenant_id = a["tenant_id"] if "tenant_id" in a.keys() else None + role = "verified" if verify else "match" + for cid, sc, passage in confirmed: + try: + await db.execute( + """INSERT INTO article_fimi_matches + (article_id, fimi_claim_id, score, role, matched_text, tenant_id, matched_at) + VALUES (?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)""", + (a["id"], cid, round(sc, 4), role, passage, tenant_id), + ) + stored += 1 + except aiosqlite.IntegrityError: + await db.execute( + """UPDATE article_fimi_matches + SET score = MAX(COALESCE(score, 0), ?), + role = ?, matched_text = COALESCE(?, matched_text) + WHERE article_id = ? AND fimi_claim_id = ?""", + (round(sc, 4), role, passage, a["id"], cid), + ) + if mark_checked: + await db.execute( + "UPDATE articles SET fimi_checked_at = CURRENT_TIMESTAMP WHERE id = ?", + (a["id"],), + ) + await db.commit() + logger.info( + "FIMI-Matcher: %d Artikel, %d Kandidaten, %d verbreiten Falschbehauptungen, " + "%d Links, %d Fehler", + len(articles), candidates_total, with_match, stored, errors, + ) + return { + "articles": len(articles), + "candidates": candidates_total, + "articles_with_match": with_match, + "stored": stored, + "errors": errors, + } + + +async def match_incident_articles( + db: aiosqlite.Connection, + incident_id: int, + only_unchecked: bool = True, + limit: int | None = None, + verify: bool | None = None, +) -> dict: + """Matcht (standardmaessig noch nicht gepruefte) Artikel einer Lage.""" + q = ( + "SELECT id, headline, headline_de, content_original, content_de, tenant_id " + "FROM articles WHERE incident_id = ?" + ) + params: list = [incident_id] + if only_unchecked: + q += " AND fimi_checked_at IS NULL" + q += " ORDER BY id" + if limit: + q += f" LIMIT {int(limit)}" + cursor = await db.execute(q, params) + articles = await cursor.fetchall() + return await match_and_store_articles(db, articles, verify=verify)