-
-
Notifications
You must be signed in to change notification settings - Fork 10
feat(billing): distributed lock for multi-pod crons #3688
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
acce10d
feat(lib): add distributed lock primitive for multi-pod crons
PierreBrisorgueil d944c2a
feat(billing): wrap crons with distributed lock
PierreBrisorgueil 218c329
docs(billing): document distributed cron lock + runbook entry
PierreBrisorgueil 6aaeb21
fix(tests): remove unused vars caught by lint (distributedLock)
PierreBrisorgueil d3264b8
fix(billing): address code-quality findings on cron lock
PierreBrisorgueil b971b2c
fix(billing): address DeepSeek gate findings on cron lock
PierreBrisorgueil 50a7a80
fix(billing): address CodeRabbit findings on PR #3688
PierreBrisorgueil b232b68
fix(billing): add loadModels to dunningSweep+extrasExpiration crons +…
PierreBrisorgueil File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,86 @@ | ||
| /** | ||
| * Distributed lock primitive for multi-pod cron jobs. | ||
| * | ||
| * Uses a MongoDB TTL collection (`cron_locks`) to ensure mutual exclusion | ||
| * across replicas. A lock document expires automatically after `ttlMs` | ||
| * milliseconds — pod crashes therefore never permanently block scheduling. | ||
| * | ||
| * Usage: | ||
| * const holder = `${process.env.HOSTNAME ?? 'unknown'}:${randomUUID()}` | ||
| * const acquired = await acquireLock({ name: 'billing.weeklyReset', ttlMs: 10 * 60 * 1000, holder }) | ||
| * if (!acquired) return // another pod holds the lock | ||
| * try { | ||
| * // ... work | ||
| * } finally { | ||
| * await releaseLock({ name: 'billing.weeklyReset', holder }) | ||
| * } | ||
| */ | ||
|
|
||
| import mongoose from 'mongoose'; | ||
|
|
||
// Shape of one lock document. `_id` is the lock name, so the collection's
// built-in unique _id index guarantees at most one document per lock.
const lockFields = {
  _id: { type: String, required: true },
  lockedAt: { type: Date, required: true },
  lockedUntil: { type: Date, required: true },
  holder: { type: String, required: true },
};

// Stored in `cron_locks`; no `__v` version key is written.
const lockSchemaOptions = { collection: 'cron_locks', versionKey: false };

const LockSchema = new mongoose.Schema(lockFields, lockSchemaOptions);

// MongoDB TTL index — auto-deletes expired docs so stale locks don't accumulate.
// With expireAfterSeconds: 0 the document expires at the time stored in `lockedUntil`.
LockSchema.index({ lockedUntil: 1 }, { expireAfterSeconds: 0 });

// Reuse the model when it is already registered on this mongoose instance,
// otherwise compile it from the schema.
export const CronLock = mongoose.models.CronLock ?? mongoose.model('CronLock', LockSchema);
|
|
||
/**
 * @function acquireLock
 * @description Attempt to acquire a named lock. Resolves to true when the
 * caller now holds the lock, false when it is currently held by another holder.
 *
 * Implementation: one findOneAndUpdate with upsert whose filter only matches a
 * document for `name` that has already expired (lockedUntil < now):
 *   - no document exists      -> the upsert inserts one, caller wins;
 *   - an expired document     -> it is overwritten in place, caller wins;
 *   - a live document exists  -> the filter misses, the upsert tries to insert
 *     a second document with the same _id and fails with E11000, which is
 *     caught and reported as false (same outcome when two pods race).
 *
 * All arguments are validated before the database is touched, so a missing
 * `name` or `holder` can never be written into the lock collection.
 *
 * @param {object} opts
 * @param {string} opts.name - Unique lock name (e.g. 'billing.weeklyReset')
 * @param {number} opts.ttlMs - Lock duration in milliseconds
 * @param {string} opts.holder - Unique identifier for the calling pod/process
 * @returns {Promise<boolean>}
 * @throws {Error} If ttlMs is not a positive finite number, or name/holder is
 * not a non-empty string. Any database error other than E11000 is re-thrown.
 */
