""" Process Manager Module Tracks and manages running Claude processes """ import subprocess import os import json from typing import Dict, Optional from datetime import datetime from utils.logger import logger class ProcessManager: def __init__(self): self.processes: Dict[str, subprocess.Popen] = {} self.process_data_file = os.path.join(os.path.dirname(__file__), "data/running_processes.json") logger.info("Initializing ProcessManager") self.load_process_data() def load_process_data(self): """Load process data from file""" try: if os.path.exists(self.process_data_file): with open(self.process_data_file, 'r') as f: data = json.load(f) # Clean up old data (processes that are no longer running) self.clean_process_data(data) except Exception as e: logger.error(f"Error loading process data: {e}") print(f"Error loading process data: {e}") def clean_process_data(self, data: dict): """Clean up process data for processes that are no longer running""" # For now, we start fresh each time the app starts # In a real app, we might check if processes are still running self.save_process_data({}) def save_process_data(self, data: dict = None): """Save process data to file""" if data is None: data = { project_id: { "pid": proc.pid if proc else None, "start_time": datetime.now().isoformat(), "status": "running" if proc and proc.poll() is None else "stopped" } for project_id, proc in self.processes.items() } os.makedirs(os.path.dirname(self.process_data_file), exist_ok=True) try: with open(self.process_data_file, 'w') as f: json.dump(data, f, indent=2) except Exception as e: print(f"Error saving process data: {e}") def start_process(self, project_id: str, command: list, shell: bool = False) -> subprocess.Popen: """Start a new process for a project""" logger.info(f"Starting process for project: {project_id}") logger.debug(f"Command: {' '.join(command if isinstance(command, list) else [command])}") # Stop existing process if any if project_id in self.processes: logger.warning(f"Stopping existing process for project: {project_id}") self.stop_process(project_id) # Start new process process = subprocess.Popen(command, shell=shell) self.processes[project_id] = process logger.info(f"Process started with PID: {process.pid}") self.save_process_data() return process def stop_process(self, project_id: str, project_name: str = None) -> bool: """Stop a process for a project""" logger.info(f"Stopping process for project: {project_id} (name: {project_name})") try: if project_id == "vps-permanent": # Kill VPS windows cmd_kill_cmd = 'taskkill /fi "WINDOWTITLE eq Claude VPS Server" /f >nul 2>&1' logger.debug(f"Executing VPS kill command: {cmd_kill_cmd}") os.system(cmd_kill_cmd) else: # Kill project-specific window if project_name: window_title = f"Claude - {project_name}" # Use powershell for more accurate window title matching ps_cmd = f'powershell -Command "Get-Process | Where-Object {{$_.MainWindowTitle -eq \'{window_title}\'}} | Stop-Process -Force"' logger.debug(f"Executing PowerShell kill command: {ps_cmd}") os.system(ps_cmd) # Also try taskkill as fallback cmd_kill_cmd = f'taskkill /fi "WINDOWTITLE eq {window_title}" /f >nul 2>&1' logger.debug(f"Executing taskkill command: {cmd_kill_cmd}") os.system(cmd_kill_cmd) # Clean up tracked process if exists if project_id in self.processes: process = self.processes[project_id] if process and process.poll() is None: try: process.terminate() process.wait(timeout=2) except: try: process.kill() except: pass del self.processes[project_id] self.save_process_data() logger.info(f"Process stopped successfully for project: {project_id}") return True except Exception as e: logger.error(f"Error stopping process: {e}") print(f"Error stopping process: {e}") return False def check_window_exists(self, window_title: str) -> bool: """Check if a window with the given title exists""" # This method is not used anymore - checking is done in ProjectProcessTracker return False def is_running(self, project_id: str) -> bool: """Check if a process is running for a project""" # Don't rely on the launcher process, check for actual window return False # Will be overridden by is_project_running def stop_all_processes(self): """Stop all running processes""" project_ids = list(self.processes.keys()) for project_id in project_ids: self.stop_process(project_id) def get_process_info(self, project_id: str) -> Optional[dict]: """Get process information for a project""" if project_id in self.processes: process = self.processes[project_id] return { "pid": process.pid if process else None, "running": self.is_running(project_id) } return None