Trennung Testdaten und Livedaten

Dieser Commit ist enthalten in:
2025-06-09 15:48:15 +02:00
Ursprung b910a3d999
Commit 90ce9a52f8
12 geänderte Dateien mit 417 neuen und 105 gelöschten Zeilen

Datei anzeigen

@@ -1174,18 +1174,18 @@ def dashboard():
cur = conn.cursor()
# Statistiken abrufen
# Gesamtanzahl Kunden
cur.execute("SELECT COUNT(*) FROM customers")
# Gesamtanzahl Kunden (ohne Testdaten)
cur.execute("SELECT COUNT(*) FROM customers WHERE is_test = FALSE")
total_customers = cur.fetchone()[0]
# Gesamtanzahl Lizenzen
cur.execute("SELECT COUNT(*) FROM licenses")
# Gesamtanzahl Lizenzen (ohne Testdaten)
cur.execute("SELECT COUNT(*) FROM licenses WHERE is_test = FALSE")
total_licenses = cur.fetchone()[0]
# Aktive Lizenzen (nicht abgelaufen und is_active = true)
# Aktive Lizenzen (nicht abgelaufen und is_active = true, ohne Testdaten)
cur.execute("""
SELECT COUNT(*) FROM licenses
WHERE valid_until >= CURRENT_DATE AND is_active = TRUE
WHERE valid_until >= CURRENT_DATE AND is_active = TRUE AND is_test = FALSE
""")
active_licenses = cur.fetchone()[0]
@@ -1193,38 +1193,52 @@ def dashboard():
cur.execute("SELECT COUNT(*) FROM sessions WHERE is_active = TRUE")
active_sessions_count = cur.fetchone()[0]
# Abgelaufene Lizenzen
# Abgelaufene Lizenzen (ohne Testdaten)
cur.execute("""
SELECT COUNT(*) FROM licenses
WHERE valid_until < CURRENT_DATE
WHERE valid_until < CURRENT_DATE AND is_test = FALSE
""")
expired_licenses = cur.fetchone()[0]
# Deaktivierte Lizenzen
# Deaktivierte Lizenzen (ohne Testdaten)
cur.execute("""
SELECT COUNT(*) FROM licenses
WHERE is_active = FALSE
WHERE is_active = FALSE AND is_test = FALSE
""")
inactive_licenses = cur.fetchone()[0]
# Lizenzen die in den nächsten 30 Tagen ablaufen
# Lizenzen die in den nächsten 30 Tagen ablaufen (ohne Testdaten)
cur.execute("""
SELECT COUNT(*) FROM licenses
WHERE valid_until >= CURRENT_DATE
AND valid_until < CURRENT_DATE + INTERVAL '30 days'
AND is_active = TRUE
AND is_test = FALSE
""")
expiring_soon = cur.fetchone()[0]
# Testlizenzen vs Vollversionen
# Testlizenzen vs Vollversionen (ohne Testdaten)
cur.execute("""
SELECT license_type, COUNT(*)
FROM licenses
WHERE is_test = FALSE
GROUP BY license_type
""")
license_types = dict(cur.fetchall())
# Letzte 5 erstellten Lizenzen
# Anzahl Testdaten
cur.execute("SELECT COUNT(*) FROM licenses WHERE is_test = TRUE")
test_data_count = cur.fetchone()[0]
# Anzahl Test-Kunden
cur.execute("SELECT COUNT(*) FROM customers WHERE is_test = TRUE")
test_customers_count = cur.fetchone()[0]
# Anzahl Test-Ressourcen
cur.execute("SELECT COUNT(*) FROM resource_pools WHERE is_test = TRUE")
test_resources_count = cur.fetchone()[0]
# Letzte 5 erstellten Lizenzen (ohne Testdaten)
cur.execute("""
SELECT l.id, l.license_key, c.name, l.valid_until,
CASE
@@ -1235,12 +1249,13 @@ def dashboard():
END as status
FROM licenses l
JOIN customers c ON l.customer_id = c.id
WHERE l.is_test = FALSE
ORDER BY l.id DESC
LIMIT 5
""")
recent_licenses = cur.fetchall()
# Bald ablaufende Lizenzen (nächste 30 Tage)
# Bald ablaufende Lizenzen (nächste 30 Tage, ohne Testdaten)
cur.execute("""
SELECT l.id, l.license_key, c.name, l.valid_until,
l.valid_until - CURRENT_DATE as days_left
@@ -1249,6 +1264,7 @@ def dashboard():
WHERE l.valid_until >= CURRENT_DATE
AND l.valid_until < CURRENT_DATE + INTERVAL '30 days'
AND l.is_active = TRUE
AND l.is_test = FALSE
ORDER BY l.valid_until
LIMIT 10
""")
@@ -1358,6 +1374,9 @@ def dashboard():
'expiring_soon': expiring_soon,
'full_licenses': license_types.get('full', 0),
'test_licenses': license_types.get('test', 0),
'test_data_count': test_data_count,
'test_customers_count': test_customers_count,
'test_resources_count': test_resources_count,
'recent_licenses': recent_licenses,
'expiring_licenses': expiring_licenses,
'active_sessions': active_sessions_count,
@@ -1385,6 +1404,7 @@ def create_license():
license_key = request.form["license_key"].upper() # Immer Großbuchstaben
license_type = request.form["license_type"]
valid_from = request.form["valid_from"]
is_test = request.form.get("is_test") == "on" # Checkbox value
# Berechne valid_until basierend auf Laufzeit
duration = int(request.form.get("duration", 1))
@@ -1438,35 +1458,39 @@ def create_license():
flash(f'E-Mail bereits vergeben für Kunde: {existing[1]}', 'error')
return redirect(url_for('create_license'))
# Kunde einfügen
# Kunde einfügen (erbt Test-Status von Lizenz)
cur.execute("""
INSERT INTO customers (name, email, created_at)
VALUES (%s, %s, NOW())
INSERT INTO customers (name, email, is_test, created_at)
VALUES (%s, %s, %s, NOW())
RETURNING id
""", (name, email))
""", (name, email, is_test))
customer_id = cur.fetchone()[0]
customer_info = {'name': name, 'email': email}
customer_info = {'name': name, 'email': email, 'is_test': is_test}
# Audit-Log für neuen Kunden
log_audit('CREATE', 'customer', customer_id,
new_values={'name': name, 'email': email})
new_values={'name': name, 'email': email, 'is_test': is_test})
else:
# Bestehender Kunde - hole Infos für Audit-Log
cur.execute("SELECT name, email FROM customers WHERE id = %s", (customer_id,))
cur.execute("SELECT name, email, is_test FROM customers WHERE id = %s", (customer_id,))
customer_data = cur.fetchone()
if not customer_data:
flash('Kunde nicht gefunden!', 'error')
return redirect(url_for('create_license'))
customer_info = {'name': customer_data[0], 'email': customer_data[1]}
# Wenn Kunde Test-Kunde ist, Lizenz auch als Test markieren
if customer_data[2]: # is_test des Kunden
is_test = True
# Lizenz hinzufügen
cur.execute("""
INSERT INTO licenses (license_key, customer_id, license_type, valid_from, valid_until, is_active,
domain_count, ipv4_count, phone_count)
VALUES (%s, %s, %s, %s, %s, TRUE, %s, %s, %s)
domain_count, ipv4_count, phone_count, is_test)
VALUES (%s, %s, %s, %s, %s, TRUE, %s, %s, %s, %s)
RETURNING id
""", (license_key, customer_id, license_type, valid_from, valid_until,
domain_count, ipv4_count, phone_count))
domain_count, ipv4_count, phone_count, is_test))
license_id = cur.fetchone()[0]
# Ressourcen zuweisen
@@ -1474,10 +1498,10 @@ def create_license():
# Prüfe Verfügbarkeit
cur.execute("""
SELECT
(SELECT COUNT(*) FROM resource_pools WHERE resource_type = 'domain' AND status = 'available') as domains,
(SELECT COUNT(*) FROM resource_pools WHERE resource_type = 'ipv4' AND status = 'available') as ipv4s,
(SELECT COUNT(*) FROM resource_pools WHERE resource_type = 'phone' AND status = 'available') as phones
""")
(SELECT COUNT(*) FROM resource_pools WHERE resource_type = 'domain' AND status = 'available' AND is_test = %s) as domains,
(SELECT COUNT(*) FROM resource_pools WHERE resource_type = 'ipv4' AND status = 'available' AND is_test = %s) as ipv4s,
(SELECT COUNT(*) FROM resource_pools WHERE resource_type = 'phone' AND status = 'available' AND is_test = %s) as phones
""", (is_test, is_test, is_test))
available = cur.fetchone()
if available[0] < domain_count:
@@ -1491,9 +1515,9 @@ def create_license():
if domain_count > 0:
cur.execute("""
SELECT id FROM resource_pools
WHERE resource_type = 'domain' AND status = 'available'
WHERE resource_type = 'domain' AND status = 'available' AND is_test = %s
LIMIT %s FOR UPDATE
""", (domain_count,))
""", (is_test, domain_count))
for (resource_id,) in cur.fetchall():
cur.execute("""
UPDATE resource_pools
@@ -1516,9 +1540,9 @@ def create_license():
if ipv4_count > 0:
cur.execute("""
SELECT id FROM resource_pools
WHERE resource_type = 'ipv4' AND status = 'available'
WHERE resource_type = 'ipv4' AND status = 'available' AND is_test = %s
LIMIT %s FOR UPDATE
""", (ipv4_count,))
""", (is_test, ipv4_count))
for (resource_id,) in cur.fetchall():
cur.execute("""
UPDATE resource_pools
@@ -1541,9 +1565,9 @@ def create_license():
if phone_count > 0:
cur.execute("""
SELECT id FROM resource_pools
WHERE resource_type = 'phone' AND status = 'available'
WHERE resource_type = 'phone' AND status = 'available' AND is_test = %s
LIMIT %s FOR UPDATE
""", (phone_count,))
""", (is_test, phone_count))
for (resource_id,) in cur.fetchall():
cur.execute("""
UPDATE resource_pools
@@ -1577,7 +1601,8 @@ def create_license():
'customer_email': customer_info['email'],
'license_type': license_type,
'valid_from': valid_from,
'valid_until': valid_until
'valid_until': valid_until,
'is_test': is_test
})
flash(f'Lizenz {license_key} erfolgreich erstellt!', 'success')
@@ -1604,6 +1629,7 @@ def batch_licenses():
license_type = request.form["license_type"]
quantity = int(request.form["quantity"])
valid_from = request.form["valid_from"]
is_test = request.form.get("is_test") == "on" # Checkbox value
# Berechne valid_until basierend auf Laufzeit
duration = int(request.form.get("duration", 1))
@@ -1657,26 +1683,30 @@ def batch_licenses():
flash(f'E-Mail bereits vergeben für Kunde: {existing[1]}', 'error')
return redirect(url_for('batch_licenses'))
# Kunde einfügen
# Kunde einfügen (erbt Test-Status von Lizenz)
cur.execute("""
INSERT INTO customers (name, email, created_at)
VALUES (%s, %s, NOW())
INSERT INTO customers (name, email, is_test, created_at)
VALUES (%s, %s, %s, NOW())
RETURNING id
""", (name, email))
""", (name, email, is_test))
customer_id = cur.fetchone()[0]
# Audit-Log für neuen Kunden
log_audit('CREATE', 'customer', customer_id,
new_values={'name': name, 'email': email})
new_values={'name': name, 'email': email, 'is_test': is_test})
else:
# Bestehender Kunde - hole Infos
cur.execute("SELECT name, email FROM customers WHERE id = %s", (customer_id,))
cur.execute("SELECT name, email, is_test FROM customers WHERE id = %s", (customer_id,))
customer_data = cur.fetchone()
if not customer_data:
flash('Kunde nicht gefunden!', 'error')
return redirect(url_for('batch_licenses'))
name = customer_data[0]
email = customer_data[1]
# Wenn Kunde Test-Kunde ist, Lizenzen auch als Test markieren
if customer_data[2]: # is_test des Kunden
is_test = True
# Prüfe Ressourcen-Verfügbarkeit für gesamten Batch
total_domains_needed = domain_count * quantity
@@ -1685,10 +1715,10 @@ def batch_licenses():
cur.execute("""
SELECT
(SELECT COUNT(*) FROM resource_pools WHERE resource_type = 'domain' AND status = 'available') as domains,
(SELECT COUNT(*) FROM resource_pools WHERE resource_type = 'ipv4' AND status = 'available') as ipv4s,
(SELECT COUNT(*) FROM resource_pools WHERE resource_type = 'phone' AND status = 'available') as phones
""")
(SELECT COUNT(*) FROM resource_pools WHERE resource_type = 'domain' AND status = 'available' AND is_test = %s) as domains,
(SELECT COUNT(*) FROM resource_pools WHERE resource_type = 'ipv4' AND status = 'available' AND is_test = %s) as ipv4s,
(SELECT COUNT(*) FROM resource_pools WHERE resource_type = 'phone' AND status = 'available' AND is_test = %s) as phones
""", (is_test, is_test, is_test))
available = cur.fetchone()
if available[0] < total_domains_needed:
@@ -1715,12 +1745,12 @@ def batch_licenses():
# Lizenz einfügen
cur.execute("""
INSERT INTO licenses (license_key, customer_id, license_type,
INSERT INTO licenses (license_key, customer_id, license_type, is_test,
valid_from, valid_until, is_active,
domain_count, ipv4_count, phone_count)
VALUES (%s, %s, %s, %s, %s, true, %s, %s, %s)
VALUES (%s, %s, %s, %s, %s, %s, true, %s, %s, %s)
RETURNING id
""", (license_key, customer_id, license_type, valid_from, valid_until,
""", (license_key, customer_id, license_type, is_test, valid_from, valid_until,
domain_count, ipv4_count, phone_count))
license_id = cur.fetchone()[0]
@@ -1729,9 +1759,9 @@ def batch_licenses():
if domain_count > 0:
cur.execute("""
SELECT id FROM resource_pools
WHERE resource_type = 'domain' AND status = 'available'
WHERE resource_type = 'domain' AND status = 'available' AND is_test = %s
LIMIT %s FOR UPDATE
""", (domain_count,))
""", (is_test, domain_count))
for (resource_id,) in cur.fetchall():
cur.execute("""
UPDATE resource_pools
@@ -1754,9 +1784,9 @@ def batch_licenses():
if ipv4_count > 0:
cur.execute("""
SELECT id FROM resource_pools
WHERE resource_type = 'ipv4' AND status = 'available'
WHERE resource_type = 'ipv4' AND status = 'available' AND is_test = %s
LIMIT %s FOR UPDATE
""", (ipv4_count,))
""", (is_test, ipv4_count))
for (resource_id,) in cur.fetchall():
cur.execute("""
UPDATE resource_pools
@@ -1779,9 +1809,9 @@ def batch_licenses():
if phone_count > 0:
cur.execute("""
SELECT id FROM resource_pools
WHERE resource_type = 'phone' AND status = 'available'
WHERE resource_type = 'phone' AND status = 'available' AND is_test = %s
LIMIT %s FOR UPDATE
""", (phone_count,))
""", (is_test, phone_count))
for (resource_id,) in cur.fetchall():
cur.execute("""
UPDATE resource_pools
@@ -1921,7 +1951,7 @@ def licenses():
# SQL Query mit optionaler Suche und Filtern
query = """
SELECT l.id, l.license_key, c.name, c.email, l.license_type,
l.valid_from, l.valid_until, l.is_active,
l.valid_from, l.valid_until, l.is_active, l.is_test,
CASE
WHEN l.is_active = FALSE THEN 'deaktiviert'
WHEN l.valid_until < CURRENT_DATE THEN 'abgelaufen'
@@ -1947,8 +1977,13 @@ def licenses():
# Typ-Filter
if filter_type:
query += " AND l.license_type = %s"
params.append(filter_type)
if filter_type == 'test_data':
query += " AND l.is_test = TRUE"
elif filter_type == 'live_data':
query += " AND l.is_test = FALSE"
else:
query += " AND l.license_type = %s AND l.is_test = FALSE"
params.append(filter_type)
# Status-Filter
if filter_status == 'active':
@@ -2013,7 +2048,7 @@ def edit_license(license_id):
if request.method == "POST":
# Alte Werte für Audit-Log abrufen
cur.execute("""
SELECT license_key, license_type, valid_from, valid_until, is_active
SELECT license_key, license_type, valid_from, valid_until, is_active, is_test
FROM licenses WHERE id = %s
""", (license_id,))
old_license = cur.fetchone()
@@ -2024,13 +2059,14 @@ def edit_license(license_id):
valid_from = request.form["valid_from"]
valid_until = request.form["valid_until"]
is_active = request.form.get("is_active") == "on"
is_test = request.form.get("is_test") == "on"
cur.execute("""
UPDATE licenses
SET license_key = %s, license_type = %s, valid_from = %s,
valid_until = %s, is_active = %s
valid_until = %s, is_active = %s, is_test = %s
WHERE id = %s
""", (license_key, license_type, valid_from, valid_until, is_active, license_id))
""", (license_key, license_type, valid_from, valid_until, is_active, is_test, license_id))
conn.commit()
@@ -2041,14 +2077,16 @@ def edit_license(license_id):
'license_type': old_license[1],
'valid_from': str(old_license[2]),
'valid_until': str(old_license[3]),
'is_active': old_license[4]
'is_active': old_license[4],
'is_test': old_license[5]
},
new_values={
'license_key': license_key,
'license_type': license_type,
'valid_from': valid_from,
'valid_until': valid_until,
'is_active': is_active
'is_active': is_active,
'is_test': is_test
})
cur.close()
@@ -2059,7 +2097,7 @@ def edit_license(license_id):
# Get license data
cur.execute("""
SELECT l.id, l.license_key, c.name, c.email, l.license_type,
l.valid_from, l.valid_until, l.is_active, c.id
l.valid_from, l.valid_until, l.is_active, c.id, l.is_test
FROM licenses l
JOIN customers c ON l.customer_id = c.id
WHERE l.id = %s
@@ -2140,7 +2178,7 @@ def customers():
# SQL Query mit optionaler Suche
base_query = """
SELECT c.id, c.name, c.email, c.created_at,
SELECT c.id, c.name, c.email, c.created_at, c.is_test,
COUNT(l.id) as license_count,
COUNT(CASE WHEN l.is_active = TRUE AND l.valid_until >= CURRENT_DATE THEN 1 END) as active_licenses
FROM customers c
@@ -2173,7 +2211,7 @@ def customers():
# Pagination
offset = (page - 1) * per_page
query = base_query + f"""
GROUP BY c.id, c.name, c.email, c.created_at
GROUP BY c.id, c.name, c.email, c.created_at, c.is_test
ORDER BY {sort_field} {order.upper()}
LIMIT %s OFFSET %s
"""
@@ -2206,18 +2244,19 @@ def edit_customer(customer_id):
if request.method == "POST":
# Alte Werte für Audit-Log abrufen
cur.execute("SELECT name, email FROM customers WHERE id = %s", (customer_id,))
cur.execute("SELECT name, email, is_test FROM customers WHERE id = %s", (customer_id,))
old_customer = cur.fetchone()
# Update customer
name = request.form["name"]
email = request.form["email"]
is_test = request.form.get("is_test") == "on"
cur.execute("""
UPDATE customers
SET name = %s, email = %s
SET name = %s, email = %s, is_test = %s
WHERE id = %s
""", (name, email, customer_id))
""", (name, email, is_test, customer_id))
conn.commit()
@@ -2225,11 +2264,13 @@ def edit_customer(customer_id):
log_audit('UPDATE', 'customer', customer_id,
old_values={
'name': old_customer[0],
'email': old_customer[1]
'email': old_customer[1],
'is_test': old_customer[2]
},
new_values={
'name': name,
'email': email
'email': email,
'is_test': is_test
})
cur.close()
@@ -2239,12 +2280,15 @@ def edit_customer(customer_id):
# Get customer data with licenses
cur.execute("""
SELECT id, name, email, created_at
FROM customers
WHERE id = %s
SELECT id, name, email, is_test FROM customers WHERE id = %s
""", (customer_id,))
customer = cur.fetchone()
if not customer:
cur.close()
conn.close()
return "Kunde nicht gefunden", 404
# Get customer's licenses
cur.execute("""
@@ -2409,10 +2453,12 @@ def export_licenses():
conn = get_connection()
cur = conn.cursor()
# Alle Lizenzen mit Kundeninformationen abrufen
cur.execute("""
# Alle Lizenzen mit Kundeninformationen abrufen (ohne Testdaten, außer explizit gewünscht)
include_test = request.args.get('include_test', 'false').lower() == 'true'
query = """
SELECT l.id, l.license_key, c.name as customer_name, c.email as customer_email,
l.license_type, l.valid_from, l.valid_until, l.is_active,
l.license_type, l.valid_from, l.valid_until, l.is_active, l.is_test,
CASE
WHEN l.is_active = FALSE THEN 'Deaktiviert'
WHEN l.valid_until < CURRENT_DATE THEN 'Abgelaufen'
@@ -2421,12 +2467,18 @@ def export_licenses():
END as status
FROM licenses l
JOIN customers c ON l.customer_id = c.id
ORDER BY l.id
""")
"""
if not include_test:
query += " WHERE l.is_test = FALSE"
query += " ORDER BY l.id"
cur.execute(query)
# Spaltennamen
columns = ['ID', 'Lizenzschlüssel', 'Kunde', 'E-Mail', 'Typ',
'Gültig von', 'Gültig bis', 'Aktiv', 'Status']
'Gültig von', 'Gültig bis', 'Aktiv', 'Testdaten', 'Status']
# Daten in DataFrame
data = cur.fetchall()
@@ -2439,6 +2491,7 @@ def export_licenses():
# Typ und Aktiv Status anpassen
df['Typ'] = df['Typ'].replace({'full': 'Vollversion', 'test': 'Testversion'})
df['Aktiv'] = df['Aktiv'].replace({True: 'Ja', False: 'Nein'})
df['Testdaten'] = df['Testdaten'].replace({True: 'Ja', False: 'Nein'})
cur.close()
conn.close()
@@ -2498,12 +2551,12 @@ def export_customers():
conn = get_connection()
cur = conn.cursor()
# Alle Kunden mit Lizenzstatistiken
# Alle Kunden mit Lizenzstatistiken (ohne Testdaten)
cur.execute("""
SELECT c.id, c.name, c.email, c.created_at,
COUNT(l.id) as total_licenses,
COUNT(CASE WHEN l.is_active = TRUE AND l.valid_until >= CURRENT_DATE THEN 1 END) as active_licenses,
COUNT(CASE WHEN l.valid_until < CURRENT_DATE THEN 1 END) as expired_licenses
COUNT(CASE WHEN l.is_test = FALSE THEN 1 END) as total_licenses,
COUNT(CASE WHEN l.is_active = TRUE AND l.valid_until >= CURRENT_DATE AND l.is_test = FALSE THEN 1 END) as active_licenses,
COUNT(CASE WHEN l.valid_until < CURRENT_DATE AND l.is_test = FALSE THEN 1 END) as expired_licenses
FROM customers c
LEFT JOIN licenses l ON c.id = l.customer_id
GROUP BY c.id, c.name, c.email, c.created_at
@@ -2900,11 +2953,11 @@ def bulk_activate_licenses():
conn = get_connection()
cur = conn.cursor()
# Update all selected licenses
# Update all selected licenses (nur Live-Daten)
cur.execute("""
UPDATE licenses
SET is_active = TRUE
WHERE id = ANY(%s)
WHERE id = ANY(%s) AND is_test = FALSE
""", (license_ids,))
affected_rows = cur.rowcount
@@ -2936,11 +2989,11 @@ def bulk_deactivate_licenses():
conn = get_connection()
cur = conn.cursor()
# Update all selected licenses
# Update all selected licenses (nur Live-Daten)
cur.execute("""
UPDATE licenses
SET is_active = FALSE
WHERE id = ANY(%s)
WHERE id = ANY(%s) AND is_test = FALSE
""", (license_ids,))
affected_rows = cur.rowcount
@@ -2972,18 +3025,18 @@ def bulk_delete_licenses():
conn = get_connection()
cur = conn.cursor()
# Get license info for audit log
# Get license info for audit log (nur Live-Daten)
cur.execute("""
SELECT license_key
FROM licenses
WHERE id = ANY(%s)
WHERE id = ANY(%s) AND is_test = FALSE
""", (license_ids,))
license_keys = [row[0] for row in cur.fetchall()]
# Delete all selected licenses
# Delete all selected licenses (nur Live-Daten)
cur.execute("""
DELETE FROM licenses
WHERE id = ANY(%s)
WHERE id = ANY(%s) AND is_test = FALSE
""", (license_ids,))
affected_rows = cur.rowcount