"""Analysepipeline-Tracking: persistiert Pipeline-Schritte pro Refresh und sendet Live-Status an die Frontend-Visualisierung. Die Pipeline hat 9 Schritte und ist eine bewusst vereinfachte Außensicht der internen Refresh-Pipeline (siehe orchestrator.py). Sie verschweigt Internas (Modellnamen, Tools, Phasen, Multi-Pass-Labels) und beschreibt jeden Schritt in verständlicher Sprache. """ from __future__ import annotations import logging from datetime import datetime from typing import Optional from config import TIMEZONE logger = logging.getLogger("osint.pipeline") # Single Source of Truth für die Pipeline-Definition. # Reihenfolge bestimmt die Anzeige im Frontend. _PIPELINE_STEPS_DE = [ {"key": "sources_review", "label": "Quellen sichten", "icon": "search", "tooltip": "Wir prüfen alle deine Nachrichtenquellen, ob sie aktuell erreichbar sind und was sie zu deiner Lage melden."}, {"key": "collect", "label": "Nachrichten sammeln", "icon": "rss", "tooltip": "Aus den passenden Quellen werden alle relevanten Meldungen eingesammelt - aus deinen RSS-Feeds, dem Web und optional Telegram-Kanälen."}, {"key": "dedup", "label": "Doppeltes filtern", "icon": "copy-x", "tooltip": "Mehrfach gemeldete Nachrichten werden zusammengefasst, damit nichts doppelt im Lagebild auftaucht."}, {"key": "relevance", "label": "Relevanz bewerten", "icon": "scale", "tooltip": "Jede Meldung wird darauf geprüft, ob sie wirklich zu deiner Lage passt. Themenfremdes wird aussortiert."}, {"key": "geoparsing", "label": "Orte erkennen", "icon": "map-pin", "tooltip": "Aus den Meldungen werden Ortsangaben erkannt und auf der Karte verortet."}, {"key": "factcheck", "label": "Fakten prüfen", "icon": "shield", "tooltip": "Behauptungen aus den Meldungen werden gegeneinander abgeglichen: Bestätigt? Umstritten? Noch unklar?"}, {"key": "summary", "label": "Lagebild verfassen", "icon": "file-text", "tooltip": "Aus allen geprüften Meldungen wird ein zusammenhängendes Lagebild geschrieben, mit Quellenangaben am Text."}, {"key": "qc", "label": "Qualitätscheck", "icon": "check-circle", "tooltip": "Eine letzte Kontrollprüfung am Ergebnis: Doppelte Fakten zusammenführen, Karten-Verortung prüfen, bevor du benachrichtigt wirst."}, {"key": "notify", "label": "Benachrichtigen", "icon": "bell", "tooltip": "Wenn etwas Wichtiges dabei war, gehen Benachrichtigungen raus, im Glockensymbol oben rechts und optional per E-Mail."}, ] _PIPELINE_STEPS_EN = [ {"key": "sources_review", "label": "Reviewing sources", "icon": "search", "tooltip": "We check all your news sources for availability and what they report on your situation."}, {"key": "collect", "label": "Collecting articles", "icon": "rss", "tooltip": "All relevant articles are pulled from matching sources - your RSS feeds, the open web, and optionally Telegram channels."}, {"key": "dedup", "label": "Filtering duplicates", "icon": "copy-x", "tooltip": "Articles reported by multiple sources are consolidated so nothing appears twice in the briefing."}, {"key": "relevance", "label": "Scoring relevance", "icon": "scale", "tooltip": "Each article is checked for fit with your situation. Off-topic items are dropped."}, {"key": "geoparsing", "label": "Detecting locations", "icon": "map-pin", "tooltip": "Locations are extracted from the articles and placed on the map."}, {"key": "factcheck", "label": "Checking facts", "icon": "shield", "tooltip": "Claims from the articles are cross-checked: Confirmed? Disputed? Still unclear?"}, {"key": "summary", "label": "Writing the briefing", "icon": "file-text", "tooltip": "All verified articles are combined into a coherent briefing with inline citations."}, {"key": "qc", "label": "Quality check", "icon": "check-circle", "tooltip": "A final review: consolidate duplicate facts, verify map locations, before you get notified."}, {"key": "notify", "label": "Notifying", "icon": "bell", "tooltip": "If something important emerged, notifications go out - to the bell icon and optionally by email."}, ] def get_pipeline_steps(lang_iso: str = "de") -> list[dict]: """Liefert die Pipeline-Definition in der gewuenschten Sprache.""" return _PIPELINE_STEPS_EN if lang_iso == "en" else _PIPELINE_STEPS_DE # Backward-compat (Default DE) PIPELINE_STEPS = _PIPELINE_STEPS_DE VALID_KEYS = {s["key"] for s in _PIPELINE_STEPS_DE} def _now_db() -> str: """Aktuelle Zeit im DB-Format (lokal).""" return datetime.now(TIMEZONE).strftime("%Y-%m-%d %H:%M:%S") async def _broadcast(ws_manager, incident_id: int, payload: dict, visibility: str, created_by: Optional[int], tenant_id: Optional[int]): """Sendet ein pipeline_step-Event an verbundene Clients der Lage.""" if not ws_manager: return try: await ws_manager.broadcast_for_incident( {"type": "pipeline_step", "incident_id": incident_id, "data": payload}, visibility, created_by, tenant_id, ) except Exception as e: logger.warning(f"Pipeline-WS-Broadcast fehlgeschlagen: {e}") async def start_step(db, ws_manager, *, refresh_log_id: int, incident_id: int, step_key: str, pass_number: int = 1, tenant_id: Optional[int] = None, visibility: str = "public", created_by: Optional[int] = None) -> Optional[int]: """Markiert einen Pipeline-Schritt als aktiv. Returns die DB-ID der Step-Zeile (für späteres Update via complete_step), oder None bei Fehler. """ if step_key not in VALID_KEYS: logger.warning(f"Unbekannter Pipeline-Schritt: {step_key}") return None try: cursor = await db.execute( """INSERT INTO refresh_pipeline_steps (refresh_log_id, incident_id, step_key, pass_number, started_at, status, tenant_id) VALUES (?, ?, ?, ?, ?, 'active', ?)""", (refresh_log_id, incident_id, step_key, pass_number, _now_db(), tenant_id), ) await db.commit() step_id = cursor.lastrowid except Exception as e: logger.warning(f"Pipeline start_step({step_key}) DB-Fehler: {e}") step_id = None await _broadcast(ws_manager, incident_id, { "step_key": step_key, "status": "active", "pass_number": pass_number, }, visibility, created_by, tenant_id) return step_id async def complete_step(db, ws_manager, *, step_id: Optional[int], refresh_log_id: int, incident_id: int, step_key: str, pass_number: int = 1, count_value: Optional[int] = None, count_secondary: Optional[int] = None, tenant_id: Optional[int] = None, visibility: str = "public", created_by: Optional[int] = None): """Markiert einen Pipeline-Schritt als abgeschlossen, mit Zahlen.""" if step_key not in VALID_KEYS: return try: if step_id: await db.execute( """UPDATE refresh_pipeline_steps SET status = 'done', completed_at = ?, count_value = ?, count_secondary = ? WHERE id = ?""", (_now_db(), count_value, count_secondary, step_id), ) else: # Fallback wenn start_step keine ID lieferte await db.execute( """INSERT INTO refresh_pipeline_steps (refresh_log_id, incident_id, step_key, pass_number, started_at, completed_at, status, count_value, count_secondary, tenant_id) VALUES (?, ?, ?, ?, ?, ?, 'done', ?, ?, ?)""", (refresh_log_id, incident_id, step_key, pass_number, _now_db(), _now_db(), count_value, count_secondary, tenant_id), ) await db.commit() except Exception as e: logger.warning(f"Pipeline complete_step({step_key}) DB-Fehler: {e}") await _broadcast(ws_manager, incident_id, { "step_key": step_key, "status": "done", "pass_number": pass_number, "count_value": count_value, "count_secondary": count_secondary, }, visibility, created_by, tenant_id) async def skip_step(db, ws_manager, *, refresh_log_id: int, incident_id: int, step_key: str, pass_number: int = 1, tenant_id: Optional[int] = None, visibility: str = "public", created_by: Optional[int] = None): """Markiert einen Schritt als übersprungen (z.B. Geoparsing ohne neue Artikel).""" if step_key not in VALID_KEYS: return try: await db.execute( """INSERT INTO refresh_pipeline_steps (refresh_log_id, incident_id, step_key, pass_number, started_at, completed_at, status, tenant_id) VALUES (?, ?, ?, ?, ?, ?, 'skipped', ?)""", (refresh_log_id, incident_id, step_key, pass_number, _now_db(), _now_db(), tenant_id), ) await db.commit() except Exception as e: logger.warning(f"Pipeline skip_step({step_key}) DB-Fehler: {e}") await _broadcast(ws_manager, incident_id, { "step_key": step_key, "status": "skipped", "pass_number": pass_number, }, visibility, created_by, tenant_id) async def error_step(db, ws_manager, *, step_id: Optional[int], refresh_log_id: int, incident_id: int, step_key: str, pass_number: int = 1, tenant_id: Optional[int] = None, visibility: str = "public", created_by: Optional[int] = None): """Markiert einen Schritt als fehlgeschlagen.""" if step_key not in VALID_KEYS: return try: if step_id: await db.execute( """UPDATE refresh_pipeline_steps SET status = 'error', completed_at = ? WHERE id = ?""", (_now_db(), step_id), ) else: await db.execute( """INSERT INTO refresh_pipeline_steps (refresh_log_id, incident_id, step_key, pass_number, started_at, completed_at, status, tenant_id) VALUES (?, ?, ?, ?, ?, ?, 'error', ?)""", (refresh_log_id, incident_id, step_key, pass_number, _now_db(), _now_db(), tenant_id), ) await db.commit() except Exception as e: logger.warning(f"Pipeline error_step({step_key}) DB-Fehler: {e}") await _broadcast(ws_manager, incident_id, { "step_key": step_key, "status": "error", "pass_number": pass_number, }, visibility, created_by, tenant_id) async def cancel_active_steps(db, *, refresh_log_id: int) -> int: """Schliesst alle noch aktiven Pipeline-Schritte eines Refreshs als 'cancelled' ab. Wird vom Orchestrator nach einem User-Cancel aufgerufen. Ohne diesen Schritt bleibt der zuletzt aktive Step-Eintrag verwaist und der Pipeline-Endpoint liefert dauerhaft 'Schritt X laeuft' an die UI. """ try: cur = await db.execute( """UPDATE refresh_pipeline_steps SET status = 'cancelled', completed_at = ? WHERE refresh_log_id = ? AND status = 'active'""", (_now_db(), refresh_log_id), ) await db.commit() return cur.rowcount or 0 except Exception as e: logger.warning(f"Pipeline cancel_active_steps DB-Fehler: {e}") return 0