Test zu Fake geändert, weil Namensproblem

Dieser Commit ist enthalten in:
2025-06-21 17:22:12 +02:00
Ursprung fec588ba06
Commit 3d899b1c45
22 geänderte Dateien mit 614 neuen und 347 gelöschten Zeilen

Datei anzeigen

@@ -25,18 +25,18 @@ def dashboard():
try:
# Hole Statistiken mit sicheren Defaults
# Anzahl aktiver Lizenzen (nur echte Daten, keine Testdaten)
cur.execute("SELECT COUNT(*) FROM licenses WHERE is_active = true AND is_test = false")
cur.execute("SELECT COUNT(*) FROM licenses WHERE is_active = true AND is_fake = false")
active_licenses = cur.fetchone()[0] if cur.rowcount > 0 else 0
# Anzahl Kunden (nur echte Kunden, keine Testkunden)
cur.execute("SELECT COUNT(*) FROM customers WHERE is_test = false")
# Anzahl Kunden (nur echte Kunden, keine Fake-Kunden)
cur.execute("SELECT COUNT(*) FROM customers WHERE is_fake = false")
total_customers = cur.fetchone()[0] if cur.rowcount > 0 else 0
# Testdaten separat zählen für optionale Anzeige
cur.execute("SELECT COUNT(*) FROM customers WHERE is_test = true")
test_customers_count = cur.fetchone()[0] if cur.rowcount > 0 else 0
cur.execute("SELECT COUNT(*) FROM customers WHERE is_fake = true")
fake_customers_count = cur.fetchone()[0] if cur.rowcount > 0 else 0
cur.execute("SELECT COUNT(*) FROM licenses WHERE is_test = true")
cur.execute("SELECT COUNT(*) FROM licenses WHERE is_fake = true")
test_licenses_count = cur.fetchone()[0] if cur.rowcount > 0 else 0
# Anzahl aktiver Sessions (Admin-Panel)
@@ -61,7 +61,7 @@ def dashboard():
FROM license_heartbeats lh
JOIN licenses l ON l.id = lh.license_id
WHERE lh.timestamp > NOW() - INTERVAL '15 minutes'
AND l.is_test = false
AND l.is_fake = false
""")
active_usage = cur.fetchone()[0] if cur.rowcount > 0 else 0
except Exception as e:
@@ -82,7 +82,7 @@ def dashboard():
FROM licenses l
LEFT JOIN customers c ON l.customer_id = c.id
LEFT JOIN sessions s ON l.id = s.license_id
WHERE l.is_test = false AND c.is_test = false
WHERE l.is_fake = false AND c.is_fake = false
GROUP BY l.license_key, c.name
ORDER BY session_count DESC
LIMIT 10
@@ -109,7 +109,7 @@ def dashboard():
cur.execute("""
SELECT COUNT(*) as full_licenses
FROM licenses
WHERE is_test = false
WHERE is_fake = false
""")
license_count = cur.fetchone()
full_licenses = license_count[0] if license_count and license_count[0] else 0
@@ -123,7 +123,7 @@ def dashboard():
COUNT(CASE WHEN valid_until < NOW() THEN 1 END) as expired,
COUNT(CASE WHEN is_active = false THEN 1 END) as inactive
FROM licenses
WHERE is_test = false
WHERE is_fake = false
""")
license_status = cur.fetchone()
active_licenses_count = license_status[0] if license_status and license_status[0] else 0
@@ -140,8 +140,8 @@ def dashboard():
EXTRACT(DAY FROM (l.valid_until - NOW())) as days_remaining
FROM licenses l
JOIN customers c ON l.customer_id = c.id
WHERE l.is_test = false
AND c.is_test = false
WHERE l.is_fake = false
AND c.is_fake = false
AND l.is_active = true
AND l.valid_until IS NOT NULL
AND l.valid_until > NOW()
@@ -166,8 +166,8 @@ def dashboard():
END as status
FROM licenses l
JOIN customers c ON l.customer_id = c.id
WHERE l.is_test = false
AND c.is_test = false
WHERE l.is_fake = false
AND c.is_fake = false
ORDER BY l.created_at DESC
LIMIT 5
""")
@@ -176,15 +176,15 @@ def dashboard():
# Stats Objekt für Template erstellen
stats = {
'total_customers': total_customers,
'total_licenses': active_licenses, # This was already filtered for is_test = false
'total_licenses': active_licenses, # This was already filtered for is_fake = false
'active_sessions': active_sessions, # Admin-Panel Sessions
'active_usage': active_usage, # Aktive Kunden-Nutzung
'active_licenses': active_licenses_count,
'full_licenses': full_licenses,
'test_licenses': test_version_licenses, # Test versions, not test data
'test_data_count': test_licenses_count, # Actual test data count
'test_customers_count': test_customers_count,
'test_resources_count': 0,
'fake_data_count': test_licenses_count, # Actual test data count
'fake_customers_count': fake_customers_count,
'fake_resources_count': 0,
'expired_licenses': expired_licenses,
'inactive_licenses': inactive_licenses,
'last_backup': None,
@@ -210,7 +210,7 @@ def dashboard():
COUNT(CASE WHEN status = 'quarantine' THEN 1 END) as quarantine,
COUNT(*) as total
FROM resource_pool
WHERE type = %s AND is_test = false
WHERE type = %s AND is_fake = false
""", (resource_type,))
result = cur.fetchone()
@@ -249,9 +249,9 @@ def dashboard():
# Count test resources separately
try:
cur.execute("SELECT COUNT(*) FROM resource_pool WHERE is_test = true")
test_resources_count = cur.fetchone()[0] if cur.rowcount > 0 else 0
stats['test_resources_count'] = test_resources_count
cur.execute("SELECT COUNT(*) FROM resource_pool WHERE is_fake = true")
fake_resources_count = cur.fetchone()[0] if cur.rowcount > 0 else 0
stats['fake_resources_count'] = fake_resources_count
except:
pass

