# Repository viewer metadata (not part of the program):
# Hetzner-Backup/v2_adminpanel/app.py
# 2025-06-07 17:23:49 +02:00
#
# 1280 lines
# 40 KiB
# Python

import gzip
import hmac
import io
import json
import logging
import os
import subprocess
import time
from datetime import datetime
from functools import wraps
from pathlib import Path

import pandas as pd
import psycopg2
from apscheduler.schedulers.background import BackgroundScheduler
from cryptography.fernet import Fernet
from dotenv import load_dotenv
from flask import (
    Flask,
    has_request_context,
    jsonify,
    redirect,
    render_template,
    request,
    send_file,
    session,
    url_for,
)
from flask_session import Session
from psycopg2.extras import Json

# Load settings from a .env file before any os.getenv() lookup below.
load_dotenv()

app = Flask(__name__)
# Prefer a stable secret from the environment so that signed data survives
# restarts and is shared by all worker processes; fall back to a random
# per-process key when SECRET_KEY is not configured.
app.config['SECRET_KEY'] = os.getenv("SECRET_KEY") or os.urandom(24)
app.config['SESSION_TYPE'] = 'filesystem'
app.config['JSON_AS_ASCII'] = False  # emit JSON as UTF-8 instead of ASCII escapes
app.config['JSONIFY_MIMETYPE'] = 'application/json; charset=utf-8'
Session(app)

# Backup configuration
BACKUP_DIR = Path("/app/backups")
# parents=True so a missing parent directory does not abort the import.
BACKUP_DIR.mkdir(parents=True, exist_ok=True)

# Scheduler for automatic backups
scheduler = BackgroundScheduler()
scheduler.start()

# Configure logging
logging.basicConfig(level=logging.INFO)
# Login decorator
def login_required(f):
    """Wrap a view so that it is only served to logged-in users.

    When the ``logged_in`` key is missing from the session the caller is
    redirected to the ``login`` endpoint; otherwise the wrapped view is called
    with its original arguments and its result is returned unchanged.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        if 'logged_in' in session:
            return f(*args, **kwargs)
        return redirect(url_for('login'))
    return guarded_view
# Database connection with UTF-8 encoding
def get_connection():
    """Open and return a new psycopg2 connection configured for UTF-8.

    Host and port fall back to ``postgres`` / ``5432``; database name, user
    and password are read from the ``POSTGRES_*`` environment variables
    without a default. The caller owns the connection and must close it.
    """
    settings = {
        "host": os.getenv("POSTGRES_HOST", "postgres"),
        "port": os.getenv("POSTGRES_PORT", "5432"),
        "dbname": os.getenv("POSTGRES_DB"),
        "user": os.getenv("POSTGRES_USER"),
        "password": os.getenv("POSTGRES_PASSWORD"),
        "options": '-c client_encoding=UTF8',
    }
    connection = psycopg2.connect(**settings)
    # Set the client encoding on the connection object as well.
    connection.set_client_encoding('UTF8')
    return connection
# Audit log helper
def log_audit(action, entity_type, entity_id=None, old_values=None, new_values=None, additional_info=None):
    """Insert one row into the ``audit_log`` table (best effort).

    Args:
        action: Action label stored in the ``action`` column (e.g. 'CREATE').
        entity_type: Kind of object the action refers to.
        entity_id: Optional identifier of the affected object.
        old_values: Optional dict with the state before the change.
        new_values: Optional dict with the state after the change.
        additional_info: Optional free-text note.

    Any failure (including a failed database connection) is logged and
    swallowed so that auditing never breaks the calling operation.
    """
    conn = None
    cur = None
    try:
        # The Flask proxies are only usable inside a request. Jobs started by
        # the scheduler run without one and are attributed to 'system'.
        if has_request_context():
            username = session.get('username', 'system')
            ip_address = request.remote_addr
            user_agent = request.headers.get('User-Agent')
        else:
            username, ip_address, user_agent = 'system', None, None

        # Wrap dictionaries for the JSON columns; empty or missing values
        # are stored as NULL.
        old_json = Json(old_values) if old_values else None
        new_json = Json(new_values) if new_values else None

        conn = get_connection()
        cur = conn.cursor()
        cur.execute("""
INSERT INTO audit_log
(username, action, entity_type, entity_id, old_values, new_values,
ip_address, user_agent, additional_info)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
""", (username, action, entity_type, entity_id, old_json, new_json,
              ip_address, user_agent, additional_info))
        conn.commit()
    except Exception as e:
        logging.error("Audit log error: %s", e)
        if conn is not None and not conn.closed:
            conn.rollback()
    finally:
        if cur is not None:
            cur.close()
        if conn is not None:
            conn.close()
# Encryption helpers
def get_or_create_encryption_key():
    """Return the Fernet key used for backups, creating one if necessary.

    Lookup order:
      1. ``BACKUP_ENCRYPTION_KEY`` environment variable, if it is a valid
         Fernet key.
      2. The ``.backup_key`` file inside ``BACKUP_DIR``.
      3. A freshly generated key, which is then written to that file.

    Returns:
        The key as bytes.
    """
    key_file = BACKUP_DIR / ".backup_key"

    # Try the environment variable first.
    env_key = os.getenv("BACKUP_ENCRYPTION_KEY")
    if env_key:
        try:
            # Constructing a Fernet instance validates the key format.
            Fernet(env_key.encode())
        except (ValueError, TypeError) as exc:
            logging.warning(
                "BACKUP_ENCRYPTION_KEY ist kein gültiger Fernet-Schlüssel (%s)", exc)
        else:
            return env_key.encode()

    # No valid key in the environment: fall back to the key file.
    if key_file.exists():
        return key_file.read_bytes()

    # Create a new key. The file is created owner-only before the secret is
    # written so it is never readable by other users.
    # NOTE(review): the key lives in the same directory as the backups it
    # protects — confirm this is intended.
    key = Fernet.generate_key()
    key_file.touch(mode=0o600, exist_ok=True)
    key_file.write_bytes(key)
    logging.info("Neuer Backup-Verschlüsselungsschlüssel erstellt")
    return key
# Backup functions
def create_backup(backup_type="manual", created_by=None):
    """Create a compressed, Fernet-encrypted ``pg_dump`` of the database.

    A ``backup_history`` row is inserted with status ``in_progress`` and later
    updated to ``success`` (with size and statistics) or ``failed`` (with the
    error message).

    Args:
        backup_type: Label stored in ``backup_history.backup_type``.
        created_by: Name stored in ``created_by``; 'system' when falsy.

    Returns:
        ``(True, filename)`` on success, ``(False, error_message)`` on failure.

    Raises:
        Exception: if the database connection or the initial history insert
            fails (nothing has been recorded at that point).
    """
    start_time = time.time()
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"backup_v2docker_{timestamp}_encrypted.sql.gz.enc"
    filepath = BACKUP_DIR / filename

    conn = get_connection()
    cur = conn.cursor()
    try:
        # Register the run first so that failures are visible in the history.
        cur.execute("""
