aktiv-inaktiv der Lizenzen ist gefixt

Dieser Commit ist enthalten in:
2025-06-18 19:09:36 +02:00
Ursprung eb9f86b918
Commit 4bfe1983a3
49 geänderte Dateien mit 58 neuen und 39933 gelöschten Zeilen

Binäre Datei nicht angezeigt.

Binäre Datei nicht angezeigt.

Binäre Datei nicht angezeigt.

Binäre Datei nicht angezeigt.

Binäre Datei nicht angezeigt.

Binäre Datei nicht angezeigt.

Binäre Datei nicht angezeigt.

Binäre Datei nicht angezeigt.

Datei anzeigen

@@ -65,22 +65,22 @@ def toggle_license(license_id):
if not license_data:
return jsonify({'error': 'Lizenz nicht gefunden'}), 404
new_status = not license_data['active']
new_status = not license_data['is_active']
# Update status
cur.execute("UPDATE licenses SET active = %s WHERE id = %s", (new_status, license_id))
cur.execute("UPDATE licenses SET is_active = %s WHERE id = %s", (new_status, license_id))
conn.commit()
# Log change
log_audit('TOGGLE', 'license', license_id,
old_values={'active': license_data['active']},
new_values={'active': new_status})
old_values={'is_active': license_data['is_active']},
new_values={'is_active': new_status})
return jsonify({'success': True, 'active': new_status})
return jsonify({'success': True, 'is_active': new_status})
except Exception as e:
conn.rollback()
logging.error(f"Fehler beim Umschalten der Lizenz: {str(e)}")
logging.error(f"Fehler beim Umschalten der Lizenz: {str(e)}", exc_info=True)
return jsonify({'error': 'Fehler beim Umschalten der Lizenz'}), 500
finally:
cur.close()
@@ -104,8 +104,8 @@ def bulk_activate_licenses():
# Update all selected licenses
cur.execute("""
UPDATE licenses
SET active = true
WHERE id = ANY(%s) AND active = false
SET is_active = true
WHERE id = ANY(%s) AND is_active = false
RETURNING id
""", (license_ids,))
@@ -115,7 +115,7 @@ def bulk_activate_licenses():
# Log changes
for license_id in updated_ids:
log_audit('BULK_ACTIVATE', 'license', license_id,
new_values={'active': True})
new_values={'is_active': True})
return jsonify({
'success': True,
@@ -148,8 +148,8 @@ def bulk_deactivate_licenses():
# Update all selected licenses
cur.execute("""
UPDATE licenses
SET active = false
WHERE id = ANY(%s) AND active = true
SET is_active = false
WHERE id = ANY(%s) AND is_active = true
RETURNING id
""", (license_ids,))
@@ -159,7 +159,7 @@ def bulk_deactivate_licenses():
# Log changes
for license_id in updated_ids:
log_audit('BULK_DEACTIVATE', 'license', license_id,
new_values={'active': False})
new_values={'is_active': False})
return jsonify({
'success': True,
@@ -451,10 +451,10 @@ def quick_edit_license(license_id):
new_values['valid_until'] = data['valid_until']
if 'active' in data:
updates.append("active = %s")
updates.append("is_active = %s")
params.append(bool(data['active']))
old_values['active'] = current_license['active']
new_values['active'] = bool(data['active'])
old_values['is_active'] = current_license['is_active']
new_values['is_active'] = bool(data['active'])
if not updates:
return jsonify({'error': 'Keine Änderungen angegeben'}), 400
@@ -797,7 +797,7 @@ def global_search():
try:
# Suche in Lizenzen
cur.execute("""
SELECT id, license_key, customer_name, active
SELECT id, license_key, customer_name, is_active
FROM licenses
WHERE license_key ILIKE %s
OR customer_name ILIKE %s
@@ -810,7 +810,7 @@ def global_search():
'id': row[0],
'license_key': row[1],
'customer_name': row[2],
'active': row[3]
'is_active': row[3]
})
# Suche in Kunden

Datei anzeigen

@@ -1,943 +0,0 @@
import logging
from datetime import datetime
from zoneinfo import ZoneInfo
from flask import Blueprint, request, jsonify, session
import config
from auth.decorators import login_required
from utils.audit import log_audit
from utils.network import get_client_ip
from utils.license import generate_license_key
from db import get_connection, get_db_connection, get_db_cursor
from models import get_license_by_id, get_customers
# Create Blueprint
api_bp = Blueprint('api', __name__, url_prefix='/api')
@api_bp.route("/customers", methods=["GET"])
@login_required
def api_customers():
"""API endpoint for customer search (used by Select2)"""
search = request.args.get('q', '').strip()
page = int(request.args.get('page', 1))
per_page = 20
try:
# Get all customers (with optional search)
customers = get_customers(show_test=True, search=search)
# Pagination
start = (page - 1) * per_page
end = start + per_page
paginated_customers = customers[start:end]
# Format for Select2
results = []
for customer in paginated_customers:
results.append({
'id': customer['id'],
'text': f"{customer['name']} ({customer['email'] or 'keine E-Mail'})"
})
return jsonify({
'results': results,
'pagination': {
'more': len(customers) > end
}
})
except Exception as e:
logging.error(f"Error in api_customers: {str(e)}")
return jsonify({'error': 'Fehler beim Laden der Kunden'}), 500
@api_bp.route("/license/<int:license_id>/toggle", methods=["POST"])
@login_required
def toggle_license(license_id):
"""Toggle license active status"""
conn = get_connection()
cur = conn.cursor()
try:
# Get current status
license_data = get_license_by_id(license_id)
if not license_data:
return jsonify({'error': 'Lizenz nicht gefunden'}), 404
new_status = not license_data['active']
# Update status
cur.execute("UPDATE licenses SET active = %s WHERE id = %s", (new_status, license_id))
conn.commit()
# Log change
log_audit('TOGGLE', 'license', license_id,
old_values={'active': license_data['active']},
new_values={'active': new_status})
return jsonify({'success': True, 'active': new_status})
except Exception as e:
conn.rollback()
logging.error(f"Fehler beim Umschalten der Lizenz: {str(e)}")
return jsonify({'error': 'Fehler beim Umschalten der Lizenz'}), 500
finally:
cur.close()
conn.close()
@api_bp.route("/licenses/bulk-activate", methods=["POST"])
@login_required
def bulk_activate_licenses():
"""Aktiviere mehrere Lizenzen gleichzeitig"""
data = request.get_json()
license_ids = data.get('license_ids', [])
if not license_ids:
return jsonify({'error': 'Keine Lizenzen ausgewählt'}), 400
conn = get_connection()
cur = conn.cursor()
try:
# Update all selected licenses
cur.execute("""
UPDATE licenses
SET active = true
WHERE id = ANY(%s) AND active = false
RETURNING id
""", (license_ids,))
updated_ids = [row[0] for row in cur.fetchall()]
conn.commit()
# Log changes
for license_id in updated_ids:
log_audit('BULK_ACTIVATE', 'license', license_id,
new_values={'active': True})
return jsonify({
'success': True,
'updated_count': len(updated_ids)
})
except Exception as e:
conn.rollback()
logging.error(f"Fehler beim Bulk-Aktivieren: {str(e)}")
return jsonify({'error': 'Fehler beim Aktivieren der Lizenzen'}), 500
finally:
cur.close()
conn.close()
@api_bp.route("/licenses/bulk-deactivate", methods=["POST"])
@login_required
def bulk_deactivate_licenses():
"""Deaktiviere mehrere Lizenzen gleichzeitig"""
data = request.get_json()
license_ids = data.get('license_ids', [])
if not license_ids:
return jsonify({'error': 'Keine Lizenzen ausgewählt'}), 400
conn = get_connection()
cur = conn.cursor()
try:
# Update all selected licenses
cur.execute("""
UPDATE licenses
SET active = false
WHERE id = ANY(%s) AND active = true
RETURNING id
""", (license_ids,))
updated_ids = [row[0] for row in cur.fetchall()]
conn.commit()
# Log changes
for license_id in updated_ids:
log_audit('BULK_DEACTIVATE', 'license', license_id,
new_values={'active': False})
return jsonify({
'success': True,
'updated_count': len(updated_ids)
})
except Exception as e:
conn.rollback()
logging.error(f"Fehler beim Bulk-Deaktivieren: {str(e)}")
return jsonify({'error': 'Fehler beim Deaktivieren der Lizenzen'}), 500
finally:
cur.close()
conn.close()
@api_bp.route("/license/<int:license_id>/devices")
@login_required
def get_license_devices(license_id):
"""Hole alle Geräte einer Lizenz"""
conn = get_connection()
cur = conn.cursor()
try:
# Hole Lizenz-Info
license_data = get_license_by_id(license_id)
if not license_data:
return jsonify({'error': 'Lizenz nicht gefunden'}), 404
# Hole registrierte Geräte
cur.execute("""
SELECT
dr.id,
dr.device_id,
dr.device_name,
dr.device_type,
dr.registration_date,
dr.last_seen,
dr.is_active,
(SELECT COUNT(*) FROM sessions s
WHERE s.license_key = dr.license_key
AND s.device_id = dr.device_id
AND s.active = true) as active_sessions
FROM device_registrations dr
WHERE dr.license_key = %s
ORDER BY dr.registration_date DESC
""", (license_data['license_key'],))
devices = []
for row in cur.fetchall():
devices.append({
'id': row[0],
'device_id': row[1],
'device_name': row[2],
'device_type': row[3],
'registration_date': row[4].isoformat() if row[4] else None,
'last_seen': row[5].isoformat() if row[5] else None,
'is_active': row[6],
'active_sessions': row[7]
})
return jsonify({
'license_key': license_data['license_key'],
'device_limit': license_data['device_limit'],
'devices': devices,
'device_count': len(devices)
})
except Exception as e:
logging.error(f"Fehler beim Abrufen der Geräte: {str(e)}")
return jsonify({'error': 'Fehler beim Abrufen der Geräte'}), 500
finally:
cur.close()
conn.close()
@api_bp.route("/license/<int:license_id>/register-device", methods=["POST"])
@login_required
def register_device(license_id):
"""Registriere ein neues Gerät für eine Lizenz"""
data = request.get_json()
device_id = data.get('device_id')
device_name = data.get('device_name')
device_type = data.get('device_type', 'unknown')
if not device_id or not device_name:
return jsonify({'error': 'Geräte-ID und Name erforderlich'}), 400
conn = get_connection()
cur = conn.cursor()
try:
# Hole Lizenz-Info
license_data = get_license_by_id(license_id)
if not license_data:
return jsonify({'error': 'Lizenz nicht gefunden'}), 404
# Prüfe Gerätelimit
cur.execute("""
SELECT COUNT(*) FROM device_registrations
WHERE license_key = %s AND is_active = true
""", (license_data['license_key'],))
active_device_count = cur.fetchone()[0]
if active_device_count >= license_data['device_limit']:
return jsonify({'error': 'Gerätelimit erreicht'}), 400
# Prüfe ob Gerät bereits registriert
cur.execute("""
SELECT id, is_active FROM device_registrations
WHERE license_key = %s AND device_id = %s
""", (license_data['license_key'], device_id))
existing = cur.fetchone()
if existing:
if existing[1]: # is_active
return jsonify({'error': 'Gerät bereits registriert'}), 400
else:
# Reaktiviere Gerät
cur.execute("""
UPDATE device_registrations
SET is_active = true, last_seen = CURRENT_TIMESTAMP
WHERE id = %s
""", (existing[0],))
else:
# Registriere neues Gerät
cur.execute("""
INSERT INTO device_registrations
(license_key, device_id, device_name, device_type, is_active)
VALUES (%s, %s, %s, %s, true)
""", (license_data['license_key'], device_id, device_name, device_type))
conn.commit()
# Audit-Log
log_audit('DEVICE_REGISTER', 'license', license_id,
additional_info=f"Gerät {device_name} ({device_id}) registriert")
return jsonify({'success': True})
except Exception as e:
conn.rollback()
logging.error(f"Fehler beim Registrieren des Geräts: {str(e)}")
return jsonify({'error': 'Fehler beim Registrieren des Geräts'}), 500
finally:
cur.close()
conn.close()
@api_bp.route("/license/<int:license_id>/deactivate-device/<int:device_id>", methods=["POST"])
@login_required
def deactivate_device(license_id, device_id):
"""Deaktiviere ein Gerät einer Lizenz"""
conn = get_connection()
cur = conn.cursor()
try:
# Prüfe ob Gerät zur Lizenz gehört
cur.execute("""
SELECT dr.device_name, dr.device_id, l.license_key
FROM device_registrations dr
JOIN licenses l ON dr.license_key = l.license_key
WHERE dr.id = %s AND l.id = %s
""", (device_id, license_id))
device = cur.fetchone()
if not device:
return jsonify({'error': 'Gerät nicht gefunden'}), 404
# Deaktiviere Gerät
cur.execute("""
UPDATE device_registrations
SET is_active = false
WHERE id = %s
""", (device_id,))
# Beende aktive Sessions
cur.execute("""
UPDATE sessions
SET active = false, logout_time = CURRENT_TIMESTAMP
WHERE license_key = %s AND device_id = %s AND active = true
""", (device[2], device[1]))
conn.commit()
# Audit-Log
log_audit('DEVICE_DEACTIVATE', 'license', license_id,
additional_info=f"Gerät {device[0]} ({device[1]}) deaktiviert")
return jsonify({'success': True})
except Exception as e:
conn.rollback()
logging.error(f"Fehler beim Deaktivieren des Geräts: {str(e)}")
return jsonify({'error': 'Fehler beim Deaktivieren des Geräts'}), 500
finally:
cur.close()
conn.close()
@api_bp.route("/licenses/bulk-delete", methods=["POST"])
@login_required
def bulk_delete_licenses():
"""Lösche mehrere Lizenzen gleichzeitig"""
data = request.get_json()
license_ids = data.get('license_ids', [])
if not license_ids:
return jsonify({'error': 'Keine Lizenzen ausgewählt'}), 400
conn = get_connection()
cur = conn.cursor()
try:
deleted_count = 0
for license_id in license_ids:
# Hole Lizenz-Info für Audit
cur.execute("SELECT license_key FROM licenses WHERE id = %s", (license_id,))
result = cur.fetchone()
if result:
license_key = result[0]
# Lösche Sessions
cur.execute("DELETE FROM sessions WHERE license_key = %s", (license_key,))
# Lösche Geräte-Registrierungen
cur.execute("DELETE FROM device_registrations WHERE license_key = %s", (license_key,))
# Lösche Lizenz
cur.execute("DELETE FROM licenses WHERE id = %s", (license_id,))
# Audit-Log
log_audit('BULK_DELETE', 'license', license_id,
old_values={'license_key': license_key})
deleted_count += 1
conn.commit()
return jsonify({
'success': True,
'deleted_count': deleted_count
})
except Exception as e:
conn.rollback()
logging.error(f"Fehler beim Bulk-Löschen: {str(e)}")
return jsonify({'error': 'Fehler beim Löschen der Lizenzen'}), 500
finally:
cur.close()
conn.close()
@api_bp.route("/license/<int:license_id>/quick-edit", methods=['POST'])
@login_required
def quick_edit_license(license_id):
"""Schnellbearbeitung einer Lizenz"""
data = request.get_json()
conn = get_connection()
cur = conn.cursor()
try:
# Hole aktuelle Lizenz für Vergleich
current_license = get_license_by_id(license_id)
if not current_license:
return jsonify({'error': 'Lizenz nicht gefunden'}), 404
# Update nur die übergebenen Felder
updates = []
params = []
old_values = {}
new_values = {}
if 'device_limit' in data:
updates.append("device_limit = %s")
params.append(int(data['device_limit']))
old_values['device_limit'] = current_license['device_limit']
new_values['device_limit'] = int(data['device_limit'])
if 'valid_until' in data:
updates.append("valid_until = %s")
params.append(data['valid_until'])
old_values['valid_until'] = str(current_license['valid_until'])
new_values['valid_until'] = data['valid_until']
if 'active' in data:
updates.append("active = %s")
params.append(bool(data['active']))
old_values['active'] = current_license['active']
new_values['active'] = bool(data['active'])
if not updates:
return jsonify({'error': 'Keine Änderungen angegeben'}), 400
# Führe Update aus
params.append(license_id)
cur.execute(f"""
UPDATE licenses
SET {', '.join(updates)}
WHERE id = %s
""", params)
conn.commit()
# Audit-Log
log_audit('QUICK_EDIT', 'license', license_id,
old_values=old_values,
new_values=new_values)
return jsonify({'success': True})
except Exception as e:
conn.rollback()
logging.error(f"Fehler bei Schnellbearbeitung: {str(e)}")
return jsonify({'error': 'Fehler bei der Bearbeitung'}), 500
finally:
cur.close()
conn.close()
@api_bp.route("/license/<int:license_id>/resources")
@login_required
def get_license_resources(license_id):
"""Hole alle Ressourcen einer Lizenz"""
conn = get_connection()
cur = conn.cursor()
try:
# Hole Lizenz-Info
license_data = get_license_by_id(license_id)
if not license_data:
return jsonify({'error': 'Lizenz nicht gefunden'}), 404
# Hole zugewiesene Ressourcen
cur.execute("""
SELECT
rp.id,
rp.resource_type,
rp.resource_value,
rp.is_test,
rp.status_changed_at,
lr.assigned_at,
lr.assigned_by
FROM resource_pools rp
JOIN license_resources lr ON rp.id = lr.resource_id
WHERE lr.license_id = %s
ORDER BY rp.resource_type, rp.resource_value
""", (license_id,))
resources = []
for row in cur.fetchall():
resources.append({
'id': row[0],
'type': row[1],
'value': row[2],
'is_test': row[3],
'status_changed_at': row[4].isoformat() if row[4] else None,
'assigned_at': row[5].isoformat() if row[5] else None,
'assigned_by': row[6]
})
# Gruppiere nach Typ
grouped = {}
for resource in resources:
res_type = resource['type']
if res_type not in grouped:
grouped[res_type] = []
grouped[res_type].append(resource)
return jsonify({
'license_key': license_data['license_key'],
'resources': resources,
'grouped': grouped,
'total_count': len(resources)
})
except Exception as e:
logging.error(f"Fehler beim Abrufen der Ressourcen: {str(e)}")
return jsonify({'error': 'Fehler beim Abrufen der Ressourcen'}), 500
finally:
cur.close()
conn.close()
@api_bp.route("/resources/allocate", methods=['POST'])
@login_required
def allocate_resources():
"""Weise Ressourcen einer Lizenz zu"""
data = request.get_json()
license_id = data.get('license_id')
resource_ids = data.get('resource_ids', [])
if not license_id or not resource_ids:
return jsonify({'error': 'Lizenz-ID und Ressourcen erforderlich'}), 400
conn = get_connection()
cur = conn.cursor()
try:
# Prüfe Lizenz
license_data = get_license_by_id(license_id)
if not license_data:
return jsonify({'error': 'Lizenz nicht gefunden'}), 404
allocated_count = 0
errors = []
for resource_id in resource_ids:
try:
# Prüfe ob Ressource verfügbar ist
cur.execute("""
SELECT resource_value, status, is_test
FROM resource_pools
WHERE id = %s
""", (resource_id,))
resource = cur.fetchone()
if not resource:
errors.append(f"Ressource {resource_id} nicht gefunden")
continue
if resource[1] != 'available':
errors.append(f"Ressource {resource[0]} ist nicht verfügbar")
continue
# Prüfe Test/Produktion Kompatibilität
if resource[2] != license_data['is_test']:
errors.append(f"Ressource {resource[0]} ist {'Test' if resource[2] else 'Produktion'}, Lizenz ist {'Test' if license_data['is_test'] else 'Produktion'}")
continue
# Weise Ressource zu
cur.execute("""
UPDATE resource_pools
SET status = 'allocated',
allocated_to_license = %s,
status_changed_at = CURRENT_TIMESTAMP,
status_changed_by = %s
WHERE id = %s
""", (license_id, session['username'], resource_id))
# Erstelle Verknüpfung
cur.execute("""
INSERT INTO license_resources (license_id, resource_id, assigned_by)
VALUES (%s, %s, %s)
""", (license_id, resource_id, session['username']))
# History-Eintrag
cur.execute("""
INSERT INTO resource_history (resource_id, license_id, action, action_by, ip_address)
VALUES (%s, %s, 'allocated', %s, %s)
""", (resource_id, license_id, session['username'], get_client_ip()))
allocated_count += 1
except Exception as e:
errors.append(f"Fehler bei Ressource {resource_id}: {str(e)}")
conn.commit()
# Audit-Log
if allocated_count > 0:
log_audit('RESOURCE_ALLOCATE', 'license', license_id,
additional_info=f"{allocated_count} Ressourcen zugewiesen")
return jsonify({
'success': True,
'allocated_count': allocated_count,
'errors': errors
})
except Exception as e:
conn.rollback()
logging.error(f"Fehler beim Zuweisen der Ressourcen: {str(e)}")
return jsonify({'error': 'Fehler beim Zuweisen der Ressourcen'}), 500
finally:
cur.close()
conn.close()
@api_bp.route("/resources/check-availability", methods=['GET'])
@login_required
def check_resource_availability():
"""Prüfe Verfügbarkeit von Ressourcen"""
resource_type = request.args.get('type')
count = int(request.args.get('count', 1))
is_test = request.args.get('is_test', 'false') == 'true'
if not resource_type:
return jsonify({'error': 'Ressourcen-Typ erforderlich'}), 400
conn = get_connection()
cur = conn.cursor()
try:
# Zähle verfügbare Ressourcen
cur.execute("""
SELECT COUNT(*)
FROM resource_pools
WHERE resource_type = %s
AND status = 'available'
AND is_test = %s
""", (resource_type, is_test))
available_count = cur.fetchone()[0]
return jsonify({
'resource_type': resource_type,
'requested': count,
'available': available_count,
'sufficient': available_count >= count,
'is_test': is_test
})
except Exception as e:
logging.error(f"Fehler beim Prüfen der Verfügbarkeit: {str(e)}")
return jsonify({'error': 'Fehler beim Prüfen der Verfügbarkeit'}), 500
finally:
cur.close()
conn.close()
@api_bp.route("/global-search", methods=['GET'])
@login_required
def global_search():
"""Globale Suche über alle Entitäten"""
query = request.args.get('q', '').strip()
if not query or len(query) < 3:
return jsonify({'error': 'Suchbegriff muss mindestens 3 Zeichen haben'}), 400
conn = get_connection()
cur = conn.cursor()
results = {
'licenses': [],
'customers': [],
'resources': [],
'sessions': []
}
try:
# Suche in Lizenzen
cur.execute("""
SELECT id, license_key, customer_name, active
FROM licenses
WHERE license_key ILIKE %s
OR customer_name ILIKE %s
OR customer_email ILIKE %s
LIMIT 10
""", (f'%{query}%', f'%{query}%', f'%{query}%'))
for row in cur.fetchall():
results['licenses'].append({
'id': row[0],
'license_key': row[1],
'customer_name': row[2],
'active': row[3]
})
# Suche in Kunden
cur.execute("""
SELECT id, name, email
FROM customers
WHERE name ILIKE %s OR email ILIKE %s
LIMIT 10
""", (f'%{query}%', f'%{query}%'))
for row in cur.fetchall():
results['customers'].append({
'id': row[0],
'name': row[1],
'email': row[2]
})
# Suche in Ressourcen
cur.execute("""
SELECT id, resource_type, resource_value, status
FROM resource_pools
WHERE resource_value ILIKE %s
LIMIT 10
""", (f'%{query}%',))
for row in cur.fetchall():
results['resources'].append({
'id': row[0],
'type': row[1],
'value': row[2],
'status': row[3]
})
# Suche in Sessions
cur.execute("""
SELECT id, license_key, username, device_id, active
FROM sessions
WHERE username ILIKE %s OR device_id ILIKE %s
ORDER BY login_time DESC
LIMIT 10
""", (f'%{query}%', f'%{query}%'))
for row in cur.fetchall():
results['sessions'].append({
'id': row[0],
'license_key': row[1],
'username': row[2],
'device_id': row[3],
'active': row[4]
})
return jsonify(results)
except Exception as e:
logging.error(f"Fehler bei der globalen Suche: {str(e)}")
return jsonify({'error': 'Fehler bei der Suche'}), 500
finally:
cur.close()
conn.close()
@api_bp.route("/generate-license-key", methods=['POST'])
@login_required
def api_generate_key():
"""API Endpoint zur Generierung eines neuen Lizenzschlüssels"""
try:
# Lizenztyp aus Request holen (default: full)
data = request.get_json() or {}
license_type = data.get('type', 'full')
# Key generieren
key = generate_license_key(license_type)
# Prüfen ob Key bereits existiert (sehr unwahrscheinlich aber sicher ist sicher)
conn = get_connection()
cur = conn.cursor()
# Wiederhole bis eindeutiger Key gefunden
attempts = 0
while attempts < 10: # Max 10 Versuche
cur.execute("SELECT 1 FROM licenses WHERE license_key = %s", (key,))
if not cur.fetchone():
break # Key ist eindeutig
key = generate_license_key(license_type)
attempts += 1
cur.close()
conn.close()
# Log für Audit
log_audit('GENERATE_KEY', 'license',
additional_info={'type': license_type, 'key': key})
return jsonify({
'success': True,
'key': key,
'type': license_type
})
except Exception as e:
logging.error(f"Fehler bei Key-Generierung: {str(e)}")
return jsonify({
'success': False,
'error': 'Fehler bei der Key-Generierung'
}), 500
@api_bp.route("/customers", methods=['GET'])
@login_required
def api_customers():
"""API Endpoint für die Kundensuche mit Select2"""
try:
# Suchparameter
search = request.args.get('q', '').strip()
page = request.args.get('page', 1, type=int)
per_page = 20
customer_id = request.args.get('id', type=int)
conn = get_connection()
cur = conn.cursor()
# Einzelnen Kunden per ID abrufen
if customer_id:
cur.execute("""
SELECT c.id, c.name, c.email,
COUNT(l.id) as license_count
FROM customers c
LEFT JOIN licenses l ON c.id = l.customer_id
WHERE c.id = %s
GROUP BY c.id, c.name, c.email
""", (customer_id,))
customer = cur.fetchone()
results = []
if customer:
results.append({
'id': customer[0],
'text': f"{customer[1]} ({customer[2]})",
'name': customer[1],
'email': customer[2],
'license_count': customer[3]
})
cur.close()
conn.close()
return jsonify({
'results': results,
'pagination': {'more': False}
})
# SQL Query mit optionaler Suche
elif search:
cur.execute("""
SELECT c.id, c.name, c.email,
COUNT(l.id) as license_count
FROM customers c
LEFT JOIN licenses l ON c.id = l.customer_id
WHERE LOWER(c.name) LIKE LOWER(%s)
OR LOWER(c.email) LIKE LOWER(%s)
GROUP BY c.id, c.name, c.email
ORDER BY c.name
LIMIT %s OFFSET %s
""", (f'%{search}%', f'%{search}%', per_page, (page - 1) * per_page))
else:
cur.execute("""
SELECT c.id, c.name, c.email,
COUNT(l.id) as license_count
FROM customers c
LEFT JOIN licenses l ON c.id = l.customer_id
GROUP BY c.id, c.name, c.email
ORDER BY c.name
LIMIT %s OFFSET %s
""", (per_page, (page - 1) * per_page))
customers = cur.fetchall()
# Format für Select2
results = []
for customer in customers:
results.append({
'id': customer[0],
'text': f"{customer[1]} - {customer[2]} ({customer[3]} Lizenzen)",
'name': customer[1],
'email': customer[2],
'license_count': customer[3]
})
# Gesamtanzahl für Pagination
if search:
cur.execute("""
SELECT COUNT(*) FROM customers
WHERE LOWER(name) LIKE LOWER(%s)
OR LOWER(email) LIKE LOWER(%s)
""", (f'%{search}%', f'%{search}%'))
else:
cur.execute("SELECT COUNT(*) FROM customers")
total_count = cur.fetchone()[0]
cur.close()
conn.close()
# Select2 Response Format
return jsonify({
'results': results,
'pagination': {
'more': (page * per_page) < total_count
}
})
except Exception as e:
logging.error(f"Fehler bei Kundensuche: {str(e)}")
return jsonify({
'results': [],
'pagination': {'more': False},
'error': str(e)
}), 500

Datei anzeigen

@@ -24,12 +24,39 @@ def test_customers():
def customers():
show_test = request.args.get('show_test', 'false').lower() == 'true'
search = request.args.get('search', '').strip()
page = request.args.get('page', 1, type=int)
per_page = 20
sort = request.args.get('sort', 'name')
order = request.args.get('order', 'asc')
customers_list = get_customers(show_test=show_test, search=search)
# Sortierung
if sort == 'name':
customers_list.sort(key=lambda x: x['name'].lower(), reverse=(order == 'desc'))
elif sort == 'email':
customers_list.sort(key=lambda x: x['email'].lower(), reverse=(order == 'desc'))
elif sort == 'created_at':
customers_list.sort(key=lambda x: x['created_at'], reverse=(order == 'desc'))
# Paginierung
total_customers = len(customers_list)
total_pages = (total_customers + per_page - 1) // per_page
start = (page - 1) * per_page
end = start + per_page
paginated_customers = customers_list[start:end]
return render_template("customers.html",
customers=customers_list,
customers=paginated_customers,
show_test=show_test,
search=search)
search=search,
page=page,
per_page=per_page,
total_pages=total_pages,
total_customers=total_customers,
sort=sort,
order=order,
current_order=order)
@customer_bp.route("/customer/edit/<int:customer_id>", methods=["GET", "POST"])