Inkonsistenz behoben: Manche Timestamps wurden in UTC, andere in Berlin-Zeit gespeichert. Das fuehrte zu Fehlern beim Auto-Refresh und Faktencheck, da Zeitvergleiche falsche Ergebnisse lieferten. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
334 Zeilen
11 KiB
Python
334 Zeilen
11 KiB
Python
"""OSINT Lagemonitor - Hauptanwendung."""
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
from contextlib import asynccontextmanager
|
|
from datetime import datetime
|
|
from typing import Dict
|
|
|
|
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends, Request, Response
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.responses import FileResponse, RedirectResponse
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
from starlette.middleware.base import BaseHTTPMiddleware
|
|
|
|
from config import STATIC_DIR, LOG_DIR, DATA_DIR, TIMEZONE
|
|
from database import init_db, get_db
|
|
from auth import decode_token
|
|
from agents.orchestrator import orchestrator
|
|
|
|
# Logging
|
|
os.makedirs(LOG_DIR, exist_ok=True)
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
|
|
handlers=[
|
|
logging.StreamHandler(sys.stdout),
|
|
logging.FileHandler(os.path.join(LOG_DIR, "osint-monitor.log")),
|
|
],
|
|
)
|
|
logger = logging.getLogger("osint")
|
|
|
|
|
|
class WebSocketManager:
|
|
"""Verwaltet WebSocket-Verbindungen für Echtzeit-Updates."""
|
|
|
|
def __init__(self):
|
|
self._connections: Dict[WebSocket, int] = {} # ws -> user_id
|
|
|
|
async def connect(self, websocket: WebSocket, user_id: int):
|
|
self._connections[websocket] = user_id
|
|
logger.info(f"WebSocket verbunden (User {user_id}, {len(self._connections)} aktiv)")
|
|
|
|
def disconnect(self, websocket: WebSocket):
|
|
self._connections.pop(websocket, None)
|
|
logger.info(f"WebSocket getrennt ({len(self._connections)} aktiv)")
|
|
|
|
async def broadcast(self, message: dict):
|
|
"""Nachricht an alle verbundenen Clients senden."""
|
|
if not self._connections:
|
|
return
|
|
data = json.dumps(message, ensure_ascii=False)
|
|
disconnected = []
|
|
for ws in self._connections:
|
|
try:
|
|
await ws.send_text(data)
|
|
except Exception:
|
|
disconnected.append(ws)
|
|
for ws in disconnected:
|
|
self._connections.pop(ws, None)
|
|
|
|
async def broadcast_for_incident(self, message: dict, visibility: str, created_by: int, tenant_id: int = None):
|
|
"""Nachricht nur an berechtigte Clients senden (private Lagen → nur Ersteller)."""
|
|
if not self._connections:
|
|
return
|
|
data = json.dumps(message, ensure_ascii=False)
|
|
disconnected = []
|
|
for ws, user_id in self._connections.items():
|
|
if visibility == "private" and user_id != created_by:
|
|
continue
|
|
try:
|
|
await ws.send_text(data)
|
|
except Exception:
|
|
disconnected.append(ws)
|
|
for ws in disconnected:
|
|
self._connections.pop(ws, None)
|
|
|
|
|
|
ws_manager = WebSocketManager()
|
|
|
|
# Scheduler für Auto-Refresh
|
|
scheduler = AsyncIOScheduler()
|
|
|
|
|
|
async def check_auto_refresh():
|
|
"""Prüft welche Lagen einen Auto-Refresh brauchen."""
|
|
db = await get_db()
|
|
try:
|
|
cursor = await db.execute(
|
|
"SELECT id, refresh_interval FROM incidents WHERE status = 'active' AND refresh_mode = 'auto'"
|
|
)
|
|
incidents = await cursor.fetchall()
|
|
|
|
now = datetime.now(TIMEZONE)
|
|
|
|
for incident in incidents:
|
|
incident_id = incident["id"]
|
|
interval = incident["refresh_interval"]
|
|
|
|
# Nur letzten AUTO-Refresh prüfen (manuelle Refreshs ignorieren)
|
|
cursor = await db.execute(
|
|
"SELECT completed_at FROM refresh_log WHERE incident_id = ? AND trigger_type = 'auto' AND status = 'completed' ORDER BY completed_at DESC LIMIT 1",
|
|
(incident_id,),
|
|
)
|
|
last_refresh = await cursor.fetchone()
|
|
|
|
should_refresh = False
|
|
if not last_refresh:
|
|
should_refresh = True
|
|
else:
|
|
last_time = datetime.fromisoformat(last_refresh["completed_at"])
|
|
if last_time.tzinfo is None:
|
|
last_time = last_time.replace(tzinfo=TIMEZONE)
|
|
elapsed = (now - last_time).total_seconds() / 60
|
|
if elapsed >= interval:
|
|
should_refresh = True
|
|
|
|
if should_refresh:
|
|
# Prüfen ob bereits ein laufender Refresh existiert
|
|
cursor = await db.execute(
|
|
"SELECT id FROM refresh_log WHERE incident_id = ? AND status = 'running' LIMIT 1",
|
|
(incident_id,),
|
|
)
|
|
if await cursor.fetchone():
|
|
continue # Laufender Refresh — überspringen
|
|
|
|
await orchestrator.enqueue_refresh(incident_id, trigger_type="auto")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Auto-Refresh Check Fehler: {e}")
|
|
finally:
|
|
await db.close()
|
|
|
|
|
|
async def cleanup_expired():
|
|
"""Bereinigt abgelaufene Lagen basierend auf retention_days."""
|
|
db = await get_db()
|
|
try:
|
|
cursor = await db.execute(
|
|
"SELECT id, retention_days, created_at FROM incidents WHERE retention_days > 0 AND status = 'active'"
|
|
)
|
|
incidents = await cursor.fetchall()
|
|
|
|
now = datetime.now(TIMEZONE)
|
|
for incident in incidents:
|
|
created = datetime.fromisoformat(incident["created_at"])
|
|
if created.tzinfo is None:
|
|
created = created.replace(tzinfo=TIMEZONE)
|
|
age_days = (now - created).days
|
|
if age_days >= incident["retention_days"]:
|
|
await db.execute(
|
|
"UPDATE incidents SET status = 'archived' WHERE id = ?",
|
|
(incident["id"],),
|
|
)
|
|
logger.info(f"Lage {incident['id']} archiviert (Aufbewahrung abgelaufen)")
|
|
|
|
# Verwaiste running-Einträge bereinigen (> 15 Minuten ohne Abschluss)
|
|
cursor = await db.execute(
|
|
"SELECT id, incident_id, started_at FROM refresh_log WHERE status = 'running'"
|
|
)
|
|
orphans = await cursor.fetchall()
|
|
for orphan in orphans:
|
|
started = datetime.fromisoformat(orphan["started_at"])
|
|
if started.tzinfo is None:
|
|
started = started.replace(tzinfo=TIMEZONE)
|
|
age_minutes = (now - started).total_seconds() / 60
|
|
if age_minutes >= 15:
|
|
await db.execute(
|
|
"UPDATE refresh_log SET status = 'error', completed_at = ?, error_message = ? WHERE id = ?",
|
|
(now.isoformat(), f"Verwaist (>{int(age_minutes)} Min ohne Abschluss, automatisch bereinigt)", orphan["id"]),
|
|
)
|
|
logger.warning(f"Verwaisten Refresh #{orphan['id']} für Lage {orphan['incident_id']} bereinigt ({int(age_minutes)} Min)")
|
|
|
|
# Alte Notifications bereinigen (> 7 Tage)
|
|
await db.execute("DELETE FROM notifications WHERE created_at < datetime('now', '-7 days')")
|
|
|
|
await db.commit()
|
|
except Exception as e:
|
|
logger.error(f"Cleanup Fehler: {e}")
|
|
finally:
|
|
await db.close()
|
|
|
|
|
|
@asynccontextmanager
|
|
async def lifespan(app: FastAPI):
|
|
"""App Startup/Shutdown."""
|
|
# Startup
|
|
os.makedirs(DATA_DIR, exist_ok=True)
|
|
await init_db()
|
|
|
|
# Verwaiste running-Einträge beim Start bereinigen
|
|
db = await get_db()
|
|
try:
|
|
result = await db.execute(
|
|
"UPDATE refresh_log SET status = 'error', completed_at = ?, error_message = 'Verwaist (Neustart, automatisch bereinigt)' WHERE status = 'running'",
|
|
(datetime.now(TIMEZONE).isoformat(),),
|
|
)
|
|
if result.rowcount > 0:
|
|
await db.commit()
|
|
logger.warning(f"{result.rowcount} verwaiste running-Einträge beim Start bereinigt")
|
|
finally:
|
|
await db.close()
|
|
|
|
orchestrator.set_ws_manager(ws_manager)
|
|
await orchestrator.start()
|
|
|
|
scheduler.add_job(check_auto_refresh, "interval", minutes=1, id="auto_refresh")
|
|
scheduler.add_job(cleanup_expired, "interval", hours=1, id="cleanup")
|
|
scheduler.start()
|
|
|
|
logger.info("OSINT Lagemonitor gestartet")
|
|
yield
|
|
|
|
# Shutdown
|
|
scheduler.shutdown()
|
|
await orchestrator.stop()
|
|
logger.info("OSINT Lagemonitor gestoppt")
|
|
|
|
|
|
app = FastAPI(
|
|
title="OSINT Lagemonitor",
|
|
version="1.0.0",
|
|
lifespan=lifespan,
|
|
)
|
|
|
|
|
|
# Security-Headers Middleware
|
|
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
|
|
async def dispatch(self, request: Request, call_next):
|
|
response = await call_next(request)
|
|
response.headers["Content-Security-Policy"] = (
|
|
"default-src 'self'; "
|
|
"script-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net; "
|
|
"style-src 'self' 'unsafe-inline' https://fonts.googleapis.com https://cdn.jsdelivr.net; "
|
|
"font-src 'self' https://fonts.gstatic.com; "
|
|
"img-src 'self' data: https://tile.openstreetmap.de; "
|
|
"connect-src 'self' wss: ws:; "
|
|
"frame-ancestors 'none'"
|
|
)
|
|
response.headers["Permissions-Policy"] = (
|
|
"camera=(), microphone=(), geolocation=(), payment=()"
|
|
)
|
|
return response
|
|
|
|
|
|
app.add_middleware(SecurityHeadersMiddleware)
|
|
|
|
# CORS
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=["https://osint.intelsight.de"],
|
|
allow_credentials=True,
|
|
allow_methods=["GET", "POST", "PUT", "DELETE"],
|
|
allow_headers=["Authorization", "Content-Type"],
|
|
)
|
|
|
|
# Router einbinden
|
|
from routers.auth import router as auth_router
|
|
from routers.incidents import router as incidents_router
|
|
from routers.sources import router as sources_router
|
|
from routers.notifications import router as notifications_router
|
|
from routers.feedback import router as feedback_router
|
|
from routers.public_api import router as public_api_router
|
|
|
|
app.include_router(auth_router)
|
|
app.include_router(incidents_router)
|
|
app.include_router(sources_router)
|
|
app.include_router(notifications_router)
|
|
app.include_router(feedback_router)
|
|
app.include_router(public_api_router)
|
|
|
|
|
|
@app.websocket("/api/ws")
|
|
async def websocket_endpoint(websocket: WebSocket):
|
|
"""WebSocket-Endpunkt für Echtzeit-Updates."""
|
|
await websocket.accept()
|
|
|
|
# Token als erste Nachricht empfangen (nicht in URL)
|
|
try:
|
|
token = await asyncio.wait_for(websocket.receive_text(), timeout=5.0)
|
|
except asyncio.TimeoutError:
|
|
try:
|
|
await websocket.close(code=4001, reason="Token fehlt")
|
|
except Exception:
|
|
pass
|
|
return
|
|
except WebSocketDisconnect:
|
|
return
|
|
|
|
try:
|
|
payload = decode_token(token)
|
|
user_id = int(payload["sub"])
|
|
except Exception:
|
|
try:
|
|
await websocket.close(code=4001, reason="Token ungültig")
|
|
except Exception:
|
|
pass
|
|
return
|
|
|
|
# Authentifizierung erfolgreich
|
|
await ws_manager.connect(websocket, user_id)
|
|
await websocket.send_text("authenticated")
|
|
|
|
try:
|
|
while True:
|
|
data = await websocket.receive_text()
|
|
if data == "ping":
|
|
await websocket.send_text("pong")
|
|
except WebSocketDisconnect:
|
|
ws_manager.disconnect(websocket)
|
|
|
|
|
|
# Statische Dateien und Frontend-Routing
|
|
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
|
|
|
|
|
|
@app.get("/")
|
|
async def root():
|
|
"""Login-Seite ausliefern."""
|
|
return FileResponse(os.path.join(STATIC_DIR, "index.html"))
|
|
|
|
|
|
@app.get("/dashboard")
|
|
async def dashboard():
|
|
"""Dashboard ausliefern."""
|
|
return FileResponse(os.path.join(STATIC_DIR, "dashboard.html"))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import uvicorn
|
|
uvicorn.run(app, host="127.0.0.1", port=8891)
|