Dateien
AegisSight-Monitor/src/agents/researcher.py
Claude Code 9c50439785 feat(x): X (Twitter) als Bezugsquelle pro Lage
X-Accounts werden analog zu Telegram als Quelle (source_type=x_account)
konfiguriert und pro Lage ueber include_x zugeschaltet. Der Scraper
(feeds/x_parser.py, twscrape) liest Account-Timelines, optional ueber
einen HTTP-Proxy mit Fallback auf direkten Abruf ueber die Server-IP.

- DB-Migration include_x, Pydantic-Modelle, incidents-Router
- Orchestrator-X-Pipeline plus Haiku-Account-Vorselektion
- sources-Router /x/validate, x_account-Typ in Stats und Frontend
- Lage-Einstellungen: X-Toggle neben international und Telegram
- twscrape als Abhaengigkeit

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 06:52:19 +00:00

1096 Zeilen
48 KiB
Python

"""Researcher-Agent: Sucht nach Informationen via Claude WebSearch."""
import json
import logging
import re
import urllib.parse
from agents.claude_client import call_claude, ClaudeUsage
from config import CLAUDE_MODEL_FAST
logger = logging.getLogger("osint.researcher")
# Google-News-Locale pro ISO-Sprachcode: (hl, gl). ceid wird daraus gebaut.
_GNEWS_LOCALE = {
"ja": ("ja", "JP"),
"de": ("de", "DE"),
"en": ("en-US", "US"),
"ru": ("ru", "RU"),
"ko": ("ko", "KR"),
"zh": ("zh-CN", "CN"),
"fr": ("fr", "FR"),
"es": ("es", "ES"),
"it": ("it", "IT"),
"ar": ("ar", "EG"),
"he": ("iw", "IL"),
"fa": ("fa", "IR"),
}
def build_news_search_feeds(
keywords_by_lang: dict | list | None,
languages: list[str],
max_keywords: int = 4,
recency_days: int | None = None,
) -> list[dict]:
"""Baut dynamische Google-News-Volltext-Such-Feeds pro Sprache.
Statt nur feste site:-RSS-Feeds zu durchsuchen, erzeugt diese Funktion pro
Sprache einen Google-News-Suchfeed (news.google.com/rss/search?q=...). Damit
erreicht die Pipeline auch Quellen, die in keinem festen Feed stehen
(Security-Vendor-Blogs, Fachportale, Regionalmedien). Der Recall steigt
massiv; die Precision bleibt, weil der nachgelagerte Topic-Filter unveraendert
greift.
Args:
keywords_by_lang: Sprach-Dict {iso: [keyword,...]} aus der Keyword-Extraktion.
languages: ISO-Codes, fuer die ein Suchfeed gebaut werden soll.
max_keywords: wie viele (spezifischste) Keywords in die Such-Query gehen.
recency_days: wenn gesetzt, wird der Google-News-Operator "when:Nd" an die
Query gehaengt — der Feed liefert dann nur Artikel der letzten N Tage.
Fuer "Frische-Suchfeeds", die das aktuelle Bild garantiert einfangen.
Returns:
Liste von Feed-Config-Dicts (kompatibel mit RSSParser._fetch_feed).
"""
if not keywords_by_lang or not isinstance(keywords_by_lang, dict):
return []
feeds: list[dict] = []
seen_queries: set[str] = set()
for lang in languages:
lang_key = (lang or "").lower().strip()
locale = _GNEWS_LOCALE.get(lang_key)
if not locale:
continue
lang_kws = [str(k).strip() for k in (keywords_by_lang.get(lang_key) or []) if str(k).strip()]
en_kws = [str(k).strip() for k in (keywords_by_lang.get("en") or []) if str(k).strip()]
if lang_key == "en":
query_terms = en_kws[:max_keywords]
else:
# Fuer nicht-englische Sprachen: die ersten 2 englischen Keywords
# voranstellen. Haiku ordnet Eigennamen/Akronyme (z.B. "Qilin",
# "Asahi") nach vorne — und die kommen auch in fremdsprachigen
# Artikeln lateinisch vor. Ohne das fehlt beim ersten Refresh (noch
# keine Headlines-Historie) der entscheidende Eigenname in der Query.
# Danach 3 sprach-spezifische Keywords.
query_terms = en_kws[:2] + lang_kws[:3]
# Wenn fuer die Sprache gar keine Keywords da sind: ganz auf en.
if not lang_kws:
query_terms = en_kws[:max_keywords]
# Dedup, Reihenfolge erhalten
seen_terms: set[str] = set()
deduped: list[str] = []
for t in query_terms:
tl = t.lower()
if tl in seen_terms:
continue
seen_terms.add(tl)
deduped.append(t)
if not deduped:
continue
query = " ".join(deduped)
# when:Nd-Operator anhaengen (Google-News-Zeitfilter)
effective_query = query
if recency_days and recency_days > 0:
effective_query = f"{query} when:{recency_days}d"
if not effective_query or effective_query in seen_queries:
continue
seen_queries.add(effective_query)
hl, gl = locale
ceid_lang = hl.split("-")[0]
url = (
"https://news.google.com/rss/search?q="
+ urllib.parse.quote(effective_query)
+ f"&hl={hl}&gl={gl}&ceid={gl}:{ceid_lang}"
)
if recency_days and recency_days > 0:
name = f"Google News Suche ({lang_key}, letzte {recency_days}d): {query}"
domain = f"google-news-search-{lang_key}-recent"
else:
name = f"Google News Suche ({lang_key}): {query}"
domain = f"google-news-search-{lang_key}"
feeds.append({
"name": name,
"url": url,
# Eigene Domain-Gruppe, damit der Domain-Cap die Such-Feeds NICHT mit
# den site:-Google-News-Feeds in einen Topf wirft.
"domain": domain,
"primary_language": lang_key,
"category": "international",
"media_type": "",
})
logger.info("Google-News-Suchfeed (%s): q=%r", lang_key, effective_query)
return feeds
class ResearcherParseError(Exception):
"""Claude hat eine nicht-leere Antwort geliefert, aus der kein JSON extrahiert werden konnte."""
def _truncate_for_log(text: str, limit: int = 600) -> str:
"""Kürzt eine Claude-Antwort für Logs, damit ein Sample sichtbar ist."""
if not text:
return ""
snippet = text.strip().replace("\n", "\\n")
if len(snippet) > limit:
snippet = snippet[:limit] + "..."
return snippet
def _extract_json_array(text: str):
"""Findet das erste vollständige JSON-Array im Text (auch mit Vor-/Nachtext oder Markdown-Fence)."""
if not text:
return None
decoder = json.JSONDecoder()
idx = 0
while True:
bracket = text.find("[", idx)
if bracket == -1:
return None
try:
obj, _ = decoder.raw_decode(text, bracket)
except json.JSONDecodeError:
idx = bracket + 1
continue
if isinstance(obj, list):
return obj
idx = bracket + 1
def _extract_json_object(text: str):
"""Findet das erste vollständige JSON-Objekt im Text (auch mit Vor-/Nachtext oder Markdown-Fence)."""
if not text:
return None
decoder = json.JSONDecoder()
idx = 0
while True:
brace = text.find("{", idx)
if brace == -1:
return None
try:
obj, _ = decoder.raw_decode(text, brace)
except json.JSONDecodeError:
idx = brace + 1
continue
if isinstance(obj, dict):
return obj
idx = brace + 1
def _normalize_keywords_dict(raw: dict) -> dict | None:
"""Normalisiert ein {iso_lang: [keywords]}-Dict aus Haiku-Output.
Wir wenden .lower() global an (Python case-folding lässt CJK unverändert und
lowercased kyrillisch/arabisch/hebräisch sinnvoll), damit der Match später
konsistent gegen den ebenfalls lowercased Headline-Text läuft.
Entfernt leere Strings und Duplikate. Gibt None zurück, wenn das Ergebnis leer ist.
"""
out: dict[str, list[str]] = {}
for lang, kws in raw.items():
if not isinstance(lang, str) or not isinstance(kws, list):
continue
lang_key = lang.lower().strip()
clean: list[str] = []
seen: set[str] = set()
for k in kws:
s = str(k).strip().lower()
if not s or s in seen:
continue
seen.add(s)
clean.append(s)
if clean:
out[lang_key] = clean
return out or None
def flatten_keywords(keywords_by_lang: dict | list | None) -> list[str]:
"""Bequeme Flachsicht aller Keywords (für Logging, Web-Source-Selektion etc.).
Akzeptiert auch die alte flache Liste, damit Aufrufer schrittweise migrieren können.
"""
if not keywords_by_lang:
return []
if isinstance(keywords_by_lang, list):
return [str(k).strip() for k in keywords_by_lang if str(k).strip()]
flat: list[str] = []
seen: set[str] = set()
for kws in keywords_by_lang.values():
if not isinstance(kws, list):
continue
for k in kws:
s = str(k).strip()
if not s or s in seen:
continue
seen.add(s)
flat.append(s)
return flat
def keywords_for_language(keywords_by_lang: dict | list | None, lang: str | None) -> list[str]:
"""Liefert die für eine konkrete Feed-/Channel-Sprache anwendbaren Keywords.
- Universelle "en"-Keywords (lateinische Eigennamen) immer mitgeben.
- Plus die Keywords der Feed-Sprache, falls vorhanden.
- Für unbekannte/None-Sprachen: alle Keywords (flach), damit kein Feed leer ausgeht.
- Akzeptiert auch alte flache Liste -> wird unverändert zurückgegeben.
"""
if not keywords_by_lang:
return []
if isinstance(keywords_by_lang, list):
return [str(k).strip() for k in keywords_by_lang if str(k).strip()]
if not lang:
return flatten_keywords(keywords_by_lang)
lang_key = lang.lower().strip()
out: list[str] = []
seen: set[str] = set()
for k_lang in ("en", lang_key):
for k in keywords_by_lang.get(k_lang, []) or []:
s = str(k).strip()
if not s or s in seen:
continue
seen.add(s)
out.append(s)
# Wenn weder "en" noch lang_key Treffer haben (z.B. Haiku-Schema-Mismatch):
# auf die universelle Flachsicht zurückfallen, damit der Feed nicht leer matched.
if not out:
return flatten_keywords(keywords_by_lang)
return out
RESEARCH_PROMPT_TEMPLATE = """Du bist ein OSINT-Recherche-Agent für ein Lagemonitoring-System.
AUSGABESPRACHE: {output_language}
- KEINE Gedankenstriche (— oder –) verwenden, stattdessen Kommas, Doppelpunkte oder neue Saetze.
WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschreibungen (ae, oe, ue, ss).
AUFTRAG: Suche nach aktuellen Informationen zu folgendem Vorfall:
Titel: {title}
Kontext: {description}
{existing_context}{preferred_sources_block}
REGELN:
- Suche nur bei seriösen Nachrichtenquellen (Nachrichtenagenturen, Qualitätszeitungen, öffentlich-rechtliche Medien, Behörden)
- KEIN Social Media (Twitter/X, Facebook, Instagram, TikTok, Reddit)
- KEINE Boulevardmedien (Bild, Sun, Daily Mail etc.)
{language_instruction}
- Faktenbasiert und neutral - keine Spekulationen
- KRITISCH für source_url: Kopiere die EXAKTE URL aus den WebSearch-Ergebnissen. Erfinde oder konstruiere NIEMALS URLs aus Mustern oder Erinnerung. Wenn du die exakte URL eines Artikels nicht aus den Suchergebnissen hast, lass diesen Artikel komplett weg.
- Nutze removepaywall.com für Paywall-geschützte Artikel (z.B. Spiegel+, Zeit+, SZ+): https://www.removepaywall.com/search?url=ARTIKEL_URL
- Nutze WebFetch um die 3-5 wichtigsten Artikel vollständig abzurufen und zusammenzufassen
Gib die Ergebnisse AUSSCHLIESSLICH als JSON-Array zurück, ohne Erklärungen davor oder danach.
Jedes Element hat diese Felder:
- "headline": Originale Überschrift
- "headline_de": Übersetzung in Ausgabesprache (falls Originalsprache abweicht)
- "source": Name der Quelle (z.B. "Reuters", "tagesschau")
- "source_url": URL des Artikels
- "content_summary": Zusammenfassung des Inhalts (3-5 Sätze, in Ausgabesprache)
- "language": Sprache des Originals (z.B. "de", "en", "fr")
- "published_at": Veröffentlichungsdatum falls bekannt (ISO-Format)
Antworte NUR mit dem JSON-Array. Keine Einleitung, keine Erklärung."""
DEEP_RESEARCH_PROMPT_TEMPLATE = """Du bist ein OSINT-Tiefenrecherche-Agent für ein Lagemonitoring-System.
AUSGABESPRACHE: {output_language}
- KEINE Gedankenstriche (— oder –) verwenden, stattdessen Kommas, Doppelpunkte oder neue Saetze.
WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschreibungen (ae, oe, ue, ss).
AUFTRAG: Führe eine umfassende, mehrstufige Hintergrundrecherche durch zu:
Titel: {title}
Kontext: {description}
{existing_context}{preferred_sources_block}
RECHERCHE IN 4 PHASEN — Führe ALLE Phasen nacheinander durch:
PHASE 1 — BREITE ERFASSUNG:
Suche nach aktueller Berichterstattung bei Nachrichtenagenturen, Qualitätszeitungen und öffentlich-rechtlichen Medien. Nutze verschiedene Suchbegriffe und Blickwinkel. Ziel: 8-12 Quellen.
PHASE 2 — LÜCKENANALYSE:
Prüfe deine bisherigen Ergebnisse kritisch. Welche Quellentypen fehlen noch?
Typisch fehlen: Parlamentsdokumente, Gesetzestexte, NGO-/UN-Berichte, Think-Tank-Analysen, investigative Langform-Berichte, akademische Einordnungen, Fachmedien.
Welche Akteure, Perspektiven oder Dimensionen sind noch nicht abgedeckt?
PHASE 3 — GEZIELTE TIEFENRECHERCHE:
Suche GEZIELT nach den in Phase 2 identifizierten Lücken:
- Parlamentarische Quellen (Bundestagsdrucksachen, Congress.gov, Hansard, etc.)
- Offizielle Dokumente und Pressemitteilungen von Behörden
- NGO-Berichte und UN-Dokumente (ohchr.org, amnesty.org, hrw.org, etc.)
- Think-Tank-Analysen (IISS, Brookings, SWP, DGAP, Chatham House, etc.)
- Investigative Recherchen und Langform-Artikel
- Fachzeitschriften und akademische Einordnungen
Nutze spezifische Suchbegriffe für institutionelle Quellen. Ziel: 6-10 weitere Quellen.
PHASE 4 — VERIFIKATION UND VERTIEFUNG:
Nutze WebFetch um die 6-10 wichtigsten Artikel vollständig abzurufen und ausführlich zusammenzufassen.
Priorisiere dabei Primärquellen und investigative Berichte.
Nutze removepaywall.com für Paywall-geschützte Artikel (z.B. https://www.removepaywall.com/search?url=ARTIKEL_URL)
{language_instruction}
ZIEL: 15-25 hochwertige Quellen aus mindestens 5 verschiedenen Quellentypen:
- Nachrichtenagenturen/Qualitätspresse
- Investigative Berichte/Langform
- Parlamentarische/Regierungsquellen
- NGO/Internationale Organisationen
- Fachmedien/Akademische Quellen
AUSSCHLUSS:
- KEIN Social Media (Twitter/X, Facebook, Instagram, TikTok, Reddit)
- KEINE Boulevardmedien (Bild, Sun, Daily Mail etc.)
- KEINE Meinungsblogs ohne Quellenbelege
- KEINE erfundenen oder konstruierten URLs — gib bei source_url NUR die EXAKTE URL zurueck, die WebSearch tatsaechlich angezeigt hat. Wenn du die URL nicht aus den Suchergebnissen kopieren kannst, lass den Artikel weg.
Gib die Ergebnisse AUSSCHLIESSLICH als JSON-Array zurück, ohne Erklärungen davor oder danach.
Jedes Element hat diese Felder:
- "headline": Originale Überschrift
- "headline_de": Übersetzung in Ausgabesprache (falls Originalsprache abweicht)
- "source": Name der Quelle (z.B. "netzpolitik.org", "Handelsblatt")
- "source_url": URL des Artikels
- "content_summary": Ausführliche Zusammenfassung des Inhalts (5-8 Sätze, in Ausgabesprache)
- "language": Sprache des Originals (z.B. "de", "en", "fr")
- "published_at": Veröffentlichungsdatum falls bekannt (ISO-Format)
Antworte NUR mit dem JSON-Array. Keine Einleitung, keine Erklärung."""
# Sprach-Anweisungen (org-sprach-relativ; primary_display = "Deutsch" | "English")
def lang_international(primary_display: str) -> str:
if primary_display == "Deutsch":
return "- Suche in Deutsch UND Englisch für internationale Abdeckung"
if primary_display == "English":
return "- Search in English AND other relevant languages for international coverage"
return f"- Suche in {primary_display} und weiteren relevanten Sprachen"
def lang_primary_only(primary_display: str) -> str:
if primary_display == "Deutsch":
return "- Suche NUR auf Deutsch bei deutschsprachigen Quellen (Deutschland, Österreich, Schweiz)\n- KEINE englischsprachigen oder anderssprachigen Quellen"
if primary_display == "English":
return "- Search ONLY in English-language sources\n- NO sources in other languages"
return f"- Suche NUR auf {primary_display}\n- KEINE Quellen in anderen Sprachen"
def lang_deep_international(primary_display: str) -> str:
if primary_display == "Deutsch":
return "- Suche in Deutsch, Englisch und weiteren relevanten Sprachen"
if primary_display == "English":
return "- Search in English and other relevant languages"
return f"- Suche in {primary_display} und weiteren relevanten Sprachen"
def lang_deep_primary_only(primary_display: str) -> str:
if primary_display == "Deutsch":
return "- Suche NUR auf Deutsch bei deutschsprachigen Quellen (Deutschland, Österreich, Schweiz)\n- KEINE englischsprachigen oder anderssprachigen Quellen"
if primary_display == "English":
return "- Search ONLY in English-language sources\n- NO sources in other languages"
return f"- Suche NUR auf {primary_display}\n- KEINE Quellen in anderen Sprachen"
FEED_SELECTION_PROMPT_TEMPLATE = """Du bist ein OSINT-Analyst. Wähle aus dieser Feed-Liste die Feeds aus, die für die Lage relevant sein könnten. Generiere außerdem optimierte Suchbegriffe für das RSS-Matching.
LAGE: {title}
KONTEXT: {description}
INTERNATIONALE QUELLEN: {international}
FEEDS (Format: Nr. Name (Domain, Sprache) [Kategorie]):
{feed_list}
REGELN:
- Wähle alle Feeds die thematisch oder regional relevant sein könnten
- Lieber einen Feed zu viel als zu wenig auswählen
- Bei "Internationale Quellen: Nein": Keine internationalen Feeds auswählen
- Allgemeine Nachrichtenfeeds (tagesschau, Spiegel etc.) sind fast immer relevant
- QUELLENVIELFALT: Wähle pro Domain maximal 2-3 Feeds. Bevorzuge eine breite Mischung aus verschiedenen Quellen statt vieler Feeds derselben Domain.
KEYWORDS-REGELN:
- Keywords werden nach Sprache GRUPPIERT zurückgegeben (siehe Format unten).
- "en" enthält universelle Begriffe (Eigennamen, Akronyme, lateinisch geschriebene Marken/Personen),
die in JEDER Sprache vorkommen (z.B. "iran", "trump", "takaichi", "sdf").
- Für JEDE Sprache, in der ausgewählte Feeds publizieren (z.B. "ja", "ru", "ar", "zh", "ko", "fa",
"he", "de"), MUSS zusätzlich eine Liste mit 3-8 Suchbegriffen in der jeweiligen ORIGINALSCHRIFT
generiert werden. Beispiel Japan: "ja": ["自衛隊", "憲法改正", "改憲", "9条", "防衛省"].
Beispiel Russland: "ru": ["украина", "путин", "москва", "санкции"].
- Wenn die Lage rein deutsch oder englisch ist und keine fremdsprachigen Feeds gewählt werden,
reichen "de" und/oder "en".
- Nur inhaltlich relevante Begriffe (Personen, Orte, Themen, Organisationen)
- KEINE Jahreszahlen (2024, 2025, 2026 etc.)
- KEINE Monatsnamen (Januar, Februar, März etc.)
- KEINE generischen Wörter (aktuell, news, update etc.)
- Lateinische Begriffe in Kleinbuchstaben. CJK/Arabisch/Hebräisch/Kyrillisch wie üblich.
Antworte NUR mit einem JSON-Objekt in genau diesem Format:
{{"feeds": [1, 2, 5, 12], "keywords": {{"de": ["..."], "en": ["..."], "ja": ["..."]}}}}"""
KEYWORD_EXTRACTION_PROMPT = """Analysiere diese aktuellen Nachrichten-Headlines und extrahiere die wichtigsten Suchbegriffe fuer RSS-Feed-Filterung.
THEMA: {title}
AKTUELLE HEADLINES (die letzten Meldungen zu diesem Thema):
{headlines}
AUFGABE:
Generiere 5 Begriffspaare (DE + EN), mit denen neue RSS-Artikel zu diesem Thema gefunden werden.
Ein Artikel gilt als relevant, wenn mindestens 2 dieser Begriffe im Titel oder der Beschreibung vorkommen
- bei spezifischen Begriffen (Eigennamen, lange Begriffe ab 7 Zeichen) reicht 1 Treffer.
Wenn das Thema einen klaren Länderbezug zu einem nicht-lateinischen Sprachraum hat (z.B. Japan,
China, Korea, Russland, Iran, Israel, arabische Welt), GIB ZUSAETZLICH ein Feld "extra" mit
schrift-spezifischen Keywords pro Sprache zurück (siehe Format unten). Diese matchen dann die
Original-Headlines in den jeweiligen Feeds.
REGELN:
- ZWINGEND: Eigennamen oder spezifische Begriffe aus dem THEMA (z.B. Personennamen, Tiernamen,
Ortsnamen wie "timmy", "buckelwal", "merz", "dobrindt") MUESSEN als eigene Begriffspaare
enthalten sein. Solche Begriffe sind oft das einzige, was in kurzen Headlines vorkommt.
- Die ersten 2 Begriffspaare sind die zentralen Akteure/Laender/Themen (z.B. iran, israel,
buckelwal, timmy) — also die Begriffe, die in fast JEDEM Artikel zum Thema vorkommen.
- Die uebrigen 3 Begriffspaare sind aktuelle Entwicklungen aus den Headlines (Orte, Akteure,
Schluesselwoerter der aktuellen Phase).
- Wenn DE und EN identisch sind (Eigennamen), trotzdem das Paar einreichen.
- Begriffe muessen so gewaehlt sein, dass sie in kurzen RSS-Titeln matchen (einzelne Woerter,
keine Phrasen, keine Konjunktionen).
- Lateinische Begriffe in Kleinbuchstaben. CJK/Arabisch/Hebräisch/Kyrillisch wie üblich.
- Exakt 5 Begriffspaare im "pairs"-Array.
Antwort NUR als JSON-Objekt, z.B.:
{{"pairs": [{{"de": "japan", "en": "japan"}}, {{"de": "verfassung", "en": "constitution"}}, {{"de": "takaichi", "en": "takaichi"}}, {{"de": "selbstverteidigung", "en": "sdf"}}, {{"de": "pazifismus", "en": "pacifism"}}], "extra": {{"ja": ["自衛隊", "憲法改正", "改憲", "9条", "高市"]}}}}
Wenn kein nicht-lateinischer Sprachraum betroffen ist, lass "extra" weg oder gib `{{}}` zurück."""
WEB_SOURCE_SELECTION_PROMPT = """Du bist ein OSINT-Analyst. Pruefe diese eingetragenen Web-Quellen und waehle nur die thematisch passenden aus.
LAGE: {title}
KONTEXT: {description}
WEB-QUELLEN:
{source_list}
REGELN:
- Waehle nur Quellen, die thematisch tatsaechlich zur Lage passen
- Lieber leere Liste zurueckgeben als pauschal alle aufnehmen
- Behoerden- und institutionelle Quellen sind oft hochwertig, aber nur wenn das Thema passt
- Petitions-Plattformen z.B. nur bei Lagen zu Buergerinitiativen, Gesetzen, oeffentlichem Druck
- Bei reinen Kriegs-/Konflikt-/Tagesnachrichten meistens leere Liste
Antworte NUR mit einem JSON-Array der Quellen-Nummern, z.B. [1, 3] oder []."""
TELEGRAM_CHANNEL_SELECTION_PROMPT = """Du bist ein OSINT-Analyst. Waehle aus dieser Liste von Telegram-Kanaelen diejenigen aus, die fuer die Lage relevant sein koennten.
LAGE: {title}
KONTEXT: {description}
TELEGRAM-KANAELE:
{channel_list}
REGELN:
- Waehle alle Kanaele die thematisch relevant sein koennten
- Lieber einen Kanal zu viel als zu wenig auswaehlen
- Beachte die Kategorie und Beschreibung jedes Kanals
- Allgemeine OSINT-Kanaele sind oft relevant
- Bei Cybercrime-Themen: Cybercrime + Leaks Kanaele waehlen
- Bei geopolitischen Themen: Relevante Laender-/Regionskanaele waehlen
Antworte NUR mit einem JSON-Array der Kanal-Nummern, z.B.: [1, 3, 5, 12]"""
X_ACCOUNT_SELECTION_PROMPT = """Du bist ein OSINT-Analyst. Waehle aus dieser Liste von X-Accounts (Twitter) diejenigen aus, die fuer die Lage relevant sein koennten.
LAGE: {title}
KONTEXT: {description}
X-ACCOUNTS:
{account_list}
REGELN:
- Waehle alle Accounts die thematisch relevant sein koennten
- Lieber einen Account zu viel als zu wenig auswaehlen
- Beachte die Kategorie und Beschreibung jedes Accounts
- Allgemeine OSINT-Accounts sind oft relevant
- Bei geopolitischen Themen: Relevante Laender-/Regions-Accounts waehlen
Antworte NUR mit einem JSON-Array der Account-Nummern, z.B.: [1, 3, 5, 12]"""
class ResearcherAgent:
"""Führt OSINT-Recherchen über Claude CLI WebSearch durch."""
async def select_relevant_feeds(
self,
title: str,
description: str,
international: bool,
feeds_metadata: list[dict],
) -> tuple[list[dict], dict | None, ClaudeUsage | None]:
"""Lässt Claude die relevanten Feeds für eine Lage vorauswählen.
Nutzt Haiku (CLAUDE_MODEL_FAST) für diese einfache Aufgabe.
Returns:
(ausgewählte Feeds, keywords_by_lang, usage)
keywords_by_lang ist ein Dict {iso_lang: [keyword, ...]} mit mindestens
den Schlüsseln, für die ausgewählte Feeds publizieren ("en" enthält
universelle/lateinische Begriffe, die in jedem Feed matchen).
Bei Fehler: (alle Feeds, None, usage_or_None).
"""
# Feed-Liste als nummerierte Übersicht formatieren (mit Sprache)
feed_lines = []
for i, feed in enumerate(feeds_metadata, 1):
lang = feed.get("primary_language") or "?"
feed_lines.append(
f"{i}. {feed['name']} ({feed['domain']}, {lang}) [{feed['category']}]"
)
prompt = FEED_SELECTION_PROMPT_TEMPLATE.format(
title=title,
description=description or "Keine weitere Beschreibung",
international="Ja" if international else "Nein",
feed_list="\n".join(feed_lines),
)
try:
result, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST)
keywords_by_lang: dict | None = None
indices = None
obj = _extract_json_object(result)
if isinstance(obj, dict) and isinstance(obj.get("feeds"), list):
indices = obj["feeds"]
raw_keywords = obj.get("keywords")
# Neues Format: {"de": [...], "en": [...], "ja": [...]}
if isinstance(raw_keywords, dict):
keywords_by_lang = _normalize_keywords_dict(raw_keywords)
# Backward-Format: flache Liste -> als "en" speichern (universell behandelt)
elif isinstance(raw_keywords, list) and raw_keywords:
flat = [str(k).strip() for k in raw_keywords if str(k).strip()]
if flat:
keywords_by_lang = {"en": [w.lower() for w in flat]}
if keywords_by_lang:
logger.info(f"Feed-Selektion Keywords (Sprachen): {keywords_by_lang}")
# Fallback: nacktes Array
if indices is None:
arr = _extract_json_array(result)
if not isinstance(arr, list):
logger.warning(
"Feed-Selektion: Kein JSON in Antwort, nutze alle Feeds. Sample: %s",
_truncate_for_log(result),
)
return feeds_metadata, None, usage
indices = arr
selected = []
for idx in indices:
if isinstance(idx, int) and 1 <= idx <= len(feeds_metadata):
selected.append(feeds_metadata[idx - 1])
if not selected:
logger.warning("Feed-Selektion: Keine gültigen Indizes, nutze alle Feeds")
return feeds_metadata, keywords_by_lang, usage
logger.info(
f"Feed-Selektion: {len(selected)} von {len(feeds_metadata)} Feeds ausgewählt"
)
return selected, keywords_by_lang, usage
except Exception as e:
logger.warning(f"Feed-Selektion fehlgeschlagen ({e}), nutze alle Feeds")
return feeds_metadata, None, None
async def extract_dynamic_keywords(
self, title: str, recent_headlines: list[str]
) -> tuple[dict | None, ClaudeUsage | None]:
"""Extrahiert aktuelle Suchbegriffe aus den letzten Headlines via Haiku.
Returns:
(keywords_by_lang, usage) oder (None, None) bei Fehler.
keywords_by_lang ist ein Dict {iso_lang: [keyword,...]}, mit mindestens
"de" und "en" gefüllt, optional zusätzlich "ja"/"zh"/"ko"/"ar"/"he"/"fa"/"ru"
bei nicht-lateinischen Sprachräumen.
"""
if not recent_headlines:
return None, None
headlines_text = "\n".join(f"- {h}" for h in recent_headlines[:30])
prompt = KEYWORD_EXTRACTION_PROMPT.format(
title=title,
headlines=headlines_text,
)
try:
result, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST)
# Neues Format: {"pairs": [...], "extra": {"ja": [...]}}
obj = _extract_json_object(result)
pairs_raw = None
extra_raw: dict = {}
if isinstance(obj, dict) and isinstance(obj.get("pairs"), list):
pairs_raw = obj["pairs"]
extra = obj.get("extra")
if isinstance(extra, dict):
extra_raw = extra
else:
# Backward: nacktes Array von {de,en}-Paaren
arr = _extract_json_array(result)
if isinstance(arr, list):
pairs_raw = arr
else:
logger.warning(
"Keyword-Extraktion: Kein gueltiges JSON erhalten. Sample: %s",
_truncate_for_log(result),
)
return None, usage
de_list: list[str] = []
en_list: list[str] = []
for entry in pairs_raw or []:
if not isinstance(entry, dict):
continue
de = str(entry.get("de", "")).lower().strip()
en = str(entry.get("en", "")).lower().strip()
if de and de not in de_list:
de_list.append(de)
if en and en not in en_list:
en_list.append(en)
# Bug-2-Fallback: Lagentitel-Wörter (>=4 Zeichen) zwingend in Keyword-Liste,
# falls Haiku sie weggelassen hat. Verhindert "Buckelwal timmy"-Bug, bei dem
# der Eigenname "timmy" fehlte und damit Headlines mit nur "Buckelwal" durchfielen.
STOPWORDS = {"der", "die", "das", "und", "oder", "von", "vom", "zum", "zur",
"the", "and", "for", "with", "ueber", "über", "von", "for"}
for word in (title or "").lower().split():
w = word.strip(".,;:!?\"\'()[]{}")
if len(w) >= 4 and w not in STOPWORDS:
if w not in en_list:
en_list.append(w)
logger.info(f"Lagentitel-Keyword '{w}' nachträglich injiziert")
keywords_by_lang: dict[str, list[str]] = {}
if de_list:
keywords_by_lang["de"] = de_list
if en_list:
keywords_by_lang["en"] = en_list
# Extra-Sprachen mit übernehmen
extra_norm = _normalize_keywords_dict(extra_raw) if extra_raw else None
if extra_norm:
for lang, kws in extra_norm.items():
keywords_by_lang.setdefault(lang, [])
for k in kws:
if k not in keywords_by_lang[lang]:
keywords_by_lang[lang].append(k)
if not keywords_by_lang:
return None, usage
logger.info(
"Dynamische Keywords (Sprachen): %s",
{k: len(v) for k, v in keywords_by_lang.items()},
)
return keywords_by_lang, usage
except Exception as e:
logger.warning(f"Keyword-Extraktion fehlgeschlagen: {e}")
return None, None
async def search(self, title: str, description: str = "", incident_type: str = "adhoc", international: bool = True, user_id: int = None, existing_articles: list[dict] = None, preferred_sources: list[dict] = None, output_language: str = "Deutsch", output_language_iso: str = "de", research_language_iso: str | None = None) -> tuple[list[dict], ClaudeUsage | None, bool]:
"""Sucht nach Informationen zu einem Vorfall.
Args:
output_language / output_language_iso: Ausgabesprache (Lagebild-Sprache).
research_language_iso: optionaler Override fuer die Sprache, in der gesucht
werden soll. Default = output_language_iso. Bei jp_demo z.B. 'ja',
waehrend output_language_iso 'de' bleibt (Lagebild deutsch, Recherche japanisch).
Returns:
(artikel, usage, parse_failed) — parse_failed ist True, wenn Claude geantwortet hat,
das JSON aber nicht extrahierbar war. So kann der Orchestrator zwischen
"echt keine Treffer" und "kaputte Antwort" unterscheiden.
"""
# research_language defaultet auf output_language. Wenn das aber abweicht
# (z.B. jp_demo: research='ja', output='de'), ueberschreiben wir die
# Sprach-Anweisung im Prompt mit einer eigenen, dual-sprachigen Variante.
research_language_iso = (research_language_iso or output_language_iso or "de").lower()
# Display-Name der Recherche-Sprache fuer Prompts ("Japanese", "Russian", ...)
from services.org_settings import language_display as _lang_display
research_language_display = _lang_display(research_language_iso)
# Bevorzugte Web-Quellen als Prompt-Block (optional)
preferred_sources_block = ""
if preferred_sources:
ps_lines = []
for s in preferred_sources:
domain = s.get("domain", "")
name = s.get("name", domain) or domain
if not domain:
continue
ps_lines.append(f"- {domain} ({name})")
if ps_lines:
preferred_sources_block = (
"\nEINGETRAGENE WEB-QUELLEN (vom Betreiber als seriös markiert):\n"
+ "\n".join(ps_lines) + "\n"
"EMPFEHLUNG: Wenn diese Domains thematisch zur Lage passen, suche dort gezielt "
"mit \"site:domain [Suchbegriff]\". Sie sind vertrauenswuerdig eingetragen, ersetzen "
"aber nicht deine sonstige Recherche.\n"
)
# Asymmetrische Sprach-Auswahl: research_language weicht von output_language ab
# -> eigene Anweisung "primaer in research-language, englische Quellen aus der
# Region auch erlaubt". Sonst die bisherige Logik (primary_only vs international).
asymmetric_lang = research_language_iso != output_language_iso
def _build_lang_instruction(deep: bool) -> str:
if asymmetric_lang:
# jp_demo & Co.: Recherche in Quellsprache + lokale Englisch-Outlets.
return (
f"- Fokus liegt auf {research_language_display}-sprachigen Quellen "
f"(Behoerden, Qualitaetszeitungen, oeffentlich-rechtliche Medien dieser Sprache).\n"
f"- Englischsprachige Outlets mit Fokus auf demselben Sprachraum/Region sind "
f"ebenfalls willkommen (z.B. Japan Times, Nikkei Asia, Kyodo English fuer Japan; "
f"Moscow Times English fuer Russland).\n"
f"- Quellen ausserhalb des Sprachraums NUR, wenn sie exklusive Informationen "
f"ueber die Region liefern (z.B. Reuters/AFP/AP-Berichte aus der Region).\n"
f"- Antworte in der Ausgabesprache {output_language} (das Lagebild wird in "
f"{output_language} angezeigt), aber zitiere die Original-Headlines/Quellen unveraendert."
)
if deep:
return lang_deep_international(output_language) if international else lang_deep_primary_only(output_language)
return lang_international(output_language) if international else lang_primary_only(output_language)
if incident_type == "research":
lang_instruction = _build_lang_instruction(deep=True)
# Bestehende Artikel als Kontext für den Prompt aufbereiten
existing_context = ""
if existing_articles:
known_lines = []
for art in existing_articles[:50]: # Max 50 um Prompt nicht zu überladen
source = art.get("source", "Unbekannt")
headline = art.get("headline", "")
url = art.get("source_url", "")
known_lines.append(f"- {source}: {headline} ({url})")
existing_context = (
"BEREITS BEKANNTE QUELLEN — NICHT erneut suchen, finde ANDERE:\n"
+ "\n".join(known_lines) + "\n\n"
"Fokussiere dich auf Quellen und Perspektiven, die in der obigen Liste FEHLEN.\n"
)
prompt = DEEP_RESEARCH_PROMPT_TEMPLATE.format(
title=title, description=description, language_instruction=lang_instruction,
output_language=output_language, existing_context=existing_context,
preferred_sources_block=preferred_sources_block,
)
else:
lang_instruction = _build_lang_instruction(deep=False)
# Bestehende Artikel als Kontext: bei Folge-Refreshes findet Claude andere Quellen
existing_context = ""
if existing_articles:
known_lines = []
for art in existing_articles[:30]: # Max 30 bei adhoc (kompakter als research)
source = art.get("source", "Unbekannt")
headline = art.get("headline", "")
known_lines.append(f"- {source}: {headline}")
existing_context = (
"BEREITS BEKANNTE QUELLEN (aus RSS-Feeds und vorherigen Recherchen) — suche ANDERE Blickwinkel und Quellen:\n"
+ "\n".join(known_lines) + "\n"
)
prompt = RESEARCH_PROMPT_TEMPLATE.format(
title=title, description=description, language_instruction=lang_instruction,
output_language=output_language, existing_context=existing_context,
preferred_sources_block=preferred_sources_block,
)
try:
result, usage = await call_claude(prompt)
try:
articles = self._parse_response(result)
except ResearcherParseError as parse_err:
# Claude hat geantwortet, aber kein verwertbares JSON dabei.
# Usage trotzdem zurueckgeben, damit Credits korrekt verbucht werden.
logger.warning("Claude-Recherche: %s", parse_err)
return [], usage, True
# Ausgeschlossene Quellen dynamisch aus DB laden
excluded_sources = await self._get_excluded_sources(user_id=user_id)
# Ausgeschlossene Quellen filtern
filtered = []
for article in articles:
source = article.get("source", "").lower()
source_url = article.get("source_url", "").lower()
excluded = False
for excl in excluded_sources:
if excl in source or excl in source_url:
excluded = True
break
if not excluded:
# Bei nur-primary: andersprachige Ergebnisse nachfiltern
if not international and article.get("language", output_language_iso) != output_language_iso:
continue
filtered.append(article)
logger.info(f"Recherche ergab {len(filtered)} Artikel (von {len(articles)} gefundenen, international={international})")
return filtered, usage, False
except TimeoutError:
raise # Timeout nach oben durchreichen fuer Retry im Orchestrator
except Exception as e:
logger.error(f"Recherche-Fehler: {e}")
return [], None, False
async def _get_excluded_sources(self, user_id: int = None) -> list[str]:
"""Laedt ausgeschlossene Quellen (global + per-User)."""
try:
from source_rules import get_source_rules, get_user_excluded_domains
rules = await get_source_rules()
excluded = list(rules.get("excluded_domains", []))
# User-spezifische Ausschluesse hinzufuegen
if user_id:
user_excluded = await get_user_excluded_domains(user_id)
for domain in user_excluded:
if domain not in excluded:
excluded.append(domain)
return excluded
except Exception as e:
logger.warning(f"Fallback auf config.py fuer Excluded Sources: {e}")
from config import EXCLUDED_SOURCES
return list(EXCLUDED_SOURCES)
def _parse_response(self, response: str) -> list[dict]:
"""Parst die Claude-Antwort als JSON-Array.
Wirft ResearcherParseError, wenn die Antwort nicht-leer ist, sich aber
kein JSON extrahieren laesst. Eine echte leere Liste (z.B. wenn Claude
wirklich keine Treffer hat) wird als [] zurueckgegeben.
"""
text = (response or "").strip()
if not text:
return []
# 1) Direkt parsen (Antwort ist bereits sauberes JSON)
try:
data = json.loads(text)
if isinstance(data, list):
return data
if isinstance(data, dict) and isinstance(data.get("articles"), list):
return data["articles"]
except json.JSONDecodeError:
pass
# 2) JSON-Array irgendwo im Text (Markdown-Fence oder Vor-/Nachtext)
arr = _extract_json_array(text)
if isinstance(arr, list):
return arr
# 3) JSON-Objekt mit "articles"-Key
obj = _extract_json_object(text)
if isinstance(obj, dict) and isinstance(obj.get("articles"), list):
return obj["articles"]
# 4) Recovery: einzelne Headline-Objekte aus Fliesstext
recovered = []
for obj_str in re.findall(r'\{[^{}]*"headline"[^{}]*\}', text, re.DOTALL):
try:
parsed = json.loads(obj_str)
except json.JSONDecodeError:
continue
if isinstance(parsed, dict) and "headline" in parsed:
recovered.append(parsed)
if recovered:
logger.info("JSON-Recovery: %d Artikel aus Einzelobjekten extrahiert", len(recovered))
return recovered
# Parse fehlgeschlagen — Claude hat geantwortet, aber kein verwertbares JSON dabei.
# Sample loggen, damit der Fehler debuggbar ist, und Aufrufer signalisieren.
logger.warning(
"Konnte Claude-Antwort nicht als JSON parsen (Laenge: %d). Sample: %s",
len(text),
_truncate_for_log(text),
)
raise ResearcherParseError(f"Claude-Antwort enthielt kein verwertbares JSON (Laenge: {len(text)})")
async def select_relevant_web_sources(
self,
title: str,
description: str,
web_sources: list[dict],
) -> tuple[list[dict], ClaudeUsage | None]:
"""Laesst Claude die thematisch passenden Web-Quellen auswaehlen (Haiku).
Returns:
(ausgewaehlte Quellen, usage). Bei Fehler: ([], None).
Leere Auswahl ist explizit erlaubt — keine Quelle wird zwangsweise aufgenommen.
"""
if not web_sources:
return [], None
# Bei sehr wenigen Quellen lohnt der Selektions-Call kaum — alle weiterreichen.
if len(web_sources) <= 3:
logger.info("Web-Source-Selektion: Nur %d Quellen, alle uebernehmen", len(web_sources))
return list(web_sources), None
lines = []
for i, src in enumerate(web_sources, 1):
cat = src.get("category", "sonstige")
notes = (src.get("notes") or "")[:80]
domain = src.get("domain", "")
line = f"{i}. {src.get('name', domain)} ({domain}) [{cat}]"
if notes:
line += f" - {notes}"
lines.append(line)
prompt = WEB_SOURCE_SELECTION_PROMPT.format(
title=title,
description=description or "Keine weitere Beschreibung",
source_list="\n".join(lines),
)
try:
result, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST)
indices = _extract_json_array(result)
if not isinstance(indices, list):
logger.warning(
"Web-Source-Selektion: Kein JSON in Antwort, ignoriere Quellen. Sample: %s",
_truncate_for_log(result),
)
return [], usage
selected = []
for idx in indices:
if isinstance(idx, int) and 1 <= idx <= len(web_sources):
selected.append(web_sources[idx - 1])
logger.info(
"Web-Source-Selektion: %d von %d ausgewaehlt%s",
len(selected), len(web_sources),
f" ({', '.join(s.get('domain', '') for s in selected)})" if selected else "",
)
return selected, usage
except Exception as e:
logger.warning("Web-Source-Selektion fehlgeschlagen (%s)", e)
return [], None
async def select_relevant_telegram_channels(
self,
title: str,
description: str,
channels_metadata: list[dict],
) -> tuple[list[dict], ClaudeUsage | None]:
"""Laesst Claude die relevanten Telegram-Kanaele fuer eine Lage vorauswaehlen.
Nutzt Haiku (CLAUDE_MODEL_FAST) fuer diese einfache Aufgabe.
Returns:
(ausgewaehlte Kanaele, usage) -- Bei Fehler: (alle Kanaele, None)
"""
if len(channels_metadata) <= 10:
logger.info("Telegram-Selektion: Nur %d Kanaele, nutze alle", len(channels_metadata))
return channels_metadata, None
channel_lines = []
for i, ch in enumerate(channels_metadata, 1):
cat = ch.get("category", "sonstige")
notes = (ch.get("notes") or "")[:100]
channel_lines.append(f"{i}. {ch['name']} [{cat}] - {notes}")
prompt = TELEGRAM_CHANNEL_SELECTION_PROMPT.format(
title=title,
description=description or "Keine weitere Beschreibung",
channel_list="\n".join(channel_lines),
)
try:
result, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST)
indices = _extract_json_array(result)
if not isinstance(indices, list):
logger.warning(
"Telegram-Selektion: Kein JSON in Antwort, nutze alle Kanaele. Sample: %s",
_truncate_for_log(result),
)
return channels_metadata, usage
selected = []
for idx in indices:
if isinstance(idx, int) and 1 <= idx <= len(channels_metadata):
selected.append(channels_metadata[idx - 1])
if not selected:
logger.warning("Telegram-Selektion: Keine gueltigen Indizes, nutze alle Kanaele")
return channels_metadata, usage
logger.info(
"Telegram-Selektion: %d von %d Kanaelen ausgewaehlt",
len(selected), len(channels_metadata)
)
return selected, usage
except Exception as e:
logger.warning("Telegram-Selektion fehlgeschlagen (%s), nutze alle Kanaele", e)
return channels_metadata, None
async def select_relevant_x_accounts(
self,
title: str,
description: str,
accounts_metadata: list[dict],
) -> tuple[list[dict], ClaudeUsage | None]:
"""Laesst Claude die relevanten X-Accounts fuer eine Lage vorauswaehlen.
Nutzt Haiku (CLAUDE_MODEL_FAST) fuer diese einfache Aufgabe.
Returns:
(ausgewaehlte Accounts, usage) -- Bei Fehler: (alle Accounts, None)
"""
if len(accounts_metadata) <= 10:
logger.info("X-Selektion: Nur %d Accounts, nutze alle", len(accounts_metadata))
return accounts_metadata, None
account_lines = []
for i, acc in enumerate(accounts_metadata, 1):
cat = acc.get("category", "sonstige")
notes = (acc.get("notes") or "")[:100]
account_lines.append(f"{i}. {acc['name']} [{cat}] - {notes}")
prompt = X_ACCOUNT_SELECTION_PROMPT.format(
title=title,
description=description or "Keine weitere Beschreibung",
account_list="\n".join(account_lines),
)
try:
result, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST)
indices = _extract_json_array(result)
if not isinstance(indices, list):
logger.warning(
"X-Selektion: Kein JSON in Antwort, nutze alle Accounts. Sample: %s",
_truncate_for_log(result),
)
return accounts_metadata, usage
selected = []
for idx in indices:
if isinstance(idx, int) and 1 <= idx <= len(accounts_metadata):
selected.append(accounts_metadata[idx - 1])
if not selected:
logger.warning("X-Selektion: Keine gueltigen Indizes, nutze alle Accounts")
return accounts_metadata, usage
logger.info(
"X-Selektion: %d von %d Accounts ausgewaehlt",
len(selected), len(accounts_metadata)
)
return selected, usage
except Exception as e:
logger.warning("X-Selektion fehlgeschlagen (%s), nutze alle Accounts", e)
return accounts_metadata, None