Datei anzeigen

@@ -25,7 +25,7 @@ def api_customers():
try:
# Get all customers (with optional search)
customers = get_customers(show_test=True, search=search)
customers = get_customers(show_fake=True, search=search)
# Pagination
start = (page - 1) * per_page
@@ -396,7 +396,7 @@ def bulk_delete_licenses():
for license_id in license_ids:
# Hole vollständige Lizenz-Info
cur.execute("""
SELECT l.id, l.license_key, l.is_active, l.is_test,
SELECT l.id, l.license_key, l.is_active, l.is_fake,
c.name as customer_name
FROM licenses l
LEFT JOIN customers c ON l.customer_id = c.id
@@ -407,7 +407,7 @@ def bulk_delete_licenses():
if not result:
continue
license_id, license_key, is_active, is_test, customer_name = result
license_id, license_key, is_active, is_fake, customer_name = result
# Safety check: Don't delete active licenses unless forced
if is_active and not force_delete:
@@ -601,7 +601,7 @@ def get_license_resources(license_id):
rp.id,
rp.resource_type,
rp.resource_value,
rp.is_test,
rp.is_fake,
rp.status_changed_at,
lr.assigned_at,
lr.assigned_by
@@ -617,7 +617,7 @@ def get_license_resources(license_id):
'id': row[0],
'type': row[1],
'value': row[2],
'is_test': row[3],
'is_fake': row[3],
'status_changed_at': row[4].isoformat() if row[4] else None,
'assigned_at': row[5].isoformat() if row[5] else None,
'assigned_by': row[6]
@@ -675,7 +675,7 @@ def allocate_resources():
try:
# Prüfe ob Ressource verfügbar ist
cur.execute("""
SELECT resource_value, status, is_test
SELECT resource_value, status, is_fake
FROM resource_pools
WHERE id = %s
""", (resource_id,))
@@ -690,8 +690,8 @@ def allocate_resources():
continue
# Prüfe Test/Produktion Kompatibilität
if resource[2] != license_data['is_test']:
errors.append(f"Ressource {resource[0]} ist {'Test' if resource[2] else 'Produktion'}, Lizenz ist {'Test' if license_data['is_test'] else 'Produktion'}")
if resource[2] != license_data['is_fake']:
errors.append(f"Ressource {resource[0]} ist {'Test' if resource[2] else 'Produktion'}, Lizenz ist {'Test' if license_data['is_fake'] else 'Produktion'}")
continue
# Weise Ressource zu
@@ -751,32 +751,32 @@ def check_resource_availability():
resource_type = request.args.get('type')
if resource_type:
count = int(request.args.get('count', 1))
is_test = request.args.get('is_test', 'false') == 'true'
show_test = request.args.get('show_test', 'false') == 'true'
is_fake = request.args.get('is_fake', 'false') == 'true'
show_fake = request.args.get('show_fake', 'false') == 'true'
conn = get_connection()
cur = conn.cursor()
try:
# Hole verfügbare Ressourcen mit Details
if show_test:
if show_fake:
# Zeige alle verfügbaren Ressourcen (Test und Produktion)
cur.execute("""
SELECT id, resource_value, is_test
SELECT id, resource_value, is_fake
FROM resource_pools
WHERE resource_type = %s
AND status = 'available'
ORDER BY is_test, resource_value
ORDER BY is_fake, resource_value
LIMIT %s
""", (resource_type, count))
else:
# Zeige nur Produktions-Ressourcen
cur.execute("""
SELECT id, resource_value, is_test
SELECT id, resource_value, is_fake
FROM resource_pools
WHERE resource_type = %s
AND status = 'available'
AND is_test = false
AND is_fake = false
ORDER BY resource_value
LIMIT %s
""", (resource_type, count))
@@ -786,7 +786,7 @@ def check_resource_availability():
available_resources.append({
'id': row[0],
'value': row[1],
'is_test': row[2]
'is_fake': row[2]
})
return jsonify({
@@ -794,7 +794,7 @@ def check_resource_availability():
'requested': count,
'available': available_resources,
'sufficient': len(available_resources) >= count,
'show_test': show_test
'show_fake': show_fake
})
except Exception as e:
@@ -808,7 +808,7 @@ def check_resource_availability():
domain_count = int(request.args.get('domain', 0))
ipv4_count = int(request.args.get('ipv4', 0))
phone_count = int(request.args.get('phone', 0))
is_test = request.args.get('is_test', 'false') == 'true'
is_fake = request.args.get('is_fake', 'false') == 'true'
conn = get_connection()
cur = conn.cursor()
@@ -823,8 +823,8 @@ def check_resource_availability():
FROM resource_pools
WHERE resource_type = 'domain'
AND status = 'available'
AND is_test = %s
""", (is_test,))
AND is_fake = %s
""", (is_fake,))
domain_available = cur.fetchone()[0]
# IPv4
@@ -833,8 +833,8 @@ def check_resource_availability():
FROM resource_pools
WHERE resource_type = 'ipv4'
AND status = 'available'
AND is_test = %s
""", (is_test,))
AND is_fake = %s
""", (is_fake,))
ipv4_available = cur.fetchone()[0]
# Phones
@@ -843,8 +843,8 @@ def check_resource_availability():
FROM resource_pools
WHERE resource_type = 'phone'
AND status = 'available'
AND is_test = %s
""", (is_test,))
AND is_fake = %s
""", (is_fake,))
phone_available = cur.fetchone()[0]
return jsonify({
@@ -862,7 +862,7 @@ def check_resource_availability():
ipv4_available >= ipv4_count and
phone_available >= phone_count
),
'is_test': is_test
'is_fake': is_fake
})
except Exception as e:

Datei anzeigen

@@ -42,7 +42,7 @@ def batch_create():
valid_from = request.form['valid_from']
valid_until = request.form['valid_until']
device_limit = int(request.form['device_limit'])
is_test = 'is_test' in request.form
is_fake = 'is_fake' in request.form
# Validierung
if count < 1 or count > 100:
@@ -74,13 +74,13 @@ def batch_create():
INSERT INTO licenses (
license_key, customer_id,
license_type, valid_from, valid_until, device_limit,
is_test, created_at
is_fake, created_at
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
RETURNING id
""", (
license_key, customer_id,
license_type, valid_from, valid_until, device_limit,
is_test, datetime.now()
is_fake, datetime.now()
))
license_id = cur.fetchone()[0]
@@ -141,7 +141,7 @@ def batch_export():
SELECT
l.license_key, c.name, c.email,
l.license_type, l.valid_from, l.valid_until,
l.device_limit, l.is_test, l.created_at
l.device_limit, l.is_fake, l.created_at
FROM licenses l
JOIN customers c ON l.customer_id = c.id
WHERE l.id = ANY(%s)
@@ -158,7 +158,7 @@ def batch_export():
'valid_from': row[4],
'valid_until': row[5],
'device_limit': row[6],
'is_test': row[7],
'is_fake': row[7],
'created_at': row[8]
})
@@ -334,13 +334,13 @@ def batch_import():
INSERT INTO licenses (
license_key, customer_id,
license_type, valid_from, valid_until, device_limit,
is_test, created_at
is_fake, created_at
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
RETURNING id
""", (
license_key, customer_id,
row['license_type'], row['valid_from'], row['valid_until'],
int(row['device_limit']), row.get('is_test', False),
int(row['device_limit']), row.get('is_fake', False),
datetime.now()
))

Datei anzeigen

@@ -22,14 +22,14 @@ def test_customers():
@customer_bp.route("/customers")
@login_required
def customers():
show_test = request.args.get('show_test', 'false').lower() == 'true'
show_fake = request.args.get('show_fake', 'false').lower() == 'true'
search = request.args.get('search', '').strip()
page = request.args.get('page', 1, type=int)
per_page = 20
sort = request.args.get('sort', 'name')
order = request.args.get('order', 'asc')
customers_list = get_customers(show_test=show_test, search=search)
customers_list = get_customers(show_fake=show_fake, search=search)
# Sortierung
if sort == 'name':
@@ -48,7 +48,7 @@ def customers():
return render_template("customers.html",
customers=paginated_customers,
show_test=show_test,
show_fake=show_fake,
search=search,
page=page,
per_page=per_page,
@@ -77,17 +77,17 @@ def edit_customer(customer_id):
new_values = {
'name': request.form['name'],
'email': request.form['email'],
'is_test': 'is_test' in request.form
'is_fake': 'is_fake' in request.form
}
cur.execute("""
UPDATE customers
SET name = %s, email = %s, is_test = %s
SET name = %s, email = %s, is_fake = %s
WHERE id = %s
""", (
new_values['name'],
new_values['email'],
new_values['is_test'],
new_values['is_fake'],
customer_id
))
@@ -98,16 +98,16 @@ def edit_customer(customer_id):
old_values={
'name': current_customer['name'],
'email': current_customer['email'],
'is_test': current_customer.get('is_test', False)
'is_fake': current_customer.get('is_fake', False)
},
new_values=new_values)
flash('Kunde erfolgreich aktualisiert!', 'success')
# Redirect mit show_test Parameter wenn nötig
# Redirect mit show_fake Parameter wenn nötig
redirect_url = url_for('customers.customers_licenses')
if request.form.get('show_test') == 'true':
redirect_url += '?show_test=true'
if request.form.get('show_fake') == 'true':
redirect_url += '?show_fake=true'
return redirect(redirect_url)
finally:
cur.close()
@@ -137,13 +137,13 @@ def create_customer():
# Insert new customer
name = request.form['name']
email = request.form['email']
is_test = 'is_test' in request.form # Checkbox ist nur vorhanden wenn angekreuzt
is_fake = 'is_fake' in request.form # Checkbox ist nur vorhanden wenn angekreuzt
cur.execute("""
INSERT INTO customers (name, email, is_test, created_at)
INSERT INTO customers (name, email, is_fake, created_at)
VALUES (%s, %s, %s, %s)
RETURNING id
""", (name, email, is_test, datetime.now()))
""", (name, email, is_fake, datetime.now()))
customer_id = cur.fetchone()[0]
conn.commit()
@@ -153,17 +153,17 @@ def create_customer():
new_values={
'name': name,
'email': email,
'is_test': is_test
'is_fake': is_fake
})
if is_test:
flash(f'Testkunde {name} erfolgreich erstellt!', 'success')
if is_fake:
flash(f'Fake-Kunde {name} erfolgreich erstellt!', 'success')
else:
flash(f'Kunde {name} erfolgreich erstellt!', 'success')
# Redirect mit show_test=true wenn Testkunde erstellt wurde
if is_test:
return redirect(url_for('customers.customers_licenses', show_test='true'))
# Redirect mit show_fake=true wenn Fake-Kunde erstellt wurde
if is_fake:
return redirect(url_for('customers.customers_licenses', show_fake='true'))
else:
return redirect(url_for('customers.customers_licenses'))
@@ -232,9 +232,9 @@ def customers_licenses():
import psycopg2
logging.info("=== CUSTOMERS-LICENSES ROUTE CALLED ===")
# Get show_test parameter from URL
show_test = request.args.get('show_test', 'false').lower() == 'true'
logging.info(f"show_test parameter: {show_test}")
# Get show_fake parameter from URL
show_fake = request.args.get('show_fake', 'false').lower() == 'true'
logging.info(f"show_fake parameter: {show_fake}")
try:
# Direkte Verbindung ohne Helper-Funktionen
@@ -250,7 +250,7 @@ def customers_licenses():
try:
# Hole alle Kunden mit ihren Lizenzen
# Wenn show_test=false, zeige nur Nicht-Test-Kunden
# Wenn show_fake=false, zeige nur Nicht-Test-Kunden
query = """
SELECT
c.id,
@@ -259,17 +259,17 @@ def customers_licenses():
c.created_at,
COUNT(l.id),
COUNT(CASE WHEN l.is_active = true THEN 1 END),
COUNT(CASE WHEN l.is_test = true THEN 1 END),
COUNT(CASE WHEN l.is_fake = true THEN 1 END),
MAX(l.created_at),
c.is_test
c.is_fake
FROM customers c
LEFT JOIN licenses l ON c.id = l.customer_id
WHERE (%s OR c.is_test = false)
GROUP BY c.id, c.name, c.email, c.created_at, c.is_test
WHERE (%s OR c.is_fake = false)
GROUP BY c.id, c.name, c.email, c.created_at, c.is_fake
ORDER BY c.name
"""
cur.execute(query, (show_test,))
cur.execute(query, (show_fake,))
customers = []
results = cur.fetchall()
@@ -286,12 +286,12 @@ def customers_licenses():
'active_licenses': row[5],
'test_licenses': row[6],
'last_license_created': row[7],
'is_test': row[8]
'is_fake': row[8]
})
return render_template("customers_licenses.html",
customers=customers,
show_test=show_test)
show_fake=show_fake)
finally:
cur.close()
@@ -325,7 +325,7 @@ def api_customer_licenses(customer_id):
l.license_key,
l.license_type,
l.is_active,
l.is_test,
l.is_fake,
l.valid_from,
l.valid_until,
l.device_limit,
@@ -387,7 +387,7 @@ def api_customer_licenses(customer_id):
'license_key': row[1],
'license_type': row[2],
'is_active': row[3],
'is_test': row[4],
'is_fake': row[4],
'valid_from': row[5].strftime('%Y-%m-%d') if row[5] else None,
'valid_until': row[6].strftime('%Y-%m-%d') if row[6] else None,
'device_limit': row[7],
@@ -442,7 +442,7 @@ def api_customer_quick_stats(customer_id):
SELECT
COUNT(l.id) as total_licenses,
COUNT(CASE WHEN l.is_active = true THEN 1 END) as active_licenses,
COUNT(CASE WHEN l.is_test = true THEN 1 END) as test_licenses,
COUNT(CASE WHEN l.is_fake = true THEN 1 END) as test_licenses,
SUM(l.device_limit) as total_device_limit
FROM licenses l
WHERE l.customer_id = %s

Datei anzeigen

@@ -21,10 +21,10 @@ def export_licenses():
try:
# Filter aus Request
show_test = request.args.get('show_test', 'false') == 'true'
show_fake = request.args.get('show_fake', 'false') == 'true'
# SQL Query mit optionalem Test-Filter
if show_test:
if show_fake:
query = """
SELECT
l.id,
@@ -37,7 +37,7 @@ def export_licenses():
l.is_active,
l.device_limit,
l.created_at,
l.is_test,
l.is_fake,
CASE
WHEN l.valid_until < CURRENT_DATE THEN 'Abgelaufen'
WHEN l.is_active = false THEN 'Deaktiviert'
@@ -62,7 +62,7 @@ def export_licenses():
l.is_active,
l.device_limit,
l.created_at,
l.is_test,
l.is_fake,
CASE
WHEN l.valid_until < CURRENT_DATE THEN 'Abgelaufen'
WHEN l.is_active = false THEN 'Deaktiviert'
@@ -72,7 +72,7 @@ def export_licenses():
(SELECT COUNT(DISTINCT hardware_id) FROM sessions s WHERE s.license_key = l.license_key) as registered_devices
FROM licenses l
LEFT JOIN customers c ON l.customer_id = c.id
WHERE l.is_test = false
WHERE l.is_fake = false
ORDER BY l.created_at DESC
"""
@@ -81,7 +81,7 @@ def export_licenses():
# Daten für Export vorbereiten
data = []
columns = ['ID', 'Lizenzschlüssel', 'Kunde', 'E-Mail', 'Typ', 'Gültig von',
'Gültig bis', 'Aktiv', 'Gerätelimit', 'Erstellt am', 'Test-Lizenz',
'Gültig bis', 'Aktiv', 'Gerätelimit', 'Erstellt am', 'Fake-Lizenz',
'Status', 'Aktive Sessions', 'Registrierte Geräte']
for row in cur.fetchall():
@@ -163,13 +163,13 @@ def export_customers():
c.phone,
c.address,
c.created_at,
c.is_test,
c.is_fake,
COUNT(l.id) as license_count,
COUNT(CASE WHEN l.is_active = true THEN 1 END) as active_licenses,
COUNT(CASE WHEN l.valid_until < CURRENT_DATE THEN 1 END) as expired_licenses
FROM customers c
LEFT JOIN licenses l ON c.id = l.customer_id
GROUP BY c.id, c.name, c.email, c.phone, c.address, c.created_at, c.is_test
GROUP BY c.id, c.name, c.email, c.phone, c.address, c.created_at, c.is_fake
ORDER BY c.name
""")
@@ -227,7 +227,7 @@ def export_sessions():
s.last_heartbeat,
s.is_active,
l.license_type,
l.is_test
l.is_fake
FROM sessions s
LEFT JOIN licenses l ON s.license_key = l.license_key
WHERE s.is_active = true
@@ -247,7 +247,7 @@ def export_sessions():
s.last_heartbeat,
s.is_active,
l.license_type,
l.is_test
l.is_fake
FROM sessions s
LEFT JOIN licenses l ON s.license_key = l.license_key
WHERE s.started_at >= CURRENT_TIMESTAMP - INTERVAL '%s days'
@@ -259,7 +259,7 @@ def export_sessions():
data = []
columns = ['ID', 'Lizenzschlüssel', 'Kunde', 'Benutzer', 'Geräte-ID',
'Login-Zeit', 'Logout-Zeit', 'Letzte Aktivität', 'Aktiv',
'Lizenztyp', 'Test-Lizenz']
'Lizenztyp', 'Fake-Lizenz']
for row in cur.fetchall():
data.append(list(row))
@@ -295,7 +295,7 @@ def export_resources():
# Filter aus Request
resource_type = request.args.get('type', 'all')
status_filter = request.args.get('status', 'all')
show_test = request.args.get('show_test', 'false') == 'true'
show_fake = request.args.get('show_fake', 'false') == 'true'
# SQL Query aufbauen
query = """
@@ -304,7 +304,7 @@ def export_resources():
rp.resource_type,
rp.resource_value,
rp.status,
rp.is_test,
rp.is_fake,
l.license_key,
c.name as customer_name,
rp.created_at,
@@ -328,8 +328,8 @@ def export_resources():
query += " AND rp.status = %s"
params.append(status_filter)
if not show_test:
query += " AND rp.is_test = false"
if not show_fake:
query += " AND rp.is_fake = false"
query += " ORDER BY rp.resource_type, rp.resource_value"

