Dateien
AegisSight-Monitor/src/auth.py
claude-dev a69352575d Fix: Komplett auf Europe/Berlin + DB-Migration + Timer-Fix
- ALLE Timestamps einheitlich Europe/Berlin (kein UTC mehr)
- DB-Migration: 1704 bestehende Timestamps von UTC nach Berlin konvertiert
- Auto-Refresh Timer Fix: ORDER BY id DESC statt completed_at DESC
  (verhindert falsche Sortierung bei gemischten Timestamp-Formaten)
- started_at statt completed_at fuer Timer-Vergleich (konsistenter)
- Manuelle Refreshes werden bei Intervall-Pruefung beruecksichtigt
- Debug-Logging fuer Auto-Refresh Entscheidungen
- astimezone() fuer Timestamps mit Offset-Info

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-07 02:56:51 +01:00

84 Zeilen
2.3 KiB
Python

"""JWT-Authentifizierung mit Magic-Link-Support und Multi-Tenancy."""
import secrets
import string
from datetime import datetime, timedelta
from jose import jwt, JWTError
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from config import JWT_SECRET, JWT_ALGORITHM, JWT_EXPIRE_HOURS, TIMEZONE
# Bearer-token security scheme; injected into get_current_user via Depends().
security = HTTPBearer()
# Written into the "iss" claim by create_token and checked by decode_token.
JWT_ISSUER = "intelsight-osint"
# Written into the "aud" claim by create_token and checked by decode_token.
JWT_AUDIENCE = "intelsight-osint"
def create_token(
    user_id: int,
    username: str,
    email: str,
    role: str = "member",
    tenant_id: int = None,
    org_slug: str = None,
) -> str:
    """Build and sign a JWT carrying the user's identity and tenant context.

    Args:
        user_id: User identifier; stored as a string in the ``sub`` claim.
        username: Stored in the ``username`` claim.
        email: Stored in the ``email`` claim.
        role: Stored in the ``role`` claim (defaults to ``"member"``).
        tenant_id: Stored in the ``tenant_id`` claim; may be ``None``.
        org_slug: Stored in the ``org_slug`` claim; may be ``None``.

    Returns:
        The token produced by ``jwt.encode`` using ``JWT_SECRET`` and
        ``JWT_ALGORITHM``.
    """
    issued_at = datetime.now(TIMEZONE)
    expires_at = issued_at + timedelta(hours=JWT_EXPIRE_HOURS)

    # Application-specific claims first ...
    claims = {
        "sub": str(user_id),
        "username": username,
        "email": email,
        "role": role,
        "tenant_id": tenant_id,
        "org_slug": org_slug,
    }
    # ... then the registered claims, appended in the same order as before so
    # the serialized payload is unchanged.
    claims.update(
        iss=JWT_ISSUER,
        aud=JWT_AUDIENCE,
        iat=issued_at,
        exp=expires_at,
    )
    return jwt.encode(claims, JWT_SECRET, algorithm=JWT_ALGORITHM)
def decode_token(token: str) -> dict:
    """Decode a JWT and validate it against the configured secret.

    The token is decoded with ``JWT_SECRET`` / ``JWT_ALGORITHM`` and its
    issuer and audience are checked against ``JWT_ISSUER`` and
    ``JWT_AUDIENCE``.

    Args:
        token: The encoded JWT string.

    Returns:
        The decoded claims as returned by ``jwt.decode``.

    Raises:
        HTTPException: 401 when ``jwt.decode`` raises ``JWTError``. The
            original error is attached as ``__cause__`` for debugging.
    """
    try:
        return jwt.decode(
            token,
            JWT_SECRET,
            algorithms=[JWT_ALGORITHM],
            issuer=JWT_ISSUER,
            audience=JWT_AUDIENCE,
        )
    except JWTError as exc:
        # A 401 response must name the expected auth scheme (RFC 7235 §3.1);
        # chaining with "from exc" keeps the underlying reason in tracebacks.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token ungueltig oder abgelaufen",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> dict:
    """FastAPI dependency: extract the current user from the bearer token.

    Args:
        credentials: Bearer credentials supplied by the ``security`` scheme.

    Returns:
        A dict with the keys ``id``, ``username``, ``email``, ``role``,
        ``tenant_id`` and ``org_slug`` taken from the token claims.

    Raises:
        HTTPException: 401 when the token cannot be decoded (raised by
            ``decode_token``) or when the required ``sub`` / ``username``
            claims are missing or ``sub`` is not an integer.
    """
    payload = decode_token(credentials.credentials)

    # A validly signed token may still lack the claims this application needs;
    # treat that as an invalid token (401) instead of letting a KeyError or
    # ValueError surface as an internal server error.
    try:
        user_id = int(payload["sub"])
        username = payload["username"]
    except (KeyError, TypeError, ValueError) as exc:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token ungueltig oder abgelaufen",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc

    return {
        "id": user_id,
        "username": username,
        "email": payload.get("email", ""),
        "role": payload.get("role", "member"),
        "tenant_id": payload.get("tenant_id"),
        "org_slug": payload.get("org_slug"),
    }
def generate_magic_token() -> str:
    """Return a random 64-character URL-safe token for magic links."""
    # 48 random bytes encode to exactly 64 base64url characters (no padding).
    entropy_bytes = 48
    magic_token = secrets.token_urlsafe(entropy_bytes)
    return magic_token
def generate_magic_code() -> str:
    """Return a random 6-digit numeric code as a string (leading zeros kept)."""
    picked = []
    for _ in range(6):
        # Each digit is drawn independently from the CSPRNG-backed chooser.
        picked.append(secrets.choice(string.digits))
    return ''.join(picked)