"""Analytics service.

Flask application exposing endpoints for license usage patterns, anomaly
detection, risk scoring, usage reports and a dashboard summary.
"""

import os
import sys
import json
from flask import Flask, request, jsonify
from flask_cors import CORS
import logging
from functools import wraps
from datetime import datetime, timedelta, timezone
import asyncio
from concurrent.futures import ThreadPoolExecutor

# Add the directory two levels above this file's directory to the import path
# so the project-local imports below resolve.
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

from config import get_config
from repositories.license_repo import LicenseRepository
from repositories.cache_repo import CacheRepository
from events.event_bus import EventBus, Event, EventTypes
from models import AnomalyType, Severity

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize Flask app
app = Flask(__name__)
config = get_config()
app.config.from_object(config)
CORS(app)

# Initialize dependencies
license_repo = LicenseRepository(config.DATABASE_URL)
cache_repo = CacheRepository(config.REDIS_URL)
event_bus = EventBus(config.RABBITMQ_URL)

# Thread pool for async operations
executor = ThreadPoolExecutor(max_workers=10)

# Bounds applied to the ``days`` query parameter of the reporting endpoints.
DEFAULT_REPORT_DAYS = 30
MAX_REPORT_DAYS = 3650

# Contribution of one unresolved anomaly to the risk score, keyed by severity.
SEVERITY_WEIGHTS = {
    'low': 10,
    'medium': 25,
    'high': 50,
    'critical': 100,
}


def _utcnow():
    """Return the current UTC time as a naive ``datetime``.

    Equivalent to the deprecated ``datetime.utcnow()``; the result stays naive
    so that ``isoformat()`` output keeps its previous shape (no UTC offset).
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _days_since(moment):
    """Return the whole number of days elapsed since ``moment`` (never negative).

    Accepts naive datetimes (treated as UTC, as the original code did) as well
    as timezone-aware ones, which are converted to UTC first; mixing the two
    kinds in a subtraction would raise ``TypeError``. ``None`` counts as
    "just now".
    """
    if moment is None:
        return 0
    if moment.tzinfo is not None:
        moment = moment.astimezone(timezone.utc).replace(tzinfo=None)
    return max(0, (_utcnow() - moment).days)


def _get_days_param():
    """Read the ``days`` query parameter, clamped to 1..MAX_REPORT_DAYS.

    A missing or non-integer value falls back to DEFAULT_REPORT_DAYS.
    """
    requested = request.args.get('days', DEFAULT_REPORT_DAYS, type=int)
    return max(1, min(requested, MAX_REPORT_DAYS))


def require_auth(f):
    """Decorator to require authentication.

    Responds with 401 when the ``X-API-Key`` header is missing or does not
    start with ``sk_``; otherwise calls the wrapped view.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        api_key = request.headers.get('X-API-Key')
        if not api_key:
            return jsonify({"error": "Missing API key"}), 401

        # Simple validation for now.
        # NOTE(review): this only checks the key prefix, so any string starting
        # with 'sk_' is accepted; replace with a real key lookup.
        if not api_key.startswith('sk_'):
            return jsonify({"error": "Invalid API key"}), 401

        return f(*args, **kwargs)
    return decorated_function


