import { NextRequest } from 'next/server';
import { createSupervisorClient } from '@/lib/supervisor/client';
import { createApiLogger } from '@/lib/utils/api-logger';
import { formatError, generateRequestId } from '@/lib/utils/logger';

export const dynamic = 'force-dynamic';

/** Delay between two supervisor polls, in milliseconds. */
const POLL_INTERVAL_MS = 2000;

/** Only every Nth heartbeat is written to the debug log to keep logs quiet. */
const HEARTBEAT_LOG_EVERY = 10;

/**
 * Extracts a non-empty string `message` from a thrown value, or returns
 * `fallback` when the value has none (including `null`/`undefined` throws).
 */
function getErrorMessage(error: unknown, fallback: string): string {
  if (typeof error === 'object' && error !== null) {
    const { message } = error as { message?: unknown };
    if (typeof message === 'string' && message !== '') {
      return message;
    }
  }
  return fallback;
}

/**
 * Server-Sent Events endpoint for real-time process state updates.
 *
 * Polls the supervisor every {@link POLL_INTERVAL_MS} ms and streams these
 * events to the client:
 * - `connected`      — once, right after the stream starts
 * - `process-update` — whenever the process snapshot differs from the last one
 * - `heartbeat`      — after every successful poll
 * - `error`          — when a poll fails
 *
 * @param request - Incoming request; used to build the request-scoped logger.
 * @returns A streaming `text/event-stream` response.
 */
export async function GET(request: NextRequest): Promise<Response> {
  const requestId = generateRequestId();
  const logger = createApiLogger(request, 'SSE-Events');
  const encoder = new TextEncoder();
  const connectionStartTime = Date.now();

  let intervalId: NodeJS.Timeout | null = null;
  let previousState: string | null = null;
  let pollCount = 0;
  let stateChangeCount = 0;
  // Set once the consumer is gone; guards every late enqueue/timer setup.
  let closed = false;
  // Prevents a slow supervisor call from overlapping with the next tick.
  let pollInFlight = false;

  // Idempotent teardown shared by cancel() and failed enqueues.
  const stopPolling = (): void => {
    closed = true;
    if (intervalId !== null) {
      clearInterval(intervalId);
      intervalId = null;
    }
  };

  logger.info({ requestId }, 'SSE connection initiated');

  const stream = new ReadableStream<Uint8Array>({
    async start(controller) {
      // Helper to send one SSE message; becomes a no-op once the stream is closed.
      const sendEvent = (event: string, data: unknown): void => {
        if (closed) {
          return;
        }
        // Serialise outside the try so JSON errors still surface to the caller.
        const message = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
        try {
          controller.enqueue(encoder.encode(message));
        } catch {
          // enqueue throws when the stream was closed/errored underneath us.
          stopPolling();
        }
      };

      // Send initial connection message
      sendEvent('connected', { timestamp: Date.now() });
      logger.debug({ requestId }, 'SSE connected event sent');

      // Poll supervisor for state changes
      const pollSupervisor = async (): Promise<void> => {
        if (closed || pollInFlight) {
          return;
        }
        pollInFlight = true;
        pollCount++;
        const pollStartTime = Date.now();

        try {
          const client = createSupervisorClient();
          const processes = await client.getAllProcessInfo();

          // The client may have disconnected while we were waiting.
          if (closed) {
            return;
          }

          // Create state snapshot of the fields that define a "change"
          const currentState = JSON.stringify(
            processes.map((p) => ({
              name: `${p.group}:${p.name}`,
              state: p.state,
              pid: p.pid,
              statename: p.statename,
            }))
          );

          // Send update if state changed
          if (currentState !== previousState) {
            stateChangeCount++;
            sendEvent('process-update', {
              processes,
              timestamp: Date.now(),
            });
            logger.info(
              {
                requestId,
                pollCount,
                stateChangeCount,
                processCount: processes.length,
                duration: Date.now() - pollStartTime,
              },
              `Process state change detected (change #${stateChangeCount})`
            );
            previousState = currentState;
          }

          // Send heartbeat every poll (reduced logging - only log every 10th heartbeat)
          sendEvent('heartbeat', { timestamp: Date.now() });
          if (pollCount % HEARTBEAT_LOG_EVERY === 0) {
            logger.debug(
              {
                requestId,
                pollCount,
                stateChangeCount,
                duration: Date.now() - pollStartTime,
              },
              `SSE heartbeat #${pollCount}`
            );
          }
        } catch (error: unknown) {
          const errorInfo = formatError(error);
          logger.error(
            {
              requestId,
              pollCount,
              error: errorInfo,
              duration: Date.now() - pollStartTime,
            },
            `SSE polling error: ${errorInfo.message}`
          );
          sendEvent('error', {
            message: getErrorMessage(error, 'Failed to fetch process state'),
            timestamp: Date.now(),
          });
        } finally {
          pollInFlight = false;
        }
      };

      // Initial poll
      await pollSupervisor();

      // cancel() may have run during the initial poll; do not start a timer
      // that nothing would ever clear.
      if (closed) {
        return;
      }

      // Poll every 2 seconds
      intervalId = setInterval(() => {
        void pollSupervisor();
      }, POLL_INTERVAL_MS);
    },

    cancel() {
      stopPolling();
      logger.info(
        {
          requestId,
          pollCount,
          stateChangeCount,
          // Total lifetime of the connection, not of this callback.
          duration: Date.now() - connectionStartTime,
        },
        'SSE connection closed'
      );
    },
  });

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache, no-transform',
      'Connection': 'keep-alive',
      'X-Accel-Buffering': 'no', // Disable nginx buffering
      'X-Request-ID': requestId,
    },
  });
}