Files
v2-Docker/v2_adminpanel/routes/export_routes.py
Claude Project Manager 0d7d888502 Initial commit
2025-07-05 17:51:16 +02:00

495 Zeilen
17 KiB
Python

import logging
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
from flask import Blueprint, request, send_file
import config
from auth.decorators import login_required
from utils.export import create_excel_export, create_csv_export, prepare_audit_export_data, format_datetime_for_export
from db import get_connection
# Blueprint grouping all export endpoints; every route below is served under /export
export_bp = Blueprint('export', __name__, url_prefix='/export')
@export_bp.route("/licenses")
@login_required
def export_licenses():
    """Export every non-fake license as an Excel or CSV download.

    Query parameters:
        format: ``csv`` selects CSV output; any other value (default
            ``excel``) selects Excel output.

    Returns:
        The response produced by ``create_csv_export`` or
        ``create_excel_export``, or an error message with HTTP status 500
        when anything inside the export fails.
    """
    conn = get_connection()
    cur = conn.cursor()
    try:
        # Only real data is exported - rows flagged as fake are filtered out.
        cur.execute("""
            SELECT
            l.id,
            l.license_key,
            c.name as customer_name,
            c.email as customer_email,
            l.license_type,
            l.valid_from,
            l.valid_until,
            l.is_active,
            l.device_limit,
            l.created_at,
            l.is_fake,
            CASE
            WHEN l.valid_until < CURRENT_DATE THEN 'Abgelaufen'
            WHEN l.is_active = false THEN 'Deaktiviert'
            ELSE 'Aktiv'
            END as status,
            (SELECT COUNT(*) FROM sessions s WHERE s.license_key = l.license_key AND s.is_active = true) as active_sessions,
            (SELECT COUNT(DISTINCT hardware_id) FROM sessions s WHERE s.license_key = l.license_key) as registered_devices
            FROM licenses l
            LEFT JOIN customers c ON l.customer_id = c.id
            WHERE l.is_fake = false
            ORDER BY l.created_at DESC
            """)

        headers = ['ID', 'Lizenzschlüssel', 'Kunde', 'E-Mail', 'Typ', 'Gültig von',
                   'Gültig bis', 'Aktiv', 'Gerätelimit', 'Erstellt am', 'Fake-Lizenz',
                   'Status', 'Aktive Sessions', 'Registrierte Geräte']

        # Row positions of valid_from, valid_until and created_at.
        datetime_positions = (5, 6, 9)

        rows = []
        for record in cur.fetchall():
            values = list(record)
            for pos in datetime_positions:
                # Empty (falsy) values are passed through unformatted.
                if values[pos]:
                    values[pos] = format_datetime_for_export(values[pos])
            rows.append(values)

        # Pick the writer from the requested format; Excel is the fallback.
        wants_csv = request.args.get('format', 'excel').lower() == 'csv'
        exporter = create_csv_export if wants_csv else create_excel_export
        return exporter(rows, headers, 'lizenzen')
    except Exception as e:
        logging.error(f"Fehler beim Export: {str(e)}")
        return "Fehler beim Exportieren der Lizenzen", 500
    finally:
        cur.close()
        conn.close()
@export_bp.route("/audit")
@login_required
def export_audit():
    """Export audit log entries as an Excel or CSV download.

    Query parameters:
        days: how many days back to include (integer, default 30).
        action: if non-empty, only entries with exactly this action.
        entity_type: if non-empty, only entries with exactly this entity type.
        format: ``csv`` selects CSV output; any other value (default
            ``excel``) selects Excel output.

    Returns:
        The response produced by ``create_csv_export`` or
        ``create_excel_export``; an error message with HTTP status 400 when
        ``days`` is not an integer; an error message with HTTP status 500
        when the export itself fails.
    """
    # Validate before touching the database so that a malformed parameter is
    # reported as a client error instead of a generic 500.
    try:
        days = int(request.args.get('days', 30))
    except (TypeError, ValueError):
        return "Ungültiger Wert für Parameter 'days'", 400

    action_filter = request.args.get('action', '')
    entity_type_filter = request.args.get('entity_type', '')

    conn = get_connection()
    cur = None
    try:
        cur = conn.cursor()

        # The placeholder is kept outside of the quoted interval literal so
        # the value is bound as a real parameter rather than interpolated
        # into a string constant.
        query = """
            SELECT
            id, timestamp, username, action, entity_type, entity_id,
            ip_address, user_agent, old_values, new_values, additional_info
            FROM audit_log
            WHERE timestamp >= CURRENT_TIMESTAMP - %s * INTERVAL '1 day'
            """
        params = [days]
        if action_filter:
            query += " AND action = %s"
            params.append(action_filter)
        if entity_type_filter:
            query += " AND entity_type = %s"
            params.append(entity_type_filter)
        query += " ORDER BY timestamp DESC"
        cur.execute(query, params)

        # Convert rows to dictionaries keyed by the selected column names,
        # in the same order as the SELECT list above.
        field_names = ('id', 'timestamp', 'username', 'action', 'entity_type',
                       'entity_id', 'ip_address', 'user_agent', 'old_values',
                       'new_values', 'additional_info')
        audit_logs = [dict(zip(field_names, row)) for row in cur.fetchall()]

        data = prepare_audit_export_data(audit_logs)
        columns = ['ID', 'Zeitstempel', 'Benutzer', 'Aktion', 'Entität', 'Entität ID',
                   'IP-Adresse', 'User Agent', 'Alte Werte', 'Neue Werte', 'Zusatzinfo']

        format_type = request.args.get('format', 'excel').lower()
        if format_type == 'csv':
            return create_csv_export(data, columns, 'audit_log')
        return create_excel_export(data, columns, 'audit_log')
    except Exception as e:
        # logging.exception keeps the traceback alongside the message.
        logging.exception("Fehler beim Export: %s", e)
        return "Fehler beim Exportieren der Audit-Logs", 500
    finally:
        # The connection is closed even if creating the cursor failed.
        if cur is not None:
            cur.close()
        conn.close()
