# Dateien
# Hetzner-Backup/v2_adminpanel/app.py
# 2025-06-08 22:37:03 +02:00
#
# 2436 Zeilen
# 82 KiB
# Python

import os
import psycopg2
from psycopg2.extras import Json
from flask import Flask, render_template, request, redirect, session, url_for, send_file, jsonify, flash
from flask_session import Session
from functools import wraps
from dotenv import load_dotenv
import pandas as pd
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
import io
import subprocess
import gzip
from cryptography.fernet import Fernet
from pathlib import Path
import time
from apscheduler.schedulers.background import BackgroundScheduler
import logging
import random
import hashlib
import requests
import secrets
import string
import re
# Read variables from a .env file into the process environment before any
# configuration below is evaluated.
load_dotenv()

app = Flask(__name__)

# All Flask / Flask-Session settings in one place.
# NOTE(review): SECRET_KEY is freshly generated with os.urandom on every
# import, so it differs between process starts — confirm this is intended.
app.config.update(
    SECRET_KEY=os.urandom(24),
    SESSION_TYPE='filesystem',
    # Emit JSON as UTF-8 instead of ASCII escapes.
    JSON_AS_ASCII=False,
    JSONIFY_MIMETYPE='application/json; charset=utf-8',
    # Session timeout of 5 minutes.
    PERMANENT_SESSION_LIFETIME=timedelta(minutes=5),
    SESSION_COOKIE_HTTPONLY=True,
    # Meant to be switched to True under HTTPS (the app itself runs on HTTP).
    SESSION_COOKIE_SECURE=False,
    SESSION_COOKIE_SAMESITE='Lax',
    SESSION_COOKIE_NAME='admin_session',
    # IMPORTANT: the session cookie itself should also expire after 5 minutes,
    # so it is not refreshed on every request.
    SESSION_REFRESH_EACH_REQUEST=False,
)

