Dateien
AegisSight-Monitor/src/agents/translator.py
UserIsMH 379d14518c feat(multitenancy): Sprach-Whitelist + Translator-Override + Forum-Quellenklasse
Vorbereitung fuer jp_demo-Organisation: drei separate Sprach-Settings statt
einer einzigen output_language.

org_settings.py:
- get_source_language_whitelist: Liste erlaubter Quellsprachen als JSON-Array
  (z.B. ["ja"] beschraenkt RSS/Telegram auf japanische Quellen).
- get_research_language: Sprache fuer WebSearch-Prompts (Default: output_language).
- get_translator_enabled: Pro-Org-Override des globalen TRANSLATOR_ENABLED-Flags.
- LANGUAGE_DISPLAY_NAMES um ja/zh/ko/ru/ar/fa/he/fr/es erweitert.

source_rules.py:
- get_feeds_with_metadata filtert nach source_language_whitelist, wenn gesetzt.
- Feeds ohne primary_language fallen bei aktiver Whitelist raus (gewollt).
- SELECT um media_type erweitert, damit es im Feed-Dict ankommt.

orchestrator.py:
- Laedt research_language, source_language_whitelist, translator_enabled aus
  den Org-Settings.
- Wenn Whitelist gesetzt: international_sources-Flag wird ignoriert.
- research_language_iso wird an researcher.search() weitergegeben.
- translate_articles bekommt enabled-Parameter aus Org-Setting.
- Geoparsing ueberspringt media_type='forum' Artikel.
- SELECT * FROM articles wird zu JOIN sources, damit media_type beim Reload
  am Article-Dict haengt.

researcher.py:
- search() akzeptiert research_language_iso. Asymmetrische Sprach-Auswahl
  (Recherche != Output) erzeugt eigene Prompt-Anweisung "primaer in Quell-
  sprache, englische Region-Outlets erlaubt".

translator.py:
- translate_articles akzeptiert enabled-Parameter. Ueberschreibt die globale
  TRANSLATOR_ENABLED-Konstante pro Aufruf.

factchecker.py:
- _format_articles_text filtert Artikel mit media_type='forum' aus. Anonyme
  Foren-Posts gelten nicht als Faktenbeleg.

rss_parser.py:
- _fetch_feed traegt media_type aus feed_config ins Article-Dict ein,
  damit downstream Pipeline-Schritte Foren-Quellen erkennen.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 00:12:56 +02:00

415 Zeilen
15 KiB
Python

