diff --git a/src/agents/orchestrator.py b/src/agents/orchestrator.py index e946ebc..01312e5 100644 --- a/src/agents/orchestrator.py +++ b/src/agents/orchestrator.py @@ -677,6 +677,7 @@ class AgentOrchestrator: from agents.analyzer import AnalyzerAgent from agents.factchecker import FactCheckerAgent from feeds.rss_parser import RSSParser + from services import pipeline_tracker as _pipe db = await get_db() try: @@ -719,6 +720,47 @@ class AgentOrchestrator: log_id = cursor.lastrowid usage_acc = UsageAccumulator() + # --- Pipeline-Tracking (Analysepipeline-Visualisierung) --- + _pass_nr = (_pass_info or {}).get("nr", 1) + _step_ids: dict[str, Optional[int]] = {} + + async def _pipe_start(step_key: str): + try: + sid = await _pipe.start_step( + db, self._ws_manager, + refresh_log_id=log_id, incident_id=incident_id, step_key=step_key, + pass_number=_pass_nr, tenant_id=tenant_id, + visibility=visibility, created_by=created_by, + ) + _step_ids[step_key] = sid + return sid + except Exception as _e: + logger.debug(f"_pipe_start({step_key}) ignoriert: {_e}") + return None + + async def _pipe_done(step_key: str, count_value=None, count_secondary=None): + try: + sid = _step_ids.pop(step_key, None) + await _pipe.complete_step( + db, self._ws_manager, step_id=sid, + refresh_log_id=log_id, incident_id=incident_id, step_key=step_key, + pass_number=_pass_nr, count_value=count_value, count_secondary=count_secondary, + tenant_id=tenant_id, visibility=visibility, created_by=created_by, + ) + except Exception as _e: + logger.debug(f"_pipe_done({step_key}) ignoriert: {_e}") + + async def _pipe_skip(step_key: str): + try: + await _pipe.skip_step( + db, self._ws_manager, + refresh_log_id=log_id, incident_id=incident_id, step_key=step_key, + pass_number=_pass_nr, tenant_id=tenant_id, + visibility=visibility, created_by=created_by, + ) + except Exception as _e: + logger.debug(f"_pipe_skip({step_key}) ignoriert: {_e}") + research_status = "deep_researching" if incident_type == "research" else 
"researching" research_detail = "Hintergrundrecherche im Web läuft..." if incident_type == "research" else "RSS-Feeds und Web werden durchsucht..." # Multi-Pass: Detail-Text mit Durchlauf-Info versehen @@ -741,6 +783,23 @@ class AgentOrchestrator: ) existing_db_articles_full = await cursor.fetchall() + # Pipeline-Schritt 1: Quellen sichten (vorbereitet) + await _pipe_start("sources_review") + try: + if incident_type == "adhoc": + _src_cursor = await db.execute( + "SELECT COUNT(*) AS cnt FROM sources WHERE tenant_id = ? AND status = 'active'", + (tenant_id,), + ) + _src_row = await _src_cursor.fetchone() + _src_total = _src_row["cnt"] if _src_row else 0 + else: + _src_total = None + except Exception: + _src_total = None + # secondary wird später mit der Anzahl tatsächlich liefernder Quellen ergänzt + await _pipe_done("sources_review", count_value=_src_total, count_secondary=None) + # Schritt 1+2: RSS-Feeds und Claude-Recherche parallel ausführen async def _rss_pipeline(): """RSS-Feed-Suche (Feed-Selektion + dynamische Keywords + Parsing).""" @@ -880,6 +939,9 @@ class AgentOrchestrator: logger.info(f"Telegram-Pipeline: {len(articles)} Nachrichten") return articles, None + # Pipeline-Schritt 2: Nachrichten sammeln (Start) + await _pipe_start("collect") + # Pipelines parallel starten (RSS + WebSearch + Podcasts + optional Telegram) pipelines = [_rss_pipeline(), _web_search_pipeline(), _podcast_pipeline()] if include_telegram: @@ -910,6 +972,15 @@ class AgentOrchestrator: # Alle Ergebnisse zusammenführen all_results = rss_articles + search_results + telegram_articles + # Pipeline-Schritt 2: Nachrichten sammeln (fertig) + try: + _delivering_sources = len({a.get("source", "") for a in all_results if a.get("source")}) + except Exception: + _delivering_sources = None + await _pipe_done("collect", count_value=len(all_results), count_secondary=_delivering_sources) + + # Pipeline-Schritt 3: Doppeltes filtern (Start) + await _pipe_start("dedup") # Duplikate entfernen 
(normalisierte URL + Headline-Ähnlichkeit) seen_urls = set() @@ -922,6 +993,7 @@ class AgentOrchestrator: dupes_removed = len(all_results) - len(unique_results) if dupes_removed > 0: logger.info(f"Deduplizierung: {dupes_removed} Duplikate entfernt, {len(unique_results)} verbleibend") + await _pipe_done("dedup", count_value=dupes_removed, count_secondary=len(unique_results)) # Relevanz-Scoring und Sortierung for article in unique_results: @@ -978,6 +1050,10 @@ class AgentOrchestrator: new_candidates.append(article) + # Pipeline-Schritt 4: Relevanz bewerten (Start) + await _pipe_start("relevance") + _candidates_before_topic = len(new_candidates) + # --- Semantischer Topic-Filter (Haiku) --- # Wirft Artikel raus, die zwar Keyword-Treffer hatten, aber das Kernthema # der Lage nicht inhaltlich behandeln. Bei Fehler Fallback auf alle Kandidaten. @@ -988,6 +1064,7 @@ class AgentOrchestrator: ) if _tf_usage: usage_acc.add(_tf_usage) + await _pipe_done("relevance", count_value=len(new_candidates), count_secondary=_candidates_before_topic) # --- Neue (thematisch gefilterte) Artikel speichern und für Analyse tracken --- new_count = 0 @@ -1019,6 +1096,8 @@ class AgentOrchestrator: # Geoparsing: Orte aus neuen Artikeln extrahieren und speichern if new_articles_for_analysis: + # Pipeline-Schritt 5: Orte erkennen (Start) + await _pipe_start("geoparsing") try: from agents.geoparsing import geoparse_articles incident_context = f"{title} - {description}" @@ -1049,8 +1128,12 @@ class AgentOrchestrator: ) await db.commit() logger.info(f"Category-Labels gespeichert fuer Incident {incident_id}: {category_labels}") + await _pipe_done("geoparsing", count_value=geo_count, count_secondary=len(geo_results) if geo_results else 0) except Exception as e: logger.warning(f"Geoparsing fehlgeschlagen (Pipeline laeuft weiter): {e}") + await _pipe_done("geoparsing", count_value=0, count_secondary=0) + else: + await _pipe_skip("geoparsing") # Quellen-Statistiken aktualisieren if new_count > 0: @@ 
-1196,6 +1279,10 @@ class AgentOrchestrator: articles_for_check = [dict(row) for row in await cursor.fetchall()] return await factchecker.check(title, articles_for_check, incident_type) + # Pipeline-Schritte 6+7: Lagebild verfassen + Fakten prüfen (Start, parallel) + await _pipe_start("summary") + await _pipe_start("factcheck") + # Beide Tasks PARALLEL starten logger.info("Starte Analyse und Faktencheck parallel...") analysis_result, factcheck_result = await asyncio.gather( @@ -1205,6 +1292,8 @@ class AgentOrchestrator: analysis, analysis_usage = analysis_result fact_checks, fc_usage = factcheck_result + # Pipeline-Schritt 6: Lagebild verfassen (fertig — keine Zahl, nur Status) + await _pipe_done("summary", count_value=None, count_secondary=None) # --- Analyse-Ergebnisse verarbeiten --- if analysis_usage: @@ -1458,6 +1547,13 @@ class AgentOrchestrator: await db.commit() + # Pipeline-Schritt 7: Fakten prüfen (fertig) + _new_facts_count = max(0, len(fact_checks) - len(existing_facts)) + await _pipe_done("factcheck", count_value=_new_facts_count, count_secondary=len(fact_checks) if fact_checks else 0) + + # Pipeline-Schritt 8: Qualitätscheck (Start, ohne Zahlen) + await _pipe_start("qc") + # Post-Refresh Quality Check: Duplikate und Karten-Kategorien pruefen try: from services.post_refresh_qc import run_post_refresh_qc @@ -1469,6 +1565,12 @@ class AgentOrchestrator: ) except Exception as qc_err: logger.warning(f"Post-Refresh QC fehlgeschlagen: {qc_err}") + await _pipe_done("qc", count_value=None, count_secondary=None) + + # Pipeline-Schritt 9: Benachrichtigen (Start) + await _pipe_start("notify") + _notify_count = 0 + # Gebündelte Notification senden (nicht beim ersten Refresh) if not is_first_refresh: if self._ws_manager: @@ -1525,6 +1627,32 @@ class AgentOrchestrator: db, incident_id, title, visibility, created_by, tenant_id, db_notifications, incident_type=incident_type, ) + _notify_count = len(db_notifications) + + # Pipeline-Schritt 9: Benachrichtigen (fertig) + 
await _pipe_done("notify", count_value=_notify_count, count_secondary=None) + + # Falls Analyse-Block uebersprungen wurde (kein neuer Artikel und Summary existiert), + # die noch offenen Pipeline-Schritte als uebersprungen markieren. + # Alle noch offenen Steps am Ende skippen + for _open_key in list(_step_ids.keys()): + await _pipe_skip(_open_key) + # Auch Steps die nie gestartet wurden (bei uebersprungenem Outer-If) + _started_keys = set() + try: + _check_cursor = await db.execute( + "SELECT step_key FROM refresh_pipeline_steps WHERE refresh_log_id = ? AND pass_number = ?", + (log_id, _pass_nr), + ) + _started_keys = {row[0] for row in await _check_cursor.fetchall()} + except Exception: + pass + for _missing_key in ("summary", "factcheck", "qc", "notify"): + if _missing_key not in _started_keys: + await _pipe_skip(_missing_key) # Refresh-Log abschließen (mit Token-Statistiken) await db.execute( diff --git a/src/database.py b/src/database.py index 91f814c..19f06bf 100644 --- a/src/database.py +++ b/src/database.py @@ -117,6 +117,22 @@ CREATE TABLE IF NOT EXISTS refresh_log ( tenant_id INTEGER REFERENCES organizations(id) ); +CREATE TABLE IF NOT EXISTS refresh_pipeline_steps ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + refresh_log_id INTEGER REFERENCES refresh_log(id) ON DELETE CASCADE, + incident_id INTEGER REFERENCES incidents(id) ON DELETE CASCADE, + step_key TEXT NOT NULL, + pass_number INTEGER DEFAULT 1, + started_at TIMESTAMP, + completed_at TIMESTAMP, + status TEXT DEFAULT 'pending', + count_value INTEGER, + count_secondary INTEGER, + tenant_id INTEGER REFERENCES organizations(id) +); +CREATE INDEX IF NOT EXISTS idx_pipeline_steps_incident ON refresh_pipeline_steps(incident_id, started_at DESC); +CREATE INDEX IF NOT EXISTS idx_pipeline_steps_log ON 
refresh_pipeline_steps(refresh_log_id); + CREATE TABLE IF NOT EXISTS incident_snapshots ( id INTEGER PRIMARY KEY AUTOINCREMENT, incident_id INTEGER REFERENCES incidents(id) ON DELETE CASCADE, @@ -418,6 +434,29 @@ async def init_db(): await db.execute("ALTER TABLE refresh_log ADD COLUMN tenant_id INTEGER REFERENCES organizations(id)") await db.commit() + # Migration: refresh_pipeline_steps-Tabelle (Analysepipeline-Visualisierung) + cursor = await db.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='refresh_pipeline_steps'") + if not await cursor.fetchone(): + await db.executescript(""" + CREATE TABLE refresh_pipeline_steps ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + refresh_log_id INTEGER REFERENCES refresh_log(id) ON DELETE CASCADE, + incident_id INTEGER REFERENCES incidents(id) ON DELETE CASCADE, + step_key TEXT NOT NULL, + pass_number INTEGER DEFAULT 1, + started_at TIMESTAMP, + completed_at TIMESTAMP, + status TEXT DEFAULT 'pending', + count_value INTEGER, + count_secondary INTEGER, + tenant_id INTEGER REFERENCES organizations(id) + ); + CREATE INDEX IF NOT EXISTS idx_pipeline_steps_incident ON refresh_pipeline_steps(incident_id, started_at DESC); + CREATE INDEX IF NOT EXISTS idx_pipeline_steps_log ON refresh_pipeline_steps(refresh_log_id); + """) + await db.commit() + logger.info("Migration: refresh_pipeline_steps-Tabelle erstellt") + # Migration: notifications-Tabelle (fuer bestehende DBs) cursor = await db.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='notifications'") if not await cursor.fetchone(): diff --git a/src/routers/incidents.py b/src/routers/incidents.py index 3cde9f4..f89bbbf 100644 --- a/src/routers/incidents.py +++ b/src/routers/incidents.py @@ -613,6 +613,98 @@ async def get_factchecks( return [dict(row) for row in rows] +@router.get("/{incident_id}/pipeline") +async def get_pipeline( + incident_id: int, + current_user: dict = Depends(get_current_user), + db: aiosqlite.Connection = Depends(db_dependency), 
+): + """Analysepipeline-Status der Lage: Definition aller Schritte + Stand des + letzten (oder gerade laufenden) Refreshs. + + Antwort: + { + "is_research": bool, + "is_running": bool, + "last_refresh": {started_at, completed_at, duration_sec, status, pass_total} | null, + "steps_definition": [{key, label, icon, tooltip}, ...], + "steps": [{step_key, status, count_value, count_secondary, pass_number}, ...] + } + """ + from services.pipeline_tracker import PIPELINE_STEPS + + tenant_id = current_user.get("tenant_id") + incident_row = await _check_incident_access(db, incident_id, current_user["id"], tenant_id) + is_research = (incident_row["type"] or "adhoc") == "research" + + # Juengsten Refresh-Log waehlen: bevorzugt running, sonst der letzte completed + cursor = await db.execute( + """SELECT id, started_at, completed_at, status, retry_count + FROM refresh_log + WHERE incident_id = ? AND status = 'running' + ORDER BY started_at DESC LIMIT 1""", + (incident_id,), + ) + row = await cursor.fetchone() + if not row: + cursor = await db.execute( + """SELECT id, started_at, completed_at, status, retry_count + FROM refresh_log + WHERE incident_id = ? + ORDER BY started_at DESC LIMIT 1""", + (incident_id,), + ) + row = await cursor.fetchone() + + last_refresh = None + steps = [] + is_running = False + if row: + is_running = row["status"] == "running" + # Pipeline-Steps zu diesem Refresh laden + sc = await db.execute( + """SELECT step_key, pass_number, status, count_value, count_secondary, + started_at, completed_at + FROM refresh_pipeline_steps + WHERE refresh_log_id = ? 
+ ORDER BY pass_number ASC, id ASC""", + (row["id"],), + ) + steps = [dict(r) for r in await sc.fetchall()] + + # Pass-Total: bei Research-Lagen mit Multi-Pass-Daten ermitteln + max_pass = 1 + for s in steps: + if s["pass_number"] and s["pass_number"] > max_pass: + max_pass = s["pass_number"] + + # Dauer berechnen (nur wenn completed) + duration_sec = None + try: + if row["started_at"] and row["completed_at"]: + t0 = datetime.strptime(row["started_at"], "%Y-%m-%d %H:%M:%S") + t1 = datetime.strptime(row["completed_at"], "%Y-%m-%d %H:%M:%S") + duration_sec = max(0, int((t1 - t0).total_seconds())) + except Exception: + duration_sec = None + + last_refresh = { + "started_at": row["started_at"], + "completed_at": row["completed_at"], + "status": row["status"], + "duration_sec": duration_sec, + "pass_total": max_pass, + } + + return { + "is_research": is_research, + "is_running": is_running, + "last_refresh": last_refresh, + "steps_definition": PIPELINE_STEPS, + "steps": steps, + } + + @router.get("/{incident_id}/locations") async def get_locations( incident_id: int, diff --git a/src/services/pipeline_tracker.py b/src/services/pipeline_tracker.py new file mode 100644 index 0000000..9ad4507 --- /dev/null +++ b/src/services/pipeline_tracker.py @@ -0,0 +1,230 @@ +"""Analysepipeline-Tracking: persistiert Pipeline-Schritte pro Refresh und sendet +Live-Status an die Frontend-Visualisierung. + +Die Pipeline hat 9 Schritte und ist eine bewusst vereinfachte Außensicht der +internen Refresh-Pipeline (siehe orchestrator.py). Sie verschweigt Internas +(Modellnamen, Tools, Phasen, Multi-Pass-Labels) und beschreibt jeden Schritt in +verständlicher Sprache. +""" +from __future__ import annotations + +import logging +from datetime import datetime +from typing import Optional + +from config import TIMEZONE + +logger = logging.getLogger("osint.pipeline") + + +# Single Source of Truth für die Pipeline-Definition. +# Reihenfolge bestimmt die Anzeige im Frontend. 
+PIPELINE_STEPS = [ + { + "key": "sources_review", + "label": "Quellen sichten", + "icon": "search", + "tooltip": "Wir prüfen alle deine Nachrichtenquellen, ob sie aktuell erreichbar sind und was sie zu deiner Lage melden.", + }, + { + "key": "collect", + "label": "Nachrichten sammeln", + "icon": "rss", + "tooltip": "Aus den passenden Quellen werden alle relevanten Meldungen eingesammelt — aus deinen RSS-Feeds, dem Web und optional Telegram-Kanälen.", + }, + { + "key": "dedup", + "label": "Doppeltes filtern", + "icon": "copy-x", + "tooltip": "Mehrfach gemeldete Nachrichten werden zusammengefasst, damit nichts doppelt im Lagebild auftaucht.", + }, + { + "key": "relevance", + "label": "Relevanz bewerten", + "icon": "scale", + "tooltip": "Jede Meldung wird darauf geprüft, ob sie wirklich zu deiner Lage passt. Themenfremdes wird aussortiert.", + }, + { + "key": "geoparsing", + "label": "Orte erkennen", + "icon": "map-pin", + "tooltip": "Aus den Meldungen werden Ortsangaben erkannt und auf der Karte verortet.", + }, + { + "key": "summary", + "label": "Lagebild verfassen", + "icon": "file-text", + "tooltip": "Aus allen geprüften Meldungen wird ein zusammenhängendes Lagebild geschrieben — mit Quellenangaben am Text.", + }, + { + "key": "factcheck", + "label": "Fakten prüfen", + "icon": "shield", + "tooltip": "Behauptungen aus den Meldungen werden gegeneinander abgeglichen: Bestätigt? Umstritten? 
Noch unklar?", + }, + { + "key": "qc", + "label": "Qualitätscheck", + "icon": "check-circle", + "tooltip": "Eine letzte Kontrollprüfung am Ergebnis: Doppelte Fakten zusammenführen, Karten-Verortung prüfen, bevor du benachrichtigt wirst.", + }, + { + "key": "notify", + "label": "Benachrichtigen", + "icon": "bell", + "tooltip": "Wenn etwas Wichtiges dabei war, gehen Benachrichtigungen raus — im Glockensymbol oben rechts und optional per E-Mail.", + }, +] + +VALID_KEYS = {s["key"] for s in PIPELINE_STEPS} + + +def _now_db() -> str: + """Aktuelle Zeit im DB-Format (lokal).""" + return datetime.now(TIMEZONE).strftime("%Y-%m-%d %H:%M:%S") + + +async def _broadcast(ws_manager, incident_id: int, payload: dict, + visibility: str, created_by: Optional[int], tenant_id: Optional[int]): + """Sendet ein pipeline_step-Event an verbundene Clients der Lage.""" + if not ws_manager: + return + try: + await ws_manager.broadcast_for_incident( + {"type": "pipeline_step", "incident_id": incident_id, "data": payload}, + visibility, created_by, tenant_id, + ) + except Exception as e: + logger.warning(f"Pipeline-WS-Broadcast fehlgeschlagen: {e}") + + +async def start_step(db, ws_manager, *, refresh_log_id: int, incident_id: int, + step_key: str, pass_number: int = 1, tenant_id: Optional[int] = None, + visibility: str = "public", created_by: Optional[int] = None) -> Optional[int]: + """Markiert einen Pipeline-Schritt als aktiv. + + Returns die DB-ID der Step-Zeile (für späteres Update via complete_step), oder None bei Fehler. 
+ """ + if step_key not in VALID_KEYS: + logger.warning(f"Unbekannter Pipeline-Schritt: {step_key}") + return None + + try: + cursor = await db.execute( + """INSERT INTO refresh_pipeline_steps + (refresh_log_id, incident_id, step_key, pass_number, started_at, status, tenant_id) + VALUES (?, ?, ?, ?, ?, 'active', ?)""", + (refresh_log_id, incident_id, step_key, pass_number, _now_db(), tenant_id), + ) + await db.commit() + step_id = cursor.lastrowid + except Exception as e: + logger.warning(f"Pipeline start_step({step_key}) DB-Fehler: {e}") + step_id = None + + await _broadcast(ws_manager, incident_id, { + "step_key": step_key, + "status": "active", + "pass_number": pass_number, + }, visibility, created_by, tenant_id) + + return step_id + + +async def complete_step(db, ws_manager, *, step_id: Optional[int], refresh_log_id: int, + incident_id: int, step_key: str, pass_number: int = 1, + count_value: Optional[int] = None, count_secondary: Optional[int] = None, + tenant_id: Optional[int] = None, visibility: str = "public", + created_by: Optional[int] = None): + """Markiert einen Pipeline-Schritt als abgeschlossen, mit Zahlen.""" + if step_key not in VALID_KEYS: + return + + try: + if step_id: + await db.execute( + """UPDATE refresh_pipeline_steps + SET status = 'done', completed_at = ?, count_value = ?, count_secondary = ? 
+ WHERE id = ?""", + (_now_db(), count_value, count_secondary, step_id), + ) + else: + # Fallback wenn start_step keine ID lieferte + await db.execute( + """INSERT INTO refresh_pipeline_steps + (refresh_log_id, incident_id, step_key, pass_number, started_at, completed_at, + status, count_value, count_secondary, tenant_id) + VALUES (?, ?, ?, ?, ?, ?, 'done', ?, ?, ?)""", + (refresh_log_id, incident_id, step_key, pass_number, _now_db(), _now_db(), + count_value, count_secondary, tenant_id), + ) + await db.commit() + except Exception as e: + logger.warning(f"Pipeline complete_step({step_key}) DB-Fehler: {e}") + + await _broadcast(ws_manager, incident_id, { + "step_key": step_key, + "status": "done", + "pass_number": pass_number, + "count_value": count_value, + "count_secondary": count_secondary, + }, visibility, created_by, tenant_id) + + +async def skip_step(db, ws_manager, *, refresh_log_id: int, incident_id: int, + step_key: str, pass_number: int = 1, tenant_id: Optional[int] = None, + visibility: str = "public", created_by: Optional[int] = None): + """Markiert einen Schritt als übersprungen (z.B. 
Geoparsing ohne neue Artikel).""" + if step_key not in VALID_KEYS: + return + try: + await db.execute( + """INSERT INTO refresh_pipeline_steps + (refresh_log_id, incident_id, step_key, pass_number, started_at, completed_at, + status, tenant_id) + VALUES (?, ?, ?, ?, ?, ?, 'skipped', ?)""", + (refresh_log_id, incident_id, step_key, pass_number, _now_db(), _now_db(), tenant_id), + ) + await db.commit() + except Exception as e: + logger.warning(f"Pipeline skip_step({step_key}) DB-Fehler: {e}") + + await _broadcast(ws_manager, incident_id, { + "step_key": step_key, + "status": "skipped", + "pass_number": pass_number, + }, visibility, created_by, tenant_id) + + +async def error_step(db, ws_manager, *, step_id: Optional[int], refresh_log_id: int, + incident_id: int, step_key: str, pass_number: int = 1, + tenant_id: Optional[int] = None, visibility: str = "public", + created_by: Optional[int] = None): + """Markiert einen Schritt als fehlgeschlagen.""" + if step_key not in VALID_KEYS: + return + try: + if step_id: + await db.execute( + """UPDATE refresh_pipeline_steps + SET status = 'error', completed_at = ? 
+ WHERE id = ?""", + (_now_db(), step_id), + ) + else: + await db.execute( + """INSERT INTO refresh_pipeline_steps + (refresh_log_id, incident_id, step_key, pass_number, started_at, completed_at, + status, tenant_id) + VALUES (?, ?, ?, ?, ?, ?, 'error', ?)""", + (refresh_log_id, incident_id, step_key, pass_number, _now_db(), _now_db(), tenant_id), + ) + await db.commit() + except Exception as e: + logger.warning(f"Pipeline error_step({step_key}) DB-Fehler: {e}") + + await _broadcast(ws_manager, incident_id, { + "step_key": step_key, + "status": "error", + "pass_number": pass_number, + }, visibility, created_by, tenant_id) diff --git a/src/static/css/style.css b/src/static/css/style.css index 09db3f2..e56bfaf 100644 --- a/src/static/css/style.css +++ b/src/static/css/style.css @@ -5638,3 +5638,335 @@ body.tutorial-active .tutorial-cursor { border-color: var(--accent); box-shadow: 0 0 0 2px rgba(var(--accent-rgb, 59, 130, 246), 0.15); } + +/* === Analysepipeline (Visualisierung n8n-Stil) === */ +.pipeline-card { padding: 0; overflow: hidden; } +.pipeline-card .card-header { padding: var(--sp-lg) var(--sp-xl); border-bottom: 1px solid var(--border); } +.pipeline-header-meta { font-size: 12px; color: var(--text-secondary); } +.pipeline-body { + position: relative; + padding: var(--sp-3xl) var(--sp-xl); + background-color: var(--bg-card); + background-image: + linear-gradient(var(--pipeline-circuit, rgba(150, 121, 26, 0.045)) 1px, transparent 1px), + linear-gradient(90deg, var(--pipeline-circuit, rgba(150, 121, 26, 0.045)) 1px, transparent 1px), + radial-gradient(circle at 30px 30px, var(--pipeline-circuit-dot, rgba(150, 121, 26, 0.10)) 1.5px, transparent 2px); + background-size: 60px 60px, 60px 60px, 60px 60px; +} +[data-theme="light"] .pipeline-body { + --pipeline-circuit: rgba(31, 51, 89, 0.05); + --pipeline-circuit-dot: rgba(31, 51, 89, 0.10); +} +.pipeline-stage { + position: relative; + overflow-x: auto; + overflow-y: visible; +} +.pipeline-track { + display: 
flex; + align-items: stretch; + gap: var(--sp-md); + min-width: max-content; + padding: var(--sp-md) 0; +} +.pipeline-empty { + text-align: center; + color: var(--text-secondary); + padding: var(--sp-4xl) var(--sp-xl); + font-style: italic; +} +.pipeline-sidenote { + margin-top: var(--sp-xl); + padding: var(--sp-lg) var(--sp-xl); + border-left: 3px solid var(--accent); + background: var(--tint-accent-faint); + border-radius: 0 var(--radius-lg) var(--radius-lg) 0; + font-size: 13px; + color: var(--text-secondary); + max-width: 720px; +} + +.pipeline-block { + position: relative; + flex: 0 0 168px; + min-height: 132px; + padding: var(--sp-lg) var(--sp-md); + background: var(--bg-elevated); + border: 1px solid var(--border); + border-radius: var(--radius-lg); + display: flex; + flex-direction: column; + align-items: center; + justify-content: flex-start; + text-align: center; + cursor: pointer; + transition: transform 0.2s ease, box-shadow 0.2s ease, border-color 0.2s ease; + outline: none; +} +.pipeline-block:hover { transform: translateY(-2px); border-color: var(--accent); } +.pipeline-block:focus-visible { box-shadow: 0 0 0 3px var(--tint-accent-strong); } +.pipeline-block-icon { + width: 36px; + height: 36px; + color: var(--text-secondary); + margin-bottom: var(--sp-sm); + transition: color 0.3s ease; +} +.pipeline-block-icon svg { width: 100%; height: 100%; } +.pipeline-block-title { + font-size: 13px; + font-weight: 600; + color: var(--text-primary); + margin-bottom: var(--sp-xs); + line-height: 1.2; +} +.pipeline-block-count { + font-size: 11px; + color: var(--text-secondary); + line-height: 1.3; +} +.pipeline-block-count small { display: block; opacity: 0.75; font-size: 10px; } +.pipeline-block-count .count-status { font-style: italic; opacity: 0.7; } +.pipeline-block-check { + position: absolute; + top: 6px; + right: 6px; + width: 18px; + height: 18px; + color: var(--success); + opacity: 0; + transform: scale(0.6); + transition: opacity 0.3s ease, transform 
0.3s ease; +} +.pipeline-block-check svg { width: 100%; height: 100%; } + +.pipeline-block.status-pending { opacity: 0.55; } +.pipeline-block.status-pending .pipeline-block-icon { color: var(--text-tertiary); } + +.pipeline-block.status-active { + border-color: var(--accent); + box-shadow: var(--glow-accent-strong); + animation: pipelinePulse 1.6s ease-in-out infinite; +} +.pipeline-block.status-active .pipeline-block-icon { color: var(--accent); } +@keyframes pipelinePulse { + 0%, 100% { box-shadow: 0 0 8px rgba(150, 121, 26, 0.35), 0 0 0 1px var(--accent); } + 50% { box-shadow: 0 0 22px rgba(150, 121, 26, 0.65), 0 0 0 2px var(--accent); } +} + +.pipeline-block.status-done { + border-color: var(--success); + background: linear-gradient(180deg, var(--bg-elevated) 0%, var(--tint-success) 100%); +} +.pipeline-block.status-done .pipeline-block-icon { color: var(--success); } +.pipeline-block.status-done .pipeline-block-check { opacity: 1; transform: scale(1); } + +.pipeline-block.status-error { + border-color: var(--error); + background: linear-gradient(180deg, var(--bg-elevated) 0%, var(--tint-error) 100%); +} +.pipeline-block.status-error .pipeline-block-icon { color: var(--error); } + +.pipeline-arrow { + flex: 0 0 28px; + align-self: center; + height: 2px; + position: relative; + background: var(--border); +} +.pipeline-arrow::after { + content: ""; + position: absolute; + right: -4px; + top: 50%; + width: 0; + height: 0; + border-top: 4px solid transparent; + border-bottom: 4px solid transparent; + border-left: 6px solid var(--border); + transform: translateY(-50%); +} +.pipeline-arrow.is-flowing { + background: linear-gradient(90deg, var(--accent), var(--accent) 50%, transparent 50%, transparent); + background-size: 12px 100%; + animation: pipelineFlow 0.8s linear infinite; +} +.pipeline-arrow.is-flowing::after { border-left-color: var(--accent); } +@keyframes pipelineFlow { + from { background-position: 0 0; } + to { background-position: 12px 0; } +} + 
+.pipeline-loop { + position: absolute; + bottom: -10px; + right: -10px; + width: 26px; + height: 26px; + color: var(--accent); + background: var(--bg-card); + border-radius: 50%; + padding: 4px; + border: 1px solid var(--border); + opacity: 0.5; + transition: opacity 0.3s ease; +} +.pipeline-loop svg { width: 100%; height: 100%; } +.pipeline-stage.is-looping .pipeline-loop { + opacity: 1; + animation: pipelineLoop 1.2s ease-in-out; +} +@keyframes pipelineLoop { + 0% { transform: rotate(0deg) scale(1); } + 50% { transform: rotate(180deg) scale(1.3); } + 100% { transform: rotate(360deg) scale(1); } +} + +.pipeline-tooltip { + position: fixed; + background: var(--bg-card); + color: var(--text-primary); + border: 1px solid var(--accent); + padding: var(--sp-md) var(--sp-lg); + border-radius: var(--radius); + font-size: 12px; + line-height: 1.4; + width: 280px; + box-shadow: var(--shadow-md); + pointer-events: none; + opacity: 0; + transition: opacity 0.15s ease; + z-index: 9999; +} +.pipeline-tooltip.visible { opacity: 1; } + +.pipeline-popup { + position: fixed; + inset: 0; + background: var(--backdrop); + display: flex; + align-items: center; + justify-content: center; + z-index: 9998; +} +.pipeline-popup-inner { + background: var(--bg-card); + border: 1px solid var(--accent); + border-radius: var(--radius-lg); + padding: var(--sp-3xl); + max-width: 480px; + width: 90%; + box-shadow: var(--shadow-lg); + position: relative; +} +.pipeline-popup-title { + font-family: var(--font-title); + font-size: 18px; + font-weight: 600; + color: var(--text-primary); + margin-bottom: var(--sp-lg); +} +.pipeline-popup-text { color: var(--text-secondary); line-height: 1.6; font-size: 14px; } +.pipeline-popup-close { + position: absolute; + top: 8px; + right: 8px; + width: 30px; + height: 30px; + border: none; + background: transparent; + color: var(--text-secondary); + font-size: 22px; + cursor: pointer; + border-radius: var(--radius); +} +.pipeline-popup-close:hover { background: 
var(--bg-hover); color: var(--text-primary); } + +.pipeline-mini { + display: flex; + align-items: center; + justify-content: center; + flex-wrap: wrap; + gap: var(--sp-xs); + padding: var(--sp-md) 0; + margin-bottom: var(--sp-md); +} +.pipeline-mini-block { + width: 28px; + height: 28px; + padding: 5px; + border: 1px solid var(--border); + border-radius: 50%; + color: var(--text-tertiary); + display: inline-flex; + align-items: center; + justify-content: center; + transition: all 0.3s ease; +} +.pipeline-mini-block svg { width: 100%; height: 100%; } +.pipeline-mini-block.status-pending { opacity: 0.4; } +.pipeline-mini-block.status-active { + color: var(--accent); + border-color: var(--accent); + box-shadow: var(--glow-accent); + animation: pipelinePulse 1.6s ease-in-out infinite; +} +.pipeline-mini-block.status-done { + color: var(--success); + border-color: var(--success); + background: var(--tint-success); +} +.pipeline-mini-block.status-error { + color: var(--error); + border-color: var(--error); + background: var(--tint-error); +} +.pipeline-mini-sep { + width: 12px; + height: 1px; + background: var(--border); +} + +@media (max-width: 900px) { + .pipeline-track { flex-direction: column; min-width: auto; align-items: stretch; } + .pipeline-block { flex: 0 0 auto; width: 100%; min-height: auto; flex-direction: row; padding: var(--sp-md); text-align: left; gap: var(--sp-md); } + .pipeline-block-icon { width: 28px; height: 28px; margin-bottom: 0; flex-shrink: 0; } + .pipeline-block-title { margin-bottom: 2px; } + .pipeline-block-count { font-size: 11px; } + .pipeline-arrow { + flex: 0 0 18px; + width: 2px; + height: 18px; + margin: 0 auto; + align-self: center; + background: var(--border); + } + .pipeline-arrow::after { + right: 50%; + top: auto; + bottom: -4px; + border-top: 6px solid var(--border); + border-bottom: none; + border-left: 4px solid transparent; + border-right: 4px solid transparent; + transform: translateX(50%); + } + .pipeline-arrow.is-flowing { 
+ background: linear-gradient(180deg, var(--accent), var(--accent) 50%, transparent 50%, transparent); + background-size: 100% 12px; + animation: pipelineFlowVertical 0.8s linear infinite; + } + .pipeline-arrow.is-flowing::after { border-top-color: var(--accent); } + @keyframes pipelineFlowVertical { + from { background-position: 0 0; } + to { background-position: 0 12px; } + } +} + +@media (prefers-reduced-motion: reduce) { + .pipeline-block, + .pipeline-mini-block { animation: none !important; } + .pipeline-arrow.is-flowing { animation: none !important; } + .pipeline-block.status-active { box-shadow: var(--glow-accent); } + .pipeline-stage.is-looping .pipeline-loop { animation: none !important; opacity: 1; } +} diff --git a/src/static/dashboard.html b/src/static/dashboard.html index 0ad8ebb..221ea94 100644 --- a/src/static/dashboard.html +++ b/src/static/dashboard.html @@ -198,6 +198,7 @@ + @@ -281,6 +282,23 @@ +