Session(app)
# Backup configuration: directory that receives the encrypted database dumps.
BACKUP_DIR = Path("/app/backups")
# Created at import time; parent directories are not created (no parents=True).
BACKUP_DIR.mkdir(exist_ok=True)
# Rate-limiting configuration
# One of these messages is chosen at random for every failed login attempt.
FAIL_MESSAGES = [
    "NOPE!",
    "ACCESS DENIED, TRY HARDER",
    "WRONG! 🚫",
    "COMPUTER SAYS NO",
    "YOU FAILED"
]
# Number of failed attempts per IP at which the IP gets blocked.
MAX_LOGIN_ATTEMPTS = 5
# Length of an IP block in hours.
BLOCK_DURATION_HOURS = 24
# Number of failed attempts after which the login form asks for a CAPTCHA
# (only if a reCAPTCHA site key is configured).
CAPTCHA_AFTER_ATTEMPTS = 2
# Scheduler for automatic backups; it is started immediately at import time.
scheduler = BackgroundScheduler()
scheduler.start()
# Configure logging (root logger at INFO level).
logging.basicConfig(level=logging.INFO)
# Login decorator
def login_required(f):
    """Restrict a view to logged-in sessions that have not timed out.

    Requests without ``logged_in`` in the session are redirected to the login
    page. If the session carries a ``last_activity`` timestamp older than five
    minutes, the session is cleared, a warning is flashed and the user is
    redirected to the login page. The timestamp itself is NOT refreshed here;
    that is left to the heartbeat endpoint.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'logged_in' not in session:
            return redirect(url_for('login'))

        # Check whether the session has expired.
        if 'last_activity' in session:
            last_activity = datetime.fromisoformat(session['last_activity'])
            # Timestamps are stored as naive Europe/Berlin wall-clock time.
            now = datetime.now(ZoneInfo("Europe/Berlin")).replace(tzinfo=None)
            time_since_activity = now - last_activity

            # Debug logging
            app.logger.info(f"Session check for {session.get('username', 'unknown')}: "
                            f"Last activity: {last_activity}, "
                            f"Time since: {time_since_activity.total_seconds()} seconds")

            if time_since_activity > timedelta(minutes=5):
                # Session expired - log the user out.
                username = session.get('username', 'unbekannt')
                app.logger.info(f"Session timeout for user {username} - auto logout")

                # Write the audit entry BEFORE session.clear(), because
                # log_audit reads the username from the session.
                try:
                    # The details are wrapped in Json(): psycopg2 has no
                    # default adapter for plain dicts, so passing the dict
                    # directly would make the INSERT fail.
                    log_audit('AUTO_LOGOUT', 'session',
                              additional_info=Json({'reason': 'Session timeout (5 minutes)',
                                                    'username': username}))
                except Exception:
                    # Auditing is best effort and must not block the logout,
                    # but the failure should be visible in the log.
                    app.logger.exception("Audit log for automatic logout failed")

                session.clear()
                flash('Ihre Sitzung ist abgelaufen. Bitte melden Sie sich erneut an.', 'warning')
                return redirect(url_for('login'))

        # Activity is deliberately NOT updated automatically here; only
        # explicit user actions (via the heartbeat endpoint) refresh it.
        return f(*args, **kwargs)
    return decorated_function
# Database connection with UTF-8 encoding
def get_connection():
    """Open and return a new psycopg2 connection using UTF-8 client encoding.

    Host and port fall back to "postgres" and "5432"; database name, user and
    password are read from the environment without a default.
    """
    connect_args = {
        "host": os.getenv("POSTGRES_HOST", "postgres"),
        "port": os.getenv("POSTGRES_PORT", "5432"),
        "dbname": os.getenv("POSTGRES_DB"),
        "user": os.getenv("POSTGRES_USER"),
        "password": os.getenv("POSTGRES_PASSWORD"),
        "options": '-c client_encoding=UTF8',
    }
    connection = psycopg2.connect(**connect_args)
    # Set the encoding on the connection object as well.
    connection.set_client_encoding('UTF8')
    return connection
# Audit log function
def log_audit(action, entity_type, entity_id=None, old_values=None, new_values=None, additional_info=None):
    """Write one row to the ``audit_log`` table.

    Args:
        action: Action name stored in the ``action`` column.
        entity_type: Kind of entity the action refers to.
        entity_id: Optional id of the affected entity.
        old_values: Optional dict of previous values (stored via Json).
        new_values: Optional dict of new values (stored via Json).
        additional_info: Optional free-form details; a string or a dict.

    Database errors during the insert are logged and swallowed so that
    auditing never breaks the calling operation. A failure to open the
    connection still propagates to the caller.
    """
    # Imported locally so the block only relies on Flask, which the file
    # already depends on.
    from flask import has_request_context

    # `session` and `request` are only usable inside a request. Callers such
    # as the backup scheduler run outside of one, so fall back to "system".
    if has_request_context():
        username = session.get('username', 'system')
        ip_address = request.remote_addr
        user_agent = request.headers.get('User-Agent')
    else:
        username = 'system'
        ip_address = None
        user_agent = None

    # Convert dictionaries to JSON parameters.
    old_json = Json(old_values) if old_values else None
    new_json = Json(new_values) if new_values else None
    # psycopg2 cannot adapt a plain dict; some callers pass one here.
    if isinstance(additional_info, dict):
        additional_info = Json(additional_info)

    conn = get_connection()
    cur = conn.cursor()
    try:
        cur.execute("""
            INSERT INTO audit_log
            (username, action, entity_type, entity_id, old_values, new_values,
             ip_address, user_agent, additional_info)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        """, (username, action, entity_type, entity_id, old_json, new_json,
              ip_address, user_agent, additional_info))
        conn.commit()
    except Exception as e:
        logging.error(f"Audit log error: {e}")
        conn.rollback()
    finally:
        cur.close()
        conn.close()
# Encryption functions
def get_or_create_encryption_key():
    """Return the Fernet key used for backups, creating one if necessary.

    Lookup order:
      1. ``BACKUP_ENCRYPTION_KEY`` from the environment, if it is a valid
         Fernet key.
      2. The key file ``.backup_key`` inside ``BACKUP_DIR``.
      3. A newly generated key, which is written to that key file.

    Returns:
        The key as bytes.
    """
    key_file = BACKUP_DIR / ".backup_key"

    # Try the environment variable first.
    env_key = os.getenv("BACKUP_ENCRYPTION_KEY")
    if env_key:
        candidate = env_key.encode()
        try:
            # Constructing a Fernet instance validates the key format.
            Fernet(candidate)
        except (TypeError, ValueError):
            # Fall through to the key file, but make the misconfiguration
            # visible instead of ignoring it silently.
            logging.warning("BACKUP_ENCRYPTION_KEY is set but is not a valid Fernet key - ignoring it")
        else:
            return candidate

    # No valid key in the environment: use the key file if present.
    if key_file.exists():
        return key_file.read_bytes()

    # Create a new key. The file is created with owner-only permissions
    # because it protects every backup written with it.
    key = Fernet.generate_key()
    fd = os.open(key_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "wb") as handle:
        handle.write(key)
    logging.info("Neuer Backup-Verschlüsselungsschlüssel erstellt")
    return key
# Backup functions
def create_backup(backup_type="manual", created_by=None):
    """Create a compressed, encrypted dump of the database.

    A ``backup_history`` row is inserted with status ``in_progress`` and later
    updated to ``success`` (with size, table/record counts and duration) or
    ``failed`` (with the error message).

    Args:
        backup_type: Label stored in ``backup_history.backup_type``.
        created_by: Name stored as creator; defaults to ``'system'``.

    Returns:
        ``(True, filename)`` on success, ``(False, error_message)`` on failure.

    Raises:
        Whatever the initial connection or history INSERT raises; errors after
        that point are reported through the return value.
    """
    start_time = time.time()
    timestamp = datetime.now(ZoneInfo("Europe/Berlin")).strftime("%Y%m%d_%H%M%S")
    filename = f"backup_v2docker_{timestamp}_encrypted.sql.gz.enc"
    filepath = BACKUP_DIR / filename

    conn = get_connection()
    cur = conn.cursor()
    # The outer try/finally guarantees the connection is released even if the
    # initial history INSERT fails.
    try:
        # Create the backup history entry.
        cur.execute("""
            INSERT INTO backup_history
            (filename, filepath, backup_type, status, created_by, is_encrypted)
            VALUES (%s, %s, %s, %s, %s, %s)
            RETURNING id
        """, (filename, str(filepath), backup_type, 'in_progress',
              created_by or 'system', True))
        backup_id = cur.fetchone()[0]
        conn.commit()

        try:
            # Build the PostgreSQL dump command.
            dump_command = [
                'pg_dump',
                '-h', os.getenv("POSTGRES_HOST", "postgres"),
                '-p', os.getenv("POSTGRES_PORT", "5432"),
                '-U', os.getenv("POSTGRES_USER"),
                '-d', os.getenv("POSTGRES_DB"),
                '--no-password',
                '--verbose'
            ]

            # Pass the password through the environment. Environment values
            # must be strings, so it is only set when configured.
            env = os.environ.copy()
            db_password = os.getenv("POSTGRES_PASSWORD")
            if db_password is not None:
                env['PGPASSWORD'] = db_password

            # Capture raw bytes: decoding and re-encoding the dump through the
            # process locale could alter it (encoding, newline translation).
            result = subprocess.run(dump_command, capture_output=True, env=env)
            if result.returncode != 0:
                stderr_text = result.stderr.decode('utf-8', errors='replace')
                raise Exception(f"pg_dump failed: {stderr_text}")
            dump_data = result.stdout

            # Compress
            compressed_data = gzip.compress(dump_data)

            # Encrypt
            key = get_or_create_encryption_key()
            f = Fernet(key)
            encrypted_data = f.encrypt(compressed_data)

            # Store
            filepath.write_bytes(encrypted_data)

            # Collect statistics
            cur.execute("SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'public'")
            tables_count = cur.fetchone()[0]
            cur.execute("""
                SELECT SUM(n_live_tup)
                FROM pg_stat_user_tables
            """)
            records_count = cur.fetchone()[0] or 0

            duration = time.time() - start_time
            filesize = filepath.stat().st_size

            # Update the history entry.
            cur.execute("""
                UPDATE backup_history
                SET status = %s, filesize = %s, tables_count = %s,
                    records_count = %s, duration_seconds = %s
                WHERE id = %s
            """, ('success', filesize, tables_count, records_count, duration, backup_id))
            conn.commit()

            # Audit log
            log_audit('BACKUP', 'database', backup_id,
                      additional_info=f"Backup erstellt: {filename} ({filesize} bytes)")

            # E-mail notification (if configured)
            send_backup_notification(True, filename, filesize, duration)

            logging.info(f"Backup erfolgreich erstellt: {filename}")
            return True, filename
        except Exception as e:
            # Record the failure. If the error came from a SQL statement the
            # transaction is aborted, so roll back first; otherwise the UPDATE
            # itself would fail and mask the original error.
            try:
                conn.rollback()
                cur.execute("""
                    UPDATE backup_history
                    SET status = %s, error_message = %s, duration_seconds = %s
                    WHERE id = %s
                """, ('failed', str(e), time.time() - start_time, backup_id))
                conn.commit()
            except Exception:
                logging.exception("Could not record backup failure in backup_history")

            logging.error(f"Backup fehlgeschlagen: {e}")
            send_backup_notification(False, filename, error=str(e))
            return False, str(e)
    finally:
        cur.close()
        conn.close()
def restore_backup(backup_id, encryption_key=None):
    """Restore the database from a stored backup.

    Args:
        backup_id: Id of the ``backup_history`` row to restore.
        encryption_key: Optional Fernet key as a string; if omitted, the key
            from ``get_or_create_encryption_key()`` is used.

    Returns:
        ``(True, message)`` on success, ``(False, error_message)`` on failure.

    Raises:
        Whatever opening the database connection raises; later errors are
        reported through the return value.
    """
    conn = get_connection()
    cur = conn.cursor()
    try:
        # Look up the backup and release the connection right away, so it is
        # closed on every path and before the restore starts.
        try:
            cur.execute("""
                SELECT filename, filepath, is_encrypted
                FROM backup_history
                WHERE id = %s
            """, (backup_id,))
            backup_info = cur.fetchone()
        finally:
            cur.close()
            conn.close()

        if not backup_info:
            raise Exception("Backup nicht gefunden")

        filename, filepath, is_encrypted = backup_info
        filepath = Path(filepath)
        if not filepath.exists():
            raise Exception("Backup-Datei nicht gefunden")

        # Read the file.
        encrypted_data = filepath.read_bytes()

        # Decrypt
        if is_encrypted:
            key = encryption_key.encode() if encryption_key else get_or_create_encryption_key()
            try:
                compressed_data = Fernet(key).decrypt(encrypted_data)
            except Exception as exc:
                # Covers a malformed key as well as a wrong key / corrupt file.
                raise Exception("Entschlüsselung fehlgeschlagen. Falsches Passwort?") from exc
        else:
            compressed_data = encrypted_data

        # Decompress
        dump_data = gzip.decompress(compressed_data)

        # Restore the database.
        # NOTE(review): psql is called without ON_ERROR_STOP, so presumably
        # SQL errors inside the script do not yield a non-zero exit code -
        # confirm whether that is intended.
        restore_command = [
            'psql',
            '-h', os.getenv("POSTGRES_HOST", "postgres"),
            '-p', os.getenv("POSTGRES_PORT", "5432"),
            '-U', os.getenv("POSTGRES_USER"),
            '-d', os.getenv("POSTGRES_DB"),
            '--no-password'
        ]
        env = os.environ.copy()
        db_password = os.getenv("POSTGRES_PASSWORD")
        if db_password is not None:
            env['PGPASSWORD'] = db_password

        # Feed the dump as raw bytes so it reaches psql exactly as stored,
        # independent of the process locale.
        result = subprocess.run(restore_command, input=dump_data,
                                capture_output=True, env=env)
        if result.returncode != 0:
            stderr_text = result.stderr.decode('utf-8', errors='replace')
            raise Exception(f"Wiederherstellung fehlgeschlagen: {stderr_text}")

        # Audit log (uses its own, new connection)
        log_audit('RESTORE', 'database', backup_id,
                  additional_info=f"Backup wiederhergestellt: {filename}")

        return True, "Backup erfolgreich wiederhergestellt"
    except Exception as e:
        logging.error(f"Wiederherstellung fehlgeschlagen: {e}")
        return False, str(e)
def send_backup_notification(success, filename, filesize=None, duration=None, error=None):
    """Announce the outcome of a backup by e-mail (if configured).

    Nothing happens unless the environment variable EMAIL_ENABLED equals
    "true" (case-insensitive). Actual sending is not implemented yet; for now
    only a log line is written.
    """
    email_enabled = os.getenv("EMAIL_ENABLED", "false").lower() == "true"
    if email_enabled:
        # E-mail function is prepared but deactivated.
        # TODO: implement once an e-mail server is configured
        outcome = 'erfolgreich' if success else 'fehlgeschlagen'
        logging.info(f"E-Mail-Benachrichtigung vorbereitet: Backup {outcome}")
# Scheduled backup job
def scheduled_backup():
    """Job body for the scheduler: announce and run a backup of type "scheduled"."""
    logging.info("Starte geplantes Backup...")
    # The result tuple of create_backup is intentionally not evaluated here.
    create_backup(created_by="scheduler", backup_type="scheduled")
# Register the backup job: every day at 3:00.
# NOTE(review): no timezone is passed, so presumably the scheduler's default
# timezone applies - confirm it matches the Europe/Berlin time used elsewhere.
scheduler.add_job(
    func=scheduled_backup,
    trigger='cron',
    id='daily_backup',
    replace_existing=True,
    hour=3,
    minute=0,
)
# Rate-limiting functions
def get_client_ip():
    """Determine the client's IP address for the current request.

    Preference order: first entry of ``X-Forwarded-For``, then ``X-Real-IP``,
    then the socket peer address (``REMOTE_ADDR``). Header values are stripped
    of surrounding whitespace, and an empty first ``X-Forwarded-For`` entry is
    skipped instead of being returned as the address.

    NOTE(review): both headers are client-controllable unless a trusted
    reverse proxy overwrites them - confirm the deployment guarantees that,
    since the IP-based login blocking relies on this value.
    """
    forwarded_for = request.environ.get('HTTP_X_FORWARDED_FOR')
    if forwarded_for:
        first_entry = forwarded_for.split(',')[0].strip()
        if first_entry:
            return first_entry

    real_ip = request.environ.get('HTTP_X_REAL_IP')
    if real_ip and real_ip.strip():
        return real_ip.strip()

    return request.environ.get('REMOTE_ADDR')
def check_ip_blocked(ip_address):
    """Check whether an IP address is currently blocked.

    Args:
        ip_address: Address to look up in ``login_attempts``.

    Returns:
        ``(True, blocked_until)`` if a block exists that ends in the future
        (compared against naive Europe/Berlin time), otherwise
        ``(False, None)``.
    """
    conn = get_connection()
    # try/finally so cursor and connection are released even if the query
    # raises.
    try:
        cur = conn.cursor()
        try:
            cur.execute("""
                SELECT blocked_until FROM login_attempts
                WHERE ip_address = %s AND blocked_until IS NOT NULL
            """, (ip_address,))
            result = cur.fetchone()
        finally:
            cur.close()
    finally:
        conn.close()

    if result and result[0]:
        now = datetime.now(ZoneInfo("Europe/Berlin")).replace(tzinfo=None)
        if result[0] > now:
            return True, result[0]
    return False, None
def record_failed_attempt(ip_address, username):
    """Record one failed login attempt for an IP and return the error text.

    The counter row in ``login_attempts`` is created on the first failure and
    incremented afterwards. Once the counter reaches MAX_LOGIN_ATTEMPTS the
    row gets a ``blocked_until`` timestamp BLOCK_DURATION_HOURS in the future
    and, if EMAIL_ENABLED is "true", a security alert is triggered. Database
    errors are printed and rolled back; the message is returned regardless.

    NOTE(review): the counter is only ever incremented here, so after a block
    has expired the next failure appears to block the IP again at once -
    confirm whether that is intended.
    """
    conn = get_connection()
    cur = conn.cursor()
    # Pick a random failure message.
    error_message = random.choice(FAIL_MESSAGES)
    try:
        # Is there already a counter row for this IP?
        cur.execute("""
            SELECT attempt_count FROM login_attempts
            WHERE ip_address = %s
        """, (ip_address,))
        existing_row = cur.fetchone()

        if existing_row is None:
            # First failure from this IP: create the row.
            cur.execute("""
                INSERT INTO login_attempts
                (ip_address, attempt_count, last_username_tried, last_error_message)
                VALUES (%s, 1, %s, %s)
            """, (ip_address, username, error_message))
        else:
            # Known IP: bump the counter and block if the limit is reached.
            updated_count = existing_row[0] + 1
            block_expiry = None
            if updated_count >= MAX_LOGIN_ATTEMPTS:
                berlin_now = datetime.now(ZoneInfo("Europe/Berlin")).replace(tzinfo=None)
                block_expiry = berlin_now + timedelta(hours=BLOCK_DURATION_HOURS)
                # E-mail notification (if enabled)
                alerts_enabled = os.getenv("EMAIL_ENABLED", "false").lower() == "true"
                if alerts_enabled:
                    send_security_alert_email(ip_address, username, updated_count)
            cur.execute("""
                UPDATE login_attempts
                SET attempt_count = %s,
                    last_attempt = CURRENT_TIMESTAMP,
                    blocked_until = %s,
                    last_username_tried = %s,
                    last_error_message = %s
                WHERE ip_address = %s
            """, (updated_count, block_expiry, username, error_message, ip_address))

        conn.commit()

        # Audit log
        log_audit('LOGIN_FAILED', 'user',
                  additional_info=f"IP: {ip_address}, User: {username}, Message: {error_message}")
    except Exception as e:
        print(f"Rate limiting error: {e}")
        conn.rollback()
    finally:
        cur.close()
        conn.close()
    return error_message
def reset_login_attempts(ip_address):
    """Delete the failed-login counter row for the given IP address.

    Errors during the delete are printed and rolled back; nothing is returned.
    """
    delete_sql = """
        DELETE FROM login_attempts
        WHERE ip_address = %s
    """
    connection = get_connection()
    cursor = connection.cursor()
    try:
        cursor.execute(delete_sql, (ip_address,))
        connection.commit()
    except Exception as e:
        print(f"Reset attempts error: {e}")
        connection.rollback()
    finally:
        cursor.close()
        connection.close()
def get_login_attempts(ip_address):
    """Return the number of recorded failed login attempts for an IP.

    Args:
        ip_address: Address to look up in ``login_attempts``.

    Returns:
        The stored ``attempt_count``, or 0 if there is no row for the IP.
    """
    conn = get_connection()
    # try/finally so cursor and connection are released even if the query
    # raises.
    try:
        cur = conn.cursor()
        try:
            cur.execute("""
                SELECT attempt_count FROM login_attempts
                WHERE ip_address = %s
            """, (ip_address,))
            row = cur.fetchone()
        finally:
            cur.close()
    finally:
        conn.close()
    return row[0] if row else 0
def send_security_alert_email(ip_address, username, attempt_count):
    """Raise a security alert about repeated failed logins.

    Sending is not implemented yet: subject and body are prepared, a warning
    is logged and the subject is printed.

    Args:
        ip_address: Offending IP address.
        username: Last username that was tried.
        attempt_count: Number of failed attempts so far.
    """
    subject = f"⚠️ SICHERHEITSWARNUNG: {attempt_count} fehlgeschlagene Login-Versuche"
    timestamp = datetime.now(ZoneInfo("Europe/Berlin")).strftime('%Y-%m-%d %H:%M:%S')
    # Body for the future mail; kept although unused until SMTP is wired up.
    # The block duration comes from BLOCK_DURATION_HOURS so the text cannot
    # drift from the configured value.
    body = "\n".join([
        "",
        "WARNUNG: Mehrere fehlgeschlagene Login-Versuche erkannt!",
        f"IP-Adresse: {ip_address}",
        f"Versuchter Benutzername: {username}",
        f"Anzahl Versuche: {attempt_count}",
        f"Zeit: {timestamp}",
        f"Die IP-Adresse wurde für {BLOCK_DURATION_HOURS} Stunden gesperrt.",
        "Dies ist eine automatische Nachricht vom v2-Docker Admin Panel.",
        "",
    ])
    # TODO: implement e-mail delivery once SMTP is configured
    logging.warning(f"Sicherheitswarnung: {attempt_count} fehlgeschlagene Versuche von IP {ip_address}")
    print(f"E-Mail würde gesendet: {subject}")
def verify_recaptcha(response):
    """Check a reCAPTCHA v2 response token against Google's verify endpoint.

    Returns:
        True if Google reports success. Also True when no
        RECAPTCHA_SECRET_KEY is configured (CAPTCHA skipped, PoC behaviour)
        and when the request fails with a ``requests`` exception (fail-open).
        False if Google reports failure or an unexpected error occurs.
    """
    secret_key = os.getenv('RECAPTCHA_SECRET_KEY')
    if not secret_key:
        # Without a secret key the CAPTCHA counts as passed (for the PoC).
        logging.warning("RECAPTCHA_SECRET_KEY nicht konfiguriert - CAPTCHA wird übersprungen")
        return True

    # Verification with Google
    try:
        # A timeout keeps the login from hanging on a slow verification.
        reply = requests.post(
            'https://www.google.com/recaptcha/api/siteverify',
            data={'secret': secret_key, 'response': response},
            timeout=5,
        )
        verdict = reply.json()
        passed = verdict.get('success', False)
        if not passed:
            # Log the error codes for debugging.
            logging.warning(f"reCAPTCHA Validierung fehlgeschlagen: {verdict.get('error-codes', [])}")
        return passed
    except requests.exceptions.RequestException as e:
        logging.error(f"reCAPTCHA Verifizierung fehlgeschlagen: {str(e)}")
        # On network errors the CAPTCHA counts as passed.
        return True
    except Exception as e:
        logging.error(f"Unerwarteter Fehler bei reCAPTCHA: {str(e)}")
        return False
def generate_license_key(license_type='full'):
    """
    Generate a license key of the form AF-YYYYMMFT-XXXX-YYYY-ZZZZ.

    AF             = Account Factory (product prefix)
    YYYY           = year (Europe/Berlin time)
    MM             = month
    FT             = F for the full version, T for anything else (test version)
    XXXX-YYYY-ZZZZ = random alphanumeric characters
    """
    # Allowed characters (without look-alikes such as 0/O and 1/I).
    alphabet = 'ABCDEFGHJKLMNPQRSTUVWXYZ23456789'

    # Date part
    year_month = datetime.now(ZoneInfo("Europe/Berlin")).strftime('%Y%m')
    type_char = 'T' if license_type != 'full' else 'F'

    # Three random blocks of four characters each.
    random_blocks = [
        ''.join(secrets.choice(alphabet) for _ in range(4))
        for _ in range(3)
    ]

    # Assemble the key.
    return f"AF-{year_month}{type_char}-" + '-'.join(random_blocks)
def validate_license_key(key):
    """
    Validate the license key format AF-YYYYMMFT-XXXX-YYYY-ZZZZ.

    The check is case-insensitive (the key is upper-cased first). Only the
    shape is verified: "AF-", six ASCII digits, "F" or "T", then three
    dash-separated groups of four characters from A-Z / 0-9.

    Returns:
        True if the key matches the format, False otherwise (including for
        empty or non-string input).
    """
    if not key or not isinstance(key, str):
        return False
    # [0-9] instead of \d so that non-ASCII digits are rejected.
    pattern = r'AF-[0-9]{6}[FT]-[A-Z0-9]{4}-[A-Z0-9]{4}-[A-Z0-9]{4}'
    # fullmatch instead of match + "$": "$" would also accept a trailing
    # newline after the key.
    return re.fullmatch(pattern, key.upper()) is not None
def _login_credentials_match(username, password, expected_user, expected_pass):
    """Return True only if both credentials are non-empty and match the expected pair."""
    # An unconfigured account (None) or a missing form field (None) must never
    # match: without this guard `None == None` would count as a valid login.
    if not (username and password and expected_user and expected_pass):
        return False
    # Constant-time comparison on bytes (compare_digest rejects non-ASCII str).
    user_ok = secrets.compare_digest(username.encode('utf-8'), expected_user.encode('utf-8'))
    pass_ok = secrets.compare_digest(password.encode('utf-8'), expected_pass.encode('utf-8'))
    return user_ok and pass_ok


def _login_pad_duration(start_time, minimum=1.0):
    """Sleep until at least `minimum` seconds have passed since `start_time`."""
    elapsed = time.time() - start_time
    if elapsed < minimum:
        time.sleep(minimum - elapsed)


@app.route("/login", methods=["GET", "POST"])
def login():
    """Render the login form and process login attempts.

    Flow for a POST:
      * blocked IPs get the "blocked" page immediately;
      * once CAPTCHA_AFTER_ATTEMPTS failures are recorded and a reCAPTCHA site
        key is configured, a valid CAPTCHA response is required;
      * credentials are compared against the two admin accounts from the
        environment (ADMIN1_* / ADMIN2_*);
      * every POST response takes at least one second (timing protection);
      * success initialises the session and resets the IP's counter, failure
        is recorded and may block the IP.
    """
    # Timing-attack protection - remember the start time.
    start_time = time.time()

    # Determine the client IP.
    ip_address = get_client_ip()

    # Reject blocked IPs right away.
    is_blocked, blocked_until = check_ip_blocked(ip_address)
    if is_blocked:
        time_remaining = (blocked_until - datetime.now(ZoneInfo("Europe/Berlin")).replace(tzinfo=None)).total_seconds() / 3600
        error_msg = f"IP GESPERRT! Noch {time_remaining:.1f} Stunden warten."
        return render_template("login.html", error=error_msg, error_type="blocked")

    # Number of failed attempts so far.
    attempt_count = get_login_attempts(ip_address)

    if request.method == "POST":
        username = request.form.get("username")
        password = request.form.get("password")
        captcha_response = request.form.get("g-recaptcha-response")

        # CAPTCHA check only if keys are configured.
        recaptcha_site_key = os.getenv('RECAPTCHA_SITE_KEY')
        if attempt_count >= CAPTCHA_AFTER_ATTEMPTS and recaptcha_site_key:
            if not captcha_response:
                _login_pad_duration(start_time)
                return render_template("login.html",
                                       error="CAPTCHA ERFORDERLICH!",
                                       show_captcha=True,
                                       error_type="captcha",
                                       attempts_left=max(0, MAX_LOGIN_ATTEMPTS - attempt_count),
                                       recaptcha_site_key=recaptcha_site_key)

            # Validate the CAPTCHA.
            if not verify_recaptcha(captcha_response):
                _login_pad_duration(start_time)
                return render_template("login.html",
                                       error="CAPTCHA UNGÜLTIG! Bitte erneut versuchen.",
                                       show_captcha=True,
                                       error_type="captcha",
                                       attempts_left=max(0, MAX_LOGIN_ATTEMPTS - attempt_count),
                                       recaptcha_site_key=recaptcha_site_key)

        # Check against both admin accounts from .env.
        admin_accounts = (
            (os.getenv("ADMIN1_USERNAME"), os.getenv("ADMIN1_PASSWORD")),
            (os.getenv("ADMIN2_USERNAME"), os.getenv("ADMIN2_PASSWORD")),
        )
        # Evaluate every account (no short-circuit) to keep timing uniform.
        matches = [
            _login_credentials_match(username, password, admin_user, admin_pass)
            for admin_user, admin_pass in admin_accounts
        ]
        login_success = any(matches)

        # Timing-attack protection - wait at least one second.
        _login_pad_duration(start_time)

        if login_success:
            # Successful login
            session.permanent = True  # activates the timeout
            session['logged_in'] = True
            session['username'] = username
            session['last_activity'] = datetime.now(ZoneInfo("Europe/Berlin")).replace(tzinfo=None).isoformat()
            reset_login_attempts(ip_address)
            log_audit('LOGIN_SUCCESS', 'user',
                      additional_info=f"Erfolgreiche Anmeldung von IP: {ip_address}")
            return redirect(url_for('dashboard'))

        # Failed login
        error_message = record_failed_attempt(ip_address, username)
        new_attempt_count = get_login_attempts(ip_address)

        # Check whether the IP is blocked now.
        is_now_blocked, _ = check_ip_blocked(ip_address)
        if is_now_blocked:
            log_audit('LOGIN_BLOCKED', 'security',
                      additional_info=f"IP {ip_address} wurde nach {MAX_LOGIN_ATTEMPTS} Versuchen gesperrt")

        return render_template("login.html",
                               error=error_message,
                               show_captcha=(new_attempt_count >= CAPTCHA_AFTER_ATTEMPTS and os.getenv('RECAPTCHA_SITE_KEY')),
                               error_type="failed",
                               attempts_left=max(0, MAX_LOGIN_ATTEMPTS - new_attempt_count),
                               recaptcha_site_key=os.getenv('RECAPTCHA_SITE_KEY'))

    # GET request
    return render_template("login.html",
                           show_captcha=(attempt_count >= CAPTCHA_AFTER_ATTEMPTS and os.getenv('RECAPTCHA_SITE_KEY')),
                           attempts_left=max(0, MAX_LOGIN_ATTEMPTS - attempt_count),
                           recaptcha_site_key=os.getenv('RECAPTCHA_SITE_KEY'))
@app.route("/logout")
def logout():
    """Write a LOGOUT audit entry, drop the login markers and go to the login page.

    Only ``logged_in`` and ``username`` are removed from the session; other
    session keys are left untouched.
    """
    # Audit first: log_audit reads the username from the session.
    log_audit('LOGOUT', 'user', additional_info="Abmeldung")
    for session_key in ('logged_in', 'username'):
        session.pop(session_key, None)
    return redirect(url_for('login'))
@app.route("/heartbeat", methods=['POST'])
@login_required
def heartbeat():
    """Session keep-alive endpoint: refreshes ``last_activity`` on explicit request.

    Returns a JSON object with ``status``, the new ``last_activity`` timestamp
    and the session's username.
    """
    # Naive Europe/Berlin wall-clock time in ISO format.
    berlin_now = datetime.now(ZoneInfo("Europe/Berlin")).replace(tzinfo=None)
    activity_stamp = berlin_now.isoformat()

    session['last_activity'] = activity_stamp
    # Force the session to be saved.
    session.modified = True

    payload = {
        'status': 'ok',
        'last_activity': activity_stamp,
        'username': session.get('username'),
    }
    return jsonify(payload)
@app.route("/api/generate-license-key", methods=['POST'])
@login_required
def api_generate_key():
    """API endpoint that generates a new, currently unused license key.

    Request body (JSON, optional): ``{"type": "<license type>"}``; defaults to
    ``full``. A missing or non-JSON body is treated as empty.

    Returns:
        JSON ``{"success": true, "key": ..., "type": ...}`` on success, or
        ``{"success": false, "error": ...}`` with status 500 on any error
        (including the case that no unused key was found in 10 tries).
    """
    try:
        # License type from the request (default: full). silent=True avoids an
        # exception for bodies that are missing or not JSON.
        data = request.get_json(silent=True) or {}
        if not isinstance(data, dict):
            data = {}
        license_type = data.get('type', 'full')

        # Generate candidates until one is not yet present in the database
        # (collisions are very unlikely, but better safe than sorry).
        key = None
        conn = get_connection()
        try:
            cur = conn.cursor()
            try:
                for _ in range(10):  # at most 10 tries
                    candidate = generate_license_key(license_type)
                    cur.execute("SELECT 1 FROM licenses WHERE license_key = %s", (candidate,))
                    if not cur.fetchone():
                        key = candidate  # candidate is unique
                        break
            finally:
                cur.close()
        finally:
            conn.close()

        if key is None:
            # Never hand out a key that is known to collide.
            raise RuntimeError("no unique license key found after 10 attempts")

        # Audit log. The details are wrapped in Json() because psycopg2 has no
        # default adapter for plain dicts.
        log_audit('GENERATE_KEY', 'license',
                  additional_info=Json({'type': license_type, 'key': key}))

        return jsonify({
            'success': True,
            'key': key,
            'type': license_type
        })
    except Exception as e:
        logging.error(f"Fehler bei Key-Generierung: {str(e)}")
        return jsonify({
            'success': False,
            'error': 'Fehler bei der Key-Generierung'
        }), 500
@app.route("/api/customers", methods=['GET'])
@login_required
def api_customers():
    """API endpoint for the Select2 customer search.

    Query parameters:
        q: Optional search text, matched case-insensitively as a substring of
            the customer name or e-mail.
        page: 1-based page number (20 entries per page); values below 1 are
            treated as 1.

    Returns:
        JSON in Select2 format (``results`` plus ``pagination.more``), or an
        empty result with an ``error`` field and status 500 on failure.
    """
    try:
        # Search parameters
        search = request.args.get('q', '').strip()
        # Clamp to 1: a page of 0 or less would produce a negative OFFSET.
        page = max(1, request.args.get('page', 1, type=int))
        per_page = 20
        offset = (page - 1) * per_page

        conn = get_connection()
        cur = conn.cursor()
        # try/finally so the connection is released on every path.
        try:
            # Query with optional search filter.
            if search:
                like_pattern = f'%{search}%'
                cur.execute("""
                    SELECT c.id, c.name, c.email,
                           COUNT(l.id) as license_count
                    FROM customers c
                    LEFT JOIN licenses l ON c.id = l.customer_id
                    WHERE LOWER(c.name) LIKE LOWER(%s)
                       OR LOWER(c.email) LIKE LOWER(%s)
                    GROUP BY c.id, c.name, c.email
                    ORDER BY c.name
                    LIMIT %s OFFSET %s
                """, (like_pattern, like_pattern, per_page, offset))
            else:
                cur.execute("""
                    SELECT c.id, c.name, c.email,
                           COUNT(l.id) as license_count
                    FROM customers c
                    LEFT JOIN licenses l ON c.id = l.customer_id
                    GROUP BY c.id, c.name, c.email
                    ORDER BY c.name
                    LIMIT %s OFFSET %s
                """, (per_page, offset))

            # Format for Select2
            results = [
                {
                    'id': customer[0],
                    'text': f"{customer[1]} - {customer[2]} ({customer[3]} Lizenzen)",
                    'name': customer[1],
                    'email': customer[2],
                    'license_count': customer[3]
                }
                for customer in cur.fetchall()
            ]

            # Total count for pagination
            if search:
                cur.execute("""
                    SELECT COUNT(*) FROM customers
                    WHERE LOWER(name) LIKE LOWER(%s)
                       OR LOWER(email) LIKE LOWER(%s)
                """, (like_pattern, like_pattern))
            else:
                cur.execute("SELECT COUNT(*) FROM customers")
            total_count = cur.fetchone()[0]
        finally:
            cur.close()
            conn.close()

        # Select2 response format
        return jsonify({
            'results': results,
            'pagination': {
                'more': (page * per_page) < total_count
            }
        })
    except Exception as e:
        logging.error(f"Fehler bei Kundensuche: {str(e)}")
        return jsonify({
            'results': [],
            'error': 'Fehler bei der Kundensuche'
        }), 500
@app.route("/")
@login_required
def dashboard():
    """Render the dashboard with license, backup and security statistics.

    All figures are read in one database session; the connection is released
    even if one of the queries fails.
    """
    conn = get_connection()
    cur = conn.cursor()

    def fetch_scalar(sql):
        # Run a query and return the first column of its first row.
        cur.execute(sql)
        return cur.fetchone()[0]

    try:
        # Total number of customers
        total_customers = fetch_scalar("SELECT COUNT(*) FROM customers")

        # Total number of licenses
        total_licenses = fetch_scalar("SELECT COUNT(*) FROM licenses")

        # Active licenses (not expired and is_active = true)
        active_licenses = fetch_scalar("""
            SELECT COUNT(*) FROM licenses
            WHERE valid_until >= CURRENT_DATE AND is_active = TRUE
        """)

        # Active sessions
        active_sessions_count = fetch_scalar("SELECT COUNT(*) FROM sessions WHERE is_active = TRUE")

        # Expired licenses
        expired_licenses = fetch_scalar("""
            SELECT COUNT(*) FROM licenses
            WHERE valid_until < CURRENT_DATE
        """)

        # Deactivated licenses
        inactive_licenses = fetch_scalar("""
            SELECT COUNT(*) FROM licenses
            WHERE is_active = FALSE
        """)

        # Licenses expiring within the next 30 days
        expiring_soon = fetch_scalar("""
            SELECT COUNT(*) FROM licenses
            WHERE valid_until >= CURRENT_DATE
            AND valid_until < CURRENT_DATE + INTERVAL '30 days'
            AND is_active = TRUE
        """)

        # Test licenses vs. full versions
        cur.execute("""
            SELECT license_type, COUNT(*)
            FROM licenses
            GROUP BY license_type
        """)
        license_types = dict(cur.fetchall())

        # The 5 most recently created licenses
        cur.execute("""
            SELECT l.id, l.license_key, c.name, l.valid_until,
                   CASE
                       WHEN l.is_active = FALSE THEN 'deaktiviert'
                       WHEN l.valid_until < CURRENT_DATE THEN 'abgelaufen'
                       WHEN l.valid_until < CURRENT_DATE + INTERVAL '30 days' THEN 'läuft bald ab'
                       ELSE 'aktiv'
                   END as status
            FROM licenses l
            JOIN customers c ON l.customer_id = c.id
            ORDER BY l.id DESC
            LIMIT 5
        """)
        recent_licenses = cur.fetchall()

        # Licenses expiring soon (next 30 days)
        cur.execute("""
            SELECT l.id, l.license_key, c.name, l.valid_until,
                   l.valid_until - CURRENT_DATE as days_left
            FROM licenses l
            JOIN customers c ON l.customer_id = c.id
            WHERE l.valid_until >= CURRENT_DATE
            AND l.valid_until < CURRENT_DATE + INTERVAL '30 days'
            AND l.is_active = TRUE
            ORDER BY l.valid_until
            LIMIT 10
        """)
        expiring_licenses = cur.fetchall()

        # Latest backup
        cur.execute("""
            SELECT created_at, filesize, duration_seconds, backup_type, status
            FROM backup_history
            ORDER BY created_at DESC
            LIMIT 1
        """)
        last_backup_info = cur.fetchone()

        # Security statistics
        # Blocked IPs
        blocked_ips_count = fetch_scalar("""
            SELECT COUNT(*) FROM login_attempts
            WHERE blocked_until IS NOT NULL AND blocked_until > CURRENT_TIMESTAMP
        """)

        # Failed attempts today
        failed_attempts_today = fetch_scalar("""
            SELECT COALESCE(SUM(attempt_count), 0) FROM login_attempts
            WHERE last_attempt::date = CURRENT_DATE
        """)

        # The 5 most recent security events
        cur.execute("""
            SELECT
                la.ip_address,
                la.attempt_count,
                la.last_attempt,
                la.blocked_until,
                la.last_username_tried,
                la.last_error_message
            FROM login_attempts la
            ORDER BY la.last_attempt DESC
            LIMIT 5
        """)
        security_rows = cur.fetchall()
    finally:
        cur.close()
        conn.close()

    now = datetime.now(ZoneInfo("Europe/Berlin")).replace(tzinfo=None)
    recent_security_events = []
    for event in security_rows:
        recent_security_events.append({
            'ip_address': event[0],
            'attempt_count': event[1],
            # last_attempt is not set explicitly when a row is first inserted,
            # so guard against NULL instead of crashing the whole dashboard.
            'last_attempt': event[2].strftime('%d.%m %H:%M') if event[2] else None,
            # Only show blocks that are still in effect.
            'blocked_until': event[3].strftime('%d.%m %H:%M') if event[3] and event[3] > now else None,
            'username_tried': event[4],
            'error_message': event[5]
        })

    # Derive the security level.
    if blocked_ips_count > 5 or failed_attempts_today > 50:
        security_level = 'danger'
        security_level_text = 'KRITISCH'
    elif blocked_ips_count > 2 or failed_attempts_today > 20:
        security_level = 'warning'
        security_level_text = 'ERHÖHT'
    else:
        security_level = 'success'
        security_level_text = 'NORMAL'

    stats = {
        'total_customers': total_customers,
        'total_licenses': total_licenses,
        'active_licenses': active_licenses,
        'expired_licenses': expired_licenses,
        'inactive_licenses': inactive_licenses,
        'expiring_soon': expiring_soon,
        'full_licenses': license_types.get('full', 0),
        'test_licenses': license_types.get('test', 0),
        'recent_licenses': recent_licenses,
        'expiring_licenses': expiring_licenses,
        'active_sessions': active_sessions_count,
        'last_backup': last_backup_info,
        # Security statistics
        'blocked_ips_count': blocked_ips_count,
        'failed_attempts_today': failed_attempts_today,
        'recent_security_events': recent_security_events,
        'security_level': security_level,
        'security_level_text': security_level_text
    }
    return render_template("dashboard.html", stats=stats, username=session.get('username'))
@app.route("/create", methods=["GET", "POST"])
@login_required
def create_license():
    """Show the license form (GET) or create a single license (POST).

    On POST the end date is derived from ``valid_from`` plus the requested
    duration (days, months or years; the start day counts, so one day is
    subtracted). The license is created for an existing customer or for a new
    customer entered in the same form. Invalid input leads to a flash message
    and a redirect back to the form.
    """
    if request.method != "POST":
        return render_template("index.html", username=session.get('username'))

    from dateutil.relativedelta import relativedelta

    customer_id = request.form.get("customer_id")
    license_key = request.form["license_key"].upper()  # always upper case
    license_type = request.form["license_type"]
    valid_from = request.form["valid_from"]
    duration_type = request.form.get("duration_type", "years")

    # Compute valid_until from the duration. Malformed numbers or dates are
    # reported to the user instead of ending in a server error.
    try:
        duration = int(request.form.get("duration", 1))
        if duration < 1:
            raise ValueError("duration must be at least 1")
        start_date = datetime.strptime(valid_from, "%Y-%m-%d")
        if duration_type == "days":
            end_date = start_date + timedelta(days=duration)
        elif duration_type == "months":
            end_date = start_date + relativedelta(months=duration)
        else:  # years
            end_date = start_date + relativedelta(years=duration)
        # Subtract one day because the start day is included.
        end_date = end_date - timedelta(days=1)
    except (ValueError, OverflowError):
        flash('Ungültige Laufzeit oder ungültiges Startdatum!', 'error')
        return redirect(url_for('create_license'))
    valid_until = end_date.strftime("%Y-%m-%d")

    # Validate the license key format.
    if not validate_license_key(license_key):
        flash('Ungültiges License Key Format! Erwartet: AF-YYYYMMFT-XXXX-YYYY-ZZZZ', 'error')
        return redirect(url_for('create_license'))

    conn = get_connection()
    cur = conn.cursor()
    try:
        created_customer = None
        # New or existing customer?
        if customer_id == "new":
            # New customer
            name = request.form.get("customer_name")
            email = request.form.get("email")
            if not name:
                flash('Kundenname ist erforderlich!', 'error')
                return redirect(url_for('create_license'))

            # Check whether the e-mail address is already in use.
            if email:
                cur.execute("SELECT id, name FROM customers WHERE LOWER(email) = LOWER(%s)", (email,))
                existing = cur.fetchone()
                if existing:
                    flash(f'E-Mail bereits vergeben für Kunde: {existing[1]}', 'error')
                    return redirect(url_for('create_license'))

            # Insert the customer.
            cur.execute("""
                INSERT INTO customers (name, email, created_at)
                VALUES (%s, %s, NOW())
                RETURNING id
            """, (name, email))
            customer_id = cur.fetchone()[0]
            customer_info = {'name': name, 'email': email}
            created_customer = {'name': name, 'email': email}
        else:
            # Existing customer - fetch details for the audit log.
            cur.execute("SELECT name, email FROM customers WHERE id = %s", (customer_id,))
            customer_data = cur.fetchone()
            if not customer_data:
                flash('Kunde nicht gefunden!', 'error')
                return redirect(url_for('create_license'))
            customer_info = {'name': customer_data[0], 'email': customer_data[1]}

        # Add the license.
        cur.execute("""
            INSERT INTO licenses (license_key, customer_id, license_type, valid_from, valid_until, is_active)
            VALUES (%s, %s, %s, %s, %s, TRUE)
            RETURNING id
        """, (license_key, customer_id, license_type, valid_from, valid_until))
        license_id = cur.fetchone()[0]
        conn.commit()

        # Audit entries are written only after the commit: log_audit uses its
        # own connection, so an entry written earlier would survive a rollback
        # of the customer/license inserts.
        if created_customer is not None:
            log_audit('CREATE', 'customer', customer_id,
                      new_values=created_customer)
        log_audit('CREATE', 'license', license_id,
                  new_values={
                      'license_key': license_key,
                      'customer_name': customer_info['name'],
                      'customer_email': customer_info['email'],
                      'license_type': license_type,
                      'valid_from': valid_from,
                      'valid_until': valid_until
                  })

        flash(f'Lizenz {license_key} erfolgreich erstellt!', 'success')
    except Exception as e:
        conn.rollback()
        logging.error(f"Fehler beim Erstellen der Lizenz: {str(e)}")
        flash('Fehler beim Erstellen der Lizenz!', 'error')
    finally:
        cur.close()
        conn.close()

    return redirect(url_for('create_license'))
@app.route("/batch", methods=["GET", "POST"])
@login_required
def batch_licenses():
    """Generate several licenses for one customer in a single step.

    GET renders the batch form. POST validates the submitted form, creates
    the customer when ``customer_id`` is ``"new"``, inserts ``quantity``
    licenses inside one transaction, stores the result in
    ``session['batch_export']`` for a later export and renders the result
    page. Validation problems and database errors are reported through
    ``flash`` followed by a redirect back to the form.
    """
    if request.method != "POST":
        return render_template("batch_form.html")

    # Function-scope import, as in the original view.
    from dateutil.relativedelta import relativedelta

    # Form data
    customer_id = request.form.get("customer_id")
    license_type = request.form["license_type"]
    valid_from = request.form["valid_from"]
    duration_type = request.form.get("duration_type", "years")

    # Non-numeric quantity/duration or a malformed/overflowing date must be
    # reported like any other form error instead of escaping as an
    # unhandled exception.
    try:
        quantity = int(request.form["quantity"])
        duration = int(request.form.get("duration", 1))
        start_date = datetime.strptime(valid_from, "%Y-%m-%d")
        if duration_type == "days":
            end_date = start_date + timedelta(days=duration)
        elif duration_type == "months":
            end_date = start_date + relativedelta(months=duration)
        else:  # years
            end_date = start_date + relativedelta(years=duration)
        # Subtract one day because the start day itself is counted.
        end_date = end_date - timedelta(days=1)
    except (ValueError, OverflowError):
        flash('Ungültige Eingabe für Anzahl, Laufzeit oder Startdatum!', 'error')
        return redirect(url_for('batch_licenses'))
    valid_until = end_date.strftime("%Y-%m-%d")

    # Safety limit
    if quantity < 1 or quantity > 100:
        flash('Anzahl muss zwischen 1 und 100 liegen!', 'error')
        return redirect(url_for('batch_licenses'))

    conn = get_connection()
    cur = conn.cursor()
    try:
        is_new_customer = customer_id == "new"
        if is_new_customer:
            name = request.form.get("customer_name")
            email = request.form.get("email")
            if not name:
                flash('Kundenname ist erforderlich!', 'error')
                return redirect(url_for('batch_licenses'))

            # Reject an e-mail address that already belongs to a customer.
            if email:
                cur.execute(
                    "SELECT id, name FROM customers WHERE LOWER(email) = LOWER(%s)",
                    (email,),
                )
                existing = cur.fetchone()
                if existing:
                    flash(f'E-Mail bereits vergeben für Kunde: {existing[1]}', 'error')
                    return redirect(url_for('batch_licenses'))

            cur.execute("""
                INSERT INTO customers (name, email, created_at)
                VALUES (%s, %s, NOW())
                RETURNING id
            """, (name, email))
            customer_id = cur.fetchone()[0]
        else:
            # Existing customer - load name and e-mail for the result page.
            cur.execute("SELECT name, email FROM customers WHERE id = %s", (customer_id,))
            customer_data = cur.fetchone()
            if not customer_data:
                flash('Kunde nicht gefunden!', 'error')
                return redirect(url_for('batch_licenses'))
            name = customer_data[0]
            email = customer_data[1]

        # Generate and store the licenses
        generated_licenses = []
        for _ in range(quantity):
            # Try up to 10 times to obtain a key that is not in use yet.
            for _attempt in range(10):
                license_key = generate_license_key(license_type)
                cur.execute("SELECT 1 FROM licenses WHERE license_key = %s", (license_key,))
                if cur.fetchone() is None:
                    break
            else:
                # Never fall through with a key that is known to collide.
                raise RuntimeError("Kein eindeutiger Lizenzschlüssel nach 10 Versuchen")

            cur.execute("""
                INSERT INTO licenses (license_key, customer_id, license_type,
                                      valid_from, valid_until, is_active)
                VALUES (%s, %s, %s, %s, %s, true)
                RETURNING id
            """, (license_key, customer_id, license_type, valid_from, valid_until))
            license_id = cur.fetchone()[0]
            generated_licenses.append({
                'id': license_id,
                'key': license_key,
                'type': license_type
            })
        conn.commit()

        # Audit entries are written only after the commit, so a batch that
        # is rolled back does not leave a CREATE entry for its customer.
        if is_new_customer:
            log_audit('CREATE', 'customer', customer_id,
                      new_values={'name': name, 'email': email})
        log_audit('CREATE_BATCH', 'license',
                  new_values={'customer': name, 'quantity': quantity, 'type': license_type},
                  additional_info=f"Batch-Generierung von {quantity} Lizenzen")

        # Keep the batch in the session so it can be exported afterwards.
        session['batch_export'] = {
            'customer': name,
            'email': email,
            'licenses': generated_licenses,
            'valid_from': valid_from,
            'valid_until': valid_until,
            'timestamp': datetime.now(ZoneInfo("Europe/Berlin")).replace(tzinfo=None).isoformat()
        }
        flash(f'{quantity} Lizenzen erfolgreich generiert!', 'success')
        return render_template("batch_result.html",
                               customer=name,
                               email=email,
                               licenses=generated_licenses,
                               valid_from=valid_from,
                               valid_until=valid_until)
    except Exception as e:
        conn.rollback()
        logging.error(f"Fehler bei Batch-Generierung: {str(e)}")
        flash('Fehler bei der Batch-Generierung!', 'error')
        return redirect(url_for('batch_licenses'))
    finally:
        cur.close()
        conn.close()
@app.route("/batch/export")
@login_required
def export_batch():
    """Export the most recently generated license batch as a CSV download.

    The batch is read from ``session['batch_export']``. When nothing is
    stored there, an error is flashed and the user is redirected to the
    batch form. The file is semicolon-separated and UTF-8 encoded with a
    single BOM so that Excel detects the encoding.
    """
    batch_data = session.get('batch_export')
    if not batch_data:
        flash('Keine Batch-Daten zum Exportieren vorhanden!', 'error')
        return redirect(url_for('batch_licenses'))

    generated_at = datetime.fromisoformat(batch_data['timestamp']).strftime('%d.%m.%Y %H:%M')

    # Header section followed by the column titles.
    lines = [
        f"Kunde: {batch_data['customer']}",
        # A customer without an e-mail address yields an empty field
        # rather than the literal text "None".
        f"E-Mail: {batch_data['email'] or ''}",
        f"Generiert am: {generated_at}",
        f"Gültig von: {batch_data['valid_from']} bis {batch_data['valid_until']}",
        "",
        "Nr;Lizenzschlüssel;Typ",
    ]

    # One row per license
    for number, entry in enumerate(batch_data['licenses'], 1):
        typ_text = "Vollversion" if entry['type'] == 'full' else "Testversion"
        lines.append(f"{number};{entry['key']};{typ_text}")
    csv_text = "\n".join(lines) + "\n"

    # Audit log
    log_audit('EXPORT', 'batch_licenses',
              additional_info=f"Export von {len(batch_data['licenses'])} Batch-Lizenzen")

    customer_part = batch_data['customer'].replace(' ', '_')
    stamp = datetime.now(ZoneInfo('Europe/Berlin')).strftime('%Y%m%d_%H%M%S')
    return send_file(
        # 'utf-8-sig' already prepends the BOM; writing '\ufeff' into the
        # text as well would produce a second one.
        io.BytesIO(csv_text.encode('utf-8-sig')),
        mimetype='text/csv',
        as_attachment=True,
        download_name=f"batch_licenses_{customer_part}_{stamp}.csv"
    )
@app.route("/licenses")
@login_required
def licenses():
    """Render the paginated, searchable and sortable license overview.

    Query-string parameters: ``search`` (matched case-insensitively against
    license key, customer name and e-mail), ``type``, ``status``
    (``active``/``expiring``/``expired``/``inactive``), ``page``, ``sort``
    and ``order``. Sort field and direction are checked against a whitelist
    before they are interpolated into the SQL text.
    """
    # Parameters
    search = request.args.get('search', '').strip()
    filter_type = request.args.get('type', '')
    filter_status = request.args.get('status', '')
    # A page below 1 would produce a negative OFFSET, which PostgreSQL
    # rejects with an error.
    page = max(request.args.get('page', 1, type=int), 1)
    sort = request.args.get('sort', 'valid_until')
    order = request.args.get('order', 'desc')
    per_page = 20

    # Whitelist of allowed sort fields
    allowed_sort_fields = {
        'id': 'l.id',
        'license_key': 'l.license_key',
        'customer': 'c.name',
        'email': 'c.email',
        'type': 'l.license_type',
        'valid_from': 'l.valid_from',
        'valid_until': 'l.valid_until',
        'status': 'status',
        'active': 'l.is_active'
    }
    if sort not in allowed_sort_fields:
        sort = 'valid_until'
    if order not in ['asc', 'desc']:
        order = 'desc'
    sort_field = allowed_sort_fields[sort]

    # Base query; search and filters are appended below.
    query = """
        SELECT l.id, l.license_key, c.name, c.email, l.license_type,
               l.valid_from, l.valid_until, l.is_active,
               CASE
                   WHEN l.is_active = FALSE THEN 'deaktiviert'
                   WHEN l.valid_until < CURRENT_DATE THEN 'abgelaufen'
                   WHEN l.valid_until < CURRENT_DATE + INTERVAL '30 days' THEN 'läuft bald ab'
                   ELSE 'aktiv'
               END as status
        FROM licenses l
        JOIN customers c ON l.customer_id = c.id
        WHERE 1=1
    """
    params = []

    # Search filter
    if search:
        query += """
            AND (LOWER(l.license_key) LIKE LOWER(%s)
                 OR LOWER(c.name) LIKE LOWER(%s)
                 OR LOWER(c.email) LIKE LOWER(%s))
        """
        search_param = f'%{search}%'
        params.extend([search_param, search_param, search_param])

    # Type filter
    if filter_type:
        query += " AND l.license_type = %s"
        params.append(filter_type)

    # Status filter
    if filter_status == 'active':
        query += " AND l.valid_until >= CURRENT_DATE AND l.is_active = TRUE"
    elif filter_status == 'expiring':
        query += " AND l.valid_until >= CURRENT_DATE AND l.valid_until < CURRENT_DATE + INTERVAL '30 days' AND l.is_active = TRUE"
    elif filter_status == 'expired':
        query += " AND l.valid_until < CURRENT_DATE"
    elif filter_status == 'inactive':
        query += " AND l.is_active = FALSE"

    # The total is counted over the filtered query before ORDER BY/LIMIT.
    count_query = "SELECT COUNT(*) FROM (" + query + ") as count_table"
    offset = (page - 1) * per_page

    if sort == 'status':
        # The computed status is ordered by repeating its CASE expression.
        query += f""" ORDER BY
            CASE
                WHEN l.is_active = FALSE THEN 'deaktiviert'
                WHEN l.valid_until < CURRENT_DATE THEN 'abgelaufen'
                WHEN l.valid_until < CURRENT_DATE + INTERVAL '30 days' THEN 'läuft bald ab'
                ELSE 'aktiv'
            END {order.upper()} LIMIT %s OFFSET %s"""
    else:
        query += f" ORDER BY {sort_field} {order.upper()} LIMIT %s OFFSET %s"

    conn = get_connection()
    cur = conn.cursor()
    try:
        cur.execute(count_query, params)
        total = cur.fetchone()[0]
        cur.execute(query, params + [per_page, offset])
        rows = cur.fetchall()
    finally:
        # Release the connection even when one of the queries fails.
        cur.close()
        conn.close()

    # Pagination info (ceiling division)
    total_pages = (total + per_page - 1) // per_page

    return render_template("licenses.html",
                           licenses=rows,
                           search=search,
                           filter_type=filter_type,
                           filter_status=filter_status,
                           page=page,
                           total_pages=total_pages,
                           total=total,
                           sort=sort,
                           order=order,
                           username=session.get('username'))
@app.route("/license/edit/<int:license_id>", methods=["GET", "POST"])
@login_required
def edit_license(license_id):
    """Show the edit form for a license (GET) or store its changes (POST).

    On POST the previous values are read for the audit log, the license is
    updated from the form and the user is redirected to the license list.
    A license id that does not exist leads to a redirect to the list for
    both methods.
    """
    conn = get_connection()
    cur = conn.cursor()
    try:
        if request.method == "POST":
            # Fetch the old values for the audit log
            cur.execute("""
                SELECT license_key, license_type, valid_from, valid_until, is_active
                FROM licenses WHERE id = %s
            """, (license_id,))
            old_license = cur.fetchone()
            if old_license is None:
                # Nothing to update; without this guard the audit logging
                # below fails on ``old_license[0]`` after an empty UPDATE.
                return redirect("/licenses")

            license_key = request.form["license_key"]
            license_type = request.form["license_type"]
            valid_from = request.form["valid_from"]
            valid_until = request.form["valid_until"]
            # The form field is compared with the value "on".
            is_active = request.form.get("is_active") == "on"

            cur.execute("""
                UPDATE licenses
                SET license_key = %s, license_type = %s, valid_from = %s,
                    valid_until = %s, is_active = %s
                WHERE id = %s
            """, (license_key, license_type, valid_from, valid_until, is_active, license_id))
            conn.commit()

            # Audit log
            log_audit('UPDATE', 'license', license_id,
                      old_values={
                          'license_key': old_license[0],
                          'license_type': old_license[1],
                          'valid_from': str(old_license[2]),
                          'valid_until': str(old_license[3]),
                          'is_active': old_license[4]
                      },
                      new_values={
                          'license_key': license_key,
                          'license_type': license_type,
                          'valid_from': valid_from,
                          'valid_until': valid_until,
                          'is_active': is_active
                      })
            return redirect("/licenses")

        # GET: load the license together with its customer
        cur.execute("""
            SELECT l.id, l.license_key, c.name, c.email, l.license_type,
                   l.valid_from, l.valid_until, l.is_active, c.id
            FROM licenses l
            JOIN customers c ON l.customer_id = c.id
            WHERE l.id = %s
        """, (license_id,))
        license_row = cur.fetchone()
    finally:
        # Release the connection on every path, including errors.
        cur.close()
        conn.close()

    if not license_row:
        return redirect("/licenses")
    return render_template("edit_license.html", license=license_row, username=session.get('username'))
@app.route("/license/delete/<int:license_id>", methods=["POST"])
@login_required
def delete_license(license_id):
    """Delete a license and write an audit entry when the license existed."""
    db = get_connection()
    cursor = db.cursor()

    # Read key, customer name and type first so the audit log can keep them.
    lookup_sql = """
        SELECT l.license_key, c.name, l.license_type
        FROM licenses l
        JOIN customers c ON l.customer_id = c.id
        WHERE l.id = %s
    """
    cursor.execute(lookup_sql, (license_id,))
    details = cursor.fetchone()

    cursor.execute("DELETE FROM licenses WHERE id = %s", (license_id,))
    db.commit()

    # Audit log - only when there was something to delete.
    if details:
        previous = dict(zip(('license_key', 'customer_name', 'license_type'), details))
        log_audit('DELETE', 'license', license_id, old_values=previous)

    cursor.close()
    db.close()
    return redirect("/licenses")
@app.route("/customers")
@login_required
def customers():
    """Render the paginated, searchable and sortable customer overview.

    Query-string parameters: ``search`` (matched case-insensitively against
    name and e-mail), ``page``, ``sort`` and ``order``. Each row carries the
    customer's total license count and the number of licenses that are
    active and not yet expired.
    """
    # Parameters
    search = request.args.get('search', '').strip()
    # A page below 1 would produce a negative OFFSET, which PostgreSQL
    # rejects with an error.
    page = max(request.args.get('page', 1, type=int), 1)
    sort = request.args.get('sort', 'created_at')
    order = request.args.get('order', 'desc')
    per_page = 20

    # Whitelist of allowed sort fields
    allowed_sort_fields = {
        'id': 'c.id',
        'name': 'c.name',
        'email': 'c.email',
        'created_at': 'c.created_at',
        'licenses': 'license_count',
        'active_licenses': 'active_licenses'
    }
    if sort not in allowed_sort_fields:
        sort = 'created_at'
    if order not in ['asc', 'desc']:
        order = 'desc'
    sort_field = allowed_sort_fields[sort]

    # The optional search condition is shared by the count and list queries.
    where_clause = ""
    params = []
    if search:
        where_clause = """
            WHERE LOWER(c.name) LIKE LOWER(%s)
               OR LOWER(c.email) LIKE LOWER(%s)
        """
        search_param = f'%{search}%'
        params = [search_param, search_param]

    count_query = f"""
        SELECT COUNT(DISTINCT c.id)
        FROM customers c
        LEFT JOIN licenses l ON c.id = l.customer_id
        {where_clause}
    """
    offset = (page - 1) * per_page
    query = f"""
        SELECT c.id, c.name, c.email, c.created_at,
               COUNT(l.id) as license_count,
               COUNT(CASE WHEN l.is_active = TRUE AND l.valid_until >= CURRENT_DATE THEN 1 END) as active_licenses
        FROM customers c
        LEFT JOIN licenses l ON c.id = l.customer_id
        {where_clause}
        GROUP BY c.id, c.name, c.email, c.created_at
        ORDER BY {sort_field} {order.upper()}
        LIMIT %s OFFSET %s
    """

    conn = get_connection()
    cur = conn.cursor()
    try:
        # Without a search term no parameters are passed at all.
        cur.execute(count_query, params or None)
        total = cur.fetchone()[0]
        cur.execute(query, params + [per_page, offset])
        rows = cur.fetchall()
    finally:
        # Release the connection even when one of the queries fails.
        cur.close()
        conn.close()

    # Pagination info (ceiling division)
    total_pages = (total + per_page - 1) // per_page

    return render_template("customers.html",
                           customers=rows,
                           search=search,
                           page=page,
                           total_pages=total_pages,
                           total=total,
                           sort=sort,
                           order=order,
                           username=session.get('username'))
@app.route("/customer/edit/<int:customer_id>", methods=["GET", "POST"])
@login_required
def edit_customer(customer_id):
    """Show the edit form for a customer (GET) or store its changes (POST).

    On POST the previous name and e-mail are read for the audit log, the
    customer is updated from the form and the user is redirected to the
    customer list. The GET page also lists the customer's licenses. A
    customer id that does not exist leads to a redirect to the list for
    both methods.
    """
    conn = get_connection()
    cur = conn.cursor()
    try:
        if request.method == "POST":
            # Fetch the old values for the audit log
            cur.execute("SELECT name, email FROM customers WHERE id = %s", (customer_id,))
            old_customer = cur.fetchone()
            if old_customer is None:
                # Nothing to update; without this guard the audit logging
                # below fails on ``old_customer[0]`` after an empty UPDATE.
                return redirect("/customers")

            name = request.form["name"]
            email = request.form["email"]
            cur.execute("""
                UPDATE customers
                SET name = %s, email = %s
                WHERE id = %s
            """, (name, email, customer_id))
            conn.commit()

            # Audit log
            log_audit('UPDATE', 'customer', customer_id,
                      old_values={
                          'name': old_customer[0],
                          'email': old_customer[1]
                      },
                      new_values={
                          'name': name,
                          'email': email
                      })
            return redirect("/customers")

        # GET: load the customer ...
        cur.execute("""
            SELECT id, name, email, created_at
            FROM customers
            WHERE id = %s
        """, (customer_id,))
        customer = cur.fetchone()
        if not customer:
            return redirect("/customers")

        # ... and the customer's licenses, latest expiry first.
        cur.execute("""
            SELECT id, license_key, license_type, valid_from, valid_until, is_active
            FROM licenses
            WHERE customer_id = %s
            ORDER BY valid_until DESC
        """, (customer_id,))
        customer_licenses = cur.fetchall()
    finally:
        # Release the connection on every path, including errors.
        cur.close()
        conn.close()

    return render_template("edit_customer.html", customer=customer,
                           licenses=customer_licenses, username=session.get('username'))
@app.route("/customer/delete/<int:customer_id>", methods=["POST"])
@login_required
def delete_customer(customer_id):
    """Delete a customer unless licenses still reference it.

    A customer that still has licenses is left untouched. In both cases the
    user is redirected to the customer list.
    """
    db = get_connection()
    cursor = db.cursor()

    # Count the licenses that belong to this customer.
    cursor.execute("SELECT COUNT(*) FROM licenses WHERE customer_id = %s", (customer_id,))
    (license_total,) = cursor.fetchone()

    if not license_total > 0:
        # Remember name and e-mail for the audit log before deleting.
        cursor.execute("SELECT name, email FROM customers WHERE id = %s", (customer_id,))
        details = cursor.fetchone()

        cursor.execute("DELETE FROM customers WHERE id = %s", (customer_id,))
        db.commit()

        # Audit log - only when the customer row existed.
        if details:
            previous = {'name': details[0], 'email': details[1]}
            log_audit('DELETE', 'customer', customer_id, old_values=previous)

    cursor.close()
    db.close()
    return redirect("/customers")
@app.route("/sessions")
@login_required
def sessions():
    """Show active sessions and sessions that ended in the last 24 hours.

    The two tables are sorted independently through the query-string
    parameters ``active_sort``/``active_order`` and
    ``ended_sort``/``ended_order``. Values outside the whitelists fall back
    to the defaults.
    """
    db = get_connection()
    cursor = db.cursor()

    # Whitelist of sortable columns - active sessions
    active_sort_fields = {
        'customer': 'c.name',
        'license': 'l.license_key',
        'ip': 's.ip_address',
        'started': 's.started_at',
        'last_heartbeat': 's.last_heartbeat',
        'inactive': 'minutes_inactive'
    }
    # Whitelist of sortable columns - ended sessions
    ended_sort_fields = {
        'customer': 'c.name',
        'license': 'l.license_key',
        'ip': 's.ip_address',
        'started': 's.started_at',
        'ended_at': 's.ended_at',
        'duration': 'duration_minutes'
    }
    directions = ['asc', 'desc']

    # Read and validate the sort parameters for the active table ...
    active_sort = request.args.get('active_sort', 'last_heartbeat')
    if active_sort not in active_sort_fields:
        active_sort = 'last_heartbeat'
    active_order = request.args.get('active_order', 'desc')
    if active_order not in directions:
        active_order = 'desc'

    # ... and for the ended table.
    ended_sort = request.args.get('ended_sort', 'ended_at')
    if ended_sort not in ended_sort_fields:
        ended_sort = 'ended_at'
    ended_order = request.args.get('ended_order', 'desc')
    if ended_order not in directions:
        ended_order = 'desc'

    active_order_by = f"{active_sort_fields[active_sort]} {active_order.upper()}"
    ended_order_by = f"{ended_sort_fields[ended_sort]} {ended_order.upper()}"

    # Active sessions, including the minutes since the last heartbeat
    cursor.execute(f"""
        SELECT s.id, s.session_id, l.license_key, c.name, s.ip_address,
               s.user_agent, s.started_at, s.last_heartbeat,
               EXTRACT(EPOCH FROM (NOW() - s.last_heartbeat))/60 as minutes_inactive
        FROM sessions s
        JOIN licenses l ON s.license_id = l.id
        JOIN customers c ON l.customer_id = c.id
        WHERE s.is_active = TRUE
        ORDER BY {active_order_by}
    """)
    active_rows = cursor.fetchall()

    # Sessions that ended within the last 24 hours, at most 50 rows
    cursor.execute(f"""
        SELECT s.id, s.session_id, l.license_key, c.name, s.ip_address,
               s.started_at, s.ended_at,
               EXTRACT(EPOCH FROM (s.ended_at - s.started_at))/60 as duration_minutes
        FROM sessions s
        JOIN licenses l ON s.license_id = l.id
        JOIN customers c ON l.customer_id = c.id
        WHERE s.is_active = FALSE
          AND s.ended_at > NOW() - INTERVAL '24 hours'
        ORDER BY {ended_order_by}
        LIMIT 50
    """)
    ended_rows = cursor.fetchall()

    cursor.close()
    db.close()

    return render_template("sessions.html",
                           active_sessions=active_rows,
                           recent_sessions=ended_rows,
                           active_sort=active_sort,
                           active_order=active_order,
                           ended_sort=ended_sort,
                           ended_order=ended_order,
                           username=session.get('username'))
@app.route("/session/end/<int:session_id>", methods=["POST"])
@login_required
def end_session(session_id):
    """Mark an active session as ended and return to the session overview."""
    # Only rows that are still active are touched.
    end_sql = """
        UPDATE sessions
        SET is_active = FALSE, ended_at = NOW()
        WHERE id = %s AND is_active = TRUE
    """
    db = get_connection()
    cursor = db.cursor()
    cursor.execute(end_sql, (session_id,))
    db.commit()
    cursor.close()
    db.close()
    return redirect("/sessions")
@app.route("/export/licenses")
@login_required
def export_licenses():
    """Export every license with its customer data as an Excel or CSV file.

    The ``format`` query parameter selects the output: ``csv`` yields a
    semicolon-separated UTF-8 file with BOM, any other value yields an
    ``.xlsx`` workbook with column widths fitted to the content (capped at
    50).
    """
    conn = get_connection()
    cur = conn.cursor()
    try:
        # All licenses with customer information
        cur.execute("""
            SELECT l.id, l.license_key, c.name as customer_name, c.email as customer_email,
                   l.license_type, l.valid_from, l.valid_until, l.is_active,
                   CASE
                       WHEN l.is_active = FALSE THEN 'Deaktiviert'
                       WHEN l.valid_until < CURRENT_DATE THEN 'Abgelaufen'
                       WHEN l.valid_until < CURRENT_DATE + INTERVAL '30 days' THEN 'Läuft bald ab'
                       ELSE 'Aktiv'
                   END as status
            FROM licenses l
            JOIN customers c ON l.customer_id = c.id
            ORDER BY l.id
        """)
        data = cur.fetchall()
    finally:
        # Release the connection even when the query fails.
        cur.close()
        conn.close()

    # Column titles of the export
    columns = ['ID', 'Lizenzschlüssel', 'Kunde', 'E-Mail', 'Typ',
               'Gültig von', 'Gültig bis', 'Aktiv', 'Status']
    df = pd.DataFrame(data, columns=columns)

    # Date formatting
    for date_column in ('Gültig von', 'Gültig bis'):
        df[date_column] = pd.to_datetime(df[date_column]).dt.strftime('%d.%m.%Y')

    # Human-readable type and active flag
    df['Typ'] = df['Typ'].replace({'full': 'Vollversion', 'test': 'Testversion'})
    df['Aktiv'] = df['Aktiv'].replace({True: 'Ja', False: 'Nein'})

    # Everything except "csv" is delivered as Excel, so the audit log
    # records the delivered format instead of echoing the raw parameter.
    export_format = 'csv' if request.args.get('format', 'excel') == 'csv' else 'excel'

    # Audit log
    log_audit('EXPORT', 'license',
              additional_info=f"Export aller Lizenzen als {export_format.upper()}")

    filename = f'lizenzen_export_{datetime.now(ZoneInfo("Europe/Berlin")).strftime("%Y%m%d_%H%M%S")}'

    if export_format == 'csv':
        # to_csv returns text here; the BOM is added once while encoding.
        csv_text = df.to_csv(index=False, sep=';')
        return send_file(
            io.BytesIO(csv_text.encode('utf-8-sig')),
            mimetype='text/csv',
            as_attachment=True,
            download_name=f'{filename}.csv'
        )

    # Excel export
    output = io.BytesIO()
    with pd.ExcelWriter(output, engine='openpyxl') as writer:
        df.to_excel(writer, sheet_name='Lizenzen', index=False)
        worksheet = writer.sheets['Lizenzen']
        # Fit each column to its longest cell text, capped at 50.
        for column_cells in worksheet.columns:
            widest = max((len(str(cell.value)) for cell in column_cells), default=0)
            column_letter = column_cells[0].column_letter
            worksheet.column_dimensions[column_letter].width = min(widest + 2, 50)
    output.seek(0)
    return send_file(
        output,
        mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        as_attachment=True,
        download_name=f'{filename}.xlsx'
    )
@app.route("/export/customers")
@login_required
def export_customers():
    """Export every customer with license statistics as Excel or CSV.

    The ``format`` query parameter selects the output: ``csv`` yields a
    semicolon-separated UTF-8 file with BOM, any other value yields an
    ``.xlsx`` workbook with column widths fitted to the content (capped at
    50).
    """
    conn = get_connection()
    cur = conn.cursor()
    try:
        # All customers with total, active and expired license counts
        cur.execute("""
            SELECT c.id, c.name, c.email, c.created_at,
                   COUNT(l.id) as total_licenses,
                   COUNT(CASE WHEN l.is_active = TRUE AND l.valid_until >= CURRENT_DATE THEN 1 END) as active_licenses,
                   COUNT(CASE WHEN l.valid_until < CURRENT_DATE THEN 1 END) as expired_licenses
            FROM customers c
            LEFT JOIN licenses l ON c.id = l.customer_id
            GROUP BY c.id, c.name, c.email, c.created_at
            ORDER BY c.id
        """)
        data = cur.fetchall()
    finally:
        # Release the connection even when the query fails.
        cur.close()
        conn.close()

    # Column titles of the export
    columns = ['ID', 'Name', 'E-Mail', 'Erstellt am',
               'Lizenzen gesamt', 'Aktive Lizenzen', 'Abgelaufene Lizenzen']
    df = pd.DataFrame(data, columns=columns)

    # Date formatting
    df['Erstellt am'] = pd.to_datetime(df['Erstellt am']).dt.strftime('%d.%m.%Y %H:%M')

    # Everything except "csv" is delivered as Excel, so the audit log
    # records the delivered format instead of echoing the raw parameter.
    export_format = 'csv' if request.args.get('format', 'excel') == 'csv' else 'excel'

    # Audit log
    log_audit('EXPORT', 'customer',
              additional_info=f"Export aller Kunden als {export_format.upper()}")

    filename = f'kunden_export_{datetime.now(ZoneInfo("Europe/Berlin")).strftime("%Y%m%d_%H%M%S")}'

    if export_format == 'csv':
        # to_csv returns text here; the BOM is added once while encoding.
        csv_text = df.to_csv(index=False, sep=';')
        return send_file(
            io.BytesIO(csv_text.encode('utf-8-sig')),
            mimetype='text/csv',
            as_attachment=True,
            download_name=f'{filename}.csv'
        )

    # Excel export
    output = io.BytesIO()
    with pd.ExcelWriter(output, engine='openpyxl') as writer:
        df.to_excel(writer, sheet_name='Kunden', index=False)
        worksheet = writer.sheets['Kunden']
        # Fit each column to its longest cell text, capped at 50.
        for column_cells in worksheet.columns:
            widest = max((len(str(cell.value)) for cell in column_cells), default=0)
            column_letter = column_cells[0].column_letter
            worksheet.column_dimensions[column_letter].width = min(widest + 2, 50)
    output.seek(0)
    return send_file(
        output,
        mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        as_attachment=True,
        download_name=f'{filename}.xlsx'
    )
@app.route("/audit")
@login_required
def audit_log():
    """Render the paginated, filterable and sortable audit log.

    Query-string parameters: ``user`` (case-insensitive substring match),
    ``action`` and ``entity`` (exact match), ``page``, ``sort`` and
    ``order``. Sort field and direction are checked against a whitelist
    before they are interpolated into the SQL text.
    """
    # Parameters
    filter_user = request.args.get('user', '').strip()
    filter_action = request.args.get('action', '').strip()
    filter_entity = request.args.get('entity', '').strip()
    # A page below 1 would produce a negative OFFSET, which PostgreSQL
    # rejects with an error.
    page = max(request.args.get('page', 1, type=int), 1)
    sort = request.args.get('sort', 'timestamp')
    order = request.args.get('order', 'desc')
    per_page = 50

    # Whitelist of allowed sort fields
    allowed_sort_fields = {
        'timestamp': 'timestamp',
        'username': 'username',
        'action': 'action',
        'entity': 'entity_type',
        'ip': 'ip_address'
    }
    if sort not in allowed_sort_fields:
        sort = 'timestamp'
    if order not in ['asc', 'desc']:
        order = 'desc'
    sort_field = allowed_sort_fields[sort]

    # Base query; filters are appended below.
    query = """
        SELECT id, timestamp, username, action, entity_type, entity_id,
               old_values, new_values, ip_address, user_agent, additional_info
        FROM audit_log
        WHERE 1=1
    """
    params = []
    if filter_user:
        query += " AND LOWER(username) LIKE LOWER(%s)"
        params.append(f'%{filter_user}%')
    if filter_action:
        query += " AND action = %s"
        params.append(filter_action)
    if filter_entity:
        query += " AND entity_type = %s"
        params.append(filter_entity)

    # The total is counted over the filtered query before ORDER BY/LIMIT.
    count_query = "SELECT COUNT(*) FROM (" + query + ") as count_table"
    offset = (page - 1) * per_page
    query += f" ORDER BY {sort_field} {order.upper()} LIMIT %s OFFSET %s"

    conn = get_connection()
    cur = conn.cursor()
    try:
        cur.execute(count_query, params)
        total = cur.fetchone()[0]
        cur.execute(query, params + [per_page, offset])
        logs = cur.fetchall()
    finally:
        # Release the connection even when one of the queries fails.
        cur.close()
        conn.close()

    # Rows are handed to the template as lists; no value is converted.
    parsed_logs = [list(log) for log in logs]

    # Pagination info (ceiling division)
    total_pages = (total + per_page - 1) // per_page

    return render_template("audit_log.html",
                           logs=parsed_logs,
                           filter_user=filter_user,
                           filter_action=filter_action,
                           filter_entity=filter_entity,
                           page=page,
                           total_pages=total_pages,
                           total=total,
                           sort=sort,
                           order=order,
                           username=session.get('username'))
@app.route("/backups")
@login_required
def backups():
    """Show the backup history together with the latest successful backup."""
    latest_success_sql = """
        SELECT created_at, filesize, duration_seconds
        FROM backup_history
        WHERE status = 'success'
        ORDER BY created_at DESC
        LIMIT 1
    """
    history_sql = """
        SELECT id, filename, filesize, backup_type, status, error_message,
               created_at, created_by, tables_count, records_count,
               duration_seconds, is_encrypted
        FROM backup_history
        ORDER BY created_at DESC
    """

    db = get_connection()
    cursor = db.cursor()

    # Most recent successful backup
    cursor.execute(latest_success_sql)
    latest_success = cursor.fetchone()

    # Complete history, newest first
    cursor.execute(history_sql)
    history = cursor.fetchall()

    cursor.close()
    db.close()

    return render_template("backups.html",
                           backups=history,
                           last_backup=latest_success,
                           username=session.get('username'))
@app.route("/backup/create", methods=["POST"])
@login_required
def create_backup_route():
    """Create a manual backup and report the outcome as JSON.

    Responds with HTTP 500 when ``create_backup`` reports a failure.
    """
    success, result = create_backup(backup_type="manual",
                                    created_by=session.get('username'))
    if not success:
        failure = {
            'success': False,
            'message': f'Backup fehlgeschlagen: {result}'
        }
        return jsonify(failure), 500

    return jsonify({
        'success': True,
        'message': f'Backup erfolgreich erstellt: {result}'
    })
@app.route("/backup/restore/<int:backup_id>", methods=["POST"])
@login_required
def restore_backup_route(backup_id):
    """Restore a backup and report the outcome as JSON.

    The optional form field ``encryption_key`` is passed on to
    ``restore_backup``. Responds with HTTP 500 when the restore fails.
    """
    success, message = restore_backup(backup_id, request.form.get('encryption_key'))
    if not success:
        return jsonify({'success': False, 'message': message}), 500
    return jsonify({'success': True, 'message': message})
@app.route("/backup/download/<int:backup_id>")
@login_required
def download_backup(backup_id):
    """Send a backup file as a download.

    Responds with HTTP 404 when the backup id has no row in
    ``backup_history`` or the stored file path does not exist.
    """
    db = get_connection()
    cursor = db.cursor()
    cursor.execute("""
        SELECT filename, filepath
        FROM backup_history
        WHERE id = %s
    """, (backup_id,))
    record = cursor.fetchone()
    cursor.close()
    db.close()

    if not record:
        return "Backup nicht gefunden", 404

    filename, stored_path = record
    backup_file = Path(stored_path)
    if not backup_file.exists():
        return "Backup-Datei nicht gefunden", 404

    # Audit log
    log_audit('DOWNLOAD', 'backup', backup_id,
              additional_info=f"Backup heruntergeladen: {filename}")
    return send_file(backup_file, as_attachment=True, download_name=filename)
@app.route("/security/blocked-ips")
@login_required
def blocked_ips():
    """List every IP address that has a ``blocked_until`` timestamp.

    Each entry carries formatted timestamps and an ``is_active`` flag that
    is true while ``blocked_until`` lies after the current Europe/Berlin
    wall-clock time.
    """
    stamp_format = '%d.%m.%Y %H:%M'

    db = get_connection()
    cursor = db.cursor()
    cursor.execute("""
        SELECT
            ip_address,
            attempt_count,
            first_attempt,
            last_attempt,
            blocked_until,
            last_username_tried,
            last_error_message
        FROM login_attempts
        WHERE blocked_until IS NOT NULL
        ORDER BY blocked_until DESC
    """)

    # The current time is taken per row as a naive Berlin timestamp.
    blocked_ips_list = [
        {
            'ip_address': address,
            'attempt_count': attempts,
            'first_attempt': first_seen.strftime(stamp_format),
            'last_attempt': last_seen.strftime(stamp_format),
            'blocked_until': blocked_until.strftime(stamp_format),
            'is_active': blocked_until > datetime.now(ZoneInfo("Europe/Berlin")).replace(tzinfo=None),
            'last_username': last_username,
            'last_error': last_error
        }
        for (address, attempts, first_seen, last_seen,
             blocked_until, last_username, last_error) in cursor.fetchall()
    ]

    cursor.close()
    db.close()

    return render_template("blocked_ips.html",
                           blocked_ips=blocked_ips_list,
                           username=session.get('username'))
@app.route("/security/unblock-ip", methods=["POST"])
@login_required
def unblock_ip():
    """Remove the block from an IP address given in the form.

    Without an ``ip_address`` form value nothing is changed. The user is
    always redirected to the blocked-IP overview.
    """
    overview = redirect(url_for('blocked_ips'))
    ip_address = request.form.get('ip_address')
    if not ip_address:
        return overview

    # Clearing blocked_until lifts the block for this address.
    db = get_connection()
    cursor = db.cursor()
    cursor.execute("""
        UPDATE login_attempts
        SET blocked_until = NULL
        WHERE ip_address = %s
    """, (ip_address,))
    db.commit()
    cursor.close()
    db.close()

    # Audit log
    log_audit('UNBLOCK_IP', 'security',
              additional_info=f"IP {ip_address} manuell entsperrt")
    return overview
@app.route("/security/clear-attempts", methods=["POST"])
@login_required
def clear_attempts():
    """Reset the recorded login attempts for an IP address given in the form.

    Without an ``ip_address`` form value nothing is changed. The user is
    always redirected to the blocked-IP overview.
    """
    overview = redirect(url_for('blocked_ips'))
    ip_address = request.form.get('ip_address')
    if not ip_address:
        return overview

    reset_login_attempts(ip_address)
    # Audit log
    log_audit('CLEAR_ATTEMPTS', 'security',
              additional_info=f"Login-Versuche für IP {ip_address} zurückgesetzt")
    return overview
# API Endpoints for License Management
@app.route("/api/license/<int:license_id>/toggle", methods=["POST"])
@login_required
def toggle_license_api(license_id):
    """Set the active flag of a license from a JSON request body.

    The body is expected to carry ``is_active``; a missing key counts as
    ``False``. Responds with HTTP 404 when no license has the given id and
    with HTTP 500 plus the error text when anything raises.
    """
    try:
        data = request.get_json()
        is_active = data.get('is_active', False)

        conn = get_connection()
        cur = conn.cursor()
        try:
            # Update license status
            cur.execute("""
                UPDATE licenses
                SET is_active = %s
                WHERE id = %s
            """, (is_active, license_id))
            if cur.rowcount == 0:
                # No such license: neither report success nor write an
                # audit entry for a change that never happened.
                conn.rollback()
                return jsonify({'success': False, 'message': 'Lizenz nicht gefunden'}), 404
            conn.commit()

            # Log the action
            log_audit('UPDATE', 'license', license_id,
                      new_values={'is_active': is_active},
                      additional_info=f"Lizenz {'aktiviert' if is_active else 'deaktiviert'} via Toggle")
        finally:
            # Release the connection on every path, including errors.
            cur.close()
            conn.close()

        return jsonify({'success': True, 'message': 'Status erfolgreich geändert'})
    except Exception as e:
        return jsonify({'success': False, 'message': str(e)}), 500
@app.route("/api/licenses/bulk-activate", methods=["POST"])
@login_required
def bulk_activate_licenses():
    """Activate every license whose id is listed under ``ids`` in the JSON body.

    Responds with HTTP 400 when no ids are given and with HTTP 500 plus the
    error text when anything raises.
    """
    try:
        payload = request.get_json()
        selected_ids = payload.get('ids', [])
        if not selected_ids:
            return jsonify({'success': False, 'message': 'Keine Lizenzen ausgewählt'}), 400

        db = get_connection()
        cursor = db.cursor()

        # One statement covers all selected licenses.
        cursor.execute("""
            UPDATE licenses
            SET is_active = TRUE
            WHERE id = ANY(%s)
        """, (selected_ids,))
        changed = cursor.rowcount
        db.commit()

        # The same text is used for the audit entry and the response.
        summary = f"{changed} Lizenzen aktiviert"
        log_audit('BULK_UPDATE', 'licenses', None,
                  new_values={'is_active': True, 'count': changed},
                  additional_info=summary)

        cursor.close()
        db.close()
        return jsonify({'success': True, 'message': summary})
    except Exception as e:
        return jsonify({'success': False, 'message': str(e)}), 500
@app.route("/api/licenses/bulk-deactivate", methods=["POST"])
@login_required
def bulk_deactivate_licenses():
    """Deactivate every license whose id is listed under ``ids`` in the JSON body.

    Responds with HTTP 400 when no ids are given and with HTTP 500 plus the
    error text when anything raises.
    """
    try:
        payload = request.get_json()
        selected_ids = payload.get('ids', [])
        if not selected_ids:
            return jsonify({'success': False, 'message': 'Keine Lizenzen ausgewählt'}), 400

        db = get_connection()
        cursor = db.cursor()

        # One statement covers all selected licenses.
        cursor.execute("""
            UPDATE licenses
            SET is_active = FALSE
            WHERE id = ANY(%s)
        """, (selected_ids,))
        changed = cursor.rowcount
        db.commit()

        # The same text is used for the audit entry and the response.
        summary = f"{changed} Lizenzen deaktiviert"
        log_audit('BULK_UPDATE', 'licenses', None,
                  new_values={'is_active': False, 'count': changed},
                  additional_info=summary)

        cursor.close()
        db.close()
        return jsonify({'success': True, 'message': summary})
    except Exception as e:
        return jsonify({'success': False, 'message': str(e)}), 500
@app.route("/api/licenses/bulk-delete", methods=["POST"])
@login_required
def bulk_delete_licenses():
    """Delete every license whose id is listed under ``ids`` in the JSON body.

    The license keys are read before the deletion so the audit entry can
    record them. Responds with HTTP 400 when no ids are given and with HTTP
    500 plus the error text when anything raises.
    """
    try:
        payload = request.get_json()
        selected_ids = payload.get('ids', [])
        if not selected_ids:
            return jsonify({'success': False, 'message': 'Keine Lizenzen ausgewählt'}), 400

        db = get_connection()
        cursor = db.cursor()

        # Collect the keys for the audit log while the rows still exist.
        cursor.execute("""
            SELECT license_key
            FROM licenses
            WHERE id = ANY(%s)
        """, (selected_ids,))
        removed_keys = []
        for (key,) in cursor.fetchall():
            removed_keys.append(key)

        # One statement removes all selected licenses.
        cursor.execute("""
            DELETE FROM licenses
            WHERE id = ANY(%s)
        """, (selected_ids,))
        removed = cursor.rowcount
        db.commit()

        # The same text is used for the audit entry and the response.
        summary = f"{removed} Lizenzen gelöscht"
        log_audit('BULK_DELETE', 'licenses', None,
                  old_values={'license_keys': removed_keys, 'count': removed},
                  additional_info=summary)

        cursor.close()
        db.close()
        return jsonify({'success': True, 'message': summary})
    except Exception as e:
        return jsonify({'success': False, 'message': str(e)}), 500
# Start Flask's built-in server on all interfaces when run as a script.
if __name__ == "__main__":
    app.run(port=5000, host="0.0.0.0")