2FA
Dieser Commit ist enthalten in:
@@ -22,6 +22,12 @@ import requests
|
||||
import secrets
|
||||
import string
|
||||
import re
|
||||
import bcrypt
|
||||
import pyotp
|
||||
import qrcode
|
||||
from io import BytesIO
|
||||
import base64
|
||||
import json
|
||||
|
||||
load_dotenv()
|
||||
|
||||
@@ -111,6 +117,87 @@ def get_connection():
|
||||
conn.set_client_encoding('UTF8')
|
||||
return conn
|
||||
|
||||
# User Authentication Helper Functions
|
||||
def hash_password(password):
|
||||
"""Hash a password using bcrypt"""
|
||||
return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
|
||||
|
||||
def verify_password(password, hashed):
|
||||
"""Verify a password against its hash"""
|
||||
return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))
|
||||
|
||||
def get_user_by_username(username):
|
||||
"""Get user from database by username"""
|
||||
conn = get_connection()
|
||||
cur = conn.cursor()
|
||||
try:
|
||||
cur.execute("""
|
||||
SELECT id, username, password_hash, email, totp_secret, totp_enabled,
|
||||
backup_codes, last_password_change, failed_2fa_attempts
|
||||
FROM users WHERE username = %s
|
||||
""", (username,))
|
||||
user = cur.fetchone()
|
||||
if user:
|
||||
return {
|
||||
'id': user[0],
|
||||
'username': user[1],
|
||||
'password_hash': user[2],
|
||||
'email': user[3],
|
||||
'totp_secret': user[4],
|
||||
'totp_enabled': user[5],
|
||||
'backup_codes': user[6],
|
||||
'last_password_change': user[7],
|
||||
'failed_2fa_attempts': user[8]
|
||||
}
|
||||
return None
|
||||
finally:
|
||||
cur.close()
|
||||
conn.close()
|
||||
|
||||
def generate_totp_secret():
|
||||
"""Generate a new TOTP secret"""
|
||||
return pyotp.random_base32()
|
||||
|
||||
def generate_qr_code(username, totp_secret):
|
||||
"""Generate QR code for TOTP setup"""
|
||||
totp_uri = pyotp.totp.TOTP(totp_secret).provisioning_uri(
|
||||
name=username,
|
||||
issuer_name='V2 Admin Panel'
|
||||
)
|
||||
|
||||
qr = qrcode.QRCode(version=1, box_size=10, border=5)
|
||||
qr.add_data(totp_uri)
|
||||
qr.make(fit=True)
|
||||
|
||||
img = qr.make_image(fill_color="black", back_color="white")
|
||||
buf = BytesIO()
|
||||
img.save(buf, format='PNG')
|
||||
buf.seek(0)
|
||||
|
||||
return base64.b64encode(buf.getvalue()).decode()
|
||||
|
||||
def verify_totp(totp_secret, token):
|
||||
"""Verify a TOTP token"""
|
||||
totp = pyotp.TOTP(totp_secret)
|
||||
return totp.verify(token, valid_window=1)
|
||||
|
||||
def generate_backup_codes(count=8):
|
||||
"""Generate backup codes for 2FA recovery"""
|
||||
codes = []
|
||||
for _ in range(count):
|
||||
code = ''.join(random.choices(string.ascii_uppercase + string.digits, k=8))
|
||||
codes.append(code)
|
||||
return codes
|
||||
|
||||
def hash_backup_code(code):
|
||||
"""Hash a backup code for storage"""
|
||||
return hashlib.sha256(code.encode()).hexdigest()
|
||||
|
||||
def verify_backup_code(code, hashed_codes):
|
||||
"""Verify a backup code against stored hashes"""
|
||||
code_hash = hashlib.sha256(code.encode()).hexdigest()
|
||||
return code_hash in hashed_codes
|
||||
|
||||
# Audit-Log-Funktion
|
||||
def log_audit(action, entity_type, entity_id=None, old_values=None, new_values=None, additional_info=None):
|
||||
"""Protokolliert Änderungen im Audit-Log"""
|
||||
@@ -647,17 +734,26 @@ def login():
|
||||
attempts_left=max(0, MAX_LOGIN_ATTEMPTS - attempt_count),
|
||||
recaptcha_site_key=recaptcha_site_key)
|
||||
|
||||
# Check gegen beide Admin-Accounts aus .env
|
||||
admin1_user = os.getenv("ADMIN1_USERNAME")
|
||||
admin1_pass = os.getenv("ADMIN1_PASSWORD")
|
||||
admin2_user = os.getenv("ADMIN2_USERNAME")
|
||||
admin2_pass = os.getenv("ADMIN2_PASSWORD")
|
||||
|
||||
# Login-Prüfung
|
||||
# Check user in database first, fallback to env vars
|
||||
user = get_user_by_username(username)
|
||||
login_success = False
|
||||
if ((username == admin1_user and password == admin1_pass) or
|
||||
(username == admin2_user and password == admin2_pass)):
|
||||
login_success = True
|
||||
needs_2fa = False
|
||||
|
||||
if user:
|
||||
# Database user authentication
|
||||
if verify_password(password, user['password_hash']):
|
||||
login_success = True
|
||||
needs_2fa = user['totp_enabled']
|
||||
else:
|
||||
# Fallback to environment variables for backward compatibility
|
||||
admin1_user = os.getenv("ADMIN1_USERNAME")
|
||||
admin1_pass = os.getenv("ADMIN1_PASSWORD")
|
||||
admin2_user = os.getenv("ADMIN2_USERNAME")
|
||||
admin2_pass = os.getenv("ADMIN2_PASSWORD")
|
||||
|
||||
if ((username == admin1_user and password == admin1_pass) or
|
||||
(username == admin2_user and password == admin2_pass)):
|
||||
login_success = True
|
||||
|
||||
# Timing-Attack Schutz - Mindestens 1 Sekunde warten
|
||||
elapsed = time.time() - start_time
|
||||
@@ -666,14 +762,23 @@ def login():
|
||||
|
||||
if login_success:
|
||||
# Erfolgreicher Login
|
||||
session.permanent = True # Aktiviert das Timeout
|
||||
session['logged_in'] = True
|
||||
session['username'] = username
|
||||
session['last_activity'] = datetime.now(ZoneInfo("Europe/Berlin")).replace(tzinfo=None).isoformat()
|
||||
reset_login_attempts(ip_address)
|
||||
log_audit('LOGIN_SUCCESS', 'user',
|
||||
additional_info=f"Erfolgreiche Anmeldung von IP: {ip_address}")
|
||||
return redirect(url_for('dashboard'))
|
||||
if needs_2fa:
|
||||
# Store temporary session for 2FA verification
|
||||
session['temp_username'] = username
|
||||
session['temp_user_id'] = user['id']
|
||||
session['awaiting_2fa'] = True
|
||||
return redirect(url_for('verify_2fa'))
|
||||
else:
|
||||
# Complete login without 2FA
|
||||
session.permanent = True # Aktiviert das Timeout
|
||||
session['logged_in'] = True
|
||||
session['username'] = username
|
||||
session['user_id'] = user['id'] if user else None
|
||||
session['last_activity'] = datetime.now(ZoneInfo("Europe/Berlin")).replace(tzinfo=None).isoformat()
|
||||
reset_login_attempts(ip_address)
|
||||
log_audit('LOGIN_SUCCESS', 'user',
|
||||
additional_info=f"Erfolgreiche Anmeldung von IP: {ip_address}")
|
||||
return redirect(url_for('dashboard'))
|
||||
else:
|
||||
# Fehlgeschlagener Login
|
||||
error_message = record_failed_attempt(ip_address, username)
|
||||
@@ -704,8 +809,224 @@ def logout():
|
||||
log_audit('LOGOUT', 'user', additional_info=f"Abmeldung")
|
||||
session.pop('logged_in', None)
|
||||
session.pop('username', None)
|
||||
session.pop('user_id', None)
|
||||
session.pop('temp_username', None)
|
||||
session.pop('temp_user_id', None)
|
||||
session.pop('awaiting_2fa', None)
|
||||
return redirect(url_for('login'))
|
||||
|
||||
@app.route("/verify-2fa", methods=["GET", "POST"])
|
||||
def verify_2fa():
|
||||
if not session.get('awaiting_2fa'):
|
||||
return redirect(url_for('login'))
|
||||
|
||||
if request.method == "POST":
|
||||
token = request.form.get('token', '').replace(' ', '')
|
||||
username = session.get('temp_username')
|
||||
user_id = session.get('temp_user_id')
|
||||
|
||||
if not username or not user_id:
|
||||
flash('Session expired. Please login again.', 'error')
|
||||
return redirect(url_for('login'))
|
||||
|
||||
user = get_user_by_username(username)
|
||||
if not user:
|
||||
flash('User not found.', 'error')
|
||||
return redirect(url_for('login'))
|
||||
|
||||
# Check if it's a backup code
|
||||
if len(token) == 8 and token.isupper():
|
||||
# Try backup code
|
||||
backup_codes = json.loads(user['backup_codes']) if user['backup_codes'] else []
|
||||
if verify_backup_code(token, backup_codes):
|
||||
# Remove used backup code
|
||||
code_hash = hash_backup_code(token)
|
||||
backup_codes.remove(code_hash)
|
||||
|
||||
conn = get_connection()
|
||||
cur = conn.cursor()
|
||||
cur.execute("UPDATE users SET backup_codes = %s WHERE id = %s",
|
||||
(json.dumps(backup_codes), user_id))
|
||||
conn.commit()
|
||||
cur.close()
|
||||
conn.close()
|
||||
|
||||
# Complete login
|
||||
session.permanent = True
|
||||
session['logged_in'] = True
|
||||
session['username'] = username
|
||||
session['user_id'] = user_id
|
||||
session['last_activity'] = datetime.now(ZoneInfo("Europe/Berlin")).replace(tzinfo=None).isoformat()
|
||||
session.pop('temp_username', None)
|
||||
session.pop('temp_user_id', None)
|
||||
session.pop('awaiting_2fa', None)
|
||||
|
||||
flash('Login successful using backup code. Please generate new backup codes.', 'warning')
|
||||
log_audit('LOGIN_2FA_BACKUP', 'user', additional_info=f"2FA login with backup code")
|
||||
return redirect(url_for('dashboard'))
|
||||
else:
|
||||
# Try TOTP token
|
||||
if verify_totp(user['totp_secret'], token):
|
||||
# Complete login
|
||||
session.permanent = True
|
||||
session['logged_in'] = True
|
||||
session['username'] = username
|
||||
session['user_id'] = user_id
|
||||
session['last_activity'] = datetime.now(ZoneInfo("Europe/Berlin")).replace(tzinfo=None).isoformat()
|
||||
session.pop('temp_username', None)
|
||||
session.pop('temp_user_id', None)
|
||||
session.pop('awaiting_2fa', None)
|
||||
|
||||
log_audit('LOGIN_2FA_SUCCESS', 'user', additional_info=f"2FA login successful")
|
||||
return redirect(url_for('dashboard'))
|
||||
|
||||
# Failed verification
|
||||
conn = get_connection()
|
||||
cur = conn.cursor()
|
||||
cur.execute("UPDATE users SET failed_2fa_attempts = failed_2fa_attempts + 1, last_failed_2fa = %s WHERE id = %s",
|
||||
(datetime.now(), user_id))
|
||||
conn.commit()
|
||||
cur.close()
|
||||
conn.close()
|
||||
|
||||
flash('Invalid authentication code. Please try again.', 'error')
|
||||
log_audit('LOGIN_2FA_FAILED', 'user', additional_info=f"Failed 2FA attempt")
|
||||
|
||||
return render_template('verify_2fa.html')
|
||||
|
||||
@app.route("/profile")
|
||||
@login_required
|
||||
def profile():
|
||||
user = get_user_by_username(session['username'])
|
||||
if not user:
|
||||
# For environment-based users, redirect with message
|
||||
flash('Bitte führen Sie das Migrations-Script aus, um Passwort-Änderung und 2FA zu aktivieren.', 'info')
|
||||
return redirect(url_for('dashboard'))
|
||||
return render_template('profile.html', user=user)
|
||||
|
||||
@app.route("/profile/change-password", methods=["POST"])
|
||||
@login_required
|
||||
def change_password():
|
||||
current_password = request.form.get('current_password')
|
||||
new_password = request.form.get('new_password')
|
||||
confirm_password = request.form.get('confirm_password')
|
||||
|
||||
user = get_user_by_username(session['username'])
|
||||
|
||||
# Verify current password
|
||||
if not verify_password(current_password, user['password_hash']):
|
||||
flash('Current password is incorrect.', 'error')
|
||||
return redirect(url_for('profile'))
|
||||
|
||||
# Check new password
|
||||
if new_password != confirm_password:
|
||||
flash('New passwords do not match.', 'error')
|
||||
return redirect(url_for('profile'))
|
||||
|
||||
if len(new_password) < 8:
|
||||
flash('Password must be at least 8 characters long.', 'error')
|
||||
return redirect(url_for('profile'))
|
||||
|
||||
# Update password
|
||||
new_hash = hash_password(new_password)
|
||||
conn = get_connection()
|
||||
cur = conn.cursor()
|
||||
cur.execute("UPDATE users SET password_hash = %s, last_password_change = %s WHERE id = %s",
|
||||
(new_hash, datetime.now(), user['id']))
|
||||
conn.commit()
|
||||
cur.close()
|
||||
conn.close()
|
||||
|
||||
log_audit('PASSWORD_CHANGE', 'user', entity_id=user['id'],
|
||||
additional_info="Password changed successfully")
|
||||
flash('Password changed successfully.', 'success')
|
||||
return redirect(url_for('profile'))
|
||||
|
||||
@app.route("/profile/setup-2fa")
|
||||
@login_required
|
||||
def setup_2fa():
|
||||
user = get_user_by_username(session['username'])
|
||||
|
||||
if user['totp_enabled']:
|
||||
flash('2FA is already enabled for your account.', 'info')
|
||||
return redirect(url_for('profile'))
|
||||
|
||||
# Generate new TOTP secret
|
||||
totp_secret = generate_totp_secret()
|
||||
session['temp_totp_secret'] = totp_secret
|
||||
|
||||
# Generate QR code
|
||||
qr_code = generate_qr_code(user['username'], totp_secret)
|
||||
|
||||
return render_template('setup_2fa.html',
|
||||
totp_secret=totp_secret,
|
||||
qr_code=qr_code)
|
||||
|
||||
@app.route("/profile/enable-2fa", methods=["POST"])
|
||||
@login_required
|
||||
def enable_2fa():
|
||||
token = request.form.get('token', '').replace(' ', '')
|
||||
totp_secret = session.get('temp_totp_secret')
|
||||
|
||||
if not totp_secret:
|
||||
flash('2FA setup session expired. Please try again.', 'error')
|
||||
return redirect(url_for('setup_2fa'))
|
||||
|
||||
# Verify the token
|
||||
if not verify_totp(totp_secret, token):
|
||||
flash('Invalid authentication code. Please try again.', 'error')
|
||||
return redirect(url_for('setup_2fa'))
|
||||
|
||||
# Generate backup codes
|
||||
backup_codes = generate_backup_codes()
|
||||
hashed_codes = [hash_backup_code(code) for code in backup_codes]
|
||||
|
||||
# Enable 2FA
|
||||
conn = get_connection()
|
||||
cur = conn.cursor()
|
||||
cur.execute("""
|
||||
UPDATE users
|
||||
SET totp_secret = %s, totp_enabled = TRUE, backup_codes = %s
|
||||
WHERE username = %s
|
||||
""", (totp_secret, json.dumps(hashed_codes), session['username']))
|
||||
conn.commit()
|
||||
cur.close()
|
||||
conn.close()
|
||||
|
||||
session.pop('temp_totp_secret', None)
|
||||
|
||||
log_audit('2FA_ENABLED', 'user', additional_info="2FA enabled successfully")
|
||||
|
||||
# Show backup codes
|
||||
return render_template('backup_codes.html', backup_codes=backup_codes)
|
||||
|
||||
@app.route("/profile/disable-2fa", methods=["POST"])
|
||||
@login_required
|
||||
def disable_2fa():
|
||||
password = request.form.get('password')
|
||||
user = get_user_by_username(session['username'])
|
||||
|
||||
# Verify password
|
||||
if not verify_password(password, user['password_hash']):
|
||||
flash('Incorrect password.', 'error')
|
||||
return redirect(url_for('profile'))
|
||||
|
||||
# Disable 2FA
|
||||
conn = get_connection()
|
||||
cur = conn.cursor()
|
||||
cur.execute("""
|
||||
UPDATE users
|
||||
SET totp_enabled = FALSE, totp_secret = NULL, backup_codes = NULL
|
||||
WHERE username = %s
|
||||
""", (session['username'],))
|
||||
conn.commit()
|
||||
cur.close()
|
||||
conn.close()
|
||||
|
||||
log_audit('2FA_DISABLED', 'user', additional_info="2FA disabled")
|
||||
flash('2FA has been disabled for your account.', 'success')
|
||||
return redirect(url_for('profile'))
|
||||
|
||||
@app.route("/heartbeat", methods=['POST'])
|
||||
@login_required
|
||||
def heartbeat():
|
||||
|
||||
In neuem Issue referenzieren
Einen Benutzer sperren