Phase 2 Health-Check tenant-fähig + Historie

- migrations/2026-05-09d_source_health_history.py NEU: source_health_history-Tabelle
  (Append-only Verlauf der Health-Check-Runs mit run_id und archived_at)
- shared/services/source_health.py:
  - tenant_id IS NULL Filter raus -> auch Tenant-Quellen werden gecheckt
  - Mojibake (Triple-Encoded UTF-8) via ftfy gefixt
  - DELETE FROM source_health_checks: vorher Stand mit run_id (uuid4) in
    source_health_history archivieren -> kein Datenverlust mehr
  - User-Agent + Timeout aus config.HEALTH_CHECK_* statt hardcoded
- routers/sources.py /health/run-stream: gleiche Änderungen wie oben
- config.py: HEALTH_CHECK_USER_AGENT + HEALTH_CHECK_TIMEOUT_S ergänzt
Dieser Commit ist enthalten in:
claude-dev
2026-05-09 02:56:49 +00:00
Ursprung 650f8b0342
Commit ca4422ccd1
4 geänderte Dateien mit 376 neuen und 285 gelöschten Zeilen

Datei anzeigen

@@ -0,0 +1,57 @@
"""Migration 2026-05-09d: source_health_history (Verlauf der Health-Checks).
Bislang wurde vor jedem Health-Check-Run die Tabelle source_health_checks geleert
(DELETE FROM source_health_checks). Damit ging die Historie verloren - kein
Trend, keine Vergleichsmöglichkeit über Runs.
Diese Migration legt eine reine Append-Tabelle source_health_history an.
Vor jedem Health-Check-Run wird der aktuelle Stand von source_health_checks
hier archiviert (mit run_id und archived_at).
Ausführung:
DB_PATH=/home/claude-dev/osint-data/osint.db python3 migrations/2026-05-09d_source_health_history.py
DB_PATH=/home/claude-dev/AegisSight-Monitor-staging/data/osint.db python3 migrations/2026-05-09d_source_health_history.py
"""
import os
import sqlite3
import sys
def main(db_path: str) -> int:
if not os.path.exists(db_path):
print(f"FEHLER: DB nicht gefunden: {db_path}", file=sys.stderr)
return 1
conn = sqlite3.connect(db_path, timeout=60)
conn.execute("PRAGMA busy_timeout = 60000")
conn.execute("PRAGMA journal_mode = WAL")
print(f"Migration auf {db_path}")
conn.executescript("""
CREATE TABLE IF NOT EXISTS source_health_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_id TEXT NOT NULL,
source_id INTEGER NOT NULL,
check_type TEXT NOT NULL,
status TEXT NOT NULL,
message TEXT,
details TEXT,
checked_at TIMESTAMP,
archived_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_source_health_history_run ON source_health_history(run_id);
CREATE INDEX IF NOT EXISTS idx_source_health_history_source ON source_health_history(source_id, archived_at DESC);
CREATE INDEX IF NOT EXISTS idx_source_health_history_status ON source_health_history(status, archived_at DESC);
""")
print(" + source_health_history + Indizes (idempotent)")
conn.commit()
conn.close()
print("Migration abgeschlossen.")
return 0
if __name__ == "__main__":
db_path = os.environ.get("DB_PATH", "/home/claude-dev/osint-data/osint.db")
sys.exit(main(db_path))

Datei anzeigen

@@ -48,3 +48,10 @@ MAX_FEEDS_PER_DOMAIN = 3
CLAUDE_MODEL_FAST = "claude-haiku-4-5-20251001" CLAUDE_MODEL_FAST = "claude-haiku-4-5-20251001"
CLAUDE_MODEL_MEDIUM = "claude-sonnet-4-6" CLAUDE_MODEL_MEDIUM = "claude-sonnet-4-6"
CLAUDE_MODEL_STANDARD = "claude-opus-4-7" CLAUDE_MODEL_STANDARD = "claude-opus-4-7"
# Health-Check (genutzt von shared/services/source_health.py + routers/sources.py)
HEALTH_CHECK_USER_AGENT = os.environ.get(
"HEALTH_CHECK_USER_AGENT",
"Mozilla/5.0 (compatible; AegisSight-HealthCheck/1.0)",
)
HEALTH_CHECK_TIMEOUT_S = float(os.environ.get("HEALTH_CHECK_TIMEOUT_S", "15.0"))

