Dieser Commit ist enthalten in:
2025-06-16 00:37:14 +02:00
Ursprung ff935204d5
Commit 262de2839e
26 geänderte Dateien mit 2128 neuen und 28 gelöschten Zeilen

Datei anzeigen

@@ -0,0 +1 @@
from . import license, version

Datei anzeigen

@@ -0,0 +1,209 @@
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from datetime import datetime, timedelta
from typing import Dict, Any
from app.db.database import get_db
from app.models.models import License, Activation, Version
from app.schemas.license import (
LicenseActivationRequest,
LicenseActivationResponse,
LicenseVerificationRequest,
LicenseVerificationResponse
)
from app.core.security import get_api_key
from app.core.config import settings
router = APIRouter()
@router.post("/activate", response_model=LicenseActivationResponse)
async def activate_license(
request: LicenseActivationRequest,
db: Session = Depends(get_db),
api_key = Depends(get_api_key)
):
license = db.query(License).filter(
License.license_key == request.license_key,
License.is_active == True
).first()
if not license:
return LicenseActivationResponse(
success=False,
message="Invalid license key"
)
if license.expires_at and license.expires_at < datetime.utcnow():
return LicenseActivationResponse(
success=False,
message="License has expired"
)
existing_activations = db.query(Activation).filter(
Activation.license_id == license.id,
Activation.is_active == True
).all()
existing_machine = next(
(a for a in existing_activations if a.machine_id == request.machine_id),
None
)
if existing_machine:
if existing_machine.hardware_hash != request.hardware_hash:
return LicenseActivationResponse(
success=False,
message="Hardware mismatch for this machine"
)
existing_machine.last_heartbeat = datetime.utcnow()
existing_machine.app_version = request.app_version
existing_machine.os_info = request.os_info
db.commit()
return LicenseActivationResponse(
success=True,
message="License reactivated successfully",
activation_id=existing_machine.id,
expires_at=license.expires_at,
features={"all_features": True}
)
if len(existing_activations) >= license.max_activations:
return LicenseActivationResponse(
success=False,
message=f"Maximum activations ({license.max_activations}) reached"
)
new_activation = Activation(
license_id=license.id,
machine_id=request.machine_id,
hardware_hash=request.hardware_hash,
os_info=request.os_info,
app_version=request.app_version
)
db.add(new_activation)
db.commit()
db.refresh(new_activation)
return LicenseActivationResponse(
success=True,
message="License activated successfully",
activation_id=new_activation.id,
expires_at=license.expires_at,
features={"all_features": True}
)
@router.post("/verify", response_model=LicenseVerificationResponse)
async def verify_license(
request: LicenseVerificationRequest,
db: Session = Depends(get_db),
api_key = Depends(get_api_key)
):
activation = db.query(Activation).filter(
Activation.id == request.activation_id,
Activation.machine_id == request.machine_id,
Activation.is_active == True
).first()
if not activation:
return LicenseVerificationResponse(
valid=False,
message="Invalid activation"
)
license = activation.license
if not license.is_active:
return LicenseVerificationResponse(
valid=False,
message="License is no longer active"
)
if license.license_key != request.license_key:
return LicenseVerificationResponse(
valid=False,
message="License key mismatch"
)
if activation.hardware_hash != request.hardware_hash:
grace_period = datetime.utcnow() - timedelta(days=settings.OFFLINE_GRACE_PERIOD_DAYS)
if activation.last_heartbeat < grace_period:
return LicenseVerificationResponse(
valid=False,
message="Hardware mismatch and grace period expired"
)
else:
return LicenseVerificationResponse(
valid=True,
message="Hardware mismatch but within grace period",
expires_at=license.expires_at,
features={"all_features": True}
)
if license.expires_at and license.expires_at < datetime.utcnow():
return LicenseVerificationResponse(
valid=False,
message="License has expired"
)
activation.last_heartbeat = datetime.utcnow()
db.commit()
latest_version = db.query(Version).order_by(Version.release_date.desc()).first()
requires_update = False
update_url = None
if latest_version and activation.app_version:
if latest_version.version_number > activation.app_version:
requires_update = True
update_url = latest_version.download_url
return LicenseVerificationResponse(
valid=True,
message="License is valid",
expires_at=license.expires_at,
features={"all_features": True},
requires_update=requires_update,
update_url=update_url
)
@router.get("/info/{license_key}")
async def get_license_info(
license_key: str,
db: Session = Depends(get_db),
api_key = Depends(get_api_key)
):
license = db.query(License).filter(
License.license_key == license_key
).first()
if not license:
raise HTTPException(status_code=404, detail="License not found")
activations = db.query(Activation).filter(
Activation.license_id == license.id,
Activation.is_active == True
).all()
return {
"license_key": license.license_key,
"product_id": license.product_id,
"customer_email": license.customer_email,
"customer_name": license.customer_name,
"is_active": license.is_active,
"expires_at": license.expires_at,
"max_activations": license.max_activations,
"current_activations": len(activations),
"activations": [
{
"id": a.id,
"machine_id": a.machine_id,
"activation_date": a.activation_date,
"last_heartbeat": a.last_heartbeat,
"app_version": a.app_version
}
for a in activations
]
}