@app.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint."""
    return jsonify({
        "status": "healthy",
        "service": "analytics",
        "timestamp": _utcnow().isoformat()
    })


@app.route('/api/v1/analytics/licenses/<license_id>/patterns', methods=['GET'])
@require_auth
def analyze_license_patterns(license_id):
    """Analyze usage patterns for a license.

    Aggregates heartbeats of the last ``days`` days (query parameter, default
    30) by day-of-week and hour-of-day, then runs spike detection on them.
    """
    days = _get_days_param()

    # Get usage data. The day count is bound as a plain value and multiplied
    # by a one-day interval instead of being substituted inside a quoted
    # literal.
    query = """
        WITH hourly_usage AS (
            SELECT
                DATE_TRUNC('hour', timestamp) as hour,
                COUNT(*) as validations,
                COUNT(DISTINCT hardware_id) as devices,
                COUNT(DISTINCT ip_address) as ips
            FROM license_heartbeats
            WHERE license_id = %s
            AND timestamp > NOW() - (%s * INTERVAL '1 day')
            GROUP BY DATE_TRUNC('hour', timestamp)
        ),
        daily_patterns AS (
            SELECT
                EXTRACT(DOW FROM hour) as day_of_week,
                EXTRACT(HOUR FROM hour) as hour_of_day,
                AVG(validations) as avg_validations,
                MAX(devices) as max_devices
            FROM hourly_usage
            GROUP BY day_of_week, hour_of_day
        )
        SELECT * FROM daily_patterns
        ORDER BY day_of_week, hour_of_day
    """

    patterns = license_repo.execute_query(query, (license_id, days))

    # Detect anomalies
    anomalies = detect_usage_anomalies(license_id, patterns)

    return jsonify({
        "license_id": license_id,
        "days_analyzed": days,
        "patterns": patterns,
        "anomalies": anomalies
    })


@app.route('/api/v1/analytics/licenses/<license_id>/anomalies/detect', methods=['POST'])
@require_auth
def detect_anomalies(license_id):
    """Manually trigger anomaly detection for a license.

    Runs every check, stores each finding and returns them all.
    """
    # Checks run in this order: multiple IPs, rapid hardware changes,
    # concurrent usage, geographic anomalies.
    checks = (
        check_multiple_ips,
        check_rapid_hardware_changes,
        check_concurrent_usage,
        check_geographic_anomalies,
    )

    anomalies = []
    for check in checks:
        anomalies.extend(check(license_id))

    # Store detected anomalies
    for anomaly in anomalies:
        store_anomaly(license_id, anomaly)

    return jsonify({
        "license_id": license_id,
        "anomalies_detected": len(anomalies),
        "anomalies": anomalies
    })


@app.route('/api/v1/analytics/licenses/<license_id>/risk-score', methods=['GET'])
@require_auth
def get_risk_score(license_id):
    """Calculate risk score for a license.

    Sums a severity weight for every unresolved anomaly of the last 30 days,
    scaled by a recency factor (1.0 for today, decreasing linearly, floored at
    0.5), and caps the total at 100.
    """
    # Get recent anomalies
    query = """
        SELECT
            anomaly_type,
            severity,
            detected_at
        FROM anomaly_detections
        WHERE license_id = %s
        AND detected_at > NOW() - INTERVAL '30 days'
        AND resolved = false
    """

    anomalies = license_repo.execute_query(query, (license_id,))

    # Calculate risk score
    risk_score = 0
    for anomaly in anomalies:
        weight = SEVERITY_WEIGHTS.get(anomaly['severity'], 0)
        # Recent anomalies have higher weight
        days_old = _days_since(anomaly['detected_at'])
        recency_factor = max(0.5, 1 - (days_old / 30))
        risk_score += weight * recency_factor

    # Normalize to 0-100
    risk_score = min(100, risk_score)

    # Determine risk level
    if risk_score < 20:
        risk_level = "low"
    elif risk_score < 50:
        risk_level = "medium"
    elif risk_score < 80:
        risk_level = "high"
    else:
        risk_level = "critical"

    return jsonify({
        "license_id": license_id,
        "risk_score": round(risk_score, 2),
        "risk_level": risk_level,
        "active_anomalies": len(anomalies),
        "factors": anomalies
    })


@app.route('/api/v1/analytics/reports/usage', methods=['GET'])
@require_auth
def generate_usage_report():
    """Generate usage report for all licenses.

    Covers active licenses and their heartbeats within the last ``days`` days
    (query parameter, default 30), plus summary statistics.
    """
    days = _get_days_param()

    query = """
        WITH license_stats AS (
            SELECT
                l.id,
                l.license_key,
                l.customer_id,
                c.name as customer_name,
                l.max_devices,
                l.is_test,
                l.expires_at,
                COUNT(DISTINCT lh.hardware_id) as active_devices,
                COUNT(lh.*) as total_validations,
                MAX(lh.timestamp) as last_validation
            FROM licenses l
            LEFT JOIN customers c ON l.customer_id = c.id
            LEFT JOIN license_heartbeats lh ON l.id = lh.license_id
                AND lh.timestamp > NOW() - (%s * INTERVAL '1 day')
            WHERE l.is_active = true
            GROUP BY l.id, l.license_key, l.customer_id, c.name, l.max_devices, l.is_test, l.expires_at
        )
        SELECT
            *,
            CASE
                WHEN total_validations = 0 THEN 'inactive'
                WHEN active_devices > max_devices THEN 'over_limit'
                WHEN expires_at < NOW() THEN 'expired'
                ELSE 'active'
            END as status,
            ROUND((active_devices::numeric / NULLIF(max_devices, 0)) * 100, 2) as device_utilization
        FROM license_stats
        ORDER BY total_validations DESC
    """

    report = license_repo.execute_query(query, (days,))

    def count_with_status(status):
        """Count report rows whose computed status equals ``status``."""
        return sum(1 for row in report if row['status'] == status)

    # device_utilization is NULL when max_devices is 0; count it as 0.
    if report:
        average_utilization = sum(row['device_utilization'] or 0 for row in report) / len(report)
    else:
        average_utilization = 0

    # Summary statistics
    summary = {
        "total_licenses": len(report),
        "active_licenses": count_with_status('active'),
        "inactive_licenses": count_with_status('inactive'),
        "over_limit_licenses": count_with_status('over_limit'),
        "expired_licenses": count_with_status('expired'),
        "total_validations": sum(row['total_validations'] for row in report),
        "average_device_utilization": average_utilization
    }

    return jsonify({
        "period_days": days,
        "generated_at": _utcnow().isoformat(),
        "summary": summary,
        "licenses": report
    })


@app.route('/api/v1/analytics/reports/revenue', methods=['GET'])
@require_auth
def generate_revenue_report():
    """Generate revenue analytics report."""
    # This would need pricing information in the database
    # For now, return a placeholder
    return jsonify({
        "message": "Revenue reporting requires pricing data integration",
        "placeholder": True
    })


def detect_usage_anomalies(license_id, patterns):
    """Detect anomalies in usage patterns.

    Flags every pattern row whose ``avg_validations`` is more than three times
    the mean of ``avg_validations`` over all rows.

    Args:
        license_id: Identifier of the license (not used by the detection).
        patterns: Rows with ``day_of_week``, ``hour_of_day`` and
            ``avg_validations`` keys.

    Returns:
        A list of anomaly dicts (``type``, ``severity``, ``details``); empty
        when ``patterns`` is empty.
    """
    if not patterns:
        return []

    # Calculate statistics
    avg_validations = sum(p['avg_validations'] for p in patterns) / len(patterns)

    # Detect spikes
    return [
        {
            "type": AnomalyType.SUSPICIOUS_PATTERN.value,
            "severity": Severity.MEDIUM.value,
            "details": {
                "day": pattern['day_of_week'],
                "hour": pattern['hour_of_day'],
                "validations": pattern['avg_validations'],
                "average": avg_validations
            }
        }
        for pattern in patterns
        if pattern['avg_validations'] > avg_validations * 3
    ]


def check_multiple_ips(license_id):
    """Check for multiple IP addresses.

    Reports an anomaly when the number of distinct IPs seen in the last hour
    exceeds ``config.ANOMALY_MULTIPLE_IPS_THRESHOLD``.
    """
    query = """
        SELECT
            COUNT(DISTINCT ip_address) as ip_count,
            array_agg(DISTINCT ip_address) as ips
        FROM license_heartbeats
        WHERE license_id = %s
        AND timestamp > NOW() - INTERVAL '1 hour'
    """

    result = license_repo.execute_one(query, (license_id,))
    threshold = config.ANOMALY_MULTIPLE_IPS_THRESHOLD

    if not result or result['ip_count'] <= threshold:
        return []

    return [{
        "type": AnomalyType.MULTIPLE_IPS.value,
        "severity": Severity.HIGH.value,
        "details": {
            "ip_count": result['ip_count'],
            "ips": result['ips'][:10],  # Limit to 10 IPs
            "threshold": threshold
        }
    }]


def check_rapid_hardware_changes(license_id):
    """Check for rapid hardware ID changes.

    Looks at successful activation/transfer events of the last hour, newest
    first, and reports each pair of consecutive events on *different*
    hardware IDs that lie less than
    ``config.ANOMALY_RAPID_HARDWARE_CHANGE_MINUTES`` minutes apart.
    """
    query = """
        SELECT
            hardware_id,
            created_at
        FROM activation_events
        WHERE license_id = %s
        AND event_type IN ('activation', 'transfer')
        AND created_at > NOW() - INTERVAL '1 hour'
        AND success = true
        ORDER BY created_at DESC
    """

    events = license_repo.execute_query(query, (license_id,))
    threshold_minutes = config.ANOMALY_RAPID_HARDWARE_CHANGE_MINUTES

    anomalies = []
    # Events are ordered newest first, so each pair is (later, earlier).
    for later, earlier in zip(events, events[1:]):
        # Repeated events for the same device are not a hardware change.
        if later['hardware_id'] == earlier['hardware_id']:
            continue

        time_diff = (later['created_at'] - earlier['created_at']).total_seconds() / 60
        if time_diff < threshold_minutes:
            anomalies.append({
                "type": AnomalyType.RAPID_HARDWARE_CHANGE.value,
                "severity": Severity.HIGH.value,
                "details": {
                    "hardware_ids": [later['hardware_id'], earlier['hardware_id']],
                    "time_difference_minutes": round(time_diff, 2),
                    "threshold_minutes": threshold_minutes
                }
            })

    return anomalies


def check_concurrent_usage(license_id):
    """Check for concurrent usage from different devices.

    Counts pairs of heartbeats from the last 15 minutes that come from
    different hardware IDs and are less than 300 seconds apart.
    """
    # The '<' comparison (instead of '!=') makes the self-join yield each
    # heartbeat pair once rather than once per ordering.
    query = """
        WITH concurrent_sessions AS (
            SELECT
                h1.hardware_id as hw1,
                h2.hardware_id as hw2,
                h1.timestamp as time1,
                h2.timestamp as time2
            FROM license_heartbeats h1
            JOIN license_heartbeats h2 ON h1.license_id = h2.license_id
            WHERE h1.license_id = %s
            AND h2.license_id = %s
            AND h1.hardware_id < h2.hardware_id
            AND h1.timestamp > NOW() - INTERVAL '15 minutes'
            AND h2.timestamp > NOW() - INTERVAL '15 minutes'
            AND ABS(EXTRACT(EPOCH FROM h1.timestamp - h2.timestamp)) < 300
        )
        SELECT COUNT(*) as concurrent_count
        FROM concurrent_sessions
    """

    result = license_repo.execute_one(query, (license_id, license_id))

    if not result or result['concurrent_count'] <= 0:
        return []

    return [{
        "type": AnomalyType.CONCURRENT_USE.value,
        "severity": Severity.CRITICAL.value,
        "details": {
            "concurrent_sessions": result['concurrent_count'],
            "timeframe_minutes": 5
        }
    }]


def check_geographic_anomalies(license_id):
    """Check for geographic anomalies (requires IP geolocation)."""
    # This would require IP geolocation service integration
    # For now, return empty list
    return []


def store_anomaly(license_id, anomaly):
    """Store detected anomaly in database and publish an event for it.

    Args:
        license_id: Identifier of the affected license.
        anomaly: Dict with ``type``, ``severity`` and JSON-serializable
            ``details``.
    """
    query = """
        INSERT INTO anomaly_detections
        (license_id, anomaly_type, severity, details)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (license_id, anomaly_type, details) DO NOTHING
    """

    license_repo.execute_insert(query, (
        license_id,
        anomaly['type'],
        anomaly['severity'],
        json.dumps(anomaly['details'])
    ))

    # Publish event.
    # NOTE(review): this runs even when the insert was skipped by
    # ON CONFLICT DO NOTHING — confirm whether repeat events are intended.
    event_bus.publish(Event(
        EventTypes.ANOMALY_DETECTED,
        {
            "license_id": license_id,
            "anomaly": anomaly
        },
        "analytics"
    ))


@app.route('/api/v1/analytics/dashboard', methods=['GET'])
@require_auth
def get_dashboard_data():
    """Get analytics dashboard data.

    Returns today's heartbeat totals, anomaly counts for the last 7 days and a
    per-day trend series, or an empty object when the query yields no row.
    """
    query = """
        WITH current_stats AS (
            SELECT
                COUNT(DISTINCT license_id) as active_licenses,
                COUNT(DISTINCT hardware_id) as active_devices,
                COUNT(*) as validations_today
            FROM license_heartbeats
            WHERE timestamp > CURRENT_DATE
        ),
        anomaly_stats AS (
            SELECT
                COUNT(*) as total_anomalies,
                COUNT(*) FILTER (WHERE severity = 'critical') as critical_anomalies,
                COUNT(*) FILTER (WHERE resolved = false) as unresolved_anomalies
            FROM anomaly_detections
            WHERE detected_at > CURRENT_DATE - INTERVAL '7 days'
        ),
        trend_data AS (
            SELECT
                DATE(timestamp) as date,
                COUNT(*) as validations,
                COUNT(DISTINCT license_id) as licenses,
                COUNT(DISTINCT hardware_id) as devices
            FROM license_heartbeats
            WHERE timestamp > CURRENT_DATE - INTERVAL '7 days'
            GROUP BY DATE(timestamp)
            ORDER BY date
        )
        SELECT
            cs.*,
            ans.*,
            (SELECT json_agg(td.*) FROM trend_data td) as trends
        FROM current_stats cs, anomaly_stats ans
    """

    dashboard_data = license_repo.execute_one(query)

    return jsonify(dashboard_data or {})


@app.errorhandler(404)
def not_found(error):
    """Return a JSON body for 404 responses."""
    return jsonify({"error": "Not found"}), 404


@app.errorhandler(500)
def internal_error(error):
    """Log the error and return a JSON body for 500 responses."""
    logger.error("Internal error: %s", error)
    return jsonify({"error": "Internal server error"}), 500


if __name__ == '__main__':
    # The interactive debugger allows arbitrary code execution, so it must be
    # opted into explicitly (FLASK_DEBUG=1) rather than always enabled on a
    # server that listens on all interfaces.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(host='0.0.0.0', port=5003, debug=debug_enabled)