"""Backfill 2026-05-03: fill in missing German translations.

Background: until 2026-05-03 the analyzer agent generated translations as part
of its situation-report response. With many articles per refresh the LLM
dropped or truncated the translations (output-token pressure). As a result,
many English articles have no headline_de/content_de.

This migration uses the new translator.py agent (Haiku) and translates all
articles WHERE language != 'de' AND (headline_de OR content_de is missing).

Idempotent: a column that already holds a non-empty translation is never
overwritten; the UPDATE only fills columns that are NULL or ''.

Cost: roughly $0.01-0.02 per article with Haiku. The total cost is printed at
the end.
"""

import argparse
import asyncio
import shutil
import sqlite3
import sys
from contextlib import closing
from datetime import datetime
from pathlib import Path

# Import paths (live repo preferred, staging fallback).
sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src")
try:
    from agents.translator import translate_articles_batch, DEFAULT_BATCH_SIZE
    from agents.claude_client import UsageAccumulator
    from services.post_refresh_qc import normalize_german_umlauts
except ImportError:
    sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor-staging/src")
    from agents.translator import translate_articles_batch, DEFAULT_BATCH_SIZE
    from agents.claude_client import UsageAccumulator
    from services.post_refresh_qc import normalize_german_umlauts

# Maximum number of translator batches in flight at the same time.
PARALLEL_WORKERS = 4

# Number of freshly translated article ids kept for the closing spot check.
_SAMPLE_SIZE = 3

_SELECT_UNTRANSLATED_SQL = """SELECT id, incident_id, headline, content_original, language
    FROM articles
    WHERE language IS NOT NULL
      AND LOWER(language) != 'de'
      AND (headline_de IS NULL OR headline_de = ''
           OR content_de IS NULL OR content_de = '')"""

# NULLIF(col, '') comes first so that an existing non-empty translation always
# wins; the bound value is only used when the column is NULL or ''.
_UPDATE_TRANSLATION_SQL = (
    "UPDATE articles SET "
    "headline_de = COALESCE(NULLIF(headline_de, ''), ?, headline_de), "
    "content_de = COALESCE(NULLIF(content_de, ''), ?, content_de) "
    "WHERE id = ?"
)


def _fetch_untranslated(db: sqlite3.Connection, limit: int | None) -> list[dict]:
    """Return all non-German articles missing headline_de or content_de."""
    sql = _SELECT_UNTRANSLATED_SQL
    if limit:
        # int() guarantees that only a number is interpolated into the SQL.
        sql += f" LIMIT {int(limit)}"
    return [dict(row) for row in db.execute(sql).fetchall()]


def _normalized_or_none(text):
    """Return *text* with normalized umlauts, or None when it is empty."""
    if not text:
        return None
    normalized, _ = normalize_german_umlauts(text)
    return normalized or None


def _apply_translations(db: sqlite3.Connection, translations, sample_ids: list) -> int:
    """Write one batch of translations and return the number of UPDATEs issued.

    Ids of updated articles are appended to *sample_ids* until it holds
    ``_SAMPLE_SIZE`` entries. The caller is responsible for committing.
    """
    applied = 0
    for t in translations:
        # Empty strings are mapped to None so they can never replace a value.
        hd = _normalized_or_none(t.get("headline_de"))
        cd = _normalized_or_none(t.get("content_de"))
        if hd is None and cd is None:
            continue
        db.execute(_UPDATE_TRANSLATION_SQL, (hd, cd, t["id"]))
        applied += 1
        if len(sample_ids) < _SAMPLE_SIZE:
            sample_ids.append(t["id"])
    return applied


def _print_samples(db: sqlite3.Connection, sample_ids: list) -> None:
    """Print original and German headline for the sampled article ids."""
    print("=== Stichprobe (3 frische Uebersetzungen) ===")
    if not sample_ids:
        return
    placeholders = ",".join("?" * len(sample_ids))
    query = f"SELECT id, headline, headline_de FROM articles WHERE id IN ({placeholders})"
    for row in db.execute(query, sample_ids):
        d = dict(row)
        print(f" [{d['id']}]")
        # Either column may be NULL (e.g. only content_de was translated).
        print(f" EN: {(d['headline'] or '')[:120]!r}")
        print(f" DE: {(d['headline_de'] or '')[:120]!r}")


def _backup_database(db_path: str, backup_path: str) -> None:
    """Create a consistent copy of the SQLite database at *backup_path*.

    Uses the SQLite online backup API instead of a plain file copy, because a
    file copy of a database that is being written to in WAL mode can miss the
    content of the ``-wal`` file or capture a half-written state.

    Raises:
        FileNotFoundError: if *db_path* does not exist (sqlite3.connect would
            otherwise silently create an empty database there).
    """
    if not Path(db_path).is_file():
        raise FileNotFoundError(db_path)
    with closing(sqlite3.connect(db_path, timeout=60)) as src, \
            closing(sqlite3.connect(backup_path)) as dst:
        src.backup(dst)
    # Carry over permission bits and timestamps, as shutil.copy2 would have.
    shutil.copystat(db_path, backup_path)


