Dateien
AegisSight-Monitor/src/main.py
Claude Code 7777b77abd feat(pipeline): Translator als Pipeline-Step + Watchdog-Limits erhoehen
Folgefix zu 952df87. Der Translator-Block laeuft post-summary bei jp_demo
40+ Min und war bisher fuer das Frontend unsichtbar und fuer den Watchdog
ein blinder Fleck (kein Pipeline-Step-Eintrag).

Aenderungen:
- pipeline_tracker.py: neuer Step 'translate' zwischen 'summary' und 'qc'
  (DE+EN Label/Tooltip). Bewusst conditional sichtbar: erscheint nur, wenn
  fremdsprachige Artikel ohne DE-Uebersetzung vorliegen UND
  translator_enabled fuer die Org an ist.
- orchestrator.py: Translator-Block umrandet mit _pipe_start('translate')
  und _pipe_done('translate', count_value=uebersetzt, count_secondary=
  pending). Auch bei einem Translator-Fehler wird der Step sauber abgeschlossen.
  Die Bedingung 'pending_translations and translator_enabled' ersetzt das
  alte 'pending_translations' und ueberspringt den Block, wenn der Org-Override
  deaktiviert ist (die Pruefung lag vorher redundant in translate_articles selbst).
- main.py: ORPHAN_IDLE_LIMIT 30->60 Min, ORPHAN_HARD_LIMIT 90->120 Min.
  Deckt jp_demo Translator-Phase (beobachtet bis 41 Min) mit Puffer ab,
  ohne echte Haenger durchzulassen.

Resultierend: Frontend zeigt den Uebersetzungs-Schritt mit Fortschritt
(uebersetzt/gesamt). Watchdog killt nicht mehr vorzeitig.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-26 00:22:34 +00:00

496 Zeilen
19 KiB
Python

"""OSINT Lagemonitor - Hauptanwendung."""
import asyncio
import json
import logging
import os
import sys
from contextlib import asynccontextmanager
from datetime import datetime, timedelta
from typing import Dict
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse, RedirectResponse
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from starlette.middleware.base import BaseHTTPMiddleware
from config import STATIC_DIR, LOG_DIR, DATA_DIR, TIMEZONE, DEV_MODE
from database import init_db, get_db
from auth import decode_token
from agents.orchestrator import orchestrator
from services.source_health import run_health_checks, get_health_summary
from services.source_suggester import generate_suggestions
from services.fact_consolidation import consolidate_fact_checks
# Logging: every record goes to stdout and to a size-rotated file in LOG_DIR.
os.makedirs(LOG_DIR, exist_ok=True)
from logging.handlers import RotatingFileHandler
import time as _time

# DEV_MODE selects DEBUG for the whole application, otherwise INFO.
_log_level = logging.DEBUG if DEV_MODE else logging.INFO

_file_handler = RotatingFileHandler(
    os.path.join(LOG_DIR, "osint-monitor.log"),
    maxBytes=5 * 1024 * 1024,  # 5 MB per file
    backupCount=5,
    encoding="utf-8",
)
_console_handler = logging.StreamHandler(sys.stdout)
for _handler in (_file_handler, _console_handler):
    _handler.setLevel(_log_level)

logging.basicConfig(
    level=_log_level,
    format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
    handlers=[_console_handler, _file_handler],
)

# Keep third-party libraries from flooding the log when DEBUG is active.
for _library, _library_level in (
    ("httpx", logging.WARNING),
    ("httpcore", logging.WARNING),
    ("uvicorn.access", logging.WARNING),
    ("apscheduler", logging.INFO),
    ("aiosqlite", logging.WARNING),
):
    logging.getLogger(_library).setLevel(_library_level)

