- asyncio.Semaphore(4) + as_completed: 4 Worker parallel statt sequenziell - Per-Batch commit: kein Datenverlust bei Abbruch - sqlite3 timeout=60 + PRAGMA busy_timeout=60000 + journal_mode=WAL: kein Crash bei aktivem Live-Write-Lock - Bessere Progress-Logs (alle 20 Batches)
166 Zeilen
6.1 KiB
Python
166 Zeilen
6.1 KiB
Python
"""Backfill 2026-05-03: Fehlende Uebersetzungen ergaenzen.
|
|
|
|
Hintergrund: bis 2026-05-03 hat der Analyzer-Agent translations als Teil der
|
|
Lagebild-Antwort generiert. Bei vielen Artikeln pro Refresh hat das LLM die
|
|
Translations weggelassen oder gekuerzt (Output-Token-Druck). Folge: viele
|
|
englische Artikel haben keine headline_de/content_de.
|
|
|
|
Diese Migration nutzt den neuen translator.py-Agent (Haiku) und uebersetzt
|
|
alle Artikel WHERE language != 'de' AND (headline_de OR content_de fehlt).
|
|
|
|
Idempotent: was schon uebersetzt ist, wird nicht erneut bearbeitet (durch
|
|
COALESCE im UPDATE bleibt vorhandener Inhalt erhalten).
|
|
|
|
Kosten: bei Haiku ~$0.01-0.02 pro Artikel. Anzeige der Gesamt-Kosten am Ende.
|
|
"""
|
|
import argparse
|
|
import asyncio
|
|
import shutil
|
|
import sqlite3
|
|
import sys
|
|
from datetime import datetime
|
|
|
|
# Pfade fuer Imports (Live-Repo bevorzugt, Staging-Fallback)
|
|
sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src")
|
|
try:
|
|
from agents.translator import translate_articles_batch, DEFAULT_BATCH_SIZE
|
|
from agents.claude_client import UsageAccumulator
|
|
from services.post_refresh_qc import normalize_german_umlauts
|
|
except ImportError:
|
|
sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor-staging/src")
|
|
from agents.translator import translate_articles_batch, DEFAULT_BATCH_SIZE
|
|
from agents.claude_client import UsageAccumulator
|
|
from services.post_refresh_qc import normalize_german_umlauts
|
|
|
|
|
|
async def main_async(db_path: str, dry_run: bool, limit: int | None) -> int:
|
|
db = sqlite3.connect(db_path, timeout=60)
|
|
db.row_factory = sqlite3.Row
|
|
# Live-Service haelt regelmaessig den Write-Lock. Statt sofort zu crashen
|
|
# warten wir bis zu 60 Sekunden auf den Lock.
|
|
db.execute("PRAGMA busy_timeout = 60000")
|
|
db.execute("PRAGMA journal_mode = WAL")
|
|
|
|
sql = """SELECT id, incident_id, headline, content_original, language
|
|
FROM articles
|
|
WHERE language IS NOT NULL AND LOWER(language) != 'de'
|
|
AND (headline_de IS NULL OR headline_de = ''
|
|
OR content_de IS NULL OR content_de = '')"""
|
|
if limit:
|
|
sql += f" LIMIT {int(limit)}"
|
|
cur = db.execute(sql)
|
|
rows = [dict(r) for r in cur.fetchall()]
|
|
print(f"Artikel ohne Uebersetzung: {len(rows)}")
|
|
|
|
if not rows:
|
|
db.close()
|
|
return 0
|
|
|
|
if dry_run:
|
|
print("DRY-RUN: keine Uebersetzungen, keine Updates.")
|
|
for r in rows[:5]:
|
|
print(f" id={r['id']} ({r['language']}): {r['headline'][:80]!r}")
|
|
db.close()
|
|
return 0
|
|
|
|
usage = UsageAccumulator()
|
|
total = len(rows)
|
|
batch_size = DEFAULT_BATCH_SIZE
|
|
PARALLEL_WORKERS = 4
|
|
updated = 0
|
|
translated = 0
|
|
sample_translations = []
|
|
completed_count = 0
|
|
print(f"Starte parallele Verarbeitung: Batches a {batch_size}, {PARALLEL_WORKERS} Worker parallel...", flush=True)
|
|
|
|
# Batches vorbereiten
|
|
batches = [rows[i:i + batch_size] for i in range(0, total, batch_size)]
|
|
semaphore = asyncio.Semaphore(PARALLEL_WORKERS)
|
|
|
|
async def process_batch(batch):
|
|
async with semaphore:
|
|
return await translate_articles_batch(batch)
|
|
|
|
# Tasks erstellen und in beliebiger Reihenfolge bearbeiten
|
|
tasks = [asyncio.create_task(process_batch(b)) for b in batches]
|
|
n_batches = len(batches)
|
|
|
|
for completed_task in asyncio.as_completed(tasks):
|
|
try:
|
|
translations, batch_usage = await completed_task
|
|
except Exception as e:
|
|
print(f" Batch-Fehler: {e}", flush=True)
|
|
continue
|
|
usage.add(batch_usage)
|
|
translated += len(translations)
|
|
for t in translations:
|
|
hd = t.get("headline_de")
|
|
cd = t.get("content_de")
|
|
if hd:
|
|
hd, _ = normalize_german_umlauts(hd)
|
|
if cd:
|
|
cd, _ = normalize_german_umlauts(cd)
|
|
if hd or cd:
|
|
db.execute(
|
|
"UPDATE articles SET headline_de = COALESCE(?, headline_de), "
|
|
"content_de = COALESCE(?, content_de) WHERE id = ?",
|
|
(hd, cd, t["id"]),
|
|
)
|
|
updated += 1
|
|
if len(sample_translations) < 3:
|
|
sample_translations.append(t["id"])
|
|
db.commit() # Per-Batch commit -> bei Abbruch kein Datenverlust
|
|
|
|
completed_count += 1
|
|
if completed_count % 20 == 0 or completed_count == n_batches:
|
|
print(
|
|
f"[{completed_count}/{n_batches} Batches | Updates={updated}/{total} | Cost=${usage.total_cost_usd:.2f}]",
|
|
flush=True,
|
|
)
|
|
|
|
print()
|
|
print(f"=== Stats ===")
|
|
print(f" Total betrachtet: {total}")
|
|
print(f" Translator OK: {translated}")
|
|
print(f" DB-Updates: {updated}")
|
|
print(f" Calls: {usage.call_count}")
|
|
print(f" Input-Tokens: {usage.input_tokens:,}")
|
|
print(f" Output-Tokens: {usage.output_tokens:,}")
|
|
print(f" Cost gesamt: ${usage.total_cost_usd:.4f}")
|
|
print()
|
|
print("=== Stichprobe (3 frische Uebersetzungen) ===")
|
|
if sample_translations:
|
|
placeholders = ",".join("?" * len(sample_translations))
|
|
for r in db.execute(
|
|
f"SELECT id, headline, headline_de FROM articles WHERE id IN ({placeholders})",
|
|
sample_translations,
|
|
):
|
|
d = dict(r)
|
|
print(f" [{d['id']}]")
|
|
print(f" EN: {d['headline'][:120]!r}")
|
|
print(f" DE: {d['headline_de'][:120]!r}")
|
|
db.close()
|
|
return 0
|
|
|
|
|
|
def main():
|
|
ap = argparse.ArgumentParser()
|
|
ap.add_argument("--db", required=True)
|
|
ap.add_argument("--no-backup", action="store_true")
|
|
ap.add_argument("--dry-run", action="store_true")
|
|
ap.add_argument("--limit", type=int, default=None,
|
|
help="Optional: Test-Lauf mit nur N Artikeln")
|
|
args = ap.parse_args()
|
|
|
|
if not args.no_backup and not args.dry_run:
|
|
ts = datetime.now().strftime("%Y-%m-%d-%H%M")
|
|
backup_path = f"{args.db}.translations-bak-{ts}"
|
|
shutil.copy2(args.db, backup_path)
|
|
print(f"Backup: {backup_path}")
|
|
|
|
return asyncio.run(main_async(args.db, args.dry_run, args.limit))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|