diff --git a/lib/services/distributedLock.js b/lib/services/distributedLock.js new file mode 100644 index 000000000..ba929d854 --- /dev/null +++ b/lib/services/distributedLock.js @@ -0,0 +1,86 @@ +/** + * Distributed lock primitive for multi-pod cron jobs. + * + * Uses a MongoDB TTL collection (`cron_locks`) to ensure mutual exclusion + * across replicas. A lock document expires automatically after `ttlMs` + * milliseconds — pod crashes therefore never permanently block scheduling. + * + * Usage: + * const holder = `${process.env.HOSTNAME ?? 'unknown'}:${randomUUID()}` + * const acquired = await acquireLock({ name: 'billing.weeklyReset', ttlMs: 10 * 60 * 1000, holder }) + * if (!acquired) return // another pod holds the lock + * try { + * // ... work + * } finally { + * await releaseLock({ name: 'billing.weeklyReset', holder }) + * } + */ + +import mongoose from 'mongoose'; + +const LockSchema = new mongoose.Schema( + { + _id: { type: String, required: true }, + lockedAt: { type: Date, required: true }, + lockedUntil: { type: Date, required: true }, + holder: { type: String, required: true }, + }, + { collection: 'cron_locks', versionKey: false }, +); + +// MongoDB TTL index — auto-deletes expired docs so stale locks don't accumulate. +LockSchema.index({ lockedUntil: 1 }, { expireAfterSeconds: 0 }); + +export const CronLock = mongoose.models.CronLock ?? mongoose.model('CronLock', LockSchema); + +/** + * @function acquireLock + * @description Attempt to acquire a named lock. Returns true if acquired, + * false if the lock is currently held by another holder. + * + * Implementation: findOneAndUpdate with upsert on the condition that either + * no doc exists (_id absent) or the existing doc has expired (lockedUntil < now). + * Duplicate-key errors (E11000) from the unique _id index are caught and + * returned as false (another pod raced to acquire simultaneously). + * + * @param {object} opts + * @param {string} opts.name - Unique lock name (e.g. 'billing.weeklyReset') + * @param {number} opts.ttlMs - Lock duration in milliseconds + * @param {string} opts.holder - Unique identifier for the calling pod/process + * @returns {Promise} + */ +export async function acquireLock({ name, ttlMs, holder }) { + if (!Number.isFinite(ttlMs) || ttlMs <= 0) { + throw new Error(`acquireLock: ttlMs must be a positive number, received ${ttlMs}`); + } + const now = new Date(); + const lockedUntil = new Date(now.getTime() + ttlMs); + try { + const result = await CronLock.findOneAndUpdate( + { _id: name, lockedUntil: { $lt: now } }, + { $set: { lockedAt: now, lockedUntil, holder } }, + { upsert: true, returnDocument: 'after' }, + ); + return result?.holder === holder; + } catch (err) { + if (err.code === 11000) return false; + throw err; + } +} + +/** + * @function releaseLock + * @description Release a lock only if the caller is the current holder. + * No-op if the lock is held by a different holder (prevents accidental release + * after a TTL expiry + re-acquire by another pod). + * + * @param {object} opts + * @param {string} opts.name - Lock name to release + * @param {string} opts.holder - Must match the holder that acquired the lock + * @returns {Promise} + */ +export async function releaseLock({ name, holder }) { + await CronLock.deleteOne({ _id: name, holder }); +} + +export default { CronLock, acquireLock, releaseLock }; diff --git a/lib/services/tests/distributedLock.unit.tests.js b/lib/services/tests/distributedLock.unit.tests.js new file mode 100644 index 000000000..2d7766e0f --- /dev/null +++ b/lib/services/tests/distributedLock.unit.tests.js @@ -0,0 +1,160 @@ +/** + * Module dependencies. + */ +import { jest, describe, test, beforeEach, afterEach, expect } from '@jest/globals'; + +/** + * Unit tests for lib/services/distributedLock.js + * + * All Mongoose interactions are mocked — no real DB connection required. + * Tests verify the acquire / release contract and contention handling. + */ +describe('distributedLock — acquireLock:', () => { + let acquireLock; + let mockFindOneAndUpdate; + let mockDeleteOne; + + beforeEach(async () => { + jest.resetModules(); + + mockFindOneAndUpdate = jest.fn(); + mockDeleteOne = jest.fn(); + + const mockCronLock = { + findOneAndUpdate: mockFindOneAndUpdate, + deleteOne: mockDeleteOne, + }; + + jest.unstable_mockModule('mongoose', () => ({ + default: { + Schema: class MockSchema { + constructor() {} + index() {} + }, + models: {}, + model: jest.fn(() => mockCronLock), + }, + })); + + ({ acquireLock } = await import('../distributedLock.js')); + }); + + afterEach(() => { + jest.restoreAllMocks(); + }); + + test('returns true when findOneAndUpdate resolves with matching holder', async () => { + mockFindOneAndUpdate.mockResolvedValue({ holder: 'pod-1' }); + + const ok = await acquireLock({ name: 'job-a', ttlMs: 60_000, holder: 'pod-1' }); + + expect(ok).toBe(true); + expect(mockFindOneAndUpdate).toHaveBeenCalledTimes(1); + const [filter, update, opts] = mockFindOneAndUpdate.mock.calls[0]; + expect(filter._id).toBe('job-a'); + expect(filter.lockedUntil.$lt).toBeInstanceOf(Date); + expect(update.$set.holder).toBe('pod-1'); + expect(opts.upsert).toBe(true); + }); + + test('returns false when findOneAndUpdate returns doc held by different holder', async () => { + mockFindOneAndUpdate.mockResolvedValue({ holder: 'pod-1' }); + + const ok = await acquireLock({ name: 'job-b', ttlMs: 60_000, holder: 'pod-2' }); + + expect(ok).toBe(false); + }); + + test('returns false on E11000 duplicate-key (concurrent upsert race)', async () => { + const dupErr = new Error('E11000 duplicate key'); + dupErr.code = 11000; + mockFindOneAndUpdate.mockRejectedValue(dupErr); + + const ok = await acquireLock({ name: 'job-c', ttlMs: 60_000, holder: 'pod-1' }); + + expect(ok).toBe(false); + }); + + test('re-throws non-duplicate errors', async () => { + const dbErr = new Error('network timeout'); + dbErr.code = 13; + mockFindOneAndUpdate.mockRejectedValue(dbErr); + + await expect(acquireLock({ name: 'job-d', ttlMs: 60_000, holder: 'pod-1' })).rejects.toThrow('network timeout'); + }); + + test.each([ + [0, 'zero'], + [-1, 'negative'], + [Number.NaN, 'NaN'], + [Infinity, 'Infinity'], + [undefined, 'undefined'], + [null, 'null'], + ])('throws when ttlMs is %s (%s)', async (ttlMs) => { + await expect(acquireLock({ name: 'job-guard', ttlMs, holder: 'pod-1' })).rejects.toThrow( + 'acquireLock: ttlMs must be a positive number', + ); + expect(mockFindOneAndUpdate).not.toHaveBeenCalled(); + }); + + test('lockedUntil is set to now + ttlMs', async () => { + const before = Date.now(); + mockFindOneAndUpdate.mockResolvedValue({ holder: 'pod-1' }); + + await acquireLock({ name: 'job-e', ttlMs: 10_000, holder: 'pod-1' }); + + const after = Date.now(); + const { lockedUntil } = mockFindOneAndUpdate.mock.calls[0][1].$set; + expect(lockedUntil.getTime()).toBeGreaterThanOrEqual(before + 10_000); + expect(lockedUntil.getTime()).toBeLessThanOrEqual(after + 10_000); + }); +}); + +describe('distributedLock — releaseLock:', () => { + let releaseLock; + let mockDeleteOne; + + beforeEach(async () => { + jest.resetModules(); + + mockDeleteOne = jest.fn().mockResolvedValue({}); + + const mockCronLock = { + findOneAndUpdate: jest.fn(), + deleteOne: mockDeleteOne, + }; + + jest.unstable_mockModule('mongoose', () => ({ + default: { + Schema: class MockSchema { + constructor() {} + index() {} + }, + models: {}, + model: jest.fn(() => mockCronLock), + }, + })); + + ({ releaseLock } = await import('../distributedLock.js')); + }); + + afterEach(() => { + jest.restoreAllMocks(); + }); + + test('calls deleteOne with name and holder', async () => { + await releaseLock({ name: 'job-a', holder: 'pod-1' }); + + expect(mockDeleteOne).toHaveBeenCalledWith({ _id: 'job-a', holder: 'pod-1' }); + }); + + test('does not throw when deleteOne resolves', async () => { + await expect(releaseLock({ name: 'job-b', holder: 'pod-2' })).resolves.toBeUndefined(); + }); + + test('propagates deleteOne errors to the caller', async () => { + const dbErr = new Error('network timeout'); + mockDeleteOne.mockRejectedValue(dbErr); + await expect(releaseLock({ name: 'job-c', holder: 'pod-1' })).rejects.toThrow('network timeout'); + }); +}); diff --git a/modules/billing/RUNBOOKS.md b/modules/billing/RUNBOOKS.md index 157ef0568..e932319c5 100644 --- a/modules/billing/RUNBOOKS.md +++ b/modules/billing/RUNBOOKS.md @@ -175,3 +175,46 @@ Operational runbooks for the billing module. Each runbook references real endpoi 4. Monitor `billing.plans.stale` event frequency — if the stale cache is 24h+, alert the on-call to decide whether to take the plans endpoint down entirely or serve a static fallback. 5. Once Stripe recovers: `POST /api/admin/billing/sync/:orgId` on any org that attempted a subscription change during the outage. 6. Check dead-letter queue for events that exhausted retries during the outage window: `GET /api/admin/billing/dead-letters`. + +--- + +## 6 — Cron lock stuck + +**Symptom:** All billing crons emit `lock held by another pod, skipping` for longer than the lock TTL duration, meaning no billing cron is running at all. + +**Cause:** A pod crashed mid-job without reaching the `finally` block that calls `releaseLock`. The TTL has not yet expired on the stale lock doc in `cron_locks`. + +**Wait first:** Lock TTLs are sized 2–3× typical exec time. Wait for the TTL to expire (max 15 min for `dunningSweep`). MongoDB's TTL monitor runs every 60 seconds, so actual cleanup may lag up to 60 s after expiry. + +**If urgent — drop the stale lock manually:** + +**Before drop:** verify the holder and TTL window first to avoid kicking a running cron. + +```js +db.cron_locks.findOne({ _id: "billing.weeklyReset" }) +// If lockedUntil is in the past → safe to drop. +// If in the future → the lock is genuinely held; wait for TTL unless the holder pod is confirmed dead. +``` + +Then drop: + +```js +// weeklyReset +db.cron_locks.deleteOne({ _id: "billing.weeklyReset" }) + +// dunningSweep +db.cron_locks.deleteOne({ _id: "billing.dunningSweep" }) + +// extrasExpiration +db.cron_locks.deleteOne({ _id: "billing.extrasExpiration" }) +``` + +Or via `kubectl exec` on the mongo pod: + +```bash +kubectl exec -n pierreb-projects mongo-0 -- mongosh \ + "mongodb://localhost:27017/" \ + --eval 'db.cron_locks.deleteOne({ _id: "billing.weeklyReset" })' +``` + +**Prevention:** Lock TTLs are intentionally conservative. If you see frequent stuck-lock incidents, investigate cron duration (slow query? tenant scale?) rather than lower the TTL — a TTL too short defeats the mutual-exclusion guarantee. diff --git a/modules/billing/crons/README.md b/modules/billing/crons/README.md index 23c675294..0e4153345 100644 --- a/modules/billing/crons/README.md +++ b/modules/billing/crons/README.md @@ -118,3 +118,22 @@ const orgs = allOrgs.filter(o => { ## Dependency: meterMode flag All scripts check `config.billing.meterMode` at startup. Downstream projects must set this flag to `true` in their project config to activate billing crons. The devkit default is `false` — all crons are no-ops until explicitly enabled. + +## Concurrency control + +All billing crons acquire a distributed lock (`lib/services/distributedLock.js`) before +mutating state. The lock auto-expires after TTL (5–15 min depending on cron) +so that pod crashes don't permanently block scheduling. + +Lock names and TTLs: + +| Lock name | TTL | Cron | +|-----------|-----|------| +| `billing.weeklyReset` | 10 min | `billing.weeklyReset.js` | +| `billing.dunningSweep` | 15 min | `billing.dunningSweep.js` | +| `billing.extrasExpiration` | 5 min | `billing.extrasExpiration.js` | + +If you see `lock held by another pod, skipping` in logs, that is expected when +two pods race after a K8s `concurrencyPolicy` bypass (e.g. pod crash after +jitter but before finalize). See the runbook entry `## 6 — Cron lock stuck` in +`modules/billing/RUNBOOKS.md` for manual resolution. diff --git a/modules/billing/crons/billing.dunningSweep.js b/modules/billing/crons/billing.dunningSweep.js index 6fb46f18a..6ba06436c 100644 --- a/modules/billing/crons/billing.dunningSweep.js +++ b/modules/billing/crons/billing.dunningSweep.js @@ -17,6 +17,8 @@ * NODE_ENV=production node modules/billing/crons/billing.dunningSweep.js */ +import { randomUUID } from 'node:crypto'; + process.env.NODE_ENV = process.env.NODE_ENV || 'development'; const [ @@ -25,12 +27,14 @@ const [ { default: logger }, { applyJitter }, { getCronJitterMaxMs, getDunningThresholdDays }, + { acquireLock, releaseLock }, ] = await Promise.all([ import('../../../config/index.js'), import('../../../lib/services/mongoose.js'), import('../../../lib/services/logger.js'), import('../lib/billing.cron-utils.js'), import('../lib/billing.constants.js'), + import('../../../lib/services/distributedLock.js'), ]); if (!config?.billing?.meterMode) { @@ -38,63 +42,88 @@ if (!config?.billing?.meterMode) { process.exit(0); } +const LOCK_NAME = 'billing.dunningSweep'; +const LOCK_TTL_MS = 15 * 60 * 1000; // 15 min + const startMs = Date.now(); logger.info('[cron.dunningSweep] start'); +let lockHolder = null; try { await applyJitter(getCronJitterMaxMs()); + await mongooseService.loadModels(); await mongooseService.connect(); - const [{ default: BillingSubscriptionRepository }, { default: OrganizationRepository }] = await Promise.all([ - import('../repositories/billing.subscription.repository.js'), - import('../../organizations/repositories/organizations.repository.js'), - ]); + lockHolder = `${process.env.HOSTNAME ?? 'unknown'}:${randomUUID()}`; + const acquired = await acquireLock({ name: LOCK_NAME, ttlMs: LOCK_TTL_MS, holder: lockHolder }); + if (!acquired) { + logger.info('[cron.dunningSweep] lock held by another pod, skipping'); + process.exitCode = 0; + } else { + try { + const [{ default: BillingSubscriptionRepository }, { default: OrganizationRepository }] = await Promise.all([ + import('../repositories/billing.subscription.repository.js'), + import('../../organizations/repositories/organizations.repository.js'), + ]); - const now = new Date(); - const threshold = new Date(now.getTime() - getDunningThresholdDays() * 24 * 60 * 60 * 1000); + const now = new Date(); + const threshold = new Date(now.getTime() - getDunningThresholdDays() * 24 * 60 * 60 * 1000); - const staleSubs = await BillingSubscriptionRepository.findStaleDunning(threshold); - logger.info('[cron.dunningSweep] stale past_due subscriptions found', { count: staleSubs.length }); + const staleSubs = await BillingSubscriptionRepository.findStaleDunning(threshold); + logger.info('[cron.dunningSweep] stale past_due subscriptions found', { count: staleSubs.length }); - let processed = 0; - let errors = 0; - let desyncErrors = 0; + let processed = 0; + let errors = 0; + let desyncErrors = 0; - for (const sub of staleSubs) { - try { - const subscription = await BillingSubscriptionRepository.markUnpaid(String(sub._id), threshold); - if (!subscription) { - logger.info('[cron.dunningSweep] sub skipped — already recovered', { subId: String(sub._id) }); - continue; + for (const sub of staleSubs) { + try { + const subscription = await BillingSubscriptionRepository.markUnpaid(String(sub._id), threshold); + if (!subscription) { + logger.info('[cron.dunningSweep] sub skipped — already recovered', { subId: String(sub._id) }); + continue; + } + + try { + await OrganizationRepository.setPlan(String(sub.organization), 'free'); + } catch (orgErr) { + // Compensation: Subscription is now unpaid but Org.plan update failed. + // Log for manual reconciliation — do not revert Subscription status. + logger.error('[cron.dunningSweep] Org plan sync failed (manual reconciliation required)', { + subId: String(sub._id), + orgId: String(sub.organization), + err: orgErr?.message, + stack: orgErr?.stack, + }); + desyncErrors += 1; + } + + logger.info('[cron.dunningSweep] sub transitioned to unpaid', { + subId: String(sub._id), + orgId: String(sub.organization), + }); + processed += 1; + } catch (err) { + errors += 1; + logger.error('[cron.dunningSweep] failed for sub', { subId: String(sub._id), err: err?.message, stack: err?.stack }); + } } + logger.info('[cron.dunningSweep] complete', { processed, errors, desyncErrors, durationMs: Date.now() - startMs }); + process.exitCode = errors > 0 || desyncErrors > 0 ? 1 : 0; + } finally { + // releaseLock failure is non-fatal: lock auto-expires on TTL. + // Log separately to preserve any original work error. try { - await OrganizationRepository.setPlan(String(sub.organization), 'free'); - } catch (orgErr) { - // Compensation: Subscription is now unpaid but Org.plan update failed. - // Log for manual reconciliation — do not revert Subscription status. - logger.error('[cron.dunningSweep] Org plan sync failed (manual reconciliation required)', { - subId: String(sub._id), - orgId: String(sub.organization), - err: orgErr?.message, - stack: orgErr?.stack, + await releaseLock({ name: LOCK_NAME, holder: lockHolder }); + } catch (releaseErr) { + logger.error('[cron.dunningSweep] failed to release lock — will auto-expire on TTL', { + err: releaseErr, + cron: LOCK_NAME, }); - desyncErrors += 1; } - - logger.info('[cron.dunningSweep] sub transitioned to unpaid', { - subId: String(sub._id), - orgId: String(sub.organization), - }); - processed += 1; - } catch (err) { - errors += 1; - logger.error('[cron.dunningSweep] failed for sub', { subId: String(sub._id), err: err?.message, stack: err?.stack }); } } - - logger.info('[cron.dunningSweep] complete', { processed, errors, desyncErrors, durationMs: Date.now() - startMs }); - process.exitCode = errors > 0 ? 1 : 0; } catch (err) { logger.error('[cron.dunningSweep] failed', { err: err?.message, stack: err?.stack }); process.exitCode = 1; diff --git a/modules/billing/crons/billing.extrasExpiration.js b/modules/billing/crons/billing.extrasExpiration.js index 2e9b2de04..1dd565dd6 100644 --- a/modules/billing/crons/billing.extrasExpiration.js +++ b/modules/billing/crons/billing.extrasExpiration.js @@ -12,6 +12,8 @@ * NODE_ENV=production node modules/billing/crons/billing.extrasExpiration.js */ +import { randomUUID } from 'node:crypto'; + process.env.NODE_ENV = process.env.NODE_ENV || 'development'; const [ @@ -20,12 +22,14 @@ const [ { default: logger }, { applyJitter }, { getCronJitterMaxMs }, + { acquireLock, releaseLock }, ] = await Promise.all([ import('../../../config/index.js'), import('../../../lib/services/mongoose.js'), import('../../../lib/services/logger.js'), import('../lib/billing.cron-utils.js'), import('../lib/billing.constants.js'), + import('../../../lib/services/distributedLock.js'), ]); if (!config?.billing?.meterMode) { @@ -33,38 +37,63 @@ if (!config?.billing?.meterMode) { process.exit(0); } +const LOCK_NAME = 'billing.extrasExpiration'; +const LOCK_TTL_MS = 5 * 60 * 1000; // 5 min + const startMs = Date.now(); logger.info('[cron.extrasExpiration] start'); +let lockHolder = null; try { await applyJitter(getCronJitterMaxMs()); + await mongooseService.loadModels(); await mongooseService.connect(); - const [{ default: BillingExtraService }, { default: BillingExtraBalanceRepository }] = - await Promise.all([ - import('../services/billing.extra.service.js'), - import('../repositories/billing.extraBalance.repository.js'), - ]); + lockHolder = `${process.env.HOSTNAME ?? 'unknown'}:${randomUUID()}`; + const acquired = await acquireLock({ name: LOCK_NAME, ttlMs: LOCK_TTL_MS, holder: lockHolder }); + if (!acquired) { + logger.info('[cron.extrasExpiration] lock held by another pod, skipping'); + process.exitCode = 0; + } else { + try { + const [{ default: BillingExtraService }, { default: BillingExtraBalanceRepository }] = + await Promise.all([ + import('../services/billing.extra.service.js'), + import('../repositories/billing.extraBalance.repository.js'), + ]); + + const now = new Date(); + const orgIds = await BillingExtraBalanceRepository.findOrgsWithExpiringTopups(now); - const now = new Date(); - const orgIds = await BillingExtraBalanceRepository.findOrgsWithExpiringTopups(now); + let processed = 0; + let errors = 0; - let processed = 0; - let errors = 0; + for (const orgId of orgIds) { + try { + const added = await BillingExtraService.expireOldEntries(orgId); + logger.info('[cron.extrasExpiration] org processed', { orgId: String(orgId), entriesAdded: added }); + processed += 1; + } catch (err) { + errors += 1; + logger.error('[cron.extrasExpiration] expireOldEntries failed', { orgId: String(orgId), err: err?.message, stack: err?.stack }); + } + } - for (const orgId of orgIds) { - try { - const added = await BillingExtraService.expireOldEntries(orgId); - logger.info('[cron.extrasExpiration] org processed', { orgId: String(orgId), entriesAdded: added }); - processed += 1; - } catch (err) { - errors += 1; - logger.error('[cron.extrasExpiration] expireOldEntries failed', { orgId: String(orgId), err: err?.message, stack: err?.stack }); + logger.info('[cron.extrasExpiration] complete', { processed, errors, durationMs: Date.now() - startMs }); + process.exitCode = errors > 0 ? 1 : 0; + } finally { + // releaseLock failure is non-fatal: lock auto-expires on TTL. + // Log separately to preserve any original work error. + try { + await releaseLock({ name: LOCK_NAME, holder: lockHolder }); + } catch (releaseErr) { + logger.error('[cron.extrasExpiration] failed to release lock — will auto-expire on TTL', { + err: releaseErr, + cron: LOCK_NAME, + }); + } } } - - logger.info('[cron.extrasExpiration] complete', { processed, errors, durationMs: Date.now() - startMs }); - process.exitCode = errors > 0 ? 1 : 0; } catch (err) { logger.error('[cron.extrasExpiration] failed', { err: err?.message, stack: err?.stack }); process.exitCode = 1; diff --git a/modules/billing/crons/billing.weeklyReset.js b/modules/billing/crons/billing.weeklyReset.js index ed77ebbe6..3c5ca7904 100644 --- a/modules/billing/crons/billing.weeklyReset.js +++ b/modules/billing/crons/billing.weeklyReset.js @@ -11,6 +11,8 @@ * NODE_ENV=production node modules/billing/crons/billing.weeklyReset.js */ +import { randomUUID } from 'node:crypto'; + process.env.NODE_ENV = process.env.NODE_ENV || 'development'; const [ @@ -19,12 +21,14 @@ const [ { default: logger }, { applyJitter }, { getCronJitterMaxMs }, + { acquireLock, releaseLock }, ] = await Promise.all([ import('../../../config/index.js'), import('../../../lib/services/mongoose.js'), import('../../../lib/services/logger.js'), import('../lib/billing.cron-utils.js'), import('../lib/billing.constants.js'), + import('../../../lib/services/distributedLock.js'), ]); if (!config?.billing?.meterMode) { @@ -32,19 +36,43 @@ if (!config?.billing?.meterMode) { process.exit(0); } +const LOCK_NAME = 'billing.weeklyReset'; +const LOCK_TTL_MS = 10 * 60 * 1000; // 10 min + const startMs = Date.now(); logger.info('[cron.weeklyReset] start'); +let lockHolder = null; try { await applyJitter(getCronJitterMaxMs()); await mongooseService.loadModels(); await mongooseService.connect(); - const { default: BillingResetService } = await import('../services/billing.reset.service.js'); + lockHolder = `${process.env.HOSTNAME ?? 'unknown'}:${randomUUID()}`; + const acquired = await acquireLock({ name: LOCK_NAME, ttlMs: LOCK_TTL_MS, holder: lockHolder }); + if (!acquired) { + logger.info('[cron.weeklyReset] lock held by another pod, skipping'); + process.exitCode = 0; + } else { + try { + const { default: BillingResetService } = await import('../services/billing.reset.service.js'); - const result = await BillingResetService.resetAllDue(); - logger.info('[cron.weeklyReset] complete', { processed: result.processed, errors: result.errors, durationMs: Date.now() - startMs }); - process.exitCode = result.errors > 0 ? 1 : 0; + const result = await BillingResetService.resetAllDue(); + logger.info('[cron.weeklyReset] complete', { processed: result.processed, errors: result.errors, durationMs: Date.now() - startMs }); + process.exitCode = result.errors > 0 ? 1 : 0; + } finally { + // releaseLock failure is non-fatal: lock auto-expires on TTL. + // Log separately to preserve any original work error. + try { + await releaseLock({ name: LOCK_NAME, holder: lockHolder }); + } catch (releaseErr) { + logger.error('[cron.weeklyReset] failed to release lock — will auto-expire on TTL', { + err: releaseErr, + cron: LOCK_NAME, + }); + } + } + } } catch (err) { logger.error('[cron.weeklyReset] failed', { err: err?.message, stack: err?.stack }); process.exitCode = 1; diff --git a/modules/billing/tests/billing.cron-lock.integration.tests.js b/modules/billing/tests/billing.cron-lock.integration.tests.js new file mode 100644 index 000000000..6593a6c7c --- /dev/null +++ b/modules/billing/tests/billing.cron-lock.integration.tests.js @@ -0,0 +1,101 @@ +/** + * Module dependencies. + */ +import { describe, beforeAll, beforeEach, afterAll, test, expect } from '@jest/globals'; + +import mongooseService from '../../../lib/services/mongoose.js'; +import { acquireLock, releaseLock, CronLock } from '../../../lib/services/distributedLock.js'; + +/** + * Integration tests for lib/services/distributedLock.js — real MongoDB. + * + * Verifies the acquire / release / contention / expiry contract end-to-end. + * The cron scripts themselves are top-level-await CLI entry points that cannot + * be imported in tests; the lock primitive they delegate to is tested here. + */ +describe('distributedLock integration — acquire / release contract:', () => { + beforeAll(async () => { + await mongooseService.loadModels(); + await mongooseService.connect(); + }); + + beforeEach(async () => { + await CronLock.deleteMany({}); + }); + + afterAll(async () => { + await CronLock.deleteMany({}); + await mongooseService.disconnect(); + }); + + test('acquires lock when collection is empty', async () => { + const ok = await acquireLock({ name: 'billing.weeklyReset', ttlMs: 60_000, holder: 'pod-1' }); + expect(ok).toBe(true); + + const doc = await CronLock.findById('billing.weeklyReset'); + expect(doc).not.toBeNull(); + expect(doc.holder).toBe('pod-1'); + }); + + test('rejects acquire when lock is held by another holder', async () => { + await acquireLock({ name: 'billing.dunningSweep', ttlMs: 60_000, holder: 'pod-1' }); + const ok = await acquireLock({ name: 'billing.dunningSweep', ttlMs: 60_000, holder: 'pod-2' }); + expect(ok).toBe(false); + }); + + test('allows acquire when the existing lock has expired (lockedUntil < now)', async () => { + // Seed an already-expired lock: lockedUntil in the past + await CronLock.create({ + _id: 'billing.extrasExpiration', + lockedAt: new Date(Date.now() - 10_000), + lockedUntil: new Date(Date.now() - 5_000), + holder: 'old-pod', + }); + + const ok = await acquireLock({ name: 'billing.extrasExpiration', ttlMs: 60_000, holder: 'pod-2' }); + expect(ok).toBe(true); + }); + + test('releaseLock removes doc only when holder matches', async () => { + await acquireLock({ name: 'billing.weeklyReset', ttlMs: 60_000, holder: 'pod-1' }); + + // Wrong holder — lock should remain + await releaseLock({ name: 'billing.weeklyReset', holder: 'pod-2' }); + const stillHeld = await CronLock.findById('billing.weeklyReset'); + expect(stillHeld).not.toBeNull(); + expect(stillHeld.holder).toBe('pod-1'); + + // Correct holder — lock released + await releaseLock({ name: 'billing.weeklyReset', holder: 'pod-1' }); + const gone = await CronLock.findById('billing.weeklyReset'); + expect(gone).toBeNull(); + }); + + test('only one of N concurrent in-process acquires wins via E11000', async () => { + // Node event loop: 3 Promise.all fires hit Mongo concurrently (not serialized by await). + // Real inter-pod race uses the same E11000 path; this exercises the same mechanism. + const results = await Promise.all([ + acquireLock({ name: 'billing.dunningSweep', ttlMs: 60_000, holder: 'pod-A' }), + acquireLock({ name: 'billing.dunningSweep', ttlMs: 60_000, holder: 'pod-B' }), + acquireLock({ name: 'billing.dunningSweep', ttlMs: 60_000, holder: 'pod-C' }), + ]); + + const wins = results.filter(Boolean); + expect(wins).toHaveLength(1); + + const doc = await CronLock.findById('billing.dunningSweep'); + expect(doc).not.toBeNull(); + // The winning holder is one of the three + expect(['pod-A', 'pod-B', 'pod-C']).toContain(doc.holder); + }); + + test('acquire sets correct TTL window', async () => { + const before = Date.now(); + await acquireLock({ name: 'billing.extrasExpiration', ttlMs: 5 * 60 * 1000, holder: 'pod-1' }); + const after = Date.now(); + + const doc = await CronLock.findById('billing.extrasExpiration'); + expect(doc.lockedUntil.getTime()).toBeGreaterThanOrEqual(before + 5 * 60 * 1000); + expect(doc.lockedUntil.getTime()).toBeLessThanOrEqual(after + 5 * 60 * 1000); + }); +});