Bisher hatten Quellen vom Typ web_source keine praktische Wirkung auf die Recherche - sie lagen nur als Marker in der DB. Jetzt werden sie aktiv in den Recherche-Prompt eingebunden. Ablauf: 1. Vor dem Hauptaufruf an Opus prüft ein günstiger Haiku-Call alle aktiven Web-Quellen des Tenants (plus globale) und wählt die thematisch passenden aus. Leere Selektion ist ausdrücklich erlaubt. 2. Die ausgewählten Domains werden dem Recherche-Prompt als "EINGETRAGENE WEB-QUELLEN" Block beigegeben mit der Empfehlung, gezielt mit "site:domain query" zu suchen, falls thematisch passend. 3. site: ist Empfehlung, kein Zwang - Claude bleibt flexibel und ergänzt seine sonstige Recherche. - source_rules.get_feeds_with_metadata: SELECT um notes-Feld erweitert, damit der Selektor besseren Kontext zur Quelle hat. - ResearcherAgent.select_relevant_web_sources: neuer Helper analog zu select_relevant_feeds, mit Skip-Optimierung wenn ≤3 Quellen. - WEB_SOURCE_SELECTION_PROMPT: explizite Regel "lieber leer als pauschal alle", verhindert Token-Verschwendung. - ResearcherAgent.search: neuer Parameter preferred_sources, beide Templates (RESEARCH + DEEP_RESEARCH) bekommen optionalen preferred_sources_block. - Orchestrator._web_search_pipeline: Vorselektion vor researcher.search, Token-Usage in usage_acc, Logging der gewählten Domains. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
677 Zeilen
29 KiB
Python
677 Zeilen
29 KiB
Python
"""Researcher-Agent: Sucht nach Informationen via Claude WebSearch."""
|
|
import json
|
|
import logging
|
|
import re
|
|
from agents.claude_client import call_claude, ClaudeUsage
|
|
from config import CLAUDE_MODEL_FAST
|
|
|
|
logger = logging.getLogger("osint.researcher")
|
|
|
|
|
|
class ResearcherParseError(Exception):
|
|
"""Claude hat eine nicht-leere Antwort geliefert, aus der kein JSON extrahiert werden konnte."""
|
|
|
|
|
|
def _truncate_for_log(text: str, limit: int = 600) -> str:
|
|
"""Kürzt eine Claude-Antwort für Logs, damit ein Sample sichtbar ist."""
|
|
if not text:
|
|
return ""
|
|
snippet = text.strip().replace("\n", "\\n")
|
|
if len(snippet) > limit:
|
|
snippet = snippet[:limit] + "..."
|
|
return snippet
|
|
|
|
|
|
def _extract_json_array(text: str):
|
|
"""Findet das erste vollständige JSON-Array im Text (auch mit Vor-/Nachtext oder Markdown-Fence)."""
|
|
if not text:
|
|
return None
|
|
decoder = json.JSONDecoder()
|
|
idx = 0
|
|
while True:
|
|
bracket = text.find("[", idx)
|
|
if bracket == -1:
|
|
return None
|
|
try:
|
|
obj, _ = decoder.raw_decode(text, bracket)
|
|
except json.JSONDecodeError:
|
|
idx = bracket + 1
|
|
continue
|
|
if isinstance(obj, list):
|
|
return obj
|
|
idx = bracket + 1
|
|
|
|
|
|
def _extract_json_object(text: str):
|
|
"""Findet das erste vollständige JSON-Objekt im Text (auch mit Vor-/Nachtext oder Markdown-Fence)."""
|
|
if not text:
|
|
return None
|
|
decoder = json.JSONDecoder()
|
|
idx = 0
|
|
while True:
|
|
brace = text.find("{", idx)
|
|
if brace == -1:
|
|
return None
|
|
try:
|
|
obj, _ = decoder.raw_decode(text, brace)
|
|
except json.JSONDecodeError:
|
|
idx = brace + 1
|
|
continue
|
|
if isinstance(obj, dict):
|
|
return obj
|
|
idx = brace + 1
|
|
|
|
RESEARCH_PROMPT_TEMPLATE = """Du bist ein OSINT-Recherche-Agent für ein Lagemonitoring-System.
|
|
AUSGABESPRACHE: {output_language}
|
|
- KEINE Gedankenstriche (— oder –) verwenden, stattdessen Kommas, Doppelpunkte oder neue Saetze.
|
|
WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschreibungen (ae, oe, ue, ss).
|
|
|
|
AUFTRAG: Suche nach aktuellen Informationen zu folgendem Vorfall:
|
|
Titel: {title}
|
|
Kontext: {description}
|
|
{existing_context}{preferred_sources_block}
|
|
REGELN:
|
|
- Suche nur bei seriösen Nachrichtenquellen (Nachrichtenagenturen, Qualitätszeitungen, öffentlich-rechtliche Medien, Behörden)
|
|
- KEIN Social Media (Twitter/X, Facebook, Instagram, TikTok, Reddit)
|
|
- KEINE Boulevardmedien (Bild, Sun, Daily Mail etc.)
|
|
{language_instruction}
|
|
- Faktenbasiert und neutral - keine Spekulationen
|
|
- KRITISCH für source_url: Kopiere die EXAKTE URL aus den WebSearch-Ergebnissen. Erfinde oder konstruiere NIEMALS URLs aus Mustern oder Erinnerung. Wenn du die exakte URL eines Artikels nicht aus den Suchergebnissen hast, lass diesen Artikel komplett weg.
|
|
- Nutze removepaywalls.com für Paywall-geschützte Artikel (z.B. Spiegel+, Zeit+, SZ+): https://www.removepaywalls.com/search?url=ARTIKEL_URL
|
|
- Nutze WebFetch um die 3-5 wichtigsten Artikel vollständig abzurufen und zusammenzufassen
|
|
|
|
Gib die Ergebnisse AUSSCHLIESSLICH als JSON-Array zurück, ohne Erklärungen davor oder danach.
|
|
Jedes Element hat diese Felder:
|
|
- "headline": Originale Überschrift
|
|
- "headline_de": Übersetzung in Ausgabesprache (falls Originalsprache abweicht)
|
|
- "source": Name der Quelle (z.B. "Reuters", "tagesschau")
|
|
- "source_url": URL des Artikels
|
|
- "content_summary": Zusammenfassung des Inhalts (3-5 Sätze, in Ausgabesprache)
|
|
- "language": Sprache des Originals (z.B. "de", "en", "fr")
|
|
- "published_at": Veröffentlichungsdatum falls bekannt (ISO-Format)
|
|
|
|
Antworte NUR mit dem JSON-Array. Keine Einleitung, keine Erklärung."""
|
|
|
|
DEEP_RESEARCH_PROMPT_TEMPLATE = """Du bist ein OSINT-Tiefenrecherche-Agent für ein Lagemonitoring-System.
|
|
AUSGABESPRACHE: {output_language}
|
|
- KEINE Gedankenstriche (— oder –) verwenden, stattdessen Kommas, Doppelpunkte oder neue Saetze.
|
|
WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschreibungen (ae, oe, ue, ss).
|
|
|
|
AUFTRAG: Führe eine umfassende, mehrstufige Hintergrundrecherche durch zu:
|
|
Titel: {title}
|
|
Kontext: {description}
|
|
{existing_context}{preferred_sources_block}
|
|
RECHERCHE IN 4 PHASEN — Führe ALLE Phasen nacheinander durch:
|
|
|
|
PHASE 1 — BREITE ERFASSUNG:
|
|
Suche nach aktueller Berichterstattung bei Nachrichtenagenturen, Qualitätszeitungen und öffentlich-rechtlichen Medien. Nutze verschiedene Suchbegriffe und Blickwinkel. Ziel: 8-12 Quellen.
|
|
|
|
PHASE 2 — LÜCKENANALYSE:
|
|
Prüfe deine bisherigen Ergebnisse kritisch. Welche Quellentypen fehlen noch?
|
|
Typisch fehlen: Parlamentsdokumente, Gesetzestexte, NGO-/UN-Berichte, Think-Tank-Analysen, investigative Langform-Berichte, akademische Einordnungen, Fachmedien.
|
|
Welche Akteure, Perspektiven oder Dimensionen sind noch nicht abgedeckt?
|
|
|
|
PHASE 3 — GEZIELTE TIEFENRECHERCHE:
|
|
Suche GEZIELT nach den in Phase 2 identifizierten Lücken:
|
|
- Parlamentarische Quellen (Bundestagsdrucksachen, Congress.gov, Hansard, etc.)
|
|
- Offizielle Dokumente und Pressemitteilungen von Behörden
|
|
- NGO-Berichte und UN-Dokumente (ohchr.org, amnesty.org, hrw.org, etc.)
|
|
- Think-Tank-Analysen (IISS, Brookings, SWP, DGAP, Chatham House, etc.)
|
|
- Investigative Recherchen und Langform-Artikel
|
|
- Fachzeitschriften und akademische Einordnungen
|
|
Nutze spezifische Suchbegriffe für institutionelle Quellen. Ziel: 6-10 weitere Quellen.
|
|
|
|
PHASE 4 — VERIFIKATION UND VERTIEFUNG:
|
|
Nutze WebFetch um die 6-10 wichtigsten Artikel vollständig abzurufen und ausführlich zusammenzufassen.
|
|
Priorisiere dabei Primärquellen und investigative Berichte.
|
|
Nutze removepaywalls.com für Paywall-geschützte Artikel (z.B. https://www.removepaywalls.com/search?url=ARTIKEL_URL)
|
|
|
|
{language_instruction}
|
|
|
|
ZIEL: 15-25 hochwertige Quellen aus mindestens 5 verschiedenen Quellentypen:
|
|
- Nachrichtenagenturen/Qualitätspresse
|
|
- Investigative Berichte/Langform
|
|
- Parlamentarische/Regierungsquellen
|
|
- NGO/Internationale Organisationen
|
|
- Fachmedien/Akademische Quellen
|
|
|
|
AUSSCHLUSS:
|
|
- KEIN Social Media (Twitter/X, Facebook, Instagram, TikTok, Reddit)
|
|
- KEINE Boulevardmedien (Bild, Sun, Daily Mail etc.)
|
|
- KEINE Meinungsblogs ohne Quellenbelege
|
|
- KEINE erfundenen oder konstruierten URLs — gib bei source_url NUR die EXAKTE URL zurueck, die WebSearch tatsaechlich angezeigt hat. Wenn du die URL nicht aus den Suchergebnissen kopieren kannst, lass den Artikel weg.
|
|
|
|
Gib die Ergebnisse AUSSCHLIESSLICH als JSON-Array zurück, ohne Erklärungen davor oder danach.
|
|
Jedes Element hat diese Felder:
|
|
- "headline": Originale Überschrift
|
|
- "headline_de": Übersetzung in Ausgabesprache (falls Originalsprache abweicht)
|
|
- "source": Name der Quelle (z.B. "netzpolitik.org", "Handelsblatt")
|
|
- "source_url": URL des Artikels
|
|
- "content_summary": Ausführliche Zusammenfassung des Inhalts (5-8 Sätze, in Ausgabesprache)
|
|
- "language": Sprache des Originals (z.B. "de", "en", "fr")
|
|
- "published_at": Veröffentlichungsdatum falls bekannt (ISO-Format)
|
|
|
|
Antworte NUR mit dem JSON-Array. Keine Einleitung, keine Erklärung."""
|
|
|
|
# Sprach-Anweisungen
|
|
LANG_INTERNATIONAL = "- Suche in Deutsch UND Englisch für internationale Abdeckung"
|
|
LANG_GERMAN_ONLY = "- Suche NUR auf Deutsch bei deutschsprachigen Quellen (Deutschland, Österreich, Schweiz)\n- KEINE englischsprachigen oder anderssprachigen Quellen"
|
|
|
|
LANG_DEEP_INTERNATIONAL = "- Suche in Deutsch, Englisch und weiteren relevanten Sprachen"
|
|
LANG_DEEP_GERMAN_ONLY = "- Suche NUR auf Deutsch bei deutschsprachigen Quellen (Deutschland, Österreich, Schweiz)\n- KEINE englischsprachigen oder anderssprachigen Quellen"
|
|
|
|
|
|
FEED_SELECTION_PROMPT_TEMPLATE = """Du bist ein OSINT-Analyst. Wähle aus dieser Feed-Liste die Feeds aus, die für die Lage relevant sein könnten. Generiere außerdem optimierte Suchbegriffe für das RSS-Matching.
|
|
|
|
LAGE: {title}
|
|
KONTEXT: {description}
|
|
INTERNATIONALE QUELLEN: {international}
|
|
|
|
FEEDS:
|
|
{feed_list}
|
|
|
|
REGELN:
|
|
- Wähle alle Feeds die thematisch oder regional relevant sein könnten
|
|
- Lieber einen Feed zu viel als zu wenig auswählen
|
|
- Bei "Internationale Quellen: Nein": Keine internationalen Feeds auswählen
|
|
- Allgemeine Nachrichtenfeeds (tagesschau, Spiegel etc.) sind fast immer relevant
|
|
- QUELLENVIELFALT: Wähle pro Domain maximal 2-3 Feeds. Bevorzuge eine breite Mischung aus verschiedenen Quellen statt vieler Feeds derselben Domain.
|
|
|
|
KEYWORDS-REGELN:
|
|
- Generiere 5-10 thematisch relevante Suchbegriffe für das RSS-Matching
|
|
- Nur inhaltlich relevante Begriffe (Personen, Orte, Themen, Organisationen)
|
|
- KEINE Jahreszahlen (2024, 2025, 2026 etc.)
|
|
- KEINE Monatsnamen (Januar, Februar, März etc.)
|
|
- KEINE generischen Wörter (aktuell, news, update etc.)
|
|
- Begriffe in Kleinbuchstaben
|
|
- Sowohl deutsche als auch englische Begriffe wo sinnvoll
|
|
|
|
Antworte NUR mit einem JSON-Objekt in diesem Format:
|
|
{{"feeds": [1, 2, 5, 12], "keywords": ["begriff1", "begriff2", "begriff3"]}}"""
|
|
|
|
|
|
KEYWORD_EXTRACTION_PROMPT = """Analysiere diese aktuellen Nachrichten-Headlines und extrahiere die wichtigsten Suchbegriffe fuer RSS-Feed-Filterung.
|
|
|
|
THEMA: {title}
|
|
|
|
AKTUELLE HEADLINES (die letzten Meldungen zu diesem Thema):
|
|
{headlines}
|
|
|
|
AUFGABE:
|
|
Generiere 5 Begriffspaare (DE + EN), mit denen neue RSS-Artikel zu diesem Thema gefunden werden.
|
|
Ein Artikel gilt als relevant, wenn mindestens 2 dieser Begriffe im Titel oder der Beschreibung vorkommen.
|
|
|
|
REGELN:
|
|
- Die ersten 2 Begriffspaare MUESSEN die zentralen Akteure/Laender/Themen sein (z.B. iran, israel, usa) — also die Begriffe, die in fast JEDEM Artikel zum Thema vorkommen
|
|
- Die letzten 3 Begriffspaare sind aktuelle Entwicklungen aus den Headlines (Orte, Akteure, Schluesselwoerter der aktuellen Phase)
|
|
- Begriffe muessen so gewaehlt sein, dass sie in kurzen RSS-Titeln matchen (einzelne Woerter, keine Phrasen)
|
|
- Alle Begriffe in Kleinbuchstaben
|
|
- Exakt 5 Begriffspaare
|
|
|
|
Antwort NUR als JSON-Array:
|
|
[{{"de": "iran", "en": "iran"}}, {{"de": "israel", "en": "israel"}}, {{"de": "teheran", "en": "tehran"}}, {{"de": "luftangriff", "en": "airstrike"}}, {{"de": "trump", "en": "trump"}}]"""
|
|
|
|
|
|
WEB_SOURCE_SELECTION_PROMPT = """Du bist ein OSINT-Analyst. Pruefe diese eingetragenen Web-Quellen und waehle nur die thematisch passenden aus.
|
|
|
|
LAGE: {title}
|
|
KONTEXT: {description}
|
|
|
|
WEB-QUELLEN:
|
|
{source_list}
|
|
|
|
REGELN:
|
|
- Waehle nur Quellen, die thematisch tatsaechlich zur Lage passen
|
|
- Lieber leere Liste zurueckgeben als pauschal alle aufnehmen
|
|
- Behoerden- und institutionelle Quellen sind oft hochwertig, aber nur wenn das Thema passt
|
|
- Petitions-Plattformen z.B. nur bei Lagen zu Buergerinitiativen, Gesetzen, oeffentlichem Druck
|
|
- Bei reinen Kriegs-/Konflikt-/Tagesnachrichten meistens leere Liste
|
|
|
|
Antworte NUR mit einem JSON-Array der Quellen-Nummern, z.B. [1, 3] oder []."""
|
|
|
|
|
|
TELEGRAM_CHANNEL_SELECTION_PROMPT = """Du bist ein OSINT-Analyst. Waehle aus dieser Liste von Telegram-Kanaelen diejenigen aus, die fuer die Lage relevant sein koennten.
|
|
|
|
LAGE: {title}
|
|
KONTEXT: {description}
|
|
|
|
TELEGRAM-KANAELE:
|
|
{channel_list}
|
|
|
|
REGELN:
|
|
- Waehle alle Kanaele die thematisch relevant sein koennten
|
|
- Lieber einen Kanal zu viel als zu wenig auswaehlen
|
|
- Beachte die Kategorie und Beschreibung jedes Kanals
|
|
- Allgemeine OSINT-Kanaele sind oft relevant
|
|
- Bei Cybercrime-Themen: Cybercrime + Leaks Kanaele waehlen
|
|
- Bei geopolitischen Themen: Relevante Laender-/Regionskanaele waehlen
|
|
|
|
Antworte NUR mit einem JSON-Array der Kanal-Nummern, z.B.: [1, 3, 5, 12]"""
|
|
|
|
|
|
class ResearcherAgent:
|
|
"""Führt OSINT-Recherchen über Claude CLI WebSearch durch."""
|
|
|
|
async def select_relevant_feeds(
|
|
self,
|
|
title: str,
|
|
description: str,
|
|
international: bool,
|
|
feeds_metadata: list[dict],
|
|
) -> tuple[list[dict], list[str] | None, ClaudeUsage | None]:
|
|
"""Lässt Claude die relevanten Feeds für eine Lage vorauswählen.
|
|
|
|
Nutzt Haiku (CLAUDE_MODEL_FAST) für diese einfache Aufgabe.
|
|
|
|
Returns:
|
|
(ausgewählte Feeds, keywords, usage) — Bei Fehler: (alle Feeds, None, None)
|
|
"""
|
|
# Feed-Liste als nummerierte Übersicht formatieren
|
|
feed_lines = []
|
|
for i, feed in enumerate(feeds_metadata, 1):
|
|
feed_lines.append(
|
|
f"{i}. {feed['name']} ({feed['domain']}) [{feed['category']}]"
|
|
)
|
|
|
|
prompt = FEED_SELECTION_PROMPT_TEMPLATE.format(
|
|
title=title,
|
|
description=description or "Keine weitere Beschreibung",
|
|
international="Ja" if international else "Nein",
|
|
feed_list="\n".join(feed_lines),
|
|
)
|
|
|
|
try:
|
|
result, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST)
|
|
|
|
keywords = None
|
|
indices = None
|
|
|
|
# Neues Format: {"feeds": [...], "keywords": [...]}
|
|
obj = _extract_json_object(result)
|
|
if isinstance(obj, dict) and isinstance(obj.get("feeds"), list):
|
|
indices = obj["feeds"]
|
|
raw_keywords = obj.get("keywords", [])
|
|
if isinstance(raw_keywords, list) and raw_keywords:
|
|
keywords = [str(k).lower().strip() for k in raw_keywords if k]
|
|
logger.info(f"Feed-Selektion Keywords: {keywords}")
|
|
|
|
# Fallback: nacktes Array
|
|
if indices is None:
|
|
arr = _extract_json_array(result)
|
|
if not isinstance(arr, list):
|
|
logger.warning(
|
|
"Feed-Selektion: Kein JSON in Antwort, nutze alle Feeds. Sample: %s",
|
|
_truncate_for_log(result),
|
|
)
|
|
return feeds_metadata, None, usage
|
|
indices = arr
|
|
|
|
selected = []
|
|
for idx in indices:
|
|
if isinstance(idx, int) and 1 <= idx <= len(feeds_metadata):
|
|
selected.append(feeds_metadata[idx - 1])
|
|
|
|
if not selected:
|
|
logger.warning("Feed-Selektion: Keine gültigen Indizes, nutze alle Feeds")
|
|
return feeds_metadata, keywords, usage
|
|
|
|
logger.info(
|
|
f"Feed-Selektion: {len(selected)} von {len(feeds_metadata)} Feeds ausgewählt"
|
|
)
|
|
return selected, keywords, usage
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Feed-Selektion fehlgeschlagen ({e}), nutze alle Feeds")
|
|
return feeds_metadata, None, None
|
|
|
|
|
|
async def extract_dynamic_keywords(
|
|
self, title: str, recent_headlines: list[str]
|
|
) -> tuple[list[str] | None, ClaudeUsage | None]:
|
|
"""Extrahiert aktuelle Suchbegriffe aus den letzten Headlines via Haiku.
|
|
|
|
Returns:
|
|
(flache Keyword-Liste DE+EN, usage) oder (None, None) bei Fehler
|
|
"""
|
|
if not recent_headlines:
|
|
return None, None
|
|
|
|
headlines_text = "\n".join(f"- {h}" for h in recent_headlines[:30])
|
|
prompt = KEYWORD_EXTRACTION_PROMPT.format(
|
|
title=title,
|
|
headlines=headlines_text,
|
|
)
|
|
|
|
try:
|
|
result, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST)
|
|
|
|
parsed = _extract_json_array(result)
|
|
if not isinstance(parsed, list):
|
|
logger.warning(
|
|
"Keyword-Extraktion: Kein gueltiges JSON erhalten. Sample: %s",
|
|
_truncate_for_log(result),
|
|
)
|
|
return None, usage
|
|
|
|
# Flache Liste: alle DE + EN Begriffe
|
|
keywords = []
|
|
for entry in parsed:
|
|
if not isinstance(entry, dict):
|
|
continue
|
|
de = entry.get("de", "").lower().strip()
|
|
en = entry.get("en", "").lower().strip()
|
|
if de:
|
|
keywords.append(de)
|
|
if en and en != de:
|
|
keywords.append(en)
|
|
|
|
if keywords:
|
|
logger.info(f"Dynamische Keywords ({len(keywords)}): {keywords}")
|
|
return keywords if keywords else None, usage
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Keyword-Extraktion fehlgeschlagen: {e}")
|
|
return None, None
|
|
|
|
async def search(self, title: str, description: str = "", incident_type: str = "adhoc", international: bool = True, user_id: int = None, existing_articles: list[dict] = None, preferred_sources: list[dict] = None) -> tuple[list[dict], ClaudeUsage | None, bool]:
|
|
"""Sucht nach Informationen zu einem Vorfall.
|
|
|
|
Returns:
|
|
(artikel, usage, parse_failed) — parse_failed ist True, wenn Claude geantwortet hat,
|
|
das JSON aber nicht extrahierbar war. So kann der Orchestrator zwischen
|
|
"echt keine Treffer" und "kaputte Antwort" unterscheiden.
|
|
"""
|
|
from config import OUTPUT_LANGUAGE
|
|
|
|
# Bevorzugte Web-Quellen als Prompt-Block (optional)
|
|
preferred_sources_block = ""
|
|
if preferred_sources:
|
|
ps_lines = []
|
|
for s in preferred_sources:
|
|
domain = s.get("domain", "")
|
|
name = s.get("name", domain) or domain
|
|
if not domain:
|
|
continue
|
|
ps_lines.append(f"- {domain} ({name})")
|
|
if ps_lines:
|
|
preferred_sources_block = (
|
|
"\nEINGETRAGENE WEB-QUELLEN (vom Betreiber als seriös markiert):\n"
|
|
+ "\n".join(ps_lines) + "\n"
|
|
"EMPFEHLUNG: Wenn diese Domains thematisch zur Lage passen, suche dort gezielt "
|
|
"mit \"site:domain [Suchbegriff]\". Sie sind vertrauenswuerdig eingetragen, ersetzen "
|
|
"aber nicht deine sonstige Recherche.\n"
|
|
)
|
|
|
|
if incident_type == "research":
|
|
lang_instruction = LANG_DEEP_INTERNATIONAL if international else LANG_DEEP_GERMAN_ONLY
|
|
# Bestehende Artikel als Kontext für den Prompt aufbereiten
|
|
existing_context = ""
|
|
if existing_articles:
|
|
known_lines = []
|
|
for art in existing_articles[:50]: # Max 50 um Prompt nicht zu überladen
|
|
source = art.get("source", "Unbekannt")
|
|
headline = art.get("headline", "")
|
|
url = art.get("source_url", "")
|
|
known_lines.append(f"- {source}: {headline} ({url})")
|
|
existing_context = (
|
|
"BEREITS BEKANNTE QUELLEN — NICHT erneut suchen, finde ANDERE:\n"
|
|
+ "\n".join(known_lines) + "\n\n"
|
|
"Fokussiere dich auf Quellen und Perspektiven, die in der obigen Liste FEHLEN.\n"
|
|
)
|
|
prompt = DEEP_RESEARCH_PROMPT_TEMPLATE.format(
|
|
title=title, description=description, language_instruction=lang_instruction,
|
|
output_language=OUTPUT_LANGUAGE, existing_context=existing_context,
|
|
preferred_sources_block=preferred_sources_block,
|
|
)
|
|
else:
|
|
lang_instruction = LANG_INTERNATIONAL if international else LANG_GERMAN_ONLY
|
|
# Bestehende Artikel als Kontext: bei Folge-Refreshes findet Claude andere Quellen
|
|
existing_context = ""
|
|
if existing_articles:
|
|
known_lines = []
|
|
for art in existing_articles[:30]: # Max 30 bei adhoc (kompakter als research)
|
|
source = art.get("source", "Unbekannt")
|
|
headline = art.get("headline", "")
|
|
known_lines.append(f"- {source}: {headline}")
|
|
existing_context = (
|
|
"BEREITS BEKANNTE QUELLEN (aus RSS-Feeds und vorherigen Recherchen) — suche ANDERE Blickwinkel und Quellen:\n"
|
|
+ "\n".join(known_lines) + "\n"
|
|
)
|
|
prompt = RESEARCH_PROMPT_TEMPLATE.format(
|
|
title=title, description=description, language_instruction=lang_instruction,
|
|
output_language=OUTPUT_LANGUAGE, existing_context=existing_context,
|
|
preferred_sources_block=preferred_sources_block,
|
|
)
|
|
|
|
try:
|
|
result, usage = await call_claude(prompt)
|
|
try:
|
|
articles = self._parse_response(result)
|
|
except ResearcherParseError as parse_err:
|
|
# Claude hat geantwortet, aber kein verwertbares JSON dabei.
|
|
# Usage trotzdem zurueckgeben, damit Credits korrekt verbucht werden.
|
|
logger.warning("Claude-Recherche: %s", parse_err)
|
|
return [], usage, True
|
|
|
|
# Ausgeschlossene Quellen dynamisch aus DB laden
|
|
excluded_sources = await self._get_excluded_sources(user_id=user_id)
|
|
|
|
# Ausgeschlossene Quellen filtern
|
|
filtered = []
|
|
for article in articles:
|
|
source = article.get("source", "").lower()
|
|
source_url = article.get("source_url", "").lower()
|
|
excluded = False
|
|
for excl in excluded_sources:
|
|
if excl in source or excl in source_url:
|
|
excluded = True
|
|
break
|
|
if not excluded:
|
|
# Bei nur-deutsch: nicht-deutsche Ergebnisse nachfiltern
|
|
if not international and article.get("language", "de") != "de":
|
|
continue
|
|
filtered.append(article)
|
|
|
|
logger.info(f"Recherche ergab {len(filtered)} Artikel (von {len(articles)} gefundenen, international={international})")
|
|
return filtered, usage, False
|
|
|
|
except TimeoutError:
|
|
raise # Timeout nach oben durchreichen fuer Retry im Orchestrator
|
|
except Exception as e:
|
|
logger.error(f"Recherche-Fehler: {e}")
|
|
return [], None, False
|
|
|
|
async def _get_excluded_sources(self, user_id: int = None) -> list[str]:
|
|
"""Laedt ausgeschlossene Quellen (global + per-User)."""
|
|
try:
|
|
from source_rules import get_source_rules, get_user_excluded_domains
|
|
rules = await get_source_rules()
|
|
excluded = list(rules.get("excluded_domains", []))
|
|
|
|
# User-spezifische Ausschluesse hinzufuegen
|
|
if user_id:
|
|
user_excluded = await get_user_excluded_domains(user_id)
|
|
for domain in user_excluded:
|
|
if domain not in excluded:
|
|
excluded.append(domain)
|
|
|
|
return excluded
|
|
except Exception as e:
|
|
logger.warning(f"Fallback auf config.py fuer Excluded Sources: {e}")
|
|
from config import EXCLUDED_SOURCES
|
|
return list(EXCLUDED_SOURCES)
|
|
|
|
def _parse_response(self, response: str) -> list[dict]:
|
|
"""Parst die Claude-Antwort als JSON-Array.
|
|
|
|
Wirft ResearcherParseError, wenn die Antwort nicht-leer ist, sich aber
|
|
kein JSON extrahieren laesst. Eine echte leere Liste (z.B. wenn Claude
|
|
wirklich keine Treffer hat) wird als [] zurueckgegeben.
|
|
"""
|
|
text = (response or "").strip()
|
|
if not text:
|
|
return []
|
|
|
|
# 1) Direkt parsen (Antwort ist bereits sauberes JSON)
|
|
try:
|
|
data = json.loads(text)
|
|
if isinstance(data, list):
|
|
return data
|
|
if isinstance(data, dict) and isinstance(data.get("articles"), list):
|
|
return data["articles"]
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
# 2) JSON-Array irgendwo im Text (Markdown-Fence oder Vor-/Nachtext)
|
|
arr = _extract_json_array(text)
|
|
if isinstance(arr, list):
|
|
return arr
|
|
|
|
# 3) JSON-Objekt mit "articles"-Key
|
|
obj = _extract_json_object(text)
|
|
if isinstance(obj, dict) and isinstance(obj.get("articles"), list):
|
|
return obj["articles"]
|
|
|
|
# 4) Recovery: einzelne Headline-Objekte aus Fliesstext
|
|
recovered = []
|
|
for obj_str in re.findall(r'\{[^{}]*"headline"[^{}]*\}', text, re.DOTALL):
|
|
try:
|
|
parsed = json.loads(obj_str)
|
|
except json.JSONDecodeError:
|
|
continue
|
|
if isinstance(parsed, dict) and "headline" in parsed:
|
|
recovered.append(parsed)
|
|
if recovered:
|
|
logger.info("JSON-Recovery: %d Artikel aus Einzelobjekten extrahiert", len(recovered))
|
|
return recovered
|
|
|
|
# Parse fehlgeschlagen — Claude hat geantwortet, aber kein verwertbares JSON dabei.
|
|
# Sample loggen, damit der Fehler debuggbar ist, und Aufrufer signalisieren.
|
|
logger.warning(
|
|
"Konnte Claude-Antwort nicht als JSON parsen (Laenge: %d). Sample: %s",
|
|
len(text),
|
|
_truncate_for_log(text),
|
|
)
|
|
raise ResearcherParseError(f"Claude-Antwort enthielt kein verwertbares JSON (Laenge: {len(text)})")
|
|
|
|
async def select_relevant_web_sources(
|
|
self,
|
|
title: str,
|
|
description: str,
|
|
web_sources: list[dict],
|
|
) -> tuple[list[dict], ClaudeUsage | None]:
|
|
"""Laesst Claude die thematisch passenden Web-Quellen auswaehlen (Haiku).
|
|
|
|
Returns:
|
|
(ausgewaehlte Quellen, usage). Bei Fehler: ([], None).
|
|
Leere Auswahl ist explizit erlaubt — keine Quelle wird zwangsweise aufgenommen.
|
|
"""
|
|
if not web_sources:
|
|
return [], None
|
|
|
|
# Bei sehr wenigen Quellen lohnt der Selektions-Call kaum — alle weiterreichen.
|
|
if len(web_sources) <= 3:
|
|
logger.info("Web-Source-Selektion: Nur %d Quellen, alle uebernehmen", len(web_sources))
|
|
return list(web_sources), None
|
|
|
|
lines = []
|
|
for i, src in enumerate(web_sources, 1):
|
|
cat = src.get("category", "sonstige")
|
|
notes = (src.get("notes") or "")[:80]
|
|
domain = src.get("domain", "")
|
|
line = f"{i}. {src.get('name', domain)} ({domain}) [{cat}]"
|
|
if notes:
|
|
line += f" - {notes}"
|
|
lines.append(line)
|
|
|
|
prompt = WEB_SOURCE_SELECTION_PROMPT.format(
|
|
title=title,
|
|
description=description or "Keine weitere Beschreibung",
|
|
source_list="\n".join(lines),
|
|
)
|
|
|
|
try:
|
|
result, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST)
|
|
indices = _extract_json_array(result)
|
|
if not isinstance(indices, list):
|
|
logger.warning(
|
|
"Web-Source-Selektion: Kein JSON in Antwort, ignoriere Quellen. Sample: %s",
|
|
_truncate_for_log(result),
|
|
)
|
|
return [], usage
|
|
|
|
selected = []
|
|
for idx in indices:
|
|
if isinstance(idx, int) and 1 <= idx <= len(web_sources):
|
|
selected.append(web_sources[idx - 1])
|
|
|
|
logger.info(
|
|
"Web-Source-Selektion: %d von %d ausgewaehlt%s",
|
|
len(selected), len(web_sources),
|
|
f" ({', '.join(s.get('domain', '') for s in selected)})" if selected else "",
|
|
)
|
|
return selected, usage
|
|
except Exception as e:
|
|
logger.warning("Web-Source-Selektion fehlgeschlagen (%s)", e)
|
|
return [], None
|
|
|
|
async def select_relevant_telegram_channels(
|
|
self,
|
|
title: str,
|
|
description: str,
|
|
channels_metadata: list[dict],
|
|
) -> tuple[list[dict], ClaudeUsage | None]:
|
|
"""Laesst Claude die relevanten Telegram-Kanaele fuer eine Lage vorauswaehlen.
|
|
|
|
Nutzt Haiku (CLAUDE_MODEL_FAST) fuer diese einfache Aufgabe.
|
|
|
|
Returns:
|
|
(ausgewaehlte Kanaele, usage) -- Bei Fehler: (alle Kanaele, None)
|
|
"""
|
|
if len(channels_metadata) <= 10:
|
|
logger.info("Telegram-Selektion: Nur %d Kanaele, nutze alle", len(channels_metadata))
|
|
return channels_metadata, None
|
|
|
|
channel_lines = []
|
|
for i, ch in enumerate(channels_metadata, 1):
|
|
cat = ch.get("category", "sonstige")
|
|
notes = (ch.get("notes") or "")[:100]
|
|
channel_lines.append(f"{i}. {ch['name']} [{cat}] - {notes}")
|
|
|
|
prompt = TELEGRAM_CHANNEL_SELECTION_PROMPT.format(
|
|
title=title,
|
|
description=description or "Keine weitere Beschreibung",
|
|
channel_list="\n".join(channel_lines),
|
|
)
|
|
|
|
try:
|
|
result, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST)
|
|
|
|
indices = _extract_json_array(result)
|
|
if not isinstance(indices, list):
|
|
logger.warning(
|
|
"Telegram-Selektion: Kein JSON in Antwort, nutze alle Kanaele. Sample: %s",
|
|
_truncate_for_log(result),
|
|
)
|
|
return channels_metadata, usage
|
|
|
|
selected = []
|
|
for idx in indices:
|
|
if isinstance(idx, int) and 1 <= idx <= len(channels_metadata):
|
|
selected.append(channels_metadata[idx - 1])
|
|
|
|
if not selected:
|
|
logger.warning("Telegram-Selektion: Keine gueltigen Indizes, nutze alle Kanaele")
|
|
return channels_metadata, usage
|
|
|
|
logger.info(
|
|
"Telegram-Selektion: %d von %d Kanaelen ausgewaehlt",
|
|
len(selected), len(channels_metadata)
|
|
)
|
|
return selected, usage
|
|
|
|
except Exception as e:
|
|
logger.warning("Telegram-Selektion fehlgeschlagen (%s), nutze alle Kanaele", e)
|
|
return channels_metadata, None
|
|
|