Datei anzeigen

@@ -0,0 +1,84 @@
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from packaging import version
from app.db.database import get_db
from app.models.models import Version, License
from app.schemas.license import VersionCheckRequest, VersionCheckResponse
from app.core.security import get_api_key
router = APIRouter()
@router.post("/check", response_model=VersionCheckResponse)
async def check_version(
request: VersionCheckRequest,
db: Session = Depends(get_db),
api_key = Depends(get_api_key)
):
license = db.query(License).filter(
License.license_key == request.license_key,
License.is_active == True
).first()
if not license:
return VersionCheckResponse(
latest_version=request.current_version,
current_version=request.current_version,
update_available=False,
is_mandatory=False
)
latest_version = db.query(Version).order_by(Version.release_date.desc()).first()
if not latest_version:
return VersionCheckResponse(
latest_version=request.current_version,
current_version=request.current_version,
update_available=False,
is_mandatory=False
)
current_ver = version.parse(request.current_version)
latest_ver = version.parse(latest_version.version_number)
update_available = latest_ver > current_ver
is_mandatory = False
if update_available and latest_version.is_mandatory:
if latest_version.min_version:
min_ver = version.parse(latest_version.min_version)
is_mandatory = current_ver < min_ver
else:
is_mandatory = True
return VersionCheckResponse(
latest_version=latest_version.version_number,
current_version=request.current_version,
update_available=update_available,
is_mandatory=is_mandatory,
download_url=latest_version.download_url if update_available else None,
release_notes=latest_version.release_notes if update_available else None
)
@router.get("/latest")
async def get_latest_version(
db: Session = Depends(get_db),
api_key = Depends(get_api_key)
):
latest_version = db.query(Version).order_by(Version.release_date.desc()).first()
if not latest_version:
return {
"version": "1.0.0",
"release_date": None,
"release_notes": "Initial release"
}
return {
"version": latest_version.version_number,
"release_date": latest_version.release_date,
"is_mandatory": latest_version.is_mandatory,
"min_version": latest_version.min_version,
"download_url": latest_version.download_url,
"release_notes": latest_version.release_notes
}

Datei anzeigen

@@ -0,0 +1,32 @@
from pydantic_settings import BaseSettings
from typing import List
class Settings(BaseSettings):
PROJECT_NAME: str = "License Server"
VERSION: str = "1.0.0"
API_PREFIX: str = "/api"
SECRET_KEY: str = "your-secret-key-change-this-in-production"
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
DATABASE_URL: str = "postgresql://license_user:license_password@db:5432/license_db"
REDIS_URL: str = "redis://redis:6379"
ALLOWED_ORIGINS: List[str] = [
"https://api-software-undso.z5m7q9dk3ah2v1plx6ju.com",
"https://admin-panel-undso.z5m7q9dk3ah2v1plx6ju.com"
]
DEBUG: bool = False
MAX_ACTIVATIONS_PER_LICENSE: int = 5
HEARTBEAT_INTERVAL_MINUTES: int = 15
OFFLINE_GRACE_PERIOD_DAYS: int = 7
class Config:
env_file = ".env"
case_sensitive = True
settings = Settings()

Datei anzeigen

@@ -0,0 +1,52 @@
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import HTTPException, Security, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session
from app.db.database import get_db
from app.models.models import ApiKey
from app.core.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
security = HTTPBearer()
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)):
token = credentials.credentials
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
return payload
except JWTError:
raise HTTPException(status_code=403, detail="Invalid token")
def verify_api_key(api_key: str, db: Session):
key = db.query(ApiKey).filter(
ApiKey.key == api_key,
ApiKey.is_active == True
).first()
if not key:
raise HTTPException(status_code=401, detail="Invalid API key")
key.last_used = datetime.utcnow()
db.commit()
return key
def get_api_key(
credentials: HTTPAuthorizationCredentials = Security(security),
db: Session = Depends(get_db)
):
api_key = credentials.credentials
return verify_api_key(api_key, db)

Datei anzeigen

