Auth: Verwaltung auf Magic-Link umstellen (Passwort-Login entfernt)

Backend:
- src/routers/auth.py NEU: POST /api/auth/magic-link + POST /api/auth/verify
- src/auth.py: verify_password/hash_password raus, generate_magic_token rein
- src/main.py: alter Login-Endpoint + Brute-Force-Logik raus, neuer auth-Router eingebunden
- src/config.py: ALLOWED_EMAIL + PORTAL_MAGIC_LINK_* hinzu
- src/models.py: LoginRequest raus, MagicLinkRequest etc. rein
- src/email_utils/templates.py: portal_magic_link_email Template

Frontend:
- src/static/index.html: Email-Eingabe statt Passwort, Token-Verify-Logik fuer ?token= aus URL

Datenbank-Migration (migrations/2026-05-09_portal_magic_link.py):
- portal_magic_links + portal_magic_link_attempts neu
- portal_login_attempts gedroppt
- portal_admins.email Spalte hinzu, password_hash geleert

Whitelist info@aegis-sight.de, Rate-Limit 5/15 Min, Anti-Enumeration generische Antwort.
Dieser Commit ist enthalten in:
claude-dev
2026-05-09 02:21:40 +00:00
Ursprung e6fdc5cfa0
Commit 7c741062a9
9 geänderte Dateien mit 482 neuen und 151 gelöschten Zeilen

191
src/routers/auth.py Normale Datei
Datei anzeigen

