185 lines
4.4 KiB
TypeScript
185 lines
4.4 KiB
TypeScript
|
|
import type { FilterType, FilterParams } from '@/types/filter';
|
||
|
|
|
||
|
|
/**
 * One filter request handled by the worker pool, bundled with the callbacks
 * that settle the promise handed back to the caller of `executeFilter`.
 */
interface WorkerTask {
  // --- request ---------------------------------------------------------

  /** Pixels to filter; its width and height are reused to build the result. */
  imageData: ImageData;

  /** Which filter the worker should run. */
  type: FilterType;

  /** Filter parameters, forwarded to the worker unchanged. */
  params: FilterParams;

  // --- completion ------------------------------------------------------

  /** Called with the filtered image when the worker reports success. */
  resolve: (data: ImageData) => void;

  /** Called with the failure reason when the task cannot be completed. */
  reject: (error: Error) => void;
}
|
||
|
|
|
||
|
|
/**
 * Worker Pool Manager
 *
 * Runs image filters on a bounded set of Web Workers. Workers are created
 * lazily (never more than `maxWorkers`), tasks that cannot start right away
 * wait in a FIFO queue, and every dispatched task is recorded against the
 * worker that runs it so the worker's reply settles the right promise.
 *
 * Message protocol (as used by this class):
 * - to the worker:   `{ type, data, width, height, params }`, with the pixel
 *   buffer transferred;
 * - from the worker: `{ success, data, error }`.
 *
 * The protocol carries no task id, so a worker is given at most one task at
 * a time; that one-to-one pairing is what ties a reply to its task.
 */
export class WorkerPool {
  /** Every worker created by this pool and not yet terminated. */
  private workers: Worker[] = [];
  /** Subset of `workers` that currently has no task assigned. */
  private availableWorkers: Worker[] = [];
  /** Tasks waiting for a free worker, oldest first. */
  private taskQueue: WorkerTask[] = [];
  /** The task currently running on each busy worker. */
  private activeTasks = new Map<Worker, WorkerTask>();
  /** Upper bound on the number of workers this pool will create. */
  private maxWorkers: number;

  /**
   * @param maxWorkers Desired pool size. Defaults to the number of logical
   *   cores (4 when unknown) and is clamped to the range 1..8.
   */
  constructor(maxWorkers: number = navigator.hardwareConcurrency || 4) {
    // Cap at 8 workers, require at least 1; `|| 1` covers a NaN argument.
    this.maxWorkers = Math.max(1, Math.min(maxWorkers, 8)) || 1;
  }

  /**
   * Create a worker, wire up its reply/error handlers and register it as idle.
   *
   * @returns The newly created worker.
   */
  private initializeWorker(): Worker {
    const worker = new Worker(new URL('../workers/filter.worker.ts', import.meta.url));

    worker.onmessage = (e: MessageEvent) => {
      const task = this.takeActiveTask(worker);

      if (task) {
        // A malformed (empty) reply is treated as a failure instead of throwing.
        const { success, data, error } = e.data ?? {};

        if (success) {
          try {
            // Rebuild ImageData from the returned buffer using the source dimensions.
            task.resolve(
              new ImageData(
                new Uint8ClampedArray(data),
                task.imageData.width,
                task.imageData.height
              )
            );
          } catch (err) {
            // The ImageData constructor throws when the buffer size does not
            // match the dimensions; surface that to the caller.
            task.reject(WorkerPool.toError(err));
          }
        } else {
          task.reject(new Error(error || 'Worker processing failed'));
        }
      }

      this.releaseWorker(worker);
    };

    worker.onerror = (error) => {
      console.error('Worker error:', error);

      // Fail the task that was running; otherwise its promise would never settle.
      const task = this.takeActiveTask(worker);
      if (task) {
        task.reject(new Error(error.message || 'Worker error'));
      }

      this.releaseWorker(worker);
    };

    this.workers.push(worker);
    this.availableWorkers.push(worker);

    return worker;
  }

  /** Remove and return the task assigned to `worker`, if there is one. */
  private takeActiveTask(worker: Worker): WorkerTask | undefined {
    const task = this.activeTasks.get(worker);
    this.activeTasks.delete(worker);
    return task;
  }

  /** Put `worker` back on the idle list (at most once) and start the next queued task. */
  private releaseWorker(worker: Worker): void {
    // Ignore workers that were terminated together with the pool.
    if (!this.workers.includes(worker)) {
      return;
    }
    if (!this.availableWorkers.includes(worker)) {
      this.availableWorkers.push(worker);
    }
    this.processNextTask();
  }

