diff --git a/app/api/supervisor/events/route.ts b/app/api/supervisor/events/route.ts
new file mode 100644
index 0000000..5b03545
--- /dev/null
+++ b/app/api/supervisor/events/route.ts
@@ -0,0 +1,84 @@
+import { NextRequest } from 'next/server';
+import { createSupervisorClient } from '@/lib/supervisor/client';
+
+export const dynamic = 'force-dynamic';
+
+/**
+ * Server-Sent Events endpoint for real-time process state updates
+ * Polls supervisor every 2 seconds and sends state changes to clients
+ */
+export async function GET(request: NextRequest) {
+ const encoder = new TextEncoder();
+ let intervalId: NodeJS.Timeout | null = null;
+ let previousState: string | null = null;
+
+ const stream = new ReadableStream({
+ async start(controller) {
+ // Helper to send SSE message
+ const sendEvent = (event: string, data: any) => {
+ const message = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
+ controller.enqueue(encoder.encode(message));
+ };
+
+ // Send initial connection message
+ sendEvent('connected', { timestamp: Date.now() });
+
+ // Poll supervisor for state changes
+ const pollSupervisor = async () => {
+ try {
+ const client = createSupervisorClient();
+ const processes = await client.getAllProcessInfo();
+
+ // Create state snapshot
+ const currentState = JSON.stringify(
+ processes.map((p) => ({
+ name: `${p.group}:${p.name}`,
+ state: p.state,
+ pid: p.pid,
+ statename: p.statename,
+ }))
+ );
+
+ // Send update if state changed
+ if (currentState !== previousState) {
+ sendEvent('process-update', {
+ processes,
+ timestamp: Date.now(),
+ });
+ previousState = currentState;
+ }
+
+ // Send heartbeat every poll
+ sendEvent('heartbeat', { timestamp: Date.now() });
+ } catch (error: any) {
+ console.error('SSE polling error:', error);
+ sendEvent('error', {
+ message: error.message || 'Failed to fetch process state',
+ timestamp: Date.now(),
+ });
+ }
+ };
+
+ // Initial poll
+ await pollSupervisor();
+
+ // Poll every 2 seconds
+ intervalId = setInterval(pollSupervisor, 2000);
+ },
+
+ cancel() {
+ if (intervalId) {
+ clearInterval(intervalId);
+ }
+ },
+ });
+
+ return new Response(stream, {
+ headers: {
+ 'Content-Type': 'text/event-stream',
+ 'Cache-Control': 'no-cache, no-transform',
+ 'Connection': 'keep-alive',
+ 'X-Accel-Buffering': 'no', // Disable nginx buffering
+ },
+ });
+}
diff --git a/app/page.tsx b/app/page.tsx
index 533393c..9093d6d 100644
--- a/app/page.tsx
+++ b/app/page.tsx
@@ -1,16 +1,33 @@
'use client';
+import { useState } from 'react';
import { Activity, Server, FileText, Settings } from 'lucide-react';
import { Card, CardContent, CardDescription, CardHeader, CardTitle } from '@/components/ui/card';
import { SystemStatus } from '@/components/process/SystemStatus';
import { ProcessStateChart } from '@/components/charts/ProcessStateChart';
import { ProcessUptimeChart } from '@/components/charts/ProcessUptimeChart';
import { GroupStatistics } from '@/components/charts/GroupStatistics';
+import { ConnectionStatus } from '@/components/ui/ConnectionStatus';
import { useProcesses } from '@/lib/hooks/useSupervisor';
+import { useEventSource } from '@/lib/hooks/useEventSource';
import { ProcessState } from '@/lib/supervisor/types';
export default function HomePage() {
- const { data: processes, isLoading } = useProcesses();
+ const [realtimeEnabled] = useState(true);
+ const { data: processes, isLoading, isError, refetch } = useProcesses();
+
+ // Real-time updates via Server-Sent Events
+ const { status: connectionStatus, reconnectAttempts, reconnect } = useEventSource(
+ '/api/supervisor/events',
+ {
+ enabled: realtimeEnabled && !isLoading && !isError,
+ onMessage: (message) => {
+ if (message.event === 'process-update') {
+ refetch();
+ }
+ },
+ }
+ );
const stats = {
total: processes?.length ?? 0,
@@ -22,13 +39,20 @@ export default function HomePage() {
return (
{/* Header */}
-
-
- Supervisor Dashboard
-
-
- Monitor and manage your processes in real-time
-
+
+
+
+ Supervisor Dashboard
+
+
+ Monitor and manage your processes in real-time
+
+
+
{/* System Status */}
diff --git a/app/processes/page.tsx b/app/processes/page.tsx
index 9a64f19..9b67b07 100644
--- a/app/processes/page.tsx
+++ b/app/processes/page.tsx
@@ -11,6 +11,8 @@ import { RefreshCw, AlertCircle, CheckSquare, Keyboard } from 'lucide-react';
import { Button } from '@/components/ui/button';
import { useKeyboardShortcuts } from '@/lib/hooks/useKeyboardShortcuts';
import { KeyboardShortcutsHelp } from '@/components/ui/KeyboardShortcutsHelp';
+import { ConnectionStatus } from '@/components/ui/ConnectionStatus';
+import { useEventSource } from '@/lib/hooks/useEventSource';
import type { ProcessInfo } from '@/lib/supervisor/types';
export default function ProcessesPage() {
@@ -19,9 +21,30 @@ export default function ProcessesPage() {
const [filteredProcesses, setFilteredProcesses] = useState
([]);
const [showShortcutsHelp, setShowShortcutsHelp] = useState(false);
const [focusedIndex, setFocusedIndex] = useState(-1);
+ const [realtimeEnabled, setRealtimeEnabled] = useState(true);
const searchInputRef = useRef(null);
const { data: processes, isLoading, isError, refetch } = useProcesses();
+ // Real-time updates via Server-Sent Events
+ const { status: connectionStatus, reconnectAttempts, reconnect } = useEventSource(
+ '/api/supervisor/events',
+ {
+ enabled: realtimeEnabled && !isLoading && !isError,
+ onMessage: (message) => {
+ if (message.event === 'process-update') {
+ // Invalidate and refetch process data
+ refetch();
+ }
+ },
+ onConnect: () => {
+ console.log('SSE connected');
+ },
+ onDisconnect: () => {
+ console.log('SSE disconnected');
+ },
+ }
+ );
+
const handleFilterChange = useCallback((filtered: ProcessInfo[]) => {
setFilteredProcesses(filtered);
}, []);
@@ -193,12 +216,19 @@ export default function ProcessesPage() {
return (
-
-
Processes
-
- {displayedProcesses.length} of {processes?.length ?? 0} processes
- {displayedProcesses.length !== (processes?.length ?? 0) && ' (filtered)'}
-
+
+
+
Processes
+
+ {displayedProcesses.length} of {processes?.length ?? 0} processes
+ {displayedProcesses.length !== (processes?.length ?? 0) && ' (filtered)'}
+
+
+
{viewMode === 'flat' && displayedProcesses.length > 0 && (
diff --git a/components/ui/ConnectionStatus.tsx b/components/ui/ConnectionStatus.tsx
new file mode 100644
index 0000000..408b545
--- /dev/null
+++ b/components/ui/ConnectionStatus.tsx
@@ -0,0 +1,81 @@
+'use client';
+
+import { Wifi, WifiOff, Loader2, AlertCircle } from 'lucide-react';
+import { Button } from '@/components/ui/button';
+import { ConnectionStatus as Status } from '@/lib/hooks/useEventSource';
+import { cn } from '@/lib/utils/cn';
+
+interface ConnectionStatusProps {
+ status: Status;
+ reconnectAttempts?: number;
+ onReconnect?: () => void;
+}
+
+export function ConnectionStatus({ status, reconnectAttempts = 0, onReconnect }: ConnectionStatusProps) {
+ const getStatusConfig = () => {
+ switch (status) {
+ case 'connected':
+ return {
+ icon: Wifi,
+ text: 'Live',
+ color: 'text-success',
+ bgColor: 'bg-success/10',
+ borderColor: 'border-success/20',
+ };
+ case 'connecting':
+ return {
+ icon: Loader2,
+ text: 'Connecting...',
+ color: 'text-warning',
+ bgColor: 'bg-warning/10',
+ borderColor: 'border-warning/20',
+ animate: true,
+ };
+ case 'error':
+ return {
+ icon: AlertCircle,
+ text: reconnectAttempts > 0 ? `Retrying (${reconnectAttempts})` : 'Error',
+ color: 'text-destructive',
+ bgColor: 'bg-destructive/10',
+ borderColor: 'border-destructive/20',
+ };
+ case 'disconnected':
+ default:
+ return {
+ icon: WifiOff,
+ text: 'Offline',
+ color: 'text-muted-foreground',
+ bgColor: 'bg-muted',
+ borderColor: 'border-muted',
+ };
+ }
+ };
+
+ const config = getStatusConfig();
+ const Icon = config.icon;
+
+ return (
+
+
+ {config.text}
+ {(status === 'error' || status === 'disconnected') && onReconnect && (
+
+ )}
+
+ );
+}
diff --git a/lib/hooks/useEventSource.ts b/lib/hooks/useEventSource.ts
new file mode 100644
index 0000000..6069fcd
--- /dev/null
+++ b/lib/hooks/useEventSource.ts
@@ -0,0 +1,144 @@
+import { useEffect, useRef, useState, useCallback } from 'react';
+
+export type ConnectionStatus = 'connecting' | 'connected' | 'disconnected' | 'error';
+
+export interface EventSourceMessage
{
+ event: string;
+ data: T;
+}
+
+export interface UseEventSourceOptions {
+ enabled?: boolean;
+ reconnectInterval?: number;
+ maxReconnectAttempts?: number;
+ onMessage?: (message: EventSourceMessage) => void;
+ onError?: (error: Event) => void;
+ onConnect?: () => void;
+ onDisconnect?: () => void;
+}
+
+export function useEventSource(url: string, options: UseEventSourceOptions = {}) {
+ const {
+ enabled = true,
+ reconnectInterval = 3000,
+ maxReconnectAttempts = 10,
+ onMessage,
+ onError,
+ onConnect,
+ onDisconnect,
+ } = options;
+
+ const [status, setStatus] = useState('disconnected');
+ const [reconnectAttempts, setReconnectAttempts] = useState(0);
+ const eventSourceRef = useRef(null);
+ const reconnectTimeoutRef = useRef(null);
+
+ const connect = useCallback(() => {
+ if (!enabled || eventSourceRef.current) return;
+
+ setStatus('connecting');
+
+ try {
+ const eventSource = new EventSource(url);
+ eventSourceRef.current = eventSource;
+
+ eventSource.addEventListener('connected', () => {
+ setStatus('connected');
+ setReconnectAttempts(0);
+ onConnect?.();
+ });
+
+ eventSource.addEventListener('heartbeat', (event) => {
+ // Keep connection alive
+ if (status !== 'connected') {
+ setStatus('connected');
+ }
+ });
+
+ eventSource.addEventListener('process-update', (event) => {
+ try {
+ const data = JSON.parse(event.data);
+ onMessage?.({ event: 'process-update', data });
+ } catch (error) {
+ console.error('Failed to parse SSE message:', error);
+ }
+ });
+
+ eventSource.addEventListener('error', (event) => {
+ try {
+ const data = JSON.parse((event as MessageEvent).data);
+ onMessage?.({ event: 'error', data });
+ } catch (error) {
+ // Not a message error, connection error
+ }
+ });
+
+ eventSource.onerror = (event) => {
+ console.error('EventSource error:', event);
+ setStatus('error');
+ onError?.(event);
+
+ // Close current connection
+ eventSource.close();
+ eventSourceRef.current = null;
+
+ // Attempt reconnection with exponential backoff
+ if (reconnectAttempts < maxReconnectAttempts) {
+ const delay = Math.min(reconnectInterval * Math.pow(2, reconnectAttempts), 30000);
+ console.log(`Reconnecting in ${delay}ms (attempt ${reconnectAttempts + 1}/${maxReconnectAttempts})`);
+
+ reconnectTimeoutRef.current = setTimeout(() => {
+ setReconnectAttempts((prev) => prev + 1);
+ connect();
+ }, delay);
+ } else {
+ setStatus('disconnected');
+ onDisconnect?.();
+ }
+ };
+ } catch (error) {
+ console.error('Failed to create EventSource:', error);
+ setStatus('error');
+ }
+ }, [url, enabled, status, reconnectAttempts, maxReconnectAttempts, reconnectInterval, onMessage, onError, onConnect, onDisconnect]);
+
+ const disconnect = useCallback(() => {
+ if (reconnectTimeoutRef.current) {
+ clearTimeout(reconnectTimeoutRef.current);
+ reconnectTimeoutRef.current = null;
+ }
+
+ if (eventSourceRef.current) {
+ eventSourceRef.current.close();
+ eventSourceRef.current = null;
+ }
+
+ setStatus('disconnected');
+ setReconnectAttempts(0);
+ }, []);
+
+ const reconnect = useCallback(() => {
+ disconnect();
+ setReconnectAttempts(0);
+ connect();
+ }, [disconnect, connect]);
+
+ useEffect(() => {
+ if (enabled) {
+ connect();
+ } else {
+ disconnect();
+ }
+
+ return () => {
+ disconnect();
+ };
+ }, [enabled, url]);
+
+ return {
+ status,
+ reconnectAttempts,
+ reconnect,
+ disconnect,
+ };
+}