906 Zeilen
34 KiB
Python
906 Zeilen
34 KiB
Python
"""
|
|
Playwright Manager - Hauptklasse für die Browser-Steuerung mit Anti-Bot-Erkennung
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import logging
|
|
import random
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Dict, Optional, List, Any, Tuple
|
|
from playwright.sync_api import sync_playwright, Browser, Page, BrowserContext, ElementHandle
|
|
|
|
from domain.value_objects.browser_protection_style import BrowserProtectionStyle
|
|
from infrastructure.services.browser_protection_service import BrowserProtectionService
|
|
|
|
# Konfiguriere Logger
|
|
logger = logging.getLogger("playwright_manager")
|
|
|
|
class PlaywrightManager:
|
|
"""
|
|
Verwaltet Browser-Sitzungen mit Playwright, einschließlich Stealth-Modus und Proxy-Einstellungen.
|
|
"""
|
|
|
|
def __init__(self,
|
|
headless: bool = False,
|
|
proxy: Optional[Dict[str, str]] = None,
|
|
browser_type: str = "chromium",
|
|
user_agent: Optional[str] = None,
|
|
screenshots_dir: str = "screenshots",
|
|
slowmo: int = 0,
|
|
window_position: Optional[Tuple[int, int]] = None):
|
|
"""
|
|
Initialisiert den PlaywrightManager.
|
|
|
|
Args:
|
|
headless: Ob der Browser im Headless-Modus ausgeführt werden soll
|
|
proxy: Proxy-Konfiguration (z.B. {'server': 'http://myproxy.com:3128', 'username': 'user', 'password': 'pass'})
|
|
browser_type: Welcher Browser-Typ verwendet werden soll ("chromium", "firefox", oder "webkit")
|
|
user_agent: Benutzerdefinierter User-Agent
|
|
screenshots_dir: Verzeichnis für Screenshots
|
|
slowmo: Verzögerung zwischen Aktionen in Millisekunden (nützlich für Debugging)
|
|
window_position: Optionale Fensterposition als Tupel (x, y)
|
|
"""
|
|
self.headless = headless
|
|
self.proxy = proxy
|
|
self.browser_type = browser_type
|
|
self.user_agent = user_agent
|
|
self.screenshots_dir = screenshots_dir
|
|
self.slowmo = slowmo
|
|
self.window_position = window_position
|
|
|
|
# Stelle sicher, dass das Screenshots-Verzeichnis existiert
|
|
os.makedirs(self.screenshots_dir, exist_ok=True)
|
|
|
|
# Playwright-Instanzen
|
|
self.playwright = None
|
|
self.browser = None
|
|
self.context = None
|
|
self.page = None
|
|
|
|
# Zähler für Wiederhholungsversuche
|
|
self.retry_counter = {}
|
|
|
|
# Lade Stealth-Konfigurationen
|
|
self.stealth_config = self._load_stealth_config()
|
|
|
|
# Browser Protection Service
|
|
self.protection_service = BrowserProtectionService()
|
|
self.protection_applied = False
|
|
self.protection_style = None
|
|
|
|
def _load_stealth_config(self) -> Dict[str, Any]:
|
|
"""Lädt die Stealth-Konfigurationen aus der Datei oder verwendet Standardwerte."""
|
|
try:
|
|
config_dir = Path(__file__).parent.parent / "config"
|
|
stealth_config_path = config_dir / "stealth_config.json"
|
|
|
|
if stealth_config_path.exists():
|
|
with open(stealth_config_path, 'r', encoding='utf-8') as f:
|
|
return json.load(f)
|
|
except Exception as e:
|
|
logger.warning(f"Konnte Stealth-Konfiguration nicht laden: {e}")
|
|
|
|
# Verwende Standardwerte, wenn das Laden fehlschlägt
|
|
return {
|
|
"vendor": "Google Inc.",
|
|
"platform": "Win32",
|
|
"webdriver": False,
|
|
"accept_language": "de-DE,de;q=0.9,en-US;q=0.8,en;q=0.7",
|
|
"timezone_id": "Europe/Berlin",
|
|
"fingerprint_noise": True,
|
|
"device_scale_factor": 1.0,
|
|
}
|
|
|
|
def start(self) -> Page:
|
|
"""
|
|
Startet die Playwright-Sitzung und gibt die Browser-Seite zurück.
|
|
|
|
Returns:
|
|
Page: Die Browser-Seite
|
|
"""
|
|
if self.page is not None:
|
|
return self.page
|
|
|
|
try:
|
|
self.playwright = sync_playwright().start()
|
|
|
|
# Wähle den Browser-Typ
|
|
if self.browser_type == "firefox":
|
|
browser_instance = self.playwright.firefox
|
|
elif self.browser_type == "webkit":
|
|
browser_instance = self.playwright.webkit
|
|
else:
|
|
browser_instance = self.playwright.chromium
|
|
|
|
# Browser-Startoptionen
|
|
browser_args = []
|
|
|
|
if self.browser_type == "chromium":
|
|
# Chrome-spezifische Argumente für Anti-Bot-Erkennung
|
|
browser_args.extend([
|
|
'--disable-blink-features=AutomationControlled',
|
|
'--disable-features=IsolateOrigins,site-per-process',
|
|
'--disable-site-isolation-trials',
|
|
])
|
|
|
|
# Browser-Launch-Optionen
|
|
launch_options = {
|
|
"headless": self.headless,
|
|
"args": browser_args,
|
|
"slow_mo": self.slowmo
|
|
}
|
|
|
|
# Fensterposition setzen wenn angegeben
|
|
if self.window_position and not self.headless:
|
|
x, y = self.window_position
|
|
browser_args.extend([
|
|
f'--window-position={x},{y}'
|
|
])
|
|
|
|
# Browser starten
|
|
self.browser = browser_instance.launch(**launch_options)
|
|
|
|
# Kontext-Optionen für Stealth-Modus
|
|
context_options = {
|
|
"viewport": {"width": 1920, "height": 1080},
|
|
"device_scale_factor": self.stealth_config.get("device_scale_factor", 1.0),
|
|
"locale": "de-DE",
|
|
"timezone_id": self.stealth_config.get("timezone_id", "Europe/Berlin"),
|
|
"accept_downloads": True,
|
|
}
|
|
|
|
# User-Agent setzen
|
|
if self.user_agent:
|
|
context_options["user_agent"] = self.user_agent
|
|
|
|
# Proxy-Einstellungen, falls vorhanden
|
|
if self.proxy:
|
|
context_options["proxy"] = self.proxy
|
|
|
|
# Browserkontext erstellen
|
|
self.context = self.browser.new_context(**context_options)
|
|
|
|
# JavaScript-Fingerprinting-Schutz
|
|
self._apply_stealth_scripts()
|
|
|
|
# Neue Seite erstellen
|
|
self.page = self.context.new_page()
|
|
|
|
# Event-Listener für Konsolen-Logs
|
|
self.page.on("console", lambda msg: logger.debug(f"BROWSER CONSOLE: {msg.text}"))
|
|
|
|
return self.page
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Starten des Browsers: {e}")
|
|
self.close()
|
|
raise
|
|
|
|
def _apply_stealth_scripts(self):
|
|
"""Wendet JavaScript-Skripte an, um Browser-Fingerprinting zu umgehen."""
|
|
# Diese Skripte überschreiben Eigenschaften, die für Bot-Erkennung verwendet werden
|
|
scripts = [
|
|
# WebDriver-Eigenschaft überschreiben
|
|
"""
|
|
() => {
|
|
Object.defineProperty(navigator, 'webdriver', {
|
|
get: () => false,
|
|
});
|
|
}
|
|
""",
|
|
|
|
# Navigator-Eigenschaften überschreiben
|
|
f"""
|
|
() => {{
|
|
const newProto = navigator.__proto__;
|
|
delete newProto.webdriver;
|
|
navigator.__proto__ = newProto;
|
|
|
|
Object.defineProperty(navigator, 'platform', {{
|
|
get: () => '{self.stealth_config.get("platform", "Win32")}'
|
|
}});
|
|
|
|
Object.defineProperty(navigator, 'languages', {{
|
|
get: () => ['de-DE', 'de', 'en-US', 'en']
|
|
}});
|
|
|
|
Object.defineProperty(navigator, 'vendor', {{
|
|
get: () => '{self.stealth_config.get("vendor", "Google Inc.")}'
|
|
}});
|
|
}}
|
|
""",
|
|
|
|
# Chrome-Objekte hinzufügen, die in normalen Browsern vorhanden sind
|
|
"""
|
|
() => {
|
|
// Fügt chrome.runtime hinzu, falls nicht vorhanden
|
|
if (!window.chrome) {
|
|
window.chrome = {};
|
|
}
|
|
if (!window.chrome.runtime) {
|
|
window.chrome.runtime = {};
|
|
window.chrome.runtime.sendMessage = function() {};
|
|
}
|
|
}
|
|
""",
|
|
|
|
# Plugin-Fingerprinting
|
|
"""
|
|
() => {
|
|
const originalQuery = window.navigator.permissions.query;
|
|
window.navigator.permissions.query = (parameters) => (
|
|
parameters.name === 'notifications' ?
|
|
Promise.resolve({ state: Notification.permission }) :
|
|
originalQuery(parameters)
|
|
);
|
|
}
|
|
"""
|
|
]
|
|
|
|
# Wenn Fingerprint-Noise aktiviert ist, füge zufällige Variationen hinzu
|
|
if self.stealth_config.get("fingerprint_noise", True):
|
|
scripts.append("""
|
|
() => {
|
|
// Canvas-Fingerprinting leicht verändern
|
|
const originalToDataURL = HTMLCanvasElement.prototype.toDataURL;
|
|
HTMLCanvasElement.prototype.toDataURL = function(type) {
|
|
const result = originalToDataURL.apply(this, arguments);
|
|
|
|
if (this.width > 16 && this.height > 16) {
|
|
// Kleines Rauschen in Pixels einfügen
|
|
const context = this.getContext('2d');
|
|
const imageData = context.getImageData(0, 0, 2, 2);
|
|
const pixelArray = imageData.data;
|
|
|
|
// Ändere einen zufälligen Pixel leicht
|
|
const randomPixel = Math.floor(Math.random() * pixelArray.length / 4) * 4;
|
|
pixelArray[randomPixel] = (pixelArray[randomPixel] + Math.floor(Math.random() * 10)) % 256;
|
|
|
|
context.putImageData(imageData, 0, 0);
|
|
}
|
|
|
|
return result;
|
|
};
|
|
}
|
|
""")
|
|
|
|
# Skripte auf den Browser-Kontext anwenden
|
|
for script in scripts:
|
|
self.context.add_init_script(script)
|
|
|
|
def navigate_to(self, url: str, wait_until: str = "networkidle", timeout: int = 30000) -> bool:
|
|
"""
|
|
Navigiert zu einer bestimmten URL und wartet, bis die Seite geladen ist.
|
|
|
|
Args:
|
|
url: Die Ziel-URL
|
|
wait_until: Wann die Navigation als abgeschlossen gilt ("load", "domcontentloaded", "networkidle")
|
|
timeout: Timeout in Millisekunden
|
|
|
|
Returns:
|
|
bool: True bei erfolgreicher Navigation, False sonst
|
|
"""
|
|
if self.page is None:
|
|
self.start()
|
|
|
|
try:
|
|
logger.info(f"Navigiere zu: {url}")
|
|
self.page.goto(url, wait_until=wait_until, timeout=timeout)
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Fehler bei der Navigation zu {url}: {e}")
|
|
self.take_screenshot(f"navigation_error_{int(time.time())}")
|
|
return False
|
|
|
|
def wait_for_selector(self, selector: str, timeout: int = 30000) -> Optional[ElementHandle]:
|
|
"""
|
|
Wartet auf ein Element mit dem angegebenen Selektor.
|
|
|
|
Args:
|
|
selector: CSS- oder XPath-Selektor
|
|
timeout: Timeout in Millisekunden
|
|
|
|
Returns:
|
|
Optional[ElementHandle]: Das Element oder None, wenn nicht gefunden
|
|
"""
|
|
if self.page is None:
|
|
raise ValueError("Browser nicht gestartet. Rufe zuerst start() auf.")
|
|
|
|
try:
|
|
element = self.page.wait_for_selector(selector, timeout=timeout)
|
|
return element
|
|
except Exception as e:
|
|
logger.warning(f"Element nicht gefunden: {selector} - {e}")
|
|
return None
|
|
|
|
def fill_form_field(self, selector: str, value: str, timeout: int = 5000) -> bool:
|
|
"""
|
|
Füllt ein Formularfeld aus.
|
|
|
|
Args:
|
|
selector: Selektor für das Feld
|
|
value: Einzugebender Wert
|
|
timeout: Timeout in Millisekunden
|
|
|
|
Returns:
|
|
bool: True bei Erfolg, False bei Fehler
|
|
"""
|
|
try:
|
|
# Auf Element warten
|
|
element = self.wait_for_selector(selector, timeout)
|
|
if not element:
|
|
return False
|
|
|
|
# Element fokussieren
|
|
element.focus()
|
|
time.sleep(random.uniform(0.1, 0.3))
|
|
|
|
# Vorhandenen Text löschen (optional)
|
|
current_value = element.evaluate("el => el.value")
|
|
if current_value:
|
|
element.fill("")
|
|
time.sleep(random.uniform(0.1, 0.2))
|
|
|
|
# Text menschenähnlich eingeben
|
|
for char in value:
|
|
element.type(char, delay=random.uniform(20, 100))
|
|
time.sleep(random.uniform(0.01, 0.05))
|
|
|
|
logger.info(f"Feld {selector} gefüllt mit: {value}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Ausfüllen von {selector}: {e}")
|
|
key = f"fill_{selector}"
|
|
return self._retry_action(key, lambda: self.fill_form_field(selector, value, timeout))
|
|
|
|
def click_element(self, selector: str, force: bool = False, timeout: int = 5000) -> bool:
|
|
"""
|
|
Klickt auf ein Element mit Anti-Bot-Bypass-Strategien.
|
|
|
|
Args:
|
|
selector: Selektor für das Element
|
|
force: Force-Click verwenden
|
|
timeout: Timeout in Millisekunden
|
|
|
|
Returns:
|
|
bool: True bei Erfolg, False bei Fehler
|
|
"""
|
|
try:
|
|
# Auf Element warten
|
|
element = self.wait_for_selector(selector, timeout)
|
|
if not element:
|
|
return False
|
|
|
|
# Scroll zum Element
|
|
self.page.evaluate("element => element.scrollIntoView({ behavior: 'smooth', block: 'center' })", element)
|
|
time.sleep(random.uniform(0.3, 0.7))
|
|
|
|
# Menschenähnliches Verhalten - leichte Verzögerung vor dem Klick
|
|
time.sleep(random.uniform(0.2, 0.5))
|
|
|
|
# Element klicken
|
|
element.click(force=force, delay=random.uniform(20, 100))
|
|
|
|
logger.info(f"Element geklickt: {selector}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Klicken auf {selector}: {e}")
|
|
# Bei Fehlern verwende robuste Click-Strategien
|
|
return self.robust_click(selector, timeout)
|
|
|
|
def robust_click(self, selector: str, timeout: int = 5000) -> bool:
|
|
"""
|
|
Robuste Click-Methode mit mehreren Anti-Bot-Bypass-Strategien.
|
|
Speziell für Instagram's Click-Interceptors entwickelt.
|
|
|
|
Args:
|
|
selector: Selektor für das Element
|
|
timeout: Timeout in Millisekunden
|
|
|
|
Returns:
|
|
bool: True bei Erfolg, False bei Fehler
|
|
"""
|
|
logger.info(f"Verwende robuste Click-Strategien für: {selector}")
|
|
|
|
strategies = [
|
|
# Strategie 1: Standard Playwright Click
|
|
lambda: self._strategy_standard_click(selector, timeout),
|
|
|
|
# Strategie 2: Force Click
|
|
lambda: self._strategy_force_click(selector, timeout),
|
|
|
|
# Strategie 3: JavaScript Event Dispatch
|
|
lambda: self._strategy_javascript_click(selector),
|
|
|
|
# Strategie 4: Overlay-Entfernung + Click
|
|
lambda: self._strategy_remove_overlays_click(selector, timeout),
|
|
|
|
# Strategie 5: Focus + Enter (für Buttons/Links)
|
|
lambda: self._strategy_focus_and_enter(selector),
|
|
|
|
# Strategie 6: Mouse Position Click
|
|
lambda: self._strategy_coordinate_click(selector)
|
|
]
|
|
|
|
for i, strategy in enumerate(strategies, 1):
|
|
try:
|
|
logger.debug(f"Versuche Click-Strategie {i} für {selector}")
|
|
if strategy():
|
|
logger.info(f"Click erfolgreich mit Strategie {i} für {selector}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.debug(f"Strategie {i} fehlgeschlagen: {e}")
|
|
continue
|
|
|
|
logger.error(f"Alle Click-Strategien fehlgeschlagen für {selector}")
|
|
return False
|
|
|
|
def _strategy_standard_click(self, selector: str, timeout: int) -> bool:
|
|
"""Strategie 1: Standard Playwright Click"""
|
|
element = self.wait_for_selector(selector, timeout)
|
|
if not element:
|
|
return False
|
|
element.click()
|
|
return True
|
|
|
|
def _strategy_force_click(self, selector: str, timeout: int) -> bool:
|
|
"""Strategie 2: Force Click um Event-Blockierungen zu umgehen"""
|
|
element = self.wait_for_selector(selector, timeout)
|
|
if not element:
|
|
return False
|
|
element.click(force=True)
|
|
return True
|
|
|
|
def _strategy_javascript_click(self, selector: str) -> bool:
|
|
"""Strategie 3: JavaScript Event Dispatch um Overlays zu umgehen"""
|
|
script = f"""
|
|
(function() {{
|
|
const element = document.querySelector('{selector}');
|
|
if (!element) return false;
|
|
|
|
// Erstelle und sende Click-Event
|
|
const event = new MouseEvent('click', {{
|
|
bubbles: true,
|
|
cancelable: true,
|
|
view: window,
|
|
detail: 1,
|
|
button: 0,
|
|
buttons: 1
|
|
}});
|
|
|
|
element.dispatchEvent(event);
|
|
|
|
// Zusätzlich: Focus und Click Events
|
|
element.focus();
|
|
|
|
const clickEvent = new Event('click', {{
|
|
bubbles: true,
|
|
cancelable: true
|
|
}});
|
|
element.dispatchEvent(clickEvent);
|
|
|
|
return true;
|
|
}})();
|
|
"""
|
|
|
|
return self.page.evaluate(script)
|
|
|
|
def _strategy_remove_overlays_click(self, selector: str, timeout: int) -> bool:
|
|
"""Strategie 4: Entferne Click-Interceptors und klicke dann"""
|
|
# Entferne Overlays die Click-Events abfangen
|
|
self._remove_click_interceptors()
|
|
|
|
# Warte kurz damit DOM-Änderungen wirksam werden
|
|
time.sleep(0.2)
|
|
|
|
# Jetzt normaler Click
|
|
element = self.wait_for_selector(selector, timeout)
|
|
if not element:
|
|
return False
|
|
element.click()
|
|
return True
|
|
|
|
def _strategy_focus_and_enter(self, selector: str) -> bool:
|
|
"""Strategie 5: Focus Element und verwende Enter-Taste"""
|
|
script = f"""
|
|
(function() {{
|
|
const element = document.querySelector('{selector}');
|
|
if (!element) return false;
|
|
|
|
// Element fokussieren
|
|
element.focus();
|
|
element.scrollIntoView({{ block: 'center' }});
|
|
|
|
// Enter-Event senden
|
|
const enterEvent = new KeyboardEvent('keydown', {{
|
|
key: 'Enter',
|
|
code: 'Enter',
|
|
keyCode: 13,
|
|
which: 13,
|
|
bubbles: true,
|
|
cancelable: true
|
|
}});
|
|
|
|
element.dispatchEvent(enterEvent);
|
|
|
|
// Zusätzlich keyup Event
|
|
const keyupEvent = new KeyboardEvent('keyup', {{
|
|
key: 'Enter',
|
|
code: 'Enter',
|
|
keyCode: 13,
|
|
which: 13,
|
|
bubbles: true,
|
|
cancelable: true
|
|
}});
|
|
|
|
element.dispatchEvent(keyupEvent);
|
|
|
|
return true;
|
|
}})();
|
|
"""
|
|
|
|
return self.page.evaluate(script)
|
|
|
|
def _strategy_coordinate_click(self, selector: str) -> bool:
|
|
"""Strategie 6: Click auf Koordinaten des Elements"""
|
|
try:
|
|
element = self.page.locator(selector).first
|
|
if not element.is_visible():
|
|
return False
|
|
|
|
# Hole Element-Position
|
|
box = element.bounding_box()
|
|
if not box:
|
|
return False
|
|
|
|
# Klicke in die Mitte des Elements
|
|
x = box['x'] + box['width'] / 2
|
|
y = box['y'] + box['height'] / 2
|
|
|
|
self.page.mouse.click(x, y)
|
|
return True
|
|
|
|
except Exception:
|
|
return False
|
|
|
|
def _remove_click_interceptors(self) -> None:
|
|
"""
|
|
Entfernt invisible Overlays die Click-Events abfangen.
|
|
Speziell für Instagram's Anti-Bot-Maßnahmen entwickelt.
|
|
"""
|
|
script = """
|
|
(function() {
|
|
console.log('AccountForger: Entferne Click-Interceptors...');
|
|
|
|
// Liste typischer Instagram Click-Interceptor Klassen
|
|
const interceptorSelectors = [
|
|
// Instagram's bekannte Interceptor-Klassen
|
|
'.x1lliihq.x1plvlek.xryxfnj',
|
|
'.x1n2onr6.xzkaem6',
|
|
'span[dir="auto"]',
|
|
|
|
// Allgemeine Interceptor-Eigenschaften
|
|
'[style*="pointer-events: all"]',
|
|
'[style*="position: absolute"]',
|
|
'[style*="z-index"]'
|
|
];
|
|
|
|
let removedCount = 0;
|
|
|
|
// Entferne Interceptor-Elemente
|
|
interceptorSelectors.forEach(selector => {
|
|
try {
|
|
const elements = document.querySelectorAll(selector);
|
|
elements.forEach(el => {
|
|
const style = window.getComputedStyle(el);
|
|
|
|
// Prüfe ob Element ein Click-Interceptor ist
|
|
const isInterceptor = (
|
|
style.pointerEvents === 'all' ||
|
|
(style.position === 'absolute' && parseInt(style.zIndex) > 1000) ||
|
|
(el.offsetWidth > 0 && el.offsetHeight > 0 &&
|
|
el.textContent.trim() === '' &&
|
|
style.backgroundColor === 'rgba(0, 0, 0, 0)')
|
|
);
|
|
|
|
if (isInterceptor) {
|
|
// Deaktiviere Pointer-Events
|
|
el.style.pointerEvents = 'none';
|
|
el.style.display = 'none';
|
|
el.style.visibility = 'hidden';
|
|
removedCount++;
|
|
}
|
|
});
|
|
} catch (e) {
|
|
console.warn('Fehler beim Entfernen von Interceptors:', e);
|
|
}
|
|
});
|
|
|
|
// Zusätzlich: Entferne alle unsichtbaren absolute Elemente die über anderen liegen
|
|
const allElements = document.querySelectorAll('*');
|
|
allElements.forEach(el => {
|
|
const style = window.getComputedStyle(el);
|
|
|
|
if (style.position === 'absolute' || style.position === 'fixed') {
|
|
const rect = el.getBoundingClientRect();
|
|
|
|
// Prüfe ob Element unsichtbar aber vorhanden ist
|
|
if (rect.width > 0 && rect.height > 0 &&
|
|
style.opacity !== '0' &&
|
|
style.visibility !== 'hidden' &&
|
|
el.textContent.trim() === '' &&
|
|
parseInt(style.zIndex) > 10) {
|
|
|
|
el.style.pointerEvents = 'none';
|
|
removedCount++;
|
|
}
|
|
}
|
|
});
|
|
|
|
console.log(`AccountForger: ${removedCount} Click-Interceptors entfernt`);
|
|
|
|
// Markiere dass Interceptors entfernt wurden
|
|
window.__accountforge_interceptors_removed = true;
|
|
|
|
return removedCount;
|
|
})();
|
|
"""
|
|
|
|
try:
|
|
removed_count = self.page.evaluate(script)
|
|
if removed_count > 0:
|
|
logger.info(f"Click-Interceptors entfernt: {removed_count}")
|
|
else:
|
|
logger.debug("Keine Click-Interceptors gefunden")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Entfernen von Click-Interceptors: {e}")
|
|
|
|
def select_option(self, selector: str, value: str, timeout: int = 5000) -> bool:
|
|
"""
|
|
Wählt eine Option aus einem Dropdown-Menü.
|
|
|
|
Args:
|
|
selector: Selektor für das Dropdown
|
|
value: Wert oder sichtbarer Text der Option
|
|
timeout: Timeout in Millisekunden
|
|
|
|
Returns:
|
|
bool: True bei Erfolg, False bei Fehler
|
|
"""
|
|
try:
|
|
# Auf Element warten
|
|
element = self.wait_for_selector(selector, timeout)
|
|
if not element:
|
|
return False
|
|
|
|
# Option auswählen
|
|
self.page.select_option(selector, value=value)
|
|
|
|
logger.info(f"Option '{value}' ausgewählt in {selector}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler bei der Auswahl von '{value}' in {selector}: {e}")
|
|
key = f"select_{selector}"
|
|
return self._retry_action(key, lambda: self.select_option(selector, value, timeout))
|
|
|
|
def is_element_visible(self, selector: str, timeout: int = 5000) -> bool:
|
|
"""
|
|
Prüft, ob ein Element sichtbar ist.
|
|
|
|
Args:
|
|
selector: Selektor für das Element
|
|
timeout: Timeout in Millisekunden
|
|
|
|
Returns:
|
|
bool: True wenn sichtbar, False sonst
|
|
"""
|
|
try:
|
|
element = self.page.wait_for_selector(selector, timeout=timeout, state="visible")
|
|
return element is not None
|
|
except:
|
|
return False
|
|
|
|
def take_screenshot(self, name: str = None) -> str:
|
|
"""
|
|
Erstellt einen Screenshot der aktuellen Seite.
|
|
|
|
Args:
|
|
name: Name für den Screenshot (ohne Dateierweiterung)
|
|
|
|
Returns:
|
|
str: Pfad zum erstellten Screenshot
|
|
"""
|
|
if self.page is None:
|
|
raise ValueError("Browser nicht gestartet. Rufe zuerst start() auf.")
|
|
|
|
timestamp = int(time.time())
|
|
filename = f"{name}_{timestamp}.png" if name else f"screenshot_{timestamp}.png"
|
|
path = os.path.join(self.screenshots_dir, filename)
|
|
|
|
self.page.screenshot(path=path, full_page=True)
|
|
logger.info(f"Screenshot erstellt: {path}")
|
|
return path
|
|
|
|
def _retry_action(self, key: str, action_func, max_retries: int = 3) -> bool:
|
|
"""
|
|
Wiederholt eine Aktion bei Fehler.
|
|
|
|
Args:
|
|
key: Eindeutiger Schlüssel für die Aktion
|
|
action_func: Funktion, die ausgeführt werden soll
|
|
max_retries: Maximale Anzahl der Wiederholungen
|
|
|
|
Returns:
|
|
bool: Ergebnis der Aktion
|
|
"""
|
|
if key not in self.retry_counter:
|
|
self.retry_counter[key] = 0
|
|
|
|
self.retry_counter[key] += 1
|
|
|
|
if self.retry_counter[key] <= max_retries:
|
|
logger.info(f"Wiederhole Aktion {key} (Versuch {self.retry_counter[key]} von {max_retries})")
|
|
time.sleep(random.uniform(0.5, 1.0))
|
|
return action_func()
|
|
else:
|
|
logger.warning(f"Maximale Anzahl von Wiederholungen für {key} erreicht")
|
|
self.retry_counter[key] = 0
|
|
return False
|
|
|
|
def apply_protection(self, protection_style: Optional[BrowserProtectionStyle] = None) -> None:
|
|
"""
|
|
Wendet Browser-Schutz an, um versehentliche Benutzerinteraktionen zu verhindern.
|
|
|
|
Args:
|
|
protection_style: Konfiguration für den Schutzstil. Verwendet Standardwerte wenn None.
|
|
"""
|
|
if self.page is None:
|
|
raise ValueError("Browser nicht gestartet. Rufe zuerst start() auf.")
|
|
|
|
if protection_style is None:
|
|
protection_style = BrowserProtectionStyle()
|
|
|
|
# Speichere den Stil für spätere Wiederanwendung
|
|
self.protection_style = protection_style
|
|
|
|
# Wende Schutz initial an
|
|
self.protection_service.protect_browser(self.page, protection_style)
|
|
self.protection_applied = True
|
|
|
|
# Registriere Event-Handler für Seitenwechsel
|
|
self._setup_protection_listeners()
|
|
|
|
logger.info(f"Browser-Schutz angewendet mit Level: {protection_style.level.value}")
|
|
|
|
def _setup_protection_listeners(self) -> None:
|
|
"""Setzt Event-Listener auf, um Schutz bei Seitenwechsel neu anzuwenden."""
|
|
if self.page is None:
|
|
return
|
|
|
|
# Bei Navigation (Seitenwechsel) Schutz neu anwenden
|
|
def on_navigation():
|
|
if self.protection_applied and self.protection_style:
|
|
# Kurz warten bis neue Seite geladen ist
|
|
self.page.wait_for_load_state("domcontentloaded")
|
|
# Schutz neu anwenden
|
|
self.protection_service.protect_browser(self.page, self.protection_style)
|
|
logger.debug("Browser-Schutz nach Navigation neu angewendet")
|
|
|
|
# Registriere Handler für verschiedene Events
|
|
self.page.on("framenavigated", lambda frame: on_navigation() if frame == self.page.main_frame else None)
|
|
|
|
# Zusätzlich: Wende Schutz bei DOM-Änderungen neu an
|
|
self.context.add_init_script("""
|
|
// Überwache DOM-Änderungen und wende Schutz neu an wenn nötig
|
|
const observer = new MutationObserver(() => {
|
|
const shield = document.getElementById('accountforge-shield');
|
|
if (!shield && window.__accountforge_protection) {
|
|
// Schutz wurde entfernt, wende neu an
|
|
setTimeout(() => {
|
|
if (!document.getElementById('accountforge-shield')) {
|
|
eval(window.__accountforge_protection);
|
|
}
|
|
}, 100);
|
|
}
|
|
});
|
|
observer.observe(document.body, { childList: true, subtree: true });
|
|
""")
|
|
|
|
def remove_protection(self) -> None:
|
|
"""Entfernt den Browser-Schutz."""
|
|
if self.page is None or not self.protection_applied:
|
|
return
|
|
|
|
self.protection_service.remove_protection(self.page)
|
|
self.protection_applied = False
|
|
self.protection_style = None
|
|
logger.info("Browser-Schutz entfernt")
|
|
|
|
def close(self):
|
|
"""Schließt den Browser und gibt Ressourcen frei."""
|
|
try:
|
|
# Entferne Schutz vor dem Schließen
|
|
if self.protection_applied:
|
|
self.remove_protection()
|
|
|
|
# Seite erst schließen, dann Kontext, dann Browser, dann Playwright
|
|
if self.page:
|
|
try:
|
|
self.page.close()
|
|
except Exception as e:
|
|
logger.warning(f"Fehler beim Schließen der Page: {e}")
|
|
self.page = None
|
|
|
|
if self.context:
|
|
try:
|
|
self.context.close()
|
|
except Exception as e:
|
|
logger.warning(f"Fehler beim Schließen des Context: {e}")
|
|
self.context = None
|
|
|
|
if self.browser:
|
|
try:
|
|
self.browser.close()
|
|
except Exception as e:
|
|
logger.warning(f"Fehler beim Schließen des Browsers: {e}")
|
|
self.browser = None
|
|
|
|
# Playwright stop mit Retry-Logik
|
|
if self.playwright:
|
|
try:
|
|
self.playwright.stop()
|
|
except Exception as e:
|
|
logger.warning(f"Fehler beim Stoppen von Playwright: {e}")
|
|
# Versuche force stop
|
|
try:
|
|
import time
|
|
time.sleep(0.5) # Kurz warten
|
|
self.playwright.stop()
|
|
except Exception as e2:
|
|
logger.error(f"Force stop fehlgeschlagen: {e2}")
|
|
self.playwright = None
|
|
|
|
logger.info("Browser-Sitzung erfolgreich geschlossen")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Schließen des Browsers: {e}")
|
|
# Versuche Ressourcen trotzdem zu nullen
|
|
self.page = None
|
|
self.context = None
|
|
self.browser = None
|
|
self.playwright = None
|
|
|
|
def __enter__(self):
|
|
"""Kontext-Manager-Eintritt."""
|
|
self.start()
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
"""Kontext-Manager-Austritt."""
|
|
self.close()
|
|
|
|
|
|
# Beispielnutzung, wenn direkt ausgeführt
|
|
if __name__ == "__main__":
|
|
# Konfiguriere Logging
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
|
)
|
|
|
|
# Beispiel für einen Proxy (ohne Anmeldedaten)
|
|
proxy_config = {
|
|
"server": "http://example-proxy.com:8080"
|
|
}
|
|
|
|
# Browser starten und zu einer Seite navigieren
|
|
with PlaywrightManager(headless=False) as manager:
|
|
manager.navigate_to("https://www.instagram.com")
|
|
time.sleep(5) # Kurze Pause zum Anzeigen der Seite |