export async function acquireLock({ name, ttlMs, holder }) {
  // ttlMs is checked first so existing callers keep seeing the same error.
  if (!Number.isFinite(ttlMs) || ttlMs <= 0) {
    throw new Error(`acquireLock: ttlMs must be a positive number, received ${ttlMs}`);
  }
  if (typeof name !== 'string' || name.length === 0) {
    throw new Error(`acquireLock: name must be a non-empty string, received ${String(name)}`);
  }
  if (typeof holder !== 'string' || holder.length === 0) {
    throw new Error(`acquireLock: holder must be a non-empty string, received ${String(holder)}`);
  }

  const now = new Date();
  const lockedUntil = new Date(now.getTime() + ttlMs);
  try {
    const result = await CronLock.findOneAndUpdate(
      { _id: name, lockedUntil: { $lt: now } },
      { $set: { lockedAt: now, lockedUntil, holder } },
      { upsert: true, returnDocument: 'after' },
    );
    // Only report success when the stored document names this caller.
    return result?.holder === holder;
  } catch (err) {
    // Duplicate _id: a live lock already exists (or another pod won the race).
    if (err.code === 11000) return false;
    throw err;
  }
}
|
PierreBrisorgueil marked this conversation as resolved.
|
||
|
|
||
/**
 * @function releaseLock
 * @description Release a lock only if the caller is the current holder.
 * No-op if the lock is held by a different holder (prevents accidental release
 * after a TTL expiry + re-acquire by another pod).
 *
 * `holder` is the only thing that scopes the delete to the caller, so both
 * arguments are validated before the database is touched. An undefined
 * `holder` would otherwise be passed through to the delete filter, where its
 * meaning depends on how the driver serializes undefined values.
 *
 * @param {object} opts
 * @param {string} opts.name - Lock name to release
 * @param {string} opts.holder - Must match the holder that acquired the lock
 * @returns {Promise<void>}
 * @throws {Error} If name or holder is not a non-empty string. Errors from the
 * underlying deleteOne call propagate to the caller.
 */
export async function releaseLock({ name, holder }) {
  if (typeof name !== 'string' || name.length === 0) {
    throw new Error(`releaseLock: name must be a non-empty string, received ${String(name)}`);
  }
  if (typeof holder !== 'string' || holder.length === 0) {
    throw new Error(`releaseLock: holder must be a non-empty string, received ${String(holder)}`);
  }
  // Filtering on holder makes this a no-op when someone else owns the lock.
  await CronLock.deleteOne({ _id: name, holder });
}
|
|
||
// Default export: the model and both lock helpers gathered into one object.
const distributedLock = { CronLock, acquireLock, releaseLock };

