Dateien
AegisSight-Monitor-Verwaltung/migrations/migrate_translations_2026-05-03.py
claude-dev 670a6617a7 Migration: parallele translate-Batches + busy_timeout/WAL
- asyncio.Semaphore(4) + as_completed: 4 Worker parallel statt sequenziell
- Per-Batch commit: kein Datenverlust bei Abbruch
- sqlite3 timeout=60 + PRAGMA busy_timeout=60000 + journal_mode=WAL: kein Crash bei aktivem Live-Write-Lock
- Bessere Progress-Logs (alle 20 Batches)
2026-05-09 01:32:51 +00:00

166 Zeilen
6.1 KiB
Python

"""Backfill 2026-05-03: Fehlende Uebersetzungen ergaenzen.
Hintergrund: bis 2026-05-03 hat der Analyzer-Agent translations als Teil der
Lagebild-Antwort generiert. Bei vielen Artikeln pro Refresh hat das LLM die
Translations weggelassen oder gekuerzt (Output-Token-Druck). Folge: viele
englische Artikel haben keine headline_de/content_de.
Diese Migration nutzt den neuen translator.py-Agent (Haiku) und uebersetzt
alle Artikel WHERE language != 'de' AND (headline_de OR content_de fehlt).
Idempotent: was schon uebersetzt ist, wird nicht erneut bearbeitet (durch
COALESCE im UPDATE bleibt vorhandener Inhalt erhalten).
Kosten: bei Haiku ~$0.01-0.02 pro Artikel. Anzeige der Gesamt-Kosten am Ende.
"""
import argparse
import asyncio
import shutil
import sqlite3
import sys
from datetime import datetime
# Pfade fuer Imports (Live-Repo bevorzugt, Staging-Fallback)
sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src")
try:
from agents.translator import translate_articles_batch, DEFAULT_BATCH_SIZE
from agents.claude_client import UsageAccumulator
from services.post_refresh_qc import normalize_german_umlauts
except ImportError:
sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor-staging/src")
from agents.translator import translate_articles_batch, DEFAULT_BATCH_SIZE
from agents.claude_client import UsageAccumulator
from services.post_refresh_qc import normalize_german_umlauts
async def main_async(db_path: str, dry_run: bool, limit: int | None) -> int:
db = sqlite3.connect(db_path, timeout=60)
db.row_factory = sqlite3.Row
# Live-Service haelt regelmaessig den Write-Lock. Statt sofort zu crashen
# warten wir bis zu 60 Sekunden auf den Lock.
db.execute("PRAGMA busy_timeout = 60000")
db.execute("PRAGMA journal_mode = WAL")
sql = """SELECT id, incident_id, headline, content_original, language
FROM articles
WHERE language IS NOT NULL AND LOWER(language) != 'de'
AND (headline_de IS NULL OR headline_de = ''
OR content_de IS NULL OR content_de = '')"""
if limit:
sql += f" LIMIT {int(limit)}"
cur = db.execute(sql)
rows = [dict(r) for r in cur.fetchall()]
print(f"Artikel ohne Uebersetzung: {len(rows)}")
if not rows:
db.close()
return 0
if dry_run:
print("DRY-RUN: keine Uebersetzungen, keine Updates.")
for r in rows[:5]:
print(f" id={r['id']} ({r['language']}): {r['headline'][:80]!r}")
db.close()
return 0
usage = UsageAccumulator()
total = len(rows)
batch_size = DEFAULT_BATCH_SIZE
PARALLEL_WORKERS = 4
updated = 0
translated = 0
sample_translations = []
completed_count = 0
print(f"Starte parallele Verarbeitung: Batches a {batch_size}, {PARALLEL_WORKERS} Worker parallel...", flush=True)
# Batches vorbereiten
batches = [rows[i:i + batch_size] for i in range(0, total, batch_size)]
semaphore = asyncio.Semaphore(PARALLEL_WORKERS)
async def process_batch(batch):
async with semaphore:
return await translate_articles_batch(batch)
# Tasks erstellen und in beliebiger Reihenfolge bearbeiten
tasks = [asyncio.create_task(process_batch(b)) for b in batches]
n_batches = len(batches)
for completed_task in asyncio.as_completed(tasks):
try:
translations, batch_usage = await completed_task
except Exception as e:
print(f" Batch-Fehler: {e}", flush=True)
continue
usage.add(batch_usage)
translated += len(translations)
for t in translations:
hd = t.get("headline_de")
cd = t.get("content_de")
if hd:
hd, _ = normalize_german_umlauts(hd)
if cd:
cd, _ = normalize_german_umlauts(cd)
if hd or cd:
db.execute(
"UPDATE articles SET headline_de = COALESCE(?, headline_de), "
"content_de = COALESCE(?, content_de) WHERE id = ?",
(hd, cd, t["id"]),
)
updated += 1
if len(sample_translations) < 3:
sample_translations.append(t["id"])
db.commit() # Per-Batch commit -> bei Abbruch kein Datenverlust
completed_count += 1
if completed_count % 20 == 0 or completed_count == n_batches:
print(
f"[{completed_count}/{n_batches} Batches | Updates={updated}/{total} | Cost=${usage.total_cost_usd:.2f}]",
flush=True,
)
print()
print(f"=== Stats ===")
print(f" Total betrachtet: {total}")
print(f" Translator OK: {translated}")
print(f" DB-Updates: {updated}")
print(f" Calls: {usage.call_count}")
print(f" Input-Tokens: {usage.input_tokens:,}")
print(f" Output-Tokens: {usage.output_tokens:,}")
print(f" Cost gesamt: ${usage.total_cost_usd:.4f}")
print()
print("=== Stichprobe (3 frische Uebersetzungen) ===")
if sample_translations:
placeholders = ",".join("?" * len(sample_translations))
for r in db.execute(
f"SELECT id, headline, headline_de FROM articles WHERE id IN ({placeholders})",
sample_translations,
):
d = dict(r)
print(f" [{d['id']}]")
print(f" EN: {d['headline'][:120]!r}")
print(f" DE: {d['headline_de'][:120]!r}")
db.close()
return 0
def _backup_database(db_path: str, backup_path: str) -> None:
    """Write a consistent snapshot of the SQLite database to *backup_path*.

    Uses the sqlite3 online backup API rather than a plain file copy: a file
    copy of a database that is being written to (or whose recent changes
    still live in the ``-wal`` file) can yield an inconsistent backup.
    """
    source = sqlite3.connect(db_path, timeout=60)
    try:
        target = sqlite3.connect(backup_path)
        try:
            source.backup(target)
        finally:
            target.close()
    finally:
        source.close()