INSERT INTO backup_history
(filename, filepath, backup_type, status, created_by, is_encrypted)
VALUES (%s, %s, %s, %s, %s, %s)
RETURNING id
""", (filename, str(filepath), backup_type, 'in_progress',
              created_by or 'system', True))
        backup_id = cur.fetchone()[0]
        conn.commit()

        try:
            # Create the PostgreSQL dump.
            dump_command = [
                'pg_dump',
                '-h', os.getenv("POSTGRES_HOST", "postgres"),
                '-p', os.getenv("POSTGRES_PORT", "5432"),
                '-U', os.getenv("POSTGRES_USER"),
                '-d', os.getenv("POSTGRES_DB"),
                '--no-password',
                '--verbose'
            ]
            # Hand the password to pg_dump through its environment.
            env = os.environ.copy()
            env['PGPASSWORD'] = os.getenv("POSTGRES_PASSWORD")

            result = subprocess.run(dump_command, capture_output=True, text=True, env=env)
            if result.returncode != 0:
                raise RuntimeError(f"pg_dump failed: {result.stderr}")

            # Compress, encrypt, then store.
            dump_data = result.stdout.encode('utf-8')
            compressed_data = gzip.compress(dump_data)
            key = get_or_create_encryption_key()
            encrypted_data = Fernet(key).encrypt(compressed_data)
            filepath.write_bytes(encrypted_data)

            # Collect statistics for the history entry.
            cur.execute("SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'public'")
            tables_count = cur.fetchone()[0]
            cur.execute("""
SELECT SUM(n_live_tup)
FROM pg_stat_user_tables
""")
            records_count = cur.fetchone()[0] or 0
            duration = time.time() - start_time
            filesize = filepath.stat().st_size

            # Mark the history entry as successful.
            cur.execute("""
UPDATE backup_history
SET status = %s, filesize = %s, tables_count = %s,
records_count = %s, duration_seconds = %s
WHERE id = %s
""", ('success', filesize, tables_count, records_count, duration, backup_id))
            conn.commit()

            log_audit('BACKUP', 'database', backup_id,
                      additional_info=f"Backup erstellt: {filename} ({filesize} bytes)")
            # E-mail notification (if configured)
            send_backup_notification(True, filename, filesize, duration)
            logging.info(f"Backup erfolgreich erstellt: {filename}")
            return True, filename
        except Exception as e:
            # A failed statement leaves the transaction aborted; roll back
            # first, otherwise the UPDATE below would be rejected as well.
            conn.rollback()
            cur.execute("""
UPDATE backup_history
SET status = %s, error_message = %s, duration_seconds = %s
WHERE id = %s
""", ('failed', str(e), time.time() - start_time, backup_id))
            conn.commit()
            logging.error(f"Backup fehlgeschlagen: {e}")
            send_backup_notification(False, filename, error=str(e))
            return False, str(e)
    finally:
        cur.close()
        conn.close()
def restore_backup(backup_id, encryption_key=None):
    """Restore the database from a stored backup by piping it into ``psql``.

    Args:
        backup_id: ``backup_history.id`` of the backup to restore.
        encryption_key: Optional Fernet key (str). When omitted, the key from
            ``get_or_create_encryption_key()`` is used.

    Returns:
        ``(True, message)`` on success, ``(False, error_message)`` otherwise.

    Raises:
        Exception: only if the database connection itself cannot be opened.
    """
    conn = get_connection()
    cur = conn.cursor()
    try:
        # Look up the backup and release the connection immediately, so it is
        # closed on every path and not held open while psql runs.
        try:
            cur.execute("""
