Export drin noch nicht komplett
Dieser Commit ist enthalten in:
479
utils/profile_export_service.py
Normale Datei
479
utils/profile_export_service.py
Normale Datei
@ -0,0 +1,479 @@
|
||||
"""
|
||||
Profil-Export-Service für Account-Daten
|
||||
|
||||
Exportiert Account-Profile in verschiedene Formate (CSV, TXT, PDF)
|
||||
mit optionalem Passwortschutz.
|
||||
"""
|
||||
|
||||
import os
|
||||
import csv
|
||||
import secrets
|
||||
import string
|
||||
import logging
|
||||
from io import BytesIO, StringIO
|
||||
from datetime import datetime
|
||||
from typing import Dict, Any, List, Optional, Tuple
|
||||
from pathlib import Path
|
||||
|
||||
logger = logging.getLogger("profile_export_service")
|
||||
|
||||
|
||||
class ProfileExportService:
|
||||
"""Service für den Export von Account-Profilen"""
|
||||
|
||||
# Felder die exportiert werden (ohne id, last_login, notes, status)
|
||||
EXPORT_FIELDS = {
|
||||
"username": "Username",
|
||||
"password": "Passwort",
|
||||
"email": "E-Mail",
|
||||
"phone": "Telefon",
|
||||
"platform": "Plattform",
|
||||
"full_name": "Name",
|
||||
"created_at": "Erstellt_am"
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def export_to_csv(account_data: Dict[str, Any]) -> bytes:
|
||||
"""
|
||||
Exportiert Account-Daten als CSV.
|
||||
|
||||
Args:
|
||||
account_data: Dictionary mit Account-Daten
|
||||
|
||||
Returns:
|
||||
CSV-Daten als bytes
|
||||
"""
|
||||
try:
|
||||
output = StringIO()
|
||||
writer = csv.DictWriter(
|
||||
output,
|
||||
fieldnames=ProfileExportService.EXPORT_FIELDS.values(),
|
||||
quoting=csv.QUOTE_MINIMAL
|
||||
)
|
||||
|
||||
# Header schreiben
|
||||
writer.writeheader()
|
||||
|
||||
# Datenzeile vorbereiten
|
||||
row_data = {}
|
||||
for field_key, field_label in ProfileExportService.EXPORT_FIELDS.items():
|
||||
value = account_data.get(field_key, "")
|
||||
# None zu leerem String konvertieren
|
||||
row_data[field_label] = value if value is not None else ""
|
||||
|
||||
# Datenzeile schreiben
|
||||
writer.writerow(row_data)
|
||||
|
||||
# In bytes konvertieren (UTF-8)
|
||||
csv_content = output.getvalue()
|
||||
output.close()
|
||||
|
||||
logger.info("CSV-Export erfolgreich")
|
||||
return csv_content.encode('utf-8')
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Fehler beim CSV-Export: {e}")
|
||||
raise
|
||||
|
||||
@staticmethod
|
||||
def export_to_txt(account_data: Dict[str, Any]) -> bytes:
|
||||
"""
|
||||
Exportiert Account-Daten als TXT (Key-Value Format).
|
||||
|
||||
Args:
|
||||
account_data: Dictionary mit Account-Daten
|
||||
|
||||
Returns:
|
||||
TXT-Daten als bytes
|
||||
"""
|
||||
try:
|
||||
lines = []
|
||||
|
||||
for field_key, field_label in ProfileExportService.EXPORT_FIELDS.items():
|
||||
value = account_data.get(field_key, "")
|
||||
# None zu leerem String konvertieren
|
||||
value = value if value is not None else ""
|
||||
lines.append(f"{field_label}: {value}")
|
||||
|
||||
# Exportdatum hinzufügen
|
||||
export_date = datetime.now().strftime("%d.%m.%Y %H:%M")
|
||||
lines.append(f"\nExportiert am: {export_date}")
|
||||
|
||||
txt_content = "\n".join(lines)
|
||||
|
||||
logger.info("TXT-Export erfolgreich")
|
||||
return txt_content.encode('utf-8')
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Fehler beim TXT-Export: {e}")
|
||||
raise
|
||||
|
||||
@staticmethod
|
||||
def export_to_pdf(account_data: Dict[str, Any]) -> bytes:
|
||||
"""
|
||||
Exportiert Account-Daten als PDF mit schönem Layout.
|
||||
|
||||
Args:
|
||||
account_data: Dictionary mit Account-Daten
|
||||
|
||||
Returns:
|
||||
PDF-Daten als bytes
|
||||
"""
|
||||
try:
|
||||
from reportlab.lib.pagesizes import A4
|
||||
from reportlab.lib.units import mm
|
||||
from reportlab.lib import colors
|
||||
from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer, Image
|
||||
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
|
||||
from reportlab.lib.enums import TA_LEFT, TA_CENTER
|
||||
|
||||
# PDF-Buffer erstellen
|
||||
buffer = BytesIO()
|
||||
|
||||
# Dokument erstellen
|
||||
doc = SimpleDocTemplate(
|
||||
buffer,
|
||||
pagesize=A4,
|
||||
rightMargin=20*mm,
|
||||
leftMargin=20*mm,
|
||||
topMargin=20*mm,
|
||||
bottomMargin=20*mm
|
||||
)
|
||||
|
||||
# Story (Inhalt) erstellen
|
||||
story = []
|
||||
styles = getSampleStyleSheet()
|
||||
|
||||
# Custom Styles
|
||||
title_style = ParagraphStyle(
|
||||
'CustomTitle',
|
||||
parent=styles['Title'],
|
||||
fontSize=20,
|
||||
textColor=colors.HexColor('#1F2937'),
|
||||
spaceAfter=5*mm,
|
||||
alignment=TA_CENTER
|
||||
)
|
||||
|
||||
heading_style = ParagraphStyle(
|
||||
'CustomHeading',
|
||||
parent=styles['Heading2'],
|
||||
fontSize=14,
|
||||
textColor=colors.HexColor('#374151'),
|
||||
spaceAfter=3*mm,
|
||||
spaceBefore=5*mm
|
||||
)
|
||||
|
||||
# IntelSight Logo versuchen zu laden
|
||||
logo_path = Path("resources/icons/intelsight-logo.svg")
|
||||
if logo_path.exists():
|
||||
try:
|
||||
# SVG zu reportlab Image (mit svglib falls verfügbar)
|
||||
try:
|
||||
from svglib.svglib import svg2rlg
|
||||
from reportlab.graphics import renderPDF
|
||||
|
||||
drawing = svg2rlg(str(logo_path))
|
||||
if drawing:
|
||||
# Skalieren auf vernünftige Größe
|
||||
drawing.width = 40*mm
|
||||
drawing.height = 10*mm
|
||||
drawing.scale(40*mm/drawing.width, 10*mm/drawing.height)
|
||||
story.append(drawing)
|
||||
story.append(Spacer(1, 5*mm))
|
||||
except ImportError:
|
||||
# svglib nicht verfügbar - überspringen
|
||||
logger.debug("svglib nicht verfügbar - Logo wird übersprungen")
|
||||
except Exception as e:
|
||||
logger.debug(f"Logo konnte nicht geladen werden: {e}")
|
||||
|
||||
# Titel
|
||||
username = account_data.get("username", "Unbekannt")
|
||||
platform = account_data.get("platform", "Unbekannt")
|
||||
title = Paragraph(f"Account-Profil: {username}", title_style)
|
||||
story.append(title)
|
||||
|
||||
# Plattform
|
||||
platform_text = Paragraph(f"Plattform: {platform.title()}", styles['Normal'])
|
||||
story.append(platform_text)
|
||||
story.append(Spacer(1, 10*mm))
|
||||
|
||||
# LOGIN-DATEN Sektion
|
||||
login_heading = Paragraph("LOGIN-DATEN", heading_style)
|
||||
story.append(login_heading)
|
||||
|
||||
# Login-Daten Tabelle
|
||||
login_data = [
|
||||
["Benutzername:", account_data.get("username", "")],
|
||||
["Passwort:", account_data.get("password", "")],
|
||||
["E-Mail:", account_data.get("email", "") or "-"],
|
||||
["Telefon:", account_data.get("phone", "") or "-"]
|
||||
]
|
||||
|
||||
login_table = Table(login_data, colWidths=[45*mm, 115*mm])
|
||||
login_table.setStyle(TableStyle([
|
||||
('FONTNAME', (0, 0), (0, -1), 'Helvetica-Bold'),
|
||||
('FONTNAME', (1, 0), (1, -1), 'Helvetica'),
|
||||
('FONTSIZE', (0, 0), (-1, -1), 11),
|
||||
('TEXTCOLOR', (0, 0), (0, -1), colors.HexColor('#6B7280')),
|
||||
('TEXTCOLOR', (1, 0), (1, -1), colors.HexColor('#1F2937')),
|
||||
('VALIGN', (0, 0), (-1, -1), 'TOP'),
|
||||
('LEFTPADDING', (0, 0), (-1, -1), 0),
|
||||
('RIGHTPADDING', (0, 0), (-1, -1), 0),
|
||||
('TOPPADDING', (0, 0), (-1, -1), 2*mm),
|
||||
('BOTTOMPADDING', (0, 0), (-1, -1), 2*mm),
|
||||
]))
|
||||
|
||||
story.append(login_table)
|
||||
story.append(Spacer(1, 8*mm))
|
||||
|
||||
# PROFIL-INFORMATIONEN Sektion
|
||||
profile_heading = Paragraph("PROFIL-INFORMATIONEN", heading_style)
|
||||
story.append(profile_heading)
|
||||
|
||||
# Profil-Daten Tabelle
|
||||
profile_data = [
|
||||
["Name:", account_data.get("full_name", "") or "-"],
|
||||
["Erstellt am:", account_data.get("created_at", "") or "-"]
|
||||
]
|
||||
|
||||
profile_table = Table(profile_data, colWidths=[45*mm, 115*mm])
|
||||
profile_table.setStyle(TableStyle([
|
||||
('FONTNAME', (0, 0), (0, -1), 'Helvetica-Bold'),
|
||||
('FONTNAME', (1, 0), (1, -1), 'Helvetica'),
|
||||
('FONTSIZE', (0, 0), (-1, -1), 11),
|
||||
('TEXTCOLOR', (0, 0), (0, -1), colors.HexColor('#6B7280')),
|
||||
('TEXTCOLOR', (1, 0), (1, -1), colors.HexColor('#1F2937')),
|
||||
('VALIGN', (0, 0), (-1, -1), 'TOP'),
|
||||
('LEFTPADDING', (0, 0), (-1, -1), 0),
|
||||
('RIGHTPADDING', (0, 0), (-1, -1), 0),
|
||||
('TOPPADDING', (0, 0), (-1, -1), 2*mm),
|
||||
('BOTTOMPADDING', (0, 0), (-1, -1), 2*mm),
|
||||
]))
|
||||
|
||||
story.append(profile_table)
|
||||
story.append(Spacer(1, 15*mm))
|
||||
|
||||
# Export-Datum (Footer)
|
||||
export_date = datetime.now().strftime("%d.%m.%Y %H:%M")
|
||||
footer_text = Paragraph(
|
||||
f"Exportiert am: {export_date}",
|
||||
ParagraphStyle(
|
||||
'Footer',
|
||||
parent=styles['Normal'],
|
||||
fontSize=9,
|
||||
textColor=colors.HexColor('#9CA3AF'),
|
||||
alignment=TA_CENTER
|
||||
)
|
||||
)
|
||||
story.append(footer_text)
|
||||
|
||||
# PDF erstellen
|
||||
doc.build(story)
|
||||
|
||||
# Buffer-Wert holen
|
||||
pdf_content = buffer.getvalue()
|
||||
buffer.close()
|
||||
|
||||
logger.info("PDF-Export erfolgreich")
|
||||
return pdf_content
|
||||
|
||||
except ImportError as e:
|
||||
logger.error(f"reportlab nicht installiert: {e}")
|
||||
raise Exception(
|
||||
"PDF-Export erfordert 'reportlab' Library. "
|
||||
"Bitte installieren Sie: pip install reportlab"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Fehler beim PDF-Export: {e}")
|
||||
raise
|
||||
|
||||
@staticmethod
|
||||
def generate_password(length: int = 10) -> str:
|
||||
"""
|
||||
Generiert ein sicheres zufälliges Passwort.
|
||||
|
||||
Args:
|
||||
length: Länge des Passworts (Standard: 10)
|
||||
|
||||
Returns:
|
||||
Generiertes Passwort
|
||||
"""
|
||||
# Zeichensatz: Groß- und Kleinbuchstaben, Zahlen, Sonderzeichen
|
||||
alphabet = string.ascii_letters + string.digits + "!@#$%^&*"
|
||||
|
||||
# Sicherstellen dass mindestens ein Zeichen von jedem Typ vorhanden ist
|
||||
password_chars = [
|
||||
secrets.choice(string.ascii_uppercase), # Mindestens ein Großbuchstabe
|
||||
secrets.choice(string.ascii_lowercase), # Mindestens ein Kleinbuchstabe
|
||||
secrets.choice(string.digits), # Mindestens eine Zahl
|
||||
secrets.choice("!@#$%^&*") # Mindestens ein Sonderzeichen
|
||||
]
|
||||
|
||||
# Rest auffüllen
|
||||
for _ in range(length - 4):
|
||||
password_chars.append(secrets.choice(alphabet))
|
||||
|
||||
# Mischen für Zufälligkeit
|
||||
secrets.SystemRandom().shuffle(password_chars)
|
||||
|
||||
password = ''.join(password_chars)
|
||||
logger.info("Passwort generiert")
|
||||
return password
|
||||
|
||||
@staticmethod
|
||||
def create_protected_zip(files_dict: Dict[str, bytes], password: str) -> bytes:
|
||||
"""
|
||||
Erstellt eine passwortgeschützte ZIP-Datei.
|
||||
|
||||
Args:
|
||||
files_dict: Dictionary mit {filename: content} Paaren
|
||||
password: Passwort für die ZIP-Datei
|
||||
|
||||
Returns:
|
||||
ZIP-Daten als bytes
|
||||
"""
|
||||
try:
|
||||
import pyzipper
|
||||
|
||||
# ZIP-Buffer erstellen
|
||||
buffer = BytesIO()
|
||||
|
||||
# ZIP mit AES-Verschlüsselung erstellen
|
||||
with pyzipper.AESZipFile(
|
||||
buffer,
|
||||
'w',
|
||||
compression=pyzipper.ZIP_DEFLATED,
|
||||
encryption=pyzipper.WZ_AES
|
||||
) as zf:
|
||||
# Passwort setzen
|
||||
zf.setpassword(password.encode('utf-8'))
|
||||
|
||||
# Dateien hinzufügen
|
||||
for filename, content in files_dict.items():
|
||||
zf.writestr(filename, content)
|
||||
|
||||
# Buffer-Wert holen
|
||||
zip_content = buffer.getvalue()
|
||||
buffer.close()
|
||||
|
||||
logger.info(f"Passwortgeschützte ZIP erstellt mit {len(files_dict)} Dateien")
|
||||
return zip_content
|
||||
|
||||
except ImportError as e:
|
||||
logger.error(f"pyzipper nicht installiert: {e}")
|
||||
raise Exception(
|
||||
"ZIP mit Passwortschutz erfordert 'pyzipper' Library. "
|
||||
"Bitte installieren Sie: pip install pyzipper"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Fehler beim Erstellen der geschützten ZIP: {e}")
|
||||
raise
|
||||
|
||||
@staticmethod
|
||||
def generate_filename(
|
||||
account_data: Dict[str, Any],
|
||||
format_extension: str,
|
||||
include_timestamp: bool = True
|
||||
) -> str:
|
||||
"""
|
||||
Generiert einen Dateinamen nach Konvention.
|
||||
|
||||
Format: {username}_{platform}_{timestamp}.{extension}
|
||||
Beispiel: max_mueller_instagram_2025-11-10_14-30.csv
|
||||
|
||||
Args:
|
||||
account_data: Account-Daten
|
||||
format_extension: Dateiendung (z.B. "csv", "txt", "pdf", "zip")
|
||||
include_timestamp: Ob Timestamp hinzugefügt werden soll
|
||||
|
||||
Returns:
|
||||
Generierter Dateiname
|
||||
"""
|
||||
# Username und Platform aus Account-Daten
|
||||
username = account_data.get("username", "account")
|
||||
platform = account_data.get("platform", "unknown").lower()
|
||||
|
||||
# Sonderzeichen entfernen für Dateinamen
|
||||
username = ProfileExportService._sanitize_filename(username)
|
||||
platform = ProfileExportService._sanitize_filename(platform)
|
||||
|
||||
# Timestamp
|
||||
if include_timestamp:
|
||||
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M")
|
||||
filename = f"{username}_{platform}_{timestamp}.{format_extension}"
|
||||
else:
|
||||
filename = f"{username}_{platform}.{format_extension}"
|
||||
|
||||
return filename
|
||||
|
||||
@staticmethod
|
||||
def _sanitize_filename(filename: str) -> str:
|
||||
"""
|
||||
Entfernt ungültige Zeichen aus Dateinamen.
|
||||
|
||||
Args:
|
||||
filename: Ursprünglicher Dateiname
|
||||
|
||||
Returns:
|
||||
Bereinigter Dateiname
|
||||
"""
|
||||
# Nur alphanumerische Zeichen, Unterstrich und Bindestrich erlauben
|
||||
valid_chars = f"-_.{string.ascii_letters}{string.digits}"
|
||||
sanitized = ''.join(c if c in valid_chars else '_' for c in filename)
|
||||
return sanitized
|
||||
|
||||
@staticmethod
|
||||
def export_account(
|
||||
account_data: Dict[str, Any],
|
||||
formats: List[str],
|
||||
password_protect: bool = False
|
||||
) -> Tuple[Dict[str, bytes], Optional[str]]:
|
||||
"""
|
||||
Exportiert Account-Daten in angegebene Formate.
|
||||
|
||||
Args:
|
||||
account_data: Account-Daten zum Exportieren
|
||||
formats: Liste von Formaten ["csv", "txt", "pdf"]
|
||||
password_protect: Ob Dateien passwortgeschützt werden sollen
|
||||
|
||||
Returns:
|
||||
Tuple von (files_dict, password)
|
||||
- files_dict: {filename: content} für alle Formate
|
||||
- password: Generiertes Passwort (None wenn nicht geschützt)
|
||||
"""
|
||||
files_dict = {}
|
||||
|
||||
# Jedes Format exportieren
|
||||
for fmt in formats:
|
||||
fmt = fmt.lower()
|
||||
|
||||
if fmt == "csv":
|
||||
content = ProfileExportService.export_to_csv(account_data)
|
||||
filename = ProfileExportService.generate_filename(account_data, "csv")
|
||||
files_dict[filename] = content
|
||||
|
||||
elif fmt == "txt":
|
||||
content = ProfileExportService.export_to_txt(account_data)
|
||||
filename = ProfileExportService.generate_filename(account_data, "txt")
|
||||
files_dict[filename] = content
|
||||
|
||||
elif fmt == "pdf":
|
||||
content = ProfileExportService.export_to_pdf(account_data)
|
||||
filename = ProfileExportService.generate_filename(account_data, "pdf")
|
||||
files_dict[filename] = content
|
||||
else:
|
||||
logger.warning(f"Unbekanntes Format ignoriert: {fmt}")
|
||||
|
||||
# Passwortschutz wenn gewünscht
|
||||
password = None
|
||||
if password_protect and files_dict:
|
||||
password = ProfileExportService.generate_password()
|
||||
|
||||
# Alle Dateien in ZIP packen
|
||||
zip_filename = ProfileExportService.generate_filename(account_data, "zip")
|
||||
zip_content = ProfileExportService.create_protected_zip(files_dict, password)
|
||||
|
||||
# Nur ZIP zurückgeben
|
||||
files_dict = {zip_filename: zip_content}
|
||||
|
||||
return files_dict, password
|
||||
In neuem Issue referenzieren
Einen Benutzer sperren