Dieser Commit ist enthalten in:
Claude Project Manager
2025-08-01 23:50:28 +02:00
Commit 04585e95b6
290 geänderte Dateien mit 64086 neuen und 0 gelöschten Zeilen

0
licensing/__init__.py Normale Datei
Datei anzeigen

362
licensing/api_client.py Normale Datei
Datei anzeigen

@ -0,0 +1,362 @@
"""
API Client für die Kommunikation mit dem License Server.
"""
import requests
import json
import logging
from typing import Dict, Any, Optional
from urllib.parse import urljoin
logger = logging.getLogger("license_api_client")
logger.setLevel(logging.DEBUG)
# Füge Console Handler hinzu falls noch nicht vorhanden
if not logger.handlers:
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logger.addHandler(handler)
class LicenseAPIClient:
"""Client für die Kommunikation mit dem License Server API."""
def __init__(self, base_url: str = "https://api-software-undso.intelsight.de",
api_key: str = "AF-2025-8E57CA6A97E257C5FA3E7778B8B44413"): # TODO: Update with valid API key
"""
Initialisiert den API Client.
Args:
base_url: Basis-URL des License Servers
api_key: API Key für die Authentifizierung
"""
self.base_url = base_url.rstrip('/')
self.api_key = api_key
self.session = requests.Session()
self.session.headers.update({
'X-API-Key': self.api_key,
'Content-Type': 'application/json',
'User-Agent': 'AccountForger/1.0.0'
})
def _make_request(self, method: str, endpoint: str,
data: Optional[Dict[str, Any]] = None,
params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
Führt eine API-Anfrage aus.
Args:
method: HTTP-Methode (GET, POST, etc.)
endpoint: API-Endpunkt (z.B. '/api/license/activate')
data: Request-Body (für POST/PUT)
params: Query-Parameter (für GET)
Returns:
Response als Dictionary
Raises:
requests.exceptions.RequestException: Bei Netzwerkfehlern
"""
url = urljoin(self.base_url, endpoint)
try:
logger.debug(f"API Request: {method} {url}")
logger.debug(f"Headers: {self.session.headers}")
if data:
logger.debug(f"Request Body: {data}")
response = self.session.request(
method=method,
url=url,
json=data,
params=params,
timeout=30
)
logger.debug(f"API Response: {response.status_code}")
# Bei 401 ist der API Key ungültig
if response.status_code == 401:
error_data = response.json() if response.text else {}
logger.error(f"API Key ungültig: {error_data.get('error', 'Unauthorized')}")
return {
'success': False,
'error': error_data.get('error', 'Invalid or missing API key'),
'code': 'INVALID_API_KEY',
'status': 401
}
# Erfolgreiche Responses
if response.status_code in [200, 201]:
return {
'success': True,
'data': response.json(),
'status': response.status_code
}
# Andere Fehler
try:
error_data = response.json()
return {
'success': False,
'error': error_data.get('error', f'HTTP {response.status_code}'),
'code': error_data.get('code', 'UNKNOWN_ERROR'),
'status': response.status_code,
'data': error_data
}
except:
return {
'success': False,
'error': f'HTTP {response.status_code}: {response.text}',
'code': 'HTTP_ERROR',
'status': response.status_code
}
except requests.exceptions.Timeout:
logger.error(f"Timeout bei Anfrage an {url}")
return {
'success': False,
'error': 'Request timeout',
'code': 'TIMEOUT',
'status': 0
}
except requests.exceptions.ConnectionError:
logger.error(f"Verbindungsfehler bei Anfrage an {url}")
return {
'success': False,
'error': 'Connection error',
'code': 'CONNECTION_ERROR',
'status': 0
}
except Exception as e:
logger.error(f"Unerwarteter Fehler bei API-Anfrage: {e}")
return {
'success': False,
'error': str(e),
'code': 'UNKNOWN_ERROR',
'status': 0
}
def activate_license(self, license_key: str, hardware_hash: str,
machine_name: str, app_version: str = "1.0.0") -> Dict[str, Any]:
"""
Aktiviert eine Lizenz auf einem neuen System.
Args:
license_key: Lizenzschlüssel (XXXX-XXXX-XXXX-XXXX)
hardware_hash: Eindeutiger Hardware-Identifier
machine_name: Name des Computers
app_version: Version der Anwendung
Returns:
API Response
"""
data = {
'license_key': license_key,
'hardware_fingerprint': hardware_hash, # Neuer Parameter-Name
'machine_name': machine_name, # Neuer Parameter-Name
'app_version': app_version
}
return self._make_request('POST', '/api/license/activate', data=data)
def verify_license(self, license_key: str, hardware_hash: str,
machine_id: str, activation_id: int,
app_version: str = "1.0.0") -> Dict[str, Any]:
"""
Verifiziert eine aktive Lizenz.
Args:
license_key: Lizenzschlüssel
hardware_hash: Hardware-Identifier
machine_id: Maschinen-ID
activation_id: Aktivierungs-ID (von activate_license)
app_version: Version der Anwendung
Returns:
API Response
"""
data = {
'license_key': license_key,
'hardware_fingerprint': hardware_hash, # Neuer Parameter-Name
'machine_name': machine_id, # Neuer Parameter-Name
'activation_id': activation_id,
'app_version': app_version
}
return self._make_request('POST', '/api/license/verify', data=data)
def get_license_info(self, license_key: str) -> Dict[str, Any]:
"""
Holt Informationen zu einer Lizenz.
Args:
license_key: Lizenzschlüssel
Returns:
API Response
"""
return self._make_request('GET', f'/api/license/info/{license_key}')
def start_session(self, license_key: str, machine_id: str,
hardware_hash: str, version: str = "1.0.0", ip_address: str = None) -> Dict[str, Any]:
"""
Startet eine neue Session für eine Lizenz.
Args:
license_key: Lizenzschlüssel
machine_id: Maschinen-ID (z.B. Computername)
hardware_hash: Hardware-Identifier
version: App-Version
ip_address: IP-Adresse des Clients (NEU)
Returns:
API Response mit session_token
"""
data = {
'license_key': license_key,
# Neue Parameter-Namen
'machine_name': machine_id,
'hardware_fingerprint': hardware_hash,
# Alte Parameter-Namen für Backward Compatibility
'machine_id': machine_id,
'hardware_id': hardware_hash, # Server erwartet beide Hardware-Felder
'hardware_hash': hardware_hash,
'version': version
}
# IP-Adresse hinzufügen wenn vorhanden
if ip_address:
data['ip_address'] = ip_address
return self._make_request('POST', '/api/license/session/start', data=data)
def session_heartbeat(self, session_token: str, license_key: str) -> Dict[str, Any]:
"""
Sendet einen Heartbeat für eine aktive Session.
Args:
session_token: Session Token von start_session
license_key: Lizenzschlüssel
Returns:
API Response
"""
data = {
'session_token': session_token,
'license_key': license_key
}
return self._make_request('POST', '/api/license/session/heartbeat', data=data)
def end_session(self, session_token: str) -> Dict[str, Any]:
"""
Beendet eine aktive Session.
Args:
session_token: Session Token
Returns:
API Response
"""
data = {
'session_token': session_token
}
return self._make_request('POST', '/api/license/session/end', data=data)
def check_version(self, current_version: str, license_key: str) -> Dict[str, Any]:
"""
Prüft auf verfügbare Updates.
Args:
current_version: Aktuelle Version der App
license_key: Lizenzschlüssel
Returns:
API Response mit Update-Info
"""
data = {
'current_version': current_version,
'license_key': license_key
}
return self._make_request('POST', '/api/version/check', data=data)
def get_latest_version(self) -> Dict[str, Any]:
"""
Holt Informationen zur neuesten Version.
Returns:
API Response
"""
return self._make_request('GET', '/api/version/latest')
def test_connection(self) -> Dict[str, Any]:
"""
Testet die Verbindung zum Server.
Returns:
API Response
"""
try:
response = self.session.get(
urljoin(self.base_url, '/health'),
timeout=10
)
if response.status_code == 200:
return {
'success': True,
'data': response.json(),
'status': 200
}
else:
return {
'success': False,
'error': f'Server returned status {response.status_code}',
'status': response.status_code
}
except Exception as e:
return {
'success': False,
'error': str(e),
'status': 0
}
# Test-Funktion
if __name__ == "__main__":
# Logging konfigurieren
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# API Client erstellen
client = LicenseAPIClient()
print("=== License Server API Client Test ===\n")
# 1. Verbindung testen
print("1. Teste Verbindung zum Server...")
result = client.test_connection()
print(f" Ergebnis: {'' if result['success'] else ''}")
if result['success']:
print(f" Server Status: {result['data']}")
else:
print(f" Fehler: {result['error']}")
print("\n2. Teste API Key Authentifizierung...")
# Versuche License Info zu holen (benötigt gültigen API Key)
test_license = "TEST-TEST-TEST-TEST"
result = client.get_license_info(test_license)
print(f" Ergebnis: {'' if result['success'] else ''}")
if not result['success']:
print(f" Status: {result['status']}")
print(f" Fehler: {result['error']}")
if result['status'] == 401:
print(" → API Key wird abgelehnt!")
else:
print(f" → API Key wird akzeptiert!")
print("\n=== Test abgeschlossen ===")

Datei anzeigen

@ -0,0 +1,312 @@
"""
Hardware Fingerprint Generator fuer die Lizenzierung.
Erstellt einen eindeutigen Hardware-Hash basierend auf System-Eigenschaften.
"""
import hashlib
import platform
import socket
import uuid
import os
import subprocess
import logging
from typing import List, Optional
logger = logging.getLogger("hardware_fingerprint")
class HardwareFingerprint:
"""Generiert und verwaltet Hardware-Fingerprints f<>r die Lizenzierung."""
FINGERPRINT_FILE = os.path.join("config", ".hardware_id")
def __init__(self):
"""Initialisiert den Fingerprint-Generator."""
os.makedirs("config", exist_ok=True)
def get_mac_address(self) -> Optional[str]:
"""
Holt die MAC-Adresse der prim<69>ren Netzwerkkarte.
Returns:
MAC-Adresse als String oder None
"""
try:
# UUID-basierte MAC-Adresse (funktioniert cross-platform)
mac = ':'.join(['{:02x}'.format((uuid.getnode() >> ele) & 0xff)
for ele in range(0,8*6,8)][::-1])
if mac != "00:00:00:00:00:00":
return mac
except Exception as e:
logger.warning(f"Fehler beim Abrufen der MAC-Adresse: {e}")
return None
def get_cpu_info(self) -> str:
"""
Holt CPU-Informationen.
Returns:
CPU-Info als String
"""
try:
# Platform-unabh<62>ngige CPU-Info
cpu_info = platform.processor()
if not cpu_info:
cpu_info = platform.machine()
return cpu_info or "unknown"
except Exception as e:
logger.warning(f"Fehler beim Abrufen der CPU-Info: {e}")
return "unknown"
def get_system_uuid(self) -> Optional[str]:
"""
Versucht die System-UUID zu ermitteln.
Returns:
System-UUID als String oder None
"""
try:
# Windows
if platform.system() == "Windows":
try:
output = subprocess.check_output(
"wmic csproduct get UUID",
shell=True,
stderr=subprocess.DEVNULL
).decode()
lines = output.strip().split('\n')
if len(lines) > 1:
uuid_str = lines[1].strip()
if uuid_str and uuid_str != "FFFFFFFF-FFFF-FFFF-FFFF-FFFFFFFFFFFF":
return uuid_str
except:
pass
# Linux
elif platform.system() == "Linux":
try:
with open("/sys/class/dmi/id/product_uuid", "r") as f:
uuid_str = f.read().strip()
if uuid_str:
return uuid_str
except:
pass
# Alternative f<>r Linux
try:
with open("/etc/machine-id", "r") as f:
return f.read().strip()
except:
pass
# macOS
elif platform.system() == "Darwin":
try:
output = subprocess.check_output(
["ioreg", "-rd1", "-c", "IOPlatformExpertDevice"],
stderr=subprocess.DEVNULL
).decode()
for line in output.split('\n'):
if 'IOPlatformUUID' in line:
uuid_str = line.split('"')[-2]
if uuid_str:
return uuid_str
except:
pass
except Exception as e:
logger.warning(f"Fehler beim Abrufen der System-UUID: {e}")
return None
def get_disk_serial(self) -> Optional[str]:
"""
Versucht die Seriennummer der Systemfestplatte zu ermitteln.
Returns:
Disk Serial als String oder None
"""
try:
if platform.system() == "Windows":
try:
output = subprocess.check_output(
"wmic diskdrive get serialnumber",
shell=True,
stderr=subprocess.DEVNULL
).decode()
lines = output.strip().split('\n')
for line in lines[1:]:
serial = line.strip()
if serial and serial != "SerialNumber":
return serial
except:
pass
elif platform.system() == "Linux":
try:
# Versuche verschiedene Methoden
for device in ["/dev/sda", "/dev/nvme0n1", "/dev/vda"]:
if os.path.exists(device):
try:
output = subprocess.check_output(
["sudo", "hdparm", "-I", device],
stderr=subprocess.DEVNULL
).decode()
for line in output.split('\n'):
if 'Serial Number:' in line:
return line.split(':')[1].strip()
except:
pass
except:
pass
except Exception as e:
logger.warning(f"Fehler beim Abrufen der Disk-Serial: {e}")
return None
def generate_hardware_hash(self) -> str:
"""
Generiert einen eindeutigen Hardware-Hash basierend auf verschiedenen
System-Eigenschaften.
Returns:
Hardware-Hash als String
"""
components = []
# 1. Hostname (immer verf<72>gbar)
hostname = socket.gethostname()
components.append(f"HOST:{hostname}")
# 2. MAC-Adresse
mac = self.get_mac_address()
if mac:
components.append(f"MAC:{mac}")
# 3. CPU-Info
cpu = self.get_cpu_info()
components.append(f"CPU:{cpu}")
# 4. System-UUID
sys_uuid = self.get_system_uuid()
if sys_uuid:
components.append(f"UUID:{sys_uuid}")
# 5. Disk Serial
disk_serial = self.get_disk_serial()
if disk_serial:
components.append(f"DISK:{disk_serial}")
# 6. Platform-Info
components.append(f"PLATFORM:{platform.system()}-{platform.machine()}")
# 7. Username (als Fallback)
try:
username = os.getlogin()
components.append(f"USER:{username}")
except:
pass
# Kombiniere alle Komponenten
fingerprint_data = "|".join(sorted(components))
# Erstelle SHA256 Hash
hash_object = hashlib.sha256(fingerprint_data.encode())
hardware_hash = hash_object.hexdigest()
logger.info(f"Hardware-Fingerprint generiert mit {len(components)} Komponenten")
logger.debug(f"Komponenten: {components}")
return hardware_hash
def get_or_create_fingerprint(self) -> str:
"""
Holt den gespeicherten Fingerprint oder erstellt einen neuen.
Returns:
Hardware-Fingerprint als String
"""
# Pr<50>fe ob bereits ein Fingerprint existiert
if os.path.exists(self.FINGERPRINT_FILE):
try:
with open(self.FINGERPRINT_FILE, 'r') as f:
stored_hash = f.read().strip()
if stored_hash:
logger.info("Gespeicherten Hardware-Fingerprint gefunden")
return stored_hash
except Exception as e:
logger.warning(f"Fehler beim Lesen des gespeicherten Fingerprints: {e}")
# Generiere neuen Fingerprint
hardware_hash = self.generate_hardware_hash()
# Speichere f<>r zuk<75>nftige Verwendung
try:
with open(self.FINGERPRINT_FILE, 'w') as f:
f.write(hardware_hash)
logger.info("Hardware-Fingerprint gespeichert")
except Exception as e:
logger.error(f"Fehler beim Speichern des Fingerprints: {e}")
return hardware_hash
def get_machine_name(self) -> str:
"""
Gibt den Maschinennamen zur<75>ck.
Returns:
Maschinenname
"""
try:
return socket.gethostname()
except:
return "Unknown-PC"
def get_system_info(self) -> dict:
"""
Sammelt detaillierte System-Informationen.
Returns:
Dictionary mit System-Infos
"""
return {
"hostname": self.get_machine_name(),
"platform": platform.system(),
"platform_release": platform.release(),
"platform_version": platform.version(),
"architecture": platform.machine(),
"processor": self.get_cpu_info(),
"python_version": platform.python_version(),
"mac_address": self.get_mac_address(),
"system_uuid": self.get_system_uuid()
}
# Test-Funktion
if __name__ == "__main__":
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
print("=== Hardware Fingerprint Test ===\n")
fingerprint = HardwareFingerprint()
# System-Info anzeigen
print("System-Informationen:")
info = fingerprint.get_system_info()
for key, value in info.items():
print(f" {key}: {value}")
# Fingerprint generieren
print("\nHardware-Fingerprint:")
hw_hash = fingerprint.get_or_create_fingerprint()
print(f" Hash: {hw_hash}")
print(f" L<>nge: {len(hw_hash)} Zeichen")
# Maschinenname
print(f"\nMaschinenname: {fingerprint.get_machine_name()}")
print("\n=== Test abgeschlossen ===")

472
licensing/license_manager.py Normale Datei
Datei anzeigen

@ -0,0 +1,472 @@
"""
Lizenzverwaltungsfunktionalität für den Social Media Account Generator.
"""
import os
import json
import time
import uuid
import hmac
import hashlib
import logging
import requests
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Tuple, Union
from .api_client import LicenseAPIClient
from .hardware_fingerprint import HardwareFingerprint
from .session_manager import SessionManager
logger = logging.getLogger("license_manager")
class LicenseManager:
"""Klasse zur Verwaltung von Softwarelizenzen."""
CONFIG_FILE = os.path.join("config", "license.json")
def __init__(self):
"""Initialisiert den LicenseManager und lädt die Konfiguration."""
# API Client und andere Komponenten initialisieren
self.api_client = LicenseAPIClient()
self.hardware_fingerprint = HardwareFingerprint()
self.session_manager = SessionManager(self.api_client)
self.license_data = self.load_license_data()
self.machine_id = self.hardware_fingerprint.get_machine_name()
self.hardware_hash = self.hardware_fingerprint.get_or_create_fingerprint()
# Stelle sicher, dass das Konfigurationsverzeichnis existiert
os.makedirs(os.path.dirname(self.CONFIG_FILE), exist_ok=True)
def load_license_data(self) -> Dict[str, Any]:
"""Lädt die Lizenzdaten aus der Konfigurationsdatei."""
if not os.path.exists(self.CONFIG_FILE):
return {
"key": "",
"activation_id": None,
"activation_date": "",
"expiry_date": "",
"status": "inactive",
"status_text": "Keine Lizenz aktiviert",
"features": [],
"last_online_check": "",
"max_activations": 0,
"max_users": 0
}
try:
with open(self.CONFIG_FILE, "r", encoding="utf-8") as f:
license_data = json.load(f)
logger.info(f"Lizenzdaten geladen: Status '{license_data.get('status', 'unbekannt')}'")
return license_data
except Exception as e:
logger.error(f"Fehler beim Laden der Lizenzdaten: {e}")
return {
"key": "",
"activation_id": None,
"activation_date": "",
"expiry_date": "",
"status": "inactive",
"status_text": "Fehler beim Laden der Lizenz",
"features": [],
"last_online_check": "",
"max_activations": 0,
"max_users": 0
}
def save_license_data(self) -> bool:
"""Speichert die Lizenzdaten in die Konfigurationsdatei."""
try:
with open(self.CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(self.license_data, f, indent=2)
logger.info("Lizenzdaten gespeichert")
return True
except Exception as e:
logger.error(f"Fehler beim Speichern der Lizenzdaten: {e}")
return False
def clear_license_data(self) -> None:
"""Löscht alle Lizenzdaten und setzt auf Standardwerte zurück."""
self.license_data = {
"key": "",
"activation_id": None,
"activation_date": "",
"expiry_date": "",
"status": "inactive",
"status_text": "Keine Lizenz aktiviert",
"features": [],
"last_online_check": "",
"max_activations": 0,
"max_users": 0
}
self.save_license_data()
# Session beenden
if self.session_manager.is_session_active():
self.session_manager.end_session()
def get_license_info(self) -> Dict[str, Any]:
"""Gibt die aktuellen Lizenzdaten zurück."""
return self.license_data
def is_licensed(self) -> bool:
"""
Überprüft, ob eine gültige Lizenz vorhanden ist.
Returns:
True, wenn eine gültige Lizenz vorhanden ist, sonst False
"""
# Prüfe den Status der Lizenz
if self.license_data["status"] not in ["active", "trial"]:
return False
# Prüfe, ob die Lizenz abgelaufen ist
if self.license_data["expiry_date"]:
try:
expiry_date = datetime.fromisoformat(self.license_data["expiry_date"])
if datetime.now() > expiry_date:
logger.warning("Lizenz ist abgelaufen")
self.license_data["status"] = "expired"
self.license_data["status_text"] = "Lizenz abgelaufen"
self.save_license_data()
return False
except Exception as e:
logger.error(f"Fehler beim Parsen des Ablaufdatums: {e}")
return False
# Prüfe, ob regelmäßige Online-Verifizierung erforderlich ist
if self.license_data["last_online_check"]:
try:
last_check = datetime.fromisoformat(self.license_data["last_online_check"])
max_offline_days = 7 # Maximale Tage ohne Online-Check
if datetime.now() > last_check + timedelta(days=max_offline_days):
logger.warning(f"Letzte Online-Überprüfung ist mehr als {max_offline_days} Tage her")
# Versuche, eine Online-Überprüfung durchzuführen
if not self.online_verification():
self.license_data["status"] = "verification_required"
self.license_data["status_text"] = "Online-Überprüfung erforderlich"
self.save_license_data()
return False
except Exception as e:
logger.error(f"Fehler bei der Überprüfung der Online-Verifizierung: {e}")
return True
def online_verification(self) -> bool:
"""
Führt eine Online-Verifizierung der Lizenz durch.
Returns:
True, wenn die Online-Verifizierung erfolgreich war, sonst False
"""
if not self.license_data.get("activation_id"):
logger.warning("Keine Aktivierungs-ID vorhanden")
return False
logger.info("Führe Online-Verifizierung durch...")
try:
# API Call für Verifizierung
result = self.api_client.verify_license(
license_key=self.license_data["key"],
hardware_hash=self.hardware_hash,
machine_id=self.machine_id,
activation_id=self.license_data["activation_id"],
app_version="1.0.0"
)
if result.get("success"):
data = result.get("data", {})
# Update license data
if data.get("valid"):
self.license_data["last_online_check"] = datetime.now().isoformat()
self.license_data["status"] = "active"
self.license_data["status_text"] = "Lizenz aktiv"
# Update expiry date if provided
license_info = data.get("license", {})
if license_info.get("valid_until"):
self.license_data["expiry_date"] = license_info["valid_until"]
self.save_license_data()
logger.info("Online-Verifizierung erfolgreich")
return True
else:
logger.warning("Lizenz ist nicht mehr gültig")
self.license_data["status"] = "invalid"
self.license_data["status_text"] = data.get("message", "Lizenz ungültig")
self.save_license_data()
return False
else:
logger.error(f"Online-Verifizierung fehlgeschlagen: {result.get('error')}")
return False
except Exception as e:
logger.error(f"Fehler bei der Online-Verifizierung: {e}")
return False
def activate_license(self, license_key: str) -> Dict[str, Any]:
"""
Aktiviert eine Lizenz mit dem angegebenen Schlüssel.
Args:
license_key: Zu aktivierender Lizenzschlüssel
Returns:
Dictionary mit Erfolg und Nachricht
"""
if not license_key:
return {
"success": False,
"error": "Bitte geben Sie einen Lizenzschlüssel ein."
}
logger.info(f"Aktiviere Lizenz: {license_key[:4]}...")
try:
# API Call für Aktivierung
result = self.api_client.activate_license(
license_key=license_key,
hardware_hash=self.hardware_hash,
machine_name=self.machine_id,
app_version="1.0.0"
)
if result.get("success"):
data = result.get("data", {})
activation = data.get("activation", {})
# Speichere Lizenzdaten
self.license_data["key"] = license_key
self.license_data["activation_id"] = activation.get("id")
self.license_data["activation_date"] = activation.get("activated_at", datetime.now().isoformat())
self.license_data["status"] = "active"
self.license_data["status_text"] = "Lizenz erfolgreich aktiviert"
self.license_data["last_online_check"] = datetime.now().isoformat()
# Hole zusätzliche Lizenzinfos
info_result = self.api_client.get_license_info(license_key)
if info_result.get("success"):
license_info = info_result.get("data", {}).get("license", {})
self.license_data["expiry_date"] = license_info.get("valid_until", "")
self.license_data["max_activations"] = license_info.get("max_activations", 0)
self.license_data["max_users"] = license_info.get("max_users", 0)
self.license_data["features"] = self._extract_features(license_info)
self.save_license_data()
# Starte Session
session_result = self.session_manager.start_session(
license_key=license_key,
activation_id=self.license_data["activation_id"]
)
if not session_result.get("success"):
logger.warning(f"Session konnte nicht gestartet werden: {session_result.get('error')}")
logger.info("Lizenz erfolgreich aktiviert")
return {
"success": True,
"message": "Lizenz erfolgreich aktiviert",
"update_available": session_result.get("update_info", {}).get("available", False),
"latest_version": session_result.get("update_info", {}).get("version")
}
else:
error = result.get("error", "Unbekannter Fehler")
status = result.get("status", 0)
# Spezifische Fehlermeldungen
if status == 404:
error = "Lizenzschlüssel nicht gefunden"
elif status == 409:
error = "Lizenz ist bereits auf einem anderen System aktiviert"
elif status == 422:
error = "Ungültiges Lizenzformat"
elif status == 403:
error = "Lizenz ist deaktiviert oder abgelaufen"
logger.error(f"Lizenzaktivierung fehlgeschlagen: {error}")
return {
"success": False,
"error": error
}
except Exception as e:
logger.error(f"Fehler bei der Lizenzaktivierung: {e}")
return {
"success": False,
"error": f"Fehler bei der Aktivierung: {str(e)}"
}
def deactivate_license(self) -> Dict[str, Any]:
"""
Deaktiviert die aktuelle Lizenz.
Returns:
Dictionary mit Erfolg und Nachricht
"""
if not self.is_licensed():
return {
"success": False,
"error": "Keine aktive Lizenz vorhanden"
}
logger.info("Deaktiviere Lizenz...")
# Session beenden
if self.session_manager.is_session_active():
self.session_manager.end_session()
# TODO: API Call für Deaktivierung wenn implementiert
# Lokale Daten löschen
self.clear_license_data()
logger.info("Lizenz deaktiviert")
return {
"success": True,
"message": "Lizenz erfolgreich deaktiviert"
}
def start_session(self) -> bool:
"""
Startet eine Session für die aktuelle Lizenz.
Returns:
True wenn erfolgreich, False sonst
"""
if not self.is_licensed():
logger.warning("Keine gültige Lizenz für Session-Start")
return False
result = self.session_manager.start_session(
license_key=self.license_data["key"],
activation_id=self.license_data.get("activation_id")
)
return result.get("success", False)
def resume_session(self) -> bool:
"""
Versucht eine gespeicherte Session fortzusetzen.
Returns:
True wenn erfolgreich, False sonst
"""
return self.session_manager.resume_session()
def has_feature(self, feature: str) -> bool:
"""
Prüft, ob ein Feature in der Lizenz enthalten ist.
Args:
feature: Name des Features
Returns:
True, wenn das Feature verfügbar ist, sonst False
"""
if not self.is_licensed():
return False
# "all" bedeutet alle Features verfügbar
if "all" in self.license_data.get("features", []):
return True
return feature in self.license_data.get("features", [])
def _extract_features(self, license_info: Dict[str, Any]) -> List[str]:
"""
Extrahiert Features basierend auf dem Lizenztyp.
Args:
license_info: Lizenzinformationen vom Server
Returns:
Liste der verfügbaren Features
"""
license_type = license_info.get("type", "basic")
# Feature-Mapping basierend auf Lizenztyp
feature_map = {
"basic": ["account_creation", "basic_export"],
"premium": ["account_creation", "basic_export", "multi_account",
"proxy_rotation", "advanced_export"],
"enterprise": ["all"]
}
return feature_map.get(license_type, ["account_creation"])
def get_status_text(self) -> str:
"""
Gibt einen lesbaren Status-Text zurück.
Returns:
Status-Text
"""
status = self.license_data.get("status", "inactive")
if status == "active":
if self.license_data.get("expiry_date"):
try:
expiry = datetime.fromisoformat(self.license_data["expiry_date"])
days_left = (expiry - datetime.now()).days
if days_left > 0:
return f"Aktiv (noch {days_left} Tage)"
else:
return "Abgelaufen"
except:
pass
return "Aktiv (unbegrenzt)"
elif status == "trial":
if self.license_data.get("expiry_date"):
try:
expiry = datetime.fromisoformat(self.license_data["expiry_date"])
days_left = (expiry - datetime.now()).days
if days_left > 0:
return f"Testversion (noch {days_left} Tage)"
else:
return "Testversion abgelaufen"
except:
pass
return "Testversion"
return self.license_data.get("status_text", "Keine Lizenz aktiviert")
# Test-Funktion
if __name__ == "__main__":
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
print("=== License Manager Test ===\n")
# License Manager erstellen
license_mgr = LicenseManager()
# Lizenz-Info anzeigen
print("Aktuelle Lizenz-Info:")
info = license_mgr.get_license_info()
print(f" Status: {info.get('status')}")
print(f" Status-Text: {license_mgr.get_status_text()}")
print(f" Lizenzschlüssel: {info.get('key', 'Keine')}")
print(f" Aktivierungs-ID: {info.get('activation_id', 'Keine')}")
# Session-Status
print(f"\nSession aktiv: {license_mgr.session_manager.is_session_active()}")
print("\n=== Test abgeschlossen ===")

304
licensing/license_validator.py Normale Datei
Datei anzeigen

@ -0,0 +1,304 @@
"""
Lizenzvalidator - Validiert Lizenzschlüssel und enthält Sicherheitsalgorithmen
"""
import os
import logging
import hashlib
import hmac
import base64
import json
import time
import random
import string
from datetime import datetime, timedelta
from typing import Dict, Optional, Tuple, Any, List
# Konfiguriere Logger
logger = logging.getLogger("license_validator")
class LicenseValidator:
"""
Validiert Lizenzschlüssel und führt kryptografische Operationen durch.
Enthält Platzhaltercode für die Lizenzvalidierung.
"""
# Sicherheitsschlüssel (würde in einer echten Implementierung nicht im Code stehen)
SECRET_KEY = "5e884898da28047151d0e56f8dc6292773603d0d6aabbdd62a11ef721d1542d8"
def __init__(self):
"""Initialisiert den LicenseValidator."""
logger.info("Lizenzvalidator initialisiert")
def validate_key_format(self, license_key: str) -> bool:
"""
Prüft, ob der Lizenzschlüssel das richtige Format hat.
Args:
license_key: Der zu prüfende Lizenzschlüssel
Returns:
bool: True, wenn das Format gültig ist, False sonst
"""
# Einfacher Formatcheck für XXXXX-XXXXX-XXXXX-XXXXX
import re
return bool(re.match(r'^[A-Z0-9]{5}-[A-Z0-9]{5}-[A-Z0-9]{5}-[A-Z0-9]{5}$', license_key))
def validate_key_checksum(self, license_key: str) -> bool:
"""
Prüft, ob die Prüfsumme des Lizenzschlüssels gültig ist.
Args:
license_key: Der zu prüfende Lizenzschlüssel
Returns:
bool: True, wenn die Prüfsumme gültig ist, False sonst
"""
# Platzhalterimplementierung - in einer echten Implementierung würde hier
# eine Prüfsummenberechnung stehen
# Entferne Bindestriche für die Verarbeitung
key_parts = license_key.split('-')
if len(key_parts) != 4:
return False
# Einfacher Check: Letzter Buchstabe des ersten Teils ist abhängig von den ersten Buchstaben
# der anderen Teile (XOR der ASCII-Werte)
try:
check_char = key_parts[0][-1]
calculated_char = chr(ord(key_parts[1][0]) ^ ord(key_parts[2][0]) ^ ord(key_parts[3][0]))
# In einer echten Implementierung wäre diese Prüfung viel stärker
return check_char == calculated_char
except IndexError:
return False
except Exception as e:
logger.error(f"Fehler bei der Prüfsummenberechnung: {e}")
return False
def decrypt_license_data(self, license_key: str) -> Optional[Dict[str, Any]]:
"""
Entschlüsselt Lizenzinformationen aus dem Schlüssel.
Args:
license_key: Der zu entschlüsselnde Lizenzschlüssel
Returns:
Optional[Dict[str, Any]]: Entschlüsselte Lizenzdaten oder None bei Fehler
"""
# Platzhalterimplementierung - in einer echten Implementierung würde hier
# eine Entschlüsselung stehen
if not self.validate_key_format(license_key):
return None
# Mock-Daten generieren
key_parts = license_key.split('-')
# Aus dem Schlüssel Informationen "ableiten"
try:
# Verwende den ersten Teil für die Lizenzart
license_type_index = sum(ord(c) for c in key_parts[0]) % 3
license_types = ["basic", "premium", "enterprise"]
license_type = license_types[license_type_index]
# Verwende den zweiten Teil für die Gültigkeitsdauer
validity_months = (sum(ord(c) for c in key_parts[1]) % 12) + 1
# Verwende den dritten Teil für die Funktionen
features_count = (sum(ord(c) for c in key_parts[2]) % 5) + 1
all_features = ["multi_account", "proxy_rotation", "advanced_analytics",
"sms_verification", "captcha_solving", "phone_verification",
"export", "scheduling"]
features = all_features[:features_count]
# Generiere ein "verschlüsseltes" Token
token = hashlib.sha256(license_key.encode()).hexdigest()
# Aktuelle Zeit für Aktivierung
now = datetime.now()
activation_date = now.strftime("%Y-%m-%d %H:%M:%S")
expiry_date = (now + timedelta(days=30*validity_months)).strftime("%Y-%m-%d %H:%M:%S")
# Lizenzdaten zusammenstellen
license_data = {
"license_type": license_type,
"features": features,
"activation_date": activation_date,
"expiry_date": expiry_date,
"token": token
}
return license_data
except Exception as e:
logger.error(f"Fehler bei der Entschlüsselung des Lizenzschlüssels: {e}")
return None
def generate_license_key(self, license_type: str = "basic", validity_months: int = 12,
features: List[str] = None) -> str:
"""
Generiert einen Lizenzschlüssel.
Args:
license_type: Art der Lizenz ("basic", "premium", "enterprise")
validity_months: Gültigkeitsdauer in Monaten
features: Liste der Funktionen
Returns:
str: Generierter Lizenzschlüssel
"""
# Platzhalterimplementierung - in einer echten Implementierung würde hier
# eine sichere Schlüsselgenerierung stehen
# Verwende die Eingabeparameter als Seed für die Generierung
seed = f"{license_type}{validity_months}{','.join(features or [])}{time.time()}"
random.seed(hashlib.md5(seed.encode()).hexdigest())
# Generiere 4 Teile mit jeweils 5 Zeichen (Großbuchstaben und Zahlen)
chars = string.ascii_uppercase + string.digits
parts = []
for _ in range(4):
part = ''.join(random.choice(chars) for _ in range(5))
parts.append(part)
# Stelle sicher, dass der letzte Buchstabe des ersten Teils
# ein XOR der ersten Buchstaben der anderen Teile ist
# (für die einfache Prüfsumme)
calc_char = chr(ord(parts[1][0]) ^ ord(parts[2][0]) ^ ord(parts[3][0]))
parts[0] = parts[0][:-1] + calc_char
# Verbinde die Teile mit Bindestrichen
license_key = '-'.join(parts)
return license_key
def sign_data(self, data: str) -> str:
"""
Signiert Daten mit dem geheimen Schlüssel.
Args:
data: Zu signierende Daten
Returns:
str: Signatur
"""
return hmac.new(
self.SECRET_KEY.encode(),
data.encode(),
hashlib.sha256
).hexdigest()
def verify_signature(self, data: str, signature: str) -> bool:
"""
Überprüft die Signatur von Daten.
Args:
data: Signierte Daten
signature: Zu überprüfende Signatur
Returns:
bool: True, wenn die Signatur gültig ist, False sonst
"""
expected_signature = self.sign_data(data)
return hmac.compare_digest(expected_signature, signature)
def encode_license_data(self, data: Dict[str, Any]) -> str:
"""
Kodiert Lizenzdaten zur sicheren Übertragung.
Args:
data: Zu kodierende Lizenzdaten
Returns:
str: Kodierte Lizenzdaten
"""
# Daten in JSON konvertieren
json_data = json.dumps(data, sort_keys=True)
# Signatur hinzufügen
signature = self.sign_data(json_data)
# Zusammen mit der Signatur kodieren
combined = f"{json_data}|{signature}"
encoded = base64.b64encode(combined.encode()).decode()
return encoded
def decode_license_data(self, encoded: str) -> Optional[Dict[str, Any]]:
"""
Dekodiert und überprüft kodierte Lizenzdaten.
Args:
encoded: Kodierte Lizenzdaten
Returns:
Optional[Dict[str, Any]]: Dekodierte Lizenzdaten oder None bei Fehler
"""
try:
# Dekodieren
decoded = base64.b64decode(encoded).decode()
# In Daten und Signatur aufteilen
json_data, signature = decoded.split('|', 1)
# Signatur überprüfen
if not self.verify_signature(json_data, signature):
logger.warning("Ungültige Signatur in lizenzierten Daten")
return None
# JSON parsen
data = json.loads(json_data)
return data
except Exception as e:
logger.error(f"Fehler beim Dekodieren der Lizenzdaten: {e}")
return None
# Beispielnutzung, wenn direkt ausgeführt
if __name__ == "__main__":
# Konfiguriere Logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Beispiel für LicenseValidator
validator = LicenseValidator()
# Generiere einen Lizenzschlüssel
features = ["multi_account", "proxy_rotation", "advanced_analytics"]
key = validator.generate_license_key("premium", 12, features)
print(f"Generierter Lizenzschlüssel: {key}")
# Validiere den Schlüssel
is_valid_format = validator.validate_key_format(key)
is_valid_checksum = validator.validate_key_checksum(key)
print(f"Format gültig: {is_valid_format}")
print(f"Prüfsumme gültig: {is_valid_checksum}")
# Entschlüssele Lizenzdaten
license_data = validator.decrypt_license_data(key)
if license_data:
print("\nEntschlüsselte Lizenzdaten:")
for k, v in license_data.items():
print(f" {k}: {v}")
# Beispiel für Kodierung und Dekodierung
test_data = {
"name": "Test License",
"type": "premium",
"expires": "2026-01-01"
}
encoded = validator.encode_license_data(test_data)
print(f"\nKodierte Daten: {encoded}")
decoded = validator.decode_license_data(encoded)
if decoded:
print("\nDekodierte Daten:")
for k, v in decoded.items():
print(f" {k}: {v}")

440
licensing/session_manager.py Normale Datei
Datei anzeigen

@ -0,0 +1,440 @@
"""
Session Manager für die Lizenz-Session-Verwaltung mit Heartbeat.
"""
import threading
import time
import logging
import json
import os
import requests
from datetime import datetime
from typing import Optional, Dict, Any
from .api_client import LicenseAPIClient
from .hardware_fingerprint import HardwareFingerprint
logger = logging.getLogger("session_manager")
logger.setLevel(logging.DEBUG)
# Füge Console Handler hinzu falls noch nicht vorhanden
if not logger.handlers:
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logger.addHandler(handler)
class SessionManager:
"""Verwaltet die Lizenz-Session und Heartbeat."""
SESSION_FILE = os.path.join("config", ".session_data")
HEARTBEAT_INTERVAL = 60 # Sekunden
def __init__(self, api_client: Optional[LicenseAPIClient] = None):
"""
Initialisiert den Session Manager.
Args:
api_client: Optional vorkonfigurierter API Client
"""
self.api_client = api_client or LicenseAPIClient()
self.hardware_fingerprint = HardwareFingerprint()
self.session_token: Optional[str] = None
self.license_key: Optional[str] = None
self.activation_id: Optional[int] = None
self.heartbeat_thread: Optional[threading.Thread] = None
self.stop_heartbeat = threading.Event()
self.is_active = False
# Lade Session-IP-Konfiguration
self._load_ip_config()
# Session-Daten laden falls vorhanden
self._load_session_data()
def _save_session_data(self) -> None:
"""Speichert die aktuelle Session-Daten."""
try:
os.makedirs("config", exist_ok=True)
session_data = {
"session_token": self.session_token,
"license_key": self.license_key,
"activation_id": self.activation_id,
"timestamp": datetime.now().isoformat()
}
with open(self.SESSION_FILE, 'w') as f:
json.dump(session_data, f)
logger.debug("Session-Daten gespeichert")
except Exception as e:
logger.error(f"Fehler beim Speichern der Session-Daten: {e}")
def _load_session_data(self) -> None:
"""Lädt gespeicherte Session-Daten."""
if os.path.exists(self.SESSION_FILE):
try:
with open(self.SESSION_FILE, 'r') as f:
data = json.load(f)
self.session_token = data.get("session_token")
self.license_key = data.get("license_key")
self.activation_id = data.get("activation_id")
logger.info("Session-Daten geladen")
except Exception as e:
logger.warning(f"Fehler beim Laden der Session-Daten: {e}")
def _clear_session_data(self) -> None:
"""Löscht die gespeicherten Session-Daten."""
try:
if os.path.exists(self.SESSION_FILE):
os.remove(self.SESSION_FILE)
logger.debug("Session-Daten gelöscht")
except Exception as e:
logger.error(f"Fehler beim Löschen der Session-Daten: {e}")
def start_session(self, license_key: str, activation_id: Optional[int] = None) -> Dict[str, Any]:
"""
Startet eine neue Session für die Lizenz.
Args:
license_key: Der Lizenzschlüssel
activation_id: Optional die Aktivierungs-ID
Returns:
Dictionary mit Session-Informationen oder Fehler
"""
if self.is_active:
logger.warning("Session läuft bereits")
return {
"success": False,
"error": "Session already active"
}
# Hardware-Info sammeln
hw_hash = self.hardware_fingerprint.get_or_create_fingerprint()
machine_name = self.hardware_fingerprint.get_machine_name()
# IP-Adresse ermitteln
client_ip = self._get_session_ip()
logger.info(f"Starte Session für Lizenz: {license_key[:4]}...")
logger.debug(f"Session-Parameter: machine_name={machine_name}, hw_hash={hw_hash[:8]}..., ip={client_ip}")
# Session-Start API Call mit IP-Adresse
result = self.api_client.start_session(
license_key=license_key,
machine_id=machine_name,
hardware_hash=hw_hash,
version="1.0.0", # TODO: Version aus config lesen
ip_address=client_ip # NEU: IP-Adresse hinzugefügt
)
logger.debug(f"Session-Start Response: {result}")
if result.get("success"):
data = result.get("data", {})
# Prüfe ob die Session wirklich erfolgreich war
if data.get("success") is False:
# Session wurde abgelehnt
error_msg = data.get("message", "Session start failed")
logger.error(f"Session abgelehnt: {error_msg}")
return {
"success": False,
"error": error_msg,
"code": "SESSION_REJECTED"
}
self.session_token = data.get("session_token")
self.license_key = license_key
self.activation_id = activation_id or data.get("activation_id")
self.is_active = True if self.session_token else False
# Session-Daten speichern
self._save_session_data()
# Heartbeat starten
self._start_heartbeat()
logger.info(f"Session erfolgreich gestartet: {self.session_token}")
# Update-Info prüfen
if data.get("update_available"):
logger.info(f"Update verfügbar: {data.get('latest_version')}")
return {
"success": True,
"session_token": self.session_token,
"update_info": {
"available": data.get("update_available", False),
"version": data.get("latest_version"),
"download_url": data.get("download_url")
}
}
else:
error = result.get("error", "Unknown error")
logger.error(f"Session-Start fehlgeschlagen: {error}")
# Bei Konflikt (409) bedeutet es, dass bereits eine Session läuft
if result.get("status") == 409:
return {
"success": False,
"error": "Another session is already active for this license",
"code": "SESSION_CONFLICT"
}
return {
"success": False,
"error": error,
"code": result.get("code", "SESSION_START_FAILED")
}
def _start_heartbeat(self) -> None:
"""Startet den Heartbeat-Thread."""
if self.heartbeat_thread and self.heartbeat_thread.is_alive():
logger.warning("Heartbeat läuft bereits")
return
self.stop_heartbeat.clear()
self.heartbeat_thread = threading.Thread(
target=self._heartbeat_worker,
daemon=True,
name="LicenseHeartbeat"
)
self.heartbeat_thread.start()
logger.info("Heartbeat-Thread gestartet")
def _heartbeat_worker(self) -> None:
"""Worker-Funktion für den Heartbeat-Thread."""
logger.info(f"Heartbeat-Worker gestartet (Interval: {self.HEARTBEAT_INTERVAL}s)")
while not self.stop_heartbeat.is_set():
try:
# Warte das Interval oder bis Stop-Signal
if self.stop_heartbeat.wait(self.HEARTBEAT_INTERVAL):
break
# Sende Heartbeat
if self.session_token and self.license_key:
logger.debug("Sende Heartbeat...")
result = self.api_client.session_heartbeat(
session_token=self.session_token,
license_key=self.license_key
)
if result.get("success"):
logger.debug("Heartbeat erfolgreich")
else:
logger.error(f"Heartbeat fehlgeschlagen: {result.get('error')}")
# Bei bestimmten Fehlern Session beenden
if result.get("status") in [401, 404]:
logger.error("Session ungültig, beende...")
self.end_session()
break
else:
logger.warning("Keine Session-Daten für Heartbeat")
except Exception as e:
logger.error(f"Fehler im Heartbeat-Worker: {e}")
logger.info("Heartbeat-Worker beendet")
def end_session(self) -> Dict[str, Any]:
"""
Beendet die aktuelle Session.
Returns:
Dictionary mit Informationen über die beendete Session
"""
if not self.is_active:
logger.warning("Keine aktive Session zum Beenden")
return {
"success": False,
"error": "No active session"
}
logger.info("Beende Session...")
# Heartbeat stoppen
self.stop_heartbeat.set()
if self.heartbeat_thread:
self.heartbeat_thread.join(timeout=5)
# Session beenden API Call
result = {"success": True}
if self.session_token:
result = self.api_client.end_session(self.session_token)
if result.get("success"):
logger.info("Session erfolgreich beendet")
else:
logger.error(f"Fehler beim Beenden der Session: {result.get('error')}")
# Session-Daten löschen
self.session_token = None
self.license_key = None
self.activation_id = None
self.is_active = False
self._clear_session_data()
return result
def resume_session(self) -> bool:
"""
Versucht eine gespeicherte Session fortzusetzen.
Returns:
True wenn erfolgreich, False sonst
"""
if self.is_active:
logger.info("Session läuft bereits")
return True
if not self.session_token or not self.license_key:
logger.info("Keine gespeicherten Session-Daten vorhanden")
return False
logger.info("Versuche Session fortzusetzen...")
# Teste mit Heartbeat ob Session noch gültig ist
result = self.api_client.session_heartbeat(
session_token=self.session_token,
license_key=self.license_key
)
if result.get("success"):
logger.info("Session erfolgreich fortgesetzt")
self.is_active = True
self._start_heartbeat()
return True
else:
logger.warning("Gespeicherte Session ungültig")
self._clear_session_data()
return False
def is_session_active(self) -> bool:
"""
Prüft ob eine Session aktiv ist.
Returns:
True wenn aktiv, False sonst
"""
return self.is_active
def get_session_info(self) -> Dict[str, Any]:
"""
Gibt Informationen über die aktuelle Session zurück.
Returns:
Dictionary mit Session-Informationen
"""
return {
"active": self.is_active,
"session_token": self.session_token[:8] + "..." if self.session_token else None,
"license_key": self.license_key[:4] + "..." if self.license_key else None,
"activation_id": self.activation_id,
"heartbeat_interval": self.HEARTBEAT_INTERVAL
}
def set_heartbeat_interval(self, seconds: int) -> None:
"""
Setzt das Heartbeat-Interval.
Args:
seconds: Interval in Sekunden (min 30, max 300)
"""
if 30 <= seconds <= 300:
self.HEARTBEAT_INTERVAL = seconds
logger.info(f"Heartbeat-Interval auf {seconds}s gesetzt")
# Restart Heartbeat wenn aktiv
if self.is_active:
self.stop_heartbeat.set()
if self.heartbeat_thread:
self.heartbeat_thread.join(timeout=5)
self._start_heartbeat()
else:
logger.warning(f"Ungültiges Heartbeat-Interval: {seconds}")
def _load_ip_config(self) -> None:
"""Lädt die IP-Konfiguration aus license_config.json."""
config_path = os.path.join("config", "license_config.json")
self.session_ip_mode = "auto" # Default
self.ip_fallback = "0.0.0.0"
try:
if os.path.exists(config_path):
with open(config_path, 'r') as f:
config = json.load(f)
self.session_ip_mode = config.get("session_ip_mode", "auto")
self.ip_fallback = config.get("ip_fallback", "0.0.0.0")
logger.debug(f"IP-Konfiguration geladen: mode={self.session_ip_mode}, fallback={self.ip_fallback}")
except Exception as e:
logger.warning(f"Fehler beim Laden der IP-Konfiguration: {e}")
def _get_session_ip(self) -> str:
"""
Ermittelt die IP-Adresse für die Session basierend auf der Konfiguration.
TESTBETRIEB: Temporäre Lösung - wird durch Server-Ressourcenmanagement ersetzt
Returns:
Die IP-Adresse als String
"""
if self.session_ip_mode == "auto":
# TESTBETRIEB: Auto-Erkennung der öffentlichen IP
logger.info("TESTBETRIEB: Ermittle öffentliche IP-Adresse automatisch")
try:
response = requests.get("https://api.ipify.org?format=json", timeout=5)
if response.status_code == 200:
ip = response.json().get("ip")
logger.info(f"Öffentliche IP ermittelt: {ip}")
return ip
else:
logger.warning(f"IP-Ermittlung fehlgeschlagen: Status {response.status_code}")
except Exception as e:
logger.error(f"Fehler bei IP-Ermittlung: {e}")
# Fallback verwenden
logger.warning(f"Verwende Fallback-IP: {self.ip_fallback}")
return self.ip_fallback
elif self.session_ip_mode == "server_assigned":
# TODO: Implementierung für Server-zugewiesene IPs
logger.info("Server-assigned IP mode noch nicht implementiert, verwende Fallback")
return self.ip_fallback
elif self.session_ip_mode == "proxy":
# TODO: Proxy-IP verwenden wenn Proxy aktiv
logger.info("Proxy IP mode noch nicht implementiert, verwende Fallback")
return self.ip_fallback
else:
logger.warning(f"Unbekannter IP-Modus: {self.session_ip_mode}, verwende Fallback")
return self.ip_fallback
# Test-Funktion
if __name__ == "__main__":
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
print("=== Session Manager Test ===\n")
# Session Manager erstellen
session_mgr = SessionManager()
# Session-Info anzeigen
print("Aktuelle Session-Info:")
info = session_mgr.get_session_info()
for key, value in info.items():
print(f" {key}: {value}")
# Versuche gespeicherte Session fortzusetzen
print("\nVersuche Session fortzusetzen...")
if session_mgr.resume_session():
print(" ✓ Session fortgesetzt")
else:
print(" ✗ Keine gültige Session gefunden")
print("\n=== Test abgeschlossen ===")