# Export blueprint: Excel/CSV downloads for licenses, audit log, customers,
# sessions, resource pools, and monitoring data.
import logging
|
|
from datetime import datetime, timedelta
|
|
from zoneinfo import ZoneInfo
|
|
from flask import Blueprint, request, send_file
|
|
|
|
import config
|
|
from auth.decorators import login_required
|
|
from utils.export import create_excel_export, create_csv_export, prepare_audit_export_data, format_datetime_for_export
|
|
from db import get_connection
|
|
|
|
# Create Blueprint: all export endpoints below are mounted under /export.
export_bp = Blueprint('export', __name__, url_prefix='/export')
|
@export_bp.route("/licenses")
@login_required
def export_licenses():
    """Export licenses as an Excel (default) or CSV file.

    Query parameters:
        format: 'excel' (default) or 'csv'.

    Returns:
        A file download response, or ("Fehler...", 500) on failure.

    Only real licenses are exported (rows with is_fake = true are filtered
    out). Per-license session counts are computed via correlated subqueries.
    """
    conn = get_connection()
    cur = conn.cursor()

    try:
        # Export only real data - never fake/demo licenses.
        query = """
            SELECT
                l.id,
                l.license_key,
                c.name as customer_name,
                c.email as customer_email,
                l.license_type,
                l.valid_from,
                l.valid_until,
                l.is_active,
                l.device_limit,
                l.created_at,
                l.is_fake,
                CASE
                    WHEN l.valid_until < CURRENT_DATE THEN 'Abgelaufen'
                    WHEN l.is_active = false THEN 'Deaktiviert'
                    ELSE 'Aktiv'
                END as status,
                (SELECT COUNT(*) FROM sessions s WHERE s.license_key = l.license_key AND s.is_active = true) as active_sessions,
                (SELECT COUNT(DISTINCT hardware_id) FROM sessions s WHERE s.license_key = l.license_key) as registered_devices
            FROM licenses l
            LEFT JOIN customers c ON l.customer_id = c.id
            WHERE l.is_fake = false
            ORDER BY l.created_at DESC
        """

        cur.execute(query)

        # Export column headers (German UI labels; part of the output data).
        columns = ['ID', 'Lizenzschlüssel', 'Kunde', 'E-Mail', 'Typ', 'Gültig von',
                   'Gültig bis', 'Aktiv', 'Gerätelimit', 'Erstellt am', 'Fake-Lizenz',
                   'Status', 'Aktive Sessions', 'Registrierte Geräte']

        # Datetime columns to format: valid_from (5), valid_until (6), created_at (9).
        datetime_indices = (5, 6, 9)

        data = []
        for row in cur.fetchall():
            row_data = list(row)
            for idx in datetime_indices:
                if row_data[idx]:
                    row_data[idx] = format_datetime_for_export(row_data[idx])
            data.append(row_data)

        # Choose output format (defaults to Excel).
        format_type = request.args.get('format', 'excel').lower()
        if format_type == 'csv':
            return create_csv_export(data, columns, 'lizenzen')
        return create_excel_export(data, columns, 'lizenzen')

    except Exception:
        # logging.exception keeps the full traceback; the old
        # logging.error(f"... {e}") discarded it.
        logging.exception("Fehler beim Export")
        return "Fehler beim Exportieren der Lizenzen", 500
    finally:
        cur.close()
        conn.close()