SELECT filename, filepath, is_encrypted
FROM backup_history
WHERE id = %s
""", (backup_id,))
            backup_info = cur.fetchone()
        finally:
            cur.close()
            conn.close()

        if not backup_info:
            raise Exception("Backup nicht gefunden")
        filename, filepath, is_encrypted = backup_info
        filepath = Path(filepath)
        if not filepath.exists():
            raise Exception("Backup-Datei nicht gefunden")

        encrypted_data = filepath.read_bytes()

        # Decrypt
        if is_encrypted:
            key = encryption_key.encode() if encryption_key else get_or_create_encryption_key()
            try:
                compressed_data = Fernet(key).decrypt(encrypted_data)
            except Exception as exc:
                # Covers both a malformed key and a key that does not match.
                raise Exception("Entschlüsselung fehlgeschlagen. Falsches Passwort?") from exc
        else:
            compressed_data = encrypted_data

        # Decompress
        dump_data = gzip.decompress(compressed_data)
        sql_commands = dump_data.decode('utf-8')

        # Replay the dump.
        # NOTE(review): psql exits with 0 even when individual statements
        # fail unless ON_ERROR_STOP is set — confirm whether partially failed
        # restores should be reported as errors.
        restore_command = [
            'psql',
            '-h', os.getenv("POSTGRES_HOST", "postgres"),
            '-p', os.getenv("POSTGRES_PORT", "5432"),
            '-U', os.getenv("POSTGRES_USER"),
            '-d', os.getenv("POSTGRES_DB"),
            '--no-password'
        ]
        env = os.environ.copy()
        env['PGPASSWORD'] = os.getenv("POSTGRES_PASSWORD")
        result = subprocess.run(restore_command, input=sql_commands,
                                capture_output=True, text=True, env=env)
        if result.returncode != 0:
            raise Exception(f"Wiederherstellung fehlgeschlagen: {result.stderr}")

        # Audit log (opens its own connection)
        log_audit('RESTORE', 'database', backup_id,
                  additional_info=f"Backup wiederhergestellt: {filename}")
        return True, "Backup erfolgreich wiederhergestellt"
    except Exception as e:
        logging.error(f"Wiederherstellung fehlgeschlagen: {e}")
        return False, str(e)
def send_backup_notification(success, filename, filesize=None, duration=None, error=None):
    """Announce a backup result by e-mail (placeholder implementation).

    Does nothing unless the ``EMAIL_ENABLED`` environment variable equals
    ``true`` (case-insensitive). When enabled, it currently only logs that a
    notification was prepared; no mail is sent.

    Args:
        success: Whether the backup succeeded.
        filename: Name of the backup file (currently unused).
        filesize: Size in bytes (currently unused).
        duration: Duration in seconds (currently unused).
        error: Error text for failed backups (currently unused).
    """
    if os.getenv("EMAIL_ENABLED", "false").lower() != "true":
        return
    # E-mail delivery is prepared but disabled.
    # TODO: implement once a mail server is configured.
    outcome = 'erfolgreich' if success else 'fehlgeschlagen'
    logging.info("E-Mail-Benachrichtigung vorbereitet: Backup %s", outcome)
# Scheduled backup job
def scheduled_backup():
    """Job body for the scheduler: log the start and create a backup.

    The backup is recorded with type ``scheduled`` and creator ``scheduler``;
    the result of ``create_backup`` is not inspected here.
    """
    logging.info("Starte geplantes Backup...")
    create_backup(created_by="scheduler", backup_type="scheduled")
# Register the backup job with a cron trigger: every day at 03:00.
# replace_existing avoids a duplicate job under the same id.
scheduler.add_job(scheduled_backup, trigger='cron', id='daily_backup',
                  hour=3, minute=0, replace_existing=True)
def _credentials_match(username, password, expected_user, expected_pass):
    """Return True only if both values match a fully configured account.

    Missing form fields and unconfigured accounts never match; without this
    guard ``None == None`` would authenticate an empty request whenever an
    ADMIN* variable is unset. Comparison is constant-time.
    """
    if not (username and password and expected_user and expected_pass):
        return False
    # Compare as bytes: compare_digest rejects non-ASCII str arguments.
    user_ok = hmac.compare_digest(username.encode('utf-8'), expected_user.encode('utf-8'))
    pass_ok = hmac.compare_digest(password.encode('utf-8'), expected_pass.encode('utf-8'))
    return user_ok and pass_ok


@app.route("/login", methods=["GET", "POST"])
def login():
    """Render the login form (GET) or authenticate an administrator (POST).

    Credentials are checked against the two admin accounts configured through
    ``ADMIN1_*`` / ``ADMIN2_*`` environment variables. On success the session
    is marked as logged in, the login is audited and the user is redirected to
    the dashboard; otherwise the form is shown again with an error message.
    """
    if request.method != "POST":
        return render_template("login.html")

    username = request.form.get("username")
    password = request.form.get("password")

    # Check against both admin accounts from .env
    accounts = (
        (os.getenv("ADMIN1_USERNAME"), os.getenv("ADMIN1_PASSWORD")),
        (os.getenv("ADMIN2_USERNAME"), os.getenv("ADMIN2_PASSWORD")),
    )
    # Evaluate every account (no short-circuit) to keep timing uniform.
    matches = [_credentials_match(username, password, user, secret)
               for user, secret in accounts]
    if not any(matches):
        return render_template("login.html", error="Ungültige Anmeldedaten")

    session['logged_in'] = True
    session['username'] = username
    log_audit('LOGIN', 'user', additional_info="Erfolgreiche Anmeldung")
    return redirect(url_for('dashboard'))
@app.route("/logout")
def logout():
    """Audit the logout, clear the login markers and go to the login page."""
    # Audit before clearing the session so the entry still carries the username.
    log_audit('LOGOUT', 'user', additional_info="Abmeldung")
    session.pop('logged_in', None)
    session.pop('username', None)
    return redirect(url_for('login'))
@app.route("/")
@login_required
def dashboard():
    """Render the dashboard with customer, license, session and backup figures.

    All figures are read in one database connection, which is closed even if
    a query fails.
    """
    conn = get_connection()
    cur = conn.cursor()
    try:
        def fetch_count(sql):
            """Run a single-value query and return that value."""
            cur.execute(sql)
            return cur.fetchone()[0]

        # Total number of customers / licenses
        total_customers = fetch_count("SELECT COUNT(*) FROM customers")
        total_licenses = fetch_count("SELECT COUNT(*) FROM licenses")
        # Active licenses (not expired and is_active = true)
        active_licenses = fetch_count("""