@export_bp.route("/customers")
@login_required
def export_customers():
    """Export every non-fake customer with license counts as Excel or CSV.

    Each row carries the customer's master data plus the total, active and
    expired license counts computed by the query.

    Query parameters:
        format: ``csv`` selects CSV output; any other value (default
            ``excel``) selects Excel output.

    Returns:
        The response produced by ``create_csv_export`` or
        ``create_excel_export``, or an error message with HTTP status 500
        when anything inside the export fails.
    """
    conn = get_connection()
    cur = conn.cursor()
    try:
        # Only real customers are exported.
        customer_sql = """
            SELECT
            c.id,
            c.name,
            c.email,
            c.phone,
            c.address,
            c.created_at,
            c.is_fake,
            COUNT(l.id) as license_count,
            COUNT(CASE WHEN l.is_active = true THEN 1 END) as active_licenses,
            COUNT(CASE WHEN l.valid_until < CURRENT_DATE THEN 1 END) as expired_licenses
            FROM customers c
            LEFT JOIN licenses l ON c.id = l.customer_id
            WHERE c.is_fake = false
            GROUP BY c.id, c.name, c.email, c.phone, c.address, c.created_at, c.is_fake
            ORDER BY c.name
            """
        cur.execute(customer_sql)

        headers = ['ID', 'Name', 'E-Mail', 'Telefon', 'Adresse', 'Erstellt am',
                   'Test-Kunde', 'Anzahl Lizenzen', 'Aktive Lizenzen', 'Abgelaufene Lizenzen']

        created_at_pos = 5  # position of created_at in the SELECT list

        rows = []
        for record in cur.fetchall():
            values = list(record)
            created_at = values[created_at_pos]
            # Empty (falsy) values are passed through unformatted.
            if created_at:
                values[created_at_pos] = format_datetime_for_export(created_at)
            rows.append(values)

        # Pick the writer from the requested format; Excel is the fallback.
        wants_csv = request.args.get('format', 'excel').lower() == 'csv'
        exporter = create_csv_export if wants_csv else create_excel_export
        return exporter(rows, headers, 'kunden')
    except Exception as e:
        logging.error(f"Fehler beim Export: {str(e)}")
        return "Fehler beim Exportieren der Kunden", 500
    finally:
        cur.close()
        conn.close()
@export_bp.route("/sessions")
@login_required
def export_sessions():
    """Export sessions of non-fake licenses as an Excel or CSV download.

    Query parameters:
        days: how many days back to include, based on ``started_at``
            (integer, default 7). Only applied when ``active_only`` is not
            set.
        active_only: the literal string ``true`` restricts the export to
            currently active sessions, regardless of their start time.
        format: ``csv`` selects CSV output; any other value (default
            ``excel``) selects Excel output.

    Returns:
        The response produced by ``create_csv_export`` or
        ``create_excel_export``; an error message with HTTP status 400 when
        ``days`` is not an integer; an error message with HTTP status 500
        when the export itself fails.
    """
    # Validate before touching the database so that a malformed parameter is
    # reported as a client error instead of a generic 500.
    try:
        days = int(request.args.get('days', 7))
    except (TypeError, ValueError):
        return "Ungültiger Wert für Parameter 'days'", 400

    active_only = request.args.get('active_only', 'false') == 'true'

    conn = get_connection()
    cur = None
    try:
        cur = conn.cursor()

        # Both variants share the same SELECT; only the WHERE clause differs.
        # The customer name is resolved through the customers table via
        # licenses.customer_id, matching how the other exports in this module
        # obtain it.
        query = """
            SELECT
            s.id,
            s.license_key,
            c.name as customer_name,
            s.username,
            s.hardware_id,
            s.started_at,
            s.ended_at,
            s.last_heartbeat,
            s.is_active,
            l.license_type,
            l.is_fake
            FROM sessions s
            LEFT JOIN licenses l ON s.license_key = l.license_key
            LEFT JOIN customers c ON l.customer_id = c.id
            """
        # NOTE: the l.is_fake condition also drops sessions that have no
        # matching license row (NULL never equals false).
        if active_only:
            query += " WHERE s.is_active = true AND l.is_fake = false"
            params = ()
        else:
            # The placeholder is kept outside of the quoted interval literal
            # so the value is bound as a real parameter.
            query += (" WHERE s.started_at >= CURRENT_TIMESTAMP - %s * INTERVAL '1 day'"
                      " AND l.is_fake = false")
            params = (days,)
        query += " ORDER BY s.started_at DESC"
        cur.execute(query, params)

        columns = ['ID', 'Lizenzschlüssel', 'Kunde', 'Benutzer', 'Geräte-ID',
                   'Login-Zeit', 'Logout-Zeit', 'Letzte Aktivität', 'Aktiv',
                   'Lizenztyp', 'Fake-Lizenz']

        # Row positions of started_at, ended_at and last_heartbeat.
        datetime_positions = (5, 6, 7)

        data = []
        for row in cur.fetchall():
            row_data = list(row)
            for pos in datetime_positions:
                # Empty (falsy) values are passed through unformatted.
                if row_data[pos]:
                    row_data[pos] = format_datetime_for_export(row_data[pos])
            data.append(row_data)

        format_type = request.args.get('format', 'excel').lower()
        if format_type == 'csv':
            return create_csv_export(data, columns, 'sessions')
        return create_excel_export(data, columns, 'sessions')
    except Exception as e:
        # logging.exception keeps the traceback alongside the message.
        logging.exception("Fehler beim Export: %s", e)
        return "Fehler beim Exportieren der Sessions", 500
    finally:
        # The connection is closed even if creating the cursor failed.
        if cur is not None:
            cur.close()
        conn.close()
@export_bp.route("/resources")
@login_required
def export_resources():
    """Export non-fake resource pool entries as an Excel or CSV download.

    Query parameters:
        type: restrict to this resource type; ``all`` (default) disables
            the filter.
        status: restrict to this status; ``all`` (default) disables the
            filter.
        format: ``csv`` selects CSV output; any other value (default
            ``excel``) selects Excel output.

    Returns:
        The response produced by ``create_csv_export`` or
        ``create_excel_export``, or an error message with HTTP status 500
        when anything inside the export fails.
    """
    conn = get_connection()
    cur = conn.cursor()
    try:
        requested_type = request.args.get('type', 'all')
        requested_status = request.args.get('status', 'all')

        # The statement is assembled from fragments; "WHERE 1=1" lets every
        # optional filter be appended uniformly with AND.
        sql_parts = ["""
            SELECT
            rp.id,
            rp.resource_type,
            rp.resource_value,
            rp.status,
            rp.is_fake,
            l.license_key,
            c.name as customer_name,
            rp.created_at,
            rp.created_by,
            rp.status_changed_at,
            rp.status_changed_by,
            rp.quarantine_reason
            FROM resource_pools rp
            LEFT JOIN licenses l ON rp.allocated_to_license = l.id
            LEFT JOIN customers c ON l.customer_id = c.id
            WHERE 1=1
            """]
        bind_values = []

        if requested_type != 'all':
            sql_parts.append(" AND rp.resource_type = %s")
            bind_values.append(requested_type)
        if requested_status != 'all':
            sql_parts.append(" AND rp.status = %s")
            bind_values.append(requested_status)

        # Always export real resources only.
        sql_parts.append(" AND rp.is_fake = false")
        sql_parts.append(" ORDER BY rp.resource_type, rp.resource_value")
        cur.execute("".join(sql_parts), bind_values)

        headers = ['ID', 'Typ', 'Wert', 'Status', 'Test-Ressource', 'Lizenzschlüssel',
                   'Kunde', 'Erstellt am', 'Erstellt von', 'Status geändert am',
                   'Status geändert von', 'Quarantäne-Grund']

        # Row positions of created_at and status_changed_at.
        datetime_positions = (7, 9)

        rows = []
        for record in cur.fetchall():
            values = list(record)
            for pos in datetime_positions:
                # Empty (falsy) values are passed through unformatted.
                if values[pos]:
                    values[pos] = format_datetime_for_export(values[pos])
            rows.append(values)

        # Pick the writer from the requested format; Excel is the fallback.
        wants_csv = request.args.get('format', 'excel').lower() == 'csv'
        exporter = create_csv_export if wants_csv else create_excel_export
        return exporter(rows, headers, 'ressourcen')
    except Exception as e:
        logging.error(f"Fehler beim Export: {str(e)}")
        return "Fehler beim Exportieren der Ressourcen", 500
    finally:
        cur.close()
        conn.close()
@export_bp.route("/monitoring")
@login_required
def export_monitoring():
    """Export monitoring events as an Excel or CSV download.

    The export always contains license heartbeats of non-fake licenses. If
    a table named ``anomaly_detections`` exists, its rows from the same time
    window are appended to the result as well.

    Query parameters:
        hours: how many hours back to include (integer, default 24).
        format: ``csv`` selects CSV output; any other value (default
            ``excel``) selects Excel output.

    Returns:
        The response produced by ``create_csv_export`` or
        ``create_excel_export``; an error message with HTTP status 400 when
        ``hours`` is not an integer; an error message with HTTP status 500
        when the export itself fails.
    """
    # Validate before touching the database so that a malformed parameter is
    # reported as a client error instead of a generic 500.
    try:
        hours = int(request.args.get('hours', 24))
    except (TypeError, ValueError):
        return "Ungültiger Wert für Parameter 'hours'", 400

    conn = get_connection()
    cur = None
    try:
        cur = conn.cursor()

        columns = ['Zeitstempel', 'Lizenz-ID', 'Lizenzschlüssel', 'Kunde', 'Hardware-ID',
                   'IP-Adresse', 'Ereignis-Typ', 'Schweregrad', 'Beschreibung']

        # Heartbeat part of the CTE. The placeholder is kept outside of the
        # quoted interval literal so the value is bound as a real parameter.
        query = """
            WITH monitoring_data AS (
            -- Lizenz-Heartbeats
            SELECT
            lh.timestamp,
            lh.license_id,
            l.license_key,
            c.name as customer_name,
            lh.hardware_id,
            lh.ip_address,
            'Heartbeat' as event_type,
            'Normal' as severity,
            'License validation' as description
            FROM license_heartbeats lh
            JOIN licenses l ON l.id = lh.license_id
            JOIN customers c ON c.id = l.customer_id
            WHERE lh.timestamp > CURRENT_TIMESTAMP - %s * INTERVAL '1 hour'
            AND l.is_fake = false
            """
        params = [hours]

        # The anomaly table is optional; only reference it when it exists.
        cur.execute("""
            SELECT EXISTS (
            SELECT FROM information_schema.tables
            WHERE table_name = 'anomaly_detections'
            )
            """)
        has_anomalies = cur.fetchone()[0]

        if has_anomalies:
            # Anomalies without a matching license row are kept
            # (l.is_fake IS NULL), unlike heartbeats which use inner joins.
            query += """
                UNION ALL
                -- Anomalien
                SELECT
                ad.detected_at as timestamp,
                ad.license_id,
                l.license_key,
                c.name as customer_name,
                ad.hardware_id,
                ad.ip_address,
                ad.anomaly_type as event_type,
                ad.severity,
                ad.description
                FROM anomaly_detections ad
                LEFT JOIN licenses l ON l.id = ad.license_id
                LEFT JOIN customers c ON c.id = l.customer_id
                WHERE ad.detected_at > CURRENT_TIMESTAMP - %s * INTERVAL '1 hour'
                AND (l.is_fake = false OR l.is_fake IS NULL)
                """
            # One bound value per time-window placeholder.
            params.append(hours)

        query += """
            )
            SELECT * FROM monitoring_data
            ORDER BY timestamp DESC
            """
        cur.execute(query, params)

        data = []
        for row in cur.fetchall():
            row_data = list(row)
            # The timestamp is the first column; empty values stay as-is.
            if row_data[0]:
                row_data[0] = format_datetime_for_export(row_data[0])
            data.append(row_data)

        format_type = request.args.get('format', 'excel').lower()
        if format_type == 'csv':
            return create_csv_export(data, columns, 'monitoring')
        return create_excel_export(data, columns, 'monitoring')
    except Exception as e:
        # logging.exception keeps the traceback alongside the message.
        logging.exception("Fehler beim Export: %s", e)
        return "Fehler beim Exportieren der Monitoring-Daten", 500
    finally:
        # The connection is closed even if creating the cursor failed.
        if cur is not None:
            cur.close()
        conn.close()