Dateien
Hetzner-Backup/v2_adminpanel/routes/customer_routes.py
2025-06-18 00:07:34 +02:00

362 Zeilen
13 KiB
Python

import os
import logging
from datetime import datetime
from zoneinfo import ZoneInfo
from flask import Blueprint, render_template, request, redirect, session, url_for, flash, jsonify
import config
from auth.decorators import login_required
from utils.audit import log_audit
from db import get_connection, get_db_connection, get_db_cursor
from models import get_customers, get_customer_by_id
# Create the blueprint that collects the customer routes defined in this
# module; its endpoints are referenced as 'customers.<function name>'
# (e.g. url_for('customers.customers')).
customer_bp = Blueprint('customers', __name__)
# Test route
@customer_bp.route("/test-customers")
def test_customers():
    """Return a fixed plain-text reply confirming the blueprint is reachable.

    This route is not wrapped in login_required.
    """
    reply = "Customer blueprint is working!"
    return reply
@customer_bp.route("/customers")
@login_required
def customers():
    """Render the customer list, filtered by the request's query string.

    Query parameters:
        show_test: treated as enabled only when its value is "true"
            (case-insensitive); anything else, or absence, disables it.
        search: search term; surrounding whitespace is stripped.

    Returns:
        The rendered customers.html template, given the customer list plus
        the effective show_test and search values.
    """
    query_args = request.args
    raw_flag = query_args.get('show_test', 'false')
    include_test = raw_flag.lower() == 'true'
    term = query_args.get('search', '').strip()

    matching = get_customers(show_test=include_test, search=term)
    template_context = {
        'customers': matching,
        'show_test': include_test,
        'search': term,
    }
    return render_template("customers.html", **template_context)
@customer_bp.route("/customer/edit/<int:customer_id>", methods=["GET", "POST"])
@login_required
def edit_customer(customer_id):
    """Show the edit form for a customer (GET) or save the submitted form (POST).

    POST: loads the stored record, updates name, email and is_test from the
    form, commits, writes an audit entry with the old and new values, and
    redirects to the customer list. The redirect carries ?show_test=true when
    the form field show_test equals 'true'. Any exception on this path is
    logged and answered with a generic error flash and a redirect to the list.

    GET: renders edit_customer.html for the customer, or flashes
    "not found" and redirects to the list when the id is unknown.

    Args:
        customer_id: id of the customer, taken from the URL.
    """
    if request.method == "POST":
        try:
            # Snapshot of the stored record; becomes the "old values" of the audit entry
            previous = get_customer_by_id(customer_id)
            if not previous:
                flash('Kunde nicht gefunden!', 'error')
                return redirect(url_for('customers.customers'))

            with get_db_connection() as conn:
                cur = conn.cursor()
                try:
                    form = request.form
                    new_values = dict(
                        name=form['name'],
                        email=form['email'],
                        # Checkbox semantics: the key is present only when ticked
                        is_test='is_test' in form,
                    )
                    update_params = (
                        new_values['name'],
                        new_values['email'],
                        new_values['is_test'],
                        customer_id,
                    )
                    cur.execute("""
UPDATE customers
SET name = %s, email = %s, is_test = %s
WHERE id = %s
""", update_params)
                    conn.commit()

                    # Record the change only after the commit went through
                    old_values = {
                        'name': previous['name'],
                        'email': previous['email'],
                        'is_test': previous.get('is_test', False),
                    }
                    log_audit('UPDATE', 'customer', customer_id,
                              old_values=old_values,
                              new_values=new_values)
                    flash('Kunde erfolgreich aktualisiert!', 'success')

                    # Keep the test-customer filter active on the list page if it was on
                    list_url = url_for('customers.customers')
                    keep_test_filter = request.form.get('show_test') == 'true'
                    target = list_url + '?show_test=true' if keep_test_filter else list_url
                    return redirect(target)
                finally:
                    cur.close()
        except Exception as exc:
            logging.error(f"Fehler beim Aktualisieren des Kunden: {str(exc)}")
            flash('Fehler beim Aktualisieren des Kunden!', 'error')
            return redirect(url_for('customers.customers'))

    # GET request
    customer_data = get_customer_by_id(customer_id)
    if customer_data:
        return render_template("edit_customer.html", customer=customer_data)
    flash('Kunde nicht gefunden!', 'error')
    return redirect(url_for('customers.customers'))