  /** Normalize an arbitrary thrown value into an `Error`. */
  private static toError(reason: unknown): Error {
    return reason instanceof Error ? reason : new Error(String(reason));
  }

  /**
   * Hand the oldest queued task to an idle worker.
   * Does nothing when the queue is empty or every worker is busy.
   */
  private processNextTask(): void {
    const worker = this.availableWorkers.pop();
    if (!worker) {
      return;
    }

    const task = this.taskQueue.shift();
    if (!task) {
      // Nothing to run: the worker stays idle.
      this.availableWorkers.push(worker);
      return;
    }

    // Remember which task this worker is running so its reply can be matched.
    this.activeTasks.set(worker, task);

    try {
      // Copy the pixels so the caller's ImageData stays usable after the transfer.
      const data = new Uint8ClampedArray(task.imageData.data);

      // Transfer ownership of the copy's buffer instead of cloning it again.
      worker.postMessage(
        {
          type: task.type,
          data: data,
          width: task.imageData.width,
          height: task.imageData.height,
          params: task.params,
        },
        { transfer: [data.buffer] }
      );
    } catch (err) {
      // Dispatch failed (e.g. the message could not be cloned): fail this task
      // and give the worker back so the rest of the queue keeps moving.
      this.activeTasks.delete(worker);
      task.reject(WorkerPool.toError(err));
      this.releaseWorker(worker);
    }
  }

  /**
   * Execute a filter using the worker pool.
   *
   * @param imageData Source image; it is copied, not modified.
   * @param type Filter to apply.
   * @param params Parameters for the filter.
   * @returns A promise for the filtered image, with the same dimensions as
   *   `imageData`. It rejects when the worker reports a failure or raises an
   *   error, when the task cannot be dispatched, or when the pool is
   *   terminated before the task completes.
   */
  executeFilter(
    imageData: ImageData,
    type: FilterType,
    params: FilterParams
  ): Promise<ImageData> {
    return new Promise((resolve, reject) => {
      this.taskQueue.push({
        type,
        imageData,
        params,
        resolve,
        reject,
      });

      // Grow the pool only when this task would otherwise have to wait.
      if (
        this.availableWorkers.length === 0 &&
        this.workers.length < this.maxWorkers
      ) {
        this.initializeWorker();
      }

      this.processNextTask();
    });
  }

  /**
   * Terminate all workers and clear the pool.
   * Tasks that are still running or queued are rejected.
   */
  terminate(): void {
    this.workers.forEach((worker) => worker.terminate());

    const abandoned = [...this.activeTasks.values(), ...this.taskQueue];

    this.workers = [];
    this.availableWorkers = [];
    this.taskQueue = [];
    this.activeTasks.clear();

    // Reject after the state is reset so rejection handlers see an empty pool.
    for (const task of abandoned) {
      task.reject(new Error('Worker pool terminated'));
    }
  }

  /**
   * Get the number of active workers (created workers that are not idle).
   */
  get activeWorkers(): number {
    return this.workers.length - this.availableWorkers.length;
  }

  /**
   * Get the number of queued tasks (not yet handed to a worker).
   */
  get queuedTasks(): number {
    return this.taskQueue.length;
  }
}
|
||
|
|
|
||
|
|
// Singleton instance
|
||
|
|
// Module-wide pool instance: null until getWorkerPool() first creates it,
// and reset to null again by terminateWorkerPool().
let workerPool: WorkerPool | null = null;
|
||
|
|
|
||
|
|
/**
 * Return the shared worker pool, creating it on first use.
 *
 * @returns The module-wide `WorkerPool` instance.
 */
export function getWorkerPool(): WorkerPool {
  // Fast path: a pool already exists.
  if (workerPool) {
    return workerPool;
  }

  const created = new WorkerPool();
  workerPool = created;
  return created;
}
|
||
|
|
|
||
|
|
/**
 * Shut down the shared worker pool, if one was created, and forget it so the
 * next `getWorkerPool()` call builds a fresh one (call on app unmount).
 */
export function terminateWorkerPool(): void {
  const pool = workerPool;
  if (!pool) {
    // Nothing was ever created (or it was already cleaned up).
    return;
  }

  pool.terminate();
  workerPool = null;
}
|