feat(multitenancy): Sprach-Whitelist + Translator-Override + Forum-Quellenklasse

Vorbereitung fuer jp_demo-Organisation: drei separate Sprach-Settings statt
einer einzigen output_language.

org_settings.py:
- get_source_language_whitelist: Liste erlaubter Quellsprachen als JSON-Array
  (z.B. ["ja"] beschraenkt RSS/Telegram auf japanische Quellen).
- get_research_language: Sprache fuer WebSearch-Prompts (Default: output_language).
- get_translator_enabled: Pro-Org-Override des globalen TRANSLATOR_ENABLED-Flags.
- LANGUAGE_DISPLAY_NAMES um ja/zh/ko/ru/ar/fa/he/fr/es erweitert.

source_rules.py:
- get_feeds_with_metadata filtert nach source_language_whitelist, wenn gesetzt.
- Feeds ohne primary_language fallen bei aktiver Whitelist raus (gewollt).
- SELECT um media_type erweitert, damit es im Feed-Dict ankommt.

orchestrator.py:
- Laedt research_language, source_language_whitelist, translator_enabled aus
  den Org-Settings.
- Wenn Whitelist gesetzt: international_sources-Flag wird ignoriert.
- research_language_iso wird an researcher.search() weitergegeben.
- translate_articles bekommt enabled-Parameter aus Org-Setting.
- Geoparsing ueberspringt media_type='forum' Artikel.
- SELECT * FROM articles wird zu JOIN sources, damit media_type beim Reload
  am Article-Dict haengt.

researcher.py:
- search() akzeptiert research_language_iso. Asymmetrische Sprach-Auswahl
  (Recherche != Output) erzeugt eigene Prompt-Anweisung "primaer in Quell-
  sprache, englische Region-Outlets erlaubt".

translator.py:
- translate_articles akzeptiert enabled-Parameter. Ueberschreibt die globale
  TRANSLATOR_ENABLED-Konstante pro Aufruf.

factchecker.py:
- _format_articles_text filtert Artikel mit media_type='forum' aus. Anonyme
  Foren-Posts gelten nicht als Faktenbeleg.

rss_parser.py:
- _fetch_feed traegt media_type aus feed_config ins Article-Dict ein,
  damit downstream Pipeline-Schritte Foren-Quellen erkennen.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Dieser Commit ist enthalten in:
2026-05-22 00:12:56 +02:00
Ursprung 75038939b4
Commit 379d14518c
7 geänderte Dateien mit 226 neuen und 20 gelöschten Zeilen

Datei anzeigen

@@ -431,9 +431,27 @@ class FactCheckerAgent:
"""Prüft Fakten über Claude CLI gegen unabhängige Quellen."""
def _format_articles_text(self, articles: list[dict], max_articles: int = 20) -> str:
"""Formatiert Artikel als Text für den Prompt."""
"""Formatiert Artikel als Text für den Prompt.
Foren-Quellen (media_type='forum', z.B. 5ch/Hatena/Note) werden hier
ausgeschlossen — sie sind Stimmungsmaterial, kein Faktenbeleg. Ein
anonymer Forenpost darf nicht als "Quelle bestaetigt Behauptung X"
gelten.
"""
# Falls media_type am Dict vorhanden ist, Foren-Quellen ausfiltern.
# Bei Article-Dicts aus dem RSS-/Pre-Topic-Pfad ist das Feld gesetzt;
# bei Reload aus der DB muss der Orchestrator das per JOIN annotieren.
non_forum = [a for a in articles if (a.get("media_type") or "").lower() != "forum"]
skipped = len(articles) - len(non_forum)
if skipped > 0:
logger.info(
"Faktencheck: %d Foren-Quellen (media_type='forum') ausgeschlossen, "
"%d Artikel als Faktenbeleg-Kandidaten",
skipped, len(non_forum),
)
articles_text = ""
for i, article in enumerate(articles[:max_articles]):
for i, article in enumerate(non_forum[:max_articles]):
articles_text += f"\n--- Meldung {i+1} ---\n"
articles_text += f"Quelle: {article.get('source', 'Unbekannt')}\n"
source_url = article.get('source_url', '')