@customer_bp.route("/customer/create", methods=["GET", "POST"])
@login_required
def create_customer():
    """Show the creation form (GET) or insert a new customer (POST).

    POST: inserts name and email from the form together with the current
    timestamp, commits, writes a CREATE audit entry, and redirects to the
    customer list. On any exception the transaction is rolled back, the
    error is logged and flashed, and the creation form is rendered again.
    """
    if request.method != "POST":
        return render_template("create_customer.html")

    conn = get_connection()
    cur = conn.cursor()
    try:
        form = request.form
        name = form['name']
        email = form['email']

        insert_params = (name, email, datetime.now())
        cur.execute("""
INSERT INTO customers (name, email, created_at)
VALUES (%s, %s, %s)
RETURNING id
""", insert_params)
        inserted_row = cur.fetchone()
        new_id = inserted_row[0]
        conn.commit()

        # Audit entry is written after the commit
        created_values = {'name': name, 'email': email}
        log_audit('CREATE', 'customer', new_id, new_values=created_values)

        flash(f'Kunde {name} erfolgreich erstellt!', 'success')
        return redirect(url_for('customers.customers'))
    except Exception as exc:
        conn.rollback()
        logging.error(f"Fehler beim Erstellen des Kunden: {str(exc)}")
        flash('Fehler beim Erstellen des Kunden!', 'error')
    finally:
        cur.close()
        conn.close()

    # Reached only after a failed POST: show the form again
    return render_template("create_customer.html")
@customer_bp.route("/customer/delete/<int:customer_id>", methods=["POST"])
@login_required
def delete_customer(customer_id):
    """Delete a customer unless it is unknown or still owns licenses.

    Every outcome ends in a redirect to the customer list with a flash
    message: not found, blocked by existing licenses, deleted, or a generic
    error (after rollback and logging) when an exception occurs.

    Args:
        customer_id: id of the customer, taken from the URL.
    """
    count_sql = "SELECT COUNT(*) FROM licenses WHERE customer_id = %s"
    delete_sql = "DELETE FROM customers WHERE id = %s"

    conn = get_connection()
    cur = conn.cursor()
    try:
        # Fetch the record first so its name/email can go into the audit entry
        record = get_customer_by_id(customer_id)
        if not record:
            flash('Kunde nicht gefunden!', 'error')
            return redirect(url_for('customers.customers'))

        # Refuse deletion while licenses still reference this customer
        cur.execute(count_sql, (customer_id,))
        remaining = cur.fetchone()[0]
        if remaining > 0:
            flash(f'Kunde kann nicht gelöscht werden - hat noch {remaining} Lizenz(en)!', 'error')
            return redirect(url_for('customers.customers'))

        cur.execute(delete_sql, (customer_id,))
        conn.commit()

        removed_values = {
            'name': record['name'],
            'email': record['email'],
        }
        log_audit('DELETE', 'customer', customer_id, old_values=removed_values)
        flash(f'Kunde {record["name"]} erfolgreich gelöscht!', 'success')
    except Exception as exc:
        conn.rollback()
        logging.error(f"Fehler beim Löschen des Kunden: {str(exc)}")
        flash('Fehler beim Löschen des Kunden!', 'error')
    finally:
        cur.close()
        conn.close()

    return redirect(url_for('customers.customers'))
@customer_bp.route("/customers-licenses")
@login_required
def customers_licenses():
    """Render the overview of customers and their license counts.

    Opens its own psycopg2 connection configured from the POSTGRES_*
    environment variables (it does not use the shared connection helpers),
    runs one aggregate query over customers and licenses, and passes a list
    of per-customer dicts to customers_licenses.html.

    Returns:
        The rendered template on success. On any exception the message, type
        and traceback are logged, the error text is flashed, and the response
        is a redirect to 'admin.dashboard'.
    """
    # psycopg2 is not imported at module level in this file's visible imports
    import psycopg2

    logging.info("=== CUSTOMERS-LICENSES ROUTE CALLED ===")
    try:
        # Direct connection without the helper functions
        conn = psycopg2.connect(
            host=os.getenv("POSTGRES_HOST", "postgres"),
            port=os.getenv("POSTGRES_PORT", "5432"),
            dbname=os.getenv("POSTGRES_DB"),
            user=os.getenv("POSTGRES_USER"),
            password=os.getenv("POSTGRES_PASSWORD")
        )
        # Everything after connect() is guarded so the connection is closed
        # even if set_client_encoding() or cursor() fails.
        try:
            conn.set_client_encoding('UTF8')
            cur = conn.cursor()
            try:
                # Fetch all customers together with their license aggregates
                cur.execute("""
SELECT
c.id,
c.name,
c.email,
c.created_at,
COUNT(l.id),
COUNT(CASE WHEN l.is_active = true THEN 1 END),
COUNT(CASE WHEN l.is_test = true THEN 1 END),
MAX(l.created_at)
FROM customers c
LEFT JOIN licenses l ON c.id = l.customer_id
GROUP BY c.id, c.name, c.email, c.created_at
ORDER BY c.name
""")
                results = cur.fetchall()
            finally:
                cur.close()
        finally:
            # Release the connection before rendering; the rows are already in memory
            conn.close()

        logging.info(f"=== QUERY RETURNED {len(results)} ROWS ===")
        customers = []
        for idx, row in enumerate(results):
            logging.info(f"Row {idx}: Type={type(row)}, Length={len(row) if hasattr(row, '__len__') else 'N/A'}")
            # Column positions follow the SELECT list above
            customers.append({
                'id': row[0],
                'name': row[1],
                'email': row[2],
                'created_at': row[3],
                'license_count': row[4],
                'active_licenses': row[5],
                'test_licenses': row[6],
                'last_license_created': row[7]
            })
        return render_template("customers_licenses.html", customers=customers)
    except Exception as e:
        import traceback
        error_details = f"Fehler beim Laden der Kunden-Lizenz-Übersicht: {str(e)}\nType: {type(e)}\nTraceback: {traceback.format_exc()}"
        logging.error(error_details)
        flash(f'Datenbankfehler: {str(e)}', 'error')
        return redirect(url_for('admin.dashboard'))
