Dateien
AegisSight-Monitor/src/routers/auth.py
Claude Dev c22ae854fe feat: Global-Admin Org-Switcher fuer info@aegis-sight.de
Ermoeglicht dem Global Admin (is_global_admin Flag) zwischen
Organisationen zu wechseln. Neue Endpoints: GET /api/auth/organizations,
POST /api/auth/switch-org. Org-Dropdown im Header-Menue, nur fuer
Global Admin sichtbar. Komplett herausnehmbar (Flag + Code-Bloecke).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 22:25:41 +02:00

279 Zeilen
9.6 KiB
Python

"""Auth-Router: Magic-Link-Login und Nutzerverwaltung."""
import logging
from datetime import datetime, timedelta
from fastapi import APIRouter, Depends, HTTPException, Request, status
from models import (
MagicLinkRequest,
MagicLinkResponse,
VerifyTokenRequest,
TokenResponse,
UserMeResponse,
)
from auth import (
create_token,
get_current_user,
generate_magic_token,
)
from database import db_dependency
from config import TIMEZONE, MAGIC_LINK_EXPIRE_MINUTES, MAGIC_LINK_BASE_URL
from email_utils.sender import send_email
from email_utils.templates import magic_link_login_email
from email_utils.rate_limiter import magic_link_limiter
import aiosqlite
# Module-level logger for this router, registered under the name "osint.auth".
logger = logging.getLogger("osint.auth")
# Router instance for this module: every route declared on it is served
# under the "/api/auth" prefix and tagged "auth".
router = APIRouter(prefix="/api/auth", tags=["auth"])
@router.post("/magic-link", response_model=MagicLinkResponse)
async def request_magic_link(
    data: MagicLinkRequest,
    request: Request,
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Request a login magic link for the given e-mail address.

    Every outcome (rate-limited, unknown address, inactive user or
    organization, disabled license, link actually sent) returns the same
    response message, so the response body does not reveal which path ran.

    Args:
        data: Request payload; only ``data.email`` is read.
        request: Incoming request, used to determine the client IP.
        db: Database connection supplied by ``db_dependency``.

    Returns:
        A ``MagicLinkResponse`` carrying the fixed neutral message.
    """

    def neutral_reply() -> MagicLinkResponse:
        # A fresh response object per call, always with the same text.
        return MagicLinkResponse(message="Wenn ein Konto existiert, wurde eine E-Mail gesendet.")

    address = data.email.lower().strip()
    client_ip = "unknown"
    if request.client:
        client_ip = request.client.host

    # Rate-limit check. A rejected attempt is logged but not recorded again.
    allowed, reason = magic_link_limiter.check(address, client_ip)
    if not allowed:
        logger.warning(f"Rate-Limit für {address} von {client_ip}: {reason}")
        return neutral_reply()

    # Look up the user together with the state of their organization.
    user_query = (
        "SELECT u.id, u.email, u.username, u.role, u.organization_id, u.is_active,\n"
        "o.is_active as org_active, o.slug as org_slug\n"
        "FROM users u\n"
        "JOIN organizations o ON o.id = u.organization_id\n"
        "WHERE LOWER(u.email) = ?"
    )
    lookup = await db.execute(user_query, (address,))
    account = await lookup.fetchone()

    # Unknown address, deactivated user or deactivated organization:
    # record the attempt and answer neutrally.
    if not account or not account["is_active"] or not account["org_active"]:
        magic_link_limiter.record(address, client_ip)
        return neutral_reply()

    # License check (imported lazily, only once a usable account was found).
    from services.license_service import check_license

    license_state = await check_license(db, account["organization_id"])
    if license_state.get("status") == "org_disabled":
        magic_link_limiter.record(address, client_ip)
        return neutral_reply()

    # Create the token and its expiry timestamp.
    token = generate_magic_token()
    valid_until = datetime.now(TIMEZONE) + timedelta(minutes=MAGIC_LINK_EXPIRE_MINUTES)
    expires_at = valid_until.strftime('%Y-%m-%d %H:%M:%S')

    # Invalidate older, still unused magic links for this address ...
    await db.execute(
        "UPDATE magic_links SET is_used = 1 WHERE email = ? AND is_used = 0",
        (address,),
    )
    # ... and store the new one.
    insert_sql = (
        "INSERT INTO magic_links (email, token, code, purpose, user_id, expires_at, ip_address)\n"
        "VALUES (?, ?, '', 'login', ?, ?, ?)"
    )
    await db.execute(
        insert_sql,
        (address, token, account["id"], expires_at, client_ip),
    )
    await db.commit()

    # Send the e-mail; the part of the stored address before "@" is passed
    # to the template as its first argument.
    link = f"{MAGIC_LINK_BASE_URL}/?token={token}"
    local_part = account["email"].split("@")[0]
    subject, html = magic_link_login_email(local_part, link)
    await send_email(address, subject, html)

    magic_link_limiter.record(address, client_ip)
    return neutral_reply()
@router.post("/verify", response_model=TokenResponse)
async def verify_magic_link(
    data: VerifyTokenRequest,
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Verify a magic-link token (taken from the URL) and issue a JWT.

    The link is consumed with a conditional UPDATE so that a token can be
    redeemed only once, even if several requests present it concurrently.

    Args:
        data: Request payload; only ``data.token`` is read.
        db: Database connection supplied by ``db_dependency``.

    Returns:
        A ``TokenResponse`` with the new access token and the username.

    Raises:
        HTTPException: 400 if the token is unknown, already used or expired;
            403 if the user or the organization is deactivated.
    """
    cursor = await db.execute(
        (
            "SELECT ml.*, u.username, u.email, u.role, u.organization_id, u.is_active,\n"
            "o.slug as org_slug, o.is_active as org_active\n"
            "FROM magic_links ml\n"
            "JOIN users u ON u.id = ml.user_id\n"
            "JOIN organizations o ON o.id = u.organization_id\n"
            "WHERE ml.token = ? AND ml.is_used = 0"
        ),
        (data.token,),
    )
    ml = await cursor.fetchone()
    if not ml:
        raise HTTPException(status_code=400, detail="Ungültiger oder bereits verwendeter Link")

    # Expiry check. A stored timestamp without tzinfo is interpreted in TIMEZONE.
    now = datetime.now(TIMEZONE)
    expires = datetime.fromisoformat(ml["expires_at"])
    if expires.tzinfo is None:
        expires = expires.replace(tzinfo=TIMEZONE)
    if now > expires:
        raise HTTPException(status_code=400, detail="Link abgelaufen. Bitte neuen Link anfordern.")

    if not ml["is_active"] or not ml["org_active"]:
        raise HTTPException(status_code=403, detail="Konto oder Organisation deaktiviert")

    # Consume the link atomically: the extra "is_used = 0" condition makes
    # the UPDATE itself the single-use check. Without it, two requests that
    # both passed the SELECT above could each redeem the same token.
    consume = await db.execute(
        "UPDATE magic_links SET is_used = 1 WHERE id = ? AND is_used = 0",
        (ml["id"],),
    )
    if consume.rowcount != 1:
        # Another request redeemed the link between our SELECT and UPDATE.
        await db.rollback()
        raise HTTPException(status_code=400, detail="Ungültiger oder bereits verwendeter Link")

    # Record the login time.
    await db.execute(
        "UPDATE users SET last_login_at = ? WHERE id = ?",
        (now.isoformat(), ml["user_id"]),
    )
    await db.commit()

    # Read the global-admin flag from the database.
    ga_cursor = await db.execute(
        "SELECT is_global_admin FROM users WHERE id = ?", (ml["user_id"],)
    )
    ga_row = await ga_cursor.fetchone()
    _is_global_admin = bool(ga_row["is_global_admin"]) if ga_row else False

    # Build the JWT.
    token = create_token(
        user_id=ml["user_id"],
        username=ml["username"],
        email=ml["email"],
        role=ml["role"],
        tenant_id=ml["organization_id"],
        org_slug=ml["org_slug"],
        is_global_admin=_is_global_admin,
    )
    return TokenResponse(
        access_token=token,
        username=ml["username"],
    )
@router.get("/me", response_model=UserMeResponse)
async def get_me(
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return the current user together with organization, license and credit data.

    Organization name, license info and credit figures are only looked up
    when the user dict carries a truthy ``tenant_id``; otherwise the
    defaults (empty name, empty license info, ``None`` credits) are used.

    Args:
        current_user: User dict supplied by ``get_current_user``.
        db: Database connection supplied by ``db_dependency``.

    Returns:
        A populated ``UserMeResponse``.
    """
    tenant_id = current_user.get("tenant_id")

    # Defaults for users without a tenant.
    org_name = ""
    license_info = {}
    credits_total = None
    credits_remaining = None
    credits_percent_used = None

    if tenant_id:
        # Organization name.
        org_cursor = await db.execute(
            "SELECT name FROM organizations WHERE id = ?",
            (tenant_id,),
        )
        org_row = await org_cursor.fetchone()
        if org_row:
            org_name = org_row["name"]

        # License status.
        from services.license_service import check_license

        license_info = await check_license(db, tenant_id)

        # Credit figures from the most recent active license row.
        lic_cursor = await db.execute(
            "SELECT credits_total, credits_used, cost_per_credit FROM licenses WHERE organization_id = ? AND status = 'active' ORDER BY id DESC LIMIT 1",
            (tenant_id,),
        )
        lic_row = await lic_cursor.fetchone()
        if lic_row and lic_row["credits_total"]:
            credits_total = lic_row["credits_total"]
            used = lic_row["credits_used"] or 0
            credits_remaining = max(0, int(credits_total - used))
            if credits_total > 0:
                # Percentage is capped at 100 and rounded to one decimal.
                credits_percent_used = round(min(100, (used / credits_total) * 100), 1)
            else:
                credits_percent_used = 0

    return UserMeResponse(
        id=current_user["id"],
        username=current_user["username"],
        email=current_user.get("email", ""),
        role=current_user["role"],
        org_name=org_name,
        org_slug=current_user.get("org_slug", ""),
        tenant_id=tenant_id,
        license_status=license_info.get("status", "unknown"),
        license_type=license_info.get("license_type", ""),
        read_only=license_info.get("read_only", False),
        credits_total=credits_total,
        credits_remaining=credits_remaining,
        credits_percent_used=credits_percent_used,
        is_global_admin=current_user.get("is_global_admin", False),
    )
# --- Global admin: organization switching (removable feature block) ---
from models import SwitchOrgRequest, OrgListItem
@router.get("/organizations")
async def list_all_organizations(
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """List all organizations, ordered by name (global admin only).

    Args:
        current_user: User dict supplied by ``get_current_user``.
        db: Database connection supplied by ``db_dependency``.

    Returns:
        One dict per organization row with the selected columns
        ``id``, ``name``, ``slug`` and ``is_active``.

    Raises:
        HTTPException: 403 if the caller lacks the ``is_global_admin`` flag.
    """
    is_admin = current_user.get("is_global_admin")
    if not is_admin:
        raise HTTPException(status_code=403, detail="Keine Berechtigung")

    query = "SELECT id, name, slug, is_active FROM organizations ORDER BY name"
    result = await db.execute(query)
    organizations = await result.fetchall()
    return list(map(dict, organizations))
@router.post("/switch-org")
async def switch_organization(
    data: SwitchOrgRequest,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Switch the active organization (global admin only) and return a new JWT.

    Args:
        data: Request payload; only ``data.organization_id`` is read.
        current_user: User dict supplied by ``get_current_user``.
        db: Database connection supplied by ``db_dependency``.

    Returns:
        A dict with ``access_token``, ``token_type``, ``org_name`` and
        ``org_slug`` for the target organization.

    Raises:
        HTTPException: 403 if the caller is not an active global admin;
            404 if the target organization does not exist.
    """
    if not current_user.get("is_global_admin"):
        raise HTTPException(status_code=403, detail="Keine Berechtigung")

    # Re-check the privilege against the database before minting a new token.
    # NOTE(review): this assumes get_current_user may not re-read the users
    # table on every request; in that case the check stops a revoked or
    # deactivated admin from renewing access through this endpoint.
    admin_cursor = await db.execute(
        "SELECT is_active, is_global_admin FROM users WHERE id = ?",
        (current_user["id"],),
    )
    admin_row = await admin_cursor.fetchone()
    if not admin_row or not admin_row["is_active"] or not admin_row["is_global_admin"]:
        raise HTTPException(status_code=403, detail="Keine Berechtigung")

    # Validate the target organization.
    cursor = await db.execute(
        "SELECT id, name, slug FROM organizations WHERE id = ?", (data.organization_id,)
    )
    org = await cursor.fetchone()
    if not org:
        raise HTTPException(status_code=404, detail="Organisation nicht gefunden")

    # Issue a new JWT that carries the target organization as tenant.
    # The e-mail is read with a default, as in get_me, so a user dict
    # without an "email" key does not raise KeyError.
    token = create_token(
        user_id=current_user["id"],
        username=current_user["username"],
        email=current_user.get("email", ""),
        role=current_user["role"],
        tenant_id=org["id"],
        org_slug=org["slug"],
        is_global_admin=True,
    )
    return {
        "access_token": token,
        "token_type": "bearer",
        "org_name": org["name"],
        "org_slug": org["slug"],
    }