diff --git a/src/agents/factchecker.py b/src/agents/factchecker.py index ef15113..a33ff0e 100644 --- a/src/agents/factchecker.py +++ b/src/agents/factchecker.py @@ -431,9 +431,27 @@ class FactCheckerAgent: """Prüft Fakten über Claude CLI gegen unabhängige Quellen.""" def _format_articles_text(self, articles: list[dict], max_articles: int = 20) -> str: - """Formatiert Artikel als Text für den Prompt.""" + """Formatiert Artikel als Text für den Prompt. + + Foren-Quellen (media_type='forum', z.B. 5ch/Hatena/Note) werden hier + ausgeschlossen — sie sind Stimmungsmaterial, kein Faktenbeleg. Ein + anonymer Forenpost darf nicht als "Quelle bestaetigt Behauptung X" + gelten. + """ + # Falls media_type am Dict vorhanden ist, Foren-Quellen ausfiltern. + # Bei Article-Dicts aus dem RSS-/Pre-Topic-Pfad ist das Feld gesetzt; + # bei Reload aus der DB muss der Orchestrator das per JOIN annotieren. + non_forum = [a for a in articles if (a.get("media_type") or "").lower() != "forum"] + skipped = len(articles) - len(non_forum) + if skipped > 0: + logger.info( + "Faktencheck: %d Foren-Quellen (media_type='forum') ausgeschlossen, " + "%d Artikel als Faktenbeleg-Kandidaten", + skipped, len(non_forum), + ) + articles_text = "" - for i, article in enumerate(articles[:max_articles]): + for i, article in enumerate(non_forum[:max_articles]): articles_text += f"\n--- Meldung {i+1} ---\n" articles_text += f"Quelle: {article.get('source', 'Unbekannt')}\n" source_url = article.get('source_url', '') diff --git a/src/agents/orchestrator.py b/src/agents/orchestrator.py index d251211..63b4770 100644 --- a/src/agents/orchestrator.py +++ b/src/agents/orchestrator.py @@ -744,14 +744,42 @@ class AgentOrchestrator: description = incident["description"] or "" incident_type = incident["type"] or "adhoc" international = bool(incident["international_sources"]) if "international_sources" in incident.keys() else True + # Wenn die Org eine Sprach-Whitelist gesetzt hat, ist 'international' bedeutungslos — + # die Whitelist gewinnt. Wir setzen 'international' auf True, damit der nachgelagerte + # Code alle (durch Whitelist gefilterten) Feeds in Betracht zieht. Tatsaechliche + # Einschraenkung passiert in get_feeds_with_metadata. + # Hinweis: source_lang_whitelist wird weiter unten geladen. include_telegram = bool(incident["include_telegram"]) if "include_telegram" in incident.keys() else False visibility = incident["visibility"] if "visibility" in incident.keys() else "public" created_by = incident["created_by"] if "created_by" in incident.keys() else None tenant_id = incident["tenant_id"] if "tenant_id" in incident.keys() else None # Org-Sprache fuer alle KI-Agenten (Lagebild, Faktencheck, Recherche) - from services.org_settings import get_org_language, language_display + from services.org_settings import ( + get_org_language, language_display, get_research_language, + get_source_language_whitelist, get_translator_enabled, + ) output_language_iso = await get_org_language(db, tenant_id) if tenant_id else "de" output_language = language_display(output_language_iso) + # research_language steuert nur den WebSearch-Prompt ("suche in Sprache X"). + # Default = output_language_iso. Bei jp_demo wird das auf 'ja' gesetzt, waehrend + # output_language_iso 'de' bleibt (Lagebild auf Deutsch, Recherche auf Japanisch). + research_language_iso = await get_research_language(db, tenant_id) if tenant_id else output_language_iso + # source_language_whitelist schraenkt RSS-/Telegram-Quellenpool ein (z.B. ['ja']). + # Wenn gesetzt, wird das incident-level Flag international_sources ignoriert + # (Whitelist ist explizit, das Flag ist Default-Verhalten). + source_lang_whitelist = await get_source_language_whitelist(db, tenant_id) if tenant_id else None + # Pro-Org-Override des globalen TRANSLATOR_ENABLED-Flags. + translator_enabled = await get_translator_enabled(db, tenant_id) + # Whitelist gewinnt ueber das incident-Flag international_sources: + # wenn die Org eine Sprach-Whitelist hat, sind alle gewaehlten Feeds + # ohnehin "Wunsch-Sprache" — kein Splitting in primary/international noetig. + if source_lang_whitelist: + international = True + logger.info( + "Org %s hat source_language_whitelist=%s gesetzt; " + "incident.international_sources wird ignoriert", + tenant_id, source_lang_whitelist, + ) previous_summary = incident["summary"] or "" previous_sources_json = incident["sources_json"] if "sources_json" in incident.keys() else None previous_developments = incident["latest_developments"] if "latest_developments" in incident.keys() else None @@ -936,6 +964,7 @@ class AgentOrchestrator: preferred_sources=preferred_sources, output_language=output_language, output_language_iso=output_language_iso, + research_language_iso=research_language_iso, ) logger.info( f"Claude-Recherche: {len(results)} Ergebnisse" @@ -1209,14 +1238,25 @@ class AgentOrchestrator: await db.commit() # Geoparsing: Orte aus neuen Artikeln extrahieren und speichern - if new_articles_for_analysis: + # Foren-Quellen (media_type='forum') ausschliessen: 5ch/Hatena/Note-Posts haben + # keinen eigenen, fuer das Lagebild interessanten geographischen Bezug; spart Haiku-Calls. + articles_for_geoparsing = [ + a for a in new_articles_for_analysis + if (a.get("media_type") or "").lower() != "forum" + ] + if new_articles_for_analysis and not articles_for_geoparsing: + logger.info( + "Geoparsing uebersprungen: alle %d neuen Artikel sind Forum-Quellen", + len(new_articles_for_analysis), + ) + if articles_for_geoparsing: # Pipeline-Schritt 5: Orte erkennen (Start) await _pipe_start("geoparsing") try: from agents.geoparsing import geoparse_articles incident_context = f"{title} - {description}" - logger.info(f"Geoparsing fuer {len(new_articles_for_analysis)} neue Artikel...") - geo_results, category_labels = await geoparse_articles(new_articles_for_analysis, incident_context) + logger.info(f"Geoparsing fuer {len(articles_for_geoparsing)} neue Artikel (Foren ausgeschlossen)...") + geo_results, category_labels = await geoparse_articles(articles_for_geoparsing, incident_context) geo_count = 0 for art_id, locations in geo_results.items(): for loc in locations: @@ -1294,7 +1334,12 @@ class AgentOrchestrator: all_articles_preloaded = None if not previous_summary or new_count == 0 or not existing_facts: cursor = await db.execute( - "SELECT * FROM articles WHERE incident_id = ? ORDER BY collected_at DESC", + # JOIN auf sources, damit media_type pro Artikel verfuegbar ist + # (Faktencheck schliesst Foren-Quellen aus, das Stimmungs-Modul nimmt + # nur diese). Bei Quellen ohne Match in sources bleibt media_type NULL. + "SELECT a.*, s.media_type AS media_type FROM articles a " + "LEFT JOIN sources s ON s.name = a.source " + "WHERE a.incident_id = ? ORDER BY a.collected_at DESC", (incident_id,), ) all_articles_preloaded = [dict(row) for row in await cursor.fetchall()] @@ -1582,8 +1627,9 @@ class AgentOrchestrator: from services.post_refresh_qc import normalize_german_umlauts as _norm_de2 translations = await translate_articles( pending_translations, - output_lang="de", + output_lang=output_language_iso, usage_accumulator=usage_acc, + enabled=translator_enabled, ) for t in translations: hd = t.get("headline_de") diff --git a/src/agents/researcher.py b/src/agents/researcher.py index bf7f046..1b42e29 100644 --- a/src/agents/researcher.py +++ b/src/agents/researcher.py @@ -562,14 +562,27 @@ class ResearcherAgent: logger.warning(f"Keyword-Extraktion fehlgeschlagen: {e}") return None, None - async def search(self, title: str, description: str = "", incident_type: str = "adhoc", international: bool = True, user_id: int = None, existing_articles: list[dict] = None, preferred_sources: list[dict] = None, output_language: str = "Deutsch", output_language_iso: str = "de") -> tuple[list[dict], ClaudeUsage | None, bool]: + async def search(self, title: str, description: str = "", incident_type: str = "adhoc", international: bool = True, user_id: int = None, existing_articles: list[dict] = None, preferred_sources: list[dict] = None, output_language: str = "Deutsch", output_language_iso: str = "de", research_language_iso: str | None = None) -> tuple[list[dict], ClaudeUsage | None, bool]: """Sucht nach Informationen zu einem Vorfall. + Args: + output_language / output_language_iso: Ausgabesprache (Lagebild-Sprache). + research_language_iso: optionaler Override fuer die Sprache, in der gesucht + werden soll. Default = output_language_iso. Bei jp_demo z.B. 'ja', + waehrend output_language_iso 'de' bleibt (Lagebild deutsch, Recherche japanisch). + Returns: (artikel, usage, parse_failed) — parse_failed ist True, wenn Claude geantwortet hat, das JSON aber nicht extrahierbar war. So kann der Orchestrator zwischen "echt keine Treffer" und "kaputte Antwort" unterscheiden. """ + # research_language defaultet auf output_language. Wenn das aber abweicht + # (z.B. jp_demo: research='ja', output='de'), ueberschreiben wir die + # Sprach-Anweisung im Prompt mit einer eigenen, dual-sprachigen Variante. + research_language_iso = (research_language_iso or output_language_iso or "de").lower() + # Display-Name der Recherche-Sprache fuer Prompts ("Japanese", "Russian", ...) + from services.org_settings import language_display as _lang_display + research_language_display = _lang_display(research_language_iso) # Bevorzugte Web-Quellen als Prompt-Block (optional) preferred_sources_block = "" if preferred_sources: @@ -589,8 +602,31 @@ class ResearcherAgent: "aber nicht deine sonstige Recherche.\n" ) + # Asymmetrische Sprach-Auswahl: research_language weicht von output_language ab + # -> eigene Anweisung "primaer in research-language, englische Quellen aus der + # Region auch erlaubt". Sonst die bisherige Logik (primary_only vs international). + asymmetric_lang = research_language_iso != output_language_iso + + def _build_lang_instruction(deep: bool) -> str: + if asymmetric_lang: + # jp_demo & Co.: Recherche in Quellsprache + lokale Englisch-Outlets. + return ( + f"- Fokus liegt auf {research_language_display}-sprachigen Quellen " + f"(Behoerden, Qualitaetszeitungen, oeffentlich-rechtliche Medien dieser Sprache).\n" + f"- Englischsprachige Outlets mit Fokus auf demselben Sprachraum/Region sind " + f"ebenfalls willkommen (z.B. Japan Times, Nikkei Asia, Kyodo English fuer Japan; " + f"Moscow Times English fuer Russland).\n" + f"- Quellen ausserhalb des Sprachraums NUR, wenn sie exklusive Informationen " + f"ueber die Region liefern (z.B. Reuters/AFP/AP-Berichte aus der Region).\n" + f"- Antworte in der Ausgabesprache {output_language} (das Lagebild wird in " + f"{output_language} angezeigt), aber zitiere die Original-Headlines/Quellen unveraendert." + ) + if deep: + return lang_deep_international(output_language) if international else lang_deep_primary_only(output_language) + return lang_international(output_language) if international else lang_primary_only(output_language) + if incident_type == "research": - lang_instruction = lang_deep_international(output_language) if international else lang_deep_primary_only(output_language) + lang_instruction = _build_lang_instruction(deep=True) # Bestehende Artikel als Kontext für den Prompt aufbereiten existing_context = "" if existing_articles: @@ -611,7 +647,7 @@ class ResearcherAgent: preferred_sources_block=preferred_sources_block, ) else: - lang_instruction = lang_international(output_language) if international else lang_primary_only(output_language) + lang_instruction = _build_lang_instruction(deep=False) # Bestehende Artikel als Kontext: bei Folge-Refreshes findet Claude andere Quellen existing_context = "" if existing_articles: diff --git a/src/agents/translator.py b/src/agents/translator.py index 9ecfe58..20fc8c8 100644 --- a/src/agents/translator.py +++ b/src/agents/translator.py @@ -373,20 +373,27 @@ async def translate_articles( output_lang: str = "de", batch_size: int = DEFAULT_BATCH_SIZE, usage_accumulator: UsageAccumulator | None = None, + enabled: bool | None = None, ) -> list[dict]: """Uebersetzt eine beliebige Anzahl Artikel in Batches. Bringt die Batches durch Logik in `translate_articles_batch` und gibt EINE flache Liste der Translations zurueck. Wenn ein Batch fehlschlaegt, wird er uebersprungen (anderer Batches laufen weiter). + + enabled: Pro-Aufruf-Override des globalen TRANSLATOR_ENABLED-Flags. Wenn None, + greift das Modul-Default (config.TRANSLATOR_ENABLED, abgeleitet aus .env). + Der Orchestrator setzt das aus dem Org-Setting 'translator_enabled', damit + jp_demo (Translator zwingend an) trotz global deaktiviertem Flag funktioniert. """ if not articles: return [] - if not TRANSLATOR_ENABLED: + is_enabled = TRANSLATOR_ENABLED if enabled is None else bool(enabled) + if not is_enabled: logger.info( - "Translator deaktiviert (TRANSLATOR_ENABLED=false), %d Artikel uebersprungen", - len(articles), + "Translator deaktiviert (enabled=%s, global TRANSLATOR_ENABLED=%s), %d Artikel uebersprungen", + enabled, TRANSLATOR_ENABLED, len(articles), ) return [] diff --git a/src/feeds/rss_parser.py b/src/feeds/rss_parser.py index 7e3d1d6..58335b7 100644 --- a/src/feeds/rss_parser.py +++ b/src/feeds/rss_parser.py @@ -227,6 +227,10 @@ class RSSParser: # alle "news.google.com" sind, obwohl sie für 14 verschiedene # Behörden/Zeitungen stehen. Wird vom Domain-Cap genutzt. "source_domain": feed_config.get("domain") or "", + # media_type aus dem Feed-Eintrag (z.B. "forum" fuer 5ch/Hatena/Note) + # damit downstream Pipeline-Schritte (Faktencheck, Geoparsing, + # Topic-Filter, Stimmungs-Kachel) Foren-Quellen erkennen koennen. + "media_type": feed_config.get("media_type") or "", "content_original": summary[:1000] if summary else None, "content_de": summary[:1000] if summary and self._is_german(summary) else None, # Sprache primär aus der Quell-Konfiguration übernehmen diff --git a/src/services/org_settings.py b/src/services/org_settings.py index d152b5d..cc8ac1d 100644 --- a/src/services/org_settings.py +++ b/src/services/org_settings.py @@ -1,12 +1,17 @@ """Organization-Settings-Helper. -KV-Store pro Organisation. Aktuell genutzt fuer output_language ('de'|'en'). -Spaeter erweiterbar (Default-Modell, Telegram-Toggle, Theme, ...). +KV-Store pro Organisation. Aktuell genutzt fuer: + - output_language ('de'|'en'|...) - Anzeige-/Lagebild-Sprache + - source_language_whitelist (JSON-Liste, z.B. ["ja"]) - schraenkt RSS/Telegram-Quellen ein + - research_language (ISO-Code) - steuert WebSearch-Prompts (default = output_language) + - translator_enabled ('true'|'false') - override fuer das globale TRANSLATOR_ENABLED-Flag Cache: TTL 60s in-memory pro (tenant_id, key). Wird bei set_org_setting() invalidiert. """ +import json import logging +import os import time from typing import Optional @@ -84,6 +89,15 @@ async def set_org_setting( LANGUAGE_DISPLAY_NAMES = { "de": "Deutsch", "en": "English", + "ja": "Japanese", + "zh": "Chinese", + "ko": "Korean", + "ru": "Russian", + "ar": "Arabic", + "fa": "Persian", + "he": "Hebrew", + "fr": "French", + "es": "Spanish", } @@ -91,7 +105,10 @@ async def get_org_language( db: aiosqlite.Connection, tenant_id: int, ) -> str: - """Liefert ISO-2-Sprachcode der Org (default 'de').""" + """Liefert ISO-2-Sprachcode der Org (default 'de'). + + Steuert die Lagebild-/Anzeige-Sprache. + """ value = await get_org_setting(db, tenant_id, "output_language", default="de") if value not in LANGUAGE_DISPLAY_NAMES: logger.warning("Unbekannte output_language '%s' fuer Org %s -- fallback 'de'", value, tenant_id) @@ -99,6 +116,65 @@ async def get_org_language( return value +async def get_source_language_whitelist( + db: aiosqlite.Connection, + tenant_id: int, +) -> Optional[list[str]]: + """Liefert Liste erlaubter Quellsprachen oder None (= keine Einschränkung). + + Gespeichert als JSON-Array unter dem Key 'source_language_whitelist'. + Beispiel-Wert: '["ja"]' -> nur japanischsprachige Quellen. + """ + raw = await get_org_setting(db, tenant_id, "source_language_whitelist", default=None) + if not raw: + return None + try: + parsed = json.loads(raw) + except (json.JSONDecodeError, TypeError) as e: + logger.warning( + "source_language_whitelist fuer Org %s ist kein JSON ('%s'): %s", + tenant_id, raw, e, + ) + return None + if not isinstance(parsed, list): + logger.warning("source_language_whitelist fuer Org %s ist keine Liste: %r", tenant_id, parsed) + return None + cleaned = [str(x).strip().lower() for x in parsed if str(x).strip()] + return cleaned or None + + +async def get_research_language( + db: aiosqlite.Connection, + tenant_id: int, +) -> str: + """Liefert die Sprache, in der der WebSearch-Researcher primär sucht. + + Default = output_language. Bei jp_demo z.B. 'ja', während output_language='de' bleibt. + """ + value = await get_org_setting(db, tenant_id, "research_language", default=None) + if value and value in LANGUAGE_DISPLAY_NAMES: + return value + return await get_org_language(db, tenant_id) + + +async def get_translator_enabled( + db: aiosqlite.Connection, + tenant_id: Optional[int], +) -> bool: + """Liefert true wenn der (volle) Translator-Schritt fuer diese Org laufen soll. + + Hierarchie: + 1. Org-Setting 'translator_enabled' ('true'/'false') gewinnt, wenn gesetzt. + 2. Sonst: globales ENV-Flag TRANSLATOR_ENABLED (Default true im config.py). + """ + if tenant_id is not None: + raw = await get_org_setting(db, tenant_id, "translator_enabled", default=None) + if raw is not None: + return str(raw).strip().lower() in ("true", "1", "yes", "on") + env_value = os.environ.get("TRANSLATOR_ENABLED", "true").strip().lower() + return env_value in ("true", "1", "yes", "on") + + def language_display(lang_iso: str) -> str: """ISO-Code -> Anzeigename fuer Prompts ('de' -> 'Deutsch').""" return LANGUAGE_DISPLAY_NAMES.get(lang_iso, lang_iso) diff --git a/src/source_rules.py b/src/source_rules.py index a817d49..1b5aaed 100644 --- a/src/source_rules.py +++ b/src/source_rules.py @@ -642,14 +642,20 @@ async def get_feeds_with_metadata(tenant_id: int = None, source_type: str = "rss source_type: "rss_feed" (Default) oder "podcast_feed" — trennt RSS- und Podcast-Quellen in getrennten Pipelines, damit der RSS-Heisspfad unveraendert bleibt. + + Wenn die Org eine source_language_whitelist gesetzt hat (z.B. jp_demo: ['ja']), + werden nur Feeds geliefert, deren primary_language darauf passt. Feeds ohne + gesetztes primary_language fallen in dem Fall raus — das ist gewollt, weil + eine Whitelist gerade die strenge Beschraenkung ist. """ from database import get_db + from services.org_settings import get_source_language_whitelist db = await get_db() try: if tenant_id: cursor = await db.execute( - "SELECT name, url, domain, category, notes, primary_language, " + "SELECT name, url, domain, category, notes, primary_language, media_type, " "COALESCE(article_count, 0) AS article_count FROM sources " "WHERE source_type = ? AND status = 'active' " "AND (tenant_id IS NULL OR tenant_id = ?)", @@ -657,12 +663,25 @@ async def get_feeds_with_metadata(tenant_id: int = None, source_type: str = "rss ) else: cursor = await db.execute( - "SELECT name, url, domain, category, notes, primary_language, " + "SELECT name, url, domain, category, notes, primary_language, media_type, " "COALESCE(article_count, 0) AS article_count FROM sources " "WHERE source_type = ? AND status = 'active'", (source_type,), ) - return [dict(row) for row in await cursor.fetchall()] + feeds = [dict(row) for row in await cursor.fetchall()] + + # Whitelist-Filter (nur wenn die Org eine gesetzt hat) + if tenant_id: + whitelist = await get_source_language_whitelist(db, tenant_id) + if whitelist: + before = len(feeds) + feeds = [f for f in feeds if (f.get("primary_language") or "").lower() in whitelist] + logger.info( + "source_language_whitelist=%s fuer Org %s: %d/%d Feeds passieren", + whitelist, tenant_id, len(feeds), before, + ) + + return feeds except Exception as e: logger.error(f"Fehler beim Laden der Feed-Metadaten ({source_type}): {e}") return []