diff --git a/src/agents/analyzer.py b/src/agents/analyzer.py index 6c00f7b..8a067af 100644 --- a/src/agents/analyzer.py +++ b/src/agents/analyzer.py @@ -47,7 +47,6 @@ Antworte AUSSCHLIESSLICH als JSON-Objekt mit diesen Feldern: - "summary": Zusammenfassung auf {output_language} mit Quellenverweisen [1], [2] etc. im Text (Markdown-Überschriften ## erlaubt wenn sinnvoll, aber KEINE "## ZUSAMMENFASSUNG"/"## ÜBERBLICK"-Sektion) - "sources": Array von Quellenobjekten, je: {{"nr": 1, "name": "Quellenname", "url": "https://..."}} - "key_facts": Array von bestätigten Kernfakten (Strings, in Ausgabesprache) -- "translations": Array von Objekten mit "article_id", "headline_de", "content_de" (nur für fremdsprachige Artikel) Antworte NUR mit dem JSON-Objekt. Keine Einleitung, keine Erklärung.""" @@ -102,7 +101,6 @@ Antworte AUSSCHLIESSLICH als JSON-Objekt mit diesen Feldern: - "summary": Das strukturierte Briefing als Markdown-Text mit Quellenverweisen [1], [2] etc. - "sources": Array von Quellenobjekten, je: {{"nr": 1, "name": "Quellenname", "url": "https://..."}} - "key_facts": Array von gesicherten Kernfakten (Strings, in Ausgabesprache) -- "translations": Array von Objekten mit "article_id", "headline_de", "content_de" (nur für fremdsprachige Artikel) Antworte NUR mit dem JSON-Objekt. Keine Einleitung, keine Erklärung.""" @@ -149,7 +147,6 @@ Antworte AUSSCHLIESSLICH als JSON-Objekt mit diesen Feldern: - "summary": Aktualisierte Zusammenfassung mit Quellenverweisen [1], [2] etc. - "sources": Array mit NUR den NEUEN Quellen aus den neuen Meldungen, je: {{"nr": , "name": "Quellenname", "url": "https://..."}}. Alte Quellen werden automatisch gemerged. - "key_facts": Array aller aktuellen Kernfakten (in Ausgabesprache) -- "translations": Array von Objekten mit "article_id", "headline_de", "content_de" (nur für neue fremdsprachige Artikel) Antworte NUR mit dem JSON-Objekt. Keine Einleitung, keine Erklärung.""" @@ -201,7 +198,6 @@ Antworte AUSSCHLIESSLICH als JSON-Objekt mit diesen Feldern: - "summary": Das aktualisierte Briefing als Markdown-Text mit Quellenverweisen - "sources": Array mit NUR den NEUEN Quellen aus den neuen Meldungen, je: {{"nr": , "name": "Quellenname", "url": "https://..."}}. Alte Quellen werden automatisch gemerged. - "key_facts": Array aller gesicherten Kernfakten (in Ausgabesprache) -- "translations": Array von Objekten mit "article_id", "headline_de", "content_de" (nur für neue fremdsprachige Artikel) Antworte NUR mit dem JSON-Objekt. Keine Einleitung, keine Erklärung.""" @@ -796,5 +792,5 @@ class AnalyzerAgent: except json.JSONDecodeError: pass - return {"summary": summary, "sources": sources, "key_facts": [], "translations": []} + return {"summary": summary, "sources": sources, "key_facts": []} diff --git a/src/agents/orchestrator.py b/src/agents/orchestrator.py index a1fa096..16fd3a5 100644 --- a/src/agents/orchestrator.py +++ b/src/agents/orchestrator.py @@ -1410,30 +1410,64 @@ class AgentOrchestrator: snap_articles, snap_fcs, log_id, now, tenant_id), ) - # Übersetzungen aktualisieren (nur für gültige DB-IDs) - # LLM-Drift abfangen: trotz Prompt-Anweisung kommen manchmal - # ASCII-Umlaute ("Gespraeche" statt "Gespräche") in der Übersetzung. - # Dictionary-basierte Korrektur schreibt nur deutsche Woerter um. - from services.post_refresh_qc import normalize_german_umlauts as _norm_de - for translation in analysis.get("translations", []): - article_id = translation.get("article_id") - if isinstance(article_id, int): - hd = translation.get("headline_de") - cd = translation.get("content_de") - if hd: - hd, _ = _norm_de(hd) - if cd: - cd, _ = _norm_de(cd) - await db.execute( - "UPDATE articles SET headline_de = ?, content_de = ? WHERE id = ? AND incident_id = ?", - (hd, cd, article_id, incident_id), - ) + # Translations werden vom dedizierten Translator-Agent unten + # erzeugt (frueher inline im Analyzer-Output, das war token- + # instabil und schaetzte regelmaessig content_de aus). await db.commit() # Cancel-Check nach paralleler Verarbeitung self._check_cancelled(incident_id) + # --- Translator (Haiku) fuer fremdsprachige Artikel ohne DE-Texte --- + # Idempotent: nur Artikel ohne headline_de/content_de werden geholt. + # Lauft nach der Analyse (Lagebild ist schon committed) und vor QC + # (damit normalize_umlaut_articles auch die frischen DE-Texte fasst). + try: + tr_cursor = await db.execute( + """SELECT id, headline, content_original, language + FROM articles + WHERE incident_id = ? + AND language IS NOT NULL AND LOWER(language) != 'de' + AND (headline_de IS NULL OR headline_de = '' + OR content_de IS NULL OR content_de = '')""", + (incident_id,), + ) + pending_translations = [dict(r) for r in await tr_cursor.fetchall()] + if pending_translations: + logger.info( + "Translator fuer Incident %d: %d Artikel ohne DE-Uebersetzung", + incident_id, len(pending_translations), + ) + from agents.translator import translate_articles + from services.post_refresh_qc import normalize_german_umlauts as _norm_de2 + translations = await translate_articles( + pending_translations, + output_lang="de", + usage_accumulator=usage_acc, + ) + for t in translations: + hd = t.get("headline_de") + cd = t.get("content_de") + if hd: + hd, _ = _norm_de2(hd) + if cd: + cd, _ = _norm_de2(cd) + if hd or cd: + await db.execute( + "UPDATE articles SET headline_de = COALESCE(?, headline_de), " + "content_de = COALESCE(?, content_de) WHERE id = ? AND incident_id = ?", + (hd, cd, t["id"], incident_id), + ) + await db.commit() + logger.info( + "Translator fuer Incident %d: %d/%d Artikel uebersetzt", + incident_id, len(translations), len(pending_translations), + ) + except Exception as e: + logger.error("Translator-Fehler fuer Incident %d: %s", incident_id, e, exc_info=True) + # Refresh trotz Translator-Fehler weiterlaufen lassen + # --- Neueste Entwicklungen (nur Live-Monitoring / adhoc) --- # Basis ist jetzt das frisch generierte Lagebild (autoritativ, thematisch sauber). # Zeitstempel und Quellen kommen aus den jüngsten belegenden Artikeln. diff --git a/src/agents/translator.py b/src/agents/translator.py new file mode 100644 index 0000000..f877f8c --- /dev/null +++ b/src/agents/translator.py @@ -0,0 +1,229 @@ +"""Translator-Agent: uebersetzt fremdsprachige Artikel ins Deutsche. + +Eigener Agent (separat vom Analyzer), damit Token-Limits nicht zwischen +Lagebild und Uebersetzung konkurrieren. Nutzt CLAUDE_MODEL_FAST (Haiku) in +Batches. + +Aufgerufen vom Orchestrator nach analyzer.analyze() und vor post_refresh_qc. +Backfill-Skript nutzt dieselbe Funktion fuer rueckwirkendes Auffuellen. +""" +import json +import logging +import re + +from agents.claude_client import call_claude, ClaudeUsage, UsageAccumulator +from config import CLAUDE_MODEL_FAST + +logger = logging.getLogger("osint.translator") + +# Pro Batch nicht mehr als so viele Artikel an Claude geben. +# Bei Haiku ist das Output-Limit ca. 8k Tokens. Pro Artikel kommen leicht +# 400-600 Tokens raus (headline_de + content_de bis 1000 Zeichen). Bei 15 +# wurde regelmaessig getrunkt (mid-JSON broken). 5 ist sicher mit Reserve. +DEFAULT_BATCH_SIZE = 5 + +# content_original wird ohnehin auf 1000 Zeichen gecappt (rss_parser). +# Fuer den Translator nochmal verkuerzen, falls vorhanden mehr. +CONTENT_INPUT_MAX = 1200 + +# content_de soll wie content_original auf 1000 Zeichen begrenzt sein. +CONTENT_OUTPUT_MAX = 1000 + + +def _extract_complete_objects(text: str) -> list[dict]: + """Extrahiert vollstaendige JSON-Objekte aus moeglicherweise abgeschnittenem Text. + + Klammer-Counter-Ansatz: jedes balancierte {...} wird probiert. + """ + results = [] + depth = 0 + start = -1 + in_string = False + escape = False + for i, ch in enumerate(text): + if escape: + escape = False + continue + if ch == "\\": + escape = True + continue + if ch == '"' and not escape: + in_string = not in_string + continue + if in_string: + continue + if ch == "{": + if depth == 0: + start = i + depth += 1 + elif ch == "}": + depth -= 1 + if depth == 0 and start >= 0: + obj_text = text[start:i + 1] + try: + obj = json.loads(obj_text) + if isinstance(obj, dict): + results.append(obj) + except json.JSONDecodeError: + pass + start = -1 + return results + + +def _build_prompt(articles: list[dict], output_lang: str = "de") -> str: + """Bauen den Translation-Prompt fuer eine Batch.""" + lang_label = {"de": "Deutsch", "en": "Englisch"}.get(output_lang, output_lang) + + items = [] + for a in articles: + items.append({ + "id": a["id"], + "headline": a.get("headline", "") or "", + "content": (a.get("content_original") or "")[:CONTENT_INPUT_MAX], + "source_lang": a.get("language", "en"), + }) + + return f"""Du bist ein praeziser Uebersetzer fuer Nachrichten-Artikel. +Uebersetze die folgenden Artikel nach {lang_label}. + +WICHTIG: +- Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) - NIEMALS Umschreibungen wie ae, oe, ue, ss. + Beispiele: "Gespraeche" -> "Gespräche", "Fuehrer" -> "Führer", "grosse" -> "große". +- Behalte Eigennamen (Personen, Orte, Organisationen) im Original. +- Headline kurz und buendig wie im Original. +- Content auf MAX {CONTENT_OUTPUT_MAX} Zeichen kuerzen, kein HTML, kein Markdown. +- Wenn der Artikel schon auf {lang_label} ist (z.B. source_lang="{output_lang}"), + kopiere headline und content unveraendert. + +Antworte AUSSCHLIESSLICH als JSON-Array - eine Liste von Objekten in der Form: +[{{"id": , "headline_de": "", "content_de": ""}}, ...] + +Keine Einleitung, keine Erklaerung, nur das JSON-Array. + +ARTIKEL: +{json.dumps(items, ensure_ascii=False, indent=2)} +""" + + +def _parse_response(text: str) -> list[dict]: + """Robustes JSON-Array-Parsing. + + Handhabt: + - reines JSON + - JSON in Markdown-Codefence ```json ... ``` + - abgeschnittene Antworten (extrahiert vollstaendige Top-Level-Objekte) + """ + text = text.strip() + # Markdown-Codefence entfernen + if text.startswith("```"): + text = re.sub(r"^```(?:json)?\s*", "", text) + text = re.sub(r"\s*```\s*$", "", text) + text = text.strip() + + try: + data = json.loads(text) + except json.JSONDecodeError: + # Erst Array versuchen + match = re.search(r"\[.*\]", text, re.DOTALL) + if match: + try: + data = json.loads(match.group(0)) + except json.JSONDecodeError: + # Truncate-Fallback: einzelne Top-Level-Objekte extrahieren + data = _extract_complete_objects(text) + else: + data = _extract_complete_objects(text) + + if not isinstance(data, list): + raise ValueError(f"Translator-Antwort ist kein Array: {type(data).__name__}") + + cleaned = [] + for item in data: + if not isinstance(item, dict): + continue + aid = item.get("id") + if not isinstance(aid, int): + try: + aid = int(aid) + except (TypeError, ValueError): + continue + cleaned.append({ + "id": aid, + "headline_de": (item.get("headline_de") or "").strip() or None, + "content_de": (item.get("content_de") or "").strip() or None, + }) + return cleaned + + +async def translate_articles_batch( + articles: list[dict], + output_lang: str = "de", +) -> tuple[list[dict], ClaudeUsage]: + """Uebersetzt eine Batch von Artikeln. + + Erwartet articles als Liste von Dicts mit den Feldern id, headline, + content_original, language. + + Rueckgabe: (uebersetzte_artikel, usage) + Wenn der Call fehlschlaegt, wird ([], leere_usage) zurueckgegeben - der + Caller kann entscheiden, ob retry oder skip. + """ + if not articles: + return [], ClaudeUsage() + + prompt = _build_prompt(articles, output_lang) + + try: + result_text, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST) + except Exception as e: + logger.error(f"Translator Claude-Call fehlgeschlagen: {e}") + return [], ClaudeUsage() + + try: + translations = _parse_response(result_text) + except Exception as e: + logger.error(f"Translator JSON-Parsing fehlgeschlagen: {e}; raw: {result_text[:300]!r}") + return [], usage + + # Validierung: nur Translations zurueckgeben, deren id wirklich + # in der angefragten Batch war + requested_ids = {a["id"] for a in articles} + valid = [t for t in translations if t["id"] in requested_ids] + if len(valid) != len(translations): + logger.warning( + "Translator: %d von %d Translations referenzieren unbekannte IDs", + len(translations) - len(valid), len(translations), + ) + return valid, usage + + +async def translate_articles( + articles: list[dict], + output_lang: str = "de", + batch_size: int = DEFAULT_BATCH_SIZE, + usage_accumulator: UsageAccumulator | None = None, +) -> list[dict]: + """Uebersetzt eine beliebige Anzahl Artikel in Batches. + + Bringt die Batches durch Logik in `translate_articles_batch` und gibt + EINE flache Liste der Translations zurueck. Wenn ein Batch fehlschlaegt, + wird er uebersprungen (anderer Batches laufen weiter). + """ + if not articles: + return [] + + all_translations = [] + for i in range(0, len(articles), batch_size): + batch = articles[i : i + batch_size] + translations, usage = await translate_articles_batch(batch, output_lang) + if usage_accumulator is not None: + usage_accumulator.add(usage) + all_translations.extend(translations) + logger.info( + "Translator-Batch %d/%d: %d/%d uebersetzt (cost=$%.4f)", + (i // batch_size) + 1, + (len(articles) + batch_size - 1) // batch_size, + len(translations), len(batch), + usage.cost_usd, + ) + return all_translations