"""Researcher-Agent: Sucht nach Informationen via Claude WebSearch.""" import json import logging import re from agents.claude_client import call_claude, ClaudeUsage from config import CLAUDE_MODEL_FAST logger = logging.getLogger("osint.researcher") RESEARCH_PROMPT_TEMPLATE = """Du bist ein OSINT-Recherche-Agent für ein Lagemonitoring-System. AUSGABESPRACHE: {output_language} - KEINE Gedankenstriche (— oder –) verwenden, stattdessen Kommas, Doppelpunkte oder neue Saetze. WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschreibungen (ae, oe, ue, ss). AUFTRAG: Suche nach aktuellen Informationen zu folgendem Vorfall: Titel: {title} Kontext: {description} {existing_context} REGELN: - Suche nur bei seriösen Nachrichtenquellen (Nachrichtenagenturen, Qualitätszeitungen, öffentlich-rechtliche Medien, Behörden) - KEIN Social Media (Twitter/X, Facebook, Instagram, TikTok, Reddit) - KEINE Boulevardmedien (Bild, Sun, Daily Mail etc.) {language_instruction} - Faktenbasiert und neutral - keine Spekulationen - KRITISCH für source_url: Kopiere die EXAKTE URL aus den WebSearch-Ergebnissen. Erfinde oder konstruiere NIEMALS URLs aus Mustern oder Erinnerung. Wenn du die exakte URL eines Artikels nicht aus den Suchergebnissen hast, lass diesen Artikel komplett weg. - Nutze removepaywalls.com für Paywall-geschützte Artikel (z.B. Spiegel+, Zeit+, SZ+): https://www.removepaywalls.com/search?url=ARTIKEL_URL - Nutze WebFetch um die 3-5 wichtigsten Artikel vollständig abzurufen und zusammenzufassen Gib die Ergebnisse AUSSCHLIESSLICH als JSON-Array zurück, ohne Erklärungen davor oder danach. Jedes Element hat diese Felder: - "headline": Originale Überschrift - "headline_de": Übersetzung in Ausgabesprache (falls Originalsprache abweicht) - "source": Name der Quelle (z.B. "Reuters", "tagesschau") - "source_url": URL des Artikels - "content_summary": Zusammenfassung des Inhalts (3-5 Sätze, in Ausgabesprache) - "language": Sprache des Originals (z.B. "de", "en", "fr") - "published_at": Veröffentlichungsdatum falls bekannt (ISO-Format) Antworte NUR mit dem JSON-Array. Keine Einleitung, keine Erklärung.""" DEEP_RESEARCH_PROMPT_TEMPLATE = """Du bist ein OSINT-Tiefenrecherche-Agent für ein Lagemonitoring-System. AUSGABESPRACHE: {output_language} - KEINE Gedankenstriche (— oder –) verwenden, stattdessen Kommas, Doppelpunkte oder neue Saetze. WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschreibungen (ae, oe, ue, ss). AUFTRAG: Führe eine umfassende, mehrstufige Hintergrundrecherche durch zu: Titel: {title} Kontext: {description} {existing_context} RECHERCHE IN 4 PHASEN — Führe ALLE Phasen nacheinander durch: PHASE 1 — BREITE ERFASSUNG: Suche nach aktueller Berichterstattung bei Nachrichtenagenturen, Qualitätszeitungen und öffentlich-rechtlichen Medien. Nutze verschiedene Suchbegriffe und Blickwinkel. Ziel: 8-12 Quellen. PHASE 2 — LÜCKENANALYSE: Prüfe deine bisherigen Ergebnisse kritisch. Welche Quellentypen fehlen noch? Typisch fehlen: Parlamentsdokumente, Gesetzestexte, NGO-/UN-Berichte, Think-Tank-Analysen, investigative Langform-Berichte, akademische Einordnungen, Fachmedien. Welche Akteure, Perspektiven oder Dimensionen sind noch nicht abgedeckt? PHASE 3 — GEZIELTE TIEFENRECHERCHE: Suche GEZIELT nach den in Phase 2 identifizierten Lücken: - Parlamentarische Quellen (Bundestagsdrucksachen, Congress.gov, Hansard, etc.) - Offizielle Dokumente und Pressemitteilungen von Behörden - NGO-Berichte und UN-Dokumente (ohchr.org, amnesty.org, hrw.org, etc.) - Think-Tank-Analysen (IISS, Brookings, SWP, DGAP, Chatham House, etc.) - Investigative Recherchen und Langform-Artikel - Fachzeitschriften und akademische Einordnungen Nutze spezifische Suchbegriffe für institutionelle Quellen. Ziel: 6-10 weitere Quellen. PHASE 4 — VERIFIKATION UND VERTIEFUNG: Nutze WebFetch um die 6-10 wichtigsten Artikel vollständig abzurufen und ausführlich zusammenzufassen. Priorisiere dabei Primärquellen und investigative Berichte. Nutze removepaywalls.com für Paywall-geschützte Artikel (z.B. https://www.removepaywalls.com/search?url=ARTIKEL_URL) {language_instruction} ZIEL: 15-25 hochwertige Quellen aus mindestens 5 verschiedenen Quellentypen: - Nachrichtenagenturen/Qualitätspresse - Investigative Berichte/Langform - Parlamentarische/Regierungsquellen - NGO/Internationale Organisationen - Fachmedien/Akademische Quellen AUSSCHLUSS: - KEIN Social Media (Twitter/X, Facebook, Instagram, TikTok, Reddit) - KEINE Boulevardmedien (Bild, Sun, Daily Mail etc.) - KEINE Meinungsblogs ohne Quellenbelege - KEINE erfundenen oder konstruierten URLs — gib bei source_url NUR die EXAKTE URL zurueck, die WebSearch tatsaechlich angezeigt hat. Wenn du die URL nicht aus den Suchergebnissen kopieren kannst, lass den Artikel weg. Gib die Ergebnisse AUSSCHLIESSLICH als JSON-Array zurück, ohne Erklärungen davor oder danach. Jedes Element hat diese Felder: - "headline": Originale Überschrift - "headline_de": Übersetzung in Ausgabesprache (falls Originalsprache abweicht) - "source": Name der Quelle (z.B. "netzpolitik.org", "Handelsblatt") - "source_url": URL des Artikels - "content_summary": Ausführliche Zusammenfassung des Inhalts (5-8 Sätze, in Ausgabesprache) - "language": Sprache des Originals (z.B. "de", "en", "fr") - "published_at": Veröffentlichungsdatum falls bekannt (ISO-Format) Antworte NUR mit dem JSON-Array. Keine Einleitung, keine Erklärung.""" # Sprach-Anweisungen LANG_INTERNATIONAL = "- Suche in Deutsch UND Englisch für internationale Abdeckung" LANG_GERMAN_ONLY = "- Suche NUR auf Deutsch bei deutschsprachigen Quellen (Deutschland, Österreich, Schweiz)\n- KEINE englischsprachigen oder anderssprachigen Quellen" LANG_DEEP_INTERNATIONAL = "- Suche in Deutsch, Englisch und weiteren relevanten Sprachen" LANG_DEEP_GERMAN_ONLY = "- Suche NUR auf Deutsch bei deutschsprachigen Quellen (Deutschland, Österreich, Schweiz)\n- KEINE englischsprachigen oder anderssprachigen Quellen" FEED_SELECTION_PROMPT_TEMPLATE = """Du bist ein OSINT-Analyst. Wähle aus dieser Feed-Liste die Feeds aus, die für die Lage relevant sein könnten. Generiere außerdem optimierte Suchbegriffe für das RSS-Matching. LAGE: {title} KONTEXT: {description} INTERNATIONALE QUELLEN: {international} FEEDS: {feed_list} REGELN: - Wähle alle Feeds die thematisch oder regional relevant sein könnten - Lieber einen Feed zu viel als zu wenig auswählen - Bei "Internationale Quellen: Nein": Keine internationalen Feeds auswählen - Allgemeine Nachrichtenfeeds (tagesschau, Spiegel etc.) sind fast immer relevant - QUELLENVIELFALT: Wähle pro Domain maximal 2-3 Feeds. Bevorzuge eine breite Mischung aus verschiedenen Quellen statt vieler Feeds derselben Domain. KEYWORDS-REGELN: - Generiere 5-10 thematisch relevante Suchbegriffe für das RSS-Matching - Nur inhaltlich relevante Begriffe (Personen, Orte, Themen, Organisationen) - KEINE Jahreszahlen (2024, 2025, 2026 etc.) - KEINE Monatsnamen (Januar, Februar, März etc.) - KEINE generischen Wörter (aktuell, news, update etc.) - Begriffe in Kleinbuchstaben - Sowohl deutsche als auch englische Begriffe wo sinnvoll Antworte NUR mit einem JSON-Objekt in diesem Format: {{"feeds": [1, 2, 5, 12], "keywords": ["begriff1", "begriff2", "begriff3"]}}""" KEYWORD_EXTRACTION_PROMPT = """Analysiere diese aktuellen Nachrichten-Headlines und extrahiere die wichtigsten Suchbegriffe fuer RSS-Feed-Filterung. THEMA: {title} AKTUELLE HEADLINES (die letzten Meldungen zu diesem Thema): {headlines} AUFGABE: Generiere 5 Begriffspaare (DE + EN), mit denen neue RSS-Artikel zu diesem Thema gefunden werden. Ein Artikel gilt als relevant, wenn mindestens 2 dieser Begriffe im Titel oder der Beschreibung vorkommen. REGELN: - Die ersten 2 Begriffspaare MUESSEN die zentralen Akteure/Laender/Themen sein (z.B. iran, israel, usa) — also die Begriffe, die in fast JEDEM Artikel zum Thema vorkommen - Die letzten 3 Begriffspaare sind aktuelle Entwicklungen aus den Headlines (Orte, Akteure, Schluesselwoerter der aktuellen Phase) - Begriffe muessen so gewaehlt sein, dass sie in kurzen RSS-Titeln matchen (einzelne Woerter, keine Phrasen) - Alle Begriffe in Kleinbuchstaben - Exakt 5 Begriffspaare Antwort NUR als JSON-Array: [{{"de": "iran", "en": "iran"}}, {{"de": "israel", "en": "israel"}}, {{"de": "teheran", "en": "tehran"}}, {{"de": "luftangriff", "en": "airstrike"}}, {{"de": "trump", "en": "trump"}}]""" TELEGRAM_CHANNEL_SELECTION_PROMPT = """Du bist ein OSINT-Analyst. Waehle aus dieser Liste von Telegram-Kanaelen diejenigen aus, die fuer die Lage relevant sein koennten. LAGE: {title} KONTEXT: {description} TELEGRAM-KANAELE: {channel_list} REGELN: - Waehle alle Kanaele die thematisch relevant sein koennten - Lieber einen Kanal zu viel als zu wenig auswaehlen - Beachte die Kategorie und Beschreibung jedes Kanals - Allgemeine OSINT-Kanaele sind oft relevant - Bei Cybercrime-Themen: Cybercrime + Leaks Kanaele waehlen - Bei geopolitischen Themen: Relevante Laender-/Regionskanaele waehlen Antworte NUR mit einem JSON-Array der Kanal-Nummern, z.B.: [1, 3, 5, 12]""" class ResearcherAgent: """Führt OSINT-Recherchen über Claude CLI WebSearch durch.""" async def select_relevant_feeds( self, title: str, description: str, international: bool, feeds_metadata: list[dict], ) -> tuple[list[dict], list[str] | None, ClaudeUsage | None]: """Lässt Claude die relevanten Feeds für eine Lage vorauswählen. Nutzt Haiku (CLAUDE_MODEL_FAST) für diese einfache Aufgabe. Returns: (ausgewählte Feeds, keywords, usage) — Bei Fehler: (alle Feeds, None, None) """ # Feed-Liste als nummerierte Übersicht formatieren feed_lines = [] for i, feed in enumerate(feeds_metadata, 1): feed_lines.append( f"{i}. {feed['name']} ({feed['domain']}) [{feed['category']}]" ) prompt = FEED_SELECTION_PROMPT_TEMPLATE.format( title=title, description=description or "Keine weitere Beschreibung", international="Ja" if international else "Nein", feed_list="\n".join(feed_lines), ) try: result, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST) # Neues Format: JSON-Objekt mit "feeds" und "keywords" keywords = None indices = None # Versuche JSON-Objekt zu parsen obj_match = re.search(r'\{[^{}]*"feeds"\s*:\s*\[[\d\s,]+\][^{}]*\}', result, re.DOTALL) if obj_match: try: obj = json.loads(obj_match.group()) indices = obj.get("feeds", []) raw_keywords = obj.get("keywords", []) if isinstance(raw_keywords, list) and raw_keywords: keywords = [str(k).lower().strip() for k in raw_keywords if k] logger.info(f"Feed-Selektion Keywords: {keywords}") except (json.JSONDecodeError, ValueError): pass # Fallback: altes Array-Format if indices is None: arr_match = re.search(r'\[[\d\s,]+\]', result) if not arr_match: logger.warning("Feed-Selektion: Kein JSON in Antwort, nutze alle Feeds") return feeds_metadata, None, usage indices = json.loads(arr_match.group()) selected = [] for idx in indices: if isinstance(idx, int) and 1 <= idx <= len(feeds_metadata): selected.append(feeds_metadata[idx - 1]) if not selected: logger.warning("Feed-Selektion: Keine gültigen Indizes, nutze alle Feeds") return feeds_metadata, keywords, usage logger.info( f"Feed-Selektion: {len(selected)} von {len(feeds_metadata)} Feeds ausgewählt" ) return selected, keywords, usage except Exception as e: logger.warning(f"Feed-Selektion fehlgeschlagen ({e}), nutze alle Feeds") return feeds_metadata, None, None async def extract_dynamic_keywords( self, title: str, recent_headlines: list[str] ) -> tuple[list[str] | None, ClaudeUsage | None]: """Extrahiert aktuelle Suchbegriffe aus den letzten Headlines via Haiku. Returns: (flache Keyword-Liste DE+EN, usage) oder (None, None) bei Fehler """ if not recent_headlines: return None, None headlines_text = "\n".join(f"- {h}" for h in recent_headlines[:30]) prompt = KEYWORD_EXTRACTION_PROMPT.format( title=title, headlines=headlines_text, ) try: result, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST) parsed = None try: parsed = json.loads(result) except json.JSONDecodeError: match = re.search(r'\[.*\]', result, re.DOTALL) if match: try: parsed = json.loads(match.group()) except json.JSONDecodeError: pass if not parsed or not isinstance(parsed, list): logger.warning("Keyword-Extraktion: Kein gueltiges JSON erhalten") return None, usage # Flache Liste: alle DE + EN Begriffe keywords = [] for entry in parsed: if not isinstance(entry, dict): continue de = entry.get("de", "").lower().strip() en = entry.get("en", "").lower().strip() if de: keywords.append(de) if en and en != de: keywords.append(en) if keywords: logger.info(f"Dynamische Keywords ({len(keywords)}): {keywords}") return keywords if keywords else None, usage except Exception as e: logger.warning(f"Keyword-Extraktion fehlgeschlagen: {e}") return None, None async def search(self, title: str, description: str = "", incident_type: str = "adhoc", international: bool = True, user_id: int = None, existing_articles: list[dict] = None) -> tuple[list[dict], ClaudeUsage | None]: """Sucht nach Informationen zu einem Vorfall.""" from config import OUTPUT_LANGUAGE if incident_type == "research": lang_instruction = LANG_DEEP_INTERNATIONAL if international else LANG_DEEP_GERMAN_ONLY # Bestehende Artikel als Kontext für den Prompt aufbereiten existing_context = "" if existing_articles: known_lines = [] for art in existing_articles[:50]: # Max 50 um Prompt nicht zu überladen source = art.get("source", "Unbekannt") headline = art.get("headline", "") url = art.get("source_url", "") known_lines.append(f"- {source}: {headline} ({url})") existing_context = ( "BEREITS BEKANNTE QUELLEN — NICHT erneut suchen, finde ANDERE:\n" + "\n".join(known_lines) + "\n\n" "Fokussiere dich auf Quellen und Perspektiven, die in der obigen Liste FEHLEN.\n" ) prompt = DEEP_RESEARCH_PROMPT_TEMPLATE.format( title=title, description=description, language_instruction=lang_instruction, output_language=OUTPUT_LANGUAGE, existing_context=existing_context, ) else: lang_instruction = LANG_INTERNATIONAL if international else LANG_GERMAN_ONLY # Bestehende Artikel als Kontext: bei Folge-Refreshes findet Claude andere Quellen existing_context = "" if existing_articles: known_lines = [] for art in existing_articles[:30]: # Max 30 bei adhoc (kompakter als research) source = art.get("source", "Unbekannt") headline = art.get("headline", "") known_lines.append(f"- {source}: {headline}") existing_context = ( "BEREITS BEKANNTE QUELLEN (aus RSS-Feeds und vorherigen Recherchen) — suche ANDERE Blickwinkel und Quellen:\n" + "\n".join(known_lines) + "\n" ) prompt = RESEARCH_PROMPT_TEMPLATE.format( title=title, description=description, language_instruction=lang_instruction, output_language=OUTPUT_LANGUAGE, existing_context=existing_context, ) try: result, usage = await call_claude(prompt) articles = self._parse_response(result) # Ausgeschlossene Quellen dynamisch aus DB laden excluded_sources = await self._get_excluded_sources(user_id=user_id) # Ausgeschlossene Quellen filtern filtered = [] for article in articles: source = article.get("source", "").lower() source_url = article.get("source_url", "").lower() excluded = False for excl in excluded_sources: if excl in source or excl in source_url: excluded = True break if not excluded: # Bei nur-deutsch: nicht-deutsche Ergebnisse nachfiltern if not international and article.get("language", "de") != "de": continue filtered.append(article) logger.info(f"Recherche ergab {len(filtered)} Artikel (von {len(articles)} gefundenen, international={international})") return filtered, usage except TimeoutError: raise # Timeout nach oben durchreichen fuer Retry im Orchestrator except Exception as e: logger.error(f"Recherche-Fehler: {e}") return [], None async def _get_excluded_sources(self, user_id: int = None) -> list[str]: """Laedt ausgeschlossene Quellen (global + per-User).""" try: from source_rules import get_source_rules, get_user_excluded_domains rules = await get_source_rules() excluded = list(rules.get("excluded_domains", [])) # User-spezifische Ausschluesse hinzufuegen if user_id: user_excluded = await get_user_excluded_domains(user_id) for domain in user_excluded: if domain not in excluded: excluded.append(domain) return excluded except Exception as e: logger.warning(f"Fallback auf config.py fuer Excluded Sources: {e}") from config import EXCLUDED_SOURCES return list(EXCLUDED_SOURCES) def _parse_response(self, response: str) -> list[dict]: """Parst die Claude-Antwort als JSON-Array.""" # Versuche JSON direkt zu parsen try: data = json.loads(response) if isinstance(data, list): return data if isinstance(data, dict) and "articles" in data: return data["articles"] except json.JSONDecodeError: pass # JSON-Code-Block extrahieren code_pat = r'`{3}(?:json)?\s*\n?(\[.*?\])\s*`{3}' code_match = re.search(code_pat, response, re.DOTALL) if code_match: try: data = json.loads(code_match.group(1)) if isinstance(data, list): return data except json.JSONDecodeError: pass # Versuche JSON aus der Antwort zu extrahieren (zwischen [ und ]) arr_pat = r'\[\s*\{.*\}\s*\]' match = re.search(arr_pat, response, re.DOTALL) if match: try: data = json.loads(match.group()) if isinstance(data, list): return data except json.JSONDecodeError: pass # Letzter Versuch: einzelne JSON-Objekte mit headline objects = re.findall(r'\{[^{}]*"headline"[^{}]*\}', response) if objects: results = [] for obj_str in objects: try: obj = json.loads(obj_str) if "headline" in obj: results.append(obj) except json.JSONDecodeError: continue if results: logger.info(f"JSON-Recovery: {len(results)} Artikel aus Einzelobjekten extrahiert") return results logger.warning(f"Konnte Claude-Antwort nicht als JSON parsen (Laenge: {len(response)})") return [] async def select_relevant_telegram_channels( self, title: str, description: str, channels_metadata: list[dict], ) -> tuple[list[dict], ClaudeUsage | None]: """Laesst Claude die relevanten Telegram-Kanaele fuer eine Lage vorauswaehlen. Nutzt Haiku (CLAUDE_MODEL_FAST) fuer diese einfache Aufgabe. Returns: (ausgewaehlte Kanaele, usage) -- Bei Fehler: (alle Kanaele, None) """ if len(channels_metadata) <= 10: logger.info("Telegram-Selektion: Nur %d Kanaele, nutze alle", len(channels_metadata)) return channels_metadata, None channel_lines = [] for i, ch in enumerate(channels_metadata, 1): cat = ch.get("category", "sonstige") notes = (ch.get("notes") or "")[:100] channel_lines.append(f"{i}. {ch['name']} [{cat}] - {notes}") prompt = TELEGRAM_CHANNEL_SELECTION_PROMPT.format( title=title, description=description or "Keine weitere Beschreibung", channel_list="\n".join(channel_lines), ) try: result, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST) arr_match = re.search(r'\[[\d\s,]+\]', result) if not arr_match: logger.warning("Telegram-Selektion: Kein JSON in Antwort, nutze alle Kanaele") return channels_metadata, usage indices = json.loads(arr_match.group()) selected = [] for idx in indices: if isinstance(idx, int) and 1 <= idx <= len(channels_metadata): selected.append(channels_metadata[idx - 1]) if not selected: logger.warning("Telegram-Selektion: Keine gueltigen Indizes, nutze alle Kanaele") return channels_metadata, usage logger.info( "Telegram-Selektion: %d von %d Kanaelen ausgewaehlt", len(selected), len(channels_metadata) ) return selected, usage except Exception as e: logger.warning("Telegram-Selektion fehlgeschlagen (%s), nutze alle Kanaele", e) return channels_metadata, None