124 Zeilen
4.5 KiB
Python
124 Zeilen
4.5 KiB
Python
import random
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from zoneinfo import ZoneInfo
|
|
from flask import request
|
|
from db import execute_query, get_db_connection, get_db_cursor
|
|
from config import FAIL_MESSAGES, MAX_LOGIN_ATTEMPTS, BLOCK_DURATION_HOURS, EMAIL_ENABLED
|
|
from utils.audit import log_audit
|
|
from utils.network import get_client_ip
|
|
|
|
# Module-level logger, named after this module so its records can be filtered per module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def check_ip_blocked(ip_address):
    """Tell whether an IP address is currently blocked.

    Looks up the ``blocked_until`` value stored for the IP in
    ``login_attempts`` and compares it with the current Europe/Berlin
    wall-clock time (made naive before the comparison).

    Args:
        ip_address: IP address to look up.

    Returns:
        ``(True, blocked_until)`` while the stored block lies in the future,
        otherwise ``(False, None)``.
    """
    row = execute_query(
        """
        SELECT blocked_until FROM login_attempts
        WHERE ip_address = %s AND blocked_until IS NOT NULL
        """,
        (ip_address,),
        fetch_one=True
    )

    # No row, or an empty blocked_until: nothing to enforce.
    if not row or not row[0]:
        return False, None

    blocked_until = row[0]
    # NOTE(review): presumably the column holds naive Berlin-local timestamps,
    # which is why tzinfo is stripped here -- confirm against the schema.
    berlin_now = datetime.now(ZoneInfo("Europe/Berlin")).replace(tzinfo=None)

    if blocked_until > berlin_now:
        return True, blocked_until
    return False, None
|
|
|
|
|
|
def record_failed_attempt(ip_address, username):
    """Record a failed login attempt and return the message to show the user.

    Increments the attempt counter stored for the IP (or creates the row on
    the first failure). Once the counter reaches ``MAX_LOGIN_ATTEMPTS`` the IP
    is blocked for ``BLOCK_DURATION_HOURS`` hours. After the change has been
    committed, a security alert is sent (if ``EMAIL_ENABLED``) and the failure
    is written to the audit log.

    Errors are logged and swallowed, so the caller always gets a message back.

    Args:
        ip_address: IP address the failed attempt came from.
        username: Username that was tried.

    Returns:
        A message picked at random from ``FAIL_MESSAGES``.
    """
    # Random error message
    error_message = random.choice(FAIL_MESSAGES)

    # Set only when this attempt triggers a block; the alert itself is sent
    # after the commit so it never reports a block that was not persisted.
    alert_count = None

    with get_db_connection() as conn:
        with get_db_cursor(conn) as cur:
            try:
                # Check if IP already exists
                cur.execute("""
                    SELECT attempt_count FROM login_attempts
                    WHERE ip_address = %s
                """, (ip_address,))

                row = cur.fetchone()

                if row:
                    # Update existing entry
                    new_count = row[0] + 1
                    blocked_until = None

                    if new_count >= MAX_LOGIN_ATTEMPTS:
                        berlin_now = datetime.now(ZoneInfo("Europe/Berlin")).replace(tzinfo=None)
                        blocked_until = berlin_now + timedelta(hours=BLOCK_DURATION_HOURS)
                        alert_count = new_count

                    cur.execute("""
                        UPDATE login_attempts
                        SET attempt_count = %s,
                            last_attempt = CURRENT_TIMESTAMP,
                            blocked_until = %s,
                            last_username_tried = %s,
                            last_error_message = %s
                        WHERE ip_address = %s
                    """, (new_count, blocked_until, username, error_message, ip_address))
                else:
                    # Create new entry
                    cur.execute("""
                        INSERT INTO login_attempts
                        (ip_address, attempt_count, last_username_tried, last_error_message)
                        VALUES (%s, 1, %s, %s)
                    """, (ip_address, username, error_message))

                conn.commit()
            except Exception as e:
                # logger.exception keeps the traceback; message text is unchanged.
                logger.exception("Rate limiting error: %s", e)
                conn.rollback()
            else:
                # Post-commit side effects. A failure here must not undo the
                # block that was just stored, so it is only logged.
                try:
                    # Email notification (if enabled)
                    if alert_count is not None and EMAIL_ENABLED:
                        send_security_alert_email(ip_address, username, alert_count)

                    # Audit log
                    log_audit('LOGIN_FAILED', 'user',
                              additional_info=f"IP: {ip_address}, User: {username}, Message: {error_message}")
                except Exception as e:
                    logger.exception("Rate limiting error: %s", e)

    return error_message
|
|
|
|
|
|
def reset_login_attempts(ip_address):
    """Delete the stored login-attempt record of an IP address.

    Args:
        ip_address: IP address whose ``login_attempts`` row is removed.
    """
    delete_sql = "DELETE FROM login_attempts WHERE ip_address = %s"
    params = (ip_address,)
    execute_query(delete_sql, params)
|
|
|
|
|
|
def get_login_attempts(ip_address):
    """Return the failed-attempt counter stored for an IP address.

    Args:
        ip_address: IP address to look up.

    Returns:
        The stored ``attempt_count``, or ``0`` when the lookup yields no row.
    """
    row = execute_query(
        "SELECT attempt_count FROM login_attempts WHERE ip_address = %s",
        (ip_address,),
        fetch_one=True
    )

    # No record means no failed attempts so far.
    if not row:
        return 0
    return row[0]
|
|
|
|
|
|
def send_security_alert_email(ip_address, username, attempt_count):
    """Report a security alert for repeated failed logins.

    Email delivery is not implemented yet: the function prepares the subject
    and body of the alert, writes a warning to the module logger and prints
    the subject that would be sent.

    Args:
        ip_address: IP address the failed attempts came from.
        username: Username used in the failed attempt being reported.
        attempt_count: Number of failed attempts to report.
    """
    subject = f"⚠️ SICHERHEITSWARNUNG: {attempt_count} fehlgeschlagene Login-Versuche"

    timestamp = datetime.now(ZoneInfo("Europe/Berlin")).strftime('%Y-%m-%d %H:%M:%S')
    # State the configured block length rather than a hard-coded "24", so the
    # alert stays correct when BLOCK_DURATION_HOURS is changed.
    hours_label = "Stunde" if BLOCK_DURATION_HOURS == 1 else "Stunden"

    # Built from joined lines so the text does not depend on source indentation.
    # Not used yet; kept ready for the SMTP implementation below.
    body = "\n".join([
        "",
        "WARNUNG: Mehrere fehlgeschlagene Login-Versuche erkannt!",
        "",
        f"IP-Adresse: {ip_address}",
        f"Versuchter Benutzername: {username}",
        f"Anzahl Versuche: {attempt_count}",
        f"Zeit: {timestamp}",
        "",
        f"Die IP-Adresse wurde für {BLOCK_DURATION_HOURS} {hours_label} gesperrt.",
        "",
        "Dies ist eine automatische Nachricht vom v2-Docker Admin Panel.",
        "",
    ])

    # TODO: Email sending implementation when SMTP is configured
    logger.warning(f"Sicherheitswarnung: {attempt_count} fehlgeschlagene Versuche von IP {ip_address}")
    print(f"E-Mail würde gesendet: {subject}")