Datei anzeigen

@@ -1,5 +1,6 @@
"""Grundquellen-Verwaltung und Kundenquellen-Übersicht.""" """Grundquellen-Verwaltung und Kundenquellen-Übersicht."""
import logging import logging
import uuid
from fastapi import APIRouter, Depends, HTTPException, status, Request from fastapi import APIRouter, Depends, HTTPException, status, Request
from fastapi.responses import StreamingResponse from fastapi.responses import StreamingResponse
@@ -10,6 +11,7 @@ import aiosqlite
from auth import get_current_admin from auth import get_current_admin
from database import db_dependency from database import db_dependency
from audit import log_action, get_client_ip, row_to_dict from audit import log_action, get_client_ip, row_to_dict
from config import HEALTH_CHECK_USER_AGENT, HEALTH_CHECK_TIMEOUT_S
from shared.source_rules import ( from shared.source_rules import (
discover_source, discover_source,
discover_all_feeds, discover_all_feeds,
@@ -564,7 +566,7 @@ async def run_health_check_stream(
# Quellen laden # Quellen laden
cursor = await db.execute( cursor = await db.execute(
"SELECT id, name, url, domain, source_type, article_count, last_seen_at " "SELECT id, name, url, domain, source_type, article_count, last_seen_at "
"FROM sources WHERE status = 'active' AND tenant_id IS NULL" "FROM sources WHERE status = 'active'" # tenant + global
) )
sources = [dict(row) for row in await cursor.fetchall()] sources = [dict(row) for row in await cursor.fetchall()]
sources_with_url = [s for s in sources if s["url"]] sources_with_url = [s for s in sources if s["url"]]
@@ -577,6 +579,15 @@ async def run_health_check_stream(
# Phase 1: Erreichbarkeit # Phase 1: Erreichbarkeit
yield f"data: {_json.dumps({'phase': 'check', 'checked': 0, 'total': total, 'current': ''})}\n\n" yield f"data: {_json.dumps({'phase': 'check', 'checked': 0, 'total': total, 'current': ''})}\n\n"
# Bisherigen Stand archivieren, dann frisch
run_id = uuid.uuid4().hex[:12]
await db.execute(
"INSERT INTO source_health_history "
"(run_id, source_id, check_type, status, message, details, checked_at) "
"SELECT ?, source_id, check_type, status, message, details, checked_at "
"FROM source_health_checks",
(run_id,),
)
await db.execute("DELETE FROM source_health_checks") await db.execute("DELETE FROM source_health_checks")
await db.commit() await db.commit()
@@ -584,8 +595,8 @@ async def run_health_check_stream(
checked = 0 checked = 0
async with httpx.AsyncClient( async with httpx.AsyncClient(
timeout=15.0, follow_redirects=True, timeout=HEALTH_CHECK_TIMEOUT_S, follow_redirects=True,
headers={"User-Agent": "Mozilla/5.0 (compatible; OSINT-Monitor/1.0)"}, headers={"User-Agent": HEALTH_CHECK_USER_AGENT},
) as client: ) as client:
for source in sources_with_url: for source in sources_with_url:
try: try:

Datei anzeigen

@@ -1,282 +1,298 @@
"""Quellen-Health-Check Engine - prüft Erreichbarkeit, Feed-Validität, Duplikate.""" """Quellen-Health-Check Engine - prüft Erreichbarkeit, Feed-Validität, Duplikate."""
import asyncio import asyncio
import logging import logging
import json import json
from urllib.parse import urlparse import uuid
from urllib.parse import urlparse
import httpx
import feedparser import httpx
import aiosqlite import feedparser
import aiosqlite
logger = logging.getLogger("osint.source_health")
try:
from config import HEALTH_CHECK_USER_AGENT, HEALTH_CHECK_TIMEOUT_S
async def run_health_checks(db: aiosqlite.Connection) -> dict: except ImportError:
"""Führt alle Health-Checks für aktive Grundquellen durch.""" HEALTH_CHECK_USER_AGENT = "Mozilla/5.0 (compatible; AegisSight-HealthCheck/1.0)"
logger.info("Starte Quellen-Health-Check...") HEALTH_CHECK_TIMEOUT_S = 15.0
# Alle aktiven Grundquellen laden logger = logging.getLogger("osint.source_health")
cursor = await db.execute(
"SELECT id, name, url, domain, source_type, article_count, last_seen_at "
"FROM sources WHERE status = 'active' AND tenant_id IS NULL" async def run_health_checks(db: aiosqlite.Connection) -> dict:
) """Führt Health-Checks für alle aktiven Quellen durch (global + Tenant)."""
sources = [dict(row) for row in await cursor.fetchall()] logger.info("Starte Quellen-Health-Check...")
# Aktuelle Health-Check-Ergebnisse löschen (werden neu geschrieben) # Alle aktiven Quellen laden (global UND Tenant-spezifisch)
await db.execute("DELETE FROM source_health_checks") cursor = await db.execute(
await db.commit() "SELECT id, name, url, domain, source_type, article_count, last_seen_at "
"FROM sources WHERE status = 'active' "
checks_done = 0 )
issues_found = 0 sources = [dict(row) for row in await cursor.fetchall()]
# 1. Erreichbarkeit + Feed-Validität (nur Quellen mit URL) # Bisherigen Stand in History archivieren, dann frisch starten
sources_with_url = [s for s in sources if s["url"]] run_id = uuid.uuid4().hex[:12]
await db.execute(
async with httpx.AsyncClient( "INSERT INTO source_health_history "
timeout=15.0, "(run_id, source_id, check_type, status, message, details, checked_at) "
follow_redirects=True, "SELECT ?, source_id, check_type, status, message, details, checked_at "
headers={"User-Agent": "Mozilla/5.0 (compatible; OSINT-Monitor/1.0)"}, "FROM source_health_checks",
) as client: (run_id,),
for i in range(0, len(sources_with_url), 5): )
batch = sources_with_url[i:i + 5] await db.execute("DELETE FROM source_health_checks")
tasks = [_check_source_reachability(client, s) for s in batch] await db.commit()
results = await asyncio.gather(*tasks, return_exceptions=True) logger.info(f"Health-Check Run {run_id}: vorigen Stand archiviert")
for source, result in zip(batch, results): checks_done = 0
if isinstance(result, Exception): issues_found = 0
await _save_check(
db, source["id"], "reachability", "error", # 1. Erreichbarkeit + Feed-Validität (nur Quellen mit URL)
f"Prüfung fehlgeschlagen: {result}", sources_with_url = [s for s in sources if s["url"]]
)
issues_found += 1 async with httpx.AsyncClient(
else: timeout=HEALTH_CHECK_TIMEOUT_S,
for check in result: follow_redirects=True,
await _save_check( headers={"User-Agent": HEALTH_CHECK_USER_AGENT},
db, source["id"], check["type"], check["status"], ) as client:
check["message"], check.get("details"), for i in range(0, len(sources_with_url), 5):
) batch = sources_with_url[i:i + 5]
if check["status"] != "ok": tasks = [_check_source_reachability(client, s) for s in batch]
issues_found += 1 results = await asyncio.gather(*tasks, return_exceptions=True)
checks_done += 1
for source, result in zip(batch, results):
# 2. Veraltete Quellen (kein Artikel seit >30 Tagen) if isinstance(result, Exception):
for source in sources: await _save_check(
if source["source_type"] in ("excluded", "web_source"): db, source["id"], "reachability", "error",
continue f"Prüfung fehlgeschlagen: {result}",
stale_check = _check_stale(source) )
if stale_check: issues_found += 1
await _save_check( else:
db, source["id"], stale_check["type"], for check in result:
stale_check["status"], stale_check["message"], await _save_check(
) db, source["id"], check["type"], check["status"],
if stale_check["status"] != "ok": check["message"], check.get("details"),
issues_found += 1 )
if check["status"] != "ok":
# 3. Duplikate erkennen issues_found += 1
duplicates = _find_duplicates(sources) checks_done += 1
for dup in duplicates:
await _save_check( # 2. Veraltete Quellen (kein Artikel seit >30 Tagen)
db, dup["source_id"], "duplicate", "warning", for source in sources:
dup["message"], json.dumps(dup.get("details", {})), if source["source_type"] in ("excluded", "web_source"):
) continue
issues_found += 1 stale_check = _check_stale(source)
if stale_check:
await db.commit() await _save_check(
logger.info( db, source["id"], stale_check["type"],
f"Health-Check abgeschlossen: {checks_done} Quellen geprüft, " stale_check["status"], stale_check["message"],
f"{issues_found} Probleme gefunden" )
) if stale_check["status"] != "ok":
return {"checked": checks_done, "issues": issues_found} issues_found += 1
# 3. Duplikate erkennen
async def _check_source_reachability( duplicates = _find_duplicates(sources)
client: httpx.AsyncClient, source: dict, for dup in duplicates:
) -> list[dict]: await _save_check(
"""Prüft Erreichbarkeit und Feed-Validität einer Quelle.""" db, dup["source_id"], "duplicate", "warning",
checks = [] dup["message"], json.dumps(dup.get("details", {})),
url = source["url"] )
issues_found += 1
try:
resp = await client.get(url) await db.commit()
logger.info(
if resp.status_code >= 400: f"Health-Check abgeschlossen: {checks_done} Quellen geprüft, "
checks.append({ f"{issues_found} Probleme gefunden"
"type": "reachability", )
"status": "error", return {"checked": checks_done, "issues": issues_found}
"message": f"HTTP {resp.status_code} - nicht erreichbar",
"details": json.dumps({"status_code": resp.status_code, "url": url}),
}) async def _check_source_reachability(
return checks client: httpx.AsyncClient, source: dict,
) -> list[dict]:
if resp.status_code >= 300: """Prüft Erreichbarkeit und Feed-Validität einer Quelle."""
checks.append({ checks = []
"type": "reachability", url = source["url"]
"status": "warning",
"message": f"HTTP {resp.status_code} - Weiterleitung", try:
"details": json.dumps({ resp = await client.get(url)
"status_code": resp.status_code,
"final_url": str(resp.url), if resp.status_code >= 400:
}), checks.append({
}) "type": "reachability",
else: "status": "error",
checks.append({ "message": f"HTTP {resp.status_code} - nicht erreichbar",
"type": "reachability", "details": json.dumps({"status_code": resp.status_code, "url": url}),
"status": "ok", })
"message": "Erreichbar", return checks
})
if resp.status_code >= 300:
# Feed-Validität nur für RSS-Feeds checks.append({
if source["source_type"] == "rss_feed": "type": "reachability",
text = resp.text[:20000] "status": "warning",
if "<rss" not in text and "<feed" not in text and "<channel" not in text: "message": f"HTTP {resp.status_code} - Weiterleitung",
checks.append({ "details": json.dumps({
"type": "feed_validity", "status_code": resp.status_code,
"status": "error", "final_url": str(resp.url),
"message": "Kein gültiger RSS/Atom-Feed", }),
}) })
else: else:
feed = await asyncio.to_thread(feedparser.parse, text) checks.append({
if feed.get("bozo") and not feed.entries: "type": "reachability",
checks.append({ "status": "ok",
"type": "feed_validity", "message": "Erreichbar",
"status": "error", })
"message": "Feed fehlerhaft (bozo)",
"details": json.dumps({ # Feed-Validität nur für RSS-Feeds
"bozo_exception": str(feed.get("bozo_exception", "")), if source["source_type"] == "rss_feed":
}), text = resp.text[:20000]
}) if "<rss" not in text and "<feed" not in text and "<channel" not in text:
elif not feed.entries: checks.append({
checks.append({ "type": "feed_validity",
"type": "feed_validity", "status": "error",
"status": "warning", "message": "Kein gültiger RSS/Atom-Feed",
"message": "Feed erreichbar aber leer", })
}) else:
else: feed = await asyncio.to_thread(feedparser.parse, text)
checks.append({ if feed.get("bozo") and not feed.entries:
"type": "feed_validity", checks.append({
"status": "ok", "type": "feed_validity",
"message": f"Feed gültig ({len(feed.entries)} Einträge)", "status": "error",
}) "message": "Feed fehlerhaft (bozo)",
"details": json.dumps({
except httpx.TimeoutException: "bozo_exception": str(feed.get("bozo_exception", "")),
checks.append({ }),
"type": "reachability", })
"status": "error", elif not feed.entries:
"message": "Timeout (15s)", checks.append({
}) "type": "feed_validity",
except httpx.ConnectError as e: "status": "warning",
checks.append({ "message": "Feed erreichbar aber leer",
"type": "reachability", })
"status": "error", else:
"message": f"Verbindung fehlgeschlagen: {e}", checks.append({
}) "type": "feed_validity",
except Exception as e: "status": "ok",
checks.append({ "message": f"Feed gültig ({len(feed.entries)} Einträge)",
"type": "reachability", })
"status": "error",
"message": f"{type(e).__name__}: {e}", except httpx.TimeoutException:
}) checks.append({
"type": "reachability",
return checks "status": "error",
"message": "Timeout (15s)",
})
def _check_stale(source: dict) -> dict | None: except httpx.ConnectError as e:
"""Prüft ob eine Quelle veraltet ist (keine Artikel seit >30 Tagen).""" checks.append({
if source["source_type"] == "excluded": "type": "reachability",
return None "status": "error",
"message": f"Verbindung fehlgeschlagen: {e}",
article_count = source.get("article_count") or 0 })
last_seen = source.get("last_seen_at") except Exception as e:
checks.append({
if article_count == 0: "type": "reachability",
return { "status": "error",
"type": "stale", "message": f"{type(e).__name__}: {e}",
"status": "warning", })
"message": "Noch nie Artikel geliefert",
} return checks
if last_seen:
try: def _check_stale(source: dict) -> dict | None:
from datetime import datetime """Prüft ob eine Quelle veraltet ist (keine Artikel seit >30 Tagen)."""
last_dt = datetime.fromisoformat(last_seen) if source["source_type"] == "excluded":
now = datetime.now() return None
age_days = (now - last_dt).days
if age_days > 30: article_count = source.get("article_count") or 0
return { last_seen = source.get("last_seen_at")
"type": "stale",
"status": "warning", if article_count == 0:
"message": f"Letzter Artikel vor {age_days} Tagen", return {
} "type": "stale",
except (ValueError, TypeError): "status": "warning",
pass "message": "Noch nie Artikel geliefert",
}
return None
if last_seen:
try:
def _find_duplicates(sources: list[dict]) -> list[dict]: from datetime import datetime
"""Findet doppelte Quellen (gleiche URL).""" last_dt = datetime.fromisoformat(last_seen)
duplicates = [] now = datetime.now()
url_map = {} age_days = (now - last_dt).days
if age_days > 30:
for s in sources: return {
if not s["url"]: "type": "stale",
continue "status": "warning",
url_norm = s["url"].lower().rstrip("/") "message": f"Letzter Artikel vor {age_days} Tagen",
if url_norm in url_map: }
existing = url_map[url_norm] except (ValueError, TypeError):
duplicates.append({ pass
"source_id": s["id"],
"message": f"Doppelte URL wie '{existing['name']}' (ID {existing['id']})", return None
"details": {"duplicate_of": existing["id"], "type": "url"},
})
else: def _find_duplicates(sources: list[dict]) -> list[dict]:
url_map[url_norm] = s """Findet doppelte Quellen (gleiche URL)."""
duplicates = []
return duplicates url_map = {}
for s in sources:
async def _save_check( if not s["url"]:
db: aiosqlite.Connection, source_id: int, check_type: str, continue
status: str, message: str, details: str = None, url_norm = s["url"].lower().rstrip("/")
): if url_norm in url_map:
"""Speichert ein Health-Check-Ergebnis.""" existing = url_map[url_norm]
await db.execute( duplicates.append({
"INSERT INTO source_health_checks " "source_id": s["id"],
"(source_id, check_type, status, message, details) " "message": f"Doppelte URL wie '{existing['name']}' (ID {existing['id']})",
"VALUES (?, ?, ?, ?, ?)", "details": {"duplicate_of": existing["id"], "type": "url"},
(source_id, check_type, status, message, details), })
) else:
url_map[url_norm] = s
async def get_health_summary(db: aiosqlite.Connection) -> dict: return duplicates
"""Gibt eine Zusammenfassung der letzten Health-Check-Ergebnisse zurück."""
cursor = await db.execute("""
SELECT async def _save_check(
h.id, h.source_id, s.name, s.domain, s.url, s.source_type, db: aiosqlite.Connection, source_id: int, check_type: str,
h.check_type, h.status, h.message, h.details, h.checked_at status: str, message: str, details: str = None,
FROM source_health_checks h ):
JOIN sources s ON s.id = h.source_id """Speichert ein Health-Check-Ergebnis."""
ORDER BY await db.execute(
CASE h.status WHEN 'error' THEN 0 WHEN 'warning' THEN 1 ELSE 2 END, "INSERT INTO source_health_checks "
s.name "(source_id, check_type, status, message, details) "
""") "VALUES (?, ?, ?, ?, ?)",
checks = [dict(row) for row in await cursor.fetchall()] (source_id, check_type, status, message, details),
)
error_count = sum(1 for c in checks if c["status"] == "error")
warning_count = sum(1 for c in checks if c["status"] == "warning")
ok_count = sum(1 for c in checks if c["status"] == "ok") async def get_health_summary(db: aiosqlite.Connection) -> dict:
"""Gibt eine Zusammenfassung der letzten Health-Check-Ergebnisse zurück."""
cursor = await db.execute( cursor = await db.execute("""
"SELECT MAX(checked_at) as last_check FROM source_health_checks" SELECT
) h.id, h.source_id, s.name, s.domain, s.url, s.source_type,
row = await cursor.fetchone() h.check_type, h.status, h.message, h.details, h.checked_at
last_check = row["last_check"] if row else None FROM source_health_checks h
JOIN sources s ON s.id = h.source_id
return { ORDER BY
"last_check": last_check, CASE h.status WHEN 'error' THEN 0 WHEN 'warning' THEN 1 ELSE 2 END,
"total_checks": len(checks), s.name
"errors": error_count, """)
"warnings": warning_count, checks = [dict(row) for row in await cursor.fetchall()]
"ok": ok_count,
"checks": checks, error_count = sum(1 for c in checks if c["status"] == "error")
} warning_count = sum(1 for c in checks if c["status"] == "warning")
ok_count = sum(1 for c in checks if c["status"] == "ok")
cursor = await db.execute(
"SELECT MAX(checked_at) as last_check FROM source_health_checks"
)
row = await cursor.fetchone()
last_check = row["last_check"] if row else None
return {
"last_check": last_check,
"total_checks": len(checks),
"errors": error_count,
"warnings": warning_count,
"ok": ok_count,
"checks": checks,
}