Commits vergleichen

..

2 Commits

Autor SHA1 Nachricht Datum
44de6616f1 Promote develop → main (2026-04-30 21:03 UTC) 2026-04-30 23:03:07 +02:00
Claude Code
88b18d0775 fix(researcher): Robusteres JSON-Parsing der Claude-Antworten
Behebt das Symptom, dass Recherche-Lagen wie staging Lage 6 "Friedrich Merz"
trotz erfolgreichem Refresh leer blieben. Claude lieferte nicht-leere Antworten
(1226-2125 Zeichen), die der bisherige Regex-Parser nicht extrahieren konnte —
die Recherche meldete "0 Artikel" und der Refresh wurde stumm als Erfolg
verbucht.

Aenderungen:
- _parse_response, select_relevant_feeds, extract_dynamic_keywords und
  select_relevant_telegram_channels nutzen jetzt json.JSONDecoder.raw_decode
  ueber Modul-Helper _extract_json_array/_extract_json_object. Damit werden
  auch JSON-Bloecke mit Vor-/Nachtext, Markdown-Fences oder verschachtelten
  Objekten zuverlaessig erkannt.
- Bei Parse-Fehlschlag wird jetzt ein gekuerztes Sample der Claude-Antwort
  geloggt, damit kuenftige Faelle direkt debuggbar sind.
- Neue ResearcherParseError-Exception unterscheidet "echt 0 Treffer" von
  "Antwort kaputt". search() gibt zusaetzlich ein parse_failed-Flag zurueck.
- Orchestrator-Multi-Pass: wenn alle 3 research-Durchlaeufe 0 neue Artikel
  ergeben UND mindestens einer am Parser scheiterte, wird der Refresh als
  Fehler markiert (statt als stiller Erfolg). Der WebSocket-refresh_error
  loest dann die sichtbare UI-Meldung aus.

Adhoc-Lagen sind unveraendert: dort fangen RSS und Telegram die kaputte
Claude-Antwort auf, dafuer ist nur die Diagnose im Log neu.
2026-04-30 20:45:41 +00:00
2 geänderte Dateien mit 168 neuen und 84 gelöschten Zeilen

Datei anzeigen

