diff --git a/src/agents/analyzer.py b/src/agents/analyzer.py index 2f42cd1..1defecb 100644 --- a/src/agents/analyzer.py +++ b/src/agents/analyzer.py @@ -98,18 +98,99 @@ Antworte AUSSCHLIESSLICH als JSON-Objekt mit diesen Feldern: Antworte NUR mit dem JSON-Objekt. Keine Einleitung, keine Erklärung.""" +INCREMENTAL_ANALYSIS_PROMPT_TEMPLATE = """Du bist ein OSINT-Analyse-Agent für ein Lagemonitoring-System. +HEUTIGES DATUM: {today} +AUSGABESPRACHE: {output_language} + +VORFALL: {title} +KONTEXT: {description} + +BISHERIGES LAGEBILD: +{previous_summary} + +BISHERIGE QUELLEN: +{previous_sources_text} + +NEUE MELDUNGEN SEIT DEM LETZTEN UPDATE: +{new_articles_text} + +AUFTRAG: +1. Aktualisiere das Lagebild basierend auf den neuen Meldungen (max. 500 Wörter) +2. Behalte bestätigte Fakten aus dem bisherigen Lagebild bei +3. Ergänze neue Erkenntnisse und markiere wichtige neue Entwicklungen +4. Aktualisiere die Quellenverweise — neue Quellen bekommen fortlaufende Nummern nach den bisherigen +5. Entferne veraltete oder widerlegte Informationen + +STRUKTUR: +- Fließtext oder mit Markdown-Zwischenüberschriften (##) — je nach Komplexität +- Neue Entwicklungen mit **Fettdruck** hervorheben + +REGELN: +- Neutral und sachlich - keine Wertungen oder Spekulationen +- Bei widersprüchlichen Angaben beide Seiten erwähnen +- Quellen immer mit [Nr] referenzieren +- Das sources-Array muss ALLE Quellen enthalten (bisherige + neue) +- Ältere Quellen zeitlich einordnen + +Antworte AUSSCHLIESSLICH als JSON-Objekt mit diesen Feldern: +- "summary": Aktualisierte Zusammenfassung mit Quellenverweisen [1], [2] etc. +- "sources": VOLLSTÄNDIGES Array aller Quellen (alte + neue), je: {{"nr": 1, "name": "Quellenname", "url": "https://..."}} +- "key_facts": Array aller aktuellen Kernfakten (in Ausgabesprache) +- "translations": Array von Objekten mit "article_id", "headline_de", "content_de" (nur für neue fremdsprachige Artikel) + +Antworte NUR mit dem JSON-Objekt. Keine Einleitung, keine Erklärung.""" + +INCREMENTAL_BRIEFING_PROMPT_TEMPLATE = """Du bist ein OSINT-Analyse-Agent für ein Lagemonitoring-System. +Du aktualisierst ein strukturiertes Briefing für eine Hintergrundrecherche. +HEUTIGES DATUM: {today} +AUSGABESPRACHE: {output_language} + +THEMA: {title} +KONTEXT: {description} + +BISHERIGES BRIEFING: +{previous_summary} + +BISHERIGE QUELLEN: +{previous_sources_text} + +NEUE QUELLEN SEIT DEM LETZTEN UPDATE: +{new_articles_text} + +AUFTRAG: +Aktualisiere das Briefing (max. 800 Wörter) mit den neuen Erkenntnissen. Behalte die Struktur bei: + +## ÜBERBLICK +## HINTERGRUND +## AKTEURE +## AKTUELLE LAGE +## EINSCHÄTZUNG +## QUELLENQUALITÄT + +REGELN: +- Bisherige gesicherte Fakten beibehalten +- Neue Erkenntnisse einarbeiten und mit **Fettdruck** hervorheben +- Veraltete Informationen aktualisieren +- Quellen immer mit [Nr] referenzieren +- Das sources-Array muss ALLE Quellen enthalten (bisherige + neue) +- Markdown-Überschriften (##) für die Abschnitte verwenden + +Antworte AUSSCHLIESSLICH als JSON-Objekt mit diesen Feldern: +- "summary": Das aktualisierte Briefing als Markdown-Text mit Quellenverweisen +- "sources": VOLLSTÄNDIGES Array aller Quellen (alte + neue), je: {{"nr": 1, "name": "Quellenname", "url": "https://..."}} +- "key_facts": Array aller gesicherten Kernfakten (in Ausgabesprache) +- "translations": Array von Objekten mit "article_id", "headline_de", "content_de" (nur für neue fremdsprachige Artikel) + +Antworte NUR mit dem JSON-Objekt. Keine Einleitung, keine Erklärung.""" + class AnalyzerAgent: """Analysiert und übersetzt Meldungen über Claude CLI.""" - async def analyze(self, title: str, description: str, articles: list[dict], incident_type: str = "adhoc") -> tuple[dict | None, ClaudeUsage | None]: - """Analysiert alle Meldungen zu einem Vorfall.""" - if not articles: - return None, None - - # Artikel-Text für Prompt aufbereiten + def _format_articles_text(self, articles: list[dict], max_articles: int = 30) -> str: + """Formatiert Artikel als Text für den Prompt.""" articles_text = "" - for i, article in enumerate(articles[:30]): # Max 30 Artikel um Prompt-Länge zu begrenzen + for i, article in enumerate(articles[:max_articles]): articles_text += f"\n--- Meldung {i+1} (ID: {article.get('id', 'neu')}) ---\n" articles_text += f"Quelle: {article.get('source', 'Unbekannt')}\n" url = article.get('source_url', '') @@ -123,7 +204,15 @@ class AnalyzerAgent: articles_text += f"Überschrift: {headline}\n" content = article.get('content_de') or article.get('content_original', '') if content: - articles_text += f"Inhalt: {content[:500]}\n" + articles_text += f"Inhalt: {content[:800]}\n" + return articles_text + + async def analyze(self, title: str, description: str, articles: list[dict], incident_type: str = "adhoc") -> tuple[dict | None, ClaudeUsage | None]: + """Erstanalyse: Analysiert alle Meldungen zu einem Vorfall (erster Refresh).""" + if not articles: + return None, None + + articles_text = self._format_articles_text(articles) from config import OUTPUT_LANGUAGE today = datetime.now(TIMEZONE).strftime("%d.%m.%Y") @@ -140,12 +229,69 @@ class AnalyzerAgent: result, usage = await call_claude(prompt) analysis = self._parse_response(result) if analysis: - logger.info(f"Analyse abgeschlossen: {len(analysis.get('sources', []))} Quellen referenziert") + logger.info(f"Erstanalyse abgeschlossen: {len(analysis.get('sources', []))} Quellen referenziert") return analysis, usage except Exception as e: logger.error(f"Analyse-Fehler: {e}") return None, None + async def analyze_incremental( + self, + title: str, + description: str, + new_articles: list[dict], + previous_summary: str, + previous_sources_json: str | None, + incident_type: str = "adhoc", + ) -> tuple[dict | None, ClaudeUsage | None]: + """Inkrementelle Analyse: Aktualisiert das Lagebild mit nur den neuen Artikeln. + + Spart erheblich Tokens, da nicht alle Artikel erneut gesendet werden. + """ + if not new_articles: + logger.info("Inkrementelle Analyse übersprungen: keine neuen Artikel") + return None, None + + new_articles_text = self._format_articles_text(new_articles, max_articles=20) + + previous_sources_text = "Keine bisherigen Quellen" + if previous_sources_json: + try: + sources = json.loads(previous_sources_json) + lines = [] + for s in sources: + lines.append(f"[{s.get('nr', '?')}] {s.get('name', '?')} — {s.get('url', '?')}") + previous_sources_text = "\n".join(lines) + except (json.JSONDecodeError, TypeError): + previous_sources_text = "Fehler beim Laden der bisherigen Quellen" + + from config import OUTPUT_LANGUAGE + today = datetime.now(TIMEZONE).strftime("%d.%m.%Y") + + template = INCREMENTAL_BRIEFING_PROMPT_TEMPLATE if incident_type == "research" else INCREMENTAL_ANALYSIS_PROMPT_TEMPLATE + prompt = template.format( + title=title, + description=description or "Keine weiteren Details", + previous_summary=previous_summary, + previous_sources_text=previous_sources_text, + new_articles_text=new_articles_text, + today=today, + output_language=OUTPUT_LANGUAGE, + ) + + try: + result, usage = await call_claude(prompt) + analysis = self._parse_response(result) + if analysis: + logger.info( + f"Inkrementelle Analyse abgeschlossen: {len(new_articles)} neue Artikel, " + f"{len(analysis.get('sources', []))} Quellen gesamt" + ) + return analysis, usage + except Exception as e: + logger.error(f"Inkrementelle Analyse-Fehler: {e}") + return None, None + def _parse_response(self, response: str) -> dict | None: """Parst die Claude-Antwort als JSON-Objekt.""" try: diff --git a/src/agents/claude_client.py b/src/agents/claude_client.py index d8eafa0..f550d11 100644 --- a/src/agents/claude_client.py +++ b/src/agents/claude_client.py @@ -3,7 +3,7 @@ import asyncio import json import logging from dataclasses import dataclass -from config import CLAUDE_PATH, CLAUDE_TIMEOUT +from config import CLAUDE_PATH, CLAUDE_TIMEOUT, CLAUDE_MODEL_FAST logger = logging.getLogger("osint.claude_client") @@ -38,9 +38,17 @@ class UsageAccumulator: self.call_count += 1 -async def call_claude(prompt: str, tools: str | None = "WebSearch,WebFetch") -> tuple[str, ClaudeUsage]: - """Ruft Claude CLI auf. Gibt (result_text, usage) zurück.""" +async def call_claude(prompt: str, tools: str | None = "WebSearch,WebFetch", model: str | None = None) -> tuple[str, ClaudeUsage]: + """Ruft Claude CLI auf. Gibt (result_text, usage) zurück. + + Args: + prompt: Der Prompt für Claude + tools: Kommagetrennte erlaubte Tools (None = keine Tools, --max-turns 1) + model: Optionales Modell (z.B. CLAUDE_MODEL_FAST für Haiku). None = CLI-Default (Opus). + """ cmd = [CLAUDE_PATH, "-p", prompt, "--output-format", "json"] + if model: + cmd.extend(["--model", model]) if tools: cmd.extend(["--allowedTools", tools]) else: @@ -77,8 +85,9 @@ async def call_claude(prompt: str, tools: str | None = "WebSearch,WebFetch") -> cost_usd=data.get("total_cost_usd", 0.0), duration_ms=data.get("duration_ms", 0), ) + model_info = f" [{model}]" if model else "" logger.info( - f"Claude: {usage.input_tokens} in / {usage.output_tokens} out / " + f"Claude{model_info}: {usage.input_tokens} in / {usage.output_tokens} out / " f"cache {usage.cache_creation_tokens}+{usage.cache_read_tokens} / " f"${usage.cost_usd:.4f} / {usage.duration_ms}ms" ) diff --git a/src/agents/factchecker.py b/src/agents/factchecker.py index 13969d9..4dd8b00 100644 --- a/src/agents/factchecker.py +++ b/src/agents/factchecker.py @@ -2,6 +2,7 @@ import json import logging import re +from difflib import SequenceMatcher from agents.claude_client import call_claude, ClaudeUsage logger = logging.getLogger("osint.factchecker") @@ -81,17 +82,138 @@ Antworte AUSSCHLIESSLICH als JSON-Array. Jedes Element hat: Antworte NUR mit dem JSON-Array. Keine Einleitung, keine Erklärung.""" +# --- Inkrementelle Faktencheck-Prompts (für Folge-Refreshes) --- + +INCREMENTAL_FACTCHECK_PROMPT_TEMPLATE = """Du bist ein Faktencheck-Agent für ein OSINT-Lagemonitoring-System. +AUSGABESPRACHE: {output_language} + +VORFALL: {title} + +BEREITS GEPRÜFTE FAKTEN: +{existing_facts_text} + +NEUE MELDUNGEN: +{articles_text} + +STRENGE REGELN - KEINE HALLUZINATIONEN: +- Du darfst NUR Fakten bewerten, die aus den Meldungen oder bereits geprüften Fakten stammen +- KEINE Fakten aus deinem Trainingskorpus +- Nutze WebSearch zur Verifikation +- Rufe gefundene URLs per WebFetch ab + +AUFTRAG: +1. Prüfe ob die neuen Meldungen bereits geprüfte Fakten BESTÄTIGEN, WIDERLEGEN oder ERGÄNZEN +2. Aktualisiere den Status bestehender Fakten wenn nötig (z.B. "unconfirmed" → "confirmed") +3. Identifiziere 3-5 NEUE Faktenaussagen aus den neuen Meldungen +4. Prüfe neue Claims per WebSearch gegen unabhängige Quellen +5. Markiere wichtige Statusänderungen und neue Entwicklungen mit is_notification: true + +Status-Kategorien: +- "confirmed": 2+ unabhängige seriöse Quellen mit URL +- "unconfirmed": Nur 1 Quelle +- "contradicted": Widersprüchliche Informationen +- "developing": Situation unklar + +Antworte AUSSCHLIESSLICH als JSON-Array mit ALLEN Fakten (bestehende aktualisiert + neue). +Jedes Element hat: +- "claim": Die Faktenaussage auf {output_language} +- "status": "confirmed" | "unconfirmed" | "contradicted" | "developing" +- "sources_count": Anzahl unabhängiger Quellen +- "evidence": Begründung MIT konkreten Quellen-URLs +- "is_notification": true/false + +Antworte NUR mit dem JSON-Array.""" + +INCREMENTAL_RESEARCH_FACTCHECK_PROMPT_TEMPLATE = """Du bist ein Faktencheck-Agent für eine Hintergrundrecherche in einem OSINT-Lagemonitoring-System. +AUSGABESPRACHE: {output_language} + +THEMA: {title} + +BEREITS GEPRÜFTE FAKTEN: +{existing_facts_text} + +NEUE QUELLEN: +{articles_text} + +STRENGE REGELN - KEINE HALLUZINATIONEN: +- Du darfst NUR Fakten bewerten, die aus den Quellen oder bereits geprüften Fakten stammen +- KEINE Fakten aus deinem Trainingskorpus +- Nutze WebSearch zur Verifikation +- Rufe gefundene URLs per WebFetch ab + +AUFTRAG: +1. Prüfe ob die neuen Quellen bereits geprüfte Fakten bestätigen, widerlegen oder ergänzen +2. Aktualisiere den Status bestehender Fakten wenn nötig +3. Identifiziere 3-5 NEUE Faktenaussagen aus den neuen Quellen +4. Prüfe neue Claims per WebSearch + +Status-Kategorien: +- "established": 3+ unabhängige Quellen mit URL +- "disputed": Verschiedene Positionen dokumentiert +- "unverified": Nicht unabhängig verifizierbar +- "developing": Faktenlage im Fluss + +Antworte AUSSCHLIESSLICH als JSON-Array mit ALLEN Fakten (bestehende aktualisiert + neue). +Jedes Element hat: +- "claim": Die Faktenaussage auf {output_language} +- "status": "established" | "disputed" | "unverified" | "developing" +- "sources_count": Anzahl unabhängiger Quellen +- "evidence": Begründung MIT konkreten Quellen-URLs +- "is_notification": true/false + +Antworte NUR mit dem JSON-Array.""" + + +def normalize_claim(claim: str) -> str: + """Normalisiert einen Claim für Ähnlichkeitsvergleich.""" + c = claim.lower().strip() + # Umlaute normalisieren + c = c.replace("ä", "ae").replace("ö", "oe").replace("ü", "ue").replace("ß", "ss") + c = re.sub(r'[^\w\s]', '', c) + c = re.sub(r'\s+', ' ', c).strip() + return c + + +def find_matching_claim(new_claim: str, existing_claims: list[dict], threshold: float = 0.7) -> dict | None: + """Findet den besten passenden bestehenden Claim per Fuzzy-Matching. + + Args: + new_claim: Der neue Claim-Text + existing_claims: Liste von Dicts mit mindestens {"id", "claim", "status"} + threshold: Mindest-Ähnlichkeit (0.0-1.0), Standard 0.7 + + Returns: + Das passende Dict oder None wenn kein Match über dem Schwellwert + """ + norm_new = normalize_claim(new_claim) + if not norm_new: + return None + + best_match = None + best_ratio = 0.0 + + for existing in existing_claims: + norm_existing = normalize_claim(existing.get("claim", "")) + if not norm_existing: + continue + ratio = SequenceMatcher(None, norm_new, norm_existing).ratio() + if ratio > best_ratio: + best_ratio = ratio + best_match = existing + + if best_ratio >= threshold: + logger.debug(f"Claim-Match ({best_ratio:.2f}): '{new_claim[:50]}...' → '{best_match['claim'][:50]}...'") + return best_match + return None + class FactCheckerAgent: """Prüft Fakten über Claude CLI gegen unabhängige Quellen.""" - async def check(self, title: str, articles: list[dict], incident_type: str = "adhoc") -> tuple[list[dict], ClaudeUsage | None]: - """Führt Faktencheck für eine Lage durch.""" - if not articles: - return [], None - + def _format_articles_text(self, articles: list[dict], max_articles: int = 20) -> str: + """Formatiert Artikel als Text für den Prompt.""" articles_text = "" - for i, article in enumerate(articles[:20]): + for i, article in enumerate(articles[:max_articles]): articles_text += f"\n--- Meldung {i+1} ---\n" articles_text += f"Quelle: {article.get('source', 'Unbekannt')}\n" source_url = article.get('source_url', '') @@ -101,7 +223,27 @@ class FactCheckerAgent: articles_text += f"Überschrift: {headline}\n" content = article.get('content_de') or article.get('content_original', '') if content: - articles_text += f"Inhalt: {content[:300]}\n" + articles_text += f"Inhalt: {content[:500]}\n" + return articles_text + + def _format_existing_facts(self, facts: list[dict]) -> str: + """Formatiert bestehende Fakten als Text für den inkrementellen Prompt.""" + if not facts: + return "Keine bisherigen Fakten" + lines = [] + for fc in facts: + status = fc.get("status", "developing") + claim = fc.get("claim", "") + sources = fc.get("sources_count", 0) + lines.append(f"- [{status}] ({sources} Quellen) {claim}") + return "\n".join(lines) + + async def check(self, title: str, articles: list[dict], incident_type: str = "adhoc") -> tuple[list[dict], ClaudeUsage | None]: + """Führt vollständigen Faktencheck durch (erster Refresh).""" + if not articles: + return [], None + + articles_text = self._format_articles_text(articles) from config import OUTPUT_LANGUAGE template = RESEARCH_FACTCHECK_PROMPT_TEMPLATE if incident_type == "research" else FACTCHECK_PROMPT_TEMPLATE @@ -120,6 +262,46 @@ class FactCheckerAgent: logger.error(f"Faktencheck-Fehler: {e}") return [], None + async def check_incremental( + self, + title: str, + new_articles: list[dict], + existing_facts: list[dict], + incident_type: str = "adhoc", + ) -> tuple[list[dict], ClaudeUsage | None]: + """Inkrementeller Faktencheck: Prüft nur neue Artikel gegen bestehende Fakten. + + Spart Tokens, da nur neue Artikel + Zusammenfassung der bestehenden Fakten gesendet werden. + """ + if not new_articles: + logger.info("Inkrementeller Faktencheck übersprungen: keine neuen Artikel") + return [], None + + articles_text = self._format_articles_text(new_articles, max_articles=15) + existing_facts_text = self._format_existing_facts(existing_facts) + + from config import OUTPUT_LANGUAGE + if incident_type == "research": + template = INCREMENTAL_RESEARCH_FACTCHECK_PROMPT_TEMPLATE + else: + template = INCREMENTAL_FACTCHECK_PROMPT_TEMPLATE + + prompt = template.format( + title=title, + articles_text=articles_text, + existing_facts_text=existing_facts_text, + output_language=OUTPUT_LANGUAGE, + ) + + try: + result, usage = await call_claude(prompt) + facts = self._parse_response(result) + logger.info(f"Inkrementeller Faktencheck: {len(facts)} Fakten (neu + aktualisiert)") + return facts, usage + except Exception as e: + logger.error(f"Inkrementeller Faktencheck-Fehler: {e}") + return [], None + def _parse_response(self, response: str) -> list[dict]: """Parst die Claude-Antwort als JSON-Array.""" try: diff --git a/src/agents/orchestrator.py b/src/agents/orchestrator.py index 52438f3..57b5957 100644 --- a/src/agents/orchestrator.py +++ b/src/agents/orchestrator.py @@ -9,6 +9,7 @@ from typing import Optional from urllib.parse import urlparse, urlunparse from agents.claude_client import UsageAccumulator +from agents.factchecker import find_matching_claim from source_rules import ( _detect_category, _extract_domain, @@ -18,6 +19,17 @@ from source_rules import ( logger = logging.getLogger("osint.orchestrator") +# Reputations-Score nach Quellenkategorie (für Relevanz-Scoring) +CATEGORY_REPUTATION = { + "nachrichten_de": 0.9, + "nachrichten_int": 0.9, + "presseagenturen": 1.0, + "behoerden": 1.0, + "fachmedien": 0.8, + "international": 0.7, + "sonstige": 0.4, +} + def _normalize_url(url: str) -> str: """URL normalisieren für Duplikat-Erkennung.""" @@ -76,6 +88,50 @@ def _is_duplicate(article: dict, seen_urls: set, seen_headlines: set) -> bool: return False +def _score_relevance(article: dict, search_words: list[str] = None) -> float: + """Berechnet einen Relevanz-Score (0.0-1.0) für einen Artikel. + + Gewichtung: + - 40% Keyword-Dichte (wie gut passt der Artikel zum Suchbegriff) + - 30% Quellen-Reputation (basierend auf Kategorie) + - 20% Inhaltstiefe (hat der Artikel substantiellen Inhalt) + - 10% RSS-Score (falls vorhanden, vom RSS-Parser) + """ + score = 0.0 + + # 1. Keyword-Dichte (40%) + rss_score = article.get("relevance_score", 0.0) + if rss_score > 0: + score += 0.4 * rss_score + elif search_words: + text = f"{article.get('headline', '')} {article.get('content_original', '')}".lower() + match_count = sum(1 for w in search_words if w in text) + score += 0.4 * (match_count / len(search_words)) if search_words else 0.0 + + # 2. Quellen-Reputation (30%) + source_url = article.get("source_url", "") + if source_url: + domain = _extract_domain(source_url) + category = _detect_category(domain) + score += 0.3 * CATEGORY_REPUTATION.get(category, 0.4) + else: + score += 0.3 * 0.4 # Unbekannte Quelle + + # 3. Inhaltstiefe (20%) + content = article.get("content_original") or article.get("content_de") or "" + if len(content) > 500: + score += 0.2 + elif len(content) > 200: + score += 0.1 + elif len(content) > 50: + score += 0.05 + + # 4. RSS-Score Bonus (10%) + score += 0.1 * rss_score + + return min(1.0, score) + + async def _background_discover_sources(articles: list[dict]): """Background-Task: Registriert seriöse, unbekannte Quellen aus Recherche-Ergebnissen.""" from database import get_db @@ -478,6 +534,8 @@ class AgentOrchestrator: visibility = incident["visibility"] if "visibility" in incident.keys() else "public" created_by = incident["created_by"] if "created_by" in incident.keys() else None tenant_id = incident["tenant_id"] if "tenant_id" in incident.keys() else None + previous_summary = incident["summary"] or "" + previous_sources_json = incident["sources_json"] if "sources_json" in incident.keys() else None # Bei Retry: vorherigen running-Eintrag als error markieren if retry_count > 0: @@ -569,6 +627,12 @@ class AgentOrchestrator: if dupes_removed > 0: logger.info(f"Deduplizierung: {dupes_removed} Duplikate entfernt, {len(unique_results)} verbleibend") + # Relevanz-Scoring und Sortierung + for article in unique_results: + if "relevance_score" not in article or article["relevance_score"] == 0: + article["relevance_score"] = _score_relevance(article) + unique_results.sort(key=lambda a: a.get("relevance_score", 0), reverse=True) + source_count = len(set(a.get("source", "") for a in unique_results)) if self._ws_manager: await self._ws_manager.broadcast_for_incident({ @@ -581,43 +645,45 @@ class AgentOrchestrator: }, }, visibility, created_by, tenant_id) - # In DB speichern (neue Artikel) — auch gegen bestehende DB-Einträge prüfen + # --- Set-basierte DB-Deduplizierung (statt N×M Queries) --- + cursor = await db.execute( + "SELECT id, source_url, headline FROM articles WHERE incident_id = ?", + (incident_id,), + ) + existing_db_articles = await cursor.fetchall() + existing_urls = set() + existing_headlines = set() + for row in existing_db_articles: + if row["source_url"]: + existing_urls.add(_normalize_url(row["source_url"])) + if row["headline"] and len(row["headline"]) > 20: + norm_h = _normalize_headline(row["headline"]) + if norm_h: + existing_headlines.add(norm_h) + + logger.info(f"DB-Dedup: {len(existing_urls)} URLs, {len(existing_headlines)} Headlines im Bestand") + + # Neue Artikel speichern und für Analyse tracken new_count = 0 + new_articles_for_analysis = [] for article in unique_results: - # Prüfen ob URL (normalisiert) schon existiert + # URL-Duplikat gegen DB if article.get("source_url"): norm_url = _normalize_url(article["source_url"]) - cursor = await db.execute( - "SELECT id, source_url FROM articles WHERE incident_id = ?", - (incident_id,), - ) - existing_articles = await cursor.fetchall() - already_exists = False - for existing in existing_articles: - if existing["source_url"] and _normalize_url(existing["source_url"]) == norm_url: - already_exists = True - break - if already_exists: + if norm_url in existing_urls: continue + existing_urls.add(norm_url) - # Headline-Duplikat gegen DB prüfen + # Headline-Duplikat gegen DB headline = article.get("headline", "") if headline and len(headline) > 20: norm_h = _normalize_headline(headline) - cursor = await db.execute( - "SELECT id, headline FROM articles WHERE incident_id = ?", - (incident_id,), - ) - existing_articles = await cursor.fetchall() - headline_exists = False - for existing in existing_articles: - if _normalize_headline(existing["headline"]) == norm_h: - headline_exists = True - break - if headline_exists: + if norm_h and norm_h in existing_headlines: continue + if norm_h: + existing_headlines.add(norm_h) - await db.execute( + cursor = await db.execute( """INSERT INTO articles (incident_id, headline, headline_de, source, source_url, content_original, content_de, language, published_at, tenant_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", @@ -635,6 +701,10 @@ class AgentOrchestrator: ), ) new_count += 1 + # Artikel mit DB-ID für die Analyse tracken + article_with_id = dict(article) + article_with_id["id"] = cursor.lastrowid + new_articles_for_analysis.append(article_with_id) await db.commit() @@ -647,23 +717,34 @@ class AgentOrchestrator: logger.warning(f"Quellen-Statistiken konnten nicht aktualisiert werden: {e}") # Schritt 3: Analyse und Zusammenfassung - if new_count > 0 or not incident["summary"]: - cursor = await db.execute( - "SELECT * FROM articles WHERE incident_id = ? ORDER BY collected_at DESC", - (incident_id,), - ) - all_articles = [dict(row) for row in await cursor.fetchall()] - + if new_count > 0 or not previous_summary: analyzer = AnalyzerAgent() - analysis, analysis_usage = await analyzer.analyze(title, description, all_articles, incident_type) + + # Inkrementelle Analyse wenn Lagebild bereits existiert und neue Artikel vorhanden + if previous_summary and new_count > 0: + logger.info(f"Inkrementelle Analyse: {new_count} neue Artikel zum bestehenden Lagebild") + analysis, analysis_usage = await analyzer.analyze_incremental( + title, description, new_articles_for_analysis, + previous_summary, previous_sources_json, incident_type, + ) + else: + # Erstanalyse: Alle Artikel laden + logger.info("Erstanalyse: Alle Artikel werden analysiert") + cursor = await db.execute( + "SELECT * FROM articles WHERE incident_id = ? ORDER BY collected_at DESC", + (incident_id,), + ) + all_articles = [dict(row) for row in await cursor.fetchall()] + analysis, analysis_usage = await analyzer.analyze(title, description, all_articles, incident_type) + if analysis_usage: usage_acc.add(analysis_usage) if analysis: - is_first_summary = not incident["summary"] + is_first_summary = not previous_summary # Snapshot des alten Lagebilds sichern (nur wenn schon eins existiert) - if incident["summary"]: + if previous_summary: cursor = await db.execute( "SELECT COUNT(*) as cnt FROM articles WHERE incident_id = ?", (incident_id,), @@ -679,7 +760,7 @@ class AgentOrchestrator: (incident_id, summary, sources_json, article_count, fact_check_count, refresh_log_id, created_at, tenant_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", - (incident_id, incident["summary"], incident["sources_json"], + (incident_id, previous_summary, previous_sources_json, snap_articles, snap_fcs, log_id, now, tenant_id), ) @@ -715,13 +796,13 @@ class AgentOrchestrator: snap_articles, snap_fcs, log_id, now, tenant_id), ) - # Übersetzungen aktualisieren + # Übersetzungen aktualisieren (nur für gültige DB-IDs) for translation in analysis.get("translations", []): article_id = translation.get("article_id") - if article_id: + if isinstance(article_id, int): await db.execute( - "UPDATE articles SET headline_de = ?, content_de = ? WHERE id = ?", - (translation.get("headline_de"), translation.get("content_de"), article_id), + "UPDATE articles SET headline_de = ?, content_de = ? WHERE id = ? AND incident_id = ?", + (translation.get("headline_de"), translation.get("content_de"), article_id, incident_id), ) await db.commit() @@ -738,7 +819,29 @@ class AgentOrchestrator: # Schritt 4: Faktencheck factchecker = FactCheckerAgent() - fact_checks, fc_usage = await factchecker.check(title, all_articles, incident_type) + + # Bestehende Fakten laden für inkrementellen Check + cursor = await db.execute( + "SELECT id, claim, status, sources_count FROM fact_checks WHERE incident_id = ?", + (incident_id,), + ) + existing_facts = [dict(row) for row in await cursor.fetchall()] + + if existing_facts and new_count > 0: + # Inkrementeller Faktencheck: nur neue Artikel + bestehende Fakten + logger.info(f"Inkrementeller Faktencheck: {new_count} neue Artikel, {len(existing_facts)} bestehende Fakten") + fact_checks, fc_usage = await factchecker.check_incremental( + title, new_articles_for_analysis, existing_facts, incident_type, + ) + else: + # Erstcheck: alle Artikel + cursor = await db.execute( + "SELECT * FROM articles WHERE incident_id = ? ORDER BY collected_at DESC", + (incident_id,), + ) + all_articles_for_fc = [dict(row) for row in await cursor.fetchall()] + fact_checks, fc_usage = await factchecker.check(title, all_articles_for_fc, incident_type) + if fc_usage: usage_acc.add(fc_usage) @@ -746,54 +849,52 @@ class AgentOrchestrator: self._check_cancelled(incident_id) # Prüfen ob dies der erste Refresh ist (keine vorherigen Faktenchecks) - cursor = await db.execute( - "SELECT COUNT(*) as cnt FROM fact_checks WHERE incident_id = ?", - (incident_id,), - ) - row = await cursor.fetchone() - is_first_refresh = row["cnt"] == 0 + is_first_refresh = len(existing_facts) == 0 # Notification-Summary sammeln confirmed_count = 0 contradicted_count = 0 status_changes = [] + # Mutable Kopie für Fuzzy-Matching + remaining_existing = list(existing_facts) + for fc in fact_checks: - # Prüfen ob Claim schon existiert (mit altem Status) - cursor = await db.execute( - "SELECT id, status FROM fact_checks WHERE incident_id = ? AND claim = ?", - (incident_id, fc.get("claim", "")), - ) - existing = await cursor.fetchone() - old_status = existing["status"] if existing else None + new_claim = fc.get("claim", "") new_status = fc.get("status", "developing") - if existing: + # Fuzzy-Matching gegen bestehende Claims + matched = find_matching_claim(new_claim, remaining_existing) + + if matched: + old_status = matched.get("status") await db.execute( - "UPDATE fact_checks SET status = ?, sources_count = ?, evidence = ?, is_notification = ?, checked_at = ? WHERE id = ?", - (new_status, fc.get("sources_count", 0), fc.get("evidence"), fc.get("is_notification", 0), now, existing["id"]), + "UPDATE fact_checks SET claim = ?, status = ?, sources_count = ?, evidence = ?, is_notification = ?, checked_at = ? WHERE id = ?", + (new_claim, new_status, fc.get("sources_count", 0), fc.get("evidence"), fc.get("is_notification", 0), now, matched["id"]), ) + # Aus der Liste entfernen damit nicht doppelt gematcht wird + remaining_existing = [ef for ef in remaining_existing if ef["id"] != matched["id"]] + + # Status-Änderung tracken + if not is_first_refresh and old_status and old_status != new_status: + status_changes.append({ + "claim": new_claim, + "old_status": old_status, + "new_status": new_status, + }) else: await db.execute( """INSERT INTO fact_checks (incident_id, claim, status, sources_count, evidence, is_notification, tenant_id) VALUES (?, ?, ?, ?, ?, ?, ?)""", - (incident_id, fc.get("claim", ""), new_status, fc.get("sources_count", 0), fc.get("evidence"), fc.get("is_notification", 0), tenant_id), + (incident_id, new_claim, new_status, fc.get("sources_count", 0), fc.get("evidence"), fc.get("is_notification", 0), tenant_id), ) # Status-Statistik sammeln - if new_status == "confirmed" or new_status == "established": + if new_status in ("confirmed", "established"): confirmed_count += 1 - elif new_status == "contradicted" or new_status == "disputed": + elif new_status in ("contradicted", "disputed"): contradicted_count += 1 - # Echte Status-Änderungen tracken (nicht beim ersten Refresh) - if not is_first_refresh and old_status and old_status != new_status: - status_changes.append({ - "claim": fc.get("claim", ""), - "old_status": old_status, - "new_status": new_status, - }) - await db.commit() # Gebündelte Notification senden (nicht beim ersten Refresh) diff --git a/src/agents/researcher.py b/src/agents/researcher.py index 0adeeb4..5d60c8a 100644 --- a/src/agents/researcher.py +++ b/src/agents/researcher.py @@ -3,233 +3,10 @@ import json import logging import re from agents.claude_client import call_claude, ClaudeUsage +from config import CLAUDE_MODEL_FAST logger = logging.getLogger("osint.researcher") RESEARCH_PROMPT_TEMPLATE = """Du bist ein OSINT-Recherche-Agent für ein Lagemonitoring-System. -AUSGABESPRACHE: {output_language} - -AUFTRAG: Suche nach aktuellen Informationen zu folgendem Vorfall: -Titel: {title} -Kontext: {description} - -REGELN: -- Suche nur bei seriösen Nachrichtenquellen (Nachrichtenagenturen, Qualitätszeitungen, öffentlich-rechtliche Medien, Behörden) -- KEIN Social Media (Twitter/X, Facebook, Instagram, TikTok, Reddit) -- KEINE Boulevardmedien (Bild, Sun, Daily Mail etc.) -{language_instruction} -- Faktenbasiert und neutral - keine Spekulationen -- Nutze removepaywalls.com für Paywall-geschützte Artikel (z.B. Spiegel+, Zeit+, SZ+): https://www.removepaywalls.com/search?url=ARTIKEL_URL - -Gib die Ergebnisse AUSSCHLIESSLICH als JSON-Array zurück, ohne Erklärungen davor oder danach. -Jedes Element hat diese Felder: -- "headline": Originale Überschrift -- "headline_de": Übersetzung in Ausgabesprache (falls Originalsprache abweicht) -- "source": Name der Quelle (z.B. "Reuters", "tagesschau") -- "source_url": URL des Artikels -- "content_summary": Zusammenfassung des Inhalts (2-3 Sätze, in Ausgabesprache) -- "language": Sprache des Originals (z.B. "de", "en", "fr") -- "published_at": Veröffentlichungsdatum falls bekannt (ISO-Format) - -Antworte NUR mit dem JSON-Array. Keine Einleitung, keine Erklärung.""" - -DEEP_RESEARCH_PROMPT_TEMPLATE = """Du bist ein OSINT-Tiefenrecherche-Agent für ein Lagemonitoring-System. -AUSGABESPRACHE: {output_language} - -AUFTRAG: Führe eine umfassende Hintergrundrecherche durch zu: -Titel: {title} -Kontext: {description} - -RECHERCHE-STRATEGIE: -- Breite Suche: Hintergrundberichte, Analysen, Expertenmeinungen, Think-Tank-Publikationen -- Suche nach: Akteuren, Zusammenhängen, historischem Kontext, rechtlichen Rahmenbedingungen -- Akademische und Fachquellen zusätzlich zu Nachrichtenquellen -- Nutze removepaywalls.com für Paywall-geschützte Artikel (z.B. https://www.removepaywalls.com/search?url=ARTIKEL_URL) -{language_instruction} -- Ziel: 8-15 hochwertige Quellen - -QUELLENTYPEN (priorisiert): -1. Fachzeitschriften und Branchenmedien -2. Qualitätszeitungen (Hintergrundberichte, Dossiers) -3. Think Tanks und Forschungsinstitute -4. Offizielle Dokumente und Pressemitteilungen -5. Nachrichtenagenturen (für Faktengrundlage) - -AUSSCHLUSS: -- KEIN Social Media (Twitter/X, Facebook, Instagram, TikTok, Reddit) -- KEINE Boulevardmedien -- KEINE Meinungsblogs ohne Quellenbelege - -Gib die Ergebnisse AUSSCHLIESSLICH als JSON-Array zurück, ohne Erklärungen davor oder danach. -Jedes Element hat diese Felder: -- "headline": Originale Überschrift -- "headline_de": Übersetzung in Ausgabesprache (falls Originalsprache abweicht) -- "source": Name der Quelle (z.B. "netzpolitik.org", "Handelsblatt") -- "source_url": URL des Artikels -- "content_summary": Ausführliche Zusammenfassung des Inhalts (3-5 Sätze, in Ausgabesprache) -- "language": Sprache des Originals (z.B. "de", "en", "fr") -- "published_at": Veröffentlichungsdatum falls bekannt (ISO-Format) - -Antworte NUR mit dem JSON-Array. Keine Einleitung, keine Erklärung.""" - -# Sprach-Anweisungen -LANG_INTERNATIONAL = "- Suche in Deutsch UND Englisch für internationale Abdeckung" -LANG_GERMAN_ONLY = "- Suche NUR auf Deutsch bei deutschsprachigen Quellen (Deutschland, Österreich, Schweiz)\n- KEINE englischsprachigen oder anderssprachigen Quellen" - -LANG_DEEP_INTERNATIONAL = "- Suche in Deutsch, Englisch und weiteren relevanten Sprachen" -LANG_DEEP_GERMAN_ONLY = "- Suche NUR auf Deutsch bei deutschsprachigen Quellen (Deutschland, Österreich, Schweiz)\n- KEINE englischsprachigen oder anderssprachigen Quellen" - - -FEED_SELECTION_PROMPT_TEMPLATE = """Du bist ein OSINT-Analyst. Wähle aus dieser Feed-Liste die Feeds aus, die für die Lage relevant sein könnten. - -LAGE: {title} -KONTEXT: {description} -INTERNATIONALE QUELLEN: {international} - -FEEDS: -{feed_list} - -REGELN: -- Wähle alle Feeds die thematisch oder regional relevant sein könnten -- Lieber einen Feed zu viel als zu wenig auswählen -- Bei "Internationale Quellen: Nein": Keine internationalen Feeds auswählen -- Allgemeine Nachrichtenfeeds (tagesschau, Spiegel etc.) sind fast immer relevant -- Antworte NUR mit einem JSON-Array der Nummern, z.B. [1, 2, 5, 12]""" - - -class ResearcherAgent: - """Führt OSINT-Recherchen über Claude CLI WebSearch durch.""" - - async def select_relevant_feeds( - self, - title: str, - description: str, - international: bool, - feeds_metadata: list[dict], - ) -> tuple[list[dict], ClaudeUsage | None]: - """Lässt Claude die relevanten Feeds für eine Lage vorauswählen. - - Returns: - (ausgewählte Feeds, usage) — Bei Fehler: (alle Feeds, None) - """ - # Feed-Liste als nummerierte Übersicht formatieren - feed_lines = [] - for i, feed in enumerate(feeds_metadata, 1): - feed_lines.append( - f"{i}. {feed['name']} ({feed['domain']}) [{feed['category']}]" - ) - - prompt = FEED_SELECTION_PROMPT_TEMPLATE.format( - title=title, - description=description or "Keine weitere Beschreibung", - international="Ja" if international else "Nein", - feed_list="\n".join(feed_lines), - ) - - try: - result, usage = await call_claude(prompt, tools=None) - - # JSON-Array aus Antwort extrahieren - match = re.search(r'\[[\d\s,]+\]', result) - if not match: - logger.warning("Feed-Selektion: Kein JSON-Array in Antwort, nutze alle Feeds") - return feeds_metadata, usage - - indices = json.loads(match.group()) - selected = [] - for idx in indices: - if isinstance(idx, int) and 1 <= idx <= len(feeds_metadata): - selected.append(feeds_metadata[idx - 1]) - - if not selected: - logger.warning("Feed-Selektion: Keine gültigen Indizes, nutze alle Feeds") - return feeds_metadata, usage - - logger.info( - f"Feed-Selektion: {len(selected)} von {len(feeds_metadata)} Feeds ausgewählt" - ) - return selected, usage - - except Exception as e: - logger.warning(f"Feed-Selektion fehlgeschlagen ({e}), nutze alle Feeds") - return feeds_metadata, None - - async def search(self, title: str, description: str = "", incident_type: str = "adhoc", international: bool = True) -> tuple[list[dict], ClaudeUsage | None]: - """Sucht nach Informationen zu einem Vorfall.""" - from config import OUTPUT_LANGUAGE - if incident_type == "research": - lang_instruction = LANG_DEEP_INTERNATIONAL if international else LANG_DEEP_GERMAN_ONLY - prompt = DEEP_RESEARCH_PROMPT_TEMPLATE.format( - title=title, description=description, language_instruction=lang_instruction, - output_language=OUTPUT_LANGUAGE, - ) - else: - lang_instruction = LANG_INTERNATIONAL if international else LANG_GERMAN_ONLY - prompt = RESEARCH_PROMPT_TEMPLATE.format( - title=title, description=description, language_instruction=lang_instruction, - output_language=OUTPUT_LANGUAGE, - ) - - try: - result, usage = await call_claude(prompt) - articles = self._parse_response(result) - - # Ausgeschlossene Quellen dynamisch aus DB laden - excluded_sources = await self._get_excluded_sources() - - # Ausgeschlossene Quellen filtern - filtered = [] - for article in articles: - source = article.get("source", "").lower() - source_url = article.get("source_url", "").lower() - excluded = False - for excl in excluded_sources: - if excl in source or excl in source_url: - excluded = True - break - if not excluded: - # Bei nur-deutsch: nicht-deutsche Ergebnisse nachfiltern - if not international and article.get("language", "de") != "de": - continue - filtered.append(article) - - logger.info(f"Recherche ergab {len(filtered)} Artikel (von {len(articles)} gefundenen, international={international})") - return filtered, usage - - except Exception as e: - logger.error(f"Recherche-Fehler: {e}") - return [], None - - async def _get_excluded_sources(self) -> list[str]: - """Lädt ausgeschlossene Quellen aus der Datenbank.""" - try: - from source_rules import get_source_rules - rules = await get_source_rules() - return rules.get("excluded_domains", []) - except Exception as e: - logger.warning(f"Fallback auf config.py für Excluded Sources: {e}") - from config import EXCLUDED_SOURCES - return list(EXCLUDED_SOURCES) - - def _parse_response(self, response: str) -> list[dict]: - """Parst die Claude-Antwort als JSON-Array.""" - # Versuche JSON direkt zu parsen - try: - data = json.loads(response) - if isinstance(data, list): - return data - except json.JSONDecodeError: - pass - - # Versuche JSON aus der Antwort zu extrahieren (zwischen [ und ]) - match = re.search(r'\[.*\]', response, re.DOTALL) - if match: - try: - data = json.loads(match.group()) - if isinstance(data, list): - return data - except json.JSONDecodeError: - pass - - logger.warning("Konnte Claude-Antwort nicht als JSON parsen") - return [] +AUSPSN]][XY_BUQQΈXHXZY[[[ܛX][ۙ[H[[Hܙ[][]_B۝^\ܚ\[۟BQSHXH\ZH\p[XX[]Y[[ +XX[Y[\[]X[]0ޙZ][[0홙[X2 \XXHYYY[p((-%8M5QݥѕȽ`%хɅQQIФ(-%9 ձمɑ Mո5ь)Յ}Սѥ(ѕͥЁչɅMձѥ(9锁]эմ̴ԁݥѥѕѥٽՙչ͛ɱɔiͅչԁѕ(9锁ɕٕ̹݅ȁA͍݅鱥ѥMiЬMh輽ܹɕٕ̹͕݅ɍɰIQ%-1}UI0()ɝ͔UMM !1'e1% ́)M=8Ʌ񍬰ɭչٽȀmȁ))́ЁЁ͔(=ɥq͍ɥ(}U͕չ͝Ʌ́=ɥɅݕФ(ͽɍ9ȁEՕIѕ̈х͍Ԉ(ͽɍ}ɰUI0́ѥ(ѕ}յiͅչ́%̴̀ԁO锰͝Ʌ ȁ]эՙѥ聅͛ɱɔiͅչԴO锤(ՅMɅ́=ɥ̀Ȉ(Չ͡}ЈYٙѱཱչ͑մ́Ѐ%M<ɵФ()ݽє9UHЁ)M=8Ʌ丁-չɭչ()A}IMI !}AI=5AQ}Q5A1Q􀈈ԁЁ=M%9PQɕɍЁȁ1ѽɥMѕ)UMQAIM!UIcΔ \ No newline at end of file diff --git a/src/config.py b/src/config.py index 8588c50..7e4603e 100644 --- a/src/config.py +++ b/src/config.py @@ -22,6 +22,8 @@ JWT_EXPIRE_HOURS = 24 # Claude CLI CLAUDE_PATH = os.environ.get("CLAUDE_PATH", "/home/claude-dev/.claude/local/claude") CLAUDE_TIMEOUT = 300 # Sekunden (Claude mit WebSearch braucht oft 2-3 Min) +# Claude Modelle +CLAUDE_MODEL_FAST = "claude-haiku-4-5-20251001" # Für einfache Aufgaben (Feed-Selektion) # Ausgabesprache (Lagebilder, Faktenchecks, Zusammenfassungen) OUTPUT_LANGUAGE = "Deutsch" diff --git a/src/feeds/rss_parser.py b/src/feeds/rss_parser.py index d40a3a2..b047b85 100644 --- a/src/feeds/rss_parser.py +++ b/src/feeds/rss_parser.py @@ -121,8 +121,11 @@ class RSSParser: summary = entry.get("summary", "") text = f"{title} {summary}".lower() - # Prüfe ob mindestens ein Suchwort vorkommt - if all(word in text for word in search_words): + # Flexibles Keyword-Matching: mindestens die Hälfte der Suchworte muss vorkommen + min_matches = max(1, len(search_words) // 2) + match_count = sum(1 for word in search_words if word in text) + + if match_count >= min_matches: published = None if hasattr(entry, "published_parsed") and entry.published_parsed: try: @@ -130,6 +133,9 @@ class RSSParser: except (TypeError, ValueError): pass + # Relevanz-Score: Anteil der gematchten Suchworte (0.0-1.0) + relevance_score = match_count / len(search_words) if search_words else 0.0 + articles.append({ "headline": title, "headline_de": title if self._is_german(title) else None, @@ -139,6 +145,7 @@ class RSSParser: "content_de": summary[:1000] if summary and self._is_german(summary) else None, "language": "de" if self._is_german(title) else "en", "published_at": published, + "relevance_score": relevance_score, }) except Exception as e: