Initial commit: AegisSight-Monitor (OSINT-Monitoringsystem)
Dieser Commit ist enthalten in:
102
src/email_utils/rate_limiter.py
Normale Datei
102
src/email_utils/rate_limiter.py
Normale Datei
@@ -0,0 +1,102 @@
|
||||
"""In-Memory Rate-Limiting fuer Magic-Link-Anfragen und Code-Verifizierung."""
|
||||
import time
|
||||
from collections import defaultdict
|
||||
|
||||
|
||||
class RateLimiter:
|
||||
"""Rate-Limiter mit zwei Ebenen: pro E-Mail und pro IP."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
max_per_email: int = 3,
|
||||
email_window_seconds: int = 900, # 15 Minuten
|
||||
max_per_ip: int = 10,
|
||||
ip_window_seconds: int = 3600, # 1 Stunde
|
||||
):
|
||||
self.max_per_email = max_per_email
|
||||
self.email_window = email_window_seconds
|
||||
self.max_per_ip = max_per_ip
|
||||
self.ip_window = ip_window_seconds
|
||||
self._email_requests: dict[str, list[float]] = defaultdict(list)
|
||||
self._ip_requests: dict[str, list[float]] = defaultdict(list)
|
||||
|
||||
def _clean(self, entries: list[float], window: int) -> list[float]:
|
||||
cutoff = time.time() - window
|
||||
return [t for t in entries if t > cutoff]
|
||||
|
||||
def check(self, email: str, ip: str) -> tuple[bool, str]:
|
||||
"""Prueft ob die Anfrage erlaubt ist.
|
||||
|
||||
Returns:
|
||||
(erlaubt, grund) - True wenn OK, False mit Grund wenn blockiert.
|
||||
"""
|
||||
now = time.time()
|
||||
|
||||
# E-Mail-Limit
|
||||
self._email_requests[email] = self._clean(self._email_requests[email], self.email_window)
|
||||
if len(self._email_requests[email]) >= self.max_per_email:
|
||||
return False, "Zu viele Anfragen fuer diese E-Mail-Adresse. Bitte warten."
|
||||
|
||||
# IP-Limit
|
||||
self._ip_requests[ip] = self._clean(self._ip_requests[ip], self.ip_window)
|
||||
if len(self._ip_requests[ip]) >= self.max_per_ip:
|
||||
return False, "Zu viele Anfragen von dieser IP-Adresse. Bitte warten."
|
||||
|
||||
return True, ""
|
||||
|
||||
def record(self, email: str, ip: str):
|
||||
"""Zeichnet eine erfolgreiche Anfrage auf."""
|
||||
now = time.time()
|
||||
self._email_requests[email].append(now)
|
||||
self._ip_requests[ip].append(now)
|
||||
|
||||
|
||||
class VerifyCodeLimiter:
|
||||
"""Rate-Limiter fuer Code-Verifizierung (Brute-Force-Schutz).
|
||||
|
||||
Zaehlt Fehlversuche pro E-Mail und pro IP.
|
||||
Nach max_attempts wird gesperrt bis das Zeitfenster ablaeuft.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
max_attempts_per_email: int = 5,
|
||||
max_attempts_per_ip: int = 15,
|
||||
window_seconds: int = 600, # 10 Minuten (= Magic-Link-Ablaufzeit)
|
||||
):
|
||||
self.max_per_email = max_attempts_per_email
|
||||
self.max_per_ip = max_attempts_per_ip
|
||||
self.window = window_seconds
|
||||
self._email_failures: dict[str, list[float]] = defaultdict(list)
|
||||
self._ip_failures: dict[str, list[float]] = defaultdict(list)
|
||||
|
||||
def _clean(self, entries: list[float]) -> list[float]:
|
||||
cutoff = time.time() - self.window
|
||||
return [t for t in entries if t > cutoff]
|
||||
|
||||
def check(self, email: str, ip: str) -> tuple[bool, str]:
|
||||
"""Prueft ob ein Verifizierungsversuch erlaubt ist."""
|
||||
self._email_failures[email] = self._clean(self._email_failures[email])
|
||||
if len(self._email_failures[email]) >= self.max_per_email:
|
||||
return False, "Zu viele Fehlversuche. Bitte neuen Code anfordern."
|
||||
|
||||
self._ip_failures[ip] = self._clean(self._ip_failures[ip])
|
||||
if len(self._ip_failures[ip]) >= self.max_per_ip:
|
||||
return False, "Zu viele Fehlversuche von dieser IP-Adresse."
|
||||
|
||||
return True, ""
|
||||
|
||||
def record_failure(self, email: str, ip: str):
|
||||
"""Zeichnet einen fehlgeschlagenen Versuch auf."""
|
||||
now = time.time()
|
||||
self._email_failures[email].append(now)
|
||||
self._ip_failures[ip].append(now)
|
||||
|
||||
def clear(self, email: str):
|
||||
"""Loescht Zaehler nach erfolgreichem Login."""
|
||||
self._email_failures.pop(email, None)
|
||||
|
||||
|
||||
# Singleton-Instanzen
|
||||
magic_link_limiter = RateLimiter()
|
||||
verify_code_limiter = VerifyCodeLimiter()
|
||||
In neuem Issue referenzieren
Einen Benutzer sperren