"""
|
|
Performance Monitor - Non-intrusive monitoring for race condition detection
|
|
Debug-only monitoring without production performance impact
|
|
"""

import time
import threading
import functools
import traceback
from typing import Dict, Any, Optional, Callable, List
from collections import defaultdict, deque
from datetime import datetime, timedelta
from dataclasses import dataclass, field
import logging
import json
import os

logger = logging.getLogger(__name__)


@dataclass
class OperationMetrics:
    """Metrics for a single monitored operation."""
    operation_name: str
    thread_id: int
    thread_name: str
    start_time: float
    end_time: Optional[float] = None
    duration: Optional[float] = None
    success: bool = True
    error_message: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)
    stack_trace: Optional[str] = None

    def complete(self, success: bool = True, error_message: Optional[str] = None):
        """Marks the operation as completed."""
        self.end_time = time.time()
        self.duration = self.end_time - self.start_time
        self.success = success
        self.error_message = error_message

    def to_dict(self) -> Dict[str, Any]:
        """Converts the metrics to a dictionary for serialization."""
        return {
            'operation_name': self.operation_name,
            'thread_id': self.thread_id,
            'thread_name': self.thread_name,
            'start_time': self.start_time,
            'end_time': self.end_time,
            'duration': self.duration,
            'success': self.success,
            'error_message': self.error_message,
            'metadata': self.metadata,
            'has_stack_trace': self.stack_trace is not None
        }
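
# Illustrative sketch of the OperationMetrics lifecycle (the values below are
# made up): a record is created when a monitored call starts, finalized via
# complete(), and to_dict() is the shape that lands in the exported report.
#
#     m = OperationMetrics(
#         operation_name="fingerprint_scan",
#         thread_id=threading.get_ident(),
#         thread_name=threading.current_thread().name,
#         start_time=time.time(),
#     )
#     m.complete(success=True)
#     m.to_dict()   # -> {'operation_name': 'fingerprint_scan', 'success': True, ...}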


class PerformanceMonitor:
    """
    Performance monitor with race condition detection.
    """

    def __init__(self, enabled: Optional[bool] = None, max_history: int = 1000):
        # Auto-detect based on debug settings or environment
        if enabled is None:
            enabled = (
                os.getenv('DEBUG_RACE_CONDITIONS', '').lower() in ['true', '1', 'yes'] or
                os.getenv('PERFORMANCE_MONITORING', '').lower() in ['true', '1', 'yes']
            )

        self.enabled = enabled
        self.max_history = max_history

        # Monitoring data
        self._operation_history: deque = deque(maxlen=max_history)
        self._active_operations: Dict[str, OperationMetrics] = {}
        self._operation_stats: Dict[str, Dict[str, Any]] = defaultdict(lambda: {
            'total_calls': 0,
            'successful_calls': 0,
            'failed_calls': 0,
            'total_duration': 0.0,
            'min_duration': float('inf'),
            'max_duration': 0.0,
            'concurrent_executions': 0,
            'max_concurrent': 0
        })

        # Thread safety
        self._lock = threading.RLock()

        # Race condition detection
        self._potential_races: List[Dict[str, Any]] = []
        self._long_operations: List[Dict[str, Any]] = []

        # Thresholds
        self.long_operation_threshold = 2.0  # seconds
        self.race_detection_window = 0.1  # seconds

        if self.enabled:
            logger.info("Performance monitoring enabled")

    def monitor_operation(self, operation_name: Optional[str] = None, capture_stack: bool = False):
        """
        Decorator for operation monitoring.

        Falls back to the wrapped function's name when no operation_name is given.
        """
        def decorator(func: Callable) -> Callable:
            if not self.enabled:
                return func  # No overhead when disabled

            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                return self._execute_monitored(
                    operation_name or func.__name__,
                    func,
                    capture_stack,
                    *args,
                    **kwargs
                )

            wrapper.original = func
            wrapper.is_monitored = True
            return wrapper

        return decorator

    def _execute_monitored(self, operation_name: str, func: Callable,
                           capture_stack: bool, *args, **kwargs) -> Any:
        """Executes a monitored operation."""
        if not self.enabled:
            return func(*args, **kwargs)

        thread_id = threading.current_thread().ident
        thread_name = threading.current_thread().name
        operation_key = f"{operation_name}_{thread_id}_{time.time()}"

        # Create the metrics object
        metrics = OperationMetrics(
            operation_name=operation_name,
            thread_id=thread_id,
            thread_name=thread_name,
            start_time=time.time(),
            stack_trace=''.join(traceback.format_stack()) if capture_stack else None
        )

        # Race condition detection
        self._detect_potential_race(operation_name, metrics.start_time)

        with self._lock:
            # Concurrent execution tracking
            concurrent_count = sum(
                1 for op in self._active_operations.values()
                if op.operation_name == operation_name
            )

            stats = self._operation_stats[operation_name]
            stats['concurrent_executions'] = concurrent_count
            stats['max_concurrent'] = max(stats['max_concurrent'], concurrent_count)

            # Add the operation to the active set
            self._active_operations[operation_key] = metrics

        try:
            # Execute the operation
            result = func(*args, **kwargs)

            # Mark success
            metrics.complete(success=True)

            return result

        except Exception as e:
            # Mark failure
            metrics.complete(success=False, error_message=str(e))
            raise

        finally:
            # Cleanup and statistics update
            with self._lock:
                self._active_operations.pop(operation_key, None)
                self._update_statistics(metrics)
                self._operation_history.append(metrics)

            # Long operation detection
            if metrics.duration and metrics.duration > self.long_operation_threshold:
                self._record_long_operation(metrics)

    def _detect_potential_race(self, operation_name: str, start_time: float):
        """Detects potential race conditions."""
        if not self.enabled:
            return

        # Check whether operations with the same name are already running
        # within the detection window
        concurrent_ops = []
        with self._lock:
            for op in self._active_operations.values():
                if (op.operation_name == operation_name and
                        abs(op.start_time - start_time) < self.race_detection_window):
                    concurrent_ops.append(op)

        if concurrent_ops:
            race_info = {
                'operation_name': operation_name,
                'detected_at': start_time,
                'concurrent_threads': [op.thread_id for op in concurrent_ops],
                'time_window': self.race_detection_window,
                'severity': 'high' if len(concurrent_ops) > 2 else 'medium'
            }

            self._potential_races.append(race_info)

            logger.warning(f"Potential race condition detected: {operation_name} "
                           f"already running on {len(concurrent_ops)} other thread(s)")

    def _record_long_operation(self, metrics: OperationMetrics):
        """Records long-running operations."""
        long_op_info = {
            'operation_name': metrics.operation_name,
            'duration': metrics.duration,
            'thread_id': metrics.thread_id,
            'start_time': metrics.start_time,
            'success': metrics.success,
            'metadata': metrics.metadata
        }

        self._long_operations.append(long_op_info)

        logger.warning(f"Long operation detected: {metrics.operation_name} "
                       f"took {metrics.duration:.3f}s (threshold: {self.long_operation_threshold}s)")

    def _update_statistics(self, metrics: OperationMetrics):
        """Updates per-operation statistics."""
        stats = self._operation_stats[metrics.operation_name]

        stats['total_calls'] += 1
        if metrics.success:
            stats['successful_calls'] += 1
        else:
            stats['failed_calls'] += 1

        if metrics.duration:
            stats['total_duration'] += metrics.duration
            stats['min_duration'] = min(stats['min_duration'], metrics.duration)
            stats['max_duration'] = max(stats['max_duration'], metrics.duration)

    def get_statistics(self) -> Dict[str, Any]:
        """Returns the complete monitoring statistics."""
        if not self.enabled:
            return {'monitoring_enabled': False}

        with self._lock:
            # Post-process the raw statistics
            processed_stats = {}
            for op_name, stats in self._operation_stats.items():
                processed_stats[op_name] = {
                    **stats,
                    'average_duration': (
                        stats['total_duration'] / stats['total_calls']
                        if stats['total_calls'] > 0 else 0
                    ),
                    'success_rate': (
                        stats['successful_calls'] / stats['total_calls']
                        if stats['total_calls'] > 0 else 0
                    ),
                    'min_duration': stats['min_duration'] if stats['min_duration'] != float('inf') else 0
                }

            return {
                'monitoring_enabled': True,
                'operation_statistics': processed_stats,
                'race_conditions': {
                    'detected_count': len(self._potential_races),
                    'recent_races': self._potential_races[-10:],  # Last 10
                },
                'long_operations': {
                    'detected_count': len(self._long_operations),
                    'threshold': self.long_operation_threshold,
                    'recent_long_ops': self._long_operations[-10:],  # Last 10
                },
                'active_operations': len(self._active_operations),
                'history_size': len(self._operation_history),
                'thresholds': {
                    'long_operation_threshold': self.long_operation_threshold,
                    'race_detection_window': self.race_detection_window
                }
            }

    def get_race_condition_report(self) -> Dict[str, Any]:
        """Returns a detailed race condition report."""
        if not self.enabled:
            return {'monitoring_enabled': False}

        with self._lock:
            # Group race conditions by operation
            races_by_operation = defaultdict(list)
            for race in self._potential_races:
                races_by_operation[race['operation_name']].append(race)

            # Analyze patterns
            analysis = {}
            for op_name, races in races_by_operation.items():
                high_severity = sum(1 for r in races if r['severity'] == 'high')
                analysis[op_name] = {
                    'total_races': len(races),
                    'high_severity_races': high_severity,
                    'affected_threads': len(set(
                        thread_id for race in races
                        for thread_id in race['concurrent_threads']
                    )),
                    'first_detected': min(r['detected_at'] for r in races),
                    'last_detected': max(r['detected_at'] for r in races),
                    'recommendation': self._get_race_recommendation(op_name, races)
                }

            return {
                'monitoring_enabled': True,
                'total_race_conditions': len(self._potential_races),
                'affected_operations': len(races_by_operation),
                'analysis_by_operation': analysis,
                'raw_detections': self._potential_races
            }

    def _get_race_recommendation(self, operation_name: str, races: List[Dict]) -> str:
        """Returns a recommendation for fixing the detected race conditions."""
        race_count = len(races)
        high_severity_count = sum(1 for r in races if r['severity'] == 'high')

        if high_severity_count > 5:
            return (f"CRITICAL: {operation_name} has {high_severity_count} high-severity race conditions. "
                    f"Implement ThreadSafetyMixin immediately.")
        elif race_count > 10:
            return (f"HIGH: {operation_name} frequently encounters race conditions. "
                    f"Consider adding thread synchronization.")
        elif race_count > 3:
            return (f"MEDIUM: {operation_name} occasionally has race conditions. "
                    f"Monitor and consider thread safety measures.")
        else:
            return f"LOW: {operation_name} has minimal race condition risk."

    def export_report(self, filename: Optional[str] = None) -> str:
        """Exports the full report as JSON; returns the filename, or an error message on failure."""
        if not self.enabled:
            return "Monitoring not enabled"

        if filename is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"performance_report_{timestamp}.json"

        # Snapshot the history under the lock so a concurrent append cannot
        # invalidate the iteration
        with self._lock:
            history_snapshot = [op.to_dict() for op in list(self._operation_history)[-100:]]  # Last 100

        report = {
            'timestamp': datetime.now().isoformat(),
            'statistics': self.get_statistics(),
            'race_condition_report': self.get_race_condition_report(),
            'operation_history': history_snapshot
        }

        try:
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(report, f, indent=2, ensure_ascii=False)

            logger.info(f"Performance report exported to: {filename}")
            return filename

        except Exception as e:
            logger.error(f"Failed to export performance report: {e}")
            return f"Export failed: {e}"

    def reset_statistics(self):
        """Resets all statistics."""
        with self._lock:
            self._operation_history.clear()
            self._operation_stats.clear()
            self._potential_races.clear()
            self._long_operations.clear()
            # Do not clear active operations - they may still be running

        if self.enabled:
            logger.info("Performance monitoring statistics reset")


# Global monitor instance
_global_monitor: Optional[PerformanceMonitor] = None
_monitor_init_lock = threading.RLock()


def get_performance_monitor() -> PerformanceMonitor:
    """Returns the global monitor instance (singleton)."""
    global _global_monitor

    if _global_monitor is None:
        with _monitor_init_lock:
            if _global_monitor is None:
                _global_monitor = PerformanceMonitor()

    return _global_monitor


# Convenience decorators
def monitor_if_enabled(operation_name: Optional[str] = None, capture_stack: bool = False):
    """Convenience decorator for conditional monitoring."""
    monitor = get_performance_monitor()
    return monitor.monitor_operation(operation_name, capture_stack)


def monitor_race_conditions(operation_name: Optional[str] = None):
    """Specifically for race condition detection (captures stack traces)."""
    return monitor_if_enabled(operation_name, capture_stack=True)


def monitor_fingerprint_operations(operation_name: Optional[str] = None):
    """Specifically for fingerprint operations."""
    # Fall back to the wrapped function's name instead of "fingerprint_None"
    # when no explicit name is given
    name = f"fingerprint_{operation_name}" if operation_name else None
    return monitor_if_enabled(name, capture_stack=False)


def monitor_session_operations(operation_name: Optional[str] = None):
    """Specifically for session operations."""
    name = f"session_{operation_name}" if operation_name else None
    return monitor_if_enabled(name, capture_stack=False)