|
|
|
|
|
|
@export_bp.route("/audit")
@login_required
def export_audit():
    """Export audit log entries as an Excel (default) or CSV file.

    Query parameters:
        days: look-back window in days (default 30); a non-numeric value
            raises ValueError and surfaces as the generic 500 below.
        action: optional exact-match filter on the action column.
        entity_type: optional exact-match filter on the entity type.
        format: 'excel' (default) or 'csv'.
    """
    conn = get_connection()
    cur = conn.cursor()

    try:
        # Read filters from the request.
        days = int(request.args.get('days', 30))
        action_filter = request.args.get('action', '')
        entity_type_filter = request.args.get('entity_type', '')

        # NOTE(review): the %s placeholder inside INTERVAL '...' relies on
        # psycopg2-style client-side parameter interpolation; confirm the
        # driver behind get_connection() supports placeholders in quotes.
        query = """
            SELECT
                id, timestamp, username, action, entity_type, entity_id,
                ip_address, user_agent, old_values, new_values, additional_info
            FROM audit_log
            WHERE timestamp >= CURRENT_TIMESTAMP - INTERVAL '%s days'
        """
        params = [days]

        if action_filter:
            query += " AND action = %s"
            params.append(action_filter)

        if entity_type_filter:
            query += " AND entity_type = %s"
            params.append(entity_type_filter)

        query += " ORDER BY timestamp DESC"

        cur.execute(query, params)

        # Convert rows to dicts in the shape prepare_audit_export_data expects.
        field_names = ('id', 'timestamp', 'username', 'action', 'entity_type',
                       'entity_id', 'ip_address', 'user_agent', 'old_values',
                       'new_values', 'additional_info')
        audit_logs = [dict(zip(field_names, row)) for row in cur.fetchall()]

        # Flatten/format for the spreadsheet.
        data = prepare_audit_export_data(audit_logs)

        # Export column headers (German UI labels).
        columns = ['ID', 'Zeitstempel', 'Benutzer', 'Aktion', 'Entität', 'Entität ID',
                   'IP-Adresse', 'User Agent', 'Alte Werte', 'Neue Werte', 'Zusatzinfo']

        # Choose output format (defaults to Excel).
        format_type = request.args.get('format', 'excel').lower()
        if format_type == 'csv':
            return create_csv_export(data, columns, 'audit_log')
        return create_excel_export(data, columns, 'audit_log')

    except Exception:
        # Preserve the traceback in the log (logging.error(f"...") lost it).
        logging.exception("Fehler beim Export")
        return "Fehler beim Exportieren der Audit-Logs", 500
    finally:
        cur.close()
        conn.close()
|
|
|
|
|
|
@export_bp.route("/customers")
@login_required
def export_customers():
    """Export customers as an Excel (default) or CSV file.

    Query parameters:
        format: 'excel' (default) or 'csv'.

    Aggregates per-customer license statistics (total, active, expired).
    Only real customers (is_fake = false) are exported.
    """
    conn = get_connection()
    cur = conn.cursor()

    try:
        # Aggregate license counts per customer; exclude fake/demo rows.
        cur.execute("""
            SELECT
                c.id,
                c.name,
                c.email,
                c.phone,
                c.address,
                c.created_at,
                c.is_fake,
                COUNT(l.id) as license_count,
                COUNT(CASE WHEN l.is_active = true THEN 1 END) as active_licenses,
                COUNT(CASE WHEN l.valid_until < CURRENT_DATE THEN 1 END) as expired_licenses
            FROM customers c
            LEFT JOIN licenses l ON c.id = l.customer_id
            WHERE c.is_fake = false
            GROUP BY c.id, c.name, c.email, c.phone, c.address, c.created_at, c.is_fake
            ORDER BY c.name
        """)

        # Export column headers (German UI labels).
        columns = ['ID', 'Name', 'E-Mail', 'Telefon', 'Adresse', 'Erstellt am',
                   'Test-Kunde', 'Anzahl Lizenzen', 'Aktive Lizenzen', 'Abgelaufene Lizenzen']

        data = []
        for row in cur.fetchall():
            row_data = list(row)
            # created_at is column index 5.
            if row_data[5]:
                row_data[5] = format_datetime_for_export(row_data[5])
            data.append(row_data)

        # Choose output format (defaults to Excel).
        format_type = request.args.get('format', 'excel').lower()
        if format_type == 'csv':
            return create_csv_export(data, columns, 'kunden')
        return create_excel_export(data, columns, 'kunden')

    except Exception:
        # logging.exception keeps the traceback for debugging.
        logging.exception("Fehler beim Export")
        return "Fehler beim Exportieren der Kunden", 500
    finally:
        cur.close()
        conn.close()
