from flask import Flask, jsonify, request
from flask_cors import CORS
from datetime import datetime
import os
import psycopg2
from psycopg2.extras import RealDictCursor
from psycopg2.pool import SimpleConnectionPool
import redis
import json
import logging
from functools import wraps
import jwt
import numpy as np
from prometheus_flask_exporter import PrometheusMetrics

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__)
CORS(app)

# Initialize Prometheus metrics
metrics = PrometheusMetrics(app)
metrics.info('analytics_service_info', 'Analytics Service Information', version='1.0.0')

# Configuration
DATABASE_URL = os.environ.get('DATABASE_URL', 'postgresql://postgres:postgres@postgres:5432/v2_adminpanel')
REDIS_URL = os.environ.get('REDIS_URL', 'redis://redis:6379/2')
JWT_SECRET = os.environ.get('JWT_SECRET', 'your-secret-key')
SERVICE_PORT = 5003

# Database connection pool
db_pool = SimpleConnectionPool(1, 20, DATABASE_URL)

# Redis client
redis_client = redis.from_url(REDIS_URL, decode_responses=True)


# Cache decorator: memoizes a function's JSON-serializable result in Redis.
def cache_result(ttl=300):
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            cache_key = f"analytics:{f.__name__}:{str(args)}:{str(kwargs)}"
            cached = redis_client.get(cache_key)
            if cached:
                return json.loads(cached)
            result = f(*args, **kwargs)
            redis_client.setex(cache_key, ttl, json.dumps(result, default=str))
            return result
        return wrapper
    return decorator


# JWT validation decorator
def require_auth(f):
    @wraps(f)
    def wrapper(*args, **kwargs):
        auth_header = request.headers.get('Authorization')
        if not auth_header or not auth_header.startswith('Bearer '):
            return jsonify({'error': 'Missing or invalid authorization header'}), 401
        token = auth_header.split(' ')[1]
        try:
            payload = jwt.decode(token, JWT_SECRET, algorithms=['HS256'])
            request.jwt_payload = payload
        except jwt.ExpiredSignatureError:
            return jsonify({'error': 'Token expired'}), 401
        except jwt.InvalidTokenError:
            return jsonify({'error': 'Invalid token'}), 401
        return f(*args, **kwargs)
    return wrapper


# Database query helper: borrows a pooled connection and always returns it.
def execute_query(query, params=None, fetchall=True):
    conn = db_pool.getconn()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(query, params)
            if fetchall:
                return cur.fetchall()
            return cur.fetchone()
    finally:
        db_pool.putconn(conn)
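# NOTE: the reporting queries below embed the day count via "INTERVAL '%s days'".
# psycopg2 performs client-side substitution even inside quoted literals and
# renders a Python int as a bare numeric literal, so the executed SQL reads
# e.g. INTERVAL '30 days'. This only works because the parameter is always an
# integer; "INTERVAL '1 day' * %s" would be the more defensive spelling.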
# Analytics calculations
class AnalyticsService:

    @staticmethod
    @cache_result(ttl=60)
    def get_usage_statistics(customer_id=None, days=30):
        """Get usage statistics for licenses"""
        base_query = """
            SELECT
                DATE(lh.timestamp) as date,
                COUNT(DISTINCT lh.license_id) as active_licenses,
                COUNT(DISTINCT lh.hardware_id) as active_devices,
                COUNT(*) as total_heartbeats,
                COUNT(DISTINCT lh.session_data->>'app_version') as app_versions
            FROM license_heartbeats lh
            JOIN licenses l ON l.id = lh.license_id
            WHERE lh.timestamp >= NOW() - INTERVAL '%s days'
        """
        params = [days]
        if customer_id:
            base_query += " AND l.customer_id = %s"
            params.append(customer_id)
        base_query += " GROUP BY DATE(lh.timestamp) ORDER BY date DESC"
        return execute_query(base_query, params)

    @staticmethod
    @cache_result(ttl=300)
    def get_performance_metrics(days=7):
        """Get system performance metrics"""
        # The LAG() window function must be computed in a CTE first: PostgreSQL
        # rejects window function calls inside aggregate arguments.
        query = """
            WITH intervals AS (
                SELECT
                    DATE_TRUNC('hour', timestamp) as hour,
                    license_id,
                    hardware_id,
                    EXTRACT(EPOCH FROM (timestamp - LAG(timestamp) OVER
                        (PARTITION BY license_id ORDER BY timestamp))) as heartbeat_interval
                FROM license_heartbeats
                WHERE timestamp >= NOW() - INTERVAL '%s days'
            )
            SELECT
                hour,
                AVG(heartbeat_interval) as avg_heartbeat_interval,
                COUNT(*) as validation_count,
                COUNT(DISTINCT license_id) as unique_licenses,
                COUNT(DISTINCT hardware_id) as unique_devices
            FROM intervals
            GROUP BY hour
            ORDER BY hour DESC
        """
        return execute_query(query, [days])

    @staticmethod
    @cache_result(ttl=120)
    def get_anomaly_statistics(days=30):
        """Get anomaly detection statistics"""
        query = """
            SELECT
                anomaly_type,
                severity,
                COUNT(*) as count,
                COUNT(CASE WHEN resolved = false THEN 1 END) as unresolved_count,
                AVG(CASE WHEN resolved = true
                    THEN EXTRACT(EPOCH FROM (resolved_at - detected_at))/3600
                END) as avg_resolution_hours
            FROM anomaly_detections
            WHERE detected_at >= NOW() - INTERVAL '%s days'
            GROUP BY anomaly_type, severity
            ORDER BY count DESC
        """
        return execute_query(query, [days])

    @staticmethod
    @cache_result(ttl=300)
    def get_license_distribution():
        """Get license distribution statistics"""
        query = """
            SELECT
                l.license_type,
                l.is_test,
                COUNT(*) as total_count,
                COUNT(CASE WHEN l.is_active = true THEN 1 END) as active_count,
                COUNT(CASE WHEN lh.timestamp >= NOW() - INTERVAL '1 hour' THEN 1 END) as recently_active,
                AVG(l.device_limit) as avg_device_limit
            FROM licenses l
            LEFT JOIN LATERAL (
                SELECT timestamp
                FROM license_heartbeats
                WHERE license_id = l.id
                ORDER BY timestamp DESC
                LIMIT 1
            ) lh ON true
            GROUP BY l.license_type, l.is_test
        """
        return execute_query(query)

    @staticmethod
    def get_revenue_impact(days=30):
        """Calculate revenue impact from license usage"""
        query = """
            WITH license_activity AS (
                SELECT
                    l.id,
                    l.customer_id,
                    l.license_type,
                    l.price,
                    COUNT(DISTINCT DATE(lh.timestamp)) as active_days,
                    COUNT(DISTINCT lh.hardware_id) as devices_used
                FROM licenses l
                LEFT JOIN license_heartbeats lh
                    ON l.id = lh.license_id
                    AND lh.timestamp >= NOW() - INTERVAL '%s days'
                WHERE l.is_test = false
                GROUP BY l.id, l.customer_id, l.license_type, l.price
            )
            SELECT
                license_type,
                COUNT(*) as total_licenses,
                SUM(price) as total_revenue,
                AVG(active_days) as avg_active_days,
                AVG(devices_used) as avg_devices_used,
                SUM(CASE WHEN active_days > 0 THEN price ELSE 0 END) as active_revenue,
                SUM(CASE WHEN active_days = 0 THEN price ELSE 0 END) as inactive_revenue
            FROM license_activity
            GROUP BY license_type
        """
        return execute_query(query, [days])

    @staticmethod
    @cache_result(ttl=600)
    def get_geographic_distribution():
        """Get geographic distribution of license usage"""
        query = """
            SELECT
                lh.ip_address::text,
                COUNT(DISTINCT lh.license_id) as license_count,
                COUNT(DISTINCT lh.hardware_id) as device_count,
                COUNT(*) as total_validations,
                MAX(lh.timestamp) as last_seen
            FROM license_heartbeats lh
            WHERE lh.timestamp >= NOW() - INTERVAL '24 hours'
                AND lh.ip_address IS NOT NULL
            GROUP BY lh.ip_address
            ORDER BY total_validations DESC
            LIMIT 100
        """
        return execute_query(query)

    @staticmethod
    def get_usage_patterns(license_id=None):
        """Analyze usage patterns for predictive analytics"""
        base_query = """
            WITH hourly_usage AS (
                SELECT
                    EXTRACT(HOUR FROM timestamp) as hour_of_day,
                    EXTRACT(DOW FROM timestamp) as day_of_week,
                    COUNT(*) as usage_count
                FROM license_heartbeats
                WHERE timestamp >= NOW() - INTERVAL '30 days'
        """
        params = []
        if license_id:
            base_query += " AND license_id = %s"
            params.append(license_id)
        base_query += """
                GROUP BY hour_of_day, day_of_week
            )
            SELECT
                hour_of_day,
                day_of_week,
                usage_count,
                AVG(usage_count) OVER (PARTITION BY hour_of_day) as avg_hourly_usage,
                AVG(usage_count) OVER (PARTITION BY day_of_week) as avg_daily_usage
            FROM hourly_usage
            ORDER BY day_of_week, hour_of_day
        """
        return execute_query(base_query, params)
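    # NOTE: get_revenue_impact, get_usage_patterns, and calculate_churn_risk
    # below hit the database on every call; they could take @cache_result like
    # the other methods if they become hot paths. Cache keys are built from
    # str() of the call arguments, so each distinct argument combination (e.g.
    # each (customer_id, days) pair) gets its own Redis entry.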
    @staticmethod
    def calculate_churn_risk():
        """Calculate churn risk based on usage patterns"""
        query = """
            WITH recent_activity AS (
                SELECT
                    l.id,
                    l.customer_id,
                    l.expires_at,
                    MAX(lh.timestamp) as last_activity,
                    COUNT(DISTINCT DATE(lh.timestamp)) as active_days_30d,
                    COUNT(DISTINCT DATE(lh.timestamp)) FILTER
                        (WHERE lh.timestamp >= NOW() - INTERVAL '7 days') as active_days_7d
                FROM licenses l
                LEFT JOIN license_heartbeats lh
                    ON l.id = lh.license_id
                    AND lh.timestamp >= NOW() - INTERVAL '30 days'
                WHERE l.is_test = false
                GROUP BY l.id, l.customer_id, l.expires_at
            )
            SELECT
                customer_id,
                COUNT(*) as total_licenses,
                AVG(EXTRACT(EPOCH FROM (NOW() - last_activity))/86400) as avg_days_since_activity,
                AVG(active_days_30d) as avg_active_days_30d,
                AVG(active_days_7d) as avg_active_days_7d,
                MIN(expires_at) as next_expiry,
                CASE
                    WHEN AVG(active_days_7d) = 0 AND AVG(active_days_30d) > 0 THEN 'high'
                    WHEN AVG(active_days_30d) < 5 THEN 'medium'
                    ELSE 'low'
                END as churn_risk
            FROM recent_activity
            GROUP BY customer_id
            HAVING COUNT(*) > 0
            ORDER BY churn_risk DESC, avg_days_since_activity DESC
        """
        return execute_query(query)


# API Routes
@app.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint"""
    return jsonify({
        'status': 'healthy',
        'service': 'analytics-service',
        'timestamp': datetime.utcnow().isoformat()
    })


@app.route('/api/v1/analytics/usage', methods=['GET'])
@require_auth
def get_usage_stats():
    """Get usage statistics"""
    customer_id = request.args.get('customer_id')
    days = int(request.args.get('days', 30))
    stats = AnalyticsService.get_usage_statistics(customer_id, days)
    return jsonify({
        'success': True,
        'data': stats,
        'period_days': days,
        'customer_id': customer_id
    })


@app.route('/api/v1/analytics/performance', methods=['GET'])
@require_auth
def get_performance():
    """Get performance metrics"""
    days = int(request.args.get('days', 7))
    # Named 'data' rather than 'metrics' to avoid shadowing the module-level
    # PrometheusMetrics instance.
    data = AnalyticsService.get_performance_metrics(days)
    return jsonify({
        'success': True,
        'data': data,
        'period_days': days
    })


@app.route('/api/v1/analytics/anomalies', methods=['GET'])
@require_auth
def get_anomalies():
    """Get anomaly statistics"""
    days = int(request.args.get('days', 30))
    anomalies = AnalyticsService.get_anomaly_statistics(days)
    return jsonify({
        'success': True,
        'data': anomalies,
        'period_days': days
    })


@app.route('/api/v1/analytics/distribution', methods=['GET'])
@require_auth
def get_distribution():
    """Get license distribution"""
    distribution = AnalyticsService.get_license_distribution()
    return jsonify({
        'success': True,
        'data': distribution
    })


@app.route('/api/v1/analytics/revenue', methods=['GET'])
@require_auth
def get_revenue():
    """Get revenue impact analysis"""
    days = int(request.args.get('days', 30))
    revenue = AnalyticsService.get_revenue_impact(days)
    return jsonify({
        'success': True,
        'data': revenue,
        'period_days': days
    })


@app.route('/api/v1/analytics/geographic', methods=['GET'])
@require_auth
def get_geographic():
    """Get geographic distribution"""
    geo_data = AnalyticsService.get_geographic_distribution()
    return jsonify({
        'success': True,
        'data': geo_data
    })


@app.route('/api/v1/analytics/patterns', methods=['GET'])
@require_auth
def get_patterns():
    """Get usage patterns"""
    license_id = request.args.get('license_id')
    patterns = AnalyticsService.get_usage_patterns(license_id)
    return jsonify({
        'success': True,
        'data': patterns,
        'license_id': license_id
    })


@app.route('/api/v1/analytics/churn-risk', methods=['GET'])
@require_auth
def get_churn_risk():
    """Get churn risk analysis"""
    churn_data = AnalyticsService.calculate_churn_risk()
    return jsonify({
        'success': True,
        'data': churn_data
    })
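# Every endpoint except /health requires a Bearer JWT signed with JWT_SECRET.
# A sketch of a typical request (the token value is a placeholder):
#   curl -H "Authorization: Bearer <token>" \
#        "http://localhost:5003/api/v1/analytics/usage?days=7"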
@app.route('/api/v1/analytics/summary/<customer_id>', methods=['GET'])
@require_auth
def get_customer_summary(customer_id):
    """Get comprehensive analytics summary for a customer"""
    usage = AnalyticsService.get_usage_statistics(customer_id, 30)

    # Calculate summary metrics
    total_heartbeats = sum(day['total_heartbeats'] for day in usage)
    active_days = len([day for day in usage if day['active_licenses'] > 0])

    return jsonify({
        'success': True,
        'customer_id': customer_id,
        'summary': {
            'total_heartbeats_30d': total_heartbeats,
            'active_days_30d': active_days,
            # Cast np.float64 to a plain float so jsonify can serialize it.
            'average_daily_devices': float(np.mean([day['active_devices'] for day in usage])) if usage else 0,
            'usage_trend': usage[:7] if len(usage) >= 7 else usage
        }
    })


# Real-time analytics endpoint (for websocket in future)
@app.route('/api/v1/analytics/realtime', methods=['GET'])
@require_auth
def get_realtime_stats():
    """Get real-time statistics for dashboard"""
    # Get stats from last 5 minutes
    query = """
        SELECT
            COUNT(DISTINCT license_id) as active_licenses,
            COUNT(DISTINCT hardware_id) as active_devices,
            COUNT(*) as validations_5min,
            COUNT(*) / 5.0 as validations_per_minute
        FROM license_heartbeats
        WHERE timestamp >= NOW() - INTERVAL '5 minutes'
    """
    realtime = execute_query(query, fetchall=False)

    # Get current anomalies
    anomaly_query = """
        SELECT COUNT(*) as unresolved_anomalies
        FROM anomaly_detections
        WHERE resolved = false
    """
    anomalies = execute_query(anomaly_query, fetchall=False)

    return jsonify({
        'success': True,
        'timestamp': datetime.utcnow().isoformat(),
        'data': {
            'active_licenses': realtime['active_licenses'] or 0,
            'active_devices': realtime['active_devices'] or 0,
            'validations_5min': realtime['validations_5min'] or 0,
            'validations_per_minute': float(realtime['validations_per_minute'] or 0),
            'unresolved_anomalies': anomalies['unresolved_anomalies'] or 0
        }
    })


if __name__ == '__main__':
    logger.info(f"Starting Analytics Service on port {SERVICE_PORT}")
    app.run(host='0.0.0.0', port=SERVICE_PORT,
            debug=os.environ.get('FLASK_ENV') == 'development')
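# The __main__ block uses Flask's development server. A minimal production
# invocation sketch, assuming this module is saved as analytics_service.py and
# gunicorn is installed (both assumptions, not part of this file):
#   gunicorn --bind 0.0.0.0:5003 --workers 4 analytics_service:app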