Files
AccountForger-neuerUpload/utils/process_guard.py
2025-11-27 21:17:32 +01:00

291 Zeilen
9.4 KiB
Python

"""
Process Guard - Schützt vor parallelen Prozessen und Fehler-Spam.
Dieser Guard verhindert:
- Parallele Prozesse (nur ein Vorgang gleichzeitig)
- Zu viele Fehlversuche (Zwangspause nach 3 Fehlern)
- Mehrere Browser-Instanzen gleichzeitig
Clean Code & YAGNI: Nur das Nötigste, keine Über-Engineering.
"""
import json
import logging
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, Tuple
logger = logging.getLogger(__name__)
class ProcessGuard:
"""
Einfacher Guard für Prozess-Locks und Fehler-Tracking.
Verantwortlichkeiten:
- Process Lock Management (nur ein Prozess gleichzeitig)
- Fehler-Tracking (Zwangspause nach 3 Fehlern)
- Persistierung der Pause-Zeit über Neustarts
"""
# Konfiguration
MAX_FAILURES = 3
PAUSE_DURATION_HOURS = 1
LOCK_TIMEOUT_MINUTES = 10 # Auto-Unlock nach 10 Minuten bei hängenden Prozessen
def __init__(self):
"""Initialisiert den Process Guard."""
# Process Lock
self._is_locked = False
self._current_process = None
self._current_platform = None
self._lock_started_at = None # Timestamp für Auto-Timeout
# Error Tracking
self._failure_count = 0
self._pause_until = None
# Config File
self._config_file = Path("config/.process_guard")
def can_start(self, process_type: str, platform: str) -> Tuple[bool, Optional[str]]:
"""
Prüft ob ein Prozess gestartet werden darf.
Args:
process_type: Art des Prozesses (z.B. "Account-Erstellung", "Login")
platform: Plattform (z.B. "Instagram", "Facebook")
Returns:
(erlaubt: bool, fehler_nachricht: Optional[str])
- (True, None) wenn erlaubt
- (False, "Fehlermeldung") wenn blockiert
"""
# 1. Prüfe Zwangspause
if self._is_paused():
remaining_min = self._get_pause_remaining_minutes()
error_msg = (
f"⏸ Zwangspause aktiv\n\n"
f"Nach 3 fehlgeschlagenen Versuchen ist eine Pause erforderlich.\n"
f"Verbleibende Zeit: {remaining_min} Minuten\n\n"
f"Empfehlung:\n"
f"• Proxy-Einstellungen prüfen\n"
f"• Internetverbindung prüfen\n"
f"• Plattform-Status überprüfen"
)
return False, error_msg
# 2. Prüfe Process Lock
if self._is_locked:
error_msg = (
f"⚠ Prozess läuft bereits\n\n"
f"Aktuell aktiv: {self._current_process} ({self._current_platform})\n\n"
f"Bitte warten Sie bis der aktuelle Vorgang abgeschlossen ist."
)
return False, error_msg
return True, None
def start(self, process_type: str, platform: str):
"""
Startet einen Prozess (setzt den Lock).
Args:
process_type: Art des Prozesses
platform: Plattform
"""
self._is_locked = True
self._current_process = process_type
self._current_platform = platform
self._lock_started_at = datetime.now() # Timestamp für Auto-Timeout
logger.info(f"Process locked: {process_type} ({platform})")
def end(self, success: bool):
"""
Beendet einen Prozess (gibt den Lock frei).
Args:
success: War der Prozess erfolgreich?
"""
# Lock freigeben
process_info = f"{self._current_process} ({self._current_platform})"
self._is_locked = False
self._current_process = None
self._current_platform = None
self._lock_started_at = None # Timestamp zurücksetzen
# Fehler-Tracking
if success:
if self._failure_count > 0:
logger.info(f"Fehler-Counter zurückgesetzt nach Erfolg (war: {self._failure_count})")
self._failure_count = 0
self._save_pause_state()
else:
self._failure_count += 1
logger.warning(f"Fehlschlag #{self._failure_count} bei {process_info}")
if self._failure_count >= self.MAX_FAILURES:
self._activate_pause()
logger.info(f"Process unlocked: {process_info} (success={success})")
def reset(self):
"""
Reset beim App-Start.
Lädt Pause-State, resettet aber Lock (da Lock nicht über Neustarts persistiert).
"""
self._is_locked = False
self._current_process = None
self._current_platform = None
self._lock_started_at = None # Timestamp zurücksetzen
self._load_pause_state()
if self._is_paused():
remaining = self._get_pause_remaining_minutes()
logger.warning(f"Zwangspause aktiv: noch {remaining} Minuten")
logger.info("Process Guard initialisiert")
def is_locked(self) -> bool:
"""
Gibt zurück ob aktuell ein Prozess läuft (mit Auto-Timeout-Check).
Returns:
True wenn ein Prozess aktiv ist
"""
if not self._is_locked:
return False
# Auto-Timeout-Check: Unlock bei hängenden Prozessen
if self._lock_started_at:
elapsed_minutes = (datetime.now() - self._lock_started_at).total_seconds() / 60
if elapsed_minutes > self.LOCK_TIMEOUT_MINUTES:
logger.warning(
f"⏰ AUTO-TIMEOUT: Lock nach {int(elapsed_minutes)} Minuten freigegeben. "
f"Prozess: {self._current_process} ({self._current_platform})"
)
# Lock automatisch freigeben
self._is_locked = False
self._current_process = None
self._current_platform = None
self._lock_started_at = None
return False
return True
def is_paused(self) -> bool:
"""
Gibt zurück ob Zwangspause aktiv ist.
Returns:
True wenn Pause aktiv ist
"""
return self._is_paused()
def get_status_message(self) -> Optional[str]:
"""
Gibt Status-Nachricht zurück wenn blockiert.
Returns:
None wenn nicht blockiert, sonst Nachricht
"""
if self._is_paused():
remaining = self._get_pause_remaining_minutes()
return f"Zwangspause aktiv (noch {remaining} Min)"
if self._is_locked:
return f"'{self._current_process}' läuft"
return None
# Private Methoden
def _is_paused(self) -> bool:
"""Prüft ob Zwangspause aktiv ist."""
if not self._pause_until:
return False
# Prüfe ob Pause abgelaufen
if datetime.now() >= self._pause_until:
logger.info("Zwangspause ist abgelaufen")
self._pause_until = None
self._failure_count = 0
self._save_pause_state()
return False
return True
def _activate_pause(self):
"""Aktiviert Zwangspause."""
self._pause_until = datetime.now() + timedelta(hours=self.PAUSE_DURATION_HOURS)
self._save_pause_state()
pause_until_str = self._pause_until.strftime('%H:%M')
logger.error(
f"⏸ ZWANGSPAUSE AKTIVIERT bis {pause_until_str} "
f"nach {self.MAX_FAILURES} Fehlschlägen"
)
def _get_pause_remaining_minutes(self) -> int:
"""Gibt verbleibende Minuten der Pause zurück."""
if not self._pause_until:
return 0
remaining_seconds = (self._pause_until - datetime.now()).total_seconds()
remaining_minutes = max(0, int(remaining_seconds / 60))
return remaining_minutes
def _save_pause_state(self):
"""Speichert Pause-State in Datei (nur wenn nötig)."""
try:
# Erstelle config-Verzeichnis falls nicht vorhanden
self._config_file.parent.mkdir(exist_ok=True)
data = {
'pause_until': self._pause_until.isoformat() if self._pause_until else None,
'failure_count': self._failure_count,
'last_update': datetime.now().isoformat()
}
self._config_file.write_text(json.dumps(data, indent=2))
logger.debug(f"Pause-State gespeichert: {data}")
except Exception as e:
logger.error(f"Fehler beim Speichern des Pause-State: {e}")
def _load_pause_state(self):
"""Lädt Pause-State aus Datei."""
if not self._config_file.exists():
logger.debug("Keine gespeicherte Pause-State gefunden")
return
try:
data = json.loads(self._config_file.read_text())
# Lade Pause-Zeit
if data.get('pause_until'):
self._pause_until = datetime.fromisoformat(data['pause_until'])
self._failure_count = data.get('failure_count', 0)
logger.info(f"Pause-State geladen: Pause bis {self._pause_until}, Failures: {self._failure_count}")
except Exception as e:
logger.error(f"Fehler beim Laden des Pause-State: {e}")
# Globale Instanz (YAGNI: Kein komplexes Singleton-Pattern nötig)
_guard_instance = None
def get_guard() -> ProcessGuard:
"""
Gibt die globale ProcessGuard-Instanz zurück.
Returns:
ProcessGuard: Die globale Guard-Instanz
"""
global _guard_instance
if _guard_instance is None:
_guard_instance = ProcessGuard()
return _guard_instance