1737 Zeilen
56 KiB
Python
1737 Zeilen
56 KiB
Python
import os
|
|
import psycopg2
|
|
from psycopg2.extras import Json
|
|
from flask import Flask, render_template, request, redirect, session, url_for, send_file, jsonify, flash
|
|
from flask_session import Session
|
|
from functools import wraps
|
|
from dotenv import load_dotenv
|
|
import pandas as pd
|
|
from datetime import datetime, timedelta
|
|
import io
|
|
import subprocess
|
|
import gzip
|
|
from cryptography.fernet import Fernet
|
|
from pathlib import Path
|
|
import time
|
|
from apscheduler.schedulers.background import BackgroundScheduler
|
|
import logging
|
|
import random
|
|
import hashlib
|
|
import requests
|
|
|
|
# Load settings from a local .env file into the process environment.
load_dotenv()

# ---------------------------------------------------------------------------
# Flask application and server-side session configuration
# ---------------------------------------------------------------------------
app = Flask(__name__)
# NOTE(review): a fresh random key is generated on every process start, so the
# key differs between restarts and between worker processes - confirm this is
# intended for the deployment model in use.
app.config['SECRET_KEY'] = os.urandom(24)
app.config['SESSION_TYPE'] = 'filesystem'
app.config['JSON_AS_ASCII'] = False  # emit JSON as UTF-8 instead of ASCII escapes
app.config['JSONIFY_MIMETYPE'] = 'application/json; charset=utf-8'
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(minutes=5)  # 5 minute session timeout
app.config['SESSION_COOKIE_HTTPONLY'] = True
app.config['SESSION_COOKIE_SECURE'] = False  # to be set to True under HTTPS (internally HTTP is used)
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'
app.config['SESSION_COOKIE_NAME'] = 'admin_session'
# IMPORTANT: the session cookie itself should also expire after 5 minutes,
# so it must not be refreshed on every request.
app.config['SESSION_REFRESH_EACH_REQUEST'] = False
Session(app)

# ---------------------------------------------------------------------------
# Backup configuration
# ---------------------------------------------------------------------------
BACKUP_DIR = Path("/app/backups")
# parents=True so that start-up does not fail when /app does not exist yet.
BACKUP_DIR.mkdir(parents=True, exist_ok=True)

# ---------------------------------------------------------------------------
# Rate-limiting configuration
# ---------------------------------------------------------------------------
# One of these messages is picked at random for every failed login.
FAIL_MESSAGES = [
    "NOPE!",
    "ACCESS DENIED, TRY HARDER",
    "WRONG! 🚫",
    "COMPUTER SAYS NO",
    "YOU FAILED",
]

MAX_LOGIN_ATTEMPTS = 5        # failed attempts before an IP is blocked
BLOCK_DURATION_HOURS = 24     # how long a blocked IP stays blocked
CAPTCHA_AFTER_ATTEMPTS = 2    # failed attempts before a CAPTCHA is required

# ---------------------------------------------------------------------------
# Scheduler for automatic backups
# ---------------------------------------------------------------------------
scheduler = BackgroundScheduler()
scheduler.start()

# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
logging.basicConfig(level=logging.INFO)
|
|
|
|
# Login decorator
def login_required(f):
    """Decorate a view so that it is only reachable with a live login session.

    The wrapped view is executed only when the session contains ``logged_in``.
    If the session also carries a ``last_activity`` ISO timestamp that is more
    than five minutes old, the session is cleared, an ``AUTO_LOGOUT`` audit
    entry is attempted, a warning is flashed and the client is redirected to
    the login page.

    ``last_activity`` is deliberately NOT refreshed here; only the heartbeat
    endpoint updates it.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'logged_in' not in session:
            return redirect(url_for('login'))

        # Check whether the session has expired.
        if 'last_activity' in session:
            last_activity = datetime.fromisoformat(session['last_activity'])
            time_since_activity = datetime.now() - last_activity

            # Debug logging
            app.logger.info(f"Session check for {session.get('username', 'unknown')}: "
                            f"Last activity: {last_activity}, "
                            f"Time since: {time_since_activity.total_seconds()} seconds")

            if time_since_activity > timedelta(minutes=5):
                # Session expired - log the user out.
                username = session.get('username', 'unbekannt')
                app.logger.info(f"Session timeout for user {username} - auto logout")

                # The audit entry must be written BEFORE session.clear(),
                # because the audit logger reads the username from the session.
                try:
                    log_audit('AUTO_LOGOUT', 'session',
                              additional_info={'reason': 'Session timeout (5 minutes)', 'username': username})
                except Exception:
                    # Best effort: a failing audit log must not block the
                    # logout, but it should not vanish silently either.
                    app.logger.exception("Audit log for AUTO_LOGOUT failed")

                session.clear()
                flash('Ihre Sitzung ist abgelaufen. Bitte melden Sie sich erneut an.', 'warning')
                return redirect(url_for('login'))

        # Activity is NOT refreshed automatically here; only explicit user
        # activity (reported via the heartbeat endpoint) updates it.
        return f(*args, **kwargs)

    return decorated_function
|
|
|
|
# Database connection with UTF-8 encoding
def get_connection():
    """Open and return a new psycopg2 connection configured for UTF-8.

    Connection parameters are read from the environment variables
    ``POSTGRES_HOST`` (default ``postgres``), ``POSTGRES_PORT`` (default
    ``5432``), ``POSTGRES_DB``, ``POSTGRES_USER`` and ``POSTGRES_PASSWORD``.

    The caller owns the returned connection and is responsible for closing it.
    """
    conn = psycopg2.connect(
        host=os.getenv("POSTGRES_HOST", "postgres"),
        port=os.getenv("POSTGRES_PORT", "5432"),
        dbname=os.getenv("POSTGRES_DB"),
        user=os.getenv("POSTGRES_USER"),
        password=os.getenv("POSTGRES_PASSWORD"),
        options='-c client_encoding=UTF8',
    )
    conn.set_client_encoding('UTF8')
    return conn
|
|
|
|
# Audit log function
def log_audit(action, entity_type, entity_id=None, old_values=None, new_values=None, additional_info=None):
    """Write one row to the ``audit_log`` table.

    Args:
        action: Short action identifier, e.g. ``'CREATE'`` or ``'LOGIN_FAILED'``.
        entity_type: Kind of object the action refers to, e.g. ``'license'``.
        entity_id: Optional primary key of the affected object.
        old_values: Optional dict with the state before the change
            (stored via psycopg2 ``Json``).
        new_values: Optional dict with the state after the change
            (stored via psycopg2 ``Json``).
        additional_info: Optional free-form information. Strings are stored
            as given; dicts and lists are serialised to a JSON string first,
            because psycopg2 cannot adapt a plain dict as a query parameter.

    Database errors while inserting are logged and swallowed: auditing is
    best effort and must not break the calling operation.
    """
    # Local imports: these names are not part of the file's import block.
    import json
    from flask import has_request_context

    conn = get_connection()
    cur = conn.cursor()

    try:
        # Outside of a request (e.g. the scheduled backup job) neither
        # ``session`` nor ``request`` may be touched; fall back to 'system'.
        if has_request_context():
            username = session.get('username', 'system')
            ip_address = request.remote_addr
            user_agent = request.headers.get('User-Agent')
        else:
            username = 'system'
            ip_address = None
            user_agent = None

        # Wrap dictionaries so that psycopg2 sends them as JSON.
        old_json = Json(old_values) if old_values else None
        new_json = Json(new_values) if new_values else None

        # Callers pass either a plain string or a dict here; a raw dict would
        # make the INSERT fail, so turn structured data into JSON text.
        if isinstance(additional_info, (dict, list)):
            additional_info = json.dumps(additional_info, ensure_ascii=False)

        cur.execute("""
            INSERT INTO audit_log
            (username, action, entity_type, entity_id, old_values, new_values,
             ip_address, user_agent, additional_info)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        """, (username, action, entity_type, entity_id, old_json, new_json,
              ip_address, user_agent, additional_info))

        conn.commit()
    except Exception as e:
        logging.error(f"Audit log error: {e}")
        conn.rollback()
    finally:
        cur.close()
        conn.close()
|
|
|
|
# Encryption helpers
def get_or_create_encryption_key():
    """Return the Fernet key used for backup encryption, creating one if needed.

    Lookup order:
      1. ``BACKUP_ENCRYPTION_KEY`` environment variable, if it is a valid
         Fernet key.
      2. The key file ``.backup_key`` inside ``BACKUP_DIR``.
      3. A newly generated key, which is written to that key file.

    Returns:
        The key as ``bytes``.
    """
    key_file = BACKUP_DIR / ".backup_key"

    # Try the environment variable first.
    env_key = os.getenv("BACKUP_ENCRYPTION_KEY")
    if env_key:
        try:
            # Constructing a Fernet instance validates the key format.
            Fernet(env_key.encode())
        except Exception:
            # Invalid key: fall through to the key file instead of failing,
            # but leave a trace so the misconfiguration can be found.
            logging.warning("BACKUP_ENCRYPTION_KEY ist ungültig - verwende Schlüsseldatei")
        else:
            return env_key.encode()

    # No valid key in the environment: use the key file if present.
    if key_file.exists():
        return key_file.read_bytes()

    # Create a new key and persist it.
    key = Fernet.generate_key()
    key_file.write_bytes(key)
    try:
        # The key protects every backup, so keep it readable by the owner only.
        key_file.chmod(0o600)
    except OSError as e:
        logging.warning(f"Konnte Berechtigungen der Schlüsseldatei nicht setzen: {e}")
    logging.info("Neuer Backup-Verschlüsselungsschlüssel erstellt")
    return key
|
|
|
|
# Backup functions
def create_backup(backup_type="manual", created_by=None):
    """Create a compressed, encrypted dump of the database.

    Steps: register an ``in_progress`` row in ``backup_history``, run
    ``pg_dump``, gzip the output, encrypt it with Fernet, write it into
    ``BACKUP_DIR`` and finally update the history row with size, table count,
    record count and duration.

    Args:
        backup_type: Label stored in ``backup_history.backup_type``
            (e.g. ``"manual"`` or ``"scheduled"``).
        created_by: Name stored in ``backup_history.created_by``;
            ``'system'`` is used when omitted.

    Returns:
        ``(True, filename)`` on success, ``(False, error_message)`` on failure.
    """
    start_time = time.time()
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"backup_v2docker_{timestamp}_encrypted.sql.gz.enc"
    filepath = BACKUP_DIR / filename

    conn = get_connection()
    cur = conn.cursor()

    # The outer try/finally guarantees that cursor and connection are closed
    # even if registering the backup row itself fails.
    try:
        # Register the backup.
        cur.execute("""
            INSERT INTO backup_history
            (filename, filepath, backup_type, status, created_by, is_encrypted)
            VALUES (%s, %s, %s, %s, %s, %s)
            RETURNING id
        """, (filename, str(filepath), backup_type, 'in_progress',
              created_by or 'system', True))
        backup_id = cur.fetchone()[0]
        conn.commit()

        try:
            # Build the pg_dump command line.
            dump_command = [
                'pg_dump',
                '-h', os.getenv("POSTGRES_HOST", "postgres"),
                '-p', os.getenv("POSTGRES_PORT", "5432"),
                '-U', os.getenv("POSTGRES_USER"),
                '-d', os.getenv("POSTGRES_DB"),
                '--no-password',
                '--verbose',
            ]

            # Hand the password to pg_dump through PGPASSWORD. Environment
            # values must be strings, so an unset password becomes "" and
            # pg_dump reports the authentication problem itself.
            env = os.environ.copy()
            env['PGPASSWORD'] = os.getenv("POSTGRES_PASSWORD") or ""

            # Run the dump.
            result = subprocess.run(dump_command, capture_output=True, text=True, env=env)

            if result.returncode != 0:
                raise Exception(f"pg_dump failed: {result.stderr}")

            dump_data = result.stdout.encode('utf-8')

            # Compress
            compressed_data = gzip.compress(dump_data)

            # Encrypt
            key = get_or_create_encryption_key()
            f = Fernet(key)
            encrypted_data = f.encrypt(compressed_data)

            # Store
            filepath.write_bytes(encrypted_data)

            # Collect statistics
            cur.execute("SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'public'")
            tables_count = cur.fetchone()[0]

            cur.execute("""
                SELECT SUM(n_live_tup)
                FROM pg_stat_user_tables
            """)
            records_count = cur.fetchone()[0] or 0

            duration = time.time() - start_time
            filesize = filepath.stat().st_size

            # Mark the backup as successful.
            cur.execute("""
                UPDATE backup_history
                SET status = %s, filesize = %s, tables_count = %s,
                    records_count = %s, duration_seconds = %s
                WHERE id = %s
            """, ('success', filesize, tables_count, records_count, duration, backup_id))

            conn.commit()

            # Audit log
            log_audit('BACKUP', 'database', backup_id,
                      additional_info=f"Backup erstellt: {filename} ({filesize} bytes)")

            # E-mail notification (if configured)
            send_backup_notification(True, filename, filesize, duration)

            logging.info(f"Backup erfolgreich erstellt: {filename}")
            return True, filename

        except Exception as e:
            # If a statement failed, the transaction is aborted and would
            # reject the UPDATE below; roll back first.
            conn.rollback()

            # Record the failure.
            cur.execute("""
                UPDATE backup_history
                SET status = %s, error_message = %s, duration_seconds = %s
                WHERE id = %s
            """, ('failed', str(e), time.time() - start_time, backup_id))
            conn.commit()

            logging.error(f"Backup fehlgeschlagen: {e}")
            send_backup_notification(False, filename, error=str(e))

            return False, str(e)

    finally:
        cur.close()
        conn.close()
|
|
|
|
def restore_backup(backup_id, encryption_key=None):
    """Restore the database from a stored backup.

    Args:
        backup_id: Primary key of the row in ``backup_history``.
        encryption_key: Optional Fernet key as ``str``. When omitted, the key
            from ``get_or_create_encryption_key()`` is used.

    Returns:
        ``(True, message)`` on success, ``(False, error_message)`` on failure.
    """
    conn = get_connection()
    cur = conn.cursor()

    try:
        # Look up the backup. The connection is only needed for this query
        # and has to be closed before psql runs, so release it right away -
        # this also covers every early-exit path below.
        try:
            cur.execute("""
                SELECT filename, filepath, is_encrypted
                FROM backup_history
                WHERE id = %s
            """, (backup_id,))
            backup_info = cur.fetchone()
        finally:
            cur.close()
            conn.close()

        if not backup_info:
            raise Exception("Backup nicht gefunden")

        filename, filepath, is_encrypted = backup_info
        filepath = Path(filepath)

        if not filepath.exists():
            raise Exception("Backup-Datei nicht gefunden")

        # Read the file.
        encrypted_data = filepath.read_bytes()

        # Decrypt
        if is_encrypted:
            key = encryption_key.encode() if encryption_key else get_or_create_encryption_key()
            try:
                f = Fernet(key)
                compressed_data = f.decrypt(encrypted_data)
            except Exception as exc:
                # Covers malformed keys as well as a wrong key / corrupt data.
                raise Exception("Entschlüsselung fehlgeschlagen. Falsches Passwort?") from exc
        else:
            compressed_data = encrypted_data

        # Decompress
        dump_data = gzip.decompress(compressed_data)
        sql_commands = dump_data.decode('utf-8')

        # Restore the database by feeding the SQL dump to psql.
        # NOTE(review): without "-v ON_ERROR_STOP=1" psql exits with status 0
        # even if individual statements fail, so errors inside the dump are
        # not detected by the return-code check below - confirm whether that
        # is acceptable before relying on the success message.
        restore_command = [
            'psql',
            '-h', os.getenv("POSTGRES_HOST", "postgres"),
            '-p', os.getenv("POSTGRES_PORT", "5432"),
            '-U', os.getenv("POSTGRES_USER"),
            '-d', os.getenv("POSTGRES_DB"),
            '--no-password',
        ]

        env = os.environ.copy()
        # Environment values must be strings; an unset password becomes "".
        env['PGPASSWORD'] = os.getenv("POSTGRES_PASSWORD") or ""

        result = subprocess.run(restore_command, input=sql_commands,
                                capture_output=True, text=True, env=env)

        if result.returncode != 0:
            raise Exception(f"Wiederherstellung fehlgeschlagen: {result.stderr}")

        # Audit log (opens its own, new connection)
        log_audit('RESTORE', 'database', backup_id,
                  additional_info=f"Backup wiederhergestellt: {filename}")

        return True, "Backup erfolgreich wiederhergestellt"

    except Exception as e:
        logging.error(f"Wiederherstellung fehlgeschlagen: {e}")
        return False, str(e)
|
|
|
|
def send_backup_notification(success, filename, filesize=None, duration=None, error=None):
    """Announce the outcome of a backup by e-mail (currently only logged).

    Does nothing unless the environment variable ``EMAIL_ENABLED`` equals
    ``true`` (case-insensitive). Actual e-mail delivery is not implemented
    yet; when enabled, the function only writes an info log line.

    Args:
        success: ``True`` if the backup succeeded.
        filename: Name of the backup file (reserved for the future e-mail).
        filesize: Size of the backup in bytes (reserved).
        duration: Duration of the backup in seconds (reserved).
        error: Error text of a failed backup (reserved).
    """
    if os.getenv("EMAIL_ENABLED", "false").lower() != "true":
        return

    # E-mail support is prepared but disabled.
    # TODO: implement once an e-mail server is configured.
    logging.info(f"E-Mail-Benachrichtigung vorbereitet: Backup {'erfolgreich' if success else 'fehlgeschlagen'}")
|
|
|
|
# Scheduled backup job
def scheduled_backup():
    """Run a backup on behalf of the scheduler.

    Calls ``create_backup`` with type ``"scheduled"`` and creator
    ``"scheduler"``; the outcome is logged by ``create_backup`` itself.
    """
    logging.info("Starte geplantes Backup...")
    create_backup(backup_type="scheduled", created_by="scheduler")
|
|
|
|
# Configure the scheduler: run the backup job every day at 03:00.
# replace_existing=True lets a job with the same id be registered again
# without raising.
scheduler.add_job(
    scheduled_backup,
    'cron',
    hour=3,
    minute=0,
    id='daily_backup',
    replace_existing=True,
)
|
|
|
|
# Rate-limiting functions
def get_client_ip():
    """Return the IP address of the client making the current request.

    Preference order: first entry of ``X-Forwarded-For``, then ``X-Real-IP``,
    then the socket's ``REMOTE_ADDR``.

    NOTE(review): both headers are supplied by the client unless a trusted
    reverse proxy overwrites them. Since the result is used for login rate
    limiting, confirm that the application is only reachable through such a
    proxy; otherwise the block list can be bypassed by forging the header.
    """
    forwarded_for = request.environ.get('HTTP_X_FORWARDED_FOR')
    if forwarded_for:
        # The header may hold a comma separated chain "client, proxy1, ...";
        # strip surrounding whitespace so the same client always maps to the
        # same key in the login_attempts table.
        return forwarded_for.split(',')[0].strip()

    real_ip = request.environ.get('HTTP_X_REAL_IP')
    if real_ip:
        return real_ip

    return request.environ.get('REMOTE_ADDR')
|
|
|
|
def check_ip_blocked(ip_address):
    """Check whether an IP address is currently blocked.

    Args:
        ip_address: The client IP to look up in ``login_attempts``.

    Returns:
        ``(True, blocked_until)`` if a block is recorded that lies in the
        future, otherwise ``(False, None)``.
    """
    conn = get_connection()
    cur = conn.cursor()

    # try/finally so the connection is released even if the query fails.
    try:
        cur.execute("""
            SELECT blocked_until FROM login_attempts
            WHERE ip_address = %s AND blocked_until IS NOT NULL
        """, (ip_address,))
        result = cur.fetchone()
    finally:
        cur.close()
        conn.close()

    if result and result[0]:
        # A block that has already expired does not count.
        if result[0] > datetime.now():
            return True, result[0]
    return False, None
|
|
|
|
def record_failed_attempt(ip_address, username):
    """Record a failed login attempt and block the IP when the limit is hit.

    Increments (or creates) the ``login_attempts`` row of the IP. Once the
    counter reaches ``MAX_LOGIN_ATTEMPTS`` the IP is blocked for
    ``BLOCK_DURATION_HOURS`` hours and, if ``EMAIL_ENABLED`` is ``true``,
    a security alert is raised. A ``LOGIN_FAILED`` audit entry is written.

    Args:
        ip_address: Client IP of the failed attempt.
        username: User name that was tried.

    Returns:
        The randomly chosen error message to show to the user. It is returned
        even if recording the attempt failed.
    """
    conn = get_connection()
    cur = conn.cursor()

    # Random error message
    error_message = random.choice(FAIL_MESSAGES)

    try:
        # Check whether the IP is already known.
        cur.execute("""
            SELECT attempt_count FROM login_attempts
            WHERE ip_address = %s
        """, (ip_address,))
        result = cur.fetchone()

        if result:
            # Update the existing row.
            new_count = result[0] + 1
            blocked_until = None

            if new_count >= MAX_LOGIN_ATTEMPTS:
                blocked_until = datetime.now() + timedelta(hours=BLOCK_DURATION_HOURS)
                # E-mail notification (if enabled)
                if os.getenv("EMAIL_ENABLED", "false").lower() == "true":
                    send_security_alert_email(ip_address, username, new_count)

            cur.execute("""
                UPDATE login_attempts
                SET attempt_count = %s,
                    last_attempt = CURRENT_TIMESTAMP,
                    blocked_until = %s,
                    last_username_tried = %s,
                    last_error_message = %s
                WHERE ip_address = %s
            """, (new_count, blocked_until, username, error_message, ip_address))
        else:
            # Create a new row.
            cur.execute("""
                INSERT INTO login_attempts
                (ip_address, attempt_count, last_username_tried, last_error_message)
                VALUES (%s, 1, %s, %s)
            """, (ip_address, username, error_message))

        conn.commit()

        # Audit log
        log_audit('LOGIN_FAILED', 'user',
                  additional_info=f"IP: {ip_address}, User: {username}, Message: {error_message}")

    except Exception as e:
        # Rate limiting is best effort: never let it break the login view.
        logging.error(f"Rate limiting error: {e}")
        conn.rollback()
    finally:
        cur.close()
        conn.close()

    return error_message
|
|
|
|
def reset_login_attempts(ip_address):
    """Delete the recorded login attempts of an IP address.

    Called after a successful login. Database errors are logged and
    swallowed so that they cannot prevent the login itself.

    Args:
        ip_address: Client IP whose ``login_attempts`` row is removed.
    """
    conn = get_connection()
    cur = conn.cursor()

    try:
        cur.execute("""
            DELETE FROM login_attempts
            WHERE ip_address = %s
        """, (ip_address,))
        conn.commit()
    except Exception as e:
        logging.error(f"Reset attempts error: {e}")
        conn.rollback()
    finally:
        cur.close()
        conn.close()
|
|
|
|
def get_login_attempts(ip_address):
    """Return the number of failed login attempts recorded for an IP.

    Args:
        ip_address: Client IP to look up in ``login_attempts``.

    Returns:
        The stored ``attempt_count``, or ``0`` if the IP has no row.
    """
    conn = get_connection()
    cur = conn.cursor()

    # try/finally so the connection is released even if the query fails.
    try:
        cur.execute("""
            SELECT attempt_count FROM login_attempts
            WHERE ip_address = %s
        """, (ip_address,))
        result = cur.fetchone()
    finally:
        cur.close()
        conn.close()

    return result[0] if result else 0
|
|
|
|
def send_security_alert_email(ip_address, username, attempt_count):
    """Raise a security alert about repeated failed logins.

    E-mail delivery is not implemented yet: subject and body are composed,
    but the function only logs a warning and the subject line.

    Args:
        ip_address: IP address the failed attempts came from.
        username: User name that was tried last.
        attempt_count: Number of failed attempts so far.
    """
    subject = f"⚠️ SICHERHEITSWARNUNG: {attempt_count} fehlgeschlagene Login-Versuche"
    # Prepared message text for the future e-mail; not sent anywhere yet.
    body = f"""
    WARNUNG: Mehrere fehlgeschlagene Login-Versuche erkannt!

    IP-Adresse: {ip_address}
    Versuchter Benutzername: {username}
    Anzahl Versuche: {attempt_count}
    Zeit: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

    Die IP-Adresse wurde für 24 Stunden gesperrt.

    Dies ist eine automatische Nachricht vom v2-Docker Admin Panel.
    """

    # TODO: implement e-mail delivery once SMTP is configured (use `body`).
    logging.warning(f"Sicherheitswarnung: {attempt_count} fehlgeschlagene Versuche von IP {ip_address}")
    logging.info(f"E-Mail würde gesendet: {subject}")
|
|
|
|
def verify_recaptcha(response):
    """Verify a reCAPTCHA v2 response token with Google.

    Args:
        response: Value of the ``g-recaptcha-response`` form field.

    Returns:
        ``True`` if Google confirms the token. Also ``True`` (fail-open, by
        design of this proof of concept) when ``RECAPTCHA_SECRET_KEY`` is not
        configured or when the verification request fails on the network
        level. ``False`` if Google rejects the token or any other error
        occurs.
    """
    secret_key = os.getenv('RECAPTCHA_SECRET_KEY')

    # Without a configured secret key the CAPTCHA counts as passed (PoC).
    if not secret_key:
        logging.warning("RECAPTCHA_SECRET_KEY nicht konfiguriert - CAPTCHA wird übersprungen")
        return True

    # Verify with Google.
    try:
        verify_url = 'https://www.google.com/recaptcha/api/siteverify'
        data = {
            'secret': secret_key,
            'response': response,
        }

        # Bound the request so a slow upstream cannot hang the login.
        r = requests.post(verify_url, data=data, timeout=5)
        result = r.json()

        # Log for debugging
        if not result.get('success'):
            logging.warning(f"reCAPTCHA Validierung fehlgeschlagen: {result.get('error-codes', [])}")

        return bool(result.get('success', False))

    except requests.exceptions.RequestException as e:
        logging.error(f"reCAPTCHA Verifizierung fehlgeschlagen: {str(e)}")
        # On network errors the CAPTCHA counts as passed.
        return True
    except Exception as e:
        logging.error(f"Unerwarteter Fehler bei reCAPTCHA: {str(e)}")
        return False
|
|
|
|
def _credentials_match(username, password, expected_user, expected_pass):
    """Return True only if both submitted values equal the configured ones.

    Missing or empty values on either side never match. This matters because
    an unset environment variable and an omitted form field are both ``None``
    and would otherwise compare equal. The comparison itself is constant-time.
    """
    import hmac  # local import: not part of the file's import block

    if not (username and password and expected_user and expected_pass):
        return False
    user_ok = hmac.compare_digest(username.encode('utf-8'), expected_user.encode('utf-8'))
    pass_ok = hmac.compare_digest(password.encode('utf-8'), expected_pass.encode('utf-8'))
    return user_ok and pass_ok


def _pad_to_min_duration(start_time, minimum=1.0):
    """Sleep until at least ``minimum`` seconds have passed since ``start_time``."""
    elapsed = time.time() - start_time
    if elapsed < minimum:
        time.sleep(minimum - elapsed)


@app.route("/login", methods=["GET", "POST"])
def login():
    """Render the login form and process login attempts.

    Flow for every request:
      * a blocked IP only gets the "blocked" message;
      * on POST, a CAPTCHA is required once the IP has reached
        ``CAPTCHA_AFTER_ATTEMPTS`` failed attempts;
      * the credentials are checked against the two admin accounts from the
        environment (``ADMIN1_*`` / ``ADMIN2_*``);
      * every POST response is delayed to at least one second to blunt
        timing attacks;
      * success starts the session and resets the attempt counter, failure
        records the attempt (which may block the IP).
    """
    # Timing-attack protection: remember when handling started.
    start_time = time.time()

    # Determine the client IP.
    ip_address = get_client_ip()

    # Refuse blocked IPs right away.
    is_blocked, blocked_until = check_ip_blocked(ip_address)
    if is_blocked:
        time_remaining = (blocked_until - datetime.now()).total_seconds() / 3600
        error_msg = f"IP GESPERRT! Noch {time_remaining:.1f} Stunden warten."
        return render_template("login.html", error=error_msg, error_type="blocked")

    # Number of failed attempts so far.
    attempt_count = get_login_attempts(ip_address)

    if request.method == "POST":
        username = request.form.get("username")
        password = request.form.get("password")
        captcha_response = request.form.get("g-recaptcha-response")

        # CAPTCHA check, if required.
        if attempt_count >= CAPTCHA_AFTER_ATTEMPTS:
            if not captcha_response:
                _pad_to_min_duration(start_time)
                return render_template("login.html",
                                       error="CAPTCHA ERFORDERLICH!",
                                       show_captcha=True,
                                       error_type="captcha",
                                       attempts_left=max(0, MAX_LOGIN_ATTEMPTS - attempt_count),
                                       recaptcha_site_key=os.getenv('RECAPTCHA_SITE_KEY'))

            # Validate the CAPTCHA.
            if not verify_recaptcha(captcha_response):
                _pad_to_min_duration(start_time)
                return render_template("login.html",
                                       error="CAPTCHA UNGÜLTIG! Bitte erneut versuchen.",
                                       show_captcha=True,
                                       error_type="captcha",
                                       attempts_left=max(0, MAX_LOGIN_ATTEMPTS - attempt_count),
                                       recaptcha_site_key=os.getenv('RECAPTCHA_SITE_KEY'))

        # Check against both admin accounts from .env. An account whose
        # variables are not set can never be logged into.
        admin_accounts = (
            (os.getenv("ADMIN1_USERNAME"), os.getenv("ADMIN1_PASSWORD")),
            (os.getenv("ADMIN2_USERNAME"), os.getenv("ADMIN2_PASSWORD")),
        )
        # Evaluate every account (no short-circuit) to keep timing uniform.
        matches = [_credentials_match(username, password, user, pw)
                   for user, pw in admin_accounts]
        login_success = any(matches)

        # Timing-attack protection: wait at least one second.
        _pad_to_min_duration(start_time)

        if login_success:
            # Successful login
            session.permanent = True  # activates the session timeout
            session['logged_in'] = True
            session['username'] = username
            session['last_activity'] = datetime.now().isoformat()
            reset_login_attempts(ip_address)
            log_audit('LOGIN_SUCCESS', 'user',
                      additional_info=f"Erfolgreiche Anmeldung von IP: {ip_address}")
            return redirect(url_for('dashboard'))

        # Failed login
        error_message = record_failed_attempt(ip_address, username)
        new_attempt_count = get_login_attempts(ip_address)

        # Check whether this attempt caused a block.
        is_now_blocked, _ = check_ip_blocked(ip_address)
        if is_now_blocked:
            log_audit('LOGIN_BLOCKED', 'security',
                      additional_info=f"IP {ip_address} wurde nach {MAX_LOGIN_ATTEMPTS} Versuchen gesperrt")

        return render_template("login.html",
                               error=error_message,
                               show_captcha=(new_attempt_count >= CAPTCHA_AFTER_ATTEMPTS),
                               error_type="failed",
                               attempts_left=max(0, MAX_LOGIN_ATTEMPTS - new_attempt_count),
                               recaptcha_site_key=os.getenv('RECAPTCHA_SITE_KEY'))

    # GET request
    return render_template("login.html",
                           show_captcha=(attempt_count >= CAPTCHA_AFTER_ATTEMPTS),
                           attempts_left=max(0, MAX_LOGIN_ATTEMPTS - attempt_count),
                           recaptcha_site_key=os.getenv('RECAPTCHA_SITE_KEY'))
|
|
|
|
@app.route("/logout")
def logout():
    """Log the current user out and redirect to the login page.

    The audit entry is written first, while the user name is still in the
    session. Afterwards the whole session is cleared so that no stale keys
    (such as ``last_activity``) survive the logout.
    """
    log_audit('LOGOUT', 'user', additional_info="Abmeldung")
    session.clear()
    return redirect(url_for('login'))
|
|
|
|
@app.route("/heartbeat", methods=['POST'])
@login_required
def heartbeat():
    """Session keep-alive endpoint: refresh ``last_activity``.

    This is the only place where ``last_activity`` is updated after login;
    the login check itself deliberately does not touch it.

    Returns:
        JSON with ``status``, the new ``last_activity`` timestamp and the
        ``username`` of the session.
    """
    session['last_activity'] = datetime.now().isoformat()
    # Force the session to be saved.
    session.modified = True

    return jsonify({
        'status': 'ok',
        'last_activity': session['last_activity'],
        'username': session.get('username'),
    })
|
|
|
|
@app.route("/")
@login_required
def dashboard():
    """Render the dashboard with license, backup and security statistics.

    All figures are read in one database connection, which is released even
    if one of the queries fails. The collected values are passed to
    ``dashboard.html`` as the ``stats`` dict.
    """
    conn = get_connection()
    cur = conn.cursor()

    def scalar(query):
        """Run a query that yields exactly one value and return that value."""
        cur.execute(query)
        return cur.fetchone()[0]

    try:
        # --- General statistics -------------------------------------------
        # Total number of customers
        total_customers = scalar("SELECT COUNT(*) FROM customers")

        # Total number of licenses
        total_licenses = scalar("SELECT COUNT(*) FROM licenses")

        # Active licenses (not expired and is_active = true)
        active_licenses = scalar("""
            SELECT COUNT(*) FROM licenses
            WHERE valid_until >= CURRENT_DATE AND is_active = TRUE
        """)

        # Active sessions
        active_sessions_count = scalar("SELECT COUNT(*) FROM sessions WHERE is_active = TRUE")

        # Expired licenses
        expired_licenses = scalar("""
            SELECT COUNT(*) FROM licenses
            WHERE valid_until < CURRENT_DATE
        """)

        # Licenses expiring within the next 30 days
        expiring_soon = scalar("""
            SELECT COUNT(*) FROM licenses
            WHERE valid_until >= CURRENT_DATE
            AND valid_until < CURRENT_DATE + INTERVAL '30 days'
            AND is_active = TRUE
        """)

        # Test licenses vs. full versions
        cur.execute("""
            SELECT license_type, COUNT(*)
            FROM licenses
            GROUP BY license_type
        """)
        license_types = dict(cur.fetchall())

        # The 5 most recently created licenses
        cur.execute("""
            SELECT l.id, l.license_key, c.name, l.valid_until,
                   CASE
                       WHEN l.valid_until < CURRENT_DATE THEN 'abgelaufen'
                       WHEN l.valid_until < CURRENT_DATE + INTERVAL '30 days' THEN 'läuft bald ab'
                       ELSE 'aktiv'
                   END as status
            FROM licenses l
            JOIN customers c ON l.customer_id = c.id
            ORDER BY l.id DESC
            LIMIT 5
        """)
        recent_licenses = cur.fetchall()

        # Licenses expiring soon (next 30 days)
        cur.execute("""
            SELECT l.id, l.license_key, c.name, l.valid_until,
                   l.valid_until - CURRENT_DATE as days_left
            FROM licenses l
            JOIN customers c ON l.customer_id = c.id
            WHERE l.valid_until >= CURRENT_DATE
            AND l.valid_until < CURRENT_DATE + INTERVAL '30 days'
            AND l.is_active = TRUE
            ORDER BY l.valid_until
            LIMIT 10
        """)
        expiring_licenses = cur.fetchall()

        # Latest backup
        cur.execute("""
            SELECT created_at, filesize, duration_seconds, backup_type, status
            FROM backup_history
            ORDER BY created_at DESC
            LIMIT 1
        """)
        last_backup_info = cur.fetchone()

        # --- Security statistics ------------------------------------------
        # Currently blocked IPs
        blocked_ips_count = scalar("""
            SELECT COUNT(*) FROM login_attempts
            WHERE blocked_until IS NOT NULL AND blocked_until > CURRENT_TIMESTAMP
        """)

        # Failed attempts today
        failed_attempts_today = scalar("""
            SELECT COALESCE(SUM(attempt_count), 0) FROM login_attempts
            WHERE last_attempt::date = CURRENT_DATE
        """)

        # The 5 latest security events
        cur.execute("""
            SELECT
                la.ip_address,
                la.attempt_count,
                la.last_attempt,
                la.blocked_until,
                la.last_username_tried,
                la.last_error_message
            FROM login_attempts la
            ORDER BY la.last_attempt DESC
            LIMIT 5
        """)
        now = datetime.now()
        recent_security_events = [
            {
                'ip_address': event[0],
                'attempt_count': event[1],
                # last_attempt may be NULL; do not crash the dashboard then.
                'last_attempt': event[2].strftime('%d.%m %H:%M') if event[2] else None,
                # Only blocks that are still in effect are shown.
                'blocked_until': event[3].strftime('%d.%m %H:%M') if event[3] and event[3] > now else None,
                'username_tried': event[4],
                'error_message': event[5],
            }
            for event in cur.fetchall()
        ]
    finally:
        cur.close()
        conn.close()

    # Derive the security level from the figures above.
    if blocked_ips_count > 5 or failed_attempts_today > 50:
        security_level = 'danger'
        security_level_text = 'KRITISCH'
    elif blocked_ips_count > 2 or failed_attempts_today > 20:
        security_level = 'warning'
        security_level_text = 'ERHÖHT'
    else:
        security_level = 'success'
        security_level_text = 'NORMAL'

    stats = {
        'total_customers': total_customers,
        'total_licenses': total_licenses,
        'active_licenses': active_licenses,
        'expired_licenses': expired_licenses,
        'expiring_soon': expiring_soon,
        'full_licenses': license_types.get('full', 0),
        'test_licenses': license_types.get('test', 0),
        'recent_licenses': recent_licenses,
        'expiring_licenses': expiring_licenses,
        'active_sessions': active_sessions_count,
        'last_backup': last_backup_info,
        # Security statistics
        'blocked_ips_count': blocked_ips_count,
        'failed_attempts_today': failed_attempts_today,
        'recent_security_events': recent_security_events,
        'security_level': security_level,
        'security_level_text': security_level_text,
    }

    return render_template("dashboard.html", stats=stats, username=session.get('username'))
|
|
|
|
@app.route("/create", methods=["GET", "POST"])
@login_required
def create_license():
    """Show the license creation form (GET) or create a license (POST).

    On POST the customer is looked up by exact name and e-mail and only
    inserted when no such customer exists yet; then the license is inserted
    for that customer and a ``CREATE`` audit entry is written.
    """
    if request.method == "POST":
        name = request.form["customer_name"]
        email = request.form["email"]
        license_key = request.form["license_key"]
        license_type = request.form["license_type"]
        valid_from = request.form["valid_from"]
        valid_until = request.form["valid_until"]

        conn = get_connection()
        cur = conn.cursor()

        # try/finally so the connection is released even if an insert fails;
        # closing an uncommitted connection discards the partial work.
        try:
            # Insert the customer only if not already present, so repeated
            # license creation does not pile up duplicate customer rows.
            cur.execute("""
                SELECT id FROM customers
                WHERE name = %s AND email = %s
                ORDER BY id
                LIMIT 1
            """, (name, email))
            existing_customer = cur.fetchone()

            if existing_customer:
                customer_id = existing_customer[0]
            else:
                cur.execute("""
                    INSERT INTO customers (name, email, created_at)
                    VALUES (%s, %s, NOW())
                    RETURNING id
                """, (name, email))
                customer_id = cur.fetchone()[0]

            # Add the license.
            cur.execute("""
                INSERT INTO licenses (license_key, customer_id, license_type, valid_from, valid_until, is_active)
                VALUES (%s, %s, %s, %s, %s, TRUE)
                RETURNING id
            """, (license_key, customer_id, license_type, valid_from, valid_until))
            license_id = cur.fetchone()[0]

            conn.commit()
        finally:
            cur.close()
            conn.close()

        # Audit log
        log_audit('CREATE', 'license', license_id,
                  new_values={
                      'license_key': license_key,
                      'customer_name': name,
                      'customer_email': email,
                      'license_type': license_type,
                      'valid_from': valid_from,
                      'valid_until': valid_until,
                  })

        return redirect("/create")

    return render_template("index.html", username=session.get('username'))
|
|
|
|
@app.route("/licenses")
@login_required
def licenses():
    """List licenses with optional search, type/status filter and paging.

    Query parameters:
        search: Case-insensitive substring matched against license key,
            customer name and customer e-mail.
        type:   Exact ``license_type`` to filter by.
        status: One of ``active``, ``expiring``, ``expired``, ``inactive``.
        page:   1-based page number (20 rows per page).
    """
    # Parameters
    search = request.args.get('search', '').strip()
    filter_type = request.args.get('type', '')
    filter_status = request.args.get('status', '')
    # Clamp to 1: page 0 or a negative page would yield a negative OFFSET,
    # which PostgreSQL rejects.
    page = max(1, request.args.get('page', 1, type=int))
    per_page = 20

    # SQL query with optional search and filters
    query = """
        SELECT l.id, l.license_key, c.name, c.email, l.license_type,
               l.valid_from, l.valid_until, l.is_active,
               CASE
                   WHEN l.valid_until < CURRENT_DATE THEN 'abgelaufen'
                   WHEN l.valid_until < CURRENT_DATE + INTERVAL '30 days' THEN 'läuft bald ab'
                   ELSE 'aktiv'
               END as status
        FROM licenses l
        JOIN customers c ON l.customer_id = c.id
        WHERE 1=1
    """
    params = []

    # Search filter
    if search:
        query += """
            AND (LOWER(l.license_key) LIKE LOWER(%s)
                 OR LOWER(c.name) LIKE LOWER(%s)
                 OR LOWER(c.email) LIKE LOWER(%s))
        """
        search_param = f'%{search}%'
        params.extend([search_param, search_param, search_param])

    # Type filter
    if filter_type:
        query += " AND l.license_type = %s"
        params.append(filter_type)

    # Status filter: only fixed SQL fragments are appended here, user input
    # never becomes part of the statement text.
    status_clauses = {
        'active': " AND l.valid_until >= CURRENT_DATE AND l.is_active = TRUE",
        'expiring': " AND l.valid_until >= CURRENT_DATE AND l.valid_until < CURRENT_DATE + INTERVAL '30 days' AND l.is_active = TRUE",
        'expired': " AND l.valid_until < CURRENT_DATE",
        'inactive': " AND l.is_active = FALSE",
    }
    query += status_clauses.get(filter_status, "")

    conn = get_connection()
    cur = conn.cursor()

    # try/finally so the connection is released even if a query fails.
    try:
        # Total count for pagination (before LIMIT/OFFSET are added)
        count_query = "SELECT COUNT(*) FROM (" + query + ") as count_table"
        cur.execute(count_query, params)
        total = cur.fetchone()[0]

        # Pagination
        offset = (page - 1) * per_page
        query += " ORDER BY l.valid_until DESC LIMIT %s OFFSET %s"
        params.extend([per_page, offset])

        cur.execute(query, params)
        license_rows = cur.fetchall()
    finally:
        cur.close()
        conn.close()

    # Pagination info (ceiling division)
    total_pages = (total + per_page - 1) // per_page

    return render_template("licenses.html",
                           licenses=license_rows,
                           search=search,
                           filter_type=filter_type,
                           filter_status=filter_status,
                           page=page,
                           total_pages=total_pages,
                           total=total,
                           username=session.get('username'))
|
|
|
|
@app.route("/license/edit/<int:license_id>", methods=["GET", "POST"])
@login_required
def edit_license(license_id):
    """Show the edit form for a license (GET) or save changes (POST).

    On POST the previous values are read first so that the ``UPDATE`` audit
    entry can record both old and new state. Requests for a license id that
    does not exist are redirected to the license list.
    """
    conn = get_connection()
    cur = conn.cursor()

    # try/finally so the connection is released on every path.
    try:
        if request.method == "POST":
            # Fetch the old values for the audit log.
            cur.execute("""
                SELECT license_key, license_type, valid_from, valid_until, is_active
                FROM licenses WHERE id = %s
            """, (license_id,))
            old_license = cur.fetchone()

            # Unknown id: nothing to update and nothing to audit.
            if not old_license:
                return redirect("/licenses")

            # Update the license.
            license_key = request.form["license_key"]
            license_type = request.form["license_type"]
            valid_from = request.form["valid_from"]
            valid_until = request.form["valid_until"]
            # An unchecked checkbox is simply absent from the form data.
            is_active = request.form.get("is_active") == "on"

            cur.execute("""
                UPDATE licenses
                SET license_key = %s, license_type = %s, valid_from = %s,
                    valid_until = %s, is_active = %s
                WHERE id = %s
            """, (license_key, license_type, valid_from, valid_until, is_active, license_id))

            conn.commit()

            # Audit log
            log_audit('UPDATE', 'license', license_id,
                      old_values={
                          'license_key': old_license[0],
                          'license_type': old_license[1],
                          'valid_from': str(old_license[2]),
                          'valid_until': str(old_license[3]),
                          'is_active': old_license[4],
                      },
                      new_values={
                          'license_key': license_key,
                          'license_type': license_type,
                          'valid_from': valid_from,
                          'valid_until': valid_until,
                          'is_active': is_active,
                      })

            return redirect("/licenses")

        # GET: load the license data for the form.
        cur.execute("""
            SELECT l.id, l.license_key, c.name, c.email, l.license_type,
                   l.valid_from, l.valid_until, l.is_active, c.id
            FROM licenses l
            JOIN customers c ON l.customer_id = c.id
            WHERE l.id = %s
        """, (license_id,))
        license_row = cur.fetchone()
    finally:
        cur.close()
        conn.close()

    if not license_row:
        return redirect("/licenses")

    return render_template("edit_license.html", license=license_row, username=session.get('username'))
|
|
|
|
@app.route("/license/delete/<int:license_id>", methods=["POST"])
@login_required
def delete_license(license_id):
    """Delete a license and record the deletion in the audit log.

    Args:
        license_id: Primary key of the license, taken from the URL.

    Returns:
        A redirect to ``/licenses``.
    """
    conn = get_connection()
    cur = conn.cursor()
    try:
        # Capture the details before deleting so the audit entry can describe the row.
        cur.execute("""
            SELECT l.license_key, c.name, l.license_type
            FROM licenses l
            JOIN customers c ON l.customer_id = c.id
            WHERE l.id = %s
        """, (license_id,))
        license_info = cur.fetchone()

        cur.execute("DELETE FROM licenses WHERE id = %s", (license_id,))
        conn.commit()

        # Only write an audit entry when a matching license was found.
        if license_info:
            log_audit('DELETE', 'license', license_id,
                      old_values={
                          'license_key': license_info[0],
                          'customer_name': license_info[1],
                          'license_type': license_info[2]
                      })
    finally:
        # Release the connection even if a query, the commit or the audit call fails.
        cur.close()
        conn.close()

    return redirect("/licenses")
|
|
|
|
@app.route("/customers")
@login_required
def customers():
    """List customers with their license counts, optional search and pagination.

    Query parameters:
        search: Case-insensitive substring matched against name and e-mail.
        page: 1-based page number; values below 1 are treated as 1.

    Returns:
        The rendered ``customers.html`` template.
    """
    search = request.args.get('search', '').strip()
    # Clamp the page: "?page=0" or a negative page would otherwise produce a
    # negative OFFSET, which PostgreSQL rejects with an error.
    page = max(request.args.get('page', 1, type=int), 1)
    per_page = 20

    # Build the optional filter once so the count and data queries cannot drift apart.
    where_clause = ""
    filter_params = []
    if search:
        where_clause = """
            WHERE LOWER(c.name) LIKE LOWER(%s)
               OR LOWER(c.email) LIKE LOWER(%s)
        """
        search_param = f'%{search}%'
        filter_params = [search_param, search_param]

    count_query = f"""
        SELECT COUNT(DISTINCT c.id)
        FROM customers c
        LEFT JOIN licenses l ON c.id = l.customer_id
        {where_clause}
    """

    query = f"""
        SELECT c.id, c.name, c.email, c.created_at,
               COUNT(l.id) as license_count,
               COUNT(CASE WHEN l.is_active = TRUE AND l.valid_until >= CURRENT_DATE THEN 1 END) as active_licenses
        FROM customers c
        LEFT JOIN licenses l ON c.id = l.customer_id
        {where_clause}
        GROUP BY c.id, c.name, c.email, c.created_at
        ORDER BY c.created_at DESC
        LIMIT %s OFFSET %s
    """
    offset = (page - 1) * per_page

    conn = get_connection()
    cur = conn.cursor()
    try:
        # Total number of matching customers, needed for the pagination info.
        # Pass None rather than an empty list when there is nothing to bind.
        cur.execute(count_query, filter_params or None)
        total = cur.fetchone()[0]

        cur.execute(query, filter_params + [per_page, offset])
        customer_rows = cur.fetchall()
    finally:
        cur.close()
        conn.close()

    # Ceiling division: a partially filled last page still counts as a page.
    total_pages = (total + per_page - 1) // per_page

    return render_template("customers.html",
                           customers=customer_rows,
                           search=search,
                           page=page,
                           total_pages=total_pages,
                           total=total,
                           username=session.get('username'))
|
|
|
|
@app.route("/customer/edit/<int:customer_id>", methods=["GET", "POST"])
@login_required
def edit_customer(customer_id):
    """Show the edit form for one customer (GET) or persist submitted changes (POST).

    Args:
        customer_id: Primary key of the customer, taken from the URL.

    Returns:
        A redirect to ``/customers`` after a successful update or when the
        customer does not exist; otherwise the rendered ``edit_customer.html``.
    """
    conn = get_connection()
    cur = conn.cursor()
    try:
        if request.method == "POST":
            # Read the current values first so the audit log gets a before/after pair.
            cur.execute("SELECT name, email FROM customers WHERE id = %s", (customer_id,))
            old_customer = cur.fetchone()

            if old_customer is None:
                # Unknown id: nothing to update. Behave like the GET branch
                # instead of failing later on old_customer[0].
                return redirect("/customers")

            name = request.form["name"]
            email = request.form["email"]

            cur.execute("""
                UPDATE customers
                SET name = %s, email = %s
                WHERE id = %s
            """, (name, email, customer_id))

            conn.commit()

            log_audit('UPDATE', 'customer', customer_id,
                      old_values={
                          'name': old_customer[0],
                          'email': old_customer[1]
                      },
                      new_values={
                          'name': name,
                          'email': email
                      })

            return redirect("/customers")

        # GET: load the customer record ...
        cur.execute("""
            SELECT id, name, email, created_at
            FROM customers
            WHERE id = %s
        """, (customer_id,))
        customer = cur.fetchone()

        # ... and the licenses that belong to this customer.
        cur.execute("""
            SELECT id, license_key, license_type, valid_from, valid_until, is_active
            FROM licenses
            WHERE customer_id = %s
            ORDER BY valid_until DESC
        """, (customer_id,))
        licenses = cur.fetchall()
    finally:
        # Runs on every exit path, including early returns and exceptions.
        cur.close()
        conn.close()

    if not customer:
        return redirect("/customers")

    return render_template("edit_customer.html", customer=customer, licenses=licenses, username=session.get('username'))
|
|
|
|
@app.route("/customer/delete/<int:customer_id>", methods=["POST"])
@login_required
def delete_customer(customer_id):
    """Delete a customer, but only if no licenses reference it.

    Args:
        customer_id: Primary key of the customer, taken from the URL.

    Returns:
        A redirect to ``/customers`` in every case; a customer that still has
        licenses is left untouched.
    """
    conn = get_connection()
    cur = conn.cursor()
    try:
        # Check whether the customer still has licenses.
        cur.execute("SELECT COUNT(*) FROM licenses WHERE customer_id = %s", (customer_id,))
        license_count = cur.fetchone()[0]

        if license_count > 0:
            # Customer has licenses: do not delete.
            return redirect("/customers")

        # Capture the details before deleting so the audit entry can describe the row.
        cur.execute("SELECT name, email FROM customers WHERE id = %s", (customer_id,))
        customer_info = cur.fetchone()

        cur.execute("DELETE FROM customers WHERE id = %s", (customer_id,))
        conn.commit()

        # Only write an audit entry when a matching customer was found.
        if customer_info:
            log_audit('DELETE', 'customer', customer_id,
                      old_values={
                          'name': customer_info[0],
                          'email': customer_info[1]
                      })
    finally:
        # Release the connection on every exit path, including the early return.
        cur.close()
        conn.close()

    return redirect("/customers")
|
|
|
|
@app.route("/sessions")
@login_required
def sessions():
    """Render the overview of active sessions and sessions ended in the last 24 hours."""
    # Active sessions, most recent heartbeat first, with minutes since that heartbeat.
    active_sql = """
        SELECT s.id, s.session_id, l.license_key, c.name, s.ip_address,
               s.user_agent, s.started_at, s.last_heartbeat,
               EXTRACT(EPOCH FROM (NOW() - s.last_heartbeat))/60 as minutes_inactive
        FROM sessions s
        JOIN licenses l ON s.license_id = l.id
        JOIN customers c ON l.customer_id = c.id
        WHERE s.is_active = TRUE
        ORDER BY s.last_heartbeat DESC
    """
    # Up to 50 sessions that ended within the last 24 hours, with their duration in minutes.
    recent_sql = """
        SELECT s.id, s.session_id, l.license_key, c.name, s.ip_address,
               s.started_at, s.ended_at,
               EXTRACT(EPOCH FROM (s.ended_at - s.started_at))/60 as duration_minutes
        FROM sessions s
        JOIN licenses l ON s.license_id = l.id
        JOIN customers c ON l.customer_id = c.id
        WHERE s.is_active = FALSE
        AND s.ended_at > NOW() - INTERVAL '24 hours'
        ORDER BY s.ended_at DESC
        LIMIT 50
    """

    db = get_connection()
    cursor = db.cursor()

    cursor.execute(active_sql)
    active_rows = cursor.fetchall()

    cursor.execute(recent_sql)
    recent_rows = cursor.fetchall()

    cursor.close()
    db.close()

    template_context = {
        'active_sessions': active_rows,
        'recent_sessions': recent_rows,
        'username': session.get('username'),
    }
    return render_template("sessions.html", **template_context)
|
|
|
|
@app.route("/session/end/<int:session_id>", methods=["POST"])
@login_required
def end_session(session_id):
    """Mark one active session as ended and return to the session overview.

    Args:
        session_id: Primary key (``sessions.id``) of the session, taken from the URL.

    Returns:
        A redirect to ``/sessions``.
    """
    # The is_active guard means sessions that are already ended are not touched again.
    end_sql = """
        UPDATE sessions
        SET is_active = FALSE, ended_at = NOW()
        WHERE id = %s AND is_active = TRUE
    """

    db = get_connection()
    cursor = db.cursor()
    cursor.execute(end_sql, (session_id,))
    db.commit()
    cursor.close()
    db.close()

    return redirect("/sessions")
|
|
|
|
@app.route("/export/licenses")
@login_required
def export_licenses():
    """Export all licenses with customer information as an Excel or CSV download.

    Query parameters:
        format: ``csv`` for a semicolon-separated CSV file; any other value
            (default ``excel``) produces an ``.xlsx`` workbook.

    Returns:
        A file download response created with ``send_file``.
    """
    conn = get_connection()
    cur = conn.cursor()
    try:
        # All licenses with customer information and a derived status label.
        cur.execute("""
            SELECT l.id, l.license_key, c.name as customer_name, c.email as customer_email,
                   l.license_type, l.valid_from, l.valid_until, l.is_active,
                   CASE
                       WHEN l.valid_until < CURRENT_DATE THEN 'Abgelaufen'
                       WHEN l.valid_until < CURRENT_DATE + INTERVAL '30 days' THEN 'Läuft bald ab'
                       ELSE 'Aktiv'
                   END as status
            FROM licenses l
            JOIN customers c ON l.customer_id = c.id
            ORDER BY l.id
        """)
        data = cur.fetchall()
    finally:
        # Release the connection even if the query fails.
        cur.close()
        conn.close()

    # Column headers shown in the exported file.
    columns = ['ID', 'Lizenzschlüssel', 'Kunde', 'E-Mail', 'Typ',
               'Gültig von', 'Gültig bis', 'Aktiv', 'Status']
    df = pd.DataFrame(data, columns=columns)

    # Format the dates as DD.MM.YYYY.
    df['Gültig von'] = pd.to_datetime(df['Gültig von']).dt.strftime('%d.%m.%Y')
    df['Gültig bis'] = pd.to_datetime(df['Gültig bis']).dt.strftime('%d.%m.%Y')

    # Replace internal codes with readable labels.
    df['Typ'] = df['Typ'].replace({'full': 'Vollversion', 'test': 'Testversion'})
    df['Aktiv'] = df['Aktiv'].replace({True: 'Ja', False: 'Nein'})

    export_format = request.args.get('format', 'excel')

    log_audit('EXPORT', 'license',
              additional_info=f"Export aller Lizenzen als {export_format.upper()}")
    filename = f'lizenzen_export_{datetime.now().strftime("%Y%m%d_%H%M%S")}'

    if export_format == 'csv':
        output = io.StringIO()
        df.to_csv(output, index=False, encoding='utf-8-sig', sep=';')

        # Encoding with utf-8-sig prepends a BOM to the downloaded bytes.
        return send_file(
            io.BytesIO(output.getvalue().encode('utf-8-sig')),
            mimetype='text/csv',
            as_attachment=True,
            download_name=f'{filename}.csv'
        )

    # Excel export
    output = io.BytesIO()
    with pd.ExcelWriter(output, engine='openpyxl') as writer:
        df.to_excel(writer, sheet_name='Lizenzen', index=False)

        # Size each column to its longest cell, capped at 50 characters.
        worksheet = writer.sheets['Lizenzen']
        for column in worksheet.columns:
            column_letter = column[0].column_letter
            max_length = max((len(str(cell.value)) for cell in column), default=0)
            worksheet.column_dimensions[column_letter].width = min(max_length + 2, 50)

    output.seek(0)

    return send_file(
        output,
        mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        as_attachment=True,
        download_name=f'{filename}.xlsx'
    )
|
|
|
|
@app.route("/export/customers")
@login_required
def export_customers():
    """Export all customers with license statistics as an Excel or CSV download.

    Query parameters:
        format: ``csv`` for a semicolon-separated CSV file; any other value
            (default ``excel``) produces an ``.xlsx`` workbook.

    Returns:
        A file download response created with ``send_file``.
    """
    conn = get_connection()
    cur = conn.cursor()
    try:
        # All customers with total, active and expired license counts.
        cur.execute("""
            SELECT c.id, c.name, c.email, c.created_at,
                   COUNT(l.id) as total_licenses,
                   COUNT(CASE WHEN l.is_active = TRUE AND l.valid_until >= CURRENT_DATE THEN 1 END) as active_licenses,
                   COUNT(CASE WHEN l.valid_until < CURRENT_DATE THEN 1 END) as expired_licenses
            FROM customers c
            LEFT JOIN licenses l ON c.id = l.customer_id
            GROUP BY c.id, c.name, c.email, c.created_at
            ORDER BY c.id
        """)
        data = cur.fetchall()
    finally:
        # Release the connection even if the query fails.
        cur.close()
        conn.close()

    # Column headers shown in the exported file.
    columns = ['ID', 'Name', 'E-Mail', 'Erstellt am',
               'Lizenzen gesamt', 'Aktive Lizenzen', 'Abgelaufene Lizenzen']
    df = pd.DataFrame(data, columns=columns)

    # Format the creation timestamp as DD.MM.YYYY HH:MM.
    df['Erstellt am'] = pd.to_datetime(df['Erstellt am']).dt.strftime('%d.%m.%Y %H:%M')

    export_format = request.args.get('format', 'excel')

    log_audit('EXPORT', 'customer',
              additional_info=f"Export aller Kunden als {export_format.upper()}")
    filename = f'kunden_export_{datetime.now().strftime("%Y%m%d_%H%M%S")}'

    if export_format == 'csv':
        output = io.StringIO()
        df.to_csv(output, index=False, encoding='utf-8-sig', sep=';')

        # Encoding with utf-8-sig prepends a BOM to the downloaded bytes.
        return send_file(
            io.BytesIO(output.getvalue().encode('utf-8-sig')),
            mimetype='text/csv',
            as_attachment=True,
            download_name=f'{filename}.csv'
        )

    # Excel export
    output = io.BytesIO()
    with pd.ExcelWriter(output, engine='openpyxl') as writer:
        df.to_excel(writer, sheet_name='Kunden', index=False)

        # Size each column to its longest cell, capped at 50 characters.
        worksheet = writer.sheets['Kunden']
        for column in worksheet.columns:
            column_letter = column[0].column_letter
            max_length = max((len(str(cell.value)) for cell in column), default=0)
            worksheet.column_dimensions[column_letter].width = min(max_length + 2, 50)

    output.seek(0)

    return send_file(
        output,
        mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        as_attachment=True,
        download_name=f'{filename}.xlsx'
    )
|
|
|
|
@app.route("/audit")
@login_required
def audit_log():
    """Render the audit log with optional filters and pagination.

    Query parameters:
        user: Case-insensitive substring matched against the username.
        action: Exact action name to filter on.
        entity: Exact entity type to filter on.
        page: 1-based page number; values below 1 are treated as 1.

    Returns:
        The rendered ``audit_log.html`` template.
    """
    filter_user = request.args.get('user', '').strip()
    filter_action = request.args.get('action', '').strip()
    filter_entity = request.args.get('entity', '').strip()
    # Clamp the page: "?page=0" or a negative page would otherwise produce a
    # negative OFFSET, which PostgreSQL rejects with an error.
    page = max(request.args.get('page', 1, type=int), 1)
    per_page = 50

    # "WHERE 1=1" lets every optional filter be appended uniformly with AND.
    query = """
        SELECT id, timestamp, username, action, entity_type, entity_id,
               old_values, new_values, ip_address, user_agent, additional_info
        FROM audit_log
        WHERE 1=1
    """
    params = []

    if filter_user:
        query += " AND LOWER(username) LIKE LOWER(%s)"
        params.append(f'%{filter_user}%')

    if filter_action:
        query += " AND action = %s"
        params.append(filter_action)

    if filter_entity:
        query += " AND entity_type = %s"
        params.append(filter_entity)

    # Count the filtered rows by wrapping the filtered query (before LIMIT is added).
    count_query = "SELECT COUNT(*) FROM (" + query + ") as count_table"

    offset = (page - 1) * per_page
    page_query = query + " ORDER BY timestamp DESC LIMIT %s OFFSET %s"

    conn = get_connection()
    cur = conn.cursor()
    try:
        cur.execute(count_query, params)
        total = cur.fetchone()[0]

        cur.execute(page_query, params + [per_page, offset])
        logs = cur.fetchall()
    finally:
        cur.close()
        conn.close()

    # Rows are passed on as lists; old_values/new_values are handed to the
    # template exactly as the driver returned them, with no conversion here.
    parsed_logs = [list(log) for log in logs]

    # Ceiling division: a partially filled last page still counts as a page.
    total_pages = (total + per_page - 1) // per_page

    return render_template("audit_log.html",
                           logs=parsed_logs,
                           filter_user=filter_user,
                           filter_action=filter_action,
                           filter_entity=filter_entity,
                           page=page,
                           total_pages=total_pages,
                           total=total,
                           username=session.get('username'))
|
|
|
|
@app.route("/backups")
@login_required
def backups():
    """Render the backup history page."""
    # Most recent successful backup, used for the summary at the top of the page.
    latest_success_sql = """
        SELECT created_at, filesize, duration_seconds
        FROM backup_history
        WHERE status = 'success'
        ORDER BY created_at DESC
        LIMIT 1
    """
    # Complete history, newest first.
    history_sql = """
        SELECT id, filename, filesize, backup_type, status, error_message,
               created_at, created_by, tables_count, records_count,
               duration_seconds, is_encrypted
        FROM backup_history
        ORDER BY created_at DESC
    """

    db = get_connection()
    cursor = db.cursor()

    cursor.execute(latest_success_sql)
    latest_success = cursor.fetchone()

    cursor.execute(history_sql)
    history_rows = cursor.fetchall()

    cursor.close()
    db.close()

    template_context = {
        'backups': history_rows,
        'last_backup': latest_success,
        'username': session.get('username'),
    }
    return render_template("backups.html", **template_context)
|
|
|
|
@app.route("/backup/create", methods=["POST"])
@login_required
def create_backup_route():
    """Trigger a manual backup and report the outcome as JSON.

    Returns:
        A JSON object with ``success`` and ``message``; status 500 is returned
        when ``create_backup`` reports a failure.
    """
    ok, outcome = create_backup(backup_type="manual", created_by=session.get('username'))

    # Guard clause for the failure case; the success path falls through.
    if not ok:
        failure = {
            'success': False,
            'message': f'Backup fehlgeschlagen: {outcome}'
        }
        return jsonify(failure), 500

    return jsonify({
        'success': True,
        'message': f'Backup erfolgreich erstellt: {outcome}'
    })
|
|
|
|
@app.route("/backup/restore/<int:backup_id>", methods=["POST"])
@login_required
def restore_backup_route(backup_id):
    """Restore the given backup and report the outcome as JSON.

    Args:
        backup_id: Primary key of the backup, taken from the URL.

    Returns:
        A JSON object with ``success`` and ``message``; status 500 is returned
        when ``restore_backup`` reports a failure.
    """
    # The key is optional form data; None is passed through when it is absent.
    key = request.form.get('encryption_key')
    ok, message = restore_backup(backup_id, key)

    # Guard clause for the failure case; the success path falls through.
    if not ok:
        return jsonify({'success': False, 'message': message}), 500

    return jsonify({'success': True, 'message': message})
|
|
|
|
@app.route("/backup/download/<int:backup_id>")
@login_required
def download_backup(backup_id):
    """Send a stored backup file to the browser as a download.

    Args:
        backup_id: Primary key of the backup, taken from the URL.

    Returns:
        The file as an attachment, or a 404 response when the backup record,
        its stored path, or the file on disk is missing.
    """
    conn = get_connection()
    cur = conn.cursor()
    try:
        cur.execute("""
            SELECT filename, filepath
            FROM backup_history
            WHERE id = %s
        """, (backup_id,))
        backup_info = cur.fetchone()
    finally:
        cur.close()
        conn.close()

    if not backup_info:
        return "Backup nicht gefunden", 404

    filename, stored_path = backup_info

    # A NULL path would make Path() raise TypeError and an empty one would
    # resolve to the current directory; treat both as a missing file.
    if not stored_path:
        return "Backup-Datei nicht gefunden", 404

    filepath = Path(stored_path)
    if not filepath.exists():
        return "Backup-Datei nicht gefunden", 404

    log_audit('DOWNLOAD', 'backup', backup_id,
              additional_info=f"Backup heruntergeladen: {filename}")

    return send_file(filepath, as_attachment=True, download_name=filename)
|
|
|
|
@app.route("/security/blocked-ips")
@login_required
def blocked_ips():
    """Render the list of IP addresses that have a block timestamp set.

    Entries whose block has already expired are listed too; ``is_active``
    tells the template whether the block is still in effect.
    """
    conn = get_connection()
    cur = conn.cursor()
    try:
        cur.execute("""
            SELECT
                ip_address,
                attempt_count,
                first_attempt,
                last_attempt,
                blocked_until,
                last_username_tried,
                last_error_message
            FROM login_attempts
            WHERE blocked_until IS NOT NULL
            ORDER BY blocked_until DESC
        """)
        rows = cur.fetchall()
    finally:
        cur.close()
        conn.close()

    blocked_ips_list = []
    for (ip_address, attempt_count, first_attempt, last_attempt,
         blocked_until, last_username, last_error) in rows:
        blocked_ips_list.append({
            'ip_address': ip_address,
            'attempt_count': attempt_count,
            'first_attempt': first_attempt.strftime('%d.%m.%Y %H:%M'),
            'last_attempt': last_attempt.strftime('%d.%m.%Y %H:%M'),
            'blocked_until': blocked_until.strftime('%d.%m.%Y %H:%M'),
            # Compare against "now" in the same timezone-awareness as the stored
            # value: tzinfo is None for naive timestamps (same as datetime.now()),
            # and comparing naive with aware datetimes raises TypeError.
            'is_active': blocked_until > datetime.now(blocked_until.tzinfo),
            'last_username': last_username,
            'last_error': last_error
        })

    return render_template("blocked_ips.html",
                           blocked_ips=blocked_ips_list,
                           username=session.get('username'))
|
|
|
|
@app.route("/security/unblock-ip", methods=["POST"])
@login_required
def unblock_ip():
    """Lift the block on the submitted IP address and return to the blocked-IP list.

    Form fields:
        ip_address: The address to unblock; nothing is changed when it is missing or empty.

    Returns:
        A redirect to the ``blocked_ips`` view.
    """
    target_ip = request.form.get('ip_address')
    overview = url_for('blocked_ips')

    # Guard clause: without an address there is nothing to update or log.
    if not target_ip:
        return redirect(overview)

    unblock_sql = """
        UPDATE login_attempts
        SET blocked_until = NULL
        WHERE ip_address = %s
    """

    db = get_connection()
    cursor = db.cursor()
    cursor.execute(unblock_sql, (target_ip,))
    db.commit()
    cursor.close()
    db.close()

    log_audit('UNBLOCK_IP', 'security',
              additional_info=f"IP {target_ip} manuell entsperrt")

    return redirect(overview)
|
|
|
|
@app.route("/security/clear-attempts", methods=["POST"])
@login_required
def clear_attempts():
    """Reset the recorded login attempts for the submitted IP address.

    Form fields:
        ip_address: The address to reset; nothing is changed when it is missing or empty.

    Returns:
        A redirect to the ``blocked_ips`` view.
    """
    target_ip = request.form.get('ip_address')
    overview = url_for('blocked_ips')

    # Guard clause: without an address there is nothing to reset or log.
    if not target_ip:
        return redirect(overview)

    reset_login_attempts(target_ip)

    log_audit('CLEAR_ATTEMPTS', 'security',
              additional_info=f"Login-Versuche für IP {target_ip} zurückgesetzt")

    return redirect(overview)
|
|
|
|
if __name__ == "__main__":
    # Direct execution only: serve the app on all interfaces, port 5000.
    app.run(port=5000, host="0.0.0.0")
|