""" Playwright Manager - Hauptklasse für die Browser-Steuerung mit Anti-Bot-Erkennung """ import os import json import logging import random import time from pathlib import Path from typing import Dict, Optional, List, Any, Tuple from playwright.sync_api import sync_playwright, Browser, Page, BrowserContext, ElementHandle from domain.value_objects.browser_protection_style import BrowserProtectionStyle from infrastructure.services.browser_protection_service import BrowserProtectionService # Konfiguriere Logger logger = logging.getLogger("playwright_manager") class PlaywrightManager: """ Verwaltet Browser-Sitzungen mit Playwright, einschließlich Stealth-Modus und Proxy-Einstellungen. """ def __init__(self, headless: bool = False, proxy: Optional[Dict[str, str]] = None, browser_type: str = "chromium", user_agent: Optional[str] = None, screenshots_dir: str = "screenshots", slowmo: int = 0, window_position: Optional[Tuple[int, int]] = None): """ Initialisiert den PlaywrightManager. Args: headless: Ob der Browser im Headless-Modus ausgeführt werden soll proxy: Proxy-Konfiguration (z.B. {'server': 'http://myproxy.com:3128', 'username': 'user', 'password': 'pass'}) browser_type: Welcher Browser-Typ verwendet werden soll ("chromium", "firefox", oder "webkit") user_agent: Benutzerdefinierter User-Agent screenshots_dir: Verzeichnis für Screenshots slowmo: Verzögerung zwischen Aktionen in Millisekunden (nützlich für Debugging) window_position: Optionale Fensterposition als Tupel (x, y) """ self.headless = headless self.proxy = proxy self.browser_type = browser_type self.user_agent = user_agent self.screenshots_dir = screenshots_dir self.slowmo = slowmo self.window_position = window_position # Stelle sicher, dass das Screenshots-Verzeichnis existiert os.makedirs(self.screenshots_dir, exist_ok=True) # Playwright-Instanzen self.playwright = None self.browser = None self.context = None self.page = None # Zähler für Wiederhholungsversuche self.retry_counter = {} # Lade Stealth-Konfigurationen self.stealth_config = self._load_stealth_config() # Browser Protection Service self.protection_service = BrowserProtectionService() self.protection_applied = False self.protection_style = None def _load_stealth_config(self) -> Dict[str, Any]: """Lädt die Stealth-Konfigurationen aus der Datei oder verwendet Standardwerte.""" try: config_dir = Path(__file__).parent.parent / "config" stealth_config_path = config_dir / "stealth_config.json" if stealth_config_path.exists(): with open(stealth_config_path, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: logger.warning(f"Konnte Stealth-Konfiguration nicht laden: {e}") # Verwende Standardwerte, wenn das Laden fehlschlägt return { "vendor": "Google Inc.", "platform": "Win32", "webdriver": False, "accept_language": "de-DE,de;q=0.9,en-US;q=0.8,en;q=0.7", "timezone_id": "Europe/Berlin", "fingerprint_noise": True, "device_scale_factor": 1.0, } def start(self) -> Page: """ Startet die Playwright-Sitzung und gibt die Browser-Seite zurück. Returns: Page: Die Browser-Seite """ if self.page is not None: return self.page try: self.playwright = sync_playwright().start() # Wähle den Browser-Typ if self.browser_type == "firefox": browser_instance = self.playwright.firefox elif self.browser_type == "webkit": browser_instance = self.playwright.webkit else: browser_instance = self.playwright.chromium # Browser-Startoptionen browser_args = [] if self.browser_type == "chromium": # Chrome-spezifische Argumente für Anti-Bot-Erkennung browser_args.extend([ '--disable-blink-features=AutomationControlled', '--disable-features=IsolateOrigins,site-per-process', '--disable-site-isolation-trials', ]) # Browser-Launch-Optionen launch_options = { "headless": self.headless, "args": browser_args, "slow_mo": self.slowmo } # Fensterposition setzen wenn angegeben if self.window_position and not self.headless: x, y = self.window_position browser_args.extend([ f'--window-position={x},{y}' ]) # Browser starten self.browser = browser_instance.launch(**launch_options) # Kontext-Optionen für Stealth-Modus context_options = { "viewport": {"width": 1920, "height": 1080}, "device_scale_factor": self.stealth_config.get("device_scale_factor", 1.0), "locale": "de-DE", "timezone_id": self.stealth_config.get("timezone_id", "Europe/Berlin"), "accept_downloads": True, } # User-Agent setzen if self.user_agent: context_options["user_agent"] = self.user_agent # Proxy-Einstellungen, falls vorhanden if self.proxy: context_options["proxy"] = self.proxy # Browserkontext erstellen self.context = self.browser.new_context(**context_options) # JavaScript-Fingerprinting-Schutz self._apply_stealth_scripts() # Neue Seite erstellen self.page = self.context.new_page() # Event-Listener für Konsolen-Logs self.page.on("console", lambda msg: logger.debug(f"BROWSER CONSOLE: {msg.text}")) return self.page except Exception as e: logger.error(f"Fehler beim Starten des Browsers: {e}") self.close() raise def _apply_stealth_scripts(self): """Wendet JavaScript-Skripte an, um Browser-Fingerprinting zu umgehen.""" # Diese Skripte überschreiben Eigenschaften, die für Bot-Erkennung verwendet werden scripts = [ # WebDriver-Eigenschaft überschreiben """ () => { Object.defineProperty(navigator, 'webdriver', { get: () => false, }); } """, # Navigator-Eigenschaften überschreiben f""" () => {{ const newProto = navigator.__proto__; delete newProto.webdriver; navigator.__proto__ = newProto; Object.defineProperty(navigator, 'platform', {{ get: () => '{self.stealth_config.get("platform", "Win32")}' }}); Object.defineProperty(navigator, 'languages', {{ get: () => ['de-DE', 'de', 'en-US', 'en'] }}); Object.defineProperty(navigator, 'vendor', {{ get: () => '{self.stealth_config.get("vendor", "Google Inc.")}' }}); }} """, # Chrome-Objekte hinzufügen, die in normalen Browsern vorhanden sind """ () => { // Fügt chrome.runtime hinzu, falls nicht vorhanden if (!window.chrome) { window.chrome = {}; } if (!window.chrome.runtime) { window.chrome.runtime = {}; window.chrome.runtime.sendMessage = function() {}; } } """, # Plugin-Fingerprinting """ () => { const originalQuery = window.navigator.permissions.query; window.navigator.permissions.query = (parameters) => ( parameters.name === 'notifications' ? Promise.resolve({ state: Notification.permission }) : originalQuery(parameters) ); } """ ] # Wenn Fingerprint-Noise aktiviert ist, füge zufällige Variationen hinzu if self.stealth_config.get("fingerprint_noise", True): scripts.append(""" () => { // Canvas-Fingerprinting leicht verändern const originalToDataURL = HTMLCanvasElement.prototype.toDataURL; HTMLCanvasElement.prototype.toDataURL = function(type) { const result = originalToDataURL.apply(this, arguments); if (this.width > 16 && this.height > 16) { // Kleines Rauschen in Pixels einfügen const context = this.getContext('2d'); const imageData = context.getImageData(0, 0, 2, 2); const pixelArray = imageData.data; // Ändere einen zufälligen Pixel leicht const randomPixel = Math.floor(Math.random() * pixelArray.length / 4) * 4; pixelArray[randomPixel] = (pixelArray[randomPixel] + Math.floor(Math.random() * 10)) % 256; context.putImageData(imageData, 0, 0); } return result; }; } """) # Skripte auf den Browser-Kontext anwenden for script in scripts: self.context.add_init_script(script) def navigate_to(self, url: str, wait_until: str = "networkidle", timeout: int = 30000) -> bool: """ Navigiert zu einer bestimmten URL und wartet, bis die Seite geladen ist. Args: url: Die Ziel-URL wait_until: Wann die Navigation als abgeschlossen gilt ("load", "domcontentloaded", "networkidle") timeout: Timeout in Millisekunden Returns: bool: True bei erfolgreicher Navigation, False sonst """ if self.page is None: self.start() try: logger.info(f"Navigiere zu: {url}") self.page.goto(url, wait_until=wait_until, timeout=timeout) return True except Exception as e: logger.error(f"Fehler bei der Navigation zu {url}: {e}") self.take_screenshot(f"navigation_error_{int(time.time())}") return False def wait_for_selector(self, selector: str, timeout: int = 30000) -> Optional[ElementHandle]: """ Wartet auf ein Element mit dem angegebenen Selektor. Args: selector: CSS- oder XPath-Selektor timeout: Timeout in Millisekunden Returns: Optional[ElementHandle]: Das Element oder None, wenn nicht gefunden """ if self.page is None: raise ValueError("Browser nicht gestartet. Rufe zuerst start() auf.") try: element = self.page.wait_for_selector(selector, timeout=timeout) return element except Exception as e: logger.warning(f"Element nicht gefunden: {selector} - {e}") return None def fill_form_field(self, selector: str, value: str, timeout: int = 5000) -> bool: """ Füllt ein Formularfeld aus. Args: selector: Selektor für das Feld value: Einzugebender Wert timeout: Timeout in Millisekunden Returns: bool: True bei Erfolg, False bei Fehler """ try: # Auf Element warten element = self.wait_for_selector(selector, timeout) if not element: return False # Element fokussieren element.focus() time.sleep(random.uniform(0.1, 0.3)) # Vorhandenen Text löschen (optional) current_value = element.evaluate("el => el.value") if current_value: element.fill("") time.sleep(random.uniform(0.1, 0.2)) # Text menschenähnlich eingeben for char in value: element.type(char, delay=random.uniform(20, 100)) time.sleep(random.uniform(0.01, 0.05)) logger.info(f"Feld {selector} gefüllt mit: {value}") return True except Exception as e: logger.error(f"Fehler beim Ausfüllen von {selector}: {e}") key = f"fill_{selector}" return self._retry_action(key, lambda: self.fill_form_field(selector, value, timeout)) def click_element(self, selector: str, force: bool = False, timeout: int = 5000) -> bool: """ Klickt auf ein Element mit Anti-Bot-Bypass-Strategien. Args: selector: Selektor für das Element force: Force-Click verwenden timeout: Timeout in Millisekunden Returns: bool: True bei Erfolg, False bei Fehler """ try: # Auf Element warten element = self.wait_for_selector(selector, timeout) if not element: return False # Scroll zum Element self.page.evaluate("element => element.scrollIntoView({ behavior: 'smooth', block: 'center' })", element) time.sleep(random.uniform(0.3, 0.7)) # Menschenähnliches Verhalten - leichte Verzögerung vor dem Klick time.sleep(random.uniform(0.2, 0.5)) # Element klicken element.click(force=force, delay=random.uniform(20, 100)) logger.info(f"Element geklickt: {selector}") return True except Exception as e: logger.error(f"Fehler beim Klicken auf {selector}: {e}") # Bei Fehlern verwende robuste Click-Strategien return self.robust_click(selector, timeout) def robust_click(self, selector: str, timeout: int = 5000) -> bool: """ Robuste Click-Methode mit mehreren Anti-Bot-Bypass-Strategien. Speziell für Instagram's Click-Interceptors entwickelt. Args: selector: Selektor für das Element timeout: Timeout in Millisekunden Returns: bool: True bei Erfolg, False bei Fehler """ logger.info(f"Verwende robuste Click-Strategien für: {selector}") strategies = [ # Strategie 1: Standard Playwright Click lambda: self._strategy_standard_click(selector, timeout), # Strategie 2: Force Click lambda: self._strategy_force_click(selector, timeout), # Strategie 3: JavaScript Event Dispatch lambda: self._strategy_javascript_click(selector), # Strategie 4: Overlay-Entfernung + Click lambda: self._strategy_remove_overlays_click(selector, timeout), # Strategie 5: Focus + Enter (für Buttons/Links) lambda: self._strategy_focus_and_enter(selector), # Strategie 6: Mouse Position Click lambda: self._strategy_coordinate_click(selector) ] for i, strategy in enumerate(strategies, 1): try: logger.debug(f"Versuche Click-Strategie {i} für {selector}") if strategy(): logger.info(f"Click erfolgreich mit Strategie {i} für {selector}") return True except Exception as e: logger.debug(f"Strategie {i} fehlgeschlagen: {e}") continue logger.error(f"Alle Click-Strategien fehlgeschlagen für {selector}") return False def _strategy_standard_click(self, selector: str, timeout: int) -> bool: """Strategie 1: Standard Playwright Click""" element = self.wait_for_selector(selector, timeout) if not element: return False element.click() return True def _strategy_force_click(self, selector: str, timeout: int) -> bool: """Strategie 2: Force Click um Event-Blockierungen zu umgehen""" element = self.wait_for_selector(selector, timeout) if not element: return False element.click(force=True) return True def _strategy_javascript_click(self, selector: str) -> bool: """Strategie 3: JavaScript Event Dispatch um Overlays zu umgehen""" script = f""" (function() {{ const element = document.querySelector('{selector}'); if (!element) return false; // Erstelle und sende Click-Event const event = new MouseEvent('click', {{ bubbles: true, cancelable: true, view: window, detail: 1, button: 0, buttons: 1 }}); element.dispatchEvent(event); // Zusätzlich: Focus und Click Events element.focus(); const clickEvent = new Event('click', {{ bubbles: true, cancelable: true }}); element.dispatchEvent(clickEvent); return true; }})(); """ return self.page.evaluate(script) def _strategy_remove_overlays_click(self, selector: str, timeout: int) -> bool: """Strategie 4: Entferne Click-Interceptors und klicke dann""" # Entferne Overlays die Click-Events abfangen self._remove_click_interceptors() # Warte kurz damit DOM-Änderungen wirksam werden time.sleep(0.2) # Jetzt normaler Click element = self.wait_for_selector(selector, timeout) if not element: return False element.click() return True def _strategy_focus_and_enter(self, selector: str) -> bool: """Strategie 5: Focus Element und verwende Enter-Taste""" script = f""" (function() {{ const element = document.querySelector('{selector}'); if (!element) return false; // Element fokussieren element.focus(); element.scrollIntoView({{ block: 'center' }}); // Enter-Event senden const enterEvent = new KeyboardEvent('keydown', {{ key: 'Enter', code: 'Enter', keyCode: 13, which: 13, bubbles: true, cancelable: true }}); element.dispatchEvent(enterEvent); // Zusätzlich keyup Event const keyupEvent = new KeyboardEvent('keyup', {{ key: 'Enter', code: 'Enter', keyCode: 13, which: 13, bubbles: true, cancelable: true }}); element.dispatchEvent(keyupEvent); return true; }})(); """ return self.page.evaluate(script) def _strategy_coordinate_click(self, selector: str) -> bool: """Strategie 6: Click auf Koordinaten des Elements""" try: element = self.page.locator(selector).first if not element.is_visible(): return False # Hole Element-Position box = element.bounding_box() if not box: return False # Klicke in die Mitte des Elements x = box['x'] + box['width'] / 2 y = box['y'] + box['height'] / 2 self.page.mouse.click(x, y) return True except Exception: return False def _remove_click_interceptors(self) -> None: """ Entfernt invisible Overlays die Click-Events abfangen. Speziell für Instagram's Anti-Bot-Maßnahmen entwickelt. """ script = """ (function() { console.log('AccountForger: Entferne Click-Interceptors...'); // Liste typischer Instagram Click-Interceptor Klassen const interceptorSelectors = [ // Instagram's bekannte Interceptor-Klassen '.x1lliihq.x1plvlek.xryxfnj', '.x1n2onr6.xzkaem6', 'span[dir="auto"]', // Allgemeine Interceptor-Eigenschaften '[style*="pointer-events: all"]', '[style*="position: absolute"]', '[style*="z-index"]' ]; let removedCount = 0; // Entferne Interceptor-Elemente interceptorSelectors.forEach(selector => { try { const elements = document.querySelectorAll(selector); elements.forEach(el => { const style = window.getComputedStyle(el); // Prüfe ob Element ein Click-Interceptor ist const isInterceptor = ( style.pointerEvents === 'all' || (style.position === 'absolute' && parseInt(style.zIndex) > 1000) || (el.offsetWidth > 0 && el.offsetHeight > 0 && el.textContent.trim() === '' && style.backgroundColor === 'rgba(0, 0, 0, 0)') ); if (isInterceptor) { // Deaktiviere Pointer-Events el.style.pointerEvents = 'none'; el.style.display = 'none'; el.style.visibility = 'hidden'; removedCount++; } }); } catch (e) { console.warn('Fehler beim Entfernen von Interceptors:', e); } }); // Zusätzlich: Entferne alle unsichtbaren absolute Elemente die über anderen liegen const allElements = document.querySelectorAll('*'); allElements.forEach(el => { const style = window.getComputedStyle(el); if (style.position === 'absolute' || style.position === 'fixed') { const rect = el.getBoundingClientRect(); // Prüfe ob Element unsichtbar aber vorhanden ist if (rect.width > 0 && rect.height > 0 && style.opacity !== '0' && style.visibility !== 'hidden' && el.textContent.trim() === '' && parseInt(style.zIndex) > 10) { el.style.pointerEvents = 'none'; removedCount++; } } }); console.log(`AccountForger: ${removedCount} Click-Interceptors entfernt`); // Markiere dass Interceptors entfernt wurden window.__accountforge_interceptors_removed = true; return removedCount; })(); """ try: removed_count = self.page.evaluate(script) if removed_count > 0: logger.info(f"Click-Interceptors entfernt: {removed_count}") else: logger.debug("Keine Click-Interceptors gefunden") except Exception as e: logger.error(f"Fehler beim Entfernen von Click-Interceptors: {e}") def select_option(self, selector: str, value: str, timeout: int = 5000) -> bool: """ Wählt eine Option aus einem Dropdown-Menü. Args: selector: Selektor für das Dropdown value: Wert oder sichtbarer Text der Option timeout: Timeout in Millisekunden Returns: bool: True bei Erfolg, False bei Fehler """ try: # Auf Element warten element = self.wait_for_selector(selector, timeout) if not element: return False # Option auswählen self.page.select_option(selector, value=value) logger.info(f"Option '{value}' ausgewählt in {selector}") return True except Exception as e: logger.error(f"Fehler bei der Auswahl von '{value}' in {selector}: {e}") key = f"select_{selector}" return self._retry_action(key, lambda: self.select_option(selector, value, timeout)) def is_element_visible(self, selector: str, timeout: int = 5000) -> bool: """ Prüft, ob ein Element sichtbar ist. Args: selector: Selektor für das Element timeout: Timeout in Millisekunden Returns: bool: True wenn sichtbar, False sonst """ try: element = self.page.wait_for_selector(selector, timeout=timeout, state="visible") return element is not None except: return False def take_screenshot(self, name: str = None) -> str: """ Erstellt einen Screenshot der aktuellen Seite. Args: name: Name für den Screenshot (ohne Dateierweiterung) Returns: str: Pfad zum erstellten Screenshot """ if self.page is None: raise ValueError("Browser nicht gestartet. Rufe zuerst start() auf.") timestamp = int(time.time()) filename = f"{name}_{timestamp}.png" if name else f"screenshot_{timestamp}.png" path = os.path.join(self.screenshots_dir, filename) self.page.screenshot(path=path, full_page=True) logger.info(f"Screenshot erstellt: {path}") return path def _retry_action(self, key: str, action_func, max_retries: int = 3) -> bool: """ Wiederholt eine Aktion bei Fehler. Args: key: Eindeutiger Schlüssel für die Aktion action_func: Funktion, die ausgeführt werden soll max_retries: Maximale Anzahl der Wiederholungen Returns: bool: Ergebnis der Aktion """ if key not in self.retry_counter: self.retry_counter[key] = 0 self.retry_counter[key] += 1 if self.retry_counter[key] <= max_retries: logger.info(f"Wiederhole Aktion {key} (Versuch {self.retry_counter[key]} von {max_retries})") time.sleep(random.uniform(0.5, 1.0)) return action_func() else: logger.warning(f"Maximale Anzahl von Wiederholungen für {key} erreicht") self.retry_counter[key] = 0 return False def apply_protection(self, protection_style: Optional[BrowserProtectionStyle] = None) -> None: """ Wendet Browser-Schutz an, um versehentliche Benutzerinteraktionen zu verhindern. Args: protection_style: Konfiguration für den Schutzstil. Verwendet Standardwerte wenn None. """ if self.page is None: raise ValueError("Browser nicht gestartet. Rufe zuerst start() auf.") if protection_style is None: protection_style = BrowserProtectionStyle() # Speichere den Stil für spätere Wiederanwendung self.protection_style = protection_style # Wende Schutz initial an self.protection_service.protect_browser(self.page, protection_style) self.protection_applied = True # Registriere Event-Handler für Seitenwechsel self._setup_protection_listeners() logger.info(f"Browser-Schutz angewendet mit Level: {protection_style.level.value}") def _setup_protection_listeners(self) -> None: """Setzt Event-Listener auf, um Schutz bei Seitenwechsel neu anzuwenden.""" if self.page is None: return # Bei Navigation (Seitenwechsel) Schutz neu anwenden def on_navigation(): if self.protection_applied and self.protection_style: # Kurz warten bis neue Seite geladen ist self.page.wait_for_load_state("domcontentloaded") # Schutz neu anwenden self.protection_service.protect_browser(self.page, self.protection_style) logger.debug("Browser-Schutz nach Navigation neu angewendet") # Registriere Handler für verschiedene Events self.page.on("framenavigated", lambda frame: on_navigation() if frame == self.page.main_frame else None) # Zusätzlich: Wende Schutz bei DOM-Änderungen neu an self.context.add_init_script(""" // Überwache DOM-Änderungen und wende Schutz neu an wenn nötig const observer = new MutationObserver(() => { const shield = document.getElementById('accountforge-shield'); if (!shield && window.__accountforge_protection) { // Schutz wurde entfernt, wende neu an setTimeout(() => { if (!document.getElementById('accountforge-shield')) { eval(window.__accountforge_protection); } }, 100); } }); observer.observe(document.body, { childList: true, subtree: true }); """) def remove_protection(self) -> None: """Entfernt den Browser-Schutz.""" if self.page is None or not self.protection_applied: return self.protection_service.remove_protection(self.page) self.protection_applied = False self.protection_style = None logger.info("Browser-Schutz entfernt") def close(self): """Schließt den Browser und gibt Ressourcen frei.""" try: # Entferne Schutz vor dem Schließen if self.protection_applied: self.remove_protection() # Seite erst schließen, dann Kontext, dann Browser, dann Playwright if self.page: try: self.page.close() except Exception as e: logger.warning(f"Fehler beim Schließen der Page: {e}") self.page = None if self.context: try: self.context.close() except Exception as e: logger.warning(f"Fehler beim Schließen des Context: {e}") self.context = None if self.browser: try: self.browser.close() except Exception as e: logger.warning(f"Fehler beim Schließen des Browsers: {e}") self.browser = None # Playwright stop mit Retry-Logik if self.playwright: try: self.playwright.stop() except Exception as e: logger.warning(f"Fehler beim Stoppen von Playwright: {e}") # Versuche force stop try: import time time.sleep(0.5) # Kurz warten self.playwright.stop() except Exception as e2: logger.error(f"Force stop fehlgeschlagen: {e2}") self.playwright = None logger.info("Browser-Sitzung erfolgreich geschlossen") except Exception as e: logger.error(f"Fehler beim Schließen des Browsers: {e}") # Versuche Ressourcen trotzdem zu nullen self.page = None self.context = None self.browser = None self.playwright = None def __enter__(self): """Kontext-Manager-Eintritt.""" self.start() return self def __exit__(self, exc_type, exc_val, exc_tb): """Kontext-Manager-Austritt.""" self.close() # Beispielnutzung, wenn direkt ausgeführt if __name__ == "__main__": # Konfiguriere Logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) # Beispiel für einen Proxy (ohne Anmeldedaten) proxy_config = { "server": "http://example-proxy.com:8080" } # Browser starten und zu einer Seite navigieren with PlaywrightManager(headless=False) as manager: manager.navigate_to("https://www.instagram.com") time.sleep(5) # Kurze Pause zum Anzeigen der Seite