Files
v2-Docker/v2_adminpanel/routes/batch_routes.py
Claude Project Manager 0d7d888502 Initial commit
2025-07-05 17:51:16 +02:00

439 Zeilen
17 KiB
Python

import os
import logging
import secrets
import string
from datetime import datetime, timedelta
from pathlib import Path
from flask import Blueprint, render_template, request, redirect, session, url_for, flash, send_file
import config
from auth.decorators import login_required
from utils.audit import log_audit
from utils.network import get_client_ip
from utils.export import create_batch_export
from db import get_connection, get_db_connection, get_db_cursor
from models import get_customers
# Create Blueprint
batch_bp = Blueprint('batch', __name__)
def generate_license_key():
"""Generiert einen zufälligen Lizenzschlüssel"""
chars = string.ascii_uppercase + string.digits
return '-'.join([''.join(secrets.choice(chars) for _ in range(4)) for _ in range(4)])
@batch_bp.route("/batch", methods=["GET", "POST"])
@login_required
def batch_create():
"""Batch-Erstellung von Lizenzen"""
customers = get_customers()
if request.method == "POST":
conn = get_connection()
cur = conn.cursor()
try:
# Form data
customer_id = int(request.form['customer_id'])
license_type = request.form['license_type']
count = int(request.form['quantity']) # Korrigiert von 'count' zu 'quantity'
valid_from = request.form['valid_from']
valid_until = request.form['valid_until']
device_limit = int(request.form['device_limit'])
# Resource allocation parameters
domain_count = int(request.form.get('domain_count', 0))
ipv4_count = int(request.form.get('ipv4_count', 0))
phone_count = int(request.form.get('phone_count', 0))
# Validierung
if count < 1 or count > 100:
flash('Anzahl muss zwischen 1 und 100 liegen!', 'error')
return redirect(url_for('batch.batch_create'))
# Hole Kundendaten inkl. is_fake Status
cur.execute("SELECT name, email, is_fake FROM customers WHERE id = %s", (customer_id,))
customer = cur.fetchone()
if not customer:
flash('Kunde nicht gefunden!', 'error')
return redirect(url_for('batch.batch_create'))
# Lizenz erbt immer den is_fake Status vom Kunden
is_fake = customer[2]
created_licenses = []
# Erstelle Lizenzen
for i in range(count):
license_key = generate_license_key()
# Prüfe ob Schlüssel bereits existiert
while True:
cur.execute("SELECT id FROM licenses WHERE license_key = %s", (license_key,))
if not cur.fetchone():
break
license_key = generate_license_key()
# Erstelle Lizenz
cur.execute("""
INSERT INTO licenses (
license_key, customer_id,
license_type, valid_from, valid_until, device_limit,
is_fake, created_at
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
RETURNING id
""", (
license_key, customer_id,
license_type, valid_from, valid_until, device_limit,
is_fake, datetime.now()
))
license_id = cur.fetchone()[0]
created_licenses.append({
'id': license_id,
'license_key': license_key
})
# Allocate resources if requested
if domain_count > 0 or ipv4_count > 0 or phone_count > 0:
# Allocate domains
if domain_count > 0:
cur.execute("""
UPDATE resource_pool
SET status = 'allocated',
license_id = %s,
allocated_at = NOW()
WHERE id IN (
SELECT id FROM resource_pool
WHERE type = 'domain'
AND status = 'available'
AND is_fake = %s
ORDER BY id
LIMIT %s
)
""", (license_id, is_fake, domain_count))
# Allocate IPv4s
if ipv4_count > 0:
cur.execute("""
UPDATE resource_pool
SET status = 'allocated',
license_id = %s,
allocated_at = NOW()
WHERE id IN (
SELECT id FROM resource_pool
WHERE type = 'ipv4'
AND status = 'available'
AND is_fake = %s
ORDER BY id
LIMIT %s
)
""", (license_id, is_fake, ipv4_count))
# Allocate phones
if phone_count > 0:
cur.execute("""
UPDATE resource_pool
SET status = 'allocated',
license_id = %s,
allocated_at = NOW()
WHERE id IN (
SELECT id FROM resource_pool
WHERE type = 'phone'
AND status = 'available'
AND is_fake = %s
ORDER BY id
LIMIT %s
)
""", (license_id, is_fake, phone_count))
# Audit-Log
log_audit('CREATE', 'license', license_id,
new_values={
'license_key': license_key,
'customer_name': customer[0],
'batch_creation': True
})
conn.commit()
# Speichere erstellte Lizenzen in Session für Export
session['batch_created_licenses'] = created_licenses
session['batch_customer_name'] = customer[0]
session['batch_customer_email'] = customer[1]
flash(f'{count} Lizenzen erfolgreich erstellt!', 'success')
# Weiterleitung zum Export
return redirect(url_for('batch.batch_export'))
except Exception as e:
conn.rollback()
logging.error(f"Fehler bei Batch-Erstellung: {str(e)}")
flash('Fehler bei der Batch-Erstellung!', 'error')
finally:
cur.close()
conn.close()
return render_template("batch_form.html", customers=customers)
@batch_bp.route("/batch/export")
@login_required
def batch_export():
"""Exportiert die zuletzt erstellten Batch-Lizenzen"""
created_licenses = session.get('batch_created_licenses', [])
if not created_licenses:
flash('Keine Lizenzen zum Exportieren gefunden!', 'error')
return redirect(url_for('batch.batch_create'))
conn = get_connection()
cur = conn.cursor()
try:
# Hole vollständige Lizenzdaten
license_ids = [l['id'] for l in created_licenses]
cur.execute("""
SELECT
l.license_key, c.name, c.email,
l.license_type, l.valid_from, l.valid_until,
l.device_limit, l.is_fake, l.created_at
FROM licenses l
JOIN customers c ON l.customer_id = c.id
WHERE l.id = ANY(%s)
ORDER BY l.id
""", (license_ids,))
licenses = []
for row in cur.fetchall():
licenses.append({
'license_key': row[0],
'customer_name': row[1],
'customer_email': row[2],
'license_type': row[3],
'valid_from': row[4],
'valid_until': row[5],
'device_limit': row[6],
'is_fake': row[7],
'created_at': row[8]
})
# Lösche aus Session
session.pop('batch_created_licenses', None)
session.pop('batch_customer_name', None)
session.pop('batch_customer_email', None)
# Erstelle und sende Excel-Export
return create_batch_export(licenses)
except Exception as e:
logging.error(f"Fehler beim Export: {str(e)}")
flash('Fehler beim Exportieren der Lizenzen!', 'error')
return redirect(url_for('batch.batch_create'))
finally:
cur.close()
conn.close()
@batch_bp.route("/batch/update", methods=["GET", "POST"])
@login_required
def batch_update():
"""Batch-Update von Lizenzen"""
if request.method == "POST":
conn = get_connection()
cur = conn.cursor()
try:
# Form data
license_keys = request.form.get('license_keys', '').strip().split('\n')
license_keys = [key.strip() for key in license_keys if key.strip()]
if not license_keys:
flash('Keine Lizenzschlüssel angegeben!', 'error')
return redirect(url_for('batch.batch_update'))
# Update-Parameter
updates = []
params = []
if 'update_valid_until' in request.form and request.form['valid_until']:
updates.append("valid_until = %s")
params.append(request.form['valid_until'])
if 'update_device_limit' in request.form and request.form['device_limit']:
updates.append("device_limit = %s")
params.append(int(request.form['device_limit']))
if 'update_active' in request.form:
updates.append("is_active = %s")
params.append('is_active' in request.form)
if not updates:
flash('Keine Änderungen angegeben!', 'error')
return redirect(url_for('batch.batch_update'))
# Führe Updates aus
updated_count = 0
not_found = []
for license_key in license_keys:
# Prüfe ob Lizenz existiert
cur.execute("SELECT id FROM licenses WHERE license_key = %s", (license_key,))
result = cur.fetchone()
if not result:
not_found.append(license_key)
continue
license_id = result[0]
# Update ausführen
update_params = params + [license_id]
cur.execute(f"""
UPDATE licenses
SET {', '.join(updates)}
WHERE id = %s
""", update_params)
# Audit-Log
log_audit('BATCH_UPDATE', 'license', license_id,
additional_info=f"Batch-Update: {', '.join(updates)}")
updated_count += 1
conn.commit()
# Feedback
flash(f'{updated_count} Lizenzen erfolgreich aktualisiert!', 'success')
if not_found:
flash(f'{len(not_found)} Lizenzen nicht gefunden: {", ".join(not_found[:5])}{"..." if len(not_found) > 5 else ""}', 'warning')
except Exception as e:
conn.rollback()
logging.error(f"Fehler bei Batch-Update: {str(e)}")
flash('Fehler beim Batch-Update!', 'error')
finally:
cur.close()
conn.close()
return render_template("batch_update.html")
@batch_bp.route("/batch/import", methods=["GET", "POST"])
@login_required
def batch_import():
"""Import von Lizenzen aus CSV/Excel"""
if request.method == "POST":
if 'file' not in request.files:
flash('Keine Datei ausgewählt!', 'error')
return redirect(url_for('batch.batch_import'))
file = request.files['file']
if file.filename == '':
flash('Keine Datei ausgewählt!', 'error')
return redirect(url_for('batch.batch_import'))
# Verarbeite Datei
try:
import pandas as pd
# Lese Datei
if file.filename.endswith('.csv'):
df = pd.read_csv(file)
elif file.filename.endswith(('.xlsx', '.xls')):
df = pd.read_excel(file)
else:
flash('Ungültiges Dateiformat! Nur CSV und Excel erlaubt.', 'error')
return redirect(url_for('batch.batch_import'))
# Validiere Spalten
required_columns = ['customer_email', 'license_type', 'valid_from', 'valid_until', 'device_limit']
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
flash(f'Fehlende Spalten: {", ".join(missing_columns)}', 'error')
return redirect(url_for('batch.batch_import'))
conn = get_connection()
cur = conn.cursor()
imported_count = 0
errors = []
for index, row in df.iterrows():
try:
# Finde oder erstelle Kunde
email = row['customer_email']
cur.execute("SELECT id, name FROM customers WHERE email = %s", (email,))
customer = cur.fetchone()
if not customer:
# Erstelle neuen Kunden
name = row.get('customer_name', email.split('@')[0])
# Neue Kunden werden immer als Fake erstellt in der Testphase
# TODO: Nach Testphase muss hier die Business-Logik angepasst werden
is_fake = True
cur.execute("""
INSERT INTO customers (name, email, is_fake, created_at)
VALUES (%s, %s, %s, %s)
RETURNING id
""", (name, email, is_fake, datetime.now()))
customer_id = cur.fetchone()[0]
customer_name = name
else:
customer_id = customer[0]
customer_name = customer[1]
# Hole is_fake Status vom existierenden Kunden
cur.execute("SELECT is_fake FROM customers WHERE id = %s", (customer_id,))
is_fake = cur.fetchone()[0]
# Generiere Lizenzschlüssel
license_key = row.get('license_key', generate_license_key())
# Erstelle Lizenz - is_fake wird vom Kunden geerbt
cur.execute("""
INSERT INTO licenses (
license_key, customer_id,
license_type, valid_from, valid_until, device_limit,
is_fake, created_at
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
RETURNING id
""", (
license_key, customer_id,
row['license_type'], row['valid_from'], row['valid_until'],
int(row['device_limit']), is_fake,
datetime.now()
))
license_id = cur.fetchone()[0]
imported_count += 1
# Audit-Log
log_audit('IMPORT', 'license', license_id,
additional_info=f"Importiert aus {file.filename}")
except Exception as e:
errors.append(f"Zeile {index + 2}: {str(e)}")
conn.commit()
# Feedback
flash(f'{imported_count} Lizenzen erfolgreich importiert!', 'success')
if errors:
flash(f'{len(errors)} Fehler aufgetreten. Erste Fehler: {"; ".join(errors[:3])}', 'warning')
except Exception as e:
logging.error(f"Fehler beim Import: {str(e)}")
flash(f'Fehler beim Import: {str(e)}', 'error')
finally:
if 'conn' in locals():
cur.close()
conn.close()
return render_template("batch_import.html")