|
|
|
|
|
|
@export_bp.route("/sessions")
@login_required
def export_sessions():
    """Export sessions as an Excel (default) or CSV file.

    Query parameters:
        days: look-back window in days for the non-active-only export
            (default 7); ignored when active_only is true.
        active_only: 'true' to export only currently active sessions.
        format: 'excel' (default) or 'csv'.
    """
    conn = get_connection()
    cur = conn.cursor()

    try:
        # Read filters from the request.
        days = int(request.args.get('days', 7))
        active_only = request.args.get('active_only', 'false') == 'true'

        # Both variants share the same projection and joins; only the WHERE
        # clause differs, so build from one base instead of duplicating the
        # whole statement (the original repeated all 20 lines twice).
        # NOTE(review): l.customer_name is read straight from licenses here,
        # while every other export resolves the customer name via a JOIN on
        # customers — confirm this column actually exists on licenses.
        base_query = """
            SELECT
                s.id,
                s.license_key,
                l.customer_name,
                s.username,
                s.hardware_id,
                s.started_at,
                s.ended_at,
                s.last_heartbeat,
                s.is_active,
                l.license_type,
                l.is_fake
            FROM sessions s
            LEFT JOIN licenses l ON s.license_key = l.license_key
        """

        if active_only:
            query = base_query + """
            WHERE s.is_active = true AND l.is_fake = false
            ORDER BY s.started_at DESC
            """
            cur.execute(query)
        else:
            # NOTE(review): %s inside INTERVAL '...' relies on psycopg2-style
            # client-side interpolation — confirm the driver supports it.
            query = base_query + """
            WHERE s.started_at >= CURRENT_TIMESTAMP - INTERVAL '%s days' AND l.is_fake = false
            ORDER BY s.started_at DESC
            """
            cur.execute(query, (days,))

        # Export column headers (German UI labels).
        columns = ['ID', 'Lizenzschlüssel', 'Kunde', 'Benutzer', 'Geräte-ID',
                   'Login-Zeit', 'Logout-Zeit', 'Letzte Aktivität', 'Aktiv',
                   'Lizenztyp', 'Fake-Lizenz']

        # Datetime columns: started_at (5), ended_at (6), last_heartbeat (7).
        datetime_indices = (5, 6, 7)

        data = []
        for row in cur.fetchall():
            row_data = list(row)
            for idx in datetime_indices:
                if row_data[idx]:
                    row_data[idx] = format_datetime_for_export(row_data[idx])
            data.append(row_data)

        # Choose output format (defaults to Excel).
        format_type = request.args.get('format', 'excel').lower()
        if format_type == 'csv':
            return create_csv_export(data, columns, 'sessions')
        return create_excel_export(data, columns, 'sessions')

    except Exception:
        # Preserve the traceback in the log.
        logging.exception("Fehler beim Export")
        return "Fehler beim Exportieren der Sessions", 500
    finally:
        cur.close()
        conn.close()
|
|
|
|
|
|
@export_bp.route("/resources")
@login_required
def export_resources():
    """Export resource pool entries as an Excel (default) or CSV file.

    Query parameters:
        type: resource type filter, or 'all' (default).
        status: status filter, or 'all' (default).
        format: 'excel' (default) or 'csv'.

    Fake/demo resources (is_fake = true) are always excluded.
    """
    conn = get_connection()
    cur = conn.cursor()

    try:
        # Read filters from the request.
        resource_type = request.args.get('type', 'all')
        status_filter = request.args.get('status', 'all')

        # Base query; WHERE 1=1 lets the optional filters append uniformly.
        query = """
            SELECT
                rp.id,
                rp.resource_type,
                rp.resource_value,
                rp.status,
                rp.is_fake,
                l.license_key,
                c.name as customer_name,
                rp.created_at,
                rp.created_by,
                rp.status_changed_at,
                rp.status_changed_by,
                rp.quarantine_reason
            FROM resource_pools rp
            LEFT JOIN licenses l ON rp.allocated_to_license = l.id
            LEFT JOIN customers c ON l.customer_id = c.id
            WHERE 1=1
        """

        params = []

        # Filter values are parameterized, never interpolated into the SQL.
        if resource_type != 'all':
            query += " AND rp.resource_type = %s"
            params.append(resource_type)

        if status_filter != 'all':
            query += " AND rp.status = %s"
            params.append(status_filter)

        # Always export only real resources.
        query += " AND rp.is_fake = false"
        query += " ORDER BY rp.resource_type, rp.resource_value"

        cur.execute(query, params)

        # Export column headers (German UI labels).
        columns = ['ID', 'Typ', 'Wert', 'Status', 'Test-Ressource', 'Lizenzschlüssel',
                   'Kunde', 'Erstellt am', 'Erstellt von', 'Status geändert am',
                   'Status geändert von', 'Quarantäne-Grund']

        # Datetime columns: created_at (7), status_changed_at (9).
        datetime_indices = (7, 9)

        data = []
        for row in cur.fetchall():
            row_data = list(row)
            for idx in datetime_indices:
                if row_data[idx]:
                    row_data[idx] = format_datetime_for_export(row_data[idx])
            data.append(row_data)

        # Choose output format (defaults to Excel).
        format_type = request.args.get('format', 'excel').lower()
        if format_type == 'csv':
            return create_csv_export(data, columns, 'ressourcen')
        return create_excel_export(data, columns, 'ressourcen')

    except Exception:
        # logging.exception keeps the traceback for debugging.
        logging.exception("Fehler beim Export")
        return "Fehler beim Exportieren der Ressourcen", 500
    finally:
        cur.close()
        conn.close()
