import pMap from "p-map" import { Mailer } from "../lib/Mailer" import { logger } from "../utils/logger" import { prisma } from "../utils/prisma" import { cronJob } from "./cron.utils" import { subSeconds } from "date-fns" export const sendMessagesCron = cronJob("sendMessages", async () => { const organizations = await prisma.organization.findMany() for (const organization of organizations) { const [smtpSettings, emailSettings, generalSettings] = await Promise.all([ prisma.smtpSettings.findFirst({ where: { organizationId: organization.id }, }), prisma.emailDeliverySettings.findFirst({ where: { organizationId: organization.id }, }), prisma.generalSettings.findFirst({ where: { organizationId: organization.id }, }), ]) if (!smtpSettings || !emailSettings) { logger.warn( `Required settings not found for org ${organization.id}, skipping` ) continue } const windowStart = subSeconds(new Date(), emailSettings.rateWindow) const sentInWindow = await prisma.message.count({ where: { status: { in: ["PENDING", "SENT", "OPENED", "CLICKED"], }, sentAt: { gte: windowStart, }, Campaign: { organizationId: organization.id, }, }, }) const availableSlots = Math.max(0, emailSettings.rateLimit - sentInWindow) if (availableSlots === 0) { continue } // Message status is now independent of campaign status. // This allows retrying individual messages even for completed campaigns. // We only filter by QUEUED and RETRYING message statuses. const messages = await prisma.message.findMany({ where: { Campaign: { organizationId: organization.id, }, OR: [ { status: "QUEUED" }, { status: "RETRYING", lastTriedAt: { lte: subSeconds(new Date(), emailSettings.retryDelay), }, }, ], }, include: { Subscriber: { select: { email: true, }, }, Campaign: { select: { subject: true, }, }, }, take: availableSlots, }) const noMoreRetryingMessages = await prisma.message.count({ where: { status: "RETRYING", Campaign: { organizationId: organization.id, }, }, }) if (!messages.length && noMoreRetryingMessages === 0) { await prisma.campaign.updateMany({ where: { status: "SENDING", organizationId: organization.id, Messages: { every: { status: { in: ["SENT", "FAILED", "OPENED", "CLICKED", "CANCELLED"], }, }, }, }, data: { status: "COMPLETED", completedAt: new Date(), }, }) continue } logger.info(`Found ${messages.length} messages to send`) const mailer = new Mailer({ ...smtpSettings, timeout: emailSettings.connectionTimeout, }) const fromName = smtpSettings.fromName ?? generalSettings?.defaultFromName ?? "" const fromEmail = smtpSettings.fromEmail ?? generalSettings?.defaultFromEmail ?? "" if (!fromName || !fromEmail) { logger.warn("No from name or email found, message will not be sent") continue } await pMap( messages, async (message) => { if (!message.Campaign.subject) { logger.warn("No subject found for campaign") return } await prisma.message.update({ where: { id: message.id }, data: { status: "PENDING" }, }) try { const result = await mailer.sendEmail({ to: message.Subscriber.email, subject: message.Campaign.subject, html: message.content, from: `${fromName} <${fromEmail}>`, }) await prisma.message.update({ where: { id: message.id }, data: { messageId: result.messageId, status: result.success ? "SENT" : message.tries >= emailSettings.maxRetries ? "FAILED" : "RETRYING", sentAt: result.success ? new Date() : undefined, tries: { increment: 1, }, lastTriedAt: new Date(), }, }) } catch (error) { await prisma.message.update({ where: { id: message.id }, data: { status: message.tries >= emailSettings.maxRetries ? "FAILED" : "RETRYING", error: String(error), tries: { increment: 1, }, lastTriedAt: new Date(), }, }) } }, { concurrency: emailSettings.concurrency } ) } })