""" Process Guard - Schützt vor parallelen Prozessen und Fehler-Spam. Dieser Guard verhindert: - Parallele Prozesse (nur ein Vorgang gleichzeitig) - Zu viele Fehlversuche (Zwangspause nach 3 Fehlern) - Mehrere Browser-Instanzen gleichzeitig Clean Code & YAGNI: Nur das Nötigste, keine Über-Engineering. """ import json import logging from datetime import datetime, timedelta from pathlib import Path from typing import Optional, Tuple logger = logging.getLogger(__name__) class ProcessGuard: """ Einfacher Guard für Prozess-Locks und Fehler-Tracking. Verantwortlichkeiten: - Process Lock Management (nur ein Prozess gleichzeitig) - Fehler-Tracking (Zwangspause nach 3 Fehlern) - Persistierung der Pause-Zeit über Neustarts """ # Konfiguration MAX_FAILURES = 3 PAUSE_DURATION_HOURS = 1 LOCK_TIMEOUT_MINUTES = 10 # Auto-Unlock nach 10 Minuten bei hängenden Prozessen def __init__(self): """Initialisiert den Process Guard.""" # Process Lock self._is_locked = False self._current_process = None self._current_platform = None self._lock_started_at = None # Timestamp für Auto-Timeout # Error Tracking self._failure_count = 0 self._pause_until = None # Config File self._config_file = Path("config/.process_guard") def can_start(self, process_type: str, platform: str) -> Tuple[bool, Optional[str]]: """ Prüft ob ein Prozess gestartet werden darf. Args: process_type: Art des Prozesses (z.B. "Account-Erstellung", "Login") platform: Plattform (z.B. "Instagram", "Facebook") Returns: (erlaubt: bool, fehler_nachricht: Optional[str]) - (True, None) wenn erlaubt - (False, "Fehlermeldung") wenn blockiert """ # 1. Prüfe Zwangspause if self._is_paused(): remaining_min = self._get_pause_remaining_minutes() error_msg = ( f"⏸ Zwangspause aktiv\n\n" f"Nach 3 fehlgeschlagenen Versuchen ist eine Pause erforderlich.\n" f"Verbleibende Zeit: {remaining_min} Minuten\n\n" f"Empfehlung:\n" f"• Proxy-Einstellungen prüfen\n" f"• Internetverbindung prüfen\n" f"• Plattform-Status überprüfen" ) return False, error_msg # 2. Prüfe Process Lock if self._is_locked: error_msg = ( f"⚠ Prozess läuft bereits\n\n" f"Aktuell aktiv: {self._current_process} ({self._current_platform})\n\n" f"Bitte warten Sie bis der aktuelle Vorgang abgeschlossen ist." ) return False, error_msg return True, None def start(self, process_type: str, platform: str): """ Startet einen Prozess (setzt den Lock). Args: process_type: Art des Prozesses platform: Plattform """ self._is_locked = True self._current_process = process_type self._current_platform = platform self._lock_started_at = datetime.now() # Timestamp für Auto-Timeout logger.info(f"Process locked: {process_type} ({platform})") def end(self, success: bool): """ Beendet einen Prozess (gibt den Lock frei). Args: success: War der Prozess erfolgreich? """ # Lock freigeben process_info = f"{self._current_process} ({self._current_platform})" self._is_locked = False self._current_process = None self._current_platform = None self._lock_started_at = None # Timestamp zurücksetzen # Fehler-Tracking if success: if self._failure_count > 0: logger.info(f"Fehler-Counter zurückgesetzt nach Erfolg (war: {self._failure_count})") self._failure_count = 0 self._save_pause_state() else: self._failure_count += 1 logger.warning(f"Fehlschlag #{self._failure_count} bei {process_info}") if self._failure_count >= self.MAX_FAILURES: self._activate_pause() logger.info(f"Process unlocked: {process_info} (success={success})") def reset(self): """ Reset beim App-Start. Lädt Pause-State, resettet aber Lock (da Lock nicht über Neustarts persistiert). """ self._is_locked = False self._current_process = None self._current_platform = None self._lock_started_at = None # Timestamp zurücksetzen self._load_pause_state() if self._is_paused(): remaining = self._get_pause_remaining_minutes() logger.warning(f"Zwangspause aktiv: noch {remaining} Minuten") logger.info("Process Guard initialisiert") def is_locked(self) -> bool: """ Gibt zurück ob aktuell ein Prozess läuft (mit Auto-Timeout-Check). Returns: True wenn ein Prozess aktiv ist """ if not self._is_locked: return False # Auto-Timeout-Check: Unlock bei hängenden Prozessen if self._lock_started_at: elapsed_minutes = (datetime.now() - self._lock_started_at).total_seconds() / 60 if elapsed_minutes > self.LOCK_TIMEOUT_MINUTES: logger.warning( f"⏰ AUTO-TIMEOUT: Lock nach {int(elapsed_minutes)} Minuten freigegeben. " f"Prozess: {self._current_process} ({self._current_platform})" ) # Lock automatisch freigeben self._is_locked = False self._current_process = None self._current_platform = None self._lock_started_at = None return False return True def is_paused(self) -> bool: """ Gibt zurück ob Zwangspause aktiv ist. Returns: True wenn Pause aktiv ist """ return self._is_paused() def get_status_message(self) -> Optional[str]: """ Gibt Status-Nachricht zurück wenn blockiert. Returns: None wenn nicht blockiert, sonst Nachricht """ if self._is_paused(): remaining = self._get_pause_remaining_minutes() return f"Zwangspause aktiv (noch {remaining} Min)" if self._is_locked: return f"'{self._current_process}' läuft" return None # Private Methoden def _is_paused(self) -> bool: """Prüft ob Zwangspause aktiv ist.""" if not self._pause_until: return False # Prüfe ob Pause abgelaufen if datetime.now() >= self._pause_until: logger.info("Zwangspause ist abgelaufen") self._pause_until = None self._failure_count = 0 self._save_pause_state() return False return True def _activate_pause(self): """Aktiviert Zwangspause.""" self._pause_until = datetime.now() + timedelta(hours=self.PAUSE_DURATION_HOURS) self._save_pause_state() pause_until_str = self._pause_until.strftime('%H:%M') logger.error( f"⏸ ZWANGSPAUSE AKTIVIERT bis {pause_until_str} " f"nach {self.MAX_FAILURES} Fehlschlägen" ) def _get_pause_remaining_minutes(self) -> int: """Gibt verbleibende Minuten der Pause zurück.""" if not self._pause_until: return 0 remaining_seconds = (self._pause_until - datetime.now()).total_seconds() remaining_minutes = max(0, int(remaining_seconds / 60)) return remaining_minutes def _save_pause_state(self): """Speichert Pause-State in Datei (nur wenn nötig).""" try: # Erstelle config-Verzeichnis falls nicht vorhanden self._config_file.parent.mkdir(exist_ok=True) data = { 'pause_until': self._pause_until.isoformat() if self._pause_until else None, 'failure_count': self._failure_count, 'last_update': datetime.now().isoformat() } self._config_file.write_text(json.dumps(data, indent=2)) logger.debug(f"Pause-State gespeichert: {data}") except Exception as e: logger.error(f"Fehler beim Speichern des Pause-State: {e}") def _load_pause_state(self): """Lädt Pause-State aus Datei.""" if not self._config_file.exists(): logger.debug("Keine gespeicherte Pause-State gefunden") return try: data = json.loads(self._config_file.read_text()) # Lade Pause-Zeit if data.get('pause_until'): self._pause_until = datetime.fromisoformat(data['pause_until']) self._failure_count = data.get('failure_count', 0) logger.info(f"Pause-State geladen: Pause bis {self._pause_until}, Failures: {self._failure_count}") except Exception as e: logger.error(f"Fehler beim Laden des Pause-State: {e}") # Globale Instanz (YAGNI: Kein komplexes Singleton-Pattern nötig) _guard_instance = None def get_guard() -> ProcessGuard: """ Gibt die globale ProcessGuard-Instanz zurück. Returns: ProcessGuard: Die globale Guard-Instanz """ global _guard_instance if _guard_instance is None: _guard_instance = ProcessGuard() return _guard_instance