Migration: parallele translate-Batches + busy_timeout/WAL

- asyncio.Semaphore(4) + as_completed: 4 Worker parallel statt sequenziell
- Per-Batch commit: kein Datenverlust bei Abbruch
- sqlite3 timeout=60 + PRAGMA busy_timeout=60000 + journal_mode=WAL: kein Crash bei aktivem Live-Write-Lock
- Bessere Progress-Logs (alle 20 Batches)
Dieser Commit ist enthalten in:
claude-dev
2026-05-09 01:32:51 +00:00
Ursprung e31536f8f9
Commit 670a6617a7

Datei anzeigen

@@ -23,19 +23,23 @@ from datetime import datetime
# Pfade fuer Imports (Live-Repo bevorzugt, Staging-Fallback) # Pfade fuer Imports (Live-Repo bevorzugt, Staging-Fallback)
sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src") sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src")
try: try:
from agents.translator import translate_articles from agents.translator import translate_articles_batch, DEFAULT_BATCH_SIZE
from agents.claude_client import UsageAccumulator from agents.claude_client import UsageAccumulator
from services.post_refresh_qc import normalize_german_umlauts from services.post_refresh_qc import normalize_german_umlauts
except ImportError: except ImportError:
sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor-staging/src") sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor-staging/src")
from agents.translator import translate_articles from agents.translator import translate_articles_batch, DEFAULT_BATCH_SIZE
from agents.claude_client import UsageAccumulator from agents.claude_client import UsageAccumulator
from services.post_refresh_qc import normalize_german_umlauts from services.post_refresh_qc import normalize_german_umlauts
async def main_async(db_path: str, dry_run: bool, limit: int | None) -> int: async def main_async(db_path: str, dry_run: bool, limit: int | None) -> int:
db = sqlite3.connect(db_path) db = sqlite3.connect(db_path, timeout=60)
db.row_factory = sqlite3.Row db.row_factory = sqlite3.Row
# Live-Service haelt regelmaessig den Write-Lock. Statt sofort zu crashen
# warten wir bis zu 60 Sekunden auf den Lock.
db.execute("PRAGMA busy_timeout = 60000")
db.execute("PRAGMA journal_mode = WAL")
sql = """SELECT id, incident_id, headline, content_original, language sql = """SELECT id, incident_id, headline, content_original, language
FROM articles FROM articles
@@ -60,11 +64,35 @@ async def main_async(db_path: str, dry_run: bool, limit: int | None) -> int:
return 0 return 0
usage = UsageAccumulator() usage = UsageAccumulator()
translations = await translate_articles(rows, output_lang="de", total = len(rows)
usage_accumulator=usage) batch_size = DEFAULT_BATCH_SIZE
print(f"Uebersetzt: {len(translations)} von {len(rows)}") PARALLEL_WORKERS = 4
updated = 0 updated = 0
translated = 0
sample_translations = []
completed_count = 0
print(f"Starte parallele Verarbeitung: Batches a {batch_size}, {PARALLEL_WORKERS} Worker parallel...", flush=True)
# Batches vorbereiten
batches = [rows[i:i + batch_size] for i in range(0, total, batch_size)]
semaphore = asyncio.Semaphore(PARALLEL_WORKERS)
async def process_batch(batch):
async with semaphore:
return await translate_articles_batch(batch)
# Tasks erstellen und in beliebiger Reihenfolge bearbeiten
tasks = [asyncio.create_task(process_batch(b)) for b in batches]
n_batches = len(batches)
for completed_task in asyncio.as_completed(tasks):
try:
translations, batch_usage = await completed_task
except Exception as e:
print(f" Batch-Fehler: {e}", flush=True)
continue
usage.add(batch_usage)
translated += len(translations)
for t in translations: for t in translations:
hd = t.get("headline_de") hd = t.get("headline_de")
cd = t.get("content_de") cd = t.get("content_de")
@@ -79,23 +107,33 @@ async def main_async(db_path: str, dry_run: bool, limit: int | None) -> int:
(hd, cd, t["id"]), (hd, cd, t["id"]),
) )
updated += 1 updated += 1
db.commit() if len(sample_translations) < 3:
sample_translations.append(t["id"])
db.commit() # Per-Batch commit -> bei Abbruch kein Datenverlust
completed_count += 1
if completed_count % 20 == 0 or completed_count == n_batches:
print(
f"[{completed_count}/{n_batches} Batches | Updates={updated}/{total} | Cost=${usage.total_cost_usd:.2f}]",
flush=True,
)
print() print()
print(f"=== Stats ===") print(f"=== Stats ===")
print(f" Updates: {updated}") print(f" Total betrachtet: {total}")
print(f" Translator OK: {translated}")
print(f" DB-Updates: {updated}")
print(f" Calls: {usage.call_count}") print(f" Calls: {usage.call_count}")
print(f" Input-Tokens: {usage.input_tokens:,}") print(f" Input-Tokens: {usage.input_tokens:,}")
print(f" Output-Tokens: {usage.output_tokens:,}") print(f" Output-Tokens: {usage.output_tokens:,}")
print(f" Cost gesamt: ${usage.total_cost_usd:.4f}") print(f" Cost gesamt: ${usage.total_cost_usd:.4f}")
print() print()
print("=== Stichprobe (3 frische Uebersetzungen) ===") print("=== Stichprobe (3 frische Uebersetzungen) ===")
sample_ids = [t["id"] for t in translations[:3]] if sample_translations:
if sample_ids: placeholders = ",".join("?" * len(sample_translations))
placeholders = ",".join("?" * len(sample_ids))
for r in db.execute( for r in db.execute(
f"SELECT id, headline, headline_de FROM articles WHERE id IN ({placeholders})", f"SELECT id, headline, headline_de FROM articles WHERE id IN ({placeholders})",
sample_ids, sample_translations,
): ):
d = dict(r) d = dict(r)
print(f" [{d['id']}]") print(f" [{d['id']}]")