logger = logging.getLogger("osint")
logger.info(f"Monitor gestartet (DEV_MODE={DEV_MODE}, Log-Level={logging.getLevelName(_log_level)})")
class WebSocketManager:
    """Track authenticated WebSocket connections and push real-time updates to them.

    Connections are stored in a dict mapping each socket to the id of the
    user who authenticated on it. Sockets whose send fails are dropped
    from the registry automatically.
    """

    def __init__(self):
        self._connections: Dict["WebSocket", int] = {}  # ws -> user_id

    async def connect(self, websocket: "WebSocket", user_id: int):
        """Register an already accepted socket for ``user_id``."""
        self._connections[websocket] = user_id
        logger.info(f"WebSocket verbunden (User {user_id}, {len(self._connections)} aktiv)")

    def disconnect(self, websocket: "WebSocket"):
        """Remove a socket from the registry; unknown sockets are ignored."""
        self._connections.pop(websocket, None)
        logger.info(f"WebSocket getrennt ({len(self._connections)} aktiv)")

    async def _send_to(self, data: str, recipients):
        """Send ``data`` to each socket in ``recipients`` and drop the ones that fail.

        ``recipients`` must be a snapshot (not a live view of the registry):
        every ``await`` below yields to the event loop, during which other
        tasks may connect or disconnect and thereby resize the registry.
        """
        failed = []
        for ws in recipients:
            # Skip sockets that were unregistered while earlier sends were pending.
            if ws not in self._connections:
                continue
            try:
                await ws.send_text(data)
            except Exception:
                # Best effort: a socket that cannot be written to is treated as gone.
                failed.append(ws)
        for ws in failed:
            self._connections.pop(ws, None)

    async def broadcast(self, message: dict):
        """Send ``message`` (JSON-encoded) to all connected clients."""
        if not self._connections:
            return
        data = json.dumps(message, ensure_ascii=False)
        await self._send_to(data, list(self._connections))

    async def broadcast_for_incident(self, message: dict, visibility: str, created_by: int, tenant_id: int = None):
        """Send ``message`` only to clients allowed to see the incident.

        For ``visibility == "private"`` only sockets of the creator
        (``created_by``) receive the message; any other visibility value
        is delivered to every connected client.

        NOTE(review): ``tenant_id`` is accepted but not used for filtering;
        the registry only knows user ids. Confirm whether tenant isolation
        is enforced elsewhere.
        """
        if not self._connections:
            return
        data = json.dumps(message, ensure_ascii=False)
        recipients = [
            ws
            for ws, user_id in self._connections.items()
            if visibility != "private" or user_id == created_by
        ]
        await self._send_to(data, recipients)
# Module-wide connection registry; handed to the orchestrator during startup.
ws_manager = WebSocketManager()
# Scheduler for auto-refresh and the other periodic jobs registered in lifespan()
scheduler = AsyncIOScheduler()
def _parse_refresh_timestamp(value: str) -> datetime:
    """Parse an ISO timestamp and express it in TIMEZONE (naive values are taken as TIMEZONE)."""
    parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=TIMEZONE)
    return parsed.astimezone(TIMEZONE)