@customer_bp.route("/api/customer/<int:customer_id>/licenses")
@login_required
def api_customer_licenses(customer_id):
    """API endpoint returning a customer's licenses as JSON.

    Args:
        customer_id: id of the customer, taken from the URL.

    Returns:
        JSON with a 'customer' object (id, name, email) and a 'licenses'
        list ordered by creation time, newest first. Responds with 404 and
        an error object when the customer does not exist, and with 500 and
        a generic error object when an exception occurs (the exception is
        logged).
    """
    def as_text(moment, pattern):
        # NULL date/timestamp columns are passed through as None
        return moment.strftime(pattern) if moment else None

    conn = get_connection()
    cur = conn.cursor()
    try:
        # Fetch the customer information
        customer = get_customer_by_id(customer_id)
        if not customer:
            return jsonify({'error': 'Kunde nicht gefunden'}), 404

        # Fetch all licenses of the customer
        cur.execute("""
SELECT
l.id,
l.license_key,
l.license_type,
l.is_active,
l.is_test,
l.valid_from,
l.valid_until,
l.device_limit,
l.created_at,
(SELECT COUNT(*) FROM sessions s WHERE s.license_key = l.license_key AND s.is_active = true) as active_sessions,
(SELECT COUNT(DISTINCT hardware_id) FROM sessions s WHERE s.license_key = l.license_key) as registered_devices,
CASE
WHEN l.valid_until < CURRENT_DATE THEN 'expired'
WHEN l.is_active = false THEN 'inactive'
ELSE 'active'
END as status
FROM licenses l
WHERE l.customer_id = %s
ORDER BY l.created_at DESC
""", (customer_id,))

        # Column positions follow the SELECT list above
        licenses = [
            {
                'id': rec[0],
                'license_key': rec[1],
                'license_type': rec[2],
                'active': rec[3],
                'is_test': rec[4],
                'valid_from': as_text(rec[5], '%Y-%m-%d'),
                'valid_until': as_text(rec[6], '%Y-%m-%d'),
                'device_limit': rec[7],
                'created_at': as_text(rec[8], '%Y-%m-%d %H:%M:%S'),
                'active_sessions': rec[9],
                'registered_devices': rec[10],
                'status': rec[11],
            }
            for rec in cur.fetchall()
        ]

        customer_summary = {
            'id': customer['id'],
            'name': customer['name'],
            'email': customer['email'],
        }
        return jsonify({'customer': customer_summary, 'licenses': licenses})
    except Exception as exc:
        logging.error(f"Fehler beim Laden der Kundenlizenzen: {str(exc)}")
        return jsonify({'error': 'Fehler beim Laden der Daten'}), 500
    finally:
        cur.close()
        conn.close()
@customer_bp.route("/api/customer/<int:customer_id>/quick-stats")
@login_required
def api_customer_quick_stats(customer_id):
    """Return quick license statistics for one customer as JSON.

    Args:
        customer_id: id of the customer, taken from the URL.

    Returns:
        JSON with total_licenses, active_licenses, test_licenses and
        total_device_limit; falsy aggregates (such as a NULL sum) are
        reported as 0. Responds with 500 and a generic error object when an
        exception occurs (the exception is logged).
    """
    # Order matches the SELECT list of the query below
    stat_names = (
        'total_licenses',
        'active_licenses',
        'test_licenses',
        'total_device_limit',
    )

    conn = get_connection()
    cur = conn.cursor()
    try:
        cur.execute("""
SELECT
COUNT(l.id) as total_licenses,
COUNT(CASE WHEN l.is_active = true THEN 1 END) as active_licenses,
COUNT(CASE WHEN l.is_test = true THEN 1 END) as test_licenses,
SUM(l.device_limit) as total_device_limit
FROM licenses l
WHERE l.customer_id = %s
""", (customer_id,))
        aggregates = cur.fetchone()
        stats = {label: value or 0 for label, value in zip(stat_names, aggregates)}
        return jsonify(stats)
    except Exception as exc:
        logging.error(f"Fehler beim Laden der Kundenstatistiken: {str(exc)}")
        return jsonify({'error': 'Fehler beim Laden der Daten'}), 500
    finally:
        cur.close()
        conn.close()