""" Activity Sync Service for CPM Handles WebSocket connection to the Activity Server """ import socketio import requests import threading import json from typing import Optional, Callable, List, Dict from pathlib import Path from utils.logger import logger class ActivitySyncService: def __init__(self, server_url: str = None, api_key: str = None, user_id: str = None, user_name: str = None): self.server_url = server_url or "http://localhost:3001" self.api_key = api_key self.user_id = user_id self.user_name = user_name self.sio = None self.connected = False self.on_activities_update = None self.activities = [] self.current_activity = None # Load settings self.load_settings() def load_settings(self): """Load activity sync settings from file""" settings_file = Path.home() / ".claude_project_manager" / "activity_settings.json" try: if settings_file.exists(): with open(settings_file, 'r') as f: settings = json.load(f) self.server_url = settings.get("server_url", self.server_url) self.api_key = settings.get("api_key", self.api_key) self.user_id = settings.get("user_id", self.user_id) self.user_name = settings.get("user_name", self.user_name) logger.info(f"Loaded activity sync settings from {settings_file}") except Exception as e: logger.error(f"Failed to load activity settings: {e}") def save_settings(self): """Save activity sync settings to file""" settings_file = Path.home() / ".claude_project_manager" / "activity_settings.json" try: settings_file.parent.mkdir(parents=True, exist_ok=True) settings = { "server_url": self.server_url, "api_key": self.api_key, "user_id": self.user_id, "user_name": self.user_name } with open(settings_file, 'w') as f: json.dump(settings, f, indent=2) logger.info(f"Saved activity sync settings to {settings_file}") except Exception as e: logger.error(f"Failed to save activity settings: {e}") def connect(self): """Connect to the activity server""" if not all([self.server_url, self.api_key, self.user_id, self.user_name]): logger.warning("Activity sync not configured properly") return False try: # Create Socket.IO client self.sio = socketio.Client() # Register event handlers @self.sio.event def connect(): logger.info("Connected to activity server") self.connected = True # Fetch initial activities after connection self._fetch_initial_activities() @self.sio.event def disconnect(): logger.info("Disconnected from activity server") self.connected = False @self.sio.event def activities_update(data): """Handle activities update from server""" logger.debug(f"Received raw activities update: {len(data)} items") # Filter out inactive entries and ensure we only keep active ones active_activities = [ activity for activity in data if activity.get('isActive', False) ] # Log details about the activities for activity in active_activities: logger.debug(f"Active: {activity.get('userName')} on {activity.get('projectName')}") self.activities = active_activities logger.info(f"Activities update: {len(active_activities)} active (of {len(data)} total)") if self.on_activities_update: self.on_activities_update(active_activities) @self.sio.event def connect_error(data): logger.error(f"Connection error: {data}") self.connected = False # Connect with authentication logger.info(f"Attempting to connect to activity server: {self.server_url}") # Try to force polling transport if WebSockets are blocked import os os.environ['SOCKETIO_TRANSPORT'] = 'polling' self.sio.connect( self.server_url, auth={ 'token': self.api_key, 'userId': self.user_id, 'userName': self.user_name } ) return True except Exception as e: logger.error(f"Failed to connect to activity server: {e}") self.connected = False return False def disconnect(self): """Disconnect from the activity server""" if self.sio: try: self.sio.disconnect() self.sio = None self.connected = False logger.info("Disconnected from activity server") except Exception as e: logger.error(f"Error disconnecting: {e}") def start_activity(self, project_name: str, project_path: str, description: str = ""): """Start a new activity""" logger.debug(f"start_activity called for: {project_name}") if not self.connected or not self.sio: logger.warning("Not connected to activity server") return False try: # Set current activity immediately self.current_activity = { 'projectName': project_name, 'projectPath': project_path, 'userId': self.user_id, 'userName': self.user_name, 'isActive': True } logger.debug(f"Set current_activity: {self.current_activity}") # Emit to server self.sio.emit('activity-start', { 'projectName': project_name, 'projectPath': project_path, 'description': description }) logger.info(f"Started activity for project: {project_name}") return True except Exception as e: logger.error(f"Failed to start activity: {e}") self.current_activity = None return False def stop_activity(self): """Stop the current activity""" logger.debug(f"stop_activity called, current: {self.current_activity}") if not self.connected or not self.sio: logger.warning("Not connected to activity server") return False try: # Store project name for logging project_name = self.current_activity.get('projectName') if self.current_activity else 'None' # Clear current activity immediately self.current_activity = None logger.debug("Cleared current_activity") # Emit to server self.sio.emit('activity-stop') logger.info(f"Stopped activity for project: {project_name}") return True except Exception as e: logger.error(f"Failed to stop activity: {e}") return False def get_activities(self) -> List[Dict]: """Get current activities via REST API""" if not self.api_key: return [] try: response = requests.get( f"{self.server_url}/api/activities", headers={"X-API-Key": self.api_key}, timeout=5 ) if response.status_code == 200: data = response.json() all_activities = data.get('activities', []) # Filter to only return active activities return [a for a in all_activities if a.get('isActive', False)] else: logger.error(f"Failed to fetch activities: {response.status_code}") return [] except Exception as e: logger.error(f"Failed to fetch activities: {e}") return [] def is_configured(self) -> bool: """Check if the service is properly configured""" return all([self.server_url, self.api_key, self.user_id, self.user_name]) def get_active_users_count(self) -> int: """Get count of active users""" return len(self.activities) def is_project_active(self, project_name: str) -> Optional[Dict]: """Check if a project has active users""" for activity in self.activities: if activity.get('projectName') == project_name and activity.get('isActive'): return activity return None def get_current_activity(self) -> Optional[Dict]: """Get current user's activity""" logger.debug(f"get_current_activity called, returning: {self.current_activity}") return self.current_activity def _fetch_initial_activities(self): """Fetch initial activities after connection""" try: activities = self.get_activities() # Already filtered to only active if activities: self.activities = activities if self.on_activities_update: self.on_activities_update(activities) logger.info(f"Fetched {len(activities)} active activities") except Exception as e: logger.error(f"Failed to fetch initial activities: {e}") # Global instance activity_service = ActivitySyncService()