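# Source listing metadata (copied from the repository web view):
#   File: Hetzner-Backup/v2_adminpanel/routes/license_routes.py
#   Last modified: 2025-06-18 01:35:54 +02:00
#   Size: 444 lines, 20 KiB
#   Language: Python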

import os
import logging
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
from dateutil.relativedelta import relativedelta
from flask import Blueprint, render_template, request, redirect, session, url_for, flash, jsonify
import config
from auth.decorators import login_required
from utils.audit import log_audit
from utils.network import get_client_ip
from utils.license import validate_license_key
from db import get_connection, get_db_connection, get_db_cursor
from models import get_licenses, get_license_by_id
# Blueprint collecting the license-management routes of this module. It is
# registered under the name 'licenses', which is why the views in this file
# build their redirects with url_for('licenses.<view name>').
license_bp = Blueprint('licenses', __name__)
@license_bp.route("/licenses")
@login_required
def licenses():
    """Render the filterable, paginated license overview.

    Query parameters (read from ``request.args``):
        search: Case-insensitive substring matched against the license key,
            customer name and customer e-mail.
        type: ``full``, ``test``, ``test_data`` or ``live_data``; any other
            value applies no type filter.
        status: ``active``, ``expiring`` (ends within the next 30 days),
            ``expired`` or ``inactive``; any other value applies no filter.
        sort, order: Handed to the template unchanged; this view does not
            reorder the list itself.
        page: 1-based page number. Values below 1 are treated as 1.

    Returns:
        The rendered ``licenses.html`` template.
    """
    search = request.args.get('search', '').strip()
    filter_type = request.args.get('type', '')
    filter_status = request.args.get('status', '')
    sort = request.args.get('sort', 'created_at')
    order = request.args.get('order', 'desc')
    # A page of 0 or less would produce a negative slice start and silently
    # show the wrong rows (or none), so clamp to the first page.
    page = max(request.args.get('page', 1, type=int), 1)
    per_page = 50

    # Both the "test license type" and the "test data" views need the
    # test records included in the base query.
    show_test = filter_type in ('test_data', 'test')
    licenses_list = get_licenses(show_test=show_test)

    # Type filter
    if filter_type == 'full':
        licenses_list = [
            lic for lic in licenses_list
            if lic.get('license_type') == 'full' and not lic.get('is_test')
        ]
    elif filter_type == 'test':
        licenses_list = [
            lic for lic in licenses_list
            if lic.get('license_type') == 'test' and not lic.get('is_test')
        ]
    elif filter_type == 'test_data':
        licenses_list = [lic for lic in licenses_list if lic.get('is_test')]
    elif filter_type == 'live_data':
        licenses_list = [lic for lic in licenses_list if not lic.get('is_test')]

    # Status filter
    # NOTE(review): the comparisons assume 'valid_until' is a naive datetime
    # comparable with datetime.now() - confirm against get_licenses().
    if filter_status:
        now = datetime.now()
        if filter_status == 'active':
            licenses_list = [
                lic for lic in licenses_list
                if lic.get('is_active') and lic.get('valid_until') and lic.get('valid_until') > now
            ]
        elif filter_status == 'expiring':
            expiry_threshold = now + timedelta(days=30)
            licenses_list = [
                lic for lic in licenses_list
                if lic.get('valid_until') and now < lic.get('valid_until') <= expiry_threshold
            ]
        elif filter_status == 'expired':
            licenses_list = [
                lic for lic in licenses_list
                if lic.get('valid_until') and lic.get('valid_until') <= now
            ]
        elif filter_status == 'inactive':
            licenses_list = [lic for lic in licenses_list if not lic.get('is_active')]

    # Search filter
    if search:
        needle = search.lower()
        searchable_fields = ('license_key', 'customer_name', 'customer_email')
        # "or ''" keeps a stored None from being rendered as the text "None",
        # which would otherwise match searches such as "non".
        licenses_list = [
            lic for lic in licenses_list
            if any(needle in str(lic.get(field) or '').lower() for field in searchable_fields)
        ]

    # Pagination
    total = len(licenses_list)
    total_pages = (total + per_page - 1) // per_page
    start = (page - 1) * per_page
    licenses_list = licenses_list[start:start + per_page]

    return render_template(
        "licenses.html",
        licenses=licenses_list,
        show_test=show_test,
        search=search,
        filter_type=filter_type,
        filter_status=filter_status,
        sort=sort,
        order=order,
        page=page,
        total=total,
        total_pages=total_pages,
        per_page=per_page,
        # The callable itself is passed so the template can evaluate "now"
        # at render time.
        now=datetime.now,
        timedelta=timedelta,
    )
@license_bp.route("/license/edit/<int:license_id>", methods=["GET", "POST"])
@login_required
def edit_license(license_id):
    """Show the edit form for a license (GET) or persist the changes (POST).

    Args:
        license_id: Primary key of the license, taken from the URL.

    Returns:
        On a successful POST, or when the license does not exist, a redirect
        to the license overview. Otherwise the rendered ``edit_license.html``
        form; this is also the result after a failed POST, so the flashed
        error is shown above the form.
    """
    if request.method == "POST":
        # The connection is opened only on the write path. Opening it before
        # the method check left it unclosed on every GET request, because the
        # cleanup lives in the POST branch's ``finally``.
        conn = get_connection()
        cur = conn.cursor()
        try:
            # Snapshot of the current state for the audit trail.
            current_license = get_license_by_id(license_id)
            if not current_license:
                flash('Lizenz nicht gefunden!', 'error')
                return redirect(url_for('licenses.licenses'))

            # Checkboxes are only present in the form data when ticked.
            new_values = {
                'license_key': request.form['license_key'],
                'license_type': request.form['license_type'],
                'valid_from': request.form['valid_from'],
                'valid_until': request.form['valid_until'],
                'is_active': 'is_active' in request.form,
                'is_test': 'is_test' in request.form,
                'device_limit': int(request.form.get('device_limit', 3)),
            }

            cur.execute("""
                UPDATE licenses
                SET license_key = %s, license_type = %s, valid_from = %s,
                    valid_until = %s, is_active = %s, is_test = %s, device_limit = %s
                WHERE id = %s
            """, (
                new_values['license_key'],
                new_values['license_type'],
                new_values['valid_from'],
                new_values['valid_until'],
                new_values['is_active'],
                new_values['is_test'],
                new_values['device_limit'],
                license_id,
            ))
            conn.commit()

            # Record old and new values side by side.
            log_audit('UPDATE', 'license', license_id,
                      old_values={
                          'license_key': current_license.get('license_key'),
                          'license_type': current_license.get('license_type'),
                          'valid_from': str(current_license.get('valid_from', '')),
                          'valid_until': str(current_license.get('valid_until', '')),
                          'is_active': current_license.get('is_active'),
                          'is_test': current_license.get('is_test'),
                          'device_limit': current_license.get('device_limit', 3),
                      },
                      new_values=new_values)

            flash('Lizenz erfolgreich aktualisiert!', 'success')

            # Carry the caller's show_test flag over to the overview.
            show_test = request.args.get('show_test', 'false')
            return redirect(url_for('licenses.licenses', show_test=show_test))
        except Exception as e:
            # Route-level boundary: undo the partial write, log, and fall
            # through to re-render the form with an error message.
            conn.rollback()
            logging.error("Fehler beim Aktualisieren der Lizenz: %s", e)
            flash('Fehler beim Aktualisieren der Lizenz!', 'error')
        finally:
            cur.close()
            conn.close()

    # GET request, or POST that failed above
    license_data = get_license_by_id(license_id)
    if not license_data:
        flash('Lizenz nicht gefunden!', 'error')
        return redirect(url_for('licenses.licenses'))

    return render_template("edit_license.html", license=license_data)
@license_bp.route("/license/delete/<int:license_id>", methods=["POST"])
@login_required
def delete_license(license_id):
    """Delete a license together with the sessions that reference its key.

    Args:
        license_id: Primary key of the license, taken from the URL.

    Returns:
        A redirect to the license overview. The outcome (success, not found,
        or failure) is communicated through a flashed message.
    """
    connection = get_connection()
    cursor = connection.cursor()
    try:
        # Load the record first: its key is needed for the session cleanup
        # and its customer fields for the audit entry.
        record = get_license_by_id(license_id)
        if not record:
            flash('Lizenz nicht gefunden!', 'error')
            return redirect(url_for('licenses.licenses'))

        key = record['license_key']

        # Sessions are removed before the license row itself.
        cursor.execute("DELETE FROM sessions WHERE license_key = %s", (key,))
        cursor.execute("DELETE FROM licenses WHERE id = %s", (license_id,))
        connection.commit()

        deleted_snapshot = {
            'license_key': key,
            'customer_name': record['customer_name'],
            'customer_email': record['customer_email'],
        }
        log_audit('DELETE', 'license', license_id, old_values=deleted_snapshot)

        flash(f'Lizenz {key} erfolgreich gelöscht!', 'success')
    except Exception as exc:
        connection.rollback()
        logging.error(f"Fehler beim Löschen der Lizenz: {str(exc)}")
        flash('Fehler beim Löschen der Lizenz!', 'error')
    finally:
        cursor.close()
        connection.close()

    # Carry the caller's show_test flag over to the overview.
    return redirect(url_for('licenses.licenses',
                            show_test=request.args.get('show_test', 'false')))
# Plural labels for the "not enough resources" messages, keyed by the
# resource_type values used in the resource_pools table.
_RESOURCE_LABELS = {
    'domain': 'Domains',
    'ipv4': 'IPv4-Adressen',
    'phone': 'Telefonnummern',
}


def _allocate_resources(cur, license_id, resource_type, count, is_test):
    """Assign ``count`` available pool entries of one type to a license.

    Runs inside the caller's transaction: the selected rows are locked with
    ``FOR UPDATE``, marked as allocated, linked in ``license_resources`` and
    recorded in ``resource_history``.

    Args:
        cur: Open database cursor of the surrounding transaction.
        license_id: ID of the license receiving the resources.
        resource_type: ``'domain'``, ``'ipv4'`` or ``'phone'``.
        count: Number of resources to assign; nothing happens for ``count <= 0``.
        is_test: Selects the test or the live resource pool.

    Raises:
        ValueError: If fewer than ``count`` rows could be selected, e.g.
            because the pool shrank after the availability check.
    """
    if count <= 0:
        return

    cur.execute("""
        SELECT id FROM resource_pools
        WHERE resource_type = %s AND status = 'available' AND is_test = %s
        LIMIT %s FOR UPDATE
    """, (resource_type, is_test, count))
    resource_ids = [row[0] for row in cur.fetchall()]

    # Without this check a shortfall would silently create a license with
    # fewer resources than requested.
    if len(resource_ids) < count:
        raise ValueError(
            f"Nicht genügend {_RESOURCE_LABELS[resource_type]} verfügbar "
            f"(benötigt: {count}, verfügbar: {len(resource_ids)})"
        )

    username = session['username']
    client_ip = get_client_ip()
    for resource_id in resource_ids:
        cur.execute("""
            UPDATE resource_pools
            SET status = 'allocated', allocated_to_license = %s,
                status_changed_at = CURRENT_TIMESTAMP, status_changed_by = %s
            WHERE id = %s
        """, (license_id, username, resource_id))
        cur.execute("""
            INSERT INTO license_resources (license_id, resource_id, assigned_by)
            VALUES (%s, %s, %s)
        """, (license_id, resource_id, username))
        cur.execute("""
            INSERT INTO resource_history (resource_id, license_id, action, action_by, ip_address)
            VALUES (%s, %s, 'allocated', %s, %s)
        """, (resource_id, license_id, username, client_ip))


@license_bp.route("/create", methods=["GET", "POST"])
@login_required
def create_license():
    """Show the license creation form (GET) or create a license (POST).

    On POST the view optionally creates a new customer, inserts the license
    and assigns the requested number of domains, IPv4 addresses and phone
    numbers from the resource pool, all in one transaction.

    Returns:
        GET: the rendered ``index.html`` form, optionally with a customer
        preselected via the ``customer_id`` query parameter.
        POST: a redirect back to the creation form; the outcome is reported
        through a flashed message.
    """
    if request.method == "POST":
        form_url = url_for('licenses.create_license')
        invalid_input_msg = 'Ungültige Eingabe: Bitte Startdatum, Laufzeit und Ressourcenanzahl prüfen!'

        customer_id = request.form.get("customer_id")
        license_key = request.form["license_key"].upper()  # always upper case
        license_type = request.form["license_type"]
        valid_from = request.form["valid_from"]
        is_test = request.form.get("is_test") == "on"  # checkbox value
        duration_type = request.form.get("duration_type", "years")

        # Malformed numbers or dates are user errors; report them through a
        # flash message instead of letting them surface as a server error.
        try:
            duration = int(request.form.get("duration", 1))
            domain_count = int(request.form.get("domain_count", 1))
            ipv4_count = int(request.form.get("ipv4_count", 1))
            phone_count = int(request.form.get("phone_count", 1))
            device_limit = int(request.form.get("device_limit", 3))
            start_date = datetime.strptime(valid_from, "%Y-%m-%d")
        except ValueError:
            flash(invalid_input_msg, 'error')
            return redirect(form_url)

        # A duration below 1 would end the license before it starts.
        if duration < 1 or min(domain_count, ipv4_count, phone_count) < 0:
            flash(invalid_input_msg, 'error')
            return redirect(form_url)

        # Derive valid_until from the requested duration.
        try:
            if duration_type == "days":
                end_date = start_date + timedelta(days=duration)
            elif duration_type == "months":
                end_date = start_date + relativedelta(months=duration)
            else:  # years
                end_date = start_date + relativedelta(years=duration)
            # Subtract one day because the start day counts as the first day.
            end_date -= timedelta(days=1)
        except (ValueError, OverflowError):
            # Duration so large that the end date leaves the supported range.
            flash(invalid_input_msg, 'error')
            return redirect(form_url)
        valid_until = end_date.strftime("%Y-%m-%d")

        if not validate_license_key(license_key):
            flash('Ungültiges License Key Format! Erwartet: AF-YYYYMMFT-XXXX-YYYY-ZZZZ', 'error')
            return redirect(form_url)

        conn = get_connection()
        cur = conn.cursor()
        try:
            # Set only when a customer is created here; the audit entry is
            # written after the commit.
            new_customer_audit = None

            if customer_id == "new":
                name = request.form.get("customer_name")
                email = request.form.get("email")

                if not name:
                    flash('Kundenname ist erforderlich!', 'error')
                    return redirect(form_url)

                # Reject e-mail addresses already used by another customer.
                if email:
                    cur.execute("SELECT id, name FROM customers WHERE LOWER(email) = LOWER(%s)", (email,))
                    existing = cur.fetchone()
                    if existing:
                        flash(f'E-Mail bereits vergeben für Kunde: {existing[1]}', 'error')
                        return redirect(form_url)

                # The new customer inherits the test flag of the license.
                cur.execute("""
                    INSERT INTO customers (name, email, is_test, created_at)
                    VALUES (%s, %s, %s, NOW())
                    RETURNING id
                """, (name, email, is_test))
                customer_id = cur.fetchone()[0]
                customer_info = {'name': name, 'email': email, 'is_test': is_test}
                new_customer_audit = {'name': name, 'email': email, 'is_test': is_test}
            else:
                # Existing customer: fetch the details for the audit log.
                cur.execute("SELECT name, email, is_test FROM customers WHERE id = %s", (customer_id,))
                customer_data = cur.fetchone()
                if not customer_data:
                    flash('Kunde nicht gefunden!', 'error')
                    return redirect(form_url)
                customer_info = {'name': customer_data[0], 'email': customer_data[1]}

                # Licenses of a test customer are always test licenses.
                if customer_data[2]:
                    is_test = True

            cur.execute("""
                INSERT INTO licenses (license_key, customer_id, license_type, valid_from, valid_until, is_active,
                                      domain_count, ipv4_count, phone_count, device_limit, is_test)
                VALUES (%s, %s, %s, %s, %s, TRUE, %s, %s, %s, %s, %s)
                RETURNING id
            """, (license_key, customer_id, license_type, valid_from, valid_until,
                  domain_count, ipv4_count, phone_count, device_limit, is_test))
            license_id = cur.fetchone()[0]

            # Assign resources
            try:
                # Check all three pools up front so the user gets a precise
                # message before any row is touched.
                cur.execute("""
                    SELECT
                        (SELECT COUNT(*) FROM resource_pools WHERE resource_type = 'domain' AND status = 'available' AND is_test = %s) as domains,
                        (SELECT COUNT(*) FROM resource_pools WHERE resource_type = 'ipv4' AND status = 'available' AND is_test = %s) as ipv4s,
                        (SELECT COUNT(*) FROM resource_pools WHERE resource_type = 'phone' AND status = 'available' AND is_test = %s) as phones
                """, (is_test, is_test, is_test))
                available = cur.fetchone()

                if available[0] < domain_count:
                    raise ValueError(f"Nicht genügend Domains verfügbar (benötigt: {domain_count}, verfügbar: {available[0]})")
                if available[1] < ipv4_count:
                    raise ValueError(f"Nicht genügend IPv4-Adressen verfügbar (benötigt: {ipv4_count}, verfügbar: {available[1]})")
                if available[2] < phone_count:
                    raise ValueError(f"Nicht genügend Telefonnummern verfügbar (benötigt: {phone_count}, verfügbar: {available[2]})")

                _allocate_resources(cur, license_id, 'domain', domain_count, is_test)
                _allocate_resources(cur, license_id, 'ipv4', ipv4_count, is_test)
                _allocate_resources(cur, license_id, 'phone', phone_count, is_test)
            except ValueError as e:
                conn.rollback()
                flash(str(e), 'error')
                return redirect(form_url)

            conn.commit()

            # Audit entries are written only after the commit, so a rolled
            # back transaction cannot leave an entry for a customer that was
            # never persisted.
            if new_customer_audit is not None:
                log_audit('CREATE', 'customer', customer_id, new_values=new_customer_audit)

            log_audit('CREATE', 'license', license_id,
                      new_values={
                          'license_key': license_key,
                          'customer_name': customer_info['name'],
                          'customer_email': customer_info['email'],
                          'license_type': license_type,
                          'valid_from': valid_from,
                          'valid_until': valid_until,
                          'device_limit': device_limit,
                          'is_test': is_test,
                      })

            flash(f'Lizenz {license_key} erfolgreich erstellt!', 'success')
        except Exception as e:
            # Route-level boundary: undo everything, log, and report a
            # generic error to the user.
            conn.rollback()
            logging.error("Fehler beim Erstellen der Lizenz: %s", e)
            flash('Fehler beim Erstellen der Lizenz!', 'error')
        finally:
            cur.close()
            conn.close()

        # Carry the caller's show_test flag over to the next form view.
        redirect_url = form_url
        if request.args.get('show_test') == 'true':
            redirect_url += "?show_test=true"
        return redirect(redirect_url)

    # GET: optionally preselect a customer in the form.
    preselected_customer_id = request.args.get('customer_id', type=int)
    return render_template("index.html", username=session.get('username'), preselected_customer_id=preselected_customer_id)