def main() -> int:
    """Parse the command line, optionally back up the database, and migrate.

    Options:
        --db PATH     SQLite database to migrate (required, must exist).
        --no-backup   Skip the backup that is otherwise taken before writing.
        --dry-run     Only report pending articles; implies no backup.
        --limit N     Process at most N articles (test run).

    Returns:
        The exit code produced by ``main_async``.
    """
    import os  # local import keeps this block independent of the file header

    ap = argparse.ArgumentParser()
    ap.add_argument("--db", required=True)
    ap.add_argument("--no-backup", action="store_true")
    ap.add_argument("--dry-run", action="store_true")
    ap.add_argument("--limit", type=int, default=None,
                    help="Optional: Test-Lauf mit nur N Artikeln")
    args = ap.parse_args()

    # sqlite3.connect() would silently create an empty database for a
    # mistyped path, so reject missing files up front.
    if not os.path.isfile(args.db):
        ap.error(f"Datenbank nicht gefunden: {args.db}")

    if not args.no_backup and not args.dry_run:
        ts = datetime.now().strftime("%Y-%m-%d-%H%M")
        backup_path = f"{args.db}.translations-bak-{ts}"
        _backup_database(args.db, backup_path)
        print(f"Backup: {backup_path}")

    return asyncio.run(main_async(args.db, args.dry_run, args.limit))
# Script entry point: hand main()'s return value to the interpreter as the
# process exit code.
if __name__ == "__main__":
    raise SystemExit(main())