Dateien
AegisSight-Monitor-Verwaltung/src/routers/users.py
claude-dev 9000750df2 Phase 9: Code-Hygiene - alle pyflakes-Issues fixen
15 pyflakes-Warnings entfernt:
- src/audit.py: HTTPException (in router import statt helper, war hier ungenutzt)
- src/routers/auth.py: status (FastAPI-status ungenutzt)
- src/routers/audit.py: HTTPException (ungenutzt)
- src/routers/users.py: MAGIC_LINK_EXPIRE_MINUTES (ungenutzt)
- src/routers/sources.py: row_to_dict, _extract_domain, _detect_category,
  urlparse, status (alle ungenutzt - status.HTTP_* wird nirgendwo aufgerufen)
- src/routers/sources.py: 2x f-string ohne Placeholder (URL aktualisiert,
  Verbindung fehlgeschlagen) zu normalen Strings
- src/routers/sources.py: except httpx.ConnectError as e -> e ungenutzt, weg
- src/database.py: os ungenutzt
- src/models.py: EmailStr ungenutzt

Audit-Coverage geprueft: alle write-Endpoints in users.py schreiben einen
Audit-Eintrag - create_user und delete_user rufen log_action direkt auf, alle
uebrigen ueber _toggle_field(). Keine Audit-Luecken.
Alle anderen Routers (organizations/licenses/dashboard/token_usage)
hatten bereits saubere Audit-Coverage.

