- ALLE Timestamps einheitlich Europe/Berlin (kein UTC mehr) - DB-Migration: 1704 bestehende Timestamps von UTC nach Berlin konvertiert - Auto-Refresh Timer Fix: ORDER BY id DESC statt completed_at DESC (verhindert falsche Sortierung bei gemischten Timestamp-Formaten) - started_at statt completed_at fuer Timer-Vergleich (konsistenter) - Manuelle Refreshes werden bei Intervall-Pruefung beruecksichtigt - Debug-Logging fuer Auto-Refresh Entscheidungen - astimezone() fuer Timestamps mit Offset-Info Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
84 Zeilen
2.3 KiB
Python
84 Zeilen
2.3 KiB
Python
"""JWT-Authentifizierung mit Magic-Link-Support und Multi-Tenancy."""
|
|
import secrets
|
|
import string
|
|
from datetime import datetime, timedelta
|
|
from jose import jwt, JWTError
|
|
from fastapi import Depends, HTTPException, status
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
from config import JWT_SECRET, JWT_ALGORITHM, JWT_EXPIRE_HOURS, TIMEZONE
|
|
|
|
security = HTTPBearer()
|
|
|
|
|
|
JWT_ISSUER = "intelsight-osint"
|
|
JWT_AUDIENCE = "intelsight-osint"
|
|
|
|
|
|
def create_token(
|
|
user_id: int,
|
|
username: str,
|
|
email: str,
|
|
role: str = "member",
|
|
tenant_id: int = None,
|
|
org_slug: str = None,
|
|
) -> str:
|
|
"""JWT-Token erstellen mit Tenant-Kontext."""
|
|
now = datetime.now(TIMEZONE)
|
|
expire = now + timedelta(hours=JWT_EXPIRE_HOURS)
|
|
payload = {
|
|
"sub": str(user_id),
|
|
"username": username,
|
|
"email": email,
|
|
"role": role,
|
|
"tenant_id": tenant_id,
|
|
"org_slug": org_slug,
|
|
"iss": JWT_ISSUER,
|
|
"aud": JWT_AUDIENCE,
|
|
"iat": now,
|
|
"exp": expire,
|
|
}
|
|
return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
|
|
|
|
|
|
def decode_token(token: str) -> dict:
|
|
"""JWT-Token dekodieren und validieren."""
|
|
try:
|
|
payload = jwt.decode(
|
|
token,
|
|
JWT_SECRET,
|
|
algorithms=[JWT_ALGORITHM],
|
|
issuer=JWT_ISSUER,
|
|
audience=JWT_AUDIENCE,
|
|
)
|
|
return payload
|
|
except JWTError:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Token ungueltig oder abgelaufen",
|
|
)
|
|
|
|
|
|
async def get_current_user(
|
|
credentials: HTTPAuthorizationCredentials = Depends(security),
|
|
) -> dict:
|
|
"""FastAPI Dependency: Aktuellen Nutzer aus Token extrahieren."""
|
|
payload = decode_token(credentials.credentials)
|
|
return {
|
|
"id": int(payload["sub"]),
|
|
"username": payload["username"],
|
|
"email": payload.get("email", ""),
|
|
"role": payload.get("role", "member"),
|
|
"tenant_id": payload.get("tenant_id"),
|
|
"org_slug": payload.get("org_slug"),
|
|
}
|
|
|
|
|
|
def generate_magic_token() -> str:
|
|
"""Generiert einen 64-Zeichen URL-safe Token."""
|
|
return secrets.token_urlsafe(48)
|
|
|
|
|
|
def generate_magic_code() -> str:
|
|
"""Generiert einen 6-stelligen numerischen Code."""
|
|
return ''.join(secrets.choice(string.digits) for _ in range(6))
|