Datei anzeigen

@@ -744,14 +744,42 @@ class AgentOrchestrator:
description = incident["description"] or ""
incident_type = incident["type"] or "adhoc"
international = bool(incident["international_sources"]) if "international_sources" in incident.keys() else True
# Wenn die Org eine Sprach-Whitelist gesetzt hat, ist 'international' bedeutungslos —
# die Whitelist gewinnt. Wir setzen 'international' auf True, damit der nachgelagerte
# Code alle (durch Whitelist gefilterten) Feeds in Betracht zieht. Tatsaechliche
# Einschraenkung passiert in get_feeds_with_metadata.
# Hinweis: source_lang_whitelist wird weiter unten geladen.
include_telegram = bool(incident["include_telegram"]) if "include_telegram" in incident.keys() else False
visibility = incident["visibility"] if "visibility" in incident.keys() else "public"
created_by = incident["created_by"] if "created_by" in incident.keys() else None
tenant_id = incident["tenant_id"] if "tenant_id" in incident.keys() else None
# Org-Sprache fuer alle KI-Agenten (Lagebild, Faktencheck, Recherche)
from services.org_settings import get_org_language, language_display
from services.org_settings import (
get_org_language, language_display, get_research_language,
get_source_language_whitelist, get_translator_enabled,
)
output_language_iso = await get_org_language(db, tenant_id) if tenant_id else "de"
output_language = language_display(output_language_iso)
# research_language steuert nur den WebSearch-Prompt ("suche in Sprache X").
# Default = output_language_iso. Bei jp_demo wird das auf 'ja' gesetzt, waehrend
# output_language_iso 'de' bleibt (Lagebild auf Deutsch, Recherche auf Japanisch).
research_language_iso = await get_research_language(db, tenant_id) if tenant_id else output_language_iso
# source_language_whitelist schraenkt RSS-/Telegram-Quellenpool ein (z.B. ['ja']).
# Wenn gesetzt, wird das incident-level Flag international_sources ignoriert
# (Whitelist ist explizit, das Flag ist Default-Verhalten).
source_lang_whitelist = await get_source_language_whitelist(db, tenant_id) if tenant_id else None
# Pro-Org-Override des globalen TRANSLATOR_ENABLED-Flags.
translator_enabled = await get_translator_enabled(db, tenant_id)
# Whitelist gewinnt ueber das incident-Flag international_sources:
# wenn die Org eine Sprach-Whitelist hat, sind alle gewaehlten Feeds
# ohnehin "Wunsch-Sprache" — kein Splitting in primary/international noetig.
if source_lang_whitelist:
international = True
logger.info(
"Org %s hat source_language_whitelist=%s gesetzt; "
"incident.international_sources wird ignoriert",
tenant_id, source_lang_whitelist,
)
previous_summary = incident["summary"] or ""
previous_sources_json = incident["sources_json"] if "sources_json" in incident.keys() else None
previous_developments = incident["latest_developments"] if "latest_developments" in incident.keys() else None
@@ -936,6 +964,7 @@ class AgentOrchestrator:
preferred_sources=preferred_sources,
output_language=output_language,
output_language_iso=output_language_iso,
research_language_iso=research_language_iso,
)
logger.info(
f"Claude-Recherche: {len(results)} Ergebnisse"
@@ -1209,14 +1238,25 @@ class AgentOrchestrator:
await db.commit()
# Geoparsing: Orte aus neuen Artikeln extrahieren und speichern
if new_articles_for_analysis:
# Foren-Quellen (media_type='forum') ausschliessen: 5ch/Hatena/Note-Posts haben
# keinen eigenen, fuer das Lagebild interessanten geographischen Bezug; spart Haiku-Calls.
articles_for_geoparsing = [
a for a in new_articles_for_analysis
if (a.get("media_type") or "").lower() != "forum"
]
if new_articles_for_analysis and not articles_for_geoparsing:
logger.info(
"Geoparsing uebersprungen: alle %d neuen Artikel sind Forum-Quellen",
len(new_articles_for_analysis),
)
if articles_for_geoparsing:
# Pipeline-Schritt 5: Orte erkennen (Start)
await _pipe_start("geoparsing")
try:
from agents.geoparsing import geoparse_articles
incident_context = f"{title} - {description}"
logger.info(f"Geoparsing fuer {len(new_articles_for_analysis)} neue Artikel...")
geo_results, category_labels = await geoparse_articles(new_articles_for_analysis, incident_context)
logger.info(f"Geoparsing fuer {len(articles_for_geoparsing)} neue Artikel (Foren ausgeschlossen)...")
geo_results, category_labels = await geoparse_articles(articles_for_geoparsing, incident_context)
geo_count = 0
for art_id, locations in geo_results.items():
for loc in locations:
@@ -1294,7 +1334,12 @@ class AgentOrchestrator:
all_articles_preloaded = None
if not previous_summary or new_count == 0 or not existing_facts:
cursor = await db.execute(
"SELECT * FROM articles WHERE incident_id = ? ORDER BY collected_at DESC",
# JOIN auf sources, damit media_type pro Artikel verfuegbar ist
# (Faktencheck schliesst Foren-Quellen aus, das Stimmungs-Modul nimmt
# nur diese). Bei Quellen ohne Match in sources bleibt media_type NULL.
"SELECT a.*, s.media_type AS media_type FROM articles a "
"LEFT JOIN sources s ON s.name = a.source "
"WHERE a.incident_id = ? ORDER BY a.collected_at DESC",
(incident_id,),
)
all_articles_preloaded = [dict(row) for row in await cursor.fetchall()]
@@ -1582,8 +1627,9 @@ class AgentOrchestrator:
from services.post_refresh_qc import normalize_german_umlauts as _norm_de2
translations = await translate_articles(
pending_translations,
output_lang="de",
output_lang=output_language_iso,
usage_accumulator=usage_acc,
enabled=translator_enabled,
)
for t in translations:
hd = t.get("headline_de")

Datei anzeigen

@@ -562,14 +562,27 @@ class ResearcherAgent:
logger.warning(f"Keyword-Extraktion fehlgeschlagen: {e}")
return None, None
async def search(self, title: str, description: str = "", incident_type: str = "adhoc", international: bool = True, user_id: int = None, existing_articles: list[dict] = None, preferred_sources: list[dict] = None, output_language: str = "Deutsch", output_language_iso: str = "de") -> tuple[list[dict], ClaudeUsage | None, bool]:
async def search(self, title: str, description: str = "", incident_type: str = "adhoc", international: bool = True, user_id: int = None, existing_articles: list[dict] = None, preferred_sources: list[dict] = None, output_language: str = "Deutsch", output_language_iso: str = "de", research_language_iso: str | None = None) -> tuple[list[dict], ClaudeUsage | None, bool]:
"""Sucht nach Informationen zu einem Vorfall.
Args:
output_language / output_language_iso: Ausgabesprache (Lagebild-Sprache).
research_language_iso: optionaler Override fuer die Sprache, in der gesucht
werden soll. Default = output_language_iso. Bei jp_demo z.B. 'ja',
waehrend output_language_iso 'de' bleibt (Lagebild deutsch, Recherche japanisch).
Returns:
(artikel, usage, parse_failed) — parse_failed ist True, wenn Claude geantwortet hat,
das JSON aber nicht extrahierbar war. So kann der Orchestrator zwischen
"echt keine Treffer" und "kaputte Antwort" unterscheiden.
"""
# research_language defaultet auf output_language. Wenn das aber abweicht
# (z.B. jp_demo: research='ja', output='de'), ueberschreiben wir die
# Sprach-Anweisung im Prompt mit einer eigenen, dual-sprachigen Variante.
research_language_iso = (research_language_iso or output_language_iso or "de").lower()
# Display-Name der Recherche-Sprache fuer Prompts ("Japanese", "Russian", ...)
from services.org_settings import language_display as _lang_display
research_language_display = _lang_display(research_language_iso)
# Bevorzugte Web-Quellen als Prompt-Block (optional)
preferred_sources_block = ""
if preferred_sources:
@@ -589,8 +602,31 @@ class ResearcherAgent:
"aber nicht deine sonstige Recherche.\n"
)
# Asymmetrische Sprach-Auswahl: research_language weicht von output_language ab
# -> eigene Anweisung "primaer in research-language, englische Quellen aus der
# Region auch erlaubt". Sonst die bisherige Logik (primary_only vs international).
asymmetric_lang = research_language_iso != output_language_iso
def _build_lang_instruction(deep: bool) -> str:
if asymmetric_lang:
# jp_demo & Co.: Recherche in Quellsprache + lokale Englisch-Outlets.
return (
f"- Fokus liegt auf {research_language_display}-sprachigen Quellen "
f"(Behoerden, Qualitaetszeitungen, oeffentlich-rechtliche Medien dieser Sprache).\n"
f"- Englischsprachige Outlets mit Fokus auf demselben Sprachraum/Region sind "
f"ebenfalls willkommen (z.B. Japan Times, Nikkei Asia, Kyodo English fuer Japan; "
f"Moscow Times English fuer Russland).\n"
f"- Quellen ausserhalb des Sprachraums NUR, wenn sie exklusive Informationen "
f"ueber die Region liefern (z.B. Reuters/AFP/AP-Berichte aus der Region).\n"
f"- Antworte in der Ausgabesprache {output_language} (das Lagebild wird in "
f"{output_language} angezeigt), aber zitiere die Original-Headlines/Quellen unveraendert."
)
if deep:
return lang_deep_international(output_language) if international else lang_deep_primary_only(output_language)
return lang_international(output_language) if international else lang_primary_only(output_language)
if incident_type == "research":
lang_instruction = lang_deep_international(output_language) if international else lang_deep_primary_only(output_language)
lang_instruction = _build_lang_instruction(deep=True)
# Bestehende Artikel als Kontext für den Prompt aufbereiten
existing_context = ""
if existing_articles:
@@ -611,7 +647,7 @@ class ResearcherAgent:
preferred_sources_block=preferred_sources_block,
)
else:
lang_instruction = lang_international(output_language) if international else lang_primary_only(output_language)
lang_instruction = _build_lang_instruction(deep=False)
# Bestehende Artikel als Kontext: bei Folge-Refreshes findet Claude andere Quellen
existing_context = ""
if existing_articles:

