Dateien
AegisSight-Monitor/src/agents/researcher.py
UserIsMH 0a6208c289 WebSearch: eingetragene Web-Quellen via Haiku vorselektieren
Bisher hatten Quellen vom Typ web_source keine praktische Wirkung auf
die Recherche - sie lagen nur als Marker in der DB. Jetzt werden sie
aktiv in den Recherche-Prompt eingebunden.

Ablauf:
1. Vor dem Hauptaufruf an Opus prüft ein günstiger Haiku-Call alle
   aktiven Web-Quellen des Tenants (plus globale) und wählt die
   thematisch passenden aus. Leere Selektion ist ausdrücklich erlaubt.
2. Die ausgewählten Domains werden dem Recherche-Prompt als
   "EINGETRAGENE WEB-QUELLEN" Block beigegeben mit der Empfehlung,
   gezielt mit "site:domain query" zu suchen, falls thematisch passend.
3. site: ist Empfehlung, kein Zwang - Claude bleibt flexibel und
   ergänzt seine sonstige Recherche.

- source_rules.get_feeds_with_metadata: SELECT um notes-Feld erweitert,
  damit der Selektor besseren Kontext zur Quelle hat.
- ResearcherAgent.select_relevant_web_sources: neuer Helper analog zu
  select_relevant_feeds, mit Skip-Optimierung wenn ≤3 Quellen.
- WEB_SOURCE_SELECTION_PROMPT: explizite Regel "lieber leer als
  pauschal alle", verhindert Token-Verschwendung.
- ResearcherAgent.search: neuer Parameter preferred_sources, beide
  Templates (RESEARCH + DEEP_RESEARCH) bekommen optionalen
  preferred_sources_block.
- Orchestrator._web_search_pipeline: Vorselektion vor researcher.search,
  Token-Usage in usage_acc, Logging der gewählten Domains.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 16:45:17 +02:00

677 Zeilen
29 KiB
Python

