Dieser Commit ist enthalten in:
Claude Project Manager
2026-01-18 18:15:34 +01:00
Ursprung 4e82d5ef8f
Commit a25a26a01a
47 geänderte Dateien mit 4756 neuen und 2956 gelöschten Zeilen

Datei anzeigen

@ -24,18 +24,21 @@ class HumanBehavior:
self.speed_factor = max(0.1, min(10.0, speed_factor)) # Begrenzung auf 0.1-10.0
self.randomness = max(0.0, min(1.0, randomness)) # Begrenzung auf 0.0-1.0
# Typische Verzögerungen (in Sekunden)
# Typische Verzögerungen (in Sekunden) - ERHÖHT für Anti-Detection
self.delays = {
"typing_per_char": 0.05, # Verzögerung pro Zeichen beim Tippen
"typing_per_char": 0.08, # Verzögerung pro Zeichen beim Tippen (erhöht)
"mouse_movement": 0.5, # Verzögerung für Mausbewegung
"click": 0.1, # Verzögerung für Mausklick
"page_load": 2.0, # Verzögerung für das Laden einer Seite
"form_fill": 1.0, # Verzögerung zwischen Formularfeldern
"decision": 1.5, # Verzögerung für Entscheidungen
"scroll": 0.3, # Verzögerung für Scrollbewegungen
"verification": 5.0, # Verzögerung für Verifizierungsprozesse
"image_upload": 3.0, # Verzögerung für Bildupload
"navigation": 1.0 # Verzögerung für Navigation
"click": 0.15, # Verzögerung für Mausklick (erhöht)
"page_load": 8.0, # Verzögerung für das Laden einer Seite (STARK erhöht: 5-15s)
"form_fill": 4.0, # Verzögerung zwischen Formularfeldern (STARK erhöht: 2-8s)
"decision": 3.0, # Verzögerung für Entscheidungen (erhöht)
"scroll": 0.5, # Verzögerung für Scrollbewegungen (erhöht)
"verification": 30.0, # Verzögerung für Verifizierungsprozesse (STARK erhöht: 15-45s)
"image_upload": 5.0, # Verzögerung für Bildupload (erhöht)
"navigation": 2.0, # Verzögerung für Navigation (erhöht)
"cookie_reading": 5.0, # NEU: Cookie-Banner lesen (3-8s)
"field_transition": 5.0, # NEU: Zwischen Formularfeldern (2-8s)
"thinking": 2.0 # NEU: Kurze Denkpause
}
def sleep(self, delay_type: str, multiplier: float = 1.0) -> None:
@ -76,7 +79,7 @@ class HumanBehavior:
def _random_delay(self, min_seconds: float = 1.0, max_seconds: float = 3.0) -> None:
"""
Führt eine zufällige Wartezeit aus, um menschliches Verhalten zu simulieren.
Args:
min_seconds: Minimale Wartezeit in Sekunden
max_seconds: Maximale Wartezeit in Sekunden
@ -84,42 +87,96 @@ class HumanBehavior:
delay = random.uniform(min_seconds, max_seconds)
logger.debug(f"Zufällige Wartezeit: {delay:.2f} Sekunden")
time.sleep(delay)
def type_text(self, text: str, on_char_typed: Optional[Callable[[str], None]] = None,
error_probability: float = 0.05, correction_probability: float = 0.9) -> str:
def anti_detection_delay(self, action_type: str = "form_fill") -> None:
"""
Simuliert menschliches Tippen mit möglichen Tippfehlern und Korrekturen.
Erzeugt eine realistische Anti-Detection-Verzögerung.
Diese Methode verwendet längere, zufälligere Wartezeiten um Bot-Erkennung
zu vermeiden. Die Verzögerungen sind bewusst lang um menschliches
Verhalten realistischer zu simulieren.
Args:
action_type: Art der Aktion:
- "form_fill": Zwischen Formularfeldern (2-8s)
- "page_load": Auf neuen Seiten (5-15s)
- "verification": Vor Code-Eingabe (15-45s)
- "cookie_reading": Cookie-Banner lesen (3-8s)
- "thinking": Kurze Denkpause (1-3s)
"""
delay_ranges = {
"form_fill": (2.0, 8.0), # Zwischen Formularfeldern
"page_load": (5.0, 15.0), # Auf neuen Seiten
"verification": (15.0, 45.0), # Vor Code-Eingabe
"cookie_reading": (3.0, 8.0), # Cookie-Banner lesen
"thinking": (1.0, 3.0), # Kurze Denkpause
"field_focus": (0.5, 1.5), # Vor Feldinteraktion
}
min_delay, max_delay = delay_ranges.get(action_type, (2.0, 5.0))
# Basis-Verzögerung
delay = random.uniform(min_delay, max_delay)
# Zusätzliche Variation basierend auf randomness
if self.randomness > 0:
variation = 1.0 + (random.random() * 2 - 1) * self.randomness * 0.3
delay *= variation
# Speed-Factor anwenden (aber nicht zu stark reduzieren)
delay = delay / max(self.speed_factor, 0.5)
# Gelegentlich extra lange Pause (simuliert Ablenkung/Nachdenken)
if random.random() < 0.1:
extra_delay = random.uniform(2.0, 5.0)
delay += extra_delay
logger.debug(f"Extra Denkpause: +{extra_delay:.2f}s")
logger.debug(f"Anti-Detection Delay ({action_type}): {delay:.2f}s")
time.sleep(max(0.5, delay)) # Minimum 0.5s
def type_text(self, text: str, on_char_typed: Optional[Callable[[str], None]] = None,
error_probability: float = 0.15, correction_probability: float = 0.95) -> str:
"""
Simuliert menschliches Tippen mit realistischen Tippfehlern und Korrekturen.
Die Fehlerrate wurde auf 15% erhöht (vorher 5%) um realistischeres
menschliches Verhalten zu simulieren. Echte Menschen machen häufig
Tippfehler und korrigieren diese sofort.
Args:
text: Zu tippender Text
on_char_typed: Optionale Funktion, die für jedes getippte Zeichen aufgerufen wird
error_probability: Wahrscheinlichkeit für Tippfehler (0-1)
error_probability: Wahrscheinlichkeit für Tippfehler (0-1), Standard: 0.15 (15%)
correction_probability: Wahrscheinlichkeit, Tippfehler zu korrigieren (0-1)
Returns:
Der tatsächlich getippte Text (mit oder ohne Fehler)
"""
# Anpassen der Fehlerwahrscheinlichkeit basierend auf Zufälligkeit
adjusted_error_prob = error_probability * self.randomness
# Fehlerrate zwischen 10-20% halten für Realismus
base_error_prob = max(0.10, min(0.20, error_probability))
# Anpassen basierend auf Zufälligkeit (aber nicht unter 10%)
adjusted_error_prob = max(0.10, base_error_prob * (0.8 + self.randomness * 0.4))
result = ""
i = 0
while i < len(text):
char = text[i]
# Potentieller Tippfehler
if random.random() < adjusted_error_prob:
# Auswahl eines Fehlertyps:
# - Falsches Zeichen (Tastatur-Nachbarn)
# - Ausgelassenes Zeichen
# - Doppeltes Zeichen
# Auswahl eines Fehlertyps:
# - Falsches Zeichen (Tastatur-Nachbarn) - 50%
# - Transposition (Buchstaben vertauschen) - 15%
# - Ausgelassenes Zeichen - 15%
# - Doppeltes Zeichen - 20%
error_type = random.choices(
["wrong", "skip", "double"],
weights=[0.6, 0.2, 0.2],
["wrong", "transposition", "skip", "double"],
weights=[0.50, 0.15, 0.15, 0.20],
k=1
)[0]
if error_type == "wrong":
# Falsches Zeichen tippen (Tastatur-Nachbarn)
keyboard_neighbors = self.get_keyboard_neighbors(char)
@ -129,15 +186,18 @@ class HumanBehavior:
if on_char_typed:
on_char_typed(wrong_char)
self.sleep("typing_per_char")
# Pause bevor Fehler "bemerkt" wird
time.sleep(random.uniform(0.1, 0.4))
# Entscheiden, ob der Fehler korrigiert wird
if random.random() < correction_probability:
# Löschen des falschen Zeichens
result = result[:-1]
if on_char_typed:
on_char_typed("\b") # Backspace
self.sleep("typing_per_char", 1.5) # Längere Pause für Korrektur
self.sleep("typing_per_char", 1.8)
# Korrektes Zeichen tippen
result += char
if on_char_typed:
@ -149,35 +209,87 @@ class HumanBehavior:
if on_char_typed:
on_char_typed(char)
self.sleep("typing_per_char")
elif error_type == "transposition" and i < len(text) - 1:
# Buchstaben vertauschen (häufiger Tippfehler)
next_char = text[i + 1]
result += next_char + char # Vertauscht
if on_char_typed:
on_char_typed(next_char)
self.sleep("typing_per_char")
on_char_typed(char)
self.sleep("typing_per_char")
# Korrektur der Transposition
if random.random() < correction_probability:
time.sleep(random.uniform(0.2, 0.5)) # Bemerken des Fehlers
# Beide Zeichen löschen
result = result[:-2]
if on_char_typed:
on_char_typed("\b")
self.sleep("typing_per_char", 1.3)
on_char_typed("\b")
self.sleep("typing_per_char", 1.5)
# Korrekte Reihenfolge tippen
result += char + next_char
if on_char_typed:
on_char_typed(char)
self.sleep("typing_per_char")
on_char_typed(next_char)
self.sleep("typing_per_char")
i += 1 # Nächstes Zeichen überspringen (bereits verarbeitet)
elif error_type == "skip":
# Zeichen auslassen (nichts tun)
# In 50% der Fälle später bemerken und nachtippen
if random.random() < 0.5 and i < len(text) - 1:
# Nächstes Zeichen normal tippen
pass # Wird übersprungen
pass
elif error_type == "double":
# Zeichen doppelt tippen
result += char + char
if on_char_typed:
on_char_typed(char)
self.sleep("typing_per_char", 0.3) # Sehr kurz zwischen Doppel
on_char_typed(char)
self.sleep("typing_per_char")
# Pause bevor Fehler bemerkt wird
time.sleep(random.uniform(0.15, 0.35))
# Entscheiden, ob der Fehler korrigiert wird
if random.random() < correction_probability:
# Löschen des doppelten Zeichens
result = result[:-1]
if on_char_typed:
on_char_typed("\b") # Backspace
self.sleep("typing_per_char", 1.2)
self.sleep("typing_per_char", 1.3)
else:
# Normales Tippen ohne Fehler
result += char
if on_char_typed:
on_char_typed(char)
self.sleep("typing_per_char")
# Variable Tippgeschwindigkeit basierend auf Zeichen
if char in ' .,!?;:':
# Längere Pause nach Satzzeichen/Leerzeichen
self.sleep("typing_per_char", random.uniform(1.2, 1.8))
elif char.isupper():
# Leicht länger für Großbuchstaben (Shift-Taste)
self.sleep("typing_per_char", random.uniform(1.0, 1.3))
else:
self.sleep("typing_per_char", random.uniform(0.8, 1.2))
i += 1
# Gelegentliche längere Pause (Nachdenken)
if random.random() < 0.05:
time.sleep(random.uniform(0.3, 0.8))
return result
def get_keyboard_neighbors(self, char: str) -> List[str]:

Datei anzeigen

@ -1,195 +0,0 @@
"""
Modal System Test - Test-Funktionen für das Modal-System
"""
import logging
import time
from typing import Optional
from PyQt5.QtWidgets import QApplication, QMainWindow, QPushButton, QVBoxLayout, QWidget
from PyQt5.QtCore import QTimer
from utils.modal_manager import ModalManager
from views.widgets.progress_modal import ProgressModal
from views.widgets.account_creation_modal import AccountCreationModal
from views.widgets.login_process_modal import LoginProcessModal
logger = logging.getLogger("modal_test")
class ModalTestWindow(QMainWindow):
"""Test-Fenster für Modal-System Tests"""
def __init__(self):
super().__init__()
self.setWindowTitle("AccountForger Modal System Test")
self.setGeometry(100, 100, 600, 400)
# Modal Manager
self.modal_manager = ModalManager(parent_window=self)
# Test UI
self.setup_ui()
def setup_ui(self):
"""Erstellt Test-UI"""
central_widget = QWidget()
self.setCentralWidget(central_widget)
layout = QVBoxLayout(central_widget)
# Test Buttons
btn_account_creation = QPushButton("Test Account Creation Modal")
btn_account_creation.clicked.connect(self.test_account_creation_modal)
layout.addWidget(btn_account_creation)
btn_login_process = QPushButton("Test Login Process Modal")
btn_login_process.clicked.connect(self.test_login_process_modal)
layout.addWidget(btn_login_process)
btn_generic_modal = QPushButton("Test Generic Progress Modal")
btn_generic_modal.clicked.connect(self.test_generic_modal)
layout.addWidget(btn_generic_modal)
btn_error_modal = QPushButton("Test Error Modal")
btn_error_modal.clicked.connect(self.test_error_modal)
layout.addWidget(btn_error_modal)
btn_modal_manager = QPushButton("Test Modal Manager")
btn_modal_manager.clicked.connect(self.test_modal_manager)
layout.addWidget(btn_modal_manager)
def test_account_creation_modal(self):
"""Testet Account Creation Modal"""
logger.info("Testing Account Creation Modal")
modal = AccountCreationModal(parent=self, platform="Instagram")
# Steps setzen
steps = [
"Browser wird vorbereitet",
"Formular wird ausgefüllt",
"Account wird erstellt",
"E-Mail wird verifiziert"
]
modal.set_steps(steps)
# Modal anzeigen
modal.show_platform_specific_process()
# Simuliere Steps
QTimer.singleShot(1000, lambda: modal.start_step("Browser wird vorbereitet"))
QTimer.singleShot(2000, lambda: modal.complete_step("Browser wird vorbereitet", "Formular wird ausgefüllt"))
QTimer.singleShot(3000, lambda: modal.start_step("Formular wird ausgefüllt"))
QTimer.singleShot(4000, lambda: modal.complete_step("Formular wird ausgefüllt", "Account wird erstellt"))
QTimer.singleShot(5000, lambda: modal.start_step("Account wird erstellt"))
QTimer.singleShot(6000, lambda: modal.complete_step("Account wird erstellt", "E-Mail wird verifiziert"))
QTimer.singleShot(7000, lambda: modal.start_step("E-Mail wird verifiziert"))
QTimer.singleShot(8000, lambda: modal.show_success({"username": "test_user", "platform": "Instagram"}))
def test_login_process_modal(self):
"""Testet Login Process Modal"""
logger.info("Testing Login Process Modal")
modal = LoginProcessModal(parent=self, platform="TikTok")
# Session Login testen
modal.show_session_login("test_account", "TikTok")
# Simuliere Login-Prozess
QTimer.singleShot(1000, lambda: modal.update_login_progress("browser_init", "Browser wird gestartet"))
QTimer.singleShot(2000, lambda: modal.update_login_progress("session_restore", "Session wird wiederhergestellt"))
QTimer.singleShot(3000, lambda: modal.update_login_progress("verification", "Login wird geprüft"))
QTimer.singleShot(4000, lambda: modal.show_session_restored())
def test_generic_modal(self):
"""Testet Generic Progress Modal"""
logger.info("Testing Generic Progress Modal")
modal = ProgressModal(parent=self, modal_type="verification")
modal.show_process()
# Simuliere Updates
QTimer.singleShot(1000, lambda: modal.update_status("Verbindung wird hergestellt...", "Server wird kontaktiert"))
QTimer.singleShot(2000, lambda: modal.update_status("Daten werden verarbeitet...", "Bitte warten"))
QTimer.singleShot(3000, lambda: modal.update_status("✅ Vorgang abgeschlossen!", "Erfolgreich"))
QTimer.singleShot(4000, lambda: modal.hide_process())
def test_error_modal(self):
"""Testet Error Modal"""
logger.info("Testing Error Modal")
modal = ProgressModal(parent=self, modal_type="generic")
modal.show_process()
# Nach kurzer Zeit Fehler anzeigen
QTimer.singleShot(1500, lambda: modal.show_error("Netzwerkfehler aufgetreten", auto_close_seconds=3))
def test_modal_manager(self):
"""Testet Modal Manager"""
logger.info("Testing Modal Manager")
# Zeige Account Creation Modal über Manager
self.modal_manager.show_modal(
'account_creation',
title="🔄 Test Account wird erstellt",
status="Modal Manager Test läuft...",
detail="Über ModalManager aufgerufen"
)
# Simuliere Updates über Manager
QTimer.singleShot(1000, lambda: self.modal_manager.update_modal_status(
'account_creation',
"Browser wird initialisiert...",
"Schritt 1 von 3"
))
QTimer.singleShot(2000, lambda: self.modal_manager.update_modal_status(
'account_creation',
"Formular wird ausgefüllt...",
"Schritt 2 von 3"
))
QTimer.singleShot(3000, lambda: self.modal_manager.update_modal_status(
'account_creation',
"Account wird finalisiert...",
"Schritt 3 von 3"
))
QTimer.singleShot(4000, lambda: self.modal_manager.update_modal_status(
'account_creation',
"✅ Account erfolgreich erstellt!",
"Test abgeschlossen"
))
QTimer.singleShot(5000, lambda: self.modal_manager.hide_modal('account_creation'))
def run_modal_test():
"""Führt den Modal-Test aus"""
import sys
# QApplication erstellen falls nicht vorhanden
app = QApplication.instance()
if app is None:
app = QApplication(sys.argv)
# Test-Fenster erstellen
test_window = ModalTestWindow()
test_window.show()
# App ausführen
if hasattr(app, 'exec'):
return app.exec()
else:
return app.exec_()
if __name__ == "__main__":
# Logging konfigurieren
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Test ausführen
run_modal_test()

Datei anzeigen

@ -1,412 +0,0 @@
"""
Performance Monitor - Non-intrusive monitoring for race condition detection
Debug-only monitoring without production performance impact
"""
import time
import threading
import functools
import traceback
from typing import Dict, Any, Optional, Callable, List
from collections import defaultdict, deque
from datetime import datetime, timedelta
from dataclasses import dataclass, field
import logging
import json
import os
logger = logging.getLogger(__name__)
@dataclass
class OperationMetrics:
"""Metriken für eine einzelne Operation"""
operation_name: str
thread_id: int
thread_name: str
start_time: float
end_time: Optional[float] = None
duration: Optional[float] = None
success: bool = True
error_message: Optional[str] = None
metadata: Dict[str, Any] = field(default_factory=dict)
stack_trace: Optional[str] = None
def complete(self, success: bool = True, error_message: Optional[str] = None):
"""Markiert Operation als abgeschlossen"""
self.end_time = time.time()
self.duration = self.end_time - self.start_time
self.success = success
self.error_message = error_message
def to_dict(self) -> Dict[str, Any]:
"""Konvertiert zu Dictionary für Serialisierung"""
return {
'operation_name': self.operation_name,
'thread_id': self.thread_id,
'thread_name': self.thread_name,
'start_time': self.start_time,
'end_time': self.end_time,
'duration': self.duration,
'success': self.success,
'error_message': self.error_message,
'metadata': self.metadata,
'has_stack_trace': self.stack_trace is not None
}
class PerformanceMonitor:
"""
Performance-Monitor mit race condition detection
"""
def __init__(self, enabled: bool = None, max_history: int = 1000):
# Auto-detect based on debug settings oder environment
if enabled is None:
enabled = (
os.getenv('DEBUG_RACE_CONDITIONS', '').lower() in ['true', '1', 'yes'] or
os.getenv('PERFORMANCE_MONITORING', '').lower() in ['true', '1', 'yes']
)
self.enabled = enabled
self.max_history = max_history
# Monitoring data
self._operation_history: deque = deque(maxlen=max_history)
self._active_operations: Dict[str, OperationMetrics] = {}
self._operation_stats: Dict[str, Dict[str, Any]] = defaultdict(lambda: {
'total_calls': 0,
'successful_calls': 0,
'failed_calls': 0,
'total_duration': 0.0,
'min_duration': float('inf'),
'max_duration': 0.0,
'concurrent_executions': 0,
'max_concurrent': 0
})
# Thread safety
self._lock = threading.RLock()
# Race condition detection
self._potential_races: List[Dict[str, Any]] = []
self._long_operations: List[Dict[str, Any]] = []
# Thresholds
self.long_operation_threshold = 2.0 # seconds
self.race_detection_window = 0.1 # seconds
if self.enabled:
logger.info("Performance monitoring enabled")
def monitor_operation(self, operation_name: str, capture_stack: bool = False):
"""
Decorator für Operation-Monitoring
"""
def decorator(func: Callable) -> Callable:
if not self.enabled:
return func # No overhead when disabled
@functools.wraps(func)
def wrapper(*args, **kwargs):
return self._execute_monitored(
operation_name or func.__name__,
func,
capture_stack,
*args,
**kwargs
)
wrapper.original = func
wrapper.is_monitored = True
return wrapper
return decorator
def _execute_monitored(self, operation_name: str, func: Callable,
capture_stack: bool, *args, **kwargs) -> Any:
"""Führt eine überwachte Operation aus"""
if not self.enabled:
return func(*args, **kwargs)
thread_id = threading.current_thread().ident
thread_name = threading.current_thread().name
operation_key = f"{operation_name}_{thread_id}_{time.time()}"
# Metrics-Objekt erstellen
metrics = OperationMetrics(
operation_name=operation_name,
thread_id=thread_id,
thread_name=thread_name,
start_time=time.time(),
stack_trace=traceback.format_stack() if capture_stack else None
)
# Race condition detection
self._detect_potential_race(operation_name, metrics.start_time)
with self._lock:
# Concurrent execution tracking
concurrent_count = sum(
1 for op in self._active_operations.values()
if op.operation_name == operation_name
)
stats = self._operation_stats[operation_name]
stats['concurrent_executions'] = concurrent_count
stats['max_concurrent'] = max(stats['max_concurrent'], concurrent_count)
# Operation zu aktiven hinzufügen
self._active_operations[operation_key] = metrics
try:
# Operation ausführen
result = func(*args, **kwargs)
# Erfolg markieren
metrics.complete(success=True)
return result
except Exception as e:
# Fehler markieren
metrics.complete(success=False, error_message=str(e))
raise
finally:
# Cleanup und Statistik-Update
with self._lock:
self._active_operations.pop(operation_key, None)
self._update_statistics(metrics)
self._operation_history.append(metrics)
# Long operation detection
if metrics.duration and metrics.duration > self.long_operation_threshold:
self._record_long_operation(metrics)
def _detect_potential_race(self, operation_name: str, start_time: float):
"""Erkennt potentielle Race Conditions"""
if not self.enabled:
return
# Prüfe ob ähnliche Operationen zeitgleich laufen
concurrent_ops = []
with self._lock:
for op in self._active_operations.values():
if (op.operation_name == operation_name and
abs(op.start_time - start_time) < self.race_detection_window):
concurrent_ops.append(op)
if len(concurrent_ops) > 0:
race_info = {
'operation_name': operation_name,
'detected_at': start_time,
'concurrent_threads': [op.thread_id for op in concurrent_ops],
'time_window': self.race_detection_window,
'severity': 'high' if len(concurrent_ops) > 2 else 'medium'
}
self._potential_races.append(race_info)
logger.warning(f"Potential race condition detected: {operation_name} "
f"running on {len(concurrent_ops)} threads simultaneously")
def _record_long_operation(self, metrics: OperationMetrics):
"""Zeichnet lange Operationen auf"""
long_op_info = {
'operation_name': metrics.operation_name,
'duration': metrics.duration,
'thread_id': metrics.thread_id,
'start_time': metrics.start_time,
'success': metrics.success,
'metadata': metrics.metadata
}
self._long_operations.append(long_op_info)
logger.warning(f"Long operation detected: {metrics.operation_name} "
f"took {metrics.duration:.3f}s (threshold: {self.long_operation_threshold}s)")
def _update_statistics(self, metrics: OperationMetrics):
"""Aktualisiert Operation-Statistiken"""
stats = self._operation_stats[metrics.operation_name]
stats['total_calls'] += 1
if metrics.success:
stats['successful_calls'] += 1
else:
stats['failed_calls'] += 1
if metrics.duration:
stats['total_duration'] += metrics.duration
stats['min_duration'] = min(stats['min_duration'], metrics.duration)
stats['max_duration'] = max(stats['max_duration'], metrics.duration)
def get_statistics(self) -> Dict[str, Any]:
"""Gibt vollständige Monitoring-Statistiken zurück"""
if not self.enabled:
return {'monitoring_enabled': False}
with self._lock:
# Statistiken aufbereiten
processed_stats = {}
for op_name, stats in self._operation_stats.items():
processed_stats[op_name] = {
**stats,
'average_duration': (
stats['total_duration'] / stats['total_calls']
if stats['total_calls'] > 0 else 0
),
'success_rate': (
stats['successful_calls'] / stats['total_calls']
if stats['total_calls'] > 0 else 0
),
'min_duration': stats['min_duration'] if stats['min_duration'] != float('inf') else 0
}
return {
'monitoring_enabled': True,
'operation_statistics': processed_stats,
'race_conditions': {
'detected_count': len(self._potential_races),
'recent_races': self._potential_races[-10:], # Last 10
},
'long_operations': {
'detected_count': len(self._long_operations),
'threshold': self.long_operation_threshold,
'recent_long_ops': self._long_operations[-10:], # Last 10
},
'active_operations': len(self._active_operations),
'history_size': len(self._operation_history),
'thresholds': {
'long_operation_threshold': self.long_operation_threshold,
'race_detection_window': self.race_detection_window
}
}
def get_race_condition_report(self) -> Dict[str, Any]:
"""Gibt detaillierten Race Condition Report zurück"""
if not self.enabled:
return {'monitoring_enabled': False}
with self._lock:
# Gruppiere Race Conditions nach Operation
races_by_operation = defaultdict(list)
for race in self._potential_races:
races_by_operation[race['operation_name']].append(race)
# Analysiere Patterns
analysis = {}
for op_name, races in races_by_operation.items():
high_severity = sum(1 for r in races if r['severity'] == 'high')
analysis[op_name] = {
'total_races': len(races),
'high_severity_races': high_severity,
'affected_threads': len(set(
thread_id for race in races
for thread_id in race['concurrent_threads']
)),
'first_detected': min(r['detected_at'] for r in races),
'last_detected': max(r['detected_at'] for r in races),
'recommendation': self._get_race_recommendation(op_name, races)
}
return {
'monitoring_enabled': True,
'total_race_conditions': len(self._potential_races),
'affected_operations': len(races_by_operation),
'analysis_by_operation': analysis,
'raw_detections': self._potential_races
}
def _get_race_recommendation(self, operation_name: str, races: List[Dict]) -> str:
"""Gibt Empfehlungen für Race Condition Behebung"""
race_count = len(races)
high_severity_count = sum(1 for r in races if r['severity'] == 'high')
if high_severity_count > 5:
return f"CRITICAL: {operation_name} has {high_severity_count} high-severity race conditions. Implement ThreadSafetyMixin immediately."
elif race_count > 10:
return f"HIGH: {operation_name} frequently encounters race conditions. Consider adding thread synchronization."
elif race_count > 3:
return f"MEDIUM: {operation_name} occasionally has race conditions. Monitor and consider thread safety measures."
else:
return f"LOW: {operation_name} has minimal race condition risk."
def export_report(self, filename: Optional[str] = None) -> str:
"""Exportiert vollständigen Report als JSON"""
if not self.enabled:
return "Monitoring not enabled"
if filename is None:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"performance_report_{timestamp}.json"
report = {
'timestamp': datetime.now().isoformat(),
'statistics': self.get_statistics(),
'race_condition_report': self.get_race_condition_report(),
'operation_history': [op.to_dict() for op in list(self._operation_history)[-100:]] # Last 100
}
try:
with open(filename, 'w', encoding='utf-8') as f:
json.dump(report, f, indent=2, ensure_ascii=False)
logger.info(f"Performance report exported to: {filename}")
return filename
except Exception as e:
logger.error(f"Failed to export performance report: {e}")
return f"Export failed: {e}"
def reset_statistics(self):
"""Setzt alle Statistiken zurück"""
with self._lock:
self._operation_history.clear()
self._operation_stats.clear()
self._potential_races.clear()
self._long_operations.clear()
# Aktive Operationen nicht löschen - könnten noch laufen
if self.enabled:
logger.info("Performance monitoring statistics reset")
# Global Monitor Instance
_global_monitor: Optional[PerformanceMonitor] = None
_monitor_init_lock = threading.RLock()
def get_performance_monitor() -> PerformanceMonitor:
"""Holt die globale Monitor-Instanz (Singleton)"""
global _global_monitor
if _global_monitor is None:
with _monitor_init_lock:
if _global_monitor is None:
_global_monitor = PerformanceMonitor()
return _global_monitor
# Convenience Decorators
def monitor_if_enabled(operation_name: str = None, capture_stack: bool = False):
"""Convenience decorator für conditional monitoring"""
monitor = get_performance_monitor()
return monitor.monitor_operation(operation_name, capture_stack)
def monitor_race_conditions(operation_name: str = None):
"""Speziell für Race Condition Detection"""
return monitor_if_enabled(operation_name, capture_stack=True)
def monitor_fingerprint_operations(operation_name: str = None):
"""Speziell für Fingerprint-Operationen"""
return monitor_if_enabled(f"fingerprint_{operation_name}", capture_stack=False)
def monitor_session_operations(operation_name: str = None):
"""Speziell für Session-Operationen"""
return monitor_if_enabled(f"session_{operation_name}", capture_stack=False)

Datei anzeigen

@ -7,6 +7,12 @@ Dieser Guard verhindert:
- Mehrere Browser-Instanzen gleichzeitig
Clean Code & YAGNI: Nur das Nötigste, keine Über-Engineering.
WICHTIG - Korrekte Verwendung:
- start() → Prozess beginnt
- end(success=True/False) → Prozess endet normal (zählt für Failure-Tracking)
- release() → Prozess wird abgebrochen (zählt NICHT als Failure)
- Alle Methoden sind idempotent (mehrfacher Aufruf ist sicher)
"""
import json
@ -14,6 +20,7 @@ import logging
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, Tuple
import threading
logger = logging.getLogger(__name__)
@ -26,6 +33,9 @@ class ProcessGuard:
- Process Lock Management (nur ein Prozess gleichzeitig)
- Fehler-Tracking (Zwangspause nach 3 Fehlern)
- Persistierung der Pause-Zeit über Neustarts
Thread-Safety:
- Alle öffentlichen Methoden sind thread-safe durch Lock
"""
# Konfiguration
@ -35,11 +45,15 @@ class ProcessGuard:
def __init__(self):
"""Initialisiert den Process Guard."""
# Thread-Safety Lock
self._thread_lock = threading.Lock()
# Process Lock
self._is_locked = False
self._current_process = None
self._current_platform = None
self._lock_started_at = None # Timestamp für Auto-Timeout
self._lock_id = None # Eindeutige ID für jeden Lock
# Error Tracking
self._failure_count = 0
@ -48,6 +62,9 @@ class ProcessGuard:
# Config File
self._config_file = Path("config/.process_guard")
# Counter für Lock-IDs
self._lock_counter = 0
def can_start(self, process_type: str, platform: str) -> Tuple[bool, Optional[str]]:
"""
Prüft ob ein Prozess gestartet werden darf.
@ -61,98 +78,172 @@ class ProcessGuard:
- (True, None) wenn erlaubt
- (False, "Fehlermeldung") wenn blockiert
"""
# 1. Prüfe Zwangspause
if self._is_paused():
remaining_min = self._get_pause_remaining_minutes()
error_msg = (
f"⏸ Zwangspause aktiv\n\n"
f"Nach 3 fehlgeschlagenen Versuchen ist eine Pause erforderlich.\n"
f"Verbleibende Zeit: {remaining_min} Minuten\n\n"
f"Empfehlung:\n"
f"• Proxy-Einstellungen prüfen\n"
f"Internetverbindung prüfen\n"
f"Plattform-Status überprüfen"
)
return False, error_msg
with self._thread_lock:
# 1. Prüfe Zwangspause
if self._is_paused():
remaining_min = self._get_pause_remaining_minutes()
error_msg = (
f"⏸ Zwangspause aktiv\n\n"
f"Nach 3 fehlgeschlagenen Versuchen ist eine Pause erforderlich.\n"
f"Verbleibende Zeit: {remaining_min} Minuten\n\n"
f"Empfehlung:\n"
f"Proxy-Einstellungen prüfen\n"
f"Internetverbindung prüfen\n"
f"• Plattform-Status überprüfen"
)
return False, error_msg
# 2. Prüfe Process Lock
if self._is_locked:
error_msg = (
f"⚠ Prozess läuft bereits\n\n"
f"Aktuell aktiv: {self._current_process} ({self._current_platform})\n\n"
f"Bitte warten Sie bis der aktuelle Vorgang abgeschlossen ist."
)
return False, error_msg
# 2. Prüfe Process Lock (mit Auto-Timeout-Check)
if self._is_locked_with_timeout_check():
error_msg = (
f"⚠ Prozess läuft bereits\n\n"
f"Aktuell aktiv: {self._current_process} ({self._current_platform})\n\n"
f"Bitte warten Sie bis der aktuelle Vorgang abgeschlossen ist."
)
return False, error_msg
return True, None
return True, None
def start(self, process_type: str, platform: str):
def start(self, process_type: str, platform: str) -> int:
"""
Startet einen Prozess (setzt den Lock).
Args:
process_type: Art des Prozesses
platform: Plattform
"""
self._is_locked = True
self._current_process = process_type
self._current_platform = platform
self._lock_started_at = datetime.now() # Timestamp für Auto-Timeout
logger.info(f"Process locked: {process_type} ({platform})")
def end(self, success: bool):
Returns:
int: Lock-ID für diesen Prozess (für spätere Freigabe)
"""
with self._thread_lock:
self._lock_counter += 1
self._lock_id = self._lock_counter
self._is_locked = True
self._current_process = process_type
self._current_platform = platform
self._lock_started_at = datetime.now()
logger.info(f"Process locked [ID={self._lock_id}]: {process_type} ({platform})")
return self._lock_id
def end(self, success: bool) -> bool:
"""
Beendet einen Prozess (gibt den Lock frei).
Diese Methode ist IDEMPOTENT - mehrfacher Aufruf ist sicher.
Der Failure-Counter wird nur erhöht wenn der Lock aktiv war.
Args:
success: War der Prozess erfolgreich?
Returns:
bool: True wenn Lock freigegeben wurde, False wenn kein Lock aktiv war
"""
# Lock freigeben
process_info = f"{self._current_process} ({self._current_platform})"
self._is_locked = False
self._current_process = None
self._current_platform = None
self._lock_started_at = None # Timestamp zurücksetzen
with self._thread_lock:
# IDEMPOTENZ: Prüfe ob Lock überhaupt aktiv ist
if not self._is_locked:
logger.debug("end() aufgerufen, aber kein Lock aktiv - ignoriere")
return False
# Fehler-Tracking
if success:
if self._failure_count > 0:
logger.info(f"Fehler-Counter zurückgesetzt nach Erfolg (war: {self._failure_count})")
self._failure_count = 0
self._save_pause_state()
else:
self._failure_count += 1
logger.warning(f"Fehlschlag #{self._failure_count} bei {process_info}")
# Lock-Info für Logging speichern
process_info = f"{self._current_process} ({self._current_platform})"
lock_id = self._lock_id
if self._failure_count >= self.MAX_FAILURES:
self._activate_pause()
# Lock freigeben
self._is_locked = False
self._current_process = None
self._current_platform = None
self._lock_started_at = None
self._lock_id = None
logger.info(f"Process unlocked: {process_info} (success={success})")
# Fehler-Tracking (nur wenn Lock aktiv war)
if success:
if self._failure_count > 0:
logger.info(f"Fehler-Counter zurückgesetzt nach Erfolg (war: {self._failure_count})")
self._failure_count = 0
self._save_pause_state()
else:
self._failure_count += 1
logger.warning(f"Fehlschlag #{self._failure_count} bei {process_info}")
if self._failure_count >= self.MAX_FAILURES:
self._activate_pause()
logger.info(f"Process unlocked [ID={lock_id}]: {process_info} (success={success})")
return True
def release(self) -> bool:
"""
Gibt den Lock frei OHNE den Failure-Counter zu beeinflussen.
Verwendung:
- User-Abbruch (Cancel-Button)
- Validierungsfehler VOR Prozessstart
- Cleanup bei App-Schließung
Diese Methode ist IDEMPOTENT - mehrfacher Aufruf ist sicher.
Returns:
bool: True wenn Lock freigegeben wurde, False wenn kein Lock aktiv war
"""
with self._thread_lock:
# IDEMPOTENZ: Prüfe ob Lock überhaupt aktiv ist
if not self._is_locked:
logger.debug("release() aufgerufen, aber kein Lock aktiv - ignoriere")
return False
# Lock-Info für Logging speichern
process_info = f"{self._current_process} ({self._current_platform})"
lock_id = self._lock_id
# Lock freigeben (OHNE Failure-Tracking)
self._is_locked = False
self._current_process = None
self._current_platform = None
self._lock_started_at = None
self._lock_id = None
logger.info(f"Process released [ID={lock_id}]: {process_info} (kein Failure gezählt)")
return True
def reset(self):
"""
Reset beim App-Start.
Lädt Pause-State, resettet aber Lock (da Lock nicht über Neustarts persistiert).
"""
self._is_locked = False
self._current_process = None
self._current_platform = None
self._lock_started_at = None # Timestamp zurücksetzen
self._load_pause_state()
with self._thread_lock:
self._is_locked = False
self._current_process = None
self._current_platform = None
self._lock_started_at = None
self._lock_id = None
self._load_pause_state()
if self._is_paused():
remaining = self._get_pause_remaining_minutes()
logger.warning(f"Zwangspause aktiv: noch {remaining} Minuten")
if self._is_paused():
remaining = self._get_pause_remaining_minutes()
logger.warning(f"Zwangspause aktiv: noch {remaining} Minuten")
logger.info("Process Guard initialisiert")
logger.info("Process Guard initialisiert")
def is_locked(self) -> bool:
"""
Gibt zurück ob aktuell ein Prozess läuft (mit Auto-Timeout-Check).
Thread-safe Methode.
Returns:
True wenn ein Prozess aktiv ist
"""
with self._thread_lock:
return self._is_locked_with_timeout_check()
def _is_locked_with_timeout_check(self) -> bool:
"""
Interne Methode: Prüft Lock-Status mit Auto-Timeout.
MUSS innerhalb eines _thread_lock aufgerufen werden!
Returns:
True wenn Lock aktiv ist
"""
if not self._is_locked:
return False
@ -162,14 +253,15 @@ class ProcessGuard:
if elapsed_minutes > self.LOCK_TIMEOUT_MINUTES:
logger.warning(
f"⏰ AUTO-TIMEOUT: Lock nach {int(elapsed_minutes)} Minuten freigegeben. "
f"⏰ AUTO-TIMEOUT: Lock [ID={self._lock_id}] nach {int(elapsed_minutes)} Minuten freigegeben. "
f"Prozess: {self._current_process} ({self._current_platform})"
)
# Lock automatisch freigeben
# Lock automatisch freigeben (OHNE Failure-Zählung - Timeout ist kein User-Fehler)
self._is_locked = False
self._current_process = None
self._current_platform = None
self._lock_started_at = None
self._lock_id = None
return False
return True
@ -178,26 +270,44 @@ class ProcessGuard:
"""
Gibt zurück ob Zwangspause aktiv ist.
Thread-safe Methode.
Returns:
True wenn Pause aktiv ist
"""
return self._is_paused()
with self._thread_lock:
return self._is_paused()
def get_status_message(self) -> Optional[str]:
"""
Gibt Status-Nachricht zurück wenn blockiert.
Thread-safe Methode.
Returns:
None wenn nicht blockiert, sonst Nachricht
"""
if self._is_paused():
remaining = self._get_pause_remaining_minutes()
return f"Zwangspause aktiv (noch {remaining} Min)"
with self._thread_lock:
if self._is_paused():
remaining = self._get_pause_remaining_minutes()
return f"Zwangspause aktiv (noch {remaining} Min)"
if self._is_locked:
return f"'{self._current_process}' läuft"
if self._is_locked:
return f"'{self._current_process}' läuft"
return None
return None
def get_failure_count(self) -> int:
"""
Gibt den aktuellen Failure-Counter zurück.
Thread-safe Methode.
Returns:
int: Anzahl der Fehlschläge seit letztem Erfolg
"""
with self._thread_lock:
return self._failure_count
# Private Methoden
@ -273,18 +383,24 @@ class ProcessGuard:
logger.error(f"Fehler beim Laden des Pause-State: {e}")
# Globale Instanz (YAGNI: Kein komplexes Singleton-Pattern nötig)
# Globale Instanz mit Thread-Safety
_guard_instance = None
_guard_instance_lock = threading.Lock()
def get_guard() -> ProcessGuard:
"""
Gibt die globale ProcessGuard-Instanz zurück.
Thread-safe Singleton-Pattern.
Returns:
ProcessGuard: Die globale Guard-Instanz
"""
global _guard_instance
if _guard_instance is None:
_guard_instance = ProcessGuard()
with _guard_instance_lock:
# Double-check locking
if _guard_instance is None:
_guard_instance = ProcessGuard()
return _guard_instance

