feat: add BullMQ job queue with admin monitoring UI
- Add BullMQ to backend; mail jobs (verification, password reset) now enqueued instead of sent inline - Mail worker processes jobs with 3-attempt exponential backoff retry - Admin GraphQL resolvers: adminQueues, adminQueueJobs, adminRetryJob, adminRemoveJob, adminPauseQueue, adminResumeQueue - Admin frontend page at /admin/queues: queue cards with counts, job table with status filter, retry/remove/pause actions Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -14,14 +14,15 @@
|
||||
"check": "tsc --noEmit"
|
||||
},
|
||||
"dependencies": {
|
||||
"@sexy.pivoine.art/types": "workspace:*",
|
||||
"@fastify/cookie": "^11.0.2",
|
||||
"@fastify/cors": "^10.0.2",
|
||||
"@fastify/multipart": "^9.0.3",
|
||||
"@fastify/static": "^8.1.1",
|
||||
"@pothos/core": "^4.4.0",
|
||||
"@pothos/plugin-errors": "^4.2.0",
|
||||
"@sexy.pivoine.art/types": "workspace:*",
|
||||
"argon2": "^0.43.0",
|
||||
"bullmq": "^5.70.4",
|
||||
"drizzle-orm": "^0.44.1",
|
||||
"fastify": "^5.4.0",
|
||||
"fluent-ffmpeg": "^2.1.3",
|
||||
|
||||
@@ -9,6 +9,7 @@ import "./resolvers/recordings.js";
|
||||
import "./resolvers/comments.js";
|
||||
import "./resolvers/gamification.js";
|
||||
import "./resolvers/stats.js";
|
||||
import "./resolvers/queues.js";
|
||||
import { builder } from "./builder";
|
||||
|
||||
export const schema = builder.toSchema();
|
||||
|
||||
@@ -9,7 +9,7 @@ interface ReplyLike {
|
||||
}
|
||||
import { hash, verify as verifyArgon } from "../../lib/argon";
|
||||
import { setSession, deleteSession } from "../../lib/auth";
|
||||
import { sendVerification, sendPasswordReset } from "../../lib/email";
|
||||
import { enqueueVerification, enqueuePasswordReset } from "../../lib/email";
|
||||
import { slugify } from "../../lib/slugify";
|
||||
import { nanoid } from "nanoid";
|
||||
|
||||
@@ -131,9 +131,9 @@ builder.mutationField("register", (t) =>
|
||||
});
|
||||
|
||||
try {
|
||||
await sendVerification(args.email, verifyToken);
|
||||
await enqueueVerification(args.email, verifyToken);
|
||||
} catch (e) {
|
||||
console.warn("Failed to send verification email:", (e as Error).message);
|
||||
console.warn("Failed to enqueue verification email:", (e as Error).message);
|
||||
}
|
||||
return true;
|
||||
},
|
||||
@@ -190,9 +190,9 @@ builder.mutationField("requestPasswordReset", (t) =>
|
||||
.where(eq(users.id, user[0].id));
|
||||
|
||||
try {
|
||||
await sendPasswordReset(args.email, token);
|
||||
await enqueuePasswordReset(args.email, token);
|
||||
} catch (e) {
|
||||
console.warn("Failed to send password reset email:", (e as Error).message);
|
||||
console.warn("Failed to enqueue password reset email:", (e as Error).message);
|
||||
}
|
||||
return true;
|
||||
},
|
||||
|
||||
153
packages/backend/src/graphql/resolvers/queues.ts
Normal file
153
packages/backend/src/graphql/resolvers/queues.ts
Normal file
@@ -0,0 +1,153 @@
|
||||
import { GraphQLError } from "graphql";
|
||||
import type { Job } from "bullmq";
|
||||
import { builder } from "../builder.js";
|
||||
import { JobType, QueueInfoType } from "../types/index.js";
|
||||
import { queues } from "../../queues/index.js";
|
||||
import { requireAdmin } from "../../lib/acl.js";
|
||||
|
||||
const JOB_STATUSES = ["waiting", "active", "completed", "failed", "delayed"] as const;
|
||||
type JobStatus = (typeof JOB_STATUSES)[number];
|
||||
|
||||
async function toJobData(job: Job, queueName: string) {
|
||||
const status = await job.getState();
|
||||
return {
|
||||
id: job.id ?? "",
|
||||
name: job.name,
|
||||
queue: queueName,
|
||||
status,
|
||||
data: job.data as unknown,
|
||||
result: job.returnvalue as unknown,
|
||||
failedReason: job.failedReason ?? null,
|
||||
attemptsMade: job.attemptsMade,
|
||||
createdAt: new Date(job.timestamp),
|
||||
processedAt: job.processedOn ? new Date(job.processedOn) : null,
|
||||
finishedAt: job.finishedOn ? new Date(job.finishedOn) : null,
|
||||
progress: typeof job.progress === "number" ? job.progress : null,
|
||||
};
|
||||
}
|
||||
|
||||
builder.queryField("adminQueues", (t) =>
|
||||
t.field({
|
||||
type: [QueueInfoType],
|
||||
resolve: async (_root, _args, ctx) => {
|
||||
requireAdmin(ctx);
|
||||
return Promise.all(
|
||||
Object.entries(queues).map(async ([name, queue]) => {
|
||||
const counts = await queue.getJobCounts(
|
||||
"waiting",
|
||||
"active",
|
||||
"completed",
|
||||
"failed",
|
||||
"delayed",
|
||||
"paused",
|
||||
);
|
||||
const isPaused = await queue.isPaused();
|
||||
return {
|
||||
name,
|
||||
counts: {
|
||||
waiting: counts.waiting ?? 0,
|
||||
active: counts.active ?? 0,
|
||||
completed: counts.completed ?? 0,
|
||||
failed: counts.failed ?? 0,
|
||||
delayed: counts.delayed ?? 0,
|
||||
paused: counts.paused ?? 0,
|
||||
},
|
||||
isPaused,
|
||||
};
|
||||
}),
|
||||
);
|
||||
},
|
||||
}),
|
||||
);
|
||||
|
||||
builder.queryField("adminQueueJobs", (t) =>
|
||||
t.field({
|
||||
type: [JobType],
|
||||
args: {
|
||||
queue: t.arg.string({ required: true }),
|
||||
status: t.arg.string(),
|
||||
limit: t.arg.int(),
|
||||
offset: t.arg.int(),
|
||||
},
|
||||
resolve: async (_root, args, ctx) => {
|
||||
requireAdmin(ctx);
|
||||
const queue = queues[args.queue];
|
||||
if (!queue) throw new GraphQLError(`Queue "${args.queue}" not found`);
|
||||
|
||||
const limit = args.limit ?? 25;
|
||||
const offset = args.offset ?? 0;
|
||||
const statuses: JobStatus[] = args.status
|
||||
? [args.status as JobStatus]
|
||||
: [...JOB_STATUSES];
|
||||
|
||||
const jobs = await queue.getJobs(statuses, offset, offset + limit - 1);
|
||||
return Promise.all(jobs.map((job) => toJobData(job, args.queue)));
|
||||
},
|
||||
}),
|
||||
);
|
||||
|
||||
builder.mutationField("adminRetryJob", (t) =>
|
||||
t.field({
|
||||
type: "Boolean",
|
||||
args: {
|
||||
queue: t.arg.string({ required: true }),
|
||||
jobId: t.arg.string({ required: true }),
|
||||
},
|
||||
resolve: async (_root, args, ctx) => {
|
||||
requireAdmin(ctx);
|
||||
const queue = queues[args.queue];
|
||||
if (!queue) throw new GraphQLError(`Queue "${args.queue}" not found`);
|
||||
const job = await queue.getJob(args.jobId);
|
||||
if (!job) throw new GraphQLError(`Job "${args.jobId}" not found`);
|
||||
await job.retry();
|
||||
return true;
|
||||
},
|
||||
}),
|
||||
);
|
||||
|
||||
builder.mutationField("adminRemoveJob", (t) =>
|
||||
t.field({
|
||||
type: "Boolean",
|
||||
args: {
|
||||
queue: t.arg.string({ required: true }),
|
||||
jobId: t.arg.string({ required: true }),
|
||||
},
|
||||
resolve: async (_root, args, ctx) => {
|
||||
requireAdmin(ctx);
|
||||
const queue = queues[args.queue];
|
||||
if (!queue) throw new GraphQLError(`Queue "${args.queue}" not found`);
|
||||
const job = await queue.getJob(args.jobId);
|
||||
if (!job) throw new GraphQLError(`Job "${args.jobId}" not found`);
|
||||
await job.remove();
|
||||
return true;
|
||||
},
|
||||
}),
|
||||
);
|
||||
|
||||
builder.mutationField("adminPauseQueue", (t) =>
|
||||
t.field({
|
||||
type: "Boolean",
|
||||
args: { queue: t.arg.string({ required: true }) },
|
||||
resolve: async (_root, args, ctx) => {
|
||||
requireAdmin(ctx);
|
||||
const queue = queues[args.queue];
|
||||
if (!queue) throw new GraphQLError(`Queue "${args.queue}" not found`);
|
||||
await queue.pause();
|
||||
return true;
|
||||
},
|
||||
}),
|
||||
);
|
||||
|
||||
builder.mutationField("adminResumeQueue", (t) =>
|
||||
t.field({
|
||||
type: "Boolean",
|
||||
args: { queue: t.arg.string({ required: true }) },
|
||||
resolve: async (_root, args, ctx) => {
|
||||
requireAdmin(ctx);
|
||||
const queue = queues[args.queue];
|
||||
if (!queue) throw new GraphQLError(`Queue "${args.queue}" not found`);
|
||||
await queue.resume();
|
||||
return true;
|
||||
},
|
||||
}),
|
||||
);
|
||||
@@ -333,6 +333,74 @@ export const AchievementType = builder.objectRef<Achievement>("Achievement").imp
|
||||
}),
|
||||
});
|
||||
|
||||
// --- Queue / Job types (admin only, not in shared types package) ---
|
||||
|
||||
type JobCounts = {
|
||||
waiting: number;
|
||||
active: number;
|
||||
completed: number;
|
||||
failed: number;
|
||||
delayed: number;
|
||||
paused: number;
|
||||
};
|
||||
|
||||
type JobData = {
|
||||
id: string;
|
||||
name: string;
|
||||
queue: string;
|
||||
status: string;
|
||||
data: unknown;
|
||||
result: unknown;
|
||||
failedReason: string | null;
|
||||
attemptsMade: number;
|
||||
createdAt: Date;
|
||||
processedAt: Date | null;
|
||||
finishedAt: Date | null;
|
||||
progress: number | null;
|
||||
};
|
||||
|
||||
type QueueInfoData = {
|
||||
name: string;
|
||||
counts: JobCounts;
|
||||
isPaused: boolean;
|
||||
};
|
||||
|
||||
export const JobCountsType = builder.objectRef<JobCounts>("JobCounts").implement({
|
||||
fields: (t) => ({
|
||||
waiting: t.exposeInt("waiting"),
|
||||
active: t.exposeInt("active"),
|
||||
completed: t.exposeInt("completed"),
|
||||
failed: t.exposeInt("failed"),
|
||||
delayed: t.exposeInt("delayed"),
|
||||
paused: t.exposeInt("paused"),
|
||||
}),
|
||||
});
|
||||
|
||||
export const JobType = builder.objectRef<JobData>("Job").implement({
|
||||
fields: (t) => ({
|
||||
id: t.exposeString("id"),
|
||||
name: t.exposeString("name"),
|
||||
queue: t.exposeString("queue"),
|
||||
status: t.exposeString("status"),
|
||||
data: t.expose("data", { type: "JSON" }),
|
||||
result: t.expose("result", { type: "JSON", nullable: true }),
|
||||
failedReason: t.exposeString("failedReason", { nullable: true }),
|
||||
attemptsMade: t.exposeInt("attemptsMade"),
|
||||
createdAt: t.expose("createdAt", { type: "DateTime" }),
|
||||
processedAt: t.expose("processedAt", { type: "DateTime", nullable: true }),
|
||||
finishedAt: t.expose("finishedAt", { type: "DateTime", nullable: true }),
|
||||
progress: t.exposeFloat("progress", { nullable: true }),
|
||||
}),
|
||||
});
|
||||
|
||||
export const QueueInfoType = builder.objectRef<QueueInfoData>("QueueInfo").implement({
|
||||
fields: (t) => ({
|
||||
name: t.exposeString("name"),
|
||||
counts: t.expose("counts", { type: JobCountsType }),
|
||||
isPaused: t.exposeBoolean("isPaused"),
|
||||
}),
|
||||
});
|
||||
|
||||
export const VideoListType = builder
|
||||
.objectRef<{ items: Video[]; total: number }>("VideoList")
|
||||
.implement({
|
||||
|
||||
@@ -16,6 +16,7 @@ import { db } from "./db/connection";
|
||||
import { redis } from "./lib/auth";
|
||||
import { logger } from "./lib/logger";
|
||||
import { migrate } from "drizzle-orm/node-postgres/migrator";
|
||||
import { startMailWorker } from "./queues/workers/mail";
|
||||
|
||||
const PORT = parseInt(process.env.PORT || "4000");
|
||||
const UPLOAD_DIR = process.env.UPLOAD_DIR || "/data/uploads";
|
||||
@@ -28,6 +29,10 @@ async function main() {
|
||||
await migrate(db, { migrationsFolder });
|
||||
logger.info("Migrations complete");
|
||||
|
||||
// Start background workers
|
||||
startMailWorker();
|
||||
logger.info("Queue workers started");
|
||||
|
||||
const fastify = Fastify({ loggerInstance: logger });
|
||||
|
||||
await fastify.register(fastifyCookie, {
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import nodemailer from "nodemailer";
|
||||
import { mailQueue } from "../queues/index.js";
|
||||
|
||||
const transporter = nodemailer.createTransport({
|
||||
host: process.env.SMTP_HOST || "localhost",
|
||||
@@ -32,3 +33,13 @@ export async function sendPasswordReset(email: string, token: string): Promise<v
|
||||
html: `<p>Click <a href="${BASE_URL}/password/reset?token=${token}">here</a> to reset your password.</p>`,
|
||||
});
|
||||
}
|
||||
|
||||
const jobOpts = { attempts: 3, backoff: { type: "exponential" as const, delay: 5000 } };
|
||||
|
||||
export async function enqueueVerification(email: string, token: string): Promise<void> {
|
||||
await mailQueue.add("sendVerification", { email, token }, jobOpts);
|
||||
}
|
||||
|
||||
export async function enqueuePasswordReset(email: string, token: string): Promise<void> {
|
||||
await mailQueue.add("sendPasswordReset", { email, token }, jobOpts);
|
||||
}
|
||||
|
||||
16
packages/backend/src/queues/connection.ts
Normal file
16
packages/backend/src/queues/connection.ts
Normal file
@@ -0,0 +1,16 @@
|
||||
function parseRedisUrl(url: string): { host: string; port: number; password?: string } {
|
||||
const parsed = new URL(url);
|
||||
return {
|
||||
host: parsed.hostname,
|
||||
port: parseInt(parsed.port) || 6379,
|
||||
password: parsed.password || undefined,
|
||||
};
|
||||
}
|
||||
|
||||
// BullMQ creates its own IORedis connections from these options.
|
||||
// maxRetriesPerRequest: null is required for workers.
|
||||
export const redisConnectionOpts = {
|
||||
...parseRedisUrl(process.env.REDIS_URL || "redis://localhost:6379"),
|
||||
maxRetriesPerRequest: null as null,
|
||||
enableReadyCheck: false,
|
||||
};
|
||||
8
packages/backend/src/queues/index.ts
Normal file
8
packages/backend/src/queues/index.ts
Normal file
@@ -0,0 +1,8 @@
|
||||
import { Queue } from "bullmq";
|
||||
import { redisConnectionOpts } from "./connection.js";
|
||||
|
||||
export const mailQueue = new Queue("mail", { connection: redisConnectionOpts });
|
||||
|
||||
export const queues: Record<string, Queue> = {
|
||||
mail: mailQueue,
|
||||
};
|
||||
28
packages/backend/src/queues/workers/mail.ts
Normal file
28
packages/backend/src/queues/workers/mail.ts
Normal file
@@ -0,0 +1,28 @@
|
||||
import { Worker } from "bullmq";
|
||||
import { redisConnectionOpts } from "../connection.js";
|
||||
import { sendVerification, sendPasswordReset } from "../../lib/email.js";
|
||||
|
||||
export function startMailWorker(): Worker {
|
||||
const worker = new Worker(
|
||||
"mail",
|
||||
async (job) => {
|
||||
switch (job.name) {
|
||||
case "sendVerification":
|
||||
await sendVerification(job.data.email as string, job.data.token as string);
|
||||
break;
|
||||
case "sendPasswordReset":
|
||||
await sendPasswordReset(job.data.email as string, job.data.token as string);
|
||||
break;
|
||||
default:
|
||||
throw new Error(`Unknown mail job: ${job.name}`);
|
||||
}
|
||||
},
|
||||
{ connection: redisConnectionOpts },
|
||||
);
|
||||
|
||||
worker.on("failed", (job, err) => {
|
||||
console.error(`Mail job ${job?.id} (${job?.name}) failed:`, err.message);
|
||||
});
|
||||
|
||||
return worker;
|
||||
}
|
||||
Reference in New Issue
Block a user