feat: gamification queue with deduplication and unpublish revoke

- Add migration 0004: partial unique index on user_points (user_id, action, recording_id)
  for RECORDING_CREATE and RECORDING_FEATURED to prevent earn-on-republish farming
- Add revokePoints() to gamification lib; awardPoints() now uses onConflictDoNothing
- Add gamificationQueue (BullMQ) with 3-attempt exponential backoff
- Add gamification worker handling awardPoints, revokePoints, checkAchievements jobs
- Move all inline gamification calls in recordings + comments resolvers to queue
- Revoke RECORDING_CREATE points when a recording is unpublished (published → draft)
- Register gamification worker at server startup alongside mail worker

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-09 19:50:33 +01:00
parent 1b724e86c9
commit 5f40a812d3
8 changed files with 183 additions and 34 deletions

View File

@@ -8,6 +8,7 @@ import {
pgEnum, pgEnum,
uniqueIndex, uniqueIndex,
} from "drizzle-orm/pg-core"; } from "drizzle-orm/pg-core";
import { sql } from "drizzle-orm";
import { users } from "./users"; import { users } from "./users";
import { recordings } from "./recordings"; import { recordings } from "./recordings";
@@ -68,6 +69,11 @@ export const user_points = pgTable(
(t) => [ (t) => [
index("user_points_user_idx").on(t.user_id), index("user_points_user_idx").on(t.user_id),
index("user_points_date_idx").on(t.date_created), index("user_points_date_idx").on(t.date_created),
uniqueIndex("user_points_unique_action_recording")
.on(t.user_id, t.action, t.recording_id)
.where(
sql`"action" IN ('RECORDING_CREATE', 'RECORDING_FEATURED') AND "recording_id" IS NOT NULL`,
),
], ],
); );

View File

