from flask import Flask, jsonify, request from flask_cors import CORS from datetime import datetime, timedelta import os import psycopg2 from psycopg2.extras import RealDictCursor from psycopg2.pool import SimpleConnectionPool import redis import json import logging from functools import wraps import jwt import uuid from typing import List, Dict, Optional import bcrypt from prometheus_flask_exporter import PrometheusMetrics logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = Flask(__name__) CORS(app) # Initialize Prometheus metrics metrics = PrometheusMetrics(app) metrics.info('admin_api_service_info', 'Admin API Service Information', version='1.0.0') # Configuration DATABASE_URL = os.environ.get('DATABASE_URL', 'postgresql://postgres:postgres@postgres:5432/v2_adminpanel') REDIS_URL = os.environ.get('REDIS_URL', 'redis://redis:6379/3') JWT_SECRET = os.environ.get('JWT_SECRET', 'your-secret-key') SERVICE_PORT = 5004 # Database connection pool db_pool = SimpleConnectionPool(1, 20, DATABASE_URL) # Redis client redis_client = redis.from_url(REDIS_URL, decode_responses=True) # JWT validation decorator with admin check def require_admin_auth(f): @wraps(f) def wrapper(*args, **kwargs): auth_header = request.headers.get('Authorization') if not auth_header or not auth_header.startswith('Bearer '): return jsonify({'error': 'Missing or invalid authorization header'}), 401 token = auth_header.split(' ')[1] try: payload = jwt.decode(token, JWT_SECRET, algorithms=['HS256']) # Check if user has admin privileges if payload.get('type') not in ['admin_access', 'analytics_access']: return jsonify({'error': 'Insufficient privileges'}), 403 request.jwt_payload = payload except jwt.ExpiredSignatureError: return jsonify({'error': 'Token expired'}), 401 except jwt.InvalidTokenError: return jsonify({'error': 'Invalid token'}), 401 return f(*args, **kwargs) return wrapper # Database query helpers def execute_query(query, params=None, fetchall=True): conn = db_pool.getconn() try: with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute(query, params) if query.strip().upper().startswith(('INSERT', 'UPDATE', 'DELETE')): conn.commit() return cur.rowcount if fetchall: return cur.fetchall() return cur.fetchone() finally: db_pool.putconn(conn) def execute_batch(query, data): conn = db_pool.getconn() try: with conn.cursor() as cur: cur.executemany(query, data) conn.commit() return cur.rowcount finally: db_pool.putconn(conn) # Audit logging def log_admin_action(action: str, entity_type: str, entity_id: str, details: Dict, user_id: str = None): """Log admin actions to audit trail""" query = """ INSERT INTO audit_log (username, action, timestamp, ip_address, additional_info) VALUES (%s, %s, %s, %s, %s) """ username = user_id or request.jwt_payload.get('sub', 'system') ip_address = request.headers.get('X-Real-IP', request.remote_addr) additional_info = json.dumps({ 'entity_type': entity_type, 'entity_id': entity_id, 'details': details }) execute_query(query, [username, action, datetime.utcnow(), ip_address, additional_info]) # API Routes @app.route('/health', methods=['GET']) def health_check(): """Health check endpoint""" return jsonify({ 'status': 'healthy', 'service': 'admin-api-service', 'timestamp': datetime.utcnow().isoformat() }) # License Management @app.route('/api/v1/admin/licenses', methods=['GET']) @require_admin_auth def list_licenses(): """List all licenses with filtering and pagination""" page = int(request.args.get('page', 1)) per_page = int(request.args.get('per_page', 50)) customer_id = request.args.get('customer_id') is_active = request.args.get('is_active') license_type = request.args.get('license_type') offset = (page - 1) * per_page # Build query with filters query = """ SELECT l.*, c.name as customer_name, c.email as customer_email, COUNT(DISTINCT lh.hardware_id) as active_devices, MAX(lh.timestamp) as last_activity FROM licenses l LEFT JOIN customers c ON l.customer_id = c.id LEFT JOIN license_heartbeats lh ON l.id = lh.license_id AND lh.timestamp > NOW() - INTERVAL '24 hours' WHERE 1=1 """ params = [] if customer_id: query += " AND l.customer_id = %s" params.append(customer_id) if is_active is not None: query += " AND l.is_active = %s" params.append(is_active == 'true') if license_type: query += " AND l.license_type = %s" params.append(license_type) query += """ GROUP BY l.id, c.name, c.email ORDER BY l.created_at DESC LIMIT %s OFFSET %s """ params.extend([per_page, offset]) licenses = execute_query(query, params) # Get total count count_query = "SELECT COUNT(*) as total FROM licenses WHERE 1=1" count_params = [] if customer_id: count_query += " AND customer_id = %s" count_params.append(customer_id) if is_active is not None: count_query += " AND is_active = %s" count_params.append(is_active == 'true') if license_type: count_query += " AND license_type = %s" count_params.append(license_type) total = execute_query(count_query, count_params, fetchall=False)['total'] return jsonify({ 'success': True, 'data': licenses, 'pagination': { 'page': page, 'per_page': per_page, 'total': total, 'pages': (total + per_page - 1) // per_page } }) @app.route('/api/v1/admin/licenses/', methods=['GET']) @require_admin_auth def get_license(license_id): """Get detailed license information""" query = """ SELECT l.*, c.name as customer_name, c.email as customer_email, array_agg(DISTINCT lh.hardware_id) as hardware_ids, COUNT(DISTINCT lh.hardware_id) as device_count, MIN(lh.timestamp) as first_activation, MAX(lh.timestamp) as last_activity FROM licenses l LEFT JOIN customers c ON l.customer_id = c.id LEFT JOIN license_heartbeats lh ON l.id = lh.license_id WHERE l.id = %s GROUP BY l.id, c.name, c.email """ license_data = execute_query(query, [license_id], fetchall=False) if not license_data: return jsonify({'error': 'License not found'}), 404 # Get recent activity activity_query = """ SELECT hardware_id, ip_address, timestamp, user_agent FROM license_heartbeats WHERE license_id = %s ORDER BY timestamp DESC LIMIT 20 """ recent_activity = execute_query(activity_query, [license_id]) license_data['recent_activity'] = recent_activity return jsonify({ 'success': True, 'data': license_data }) @app.route('/api/v1/admin/licenses', methods=['POST']) @require_admin_auth def create_license(): """Create a new license""" data = request.get_json() required_fields = ['customer_id', 'license_type', 'device_limit'] if not all(field in data for field in required_fields): return jsonify({'error': 'Missing required fields'}), 400 license_id = str(uuid.uuid4()) license_key = f"{data['license_type'].upper()}-{uuid.uuid4().hex[:8].upper()}-{uuid.uuid4().hex[:8].upper()}" query = """ INSERT INTO licenses (id, customer_id, license_key, license_type, device_limit, is_active, expires_at, created_at) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) RETURNING * """ expires_at = None if data.get('expires_at'): expires_at = datetime.fromisoformat(data['expires_at']) params = [ license_id, data['customer_id'], license_key, data['license_type'], data['device_limit'], data.get('is_active', True), expires_at, datetime.utcnow() ] new_license = execute_query(query, params, fetchall=False) log_admin_action('create_license', 'license', license_id, { 'license_key': license_key, 'customer_id': data['customer_id'], 'license_type': data['license_type'] }) return jsonify({ 'success': True, 'data': new_license }), 201 @app.route('/api/v1/admin/licenses/', methods=['PUT']) @require_admin_auth def update_license(license_id): """Update license information""" data = request.get_json() # Build dynamic update query update_fields = [] params = [] allowed_fields = ['is_active', 'device_limit', 'expires_at', 'notes'] for field in allowed_fields: if field in data: update_fields.append(f"{field} = %s") params.append(data[field]) if not update_fields: return jsonify({'error': 'No fields to update'}), 400 query = f""" UPDATE licenses SET {', '.join(update_fields)}, updated_at = %s WHERE id = %s RETURNING * """ params.extend([datetime.utcnow(), license_id]) updated_license = execute_query(query, params, fetchall=False) if not updated_license: return jsonify({'error': 'License not found'}), 404 log_admin_action('update_license', 'license', license_id, data) # Clear cache redis_client.delete(f"license:{license_id}") return jsonify({ 'success': True, 'data': updated_license }) @app.route('/api/v1/admin/licenses/', methods=['DELETE']) @require_admin_auth def delete_license(license_id): """Delete a license (soft delete by deactivating)""" query = """ UPDATE licenses SET is_active = false, updated_at = %s WHERE id = %s RETURNING * """ deleted_license = execute_query(query, [datetime.utcnow(), license_id], fetchall=False) if not deleted_license: return jsonify({'error': 'License not found'}), 404 log_admin_action('delete_license', 'license', license_id, { 'license_key': deleted_license['license_key'] }) # Clear cache redis_client.delete(f"license:{license_id}") return jsonify({ 'success': True, 'message': 'License deactivated successfully' }) # Batch Operations @app.route('/api/v1/admin/licenses/batch', methods=['POST']) @require_admin_auth def batch_create_licenses(): """Create multiple licenses at once""" data = request.get_json() if 'licenses' not in data or not isinstance(data['licenses'], list): return jsonify({'error': 'Invalid request format'}), 400 created_licenses = [] for license_data in data['licenses']: license_id = str(uuid.uuid4()) license_key = f"{license_data['license_type'].upper()}-{uuid.uuid4().hex[:8].upper()}-{uuid.uuid4().hex[:8].upper()}" query = """ INSERT INTO licenses (id, customer_id, license_key, license_type, device_limit, is_active, expires_at, created_at) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) RETURNING * """ params = [ license_id, license_data['customer_id'], license_key, license_data['license_type'], license_data.get('device_limit', 1), license_data.get('is_active', True), license_data.get('expires_at'), datetime.utcnow() ] new_license = execute_query(query, params, fetchall=False) created_licenses.append(new_license) log_admin_action('batch_create_licenses', 'license', None, { 'count': len(created_licenses), 'customer_ids': list(set(l['customer_id'] for l in created_licenses)) }) return jsonify({ 'success': True, 'data': created_licenses, 'count': len(created_licenses) }), 201 @app.route('/api/v1/admin/licenses/batch/activate', methods=['POST']) @require_admin_auth def batch_activate_licenses(): """Batch activate/deactivate licenses""" data = request.get_json() if 'license_ids' not in data or 'is_active' not in data: return jsonify({'error': 'Missing required fields'}), 400 query = """ UPDATE licenses SET is_active = %s, updated_at = %s WHERE id = ANY(%s) """ affected = execute_query( query, [data['is_active'], datetime.utcnow(), data['license_ids']] ) log_admin_action('batch_update_licenses', 'license', None, { 'action': 'activate' if data['is_active'] else 'deactivate', 'count': affected, 'license_ids': data['license_ids'] }) # Clear cache for all affected licenses for license_id in data['license_ids']: redis_client.delete(f"license:{license_id}") return jsonify({ 'success': True, 'affected': affected }) # Customer Management @app.route('/api/v1/admin/customers', methods=['GET']) @require_admin_auth def list_customers(): """List all customers with stats""" page = int(request.args.get('page', 1)) per_page = int(request.args.get('per_page', 50)) search = request.args.get('search') offset = (page - 1) * per_page query = """ SELECT c.*, COUNT(DISTINCT l.id) as license_count, COUNT(DISTINCT CASE WHEN l.is_active THEN l.id END) as active_licenses, MAX(lh.timestamp) as last_activity FROM customers c LEFT JOIN licenses l ON c.id = l.customer_id LEFT JOIN license_heartbeats lh ON l.id = lh.license_id AND lh.timestamp > NOW() - INTERVAL '30 days' """ params = [] if search: query += " WHERE c.name ILIKE %s OR c.email ILIKE %s" params.extend([f'%{search}%', f'%{search}%']) query += """ GROUP BY c.id ORDER BY c.created_at DESC LIMIT %s OFFSET %s """ params.extend([per_page, offset]) customers = execute_query(query, params) # Get total count count_query = "SELECT COUNT(*) as total FROM customers" if search: count_query += " WHERE name ILIKE %s OR email ILIKE %s" total = execute_query(count_query, [f'%{search}%', f'%{search}%'], fetchall=False)['total'] else: total = execute_query(count_query, fetchall=False)['total'] return jsonify({ 'success': True, 'data': customers, 'pagination': { 'page': page, 'per_page': per_page, 'total': total, 'pages': (total + per_page - 1) // per_page } }) # System Configuration @app.route('/api/v1/admin/config/feature-flags', methods=['GET']) @require_admin_auth def list_feature_flags(): """List all feature flags""" query = "SELECT * FROM feature_flags ORDER BY name" flags = execute_query(query) return jsonify({ 'success': True, 'data': flags }) @app.route('/api/v1/admin/config/feature-flags/', methods=['PUT']) @require_admin_auth def update_feature_flag(flag_id): """Update feature flag status""" data = request.get_json() if 'enabled' not in data: return jsonify({'error': 'Missing enabled field'}), 400 query = """ UPDATE feature_flags SET enabled = %s, updated_at = %s WHERE id = %s RETURNING * """ updated_flag = execute_query( query, [data['enabled'], datetime.utcnow(), flag_id], fetchall=False ) if not updated_flag: return jsonify({'error': 'Feature flag not found'}), 404 log_admin_action('update_feature_flag', 'feature_flag', flag_id, { 'name': updated_flag['name'], 'enabled': data['enabled'] }) # Clear feature flag cache redis_client.delete('feature_flags:all') return jsonify({ 'success': True, 'data': updated_flag }) # API Key Management @app.route('/api/v1/admin/api-keys', methods=['GET']) @require_admin_auth def list_api_keys(): """List all API keys""" query = """ SELECT ak.*, arl.requests_per_minute, arl.requests_per_hour FROM api_clients ak LEFT JOIN api_rate_limits arl ON ak.api_key = arl.api_key ORDER BY ak.created_at DESC """ api_keys = execute_query(query) return jsonify({ 'success': True, 'data': api_keys }) @app.route('/api/v1/admin/api-keys', methods=['POST']) @require_admin_auth def create_api_key(): """Create new API key""" data = request.get_json() if 'name' not in data: return jsonify({'error': 'Missing name field'}), 400 api_key = f"sk_{uuid.uuid4().hex}" # Create API client client_query = """ INSERT INTO api_clients (api_key, name, is_active, created_at) VALUES (%s, %s, %s, %s) RETURNING * """ new_client = execute_query( client_query, [api_key, data['name'], True, datetime.utcnow()], fetchall=False ) # Create rate limits rate_query = """ INSERT INTO api_rate_limits (api_key, requests_per_minute, requests_per_hour, requests_per_day) VALUES (%s, %s, %s, %s) """ execute_query( rate_query, [ api_key, data.get('requests_per_minute', 60), data.get('requests_per_hour', 1000), data.get('requests_per_day', 10000) ] ) log_admin_action('create_api_key', 'api_key', api_key, { 'name': data['name'] }) return jsonify({ 'success': True, 'data': new_client }), 201 # Audit Log @app.route('/api/v1/admin/audit-log', methods=['GET']) @require_admin_auth def get_audit_log(): """Get audit log entries""" page = int(request.args.get('page', 1)) per_page = int(request.args.get('per_page', 100)) action = request.args.get('action') username = request.args.get('username') start_date = request.args.get('start_date') end_date = request.args.get('end_date') offset = (page - 1) * per_page query = "SELECT * FROM audit_log WHERE 1=1" params = [] if action: query += " AND action = %s" params.append(action) if username: query += " AND username = %s" params.append(username) if start_date: query += " AND timestamp >= %s" params.append(datetime.fromisoformat(start_date)) if end_date: query += " AND timestamp <= %s" params.append(datetime.fromisoformat(end_date)) query += " ORDER BY timestamp DESC LIMIT %s OFFSET %s" params.extend([per_page, offset]) entries = execute_query(query, params) return jsonify({ 'success': True, 'data': entries }) # Device Management @app.route('/api/v1/admin/licenses//devices', methods=['GET']) @require_admin_auth def list_license_devices(license_id): """List all devices for a license""" query = """ SELECT DISTINCT hardware_id, MIN(timestamp) as first_seen, MAX(timestamp) as last_seen, COUNT(*) as total_heartbeats, array_agg(DISTINCT ip_address) as ip_addresses FROM license_heartbeats WHERE license_id = %s GROUP BY hardware_id ORDER BY last_seen DESC """ devices = execute_query(query, [license_id]) return jsonify({ 'success': True, 'data': devices }) @app.route('/api/v1/admin/licenses//devices/', methods=['DELETE']) @require_admin_auth def remove_device(license_id, hardware_id): """Remove a device from a license""" # Mark device as inactive in activation events query = """ INSERT INTO activation_events (id, license_id, event_type, hardware_id, success, created_at) VALUES (%s, %s, 'deactivation', %s, true, %s) """ execute_query( query, [str(uuid.uuid4()), license_id, hardware_id, datetime.utcnow()] ) log_admin_action('remove_device', 'license', license_id, { 'hardware_id': hardware_id }) return jsonify({ 'success': True, 'message': 'Device removed successfully' }) # System Stats @app.route('/api/v1/admin/stats/overview', methods=['GET']) @require_admin_auth def get_system_overview(): """Get system overview statistics""" stats = {} # License stats license_stats = execute_query(""" SELECT COUNT(*) as total_licenses, COUNT(CASE WHEN is_active THEN 1 END) as active_licenses, COUNT(CASE WHEN expires_at < NOW() THEN 1 END) as expired_licenses, COUNT(CASE WHEN is_test THEN 1 END) as test_licenses FROM licenses """, fetchall=False) stats['licenses'] = license_stats # Customer stats customer_stats = execute_query(""" SELECT COUNT(*) as total_customers, COUNT(CASE WHEN created_at > NOW() - INTERVAL '30 days' THEN 1 END) as new_customers FROM customers """, fetchall=False) stats['customers'] = customer_stats # Activity stats activity_stats = execute_query(""" SELECT COUNT(DISTINCT license_id) as active_licenses_24h, COUNT(DISTINCT hardware_id) as active_devices_24h, COUNT(*) as total_validations_24h FROM license_heartbeats WHERE timestamp > NOW() - INTERVAL '24 hours' """, fetchall=False) stats['activity'] = activity_stats # Anomaly stats anomaly_stats = execute_query(""" SELECT COUNT(*) as total_anomalies, COUNT(CASE WHEN resolved = false THEN 1 END) as unresolved_anomalies, COUNT(CASE WHEN severity = 'critical' AND resolved = false THEN 1 END) as critical_anomalies FROM anomaly_detections WHERE detected_at > NOW() - INTERVAL '7 days' """, fetchall=False) stats['anomalies'] = anomaly_stats return jsonify({ 'success': True, 'data': stats, 'timestamp': datetime.utcnow().isoformat() }) if __name__ == '__main__': logger.info(f"Starting Admin API Service on port {SERVICE_PORT}") app.run(host='0.0.0.0', port=SERVICE_PORT, debug=os.environ.get('FLASK_ENV') == 'development')