Files
Claude Project Manager 0d7d888502 Initial commit
2025-07-05 17:51:16 +02:00

52 Zeilen
1.7 KiB
Python

from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import HTTPException, Security, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.orm import Session

from app.core.config import settings
from app.db.database import get_db
from app.models.models import ApiKey
# passlib hashing context configured with the bcrypt scheme. Nothing in the
# visible part of this module uses it.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Bearer-credentials scheme passed to Security() by verify_token and get_api_key.
security = HTTPBearer()
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Return a signed JWT carrying ``data`` plus an ``exp`` expiry claim.

    Args:
        data: Claims to embed. The dict is shallow-copied, so the caller's
            object is not modified.
        expires_delta: Token lifetime measured from the current UTC time.
            ``None`` selects the 15-minute default; any explicit value,
            including ``timedelta(0)``, is honoured as given.

    Returns:
        The token produced by ``jwt.encode`` using ``settings.SECRET_KEY``
        and ``settings.ALGORITHM``.
    """
    # Compare against None rather than relying on truthiness: timedelta(0) is
    # falsy and would otherwise silently fall back to the default lifetime.
    if expires_delta is not None:
        lifetime = expires_delta
    else:
        lifetime = timedelta(minutes=15)

    claims = data.copy()
    # Timezone-aware UTC "now"; datetime.utcnow() is deprecated since
    # Python 3.12 and yields a naive value.
    claims["exp"] = datetime.now(timezone.utc) + lifetime
    return jwt.encode(claims, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)):
    """Decode the bearer token and return its claims.

    Args:
        credentials: Bearer credentials resolved through the module-level
            ``security`` scheme; the raw token is read from
            ``credentials.credentials``.

    Returns:
        The payload returned by ``jwt.decode`` for the token, checked with
        ``settings.SECRET_KEY`` and ``settings.ALGORITHM``.

    Raises:
        HTTPException: Status 403 with detail "Invalid token" when decoding
            raises ``JWTError``.
    """
    try:
        return jwt.decode(
            credentials.credentials,
            settings.SECRET_KEY,
            algorithms=[settings.ALGORITHM],
        )
    except JWTError as exc:
        # Chain explicitly so the underlying decode failure is recorded as the
        # cause of the HTTP error instead of only as implicit context.
        raise HTTPException(status_code=403, detail="Invalid token") from exc
def verify_api_key(api_key: str, db: Session):
    """Look up an active API key and stamp its last-used time.

    Args:
        api_key: Key value compared against ``ApiKey.key``.
        db: Session used both for the lookup and for committing the
            ``last_used`` update.

    Returns:
        The first ``ApiKey`` row whose key matches and whose ``is_active``
        column equals ``True``.

    Raises:
        HTTPException: Status 401 with detail "Invalid API key" when the
            query returns no usable row.
    """
    # `== True` is intentional: it builds a SQL comparison on the column.
    active_match = (
        db.query(ApiKey)
        .filter(ApiKey.key == api_key, ApiKey.is_active == True)  # noqa: E712
        .first()
    )
    if active_match:
        # Record the use and persist it before handing the row back.
        active_match.last_used = datetime.utcnow()
        db.commit()
        return active_match
    raise HTTPException(status_code=401, detail="Invalid API key")
def get_api_key(
    credentials: HTTPAuthorizationCredentials = Security(security),
    db: Session = Depends(get_db),
):
    """Validate the bearer credential as an API key.

    Args:
        credentials: Bearer credentials resolved through the module-level
            ``security`` scheme.
        db: Session supplied by the ``get_db`` dependency.

    Returns:
        Whatever ``verify_api_key`` returns for the supplied credential; any
        exception it raises propagates unchanged.
    """
    return verify_api_key(credentials.credentials, db)