"""OSINT Lagemonitor - Hauptanwendung.""" import asyncio import json import logging import os import sys from contextlib import asynccontextmanager from datetime import datetime, timedelta from typing import Dict from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends, Request, Response from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from fastapi.responses import FileResponse, RedirectResponse from apscheduler.schedulers.asyncio import AsyncIOScheduler from starlette.middleware.base import BaseHTTPMiddleware from config import STATIC_DIR, LOG_DIR, DATA_DIR, TIMEZONE, DEV_MODE from database import init_db, get_db from auth import decode_token from agents.orchestrator import orchestrator from services.source_health import run_health_checks, get_health_summary from services.source_suggester import generate_suggestions from services.fact_consolidation import consolidate_fact_checks # Logging os.makedirs(LOG_DIR, exist_ok=True) from logging.handlers import RotatingFileHandler import time as _time _log_level = logging.DEBUG if DEV_MODE else logging.INFO _file_handler = RotatingFileHandler( os.path.join(LOG_DIR, "osint-monitor.log"), maxBytes=5 * 1024 * 1024, # 5 MB backupCount=5, encoding="utf-8", ) _file_handler.setLevel(_log_level) _console_handler = logging.StreamHandler(sys.stdout) _console_handler.setLevel(_log_level) logging.basicConfig( level=_log_level, format="%(asctime)s [%(name)s] %(levelname)s: %(message)s", handlers=[_console_handler, _file_handler], ) # Externe Bibliotheken nicht auf DEBUG setzen logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("httpcore").setLevel(logging.WARNING) logging.getLogger("uvicorn.access").setLevel(logging.WARNING) logging.getLogger("apscheduler").setLevel(logging.INFO) logging.getLogger("aiosqlite").setLevel(logging.WARNING) logger = logging.getLogger("osint") logger.info(f"Monitor gestartet (DEV_MODE={DEV_MODE}, 
Log-Level={logging.getLevelName(_log_level)})") class WebSocketManager: """Verwaltet WebSocket-Verbindungen für Echtzeit-Updates.""" def __init__(self): self._connections: Dict[WebSocket, int] = {} # ws -> user_id async def connect(self, websocket: WebSocket, user_id: int): self._connections[websocket] = user_id logger.info(f"WebSocket verbunden (User {user_id}, {len(self._connections)} aktiv)") def disconnect(self, websocket: WebSocket): self._connections.pop(websocket, None) logger.info(f"WebSocket getrennt ({len(self._connections)} aktiv)") async def broadcast(self, message: dict): """Nachricht an alle verbundenen Clients senden.""" if not self._connections: return data = json.dumps(message, ensure_ascii=False) disconnected = [] for ws in self._connections: try: await ws.send_text(data) except Exception: disconnected.append(ws) for ws in disconnected: self._connections.pop(ws, None) async def broadcast_for_incident(self, message: dict, visibility: str, created_by: int, tenant_id: int = None): """Nachricht nur an berechtigte Clients senden (private Lagen → nur Ersteller).""" if not self._connections: return data = json.dumps(message, ensure_ascii=False) disconnected = [] for ws, user_id in self._connections.items(): if visibility == "private" and user_id != created_by: continue try: await ws.send_text(data) except Exception: disconnected.append(ws) for ws in disconnected: self._connections.pop(ws, None) ws_manager = WebSocketManager() # Scheduler für Auto-Refresh scheduler = AsyncIOScheduler() async def check_auto_refresh(): """Prüft welche Lagen einen Auto-Refresh brauchen (Slot-basiert).""" db = await get_db() try: cursor = await db.execute( "SELECT id, refresh_interval, refresh_start_time FROM incidents WHERE status = 'active' AND refresh_mode = 'auto'" ) incidents = await cursor.fetchall() now = datetime.now(TIMEZONE) for incident in incidents: incident_id = incident["id"] interval = incident["refresh_interval"] start_time_str = incident["refresh_start_time"] 
# Letzten abgeschlossenen oder laufenden Refresh pruefen cursor = await db.execute( "SELECT started_at, status FROM refresh_log WHERE incident_id = ? AND status IN ('completed', 'running') ORDER BY id DESC LIMIT 1", (incident_id,), ) last_refresh = await cursor.fetchone() # Laufenden Refresh ueberspringen if last_refresh and last_refresh["status"] == "running": logger.debug(f"Auto-Refresh Lage {incident_id}: uebersprungen (laeuft bereits)") continue should_refresh = False if not last_refresh: # Noch nie gelaufen -> sofort starten should_refresh = True logger.info(f"Auto-Refresh Lage {incident_id}: erster Refresh") elif start_time_str: # Slot-basierte Logik: Naechsten faelligen Slot berechnen try: start_h, start_m = map(int, start_time_str.split(":")) except (ValueError, AttributeError): logger.warning(f"Auto-Refresh Lage {incident_id}: ungueltiges Startzeit-Format '{start_time_str}'") continue last_time = datetime.fromisoformat(last_refresh["started_at"]) if last_time.tzinfo is None: last_time = last_time.replace(tzinfo=TIMEZONE) else: last_time = last_time.astimezone(TIMEZONE) # Anker: heute um start_time anchor_today = now.replace(hour=start_h, minute=start_m, second=0, microsecond=0) interval_td = timedelta(minutes=interval) if interval >= 1440: # Taeglicher oder laengerer Rhythmus days_interval = interval // 1440 # Letzter Slot der <= now ist current_slot = anchor_today if current_slot > now: current_slot -= timedelta(days=days_interval) # Sicherheitsschleife: weiter zurueck falls noetig while current_slot > now: current_slot -= timedelta(days=days_interval) else: # Untertaegig: Slots ab Anker im Intervall-Takt # Anker zurueck bis vor last_refresh ref_anchor = anchor_today while ref_anchor > last_time: ref_anchor -= interval_td # Von dort vorwaerts bis zum letzten Slot <= now current_slot = ref_anchor while current_slot + interval_td <= now: current_slot += interval_td if current_slot > last_time: should_refresh = True logger.info(f"Auto-Refresh Lage 
{incident_id}: Slot {current_slot.strftime('%H:%M')} faellig (letzter Refresh: {last_time.strftime('%Y-%m-%d %H:%M')})") else: logger.debug(f"Auto-Refresh Lage {incident_id}: kein faelliger Slot (letzter: {current_slot.strftime('%H:%M')})") else: # Fallback: altes Intervall-Verhalten (kein start_time gesetzt) last_time = datetime.fromisoformat(last_refresh["started_at"]) if last_time.tzinfo is None: last_time = last_time.replace(tzinfo=TIMEZONE) else: last_time = last_time.astimezone(TIMEZONE) elapsed = (now - last_time).total_seconds() / 60 if elapsed >= interval: should_refresh = True logger.info(f"Auto-Refresh Lage {incident_id}: {elapsed:.1f} Min seit letztem Refresh (Intervall: {interval} Min)") else: logger.debug(f"Auto-Refresh Lage {incident_id}: {elapsed:.1f}/{interval} Min — noch nicht faellig") if should_refresh: await orchestrator.enqueue_refresh(incident_id, trigger_type="auto") except Exception as e: logger.error(f"Auto-Refresh Check Fehler: {e}") finally: await db.close() async def daily_source_health_check(): """Täglicher Quellen-Health-Check + KI-Vorschläge.""" db = await get_db() try: result = await run_health_checks(db) logger.info(f"Täglicher Health-Check: {result['checked']} geprüft, {result['issues']} Probleme") suggestion_count = await generate_suggestions(db) logger.info(f"Tägliche Vorschläge: {suggestion_count} neue Vorschläge") except Exception as e: logger.error(f"Täglicher Health-Check Fehler: {e}", exc_info=True) finally: await db.close() async def cleanup_expired(): """Bereinigt abgelaufene Lagen basierend auf retention_days.""" db = await get_db() try: cursor = await db.execute( "SELECT id, retention_days, created_at FROM incidents WHERE retention_days > 0 AND status = 'active'" ) incidents = await cursor.fetchall() now = datetime.now(TIMEZONE) for incident in incidents: created = datetime.fromisoformat(incident["created_at"]) if created.tzinfo is None: created = created.replace(tzinfo=TIMEZONE) else: created = 
created.astimezone(TIMEZONE) age_days = (now - created).days if age_days >= incident["retention_days"]: await db.execute( "UPDATE incidents SET status = 'archived' WHERE id = ?", (incident["id"],), ) logger.info(f"Lage {incident['id']} archiviert (Aufbewahrung abgelaufen)") # Verwaiste running-Einträge bereinigen (> 15 Minuten ohne Abschluss) cursor = await db.execute( "SELECT id, incident_id, started_at FROM refresh_log WHERE status = 'running'" ) orphans = await cursor.fetchall() for orphan in orphans: started = datetime.fromisoformat(orphan["started_at"]) if started.tzinfo is None: started = started.replace(tzinfo=TIMEZONE) else: started = started.astimezone(TIMEZONE) age_minutes = (now - started).total_seconds() / 60 if age_minutes >= 15: await db.execute( "UPDATE refresh_log SET status = 'error', completed_at = ?, error_message = ? WHERE id = ?", (now.strftime('%Y-%m-%d %H:%M:%S'), f"Verwaist (>{int(age_minutes)} Min ohne Abschluss, automatisch bereinigt)", orphan["id"]), ) logger.warning(f"Verwaisten Refresh #{orphan['id']} für Lage {orphan['incident_id']} bereinigt ({int(age_minutes)} Min)") # Alte Notifications bereinigen (> 7 Tage) await db.execute("DELETE FROM notifications WHERE created_at < datetime('now', '-7 days')") await db.commit() except Exception as e: logger.error(f"Cleanup Fehler: {e}") finally: await db.close() @asynccontextmanager async def lifespan(app: FastAPI): """App Startup/Shutdown.""" # Startup os.makedirs(DATA_DIR, exist_ok=True) await init_db() # Verwaiste running-Einträge beim Start bereinigen db = await get_db() try: result = await db.execute( "UPDATE refresh_log SET status = 'error', completed_at = ?, error_message = 'Verwaist (Neustart, automatisch bereinigt)' WHERE status = 'running'", (datetime.now(TIMEZONE).strftime('%Y-%m-%d %H:%M:%S'),), ) if result.rowcount > 0: await db.commit() logger.warning(f"{result.rowcount} verwaiste running-Einträge beim Start bereinigt") finally: await db.close() 
orchestrator.set_ws_manager(ws_manager) await orchestrator.start() scheduler.add_job(check_auto_refresh, "interval", minutes=1, id="auto_refresh") scheduler.add_job(cleanup_expired, "interval", hours=1, id="cleanup") scheduler.add_job(daily_source_health_check, "cron", hour=4, minute=0, id="source_health") scheduler.start() logger.info("OSINT Lagemonitor gestartet") yield # Shutdown scheduler.shutdown() await orchestrator.stop() logger.info("OSINT Lagemonitor gestoppt") app = FastAPI( title="OSINT Lagemonitor", version="1.0.0", lifespan=lifespan, ) # Request-Logging Middleware (nur im Dev-Modus) if DEV_MODE: _req_logger = logging.getLogger("osint.http") class RequestLoggingMiddleware(BaseHTTPMiddleware): async def dispatch(self, request: Request, call_next): start = _time.monotonic() response = await call_next(request) duration = (_time.monotonic() - start) * 1000 # Statische Dateien und WebSocket-Upgrades nicht loggen path = request.url.path if not path.startswith(("/static/", "/favicon")) and "websocket" not in str(request.headers.get("upgrade", "")): _req_logger.debug( f"{request.method} {path} -> {response.status_code} ({duration:.0f}ms)" ) return response # Security-Headers Middleware class SecurityHeadersMiddleware(BaseHTTPMiddleware): async def dispatch(self, request: Request, call_next): response = await call_next(request) response.headers["Content-Security-Policy"] = ( "default-src 'self'; " "script-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net; " "style-src 'self' 'unsafe-inline' https://fonts.googleapis.com https://cdn.jsdelivr.net; " "font-src 'self' https://fonts.gstatic.com; " "img-src 'self' data: https://tile.openstreetmap.de; " "connect-src 'self' wss: ws:; " "frame-ancestors 'none'" ) response.headers["Permissions-Policy"] = ( "camera=(), microphone=(), geolocation=(), payment=()" ) return response app.add_middleware(SecurityHeadersMiddleware) if DEV_MODE: app.add_middleware(RequestLoggingMiddleware) # CORS app.add_middleware( 
CORSMiddleware, allow_origins=["https://monitor.aegis-sight.de"], allow_credentials=True, allow_methods=["GET", "POST", "PUT", "DELETE"], allow_headers=["Authorization", "Content-Type"], ) # Router einbinden from routers.auth import router as auth_router from routers.incidents import router as incidents_router from routers.sources import router as sources_router from routers.notifications import router as notifications_router from routers.feedback import router as feedback_router from routers.public_api import router as public_api_router from routers.chat import router as chat_router from routers.tutorial import router as tutorial_router from routes.version_router import router as version_router app.include_router(auth_router) app.include_router(incidents_router) app.include_router(sources_router) app.include_router(notifications_router) app.include_router(feedback_router) app.include_router(public_api_router) app.include_router(chat_router, prefix="/api/chat") app.include_router(tutorial_router) app.include_router(version_router) @app.websocket("/api/ws") async def websocket_endpoint(websocket: WebSocket): """WebSocket-Endpunkt für Echtzeit-Updates.""" await websocket.accept() # Token als erste Nachricht empfangen (nicht in URL) try: token = await asyncio.wait_for(websocket.receive_text(), timeout=5.0) except asyncio.TimeoutError: try: await websocket.close(code=4001, reason="Token fehlt") except Exception: pass return except WebSocketDisconnect: return try: payload = decode_token(token) user_id = int(payload["sub"]) except Exception: try: await websocket.close(code=4001, reason="Token ungültig") except Exception: pass return # Authentifizierung erfolgreich await ws_manager.connect(websocket, user_id) await websocket.send_text("authenticated") try: while True: data = await websocket.receive_text() if data == "ping": await websocket.send_text("pong") except WebSocketDisconnect: ws_manager.disconnect(websocket) # Statische Dateien und Frontend-Routing 
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")


def _static_page(filename):
    """Build a FileResponse for *filename* located in STATIC_DIR."""
    page_path = os.path.join(STATIC_DIR, filename)
    return FileResponse(page_path)


@app.get("/")
async def root():
    """Serve the login page (index.html)."""
    return _static_page("index.html")


@app.get("/dashboard")
async def dashboard():
    """Serve the dashboard page (dashboard.html)."""
    return _static_page("dashboard.html")


if __name__ == "__main__":
    # Local development entry point; binds to loopback only.
    import uvicorn

    uvicorn.run(app, host="127.0.0.1", port=8891)