# social_networks/tiktok/tiktok_registration_clean.py """ TikTok Registration Module - Clean Architecture Implementation Handles the complete TikTok account registration workflow. """ import time from typing import Dict, Any, Optional, List from dataclasses import dataclass from enum import Enum from .tiktok_selectors import TikTokSelectors from .tiktok_workflow import TikTokWorkflow from utils.logger import setup_logger logger = setup_logger("tiktok_registration") class RegistrationStage(Enum): """Enumeration of registration workflow stages.""" NAVIGATION = "navigation" COOKIE_CONSENT = "cookie_consent" LOGIN_CLICK = "login_click" REGISTER_CLICK = "register_click" PHONE_EMAIL_SELECTION = "phone_email_selection" METHOD_SELECTION = "method_selection" BIRTHDAY_ENTRY = "birthday_entry" EMAIL_ENTRY = "email_entry" PASSWORD_ENTRY = "password_entry" CODE_SENDING = "code_sending" CODE_VERIFICATION = "code_verification" USERNAME_ENTRY = "username_entry" COMPLETION = "completion" @dataclass class RegistrationConfig: """Configuration for registration process.""" max_retries: int = 3 retry_delay: float = 2.0 timeout: int = 5000 human_delay_min: float = 0.5 human_delay_max: float = 2.0 class TikTokRegistration: """ Clean implementation of TikTok account registration. Follows Single Responsibility Principle and Clean Architecture. """ def __init__(self, automation): """ Initialize TikTok registration handler. Args: automation: Parent automation instance with browser and utilities """ self.automation = automation self.selectors = TikTokSelectors() self.workflow = TikTokWorkflow.get_registration_workflow() self.config = RegistrationConfig() self._current_stage = None logger.debug("TikTok registration handler initialized") def register_account( self, full_name: str, age: int, registration_method: str = "email", phone_number: Optional[str] = None, **kwargs ) -> Dict[str, Any]: """ Execute complete account registration workflow. Args: full_name: User's full name age: User's age (must be >= 13) registration_method: Either "email" or "phone" phone_number: Phone number (required if method is "phone") **kwargs: Additional optional parameters Returns: Registration result dictionary with status and account data """ try: # Validate inputs if not self._validate_inputs(full_name, age, registration_method, phone_number): return self._create_error_result("Invalid input parameters", RegistrationStage.NAVIGATION) # Generate account data account_data = self._generate_account_data(full_name, age, registration_method, phone_number, **kwargs) logger.info(f"Starting TikTok registration for {account_data['username']} via {registration_method}") # Execute registration workflow stages stages = [ (RegistrationStage.NAVIGATION, self._navigate_to_tiktok), (RegistrationStage.COOKIE_CONSENT, self._handle_cookies), (RegistrationStage.LOGIN_CLICK, self._click_login), (RegistrationStage.REGISTER_CLICK, self._click_register), (RegistrationStage.PHONE_EMAIL_SELECTION, self._select_phone_email_option), (RegistrationStage.METHOD_SELECTION, lambda: self._select_method(registration_method)), (RegistrationStage.BIRTHDAY_ENTRY, lambda: self._enter_birthday(account_data["birthday"])), (RegistrationStage.EMAIL_ENTRY, lambda: self._enter_email(account_data["email"])), (RegistrationStage.PASSWORD_ENTRY, lambda: self._enter_password(account_data["password"])), (RegistrationStage.CODE_SENDING, self._send_verification_code), (RegistrationStage.CODE_VERIFICATION, lambda: self._verify_code(account_data["email"])), (RegistrationStage.USERNAME_ENTRY, lambda: self._handle_username(account_data)), (RegistrationStage.COMPLETION, self._complete_registration) ] for stage, handler in stages: self._current_stage = stage self._emit_progress(f"Processing: {stage.value}") if not self._execute_with_retry(handler): return self._create_error_result( f"Failed at stage: {stage.value}", stage, account_data ) self._add_human_delay() logger.info(f"Successfully created TikTok account: {account_data['username']}") return self._create_success_result(account_data) except Exception as e: logger.error(f"Unexpected error during registration: {e}", exc_info=True) return self._create_error_result(str(e), self._current_stage) # ========== VALIDATION METHODS ========== def _validate_inputs( self, full_name: str, age: int, method: str, phone: Optional[str] ) -> bool: """Validate registration inputs.""" if not full_name or len(full_name.strip()) < 2: logger.error("Invalid name provided") return False if age < 13: logger.error("Age must be at least 13 for TikTok") return False if method not in ["email", "phone"]: logger.error(f"Invalid registration method: {method}") return False if method == "phone" and not phone: logger.error("Phone number required for phone registration") return False return True # ========== DATA GENERATION ========== def _generate_account_data( self, full_name: str, age: int, method: str, phone: Optional[str], **kwargs ) -> Dict[str, Any]: """Generate account data for registration.""" birthday = self.automation.birthday_generator.generate_birthday_components("tiktok", age) password = kwargs.get("password") or self.automation.password_generator.generate_password() username = kwargs.get("username") or self.automation.username_generator.generate_username(full_name) email = kwargs.get("email") or self._generate_email(username) return { "full_name": full_name, "username": username, "email": email, "password": password, "birthday": birthday, "age": age, "registration_method": method, "phone_number": phone } def _generate_email(self, username: str) -> str: """Generate email address for account.""" return f"{username}@{self.automation.email_domain}" # ========== NAVIGATION STAGES ========== def _navigate_to_tiktok(self) -> bool: """Navigate to TikTok homepage.""" try: self._emit_progress("Navigating to TikTok...") page = self.automation.browser.page page.goto("https://www.tiktok.com/", wait_until="domcontentloaded", timeout=30000) # Verify navigation success if not self._wait_for_element_any([ "button#header-login-button", "button.TUXButton:has-text('Anmelden')", "button:has-text('Log in')" ]): logger.error("Failed to load TikTok homepage") return False logger.info("Successfully navigated to TikTok") return True except Exception as e: logger.error(f"Navigation failed: {e}") return False def _handle_cookies(self) -> bool: """Handle cookie consent banner if present.""" try: cookie_selectors = [ "button:has-text('Alle Cookies akzeptieren')", "button:has-text('Accept all')", "button:has-text('Alle ablehnen')", "button:has-text('Reject all')" ] for selector in cookie_selectors: if self._click_if_visible(selector, timeout=2000): logger.info("Cookie banner handled") return True logger.debug("No cookie banner found") return True except Exception as e: logger.debug(f"Cookie handling: {e}") return True def _click_login(self) -> bool: """Click the login button.""" return self._click_element_with_fallback( primary_selectors=[ "button#header-login-button", "button.TUXButton:has-text('Anmelden')", "button:has-text('Log in')" ], fallback_text=["Anmelden", "Log in", "Sign in"], stage_name="login button" ) def _click_register(self) -> bool: """Click the register link.""" return self._click_element_with_fallback( primary_selectors=[ "a:text('Registrieren')", "button:text('Registrieren')", "span:text('Registrieren')", "a:text('Sign up')", "span:text('Sign up')" ], fallback_text=["Registrieren", "Sign up", "Register"], stage_name="register link" ) def _select_phone_email_option(self) -> bool: """Select phone/email registration option.""" return self._click_element_with_fallback( primary_selectors=[ "div:has-text('Telefonnummer oder E-Mail')", "div:has-text('Use phone or email')", "button:has-text('Phone or email')" ], fallback_text=["Telefonnummer oder E-Mail", "Use phone or email", "Phone or email"], stage_name="phone/email option", use_fuzzy=True ) def _select_method(self, method: str) -> bool: """Select specific registration method (email or phone).""" if method == "email": # Check if already on email page if self._is_element_visible(self.selectors.EMAIL_FIELD, timeout=1000): logger.info("Already on email registration page") return True return self._click_element_with_fallback( primary_selectors=[ "a:has-text('Mit E-Mail-Adresse registrieren')", "a:has-text('Sign up with email')", "button:has-text('E-Mail')" ], fallback_text=["E-Mail", "Email"], stage_name="email method" ) return False # Phone method not fully implemented # ========== FORM FILLING STAGES ========== def _enter_birthday(self, birthday: Dict[str, int]) -> bool: """Enter birthday using dropdowns.""" try: self._emit_progress("Entering birthday...") # Month selection if not self._select_dropdown_option( dropdown_text="Monat", option_value=self._get_month_name(birthday["month"]) ): return False # Day selection if not self._select_dropdown_option( dropdown_text="Tag", option_value=str(birthday["day"]) ): return False # Year selection if not self._select_dropdown_option( dropdown_text="Jahr", option_value=str(birthday["year"]) ): return False logger.info(f"Birthday entered: {birthday['month']}/{birthday['day']}/{birthday['year']}") return True except Exception as e: logger.error(f"Birthday entry failed: {e}") return False def _enter_email(self, email: str) -> bool: """Enter email address.""" return self._fill_input_field( field_selectors=[ "input[placeholder='E-Mail-Adresse']", "input[name='email']", "input[type='email']" ], value=email, field_name="email" ) def _enter_password(self, password: str) -> bool: """Enter password with proper selectors.""" return self._fill_input_field( field_selectors=[ "input.css-ujllvj-InputContainer[type='password']", "input[type='password'][autocomplete='new-password']", "input.etcs7ny1[type='password']", "input[type='password'][placeholder='Passwort']", "input[type='password']" ], value=password, field_name="password", validate=True ) def _send_verification_code(self) -> bool: """Click send code button.""" self._emit_progress("Requesting verification code...") return self._click_element_with_fallback( primary_selectors=[ "button[data-e2e='send-code-button']", "button:has-text('Code senden')", "button:has-text('Send code')" ], fallback_text=["Code senden", "Send code"], stage_name="send code button", wait_enabled=True ) def _verify_code(self, email: str) -> bool: """Handle email verification code.""" try: self._emit_progress("Waiting for verification code...") # Get verification code from email code = self._get_verification_code(email) if not code: logger.error("Failed to retrieve verification code") return False # Enter verification code return self._fill_input_field( field_selectors=[ "input[placeholder*='sechsstelligen Code']", "input[placeholder*='6-digit code']", "input[maxlength='6']" ], value=code, field_name="verification code" ) except Exception as e: logger.error(f"Code verification failed: {e}") return False def _get_verification_code(self, email: str) -> Optional[str]: """Retrieve verification code from email.""" max_attempts = 30 for attempt in range(max_attempts): try: code = self.automation.email_handler.get_verification_code( target_email=email, platform="tiktok", max_attempts=1, delay_seconds=2 ) if code and len(code) == 6 and code.isdigit(): logger.info(f"Verification code retrieved: {code}") return code except Exception as e: logger.debug(f"Attempt {attempt + 1} failed: {e}") time.sleep(2) return None def _handle_username(self, account_data: Dict[str, Any]) -> bool: """Handle username step (usually skipped).""" try: # Try to skip username selection skip_selectors = [ "button:has-text('Überspringen')", "button:has-text('Skip')", "a:has-text('Skip')" ] for selector in skip_selectors: if self._click_if_visible(selector, timeout=2000): logger.info("Username step skipped") return True # If can't skip, try to continue return True except Exception as e: logger.debug(f"Username handling: {e}") return True def _complete_registration(self) -> bool: """Complete registration and verify success.""" try: self._emit_progress("Finalizing account...") # Click any final continue buttons continue_selectors = [ "button:has-text('Weiter')", "button:has-text('Continue')", "button:has-text('Next')" ] for selector in continue_selectors: self._click_if_visible(selector, timeout=1000) # Check for success indicators success_indicators = [ "a[href='/foryou']", "button[data-e2e='profile-icon']", "[aria-label='Profile']" ] for indicator in success_indicators: if self._is_element_visible(indicator, timeout=5000): logger.info("Registration completed successfully") return True # Even if indicators not found, might still be successful logger.warning("Success indicators not found, assuming success") return True except Exception as e: logger.error(f"Completion check failed: {e}") return False # ========== HELPER METHODS ========== def _execute_with_retry(self, handler, max_retries: Optional[int] = None) -> bool: """Execute handler with retry logic.""" retries = max_retries or self.config.max_retries for attempt in range(retries): try: if handler(): return True if attempt < retries - 1: logger.debug(f"Retrying... ({attempt + 2}/{retries})") time.sleep(self.config.retry_delay * (attempt + 1)) except Exception as e: logger.error(f"Handler error: {e}") if attempt == retries - 1: raise return False def _click_element_with_fallback( self, primary_selectors: List[str], fallback_text: List[str], stage_name: str, use_fuzzy: bool = False, wait_enabled: bool = False ) -> bool: """Click element with multiple fallback strategies.""" # Try primary selectors for selector in primary_selectors: if self._click_if_visible(selector): logger.info(f"Clicked {stage_name} with selector: {selector}") return True # Try fuzzy matching if enabled if use_fuzzy and hasattr(self.automation, 'ui_helper'): try: if self.automation.ui_helper.click_button_fuzzy( fallback_text, primary_selectors[0] if primary_selectors else None ): logger.info(f"Clicked {stage_name} using fuzzy matching") return True except: pass logger.error(f"Failed to click {stage_name}") return False def _fill_input_field( self, field_selectors: List[str], value: str, field_name: str, validate: bool = False ) -> bool: """Fill input field with value.""" for selector in field_selectors: try: if self._is_element_visible(selector, timeout=2000): element = self.automation.browser.page.locator(selector).first element.click() element.fill("") element.type(value, delay=50) if validate: actual = element.input_value() if actual != value: logger.warning(f"Validation failed for {field_name}") continue logger.info(f"Filled {field_name} field") return True except Exception as e: logger.debug(f"Failed with selector {selector}: {e}") logger.error(f"Failed to fill {field_name} field") return False def _select_dropdown_option(self, dropdown_text: str, option_value: str) -> bool: """Select option from dropdown.""" try: # Click dropdown dropdown_selector = f"div:has-text('{dropdown_text}')" if not self._click_if_visible(dropdown_selector): return False time.sleep(0.5) # Click option option_selector = f"div.css-vz5m7n-DivOption:has-text('{option_value}')" if not self._click_if_visible(option_selector): # Try alternative selector option_selector = f"div[role='option']:has-text('{option_value}')" if not self._click_if_visible(option_selector): return False logger.info(f"Selected {option_value} from {dropdown_text} dropdown") return True except Exception as e: logger.error(f"Dropdown selection failed: {e}") return False def _click_if_visible(self, selector: str, timeout: int = None) -> bool: """Click element if visible.""" try: timeout = timeout or self.config.timeout if self.automation.browser.is_element_visible(selector, timeout=timeout): self.automation.browser.click_element(selector) return True except: pass return False def _is_element_visible(self, selector: str, timeout: int = None) -> bool: """Check if element is visible.""" try: timeout = timeout or self.config.timeout return self.automation.browser.is_element_visible(selector, timeout=timeout) except: return False def _wait_for_element_any(self, selectors: List[str], timeout: int = None) -> bool: """Wait for any of the selectors to be visible.""" timeout = timeout or self.config.timeout end_time = time.time() + (timeout / 1000) while time.time() < end_time: for selector in selectors: if self._is_element_visible(selector, timeout=500): return True time.sleep(0.5) return False def _add_human_delay(self): """Add human-like delay between actions.""" if hasattr(self.automation, 'human_behavior'): self.automation.human_behavior.random_delay( self.config.human_delay_min, self.config.human_delay_max ) else: time.sleep(self.config.human_delay_min) def _emit_progress(self, message: str): """Emit progress message to UI.""" if hasattr(self.automation, '_emit_customer_log'): self.automation._emit_customer_log(message) logger.info(message) def _get_month_name(self, month: int) -> str: """Convert month number to name.""" months = { 1: "Januar", 2: "Februar", 3: "März", 4: "April", 5: "Mai", 6: "Juni", 7: "Juli", 8: "August", 9: "September", 10: "Oktober", 11: "November", 12: "Dezember" } return months.get(month, str(month)) # ========== RESULT CREATION ========== def _create_success_result(self, account_data: Dict[str, Any]) -> Dict[str, Any]: """Create success result dictionary.""" return { "success": True, "stage": RegistrationStage.COMPLETION.value, "account_data": account_data, "message": f"Account {account_data['username']} successfully created" } def _create_error_result( self, error: str, stage: Optional[RegistrationStage] = None, account_data: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """Create error result dictionary.""" return { "success": False, "error": error, "stage": stage.value if stage else "unknown", "account_data": account_data or {} }