import os import sys from flask import Flask, request, jsonify from flask_cors import CORS import logging from functools import wraps from marshmallow import Schema, fields, ValidationError from datetime import datetime, timedelta import secrets # Add parent directory to path for imports sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) from config import get_config from repositories.license_repo import LicenseRepository from repositories.cache_repo import CacheRepository from events.event_bus import EventBus, Event, EventTypes from models import EventType, AnomalyType, Severity # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Initialize Flask app app = Flask(__name__) config = get_config() app.config.from_object(config) CORS(app) # Initialize dependencies license_repo = LicenseRepository(config.DATABASE_URL) cache_repo = CacheRepository(config.REDIS_URL) event_bus = EventBus(config.RABBITMQ_URL) # Validation schemas class CreateLicenseSchema(Schema): customer_id = fields.Str(required=True) max_devices = fields.Int(missing=1, validate=lambda x: x > 0) expires_in_days = fields.Int(allow_none=True) features = fields.List(fields.Str(), missing=[]) is_test = fields.Bool(missing=False) metadata = fields.Dict(missing={}) class UpdateLicenseSchema(Schema): max_devices = fields.Int(validate=lambda x: x > 0) is_active = fields.Bool() expires_at = fields.DateTime() features = fields.List(fields.Str()) metadata = fields.Dict() class DeactivateDeviceSchema(Schema): hardware_id = fields.Str(required=True) reason = fields.Str() class TransferLicenseSchema(Schema): from_hardware_id = fields.Str(required=True) to_hardware_id = fields.Str(required=True) class SearchLicensesSchema(Schema): customer_id = fields.Str() is_active = fields.Bool() is_test = fields.Bool() created_after = fields.DateTime() created_before = fields.DateTime() expires_after = fields.DateTime() expires_before = fields.DateTime() page = fields.Int(missing=1, validate=lambda x: x > 0) per_page = fields.Int(missing=50, validate=lambda x: 0 < x <= 100) def require_admin_auth(f): """Decorator to require admin authentication""" @wraps(f) def decorated_function(*args, **kwargs): # Check for admin API key api_key = request.headers.get('X-Admin-API-Key') if not api_key: return jsonify({"error": "Missing admin API key"}), 401 # In production, validate against database # For now, check environment variable if api_key != os.getenv('ADMIN_API_KEY', 'admin-key-change-in-production'): return jsonify({"error": "Invalid admin API key"}), 401 return f(*args, **kwargs) return decorated_function @app.route('/health', methods=['GET']) def health_check(): """Health check endpoint""" return jsonify({ "status": "healthy", "service": "admin-api", "timestamp": datetime.utcnow().isoformat() }) @app.route('/api/v1/admin/licenses', methods=['POST']) @require_admin_auth def create_license(): """Create new license""" schema = CreateLicenseSchema() try: data = schema.load(request.get_json()) except ValidationError as e: return jsonify({"error": "Invalid request", "details": e.messages}), 400 # Generate license key license_key = f"LIC-{secrets.token_urlsafe(16).upper()}" # Calculate expiration expires_at = None if data.get('expires_in_days'): expires_at = datetime.utcnow() + timedelta(days=data['expires_in_days']) # Create license in database query = """ INSERT INTO licenses (license_key, customer_id, max_devices, is_active, is_test, expires_at, features, metadata) VALUES (%s, %s, %s, true, %s, %s, %s, %s) RETURNING id """ import json license_id = license_repo.execute_insert(query, ( license_key, data['customer_id'], data['max_devices'], data['is_test'], expires_at, json.dumps(data['features']), json.dumps(data['metadata']) )) if not license_id: return jsonify({"error": "Failed to create license"}), 500 # Publish event event_bus.publish(Event( EventTypes.LICENSE_CREATED, { "license_id": license_id, "customer_id": data['customer_id'], "license_key": license_key }, "admin-api" )) return jsonify({ "id": license_id, "license_key": license_key, "customer_id": data['customer_id'], "max_devices": data['max_devices'], "is_test": data['is_test'], "expires_at": expires_at.isoformat() if expires_at else None, "features": data['features'] }), 201 @app.route('/api/v1/admin/licenses/', methods=['GET']) @require_admin_auth def get_license(license_id): """Get license details""" license = license_repo.get_license_by_id(license_id) if not license: return jsonify({"error": "License not found"}), 404 # Get additional statistics active_devices = license_repo.get_active_devices(license_id) usage_stats = license_repo.get_license_usage_stats(license_id) recent_events = license_repo.get_recent_activations(license_id) # Format response license['active_devices'] = active_devices license['usage_stats'] = usage_stats license['recent_events'] = recent_events return jsonify(license) @app.route('/api/v1/admin/licenses/', methods=['PATCH']) @require_admin_auth def update_license(license_id): """Update license""" schema = UpdateLicenseSchema() try: data = schema.load(request.get_json()) except ValidationError as e: return jsonify({"error": "Invalid request", "details": e.messages}), 400 # Build update query dynamically updates = [] params = [] if 'max_devices' in data: updates.append("max_devices = %s") params.append(data['max_devices']) if 'is_active' in data: updates.append("is_active = %s") params.append(data['is_active']) if 'expires_at' in data: updates.append("expires_at = %s") params.append(data['expires_at']) if 'features' in data: updates.append("features = %s") params.append(json.dumps(data['features'])) if 'metadata' in data: updates.append("metadata = %s") params.append(json.dumps(data['metadata'])) if not updates: return jsonify({"error": "No fields to update"}), 400 # Add updated_at updates.append("updated_at = NOW()") # Add license_id to params params.append(license_id) query = f""" UPDATE licenses SET {', '.join(updates)} WHERE id = %s RETURNING * """ result = license_repo.execute_one(query, params) if not result: return jsonify({"error": "License not found"}), 404 # Invalidate cache cache_repo.invalidate_license_cache(license_id) # Publish event event_bus.publish(Event( EventTypes.LICENSE_UPDATED, { "license_id": license_id, "changes": list(data.keys()) }, "admin-api" )) return jsonify(result) @app.route('/api/v1/admin/licenses/', methods=['DELETE']) @require_admin_auth def delete_license(license_id): """Soft delete license (deactivate)""" query = """ UPDATE licenses SET is_active = false, updated_at = NOW() WHERE id = %s RETURNING id """ result = license_repo.execute_one(query, (license_id,)) if not result: return jsonify({"error": "License not found"}), 404 # Invalidate cache cache_repo.invalidate_license_cache(license_id) # Publish event event_bus.publish(Event( EventTypes.LICENSE_DEACTIVATED, {"license_id": license_id}, "admin-api" )) return jsonify({"success": True, "message": "License deactivated"}) @app.route('/api/v1/admin/licenses//devices', methods=['GET']) @require_admin_auth def get_license_devices(license_id): """Get all devices for a license""" # Get active devices active_devices = license_repo.get_active_devices(license_id) # Get all registered devices from activation events query = """ SELECT DISTINCT ON (hardware_id) hardware_id, event_type, ip_address, user_agent, created_at as registered_at, metadata FROM activation_events WHERE license_id = %s AND event_type IN ('activation', 'reactivation', 'transfer') AND success = true ORDER BY hardware_id, created_at DESC """ all_devices = license_repo.execute_query(query, (license_id,)) # Mark active devices active_hw_ids = {d['hardware_id'] for d in active_devices} for device in all_devices: device['is_active'] = device['hardware_id'] in active_hw_ids if device['is_active']: # Add last_seen from active_devices active_device = next((d for d in active_devices if d['hardware_id'] == device['hardware_id']), None) if active_device: device['last_seen'] = active_device['last_seen'] return jsonify({ "license_id": license_id, "total_devices": len(all_devices), "active_devices": len(active_devices), "devices": all_devices }) @app.route('/api/v1/admin/licenses//devices/deactivate', methods=['POST']) @require_admin_auth def deactivate_device(license_id): """Deactivate a device""" schema = DeactivateDeviceSchema() try: data = schema.load(request.get_json()) except ValidationError as e: return jsonify({"error": "Invalid request", "details": e.messages}), 400 success = license_repo.deactivate_device(license_id, data['hardware_id']) if not success: return jsonify({"error": "Failed to deactivate device"}), 500 # Invalidate cache cache_repo.invalidate_license_cache(license_id) # Publish event event_bus.publish(Event( EventTypes.DEVICE_DEACTIVATED, { "license_id": license_id, "hardware_id": data['hardware_id'], "reason": data.get('reason', 'Admin action') }, "admin-api" )) return jsonify({"success": True, "message": "Device deactivated"}) @app.route('/api/v1/admin/licenses//transfer', methods=['POST']) @require_admin_auth def transfer_license(license_id): """Transfer license between devices""" schema = TransferLicenseSchema() try: data = schema.load(request.get_json()) except ValidationError as e: return jsonify({"error": "Invalid request", "details": e.messages}), 400 # Get client IP ip_address = request.headers.get('X-Forwarded-For', request.remote_addr) success = license_repo.transfer_license( license_id, data['from_hardware_id'], data['to_hardware_id'], ip_address ) if not success: return jsonify({"error": "Failed to transfer license"}), 500 # Invalidate cache cache_repo.invalidate_license_cache(license_id) # Publish event event_bus.publish(Event( EventTypes.LICENSE_TRANSFERRED, { "license_id": license_id, "from_hardware_id": data['from_hardware_id'], "to_hardware_id": data['to_hardware_id'] }, "admin-api" )) return jsonify({"success": True, "message": "License transferred successfully"}) @app.route('/api/v1/admin/licenses', methods=['GET']) @require_admin_auth def search_licenses(): """Search and list licenses""" schema = SearchLicensesSchema() try: filters = schema.load(request.args) except ValidationError as e: return jsonify({"error": "Invalid request", "details": e.messages}), 400 # Build query where_clauses = [] params = [] if filters.get('customer_id'): where_clauses.append("customer_id = %s") params.append(filters['customer_id']) if 'is_active' in filters: where_clauses.append("is_active = %s") params.append(filters['is_active']) if 'is_test' in filters: where_clauses.append("is_test = %s") params.append(filters['is_test']) if filters.get('created_after'): where_clauses.append("created_at >= %s") params.append(filters['created_after']) if filters.get('created_before'): where_clauses.append("created_at <= %s") params.append(filters['created_before']) if filters.get('expires_after'): where_clauses.append("expires_at >= %s") params.append(filters['expires_after']) if filters.get('expires_before'): where_clauses.append("expires_at <= %s") params.append(filters['expires_before']) where_sql = " AND ".join(where_clauses) if where_clauses else "1=1" # Count total count_query = f"SELECT COUNT(*) as total FROM licenses WHERE {where_sql}" total_result = license_repo.execute_one(count_query, params) total = total_result['total'] if total_result else 0 # Get paginated results page = filters['page'] per_page = filters['per_page'] offset = (page - 1) * per_page query = f""" SELECT l.*, c.name as customer_name, c.email as customer_email FROM licenses l JOIN customers c ON l.customer_id = c.id WHERE {where_sql} ORDER BY l.created_at DESC LIMIT %s OFFSET %s """ params.extend([per_page, offset]) licenses = license_repo.execute_query(query, params) return jsonify({ "licenses": licenses, "pagination": { "total": total, "page": page, "per_page": per_page, "pages": (total + per_page - 1) // per_page } }) @app.route('/api/v1/admin/licenses//events', methods=['GET']) @require_admin_auth def get_license_events(license_id): """Get all events for a license""" hours = request.args.get('hours', 24, type=int) events = license_repo.get_recent_activations(license_id, hours) return jsonify({ "license_id": license_id, "hours": hours, "total_events": len(events), "events": events }) @app.route('/api/v1/admin/licenses//usage', methods=['GET']) @require_admin_auth def get_license_usage(license_id): """Get usage statistics for a license""" days = request.args.get('days', 30, type=int) stats = license_repo.get_license_usage_stats(license_id, days) # Get daily breakdown query = """ SELECT DATE(timestamp) as date, COUNT(*) as validations, COUNT(DISTINCT hardware_id) as unique_devices, COUNT(DISTINCT ip_address) as unique_ips FROM license_heartbeats WHERE license_id = %s AND timestamp > NOW() - INTERVAL '%s days' GROUP BY DATE(timestamp) ORDER BY date DESC """ daily_stats = license_repo.execute_query(query, (license_id, days)) return jsonify({ "license_id": license_id, "days": days, "summary": stats, "daily": daily_stats }) @app.route('/api/v1/admin/licenses//anomalies', methods=['GET']) @require_admin_auth def get_license_anomalies(license_id): """Get detected anomalies for a license""" query = """ SELECT * FROM anomaly_detections WHERE license_id = %s ORDER BY detected_at DESC LIMIT 100 """ anomalies = license_repo.execute_query(query, (license_id,)) return jsonify({ "license_id": license_id, "total_anomalies": len(anomalies), "anomalies": anomalies }) @app.route('/api/v1/admin/licenses//anomalies//resolve', methods=['POST']) @require_admin_auth def resolve_anomaly(license_id, anomaly_id): """Mark anomaly as resolved""" data = request.get_json() or {} action_taken = data.get('action_taken', 'Resolved by admin') query = """ UPDATE anomaly_detections SET resolved = true, resolved_at = NOW(), resolved_by = 'admin', action_taken = %s WHERE id = %s AND license_id = %s RETURNING id """ result = license_repo.execute_one(query, (action_taken, anomaly_id, license_id)) if not result: return jsonify({"error": "Anomaly not found"}), 404 return jsonify({"success": True, "message": "Anomaly resolved"}) @app.route('/api/v1/admin/licenses/bulk-create', methods=['POST']) @require_admin_auth def bulk_create_licenses(): """Create multiple licenses at once""" data = request.get_json() if not data or 'licenses' not in data: return jsonify({"error": "Missing licenses array"}), 400 schema = CreateLicenseSchema() created_licenses = [] errors = [] for idx, license_data in enumerate(data['licenses']): try: validated_data = schema.load(license_data) # Generate license key license_key = f"LIC-{secrets.token_urlsafe(16).upper()}" # Calculate expiration expires_at = None if validated_data.get('expires_in_days'): expires_at = datetime.utcnow() + timedelta(days=validated_data['expires_in_days']) # Create license query = """ INSERT INTO licenses (license_key, customer_id, max_devices, is_active, is_test, expires_at, features, metadata) VALUES (%s, %s, %s, true, %s, %s, %s, %s) RETURNING id """ import json license_id = license_repo.execute_insert(query, ( license_key, validated_data['customer_id'], validated_data['max_devices'], validated_data['is_test'], expires_at, json.dumps(validated_data['features']), json.dumps(validated_data['metadata']) )) if license_id: created_licenses.append({ "id": license_id, "license_key": license_key, "customer_id": validated_data['customer_id'] }) except Exception as e: errors.append({ "index": idx, "error": str(e) }) return jsonify({ "created": len(created_licenses), "failed": len(errors), "licenses": created_licenses, "errors": errors }), 201 if created_licenses else 400 @app.route('/api/v1/admin/statistics', methods=['GET']) @require_admin_auth def get_statistics(): """Get overall license statistics""" query = """ WITH stats AS ( SELECT COUNT(*) as total_licenses, COUNT(*) FILTER (WHERE is_active = true) as active_licenses, COUNT(*) FILTER (WHERE is_test = true) as test_licenses, COUNT(*) FILTER (WHERE expires_at < NOW()) as expired_licenses, COUNT(DISTINCT customer_id) as total_customers FROM licenses ), device_stats AS ( SELECT COUNT(DISTINCT hardware_id) as total_devices FROM license_heartbeats WHERE timestamp > NOW() - INTERVAL '15 minutes' ), validation_stats AS ( SELECT COUNT(*) as validations_today, COUNT(DISTINCT license_id) as licenses_used_today FROM license_heartbeats WHERE timestamp > CURRENT_DATE ) SELECT * FROM stats, device_stats, validation_stats """ stats = license_repo.execute_one(query) return jsonify(stats or {}) @app.errorhandler(404) def not_found(error): return jsonify({"error": "Not found"}), 404 @app.errorhandler(500) def internal_error(error): logger.error(f"Internal error: {error}") return jsonify({"error": "Internal server error"}), 500 if __name__ == '__main__': app.run(host='0.0.0.0', port=5004, debug=True)