Refactoring - Part1

Dieser Commit ist enthalten in:
2025-06-16 21:14:13 +02:00
Ursprung 262de2839e
Commit 29b302a343
28 geänderte Dateien mit 11124 neuen und 671 gelöschten Zeilen

Datei anzeigen

@@ -1,50 +1,63 @@
import os
import psycopg2
from psycopg2.extras import Json
from flask import Flask, render_template, request, redirect, session, url_for, send_file, jsonify, flash
from flask_session import Session
from functools import wraps
from dotenv import load_dotenv
import pandas as pd
import sys
import time
import json
import logging
import requests
import re
import random
import base64
from io import BytesIO
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
import io
import subprocess
import gzip
from cryptography.fernet import Fernet
from pathlib import Path
import time
from apscheduler.schedulers.background import BackgroundScheduler
import logging
import random
import hashlib
import requests
import secrets
import string
import re
import bcrypt
import pyotp
import qrcode
from io import BytesIO
import base64
import json
from werkzeug.middleware.proxy_fix import ProxyFix
from openpyxl.utils import get_column_letter
load_dotenv()
# Add current directory to Python path to ensure modules can be imported
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from flask import Flask, render_template, request, redirect, session, url_for, send_file, jsonify, flash
from flask_session import Session
from werkzeug.middleware.proxy_fix import ProxyFix
from apscheduler.schedulers.background import BackgroundScheduler
import pandas as pd
from psycopg2.extras import Json
# Import our new modules
import config
from db import get_connection, get_db_connection, get_db_cursor, execute_query
from auth.decorators import login_required
from auth.password import hash_password, verify_password
from auth.two_factor import (
generate_totp_secret, generate_qr_code, verify_totp,
generate_backup_codes, hash_backup_code, verify_backup_code
)
from auth.rate_limiting import (
check_ip_blocked, record_failed_attempt,
reset_login_attempts, get_login_attempts
)
from utils.network import get_client_ip
from utils.audit import log_audit
from utils.license import generate_license_key, validate_license_key
from utils.backup import create_backup, restore_backup, get_or_create_encryption_key
from utils.export import (
create_excel_export, format_datetime_for_export,
prepare_license_export_data, prepare_customer_export_data,
prepare_session_export_data, prepare_audit_export_data
)
from models import get_user_by_username
app = Flask(__name__)
app.config['SECRET_KEY'] = os.urandom(24)
app.config['SESSION_TYPE'] = 'filesystem'
app.config['JSON_AS_ASCII'] = False # JSON-Ausgabe mit UTF-8
app.config['JSONIFY_MIMETYPE'] = 'application/json; charset=utf-8'
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(minutes=5) # 5 Minuten Session-Timeout
app.config['SESSION_COOKIE_HTTPONLY'] = True
app.config['SESSION_COOKIE_SECURE'] = False # Wird auf True gesetzt wenn HTTPS (intern läuft HTTP)
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'
app.config['SESSION_COOKIE_NAME'] = 'admin_session'
# WICHTIG: Session-Cookie soll auch nach 5 Minuten ablaufen
app.config['SESSION_REFRESH_EACH_REQUEST'] = False
# Load configuration from config module
app.config['SECRET_KEY'] = config.SECRET_KEY
app.config['SESSION_TYPE'] = config.SESSION_TYPE
app.config['JSON_AS_ASCII'] = config.JSON_AS_ASCII
app.config['JSONIFY_MIMETYPE'] = config.JSONIFY_MIMETYPE
app.config['PERMANENT_SESSION_LIFETIME'] = config.PERMANENT_SESSION_LIFETIME
app.config['SESSION_COOKIE_HTTPONLY'] = config.SESSION_COOKIE_HTTPONLY
app.config['SESSION_COOKIE_SECURE'] = config.SESSION_COOKIE_SECURE
app.config['SESSION_COOKIE_SAMESITE'] = config.SESSION_COOKIE_SAMESITE
app.config['SESSION_COOKIE_NAME'] = config.SESSION_COOKIE_NAME
app.config['SESSION_REFRESH_EACH_REQUEST'] = config.SESSION_REFRESH_EACH_REQUEST
Session(app)
# ProxyFix für korrekte IP-Adressen hinter Nginx
@@ -52,22 +65,7 @@ app.wsgi_app = ProxyFix(
app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1
)
# Backup-Konfiguration
BACKUP_DIR = Path("/app/backups")
BACKUP_DIR.mkdir(exist_ok=True)
# Rate-Limiting Konfiguration
FAIL_MESSAGES = [
"NOPE!",
"ACCESS DENIED, TRY HARDER",
"WRONG! 🚫",
"COMPUTER SAYS NO",
"YOU FAILED"
]
MAX_LOGIN_ATTEMPTS = 5
BLOCK_DURATION_HOURS = 24
CAPTCHA_AFTER_ATTEMPTS = 2
# Configuration is now loaded from config module
# Scheduler für automatische Backups
scheduler = BackgroundScheduler()
@@ -77,385 +75,6 @@ scheduler.start()
logging.basicConfig(level=logging.INFO)
# Login decorator
def login_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if 'logged_in' not in session:
return redirect(url_for('login'))
# Prüfe ob Session abgelaufen ist
if 'last_activity' in session:
last_activity = datetime.fromisoformat(session['last_activity'])
time_since_activity = datetime.now(ZoneInfo("Europe/Berlin")).replace(tzinfo=None) - last_activity
# Debug-Logging
app.logger.info(f"Session check for {session.get('username', 'unknown')}: "
f"Last activity: {last_activity}, "
f"Time since: {time_since_activity.total_seconds()} seconds")
if time_since_activity > timedelta(minutes=5):
# Session abgelaufen - Logout
username = session.get('username', 'unbekannt')
app.logger.info(f"Session timeout for user {username} - auto logout")
# Audit-Log für automatischen Logout (vor session.clear()!)
try:
log_audit('AUTO_LOGOUT', 'session', additional_info={'reason': 'Session timeout (5 minutes)', 'username': username})
except:
pass
session.clear()
flash('Ihre Sitzung ist abgelaufen. Bitte melden Sie sich erneut an.', 'warning')
return redirect(url_for('login'))
# Aktivität NICHT automatisch aktualisieren
# Nur bei expliziten Benutzeraktionen (wird vom Heartbeat gemacht)
return f(*args, **kwargs)
return decorated_function
# DB-Verbindung mit UTF-8 Encoding
def get_connection():
conn = psycopg2.connect(
host=os.getenv("POSTGRES_HOST", "postgres"),
port=os.getenv("POSTGRES_PORT", "5432"),
dbname=os.getenv("POSTGRES_DB"),
user=os.getenv("POSTGRES_USER"),
password=os.getenv("POSTGRES_PASSWORD"),
options='-c client_encoding=UTF8'
)
conn.set_client_encoding('UTF8')
return conn
# User Authentication Helper Functions
def hash_password(password):
"""Hash a password using bcrypt"""
return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
def verify_password(password, hashed):
"""Verify a password against its hash"""
return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))
def get_user_by_username(username):
"""Get user from database by username"""
conn = get_connection()
cur = conn.cursor()
try:
cur.execute("""
SELECT id, username, password_hash, email, totp_secret, totp_enabled,
backup_codes, last_password_change, failed_2fa_attempts
FROM users WHERE username = %s
""", (username,))
user = cur.fetchone()
if user:
return {
'id': user[0],
'username': user[1],
'password_hash': user[2],
'email': user[3],
'totp_secret': user[4],
'totp_enabled': user[5],
'backup_codes': user[6],
'last_password_change': user[7],
'failed_2fa_attempts': user[8]
}
return None
finally:
cur.close()
conn.close()
def generate_totp_secret():
"""Generate a new TOTP secret"""
return pyotp.random_base32()
def generate_qr_code(username, totp_secret):
"""Generate QR code for TOTP setup"""
totp_uri = pyotp.totp.TOTP(totp_secret).provisioning_uri(
name=username,
issuer_name='V2 Admin Panel'
)
qr = qrcode.QRCode(version=1, box_size=10, border=5)
qr.add_data(totp_uri)
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
buf = BytesIO()
img.save(buf, format='PNG')
buf.seek(0)
return base64.b64encode(buf.getvalue()).decode()
def verify_totp(totp_secret, token):
"""Verify a TOTP token"""
totp = pyotp.TOTP(totp_secret)
return totp.verify(token, valid_window=1)
def generate_backup_codes(count=8):
"""Generate backup codes for 2FA recovery"""
codes = []
for _ in range(count):
code = ''.join(random.choices(string.ascii_uppercase + string.digits, k=8))
codes.append(code)
return codes
def hash_backup_code(code):
"""Hash a backup code for storage"""
return hashlib.sha256(code.encode()).hexdigest()
def verify_backup_code(code, hashed_codes):
"""Verify a backup code against stored hashes"""
code_hash = hashlib.sha256(code.encode()).hexdigest()
return code_hash in hashed_codes
# Audit-Log-Funktion
def log_audit(action, entity_type, entity_id=None, old_values=None, new_values=None, additional_info=None):
"""Protokolliert Änderungen im Audit-Log"""
conn = get_connection()
cur = conn.cursor()
try:
username = session.get('username', 'system')
ip_address = get_client_ip() if request else None
user_agent = request.headers.get('User-Agent') if request else None
# Debug logging
app.logger.info(f"Audit log - IP address captured: {ip_address}, Action: {action}, User: {username}")
# Konvertiere Dictionaries zu JSONB
old_json = Json(old_values) if old_values else None
new_json = Json(new_values) if new_values else None
cur.execute("""
INSERT INTO audit_log
(username, action, entity_type, entity_id, old_values, new_values,
ip_address, user_agent, additional_info)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
""", (username, action, entity_type, entity_id, old_json, new_json,
ip_address, user_agent, additional_info))
conn.commit()
except Exception as e:
print(f"Audit log error: {e}")
conn.rollback()
finally:
cur.close()
conn.close()
# Verschlüsselungs-Funktionen
def get_or_create_encryption_key():
"""Holt oder erstellt einen Verschlüsselungsschlüssel"""
key_file = BACKUP_DIR / ".backup_key"
# Versuche Key aus Umgebungsvariable zu lesen
env_key = os.getenv("BACKUP_ENCRYPTION_KEY")
if env_key:
try:
# Validiere den Key
Fernet(env_key.encode())
return env_key.encode()
except:
pass
# Wenn kein gültiger Key in ENV, prüfe Datei
if key_file.exists():
return key_file.read_bytes()
# Erstelle neuen Key
key = Fernet.generate_key()
key_file.write_bytes(key)
logging.info("Neuer Backup-Verschlüsselungsschlüssel erstellt")
return key
# Backup-Funktionen
def create_backup(backup_type="manual", created_by=None):
"""Erstellt ein verschlüsseltes Backup der Datenbank"""
start_time = time.time()
timestamp = datetime.now(ZoneInfo("Europe/Berlin")).strftime("%Y%m%d_%H%M%S")
filename = f"backup_v2docker_{timestamp}_encrypted.sql.gz.enc"
filepath = BACKUP_DIR / filename
conn = get_connection()
cur = conn.cursor()
# Backup-Eintrag erstellen
cur.execute("""
INSERT INTO backup_history
(filename, filepath, backup_type, status, created_by, is_encrypted)
VALUES (%s, %s, %s, %s, %s, %s)
RETURNING id
""", (filename, str(filepath), backup_type, 'in_progress',
created_by or 'system', True))
backup_id = cur.fetchone()[0]
conn.commit()
try:
# PostgreSQL Dump erstellen
dump_command = [
'pg_dump',
'-h', os.getenv("POSTGRES_HOST", "postgres"),
'-p', os.getenv("POSTGRES_PORT", "5432"),
'-U', os.getenv("POSTGRES_USER"),
'-d', os.getenv("POSTGRES_DB"),
'--no-password',
'--verbose'
]
# PGPASSWORD setzen
env = os.environ.copy()
env['PGPASSWORD'] = os.getenv("POSTGRES_PASSWORD")
# Dump ausführen
result = subprocess.run(dump_command, capture_output=True, text=True, env=env)
if result.returncode != 0:
raise Exception(f"pg_dump failed: {result.stderr}")
dump_data = result.stdout.encode('utf-8')
# Komprimieren
compressed_data = gzip.compress(dump_data)
# Verschlüsseln
key = get_or_create_encryption_key()
f = Fernet(key)
encrypted_data = f.encrypt(compressed_data)
# Speichern
filepath.write_bytes(encrypted_data)
# Statistiken sammeln
cur.execute("SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'public'")
tables_count = cur.fetchone()[0]
cur.execute("""
SELECT SUM(n_live_tup)
FROM pg_stat_user_tables
""")
records_count = cur.fetchone()[0] or 0
duration = time.time() - start_time
filesize = filepath.stat().st_size
# Backup-Eintrag aktualisieren
cur.execute("""
UPDATE backup_history
SET status = %s, filesize = %s, tables_count = %s,
records_count = %s, duration_seconds = %s
WHERE id = %s
""", ('success', filesize, tables_count, records_count, duration, backup_id))
conn.commit()
# Audit-Log
log_audit('BACKUP', 'database', backup_id,
additional_info=f"Backup erstellt: {filename} ({filesize} bytes)")
# E-Mail-Benachrichtigung (wenn konfiguriert)
send_backup_notification(True, filename, filesize, duration)
logging.info(f"Backup erfolgreich erstellt: {filename}")
return True, filename
except Exception as e:
# Fehler protokollieren
cur.execute("""
UPDATE backup_history
SET status = %s, error_message = %s, duration_seconds = %s
WHERE id = %s
""", ('failed', str(e), time.time() - start_time, backup_id))
conn.commit()
logging.error(f"Backup fehlgeschlagen: {e}")
send_backup_notification(False, filename, error=str(e))
return False, str(e)
finally:
cur.close()
conn.close()
def restore_backup(backup_id, encryption_key=None):
"""Stellt ein Backup wieder her"""
conn = get_connection()
cur = conn.cursor()
try:
# Backup-Info abrufen
cur.execute("""
SELECT filename, filepath, is_encrypted
FROM backup_history
WHERE id = %s
""", (backup_id,))
backup_info = cur.fetchone()
if not backup_info:
raise Exception("Backup nicht gefunden")
filename, filepath, is_encrypted = backup_info
filepath = Path(filepath)
if not filepath.exists():
raise Exception("Backup-Datei nicht gefunden")
# Datei lesen
encrypted_data = filepath.read_bytes()
# Entschlüsseln
if is_encrypted:
key = encryption_key.encode() if encryption_key else get_or_create_encryption_key()
try:
f = Fernet(key)
compressed_data = f.decrypt(encrypted_data)
except:
raise Exception("Entschlüsselung fehlgeschlagen. Falsches Passwort?")
else:
compressed_data = encrypted_data
# Dekomprimieren
dump_data = gzip.decompress(compressed_data)
sql_commands = dump_data.decode('utf-8')
# Bestehende Verbindungen schließen
cur.close()
conn.close()
# Datenbank wiederherstellen
restore_command = [
'psql',
'-h', os.getenv("POSTGRES_HOST", "postgres"),
'-p', os.getenv("POSTGRES_PORT", "5432"),
'-U', os.getenv("POSTGRES_USER"),
'-d', os.getenv("POSTGRES_DB"),
'--no-password'
]
env = os.environ.copy()
env['PGPASSWORD'] = os.getenv("POSTGRES_PASSWORD")
result = subprocess.run(restore_command, input=sql_commands,
capture_output=True, text=True, env=env)
if result.returncode != 0:
raise Exception(f"Wiederherstellung fehlgeschlagen: {result.stderr}")
# Audit-Log (neue Verbindung)
log_audit('RESTORE', 'database', backup_id,
additional_info=f"Backup wiederhergestellt: {filename}")
return True, "Backup erfolgreich wiederhergestellt"
except Exception as e:
logging.error(f"Wiederherstellung fehlgeschlagen: {e}")
return False, str(e)
def send_backup_notification(success, filename, filesize=None, duration=None, error=None):
"""Sendet E-Mail-Benachrichtigung (wenn konfiguriert)"""
if not os.getenv("EMAIL_ENABLED", "false").lower() == "true":
return
# E-Mail-Funktion vorbereitet aber deaktiviert
# TODO: Implementieren wenn E-Mail-Server konfiguriert ist
logging.info(f"E-Mail-Benachrichtigung vorbereitet: Backup {'erfolgreich' if success else 'fehlgeschlagen'}")
# Scheduled Backup Job
def scheduled_backup():
"""Führt ein geplantes Backup aus"""
@@ -466,165 +85,16 @@ def scheduled_backup():
scheduler.add_job(
scheduled_backup,
'cron',
hour=3,
minute=0,
hour=config.SCHEDULER_CONFIG['backup_hour'],
minute=config.SCHEDULER_CONFIG['backup_minute'],
id='daily_backup',
replace_existing=True
)
# Rate-Limiting Funktionen
def get_client_ip():
"""Ermittelt die echte IP-Adresse des Clients"""
# Debug logging
app.logger.info(f"Headers - X-Real-IP: {request.headers.get('X-Real-IP')}, X-Forwarded-For: {request.headers.get('X-Forwarded-For')}, Remote-Addr: {request.remote_addr}")
# Try X-Real-IP first (set by nginx)
if request.headers.get('X-Real-IP'):
return request.headers.get('X-Real-IP')
# Then X-Forwarded-For
elif request.headers.get('X-Forwarded-For'):
# X-Forwarded-For can contain multiple IPs, take the first one
return request.headers.get('X-Forwarded-For').split(',')[0].strip()
# Fallback to remote_addr
else:
return request.remote_addr
def check_ip_blocked(ip_address):
"""Prüft ob eine IP-Adresse gesperrt ist"""
conn = get_connection()
cur = conn.cursor()
cur.execute("""
SELECT blocked_until FROM login_attempts
WHERE ip_address = %s AND blocked_until IS NOT NULL
""", (ip_address,))
result = cur.fetchone()
cur.close()
conn.close()
if result and result[0]:
if result[0] > datetime.now(ZoneInfo("Europe/Berlin")).replace(tzinfo=None):
return True, result[0]
return False, None
def record_failed_attempt(ip_address, username):
"""Zeichnet einen fehlgeschlagenen Login-Versuch auf"""
conn = get_connection()
cur = conn.cursor()
# Random Fehlermeldung
error_message = random.choice(FAIL_MESSAGES)
try:
# Prüfen ob IP bereits existiert
cur.execute("""
SELECT attempt_count FROM login_attempts
WHERE ip_address = %s
""", (ip_address,))
result = cur.fetchone()
if result:
# Update bestehenden Eintrag
new_count = result[0] + 1
blocked_until = None
if new_count >= MAX_LOGIN_ATTEMPTS:
blocked_until = datetime.now(ZoneInfo("Europe/Berlin")).replace(tzinfo=None) + timedelta(hours=BLOCK_DURATION_HOURS)
# E-Mail-Benachrichtigung (wenn aktiviert)
if os.getenv("EMAIL_ENABLED", "false").lower() == "true":
send_security_alert_email(ip_address, username, new_count)
cur.execute("""
UPDATE login_attempts
SET attempt_count = %s,
last_attempt = CURRENT_TIMESTAMP,
blocked_until = %s,
last_username_tried = %s,
last_error_message = %s
WHERE ip_address = %s
""", (new_count, blocked_until, username, error_message, ip_address))
else:
# Neuen Eintrag erstellen
cur.execute("""
INSERT INTO login_attempts
(ip_address, attempt_count, last_username_tried, last_error_message)
VALUES (%s, 1, %s, %s)
""", (ip_address, username, error_message))
conn.commit()
# Audit-Log
log_audit('LOGIN_FAILED', 'user',
additional_info=f"IP: {ip_address}, User: {username}, Message: {error_message}")
except Exception as e:
print(f"Rate limiting error: {e}")
conn.rollback()
finally:
cur.close()
conn.close()
return error_message
def reset_login_attempts(ip_address):
"""Setzt die Login-Versuche für eine IP zurück"""
conn = get_connection()
cur = conn.cursor()
try:
cur.execute("""
DELETE FROM login_attempts
WHERE ip_address = %s
""", (ip_address,))
conn.commit()
except Exception as e:
print(f"Reset attempts error: {e}")
conn.rollback()
finally:
cur.close()
conn.close()
def get_login_attempts(ip_address):
"""Gibt die Anzahl der Login-Versuche für eine IP zurück"""
conn = get_connection()
cur = conn.cursor()
cur.execute("""
SELECT attempt_count FROM login_attempts
WHERE ip_address = %s
""", (ip_address,))
result = cur.fetchone()
cur.close()
conn.close()
return result[0] if result else 0
def send_security_alert_email(ip_address, username, attempt_count):
"""Sendet eine Sicherheitswarnung per E-Mail"""
subject = f"⚠️ SICHERHEITSWARNUNG: {attempt_count} fehlgeschlagene Login-Versuche"
body = f"""
WARNUNG: Mehrere fehlgeschlagene Login-Versuche erkannt!
IP-Adresse: {ip_address}
Versuchter Benutzername: {username}
Anzahl Versuche: {attempt_count}
Zeit: {datetime.now(ZoneInfo("Europe/Berlin")).strftime('%Y-%m-%d %H:%M:%S')}
Die IP-Adresse wurde für 24 Stunden gesperrt.
Dies ist eine automatische Nachricht vom v2-Docker Admin Panel.
"""
# TODO: E-Mail-Versand implementieren wenn SMTP konfiguriert
logging.warning(f"Sicherheitswarnung: {attempt_count} fehlgeschlagene Versuche von IP {ip_address}")
print(f"E-Mail würde gesendet: {subject}")
def verify_recaptcha(response):
"""Verifiziert die reCAPTCHA v2 Response mit Google"""
secret_key = os.getenv('RECAPTCHA_SECRET_KEY')
secret_key = config.RECAPTCHA_SECRET_KEY
# Wenn kein Secret Key konfiguriert ist, CAPTCHA als bestanden werten (für PoC)
if not secret_key:
@@ -657,49 +127,6 @@ def verify_recaptcha(response):
logging.error(f"Unerwarteter Fehler bei reCAPTCHA: {str(e)}")
return False
def generate_license_key(license_type='full'):
"""
Generiert einen Lizenzschlüssel im Format: AF-F-YYYYMM-XXXX-YYYY-ZZZZ
AF = Account Factory (Produktkennung)
F/T = F für Fullversion, T für Testversion
YYYY = Jahr
MM = Monat
XXXX-YYYY-ZZZZ = Zufällige alphanumerische Zeichen
"""
# Erlaubte Zeichen (ohne verwirrende wie 0/O, 1/I/l)
chars = 'ABCDEFGHJKLMNPQRSTUVWXYZ23456789'
# Datum-Teil
now = datetime.now(ZoneInfo("Europe/Berlin"))
date_part = now.strftime('%Y%m')
type_char = 'F' if license_type == 'full' else 'T'
# Zufällige Teile generieren (3 Blöcke à 4 Zeichen)
parts = []
for _ in range(3):
part = ''.join(secrets.choice(chars) for _ in range(4))
parts.append(part)
# Key zusammensetzen
key = f"AF-{type_char}-{date_part}-{parts[0]}-{parts[1]}-{parts[2]}"
return key
def validate_license_key(key):
"""
Validiert das License Key Format
Erwartet: AF-F-YYYYMM-XXXX-YYYY-ZZZZ oder AF-T-YYYYMM-XXXX-YYYY-ZZZZ
"""
if not key:
return False
# Pattern für das neue Format
# AF- (fest) + F oder T + - + 6 Ziffern (YYYYMM) + - + 4 Zeichen + - + 4 Zeichen + - + 4 Zeichen
pattern = r'^AF-[FT]-\d{6}-[A-Z0-9]{4}-[A-Z0-9]{4}-[A-Z0-9]{4}$'
# Großbuchstaben für Vergleich
return bool(re.match(pattern, key.upper()))
@app.route("/login", methods=["GET", "POST"])
def login():
@@ -725,8 +152,8 @@ def login():
captcha_response = request.form.get("g-recaptcha-response")
# CAPTCHA-Prüfung nur wenn Keys konfiguriert sind
recaptcha_site_key = os.getenv('RECAPTCHA_SITE_KEY')
if attempt_count >= CAPTCHA_AFTER_ATTEMPTS and recaptcha_site_key:
recaptcha_site_key = config.RECAPTCHA_SITE_KEY
if attempt_count >= config.CAPTCHA_AFTER_ATTEMPTS and recaptcha_site_key:
if not captcha_response:
# Timing-Attack Schutz
elapsed = time.time() - start_time
@@ -736,7 +163,7 @@ def login():
error="CAPTCHA ERFORDERLICH!",
show_captcha=True,
error_type="captcha",
attempts_left=max(0, MAX_LOGIN_ATTEMPTS - attempt_count),
attempts_left=max(0, config.MAX_LOGIN_ATTEMPTS - attempt_count),
recaptcha_site_key=recaptcha_site_key)
# CAPTCHA validieren
@@ -749,7 +176,7 @@ def login():
error="CAPTCHA UNGÜLTIG! Bitte erneut versuchen.",
show_captcha=True,
error_type="captcha",
attempts_left=max(0, MAX_LOGIN_ATTEMPTS - attempt_count),
attempts_left=max(0, config.MAX_LOGIN_ATTEMPTS - attempt_count),
recaptcha_site_key=recaptcha_site_key)
# Check user in database first, fallback to env vars
@@ -764,13 +191,7 @@ def login():
needs_2fa = user['totp_enabled']
else:
# Fallback to environment variables for backward compatibility
admin1_user = os.getenv("ADMIN1_USERNAME")
admin1_pass = os.getenv("ADMIN1_PASSWORD")
admin2_user = os.getenv("ADMIN2_USERNAME")
admin2_pass = os.getenv("ADMIN2_PASSWORD")
if ((username == admin1_user and password == admin1_pass) or
(username == admin2_user and password == admin2_pass)):
if username in config.ADMIN_USERS and password == config.ADMIN_USERS[username]:
login_success = True
# Timing-Attack Schutz - Mindestens 1 Sekunde warten
@@ -806,20 +227,20 @@ def login():
is_now_blocked, _ = check_ip_blocked(ip_address)
if is_now_blocked:
log_audit('LOGIN_BLOCKED', 'security',
additional_info=f"IP {ip_address} wurde nach {MAX_LOGIN_ATTEMPTS} Versuchen gesperrt")
additional_info=f"IP {ip_address} wurde nach {config.MAX_LOGIN_ATTEMPTS} Versuchen gesperrt")
return render_template("login.html",
error=error_message,
show_captcha=(new_attempt_count >= CAPTCHA_AFTER_ATTEMPTS and os.getenv('RECAPTCHA_SITE_KEY')),
show_captcha=(new_attempt_count >= config.CAPTCHA_AFTER_ATTEMPTS and config.RECAPTCHA_SITE_KEY),
error_type="failed",
attempts_left=max(0, MAX_LOGIN_ATTEMPTS - new_attempt_count),
recaptcha_site_key=os.getenv('RECAPTCHA_SITE_KEY'))
attempts_left=max(0, config.MAX_LOGIN_ATTEMPTS - new_attempt_count),
recaptcha_site_key=config.RECAPTCHA_SITE_KEY)
# GET Request
return render_template("login.html",
show_captcha=(attempt_count >= CAPTCHA_AFTER_ATTEMPTS and os.getenv('RECAPTCHA_SITE_KEY')),
attempts_left=max(0, MAX_LOGIN_ATTEMPTS - attempt_count),
recaptcha_site_key=os.getenv('RECAPTCHA_SITE_KEY'))
show_captcha=(attempt_count >= config.CAPTCHA_AFTER_ATTEMPTS and config.RECAPTCHA_SITE_KEY),
attempts_left=max(0, config.MAX_LOGIN_ATTEMPTS - attempt_count),
recaptcha_site_key=config.RECAPTCHA_SITE_KEY)
@app.route("/logout")
def logout():