SELECT COUNT(*) FROM licenses
WHERE valid_until >= CURRENT_DATE AND is_active = TRUE
""")
        # Active sessions
        active_sessions_count = fetch_count("SELECT COUNT(*) FROM sessions WHERE is_active = TRUE")
        # Expired licenses
        expired_licenses = fetch_count("""
SELECT COUNT(*) FROM licenses
WHERE valid_until < CURRENT_DATE
""")
        # Licenses expiring within the next 30 days
        expiring_soon = fetch_count("""
SELECT COUNT(*) FROM licenses
WHERE valid_until >= CURRENT_DATE
AND valid_until < CURRENT_DATE + INTERVAL '30 days'
AND is_active = TRUE
""")

        # Number of licenses per license type
        cur.execute("""
SELECT license_type, COUNT(*)
FROM licenses
GROUP BY license_type
""")
        license_types = dict(cur.fetchall())

        # The five most recently created licenses
        cur.execute("""
SELECT l.id, l.license_key, c.name, l.valid_until,
CASE
WHEN l.valid_until < CURRENT_DATE THEN 'abgelaufen'
WHEN l.valid_until < CURRENT_DATE + INTERVAL '30 days' THEN 'läuft bald ab'
ELSE 'aktiv'
END as status
FROM licenses l
JOIN customers c ON l.customer_id = c.id
ORDER BY l.id DESC
LIMIT 5
""")
        recent_licenses = cur.fetchall()

        # Licenses expiring soon (next 30 days)
        cur.execute("""
SELECT l.id, l.license_key, c.name, l.valid_until,
l.valid_until - CURRENT_DATE as days_left
FROM licenses l
JOIN customers c ON l.customer_id = c.id
WHERE l.valid_until >= CURRENT_DATE
AND l.valid_until < CURRENT_DATE + INTERVAL '30 days'
AND l.is_active = TRUE
ORDER BY l.valid_until
LIMIT 10
""")
        expiring_licenses = cur.fetchall()

        # Most recent backup (any status)
        cur.execute("""
SELECT created_at, filesize, duration_seconds, backup_type, status
FROM backup_history
ORDER BY created_at DESC
LIMIT 1
""")
        last_backup_info = cur.fetchone()
    finally:
        cur.close()
        conn.close()

    stats = {
        'total_customers': total_customers,
        'total_licenses': total_licenses,
        'active_licenses': active_licenses,
        'expired_licenses': expired_licenses,
        'expiring_soon': expiring_soon,
        'full_licenses': license_types.get('full', 0),
        'test_licenses': license_types.get('test', 0),
        'recent_licenses': recent_licenses,
        'expiring_licenses': expiring_licenses,
        'active_sessions': active_sessions_count,
        'last_backup': last_backup_info
    }
    return render_template("dashboard.html", stats=stats, username=session.get('username'))
@app.route("/create", methods=["GET", "POST"])
@login_required
def create_license():
    """Show the license form (GET) or create a customer plus license (POST).

    On POST a customer row and a license row are inserted in one transaction,
    the creation is audited and the browser is redirected back to the form.
    The connection is closed even if an insert fails; uncommitted work is
    discarded when it closes.
    """
    if request.method != "POST":
        return render_template("index.html", username=session.get('username'))

    name = request.form["customer_name"]
    email = request.form["email"]
    license_key = request.form["license_key"]
    license_type = request.form["license_type"]
    valid_from = request.form["valid_from"]
    valid_until = request.form["valid_until"]

    conn = get_connection()
    cur = conn.cursor()
    try:
        # Insert the customer.
        # NOTE(review): this always inserts a new row; the original comment
        # said "if not already present" — confirm whether existing customers
        # should be reused instead.
        cur.execute("""
INSERT INTO customers (name, email, created_at)
VALUES (%s, %s, NOW())
RETURNING id
""", (name, email))
        customer_id = cur.fetchone()[0]

        # Add the license
        cur.execute("""
INSERT INTO licenses (license_key, customer_id, license_type, valid_from, valid_until, is_active)
VALUES (%s, %s, %s, %s, %s, TRUE)
RETURNING id
""", (license_key, customer_id, license_type, valid_from, valid_until))
        license_id = cur.fetchone()[0]
        conn.commit()
    finally:
        cur.close()
        conn.close()

    # Audit log
    log_audit('CREATE', 'license', license_id,
              new_values={
                  'license_key': license_key,
                  'customer_name': name,
                  'customer_email': email,
                  'license_type': license_type,
                  'valid_from': valid_from,
                  'valid_until': valid_until
              })
    return redirect("/create")
@app.route("/licenses")
@login_required
def licenses():
    """List licenses with optional search, type/status filters and paging.

    Query parameters: ``search`` (substring of key, customer name or e-mail),
    ``type``, ``status`` (active / expiring / expired / inactive) and ``page``
    (1-based, 20 rows per page; values below 1 are treated as 1).
    """
    # Parameters
    search = request.args.get('search', '').strip()
    filter_type = request.args.get('type', '')
    filter_status = request.args.get('status', '')
    # Clamp the page: a negative OFFSET is rejected by PostgreSQL.
    page = max(request.args.get('page', 1, type=int), 1)
    per_page = 20

    # SQL query with optional search and filters
    query = """
SELECT l.id, l.license_key, c.name, c.email, l.license_type,
l.valid_from, l.valid_until, l.is_active,
CASE
WHEN l.valid_until < CURRENT_DATE THEN 'abgelaufen'
WHEN l.valid_until < CURRENT_DATE + INTERVAL '30 days' THEN 'läuft bald ab'
ELSE 'aktiv'
END as status
FROM licenses l
JOIN customers c ON l.customer_id = c.id
WHERE 1=1
"""
    params = []

    # Search filter
    if search:
        query += """
