From acce10d7c57f6d32b632b1cda188e02f5eecbcf7 Mon Sep 17 00:00:00 2001 From: Pierre Brisorgueil Date: Thu, 21 May 2026 21:09:02 +0200 Subject: [PATCH 1/8] feat(lib): add distributed lock primitive for multi-pod crons MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mongo-backed TTL doc lock with auto-expiry. Acquired via findOneAndUpdate upsert with stale-or-absent predicate; released by holder match. Addresses devkit audit P1 (2026-05-21) — billing crons race when K8s concurrencyPolicy bypass occurs (e.g. crash post-jitter pre-finalize). --- lib/distributedLock.js | 83 +++++++++++ .../tests/distributedLock.unit.tests.js | 141 ++++++++++++++++++ 2 files changed, 224 insertions(+) create mode 100644 lib/distributedLock.js create mode 100644 lib/services/tests/distributedLock.unit.tests.js diff --git a/lib/distributedLock.js b/lib/distributedLock.js new file mode 100644 index 000000000..d66023c23 --- /dev/null +++ b/lib/distributedLock.js @@ -0,0 +1,83 @@ +/** + * Distributed lock primitive for multi-pod cron jobs. + * + * Uses a MongoDB TTL collection (`cron_locks`) to ensure mutual exclusion + * across replicas. A lock document expires automatically after `ttlMs` + * milliseconds — pod crashes therefore never permanently block scheduling. + * + * Usage: + * const holder = `${process.env.HOSTNAME ?? 'unknown'}:${randomUUID()}` + * const acquired = await acquireLock({ name: 'billing.weeklyReset', ttlMs: 10 * 60 * 1000, holder }) + * if (!acquired) return // another pod holds the lock + * try { + * // ... 
work + * } finally { + * await releaseLock({ name: 'billing.weeklyReset', holder }) + * } + */ + +import mongoose from 'mongoose'; + +const LockSchema = new mongoose.Schema( + { + _id: { type: String, required: true }, + lockedAt: { type: Date, required: true }, + lockedUntil: { type: Date, required: true }, + holder: { type: String, required: true }, + }, + { collection: 'cron_locks', versionKey: false }, +); + +// MongoDB TTL index — auto-deletes expired docs so stale locks don't accumulate. +LockSchema.index({ lockedUntil: 1 }, { expireAfterSeconds: 0 }); + +export const CronLock = mongoose.models.CronLock ?? mongoose.model('CronLock', LockSchema); + +/** + * @function acquireLock + * @description Attempt to acquire a named lock. Returns true if acquired, + * false if the lock is currently held by another holder. + * + * Implementation: findOneAndUpdate with upsert on the condition that either + * no doc exists (_id absent) or the existing doc has expired (lockedUntil < now). + * Duplicate-key errors (E11000) from the unique _id index are caught and + * returned as false (the lock is still held and unexpired, or another pod raced to acquire it). + * + * @param {object} opts + * @param {string} opts.name - Unique lock name (e.g. 'billing.weeklyReset') + * @param {number} opts.ttlMs - Lock duration in milliseconds + * @param {string} opts.holder - Unique identifier for the calling pod/process + * @returns {Promise<boolean>} + */ +export async function acquireLock({ name, ttlMs, holder }) { + const now = new Date(); + const lockedUntil = new Date(now.getTime() + ttlMs); + try { + const result = await CronLock.findOneAndUpdate( + { _id: name, lockedUntil: { $lt: now } }, + { $set: { lockedAt: now, lockedUntil, holder } }, + { upsert: true, new: true, setDefaultsOnInsert: true }, + ); + return result?.holder === holder; + } catch (err) { + if (err.code === 11000) return false; + throw err; + } +} + +/** + * @function releaseLock + * @description Release a lock only if the caller is the current holder. 
+ * No-op if the lock is held by a different holder (prevents accidental release + * after a TTL expiry + re-acquire by another pod). + * + * @param {object} opts + * @param {string} opts.name - Lock name to release + * @param {string} opts.holder - Must match the holder that acquired the lock + * @returns {Promise<void>} + */ +export async function releaseLock({ name, holder }) { + await CronLock.deleteOne({ _id: name, holder }); +} + +export default { CronLock, acquireLock, releaseLock }; diff --git a/lib/services/tests/distributedLock.unit.tests.js b/lib/services/tests/distributedLock.unit.tests.js new file mode 100644 index 000000000..0d589cc2d --- /dev/null +++ b/lib/services/tests/distributedLock.unit.tests.js @@ -0,0 +1,141 @@ +/** + * Module dependencies. + */ +import { jest, describe, test, beforeEach, afterEach, expect } from '@jest/globals'; + +/** + * Unit tests for lib/distributedLock.js + * + * All Mongoose interactions are mocked — no real DB connection required. + * Tests verify the acquire / release contract and contention handling. 
+ */ +describe('distributedLock — acquireLock:', () => { + let acquireLock; + let releaseLock; + let mockFindOneAndUpdate; + let mockDeleteOne; + + beforeEach(async () => { + jest.resetModules(); + + mockFindOneAndUpdate = jest.fn(); + mockDeleteOne = jest.fn(); + + const mockCronLock = { + findOneAndUpdate: mockFindOneAndUpdate, + deleteOne: mockDeleteOne, + }; + + jest.unstable_mockModule('mongoose', () => ({ + default: { + Schema: class MockSchema { + constructor() {} + index() {} + }, + models: {}, + model: jest.fn(() => mockCronLock), + }, + })); + + ({ acquireLock, releaseLock } = await import('../../distributedLock.js')); + }); + + afterEach(() => { + jest.restoreAllMocks(); + }); + + test('returns true when findOneAndUpdate resolves with matching holder', async () => { + mockFindOneAndUpdate.mockResolvedValue({ holder: 'pod-1' }); + + const ok = await acquireLock({ name: 'job-a', ttlMs: 60_000, holder: 'pod-1' }); + + expect(ok).toBe(true); + expect(mockFindOneAndUpdate).toHaveBeenCalledTimes(1); + const [filter, update, opts] = mockFindOneAndUpdate.mock.calls[0]; + expect(filter._id).toBe('job-a'); + expect(filter.lockedUntil.$lt).toBeInstanceOf(Date); + expect(update.$set.holder).toBe('pod-1'); + expect(opts.upsert).toBe(true); + }); + + test('returns false when findOneAndUpdate returns doc held by different holder', async () => { + mockFindOneAndUpdate.mockResolvedValue({ holder: 'pod-1' }); + + const ok = await acquireLock({ name: 'job-b', ttlMs: 60_000, holder: 'pod-2' }); + + expect(ok).toBe(false); + }); + + test('returns false on E11000 duplicate-key (concurrent upsert race)', async () => { + const dupErr = new Error('E11000 duplicate key'); + dupErr.code = 11000; + mockFindOneAndUpdate.mockRejectedValue(dupErr); + + const ok = await acquireLock({ name: 'job-c', ttlMs: 60_000, holder: 'pod-1' }); + + expect(ok).toBe(false); + }); + + test('re-throws non-duplicate errors', async () => { + const dbErr = new Error('network timeout'); + dbErr.code = 13; 
+ mockFindOneAndUpdate.mockRejectedValue(dbErr); + + await expect(acquireLock({ name: 'job-d', ttlMs: 60_000, holder: 'pod-1' })).rejects.toThrow('network timeout'); + }); + + test('lockedUntil is set to now + ttlMs', async () => { + const before = Date.now(); + mockFindOneAndUpdate.mockResolvedValue({ holder: 'pod-1' }); + + await acquireLock({ name: 'job-e', ttlMs: 10_000, holder: 'pod-1' }); + + const after = Date.now(); + const { lockedUntil } = mockFindOneAndUpdate.mock.calls[0][1].$set; + expect(lockedUntil.getTime()).toBeGreaterThanOrEqual(before + 10_000); + expect(lockedUntil.getTime()).toBeLessThanOrEqual(after + 10_000); + }); +}); + +describe('distributedLock — releaseLock:', () => { + let releaseLock; + let mockDeleteOne; + + beforeEach(async () => { + jest.resetModules(); + + mockDeleteOne = jest.fn().mockResolvedValue({}); + + const mockCronLock = { + findOneAndUpdate: jest.fn(), + deleteOne: mockDeleteOne, + }; + + jest.unstable_mockModule('mongoose', () => ({ + default: { + Schema: class MockSchema { + constructor() {} + index() {} + }, + models: {}, + model: jest.fn(() => mockCronLock), + }, + })); + + ({ releaseLock } = await import('../../distributedLock.js')); + }); + + afterEach(() => { + jest.restoreAllMocks(); + }); + + test('calls deleteOne with name and holder', async () => { + await releaseLock({ name: 'job-a', holder: 'pod-1' }); + + expect(mockDeleteOne).toHaveBeenCalledWith({ _id: 'job-a', holder: 'pod-1' }); + }); + + test('does not throw when deleteOne resolves', async () => { + await expect(releaseLock({ name: 'job-b', holder: 'pod-2' })).resolves.toBeUndefined(); + }); +}); From d944c2a81537bd60077f24c19345c4c6b28f5dc9 Mon Sep 17 00:00:00 2001 From: Pierre Brisorgueil Date: Thu, 21 May 2026 21:13:33 +0200 Subject: [PATCH 2/8] feat(billing): wrap crons with distributed lock MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit weeklyReset, dunningSweep, extrasExpiration now acquire a distributed 
lock before mutating subscriptions. Skip-on-contention with info log; release in finally for clean recovery. Also fixes Mongoose 8 deprecation: `new: true` → `returnDocument: 'after'` in findOneAndUpdate. Closes audit P1 — multi-pod race on crash post-jitter pre-finalize. --- lib/distributedLock.js | 2 +- modules/billing/crons/billing.dunningSweep.js | 101 +++++++++++------- .../billing/crons/billing.extrasExpiration.js | 59 ++++++---- modules/billing/crons/billing.weeklyReset.js | 27 ++++- .../billing.cron-lock.integration.tests.js | 100 +++++++++++++++++ 5 files changed, 223 insertions(+), 66 deletions(-) create mode 100644 modules/billing/tests/billing.cron-lock.integration.tests.js diff --git a/lib/distributedLock.js b/lib/distributedLock.js index d66023c23..07495e5a4 100644 --- a/lib/distributedLock.js +++ b/lib/distributedLock.js @@ -56,7 +56,7 @@ export async function acquireLock({ name, ttlMs, holder }) { const result = await CronLock.findOneAndUpdate( { _id: name, lockedUntil: { $lt: now } }, { $set: { lockedAt: now, lockedUntil, holder } }, - { upsert: true, new: true, setDefaultsOnInsert: true }, + { upsert: true, returnDocument: 'after', setDefaultsOnInsert: true }, ); return result?.holder === holder; } catch (err) { diff --git a/modules/billing/crons/billing.dunningSweep.js b/modules/billing/crons/billing.dunningSweep.js index 6fb46f18a..008ecd8d6 100644 --- a/modules/billing/crons/billing.dunningSweep.js +++ b/modules/billing/crons/billing.dunningSweep.js @@ -17,6 +17,8 @@ * NODE_ENV=production node modules/billing/crons/billing.dunningSweep.js */ +import { randomUUID } from 'node:crypto'; + process.env.NODE_ENV = process.env.NODE_ENV || 'development'; const [ @@ -25,12 +27,14 @@ const [ { default: logger }, { applyJitter }, { getCronJitterMaxMs, getDunningThresholdDays }, + { acquireLock, releaseLock }, ] = await Promise.all([ import('../../../config/index.js'), import('../../../lib/services/mongoose.js'), import('../../../lib/services/logger.js'), 
import('../lib/billing.cron-utils.js'), import('../lib/billing.constants.js'), + import('../../../lib/distributedLock.js'), ]); if (!config?.billing?.meterMode) { @@ -38,63 +42,78 @@ if (!config?.billing?.meterMode) { process.exit(0); } +const LOCK_NAME = 'billing.dunningSweep'; +const LOCK_TTL_MS = 15 * 60 * 1000; // 15 min + const startMs = Date.now(); logger.info('[cron.dunningSweep] start'); +let lockHolder = null; try { await applyJitter(getCronJitterMaxMs()); await mongooseService.connect(); - const [{ default: BillingSubscriptionRepository }, { default: OrganizationRepository }] = await Promise.all([ - import('../repositories/billing.subscription.repository.js'), - import('../../organizations/repositories/organizations.repository.js'), - ]); + lockHolder = `${process.env.HOSTNAME ?? 'unknown'}:${randomUUID()}`; + const acquired = await acquireLock({ name: LOCK_NAME, ttlMs: LOCK_TTL_MS, holder: lockHolder }); + if (!acquired) { + logger.info({ cron: LOCK_NAME }, 'lock held by another pod, skipping'); + process.exitCode = 0; + } else { + try { + const [{ default: BillingSubscriptionRepository }, { default: OrganizationRepository }] = await Promise.all([ + import('../repositories/billing.subscription.repository.js'), + import('../../organizations/repositories/organizations.repository.js'), + ]); - const now = new Date(); - const threshold = new Date(now.getTime() - getDunningThresholdDays() * 24 * 60 * 60 * 1000); + const now = new Date(); + const threshold = new Date(now.getTime() - getDunningThresholdDays() * 24 * 60 * 60 * 1000); - const staleSubs = await BillingSubscriptionRepository.findStaleDunning(threshold); - logger.info('[cron.dunningSweep] stale past_due subscriptions found', { count: staleSubs.length }); + const staleSubs = await BillingSubscriptionRepository.findStaleDunning(threshold); + logger.info('[cron.dunningSweep] stale past_due subscriptions found', { count: staleSubs.length }); - let processed = 0; - let errors = 0; - let desyncErrors = 0; 
+ let processed = 0; + let errors = 0; + let desyncErrors = 0; - for (const sub of staleSubs) { - try { - const subscription = await BillingSubscriptionRepository.markUnpaid(String(sub._id), threshold); - if (!subscription) { - logger.info('[cron.dunningSweep] sub skipped — already recovered', { subId: String(sub._id) }); - continue; - } + for (const sub of staleSubs) { + try { + const subscription = await BillingSubscriptionRepository.markUnpaid(String(sub._id), threshold); + if (!subscription) { + logger.info('[cron.dunningSweep] sub skipped — already recovered', { subId: String(sub._id) }); + continue; + } + + try { + await OrganizationRepository.setPlan(String(sub.organization), 'free'); + } catch (orgErr) { + // Compensation: Subscription is now unpaid but Org.plan update failed. + // Log for manual reconciliation — do not revert Subscription status. + logger.error('[cron.dunningSweep] Org plan sync failed (manual reconciliation required)', { + subId: String(sub._id), + orgId: String(sub.organization), + err: orgErr?.message, + stack: orgErr?.stack, + }); + desyncErrors += 1; + } - try { - await OrganizationRepository.setPlan(String(sub.organization), 'free'); - } catch (orgErr) { - // Compensation: Subscription is now unpaid but Org.plan update failed. - // Log for manual reconciliation — do not revert Subscription status. 
- logger.error('[cron.dunningSweep] Org plan sync failed (manual reconciliation required)', { - subId: String(sub._id), - orgId: String(sub.organization), - err: orgErr?.message, - stack: orgErr?.stack, - }); - desyncErrors += 1; + logger.info('[cron.dunningSweep] sub transitioned to unpaid', { + subId: String(sub._id), + orgId: String(sub.organization), + }); + processed += 1; + } catch (err) { + errors += 1; + logger.error('[cron.dunningSweep] failed for sub', { subId: String(sub._id), err: err?.message, stack: err?.stack }); + } } - logger.info('[cron.dunningSweep] sub transitioned to unpaid', { - subId: String(sub._id), - orgId: String(sub.organization), - }); - processed += 1; - } catch (err) { - errors += 1; - logger.error('[cron.dunningSweep] failed for sub', { subId: String(sub._id), err: err?.message, stack: err?.stack }); + logger.info('[cron.dunningSweep] complete', { processed, errors, desyncErrors, durationMs: Date.now() - startMs }); + process.exitCode = errors > 0 ? 1 : 0; + } finally { + await releaseLock({ name: LOCK_NAME, holder: lockHolder }); } } - - logger.info('[cron.dunningSweep] complete', { processed, errors, desyncErrors, durationMs: Date.now() - startMs }); - process.exitCode = errors > 0 ? 
1 : 0; } catch (err) { logger.error('[cron.dunningSweep] failed', { err: err?.message, stack: err?.stack }); process.exitCode = 1; diff --git a/modules/billing/crons/billing.extrasExpiration.js b/modules/billing/crons/billing.extrasExpiration.js index 2e9b2de04..b6a7be035 100644 --- a/modules/billing/crons/billing.extrasExpiration.js +++ b/modules/billing/crons/billing.extrasExpiration.js @@ -12,6 +12,8 @@ * NODE_ENV=production node modules/billing/crons/billing.extrasExpiration.js */ +import { randomUUID } from 'node:crypto'; + process.env.NODE_ENV = process.env.NODE_ENV || 'development'; const [ @@ -20,12 +22,14 @@ const [ { default: logger }, { applyJitter }, { getCronJitterMaxMs }, + { acquireLock, releaseLock }, ] = await Promise.all([ import('../../../config/index.js'), import('../../../lib/services/mongoose.js'), import('../../../lib/services/logger.js'), import('../lib/billing.cron-utils.js'), import('../lib/billing.constants.js'), + import('../../../lib/distributedLock.js'), ]); if (!config?.billing?.meterMode) { @@ -33,38 +37,53 @@ if (!config?.billing?.meterMode) { process.exit(0); } +const LOCK_NAME = 'billing.extrasExpiration'; +const LOCK_TTL_MS = 5 * 60 * 1000; // 5 min + const startMs = Date.now(); logger.info('[cron.extrasExpiration] start'); +let lockHolder = null; try { await applyJitter(getCronJitterMaxMs()); await mongooseService.connect(); - const [{ default: BillingExtraService }, { default: BillingExtraBalanceRepository }] = - await Promise.all([ - import('../services/billing.extra.service.js'), - import('../repositories/billing.extraBalance.repository.js'), - ]); + lockHolder = `${process.env.HOSTNAME ?? 
'unknown'}:${randomUUID()}`; + const acquired = await acquireLock({ name: LOCK_NAME, ttlMs: LOCK_TTL_MS, holder: lockHolder }); + if (!acquired) { + logger.info({ cron: LOCK_NAME }, 'lock held by another pod, skipping'); + process.exitCode = 0; + } else { + try { + const [{ default: BillingExtraService }, { default: BillingExtraBalanceRepository }] = + await Promise.all([ + import('../services/billing.extra.service.js'), + import('../repositories/billing.extraBalance.repository.js'), + ]); + + const now = new Date(); + const orgIds = await BillingExtraBalanceRepository.findOrgsWithExpiringTopups(now); - const now = new Date(); - const orgIds = await BillingExtraBalanceRepository.findOrgsWithExpiringTopups(now); + let processed = 0; + let errors = 0; - let processed = 0; - let errors = 0; + for (const orgId of orgIds) { + try { + const added = await BillingExtraService.expireOldEntries(orgId); + logger.info('[cron.extrasExpiration] org processed', { orgId: String(orgId), entriesAdded: added }); + processed += 1; + } catch (err) { + errors += 1; + logger.error('[cron.extrasExpiration] expireOldEntries failed', { orgId: String(orgId), err: err?.message, stack: err?.stack }); + } + } - for (const orgId of orgIds) { - try { - const added = await BillingExtraService.expireOldEntries(orgId); - logger.info('[cron.extrasExpiration] org processed', { orgId: String(orgId), entriesAdded: added }); - processed += 1; - } catch (err) { - errors += 1; - logger.error('[cron.extrasExpiration] expireOldEntries failed', { orgId: String(orgId), err: err?.message, stack: err?.stack }); + logger.info('[cron.extrasExpiration] complete', { processed, errors, durationMs: Date.now() - startMs }); + process.exitCode = errors > 0 ? 1 : 0; + } finally { + await releaseLock({ name: LOCK_NAME, holder: lockHolder }); } } - - logger.info('[cron.extrasExpiration] complete', { processed, errors, durationMs: Date.now() - startMs }); - process.exitCode = errors > 0 ? 
1 : 0; } catch (err) { logger.error('[cron.extrasExpiration] failed', { err: err?.message, stack: err?.stack }); process.exitCode = 1; diff --git a/modules/billing/crons/billing.weeklyReset.js b/modules/billing/crons/billing.weeklyReset.js index ed77ebbe6..d73f7266a 100644 --- a/modules/billing/crons/billing.weeklyReset.js +++ b/modules/billing/crons/billing.weeklyReset.js @@ -11,6 +11,8 @@ * NODE_ENV=production node modules/billing/crons/billing.weeklyReset.js */ +import { randomUUID } from 'node:crypto'; + process.env.NODE_ENV = process.env.NODE_ENV || 'development'; const [ @@ -19,12 +21,14 @@ const [ { default: logger }, { applyJitter }, { getCronJitterMaxMs }, + { acquireLock, releaseLock }, ] = await Promise.all([ import('../../../config/index.js'), import('../../../lib/services/mongoose.js'), import('../../../lib/services/logger.js'), import('../lib/billing.cron-utils.js'), import('../lib/billing.constants.js'), + import('../../../lib/distributedLock.js'), ]); if (!config?.billing?.meterMode) { @@ -32,19 +36,34 @@ if (!config?.billing?.meterMode) { process.exit(0); } +const LOCK_NAME = 'billing.weeklyReset'; +const LOCK_TTL_MS = 10 * 60 * 1000; // 10 min + const startMs = Date.now(); logger.info('[cron.weeklyReset] start'); +let lockHolder = null; try { await applyJitter(getCronJitterMaxMs()); await mongooseService.loadModels(); await mongooseService.connect(); - const { default: BillingResetService } = await import('../services/billing.reset.service.js'); + lockHolder = `${process.env.HOSTNAME ?? 
'unknown'}:${randomUUID()}`; + const acquired = await acquireLock({ name: LOCK_NAME, ttlMs: LOCK_TTL_MS, holder: lockHolder }); + if (!acquired) { + logger.info({ cron: LOCK_NAME }, 'lock held by another pod, skipping'); + process.exitCode = 0; + } else { + try { + const { default: BillingResetService } = await import('../services/billing.reset.service.js'); - const result = await BillingResetService.resetAllDue(); - logger.info('[cron.weeklyReset] complete', { processed: result.processed, errors: result.errors, durationMs: Date.now() - startMs }); - process.exitCode = result.errors > 0 ? 1 : 0; + const result = await BillingResetService.resetAllDue(); + logger.info('[cron.weeklyReset] complete', { processed: result.processed, errors: result.errors, durationMs: Date.now() - startMs }); + process.exitCode = result.errors > 0 ? 1 : 0; + } finally { + await releaseLock({ name: LOCK_NAME, holder: lockHolder }); + } + } } catch (err) { logger.error('[cron.weeklyReset] failed', { err: err?.message, stack: err?.stack }); process.exitCode = 1; diff --git a/modules/billing/tests/billing.cron-lock.integration.tests.js b/modules/billing/tests/billing.cron-lock.integration.tests.js new file mode 100644 index 000000000..e6682939c --- /dev/null +++ b/modules/billing/tests/billing.cron-lock.integration.tests.js @@ -0,0 +1,100 @@ +/** + * Module dependencies. + */ +import mongoose from 'mongoose'; +import { describe, beforeAll, beforeEach, afterAll, test, expect } from '@jest/globals'; + +import mongooseService from '../../../lib/services/mongoose.js'; +import { acquireLock, releaseLock, CronLock } from '../../../lib/distributedLock.js'; + +/** + * Integration tests for lib/distributedLock.js — real MongoDB. + * + * Verifies the acquire / release / contention / expiry contract end-to-end. + * The cron scripts themselves are top-level-await CLI entry points that cannot + * be imported in tests; the lock primitive they delegate to is tested here. 
+ */ +describe('distributedLock integration — acquire / release contract:', () => { + beforeAll(async () => { + await mongooseService.loadModels(); + await mongooseService.connect(); + }); + + beforeEach(async () => { + await CronLock.deleteMany({}); + }); + + afterAll(async () => { + await CronLock.deleteMany({}); + await mongooseService.disconnect(); + }); + + test('acquires lock when collection is empty', async () => { + const ok = await acquireLock({ name: 'billing.weeklyReset', ttlMs: 60_000, holder: 'pod-1' }); + expect(ok).toBe(true); + + const doc = await CronLock.findById('billing.weeklyReset'); + expect(doc).not.toBeNull(); + expect(doc.holder).toBe('pod-1'); + }); + + test('rejects acquire when lock is held by another holder', async () => { + await acquireLock({ name: 'billing.dunningSweep', ttlMs: 60_000, holder: 'pod-1' }); + const ok = await acquireLock({ name: 'billing.dunningSweep', ttlMs: 60_000, holder: 'pod-2' }); + expect(ok).toBe(false); + }); + + test('allows acquire when previous lock has expired (ttlMs < 0)', async () => { + // Seed an already-expired lock: lockedUntil in the past + await CronLock.create({ + _id: 'billing.extrasExpiration', + lockedAt: new Date(Date.now() - 10_000), + lockedUntil: new Date(Date.now() - 5_000), + holder: 'old-pod', + }); + + const ok = await acquireLock({ name: 'billing.extrasExpiration', ttlMs: 60_000, holder: 'pod-2' }); + expect(ok).toBe(true); + }); + + test('releaseLock removes doc only when holder matches', async () => { + await acquireLock({ name: 'billing.weeklyReset', ttlMs: 60_000, holder: 'pod-1' }); + + // Wrong holder — lock should remain + await releaseLock({ name: 'billing.weeklyReset', holder: 'pod-2' }); + const stillHeld = await CronLock.findById('billing.weeklyReset'); + expect(stillHeld).not.toBeNull(); + expect(stillHeld.holder).toBe('pod-1'); + + // Correct holder — lock released + await releaseLock({ name: 'billing.weeklyReset', holder: 'pod-1' }); + const gone = await 
CronLock.findById('billing.weeklyReset'); + expect(gone).toBeNull(); + }); + + test('concurrent acquires: only one pod wins (Promise.all race)', async () => { + const results = await Promise.all([ + acquireLock({ name: 'billing.dunningSweep', ttlMs: 60_000, holder: 'pod-A' }), + acquireLock({ name: 'billing.dunningSweep', ttlMs: 60_000, holder: 'pod-B' }), + acquireLock({ name: 'billing.dunningSweep', ttlMs: 60_000, holder: 'pod-C' }), + ]); + + const wins = results.filter(Boolean); + expect(wins).toHaveLength(1); + + const doc = await CronLock.findById('billing.dunningSweep'); + expect(doc).not.toBeNull(); + // The winning holder is one of the three + expect(['pod-A', 'pod-B', 'pod-C']).toContain(doc.holder); + }); + + test('acquire sets correct TTL window', async () => { + const before = Date.now(); + await acquireLock({ name: 'billing.extrasExpiration', ttlMs: 5 * 60 * 1000, holder: 'pod-1' }); + const after = Date.now(); + + const doc = await CronLock.findById('billing.extrasExpiration'); + expect(doc.lockedUntil.getTime()).toBeGreaterThanOrEqual(before + 5 * 60 * 1000); + expect(doc.lockedUntil.getTime()).toBeLessThanOrEqual(after + 5 * 60 * 1000); + }); +}); From 218c329d0552ee195ee7563049a3df5e411c8ea2 Mon Sep 17 00:00:00 2001 From: Pierre Brisorgueil Date: Thu, 21 May 2026 21:14:10 +0200 Subject: [PATCH 3/8] docs(billing): document distributed cron lock + runbook entry --- modules/billing/RUNBOOKS.md | 33 +++++++++++++++++++++++++++++++++ modules/billing/crons/README.md | 19 +++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/modules/billing/RUNBOOKS.md b/modules/billing/RUNBOOKS.md index 157ef0568..77a4ac158 100644 --- a/modules/billing/RUNBOOKS.md +++ b/modules/billing/RUNBOOKS.md @@ -175,3 +175,36 @@ Operational runbooks for the billing module. Each runbook references real endpoi 4. 
Monitor `billing.plans.stale` event frequency — if the stale cache is 24h+, alert the on-call to decide whether to take the plans endpoint down entirely or serve a static fallback. 5. Once Stripe recovers: `POST /api/admin/billing/sync/:orgId` on any org that attempted a subscription change during the outage. 6. Check dead-letter queue for events that exhausted retries during the outage window: `GET /api/admin/billing/dead-letters`. + +--- + +## 6 — Cron lock stuck + +**Symptom:** All billing crons emit `lock held by another pod, skipping` for longer than the lock TTL duration, meaning no billing cron is running at all. + +**Cause:** A pod crashed mid-job without reaching the `finally` block that calls `releaseLock`. The TTL has not yet expired on the stale lock doc in `cron_locks`. + +**Wait first:** Lock TTLs are sized 2–3× typical exec time. Wait for the TTL to expire (max 15 min for `dunningSweep`). `acquireLock` treats any lock whose `lockedUntil` is in the past as free, so the next run takes over as soon as the TTL elapses — even if MongoDB's TTL monitor (which runs every 60 seconds) has not yet deleted the stale doc. + +**If urgent — drop the stale lock manually:** + +```js +// weeklyReset +db.cron_locks.deleteOne({ _id: "billing.weeklyReset" }) + +// dunningSweep +db.cron_locks.deleteOne({ _id: "billing.dunningSweep" }) + +// extrasExpiration +db.cron_locks.deleteOne({ _id: "billing.extrasExpiration" }) +``` + +Or via `kubectl exec` on the mongo pod: + +```bash +kubectl exec -n pierreb-projects mongo-0 -- mongosh \ + "mongodb://localhost:27017/<database>" \ + --eval 'db.cron_locks.deleteOne({ _id: "billing.weeklyReset" })' +``` + +**Prevention:** Lock TTLs are intentionally conservative. If you see frequent stuck-lock incidents, investigate cron duration (slow query? tenant scale?) rather than lower the TTL — a TTL too short defeats the mutual-exclusion guarantee. 
diff --git a/modules/billing/crons/README.md b/modules/billing/crons/README.md index 23c675294..3321937a9 100644 --- a/modules/billing/crons/README.md +++ b/modules/billing/crons/README.md @@ -118,3 +118,22 @@ const orgs = allOrgs.filter(o => { ## Dependency: meterMode flag All scripts check `config.billing.meterMode` at startup. Downstream projects must set this flag to `true` in their project config to activate billing crons. The devkit default is `false` — all crons are no-ops until explicitly enabled. + +## Concurrency control + +All billing crons acquire a distributed lock (`lib/distributedLock.js`) before +mutating state. The lock auto-expires after its TTL (5–15 min depending on cron) +so that pod crashes don't permanently block scheduling. + +Lock names and TTLs: + +| Lock name | TTL | Cron | +|-----------|-----|------| +| `billing.weeklyReset` | 10 min | `billing.weeklyReset.js` | +| `billing.dunningSweep` | 15 min | `billing.dunningSweep.js` | +| `billing.extrasExpiration` | 5 min | `billing.extrasExpiration.js` | + +If you see `lock held by another pod, skipping` in logs, that is expected when +two pods race after a K8s `concurrencyPolicy` bypass (e.g. pod crash after +jitter but before finalize). See the runbook entry `## 6 — Cron lock stuck` in +`modules/billing/RUNBOOKS.md` for manual resolution. 
From 6aaeb21b7d404cdfc7cbeec56e4e9df7b9717dca Mon Sep 17 00:00:00 2001 From: Pierre Brisorgueil Date: Thu, 21 May 2026 21:15:13 +0200 Subject: [PATCH 4/8] fix(tests): remove unused vars caught by lint (distributedLock) --- lib/services/tests/distributedLock.unit.tests.js | 3 +-- modules/billing/tests/billing.cron-lock.integration.tests.js | 1 - 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/services/tests/distributedLock.unit.tests.js b/lib/services/tests/distributedLock.unit.tests.js index 0d589cc2d..36c73289d 100644 --- a/lib/services/tests/distributedLock.unit.tests.js +++ b/lib/services/tests/distributedLock.unit.tests.js @@ -11,7 +11,6 @@ import { jest, describe, test, beforeEach, afterEach, expect } from '@jest/globa */ describe('distributedLock — acquireLock:', () => { let acquireLock; - let releaseLock; let mockFindOneAndUpdate; let mockDeleteOne; @@ -37,7 +36,7 @@ describe('distributedLock — acquireLock:', () => { }, })); - ({ acquireLock, releaseLock } = await import('../../distributedLock.js')); + ({ acquireLock } = await import('../../distributedLock.js')); }); afterEach(() => { diff --git a/modules/billing/tests/billing.cron-lock.integration.tests.js b/modules/billing/tests/billing.cron-lock.integration.tests.js index e6682939c..d5923bf59 100644 --- a/modules/billing/tests/billing.cron-lock.integration.tests.js +++ b/modules/billing/tests/billing.cron-lock.integration.tests.js @@ -1,7 +1,6 @@ /** * Module dependencies. 
*/ -import mongoose from 'mongoose'; import { describe, beforeAll, beforeEach, afterAll, test, expect } from '@jest/globals'; import mongooseService from '../../../lib/services/mongoose.js'; From d3264b8b8ccc641c6213ea3a4e235be5033db2b2 Mon Sep 17 00:00:00 2001 From: Pierre Brisorgueil Date: Thu, 21 May 2026 21:28:52 +0200 Subject: [PATCH 5/8] fix(billing): address code-quality findings on cron lock - Move distributedLock unit test to canonical lib location (or document deviation) - Add releaseLock throw-path test + non-fatal comment in 3 crons' finally - Harmonize log style in skip-on-contention path - Rename + document integration test (in-process concurrency clarification) - Add findOne verification step in RUNBOOKS before deleteOne Addresses code-quality review I1, I2, M1, M3, M4. --- lib/{services => }/tests/distributedLock.unit.tests.js | 10 ++++++++-- modules/billing/RUNBOOKS.md | 10 ++++++++++ modules/billing/crons/billing.dunningSweep.js | 6 +++++- modules/billing/crons/billing.extrasExpiration.js | 6 +++++- modules/billing/crons/billing.weeklyReset.js | 6 +++++- .../tests/billing.cron-lock.integration.tests.js | 4 +++- 6 files changed, 36 insertions(+), 6 deletions(-) rename lib/{services => }/tests/distributedLock.unit.tests.js (90%) diff --git a/lib/services/tests/distributedLock.unit.tests.js b/lib/tests/distributedLock.unit.tests.js similarity index 90% rename from lib/services/tests/distributedLock.unit.tests.js rename to lib/tests/distributedLock.unit.tests.js index 36c73289d..381f4f6c2 100644 --- a/lib/services/tests/distributedLock.unit.tests.js +++ b/lib/tests/distributedLock.unit.tests.js @@ -36,7 +36,7 @@ describe('distributedLock — acquireLock:', () => { }, })); - ({ acquireLock } = await import('../../distributedLock.js')); + ({ acquireLock } = await import('../distributedLock.js')); }); afterEach(() => { @@ -121,7 +121,7 @@ describe('distributedLock — releaseLock:', () => { }, })); - ({ releaseLock } = await 
import('../../distributedLock.js')); + ({ releaseLock } = await import('../distributedLock.js')); }); afterEach(() => { @@ -137,4 +137,10 @@ describe('distributedLock — releaseLock:', () => { test('does not throw when deleteOne resolves', async () => { await expect(releaseLock({ name: 'job-b', holder: 'pod-2' })).resolves.toBeUndefined(); }); + + test('propagates deleteOne errors to the caller', async () => { + const dbErr = new Error('network timeout'); + mockDeleteOne.mockRejectedValue(dbErr); + await expect(releaseLock({ name: 'job-c', holder: 'pod-1' })).rejects.toThrow('network timeout'); + }); }); diff --git a/modules/billing/RUNBOOKS.md b/modules/billing/RUNBOOKS.md index 77a4ac158..e932319c5 100644 --- a/modules/billing/RUNBOOKS.md +++ b/modules/billing/RUNBOOKS.md @@ -188,6 +188,16 @@ Operational runbooks for the billing module. Each runbook references real endpoi **If urgent — drop the stale lock manually:** +**Before drop:** verify the holder and TTL window first to avoid kicking a running cron. + +```js +db.cron_locks.findOne({ _id: "billing.weeklyReset" }) +// If lockedUntil is in the past → safe to drop. +// If in the future → the lock is genuinely held; wait for TTL unless the holder pod is confirmed dead. +``` + +Then drop: + ```js // weeklyReset db.cron_locks.deleteOne({ _id: "billing.weeklyReset" }) diff --git a/modules/billing/crons/billing.dunningSweep.js b/modules/billing/crons/billing.dunningSweep.js index 008ecd8d6..4a6c04815 100644 --- a/modules/billing/crons/billing.dunningSweep.js +++ b/modules/billing/crons/billing.dunningSweep.js @@ -56,7 +56,7 @@ try { lockHolder = `${process.env.HOSTNAME ?? 
'unknown'}:${randomUUID()}`; const acquired = await acquireLock({ name: LOCK_NAME, ttlMs: LOCK_TTL_MS, holder: lockHolder }); if (!acquired) { - logger.info({ cron: LOCK_NAME }, 'lock held by another pod, skipping'); + logger.info('[cron.dunningSweep] lock held by another pod, skipping'); process.exitCode = 0; } else { try { @@ -111,6 +111,10 @@ try { logger.info('[cron.dunningSweep] complete', { processed, errors, desyncErrors, durationMs: Date.now() - startMs }); process.exitCode = errors > 0 ? 1 : 0; } finally { + // releaseLock failure is non-fatal: lock auto-expires on TTL. + // If this throws, the outer catch logs "failed" and sets exitCode=1 + // (misleading — the cron's actual work succeeded). Operators can grep + // for "failed to release lock" to distinguish. await releaseLock({ name: LOCK_NAME, holder: lockHolder }); } } diff --git a/modules/billing/crons/billing.extrasExpiration.js b/modules/billing/crons/billing.extrasExpiration.js index b6a7be035..49fba7b2b 100644 --- a/modules/billing/crons/billing.extrasExpiration.js +++ b/modules/billing/crons/billing.extrasExpiration.js @@ -51,7 +51,7 @@ try { lockHolder = `${process.env.HOSTNAME ?? 'unknown'}:${randomUUID()}`; const acquired = await acquireLock({ name: LOCK_NAME, ttlMs: LOCK_TTL_MS, holder: lockHolder }); if (!acquired) { - logger.info({ cron: LOCK_NAME }, 'lock held by another pod, skipping'); + logger.info('[cron.extrasExpiration] lock held by another pod, skipping'); process.exitCode = 0; } else { try { @@ -81,6 +81,10 @@ try { logger.info('[cron.extrasExpiration] complete', { processed, errors, durationMs: Date.now() - startMs }); process.exitCode = errors > 0 ? 1 : 0; } finally { + // releaseLock failure is non-fatal: lock auto-expires on TTL. + // If this throws, the outer catch logs "failed" and sets exitCode=1 + // (misleading — the cron's actual work succeeded). Operators can grep + // for "failed to release lock" to distinguish. 
await releaseLock({ name: LOCK_NAME, holder: lockHolder }); } } diff --git a/modules/billing/crons/billing.weeklyReset.js b/modules/billing/crons/billing.weeklyReset.js index d73f7266a..29b3e76b2 100644 --- a/modules/billing/crons/billing.weeklyReset.js +++ b/modules/billing/crons/billing.weeklyReset.js @@ -51,7 +51,7 @@ try { lockHolder = `${process.env.HOSTNAME ?? 'unknown'}:${randomUUID()}`; const acquired = await acquireLock({ name: LOCK_NAME, ttlMs: LOCK_TTL_MS, holder: lockHolder }); if (!acquired) { - logger.info({ cron: LOCK_NAME }, 'lock held by another pod, skipping'); + logger.info('[cron.weeklyReset] lock held by another pod, skipping'); process.exitCode = 0; } else { try { @@ -61,6 +61,10 @@ try { logger.info('[cron.weeklyReset] complete', { processed: result.processed, errors: result.errors, durationMs: Date.now() - startMs }); process.exitCode = result.errors > 0 ? 1 : 0; } finally { + // releaseLock failure is non-fatal: lock auto-expires on TTL. + // If this throws, the outer catch logs "failed" and sets exitCode=1 + // (misleading — the cron's actual work succeeded). Operators can grep + // for "failed to release lock" to distinguish. await releaseLock({ name: LOCK_NAME, holder: lockHolder }); } } diff --git a/modules/billing/tests/billing.cron-lock.integration.tests.js b/modules/billing/tests/billing.cron-lock.integration.tests.js index d5923bf59..92a20359b 100644 --- a/modules/billing/tests/billing.cron-lock.integration.tests.js +++ b/modules/billing/tests/billing.cron-lock.integration.tests.js @@ -71,7 +71,9 @@ describe('distributedLock integration — acquire / release contract:', () => { expect(gone).toBeNull(); }); - test('concurrent acquires: only one pod wins (Promise.all race)', async () => { + test('only one of N concurrent in-process acquires wins via E11000', async () => { + // Node event loop: 3 Promise.all fires hit Mongo concurrently (not serialized by await). 
+ // Real inter-pod race uses the same E11000 path; this exercises the same mechanism. const results = await Promise.all([ acquireLock({ name: 'billing.dunningSweep', ttlMs: 60_000, holder: 'pod-A' }), acquireLock({ name: 'billing.dunningSweep', ttlMs: 60_000, holder: 'pod-B' }), From b971b2cc37c16ba66c5fce1d8f7aaf8ad23328cc Mon Sep 17 00:00:00 2001 From: Pierre Brisorgueil Date: Thu, 21 May 2026 21:52:12 +0200 Subject: [PATCH 6/8] fix(billing): address DeepSeek gate findings on cron lock MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - HIGH: restore (errors || desyncErrors) condition in dunningSweep exit code (regression introduced when wrapping with lock — would make K8s alerts silent) - MEDIUM: wrap releaseLock in try/catch within finally to preserve original work error if both throw - LOW: guard acquireLock against invalid ttlMs (must be positive finite) - NIT: drop redundant setDefaultsOnInsert (all fields explicitly $set) Addresses gate iteration 1 BLOCK. --- lib/distributedLock.js | 5 ++++- lib/tests/distributedLock.unit.tests.js | 14 ++++++++++++++ modules/billing/crons/billing.dunningSweep.js | 15 ++++++++++----- modules/billing/crons/billing.extrasExpiration.js | 13 +++++++++---- modules/billing/crons/billing.weeklyReset.js | 13 +++++++++---- 5 files changed, 46 insertions(+), 14 deletions(-) diff --git a/lib/distributedLock.js b/lib/distributedLock.js index 07495e5a4..ba929d854 100644 --- a/lib/distributedLock.js +++ b/lib/distributedLock.js @@ -50,13 +50,16 @@ export const CronLock = mongoose.models.CronLock ?? 
mongoose.model('CronLock', L * @returns {Promise} */ export async function acquireLock({ name, ttlMs, holder }) { + if (!Number.isFinite(ttlMs) || ttlMs <= 0) { + throw new Error(`acquireLock: ttlMs must be a positive number, received ${ttlMs}`); + } const now = new Date(); const lockedUntil = new Date(now.getTime() + ttlMs); try { const result = await CronLock.findOneAndUpdate( { _id: name, lockedUntil: { $lt: now } }, { $set: { lockedAt: now, lockedUntil, holder } }, - { upsert: true, returnDocument: 'after', setDefaultsOnInsert: true }, + { upsert: true, returnDocument: 'after' }, ); return result?.holder === holder; } catch (err) { diff --git a/lib/tests/distributedLock.unit.tests.js b/lib/tests/distributedLock.unit.tests.js index 381f4f6c2..0f2c959ea 100644 --- a/lib/tests/distributedLock.unit.tests.js +++ b/lib/tests/distributedLock.unit.tests.js @@ -83,6 +83,20 @@ describe('distributedLock — acquireLock:', () => { await expect(acquireLock({ name: 'job-d', ttlMs: 60_000, holder: 'pod-1' })).rejects.toThrow('network timeout'); }); + test.each([ + [0, 'zero'], + [-1, 'negative'], + [Number.NaN, 'NaN'], + [Infinity, 'Infinity'], + [undefined, 'undefined'], + [null, 'null'], + ])('throws when ttlMs is %s (%s)', async (ttlMs) => { + await expect(acquireLock({ name: 'job-guard', ttlMs, holder: 'pod-1' })).rejects.toThrow( + 'acquireLock: ttlMs must be a positive number', + ); + expect(mockFindOneAndUpdate).not.toHaveBeenCalled(); + }); + test('lockedUntil is set to now + ttlMs', async () => { const before = Date.now(); mockFindOneAndUpdate.mockResolvedValue({ holder: 'pod-1' }); diff --git a/modules/billing/crons/billing.dunningSweep.js b/modules/billing/crons/billing.dunningSweep.js index 4a6c04815..41e11dc05 100644 --- a/modules/billing/crons/billing.dunningSweep.js +++ b/modules/billing/crons/billing.dunningSweep.js @@ -109,13 +109,18 @@ try { } logger.info('[cron.dunningSweep] complete', { processed, errors, desyncErrors, durationMs: Date.now() - startMs }); - 
process.exitCode = errors > 0 ? 1 : 0; + process.exitCode = errors > 0 || desyncErrors > 0 ? 1 : 0; } finally { // releaseLock failure is non-fatal: lock auto-expires on TTL. - // If this throws, the outer catch logs "failed" and sets exitCode=1 - // (misleading — the cron's actual work succeeded). Operators can grep - // for "failed to release lock" to distinguish. - await releaseLock({ name: LOCK_NAME, holder: lockHolder }); + // Log separately to preserve any original work error. + try { + await releaseLock({ name: LOCK_NAME, holder: lockHolder }); + } catch (releaseErr) { + logger.error( + { err: releaseErr, cron: LOCK_NAME }, + '[cron.dunningSweep] failed to release lock — will auto-expire on TTL', + ); + } } } } catch (err) { diff --git a/modules/billing/crons/billing.extrasExpiration.js b/modules/billing/crons/billing.extrasExpiration.js index 49fba7b2b..0f8530908 100644 --- a/modules/billing/crons/billing.extrasExpiration.js +++ b/modules/billing/crons/billing.extrasExpiration.js @@ -82,10 +82,15 @@ try { process.exitCode = errors > 0 ? 1 : 0; } finally { // releaseLock failure is non-fatal: lock auto-expires on TTL. - // If this throws, the outer catch logs "failed" and sets exitCode=1 - // (misleading — the cron's actual work succeeded). Operators can grep - // for "failed to release lock" to distinguish. - await releaseLock({ name: LOCK_NAME, holder: lockHolder }); + // Log separately to preserve any original work error. 
+ try { + await releaseLock({ name: LOCK_NAME, holder: lockHolder }); + } catch (releaseErr) { + logger.error( + { err: releaseErr, cron: LOCK_NAME }, + '[cron.extrasExpiration] failed to release lock — will auto-expire on TTL', + ); + } } } } catch (err) { diff --git a/modules/billing/crons/billing.weeklyReset.js b/modules/billing/crons/billing.weeklyReset.js index 29b3e76b2..2dfea67b5 100644 --- a/modules/billing/crons/billing.weeklyReset.js +++ b/modules/billing/crons/billing.weeklyReset.js @@ -62,10 +62,15 @@ try { process.exitCode = result.errors > 0 ? 1 : 0; } finally { // releaseLock failure is non-fatal: lock auto-expires on TTL. - // If this throws, the outer catch logs "failed" and sets exitCode=1 - // (misleading — the cron's actual work succeeded). Operators can grep - // for "failed to release lock" to distinguish. - await releaseLock({ name: LOCK_NAME, holder: lockHolder }); + // Log separately to preserve any original work error. + try { + await releaseLock({ name: LOCK_NAME, holder: lockHolder }); + } catch (releaseErr) { + logger.error( + { err: releaseErr, cron: LOCK_NAME }, + '[cron.weeklyReset] failed to release lock — will auto-expire on TTL', + ); + } } } } catch (err) { From 50a7a80a67aa86657fef3e2c0af78f24b1afd9a0 Mon Sep 17 00:00:00 2001 From: Pierre Brisorgueil Date: Thu, 21 May 2026 22:11:01 +0200 Subject: [PATCH 7/8] fix(billing): address CodeRabbit findings on PR #3688 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Move lib/distributedLock.js → lib/services/distributedLock.js (aligns with existing convention for service-level primitives) - Move lib/tests/distributedLock.unit.tests.js → lib/services/tests/ - Update import paths in 3 billing crons + integration tests - Flip logger.error arg order in dunningSweep + extrasExpiration finally (match file convention: (message, meta) not (meta, message)) - Same fix applied to weeklyReset (same pattern found, not flagged by CR) - Update 
modules/billing/crons/README.md → reference '## 6 — Cron lock stuck' (exact match with RUNBOOKS section header) Heartbeat/renewLock suggestion intentionally NOT added — see PR reply. --- lib/{ => services}/distributedLock.js | 0 lib/{ => services}/tests/distributedLock.unit.tests.js | 2 +- modules/billing/crons/README.md | 4 ++-- modules/billing/crons/billing.dunningSweep.js | 10 +++++----- modules/billing/crons/billing.extrasExpiration.js | 10 +++++----- modules/billing/crons/billing.weeklyReset.js | 10 +++++----- .../tests/billing.cron-lock.integration.tests.js | 4 ++-- 7 files changed, 20 insertions(+), 20 deletions(-) rename lib/{ => services}/distributedLock.js (100%) rename lib/{ => services}/tests/distributedLock.unit.tests.js (98%) diff --git a/lib/distributedLock.js b/lib/services/distributedLock.js similarity index 100% rename from lib/distributedLock.js rename to lib/services/distributedLock.js diff --git a/lib/tests/distributedLock.unit.tests.js b/lib/services/tests/distributedLock.unit.tests.js similarity index 98% rename from lib/tests/distributedLock.unit.tests.js rename to lib/services/tests/distributedLock.unit.tests.js index 0f2c959ea..2d7766e0f 100644 --- a/lib/tests/distributedLock.unit.tests.js +++ b/lib/services/tests/distributedLock.unit.tests.js @@ -4,7 +4,7 @@ import { jest, describe, test, beforeEach, afterEach, expect } from '@jest/globals'; /** - * Unit tests for lib/distributedLock.js + * Unit tests for lib/services/distributedLock.js * * All Mongoose interactions are mocked — no real DB connection required. * Tests verify the acquire / release contract and contention handling. diff --git a/modules/billing/crons/README.md b/modules/billing/crons/README.md index 3321937a9..0e4153345 100644 --- a/modules/billing/crons/README.md +++ b/modules/billing/crons/README.md @@ -121,7 +121,7 @@ All scripts check `config.billing.meterMode` at startup. 
Downstream projects mus ## Concurrency control -All billing crons acquire a distributed lock (`lib/distributedLock.js`) before +All billing crons acquire a distributed lock (`lib/services/distributedLock.js`) before mutating state. The lock auto-expires after TTL (5–15 min depending on cron) so that pod crashes don't permanently block scheduling. @@ -135,5 +135,5 @@ Lock names and TTLs: If you see `lock held by another pod, skipping` in logs, that is expected when two pods race after a K8s `concurrencyPolicy` bypass (e.g. pod crash after -jitter but before finalize). See the runbook entry `## Cron lock stuck` in +jitter but before finalize). See the runbook entry `## 6 — Cron lock stuck` in `modules/billing/RUNBOOKS.md` for manual resolution. diff --git a/modules/billing/crons/billing.dunningSweep.js b/modules/billing/crons/billing.dunningSweep.js index 41e11dc05..bbf4986ee 100644 --- a/modules/billing/crons/billing.dunningSweep.js +++ b/modules/billing/crons/billing.dunningSweep.js @@ -34,7 +34,7 @@ const [ import('../../../lib/services/logger.js'), import('../lib/billing.cron-utils.js'), import('../lib/billing.constants.js'), - import('../../../lib/distributedLock.js'), + import('../../../lib/services/distributedLock.js'), ]); if (!config?.billing?.meterMode) { @@ -116,10 +116,10 @@ try { try { await releaseLock({ name: LOCK_NAME, holder: lockHolder }); } catch (releaseErr) { - logger.error( - { err: releaseErr, cron: LOCK_NAME }, - '[cron.dunningSweep] failed to release lock — will auto-expire on TTL', - ); + logger.error('[cron.dunningSweep] failed to release lock — will auto-expire on TTL', { + err: releaseErr, + cron: LOCK_NAME, + }); } } } diff --git a/modules/billing/crons/billing.extrasExpiration.js b/modules/billing/crons/billing.extrasExpiration.js index 0f8530908..68a197465 100644 --- a/modules/billing/crons/billing.extrasExpiration.js +++ b/modules/billing/crons/billing.extrasExpiration.js @@ -29,7 +29,7 @@ const [ 
import('../../../lib/services/logger.js'), import('../lib/billing.cron-utils.js'), import('../lib/billing.constants.js'), - import('../../../lib/distributedLock.js'), + import('../../../lib/services/distributedLock.js'), ]); if (!config?.billing?.meterMode) { @@ -86,10 +86,10 @@ try { try { await releaseLock({ name: LOCK_NAME, holder: lockHolder }); } catch (releaseErr) { - logger.error( - { err: releaseErr, cron: LOCK_NAME }, - '[cron.extrasExpiration] failed to release lock — will auto-expire on TTL', - ); + logger.error('[cron.extrasExpiration] failed to release lock — will auto-expire on TTL', { + err: releaseErr, + cron: LOCK_NAME, + }); } } } diff --git a/modules/billing/crons/billing.weeklyReset.js b/modules/billing/crons/billing.weeklyReset.js index 2dfea67b5..3c5ca7904 100644 --- a/modules/billing/crons/billing.weeklyReset.js +++ b/modules/billing/crons/billing.weeklyReset.js @@ -28,7 +28,7 @@ const [ import('../../../lib/services/logger.js'), import('../lib/billing.cron-utils.js'), import('../lib/billing.constants.js'), - import('../../../lib/distributedLock.js'), + import('../../../lib/services/distributedLock.js'), ]); if (!config?.billing?.meterMode) { @@ -66,10 +66,10 @@ try { try { await releaseLock({ name: LOCK_NAME, holder: lockHolder }); } catch (releaseErr) { - logger.error( - { err: releaseErr, cron: LOCK_NAME }, - '[cron.weeklyReset] failed to release lock — will auto-expire on TTL', - ); + logger.error('[cron.weeklyReset] failed to release lock — will auto-expire on TTL', { + err: releaseErr, + cron: LOCK_NAME, + }); } } } diff --git a/modules/billing/tests/billing.cron-lock.integration.tests.js b/modules/billing/tests/billing.cron-lock.integration.tests.js index 92a20359b..5e3f0c8e6 100644 --- a/modules/billing/tests/billing.cron-lock.integration.tests.js +++ b/modules/billing/tests/billing.cron-lock.integration.tests.js @@ -4,10 +4,10 @@ import { describe, beforeAll, beforeEach, afterAll, test, expect } from '@jest/globals'; import 
mongooseService from '../../../lib/services/mongoose.js'; -import { acquireLock, releaseLock, CronLock } from '../../../lib/distributedLock.js'; +import { acquireLock, releaseLock, CronLock } from '../../../lib/services/distributedLock.js'; /** - * Integration tests for lib/distributedLock.js — real MongoDB. + * Integration tests for lib/services/distributedLock.js — real MongoDB. * * Verifies the acquire / release / contention / expiry contract end-to-end. * The cron scripts themselves are top-level-await CLI entry points that cannot From b232b68549e5055da10d47abf0188eb51429aeda Mon Sep 17 00:00:00 2001 From: Pierre Brisorgueil Date: Fri, 22 May 2026 09:30:01 +0200 Subject: [PATCH 8/8] fix(billing): add loadModels to dunningSweep+extrasExpiration crons + clarify lock test name MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Resolves Copilot MissingSchemaError finding: dunningSweep and extrasExpiration called mongooseService.connect() without loadModels(), risking MissingSchemaError on repository top-level mongoose.model() calls. Matches weeklyReset/reconcile pattern. Also renames test 'ttlMs < 0' → 'existing lock has expired (lockedUntil < now)' to accurately describe the scenario being exercised. --- modules/billing/crons/billing.dunningSweep.js | 1 + modules/billing/crons/billing.extrasExpiration.js | 1 + modules/billing/tests/billing.cron-lock.integration.tests.js | 2 +- 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/modules/billing/crons/billing.dunningSweep.js b/modules/billing/crons/billing.dunningSweep.js index bbf4986ee..6ba06436c 100644 --- a/modules/billing/crons/billing.dunningSweep.js +++ b/modules/billing/crons/billing.dunningSweep.js @@ -51,6 +51,7 @@ logger.info('[cron.dunningSweep] start'); let lockHolder = null; try { await applyJitter(getCronJitterMaxMs()); + await mongooseService.loadModels(); await mongooseService.connect(); lockHolder = `${process.env.HOSTNAME ?? 
'unknown'}:${randomUUID()}`; diff --git a/modules/billing/crons/billing.extrasExpiration.js b/modules/billing/crons/billing.extrasExpiration.js index 68a197465..1dd565dd6 100644 --- a/modules/billing/crons/billing.extrasExpiration.js +++ b/modules/billing/crons/billing.extrasExpiration.js @@ -46,6 +46,7 @@ logger.info('[cron.extrasExpiration] start'); let lockHolder = null; try { await applyJitter(getCronJitterMaxMs()); + await mongooseService.loadModels(); await mongooseService.connect(); lockHolder = `${process.env.HOSTNAME ?? 'unknown'}:${randomUUID()}`; diff --git a/modules/billing/tests/billing.cron-lock.integration.tests.js b/modules/billing/tests/billing.cron-lock.integration.tests.js index 5e3f0c8e6..6593a6c7c 100644 --- a/modules/billing/tests/billing.cron-lock.integration.tests.js +++ b/modules/billing/tests/billing.cron-lock.integration.tests.js @@ -43,7 +43,7 @@ describe('distributedLock integration — acquire / release contract:', () => { expect(ok).toBe(false); }); - test('allows acquire when previous lock has expired (ttlMs < 0)', async () => { + test('allows acquire when the existing lock has expired (lockedUntil < now)', async () => { // Seed an already-expired lock: lockedUntil in the past await CronLock.create({ _id: 'billing.extrasExpiration',