"""Admin panel of the v2-Docker license server (Flask application).

Provides login with IP-based rate limiting, a dashboard, CRUD views for
customers and licenses, session overview, data export and encrypted
database backups (manual and scheduled).
"""
import gzip
import hashlib
import hmac
import io
import json
import logging
import os
import random
import subprocess
import time
from contextlib import closing
from datetime import datetime, timedelta
from functools import wraps
from pathlib import Path

import pandas as pd
import psycopg2
from apscheduler.schedulers.background import BackgroundScheduler
from cryptography.fernet import Fernet
from dotenv import load_dotenv
from flask import (
    Flask,
    flash,
    has_request_context,
    jsonify,
    redirect,
    render_template,
    request,
    send_file,
    session,
    url_for,
)
from flask_session import Session
from psycopg2.extras import Json

load_dotenv()

# Inactivity window after which a logged-in admin is logged out again.
SESSION_TIMEOUT = timedelta(minutes=5)

app = Flask(__name__)
app.config['SECRET_KEY'] = os.urandom(24)
app.config['SESSION_TYPE'] = 'filesystem'
app.config['JSON_AS_ASCII'] = False  # emit JSON as UTF-8
app.config['JSONIFY_MIMETYPE'] = 'application/json; charset=utf-8'
app.config['PERMANENT_SESSION_LIFETIME'] = SESSION_TIMEOUT
app.config['SESSION_COOKIE_HTTPONLY'] = True
# Set to True once served via HTTPS (internally the app runs on plain HTTP).
app.config['SESSION_COOKIE_SECURE'] = False
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'
app.config['SESSION_COOKIE_NAME'] = 'admin_session'
# IMPORTANT: the session cookie must also expire after the timeout, so it is
# not refreshed on every request.
app.config['SESSION_REFRESH_EACH_REQUEST'] = False
Session(app)

# Backup configuration
BACKUP_DIR = Path("/app/backups")
BACKUP_DIR.mkdir(parents=True, exist_ok=True)

# Rate-limiting configuration
FAIL_MESSAGES = [
    "NOPE!",
    "ACCESS DENIED, TRY HARDER",
    "WRONG! 🚫",
    "COMPUTER SAYS NO",
    "YOU FAILED",
]
MAX_LOGIN_ATTEMPTS = 5
BLOCK_DURATION_HOURS = 24
CAPTCHA_AFTER_ATTEMPTS = 2

# Scheduler for automatic backups
scheduler = BackgroundScheduler()
scheduler.start()

# Configure logging
logging.basicConfig(level=logging.INFO)


