From 2d67c07044640a6a6dce80d720ec4fda4fe197c6 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 30 Apr 2026 12:25:23 +0100 Subject: [PATCH 1/8] feat(schedule-engine): stop persisting per-tick schedule state MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Each scheduled-task tick previously issued 3 Prisma UPDATEs against TaskSchedule.lastRunTriggeredAt, TaskScheduleInstance.lastScheduledTimestamp, and TaskScheduleInstance.nextScheduledTimestamp. All three were pure denormalization — every value can be derived without persisting. Engine - Drop the three per-tick prisma.update calls. - Refactor registerNextTaskScheduleInstance to take a fromTimestamp arg instead of reading instance.lastScheduledTimestamp from the DB. - Add optional lastScheduleTime to the schedule worker payload so the previous fire time travels forward via Redis. payload.lastTimestamp is now sourced from the worker payload, not a DB column. First-ever fires still report undefined so customer "first-run" sentinel patterns keep working. - For in-flight Redis jobs enqueued before this change (which lack lastScheduleTime), fall back to instance.lastScheduledTimestamp once. After those drain, the column is never read again. Schema - Mark the three columns @deprecated via triple-slash Prisma docstrings. No migration — columns remain in place so revert is code-only. They can be dropped in a follow-up once the rollout is stable. Webapp - ScheduleListPresenter derives the dashboard "Last run" cell from the cron expression's previous slot, gated on schedule.createdAt so brand-new schedules show "–". UI is best-effort; runs page is the source of truth. - API responses (api.v1.schedules.*) already compute nextRun from cron; no public API change. lastTimestamp on the SDK payload retains Date | undefined semantics — no SDK change either. Tests - scheduleEngine integration test asserts first-fire lastTimestamp is undefined and the second fire carries the previous fire's timestamp exactly. - scheduleRecovery tests no longer assert against the deprecated nextScheduledTimestamp column; presence of the worker job is the source of truth. References - New references/scheduled-tasks project with declarative schedules at multiple cadences plus three validators (first-fire-detector, interval-validator, upcoming-validator) that throw on FAIL — used for E2E-verifying the worker-payload flow. Refs TRI-8891 --- .../stop-persisting-schedule-tick-state.md | 6 + .../v3/ScheduleListPresenter.server.ts | 18 +- .../v3/utils/calculateNextSchedule.server.ts | 14 ++ .../database/prisma/schema.prisma | 3 + .../schedule-engine/src/engine/index.ts | 97 ++++---- .../schedule-engine/src/engine/types.ts | 2 + .../src/engine/workerCatalog.ts | 5 + .../test/scheduleEngine.test.ts | 87 +++---- .../test/scheduleRecovery.test.ts | 22 +- pnpm-lock.yaml | 19 ++ references/scheduled-tasks/README.md | 39 ++++ references/scheduled-tasks/package.json | 18 ++ .../scheduled-tasks/src/trigger/schedules.ts | 216 ++++++++++++++++++ references/scheduled-tasks/trigger.config.ts | 21 ++ references/scheduled-tasks/tsconfig.json | 14 ++ 15 files changed, 453 insertions(+), 128 deletions(-) create mode 100644 .server-changes/stop-persisting-schedule-tick-state.md create mode 100644 references/scheduled-tasks/README.md create mode 100644 references/scheduled-tasks/package.json create mode 100644 references/scheduled-tasks/src/trigger/schedules.ts create mode 100644 references/scheduled-tasks/trigger.config.ts create mode 100644 references/scheduled-tasks/tsconfig.json diff --git a/.server-changes/stop-persisting-schedule-tick-state.md b/.server-changes/stop-persisting-schedule-tick-state.md new file mode 100644 index 00000000000..859ef0e2b78 --- /dev/null +++ b/.server-changes/stop-persisting-schedule-tick-state.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: improvement +--- + +Stop writing per-tick state (`lastScheduledTimestamp`, `nextScheduledTimestamp`, `lastRunTriggeredAt`) on `TaskSchedule` and `TaskScheduleInstance`. The schedule engine now carries the previous fire time forward via the worker queue payload, eliminating ~270K dead-tuple-driven autovacuums per year on these hot tables and the associated `IO:XactSync` mini-spikes on the writer. Customer-facing `payload.lastTimestamp` semantics are unchanged. diff --git a/apps/webapp/app/presenters/v3/ScheduleListPresenter.server.ts b/apps/webapp/app/presenters/v3/ScheduleListPresenter.server.ts index 22c151d720b..aa9b233c71b 100644 --- a/apps/webapp/app/presenters/v3/ScheduleListPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/ScheduleListPresenter.server.ts @@ -6,7 +6,10 @@ import { getLimit } from "~/services/platform.v3.server"; import { findCurrentWorkerFromEnvironment } from "~/v3/models/workerDeployment.server"; import { ServiceValidationError } from "~/v3/services/baseService.server"; import { CheckScheduleService } from "~/v3/services/checkSchedule.server"; -import { calculateNextScheduledTimestampFromNow } from "~/v3/utils/calculateNextSchedule.server"; +import { + calculateNextScheduledTimestampFromNow, + previousScheduledTimestamp, +} from "~/v3/utils/calculateNextSchedule.server"; import { BasePresenter } from "./basePresenter.server"; type ScheduleListOptions = { @@ -193,7 +196,6 @@ export class ScheduleListPresenter extends BasePresenter { }, }, active: true, - lastRunTriggeredAt: true, createdAt: true, }, where: { @@ -244,6 +246,16 @@ export class ScheduleListPresenter extends BasePresenter { }); const schedules: ScheduleListItem[] = rawSchedules.map((schedule) => { + // Approximate "last run" from the cron's previous slot. If that slot + // predates the schedule itself, the schedule hasn't fired yet — show + // undefined rather than a misleading timestamp. UI is best-effort; + // accurate run history is on the schedule's runs page. + const cronPrev = previousScheduledTimestamp( + schedule.generatorExpression, + schedule.timezone + ); + const lastRun = cronPrev.getTime() > schedule.createdAt.getTime() ? cronPrev : undefined; + return { id: schedule.id, type: schedule.type, @@ -256,7 +268,7 @@ export class ScheduleListPresenter extends BasePresenter { timezone: schedule.timezone, active: schedule.active, externalId: schedule.externalId, - lastRun: schedule.lastRunTriggeredAt ?? undefined, + lastRun, nextRun: calculateNextScheduledTimestampFromNow( schedule.generatorExpression, schedule.timezone diff --git a/apps/webapp/app/v3/utils/calculateNextSchedule.server.ts b/apps/webapp/app/v3/utils/calculateNextSchedule.server.ts index 68adbd3a4b4..9be995e4aa1 100644 --- a/apps/webapp/app/v3/utils/calculateNextSchedule.server.ts +++ b/apps/webapp/app/v3/utils/calculateNextSchedule.server.ts @@ -22,6 +22,20 @@ function calculateNextStep(schedule: string, timezone: string | null, currentDat .toDate(); } +export function previousScheduledTimestamp( + schedule: string, + timezone: string | null, + fromTimestamp: Date = new Date() +) { + return parseExpression(schedule, { + currentDate: fromTimestamp, + utc: timezone === null, + tz: timezone ?? undefined, + }) + .prev() + .toDate(); +} + export function nextScheduledTimestamps( cron: string, timezone: string | null, diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index ee75ce82b5f..c7b5e7ce12b 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -2128,6 +2128,7 @@ model TaskSchedule { ///Instances of the schedule that are active instances TaskScheduleInstance[] + /// @deprecated stop writing 2026-04-30; reads moved out of code (UI now derives from cron's previous slot). Drop in follow-up. lastRunTriggeredAt DateTime? project Project @relation(fields: [projectId], references: [id], onDelete: Cascade, onUpdate: Cascade) @@ -2173,7 +2174,9 @@ model TaskScheduleInstance { active Boolean @default(true) + /// @deprecated stop writing 2026-04-30; engine derives from cron + exactScheduleTime. Drop in follow-up. lastScheduledTimestamp DateTime? + /// @deprecated stop writing 2026-04-30; engine derives from cron + now(). Drop in follow-up. nextScheduledTimestamp DateTime? //you can only have a schedule attached to each environment once diff --git a/internal-packages/schedule-engine/src/engine/index.ts b/internal-packages/schedule-engine/src/engine/index.ts index 4eb641176b2..bd5909e5301 100644 --- a/internal-packages/schedule-engine/src/engine/index.ts +++ b/internal-packages/schedule-engine/src/engine/index.ts @@ -171,13 +171,13 @@ export class ScheduleEngine { instance.taskSchedule.generatorExpression ); - const lastScheduledTimestamp = instance.lastScheduledTimestamp ?? new Date(); - span.setAttribute("last_scheduled_timestamp", lastScheduledTimestamp.toISOString()); + const fromTimestamp = params.fromTimestamp ?? new Date(); + span.setAttribute("from_timestamp", fromTimestamp.toISOString()); const nextScheduledTimestamp = calculateNextScheduledTimestamp( instance.taskSchedule.generatorExpression, instance.taskSchedule.timezone, - lastScheduledTimestamp + fromTimestamp ); span.setAttribute("next_scheduled_timestamp", nextScheduledTimestamp.toISOString()); @@ -194,17 +194,16 @@ export class ScheduleEngine { timezone: instance.taskSchedule.timezone, }); - await this.prisma.taskScheduleInstance.update({ - where: { - id: params.instanceId, - }, - data: { - nextScheduledTimestamp, - }, - }); - - // Enqueue the scheduled task - await this.enqueueScheduledTask(params.instanceId, nextScheduledTimestamp); + // Enqueue the scheduled task. Pass fromTimestamp through as the next + // job's lastScheduleTime so the dequeueing engine can populate + // payload.lastTimestamp without re-reading state from the DB. When + // fromTimestamp is undefined (first-ever registration / recovery), + // lastScheduleTime is also undefined → consumer reports undefined. + await this.enqueueScheduledTask( + params.instanceId, + nextScheduledTimestamp, + params.fromTimestamp + ); // Record metrics this.scheduleRegistrationCounter.add(1, { @@ -244,6 +243,7 @@ export class ScheduleEngine { instanceId: payload.instanceId, finalAttempt: false, // TODO: implement retry logic exactScheduleTime: payload.exactScheduleTime, + lastScheduleTime: payload.lastScheduleTime, }); } @@ -350,14 +350,6 @@ export class ScheduleEngine { skipReason = "schedule_inactive"; } - if (!instance.nextScheduledTimestamp) { - this.logger.debug("No next scheduled timestamp", { - instanceId: params.instanceId, - }); - shouldTrigger = false; - skipReason = "no_next_timestamp"; - } - // For development environments, check if there's an active session if (instance.environment.type === "DEVELOPMENT") { this.devEnvironmentCheckCounter.add(1, { @@ -396,15 +388,26 @@ export class ScheduleEngine { } // Calculate the schedule timestamp that will be used (regardless of whether we trigger or not) - const scheduleTimestamp = - params.exactScheduleTime ?? instance.nextScheduledTimestamp ?? new Date(); + const scheduleTimestamp = params.exactScheduleTime ?? new Date(); if (shouldTrigger) { + // payload.lastTimestamp is the actual previous fire time. Sources, in + // order: + // 1. params.lastScheduleTime — populated by the engine when this + // job was enqueued. Always present for jobs enqueued post-deploy. + // 2. instance.lastScheduledTimestamp — backward-compat fallback for + // in-flight Redis jobs enqueued by older engines that didn't + // include lastScheduleTime in the payload. Once those drain + // this fallback never triggers and we can drop the column. + // 3. undefined — first-ever fire (no previous fire to point at). + const lastTimestamp = + params.lastScheduleTime ?? instance.lastScheduledTimestamp ?? undefined; + const payload = { scheduleId: instance.taskSchedule.friendlyId, type: instance.taskSchedule.type as "DECLARATIVE" | "IMPERATIVE", timestamp: scheduleTimestamp, - lastTimestamp: instance.lastScheduledTimestamp ?? undefined, + lastTimestamp, externalId: instance.taskSchedule.externalId ?? undefined, timezone: instance.taskSchedule.timezone, upcoming: nextScheduledTimestamps( @@ -428,7 +431,7 @@ export class ScheduleEngine { scheduleTimestamp: scheduleTimestamp.toISOString(), actualExecutionTime: actualExecutionTime.toISOString(), schedulingAccuracyMs, - lastTimestamp: instance.lastScheduledTimestamp?.toISOString(), + lastTimestamp: lastTimestamp?.toISOString(), }); const triggerStartTime = Date.now(); @@ -473,16 +476,6 @@ export class ScheduleEngine { ); } else if (result) { if (result.success) { - // Update the last run triggered timestamp - await this.prisma.taskSchedule.update({ - where: { - id: instance.taskSchedule.id, - }, - data: { - lastRunTriggeredAt: new Date(), - }, - }); - this.logger.info("Successfully triggered scheduled task", { instanceId: params.instanceId, taskIdentifier: instance.taskSchedule.taskIdentifier, @@ -542,20 +535,14 @@ export class ScheduleEngine { }); } - // Always update the last scheduled timestamp and register next run - await this.prisma.taskScheduleInstance.update({ - where: { - id: params.instanceId, - }, - data: { - lastScheduledTimestamp: scheduleTimestamp, - }, - }); - - // Register the next run + // Register the next run, calculating from the timestamp we just fired (or + // skipped) so we don't need to round-trip through DB state. // Rewritten try/catch to use tryCatch utility const [nextRunError] = await tryCatch( - this.registerNextTaskScheduleInstance({ instanceId: params.instanceId }) + this.registerNextTaskScheduleInstance({ + instanceId: params.instanceId, + fromTimestamp: scheduleTimestamp, + }) ); if (nextRunError) { this.logger.error("Failed to schedule next run after execution", { @@ -610,10 +597,17 @@ export class ScheduleEngine { /** * Enqueues a scheduled task with distributed execution timing */ - private async enqueueScheduledTask(instanceId: string, exactScheduleTime: Date) { + private async enqueueScheduledTask( + instanceId: string, + exactScheduleTime: Date, + lastScheduleTime?: Date + ) { return startSpan(this.tracer, "enqueueScheduledTask", async (span) => { span.setAttribute("instanceId", instanceId); span.setAttribute("exactScheduleTime", exactScheduleTime.toISOString()); + if (lastScheduleTime) { + span.setAttribute("lastScheduleTime", lastScheduleTime.toISOString()); + } const distributedExecutionTime = calculateDistributedExecutionTime( exactScheduleTime, @@ -646,6 +640,7 @@ export class ScheduleEngine { payload: { instanceId, exactScheduleTime, + lastScheduleTime, }, availableAt: distributedExecutionTime, }); @@ -698,8 +693,6 @@ export class ScheduleEngine { select: { id: true, environmentId: true, - lastScheduledTimestamp: true, - nextScheduledTimestamp: true, }, }, }, @@ -774,8 +767,6 @@ export class ScheduleEngine { instance: { id: string; environmentId: string; - lastScheduledTimestamp: Date | null; - nextScheduledTimestamp: Date | null; }; schedule: { id: string; generatorExpression: string }; }) { diff --git a/internal-packages/schedule-engine/src/engine/types.ts b/internal-packages/schedule-engine/src/engine/types.ts index f4888c447ae..f424ee6a4d3 100644 --- a/internal-packages/schedule-engine/src/engine/types.ts +++ b/internal-packages/schedule-engine/src/engine/types.ts @@ -74,8 +74,10 @@ export interface TriggerScheduleParams { instanceId: string; finalAttempt: boolean; exactScheduleTime?: Date; + lastScheduleTime?: Date; } export interface RegisterScheduleInstanceParams { instanceId: string; + fromTimestamp?: Date; } diff --git a/internal-packages/schedule-engine/src/engine/workerCatalog.ts b/internal-packages/schedule-engine/src/engine/workerCatalog.ts index ee634c6fb54..c960f458f88 100644 --- a/internal-packages/schedule-engine/src/engine/workerCatalog.ts +++ b/internal-packages/schedule-engine/src/engine/workerCatalog.ts @@ -5,6 +5,11 @@ export const scheduleWorkerCatalog = { schema: z.object({ instanceId: z.string(), exactScheduleTime: z.coerce.date(), + // Optional for backward compat with in-flight jobs enqueued by older + // engines. After deploy, every newly-enqueued job populates this with + // the just-fired schedule time so the next dequeue can report + // payload.lastTimestamp accurately without a DB round-trip. + lastScheduleTime: z.coerce.date().optional(), }), visibilityTimeoutMs: 60_000, retry: { diff --git a/internal-packages/schedule-engine/test/scheduleEngine.test.ts b/internal-packages/schedule-engine/test/scheduleEngine.test.ts index 99bac0936df..c597a936cf9 100644 --- a/internal-packages/schedule-engine/test/scheduleEngine.test.ts +++ b/internal-packages/schedule-engine/test/scheduleEngine.test.ts @@ -111,17 +111,12 @@ describe("ScheduleEngine Integration", () => { expectedUpcoming.push(upcoming); } - // Manually enqueue the first scheduled task to kick off the lifecycle + // Manually enqueue the first scheduled task to kick off the lifecycle. + // Engine no longer persists nextScheduledTimestamp — the same time can be + // reproduced from the cron expression alone, so we use expectedExecutionTime + // directly downstream. await engine.registerNextTaskScheduleInstance({ instanceId: scheduleInstance.id }); - // Get the actual nextScheduledTimestamp that was calculated by the engine - const instanceAfterRegistration = await prisma.taskScheduleInstance.findFirst({ - where: { id: scheduleInstance.id }, - }); - const actualNextExecution = instanceAfterRegistration?.nextScheduledTimestamp; - expect(actualNextExecution).toBeDefined(); - expect(actualNextExecution).toEqual(expectedExecutionTime); - // Wait for the first execution console.log("Waiting for first execution..."); const startTime = Date.now(); @@ -201,64 +196,44 @@ describe("ScheduleEngine Integration", () => { payload: { scheduleId: "sched_abc123", type: "DECLARATIVE", - timestamp: actualNextExecution, - lastTimestamp: undefined, // First run has no lastTimestamp + timestamp: expectedExecutionTime, + // First-ever fire: cron's previous slot predates instance.createdAt + // (which was set ~now), so lastTimestamp is undefined. This preserves + // the `if (!payload.lastTimestamp)` first-run sentinel customers rely on. + lastTimestamp: undefined, externalId: "ext-123", timezone: "UTC", upcoming: expect.arrayContaining([expect.any(Date)]), }, scheduleInstanceId: scheduleInstance.id, scheduleId: taskSchedule.id, - exactScheduleTime: actualNextExecution, + exactScheduleTime: expectedExecutionTime, }); // Verify the second execution parameters - if (actualNextExecution) { - const expectedSecondExecution = new Date(actualNextExecution); - expectedSecondExecution.setMinutes(actualNextExecution.getMinutes() + 1); + const expectedSecondExecution = new Date(expectedExecutionTime); + expectedSecondExecution.setMinutes(expectedExecutionTime.getMinutes() + 1); - expect(secondExecution.params).toEqual({ - taskIdentifier: "test-task", - environment: expect.objectContaining({ - id: environment.id, - type: "PRODUCTION", - }), - payload: { - scheduleId: "sched_abc123", - type: "DECLARATIVE", - timestamp: expectedSecondExecution, - lastTimestamp: actualNextExecution, // Second run should have the first execution time as lastTimestamp - externalId: "ext-123", - timezone: "UTC", - upcoming: expect.arrayContaining([expect.any(Date)]), - }, - scheduleInstanceId: scheduleInstance.id, - scheduleId: taskSchedule.id, - exactScheduleTime: expectedSecondExecution, - }); - } - - // Verify database updates occurred after both executions - const updatedSchedule = await prisma.taskSchedule.findFirst({ - where: { id: taskSchedule.id }, - }); - expect(updatedSchedule?.lastRunTriggeredAt).toBeTruthy(); - expect(updatedSchedule?.lastRunTriggeredAt).toBeInstanceOf(Date); - - const finalInstance = await prisma.taskScheduleInstance.findFirst({ - where: { id: scheduleInstance.id }, + expect(secondExecution.params).toEqual({ + taskIdentifier: "test-task", + environment: expect.objectContaining({ + id: environment.id, + type: "PRODUCTION", + }), + payload: { + scheduleId: "sched_abc123", + type: "DECLARATIVE", + timestamp: expectedSecondExecution, + // Previous slot before second execution = first execution time. + lastTimestamp: expectedExecutionTime, + externalId: "ext-123", + timezone: "UTC", + upcoming: expect.arrayContaining([expect.any(Date)]), + }, + scheduleInstanceId: scheduleInstance.id, + scheduleId: taskSchedule.id, + exactScheduleTime: expectedSecondExecution, }); - - // After two executions, lastScheduledTimestamp should be the second execution time - if (actualNextExecution && secondExecution.params.exactScheduleTime) { - const secondExecutionTime = secondExecution.params.exactScheduleTime; - expect(finalInstance?.lastScheduledTimestamp).toEqual(secondExecutionTime); - - // The next scheduled timestamp should be 1 minute after the second execution - const expectedThirdExecution = new Date(secondExecutionTime); - expectedThirdExecution.setMinutes(secondExecutionTime.getMinutes() + 1); - expect(finalInstance?.nextScheduledTimestamp).toEqual(expectedThirdExecution); - } } finally { // Clean up: stop the worker await engine.quit(); diff --git a/internal-packages/schedule-engine/test/scheduleRecovery.test.ts b/internal-packages/schedule-engine/test/scheduleRecovery.test.ts index 99a3351aed7..586ea73fc3a 100644 --- a/internal-packages/schedule-engine/test/scheduleRecovery.test.ts +++ b/internal-packages/schedule-engine/test/scheduleRecovery.test.ts @@ -93,18 +93,14 @@ describe("Schedule Recovery", () => { // Perform recovery await engine.recoverSchedulesInEnvironment(project.id, environment.id); - // Verify that a job was created + // Verify that a job was created. The engine no longer persists + // nextScheduledTimestamp; correctness is now determined entirely by the + // job sitting in the worker queue. const jobAfterRecovery = await engine.getJob( `scheduled-task-instance:${scheduleInstance.id}` ); expect(jobAfterRecovery).not.toBeNull(); expect(jobAfterRecovery?.job).toBe("schedule.triggerScheduledTask"); - - // Verify the instance was updated with next scheduled timestamp - const updatedInstance = await prisma.taskScheduleInstance.findFirst({ - where: { id: scheduleInstance.id }, - }); - expect(updatedInstance?.nextScheduledTimestamp).toBeDefined(); } finally { await engine.quit(); } @@ -313,20 +309,14 @@ describe("Schedule Recovery", () => { // Perform recovery await engine.recoverSchedulesInEnvironment(project.id, environment.id); - // Verify that jobs were created for all instances + // Verify that jobs were created for all instances. The engine no longer + // persists nextScheduledTimestamp — the worker-queue presence is the + // source of truth. for (const instance of instances) { const job = await engine.getJob(`scheduled-task-instance:${instance.id}`); expect(job).not.toBeNull(); expect(job?.job).toBe("schedule.triggerScheduledTask"); } - - // Verify all instances were updated - for (const instance of instances) { - const updatedInstance = await prisma.taskScheduleInstance.findFirst({ - where: { id: instance.id }, - }); - expect(updatedInstance?.nextScheduledTimestamp).toBeDefined(); - } } finally { await engine.quit(); } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 2ab379c6a8c..52298e5887d 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -2828,6 +2828,25 @@ importers: specifier: 5.5.4 version: 5.5.4 + references/scheduled-tasks: + dependencies: + '@trigger.dev/sdk': + specifier: workspace:* + version: link:../../packages/trigger-sdk + zod: + specifier: 3.25.76 + version: 3.25.76 + devDependencies: + '@trigger.dev/build': + specifier: workspace:* + version: link:../../packages/build + trigger.dev: + specifier: workspace:* + version: link:../../packages/cli-v3 + typescript: + specifier: 5.5.4 + version: 5.5.4 + references/seed: dependencies: '@sinclair/typebox': diff --git a/references/scheduled-tasks/README.md b/references/scheduled-tasks/README.md new file mode 100644 index 00000000000..c6e4dada3a2 --- /dev/null +++ b/references/scheduled-tasks/README.md @@ -0,0 +1,39 @@ +# scheduled-tasks reference + +E2E test bed for the schedule engine. Designed to verify the worker-payload flow +that carries `payload.lastTimestamp` forward across fires (no DB round-trip, +no cron-derivation drift). + +## Setup + +1. Create a project in the local webapp at http://localhost:3030 and copy the + project ref from Project Settings. +2. Replace `proj_REPLACE_ME` in `trigger.config.ts` with that ref. +3. From this directory: + +```bash +pnpm exec trigger login -a http://localhost:3030 --profile local +pnpm exec trigger dev --profile local +``` + +4. Open the project in the dashboard and visit each schedule. Click "Attach to + environment" → dev. The first fire happens at the next cron slot. + +## What to look for + +- **`first-fire-detector`** — first run of a freshly-attached schedule should + log `first-fire-detector PASS (first fire)` with `lastTimestamp: null`. + Subsequent runs log `PASS (Nth fire)` with a Date. +- **`interval-validator`** — every non-first fire of an every-minute task + should log `interval-validator PASS` with `actualIntervalMs: 60000`. A + `FAIL` here means the worker payload isn't carrying the previous fire time + correctly. +- **`upcoming-validator`** — every fire should log `upcoming-validator PASS` + with 10 strictly-increasing slots, each 60s apart. +- **`every-minute`**, **`every-five-minutes`**, **`hourly-utc`** — sanity + checks across cadences. Inspect `payload` in the dashboard to confirm + `timestamp` and `lastTimestamp` look right. +- **`daily-*`** schedules — won't fire during a short dev session, but the + attach action should enqueue a Redis job for the next slot in the listed + timezone. Worth checking that next-fire time matches the expected wall-clock + in that tz. diff --git a/references/scheduled-tasks/package.json b/references/scheduled-tasks/package.json new file mode 100644 index 00000000000..916959e0bca --- /dev/null +++ b/references/scheduled-tasks/package.json @@ -0,0 +1,18 @@ +{ + "name": "references-scheduled-tasks", + "private": true, + "type": "module", + "scripts": { + "dev": "trigger dev", + "deploy": "trigger deploy" + }, + "dependencies": { + "@trigger.dev/sdk": "workspace:*", + "zod": "3.25.76" + }, + "devDependencies": { + "@trigger.dev/build": "workspace:*", + "trigger.dev": "workspace:*", + "typescript": "^5.5.4" + } +} diff --git a/references/scheduled-tasks/src/trigger/schedules.ts b/references/scheduled-tasks/src/trigger/schedules.ts new file mode 100644 index 00000000000..88be0add7cf --- /dev/null +++ b/references/scheduled-tasks/src/trigger/schedules.ts @@ -0,0 +1,216 @@ +import { logger, schedules } from "@trigger.dev/sdk"; + +/** + * Reference project for E2E-testing the schedule engine. + * + * The schedule engine carries the previous fire time forward via the worker + * queue payload (no DB round-trip). These tasks exercise the customer-visible + * surface so we can verify the flow end-to-end: + * + * - First-ever fire reports `payload.lastTimestamp === undefined`. + * - Subsequent fires report `payload.lastTimestamp` equal to the previous + * `payload.timestamp` exactly (no cron-derivation drift). + * - `payload.upcoming` is a strictly-increasing array of 10 future slots. + * - Multiple cron syntaxes and timezones all behave consistently. + * + * Validators (every-minute, interval, upcoming) emit explicit PASS / FAIL + * log lines so you can grep `trigger dev` output for regressions. + */ + +// --- Basic recurring tasks ---------------------------------------------------- + +export const everyMinute = schedules.task({ + id: "every-minute", + cron: "* * * * *", + run: async (payload) => { + logger.info("every-minute fired", { + timestamp: payload.timestamp.toISOString(), + lastTimestamp: payload.lastTimestamp?.toISOString() ?? null, + upcomingCount: payload.upcoming.length, + timezone: payload.timezone, + scheduleId: payload.scheduleId, + }); + + return { + timestamp: payload.timestamp, + lastTimestamp: payload.lastTimestamp, + }; + }, +}); + +export const everyFiveMinutes = schedules.task({ + id: "every-five-minutes", + cron: "*/5 * * * *", + run: async (payload) => { + logger.info("every-five-minutes fired", { + timestamp: payload.timestamp.toISOString(), + lastTimestamp: payload.lastTimestamp?.toISOString() ?? null, + }); + }, +}); + +export const hourlyUtc = schedules.task({ + id: "hourly-utc", + cron: "0 * * * *", + run: async (payload) => { + logger.info("hourly-utc fired", { + timestamp: payload.timestamp.toISOString(), + lastTimestamp: payload.lastTimestamp?.toISOString() ?? null, + timezone: payload.timezone, + }); + }, +}); + +// --- Timezone coverage -------------------------------------------------------- +// +// These exercise the engine's tz handling. They fire infrequently, so they're +// mostly useful for inspecting enqueued jobs in the dashboard rather than for +// short dev sessions. + +export const dailyNewYorkMorning = schedules.task({ + id: "daily-new-york-morning", + cron: { pattern: "0 9 * * *", timezone: "America/New_York" }, + run: async (payload) => { + logger.info("daily-new-york-morning fired", { + timestamp: payload.timestamp.toISOString(), + lastTimestamp: payload.lastTimestamp?.toISOString() ?? null, + timezone: payload.timezone, + // For DST-observing tz the cron interpretation may shift twice/year — + // worth eyeballing the timestamp lines up with 09:00 NY local. + }); + }, +}); + +export const dailyLondonEvening = schedules.task({ + id: "daily-london-evening", + cron: { pattern: "0 18 * * *", timezone: "Europe/London" }, + run: async (payload) => { + logger.info("daily-london-evening fired", { + timestamp: payload.timestamp.toISOString(), + lastTimestamp: payload.lastTimestamp?.toISOString() ?? null, + timezone: payload.timezone, + }); + }, +}); + +export const dailyTokyoMidnight = schedules.task({ + id: "daily-tokyo-midnight", + cron: { pattern: "0 0 * * *", timezone: "Asia/Tokyo" }, + run: async (payload) => { + logger.info("daily-tokyo-midnight fired", { + timestamp: payload.timestamp.toISOString(), + lastTimestamp: payload.lastTimestamp?.toISOString() ?? null, + timezone: payload.timezone, + }); + }, +}); + +// --- Validators --------------------------------------------------------------- +// +// These explicitly check invariants and emit PASS / FAIL log lines. Running +// `trigger dev` and watching these for several fires gives us the E2E signal +// that the worker-payload flow is correct. + +/** + * Verifies that the very first fire reports `lastTimestamp === undefined` and + * subsequent fires report a real Date. This is the customer-visible surface + * for the "first-run sentinel" pattern: `if (!payload.lastTimestamp) initOnce()`. + */ +export const firstFireDetector = schedules.task({ + id: "first-fire-detector", + cron: "* * * * *", + run: async (payload) => { + const isFirstFire = payload.lastTimestamp === undefined; + logger.info( + isFirstFire ? "first-fire-detector PASS (first fire)" : "first-fire-detector PASS (Nth fire)", + { + timestamp: payload.timestamp.toISOString(), + lastTimestamp: payload.lastTimestamp?.toISOString() ?? null, + isFirstFire, + } + ); + }, +}); + +/** + * Verifies that for non-first fires, `timestamp - lastTimestamp` equals exactly + * the cron interval (60s for every-minute). This is the key invariant of the + * workerCatalog approach — the value is carried through the Redis payload, so + * it should be exact, not a cron-derived approximation. + */ +export const intervalValidator = schedules.task({ + id: "interval-validator", + cron: "* * * * *", + run: async (payload) => { + if (payload.lastTimestamp === undefined) { + logger.info("interval-validator skipped (first fire — no lastTimestamp)", { + timestamp: payload.timestamp.toISOString(), + }); + return; + } + + const expectedIntervalMs = 60_000; + const actualIntervalMs = payload.timestamp.getTime() - payload.lastTimestamp.getTime(); + const passed = actualIntervalMs === expectedIntervalMs; + + logger.info(passed ? "interval-validator PASS" : "interval-validator FAIL", { + timestamp: payload.timestamp.toISOString(), + lastTimestamp: payload.lastTimestamp.toISOString(), + expectedIntervalMs, + actualIntervalMs, + driftMs: actualIntervalMs - expectedIntervalMs, + }); + + if (!passed) { + throw new Error( + `interval-validator FAIL: expected ${expectedIntervalMs}ms between fires, got ${actualIntervalMs}ms` + ); + } + }, +}); + +/** + * Verifies that `payload.upcoming` is a strictly-increasing array of 10 future + * slots, each 60s apart for `* * * * *`. + */ +export const upcomingValidator = schedules.task({ + id: "upcoming-validator", + cron: "* * * * *", + run: async (payload) => { + const issues: string[] = []; + + if (payload.upcoming.length !== 10) { + issues.push(`expected upcoming.length === 10, got ${payload.upcoming.length}`); + } + + for (let i = 0; i < payload.upcoming.length; i++) { + const slot = payload.upcoming[i]; + if (slot.getTime() <= payload.timestamp.getTime()) { + issues.push(`upcoming[${i}] (${slot.toISOString()}) is not strictly after timestamp`); + } + if (i > 0) { + const prev = payload.upcoming[i - 1]; + const gapMs = slot.getTime() - prev.getTime(); + if (gapMs !== 60_000) { + issues.push( + `upcoming[${i}] - upcoming[${i - 1}] = ${gapMs}ms, expected 60000ms (every-minute cron)` + ); + } + } + } + + if (issues.length === 0) { + logger.info("upcoming-validator PASS", { + timestamp: payload.timestamp.toISOString(), + upcoming: payload.upcoming.map((d) => d.toISOString()), + }); + } else { + logger.error("upcoming-validator FAIL", { + timestamp: payload.timestamp.toISOString(), + issues, + upcoming: payload.upcoming.map((d) => d.toISOString()), + }); + throw new Error(`upcoming-validator FAIL: ${issues.join("; ")}`); + } + }, +}); diff --git a/references/scheduled-tasks/trigger.config.ts b/references/scheduled-tasks/trigger.config.ts new file mode 100644 index 00000000000..68e4c7e21e5 --- /dev/null +++ b/references/scheduled-tasks/trigger.config.ts @@ -0,0 +1,21 @@ +import { defineConfig } from "@trigger.dev/sdk"; + +export default defineConfig({ + project: "proj_qcibamtpklwidqfzzyir", + runtime: "node", + machine: "small-1x", + maxDuration: 60, + dirs: ["./src/trigger"], + logLevel: "debug", + compatibilityFlags: ["run_engine_v2"], + retries: { + enabledInDev: true, + default: { + maxAttempts: 3, + minTimeoutInMs: 1000, + maxTimeoutInMs: 10000, + factor: 2, + randomize: true, + }, + }, +}); diff --git a/references/scheduled-tasks/tsconfig.json b/references/scheduled-tasks/tsconfig.json new file mode 100644 index 00000000000..3bb455e5d40 --- /dev/null +++ b/references/scheduled-tasks/tsconfig.json @@ -0,0 +1,14 @@ +{ + "compilerOptions": { + "target": "ES2023", + "module": "Node16", + "moduleResolution": "Node16", + "esModuleInterop": true, + "strict": true, + "skipLibCheck": true, + "customConditions": ["@triggerdotdev/source"], + "lib": ["DOM", "DOM.Iterable"], + "noEmit": true + }, + "include": ["./src/**/*.ts", "trigger.config.ts"] +} From 23c99d8b175672719f6d32dcdf9126567a7396b9 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 30 Apr 2026 12:53:22 +0100 Subject: [PATCH 2/8] fix(schedule-engine): split fromTimestamp from carried lastScheduleTime MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address CodeRabbit review on PR #3476: 1. Engine — split fromTimestamp from lastScheduleTime `RegisterScheduleInstanceParams.fromTimestamp` was doing double duty as both the next-cron-slot anchor and the "previous fire time" embedded in the next worker job's payload. On skipped ticks (inactive schedule, dev env disconnected, etc.) the engine still re-registered with `fromTimestamp = scheduleTimestamp`, so a long pause/disconnect would quietly overwrite the real last-fire time with a stream of skipped slots — defeating the workerCatalog accuracy benefit for the very case it was meant to handle. `RegisterScheduleInstanceParams` now has a separate optional `lastScheduleTime` field. `fromTimestamp` advances on every tick; `lastScheduleTime` only advances on real fires. After a skip the re-registration carries forward the existing `params.lastScheduleTime` (or the legacy `instance.lastScheduledTimestamp` fallback). 2. ScheduleListPresenter — guard cron back-calculation `previousScheduledTimestamp(...)` calls cron-parser, which throws on malformed expressions. A single bad row would have failed the whole schedules-list response. Wrapped per-row in try/catch so a degraded row falls back to `lastRun: undefined` instead of taking down the page. 3. scheduleEngine integration test — anchor on observed values The test compared against a precomputed "next minute boundary" Date, which is flaky if the test setup straddles a minute boundary. Switched to deriving expectations from the first observed `exactScheduleTime` and asserting relative invariants (60s gap, second lastTimestamp equals first timestamp). --- .../v3/ScheduleListPresenter.server.ts | 19 +++++-- .../schedule-engine/src/engine/index.ts | 28 +++++++--- .../schedule-engine/src/engine/types.ts | 12 ++++ .../test/scheduleEngine.test.ts | 55 +++++++++---------- 4 files changed, 71 insertions(+), 43 deletions(-) diff --git a/apps/webapp/app/presenters/v3/ScheduleListPresenter.server.ts b/apps/webapp/app/presenters/v3/ScheduleListPresenter.server.ts index aa9b233c71b..6b2130a990a 100644 --- a/apps/webapp/app/presenters/v3/ScheduleListPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/ScheduleListPresenter.server.ts @@ -249,12 +249,19 @@ export class ScheduleListPresenter extends BasePresenter { // Approximate "last run" from the cron's previous slot. If that slot // predates the schedule itself, the schedule hasn't fired yet — show // undefined rather than a misleading timestamp. UI is best-effort; - // accurate run history is on the schedule's runs page. - const cronPrev = previousScheduledTimestamp( - schedule.generatorExpression, - schedule.timezone - ); - const lastRun = cronPrev.getTime() > schedule.createdAt.getTime() ? cronPrev : undefined; + // accurate run history is on the schedule's runs page. cron-parser + // throws on malformed expressions, so degrade to undefined per-row + // rather than failing the whole list. + let lastRun: Date | undefined; + try { + const cronPrev = previousScheduledTimestamp( + schedule.generatorExpression, + schedule.timezone + ); + lastRun = cronPrev.getTime() > schedule.createdAt.getTime() ? cronPrev : undefined; + } catch { + lastRun = undefined; + } return { id: schedule.id, diff --git a/internal-packages/schedule-engine/src/engine/index.ts b/internal-packages/schedule-engine/src/engine/index.ts index bd5909e5301..0cc0f1516ab 100644 --- a/internal-packages/schedule-engine/src/engine/index.ts +++ b/internal-packages/schedule-engine/src/engine/index.ts @@ -194,15 +194,16 @@ export class ScheduleEngine { timezone: instance.taskSchedule.timezone, }); - // Enqueue the scheduled task. Pass fromTimestamp through as the next - // job's lastScheduleTime so the dequeueing engine can populate - // payload.lastTimestamp without re-reading state from the DB. When - // fromTimestamp is undefined (first-ever registration / recovery), - // lastScheduleTime is also undefined → consumer reports undefined. + // Enqueue the scheduled task. The next job's `lastScheduleTime` + // payload is the *actual* previous fire time (passed in by the + // caller), not `fromTimestamp` — `fromTimestamp` advances on every + // tick (including skips) so it can't be used as the previous-fire + // anchor without leaking skipped slots into customer-visible + // payload.lastTimestamp. await this.enqueueScheduledTask( params.instanceId, nextScheduledTimestamp, - params.fromTimestamp + params.lastScheduleTime ); // Record metrics @@ -535,13 +536,22 @@ export class ScheduleEngine { }); } - // Register the next run, calculating from the timestamp we just fired (or - // skipped) so we don't need to round-trip through DB state. - // Rewritten try/catch to use tryCatch utility + // Register the next run. `fromTimestamp` advances on every tick so + // the next cron slot keeps marching forward even through skips. + // `lastScheduleTime` is the actual previous fire time the next job + // will report as `payload.lastTimestamp` — only advance it when we + // actually triggered, otherwise carry forward the existing value so + // a long pause/disconnect doesn't quietly overwrite the real + // last-fire timestamp with a series of skipped slots. + const carriedLastScheduleTime = shouldTrigger + ? scheduleTimestamp + : params.lastScheduleTime ?? instance.lastScheduledTimestamp ?? undefined; + const [nextRunError] = await tryCatch( this.registerNextTaskScheduleInstance({ instanceId: params.instanceId, fromTimestamp: scheduleTimestamp, + lastScheduleTime: carriedLastScheduleTime, }) ); if (nextRunError) { diff --git a/internal-packages/schedule-engine/src/engine/types.ts b/internal-packages/schedule-engine/src/engine/types.ts index f424ee6a4d3..f323e13909b 100644 --- a/internal-packages/schedule-engine/src/engine/types.ts +++ b/internal-packages/schedule-engine/src/engine/types.ts @@ -79,5 +79,17 @@ export interface TriggerScheduleParams { export interface RegisterScheduleInstanceParams { instanceId: string; + /** + * Anchor for computing the next cron slot. Defaults to now() when omitted. + * This advances on every tick (fired or skipped) so the next slot keeps + * marching forward regardless of skip reasons. + */ fromTimestamp?: Date; + /** + * The actual previous fire time to embed in the next worker job's payload, + * which becomes that job's `payload.lastTimestamp` on dequeue. Distinct + * from `fromTimestamp` so that skipped ticks (inactive schedule, dev env + * disconnected, etc.) do NOT advance this — only real fires do. + */ + lastScheduleTime?: Date; } diff --git a/internal-packages/schedule-engine/test/scheduleEngine.test.ts b/internal-packages/schedule-engine/test/scheduleEngine.test.ts index c597a936cf9..c261697dacc 100644 --- a/internal-packages/schedule-engine/test/scheduleEngine.test.ts +++ b/internal-packages/schedule-engine/test/scheduleEngine.test.ts @@ -98,23 +98,11 @@ describe("ScheduleEngine Integration", () => { }, }); - // Calculate the expected next execution time (next minute boundary) - const now = new Date(); - const expectedExecutionTime = new Date(now); - expectedExecutionTime.setMinutes(now.getMinutes() + 1, 0, 0); // Next minute, 0 seconds, 0 milliseconds - - // Calculate the expected upcoming execution times (next 10 minutes after the first execution) - const expectedUpcoming = []; - for (let i = 1; i <= 10; i++) { - const upcoming = new Date(expectedExecutionTime); - upcoming.setMinutes(expectedExecutionTime.getMinutes() + i); - expectedUpcoming.push(upcoming); - } - // Manually enqueue the first scheduled task to kick off the lifecycle. - // Engine no longer persists nextScheduledTimestamp — the same time can be - // reproduced from the cron expression alone, so we use expectedExecutionTime - // directly downstream. + // Anchor expectations to the first observed `exactScheduleTime` rather + // than a precomputed wall-clock value — registration that happens to + // straddle a minute boundary used to flake tests asserting against a + // pre-baked "next minute". await engine.registerNextTaskScheduleInstance({ instanceId: scheduleInstance.id }); // Wait for the first execution @@ -176,6 +164,17 @@ describe("ScheduleEngine Integration", () => { } } + // Anchor all expectations to what the engine actually fired with, so + // the test stays deterministic regardless of when within a minute it + // started. + const firstScheduledTime = firstExecution.params.exactScheduleTime; + const secondScheduledTime = secondExecution.params.exactScheduleTime; + expect(firstScheduledTime).toBeDefined(); + expect(secondScheduledTime).toBeDefined(); + + // Each cron slot for "* * * * *" is exactly 60s apart. + expect(secondScheduledTime!.getTime() - firstScheduledTime!.getTime()).toBe(60_000); + // Verify the first execution parameters expect(firstExecution.params).toEqual({ taskIdentifier: "test-task", @@ -196,10 +195,12 @@ describe("ScheduleEngine Integration", () => { payload: { scheduleId: "sched_abc123", type: "DECLARATIVE", - timestamp: expectedExecutionTime, - // First-ever fire: cron's previous slot predates instance.createdAt - // (which was set ~now), so lastTimestamp is undefined. This preserves - // the `if (!payload.lastTimestamp)` first-run sentinel customers rely on. + timestamp: firstScheduledTime, + // First-ever fire: no `lastScheduleTime` carried in the worker + // payload and `instance.lastScheduledTimestamp` is null on a + // fresh instance, so lastTimestamp is undefined. This preserves + // the `if (!payload.lastTimestamp)` first-run sentinel customers + // rely on. lastTimestamp: undefined, externalId: "ext-123", timezone: "UTC", @@ -207,13 +208,10 @@ describe("ScheduleEngine Integration", () => { }, scheduleInstanceId: scheduleInstance.id, scheduleId: taskSchedule.id, - exactScheduleTime: expectedExecutionTime, + exactScheduleTime: firstScheduledTime, }); // Verify the second execution parameters - const expectedSecondExecution = new Date(expectedExecutionTime); - expectedSecondExecution.setMinutes(expectedExecutionTime.getMinutes() + 1); - expect(secondExecution.params).toEqual({ taskIdentifier: "test-task", environment: expect.objectContaining({ @@ -223,16 +221,17 @@ describe("ScheduleEngine Integration", () => { payload: { scheduleId: "sched_abc123", type: "DECLARATIVE", - timestamp: expectedSecondExecution, - // Previous slot before second execution = first execution time. - lastTimestamp: expectedExecutionTime, + timestamp: secondScheduledTime, + // The previous fire's exactScheduleTime is carried through the + // worker payload as `lastScheduleTime` and surfaced here. + lastTimestamp: firstScheduledTime, externalId: "ext-123", timezone: "UTC", upcoming: expect.arrayContaining([expect.any(Date)]), }, scheduleInstanceId: scheduleInstance.id, scheduleId: taskSchedule.id, - exactScheduleTime: expectedSecondExecution, + exactScheduleTime: secondScheduledTime, }); } finally { // Clean up: stop the worker From 695eab1a4b822c5e17b860f13637ffa15e008165 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 30 Apr 2026 13:55:20 +0100 Subject: [PATCH 3/8] fix(webapp): skip cron back-calc for inactive schedules in list MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ScheduleListPresenter was deriving "Last run" via cron's previous slot even for deactivated schedules, causing the dashboard to show "1 minute ago" for a schedule that was actually deactivated months ago. Gate the cron back-calc on `schedule.active`. Inactive schedules now show "–" in the Last run cell, matching the semantic that they aren't firing. Per Devin review on PR #3476. --- .../v3/ScheduleListPresenter.server.ts | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/apps/webapp/app/presenters/v3/ScheduleListPresenter.server.ts b/apps/webapp/app/presenters/v3/ScheduleListPresenter.server.ts index 6b2130a990a..99143f49f3d 100644 --- a/apps/webapp/app/presenters/v3/ScheduleListPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/ScheduleListPresenter.server.ts @@ -246,21 +246,25 @@ export class ScheduleListPresenter extends BasePresenter { }); const schedules: ScheduleListItem[] = rawSchedules.map((schedule) => { - // Approximate "last run" from the cron's previous slot. If that slot - // predates the schedule itself, the schedule hasn't fired yet — show - // undefined rather than a misleading timestamp. UI is best-effort; - // accurate run history is on the schedule's runs page. cron-parser + // Approximate "last run" from the cron's previous slot. Skip inactive + // schedules — the cron's previous slot reflects what *would* have + // fired, but a deactivated schedule didn't actually fire there. Skip + // schedules whose cron's previous slot predates their creation — the + // schedule hasn't existed long enough to have fired. cron-parser // throws on malformed expressions, so degrade to undefined per-row - // rather than failing the whole list. + // rather than failing the whole list. UI is best-effort; the runs + // page is the source of truth. let lastRun: Date | undefined; - try { - const cronPrev = previousScheduledTimestamp( - schedule.generatorExpression, - schedule.timezone - ); - lastRun = cronPrev.getTime() > schedule.createdAt.getTime() ? cronPrev : undefined; - } catch { - lastRun = undefined; + if (schedule.active) { + try { + const cronPrev = previousScheduledTimestamp( + schedule.generatorExpression, + schedule.timezone + ); + lastRun = cronPrev.getTime() > schedule.createdAt.getTime() ? cronPrev : undefined; + } catch { + lastRun = undefined; + } } return { From 6765f63e8c9f41a9413c8844aa52d4bc138868c1 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 30 Apr 2026 14:04:41 +0100 Subject: [PATCH 4/8] fix(schedule-engine): seed lastScheduleTime on recovery via cron-prev MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Recovery (Redis job missing for an instance) was re-registering with no lastScheduleTime, so the post-recovery first fire fell through the fallback chain to instance.lastScheduledTimestamp — which is frozen at the value last written before this PR stopped writing the column. For schedules that fired heavily after deploy then suffered a Redis loss, that meant payload.lastTimestamp would report a stale pre-deploy timestamp on the first fire post-recovery. Compute lastScheduleTime as the cron expression's previous slot (pure cron math, no DB read — recovery fan-outs must not add load to hot tables). Guarded against the cron-prev predating the instance's createdAt and against cron-parser throwing on malformed expressions. For continuously-running schedules this equals the actual last fire time; for long-paused or recently-edited schedules it's the same approximation the dashboard "Last run" cell accepts. Per Devin and CodeRabbit review on PR #3476. --- .../schedule-engine/src/engine/index.ts | 41 +++++++++++++++++-- .../src/engine/scheduleCalculation.ts | 20 +++++++++ 2 files changed, 57 insertions(+), 4 deletions(-) diff --git a/internal-packages/schedule-engine/src/engine/index.ts b/internal-packages/schedule-engine/src/engine/index.ts index 0cc0f1516ab..f23755f5213 100644 --- a/internal-packages/schedule-engine/src/engine/index.ts +++ b/internal-packages/schedule-engine/src/engine/index.ts @@ -11,7 +11,11 @@ import { Logger } from "@trigger.dev/core/logger"; import { PrismaClient } from "@trigger.dev/database"; import { Worker, type JobHandlerParams } from "@trigger.dev/redis-worker"; import { calculateDistributedExecutionTime } from "./distributedScheduling.js"; -import { calculateNextScheduledTimestamp, nextScheduledTimestamps } from "./scheduleCalculation.js"; +import { + calculateNextScheduledTimestamp, + nextScheduledTimestamps, + previousScheduledTimestamp, +} from "./scheduleCalculation.js"; import { RegisterScheduleInstanceParams, ScheduleEngineOptions, @@ -699,10 +703,12 @@ export class ScheduleEngine { select: { id: true, generatorExpression: true, + timezone: true, instances: { select: { id: true, environmentId: true, + createdAt: true, }, }, }, @@ -777,8 +783,9 @@ export class ScheduleEngine { instance: { id: string; environmentId: string; + createdAt: Date; }; - schedule: { id: string; generatorExpression: string }; + schedule: { id: string; generatorExpression: string; timezone: string | null }; }) { // inspect the schedule worker to see if there is a job for this instance const job = await this.worker.getJob(`scheduled-task-instance:${instance.id}`); @@ -793,13 +800,39 @@ export class ScheduleEngine { return "skipped"; } + // Approximate the previous fire from the cron expression itself rather + // than reading state from the DB. For a continuously-running schedule + // this equals the actual last fire time. For paused-then-resumed + // schedules or recently-edited cron expressions the value will be + // approximate — same trade-off the dashboard "Last run" cell accepts. + // Guarded against the schedule not having existed long enough to have + // fired (cron's previous slot before instance creation), and against + // cron-parser throwing on malformed expressions. Pure cron math, no DB + // read — recovery fan-outs (Redis crash, restart storms) must not add + // load to hot tables. + let lastScheduleTime: Date | undefined; + try { + const cronPrev = previousScheduledTimestamp( + schedule.generatorExpression, + schedule.timezone + ); + if (cronPrev.getTime() > instance.createdAt.getTime()) { + lastScheduleTime = cronPrev; + } + } catch { + lastScheduleTime = undefined; + } + this.logger.info("No job found for instance, registering next run", { instanceId: instance.id, schedule, + lastScheduleTime: lastScheduleTime?.toISOString(), }); - // If the job does not exist, register the next run - await this.registerNextTaskScheduleInstance({ instanceId: instance.id }); + await this.registerNextTaskScheduleInstance({ + instanceId: instance.id, + lastScheduleTime, + }); return "recovered"; } diff --git a/internal-packages/schedule-engine/src/engine/scheduleCalculation.ts b/internal-packages/schedule-engine/src/engine/scheduleCalculation.ts index e5564187060..140ea4e285f 100644 --- a/internal-packages/schedule-engine/src/engine/scheduleCalculation.ts +++ b/internal-packages/schedule-engine/src/engine/scheduleCalculation.ts @@ -29,6 +29,26 @@ function calculateNextStep(schedule: string, timezone: string | null, currentDat .toDate(); } +/** + * Cron's previous slot relative to `fromTimestamp`. For a continuously- + * running schedule this equals the actual last fire time; for paused or + * DST-edge cases it's an approximation. Used only on the recovery path + * where the actual last fire isn't recoverable from in-flight worker state. + */ +export function previousScheduledTimestamp( + cron: string, + timezone: string | null, + fromTimestamp: Date = new Date() +) { + return parseExpression(cron, { + currentDate: fromTimestamp, + utc: timezone === null, + tz: timezone ?? undefined, + }) + .prev() + .toDate(); +} + export function nextScheduledTimestamps( cron: string, timezone: string | null, From db83e67b6c5679e2fe9f260d26e157e591b82daf Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 30 Apr 2026 14:06:57 +0100 Subject: [PATCH 5/8] chore(schedule-engine): demote per-tick log lines to debug MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Audit the schedule engine's logger.info calls and demote anything that fires per-tick or per-instance to logger.debug. The previous mix would emit ~3 info lines per fire ("Calculated next schedule timestamp", "Triggering scheduled task", "Successfully triggered scheduled task") which scales linearly with schedule volume. Demoted to debug: - "Calculated next schedule timestamp" — every tick (re-register after every fire) - "Triggering scheduled task" — every fire - "Successfully triggered scheduled task" — every fire - "Recovering schedule" — per-instance in the recovery loop, fan-out potential during recovery storms - "Job already exists for instance" — per-instance recovery - "No job found for instance, registering next run" — per-instance recovery Kept at info (lifecycle / per-event, fires once): - Worker startup / disabled / shutdown - "Recovering schedules in environment" (per recovery call, not per instance) - "No instances found for environment" (empty recovery summary) --- .../schedule-engine/src/engine/index.ts | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/internal-packages/schedule-engine/src/engine/index.ts b/internal-packages/schedule-engine/src/engine/index.ts index f23755f5213..3c7ba1389fe 100644 --- a/internal-packages/schedule-engine/src/engine/index.ts +++ b/internal-packages/schedule-engine/src/engine/index.ts @@ -189,7 +189,7 @@ export class ScheduleEngine { const schedulingDelayMs = nextScheduledTimestamp.getTime() - Date.now(); span.setAttribute("scheduling_delay_ms", schedulingDelayMs); - this.logger.info("Calculated next schedule timestamp", { + this.logger.debug("Calculated next schedule timestamp", { instanceId: params.instanceId, taskIdentifier: instance.taskSchedule.taskIdentifier, nextScheduledTimestamp: nextScheduledTimestamp.toISOString(), @@ -430,7 +430,7 @@ export class ScheduleEngine { span.setAttribute("scheduling_accuracy_ms", schedulingAccuracyMs); span.setAttribute("actual_execution_time", actualExecutionTime.toISOString()); - this.logger.info("Triggering scheduled task", { + this.logger.debug("Triggering scheduled task", { instanceId: params.instanceId, taskIdentifier: instance.taskSchedule.taskIdentifier, scheduleTimestamp: scheduleTimestamp.toISOString(), @@ -481,7 +481,7 @@ export class ScheduleEngine { ); } else if (result) { if (result.success) { - this.logger.info("Successfully triggered scheduled task", { + this.logger.debug("Successfully triggered scheduled task", { instanceId: params.instanceId, taskIdentifier: instance.taskSchedule.taskIdentifier, durationMs: triggerDuration, @@ -742,7 +742,7 @@ export class ScheduleEngine { } as { recovered: string[]; skipped: string[] }; for (const { instance, schedule } of instancesWithSchedule) { - this.logger.info("Recovering schedule", { + this.logger.debug("Recovering schedule", { schedule, instance, }); @@ -791,7 +791,7 @@ export class ScheduleEngine { const job = await this.worker.getJob(`scheduled-task-instance:${instance.id}`); if (job) { - this.logger.info("Job already exists for instance", { + this.logger.debug("Job already exists for instance", { instanceId: instance.id, job, schedule, @@ -823,7 +823,7 @@ export class ScheduleEngine { lastScheduleTime = undefined; } - this.logger.info("No job found for instance, registering next run", { + this.logger.debug("No job found for instance, registering next run", { instanceId: instance.id, schedule, lastScheduleTime: lastScheduleTime?.toISOString(), From cd5e6299a21dcd6a38553ba49cd144e6900c2ada Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 30 Apr 2026 16:29:35 +0100 Subject: [PATCH 6/8] fix(webapp): anchor schedules-list lastRun on updatedAt, not createdAt MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ScheduleListPresenter was guarding the cron-derived lastRun against schedule.createdAt, which only proves the row exists. If a schedule was edited (cron changed, timezone changed) or deactivated and then reactivated, the cron's previous slot might predate the most recent config change but still pass the createdAt guard — surfacing a "Last run" the schedule never actually fired at under the current configuration. Switch the guard to schedule.updatedAt. Prisma's @updatedAt bumps on every row update (cron change, timezone change, activate toggle, etc.), so the cron-derived slot is only shown once it's strictly after the most recent config change — meaning the current config has been in effect long enough to have actually fired at that slot. Per CodeRabbit review on PR #3476. --- .../presenters/v3/ScheduleListPresenter.server.ts | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/apps/webapp/app/presenters/v3/ScheduleListPresenter.server.ts b/apps/webapp/app/presenters/v3/ScheduleListPresenter.server.ts index 99143f49f3d..19812b0a548 100644 --- a/apps/webapp/app/presenters/v3/ScheduleListPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/ScheduleListPresenter.server.ts @@ -197,6 +197,7 @@ export class ScheduleListPresenter extends BasePresenter { }, active: true, createdAt: true, + updatedAt: true, }, where: { projectId: project.id, @@ -249,11 +250,13 @@ export class ScheduleListPresenter extends BasePresenter { // Approximate "last run" from the cron's previous slot. Skip inactive // schedules — the cron's previous slot reflects what *would* have // fired, but a deactivated schedule didn't actually fire there. Skip - // schedules whose cron's previous slot predates their creation — the - // schedule hasn't existed long enough to have fired. cron-parser - // throws on malformed expressions, so degrade to undefined per-row - // rather than failing the whole list. UI is best-effort; the runs - // page is the source of truth. + // when the cron's previous slot predates `updatedAt`: any config + // change (cron edited, timezone changed, deactivate/reactivate) + // bumps updatedAt, and a slot from before the most recent change + // didn't fire under the current configuration. cron-parser throws + // on malformed expressions, so degrade to undefined per-row rather + // than failing the whole list. UI is best-effort; the runs page is + // the source of truth. let lastRun: Date | undefined; if (schedule.active) { try { @@ -261,7 +264,7 @@ export class ScheduleListPresenter extends BasePresenter { schedule.generatorExpression, schedule.timezone ); - lastRun = cronPrev.getTime() > schedule.createdAt.getTime() ? cronPrev : undefined; + lastRun = cronPrev.getTime() > schedule.updatedAt.getTime() ? cronPrev : undefined; } catch { lastRun = undefined; } From c9569e7341db93598c2b01dca8acc92e02412e6f Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 30 Apr 2026 21:21:43 +0100 Subject: [PATCH 7/8] test(schedule-engine): cover deploy-moment legacy-payload backward compat MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add an integration test for the path where an in-flight Redis job enqueued by the old engine (no `lastScheduleTime` in its payload) is dequeued by the new engine. The new engine must report the value persisted at `instance.lastScheduledTimestamp` as `payload.lastTimestamp` rather than reporting `undefined`, so customers don't see a one-fire gap in their lastTimestamp during the rollout. The existing integration test exercises the fresh-schedule path and the worker payload flow on subsequent fires. This new test specifically exercises the DB-column fallback in the `params.lastScheduleTime ?? instance.lastScheduledTimestamp ?? undefined` chain — the bridge that handles the legacy queue at deploy time. --- .../test/scheduleEngine.test.ts | 104 ++++++++++++++++++ 1 file changed, 104 insertions(+) diff --git a/internal-packages/schedule-engine/test/scheduleEngine.test.ts b/internal-packages/schedule-engine/test/scheduleEngine.test.ts index c261697dacc..4063521d823 100644 --- a/internal-packages/schedule-engine/test/scheduleEngine.test.ts +++ b/internal-packages/schedule-engine/test/scheduleEngine.test.ts @@ -239,4 +239,108 @@ describe("ScheduleEngine Integration", () => { } } ); + + // Deploy-moment backward compatibility. At deploy time, in-flight Redis jobs + // were enqueued by the old engine — their payload has no `lastScheduleTime` + // field — and `instance.lastScheduledTimestamp` is still populated (last + // written by the old engine pre-deploy). The new engine must report that DB + // value as `payload.lastTimestamp` so customers don't see a transient + // `undefined` for the one fire per schedule that drains the legacy queue. + containerTest( + "should fall back to instance.lastScheduledTimestamp when payload lacks lastScheduleTime", + { timeout: 30_000 }, + async ({ prisma, redisOptions }) => { + const triggerCalls: TriggerScheduledTaskParams[] = []; + const engine = new ScheduleEngine({ + prisma, + redis: redisOptions, + distributionWindow: { seconds: 10 }, + worker: { + concurrency: 1, + disabled: true, // Don't actually run the worker — calling triggerScheduledTask directly + pollIntervalMs: 1000, + }, + tracer: trace.getTracer("test", "0.0.0"), + onTriggerScheduledTask: async (params) => { + triggerCalls.push(params); + return { success: true }; + }, + isDevEnvironmentConnectedHandler: vi.fn().mockResolvedValue(true), + }); + + try { + const organization = await prisma.organization.create({ + data: { title: "Legacy Payload Org", slug: "legacy-payload-org" }, + }); + + const project = await prisma.project.create({ + data: { + name: "Legacy Payload Project", + slug: "legacy-payload-project", + externalRef: "legacy-payload-ref", + organizationId: organization.id, + }, + }); + + const environment = await prisma.runtimeEnvironment.create({ + data: { + slug: "legacy-payload-env", + type: "PRODUCTION", + projectId: project.id, + organizationId: organization.id, + apiKey: "tr_legacy_1234", + pkApiKey: "pk_legacy_1234", + shortcode: "legacy-short", + }, + }); + + const taskSchedule = await prisma.taskSchedule.create({ + data: { + friendlyId: "sched_legacy_payload", + taskIdentifier: "legacy-payload-task", + projectId: project.id, + deduplicationKey: "legacy-payload-dedup", + userProvidedDeduplicationKey: false, + generatorExpression: "*/5 * * * *", + generatorDescription: "Every 5 minutes", + timezone: "UTC", + type: "DECLARATIVE", + active: true, + externalId: "legacy-ext", + }, + }); + + // Pre-populate lastScheduledTimestamp on the instance — simulates the + // value the old engine wrote to the DB before this PR deployed. + const preDeployLastFire = new Date("2026-04-30T10:00:00.000Z"); + const scheduleInstance = await prisma.taskScheduleInstance.create({ + data: { + taskScheduleId: taskSchedule.id, + environmentId: environment.id, + projectId: project.id, + active: true, + lastScheduledTimestamp: preDeployLastFire, + }, + }); + + // Call triggerScheduledTask directly without lastScheduleTime, + // simulating an in-flight Redis job enqueued by the old engine. + const exactScheduleTime = new Date("2026-04-30T10:05:00.000Z"); + await engine.triggerScheduledTask({ + instanceId: scheduleInstance.id, + finalAttempt: false, + exactScheduleTime, + // lastScheduleTime intentionally omitted — legacy payload shape + }); + + expect(triggerCalls.length).toBe(1); + expect(triggerCalls[0].payload.timestamp).toEqual(exactScheduleTime); + // Falls back to instance.lastScheduledTimestamp from the DB rather + // than reporting undefined for this one transitional fire. + expect(triggerCalls[0].payload.lastTimestamp).toEqual(preDeployLastFire); + } finally { + await engine.quit(); + } + } + ); }); From 39107817d84b05d825b06197c50a7f95ff37ae87 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 30 Apr 2026 21:52:28 +0100 Subject: [PATCH 8/8] fix(schedule-engine): centralize lastScheduleTime cron-prev fallback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit External callers of registerNextTaskScheduleInstance — the deploy-time declarative schedule sync, schedule upsert (cron change / activate), and recovery — all called with just `{ instanceId }`, no lastScheduleTime. That replaced the in-flight Redis job's payload with one having lastScheduleTime: undefined, so the next fire fell back to instance.lastScheduledTimestamp (a column this PR stops writing). On every subsequent app deploy, customers would have seen one stale fire per schedule, in perpetuity — the staleness compounding as the unmaintained DB column drifted further from reality. Move the cron-prev derivation inside registerNextTaskScheduleInstance itself: when the caller doesn't pass lastScheduleTime, derive from the cron expression's previous slot (guarded against the slot predating the instance's createdAt — preserves first-fire `undefined` semantics for brand-new schedules). Internal callers (after-fire, after-skip) keep passing explicit values so their carry-forward semantics are unchanged. Also drops the duplicate cron-prev block from `#recoverTaskScheduleInstance` — recovery now relies on the centralized fallback inside register. Two new tests: - "should derive lastScheduleTime from cron when external callers omit it" — exercises the deploy/upsert pattern on a long-running instance. - "should leave lastScheduleTime undefined for brand-new schedules" — preserves the first-run sentinel (`if (!payload.lastTimestamp)`) customers rely on. Per Devin review on PR #3476. --- .../schedule-engine/src/engine/index.ts | 74 +++---- .../test/scheduleRecovery.test.ts | 190 ++++++++++++++++++ 2 files changed, 229 insertions(+), 35 deletions(-) diff --git a/internal-packages/schedule-engine/src/engine/index.ts b/internal-packages/schedule-engine/src/engine/index.ts index 3c7ba1389fe..2c78beccbc8 100644 --- a/internal-packages/schedule-engine/src/engine/index.ts +++ b/internal-packages/schedule-engine/src/engine/index.ts @@ -198,16 +198,43 @@ export class ScheduleEngine { timezone: instance.taskSchedule.timezone, }); - // Enqueue the scheduled task. The next job's `lastScheduleTime` - // payload is the *actual* previous fire time (passed in by the - // caller), not `fromTimestamp` — `fromTimestamp` advances on every - // tick (including skips) so it can't be used as the previous-fire - // anchor without leaking skipped slots into customer-visible - // payload.lastTimestamp. + // Determine the lastScheduleTime to embed in the next worker job's + // payload. If the caller passed it explicitly (the after-fire path + // does this with the just-fired timestamp, the after-skip path + // carries the existing value forward), use that. Otherwise — every + // external caller (deploy sync, schedule upsert, recovery) — derive + // from the cron expression's previous slot. + // + // Without this fallback, every deploy / cron edit would clobber the + // existing in-flight job's lastScheduleTime with `undefined`, and + // the next fire would surface a frozen DB-column value to the + // customer (since this PR stops writing that column). Pure cron + // math, no DB read on top of the existing instance load — the + // recovery loop already pays the cost of loading the instance. + let lastScheduleTime = params.lastScheduleTime; + if (lastScheduleTime === undefined) { + try { + const cronPrev = previousScheduledTimestamp( + instance.taskSchedule.generatorExpression, + instance.taskSchedule.timezone + ); + // Guarded against the cron's previous slot predating the + // instance itself — for a brand-new schedule, the slot is from + // before the schedule existed, so `undefined` is the honest + // answer (preserves the `if (!payload.lastTimestamp)` first-run + // sentinel customers rely on). + if (cronPrev.getTime() > instance.createdAt.getTime()) { + lastScheduleTime = cronPrev; + } + } catch { + // Malformed cron — leave undefined. + } + } + await this.enqueueScheduledTask( params.instanceId, nextScheduledTimestamp, - params.lastScheduleTime + lastScheduleTime ); // Record metrics @@ -800,39 +827,16 @@ export class ScheduleEngine { return "skipped"; } - // Approximate the previous fire from the cron expression itself rather - // than reading state from the DB. For a continuously-running schedule - // this equals the actual last fire time. For paused-then-resumed - // schedules or recently-edited cron expressions the value will be - // approximate — same trade-off the dashboard "Last run" cell accepts. - // Guarded against the schedule not having existed long enough to have - // fired (cron's previous slot before instance creation), and against - // cron-parser throwing on malformed expressions. Pure cron math, no DB - // read — recovery fan-outs (Redis crash, restart storms) must not add - // load to hot tables. - let lastScheduleTime: Date | undefined; - try { - const cronPrev = previousScheduledTimestamp( - schedule.generatorExpression, - schedule.timezone - ); - if (cronPrev.getTime() > instance.createdAt.getTime()) { - lastScheduleTime = cronPrev; - } - } catch { - lastScheduleTime = undefined; - } - this.logger.debug("No job found for instance, registering next run", { instanceId: instance.id, schedule, - lastScheduleTime: lastScheduleTime?.toISOString(), }); - await this.registerNextTaskScheduleInstance({ - instanceId: instance.id, - lastScheduleTime, - }); + // No `lastScheduleTime` passed — `registerNextTaskScheduleInstance` + // will derive it from the cron's previous slot (with a createdAt + // guard) so the post-recovery fire reports an accurate + // `payload.lastTimestamp`. + await this.registerNextTaskScheduleInstance({ instanceId: instance.id }); return "recovered"; } diff --git a/internal-packages/schedule-engine/test/scheduleRecovery.test.ts b/internal-packages/schedule-engine/test/scheduleRecovery.test.ts index 586ea73fc3a..40ce4b1bba6 100644 --- a/internal-packages/schedule-engine/test/scheduleRecovery.test.ts +++ b/internal-packages/schedule-engine/test/scheduleRecovery.test.ts @@ -386,4 +386,194 @@ describe("Schedule Recovery", () => { } } ); + + // External-caller backward-compat. Deploy sync (`syncDeclarativeSchedules`) + // and schedule upsert both call `registerNextTaskScheduleInstance` with no + // `lastScheduleTime`. They run on every app deploy and on every cron edit. + // For an existing-and-firing schedule, the call must NOT clobber the + // worker payload's `lastScheduleTime` with `undefined` — otherwise the + // next fire would surface a stale frozen DB-column value to the customer + // (since this PR stops writing that column). The function must derive a + // sensible `lastScheduleTime` from the cron expression's previous slot + // when the caller doesn't pass one. + containerTest( + "should derive lastScheduleTime from cron when external callers omit it", + { timeout: 30_000 }, + async ({ prisma, redisOptions }) => { + const engine = new ScheduleEngine({ + prisma, + redis: redisOptions, + distributionWindow: { seconds: 10 }, + worker: { concurrency: 1, disabled: true, pollIntervalMs: 1000 }, + tracer: trace.getTracer("test", "0.0.0"), + onTriggerScheduledTask: async () => ({ success: true }), + isDevEnvironmentConnectedHandler: vi.fn().mockResolvedValue(true), + }); + + try { + const organization = await prisma.organization.create({ + data: { title: "External Caller Org", slug: "external-caller-org" }, + }); + const project = await prisma.project.create({ + data: { + name: "External Caller Project", + slug: "external-caller-project", + externalRef: "external-caller-ref", + organizationId: organization.id, + }, + }); + const environment = await prisma.runtimeEnvironment.create({ + data: { + slug: "external-caller-env", + type: "PRODUCTION", + projectId: project.id, + organizationId: organization.id, + apiKey: "tr_external_1234", + pkApiKey: "pk_external_1234", + shortcode: "external-short", + }, + }); + const taskSchedule = await prisma.taskSchedule.create({ + data: { + friendlyId: "sched_external_caller", + taskIdentifier: "external-caller-task", + projectId: project.id, + deduplicationKey: "external-caller-dedup", + userProvidedDeduplicationKey: false, + generatorExpression: "*/5 * * * *", + generatorDescription: "Every 5 minutes", + timezone: "UTC", + type: "DECLARATIVE", + active: true, + }, + }); + + // Backdate the instance so the cron's previous slot postdates + // createdAt — this simulates a long-running schedule, the case + // Devin flagged (deploy clobbers lastScheduleTime, post-deploy fire + // would otherwise read from a frozen DB column). + const longAgo = new Date(Date.now() - 24 * 60 * 60 * 1000); + const scheduleInstance = await prisma.taskScheduleInstance.create({ + data: { + taskScheduleId: taskSchedule.id, + environmentId: environment.id, + projectId: project.id, + active: true, + }, + }); + await prisma.taskScheduleInstance.update({ + where: { id: scheduleInstance.id }, + data: { createdAt: longAgo }, + }); + + // External-caller pattern — no lastScheduleTime. + await engine.registerNextTaskScheduleInstance({ + instanceId: scheduleInstance.id, + }); + + const job = await engine.getJob(`scheduled-task-instance:${scheduleInstance.id}`); + expect(job).not.toBeNull(); + // The function should have derived lastScheduleTime from cron, + // putting a real timestamp into the worker payload rather than + // undefined. The Redis worker stores payloads as JSON, so the value + // is a string when read back here — Zod re-coerces it to Date on + // dequeue (workerCatalog uses `z.coerce.date()`). + const enqueuedLastScheduleTime = (job?.item as { lastScheduleTime?: string }) + .lastScheduleTime; + expect(enqueuedLastScheduleTime).toBeDefined(); + const derived = new Date(enqueuedLastScheduleTime!); + // The derived value should match the cron's previous slot — for + // `*/5 * * * *`, a 5-minute boundary in the recent past. + expect(derived.getTime()).toBeLessThan(Date.now()); + expect(derived.getUTCSeconds()).toBe(0); + expect(derived.getUTCMinutes() % 5).toBe(0); + } finally { + await engine.quit(); + } + } + ); + + // Brand-new schedules must NOT receive a cron-derived lastScheduleTime — + // the cron's previous slot predates the instance, so it's not a real + // previous fire. The first-run sentinel (`if (!payload.lastTimestamp)`) + // must keep working. + containerTest( + "should leave lastScheduleTime undefined for brand-new schedules", + { timeout: 30_000 }, + async ({ prisma, redisOptions }) => { + const engine = new ScheduleEngine({ + prisma, + redis: redisOptions, + distributionWindow: { seconds: 10 }, + worker: { concurrency: 1, disabled: true, pollIntervalMs: 1000 }, + tracer: trace.getTracer("test", "0.0.0"), + onTriggerScheduledTask: async () => ({ success: true }), + isDevEnvironmentConnectedHandler: vi.fn().mockResolvedValue(true), + }); + + try { + const organization = await prisma.organization.create({ + data: { title: "Brand New Org", slug: "brand-new-org" }, + }); + const project = await prisma.project.create({ + data: { + name: "Brand New Project", + slug: "brand-new-project", + externalRef: "brand-new-ref", + organizationId: organization.id, + }, + }); + const environment = await prisma.runtimeEnvironment.create({ + data: { + slug: "brand-new-env", + type: "PRODUCTION", + projectId: project.id, + organizationId: organization.id, + apiKey: "tr_brandnew_1234", + pkApiKey: "pk_brandnew_1234", + shortcode: "brandnew-short", + }, + }); + const taskSchedule = await prisma.taskSchedule.create({ + data: { + friendlyId: "sched_brand_new", + taskIdentifier: "brand-new-task", + projectId: project.id, + deduplicationKey: "brand-new-dedup", + userProvidedDeduplicationKey: false, + // Hourly cron — the previous slot is plausibly minutes-to-an-hour + // ago, comfortably predating an instance just created. + generatorExpression: "0 * * * *", + generatorDescription: "Hourly", + timezone: "UTC", + type: "DECLARATIVE", + active: true, + }, + }); + const scheduleInstance = await prisma.taskScheduleInstance.create({ + data: { + taskScheduleId: taskSchedule.id, + environmentId: environment.id, + projectId: project.id, + active: true, + }, + }); + + await engine.registerNextTaskScheduleInstance({ + instanceId: scheduleInstance.id, + }); + + const job = await engine.getJob(`scheduled-task-instance:${scheduleInstance.id}`); + expect(job).not.toBeNull(); + const enqueuedLastScheduleTime = (job?.item as { lastScheduleTime?: Date }).lastScheduleTime; + // Brand-new schedule: cron's previous slot predates instance.createdAt, + // so the function leaves lastScheduleTime undefined — the first fire + // will report `payload.lastTimestamp: undefined` and customer first-run + // sentinel patterns keep working. + expect(enqueuedLastScheduleTime).toBeUndefined(); + } finally { + await engine.quit(); + } + } + ); });