Files
AccountForger-neuerUpload/application/use_cases/analyze_failure_rate_use_case.py
Claude Project Manager 04585e95b6 Initial commit
2025-08-01 23:50:28 +02:00

352 Zeilen
14 KiB
Python

"""
Analyze Failure Rate Use Case - Analysiert Fehlerquoten und Muster
"""
import logging
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
from collections import defaultdict, Counter
from domain.services.analytics_service import IAnalyticsService
from domain.value_objects.error_summary import ErrorSummary
from domain.entities.error_event import ErrorType
# Module-level logger shared by the use case below; it is registered under a
# fixed string name rather than __name__.
logger = logging.getLogger("analyze_failure_rate_use_case")
class AnalyzeFailureRateUseCase:
    """
    Use case for failure analysis.

    Combines the success rate, the most common errors and the failure
    patterns reported by the analytics service into a single report with a
    severity level, a per-error breakdown, temporal patterns, error clusters,
    critical issues and recommendations.
    """

    # Substrings searched (case-insensitively) in error URLs to attribute an
    # error to a platform. Earlier entries win when a URL matches several.
    _URL_PLATFORM_MARKERS = ('instagram', 'tiktok')

    def __init__(self, analytics_service: "IAnalyticsService"):
        """
        Store the analytics service and set up the analysis configuration.

        Args:
            analytics_service: Service queried for success rates, common
                errors, failure patterns and platform comparisons.
        """
        self.analytics_service = analytics_service
        # Error types that are reported as critical once they occur frequently.
        self.critical_error_types = [
            ErrorType.RATE_LIMIT,
            ErrorType.CAPTCHA,
            ErrorType.AUTHENTICATION
        ]
        # Failure-rate thresholds (fractions of 1.0) used by _calculate_severity.
        self.error_thresholds = {
            'critical': 0.5,     # 50% failure rate is critical
            'warning': 0.3,      # 30% failure rate is a warning
            'acceptable': 0.1    # 10% failure rate is acceptable
        }

    def execute(self,
                platform: Optional[str] = None,
                timeframe: timedelta = timedelta(hours=24)) -> Dict[str, Any]:
        """
        Analyze failure rates and patterns.

        Only the success rate is queried per platform; the common errors and
        failure patterns are requested for the timeframe alone.

        Args:
            platform: Specific platform, or None for all platforms.
            timeframe: Time window covered by the analysis.

        Returns:
            Analysis result with metrics, breakdowns and recommendations.
        """
        # Base metrics
        success_rate = self.analytics_service.get_success_rate(timeframe, platform)
        failure_rate = 1.0 - success_rate

        # Most common errors (top 20) and failure patterns
        common_errors = self.analytics_service.get_common_errors(20, timeframe)
        patterns = self.analytics_service.analyze_failure_patterns(timeframe)

        analysis = {
            'timeframe': str(timeframe),
            'platform': platform or 'all',
            'metrics': {
                'overall_failure_rate': failure_rate,
                'overall_success_rate': success_rate,
                'severity': self._calculate_severity(failure_rate)
            },
            'error_breakdown': self._analyze_error_types(common_errors),
            'temporal_patterns': self._analyze_temporal_patterns(patterns),
            'error_clusters': self._cluster_errors(common_errors),
            'critical_issues': self._identify_critical_issues(common_errors, failure_rate),
            'recommendations': self._generate_recommendations(
                failure_rate, common_errors, patterns
            )
        }

        # Log the key findings
        self._log_insights(analysis)
        return analysis

    def _calculate_severity(self, failure_rate: float) -> str:
        """
        Map a failure rate to a severity label using self.error_thresholds.

        Returns:
            'critical', 'warning', 'moderate' or 'low'.
        """
        if failure_rate >= self.error_thresholds['critical']:
            return 'critical'
        if failure_rate >= self.error_thresholds['warning']:
            return 'warning'
        if failure_rate >= self.error_thresholds['acceptable']:
            return 'moderate'
        return 'low'

    def _analyze_error_types(self, errors: List["ErrorSummary"]) -> List[Dict[str, Any]]:
        """
        Build a detailed breakdown for the first ten error summaries.

        Args:
            errors: Error summaries, in the order the service returned them.

        Returns:
            One dict per error with counts, impact, common contexts and trend.
        """
        return [
            {
                'error_type': error.error_type,
                'count': error.error_count,
                'frequency_per_hour': error.frequency,
                'recovery_rate': error.recovery_success_rate,
                'severity_score': error.severity_score,
                'impact': {
                    'user_impact': error.total_user_impact,
                    'system_impact': error.total_system_impact,
                    'data_loss': error.data_loss_incidents
                },
                # Only the three most common entries of each context kind
                'common_contexts': {
                    'urls': error.most_common_urls[:3],
                    'actions': error.most_common_actions[:3],
                    'steps': error.most_common_steps[:3]
                },
                'trend': self._calculate_error_trend(error)
            }
            for error in errors[:10]
        ]

    def _calculate_error_trend(self, error: "ErrorSummary") -> str:
        """
        Derive a trend label for one error type.

        Simplified: the label is based on the current frequency only, not on
        a comparison with earlier periods.

        Returns:
            'increasing' (frequency > 10), 'stable' (> 5) or 'decreasing'.
        """
        if error.frequency > 10:
            return 'increasing'
        if error.frequency > 5:
            return 'stable'
        return 'decreasing'

    def _analyze_temporal_patterns(self, patterns: Dict[str, Any]) -> Dict[str, Any]:
        """
        Extract peak and low error hours from the failure patterns.

        Args:
            patterns: Failure patterns; only the optional
                'hourly_distribution' mapping (hour -> error count) is read.

        Returns:
            Dict with 'peak_error_hours' (up to three hours with the most
            errors), 'low_error_hours' (up to three hours with the fewest
            errors, never repeating a peak hour) and the placeholder fields
            'daily_pattern' / 'weekly_pattern'.
        """
        temporal = {
            'peak_error_hours': [],
            'low_error_hours': [],
            'daily_pattern': 'unknown',
            'weekly_pattern': 'unknown'
        }
        # TODO: implement with real timeline data
        hourly = patterns.get('hourly_distribution') if patterns else None
        if hourly:
            ranked = sorted(hourly.items(), key=lambda item: item[1], reverse=True)
            temporal['peak_error_hours'] = [hour for hour, _ in ranked[:3]]
            # Take the low hours only from entries not already reported as
            # peaks, so a short distribution never lists an hour as both.
            temporal['low_error_hours'] = [hour for hour, _ in ranked[3:][-3:]]
        return temporal

    def _cluster_errors(self, errors: List["ErrorSummary"]) -> List[Dict[str, Any]]:
        """
        Group errors whose type shares the same prefix before the first '_'.

        Groups with a single member are not reported.

        Returns:
            Cluster dicts sorted by total occurrences, highest first.
        """
        type_clusters = defaultdict(list)
        for error in errors:
            # partition() yields the whole string when there is no underscore
            base_type = error.error_type.partition('_')[0]
            type_clusters[base_type].append(error)

        clusters = []
        for cluster_name, cluster_errors in type_clusters.items():
            if len(cluster_errors) < 2:
                continue
            total_count = sum(e.error_count for e in cluster_errors)
            avg_recovery = (
                sum(e.recovery_success_rate for e in cluster_errors)
                / len(cluster_errors)
            )
            clusters.append({
                'cluster_name': cluster_name,
                'error_count': len(cluster_errors),
                'total_occurrences': total_count,
                'avg_recovery_rate': avg_recovery,
                'members': [e.error_type for e in cluster_errors]
            })
        return sorted(clusters, key=lambda c: c['total_occurrences'], reverse=True)

    def _identify_critical_issues(self,
                                  errors: List["ErrorSummary"],
                                  overall_failure_rate: float) -> List[Dict[str, Any]]:
        """
        Identify critical issues.

        Reports a high overall failure rate, critical error types occurring
        more than five times per hour, and error types whose recovery rate is
        below 20%.

        Returns:
            Issue dicts with 'issue', 'severity', 'description' and
            'recommendation'.
        """
        critical_issues = []

        # High overall failure rate
        if overall_failure_rate >= self.error_thresholds['critical']:
            critical_issues.append({
                'issue': 'high_overall_failure_rate',
                'severity': 'critical',
                'description': f'Fehlerrate von {overall_failure_rate:.1%} überschreitet kritischen Schwellenwert',
                'recommendation': 'Sofortige Untersuchung und Maßnahmen erforderlich'
            })

        # Critical error types
        for error in errors:
            try:
                error_type = ErrorType(error.error_type)
            except ValueError:
                # Not a known ErrorType value; treat it as unknown.
                error_type = ErrorType.UNKNOWN
            # More than 5 occurrences per hour counts as high frequency
            if error_type in self.critical_error_types and error.frequency > 5:
                critical_issues.append({
                    'issue': f'high_frequency_{error.error_type}',
                    'severity': 'critical',
                    'description': f'{error.error_type} tritt {error.frequency:.1f} mal pro Stunde auf',
                    'recommendation': self._get_error_specific_recommendation(error_type)
                })

        # Low recovery rate
        low_recovery = [e for e in errors if e.recovery_success_rate < 0.2]
        if low_recovery:
            critical_issues.append({
                'issue': 'low_recovery_rate',
                'severity': 'warning',
                'description': f'{len(low_recovery)} Fehlertypen haben Recovery-Rate < 20%',
                'recommendation': 'Recovery-Strategien überprüfen und verbessern'
            })
        return critical_issues

    def _get_error_specific_recommendation(self, error_type: "ErrorType") -> str:
        """Return the recommendation text for an error type, or a generic fallback."""
        recommendations = {
            ErrorType.RATE_LIMIT: 'Rate Limiting Parameter erhöhen und Delays anpassen',
            ErrorType.CAPTCHA: 'CAPTCHA-Solving-Service prüfen oder manuelle Intervention',
            ErrorType.AUTHENTICATION: 'Credentials und Session-Management überprüfen',
            ErrorType.NETWORK: 'Netzwerk-Stabilität und Proxy-Konfiguration prüfen',
            ErrorType.TIMEOUT: 'Timeouts erhöhen und Performance optimieren'
        }
        return recommendations.get(error_type, 'Detaillierte Fehleranalyse durchführen')

    def _count_errors_by_platform(self, errors: List["ErrorSummary"]) -> Dict[str, int]:
        """
        Sum error counts per platform, based on the errors' most common URLs.

        Each error contributes its error_count at most once per platform, no
        matter how many of its URLs point to that platform.

        Returns:
            Mapping platform name -> summed error count, in first-seen order.
        """
        totals: Dict[str, int] = defaultdict(int)
        for error in errors:
            matched = []
            for url in error.most_common_urls:
                lowered = url.lower()
                platform = next(
                    (marker for marker in self._URL_PLATFORM_MARKERS
                     if marker in lowered),
                    None
                )
                if platform is not None and platform not in matched:
                    matched.append(platform)
            for platform in matched:
                totals[platform] += error.error_count
        return dict(totals)

    def _generate_recommendations(self,
                                  failure_rate: float,
                                  errors: List["ErrorSummary"],
                                  patterns: Dict[str, Any]) -> List[str]:
        """
        Generate concrete recommendations for action.

        Args:
            failure_rate: Overall failure rate as a fraction of 1.0.
            errors: Error summaries; the first entry is treated as top error.
            patterns: Failure patterns; only the optional 'peak_hours' entry
                is read.

        Returns:
            Recommendation texts, in the order the checks are made.
        """
        recommendations = []

        # Base recommendations by failure rate
        severity = self._calculate_severity(failure_rate)
        if severity == 'critical':
            recommendations.append(
                "🚨 KRITISCH: Sofortige Intervention erforderlich - "
                "Pausieren Sie neue Account-Erstellungen bis Issues gelöst sind"
            )
        elif severity == 'warning':
            recommendations.append(
                "⚠️ WARNUNG: Erhöhte Fehlerrate - "
                "Reduzieren Sie Geschwindigkeit und überwachen Sie genau"
            )

        # Specific recommendations based on the top error
        if errors:
            top_error = errors[0]
            if top_error.error_type == ErrorType.RATE_LIMIT.value:
                recommendations.append(
                    "📊 Rate Limiting ist Hauptproblem - "
                    "Erhöhen Sie Delays zwischen Aktionen um 50%"
                )
            elif top_error.error_type == ErrorType.CAPTCHA.value:
                recommendations.append(
                    "🔐 CAPTCHA-Challenges häufig - "
                    "Prüfen Sie Fingerprinting und Session-Qualität"
                )

        # Time-based recommendations; skipped when no peak hours are known
        peak_hours = patterns.get('peak_hours') if patterns else None
        if peak_hours:
            # str() keeps the join working when hours are given as numbers
            hours_text = ', '.join(str(hour) for hour in peak_hours)
            recommendations.append(
                f"⏰ Vermeiden Sie Aktivität während Peak-Zeiten: "
                f"{hours_text}"
            )

        # Recovery-based recommendations
        low_recovery = [e for e in errors if e.recovery_success_rate < 0.3]
        if len(low_recovery) > 3:
            recommendations.append(
                "🔄 Viele Fehler ohne erfolgreiche Recovery - "
                "Implementieren Sie bessere Retry-Strategien"
            )

        # Platform-specific recommendations
        platform_errors = self._count_errors_by_platform(errors)
        if platform_errors:
            worst_platform = max(platform_errors.items(), key=lambda x: x[1])
            recommendations.append(
                f"📱 {worst_platform[0].title()} hat die meisten Fehler - "
                f"Fokussieren Sie Optimierungen auf diese Plattform"
            )
        return recommendations

    def _log_insights(self, analysis: Dict[str, Any]) -> None:
        """
        Log the key findings of an analysis.

        Logs at WARNING level for 'critical' / 'warning' severity and at INFO
        level otherwise, followed by the first recommendation if there is one.
        """
        severity = analysis['metrics']['severity']
        failure_rate = analysis['metrics']['overall_failure_rate']
        log_message = f"Failure analysis completed: {failure_rate:.1%} failure rate ({severity})"
        if analysis['critical_issues']:
            log_message += f", {len(analysis['critical_issues'])} critical issues found"

        if severity in ('critical', 'warning'):
            logger.warning(log_message)
        else:
            logger.info(log_message)

        # Log the top recommendation
        if analysis['recommendations']:
            logger.info("Top recommendation: %s", analysis['recommendations'][0])

    def compare_platforms(self,
                          timeframe: timedelta = timedelta(days=7)) -> Dict[str, Any]:
        """
        Compare failure rates between platforms.

        Every dict-valued platform entry returned by the analytics service is
        extended in place with 'primary_error_types' (currently always empty)
        and 'improvement_potential'.

        Args:
            timeframe: Time window covered by the comparison.

        Returns:
            The platform comparison from the analytics service, extended.
        """
        comparison = self.analytics_service.get_platform_comparison(timeframe)
        for stats in comparison.values():
            if not isinstance(stats, dict):
                continue
            # TODO: implement a platform filter in ErrorSummary; until then no
            # per-platform error types can be derived, so none are fetched.
            stats['primary_error_types'] = []
            stats['improvement_potential'] = self._calculate_improvement_potential(stats)
        return comparison

    def _calculate_improvement_potential(self, stats: Dict[str, Any]) -> str:
        """
        Rate the improvement potential from a platform's success rate.

        A missing 'success_rate' is treated as 0.

        Returns:
            'high' (< 0.5), 'medium' (< 0.7), 'low' (< 0.9) or 'minimal'.
        """
        success_rate = stats.get('success_rate', 0)
        if success_rate < 0.5:
            return 'high'
        if success_rate < 0.7:
            return 'medium'
        if success_rate < 0.9:
            return 'low'
        return 'minimal'