@@ -3,8 +3,8 @@ import { builder } from "../builder";
import { CommentType, AdminCommentListType } from "../types/index"; import { CommentType, AdminCommentListType } from "../types/index";
import { comments, users } from "../../db/schema/index"; import { comments, users } from "../../db/schema/index";
import { eq, and, desc, ilike, count } from "drizzle-orm"; import { eq, and, desc, ilike, count } from "drizzle-orm";
import { awardPoints, checkAchievements } from "../../lib/gamification";
import { requireOwnerOrAdmin, requireAdmin } from "../../lib/acl"; import { requireOwnerOrAdmin, requireAdmin } from "../../lib/acl";
import { gamificationQueue } from "../../queues/index";
builder.queryField("commentsForVideo", (t) => builder.queryField("commentsForVideo", (t) =>
t.field({ t.field({
@@ -59,10 +59,16 @@ builder.mutationField("createCommentForVideo", (t) =>
}) })
.returning(); .returning();
// Gamification (non-blocking) await gamificationQueue.add("awardPoints", {
awardPoints(ctx.db, ctx.currentUser.id, "COMMENT_CREATE") job: "awardPoints",
.then(() => checkAchievements(ctx.db, ctx.currentUser!.id, "social")) userId: ctx.currentUser.id,
.catch((e) => console.error("Gamification error on comment:", e)); action: "COMMENT_CREATE",
});
await gamificationQueue.add("checkAchievements", {
job: "checkAchievements",
userId: ctx.currentUser.id,
category: "social",
});
const user = await ctx.db const user = await ctx.db
.select({ .select({

View File

@@ -4,8 +4,8 @@ import { RecordingType, AdminRecordingListType } from "../types/index";
import { recordings, recording_plays } from "../../db/schema/index"; import { recordings, recording_plays } from "../../db/schema/index";
import { eq, and, desc, ilike, count, type SQL } from "drizzle-orm"; import { eq, and, desc, ilike, count, type SQL } from "drizzle-orm";
import { slugify } from "../../lib/slugify"; import { slugify } from "../../lib/slugify";
import { awardPoints, checkAchievements } from "../../lib/gamification";
import { requireAdmin } from "../../lib/acl"; import { requireAdmin } from "../../lib/acl";
import { gamificationQueue } from "../../queues/index";
builder.queryField("recordings", (t) => builder.queryField("recordings", (t) =>
t.field({ t.field({
@@ -122,11 +122,18 @@ builder.mutationField("createRecording", (t) =>
const recording = newRecording[0]; const recording = newRecording[0];
// Gamification (non-blocking)
if (recording.status === "published") { if (recording.status === "published") {
awardPoints(ctx.db, ctx.currentUser.id, "RECORDING_CREATE", recording.id) await gamificationQueue.add("awardPoints", {
.then(() => checkAchievements(ctx.db, ctx.currentUser!.id, "recordings")) job: "awardPoints",
.catch((e) => console.error("Gamification error on recording create:", e)); userId: ctx.currentUser.id,
action: "RECORDING_CREATE",
recordingId: recording.id,
});
await gamificationQueue.add("checkAchievements", {
job: "checkAchievements",
userId: ctx.currentUser.id,
category: "recordings",
});
} }
return recording; return recording;
@@ -180,15 +187,45 @@ builder.mutationField("updateRecording", (t) =>
const recording = updated[0]; const recording = updated[0];
// Gamification (non-blocking)
if (args.status === "published" && existing[0].status !== "published") { if (args.status === "published" && existing[0].status !== "published") {
awardPoints(ctx.db, ctx.currentUser.id, "RECORDING_CREATE", recording.id) // draft → published: award creation points
.then(() => checkAchievements(ctx.db, ctx.currentUser!.id, "recordings")) await gamificationQueue.add("awardPoints", {
.catch((e) => console.error("Gamification error on recording publish:", e)); job: "awardPoints",
userId: ctx.currentUser.id,
action: "RECORDING_CREATE",
recordingId: recording.id,
});
await gamificationQueue.add("checkAchievements", {
job: "checkAchievements",
userId: ctx.currentUser.id,
category: "recordings",
});
} else if (args.status === "draft" && existing[0].status === "published") {
// published → draft: revoke creation points
await gamificationQueue.add("revokePoints", {
job: "revokePoints",
userId: ctx.currentUser.id,
action: "RECORDING_CREATE",
recordingId: recording.id,
});
await gamificationQueue.add("checkAchievements", {
job: "checkAchievements",
userId: ctx.currentUser.id,
category: "recordings",
});
} else if (args.status === "published" && recording.featured && !existing[0].featured) { } else if (args.status === "published" && recording.featured && !existing[0].featured) {
awardPoints(ctx.db, ctx.currentUser.id, "RECORDING_FEATURED", recording.id) // newly featured while published: award featured bonus
.then(() => checkAchievements(ctx.db, ctx.currentUser!.id, "recordings")) await gamificationQueue.add("awardPoints", {
.catch((e) => console.error("Gamification error on recording feature:", e)); job: "awardPoints",
userId: ctx.currentUser.id,
action: "RECORDING_FEATURED",
recordingId: recording.id,
});
await gamificationQueue.add("checkAchievements", {
job: "checkAchievements",
userId: ctx.currentUser.id,
category: "recordings",
});
} }
return recording; return recording;
@@ -290,11 +327,18 @@ builder.mutationField("recordRecordingPlay", (t) =>
}) })
.returning({ id: recording_plays.id }); .returning({ id: recording_plays.id });
// Gamification (non-blocking)
if (ctx.currentUser && recording[0].user_id !== ctx.currentUser.id) { if (ctx.currentUser && recording[0].user_id !== ctx.currentUser.id) {
awardPoints(ctx.db, ctx.currentUser.id, "RECORDING_PLAY", args.recordingId) await gamificationQueue.add("awardPoints", {
.then(() => checkAchievements(ctx.db, ctx.currentUser!.id, "playback")) job: "awardPoints",
.catch((e) => console.error("Gamification error on recording play:", e)); userId: ctx.currentUser.id,
action: "RECORDING_PLAY",
recordingId: args.recordingId,
});
await gamificationQueue.add("checkAchievements", {
job: "checkAchievements",
userId: ctx.currentUser.id,
category: "playback",
});
} }
return { success: true, play_id: play[0].id }; return { success: true, play_id: play[0].id };
@@ -329,11 +373,18 @@ builder.mutationField("updateRecordingPlay", (t) =>
}) })
.where(eq(recording_plays.id, args.playId)); .where(eq(recording_plays.id, args.playId));
// Gamification (non-blocking)
if (args.completed && !wasCompleted && ctx.currentUser) { if (args.completed && !wasCompleted && ctx.currentUser) {
awardPoints(ctx.db, ctx.currentUser.id, "RECORDING_COMPLETE", existing[0].recording_id) await gamificationQueue.add("awardPoints", {
.then(() => checkAchievements(ctx.db, ctx.currentUser!.id, "playback")) job: "awardPoints",
.catch((e) => console.error("Gamification error on recording complete:", e)); userId: ctx.currentUser.id,
action: "RECORDING_COMPLETE",
recordingId: existing[0].recording_id,
});
await gamificationQueue.add("checkAchievements", {
job: "checkAchievements",
userId: ctx.currentUser.id,
category: "playback",
});
} }
return true; return true;

View File

@@ -17,6 +17,7 @@ import { redis } from "./lib/auth";
import { logger } from "./lib/logger"; import { logger } from "./lib/logger";
import { migrate } from "drizzle-orm/node-postgres/migrator"; import { migrate } from "drizzle-orm/node-postgres/migrator";
import { startMailWorker } from "./queues/workers/mail"; import { startMailWorker } from "./queues/workers/mail";
import { startGamificationWorker } from "./queues/workers/gamification";
const PORT = parseInt(process.env.PORT || "4000"); const PORT = parseInt(process.env.PORT || "4000");
const UPLOAD_DIR = process.env.UPLOAD_DIR || "/data/uploads"; const UPLOAD_DIR = process.env.UPLOAD_DIR || "/data/uploads";
@@ -31,6 +32,7 @@ async function main() {
// Start background workers // Start background workers
startMailWorker(); startMailWorker();
startGamificationWorker();
logger.info("Queue workers started"); logger.info("Queue workers started");
const fastify = Fastify({ loggerInstance: logger }); const fastify = Fastify({ loggerInstance: logger });

View File

@@ -28,13 +28,34 @@ export async function awardPoints(
recordingId?: string, recordingId?: string,
): Promise<void> { ): Promise<void> {
const points = POINT_VALUES[action]; const points = POINT_VALUES[action];
await db.insert(user_points).values({ await db
user_id: userId, .insert(user_points)
action, .values({
points, user_id: userId,
recording_id: recordingId || null, action,
date_created: new Date(), points,
}); recording_id: recordingId || null,
date_created: new Date(),
})
.onConflictDoNothing();
await updateUserStats(db, userId);
}
export async function revokePoints(
db: DB,
userId: string,
action: keyof typeof POINT_VALUES,
recordingId: string,
): Promise<void> {
await db
.delete(user_points)
.where(
and(
eq(user_points.user_id, userId),
eq(user_points.action, action),
eq(user_points.recording_id, recordingId),
),
);
await updateUserStats(db, userId); await updateUserStats(db, userId);
} }

View File

@@ -0,0 +1,6 @@
-- Partial unique index: prevents duplicate RECORDING_CREATE / RECORDING_FEATURED points
-- for the same recording. RECORDING_PLAY / RECORDING_COMPLETE are excluded so a user
-- can earn play points across multiple sessions.
CREATE UNIQUE INDEX "user_points_unique_action_recording"
ON "user_points" ("user_id", "action", "recording_id")
WHERE "action" IN ('RECORDING_CREATE', 'RECORDING_FEATURED') AND "recording_id" IS NOT NULL;

View File

@@ -5,13 +5,21 @@ import { logger } from "../lib/logger.js";
const log = logger.child({ component: "queues" }); const log = logger.child({ component: "queues" });
export const mailQueue = new Queue("mail", { connection: redisConnectionOpts }); export const mailQueue = new Queue("mail", { connection: redisConnectionOpts });
mailQueue.on("error", (err) => { mailQueue.on("error", (err) => {
log.error({ queue: "mail", err: err.message }, "Queue error"); log.error({ queue: "mail", err: err.message }, "Queue error");
}); });
log.info("Mail queue initialized"); export const gamificationQueue = new Queue("gamification", {
connection: redisConnectionOpts,
defaultJobOptions: { attempts: 3, backoff: { type: "exponential", delay: 2000 } },
});
gamificationQueue.on("error", (err) => {
log.error({ queue: "gamification", err: err.message }, "Queue error");
});
log.info("Queues initialized");
export const queues: Record<string, Queue> = { export const queues: Record<string, Queue> = {
mail: mailQueue, mail: mailQueue,
gamification: gamificationQueue,
}; };

View File

@@ -0,0 +1,49 @@
import { Worker } from "bullmq";
import { redisConnectionOpts } from "../connection.js";
import { awardPoints, revokePoints, checkAchievements } from "../../lib/gamification.js";
import { db } from "../../db/connection.js";
import { logger } from "../../lib/logger.js";
import type { POINT_VALUES } from "../../lib/gamification.js";
const log = logger.child({ component: "gamification-worker" });
export type GamificationJobData =
| { job: "awardPoints"; userId: string; action: keyof typeof POINT_VALUES; recordingId?: string }
| { job: "revokePoints"; userId: string; action: keyof typeof POINT_VALUES; recordingId: string }
| { job: "checkAchievements"; userId: string; category?: string };
export function startGamificationWorker(): Worker {
const worker = new Worker(
"gamification",
async (bullJob) => {
const data = bullJob.data as GamificationJobData;
log.info({ jobId: bullJob.id, job: data.job, userId: data.userId }, "Processing gamification job");
switch (data.job) {
case "awardPoints":
await awardPoints(db, data.userId, data.action, data.recordingId);
break;
case "revokePoints":
await revokePoints(db, data.userId, data.action, data.recordingId);
break;
case "checkAchievements":
await checkAchievements(db, data.userId, data.category);
break;
default:
throw new Error(`Unknown gamification job: ${(data as GamificationJobData).job}`);
}
log.info({ jobId: bullJob.id, job: data.job }, "Gamification job completed");
},
{ connection: redisConnectionOpts },
);
worker.on("failed", (bullJob, err) => {
log.error(
{ jobId: bullJob?.id, job: (bullJob?.data as GamificationJobData)?.job, err: err.message },
"Gamification job failed",
);
});
return worker;
}