Commits vergleichen
2 Commits
bfa4d5fd78
...
44de6616f1
| Autor | SHA1 | Datum |
|---|---|---|
| | 44de6616f1 | |
| | 88b18d0775 | |
@@ -796,13 +796,16 @@ class AgentOrchestrator:
|
||||
"source_url": row["source_url"]}
|
||||
for row in existing_db_articles_full
|
||||
]
|
||||
results, usage = await researcher.search(
|
||||
results, usage, parse_failed = await researcher.search(
|
||||
title, description, incident_type,
|
||||
international=international, user_id=user_id,
|
||||
existing_articles=existing_for_context,
|
||||
)
|
||||
logger.info(f"Claude-Recherche: {len(results)} Ergebnisse")
|
||||
return results, usage
|
||||
logger.info(
|
||||
f"Claude-Recherche: {len(results)} Ergebnisse"
|
||||
+ (" (Parser fehlgeschlagen)" if parse_failed else "")
|
||||
)
|
||||
return results, usage, parse_failed
|
||||
|
||||
async def _podcast_pipeline():
|
||||
"""Podcast-Episoden-Suche (nur adhoc-Lagen, nur mit vorhandenen Transkripten)."""
|
||||
@@ -885,7 +888,7 @@ class AgentOrchestrator:
|
||||
pipeline_results = await asyncio.gather(*pipelines)
|
||||
|
||||
(rss_articles, rss_feed_usage) = pipeline_results[0]
|
||||
(search_results, search_usage) = pipeline_results[1]
|
||||
(search_results, search_usage, search_parse_failed) = pipeline_results[1]
|
||||
(podcast_articles, _podcast_usage) = pipeline_results[2]
|
||||
telegram_articles = pipeline_results[3][0] if include_telegram else []
|
||||
|
||||
@@ -1568,6 +1571,11 @@ class AgentOrchestrator:
|
||||
|
||||
logger.info(f"Refresh für Lage {incident_id} abgeschlossen: {new_count} neue Artikel")
|
||||
|
||||
# Multi-Pass-Diagnose: Pass-Ergebnis zurueck an Multi-Pass-Caller geben
|
||||
if _pass_info is not None:
|
||||
_pass_info["new_count"] = new_count
|
||||
_pass_info["parse_failed"] = search_parse_failed
|
||||
|
||||
# Executive Summary im Hintergrund vorab generieren (fuer schnelleren Export)
|
||||
if new_count > 0:
|
||||
async def _pregenerate_exec_summary():
|
||||
@@ -1622,6 +1630,7 @@ class AgentOrchestrator:
|
||||
Durchlauf 3: Konsolidierung (letzte Lücken, Fakten-Upgrade)
|
||||
"""
|
||||
total = RESEARCH_MULTI_PASS_COUNT
|
||||
pass_results = []
|
||||
|
||||
for pass_nr in range(1, total + 1):
|
||||
# Cancel zwischen Durchläufen prüfen
|
||||
@@ -1662,12 +1671,27 @@ class AgentOrchestrator:
|
||||
if is_last:
|
||||
raise
|
||||
# Nicht-letzter Durchlauf: weiter mit nächstem, bisherige Ergebnisse bleiben
|
||||
finally:
|
||||
pass_results.append(pass_info)
|
||||
|
||||
logger.info(
|
||||
f"Research Multi-Pass abgeschlossen für Lage {incident_id}: "
|
||||
f"{total} Durchläufe"
|
||||
)
|
||||
|
||||
# Diagnose: Wenn ALLE Passes 0 neue Artikel hatten UND mindestens einer
|
||||
# an einem Parser-Fehler scheiterte, ist die Recherche faktisch fehlgeschlagen —
|
||||
# Claude lieferte zwar Antworten, aber kein verwertbares JSON. Sonst bliebe
|
||||
# die Lage ohne sichtbare Fehlermeldung leer (siehe staging Lage "Friedrich Merz").
|
||||
total_new = sum(p.get("new_count", 0) for p in pass_results)
|
||||
any_parse_failed = any(p.get("parse_failed") for p in pass_results)
|
||||
if total_new == 0 and any_parse_failed:
|
||||
raise RuntimeError(
|
||||
"Recherche fehlgeschlagen: Claude lieferte keine verwertbaren Quellen "
|
||||
"(JSON-Parsing schlug bei mindestens einem Durchlauf fehl). "
|
||||
"Bitte Logs prüfen und Refresh erneut starten."
|
||||
)
|
||||
|
||||
|
||||
# Singleton-Instanz
|
||||
orchestrator = AgentOrchestrator()
|
||||
|
||||
@@ -7,6 +7,60 @@ from config import CLAUDE_MODEL_FAST
|
||||
|
||||
logger = logging.getLogger("osint.researcher")
|
||||
|
||||
|
||||
class ResearcherParseError(Exception):
    """Raised when Claude returned a non-empty answer from which no JSON could be extracted."""
|
||||
|
||||
|
||||
def _truncate_for_log(text: str, limit: int = 600) -> str:
|
||||
"""Kürzt eine Claude-Antwort für Logs, damit ein Sample sichtbar ist."""
|
||||
if not text:
|
||||
return ""
|
||||
snippet = text.strip().replace("\n", "\\n")
|
||||
if len(snippet) > limit:
|
||||
snippet = snippet[:limit] + "..."
|
||||
return snippet
|
||||
|
||||
|
||||
def _extract_json_array(text: str):
|
||||
"""Findet das erste vollständige JSON-Array im Text (auch mit Vor-/Nachtext oder Markdown-Fence)."""
|
||||
if not text:
|
||||
return None
|
||||
decoder = json.JSONDecoder()
|
||||
idx = 0
|
||||
while True:
|
||||
bracket = text.find("[", idx)
|
||||
if bracket == -1:
|
||||
return None
|
||||
try:
|
||||
obj, _ = decoder.raw_decode(text, bracket)
|
||||
except json.JSONDecodeError:
|
||||
idx = bracket + 1
|
||||
continue
|
||||
if isinstance(obj, list):
|
||||
return obj
|
||||
idx = bracket + 1
|
||||
|
||||
|
||||
def _extract_json_object(text: str):
|
||||
"""Findet das erste vollständige JSON-Objekt im Text (auch mit Vor-/Nachtext oder Markdown-Fence)."""
|
||||
if not text:
|
||||
return None
|
||||
decoder = json.JSONDecoder()
|
||||
idx = 0
|
||||
while True:
|
||||
brace = text.find("{", idx)
|
||||
if brace == -1:
|
||||
return None
|
||||
try:
|
||||
obj, _ = decoder.raw_decode(text, brace)
|
||||
except json.JSONDecodeError:
|
||||
idx = brace + 1
|
||||
continue
|
||||
if isinstance(obj, dict):
|
||||
return obj
|
||||
idx = brace + 1
|
||||
|
||||
RESEARCH_PROMPT_TEMPLATE = """Du bist ein OSINT-Recherche-Agent für ein Lagemonitoring-System.
|
||||
AUSGABESPRACHE: {output_language}
|
||||
- KEINE Gedankenstriche (— oder –) verwenden, stattdessen Kommas, Doppelpunkte oder neue Saetze.
|
||||
@@ -211,30 +265,28 @@ class ResearcherAgent:
|
||||
try:
|
||||
result, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST)
|
||||
|
||||
# Neues Format: JSON-Objekt mit "feeds" und "keywords"
|
||||
keywords = None
|
||||
indices = None
|
||||
|
||||
# Versuche JSON-Objekt zu parsen
|
||||
obj_match = re.search(r'\{[^{}]*"feeds"\s*:\s*\[[\d\s,]+\][^{}]*\}', result, re.DOTALL)
|
||||
if obj_match:
|
||||
try:
|
||||
obj = json.loads(obj_match.group())
|
||||
indices = obj.get("feeds", [])
|
||||
# Neues Format: {"feeds": [...], "keywords": [...]}
|
||||
obj = _extract_json_object(result)
|
||||
if isinstance(obj, dict) and isinstance(obj.get("feeds"), list):
|
||||
indices = obj["feeds"]
|
||||
raw_keywords = obj.get("keywords", [])
|
||||
if isinstance(raw_keywords, list) and raw_keywords:
|
||||
keywords = [str(k).lower().strip() for k in raw_keywords if k]
|
||||
logger.info(f"Feed-Selektion Keywords: {keywords}")
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
pass
|
||||
|
||||
# Fallback: altes Array-Format
|
||||
# Fallback: nacktes Array
|
||||
if indices is None:
|
||||
arr_match = re.search(r'\[[\d\s,]+\]', result)
|
||||
if not arr_match:
|
||||
logger.warning("Feed-Selektion: Kein JSON in Antwort, nutze alle Feeds")
|
||||
arr = _extract_json_array(result)
|
||||
if not isinstance(arr, list):
|
||||
logger.warning(
|
||||
"Feed-Selektion: Kein JSON in Antwort, nutze alle Feeds. Sample: %s",
|
||||
_truncate_for_log(result),
|
||||
)
|
||||
return feeds_metadata, None, usage
|
||||
indices = json.loads(arr_match.group())
|
||||
indices = arr
|
||||
|
||||
selected = []
|
||||
for idx in indices:
|
||||
@@ -275,19 +327,12 @@ class ResearcherAgent:
|
||||
try:
|
||||
result, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST)
|
||||
|
||||
parsed = None
|
||||
try:
|
||||
parsed = json.loads(result)
|
||||
except json.JSONDecodeError:
|
||||
match = re.search(r'\[.*\]', result, re.DOTALL)
|
||||
if match:
|
||||
try:
|
||||
parsed = json.loads(match.group())
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
if not parsed or not isinstance(parsed, list):
|
||||
logger.warning("Keyword-Extraktion: Kein gueltiges JSON erhalten")
|
||||
parsed = _extract_json_array(result)
|
||||
if not isinstance(parsed, list):
|
||||
logger.warning(
|
||||
"Keyword-Extraktion: Kein gueltiges JSON erhalten. Sample: %s",
|
||||
_truncate_for_log(result),
|
||||
)
|
||||
return None, usage
|
||||
|
||||
# Flache Liste: alle DE + EN Begriffe
|
||||
@@ -310,8 +355,14 @@ class ResearcherAgent:
|
||||
logger.warning(f"Keyword-Extraktion fehlgeschlagen: {e}")
|
||||
return None, None
|
||||
|
||||
async def search(self, title: str, description: str = "", incident_type: str = "adhoc", international: bool = True, user_id: int = None, existing_articles: list[dict] = None) -> tuple[list[dict], ClaudeUsage | None]:
|
||||
"""Sucht nach Informationen zu einem Vorfall."""
|
||||
async def search(self, title: str, description: str = "", incident_type: str = "adhoc", international: bool = True, user_id: int = None, existing_articles: list[dict] = None) -> tuple[list[dict], ClaudeUsage | None, bool]:
|
||||
"""Sucht nach Informationen zu einem Vorfall.
|
||||
|
||||
Returns:
|
||||
(artikel, usage, parse_failed) — parse_failed ist True, wenn Claude geantwortet hat,
|
||||
das JSON aber nicht extrahierbar war. So kann der Orchestrator zwischen
|
||||
"echt keine Treffer" und "kaputte Antwort" unterscheiden.
|
||||
"""
|
||||
from config import OUTPUT_LANGUAGE
|
||||
if incident_type == "research":
|
||||
lang_instruction = LANG_DEEP_INTERNATIONAL if international else LANG_DEEP_GERMAN_ONLY
|
||||
@@ -354,7 +405,13 @@ class ResearcherAgent:
|
||||
|
||||
try:
|
||||
result, usage = await call_claude(prompt)
|
||||
try:
|
||||
articles = self._parse_response(result)
|
||||
except ResearcherParseError as parse_err:
|
||||
# Claude hat geantwortet, aber kein verwertbares JSON dabei.
|
||||
# Usage trotzdem zurueckgeben, damit Credits korrekt verbucht werden.
|
||||
logger.warning("Claude-Recherche: %s", parse_err)
|
||||
return [], usage, True
|
||||
|
||||
# Ausgeschlossene Quellen dynamisch aus DB laden
|
||||
excluded_sources = await self._get_excluded_sources(user_id=user_id)
|
||||
@@ -376,13 +433,13 @@ class ResearcherAgent:
|
||||
filtered.append(article)
|
||||
|
||||
logger.info(f"Recherche ergab {len(filtered)} Artikel (von {len(articles)} gefundenen, international={international})")
|
||||
return filtered, usage
|
||||
return filtered, usage, False
|
||||
|
||||
except TimeoutError:
|
||||
raise # Timeout nach oben durchreichen fuer Retry im Orchestrator
|
||||
except Exception as e:
|
||||
logger.error(f"Recherche-Fehler: {e}")
|
||||
return [], None
|
||||
return [], None, False
|
||||
|
||||
async def _get_excluded_sources(self, user_id: int = None) -> list[str]:
|
||||
"""Laedt ausgeschlossene Quellen (global + per-User)."""
|
||||
@@ -405,56 +462,57 @@ class ResearcherAgent:
|
||||
return list(EXCLUDED_SOURCES)
|
||||
|
||||
def _parse_response(self, response: str) -> list[dict]:
|
||||
"""Parst die Claude-Antwort als JSON-Array."""
|
||||
# Versuche JSON direkt zu parsen
|
||||
"""Parst die Claude-Antwort als JSON-Array.
|
||||
|
||||
Wirft ResearcherParseError, wenn die Antwort nicht-leer ist, sich aber
|
||||
kein JSON extrahieren laesst. Eine echte leere Liste (z.B. wenn Claude
|
||||
wirklich keine Treffer hat) wird als [] zurueckgegeben.
|
||||
"""
|
||||
text = (response or "").strip()
|
||||
if not text:
|
||||
return []
|
||||
|
||||
# 1) Direkt parsen (Antwort ist bereits sauberes JSON)
|
||||
try:
|
||||
data = json.loads(response)
|
||||
data = json.loads(text)
|
||||
if isinstance(data, list):
|
||||
return data
|
||||
if isinstance(data, dict) and "articles" in data:
|
||||
if isinstance(data, dict) and isinstance(data.get("articles"), list):
|
||||
return data["articles"]
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
# JSON-Code-Block extrahieren
|
||||
code_pat = r'`{3}(?:json)?\s*\n?(\[.*?\])\s*`{3}'
|
||||
code_match = re.search(code_pat, response, re.DOTALL)
|
||||
if code_match:
|
||||
try:
|
||||
data = json.loads(code_match.group(1))
|
||||
if isinstance(data, list):
|
||||
return data
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
# 2) JSON-Array irgendwo im Text (Markdown-Fence oder Vor-/Nachtext)
|
||||
arr = _extract_json_array(text)
|
||||
if isinstance(arr, list):
|
||||
return arr
|
||||
|
||||
# Versuche JSON aus der Antwort zu extrahieren (zwischen [ und ])
|
||||
arr_pat = r'\[\s*\{.*\}\s*\]'
|
||||
match = re.search(arr_pat, response, re.DOTALL)
|
||||
if match:
|
||||
try:
|
||||
data = json.loads(match.group())
|
||||
if isinstance(data, list):
|
||||
return data
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
# 3) JSON-Objekt mit "articles"-Key
|
||||
obj = _extract_json_object(text)
|
||||
if isinstance(obj, dict) and isinstance(obj.get("articles"), list):
|
||||
return obj["articles"]
|
||||
|
||||
# Letzter Versuch: einzelne JSON-Objekte mit headline
|
||||
objects = re.findall(r'\{[^{}]*"headline"[^{}]*\}', response)
|
||||
if objects:
|
||||
results = []
|
||||
for obj_str in objects:
|
||||
# 4) Recovery: einzelne Headline-Objekte aus Fliesstext
|
||||
recovered = []
|
||||
for obj_str in re.findall(r'\{[^{}]*"headline"[^{}]*\}', text, re.DOTALL):
|
||||
try:
|
||||
obj = json.loads(obj_str)
|
||||
if "headline" in obj:
|
||||
results.append(obj)
|
||||
parsed = json.loads(obj_str)
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
if results:
|
||||
logger.info(f"JSON-Recovery: {len(results)} Artikel aus Einzelobjekten extrahiert")
|
||||
return results
|
||||
if isinstance(parsed, dict) and "headline" in parsed:
|
||||
recovered.append(parsed)
|
||||
if recovered:
|
||||
logger.info("JSON-Recovery: %d Artikel aus Einzelobjekten extrahiert", len(recovered))
|
||||
return recovered
|
||||
|
||||
logger.warning(f"Konnte Claude-Antwort nicht als JSON parsen (Laenge: {len(response)})")
|
||||
return []
|
||||
# Parse fehlgeschlagen — Claude hat geantwortet, aber kein verwertbares JSON dabei.
|
||||
# Sample loggen, damit der Fehler debuggbar ist, und Aufrufer signalisieren.
|
||||
logger.warning(
|
||||
"Konnte Claude-Antwort nicht als JSON parsen (Laenge: %d). Sample: %s",
|
||||
len(text),
|
||||
_truncate_for_log(text),
|
||||
)
|
||||
raise ResearcherParseError(f"Claude-Antwort enthielt kein verwertbares JSON (Laenge: {len(text)})")
|
||||
|
||||
async def select_relevant_telegram_channels(
|
||||
self,
|
||||
@@ -488,12 +546,14 @@ class ResearcherAgent:
|
||||
try:
|
||||
result, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST)
|
||||
|
||||
arr_match = re.search(r'\[[\d\s,]+\]', result)
|
||||
if not arr_match:
|
||||
logger.warning("Telegram-Selektion: Kein JSON in Antwort, nutze alle Kanaele")
|
||||
indices = _extract_json_array(result)
|
||||
if not isinstance(indices, list):
|
||||
logger.warning(
|
||||
"Telegram-Selektion: Kein JSON in Antwort, nutze alle Kanaele. Sample: %s",
|
||||
_truncate_for_log(result),
|
||||
)
|
||||
return channels_metadata, usage
|
||||
|
||||
indices = json.loads(arr_match.group())
|
||||
selected = []
|
||||
for idx in indices:
|
||||
if isinstance(idx, int) and 1 <= idx <= len(channels_metadata):
|
||||
|
||||
In neuem Issue referenzieren
Einen Benutzer sperren