async def main_async(db_path: str, dry_run: bool, limit: int | None) -> int:
    """Translate all articles with missing German fields and store the result.

    Args:
        db_path: Path to the SQLite database containing the ``articles`` table.
        dry_run: If true, only report how many articles would be translated
            (plus up to five examples) without calling the translator or
            writing to the database.
        limit: Optional maximum number of articles to process; falsy means
            no limit.

    Returns:
        Process exit code; 0 on completion. Failed translator batches are
        reported on stdout but do not change the exit code.
    """
    # closing() guarantees the connection is released on every exit path,
    # including exceptions raised by the translator or by SQLite.
    with closing(sqlite3.connect(db_path, timeout=60)) as db:
        db.row_factory = sqlite3.Row
        # The live service regularly holds the write lock. Instead of failing
        # immediately, wait up to 60 seconds for the lock.
        db.execute("PRAGMA busy_timeout = 60000")

        rows = _fetch_untranslated(db, limit)
        print(f"Artikel ohne Uebersetzung: {len(rows)}")
        if not rows:
            return 0

        if dry_run:
            print("DRY-RUN: keine Uebersetzungen, keine Updates.")
            for r in rows[:5]:
                print(f" id={r['id']} ({r['language']}): {(r['headline'] or '')[:80]!r}")
            return 0

        # Switching the journal mode is a persistent change to the database
        # file, so it is only done once we actually intend to write.
        db.execute("PRAGMA journal_mode = WAL")

        usage = UsageAccumulator()
        total = len(rows)
        batch_size = DEFAULT_BATCH_SIZE
        updated = 0
        translated = 0
        failed_batches = 0
        completed_count = 0
        sample_translations = []

        print(
            f"Starte parallele Verarbeitung: Batches a {batch_size}, "
            f"{PARALLEL_WORKERS} Worker parallel...",
            flush=True,
        )

        batches = [rows[i:i + batch_size] for i in range(0, total, batch_size)]
        n_batches = len(batches)
        semaphore = asyncio.Semaphore(PARALLEL_WORKERS)

        async def process_batch(batch):
            # The semaphore caps the number of concurrent translator calls.
            async with semaphore:
                return await translate_articles_batch(batch)

        # Create all tasks up front and handle them in completion order.
        tasks = [asyncio.create_task(process_batch(b)) for b in batches]
        for completed_task in asyncio.as_completed(tasks):
            try:
                translations, batch_usage = await completed_task
            except Exception as e:
                # Best effort: one failing batch must not abort the backfill.
                failed_batches += 1
                print(f" Batch-Fehler: {e}", flush=True)
            else:
                usage.add(batch_usage)
                translated += len(translations)
                updated += _apply_translations(db, translations, sample_translations)
                # Per-batch commit -> no data loss if the run is aborted.
                db.commit()

            # Failed batches count as finished so the final progress line is
            # always printed.
            completed_count += 1
            if completed_count % 20 == 0 or completed_count == n_batches:
                print(
                    f"[{completed_count}/{n_batches} Batches | "
                    f"Updates={updated}/{total} | Cost=${usage.total_cost_usd:.2f}]",
                    flush=True,
                )

        print()
        print("=== Stats ===")
        print(f" Total betrachtet: {total}")
        print(f" Translator OK: {translated}")
        print(f" DB-Updates: {updated}")
        print(f" Batch-Fehler: {failed_batches}")
        print(f" Calls: {usage.call_count}")
        print(f" Input-Tokens: {usage.input_tokens:,}")
        print(f" Output-Tokens: {usage.output_tokens:,}")
        print(f" Cost gesamt: ${usage.total_cost_usd:.4f}")
        print()
        _print_samples(db, sample_translations)
        return 0


def main() -> int:
    """Parse the command line, back up the database and run the backfill.

    A backup is written next to the database unless ``--no-backup`` or
    ``--dry-run`` is given.

    Returns:
        The exit code produced by :func:`main_async`.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--db", required=True)
    ap.add_argument("--no-backup", action="store_true")
    ap.add_argument("--dry-run", action="store_true")
    ap.add_argument("--limit", type=int, default=None,
                    help="Optional: Test-Lauf mit nur N Artikeln")
    args = ap.parse_args()

    if not args.no_backup and not args.dry_run:
        ts = datetime.now().strftime("%Y-%m-%d-%H%M")
        backup_path = f"{args.db}.translations-bak-{ts}"
        _backup_database(args.db, backup_path)
        print(f"Backup: {backup_path}")

    return asyncio.run(main_async(args.db, args.dry_run, args.limit))


if __name__ == "__main__":
    sys.exit(main())