Files
v2-Docker/v2_adminpanel/auth/rate_limiting.py
Claude Project Manager 0d7d888502 Initial commit
2025-07-05 17:51:16 +02:00

124 Zeilen
4.5 KiB
Python

import random
import logging
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
from flask import request
from db import execute_query, get_db_connection, get_db_cursor
from config import FAIL_MESSAGES, MAX_LOGIN_ATTEMPTS, BLOCK_DURATION_HOURS, EMAIL_ENABLED
from utils.audit import log_audit
from utils.network import get_client_ip
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
def check_ip_blocked(ip_address):
    """Report whether ``ip_address`` currently has an active login block.

    Looks up the ``blocked_until`` value stored for the IP in
    ``login_attempts`` and compares it against the current Europe/Berlin
    wall-clock time with the timezone info stripped.
    NOTE(review): this presumes ``blocked_until`` comes back as a naive
    datetime in Berlin local time -- confirm against the column type.

    Args:
        ip_address: Client IP address to look up.

    Returns:
        ``(True, blocked_until)`` while the block lies in the future,
        otherwise ``(False, None)``.
    """
    row = execute_query(
        """
        SELECT blocked_until FROM login_attempts
        WHERE ip_address = %s AND blocked_until IS NOT NULL
        """,
        (ip_address,),
        fetch_one=True
    )

    # No row, or a row without a usable timestamp: nothing is blocked.
    if not row or not row[0]:
        return False, None

    blocked_until = row[0]
    berlin_now = datetime.now(ZoneInfo("Europe/Berlin")).replace(tzinfo=None)
    if blocked_until > berlin_now:
        return True, blocked_until

    # The stored block has already expired.
    return False, None
def record_failed_attempt(ip_address, username):
    """Record a failed login attempt for an IP and return the message to show.

    Increments the per-IP counter in ``login_attempts`` (or inserts a new row
    with a count of 1 when the IP is not yet known). When the incremented
    counter reaches ``MAX_LOGIN_ATTEMPTS``, ``blocked_until`` is set to
    ``BLOCK_DURATION_HOURS`` from now (Europe/Berlin wall-clock time, stored
    without tzinfo); otherwise ``blocked_until`` is written as NULL.

    After a successful commit the failure is written to the audit log. If the
    threshold was reached and ``EMAIL_ENABLED`` is set, a security alert is
    sent once the database connection has been released.

    Errors while recording are logged and swallowed (the transaction is rolled
    back), so the caller always receives an error message to display.

    Args:
        ip_address: Client IP address the attempt came from.
        username: Username that was tried.

    Returns:
        A randomly chosen entry of ``FAIL_MESSAGES``.
    """
    # Random error message
    error_message = random.choice(FAIL_MESSAGES)

    # Set only after the block has actually been committed, so the alert never
    # claims a block that was rolled back.
    alert_attempt_count = None

    with get_db_connection() as conn:
        with get_db_cursor(conn) as cur:
            try:
                # Check if IP already exists
                cur.execute("""
                    SELECT attempt_count FROM login_attempts
                    WHERE ip_address = %s
                """, (ip_address,))
                result = cur.fetchone()

                limit_reached = False
                if result:
                    # Update existing entry
                    new_count = result[0] + 1
                    blocked_until = None
                    if new_count >= MAX_LOGIN_ATTEMPTS:
                        limit_reached = True
                        blocked_until = (
                            datetime.now(ZoneInfo("Europe/Berlin")).replace(tzinfo=None)
                            + timedelta(hours=BLOCK_DURATION_HOURS)
                        )
                    cur.execute("""
                        UPDATE login_attempts
                        SET attempt_count = %s,
                            last_attempt = CURRENT_TIMESTAMP,
                            blocked_until = %s,
                            last_username_tried = %s,
                            last_error_message = %s
                        WHERE ip_address = %s
                    """, (new_count, blocked_until, username, error_message, ip_address))
                else:
                    # Create new entry (a first attempt never triggers a block)
                    new_count = 1
                    cur.execute("""
                        INSERT INTO login_attempts
                        (ip_address, attempt_count, last_username_tried, last_error_message)
                        VALUES (%s, 1, %s, %s)
                    """, (ip_address, username, error_message))

                conn.commit()

                if limit_reached and EMAIL_ENABLED:
                    alert_attempt_count = new_count

                # Audit log
                log_audit('LOGIN_FAILED', 'user',
                          additional_info=f"IP: {ip_address}, User: {username}, Message: {error_message}")
            except Exception as e:
                # Best effort: rate limiting problems must not break the login flow.
                logger.error(f"Rate limiting error: {e}", exc_info=True)
                conn.rollback()

    # Email notification (if enabled). Sent after the commit and outside the
    # database context so a failing notifier can neither undo the block nor
    # hold the connection open.
    if alert_attempt_count is not None:
        try:
            send_security_alert_email(ip_address, username, alert_attempt_count)
        except Exception:
            logger.exception("Security alert email failed")

    return error_message
def reset_login_attempts(ip_address):
    """Remove the stored login-attempt record for ``ip_address``.

    Issues a DELETE against ``login_attempts`` for the given IP.

    Args:
        ip_address: Client IP address whose record is deleted.
    """
    delete_sql = "DELETE FROM login_attempts WHERE ip_address = %s"
    params = (ip_address,)
    execute_query(delete_sql, params)
def get_login_attempts(ip_address):
    """Return the stored failed-attempt count for ``ip_address``.

    Args:
        ip_address: Client IP address to look up.

    Returns:
        The ``attempt_count`` value of the matching row, or ``0`` when the
        query yields no row.
    """
    count_sql = "SELECT attempt_count FROM login_attempts WHERE ip_address = %s"
    row = execute_query(count_sql, (ip_address,), fetch_one=True)
    if not row:
        return 0
    return row[0]
def send_security_alert_email(ip_address, username, attempt_count):
    """Compose a security alert about repeated failed logins and log it.

    Mail delivery is not implemented yet: the function builds the subject and
    body, logs a warning and prints the subject that would be sent.

    Args:
        ip_address: IP address the failed attempts came from.
        username: Username that was tried last.
        attempt_count: Number of failed attempts recorded for the IP.
    """
    subject = f"⚠️ SICHERHEITSWARNUNG: {attempt_count} fehlgeschlagene Login-Versuche"
    # The block duration is taken from BLOCK_DURATION_HOURS instead of a
    # hard-coded "24" so the alert text stays correct when the config changes.
    # `body` is not used yet; it is prepared for the SMTP implementation below.
    body = f"""
WARNUNG: Mehrere fehlgeschlagene Login-Versuche erkannt!
IP-Adresse: {ip_address}
Versuchter Benutzername: {username}
Anzahl Versuche: {attempt_count}
Zeit: {datetime.now(ZoneInfo("Europe/Berlin")).strftime('%Y-%m-%d %H:%M:%S')}
Die IP-Adresse wurde für {BLOCK_DURATION_HOURS} Stunden gesperrt.
Dies ist eine automatische Nachricht vom v2-Docker Admin Panel.
"""
    # TODO: Email sending implementation when SMTP is configured
    logger.warning(f"Sicherheitswarnung: {attempt_count} fehlgeschlagene Versuche von IP {ip_address}")
    print(f"E-Mail würde gesendet: {subject}")