"""Translator-Agent: uebersetzt fremdsprachige Artikel ins Deutsche.
Eigener Agent (separat vom Analyzer), damit Token-Limits nicht zwischen
Lagebild und Uebersetzung konkurrieren. Nutzt CLAUDE_MODEL_FAST (Haiku) in
Batches.
Aufgerufen vom Orchestrator nach analyzer.analyze() und vor post_refresh_qc.
Backfill-Skript nutzt dieselbe Funktion fuer rueckwirkendes Auffuellen.
"""
import json
import logging
import re
from agents.claude_client import call_claude, ClaudeUsage, UsageAccumulator
from config import CLAUDE_MODEL_FAST, TRANSLATOR_ENABLED
logger = logging.getLogger("osint.translator")
# Pro Batch nicht mehr als so viele Artikel an Claude geben.
# Bei Haiku ist das Output-Limit ca. 8k Tokens. Pro Artikel kommen leicht
# 400-600 Tokens raus (headline_de + content_de bis 1000 Zeichen). Bei 15
# wurde regelmaessig getrunkt (mid-JSON broken). 5 ist sicher mit Reserve.
DEFAULT_BATCH_SIZE = 5
# content_original wird ohnehin auf 1000 Zeichen gecappt (rss_parser).
# Fuer den Translator nochmal verkuerzen, falls vorhanden mehr.
CONTENT_INPUT_MAX = 1200
# content_de soll wie content_original auf 1000 Zeichen begrenzt sein.
CONTENT_OUTPUT_MAX = 1000
def _extract_complete_objects(text: str) -> list[dict]:
"""Extrahiert vollstaendige JSON-Objekte aus moeglicherweise abgeschnittenem Text.
Klammer-Counter-Ansatz: jedes balancierte {...} wird probiert.
"""
results = []
depth = 0
start = -1
in_string = False
escape = False
for i, ch in enumerate(text):
if escape:
escape = False
continue
if ch == "\\":
escape = True
continue
if ch == '"' and not escape:
in_string = not in_string
continue
if in_string:
continue
if ch == "{":
if depth == 0:
start = i
depth += 1
elif ch == "}":
depth -= 1
if depth == 0 and start >= 0:
obj_text = text[start:i + 1]
try:
obj = json.loads(obj_text)
if isinstance(obj, dict):
results.append(obj)
except json.JSONDecodeError:
pass
start = -1
return results
def _build_prompt(articles: list[dict], output_lang: str = "de") -> str:
"""Bauen den Translation-Prompt fuer eine Batch."""
lang_label = {"de": "Deutsch", "en": "Englisch"}.get(output_lang, output_lang)
items = []
for a in articles:
items.append({
"id": a["id"],
"headline": a.get("headline", "") or "",
"content": (a.get("content_original") or "")[:CONTENT_INPUT_MAX],
"source_lang": a.get("language", "en"),
})
return f"""Du bist ein praeziser Uebersetzer fuer Nachrichten-Artikel.
Uebersetze die folgenden Artikel nach {lang_label}.
WICHTIG:
- Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) - NIEMALS Umschreibungen wie ae, oe, ue, ss.
Beispiele: "Gespraeche" -> "Gespräche", "Fuehrer" -> "Führer", "grosse" -> "große".
- Behalte Eigennamen (Personen, Orte, Organisationen) im Original.
- Headline kurz und buendig wie im Original.
- Content auf MAX {CONTENT_OUTPUT_MAX} Zeichen kuerzen, kein HTML, kein Markdown.
- Wenn der Artikel schon auf {lang_label} ist (z.B. source_lang="{output_lang}"),
kopiere headline und content unveraendert.
Antworte AUSSCHLIESSLICH mit einem flachen JSON-Array (kein Wrapper-Objekt!).
Format genau so:
[
{{"id": 1, "headline_de": "Titel auf Deutsch", "content_de": "Inhalt auf Deutsch"}},
{{"id": 2, "headline_de": "...", "content_de": "..."}}
]
NICHT erlaubt: {{"translations": [...]}} oder {{"items": [...]}} oder Markdown-Codefences.
Nur das Array, ohne Einleitung, ohne Erklaerung.
ARTIKEL:
{json.dumps(items, ensure_ascii=False, indent=2)}
"""
def _parse_response(text: str) -> list[dict]:
"""Robustes JSON-Array-Parsing.
Handhabt:
- reines JSON
- JSON in Markdown-Codefence ```json ... ```
- abgeschnittene Antworten (extrahiert vollstaendige Top-Level-Objekte)
"""
text = text.strip()
# Markdown-Codefence entfernen
if text.startswith("```"):
text = re.sub(r"^```(?:json)?\s*", "", text)
text = re.sub(r"\s*```\s*$", "", text)
text = text.strip()
try:
data = json.loads(text)
except json.JSONDecodeError:
# Erst Array versuchen
match = re.search(r"\[.*\]", text, re.DOTALL)
if match:
try:
data = json.loads(match.group(0))
except json.JSONDecodeError:
# Truncate-Fallback: einzelne Top-Level-Objekte extrahieren
data = _extract_complete_objects(text)
else:
data = _extract_complete_objects(text)
# Claude wraps das Array gelegentlich in {"translations": [...]} oder {"items": [...]}
if isinstance(data, dict):
for key in ("translations", "items", "results", "data"):
if isinstance(data.get(key), list):
data = data[key]
break
else:
# Einzelnes Objekt? Dann als Liste mit einem Element behandeln
if "id" in data:
data = [data]
else:
raise ValueError(f"Translator-Antwort: Dict ohne erwarteten Array-Key (keys={list(data.keys())[:5]})")
if not isinstance(data, list):
raise ValueError(f"Translator-Antwort ist kein Array: {type(data).__name__}")
cleaned = []
for item in data:
if not isinstance(item, dict):
continue
aid = item.get("id")
if not isinstance(aid, int):
try:
aid = int(aid)
except (TypeError, ValueError):
continue
cleaned.append({
"id": aid,
"headline_de": (item.get("headline_de") or "").strip() or None,
"content_de": (item.get("content_de") or "").strip() or None,
})
return cleaned
async def translate_articles_batch(
articles: list[dict],
output_lang: str = "de",
) -> tuple[list[dict], ClaudeUsage]:
"""Uebersetzt eine Batch von Artikeln.
Erwartet articles als Liste von Dicts mit den Feldern id, headline,
content_original, language.
Rueckgabe: (uebersetzte_artikel, usage)
Wenn der Call fehlschlaegt, wird ([], leere_usage) zurueckgegeben - der
Caller kann entscheiden, ob retry oder skip.
"""
if not articles:
return [], ClaudeUsage()
prompt = _build_prompt(articles, output_lang)
try:
result_text, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST)
except Exception as e:
logger.error(f"Translator Claude-Call fehlgeschlagen: {e}")
return [], ClaudeUsage()
try:
translations = _parse_response(result_text)
except Exception as e:
logger.error(f"Translator JSON-Parsing fehlgeschlagen: {e}; raw: {result_text[:300]!r}")
return [], usage
# Validierung: nur Translations zurueckgeben, deren id wirklich
# in der angefragten Batch war
requested_ids = {a["id"] for a in articles}
valid = [t for t in translations if t["id"] in requested_ids]
if len(valid) != len(translations):
logger.warning(
"Translator: %d von %d Translations referenzieren unbekannte IDs",
len(translations) - len(valid), len(translations),
)
return valid, usage
# --- Pre-Topic-Filter: schmale Headline-Übersetzung -----------------------------
#
# Der Topic-Filter (analyzer.filter_relevant_articles) ist ein Haiku-Call, der pro
# Artikel beurteilt, ob er thematisch zur Lage passt. Bei fremdsprachigen Headlines
# (CJK/Arabisch/Hebräisch/Kyrillisch) bewertet Haiku konservativ und verwirft sie
# häufig, weil er sie nur halb versteht. Damit landeten z.B. die japanischen
# Ministeriums-Feeds (MOD, NHK, Asahi) in Lagen mit Japan-Bezug nie in der finalen
# Auswahl, obwohl der RSS-Match korrekt griff.
#
# Diese Funktion übersetzt einen einzelnen Batch-Call alle nicht-lateinischen
# Headlines + erste Content-Sätze ins Englische und hängt das Ergebnis als
# article["headline_en_for_topic"] / article["content_en_for_topic"] an. Der
# Topic-Filter zeigt das dem LLM zusätzlich zum Original.
#
# WICHTIG: Diese Mini-Übersetzung ist UNABHÄNGIG vom TRANSLATOR_ENABLED-Flag —
# sie wird auch dann gemacht, wenn der nachgelagerte Volltext-Translator
# deaktiviert ist (Pflicht für korrektes Topic-Filtering, sehr kleine Kosten).
_TOPIC_TRANSLATE_CONTENT_MAX = 500
def _needs_pretopic_translate(article: dict) -> bool:
"""Erkennt fremdsprachige Headlines, die für den Topic-Filter übersetzt
werden sollten.
Heuristik: Headline enthält Non-ASCII-Zeichen, die NICHT in den typischen
deutsch/franz./span./port./skand. Latin-1-Erweiterungen liegen.
Das sind v.a. CJK (Kanji/Kana/Hangul), Arabisch, Hebräisch, Kyrillisch,
Thai, Devanagari etc.
"""
headline = (article.get("headline_de") or article.get("headline") or "").strip()
if not headline:
return False
for ch in headline:
cp = ord(ch)
# Bereiche ausschließen, die in Latin-Schrift normal sind:
# ASCII (0-127), Latin-1 Supplement (128-255), Latin Extended-A/B (256-591)
if cp <= 591:
continue
# Alles darüber sind fremde Schriftsysteme → übersetzen
return True
return False
async def translate_headlines_for_topic_filter(
articles: list[dict],
target_lang: str = "en",
) -> tuple[int, ClaudeUsage]:
"""Übersetzt die Headlines fremdsprachiger Artikel ins Englische, damit der
nachgelagerte Topic-Filter (Haiku) sie zuverlässig beurteilen kann.
Setzt direkt auf den Artikel-Dicts:
article["headline_en_for_topic"]: str | None
article["content_en_for_topic"]: str | None
Returns:
(anzahl_übersetzt, ClaudeUsage)
"""
if not articles:
return 0, ClaudeUsage()
candidates = [a for a in articles if _needs_pretopic_translate(a)]
if not candidates:
return 0, ClaudeUsage()
# Eindeutige Indizes (auch wenn article kein "id"-Feld hat, weil noch nicht
# in der DB): wir nutzen die Position in der gesamten articles-Liste.
idx_by_obj = {id(a): i for i, a in enumerate(articles)}
items = []
for a in candidates:
idx = idx_by_obj.get(id(a))
if idx is None:
continue
headline = (a.get("headline_de") or a.get("headline") or "").strip()
content_src = (a.get("content_de") or a.get("content_original") or "")
items.append({
"i": idx,
"h": headline[:200],
"c": content_src[:_TOPIC_TRANSLATE_CONTENT_MAX],
})
if not items:
return 0, ClaudeUsage()
lang_label = {"en": "English", "de": "German"}.get(target_lang, target_lang)
prompt = f"""Translate these news headlines and short content snippets to {lang_label}.
Keep proper names (people, organizations, places) untouched. Keep it concise; the goal
is to let another model judge topical relevance, not to publish.
Return ONLY a JSON array. Each item: {{"i": <index>, "h": <headline in {lang_label}>, "c": <content snippet in {lang_label}>}}.
Keep the same "i" values. No prose, no markdown fences.
INPUT:
{json.dumps(items, ensure_ascii=False)}
"""
try:
result_text, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST)
except Exception as e:
logger.warning(f"Pre-Topic-Translate Claude-Call fehlgeschlagen: {e}")
return 0, ClaudeUsage()
# Robustes Parsing (Markdown-Codefence + nacktes Array)
text = result_text.strip()
if text.startswith("```"):
text = re.sub(r"^```(?:json)?\s*", "", text)
text = re.sub(r"\s*```\s*$", "", text)
text = text.strip()
try:
data = json.loads(text)
except json.JSONDecodeError:
m = re.search(r"\[.*\]", text, re.DOTALL)
if not m:
logger.warning(
f"Pre-Topic-Translate: kein JSON-Array in Antwort. Sample: {text[:200]!r}"
)
return 0, usage
try:
data = json.loads(m.group(0))
except json.JSONDecodeError:
data = _extract_complete_objects(text)
if not isinstance(data, list):
logger.warning(
f"Pre-Topic-Translate: Antwort ist kein Array ({type(data).__name__})"
)
return 0, usage
applied = 0
for entry in data:
if not isinstance(entry, dict):
continue
idx = entry.get("i")
if not isinstance(idx, int) or not (0 <= idx < len(articles)):
try:
idx = int(idx)
if not (0 <= idx < len(articles)):
continue
except (TypeError, ValueError):
continue
h = (entry.get("h") or "").strip() or None
c = (entry.get("c") or "").strip() or None
if h:
articles[idx]["headline_en_for_topic"] = h
if c:
articles[idx]["content_en_for_topic"] = c
if h or c:
applied += 1
return applied, usage
async def translate_articles(
articles: list[dict],
output_lang: str = "de",
batch_size: int = DEFAULT_BATCH_SIZE,
usage_accumulator: UsageAccumulator | None = None,
enabled: bool | None = None,
) -> list[dict]:
"""Uebersetzt eine beliebige Anzahl Artikel in Batches.
Bringt die Batches durch Logik in `translate_articles_batch` und gibt
EINE flache Liste der Translations zurueck. Wenn ein Batch fehlschlaegt,
wird er uebersprungen (anderer Batches laufen weiter).
enabled: Pro-Aufruf-Override des globalen TRANSLATOR_ENABLED-Flags. Wenn None,
greift das Modul-Default (config.TRANSLATOR_ENABLED, abgeleitet aus .env).
Der Orchestrator setzt das aus dem Org-Setting 'translator_enabled', damit
jp_demo (Translator zwingend an) trotz global deaktiviertem Flag funktioniert.
"""
if not articles:
return []
is_enabled = TRANSLATOR_ENABLED if enabled is None else bool(enabled)
if not is_enabled:
logger.info(
"Translator deaktiviert (enabled=%s, global TRANSLATOR_ENABLED=%s), %d Artikel uebersprungen",
enabled, TRANSLATOR_ENABLED, len(articles),
)
return []
all_translations = []
for i in range(0, len(articles), batch_size):
batch = articles[i : i + batch_size]
translations, usage = await translate_articles_batch(batch, output_lang)
if usage_accumulator is not None:
usage_accumulator.add(usage)
all_translations.extend(translations)
logger.info(
"Translator-Batch %d/%d: %d/%d uebersetzt (cost=$%.4f)",
(i // batch_size) + 1,
(len(articles) + batch_size - 1) // batch_size,
len(translations), len(batch),
usage.cost_usd,
)
return all_translations