import time
import json
import hmac
from datetime import datetime
from zoneinfo import ZoneInfo

from flask import Blueprint, render_template, request, redirect, session, url_for, flash, jsonify

import config
from auth.decorators import login_required
from auth.password import hash_password, verify_password
from auth.two_factor import (
    generate_totp_secret, generate_qr_code, verify_totp,
    generate_backup_codes, hash_backup_code, verify_backup_code
)
from auth.rate_limiting import (
    check_ip_blocked, record_failed_attempt,
    reset_login_attempts, get_login_attempts
)
from utils.network import get_client_ip
from utils.audit import log_audit
from models import get_user_by_username
from db import get_db_connection, get_db_cursor
from utils.recaptcha import verify_recaptcha

# Create Blueprint
auth_bp = Blueprint('auth', __name__)

# Minimum wall-clock duration of a POST /login request (timing-attack padding).
_MIN_LOGIN_SECONDS = 1.0

# All timestamps written by this module are naive Europe/Berlin wall-clock time.
_APP_TZ = ZoneInfo("Europe/Berlin")

# Session keys that only exist between the password step and the 2FA step.
_PENDING_2FA_KEYS = ('temp_username', 'temp_user_id', 'awaiting_2fa')


def _now_local():
    """Return the current Europe/Berlin wall-clock time as a naive datetime."""
    return datetime.now(_APP_TZ).replace(tzinfo=None)


def _pad_response_time(start_time):
    """Sleep until at least ``_MIN_LOGIN_SECONDS`` have passed since ``start_time``."""
    elapsed = time.time() - start_time
    if elapsed < _MIN_LOGIN_SECONDS:
        time.sleep(_MIN_LOGIN_SECONDS - elapsed)


def _render_login(attempt_count, **context):
    """Render ``login.html`` with the CAPTCHA / attempts-left context.

    ``show_captcha`` defaults to the threshold check but may be overridden
    through ``context``; any other keyword is passed through to the template.
    """
    context.setdefault(
        'show_captcha',
        attempt_count >= config.CAPTCHA_AFTER_ATTEMPTS and config.RECAPTCHA_SITE_KEY,
    )
    return render_template(
        "login.html",
        attempts_left=max(0, config.MAX_LOGIN_ATTEMPTS - attempt_count),
        recaptcha_site_key=config.RECAPTCHA_SITE_KEY,
        **context,
    )


def _env_credentials_match(username, password):
    """Check ``username``/``password`` against ``config.ADMIN_USERS``.

    Returns False when either side is not a string, so a missing form field
    can never match an unset (``None``) configured password. The comparison
    itself is constant-time.
    """
    if username not in config.ADMIN_USERS:
        return False
    expected = config.ADMIN_USERS[username]
    if not isinstance(password, str) or not isinstance(expected, str):
        return False
    # Compare as bytes: compare_digest() rejects non-ASCII str arguments.
    return hmac.compare_digest(password.encode('utf-8'), expected.encode('utf-8'))


def _complete_login(username, user_id):
    """Mark the current session as fully authenticated."""
    session.permanent = True  # enables the session timeout
    session['logged_in'] = True
    session['username'] = username
    session['user_id'] = user_id
    session['last_activity'] = _now_local().isoformat()


def _clear_pending_2fa():
    """Drop the temporary session keys used while waiting for the 2FA code."""
    for key in _PENDING_2FA_KEYS:
        session.pop(key, None)


