diff --git a/apps/sim/lib/core/config/env.ts b/apps/sim/lib/core/config/env.ts index f4c751b7b72..7b329a2fd12 100644 --- a/apps/sim/lib/core/config/env.ts +++ b/apps/sim/lib/core/config/env.ts @@ -235,6 +235,7 @@ export const env = createEnv({ IVM_DISTRIBUTED_MAX_INFLIGHT_PER_OWNER:z.string().optional().default('2200'), // Max owner in-flight leases across replicas IVM_DISTRIBUTED_LEASE_MIN_TTL_MS: z.string().optional().default('120000'), // Min TTL for distributed in-flight leases (ms) IVM_QUEUE_TIMEOUT_MS: z.string().optional().default('300000'), // Max queue wait before rejection (ms) + IVM_MAX_EXECUTIONS_PER_WORKER: z.string().optional().default('500'), // Max lifetime executions before worker is recycled // Knowledge Base Processing Configuration - Shared across all processing methods KB_CONFIG_MAX_DURATION: z.number().optional().default(600), // Max processing duration in seconds (10 minutes) diff --git a/apps/sim/lib/execution/isolated-vm-worker.cjs b/apps/sim/lib/execution/isolated-vm-worker.cjs index 2641b80e11d..a6f25053267 100644 --- a/apps/sim/lib/execution/isolated-vm-worker.cjs +++ b/apps/sim/lib/execution/isolated-vm-worker.cjs @@ -142,27 +142,40 @@ async function executeCode(request) { stdoutTruncated = true } + let context = null + let bootstrapScript = null + let userScript = null + let logCallback = null + let errorCallback = null + let fetchCallback = null + const externalCopies = [] + try { isolate = new ivm.Isolate({ memoryLimit: 128 }) - const context = await isolate.createContext() + context = await isolate.createContext() const jail = context.global await jail.set('global', jail.derefInto()) - const logCallback = new ivm.Callback((...args) => { + logCallback = new ivm.Callback((...args) => { const message = args.map((arg) => stringifyLogValue(arg)).join(' ') appendStdout(`${message}\n`) }) await jail.set('__log', logCallback) - const errorCallback = new ivm.Callback((...args) => { + errorCallback = new ivm.Callback((...args) => { const message = args.map((arg) => stringifyLogValue(arg)).join(' ') appendStdout(`ERROR: ${message}\n`) }) await jail.set('__error', errorCallback) - await jail.set('params', new ivm.ExternalCopy(params).copyInto()) - await jail.set('environmentVariables', new ivm.ExternalCopy(envVars).copyInto()) + const paramsCopy = new ivm.ExternalCopy(params) + externalCopies.push(paramsCopy) + await jail.set('params', paramsCopy.copyInto()) + + const envVarsCopy = new ivm.ExternalCopy(envVars) + externalCopies.push(envVarsCopy) + await jail.set('environmentVariables', envVarsCopy.copyInto()) for (const [key, value] of Object.entries(contextVariables)) { if (value === undefined) { @@ -170,11 +183,13 @@ async function executeCode(request) { } else if (value === null) { await jail.set(key, null) } else { - await jail.set(key, new ivm.ExternalCopy(value).copyInto()) + const ctxCopy = new ivm.ExternalCopy(value) + externalCopies.push(ctxCopy) + await jail.set(key, ctxCopy.copyInto()) } } - const fetchCallback = new ivm.Reference(async (url, optionsJson) => { + fetchCallback = new ivm.Reference(async (url, optionsJson) => { return new Promise((resolve) => { const fetchId = ++fetchIdCounter const timeout = setTimeout(() => { @@ -267,7 +282,7 @@ async function executeCode(request) { } ` - const bootstrapScript = await isolate.compileScript(bootstrap) + bootstrapScript = await isolate.compileScript(bootstrap) await bootstrapScript.run(context) const wrappedCode = ` @@ -290,7 +305,7 @@ async function executeCode(request) { })() ` - const userScript = await isolate.compileScript(wrappedCode, { filename: 'user-function.js' }) + userScript = await isolate.compileScript(wrappedCode, { filename: 'user-function.js' }) const resultJson = await userScript.run(context, { timeout: timeoutMs, promise: true }) let result = null @@ -357,8 +372,26 @@ async function executeCode(request) { }, } } finally { + const releaseables = [ + userScript, + bootstrapScript, + ...externalCopies, + fetchCallback, + errorCallback, + logCallback, + context, + ] + for (const obj of releaseables) { + if (obj) { + try { + obj.release() + } catch {} + } + } if (isolate) { - isolate.dispose() + try { + isolate.dispose() + } catch {} } } } diff --git a/apps/sim/lib/execution/isolated-vm.ts b/apps/sim/lib/execution/isolated-vm.ts index 877035760e5..3ec9b30c938 100644 --- a/apps/sim/lib/execution/isolated-vm.ts +++ b/apps/sim/lib/execution/isolated-vm.ts @@ -70,6 +70,7 @@ const DISTRIBUTED_MAX_INFLIGHT_PER_OWNER = Number.parseInt(env.IVM_DISTRIBUTED_MAX_INFLIGHT_PER_OWNER) || MAX_ACTIVE_PER_OWNER + MAX_QUEUED_PER_OWNER const DISTRIBUTED_LEASE_MIN_TTL_MS = Number.parseInt(env.IVM_DISTRIBUTED_LEASE_MIN_TTL_MS) || 120000 +const MAX_EXECUTIONS_PER_WORKER = Number.parseInt(env.IVM_MAX_EXECUTIONS_PER_WORKER) || 500 const DISTRIBUTED_KEY_PREFIX = 'ivm:fair:v1:owner' const LEASE_REDIS_DEADLINE_MS = 200 const QUEUE_RETRY_DELAY_MS = 1000 @@ -89,6 +90,8 @@ interface WorkerInfo { pendingExecutions: Map idleTimeout: ReturnType | null id: number + lifetimeExecutions: number + retiring: boolean } interface QueuedExecution { @@ -538,8 +541,20 @@ function handleWorkerMessage(workerId: number, message: unknown) { owner.activeExecutions = Math.max(0, owner.activeExecutions - 1) maybeCleanupOwner(owner.ownerKey) } + workerInfo!.lifetimeExecutions++ + if (workerInfo!.lifetimeExecutions >= MAX_EXECUTIONS_PER_WORKER && !workerInfo!.retiring) { + workerInfo!.retiring = true + logger.info('Worker marked for retirement', { + workerId, + lifetimeExecutions: workerInfo!.lifetimeExecutions, + }) + } + if (workerInfo!.retiring && workerInfo!.activeExecutions === 0) { + cleanupWorker(workerId) + } else { + resetWorkerIdleTimeout(workerId) + } pending.resolve(msg.result as IsolatedVMExecutionResult) - resetWorkerIdleTimeout(workerId) drainQueue() } return @@ -679,6 +694,8 @@ function spawnWorker(): Promise { pendingExecutions: new Map(), idleTimeout: null, id: workerId, + lifetimeExecutions: 0, + retiring: false, } workerInfo.readyPromise = new Promise((resolve, reject) => { @@ -710,7 +727,8 @@ function spawnWorker(): Promise { import('node:child_process') .then(({ spawn }) => { - const proc = spawn('node', [workerPath], { + // Required for isolated-vm on Node.js 20+ (issue #377) + const proc = spawn('node', ['--no-node-snapshot', workerPath], { stdio: ['ignore', 'pipe', 'pipe', 'ipc'], serialization: 'json', }) @@ -801,6 +819,7 @@ function selectWorker(): WorkerInfo | null { let best: WorkerInfo | null = null for (const w of workers.values()) { if (!w.ready) continue + if (w.retiring) continue if (w.activeExecutions >= MAX_PER_WORKER) continue if (!best || w.activeExecutions < best.activeExecutions) { best = w @@ -818,7 +837,8 @@ async function acquireWorker(): Promise { const existing = selectWorker() if (existing) return existing - const currentPoolSize = workers.size + spawnInProgress + const activeWorkerCount = [...workers.values()].filter((w) => !w.retiring).length + const currentPoolSize = activeWorkerCount + spawnInProgress if (currentPoolSize < POOL_SIZE) { try { return await spawnWorker() @@ -850,12 +870,24 @@ function dispatchToWorker( totalActiveExecutions-- ownerState.activeExecutions = Math.max(0, ownerState.activeExecutions - 1) maybeCleanupOwner(ownerState.ownerKey) + workerInfo.lifetimeExecutions++ + if (workerInfo.lifetimeExecutions >= MAX_EXECUTIONS_PER_WORKER && !workerInfo.retiring) { + workerInfo.retiring = true + logger.info('Worker marked for retirement', { + workerId: workerInfo.id, + lifetimeExecutions: workerInfo.lifetimeExecutions, + }) + } resolve({ result: null, stdout: '', error: { message: `Execution timed out after ${req.timeoutMs}ms`, name: 'TimeoutError' }, }) - resetWorkerIdleTimeout(workerInfo.id) + if (workerInfo.retiring && workerInfo.activeExecutions === 0) { + cleanupWorker(workerInfo.id) + } else { + resetWorkerIdleTimeout(workerInfo.id) + } drainQueue() }, req.timeoutMs + 1000) @@ -878,7 +910,11 @@ function dispatchToWorker( stdout: '', error: { message: 'Code execution failed to start. Please try again.', name: 'Error' }, }) - resetWorkerIdleTimeout(workerInfo.id) + if (workerInfo.retiring && workerInfo.activeExecutions === 0) { + cleanupWorker(workerInfo.id) + } else { + resetWorkerIdleTimeout(workerInfo.id) + } // Defer to break synchronous recursion: drainQueue → dispatchToWorker → catch → drainQueue queueMicrotask(() => drainQueue()) } @@ -952,7 +988,8 @@ function drainQueue() { while (queueLength() > 0 && totalActiveExecutions < MAX_CONCURRENT) { const worker = selectWorker() if (!worker) { - const currentPoolSize = workers.size + spawnInProgress + const activeWorkerCount = [...workers.values()].filter((w) => !w.retiring).length + const currentPoolSize = activeWorkerCount + spawnInProgress if (currentPoolSize < POOL_SIZE) { spawnWorker() .then(() => drainQueue())