Backfill-Migration: fehlende DE-Uebersetzungen via Translator-Agent

Nutzt agents/translator.py mit Haiku, idempotent (COALESCE), Batches a 5,
robustes Parsing, Backup vor Lauf, Statistik am Ende inkl. Cost.
Dieser Commit ist enthalten in:
claude-dev
2026-05-03 00:05:07 +00:00
Ursprung 5f96c8f3dd
Commit e31536f8f9

Datei anzeigen

@@ -0,0 +1,127 @@
"""Backfill 2026-05-03: Fehlende Uebersetzungen ergaenzen.
Hintergrund: bis 2026-05-03 hat der Analyzer-Agent translations als Teil der
Lagebild-Antwort generiert. Bei vielen Artikeln pro Refresh hat das LLM die
Translations weggelassen oder gekuerzt (Output-Token-Druck). Folge: viele
englische Artikel haben keine headline_de/content_de.
Diese Migration nutzt den neuen translator.py-Agent (Haiku) und uebersetzt
alle Artikel WHERE language != 'de' AND (headline_de OR content_de fehlt).
Idempotent: was schon uebersetzt ist, wird nicht erneut bearbeitet (durch
COALESCE im UPDATE bleibt vorhandener Inhalt erhalten).
Kosten: bei Haiku ~$0.01-0.02 pro Artikel. Anzeige der Gesamt-Kosten am Ende.
"""
import argparse
import asyncio
import shutil
import sqlite3
import sys
from datetime import datetime
# Pfade fuer Imports (Live-Repo bevorzugt, Staging-Fallback)
sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src")
try:
from agents.translator import translate_articles
from agents.claude_client import UsageAccumulator
from services.post_refresh_qc import normalize_german_umlauts
except ImportError:
sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor-staging/src")
from agents.translator import translate_articles
from agents.claude_client import UsageAccumulator
from services.post_refresh_qc import normalize_german_umlauts
async def main_async(db_path: str, dry_run: bool, limit: int | None) -> int:
db = sqlite3.connect(db_path)
db.row_factory = sqlite3.Row
sql = """SELECT id, incident_id, headline, content_original, language
FROM articles
WHERE language IS NOT NULL AND LOWER(language) != 'de'
AND (headline_de IS NULL OR headline_de = ''
OR content_de IS NULL OR content_de = '')"""
if limit:
sql += f" LIMIT {int(limit)}"
cur = db.execute(sql)
rows = [dict(r) for r in cur.fetchall()]
print(f"Artikel ohne Uebersetzung: {len(rows)}")
if not rows:
db.close()
return 0
if dry_run:
print("DRY-RUN: keine Uebersetzungen, keine Updates.")
for r in rows[:5]:
print(f" id={r['id']} ({r['language']}): {r['headline'][:80]!r}")
db.close()
return 0
usage = UsageAccumulator()
translations = await translate_articles(rows, output_lang="de",
usage_accumulator=usage)
print(f"Uebersetzt: {len(translations)} von {len(rows)}")
updated = 0
for t in translations:
hd = t.get("headline_de")
cd = t.get("content_de")
if hd:
hd, _ = normalize_german_umlauts(hd)
if cd:
cd, _ = normalize_german_umlauts(cd)
if hd or cd:
db.execute(
"UPDATE articles SET headline_de = COALESCE(?, headline_de), "
"content_de = COALESCE(?, content_de) WHERE id = ?",
(hd, cd, t["id"]),
)
updated += 1
db.commit()
print()
print(f"=== Stats ===")
print(f" Updates: {updated}")
print(f" Calls: {usage.call_count}")
print(f" Input-Tokens: {usage.input_tokens:,}")
print(f" Output-Tokens: {usage.output_tokens:,}")
print(f" Cost gesamt: ${usage.total_cost_usd:.4f}")
print()
print("=== Stichprobe (3 frische Uebersetzungen) ===")
sample_ids = [t["id"] for t in translations[:3]]
if sample_ids:
placeholders = ",".join("?" * len(sample_ids))
for r in db.execute(
f"SELECT id, headline, headline_de FROM articles WHERE id IN ({placeholders})",
sample_ids,
):
d = dict(r)
print(f" [{d['id']}]")
print(f" EN: {d['headline'][:120]!r}")
print(f" DE: {d['headline_de'][:120]!r}")
db.close()
return 0
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--db", required=True)
ap.add_argument("--no-backup", action="store_true")
ap.add_argument("--dry-run", action="store_true")
ap.add_argument("--limit", type=int, default=None,
help="Optional: Test-Lauf mit nur N Artikeln")
args = ap.parse_args()
if not args.no_backup and not args.dry_run:
ts = datetime.now().strftime("%Y-%m-%d-%H%M")
backup_path = f"{args.db}.translations-bak-{ts}"
shutil.copy2(args.db, backup_path)
print(f"Backup: {backup_path}")
return asyncio.run(main_async(args.db, args.dry_run, args.limit))
if __name__ == "__main__":
sys.exit(main())