Fortschrittsanzeige beim Health-Check (SSE-Streaming)

- Neuer Endpoint /health/run-stream mit Server-Sent Events
- Frontend zeigt Fortschrittsbalken: 4/12 + Quellenname + Prozent
- Status-Icons pro Quelle (Fehler/Warnung/OK)
- Phase Vorschläge wird separat angezeigt
- Ergebnis-Zusammenfassung verschwindet nach 5 Sekunden

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Dieser Commit ist enthalten in:
claude-dev
2026-03-08 17:35:37 +01:00
Ursprung eaceb5ef6b
Commit d46eac4733
2 geänderte Dateien mit 255 neuen und 35 gelöschten Zeilen

Datei anzeigen

@@ -1,12 +1,13 @@
import os
"""Grundquellen-Verwaltung und Kundenquellen-Übersicht."""
"""Grundquellen-Verwaltung und Kundenquellen-Übersicht."""
import sys
import logging
# Monitor-Source-Rules verfügbar machen
# Monitor-Source-Rules verfügbar machen
sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src")
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from typing import Optional
from auth import get_current_admin
@@ -297,7 +298,7 @@ async def add_discovered_sources(
existing_urls.add(feed["url"])
added += 1
# Web-Source für die Domain anlegen wenn noch nicht vorhanden
# Web-Source für die Domain anlegen wenn noch nicht vorhanden
if feeds and feeds[0].get("domain"):
domain = feeds[0]["domain"]
cursor = await db.execute(
@@ -318,7 +319,7 @@ async def add_discovered_sources(
# --- Health-Check & Vorschläge ---
# --- Health-Check & Vorschläge ---
@router.get("/health")
async def get_health(
@@ -326,7 +327,7 @@ async def get_health(
db: aiosqlite.Connection = Depends(db_dependency),
):
"""Health-Check-Ergebnisse abrufen."""
# Prüfen ob Tabelle existiert
# Prüfen ob Tabelle existiert
cursor = await db.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='source_health_checks'"
)
@@ -368,7 +369,7 @@ async def get_suggestions(
admin: dict = Depends(get_current_admin),
db: aiosqlite.Connection = Depends(db_dependency),
):
"""Alle Vorschläge abrufen (pending zuerst, dann letzte 20 bearbeitete)."""
"""Alle Vorschläge abrufen (pending zuerst, dann letzte 20 bearbeitete)."""
cursor = await db.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='source_suggestions'"
)
@@ -431,7 +432,7 @@ async def update_suggestion(
"SELECT id FROM sources WHERE url = ? AND tenant_id IS NULL", (url,)
)
if await cursor.fetchone():
result_action = "übersprungen (URL bereits vorhanden)"
result_action = "übersprungen (URL bereits vorhanden)"
new_status = "rejected"
else:
await db.execute(
@@ -441,7 +442,7 @@ async def update_suggestion(
)
result_action = f"Quelle '{name}' angelegt"
else:
result_action = "übersprungen (keine URL)"
result_action = "übersprungen (keine URL)"
new_status = "rejected"
elif stype == "deactivate_source":
@@ -454,7 +455,7 @@ async def update_suggestion(
source_id = suggestion["source_id"]
if source_id:
await db.execute("DELETE FROM sources WHERE id = ?", (source_id,))
result_action = "Quelle gelöscht"
result_action = "Quelle gelöscht"
elif stype == "fix_url":
source_id = suggestion["source_id"]
@@ -464,7 +465,7 @@ async def update_suggestion(
result_action = f"URL aktualisiert"
# Auto-Reject: Wenn fix_url oder add_source akzeptiert wird,
# zugehörige deactivate_source-Vorschläge automatisch ablehnen
# zugehörige deactivate_source-Vorschläge automatisch ablehnen
if stype in ("fix_url", "add_source") and suggestion.get("source_id"):
await db.execute(
"UPDATE source_suggestions SET status = 'rejected', reviewed_at = CURRENT_TIMESTAMP "
@@ -528,13 +529,175 @@ async def run_health_check_now(
@router.post("/health/run-stream")
async def run_health_check_stream(
admin: dict = Depends(get_current_admin),
db: aiosqlite.Connection = Depends(db_dependency),
):
"""Health-Check mit Fortschrittsanzeige (SSE-Stream)."""
import json as _json
import asyncio
from urllib.parse import urlparse
# Tabellen sicherstellen
await db.executescript("""
CREATE TABLE IF NOT EXISTS source_health_checks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source_id INTEGER NOT NULL REFERENCES sources(id) ON DELETE CASCADE,
check_type TEXT NOT NULL, status TEXT NOT NULL,
message TEXT, details TEXT,
checked_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS source_suggestions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
suggestion_type TEXT NOT NULL, title TEXT NOT NULL,
description TEXT, source_id INTEGER REFERENCES sources(id) ON DELETE SET NULL,
suggested_data TEXT, priority TEXT DEFAULT 'medium',
status TEXT DEFAULT 'pending', reviewed_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
""")
await db.commit()
# Quellen laden
cursor = await db.execute(
"SELECT id, name, url, domain, source_type, article_count, last_seen_at "
"FROM sources WHERE status = 'active' AND tenant_id IS NULL"
)
sources = [dict(row) for row in await cursor.fetchall()]
sources_with_url = [s for s in sources if s["url"]]
total = len(sources_with_url)
async def generate():
import httpx
import feedparser
# Phase 1: Erreichbarkeit
yield f"data: {_json.dumps({'phase': 'check', 'checked': 0, 'total': total, 'current': ''})}\n\n"
await db.execute("DELETE FROM source_health_checks")
await db.commit()
issues_found = 0
checked = 0
async with httpx.AsyncClient(
timeout=15.0, follow_redirects=True,
headers={"User-Agent": "Mozilla/5.0 (compatible; OSINT-Monitor/1.0)"},
) as client:
for source in sources_with_url:
try:
checks = []
try:
resp = await client.get(source["url"])
if resp.status_code >= 400:
checks.append({"type": "reachability", "status": "error",
"message": f"HTTP {resp.status_code} - nicht erreichbar"})
else:
checks.append({"type": "reachability", "status": "ok", "message": "Erreichbar"})
if source["source_type"] == "rss_feed":
text = resp.text[:20000]
if "<rss" not in text and "<feed" not in text and "<channel" not in text:
checks.append({"type": "feed_validity", "status": "error",
"message": "Kein RSS/Atom-Feed"})
else:
feed = await asyncio.to_thread(feedparser.parse, text)
if feed.get("bozo") and not feed.entries:
checks.append({"type": "feed_validity", "status": "error",
"message": "Feed fehlerhaft"})
elif not feed.entries:
checks.append({"type": "feed_validity", "status": "warning",
"message": "Feed leer"})
else:
checks.append({"type": "feed_validity", "status": "ok",
"message": f"Feed OK ({len(feed.entries)} Eintr.)"})
except httpx.TimeoutException:
checks.append({"type": "reachability", "status": "error", "message": "Timeout (15s)"})
except httpx.ConnectError as e:
checks.append({"type": "reachability", "status": "error", "message": f"Verbindung fehlgeschlagen"})
except Exception as e:
checks.append({"type": "reachability", "status": "error", "message": f"{type(e).__name__}"})
for c in checks:
await db.execute(
"INSERT INTO source_health_checks (source_id, check_type, status, message) VALUES (?, ?, ?, ?)",
(source["id"], c["type"], c["status"], c["message"]))
if c["status"] != "ok":
issues_found += 1
except Exception:
pass
checked += 1
status_icon = "ok"
if any(c["status"] == "error" for c in checks):
status_icon = "error"
elif any(c["status"] == "warning" for c in checks):
status_icon = "warning"
yield f"data: {_json.dumps({'phase': 'check', 'checked': checked, 'total': total, 'current': source['name'], 'status': status_icon})}\n\n"
# Stale + Duplikate (schnell, kein Fortschritt noetig)
for source in sources:
if source["source_type"] == "excluded":
continue
article_count = source.get("article_count") or 0
if article_count == 0:
await db.execute(
"INSERT INTO source_health_checks (source_id, check_type, status, message) VALUES (?, 'stale', 'warning', 'Noch nie Artikel geliefert')",
(source["id"],))
issues_found += 1
elif source.get("last_seen_at"):
try:
from datetime import datetime
last_dt = datetime.fromisoformat(source["last_seen_at"])
age = (datetime.now() - last_dt).days
if age > 30:
await db.execute(
"INSERT INTO source_health_checks (source_id, check_type, status, message) VALUES (?, 'stale', 'warning', ?)",
(source["id"], f"Letzter Artikel vor {age} Tagen"))
issues_found += 1
except (ValueError, TypeError):
pass
# Duplikate
url_map = {}
for s in sources:
if not s["url"]:
continue
url_norm = s["url"].lower().rstrip("/")
if url_norm in url_map:
existing = url_map[url_norm]
await db.execute(
"INSERT INTO source_health_checks (source_id, check_type, status, message) VALUES (?, 'duplicate', 'warning', ?)",
(s["id"], f"Doppelte URL wie '{existing['name']}' (ID {existing['id']})"))
issues_found += 1
else:
url_map[url_norm] = s
await db.commit()
# Phase 2: Vorschlaege
yield f"data: {_json.dumps({'phase': 'suggestions', 'checked': checked, 'total': total})}\n\n"
sys.path.insert(0, "/home/claude-dev/AegisSight-Monitor/src")
from services.source_suggester import generate_suggestions
suggestion_count = await generate_suggestions(db)
# Fertig
yield f"data: {_json.dumps({'phase': 'done', 'checked': checked, 'total': total, 'issues': issues_found, 'suggestions': suggestion_count})}\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
@router.post("/health/search-fix/{source_id}")
async def search_fix_for_source(
source_id: int,
admin: dict = Depends(get_current_admin),
db: aiosqlite.Connection = Depends(db_dependency),
):
"""Sonnet mit WebSearch nach Lösung für eine kaputte Quelle suchen lassen."""
"""Sonnet mit WebSearch nach Lösung für eine kaputte Quelle suchen lassen."""
import json as _json
cursor = await db.execute(
@@ -547,7 +710,7 @@ async def search_fix_for_source(
source = dict(source)
# Health-Check-Probleme für diese Quelle laden
# Health-Check-Probleme für diese Quelle laden
cursor = await db.execute(
"SELECT check_type, status, message FROM source_health_checks WHERE source_id = ?",
(source_id,),
@@ -566,14 +729,14 @@ Kategorie: {source['category']}
Probleme:
{issues_text}
Aufgabe: Suche im Internet nach funktionierenden Alternativen für diese Quelle.
- Finde konkrete RSS-Feed-URLs die tatsächlich funktionieren
- Prüfe ob es alternative Zugangswege gibt (andere Subdomains, Feed-Aggregatoren, alternative URLs)
- Gibt es eine Lösung oder ist die Quelle nur noch per WebSearch erreichbar?
Aufgabe: Suche im Internet nach funktionierenden Alternativen für diese Quelle.
- Finde konkrete RSS-Feed-URLs die tatsächlich funktionieren
- Prüfe ob es alternative Zugangswege gibt (andere Subdomains, Feed-Aggregatoren, alternative URLs)
- Gibt es eine Lösung oder ist die Quelle nur noch per WebSearch erreichbar?
Regeln:
- Maximal 3 Lösungen vorschlagen (die besten)
- Verwende echte deutsche Umlaute (ü, ä, ö, ß), keine Umschreibungen (ue, ae, oe, ss)
- Maximal 3 Lösungen vorschlagen (die besten)
- Verwende echte deutsche Umlaute (ü, ä, ö, ß), keine Umschreibungen (ue, ae, oe, ss)
Antworte NUR mit einem JSON-Objekt:
{{
@@ -583,10 +746,10 @@ Antworte NUR mit einem JSON-Objekt:
"type": "replace_url|add_feed|deactivate",
"name": "Anzeigename",
"url": "https://...",
"description": "Kurze Begründung"
"description": "Kurze Begründung"
}}
],
"summary": "Zusammenfassung in 1-2 Sätzen"
"summary": "Zusammenfassung in 1-2 Sätzen"
}}
Nur das JSON, kein anderer Text."""
@@ -604,7 +767,7 @@ Nur das JSON, kein anderer Text."""
else:
result = {"fixable": False, "solutions": [], "summary": response[:500]}
# Lösungen als Vorschläge speichern
# Lösungen als Vorschläge speichern
await db.executescript("""
CREATE TABLE IF NOT EXISTS source_suggestions (
id INTEGER PRIMARY KEY AUTOINCREMENT,

Datei anzeigen

@@ -220,26 +220,83 @@ async function handleSuggestion(id, accept) {
// --- Health-Check manuell starten ---
async function runHealthCheck() {
const btn = document.getElementById("runHealthCheckBtn");
if (btn) {
btn.disabled = true;
btn.textContent = "Läuft...";
if (!btn) return;
btn.disabled = true;
// Fortschrittsanzeige erstellen
let progressEl = document.getElementById("healthProgress");
if (!progressEl) {
progressEl = document.createElement("div");
progressEl.id = "healthProgress";
progressEl.style.cssText = "display:flex;align-items:center;gap:12px;padding:12px 16px;background:var(--bg-secondary);border:1px solid var(--border);border-radius:var(--radius);margin-bottom:16px;font-size:13px;";
btn.parentElement.after(progressEl);
}
progressEl.style.display = "flex";
function updateProgress(data) {
if (data.phase === "check") {
const pct = data.total > 0 ? Math.round((data.checked / data.total) * 100) : 0;
const statusIcon = data.status === "error" ? "\u2717" : data.status === "warning" ? "\u26A0" : "\u2713";
btn.textContent = data.checked + "/" + data.total;
progressEl.innerHTML =
'<div style="flex:1;">' +
'<div style="display:flex;justify-content:space-between;margin-bottom:4px;">' +
'<span>' + (data.current ? statusIcon + " " + esc(data.current) : "Starte...") + '</span>' +
'<span class="text-secondary">' + pct + '%</span>' +
'</div>' +
'<div style="height:4px;background:var(--bg-tertiary);border-radius:2px;overflow:hidden;">' +
'<div style="height:100%;width:' + pct + '%;background:var(--accent);transition:width 0.3s;"></div>' +
'</div>' +
'</div>';
} else if (data.phase === "suggestions") {
progressEl.innerHTML = '<span class="text-secondary">Generiere Vorschl\u00e4ge...</span>';
} else if (data.phase === "done") {
progressEl.innerHTML = '<span class="text-success">' + data.checked + ' gepr\u00fcft, ' + data.issues + ' Probleme, ' + data.suggestions + ' Vorschl\u00e4ge</span>';
setTimeout(function() { progressEl.style.display = "none"; }, 5000);
}
}
try {
const result = await API.post("/api/sources/health/run");
alert(
`Health-Check abgeschlossen: ${result.checked} Quellen geprüft, ` +
`${result.issues} Probleme gefunden. ` +
`${result.suggestions} neue Vorschläge generiert.`,
);
const headers = { "Content-Type": "application/json" };
if (API.token) headers["Authorization"] = "Bearer " + API.token;
const response = await fetch("/api/sources/health/run-stream", {
method: "POST",
headers: headers,
});
if (!response.ok) {
throw new Error("HTTP " + response.status);
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() || "";
for (const line of lines) {
if (line.startsWith("data: ")) {
try {
const data = JSON.parse(line.slice(6));
updateProgress(data);
} catch (_) {}
}
}
}
loadHealthData();
} catch (err) {
alert("Fehler: " + err.message);
progressEl.innerHTML = '<span class="text-danger">Fehler: ' + esc(err.message) + '</span>';
} finally {
if (btn) {
btn.disabled = false;
btn.textContent = "Jetzt prüfen";
}
btn.disabled = false;
btn.textContent = "Jetzt pr\u00fcfen";
}
}