Datei anzeigen

@@ -373,20 +373,27 @@ async def translate_articles(
output_lang: str = "de",
batch_size: int = DEFAULT_BATCH_SIZE,
usage_accumulator: UsageAccumulator | None = None,
enabled: bool | None = None,
) -> list[dict]:
"""Uebersetzt eine beliebige Anzahl Artikel in Batches.
Bringt die Batches durch Logik in `translate_articles_batch` und gibt
EINE flache Liste der Translations zurueck. Wenn ein Batch fehlschlaegt,
wird er uebersprungen (anderer Batches laufen weiter).
enabled: Pro-Aufruf-Override des globalen TRANSLATOR_ENABLED-Flags. Wenn None,
greift das Modul-Default (config.TRANSLATOR_ENABLED, abgeleitet aus .env).
Der Orchestrator setzt das aus dem Org-Setting 'translator_enabled', damit
jp_demo (Translator zwingend an) trotz global deaktiviertem Flag funktioniert.
"""
if not articles:
return []
if not TRANSLATOR_ENABLED:
is_enabled = TRANSLATOR_ENABLED if enabled is None else bool(enabled)
if not is_enabled:
logger.info(
"Translator deaktiviert (TRANSLATOR_ENABLED=false), %d Artikel uebersprungen",
len(articles),
"Translator deaktiviert (enabled=%s, global TRANSLATOR_ENABLED=%s), %d Artikel uebersprungen",
enabled, TRANSLATOR_ENABLED, len(articles),
)
return []

