chore(fimi): Backfill-Runner fuer alle ungeprueften Artikel (gechunkt, robust)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Dieser Commit ist enthalten in:
94
scripts/backfill_fimi.py
Ausführbare Datei
94
scripts/backfill_fimi.py
Ausführbare Datei
@@ -0,0 +1,94 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Backfill: match all still-unchecked articles against the stock of false
claims (embedding pre-filter + LLM verification).

Walks through every incident ("Lage") that has unchecked articles, small ones
first (fast, early-testable results), large ones last. Each incident is
processed in batches so that the score matrix (articles x claims) does not
exhaust RAM. Robust: errors in individual batches do not stop the run; for
articles that fail repeatedly (no progress), the incident is aborted instead
of looping forever.

Usage (in the staging directory, with its venv):
    HF_HUB_OFFLINE=1 TRANSFORMERS_OFFLINE=1 FIMI_VERIFY_CONCURRENCY=5 \
        ./venv/bin/python scripts/backfill_fimi.py
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
import time
|
||||
|
||||
sys.path.insert(0, "src")
|
||||
|
||||
import aiosqlite
|
||||
from services import fimi_matcher
|
||||
|
||||
# SQLite database file opened by main(); the path is relative, so it is
# resolved against the current working directory.
DB_PATH: str = "data/osint.db"

# Value passed as ``limit`` to fimi_matcher.match_incident_articles() on
# every call, i.e. the batch size used per incident.
BATCH: int = 120
|
||||
|
||||
|
||||
def _ts() -> str:
|
||||
return time.strftime("%H:%M:%S")
|
||||
|
||||
|
||||
async def main() -> None:
    """Run the backfill over every incident that still has unchecked articles.

    Steps:
      1. Open the SQLite database at ``DB_PATH`` and let the matcher load its
         claim matrix via ``fimi_matcher.ensure_matrix``.
      2. Collect all incidents that have articles with
         ``fimi_checked_at IS NULL``, ordered by ascending article count.
      3. For each incident, call ``fimi_matcher.match_incident_articles`` in
         batches of ``BATCH`` until one of the following holds:
           * the matcher reports zero processed articles,
           * no unchecked articles remain for the incident, or
           * the remaining count did not drop compared with the previous
             batch (no progress), in which case the incident is abandoned.
      4. Print a per-batch progress line, a per-incident summary and a final
         total to stdout.

    The connection is managed by ``async with`` so it is closed even when a
    step raises; previously it was only closed on the success path.
    """
    async with aiosqlite.connect(DB_PATH) as db:
        # Rows are read by column name below (r["incident_id"], r["n"]).
        db.row_factory = aiosqlite.Row
        t0 = time.time()
        n_claims = await fimi_matcher.ensure_matrix(db)
        print(f"[{_ts()}] Matrix: {n_claims} Claims geladen", flush=True)

        async with db.execute(
            """SELECT incident_id, COUNT(*) AS n
            FROM articles WHERE fimi_checked_at IS NULL AND incident_id IS NOT NULL
            GROUP BY incident_id ORDER BY n"""
        ) as cursor:
            incidents = [(r["incident_id"], r["n"]) for r in await cursor.fetchall()]
        total = sum(n for _, n in incidents)
        print(f"[{_ts()}] START: {len(incidents)} Lagen, {total} ungepruefte Artikel", flush=True)

        # Running totals across all incidents; keys mirror the result dict
        # returned by match_incident_articles.
        grand = {"articles": 0, "candidates": 0, "articles_with_match": 0, "stored": 0, "errors": 0}
        for iid, n in incidents:
            done = 0
            prev_remaining = None
            while True:
                res = await fimi_matcher.match_incident_articles(
                    db, iid, only_unchecked=True, limit=BATCH
                )
                if res["articles"] == 0:
                    break
                done += res["articles"]
                for k in grand:
                    grand[k] += res.get(k, 0)

                # Re-count after every batch: this number drives both the
                # "finished" check and the no-progress check below.
                async with db.execute(
                    "SELECT COUNT(*) FROM articles WHERE incident_id = ? AND fimi_checked_at IS NULL",
                    (iid,),
                ) as cur:
                    remaining = (await cur.fetchone())[0]
                print(
                    f"[{_ts()}] Lage {iid}: +{res['articles']} ({done}/{n}), "
                    f"Treffer {res['articles_with_match']}, Fehler {res['errors']}, "
                    f"verbleibend {remaining}",
                    flush=True,
                )
                if remaining == 0:
                    break
                # The first batch has nothing to compare against, so an
                # incident gets at least one more attempt before being
                # abandoned for lack of progress.
                if prev_remaining is not None and remaining >= prev_remaining:
                    print(
                        f"[{_ts()}] Lage {iid}: kein Fortschritt (verbleibend {remaining}), "
                        f"Abbruch wegen wiederholt fehlschlagender Artikel",
                        flush=True,
                    )
                    break
                prev_remaining = remaining
            print(f"[{_ts()}] == Lage {iid} fertig: {done} Artikel verarbeitet ==", flush=True)

    dt = time.time() - t0
    print(f"[{_ts()}] FERTIG nach {dt/60:.1f} min: {grand}", flush=True)
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Start the backfill only when the file is executed as a script,
    # not when it is imported.
    asyncio.run(main())
|
||||
In neuem Issue referenzieren
Einen Benutzer sperren