Dateien
AegisSight-Monitor/src/routers/auth.py
Claude Dev 7bfa1d29cf feat: Credits-System mit Verbrauchsanzeige im User-Dropdown
- DB-Migration: credits_total/credits_used/cost_per_credit auf licenses, token_usage_monthly Tabelle
- Orchestrator: Monatliche Token-Aggregation + Credits-Abzug nach Refresh
- Auth: Credits-Daten im /me Endpoint + Bugfix fehlende Klammer in get()
- Frontend: Credits-Balken im User-Dropdown mit Farbwechsel

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-17 23:53:19 +01:00

294 Zeilen
10 KiB
Python

"""Auth-Router: Magic-Link-Login und Nutzerverwaltung."""
import logging
from datetime import datetime, timedelta
from fastapi import APIRouter, Depends, HTTPException, Request, status
from models import (
MagicLinkRequest,
MagicLinkResponse,
VerifyTokenRequest,
VerifyCodeRequest,
TokenResponse,
UserMeResponse,
)
from auth import (
create_token,
get_current_user,
generate_magic_token,
generate_magic_code,
)
from database import db_dependency
from config import TIMEZONE, MAGIC_LINK_EXPIRE_MINUTES, MAGIC_LINK_BASE_URL
from email_utils.sender import send_email
from email_utils.templates import magic_link_login_email
from email_utils.rate_limiter import magic_link_limiter, verify_code_limiter
import aiosqlite
# Module-level logger; records from this router are emitted under the "osint.auth" name.
logger = logging.getLogger("osint.auth")
# Router for this module: every route declared below is registered under the
# "/api/auth" prefix and carries the "auth" tag.
router = APIRouter(prefix="/api/auth", tags=["auth"])
@router.post("/magic-link", response_model=MagicLinkResponse)
async def request_magic_link(
    data: MagicLinkRequest,
    request: Request,
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Request a magic link; e-mails a login link plus a code to the user.

    The same neutral message is returned on every path (rate-limited,
    unknown address, inactive user/organization, disabled license, success),
    so the response body does not reveal whether an account exists.

    Args:
        data: Request body carrying the e-mail address.
        request: Incoming request; only the client host is read from it.
        db: Database connection provided by ``db_dependency``.

    Returns:
        A ``MagicLinkResponse`` with the neutral message.
    """

    def neutral_reply():
        # Build a fresh response object for each exit path.
        return MagicLinkResponse(message="Wenn ein Konto existiert, wurde eine E-Mail gesendet.")

    email = data.email.lower().strip()
    ip = request.client.host if request.client else "unknown"

    # Rate-limit check: still answer with the neutral message (no information leak).
    allowed, reason = magic_link_limiter.check(email, ip)
    if not allowed:
        logger.warning(f"Rate-Limit fuer {email} von {ip}: {reason}")
        return neutral_reply()

    # Look up the user together with the state of their organization.
    user_cursor = await db.execute(
        """SELECT u.id, u.email, u.username, u.role, u.organization_id, u.is_active,
                  o.is_active as org_active, o.slug as org_slug
           FROM users u
           JOIN organizations o ON o.id = u.organization_id
           WHERE LOWER(u.email) = ?""",
        (email,),
    )
    user = await user_cursor.fetchone()

    # Unknown address, deactivated user or deactivated organization: count the
    # attempt and bail out without sending anything.
    if not user or not user["is_active"] or not user["org_active"]:
        magic_link_limiter.record(email, ip)
        return neutral_reply()

    # License check (imported at call time, as in the rest of this module).
    from services.license_service import check_license

    license_state = await check_license(db, user["organization_id"])
    if license_state.get("status") == "org_disabled":
        magic_link_limiter.record(email, ip)
        return neutral_reply()

    # Generate token + code and compute the expiry timestamp.
    token = generate_magic_token()
    code = generate_magic_code()
    expiry = datetime.now(TIMEZONE) + timedelta(minutes=MAGIC_LINK_EXPIRE_MINUTES)
    expires_at = expiry.strftime('%Y-%m-%d %H:%M:%S')

    # Invalidate older, still unused magic links for this address ...
    await db.execute(
        "UPDATE magic_links SET is_used = 1 WHERE email = ? AND is_used = 0",
        (email,),
    )
    # ... and store the new one.
    await db.execute(
        """INSERT INTO magic_links (email, token, code, purpose, user_id, expires_at, ip_address)
           VALUES (?, ?, ?, 'login', ?, ?, ?)""",
        (email, token, code, user["id"], expires_at, ip),
    )
    await db.commit()

    # Send the e-mail; the local part of the stored address is passed to the template.
    link = f"{MAGIC_LINK_BASE_URL}/?token={token}"
    local_part = user["email"].split("@")[0]
    subject, html = magic_link_login_email(local_part, code, link)
    await send_email(email, subject, html)

    magic_link_limiter.record(email, ip)
    return neutral_reply()
@router.post("/verify", response_model=TokenResponse)
async def verify_magic_link(
    data: VerifyTokenRequest,
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Verify a magic link (token taken from the URL) and issue an access token.

    Args:
        data: Request body carrying the magic-link token.
        db: Database connection provided by ``db_dependency``.

    Returns:
        A ``TokenResponse`` with the created token and the username.

    Raises:
        HTTPException: 400 if the token is unknown, already used or expired;
            403 if the user or the organization is deactivated.
    """
    # u.email is aliased so it cannot collide with magic_links.email coming in
    # through ml.* (same aliasing as the code-verification endpoint).
    cursor = await db.execute(
        """SELECT ml.*, u.username, u.email as user_email, u.role, u.organization_id, u.is_active,
                  o.slug as org_slug, o.is_active as org_active
           FROM magic_links ml
           JOIN users u ON u.id = ml.user_id
           JOIN organizations o ON o.id = u.organization_id
           WHERE ml.token = ? AND ml.is_used = 0""",
        (data.token,),
    )
    ml = await cursor.fetchone()
    if not ml:
        raise HTTPException(status_code=400, detail="Ungueltiger oder bereits verwendeter Link")

    # Expiry check; a stored timestamp without zone info is interpreted in TIMEZONE.
    now = datetime.now(TIMEZONE)
    expires = datetime.fromisoformat(ml["expires_at"])
    if expires.tzinfo is None:
        expires = expires.replace(tzinfo=TIMEZONE)
    if now > expires:
        raise HTTPException(status_code=400, detail="Link abgelaufen. Bitte neuen Link anfordern.")

    if not ml["is_active"] or not ml["org_active"]:
        raise HTTPException(status_code=403, detail="Konto oder Organisation deaktiviert")

    # Consume the link atomically: the extra "is_used = 0" condition ensures
    # that only one of several concurrent requests can claim a single-use
    # token; the others see zero affected rows and are rejected.
    claim = await db.execute(
        "UPDATE magic_links SET is_used = 1 WHERE id = ? AND is_used = 0",
        (ml["id"],),
    )
    if claim.rowcount < 1:
        # Nothing was changed; end the implicitly opened transaction.
        await db.rollback()
        raise HTTPException(status_code=400, detail="Ungueltiger oder bereits verwendeter Link")

    # Update the last-login timestamp.
    await db.execute(
        "UPDATE users SET last_login_at = ? WHERE id = ?",
        (now.isoformat(), ml["user_id"]),
    )
    await db.commit()

    # Create the JWT.
    token = create_token(
        user_id=ml["user_id"],
        username=ml["username"],
        email=ml["user_email"],
        role=ml["role"],
        tenant_id=ml["organization_id"],
        org_slug=ml["org_slug"],
    )
    return TokenResponse(
        access_token=token,
        username=ml["username"],
    )
@router.post("/verify-code", response_model=TokenResponse)
async def verify_magic_code(
    data: VerifyCodeRequest,
    request: Request,
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Verify a magic code (6-digit code + e-mail) and issue an access token.

    Args:
        data: Request body carrying the e-mail address and the code.
        request: Incoming request; only the client host is read from it.
        db: Database connection provided by ``db_dependency``.

    Returns:
        A ``TokenResponse`` with the created token and the username.

    Raises:
        HTTPException: 429 if the failed-attempt limiter blocks the request;
            400 if the code is wrong, already used or expired; 403 if the
            user or the organization is deactivated.
    """
    email = data.email.lower().strip()
    ip = request.client.host if request.client else "unknown"

    # Brute-force protection: check previous failed attempts.
    allowed, reason = verify_code_limiter.check(email, ip)
    if not allowed:
        logger.warning(f"Verify-Code Rate-Limit fuer {email} von {ip}: {reason}")
        # While blocked, invalidate every open magic link for this address.
        await db.execute(
            "UPDATE magic_links SET is_used = 1 WHERE email = ? AND is_used = 0",
            (email,),
        )
        await db.commit()
        raise HTTPException(status_code=429, detail=reason)

    cursor = await db.execute(
        """SELECT ml.*, u.username, u.email as user_email, u.role, u.organization_id, u.is_active,
                  o.slug as org_slug, o.is_active as org_active
           FROM magic_links ml
           JOIN users u ON u.id = ml.user_id
           JOIN organizations o ON o.id = u.organization_id
           WHERE LOWER(ml.email) = ? AND ml.code = ? AND ml.is_used = 0
           ORDER BY ml.created_at DESC LIMIT 1""",
        (email, data.code),
    )
    ml = await cursor.fetchone()
    if not ml:
        verify_code_limiter.record_failure(email, ip)
        logger.warning(f"Fehlgeschlagener Code-Versuch fuer {email} von {ip}")
        raise HTTPException(status_code=400, detail="Ungueltiger Code")

    # Expiry check; a stored timestamp without zone info is interpreted in TIMEZONE.
    now = datetime.now(TIMEZONE)
    expires = datetime.fromisoformat(ml["expires_at"])
    if expires.tzinfo is None:
        expires = expires.replace(tzinfo=TIMEZONE)
    if now > expires:
        raise HTTPException(status_code=400, detail="Code abgelaufen. Bitte neuen Code anfordern.")

    if not ml["is_active"] or not ml["org_active"]:
        raise HTTPException(status_code=403, detail="Konto oder Organisation deaktiviert")

    # Consume the link atomically: the extra "is_used = 0" condition ensures
    # that only one of several concurrent requests can claim a single-use
    # code; the others see zero affected rows and are rejected. The code
    # itself was correct, so no failed attempt is recorded here.
    claim = await db.execute(
        "UPDATE magic_links SET is_used = 1 WHERE id = ? AND is_used = 0",
        (ml["id"],),
    )
    if claim.rowcount < 1:
        # Nothing was changed; end the implicitly opened transaction.
        await db.rollback()
        raise HTTPException(status_code=400, detail="Ungueltiger Code")

    # Update the last-login timestamp.
    await db.execute(
        "UPDATE users SET last_login_at = ? WHERE id = ?",
        (now.isoformat(), ml["user_id"]),
    )
    await db.commit()

    # Reset the failed-attempt counter after a successful login.
    verify_code_limiter.clear(email)

    token = create_token(
        user_id=ml["user_id"],
        username=ml["username"],
        email=ml["user_email"],
        role=ml["role"],
        tenant_id=ml["organization_id"],
        org_slug=ml["org_slug"],
    )
    return TokenResponse(
        access_token=token,
        username=ml["username"],
    )
@router.get("/me", response_model=UserMeResponse)
async def get_me(
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return the current user together with organization, license and credit data.

    Organization name, license status and credit figures are only looked up
    when the user dict carries a truthy ``tenant_id``; otherwise the response
    falls back to an empty organization name, the license defaults
    ("unknown", "", False) and ``None`` for all credit fields.

    Args:
        current_user: User dict provided by ``get_current_user``.
        db: Database connection provided by ``db_dependency``.

    Returns:
        A ``UserMeResponse`` describing the user.
    """
    tenant_id = current_user.get("tenant_id")

    # Defaults used when there is no tenant or no matching rows.
    org_name = ""
    license_info = {}
    credits_total = None
    credits_remaining = None
    credits_percent_used = None

    if tenant_id:
        # Organization name.
        org_cursor = await db.execute(
            "SELECT name FROM organizations WHERE id = ?",
            (tenant_id,),
        )
        org_row = await org_cursor.fetchone()
        if org_row:
            org_name = org_row["name"]

        # License status.
        from services.license_service import check_license

        license_info = await check_license(db, tenant_id)

        # Credit figures from the most recent active license row.
        lic_cursor = await db.execute(
            "SELECT credits_total, credits_used, cost_per_credit FROM licenses WHERE organization_id = ? AND status = 'active' ORDER BY id DESC LIMIT 1",
            (tenant_id,),
        )
        lic_row = await lic_cursor.fetchone()
        if lic_row and lic_row["credits_total"]:
            credits_total = lic_row["credits_total"]
            used = lic_row["credits_used"] or 0
            # Remaining credits never drop below zero.
            credits_remaining = max(0, int(credits_total - used))
            if credits_total > 0:
                # Usage in percent, capped at 100 and rounded to one decimal.
                credits_percent_used = round(min(100, (used / credits_total) * 100), 1)
            else:
                credits_percent_used = 0

    return UserMeResponse(
        id=current_user["id"],
        username=current_user["username"],
        email=current_user.get("email", ""),
        role=current_user["role"],
        org_name=org_name,
        org_slug=current_user.get("org_slug", ""),
        tenant_id=tenant_id,
        license_status=license_info.get("status", "unknown"),
        license_type=license_info.get("license_type", ""),
        read_only=license_info.get("read_only", False),
        credits_total=credits_total,
        credits_remaining=credits_remaining,
        credits_percent_used=credits_percent_used,
    )