export default distributedLock;
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,160 @@ | ||
| /** | ||
| * Module dependencies. | ||
| */ | ||
| import { jest, describe, test, beforeEach, afterEach, expect } from '@jest/globals'; | ||
|
|
||
| /** | ||
| * Unit tests for lib/services/distributedLock.js | ||
| * | ||
| * All Mongoose interactions are mocked — no real DB connection required. | ||
| * Tests verify the acquire / release contract and contention handling. | ||
| */ | ||
describe('distributedLock — acquireLock:', () => {
  let acquireLock;
  let findOneAndUpdateMock;

  // Shorthand for the common call shape used by most cases below.
  const tryAcquire = (name, holder, ttlMs = 60_000) => acquireLock({ name, ttlMs, holder });

  beforeEach(async () => {
    // Fresh module registry so the module under test is re-evaluated against
    // the mongoose mock registered below.
    jest.resetModules();

    findOneAndUpdateMock = jest.fn();
    const fakeModel = {
      findOneAndUpdate: findOneAndUpdateMock,
      deleteOne: jest.fn(),
    };

    // Minimal mongoose stand-in: a Schema that accepts anything and a
    // model() factory that always hands back the fake model.
    const fakeMongoose = {
      Schema: class MockSchema {
        constructor() {}
        index() {}
      },
      models: {},
      model: jest.fn(() => fakeModel),
    };
    jest.unstable_mockModule('mongoose', () => ({ default: fakeMongoose }));

    const lockModule = await import('../distributedLock.js');
    acquireLock = lockModule.acquireLock;
  });

  afterEach(() => {
    jest.restoreAllMocks();
  });

  test('returns true when findOneAndUpdate resolves with matching holder', async () => {
    findOneAndUpdateMock.mockResolvedValue({ holder: 'pod-1' });

    const acquired = await tryAcquire('job-a', 'pod-1');

    expect(acquired).toBe(true);
    expect(findOneAndUpdateMock).toHaveBeenCalledTimes(1);

    const firstCall = findOneAndUpdateMock.mock.calls[0];
    const query = firstCall[0];
    const change = firstCall[1];
    const options = firstCall[2];
    expect(query._id).toBe('job-a');
    expect(query.lockedUntil.$lt).toBeInstanceOf(Date);
    expect(change.$set.holder).toBe('pod-1');
    expect(options.upsert).toBe(true);
  });

  test('returns false when findOneAndUpdate returns doc held by different holder', async () => {
    findOneAndUpdateMock.mockResolvedValue({ holder: 'pod-1' });

    const acquired = await tryAcquire('job-b', 'pod-2');

    expect(acquired).toBe(false);
  });

  test('returns false on E11000 duplicate-key (concurrent upsert race)', async () => {
    const duplicateKey = Object.assign(new Error('E11000 duplicate key'), { code: 11000 });
    findOneAndUpdateMock.mockRejectedValue(duplicateKey);

    const acquired = await tryAcquire('job-c', 'pod-1');

    expect(acquired).toBe(false);
  });

  test('re-throws non-duplicate errors', async () => {
    const failure = Object.assign(new Error('network timeout'), { code: 13 });
    findOneAndUpdateMock.mockRejectedValue(failure);

    await expect(tryAcquire('job-d', 'pod-1')).rejects.toThrow('network timeout');
  });

  // Every invalid ttlMs must be rejected before the model is touched.
  const invalidTtlCases = [
    [0, 'zero'],
    [-1, 'negative'],
    [Number.NaN, 'NaN'],
    [Infinity, 'Infinity'],
    [undefined, 'undefined'],
    [null, 'null'],
  ];

  test.each(invalidTtlCases)('throws when ttlMs is %s (%s)', async (ttlMs) => {
    const attempt = acquireLock({ name: 'job-guard', ttlMs, holder: 'pod-1' });

    await expect(attempt).rejects.toThrow('acquireLock: ttlMs must be a positive number');
    expect(findOneAndUpdateMock).not.toHaveBeenCalled();
  });

  test('lockedUntil is set to now + ttlMs', async () => {
    const ttlMs = 10_000;
    findOneAndUpdateMock.mockResolvedValue({ holder: 'pod-1' });

    // Bracket the call with wall-clock readings to bound the expected expiry.
    const startedAt = Date.now();
    await acquireLock({ name: 'job-e', ttlMs, holder: 'pod-1' });
    const finishedAt = Date.now();

    const expiry = findOneAndUpdateMock.mock.calls[0][1].$set.lockedUntil.getTime();
    expect(expiry).toBeGreaterThanOrEqual(startedAt + ttlMs);
    expect(expiry).toBeLessThanOrEqual(finishedAt + ttlMs);
  });
});
|
|
||
describe('distributedLock — releaseLock:', () => {
  let releaseLock;
  let deleteOneMock;

  beforeEach(async () => {
    // Fresh module registry so the module under test is re-evaluated against
    // the mongoose mock registered below.
    jest.resetModules();

    deleteOneMock = jest.fn().mockResolvedValue({});
    const fakeModel = {
      findOneAndUpdate: jest.fn(),
      deleteOne: deleteOneMock,
    };

    // Minimal mongoose stand-in: a Schema that accepts anything and a
    // model() factory that always hands back the fake model.
    const fakeMongoose = {
      Schema: class MockSchema {
        constructor() {}
        index() {}
      },
      models: {},
      model: jest.fn(() => fakeModel),
    };
    jest.unstable_mockModule('mongoose', () => ({ default: fakeMongoose }));

    const lockModule = await import('../distributedLock.js');
    releaseLock = lockModule.releaseLock;
  });

  afterEach(() => {
    jest.restoreAllMocks();
  });

  test('calls deleteOne with name and holder', async () => {
    const args = { name: 'job-a', holder: 'pod-1' };

    await releaseLock(args);

    expect(deleteOneMock).toHaveBeenCalledWith({ _id: 'job-a', holder: 'pod-1' });
  });

  test('does not throw when deleteOne resolves', async () => {
    const pending = releaseLock({ name: 'job-b', holder: 'pod-2' });

    await expect(pending).resolves.toBeUndefined();
  });

  test('propagates deleteOne errors to the caller', async () => {
    deleteOneMock.mockRejectedValue(new Error('network timeout'));

    const pending = releaseLock({ name: 'job-c', holder: 'pod-1' });

    await expect(pending).rejects.toThrow('network timeout');
  });
});
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.