Sicherheitsstatus

Dieser Commit ist enthalten in:
2025-06-07 21:01:45 +02:00
Ursprung 25b8a9a33d
Commit df5e0e0365
7 geänderte Dateien mit 771 neuen und 11 gelöschten Zeilen

Datei anzeigen

@@ -6,7 +6,7 @@ from flask_session import Session
from functools import wraps
from dotenv import load_dotenv
import pandas as pd
from datetime import datetime
from datetime import datetime, timedelta
import io
import subprocess
import gzip
@@ -15,6 +15,8 @@ from pathlib import Path
import time
from apscheduler.schedulers.background import BackgroundScheduler
import logging
import random
import hashlib
load_dotenv()
@@ -29,6 +31,19 @@ Session(app)
BACKUP_DIR = Path("/app/backups")
BACKUP_DIR.mkdir(exist_ok=True)
# Rate-Limiting Konfiguration
FAIL_MESSAGES = [
"NOPE!",
"ACCESS DENIED, TRY HARDER",
"WRONG! 🚫",
"COMPUTER SAYS NO",
"YOU FAILED"
]
MAX_LOGIN_ATTEMPTS = 5
BLOCK_DURATION_HOURS = 24
CAPTCHA_AFTER_ATTEMPTS = 2
# Scheduler für automatische Backups
scheduler = BackgroundScheduler()
scheduler.start()
@@ -321,11 +336,183 @@ scheduler.add_job(
replace_existing=True
)
# Rate-Limiting Funktionen
def get_client_ip():
"""Ermittelt die echte IP-Adresse des Clients"""
if request.environ.get('HTTP_X_FORWARDED_FOR'):
return request.environ['HTTP_X_FORWARDED_FOR'].split(',')[0]
elif request.environ.get('HTTP_X_REAL_IP'):
return request.environ.get('HTTP_X_REAL_IP')
else:
return request.environ.get('REMOTE_ADDR')
def check_ip_blocked(ip_address):
"""Prüft ob eine IP-Adresse gesperrt ist"""
conn = get_connection()
cur = conn.cursor()
cur.execute("""
SELECT blocked_until FROM login_attempts
WHERE ip_address = %s AND blocked_until IS NOT NULL
""", (ip_address,))
result = cur.fetchone()
cur.close()
conn.close()
if result and result[0]:
if result[0] > datetime.now():
return True, result[0]
return False, None
def record_failed_attempt(ip_address, username):
"""Zeichnet einen fehlgeschlagenen Login-Versuch auf"""
conn = get_connection()
cur = conn.cursor()
# Random Fehlermeldung
error_message = random.choice(FAIL_MESSAGES)
try:
# Prüfen ob IP bereits existiert
cur.execute("""
SELECT attempt_count FROM login_attempts
WHERE ip_address = %s
""", (ip_address,))
result = cur.fetchone()
if result:
# Update bestehenden Eintrag
new_count = result[0] + 1
blocked_until = None
if new_count >= MAX_LOGIN_ATTEMPTS:
blocked_until = datetime.now() + timedelta(hours=BLOCK_DURATION_HOURS)
# E-Mail-Benachrichtigung (wenn aktiviert)
if os.getenv("EMAIL_ENABLED", "false").lower() == "true":
send_security_alert_email(ip_address, username, new_count)
cur.execute("""
UPDATE login_attempts
SET attempt_count = %s,
last_attempt = CURRENT_TIMESTAMP,
blocked_until = %s,
last_username_tried = %s,
last_error_message = %s
WHERE ip_address = %s
""", (new_count, blocked_until, username, error_message, ip_address))
else:
# Neuen Eintrag erstellen
cur.execute("""
INSERT INTO login_attempts
(ip_address, attempt_count, last_username_tried, last_error_message)
VALUES (%s, 1, %s, %s)
""", (ip_address, username, error_message))
conn.commit()
# Audit-Log
log_audit('LOGIN_FAILED', 'user',
additional_info=f"IP: {ip_address}, User: {username}, Message: {error_message}")
except Exception as e:
print(f"Rate limiting error: {e}")
conn.rollback()
finally:
cur.close()
conn.close()
return error_message
def reset_login_attempts(ip_address):
"""Setzt die Login-Versuche für eine IP zurück"""
conn = get_connection()
cur = conn.cursor()
try:
cur.execute("""
DELETE FROM login_attempts
WHERE ip_address = %s
""", (ip_address,))
conn.commit()
except Exception as e:
print(f"Reset attempts error: {e}")
conn.rollback()
finally:
cur.close()
conn.close()
def get_login_attempts(ip_address):
"""Gibt die Anzahl der Login-Versuche für eine IP zurück"""
conn = get_connection()
cur = conn.cursor()
cur.execute("""
SELECT attempt_count FROM login_attempts
WHERE ip_address = %s
""", (ip_address,))
result = cur.fetchone()
cur.close()
conn.close()
return result[0] if result else 0
def send_security_alert_email(ip_address, username, attempt_count):
"""Sendet eine Sicherheitswarnung per E-Mail"""
subject = f"⚠️ SICHERHEITSWARNUNG: {attempt_count} fehlgeschlagene Login-Versuche"
body = f"""
WARNUNG: Mehrere fehlgeschlagene Login-Versuche erkannt!
IP-Adresse: {ip_address}
Versuchter Benutzername: {username}
Anzahl Versuche: {attempt_count}
Zeit: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
Die IP-Adresse wurde für 24 Stunden gesperrt.
Dies ist eine automatische Nachricht vom v2-Docker Admin Panel.
"""
# TODO: E-Mail-Versand implementieren wenn SMTP konfiguriert
logging.warning(f"Sicherheitswarnung: {attempt_count} fehlgeschlagene Versuche von IP {ip_address}")
print(f"E-Mail würde gesendet: {subject}")
@app.route("/login", methods=["GET", "POST"])
def login():
# Timing-Attack Schutz - Start Zeit merken
start_time = time.time()
# IP-Adresse ermitteln
ip_address = get_client_ip()
# Prüfen ob IP gesperrt ist
is_blocked, blocked_until = check_ip_blocked(ip_address)
if is_blocked:
time_remaining = (blocked_until - datetime.now()).total_seconds() / 3600
error_msg = f"IP GESPERRT! Noch {time_remaining:.1f} Stunden warten."
return render_template("login.html", error=error_msg, error_type="blocked")
# Anzahl bisheriger Versuche
attempt_count = get_login_attempts(ip_address)
if request.method == "POST":
username = request.form.get("username")
password = request.form.get("password")
captcha_response = request.form.get("g-recaptcha-response")
# CAPTCHA-Prüfung wenn nötig
if attempt_count >= CAPTCHA_AFTER_ATTEMPTS:
if not captcha_response:
# Timing-Attack Schutz
elapsed = time.time() - start_time
if elapsed < 1.0:
time.sleep(1.0 - elapsed)
return render_template("login.html",
error="CAPTCHA ERFORDERLICH!",
show_captcha=True,
error_type="captcha")
# Check gegen beide Admin-Accounts aus .env
admin1_user = os.getenv("ADMIN1_USERNAME")
@@ -333,16 +520,46 @@ def login():
admin2_user = os.getenv("ADMIN2_USERNAME")
admin2_pass = os.getenv("ADMIN2_PASSWORD")
# Login-Prüfung
login_success = False
if ((username == admin1_user and password == admin1_pass) or
(username == admin2_user and password == admin2_pass)):
login_success = True
# Timing-Attack Schutz - Mindestens 1 Sekunde warten
elapsed = time.time() - start_time
if elapsed < 1.0:
time.sleep(1.0 - elapsed)
if login_success:
# Erfolgreicher Login
session['logged_in'] = True
session['username'] = username
log_audit('LOGIN', 'user', additional_info=f"Erfolgreiche Anmeldung")
reset_login_attempts(ip_address)
log_audit('LOGIN_SUCCESS', 'user',
additional_info=f"Erfolgreiche Anmeldung von IP: {ip_address}")
return redirect(url_for('dashboard'))
else:
return render_template("login.html", error="Ungültige Anmeldedaten")
# Fehlgeschlagener Login
error_message = record_failed_attempt(ip_address, username)
new_attempt_count = get_login_attempts(ip_address)
# Prüfen ob jetzt gesperrt
is_now_blocked, _ = check_ip_blocked(ip_address)
if is_now_blocked:
log_audit('LOGIN_BLOCKED', 'security',
additional_info=f"IP {ip_address} wurde nach {MAX_LOGIN_ATTEMPTS} Versuchen gesperrt")
return render_template("login.html",
error=error_message,
show_captcha=(new_attempt_count >= CAPTCHA_AFTER_ATTEMPTS),
error_type="failed",
attempts_left=max(0, MAX_LOGIN_ATTEMPTS - new_attempt_count))
return render_template("login.html")
# GET Request
return render_template("login.html",
show_captcha=(attempt_count >= CAPTCHA_AFTER_ATTEMPTS),
attempts_left=max(0, MAX_LOGIN_ATTEMPTS - attempt_count))
@app.route("/logout")
def logout():
@@ -440,6 +657,56 @@ def dashboard():
""")
last_backup_info = cur.fetchone()
# Sicherheitsstatistiken
# Gesperrte IPs
cur.execute("""
SELECT COUNT(*) FROM login_attempts
WHERE blocked_until IS NOT NULL AND blocked_until > CURRENT_TIMESTAMP
""")
blocked_ips_count = cur.fetchone()[0]
# Fehlversuche heute
cur.execute("""
SELECT COALESCE(SUM(attempt_count), 0) FROM login_attempts
WHERE last_attempt::date = CURRENT_DATE
""")
failed_attempts_today = cur.fetchone()[0]
# Letzte 5 Sicherheitsereignisse
cur.execute("""
SELECT
la.ip_address,
la.attempt_count,
la.last_attempt,
la.blocked_until,
la.last_username_tried,
la.last_error_message
FROM login_attempts la
ORDER BY la.last_attempt DESC
LIMIT 5
""")
recent_security_events = []
for event in cur.fetchall():
recent_security_events.append({
'ip_address': event[0],
'attempt_count': event[1],
'last_attempt': event[2].strftime('%d.%m %H:%M'),
'blocked_until': event[3].strftime('%d.%m %H:%M') if event[3] and event[3] > datetime.now() else None,
'username_tried': event[4],
'error_message': event[5]
})
# Sicherheitslevel berechnen
if blocked_ips_count > 5 or failed_attempts_today > 50:
security_level = 'danger'
security_level_text = 'KRITISCH'
elif blocked_ips_count > 2 or failed_attempts_today > 20:
security_level = 'warning'
security_level_text = 'ERHÖHT'
else:
security_level = 'success'
security_level_text = 'NORMAL'
cur.close()
conn.close()
@@ -454,7 +721,13 @@ def dashboard():
'recent_licenses': recent_licenses,
'expiring_licenses': expiring_licenses,
'active_sessions': active_sessions_count,
'last_backup': last_backup_info
'last_backup': last_backup_info,
# Sicherheitsstatistiken
'blocked_ips_count': blocked_ips_count,
'failed_attempts_today': failed_attempts_today,
'recent_security_events': recent_security_events,
'security_level': security_level,
'security_level_text': security_level_text
}
return render_template("dashboard.html", stats=stats, username=session.get('username'))
@@ -1274,5 +1547,87 @@ def download_backup(backup_id):
return send_file(filepath, as_attachment=True, download_name=filename)
@app.route("/security/blocked-ips")
@login_required
def blocked_ips():
"""Zeigt alle gesperrten IPs an"""
conn = get_connection()
cur = conn.cursor()
cur.execute("""
SELECT
ip_address,
attempt_count,
first_attempt,
last_attempt,
blocked_until,
last_username_tried,
last_error_message
FROM login_attempts
WHERE blocked_until IS NOT NULL
ORDER BY blocked_until DESC
""")
blocked_ips_list = []
for ip in cur.fetchall():
blocked_ips_list.append({
'ip_address': ip[0],
'attempt_count': ip[1],
'first_attempt': ip[2].strftime('%d.%m.%Y %H:%M'),
'last_attempt': ip[3].strftime('%d.%m.%Y %H:%M'),
'blocked_until': ip[4].strftime('%d.%m.%Y %H:%M'),
'is_active': ip[4] > datetime.now(),
'last_username': ip[5],
'last_error': ip[6]
})
cur.close()
conn.close()
return render_template("blocked_ips.html",
blocked_ips=blocked_ips_list,
username=session.get('username'))
@app.route("/security/unblock-ip", methods=["POST"])
@login_required
def unblock_ip():
"""Entsperrt eine IP-Adresse"""
ip_address = request.form.get('ip_address')
if ip_address:
conn = get_connection()
cur = conn.cursor()
cur.execute("""
UPDATE login_attempts
SET blocked_until = NULL
WHERE ip_address = %s
""", (ip_address,))
conn.commit()
cur.close()
conn.close()
# Audit-Log
log_audit('UNBLOCK_IP', 'security',
additional_info=f"IP {ip_address} manuell entsperrt")
return redirect(url_for('blocked_ips'))
@app.route("/security/clear-attempts", methods=["POST"])
@login_required
def clear_attempts():
"""Löscht alle Login-Versuche für eine IP"""
ip_address = request.form.get('ip_address')
if ip_address:
reset_login_attempts(ip_address)
# Audit-Log
log_audit('CLEAR_ATTEMPTS', 'security',
additional_info=f"Login-Versuche für IP {ip_address} zurückgesetzt")
return redirect(url_for('blocked_ips'))
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000)