Files
AccountForger-neuerUpload/utils/email_handler.py
2025-11-27 21:17:32 +01:00

703 Zeilen
29 KiB
Python

"""
E-Mail-Handler für den Social Media Account Generator.
Verwaltet den Abruf von Bestätigungscodes und E-Mail-Verifizierungen.
"""
import os
import json
import logging
import time
import imaplib
import email
import re
from typing import Dict, List, Any, Optional, Tuple, Union
from email.header import decode_header
from datetime import datetime, timedelta
from utils.text_similarity import TextSimilarity
from config.paths import PathConfig
logger = logging.getLogger("email_handler")
class EmailHandler:
"""
Handler für den Zugriff auf E-Mail-Dienste und den Abruf von Bestätigungscodes.
"""
CONFIG_FILE = os.path.join(PathConfig.CONFIG_DIR, "email_config.json")
def __init__(self):
"""Initialisiert den EmailHandler und lädt die Konfiguration."""
self.config = self.load_config()
# Stelle sicher, dass das Konfigurationsverzeichnis existiert
os.makedirs(os.path.dirname(self.CONFIG_FILE), exist_ok=True)
# TextSimilarity-Instanz für Fuzzy-Matching
self.text_similarity = TextSimilarity(default_threshold=0.75)
# Cache für die letzten erfolgreichen Verbindungsdaten
self.last_connection = None
# Typische Betreffzeilen für Verifizierungs-E-Mails nach Plattform
self.verification_subjects = {
"instagram": [
"Bestätige deine E-Mail-Adresse",
"Bestätigungscode für Instagram",
"Dein Instagram-Code",
"Bestätige deinen Instagram-Account",
"Verify your email address",
"Instagram Verification Code",
"Your Instagram Code",
"Verify your Instagram account",
"Instagram-Bestätigungscode",
"Instagram security code"
],
"facebook": [
"Bestätigungscode für Facebook",
"Facebook-Bestätigungscode",
"Dein Facebook-Code",
"Facebook Verification Code",
"Your Facebook Code"
],
"twitter": [
"Bestätige dein Twitter-Konto",
"Twitter-Bestätigungscode",
"Verify your Twitter account",
"Twitter Verification Code"
],
"x": [
"ist dein X Verifizierungscode",
"is your X verification code",
"X Verifizierungscode",
"X verification code",
"Bestätige dein X-Konto",
"Verify your X account",
"X Bestätigungscode",
"X confirmation code"
],
"tiktok": [
"ist dein Bestätigungscode",
"is your confirmation code",
"TikTok-Bestätigungscode",
"Bestätige dein TikTok-Konto",
"TikTok Verification Code",
"Verify your TikTok account"
],
"default": [
"Bestätigungscode",
"Verification Code",
"Account Verification",
"Konto-Bestätigung",
"Security Code",
"Sicherheitscode"
]
}
logger.info("E-Mail-Handler initialisiert")
def load_config(self) -> Dict[str, Any]:
"""
Lädt die E-Mail-Konfiguration aus der Konfigurationsdatei.
Returns:
Dict[str, Any]: Die geladene Konfiguration oder Standardwerte
"""
default_config = {
"imap_server": "imap.ionos.de",
"imap_port": 993,
"imap_user": "info@z5m7q9dk3ah2v1plx6ju.com",
"imap_pass": "GZsg9:66@a@M%etP"
}
try:
if os.path.exists(self.CONFIG_FILE):
with open(self.CONFIG_FILE, "r", encoding="utf-8") as f:
config = json.load(f)
logger.info("E-Mail-Konfiguration geladen")
return config
else:
# Standardwerte speichern
with open(self.CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(default_config, f, indent=2)
logger.info("Standard-E-Mail-Konfiguration erstellt")
return default_config
except Exception as e:
logger.error(f"Fehler beim Laden der E-Mail-Konfiguration: {e}")
return default_config
def save_config(self) -> bool:
"""
Speichert die aktuelle Konfiguration in die Konfigurationsdatei.
Returns:
bool: True bei Erfolg, False bei Fehler
"""
try:
with open(self.CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(self.config, f, indent=2)
logger.info("E-Mail-Konfiguration gespeichert")
return True
except Exception as e:
logger.error(f"Fehler beim Speichern der E-Mail-Konfiguration: {e}")
return False
def get_config(self) -> Dict[str, Any]:
"""
Gibt die aktuelle Konfiguration zurück.
Returns:
Dict[str, Any]: Die aktuelle Konfiguration
"""
return self.config
def update_config(self, new_config: Dict[str, Any]) -> bool:
"""
Aktualisiert die Konfiguration mit den neuen Werten.
Args:
new_config: Neue Konfiguration
Returns:
bool: True bei Erfolg, False bei Fehler
"""
try:
# Aktuelle Konfiguration speichern
old_config = self.config.copy()
# Neue Werte übernehmen
self.config.update(new_config)
# Konfiguration speichern
success = self.save_config()
if not success:
# Bei Speicherfehler zur alten Konfiguration zurückkehren
self.config = old_config
return False
return True
except Exception as e:
logger.error(f"Fehler beim Aktualisieren der E-Mail-Konfiguration: {e}")
return False
def update_credentials(self, username: str, password: str) -> bool:
"""
Aktualisiert nur die Anmeldeinformationen.
Args:
username: Benutzername
password: Passwort
Returns:
bool: True bei Erfolg, False bei Fehler
"""
return self.update_config({
"imap_user": username,
"imap_pass": password
})
def update_server(self, server: str, port: int) -> bool:
"""
Aktualisiert nur die Serverinformationen.
Args:
server: IMAP-Server
port: IMAP-Port
Returns:
bool: True bei Erfolg, False bei Fehler
"""
return self.update_config({
"imap_server": server,
"imap_port": port
})
def test_connection(self) -> Dict[str, Any]:
"""
Testet die Verbindung zum IMAP-Server.
Returns:
Dict[str, Any]: Ergebnis des Tests
"""
try:
logger.info(f"Teste Verbindung zu {self.config['imap_server']}:{self.config['imap_port']}")
# DEBUG: Zeige geladene Config
logger.info(f"IMAP Config geladen: Server={self.config['imap_server']}, Port={self.config['imap_port']}, User={self.config['imap_user']}, Pass={self.config['imap_pass'][:4]}...{self.config['imap_pass'][-4:]}")
# SSL-Verbindung zum IMAP-Server herstellen
mail = imaplib.IMAP4_SSL(self.config["imap_server"], self.config["imap_port"])
# Anmelden
mail.login(self.config["imap_user"], self.config["imap_pass"])
# Verfügbare Postfächer auflisten
status, mailboxes = mail.list()
if status == 'OK':
mailbox_count = len(mailboxes)
# INBOX auswählen
mail.select("INBOX")
# Abmelden
mail.logout()
# Verbindungsdaten im Cache speichern
self.last_connection = {
"server": self.config["imap_server"],
"port": self.config["imap_port"],
"username": self.config["imap_user"],
"password": self.config["imap_pass"]
}
logger.info(f"Verbindungstest erfolgreich: {mailbox_count} Postfächer gefunden")
return {
"success": True,
"server": self.config["imap_server"],
"port": self.config["imap_port"],
"mailbox_count": mailbox_count
}
else:
logger.error(f"Fehler beim Abrufen der Postfächer: {status}")
mail.logout()
return {
"success": False,
"error": f"Fehler beim Abrufen der Postfächer: {status}"
}
except imaplib.IMAP4.error as e:
logger.error(f"IMAP-Fehler: {e}")
return {
"success": False,
"error": f"IMAP-Fehler: {e}"
}
except Exception as e:
logger.error(f"Allgemeiner Fehler: {e}")
return {
"success": False,
"error": f"Allgemeiner Fehler: {e}"
}
def search_emails(self, search_criteria: str = "ALL", max_emails: int = 5) -> List[Dict[str, Any]]:
"""
Sucht nach E-Mails mit den angegebenen Kriterien.
Args:
search_criteria: IMAP-Suchkriterien
max_emails: Maximale Anzahl der abzurufenden E-Mails
Returns:
List[Dict[str, Any]]: Liste der gefundenen E-Mails
"""
try:
# DEBUG: Zeige Login-Daten SOFORT (ohne komplettes Passwort)
print(f"[EMAIL-DEBUG] IMAP Login-Versuch: Server={self.config['imap_server']}, Port={self.config['imap_port']}, User={self.config['imap_user']}, Pass={self.config['imap_pass'][:4]}...{self.config['imap_pass'][-4:]}")
logger.info(f"IMAP Login-Versuch: Server={self.config['imap_server']}, Port={self.config['imap_port']}, User={self.config['imap_user']}, Pass={self.config['imap_pass'][:4]}...{self.config['imap_pass'][-4:]}")
# Verbindung zum IMAP-Server herstellen
mail = imaplib.IMAP4_SSL(self.config["imap_server"], self.config["imap_port"])
# Anmelden
mail.login(self.config["imap_user"], self.config["imap_pass"])
# INBOX auswählen
mail.select("INBOX")
# Nach E-Mails suchen
status, data = mail.search(None, search_criteria)
emails = []
if status == 'OK':
# E-Mail-IDs abrufen
email_ids = data[0].split()
# Newest emails first
email_ids = list(reversed(email_ids))
# Begrenze die Anzahl der abzurufenden E-Mails
if max_emails > 0:
email_ids = email_ids[:max_emails]
for email_id in email_ids:
# E-Mail abrufen
status, data = mail.fetch(email_id, '(RFC822)')
if status == 'OK':
# E-Mail-Inhalt parsen
raw_email = data[0][1]
msg = email.message_from_bytes(raw_email)
# Betreff decodieren (vollständig, alle Teile zusammenfügen)
subject_parts = decode_header(msg.get("Subject", ""))
subject = ""
for part, encoding in subject_parts:
if isinstance(part, bytes):
subject += part.decode(encoding or 'utf-8', errors='replace')
else:
subject += str(part) if part else ""
# Absender decodieren (vollständig, alle Teile zusammenfügen)
from_parts = decode_header(msg.get("From", ""))
from_addr = ""
for part, encoding in from_parts:
if isinstance(part, bytes):
from_addr += part.decode(encoding or 'utf-8', errors='replace')
else:
from_addr += str(part) if part else ""
# Empfänger decodieren (vollständig, alle Teile zusammenfügen)
to_parts = decode_header(msg.get("To", ""))
to_addr = ""
for part, encoding in to_parts:
if isinstance(part, bytes):
to_addr += part.decode(encoding or 'utf-8', errors='replace')
else:
to_addr += str(part) if part else ""
# Extrahiere E-Mail-Adresse aus dem To-Feld
to_email = self._extract_email_from_addr(to_addr)
# Datum decodieren
date = msg.get("Date", "")
# E-Mail-Text extrahieren
body = ""
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
content_disposition = str(part.get("Content-Disposition"))
if "attachment" not in content_disposition:
if content_type == "text/plain":
try:
# Textinhalt decodieren
charset = part.get_content_charset() or 'utf-8'
body = part.get_payload(decode=True).decode(charset, errors='replace')
break
except:
body = "[Fehler beim Decodieren des Inhalts]"
elif content_type == "text/html" and not body:
try:
# HTML-Inhalt decodieren
charset = part.get_content_charset() or 'utf-8'
body = part.get_payload(decode=True).decode(charset, errors='replace')
except:
body = "[Fehler beim Decodieren des HTML-Inhalts]"
else:
try:
# Einzel-Teil-E-Mail decodieren
charset = msg.get_content_charset() or 'utf-8'
body = msg.get_payload(decode=True).decode(charset, errors='replace')
except:
body = "[Fehler beim Decodieren des Inhalts]"
# E-Mail-Informationen speichern
email_info = {
"id": email_id.decode(),
"subject": subject,
"from": from_addr,
"to": to_addr,
"to_email": to_email,
"date": date,
"body": body
}
emails.append(email_info)
# Abmelden
mail.logout()
logger.info(f"{len(emails)} E-Mails gefunden")
return emails
except Exception as e:
logger.error(f"Fehler beim Suchen nach E-Mails: {e}")
return []
def _extract_email_from_addr(self, addr_str: str) -> str:
"""
Extrahiert die E-Mail-Adresse aus einem Adressstring im Format 'Name <email@domain.com>'.
Args:
addr_str: Adressstring
Returns:
str: Die extrahierte E-Mail-Adresse oder der ursprüngliche String
"""
# Regulärer Ausdruck für die Extraktion der E-Mail-Adresse
email_pattern = r'<?([\w\.-]+@[\w\.-]+\.\w+)>?'
match = re.search(email_pattern, addr_str)
if match:
return match.group(1).lower()
return addr_str.lower()
def _is_subject_relevant(self, subject: str, platform: str) -> bool:
"""
Prüft, ob der Betreff relevant für eine Verifizierungs-E-Mail der angegebenen Plattform ist.
Verwendet Fuzzy-Matching für die Erkennung.
Args:
subject: Betreff der E-Mail
platform: Plattform (instagram, facebook, twitter, etc.)
Returns:
bool: True, wenn der Betreff relevant ist, False sonst
"""
# Standardschwellenwert für Fuzzy-Matching
threshold = 0.75
# Betreffzeilen für die angegebene Plattform und Standard
subject_patterns = self.verification_subjects.get(platform.lower(), [])
subject_patterns += self.verification_subjects["default"]
# Prüfe auf exakte Übereinstimmung (schneller)
for pattern in subject_patterns:
if pattern.lower() in subject.lower():
logger.debug(f"Relevanter Betreff gefunden (exakte Übereinstimmung): {subject}")
return True
# Wenn keine exakte Übereinstimmung, Fuzzy-Matching verwenden
for pattern in subject_patterns:
similarity = self.text_similarity.similarity_ratio(pattern.lower(), subject.lower())
if similarity >= threshold:
logger.debug(f"Relevanter Betreff gefunden (Fuzzy-Matching, {similarity:.2f}): {subject}")
return True
# Alternativ: Prüfe, ob der Betreff den Pattern enthält (mit Fuzzy-Matching)
if self.text_similarity.contains_similar_text(subject.lower(), [pattern.lower()], threshold=threshold):
logger.debug(f"Relevanter Betreff gefunden (Fuzzy-Contains): {subject}")
return True
return False
def get_verification_code(self, target_email: Optional[str] = None, platform: str = "instagram",
max_attempts: int = 30, delay_seconds: int = 2) -> Optional[str]:
"""
Ruft einen Bestätigungscode von einer E-Mail ab.
Args:
target_email: Ziel-E-Mail-Adresse oder None für alle
platform: Plattform (instagram, facebook, twitter, etc.)
max_attempts: Maximale Anzahl an Versuchen
delay_seconds: Verzögerung zwischen Versuchen in Sekunden
Returns:
Optional[str]: Der Bestätigungscode oder None, wenn nicht gefunden
"""
logger.info(f"Suche nach Bestätigungscode für {platform} mit E-Mail {target_email or 'alle'}")
# Bei Catch-All Domains ist die exakte E-Mail-Adresse wichtig!
if target_email:
logger.info(f"EXAKTE E-Mail-Suche: {target_email} (Catch-All Domain)")
# Letzter Tag als Suchkriterium
today = datetime.now()
yesterday = today - timedelta(days=1)
date_str = yesterday.strftime("%d-%b-%Y")
search_criteria = f'(SINCE "{date_str}")'
# E-Mail-Abruf mit Wiederholungsversuch
total_wait_time = max_attempts * delay_seconds
logger.info(f"Warte bis zu {total_wait_time} Sekunden ({total_wait_time/60:.1f} Minuten) auf E-Mail")
for attempt in range(max_attempts):
elapsed_time = attempt * delay_seconds
remaining_time = total_wait_time - elapsed_time
logger.debug(f"Versuch {attempt + 1}/{max_attempts} - Verstrichene Zeit: {elapsed_time}s, Verbleibend: {remaining_time}s")
# Alle neuen E-Mails abrufen
emails = self.search_emails(search_criteria, max_emails=10)
logger.debug(f"Gefundene E-Mails: {len(emails)}")
# E-Mails filtern und nach Bestätigungscode suchen
for idx, email_info in enumerate(emails):
logger.debug(f"E-Mail {idx+1}: To={email_info.get('to_email', 'N/A')}, Subject={email_info.get('subject', 'N/A')[:50]}...")
# Extrahierte E-Mail-Adresse des Empfängers
to_email = email_info.get("to_email", "").lower()
# WICHTIG: Bei Catch-All Domains MUSS die exakte E-Mail-Adresse übereinstimmen!
if target_email:
# NUR exakte E-Mail-Übereinstimmung zulassen
if target_email.lower() == to_email:
logger.debug(f"✓ E-Mail-Match: {to_email} == {target_email}")
else:
logger.debug(f"✗ E-Mail übersprungen: {to_email} != {target_email} (exakte Übereinstimmung erforderlich)")
continue
# Betreff auf Relevanz prüfen (mit Fuzzy-Matching)
subject = email_info.get("subject", "")
if not subject or not self._is_subject_relevant(subject, platform):
logger.debug(f"E-Mail übersprungen: Betreff '{subject}' ist nicht relevant")
continue
# Nach Bestätigungscode im Text suchen
body = email_info.get("body", "")
code = self._extract_verification_code(body, platform)
if code:
logger.info(f"Bestätigungscode gefunden: {code} (E-Mail an {to_email})")
return code
else:
logger.debug(f"Kein Code in relevanter E-Mail gefunden (Betreff: {subject})")
# Wenn kein Code gefunden wurde und noch Versuche übrig sind, warten und erneut versuchen
if attempt < max_attempts - 1:
logger.debug(f"Kein Code gefunden, warte {delay_seconds} Sekunden...")
time.sleep(delay_seconds)
logger.warning("Kein Bestätigungscode gefunden nach allen Versuchen")
return None
def _extract_verification_code(self, text: str, platform: str = "instagram") -> Optional[str]:
"""
Extrahiert einen Bestätigungscode aus einem Text.
Args:
text: Zu durchsuchender Text
platform: Plattform (instagram, facebook, twitter, etc.)
Returns:
Optional[str]: Der gefundene Bestätigungscode oder None
"""
# Plattformspezifische Muster für Bestätigungscodes
patterns = {
"instagram": [
r"Dein Code ist (\d{6})",
r"Your code is (\d{6})",
r"Bestätigungscode: (\d{6})",
r"Confirmation code: (\d{6})",
r"(\d{6}) ist dein Instagram-Code",
r"(\d{6}) is your Instagram code",
r"Instagram-Code: (\d{6})",
r"Instagram code: (\d{6})",
r"Instagram: (\d{6})",
r"[^\d](\d{6})[^\d]" # 6-stellige Zahl umgeben von Nicht-Ziffern
],
"facebook": [
r"FB-(\d{5})", # Format: FB-24518 (häufigstes Format)
r"Bestätigungscode lautet (\d{5})", # "Dein Bestätigungscode lautet 24518"
r"Dein Code ist (\d{5})",
r"Your code is (\d{5})",
r"Bestätigungscode: (\d{5})",
r"Confirmation code: (\d{5})",
r"Facebook-Code: (\d{5})",
r"Facebook code: (\d{5})",
r"Facebook: (\d{5})",
r"Der Sicherheitscode lautet (\d{8})",
r"Security code is (\d{8})",
r"(\d{8}) ist dein Facebook-Code",
r"(\d{8}) is your Facebook code",
r"[^\d](\d{8})[^\d]",
r"[^\d](\d{5})[^\d]" # Zahl umgeben von Nicht-Ziffern
],
"twitter": [
r"Code: (\d{6})",
r"Verification code: (\d{6})",
r"Twitter-Code: (\d{6})",
r"Twitter code: (\d{6})",
r"Twitter: (\d{6})",
r"[^\d](\d{6})[^\d]" # 6-stellige Zahl umgeben von Nicht-Ziffern
],
"x": [
r"(\d{6}) ist dein X Verifizierungscode",
r"(\d{6}) is your X verification code",
r"Code: (\d{6})",
r"Verification code: (\d{6})",
r"X-Code: (\d{6})",
r"X code: (\d{6})",
r"X: (\d{6})",
r"Verifizierungscode: (\d{6})",
r"[^\d](\d{6})[^\d]" # 6-stellige Zahl umgeben von Nicht-Ziffern
],
"tiktok": [
r"(\d{6}) ist dein Bestätigungscode",
r"(\d{6}) is your confirmation code",
r"TikTok-Code: (\d{6})",
r"TikTok code: (\d{6})",
r"TikTok: (\d{6})",
r"Bestätigungscode[:\s]*(\d{6})",
r"Confirmation code[:\s]*(\d{6})",
r"[^\d](\d{6})[^\d]" # 6-stellige Zahl umgeben von Nicht-Ziffern
],
"default": [
r"Code[:\s]*(\d{4,8})",
r"[Vv]erification [Cc]ode[:\s]*(\d{4,8})",
r"[Bb]estätigungscode[:\s]*(\d{4,8})",
r"(\d{4,8}) is your code",
r"(\d{4,8}) ist dein Code",
r"[^\d](\d{6})[^\d]", # 6-stellige Zahl umgeben von Nicht-Ziffern
r"[^\d](\d{5})[^\d]" # 5-stellige Zahl umgeben von Nicht-Ziffern
]
}
# Plattformspezifische Muster verwenden
platform_patterns = patterns.get(platform.lower(), [])
# Alle Muster dieser Plattform durchsuchen
for pattern in platform_patterns:
match = re.search(pattern, text)
if match:
code = match.group(1)
logger.debug(f"Code gefunden mit Muster '{pattern}': {code}")
return code
# Wenn keine plattformspezifischen Muster gefunden wurden, Default-Muster verwenden
for pattern in patterns["default"]:
match = re.search(pattern, text)
if match:
code = match.group(1)
logger.debug(f"Code gefunden mit Default-Muster '{pattern}': {code}")
return code
# Generische Suche nach Zahlen (für die jeweilige Plattform typische Länge)
code_length = 6 # Standard
possible_lengths = [code_length]
if platform.lower() == "facebook":
possible_lengths = [8, 5]
for length in possible_lengths:
generic_pattern = r"\b(\d{" + str(length) + r"})\b"
matches = re.findall(generic_pattern, text)
if matches:
code = matches[0]
logger.debug(f"Code gefunden mit generischem Muster (Länge {length}): {code}")
return code
logger.debug("Kein Code gefunden")
return None
def get_confirmation_code(self, expected_email: str, search_criteria: str = "ALL",
max_attempts: int = 30, delay_seconds: int = 2) -> Optional[str]:
"""
Ruft einen Bestätigungscode von einer E-Mail ab (Kompatibilitätsmethode).
Args:
expected_email: E-Mail-Adresse, von der der Code erwartet wird
search_criteria: IMAP-Suchkriterien
max_attempts: Maximale Anzahl an Versuchen
delay_seconds: Verzögerung zwischen Versuchen in Sekunden
Returns:
Optional[str]: Der Bestätigungscode oder None, wenn nicht gefunden
"""
logger.info(f"Suche nach Bestätigungscode für {expected_email}")
# Vermutete Plattform basierend auf der E-Mail-Adresse oder dem Inhalt
platform = "instagram" # Standard
# Bestätigungscode abrufen
return self.get_verification_code(expected_email, platform, max_attempts, delay_seconds)