Dateien
AegisSight-Monitor-Verwaltung/migrations/migrate_umlauts_2026-05-03.py
claude-dev 5f96c8f3dd Backfill-Migration: ASCII-Umlaute in articles korrigieren
Idempotente Migration mit --db Parameter, Backup vor Lauf, Sample-Output.
Behandelt headline_de + content_de bei allen Artikeln; bei language=de
zusaetzlich headline + content_original. Nutzt das gleiche hunspell-Dict
wie der Live-QC.
2026-05-02 23:26:27 +00:00

128 Zeilen
4.2 KiB
Python

"""Backfill 2026-05-03: ASCII-Umlaute in articles korrigieren.
Ursache: Zwei verschiedene Pfade lassen ASCII-Umlaute in der DB landen:
1. Quellen wie dpa-AFX/finanznachrichten.de oder Telegram-Kanaele liefern
Headlines schon als "Gespraeche" statt "Gespräche".
2. LLM-Uebersetzungen (analyzer.py / orchestrator.py) erzeugen gelegentlich
ASCII-Umlaute in headline_de/content_de trotz Prompt-Anweisung.
Diese Migration normalisiert rueckwirkend alle articles per
normalize_german_umlauts (hunspell-Dict mit ~150k deutschen Woertern).
Felder pro Artikel:
- headline_de, content_de (immer)
- headline, content_original (nur wenn language='de')
Idempotent: nur Aenderungen werden geschrieben.
"""
import argparse
import shutil
import sqlite3
import sys
from datetime import datetime
# normalize_german_umlauts aus dem Monitor-Repo importieren
sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src")
try:
from services.post_refresh_qc import normalize_german_umlauts
except ImportError:
sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor-staging/src")
from services.post_refresh_qc import normalize_german_umlauts
def main():
    """Normalize ASCII-transliterated umlauts in the ``articles`` table.

    Command-line options:
        --db PATH     SQLite database file to migrate (required).
        --no-backup   Skip the file copy that is otherwise taken before writing.
        --dry-run     Compute and report replacements without writing anything
                      (no backup is taken either).

    For every article, ``headline_de`` and ``content_de`` are passed through
    ``normalize_german_umlauts``, which is used here as returning a
    ``(new_text, replacement_count)`` pair. ``headline`` and
    ``content_original`` are processed only when the article's ``language``
    equals ``de`` (case-insensitive). Only fields with at least one
    replacement are written, and there is one commit after all rows have
    been processed.

    Returns:
        0 on success, used as the process exit code.
    """
    # Function-scope import: only needed for the path checks below.
    import os

    ap = argparse.ArgumentParser()
    ap.add_argument("--db", required=True)
    ap.add_argument("--no-backup", action="store_true")
    ap.add_argument("--dry-run", action="store_true")
    args = ap.parse_args()

    # sqlite3.connect() creates a missing file, so a mistyped path would
    # leave a stray empty database behind before the SELECT fails.
    if not os.path.isfile(args.db):
        ap.error(f"Datenbank nicht gefunden: {args.db}")

    if not args.no_backup and not args.dry_run:
        ts = datetime.now().strftime("%Y-%m-%d-%H%M")
        base_backup_path = f"{args.db}.umlaut-bak-{ts}"
        backup_path = base_backup_path
        # The timestamp has minute resolution. A re-run within the same
        # minute must not overwrite the pre-migration backup with data that
        # has already been migrated, so pick an unused name instead.
        attempt = 1
        while os.path.exists(backup_path):
            backup_path = f"{base_backup_path}-{attempt}"
            attempt += 1
        shutil.copy2(args.db, backup_path)
        print(f"Backup angelegt: {backup_path}")

    # Order matters: the first updated field of a row is used for the sample.
    always_fields = ("headline_de", "content_de")
    all_fields = always_fields + ("headline", "content_original")

    db = sqlite3.connect(args.db)
    try:
        db.row_factory = sqlite3.Row
        cur = db.execute(
            """SELECT id, language, headline, headline_de, content_original, content_de
            FROM articles"""
        )
        rows = cur.fetchall()
        print(f"Artikel gesamt: {len(rows)}")

        affected = 0
        total_replacements = 0
        by_field = {"headline": 0, "headline_de": 0, "content_original": 0, "content_de": 0}
        samples = []

        for r in rows:
            rid = r["id"]
            is_de = (r["language"] or "").lower() == "de"
            # Source-language fields are only German when language == 'de'.
            fields = all_fields if is_de else always_fields

            updates = {}
            for field in fields:
                text = r[field]
                if not text:
                    # NULL or empty: nothing to normalize.
                    continue
                new, n = normalize_german_umlauts(text)
                if n > 0:
                    updates[field] = new
                    by_field[field] += n
                    total_replacements += n

            if not updates:
                continue

            affected += 1
            if len(samples) < 5:
                # Keep a truncated before/after pair for the report.
                first_field = next(iter(updates))
                old_val = r[first_field]
                new_val = updates[first_field]
                samples.append((rid, first_field, old_val[:120], new_val[:120]))

            if not args.dry_run:
                # Column names come from the fixed tuples above, never from
                # user input; the values are bound as parameters.
                set_clause = ", ".join(f"{k} = ?" for k in updates)
                values = list(updates.values()) + [rid]
                db.execute(f"UPDATE articles SET {set_clause} WHERE id = ?", values)

        if not args.dry_run:
            db.commit()

        print()
        print(f"Betroffene Artikel: {affected}")
        print(f"Gesamt-Wortersetzungen: {total_replacements}")
        for k, v in by_field.items():
            print(f" {k}: {v}")
        print()
        print("=== Sample (vorher -> nachher) ===")
        for rid, field, old, new in samples:
            print(f" [{rid}] {field}:")
            print(f" BEFORE: {old!r}")
            print(f" AFTER: {new!r}")
    finally:
        # Always release the connection, including on errors. Updates that
        # were never committed are not persisted.
        db.close()

    print()
    print("Fertig.")
    return 0
# Script entry point: run the migration and hand its return value to the
# interpreter as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())