Datei anzeigen

@@ -227,6 +227,10 @@ class RSSParser:
# alle "news.google.com" sind, obwohl sie für 14 verschiedene
# Behörden/Zeitungen stehen. Wird vom Domain-Cap genutzt.
"source_domain": feed_config.get("domain") or "",
# media_type aus dem Feed-Eintrag (z.B. "forum" fuer 5ch/Hatena/Note)
# damit downstream Pipeline-Schritte (Faktencheck, Geoparsing,
# Topic-Filter, Stimmungs-Kachel) Foren-Quellen erkennen koennen.
"media_type": feed_config.get("media_type") or "",
"content_original": summary[:1000] if summary else None,
"content_de": summary[:1000] if summary and self._is_german(summary) else None,
# Sprache primär aus der Quell-Konfiguration übernehmen

Datei anzeigen

@@ -1,12 +1,17 @@
"""Organization-Settings-Helper.
KV-Store pro Organisation. Aktuell genutzt fuer output_language ('de'|'en').
Spaeter erweiterbar (Default-Modell, Telegram-Toggle, Theme, ...).
KV-Store pro Organisation. Aktuell genutzt fuer:
- output_language ('de'|'en'|...) - Anzeige-/Lagebild-Sprache
- source_language_whitelist (JSON-Liste, z.B. ["ja"]) - schraenkt RSS/Telegram-Quellen ein
- research_language (ISO-Code) - steuert WebSearch-Prompts (default = output_language)
- translator_enabled ('true'|'false') - override fuer das globale TRANSLATOR_ENABLED-Flag
Cache: TTL 60s in-memory pro (tenant_id, key). Wird bei set_org_setting()
invalidiert.
"""
import json
import logging
import os
import time
from typing import Optional
@@ -84,6 +89,15 @@ async def set_org_setting(
LANGUAGE_DISPLAY_NAMES = {
"de": "Deutsch",
"en": "English",
"ja": "Japanese",
"zh": "Chinese",
"ko": "Korean",
"ru": "Russian",
"ar": "Arabic",
"fa": "Persian",
"he": "Hebrew",
"fr": "French",
"es": "Spanish",
}
@@ -91,7 +105,10 @@ async def get_org_language(
db: aiosqlite.Connection,
tenant_id: int,
) -> str:
"""Liefert ISO-2-Sprachcode der Org (default 'de')."""
"""Liefert ISO-2-Sprachcode der Org (default 'de').
Steuert die Lagebild-/Anzeige-Sprache.
"""
value = await get_org_setting(db, tenant_id, "output_language", default="de")
if value not in LANGUAGE_DISPLAY_NAMES:
logger.warning("Unbekannte output_language '%s' fuer Org %s -- fallback 'de'", value, tenant_id)
@@ -99,6 +116,65 @@ async def get_org_language(
return value
async def get_source_language_whitelist(
db: aiosqlite.Connection,
tenant_id: int,
) -> Optional[list[str]]:
"""Liefert Liste erlaubter Quellsprachen oder None (= keine Einschränkung).
Gespeichert als JSON-Array unter dem Key 'source_language_whitelist'.
Beispiel-Wert: '["ja"]' -> nur japanischsprachige Quellen.
"""
raw = await get_org_setting(db, tenant_id, "source_language_whitelist", default=None)
if not raw:
return None
try:
parsed = json.loads(raw)
except (json.JSONDecodeError, TypeError) as e:
logger.warning(
"source_language_whitelist fuer Org %s ist kein JSON ('%s'): %s",
tenant_id, raw, e,
)
return None
if not isinstance(parsed, list):
logger.warning("source_language_whitelist fuer Org %s ist keine Liste: %r", tenant_id, parsed)
return None
cleaned = [str(x).strip().lower() for x in parsed if str(x).strip()]
return cleaned or None
async def get_research_language(
db: aiosqlite.Connection,
tenant_id: int,
) -> str:
"""Liefert die Sprache, in der der WebSearch-Researcher primär sucht.
Default = output_language. Bei jp_demo z.B. 'ja', während output_language='de' bleibt.
"""
value = await get_org_setting(db, tenant_id, "research_language", default=None)
if value and value in LANGUAGE_DISPLAY_NAMES:
return value
return await get_org_language(db, tenant_id)
async def get_translator_enabled(
db: aiosqlite.Connection,
tenant_id: Optional[int],
) -> bool:
"""Liefert true wenn der (volle) Translator-Schritt fuer diese Org laufen soll.
Hierarchie:
1. Org-Setting 'translator_enabled' ('true'/'false') gewinnt, wenn gesetzt.
2. Sonst: globales ENV-Flag TRANSLATOR_ENABLED (Default true im config.py).
"""
if tenant_id is not None:
raw = await get_org_setting(db, tenant_id, "translator_enabled", default=None)
if raw is not None:
return str(raw).strip().lower() in ("true", "1", "yes", "on")
env_value = os.environ.get("TRANSLATOR_ENABLED", "true").strip().lower()
return env_value in ("true", "1", "yes", "on")
def language_display(lang_iso: str) -> str:
"""ISO-Code -> Anzeigename fuer Prompts ('de' -> 'Deutsch')."""
return LANGUAGE_DISPLAY_NAMES.get(lang_iso, lang_iso)

Datei anzeigen

@@ -642,14 +642,20 @@ async def get_feeds_with_metadata(tenant_id: int = None, source_type: str = "rss
source_type: "rss_feed" (Default) oder "podcast_feed" — trennt RSS- und Podcast-Quellen
in getrennten Pipelines, damit der RSS-Heisspfad unveraendert bleibt.
Wenn die Org eine source_language_whitelist gesetzt hat (z.B. jp_demo: ['ja']),
werden nur Feeds geliefert, deren primary_language darauf passt. Feeds ohne
gesetztes primary_language fallen in dem Fall raus — das ist gewollt, weil
eine Whitelist gerade die strenge Beschraenkung ist.
"""
from database import get_db
from services.org_settings import get_source_language_whitelist
db = await get_db()
try:
if tenant_id:
cursor = await db.execute(
"SELECT name, url, domain, category, notes, primary_language, "
"SELECT name, url, domain, category, notes, primary_language, media_type, "
"COALESCE(article_count, 0) AS article_count FROM sources "
"WHERE source_type = ? AND status = 'active' "
"AND (tenant_id IS NULL OR tenant_id = ?)",
@@ -657,12 +663,25 @@ async def get_feeds_with_metadata(tenant_id: int = None, source_type: str = "rss
)
else:
cursor = await db.execute(
"SELECT name, url, domain, category, notes, primary_language, "
"SELECT name, url, domain, category, notes, primary_language, media_type, "
"COALESCE(article_count, 0) AS article_count FROM sources "
"WHERE source_type = ? AND status = 'active'",
(source_type,),
)
return [dict(row) for row in await cursor.fetchall()]
feeds = [dict(row) for row in await cursor.fetchall()]
# Whitelist-Filter (nur wenn die Org eine gesetzt hat)
if tenant_id:
whitelist = await get_source_language_whitelist(db, tenant_id)
if whitelist:
before = len(feeds)
feeds = [f for f in feeds if (f.get("primary_language") or "").lower() in whitelist]
logger.info(
"source_language_whitelist=%s fuer Org %s: %d/%d Feeds passieren",
whitelist, tenant_id, len(feeds), before,
)
return feeds
except Exception as e:
logger.error(f"Fehler beim Laden der Feed-Metadaten ({source_type}): {e}")
return []