"""In-Memory Rate-Limiting fuer Magic-Link-Anfragen.""" import time from collections import defaultdict class RateLimiter: """Rate-Limiter mit zwei Ebenen: pro E-Mail und pro IP.""" def __init__( self, max_per_email: int = 3, email_window_seconds: int = 900, # 15 Minuten max_per_ip: int = 10, ip_window_seconds: int = 3600, # 1 Stunde ): self.max_per_email = max_per_email self.email_window = email_window_seconds self.max_per_ip = max_per_ip self.ip_window = ip_window_seconds self._email_requests: dict[str, list[float]] = defaultdict(list) self._ip_requests: dict[str, list[float]] = defaultdict(list) def _clean(self, entries: list[float], window: int) -> list[float]: cutoff = time.time() - window return [t for t in entries if t > cutoff] def check(self, email: str, ip: str) -> tuple[bool, str]: """Prueft ob die Anfrage erlaubt ist. Returns: (erlaubt, grund) - True wenn OK, False mit Grund wenn blockiert. """ now = time.time() # E-Mail-Limit self._email_requests[email] = self._clean(self._email_requests[email], self.email_window) if len(self._email_requests[email]) >= self.max_per_email: return False, "Zu viele Anfragen fuer diese E-Mail-Adresse. Bitte warten." # IP-Limit self._ip_requests[ip] = self._clean(self._ip_requests[ip], self.ip_window) if len(self._ip_requests[ip]) >= self.max_per_ip: return False, "Zu viele Anfragen von dieser IP-Adresse. Bitte warten." return True, "" def record(self, email: str, ip: str): """Zeichnet eine erfolgreiche Anfrage auf.""" now = time.time() self._email_requests[email].append(now) self._ip_requests[ip].append(now) # Singleton-Instanz magic_link_limiter = RateLimiter()