AND (LOWER(l.license_key) LIKE LOWER(%s)
OR LOWER(c.name) LIKE LOWER(%s)
OR LOWER(c.email) LIKE LOWER(%s))
"""
        search_param = f'%{search}%'
        params.extend([search_param, search_param, search_param])

    # Type filter
    if filter_type:
        query += " AND l.license_type = %s"
        params.append(filter_type)

    # Status filter (unknown values add no condition)
    if filter_status == 'active':
        query += " AND l.valid_until >= CURRENT_DATE AND l.is_active = TRUE"
    elif filter_status == 'expiring':
        query += " AND l.valid_until >= CURRENT_DATE AND l.valid_until < CURRENT_DATE + INTERVAL '30 days' AND l.is_active = TRUE"
    elif filter_status == 'expired':
        query += " AND l.valid_until < CURRENT_DATE"
    elif filter_status == 'inactive':
        query += " AND l.is_active = FALSE"

    conn = get_connection()
    cur = conn.cursor()
    try:
        # Total count for pagination (before LIMIT/OFFSET are appended)
        count_query = "SELECT COUNT(*) FROM (" + query + ") as count_table"
        cur.execute(count_query, params)
        total = cur.fetchone()[0]

        # Pagination
        offset = (page - 1) * per_page
        query += " ORDER BY l.valid_until DESC LIMIT %s OFFSET %s"
        params.extend([per_page, offset])
        cur.execute(query, params)
        license_rows = cur.fetchall()
    finally:
        cur.close()
        conn.close()

    # Ceiling division without floats
    total_pages = (total + per_page - 1) // per_page
    return render_template("licenses.html",
                           licenses=license_rows,
                           search=search,
                           filter_type=filter_type,
                           filter_status=filter_status,
                           page=page,
                           total_pages=total_pages,
                           total=total,
                           username=session.get('username'))
@app.route("/license/edit/<int:license_id>", methods=["GET", "POST"])
@login_required
def edit_license(license_id):
    """Show the edit form for a license (GET) or save changes to it (POST).

    Unknown license ids redirect to the license list for both methods. A
    successful POST is audited with the old and new values.
    """
    conn = get_connection()
    cur = conn.cursor()
    try:
        if request.method == "POST":
            # Fetch the old values for the audit log
            cur.execute("""
SELECT license_key, license_type, valid_from, valid_until, is_active
FROM licenses WHERE id = %s
""", (license_id,))
            old_license = cur.fetchone()
            if old_license is None:
                # Nothing to update; avoids indexing None below.
                return redirect("/licenses")

            license_key = request.form["license_key"]
            license_type = request.form["license_type"]
            valid_from = request.form["valid_from"]
            valid_until = request.form["valid_until"]
            # An unchecked checkbox is absent from the form data.
            is_active = request.form.get("is_active") == "on"

            cur.execute("""
UPDATE licenses
SET license_key = %s, license_type = %s, valid_from = %s,
valid_until = %s, is_active = %s
WHERE id = %s
""", (license_key, license_type, valid_from, valid_until, is_active, license_id))
            conn.commit()

            # Audit log (dates are stringified so they can be stored as JSON)
            log_audit('UPDATE', 'license', license_id,
                      old_values={
                          'license_key': old_license[0],
                          'license_type': old_license[1],
                          'valid_from': str(old_license[2]),
                          'valid_until': str(old_license[3]),
                          'is_active': old_license[4]
                      },
                      new_values={
                          'license_key': license_key,
                          'license_type': license_type,
                          'valid_from': valid_from,
                          'valid_until': valid_until,
                          'is_active': is_active
                      })
            return redirect("/licenses")

        # GET: load the license together with its customer
        cur.execute("""
SELECT l.id, l.license_key, c.name, c.email, l.license_type,
l.valid_from, l.valid_until, l.is_active, c.id
FROM licenses l
JOIN customers c ON l.customer_id = c.id
WHERE l.id = %s
""", (license_id,))
        license_row = cur.fetchone()
    finally:
        cur.close()
        conn.close()

    if not license_row:
        return redirect("/licenses")
    return render_template("edit_license.html", license=license_row, username=session.get('username'))
@app.route("/license/delete/<int:license_id>", methods=["POST"])
@login_required
def delete_license(license_id):
    """Delete a license and audit the deletion, then return to the list.

    The audit entry is written only if the license existed.
    """
    conn = get_connection()
    cur = conn.cursor()
    try:
        # Fetch the license details for the audit log before deleting
        cur.execute("""
SELECT l.license_key, c.name, l.license_type
FROM licenses l
JOIN customers c ON l.customer_id = c.id
WHERE l.id = %s
""", (license_id,))
        license_info = cur.fetchone()

        cur.execute("DELETE FROM licenses WHERE id = %s", (license_id,))
        conn.commit()
    finally:
        cur.close()
        conn.close()

    # Audit log
    if license_info:
        log_audit('DELETE', 'license', license_id,
                  old_values={
                      'license_key': license_info[0],
                      'customer_name': license_info[1],
                      'license_type': license_info[2]
                  })
    return redirect("/licenses")
@app.route("/customers")
@login_required
def customers():
    """List customers with license counts, optional search and paging.

    Query parameters: ``search`` (substring of name or e-mail) and ``page``
    (1-based, 20 rows per page; values below 1 are treated as 1).
    """
    # Parameters
    search = request.args.get('search', '').strip()
    # Clamp the page: a negative OFFSET is rejected by PostgreSQL.
    page = max(request.args.get('page', 1, type=int), 1)
    per_page = 20

    # SQL query with optional search
    base_query = """
SELECT c.id, c.name, c.email, c.created_at,
COUNT(l.id) as license_count,
COUNT(CASE WHEN l.is_active = TRUE AND l.valid_until >= CURRENT_DATE THEN 1 END) as active_licenses
FROM customers c
LEFT JOIN licenses l ON c.id = l.customer_id
"""
    params = []
    if search:
        base_query += """
