""" E-Mail-Handler für den Social Media Account Generator. Verwaltet den Abruf von Bestätigungscodes und E-Mail-Verifizierungen. """ import os import json import logging import time import imaplib import email import re from typing import Dict, List, Any, Optional, Tuple, Union from email.header import decode_header from datetime import datetime, timedelta from utils.text_similarity import TextSimilarity logger = logging.getLogger("email_handler") class EmailHandler: """ Handler für den Zugriff auf E-Mail-Dienste und den Abruf von Bestätigungscodes. """ CONFIG_FILE = os.path.join("config", "email_config.json") def __init__(self): """Initialisiert den EmailHandler und lädt die Konfiguration.""" self.config = self.load_config() # Stelle sicher, dass das Konfigurationsverzeichnis existiert os.makedirs(os.path.dirname(self.CONFIG_FILE), exist_ok=True) # TextSimilarity-Instanz für Fuzzy-Matching self.text_similarity = TextSimilarity(default_threshold=0.75) # Cache für die letzten erfolgreichen Verbindungsdaten self.last_connection = None # Typische Betreffzeilen für Verifizierungs-E-Mails nach Plattform self.verification_subjects = { "instagram": [ "Bestätige deine E-Mail-Adresse", "Bestätigungscode für Instagram", "Dein Instagram-Code", "Bestätige deinen Instagram-Account", "Verify your email address", "Instagram Verification Code", "Your Instagram Code", "Verify your Instagram account", "Instagram-Bestätigungscode", "Instagram security code" ], "facebook": [ "Bestätigungscode für Facebook", "Facebook-Bestätigungscode", "Dein Facebook-Code", "Facebook Verification Code", "Your Facebook Code" ], "twitter": [ "Bestätige dein Twitter-Konto", "Twitter-Bestätigungscode", "Verify your Twitter account", "Twitter Verification Code" ], "x": [ "ist dein X Verifizierungscode", "is your X verification code", "X Verifizierungscode", "X verification code", "Bestätige dein X-Konto", "Verify your X account", "X Bestätigungscode", "X confirmation code" ], "tiktok": [ "ist dein Bestätigungscode", "is your confirmation code", "TikTok-Bestätigungscode", "Bestätige dein TikTok-Konto", "TikTok Verification Code", "Verify your TikTok account" ], "default": [ "Bestätigungscode", "Verification Code", "Account Verification", "Konto-Bestätigung", "Security Code", "Sicherheitscode" ] } logger.info("E-Mail-Handler initialisiert") def load_config(self) -> Dict[str, Any]: """ Lädt die E-Mail-Konfiguration aus der Konfigurationsdatei. Returns: Dict[str, Any]: Die geladene Konfiguration oder Standardwerte """ default_config = { "imap_server": "imap.ionos.de", "imap_port": 993, "imap_user": "info@z5m7q9dk3ah2v1plx6ju.com", "imap_pass": "cz&ie.O9$!:!tYY@" } try: if os.path.exists(self.CONFIG_FILE): with open(self.CONFIG_FILE, "r", encoding="utf-8") as f: config = json.load(f) logger.info("E-Mail-Konfiguration geladen") return config else: # Standardwerte speichern with open(self.CONFIG_FILE, "w", encoding="utf-8") as f: json.dump(default_config, f, indent=2) logger.info("Standard-E-Mail-Konfiguration erstellt") return default_config except Exception as e: logger.error(f"Fehler beim Laden der E-Mail-Konfiguration: {e}") return default_config def save_config(self) -> bool: """ Speichert die aktuelle Konfiguration in die Konfigurationsdatei. Returns: bool: True bei Erfolg, False bei Fehler """ try: with open(self.CONFIG_FILE, "w", encoding="utf-8") as f: json.dump(self.config, f, indent=2) logger.info("E-Mail-Konfiguration gespeichert") return True except Exception as e: logger.error(f"Fehler beim Speichern der E-Mail-Konfiguration: {e}") return False def get_config(self) -> Dict[str, Any]: """ Gibt die aktuelle Konfiguration zurück. Returns: Dict[str, Any]: Die aktuelle Konfiguration """ return self.config def update_config(self, new_config: Dict[str, Any]) -> bool: """ Aktualisiert die Konfiguration mit den neuen Werten. Args: new_config: Neue Konfiguration Returns: bool: True bei Erfolg, False bei Fehler """ try: # Aktuelle Konfiguration speichern old_config = self.config.copy() # Neue Werte übernehmen self.config.update(new_config) # Konfiguration speichern success = self.save_config() if not success: # Bei Speicherfehler zur alten Konfiguration zurückkehren self.config = old_config return False return True except Exception as e: logger.error(f"Fehler beim Aktualisieren der E-Mail-Konfiguration: {e}") return False def update_credentials(self, username: str, password: str) -> bool: """ Aktualisiert nur die Anmeldeinformationen. Args: username: Benutzername password: Passwort Returns: bool: True bei Erfolg, False bei Fehler """ return self.update_config({ "imap_user": username, "imap_pass": password }) def update_server(self, server: str, port: int) -> bool: """ Aktualisiert nur die Serverinformationen. Args: server: IMAP-Server port: IMAP-Port Returns: bool: True bei Erfolg, False bei Fehler """ return self.update_config({ "imap_server": server, "imap_port": port }) def test_connection(self) -> Dict[str, Any]: """ Testet die Verbindung zum IMAP-Server. Returns: Dict[str, Any]: Ergebnis des Tests """ try: logger.info(f"Teste Verbindung zu {self.config['imap_server']}:{self.config['imap_port']}") # SSL-Verbindung zum IMAP-Server herstellen mail = imaplib.IMAP4_SSL(self.config["imap_server"], self.config["imap_port"]) # Anmelden mail.login(self.config["imap_user"], self.config["imap_pass"]) # Verfügbare Postfächer auflisten status, mailboxes = mail.list() if status == 'OK': mailbox_count = len(mailboxes) # INBOX auswählen mail.select("INBOX") # Abmelden mail.logout() # Verbindungsdaten im Cache speichern self.last_connection = { "server": self.config["imap_server"], "port": self.config["imap_port"], "username": self.config["imap_user"], "password": self.config["imap_pass"] } logger.info(f"Verbindungstest erfolgreich: {mailbox_count} Postfächer gefunden") return { "success": True, "server": self.config["imap_server"], "port": self.config["imap_port"], "mailbox_count": mailbox_count } else: logger.error(f"Fehler beim Abrufen der Postfächer: {status}") mail.logout() return { "success": False, "error": f"Fehler beim Abrufen der Postfächer: {status}" } except imaplib.IMAP4.error as e: logger.error(f"IMAP-Fehler: {e}") return { "success": False, "error": f"IMAP-Fehler: {e}" } except Exception as e: logger.error(f"Allgemeiner Fehler: {e}") return { "success": False, "error": f"Allgemeiner Fehler: {e}" } def search_emails(self, search_criteria: str = "ALL", max_emails: int = 5) -> List[Dict[str, Any]]: """ Sucht nach E-Mails mit den angegebenen Kriterien. Args: search_criteria: IMAP-Suchkriterien max_emails: Maximale Anzahl der abzurufenden E-Mails Returns: List[Dict[str, Any]]: Liste der gefundenen E-Mails """ try: # Verbindung zum IMAP-Server herstellen mail = imaplib.IMAP4_SSL(self.config["imap_server"], self.config["imap_port"]) # Anmelden mail.login(self.config["imap_user"], self.config["imap_pass"]) # INBOX auswählen mail.select("INBOX") # Nach E-Mails suchen status, data = mail.search(None, search_criteria) emails = [] if status == 'OK': # E-Mail-IDs abrufen email_ids = data[0].split() # Newest emails first email_ids = list(reversed(email_ids)) # Begrenze die Anzahl der abzurufenden E-Mails if max_emails > 0: email_ids = email_ids[:max_emails] for email_id in email_ids: # E-Mail abrufen status, data = mail.fetch(email_id, '(RFC822)') if status == 'OK': # E-Mail-Inhalt parsen raw_email = data[0][1] msg = email.message_from_bytes(raw_email) # Betreff decodieren (vollständig, alle Teile zusammenfügen) subject_parts = decode_header(msg.get("Subject", "")) subject = "" for part, encoding in subject_parts: if isinstance(part, bytes): subject += part.decode(encoding or 'utf-8', errors='replace') else: subject += str(part) if part else "" # Absender decodieren (vollständig, alle Teile zusammenfügen) from_parts = decode_header(msg.get("From", "")) from_addr = "" for part, encoding in from_parts: if isinstance(part, bytes): from_addr += part.decode(encoding or 'utf-8', errors='replace') else: from_addr += str(part) if part else "" # Empfänger decodieren (vollständig, alle Teile zusammenfügen) to_parts = decode_header(msg.get("To", "")) to_addr = "" for part, encoding in to_parts: if isinstance(part, bytes): to_addr += part.decode(encoding or 'utf-8', errors='replace') else: to_addr += str(part) if part else "" # Extrahiere E-Mail-Adresse aus dem To-Feld to_email = self._extract_email_from_addr(to_addr) # Datum decodieren date = msg.get("Date", "") # E-Mail-Text extrahieren body = "" if msg.is_multipart(): for part in msg.walk(): content_type = part.get_content_type() content_disposition = str(part.get("Content-Disposition")) if "attachment" not in content_disposition: if content_type == "text/plain": try: # Textinhalt decodieren charset = part.get_content_charset() or 'utf-8' body = part.get_payload(decode=True).decode(charset, errors='replace') break except: body = "[Fehler beim Decodieren des Inhalts]" elif content_type == "text/html" and not body: try: # HTML-Inhalt decodieren charset = part.get_content_charset() or 'utf-8' body = part.get_payload(decode=True).decode(charset, errors='replace') except: body = "[Fehler beim Decodieren des HTML-Inhalts]" else: try: # Einzel-Teil-E-Mail decodieren charset = msg.get_content_charset() or 'utf-8' body = msg.get_payload(decode=True).decode(charset, errors='replace') except: body = "[Fehler beim Decodieren des Inhalts]" # E-Mail-Informationen speichern email_info = { "id": email_id.decode(), "subject": subject, "from": from_addr, "to": to_addr, "to_email": to_email, "date": date, "body": body } emails.append(email_info) # Abmelden mail.logout() logger.info(f"{len(emails)} E-Mails gefunden") return emails except Exception as e: logger.error(f"Fehler beim Suchen nach E-Mails: {e}") return [] def _extract_email_from_addr(self, addr_str: str) -> str: """ Extrahiert die E-Mail-Adresse aus einem Adressstring im Format 'Name '. Args: addr_str: Adressstring Returns: str: Die extrahierte E-Mail-Adresse oder der ursprüngliche String """ # Regulärer Ausdruck für die Extraktion der E-Mail-Adresse email_pattern = r'?' match = re.search(email_pattern, addr_str) if match: return match.group(1).lower() return addr_str.lower() def _is_subject_relevant(self, subject: str, platform: str) -> bool: """ Prüft, ob der Betreff relevant für eine Verifizierungs-E-Mail der angegebenen Plattform ist. Verwendet Fuzzy-Matching für die Erkennung. Args: subject: Betreff der E-Mail platform: Plattform (instagram, facebook, twitter, etc.) Returns: bool: True, wenn der Betreff relevant ist, False sonst """ # Standardschwellenwert für Fuzzy-Matching threshold = 0.75 # Betreffzeilen für die angegebene Plattform und Standard subject_patterns = self.verification_subjects.get(platform.lower(), []) subject_patterns += self.verification_subjects["default"] # Prüfe auf exakte Übereinstimmung (schneller) for pattern in subject_patterns: if pattern.lower() in subject.lower(): logger.debug(f"Relevanter Betreff gefunden (exakte Übereinstimmung): {subject}") return True # Wenn keine exakte Übereinstimmung, Fuzzy-Matching verwenden for pattern in subject_patterns: similarity = self.text_similarity.similarity_ratio(pattern.lower(), subject.lower()) if similarity >= threshold: logger.debug(f"Relevanter Betreff gefunden (Fuzzy-Matching, {similarity:.2f}): {subject}") return True # Alternativ: Prüfe, ob der Betreff den Pattern enthält (mit Fuzzy-Matching) if self.text_similarity.contains_similar_text(subject.lower(), [pattern.lower()], threshold=threshold): logger.debug(f"Relevanter Betreff gefunden (Fuzzy-Contains): {subject}") return True return False def get_verification_code(self, target_email: Optional[str] = None, platform: str = "instagram", max_attempts: int = 30, delay_seconds: int = 2) -> Optional[str]: """ Ruft einen Bestätigungscode von einer E-Mail ab. Args: target_email: Ziel-E-Mail-Adresse oder None für alle platform: Plattform (instagram, facebook, twitter, etc.) max_attempts: Maximale Anzahl an Versuchen delay_seconds: Verzögerung zwischen Versuchen in Sekunden Returns: Optional[str]: Der Bestätigungscode oder None, wenn nicht gefunden """ logger.info(f"Suche nach Bestätigungscode für {platform} mit E-Mail {target_email or 'alle'}") # Bei Catch-All Domains ist die exakte E-Mail-Adresse wichtig! if target_email: logger.info(f"EXAKTE E-Mail-Suche: {target_email} (Catch-All Domain)") # Letzter Tag als Suchkriterium today = datetime.now() yesterday = today - timedelta(days=1) date_str = yesterday.strftime("%d-%b-%Y") search_criteria = f'(SINCE "{date_str}")' # E-Mail-Abruf mit Wiederholungsversuch total_wait_time = max_attempts * delay_seconds logger.info(f"Warte bis zu {total_wait_time} Sekunden ({total_wait_time/60:.1f} Minuten) auf E-Mail") for attempt in range(max_attempts): elapsed_time = attempt * delay_seconds remaining_time = total_wait_time - elapsed_time logger.debug(f"Versuch {attempt + 1}/{max_attempts} - Verstrichene Zeit: {elapsed_time}s, Verbleibend: {remaining_time}s") # Alle neuen E-Mails abrufen emails = self.search_emails(search_criteria, max_emails=10) logger.debug(f"Gefundene E-Mails: {len(emails)}") # E-Mails filtern und nach Bestätigungscode suchen for idx, email_info in enumerate(emails): logger.debug(f"E-Mail {idx+1}: To={email_info.get('to_email', 'N/A')}, Subject={email_info.get('subject', 'N/A')[:50]}...") # Extrahierte E-Mail-Adresse des Empfängers to_email = email_info.get("to_email", "").lower() # WICHTIG: Bei Catch-All Domains MUSS die exakte E-Mail-Adresse übereinstimmen! if target_email: # NUR exakte E-Mail-Übereinstimmung zulassen if target_email.lower() == to_email: logger.debug(f"✓ E-Mail-Match: {to_email} == {target_email}") else: logger.debug(f"✗ E-Mail übersprungen: {to_email} != {target_email} (exakte Übereinstimmung erforderlich)") continue # Betreff auf Relevanz prüfen (mit Fuzzy-Matching) subject = email_info.get("subject", "") if not subject or not self._is_subject_relevant(subject, platform): logger.debug(f"E-Mail übersprungen: Betreff '{subject}' ist nicht relevant") continue # Nach Bestätigungscode im Text suchen body = email_info.get("body", "") code = self._extract_verification_code(body, platform) if code: logger.info(f"Bestätigungscode gefunden: {code} (E-Mail an {to_email})") return code else: logger.debug(f"Kein Code in relevanter E-Mail gefunden (Betreff: {subject})") # Wenn kein Code gefunden wurde und noch Versuche übrig sind, warten und erneut versuchen if attempt < max_attempts - 1: logger.debug(f"Kein Code gefunden, warte {delay_seconds} Sekunden...") time.sleep(delay_seconds) logger.warning("Kein Bestätigungscode gefunden nach allen Versuchen") return None def _extract_verification_code(self, text: str, platform: str = "instagram") -> Optional[str]: """ Extrahiert einen Bestätigungscode aus einem Text. Args: text: Zu durchsuchender Text platform: Plattform (instagram, facebook, twitter, etc.) Returns: Optional[str]: Der gefundene Bestätigungscode oder None """ # Plattformspezifische Muster für Bestätigungscodes patterns = { "instagram": [ r"Dein Code ist (\d{6})", r"Your code is (\d{6})", r"Bestätigungscode: (\d{6})", r"Confirmation code: (\d{6})", r"(\d{6}) ist dein Instagram-Code", r"(\d{6}) is your Instagram code", r"Instagram-Code: (\d{6})", r"Instagram code: (\d{6})", r"Instagram: (\d{6})", r"[^\d](\d{6})[^\d]" # 6-stellige Zahl umgeben von Nicht-Ziffern ], "facebook": [ r"Dein Code ist (\d{5})", r"Your code is (\d{5})", r"Bestätigungscode: (\d{5})", r"Confirmation code: (\d{5})", r"Facebook-Code: (\d{5})", r"Facebook code: (\d{5})", r"Facebook: (\d{5})", r"[^\d](\d{5})[^\d]" # 5-stellige Zahl umgeben von Nicht-Ziffern ], "twitter": [ r"Code: (\d{6})", r"Verification code: (\d{6})", r"Twitter-Code: (\d{6})", r"Twitter code: (\d{6})", r"Twitter: (\d{6})", r"[^\d](\d{6})[^\d]" # 6-stellige Zahl umgeben von Nicht-Ziffern ], "x": [ r"(\d{6}) ist dein X Verifizierungscode", r"(\d{6}) is your X verification code", r"Code: (\d{6})", r"Verification code: (\d{6})", r"X-Code: (\d{6})", r"X code: (\d{6})", r"X: (\d{6})", r"Verifizierungscode: (\d{6})", r"[^\d](\d{6})[^\d]" # 6-stellige Zahl umgeben von Nicht-Ziffern ], "tiktok": [ r"(\d{6}) ist dein Bestätigungscode", r"(\d{6}) is your confirmation code", r"TikTok-Code: (\d{6})", r"TikTok code: (\d{6})", r"TikTok: (\d{6})", r"Bestätigungscode[:\s]*(\d{6})", r"Confirmation code[:\s]*(\d{6})", r"[^\d](\d{6})[^\d]" # 6-stellige Zahl umgeben von Nicht-Ziffern ], "default": [ r"Code[:\s]*(\d{4,8})", r"[Vv]erification [Cc]ode[:\s]*(\d{4,8})", r"[Bb]estätigungscode[:\s]*(\d{4,8})", r"(\d{4,8}) is your code", r"(\d{4,8}) ist dein Code", r"[^\d](\d{6})[^\d]", # 6-stellige Zahl umgeben von Nicht-Ziffern r"[^\d](\d{5})[^\d]" # 5-stellige Zahl umgeben von Nicht-Ziffern ] } # Plattformspezifische Muster verwenden platform_patterns = patterns.get(platform.lower(), []) # Alle Muster dieser Plattform durchsuchen for pattern in platform_patterns: match = re.search(pattern, text) if match: code = match.group(1) logger.debug(f"Code gefunden mit Muster '{pattern}': {code}") return code # Wenn keine plattformspezifischen Muster gefunden wurden, Default-Muster verwenden for pattern in patterns["default"]: match = re.search(pattern, text) if match: code = match.group(1) logger.debug(f"Code gefunden mit Default-Muster '{pattern}': {code}") return code # Generische Suche nach Zahlen (für die jeweilige Plattform typische Länge) code_length = 6 # Standard if platform.lower() == "facebook": code_length = 5 # Suche nach alleinstehenden Zahlen der richtigen Länge generic_pattern = r"\b(\d{" + str(code_length) + r"})\b" matches = re.findall(generic_pattern, text) if matches: # Nehme die erste gefundene Zahl code = matches[0] logger.debug(f"Code gefunden mit generischem Muster: {code}") return code logger.debug("Kein Code gefunden") return None def get_confirmation_code(self, expected_email: str, search_criteria: str = "ALL", max_attempts: int = 30, delay_seconds: int = 2) -> Optional[str]: """ Ruft einen Bestätigungscode von einer E-Mail ab (Kompatibilitätsmethode). Args: expected_email: E-Mail-Adresse, von der der Code erwartet wird search_criteria: IMAP-Suchkriterien max_attempts: Maximale Anzahl an Versuchen delay_seconds: Verzögerung zwischen Versuchen in Sekunden Returns: Optional[str]: Der Bestätigungscode oder None, wenn nicht gefunden """ logger.info(f"Suche nach Bestätigungscode für {expected_email}") # Vermutete Plattform basierend auf der E-Mail-Adresse oder dem Inhalt platform = "instagram" # Standard # Bestätigungscode abrufen return self.get_verification_code(expected_email, platform, max_attempts, delay_seconds)