"""Agenten-Orchestrierung: Queue und Steuerung der Claude-Agenten.""" import asyncio import json import logging import re from datetime import datetime, timezone from config import TIMEZONE, MAX_FEEDS_PER_DOMAIN from typing import Optional from collections import defaultdict from urllib.parse import urlparse, urlunparse from agents.claude_client import UsageAccumulator from agents.factchecker import find_matching_claim from source_rules import ( _detect_category, _extract_domain, discover_source, domain_to_display_name, ) logger = logging.getLogger("osint.orchestrator") # Reputations-Score nach Quellenkategorie (für Relevanz-Scoring) CATEGORY_REPUTATION = { "nachrichten_de": 0.9, "nachrichten_int": 0.9, "presseagenturen": 1.0, "behoerden": 1.0, "fachmedien": 0.8, "international": 0.7, "sonstige": 0.4, } def _normalize_url(url: str) -> str: """URL normalisieren für Duplikat-Erkennung.""" if not url: return "" url = url.strip() try: parsed = urlparse(url) # Scheme normalisieren scheme = parsed.scheme.lower() or "https" # Host normalisieren (www entfernen, lowercase) netloc = parsed.netloc.lower() if netloc.startswith("www."): netloc = netloc[4:] # Pfad normalisieren (trailing slash entfernen) path = parsed.path.rstrip("/") # Query-Parameter und Fragment entfernen (Tracking-Params etc.) return urlunparse((scheme, netloc, path, "", "", "")) except Exception: return url.lower().strip().rstrip("/") def _normalize_headline(headline: str) -> str: """Überschrift normalisieren für Ähnlichkeitsvergleich.""" if not headline: return "" h = headline.lower().strip() # Umlaute normalisieren h = h.replace("ä", "ae").replace("ö", "oe").replace("ü", "ue").replace("ß", "ss") # Sonderzeichen entfernen h = re.sub(r"[^\w\s]", "", h) h = re.sub(r"\s+", " ", h).strip() return h def _is_duplicate(article: dict, seen_urls: set, seen_headlines: set) -> bool: """Prüft ob ein Artikel ein Duplikat ist (URL oder Headline).""" url = article.get("source_url", "") headline = article.get("headline", "") # URL-Duplikat if url: norm_url = _normalize_url(url) if norm_url in seen_urls: return True seen_urls.add(norm_url) # Headline-Duplikat (nur wenn Überschrift lang genug) if headline and len(headline) > 20: norm_headline = _normalize_headline(headline) if norm_headline and norm_headline in seen_headlines: return True if norm_headline: seen_headlines.add(norm_headline) return False def _score_relevance(article: dict, search_words: list[str] = None) -> float: """Berechnet einen Relevanz-Score (0.0-1.0) für einen Artikel. Gewichtung: - 40% Keyword-Dichte (wie gut passt der Artikel zum Suchbegriff) - 30% Quellen-Reputation (basierend auf Kategorie) - 20% Inhaltstiefe (hat der Artikel substantiellen Inhalt) - 10% RSS-Score (falls vorhanden, vom RSS-Parser) """ score = 0.0 # 1. Keyword-Dichte (40%) rss_score = article.get("relevance_score", 0.0) if rss_score > 0: score += 0.4 * rss_score elif search_words: text = f"{article.get('headline', '')} {article.get('content_original', '')}".lower() match_count = sum(1 for w in search_words if w in text) score += 0.4 * (match_count / len(search_words)) if search_words else 0.0 # 2. Quellen-Reputation (30%) source_url = article.get("source_url", "") if source_url: domain = _extract_domain(source_url) category = _detect_category(domain) score += 0.3 * CATEGORY_REPUTATION.get(category, 0.4) else: score += 0.3 * 0.4 # Unbekannte Quelle # 3. Inhaltstiefe (20%) content = article.get("content_original") or article.get("content_de") or "" if len(content) > 500: score += 0.2 elif len(content) > 200: score += 0.1 elif len(content) > 50: score += 0.05 # 4. RSS-Score Bonus (10%) score += 0.1 * rss_score return min(1.0, score) async def _background_discover_sources(articles: list[dict]): """Background-Task: Registriert seriöse, unbekannte Quellen aus Recherche-Ergebnissen.""" from database import get_db db = await get_db() try: # 1. Unique Domains extrahieren seen_domains: set[str] = set() domains_to_check: list[tuple[str, str, str]] = [] for article in articles: url = article.get("source_url") if not url: continue domain = _extract_domain(url) if not domain or domain in seen_domains: continue seen_domains.add(domain) # 2. Nur seriöse Domains (in DOMAIN_CATEGORY_MAP, nicht "sonstige") category = _detect_category(domain) if category == "sonstige": continue domains_to_check.append((domain, url, category)) if not domains_to_check: return # 3. Gegen DB prüfen — welche Domains existieren schon? new_count = 0 for domain, url, category in domains_to_check: cursor = await db.execute( "SELECT id FROM sources WHERE LOWER(domain) = ? AND source_type = 'rss_feed' AND status = 'active'", (domain.lower(),), ) existing_feeds = await cursor.fetchall() if len(existing_feeds) >= MAX_FEEDS_PER_DOMAIN: continue # Domain hat bereits genug aktive Feeds cursor = await db.execute( "SELECT id FROM sources WHERE LOWER(domain) = ?", (domain.lower(),), ) if await cursor.fetchone(): continue # Domain schon bekannt # 4. RSS-Feed-Erkennung try: result = await discover_source(url) name = domain_to_display_name(domain) source_type = result["source_type"] # "rss_feed" oder "web_source" feed_url = result.get("rss_url") await db.execute( """INSERT INTO sources (name, url, domain, source_type, category, status, notes, added_by) VALUES (?, ?, ?, ?, ?, 'active', 'Auto-entdeckt via Recherche', 'system')""", (name, feed_url or f"https://{domain}", domain, source_type, category), ) new_count += 1 logger.info(f"Neue Quelle registriert: {name} ({domain}) als {source_type}") except Exception as e: logger.debug(f"Discovery fehlgeschlagen für {domain}: {e}") if new_count > 0: await db.commit() logger.info(f"Background-Discovery: {new_count} neue Quellen registriert") except Exception as e: logger.warning(f"Background-Discovery Fehler: {e}") finally: await db.close() async def _create_notifications_for_incident( db, incident_id: int, visibility: str, created_by: int, tenant_id: int, notifications: list[dict] ): """Erzeugt DB-Notifications fuer alle betroffenen Nutzer der Organisation. - Oeffentliche Lagen -> alle Nutzer der Org - Private Lagen -> nur Ersteller """ if not notifications: return now = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S') if visibility == "public" and tenant_id: cursor = await db.execute( "SELECT id FROM users WHERE organization_id = ? AND is_active = 1 AND last_login_at IS NOT NULL", (tenant_id,), ) user_ids = [row["id"] for row in await cursor.fetchall()] else: user_ids = [created_by] for user_id in user_ids: for notif in notifications: await db.execute( """INSERT INTO notifications (user_id, incident_id, type, title, text, icon, tenant_id, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", ( user_id, incident_id, notif.get("type", "refresh_summary"), notif["title"], notif["text"], notif.get("icon", "info"), tenant_id, now, ), ) await db.commit() logger.info(f"Notifications erstellt: {len(notifications)} x {len(user_ids)} Nutzer fuer Lage {incident_id}") async def _send_email_notifications_for_incident( db, incident_id: int, incident_title: str, visibility: str, created_by: int, tenant_id: int, notifications: list[dict] ): """Sendet E-Mail-Benachrichtigungen basierend auf individuellen Nutzer-Abos. Jeder Nutzer hat eigene E-Mail-Praeferenzen pro Lage (incident_subscriptions). Nur Nutzer die aktiv sind und sich mindestens einmal eingeloggt haben (last_login_at IS NOT NULL) werden beruecksichtigt. """ if not notifications: return from email_utils.sender import send_email from email_utils.templates import incident_notification_email from config import MAGIC_LINK_BASE_URL # Alle Nutzer mit aktiven Abos fuer diese Lage laden cursor = await db.execute( """SELECT s.notify_email_summary, s.notify_email_new_articles, s.notify_email_status_change, u.id, u.email, u.username FROM incident_subscriptions s JOIN users u ON u.id = s.user_id WHERE s.incident_id = ? AND u.is_active = 1 AND u.last_login_at IS NOT NULL AND (s.notify_email_summary = 1 OR s.notify_email_new_articles = 1 OR s.notify_email_status_change = 1)""", (incident_id,), ) subscribers = await cursor.fetchall() if not subscribers: return dashboard_url = f"{MAGIC_LINK_BASE_URL}/dashboard" for sub in subscribers: prefs = dict(sub) # Relevante Notifications basierend auf Nutzer-Praeferenzen filtern filtered_notifications = [] for n in notifications: ntype = n.get("type", "refresh_summary") if ntype == "refresh_summary" and prefs.get("notify_email_summary"): filtered_notifications.append(n) elif ntype == "new_articles" and prefs.get("notify_email_new_articles"): filtered_notifications.append(n) elif ntype == "status_change" and prefs.get("notify_email_status_change"): filtered_notifications.append(n) if not filtered_notifications: continue subject, html = incident_notification_email( username=prefs["username"], incident_title=incident_title, notifications=filtered_notifications, dashboard_url=dashboard_url, ) try: await send_email(prefs["email"], subject, html) logger.info(f"E-Mail-Benachrichtigung gesendet an {prefs['email']} fuer Lage {incident_id} ({len(filtered_notifications)} Items)") except Exception as e: logger.error(f"E-Mail-Benachrichtigung fehlgeschlagen fuer {prefs['email']}: {e}") class AgentOrchestrator: """Verwaltet die Claude-Agenten-Queue und koordiniert Recherche-Zyklen.""" def __init__(self): self._queue: asyncio.Queue = asyncio.Queue() self._running = False self._current_task: Optional[int] = None self._ws_manager = None self._queued_ids: set[int] = set() self._cancel_requested: set[int] = set() def set_ws_manager(self, ws_manager): """WebSocket-Manager setzen für Echtzeit-Updates.""" self._ws_manager = ws_manager async def start(self): """Queue-Worker starten.""" self._running = True asyncio.create_task(self._worker()) logger.info("Agenten-Orchestrator gestartet") async def stop(self): """Queue-Worker stoppen.""" self._running = False logger.info("Agenten-Orchestrator gestoppt") async def enqueue_refresh(self, incident_id: int, trigger_type: str = "manual") -> bool: """Refresh-Auftrag in die Queue stellen. Gibt False zurueck wenn bereits in Queue/aktiv.""" if incident_id in self._queued_ids or self._current_task == incident_id: logger.info(f"Refresh fuer Lage {incident_id} uebersprungen: bereits aktiv/in Queue") return False visibility, created_by, tenant_id = await self._get_incident_visibility(incident_id) self._queued_ids.add(incident_id) await self._queue.put((incident_id, trigger_type)) queue_size = self._queue.qsize() logger.info(f"Refresh fuer Lage {incident_id} eingereiht (Queue: {queue_size}, Trigger: {trigger_type})") if self._ws_manager: await self._ws_manager.broadcast_for_incident({ "type": "status_update", "incident_id": incident_id, "data": {"status": "queued", "queue_position": queue_size}, }, visibility, created_by, tenant_id) return True async def cancel_refresh(self, incident_id: int) -> bool: """Fordert Abbruch eines laufenden Refreshes an.""" if self._current_task != incident_id: return False self._cancel_requested.add(incident_id) logger.info(f"Cancel angefordert fuer Lage {incident_id}") if self._ws_manager: try: vis, cb, tid = await self._get_incident_visibility(incident_id) except Exception: vis, cb, tid = "public", None, None await self._ws_manager.broadcast_for_incident({ "type": "status_update", "incident_id": incident_id, "data": {"status": "cancelling", "detail": "Wird abgebrochen..."}, }, vis, cb, tid) return True def _check_cancelled(self, incident_id: int): """Prüft ob Abbruch angefordert wurde und wirft CancelledError.""" if incident_id in self._cancel_requested: self._cancel_requested.discard(incident_id) raise asyncio.CancelledError("Vom Nutzer abgebrochen") async def _worker(self): """Verarbeitet Refresh-Aufträge sequentiell.""" while self._running: try: item = await asyncio.wait_for(self._queue.get(), timeout=5.0) except asyncio.TimeoutError: continue incident_id, trigger_type = item self._queued_ids.discard(incident_id) self._current_task = incident_id logger.info(f"Starte Refresh für Lage {incident_id} (Trigger: {trigger_type})") RETRY_DELAYS = [0, 120, 300] # Sekunden: sofort, 2min, 5min TRANSIENT_ERRORS = (asyncio.TimeoutError, ConnectionError, OSError) last_error = None try: for attempt in range(3): try: await self._run_refresh(incident_id, trigger_type=trigger_type, retry_count=attempt) last_error = None break # Erfolg except asyncio.CancelledError: logger.info(f"Refresh fuer Lage {incident_id} abgebrochen") await self._mark_refresh_cancelled(incident_id) try: _vis, _cb, _tid = await self._get_incident_visibility(incident_id) except Exception: _vis, _cb, _tid = "public", None, None if self._ws_manager: await self._ws_manager.broadcast_for_incident({ "type": "refresh_cancelled", "incident_id": incident_id, "data": {"status": "cancelled"}, }, _vis, _cb, _tid) last_error = None break except TRANSIENT_ERRORS as e: last_error = e logger.warning(f"Transienter Fehler bei Lage {incident_id} (Versuch {attempt + 1}/3): {e}") if attempt < 2: await self._mark_refresh_failed(incident_id, str(e)) delay = RETRY_DELAYS[attempt + 1] logger.info(f"Retry in {delay}s für Lage {incident_id}") # Retry-Status per WebSocket senden if self._ws_manager: try: _vis, _cb, _tid = await self._get_incident_visibility(incident_id) except Exception: _vis, _cb, _tid = "public", None, None await self._ws_manager.broadcast_for_incident({ "type": "status_update", "incident_id": incident_id, "data": {"status": "retrying", "attempt": attempt + 1, "delay": delay}, }, _vis, _cb, _tid) await asyncio.sleep(delay) else: await self._mark_refresh_failed(incident_id, f"Endgültig fehlgeschlagen nach 3 Versuchen: {e}") except Exception as e: last_error = e logger.error(f"Permanenter Fehler bei Refresh für Lage {incident_id}: {e}") await self._mark_refresh_failed(incident_id, str(e)) break # Permanenter Fehler, kein Retry if last_error and self._ws_manager: try: _vis, _cb, _tid = await self._get_incident_visibility(incident_id) except Exception: _vis, _cb, _tid = "public", None, None await self._ws_manager.broadcast_for_incident({ "type": "refresh_error", "incident_id": incident_id, "data": {"error": str(last_error)}, }, _vis, _cb, _tid) finally: self._current_task = None self._queue.task_done() async def _mark_refresh_cancelled(self, incident_id: int): """Markiert den laufenden Refresh-Log-Eintrag als cancelled.""" from database import get_db db = await get_db() try: await db.execute( """UPDATE refresh_log SET status = 'cancelled', error_message = 'Vom Nutzer abgebrochen', completed_at = ? WHERE incident_id = ? AND status = 'running'""", (datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S'), incident_id), ) await db.commit() except Exception as e: logger.warning(f"Konnte Refresh-Log nicht als abgebrochen markieren: {e}") finally: await db.close() async def _mark_refresh_failed(self, incident_id: int, error: str): """Markiert den laufenden Refresh-Log-Eintrag als error.""" from database import get_db db = await get_db() try: await db.execute( """UPDATE refresh_log SET status = 'error', error_message = ?, completed_at = ? WHERE incident_id = ? AND status = 'running'""", (error[:500], datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S'), incident_id), ) await db.commit() except Exception as e: logger.warning(f"Konnte Refresh-Log nicht als fehlgeschlagen markieren: {e}") finally: await db.close() async def _get_incident_visibility(self, incident_id: int) -> tuple[str, Optional[int], Optional[int]]: """Incident-Visibility, created_by und tenant_id laden.""" from database import get_db visibility = "public" created_by = None tenant_id = None db = await get_db() try: cursor = await db.execute( "SELECT visibility, created_by, tenant_id FROM incidents WHERE id = ?", (incident_id,) ) row = await cursor.fetchone() if row: visibility = row["visibility"] or "public" created_by = row["created_by"] tenant_id = row["tenant_id"] finally: await db.close() return visibility, created_by, tenant_id async def _run_refresh(self, incident_id: int, trigger_type: str = "manual", retry_count: int = 0): """Führt einen kompletten Refresh-Zyklus durch.""" import aiosqlite from database import get_db from agents.researcher import ResearcherAgent from agents.analyzer import AnalyzerAgent from agents.factchecker import FactCheckerAgent from feeds.rss_parser import RSSParser db = await get_db() try: # Lage laden cursor = await db.execute("SELECT * FROM incidents WHERE id = ?", (incident_id,)) incident = await cursor.fetchone() if not incident: logger.warning(f"Lage {incident_id} nicht gefunden") return title = incident["title"] description = incident["description"] or "" incident_type = incident["type"] or "adhoc" international = bool(incident["international_sources"]) if "international_sources" in incident.keys() else True visibility = incident["visibility"] if "visibility" in incident.keys() else "public" created_by = incident["created_by"] if "created_by" in incident.keys() else None tenant_id = incident["tenant_id"] if "tenant_id" in incident.keys() else None previous_summary = incident["summary"] or "" previous_sources_json = incident["sources_json"] if "sources_json" in incident.keys() else None # Bei Retry: vorherigen running-Eintrag als error markieren if retry_count > 0: await db.execute( """UPDATE refresh_log SET status = 'error', error_message = 'Retry gestartet', completed_at = ? WHERE incident_id = ? AND status = 'running'""", (datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S'), incident_id), ) await db.commit() # Refresh-Log starten now = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S') cursor = await db.execute( "INSERT INTO refresh_log (incident_id, started_at, status, trigger_type, retry_count, tenant_id) VALUES (?, ?, 'running', ?, ?, ?)", (incident_id, now, trigger_type, retry_count, tenant_id), ) await db.commit() log_id = cursor.lastrowid usage_acc = UsageAccumulator() research_status = "deep_researching" if incident_type == "research" else "researching" research_detail = "Hintergrundrecherche im Web läuft..." if incident_type == "research" else "RSS-Feeds und Web werden durchsucht..." if self._ws_manager: await self._ws_manager.broadcast_for_incident({ "type": "status_update", "incident_id": incident_id, "data": {"status": research_status, "detail": research_detail, "started_at": now}, }, visibility, created_by, tenant_id) # Schritt 1+2: RSS-Feeds und Claude-Recherche parallel ausführen async def _rss_pipeline(): """RSS-Feed-Suche (Feed-Selektion + Parsing).""" if incident_type != "adhoc": logger.info("Recherche-Modus: RSS-Feeds übersprungen") return [], None rss_researcher = ResearcherAgent() rss_parser = RSSParser() from source_rules import get_feeds_with_metadata all_feeds = await get_feeds_with_metadata(tenant_id=tenant_id) # Domain-Balance: Max. MAX_FEEDS_PER_DOMAIN Feeds pro Domain feeds_by_domain: dict[str, list[dict]] = defaultdict(list) for feed in all_feeds: feeds_by_domain[feed.get("domain", "")].append(feed) balanced_feeds = [] for domain, domain_feeds in feeds_by_domain.items(): if len(domain_feeds) > MAX_FEEDS_PER_DOMAIN: # Nach article_count sortieren, meistgenutzte behalten domain_feeds.sort(key=lambda f: f.get("article_count", 0), reverse=True) kept = domain_feeds[:MAX_FEEDS_PER_DOMAIN] logger.info( f"Domain-Balance: {domain} von {len(domain_feeds)} auf {MAX_FEEDS_PER_DOMAIN} Feeds begrenzt" ) balanced_feeds.extend(kept) else: balanced_feeds.extend(domain_feeds) if len(balanced_feeds) < len(all_feeds): logger.info(f"Domain-Balance gesamt: {len(all_feeds)} → {len(balanced_feeds)} Feeds") all_feeds = balanced_feeds feed_usage = None if len(all_feeds) > 20: selected_feeds, feed_usage = await rss_researcher.select_relevant_feeds( title, description, international, all_feeds ) logger.info(f"Feed-Selektion: {len(selected_feeds)} von {len(all_feeds)} Feeds ausgewählt") articles = await rss_parser.search_feeds_selective(title, selected_feeds) else: articles = await rss_parser.search_feeds(title, international=international, tenant_id=tenant_id) logger.info(f"RSS: {len(articles)} relevante Artikel gefunden (international={international})") return articles, feed_usage async def _web_search_pipeline(): """Claude WebSearch-Recherche.""" researcher = ResearcherAgent() results, usage = await researcher.search(title, description, incident_type, international=international) logger.info(f"Claude-Recherche: {len(results)} Ergebnisse") return results, usage # Beide Pipelines parallel starten (rss_articles, rss_feed_usage), (search_results, search_usage) = await asyncio.gather( _rss_pipeline(), _web_search_pipeline(), ) if rss_feed_usage: usage_acc.add(rss_feed_usage) if search_usage: usage_acc.add(search_usage) # Checkpoint 1: Cancel prüfen nach RSS/WebSearch self._check_cancelled(incident_id) # Alle Ergebnisse zusammenführen all_results = rss_articles + search_results # Duplikate entfernen (normalisierte URL + Headline-Ähnlichkeit) seen_urls = set() seen_headlines = set() unique_results = [] for article in all_results: if not _is_duplicate(article, seen_urls, seen_headlines): unique_results.append(article) dupes_removed = len(all_results) - len(unique_results) if dupes_removed > 0: logger.info(f"Deduplizierung: {dupes_removed} Duplikate entfernt, {len(unique_results)} verbleibend") # Relevanz-Scoring und Sortierung for article in unique_results: if "relevance_score" not in article or article["relevance_score"] == 0: article["relevance_score"] = _score_relevance(article) unique_results.sort(key=lambda a: a.get("relevance_score", 0), reverse=True) source_count = len(set(a.get("source", "") for a in unique_results)) if self._ws_manager: await self._ws_manager.broadcast_for_incident({ "type": "status_update", "incident_id": incident_id, "data": { "status": "analyzing", "detail": f"Analysiert {len(unique_results)} Meldungen aus {source_count} Quellen...", "started_at": now, }, }, visibility, created_by, tenant_id) # --- Set-basierte DB-Deduplizierung (statt N×M Queries) --- cursor = await db.execute( "SELECT id, source_url, headline FROM articles WHERE incident_id = ?", (incident_id,), ) existing_db_articles = await cursor.fetchall() existing_urls = set() existing_headlines = set() for row in existing_db_articles: if row["source_url"]: existing_urls.add(_normalize_url(row["source_url"])) if row["headline"] and len(row["headline"]) > 20: norm_h = _normalize_headline(row["headline"]) if norm_h: existing_headlines.add(norm_h) logger.info(f"DB-Dedup: {len(existing_urls)} URLs, {len(existing_headlines)} Headlines im Bestand") # Neue Artikel speichern und für Analyse tracken new_count = 0 new_articles_for_analysis = [] for article in unique_results: # URL-Duplikat gegen DB if article.get("source_url"): norm_url = _normalize_url(article["source_url"]) if norm_url in existing_urls: continue existing_urls.add(norm_url) # Headline-Duplikat gegen DB headline = article.get("headline", "") if headline and len(headline) > 20: norm_h = _normalize_headline(headline) if norm_h and norm_h in existing_headlines: continue if norm_h: existing_headlines.add(norm_h) cursor = await db.execute( """INSERT INTO articles (incident_id, headline, headline_de, source, source_url, content_original, content_de, language, published_at, tenant_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", ( incident_id, article.get("headline", ""), article.get("headline_de"), article.get("source", "Unbekannt"), article.get("source_url"), article.get("content_original"), article.get("content_de"), article.get("language", "de"), article.get("published_at"), tenant_id, ), ) new_count += 1 # Artikel mit DB-ID für die Analyse tracken article_with_id = dict(article) article_with_id["id"] = cursor.lastrowid new_articles_for_analysis.append(article_with_id) await db.commit() # Geoparsing: Orte aus neuen Artikeln extrahieren und speichern if new_articles_for_analysis: try: from agents.geoparsing import geoparse_articles logger.info(f"Geoparsing fuer {len(new_articles_for_analysis)} neue Artikel...") geo_results = await geoparse_articles(new_articles_for_analysis) geo_count = 0 for art_id, locations in geo_results.items(): for loc in locations: await db.execute( """INSERT INTO article_locations (article_id, incident_id, location_name, location_name_normalized, country_code, latitude, longitude, confidence, source_text, tenant_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", (art_id, incident_id, loc["location_name"], loc["location_name_normalized"], loc["country_code"], loc["lat"], loc["lon"], loc["confidence"], loc.get("source_text", ""), tenant_id), ) geo_count += 1 if geo_count > 0: await db.commit() logger.info(f"Geoparsing: {geo_count} Orte aus {len(geo_results)} Artikeln gespeichert") except Exception as e: logger.warning(f"Geoparsing fehlgeschlagen (Pipeline laeuft weiter): {e}") # Quellen-Statistiken aktualisieren if new_count > 0: try: from database import refresh_source_counts await refresh_source_counts(db) except Exception as e: logger.warning(f"Quellen-Statistiken konnten nicht aktualisiert werden: {e}") # Schritt 3: Analyse und Zusammenfassung if new_count > 0 or not previous_summary: analyzer = AnalyzerAgent() # Inkrementelle Analyse wenn Lagebild bereits existiert und neue Artikel vorhanden if previous_summary and new_count > 0: logger.info(f"Inkrementelle Analyse: {new_count} neue Artikel zum bestehenden Lagebild") analysis, analysis_usage = await analyzer.analyze_incremental( title, description, new_articles_for_analysis, previous_summary, previous_sources_json, incident_type, ) else: # Erstanalyse: Alle Artikel laden logger.info("Erstanalyse: Alle Artikel werden analysiert") cursor = await db.execute( "SELECT * FROM articles WHERE incident_id = ? ORDER BY collected_at DESC", (incident_id,), ) all_articles = [dict(row) for row in await cursor.fetchall()] analysis, analysis_usage = await analyzer.analyze(title, description, all_articles, incident_type) if analysis_usage: usage_acc.add(analysis_usage) if analysis: is_first_summary = not previous_summary # Snapshot des alten Lagebilds sichern (nur wenn schon eins existiert) if previous_summary: cursor = await db.execute( "SELECT COUNT(*) as cnt FROM articles WHERE incident_id = ?", (incident_id,), ) snap_articles = (await cursor.fetchone())["cnt"] cursor = await db.execute( "SELECT COUNT(*) as cnt FROM fact_checks WHERE incident_id = ?", (incident_id,), ) snap_fcs = (await cursor.fetchone())["cnt"] await db.execute( """INSERT INTO incident_snapshots (incident_id, summary, sources_json, article_count, fact_check_count, refresh_log_id, created_at, tenant_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", (incident_id, previous_summary, previous_sources_json, snap_articles, snap_fcs, log_id, now, tenant_id), ) # sources_json aus der Analyse extrahieren und speichern sources = analysis.get("sources", []) sources_json = json.dumps(sources, ensure_ascii=False) if sources else None new_summary = analysis.get("summary", "") await db.execute( "UPDATE incidents SET summary = ?, sources_json = ?, updated_at = ? WHERE id = ?", (new_summary, sources_json, now, incident_id), ) # Beim ersten Refresh: Snapshot des neuen Lagebilds erstellen if is_first_summary and new_summary: cursor = await db.execute( "SELECT COUNT(*) as cnt FROM articles WHERE incident_id = ?", (incident_id,), ) snap_articles = (await cursor.fetchone())["cnt"] cursor = await db.execute( "SELECT COUNT(*) as cnt FROM fact_checks WHERE incident_id = ?", (incident_id,), ) snap_fcs = (await cursor.fetchone())["cnt"] await db.execute( """INSERT INTO incident_snapshots (incident_id, summary, sources_json, article_count, fact_check_count, refresh_log_id, created_at, tenant_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", (incident_id, new_summary, sources_json, snap_articles, snap_fcs, log_id, now, tenant_id), ) # Übersetzungen aktualisieren (nur für gültige DB-IDs) for translation in analysis.get("translations", []): article_id = translation.get("article_id") if isinstance(article_id, int): await db.execute( "UPDATE articles SET headline_de = ?, content_de = ? WHERE id = ? AND incident_id = ?", (translation.get("headline_de"), translation.get("content_de"), article_id, incident_id), ) await db.commit() # Checkpoint 2: Cancel prüfen nach Analyse self._check_cancelled(incident_id) if self._ws_manager: await self._ws_manager.broadcast_for_incident({ "type": "status_update", "incident_id": incident_id, "data": {"status": "factchecking", "detail": "Prüft Fakten gegen unabhängige Quellen...", "started_at": now}, }, visibility, created_by, tenant_id) # Schritt 4: Faktencheck factchecker = FactCheckerAgent() # Bestehende Fakten laden für inkrementellen Check cursor = await db.execute( "SELECT id, claim, status, sources_count FROM fact_checks WHERE incident_id = ?", (incident_id,), ) existing_facts = [dict(row) for row in await cursor.fetchall()] if existing_facts and new_count > 0: # Inkrementeller Faktencheck: nur neue Artikel + bestehende Fakten logger.info(f"Inkrementeller Faktencheck: {new_count} neue Artikel, {len(existing_facts)} bestehende Fakten") fact_checks, fc_usage = await factchecker.check_incremental( title, new_articles_for_analysis, existing_facts, incident_type, ) else: # Erstcheck: alle Artikel cursor = await db.execute( "SELECT * FROM articles WHERE incident_id = ? ORDER BY collected_at DESC", (incident_id,), ) all_articles_for_fc = [dict(row) for row in await cursor.fetchall()] fact_checks, fc_usage = await factchecker.check(title, all_articles_for_fc, incident_type) if fc_usage: usage_acc.add(fc_usage) # Checkpoint 3: Cancel prüfen nach Faktencheck self._check_cancelled(incident_id) # Prüfen ob dies der erste Refresh ist (keine vorherigen Faktenchecks) is_first_refresh = len(existing_facts) == 0 # Notification-Summary sammeln confirmed_count = 0 contradicted_count = 0 status_changes = [] # Mutable Kopie für Fuzzy-Matching remaining_existing = list(existing_facts) for fc in fact_checks: new_claim = fc.get("claim", "") new_status = fc.get("status", "developing") # Fuzzy-Matching gegen bestehende Claims matched = find_matching_claim(new_claim, remaining_existing) if matched: old_status = matched.get("status") await db.execute( "UPDATE fact_checks SET claim = ?, status = ?, sources_count = ?, evidence = ?, is_notification = ?, checked_at = ? WHERE id = ?", (new_claim, new_status, fc.get("sources_count", 0), fc.get("evidence"), fc.get("is_notification", 0), now, matched["id"]), ) # Aus der Liste entfernen damit nicht doppelt gematcht wird remaining_existing = [ef for ef in remaining_existing if ef["id"] != matched["id"]] # Status-Änderung tracken if not is_first_refresh and old_status and old_status != new_status: status_changes.append({ "claim": new_claim, "old_status": old_status, "new_status": new_status, }) else: await db.execute( """INSERT INTO fact_checks (incident_id, claim, status, sources_count, evidence, is_notification, tenant_id) VALUES (?, ?, ?, ?, ?, ?, ?)""", (incident_id, new_claim, new_status, fc.get("sources_count", 0), fc.get("evidence"), fc.get("is_notification", 0), tenant_id), ) # Status-Statistik sammeln if new_status in ("confirmed", "established"): confirmed_count += 1 elif new_status in ("contradicted", "disputed"): contradicted_count += 1 await db.commit() # Gebündelte Notification senden (nicht beim ersten Refresh) if not is_first_refresh: if self._ws_manager: await self._ws_manager.broadcast_for_incident({ "type": "refresh_summary", "incident_id": incident_id, "data": { "new_articles": new_count, "confirmed_count": confirmed_count, "contradicted_count": contradicted_count, "status_changes": status_changes, "is_first_refresh": False, "incident_title": title, }, }, visibility, created_by, tenant_id) # DB-Notifications erzeugen parts = [] if new_count > 0: parts.append(f"{new_count} neue Meldung{'en' if new_count != 1 else ''}") if confirmed_count > 0: parts.append(f"{confirmed_count} bestätigt") if contradicted_count > 0: parts.append(f"{contradicted_count} widersprochen") summary_text = ", ".join(parts) if parts else "Keine neuen Entwicklungen" db_notifications = [{ "type": "refresh_summary", "title": title, "text": f"Recherche: {summary_text}", "icon": "warning" if contradicted_count > 0 else "success", }] if new_count > 0: db_notifications.append({ "type": "new_articles", "title": title, "text": f"{new_count} neue Meldung{'en' if new_count != 1 else ''} gefunden", "icon": "info", }) for sc in status_changes: db_notifications.append({ "type": "status_change", "title": title, "text": f"{sc['claim']}: {sc['old_status']} \u2192 {sc['new_status']}", "icon": "error" if sc["new_status"] in ("contradicted", "disputed") else "success", }) if created_by: await _create_notifications_for_incident( db, incident_id, visibility, created_by, tenant_id, db_notifications ) # E-Mail-Benachrichtigungen versenden await _send_email_notifications_for_incident( db, incident_id, title, visibility, created_by, tenant_id, db_notifications ) # Refresh-Log abschließen (mit Token-Statistiken) await db.execute( """UPDATE refresh_log SET completed_at = ?, articles_found = ?, status = 'completed', input_tokens = ?, output_tokens = ?, cache_creation_tokens = ?, cache_read_tokens = ?, total_cost_usd = ?, api_calls = ? WHERE id = ?""", (datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S'), new_count, usage_acc.input_tokens, usage_acc.output_tokens, usage_acc.cache_creation_tokens, usage_acc.cache_read_tokens, round(usage_acc.total_cost_usd, 7), usage_acc.call_count, log_id), ) await db.commit() logger.info( f"Token: {usage_acc.input_tokens} in / {usage_acc.output_tokens} out / " f"${usage_acc.total_cost_usd:.4f} ({usage_acc.call_count} Calls)" ) # Quellen-Discovery im Background starten if unique_results: asyncio.create_task(_background_discover_sources(unique_results)) if self._ws_manager: await self._ws_manager.broadcast_for_incident({ "type": "refresh_complete", "incident_id": incident_id, "data": {"new_articles": new_count, "status": "idle"}, }, visibility, created_by, tenant_id) logger.info(f"Refresh für Lage {incident_id} abgeschlossen: {new_count} neue Artikel") finally: await db.close() # Singleton-Instanz orchestrator = AgentOrchestrator()