def login_required(f):
    """Decorator: allow access only to logged-in users with a fresh session.

    Redirects to the login page when the session has no ``logged_in`` flag
    or when ``last_activity`` is older than ``SESSION_TIMEOUT``. The
    activity timestamp is deliberately NOT refreshed here; only the
    ``/heartbeat`` endpoint does that.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'logged_in' not in session:
            return redirect(url_for('login'))

        if 'last_activity' in session:
            last_activity = datetime.fromisoformat(session['last_activity'])
            time_since_activity = datetime.now() - last_activity

            # Debug logging
            app.logger.info(f"Session check for {session.get('username', 'unknown')}: "
                            f"Last activity: {last_activity}, "
                            f"Time since: {time_since_activity.total_seconds()} seconds")

            if time_since_activity > SESSION_TIMEOUT:
                username = session.get('username', 'unbekannt')
                app.logger.info(f"Session timeout for user {username} - auto logout")
                # The audit entry must be written BEFORE session.clear(),
                # because log_audit reads the username from the session.
                try:
                    log_audit('AUTO_LOGOUT', 'session',
                              additional_info={'reason': 'Session timeout (5 minutes)',
                                               'username': username})
                except Exception:
                    # Best effort: a failing audit log must not block the logout.
                    app.logger.exception("Could not write AUTO_LOGOUT audit entry")
                session.clear()
                flash('Ihre Sitzung ist abgelaufen. Bitte melden Sie sich erneut an.', 'warning')
                return redirect(url_for('login'))

        return f(*args, **kwargs)
    return decorated_function


def get_connection():
    """Open a new PostgreSQL connection with UTF-8 client encoding.

    Connection parameters come from the ``POSTGRES_*`` environment
    variables. The caller is responsible for closing the connection.
    """
    conn = psycopg2.connect(
        host=os.getenv("POSTGRES_HOST", "postgres"),
        port=os.getenv("POSTGRES_PORT", "5432"),
        dbname=os.getenv("POSTGRES_DB"),
        user=os.getenv("POSTGRES_USER"),
        password=os.getenv("POSTGRES_PASSWORD"),
        options='-c client_encoding=UTF8'
    )
    conn.set_client_encoding('UTF8')
    return conn


def _fetch_scalar(cur, query, params=None):
    """Execute *query* and return the first column of the first row (or None)."""
    cur.execute(query, params)
    row = cur.fetchone()
    return row[0] if row else None


def log_audit(action, entity_type, entity_id=None, old_values=None, new_values=None, additional_info=None):
    """Write one entry to the ``audit_log`` table.

    Works both inside a request (username, IP and user agent are taken from
    the session/request) and outside of one, e.g. from the backup scheduler
    thread, where the entry is attributed to ``'system'``. Errors while
    inserting are logged and swallowed so that auditing never breaks the
    calling operation.

    Args:
        action: Short action name such as ``'CREATE'`` or ``'LOGIN_FAILED'``.
        entity_type: Kind of object affected (``'license'``, ``'user'``, ...).
        entity_id: Optional primary key of the affected object.
        old_values: Optional dict with the values before the change.
        new_values: Optional dict with the values after the change.
        additional_info: Optional free text; dicts/lists are serialized to a
            JSON string because psycopg2 cannot adapt them directly.
    """
    # session/request proxies raise RuntimeError outside a request context.
    if has_request_context():
        username = session.get('username', 'system')
        ip_address = request.remote_addr
        user_agent = request.headers.get('User-Agent')
    else:
        username, ip_address, user_agent = 'system', None, None

    if isinstance(additional_info, (dict, list)):
        additional_info = json.dumps(additional_info, ensure_ascii=False)

    # Wrap dictionaries so psycopg2 sends them as JSON
    old_json = Json(old_values) if old_values else None
    new_json = Json(new_values) if new_values else None

    with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
        try:
            cur.execute("""
                INSERT INTO audit_log
                    (username, action, entity_type, entity_id, old_values,
                     new_values, ip_address, user_agent, additional_info)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
            """, (username, action, entity_type, entity_id, old_json,
                  new_json, ip_address, user_agent, additional_info))
            conn.commit()
        except Exception:
            logging.exception("Audit log error")
            conn.rollback()


def get_or_create_encryption_key():
    """Return the Fernet key used for backups, creating one if necessary.

    Lookup order: a valid ``BACKUP_ENCRYPTION_KEY`` environment variable,
    then the key file inside ``BACKUP_DIR``, otherwise a freshly generated
    key that is persisted to that file.

    Returns:
        The key as bytes.
    """
    key_file = BACKUP_DIR / ".backup_key"

    env_key = os.getenv("BACKUP_ENCRYPTION_KEY")
    if env_key:
        try:
            Fernet(env_key.encode())  # validates the key format
            return env_key.encode()
        except Exception:
            logging.warning("BACKUP_ENCRYPTION_KEY is set but is not a valid Fernet key; "
                            "falling back to the key file")

    if key_file.exists():
        return key_file.read_bytes()

    key = Fernet.generate_key()
    key_file.write_bytes(key)
    try:
        # The key decrypts every backup: keep it readable by the owner only.
        key_file.chmod(0o600)
    except OSError:
        logging.warning("Could not restrict permissions of %s", key_file)
    logging.info("Neuer Backup-Verschlüsselungsschlüssel erstellt")
    return key


def _pg_cli_args(tool):
    """Build the common command line for a PostgreSQL CLI tool (pg_dump/psql)."""
    return [
        tool,
        '-h', os.getenv("POSTGRES_HOST", "postgres"),
        '-p', os.getenv("POSTGRES_PORT", "5432"),
        '-U', os.getenv("POSTGRES_USER"),
        '-d', os.getenv("POSTGRES_DB"),
        '--no-password',
    ]


def _pg_cli_env():
    """Return a copy of the environment with PGPASSWORD set for the CLI tools."""
    env = os.environ.copy()
    # Environment values must be strings; None would make subprocess raise.
    env['PGPASSWORD'] = os.getenv("POSTGRES_PASSWORD") or ""
    return env


def create_backup(backup_type="manual", created_by=None):
    """Create a compressed, encrypted dump of the database.

    The dump is produced with ``pg_dump``, gzip-compressed, encrypted with
    Fernet and written to ``BACKUP_DIR``. Progress and result are recorded
    in the ``backup_history`` table.

    Args:
        backup_type: Label stored with the backup (e.g. ``"manual"``,
            ``"scheduled"``).
        created_by: Name stored as creator; defaults to ``'system'``.

    Returns:
        ``(True, filename)`` on success, ``(False, error_message)`` on failure.
    """
    start_time = time.time()
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"backup_v2docker_{timestamp}_encrypted.sql.gz.enc"
    filepath = BACKUP_DIR / filename

    with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
        # Create the history entry first so failures are visible as well
        cur.execute("""
            INSERT INTO backup_history
                (filename, filepath, backup_type, status, created_by, is_encrypted)
            VALUES (%s, %s, %s, %s, %s, %s)
            RETURNING id
        """, (filename, str(filepath), backup_type, 'in_progress', created_by or 'system', True))
        backup_id = cur.fetchone()[0]
        conn.commit()

        try:
            # Capture the dump as raw bytes to avoid a locale-dependent
            # decode/encode round trip.
            result = subprocess.run(_pg_cli_args('pg_dump') + ['--verbose'],
                                    capture_output=True, env=_pg_cli_env())
            if result.returncode != 0:
                stderr_text = result.stderr.decode('utf-8', errors='replace')
                raise RuntimeError(f"pg_dump failed: {stderr_text}")

            compressed_data = gzip.compress(result.stdout)
            encrypted_data = Fernet(get_or_create_encryption_key()).encrypt(compressed_data)
            filepath.write_bytes(encrypted_data)

            # Collect statistics
            tables_count = _fetch_scalar(
                cur, "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'public'")
            records_count = _fetch_scalar(
                cur, "SELECT SUM(n_live_tup) FROM pg_stat_user_tables") or 0

            duration = time.time() - start_time
            filesize = filepath.stat().st_size

            cur.execute("""
                UPDATE backup_history
                SET status = %s, filesize = %s, tables_count = %s,
                    records_count = %s, duration_seconds = %s
                WHERE id = %s
            """, ('success', filesize, tables_count, records_count, duration, backup_id))
            conn.commit()

            log_audit('BACKUP', 'database', backup_id,
                      additional_info=f"Backup erstellt: {filename} ({filesize} bytes)")

            # E-mail notification (if configured)
            send_backup_notification(True, filename, filesize, duration)

            logging.info(f"Backup erfolgreich erstellt: {filename}")
            return True, filename

        except Exception as e:
            # A failed statement leaves the transaction aborted; roll back so
            # the status update below can actually be executed.
            conn.rollback()
            cur.execute("""
                UPDATE backup_history
                SET status = %s, error_message = %s, duration_seconds = %s
                WHERE id = %s
            """, ('failed', str(e), time.time() - start_time, backup_id))
            conn.commit()

            logging.error(f"Backup fehlgeschlagen: {e}")
            send_backup_notification(False, filename, error=str(e))
            return False, str(e)


def restore_backup(backup_id, encryption_key=None):
    """Restore the database from a stored backup.

    Args:
        backup_id: Primary key of the ``backup_history`` entry.
        encryption_key: Optional Fernet key as string; when omitted the key
            from :func:`get_or_create_encryption_key` is used.

    Returns:
        ``(True, message)`` on success, ``(False, error_message)`` on failure.
    """
    try:
        # The connection is closed again before psql runs, so the restore
        # does not compete with an open session of this process.
        with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
            cur.execute("""
                SELECT filename, filepath, is_encrypted
                FROM backup_history
                WHERE id = %s
            """, (backup_id,))
            backup_info = cur.fetchone()

        if not backup_info:
            raise Exception("Backup nicht gefunden")

        filename, filepath, is_encrypted = backup_info
        filepath = Path(filepath)
        if not filepath.exists():
            raise Exception("Backup-Datei nicht gefunden")

        file_data = filepath.read_bytes()

        if is_encrypted:
            key = encryption_key.encode() if encryption_key else get_or_create_encryption_key()
            try:
                compressed_data = Fernet(key).decrypt(file_data)
            except Exception as exc:
                raise Exception("Entschlüsselung fehlgeschlagen. Falsches Passwort?") from exc
        else:
            compressed_data = file_data

        dump_data = gzip.decompress(compressed_data)

        # Feed the dump to psql as bytes (same bytes pg_dump produced).
        result = subprocess.run(_pg_cli_args('psql'), input=dump_data,
                                capture_output=True, env=_pg_cli_env())
        if result.returncode != 0:
            stderr_text = result.stderr.decode('utf-8', errors='replace')
            raise Exception(f"Wiederherstellung fehlgeschlagen: {stderr_text}")

        # Audit log (uses its own, new connection)
        log_audit('RESTORE', 'database', backup_id,
                  additional_info=f"Backup wiederhergestellt: {filename}")

        return True, "Backup erfolgreich wiederhergestellt"

    except Exception as e:
        logging.error(f"Wiederherstellung fehlgeschlagen: {e}")
        return False, str(e)


def send_backup_notification(success, filename, filesize=None, duration=None, error=None):
    """Send an e-mail notification about a backup (only if EMAIL_ENABLED=true).

    Currently only logs that a notification would be sent.
    """
    if os.getenv("EMAIL_ENABLED", "false").lower() != "true":
        return

    # E-mail sending is prepared but disabled.
    # TODO: implement once an e-mail server is configured
    logging.info(f"E-Mail-Benachrichtigung vorbereitet: Backup {'erfolgreich' if success else 'fehlgeschlagen'}")


def scheduled_backup():
    """Run a scheduled backup (called by the background scheduler)."""
    logging.info("Starte geplantes Backup...")
    create_backup(backup_type="scheduled", created_by="scheduler")


# Configure the scheduler: daily at 03:00
scheduler.add_job(
    scheduled_backup,
    'cron',
    hour=3,
    minute=0,
    id='daily_backup',
    replace_existing=True
)


def get_client_ip():
    """Determine the client's IP address for the current request.

    Prefers the first entry of ``X-Forwarded-For``, then ``X-Real-IP``, then
    the socket address.

    NOTE(review): both headers are client-controlled unless a trusted
    reverse proxy overwrites them; otherwise the IP-based login blocking can
    be bypassed by sending a spoofed header.
    """
    forwarded_for = request.environ.get('HTTP_X_FORWARDED_FOR')
    if forwarded_for:
        # Entries are separated by ", "; strip so lookups match stored values.
        return forwarded_for.split(',')[0].strip()

    real_ip = request.environ.get('HTTP_X_REAL_IP')
    if real_ip:
        return real_ip

    return request.environ.get('REMOTE_ADDR')


def check_ip_blocked(ip_address):
    """Check whether an IP address is currently blocked.

    Returns:
        ``(True, blocked_until)`` if a block is active, else ``(False, None)``.
    """
    with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
        cur.execute("""
            SELECT blocked_until
            FROM login_attempts
            WHERE ip_address = %s AND blocked_until IS NOT NULL
        """, (ip_address,))
        row = cur.fetchone()

    if row and row[0] and row[0] > datetime.now():
        return True, row[0]
    return False, None


def record_failed_attempt(ip_address, username):
    """Record a failed login attempt and block the IP when the limit is hit.

    Returns:
        The randomly chosen error message to show to the user.
    """
    error_message = random.choice(FAIL_MESSAGES)

    with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
        try:
            cur.execute("""
                SELECT attempt_count
                FROM login_attempts
                WHERE ip_address = %s
            """, (ip_address,))
            row = cur.fetchone()

            if row:
                # Update the existing entry
                new_count = row[0] + 1
                blocked_until = None

                if new_count >= MAX_LOGIN_ATTEMPTS:
                    blocked_until = datetime.now() + timedelta(hours=BLOCK_DURATION_HOURS)
                    # E-mail notification (if enabled)
                    if os.getenv("EMAIL_ENABLED", "false").lower() == "true":
                        send_security_alert_email(ip_address, username, new_count)

                cur.execute("""
                    UPDATE login_attempts
                    SET attempt_count = %s,
                        last_attempt = CURRENT_TIMESTAMP,
                        blocked_until = %s,
                        last_username_tried = %s,
                        last_error_message = %s
                    WHERE ip_address = %s
                """, (new_count, blocked_until, username, error_message, ip_address))
            else:
                # Create a new entry
                cur.execute("""
                    INSERT INTO login_attempts
                        (ip_address, attempt_count, last_username_tried, last_error_message)
                    VALUES (%s, 1, %s, %s)
                """, (ip_address, username, error_message))

            conn.commit()

            log_audit('LOGIN_FAILED', 'user',
                      additional_info=f"IP: {ip_address}, User: {username}, Message: {error_message}")

        except Exception:
            logging.exception("Rate limiting error")
            conn.rollback()

    return error_message


def reset_login_attempts(ip_address):
    """Delete the recorded login attempts for an IP address."""
    with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
        try:
            cur.execute("""
                DELETE FROM login_attempts
                WHERE ip_address = %s
            """, (ip_address,))
            conn.commit()
        except Exception:
            logging.exception("Reset attempts error")
            conn.rollback()


def get_login_attempts(ip_address):
    """Return the number of recorded login attempts for an IP (0 if none)."""
    with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
        attempts = _fetch_scalar(cur, """
            SELECT attempt_count
            FROM login_attempts
            WHERE ip_address = %s
        """, (ip_address,))
    return attempts if attempts is not None else 0


def send_security_alert_email(ip_address, username, attempt_count):
    """Send a security alert by e-mail (currently only logs the alert)."""
    subject = f"⚠️ SICHERHEITSWARNUNG: {attempt_count} fehlgeschlagene Login-Versuche"
    # Prepared for the future SMTP implementation; not sent anywhere yet.
    body = f"""
    WARNUNG: Mehrere fehlgeschlagene Login-Versuche erkannt!

    IP-Adresse: {ip_address}
    Versuchter Benutzername: {username}
    Anzahl Versuche: {attempt_count}
    Zeit: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

    Die IP-Adresse wurde für {BLOCK_DURATION_HOURS} Stunden gesperrt.

    Dies ist eine automatische Nachricht vom v2-Docker Admin Panel.
    """

    # TODO: implement e-mail delivery once SMTP is configured
    logging.warning(f"Sicherheitswarnung: {attempt_count} fehlgeschlagene Versuche von IP {ip_address}")
    logging.info(f"E-Mail würde gesendet: {subject}")


def _pad_to_min_duration(start_time, minimum=1.0):
    """Sleep until at least *minimum* seconds have passed since *start_time*."""
    elapsed = time.time() - start_time
    if elapsed < minimum:
        time.sleep(minimum - elapsed)


def _credentials_match(username, password):
    """Check the submitted credentials against the admin accounts from .env.

    Missing/empty submitted values and unconfigured accounts never match;
    without this guard an unset ``ADMINx_*`` variable (None) would equal a
    missing form field (None). Comparison is done in constant time.
    """
    if not username or not password:
        return False

    given_user = username.encode('utf-8')
    given_pass = password.encode('utf-8')
    matched = False
    for prefix in ("ADMIN1", "ADMIN2"):
        expected_user = os.getenv(f"{prefix}_USERNAME")
        expected_pass = os.getenv(f"{prefix}_PASSWORD")
        if not expected_user or not expected_pass:
            continue
        user_ok = hmac.compare_digest(given_user, expected_user.encode('utf-8'))
        pass_ok = hmac.compare_digest(given_pass, expected_pass.encode('utf-8'))
        matched = matched or (user_ok and pass_ok)
    return matched


@app.route("/login", methods=["GET", "POST"])
def login():
    """Render the login form and process login attempts with rate limiting."""
    # Timing-attack protection: remember when handling started
    start_time = time.time()

    ip_address = get_client_ip()

    is_blocked, blocked_until = check_ip_blocked(ip_address)
    if is_blocked:
        time_remaining = (blocked_until - datetime.now()).total_seconds() / 3600
        error_msg = f"IP GESPERRT! Noch {time_remaining:.1f} Stunden warten."
        return render_template("login.html", error=error_msg, error_type="blocked")

    attempt_count = get_login_attempts(ip_address)

    if request.method == "POST":
        username = request.form.get("username")
        password = request.form.get("password")
        captcha_response = request.form.get("g-recaptcha-response")

        # NOTE(review): only the presence of a CAPTCHA response is checked
        # here; it is not verified against the CAPTCHA provider.
        if attempt_count >= CAPTCHA_AFTER_ATTEMPTS and not captcha_response:
            _pad_to_min_duration(start_time)
            return render_template("login.html",
                                   error="CAPTCHA ERFORDERLICH!",
                                   show_captcha=True,
                                   error_type="captcha")

        login_success = _credentials_match(username, password)

        # Timing-attack protection: every attempt takes at least one second
        _pad_to_min_duration(start_time)

        if login_success:
            session.permanent = True  # activates the session timeout
            session['logged_in'] = True
            session['username'] = username
            session['last_activity'] = datetime.now().isoformat()
            reset_login_attempts(ip_address)
            log_audit('LOGIN_SUCCESS', 'user',
                      additional_info=f"Erfolgreiche Anmeldung von IP: {ip_address}")
            return redirect(url_for('dashboard'))

        # Failed login
        error_message = record_failed_attempt(ip_address, username)
        new_attempt_count = get_login_attempts(ip_address)

        is_now_blocked, _ = check_ip_blocked(ip_address)
        if is_now_blocked:
            log_audit('LOGIN_BLOCKED', 'security',
                      additional_info=f"IP {ip_address} wurde nach {MAX_LOGIN_ATTEMPTS} Versuchen gesperrt")

        return render_template("login.html",
                               error=error_message,
                               show_captcha=(new_attempt_count >= CAPTCHA_AFTER_ATTEMPTS),
                               error_type="failed",
                               attempts_left=max(0, MAX_LOGIN_ATTEMPTS - new_attempt_count))

    # GET request
    return render_template("login.html",
                           show_captcha=(attempt_count >= CAPTCHA_AFTER_ATTEMPTS),
                           attempts_left=max(0, MAX_LOGIN_ATTEMPTS - attempt_count))


@app.route("/logout")
def logout():
    """Log the current user out and redirect to the login page."""
    # Audit first: log_audit reads the username from the session.
    log_audit('LOGOUT', 'user', additional_info="Abmeldung")
    # Clear everything (including last_activity), as the timeout path does.
    session.clear()
    return redirect(url_for('login'))


@app.route("/heartbeat", methods=['POST'])
@login_required
def heartbeat():
    """Session keep-alive endpoint: refreshes ``last_activity``."""
    session['last_activity'] = datetime.now().isoformat()
    # Force the session to be saved
    session.modified = True

    return jsonify({
        'status': 'ok',
        'last_activity': session['last_activity'],
        'username': session.get('username')
    })


@app.route("/")
@login_required
def dashboard():
    """Render the dashboard with license, backup and security statistics."""
    with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
        total_customers = _fetch_scalar(cur, "SELECT COUNT(*) FROM customers")
        total_licenses = _fetch_scalar(cur, "SELECT COUNT(*) FROM licenses")

        # Active licenses (not expired and is_active = true)
        active_licenses = _fetch_scalar(cur, """
            SELECT COUNT(*) FROM licenses
            WHERE valid_until >= CURRENT_DATE AND is_active = TRUE
        """)

        active_sessions_count = _fetch_scalar(
            cur, "SELECT COUNT(*) FROM sessions WHERE is_active = TRUE")

        expired_licenses = _fetch_scalar(cur, """
            SELECT COUNT(*) FROM licenses
            WHERE valid_until < CURRENT_DATE
        """)

        # Licenses expiring within the next 30 days
        expiring_soon = _fetch_scalar(cur, """
            SELECT COUNT(*) FROM licenses
            WHERE valid_until >= CURRENT_DATE
              AND valid_until < CURRENT_DATE + INTERVAL '30 days'
              AND is_active = TRUE
        """)

        # Test licenses vs. full versions
        cur.execute("""
            SELECT license_type, COUNT(*)
            FROM licenses
            GROUP BY license_type
        """)
        license_types = dict(cur.fetchall())

        # The 5 most recently created licenses
        cur.execute("""
            SELECT l.id, l.license_key, c.name, l.valid_until,
                   CASE
                       WHEN l.valid_until < CURRENT_DATE THEN 'abgelaufen'
                       WHEN l.valid_until < CURRENT_DATE + INTERVAL '30 days' THEN 'läuft bald ab'
                       ELSE 'aktiv'
                   END as status
            FROM licenses l
            JOIN customers c ON l.customer_id = c.id
            ORDER BY l.id DESC
            LIMIT 5
        """)
        recent_licenses = cur.fetchall()

        # Licenses expiring soon (next 30 days)
        cur.execute("""
            SELECT l.id, l.license_key, c.name, l.valid_until,
                   l.valid_until - CURRENT_DATE as days_left
            FROM licenses l
            JOIN customers c ON l.customer_id = c.id
            WHERE l.valid_until >= CURRENT_DATE
              AND l.valid_until < CURRENT_DATE + INTERVAL '30 days'
              AND l.is_active = TRUE
            ORDER BY l.valid_until
            LIMIT 10
        """)
        expiring_licenses = cur.fetchall()

        # Latest backup
        cur.execute("""
            SELECT created_at, filesize, duration_seconds, backup_type, status
            FROM backup_history
            ORDER BY created_at DESC
            LIMIT 1
        """)
        last_backup_info = cur.fetchone()

        # Security statistics: currently blocked IPs
        blocked_ips_count = _fetch_scalar(cur, """
            SELECT COUNT(*)
            FROM login_attempts
            WHERE blocked_until IS NOT NULL AND blocked_until > CURRENT_TIMESTAMP
        """)

        # Failed attempts today
        failed_attempts_today = _fetch_scalar(cur, """
            SELECT COALESCE(SUM(attempt_count), 0)
            FROM login_attempts
            WHERE last_attempt::date = CURRENT_DATE
        """)

        # The 5 most recent security events
        cur.execute("""
            SELECT la.ip_address, la.attempt_count, la.last_attempt, la.blocked_until,
                   la.last_username_tried, la.last_error_message
            FROM login_attempts la
            ORDER BY la.last_attempt DESC
            LIMIT 5
        """)
        security_rows = cur.fetchall()

    now = datetime.now()
    recent_security_events = [
        {
            'ip_address': ip,
            'attempt_count': attempts,
            # last_attempt may be NULL; do not crash the dashboard on it
            'last_attempt': last_attempt.strftime('%d.%m %H:%M') if last_attempt else None,
            'blocked_until': (blocked_until.strftime('%d.%m %H:%M')
                              if blocked_until and blocked_until > now else None),
            'username_tried': username_tried,
            'error_message': error_message,
        }
        for ip, attempts, last_attempt, blocked_until, username_tried, error_message in security_rows
    ]

    # Derive the security level
    if blocked_ips_count > 5 or failed_attempts_today > 50:
        security_level, security_level_text = 'danger', 'KRITISCH'
    elif blocked_ips_count > 2 or failed_attempts_today > 20:
        security_level, security_level_text = 'warning', 'ERHÖHT'
    else:
        security_level, security_level_text = 'success', 'NORMAL'

    stats = {
        'total_customers': total_customers,
        'total_licenses': total_licenses,
        'active_licenses': active_licenses,
        'expired_licenses': expired_licenses,
        'expiring_soon': expiring_soon,
        'full_licenses': license_types.get('full', 0),
        'test_licenses': license_types.get('test', 0),
        'recent_licenses': recent_licenses,
        'expiring_licenses': expiring_licenses,
        'active_sessions': active_sessions_count,
        'last_backup': last_backup_info,
        # Security statistics
        'blocked_ips_count': blocked_ips_count,
        'failed_attempts_today': failed_attempts_today,
        'recent_security_events': recent_security_events,
        'security_level': security_level,
        'security_level_text': security_level_text
    }

    return render_template("dashboard.html", stats=stats, username=session.get('username'))


@app.route("/create", methods=["GET", "POST"])
@login_required
def create_license():
    """Show the creation form (GET) or create a license with its customer (POST)."""
    if request.method == "POST":
        name = request.form["customer_name"]
        email = request.form["email"]
        license_key = request.form["license_key"]
        license_type = request.form["license_type"]
        valid_from = request.form["valid_from"]
        valid_until = request.form["valid_until"]

        with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
            # Insert the customer only if not already present, so repeated
            # license creation does not duplicate customer rows.
            customer_id = _fetch_scalar(cur, """
                SELECT id FROM customers
                WHERE name = %s AND email = %s
                ORDER BY id
                LIMIT 1
            """, (name, email))
            if customer_id is None:
                customer_id = _fetch_scalar(cur, """
                    INSERT INTO customers (name, email, created_at)
                    VALUES (%s, %s, NOW())
                    RETURNING id
                """, (name, email))

            license_id = _fetch_scalar(cur, """
                INSERT INTO licenses
                    (license_key, customer_id, license_type, valid_from, valid_until, is_active)
                VALUES (%s, %s, %s, %s, %s, TRUE)
                RETURNING id
            """, (license_key, customer_id, license_type, valid_from, valid_until))

            # Closing the connection without commit discards the transaction,
            # so a failure above leaves no half-created data behind.
            conn.commit()

        log_audit('CREATE', 'license', license_id,
                  new_values={
                      'license_key': license_key,
                      'customer_name': name,
                      'customer_email': email,
                      'license_type': license_type,
                      'valid_from': valid_from,
                      'valid_until': valid_until
                  })

        return redirect("/create")

    return render_template("index.html", username=session.get('username'))


@app.route("/licenses")
@login_required
def licenses():
    """List licenses with optional search, type/status filters and pagination."""
    search = request.args.get('search', '').strip()
    filter_type = request.args.get('type', '')
    filter_status = request.args.get('status', '')
    # Pages below 1 would yield a negative OFFSET and a SQL error.
    page = max(1, request.args.get('page', 1, type=int))
    per_page = 20

    query = """
        SELECT l.id, l.license_key, c.name, c.email, l.license_type,
               l.valid_from, l.valid_until, l.is_active,
               CASE
                   WHEN l.valid_until < CURRENT_DATE THEN 'abgelaufen'
                   WHEN l.valid_until < CURRENT_DATE + INTERVAL '30 days' THEN 'läuft bald ab'
                   ELSE 'aktiv'
               END as status
        FROM licenses l
        JOIN customers c ON l.customer_id = c.id
        WHERE 1=1
    """
    params = []

    # Search filter
    if search:
        query += """
            AND (LOWER(l.license_key) LIKE LOWER(%s)
                 OR LOWER(c.name) LIKE LOWER(%s)
                 OR LOWER(c.email) LIKE LOWER(%s))
        """
        search_param = f'%{search}%'
        params.extend([search_param] * 3)

    # Type filter
    if filter_type:
        query += " AND l.license_type = %s"
        params.append(filter_type)

    # Status filter (fixed SQL fragments, no user input is interpolated)
    status_clauses = {
        'active': " AND l.valid_until >= CURRENT_DATE AND l.is_active = TRUE",
        'expiring': (" AND l.valid_until >= CURRENT_DATE"
                     " AND l.valid_until < CURRENT_DATE + INTERVAL '30 days'"
                     " AND l.is_active = TRUE"),
        'expired': " AND l.valid_until < CURRENT_DATE",
        'inactive': " AND l.is_active = FALSE",
    }
    query += status_clauses.get(filter_status, "")

    with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
        # Total count for pagination
        total = _fetch_scalar(cur, "SELECT COUNT(*) FROM (" + query + ") as count_table", params)

        offset = (page - 1) * per_page
        cur.execute(query + " ORDER BY l.valid_until DESC LIMIT %s OFFSET %s",
                    params + [per_page, offset])
        license_rows = cur.fetchall()

    total_pages = (total + per_page - 1) // per_page

    return render_template("licenses.html",
                           licenses=license_rows,
                           search=search,
                           filter_type=filter_type,
                           filter_status=filter_status,
                           page=page,
                           total_pages=total_pages,
                           total=total,
                           username=session.get('username'))


# The view needs the id from the URL; without the converter Flask would call
# it without arguments and fail with a TypeError.
@app.route("/license/edit/<int:license_id>", methods=["GET", "POST"])
@login_required
def edit_license(license_id):
    """Show the edit form for a license (GET) or update it (POST)."""
    with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
        if request.method == "POST":
            # Fetch the old values for the audit log
            cur.execute("""
                SELECT license_key, license_type, valid_from, valid_until, is_active
                FROM licenses WHERE id = %s
            """, (license_id,))
            old_license = cur.fetchone()
            if old_license is None:
                # Nothing to update (license was deleted in the meantime)
                return redirect("/licenses")

            license_key = request.form["license_key"]
            license_type = request.form["license_type"]
            valid_from = request.form["valid_from"]
            valid_until = request.form["valid_until"]
            is_active = request.form.get("is_active") == "on"

            cur.execute("""
                UPDATE licenses
                SET license_key = %s, license_type = %s, valid_from = %s,
                    valid_until = %s, is_active = %s
                WHERE id = %s
            """, (license_key, license_type, valid_from, valid_until, is_active, license_id))
            conn.commit()

            log_audit('UPDATE', 'license', license_id,
                      old_values={
                          'license_key': old_license[0],
                          'license_type': old_license[1],
                          'valid_from': str(old_license[2]),
                          'valid_until': str(old_license[3]),
                          'is_active': old_license[4]
                      },
                      new_values={
                          'license_key': license_key,
                          'license_type': license_type,
                          'valid_from': valid_from,
                          'valid_until': valid_until,
                          'is_active': is_active
                      })

            return redirect("/licenses")

        # GET: load the license data
        cur.execute("""
            SELECT l.id, l.license_key, c.name, c.email, l.license_type,
                   l.valid_from, l.valid_until, l.is_active, c.id
            FROM licenses l
            JOIN customers c ON l.customer_id = c.id
            WHERE l.id = %s
        """, (license_id,))
        license_row = cur.fetchone()

    if not license_row:
        return redirect("/licenses")

    return render_template("edit_license.html", license=license_row, username=session.get('username'))


@app.route("/license/delete/<int:license_id>", methods=["POST"])
@login_required
def delete_license(license_id):
    """Delete a license and record the deletion in the audit log."""
    with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
        # Fetch the license details for the audit log
        cur.execute("""
            SELECT l.license_key, c.name, l.license_type
            FROM licenses l
            JOIN customers c ON l.customer_id = c.id
            WHERE l.id = %s
        """, (license_id,))
        license_info = cur.fetchone()

        cur.execute("DELETE FROM licenses WHERE id = %s", (license_id,))
        conn.commit()

    if license_info:
        log_audit('DELETE', 'license', license_id,
                  old_values={
                      'license_key': license_info[0],
                      'customer_name': license_info[1],
                      'license_type': license_info[2]
                  })

    return redirect("/licenses")


@app.route("/customers")
@login_required
def customers():
    """List customers with license counts, optional search and pagination."""
    search = request.args.get('search', '').strip()
    # Pages below 1 would yield a negative OFFSET and a SQL error.
    page = max(1, request.args.get('page', 1, type=int))
    per_page = 20

    # The same filter is used by the count query and the listing query.
    where_clause = ""
    params = []
    if search:
        where_clause = "WHERE LOWER(c.name) LIKE LOWER(%s) OR LOWER(c.email) LIKE LOWER(%s)"
        search_param = f'%{search}%'
        params = [search_param, search_param]

    list_query = f"""
        SELECT c.id, c.name, c.email, c.created_at,
               COUNT(l.id) as license_count,
               COUNT(CASE WHEN l.is_active = TRUE AND l.valid_until >= CURRENT_DATE
                          THEN 1 END) as active_licenses
        FROM customers c
        LEFT JOIN licenses l ON c.id = l.customer_id
        {where_clause}
        GROUP BY c.id, c.name, c.email, c.created_at
        ORDER BY c.created_at DESC
        LIMIT %s OFFSET %s
    """

    with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
        # Total count for pagination (the filter only touches customer
        # columns, so no join is needed here)
        total = _fetch_scalar(cur, f"SELECT COUNT(*) FROM customers c {where_clause}",
                              params or None)

        offset = (page - 1) * per_page
        cur.execute(list_query, params + [per_page, offset])
        customer_rows = cur.fetchall()

    total_pages = (total + per_page - 1) // per_page

    return render_template("customers.html",
                           customers=customer_rows,
                           search=search,
                           page=page,
                           total_pages=total_pages,
                           total=total,
                           username=session.get('username'))


@app.route("/customer/edit/<int:customer_id>", methods=["GET", "POST"])
@login_required
def edit_customer(customer_id):
    """Show the edit form for a customer (GET) or update it (POST)."""
    with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
        if request.method == "POST":
            # Fetch the old values for the audit log
            cur.execute("SELECT name, email FROM customers WHERE id = %s", (customer_id,))
            old_customer = cur.fetchone()
            if old_customer is None:
                # Nothing to update (customer was deleted in the meantime)
                return redirect("/customers")

            name = request.form["name"]
            email = request.form["email"]

            cur.execute("""
                UPDATE customers
                SET name = %s, email = %s
                WHERE id = %s
            """, (name, email, customer_id))
            conn.commit()

            log_audit('UPDATE', 'customer', customer_id,
                      old_values={
                          'name': old_customer[0],
                          'email': old_customer[1]
                      },
                      new_values={
                          'name': name,
                          'email': email
                      })

            return redirect("/customers")

        # GET: load the customer and its licenses
        cur.execute("""
            SELECT id, name, email, created_at
            FROM customers
            WHERE id = %s
        """, (customer_id,))
        customer = cur.fetchone()

        cur.execute("""
            SELECT id, license_key, license_type, valid_from, valid_until, is_active
            FROM licenses
            WHERE customer_id = %s
            ORDER BY valid_until DESC
        """, (customer_id,))
        customer_licenses = cur.fetchall()

    if not customer:
        return redirect("/customers")

    return render_template("edit_customer.html",
                           customer=customer,
                           licenses=customer_licenses,
                           username=session.get('username'))


@app.route("/customer/delete/<int:customer_id>", methods=["POST"])
@login_required
def delete_customer(customer_id):
    """Delete a customer, but only if it has no licenses."""
    with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
        license_count = _fetch_scalar(
            cur, "SELECT COUNT(*) FROM licenses WHERE customer_id = %s", (customer_id,))
        if license_count > 0:
            # Customer still has licenses: do not delete
            return redirect("/customers")

        # Fetch the customer details for the audit log
        cur.execute("SELECT name, email FROM customers WHERE id = %s", (customer_id,))
        customer_info = cur.fetchone()

        cur.execute("DELETE FROM customers WHERE id = %s", (customer_id,))
        conn.commit()

    if customer_info:
        log_audit('DELETE', 'customer', customer_id,
                  old_values={
                      'name': customer_info[0],
                      'email': customer_info[1]
                  })

    return redirect("/customers")


@app.route("/sessions")
@login_required
def sessions():
    """Show active license sessions and those ended within the last 24 hours."""
    with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
        # Active sessions
        cur.execute("""
            SELECT s.id, s.session_id, l.license_key, c.name, s.ip_address,
                   s.user_agent, s.started_at, s.last_heartbeat,
                   EXTRACT(EPOCH FROM (NOW() - s.last_heartbeat))/60 as minutes_inactive
            FROM sessions s
            JOIN licenses l ON s.license_id = l.id
            JOIN customers c ON l.customer_id = c.id
            WHERE s.is_active = TRUE
            ORDER BY s.last_heartbeat DESC
        """)
        active_sessions = cur.fetchall()

        # Inactive sessions of the last 24 hours
        cur.execute("""
            SELECT s.id, s.session_id, l.license_key, c.name, s.ip_address,
                   s.started_at, s.ended_at,
                   EXTRACT(EPOCH FROM (s.ended_at - s.started_at))/60 as duration_minutes
            FROM sessions s
            JOIN licenses l ON s.license_id = l.id
            JOIN customers c ON l.customer_id = c.id
            WHERE s.is_active = FALSE
              AND s.ended_at > NOW() - INTERVAL '24 hours'
            ORDER BY s.ended_at DESC
            LIMIT 50
        """)
        recent_sessions = cur.fetchall()

    return render_template("sessions.html",
                           active_sessions=active_sessions,
                           recent_sessions=recent_sessions,
                           username=session.get('username'))


@app.route("/session/end/<int:session_id>", methods=["POST"])
@login_required
def end_session(session_id):
    """Mark an active license session as ended."""
    with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
        cur.execute("""
            UPDATE sessions
            SET is_active = FALSE, ended_at = NOW()
            WHERE id = %s AND is_active = TRUE
        """, (session_id,))
        conn.commit()

    return redirect("/sessions")


@app.route("/export/licenses")
@login_required
def export_licenses():
    """Export all licenses as CSV (``?format=csv``) or Excel (default)."""
    with closing(get_connection()) as conn, closing(conn.cursor()) as cur:
        # All licenses with customer information
        cur.execute("""
            SELECT l.id, l.license_key, c.name as customer_name, c.email as customer_email,
                   l.license_type, l.valid_from, l.valid_until, l.is_active,
                   CASE
                       WHEN l.valid_until < CURRENT_DATE THEN 'Abgelaufen'
                       WHEN l.valid_until < CURRENT_DATE + INTERVAL '30 days' THEN 'Läuft bald ab'
                       ELSE 'Aktiv'
                   END as status
            FROM licenses l
            JOIN customers c ON l.customer_id = c.id
            ORDER BY l.id
        """)
        data = cur.fetchall()

    # Column headers of the export
    columns = ['ID', 'Lizenzschlüssel', 'Kunde', 'E-Mail', 'Typ',
               'Gültig von', 'Gültig bis', 'Aktiv', 'Status']
    df = pd.DataFrame(data, columns=columns)

    # Date formatting
    for date_column in ('Gültig von', 'Gültig bis'):
        df[date_column] = pd.to_datetime(df[date_column]).dt.strftime('%d.%m.%Y')

    # Human-readable type and active flag
    df['Typ'] = df['Typ'].replace({'full': 'Vollversion', 'test': 'Testversion'})
    df['Aktiv'] = df['Aktiv'].replace({True: 'Ja', False: 'Nein'})

    export_format = request.args.get('format', 'excel')

    log_audit('EXPORT', 'license',
              additional_info=f"Export aller Lizenzen als {export_format.upper()}")

    filename = f'lizenzen_export_{datetime.now().strftime("%Y%m%d_%H%M%S")}'

    if export_format == 'csv':
        # CSV export; the utf-8-sig BOM is added once when encoding to bytes
        csv_text = df.to_csv(index=False, sep=';')
        return send_file(
            io.BytesIO(csv_text.encode('utf-8-sig')),
            mimetype='text/csv',
            as_attachment=True,
            download_name=f'{filename}.csv'
        )

    # Excel export
    output = io.BytesIO()
    with pd.ExcelWriter(output, engine='openpyxl') as writer:
        df.to_excel(writer, sheet_name='Lizenzen', index=False)

        # Fit each column width to its longest cell (capped at 50)
        worksheet = writer.sheets['Lizenzen']
        for column in worksheet.columns:
            column_letter = column[0].column_letter
            max_length = max((len(str(cell.value)) for cell in column), default=0)
            worksheet.column_dimensions[column_letter].width = min(max_length + 2, 50)

    output.seek(0)
    return send_file(
        output,
        mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        as_attachment=True,
        download_name=f'{filename}.xlsx'
    )


@app.route("/export/customers")
@login_required
def export_customers():
    conn = get_connection()
    cur = conn.cursor()

    # All customers with license statistics
    cur.execute("""
        SELECT c.id, c.name, c.email, c.created_at,
               COUNT(l.id) as total_licenses,
               COUNT(CASE WHEN l.is_active = TRUE AND l.valid_until >= CURRENT_DATE THEN 1 END) as active_licenses,
               COUNT(CASE WHEN l.valid_until < CURRENT_DATE THEN 1 END) as expired_licenses
        FROM customers c
        LEFT JOIN licenses l ON c.id = l.customer_id
        GROUP BY c.id, c.name, c.email, c.created_at
        ORDER BY c.id
    """)

    # Column headers
    columns = ['ID', 'Name', 'E-Mail', 'Erstellt am', 'Lizenzen gesamt', 'Aktive Lizenzen', 'Abgelaufene Lizenzen']

    # Load the data into a DataFrame
    data = cur.fetchall()
    df = pd.DataFrame(data, columns=columns)

    # Date formatting
    df['Erstellt am'] = pd.to_datetime(df['Erstellt am']).dt.strftime('%d.%m.%Y %H:%M')

    cur.close()
    conn.close()

    # Export format
    export_format = request.args.get('format', 'excel')

    # Audit log
    log_audit('EXPORT', 'customer',
              additional_info=f"Export aller Kunden als {export_format.upper()}")

    filename = f'kunden_export_{datetime.now().strftime("%Y%m%d_%H%M%S")}'

    if export_format == 'csv':
        # CSV export
        output = io.StringIO()
        df.to_csv(output, index=False, encoding='utf-8-sig', sep=';')
        output.seek(0)
        return send_file(
            io.BytesIO(output.getvalue().encode('utf-8-sig')),
            mimetype='text/csv',
            as_attachment=True,
            download_name=f'{filename}.csv'
        )
    else:
        # Excel export
        output = io.BytesIO()
with pd.ExcelWriter(output, engine='openpyxl') as writer: df.to_excel(writer, sheet_name='Kunden', index=False) # Formatierung worksheet = writer.sheets['Kunden'] for column in worksheet.columns: max_length = 0 column_letter = column[0].column_letter for cell in column: try: if len(str(cell.value)) > max_length: max_length = len(str(cell.value)) except: pass adjusted_width = min(max_length + 2, 50) worksheet.column_dimensions[column_letter].width = adjusted_width output.seek(0) return send_file( output, mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', as_attachment=True, download_name=f'{filename}.xlsx' ) @app.route("/audit") @login_required def audit_log(): conn = get_connection() cur = conn.cursor() # Parameter filter_user = request.args.get('user', '').strip() filter_action = request.args.get('action', '').strip() filter_entity = request.args.get('entity', '').strip() page = request.args.get('page', 1, type=int) per_page = 50 # SQL Query mit optionalen Filtern query = """ SELECT id, timestamp, username, action, entity_type, entity_id, old_values, new_values, ip_address, user_agent, additional_info FROM audit_log WHERE 1=1 """ params = [] # Filter if filter_user: query += " AND LOWER(username) LIKE LOWER(%s)" params.append(f'%{filter_user}%') if filter_action: query += " AND action = %s" params.append(filter_action) if filter_entity: query += " AND entity_type = %s" params.append(filter_entity) # Gesamtanzahl für Pagination count_query = "SELECT COUNT(*) FROM (" + query + ") as count_table" cur.execute(count_query, params) total = cur.fetchone()[0] # Pagination offset = (page - 1) * per_page query += " ORDER BY timestamp DESC LIMIT %s OFFSET %s" params.extend([per_page, offset]) cur.execute(query, params) logs = cur.fetchall() # JSON-Werte parsen parsed_logs = [] for log in logs: parsed_log = list(log) # old_values und new_values sind bereits Dictionaries (JSONB) # Keine Konvertierung nötig parsed_logs.append(parsed_log) # Pagination 
Info total_pages = (total + per_page - 1) // per_page cur.close() conn.close() return render_template("audit_log.html", logs=parsed_logs, filter_user=filter_user, filter_action=filter_action, filter_entity=filter_entity, page=page, total_pages=total_pages, total=total, username=session.get('username')) @app.route("/backups") @login_required def backups(): """Zeigt die Backup-Historie an""" conn = get_connection() cur = conn.cursor() # Letztes erfolgreiches Backup für Dashboard cur.execute(""" SELECT created_at, filesize, duration_seconds FROM backup_history WHERE status = 'success' ORDER BY created_at DESC LIMIT 1 """) last_backup = cur.fetchone() # Alle Backups abrufen cur.execute(""" SELECT id, filename, filesize, backup_type, status, error_message, created_at, created_by, tables_count, records_count, duration_seconds, is_encrypted FROM backup_history ORDER BY created_at DESC """) backups = cur.fetchall() cur.close() conn.close() return render_template("backups.html", backups=backups, last_backup=last_backup, username=session.get('username')) @app.route("/backup/create", methods=["POST"]) @login_required def create_backup_route(): """Erstellt ein manuelles Backup""" username = session.get('username') success, result = create_backup(backup_type="manual", created_by=username) if success: return jsonify({ 'success': True, 'message': f'Backup erfolgreich erstellt: {result}' }) else: return jsonify({ 'success': False, 'message': f'Backup fehlgeschlagen: {result}' }), 500 @app.route("/backup/restore/", methods=["POST"]) @login_required def restore_backup_route(backup_id): """Stellt ein Backup wieder her""" encryption_key = request.form.get('encryption_key') success, message = restore_backup(backup_id, encryption_key) if success: return jsonify({ 'success': True, 'message': message }) else: return jsonify({ 'success': False, 'message': message }), 500 @app.route("/backup/download/") @login_required def download_backup(backup_id): """Lädt eine Backup-Datei herunter""" 
conn = get_connection() cur = conn.cursor() cur.execute(""" SELECT filename, filepath FROM backup_history WHERE id = %s """, (backup_id,)) backup_info = cur.fetchone() cur.close() conn.close() if not backup_info: return "Backup nicht gefunden", 404 filename, filepath = backup_info filepath = Path(filepath) if not filepath.exists(): return "Backup-Datei nicht gefunden", 404 # Audit-Log log_audit('DOWNLOAD', 'backup', backup_id, additional_info=f"Backup heruntergeladen: {filename}") return send_file(filepath, as_attachment=True, download_name=filename) @app.route("/security/blocked-ips") @login_required def blocked_ips(): """Zeigt alle gesperrten IPs an""" conn = get_connection() cur = conn.cursor() cur.execute(""" SELECT ip_address, attempt_count, first_attempt, last_attempt, blocked_until, last_username_tried, last_error_message FROM login_attempts WHERE blocked_until IS NOT NULL ORDER BY blocked_until DESC """) blocked_ips_list = [] for ip in cur.fetchall(): blocked_ips_list.append({ 'ip_address': ip[0], 'attempt_count': ip[1], 'first_attempt': ip[2].strftime('%d.%m.%Y %H:%M'), 'last_attempt': ip[3].strftime('%d.%m.%Y %H:%M'), 'blocked_until': ip[4].strftime('%d.%m.%Y %H:%M'), 'is_active': ip[4] > datetime.now(), 'last_username': ip[5], 'last_error': ip[6] }) cur.close() conn.close() return render_template("blocked_ips.html", blocked_ips=blocked_ips_list, username=session.get('username')) @app.route("/security/unblock-ip", methods=["POST"]) @login_required def unblock_ip(): """Entsperrt eine IP-Adresse""" ip_address = request.form.get('ip_address') if ip_address: conn = get_connection() cur = conn.cursor() cur.execute(""" UPDATE login_attempts SET blocked_until = NULL WHERE ip_address = %s """, (ip_address,)) conn.commit() cur.close() conn.close() # Audit-Log log_audit('UNBLOCK_IP', 'security', additional_info=f"IP {ip_address} manuell entsperrt") return redirect(url_for('blocked_ips')) @app.route("/security/clear-attempts", methods=["POST"]) @login_required def 
clear_attempts(): """Löscht alle Login-Versuche für eine IP""" ip_address = request.form.get('ip_address') if ip_address: reset_login_attempts(ip_address) # Audit-Log log_audit('CLEAR_ATTEMPTS', 'security', additional_info=f"Login-Versuche für IP {ip_address} zurückgesetzt") return redirect(url_for('blocked_ips')) if __name__ == "__main__": app.run(host="0.0.0.0", port=5000)