Datei anzeigen

@@ -24,43 +24,47 @@ def licenses():
# Get filter parameters
search = request.args.get('search', '').strip()
filter_type = request.args.get('type', '')
filter_status = request.args.get('status', '')
filter_types = request.args.getlist('types[]') # Multi-select for types
filter_statuses = request.args.getlist('statuses[]') # Multi-select for statuses
sort = request.args.get('sort', 'created_at')
order = request.args.get('order', 'desc')
page = request.args.get('page', 1, type=int)
per_page = 50
# Process type filter to determine show_test
# Default: show only real data unless test_data is explicitly selected
show_test = filter_type == 'test_data'
# Get all licenses (both fake and real data)
licenses_list = get_licenses(show_fake=True)
# Get licenses based on filters
licenses_list = get_licenses(show_test=show_test)
# Type filtering with OR logic
if filter_types:
filtered_by_type = []
for license in licenses_list:
# Check if license matches any selected type
if 'full' in filter_types and license.get('license_type') == 'full' and not license.get('is_fake'):
filtered_by_type.append(license)
elif 'test' in filter_types and license.get('license_type') == 'test' and not license.get('is_fake'):
filtered_by_type.append(license)
licenses_list = filtered_by_type
else:
# If no types selected, show only real data by default
licenses_list = [l for l in licenses_list if not l.get('is_fake')]
# Additional filtering based on type and status
if filter_type:
if filter_type == 'full':
licenses_list = [l for l in licenses_list if l.get('license_type') == 'full' and not l.get('is_test')]
elif filter_type == 'test':
licenses_list = [l for l in licenses_list if l.get('license_type') == 'test' and not l.get('is_test')]
elif filter_type == 'test_data':
licenses_list = [l for l in licenses_list if l.get('is_test')]
elif filter_type == 'live_data':
licenses_list = [l for l in licenses_list if not l.get('is_test')]
# Status filtering
if filter_status:
# Status filtering with OR logic
if filter_statuses:
now = datetime.now()
if filter_status == 'active':
licenses_list = [l for l in licenses_list if l.get('is_active') and l.get('valid_until') and l.get('valid_until') > now]
elif filter_status == 'expiring':
expiry_threshold = now + timedelta(days=30)
licenses_list = [l for l in licenses_list if l.get('valid_until') and now < l.get('valid_until') <= expiry_threshold]
elif filter_status == 'expired':
licenses_list = [l for l in licenses_list if l.get('valid_until') and l.get('valid_until') <= now]
elif filter_status == 'inactive':
licenses_list = [l for l in licenses_list if not l.get('is_active')]
filtered_by_status = []
for license in licenses_list:
# Check if license matches any selected status
if 'active' in filter_statuses and license.get('is_active') and license.get('valid_until') and license.get('valid_until') > now:
filtered_by_status.append(license)
elif 'expiring' in filter_statuses:
expiry_threshold = now + timedelta(days=30)
if license.get('valid_until') and now < license.get('valid_until') <= expiry_threshold:
filtered_by_status.append(license)
elif 'expired' in filter_statuses and license.get('valid_until') and license.get('valid_until') <= now:
filtered_by_status.append(license)
elif 'inactive' in filter_statuses and not license.get('is_active'):
filtered_by_status.append(license)
licenses_list = filtered_by_status
# Search filtering
if search:
@@ -79,10 +83,9 @@ def licenses():
return render_template("licenses.html",
licenses=licenses_list,
show_test=show_test,
search=search,
filter_type=filter_type,
filter_status=filter_status,
filter_types=filter_types,
filter_statuses=filter_statuses,
sort=sort,
order=order,
page=page,
@@ -114,14 +117,14 @@ def edit_license(license_id):
'valid_from': request.form['valid_from'],
'valid_until': request.form['valid_until'],
'is_active': 'is_active' in request.form,
'is_test': 'is_test' in request.form,
'is_fake': 'is_fake' in request.form,
'device_limit': int(request.form.get('device_limit', 3))
}
cur.execute("""
UPDATE licenses
SET license_key = %s, license_type = %s, valid_from = %s,
valid_until = %s, is_active = %s, is_test = %s, device_limit = %s
valid_until = %s, is_active = %s, is_fake = %s, device_limit = %s
WHERE id = %s
""", (
new_values['license_key'],
@@ -129,7 +132,7 @@ def edit_license(license_id):
new_values['valid_from'],
new_values['valid_until'],
new_values['is_active'],
new_values['is_test'],
new_values['is_fake'],
new_values['device_limit'],
license_id
))
@@ -144,7 +147,7 @@ def edit_license(license_id):
'valid_from': str(current_license.get('valid_from', '')),
'valid_until': str(current_license.get('valid_until', '')),
'is_active': current_license.get('is_active'),
'is_test': current_license.get('is_test'),
'is_fake': current_license.get('is_fake'),
'device_limit': current_license.get('device_limit', 3)
},
new_values=new_values)
@@ -283,7 +286,7 @@ def create_license():
license_key = request.form["license_key"].upper() # Immer Großbuchstaben
license_type = request.form["license_type"]
valid_from = request.form["valid_from"]
is_test = request.form.get("is_test") == "on" # Checkbox value
is_fake = request.form.get("is_fake") == "on" # Checkbox value
# Berechne valid_until basierend auf Laufzeit
duration = int(request.form.get("duration", 1))
@@ -337,19 +340,19 @@ def create_license():
# Kunde einfügen (erbt Test-Status von Lizenz)
cur.execute("""
INSERT INTO customers (name, email, is_test, created_at)
INSERT INTO customers (name, email, is_fake, created_at)
VALUES (%s, %s, %s, NOW())
RETURNING id
""", (name, email, is_test))
""", (name, email, is_fake))
customer_id = cur.fetchone()[0]
customer_info = {'name': name, 'email': email, 'is_test': is_test}
customer_info = {'name': name, 'email': email, 'is_fake': is_fake}
# Audit-Log für neuen Kunden
log_audit('CREATE', 'customer', customer_id,
new_values={'name': name, 'email': email, 'is_test': is_test})
new_values={'name': name, 'email': email, 'is_fake': is_fake})
else:
# Bestehender Kunde - hole Infos für Audit-Log
cur.execute("SELECT name, email, is_test FROM customers WHERE id = %s", (customer_id,))
cur.execute("SELECT name, email, is_fake FROM customers WHERE id = %s", (customer_id,))
customer_data = cur.fetchone()
if not customer_data:
flash('Kunde nicht gefunden!', 'error')
@@ -357,17 +360,17 @@ def create_license():
customer_info = {'name': customer_data[0], 'email': customer_data[1]}
# Wenn Kunde Test-Kunde ist, Lizenz auch als Test markieren
if customer_data[2]: # is_test des Kunden
is_test = True
if customer_data[2]: # is_fake des Kunden
is_fake = True
# Lizenz hinzufügen
cur.execute("""
INSERT INTO licenses (license_key, customer_id, license_type, valid_from, valid_until, is_active,
domain_count, ipv4_count, phone_count, device_limit, is_test)
domain_count, ipv4_count, phone_count, device_limit, is_fake)
VALUES (%s, %s, %s, %s, %s, TRUE, %s, %s, %s, %s, %s)
RETURNING id
""", (license_key, customer_id, license_type, valid_from, valid_until,
domain_count, ipv4_count, phone_count, device_limit, is_test))
domain_count, ipv4_count, phone_count, device_limit, is_fake))
license_id = cur.fetchone()[0]
# Ressourcen zuweisen
@@ -375,10 +378,10 @@ def create_license():
# Prüfe Verfügbarkeit
cur.execute("""
SELECT
(SELECT COUNT(*) FROM resource_pools WHERE resource_type = 'domain' AND status = 'available' AND is_test = %s) as domains,
(SELECT COUNT(*) FROM resource_pools WHERE resource_type = 'ipv4' AND status = 'available' AND is_test = %s) as ipv4s,
(SELECT COUNT(*) FROM resource_pools WHERE resource_type = 'phone' AND status = 'available' AND is_test = %s) as phones
""", (is_test, is_test, is_test))
(SELECT COUNT(*) FROM resource_pools WHERE resource_type = 'domain' AND status = 'available' AND is_fake = %s) as domains,
(SELECT COUNT(*) FROM resource_pools WHERE resource_type = 'ipv4' AND status = 'available' AND is_fake = %s) as ipv4s,
(SELECT COUNT(*) FROM resource_pools WHERE resource_type = 'phone' AND status = 'available' AND is_fake = %s) as phones
""", (is_fake, is_fake, is_fake))
available = cur.fetchone()
if available[0] < domain_count:
@@ -392,9 +395,9 @@ def create_license():
if domain_count > 0:
cur.execute("""
SELECT id FROM resource_pools
WHERE resource_type = 'domain' AND status = 'available' AND is_test = %s
WHERE resource_type = 'domain' AND status = 'available' AND is_fake = %s
LIMIT %s FOR UPDATE
""", (is_test, domain_count))
""", (is_fake, domain_count))
for (resource_id,) in cur.fetchall():
cur.execute("""
UPDATE resource_pools
@@ -417,9 +420,9 @@ def create_license():
if ipv4_count > 0:
cur.execute("""
SELECT id FROM resource_pools
WHERE resource_type = 'ipv4' AND status = 'available' AND is_test = %s
WHERE resource_type = 'ipv4' AND status = 'available' AND is_fake = %s
LIMIT %s FOR UPDATE
""", (is_test, ipv4_count))
""", (is_fake, ipv4_count))
for (resource_id,) in cur.fetchall():
cur.execute("""
UPDATE resource_pools
@@ -442,9 +445,9 @@ def create_license():
if phone_count > 0:
cur.execute("""
SELECT id FROM resource_pools
WHERE resource_type = 'phone' AND status = 'available' AND is_test = %s
WHERE resource_type = 'phone' AND status = 'available' AND is_fake = %s
LIMIT %s FOR UPDATE
""", (is_test, phone_count))
""", (is_fake, phone_count))
for (resource_id,) in cur.fetchall():
cur.execute("""
UPDATE resource_pools
@@ -480,7 +483,7 @@ def create_license():
'valid_from': valid_from,
'valid_until': valid_until,
'device_limit': device_limit,
'is_test': is_test
'is_fake': is_fake
})
flash(f'Lizenz {license_key} erfolgreich erstellt!', 'success')

