diff --git a/packages/backend/src/db/schema/gamification.ts b/packages/backend/src/db/schema/gamification.ts index 80cacaf..02e067e 100644 --- a/packages/backend/src/db/schema/gamification.ts +++ b/packages/backend/src/db/schema/gamification.ts @@ -8,6 +8,7 @@ import { pgEnum, uniqueIndex, } from "drizzle-orm/pg-core"; +import { sql } from "drizzle-orm"; import { users } from "./users"; import { recordings } from "./recordings"; @@ -68,6 +69,11 @@ export const user_points = pgTable( (t) => [ index("user_points_user_idx").on(t.user_id), index("user_points_date_idx").on(t.date_created), + uniqueIndex("user_points_unique_action_recording") + .on(t.user_id, t.action, t.recording_id) + .where( + sql`"action" IN ('RECORDING_CREATE', 'RECORDING_FEATURED') AND "recording_id" IS NOT NULL`, + ), ], ); diff --git a/packages/backend/src/graphql/resolvers/comments.ts b/packages/backend/src/graphql/resolvers/comments.ts index 89f7733..1a96d80 100644 --- a/packages/backend/src/graphql/resolvers/comments.ts +++ b/packages/backend/src/graphql/resolvers/comments.ts @@ -3,8 +3,8 @@ import { builder } from "../builder"; import { CommentType, AdminCommentListType } from "../types/index"; import { comments, users } from "../../db/schema/index"; import { eq, and, desc, ilike, count } from "drizzle-orm"; -import { awardPoints, checkAchievements } from "../../lib/gamification"; import { requireOwnerOrAdmin, requireAdmin } from "../../lib/acl"; +import { gamificationQueue } from "../../queues/index"; builder.queryField("commentsForVideo", (t) => t.field({ @@ -59,10 +59,16 @@ builder.mutationField("createCommentForVideo", (t) => }) .returning(); - // Gamification (non-blocking) - awardPoints(ctx.db, ctx.currentUser.id, "COMMENT_CREATE") - .then(() => checkAchievements(ctx.db, ctx.currentUser!.id, "social")) - .catch((e) => console.error("Gamification error on comment:", e)); + await gamificationQueue.add("awardPoints", { + job: "awardPoints", + userId: ctx.currentUser.id, + action: "COMMENT_CREATE", + }); + await gamificationQueue.add("checkAchievements", { + job: "checkAchievements", + userId: ctx.currentUser.id, + category: "social", + }); const user = await ctx.db .select({ diff --git a/packages/backend/src/graphql/resolvers/recordings.ts b/packages/backend/src/graphql/resolvers/recordings.ts index deeeb0f..50a1ac2 100644 --- a/packages/backend/src/graphql/resolvers/recordings.ts +++ b/packages/backend/src/graphql/resolvers/recordings.ts @@ -4,8 +4,8 @@ import { RecordingType, AdminRecordingListType } from "../types/index"; import { recordings, recording_plays } from "../../db/schema/index"; import { eq, and, desc, ilike, count, type SQL } from "drizzle-orm"; import { slugify } from "../../lib/slugify"; -import { awardPoints, checkAchievements } from "../../lib/gamification"; import { requireAdmin } from "../../lib/acl"; +import { gamificationQueue } from "../../queues/index"; builder.queryField("recordings", (t) => t.field({ @@ -122,11 +122,18 @@ builder.mutationField("createRecording", (t) => const recording = newRecording[0]; - // Gamification (non-blocking) if (recording.status === "published") { - awardPoints(ctx.db, ctx.currentUser.id, "RECORDING_CREATE", recording.id) - .then(() => checkAchievements(ctx.db, ctx.currentUser!.id, "recordings")) - .catch((e) => console.error("Gamification error on recording create:", e)); + await gamificationQueue.add("awardPoints", { + job: "awardPoints", + userId: ctx.currentUser.id, + action: "RECORDING_CREATE", + recordingId: recording.id, + }); + await gamificationQueue.add("checkAchievements", { + job: "checkAchievements", + userId: ctx.currentUser.id, + category: "recordings", + }); } return recording; @@ -180,15 +187,45 @@ builder.mutationField("updateRecording", (t) => const recording = updated[0]; - // Gamification (non-blocking) if (args.status === "published" && existing[0].status !== "published") { - awardPoints(ctx.db, ctx.currentUser.id, "RECORDING_CREATE", recording.id) - .then(() => checkAchievements(ctx.db, ctx.currentUser!.id, "recordings")) - .catch((e) => console.error("Gamification error on recording publish:", e)); + // draft → published: award creation points + await gamificationQueue.add("awardPoints", { + job: "awardPoints", + userId: ctx.currentUser.id, + action: "RECORDING_CREATE", + recordingId: recording.id, + }); + await gamificationQueue.add("checkAchievements", { + job: "checkAchievements", + userId: ctx.currentUser.id, + category: "recordings", + }); + } else if (args.status === "draft" && existing[0].status === "published") { + // published → draft: revoke creation points + await gamificationQueue.add("revokePoints", { + job: "revokePoints", + userId: ctx.currentUser.id, + action: "RECORDING_CREATE", + recordingId: recording.id, + }); + await gamificationQueue.add("checkAchievements", { + job: "checkAchievements", + userId: ctx.currentUser.id, + category: "recordings", + }); } else if (args.status === "published" && recording.featured && !existing[0].featured) { - awardPoints(ctx.db, ctx.currentUser.id, "RECORDING_FEATURED", recording.id) - .then(() => checkAchievements(ctx.db, ctx.currentUser!.id, "recordings")) - .catch((e) => console.error("Gamification error on recording feature:", e)); + // newly featured while published: award featured bonus + await gamificationQueue.add("awardPoints", { + job: "awardPoints", + userId: ctx.currentUser.id, + action: "RECORDING_FEATURED", + recordingId: recording.id, + }); + await gamificationQueue.add("checkAchievements", { + job: "checkAchievements", + userId: ctx.currentUser.id, + category: "recordings", + }); } return recording; @@ -290,11 +327,18 @@ builder.mutationField("recordRecordingPlay", (t) => }) .returning({ id: recording_plays.id }); - // Gamification (non-blocking) if (ctx.currentUser && recording[0].user_id !== ctx.currentUser.id) { - awardPoints(ctx.db, ctx.currentUser.id, "RECORDING_PLAY", args.recordingId) - .then(() => checkAchievements(ctx.db, ctx.currentUser!.id, "playback")) - .catch((e) => console.error("Gamification error on recording play:", e)); + await gamificationQueue.add("awardPoints", { + job: "awardPoints", + userId: ctx.currentUser.id, + action: "RECORDING_PLAY", + recordingId: args.recordingId, + }); + await gamificationQueue.add("checkAchievements", { + job: "checkAchievements", + userId: ctx.currentUser.id, + category: "playback", + }); } return { success: true, play_id: play[0].id }; @@ -329,11 +373,18 @@ builder.mutationField("updateRecordingPlay", (t) => }) .where(eq(recording_plays.id, args.playId)); - // Gamification (non-blocking) if (args.completed && !wasCompleted && ctx.currentUser) { - awardPoints(ctx.db, ctx.currentUser.id, "RECORDING_COMPLETE", existing[0].recording_id) - .then(() => checkAchievements(ctx.db, ctx.currentUser!.id, "playback")) - .catch((e) => console.error("Gamification error on recording complete:", e)); + await gamificationQueue.add("awardPoints", { + job: "awardPoints", + userId: ctx.currentUser.id, + action: "RECORDING_COMPLETE", + recordingId: existing[0].recording_id, + }); + await gamificationQueue.add("checkAchievements", { + job: "checkAchievements", + userId: ctx.currentUser.id, + category: "playback", + }); } return true; diff --git a/packages/backend/src/index.ts b/packages/backend/src/index.ts index 3742958..973a6b5 100644 --- a/packages/backend/src/index.ts +++ b/packages/backend/src/index.ts @@ -17,6 +17,7 @@ import { redis } from "./lib/auth"; import { logger } from "./lib/logger"; import { migrate } from "drizzle-orm/node-postgres/migrator"; import { startMailWorker } from "./queues/workers/mail"; +import { startGamificationWorker } from "./queues/workers/gamification"; const PORT = parseInt(process.env.PORT || "4000"); const UPLOAD_DIR = process.env.UPLOAD_DIR || "/data/uploads"; @@ -31,6 +32,7 @@ async function main() { // Start background workers startMailWorker(); + startGamificationWorker(); logger.info("Queue workers started"); const fastify = Fastify({ loggerInstance: logger }); diff --git a/packages/backend/src/lib/gamification.ts b/packages/backend/src/lib/gamification.ts index 107a64f..b26457e 100644 --- a/packages/backend/src/lib/gamification.ts +++ b/packages/backend/src/lib/gamification.ts @@ -28,13 +28,34 @@ export async function awardPoints( recordingId?: string, ): Promise { const points = POINT_VALUES[action]; - await db.insert(user_points).values({ - user_id: userId, - action, - points, - recording_id: recordingId || null, - date_created: new Date(), - }); + await db + .insert(user_points) + .values({ + user_id: userId, + action, + points, + recording_id: recordingId || null, + date_created: new Date(), + }) + .onConflictDoNothing(); + await updateUserStats(db, userId); +} + +export async function revokePoints( + db: DB, + userId: string, + action: keyof typeof POINT_VALUES, + recordingId: string, +): Promise { + await db + .delete(user_points) + .where( + and( + eq(user_points.user_id, userId), + eq(user_points.action, action), + eq(user_points.recording_id, recordingId), + ), + ); await updateUserStats(db, userId); } diff --git a/packages/backend/src/migrations/0004_gamification_unique.sql b/packages/backend/src/migrations/0004_gamification_unique.sql new file mode 100644 index 0000000..38241a8 --- /dev/null +++ b/packages/backend/src/migrations/0004_gamification_unique.sql @@ -0,0 +1,6 @@ +-- Partial unique index: prevents duplicate RECORDING_CREATE / RECORDING_FEATURED points +-- for the same recording. RECORDING_PLAY / RECORDING_COMPLETE are excluded so a user +-- can earn play points across multiple sessions. +CREATE UNIQUE INDEX "user_points_unique_action_recording" +ON "user_points" ("user_id", "action", "recording_id") +WHERE "action" IN ('RECORDING_CREATE', 'RECORDING_FEATURED') AND "recording_id" IS NOT NULL; diff --git a/packages/backend/src/queues/index.ts b/packages/backend/src/queues/index.ts index e07f0f4..fe0de02 100644 --- a/packages/backend/src/queues/index.ts +++ b/packages/backend/src/queues/index.ts @@ -5,13 +5,21 @@ import { logger } from "../lib/logger.js"; const log = logger.child({ component: "queues" }); export const mailQueue = new Queue("mail", { connection: redisConnectionOpts }); - mailQueue.on("error", (err) => { log.error({ queue: "mail", err: err.message }, "Queue error"); }); -log.info("Mail queue initialized"); +export const gamificationQueue = new Queue("gamification", { + connection: redisConnectionOpts, + defaultJobOptions: { attempts: 3, backoff: { type: "exponential", delay: 2000 } }, +}); +gamificationQueue.on("error", (err) => { + log.error({ queue: "gamification", err: err.message }, "Queue error"); +}); + +log.info("Queues initialized"); export const queues: Record = { mail: mailQueue, + gamification: gamificationQueue, }; diff --git a/packages/backend/src/queues/workers/gamification.ts b/packages/backend/src/queues/workers/gamification.ts new file mode 100644 index 0000000..13ca5da --- /dev/null +++ b/packages/backend/src/queues/workers/gamification.ts @@ -0,0 +1,49 @@ +import { Worker } from "bullmq"; +import { redisConnectionOpts } from "../connection.js"; +import { awardPoints, revokePoints, checkAchievements } from "../../lib/gamification.js"; +import { db } from "../../db/connection.js"; +import { logger } from "../../lib/logger.js"; +import type { POINT_VALUES } from "../../lib/gamification.js"; + +const log = logger.child({ component: "gamification-worker" }); + +export type GamificationJobData = + | { job: "awardPoints"; userId: string; action: keyof typeof POINT_VALUES; recordingId?: string } + | { job: "revokePoints"; userId: string; action: keyof typeof POINT_VALUES; recordingId: string } + | { job: "checkAchievements"; userId: string; category?: string }; + +export function startGamificationWorker(): Worker { + const worker = new Worker( + "gamification", + async (bullJob) => { + const data = bullJob.data as GamificationJobData; + log.info({ jobId: bullJob.id, job: data.job, userId: data.userId }, "Processing gamification job"); + + switch (data.job) { + case "awardPoints": + await awardPoints(db, data.userId, data.action, data.recordingId); + break; + case "revokePoints": + await revokePoints(db, data.userId, data.action, data.recordingId); + break; + case "checkAchievements": + await checkAchievements(db, data.userId, data.category); + break; + default: + throw new Error(`Unknown gamification job: ${(data as GamificationJobData).job}`); + } + + log.info({ jobId: bullJob.id, job: data.job }, "Gamification job completed"); + }, + { connection: redisConnectionOpts }, + ); + + worker.on("failed", (bullJob, err) => { + log.error( + { jobId: bullJob?.id, job: (bullJob?.data as GamificationJobData)?.job, err: err.message }, + "Gamification job failed", + ); + }); + + return worker; +}