331 Zeilen
11 KiB
Python
331 Zeilen
11 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Vollständiges Beispiel für die Integration des Lizenzservers in eine Python-Anwendung
|
|
"""
|
|
|
|
import requests
|
|
import hashlib
|
|
import platform
|
|
import uuid
|
|
import json
|
|
import os
|
|
import time
|
|
import threading
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
|
|
class LicenseManager:
    """Client-side helper for activating and verifying a software license.

    The manager sends activation, verification and update-check requests to
    the license server, stores the last known license state in a JSON cache
    file below the user's home directory, and can re-verify the license
    periodically from a background ("heartbeat") thread.

    Public state:
        license_key: Key of the active license, or None.
        activation_id: Activation identifier returned by the server, or None.
        is_valid: Result of the most recent activation/verification.
        expires_at: Expiry as a datetime, or None when unknown/unlimited.
    """

    # Seconds to wait for a license-server response.
    REQUEST_TIMEOUT = 10
    # Seconds between two background verifications (15 minutes).
    HEARTBEAT_INTERVAL = 900
    # Days a cached license stays usable offline after the hardware
    # fingerprint no longer matches the cached one.
    HARDWARE_GRACE_DAYS = 7

    def __init__(self, api_key, app_version="1.0.0"):
        """Set up server access, the cache location and an empty license state.

        Args:
            api_key: Token sent as ``Authorization: Bearer <api_key>``.
            app_version: Version string reported to the server.

        Note:
            Creates the cache directory ``~/.myapp/license`` if it is missing.
        """
        self.api_key = api_key
        self.app_version = app_version
        self.server_url = "https://api-software-undso.z5m7q9dk3ah2v1plx6ju.com"
        self.headers = {"Authorization": f"Bearer {api_key}"}

        # Cache directory
        self.cache_dir = Path.home() / ".myapp" / "license"
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.cache_file = self.cache_dir / "license.json"

        # License state
        self.license_key = None
        self.activation_id = None
        self.is_valid = False
        self.expires_at = None

        # Heartbeat thread; the event lets a stop request interrupt the
        # worker's wait instead of blocking for a full interval.
        self.heartbeat_thread = None
        self.stop_heartbeat = False
        self._stop_event = threading.Event()

    # ------------------------------------------------------------------
    # Machine identification
    # ------------------------------------------------------------------

    def get_machine_id(self):
        """Return a machine ID of the form ``MAC-XXXXXXXXXXXX``.

        The value is ``uuid.getnode()`` formatted as 12 upper-case hex digits.
        NOTE(review): getnode() may not always yield a real MAC address on
        every system - confirm the ID is stable enough for your deployment.
        """
        mac = uuid.getnode()
        return f"MAC-{mac:012X}"

    def get_hardware_hash(self):
        """Return a SHA-256 hex digest over several system properties.

        The fingerprint combines the machine ID, processor name, OS name,
        machine type and network node name, so it changes when any of these
        change (including the host name).
        """
        components = [
            self.get_machine_id(),
            platform.processor(),
            platform.system(),
            platform.machine(),
            platform.node(),
        ]
        combined = "-".join(components)
        return hashlib.sha256(combined.encode()).hexdigest()

    # ------------------------------------------------------------------
    # Local cache
    # ------------------------------------------------------------------

    def save_license_cache(self):
        """Write the current license state to the cache file for offline use.

        Raises:
            OSError: If the cache file cannot be written.
        """
        cache_data = {
            "license_key": self.license_key,
            "activation_id": self.activation_id,
            "expires_at": self.expires_at.isoformat() if self.expires_at else None,
            "hardware_hash": self.get_hardware_hash(),
            "last_verified": datetime.now().isoformat(),
        }

        with open(self.cache_file, "w", encoding="utf-8") as f:
            json.dump(cache_data, f)

    def load_license_cache(self):
        """Return the cached license data as a dict, or None.

        None is returned when the cache file is missing, unreadable, not
        valid JSON, or does not contain a JSON object.
        """
        try:
            with open(self.cache_file, "r", encoding="utf-8") as f:
                cache = json.load(f)
        except (OSError, ValueError):
            # Missing/unreadable file or malformed JSON: behave as "no cache".
            return None

        return cache if isinstance(cache, dict) else None

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _parse_timestamp(value):
        """Parse an ISO-8601 string; a trailing "Z" is read as "+00:00".

        Raises:
            ValueError: If *value* is not a string or not a valid timestamp.
        """
        if not isinstance(value, str):
            raise ValueError(f"not an ISO timestamp: {value!r}")
        return datetime.fromisoformat(value.replace("Z", "+00:00"))

    @staticmethod
    def _now_like(moment):
        """Return "now" with the same timezone-awareness as *moment*.

        Naive and aware datetimes cannot be compared or subtracted, so the
        current time is produced in the timezone of *moment* (or naive local
        time when *moment* carries no timezone).
        """
        return datetime.now(moment.tzinfo)

    def _post(self, endpoint, payload):
        """POST *payload* as JSON to *endpoint* on the license server."""
        return requests.post(
            f"{self.server_url}{endpoint}",
            headers=self.headers,
            json=payload,
            timeout=self.REQUEST_TIMEOUT,
            verify=True,
        )

    def _restore_from_cache(self):
        """Load license key, activation ID and expiry from the cache file.

        Does not touch ``is_valid``: restoring only tells the manager which
        license to verify, it does not make that license valid.
        """
        cache = self.load_license_cache()
        if not cache:
            return

        self.license_key = cache.get("license_key")
        self.activation_id = cache.get("activation_id")

        if cache.get("expires_at"):
            try:
                self.expires_at = self._parse_timestamp(cache["expires_at"])
            except ValueError:
                # Only used for display here; validity is decided by the
                # server or by verify_offline().
                self.expires_at = None

    # ------------------------------------------------------------------
    # Server communication
    # ------------------------------------------------------------------

    def activate_license(self, license_key):
        """Activate *license_key* for this machine.

        On success the license state is stored on the instance, written to
        the cache and the heartbeat thread is started. The instance state is
        only modified after the whole server response has been validated.

        Args:
            license_key: The license key entered by the user.

        Returns:
            A ``(success, message)`` tuple.
        """
        data = {
            "license_key": license_key,
            "machine_id": self.get_machine_id(),
            "hardware_hash": self.get_hardware_hash(),
            "os_info": {
                "os": platform.system(),
                "version": platform.version(),
                "release": platform.release(),
                "machine": platform.machine(),
            },
            "app_version": self.app_version,
        }

        try:
            response = self._post("/api/license/activate", data)
        except requests.exceptions.RequestException as e:
            return False, f"Verbindungsfehler: {str(e)}"

        if response.status_code != 200:
            return False, f"Server-Fehler: {response.status_code}"

        try:
            result = response.json()
        except ValueError:
            return False, "Ungültige Server-Antwort"
        if not isinstance(result, dict):
            return False, "Ungültige Server-Antwort"

        if not result.get("success"):
            return False, result.get("message", "Aktivierung fehlgeschlagen")

        # Parse the expiry before touching any state so a malformed value
        # cannot leave the manager half-activated.
        expires_at = None
        if result.get("expires_at"):
            try:
                expires_at = self._parse_timestamp(result["expires_at"])
            except ValueError:
                return False, "Ungültige Server-Antwort"

        self.license_key = license_key
        self.activation_id = result.get("activation_id")
        self.is_valid = True
        if expires_at is not None:
            self.expires_at = expires_at

        self.save_license_cache()
        self.start_heartbeat()

        return True, result.get("message", "Lizenz aktiviert")

    def verify_license(self):
        """Verify the active license with the server (heartbeat).

        If the instance does not know a license yet, the license key and
        activation ID are first restored from the cache file. When the
        server cannot be reached, or answers 200 with a body that is not
        JSON, the cached license is checked via :meth:`verify_offline`.

        Returns:
            A ``(valid, message)`` tuple.
        """
        if not self.license_key or not self.activation_id:
            # A fresh process has no in-memory state; use the cached license.
            self._restore_from_cache()

        if not self.license_key or not self.activation_id:
            return False, "Keine aktive Lizenz"

        data = {
            "license_key": self.license_key,
            "machine_id": self.get_machine_id(),
            "hardware_hash": self.get_hardware_hash(),
            "activation_id": self.activation_id,
        }

        try:
            response = self._post("/api/license/verify", data)
        except requests.exceptions.RequestException:
            # Offline mode: check the cache
            return self.verify_offline()

        if response.status_code != 200:
            return False, f"Server-Fehler: {response.status_code}"

        try:
            result = response.json()
        except ValueError:
            # A 200 without a JSON body did not come from the license API
            # (e.g. an intercepting proxy); treat it like being offline.
            return self.verify_offline()
        if not isinstance(result, dict):
            return self.verify_offline()

        self.is_valid = result.get("valid", False)

        if self.is_valid:
            self.save_license_cache()

        # Update check
        if result.get("requires_update"):
            print(f"Update verfügbar: {result.get('update_url')}")

        return self.is_valid, result.get("message", "")

    def verify_offline(self):
        """Verify the cached license without contacting the server.

        The cached license is accepted when it has not expired and either
        the hardware fingerprint still matches or the last successful
        verification is at most ``HARDWARE_GRACE_DAYS`` days old.
        ``is_valid`` is updated to the outcome of this check.

        Returns:
            A ``(valid, message)`` tuple.
        """
        cache = self.load_license_cache()
        if not cache:
            self.is_valid = False
            return False, "Keine gecachte Lizenz vorhanden"

        # Check hardware hash
        if cache.get("hardware_hash") != self.get_hardware_hash():
            # Grace period after a hardware change. A missing or unreadable
            # timestamp cannot prove the grace period is still running.
            try:
                last_verified = self._parse_timestamp(cache.get("last_verified"))
            except ValueError:
                last_verified = None

            grace_period = timedelta(days=self.HARDWARE_GRACE_DAYS)
            if (
                last_verified is None
                or self._now_like(last_verified) - last_verified > grace_period
            ):
                self.is_valid = False
                return False, "Hardware geändert - Grace Period abgelaufen"

        # Check expiry date
        expires_at = None
        if cache.get("expires_at"):
            try:
                expires_at = self._parse_timestamp(cache.get("expires_at"))
            except ValueError:
                self.is_valid = False
                return False, "Lizenz-Cache ist beschädigt"

            # The cached expiry may be timezone-aware (server timestamps end
            # in "Z"), so "now" must be produced in the same timezone.
            if self._now_like(expires_at) > expires_at:
                self.is_valid = False
                return False, "Lizenz abgelaufen"

        self.license_key = cache.get("license_key")
        self.activation_id = cache.get("activation_id")
        self.expires_at = expires_at
        self.is_valid = True

        return True, "Offline-Modus (gecachte Lizenz)"

    def check_for_updates(self):
        """Ask the server whether a newer application version exists.

        Returns:
            The decoded JSON response on HTTP 200, otherwise None (no active
            license, connection problem, non-200 status or invalid JSON).
        """
        if not self.license_key:
            return None

        data = {
            "current_version": self.app_version,
            "license_key": self.license_key,
        }

        try:
            response = self._post("/api/version/check", data)
            if response.status_code == 200:
                return response.json()
        except (requests.exceptions.RequestException, ValueError):
            # The update check is best-effort; failures mean "no information".
            return None

        return None

    # ------------------------------------------------------------------
    # Heartbeat
    # ------------------------------------------------------------------

    def heartbeat_worker(self):
        """Thread body: re-verify the license every ``HEARTBEAT_INTERVAL`` s.

        Runs until a stop is requested. A failed verification is reported on
        stdout; the application may react to it (e.g. disable features).
        """
        while not self.stop_heartbeat:
            # wait() returns True as soon as a stop is requested, so shutdown
            # does not have to sit out the remaining interval.
            if self._stop_event.wait(self.HEARTBEAT_INTERVAL) or self.stop_heartbeat:
                break

            try:
                valid, message = self.verify_license()
            except Exception as exc:
                # Thread boundary: an unexpected error (e.g. the cache file
                # cannot be written) must not silently end the heartbeat.
                valid, message = False, str(exc)

            if not valid:
                print(f"Lizenz-Warnung: {message}")
                # The app could react here (e.g. disable features)

    def start_heartbeat(self):
        """Start the heartbeat thread unless it is already running."""
        if self.heartbeat_thread and self.heartbeat_thread.is_alive():
            return

        self.stop_heartbeat = False
        self._stop_event.clear()
        self.heartbeat_thread = threading.Thread(
            target=self.heartbeat_worker,
            daemon=True,
        )
        self.heartbeat_thread.start()

    def stop_heartbeat_thread(self):
        """Request the heartbeat thread to stop and wait up to one second."""
        self.stop_heartbeat = True
        self._stop_event.set()
        if self.heartbeat_thread:
            self.heartbeat_thread.join(timeout=1)

    # ------------------------------------------------------------------
    # Information
    # ------------------------------------------------------------------

    def get_license_info(self):
        """Return the current license state as a dict.

        The license key is masked: only its first four characters are shown,
        followed by ``****``.
        """
        return {
            "valid": self.is_valid,
            "license_key": self.license_key[:4] + "****" if self.license_key else None,
            "expires_at": self.expires_at.isoformat() if self.expires_at else None,
            "machine_id": self.get_machine_id(),
        }
|
|
|
|
|
|
# Example application
def _prompt_and_activate(license_mgr):
    """Ask the user for a license key and activate it; return True on success."""
    license_key = input("Bitte Lizenzschlüssel eingeben: ")
    success, message = license_mgr.activate_license(license_key)

    if success:
        print(f"✓ {message}")
    else:
        print(f"✗ {message}")
    return success


def main():
    """Run the example: obtain a valid license, check for updates, then idle.

    A cached license is verified first; if there is none, or verification
    fails, the user is prompted for a license key. The function returns
    early when activation fails. Afterwards it loops until interrupted with
    Ctrl+C and always stops the heartbeat thread on the way out.
    """
    # The API key should be stored securely (e.g. encrypted)
    api_key = os.environ.get("LICENSE_API_KEY", "your-api-key-here")

    # Initialise the license manager
    license_mgr = LicenseManager(api_key, app_version="1.0.0")

    # Try to load a cached license
    cache = license_mgr.load_license_cache()
    if cache:
        print("Gecachte Lizenz gefunden, verifiziere...")

        # verify_license() checks the license held by the manager, so the
        # cached key and activation ID have to be handed over first.
        if isinstance(cache, dict):
            license_mgr.license_key = cache.get("license_key")
            license_mgr.activation_id = cache.get("activation_id")

        valid, message = license_mgr.verify_license()

        if valid:
            print(f"✓ Lizenz gültig: {message}")
            # activate_license() starts the heartbeat itself; for a cached
            # license it has to be started here.
            license_mgr.start_heartbeat()
        else:
            print(f"✗ Lizenz ungültig: {message}")
            # A new license is required
            if not _prompt_and_activate(license_mgr):
                return
    else:
        # First activation
        print("Keine Lizenz gefunden.")
        if not _prompt_and_activate(license_mgr):
            return

    # Update check
    print("\nPrüfe auf Updates...")
    update_info = license_mgr.check_for_updates()
    if update_info and update_info.get("update_available"):
        print(f"Update verfügbar: {update_info.get('latest_version')}")
        if update_info.get("is_mandatory"):
            print("⚠️ Dies ist ein Pflicht-Update!")

    # Show license information
    info = license_mgr.get_license_info()
    print("\nLizenz-Status:")
    print(f"- Gültig: {info['valid']}")
    print(f"- Ablauf: {info['expires_at']}")
    print(f"- Maschine: {info['machine_id']}")

    # App is running...
    print("\n✓ Anwendung gestartet mit gültiger Lizenz")

    try:
        # Simulate application runtime
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\nBeende Anwendung...")
    finally:
        # Stop the heartbeat on every exit path, not only on Ctrl+C.
        license_mgr.stop_heartbeat_thread()
|
|
|
|
|
|
# Run the example only when executed as a script, not when imported.
if __name__ == "__main__":
    main()