Initial commit
Dieser Commit ist enthalten in:
326
services/activity_sync.py
Normale Datei
326
services/activity_sync.py
Normale Datei
@ -0,0 +1,326 @@
|
||||
"""
|
||||
Activity Sync Service for CPM
|
||||
Handles WebSocket connection to the Activity Server
|
||||
"""
|
||||
|
||||
import socketio
|
||||
import requests
|
||||
import threading
|
||||
import json
|
||||
from typing import Optional, Callable, List, Dict
|
||||
from pathlib import Path
|
||||
from utils.logger import logger
|
||||
|
||||
|
||||
class ActivitySyncService:
|
||||
def __init__(self, server_url: str = None, api_key: str = None, user_id: str = None, user_name: str = None):
|
||||
self.server_url = server_url or "http://localhost:3001"
|
||||
self.api_key = api_key
|
||||
self.user_id = user_id
|
||||
self.user_name = user_name
|
||||
self.sio = None
|
||||
self.connected = False
|
||||
self.on_activities_update = None
|
||||
self.activities = []
|
||||
self.active_projects = [] # Changed from single current_activity to list
|
||||
|
||||
# Load settings
|
||||
self.load_settings()
|
||||
|
||||
def load_settings(self):
|
||||
"""Load activity sync settings from file"""
|
||||
settings_file = Path.home() / ".claude_project_manager" / "activity_settings.json"
|
||||
try:
|
||||
if settings_file.exists():
|
||||
with open(settings_file, 'r') as f:
|
||||
settings = json.load(f)
|
||||
self.server_url = settings.get("server_url", self.server_url)
|
||||
self.api_key = settings.get("api_key", self.api_key)
|
||||
self.user_id = settings.get("user_id", self.user_id)
|
||||
self.user_name = settings.get("user_name", self.user_name)
|
||||
logger.info(f"Loaded activity sync settings from {settings_file}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to load activity settings: {e}")
|
||||
|
||||
def save_settings(self):
|
||||
"""Save activity sync settings to file"""
|
||||
settings_file = Path.home() / ".claude_project_manager" / "activity_settings.json"
|
||||
try:
|
||||
settings_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
settings = {
|
||||
"server_url": self.server_url,
|
||||
"api_key": self.api_key,
|
||||
"user_id": self.user_id,
|
||||
"user_name": self.user_name
|
||||
}
|
||||
with open(settings_file, 'w') as f:
|
||||
json.dump(settings, f, indent=2)
|
||||
logger.info(f"Saved activity sync settings to {settings_file}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save activity settings: {e}")
|
||||
|
||||
def connect(self):
|
||||
"""Connect to the activity server"""
|
||||
if not all([self.server_url, self.api_key, self.user_id, self.user_name]):
|
||||
logger.warning("Activity sync not configured properly")
|
||||
return False
|
||||
|
||||
try:
|
||||
# Create Socket.IO client
|
||||
self.sio = socketio.Client()
|
||||
|
||||
# Register event handlers
|
||||
@self.sio.event
|
||||
def connect():
|
||||
logger.info("Connected to activity server")
|
||||
self.connected = True
|
||||
# Fetch initial activities after connection
|
||||
self._fetch_initial_activities()
|
||||
|
||||
@self.sio.event
|
||||
def disconnect():
|
||||
logger.info("Disconnected from activity server")
|
||||
self.connected = False
|
||||
|
||||
@self.sio.event
|
||||
def activities_update(data):
|
||||
"""Handle activities update from server"""
|
||||
logger.debug(f"Received raw activities update: {len(data)} items")
|
||||
|
||||
# Filter out inactive entries and ensure we only keep active ones
|
||||
active_activities = [
|
||||
activity for activity in data
|
||||
if activity.get('isActive', False)
|
||||
]
|
||||
|
||||
# Log details about the activities
|
||||
for activity in active_activities:
|
||||
logger.debug(f"Active: {activity.get('userName')} on {activity.get('projectName')}")
|
||||
|
||||
self.activities = active_activities
|
||||
logger.info(f"Activities update: {len(active_activities)} active (of {len(data)} total)")
|
||||
|
||||
if self.on_activities_update:
|
||||
self.on_activities_update(active_activities)
|
||||
|
||||
@self.sio.event
|
||||
def connect_error(data):
|
||||
logger.error(f"Connection error: {data}")
|
||||
self.connected = False
|
||||
|
||||
# Connect with authentication
|
||||
logger.info(f"Attempting to connect to activity server: {self.server_url}")
|
||||
|
||||
# Try to force polling transport if WebSockets are blocked
|
||||
import os
|
||||
os.environ['SOCKETIO_TRANSPORT'] = 'polling'
|
||||
|
||||
self.sio.connect(
|
||||
self.server_url,
|
||||
auth={
|
||||
'token': self.api_key,
|
||||
'userId': self.user_id,
|
||||
'userName': self.user_name
|
||||
},
|
||||
wait_timeout=10,
|
||||
transports=['polling', 'websocket'] # Try polling first, then websocket
|
||||
)
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to connect to activity server: {e}")
|
||||
self.connected = False
|
||||
return False
|
||||
|
||||
def disconnect(self):
|
||||
"""Disconnect from the activity server"""
|
||||
if self.sio:
|
||||
try:
|
||||
self.sio.disconnect()
|
||||
self.sio = None
|
||||
self.connected = False
|
||||
logger.info("Disconnected from activity server")
|
||||
except Exception as e:
|
||||
logger.error(f"Error disconnecting: {e}")
|
||||
|
||||
def start_activity(self, project_name: str, project_path: str, description: str = ""):
|
||||
"""Start a new activity (adds to active projects list)"""
|
||||
logger.debug(f"start_activity called for: {project_name}")
|
||||
|
||||
if not self.connected or not self.sio:
|
||||
logger.warning("Not connected to activity server")
|
||||
return False
|
||||
|
||||
try:
|
||||
# Check if project is already active
|
||||
existing = next((p for p in self.active_projects if p['projectName'] == project_name), None)
|
||||
if existing:
|
||||
logger.info(f"Project {project_name} is already active")
|
||||
return True
|
||||
|
||||
# Add to active projects list
|
||||
new_activity = {
|
||||
'projectName': project_name,
|
||||
'projectPath': project_path,
|
||||
'userId': self.user_id,
|
||||
'userName': self.user_name,
|
||||
'isActive': True
|
||||
}
|
||||
self.active_projects.append(new_activity)
|
||||
logger.debug(f"Added to active_projects: {new_activity}")
|
||||
|
||||
# Emit to server
|
||||
self.sio.emit('activity-start', {
|
||||
'projectName': project_name,
|
||||
'projectPath': project_path,
|
||||
'description': description
|
||||
})
|
||||
|
||||
logger.info(f"Started activity for project: {project_name}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to start activity: {e}")
|
||||
# Remove from list on failure
|
||||
self.active_projects = [p for p in self.active_projects if p['projectName'] != project_name]
|
||||
return False
|
||||
|
||||
def stop_activity(self, project_name: str = None):
|
||||
"""Stop activity for a specific project or all activities if no project specified"""
|
||||
logger.debug(f"stop_activity called for project: {project_name}")
|
||||
|
||||
if not self.connected or not self.sio:
|
||||
logger.warning("Not connected to activity server")
|
||||
return False
|
||||
|
||||
try:
|
||||
if project_name:
|
||||
# Stop specific project
|
||||
activity = next((p for p in self.active_projects if p['projectName'] == project_name), None)
|
||||
if not activity:
|
||||
logger.warning(f"Project {project_name} is not in active projects")
|
||||
return False
|
||||
|
||||
# Remove from active projects
|
||||
self.active_projects = [p for p in self.active_projects if p['projectName'] != project_name]
|
||||
logger.debug(f"Removed {project_name} from active_projects")
|
||||
|
||||
# NEW: Emit with activityId if available
|
||||
emit_data = {}
|
||||
if 'id' in activity:
|
||||
emit_data['activityId'] = activity['id']
|
||||
elif 'activityId' in activity:
|
||||
emit_data['activityId'] = activity['activityId']
|
||||
else:
|
||||
# Fallback to project name
|
||||
emit_data['projectName'] = project_name
|
||||
|
||||
self.sio.emit('activity-stop', emit_data)
|
||||
|
||||
logger.info(f"Stopped activity for project: {project_name}")
|
||||
else:
|
||||
# Stop all activities (backward compatibility)
|
||||
project_names = [p['projectName'] for p in self.active_projects]
|
||||
self.active_projects = []
|
||||
logger.debug("Cleared all active_projects")
|
||||
|
||||
# Emit to server (server should handle stopping all activities for this user)
|
||||
self.sio.emit('activity-stop')
|
||||
|
||||
logger.info(f"Stopped all activities: {project_names}")
|
||||
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to stop activity: {e}")
|
||||
return False
|
||||
|
||||
def get_activities(self) -> List[Dict]:
|
||||
"""Get current activities via REST API"""
|
||||
if not self.api_key:
|
||||
return []
|
||||
|
||||
try:
|
||||
response = requests.get(
|
||||
f"{self.server_url}/api/activities",
|
||||
headers={"X-API-Key": self.api_key},
|
||||
timeout=5
|
||||
)
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
all_activities = data.get('activities', [])
|
||||
# Filter to only return active activities
|
||||
return [a for a in all_activities if a.get('isActive', False)]
|
||||
else:
|
||||
logger.error(f"Failed to fetch activities: {response.status_code}")
|
||||
return []
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to fetch activities: {e}")
|
||||
return []
|
||||
|
||||
def is_configured(self) -> bool:
|
||||
"""Check if the service is properly configured"""
|
||||
return all([self.server_url, self.api_key, self.user_id, self.user_name])
|
||||
|
||||
def get_active_users_count(self) -> int:
|
||||
"""Get count of active users"""
|
||||
return len(self.activities)
|
||||
|
||||
def is_project_active(self, project_name: str) -> Optional[Dict]:
|
||||
"""Check if a project has active users"""
|
||||
for activity in self.activities:
|
||||
if activity.get('projectName') == project_name and activity.get('isActive'):
|
||||
return activity
|
||||
return None
|
||||
|
||||
def get_current_activity(self, project_name: str = None) -> Optional[Dict]:
|
||||
"""Get current user's activity for a specific project or first active project"""
|
||||
if project_name:
|
||||
# Return specific project if active
|
||||
activity = next((p for p in self.active_projects if p['projectName'] == project_name), None)
|
||||
logger.debug(f"get_current_activity for {project_name}, returning: {activity}")
|
||||
return activity
|
||||
else:
|
||||
# Return first active project for backward compatibility
|
||||
activity = self.active_projects[0] if self.active_projects else None
|
||||
logger.debug(f"get_current_activity called, returning: {activity}")
|
||||
return activity
|
||||
|
||||
def get_all_current_activities(self) -> List[Dict]:
|
||||
"""Get all current user's active projects"""
|
||||
logger.debug(f"get_all_current_activities called, returning {len(self.active_projects)} activities")
|
||||
return self.active_projects.copy()
|
||||
|
||||
def is_project_active_for_user(self, project_name: str) -> bool:
|
||||
"""Check if a specific project is active for the current user"""
|
||||
return any(p['projectName'] == project_name for p in self.active_projects)
|
||||
|
||||
def _fetch_initial_activities(self):
|
||||
"""Fetch initial activities after connection"""
|
||||
try:
|
||||
# Fetch user-specific activities
|
||||
response = requests.get(
|
||||
f"{self.server_url}/api/activities/{self.user_id}",
|
||||
headers={"X-API-Key": self.api_key},
|
||||
timeout=5
|
||||
)
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
# NEW: Handle array format
|
||||
user_activities = data.get('activities', [])
|
||||
# Update our active projects list
|
||||
self.active_projects = [a for a in user_activities if a.get('isActive', False)]
|
||||
logger.info(f"Fetched {len(self.active_projects)} active projects for current user")
|
||||
|
||||
# Also fetch all activities
|
||||
activities = self.get_activities() # Already filtered to only active
|
||||
if activities:
|
||||
self.activities = activities
|
||||
if self.on_activities_update:
|
||||
self.on_activities_update(activities)
|
||||
logger.info(f"Fetched {len(activities)} active activities from all users")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to fetch initial activities: {e}")
|
||||
|
||||
|
||||
# Global instance
|
||||
activity_service = ActivitySyncService()
|
||||
In neuem Issue referenzieren
Einen Benutzer sperren