254 Zeilen
10 KiB
Python
254 Zeilen
10 KiB
Python
"""
SQLite implementation of rotation session repository.

Handles persistence and retrieval of rotation sessions.
"""
|
|
|
|
import json
|
|
import sqlite3
|
|
from datetime import datetime, timedelta
|
|
from typing import List, Optional, Dict, Any
|
|
|
|
from domain.entities.method_rotation import RotationSession
|
|
from domain.repositories.method_rotation_repository import IRotationSessionRepository
|
|
from database.db_manager import DatabaseManager
|
|
|
|
|
|
class RotationSessionRepository(IRotationSessionRepository):
    """SQLite implementation of rotation session repository.

    All SQL is executed through the injected ``db_manager`` using its
    ``execute_query``, ``fetch_one`` and ``fetch_all`` methods. The
    ``attempted_methods`` and ``session_metadata`` fields are stored as JSON
    text; ``session_start`` and ``last_rotation`` are stored as ISO-8601
    strings produced by ``datetime.isoformat()``.
    """

    # Single source of truth for the column order. The INSERT in ``save``,
    # every SELECT that returns full rows, and the positional mapping in
    # ``_row_to_session`` all rely on this exact order, so reads no longer
    # depend on the physical column order of the table (as ``SELECT *`` did).
    _COLUMNS = (
        "id",
        "platform",
        "account_id",
        "current_method",
        "attempted_methods",
        "session_start",
        "last_rotation",
        "rotation_count",
        "success_count",
        "failure_count",
        "is_active",
        "rotation_reason",
        "fingerprint_id",
        "session_metadata",
    )
    _COLUMN_LIST = ", ".join(_COLUMNS)
    _PLACEHOLDERS = ", ".join(["?"] * len(_COLUMNS))
    _SELECT_ALL = "SELECT " + _COLUMN_LIST + " FROM rotation_sessions"

    def __init__(self, db_manager):
        """Store the database manager used for all queries.

        Args:
            db_manager: Object exposing ``execute_query(query, params)``,
                ``fetch_one(query, params)`` and ``fetch_all(query, params)``
                (presumably a ``DatabaseManager`` -- confirm against caller).
        """
        self.db_manager = db_manager

    def save(self, session: RotationSession) -> None:
        """Save or update a rotation session.

        Uses ``INSERT OR REPLACE`` keyed on the session id, so an existing
        row with the same id is overwritten.

        Args:
            session: The session entity to persist.
        """
        query = (
            "INSERT OR REPLACE INTO rotation_sessions ("
            + self._COLUMN_LIST
            + ") VALUES ("
            + self._PLACEHOLDERS
            + ")"
        )

        last_rotation = (
            session.last_rotation.isoformat() if session.last_rotation else None
        )
        # Order must match ``_COLUMNS``.
        params = (
            session.session_id,
            session.platform,
            session.account_id,
            session.current_method,
            json.dumps(session.attempted_methods),
            session.session_start.isoformat(),
            last_rotation,
            session.rotation_count,
            session.success_count,
            session.failure_count,
            session.is_active,
            session.rotation_reason,
            session.fingerprint_id,
            json.dumps(session.session_metadata),
        )

        self.db_manager.execute_query(query, params)

    def find_by_id(self, session_id: str) -> Optional[RotationSession]:
        """Find a session by its ID.

        Returns:
            The matching session, or ``None`` when no row is found.
        """
        query = f"{self._SELECT_ALL} WHERE id = ?"
        row = self.db_manager.fetch_one(query, (session_id,))
        return self._row_to_session(row) if row else None

    def find_active_session(self, platform: str, account_id: Optional[str] = None) -> Optional[RotationSession]:
        """Find the most recently started active session for a platform/account.

        Args:
            platform: Platform to match.
            account_id: Optional account filter. Any falsy value (``None`` or
                an empty string) disables the account filter.

        Returns:
            The newest active session by ``session_start``, or ``None``.
        """
        conditions = ["platform = ?"]
        params = [platform]
        if account_id:
            conditions.append("account_id = ?")
            params.append(account_id)
        conditions.append("is_active = 1")

        query = (
            f"{self._SELECT_ALL} WHERE {' AND '.join(conditions)} "
            "ORDER BY session_start DESC LIMIT 1"
        )
        row = self.db_manager.fetch_one(query, tuple(params))
        return self._row_to_session(row) if row else None

    def find_active_sessions_by_platform(self, platform: str) -> List[RotationSession]:
        """Find all active sessions for a platform, newest first."""
        query = (
            f"{self._SELECT_ALL} "
            "WHERE platform = ? AND is_active = 1 "
            "ORDER BY session_start DESC"
        )
        rows = self.db_manager.fetch_all(query, (platform,))
        return [self._row_to_session(row) for row in rows]

    def update_session_metrics(self, session_id: str, success: bool,
                               method_name: str, error_message: Optional[str] = None) -> None:
        """Update session metrics after a method attempt.

        Loads the session, records the attempt via
        ``RotationSession.add_attempt`` and saves it back. Does nothing when
        the session id is unknown.
        """
        session = self.find_by_id(session_id)
        if not session:
            return

        session.add_attempt(method_name, success, error_message)
        self.save(session)

    def archive_session(self, session_id: str, final_success: bool = False) -> None:
        """Mark a session as completed/archived.

        Loads the session, calls ``RotationSession.complete_session`` with
        ``final_success`` and saves it back. Does nothing when the session id
        is unknown.
        """
        session = self.find_by_id(session_id)
        if not session:
            return

        session.complete_session(final_success)
        self.save(session)

    def get_session_history(self, platform: str, limit: int = 100) -> List[RotationSession]:
        """Get recent session history for a platform, newest first.

        Args:
            platform: Platform to match.
            limit: Maximum number of sessions to return.
        """
        query = (
            f"{self._SELECT_ALL} "
            "WHERE platform = ? "
            "ORDER BY session_start DESC "
            "LIMIT ?"
        )
        rows = self.db_manager.fetch_all(query, (platform, limit))
        return [self._row_to_session(row) for row in rows]

    def get_session_statistics(self, platform: str, days: int = 30) -> Dict[str, Any]:
        """Get session statistics for a platform over specified days.

        Only sessions whose ``session_start`` is within the last ``days`` days
        (relative to ``datetime.now()``) are considered. Success/failure of
        finished sessions is read from ``$.final_success`` in the JSON
        metadata.

        Returns:
            A dict with counts and rounded averages, or an empty dict when the
            query yields no row.
        """
        cutoff_date = datetime.now() - timedelta(days=days)

        query = """
            SELECT
                COUNT(*) as total_sessions,
                COUNT(CASE WHEN is_active = 1 THEN 1 END) as active_sessions,
                COUNT(CASE WHEN is_active = 0 AND JSON_EXTRACT(session_metadata, '$.final_success') = 1 THEN 1 END) as successful_sessions,
                COUNT(CASE WHEN is_active = 0 AND JSON_EXTRACT(session_metadata, '$.final_success') = 0 THEN 1 END) as failed_sessions,
                AVG(rotation_count) as avg_rotations,
                MAX(rotation_count) as max_rotations,
                AVG(success_count + failure_count) as avg_attempts,
                AVG(CASE WHEN success_count + failure_count > 0 THEN success_count * 1.0 / (success_count + failure_count) ELSE 0 END) as avg_success_rate
            FROM rotation_sessions
            WHERE platform = ? AND session_start >= ?
        """

        result = self.db_manager.fetch_one(query, (platform, cutoff_date.isoformat()))
        if not result:
            return {}

        # Aggregates over zero rows come back as NULL, hence the ``or`` defaults.
        return {
            'total_sessions': result[0] or 0,
            'active_sessions': result[1] or 0,
            'successful_sessions': result[2] or 0,
            'failed_sessions': result[3] or 0,
            'avg_rotations_per_session': round(result[4] or 0.0, 2),
            'max_rotations_in_session': result[5] or 0,
            'avg_attempts_per_session': round(result[6] or 0.0, 2),
            'avg_session_success_rate': round(result[7] or 0.0, 3),
        }

    def cleanup_old_sessions(self, days_to_keep: int = 30) -> int:
        """Clean up old session data and return number of records removed.

        Deletes inactive sessions whose ``session_start`` is older than
        ``days_to_keep`` days. Active sessions are never deleted.

        Returns:
            ``rowcount`` of the object returned by ``execute_query``, or 0
            when that object is falsy.
        """
        cutoff_date = datetime.now() - timedelta(days=days_to_keep)

        query = """
            DELETE FROM rotation_sessions
            WHERE is_active = 0 AND session_start < ?
        """

        cursor = self.db_manager.execute_query(query, (cutoff_date.isoformat(),))
        return cursor.rowcount if cursor else 0

    def get_method_usage_statistics(self, platform: str, days: int = 30) -> Dict[str, Any]:
        """Get method usage statistics from sessions.

        Groups sessions started within the last ``days`` days by their
        ``current_method``.

        Returns:
            Mapping of method name to a dict with ``usage_count`` and rounded
            average success, failure and rotation counts, ordered by usage
            (most used first).
        """
        cutoff_date = datetime.now() - timedelta(days=days)

        query = """
            SELECT
                current_method,
                COUNT(*) as usage_count,
                AVG(success_count) as avg_success_count,
                AVG(failure_count) as avg_failure_count,
                AVG(rotation_count) as avg_rotation_count
            FROM rotation_sessions
            WHERE platform = ? AND session_start >= ?
            GROUP BY current_method
            ORDER BY usage_count DESC
        """

        rows = self.db_manager.fetch_all(query, (platform, cutoff_date.isoformat()))

        return {
            row[0]: {
                'usage_count': row[1],
                'avg_success_count': round(row[2] or 0.0, 2),
                'avg_failure_count': round(row[3] or 0.0, 2),
                'avg_rotation_count': round(row[4] or 0.0, 2),
            }
            for row in rows
        }

    def find_sessions_by_fingerprint(self, fingerprint_id: str) -> List[RotationSession]:
        """Find sessions associated with a specific fingerprint, newest first."""
        query = (
            f"{self._SELECT_ALL} "
            "WHERE fingerprint_id = ? "
            "ORDER BY session_start DESC"
        )
        rows = self.db_manager.fetch_all(query, (fingerprint_id,))
        return [self._row_to_session(row) for row in rows]

    def get_long_running_sessions(self, hours: int = 24) -> List[RotationSession]:
        """Find active sessions that started more than ``hours`` hours ago.

        Results are ordered oldest first.
        """
        cutoff_time = datetime.now() - timedelta(hours=hours)

        query = (
            f"{self._SELECT_ALL} "
            "WHERE is_active = 1 AND session_start < ? "
            "ORDER BY session_start ASC"
        )
        rows = self.db_manager.fetch_all(query, (cutoff_time.isoformat(),))
        return [self._row_to_session(row) for row in rows]

    def force_archive_stale_sessions(self, hours: int = 24) -> int:
        """Force archive sessions that have been running too long.

        Marks every active session older than ``hours`` hours as inactive and
        records ``completed_at``, ``final_success = 0`` and
        ``force_archived = 1`` in its JSON metadata.

        Returns:
            ``rowcount`` of the object returned by ``execute_query``, or 0
            when that object is falsy.
        """
        now = datetime.now()
        cutoff_time = now - timedelta(hours=hours)

        # JSON_SET returns NULL for NULL input and fails on an empty string,
        # which would drop the archive markers; fall back to an empty object.
        query = """
            UPDATE rotation_sessions
            SET is_active = 0,
                session_metadata = JSON_SET(
                    COALESCE(NULLIF(session_metadata, ''), '{}'),
                    '$.completed_at', ?,
                    '$.final_success', 0,
                    '$.force_archived', 1
                )
            WHERE is_active = 1 AND session_start < ?
        """

        cursor = self.db_manager.execute_query(
            query, (now.isoformat(), cutoff_time.isoformat())
        )
        return cursor.rowcount if cursor else 0

    def _row_to_session(self, row) -> RotationSession:
        """Convert database row to RotationSession entity.

        Args:
            row: Index-addressable row whose columns are in ``_COLUMNS``
                order.
        """
        attempted_methods = json.loads(row[4]) if row[4] else []
        last_rotation = datetime.fromisoformat(row[6]) if row[6] else None
        session_metadata = json.loads(row[13]) if row[13] else {}

        return RotationSession(
            session_id=row[0],
            platform=row[1],
            account_id=row[2],
            current_method=row[3],
            attempted_methods=attempted_methods,
            session_start=datetime.fromisoformat(row[5]),
            last_rotation=last_rotation,
            rotation_count=row[7],
            success_count=row[8],
            failure_count=row[9],
            is_active=bool(row[10]),  # SQLite stores booleans as 0/1
            rotation_reason=row[11],
            fingerprint_id=row[12],
            session_metadata=session_metadata,
        )