From 88b18d0775bc5e17fd8b85fc1d3e6bdd6f8318c1 Mon Sep 17 00:00:00 2001 From: Claude Code Date: Thu, 30 Apr 2026 20:45:41 +0000 Subject: [PATCH] fix(researcher): Robusteres JSON-Parsing der Claude-Antworten MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Behebt das Symptom, dass Recherche-Lagen wie staging Lage 6 "Friedrich Merz" trotz erfolgreichem Refresh leer blieben. Claude lieferte nicht-leere Antworten (1226-2125 Zeichen), die der bisherige Regex-Parser nicht extrahieren konnte — die Recherche meldete "0 Artikel" und der Refresh wurde stumm als Erfolg verbucht. Aenderungen: - _parse_response, select_relevant_feeds, extract_dynamic_keywords und select_relevant_telegram_channels nutzen jetzt json.JSONDecoder.raw_decode ueber Modul-Helper _extract_json_array/_extract_json_object. Damit werden auch JSON-Bloecke mit Vor-/Nachtext, Markdown-Fences oder verschachtelten Objekten zuverlaessig erkannt. - Bei Parse-Fehlschlag wird jetzt ein gekuerztes Sample der Claude-Antwort geloggt, damit kuenftige Faelle direkt debuggbar sind. - Neue ResearcherParseError-Exception unterscheidet "echt 0 Treffer" von "Antwort kaputt". search() gibt zusaetzlich ein parse_failed-Flag zurueck. - Orchestrator-Multi-Pass: wenn alle 3 research-Durchlaeufe 0 neue Artikel ergeben UND mindestens einer am Parser scheiterte, wird der Refresh als Fehler markiert (statt als stiller Erfolg). Der WebSocket-refresh_error loest dann die sichtbare UI-Meldung aus. Adhoc-Lagen sind unveraendert: dort fangen RSS und Telegram die kaputte Claude-Antwort auf, dafuer ist nur die Diagnose im Log neu. --- src/agents/orchestrator.py | 32 +++++- src/agents/researcher.py | 220 +++++++++++++++++++++++-------------- 2 files changed, 168 insertions(+), 84 deletions(-) diff --git a/src/agents/orchestrator.py b/src/agents/orchestrator.py index 997bfeb..e946ebc 100644 --- a/src/agents/orchestrator.py +++ b/src/agents/orchestrator.py @@ -796,13 +796,16 @@ class AgentOrchestrator: "source_url": row["source_url"]} for row in existing_db_articles_full ] - results, usage = await researcher.search( + results, usage, parse_failed = await researcher.search( title, description, incident_type, international=international, user_id=user_id, existing_articles=existing_for_context, ) - logger.info(f"Claude-Recherche: {len(results)} Ergebnisse") - return results, usage + logger.info( + f"Claude-Recherche: {len(results)} Ergebnisse" + + (" (Parser fehlgeschlagen)" if parse_failed else "") + ) + return results, usage, parse_failed async def _podcast_pipeline(): """Podcast-Episoden-Suche (nur adhoc-Lagen, nur mit vorhandenen Transkripten).""" @@ -885,7 +888,7 @@ class AgentOrchestrator: pipeline_results = await asyncio.gather(*pipelines) (rss_articles, rss_feed_usage) = pipeline_results[0] - (search_results, search_usage) = pipeline_results[1] + (search_results, search_usage, search_parse_failed) = pipeline_results[1] (podcast_articles, _podcast_usage) = pipeline_results[2] telegram_articles = pipeline_results[3][0] if include_telegram else [] @@ -1568,6 +1571,11 @@ class AgentOrchestrator: logger.info(f"Refresh für Lage {incident_id} abgeschlossen: {new_count} neue Artikel") + # Multi-Pass-Diagnose: Pass-Ergebnis zurueck an Multi-Pass-Caller geben + if _pass_info is not None: + _pass_info["new_count"] = new_count + _pass_info["parse_failed"] = search_parse_failed + # Executive Summary im Hintergrund vorab generieren (fuer schnelleren Export) if new_count > 0: async def _pregenerate_exec_summary(): @@ -1622,6 +1630,7 @@ class AgentOrchestrator: Durchlauf 3: Konsolidierung (letzte Lücken, Fakten-Upgrade) """ total = RESEARCH_MULTI_PASS_COUNT + pass_results = [] for pass_nr in range(1, total + 1): # Cancel zwischen Durchläufen prüfen @@ -1662,12 +1671,27 @@ class AgentOrchestrator: if is_last: raise # Nicht-letzter Durchlauf: weiter mit nächstem, bisherige Ergebnisse bleiben + finally: + pass_results.append(pass_info) logger.info( f"Research Multi-Pass abgeschlossen für Lage {incident_id}: " f"{total} Durchläufe" ) + # Diagnose: Wenn ALLE Passes 0 neue Artikel hatten UND mindestens einer + # an einem Parser-Fehler scheiterte, ist die Recherche faktisch fehlgeschlagen — + # Claude lieferte zwar Antworten, aber kein verwertbares JSON. Sonst bliebe + # die Lage ohne sichtbare Fehlermeldung leer (siehe staging Lage "Friedrich Merz"). + total_new = sum(p.get("new_count", 0) for p in pass_results) + any_parse_failed = any(p.get("parse_failed") for p in pass_results) + if total_new == 0 and any_parse_failed: + raise RuntimeError( + "Recherche fehlgeschlagen: Claude lieferte keine verwertbaren Quellen " + "(JSON-Parsing schlug bei mindestens einem Durchlauf fehl). " + "Bitte Logs prüfen und Refresh erneut starten." + ) + # Singleton-Instanz orchestrator = AgentOrchestrator() diff --git a/src/agents/researcher.py b/src/agents/researcher.py index fb858c3..6f826ca 100644 --- a/src/agents/researcher.py +++ b/src/agents/researcher.py @@ -7,6 +7,60 @@ from config import CLAUDE_MODEL_FAST logger = logging.getLogger("osint.researcher") + +class ResearcherParseError(Exception): + """Claude hat eine nicht-leere Antwort geliefert, aus der kein JSON extrahiert werden konnte.""" + + +def _truncate_for_log(text: str, limit: int = 600) -> str: + """Kürzt eine Claude-Antwort für Logs, damit ein Sample sichtbar ist.""" + if not text: + return "" + snippet = text.strip().replace("\n", "\\n") + if len(snippet) > limit: + snippet = snippet[:limit] + "..." + return snippet + + +def _extract_json_array(text: str): + """Findet das erste vollständige JSON-Array im Text (auch mit Vor-/Nachtext oder Markdown-Fence).""" + if not text: + return None + decoder = json.JSONDecoder() + idx = 0 + while True: + bracket = text.find("[", idx) + if bracket == -1: + return None + try: + obj, _ = decoder.raw_decode(text, bracket) + except json.JSONDecodeError: + idx = bracket + 1 + continue + if isinstance(obj, list): + return obj + idx = bracket + 1 + + +def _extract_json_object(text: str): + """Findet das erste vollständige JSON-Objekt im Text (auch mit Vor-/Nachtext oder Markdown-Fence).""" + if not text: + return None + decoder = json.JSONDecoder() + idx = 0 + while True: + brace = text.find("{", idx) + if brace == -1: + return None + try: + obj, _ = decoder.raw_decode(text, brace) + except json.JSONDecodeError: + idx = brace + 1 + continue + if isinstance(obj, dict): + return obj + idx = brace + 1 + RESEARCH_PROMPT_TEMPLATE = """Du bist ein OSINT-Recherche-Agent für ein Lagemonitoring-System. AUSGABESPRACHE: {output_language} - KEINE Gedankenstriche (— oder –) verwenden, stattdessen Kommas, Doppelpunkte oder neue Saetze. @@ -211,30 +265,28 @@ class ResearcherAgent: try: result, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST) - # Neues Format: JSON-Objekt mit "feeds" und "keywords" keywords = None indices = None - # Versuche JSON-Objekt zu parsen - obj_match = re.search(r'\{[^{}]*"feeds"\s*:\s*\[[\d\s,]+\][^{}]*\}', result, re.DOTALL) - if obj_match: - try: - obj = json.loads(obj_match.group()) - indices = obj.get("feeds", []) - raw_keywords = obj.get("keywords", []) - if isinstance(raw_keywords, list) and raw_keywords: - keywords = [str(k).lower().strip() for k in raw_keywords if k] - logger.info(f"Feed-Selektion Keywords: {keywords}") - except (json.JSONDecodeError, ValueError): - pass + # Neues Format: {"feeds": [...], "keywords": [...]} + obj = _extract_json_object(result) + if isinstance(obj, dict) and isinstance(obj.get("feeds"), list): + indices = obj["feeds"] + raw_keywords = obj.get("keywords", []) + if isinstance(raw_keywords, list) and raw_keywords: + keywords = [str(k).lower().strip() for k in raw_keywords if k] + logger.info(f"Feed-Selektion Keywords: {keywords}") - # Fallback: altes Array-Format + # Fallback: nacktes Array if indices is None: - arr_match = re.search(r'\[[\d\s,]+\]', result) - if not arr_match: - logger.warning("Feed-Selektion: Kein JSON in Antwort, nutze alle Feeds") + arr = _extract_json_array(result) + if not isinstance(arr, list): + logger.warning( + "Feed-Selektion: Kein JSON in Antwort, nutze alle Feeds. Sample: %s", + _truncate_for_log(result), + ) return feeds_metadata, None, usage - indices = json.loads(arr_match.group()) + indices = arr selected = [] for idx in indices: @@ -275,19 +327,12 @@ class ResearcherAgent: try: result, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST) - parsed = None - try: - parsed = json.loads(result) - except json.JSONDecodeError: - match = re.search(r'\[.*\]', result, re.DOTALL) - if match: - try: - parsed = json.loads(match.group()) - except json.JSONDecodeError: - pass - - if not parsed or not isinstance(parsed, list): - logger.warning("Keyword-Extraktion: Kein gueltiges JSON erhalten") + parsed = _extract_json_array(result) + if not isinstance(parsed, list): + logger.warning( + "Keyword-Extraktion: Kein gueltiges JSON erhalten. Sample: %s", + _truncate_for_log(result), + ) return None, usage # Flache Liste: alle DE + EN Begriffe @@ -310,8 +355,14 @@ class ResearcherAgent: logger.warning(f"Keyword-Extraktion fehlgeschlagen: {e}") return None, None - async def search(self, title: str, description: str = "", incident_type: str = "adhoc", international: bool = True, user_id: int = None, existing_articles: list[dict] = None) -> tuple[list[dict], ClaudeUsage | None]: - """Sucht nach Informationen zu einem Vorfall.""" + async def search(self, title: str, description: str = "", incident_type: str = "adhoc", international: bool = True, user_id: int = None, existing_articles: list[dict] = None) -> tuple[list[dict], ClaudeUsage | None, bool]: + """Sucht nach Informationen zu einem Vorfall. + + Returns: + (artikel, usage, parse_failed) — parse_failed ist True, wenn Claude geantwortet hat, + das JSON aber nicht extrahierbar war. So kann der Orchestrator zwischen + "echt keine Treffer" und "kaputte Antwort" unterscheiden. + """ from config import OUTPUT_LANGUAGE if incident_type == "research": lang_instruction = LANG_DEEP_INTERNATIONAL if international else LANG_DEEP_GERMAN_ONLY @@ -354,7 +405,13 @@ class ResearcherAgent: try: result, usage = await call_claude(prompt) - articles = self._parse_response(result) + try: + articles = self._parse_response(result) + except ResearcherParseError as parse_err: + # Claude hat geantwortet, aber kein verwertbares JSON dabei. + # Usage trotzdem zurueckgeben, damit Credits korrekt verbucht werden. + logger.warning("Claude-Recherche: %s", parse_err) + return [], usage, True # Ausgeschlossene Quellen dynamisch aus DB laden excluded_sources = await self._get_excluded_sources(user_id=user_id) @@ -376,13 +433,13 @@ class ResearcherAgent: filtered.append(article) logger.info(f"Recherche ergab {len(filtered)} Artikel (von {len(articles)} gefundenen, international={international})") - return filtered, usage + return filtered, usage, False except TimeoutError: raise # Timeout nach oben durchreichen fuer Retry im Orchestrator except Exception as e: logger.error(f"Recherche-Fehler: {e}") - return [], None + return [], None, False async def _get_excluded_sources(self, user_id: int = None) -> list[str]: """Laedt ausgeschlossene Quellen (global + per-User).""" @@ -405,56 +462,57 @@ class ResearcherAgent: return list(EXCLUDED_SOURCES) def _parse_response(self, response: str) -> list[dict]: - """Parst die Claude-Antwort als JSON-Array.""" - # Versuche JSON direkt zu parsen + """Parst die Claude-Antwort als JSON-Array. + + Wirft ResearcherParseError, wenn die Antwort nicht-leer ist, sich aber + kein JSON extrahieren laesst. Eine echte leere Liste (z.B. wenn Claude + wirklich keine Treffer hat) wird als [] zurueckgegeben. + """ + text = (response or "").strip() + if not text: + return [] + + # 1) Direkt parsen (Antwort ist bereits sauberes JSON) try: - data = json.loads(response) + data = json.loads(text) if isinstance(data, list): return data - if isinstance(data, dict) and "articles" in data: + if isinstance(data, dict) and isinstance(data.get("articles"), list): return data["articles"] except json.JSONDecodeError: pass - # JSON-Code-Block extrahieren - code_pat = r'`{3}(?:json)?\s*\n?(\[.*?\])\s*`{3}' - code_match = re.search(code_pat, response, re.DOTALL) - if code_match: + # 2) JSON-Array irgendwo im Text (Markdown-Fence oder Vor-/Nachtext) + arr = _extract_json_array(text) + if isinstance(arr, list): + return arr + + # 3) JSON-Objekt mit "articles"-Key + obj = _extract_json_object(text) + if isinstance(obj, dict) and isinstance(obj.get("articles"), list): + return obj["articles"] + + # 4) Recovery: einzelne Headline-Objekte aus Fliesstext + recovered = [] + for obj_str in re.findall(r'\{[^{}]*"headline"[^{}]*\}', text, re.DOTALL): try: - data = json.loads(code_match.group(1)) - if isinstance(data, list): - return data + parsed = json.loads(obj_str) except json.JSONDecodeError: - pass + continue + if isinstance(parsed, dict) and "headline" in parsed: + recovered.append(parsed) + if recovered: + logger.info("JSON-Recovery: %d Artikel aus Einzelobjekten extrahiert", len(recovered)) + return recovered - # Versuche JSON aus der Antwort zu extrahieren (zwischen [ und ]) - arr_pat = r'\[\s*\{.*\}\s*\]' - match = re.search(arr_pat, response, re.DOTALL) - if match: - try: - data = json.loads(match.group()) - if isinstance(data, list): - return data - except json.JSONDecodeError: - pass - - # Letzter Versuch: einzelne JSON-Objekte mit headline - objects = re.findall(r'\{[^{}]*"headline"[^{}]*\}', response) - if objects: - results = [] - for obj_str in objects: - try: - obj = json.loads(obj_str) - if "headline" in obj: - results.append(obj) - except json.JSONDecodeError: - continue - if results: - logger.info(f"JSON-Recovery: {len(results)} Artikel aus Einzelobjekten extrahiert") - return results - - logger.warning(f"Konnte Claude-Antwort nicht als JSON parsen (Laenge: {len(response)})") - return [] + # Parse fehlgeschlagen — Claude hat geantwortet, aber kein verwertbares JSON dabei. + # Sample loggen, damit der Fehler debuggbar ist, und Aufrufer signalisieren. + logger.warning( + "Konnte Claude-Antwort nicht als JSON parsen (Laenge: %d). Sample: %s", + len(text), + _truncate_for_log(text), + ) + raise ResearcherParseError(f"Claude-Antwort enthielt kein verwertbares JSON (Laenge: {len(text)})") async def select_relevant_telegram_channels( self, @@ -488,12 +546,14 @@ class ResearcherAgent: try: result, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST) - arr_match = re.search(r'\[[\d\s,]+\]', result) - if not arr_match: - logger.warning("Telegram-Selektion: Kein JSON in Antwort, nutze alle Kanaele") + indices = _extract_json_array(result) + if not isinstance(indices, list): + logger.warning( + "Telegram-Selektion: Kein JSON in Antwort, nutze alle Kanaele. Sample: %s", + _truncate_for_log(result), + ) return channels_metadata, usage - indices = json.loads(arr_match.group()) selected = [] for idx in indices: if isinstance(idx, int) and 1 <= idx <= len(channels_metadata): -- 2.49.1