@@ -796,13 +796,16 @@ class AgentOrchestrator:
"source_url": row["source_url"]} "source_url": row["source_url"]}
for row in existing_db_articles_full for row in existing_db_articles_full
] ]
results, usage = await researcher.search( results, usage, parse_failed = await researcher.search(
title, description, incident_type, title, description, incident_type,
international=international, user_id=user_id, international=international, user_id=user_id,
existing_articles=existing_for_context, existing_articles=existing_for_context,
) )
logger.info(f"Claude-Recherche: {len(results)} Ergebnisse") logger.info(
return results, usage f"Claude-Recherche: {len(results)} Ergebnisse"
+ (" (Parser fehlgeschlagen)" if parse_failed else "")
)
return results, usage, parse_failed
async def _podcast_pipeline(): async def _podcast_pipeline():
"""Podcast-Episoden-Suche (nur adhoc-Lagen, nur mit vorhandenen Transkripten).""" """Podcast-Episoden-Suche (nur adhoc-Lagen, nur mit vorhandenen Transkripten)."""
@@ -885,7 +888,7 @@ class AgentOrchestrator:
pipeline_results = await asyncio.gather(*pipelines) pipeline_results = await asyncio.gather(*pipelines)
(rss_articles, rss_feed_usage) = pipeline_results[0] (rss_articles, rss_feed_usage) = pipeline_results[0]
(search_results, search_usage) = pipeline_results[1] (search_results, search_usage, search_parse_failed) = pipeline_results[1]
(podcast_articles, _podcast_usage) = pipeline_results[2] (podcast_articles, _podcast_usage) = pipeline_results[2]
telegram_articles = pipeline_results[3][0] if include_telegram else [] telegram_articles = pipeline_results[3][0] if include_telegram else []
@@ -1568,6 +1571,11 @@ class AgentOrchestrator:
logger.info(f"Refresh für Lage {incident_id} abgeschlossen: {new_count} neue Artikel") logger.info(f"Refresh für Lage {incident_id} abgeschlossen: {new_count} neue Artikel")
# Multi-Pass-Diagnose: Pass-Ergebnis zurueck an Multi-Pass-Caller geben
if _pass_info is not None:
_pass_info["new_count"] = new_count
_pass_info["parse_failed"] = search_parse_failed
# Executive Summary im Hintergrund vorab generieren (fuer schnelleren Export) # Executive Summary im Hintergrund vorab generieren (fuer schnelleren Export)
if new_count > 0: if new_count > 0:
async def _pregenerate_exec_summary(): async def _pregenerate_exec_summary():
@@ -1622,6 +1630,7 @@ class AgentOrchestrator:
Durchlauf 3: Konsolidierung (letzte Lücken, Fakten-Upgrade) Durchlauf 3: Konsolidierung (letzte Lücken, Fakten-Upgrade)
""" """
total = RESEARCH_MULTI_PASS_COUNT total = RESEARCH_MULTI_PASS_COUNT
pass_results = []
for pass_nr in range(1, total + 1): for pass_nr in range(1, total + 1):
# Cancel zwischen Durchläufen prüfen # Cancel zwischen Durchläufen prüfen
@@ -1662,12 +1671,27 @@ class AgentOrchestrator:
if is_last: if is_last:
raise raise
# Nicht-letzter Durchlauf: weiter mit nächstem, bisherige Ergebnisse bleiben # Nicht-letzter Durchlauf: weiter mit nächstem, bisherige Ergebnisse bleiben
finally:
pass_results.append(pass_info)
logger.info( logger.info(
f"Research Multi-Pass abgeschlossen für Lage {incident_id}: " f"Research Multi-Pass abgeschlossen für Lage {incident_id}: "
f"{total} Durchläufe" f"{total} Durchläufe"
) )
# Diagnose: Wenn ALLE Passes 0 neue Artikel hatten UND mindestens einer
# an einem Parser-Fehler scheiterte, ist die Recherche faktisch fehlgeschlagen —
# Claude lieferte zwar Antworten, aber kein verwertbares JSON. Sonst bliebe
# die Lage ohne sichtbare Fehlermeldung leer (siehe staging Lage "Friedrich Merz").
total_new = sum(p.get("new_count", 0) for p in pass_results)
any_parse_failed = any(p.get("parse_failed") for p in pass_results)
if total_new == 0 and any_parse_failed:
raise RuntimeError(
"Recherche fehlgeschlagen: Claude lieferte keine verwertbaren Quellen "
"(JSON-Parsing schlug bei mindestens einem Durchlauf fehl). "
"Bitte Logs prüfen und Refresh erneut starten."
)
# Singleton-Instanz # Singleton-Instanz
orchestrator = AgentOrchestrator() orchestrator = AgentOrchestrator()

Datei anzeigen