WHERE LOWER(c.name) LIKE LOWER(%s)
OR LOWER(c.email) LIKE LOWER(%s)
"""
        search_param = f'%{search}%'
        params.extend([search_param, search_param])

    # Total count for pagination; uses the same WHERE clause as the list.
    count_query = f"""
SELECT COUNT(DISTINCT c.id)
FROM customers c
LEFT JOIN licenses l ON c.id = l.customer_id
{"WHERE LOWER(c.name) LIKE LOWER(%s) OR LOWER(c.email) LIKE LOWER(%s)" if search else ""}
"""

    conn = get_connection()
    cur = conn.cursor()
    try:
        # params is empty when no search is active, matching the query.
        cur.execute(count_query, params)
        total = cur.fetchone()[0]

        # Pagination
        offset = (page - 1) * per_page
        query = base_query + """
GROUP BY c.id, c.name, c.email, c.created_at
ORDER BY c.created_at DESC
LIMIT %s OFFSET %s
"""
        params.extend([per_page, offset])
        cur.execute(query, params)
        customer_rows = cur.fetchall()
    finally:
        cur.close()
        conn.close()

    # Ceiling division without floats
    total_pages = (total + per_page - 1) // per_page
    return render_template("customers.html",
                           customers=customer_rows,
                           search=search,
                           page=page,
                           total_pages=total_pages,
                           total=total,
                           username=session.get('username'))
@app.route("/customer/edit/<int:customer_id>", methods=["GET", "POST"])
@login_required
def edit_customer(customer_id):
    """Show a customer with its licenses (GET) or save name/e-mail (POST).

    Unknown customer ids redirect to the customer list for both methods. A
    successful POST is audited with the old and new values.
    """
    conn = get_connection()
    cur = conn.cursor()
    try:
        if request.method == "POST":
            # Fetch the old values for the audit log
            cur.execute("SELECT name, email FROM customers WHERE id = %s", (customer_id,))
            old_customer = cur.fetchone()
            if old_customer is None:
                # Nothing to update; avoids indexing None below.
                return redirect("/customers")

            name = request.form["name"]
            email = request.form["email"]
            cur.execute("""
UPDATE customers
SET name = %s, email = %s
WHERE id = %s
""", (name, email, customer_id))
            conn.commit()

            # Audit log
            log_audit('UPDATE', 'customer', customer_id,
                      old_values={
                          'name': old_customer[0],
                          'email': old_customer[1]
                      },
                      new_values={
                          'name': name,
                          'email': email
                      })
            return redirect("/customers")

        # GET: load the customer ...
        cur.execute("""
SELECT id, name, email, created_at
FROM customers
WHERE id = %s
""", (customer_id,))
        customer = cur.fetchone()

        # ... and the customer's licenses
        cur.execute("""
SELECT id, license_key, license_type, valid_from, valid_until, is_active
FROM licenses
WHERE customer_id = %s
ORDER BY valid_until DESC
""", (customer_id,))
        license_rows = cur.fetchall()
    finally:
        cur.close()
        conn.close()

    if not customer:
        return redirect("/customers")
    return render_template("edit_customer.html", customer=customer, licenses=license_rows, username=session.get('username'))
@app.route("/customer/delete/<int:customer_id>", methods=["POST"])
@login_required
def delete_customer(customer_id):
    """Delete a customer that has no licenses, then return to the list.

    Customers that still own licenses are left untouched. The deletion is
    audited only if the customer existed.
    """
    conn = get_connection()
    cur = conn.cursor()
    try:
        # Check whether the customer still has licenses
        cur.execute("SELECT COUNT(*) FROM licenses WHERE customer_id = %s", (customer_id,))
        license_count = cur.fetchone()[0]
        if license_count > 0:
            # Customer has licenses - do not delete
            return redirect("/customers")

        # Fetch the customer details for the audit log
        cur.execute("SELECT name, email FROM customers WHERE id = %s", (customer_id,))
        customer_info = cur.fetchone()

        cur.execute("DELETE FROM customers WHERE id = %s", (customer_id,))
        conn.commit()
    finally:
        cur.close()
        conn.close()

    # Audit log
    if customer_info:
        log_audit('DELETE', 'customer', customer_id,
                  old_values={
                      'name': customer_info[0],
                      'email': customer_info[1]
                  })
    return redirect("/customers")
@app.route("/sessions")
@login_required
def sessions():
    """Show active sessions and sessions that ended in the last 24 hours."""
    conn = get_connection()
    cur = conn.cursor()
    try:
        # Active sessions, including minutes since the last heartbeat
        cur.execute("""
SELECT s.id, s.session_id, l.license_key, c.name, s.ip_address,
s.user_agent, s.started_at, s.last_heartbeat,
EXTRACT(EPOCH FROM (NOW() - s.last_heartbeat))/60 as minutes_inactive
FROM sessions s
JOIN licenses l ON s.license_id = l.id
JOIN customers c ON l.customer_id = c.id
WHERE s.is_active = TRUE
ORDER BY s.last_heartbeat DESC
""")
        active_sessions = cur.fetchall()

        # Inactive sessions of the last 24 hours (at most 50)
        cur.execute("""
