Kunden & Lizenzen - Part1

Dieser Commit ist enthalten in:
2025-06-09 19:13:44 +02:00
Ursprung dbd50bdde6
Commit 97b87465e4
12 geänderte Dateien mit 1625 neuen und 227 gelöschten Zeilen

Datei anzeigen

@@ -1096,12 +1096,43 @@ def api_customers():
search = request.args.get('q', '').strip()
page = request.args.get('page', 1, type=int)
per_page = 20
customer_id = request.args.get('id', type=int)
conn = get_connection()
cur = conn.cursor()
# Einzelnen Kunden per ID abrufen
if customer_id:
cur.execute("""
SELECT c.id, c.name, c.email,
COUNT(l.id) as license_count
FROM customers c
LEFT JOIN licenses l ON c.id = l.customer_id
WHERE c.id = %s
GROUP BY c.id, c.name, c.email
""", (customer_id,))
customer = cur.fetchone()
results = []
if customer:
results.append({
'id': customer[0],
'text': f"{customer[1]} ({customer[2]})",
'name': customer[1],
'email': customer[2],
'license_count': customer[3]
})
cur.close()
conn.close()
return jsonify({
'results': results,
'pagination': {'more': False}
})
# SQL Query mit optionaler Suche
if search:
elif search:
cur.execute("""
SELECT c.id, c.name, c.email,
COUNT(l.id) as license_count
@@ -1615,9 +1646,15 @@ def create_license():
cur.close()
conn.close()
return redirect("/create")
# Preserve show_test parameter if present
redirect_url = "/create"
if request.args.get('show_test') == 'true':
redirect_url += "?show_test=true"
return redirect(redirect_url)
return render_template("index.html", username=session.get('username'))
# Unterstützung für vorausgewählten Kunden
preselected_customer_id = request.args.get('customer_id', type=int)
return render_template("index.html", username=session.get('username'), preselected_customer_id=preselected_customer_id)
@app.route("/batch", methods=["GET", "POST"])
@login_required
@@ -1915,129 +1952,8 @@ def export_batch():
@app.route("/licenses")
@login_required
def licenses():
conn = get_connection()
cur = conn.cursor()
# Parameter
search = request.args.get('search', '').strip()
filter_type = request.args.get('type', '')
filter_status = request.args.get('status', '')
page = request.args.get('page', 1, type=int)
sort = request.args.get('sort', 'valid_until')
order = request.args.get('order', 'desc')
per_page = 20
# Whitelist für erlaubte Sortierfelder
allowed_sort_fields = {
'id': 'l.id',
'license_key': 'l.license_key',
'customer': 'c.name',
'email': 'c.email',
'type': 'l.license_type',
'valid_from': 'l.valid_from',
'valid_until': 'l.valid_until',
'status': 'status',
'active': 'l.is_active'
}
# Validierung
if sort not in allowed_sort_fields:
sort = 'valid_until'
if order not in ['asc', 'desc']:
order = 'desc'
sort_field = allowed_sort_fields[sort]
# SQL Query mit optionaler Suche und Filtern
query = """
SELECT l.id, l.license_key, c.name, c.email, l.license_type,
l.valid_from, l.valid_until, l.is_active, l.is_test,
CASE
WHEN l.is_active = FALSE THEN 'deaktiviert'
WHEN l.valid_until < CURRENT_DATE THEN 'abgelaufen'
WHEN l.valid_until < CURRENT_DATE + INTERVAL '30 days' THEN 'läuft bald ab'
ELSE 'aktiv'
END as status
FROM licenses l
JOIN customers c ON l.customer_id = c.id
WHERE 1=1
"""
params = []
# Suchfilter
if search:
query += """
AND (LOWER(l.license_key) LIKE LOWER(%s)
OR LOWER(c.name) LIKE LOWER(%s)
OR LOWER(c.email) LIKE LOWER(%s))
"""
search_param = f'%{search}%'
params.extend([search_param, search_param, search_param])
# Typ-Filter
if filter_type:
if filter_type == 'test_data':
query += " AND l.is_test = TRUE"
elif filter_type == 'live_data':
query += " AND l.is_test = FALSE"
else:
query += " AND l.license_type = %s AND l.is_test = FALSE"
params.append(filter_type)
# Status-Filter
if filter_status == 'active':
query += " AND l.valid_until >= CURRENT_DATE AND l.is_active = TRUE"
elif filter_status == 'expiring':
query += " AND l.valid_until >= CURRENT_DATE AND l.valid_until < CURRENT_DATE + INTERVAL '30 days' AND l.is_active = TRUE"
elif filter_status == 'expired':
query += " AND l.valid_until < CURRENT_DATE"
elif filter_status == 'inactive':
query += " AND l.is_active = FALSE"
# Gesamtanzahl für Pagination
count_query = "SELECT COUNT(*) FROM (" + query + ") as count_table"
cur.execute(count_query, params)
total = cur.fetchone()[0]
# Pagination
offset = (page - 1) * per_page
# Spezialbehandlung für berechnete Felder
if sort == 'status':
# Für Status müssen wir die CASE-Bedingung in ORDER BY wiederholen
query += f""" ORDER BY
CASE
WHEN l.is_active = FALSE THEN 'deaktiviert'
WHEN l.valid_until < CURRENT_DATE THEN 'abgelaufen'
WHEN l.valid_until < CURRENT_DATE + INTERVAL '30 days' THEN 'läuft bald ab'
ELSE 'aktiv'
END {order.upper()} LIMIT %s OFFSET %s"""
else:
query += f" ORDER BY {sort_field} {order.upper()} LIMIT %s OFFSET %s"
params.extend([per_page, offset])
cur.execute(query, params)
licenses = cur.fetchall()
# Pagination Info
total_pages = (total + per_page - 1) // per_page
cur.close()
conn.close()
return render_template("licenses.html",
licenses=licenses,
search=search,
filter_type=filter_type,
filter_status=filter_status,
page=page,
total_pages=total_pages,
total=total,
sort=sort,
order=order,
username=session.get('username'))
# Redirect zur kombinierten Ansicht
return redirect("/customers-licenses")
@app.route("/license/edit/<int:license_id>", methods=["GET", "POST"])
@login_required
@@ -2092,7 +2008,23 @@ def edit_license(license_id):
cur.close()
conn.close()
return redirect("/licenses")
# Redirect zurück zu customers-licenses mit beibehaltenen Parametern
redirect_url = "/customers-licenses"
# Behalte show_test Parameter bei (aus Form oder GET-Parameter)
show_test = request.form.get('show_test') or request.args.get('show_test')
if show_test == 'true':
redirect_url += "?show_test=true"
# Behalte customer_id bei wenn vorhanden
if request.referrer and 'customer_id=' in request.referrer:
import re
match = re.search(r'customer_id=(\d+)', request.referrer)
if match:
connector = "&" if "?" in redirect_url else "?"
redirect_url += f"{connector}customer_id={match.group(1)}"
return redirect(redirect_url)
# Get license data
cur.execute("""
@@ -2148,93 +2080,8 @@ def delete_license(license_id):
@app.route("/customers")
@login_required
def customers():
conn = get_connection()
cur = conn.cursor()
# Parameter
search = request.args.get('search', '').strip()
page = request.args.get('page', 1, type=int)
sort = request.args.get('sort', 'created_at')
order = request.args.get('order', 'desc')
per_page = 20
# Whitelist für erlaubte Sortierfelder
allowed_sort_fields = {
'id': 'c.id',
'name': 'c.name',
'email': 'c.email',
'created_at': 'c.created_at',
'licenses': 'license_count',
'active_licenses': 'active_licenses'
}
# Validierung
if sort not in allowed_sort_fields:
sort = 'created_at'
if order not in ['asc', 'desc']:
order = 'desc'
sort_field = allowed_sort_fields[sort]
# SQL Query mit optionaler Suche
base_query = """
SELECT c.id, c.name, c.email, c.created_at, c.is_test,
COUNT(l.id) as license_count,
COUNT(CASE WHEN l.is_active = TRUE AND l.valid_until >= CURRENT_DATE THEN 1 END) as active_licenses
FROM customers c
LEFT JOIN licenses l ON c.id = l.customer_id
"""
params = []
if search:
base_query += """
WHERE LOWER(c.name) LIKE LOWER(%s)
OR LOWER(c.email) LIKE LOWER(%s)
"""
search_param = f'%{search}%'
params.extend([search_param, search_param])
# Gesamtanzahl für Pagination
count_query = f"""
SELECT COUNT(DISTINCT c.id)
FROM customers c
LEFT JOIN licenses l ON c.id = l.customer_id
{"WHERE LOWER(c.name) LIKE LOWER(%s) OR LOWER(c.email) LIKE LOWER(%s)" if search else ""}
"""
if search:
cur.execute(count_query, params)
else:
cur.execute(count_query)
total = cur.fetchone()[0]
# Pagination
offset = (page - 1) * per_page
query = base_query + f"""
GROUP BY c.id, c.name, c.email, c.created_at, c.is_test
ORDER BY {sort_field} {order.upper()}
LIMIT %s OFFSET %s
"""
params.extend([per_page, offset])
cur.execute(query, params)
customers = cur.fetchall()
# Pagination Info
total_pages = (total + per_page - 1) // per_page
cur.close()
conn.close()
return render_template("customers.html",
customers=customers,
search=search,
page=page,
total_pages=total_pages,
total=total,
sort=sort,
order=order,
username=session.get('username'))
# Redirect zur kombinierten Ansicht
return redirect("/customers-licenses")
@app.route("/customer/edit/<int:customer_id>", methods=["GET", "POST"])
@login_required
@@ -2276,11 +2123,23 @@ def edit_customer(customer_id):
cur.close()
conn.close()
return redirect("/customers")
# Redirect zurück zu customers-licenses mit beibehaltenen Parametern
redirect_url = "/customers-licenses"
# Behalte show_test Parameter bei (aus Form oder GET-Parameter)
show_test = request.form.get('show_test') or request.args.get('show_test')
if show_test == 'true':
redirect_url += "?show_test=true"
# Behalte customer_id bei (immer der aktuelle Kunde)
connector = "&" if "?" in redirect_url else "?"
redirect_url += f"{connector}customer_id={customer_id}"
return redirect(redirect_url)
# Get customer data with licenses
cur.execute("""
SELECT id, name, email, is_test FROM customers WHERE id = %s
SELECT id, name, email, created_at, is_test FROM customers WHERE id = %s
""", (customer_id,))
customer = cur.fetchone()
@@ -2304,7 +2163,7 @@ def edit_customer(customer_id):
conn.close()
if not customer:
return redirect("/customers")
return redirect("/customers-licenses")
return render_template("edit_customer.html", customer=customer, licenses=licenses, username=session.get('username'))
@@ -2346,6 +2205,247 @@ def delete_customer(customer_id):
return redirect("/customers")
@app.route("/customers-licenses")
@login_required
def customers_licenses():
"""Kombinierte Ansicht für Kunden und deren Lizenzen"""
conn = get_connection()
cur = conn.cursor()
# Hole alle Kunden mit Lizenzstatistiken (inkl. Testkunden wenn gewünscht)
show_test = request.args.get('show_test', 'false').lower() == 'true'
query = """
SELECT
c.id,
c.name,
c.email,
c.created_at,
COUNT(l.id) as total_licenses,
COUNT(CASE WHEN l.is_active = TRUE AND l.valid_until >= CURRENT_DATE THEN 1 END) as active_licenses,
COUNT(CASE WHEN l.valid_until < CURRENT_DATE THEN 1 END) as expired_licenses
FROM customers c
LEFT JOIN licenses l ON c.id = l.customer_id
"""
if not show_test:
query += " WHERE c.is_test = FALSE"
query += """
GROUP BY c.id, c.name, c.email, c.created_at
ORDER BY c.name
"""
cur.execute(query)
customers = cur.fetchall()
# Hole ersten Kunden für initiale Anzeige (falls vorhanden)
selected_customer_id = request.args.get('customer_id', type=int)
licenses = []
selected_customer = None
if customers:
if not selected_customer_id:
selected_customer_id = customers[0][0] # Erster Kunde
# Hole Daten des ausgewählten Kunden
for customer in customers:
if customer[0] == selected_customer_id:
selected_customer = customer
break
# Hole Lizenzen des ausgewählten Kunden
if selected_customer:
cur.execute("""
SELECT
l.id,
l.license_key,
l.license_type,
l.valid_from,
l.valid_until,
l.is_active,
CASE
WHEN l.is_active = FALSE THEN 'deaktiviert'
WHEN l.valid_until < CURRENT_DATE THEN 'abgelaufen'
WHEN l.valid_until < CURRENT_DATE + INTERVAL '30 days' THEN 'läuft bald ab'
ELSE 'aktiv'
END as status,
l.domain_count,
l.ipv4_count,
l.phone_count
FROM licenses l
WHERE l.customer_id = %s
ORDER BY l.created_at DESC
""", (selected_customer_id,))
licenses = cur.fetchall()
cur.close()
conn.close()
return render_template("customers_licenses.html",
customers=customers,
selected_customer=selected_customer,
selected_customer_id=selected_customer_id,
licenses=licenses,
show_test=show_test)
@app.route("/api/customer/<int:customer_id>/licenses")
@login_required
def api_customer_licenses(customer_id):
"""API-Endpoint für AJAX-Abruf der Lizenzen eines Kunden"""
conn = get_connection()
cur = conn.cursor()
# Hole Lizenzen des Kunden
cur.execute("""
SELECT
l.id,
l.license_key,
l.license_type,
l.valid_from,
l.valid_until,
l.is_active,
CASE
WHEN l.is_active = FALSE THEN 'deaktiviert'
WHEN l.valid_until < CURRENT_DATE THEN 'abgelaufen'
WHEN l.valid_until < CURRENT_DATE + INTERVAL '30 days' THEN 'läuft bald ab'
ELSE 'aktiv'
END as status,
l.domain_count,
l.ipv4_count,
l.phone_count
FROM licenses l
WHERE l.customer_id = %s
ORDER BY l.created_at DESC
""", (customer_id,))
licenses = []
for row in cur.fetchall():
licenses.append({
'id': row[0],
'license_key': row[1],
'license_type': row[2],
'valid_from': row[3].strftime('%d.%m.%Y') if row[3] else '',
'valid_until': row[4].strftime('%d.%m.%Y') if row[4] else '',
'is_active': row[5],
'status': row[6],
'domain_count': row[7],
'ipv4_count': row[8],
'phone_count': row[9]
})
cur.close()
conn.close()
return jsonify({
'success': True,
'licenses': licenses,
'count': len(licenses)
})
@app.route("/api/customer/<int:customer_id>/quick-stats")
@login_required
def api_customer_quick_stats(customer_id):
"""API-Endpoint für Schnellstatistiken eines Kunden"""
conn = get_connection()
cur = conn.cursor()
# Hole Kundenstatistiken
cur.execute("""
SELECT
COUNT(l.id) as total_licenses,
COUNT(CASE WHEN l.is_active = TRUE AND l.valid_until >= CURRENT_DATE THEN 1 END) as active_licenses,
COUNT(CASE WHEN l.valid_until < CURRENT_DATE THEN 1 END) as expired_licenses,
COUNT(CASE WHEN l.valid_until < CURRENT_DATE + INTERVAL '30 days' AND l.valid_until >= CURRENT_DATE THEN 1 END) as expiring_soon
FROM licenses l
WHERE l.customer_id = %s
""", (customer_id,))
stats = cur.fetchone()
cur.close()
conn.close()
return jsonify({
'success': True,
'stats': {
'total': stats[0],
'active': stats[1],
'expired': stats[2],
'expiring_soon': stats[3]
}
})
@app.route("/api/license/<int:license_id>/quick-edit", methods=['POST'])
@login_required
def api_license_quick_edit(license_id):
"""API-Endpoint für schnelle Lizenz-Bearbeitung"""
conn = get_connection()
cur = conn.cursor()
try:
data = request.get_json()
# Hole alte Werte für Audit-Log
cur.execute("""
SELECT is_active, valid_until, license_type
FROM licenses WHERE id = %s
""", (license_id,))
old_values = cur.fetchone()
if not old_values:
return jsonify({'success': False, 'error': 'Lizenz nicht gefunden'}), 404
# Update-Felder vorbereiten
updates = []
params = []
new_values = {}
if 'is_active' in data:
updates.append("is_active = %s")
params.append(data['is_active'])
new_values['is_active'] = data['is_active']
if 'valid_until' in data:
updates.append("valid_until = %s")
params.append(data['valid_until'])
new_values['valid_until'] = data['valid_until']
if 'license_type' in data:
updates.append("license_type = %s")
params.append(data['license_type'])
new_values['license_type'] = data['license_type']
if updates:
params.append(license_id)
cur.execute(f"""
UPDATE licenses
SET {', '.join(updates)}
WHERE id = %s
""", params)
conn.commit()
# Audit-Log
log_audit('UPDATE', 'license', license_id,
old_values={
'is_active': old_values[0],
'valid_until': old_values[1].isoformat() if old_values[1] else None,
'license_type': old_values[2]
},
new_values=new_values)
cur.close()
conn.close()
return jsonify({'success': True})
except Exception as e:
conn.rollback()
cur.close()
conn.close()
return jsonify({'success': False, 'error': str(e)}), 500
@app.route("/sessions")
@login_required
def sessions():