Mojibake-Diagnose ueber alle src/*.py: 0 Treffer.
2026-05-09 03:49:53 +00:00

209 Zeilen
7.4 KiB
Python

"""Nutzer-Verwaltung pro Organisation."""
import secrets
import string
from datetime import datetime, timedelta, timezone
from fastapi import APIRouter, Depends, HTTPException, status, Request
from models import UserCreate, UserResponse
from auth import get_current_admin
from database import db_dependency
from audit import log_action, get_client_ip, row_to_dict
from config import MAGIC_LINK_BASE_URL
import aiosqlite
# Router for the user-management endpoints; every route registered on it is
# served under the /api/users prefix and tagged "users".
router = APIRouter(prefix="/api/users", tags=["users"])
@router.get("", response_model=list[UserResponse])
async def list_users(
    org_id: int = None,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return user rows, optionally restricted to one organisation.

    With a truthy ``org_id`` only that organisation's users are selected,
    ordered by creation time. Otherwise every user is selected, ordered by
    organisation and then by creation time. Each row is returned as a dict.
    """
    if org_id:
        query_args = (
            "SELECT * FROM users WHERE organization_id = ? ORDER BY created_at",
            (org_id,),
        )
    else:
        query_args = ("SELECT * FROM users ORDER BY organization_id, created_at",)

    cursor = await db.execute(*query_args)
    rows = await cursor.fetchall()
    return [dict(row) for row in rows]
@router.post("", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(
    data: UserCreate,
    request: Request,
    org_id: int = None,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Create a user inside an organisation and send an invitation e-mail.

    Steps, in order:
      1. Require ``org_id`` and verify that the organisation exists.
      2. If the organisation has an active license, enforce its ``max_users``
         limit against the number of currently active users.
      3. Reject e-mail addresses that are already taken (case-insensitive).
      4. Insert the user with an empty password hash, plus an 'invite'
         magic link (URL token and 6-digit code) that expires after 48 hours.
      5. Try to send the invitation e-mail (best effort).
      6. Write an audit entry and return the new user row.

    Args:
        data: Payload carrying ``email``, optional ``username`` and ``role``.
        request: Incoming request, used to determine the client IP for audit.
        org_id: Target organisation (query parameter); a falsy value is rejected.
        admin: Authenticated admin performing the action.
        db: Open database connection.

    Returns:
        The newly created user row as a dict.

    Raises:
        HTTPException: 400 if ``org_id`` is missing, the user limit has been
            reached or the e-mail is already taken; 404 if the organisation
            does not exist.
    """
    # Function-scope import, in line with the e-mail helper imports below.
    import logging

    if not org_id:
        raise HTTPException(status_code=400, detail="org_id Parameter erforderlich")

    email = data.email.lower().strip()

    cursor = await db.execute("SELECT id, name FROM organizations WHERE id = ?", (org_id,))
    org = await cursor.fetchone()
    if not org:
        raise HTTPException(status_code=404, detail="Organisation nicht gefunden")

    # Only the most recent active license counts; without one no limit applies.
    cursor = await db.execute(
        "SELECT max_users FROM licenses WHERE organization_id = ? AND status = 'active' ORDER BY created_at DESC LIMIT 1",
        (org_id,),
    )
    lic = await cursor.fetchone()
    if lic:
        cursor = await db.execute(
            "SELECT COUNT(*) as cnt FROM users WHERE organization_id = ? AND is_active = 1",
            (org_id,),
        )
        current = (await cursor.fetchone())["cnt"]
        if current >= lic["max_users"]:
            raise HTTPException(status_code=400, detail=f"Nutzer-Limit erreicht ({current}/{lic['max_users']})")

    cursor = await db.execute("SELECT id FROM users WHERE LOWER(email) = ?", (email,))
    if await cursor.fetchone():
        raise HTTPException(status_code=400, detail="E-Mail bereits vergeben")

    # Fall back to the local part of the address when no username was given.
    username = data.username if data.username else email.split("@")[0]
    now = datetime.now(timezone.utc).isoformat()
    cursor = await db.execute(
        """INSERT INTO users (email, username, password_hash, organization_id, role, is_active, created_at)
           VALUES (?, ?, '', ?, ?, 1, ?)""",
        (email, username, org_id, data.role, now),
    )
    user_id = cursor.lastrowid

    # Invitation credentials: a URL token plus a 6-digit numeric code, both
    # drawn from the ``secrets`` module.
    token = secrets.token_urlsafe(48)
    code = ''.join(secrets.choice(string.digits) for _ in range(6))
    expires_at = (datetime.now(timezone.utc) + timedelta(hours=48)).isoformat()
    await db.execute(
        """INSERT INTO magic_links (email, token, code, purpose, user_id, expires_at)
           VALUES (?, ?, ?, 'invite', ?, ?)""",
        (email, token, code, user_id, expires_at),
    )
    await db.commit()

    try:
        from email_utils.sender import send_email
        from email_utils.templates import invite_email
        link = f"{MAGIC_LINK_BASE_URL}/auth/verify?token={token}"
        subject, html = invite_email(username, org["name"], code, link)
        await send_email(email, subject, html)
    except Exception:
        # Delivery stays best effort: the user and the invite are already
        # committed, so a mail failure must not fail the request. It is
        # logged (with traceback) so it no longer disappears silently.
        logging.getLogger(__name__).exception(
            "Invitation e-mail for user id %s could not be sent", user_id
        )

    cursor = await db.execute("SELECT * FROM users WHERE id = ?", (user_id,))
    new_user = dict(await cursor.fetchone())
    await log_action(
        db, admin, get_client_ip(request),
        action="create", resource_type="user", resource_id=user_id,
        # The password hash is kept out of the audit record.
        after={k: v for k, v in new_user.items() if k != "password_hash"},
    )
    return new_user
# Columns of ``users`` that _toggle_field is allowed to write. The column name
# is interpolated into the SQL text (identifiers cannot be bound as query
# parameters), so it must only ever come from this fixed set.
_TOGGLEABLE_FIELDS = frozenset({"is_active", "globe_access", "network_access", "role"})


async def _toggle_field(db, request, admin, user_id: int, field: str, value: int):
    """Update a single column of one user row and write an audit entry.

    Args:
        db: Open database connection.
        request: Incoming request, used to determine the client IP for audit.
        admin: Authenticated admin performing the change.
        user_id: Primary key of the user to update.
        field: Column to write; must be one of ``_TOGGLEABLE_FIELDS``.
        value: New value for the column. The flag endpoints pass 0/1;
            ``change_role`` passes the role name as a string.

    Returns:
        The user row as read back after the update.

    Raises:
        ValueError: ``field`` is not an allowed column (programming error).
        HTTPException: 404 if no user with ``user_id`` exists.
    """
    # Validate before touching the database: ``field`` ends up inside the
    # SQL statement itself, so anything outside the allowlist is refused.
    if field not in _TOGGLEABLE_FIELDS:
        raise ValueError(f"Unsupported user field: {field!r}")

    before = await row_to_dict(db, "users", user_id)
    if not before:
        raise HTTPException(status_code=404, detail="Nutzer nicht gefunden")

    await db.execute(f"UPDATE users SET {field} = ? WHERE id = ?", (value, user_id))
    await db.commit()

    after = await row_to_dict(db, "users", user_id)
    await log_action(
        db, admin, get_client_ip(request),
        action="update", resource_type="user", resource_id=user_id,
        # Only the changed column is recorded, not the whole row.
        before={field: before.get(field)},
        after={field: after.get(field)},
    )
    return after
@router.put("/{user_id}/deactivate")
async def deactivate_user(
    user_id: int,
    request: Request,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Set ``is_active`` to 0 for the given user and audit the change."""
    await _toggle_field(db, request, admin, user_id, field="is_active", value=0)
    response = {"ok": True}
    return response
@router.put("/{user_id}/activate")
async def activate_user(
    user_id: int,
    request: Request,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Set ``is_active`` to 1 for the given user and audit the change."""
    await _toggle_field(db, request, admin, user_id, field="is_active", value=1)
    response = {"ok": True}
    return response
@router.put("/{user_id}/globe-access")
async def toggle_globe_access(
    user_id: int,
    request: Request,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Flip the user's ``globe_access`` flag and report the new state.

    Responds with 404 when no user with ``user_id`` exists.
    """
    cursor = await db.execute("SELECT id, globe_access FROM users WHERE id = ?", (user_id,))
    user_row = await cursor.fetchone()
    if user_row is None:
        raise HTTPException(status_code=404, detail="Nutzer nicht gefunden")

    # Second selected column is the current flag; store its negation as 0/1.
    flipped = int(not user_row[1])
    await _toggle_field(db, request, admin, user_id, "globe_access", flipped)
    return {"ok": True, "globe_access": bool(flipped)}
@router.put("/{user_id}/network-access")
async def toggle_network_access(
    user_id: int,
    request: Request,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Flip the user's ``network_access`` flag and report the new state.

    Responds with 404 when no user with ``user_id`` exists.
    """
    cursor = await db.execute("SELECT id, network_access FROM users WHERE id = ?", (user_id,))
    user_row = await cursor.fetchone()
    if user_row is None:
        raise HTTPException(status_code=404, detail="Nutzer nicht gefunden")

    # Second selected column is the current flag; store its negation as 0/1.
    flipped = int(not user_row[1])
    await _toggle_field(db, request, admin, user_id, "network_access", flipped)
    return {"ok": True, "network_access": bool(flipped)}
@router.put("/{user_id}/role")
async def change_role(
    user_id: int,
    request: Request,
    role: str = "member",
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Assign a new role to the user and audit the change.

    Only "org_admin" and "member" are accepted; any other value is answered
    with 400. When ``role`` is omitted it defaults to "member".
    """
    accepted_roles = ("org_admin", "member")
    if role in accepted_roles:
        await _toggle_field(db, request, admin, user_id, "role", role)
        return {"ok": True}
    raise HTTPException(status_code=400, detail="Ungueltige Rolle")
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(
    user_id: int,
    request: Request,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Delete a user row and record its previous state in the audit log.

    Responds with 404 when no user with ``user_id`` exists. The audit entry
    holds the row as it was before deletion, without ``password_hash``.
    """
    existing = await row_to_dict(db, "users", user_id)
    if not existing:
        raise HTTPException(status_code=404, detail="Nutzer nicht gefunden")

    await db.execute("DELETE FROM users WHERE id = ?", (user_id,))
    await db.commit()

    # Snapshot for the audit trail, leaving out the password hash.
    audit_snapshot = {
        column: content
        for column, content in existing.items()
        if column != "password_hash"
    }
    client_ip = get_client_ip(request)
    await log_action(
        db, admin, client_ip,
        action="delete", resource_type="user", resource_id=user_id,
        before=audit_snapshot,
    )