@@ -0,0 +1,191 @@
"""Magic-Link-Authentifizierung."""
import logging
from datetime import datetime, timedelta, timezone
from fastapi import APIRouter, Depends, HTTPException, Request, status
import aiosqlite
from auth import generate_magic_token, create_token
from config import (
ALLOWED_EMAIL,
PORTAL_MAGIC_LINK_BASE_URL,
PORTAL_MAGIC_LINK_EXPIRE_MINUTES,
)
from database import db_dependency
from email_utils.sender import send_email
from email_utils.templates import portal_magic_link_email
from models import MagicLinkRequest, MagicLinkResponse, TokenResponse, VerifyTokenRequest
from audit import log_action, get_client_ip
logger = logging.getLogger("verwaltung.auth")
router = APIRouter(prefix="/api/auth", tags=["auth"])
# Rate-Limit: max N Magic-Link-Anfragen pro Email/IP-Kombination im Zeitfenster
RATE_LIMIT_PER_WINDOW = 5
RATE_LIMIT_WINDOW_MINUTES = 15
ATTEMPTS_PURGE_AFTER_HOURS = 24
# Generische Antwort - keine Rückschlüsse auf gültige Emails (Anti-Enumeration)
GENERIC_RESPONSE = MagicLinkResponse(
message="Wenn die E-Mail-Adresse berechtigt ist, wurde ein Login-Link gesendet."
)
@router.post("/magic-link", response_model=MagicLinkResponse)
async def request_magic_link(
data: MagicLinkRequest,
request: Request,
db: aiosqlite.Connection = Depends(db_dependency),
):
"""Magic-Link anfordern. Sendet E-Mail mit zeitlich begrenztem Login-Link."""
email = data.email.lower().strip()
ip = get_client_ip(request)
# Alte Versuche purgen
await db.execute(
f"DELETE FROM portal_magic_link_attempts "
f"WHERE ts < datetime('now', '-{ATTEMPTS_PURGE_AFTER_HOURS} hours')"
)
# Rate-Limit prüfen
cur = await db.execute(
f"""SELECT COUNT(*) AS cnt FROM portal_magic_link_attempts
WHERE email = ? AND ip = ?
AND ts > datetime('now', '-{RATE_LIMIT_WINDOW_MINUTES} minutes')""",
(email, ip),
)
attempts = (await cur.fetchone())["cnt"]
# Versuch immer eintragen (auch wenn rate-limited oder Email nicht erlaubt)
await db.execute(
"INSERT INTO portal_magic_link_attempts (ip, email) VALUES (?, ?)",
(ip, email),
)
await db.commit()
if attempts >= RATE_LIMIT_PER_WINDOW:
logger.warning(f"Rate-Limit erreicht für {email} von {ip}: {attempts} Versuche")
return GENERIC_RESPONSE
# Whitelist-Check (still gegen Enumeration)
if email != ALLOWED_EMAIL.lower():
logger.info(f"Magic-Link-Anfrage für nicht erlaubte Email: {email} von {ip}")
return GENERIC_RESPONSE
# Token erzeugen
token = generate_magic_token()
expires_at = (
datetime.now(timezone.utc) + timedelta(minutes=PORTAL_MAGIC_LINK_EXPIRE_MINUTES)
).strftime("%Y-%m-%d %H:%M:%S")
# Vorige unbenutzte Tokens für diese Email entwerten (mehrfaches Anfordern)
await db.execute(
"UPDATE portal_magic_links SET used_at = CURRENT_TIMESTAMP "
"WHERE email = ? AND used_at IS NULL",
(email,),
)
await db.execute(
"""INSERT INTO portal_magic_links (email, token, expires_at, ip_address)
VALUES (?, ?, ?, ?)""",
(email, token, expires_at, ip),
)
await db.commit()
# E-Mail versenden
link = f"{PORTAL_MAGIC_LINK_BASE_URL}/?token={token}"
subject, html = portal_magic_link_email(link, PORTAL_MAGIC_LINK_EXPIRE_MINUTES)
sent = await send_email(email, subject, html)
if not sent:
logger.error(f"E-Mail-Versand fehlgeschlagen für {email}")
# Wir geben trotzdem die generische Antwort zurück, damit Angreifer
# SMTP-Fehler nicht von "Email nicht erlaubt" unterscheiden können
return GENERIC_RESPONSE
@router.post("/verify", response_model=TokenResponse)
async def verify_magic_link(
data: VerifyTokenRequest,
request: Request,
db: aiosqlite.Connection = Depends(db_dependency),
):
"""Magic-Link-Token verifizieren, JWT-Session zurückgeben."""
ip = get_client_ip(request)
cur = await db.execute(
"""SELECT id, email, expires_at, used_at
FROM portal_magic_links
WHERE token = ?""",
(data.token,),
)
ml = await cur.fetchone()
if not ml:
raise HTTPException(status_code=400, detail="Ungültiger Login-Link")
if ml["used_at"] is not None:
raise HTTPException(
status_code=400, detail="Login-Link bereits verwendet. Bitte neuen anfordern."
)
expires = datetime.fromisoformat(ml["expires_at"])
if expires.tzinfo is None:
expires = expires.replace(tzinfo=timezone.utc)
if datetime.now(timezone.utc) > expires:
raise HTTPException(
status_code=400, detail="Login-Link abgelaufen. Bitte neuen anfordern."
)
email = ml["email"]
if email.lower() != ALLOWED_EMAIL.lower():
# Defense-in-depth: sollte nie passieren, da Einreichung schon Whitelist prüft
raise HTTPException(status_code=403, detail="Nicht berechtigt")
# Admin-Datensatz holen oder anlegen
cur = await db.execute(
"SELECT id, username, email FROM portal_admins WHERE LOWER(email) = ?",
(email.lower(),),
)
admin = await cur.fetchone()
if not admin:
# Beim ersten erfolgreichen Login mit dieser Email einen Admin-Eintrag erzeugen,
# falls noch keiner existiert (z.B. nach Migration). Username = local-part der E-Mail.
username = email.split("@")[0]
cur = await db.execute(
"""INSERT INTO portal_admins (username, password_hash, email)
VALUES (?, '', ?)""",
(username, email),
)
admin_id = cur.lastrowid
admin_username = username
await db.commit()
logger.info(f"Neuer portal_admin angelegt für {email} (id={admin_id})")
else:
admin_id = admin["id"]
admin_username = admin["username"]
# Token als verwendet markieren
await db.execute(
"UPDATE portal_magic_links SET used_at = CURRENT_TIMESTAMP WHERE id = ?",
(ml["id"],),
)
await db.commit()
# Audit
await log_action(
db,
admin={"id": admin_id, "username": admin_username},
ip=ip,
action="login_success",
resource_type="auth",
after={"email": email, "method": "magic_link"},
)
await db.commit()
jwt_token = create_token(admin_id, email, admin_username)
return TokenResponse(
access_token=jwt_token,
username=admin_username,
email=email,
)