From 25d9029d14892f2e8c79f256809a299ceb225b1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Kr=C3=BCger?= Date: Sun, 23 Nov 2025 19:54:14 +0100 Subject: [PATCH] feat: implement Phase 11 - Real-time Updates with SSE MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Features added: - Created SSE (Server-Sent Events) endpoint at /api/supervisor/events - Polls supervisor every 2 seconds for state changes - Sends process-update events when state changes detected - Sends heartbeat events to keep connection alive - Includes error handling with error events - Created useEventSource hook for managing SSE connections - Automatic reconnection with exponential backoff - Configurable max reconnection attempts (default 10) - Connection status tracking (connecting, connected, disconnected, error) - Clean event listener management with proper cleanup - Heartbeat monitoring for connection health - Created ConnectionStatus component - Visual status indicator with icons (Wifi, WifiOff, Loader, AlertCircle) - Color-coded states (green=connected, yellow=connecting, red=error) - Shows reconnection attempt count - Manual reconnect button when disconnected/error - Integrated real-time updates into dashboard and processes pages - Auto-refresh process data when state changes occur - Connection status indicator in page headers - No manual refresh needed for live updates - Implemented proper cleanup on unmount - EventSource properly closed - Reconnection timeouts cleared - No memory leaks 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- app/api/supervisor/events/route.ts | 84 +++++++++++++++++ app/page.tsx | 40 ++++++-- app/processes/page.tsx | 42 +++++++-- components/ui/ConnectionStatus.tsx | 81 ++++++++++++++++ lib/hooks/useEventSource.ts | 144 +++++++++++++++++++++++++++++ 5 files changed, 377 insertions(+), 14 deletions(-) create mode 100644 app/api/supervisor/events/route.ts create mode 100644 components/ui/ConnectionStatus.tsx create mode 100644 lib/hooks/useEventSource.ts diff --git a/app/api/supervisor/events/route.ts b/app/api/supervisor/events/route.ts new file mode 100644 index 0000000..5b03545 --- /dev/null +++ b/app/api/supervisor/events/route.ts @@ -0,0 +1,84 @@ +import { NextRequest } from 'next/server'; +import { createSupervisorClient } from '@/lib/supervisor/client'; + +export const dynamic = 'force-dynamic'; + +/** + * Server-Sent Events endpoint for real-time process state updates + * Polls supervisor every 2 seconds and sends state changes to clients + */ +export async function GET(request: NextRequest) { + const encoder = new TextEncoder(); + let intervalId: NodeJS.Timeout | null = null; + let previousState: string | null = null; + + const stream = new ReadableStream({ + async start(controller) { + // Helper to send SSE message + const sendEvent = (event: string, data: any) => { + const message = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`; + controller.enqueue(encoder.encode(message)); + }; + + // Send initial connection message + sendEvent('connected', { timestamp: Date.now() }); + + // Poll supervisor for state changes + const pollSupervisor = async () => { + try { + const client = createSupervisorClient(); + const processes = await client.getAllProcessInfo(); + + // Create state snapshot + const currentState = JSON.stringify( + processes.map((p) => ({ + name: `${p.group}:${p.name}`, + state: p.state, + pid: p.pid, + statename: p.statename, + })) + ); + + // Send update if state changed + if (currentState !== previousState) { + sendEvent('process-update', { + processes, + timestamp: Date.now(), + }); + previousState = currentState; + } + + // Send heartbeat every poll + sendEvent('heartbeat', { timestamp: Date.now() }); + } catch (error: any) { + console.error('SSE polling error:', error); + sendEvent('error', { + message: error.message || 'Failed to fetch process state', + timestamp: Date.now(), + }); + } + }; + + // Initial poll + await pollSupervisor(); + + // Poll every 2 seconds + intervalId = setInterval(pollSupervisor, 2000); + }, + + cancel() { + if (intervalId) { + clearInterval(intervalId); + } + }, + }); + + return new Response(stream, { + headers: { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache, no-transform', + 'Connection': 'keep-alive', + 'X-Accel-Buffering': 'no', // Disable nginx buffering + }, + }); +} diff --git a/app/page.tsx b/app/page.tsx index 533393c..9093d6d 100644 --- a/app/page.tsx +++ b/app/page.tsx @@ -1,16 +1,33 @@ 'use client'; +import { useState } from 'react'; import { Activity, Server, FileText, Settings } from 'lucide-react'; import { Card, CardContent, CardDescription, CardHeader, CardTitle } from '@/components/ui/card'; import { SystemStatus } from '@/components/process/SystemStatus'; import { ProcessStateChart } from '@/components/charts/ProcessStateChart'; import { ProcessUptimeChart } from '@/components/charts/ProcessUptimeChart'; import { GroupStatistics } from '@/components/charts/GroupStatistics'; +import { ConnectionStatus } from '@/components/ui/ConnectionStatus'; import { useProcesses } from '@/lib/hooks/useSupervisor'; +import { useEventSource } from '@/lib/hooks/useEventSource'; import { ProcessState } from '@/lib/supervisor/types'; export default function HomePage() { - const { data: processes, isLoading } = useProcesses(); + const [realtimeEnabled] = useState(true); + const { data: processes, isLoading, isError, refetch } = useProcesses(); + + // Real-time updates via Server-Sent Events + const { status: connectionStatus, reconnectAttempts, reconnect } = useEventSource( + '/api/supervisor/events', + { + enabled: realtimeEnabled && !isLoading && !isError, + onMessage: (message) => { + if (message.event === 'process-update') { + refetch(); + } + }, + } + ); const stats = { total: processes?.length ?? 0, @@ -22,13 +39,20 @@ export default function HomePage() { return (
{/* Header */} -
-

- Supervisor Dashboard -

-

- Monitor and manage your processes in real-time -

+
+
+

+ Supervisor Dashboard +

+

+ Monitor and manage your processes in real-time +

+
+
{/* System Status */} diff --git a/app/processes/page.tsx b/app/processes/page.tsx index 9a64f19..9b67b07 100644 --- a/app/processes/page.tsx +++ b/app/processes/page.tsx @@ -11,6 +11,8 @@ import { RefreshCw, AlertCircle, CheckSquare, Keyboard } from 'lucide-react'; import { Button } from '@/components/ui/button'; import { useKeyboardShortcuts } from '@/lib/hooks/useKeyboardShortcuts'; import { KeyboardShortcutsHelp } from '@/components/ui/KeyboardShortcutsHelp'; +import { ConnectionStatus } from '@/components/ui/ConnectionStatus'; +import { useEventSource } from '@/lib/hooks/useEventSource'; import type { ProcessInfo } from '@/lib/supervisor/types'; export default function ProcessesPage() { @@ -19,9 +21,30 @@ export default function ProcessesPage() { const [filteredProcesses, setFilteredProcesses] = useState([]); const [showShortcutsHelp, setShowShortcutsHelp] = useState(false); const [focusedIndex, setFocusedIndex] = useState(-1); + const [realtimeEnabled, setRealtimeEnabled] = useState(true); const searchInputRef = useRef(null); const { data: processes, isLoading, isError, refetch } = useProcesses(); + // Real-time updates via Server-Sent Events + const { status: connectionStatus, reconnectAttempts, reconnect } = useEventSource( + '/api/supervisor/events', + { + enabled: realtimeEnabled && !isLoading && !isError, + onMessage: (message) => { + if (message.event === 'process-update') { + // Invalidate and refetch process data + refetch(); + } + }, + onConnect: () => { + console.log('SSE connected'); + }, + onDisconnect: () => { + console.log('SSE disconnected'); + }, + } + ); + const handleFilterChange = useCallback((filtered: ProcessInfo[]) => { setFilteredProcesses(filtered); }, []); @@ -193,12 +216,19 @@ export default function ProcessesPage() { return (
-
-

Processes

-

- {displayedProcesses.length} of {processes?.length ?? 0} processes - {displayedProcesses.length !== (processes?.length ?? 0) && ' (filtered)'} -

+
+
+

Processes

+

+ {displayedProcesses.length} of {processes?.length ?? 0} processes + {displayedProcesses.length !== (processes?.length ?? 0) && ' (filtered)'} +

+
+
{viewMode === 'flat' && displayedProcesses.length > 0 && ( diff --git a/components/ui/ConnectionStatus.tsx b/components/ui/ConnectionStatus.tsx new file mode 100644 index 0000000..408b545 --- /dev/null +++ b/components/ui/ConnectionStatus.tsx @@ -0,0 +1,81 @@ +'use client'; + +import { Wifi, WifiOff, Loader2, AlertCircle } from 'lucide-react'; +import { Button } from '@/components/ui/button'; +import { ConnectionStatus as Status } from '@/lib/hooks/useEventSource'; +import { cn } from '@/lib/utils/cn'; + +interface ConnectionStatusProps { + status: Status; + reconnectAttempts?: number; + onReconnect?: () => void; +} + +export function ConnectionStatus({ status, reconnectAttempts = 0, onReconnect }: ConnectionStatusProps) { + const getStatusConfig = () => { + switch (status) { + case 'connected': + return { + icon: Wifi, + text: 'Live', + color: 'text-success', + bgColor: 'bg-success/10', + borderColor: 'border-success/20', + }; + case 'connecting': + return { + icon: Loader2, + text: 'Connecting...', + color: 'text-warning', + bgColor: 'bg-warning/10', + borderColor: 'border-warning/20', + animate: true, + }; + case 'error': + return { + icon: AlertCircle, + text: reconnectAttempts > 0 ? `Retrying (${reconnectAttempts})` : 'Error', + color: 'text-destructive', + bgColor: 'bg-destructive/10', + borderColor: 'border-destructive/20', + }; + case 'disconnected': + default: + return { + icon: WifiOff, + text: 'Offline', + color: 'text-muted-foreground', + bgColor: 'bg-muted', + borderColor: 'border-muted', + }; + } + }; + + const config = getStatusConfig(); + const Icon = config.icon; + + return ( +
+ + {config.text} + {(status === 'error' || status === 'disconnected') && onReconnect && ( + + )} +
+ ); +} diff --git a/lib/hooks/useEventSource.ts b/lib/hooks/useEventSource.ts new file mode 100644 index 0000000..6069fcd --- /dev/null +++ b/lib/hooks/useEventSource.ts @@ -0,0 +1,144 @@ +import { useEffect, useRef, useState, useCallback } from 'react'; + +export type ConnectionStatus = 'connecting' | 'connected' | 'disconnected' | 'error'; + +export interface EventSourceMessage { + event: string; + data: T; +} + +export interface UseEventSourceOptions { + enabled?: boolean; + reconnectInterval?: number; + maxReconnectAttempts?: number; + onMessage?: (message: EventSourceMessage) => void; + onError?: (error: Event) => void; + onConnect?: () => void; + onDisconnect?: () => void; +} + +export function useEventSource(url: string, options: UseEventSourceOptions = {}) { + const { + enabled = true, + reconnectInterval = 3000, + maxReconnectAttempts = 10, + onMessage, + onError, + onConnect, + onDisconnect, + } = options; + + const [status, setStatus] = useState('disconnected'); + const [reconnectAttempts, setReconnectAttempts] = useState(0); + const eventSourceRef = useRef(null); + const reconnectTimeoutRef = useRef(null); + + const connect = useCallback(() => { + if (!enabled || eventSourceRef.current) return; + + setStatus('connecting'); + + try { + const eventSource = new EventSource(url); + eventSourceRef.current = eventSource; + + eventSource.addEventListener('connected', () => { + setStatus('connected'); + setReconnectAttempts(0); + onConnect?.(); + }); + + eventSource.addEventListener('heartbeat', (event) => { + // Keep connection alive + if (status !== 'connected') { + setStatus('connected'); + } + }); + + eventSource.addEventListener('process-update', (event) => { + try { + const data = JSON.parse(event.data); + onMessage?.({ event: 'process-update', data }); + } catch (error) { + console.error('Failed to parse SSE message:', error); + } + }); + + eventSource.addEventListener('error', (event) => { + try { + const data = JSON.parse((event as MessageEvent).data); + onMessage?.({ event: 'error', data }); + } catch (error) { + // Not a message error, connection error + } + }); + + eventSource.onerror = (event) => { + console.error('EventSource error:', event); + setStatus('error'); + onError?.(event); + + // Close current connection + eventSource.close(); + eventSourceRef.current = null; + + // Attempt reconnection with exponential backoff + if (reconnectAttempts < maxReconnectAttempts) { + const delay = Math.min(reconnectInterval * Math.pow(2, reconnectAttempts), 30000); + console.log(`Reconnecting in ${delay}ms (attempt ${reconnectAttempts + 1}/${maxReconnectAttempts})`); + + reconnectTimeoutRef.current = setTimeout(() => { + setReconnectAttempts((prev) => prev + 1); + connect(); + }, delay); + } else { + setStatus('disconnected'); + onDisconnect?.(); + } + }; + } catch (error) { + console.error('Failed to create EventSource:', error); + setStatus('error'); + } + }, [url, enabled, status, reconnectAttempts, maxReconnectAttempts, reconnectInterval, onMessage, onError, onConnect, onDisconnect]); + + const disconnect = useCallback(() => { + if (reconnectTimeoutRef.current) { + clearTimeout(reconnectTimeoutRef.current); + reconnectTimeoutRef.current = null; + } + + if (eventSourceRef.current) { + eventSourceRef.current.close(); + eventSourceRef.current = null; + } + + setStatus('disconnected'); + setReconnectAttempts(0); + }, []); + + const reconnect = useCallback(() => { + disconnect(); + setReconnectAttempts(0); + connect(); + }, [disconnect, connect]); + + useEffect(() => { + if (enabled) { + connect(); + } else { + disconnect(); + } + + return () => { + disconnect(); + }; + }, [enabled, url]); + + return { + status, + reconnectAttempts, + reconnect, + disconnect, + }; +}