@auth_bp.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form (GET) or authenticate a user (POST).

    Flow on POST: reject blocked IPs, require a valid CAPTCHA once the IP has
    reached ``config.CAPTCHA_AFTER_ATTEMPTS`` failed attempts (only if a site
    key is configured), check the credentials against the database user or,
    when no such user exists, against ``config.ADMIN_USERS``. A successful
    login either redirects to the 2FA step or straight to the dashboard; a
    failed one is recorded and the form is re-rendered with the error.
    Every POST that passes the IP-block check is padded to at least one
    second.
    """
    # Timing-attack protection: remember when the request started.
    start_time = time.time()

    ip_address = get_client_ip()

    # Refuse blocked IPs before touching any credentials.
    is_blocked, blocked_until = check_ip_blocked(ip_address)
    if is_blocked:
        time_remaining = (blocked_until - _now_local()).total_seconds() / 3600
        error_msg = f"IP GESPERRT! Noch {time_remaining:.1f} Stunden warten."
        return render_template("login.html", error=error_msg, error_type="blocked")

    # Number of failed attempts recorded so far for this IP.
    attempt_count = get_login_attempts(ip_address)

    if request.method != "POST":
        return _render_login(attempt_count)

    username = request.form.get("username")
    password = request.form.get("password")
    captcha_response = request.form.get("g-recaptcha-response")

    # CAPTCHA is only enforced when keys are configured.
    if attempt_count >= config.CAPTCHA_AFTER_ATTEMPTS and config.RECAPTCHA_SITE_KEY:
        captcha_error = None
        if not captcha_response:
            captcha_error = "CAPTCHA ERFORDERLICH!"
        elif not verify_recaptcha(captcha_response):
            captcha_error = "CAPTCHA UNGÜLTIG! Bitte erneut versuchen."
        if captcha_error:
            _pad_response_time(start_time)
            return _render_login(
                attempt_count,
                error=captcha_error,
                show_captcha=True,
                error_type="captcha",
            )

    # Check user in database first, fallback to env vars
    user = get_user_by_username(username)
    login_success = False
    needs_2fa = False

    if user:
        # Database user authentication; a missing password field never matches.
        if password is not None and verify_password(password, user['password_hash']):
            login_success = True
            needs_2fa = user['totp_enabled']
    elif _env_credentials_match(username, password):
        # Fallback to environment variables for backward compatibility
        login_success = True

    # Timing-attack protection: every credential check takes at least 1 second.
    _pad_response_time(start_time)

    if not login_success:
        error_message = record_failed_attempt(ip_address, username)
        new_attempt_count = get_login_attempts(ip_address)

        # Audit the moment this attempt tipped the IP into the blocked state.
        is_now_blocked, _ = check_ip_blocked(ip_address)
        if is_now_blocked:
            log_audit('LOGIN_BLOCKED', 'security',
                      additional_info=f"IP {ip_address} wurde nach {config.MAX_LOGIN_ATTEMPTS} Versuchen gesperrt")

        return _render_login(new_attempt_count, error=error_message, error_type="failed")

    if needs_2fa:
        # Store temporary session for 2FA verification
        session['temp_username'] = username
        session['temp_user_id'] = user['id']
        session['awaiting_2fa'] = True
        return redirect(url_for('auth.verify_2fa'))

    # Complete login without 2FA (env-based users have no database id).
    _complete_login(username, user['id'] if user else None)
    reset_login_attempts(ip_address)
    log_audit('LOGIN_SUCCESS', 'user',
              additional_info=f"Erfolgreiche Anmeldung von IP: {ip_address}")
    return redirect(url_for('admin.dashboard'))


@auth_bp.route("/logout")
def logout():
    """Log the logout, remove all authentication state and go to the login page."""
    # Audit first, while the session still holds the user's data
    # (presumably log_audit reads it from the session; verify in utils.audit).
    log_audit('LOGOUT', 'user', additional_info="Abmeldung")

    for key in ('logged_in', 'username', 'user_id', 'last_activity', 'temp_totp_secret'):
        session.pop(key, None)
    _clear_pending_2fa()

    return redirect(url_for('auth.login'))


@auth_bp.route("/verify-2fa", methods=["GET", "POST"])
def verify_2fa():
    """Second login step: accept a TOTP token or a one-time backup code.

    Only reachable while the session is flagged ``awaiting_2fa``. A token of
    exactly 8 characters for which ``str.isupper()`` holds is treated as a
    backup code; anything else is verified as a TOTP token. Failed attempts
    are counted on the user row.
    """
    if not session.get('awaiting_2fa'):
        return redirect(url_for('auth.login'))

    if request.method == "POST":
        token = request.form.get('token', '').replace(' ', '')
        username = session.get('temp_username')
        user_id = session.get('temp_user_id')

        if not username or not user_id:
            flash('Session expired. Please login again.', 'error')
            return redirect(url_for('auth.login'))

        user = get_user_by_username(username)
        if not user:
            flash('User not found.', 'error')
            return redirect(url_for('auth.login'))

        # Check if it's a backup code
        if len(token) == 8 and token.isupper():
            backup_codes = json.loads(user['backup_codes']) if user['backup_codes'] else []
            if verify_backup_code(token, backup_codes):
                # Remove used backup code so it cannot be replayed.
                # NOTE(review): assumes hash_backup_code() is deterministic, i.e.
                # returns the same value that was stored - confirm in auth.two_factor.
                code_hash = hash_backup_code(token)
                backup_codes.remove(code_hash)

                with get_db_connection() as conn:
                    with get_db_cursor(conn) as cur:
                        cur.execute("UPDATE users SET backup_codes = %s WHERE id = %s",
                                    (json.dumps(backup_codes), user_id))

                _complete_login(username, user_id)
                _clear_pending_2fa()

                flash('Login successful using backup code. Please generate new backup codes.', 'warning')
                log_audit('LOGIN_2FA_BACKUP', 'user', additional_info="2FA login with backup code")
                return redirect(url_for('admin.dashboard'))
        # A missing secret (e.g. 2FA disabled in the meantime) counts as a failure.
        elif user['totp_secret'] and verify_totp(user['totp_secret'], token):
            _complete_login(username, user_id)
            _clear_pending_2fa()

            log_audit('LOGIN_2FA_SUCCESS', 'user', additional_info="2FA login successful")
            return redirect(url_for('admin.dashboard'))

        # Failed verification
        with get_db_connection() as conn:
            with get_db_cursor(conn) as cur:
                cur.execute("UPDATE users SET failed_2fa_attempts = failed_2fa_attempts + 1, last_failed_2fa = %s WHERE id = %s",
                            (_now_local(), user_id))

        flash('Invalid authentication code. Please try again.', 'error')
        log_audit('LOGIN_2FA_FAILED', 'user', additional_info="Failed 2FA attempt")

    return render_template('verify_2fa.html')


@auth_bp.route("/profile")
@login_required
def profile():
    """Render the profile page of the logged-in database user."""
    user = get_user_by_username(session['username'])
    if not user:
        # For environment-based users, redirect with message
        flash('Bitte führen Sie das Migrations-Script aus, um Passwort-Änderung und 2FA zu aktivieren.', 'info')
        return redirect(url_for('admin.dashboard'))
    return render_template('profile.html', user=user)


@auth_bp.route("/profile/change-password", methods=["POST"])
@login_required
def change_password():
    """Change the logged-in user's password after verifying the current one.

    Missing form fields are treated as empty strings. Accounts without a
    database row are sent to the profile view, which explains the situation.
    """
    current_password = request.form.get('current_password', '')
    new_password = request.form.get('new_password', '')
    confirm_password = request.form.get('confirm_password', '')

    user = get_user_by_username(session['username'])
    if not user:
        # Environment-based account: there is no stored hash to change.
        return redirect(url_for('auth.profile'))

    # Verify current password
    if not verify_password(current_password, user['password_hash']):
        flash('Current password is incorrect.', 'error')
        return redirect(url_for('auth.profile'))

    # Check new password
    if new_password != confirm_password:
        flash('New passwords do not match.', 'error')
        return redirect(url_for('auth.profile'))

    if len(new_password) < 8:
        flash('Password must be at least 8 characters long.', 'error')
        return redirect(url_for('auth.profile'))

    # Update password
    new_hash = hash_password(new_password)
    with get_db_connection() as conn:
        with get_db_cursor(conn) as cur:
            cur.execute("UPDATE users SET password_hash = %s, last_password_change = %s WHERE id = %s",
                        (new_hash, _now_local(), user['id']))

    log_audit('PASSWORD_CHANGE', 'user', entity_id=user['id'],
              additional_info="Password changed successfully")
    flash('Password changed successfully.', 'success')
    return redirect(url_for('auth.profile'))


@auth_bp.route("/profile/setup-2fa")
@login_required
def setup_2fa():
    """Start 2FA enrolment: create a TOTP secret and show it with its QR code.

    The secret is kept in the session as ``temp_totp_secret`` until it is
    confirmed through ``enable_2fa``.
    """
    user = get_user_by_username(session['username'])
    if not user:
        # Environment-based account: the profile view shows the explanation.
        return redirect(url_for('auth.profile'))

    if user['totp_enabled']:
        flash('2FA is already enabled for your account.', 'info')
        return redirect(url_for('auth.profile'))

    # Generate new TOTP secret
    totp_secret = generate_totp_secret()
    session['temp_totp_secret'] = totp_secret

    # Generate QR code
    qr_code = generate_qr_code(user['username'], totp_secret)

    return render_template('setup_2fa.html', totp_secret=totp_secret, qr_code=qr_code)


@auth_bp.route("/profile/enable-2fa", methods=["POST"])
@login_required
def enable_2fa():
    """Confirm the pending TOTP secret with a token and turn 2FA on.

    On success the secret and the hashed backup codes are stored on the user
    row and the plain backup codes are rendered once.
    """
    token = request.form.get('token', '').replace(' ', '')
    totp_secret = session.get('temp_totp_secret')

    if not totp_secret:
        flash('2FA setup session expired. Please try again.', 'error')
        return redirect(url_for('auth.setup_2fa'))

    # Verify the token
    if not verify_totp(totp_secret, token):
        flash('Invalid authentication code. Please try again.', 'error')
        return redirect(url_for('auth.setup_2fa'))

    user = get_user_by_username(session['username'])
    if not user:
        # Environment-based account: nothing to enable 2FA on.
        return redirect(url_for('auth.profile'))

    # Generate backup codes; only their hashes are persisted.
    backup_codes = generate_backup_codes()
    backup_codes_hashed = [hash_backup_code(code) for code in backup_codes]

    # Enable 2FA for user
    with get_db_connection() as conn:
        with get_db_cursor(conn) as cur:
            cur.execute("""
                UPDATE users
                SET totp_secret = %s, totp_enabled = true, backup_codes = %s
                WHERE id = %s
            """, (totp_secret, json.dumps(backup_codes_hashed), user['id']))

    # Clear temp secret
    session.pop('temp_totp_secret', None)

    log_audit('2FA_ENABLED', 'user', entity_id=user['id'],
              additional_info="2FA successfully enabled")

    # Show backup codes
    return render_template('backup_codes.html', backup_codes=backup_codes)


@auth_bp.route("/profile/disable-2fa", methods=["POST"])
@login_required
def disable_2fa():
    """Turn 2FA off for the logged-in user after re-checking the password."""
    password = request.form.get('password', '')
    user = get_user_by_username(session['username'])
    if not user:
        # Environment-based account: there is no 2FA state to clear.
        return redirect(url_for('auth.profile'))

    # Verify password
    if not verify_password(password, user['password_hash']):
        flash('Incorrect password. 2FA was not disabled.', 'error')
        return redirect(url_for('auth.profile'))

    # Disable 2FA
    with get_db_connection() as conn:
        with get_db_cursor(conn) as cur:
            cur.execute("""
                UPDATE users
                SET totp_enabled = false, totp_secret = NULL, backup_codes = NULL
                WHERE id = %s
            """, (user['id'],))

    log_audit('2FA_DISABLED', 'user', entity_id=user['id'],
              additional_info="2FA disabled by user")
    flash('2FA has been disabled for your account.', 'success')
    return redirect(url_for('auth.profile'))


@auth_bp.route("/heartbeat", methods=['POST'])
@login_required
def heartbeat():
    """Session keep-alive endpoint: refreshes ``last_activity`` and reports it."""
    # last_activity is only refreshed when a client explicitly calls this endpoint.
    session['last_activity'] = _now_local().isoformat()

    # Force session save
    session.modified = True

    return jsonify({
        'status': 'ok',
        'last_activity': session['last_activity'],
        'username': session.get('username')
    })