Files
AccountForger-neuerUpload/infrastructure/repositories/method_strategy_repository.py
Claude Project Manager 04585e95b6 Initial commit
2025-08-01 23:50:28 +02:00

282 Zeilen
11 KiB
Python

"""
SQLite implementation of method strategy repository.
Handles persistence and retrieval of method strategies with performance optimization.
"""
import json
import sqlite3
from datetime import datetime, timedelta
from typing import List, Optional, Dict, Any
from domain.entities.method_rotation import MethodStrategy, RiskLevel
from domain.repositories.method_rotation_repository import IMethodStrategyRepository
from database.db_manager import DatabaseManager
class MethodStrategyRepository(IMethodStrategyRepository):
"""SQLite implementation of method strategy repository"""
def __init__(self, db_manager):
self.db_manager = db_manager
def save(self, strategy: MethodStrategy) -> None:
"""Save or update a method strategy"""
strategy.updated_at = datetime.now()
query = """
INSERT OR REPLACE INTO method_strategies (
id, platform, method_name, priority, success_rate, failure_rate,
last_success, last_failure, cooldown_period, max_daily_attempts,
risk_level, is_active, configuration, tags, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
params = (
strategy.strategy_id,
strategy.platform,
strategy.method_name,
strategy.priority,
strategy.success_rate,
strategy.failure_rate,
strategy.last_success.isoformat() if strategy.last_success else None,
strategy.last_failure.isoformat() if strategy.last_failure else None,
strategy.cooldown_period,
strategy.max_daily_attempts,
strategy.risk_level.value,
strategy.is_active,
json.dumps(strategy.configuration),
json.dumps(strategy.tags),
strategy.created_at.isoformat(),
strategy.updated_at.isoformat()
)
self.db_manager.execute_query(query, params)
def find_by_id(self, strategy_id: str) -> Optional[MethodStrategy]:
"""Find a strategy by its ID"""
query = "SELECT * FROM method_strategies WHERE id = ?"
result = self.db_manager.fetch_one(query, (strategy_id,))
return self._row_to_strategy(result) if result else None
def find_by_platform(self, platform: str) -> List[MethodStrategy]:
"""Find all strategies for a platform"""
query = """
SELECT * FROM method_strategies
WHERE platform = ?
ORDER BY priority DESC, success_rate DESC
"""
results = self.db_manager.fetch_all(query, (platform,))
return [self._row_to_strategy(row) for row in results]
def find_active_by_platform(self, platform: str) -> List[MethodStrategy]:
"""Find all active strategies for a platform, ordered by effectiveness"""
query = """
SELECT * FROM method_strategies
WHERE platform = ? AND is_active = 1
ORDER BY priority DESC, success_rate DESC, last_success DESC
"""
results = self.db_manager.fetch_all(query, (platform,))
strategies = [self._row_to_strategy(row) for row in results]
# Sort by effectiveness score
strategies.sort(key=lambda s: s.effectiveness_score, reverse=True)
return strategies
def find_by_platform_and_method(self, platform: str, method_name: str) -> Optional[MethodStrategy]:
"""Find a specific method strategy"""
query = "SELECT * FROM method_strategies WHERE platform = ? AND method_name = ?"
result = self.db_manager.fetch_one(query, (platform, method_name))
return self._row_to_strategy(result) if result else None
def update_performance_metrics(self, strategy_id: str, success: bool,
execution_time: float = 0.0) -> None:
"""Update performance metrics for a strategy"""
strategy = self.find_by_id(strategy_id)
if not strategy:
return
strategy.update_performance(success, execution_time)
self.save(strategy)
def get_next_available_method(self, platform: str,
excluded_methods: List[str] = None,
max_risk_level: str = "HIGH") -> Optional[MethodStrategy]:
"""Get the next best available method for a platform"""
if excluded_methods is None:
excluded_methods = []
# Build query with exclusions
placeholders = ','.join(['?' for _ in excluded_methods])
exclusion_clause = f"AND method_name NOT IN ({placeholders})" if excluded_methods else ""
# Build risk level clause
risk_clause = "'LOW', 'MEDIUM'"
if max_risk_level == 'HIGH':
risk_clause += ", 'HIGH'"
query = f"""
SELECT * FROM method_strategies
WHERE platform = ?
AND is_active = 1
AND risk_level IN ({risk_clause})
{exclusion_clause}
ORDER BY priority DESC, success_rate DESC
LIMIT 1
"""
params = [platform] + excluded_methods
result = self.db_manager.fetch_one(query, params)
if not result:
return None
strategy = self._row_to_strategy(result)
# Check if method is on cooldown
if strategy.is_on_cooldown:
# Try to find another method
excluded_methods.append(strategy.method_name)
return self.get_next_available_method(platform, excluded_methods, max_risk_level)
return strategy
def disable_method(self, platform: str, method_name: str, reason: str) -> None:
"""Disable a method temporarily or permanently"""
query = """
UPDATE method_strategies
SET is_active = 0, updated_at = ?
WHERE platform = ? AND method_name = ?
"""
self.db_manager.execute_query(query, (datetime.now().isoformat(), platform, method_name))
# Log the reason in configuration
strategy = self.find_by_platform_and_method(platform, method_name)
if strategy:
strategy.configuration['disabled_reason'] = reason
strategy.configuration['disabled_at'] = datetime.now().isoformat()
self.save(strategy)
def enable_method(self, platform: str, method_name: str) -> None:
"""Re-enable a disabled method"""
query = """
UPDATE method_strategies
SET is_active = 1, updated_at = ?
WHERE platform = ? AND method_name = ?
"""
self.db_manager.execute_query(query, (datetime.now().isoformat(), platform, method_name))
# Clear disabled reason from configuration
strategy = self.find_by_platform_and_method(platform, method_name)
if strategy:
strategy.configuration.pop('disabled_reason', None)
strategy.configuration.pop('disabled_at', None)
self.save(strategy)
def get_platform_statistics(self, platform: str) -> Dict[str, Any]:
"""Get aggregated statistics for all methods on a platform"""
query = """
SELECT
COUNT(*) as total_methods,
COUNT(CASE WHEN is_active = 1 THEN 1 END) as active_methods,
AVG(success_rate) as avg_success_rate,
MAX(success_rate) as best_success_rate,
MIN(success_rate) as worst_success_rate,
AVG(priority) as avg_priority,
COUNT(CASE WHEN last_success > datetime('now', '-24 hours') THEN 1 END) as recent_successes,
COUNT(CASE WHEN last_failure > datetime('now', '-24 hours') THEN 1 END) as recent_failures
FROM method_strategies
WHERE platform = ?
"""
result = self.db_manager.fetch_one(query, (platform,))
if not result:
return {}
return {
'total_methods': result[0] or 0,
'active_methods': result[1] or 0,
'avg_success_rate': round(result[2] or 0.0, 3),
'best_success_rate': result[3] or 0.0,
'worst_success_rate': result[4] or 0.0,
'avg_priority': round(result[5] or 0.0, 1),
'recent_successes_24h': result[6] or 0,
'recent_failures_24h': result[7] or 0
}
def cleanup_old_data(self, days_to_keep: int = 90) -> int:
"""Clean up old performance data and return number of records removed"""
# This implementation doesn't remove strategies but resets old performance data
cutoff_date = datetime.now() - timedelta(days=days_to_keep)
query = """
UPDATE method_strategies
SET last_success = NULL, last_failure = NULL, success_rate = 0.0, failure_rate = 0.0
WHERE (last_success < ? OR last_failure < ?)
AND (last_success IS NOT NULL OR last_failure IS NOT NULL)
"""
cursor = self.db_manager.execute_query(query, (cutoff_date.isoformat(), cutoff_date.isoformat()))
return cursor.rowcount if cursor else 0
def get_methods_by_risk_level(self, platform: str, risk_level: RiskLevel) -> List[MethodStrategy]:
"""Get methods filtered by risk level"""
query = """
SELECT * FROM method_strategies
WHERE platform = ? AND risk_level = ? AND is_active = 1
ORDER BY priority DESC, success_rate DESC
"""
results = self.db_manager.fetch_all(query, (platform, risk_level.value))
return [self._row_to_strategy(row) for row in results]
def get_emergency_methods(self, platform: str) -> List[MethodStrategy]:
"""Get only the most reliable methods for emergency mode"""
query = """
SELECT * FROM method_strategies
WHERE platform = ?
AND is_active = 1
AND risk_level = 'LOW'
AND success_rate > 0.5
ORDER BY success_rate DESC, priority DESC
LIMIT 2
"""
results = self.db_manager.fetch_all(query, (platform,))
return [self._row_to_strategy(row) for row in results]
def bulk_update_priorities(self, platform: str, priority_updates: Dict[str, int]) -> None:
"""Bulk update method priorities for a platform"""
query = """
UPDATE method_strategies
SET priority = ?, updated_at = ?
WHERE platform = ? AND method_name = ?
"""
params_list = [
(priority, datetime.now().isoformat(), platform, method_name)
for method_name, priority in priority_updates.items()
]
with self.db_manager.get_connection() as conn:
conn.executemany(query, params_list)
conn.commit()
def _row_to_strategy(self, row) -> MethodStrategy:
"""Convert database row to MethodStrategy entity"""
return MethodStrategy(
strategy_id=row[0],
platform=row[1],
method_name=row[2],
priority=row[3],
success_rate=row[4],
failure_rate=row[5],
last_success=datetime.fromisoformat(row[6]) if row[6] else None,
last_failure=datetime.fromisoformat(row[7]) if row[7] else None,
cooldown_period=row[8],
max_daily_attempts=row[9],
risk_level=RiskLevel(row[10]),
is_active=bool(row[11]),
configuration=json.loads(row[12]) if row[12] else {},
tags=json.loads(row[13]) if row[13] else [],
created_at=datetime.fromisoformat(row[14]),
updated_at=datetime.fromisoformat(row[15])
)