Files
AccountForger-neuerUpload/utils/process_guard.py
Claude Project Manager a25a26a01a Update changes
2026-01-18 18:15:34 +01:00

407 Zeilen
13 KiB
Python

"""
Process Guard - Schützt vor parallelen Prozessen und Fehler-Spam.
Dieser Guard verhindert:
- Parallele Prozesse (nur ein Vorgang gleichzeitig)
- Zu viele Fehlversuche (Zwangspause nach 3 Fehlern)
- Mehrere Browser-Instanzen gleichzeitig
Clean Code & YAGNI: Nur das Nötigste, keine Über-Engineering.
WICHTIG - Korrekte Verwendung:
- start() → Prozess beginnt
- end(success=True/False) → Prozess endet normal (zählt für Failure-Tracking)
- release() → Prozess wird abgebrochen (zählt NICHT als Failure)
- Alle Methoden sind idempotent (mehrfacher Aufruf ist sicher)
"""
import json
import logging
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, Tuple
import threading
logger = logging.getLogger(__name__)
class ProcessGuard:
"""
Einfacher Guard für Prozess-Locks und Fehler-Tracking.
Verantwortlichkeiten:
- Process Lock Management (nur ein Prozess gleichzeitig)
- Fehler-Tracking (Zwangspause nach 3 Fehlern)
- Persistierung der Pause-Zeit über Neustarts
Thread-Safety:
- Alle öffentlichen Methoden sind thread-safe durch Lock
"""
# Konfiguration
MAX_FAILURES = 3
PAUSE_DURATION_HOURS = 1
LOCK_TIMEOUT_MINUTES = 10 # Auto-Unlock nach 10 Minuten bei hängenden Prozessen
def __init__(self):
"""Initialisiert den Process Guard."""
# Thread-Safety Lock
self._thread_lock = threading.Lock()
# Process Lock
self._is_locked = False
self._current_process = None
self._current_platform = None
self._lock_started_at = None # Timestamp für Auto-Timeout
self._lock_id = None # Eindeutige ID für jeden Lock
# Error Tracking
self._failure_count = 0
self._pause_until = None
# Config File
self._config_file = Path("config/.process_guard")
# Counter für Lock-IDs
self._lock_counter = 0
def can_start(self, process_type: str, platform: str) -> Tuple[bool, Optional[str]]:
"""
Prüft ob ein Prozess gestartet werden darf.
Args:
process_type: Art des Prozesses (z.B. "Account-Erstellung", "Login")
platform: Plattform (z.B. "Instagram", "Facebook")
Returns:
(erlaubt: bool, fehler_nachricht: Optional[str])
- (True, None) wenn erlaubt
- (False, "Fehlermeldung") wenn blockiert
"""
with self._thread_lock:
# 1. Prüfe Zwangspause
if self._is_paused():
remaining_min = self._get_pause_remaining_minutes()
error_msg = (
f"⏸ Zwangspause aktiv\n\n"
f"Nach 3 fehlgeschlagenen Versuchen ist eine Pause erforderlich.\n"
f"Verbleibende Zeit: {remaining_min} Minuten\n\n"
f"Empfehlung:\n"
f"• Proxy-Einstellungen prüfen\n"
f"• Internetverbindung prüfen\n"
f"• Plattform-Status überprüfen"
)
return False, error_msg
# 2. Prüfe Process Lock (mit Auto-Timeout-Check)
if self._is_locked_with_timeout_check():
error_msg = (
f"⚠ Prozess läuft bereits\n\n"
f"Aktuell aktiv: {self._current_process} ({self._current_platform})\n\n"
f"Bitte warten Sie bis der aktuelle Vorgang abgeschlossen ist."
)
return False, error_msg
return True, None
def start(self, process_type: str, platform: str) -> int:
"""
Startet einen Prozess (setzt den Lock).
Args:
process_type: Art des Prozesses
platform: Plattform
Returns:
int: Lock-ID für diesen Prozess (für spätere Freigabe)
"""
with self._thread_lock:
self._lock_counter += 1
self._lock_id = self._lock_counter
self._is_locked = True
self._current_process = process_type
self._current_platform = platform
self._lock_started_at = datetime.now()
logger.info(f"Process locked [ID={self._lock_id}]: {process_type} ({platform})")
return self._lock_id
def end(self, success: bool) -> bool:
"""
Beendet einen Prozess (gibt den Lock frei).
Diese Methode ist IDEMPOTENT - mehrfacher Aufruf ist sicher.
Der Failure-Counter wird nur erhöht wenn der Lock aktiv war.
Args:
success: War der Prozess erfolgreich?
Returns:
bool: True wenn Lock freigegeben wurde, False wenn kein Lock aktiv war
"""
with self._thread_lock:
# IDEMPOTENZ: Prüfe ob Lock überhaupt aktiv ist
if not self._is_locked:
logger.debug("end() aufgerufen, aber kein Lock aktiv - ignoriere")
return False
# Lock-Info für Logging speichern
process_info = f"{self._current_process} ({self._current_platform})"
lock_id = self._lock_id
# Lock freigeben
self._is_locked = False
self._current_process = None
self._current_platform = None
self._lock_started_at = None
self._lock_id = None
# Fehler-Tracking (nur wenn Lock aktiv war)
if success:
if self._failure_count > 0:
logger.info(f"Fehler-Counter zurückgesetzt nach Erfolg (war: {self._failure_count})")
self._failure_count = 0
self._save_pause_state()
else:
self._failure_count += 1
logger.warning(f"Fehlschlag #{self._failure_count} bei {process_info}")
if self._failure_count >= self.MAX_FAILURES:
self._activate_pause()
logger.info(f"Process unlocked [ID={lock_id}]: {process_info} (success={success})")
return True
def release(self) -> bool:
"""
Gibt den Lock frei OHNE den Failure-Counter zu beeinflussen.
Verwendung:
- User-Abbruch (Cancel-Button)
- Validierungsfehler VOR Prozessstart
- Cleanup bei App-Schließung
Diese Methode ist IDEMPOTENT - mehrfacher Aufruf ist sicher.
Returns:
bool: True wenn Lock freigegeben wurde, False wenn kein Lock aktiv war
"""
with self._thread_lock:
# IDEMPOTENZ: Prüfe ob Lock überhaupt aktiv ist
if not self._is_locked:
logger.debug("release() aufgerufen, aber kein Lock aktiv - ignoriere")
return False
# Lock-Info für Logging speichern
process_info = f"{self._current_process} ({self._current_platform})"
lock_id = self._lock_id
# Lock freigeben (OHNE Failure-Tracking)
self._is_locked = False
self._current_process = None
self._current_platform = None
self._lock_started_at = None
self._lock_id = None
logger.info(f"Process released [ID={lock_id}]: {process_info} (kein Failure gezählt)")
return True
def reset(self):
"""
Reset beim App-Start.
Lädt Pause-State, resettet aber Lock (da Lock nicht über Neustarts persistiert).
"""
with self._thread_lock:
self._is_locked = False
self._current_process = None
self._current_platform = None
self._lock_started_at = None
self._lock_id = None
self._load_pause_state()
if self._is_paused():
remaining = self._get_pause_remaining_minutes()
logger.warning(f"Zwangspause aktiv: noch {remaining} Minuten")
logger.info("Process Guard initialisiert")
def is_locked(self) -> bool:
"""
Gibt zurück ob aktuell ein Prozess läuft (mit Auto-Timeout-Check).
Thread-safe Methode.
Returns:
True wenn ein Prozess aktiv ist
"""
with self._thread_lock:
return self._is_locked_with_timeout_check()
def _is_locked_with_timeout_check(self) -> bool:
"""
Interne Methode: Prüft Lock-Status mit Auto-Timeout.
MUSS innerhalb eines _thread_lock aufgerufen werden!
Returns:
True wenn Lock aktiv ist
"""
if not self._is_locked:
return False
# Auto-Timeout-Check: Unlock bei hängenden Prozessen
if self._lock_started_at:
elapsed_minutes = (datetime.now() - self._lock_started_at).total_seconds() / 60
if elapsed_minutes > self.LOCK_TIMEOUT_MINUTES:
logger.warning(
f"⏰ AUTO-TIMEOUT: Lock [ID={self._lock_id}] nach {int(elapsed_minutes)} Minuten freigegeben. "
f"Prozess: {self._current_process} ({self._current_platform})"
)
# Lock automatisch freigeben (OHNE Failure-Zählung - Timeout ist kein User-Fehler)
self._is_locked = False
self._current_process = None
self._current_platform = None
self._lock_started_at = None
self._lock_id = None
return False
return True
def is_paused(self) -> bool:
"""
Gibt zurück ob Zwangspause aktiv ist.
Thread-safe Methode.
Returns:
True wenn Pause aktiv ist
"""
with self._thread_lock:
return self._is_paused()
def get_status_message(self) -> Optional[str]:
"""
Gibt Status-Nachricht zurück wenn blockiert.
Thread-safe Methode.
Returns:
None wenn nicht blockiert, sonst Nachricht
"""
with self._thread_lock:
if self._is_paused():
remaining = self._get_pause_remaining_minutes()
return f"Zwangspause aktiv (noch {remaining} Min)"
if self._is_locked:
return f"'{self._current_process}' läuft"
return None
def get_failure_count(self) -> int:
"""
Gibt den aktuellen Failure-Counter zurück.
Thread-safe Methode.
Returns:
int: Anzahl der Fehlschläge seit letztem Erfolg
"""
with self._thread_lock:
return self._failure_count
# Private Methoden
def _is_paused(self) -> bool:
"""Prüft ob Zwangspause aktiv ist."""
if not self._pause_until:
return False
# Prüfe ob Pause abgelaufen
if datetime.now() >= self._pause_until:
logger.info("Zwangspause ist abgelaufen")
self._pause_until = None
self._failure_count = 0
self._save_pause_state()
return False
return True
def _activate_pause(self):
"""Aktiviert Zwangspause."""
self._pause_until = datetime.now() + timedelta(hours=self.PAUSE_DURATION_HOURS)
self._save_pause_state()
pause_until_str = self._pause_until.strftime('%H:%M')
logger.error(
f"⏸ ZWANGSPAUSE AKTIVIERT bis {pause_until_str} "
f"nach {self.MAX_FAILURES} Fehlschlägen"
)
def _get_pause_remaining_minutes(self) -> int:
"""Gibt verbleibende Minuten der Pause zurück."""
if not self._pause_until:
return 0
remaining_seconds = (self._pause_until - datetime.now()).total_seconds()
remaining_minutes = max(0, int(remaining_seconds / 60))
return remaining_minutes
def _save_pause_state(self):
"""Speichert Pause-State in Datei (nur wenn nötig)."""
try:
# Erstelle config-Verzeichnis falls nicht vorhanden
self._config_file.parent.mkdir(exist_ok=True)
data = {
'pause_until': self._pause_until.isoformat() if self._pause_until else None,
'failure_count': self._failure_count,
'last_update': datetime.now().isoformat()
}
self._config_file.write_text(json.dumps(data, indent=2))
logger.debug(f"Pause-State gespeichert: {data}")
except Exception as e:
logger.error(f"Fehler beim Speichern des Pause-State: {e}")
def _load_pause_state(self):
"""Lädt Pause-State aus Datei."""
if not self._config_file.exists():
logger.debug("Keine gespeicherte Pause-State gefunden")
return
try:
data = json.loads(self._config_file.read_text())
# Lade Pause-Zeit
if data.get('pause_until'):
self._pause_until = datetime.fromisoformat(data['pause_until'])
self._failure_count = data.get('failure_count', 0)
logger.info(f"Pause-State geladen: Pause bis {self._pause_until}, Failures: {self._failure_count}")
except Exception as e:
logger.error(f"Fehler beim Laden des Pause-State: {e}")
# Globale Instanz mit Thread-Safety
_guard_instance = None
_guard_instance_lock = threading.Lock()
def get_guard() -> ProcessGuard:
"""
Gibt die globale ProcessGuard-Instanz zurück.
Thread-safe Singleton-Pattern.
Returns:
ProcessGuard: Die globale Guard-Instanz
"""
global _guard_instance
if _guard_instance is None:
with _guard_instance_lock:
# Double-check locking
if _guard_instance is None:
_guard_instance = ProcessGuard()
return _guard_instance