Analysepipeline: Visualisierung der Refresh-Schritte
Neuer Tab "Analysepipeline" zwischen Faktencheck und Quellenuebersicht.
Zeigt 9 Verarbeitungsschritte als n8n-artige Blockkette: Quellen sichten,
Nachrichten sammeln, Doppeltes filtern, Relevanz bewerten, Orte erkennen,
Lagebild verfassen, Fakten pruefen, Qualitaetscheck, Benachrichtigen.
- Backend: refresh_pipeline_steps-Tabelle persistiert pro Refresh+Pass die
Status- und Zahlen-Werte. pipeline_tracker.py kapselt Start/Done/Skip/Error
inkl. WebSocket-Broadcast (Event-Typ pipeline_step). 9 Hooks im Orchestrator
speisen die Anzeige.
- API: GET /api/incidents/{id}/pipeline liefert Definition + letzten Stand
(Zahlen aus letztem Refresh, Multi-Pass-Konsolidierung).
- Frontend: pipeline.js rendert Vollbild-Blockkette mit pulsierendem Glow am
aktiven Block, animierten Pfeilen bei Datenfluss, Haekchen am fertigen Block.
Hover-Tooltip mit Erklaerung in Nutzersprache, Klick oeffnet Detail-Popup.
Bei Research-Lagen leuchtet ein Schleifen-Pfeil pro Mehrfach-Durchlauf auf.
Mini-Variante (nur Icons) im Refresh-Progress-Popup.
- CSS: Light/Dark-Theme-fest, dezenter Circuit-Hintergrund (5% Opacity),
Mobile-vertikale Stapelung unter 900px, prefers-reduced-motion respektiert.
- Uebersprungene Schritte (z.B. Geoparsing ohne neue Artikel) werden
ausgeblendet, brandneue Lagen ohne Refresh zeigen Hinweis.
Tooltips bewusst in normaler Sprache ohne Internas (keine Modellnamen,
keine Toolnamen, keine Phasen-Labels).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Dieser Commit ist enthalten in:
230
src/services/pipeline_tracker.py
Normale Datei
230
src/services/pipeline_tracker.py
Normale Datei
@@ -0,0 +1,230 @@
|
||||
"""Analysepipeline-Tracking: persistiert Pipeline-Schritte pro Refresh und sendet
|
||||
Live-Status an die Frontend-Visualisierung.
|
||||
|
||||
Die Pipeline hat 9 Schritte und ist eine bewusst vereinfachte Außensicht der
|
||||
internen Refresh-Pipeline (siehe orchestrator.py). Sie verschweigt Internas
|
||||
(Modellnamen, Tools, Phasen, Multi-Pass-Labels) und beschreibt jeden Schritt in
|
||||
verständlicher Sprache.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
from config import TIMEZONE
|
||||
|
||||
logger = logging.getLogger("osint.pipeline")
|
||||
|
||||
|
||||
# Single Source of Truth für die Pipeline-Definition.
|
||||
# Reihenfolge bestimmt die Anzeige im Frontend.
|
||||
PIPELINE_STEPS = [
|
||||
{
|
||||
"key": "sources_review",
|
||||
"label": "Quellen sichten",
|
||||
"icon": "search",
|
||||
"tooltip": "Wir prüfen alle deine Nachrichtenquellen, ob sie aktuell erreichbar sind und was sie zu deiner Lage melden.",
|
||||
},
|
||||
{
|
||||
"key": "collect",
|
||||
"label": "Nachrichten sammeln",
|
||||
"icon": "rss",
|
||||
"tooltip": "Aus den passenden Quellen werden alle relevanten Meldungen eingesammelt — aus deinen RSS-Feeds, dem Web und optional Telegram-Kanälen.",
|
||||
},
|
||||
{
|
||||
"key": "dedup",
|
||||
"label": "Doppeltes filtern",
|
||||
"icon": "copy-x",
|
||||
"tooltip": "Mehrfach gemeldete Nachrichten werden zusammengefasst, damit nichts doppelt im Lagebild auftaucht.",
|
||||
},
|
||||
{
|
||||
"key": "relevance",
|
||||
"label": "Relevanz bewerten",
|
||||
"icon": "scale",
|
||||
"tooltip": "Jede Meldung wird darauf geprüft, ob sie wirklich zu deiner Lage passt. Themenfremdes wird aussortiert.",
|
||||
},
|
||||
{
|
||||
"key": "geoparsing",
|
||||
"label": "Orte erkennen",
|
||||
"icon": "map-pin",
|
||||
"tooltip": "Aus den Meldungen werden Ortsangaben erkannt und auf der Karte verortet.",
|
||||
},
|
||||
{
|
||||
"key": "summary",
|
||||
"label": "Lagebild verfassen",
|
||||
"icon": "file-text",
|
||||
"tooltip": "Aus allen geprüften Meldungen wird ein zusammenhängendes Lagebild geschrieben — mit Quellenangaben am Text.",
|
||||
},
|
||||
{
|
||||
"key": "factcheck",
|
||||
"label": "Fakten prüfen",
|
||||
"icon": "shield",
|
||||
"tooltip": "Behauptungen aus den Meldungen werden gegeneinander abgeglichen: Bestätigt? Umstritten? Noch unklar?",
|
||||
},
|
||||
{
|
||||
"key": "qc",
|
||||
"label": "Qualitätscheck",
|
||||
"icon": "check-circle",
|
||||
"tooltip": "Eine letzte Kontrollprüfung am Ergebnis: Doppelte Fakten zusammenführen, Karten-Verortung prüfen, bevor du benachrichtigt wirst.",
|
||||
},
|
||||
{
|
||||
"key": "notify",
|
||||
"label": "Benachrichtigen",
|
||||
"icon": "bell",
|
||||
"tooltip": "Wenn etwas Wichtiges dabei war, gehen Benachrichtigungen raus — im Glockensymbol oben rechts und optional per E-Mail.",
|
||||
},
|
||||
]
|
||||
|
||||
VALID_KEYS = {s["key"] for s in PIPELINE_STEPS}
|
||||
|
||||
|
||||
def _now_db() -> str:
|
||||
"""Aktuelle Zeit im DB-Format (lokal)."""
|
||||
return datetime.now(TIMEZONE).strftime("%Y-%m-%d %H:%M:%S")
|
||||
|
||||
|
||||
async def _broadcast(ws_manager, incident_id: int, payload: dict,
|
||||
visibility: str, created_by: Optional[int], tenant_id: Optional[int]):
|
||||
"""Sendet ein pipeline_step-Event an verbundene Clients der Lage."""
|
||||
if not ws_manager:
|
||||
return
|
||||
try:
|
||||
await ws_manager.broadcast_for_incident(
|
||||
{"type": "pipeline_step", "incident_id": incident_id, "data": payload},
|
||||
visibility, created_by, tenant_id,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Pipeline-WS-Broadcast fehlgeschlagen: {e}")
|
||||
|
||||
|
||||
async def start_step(db, ws_manager, *, refresh_log_id: int, incident_id: int,
|
||||
step_key: str, pass_number: int = 1, tenant_id: Optional[int] = None,
|
||||
visibility: str = "public", created_by: Optional[int] = None) -> Optional[int]:
|
||||
"""Markiert einen Pipeline-Schritt als aktiv.
|
||||
|
||||
Returns die DB-ID der Step-Zeile (für späteres Update via complete_step), oder None bei Fehler.
|
||||
"""
|
||||
if step_key not in VALID_KEYS:
|
||||
logger.warning(f"Unbekannter Pipeline-Schritt: {step_key}")
|
||||
return None
|
||||
|
||||
try:
|
||||
cursor = await db.execute(
|
||||
"""INSERT INTO refresh_pipeline_steps
|
||||
(refresh_log_id, incident_id, step_key, pass_number, started_at, status, tenant_id)
|
||||
VALUES (?, ?, ?, ?, ?, 'active', ?)""",
|
||||
(refresh_log_id, incident_id, step_key, pass_number, _now_db(), tenant_id),
|
||||
)
|
||||
await db.commit()
|
||||
step_id = cursor.lastrowid
|
||||
except Exception as e:
|
||||
logger.warning(f"Pipeline start_step({step_key}) DB-Fehler: {e}")
|
||||
step_id = None
|
||||
|
||||
await _broadcast(ws_manager, incident_id, {
|
||||
"step_key": step_key,
|
||||
"status": "active",
|
||||
"pass_number": pass_number,
|
||||
}, visibility, created_by, tenant_id)
|
||||
|
||||
return step_id
|
||||
|
||||
|
||||
async def complete_step(db, ws_manager, *, step_id: Optional[int], refresh_log_id: int,
|
||||
incident_id: int, step_key: str, pass_number: int = 1,
|
||||
count_value: Optional[int] = None, count_secondary: Optional[int] = None,
|
||||
tenant_id: Optional[int] = None, visibility: str = "public",
|
||||
created_by: Optional[int] = None):
|
||||
"""Markiert einen Pipeline-Schritt als abgeschlossen, mit Zahlen."""
|
||||
if step_key not in VALID_KEYS:
|
||||
return
|
||||
|
||||
try:
|
||||
if step_id:
|
||||
await db.execute(
|
||||
"""UPDATE refresh_pipeline_steps
|
||||
SET status = 'done', completed_at = ?, count_value = ?, count_secondary = ?
|
||||
WHERE id = ?""",
|
||||
(_now_db(), count_value, count_secondary, step_id),
|
||||
)
|
||||
else:
|
||||
# Fallback wenn start_step keine ID lieferte
|
||||
await db.execute(
|
||||
"""INSERT INTO refresh_pipeline_steps
|
||||
(refresh_log_id, incident_id, step_key, pass_number, started_at, completed_at,
|
||||
status, count_value, count_secondary, tenant_id)
|
||||
VALUES (?, ?, ?, ?, ?, ?, 'done', ?, ?, ?)""",
|
||||
(refresh_log_id, incident_id, step_key, pass_number, _now_db(), _now_db(),
|
||||
count_value, count_secondary, tenant_id),
|
||||
)
|
||||
await db.commit()
|
||||
except Exception as e:
|
||||
logger.warning(f"Pipeline complete_step({step_key}) DB-Fehler: {e}")
|
||||
|
||||
await _broadcast(ws_manager, incident_id, {
|
||||
"step_key": step_key,
|
||||
"status": "done",
|
||||
"pass_number": pass_number,
|
||||
"count_value": count_value,
|
||||
"count_secondary": count_secondary,
|
||||
}, visibility, created_by, tenant_id)
|
||||
|
||||
|
||||
async def skip_step(db, ws_manager, *, refresh_log_id: int, incident_id: int,
|
||||
step_key: str, pass_number: int = 1, tenant_id: Optional[int] = None,
|
||||
visibility: str = "public", created_by: Optional[int] = None):
|
||||
"""Markiert einen Schritt als übersprungen (z.B. Geoparsing ohne neue Artikel)."""
|
||||
if step_key not in VALID_KEYS:
|
||||
return
|
||||
try:
|
||||
await db.execute(
|
||||
"""INSERT INTO refresh_pipeline_steps
|
||||
(refresh_log_id, incident_id, step_key, pass_number, started_at, completed_at,
|
||||
status, tenant_id)
|
||||
VALUES (?, ?, ?, ?, ?, ?, 'skipped', ?)""",
|
||||
(refresh_log_id, incident_id, step_key, pass_number, _now_db(), _now_db(), tenant_id),
|
||||
)
|
||||
await db.commit()
|
||||
except Exception as e:
|
||||
logger.warning(f"Pipeline skip_step({step_key}) DB-Fehler: {e}")
|
||||
|
||||
await _broadcast(ws_manager, incident_id, {
|
||||
"step_key": step_key,
|
||||
"status": "skipped",
|
||||
"pass_number": pass_number,
|
||||
}, visibility, created_by, tenant_id)
|
||||
|
||||
|
||||
async def error_step(db, ws_manager, *, step_id: Optional[int], refresh_log_id: int,
|
||||
incident_id: int, step_key: str, pass_number: int = 1,
|
||||
tenant_id: Optional[int] = None, visibility: str = "public",
|
||||
created_by: Optional[int] = None):
|
||||
"""Markiert einen Schritt als fehlgeschlagen."""
|
||||
if step_key not in VALID_KEYS:
|
||||
return
|
||||
try:
|
||||
if step_id:
|
||||
await db.execute(
|
||||
"""UPDATE refresh_pipeline_steps
|
||||
SET status = 'error', completed_at = ?
|
||||
WHERE id = ?""",
|
||||
(_now_db(), step_id),
|
||||
)
|
||||
else:
|
||||
await db.execute(
|
||||
"""INSERT INTO refresh_pipeline_steps
|
||||
(refresh_log_id, incident_id, step_key, pass_number, started_at, completed_at,
|
||||
status, tenant_id)
|
||||
VALUES (?, ?, ?, ?, ?, ?, 'error', ?)""",
|
||||
(refresh_log_id, incident_id, step_key, pass_number, _now_db(), _now_db(), tenant_id),
|
||||
)
|
||||
await db.commit()
|
||||
except Exception as e:
|
||||
logger.warning(f"Pipeline error_step({step_key}) DB-Fehler: {e}")
|
||||
|
||||
await _broadcast(ws_manager, incident_id, {
|
||||
"step_key": step_key,
|
||||
"status": "error",
|
||||
"pass_number": pass_number,
|
||||
}, visibility, created_by, tenant_id)
|
||||
In neuem Issue referenzieren
Einen Benutzer sperren