SELECT s.id, s.session_id, l.license_key, c.name, s.ip_address,
s.started_at, s.ended_at,
EXTRACT(EPOCH FROM (s.ended_at - s.started_at))/60 as duration_minutes
FROM sessions s
JOIN licenses l ON s.license_id = l.id
JOIN customers c ON l.customer_id = c.id
WHERE s.is_active = FALSE
AND s.ended_at > NOW() - INTERVAL '24 hours'
ORDER BY s.ended_at DESC
LIMIT 50
""")
        recent_sessions = cur.fetchall()
    finally:
        cur.close()
        conn.close()

    return render_template("sessions.html",
                           active_sessions=active_sessions,
                           recent_sessions=recent_sessions,
                           username=session.get('username'))
@app.route("/session/end/<int:session_id>", methods=["POST"])
@login_required
def end_session(session_id):
    """Mark an active session as ended and return to the session list.

    Sessions that are already inactive (or do not exist) are left unchanged.
    """
    conn = get_connection()
    cur = conn.cursor()
    try:
        # End the session
        cur.execute("""
UPDATE sessions
SET is_active = FALSE, ended_at = NOW()
WHERE id = %s AND is_active = TRUE
""", (session_id,))
        conn.commit()
    finally:
        cur.close()
        conn.close()
    return redirect("/sessions")
def _send_dataframe_export(df, sheet_name, filename, export_format):
    """Return *df* as a file download: CSV for 'csv', Excel otherwise.

    Args:
        df: DataFrame to export.
        sheet_name: Worksheet name used for the Excel variant.
        filename: Download name without extension.
        export_format: 'csv' selects CSV; any other value produces .xlsx.
    """
    if export_format == 'csv':
        # Semicolon-separated text; the BOM is added once when encoding to
        # bytes (an encoding argument has no effect on a text buffer).
        output = io.StringIO()
        df.to_csv(output, index=False, sep=';')
        return send_file(
            io.BytesIO(output.getvalue().encode('utf-8-sig')),
            mimetype='text/csv',
            as_attachment=True,
            download_name=f'{filename}.csv'
        )

    # Excel export
    output = io.BytesIO()
    with pd.ExcelWriter(output, engine='openpyxl') as writer:
        df.to_excel(writer, sheet_name=sheet_name, index=False)
        # Size every column to its longest cell text, capped at 50.
        worksheet = writer.sheets[sheet_name]
        for column in worksheet.columns:
            max_length = max((len(str(cell.value)) for cell in column), default=0)
            column_letter = column[0].column_letter
            worksheet.column_dimensions[column_letter].width = min(max_length + 2, 50)
    # The workbook is written when the writer closes; rewind for sending.
    output.seek(0)
    return send_file(
        output,
        mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        as_attachment=True,
        download_name=f'{filename}.xlsx'
    )


@app.route("/export/licenses")
@login_required
def export_licenses():
    """Download all licenses with customer data as Excel (default) or CSV.

    The ``format`` query parameter selects the output ('csv' or anything
    else for Excel). Every export is audited.
    """
    conn = get_connection()
    cur = conn.cursor()
    try:
        # Fetch all licenses with customer information
        cur.execute("""
SELECT l.id, l.license_key, c.name as customer_name, c.email as customer_email,
l.license_type, l.valid_from, l.valid_until, l.is_active,
CASE
WHEN l.valid_until < CURRENT_DATE THEN 'Abgelaufen'
WHEN l.valid_until < CURRENT_DATE + INTERVAL '30 days' THEN 'Läuft bald ab'
ELSE 'Aktiv'
END as status
FROM licenses l
JOIN customers c ON l.customer_id = c.id
ORDER BY l.id
""")
        data = cur.fetchall()
    finally:
        cur.close()
        conn.close()

    # Column headers of the export
    columns = ['ID', 'Lizenzschlüssel', 'Kunde', 'E-Mail', 'Typ',
               'Gültig von', 'Gültig bis', 'Aktiv', 'Status']
    df = pd.DataFrame(data, columns=columns)

    # Date formatting
    df['Gültig von'] = pd.to_datetime(df['Gültig von']).dt.strftime('%d.%m.%Y')
    df['Gültig bis'] = pd.to_datetime(df['Gültig bis']).dt.strftime('%d.%m.%Y')

    # Replace type codes and booleans with display labels
    df['Typ'] = df['Typ'].replace({'full': 'Vollversion', 'test': 'Testversion'})
    df['Aktiv'] = df['Aktiv'].replace({True: 'Ja', False: 'Nein'})

    export_format = request.args.get('format', 'excel')

    # Audit log
    log_audit('EXPORT', 'license',
              additional_info=f"Export aller Lizenzen als {export_format.upper()}")

    filename = f'lizenzen_export_{datetime.now().strftime("%Y%m%d_%H%M%S")}'
    return _send_dataframe_export(df, 'Lizenzen', filename, export_format)


@app.route("/export/customers")
@login_required
def export_customers():
    """Download all customers with license counts as Excel (default) or CSV.

    The ``format`` query parameter selects the output ('csv' or anything
    else for Excel). Every export is audited.
    """
    conn = get_connection()
    cur = conn.cursor()
    try:
        # All customers with license statistics
        cur.execute("""
