From ea23233645419750ce4d53595993bc6f7bd54f9b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Sebastian=20Kr=C3=BCger?=
Date: Sun, 8 Mar 2026 18:25:09 +0100
Subject: [PATCH] feat: add BullMQ job queue with admin monitoring UI
- Add BullMQ to backend; mail jobs (verification, password reset) now enqueued instead of sent inline
- Mail worker processes jobs with 3-attempt exponential backoff retry
- Admin GraphQL resolvers: adminQueues, adminQueueJobs, adminRetryJob, adminRemoveJob, adminPauseQueue, adminResumeQueue
- Admin frontend page at /admin/queues: queue cards with counts, job table with status filter, retry/remove/pause actions
Co-Authored-By: Claude Sonnet 4.6
---
packages/backend/package.json | 3 +-
packages/backend/src/graphql/index.ts | 1 +
.../backend/src/graphql/resolvers/auth.ts | 10 +-
.../backend/src/graphql/resolvers/queues.ts | 153 +++++++++
packages/backend/src/graphql/types/index.ts | 68 ++++
packages/backend/src/index.ts | 5 +
packages/backend/src/lib/email.ts | 11 +
packages/backend/src/queues/connection.ts | 16 +
packages/backend/src/queues/index.ts | 8 +
packages/backend/src/queues/workers/mail.ts | 28 ++
packages/frontend/src/lib/i18n/locales/en.ts | 31 ++
packages/frontend/src/lib/services.ts | 142 +++++++++
.../frontend/src/routes/admin/+layout.svelte | 5 +
.../src/routes/admin/queues/+page.server.ts | 7 +
.../src/routes/admin/queues/+page.svelte | 298 ++++++++++++++++++
pnpm-lock.yaml | 147 +++++++++
16 files changed, 927 insertions(+), 6 deletions(-)
create mode 100644 packages/backend/src/graphql/resolvers/queues.ts
create mode 100644 packages/backend/src/queues/connection.ts
create mode 100644 packages/backend/src/queues/index.ts
create mode 100644 packages/backend/src/queues/workers/mail.ts
create mode 100644 packages/frontend/src/routes/admin/queues/+page.server.ts
create mode 100644 packages/frontend/src/routes/admin/queues/+page.svelte
diff --git a/packages/backend/package.json b/packages/backend/package.json
index 1c4af89..576498a 100644
--- a/packages/backend/package.json
+++ b/packages/backend/package.json
@@ -14,14 +14,15 @@
"check": "tsc --noEmit"
},
"dependencies": {
- "@sexy.pivoine.art/types": "workspace:*",
"@fastify/cookie": "^11.0.2",
"@fastify/cors": "^10.0.2",
"@fastify/multipart": "^9.0.3",
"@fastify/static": "^8.1.1",
"@pothos/core": "^4.4.0",
"@pothos/plugin-errors": "^4.2.0",
+ "@sexy.pivoine.art/types": "workspace:*",
"argon2": "^0.43.0",
+ "bullmq": "^5.70.4",
"drizzle-orm": "^0.44.1",
"fastify": "^5.4.0",
"fluent-ffmpeg": "^2.1.3",
diff --git a/packages/backend/src/graphql/index.ts b/packages/backend/src/graphql/index.ts
index 16b0717..0ac836d 100644
--- a/packages/backend/src/graphql/index.ts
+++ b/packages/backend/src/graphql/index.ts
@@ -9,6 +9,7 @@ import "./resolvers/recordings.js";
import "./resolvers/comments.js";
import "./resolvers/gamification.js";
import "./resolvers/stats.js";
+import "./resolvers/queues.js";
import { builder } from "./builder";
export const schema = builder.toSchema();
diff --git a/packages/backend/src/graphql/resolvers/auth.ts b/packages/backend/src/graphql/resolvers/auth.ts
index fff3b62..690af37 100644
--- a/packages/backend/src/graphql/resolvers/auth.ts
+++ b/packages/backend/src/graphql/resolvers/auth.ts
@@ -9,7 +9,7 @@ interface ReplyLike {
}
import { hash, verify as verifyArgon } from "../../lib/argon";
import { setSession, deleteSession } from "../../lib/auth";
-import { sendVerification, sendPasswordReset } from "../../lib/email";
+import { enqueueVerification, enqueuePasswordReset } from "../../lib/email";
import { slugify } from "../../lib/slugify";
import { nanoid } from "nanoid";
@@ -131,9 +131,9 @@ builder.mutationField("register", (t) =>
});
try {
- await sendVerification(args.email, verifyToken);
+ await enqueueVerification(args.email, verifyToken);
} catch (e) {
- console.warn("Failed to send verification email:", (e as Error).message);
+ console.warn("Failed to enqueue verification email:", (e as Error).message);
}
return true;
},
@@ -190,9 +190,9 @@ builder.mutationField("requestPasswordReset", (t) =>
.where(eq(users.id, user[0].id));
try {
- await sendPasswordReset(args.email, token);
+ await enqueuePasswordReset(args.email, token);
} catch (e) {
- console.warn("Failed to send password reset email:", (e as Error).message);
+ console.warn("Failed to enqueue password reset email:", (e as Error).message);
}
return true;
},
diff --git a/packages/backend/src/graphql/resolvers/queues.ts b/packages/backend/src/graphql/resolvers/queues.ts
new file mode 100644
index 0000000..89306bb
--- /dev/null
+++ b/packages/backend/src/graphql/resolvers/queues.ts
@@ -0,0 +1,153 @@
+import { GraphQLError } from "graphql";
+import type { Job } from "bullmq";
+import { builder } from "../builder.js";
+import { JobType, QueueInfoType } from "../types/index.js";
+import { queues } from "../../queues/index.js";
+import { requireAdmin } from "../../lib/acl.js";
+
+const JOB_STATUSES = ["waiting", "active", "completed", "failed", "delayed"] as const;
+type JobStatus = (typeof JOB_STATUSES)[number];
+
+async function toJobData(job: Job, queueName: string) {
+ const status = await job.getState();
+ return {
+ id: job.id ?? "",
+ name: job.name,
+ queue: queueName,
+ status,
+ data: job.data as unknown,
+ result: job.returnvalue as unknown,
+ failedReason: job.failedReason ?? null,
+ attemptsMade: job.attemptsMade,
+ createdAt: new Date(job.timestamp),
+ processedAt: job.processedOn ? new Date(job.processedOn) : null,
+ finishedAt: job.finishedOn ? new Date(job.finishedOn) : null,
+ progress: typeof job.progress === "number" ? job.progress : null,
+ };
+}
+
+builder.queryField("adminQueues", (t) =>
+ t.field({
+ type: [QueueInfoType],
+ resolve: async (_root, _args, ctx) => {
+ requireAdmin(ctx);
+ return Promise.all(
+ Object.entries(queues).map(async ([name, queue]) => {
+ const counts = await queue.getJobCounts(
+ "waiting",
+ "active",
+ "completed",
+ "failed",
+ "delayed",
+ "paused",
+ );
+ const isPaused = await queue.isPaused();
+ return {
+ name,
+ counts: {
+ waiting: counts.waiting ?? 0,
+ active: counts.active ?? 0,
+ completed: counts.completed ?? 0,
+ failed: counts.failed ?? 0,
+ delayed: counts.delayed ?? 0,
+ paused: counts.paused ?? 0,
+ },
+ isPaused,
+ };
+ }),
+ );
+ },
+ }),
+);
+
+builder.queryField("adminQueueJobs", (t) =>
+ t.field({
+ type: [JobType],
+ args: {
+ queue: t.arg.string({ required: true }),
+ status: t.arg.string(),
+ limit: t.arg.int(),
+ offset: t.arg.int(),
+ },
+ resolve: async (_root, args, ctx) => {
+ requireAdmin(ctx);
+ const queue = queues[args.queue];
+ if (!queue) throw new GraphQLError(`Queue "${args.queue}" not found`);
+
+ const limit = args.limit ?? 25;
+ const offset = args.offset ?? 0;
+ const statuses: JobStatus[] = args.status
+ ? [args.status as JobStatus]
+ : [...JOB_STATUSES];
+
+ const jobs = await queue.getJobs(statuses, offset, offset + limit - 1);
+ return Promise.all(jobs.map((job) => toJobData(job, args.queue)));
+ },
+ }),
+);
+
+builder.mutationField("adminRetryJob", (t) =>
+ t.field({
+ type: "Boolean",
+ args: {
+ queue: t.arg.string({ required: true }),
+ jobId: t.arg.string({ required: true }),
+ },
+ resolve: async (_root, args, ctx) => {
+ requireAdmin(ctx);
+ const queue = queues[args.queue];
+ if (!queue) throw new GraphQLError(`Queue "${args.queue}" not found`);
+ const job = await queue.getJob(args.jobId);
+ if (!job) throw new GraphQLError(`Job "${args.jobId}" not found`);
+ await job.retry();
+ return true;
+ },
+ }),
+);
+
+builder.mutationField("adminRemoveJob", (t) =>
+ t.field({
+ type: "Boolean",
+ args: {
+ queue: t.arg.string({ required: true }),
+ jobId: t.arg.string({ required: true }),
+ },
+ resolve: async (_root, args, ctx) => {
+ requireAdmin(ctx);
+ const queue = queues[args.queue];
+ if (!queue) throw new GraphQLError(`Queue "${args.queue}" not found`);
+ const job = await queue.getJob(args.jobId);
+ if (!job) throw new GraphQLError(`Job "${args.jobId}" not found`);
+ await job.remove();
+ return true;
+ },
+ }),
+);
+
+builder.mutationField("adminPauseQueue", (t) =>
+ t.field({
+ type: "Boolean",
+ args: { queue: t.arg.string({ required: true }) },
+ resolve: async (_root, args, ctx) => {
+ requireAdmin(ctx);
+ const queue = queues[args.queue];
+ if (!queue) throw new GraphQLError(`Queue "${args.queue}" not found`);
+ await queue.pause();
+ return true;
+ },
+ }),
+);
+
+builder.mutationField("adminResumeQueue", (t) =>
+ t.field({
+ type: "Boolean",
+ args: { queue: t.arg.string({ required: true }) },
+ resolve: async (_root, args, ctx) => {
+ requireAdmin(ctx);
+ const queue = queues[args.queue];
+ if (!queue) throw new GraphQLError(`Queue "${args.queue}" not found`);
+ await queue.resume();
+ return true;
+ },
+ }),
+);
diff --git a/packages/backend/src/graphql/types/index.ts b/packages/backend/src/graphql/types/index.ts
index 28d5d88..df56b53 100644
--- a/packages/backend/src/graphql/types/index.ts
+++ b/packages/backend/src/graphql/types/index.ts
@@ -333,6 +333,74 @@ export const AchievementType = builder.objectRef("Achievement").imp
}),
});
+// --- Queue / Job types (admin only, not in shared types package) ---
+
+type JobCounts = {
+ waiting: number;
+ active: number;
+ completed: number;
+ failed: number;
+ delayed: number;
+ paused: number;
+};
+
+type JobData = {
+ id: string;
+ name: string;
+ queue: string;
+ status: string;
+ data: unknown;
+ result: unknown;
+ failedReason: string | null;
+ attemptsMade: number;
+ createdAt: Date;
+ processedAt: Date | null;
+ finishedAt: Date | null;
+ progress: number | null;
+};
+
+type QueueInfoData = {
+ name: string;
+ counts: JobCounts;
+ isPaused: boolean;
+};
+
+export const JobCountsType = builder.objectRef("JobCounts").implement({
+ fields: (t) => ({
+ waiting: t.exposeInt("waiting"),
+ active: t.exposeInt("active"),
+ completed: t.exposeInt("completed"),
+ failed: t.exposeInt("failed"),
+ delayed: t.exposeInt("delayed"),
+ paused: t.exposeInt("paused"),
+ }),
+});
+
+export const JobType = builder.objectRef("Job").implement({
+ fields: (t) => ({
+ id: t.exposeString("id"),
+ name: t.exposeString("name"),
+ queue: t.exposeString("queue"),
+ status: t.exposeString("status"),
+ data: t.expose("data", { type: "JSON" }),
+ result: t.expose("result", { type: "JSON", nullable: true }),
+ failedReason: t.exposeString("failedReason", { nullable: true }),
+ attemptsMade: t.exposeInt("attemptsMade"),
+ createdAt: t.expose("createdAt", { type: "DateTime" }),
+ processedAt: t.expose("processedAt", { type: "DateTime", nullable: true }),
+ finishedAt: t.expose("finishedAt", { type: "DateTime", nullable: true }),
+ progress: t.exposeFloat("progress", { nullable: true }),
+ }),
+});
+
+export const QueueInfoType = builder.objectRef("QueueInfo").implement({
+ fields: (t) => ({
+ name: t.exposeString("name"),
+ counts: t.expose("counts", { type: JobCountsType }),
+ isPaused: t.exposeBoolean("isPaused"),
+ }),
+});
+
export const VideoListType = builder
.objectRef<{ items: Video[]; total: number }>("VideoList")
.implement({
diff --git a/packages/backend/src/index.ts b/packages/backend/src/index.ts
index f2f7b69..3742958 100644
--- a/packages/backend/src/index.ts
+++ b/packages/backend/src/index.ts
@@ -16,6 +16,7 @@ import { db } from "./db/connection";
import { redis } from "./lib/auth";
import { logger } from "./lib/logger";
import { migrate } from "drizzle-orm/node-postgres/migrator";
+import { startMailWorker } from "./queues/workers/mail";
const PORT = parseInt(process.env.PORT || "4000");
const UPLOAD_DIR = process.env.UPLOAD_DIR || "/data/uploads";
@@ -28,6 +29,10 @@ async function main() {
await migrate(db, { migrationsFolder });
logger.info("Migrations complete");
+ // Start background workers
+ startMailWorker();
+ logger.info("Queue workers started");
+
const fastify = Fastify({ loggerInstance: logger });
await fastify.register(fastifyCookie, {
diff --git a/packages/backend/src/lib/email.ts b/packages/backend/src/lib/email.ts
index 6360ac2..1d165ce 100644
--- a/packages/backend/src/lib/email.ts
+++ b/packages/backend/src/lib/email.ts
@@ -1,4 +1,5 @@
import nodemailer from "nodemailer";
+import { mailQueue } from "../queues/index.js";
const transporter = nodemailer.createTransport({
host: process.env.SMTP_HOST || "localhost",
@@ -32,3 +33,13 @@ export async function sendPasswordReset(email: string, token: string): PromiseClick here to reset your password.
`,
});
}
+
+const jobOpts = { attempts: 3, backoff: { type: "exponential" as const, delay: 5000 } };
+
+export async function enqueueVerification(email: string, token: string): Promise {
+ await mailQueue.add("sendVerification", { email, token }, jobOpts);
+}
+
+export async function enqueuePasswordReset(email: string, token: string): Promise {
+ await mailQueue.add("sendPasswordReset", { email, token }, jobOpts);
+}
diff --git a/packages/backend/src/queues/connection.ts b/packages/backend/src/queues/connection.ts
new file mode 100644
index 0000000..a28fb79
--- /dev/null
+++ b/packages/backend/src/queues/connection.ts
@@ -0,0 +1,16 @@
+function parseRedisUrl(url: string): { host: string; port: number; password?: string } {
+ const parsed = new URL(url);
+ return {
+ host: parsed.hostname,
+ port: parseInt(parsed.port) || 6379,
+ password: parsed.password || undefined,
+ };
+}
+
+// BullMQ creates its own IORedis connections from these options.
+// maxRetriesPerRequest: null is required for workers.
+export const redisConnectionOpts = {
+ ...parseRedisUrl(process.env.REDIS_URL || "redis://localhost:6379"),
+ maxRetriesPerRequest: null as null,
+ enableReadyCheck: false,
+};
diff --git a/packages/backend/src/queues/index.ts b/packages/backend/src/queues/index.ts
new file mode 100644
index 0000000..c141fe9
--- /dev/null
+++ b/packages/backend/src/queues/index.ts
@@ -0,0 +1,8 @@
+import { Queue } from "bullmq";
+import { redisConnectionOpts } from "./connection.js";
+
+export const mailQueue = new Queue("mail", { connection: redisConnectionOpts });
+
+export const queues: Record = {
+ mail: mailQueue,
+};
diff --git a/packages/backend/src/queues/workers/mail.ts b/packages/backend/src/queues/workers/mail.ts
new file mode 100644
index 0000000..6e8547d
--- /dev/null
+++ b/packages/backend/src/queues/workers/mail.ts
@@ -0,0 +1,28 @@
+import { Worker } from "bullmq";
+import { redisConnectionOpts } from "../connection.js";
+import { sendVerification, sendPasswordReset } from "../../lib/email.js";
+
+export function startMailWorker(): Worker {
+ const worker = new Worker(
+ "mail",
+ async (job) => {
+ switch (job.name) {
+ case "sendVerification":
+ await sendVerification(job.data.email as string, job.data.token as string);
+ break;
+ case "sendPasswordReset":
+ await sendPasswordReset(job.data.email as string, job.data.token as string);
+ break;
+ default:
+ throw new Error(`Unknown mail job: ${job.name}`);
+ }
+ },
+ { connection: redisConnectionOpts },
+ );
+
+ worker.on("failed", (job, err) => {
+ console.error(`Mail job ${job?.id} (${job?.name}) failed:`, err.message);
+ });
+
+ return worker;
+}
diff --git a/packages/frontend/src/lib/i18n/locales/en.ts b/packages/frontend/src/lib/i18n/locales/en.ts
index dd114b6..c6c6805 100644
--- a/packages/frontend/src/lib/i18n/locales/en.ts
+++ b/packages/frontend/src/lib/i18n/locales/en.ts
@@ -913,6 +913,7 @@ export default {
articles: "Articles",
comments: "Comments",
recordings: "Recordings",
+ queues: "Queues",
},
common: {
save_changes: "Save changes",
@@ -1058,6 +1059,36 @@ export default {
delete_success: "Recording deleted",
delete_error: "Failed to delete recording",
},
+ queues: {
+ title: "Job Queues",
+ pause: "Pause",
+ resume: "Resume",
+ paused_badge: "Paused",
+ retry: "Retry",
+ remove: "Remove",
+ retry_success: "Job retried",
+ retry_error: "Failed to retry job",
+ remove_success: "Job removed",
+ remove_error: "Failed to remove job",
+ pause_success: "Queue paused",
+ pause_error: "Failed to pause queue",
+ resume_success: "Queue resumed",
+ resume_error: "Failed to resume queue",
+ col_id: "ID",
+ col_name: "Name",
+ col_status: "Status",
+ col_attempts: "Attempts",
+ col_created: "Created",
+ col_actions: "Actions",
+ no_jobs: "No jobs found",
+ status_all: "All",
+ status_waiting: "Waiting",
+ status_active: "Active",
+ status_completed: "Completed",
+ status_failed: "Failed",
+ status_delayed: "Delayed",
+ failed_reason: "Reason: {reason}",
+ },
article_form: {
new_title: "New article",
edit_title: "Edit article",
diff --git a/packages/frontend/src/lib/services.ts b/packages/frontend/src/lib/services.ts
index d5c2cfe..9fe0c32 100644
--- a/packages/frontend/src/lib/services.ts
+++ b/packages/frontend/src/lib/services.ts
@@ -1876,3 +1876,145 @@ export async function adminDeleteRecording(id: string): Promise {
await getGraphQLClient().request(ADMIN_DELETE_RECORDING_MUTATION, { id });
});
}
+
+// --- Queues ---
+
+export type JobCounts = {
+ waiting: number;
+ active: number;
+ completed: number;
+ failed: number;
+ delayed: number;
+ paused: number;
+};
+
+export type QueueInfo = {
+ name: string;
+ counts: JobCounts;
+ isPaused: boolean;
+};
+
+export type Job = {
+ id: string;
+ name: string;
+ queue: string;
+ status: string;
+ data: Record;
+ result: unknown;
+ failedReason: string | null;
+ attemptsMade: number;
+ createdAt: string;
+ processedAt: string | null;
+ finishedAt: string | null;
+ progress: number | null;
+};
+
+const ADMIN_QUEUES_QUERY = gql`
+ query AdminQueues {
+ adminQueues {
+ name
+ isPaused
+ counts {
+ waiting
+ active
+ completed
+ failed
+ delayed
+ paused
+ }
+ }
+ }
+`;
+
+export async function getAdminQueues(
+ fetchFn?: typeof globalThis.fetch,
+ token?: string,
+): Promise {
+ return loggedApiCall("getAdminQueues", async () => {
+ const client = token ? getAuthClient(token, fetchFn) : getGraphQLClient(fetchFn);
+ const data = await client.request<{ adminQueues: QueueInfo[] }>(ADMIN_QUEUES_QUERY);
+ return data.adminQueues;
+ });
+}
+
+const ADMIN_QUEUE_JOBS_QUERY = gql`
+ query AdminQueueJobs($queue: String!, $status: String, $limit: Int, $offset: Int) {
+ adminQueueJobs(queue: $queue, status: $status, limit: $limit, offset: $offset) {
+ id
+ name
+ queue
+ status
+ data
+ result
+ failedReason
+ attemptsMade
+ createdAt
+ processedAt
+ finishedAt
+ progress
+ }
+ }
+`;
+
+export async function getAdminQueueJobs(
+ queue: string,
+ status?: string,
+ limit?: number,
+ offset?: number,
+): Promise {
+ return loggedApiCall("getAdminQueueJobs", async () => {
+ const data = await getGraphQLClient().request<{ adminQueueJobs: Job[] }>(
+ ADMIN_QUEUE_JOBS_QUERY,
+ { queue, status, limit, offset },
+ );
+ return data.adminQueueJobs;
+ });
+}
+
+const ADMIN_RETRY_JOB_MUTATION = gql`
+ mutation AdminRetryJob($queue: String!, $jobId: String!) {
+ adminRetryJob(queue: $queue, jobId: $jobId)
+ }
+`;
+
+export async function adminRetryJob(queue: string, jobId: string): Promise {
+ return loggedApiCall("adminRetryJob", async () => {
+ await getGraphQLClient().request(ADMIN_RETRY_JOB_MUTATION, { queue, jobId });
+ });
+}
+
+const ADMIN_REMOVE_JOB_MUTATION = gql`
+ mutation AdminRemoveJob($queue: String!, $jobId: String!) {
+ adminRemoveJob(queue: $queue, jobId: $jobId)
+ }
+`;
+
+export async function adminRemoveJob(queue: string, jobId: string): Promise {
+ return loggedApiCall("adminRemoveJob", async () => {
+ await getGraphQLClient().request(ADMIN_REMOVE_JOB_MUTATION, { queue, jobId });
+ });
+}
+
+const ADMIN_PAUSE_QUEUE_MUTATION = gql`
+ mutation AdminPauseQueue($queue: String!) {
+ adminPauseQueue(queue: $queue)
+ }
+`;
+
+const ADMIN_RESUME_QUEUE_MUTATION = gql`
+ mutation AdminResumeQueue($queue: String!) {
+ adminResumeQueue(queue: $queue)
+ }
+`;
+
+export async function adminPauseQueue(queue: string): Promise {
+ return loggedApiCall("adminPauseQueue", async () => {
+ await getGraphQLClient().request(ADMIN_PAUSE_QUEUE_MUTATION, { queue });
+ });
+}
+
+export async function adminResumeQueue(queue: string): Promise {
+ return loggedApiCall("adminResumeQueue", async () => {
+ await getGraphQLClient().request(ADMIN_RESUME_QUEUE_MUTATION, { queue });
+ });
+}
diff --git a/packages/frontend/src/routes/admin/+layout.svelte b/packages/frontend/src/routes/admin/+layout.svelte
index 5796d7d..876c85c 100644
--- a/packages/frontend/src/routes/admin/+layout.svelte
+++ b/packages/frontend/src/routes/admin/+layout.svelte
@@ -14,6 +14,11 @@
href: "/admin/recordings",
icon: "icon-[ri--record-circle-line]",
},
+ {
+ name: $_("admin.nav.queues"),
+ href: "/admin/queues",
+ icon: "icon-[ri--stack-line]",
+ },
]);
function isActive(href: string) {
diff --git a/packages/frontend/src/routes/admin/queues/+page.server.ts b/packages/frontend/src/routes/admin/queues/+page.server.ts
new file mode 100644
index 0000000..b214bbd
--- /dev/null
+++ b/packages/frontend/src/routes/admin/queues/+page.server.ts
@@ -0,0 +1,7 @@
+import { getAdminQueues } from "$lib/services";
+
+export async function load({ fetch, cookies }) {
+ const token = cookies.get("session_token") || "";
+ const queues = await getAdminQueues(fetch, token).catch(() => []);
+ return { queues };
+}
diff --git a/packages/frontend/src/routes/admin/queues/+page.svelte b/packages/frontend/src/routes/admin/queues/+page.svelte
new file mode 100644
index 0000000..a4747e5
--- /dev/null
+++ b/packages/frontend/src/routes/admin/queues/+page.svelte
@@ -0,0 +1,298 @@
+
+
+
+
+
{$_("admin.queues.title")}
+
+
+
+
+ {#each queues as queue (queue.name)}
+ {@const isSelected = selectedQueue === queue.name}
+
+ {/each}
+
+
+ {#if selectedQueue}
+
+
+ {#each STATUS_FILTERS as f (f.value ?? "all")}
+ selectStatus(f.value)}
+ >
+ {f.label}
+
+ {/each}
+
+
+
+
+
+
+
+ | {$_("admin.queues.col_id")} |
+ {$_("admin.queues.col_name")} |
+ {$_("admin.queues.col_status")} |
+
+ {$_("admin.queues.col_attempts")}
+ |
+
+ {$_("admin.queues.col_created")}
+ |
+ {$_("admin.queues.col_actions")} |
+
+
+
+ {#if loadingJobs}
+
+ | {$_("common.loading")} |
+
+ {:else}
+ {#each jobs as job (job.id)}
+
+ | {job.id} |
+
+
+ {job.name}
+ {#if job.failedReason}
+
+ {$_("admin.queues.failed_reason", { values: { reason: job.failedReason } })}
+
+ {/if}
+
+ |
+
+ {job.status}
+ |
+ {job.attemptsMade} |
+ {formatDate(job.createdAt)} |
+
+
+ {#if job.status === "failed"}
+ retryJob(job)}
+ >
+
+
+ {/if}
+ removeJob(job)}
+ >
+
+
+
+ |
+
+ {/each}
+ {#if jobs.length === 0}
+
+ | {$_("admin.queues.no_jobs")} |
+
+ {/if}
+ {/if}
+
+
+
+ {/if}
+
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index df03c4d..cb6901f 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -59,6 +59,9 @@ importers:
argon2:
specifier: ^0.43.0
version: 0.43.1
+ bullmq:
+ specifier: ^5.70.4
+ version: 5.70.4
drizzle-orm:
specifier: ^0.44.1
version: 0.44.7(@types/pg@8.18.0)(knex@3.1.0(pg@8.19.0))(pg@8.19.0)
@@ -1182,6 +1185,9 @@ packages:
'@internationalized/date@3.12.0':
resolution: {integrity: sha512-/PyIMzK29jtXaGU23qTvNZxvBXRtKbNnGDFD+PY6CZw/Y8Ex8pFUzkuCJCG9aOqmShjqhS9mPqP6Dk5onQY8rQ==}
+ '@ioredis/commands@1.5.0':
+ resolution: {integrity: sha512-eUgLqrMf8nJkZxT24JvVRrQya1vZkQh8BBeYNwGDqa5I0VUi8ACx7uFvAaLxintokpTenkK6DASvo/bvNbBGow==}
+
'@ioredis/commands@1.5.1':
resolution: {integrity: sha512-JH8ZL/ywcJyR9MmJ5BNqZllXNZQqQbnVZOqpPQqE1vHiFgAw4NHbvE0FOduNU8IX9babitBT46571OnPTT0Zcw==}
@@ -1217,6 +1223,36 @@ packages:
resolution: {integrity: sha512-9I2Zn6+NJLfaGoz9jN3lpwDgAYvfGeNYdbAIjJOqzs4Tpc+VU3Jqq4IofSUBKajiDS8k9fZIg18/z13mpk1bsA==}
engines: {node: '>=8'}
+ '@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.3':
+ resolution: {integrity: sha512-QZHtlVgbAdy2zAqNA9Gu1UpIuI8Xvsd1v8ic6B2pZmeFnFcMWiPLfWXh7TVw4eGEZ/C9TH281KwhVoeQUKbyjw==}
+ cpu: [arm64]
+ os: [darwin]
+
+ '@msgpackr-extract/msgpackr-extract-darwin-x64@3.0.3':
+ resolution: {integrity: sha512-mdzd3AVzYKuUmiWOQ8GNhl64/IoFGol569zNRdkLReh6LRLHOXxU4U8eq0JwaD8iFHdVGqSy4IjFL4reoWCDFw==}
+ cpu: [x64]
+ os: [darwin]
+
+ '@msgpackr-extract/msgpackr-extract-linux-arm64@3.0.3':
+ resolution: {integrity: sha512-YxQL+ax0XqBJDZiKimS2XQaf+2wDGVa1enVRGzEvLLVFeqa5kx2bWbtcSXgsxjQB7nRqqIGFIcLteF/sHeVtQg==}
+ cpu: [arm64]
+ os: [linux]
+
+ '@msgpackr-extract/msgpackr-extract-linux-arm@3.0.3':
+ resolution: {integrity: sha512-fg0uy/dG/nZEXfYilKoRe7yALaNmHoYeIoJuJ7KJ+YyU2bvY8vPv27f7UKhGRpY6euFYqEVhxCFZgAUNQBM3nw==}
+ cpu: [arm]
+ os: [linux]
+
+ '@msgpackr-extract/msgpackr-extract-linux-x64@3.0.3':
+ resolution: {integrity: sha512-cvwNfbP07pKUfq1uH+S6KJ7dT9K8WOE4ZiAcsrSes+UY55E/0jLYc+vq+DO7jlmqRb5zAggExKm0H7O/CBaesg==}
+ cpu: [x64]
+ os: [linux]
+
+ '@msgpackr-extract/msgpackr-extract-win32-x64@3.0.3':
+ resolution: {integrity: sha512-x0fWaQtYp4E6sktbsdAqnehxDgEc/VwM7uLsRCYWaiGu0ykYdZPiS8zCWdnjHwyiumousxfBm4SO31eXqwEZhQ==}
+ cpu: [x64]
+ os: [win32]
+
'@phc/format@1.0.0':
resolution: {integrity: sha512-m7X9U6BG2+J+R1lSOdCiITLLrxm+cWlNI3HUFA92oLO77ObGNzaKdh8pMLqdZcshtkKuV84olNNXDfMc4FezBQ==}
engines: {node: '>=10'}
@@ -1778,6 +1814,9 @@ packages:
buffer-from@1.1.2:
resolution: {integrity: sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ==}
+ bullmq@5.70.4:
+ resolution: {integrity: sha512-S58YT/tGdhc4pEPcIahtZRBR1TcTLpss1UKiXimF+Vy4yZwF38pW2IvhHqs4j4dEbZqDt8oi0jGGN/WYQHbPDg==}
+
ce-la-react@0.3.2:
resolution: {integrity: sha512-QJ6k4lOD/btI08xG8jBPxRCGXvCnusGGkTsiXk0u3NqUu/W+BXRnFD4PYjwtqh8AWmGa5LDbGk0fLQsqr0nSMA==}
peerDependencies:
@@ -1855,6 +1894,10 @@ packages:
resolution: {integrity: sha512-ei8Aos7ja0weRpFzJnEA9UHJ/7XQmqglbRwnf2ATjcB9Wq874VKH9kfjjirM6UhU2/E5fFYadylyhFldcqSidQ==}
engines: {node: '>=18'}
+ cron-parser@4.9.0:
+ resolution: {integrity: sha512-p0SaNjrHOnQeR8/VnfGbmg9te2kfyYSQ7Sc/j/6DtPL3JQvKxmjO9TSjNFpujqV3vEYYBvNNvXSxzyksBWAx1Q==}
+ engines: {node: '>=12.0.0'}
+
cross-inspect@1.0.1:
resolution: {integrity: sha512-Pcw1JTvZLSJH83iiGWt6fRcT+BjZlCDRVwYLbUcHzv/CRpB7r0MlSrGbIyQvVSNyGnbt7G4AXuyCiDR3POvZ1A==}
engines: {node: '>=16.0.0'}
@@ -2419,6 +2462,10 @@ packages:
resolution: {integrity: sha512-HVBe9OFuqs+Z6n64q09PQvP1/R4Bm+30PAyyD4wIEqssh3v9L21QjCVk4kRLucMBcDokJTcLjsGeVRlq/nH6DA==}
engines: {node: '>=12.22.0'}
+ ioredis@5.9.3:
+ resolution: {integrity: sha512-VI5tMCdeoxZWU5vjHWsiE/Su76JGhBvWF1MJnV9ZtGltHk9BmD48oDq8Tj8haZ85aceXZMxLNDQZRVo5QKNgXA==}
+ engines: {node: '>=12.22.0'}
+
ipaddr.js@2.3.0:
resolution: {integrity: sha512-Zv/pA+ciVFbCSBBjGfaKUya/CcGmUHzTydLMaTwrUUEM2DIEO3iZvueGxmacvmN50fGpGVKeTXpb2LcYQxeVdg==}
engines: {node: '>= 10'}
@@ -2628,6 +2675,10 @@ packages:
lru-queue@0.1.0:
resolution: {integrity: sha512-BpdYkt9EvGl8OfWHDQPISVpcl5xZthb+XPsbELj5AQXxIC8IriDZIQYjBJPEm5rS420sjZ0TLEzRcq5KdBhYrQ==}
+ luxon@3.7.2:
+ resolution: {integrity: sha512-vtEhXh/gNjI9Yg1u4jX/0YVPMvxzHuGgCm6tC5kZyb08yjGWGnqAjGJvcXbqQR2P3MyMEFnRbpcdFS6PBcLqew==}
+ engines: {node: '>=12'}
+
lz-string@1.5.0:
resolution: {integrity: sha512-h5bgJWpxJNswbU7qCrV0tIKQCaS3blPDrqKWx+QxzuzL1zGUzij9XCWLrSLsJPu5t+eWA/ycetzYAO5IOMcWAQ==}
hasBin: true
@@ -2719,6 +2770,13 @@ packages:
ms@2.1.3:
resolution: {integrity: sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==}
+ msgpackr-extract@3.0.3:
+ resolution: {integrity: sha512-P0efT1C9jIdVRefqjzOQ9Xml57zpOXnIuS+csaB4MdZbTdmGDLo8XhzBG1N7aO11gKDDkJvBLULeFTo46wwreA==}
+ hasBin: true
+
+ msgpackr@1.11.5:
+ resolution: {integrity: sha512-UjkUHN0yqp9RWKy0Lplhh+wlpdt9oQBYgULZOiFhV3VclSF1JnSQWZ5r9gORQlNYaUKQoR8itv7g7z1xDDuACA==}
+
nanoid@3.3.11:
resolution: {integrity: sha512-N8SpfPUnUp1bK+PMYW8qSWdl9U+wwNWI4QKxOYDy9JAro3WMX7p2OeVRF9v+347pnakNevPmiHhNmZ2HbFA76w==}
engines: {node: ^10 || ^12 || ^13.7 || ^14 || >=15.0.1}
@@ -2730,10 +2788,17 @@ packages:
next-tick@1.1.0:
resolution: {integrity: sha512-CXdUiJembsNjuToQvxayPZF9Vqht7hewsvy2sOWafLvi2awflj9mOC6bHIg50orX8IJvWKY9wYQ/zB2kogPslQ==}
+ node-abort-controller@3.1.1:
+ resolution: {integrity: sha512-AGK2yQKIjRuqnc6VkX2Xj5d+QW8xZ87pa1UK6yA6ouUyuxfHuMP6umE5QK7UmTeOAymo+Zx1Fxiuw9rVx8taHQ==}
+
node-addon-api@8.6.0:
resolution: {integrity: sha512-gBVjCaqDlRUk0EwoPNKzIr9KkS9041G/q31IBShPs1Xz6UTA+EXdZADbzqAJQrpDRq71CIMnOP5VMut3SL0z5Q==}
engines: {node: ^18 || ^20 || >= 21}
+ node-gyp-build-optional-packages@5.2.2:
+ resolution: {integrity: sha512-s+w+rBWnpTMwSFbaE0UXsRlg7hU4FjekKU4eyAih5T8nJuNZT1nNsskXpxmeqSK9UzkBl6UgRlnKc8hz8IEqOw==}
+ hasBin: true
+
node-gyp-build@4.8.4:
resolution: {integrity: sha512-LA4ZjwlnUblHVgq0oBF3Jl/6h/Nvs5fzBLwdEF4nuxnFdsfajde4WfxtJr3CaiH+F6ewcIB/q4jQ4UzPyid+CQ==}
hasBin: true
@@ -3049,6 +3114,11 @@ packages:
engines: {node: '>=10'}
hasBin: true
+ semver@7.7.4:
+ resolution: {integrity: sha512-vFKC2IEtQnVhpT78h1Yp8wzwrf8CM+MzKMHGJZfBtzhZNycRFnXsHk6E5TxIkkMsgNS7mdX3AGB7x2QM2di4lA==}
+ engines: {node: '>=10'}
+ hasBin: true
+
set-cookie-parser@2.7.2:
resolution: {integrity: sha512-oeM1lpU/UvhTxw+g3cIfxXHyJRc/uidd3yK1P242gzHds0udQBYzs3y8j4gCCW+ZJ7ad0yctld8RYO+bdurlvw==}
@@ -4046,6 +4116,8 @@ snapshots:
dependencies:
'@swc/helpers': 0.5.19
+ '@ioredis/commands@1.5.0': {}
+
'@ioredis/commands@1.5.1': {}
'@isaacs/cliui@9.0.0': {}
@@ -4081,6 +4153,24 @@ snapshots:
'@lukeed/ms@2.0.2': {}
+ '@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.3':
+ optional: true
+
+ '@msgpackr-extract/msgpackr-extract-darwin-x64@3.0.3':
+ optional: true
+
+ '@msgpackr-extract/msgpackr-extract-linux-arm64@3.0.3':
+ optional: true
+
+ '@msgpackr-extract/msgpackr-extract-linux-arm@3.0.3':
+ optional: true
+
+ '@msgpackr-extract/msgpackr-extract-linux-x64@3.0.3':
+ optional: true
+
+ '@msgpackr-extract/msgpackr-extract-win32-x64@3.0.3':
+ optional: true
+
'@phc/format@1.0.0': {}
'@pinojs/redact@0.4.0': {}
@@ -4597,6 +4687,18 @@ snapshots:
buffer-from@1.1.2: {}
+ bullmq@5.70.4:
+ dependencies:
+ cron-parser: 4.9.0
+ ioredis: 5.9.3
+ msgpackr: 1.11.5
+ node-abort-controller: 3.1.1
+ semver: 7.7.4
+ tslib: 2.8.1
+ uuid: 11.1.0
+ transitivePeerDependencies:
+ - supports-color
+
ce-la-react@0.3.2(react@19.2.0):
dependencies:
react: 19.2.0
@@ -4662,6 +4764,10 @@ snapshots:
cookie@1.1.1: {}
+ cron-parser@4.9.0:
+ dependencies:
+ luxon: 3.7.2
+
cross-inspect@1.0.1:
dependencies:
tslib: 2.8.1
@@ -5277,6 +5383,20 @@ snapshots:
transitivePeerDependencies:
- supports-color
+ ioredis@5.9.3:
+ dependencies:
+ '@ioredis/commands': 1.5.0
+ cluster-key-slot: 1.1.2
+ debug: 4.4.3
+ denque: 2.1.0
+ lodash.defaults: 4.2.0
+ lodash.isarguments: 3.1.0
+ redis-errors: 1.2.0
+ redis-parser: 3.0.0
+ standard-as-callback: 2.1.0
+ transitivePeerDependencies:
+ - supports-color
+
ipaddr.js@2.3.0: {}
is-arrayish@0.3.4: {}
@@ -5440,6 +5560,8 @@ snapshots:
dependencies:
es5-ext: 0.10.64
+ luxon@3.7.2: {}
+
lz-string@1.5.0: {}
magic-string@0.30.21:
@@ -5524,14 +5646,37 @@ snapshots:
ms@2.1.3: {}
+ msgpackr-extract@3.0.3:
+ dependencies:
+ node-gyp-build-optional-packages: 5.2.2
+ optionalDependencies:
+ '@msgpackr-extract/msgpackr-extract-darwin-arm64': 3.0.3
+ '@msgpackr-extract/msgpackr-extract-darwin-x64': 3.0.3
+ '@msgpackr-extract/msgpackr-extract-linux-arm': 3.0.3
+ '@msgpackr-extract/msgpackr-extract-linux-arm64': 3.0.3
+ '@msgpackr-extract/msgpackr-extract-linux-x64': 3.0.3
+ '@msgpackr-extract/msgpackr-extract-win32-x64': 3.0.3
+ optional: true
+
+ msgpackr@1.11.5:
+ optionalDependencies:
+ msgpackr-extract: 3.0.3
+
nanoid@3.3.11: {}
natural-compare@1.4.0: {}
next-tick@1.1.0: {}
+ node-abort-controller@3.1.1: {}
+
node-addon-api@8.6.0: {}
+ node-gyp-build-optional-packages@5.2.2:
+ dependencies:
+ detect-libc: 2.1.2
+ optional: true
+
node-gyp-build@4.8.4: {}
nodemailer@7.0.13: {}
@@ -5827,6 +5972,8 @@ snapshots:
semver@7.7.3: {}
+ semver@7.7.4: {}
+
set-cookie-parser@2.7.2: {}
set-cookie-parser@3.0.1: {}