nur backups
Dieser Commit ist enthalten in:
@@ -1,124 +0,0 @@
|
||||
import random
|
||||
import logging
|
||||
from datetime import datetime, timedelta
|
||||
from zoneinfo import ZoneInfo
|
||||
from flask import request
|
||||
from db import execute_query, get_db_connection, get_db_cursor
|
||||
from config import FAIL_MESSAGES, MAX_LOGIN_ATTEMPTS, BLOCK_DURATION_HOURS, EMAIL_ENABLED
|
||||
from utils.audit import log_audit
|
||||
from utils.network import get_client_ip
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def check_ip_blocked(ip_address):
|
||||
"""Check if an IP address is blocked"""
|
||||
result = execute_query(
|
||||
"""
|
||||
SELECT blocked_until FROM login_attempts
|
||||
WHERE ip_address = %s AND blocked_until IS NOT NULL
|
||||
""",
|
||||
(ip_address,),
|
||||
fetch_one=True
|
||||
)
|
||||
|
||||
if result and result[0]:
|
||||
if result[0] > datetime.now(ZoneInfo("Europe/Berlin")).replace(tzinfo=None):
|
||||
return True, result[0]
|
||||
return False, None
|
||||
|
||||
|
||||
def record_failed_attempt(ip_address, username):
|
||||
"""Record a failed login attempt"""
|
||||
# Random error message
|
||||
error_message = random.choice(FAIL_MESSAGES)
|
||||
|
||||
with get_db_connection() as conn:
|
||||
with get_db_cursor(conn) as cur:
|
||||
try:
|
||||
# Check if IP already exists
|
||||
cur.execute("""
|
||||
SELECT attempt_count FROM login_attempts
|
||||
WHERE ip_address = %s
|
||||
""", (ip_address,))
|
||||
|
||||
result = cur.fetchone()
|
||||
|
||||
if result:
|
||||
# Update existing entry
|
||||
new_count = result[0] + 1
|
||||
blocked_until = None
|
||||
|
||||
if new_count >= MAX_LOGIN_ATTEMPTS:
|
||||
blocked_until = datetime.now(ZoneInfo("Europe/Berlin")).replace(tzinfo=None) + timedelta(hours=BLOCK_DURATION_HOURS)
|
||||
# Email notification (if enabled)
|
||||
if EMAIL_ENABLED:
|
||||
send_security_alert_email(ip_address, username, new_count)
|
||||
|
||||
cur.execute("""
|
||||
UPDATE login_attempts
|
||||
SET attempt_count = %s,
|
||||
last_attempt = CURRENT_TIMESTAMP,
|
||||
blocked_until = %s,
|
||||
last_username_tried = %s,
|
||||
last_error_message = %s
|
||||
WHERE ip_address = %s
|
||||
""", (new_count, blocked_until, username, error_message, ip_address))
|
||||
else:
|
||||
# Create new entry
|
||||
cur.execute("""
|
||||
INSERT INTO login_attempts
|
||||
(ip_address, attempt_count, last_username_tried, last_error_message)
|
||||
VALUES (%s, 1, %s, %s)
|
||||
""", (ip_address, username, error_message))
|
||||
|
||||
conn.commit()
|
||||
|
||||
# Audit log
|
||||
log_audit('LOGIN_FAILED', 'user',
|
||||
additional_info=f"IP: {ip_address}, User: {username}, Message: {error_message}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Rate limiting error: {e}")
|
||||
conn.rollback()
|
||||
|
||||
return error_message
|
||||
|
||||
|
||||
def reset_login_attempts(ip_address):
|
||||
"""Reset login attempts for an IP"""
|
||||
execute_query(
|
||||
"DELETE FROM login_attempts WHERE ip_address = %s",
|
||||
(ip_address,)
|
||||
)
|
||||
|
||||
|
||||
def get_login_attempts(ip_address):
|
||||
"""Get the number of login attempts for an IP"""
|
||||
result = execute_query(
|
||||
"SELECT attempt_count FROM login_attempts WHERE ip_address = %s",
|
||||
(ip_address,),
|
||||
fetch_one=True
|
||||
)
|
||||
return result[0] if result else 0
|
||||
|
||||
|
||||
def send_security_alert_email(ip_address, username, attempt_count):
|
||||
"""Send a security alert email"""
|
||||
subject = f"⚠️ SICHERHEITSWARNUNG: {attempt_count} fehlgeschlagene Login-Versuche"
|
||||
body = f"""
|
||||
WARNUNG: Mehrere fehlgeschlagene Login-Versuche erkannt!
|
||||
|
||||
IP-Adresse: {ip_address}
|
||||
Versuchter Benutzername: {username}
|
||||
Anzahl Versuche: {attempt_count}
|
||||
Zeit: {datetime.now(ZoneInfo("Europe/Berlin")).strftime('%Y-%m-%d %H:%M:%S')}
|
||||
|
||||
Die IP-Adresse wurde für 24 Stunden gesperrt.
|
||||
|
||||
Dies ist eine automatische Nachricht vom v2-Docker Admin Panel.
|
||||
"""
|
||||
|
||||
# TODO: Email sending implementation when SMTP is configured
|
||||
logger.warning(f"Sicherheitswarnung: {attempt_count} fehlgeschlagene Versuche von IP {ip_address}")
|
||||
print(f"E-Mail würde gesendet: {subject}")
|
||||
In neuem Issue referenzieren
Einen Benutzer sperren