@@ -0,0 +1,16 @@
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from app.core.config import settings
engine = create_engine(settings.DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

65
v2_lizenzserver/app/main.py Normale Datei
Datei anzeigen

@@ -0,0 +1,65 @@
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
import uvicorn
import logging
from datetime import datetime
from app.api import license, version
from app.core.config import settings
from app.db.database import engine, Base
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
Base.metadata.create_all(bind=engine)
app = FastAPI(
title="License Server API",
description="API for software license management",
version="1.0.0",
docs_url="/docs" if settings.DEBUG else None,
redoc_url="/redoc" if settings.DEBUG else None,
)
app.add_middleware(
CORSMiddleware,
allow_origins=settings.ALLOWED_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
logger.error(f"Global exception: {str(exc)}", exc_info=True)
return JSONResponse(
status_code=500,
content={"detail": "Internal server error"}
)
@app.get("/")
async def root():
return {
"status": "online",
"service": "License Server",
"timestamp": datetime.utcnow().isoformat()
}
@app.get("/health")
async def health_check():
return {
"status": "healthy",
"timestamp": datetime.utcnow().isoformat()
}
app.include_router(license.router, prefix="/api/license", tags=["license"])
app.include_router(version.router, prefix="/api/version", tags=["version"])
if __name__ == "__main__":
uvicorn.run(
"app.main:app",
host="0.0.0.0",
port=8443,
reload=settings.DEBUG
)

Datei anzeigen

@@ -0,0 +1 @@
from .models import License, Activation, Version, ApiKey

Datei anzeigen

@@ -0,0 +1,65 @@
from sqlalchemy import Column, String, Integer, Boolean, DateTime, ForeignKey, Text, JSON
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from app.db.database import Base
import uuid
class License(Base):
__tablename__ = "licenses"
id = Column(Integer, primary_key=True, index=True)
license_key = Column(String, unique=True, index=True, default=lambda: str(uuid.uuid4()))
product_id = Column(String, nullable=False)
customer_email = Column(String, nullable=False)
customer_name = Column(String)
max_activations = Column(Integer, default=1)
is_active = Column(Boolean, default=True)
expires_at = Column(DateTime, nullable=True)
created_at = Column(DateTime, server_default=func.now())
updated_at = Column(DateTime, onupdate=func.now())
activations = relationship("Activation", back_populates="license")
class Activation(Base):
__tablename__ = "activations"
id = Column(Integer, primary_key=True, index=True)
license_id = Column(Integer, ForeignKey("licenses.id"))
machine_id = Column(String, nullable=False)
hardware_hash = Column(String, nullable=False)
activation_date = Column(DateTime, server_default=func.now())
last_heartbeat = Column(DateTime, server_default=func.now())
is_active = Column(Boolean, default=True)
os_info = Column(JSON)
app_version = Column(String)
license = relationship("License", back_populates="activations")
class Version(Base):
__tablename__ = "versions"
id = Column(Integer, primary_key=True, index=True)
version_number = Column(String, unique=True, nullable=False)
release_date = Column(DateTime, server_default=func.now())
is_mandatory = Column(Boolean, default=False)
min_version = Column(String)
download_url = Column(String)
release_notes = Column(Text)
created_at = Column(DateTime, server_default=func.now())
class ApiKey(Base):
__tablename__ = "api_keys"
id = Column(Integer, primary_key=True, index=True)
key = Column(String, unique=True, index=True, default=lambda: str(uuid.uuid4()))
name = Column(String, nullable=False)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, server_default=func.now())
last_used = Column(DateTime)

Datei anzeigen

@@ -0,0 +1,8 @@
from .license import (
LicenseActivationRequest,
LicenseActivationResponse,
LicenseVerificationRequest,
LicenseVerificationResponse,
VersionCheckRequest,
VersionCheckResponse
)

Datei anzeigen

@@ -0,0 +1,43 @@
from pydantic import BaseModel, EmailStr
from datetime import datetime
from typing import Optional, Dict, Any
class LicenseActivationRequest(BaseModel):
license_key: str
machine_id: str
hardware_hash: str
os_info: Optional[Dict[str, Any]] = None
app_version: Optional[str] = None
class LicenseActivationResponse(BaseModel):
success: bool
message: str
activation_id: Optional[int] = None
expires_at: Optional[datetime] = None
features: Optional[Dict[str, Any]] = None
class LicenseVerificationRequest(BaseModel):
license_key: str
machine_id: str
hardware_hash: str
activation_id: int
class LicenseVerificationResponse(BaseModel):
valid: bool
message: str
expires_at: Optional[datetime] = None
features: Optional[Dict[str, Any]] = None
requires_update: bool = False
update_url: Optional[str] = None
class VersionCheckRequest(BaseModel):
current_version: str
license_key: str
class VersionCheckResponse(BaseModel):
latest_version: str
current_version: str
update_available: bool
is_mandatory: bool
download_url: Optional[str] = None
release_notes: Optional[str] = None