Datei anzeigen

@ -206,8 +206,8 @@ class ProfileExportService:
spaceBefore=5*mm
)
# IntelSight Logo versuchen zu laden
logo_path = Path("resources/icons/intelsight-logo.svg")
# AegisSight Logo versuchen zu laden
logo_path = Path("resources/icons/aegissight-logo.svg")
if logo_path.exists():
try:
# SVG zu reportlab Image (mit svglib falls verfügbar)

Datei anzeigen

@ -383,19 +383,19 @@ class ProxyRotator:
def format_proxy_for_playwright(self, proxy: str) -> Dict[str, str]:
"""
Formatiert einen Proxy-String für die Verwendung mit Playwright.
Args:
proxy: Proxy-String im Format host:port:username:password
Returns:
Dictionary mit Playwright-Proxy-Konfiguration
"""
parts = proxy.split(":")
if len(parts) >= 4:
# Format: host:port:username:password
host, port, username, password = parts[:4]
return {
"server": f"{host}:{port}",
"username": username,
@ -404,10 +404,110 @@ class ProxyRotator:
elif len(parts) >= 2:
# Format: host:port
host, port = parts[:2]
return {
"server": f"{host}:{port}"
}
else:
logger.warning(f"Ungültiges Proxy-Format: {self.mask_proxy_credentials(proxy)}")
return {}
return {}
# ==========================================================================
# ANTI-DETECTION: Erzwungene Proxy-Rotation für Account-Registrierung
# ==========================================================================
def force_rotation(self, proxy_type: str = None) -> Optional[Dict[str, str]]:
"""
Erzwingt eine sofortige Proxy-Rotation.
Diese Methode sollte VOR jeder neuen Account-Registrierung aufgerufen
werden, um sicherzustellen, dass ein frischer Proxy verwendet wird.
Dies verhindert, dass mehrere Accounts von derselben IP erstellt werden.
Args:
proxy_type: Proxy-Typ ("ipv4", "ipv6", "mobile") oder None für zufällig
Returns:
Neue Proxy-Konfiguration (Dict) oder None wenn kein Proxy verfügbar
"""
logger.info("ERZWINGE Proxy-Rotation für neue Registrierung")
# Vorherigen Proxy vergessen
old_proxy = self.current_proxy
self.current_proxy = None
self.last_rotation_time = 0
# Neuen Proxy holen
new_proxy = self.get_proxy(proxy_type)
if new_proxy:
self.current_proxy = new_proxy.get('server', '')
self.last_rotation_time = time.time()
# Log mit maskierten Credentials
masked_server = self.mask_proxy_credentials(self.current_proxy)
logger.info(f"Proxy rotiert: {masked_server}")
if old_proxy:
logger.debug(f"Vorheriger Proxy: {self.mask_proxy_credentials(old_proxy)}")
else:
logger.warning("Kein Proxy verfügbar für erzwungene Rotation")
return new_proxy
def get_proxy_for_registration(self, proxy_type: str = None,
force_new: bool = True) -> Optional[Dict[str, str]]:
"""
Holt einen Proxy speziell für Account-Registrierung.
Diese Methode ist ein Wrapper um force_rotation() mit zusätzlicher
Logik für Registrierungen.
Args:
proxy_type: Gewünschter Proxy-Typ oder None für zufällig
force_new: Ob ein neuer Proxy erzwungen werden soll (Standard: True)
Returns:
Proxy-Konfiguration für Playwright oder None
"""
if force_new:
proxy_config = self.force_rotation(proxy_type)
else:
proxy_config = self.get_proxy(proxy_type)
if not proxy_config:
logger.warning("Kein Proxy für Registrierung verfügbar - Registrierung ohne Proxy")
return None
logger.info(f"Proxy für Registrierung bereit: {self.mask_proxy_credentials(proxy_config.get('server', ''))}")
return proxy_config
def should_rotate_for_registration(self) -> bool:
"""
Prüft, ob eine Proxy-Rotation vor der nächsten Registrierung empfohlen wird.
Returns:
True wenn Rotation empfohlen, False sonst
"""
# Immer True - jede Registrierung sollte einen neuen Proxy verwenden
# Dies ist die sicherste Anti-Detection-Strategie
return True
def get_rotation_stats(self) -> Dict[str, Any]:
"""
Gibt Statistiken über Proxy-Rotationen zurück.
Returns:
Dictionary mit Rotations-Statistiken
"""
return {
"current_proxy": self.mask_proxy_credentials(self.current_proxy) if self.current_proxy else None,
"last_rotation_time": self.last_rotation_time,
"time_since_last_rotation": time.time() - self.last_rotation_time if self.last_rotation_time > 0 else None,
"rotation_interval": self.config.get("rotation_interval", 300),
"available_proxies": {
"ipv4": len(self.config.get("ipv4", [])),
"ipv6": len(self.config.get("ipv6", [])),
"mobile": len(self.config.get("mobile", []))
}
}

351
utils/rate_limit_handler.py Normale Datei
Datei anzeigen

@ -0,0 +1,351 @@
"""
Rate Limit Handler für HTTP 429 und ähnliche Fehler.
Dieses Modul implementiert exponentielles Backoff für Rate-Limiting,
um automatisch auf zu viele Anfragen zu reagieren und Sperren zu vermeiden.
"""
import logging
import time
import random
from typing import Callable, Any, Optional, List
logger = logging.getLogger("rate_limit_handler")
class RateLimitHandler:
"""
Behandelt Rate-Limits mit exponentiellem Backoff.
Diese Klasse implementiert eine robuste Strategie zum Umgang mit
Rate-Limiting durch soziale Netzwerke. Bei Erkennung eines Rate-Limits
wird exponentiell länger gewartet, um Sperren zu vermeiden.
Beispiel:
handler = RateLimitHandler()
# Option 1: Manuelles Handling
if rate_limit_detected:
handler.handle_rate_limit()
# Option 2: Automatisches Retry
result = handler.execute_with_backoff(my_function, arg1, arg2)
"""
# Bekannte Rate-Limit-Indikatoren
RATE_LIMIT_INDICATORS = [
# HTTP Status Codes
'429',
'rate limit',
'rate_limit',
'ratelimit',
# Englische Meldungen
'too many requests',
'too many attempts',
'slow down',
'try again later',
'temporarily blocked',
'please wait',
'request blocked',
# Deutsche Meldungen
'zu viele anfragen',
'zu viele versuche',
'später erneut versuchen',
'vorübergehend gesperrt',
'bitte warten',
# Plattform-spezifische Meldungen
'challenge_required', # Instagram
'checkpoint_required', # Instagram/Facebook
'feedback_required', # Instagram
'spam', # Generisch
'suspicious activity', # Generisch
'unusual activity', # Generisch
]
def __init__(self,
initial_delay: float = 60.0,
max_delay: float = 600.0,
backoff_multiplier: float = 2.0,
max_retries: int = 5,
jitter_factor: float = 0.2):
"""
Initialisiert den Rate-Limit-Handler.
Args:
initial_delay: Anfängliche Wartezeit in Sekunden (Standard: 60s = 1 Minute)
max_delay: Maximale Wartezeit in Sekunden (Standard: 600s = 10 Minuten)
backoff_multiplier: Multiplikator für exponentielles Backoff (Standard: 2.0)
max_retries: Maximale Anzahl an Wiederholungsversuchen (Standard: 5)
jitter_factor: Faktor für zufällige Variation (Standard: 0.2 = ±20%)
"""
self.initial_delay = initial_delay
self.max_delay = max_delay
self.backoff_multiplier = backoff_multiplier
self.max_retries = max_retries
self.jitter_factor = jitter_factor
# Status-Tracking
self.current_retry = 0
self.last_rate_limit_time = 0
self.total_rate_limits = 0
self.consecutive_successes = 0
def is_rate_limited(self, response_text: str) -> bool:
"""
Prüft, ob eine Antwort auf ein Rate-Limit hindeutet.
Args:
response_text: Text der Antwort (z.B. Seiteninhalt, Fehlermeldung)
Returns:
True wenn Rate-Limit erkannt wurde, sonst False
"""
if not response_text:
return False
response_lower = response_text.lower()
for indicator in self.RATE_LIMIT_INDICATORS:
if indicator in response_lower:
logger.warning(f"Rate-Limit Indikator gefunden: '{indicator}'")
return True
return False
def calculate_delay(self, retry_count: int = None) -> float:
"""
Berechnet die Backoff-Verzögerung.
Args:
retry_count: Aktueller Wiederholungsversuch (optional)
Returns:
Verzögerung in Sekunden
"""
if retry_count is None:
retry_count = self.current_retry
# Exponentielles Backoff berechnen
delay = self.initial_delay * (self.backoff_multiplier ** retry_count)
# Jitter hinzufügen (zufällige Variation)
jitter = delay * random.uniform(-self.jitter_factor, self.jitter_factor)
delay = delay + jitter
# Maximum nicht überschreiten
delay = min(delay, self.max_delay)
return delay
def handle_rate_limit(self, retry_count: int = None,
on_waiting: Optional[Callable[[float, int], None]] = None) -> float:
"""
Behandelt ein erkanntes Rate-Limit mit Backoff.
Args:
retry_count: Aktueller Wiederholungsversuch
on_waiting: Optionaler Callback während des Wartens (delay, retry)
Returns:
Tatsächlich gewartete Zeit in Sekunden
"""
if retry_count is None:
retry_count = self.current_retry
delay = self.calculate_delay(retry_count)
logger.warning(
f"Rate-Limit erkannt! Warte {delay:.1f}s "
f"(Versuch {retry_count + 1}/{self.max_retries})"
)
# Callback aufrufen falls vorhanden
if on_waiting:
on_waiting(delay, retry_count + 1)
# Warten
time.sleep(delay)
# Status aktualisieren
self.current_retry = retry_count + 1
self.last_rate_limit_time = time.time()
self.total_rate_limits += 1
self.consecutive_successes = 0
return delay
def execute_with_backoff(self, func: Callable, *args,
on_retry: Optional[Callable[[int, Exception], None]] = None,
**kwargs) -> Any:
"""
Führt eine Funktion mit automatischem Backoff bei Rate-Limits aus.
Args:
func: Auszuführende Funktion
*args: Positionsargumente für die Funktion
on_retry: Optionaler Callback bei Retry (retry_count, exception)
**kwargs: Keyword-Argumente für die Funktion
Returns:
Rückgabewert der Funktion oder None bei Fehler
Raises:
Exception: Wenn max_retries erreicht oder nicht-Rate-Limit-Fehler
"""
last_exception = None
for attempt in range(self.max_retries):
try:
result = func(*args, **kwargs)
# Erfolg - Reset Retry-Zähler
self.current_retry = 0
self.consecutive_successes += 1
# Nach mehreren Erfolgen: Backoff-Zähler langsam reduzieren
if self.consecutive_successes >= 3:
self.total_rate_limits = max(0, self.total_rate_limits - 1)
return result
except Exception as e:
last_exception = e
error_str = str(e).lower()
# Prüfe auf Rate-Limit-Indikatoren
is_rate_limit = any(
indicator in error_str
for indicator in self.RATE_LIMIT_INDICATORS
)
if is_rate_limit:
logger.warning(f"Rate-Limit Exception erkannt: {e}")
if on_retry:
on_retry(attempt, e)
self.handle_rate_limit(attempt)
else:
# Anderer Fehler - nicht durch Backoff lösbar
logger.error(f"Nicht-Rate-Limit Fehler: {e}")
raise
# Maximum erreicht
logger.error(
f"Maximale Wiederholungsversuche ({self.max_retries}) erreicht. "
f"Letzter Fehler: {last_exception}"
)
return None
def should_slow_down(self) -> bool:
"""
Prüft, ob die Geschwindigkeit reduziert werden sollte.
Basierend auf der Anzahl der kürzlichen Rate-Limits wird empfohlen,
ob zusätzliche Verzögerungen eingebaut werden sollten.
Returns:
True wenn Verlangsamung empfohlen, sonst False
"""
# Wenn kürzlich (< 5 min) ein Rate-Limit war
time_since_last = time.time() - self.last_rate_limit_time
if time_since_last < 300 and self.last_rate_limit_time > 0:
return True
# Wenn viele Rate-Limits insgesamt
if self.total_rate_limits >= 3:
return True
return False
def get_recommended_delay(self) -> float:
"""
Gibt eine empfohlene zusätzliche Verzögerung zurück.
Basierend auf dem aktuellen Status wird eine Verzögerung empfohlen,
die zwischen Aktionen eingefügt werden sollte.
Returns:
Empfohlene Verzögerung in Sekunden
"""
if not self.should_slow_down():
return 0.0
# Basis-Verzögerung basierend auf Anzahl der Rate-Limits
base_delay = 5.0 * self.total_rate_limits
# Zusätzliche Verzögerung wenn kürzlich Rate-Limit war
time_since_last = time.time() - self.last_rate_limit_time
if time_since_last < 300:
# Je kürzer her, desto länger warten
recency_factor = 1.0 - (time_since_last / 300)
base_delay += 10.0 * recency_factor
return min(base_delay, 30.0) # Maximum 30 Sekunden
def reset(self):
"""Setzt den Handler auf Anfangszustand zurück."""
self.current_retry = 0
self.last_rate_limit_time = 0
self.total_rate_limits = 0
self.consecutive_successes = 0
logger.info("Rate-Limit Handler zurückgesetzt")
def get_status(self) -> dict:
"""
Gibt den aktuellen Status des Handlers zurück.
Returns:
Dictionary mit Status-Informationen
"""
return {
"current_retry": self.current_retry,
"total_rate_limits": self.total_rate_limits,
"consecutive_successes": self.consecutive_successes,
"last_rate_limit_time": self.last_rate_limit_time,
"should_slow_down": self.should_slow_down(),
"recommended_delay": self.get_recommended_delay(),
}
# Globale Instanz für einfache Verwendung
_default_handler: Optional[RateLimitHandler] = None
def get_default_handler() -> RateLimitHandler:
"""
Gibt die globale Standard-Instanz des Rate-Limit-Handlers zurück.
Returns:
RateLimitHandler-Instanz
"""
global _default_handler
if _default_handler is None:
_default_handler = RateLimitHandler()
return _default_handler
def handle_rate_limit(retry_count: int = None) -> float:
"""
Convenience-Funktion für Rate-Limit-Handling mit Standard-Handler.
Args:
retry_count: Aktueller Wiederholungsversuch
Returns:
Gewartete Zeit in Sekunden
"""
return get_default_handler().handle_rate_limit(retry_count)
def is_rate_limited(response_text: str) -> bool:
"""
Convenience-Funktion für Rate-Limit-Erkennung.
Args:
response_text: Zu prüfender Text
Returns:
True wenn Rate-Limit erkannt
"""
return get_default_handler().is_rate_limited(response_text)

Datei anzeigen

@ -191,9 +191,9 @@ class ThemeManager(QObject):
return os.path.join(self.base_dir, "resources", "icons", f"{icon_name}.svg")
# Logo is theme-specific
if icon_name == "intelsight-logo":
if icon_name == "aegissight-logo":
theme = ThemeConfig.get_theme(self.current_theme)
logo_name = theme.get('logo_path', 'intelsight-logo.svg').replace('.svg', '')
logo_name = theme.get('logo_path', 'aegissight-logo.svg').replace('.svg', '')
return os.path.join(self.base_dir, "resources", "icons", f"{logo_name}.svg")
# For other icons

Datei anzeigen

@ -295,21 +295,22 @@ class UsernameGenerator:
Generierter Benutzername
"""
# Verschiedene Muster für zufällige Benutzernamen
# ANTI-DETECTION: Keine verdächtigen Patterns wie "user" + Zahlen
patterns = [
# Adjektiv + Substantiv
# Adjektiv + Substantiv (z.B. "happytiger")
lambda: random.choice(self.adjectives) + random.choice(self.nouns),
# Substantiv + Zahlen
lambda: random.choice(self.nouns) + "".join(random.choices(string.digits, k=random.randint(1, 4))),
# Adjektiv + Substantiv + Zahlen
lambda: random.choice(self.adjectives) + random.choice(self.nouns) + "".join(random.choices(string.digits, k=random.randint(1, 3))),
# Substantiv + Unterstrich + Substantiv
lambda: random.choice(self.nouns) + ("_" if "_" in policy["allowed_chars"] else "") + random.choice(self.nouns),
# Benutzer + Zahlen
lambda: "user" + "".join(random.choices(string.digits, k=random.randint(3, 6)))
# Substantiv + Jahr (z.B. "eagle1995")
lambda: random.choice(self.nouns) + str(random.randint(1985, 2005)),
# Adjektiv + Substantiv + 2 Ziffern (z.B. "coolwolf42")
lambda: random.choice(self.adjectives) + random.choice(self.nouns) + str(random.randint(10, 99)),
# Substantiv + Unterstrich + Adjektiv (z.B. "tiger_happy")
lambda: random.choice(self.nouns) + ("_" if "_" in policy["allowed_chars"] else "") + random.choice(self.adjectives),
# Adjektiv + Substantiv mit Punkt (z.B. "happy.tiger") - falls erlaubt
lambda: random.choice(self.adjectives) + ("." if "." in policy["allowed_chars"] else "") + random.choice(self.nouns),
]
# Zufälliges Muster auswählen und Benutzernamen generieren
@ -417,49 +418,221 @@ class UsernameGenerator:
policy: Optional[Dict[str, Any]] = None) -> Tuple[bool, str]:
"""
Überprüft, ob ein Benutzername den Richtlinien entspricht.
Args:
username: Zu überprüfender Benutzername
platform: Name der Plattform
policy: Optionale Richtlinie (sonst wird die der Plattform verwendet)
Returns:
(Gültigkeit, Fehlermeldung)
"""
# Richtlinie bestimmen
if not policy:
policy = self.get_platform_policy(platform)
# Länge prüfen
if len(username) < policy["min_length"]:
return False, f"Benutzername ist zu kurz (mindestens {policy['min_length']} Zeichen erforderlich)"
if len(username) > policy["max_length"]:
return False, f"Benutzername ist zu lang (maximal {policy['max_length']} Zeichen erlaubt)"
# Erlaubte Zeichen prüfen
for char in username:
if char not in policy["allowed_chars"]:
return False, f"Unerlaubtes Zeichen: '{char}'"
# Anfangszeichen prüfen
if username[0] not in policy["allowed_start_chars"]:
return False, f"Benutzername darf nicht mit '{username[0]}' beginnen"
# Endzeichen prüfen
if username[-1] not in policy["allowed_end_chars"]:
return False, f"Benutzername darf nicht mit '{username[-1]}' enden"
# Aufeinanderfolgende Sonderzeichen prüfen
if not policy["allowed_consecutive_special"]:
special_chars = set(policy["allowed_chars"]) - set(string.ascii_letters + string.digits)
for i in range(len(username) - 1):
if username[i] in special_chars and username[i+1] in special_chars:
return False, "Keine aufeinanderfolgenden Sonderzeichen erlaubt"
# Disallowed words
for word in policy["disallowed_words"]:
if word.lower() in username.lower():
return False, f"Der Benutzername darf '{word}' nicht enthalten"
return True, "Benutzername ist gültig"
# ANTI-DETECTION: Prüfe auf verdächtige Bot-Patterns
if self._has_suspicious_pattern(username):
return False, "Benutzername enthält verdächtiges Bot-Pattern"
return True, "Benutzername ist gültig"
# ==========================================================================
# ANTI-DETECTION: Verdächtige Pattern-Erkennung
# ==========================================================================
def _has_suspicious_pattern(self, username: str) -> bool:
"""
Prüft, ob ein Benutzername verdächtige Bot-Patterns enthält.
Diese Methode erkennt Benutzernamen-Muster, die häufig von Bots
verwendet werden und daher von Plattformen leicht erkannt werden.
Args:
username: Zu prüfender Benutzername
Returns:
True wenn verdächtig, False wenn ok
"""
username_lower = username.lower()
# Liste verdächtiger Patterns (Regex)
suspicious_patterns = [
# Plattform-spezifische Bot-Prefixe
r'^fb_', # Facebook Bot-Pattern
r'^ig_', # Instagram Bot-Pattern
r'^tw_', # Twitter Bot-Pattern
r'^tt_', # TikTok Bot-Pattern
# Offensichtliche Bot/Test-Prefixe
r'^bot_', # Offensichtlicher Bot
r'^test_', # Test-Account
r'^temp_', # Temporär
r'^fake_', # Offensichtlich fake
r'^new_', # Neu (suspekt)
r'^auto_', # Automatisierung
# Notfall/Backup-Patterns (aus altem Code)
r'_emergency_', # Notfall-Pattern
r'_backup_', # Backup-Pattern
r'^emergency_', # Emergency am Anfang
r'^backup_', # Backup am Anfang
# Generische Bot-Patterns
r'^user\d{4,}$', # user + 4+ Ziffern am Ende (z.B. user12345)
r'^account\d+', # account + Zahlen
r'^profile\d+', # profile + Zahlen
# Verdächtige Zahlenfolgen
r'\d{8,}', # 8+ aufeinanderfolgende Ziffern
r'^[a-z]{1,2}\d{6,}$', # 1-2 Buchstaben + 6+ Ziffern
# Timestamp-basierte Patterns
r'\d{10,}', # Unix-Timestamp-ähnlich (10+ Ziffern)
r'_\d{13}_', # Millisekunden-Timestamp in der Mitte
# Generische Suffixe die auf Bots hindeuten
r'_gen$', # Generator-Suffix
r'_bot$', # Bot-Suffix
r'_auto$', # Auto-Suffix
r'_spam$', # Spam-Suffix
]
for pattern in suspicious_patterns:
if re.search(pattern, username_lower):
logger.debug(f"Verdächtiges Pattern gefunden: {pattern} in '{username}'")
return True
# Zusätzliche Heuristiken
# Prüfe auf zu viele Unterstriche (>2 ist verdächtig)
if username_lower.count('_') > 2:
logger.debug(f"Zu viele Unterstriche in '{username}'")
return True
# Prüfe auf repetitive Zeichen (z.B. "aaaa" oder "1111")
for i in range(len(username_lower) - 3):
if username_lower[i] == username_lower[i+1] == username_lower[i+2] == username_lower[i+3]:
logger.debug(f"Repetitive Zeichen in '{username}'")
return True
return False
def generate_realistic_username(self, first_name: str = "", last_name: str = "",
platform: str = "default") -> str:
"""
Generiert einen realistischen Benutzernamen ohne verdächtige Patterns.
Diese Methode ist speziell für Anti-Detection optimiert und generiert
Benutzernamen, die wie echte menschliche Benutzernamen aussehen.
Args:
first_name: Vorname (optional)
last_name: Nachname (optional)
platform: Zielplattform
Returns:
Realistischer Benutzername
"""
policy = self.get_platform_policy(platform)
# Realistische Patterns (wie echte Menschen sie wählen)
realistic_patterns = []
if first_name:
first_name_clean = re.sub(r'[^a-z]', '', first_name.lower())
# Pattern 1: vorname + Geburtsjahr (z.B. "max1995")
realistic_patterns.append(
lambda fn=first_name_clean: f"{fn}{random.randint(1985, 2005)}"
)
# Pattern 2: vorname + Nachname-Initial + 2 Ziffern (z.B. "maxm92")
if last_name:
last_initial = last_name[0].lower() if last_name else ''
realistic_patterns.append(
lambda fn=first_name_clean, li=last_initial: f"{fn}{li}{random.randint(10, 99)}"
)
# Pattern 3: vorname.nachname (z.B. "max.mustermann")
if last_name:
last_name_clean = re.sub(r'[^a-z]', '', last_name.lower())
realistic_patterns.append(
lambda fn=first_name_clean, ln=last_name_clean: f"{fn}.{ln}"
)
# Pattern 4: vorname_adjektiv (z.B. "max_sunny")
realistic_patterns.append(
lambda fn=first_name_clean: f"{fn}_{random.choice(self.adjectives)}"
)
# Pattern 5: adjektiv_vorname_jahr (z.B. "sunny_max_93")
realistic_patterns.append(
lambda fn=first_name_clean: f"{random.choice(self.adjectives)}_{fn}_{random.randint(85, 99)}"
)
# Fallback-Patterns ohne Namen
realistic_patterns.extend([
# adjektiv + tier (z.B. "happytiger")
lambda: f"{random.choice(self.adjectives)}{random.choice(self.nouns)}",
# adjektiv + tier + 2 Ziffern (z.B. "coolwolf42")
lambda: f"{random.choice(self.adjectives)}{random.choice(self.nouns)}{random.randint(10, 99)}",
# tier + jahr (z.B. "eagle1995")
lambda: f"{random.choice(self.nouns)}{random.randint(1985, 2005)}",
])
# Versuche bis zu 20 mal einen gültigen, nicht-verdächtigen Namen zu generieren
for _ in range(20):
pattern_func = random.choice(realistic_patterns)
username = pattern_func()
# Länge anpassen
if len(username) > policy["max_length"]:
username = username[:policy["max_length"]]
if len(username) < policy["min_length"]:
username += str(random.randint(10, 99))
# Validieren (inkl. Pattern-Check)
valid, _ = self.validate_username(username, policy=policy)
if valid:
logger.info(f"Realistischer Benutzername generiert: {username}")
return username
# Absoluter Fallback
fallback = f"{random.choice(self.adjectives)}{random.choice(self.nouns)}{random.randint(10, 99)}"
logger.warning(f"Fallback-Benutzername verwendet: {fallback}")
return fallback