def _latest_due_slot(anchor_today: datetime, now: datetime, last_time: datetime, interval) -> datetime:
    """Return the most recent refresh slot that is not in the future.

    ``anchor_today`` is today's date at the configured start time and
    ``interval`` is the refresh interval in minutes (must be positive,
    otherwise the sub-daily loops below would never terminate).

    NOTE(review): for intervals of two days or more the slot is still
    anchored to *today's* start time, so a slot exists on every day and
    the refresh appears to fire daily. Confirm the intended rhythm before
    changing this.
    """
    if interval >= 1440:
        # Daily or longer rhythm: step back in whole days until the slot is <= now.
        step = timedelta(days=interval // 1440)
        slot = anchor_today
        while slot > now:
            slot -= step
        return slot

    # Sub-daily: slots tick from the anchor in interval steps.
    step = timedelta(minutes=interval)
    slot = anchor_today
    # Move the anchor back until it is not after the last refresh ...
    while slot > last_time:
        slot -= step
    # ... then forward to the last slot that is <= now.
    while slot + step <= now:
        slot += step
    return slot


async def _auto_refresh_due(db, incident, now: datetime) -> bool:
    """Decide whether one auto-refresh incident is due for a refresh right now."""
    incident_id = incident["id"]
    interval = incident["refresh_interval"]
    start_time_str = incident["refresh_start_time"]

    # Look at the most recent finished or running refresh.
    cursor = await db.execute(
        "SELECT started_at, status FROM refresh_log WHERE incident_id = ? AND status IN ('completed', 'running', 'cancelled', 'error') ORDER BY id DESC LIMIT 1",
        (incident_id,),
    )
    last_refresh = await cursor.fetchone()

    # Never queue a second refresh while one is still running.
    if last_refresh and last_refresh["status"] == "running":
        logger.debug(f"Auto-Refresh Lage {incident_id}: uebersprungen (laeuft bereits)")
        return False

    if not last_refresh:
        # Never ran before -> start immediately.
        logger.info(f"Auto-Refresh Lage {incident_id}: erster Refresh")
        return True

    if start_time_str:
        # Slot-based logic: find the latest slot and compare it with the last refresh.
        try:
            start_h, start_m = map(int, start_time_str.split(":"))
            # replace() also rejects out-of-range values such as hour 25.
            anchor_today = now.replace(hour=start_h, minute=start_m, second=0, microsecond=0)
        except (ValueError, AttributeError):
            logger.warning(f"Auto-Refresh Lage {incident_id}: ungueltiges Startzeit-Format '{start_time_str}'")
            return False
        # A zero or negative interval would make the slot loops spin forever.
        if not isinstance(interval, (int, float)) or interval <= 0:
            logger.warning(f"Auto-Refresh Lage {incident_id}: ungueltiges Intervall '{interval}'")
            return False

        last_time = _parse_refresh_timestamp(last_refresh["started_at"])
        current_slot = _latest_due_slot(anchor_today, now, last_time, interval)
        if current_slot > last_time:
            logger.info(f"Auto-Refresh Lage {incident_id}: Slot {current_slot.strftime('%H:%M')} faellig (letzter Refresh: {last_time.strftime('%Y-%m-%d %H:%M')})")
            return True
        logger.debug(f"Auto-Refresh Lage {incident_id}: kein faelliger Slot (letzter: {current_slot.strftime('%H:%M')})")
        return False

    # Fallback without a start time: plain "interval minutes since last start".
    last_time = _parse_refresh_timestamp(last_refresh["started_at"])
    elapsed = (now - last_time).total_seconds() / 60
    if elapsed >= interval:
        logger.info(f"Auto-Refresh Lage {incident_id}: {elapsed:.1f} Min seit letztem Refresh (Intervall: {interval} Min)")
        return True
    logger.debug(f"Auto-Refresh Lage {incident_id}: {elapsed:.1f}/{interval} Min — noch nicht faellig")
    return False


async def check_auto_refresh():
    """Enqueue a refresh for every active auto-refresh incident that is due (slot-based).

    Never raises: failures are logged. A failure while evaluating one
    incident is logged and does not stop the remaining incidents from
    being checked.
    """
    db = await get_db()
    try:
        cursor = await db.execute(
            "SELECT id, refresh_interval, refresh_start_time FROM incidents WHERE status = 'active' AND refresh_mode = 'auto'"
        )
        incidents = await cursor.fetchall()
        now = datetime.now(TIMEZONE)

        for incident in incidents:
            try:
                if await _auto_refresh_due(db, incident, now):
                    await orchestrator.enqueue_refresh(incident["id"], trigger_type="auto")
            except Exception as e:
                logger.error(
                    f"Auto-Refresh Lage {incident['id']}: Pruefung fehlgeschlagen: {e}",
                    exc_info=True,
                )
    except Exception as e:
        logger.error(f"Auto-Refresh Check Fehler: {e}", exc_info=True)
    finally:
        await db.close()
async def daily_source_health_check():
    """Run the daily source health check, then generate new source suggestions.

    Both steps share one database connection. Any failure is logged with
    its traceback and swallowed; the connection is always closed.
    """
    connection = await get_db()
    try:
        health = await run_health_checks(connection)
        checked, issues = health['checked'], health['issues']
        logger.info(f"Täglicher Health-Check: {checked} geprüft, {issues} Probleme")

        new_suggestions = await generate_suggestions(connection)
        logger.info(f"Tägliche Vorschläge: {new_suggestions} neue Vorschläge")
    except Exception as exc:
        logger.error(f"Täglicher Health-Check Fehler: {exc}", exc_info=True)
    finally:
        await connection.close()
async def cleanup_expired():
    """Archive expired incidents, fail orphaned refresh runs and prune old notifications.

    Three steps run on one connection and are committed together at the end:

    1. Active incidents whose age in days reached ``retention_days`` are
       set to status ``archived``.
    2. ``refresh_log`` rows still marked ``running`` are set to ``error``
       when they look orphaned (see the limits below).
    3. Notifications older than seven days are deleted.

    Never raises: any failure is logged with its traceback, and the
    connection is always closed.
    """
    # Orphan detection thresholds, in minutes. Legitimate long runners (for
    # example the post-summary translation phase, observed at up to ~41 min)
    # must not be killed early. A running refresh counts as orphaned when
    # (a) no pipeline step showed progress for ORPHAN_IDLE_LIMIT minutes, or
    # (b) its total age exceeds ORPHAN_HARD_LIMIT minutes.
    ORPHAN_IDLE_LIMIT = 60
    ORPHAN_HARD_LIMIT = 120

    def _as_local(value):
        """Parse an ISO timestamp; naive values are interpreted as TIMEZONE."""
        stamp = datetime.fromisoformat(value)
        if stamp.tzinfo is None:
            return stamp.replace(tzinfo=TIMEZONE)
        return stamp.astimezone(TIMEZONE)

    db = await get_db()
    try:
        # --- 1. Archive incidents whose retention period has elapsed ---
        cursor = await db.execute(
            "SELECT id, retention_days, created_at FROM incidents WHERE retention_days > 0 AND status = 'active'"
        )
        incidents = await cursor.fetchall()
        now = datetime.now(TIMEZONE)

        for incident in incidents:
            age_days = (now - _as_local(incident["created_at"])).days
            if age_days >= incident["retention_days"]:
                await db.execute(
                    "UPDATE incidents SET status = 'archived' WHERE id = ?",
                    (incident["id"],),
                )
                logger.info(f"Lage {incident['id']} archiviert (Aufbewahrung abgelaufen)")

        # --- 2. Clean up orphaned 'running' refresh entries ---
        cursor = await db.execute(
            "SELECT id, incident_id, started_at FROM refresh_log WHERE status = 'running'"
        )
        orphans = await cursor.fetchall()

        for orphan in orphans:
            age_minutes = (now - _as_local(orphan["started_at"])).total_seconds() / 60
            if age_minutes < ORPHAN_IDLE_LIMIT:
                continue  # too young to be judged at all

            reason = None
            if age_minutes >= ORPHAN_HARD_LIMIT:
                # The hard limit decides on its own; no need to query progress.
                reason = f"Verwaist (>{int(age_minutes)} Min, hartes Limit {ORPHAN_HARD_LIMIT} Min)"
            else:
                # Latest pipeline step activity (a step's end if set, else its start).
                prog_cursor = await db.execute(
                    """SELECT MAX(COALESCE(completed_at, started_at)) AS last_activity
                       FROM refresh_pipeline_steps WHERE refresh_log_id = ?""",
                    (orphan["id"],),
                )
                prog_row = await prog_cursor.fetchone()
                last_activity_str = prog_row["last_activity"] if prog_row else None

                if last_activity_str:
                    idle_minutes = (now - _as_local(last_activity_str)).total_seconds() / 60
                    if idle_minutes >= ORPHAN_IDLE_LIMIT:
                        reason = (
                            f"Verwaist (kein Pipeline-Fortschritt seit {int(idle_minutes)} Min, "
                            f"gesamt {int(age_minutes)} Min)"
                        )
                else:
                    # Past the idle limit without a single recorded pipeline step.
                    reason = f"Verwaist (keine Pipeline-Schritte nach {int(age_minutes)} Min)"

            if reason is None:
                continue  # still making progress

            await db.execute(
                "UPDATE refresh_log SET status = 'error', completed_at = ?, error_message = ? WHERE id = ?",
                (now.strftime('%Y-%m-%d %H:%M:%S'), reason, orphan["id"]),
            )
            logger.warning(f"Verwaisten Refresh #{orphan['id']} fuer Lage {orphan['incident_id']} bereinigt: {reason}")

        # --- 3. Prune old notifications (> 7 days) ---
        await db.execute("DELETE FROM notifications WHERE created_at < datetime('now', '-7 days')")
        await db.commit()
    except Exception as e:
        logger.error(f"Cleanup Fehler: {e}", exc_info=True)
    finally:
        await db.close()
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application startup and shutdown.

    Startup: create the data directory, initialise the database, mark any
    refresh still flagged ``running`` as failed (it cannot have survived a
    restart), start the orchestrator and register the periodic jobs.
    Shutdown: stop the scheduler, then the orchestrator.
    """
    # --- Startup ---
    os.makedirs(DATA_DIR, exist_ok=True)
    await init_db()

    # Clean up orphaned 'running' entries left over from the previous process.
    startup_db = await get_db()
    try:
        restart_stamp = datetime.now(TIMEZONE).strftime('%Y-%m-%d %H:%M:%S')
        update = await startup_db.execute(
            "UPDATE refresh_log SET status = 'error', completed_at = ?, error_message = 'Verwaist (Neustart, automatisch bereinigt)' WHERE status = 'running'",
            (restart_stamp,),
        )
        if update.rowcount > 0:
            await startup_db.commit()
            logger.warning(f"{update.rowcount} verwaiste running-Einträge beim Start bereinigt")
    finally:
        await startup_db.close()

    orchestrator.set_ws_manager(ws_manager)
    await orchestrator.start()

    from services import pdf_ingest as _pdf_ingest

    # (callable, trigger, keyword options) for every periodic job.
    job_specs = (
        (_pdf_ingest.run_once, "interval", {"minutes": 1, "id": "pdf_ingest", "max_instances": 1, "coalesce": True}),
        (check_auto_refresh, "interval", {"minutes": 1, "id": "auto_refresh"}),
        (cleanup_expired, "interval", {"hours": 1, "id": "cleanup"}),
        (daily_source_health_check, "cron", {"hour": 4, "minute": 0, "id": "source_health"}),
    )
    for job_func, trigger, options in job_specs:
        scheduler.add_job(job_func, trigger, **options)
    scheduler.start()
    logger.info("OSINT Lagemonitor gestartet")

    yield

    # --- Shutdown ---
    scheduler.shutdown()
    await orchestrator.stop()
    logger.info("OSINT Lagemonitor gestoppt")
# The ASGI application; startup and shutdown are handled by lifespan().
app = FastAPI(
    title="OSINT Lagemonitor",
    version="1.0.0",
    lifespan=lifespan,
)
# Request logging middleware (defined in dev mode only)
if DEV_MODE:
    _req_logger = logging.getLogger("osint.http")

    class RequestLoggingMiddleware(BaseHTTPMiddleware):
        """Log method, path, status code and duration of each request at DEBUG level."""

        async def dispatch(self, request: Request, call_next):
            began = _time.monotonic()
            response = await call_next(request)
            elapsed_ms = (_time.monotonic() - began) * 1000

            # Static files and WebSocket upgrades are not logged.
            path = request.url.path
            if path.startswith(("/static/", "/favicon")):
                return response
            if "websocket" in str(request.headers.get("upgrade", "")):
                return response

            _req_logger.debug(
                f"{request.method} {path} -> {response.status_code} ({elapsed_ms:.0f}ms)"
            )
            return response
# Security headers middleware
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    """Attach Content-Security-Policy and Permissions-Policy headers to every response."""

    # One entry per CSP directive; joined with "; " into the header value.
    _CSP_DIRECTIVES = (
        "default-src 'self'",
        "script-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net",
        "style-src 'self' 'unsafe-inline' https://fonts.googleapis.com https://cdn.jsdelivr.net",
        "font-src 'self' https://fonts.gstatic.com",
        "img-src 'self' data: https://tile.openstreetmap.de",
        "connect-src 'self' wss: ws:",
        "frame-ancestors 'none'",
    )
    # Browser features switched off for all origins.
    _DISABLED_FEATURES = ("camera", "microphone", "geolocation", "payment")

    async def dispatch(self, request: Request, call_next):
        response = await call_next(request)
        headers = response.headers
        headers["Content-Security-Policy"] = "; ".join(self._CSP_DIRECTIVES)
        headers["Permissions-Policy"] = ", ".join(
            f"{feature}=()" for feature in self._DISABLED_FEATURES
        )
        return response
# Register middleware. The request logger only exists (and is only added) in dev mode.
app.add_middleware(SecurityHeadersMiddleware)
if DEV_MODE:
    app.add_middleware(RequestLoggingMiddleware)
# CORS: only the production frontend origin may send credentialed requests.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://monitor.aegis-sight.de"],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["Authorization", "Content-Type"],
)
# Register routers. Imported here rather than at the top of the file,
# i.e. after `app`, `ws_manager` and `scheduler` exist.
from routers.auth import router as auth_router
from routers.incidents import router as incidents_router
from routers.sources import router as sources_router
from routers.notifications import router as notifications_router
from routers.feedback import router as feedback_router
from routers.public_api import router as public_api_router
from routers.chat import router as chat_router
from routers.tutorial import router as tutorial_router
# NOTE(review): this one comes from the `routes` package, not `routers`
# like all the others — confirm the path is intentional.
from routes.version_router import router as version_router
app.include_router(auth_router)
app.include_router(incidents_router)
app.include_router(sources_router)
app.include_router(notifications_router)
app.include_router(feedback_router)
app.include_router(public_api_router)
# The chat router is the only one that gets its URL prefix assigned here.
app.include_router(chat_router, prefix="/api/chat")
app.include_router(tutorial_router)
app.include_router(version_router)
async def _reject_websocket(websocket: WebSocket, reason: str):
    """Close an unauthenticated socket with code 4001; errors while closing are ignored."""
    try:
        await websocket.close(code=4001, reason=reason)
    except Exception:
        # Best effort: the peer may already be gone.
        pass


@app.websocket("/api/ws")
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket endpoint for real-time updates.

    Protocol: after the handshake the client must send its token as the
    first text message within 5 seconds (the token is deliberately not
    part of the URL). On success the server answers ``authenticated``
    and afterwards replies ``pong`` to every ``ping`` message. A missing
    or invalid token closes the socket with code 4001.
    """
    await websocket.accept()

    # The token arrives as the first message.
    try:
        token = await asyncio.wait_for(websocket.receive_text(), timeout=5.0)
    except asyncio.TimeoutError:
        await _reject_websocket(websocket, "Token fehlt")
        return
    except WebSocketDisconnect:
        return

    try:
        payload = decode_token(token)
        user_id = int(payload["sub"])
    except Exception:
        await _reject_websocket(websocket, "Token ungültig")
        return

    # Authentication succeeded.
    await ws_manager.connect(websocket, user_id)
    try:
        await websocket.send_text("authenticated")
        while True:
            data = await websocket.receive_text()
            if data == "ping":
                await websocket.send_text("pong")
    except WebSocketDisconnect:
        pass
    finally:
        # Always unregister, also when the session ends with an exception
        # other than WebSocketDisconnect; otherwise the manager would keep
        # a dead socket in its registry.
        ws_manager.disconnect(websocket)
# Static files and frontend routing
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
@app.get("/")
async def root():
    """Serve the login page (``index.html`` from the static directory)."""
    login_page = os.path.join(STATIC_DIR, "index.html")
    return FileResponse(login_page)
@app.get("/dashboard")
async def dashboard():
    """Serve the dashboard page (``dashboard.html`` from the static directory)."""
    dashboard_page = os.path.join(STATIC_DIR, "dashboard.html")
    return FileResponse(dashboard_page)
# Direct start (python main.py): serve on the loopback interface only.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8891)