diff --git a/scripts/backfill_fimi.py b/scripts/backfill_fimi.py new file mode 100755 index 0000000..0013f08 --- /dev/null +++ b/scripts/backfill_fimi.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python3 +"""Backfill: alle noch ungeprueften Artikel gegen den Falschbehauptungsbestand +abgleichen (Embedding-Vorfilter + LLM-Verifikation). + +Geht alle Lagen mit ungeprueften Artikeln durch, kleine zuerst (schnelle, +frueh testbare Ergebnisse), grosse zuletzt. Pro Lage in Batches, damit die +Score-Matrix (Artikel x Claims) den RAM nicht sprengt. Robust: Fehler +einzelner Batches stoppen den Lauf nicht; bei Artikeln, die wiederholt +scheitern (kein Fortschritt), wird die Lage abgebrochen statt endlos zu +schleifen. + +Aufruf (im Staging-Verzeichnis, mit dessen venv): + HF_HUB_OFFLINE=1 TRANSFORMERS_OFFLINE=1 FIMI_VERIFY_CONCURRENCY=5 \ + ./venv/bin/python scripts/backfill_fimi.py +""" +from __future__ import annotations + +import asyncio +import sys +import time + +sys.path.insert(0, "src") + +import aiosqlite +from services import fimi_matcher + +DB_PATH = "data/osint.db" +BATCH = 120 + + +def _ts() -> str: + return time.strftime("%H:%M:%S") + + +async def main() -> None: + db = await aiosqlite.connect(DB_PATH) + db.row_factory = aiosqlite.Row + t0 = time.time() + n_claims = await fimi_matcher.ensure_matrix(db) + print(f"[{_ts()}] Matrix: {n_claims} Claims geladen", flush=True) + + cursor = await db.execute( + """SELECT incident_id, COUNT(*) AS n + FROM articles WHERE fimi_checked_at IS NULL AND incident_id IS NOT NULL + GROUP BY incident_id ORDER BY n""" + ) + incidents = [(r["incident_id"], r["n"]) for r in await cursor.fetchall()] + total = sum(n for _, n in incidents) + print(f"[{_ts()}] START: {len(incidents)} Lagen, {total} ungepruefte Artikel", flush=True) + + grand = {"articles": 0, "candidates": 0, "articles_with_match": 0, "stored": 0, "errors": 0} + for iid, n in incidents: + done = 0 + prev_remaining = None + while True: + res = await fimi_matcher.match_incident_articles( + db, iid, only_unchecked=True, limit=BATCH + ) + if res["articles"] == 0: + break + done += res["articles"] + for k in grand: + grand[k] += res.get(k, 0) + + cur = await db.execute( + "SELECT COUNT(*) FROM articles WHERE incident_id = ? AND fimi_checked_at IS NULL", + (iid,), + ) + remaining = (await cur.fetchone())[0] + print( + f"[{_ts()}] Lage {iid}: +{res['articles']} ({done}/{n}), " + f"Treffer {res['articles_with_match']}, Fehler {res['errors']}, " + f"verbleibend {remaining}", + flush=True, + ) + if remaining == 0: + break + if prev_remaining is not None and remaining >= prev_remaining: + print( + f"[{_ts()}] Lage {iid}: kein Fortschritt (verbleibend {remaining}), " + f"Abbruch wegen wiederholt fehlschlagender Artikel", + flush=True, + ) + break + prev_remaining = remaining + print(f"[{_ts()}] == Lage {iid} fertig: {done} Artikel verarbeitet ==", flush=True) + + await db.close() + dt = time.time() - t0 + print(f"[{_ts()}] FERTIG nach {dt/60:.1f} min: {grand}", flush=True) + + +if __name__ == "__main__": + asyncio.run(main())