@@ -7,6 +7,60 @@ from config import CLAUDE_MODEL_FAST
logger = logging.getLogger("osint.researcher") logger = logging.getLogger("osint.researcher")
class ResearcherParseError(Exception):
"""Claude hat eine nicht-leere Antwort geliefert, aus der kein JSON extrahiert werden konnte."""
def _truncate_for_log(text: str, limit: int = 600) -> str:
"""Kürzt eine Claude-Antwort für Logs, damit ein Sample sichtbar ist."""
if not text:
return ""
snippet = text.strip().replace("\n", "\\n")
if len(snippet) > limit:
snippet = snippet[:limit] + "..."
return snippet
def _extract_json_array(text: str):
"""Findet das erste vollständige JSON-Array im Text (auch mit Vor-/Nachtext oder Markdown-Fence)."""
if not text:
return None
decoder = json.JSONDecoder()
idx = 0
while True:
bracket = text.find("[", idx)
if bracket == -1:
return None
try:
obj, _ = decoder.raw_decode(text, bracket)
except json.JSONDecodeError:
idx = bracket + 1
continue
if isinstance(obj, list):
return obj
idx = bracket + 1
def _extract_json_object(text: str):
"""Findet das erste vollständige JSON-Objekt im Text (auch mit Vor-/Nachtext oder Markdown-Fence)."""
if not text:
return None
decoder = json.JSONDecoder()
idx = 0
while True:
brace = text.find("{", idx)
if brace == -1:
return None
try:
obj, _ = decoder.raw_decode(text, brace)
except json.JSONDecodeError:
idx = brace + 1
continue
if isinstance(obj, dict):
return obj
idx = brace + 1
RESEARCH_PROMPT_TEMPLATE = """Du bist ein OSINT-Recherche-Agent für ein Lagemonitoring-System. RESEARCH_PROMPT_TEMPLATE = """Du bist ein OSINT-Recherche-Agent für ein Lagemonitoring-System.
AUSGABESPRACHE: {output_language} AUSGABESPRACHE: {output_language}
- KEINE Gedankenstriche (— oder –) verwenden, stattdessen Kommas, Doppelpunkte oder neue Saetze. - KEINE Gedankenstriche (— oder –) verwenden, stattdessen Kommas, Doppelpunkte oder neue Saetze.
@@ -211,30 +265,28 @@ class ResearcherAgent:
try: try:
result, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST) result, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST)
# Neues Format: JSON-Objekt mit "feeds" und "keywords"
keywords = None keywords = None
indices = None indices = None
# Versuche JSON-Objekt zu parsen # Neues Format: {"feeds": [...], "keywords": [...]}
obj_match = re.search(r'\{[^{}]*"feeds"\s*:\s*\[[\d\s,]+\][^{}]*\}', result, re.DOTALL) obj = _extract_json_object(result)
if obj_match: if isinstance(obj, dict) and isinstance(obj.get("feeds"), list):
try: indices = obj["feeds"]
obj = json.loads(obj_match.group()) raw_keywords = obj.get("keywords", [])
indices = obj.get("feeds", []) if isinstance(raw_keywords, list) and raw_keywords:
raw_keywords = obj.get("keywords", []) keywords = [str(k).lower().strip() for k in raw_keywords if k]
if isinstance(raw_keywords, list) and raw_keywords: logger.info(f"Feed-Selektion Keywords: {keywords}")
keywords = [str(k).lower().strip() for k in raw_keywords if k]
logger.info(f"Feed-Selektion Keywords: {keywords}")
except (json.JSONDecodeError, ValueError):
pass
# Fallback: altes Array-Format # Fallback: nacktes Array
if indices is None: if indices is None:
arr_match = re.search(r'\[[\d\s,]+\]', result) arr = _extract_json_array(result)
if not arr_match: if not isinstance(arr, list):
logger.warning("Feed-Selektion: Kein JSON in Antwort, nutze alle Feeds") logger.warning(
"Feed-Selektion: Kein JSON in Antwort, nutze alle Feeds. Sample: %s",
_truncate_for_log(result),
)
return feeds_metadata, None, usage return feeds_metadata, None, usage
indices = json.loads(arr_match.group()) indices = arr
selected = [] selected = []
for idx in indices: for idx in indices:
@@ -275,19 +327,12 @@ class ResearcherAgent:
try: try:
result, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST) result, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST)
parsed = None parsed = _extract_json_array(result)
try: if not isinstance(parsed, list):
parsed = json.loads(result) logger.warning(
except json.JSONDecodeError: "Keyword-Extraktion: Kein gueltiges JSON erhalten. Sample: %s",
match = re.search(r'\[.*\]', result, re.DOTALL) _truncate_for_log(result),
if match: )
try:
parsed = json.loads(match.group())
except json.JSONDecodeError:
pass
if not parsed or not isinstance(parsed, list):
logger.warning("Keyword-Extraktion: Kein gueltiges JSON erhalten")
return None, usage return None, usage
# Flache Liste: alle DE + EN Begriffe # Flache Liste: alle DE + EN Begriffe
@@ -310,8 +355,14 @@ class ResearcherAgent:
logger.warning(f"Keyword-Extraktion fehlgeschlagen: {e}") logger.warning(f"Keyword-Extraktion fehlgeschlagen: {e}")
return None, None return None, None
async def search(self, title: str, description: str = "", incident_type: str = "adhoc", international: bool = True, user_id: int = None, existing_articles: list[dict] = None) -> tuple[list[dict], ClaudeUsage | None]: async def search(self, title: str, description: str = "", incident_type: str = "adhoc", international: bool = True, user_id: int = None, existing_articles: list[dict] = None) -> tuple[list[dict], ClaudeUsage | None, bool]:
"""Sucht nach Informationen zu einem Vorfall.""" """Sucht nach Informationen zu einem Vorfall.
Returns:
(artikel, usage, parse_failed) — parse_failed ist True, wenn Claude geantwortet hat,
das JSON aber nicht extrahierbar war. So kann der Orchestrator zwischen
"echt keine Treffer" und "kaputte Antwort" unterscheiden.
"""
from config import OUTPUT_LANGUAGE from config import OUTPUT_LANGUAGE
if incident_type == "research": if incident_type == "research":
lang_instruction = LANG_DEEP_INTERNATIONAL if international else LANG_DEEP_GERMAN_ONLY lang_instruction = LANG_DEEP_INTERNATIONAL if international else LANG_DEEP_GERMAN_ONLY
@@ -354,7 +405,13 @@ class ResearcherAgent:
try: try:
result, usage = await call_claude(prompt) result, usage = await call_claude(prompt)
articles = self._parse_response(result) try:
articles = self._parse_response(result)
except ResearcherParseError as parse_err:
# Claude hat geantwortet, aber kein verwertbares JSON dabei.
# Usage trotzdem zurueckgeben, damit Credits korrekt verbucht werden.
logger.warning("Claude-Recherche: %s", parse_err)
return [], usage, True
# Ausgeschlossene Quellen dynamisch aus DB laden # Ausgeschlossene Quellen dynamisch aus DB laden
excluded_sources = await self._get_excluded_sources(user_id=user_id) excluded_sources = await self._get_excluded_sources(user_id=user_id)
@@ -376,13 +433,13 @@ class ResearcherAgent:
filtered.append(article) filtered.append(article)
logger.info(f"Recherche ergab {len(filtered)} Artikel (von {len(articles)} gefundenen, international={international})") logger.info(f"Recherche ergab {len(filtered)} Artikel (von {len(articles)} gefundenen, international={international})")
return filtered, usage return filtered, usage, False
except TimeoutError: except TimeoutError:
raise # Timeout nach oben durchreichen fuer Retry im Orchestrator raise # Timeout nach oben durchreichen fuer Retry im Orchestrator
except Exception as e: except Exception as e:
logger.error(f"Recherche-Fehler: {e}") logger.error(f"Recherche-Fehler: {e}")
return [], None return [], None, False
async def _get_excluded_sources(self, user_id: int = None) -> list[str]: async def _get_excluded_sources(self, user_id: int = None) -> list[str]:
"""Laedt ausgeschlossene Quellen (global + per-User).""" """Laedt ausgeschlossene Quellen (global + per-User)."""
@@ -405,56 +462,57 @@ class ResearcherAgent:
return list(EXCLUDED_SOURCES) return list(EXCLUDED_SOURCES)
def _parse_response(self, response: str) -> list[dict]: def _parse_response(self, response: str) -> list[dict]:
"""Parst die Claude-Antwort als JSON-Array.""" """Parst die Claude-Antwort als JSON-Array.
# Versuche JSON direkt zu parsen
Wirft ResearcherParseError, wenn die Antwort nicht-leer ist, sich aber
kein JSON extrahieren laesst. Eine echte leere Liste (z.B. wenn Claude
wirklich keine Treffer hat) wird als [] zurueckgegeben.
"""
text = (response or "").strip()
if not text:
return []
# 1) Direkt parsen (Antwort ist bereits sauberes JSON)
try: try:
data = json.loads(response) data = json.loads(text)
if isinstance(data, list): if isinstance(data, list):
return data return data
if isinstance(data, dict) and "articles" in data: if isinstance(data, dict) and isinstance(data.get("articles"), list):
return data["articles"] return data["articles"]
except json.JSONDecodeError: except json.JSONDecodeError:
pass pass
# JSON-Code-Block extrahieren # 2) JSON-Array irgendwo im Text (Markdown-Fence oder Vor-/Nachtext)
code_pat = r'`{3}(?:json)?\s*\n?(\[.*?\])\s*`{3}' arr = _extract_json_array(text)
code_match = re.search(code_pat, response, re.DOTALL) if isinstance(arr, list):
if code_match: return arr
# 3) JSON-Objekt mit "articles"-Key
obj = _extract_json_object(text)
if isinstance(obj, dict) and isinstance(obj.get("articles"), list):
return obj["articles"]
# 4) Recovery: einzelne Headline-Objekte aus Fliesstext
recovered = []
for obj_str in re.findall(r'\{[^{}]*"headline"[^{}]*\}', text, re.DOTALL):
try: try:
data = json.loads(code_match.group(1)) parsed = json.loads(obj_str)
if isinstance(data, list):
return data
except json.JSONDecodeError: except json.JSONDecodeError:
pass continue
if isinstance(parsed, dict) and "headline" in parsed:
recovered.append(parsed)
if recovered:
logger.info("JSON-Recovery: %d Artikel aus Einzelobjekten extrahiert", len(recovered))
return recovered
# Versuche JSON aus der Antwort zu extrahieren (zwischen [ und ]) # Parse fehlgeschlagen — Claude hat geantwortet, aber kein verwertbares JSON dabei.
arr_pat = r'\[\s*\{.*\}\s*\]' # Sample loggen, damit der Fehler debuggbar ist, und Aufrufer signalisieren.
match = re.search(arr_pat, response, re.DOTALL) logger.warning(
if match: "Konnte Claude-Antwort nicht als JSON parsen (Laenge: %d). Sample: %s",
try: len(text),
data = json.loads(match.group()) _truncate_for_log(text),
if isinstance(data, list): )
return data raise ResearcherParseError(f"Claude-Antwort enthielt kein verwertbares JSON (Laenge: {len(text)})")
except json.JSONDecodeError:
pass
# Letzter Versuch: einzelne JSON-Objekte mit headline
objects = re.findall(r'\{[^{}]*"headline"[^{}]*\}', response)
if objects:
results = []
for obj_str in objects:
try:
obj = json.loads(obj_str)
if "headline" in obj:
results.append(obj)
except json.JSONDecodeError:
continue
if results:
logger.info(f"JSON-Recovery: {len(results)} Artikel aus Einzelobjekten extrahiert")
return results
logger.warning(f"Konnte Claude-Antwort nicht als JSON parsen (Laenge: {len(response)})")
return []
async def select_relevant_telegram_channels( async def select_relevant_telegram_channels(
self, self,
@@ -488,12 +546,14 @@ class ResearcherAgent:
try: try:
result, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST) result, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST)
arr_match = re.search(r'\[[\d\s,]+\]', result) indices = _extract_json_array(result)
if not arr_match: if not isinstance(indices, list):
logger.warning("Telegram-Selektion: Kein JSON in Antwort, nutze alle Kanaele") logger.warning(
"Telegram-Selektion: Kein JSON in Antwort, nutze alle Kanaele. Sample: %s",
_truncate_for_log(result),
)
return channels_metadata, usage return channels_metadata, usage
indices = json.loads(arr_match.group())
selected = [] selected = []
for idx in indices: for idx in indices:
if isinstance(idx, int) and 1 <= idx <= len(channels_metadata): if isinstance(idx, int) and 1 <= idx <= len(channels_metadata):