Dateien
AegisSight-Monitor/src/agents/orchestrator.py
UserIsMH b4898614c4 feat(topic-filter): Pre-Topic-Headline-Übersetzung für fremdsprachige Quellen
Der Topic-Filter (Haiku) hat bisher fremdsprachige Headlines (CJK, Arabisch,
Hebräisch, Kyrillisch) konservativ verworfen, weil er die Sicherheitsregel
"im Zweifel NICHT relevant" auf jeden Text anwandte, den er nicht klar lesen
konnte. Bei Lage 96 (Verfassungsänderung Japan) landeten so 79 von 87
Kandidaten im Papierkorb, darunter alle ja-Quellen mit Kanji-Headlines.

Lösung: ein eigener kleiner Haiku-Batch-Call vor dem Topic-Filter übersetzt
die Headlines (+ erste 240 Zeichen Content) fremdsprachiger Artikel ins
Englische und hängt sie als article["headline_en_for_topic"] /
"content_en_for_topic" an. Der Topic-Filter zeigt sie zusätzlich zum Original
und beurteilt damit ja/zh/ko/ar/he/ru/fa-Artikel fair.

- agents/translator.py: neue Funktion translate_headlines_for_topic_filter,
  unabhängig vom TRANSLATOR_ENABLED-Flag (Pflicht für korrekten Topic-Filter).
- agents/analyzer.py: filter_relevant_articles zeigt Übersetzungen mit an;
  Prompt-Regel erweitert.
- agents/orchestrator.py: Aufruf direkt vor dem Topic-Filter-Schritt.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 01:43:27 +02:00

2039 Zeilen
99 KiB
Python

"""Agenten-Orchestrierung: Queue und Steuerung der Claude-Agenten."""
import asyncio
import json
import logging
import re
from datetime import datetime
from config import TIMEZONE
from typing import Optional
from urllib.parse import urlparse, urlunparse, quote_plus
import httpx
from agents.claude_client import UsageAccumulator, _cancel_event_var
from agents.factchecker import find_matching_claim, deduplicate_new_facts, TWOPHASE_MIN_FACTS
from source_rules import (
_detect_category,
_extract_domain,
discover_source,
domain_to_display_name,
)
logger = logging.getLogger("osint.orchestrator")
# Reputation score per source category (used for relevance scoring).
# Keys must match the actual DB values in sources.category
# (see DOMAIN_CATEGORY_MAP in source_rules.py).
CATEGORY_REPUTATION = {
    "nachrichtenagentur": 1.0,  # Reuters, AP, dpa, AFP — primary sources
    "behoerde": 1.0,  # BMI, BSI, Europol — official sources
    "oeffentlich-rechtlich": 0.95,  # tagesschau, ZDF, ARD, BBC, ORF
    "qualitaetszeitung": 0.85,  # Spiegel, Zeit, FAZ, NZZ, Süddeutsche
    "think-tank": 0.85,  # SWP, IISS, Brookings, Chatham House
    "fachmedien": 0.8,  # heise, golem, netzpolitik, Handelsblatt
    "international": 0.75,  # CNN, Guardian, NYT, Al Jazeera, France24
    "regional": 0.65,  # regional daily newspapers
    "telegram": 0.5,  # OSINT channels — mixed quality
    "sonstige": 0.4,  # uncategorized
    "boulevard": 0.3,  # Bild, Sun etc.
}
# Research mode: automatically run 3 passes for best results
RESEARCH_MULTI_PASS_COUNT = 3
# Human-readable label per research pass number (German, used at runtime).
RESEARCH_PASS_LABELS = {1: "Breite Erfassung", 2: "Vertiefung", 3: "Konsolidierung"}
def _normalize_url(url: str) -> str:
"""URL normalisieren für Duplikat-Erkennung."""
if not url:
return ""
url = url.strip()
try:
parsed = urlparse(url)
# Scheme normalisieren
scheme = parsed.scheme.lower() or "https"
# Host normalisieren (www entfernen, lowercase)
netloc = parsed.netloc.lower()
if netloc.startswith("www."):
netloc = netloc[4:]
# Pfad normalisieren (trailing slash entfernen)
path = parsed.path.rstrip("/")
# Query-Parameter und Fragment entfernen (Tracking-Params etc.)
return urlunparse((scheme, netloc, path, "", "", ""))
except Exception:
return url.lower().strip().rstrip("/")
def _normalize_headline(headline: str) -> str:
"""Überschrift normalisieren für Ähnlichkeitsvergleich."""
if not headline:
return ""
h = headline.lower().strip()
# Umlaute normalisieren
h = h.replace("ä", "ae").replace("ö", "oe").replace("ü", "ue").replace("ß", "ss")
# Sonderzeichen entfernen
h = re.sub(r"[^\w\s]", "", h)
h = re.sub(r"\s+", " ", h).strip()
return h
def _is_duplicate(article: dict, seen_urls: set, seen_headlines: set) -> bool:
"""Prüft ob ein Artikel ein Duplikat ist (URL oder Headline)."""
url = article.get("source_url", "")
headline = article.get("headline", "")
# URL-Duplikat
if url:
norm_url = _normalize_url(url)
if norm_url in seen_urls:
return True
seen_urls.add(norm_url)
# Headline-Duplikat (nur wenn Überschrift lang genug)
if headline and len(headline) > 20:
norm_headline = _normalize_headline(headline)
if norm_headline and norm_headline in seen_headlines:
return True
if norm_headline:
seen_headlines.add(norm_headline)
return False
def _score_relevance(article: dict, search_words: list[str] = None) -> float:
"""Berechnet einen Relevanz-Score (0.0-1.0) für einen Artikel.
Gewichtung:
- 40% Keyword-Dichte (wie gut passt der Artikel zum Suchbegriff)
- 30% Quellen-Reputation (basierend auf Kategorie)
- 20% Inhaltstiefe (hat der Artikel substantiellen Inhalt)
- 10% RSS-Score (falls vorhanden, vom RSS-Parser)
"""
score = 0.0
# 1. Keyword-Dichte (40%)
rss_score = article.get("relevance_score", 0.0)
if rss_score > 0:
score += 0.4 * rss_score
elif search_words:
text = f"{article.get('headline', '')} {article.get('content_original', '')}".lower()
match_count = sum(1 for w in search_words if w in text)
score += 0.4 * (match_count / len(search_words)) if search_words else 0.0
# 2. Quellen-Reputation (30%)
source_url = article.get("source_url", "")
if source_url:
domain = _extract_domain(source_url)
category = _detect_category(domain)
score += 0.3 * CATEGORY_REPUTATION.get(category, 0.4)
else:
score += 0.3 * 0.4 # Unbekannte Quelle
# 3. Inhaltstiefe (20%)
content = article.get("content_original") or article.get("content_de") or ""
if len(content) > 500:
score += 0.2
elif len(content) > 200:
score += 0.1
elif len(content) > 50:
score += 0.05
# 4. RSS-Score Bonus (10%)
score += 0.1 * rss_score
return min(1.0, score)
async def _verify_article_urls(
articles: list[dict],
concurrency: int = 10,
timeout: float = 8.0,
) -> list[dict]:
"""Prueft WebSearch-URLs auf Erreichbarkeit. Ersetzt unerreichbare URLs durch Suchlinks."""
if not articles:
return []
sem = asyncio.Semaphore(concurrency)
results: list[dict | None] = [None] * len(articles)
async def _check(idx: int, article: dict, client: httpx.AsyncClient):
url = article.get("source_url", "").strip()
if not url:
results[idx] = article # Kein URL -> behalten (wird eh nicht verlinkt)
return
async with sem:
try:
resp = await client.head(url)
if resp.status_code == 405:
# Manche Server unterstuetzen kein HEAD
resp = await client.get(url, headers={"Range": "bytes=0-0"})
if 200 <= resp.status_code < 400:
results[idx] = article
return
# 404 oder anderer Fehler -> Fallback-Suchlink
logger.info(f"URL-Verifizierung: {resp.status_code} fuer {url}")
except Exception as e:
logger.debug(f"URL-Verifizierung fehlgeschlagen fuer {url}: {e}")
# Fallback: Google-Suchlink aus Headline + Source-Domain
headline = article.get("headline", "")
source = article.get("source", "")
domain = ""
try:
from urllib.parse import urlparse as _urlparse
domain = _urlparse(url).netloc
except Exception:
pass
if headline:
search_query = f"site:{domain} {headline}" if domain else f"{source} {headline}"
fallback_url = f"https://www.google.com/search?q={quote_plus(search_query)}"
article_copy = dict(article)
article_copy["source_url"] = fallback_url
article_copy["_url_repaired"] = True
results[idx] = article_copy
logger.info(f"URL-Fallback: {url} -> Google-Suche fuer \"{headline[:60]}...\"")
else:
results[idx] = article # Kein Headline -> Original behalten
async with httpx.AsyncClient(
timeout=timeout,
follow_redirects=True,
headers={"User-Agent": "Mozilla/5.0 (compatible; AegisSight-Monitor/1.0)"},
) as client:
await asyncio.gather(*[_check(i, a, client) for i, a in enumerate(articles)])
verified = [r for r in results if r is not None]
repaired = sum(1 for r in verified if r.get("_url_repaired"))
ok = len(verified) - repaired
if repaired > 0:
logger.warning(
f"URL-Verifizierung: {ok} OK, {repaired} durch Suchlinks ersetzt "
f"(von {len(articles)} WebSearch-Artikeln)"
)
else:
logger.info(f"URL-Verifizierung: Alle {len(articles)} WebSearch-URLs erreichbar")
return verified
async def _background_discover_sources(articles: list[dict]):
    """Background task: register reputable, not yet known sources found during research.

    Collects the distinct domains of the given articles, keeps those whose
    category is not "sonstige", skips domains already present in the
    ``sources`` table and inserts the rest after running source discovery.
    Errors are logged and never propagated; the DB connection is always closed.
    """
    from database import get_db

    db = await get_db()
    try:
        # 1. Collect unique domains
        known_hosts: set[str] = set()
        candidates: list[tuple[str, str, str]] = []
        for art in articles:
            link = art.get("source_url")
            host = _extract_domain(link) if link else None
            if not host or host in known_hosts:
                continue
            known_hosts.add(host)
            # 2. Only reputable domains (anything but "sonstige")
            kind = _detect_category(host)
            if kind != "sonstige":
                candidates.append((host, link, kind))
        if not candidates:
            return

        # 3. Check against the DB — which domains exist already?
        inserted = 0
        for host, link, kind in candidates:
            lookup = await db.execute(
                "SELECT id FROM sources WHERE LOWER(domain) = ?",
                (host.lower(),),
            )
            already_known = await lookup.fetchone()
            if already_known:
                continue
            # 4. RSS feed detection
            try:
                discovery = await discover_source(link)
                display_name = domain_to_display_name(host)
                kind_of_source = discovery["source_type"]  # "rss_feed" or "web_source"
                rss_url = discovery.get("rss_url")
                await db.execute(
                    """INSERT INTO sources (name, url, domain, source_type, category, status, notes, added_by)
VALUES (?, ?, ?, ?, ?, 'active', 'Auto-entdeckt via Recherche', 'system')""",
                    (display_name, rss_url or f"https://{host}", host, kind_of_source, kind),
                )
                inserted += 1
                logger.info(f"Neue Quelle registriert: {display_name} ({host}) als {kind_of_source}")
            except Exception as e:
                logger.debug(f"Discovery fehlgeschlagen für {host}: {e}")

        if inserted > 0:
            await db.commit()
            logger.info(f"Background-Discovery: {inserted} neue Quellen registriert")
    except Exception as e:
        logger.warning(f"Background-Discovery Fehler: {e}")
    finally:
        await db.close()
async def _create_notifications_for_incident(
db, incident_id: int, visibility: str, created_by: int, tenant_id: int, notifications: list[dict]
):
"""Erzeugt DB-Notifications fuer alle betroffenen Nutzer der Organisation.
- Oeffentliche Lagen -> alle Nutzer der Org
- Private Lagen -> nur Ersteller
"""
if not notifications:
return
now = datetime.now(TIMEZONE).strftime('%Y-%m-%d %H:%M:%S')
if visibility == "public" and tenant_id:
cursor = await db.execute(
"SELECT id FROM users WHERE organization_id = ? AND is_active = 1 AND last_login_at IS NOT NULL",
(tenant_id,),
)
user_ids = [row["id"] for row in await cursor.fetchall()]
else:
user_ids = [created_by]
for user_id in user_ids:
for notif in notifications:
await db.execute(
"""INSERT INTO notifications (user_id, incident_id, type, title, text, icon, tenant_id, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(
user_id,
incident_id,
notif.get("type", "refresh_summary"),
notif["title"],
notif["text"],
notif.get("icon", "info"),
tenant_id,
now,
),
)
await db.commit()
logger.info(f"Notifications erstellt: {len(notifications)} x {len(user_ids)} Nutzer fuer Lage {incident_id}")
async def _send_email_notifications_for_incident(
db, incident_id: int, incident_title: str, visibility: str,
created_by: int, tenant_id: int, notifications: list[dict],
incident_type: str = "adhoc",
):
"""Sendet E-Mail-Benachrichtigungen basierend auf individuellen Nutzer-Abos.
Jeder Nutzer hat eigene E-Mail-Praeferenzen pro Lage (incident_subscriptions).
Nur Nutzer die aktiv sind und sich mindestens einmal eingeloggt haben
(last_login_at IS NOT NULL) werden beruecksichtigt.
"""
if not notifications:
return
from email_utils.sender import send_email
from email_utils.templates import incident_notification_email
from config import MAGIC_LINK_BASE_URL
from services.org_settings import get_org_language
# Sprache der Org bestimmen (die Lage gehoert genau einer Org)
org_lang_iso = await get_org_language(db, tenant_id) if tenant_id else "de"
# Alle Nutzer mit aktiven Abos fuer diese Lage laden
cursor = await db.execute(
"""SELECT s.notify_email_summary, s.notify_email_new_articles,
s.notify_email_status_change, u.id, u.email, u.username
FROM incident_subscriptions s
JOIN users u ON u.id = s.user_id
WHERE s.incident_id = ?
AND u.is_active = 1
AND u.last_login_at IS NOT NULL
AND (s.notify_email_summary = 1
OR s.notify_email_new_articles = 1
OR s.notify_email_status_change = 1)""",
(incident_id,),
)
subscribers = await cursor.fetchall()
if not subscribers:
return
dashboard_url = f"{MAGIC_LINK_BASE_URL}/dashboard"
for sub in subscribers:
prefs = dict(sub)
# Relevante Notifications basierend auf Nutzer-Praeferenzen filtern
filtered_notifications = []
for n in notifications:
ntype = n.get("type", "refresh_summary")
if ntype == "refresh_summary" and prefs.get("notify_email_summary"):
filtered_notifications.append(n)
elif ntype == "new_articles" and prefs.get("notify_email_new_articles"):
filtered_notifications.append(n)
elif ntype == "status_change" and prefs.get("notify_email_status_change"):
filtered_notifications.append(n)
if not filtered_notifications:
continue
subject, html = incident_notification_email(
username=prefs["email"].split("@")[0],
incident_title=incident_title,
notifications=filtered_notifications,
dashboard_url=dashboard_url,
incident_type=incident_type,
lang=org_lang_iso,
)
try:
await send_email(prefs["email"], subject, html)
logger.info(f"E-Mail-Benachrichtigung gesendet an {prefs['email']} fuer Lage {incident_id} ({len(filtered_notifications)} Items)")
except Exception as e:
logger.error(f"E-Mail-Benachrichtigung fehlgeschlagen fuer {prefs['email']}: {e}")
class AgentOrchestrator:
"""Verwaltet die Claude-Agenten-Queue und koordiniert Recherche-Zyklen."""
def __init__(self):
self._queue: asyncio.Queue = asyncio.Queue()
self._running = False
self._current_task: Optional[int] = None
# Session-Start des aktuellen Tasks (UTC ISO mit 'Z'). Ueberspannt Multi-Pass
# und Retries innerhalb derselben Queue-Abarbeitung — verhindert, dass der
# Frontend-Timer beim Seiten-Reload auf den Pass/Retry-Start zurueckspringt.
self._current_task_started_at: Optional[str] = None
self._ws_manager = None
self._queued_ids: set[int] = set()
self._cancel_requested: set[int] = set()
self._cancel_event: asyncio.Event | None = None
def set_ws_manager(self, ws_manager):
"""WebSocket-Manager setzen für Echtzeit-Updates."""
self._ws_manager = ws_manager
async def start(self):
    """Start the queue worker as a background task on the running event loop."""
    self._running = True
    # The event loop keeps only weak references to tasks. Without a strong
    # reference the worker could be garbage-collected while it is running.
    self._worker_task = asyncio.create_task(self._worker())
    logger.info("Agenten-Orchestrator gestartet")
async def stop(self):
    """Ask the queue worker to stop.

    Only clears the ``_running`` flag; the worker loop checks that flag at the
    top of each iteration.
    """
    self._running = False
    logger.info("Agenten-Orchestrator gestoppt")
async def enqueue_refresh(self, incident_id: int, trigger_type: str = "manual", user_id: Optional[int] = None) -> bool:
    """Put a refresh job for an incident into the queue.

    Args:
        incident_id: Incident to refresh.
        trigger_type: Stored with the job and logged (e.g. "manual").
        user_id: Optional id of the requesting user, passed along with the job.

    Returns:
        True when the job was queued, False when the incident is already
        queued or currently being processed.
    """
    def _already_scheduled() -> bool:
        return incident_id in self._queued_ids or self._current_task == incident_id

    if _already_scheduled():
        logger.info(f"Refresh fuer Lage {incident_id} uebersprungen: bereits aktiv/in Queue")
        return False

    visibility, created_by, tenant_id = await self._get_incident_visibility(incident_id)

    # The lookup above yields to the event loop, so a concurrent call may have
    # queued (or the worker started) this incident in the meantime. Re-check
    # directly before registering so the same incident is never queued twice.
    if _already_scheduled():
        logger.info(f"Refresh fuer Lage {incident_id} uebersprungen: bereits aktiv/in Queue")
        return False

    self._queued_ids.add(incident_id)
    await self._queue.put((incident_id, trigger_type, user_id))
    queue_size = self._queue.qsize()
    logger.info(f"Refresh fuer Lage {incident_id} eingereiht (Queue: {queue_size}, Trigger: {trigger_type})")

    if self._ws_manager:
        await self._ws_manager.broadcast_for_incident({
            "type": "status_update",
            "incident_id": incident_id,
            "data": {"status": "queued", "queue_position": queue_size},
        }, visibility, created_by, tenant_id)
    return True
async def cancel_refresh(self, incident_id: int) -> bool:
    """Request cancellation of a running or queued refresh.

    - Running incident: records the cancel request, sets the cancel event and
      broadcasts a "cancelling" status.
    - Queued incident: removes it from the queue, writes a cancelled
      refresh_log entry and broadcasts "refresh_cancelled".

    Returns:
        True when the incident was running or queued, False otherwise.
    """
    async def _broadcast(message: dict):
        # Send *message* to the incident's audience; fall back to public
        # visibility when the incident lookup fails.
        if not self._ws_manager:
            return
        try:
            vis, cb, tid = await self._get_incident_visibility(incident_id)
        except Exception:
            vis, cb, tid = "public", None, None
        await self._ws_manager.broadcast_for_incident(message, vis, cb, tid)

    # Case 1: the incident is the currently running task
    if self._current_task == incident_id:
        self._cancel_requested.add(incident_id)
        if self._cancel_event:
            self._cancel_event.set()
        logger.info(f"Cancel angefordert fuer laufende Lage {incident_id}")
        await _broadcast({
            "type": "status_update",
            "incident_id": incident_id,
            "data": {"status": "cancelling", "detail": "Wird abgebrochen..."},
        })
        return True

    # Case 2: the incident is waiting in the queue (not yet started)
    if incident_id in self._queued_ids:
        self._queued_ids.discard(incident_id)
        # asyncio.Queue has no remove(): drain it and re-add everything else.
        removed = False
        kept = []
        while True:
            try:
                item = self._queue.get_nowait()
            except asyncio.QueueEmpty:
                break
            iid = item[0] if isinstance(item, tuple) else item
            if iid == incident_id:
                removed = True
                self._queue.task_done()
            else:
                kept.append(item)
        for item in kept:
            self._queue.put_nowait(item)
            # put_nowait counts the item as a new unfinished task although the
            # original put was never marked done; balance it so the queue's
            # unfinished-task counter (used by join()) stays accurate.
            self._queue.task_done()
        logger.info(f"Lage {incident_id} aus Warteschlange entfernt (removed={removed})")
        # Write a refresh_log entry so auto-refresh does not re-queue it on the next tick
        await self._log_queued_cancellation(incident_id)
        await _broadcast({
            "type": "refresh_cancelled",
            "incident_id": incident_id,
            "data": {"status": "cancelled"},
        })
        return True

    return False
def _check_cancelled(self, incident_id: int):
"""Prüft ob Abbruch angefordert wurde und wirft CancelledError."""
if incident_id in self._cancel_requested:
self._cancel_requested.discard(incident_id)
raise asyncio.CancelledError("Vom Nutzer abgebrochen")
async def _worker(self):
    """Process queued refresh jobs one after another while the orchestrator runs.

    For each job the refresh is attempted up to three times:
    - success or user cancellation ends the job,
    - Claude auth/CLI errors and all non-transient errors fail immediately,
    - transient errors (timeouts, connection/OS errors, Claude rate limit or
      timeout) are retried after 120s and 300s.

    A "refresh_error" message is broadcast when the job ended with an error.
    Unexpected exceptions outside the retry handling are logged and the worker
    continues with the next job instead of terminating.
    """
    while self._running:
        try:
            # Short timeout so a cleared _running flag is noticed promptly.
            item = await asyncio.wait_for(self._queue.get(), timeout=5.0)
        except asyncio.TimeoutError:
            continue

        # Queue items are (incident_id, trigger_type, user_id); two-element
        # tuples without user_id are accepted as well.
        if len(item) == 3:
            incident_id, trigger_type, user_id = item
        else:
            incident_id, trigger_type = item
            user_id = None

        self._queued_ids.discard(incident_id)
        self._current_task = incident_id
        # Set the session start ONCE — stays stable across multi-pass/retry
        self._current_task_started_at = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
        self._cancel_event = asyncio.Event()
        _cancel_event_var.set(self._cancel_event)
        logger.info(f"Starte Refresh für Lage {incident_id} (Trigger: {trigger_type})")

        RETRY_DELAYS = [0, 120, 300]  # seconds: immediately, 2min, 5min
        TRANSIENT_ERRORS = (asyncio.TimeoutError, TimeoutError, ConnectionError, OSError)
        from agents.claude_client import ClaudeCliError
        last_error = None

        def _is_transient_cli(err: Exception) -> bool:
            return isinstance(err, ClaudeCliError) and err.error_type in ("rate_limit", "timeout")

        try:
            # Research incidents: automatic multi-pass only on the first refresh
            incident_type, has_summary = await self._get_incident_info(incident_id)
            use_multi_pass = incident_type == "research" and not has_summary

            for attempt in range(3):
                try:
                    if use_multi_pass:
                        await self._run_research_multi_pass(incident_id, trigger_type=trigger_type, user_id=user_id)
                    else:
                        await self._run_refresh(incident_id, trigger_type=trigger_type, retry_count=attempt, user_id=user_id)
                    last_error = None
                    break  # success
                except asyncio.CancelledError:
                    logger.info(f"Refresh fuer Lage {incident_id} abgebrochen")
                    await self._mark_refresh_cancelled(incident_id)
                    try:
                        _vis, _cb, _tid = await self._get_incident_visibility(incident_id)
                    except Exception:
                        _vis, _cb, _tid = "public", None, None
                    if self._ws_manager:
                        await self._ws_manager.broadcast_for_incident({
                            "type": "refresh_cancelled",
                            "incident_id": incident_id,
                            "data": {"status": "cancelled"},
                        }, _vis, _cb, _tid)
                    last_error = None
                    break
                except Exception as e:
                    # Auth/CLI errors: abort immediately, a retry makes no sense
                    if isinstance(e, ClaudeCliError) and e.error_type in ("auth_error", "cli_error"):
                        last_error = e
                        logger.error(f"Permanenter Claude-Fehler [{e.error_type}] bei Lage {incident_id}: {e}")
                        await self._mark_refresh_failed(incident_id, str(e))
                        break
                    # Transient errors: retry up to 3 attempts
                    if isinstance(e, TRANSIENT_ERRORS) or _is_transient_cli(e):
                        last_error = e
                        kind = e.error_type if isinstance(e, ClaudeCliError) else type(e).__name__
                        logger.warning(f"Transienter Fehler [{kind}] bei Lage {incident_id} (Versuch {attempt + 1}/3): {e}")
                        if attempt < 2:
                            await self._mark_refresh_failed(incident_id, str(e))
                            delay = RETRY_DELAYS[attempt + 1]
                            logger.info(f"Retry in {delay}s für Lage {incident_id}")
                            if self._ws_manager:
                                try:
                                    _vis, _cb, _tid = await self._get_incident_visibility(incident_id)
                                except Exception:
                                    _vis, _cb, _tid = "public", None, None
                                await self._ws_manager.broadcast_for_incident({
                                    "type": "status_update",
                                    "incident_id": incident_id,
                                    "data": {"status": "retrying", "attempt": attempt + 1, "delay": delay},
                                }, _vis, _cb, _tid)
                            await asyncio.sleep(delay)
                            continue
                        else:
                            await self._mark_refresh_failed(incident_id, f"Endgültig fehlgeschlagen nach 3 Versuchen: {e}")
                            break
                    # Everything else: permanent
                    last_error = e
                    logger.error(f"Permanenter Fehler bei Refresh für Lage {incident_id}: {e}")
                    await self._mark_refresh_failed(incident_id, str(e))
                    break

            if last_error and self._ws_manager:
                try:
                    _vis, _cb, _tid = await self._get_incident_visibility(incident_id)
                except Exception:
                    _vis, _cb, _tid = "public", None, None
                await self._ws_manager.broadcast_for_incident({
                    "type": "refresh_error",
                    "incident_id": incident_id,
                    "data": {"error": str(last_error)},
                }, _vis, _cb, _tid)
        except Exception as e:
            # Anything escaping the retry handling (e.g. a failing incident
            # lookup or a broadcast error raised inside an except-branch) would
            # otherwise end the worker loop for good and leave the queue
            # unprocessed. Log it and carry on with the next job.
            logger.exception(f"Unerwarteter Fehler im Queue-Worker bei Lage {incident_id}: {e}")
            try:
                await self._mark_refresh_failed(incident_id, f"Interner Fehler: {e}")
            except Exception as mark_err:
                logger.warning(f"Konnte Refresh-Log nicht als fehlgeschlagen markieren: {mark_err}")
        finally:
            self._current_task = None
            self._current_task_started_at = None
            self._cancel_event = None
            _cancel_event_var.set(None)
            self._queue.task_done()
async def _mark_refresh_cancelled(self, incident_id: int):
    """Mark the running refresh-log entry as cancelled and close all still
    active pipeline steps.

    Without the second part the last active step entry stayed orphaned and the
    frontend permanently showed 'step X running', because
    /api/incidents/<id>/pipeline reads from refresh_pipeline_steps.
    Failures are logged as warnings; the DB connection is always closed.
    """
    from database import get_db
    from services.pipeline_tracker import cancel_active_steps

    db = await get_db()
    try:
        cancelled_at = datetime.now(TIMEZONE).strftime('%Y-%m-%d %H:%M:%S')
        # Remember the running entry's id before its status changes.
        running = await db.execute(
            "SELECT id FROM refresh_log WHERE incident_id = ? AND status = 'running'",
            (incident_id,),
        )
        running_row = await running.fetchone()
        refresh_log_id = None
        if running_row:
            refresh_log_id = running_row["id"]
        await db.execute(
            """UPDATE refresh_log SET status = 'cancelled', error_message = 'Vom Nutzer abgebrochen',
completed_at = ? WHERE incident_id = ? AND status = 'running'""",
            (cancelled_at, incident_id),
        )
        await db.commit()
        if refresh_log_id is not None:
            await cancel_active_steps(db, refresh_log_id=refresh_log_id)
    except Exception as e:
        logger.warning(f"Konnte Refresh-Log nicht als abgebrochen markieren: {e}")
    finally:
        await db.close()
async def _log_queued_cancellation(self, incident_id: int):
    """Write a 'cancelled' refresh_log entry for a job removed from the queue
    (the incident had not started running yet).

    Prevents the auto-refresh scheduler from re-queueing the incident right
    away on its next tick. Failures are logged as warnings; the DB connection
    is always closed.
    """
    from database import get_db

    db = await get_db()
    try:
        lookup = await db.execute("SELECT tenant_id FROM incidents WHERE id = ?", (incident_id,))
        incident_row = await lookup.fetchone()
        tenant = incident_row["tenant_id"] if incident_row else None
        stamp = datetime.now(TIMEZONE).strftime("%Y-%m-%d %H:%M:%S")
        # Start and completion time are identical: the refresh never ran.
        await db.execute(
            """INSERT INTO refresh_log (incident_id, started_at, completed_at, status,
trigger_type, error_message, tenant_id)
VALUES (?, ?, ?, 'cancelled', 'manual', 'Aus Warteschlange entfernt', ?)""",
            (incident_id, stamp, stamp, tenant),
        )
        await db.commit()
    except Exception as e:
        logger.warning(f"Konnte Queue-Cancel nicht in refresh_log loggen: {e}")
    finally:
        await db.close()
async def _mark_refresh_failed(self, incident_id: int, error: str):
    """Mark the running refresh-log entry of an incident as 'error'.

    The error message is truncated to 500 characters. Failures are logged as
    warnings; the DB connection is always closed.
    """
    from database import get_db

    db = await get_db()
    try:
        params = (
            error[:500],
            datetime.now(TIMEZONE).strftime('%Y-%m-%d %H:%M:%S'),
            incident_id,
        )
        await db.execute(
            """UPDATE refresh_log SET status = 'error', error_message = ?,
completed_at = ? WHERE incident_id = ? AND status = 'running'""",
            params,
        )
        await db.commit()
    except Exception as e:
        logger.warning(f"Konnte Refresh-Log nicht als fehlgeschlagen markieren: {e}")
    finally:
        await db.close()
async def _get_incident_visibility(self, incident_id: int) -> tuple[str, Optional[int], Optional[int]]:
    """Load visibility, created_by and tenant_id of an incident.

    Returns:
        ``(visibility, created_by, tenant_id)``. Unknown incidents yield
        ``("public", None, None)``; an empty visibility value is reported as
        "public".
    """
    from database import get_db

    db = await get_db()
    try:
        lookup = await db.execute(
            "SELECT visibility, created_by, tenant_id FROM incidents WHERE id = ?", (incident_id,)
        )
        record = await lookup.fetchone()
        if record:
            found = (record["visibility"] or "public", record["created_by"], record["tenant_id"])
        else:
            found = ("public", None, None)
    finally:
        await db.close()
    return found
async def _run_refresh(self, incident_id: int, trigger_type: str = "manual", retry_count: int = 0, user_id: int = None, _suppress_complete: bool = False, _pass_info: dict = None):
"""Führt einen kompletten Refresh-Zyklus durch."""
import aiosqlite
from database import get_db
from agents.researcher import ResearcherAgent
from agents.analyzer import AnalyzerAgent
from agents.factchecker import FactCheckerAgent
from feeds.rss_parser import RSSParser
from services import pipeline_tracker as _pipe
db = await get_db()
try:
# Lage laden
cursor = await db.execute("SELECT * FROM incidents WHERE id = ?", (incident_id,))
incident = await cursor.fetchone()
if not incident:
logger.warning(f"Lage {incident_id} nicht gefunden")
return
title = incident["title"]
description = incident["description"] or ""
incident_type = incident["type"] or "adhoc"
international = bool(incident["international_sources"]) if "international_sources" in incident.keys() else True
include_telegram = bool(incident["include_telegram"]) if "include_telegram" in incident.keys() else False
visibility = incident["visibility"] if "visibility" in incident.keys() else "public"
created_by = incident["created_by"] if "created_by" in incident.keys() else None
tenant_id = incident["tenant_id"] if "tenant_id" in incident.keys() else None
# Org-Sprache fuer alle KI-Agenten (Lagebild, Faktencheck, Recherche)
from services.org_settings import get_org_language, language_display
output_language_iso = await get_org_language(db, tenant_id) if tenant_id else "de"
output_language = language_display(output_language_iso)
previous_summary = incident["summary"] or ""
previous_sources_json = incident["sources_json"] if "sources_json" in incident.keys() else None
previous_developments = incident["latest_developments"] if "latest_developments" in incident.keys() else None
# Bei Retry: vorherigen running-Eintrag als error markieren
if retry_count > 0:
await db.execute(
"""UPDATE refresh_log SET status = 'error', error_message = 'Retry gestartet',
completed_at = ? WHERE incident_id = ? AND status = 'running'""",
(datetime.now(TIMEZONE).strftime('%Y-%m-%d %H:%M:%S'), incident_id),
)
await db.commit()
# Refresh-Log starten
now = datetime.now(TIMEZONE).strftime('%Y-%m-%d %H:%M:%S')
now_utc = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
cursor = await db.execute(
"INSERT INTO refresh_log (incident_id, started_at, status, trigger_type, retry_count, tenant_id) VALUES (?, ?, 'running', ?, ?, ?)",
(incident_id, now, trigger_type, retry_count, tenant_id),
)
await db.commit()
log_id = cursor.lastrowid
usage_acc = UsageAccumulator()
# --- Pipeline-Tracking (Analysepipeline-Visualisierung) ---
_pass_nr = (_pass_info or {}).get("nr", 1)
_step_ids: dict[str, Optional[int]] = {}
async def _pipe_start(step_key: str):
try:
sid = await _pipe.start_step(
db, self._ws_manager,
refresh_log_id=log_id, incident_id=incident_id, step_key=step_key,
pass_number=_pass_nr, tenant_id=tenant_id,
visibility=visibility, created_by=created_by,
)
_step_ids[step_key] = sid
return sid
except Exception as _e:
logger.debug(f"_pipe_start({step_key}) ignoriert: {_e}")
return None
async def _pipe_done(step_key: str, count_value=None, count_secondary=None):
try:
sid = _step_ids.pop(step_key, None)
await _pipe.complete_step(
db, self._ws_manager, step_id=sid,
refresh_log_id=log_id, incident_id=incident_id, step_key=step_key,
pass_number=_pass_nr, count_value=count_value, count_secondary=count_secondary,
tenant_id=tenant_id, visibility=visibility, created_by=created_by,
)
except Exception as _e:
logger.debug(f"_pipe_done({step_key}) ignoriert: {_e}")
async def _pipe_skip(step_key: str):
try:
await _pipe.skip_step(
db, self._ws_manager,
refresh_log_id=log_id, incident_id=incident_id, step_key=step_key,
pass_number=_pass_nr, tenant_id=tenant_id,
visibility=visibility, created_by=created_by,
)
except Exception as _e:
logger.debug(f"_pipe_skip({step_key}) ignoriert: {_e}")
research_status = "deep_researching" if incident_type == "research" else "researching"
research_detail = "Hintergrundrecherche im Web läuft..." if incident_type == "research" else "RSS-Feeds und Web werden durchsucht..."
# Multi-Pass: Detail-Text mit Durchlauf-Info versehen
_ws_extra = {}
if _pass_info:
_pnr, _ptotal, _plabel = _pass_info["nr"], _pass_info["total"], _pass_info["label"]
research_detail = f"Recherche {_pnr}/{_ptotal}: {_plabel}..."
_ws_extra = {"research_pass": _pnr, "research_total_passes": _ptotal}
if self._ws_manager:
await self._ws_manager.broadcast_for_incident({
"type": "status_update",
"incident_id": incident_id,
"data": {"status": research_status, "detail": research_detail, "started_at": now_utc, **_ws_extra},
}, visibility, created_by, tenant_id)
# Bestehende Artikel vorladen (für Dedup UND Kontext)
cursor = await db.execute(
"SELECT id, source_url, headline, source FROM articles WHERE incident_id = ?",
(incident_id,),
)
existing_db_articles_full = await cursor.fetchall()
# Pipeline-Schritt 1: Quellen sichten (vorbereitet)
await _pipe_start("sources_review")
try:
if incident_type == "adhoc":
_src_cursor = await db.execute(
"SELECT COUNT(*) AS cnt FROM sources "
"WHERE status = 'active' "
"AND (tenant_id IS NULL OR tenant_id = ?)",
(tenant_id,),
)
_src_row = await _src_cursor.fetchone()
_src_total = _src_row["cnt"] if _src_row else 0
else:
_src_total = None
except Exception:
_src_total = None
# secondary wird später mit der Anzahl tatsächlich liefernder Quellen ergänzt
await _pipe_done("sources_review", count_value=_src_total, count_secondary=None)
# Schritt 1+2: RSS-Feeds und Claude-Recherche parallel ausführen
async def _rss_pipeline():
"""RSS-Feed-Suche (Feed-Selektion + dynamische Keywords + Parsing)."""
if incident_type != "adhoc":
logger.info("Recherche-Modus: RSS-Feeds übersprungen")
return [], None
rss_researcher = ResearcherAgent()
rss_parser = RSSParser()
from source_rules import get_feeds_with_metadata
all_feeds = await get_feeds_with_metadata(tenant_id=tenant_id)
# Dynamische Keywords aus den letzten Headlines extrahieren
cursor_hl = await db.execute(
"""SELECT COALESCE(headline_de, headline) as hl
FROM articles WHERE incident_id = ?
AND COALESCE(headline_de, headline) IS NOT NULL
ORDER BY collected_at DESC LIMIT 30""",
(incident_id,),
)
recent_headlines = [row["hl"] for row in await cursor_hl.fetchall() if row["hl"]]
dynamic_keywords, kw_usage = await rss_researcher.extract_dynamic_keywords(title, recent_headlines)
if kw_usage:
usage_acc.add(kw_usage)
feed_usage = None
keywords = dynamic_keywords # Dynamische Keywords bevorzugen
if len(all_feeds) > 20:
selected_feeds, feed_sel_keywords, feed_usage = await rss_researcher.select_relevant_feeds(
title, description, international, all_feeds
)
logger.info(f"Feed-Selektion: {len(selected_feeds)} von {len(all_feeds)} Feeds ausgewählt")
# Feed-Selektion-Keywords nur als Fallback wenn dynamische fehlen
if not keywords:
keywords = feed_sel_keywords
articles = await rss_parser.search_feeds_selective(title, selected_feeds, keywords=keywords)
else:
articles = await rss_parser.search_feeds(title, international=international, tenant_id=tenant_id, keywords=keywords, user_id=user_id)
logger.info(f"RSS: {len(articles)} relevante Artikel gefunden (international={international})")
return articles, feed_usage
async def _web_search_pipeline():
    """Run the Claude WebSearch research with pre-selected web sources as hints.

    Returns:
        The ``(results, usage, parse_failed)`` triple exactly as returned by
        ``ResearcherAgent.search``. Usage of the web-source pre-selection is
        added to ``usage_acc`` directly.
    """
    researcher = ResearcherAgent()

    # Pass already stored articles along as context (research + adhoc).
    existing_for_context = None
    if existing_db_articles_full:
        existing_for_context = []
        for row in existing_db_articles_full:
            # Rows may lack a "source" column; fall back to an empty string.
            source_name = row["source"] if "source" in row.keys() else ""
            existing_for_context.append({
                "source": source_name,
                "headline": row["headline"],
                "source_url": row["source_url"],
            })

    # Pre-select web sources so only matching ones are recommended in the prompt.
    # Best effort: any failure leaves the hint list empty and the pipeline continues.
    preferred_sources = []
    try:
        from source_rules import get_feeds_with_metadata
        web_sources = await get_feeds_with_metadata(tenant_id=tenant_id, source_type="web_source")
        if web_sources:
            preferred_sources, web_sel_usage = await researcher.select_relevant_web_sources(
                title, description, web_sources,
            )
            if web_sel_usage:
                usage_acc.add(web_sel_usage)
    except Exception as e:
        logger.warning(f"Web-Source-Vorselektion fehlgeschlagen (Pipeline laeuft weiter): {e}")
        preferred_sources = []

    results, usage, parse_failed = await researcher.search(
        title,
        description,
        incident_type,
        international=international,
        user_id=user_id,
        existing_articles=existing_for_context,
        preferred_sources=preferred_sources,
        output_language=output_language,
        output_language_iso=output_language_iso,
    )

    # Build the log line piece by piece; the optional suffixes only appear when relevant.
    log_message = f"Claude-Recherche: {len(results)} Ergebnisse"
    if preferred_sources:
        log_message += f" (mit {len(preferred_sources)} Web-Quellen-Hinweis)"
    if parse_failed:
        log_message += " (Parser fehlgeschlagen)"
    logger.info(log_message)
    return results, usage, parse_failed
async def _podcast_pipeline():
    """Search podcast feeds for matching episodes (ad-hoc incidents only).

    Returns:
        ``(articles, None)``. The second element is always ``None``; usage of
        the keyword extraction is added to ``usage_acc`` directly.

    NOTE(review): the original docstring says only episodes with existing
    transcripts are considered. That restriction is not visible in this
    function and is presumably enforced by the parser; confirm.
    """
    if incident_type != "adhoc":
        logger.info("Recherche-Modus: Podcasts uebersprungen")
        return [], None

    from source_rules import get_feeds_with_metadata
    podcast_feeds = await get_feeds_with_metadata(tenant_id=tenant_id, source_type="podcast_feed")
    if not podcast_feeds:
        return [], None

    from feeds.podcast_parser import PodcastFeedParser
    episode_parser = PodcastFeedParser()
    keyword_agent = ResearcherAgent()

    # Dynamic keywords via a separate call, deliberately duplicated from the
    # RSS pipeline so both pipelines stay independent of each other.
    headline_cursor = await db.execute(
        """SELECT COALESCE(headline_de, headline) as hl
FROM articles WHERE incident_id = ?
AND COALESCE(headline_de, headline) IS NOT NULL
ORDER BY collected_at DESC LIMIT 30""",
        (incident_id,),
    )
    headline_rows = await headline_cursor.fetchall()
    recent_headlines = [row["hl"] for row in headline_rows if row["hl"]]
    raw_keywords, keyword_usage = await keyword_agent.extract_dynamic_keywords(title, recent_headlines)
    if keyword_usage:
        usage_acc.add(keyword_usage)

    # The podcast parser (still) expects a flat keyword list, so the keywords
    # are flattened; an empty result is passed as None.
    from agents.researcher import flatten_keywords
    flat_keywords = flatten_keywords(raw_keywords)
    articles = await episode_parser.search_feeds_selective(
        title, podcast_feeds, keywords=flat_keywords or None
    )
    logger.info(f"Podcast-Pipeline: {len(articles)} Episoden gefunden")
    return articles, None
async def _telegram_pipeline():
    """Search Telegram channels, using an AI-based channel selection first.

    Returns:
        ``(articles, None)``. The second element is always ``None``; usage of
        the channel selection and keyword extraction is added to ``usage_acc``.
    """
    from feeds.telegram_parser import TelegramParser
    channel_parser = TelegramParser()

    # Load every configured Telegram channel for this tenant.
    all_channels = await channel_parser._get_telegram_channels(tenant_id=tenant_id)
    if not all_channels:
        logger.info("Keine Telegram-Kanaele konfiguriert")
        return [], None

    # Let the researcher agent pick the relevant channels.
    channel_agent = ResearcherAgent()
    selected_channels, selection_usage = await channel_agent.select_relevant_telegram_channels(
        title, description, all_channels
    )
    if selection_usage:
        usage_acc.add(selection_usage)
    selected_ids = [ch["id"] for ch in selected_channels]
    logger.info(f"Telegram-Selektion: {len(selected_ids)} von {len(all_channels)} Kanaelen")

    # Dynamic keywords via a separate call, because this pipeline runs in
    # parallel to the RSS pipeline and cannot reuse its result.
    headline_cursor = await db.execute(
        """SELECT COALESCE(headline_de, headline) as hl
FROM articles WHERE incident_id = ?
AND COALESCE(headline_de, headline) IS NOT NULL
ORDER BY collected_at DESC LIMIT 30""",
        (incident_id,),
    )
    headline_rows = await headline_cursor.fetchall()
    recent_headlines = [row["hl"] for row in headline_rows if row["hl"]]
    tg_keywords, keyword_usage = await channel_agent.extract_dynamic_keywords(title, recent_headlines)
    if keyword_usage:
        usage_acc.add(keyword_usage)

    # A dict result is logged as per-key counts; anything else is logged as is.
    if not isinstance(tg_keywords, dict):
        logger.info(f"Telegram-Keywords: {tg_keywords}")
    else:
        logger.info(f"Telegram-Keywords (Sprachen): { {k: len(v) for k, v in tg_keywords.items()} }")

    articles = await channel_parser.search_channels(
        title, tenant_id=tenant_id, keywords=tg_keywords, channel_ids=selected_ids
    )
    logger.info(f"Telegram-Pipeline: {len(articles)} Nachrichten")
    return articles, None
# Pipeline-Schritt 2: Nachrichten sammeln (Start)
await _pipe_start("collect")
# Pipelines parallel starten (RSS + WebSearch + Podcasts + optional Telegram)
pipelines = [_rss_pipeline(), _web_search_pipeline(), _podcast_pipeline()]
if include_telegram:
pipelines.append(_telegram_pipeline())
pipeline_results = await asyncio.gather(*pipelines)
(rss_articles, rss_feed_usage) = pipeline_results[0]
(search_results, search_usage, search_parse_failed) = pipeline_results[1]
(podcast_articles, _podcast_usage) = pipeline_results[2]
telegram_articles = pipeline_results[3][0] if include_telegram else []
# Podcast-Artikel in die RSS-Liste einfuegen (gleicher Downstream-Pfad)
if podcast_articles:
rss_articles = (rss_articles or []) + podcast_articles
# URL-Verifizierung nur fuer WebSearch-Ergebnisse (RSS-URLs sind bereits verifiziert)
if search_results:
search_results = await _verify_article_urls(search_results)
if rss_feed_usage:
usage_acc.add(rss_feed_usage)
if search_usage:
usage_acc.add(search_usage)
# Checkpoint 1: Cancel prüfen nach RSS/WebSearch
self._check_cancelled(incident_id)
# Alle Ergebnisse zusammenführen
all_results = rss_articles + search_results + telegram_articles
# Pipeline-Schritt 2: Nachrichten sammeln (fertig)
try:
_delivering_sources = len({a.get("source", "") for a in all_results if a.get("source")})
except Exception:
_delivering_sources = None
await _pipe_done("collect", count_value=len(all_results), count_secondary=_delivering_sources)
# Pipeline-Schritt 3: Doppeltes filtern (Start)
await _pipe_start("dedup")
# Duplikate entfernen (normalisierte URL + Headline-Ähnlichkeit)
seen_urls = set()
seen_headlines = set()
unique_results = []
for article in all_results:
if not _is_duplicate(article, seen_urls, seen_headlines):
unique_results.append(article)
dupes_removed = len(all_results) - len(unique_results)
if dupes_removed > 0:
logger.info(f"Deduplizierung: {dupes_removed} Duplikate entfernt, {len(unique_results)} verbleibend")
await _pipe_done("dedup", count_value=dupes_removed, count_secondary=len(unique_results))
# Relevanz-Scoring und Sortierung
for article in unique_results:
if "relevance_score" not in article or article["relevance_score"] == 0:
article["relevance_score"] = _score_relevance(article)
unique_results.sort(key=lambda a: a.get("relevance_score", 0), reverse=True)
source_count = len(set(a.get("source", "") for a in unique_results))
_analyze_detail = f"Analysiert {len(unique_results)} Meldungen aus {source_count} Quellen..."
if _pass_info:
_analyze_detail = f"[{_pass_info['nr']}/{_pass_info['total']}] {_analyze_detail}"
if self._ws_manager:
await self._ws_manager.broadcast_for_incident({
"type": "status_update",
"incident_id": incident_id,
"data": {
"status": "analyzing",
"detail": _analyze_detail,
"started_at": now,
**_ws_extra,
},
}, visibility, created_by, tenant_id)
# --- Set-basierte DB-Deduplizierung (statt N×M Queries) ---
# existing_db_articles_full wurde bereits oben geladen
existing_urls = set()
existing_headlines = set()
for row in existing_db_articles_full:
if row["source_url"]:
existing_urls.add(_normalize_url(row["source_url"]))
if row["headline"] and len(row["headline"]) > 20:
norm_h = _normalize_headline(row["headline"])
if norm_h:
existing_headlines.add(norm_h)
logger.info(f"DB-Dedup: {len(existing_urls)} URLs, {len(existing_headlines)} Headlines im Bestand")
# --- Dedup gegen Bestand: nur neue (noch nicht gespeicherte) Kandidaten behalten ---
new_candidates = []
for article in unique_results:
if article.get("source_url"):
norm_url = _normalize_url(article["source_url"])
if norm_url in existing_urls:
continue
existing_urls.add(norm_url)
headline = article.get("headline", "")
if headline and len(headline) > 20:
norm_h = _normalize_headline(headline)
if norm_h and norm_h in existing_headlines:
continue
if norm_h:
existing_headlines.add(norm_h)
new_candidates.append(article)
# Pipeline-Schritt 4: Relevanz bewerten (Start)
await _pipe_start("relevance")
_candidates_before_topic = len(new_candidates)
# --- Pre-Topic-Übersetzung: fremdsprachige Headlines ins Englische ---
# Damit der nachgelagerte Topic-Filter (Haiku) auch CJK/Arabisch/
# Hebräisch/Kyrillisch-Headlines fair beurteilen kann statt sie aus
# Sicherheit zu verwerfen.
if new_candidates:
try:
from agents.translator import translate_headlines_for_topic_filter
_pt_count, _pt_usage = await translate_headlines_for_topic_filter(new_candidates)
if _pt_usage:
usage_acc.add(_pt_usage)
if _pt_count:
logger.info(
f"Pre-Topic-Translate: {_pt_count} fremdsprachige Headlines übersetzt"
)
except Exception as e:
logger.warning(
f"Pre-Topic-Translate fehlgeschlagen (Pipeline laeuft weiter): {e}"
)
# --- Semantischer Topic-Filter (Haiku) ---
# Wirft Artikel raus, die zwar Keyword-Treffer hatten, aber das Kernthema
# der Lage nicht inhaltlich behandeln. Bei Fehler Fallback auf alle Kandidaten.
if new_candidates:
_tf_agent = AnalyzerAgent()
new_candidates, _tf_usage = await _tf_agent.filter_relevant_articles(
title, description, new_candidates,
)
if _tf_usage:
usage_acc.add(_tf_usage)
await _pipe_done("relevance", count_value=len(new_candidates), count_secondary=_candidates_before_topic)
# --- Neue (thematisch gefilterte) Artikel speichern und für Analyse tracken ---
new_count = 0
new_articles_for_analysis = []
for article in new_candidates:
cursor = await db.execute(
"""INSERT INTO articles (incident_id, headline, headline_de, source,
source_url, content_original, content_de, language, published_at, tenant_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
incident_id,
article.get("headline", ""),
article.get("headline_de"),
article.get("source", "Unbekannt"),
article.get("source_url"),
article.get("content_original"),
article.get("content_de"),
article.get("language", "de"),
article.get("published_at"),
tenant_id,
),
)
new_count += 1
article_with_id = dict(article)
article_with_id["id"] = cursor.lastrowid
new_articles_for_analysis.append(article_with_id)
await db.commit()
# Geoparsing: Orte aus neuen Artikeln extrahieren und speichern
if new_articles_for_analysis:
# Pipeline-Schritt 5: Orte erkennen (Start)
await _pipe_start("geoparsing")
try:
from agents.geoparsing import geoparse_articles
incident_context = f"{title} - {description}"
logger.info(f"Geoparsing fuer {len(new_articles_for_analysis)} neue Artikel...")
geo_results, category_labels = await geoparse_articles(new_articles_for_analysis, incident_context)
geo_count = 0
for art_id, locations in geo_results.items():
for loc in locations:
await db.execute(
"""INSERT INTO article_locations
(article_id, incident_id, location_name, location_name_normalized,
country_code, latitude, longitude, confidence, source_text, tenant_id, category)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(art_id, incident_id, loc["location_name"], loc["location_name_normalized"],
loc["country_code"], loc["lat"], loc["lon"], loc["confidence"],
loc.get("source_text", ""), tenant_id, loc.get("category", "mentioned")),
)
geo_count += 1
if geo_count > 0:
await db.commit()
logger.info(f"Geoparsing: {geo_count} Orte aus {len(geo_results)} Artikeln gespeichert")
# Category-Labels in Incident speichern (nur wenn neu generiert)
if category_labels:
import json as _json
await db.execute(
"UPDATE incidents SET category_labels = ? WHERE id = ? AND category_labels IS NULL",
(_json.dumps(category_labels, ensure_ascii=False), incident_id),
)
await db.commit()
logger.info(f"Category-Labels gespeichert fuer Incident {incident_id}: {category_labels}")
await _pipe_done("geoparsing", count_value=geo_count, count_secondary=len(geo_results) if geo_results else 0)
except Exception as e:
logger.warning(f"Geoparsing fehlgeschlagen (Pipeline laeuft weiter): {e}")
await _pipe_done("geoparsing", count_value=0, count_secondary=0)
else:
await _pipe_skip("geoparsing")
# Quellen-Statistiken aktualisieren
if new_count > 0:
try:
from database import refresh_source_counts
await refresh_source_counts(db)
except Exception as e:
logger.warning(f"Quellen-Statistiken konnten nicht aktualisiert werden: {e}")
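# Step 3+4: fact check and analysis. Despite the historical "parallel" wording
# they run sequentially: fact check first, then the situation summary.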
if new_count > 0 or not previous_summary:
is_first_summary = not previous_summary
# Snapshot des alten Lagebilds sichern BEVOR parallele Verarbeitung startet
if previous_summary:
cursor = await db.execute(
"SELECT COUNT(*) as cnt FROM articles WHERE incident_id = ?",
(incident_id,),
)
snap_articles = (await cursor.fetchone())["cnt"]
cursor = await db.execute(
"SELECT COUNT(*) as cnt FROM fact_checks WHERE incident_id = ?",
(incident_id,),
)
snap_fcs = (await cursor.fetchone())["cnt"]
await db.execute(
"""INSERT INTO incident_snapshots
(incident_id, summary, sources_json,
article_count, fact_check_count, refresh_log_id, created_at, tenant_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(incident_id, previous_summary, previous_sources_json,
snap_articles, snap_fcs, log_id, now, tenant_id),
)
await db.commit()
# Bestehende Fakten und alle Artikel vorladen (für parallele Tasks)
cursor = await db.execute(
"SELECT id, claim, status, sources_count, evidence FROM fact_checks WHERE incident_id = ?",
(incident_id,),
)
existing_facts = [dict(row) for row in await cursor.fetchall()]
# Alle Artikel vorladen für Erstanalyse/Erstcheck
all_articles_preloaded = None
if not previous_summary or new_count == 0 or not existing_facts:
cursor = await db.execute(
"SELECT * FROM articles WHERE incident_id = ? ORDER BY collected_at DESC",
(incident_id,),
)
all_articles_preloaded = [dict(row) for row in await cursor.fetchall()]
_parallel_detail = "Analyse und Faktencheck laufen parallel..."
if _pass_info:
_parallel_detail = f"[{_pass_info['nr']}/{_pass_info['total']}] {_parallel_detail}"
if self._ws_manager:
await self._ws_manager.broadcast_for_incident({
"type": "status_update",
"incident_id": incident_id,
"data": {"status": "analyzing", "detail": _parallel_detail, "started_at": now_utc, **_ws_extra},
}, visibility, created_by, tenant_id)
# Quelleneinordnung (Bias) an Artikel anhaengen
try:
cursor = await db.execute(
"SELECT name, domain, bias FROM sources WHERE bias IS NOT NULL"
)
_bias_rows = await cursor.fetchall()
_bias_by_domain = {}
_bias_by_name = {}
for br in _bias_rows:
brd = dict(br)
if brd.get("domain"):
_bias_by_domain[brd["domain"].lower()] = brd["bias"]
if brd.get("name"):
_bias_by_name[brd["name"].lower()] = brd["bias"]
def _enrich_bias(articles_list):
    """Attach a ``source_bias`` label to every article that does not have one.

    Lookup order per article:
      1. case-insensitive source name in ``_bias_by_name``;
      2. host of ``source_url`` against the domains in ``_bias_by_domain``.

    The domain match is done on the parsed hostname and only on a label
    boundary (``host == dom`` or ``host`` ends with ``"." + dom``). A plain
    substring test would let ``zeit.de`` match ``www.freizeit.de`` and attach
    the wrong bias. Stored entries that contain a path (``"/"``), and URLs
    whose host cannot be parsed, keep the legacy substring match.

    Args:
        articles_list: list of article dicts, mutated in place. ``None`` or an
            empty list is a no-op.

    Returns:
        None.
    """
    if not articles_list:
        return

    def _host_of(url):
        # Hostname of *url* (lower-cased by urlparse), or "" if unavailable.
        try:
            # Scheme-less URLs need a "//" prefix, otherwise urlparse treats
            # the host as part of the path.
            return urlparse(url if "://" in url else "//" + url).hostname or ""
        except ValueError:
            # Malformed URL (e.g. broken IPv6 literal): no hostname available.
            return ""

    for art in articles_list:
        if art.get("source_bias"):
            continue
        src = (art.get("source") or "").lower()
        url = (art.get("source_url") or "").lower()
        # Match by name first.
        bias = _bias_by_name.get(src)
        if not bias and url:
            # Fall back to matching the URL's host against known domains.
            host = _host_of(url)
            for dom, b in _bias_by_domain.items():
                if not dom:
                    continue
                if "/" in dom or not host:
                    matched = dom in url
                else:
                    matched = host == dom or host.endswith("." + dom)
                if matched:
                    bias = b
                    break
        if bias:
            art["source_bias"] = bias
_enrich_bias(new_articles_for_analysis)
_enrich_bias(all_articles_preloaded)
except Exception as e:
logger.warning("Bias-Anreicherung fehlgeschlagen (Pipeline laeuft weiter): %s", e)
# --- Analyse-Task (wird nach _do_factcheck mit fact_context_block aufgerufen) ---
async def _do_analysis(fact_context_block: str = ""):
    """Produce the situation summary, incrementally or from scratch.

    Args:
        fact_context_block: fact context text forwarded unchanged to the
            analyzer; empty string when none is available.

    Returns:
        Whatever ``AnalyzerAgent.analyze_incremental`` / ``analyze`` returns
        (the caller unpacks it as ``(analysis, usage)``).
    """
    analyzer = AnalyzerAgent()
    incremental = previous_summary and new_count > 0

    if not incremental:
        # No previous summary (or nothing new): analyze the full article set.
        logger.info("Erstanalyse: Alle Artikel werden analysiert")
        return await analyzer.analyze(
            title,
            description,
            all_articles_preloaded,
            incident_type,
            fact_context_block=fact_context_block,
            output_language=output_language,
        )

    # A summary already exists and new articles arrived: extend it.
    logger.info(f"Inkrementelle Analyse: {new_count} neue Artikel zum bestehenden Lagebild")
    return await analyzer.analyze_incremental(
        title,
        description,
        new_articles_for_analysis,
        previous_summary,
        previous_sources_json,
        incident_type,
        fact_context_block=fact_context_block,
        output_language=output_language,
    )
# --- Faktencheck-Task ---
async def _do_factcheck():
    """Run the fact check in full, incremental or two-phase incremental mode.

    Mode selection:
      * no existing facts, or no new articles: full check over all articles;
      * existing facts below ``TWOPHASE_MIN_FACTS``: incremental check;
      * otherwise: two-phase incremental check.

    Returns:
        Whatever the chosen ``FactCheckerAgent`` method returns (the caller
        unpacks it as ``(fact_checks, usage)``).
    """
    factchecker = FactCheckerAgent()

    if not (existing_facts and new_count > 0):
        # Full check. Load all articles if they were not preloaded
        # (chicken-and-egg case: a summary exists but no fact checks yet).
        articles_for_check = all_articles_preloaded
        if not articles_for_check:
            all_rows_cursor = await db.execute(
                "SELECT * FROM articles WHERE incident_id = ? ORDER BY collected_at DESC",
                (incident_id,),
            )
            articles_for_check = [dict(row) for row in await all_rows_cursor.fetchall()]
        return await factchecker.check(title, articles_for_check, incident_type, output_language=output_language)

    if len(existing_facts) < TWOPHASE_MIN_FACTS:
        logger.info(
            f"Inkrementeller Faktencheck: {new_count} neue Artikel, "
            f"{len(existing_facts)} bestehende Fakten"
        )
        return await factchecker.check_incremental(
            title, new_articles_for_analysis, existing_facts, incident_type,
            output_language=output_language,
        )

    logger.info(
        f"Zwei-Phasen-Faktencheck: {new_count} neue Artikel, "
        f"{len(existing_facts)} bestehende Fakten"
    )
    return await factchecker.check_incremental_twophase(
        title, new_articles_for_analysis, existing_facts, incident_type,
        output_language=output_language,
    )
# Pipeline-Schritt 6: Faktencheck zuerst (sequenziell). Liefert den
# Faktenkontext fuer das Lagebild, damit dieses auf geprueftem Stand
# schreibt und Unklarheiten explizit benennt. Variante 1: bei
# Faktencheck-Fehler faellt das Lagebild auf den alten Pfad ohne
# Faktenkontext zurueck (Refresh bricht NICHT ab).
await _pipe_start("factcheck")
factcheck_result: tuple = ([], None)
fact_context_block = ""
factcheck_failed_reason: str | None = None
try:
factcheck_result = await _do_factcheck()
except Exception as fc_err:
factcheck_failed_reason = str(fc_err)
logger.warning(
"Faktencheck fehlgeschlagen, Lagebild laeuft ohne Faktenkontext: %s",
fc_err, exc_info=True,
)
fact_checks, fc_usage = factcheck_result if factcheck_result else ([], None)
# Pipeline-Schritt 6 done direkt nach dem Aufruf — die finale
# DB-Persistierung passiert weiter unten, aber fuer die UI ist
# der Faktencheck-Aufruf hier abgeschlossen. Der count_value
# ist eine Schaetzung (echte Zahl steht spaeter in der DB).
_fc_estimated_new = max(0, len(fact_checks or []) - len(existing_facts or []))
await _pipe_done(
"factcheck",
count_value=_fc_estimated_new,
count_secondary=len(fact_checks) if fact_checks else 0,
)
# Faktenkontext fuer das Lagebild bauen.
try:
from agents.analyzer import build_fact_context_block as _build_fc_ctx
fact_context_block = _build_fc_ctx(
existing_facts or [], fact_checks or [], incident_type,
)
if fact_context_block:
logger.info(
"Faktenkontext fuer Lagebild: %d Zeichen, basierend auf %d alten + %d neuen Fakten",
len(fact_context_block), len(existing_facts or []), len(fact_checks or []),
)
except Exception as ctx_err:
logger.warning("build_fact_context_block fehlgeschlagen: %s", ctx_err, exc_info=True)
fact_context_block = ""
# Pipeline-Schritt 7: Lagebild verfassen (jetzt mit Faktenkontext)
await _pipe_start("summary")
logger.info(
"Starte Lagebild (sequenziell nach Faktencheck%s)",
" — OHNE Faktenkontext (Fallback)" if factcheck_failed_reason else "",
)
analysis_result = await _do_analysis(fact_context_block)
analysis, analysis_usage = analysis_result
await _pipe_done("summary", count_value=None, count_secondary=None)
# --- Analyse-Ergebnisse verarbeiten ---
if analysis_usage:
usage_acc.add(analysis_usage)
if analysis:
sources = analysis.get("sources", [])
new_summary = analysis.get("summary", "") or previous_summary
# Validierung: Fehlende Quellennummern im Summary erkennen und reparieren
if sources and new_summary:
import re as _re
# Auch alphanumerische Refs wie [389a] erkennen
referenced_raw = set(_re.findall(r'\[(\d+[a-z]?)\]', new_summary))
referenced_nrs = set()
for r in referenced_raw:
try:
referenced_nrs.add(int(r))
except ValueError:
referenced_nrs.add(r) # Keep alphanumeric as string
defined_nrs = set()
for s in sources:
nr = s.get("nr", 0)
if isinstance(nr, int):
defined_nrs.add(nr)
elif isinstance(nr, str):
try:
defined_nrs.add(int(nr))
except ValueError:
defined_nrs.add(nr) # Keep alphanumeric like '389a'
missing_nrs = sorted(referenced_nrs - defined_nrs)
if missing_nrs:
truly_missing = []
for nr in missing_nrs:
# Buchstaben-Suffix (z.B. "22b") -> Basisnummer (22) aufloesen
if isinstance(nr, str) and _re.match(r"^\d+[a-z]$", nr):
base_nr = int(nr[:-1])
if base_nr in defined_nrs:
new_summary = new_summary.replace(f"[{nr}]", f"[{base_nr}]")
logger.info(
"Incident %d: Suffix-Ref [%s] auf Basisquelle [%d] aufgeloest",
incident_id, nr, base_nr
)
continue
truly_missing.append(nr)
if truly_missing:
logger.warning(
"Incident %d: %d Quellennummern im Summary ohne Eintrag in sources: %s",
incident_id, len(truly_missing), truly_missing[:20]
)
for nr in truly_missing:
sources.append({"nr": nr, "name": "Quelle", "url": ""})
logger.info("Platzhalter fuer fehlende Quelle [%s] eingefuegt", nr)
sources.sort(key=lambda s: int(s.get("nr", 0)) if isinstance(s.get("nr"), int) or (isinstance(s.get("nr"), str) and str(s.get("nr", "")).isdigit()) else 9999)
# Sicherstellen dass alle nr-Werte Integer sind (Claude liefert manchmal Strings)
if sources:
for s in sources:
nr = s.get("nr")
if isinstance(nr, str):
try:
s["nr"] = int(nr)
except ValueError:
pass
sources_json = json.dumps(sources, ensure_ascii=False) if sources else previous_sources_json
await db.execute(
"UPDATE incidents SET summary = ?, sources_json = ?, executive_summary = NULL, updated_at = ? WHERE id = ?",
(new_summary, sources_json, now, incident_id),
)
# Beim ersten Refresh: Snapshot des neuen Lagebilds erstellen
if is_first_summary and new_summary:
cursor = await db.execute(
"SELECT COUNT(*) as cnt FROM articles WHERE incident_id = ?",
(incident_id,),
)
snap_articles = (await cursor.fetchone())["cnt"]
cursor = await db.execute(
"SELECT COUNT(*) as cnt FROM fact_checks WHERE incident_id = ?",
(incident_id,),
)
snap_fcs = (await cursor.fetchone())["cnt"]
await db.execute(
"""INSERT INTO incident_snapshots
(incident_id, summary, sources_json,
article_count, fact_check_count, refresh_log_id, created_at, tenant_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(incident_id, new_summary, sources_json,
snap_articles, snap_fcs, log_id, now, tenant_id),
)
# Translations werden vom dedizierten Translator-Agent unten
# erzeugt (frueher inline im Analyzer-Output, das war token-
# instabil und schaetzte regelmaessig content_de aus).
await db.commit()
# Cancel check after the fact-check/summary stage (which runs sequentially, not in parallel)
self._check_cancelled(incident_id)
# --- Translator (Haiku) fuer fremdsprachige Artikel ohne DE-Texte ---
# Idempotent: nur Artikel ohne headline_de/content_de werden geholt.
# Lauft nach der Analyse (Lagebild ist schon committed) und vor QC
# (damit normalize_umlaut_articles auch die frischen DE-Texte fasst).
try:
tr_cursor = await db.execute(
"""SELECT id, headline, content_original, language
FROM articles
WHERE incident_id = ?
AND language IS NOT NULL AND LOWER(language) != 'de'
AND (headline_de IS NULL OR headline_de = ''
OR content_de IS NULL OR content_de = '')""",
(incident_id,),
)
pending_translations = [dict(r) for r in await tr_cursor.fetchall()]
if pending_translations:
logger.info(
"Translator fuer Incident %d: %d Artikel ohne DE-Uebersetzung",
incident_id, len(pending_translations),
)
from agents.translator import translate_articles
from services.post_refresh_qc import normalize_german_umlauts as _norm_de2
translations = await translate_articles(
pending_translations,
output_lang="de",
usage_accumulator=usage_acc,
)
for t in translations:
hd = t.get("headline_de")
cd = t.get("content_de")
if hd:
hd, _ = _norm_de2(hd)
if cd:
cd, _ = _norm_de2(cd)
if hd or cd:
await db.execute(
"UPDATE articles SET headline_de = COALESCE(?, headline_de), "
"content_de = COALESCE(?, content_de) WHERE id = ? AND incident_id = ?",
(hd, cd, t["id"], incident_id),
)
await db.commit()
logger.info(
"Translator fuer Incident %d: %d/%d Artikel uebersetzt",
incident_id, len(translations), len(pending_translations),
)
except Exception as e:
logger.error("Translator-Fehler fuer Incident %d: %s", incident_id, e, exc_info=True)
# Refresh trotz Translator-Fehler weiterlaufen lassen
# --- Neueste Entwicklungen (nur Live-Monitoring / adhoc) ---
# Basis ist jetzt das frisch generierte Lagebild (autoritativ, thematisch sauber).
# Zeitstempel und Quellen kommen aus den jüngsten belegenden Artikeln.
dev_summary_source = (locals().get("new_summary") or previous_summary or "").strip()
if incident_type == "adhoc" and dev_summary_source:
try:
# Top-60 neueste Artikel mit Publikationsdatum als Beleg-Pool.
dev_cursor = await db.execute(
"""SELECT id, headline, headline_de, source, source_url, published_at
FROM articles
WHERE incident_id = ? AND published_at IS NOT NULL
ORDER BY published_at DESC LIMIT 60""",
(incident_id,),
)
dev_articles = [dict(row) for row in await dev_cursor.fetchall()]
dev_analyzer = AnalyzerAgent()
dev_text, dev_usage = await dev_analyzer.generate_latest_developments(
title, description, dev_summary_source, dev_articles, previous_developments,
output_language=output_language,
)
if dev_usage:
usage_acc.add(dev_usage)
if dev_text is not None:
await db.execute(
"UPDATE incidents SET latest_developments = ? WHERE id = ?",
(dev_text, incident_id),
)
await db.commit()
previous_developments = dev_text
except Exception as e:
logger.warning(f"Latest-Developments-Generator fehlgeschlagen: {e}")
# Cancel-Check nach Analyse+Faktencheck
self._check_cancelled(incident_id)
# --- Faktencheck-Ergebnisse verarbeiten ---
# Pre-Dedup: Duplikate aus LLM-Antwort entfernen
fact_checks = deduplicate_new_facts(fact_checks)
if fc_usage:
usage_acc.add(fc_usage)
# Prüfen ob dies der erste Refresh ist (keine vorherigen Faktenchecks)
is_first_refresh = len(existing_facts) == 0
# Notification-Summary sammeln
confirmed_count = 0
contradicted_count = 0
status_changes = []
# --- Schutz gegen Massen-Downgrades ---
# Wenn >50% der established/confirmed Fakten auf unverified/unconfirmed
# herabgestuft wuerden, verwerfe die FC-Ergebnisse komplett.
established_ids = {ef["id"] for ef in existing_facts if ef.get("status") in ("established", "confirmed")}
if established_ids and fact_checks:
_downgrade_count = 0
_remaining_tmp = list(existing_facts)
for _fc in fact_checks:
_matched = find_matching_claim(_fc.get("claim", ""), _remaining_tmp)
if _matched and _matched["id"] in established_ids:
_new_st = _fc.get("status", "developing")
if _new_st in ("unverified", "unconfirmed", "developing"):
_downgrade_count += 1
_remaining_tmp = [ef for ef in _remaining_tmp if ef["id"] != _matched["id"]]
_downgrade_ratio = _downgrade_count / len(established_ids) if established_ids else 0
if _downgrade_ratio > 0.5:
logger.warning(
f"Faktencheck-Ergebnisse verworfen: {_downgrade_count}/{len(established_ids)} "
f"established Fakten wuerden herabgestuft ({_downgrade_ratio:.0%}). "
f"Bestehende Fakten bleiben unveraendert."
)
fact_checks = []
# Mutable Kopie für Fuzzy-Matching
remaining_existing = list(existing_facts)
for fc in fact_checks:
new_claim = fc.get("claim", "")
new_status = fc.get("status", "developing")
# Fuzzy-Matching gegen bestehende Claims
matched = find_matching_claim(new_claim, remaining_existing)
if matched:
old_status = matched.get("status")
# status_history aktualisieren bei Statusaenderung
history_update = ""
if old_status and old_status != new_status:
import json as _json
cursor_hist = await db.execute(
"SELECT status_history FROM fact_checks WHERE id = ?",
(matched["id"],),
)
hist_row = await cursor_hist.fetchone()
try:
history = _json.loads(hist_row[0] or "[]") if hist_row else []
except (ValueError, TypeError):
history = []
history.append({"status": new_status, "at": now})
history_update = _json.dumps(history)
# Evidence: Alte URLs beibehalten wenn neue keine hat
new_evidence = fc.get("evidence") or ""
import re as _re
if not _re.search(r"https?://", new_evidence) and matched.get("evidence"):
old_evidence = matched["evidence"] or ""
if _re.search(r"https?://", old_evidence):
bracket_match = _re.search(r"\[(?:Quellen|Weitere Quellen|Ursprungsquellen):.*?\]", old_evidence)
if bracket_match:
new_evidence = new_evidence.rstrip(". ") + ". " + bracket_match.group()
else:
new_evidence = old_evidence
await db.execute(
"UPDATE fact_checks SET claim = ?, status = ?, sources_count = ?, evidence = ?, is_notification = ?, checked_at = ?"
+ (", status_history = ?" if history_update else "")
+ " WHERE id = ?",
(new_claim, new_status, fc.get("sources_count", 0), new_evidence, fc.get("is_notification", 0), now)
+ ((history_update,) if history_update else ())
+ (matched["id"],),
)
# Aus der Liste entfernen damit nicht doppelt gematcht wird
remaining_existing = [ef for ef in remaining_existing if ef["id"] != matched["id"]]
# Status-Änderung tracken
if not is_first_refresh and old_status and old_status != new_status:
status_changes.append({
"claim": new_claim,
"old_status": old_status,
"new_status": new_status,
})
else:
import json as _json
initial_history = _json.dumps([{"status": new_status, "at": now}])
await db.execute(
"""INSERT INTO fact_checks (incident_id, claim, status, sources_count, evidence, is_notification, tenant_id, status_history)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(incident_id, new_claim, new_status, fc.get("sources_count", 0), fc.get("evidence"), fc.get("is_notification", 0), tenant_id, initial_history),
)
# Status-Statistik sammeln
if new_status in ("confirmed", "established"):
confirmed_count += 1
elif new_status in ("contradicted", "disputed"):
contradicted_count += 1
await db.commit()
# Pipeline step 6 (fact check) was already marked as done earlier: right after
# the _do_factcheck call and before the summary was generated. Only the DB
# persistence of the facts happens here; the step is not closed a second time.
# Pipeline-Schritt 8: Qualitätscheck (Start, ohne Zahlen)
await _pipe_start("qc")
# Post-Refresh Quality Check: Duplikate und Karten-Kategorien pruefen
try:
from services.post_refresh_qc import run_post_refresh_qc
qc_result = await run_post_refresh_qc(db, incident_id)
if qc_result.get("facts_removed", 0) > 0 or qc_result.get("locations_fixed", 0) > 0:
logger.info(
f"QC: {qc_result['facts_removed']} Duplikate, "
f"{qc_result['locations_fixed']} Location-Fixes"
)
except Exception as qc_err:
logger.warning(f"Post-Refresh QC fehlgeschlagen: {qc_err}")
await _pipe_done("qc", count_value=None, count_secondary=None)
# Pipeline-Schritt 9: Benachrichtigen (Start)
await _pipe_start("notify")
_notify_count = 0
# Gebündelte Notification senden (nicht beim ersten Refresh)
if not is_first_refresh:
if self._ws_manager:
await self._ws_manager.broadcast_for_incident({
"type": "refresh_summary",
"incident_id": incident_id,
"data": {
"new_articles": new_count,
"confirmed_count": confirmed_count,
"contradicted_count": contradicted_count,
"status_changes": status_changes,
"is_first_refresh": False,
"incident_title": title,
},
}, visibility, created_by, tenant_id)
# DB-Notifications erzeugen (Texte org-sprach-relativ)
is_en = output_language_iso == "en"
parts = []
if is_en:
if new_count > 0:
parts.append(f"{new_count} new article{'s' if new_count != 1 else ''}")
if confirmed_count > 0:
parts.append(f"{confirmed_count} confirmed")
if contradicted_count > 0:
parts.append(f"{contradicted_count} contradicted")
summary_text = ", ".join(parts) if parts else "No new developments"
research_prefix = "Research"
new_articles_msg = f"{new_count} new article{'s' if new_count != 1 else ''} found"
else:
if new_count > 0:
parts.append(f"{new_count} neue Meldung{'en' if new_count != 1 else ''}")
if confirmed_count > 0:
parts.append(f"{confirmed_count} bestätigt")
if contradicted_count > 0:
parts.append(f"{contradicted_count} widersprochen")
summary_text = ", ".join(parts) if parts else "Keine neuen Entwicklungen"
research_prefix = "Recherche"
new_articles_msg = f"{new_count} neue Meldung{'en' if new_count != 1 else ''} gefunden"
db_notifications = [{
"type": "refresh_summary",
"title": title,
"text": f"{research_prefix}: {summary_text}",
"icon": "warning" if contradicted_count > 0 else "success",
}]
if new_count > 0:
db_notifications.append({
"type": "new_articles",
"title": title,
"text": new_articles_msg,
"icon": "info",
})
for sc in status_changes:
db_notifications.append({
"type": "status_change",
"title": title,
"text": f"{sc['claim']}: {sc['old_status']} \u2192 {sc['new_status']}",
"icon": "error" if sc["new_status"] in ("contradicted", "disputed") else "success",
})
if created_by:
await _create_notifications_for_incident(
db, incident_id, visibility, created_by, tenant_id, db_notifications
)
# E-Mail-Benachrichtigungen versenden
await _send_email_notifications_for_incident(
db, incident_id, title, visibility, created_by, tenant_id, db_notifications,
incident_type=incident_type,
)
_notify_count = len(db_notifications)
# Pipeline-Schritt 9: Benachrichtigen (fertig)
await _pipe_done("notify", count_value=_notify_count, count_secondary=None)
# Falls Analyse-Block übersprungen wurde (kein neuer Artikel und Summary existiert),
# die noch offenen Pipeline-Schritte als übersprungen markieren.
for _skipped_key in ("summary", "factcheck", "qc", "notify"):
if _skipped_key in _step_ids or _skipped_key not in {"summary", "factcheck", "qc", "notify"}:
pass
# Saubere Variante: alle noch offenen Steps am Ende skippen
for _open_key in list(_step_ids.keys()):
await _pipe_skip(_open_key)
# Auch Steps die nie gestartet wurden (bei übersprungenem Outer-If)
_started_keys = set()
try:
_check_cursor = await db.execute(
"SELECT step_key FROM refresh_pipeline_steps WHERE refresh_log_id = ? AND pass_number = ?",
(log_id, _pass_nr),
)
_started_keys = {row[0] for row in await _check_cursor.fetchall()}
except Exception:
pass
for _missing_key in ("summary", "factcheck", "qc", "notify"):
if _missing_key not in _started_keys:
await _pipe_skip(_missing_key)
# Refresh-Log abschließen (mit Token-Statistiken)
await db.execute(
"""UPDATE refresh_log SET
completed_at = ?, articles_found = ?, status = 'completed',
input_tokens = ?, output_tokens = ?,
cache_creation_tokens = ?, cache_read_tokens = ?,
total_cost_usd = ?, api_calls = ?
WHERE id = ?""",
(datetime.now(TIMEZONE).strftime('%Y-%m-%d %H:%M:%S'), new_count,
usage_acc.input_tokens, usage_acc.output_tokens,
usage_acc.cache_creation_tokens, usage_acc.cache_read_tokens,
round(usage_acc.total_cost_usd, 7), usage_acc.call_count, log_id),
)
await db.commit()
logger.info(
f"Token: {usage_acc.input_tokens} in / {usage_acc.output_tokens} out / "
f"${usage_acc.total_cost_usd:.4f} ({usage_acc.call_count} Calls)"
)
# Credits-Tracking: Monatliche Aggregation + Credits abziehen
if tenant_id and usage_acc.total_cost_usd > 0:
from services.license_service import charge_usage_to_tenant
await charge_usage_to_tenant(db, tenant_id, usage_acc, source="monitor")
await db.commit()
# Quellen-Discovery im Background starten
if unique_results:
asyncio.create_task(_background_discover_sources(unique_results))
if not _suppress_complete and self._ws_manager:
await self._ws_manager.broadcast_for_incident({
"type": "refresh_complete",
"incident_id": incident_id,
"data": {"new_articles": new_count, "status": "idle"},
}, visibility, created_by, tenant_id)
# updated_at IMMER aktualisieren wenn Refresh lief (auch bei fehlgeschlagener Analyse)
await db.execute(
"UPDATE incidents SET updated_at = ? WHERE id = ?",
(now, incident_id),
)
await db.commit()
logger.info(f"Refresh für Lage {incident_id} abgeschlossen: {new_count} neue Artikel")
# Multi-Pass-Diagnose: Pass-Ergebnis zurueck an Multi-Pass-Caller geben
if _pass_info is not None:
_pass_info["new_count"] = new_count
_pass_info["parse_failed"] = search_parse_failed
# Executive Summary im Hintergrund vorab generieren (fuer schnelleren Export)
if new_count > 0:
async def _pregenerate_exec_summary():
try:
from report_generator import generate_executive_summary
from database import get_db
_db = await get_db()
try:
cursor = await _db.execute(
"SELECT summary, executive_summary FROM incidents WHERE id = ?",
(incident_id,),
)
_row = await cursor.fetchone()
if _row and _row["summary"] and not _row["executive_summary"]:
es = await generate_executive_summary(_row["summary"])
await _db.execute(
"UPDATE incidents SET executive_summary = ? WHERE id = ?",
(es, incident_id),
)
await _db.commit()
logger.info(f"Executive Summary fuer Lage {incident_id} vorberechnet")
finally:
await _db.close()
except Exception as e:
logger.warning(f"Executive Summary Vorberechnung fehlgeschlagen: {e}")
asyncio.create_task(_pregenerate_exec_summary())
finally:
await db.close()
async def _get_incident_info(self, incident_id: int) -> tuple[str, bool]:
    """Load the incident type and whether a summary already exists.

    Args:
        incident_id: Primary key of the row in ``incidents``.

    Returns:
        A ``(incident_type, has_summary)`` pair. ``("adhoc", False)`` is
        returned when no row matches; a falsy stored type also falls back
        to ``"adhoc"``.
    """
    from database import get_db

    conn = await get_db()
    try:
        result = await conn.execute(
            "SELECT type, summary FROM incidents WHERE id = ?", (incident_id,)
        )
        record = await result.fetchone()
        if record:
            incident_type = record["type"] or "adhoc"
            has_summary = bool(record["summary"])
        else:
            # Unknown incident: use the same defaults as a row with no type/summary.
            incident_type, has_summary = "adhoc", False
        return incident_type, has_summary
    finally:
        # Close the connection on every exit path.
        await conn.close()
async def _run_research_multi_pass(self, incident_id: int, trigger_type: str, user_id: int = None):
    """Run the consecutive research passes for a research incident.

    The number of passes comes from ``RESEARCH_MULTI_PASS_COUNT``; the
    original notes describe three:

    Pass 1: broad capture (initial 4-phase research)
    Pass 2: deepening (other sources, incremental analysis)
    Pass 3: consolidation (remaining gaps, fact upgrade)

    Every pass delegates to ``self._run_refresh`` and hands it a
    ``_pass_info`` dict (``nr``, ``total``, ``label``). After the loop the
    ``new_count`` / ``parse_failed`` entries of those dicts are read back.

    A failure in a non-final pass is logged and the next pass still runs;
    a failure in the final pass propagates. ``asyncio.CancelledError`` is
    always re-raised.

    Raises:
        RuntimeError: If no pass reported new articles and at least one
            pass reported a parse failure.
    """
    total = RESEARCH_MULTI_PASS_COUNT
    collected_passes = []

    for pass_nr in range(1, total + 1):
        # Honour a cancel request between passes.
        self._check_cancelled(incident_id)

        final_pass = pass_nr == total
        pass_info = {
            "nr": pass_nr,
            "total": total,
            "label": RESEARCH_PASS_LABELS.get(pass_nr, f"Durchlauf {pass_nr}"),
        }

        logger.info(
            f"Research Multi-Pass {pass_nr}/{total} für Lage {incident_id}: "
            f"{pass_info['label']}"
        )

        # Register the dict up front; it is the same object handed to
        # _run_refresh, so anything written into it stays visible here
        # regardless of how the pass ends.
        collected_passes.append(pass_info)

        try:
            await self._run_refresh(
                incident_id,
                trigger_type=trigger_type,
                retry_count=0,
                user_id=user_id,
                _suppress_complete=not final_pass,
                _pass_info=pass_info,
            )
        except asyncio.CancelledError:
            logger.info(
                f"Research Multi-Pass abgebrochen in Durchlauf {pass_nr}/{total} "
                f"für Lage {incident_id}"
            )
            raise
        except Exception as e:
            logger.error(
                f"Research Multi-Pass {pass_nr}/{total} fehlgeschlagen "
                f"für Lage {incident_id}: {e}"
            )
            if final_pass:
                raise
            # Non-final pass: carry on with the next one, earlier results are kept.

    logger.info(
        f"Research Multi-Pass abgeschlossen für Lage {incident_id}: "
        f"{total} Durchläufe"
    )

    # Diagnosis: if ALL passes found 0 new articles AND at least one of them
    # hit a parser failure, the research has effectively failed. Without this
    # check the incident would stay empty with no visible error.
    new_article_sum = 0
    for entry in collected_passes:
        new_article_sum += entry.get("new_count", 0)
    parse_failure_seen = any(entry.get("parse_failed") for entry in collected_passes)

    if new_article_sum != 0 or not parse_failure_seen:
        return

    raise RuntimeError(
        "Recherche fehlgeschlagen: Claude lieferte keine verwertbaren Quellen "
        "(JSON-Parsing schlug bei mindestens einem Durchlauf fehl). "
        "Bitte Logs prüfen und Refresh erneut starten."
    )
# Singleton instance: the shared module-level AgentOrchestrator object.
orchestrator = AgentOrchestrator()