"""Analyzer-Agent: Analysiert, übersetzt und fasst Meldungen zusammen.""" import json import logging import re from datetime import datetime from config import TIMEZONE from agents.claude_client import call_claude, ClaudeUsage logger = logging.getLogger("osint.analyzer") ANALYSIS_PROMPT_TEMPLATE = """Du bist ein OSINT-Analyse-Agent für ein Lagemonitoring-System. HEUTIGES DATUM: {today} AUSGABESPRACHE: {output_language} WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschreibungen (ae, oe, ue, ss). VORFALL: {title} KONTEXT: {description} VORHANDENE MELDUNGEN: {articles_text} AUFTRAG: 1. Erstelle eine neutrale, faktenbasierte Zusammenfassung auf {output_language}. Sei so ausführlich wie nötig, um alle wesentlichen Aspekte und Themenstränge abzudecken 2. Verwende Inline-Quellenverweise [1], [2], [3] etc. im Zusammenfassungstext 3. Liste die bestätigten Kernfakten auf 4. Übersetze fremdsprachige Überschriften und Inhalte in die Ausgabesprache STRUKTUR: - Wenn die Meldungen thematisch klar einen einzelnen Strang behandeln: Fließtext ohne Überschriften - Wenn verschiedene Aspekte oder Themenfelder aufkommen (z.B. Ereignis + Reaktionen + Hintergrund): Gliedere mit kurzen Markdown-Zwischenüberschriften (##) - Die Entscheidung liegt bei dir — Überschriften nur wenn sie dem Leser helfen, verschiedene Themenstränge auseinanderzuhalten REGELN: - Neutral und sachlich - keine Wertungen oder Spekulationen - KEINE Gedankenstriche (—, –) verwenden — stattdessen Kommas, Doppelpunkte oder neue Saetze - Nur gesicherte Informationen in die Zusammenfassung - Bei widersprüchlichen Angaben beide Seiten erwähnen - Wenn eine Quelle eine erkennbare Ausrichtung hat (z.B. pro-russisch, pro-iranisch, staatsnah, rechtsextrem), muss dies im Fliesstext erwaehnt werden, damit der Leser die Information einordnen kann. Beispiel: "Laut dem pro-russischen Telegram-Kanal Rybar..." oder "Die iranische Nachrichtenagentur Fars meldete..." oder "Der rechtsextreme Kanal Compact behauptete..." - Quellen immer mit [Nr] referenzieren - Jede verwendete Quelle MUSS im sources-Array aufgelistet sein - Nummeriere die Quellen fortlaufend ab [1]. Verwende NUR ganze Zahlen als Quellennummern (z.B. [389], [390]), KEINE Buchstaben-Suffixe wie [389a] - Ältere Quellen zeitlich einordnen (z.B. "laut einem Bericht vom Januar", "Anfang Februar berichtete...") Antworte AUSSCHLIESSLICH als JSON-Objekt mit diesen Feldern: - "summary": Zusammenfassung auf {output_language} mit Quellenverweisen [1], [2] etc. im Text (Markdown-Überschriften ## erlaubt wenn sinnvoll) - "sources": Array von Quellenobjekten, je: {{"nr": 1, "name": "Quellenname", "url": "https://..."}} - "key_facts": Array von bestätigten Kernfakten (Strings, in Ausgabesprache) - "translations": Array von Objekten mit "article_id", "headline_de", "content_de" (nur für fremdsprachige Artikel) Antworte NUR mit dem JSON-Objekt. Keine Einleitung, keine Erklärung.""" BRIEFING_PROMPT_TEMPLATE = """Du bist ein OSINT-Analyse-Agent für ein Lagemonitoring-System. Du erstellst ein strukturiertes Briefing für eine Hintergrundrecherche. HEUTIGES DATUM: {today} AUSGABESPRACHE: {output_language} WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschreibungen (ae, oe, ue, ss). THEMA: {title} KONTEXT: {description} VORLIEGENDE QUELLEN: {articles_text} AUFTRAG: Erstelle ein strukturiertes Briefing auf {output_language} mit folgenden Abschnitten. Sei so ausführlich wie nötig, um alle Aspekte gründlich abzudecken. Verwende durchgehend Inline-Quellenverweise [1], [2], [3] etc. im Text. ## ÜBERBLICK Kurze Einordnung des Themas (2-3 Sätze) ## HINTERGRUND Historischer Kontext, relevante Vorgeschichte ## AKTEURE Beteiligte Personen, Organisationen, Institutionen und ihre Rollen ## AKTUELLE LAGE Was ist der aktuelle Stand? Welche Entwicklungen gibt es? ## EINSCHÄTZUNG Sachliche Bewertung der Situation, mögliche Entwicklungen ## QUELLENQUALITÄT Kurze Bewertung der Informationslage: Wie belastbar sind die vorliegenden Quellen? REGELN: - Neutral und sachlich - keine Wertungen oder Spekulationen - KEINE Gedankenstriche (—, –) verwenden — stattdessen Kommas, Doppelpunkte oder neue Saetze - Nur gesicherte Informationen verwenden - Bei widersprüchlichen Angaben beide Seiten erwähnen - Wenn eine Quelle eine erkennbare Ausrichtung hat (z.B. pro-russisch, pro-iranisch, staatsnah, rechtsextrem), muss dies im Fliesstext erwaehnt werden, damit der Leser die Information einordnen kann. Beispiel: "Laut dem pro-russischen Telegram-Kanal Rybar..." oder "Die iranische Nachrichtenagentur Fars meldete..." oder "Der rechtsextreme Kanal Compact behauptete..." - Quellen immer mit [Nr] referenzieren - Jede verwendete Quelle MUSS im sources-Array aufgelistet sein - Nummeriere die Quellen fortlaufend ab [1]. Verwende NUR ganze Zahlen als Quellennummern (z.B. [389], [390]), KEINE Buchstaben-Suffixe wie [389a] - Ältere Quellen zeitlich einordnen (z.B. "laut einem Bericht vom Januar", "Anfang Februar berichtete...") - Markdown-Überschriften (##) für die Abschnitte verwenden - KEIN Fettdruck (**) verwenden Antworte AUSSCHLIESSLICH als JSON-Objekt mit diesen Feldern: - "summary": Das strukturierte Briefing als Markdown-Text mit Quellenverweisen [1], [2] etc. - "sources": Array von Quellenobjekten, je: {{"nr": 1, "name": "Quellenname", "url": "https://..."}} - "key_facts": Array von gesicherten Kernfakten (Strings, in Ausgabesprache) - "translations": Array von Objekten mit "article_id", "headline_de", "content_de" (nur für fremdsprachige Artikel) Antworte NUR mit dem JSON-Objekt. Keine Einleitung, keine Erklärung.""" INCREMENTAL_ANALYSIS_PROMPT_TEMPLATE = """Du bist ein OSINT-Analyse-Agent für ein Lagemonitoring-System. HEUTIGES DATUM: {today} AUSGABESPRACHE: {output_language} WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschreibungen (ae, oe, ue, ss). VORFALL: {title} KONTEXT: {description} BISHERIGES LAGEBILD: {previous_summary} BISHERIGE QUELLEN: {previous_sources_text} NEUE MELDUNGEN SEIT DEM LETZTEN UPDATE: {new_articles_text} AUFTRAG: 1. Aktualisiere das Lagebild basierend auf den neuen Meldungen. Das Lagebild soll so ausführlich wie nötig sein, um alle wesentlichen Themenstränge abzudecken 2. Behalte bestätigte Fakten aus dem bisherigen Lagebild bei 3. Ergänze neue Erkenntnisse und markiere wichtige neue Entwicklungen 4. Aktualisiere die Quellenverweise — neue Quellen bekommen fortlaufende Nummern nach den bisherigen 5. Entferne nur nachweislich widerlegte Informationen. Behalte alle thematischen Abschnitte bei, auch wenn sie nicht durch neue Meldungen aktualisiert werden STRUKTUR: - Fließtext oder mit Markdown-Zwischenüberschriften (##) — je nach Komplexität - KEIN Fettdruck (**) verwenden REGELN: - Neutral und sachlich - keine Wertungen oder Spekulationen - KEINE Gedankenstriche (—, –) verwenden — stattdessen Kommas, Doppelpunkte oder neue Saetze - Bei widersprüchlichen Angaben beide Seiten erwähnen - Wenn eine Quelle eine erkennbare Ausrichtung hat (z.B. pro-russisch, pro-iranisch, staatsnah, rechtsextrem), muss dies im Fliesstext erwaehnt werden, damit der Leser die Information einordnen kann. Beispiel: "Laut dem pro-russischen Telegram-Kanal Rybar..." oder "Die iranische Nachrichtenagentur Fars meldete..." oder "Der rechtsextreme Kanal Compact behauptete..." - Quellen immer mit [Nr] referenzieren - Ältere Quellen zeitlich einordnen Antworte AUSSCHLIESSLICH als JSON-Objekt mit diesen Feldern: - "summary": Aktualisierte Zusammenfassung mit Quellenverweisen [1], [2] etc. - "sources": Array mit NUR den NEUEN Quellen aus den neuen Meldungen, je: {{"nr": , "name": "Quellenname", "url": "https://..."}}. Alte Quellen werden automatisch gemerged. - "key_facts": Array aller aktuellen Kernfakten (in Ausgabesprache) - "translations": Array von Objekten mit "article_id", "headline_de", "content_de" (nur für neue fremdsprachige Artikel) Antworte NUR mit dem JSON-Objekt. Keine Einleitung, keine Erklärung.""" INCREMENTAL_BRIEFING_PROMPT_TEMPLATE = """Du bist ein OSINT-Analyse-Agent für ein Lagemonitoring-System. Du aktualisierst ein strukturiertes Briefing für eine Hintergrundrecherche. HEUTIGES DATUM: {today} AUSGABESPRACHE: {output_language} WICHTIG: Verwende IMMER echte UTF-8-Umlaute (ä, ö, ü, ß) — NIEMALS Umschreibungen (ae, oe, ue, ss). THEMA: {title} KONTEXT: {description} BISHERIGES BRIEFING: {previous_summary} BISHERIGE QUELLEN: {previous_sources_text} NEUE QUELLEN SEIT DEM LETZTEN UPDATE: {new_articles_text} AUFTRAG: Aktualisiere das Briefing mit den neuen Erkenntnissen. Sei so ausführlich wie nötig. Behalte die Struktur bei: ## ÜBERBLICK ## HINTERGRUND ## AKTEURE ## AKTUELLE LAGE ## EINSCHÄTZUNG ## QUELLENQUALITÄT REGELN: - Bisherige gesicherte Fakten beibehalten - KEINE Gedankenstriche (—, –) verwenden — stattdessen Kommas, Doppelpunkte oder neue Saetze - Neue Erkenntnisse einarbeiten - Veraltete Informationen aktualisieren - Wenn eine Quelle eine erkennbare Ausrichtung hat (z.B. pro-russisch, pro-iranisch, staatsnah, rechtsextrem), muss dies im Fliesstext erwaehnt werden, damit der Leser die Information einordnen kann. Beispiel: "Laut dem pro-russischen Telegram-Kanal Rybar..." oder "Die iranische Nachrichtenagentur Fars meldete..." oder "Der rechtsextreme Kanal Compact behauptete..." - Quellen immer mit [Nr] referenzieren - Markdown-Überschriften (##) für die Abschnitte verwenden Antworte AUSSCHLIESSLICH als JSON-Objekt mit diesen Feldern: - "summary": Das aktualisierte Briefing als Markdown-Text mit Quellenverweisen - "sources": Array mit NUR den NEUEN Quellen aus den neuen Meldungen, je: {{"nr": , "name": "Quellenname", "url": "https://..."}}. Alte Quellen werden automatisch gemerged. - "key_facts": Array aller gesicherten Kernfakten (in Ausgabesprache) - "translations": Array von Objekten mit "article_id", "headline_de", "content_de" (nur für neue fremdsprachige Artikel) Antworte NUR mit dem JSON-Objekt. Keine Einleitung, keine Erklärung.""" class AnalyzerAgent: """Analysiert und übersetzt Meldungen über Claude CLI.""" def _format_articles_text(self, articles: list[dict], max_articles: int = 30) -> str: """Formatiert Artikel als Text für den Prompt.""" articles_text = "" for i, article in enumerate(articles[:max_articles]): articles_text += f"\n--- Meldung {i+1} (ID: {article.get('id', 'neu')}) ---\n" articles_text += f"Quelle: {article.get('source', 'Unbekannt')}\n" url = article.get('source_url', '') if url: articles_text += f"URL: {url}\n" articles_text += f"Sprache: {article.get('language', 'de')}\n" bias = article.get('source_bias', '') if bias: articles_text += f"Einordnung: {bias}\n" published = article.get('published_at', '') if published: articles_text += f"Veröffentlicht: {published}\n" headline = article.get('headline_de') or article.get('headline', '') articles_text += f"Überschrift: {headline}\n" content = article.get('content_de') or article.get('content_original', '') if content: articles_text += f"Inhalt: {content[:800]}\n" return articles_text async def analyze(self, title: str, description: str, articles: list[dict], incident_type: str = "adhoc") -> tuple[dict | None, ClaudeUsage | None]: """Erstanalyse: Analysiert alle Meldungen zu einem Vorfall (erster Refresh).""" if not articles: return None, None articles_text = self._format_articles_text(articles) from config import OUTPUT_LANGUAGE today = datetime.now(TIMEZONE).strftime("%d.%m.%Y") template = BRIEFING_PROMPT_TEMPLATE if incident_type == "research" else ANALYSIS_PROMPT_TEMPLATE prompt = template.format( title=title, description=description or "Keine weiteren Details", articles_text=articles_text, today=today, output_language=OUTPUT_LANGUAGE, ) try: result, usage = await call_claude(prompt) analysis = self._parse_response(result) if analysis: analysis = self._sanitize_sources(analysis) logger.info(f"Erstanalyse abgeschlossen: {len(analysis.get('sources', []))} Quellen referenziert") return analysis, usage except Exception as e: logger.error(f"Analyse-Fehler: {e}") return None, None async def analyze_incremental( self, title: str, description: str, new_articles: list[dict], previous_summary: str, previous_sources_json: str | None, incident_type: str = "adhoc", ) -> tuple[dict | None, ClaudeUsage | None]: """Inkrementelle Analyse: Aktualisiert das Lagebild mit nur den neuen Artikeln. Spart erheblich Tokens, da nicht alle Artikel erneut gesendet werden. """ if not new_articles: logger.info("Inkrementelle Analyse übersprungen: keine neuen Artikel") return None, None new_articles_text = self._format_articles_text(new_articles, max_articles=20) previous_sources_text = "Keine bisherigen Quellen" self._all_previous_sources = [] if previous_sources_json: try: self._all_previous_sources = json.loads(previous_sources_json) total = len(self._all_previous_sources) recent = self._all_previous_sources[-100:] if total > 100 else self._all_previous_sources src_lines = [] if total > 100: highest_nr = self._all_previous_sources[-1].get("nr", "?") src_lines.append(f"(... {total - 100} aeltere Quellen ausgelassen, hoechste Nr: {highest_nr})") for s in recent: nr = s.get("nr", "?") name = s.get("name", "?") src_lines.append(f"[{nr}] {name}") previous_sources_text = chr(10).join(src_lines) except (json.JSONDecodeError, TypeError): previous_sources_text = "Fehler beim Laden der bisherigen Quellen" from config import OUTPUT_LANGUAGE today = datetime.now(TIMEZONE).strftime("%d.%m.%Y") template = INCREMENTAL_BRIEFING_PROMPT_TEMPLATE if incident_type == "research" else INCREMENTAL_ANALYSIS_PROMPT_TEMPLATE prompt = template.format( title=title, description=description or "Keine weiteren Details", previous_summary=previous_summary, previous_sources_text=previous_sources_text, new_articles_text=new_articles_text, today=today, output_language=OUTPUT_LANGUAGE, ) try: result, usage = await call_claude(prompt) analysis = self._parse_response(result) if analysis: analysis = self._sanitize_sources(analysis) if analysis and self._all_previous_sources: # Merge: alte Quellen beibehalten, neue hinzufuegen returned_sources = analysis.get("sources", []) returned_nrs = {s.get("nr") for s in returned_sources} merged = [s for s in self._all_previous_sources if s.get("nr") not in returned_nrs] merged.extend(returned_sources) merged.sort(key=lambda s: s.get("nr", 0) if isinstance(s.get("nr"), int) else 9999) analysis["sources"] = merged logger.info( f"Inkrementelle Analyse abgeschlossen: {len(new_articles)} neue Artikel, " f"{len(analysis.get('sources', []))} Quellen gesamt (merged)" ) elif analysis: logger.info( f"Inkrementelle Analyse abgeschlossen: {len(new_articles)} neue Artikel, " f"{len(analysis.get('sources', []))} Quellen gesamt" ) return analysis, usage except Exception as e: logger.error(f"Inkrementelle Analyse-Fehler: {e}") return None, None def _sanitize_sources(self, analysis: dict) -> dict: """Entfernt Buchstaben-Suffixe aus Quellennummern (z.B. '1383a' -> 1383). Das LLM erzeugt trotz Anweisung gelegentlich Suffix-Nummern. Diese werden hier auf die Basisnummer normalisiert. Duplikate werden entfernt, wobei Eintraege mit URL bevorzugt werden. """ sources = analysis.get("sources", []) if not sources: return analysis cleaned = {} suffix_count = 0 for s in sources: nr = s.get("nr", "") nr_str = str(nr) # Prüfe auf Buchstaben-Suffix (z.B. "1383a", "1383b") m = re.match(r"^(\d+)[a-z]$", nr_str) if m: base_nr = int(m.group(1)) suffix_count += 1 # Nur übernehmen wenn Basisnummer noch nicht existiert oder # dieser Eintrag eine URL hat und der bisherige nicht if base_nr not in cleaned: s_copy = dict(s) s_copy["nr"] = base_nr cleaned[base_nr] = s_copy elif s.get("url") and not cleaned[base_nr].get("url"): s_copy = dict(s) s_copy["nr"] = base_nr cleaned[base_nr] = s_copy else: nr_int = int(nr) if isinstance(nr, (int, float)) or (isinstance(nr, str) and nr.isdigit()) else nr if nr_int not in cleaned: cleaned[nr_int] = s elif s.get("url") and not cleaned[nr_int].get("url"): cleaned[nr_int] = s if suffix_count > 0: logger.info(f"Quellen-Sanitierung: {suffix_count} Buchstaben-Suffixe entfernt") analysis["sources"] = sorted(cleaned.values(), key=lambda s: s.get("nr", 0) if isinstance(s.get("nr"), int) else 9999) return analysis def _parse_response(self, response: str) -> dict | None: """Parst die Claude-Antwort als JSON-Objekt mit robustem Fallback.""" # Markdown-Code-Fences entfernen cleaned = response.strip() if cleaned.startswith("```"): first_nl = cleaned.find(chr(10)) if first_nl != -1: cleaned = cleaned[first_nl + 1:] if cleaned.endswith("```"): cleaned = cleaned[:-3].rstrip() # Versuch 1: Direkt parsen try: data = json.loads(cleaned) if isinstance(data, dict): return data except json.JSONDecodeError: pass # Versuch 2: Aeusserstes JSON-Objekt per Regex match = re.search(r"\{.*\}", response, re.DOTALL) if match: try: data = json.loads(match.group()) if isinstance(data, dict): return data except json.JSONDecodeError: pass # Versuch 3: Abgeschnittenes JSON reparieren (haeufig bei langen Antworten) candidate = match.group() if match else cleaned repaired = self._repair_truncated_json(candidate) if repaired: return repaired # Versuch 4: summary per Regex extrahieren als letzter Fallback fallback = self._extract_summary_fallback(response) if fallback: logger.warning("JSON-Parse fehlgeschlagen, nutze Regex-Fallback fuer summary") return fallback logger.error( "Konnte Analyse-Antwort nicht als JSON parsen " f"(Laenge: {len(response)} Zeichen, Anfang: {response[:200]!r})" ) return None def _repair_truncated_json(self, text: str) -> dict | None: """Versucht abgeschnittenes JSON zu reparieren (fehlende Klammern am Ende).""" start = text.find("{") if start == -1: return None fragment = text[start:] open_braces = fragment.count("{") - fragment.count("}") open_brackets = fragment.count("[") - fragment.count("]") if open_braces <= 0 and open_brackets <= 0: return None # Nicht abgeschnitten trimmed = fragment.rstrip() # Entferne unvollstaendigen trailing content for end_pattern in [ r'"\s*$', r',\s*"[^"]*$', r',\s*$', ]: m = re.search(end_pattern, trimmed) if m: trimmed = trimmed[:m.start()] break # Schliesse offene Strukturen trimmed = trimmed.rstrip().rstrip(",") open_braces = trimmed.count("{") - trimmed.count("}") open_brackets = trimmed.count("[") - trimmed.count("]") trimmed += "]" * max(0, open_brackets) + "}" * max(0, open_braces) try: data = json.loads(trimmed) if isinstance(data, dict) and "summary" in data: logger.info(f"Abgeschnittenes JSON erfolgreich repariert ({len(trimmed)} Zeichen)") return data except json.JSONDecodeError: pass return None def _extract_summary_fallback(self, response: str) -> dict | None: """Extrahiert summary und sources per Regex wenn JSON-Parse komplett fehlschlaegt.""" m = re.search(r'"summary"\s*:\s*"((?:[^"\\]|\\.)*)"', response, re.DOTALL) if not m: return None summary = m.group(1) summary = summary.replace('\\n', chr(10)).replace('\\"', '"').replace('\\\\', '\\') sources = [] sm = re.search(r'"sources"\s*:\s*(\[.*?\])', response, re.DOTALL) if sm: try: sources = json.loads(sm.group(1)) except json.JSONDecodeError: pass return {"summary": summary, "sources": sources, "key_facts": [], "translations": []}