/** * TASKMATE - Claude Assistant Routes * ==================================== * REST-Endpunkte und Session-Manager fuer den Claude-Assistenten * * Kommuniziert ueber HTTP mit dem Claude-Proxy (SSE-Streaming) * Proxy-URL: http://172.20.0.1:3100/api/chat */ const express = require('express'); const router = express.Router(); const http = require('http'); const { getDb } = require('../database'); const logger = require('../utils/logger'); // ============================================================================ // SESSION MANAGER // ============================================================================ // Aktive Sessions: userId -> { claudeSessionId, busy, sessionId, socket, currentRequest, timeout } const activeSessions = new Map(); const SESSION_TIMEOUT_MS = 30 * 60 * 1000; // 30 Minuten /** * Erlaubte Benutzer fuer den Assistenten (lowercase) */ const ALLOWED_ASSISTANT_USERS = new Set([ 'hendrik', 'monami', 'hendrik_gebhardt@gmx.de', 'momohomma@googlemail.com' ]); function hasAssistantAccess(user) { const username = (user.username || '').toLowerCase(); const displayName = (user.displayName || '').toLowerCase(); return ALLOWED_ASSISTANT_USERS.has(username) || ALLOWED_ASSISTANT_USERS.has(displayName); } /** * Berechtigungs-Middleware: Nur Hendrik und Monami duerfen den Assistenten nutzen */ function requireAssistantAccess(req, res, next) { if (!hasAssistantAccess(req.user)) { return res.status(403).json({ error: 'Kein Zugriff auf den Assistenten' }); } next(); } /** * Socket-Berechtigung pruefen */ function checkSocketAccess(socket) { return hasAssistantAccess(socket.user); } /** * Timeout zuruecksetzen fuer eine Session */ function resetTimeout(userId) { const session = activeSessions.get(userId); if (!session) return; if (session.timeout) { clearTimeout(session.timeout); } session.timeout = setTimeout(() => { logger.info(`[Assistant] Session-Timeout fuer User ${userId} - wird beendet`); stopSession(userId); }, SESSION_TIMEOUT_MS); } /** * Proxy-URL und Token */ const PROXY_URL = 'http://172.20.0.1:3100/api/chat'; const PROXY_TOKEN = process.env.PROXY_TOKEN || ''; /** * Claude-Session aktivieren (kein Prozess - der wird erst bei sendMessage gestartet) */ function startSession(userId, sessionId, socket) { const existing = activeSessions.get(userId); // Gleiche Session: Nur Socket aktualisieren if (existing && existing.sessionId === sessionId) { existing.socket = socket; resetTimeout(userId); socket.emit('assistant:status', { sessionId, status: existing.busy ? 'thinking' : 'active' }); return; } // Andere Session: bestehende aufraemen if (existing) { if (existing.currentRequest) { try { existing.currentRequest.destroy(); } catch (e) {} } if (existing.timeout) clearTimeout(existing.timeout); } const sessionData = { claudeSessionId: null, busy: false, sessionId, socket, currentRequest: null, timeout: null }; activeSessions.set(userId, sessionData); resetTimeout(userId); logger.info(`[Assistant] Session ${sessionId} aktiviert fuer User ${userId}`); socket.emit('assistant:status', { sessionId, status: 'active' }); // TaskContext als erste Nachricht senden try { const db = getDb(); const dbSession = db.prepare('SELECT task_context FROM assistant_sessions WHERE id = ?').get(sessionId); if (dbSession && dbSession.task_context) { sendMessage(userId, dbSession.task_context, socket); } } catch (err) { logger.error(`[Assistant] TaskContext-Fehler: ${err.message}`); } } /** * Nachricht an Claude senden (HTTP-Request an Proxy mit SSE-Streaming) */ function sendMessage(userId, message, socket) { const session = activeSessions.get(userId); if (!session) throw new Error('Keine aktive Session'); if (session.busy) throw new Error('Assistent verarbeitet noch eine Nachricht'); session.busy = true; session.socket = socket; // User-Nachricht in DB speichern try { const db = getDb(); db.prepare(` INSERT INTO assistant_messages (session_id, role, content) VALUES (?, 'user', ?) `).run(session.sessionId, message); } catch (err) { logger.error(`[Assistant] DB-Fehler (user msg): ${err.message}`); } socket.emit('assistant:status', { sessionId: session.sessionId, status: 'thinking' }); logger.info(`[Assistant] Proxy-Aufruf fuer Session ${session.sessionId} (resume: ${session.claudeSessionId || 'nein'})`); // HTTP-Request an Proxy const postData = JSON.stringify({ message, resumeSessionId: session.claudeSessionId || null }); const url = new URL(PROXY_URL); const options = { hostname: url.hostname, port: url.port, path: url.pathname, method: 'POST', headers: { 'Content-Type': 'application/json', 'X-Proxy-Token': PROXY_TOKEN, 'Content-Length': Buffer.byteLength(postData) } }; const req = http.request(options, (res) => { let fullOutput = ''; let buffer = ''; let currentEvent = null; res.setEncoding('utf8'); res.on('data', (chunk) => { buffer += chunk; const lines = buffer.split('\n'); buffer = lines.pop(); // Unvollstaendige Zeile behalten for (const line of lines) { // SSE event-Zeile if (line.startsWith('event: ')) { currentEvent = line.substring(7).trim(); continue; } // SSE data-Zeile if (line.startsWith('data: ')) { const dataStr = line.substring(6); if (currentEvent === 'done') { // Done-Event: Enthaelt claudeSessionId try { const json = JSON.parse(dataStr); if (json.sessionId) { session.claudeSessionId = json.sessionId; logger.info(`[Assistant] Claude-SessionId gesetzt: ${json.sessionId}`); } } catch (e) { logger.warn(`[Assistant] Done-Event Parse-Fehler: ${e.message}`); } currentEvent = null; continue; } if (currentEvent === 'error') { // Error-Event try { const json = JSON.parse(dataStr); logger.error(`[Assistant] Proxy-Fehler: ${json.error || dataStr}`); if (socket.connected) { socket.emit('assistant:status', { sessionId: session.sessionId, status: 'error', error: json.error || 'Proxy-Fehler' }); } } catch (e) { logger.error(`[Assistant] Proxy-Fehler (raw): ${dataStr}`); } currentEvent = null; continue; } // Normales Text-Event try { const json = JSON.parse(dataStr); const text = json.text || json.content || ''; if (text) { fullOutput += text; if (socket.connected) { socket.emit('assistant:output', { sessionId: session.sessionId, content: text }); } } } catch (e) { // Kein valides JSON - Rohtext verwenden if (dataStr.trim()) { fullOutput += dataStr; if (socket.connected) { socket.emit('assistant:output', { sessionId: session.sessionId, content: dataStr }); } } } currentEvent = null; } } }); res.on('end', () => { // Restlichen Buffer verarbeiten if (buffer.trim()) { const lines = buffer.split('\n'); for (const line of lines) { if (line.startsWith('data: ')) { const dataStr = line.substring(6); try { const json = JSON.parse(dataStr); const text = json.text || json.content || ''; if (text) { fullOutput += text; if (socket.connected) { socket.emit('assistant:output', { sessionId: session.sessionId, content: text }); } } } catch (e) {} } } } // Komplette Antwort in DB speichern if (fullOutput) { try { const db = getDb(); db.prepare(` INSERT INTO assistant_messages (session_id, role, content) VALUES (?, 'assistant', ?) `).run(session.sessionId, fullOutput); } catch (err) { logger.error(`[Assistant] DB-Fehler (assistant msg): ${err.message}`); } } logger.info(`[Assistant] Proxy-Antwort abgeschlossen fuer Session ${session.sessionId}`); session.busy = false; session.currentRequest = null; resetTimeout(userId); if (socket.connected) { if (res.statusCode !== 200 && !fullOutput) { socket.emit('assistant:status', { sessionId: session.sessionId, status: 'error', error: `Proxy-Fehler (HTTP ${res.statusCode})` }); } else { socket.emit('assistant:status', { sessionId: session.sessionId, status: 'active' }); } } }); }); req.on('error', (err) => { logger.error(`[Assistant] HTTP-Fehler: ${err.message}`); session.busy = false; session.currentRequest = null; if (socket.connected) { socket.emit('assistant:status', { sessionId: session.sessionId, status: 'error', error: `Verbindung zum Assistenten fehlgeschlagen: ${err.message}` }); } }); session.currentRequest = req; req.write(postData); req.end(); } /** * Session beenden */ function stopSession(userId) { const session = activeSessions.get(userId); if (!session) return; logger.info(`[Assistant] Session ${session.sessionId} wird beendet fuer User ${userId}`); if (session.currentRequest) { try { session.currentRequest.destroy(); } catch (e) {} } if (session.timeout) clearTimeout(session.timeout); activeSessions.delete(userId); // DB aktualisieren try { const db = getDb(); db.prepare(` UPDATE assistant_sessions SET status = 'ended', ended_at = CURRENT_TIMESTAMP WHERE id = ? `).run(session.sessionId); } catch (err) { logger.error(`[Assistant] DB-Fehler beim Beenden: ${err.message}`); } } // ============================================================================ // REST ENDPOINTS // ============================================================================ // Alle Routen brauchen Assistant-Zugriff router.use(requireAssistantAccess); /** * GET /sessions - Alle Sessions des Users */ router.get('/sessions', (req, res) => { try { const db = getDb(); const sessions = db.prepare(` SELECT id, user_id, title, status, task_context, created_at, ended_at FROM assistant_sessions WHERE user_id = ? ORDER BY created_at DESC `).all(req.user.id); res.json(sessions); } catch (err) { logger.error(`[Assistant] Fehler beim Laden der Sessions: ${err.message}`); res.status(500).json({ error: 'Fehler beim Laden der Sessions' }); } }); /** * GET /sessions/:id/messages - Alle Nachrichten einer Session */ router.get('/sessions/:id/messages', (req, res) => { try { const db = getDb(); const sessionId = parseInt(req.params.id, 10); // Pruefen ob Session dem User gehoert const session = db.prepare(` SELECT id FROM assistant_sessions WHERE id = ? AND user_id = ? `).get(sessionId, req.user.id); if (!session) { return res.status(404).json({ error: 'Session nicht gefunden' }); } const messages = db.prepare(` SELECT id, session_id, role, content, created_at FROM assistant_messages WHERE session_id = ? ORDER BY created_at ASC `).all(sessionId); res.json(messages); } catch (err) { logger.error(`[Assistant] Fehler beim Laden der Nachrichten: ${err.message}`); res.status(500).json({ error: 'Fehler beim Laden der Nachrichten' }); } }); /** * POST /sessions - Neue Session erstellen */ router.post('/sessions', (req, res) => { try { const db = getDb(); const { title, taskContext } = req.body; const result = db.prepare(` INSERT INTO assistant_sessions (user_id, title, task_context) VALUES (?, ?, ?) `).run(req.user.id, title || 'Neue Session', taskContext || null); const session = db.prepare(` SELECT id, user_id, title, status, task_context, created_at, ended_at FROM assistant_sessions WHERE id = ? `).get(result.lastInsertRowid); logger.info(`[Assistant] Neue Session ${session.id} erstellt von ${req.user.username}`); res.status(201).json(session); } catch (err) { logger.error(`[Assistant] Fehler beim Erstellen der Session: ${err.message}`); res.status(500).json({ error: 'Fehler beim Erstellen der Session' }); } }); /** * DELETE /sessions/:id - Session loeschen */ router.delete('/sessions/:id', (req, res) => { try { const db = getDb(); const sessionId = parseInt(req.params.id, 10); // Pruefen ob Session dem User gehoert const session = db.prepare(` SELECT id, user_id FROM assistant_sessions WHERE id = ? AND user_id = ? `).get(sessionId, req.user.id); if (!session) { return res.status(404).json({ error: 'Session nicht gefunden' }); } // Wenn aktiver Prozess laeuft, zuerst beenden const activeSession = activeSessions.get(req.user.id); if (activeSession && activeSession.sessionId === sessionId) { stopSession(req.user.id); } // Session und zugehoerige Nachrichten loeschen (CASCADE) db.prepare('DELETE FROM assistant_sessions WHERE id = ?').run(sessionId); logger.info(`[Assistant] Session ${sessionId} geloescht von ${req.user.username}`); res.json({ success: true }); } catch (err) { logger.error(`[Assistant] Fehler beim Loeschen der Session: ${err.message}`); res.status(500).json({ error: 'Fehler beim Loeschen der Session' }); } }); // ============================================================================ // SOCKET EVENT HANDLER (exportiert fuer server.js) // ============================================================================ /** * Socket-Events registrieren */ function registerSocketEvents(socket) { const userId = socket.user.id; // assistant:start - Session aktivieren socket.on('assistant:start', (data) => { try { if (!checkSocketAccess(socket)) { socket.emit('assistant:status', { status: 'error', error: 'Kein Zugriff' }); return; } const sessionId = data && data.sessionId; if (!sessionId) { socket.emit('assistant:status', { status: 'error', error: 'Keine Session-ID angegeben' }); return; } // Pruefen ob Session existiert und dem User gehoert const db = getDb(); const session = db.prepare(` SELECT id, user_id, status FROM assistant_sessions WHERE id = ? AND user_id = ? `).get(sessionId, userId); if (!session) { socket.emit('assistant:status', { status: 'error', error: 'Session nicht gefunden' }); return; } // Beendete Session reaktivieren if (session.status === 'ended') { db.prepare(`UPDATE assistant_sessions SET status = 'active', ended_at = NULL WHERE id = ?`).run(sessionId); } startSession(userId, sessionId, socket); } catch (err) { logger.error(`[Assistant] Fehler beim Starten: ${err.message}`); socket.emit('assistant:status', { status: 'error', error: err.message }); } }); // assistant:message - Nachricht senden socket.on('assistant:message', (data) => { try { if (!checkSocketAccess(socket)) { socket.emit('assistant:status', { status: 'error', error: 'Kein Zugriff' }); return; } const message = data && data.message; if (!message || typeof message !== 'string') { socket.emit('assistant:status', { status: 'error', error: 'Keine Nachricht angegeben' }); return; } sendMessage(userId, message, socket); } catch (err) { logger.error(`[Assistant] Fehler beim Senden: ${err.message}`); socket.emit('assistant:status', { status: 'error', error: err.message }); } }); // assistant:stop - Session beenden socket.on('assistant:stop', () => { try { if (!checkSocketAccess(socket)) { socket.emit('assistant:status', { status: 'error', error: 'Kein Zugriff' }); return; } stopSession(userId); socket.emit('assistant:status', { status: 'stopped' }); } catch (err) { logger.error(`[Assistant] Fehler beim Stoppen: ${err.message}`); socket.emit('assistant:status', { status: 'error', error: err.message }); } }); // Bei Disconnect: aktive Session beenden socket.on('disconnect', () => { if (activeSessions.has(userId)) { logger.info(`[Assistant] Socket disconnect - Session wird beendet fuer User ${userId}`); stopSession(userId); } }); } module.exports = router; module.exports.registerSocketEvents = registerSocketEvents; module.exports.stopSession = stopSession;