From 670a6617a7b00e38b7da26903d0370b98a3ce5d5 Mon Sep 17 00:00:00 2001 From: claude-dev Date: Sat, 9 May 2026 01:32:51 +0000 Subject: [PATCH] Migration: parallele translate-Batches + busy_timeout/WAL - asyncio.Semaphore(4) + as_completed: 4 Worker parallel statt sequenziell - Per-Batch commit: kein Datenverlust bei Abbruch - sqlite3 timeout=60 + PRAGMA busy_timeout=60000 + journal_mode=WAL: kein Crash bei aktivem Live-Write-Lock - Bessere Progress-Logs (alle 20 Batches) --- migrations/migrate_translations_2026-05-03.py | 98 +++++++++++++------ 1 file changed, 68 insertions(+), 30 deletions(-) diff --git a/migrations/migrate_translations_2026-05-03.py b/migrations/migrate_translations_2026-05-03.py index 2a8f465..a2bda81 100644 --- a/migrations/migrate_translations_2026-05-03.py +++ b/migrations/migrate_translations_2026-05-03.py @@ -23,19 +23,23 @@ from datetime import datetime # Pfade fuer Imports (Live-Repo bevorzugt, Staging-Fallback) sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src") try: - from agents.translator import translate_articles + from agents.translator import translate_articles_batch, DEFAULT_BATCH_SIZE from agents.claude_client import UsageAccumulator from services.post_refresh_qc import normalize_german_umlauts except ImportError: sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor-staging/src") - from agents.translator import translate_articles + from agents.translator import translate_articles_batch, DEFAULT_BATCH_SIZE from agents.claude_client import UsageAccumulator from services.post_refresh_qc import normalize_german_umlauts async def main_async(db_path: str, dry_run: bool, limit: int | None) -> int: - db = sqlite3.connect(db_path) + db = sqlite3.connect(db_path, timeout=60) db.row_factory = sqlite3.Row + # Live-Service haelt regelmaessig den Write-Lock. Statt sofort zu crashen + # warten wir bis zu 60 Sekunden auf den Lock. + db.execute("PRAGMA busy_timeout = 60000") + db.execute("PRAGMA journal_mode = WAL") sql = """SELECT id, incident_id, headline, content_original, language FROM articles @@ -60,42 +64,76 @@ async def main_async(db_path: str, dry_run: bool, limit: int | None) -> int: return 0 usage = UsageAccumulator() - translations = await translate_articles(rows, output_lang="de", - usage_accumulator=usage) - print(f"Uebersetzt: {len(translations)} von {len(rows)}") - + total = len(rows) + batch_size = DEFAULT_BATCH_SIZE + PARALLEL_WORKERS = 4 updated = 0 - for t in translations: - hd = t.get("headline_de") - cd = t.get("content_de") - if hd: - hd, _ = normalize_german_umlauts(hd) - if cd: - cd, _ = normalize_german_umlauts(cd) - if hd or cd: - db.execute( - "UPDATE articles SET headline_de = COALESCE(?, headline_de), " - "content_de = COALESCE(?, content_de) WHERE id = ?", - (hd, cd, t["id"]), + translated = 0 + sample_translations = [] + completed_count = 0 + print(f"Starte parallele Verarbeitung: Batches a {batch_size}, {PARALLEL_WORKERS} Worker parallel...", flush=True) + + # Batches vorbereiten + batches = [rows[i:i + batch_size] for i in range(0, total, batch_size)] + semaphore = asyncio.Semaphore(PARALLEL_WORKERS) + + async def process_batch(batch): + async with semaphore: + return await translate_articles_batch(batch) + + # Tasks erstellen und in beliebiger Reihenfolge bearbeiten + tasks = [asyncio.create_task(process_batch(b)) for b in batches] + n_batches = len(batches) + + for completed_task in asyncio.as_completed(tasks): + try: + translations, batch_usage = await completed_task + except Exception as e: + print(f" Batch-Fehler: {e}", flush=True) + continue + usage.add(batch_usage) + translated += len(translations) + for t in translations: + hd = t.get("headline_de") + cd = t.get("content_de") + if hd: + hd, _ = normalize_german_umlauts(hd) + if cd: + cd, _ = normalize_german_umlauts(cd) + if hd or cd: + db.execute( + "UPDATE articles SET headline_de = COALESCE(?, headline_de), " + "content_de = COALESCE(?, content_de) WHERE id = ?", + (hd, cd, t["id"]), + ) + updated += 1 + if len(sample_translations) < 3: + sample_translations.append(t["id"]) + db.commit() # Per-Batch commit -> bei Abbruch kein Datenverlust + + completed_count += 1 + if completed_count % 20 == 0 or completed_count == n_batches: + print( + f"[{completed_count}/{n_batches} Batches | Updates={updated}/{total} | Cost=${usage.total_cost_usd:.2f}]", + flush=True, ) - updated += 1 - db.commit() print() print(f"=== Stats ===") - print(f" Updates: {updated}") - print(f" Calls: {usage.call_count}") - print(f" Input-Tokens: {usage.input_tokens:,}") - print(f" Output-Tokens: {usage.output_tokens:,}") - print(f" Cost gesamt: ${usage.total_cost_usd:.4f}") + print(f" Total betrachtet: {total}") + print(f" Translator OK: {translated}") + print(f" DB-Updates: {updated}") + print(f" Calls: {usage.call_count}") + print(f" Input-Tokens: {usage.input_tokens:,}") + print(f" Output-Tokens: {usage.output_tokens:,}") + print(f" Cost gesamt: ${usage.total_cost_usd:.4f}") print() print("=== Stichprobe (3 frische Uebersetzungen) ===") - sample_ids = [t["id"] for t in translations[:3]] - if sample_ids: - placeholders = ",".join("?" * len(sample_ids)) + if sample_translations: + placeholders = ",".join("?" * len(sample_translations)) for r in db.execute( f"SELECT id, headline, headline_de FROM articles WHERE id IN ({placeholders})", - sample_ids, + sample_translations, ): d = dict(r) print(f" [{d['id']}]")