"""Researcher-Agent: Sucht nach Informationen via Claude WebSearch."""
import json
import logging
import re
from agents.claude_client import call_claude, ClaudeUsage
from config import CLAUDE_MODEL_FAST
logger = logging.getLogger("osint.researcher")
class ResearcherParseError(Exception):
"""Claude hat eine nicht-leere Antwort geliefert, aus der kein JSON extrahiert werden konnte."""
def _truncate_for_log(text: str, limit: int = 600) -> str:
"""Kürzt eine Claude-Antwort für Logs, damit ein Sample sichtbar ist."""
if not text:
return ""
snippet = text.strip().replace("\n", "\\n")
if len(snippet) > limit:
snippet = snippet[:limit] + "..."
return snippet
def _extract_json_array(text: str):
"""Findet das erste vollständige JSON-Array im Text (auch mit Vor-/Nachtext oder Markdown-Fence)."""
if not text:
return None
decoder = json.JSONDecoder()
idx = 0
while True:
bracket = text.find("[", idx)
if bracket == -1:
return None
try:
obj, _ = decoder.raw_decode(text, bracket)
except json.JSONDecodeError:
idx = bracket + 1
continue
if isinstance(obj, list):
return obj
idx = bracket + 1
def _extract_json_object(text: str):
"""Findet das erste vollständige JSON-Objekt im Text (auch mit Vor-/Nachtext oder Markdown-Fence)."""
if not text:
return None
decoder = json.JSONDecoder()
idx = 0
while True:
brace = text.find("{", idx)
if brace == -1:
return None
try:
obj, _ = decoder.raw_decode(text, brace)
except json.JSONDecodeError:
idx = brace + 1
continue
if isinstance(obj, dict):
return obj
idx = brace + 1
RESEARCH_PROMPT_TEMPLATE = """Du bist ein OSINT-Recherche-Agent für ein Lagemonitoring-System.
AUSGABESPRACHE: {output_language}
- KEINE Gedankenstriche (— oder –) verwenden, stattdessen Kommas, Doppelpunkte oder neue Saetze.
WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschreibungen (ae, oe, ue, ss).
AUFTRAG: Suche nach aktuellen Informationen zu folgendem Vorfall:
Titel: {title}
Kontext: {description}
{existing_context}{preferred_sources_block}
REGELN:
- Suche nur bei seriösen Nachrichtenquellen (Nachrichtenagenturen, Qualitätszeitungen, öffentlich-rechtliche Medien, Behörden)
- KEIN Social Media (Twitter/X, Facebook, Instagram, TikTok, Reddit)
- KEINE Boulevardmedien (Bild, Sun, Daily Mail etc.)
{language_instruction}
- Faktenbasiert und neutral - keine Spekulationen
- KRITISCH für source_url: Kopiere die EXAKTE URL aus den WebSearch-Ergebnissen. Erfinde oder konstruiere NIEMALS URLs aus Mustern oder Erinnerung. Wenn du die exakte URL eines Artikels nicht aus den Suchergebnissen hast, lass diesen Artikel komplett weg.
- Nutze removepaywalls.com für Paywall-geschützte Artikel (z.B. Spiegel+, Zeit+, SZ+): https://www.removepaywalls.com/search?url=ARTIKEL_URL
- Nutze WebFetch um die 3-5 wichtigsten Artikel vollständig abzurufen und zusammenzufassen
Gib die Ergebnisse AUSSCHLIESSLICH als JSON-Array zurück, ohne Erklärungen davor oder danach.
Jedes Element hat diese Felder:
- "headline": Originale Überschrift
- "headline_de": Übersetzung in Ausgabesprache (falls Originalsprache abweicht)
- "source": Name der Quelle (z.B. "Reuters", "tagesschau")
- "source_url": URL des Artikels
- "content_summary": Zusammenfassung des Inhalts (3-5 Sätze, in Ausgabesprache)
- "language": Sprache des Originals (z.B. "de", "en", "fr")
- "published_at": Veröffentlichungsdatum falls bekannt (ISO-Format)
Antworte NUR mit dem JSON-Array. Keine Einleitung, keine Erklärung."""
DEEP_RESEARCH_PROMPT_TEMPLATE = """Du bist ein OSINT-Tiefenrecherche-Agent für ein Lagemonitoring-System.
AUSGABESPRACHE: {output_language}
- KEINE Gedankenstriche (— oder –) verwenden, stattdessen Kommas, Doppelpunkte oder neue Saetze.
WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschreibungen (ae, oe, ue, ss).
AUFTRAG: Führe eine umfassende, mehrstufige Hintergrundrecherche durch zu:
Titel: {title}
Kontext: {description}
{existing_context}{preferred_sources_block}
RECHERCHE IN 4 PHASEN — Führe ALLE Phasen nacheinander durch:
PHASE 1 — BREITE ERFASSUNG:
Suche nach aktueller Berichterstattung bei Nachrichtenagenturen, Qualitätszeitungen und öffentlich-rechtlichen Medien. Nutze verschiedene Suchbegriffe und Blickwinkel. Ziel: 8-12 Quellen.
PHASE 2 — LÜCKENANALYSE:
Prüfe deine bisherigen Ergebnisse kritisch. Welche Quellentypen fehlen noch?
Typisch fehlen: Parlamentsdokumente, Gesetzestexte, NGO-/UN-Berichte, Think-Tank-Analysen, investigative Langform-Berichte, akademische Einordnungen, Fachmedien.
Welche Akteure, Perspektiven oder Dimensionen sind noch nicht abgedeckt?
PHASE 3 — GEZIELTE TIEFENRECHERCHE:
Suche GEZIELT nach den in Phase 2 identifizierten Lücken:
- Parlamentarische Quellen (Bundestagsdrucksachen, Congress.gov, Hansard, etc.)
- Offizielle Dokumente und Pressemitteilungen von Behörden
- NGO-Berichte und UN-Dokumente (ohchr.org, amnesty.org, hrw.org, etc.)
- Think-Tank-Analysen (IISS, Brookings, SWP, DGAP, Chatham House, etc.)
- Investigative Recherchen und Langform-Artikel
- Fachzeitschriften und akademische Einordnungen
Nutze spezifische Suchbegriffe für institutionelle Quellen. Ziel: 6-10 weitere Quellen.
PHASE 4 — VERIFIKATION UND VERTIEFUNG:
Nutze WebFetch um die 6-10 wichtigsten Artikel vollständig abzurufen und ausführlich zusammenzufassen.
Priorisiere dabei Primärquellen und investigative Berichte.
Nutze removepaywalls.com für Paywall-geschützte Artikel (z.B. https://www.removepaywalls.com/search?url=ARTIKEL_URL)
{language_instruction}
ZIEL: 15-25 hochwertige Quellen aus mindestens 5 verschiedenen Quellentypen:
- Nachrichtenagenturen/Qualitätspresse
- Investigative Berichte/Langform
- Parlamentarische/Regierungsquellen
- NGO/Internationale Organisationen
- Fachmedien/Akademische Quellen
AUSSCHLUSS:
- KEIN Social Media (Twitter/X, Facebook, Instagram, TikTok, Reddit)
- KEINE Boulevardmedien (Bild, Sun, Daily Mail etc.)
- KEINE Meinungsblogs ohne Quellenbelege
- KEINE erfundenen oder konstruierten URLs — gib bei source_url NUR die EXAKTE URL zurueck, die WebSearch tatsaechlich angezeigt hat. Wenn du die URL nicht aus den Suchergebnissen kopieren kannst, lass den Artikel weg.
Gib die Ergebnisse AUSSCHLIESSLICH als JSON-Array zurück, ohne Erklärungen davor oder danach.
Jedes Element hat diese Felder:
- "headline": Originale Überschrift
- "headline_de": Übersetzung in Ausgabesprache (falls Originalsprache abweicht)
- "source": Name der Quelle (z.B. "netzpolitik.org", "Handelsblatt")
- "source_url": URL des Artikels
- "content_summary": Ausführliche Zusammenfassung des Inhalts (5-8 Sätze, in Ausgabesprache)
- "language": Sprache des Originals (z.B. "de", "en", "fr")
- "published_at": Veröffentlichungsdatum falls bekannt (ISO-Format)
Antworte NUR mit dem JSON-Array. Keine Einleitung, keine Erklärung."""
# Sprach-Anweisungen
LANG_INTERNATIONAL = "- Suche in Deutsch UND Englisch für internationale Abdeckung"
LANG_GERMAN_ONLY = "- Suche NUR auf Deutsch bei deutschsprachigen Quellen (Deutschland, Österreich, Schweiz)\n- KEINE englischsprachigen oder anderssprachigen Quellen"
LANG_DEEP_INTERNATIONAL = "- Suche in Deutsch, Englisch und weiteren relevanten Sprachen"
LANG_DEEP_GERMAN_ONLY = "- Suche NUR auf Deutsch bei deutschsprachigen Quellen (Deutschland, Österreich, Schweiz)\n- KEINE englischsprachigen oder anderssprachigen Quellen"
FEED_SELECTION_PROMPT_TEMPLATE = """Du bist ein OSINT-Analyst. Wähle aus dieser Feed-Liste die Feeds aus, die für die Lage relevant sein könnten. Generiere außerdem optimierte Suchbegriffe für das RSS-Matching.
LAGE: {title}
KONTEXT: {description}
INTERNATIONALE QUELLEN: {international}
FEEDS:
{feed_list}
REGELN:
- Wähle alle Feeds die thematisch oder regional relevant sein könnten
- Lieber einen Feed zu viel als zu wenig auswählen
- Bei "Internationale Quellen: Nein": Keine internationalen Feeds auswählen
- Allgemeine Nachrichtenfeeds (tagesschau, Spiegel etc.) sind fast immer relevant
- QUELLENVIELFALT: Wähle pro Domain maximal 2-3 Feeds. Bevorzuge eine breite Mischung aus verschiedenen Quellen statt vieler Feeds derselben Domain.
KEYWORDS-REGELN:
- Generiere 5-10 thematisch relevante Suchbegriffe für das RSS-Matching
- Nur inhaltlich relevante Begriffe (Personen, Orte, Themen, Organisationen)
- KEINE Jahreszahlen (2024, 2025, 2026 etc.)
- KEINE Monatsnamen (Januar, Februar, März etc.)
- KEINE generischen Wörter (aktuell, news, update etc.)
- Begriffe in Kleinbuchstaben
- Sowohl deutsche als auch englische Begriffe wo sinnvoll
Antworte NUR mit einem JSON-Objekt in diesem Format:
{{"feeds": [1, 2, 5, 12], "keywords": ["begriff1", "begriff2", "begriff3"]}}"""
KEYWORD_EXTRACTION_PROMPT = """Analysiere diese aktuellen Nachrichten-Headlines und extrahiere die wichtigsten Suchbegriffe fuer RSS-Feed-Filterung.
THEMA: {title}
AKTUELLE HEADLINES (die letzten Meldungen zu diesem Thema):
{headlines}
AUFGABE:
Generiere 5 Begriffspaare (DE + EN), mit denen neue RSS-Artikel zu diesem Thema gefunden werden.
Ein Artikel gilt als relevant, wenn mindestens 2 dieser Begriffe im Titel oder der Beschreibung vorkommen.
REGELN:
- Die ersten 2 Begriffspaare MUESSEN die zentralen Akteure/Laender/Themen sein (z.B. iran, israel, usa) — also die Begriffe, die in fast JEDEM Artikel zum Thema vorkommen
- Die letzten 3 Begriffspaare sind aktuelle Entwicklungen aus den Headlines (Orte, Akteure, Schluesselwoerter der aktuellen Phase)
- Begriffe muessen so gewaehlt sein, dass sie in kurzen RSS-Titeln matchen (einzelne Woerter, keine Phrasen)
- Alle Begriffe in Kleinbuchstaben
- Exakt 5 Begriffspaare
Antwort NUR als JSON-Array:
[{{"de": "iran", "en": "iran"}}, {{"de": "israel", "en": "israel"}}, {{"de": "teheran", "en": "tehran"}}, {{"de": "luftangriff", "en": "airstrike"}}, {{"de": "trump", "en": "trump"}}]"""
WEB_SOURCE_SELECTION_PROMPT = """Du bist ein OSINT-Analyst. Pruefe diese eingetragenen Web-Quellen und waehle nur die thematisch passenden aus.
LAGE: {title}
KONTEXT: {description}
WEB-QUELLEN:
{source_list}
REGELN:
- Waehle nur Quellen, die thematisch tatsaechlich zur Lage passen
- Lieber leere Liste zurueckgeben als pauschal alle aufnehmen
- Behoerden- und institutionelle Quellen sind oft hochwertig, aber nur wenn das Thema passt
- Petitions-Plattformen z.B. nur bei Lagen zu Buergerinitiativen, Gesetzen, oeffentlichem Druck
- Bei reinen Kriegs-/Konflikt-/Tagesnachrichten meistens leere Liste
Antworte NUR mit einem JSON-Array der Quellen-Nummern, z.B. [1, 3] oder []."""
TELEGRAM_CHANNEL_SELECTION_PROMPT = """Du bist ein OSINT-Analyst. Waehle aus dieser Liste von Telegram-Kanaelen diejenigen aus, die fuer die Lage relevant sein koennten.
LAGE: {title}
KONTEXT: {description}
TELEGRAM-KANAELE:
{channel_list}
REGELN:
- Waehle alle Kanaele die thematisch relevant sein koennten
- Lieber einen Kanal zu viel als zu wenig auswaehlen
- Beachte die Kategorie und Beschreibung jedes Kanals
- Allgemeine OSINT-Kanaele sind oft relevant
- Bei Cybercrime-Themen: Cybercrime + Leaks Kanaele waehlen
- Bei geopolitischen Themen: Relevante Laender-/Regionskanaele waehlen
Antworte NUR mit einem JSON-Array der Kanal-Nummern, z.B.: [1, 3, 5, 12]"""
class ResearcherAgent:
"""Führt OSINT-Recherchen über Claude CLI WebSearch durch."""
async def select_relevant_feeds(
self,
title: str,
description: str,
international: bool,
feeds_metadata: list[dict],
) -> tuple[list[dict], list[str] | None, ClaudeUsage | None]:
"""Lässt Claude die relevanten Feeds für eine Lage vorauswählen.
Nutzt Haiku (CLAUDE_MODEL_FAST) für diese einfache Aufgabe.
Returns:
(ausgewählte Feeds, keywords, usage) — Bei Fehler: (alle Feeds, None, None)
"""
# Feed-Liste als nummerierte Übersicht formatieren
feed_lines = []
for i, feed in enumerate(feeds_metadata, 1):
feed_lines.append(
f"{i}. {feed['name']} ({feed['domain']}) [{feed['category']}]"
)
prompt = FEED_SELECTION_PROMPT_TEMPLATE.format(
title=title,
description=description or "Keine weitere Beschreibung",
international="Ja" if international else "Nein",
feed_list="\n".join(feed_lines),
)
try:
result, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST)
keywords = None
indices = None
# Neues Format: {"feeds": [...], "keywords": [...]}
obj = _extract_json_object(result)
if isinstance(obj, dict) and isinstance(obj.get("feeds"), list):
indices = obj["feeds"]
raw_keywords = obj.get("keywords", [])
if isinstance(raw_keywords, list) and raw_keywords:
keywords = [str(k).lower().strip() for k in raw_keywords if k]
logger.info(f"Feed-Selektion Keywords: {keywords}")
# Fallback: nacktes Array
if indices is None:
arr = _extract_json_array(result)
if not isinstance(arr, list):
logger.warning(
"Feed-Selektion: Kein JSON in Antwort, nutze alle Feeds. Sample: %s",
_truncate_for_log(result),
)
return feeds_metadata, None, usage
indices = arr
selected = []
for idx in indices:
if isinstance(idx, int) and 1 <= idx <= len(feeds_metadata):
selected.append(feeds_metadata[idx - 1])
if not selected:
logger.warning("Feed-Selektion: Keine gültigen Indizes, nutze alle Feeds")
return feeds_metadata, keywords, usage
logger.info(
f"Feed-Selektion: {len(selected)} von {len(feeds_metadata)} Feeds ausgewählt"
)
return selected, keywords, usage
except Exception as e:
logger.warning(f"Feed-Selektion fehlgeschlagen ({e}), nutze alle Feeds")
return feeds_metadata, None, None
async def extract_dynamic_keywords(
self, title: str, recent_headlines: list[str]
) -> tuple[list[str] | None, ClaudeUsage | None]:
"""Extrahiert aktuelle Suchbegriffe aus den letzten Headlines via Haiku.
Returns:
(flache Keyword-Liste DE+EN, usage) oder (None, None) bei Fehler
"""
if not recent_headlines:
return None, None
headlines_text = "\n".join(f"- {h}" for h in recent_headlines[:30])
prompt = KEYWORD_EXTRACTION_PROMPT.format(
title=title,
headlines=headlines_text,
)
try:
result, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST)
parsed = _extract_json_array(result)
if not isinstance(parsed, list):
logger.warning(
"Keyword-Extraktion: Kein gueltiges JSON erhalten. Sample: %s",
_truncate_for_log(result),
)
return None, usage
# Flache Liste: alle DE + EN Begriffe
keywords = []
for entry in parsed:
if not isinstance(entry, dict):
continue
de = entry.get("de", "").lower().strip()
en = entry.get("en", "").lower().strip()
if de:
keywords.append(de)
if en and en != de:
keywords.append(en)
if keywords:
logger.info(f"Dynamische Keywords ({len(keywords)}): {keywords}")
return keywords if keywords else None, usage
except Exception as e:
logger.warning(f"Keyword-Extraktion fehlgeschlagen: {e}")
return None, None
async def search(self, title: str, description: str = "", incident_type: str = "adhoc", international: bool = True, user_id: int = None, existing_articles: list[dict] = None, preferred_sources: list[dict] = None) -> tuple[list[dict], ClaudeUsage | None, bool]:
"""Sucht nach Informationen zu einem Vorfall.
Returns:
(artikel, usage, parse_failed) — parse_failed ist True, wenn Claude geantwortet hat,
das JSON aber nicht extrahierbar war. So kann der Orchestrator zwischen
"echt keine Treffer" und "kaputte Antwort" unterscheiden.
"""
from config import OUTPUT_LANGUAGE
# Bevorzugte Web-Quellen als Prompt-Block (optional)
preferred_sources_block = ""
if preferred_sources:
ps_lines = []
for s in preferred_sources:
domain = s.get("domain", "")
name = s.get("name", domain) or domain
if not domain:
continue
ps_lines.append(f"- {domain} ({name})")
if ps_lines:
preferred_sources_block = (
"\nEINGETRAGENE WEB-QUELLEN (vom Betreiber als seriös markiert):\n"
+ "\n".join(ps_lines) + "\n"
"EMPFEHLUNG: Wenn diese Domains thematisch zur Lage passen, suche dort gezielt "
"mit \"site:domain [Suchbegriff]\". Sie sind vertrauenswuerdig eingetragen, ersetzen "
"aber nicht deine sonstige Recherche.\n"
)
if incident_type == "research":
lang_instruction = LANG_DEEP_INTERNATIONAL if international else LANG_DEEP_GERMAN_ONLY
# Bestehende Artikel als Kontext für den Prompt aufbereiten
existing_context = ""
if existing_articles:
known_lines = []
for art in existing_articles[:50]: # Max 50 um Prompt nicht zu überladen
source = art.get("source", "Unbekannt")
headline = art.get("headline", "")
url = art.get("source_url", "")
known_lines.append(f"- {source}: {headline} ({url})")
existing_context = (
"BEREITS BEKANNTE QUELLEN — NICHT erneut suchen, finde ANDERE:\n"
+ "\n".join(known_lines) + "\n\n"
"Fokussiere dich auf Quellen und Perspektiven, die in der obigen Liste FEHLEN.\n"
)
prompt = DEEP_RESEARCH_PROMPT_TEMPLATE.format(
title=title, description=description, language_instruction=lang_instruction,
output_language=OUTPUT_LANGUAGE, existing_context=existing_context,
preferred_sources_block=preferred_sources_block,
)
else:
lang_instruction = LANG_INTERNATIONAL if international else LANG_GERMAN_ONLY
# Bestehende Artikel als Kontext: bei Folge-Refreshes findet Claude andere Quellen
existing_context = ""
if existing_articles:
known_lines = []
for art in existing_articles[:30]: # Max 30 bei adhoc (kompakter als research)
source = art.get("source", "Unbekannt")
headline = art.get("headline", "")
known_lines.append(f"- {source}: {headline}")
existing_context = (
"BEREITS BEKANNTE QUELLEN (aus RSS-Feeds und vorherigen Recherchen) — suche ANDERE Blickwinkel und Quellen:\n"
+ "\n".join(known_lines) + "\n"
)
prompt = RESEARCH_PROMPT_TEMPLATE.format(
title=title, description=description, language_instruction=lang_instruction,
output_language=OUTPUT_LANGUAGE, existing_context=existing_context,
preferred_sources_block=preferred_sources_block,
)
try:
result, usage = await call_claude(prompt)
try:
articles = self._parse_response(result)
except ResearcherParseError as parse_err:
# Claude hat geantwortet, aber kein verwertbares JSON dabei.
# Usage trotzdem zurueckgeben, damit Credits korrekt verbucht werden.
logger.warning("Claude-Recherche: %s", parse_err)
return [], usage, True
# Ausgeschlossene Quellen dynamisch aus DB laden
excluded_sources = await self._get_excluded_sources(user_id=user_id)
# Ausgeschlossene Quellen filtern
filtered = []
for article in articles:
source = article.get("source", "").lower()
source_url = article.get("source_url", "").lower()
excluded = False
for excl in excluded_sources:
if excl in source or excl in source_url:
excluded = True
break
if not excluded:
# Bei nur-deutsch: nicht-deutsche Ergebnisse nachfiltern
if not international and article.get("language", "de") != "de":
continue
filtered.append(article)
logger.info(f"Recherche ergab {len(filtered)} Artikel (von {len(articles)} gefundenen, international={international})")
return filtered, usage, False
except TimeoutError:
raise # Timeout nach oben durchreichen fuer Retry im Orchestrator
except Exception as e:
logger.error(f"Recherche-Fehler: {e}")
return [], None, False
async def _get_excluded_sources(self, user_id: int = None) -> list[str]:
"""Laedt ausgeschlossene Quellen (global + per-User)."""
try:
from source_rules import get_source_rules, get_user_excluded_domains
rules = await get_source_rules()
excluded = list(rules.get("excluded_domains", []))
# User-spezifische Ausschluesse hinzufuegen
if user_id:
user_excluded = await get_user_excluded_domains(user_id)
for domain in user_excluded:
if domain not in excluded:
excluded.append(domain)
return excluded
except Exception as e:
logger.warning(f"Fallback auf config.py fuer Excluded Sources: {e}")
from config import EXCLUDED_SOURCES
return list(EXCLUDED_SOURCES)
def _parse_response(self, response: str) -> list[dict]:
"""Parst die Claude-Antwort als JSON-Array.
Wirft ResearcherParseError, wenn die Antwort nicht-leer ist, sich aber
kein JSON extrahieren laesst. Eine echte leere Liste (z.B. wenn Claude
wirklich keine Treffer hat) wird als [] zurueckgegeben.
"""
text = (response or "").strip()
if not text:
return []
# 1) Direkt parsen (Antwort ist bereits sauberes JSON)
try:
data = json.loads(text)
if isinstance(data, list):
return data
if isinstance(data, dict) and isinstance(data.get("articles"), list):
return data["articles"]
except json.JSONDecodeError:
pass
# 2) JSON-Array irgendwo im Text (Markdown-Fence oder Vor-/Nachtext)
arr = _extract_json_array(text)
if isinstance(arr, list):
return arr
# 3) JSON-Objekt mit "articles"-Key
obj = _extract_json_object(text)
if isinstance(obj, dict) and isinstance(obj.get("articles"), list):
return obj["articles"]
# 4) Recovery: einzelne Headline-Objekte aus Fliesstext
recovered = []
for obj_str in re.findall(r'\{[^{}]*"headline"[^{}]*\}', text, re.DOTALL):
try:
parsed = json.loads(obj_str)
except json.JSONDecodeError:
continue
if isinstance(parsed, dict) and "headline" in parsed:
recovered.append(parsed)
if recovered:
logger.info("JSON-Recovery: %d Artikel aus Einzelobjekten extrahiert", len(recovered))
return recovered
# Parse fehlgeschlagen — Claude hat geantwortet, aber kein verwertbares JSON dabei.
# Sample loggen, damit der Fehler debuggbar ist, und Aufrufer signalisieren.
logger.warning(
"Konnte Claude-Antwort nicht als JSON parsen (Laenge: %d). Sample: %s",
len(text),
_truncate_for_log(text),
)
raise ResearcherParseError(f"Claude-Antwort enthielt kein verwertbares JSON (Laenge: {len(text)})")
async def select_relevant_web_sources(
self,
title: str,
description: str,
web_sources: list[dict],
) -> tuple[list[dict], ClaudeUsage | None]:
"""Laesst Claude die thematisch passenden Web-Quellen auswaehlen (Haiku).
Returns:
(ausgewaehlte Quellen, usage). Bei Fehler: ([], None).
Leere Auswahl ist explizit erlaubt — keine Quelle wird zwangsweise aufgenommen.
"""
if not web_sources:
return [], None
# Bei sehr wenigen Quellen lohnt der Selektions-Call kaum — alle weiterreichen.
if len(web_sources) <= 3:
logger.info("Web-Source-Selektion: Nur %d Quellen, alle uebernehmen", len(web_sources))
return list(web_sources), None
lines = []
for i, src in enumerate(web_sources, 1):
cat = src.get("category", "sonstige")
notes = (src.get("notes") or "")[:80]
domain = src.get("domain", "")
line = f"{i}. {src.get('name', domain)} ({domain}) [{cat}]"
if notes:
line += f" - {notes}"
lines.append(line)
prompt = WEB_SOURCE_SELECTION_PROMPT.format(
title=title,
description=description or "Keine weitere Beschreibung",
source_list="\n".join(lines),
)
try:
result, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST)
indices = _extract_json_array(result)
if not isinstance(indices, list):
logger.warning(
"Web-Source-Selektion: Kein JSON in Antwort, ignoriere Quellen. Sample: %s",
_truncate_for_log(result),
)
return [], usage
selected = []
for idx in indices:
if isinstance(idx, int) and 1 <= idx <= len(web_sources):
selected.append(web_sources[idx - 1])
logger.info(
"Web-Source-Selektion: %d von %d ausgewaehlt%s",
len(selected), len(web_sources),
f" ({', '.join(s.get('domain', '') for s in selected)})" if selected else "",
)
return selected, usage
except Exception as e:
logger.warning("Web-Source-Selektion fehlgeschlagen (%s)", e)
return [], None
async def select_relevant_telegram_channels(
self,
title: str,
description: str,
channels_metadata: list[dict],
) -> tuple[list[dict], ClaudeUsage | None]:
"""Laesst Claude die relevanten Telegram-Kanaele fuer eine Lage vorauswaehlen.
Nutzt Haiku (CLAUDE_MODEL_FAST) fuer diese einfache Aufgabe.
Returns:
(ausgewaehlte Kanaele, usage) -- Bei Fehler: (alle Kanaele, None)
"""
if len(channels_metadata) <= 10:
logger.info("Telegram-Selektion: Nur %d Kanaele, nutze alle", len(channels_metadata))
return channels_metadata, None
channel_lines = []
for i, ch in enumerate(channels_metadata, 1):
cat = ch.get("category", "sonstige")
notes = (ch.get("notes") or "")[:100]
channel_lines.append(f"{i}. {ch['name']} [{cat}] - {notes}")
prompt = TELEGRAM_CHANNEL_SELECTION_PROMPT.format(
title=title,
description=description or "Keine weitere Beschreibung",
channel_list="\n".join(channel_lines),
)
try:
result, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST)
indices = _extract_json_array(result)
if not isinstance(indices, list):
logger.warning(
"Telegram-Selektion: Kein JSON in Antwort, nutze alle Kanaele. Sample: %s",
_truncate_for_log(result),
)
return channels_metadata, usage
selected = []
for idx in indices:
if isinstance(idx, int) and 1 <= idx <= len(channels_metadata):
selected.append(channels_metadata[idx - 1])
if not selected:
logger.warning("Telegram-Selektion: Keine gueltigen Indizes, nutze alle Kanaele")
return channels_metadata, usage
logger.info(
"Telegram-Selektion: %d von %d Kanaelen ausgewaehlt",
len(selected), len(channels_metadata)
)
return selected, usage
except Exception as e:
logger.warning("Telegram-Selektion fehlgeschlagen (%s), nutze alle Kanaele", e)
return channels_metadata, None