Files
test-main/browser/playwright_manager.py
Claude Project Manager 08ed938105 Initial commit
2025-07-03 21:11:05 +02:00

517 Zeilen
19 KiB
Python

"""
Playwright Manager - Hauptklasse für die Browser-Steuerung mit Anti-Bot-Erkennung
"""
import os
import json
import logging
import random
import time
from pathlib import Path
from typing import Dict, Optional, List, Any, Tuple
from playwright.sync_api import sync_playwright, Browser, Page, BrowserContext, ElementHandle
# Konfiguriere Logger
logger = logging.getLogger("playwright_manager")
class PlaywrightManager:
"""
Verwaltet Browser-Sitzungen mit Playwright, einschließlich Stealth-Modus und Proxy-Einstellungen.
"""
def __init__(self,
headless: bool = False,
proxy: Optional[Dict[str, str]] = None,
browser_type: str = "chromium",
user_agent: Optional[str] = None,
screenshots_dir: str = "screenshots",
slowmo: int = 0):
"""
Initialisiert den PlaywrightManager.
Args:
headless: Ob der Browser im Headless-Modus ausgeführt werden soll
proxy: Proxy-Konfiguration (z.B. {'server': 'http://myproxy.com:3128', 'username': 'user', 'password': 'pass'})
browser_type: Welcher Browser-Typ verwendet werden soll ("chromium", "firefox", oder "webkit")
user_agent: Benutzerdefinierter User-Agent
screenshots_dir: Verzeichnis für Screenshots
slowmo: Verzögerung zwischen Aktionen in Millisekunden (nützlich für Debugging)
"""
self.headless = headless
self.proxy = proxy
self.browser_type = browser_type
self.user_agent = user_agent
self.screenshots_dir = screenshots_dir
self.slowmo = slowmo
# Stelle sicher, dass das Screenshots-Verzeichnis existiert
os.makedirs(self.screenshots_dir, exist_ok=True)
# Playwright-Instanzen
self.playwright = None
self.browser = None
self.context = None
self.page = None
# Zähler für Wiederhholungsversuche
self.retry_counter = {}
# Lade Stealth-Konfigurationen
self.stealth_config = self._load_stealth_config()
def _load_stealth_config(self) -> Dict[str, Any]:
"""Lädt die Stealth-Konfigurationen aus der Datei oder verwendet Standardwerte."""
try:
config_dir = Path(__file__).parent.parent / "config"
stealth_config_path = config_dir / "stealth_config.json"
if stealth_config_path.exists():
with open(stealth_config_path, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
logger.warning(f"Konnte Stealth-Konfiguration nicht laden: {e}")
# Verwende Standardwerte, wenn das Laden fehlschlägt
return {
"vendor": "Google Inc.",
"platform": "Win32",
"webdriver": False,
"accept_language": "de-DE,de;q=0.9,en-US;q=0.8,en;q=0.7",
"timezone_id": "Europe/Berlin",
"fingerprint_noise": True,
"device_scale_factor": 1.0,
}
def start(self) -> Page:
"""
Startet die Playwright-Sitzung und gibt die Browser-Seite zurück.
Returns:
Page: Die Browser-Seite
"""
if self.page is not None:
return self.page
try:
self.playwright = sync_playwright().start()
# Wähle den Browser-Typ
if self.browser_type == "firefox":
browser_instance = self.playwright.firefox
elif self.browser_type == "webkit":
browser_instance = self.playwright.webkit
else:
browser_instance = self.playwright.chromium
# Browser-Startoptionen
browser_args = []
if self.browser_type == "chromium":
# Chrome-spezifische Argumente für Anti-Bot-Erkennung
browser_args.extend([
'--disable-blink-features=AutomationControlled',
'--disable-features=IsolateOrigins,site-per-process',
'--disable-site-isolation-trials',
])
# Browser starten
self.browser = browser_instance.launch(
headless=self.headless,
args=browser_args,
slow_mo=self.slowmo
)
# Kontext-Optionen für Stealth-Modus
context_options = {
"viewport": {"width": 1920, "height": 1080},
"device_scale_factor": self.stealth_config.get("device_scale_factor", 1.0),
"locale": "de-DE",
"timezone_id": self.stealth_config.get("timezone_id", "Europe/Berlin"),
"accept_downloads": True,
}
# User-Agent setzen
if self.user_agent:
context_options["user_agent"] = self.user_agent
# Proxy-Einstellungen, falls vorhanden
if self.proxy:
context_options["proxy"] = self.proxy
# Browserkontext erstellen
self.context = self.browser.new_context(**context_options)
# JavaScript-Fingerprinting-Schutz
self._apply_stealth_scripts()
# Neue Seite erstellen
self.page = self.context.new_page()
# Event-Listener für Konsolen-Logs
self.page.on("console", lambda msg: logger.debug(f"BROWSER CONSOLE: {msg.text}"))
return self.page
except Exception as e:
logger.error(f"Fehler beim Starten des Browsers: {e}")
self.close()
raise
def _apply_stealth_scripts(self):
"""Wendet JavaScript-Skripte an, um Browser-Fingerprinting zu umgehen."""
# Diese Skripte überschreiben Eigenschaften, die für Bot-Erkennung verwendet werden
scripts = [
# WebDriver-Eigenschaft überschreiben
"""
() => {
Object.defineProperty(navigator, 'webdriver', {
get: () => false,
});
}
""",
# Navigator-Eigenschaften überschreiben
f"""
() => {{
const newProto = navigator.__proto__;
delete newProto.webdriver;
navigator.__proto__ = newProto;
Object.defineProperty(navigator, 'platform', {{
get: () => '{self.stealth_config.get("platform", "Win32")}'
}});
Object.defineProperty(navigator, 'languages', {{
get: () => ['de-DE', 'de', 'en-US', 'en']
}});
Object.defineProperty(navigator, 'vendor', {{
get: () => '{self.stealth_config.get("vendor", "Google Inc.")}'
}});
}}
""",
# Chrome-Objekte hinzufügen, die in normalen Browsern vorhanden sind
"""
() => {
// Fügt chrome.runtime hinzu, falls nicht vorhanden
if (!window.chrome) {
window.chrome = {};
}
if (!window.chrome.runtime) {
window.chrome.runtime = {};
window.chrome.runtime.sendMessage = function() {};
}
}
""",
# Plugin-Fingerprinting
"""
() => {
const originalQuery = window.navigator.permissions.query;
window.navigator.permissions.query = (parameters) => (
parameters.name === 'notifications' ?
Promise.resolve({ state: Notification.permission }) :
originalQuery(parameters)
);
}
"""
]
# Wenn Fingerprint-Noise aktiviert ist, füge zufällige Variationen hinzu
if self.stealth_config.get("fingerprint_noise", True):
scripts.append("""
() => {
// Canvas-Fingerprinting leicht verändern
const originalToDataURL = HTMLCanvasElement.prototype.toDataURL;
HTMLCanvasElement.prototype.toDataURL = function(type) {
const result = originalToDataURL.apply(this, arguments);
if (this.width > 16 && this.height > 16) {
// Kleines Rauschen in Pixels einfügen
const context = this.getContext('2d');
const imageData = context.getImageData(0, 0, 2, 2);
const pixelArray = imageData.data;
// Ändere einen zufälligen Pixel leicht
const randomPixel = Math.floor(Math.random() * pixelArray.length / 4) * 4;
pixelArray[randomPixel] = (pixelArray[randomPixel] + Math.floor(Math.random() * 10)) % 256;
context.putImageData(imageData, 0, 0);
}
return result;
};
}
""")
# Skripte auf den Browser-Kontext anwenden
for script in scripts:
self.context.add_init_script(script)
def navigate_to(self, url: str, wait_until: str = "networkidle", timeout: int = 30000) -> bool:
"""
Navigiert zu einer bestimmten URL und wartet, bis die Seite geladen ist.
Args:
url: Die Ziel-URL
wait_until: Wann die Navigation als abgeschlossen gilt ("load", "domcontentloaded", "networkidle")
timeout: Timeout in Millisekunden
Returns:
bool: True bei erfolgreicher Navigation, False sonst
"""
if self.page is None:
self.start()
try:
logger.info(f"Navigiere zu: {url}")
self.page.goto(url, wait_until=wait_until, timeout=timeout)
return True
except Exception as e:
logger.error(f"Fehler bei der Navigation zu {url}: {e}")
self.take_screenshot(f"navigation_error_{int(time.time())}")
return False
def wait_for_selector(self, selector: str, timeout: int = 30000) -> Optional[ElementHandle]:
"""
Wartet auf ein Element mit dem angegebenen Selektor.
Args:
selector: CSS- oder XPath-Selektor
timeout: Timeout in Millisekunden
Returns:
Optional[ElementHandle]: Das Element oder None, wenn nicht gefunden
"""
if self.page is None:
raise ValueError("Browser nicht gestartet. Rufe zuerst start() auf.")
try:
element = self.page.wait_for_selector(selector, timeout=timeout)
return element
except Exception as e:
logger.warning(f"Element nicht gefunden: {selector} - {e}")
return None
def fill_form_field(self, selector: str, value: str, timeout: int = 5000) -> bool:
"""
Füllt ein Formularfeld aus.
Args:
selector: Selektor für das Feld
value: Einzugebender Wert
timeout: Timeout in Millisekunden
Returns:
bool: True bei Erfolg, False bei Fehler
"""
try:
# Auf Element warten
element = self.wait_for_selector(selector, timeout)
if not element:
return False
# Element fokussieren
element.focus()
time.sleep(random.uniform(0.1, 0.3))
# Vorhandenen Text löschen (optional)
current_value = element.evaluate("el => el.value")
if current_value:
element.fill("")
time.sleep(random.uniform(0.1, 0.2))
# Text menschenähnlich eingeben
for char in value:
element.type(char, delay=random.uniform(20, 100))
time.sleep(random.uniform(0.01, 0.05))
logger.info(f"Feld {selector} gefüllt mit: {value}")
return True
except Exception as e:
logger.error(f"Fehler beim Ausfüllen von {selector}: {e}")
key = f"fill_{selector}"
return self._retry_action(key, lambda: self.fill_form_field(selector, value, timeout))
def click_element(self, selector: str, force: bool = False, timeout: int = 5000) -> bool:
"""
Klickt auf ein Element.
Args:
selector: Selektor für das Element
force: Force-Click verwenden
timeout: Timeout in Millisekunden
Returns:
bool: True bei Erfolg, False bei Fehler
"""
try:
# Auf Element warten
element = self.wait_for_selector(selector, timeout)
if not element:
return False
# Scroll zum Element
self.page.evaluate("element => element.scrollIntoView({ behavior: 'smooth', block: 'center' })", element)
time.sleep(random.uniform(0.3, 0.7))
# Menschenähnliches Verhalten - leichte Verzögerung vor dem Klick
time.sleep(random.uniform(0.2, 0.5))
# Element klicken
element.click(force=force, delay=random.uniform(20, 100))
logger.info(f"Element geklickt: {selector}")
return True
except Exception as e:
logger.error(f"Fehler beim Klicken auf {selector}: {e}")
key = f"click_{selector}"
return self._retry_action(key, lambda: self.click_element(selector, force, timeout))
def select_option(self, selector: str, value: str, timeout: int = 5000) -> bool:
"""
Wählt eine Option aus einem Dropdown-Menü.
Args:
selector: Selektor für das Dropdown
value: Wert oder sichtbarer Text der Option
timeout: Timeout in Millisekunden
Returns:
bool: True bei Erfolg, False bei Fehler
"""
try:
# Auf Element warten
element = self.wait_for_selector(selector, timeout)
if not element:
return False
# Option auswählen
self.page.select_option(selector, value=value)
logger.info(f"Option '{value}' ausgewählt in {selector}")
return True
except Exception as e:
logger.error(f"Fehler bei der Auswahl von '{value}' in {selector}: {e}")
key = f"select_{selector}"
return self._retry_action(key, lambda: self.select_option(selector, value, timeout))
def is_element_visible(self, selector: str, timeout: int = 5000) -> bool:
"""
Prüft, ob ein Element sichtbar ist.
Args:
selector: Selektor für das Element
timeout: Timeout in Millisekunden
Returns:
bool: True wenn sichtbar, False sonst
"""
try:
element = self.page.wait_for_selector(selector, timeout=timeout, state="visible")
return element is not None
except:
return False
def take_screenshot(self, name: str = None) -> str:
"""
Erstellt einen Screenshot der aktuellen Seite.
Args:
name: Name für den Screenshot (ohne Dateierweiterung)
Returns:
str: Pfad zum erstellten Screenshot
"""
if self.page is None:
raise ValueError("Browser nicht gestartet. Rufe zuerst start() auf.")
timestamp = int(time.time())
filename = f"{name}_{timestamp}.png" if name else f"screenshot_{timestamp}.png"
path = os.path.join(self.screenshots_dir, filename)
self.page.screenshot(path=path, full_page=True)
logger.info(f"Screenshot erstellt: {path}")
return path
def _retry_action(self, key: str, action_func, max_retries: int = 3) -> bool:
"""
Wiederholt eine Aktion bei Fehler.
Args:
key: Eindeutiger Schlüssel für die Aktion
action_func: Funktion, die ausgeführt werden soll
max_retries: Maximale Anzahl der Wiederholungen
Returns:
bool: Ergebnis der Aktion
"""
if key not in self.retry_counter:
self.retry_counter[key] = 0
self.retry_counter[key] += 1
if self.retry_counter[key] <= max_retries:
logger.info(f"Wiederhole Aktion {key} (Versuch {self.retry_counter[key]} von {max_retries})")
time.sleep(random.uniform(0.5, 1.0))
return action_func()
else:
logger.warning(f"Maximale Anzahl von Wiederholungen für {key} erreicht")
self.retry_counter[key] = 0
return False
def close(self):
"""Schließt den Browser und gibt Ressourcen frei."""
try:
if self.page:
self.page.close()
self.page = None
if self.context:
self.context.close()
self.context = None
if self.browser:
self.browser.close()
self.browser = None
if self.playwright:
self.playwright.stop()
self.playwright = None
logger.info("Browser-Sitzung geschlossen")
except Exception as e:
logger.error(f"Fehler beim Schließen des Browsers: {e}")
def __enter__(self):
"""Kontext-Manager-Eintritt."""
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Kontext-Manager-Austritt."""
self.close()
# Beispielnutzung, wenn direkt ausgeführt
if __name__ == "__main__":
# Konfiguriere Logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Beispiel für einen Proxy (ohne Anmeldedaten)
proxy_config = {
"server": "http://example-proxy.com:8080"
}
# Browser starten und zu einer Seite navigieren
with PlaywrightManager(headless=False) as manager:
manager.navigate_to("https://www.instagram.com")
time.sleep(5) # Kurze Pause zum Anzeigen der Seite