feat: implement Phase 11 - Real-time Updates with SSE
All checks were successful
Build and Push Docker Image to Gitea / build-and-push (push) Successful in 1m11s
All checks were successful
Build and Push Docker Image to Gitea / build-and-push (push) Successful in 1m11s
Features added: - Created SSE (Server-Sent Events) endpoint at /api/supervisor/events - Polls supervisor every 2 seconds for state changes - Sends process-update events when state changes detected - Sends heartbeat events to keep connection alive - Includes error handling with error events - Created useEventSource hook for managing SSE connections - Automatic reconnection with exponential backoff - Configurable max reconnection attempts (default 10) - Connection status tracking (connecting, connected, disconnected, error) - Clean event listener management with proper cleanup - Heartbeat monitoring for connection health - Created ConnectionStatus component - Visual status indicator with icons (Wifi, WifiOff, Loader, AlertCircle) - Color-coded states (green=connected, yellow=connecting, red=error) - Shows reconnection attempt count - Manual reconnect button when disconnected/error - Integrated real-time updates into dashboard and processes pages - Auto-refresh process data when state changes occur - Connection status indicator in page headers - No manual refresh needed for live updates - Implemented proper cleanup on unmount - EventSource properly closed - Reconnection timeouts cleared - No memory leaks 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
84
app/api/supervisor/events/route.ts
Normal file
84
app/api/supervisor/events/route.ts
Normal file
@@ -0,0 +1,84 @@
|
|||||||
|
import { NextRequest } from 'next/server';
|
||||||
|
import { createSupervisorClient } from '@/lib/supervisor/client';
|
||||||
|
|
||||||
|
export const dynamic = 'force-dynamic';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Server-Sent Events endpoint for real-time process state updates
|
||||||
|
* Polls supervisor every 2 seconds and sends state changes to clients
|
||||||
|
*/
|
||||||
|
export async function GET(request: NextRequest) {
|
||||||
|
const encoder = new TextEncoder();
|
||||||
|
let intervalId: NodeJS.Timeout | null = null;
|
||||||
|
let previousState: string | null = null;
|
||||||
|
|
||||||
|
const stream = new ReadableStream({
|
||||||
|
async start(controller) {
|
||||||
|
// Helper to send SSE message
|
||||||
|
const sendEvent = (event: string, data: any) => {
|
||||||
|
const message = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
|
||||||
|
controller.enqueue(encoder.encode(message));
|
||||||
|
};
|
||||||
|
|
||||||
|
// Send initial connection message
|
||||||
|
sendEvent('connected', { timestamp: Date.now() });
|
||||||
|
|
||||||
|
// Poll supervisor for state changes
|
||||||
|
const pollSupervisor = async () => {
|
||||||
|
try {
|
||||||
|
const client = createSupervisorClient();
|
||||||
|
const processes = await client.getAllProcessInfo();
|
||||||
|
|
||||||
|
// Create state snapshot
|
||||||
|
const currentState = JSON.stringify(
|
||||||
|
processes.map((p) => ({
|
||||||
|
name: `${p.group}:${p.name}`,
|
||||||
|
state: p.state,
|
||||||
|
pid: p.pid,
|
||||||
|
statename: p.statename,
|
||||||
|
}))
|
||||||
|
);
|
||||||
|
|
||||||
|
// Send update if state changed
|
||||||
|
if (currentState !== previousState) {
|
||||||
|
sendEvent('process-update', {
|
||||||
|
processes,
|
||||||
|
timestamp: Date.now(),
|
||||||
|
});
|
||||||
|
previousState = currentState;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send heartbeat every poll
|
||||||
|
sendEvent('heartbeat', { timestamp: Date.now() });
|
||||||
|
} catch (error: any) {
|
||||||
|
console.error('SSE polling error:', error);
|
||||||
|
sendEvent('error', {
|
||||||
|
message: error.message || 'Failed to fetch process state',
|
||||||
|
timestamp: Date.now(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Initial poll
|
||||||
|
await pollSupervisor();
|
||||||
|
|
||||||
|
// Poll every 2 seconds
|
||||||
|
intervalId = setInterval(pollSupervisor, 2000);
|
||||||
|
},
|
||||||
|
|
||||||
|
cancel() {
|
||||||
|
if (intervalId) {
|
||||||
|
clearInterval(intervalId);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
return new Response(stream, {
|
||||||
|
headers: {
|
||||||
|
'Content-Type': 'text/event-stream',
|
||||||
|
'Cache-Control': 'no-cache, no-transform',
|
||||||
|
'Connection': 'keep-alive',
|
||||||
|
'X-Accel-Buffering': 'no', // Disable nginx buffering
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
40
app/page.tsx
40
app/page.tsx
@@ -1,16 +1,33 @@
|
|||||||
'use client';
|
'use client';
|
||||||
|
|
||||||
|
import { useState } from 'react';
|
||||||
import { Activity, Server, FileText, Settings } from 'lucide-react';
|
import { Activity, Server, FileText, Settings } from 'lucide-react';
|
||||||
import { Card, CardContent, CardDescription, CardHeader, CardTitle } from '@/components/ui/card';
|
import { Card, CardContent, CardDescription, CardHeader, CardTitle } from '@/components/ui/card';
|
||||||
import { SystemStatus } from '@/components/process/SystemStatus';
|
import { SystemStatus } from '@/components/process/SystemStatus';
|
||||||
import { ProcessStateChart } from '@/components/charts/ProcessStateChart';
|
import { ProcessStateChart } from '@/components/charts/ProcessStateChart';
|
||||||
import { ProcessUptimeChart } from '@/components/charts/ProcessUptimeChart';
|
import { ProcessUptimeChart } from '@/components/charts/ProcessUptimeChart';
|
||||||
import { GroupStatistics } from '@/components/charts/GroupStatistics';
|
import { GroupStatistics } from '@/components/charts/GroupStatistics';
|
||||||
|
import { ConnectionStatus } from '@/components/ui/ConnectionStatus';
|
||||||
import { useProcesses } from '@/lib/hooks/useSupervisor';
|
import { useProcesses } from '@/lib/hooks/useSupervisor';
|
||||||
|
import { useEventSource } from '@/lib/hooks/useEventSource';
|
||||||
import { ProcessState } from '@/lib/supervisor/types';
|
import { ProcessState } from '@/lib/supervisor/types';
|
||||||
|
|
||||||
export default function HomePage() {
|
export default function HomePage() {
|
||||||
const { data: processes, isLoading } = useProcesses();
|
const [realtimeEnabled] = useState(true);
|
||||||
|
const { data: processes, isLoading, isError, refetch } = useProcesses();
|
||||||
|
|
||||||
|
// Real-time updates via Server-Sent Events
|
||||||
|
const { status: connectionStatus, reconnectAttempts, reconnect } = useEventSource(
|
||||||
|
'/api/supervisor/events',
|
||||||
|
{
|
||||||
|
enabled: realtimeEnabled && !isLoading && !isError,
|
||||||
|
onMessage: (message) => {
|
||||||
|
if (message.event === 'process-update') {
|
||||||
|
refetch();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
const stats = {
|
const stats = {
|
||||||
total: processes?.length ?? 0,
|
total: processes?.length ?? 0,
|
||||||
@@ -22,13 +39,20 @@ export default function HomePage() {
|
|||||||
return (
|
return (
|
||||||
<div className="space-y-8 animate-fade-in">
|
<div className="space-y-8 animate-fade-in">
|
||||||
{/* Header */}
|
{/* Header */}
|
||||||
<div>
|
<div className="flex items-start justify-between">
|
||||||
<h1 className="text-4xl font-bold bg-gradient-to-r from-primary to-accent bg-clip-text text-transparent">
|
<div>
|
||||||
Supervisor Dashboard
|
<h1 className="text-4xl font-bold bg-gradient-to-r from-primary to-accent bg-clip-text text-transparent">
|
||||||
</h1>
|
Supervisor Dashboard
|
||||||
<p className="text-muted-foreground mt-2">
|
</h1>
|
||||||
Monitor and manage your processes in real-time
|
<p className="text-muted-foreground mt-2">
|
||||||
</p>
|
Monitor and manage your processes in real-time
|
||||||
|
</p>
|
||||||
|
</div>
|
||||||
|
<ConnectionStatus
|
||||||
|
status={connectionStatus}
|
||||||
|
reconnectAttempts={reconnectAttempts}
|
||||||
|
onReconnect={reconnect}
|
||||||
|
/>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
{/* System Status */}
|
{/* System Status */}
|
||||||
|
|||||||
@@ -11,6 +11,8 @@ import { RefreshCw, AlertCircle, CheckSquare, Keyboard } from 'lucide-react';
|
|||||||
import { Button } from '@/components/ui/button';
|
import { Button } from '@/components/ui/button';
|
||||||
import { useKeyboardShortcuts } from '@/lib/hooks/useKeyboardShortcuts';
|
import { useKeyboardShortcuts } from '@/lib/hooks/useKeyboardShortcuts';
|
||||||
import { KeyboardShortcutsHelp } from '@/components/ui/KeyboardShortcutsHelp';
|
import { KeyboardShortcutsHelp } from '@/components/ui/KeyboardShortcutsHelp';
|
||||||
|
import { ConnectionStatus } from '@/components/ui/ConnectionStatus';
|
||||||
|
import { useEventSource } from '@/lib/hooks/useEventSource';
|
||||||
import type { ProcessInfo } from '@/lib/supervisor/types';
|
import type { ProcessInfo } from '@/lib/supervisor/types';
|
||||||
|
|
||||||
export default function ProcessesPage() {
|
export default function ProcessesPage() {
|
||||||
@@ -19,9 +21,30 @@ export default function ProcessesPage() {
|
|||||||
const [filteredProcesses, setFilteredProcesses] = useState<ProcessInfo[]>([]);
|
const [filteredProcesses, setFilteredProcesses] = useState<ProcessInfo[]>([]);
|
||||||
const [showShortcutsHelp, setShowShortcutsHelp] = useState(false);
|
const [showShortcutsHelp, setShowShortcutsHelp] = useState(false);
|
||||||
const [focusedIndex, setFocusedIndex] = useState<number>(-1);
|
const [focusedIndex, setFocusedIndex] = useState<number>(-1);
|
||||||
|
const [realtimeEnabled, setRealtimeEnabled] = useState(true);
|
||||||
const searchInputRef = useRef<HTMLInputElement>(null);
|
const searchInputRef = useRef<HTMLInputElement>(null);
|
||||||
const { data: processes, isLoading, isError, refetch } = useProcesses();
|
const { data: processes, isLoading, isError, refetch } = useProcesses();
|
||||||
|
|
||||||
|
// Real-time updates via Server-Sent Events
|
||||||
|
const { status: connectionStatus, reconnectAttempts, reconnect } = useEventSource(
|
||||||
|
'/api/supervisor/events',
|
||||||
|
{
|
||||||
|
enabled: realtimeEnabled && !isLoading && !isError,
|
||||||
|
onMessage: (message) => {
|
||||||
|
if (message.event === 'process-update') {
|
||||||
|
// Invalidate and refetch process data
|
||||||
|
refetch();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
onConnect: () => {
|
||||||
|
console.log('SSE connected');
|
||||||
|
},
|
||||||
|
onDisconnect: () => {
|
||||||
|
console.log('SSE disconnected');
|
||||||
|
},
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
const handleFilterChange = useCallback((filtered: ProcessInfo[]) => {
|
const handleFilterChange = useCallback((filtered: ProcessInfo[]) => {
|
||||||
setFilteredProcesses(filtered);
|
setFilteredProcesses(filtered);
|
||||||
}, []);
|
}, []);
|
||||||
@@ -193,12 +216,19 @@ export default function ProcessesPage() {
|
|||||||
return (
|
return (
|
||||||
<div className="space-y-6 animate-fade-in">
|
<div className="space-y-6 animate-fade-in">
|
||||||
<div className="flex items-center justify-between">
|
<div className="flex items-center justify-between">
|
||||||
<div>
|
<div className="flex items-center gap-4">
|
||||||
<h1 className="text-3xl font-bold">Processes</h1>
|
<div>
|
||||||
<p className="text-muted-foreground mt-1">
|
<h1 className="text-3xl font-bold">Processes</h1>
|
||||||
{displayedProcesses.length} of {processes?.length ?? 0} processes
|
<p className="text-muted-foreground mt-1">
|
||||||
{displayedProcesses.length !== (processes?.length ?? 0) && ' (filtered)'}
|
{displayedProcesses.length} of {processes?.length ?? 0} processes
|
||||||
</p>
|
{displayedProcesses.length !== (processes?.length ?? 0) && ' (filtered)'}
|
||||||
|
</p>
|
||||||
|
</div>
|
||||||
|
<ConnectionStatus
|
||||||
|
status={connectionStatus}
|
||||||
|
reconnectAttempts={reconnectAttempts}
|
||||||
|
onReconnect={reconnect}
|
||||||
|
/>
|
||||||
</div>
|
</div>
|
||||||
<div className="flex items-center gap-4">
|
<div className="flex items-center gap-4">
|
||||||
{viewMode === 'flat' && displayedProcesses.length > 0 && (
|
{viewMode === 'flat' && displayedProcesses.length > 0 && (
|
||||||
|
|||||||
81
components/ui/ConnectionStatus.tsx
Normal file
81
components/ui/ConnectionStatus.tsx
Normal file
@@ -0,0 +1,81 @@
|
|||||||
|
'use client';
|
||||||
|
|
||||||
|
import { Wifi, WifiOff, Loader2, AlertCircle } from 'lucide-react';
|
||||||
|
import { Button } from '@/components/ui/button';
|
||||||
|
import { ConnectionStatus as Status } from '@/lib/hooks/useEventSource';
|
||||||
|
import { cn } from '@/lib/utils/cn';
|
||||||
|
|
||||||
|
interface ConnectionStatusProps {
|
||||||
|
status: Status;
|
||||||
|
reconnectAttempts?: number;
|
||||||
|
onReconnect?: () => void;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function ConnectionStatus({ status, reconnectAttempts = 0, onReconnect }: ConnectionStatusProps) {
|
||||||
|
const getStatusConfig = () => {
|
||||||
|
switch (status) {
|
||||||
|
case 'connected':
|
||||||
|
return {
|
||||||
|
icon: Wifi,
|
||||||
|
text: 'Live',
|
||||||
|
color: 'text-success',
|
||||||
|
bgColor: 'bg-success/10',
|
||||||
|
borderColor: 'border-success/20',
|
||||||
|
};
|
||||||
|
case 'connecting':
|
||||||
|
return {
|
||||||
|
icon: Loader2,
|
||||||
|
text: 'Connecting...',
|
||||||
|
color: 'text-warning',
|
||||||
|
bgColor: 'bg-warning/10',
|
||||||
|
borderColor: 'border-warning/20',
|
||||||
|
animate: true,
|
||||||
|
};
|
||||||
|
case 'error':
|
||||||
|
return {
|
||||||
|
icon: AlertCircle,
|
||||||
|
text: reconnectAttempts > 0 ? `Retrying (${reconnectAttempts})` : 'Error',
|
||||||
|
color: 'text-destructive',
|
||||||
|
bgColor: 'bg-destructive/10',
|
||||||
|
borderColor: 'border-destructive/20',
|
||||||
|
};
|
||||||
|
case 'disconnected':
|
||||||
|
default:
|
||||||
|
return {
|
||||||
|
icon: WifiOff,
|
||||||
|
text: 'Offline',
|
||||||
|
color: 'text-muted-foreground',
|
||||||
|
bgColor: 'bg-muted',
|
||||||
|
borderColor: 'border-muted',
|
||||||
|
};
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
const config = getStatusConfig();
|
||||||
|
const Icon = config.icon;
|
||||||
|
|
||||||
|
return (
|
||||||
|
<div
|
||||||
|
className={cn(
|
||||||
|
'flex items-center gap-2 px-3 py-1.5 rounded-md border text-sm',
|
||||||
|
config.bgColor,
|
||||||
|
config.borderColor
|
||||||
|
)}
|
||||||
|
>
|
||||||
|
<Icon
|
||||||
|
className={cn('h-4 w-4', config.color, config.animate && 'animate-spin')}
|
||||||
|
/>
|
||||||
|
<span className={cn('font-medium', config.color)}>{config.text}</span>
|
||||||
|
{(status === 'error' || status === 'disconnected') && onReconnect && (
|
||||||
|
<Button
|
||||||
|
variant="ghost"
|
||||||
|
size="sm"
|
||||||
|
onClick={onReconnect}
|
||||||
|
className="h-6 px-2 ml-1"
|
||||||
|
>
|
||||||
|
Reconnect
|
||||||
|
</Button>
|
||||||
|
)}
|
||||||
|
</div>
|
||||||
|
);
|
||||||
|
}
|
||||||
144
lib/hooks/useEventSource.ts
Normal file
144
lib/hooks/useEventSource.ts
Normal file
@@ -0,0 +1,144 @@
|
|||||||
|
import { useEffect, useRef, useState, useCallback } from 'react';
|
||||||
|
|
||||||
|
export type ConnectionStatus = 'connecting' | 'connected' | 'disconnected' | 'error';
|
||||||
|
|
||||||
|
export interface EventSourceMessage<T = any> {
|
||||||
|
event: string;
|
||||||
|
data: T;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface UseEventSourceOptions {
|
||||||
|
enabled?: boolean;
|
||||||
|
reconnectInterval?: number;
|
||||||
|
maxReconnectAttempts?: number;
|
||||||
|
onMessage?: (message: EventSourceMessage) => void;
|
||||||
|
onError?: (error: Event) => void;
|
||||||
|
onConnect?: () => void;
|
||||||
|
onDisconnect?: () => void;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function useEventSource(url: string, options: UseEventSourceOptions = {}) {
|
||||||
|
const {
|
||||||
|
enabled = true,
|
||||||
|
reconnectInterval = 3000,
|
||||||
|
maxReconnectAttempts = 10,
|
||||||
|
onMessage,
|
||||||
|
onError,
|
||||||
|
onConnect,
|
||||||
|
onDisconnect,
|
||||||
|
} = options;
|
||||||
|
|
||||||
|
const [status, setStatus] = useState<ConnectionStatus>('disconnected');
|
||||||
|
const [reconnectAttempts, setReconnectAttempts] = useState(0);
|
||||||
|
const eventSourceRef = useRef<EventSource | null>(null);
|
||||||
|
const reconnectTimeoutRef = useRef<NodeJS.Timeout | null>(null);
|
||||||
|
|
||||||
|
const connect = useCallback(() => {
|
||||||
|
if (!enabled || eventSourceRef.current) return;
|
||||||
|
|
||||||
|
setStatus('connecting');
|
||||||
|
|
||||||
|
try {
|
||||||
|
const eventSource = new EventSource(url);
|
||||||
|
eventSourceRef.current = eventSource;
|
||||||
|
|
||||||
|
eventSource.addEventListener('connected', () => {
|
||||||
|
setStatus('connected');
|
||||||
|
setReconnectAttempts(0);
|
||||||
|
onConnect?.();
|
||||||
|
});
|
||||||
|
|
||||||
|
eventSource.addEventListener('heartbeat', (event) => {
|
||||||
|
// Keep connection alive
|
||||||
|
if (status !== 'connected') {
|
||||||
|
setStatus('connected');
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
eventSource.addEventListener('process-update', (event) => {
|
||||||
|
try {
|
||||||
|
const data = JSON.parse(event.data);
|
||||||
|
onMessage?.({ event: 'process-update', data });
|
||||||
|
} catch (error) {
|
||||||
|
console.error('Failed to parse SSE message:', error);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
eventSource.addEventListener('error', (event) => {
|
||||||
|
try {
|
||||||
|
const data = JSON.parse((event as MessageEvent).data);
|
||||||
|
onMessage?.({ event: 'error', data });
|
||||||
|
} catch (error) {
|
||||||
|
// Not a message error, connection error
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
eventSource.onerror = (event) => {
|
||||||
|
console.error('EventSource error:', event);
|
||||||
|
setStatus('error');
|
||||||
|
onError?.(event);
|
||||||
|
|
||||||
|
// Close current connection
|
||||||
|
eventSource.close();
|
||||||
|
eventSourceRef.current = null;
|
||||||
|
|
||||||
|
// Attempt reconnection with exponential backoff
|
||||||
|
if (reconnectAttempts < maxReconnectAttempts) {
|
||||||
|
const delay = Math.min(reconnectInterval * Math.pow(2, reconnectAttempts), 30000);
|
||||||
|
console.log(`Reconnecting in ${delay}ms (attempt ${reconnectAttempts + 1}/${maxReconnectAttempts})`);
|
||||||
|
|
||||||
|
reconnectTimeoutRef.current = setTimeout(() => {
|
||||||
|
setReconnectAttempts((prev) => prev + 1);
|
||||||
|
connect();
|
||||||
|
}, delay);
|
||||||
|
} else {
|
||||||
|
setStatus('disconnected');
|
||||||
|
onDisconnect?.();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
} catch (error) {
|
||||||
|
console.error('Failed to create EventSource:', error);
|
||||||
|
setStatus('error');
|
||||||
|
}
|
||||||
|
}, [url, enabled, status, reconnectAttempts, maxReconnectAttempts, reconnectInterval, onMessage, onError, onConnect, onDisconnect]);
|
||||||
|
|
||||||
|
const disconnect = useCallback(() => {
|
||||||
|
if (reconnectTimeoutRef.current) {
|
||||||
|
clearTimeout(reconnectTimeoutRef.current);
|
||||||
|
reconnectTimeoutRef.current = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (eventSourceRef.current) {
|
||||||
|
eventSourceRef.current.close();
|
||||||
|
eventSourceRef.current = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
setStatus('disconnected');
|
||||||
|
setReconnectAttempts(0);
|
||||||
|
}, []);
|
||||||
|
|
||||||
|
const reconnect = useCallback(() => {
|
||||||
|
disconnect();
|
||||||
|
setReconnectAttempts(0);
|
||||||
|
connect();
|
||||||
|
}, [disconnect, connect]);
|
||||||
|
|
||||||
|
useEffect(() => {
|
||||||
|
if (enabled) {
|
||||||
|
connect();
|
||||||
|
} else {
|
||||||
|
disconnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
return () => {
|
||||||
|
disconnect();
|
||||||
|
};
|
||||||
|
}, [enabled, url]);
|
||||||
|
|
||||||
|
return {
|
||||||
|
status,
|
||||||
|
reconnectAttempts,
|
||||||
|
reconnect,
|
||||||
|
disconnect,
|
||||||
|
};
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user