SELECT c.id, c.name, c.email, c.created_at,
COUNT(l.id) as total_licenses,
COUNT(CASE WHEN l.is_active = TRUE AND l.valid_until >= CURRENT_DATE THEN 1 END) as active_licenses,
COUNT(CASE WHEN l.valid_until < CURRENT_DATE THEN 1 END) as expired_licenses
FROM customers c
LEFT JOIN licenses l ON c.id = l.customer_id
GROUP BY c.id, c.name, c.email, c.created_at
ORDER BY c.id
""")
        data = cur.fetchall()
    finally:
        cur.close()
        conn.close()

    # Column headers of the export
    columns = ['ID', 'Name', 'E-Mail', 'Erstellt am',
               'Lizenzen gesamt', 'Aktive Lizenzen', 'Abgelaufene Lizenzen']
    df = pd.DataFrame(data, columns=columns)

    # Date formatting
    df['Erstellt am'] = pd.to_datetime(df['Erstellt am']).dt.strftime('%d.%m.%Y %H:%M')

    export_format = request.args.get('format', 'excel')

    # Audit log
    log_audit('EXPORT', 'customer',
              additional_info=f"Export aller Kunden als {export_format.upper()}")

    filename = f'kunden_export_{datetime.now().strftime("%Y%m%d_%H%M%S")}'
    return _send_dataframe_export(df, 'Kunden', filename, export_format)
@app.route("/audit")
@login_required
def audit_log():
    """Show the audit log, newest first, with optional filters and paging.

    Query parameters: ``user`` (substring, case-insensitive), ``action`` and
    ``entity`` (exact match) and ``page`` (1-based, 50 rows per page; values
    below 1 are treated as 1).
    """
    # Parameters
    filter_user = request.args.get('user', '').strip()
    filter_action = request.args.get('action', '').strip()
    filter_entity = request.args.get('entity', '').strip()
    # Clamp the page: a negative OFFSET is rejected by PostgreSQL.
    page = max(request.args.get('page', 1, type=int), 1)
    per_page = 50

    # SQL query with optional filters
    query = """
SELECT id, timestamp, username, action, entity_type, entity_id,
old_values, new_values, ip_address, user_agent, additional_info
FROM audit_log
WHERE 1=1
"""
    params = []
    if filter_user:
        query += " AND LOWER(username) LIKE LOWER(%s)"
        params.append(f'%{filter_user}%')
    if filter_action:
        query += " AND action = %s"
        params.append(filter_action)
    if filter_entity:
        query += " AND entity_type = %s"
        params.append(filter_entity)

    conn = get_connection()
    cur = conn.cursor()
    try:
        # Total count for pagination (before LIMIT/OFFSET are appended)
        count_query = "SELECT COUNT(*) FROM (" + query + ") as count_table"
        cur.execute(count_query, params)
        total = cur.fetchone()[0]

        # Pagination
        offset = (page - 1) * per_page
        query += " ORDER BY timestamp DESC LIMIT %s OFFSET %s"
        params.extend([per_page, offset])
        cur.execute(query, params)
        logs = cur.fetchall()
    finally:
        cur.close()
        conn.close()

    # Rows are passed to the template as lists; per the original comment,
    # old_values/new_values already arrive as dictionaries and need no
    # conversion.
    parsed_logs = [list(log) for log in logs]

    # Ceiling division without floats
    total_pages = (total + per_page - 1) // per_page
    return render_template("audit_log.html",
                           logs=parsed_logs,
                           filter_user=filter_user,
                           filter_action=filter_action,
                           filter_entity=filter_entity,
                           page=page,
                           total_pages=total_pages,
                           total=total,
                           username=session.get('username'))
@app.route("/backups")
@login_required
def backups():
    """Show the backup history together with the last successful backup."""
    conn = get_connection()
    cur = conn.cursor()
    try:
        # Most recent successful backup
        cur.execute("""
SELECT created_at, filesize, duration_seconds
FROM backup_history
WHERE status = 'success'
ORDER BY created_at DESC
LIMIT 1
""")
        last_backup = cur.fetchone()

        # All backups, newest first
        cur.execute("""
SELECT id, filename, filesize, backup_type, status, error_message,
created_at, created_by, tables_count, records_count,
duration_seconds, is_encrypted
FROM backup_history
ORDER BY created_at DESC
""")
        backup_rows = cur.fetchall()
    finally:
        cur.close()
        conn.close()

    return render_template("backups.html",
                           backups=backup_rows,
                           last_backup=last_backup,
                           username=session.get('username'))
@app.route("/backup/create", methods=["POST"])
@login_required
def create_backup_route():
    """Trigger a manual backup for the logged-in user and report it as JSON.

    Responds with ``success``/``message``; failures use HTTP status 500.
    """
    ok, detail = create_backup(backup_type="manual", created_by=session.get('username'))
    if not ok:
        failure = {
            'success': False,
            'message': f'Backup fehlgeschlagen: {detail}'
        }
        return jsonify(failure), 500
    return jsonify({
        'success': True,
        'message': f'Backup erfolgreich erstellt: {detail}'
    })
@app.route("/backup/restore/<int:backup_id>", methods=["POST"])
@login_required
def restore_backup_route(backup_id):
    """Restore the given backup and report the outcome as JSON.

    An optional ``encryption_key`` form field is passed on to
    ``restore_backup``. Failures are answered with HTTP status 500.
    """
    ok, message = restore_backup(backup_id, request.form.get('encryption_key'))
    status_code = 200 if ok else 500
    return jsonify({'success': bool(ok), 'message': message}), status_code
@app.route("/backup/download/<int:backup_id>")
@login_required
def download_backup(backup_id):
    """Send a stored backup file as a download and audit the access.

    Returns a 404 text response when the history entry or the file itself is
    missing.
    """
    conn = get_connection()
    cur = conn.cursor()
    try:
        cur.execute("""
SELECT filename, filepath
FROM backup_history
WHERE id = %s
""", (backup_id,))
        backup_info = cur.fetchone()
    finally:
        cur.close()
        conn.close()

    if not backup_info:
        return "Backup nicht gefunden", 404

    filename, filepath = backup_info
    filepath = Path(filepath)
    if not filepath.exists():
        return "Backup-Datei nicht gefunden", 404

    # Audit log
    log_audit('DOWNLOAD', 'backup', backup_id,
              additional_info=f"Backup heruntergeladen: {filename}")
    return send_file(filepath, as_attachment=True, download_name=filename)
if __name__ == "__main__":
    # Direct start: listen on all interfaces on port 443 with an ad-hoc
    # (self-signed) TLS certificate.
    app.run(ssl_context='adhoc', port=443, host="0.0.0.0")