"""Lizenz-Verwaltung und -Pruefung.""" import logging from datetime import datetime from config import TIMEZONE import aiosqlite logger = logging.getLogger("osint.license") async def check_license(db: aiosqlite.Connection, organization_id: int) -> dict: """Prueft den Lizenzstatus einer Organisation. Returns: dict mit: valid, status, license_type, max_users, current_users, read_only, message """ # Organisation pruefen cursor = await db.execute( "SELECT id, name, is_active FROM organizations WHERE id = ?", (organization_id,), ) org = await cursor.fetchone() if not org: return {"valid": False, "status": "not_found", "read_only": True, "message": "Organisation nicht gefunden"} if not org["is_active"]: return {"valid": False, "status": "org_disabled", "read_only": True, "message": "Organisation deaktiviert"} # Aktive Lizenz suchen cursor = await db.execute( """SELECT * FROM licenses WHERE organization_id = ? AND status = 'active' ORDER BY created_at DESC LIMIT 1""", (organization_id,), ) license_row = await cursor.fetchone() if not license_row: return {"valid": False, "status": "no_license", "read_only": True, "message": "Keine aktive Lizenz"} # Ablauf pruefen now = datetime.now(TIMEZONE) valid_until = license_row["valid_until"] if valid_until is not None: try: expiry = datetime.fromisoformat(valid_until) if expiry.tzinfo is None: expiry = expiry.replace(tzinfo=TIMEZONE) if now > expiry: return { "valid": False, "status": "expired", "license_type": license_row["license_type"], "read_only": True, "message": "Lizenz abgelaufen", } except (ValueError, TypeError): pass # Nutzerzahl pruefen cursor = await db.execute( "SELECT COUNT(*) as cnt FROM users WHERE organization_id = ? AND is_active = 1", (organization_id,), ) current_users = (await cursor.fetchone())["cnt"] return { "valid": True, "status": license_row["status"], "license_type": license_row["license_type"], "max_users": license_row["max_users"], "current_users": current_users, "read_only": False, "message": "Lizenz aktiv", } async def can_add_user(db: aiosqlite.Connection, organization_id: int) -> tuple[bool, str]: """Prueft ob ein neuer Nutzer hinzugefuegt werden kann (Nutzer-Limit). Returns: (erlaubt, grund) """ lic = await check_license(db, organization_id) if not lic["valid"]: return False, lic["message"] if lic["current_users"] >= lic["max_users"]: return False, f"Nutzer-Limit erreicht ({lic['current_users']}/{lic['max_users']})" return True, "" async def expire_licenses(db: aiosqlite.Connection): """Setzt abgelaufene Lizenzen auf 'expired'. Taeglich aufrufen.""" cursor = await db.execute( """SELECT id, organization_id FROM licenses WHERE status = 'active' AND valid_until IS NOT NULL AND valid_until < datetime('now')""" ) expired = await cursor.fetchall() count = 0 for lic in expired: await db.execute( "UPDATE licenses SET status = 'expired' WHERE id = ?", (lic["id"],), ) count += 1 logger.info(f"Lizenz {lic['id']} fuer Org {lic['organization_id']} als abgelaufen markiert") if count > 0: await db.commit() logger.info(f"{count} Lizenz(en) als abgelaufen markiert") return count