diff --git a/src/drivers/redis_adapter.ts b/src/drivers/redis_adapter.ts index ad3626e..8f384a6 100644 --- a/src/drivers/redis_adapter.ts +++ b/src/drivers/redis_adapter.ts @@ -1,36 +1,40 @@ -import { randomUUID } from 'node:crypto' -import { Redis, type RedisOptions } from 'ioredis' -import { DEFAULT_PRIORITY } from '../constants.js' -import { calculateScore } from '../utils.js' -import type { Adapter, AcquiredJob, PushResult } from '../contracts/adapter.js' -import type { DedupOutcome } from '../types/main.js' +import { randomUUID } from 'node:crypto'; +import { Redis, type RedisOptions } from 'ioredis'; +import { DEFAULT_PRIORITY } from '../constants.js'; +import { calculateScore } from '../utils.js'; +import type { Adapter, AcquiredJob, PushResult } from '../contracts/adapter.js'; +import type { DedupOutcome } from '../types/main.js'; import type { - JobData, - JobRecord, - JobRetention, - ScheduleConfig, - ScheduleData, - ScheduleListOptions, -} from '../types/main.js' -import { resolveRetention } from '../utils.js' -import { encodeRedisJobPayloadOverlay, hydrateRedisJob } from './redis_job_storage.js' + JobData, + JobRecord, + JobRetention, + ScheduleConfig, + ScheduleData, + ScheduleListOptions, +} from '../types/main.js'; +import { resolveRetention } from '../utils.js'; import { - ACQUIRE_JOB_SCRIPT, - CLAIM_SCHEDULE_SCRIPT, - FINALIZE_JOB_SCRIPT, - GET_JOB_SCRIPT, - PUSH_DEDUP_JOB_SCRIPT, - PUSH_DELAYED_JOB_SCRIPT, - PUSH_JOB_SCRIPT, - RECOVER_STALLED_JOBS_SCRIPT, - REMOVE_JOB_SCRIPT, - RETRY_JOB_SCRIPT, -} from './redis_scripts.js' - -const redisKey = 'jobs' -const schedulesKey = 'schedules' -const schedulesIndexKey = 'schedules::index' -type RedisConfig = Redis | RedisOptions + encodeRedisJobPayloadOverlay, + hydrateRedisJob, +} from './redis_job_storage.js'; +import { + ACQUIRE_JOB_SCRIPT, + CLAIM_SCHEDULE_SCRIPT, + FINALIZE_JOB_SCRIPT, + GET_JOB_SCRIPT, + PUSH_DEDUP_JOB_SCRIPT, + PUSH_DELAYED_JOB_SCRIPT, + PUSH_JOB_SCRIPT, + RECOVER_STALLED_JOBS_SCRIPT, + REMOVE_JOB_SCRIPT, + RETRY_JOB_SCRIPT, +} from './redis_scripts.js'; + +const redisKey = 'jobs'; +const schedulesKey = 'schedules'; +const schedulesIndexKey = 'schedules::index'; +const schedulesDueKey = 'schedules::due'; +type RedisConfig = Redis | RedisOptions; /** * Create a new Redis adapter factory. @@ -43,551 +47,676 @@ type RedisConfig = Redis | RedisOptions * managing the connection lifecycle. */ export function redis(config?: RedisConfig) { - return () => { - if (config instanceof Redis) { - return new RedisAdapter(config, false) - } - - const options: RedisOptions = { - host: 'localhost', - port: 6379, - keyPrefix: 'boringnode::queue::', - db: 0, - ...config, - } - - const connection = new Redis(options) - return new RedisAdapter(connection, true) - } + return () => { + if (config instanceof Redis) { + return new RedisAdapter(config, false); + } + + const options: RedisOptions = { + host: 'localhost', + port: 6379, + keyPrefix: 'boringnode::queue::', + db: 0, + ...config, + }; + + const connection = new Redis(options); + return new RedisAdapter(connection, true); + }; } export class RedisAdapter implements Adapter { - readonly #connection: Redis - readonly #ownsConnection: boolean - #workerId: string = '' - - constructor(connection: Redis, ownsConnection: boolean = false) { - this.#connection = connection - this.#ownsConnection = ownsConnection - } - - #getKeys(queue: string) { - return { - data: `${redisKey}::${queue}::data`, - pending: `${redisKey}::${queue}::pending`, - delayed: `${redisKey}::${queue}::delayed`, - active: `${redisKey}::${queue}::active`, - overlay: `${redisKey}::${queue}::metadata`, - completed: `${redisKey}::${queue}::completed`, - completedIndex: `${redisKey}::${queue}::completed::index`, - failed: `${redisKey}::${queue}::failed`, - failedIndex: `${redisKey}::${queue}::failed::index`, - } - } - - #getDedupKey(queue: string, dedupId: string): string { - return `${this.#getDedupPrefix(queue)}${dedupId}` - } - - #getDedupPrefix(queue: string): string { - return `${redisKey}::${queue}::dedup::` - } - - setWorkerId(workerId: string): void { - this.#workerId = workerId - } - - async destroy(): Promise { - if (this.#ownsConnection) { - await this.#connection.quit() - } - } - - pop(): Promise { - return this.popFrom('default') - } - - async popFrom(queue: string): Promise { - const keys = this.#getKeys(queue) - const now = Date.now() - - const result = await this.#connection.eval( - ACQUIRE_JOB_SCRIPT, - 5, - keys.data, - keys.pending, - keys.active, - keys.delayed, - keys.overlay, - this.#workerId, - now.toString() - ) - - if (!result) { - return null - } - - const { data, overlay, acquiredAt } = JSON.parse(result as string) as { - data: string - overlay?: string - acquiredAt: number - } - - return { ...hydrateRedisJob(data, overlay), acquiredAt } - } - - async completeJob(jobId: string, queue: string, removeOnComplete?: JobRetention): Promise { - const keys = this.#getKeys(queue) - const dedupPrefix = this.#getDedupPrefix(queue) - const { keep, maxAge, maxCount } = resolveRetention(removeOnComplete) - - if (!keep) { + readonly #connection: Redis; + readonly #ownsConnection: boolean; + #workerId: string = ''; + #dueIndexReady = false; + + constructor(connection: Redis, ownsConnection: boolean = false) { + this.#connection = connection; + this.#ownsConnection = ownsConnection; + } + + #getKeys(queue: string) { + return { + data: `${redisKey}::${queue}::data`, + pending: `${redisKey}::${queue}::pending`, + delayed: `${redisKey}::${queue}::delayed`, + active: `${redisKey}::${queue}::active`, + overlay: `${redisKey}::${queue}::metadata`, + completed: `${redisKey}::${queue}::completed`, + completedIndex: `${redisKey}::${queue}::completed::index`, + failed: `${redisKey}::${queue}::failed`, + failedIndex: `${redisKey}::${queue}::failed::index`, + }; + } + + #getDedupKey(queue: string, dedupId: string): string { + return `${this.#getDedupPrefix(queue)}${dedupId}`; + } + + #getDedupPrefix(queue: string): string { + return `${redisKey}::${queue}::dedup::`; + } + + setWorkerId(workerId: string): void { + this.#workerId = workerId; + } + + async destroy(): Promise { + if (this.#ownsConnection) { + await this.#connection.quit(); + } + } + + pop(): Promise { + return this.popFrom('default'); + } + + async popFrom(queue: string): Promise { + const keys = this.#getKeys(queue); + const now = Date.now(); + + const result = await this.#connection.eval( + ACQUIRE_JOB_SCRIPT, + 5, + keys.data, + keys.pending, + keys.active, + keys.delayed, + keys.overlay, + this.#workerId, + now.toString(), + ); + + if (!result) { + return null; + } + + const { data, overlay, acquiredAt } = JSON.parse(result as string) as { + data: string; + overlay?: string; + acquiredAt: number; + }; + + return { ...hydrateRedisJob(data, overlay), acquiredAt }; + } + + async completeJob( + jobId: string, + queue: string, + removeOnComplete?: JobRetention, + ): Promise { + const keys = this.#getKeys(queue); + const dedupPrefix = this.#getDedupPrefix(queue); + const { keep, maxAge, maxCount } = resolveRetention(removeOnComplete); + + if (!keep) { + await this.#connection.eval( + REMOVE_JOB_SCRIPT, + 3, + keys.data, + keys.active, + keys.overlay, + jobId, + dedupPrefix, + ); + return; + } + await this.#connection.eval( - REMOVE_JOB_SCRIPT, - 3, - keys.data, - keys.active, - keys.overlay, - jobId, - dedupPrefix - ) - return - } - - await this.#connection.eval( - FINALIZE_JOB_SCRIPT, - 5, - keys.data, - keys.active, - keys.completed, - keys.completedIndex, - keys.overlay, - jobId, - Date.now().toString(), - maxAge.toString(), - maxCount.toString(), - '', - dedupPrefix - ) - } - - async failJob( - jobId: string, - queue: string, - error?: Error, - removeOnFail?: JobRetention - ): Promise { - const keys = this.#getKeys(queue) - const dedupPrefix = this.#getDedupPrefix(queue) - const { keep, maxAge, maxCount } = resolveRetention(removeOnFail) - - if (!keep) { + FINALIZE_JOB_SCRIPT, + 5, + keys.data, + keys.active, + keys.completed, + keys.completedIndex, + keys.overlay, + jobId, + Date.now().toString(), + maxAge.toString(), + maxCount.toString(), + '', + dedupPrefix, + ); + } + + async failJob( + jobId: string, + queue: string, + error?: Error, + removeOnFail?: JobRetention, + ): Promise { + const keys = this.#getKeys(queue); + const dedupPrefix = this.#getDedupPrefix(queue); + const { keep, maxAge, maxCount } = resolveRetention(removeOnFail); + + if (!keep) { + await this.#connection.eval( + REMOVE_JOB_SCRIPT, + 3, + keys.data, + keys.active, + keys.overlay, + jobId, + dedupPrefix, + ); + return; + } + + await this.#connection.eval( + FINALIZE_JOB_SCRIPT, + 5, + keys.data, + keys.active, + keys.failed, + keys.failedIndex, + keys.overlay, + jobId, + Date.now().toString(), + maxAge.toString(), + maxCount.toString(), + error?.message || '', + dedupPrefix, + ); + } + + async retryJob(jobId: string, queue: string, retryAt?: Date): Promise { + const keys = this.#getKeys(queue); + const now = Date.now(); + await this.#connection.eval( - REMOVE_JOB_SCRIPT, - 3, - keys.data, - keys.active, - keys.overlay, - jobId, - dedupPrefix - ) - return - } - - await this.#connection.eval( - FINALIZE_JOB_SCRIPT, - 5, - keys.data, - keys.active, - keys.failed, - keys.failedIndex, - keys.overlay, - jobId, - Date.now().toString(), - maxAge.toString(), - maxCount.toString(), - error?.message || '', - dedupPrefix - ) - } - - async retryJob(jobId: string, queue: string, retryAt?: Date): Promise { - const keys = this.#getKeys(queue) - const now = Date.now() - - await this.#connection.eval( - RETRY_JOB_SCRIPT, - 5, - keys.data, - keys.active, - keys.pending, - keys.delayed, - keys.overlay, - jobId, - retryAt ? retryAt.getTime().toString() : '0', - now.toString() - ) - } - - async getJob(jobId: string, queue: string): Promise { - const keys = this.#getKeys(queue) - - const result = await this.#connection.eval( - GET_JOB_SCRIPT, - 7, - keys.data, - keys.pending, - keys.delayed, - keys.active, - keys.completed, - keys.failed, - keys.overlay, - jobId - ) - - if (!result) { - return null - } - - const record = JSON.parse(result as string) as Omit & { - data: string - overlay?: string - } - - return { ...record, data: hydrateRedisJob(record.data, record.overlay) } - } - - push(jobData: JobData): Promise { - return this.pushOn('default', jobData) - } - - pushLater(jobData: JobData, delay: number): Promise { - return this.pushLaterOn('default', jobData, delay) - } - - async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { - const keys = this.#getKeys(queue) - const executeAt = Date.now() + delay - - if (jobData.dedup) { - const dedupKey = this.#getDedupKey(queue, jobData.dedup.id) - const [payloadData, payloadIsUndefined] = encodeRedisJobPayloadOverlay(jobData.payload) - const result = (await this.#connection.eval( - PUSH_DEDUP_JOB_SCRIPT, - 5, - keys.data, - keys.delayed, - dedupKey, - keys.pending, - keys.overlay, - jobData.id, - JSON.stringify(jobData), - executeAt.toString(), - (jobData.dedup.ttl ?? 0).toString(), - jobData.dedup.extend ? '1' : '0', - jobData.dedup.replace ? '1' : '0', - payloadData, - payloadIsUndefined - )) as [string, string] - return { outcome: result[0] as DedupOutcome, jobId: result[1] } - } - - await this.#connection.eval( - PUSH_DELAYED_JOB_SCRIPT, - 3, - keys.data, - keys.delayed, - keys.overlay, - jobData.id, - JSON.stringify(jobData), - executeAt.toString() - ) - } - - async pushOn(queue: string, jobData: JobData): Promise { - const keys = this.#getKeys(queue) - const priority = jobData.priority ?? DEFAULT_PRIORITY - const timestamp = Date.now() - const score = calculateScore(priority, timestamp) - - if (jobData.dedup) { - const dedupKey = this.#getDedupKey(queue, jobData.dedup.id) - const [payloadData, payloadIsUndefined] = encodeRedisJobPayloadOverlay(jobData.payload) - const result = (await this.#connection.eval( - PUSH_DEDUP_JOB_SCRIPT, - 5, - keys.data, - keys.pending, - dedupKey, - keys.delayed, - keys.overlay, - jobData.id, - JSON.stringify(jobData), - score.toString(), - (jobData.dedup.ttl ?? 0).toString(), - jobData.dedup.extend ? '1' : '0', - jobData.dedup.replace ? '1' : '0', - payloadData, - payloadIsUndefined - )) as [string, string] - return { outcome: result[0] as DedupOutcome, jobId: result[1] } - } - - await this.#connection.eval( - PUSH_JOB_SCRIPT, - 3, - keys.data, - keys.pending, - keys.overlay, - jobData.id, - JSON.stringify(jobData), - score.toString() - ) - } - - pushMany(jobs: JobData[]): Promise { - return this.pushManyOn('default', jobs) - } - - async pushManyOn(queue: string, jobs: JobData[]): Promise { - if (jobs.length === 0) return - - if (jobs.some((j) => j.dedup)) { - throw new Error('dedup is not supported in batch dispatch; use single dispatch') - } - - const keys = this.#getKeys(queue) - const now = Date.now() - const multi = this.#connection.multi() - - for (const job of jobs) { - const priority = job.priority ?? DEFAULT_PRIORITY - const score = calculateScore(priority, now) - multi.hdel(keys.overlay, job.id) - multi.hset(keys.data, job.id, JSON.stringify(job)) - multi.zadd(keys.pending, score, job.id) - } - - await multi.exec() - } - - size(): Promise { - return this.sizeOf('default') - } - - sizeOf(queue: string): Promise { - const keys = this.#getKeys(queue) - return this.#connection.zcard(keys.pending) - } - - async recoverStalledJobs( - queue: string, - stalledThreshold: number, - maxStalledCount: number - ): Promise { - const keys = this.#getKeys(queue) - const now = Date.now() - - const recovered = await this.#connection.eval( - RECOVER_STALLED_JOBS_SCRIPT, - 4, - keys.data, - keys.active, - keys.pending, - keys.overlay, - now.toString(), - stalledThreshold.toString(), - maxStalledCount.toString(), - this.#getDedupPrefix(queue) - ) - - return recovered as number - } - - async upsertSchedule(config: ScheduleConfig): Promise { - const id = config.id ?? randomUUID() - const now = Date.now() - const scheduleKey = `${schedulesKey}::${id}` - const [existingRunCount, existingCreatedAt] = await this.#connection.hmget( - scheduleKey, - 'run_count', - 'created_at' - ) - - const scheduleData: Record = { - id, - name: config.name, - payload: JSON.stringify(config.payload), - timezone: config.timezone, - status: 'active', - run_count: existingRunCount ?? '0', - created_at: existingCreatedAt ?? now.toString(), - } - - if (config.cronExpression !== undefined) scheduleData.cron_expression = config.cronExpression - if (config.everyMs !== undefined) scheduleData.every_ms = config.everyMs.toString() - if (config.from !== undefined) scheduleData.from_date = config.from.getTime().toString() - if (config.to !== undefined) scheduleData.to_date = config.to.getTime().toString() - if (config.limit !== undefined) scheduleData.run_limit = config.limit.toString() - - // Upsert schedule and clear stale optional fields from previous config. - await this.#connection - .multi() - .hdel(scheduleKey, 'cron_expression', 'every_ms', 'from_date', 'to_date', 'run_limit') - .hset(scheduleKey, scheduleData) - .sadd(schedulesIndexKey, id) - .exec() - - return id - } - - /** - * @deprecated Use `upsertSchedule` instead. - */ - createSchedule(config: ScheduleConfig): Promise { - return this.upsertSchedule(config) - } - - async getSchedule(id: string): Promise { - const scheduleKey = `${schedulesKey}::${id}` - const data = await this.#connection.hgetall(scheduleKey) - - if (!data || Object.keys(data).length === 0) { - return null - } - - return this.#hashToScheduleData(data) - } - - async listSchedules(options?: ScheduleListOptions): Promise { - const ids = await this.#connection.smembers(schedulesIndexKey) - if (ids.length === 0) { - return [] - } - - const pipeline = this.#connection.pipeline() - - for (const id of ids) { - pipeline.hgetall(`${schedulesKey}::${id}`) - } - - const results = await pipeline.exec() - if (!results) { - return [] - } - - const schedules: ScheduleData[] = [] - - for (const [, data] of results) { + RETRY_JOB_SCRIPT, + 5, + keys.data, + keys.active, + keys.pending, + keys.delayed, + keys.overlay, + jobId, + retryAt ? retryAt.getTime().toString() : '0', + now.toString(), + ); + } + + async getJob(jobId: string, queue: string): Promise { + const keys = this.#getKeys(queue); + + const result = await this.#connection.eval( + GET_JOB_SCRIPT, + 7, + keys.data, + keys.pending, + keys.delayed, + keys.active, + keys.completed, + keys.failed, + keys.overlay, + jobId, + ); + + if (!result) { + return null; + } + + const record = JSON.parse(result as string) as Omit & { + data: string; + overlay?: string; + }; + + return { ...record, data: hydrateRedisJob(record.data, record.overlay) }; + } + + push(jobData: JobData): Promise { + return this.pushOn('default', jobData); + } + + pushLater(jobData: JobData, delay: number): Promise { + return this.pushLaterOn('default', jobData, delay); + } + + async pushLaterOn( + queue: string, + jobData: JobData, + delay: number, + ): Promise { + const keys = this.#getKeys(queue); + const executeAt = Date.now() + delay; + + if (jobData.dedup) { + const dedupKey = this.#getDedupKey(queue, jobData.dedup.id); + const [payloadData, payloadIsUndefined] = encodeRedisJobPayloadOverlay( + jobData.payload, + ); + const result = (await this.#connection.eval( + PUSH_DEDUP_JOB_SCRIPT, + 5, + keys.data, + keys.delayed, + dedupKey, + keys.pending, + keys.overlay, + jobData.id, + JSON.stringify(jobData), + executeAt.toString(), + (jobData.dedup.ttl ?? 0).toString(), + jobData.dedup.extend ? '1' : '0', + jobData.dedup.replace ? '1' : '0', + payloadData, + payloadIsUndefined, + )) as [string, string]; + return { outcome: result[0] as DedupOutcome, jobId: result[1] }; + } + + await this.#connection.eval( + PUSH_DELAYED_JOB_SCRIPT, + 3, + keys.data, + keys.delayed, + keys.overlay, + jobData.id, + JSON.stringify(jobData), + executeAt.toString(), + ); + } + + async pushOn(queue: string, jobData: JobData): Promise { + const keys = this.#getKeys(queue); + const priority = jobData.priority ?? DEFAULT_PRIORITY; + const timestamp = Date.now(); + const score = calculateScore(priority, timestamp); + + if (jobData.dedup) { + const dedupKey = this.#getDedupKey(queue, jobData.dedup.id); + const [payloadData, payloadIsUndefined] = encodeRedisJobPayloadOverlay( + jobData.payload, + ); + const result = (await this.#connection.eval( + PUSH_DEDUP_JOB_SCRIPT, + 5, + keys.data, + keys.pending, + dedupKey, + keys.delayed, + keys.overlay, + jobData.id, + JSON.stringify(jobData), + score.toString(), + (jobData.dedup.ttl ?? 0).toString(), + jobData.dedup.extend ? '1' : '0', + jobData.dedup.replace ? '1' : '0', + payloadData, + payloadIsUndefined, + )) as [string, string]; + return { outcome: result[0] as DedupOutcome, jobId: result[1] }; + } + + await this.#connection.eval( + PUSH_JOB_SCRIPT, + 3, + keys.data, + keys.pending, + keys.overlay, + jobData.id, + JSON.stringify(jobData), + score.toString(), + ); + } + + pushMany(jobs: JobData[]): Promise { + return this.pushManyOn('default', jobs); + } + + async pushManyOn(queue: string, jobs: JobData[]): Promise { + if (jobs.length === 0) return; + + if (jobs.some((j) => j.dedup)) { + throw new Error( + 'dedup is not supported in batch dispatch; use single dispatch', + ); + } + + const keys = this.#getKeys(queue); + const now = Date.now(); + const multi = this.#connection.multi(); + + for (const job of jobs) { + const priority = job.priority ?? DEFAULT_PRIORITY; + const score = calculateScore(priority, now); + multi.hdel(keys.overlay, job.id); + multi.hset(keys.data, job.id, JSON.stringify(job)); + multi.zadd(keys.pending, score, job.id); + } + + await multi.exec(); + } + + size(): Promise { + return this.sizeOf('default'); + } + + sizeOf(queue: string): Promise { + const keys = this.#getKeys(queue); + return this.#connection.zcard(keys.pending); + } + + async recoverStalledJobs( + queue: string, + stalledThreshold: number, + maxStalledCount: number, + ): Promise { + const keys = this.#getKeys(queue); + const now = Date.now(); + + const recovered = await this.#connection.eval( + RECOVER_STALLED_JOBS_SCRIPT, + 4, + keys.data, + keys.active, + keys.pending, + keys.overlay, + now.toString(), + stalledThreshold.toString(), + maxStalledCount.toString(), + this.#getDedupPrefix(queue), + ); + + return recovered as number; + } + + async upsertSchedule(config: ScheduleConfig): Promise { + const id = config.id ?? randomUUID(); + const now = Date.now(); + const scheduleKey = `${schedulesKey}::${id}`; + const [existingRunCount, existingCreatedAt, existingNextRunAt] = + await this.#connection.hmget( + scheduleKey, + 'run_count', + 'created_at', + 'next_run_at', + ); + + const scheduleData: Record = { + id, + name: config.name, + payload: JSON.stringify(config.payload), + timezone: config.timezone, + status: 'active', + run_count: existingRunCount ?? '0', + created_at: existingCreatedAt ?? now.toString(), + }; + + if (config.cronExpression !== undefined) + scheduleData.cron_expression = config.cronExpression; + if (config.everyMs !== undefined) + scheduleData.every_ms = config.everyMs.toString(); + if (config.from !== undefined) + scheduleData.from_date = config.from.getTime().toString(); + if (config.to !== undefined) + scheduleData.to_date = config.to.getTime().toString(); + if (config.limit !== undefined) + scheduleData.run_limit = config.limit.toString(); + + const multi = this.#connection + .multi() + .hdel( + scheduleKey, + 'cron_expression', + 'every_ms', + 'from_date', + 'to_date', + 'run_limit', + ) + .hset(scheduleKey, scheduleData) + .sadd(schedulesIndexKey, id); + + if (existingNextRunAt) { + multi.zadd( + schedulesDueKey, + Number.parseInt(existingNextRunAt, 10), + id, + ); + } + + await multi.exec(); + + return id; + } + + /** + * @deprecated Use `upsertSchedule` instead. + */ + createSchedule(config: ScheduleConfig): Promise { + return this.upsertSchedule(config); + } + + async getSchedule(id: string): Promise { + const scheduleKey = `${schedulesKey}::${id}`; + const data = await this.#connection.hgetall(scheduleKey); + if (!data || Object.keys(data).length === 0) { - continue + return null; + } + + return this.#hashToScheduleData(data); + } + + async listSchedules(options?: ScheduleListOptions): Promise { + const ids = await this.#connection.smembers(schedulesIndexKey); + if (ids.length === 0) { + return []; } - const schedule = this.#hashToScheduleData(data as Record) + const pipeline = this.#connection.pipeline(); + + for (const id of ids) { + pipeline.hgetall(`${schedulesKey}::${id}`); + } - // Filter by status if provided - if (options?.status && schedule.status !== options.status) { - continue + const results = await pipeline.exec(); + if (!results) { + return []; } - schedules.push(schedule) - } - - return schedules - } - - async updateSchedule( - id: string, - updates: Partial> - ): Promise { - const scheduleKey = `${schedulesKey}::${id}` - const data: Record = {} - - if (updates.status !== undefined) data.status = updates.status - if (updates.nextRunAt !== undefined) { - data.next_run_at = updates.nextRunAt ? updates.nextRunAt.getTime().toString() : '' - } - if (updates.lastRunAt !== undefined) { - data.last_run_at = updates.lastRunAt ? updates.lastRunAt.getTime().toString() : '' - } - if (updates.runCount !== undefined) data.run_count = updates.runCount.toString() - - if (Object.keys(data).length > 0) { - await this.#connection.hset(scheduleKey, data) - } - } - - async deleteSchedule(id: string): Promise { - const scheduleKey = `${schedulesKey}::${id}` - await this.#connection.multi().del(scheduleKey).srem(schedulesIndexKey, id).exec() - } - - async claimDueSchedule(): Promise { - const now = Date.now() - const result = await this.#connection.eval( - CLAIM_SCHEDULE_SCRIPT, - 2, - schedulesIndexKey, - `${schedulesKey}::`, - now.toString() - ) - - if (!result) { - return null - } - - const data = JSON.parse(result as string) as Record - - // If cron expression, we need to recalculate next_run_at properly. - // The Lua script only handles simple interval; cron needs JS cron-parser. - // This is safe because the schedule is already claimed (run_count incremented). - if (data.cron_expression) { - const { CronExpressionParser } = await import('cron-parser') - const cron = CronExpressionParser.parse(data.cron_expression, { - currentDate: new Date(now), - tz: data.timezone || 'UTC', - }) - const nextRun = cron.next().toDate().getTime() - - // Check limits before updating - const runCount = Number.parseInt(data.run_count || '0', 10) + 1 - const runLimit = data.run_limit ? Number.parseInt(data.run_limit, 10) : null - const toDate = data.to_date ? Number.parseInt(data.to_date, 10) : null - - let newNextRunAt: number | string = nextRun - - if (runLimit !== null && runCount >= runLimit) { - newNextRunAt = '' - } else if (toDate && nextRun > toDate) { - newNextRunAt = '' + const schedules: ScheduleData[] = []; + + for (const [, data] of results) { + if (!data || Object.keys(data).length === 0) { + continue; + } + + const schedule = this.#hashToScheduleData( + data as Record, + ); + + // Filter by status if provided + if (options?.status && schedule.status !== options.status) { + continue; + } + + schedules.push(schedule); + } + + return schedules; + } + + async updateSchedule( + id: string, + updates: Partial< + Pick + >, + ): Promise { + const scheduleKey = `${schedulesKey}::${id}`; + const data: Record = {}; + + if (updates.status !== undefined) data.status = updates.status; + if (updates.nextRunAt !== undefined) { + data.next_run_at = updates.nextRunAt + ? updates.nextRunAt.getTime().toString() + : ''; + } + if (updates.lastRunAt !== undefined) { + data.last_run_at = updates.lastRunAt + ? updates.lastRunAt.getTime().toString() + : ''; + } + if (updates.runCount !== undefined) + data.run_count = updates.runCount.toString(); + + if (Object.keys(data).length === 0) return; + + const multi = this.#connection.multi().hset(scheduleKey, data); + + if (updates.nextRunAt) { + multi.zadd(schedulesDueKey, updates.nextRunAt.getTime(), id); + } else if (updates.nextRunAt === null || updates.status === 'paused') { + multi.zrem(schedulesDueKey, id); + } + + if (updates.status === 'active' && updates.nextRunAt === undefined) { + const existing = await this.#connection.hget( + scheduleKey, + 'next_run_at', + ); + if (existing) { + multi.zadd(schedulesDueKey, Number.parseInt(existing, 10), id); + } + } + + await multi.exec(); + } + + async deleteSchedule(id: string): Promise { + const scheduleKey = `${schedulesKey}::${id}`; + await this.#connection + .multi() + .del(scheduleKey) + .srem(schedulesIndexKey, id) + .zrem(schedulesDueKey, id) + .exec(); + } + + async #ensureDueIndex(): Promise { + if (this.#dueIndexReady) return; + await this.backfillDueIndex(); + this.#dueIndexReady = true; + } + + async claimDueSchedule(): Promise { + await this.#ensureDueIndex(); + + const now = Date.now(); + const result = await this.#connection.eval( + CLAIM_SCHEDULE_SCRIPT, + 2, + schedulesDueKey, + `${schedulesKey}::`, + now.toString(), + ); + + if (!result) { + return null; + } + + const data = JSON.parse(result as string) as Record; + + // If cron expression, we need to recalculate next_run_at properly. + // The Lua script only handles simple interval; cron needs JS cron-parser. + // This is safe because the schedule is already claimed (run_count incremented). + if (data.cron_expression) { + const { CronExpressionParser } = await import('cron-parser'); + const cron = CronExpressionParser.parse(data.cron_expression, { + currentDate: new Date(now), + tz: data.timezone || 'UTC', + }); + const nextRun = cron.next().toDate().getTime(); + + const runCount = Number.parseInt(data.run_count || '0', 10) + 1; + const runLimit = data.run_limit + ? Number.parseInt(data.run_limit, 10) + : null; + const toDate = data.to_date ? Number.parseInt(data.to_date, 10) : null; + + let newNextRunAt: number | string = nextRun; + + if (runLimit !== null && runCount >= runLimit) { + newNextRunAt = ''; + } else if (toDate && nextRun > toDate) { + newNextRunAt = ''; + } + + const scheduleKey = `${schedulesKey}::${data.id}`; + const multi = this.#connection + .multi() + .hset(scheduleKey, 'next_run_at', newNextRunAt.toString()); + + if (typeof newNextRunAt === 'number') { + multi.zadd(schedulesDueKey, newNextRunAt, data.id); + } else { + multi.zrem(schedulesDueKey, data.id); + } + + await multi.exec(); + } + + return this.#hashToScheduleData(data); + } + + async backfillDueIndex(): Promise { + const ids = await this.#connection.smembers(schedulesIndexKey); + if (ids.length === 0) return 0; + + const pipeline = this.#connection.pipeline(); + for (const id of ids) { + pipeline.hmget(`${schedulesKey}::${id}`, 'next_run_at', 'status'); + } + const results = await pipeline.exec(); + if (!results) return 0; + + const addPipeline = this.#connection.pipeline(); + let count = 0; + + for (let i = 0; i < ids.length; i++) { + const [err, values] = results[i]; + if (err || !values) continue; + const [nextRunAt, status] = values as [string | null, string | null]; + if (nextRunAt && status === 'active') { + addPipeline.zadd( + schedulesDueKey, + Number.parseInt(nextRunAt, 10), + ids[i], + ); + count++; + } } - await this.#connection.hset( - `${schedulesKey}::${data.id}`, - 'next_run_at', - newNextRunAt.toString() - ) - } - - return this.#hashToScheduleData(data) - } - - #hashToScheduleData(data: Record): ScheduleData { - return { - id: data.id, - name: data.name, - payload: JSON.parse(data.payload || '{}'), - cronExpression: data.cron_expression || null, - everyMs: data.every_ms ? Number.parseInt(data.every_ms, 10) : null, - timezone: data.timezone || 'UTC', - from: data.from_date ? new Date(Number.parseInt(data.from_date, 10)) : null, - to: data.to_date ? new Date(Number.parseInt(data.to_date, 10)) : null, - limit: data.run_limit ? Number.parseInt(data.run_limit, 10) : null, - runCount: Number.parseInt(data.run_count || '0', 10), - nextRunAt: data.next_run_at ? new Date(Number.parseInt(data.next_run_at, 10)) : null, - lastRunAt: data.last_run_at ? new Date(Number.parseInt(data.last_run_at, 10)) : null, - status: (data.status as 'active' | 'paused') || 'active', - createdAt: data.created_at ? new Date(Number.parseInt(data.created_at, 10)) : new Date(), - } - } + if (count > 0) await addPipeline.exec(); + return count; + } + + #hashToScheduleData(data: Record): ScheduleData { + return { + id: data.id, + name: data.name, + payload: JSON.parse(data.payload || '{}'), + cronExpression: data.cron_expression || null, + everyMs: data.every_ms ? Number.parseInt(data.every_ms, 10) : null, + timezone: data.timezone || 'UTC', + from: data.from_date + ? new Date(Number.parseInt(data.from_date, 10)) + : null, + to: data.to_date ? new Date(Number.parseInt(data.to_date, 10)) : null, + limit: data.run_limit ? Number.parseInt(data.run_limit, 10) : null, + runCount: Number.parseInt(data.run_count || '0', 10), + nextRunAt: data.next_run_at + ? new Date(Number.parseInt(data.next_run_at, 10)) + : null, + lastRunAt: data.last_run_at + ? new Date(Number.parseInt(data.last_run_at, 10)) + : null, + status: (data.status as 'active' | 'paused') || 'active', + createdAt: data.created_at + ? new Date(Number.parseInt(data.created_at, 10)) + : new Date(), + }; + } } diff --git a/src/drivers/redis_scripts.ts b/src/drivers/redis_scripts.ts index e0ff6dd..2f8cedf 100644 --- a/src/drivers/redis_scripts.ts +++ b/src/drivers/redis_scripts.ts @@ -1,4 +1,4 @@ -import { REDIS_DEDUP_LUA, REDIS_JOB_STORAGE_LUA } from './redis_job_storage.js' +import { REDIS_DEDUP_LUA, REDIS_JOB_STORAGE_LUA } from './redis_job_storage.js'; /** * Lua script for pushing a job to the queue. @@ -18,7 +18,7 @@ ${REDIS_JOB_STORAGE_LUA} redis.call('ZADD', pending_key, score, job_id) return 1 -` +`; /** * Lua script for pushing a dedup job. @@ -80,7 +80,7 @@ ${REDIS_DEDUP_LUA} redis.call('PEXPIRE', dedup_key, ttl) end return {'added', job_id} -` +`; /** * Lua script for pushing a delayed job. @@ -100,7 +100,7 @@ ${REDIS_JOB_STORAGE_LUA} redis.call('ZADD', delayed_key, execute_at, job_id) return 1 -` +`; /** * Lua script for atomic job acquisition. @@ -158,7 +158,7 @@ ${REDIS_JOB_STORAGE_LUA} return encode_job_result(job_data, overlay_key, job_id, { acquiredAt = now }) -` +`; /** * Lua script for removing a job completely (no history). @@ -193,7 +193,7 @@ ${REDIS_JOB_STORAGE_LUA} delete_job_data(data_key, overlay_key, job_id) return 1 -` +`; /** * Lua script for finalizing a job in history. @@ -277,7 +277,7 @@ ${REDIS_JOB_STORAGE_LUA} end return 1 -` +`; /** * Lua script for retrying a job. @@ -330,7 +330,7 @@ ${REDIS_JOB_STORAGE_LUA} end return 1 -` +`; /** * Lua script for recovering stalled jobs. @@ -399,7 +399,7 @@ ${REDIS_JOB_STORAGE_LUA} end return recovered -` +`; /** * Lua script for getting a job record with its status. @@ -458,77 +458,99 @@ ${REDIS_JOB_STORAGE_LUA} finishedAt = finished_at, error = error_msg }) -` +`; /** - * Lua script for atomically claiming a due schedule. - * Iterates the schedule index server-side and claims the first due schedule. - * Returns the schedule data if claimed, nil otherwise. + * Lua script for atomically claiming a due schedule using a sorted set index. + * + * Uses ZRANGEBYSCORE on schedules::due (scored by next_run_at) for O(log N) + * lookup instead of scanning all schedule hashes via SMEMBERS. + * + * Stale entries (paused, exhausted, deleted) are cleaned from the ZSET on + * sight so subsequent calls skip them. + * + * KEYS[1] = schedules::due (the ZSET) + * KEYS[2] = schedule key prefix (e.g. "schedules::") + * ARGV[1] = now (epoch milliseconds) */ export const CLAIM_SCHEDULE_SCRIPT = ` - local schedules_index_key = KEYS[1] - local schedule_key_prefix = KEYS[2] + local due_key = KEYS[1] + local prefix = KEYS[2] local now = tonumber(ARGV[1]) - local ids = redis.call('SMEMBERS', schedules_index_key) + while true do + local candidates = redis.call('ZRANGEBYSCORE', due_key, '-inf', tostring(now), 'LIMIT', 0, 1) + + if #candidates == 0 then + return nil + end - for i = 1, #ids do - local schedule_key = schedule_key_prefix .. ids[i] + local id = candidates[1] + local schedule_key = prefix .. id -- Get schedule data local data = redis.call('HGETALL', schedule_key) - if #data > 0 then + + -- Deleted schedule still in ZSET + if #data == 0 then + redis.call('ZREM', due_key, id) + else -- Convert HGETALL result to table local schedule = {} for j = 1, #data, 2 do schedule[data[j]] = data[j + 1] end - -- Check if schedule is due - if schedule.status == 'active' then - local next_run_at = tonumber(schedule.next_run_at) - - if next_run_at and next_run_at <= now then - local run_count = tonumber(schedule.run_count or '0') - local run_limit = schedule.run_limit and tonumber(schedule.run_limit) or nil - local to_date = schedule.to_date and tonumber(schedule.to_date) or nil - - -- Check limits - if not (run_limit and run_count >= run_limit) and not (to_date and now > to_date) then - -- This schedule is claimable - atomically update it - local new_run_count = run_count + 1 - - -- Calculate new next_run_at (simple interval-based for now) - -- Complex cron calculation happens in the caller - local new_next_run_at = '' - local every_ms = schedule.every_ms and tonumber(schedule.every_ms) or nil - if every_ms then - new_next_run_at = tostring(now + every_ms) - end - - -- Check if we've hit the limit after this run - if run_limit and new_run_count >= run_limit then - new_next_run_at = '' - end + -- Check if schedule is active + if schedule.status ~= 'active' then + redis.call('ZREM', due_key, id) + else + local run_count = tonumber(schedule.run_count or '0') + local run_limit = schedule.run_limit and tonumber(schedule.run_limit) or nil + local to_date = schedule.to_date and tonumber(schedule.to_date) or nil + + -- Check limits + if (run_limit and run_count >= run_limit) or (to_date and now > to_date) then + redis.call('ZREM', due_key, id) + else + -- This schedule is claimable - atomically update it + local new_run_count = run_count + 1 + + -- Calculate new next_run_at (simple interval-based for now) + -- Complex cron calculation happens in the caller + local new_next_run_at = '' + local every_ms = schedule.every_ms and tonumber(schedule.every_ms) or nil + if every_ms then + new_next_run_at = tostring(now + every_ms) + end - -- Check if past end date - if to_date and new_next_run_at ~= '' and tonumber(new_next_run_at) > to_date then - new_next_run_at = '' - end + -- Check if we've hit the limit after this run + if run_limit and new_run_count >= run_limit then + new_next_run_at = '' + end - -- Update the schedule atomically - redis.call('HSET', schedule_key, - 'next_run_at', new_next_run_at, - 'last_run_at', tostring(now), - 'run_count', tostring(new_run_count)) + -- Check if past end date + if to_date and new_next_run_at ~= '' and tonumber(new_next_run_at) > to_date then + new_next_run_at = '' + end - -- Return the schedule data (before update) as JSON - return cjson.encode(schedule) + -- Update the schedule atomically + redis.call('HSET', schedule_key, + 'next_run_at', new_next_run_at, + 'last_run_at', tostring(now), + 'run_count', tostring(new_run_count)) + + -- Update or remove from ZSET + if new_next_run_at ~= '' then + redis.call('ZADD', due_key, tonumber(new_next_run_at), id) + else + redis.call('ZREM', due_key, id) end + + -- Return the schedule data (before update) as JSON + return cjson.encode(schedule) end end end end - - return nil -` +`; diff --git a/tests/adapter.spec.ts b/tests/adapter.spec.ts index 7285c3d..e061869 100644 --- a/tests/adapter.spec.ts +++ b/tests/adapter.spec.ts @@ -99,6 +99,9 @@ test.group('Adapter | Redis', (group) => { await adapter.updateSchedule(id, { nextRunAt: futureRunAt }) } + // Warm the due-index backfill so it doesn't count against the spy + await adapter.claimDueSchedule() + const { result: claimed, writes } = await withRedisWriteSpy({ connection, run: () => adapter.claimDueSchedule(),