492 Zeilen
20 KiB
Python
492 Zeilen
20 KiB
Python
"""
|
|
TikTok-Utils - Hilfsfunktionen für die TikTok-Automatisierung.
|
|
"""
|
|
|
|
import re
|
|
import time
|
|
import random
|
|
from typing import Dict, List, Any, Optional, Tuple, Union
|
|
|
|
from .tiktok_selectors import TikTokSelectors
|
|
from utils.logger import setup_logger
|
|
|
|
# Konfiguriere Logger
|
|
logger = setup_logger("tiktok_utils")
|
|
|
|
class TikTokUtils:
|
|
"""
|
|
Hilfsfunktionen für die TikTok-Automatisierung.
|
|
Enthält allgemeine Hilfsmethoden und kleinere Funktionen.
|
|
"""
|
|
|
|
def __init__(self, automation):
|
|
"""
|
|
Initialisiert die TikTok-Utils.
|
|
|
|
Args:
|
|
automation: Referenz auf die Hauptautomatisierungsklasse
|
|
"""
|
|
self.automation = automation
|
|
# Browser wird direkt von automation verwendet
|
|
self.selectors = TikTokSelectors()
|
|
|
|
logger.debug("TikTok-Utils initialisiert")
|
|
|
|
def _ensure_browser(self) -> bool:
|
|
"""
|
|
Stellt sicher, dass die Browser-Referenz verfügbar ist.
|
|
|
|
Returns:
|
|
bool: True wenn Browser verfügbar, False sonst
|
|
"""
|
|
if self.automation.browser is None:
|
|
logger.error("Browser-Referenz nicht verfügbar")
|
|
return False
|
|
|
|
return True
|
|
|
|
def handle_cookie_banner(self) -> bool:
|
|
"""
|
|
Behandelt den Cookie-Banner, falls angezeigt.
|
|
|
|
Returns:
|
|
bool: True wenn Banner behandelt wurde oder nicht existiert, False bei Fehler
|
|
"""
|
|
if not self._ensure_browser():
|
|
return False
|
|
|
|
try:
|
|
# Cookie-Dialoge in TikTok prüfen
|
|
cookie_selectors = [
|
|
"button[data-e2e='cookie-banner-reject']",
|
|
"button:contains('Ablehnen')",
|
|
"button:contains('Nur erforderliche')",
|
|
"button:contains('Reject')",
|
|
"button[data-e2e='cookie-banner-accept']"
|
|
]
|
|
|
|
for selector in cookie_selectors:
|
|
if self.automation.browser.is_element_visible(selector, timeout=2000):
|
|
logger.info(f"Cookie-Banner erkannt: {selector}")
|
|
|
|
# Versuche, den Ablehnen-Button zu klicken
|
|
if "reject" in selector.lower() or "ablehnen" in selector.lower() or "erforderliche" in selector.lower():
|
|
if self.automation.browser.click_element(selector):
|
|
logger.info("Cookie-Banner erfolgreich abgelehnt")
|
|
time.sleep(random.uniform(0.5, 1.5))
|
|
return True
|
|
|
|
# Fallback: Akzeptieren-Button klicken, wenn Ablehnen nicht funktioniert
|
|
else:
|
|
if self.automation.browser.click_element(selector):
|
|
logger.info("Cookie-Banner erfolgreich akzeptiert")
|
|
time.sleep(random.uniform(0.5, 1.5))
|
|
return True
|
|
|
|
# Wenn kein Cookie-Banner gefunden wurde
|
|
logger.debug("Kein Cookie-Banner erkannt")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Behandeln des Cookie-Banners: {e}")
|
|
return False
|
|
|
|
def extract_username_from_url(self, url: str) -> Optional[str]:
|
|
"""
|
|
Extrahiert den Benutzernamen aus einer TikTok-URL.
|
|
|
|
Args:
|
|
url: Die TikTok-URL
|
|
|
|
Returns:
|
|
Optional[str]: Der extrahierte Benutzername oder None
|
|
"""
|
|
try:
|
|
# Muster für Profil-URLs
|
|
patterns = [
|
|
r'tiktok\.com/@([a-zA-Z0-9._]+)/?(?:$|\?|#)',
|
|
r'tiktok\.com/user/([a-zA-Z0-9._]+)/?',
|
|
r'tiktok\.com/video/[^/]+/by/([a-zA-Z0-9._]+)/?'
|
|
]
|
|
|
|
for pattern in patterns:
|
|
match = re.search(pattern, url)
|
|
if match:
|
|
username = match.group(1)
|
|
# Einige Ausnahmen filtern
|
|
if username not in ["explore", "accounts", "video", "foryou", "trending"]:
|
|
return username
|
|
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Extrahieren des Benutzernamens aus der URL: {e}")
|
|
return None
|
|
|
|
def get_current_username(self) -> Optional[str]:
|
|
"""
|
|
Versucht, den Benutzernamen des aktuell angemeldeten Kontos zu ermitteln.
|
|
|
|
Returns:
|
|
Optional[str]: Der Benutzername oder None, wenn nicht gefunden
|
|
"""
|
|
if not self._ensure_browser():
|
|
return None
|
|
|
|
try:
|
|
# Verschiedene Methoden zur Erkennung des Benutzernamens
|
|
|
|
# 1. Benutzername aus URL des Profils
|
|
profile_link_selectors = [
|
|
"a[href*='/@']",
|
|
"a[href*='/user/']"
|
|
]
|
|
|
|
for selector in profile_link_selectors:
|
|
element = self.automation.browser.wait_for_selector(selector, timeout=2000)
|
|
if element:
|
|
href = element.get_attribute("href")
|
|
if href:
|
|
username = self.extract_username_from_url(href)
|
|
if username:
|
|
logger.info(f"Benutzername aus Profil-Link ermittelt: {username}")
|
|
return username
|
|
|
|
# 2. Profilicon prüfen auf data-e2e-Attribut
|
|
profile_icon_selectors = [
|
|
"button[data-e2e='profile-icon']",
|
|
"svg[data-e2e='profile-icon']"
|
|
]
|
|
|
|
for selector in profile_icon_selectors:
|
|
element = self.automation.browser.wait_for_selector(selector, timeout=2000)
|
|
if element:
|
|
# Prüfen, ob ein Elternelement möglicherweise ein data-e2e-Attribut mit dem Benutzernamen hat
|
|
parent = element.evaluate("node => node.parentElement")
|
|
if parent:
|
|
data_e2e = parent.get_attribute("data-e2e")
|
|
if data_e2e and "profile" in data_e2e:
|
|
username_match = re.search(r'profile-([a-zA-Z0-9._]+)', data_e2e)
|
|
if username_match:
|
|
username = username_match.group(1)
|
|
logger.info(f"Benutzername aus data-e2e-Attribut ermittelt: {username}")
|
|
return username
|
|
|
|
# 3. TikTok-spezifisches Element mit Benutzername suchen
|
|
username_element = self.automation.browser.wait_for_selector("h1[data-e2e='user-title']", timeout=2000)
|
|
if username_element:
|
|
username = username_element.inner_text().strip()
|
|
if username:
|
|
logger.info(f"Benutzername aus user-title-Element ermittelt: {username}")
|
|
return username
|
|
|
|
logger.warning("Konnte Benutzernamen nicht ermitteln")
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler bei der Ermittlung des Benutzernamens: {e}")
|
|
return None
|
|
|
|
def wait_for_navigation(self, expected_url_pattern: str = None,
|
|
timeout: int = 30000, check_interval: int = 500) -> bool:
|
|
"""
|
|
Wartet, bis die Seite zu einer URL mit einem bestimmten Muster navigiert.
|
|
|
|
Args:
|
|
expected_url_pattern: Erwartetes Muster der URL (Regex)
|
|
timeout: Zeitlimit in Millisekunden
|
|
check_interval: Intervall zwischen den Prüfungen in Millisekunden
|
|
|
|
Returns:
|
|
bool: True wenn die Navigation erfolgreich war, False sonst
|
|
"""
|
|
if not self._ensure_browser():
|
|
return False
|
|
|
|
try:
|
|
start_time = time.time()
|
|
end_time = start_time + (timeout / 1000)
|
|
|
|
while time.time() < end_time:
|
|
current_url = self.automation.browser.page.url
|
|
|
|
if expected_url_pattern and re.search(expected_url_pattern, current_url):
|
|
logger.info(f"Navigation zu URL mit Muster '{expected_url_pattern}' erfolgreich")
|
|
return True
|
|
|
|
# Kurze Pause vor der nächsten Prüfung
|
|
time.sleep(check_interval / 1000)
|
|
|
|
logger.warning(f"Zeitüberschreitung bei Navigation zu URL mit Muster '{expected_url_pattern}'")
|
|
return False
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Warten auf Navigation: {e}")
|
|
return False
|
|
|
|
def handle_dialog_or_popup(self, expected_text: Union[str, List[str]] = None,
|
|
action: str = "close", timeout: int = 5000) -> bool:
|
|
"""
|
|
Behandelt einen Dialog oder Popup.
|
|
|
|
Args:
|
|
expected_text: Erwarteter Text im Dialog oder Liste von Texten
|
|
action: Aktion ("close", "confirm", "cancel")
|
|
timeout: Zeitlimit in Millisekunden
|
|
|
|
Returns:
|
|
bool: True wenn der Dialog erfolgreich behandelt wurde, False sonst
|
|
"""
|
|
if not self._ensure_browser():
|
|
return False
|
|
|
|
try:
|
|
# Dialog-Element suchen
|
|
dialog_selector = "div[role='dialog']"
|
|
dialog_element = self.automation.browser.wait_for_selector(dialog_selector, timeout=timeout)
|
|
|
|
if not dialog_element:
|
|
logger.debug("Kein Dialog gefunden")
|
|
return False
|
|
|
|
logger.info("Dialog gefunden")
|
|
|
|
# Text im Dialog prüfen, falls angegeben
|
|
if expected_text:
|
|
if isinstance(expected_text, str):
|
|
expected_text = [expected_text]
|
|
|
|
dialog_text = dialog_element.inner_text()
|
|
text_found = False
|
|
|
|
for text in expected_text:
|
|
if text in dialog_text:
|
|
logger.info(f"Erwarteter Text im Dialog gefunden: '{text}'")
|
|
text_found = True
|
|
break
|
|
|
|
if not text_found:
|
|
logger.warning(f"Erwarteter Text nicht im Dialog gefunden: {expected_text}")
|
|
return False
|
|
|
|
# Aktion ausführen
|
|
if action == "close":
|
|
# Schließen-Button suchen und klicken
|
|
close_button_selectors = [
|
|
"button[data-e2e='modal-close']",
|
|
"svg[data-e2e='modal-close']",
|
|
"button.css-1afoydx-StyledCloseButton",
|
|
"div[role='dialog'] button:first-child"
|
|
]
|
|
|
|
for selector in close_button_selectors:
|
|
if self.automation.browser.is_element_visible(selector, timeout=1000):
|
|
if self.automation.browser.click_element(selector):
|
|
logger.info("Dialog geschlossen")
|
|
return True
|
|
|
|
# Wenn kein Schließen-Button gefunden wurde, Escape-Taste drücken
|
|
self.automation.browser.page.keyboard.press("Escape")
|
|
logger.info("Dialog mit Escape-Taste geschlossen")
|
|
|
|
elif action == "confirm":
|
|
# Bestätigen-Button suchen und klicken
|
|
confirm_button_selectors = [
|
|
"button[type='submit']",
|
|
"button:contains('OK')",
|
|
"button:contains('Ja')",
|
|
"button:contains('Yes')",
|
|
"button:contains('Bestätigen')",
|
|
"button:contains('Confirm')"
|
|
]
|
|
|
|
for selector in confirm_button_selectors:
|
|
if self.automation.browser.is_element_visible(selector, timeout=1000):
|
|
if self.automation.browser.click_element(selector):
|
|
logger.info("Dialog bestätigt")
|
|
return True
|
|
|
|
elif action == "cancel":
|
|
# Abbrechen-Button suchen und klicken
|
|
cancel_button_selectors = [
|
|
"button:contains('Abbrechen')",
|
|
"button:contains('Cancel')",
|
|
"button:contains('Nein')",
|
|
"button:contains('No')"
|
|
]
|
|
|
|
for selector in cancel_button_selectors:
|
|
if self.automation.browser.is_element_visible(selector, timeout=1000):
|
|
if self.automation.browser.click_element(selector):
|
|
logger.info("Dialog abgebrochen")
|
|
return True
|
|
|
|
logger.warning(f"Konnte keine {action}-Aktion für den Dialog ausführen")
|
|
return False
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler bei der Dialog-Behandlung: {e}")
|
|
return False
|
|
|
|
def handle_rate_limiting(self, rotate_proxy: bool = True) -> bool:
|
|
"""
|
|
Behandelt eine Rate-Limiting-Situation.
|
|
|
|
Args:
|
|
rotate_proxy: Ob der Proxy rotiert werden soll
|
|
|
|
Returns:
|
|
bool: True wenn erfolgreich behandelt, False sonst
|
|
"""
|
|
if not self._ensure_browser():
|
|
return False
|
|
|
|
try:
|
|
logger.warning("Rate-Limiting erkannt, warte und versuche es erneut")
|
|
|
|
# Screenshot erstellen
|
|
self.automation._take_screenshot("rate_limit_detected")
|
|
|
|
# Proxy rotieren, falls gewünscht
|
|
if rotate_proxy and self.automation.use_proxy:
|
|
success = self.automation._rotate_proxy()
|
|
if not success:
|
|
logger.warning("Konnte Proxy nicht rotieren")
|
|
|
|
# Längere Wartezeit
|
|
wait_time = random.uniform(120, 300) # 2-5 Minuten
|
|
logger.info(f"Warte {wait_time:.1f} Sekunden vor dem nächsten Versuch")
|
|
time.sleep(wait_time)
|
|
|
|
# Seite neuladen
|
|
self.automation.browser.page.reload()
|
|
self.automation.human_behavior.wait_for_page_load()
|
|
|
|
# Prüfen, ob Rate-Limiting noch aktiv ist
|
|
rate_limit_texts = [
|
|
"bitte warte einige minuten",
|
|
"please wait a few minutes",
|
|
"try again later",
|
|
"versuche es später erneut",
|
|
"zu viele anfragen",
|
|
"too many requests"
|
|
]
|
|
|
|
page_content = self.automation.browser.page.content().lower()
|
|
|
|
still_rate_limited = False
|
|
for text in rate_limit_texts:
|
|
if text in page_content:
|
|
still_rate_limited = True
|
|
break
|
|
|
|
if still_rate_limited:
|
|
logger.warning("Immer noch Rate-Limited nach dem Warten")
|
|
return False
|
|
else:
|
|
logger.info("Rate-Limiting scheint aufgehoben zu sein")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler bei der Behandlung des Rate-Limitings: {e}")
|
|
return False
|
|
|
|
def is_logged_in(self) -> bool:
|
|
"""
|
|
Überprüft, ob der Benutzer bei TikTok angemeldet ist.
|
|
|
|
Returns:
|
|
bool: True wenn angemeldet, False sonst
|
|
"""
|
|
if not self._ensure_browser():
|
|
return False
|
|
|
|
try:
|
|
# Erfolgsindikatoren überprüfen
|
|
success_indicators = TikTokSelectors.SUCCESS_INDICATORS
|
|
|
|
for selector in success_indicators:
|
|
if self.automation.browser.is_element_visible(selector, timeout=2000):
|
|
logger.info(f"Benutzer ist angemeldet (Indikator: {selector})")
|
|
return True
|
|
|
|
# URL überprüfen
|
|
current_url = self.automation.browser.page.url
|
|
if "/foryou" in current_url or "tiktok.com/explore" in current_url:
|
|
logger.info("Benutzer ist angemeldet (URL-Check)")
|
|
return True
|
|
|
|
# Anmelden-Button prüfen - wenn sichtbar, dann nicht angemeldet
|
|
login_button_selectors = [
|
|
TikTokSelectors.LOGIN_BUTTON_LEFT,
|
|
TikTokSelectors.LOGIN_BUTTON_RIGHT
|
|
]
|
|
|
|
for selector in login_button_selectors:
|
|
if self.automation.browser.is_element_visible(selector, timeout=2000):
|
|
logger.info("Benutzer ist nicht angemeldet (Anmelde-Button sichtbar)")
|
|
return False
|
|
|
|
# Profilicon checken - wenn sichtbar, dann angemeldet
|
|
profile_selectors = [
|
|
"button[data-e2e='profile-icon']",
|
|
"svg[data-e2e='profile-icon']"
|
|
]
|
|
|
|
for selector in profile_selectors:
|
|
if self.automation.browser.is_element_visible(selector, timeout=2000):
|
|
logger.info("Benutzer ist angemeldet (Profilicon sichtbar)")
|
|
return True
|
|
|
|
logger.warning("Konnte Login-Status nicht eindeutig bestimmen")
|
|
return False
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler bei der Überprüfung des Login-Status: {e}")
|
|
return False
|
|
|
|
def extract_verification_code_from_email(self, email_body: str) -> Optional[str]:
|
|
"""
|
|
Extrahiert den Verifizierungscode aus einer E-Mail.
|
|
|
|
Args:
|
|
email_body: Der E-Mail-Text
|
|
|
|
Returns:
|
|
Optional[str]: Der Verifizierungscode oder None, wenn nicht gefunden
|
|
"""
|
|
try:
|
|
# Muster für TikTok-Verifizierungscodes
|
|
patterns = [
|
|
r'(\d{6}) ist dein Bestätigungscode',
|
|
r'(\d{6}) ist dein TikTok-Code',
|
|
r'(\d{6}) is your TikTok code',
|
|
r'(\d{6}) is your verification code',
|
|
r'Dein Bestätigungscode lautet (\d{6})',
|
|
r'Your verification code is (\d{6})',
|
|
r'Verification code: (\d{6})',
|
|
r'Bestätigungscode: (\d{6})',
|
|
r'TikTok code: (\d{6})',
|
|
r'TikTok-Code: (\d{6})'
|
|
]
|
|
|
|
for pattern in patterns:
|
|
match = re.search(pattern, email_body)
|
|
if match:
|
|
code = match.group(1)
|
|
logger.info(f"Verifizierungscode aus E-Mail extrahiert: {code}")
|
|
return code
|
|
|
|
# Allgemeine Suche nach 6-stelligen Zahlen, wenn keine spezifischen Muster passen
|
|
general_match = re.search(r'[^\d](\d{6})[^\d]', email_body)
|
|
if general_match:
|
|
code = general_match.group(1)
|
|
logger.info(f"6-stelliger Code aus E-Mail extrahiert: {code}")
|
|
return code
|
|
|
|
logger.warning("Kein Verifizierungscode in der E-Mail gefunden")
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Extrahieren des Verifizierungscodes aus der E-Mail: {e}")
|
|
return None |