|
|
|
|
|
|
@export_bp.route("/monitoring")
@login_required
def export_monitoring():
    """Export monitoring events as an Excel (default) or CSV file.

    Combines license heartbeats with anomaly detections (when the optional
    anomaly_detections table exists) in one time-ordered list.

    Query parameters:
        hours: look-back window in hours (default 24).
        format: 'excel' (default) or 'csv'.
    """
    conn = get_connection()
    cur = conn.cursor()

    try:
        # Look-back window from the request.
        hours = int(request.args.get('hours', 24))

        # Export column headers (German UI labels).
        columns = ['Zeitstempel', 'Lizenz-ID', 'Lizenzschlüssel', 'Kunde', 'Hardware-ID',
                   'IP-Adresse', 'Ereignis-Typ', 'Schweregrad', 'Beschreibung']

        # CTE opening: heartbeat events within the window. The closing ')'
        # of the CTE is appended further below, after the optional UNION.
        # NOTE(review): %s inside INTERVAL '...' relies on psycopg2-style
        # client-side interpolation — confirm the driver supports it.
        query = """
            WITH monitoring_data AS (
                -- License heartbeats
                SELECT
                    lh.timestamp,
                    lh.license_id,
                    l.license_key,
                    c.name as customer_name,
                    lh.hardware_id,
                    lh.ip_address,
                    'Heartbeat' as event_type,
                    'Normal' as severity,
                    'License validation' as description
                FROM license_heartbeats lh
                JOIN licenses l ON l.id = lh.license_id
                JOIN customers c ON c.id = l.customer_id
                WHERE lh.timestamp > CURRENT_TIMESTAMP - INTERVAL '%s hours'
                AND l.is_fake = false
        """

        # The anomaly table is optional; include its branch only when present.
        cur.execute("""
            SELECT EXISTS (
                SELECT FROM information_schema.tables
                WHERE table_name = 'anomaly_detections'
            )
        """)
        has_anomalies = cur.fetchone()[0]

        if has_anomalies:
            query += """
                UNION ALL

                -- Anomalies
                SELECT
                    ad.detected_at as timestamp,
                    ad.license_id,
                    l.license_key,
                    c.name as customer_name,
                    ad.hardware_id,
                    ad.ip_address,
                    ad.anomaly_type as event_type,
                    ad.severity,
                    ad.description
                FROM anomaly_detections ad
                LEFT JOIN licenses l ON l.id = ad.license_id
                LEFT JOIN customers c ON c.id = l.customer_id
                WHERE ad.detected_at > CURRENT_TIMESTAMP - INTERVAL '%s hours'
                AND (l.is_fake = false OR l.is_fake IS NULL)
            """
            # One placeholder per UNION branch.
            params = [hours, hours]
        else:
            params = [hours]

        # Close the CTE and select everything, newest first.
        query += """
            )
            SELECT * FROM monitoring_data
            ORDER BY timestamp DESC
        """

        cur.execute(query, params)

        data = []
        for row in cur.fetchall():
            row_data = list(row)
            # timestamp is column index 0.
            if row_data[0]:
                row_data[0] = format_datetime_for_export(row_data[0])
            data.append(row_data)

        # Choose output format (defaults to Excel).
        format_type = request.args.get('format', 'excel').lower()
        if format_type == 'csv':
            return create_csv_export(data, columns, 'monitoring')
        return create_excel_export(data, columns, 'monitoring')

    except Exception:
        # Preserve the traceback in the log.
        logging.exception("Fehler beim Export")
        return "Fehler beim Exportieren der Monitoring-Daten", 500
    finally:
        cur.close()
        conn.close()