Dateien
AegisSight-Monitor/src/services/pipeline_tracker.py
Claude Code f4c0c930b8 fix(orchestrator): aktive Pipeline-Schritte beim Cancel mitschliessen
Beim User-Cancel wurde nur refresh_log auf cancelled gesetzt, der zuletzt
aktive refresh_pipeline_steps-Eintrag blieb verwaist. Der
/api/incidents/<id>/pipeline-Endpoint liefert daraus dauerhaft
"Schritt X laeuft" an die UI, auch lange nach dem Cancel.

- pipeline_tracker.cancel_active_steps(): neuer Bulk-Helper, setzt alle
  noch active-Schritte eines refresh_log_id auf cancelled mit completed_at
- _mark_refresh_cancelled holt die refresh_log_id, macht das refresh_log-
  Update wie bisher und ruft danach cancel_active_steps auf

Reproduziert bei Lage 80 (Bjoern Hoecke), refresh_log 1273. Frontend-
CSS kennt status-cancelled nicht, faellt auf den neutralen Default-Style
zurueck (kein Spinner mehr, kein Haken, korrekt ent-hangen).
2026-05-06 23:40:39 +00:00

253 Zeilen
9.7 KiB
Python

"""Analysepipeline-Tracking: persistiert Pipeline-Schritte pro Refresh und sendet
Live-Status an die Frontend-Visualisierung.
Die Pipeline hat 9 Schritte und ist eine bewusst vereinfachte Außensicht der
internen Refresh-Pipeline (siehe orchestrator.py). Sie verschweigt Internas
(Modellnamen, Tools, Phasen, Multi-Pass-Labels) und beschreibt jeden Schritt in
verständlicher Sprache.
"""
from __future__ import annotations
import logging
from datetime import datetime
from typing import Optional
from config import TIMEZONE
logger = logging.getLogger("osint.pipeline")
# Single Source of Truth für die Pipeline-Definition.
# Reihenfolge bestimmt die Anzeige im Frontend.
PIPELINE_STEPS = [
{
"key": "sources_review",
"label": "Quellen sichten",
"icon": "search",
"tooltip": "Wir prüfen alle deine Nachrichtenquellen, ob sie aktuell erreichbar sind und was sie zu deiner Lage melden.",
},
{
"key": "collect",
"label": "Nachrichten sammeln",
"icon": "rss",
"tooltip": "Aus den passenden Quellen werden alle relevanten Meldungen eingesammelt - aus deinen RSS-Feeds, dem Web und optional Telegram-Kanälen.",
},
{
"key": "dedup",
"label": "Doppeltes filtern",
"icon": "copy-x",
"tooltip": "Mehrfach gemeldete Nachrichten werden zusammengefasst, damit nichts doppelt im Lagebild auftaucht.",
},
{
"key": "relevance",
"label": "Relevanz bewerten",
"icon": "scale",
"tooltip": "Jede Meldung wird darauf geprüft, ob sie wirklich zu deiner Lage passt. Themenfremdes wird aussortiert.",
},
{
"key": "geoparsing",
"label": "Orte erkennen",
"icon": "map-pin",
"tooltip": "Aus den Meldungen werden Ortsangaben erkannt und auf der Karte verortet.",
},
{
"key": "factcheck",
"label": "Fakten prüfen",
"icon": "shield",
"tooltip": "Behauptungen aus den Meldungen werden gegeneinander abgeglichen: Bestätigt? Umstritten? Noch unklar?",
},
{
"key": "summary",
"label": "Lagebild verfassen",
"icon": "file-text",
"tooltip": "Aus allen geprüften Meldungen wird ein zusammenhängendes Lagebild geschrieben, mit Quellenangaben am Text.",
},
{
"key": "qc",
"label": "Qualitätscheck",
"icon": "check-circle",
"tooltip": "Eine letzte Kontrollprüfung am Ergebnis: Doppelte Fakten zusammenführen, Karten-Verortung prüfen, bevor du benachrichtigt wirst.",
},
{
"key": "notify",
"label": "Benachrichtigen",
"icon": "bell",
"tooltip": "Wenn etwas Wichtiges dabei war, gehen Benachrichtigungen raus, im Glockensymbol oben rechts und optional per E-Mail.",
},
]
VALID_KEYS = {s["key"] for s in PIPELINE_STEPS}
def _now_db() -> str:
"""Aktuelle Zeit im DB-Format (lokal)."""
return datetime.now(TIMEZONE).strftime("%Y-%m-%d %H:%M:%S")
async def _broadcast(ws_manager, incident_id: int, payload: dict,
visibility: str, created_by: Optional[int], tenant_id: Optional[int]):
"""Sendet ein pipeline_step-Event an verbundene Clients der Lage."""
if not ws_manager:
return
try:
await ws_manager.broadcast_for_incident(
{"type": "pipeline_step", "incident_id": incident_id, "data": payload},
visibility, created_by, tenant_id,
)
except Exception as e:
logger.warning(f"Pipeline-WS-Broadcast fehlgeschlagen: {e}")
async def start_step(db, ws_manager, *, refresh_log_id: int, incident_id: int,
step_key: str, pass_number: int = 1, tenant_id: Optional[int] = None,
visibility: str = "public", created_by: Optional[int] = None) -> Optional[int]:
"""Markiert einen Pipeline-Schritt als aktiv.
Returns die DB-ID der Step-Zeile (für späteres Update via complete_step), oder None bei Fehler.
"""
if step_key not in VALID_KEYS:
logger.warning(f"Unbekannter Pipeline-Schritt: {step_key}")
return None
try:
cursor = await db.execute(
"""INSERT INTO refresh_pipeline_steps
(refresh_log_id, incident_id, step_key, pass_number, started_at, status, tenant_id)
VALUES (?, ?, ?, ?, ?, 'active', ?)""",
(refresh_log_id, incident_id, step_key, pass_number, _now_db(), tenant_id),
)
await db.commit()
step_id = cursor.lastrowid
except Exception as e:
logger.warning(f"Pipeline start_step({step_key}) DB-Fehler: {e}")
step_id = None
await _broadcast(ws_manager, incident_id, {
"step_key": step_key,
"status": "active",
"pass_number": pass_number,
}, visibility, created_by, tenant_id)
return step_id
async def complete_step(db, ws_manager, *, step_id: Optional[int], refresh_log_id: int,
incident_id: int, step_key: str, pass_number: int = 1,
count_value: Optional[int] = None, count_secondary: Optional[int] = None,
tenant_id: Optional[int] = None, visibility: str = "public",
created_by: Optional[int] = None):
"""Markiert einen Pipeline-Schritt als abgeschlossen, mit Zahlen."""
if step_key not in VALID_KEYS:
return
try:
if step_id:
await db.execute(
"""UPDATE refresh_pipeline_steps
SET status = 'done', completed_at = ?, count_value = ?, count_secondary = ?
WHERE id = ?""",
(_now_db(), count_value, count_secondary, step_id),
)
else:
# Fallback wenn start_step keine ID lieferte
await db.execute(
"""INSERT INTO refresh_pipeline_steps
(refresh_log_id, incident_id, step_key, pass_number, started_at, completed_at,
status, count_value, count_secondary, tenant_id)
VALUES (?, ?, ?, ?, ?, ?, 'done', ?, ?, ?)""",
(refresh_log_id, incident_id, step_key, pass_number, _now_db(), _now_db(),
count_value, count_secondary, tenant_id),
)
await db.commit()
except Exception as e:
logger.warning(f"Pipeline complete_step({step_key}) DB-Fehler: {e}")
await _broadcast(ws_manager, incident_id, {
"step_key": step_key,
"status": "done",
"pass_number": pass_number,
"count_value": count_value,
"count_secondary": count_secondary,
}, visibility, created_by, tenant_id)
async def skip_step(db, ws_manager, *, refresh_log_id: int, incident_id: int,
step_key: str, pass_number: int = 1, tenant_id: Optional[int] = None,
visibility: str = "public", created_by: Optional[int] = None):
"""Markiert einen Schritt als übersprungen (z.B. Geoparsing ohne neue Artikel)."""
if step_key not in VALID_KEYS:
return
try:
await db.execute(
"""INSERT INTO refresh_pipeline_steps
(refresh_log_id, incident_id, step_key, pass_number, started_at, completed_at,
status, tenant_id)
VALUES (?, ?, ?, ?, ?, ?, 'skipped', ?)""",
(refresh_log_id, incident_id, step_key, pass_number, _now_db(), _now_db(), tenant_id),
)
await db.commit()
except Exception as e:
logger.warning(f"Pipeline skip_step({step_key}) DB-Fehler: {e}")
await _broadcast(ws_manager, incident_id, {
"step_key": step_key,
"status": "skipped",
"pass_number": pass_number,
}, visibility, created_by, tenant_id)
async def error_step(db, ws_manager, *, step_id: Optional[int], refresh_log_id: int,
incident_id: int, step_key: str, pass_number: int = 1,
tenant_id: Optional[int] = None, visibility: str = "public",
created_by: Optional[int] = None):
"""Markiert einen Schritt als fehlgeschlagen."""
if step_key not in VALID_KEYS:
return
try:
if step_id:
await db.execute(
"""UPDATE refresh_pipeline_steps
SET status = 'error', completed_at = ?
WHERE id = ?""",
(_now_db(), step_id),
)
else:
await db.execute(
"""INSERT INTO refresh_pipeline_steps
(refresh_log_id, incident_id, step_key, pass_number, started_at, completed_at,
status, tenant_id)
VALUES (?, ?, ?, ?, ?, ?, 'error', ?)""",
(refresh_log_id, incident_id, step_key, pass_number, _now_db(), _now_db(), tenant_id),
)
await db.commit()
except Exception as e:
logger.warning(f"Pipeline error_step({step_key}) DB-Fehler: {e}")
await _broadcast(ws_manager, incident_id, {
"step_key": step_key,
"status": "error",
"pass_number": pass_number,
}, visibility, created_by, tenant_id)
async def cancel_active_steps(db, *, refresh_log_id: int) -> int:
"""Schliesst alle noch aktiven Pipeline-Schritte eines Refreshs als 'cancelled' ab.
Wird vom Orchestrator nach einem User-Cancel aufgerufen. Ohne diesen Schritt
bleibt der zuletzt aktive Step-Eintrag verwaist und der Pipeline-Endpoint
liefert dauerhaft 'Schritt X laeuft' an die UI.
"""
try:
cur = await db.execute(
"""UPDATE refresh_pipeline_steps
SET status = 'cancelled', completed_at = ?
WHERE refresh_log_id = ? AND status = 'active'""",
(_now_db(), refresh_log_id),
)
await db.commit()
return cur.rowcount or 0
except Exception as e:
logger.warning(f"Pipeline cancel_active_steps DB-Fehler: {e}")
return 0