Files
ClaudeProjectManager/services/activity_sync.py
Claude Project Manager 6112313a91 Update changes
2025-07-09 22:40:13 +02:00

364 Zeilen
15 KiB
Python

"""
Activity Sync Service for CPM
Handles WebSocket connection to the Activity Server
"""
import socketio
import requests
import threading
import time
import json
from typing import Optional, Callable, List, Dict
from pathlib import Path
from utils.logger import logger
class ActivitySyncService:
def __init__(self, server_url: str = None, api_key: str = None, user_id: str = None, user_name: str = None):
self.server_url = server_url or "http://localhost:3001"
self.api_key = api_key
self.user_id = user_id
self.user_name = user_name
self.sio = None
self.connected = False
self.on_activities_update = None
self.activities = []
self.active_projects = [] # Changed from single current_activity to list
# Load settings
self.load_settings()
def load_settings(self):
"""Load activity sync settings from file"""
settings_file = Path.home() / ".claude_project_manager" / "activity_settings.json"
try:
if settings_file.exists():
with open(settings_file, 'r') as f:
settings = json.load(f)
self.server_url = settings.get("server_url", self.server_url)
self.api_key = settings.get("api_key", self.api_key)
self.user_id = settings.get("user_id", self.user_id)
self.user_name = settings.get("user_name", self.user_name)
logger.info(f"Loaded activity sync settings from {settings_file}")
except Exception as e:
logger.error(f"Failed to load activity settings: {e}")
def save_settings(self):
"""Save activity sync settings to file"""
settings_file = Path.home() / ".claude_project_manager" / "activity_settings.json"
try:
settings_file.parent.mkdir(parents=True, exist_ok=True)
settings = {
"server_url": self.server_url,
"api_key": self.api_key,
"user_id": self.user_id,
"user_name": self.user_name
}
with open(settings_file, 'w') as f:
json.dump(settings, f, indent=2)
logger.info(f"Saved activity sync settings to {settings_file}")
except Exception as e:
logger.error(f"Failed to save activity settings: {e}")
def connect(self):
"""Connect to the activity server"""
if not all([self.server_url, self.api_key, self.user_id, self.user_name]):
logger.warning("Activity sync not configured properly")
return False
try:
# Create Socket.IO client
self.sio = socketio.Client()
# Register event handlers
@self.sio.event
def connect():
logger.info("Connected to activity server")
self.connected = True
# Fetch initial activities after connection
self._fetch_initial_activities()
@self.sio.event
def disconnect():
logger.info("Disconnected from activity server")
self.connected = False
@self.sio.event
def activities_update(data):
"""Handle activities update from server"""
logger.debug(f"Received raw activities update: {len(data)} items")
# Filter out inactive entries and ensure we only keep active ones
active_activities = [
activity for activity in data
if activity.get('isActive', False)
]
# Log details about the activities
for activity in active_activities:
logger.debug(f"Active: {activity.get('userName')} on {activity.get('projectName')}")
self.activities = active_activities
logger.info(f"Activities update: {len(active_activities)} active (of {len(data)} total)")
if self.on_activities_update:
self.on_activities_update(active_activities)
@self.sio.event
def connect_error(data):
logger.error(f"Connection error: {data}")
self.connected = False
# Connect with authentication
logger.info(f"Attempting to connect to activity server: {self.server_url}")
# Try to force polling transport if WebSockets are blocked
import os
os.environ['SOCKETIO_TRANSPORT'] = 'polling'
self.sio.connect(
self.server_url,
auth={
'token': self.api_key,
'userId': self.user_id,
'userName': self.user_name
},
wait_timeout=10,
transports=['polling', 'websocket'] # Try polling first, then websocket
)
return True
except Exception as e:
logger.error(f"Failed to connect to activity server: {e}")
self.connected = False
return False
def disconnect(self):
"""Disconnect from the activity server"""
if self.sio:
try:
self.sio.disconnect()
self.sio = None
self.connected = False
logger.info("Disconnected from activity server")
except Exception as e:
logger.error(f"Error disconnecting: {e}")
def start_activity(self, project_name: str, project_path: str, description: str = ""):
"""Start a new activity (adds to active projects list)"""
logger.debug(f"start_activity called for: {project_name}")
if not self.connected or not self.sio:
logger.warning("Not connected to activity server")
return False
try:
# Check if project is already active
existing = next((p for p in self.active_projects if p['projectName'] == project_name), None)
if existing:
logger.info(f"Project {project_name} is already active")
return True
# Add to active projects list
new_activity = {
'projectName': project_name,
'projectPath': project_path,
'userId': self.user_id,
'userName': self.user_name,
'isActive': True
}
self.active_projects.append(new_activity)
logger.debug(f"Added to active_projects: {new_activity}")
# Emit to server
self.sio.emit('activity-start', {
'projectName': project_name,
'projectPath': project_path,
'description': description
})
logger.info(f"Started activity for project: {project_name}")
# Force refresh after a short delay to ensure all clients get the update
def refresh_activities():
time.sleep(0.5) # Give server time to process
activities = self.get_activities()
if activities is not None:
self.activities = activities
if self.on_activities_update:
self.on_activities_update(activities)
logger.debug("Forced activity refresh after start")
threading.Thread(target=refresh_activities, daemon=True).start()
return True
except Exception as e:
logger.error(f"Failed to start activity: {e}")
# Remove from list on failure
self.active_projects = [p for p in self.active_projects if p['projectName'] != project_name]
return False
def stop_activity(self, project_name: str = None):
"""Stop activity for a specific project or all activities if no project specified"""
logger.debug(f"stop_activity called for project: {project_name}")
if not self.connected or not self.sio:
logger.warning("Not connected to activity server")
return False
try:
if project_name:
# Stop specific project
activity = next((p for p in self.active_projects if p['projectName'] == project_name), None)
if not activity:
logger.warning(f"Project {project_name} is not in active projects")
return False
# Remove from active projects
self.active_projects = [p for p in self.active_projects if p['projectName'] != project_name]
logger.debug(f"Removed {project_name} from active_projects")
# NEW: Emit with activityId if available
emit_data = {}
if 'id' in activity:
emit_data['activityId'] = activity['id']
elif 'activityId' in activity:
emit_data['activityId'] = activity['activityId']
else:
# Fallback to project name
emit_data['projectName'] = project_name
self.sio.emit('activity-stop', emit_data)
logger.info(f"Stopped activity for project: {project_name}")
# Force refresh after a short delay to ensure all clients get the update
def refresh_activities():
time.sleep(0.5) # Give server time to process
activities = self.get_activities()
if activities is not None:
self.activities = activities
if self.on_activities_update:
self.on_activities_update(activities)
logger.debug("Forced activity refresh after stop")
threading.Thread(target=refresh_activities, daemon=True).start()
else:
# Stop all activities (backward compatibility)
project_names = [p['projectName'] for p in self.active_projects]
self.active_projects = []
logger.debug("Cleared all active_projects")
# Emit to server (server should handle stopping all activities for this user)
self.sio.emit('activity-stop')
logger.info(f"Stopped all activities: {project_names}")
# Force refresh after a short delay to ensure all clients get the update
def refresh_activities():
time.sleep(0.5) # Give server time to process
activities = self.get_activities()
if activities is not None:
self.activities = activities
if self.on_activities_update:
self.on_activities_update(activities)
logger.debug("Forced activity refresh after stopping all")
threading.Thread(target=refresh_activities, daemon=True).start()
return True
except Exception as e:
logger.error(f"Failed to stop activity: {e}")
return False
def get_activities(self) -> List[Dict]:
"""Get current activities via REST API"""
if not self.api_key:
return []
try:
response = requests.get(
f"{self.server_url}/api/activities",
headers={"X-API-Key": self.api_key},
timeout=5
)
if response.status_code == 200:
data = response.json()
all_activities = data.get('activities', [])
# Filter to only return active activities
return [a for a in all_activities if a.get('isActive', False)]
else:
logger.error(f"Failed to fetch activities: {response.status_code}")
return []
except Exception as e:
logger.error(f"Failed to fetch activities: {e}")
return []
def is_configured(self) -> bool:
"""Check if the service is properly configured"""
return all([self.server_url, self.api_key, self.user_id, self.user_name])
def get_active_users_count(self) -> int:
"""Get count of active users"""
return len(self.activities)
def is_project_active(self, project_name: str) -> Optional[Dict]:
"""Check if a project has active users"""
for activity in self.activities:
if activity.get('projectName') == project_name and activity.get('isActive'):
return activity
return None
def get_current_activity(self, project_name: str = None) -> Optional[Dict]:
"""Get current user's activity for a specific project or first active project"""
if project_name:
# Return specific project if active
activity = next((p for p in self.active_projects if p['projectName'] == project_name), None)
logger.debug(f"get_current_activity for {project_name}, returning: {activity}")
return activity
else:
# Return first active project for backward compatibility
activity = self.active_projects[0] if self.active_projects else None
logger.debug(f"get_current_activity called, returning: {activity}")
return activity
def get_all_current_activities(self) -> List[Dict]:
"""Get all current user's active projects"""
logger.debug(f"get_all_current_activities called, returning {len(self.active_projects)} activities")
return self.active_projects.copy()
def is_project_active_for_user(self, project_name: str) -> bool:
"""Check if a specific project is active for the current user"""
return any(p['projectName'] == project_name for p in self.active_projects)
def _fetch_initial_activities(self):
"""Fetch initial activities after connection"""
try:
# Fetch user-specific activities
response = requests.get(
f"{self.server_url}/api/activities/{self.user_id}",
headers={"X-API-Key": self.api_key},
timeout=5
)
if response.status_code == 200:
data = response.json()
# NEW: Handle array format
user_activities = data.get('activities', [])
# Update our active projects list
self.active_projects = [a for a in user_activities if a.get('isActive', False)]
logger.info(f"Fetched {len(self.active_projects)} active projects for current user")
# Also fetch all activities
activities = self.get_activities() # Already filtered to only active
if activities:
self.activities = activities
if self.on_activities_update:
self.on_activities_update(activities)
logger.info(f"Fetched {len(activities)} active activities from all users")
except Exception as e:
logger.error(f"Failed to fetch initial activities: {e}")
# Global instance
activity_service = ActivitySyncService()