Datei anzeigen

@@ -28,9 +28,9 @@ def resources():
resource_type = request.args.get('type', 'all')
status_filter = request.args.get('status', 'all')
search_query = request.args.get('search', '')
show_test = request.args.get('show_test', 'false') == 'true'
show_fake = request.args.get('show_fake', 'false') == 'true'
logging.info(f"Filters: type={resource_type}, status={status_filter}, search={search_query}, show_test={show_test}")
logging.info(f"Filters: type={resource_type}, status={status_filter}, search={search_query}, show_fake={show_fake}")
# Basis-Query
query = """
@@ -39,7 +39,7 @@ def resources():
rp.resource_type,
rp.resource_value,
rp.status,
rp.is_test,
rp.is_fake,
rp.allocated_to_license,
rp.created_at,
rp.status_changed_at,
@@ -67,8 +67,8 @@ def resources():
query += " AND (rp.resource_value ILIKE %s OR c.name ILIKE %s)"
params.extend([f'%{search_query}%', f'%{search_query}%'])
if not show_test:
query += " AND rp.is_test = false"
if not show_fake:
query += " AND rp.is_fake = false"
query += " ORDER BY rp.resource_type, rp.resource_value"
@@ -84,7 +84,7 @@ def resources():
'resource_type': row[1],
'resource_value': row[2],
'status': row[3],
'is_test': row[4],
'is_fake': row[4],
'allocated_to_license': row[5],
'created_at': row[6],
'status_changed_at': row[7],
@@ -98,16 +98,16 @@ def resources():
SELECT
resource_type,
status,
is_test,
is_fake,
COUNT(*) as count
FROM resource_pools
"""
# Apply test filter to statistics as well
if not show_test:
stats_query += " WHERE is_test = false"
if not show_fake:
stats_query += " WHERE is_fake = false"
stats_query += " GROUP BY resource_type, status, is_test"
stats_query += " GROUP BY resource_type, status, is_fake"
cur.execute(stats_query)
@@ -115,7 +115,7 @@ def resources():
for row in cur.fetchall():
res_type = row[0]
status = row[1]
is_test = row[2]
is_fake = row[2]
count = row[3]
if res_type not in stats:
@@ -131,7 +131,7 @@ def resources():
stats[res_type]['total'] += count
stats[res_type][status] = stats[res_type].get(status, 0) + count
if is_test:
if is_fake:
stats[res_type]['test'] += count
else:
stats[res_type]['prod'] += count
@@ -160,7 +160,7 @@ def resources():
resource_type=resource_type,
status_filter=status_filter,
search=search_query, # Changed from search_query to search
show_test=show_test,
show_fake=show_fake,
total=total,
page=page,
total_pages=total_pages,
@@ -325,7 +325,7 @@ def resource_history(resource_id):
try:
# Hole Ressourcen-Info
cur.execute("""
SELECT resource_type, resource_value, status, is_test
SELECT resource_type, resource_value, status, is_fake
FROM resource_pools WHERE id = %s
""", (resource_id,))
resource = cur.fetchone()
@@ -369,7 +369,7 @@ def resource_history(resource_id):
'type': resource[0],
'value': resource[1],
'status': resource[2],
'is_test': resource[3]
'is_fake': resource[3]
},
history=history)
@@ -395,10 +395,10 @@ def resource_metrics():
SELECT
resource_type,
status,
is_test,
is_fake,
COUNT(*) as count
FROM resource_pools
GROUP BY resource_type, status, is_test
GROUP BY resource_type, status, is_fake
ORDER BY resource_type, status
""")
@@ -539,8 +539,8 @@ def resources_report():
COUNT(CASE WHEN status = 'available' THEN 1 END) as available,
COUNT(CASE WHEN status = 'allocated' THEN 1 END) as allocated,
COUNT(CASE WHEN status = 'quarantined' THEN 1 END) as quarantined,
COUNT(CASE WHEN is_test = true THEN 1 END) as test,
COUNT(CASE WHEN is_test = false THEN 1 END) as production
COUNT(CASE WHEN is_fake = true THEN 1 END) as test,
COUNT(CASE WHEN is_fake = false THEN 1 END) as production
FROM resource_pools
GROUP BY resource_type
ORDER BY resource_type
@@ -566,7 +566,7 @@ def resources_report():
rp.resource_type,
rp.resource_value,
rp.status,
rp.is_test,
rp.is_fake,
c.name as customer_name,
l.license_key,
rp.status_changed_at,
@@ -628,7 +628,7 @@ def add_resources():
try:
resource_type = request.form.get('resource_type')
resources_text = request.form.get('resources_text', '')
is_test = request.form.get('is_test', 'false') == 'true'
is_fake = request.form.get('is_fake', 'false') == 'true'
if not resource_type or not resources_text.strip():
flash('Bitte Ressourcentyp und Ressourcen angeben!', 'error')
@@ -686,9 +686,9 @@ def add_resources():
for resource in new_resources:
cur.execute("""
INSERT INTO resource_pools
(resource_type, resource_value, status, is_test, created_by)
(resource_type, resource_value, status, is_fake, created_by)
VALUES (%s, %s, 'available', %s, %s)
""", (resource_type, resource, is_test, session['username']))
""", (resource_type, resource, is_fake, session['username']))
added_count += 1
conn.commit()
@@ -706,7 +706,7 @@ def add_resources():
if invalid_resources:
flash(f'{len(invalid_resources)} ungültige Ressourcen wurden ignoriert.', 'error')
return redirect(url_for('resources.resources', show_test=request.form.get('show_test', 'false')))
return redirect(url_for('resources.resources', show_fake=request.form.get('show_fake', 'false')))
except Exception as e:
conn.rollback()
@@ -717,5 +717,5 @@ def add_resources():
conn.close()
# GET request - show form
show_test = request.args.get('show_test', 'false') == 'true'
return render_template('add_resources.html', show_test=show_test)
show_fake = request.args.get('show_fake', 'false') == 'true'
return render_template('add_resources.html', show_fake=show_fake)