From b33bd2c02409b4367932e72991d217389cab6980 Mon Sep 17 00:00:00 2001 From: Mykhailo Chalyi Date: Thu, 11 Jun 2026 20:28:07 -0500 Subject: [PATCH] fix(js): bound async execute queue --- crates/bashkit-js/README.md | 6 ++ .../runtime-compat/streaming-output.test.mjs | 28 +++++ crates/bashkit-js/src/lib.rs | 53 +++++++++- crates/bashkit-js/wrapper.ts | 100 +++++++++++++++--- 4 files changed, 172 insertions(+), 15 deletions(-) diff --git a/crates/bashkit-js/README.md b/crates/bashkit-js/README.md index 320c0ec07..5ae2b715f 100644 --- a/crates/bashkit-js/README.md +++ b/crates/bashkit-js/README.md @@ -244,6 +244,12 @@ Use `reset()` only when you want to discard state entirely. For synchronous execution, `executeSync(...)` and `executeSyncOrThrow(...)` also accept `{ signal }`. +Async `execute(...)` calls are serialized per instance and keep a bounded +backlog. If too many executions are already pending on one `Bash`, `BashTool`, +or `ScriptedTool`, the next async call rejects instead of retaining another +queued command string. Commands larger than `maxInputBytes` are rejected before +entering the async queue. + ## BashTool `BashTool` wraps the interpreter with tool-contract metadata for agent frameworks: diff --git a/crates/bashkit-js/__test__/runtime-compat/streaming-output.test.mjs b/crates/bashkit-js/__test__/runtime-compat/streaming-output.test.mjs index a931f8d98..bed495a8d 100644 --- a/crates/bashkit-js/__test__/runtime-compat/streaming-output.test.mjs +++ b/crates/bashkit-js/__test__/runtime-compat/streaming-output.test.mjs @@ -115,6 +115,34 @@ for (const [label, create] of [ assert.equal(result.stdout, ""); }); + it("rejects async execute when the per-instance queue is full", async () => { + const shell = create(); + const blocker = shell.execute("sleep 0.05"); + const accepted = Array.from({ length: 7 }, (_, i) => + shell.execute(`echo queued-${i}`), + ); + + await assert.rejects( + () => shell.execute("echo should-not-queue"), + /too many pending async execute calls/, + ); + + assert.equal((await blocker).exitCode, 0); + for (const pending of accepted) { + assert.equal((await pending).exitCode, 0); + } + }); + + it("validates maxInputBytes before queueing async execute", async () => { + const shell = create({ maxInputBytes: 4 }); + + const result = await shell.execute("echo too-large"); + + assert.equal(result.exitCode, 1); + assert.match(result.error, /input too large/); + assert.equal(result.stdout, ""); + }); + it("async rejects Promise-returning onOutput", async () => { const shell = create(); diff --git a/crates/bashkit-js/src/lib.rs b/crates/bashkit-js/src/lib.rs index af731a7cc..b64a11305 100644 --- a/crates/bashkit-js/src/lib.rs +++ b/crates/bashkit-js/src/lib.rs @@ -35,7 +35,7 @@ use std::pin::Pin; use std::ptr; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex as StdMutex}; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, OwnedSemaphorePermit, Semaphore}; // --------------------------------------------------------------------------- // Shared tokio runtime + concurrency limiter for JS tool callbacks (issue #982). @@ -45,6 +45,12 @@ use tokio::sync::Mutex; // maximum number of concurrent in-flight callbacks to prevent DoS. // --------------------------------------------------------------------------- const MAX_CONCURRENT_TOOL_CALLBACKS: usize = 10; +// Decision: JS async execute() is per-instance serialized by RustBash, so only +// a small bounded backlog is useful. Bound it before awaiting the mutex so +// pending futures cannot retain unbounded command strings. +const MAX_PENDING_ASYNC_EXECUTIONS: usize = 8; +const ASYNC_EXECUTE_QUEUE_FULL_ERROR: &str = + "too many pending async execute calls for this instance"; fn callback_runtime() -> &'static tokio::runtime::Runtime { use std::sync::OnceLock; @@ -1099,6 +1105,7 @@ struct SharedState { /// function check this and fail closed instead of deadlocking on a /// callback the loop can never service. See [`SYNC_BUILTIN_DEADLOCK_ERROR`]. in_sync_execute_depth: Arc, + async_execute_semaphore: Arc, username: Option, hostname: Option, max_commands: Option, @@ -1155,6 +1162,32 @@ where rt_guard.block_on(f(s2)) } +fn max_input_bytes_for_state(state: &SharedState) -> usize { + state.max_input_bytes.map_or_else( + || ExecutionLimits::default().max_input_bytes, + |v| v as usize, + ) +} + +fn validate_async_execute_input(state: &SharedState, commands: &str) -> Option { + let max_input_bytes = max_input_bytes_for_state(state); + let input_len = commands.len(); + if input_len > max_input_bytes { + return Some(js_exec_result_from_error(format!( + "input too large: {input_len} bytes exceeds maxInputBytes {max_input_bytes}" + ))); + } + None +} + +fn acquire_async_execute_slot(state: &Arc) -> napi::Result { + state + .async_execute_semaphore + .clone() + .try_acquire_owned() + .map_err(|_| napi::Error::from_reason(ASYNC_EXECUTE_QUEUE_FULL_ERROR)) +} + fn canonicalize_path(path: &str, label: &str) -> napi::Result { std::fs::canonicalize(path) .map_err(|e| napi::Error::from_reason(format!("Invalid {label} '{path}': {e}"))) @@ -1268,7 +1301,11 @@ impl Bash { #[napi] pub async fn execute(&self, commands: String) -> napi::Result { reject_on_output_reentry(&self.state)?; + if let Some(result) = validate_async_execute_input(&self.state, &commands) { + return Ok(result); + } let s = self.state.clone(); + let _slot = acquire_async_execute_slot(&s)?; let mut bash = s.inner.lock().await; execute_rust_bash(&mut bash, &commands, None, None, None, None).await } @@ -1290,6 +1327,10 @@ impl Bash { raw_env, async move { reject_on_output_reentry(&state)?; + if let Some(result) = validate_async_execute_input(&state, &commands) { + return Ok(result); + } + let _slot = acquire_async_execute_slot(&state)?; let mut bash = state.inner.lock().await; let cancelled = bash.cancellation_token(); let callback_requested_cancel = Arc::new(AtomicBool::new(false)); @@ -1838,7 +1879,11 @@ impl BashTool { #[napi] pub async fn execute(&self, commands: String) -> napi::Result { reject_on_output_reentry(&self.state)?; + if let Some(result) = validate_async_execute_input(&self.state, &commands) { + return Ok(result); + } let s = self.state.clone(); + let _slot = acquire_async_execute_slot(&s)?; let mut bash = s.inner.lock().await; execute_rust_bash(&mut bash, &commands, None, None, None, None).await } @@ -1860,6 +1905,10 @@ impl BashTool { raw_env, async move { reject_on_output_reentry(&state)?; + if let Some(result) = validate_async_execute_input(&state, &commands) { + return Ok(result); + } + let _slot = acquire_async_execute_slot(&state)?; let mut bash = state.inner.lock().await; let cancelled = bash.cancellation_token(); let callback_requested_cancel = Arc::new(AtomicBool::new(false)); @@ -2865,6 +2914,7 @@ fn shared_state_from_opts( cancelled: std::sync::Mutex::new(Arc::new(AtomicBool::new(false))), on_output_reentry_depth: Arc::new(AtomicUsize::new(0)), in_sync_execute_depth: Arc::new(AtomicUsize::new(0)), + async_execute_semaphore: Arc::new(Semaphore::new(MAX_PENDING_ASYNC_EXECUTIONS)), username: opts.username.clone(), hostname: opts.hostname.clone(), max_commands: opts.max_commands, @@ -2913,6 +2963,7 @@ fn shared_state_from_opts( cancelled: std::sync::Mutex::new(cancelled), on_output_reentry_depth: tmp.on_output_reentry_depth, in_sync_execute_depth: tmp.in_sync_execute_depth, + async_execute_semaphore: Arc::new(Semaphore::new(MAX_PENDING_ASYNC_EXECUTIONS)), username: opts.username, hostname: opts.hostname, max_commands: opts.max_commands, diff --git a/crates/bashkit-js/wrapper.ts b/crates/bashkit-js/wrapper.ts index cba9d019c..7f6f3ac1c 100644 --- a/crates/bashkit-js/wrapper.ts +++ b/crates/bashkit-js/wrapper.ts @@ -136,6 +136,13 @@ export interface BashOptions { maxCommands?: number; maxLoopIterations?: number; maxTotalLoopIterations?: number; + /** + * Maximum script input size in UTF-8 bytes. + * + * Async execute validates this before entering the per-instance queue so + * oversized calls cannot wait while retaining large command strings. + */ + maxInputBytes?: number; /** * Maximum interpreter memory in bytes (variables, arrays, functions). * @@ -289,7 +296,15 @@ export interface ExecuteOptions { type NativeOnOutput = (chunkPair: [string, string]) => string | undefined; const ASYNC_ON_OUTPUT_ERROR = "onOutput must be synchronous and must not return a Promise"; -const asyncExecuteQueues = new WeakMap>(); +const DEFAULT_MAX_INPUT_BYTES = 10_000_000; +const MAX_PENDING_ASYNC_EXECUTIONS = 8; +const ASYNC_EXECUTE_QUEUE_FULL_ERROR = + "too many pending async execute calls for this instance"; +interface AsyncExecuteQueueState { + tail: Promise; + pending: number; +} +const asyncExecuteQueues = new WeakMap(); function isAsyncFunction(fn: Function): boolean { return Object.prototype.toString.call(fn) === "[object AsyncFunction]"; @@ -303,12 +318,12 @@ function isPromiseLike(value: unknown): value is PromiseLike { ); } -function cancelledExecResult(): ExecResult { +function errorExecResult(error: string): ExecResult { return { stdout: "", - stderr: "", + stderr: error, exitCode: 1, - error: "execution cancelled", + error, stdoutTruncated: false, stderrTruncated: false, finalEnv: undefined, @@ -316,24 +331,54 @@ function cancelledExecResult(): ExecResult { }; } +function cancelledExecResult(): ExecResult { + return errorExecResult("execution cancelled"); +} + +function inputTooLargeExecResult( + commands: string, + maxInputBytes: number, +): ExecResult | undefined { + const inputBytes = Buffer.byteLength(commands, "utf8"); + if (inputBytes <= maxInputBytes) { + return undefined; + } + return errorExecResult( + `input too large: ${inputBytes} bytes exceeds maxInputBytes ${maxInputBytes}`, + ); +} + // Decision: serialize async execute() per instance in JS so queued AbortSignal -// listeners only attach once a call reaches the front of the line. +// listeners only attach once a call reaches the front of the line. Also bound +// the backlog before retaining large command strings in queued closures. function queueAsyncExecute( owner: object, run: () => Promise, ): Promise { - const previous = asyncExecuteQueues.get(owner) ?? Promise.resolve(); + let state = asyncExecuteQueues.get(owner); + if (!state) { + state = { tail: Promise.resolve(), pending: 0 }; + asyncExecuteQueues.set(owner, state); + } + if (state.pending >= MAX_PENDING_ASYNC_EXECUTIONS) { + return Promise.reject(new Error(ASYNC_EXECUTE_QUEUE_FULL_ERROR)); + } + state.pending += 1; + const previous = state.tail; const completion = previous.then( () => run(), () => run(), ); - asyncExecuteQueues.set( - owner, - completion.then( - () => undefined, - () => undefined, - ), + state.tail = completion.then( + () => undefined, + () => undefined, ); + state.tail.finally(() => { + state.pending -= 1; + if (state.pending === 0 && asyncExecuteQueues.get(owner) === state) { + asyncExecuteQueues.delete(owner); + } + }); return completion; } @@ -690,10 +735,12 @@ function registerCustomBuiltins( */ export class Bash { private native: NativeBashType; + private maxInputBytes: number; constructor(options?: BashOptions) { const resolved = resolveFilesSync(options?.files); this.native = new NativeBash(toNativeOptions(options, resolved)); + this.maxInputBytes = options?.maxInputBytes ?? DEFAULT_MAX_INPUT_BYTES; registerCustomBuiltins(this.native, options?.customBuiltins); } @@ -715,6 +762,7 @@ export class Bash { const resolved = await resolveFiles(options?.files); const instance = Object.create(Bash.prototype) as Bash; instance.native = new NativeBash(toNativeOptions(options, resolved)); + instance.maxInputBytes = options?.maxInputBytes ?? DEFAULT_MAX_INPUT_BYTES; registerCustomBuiltins(instance.native, options?.customBuiltins); return instance; } @@ -793,6 +841,13 @@ export class Bash { if (signal?.aborted) { return cancelledExecResult(); } + const inputLimitResult = inputTooLargeExecResult( + commands, + this.maxInputBytes, + ); + if (inputLimitResult) { + return inputLimitResult; + } return queueAsyncExecute(this, async () => { if (signal?.aborted) { return cancelledExecResult(); @@ -1108,10 +1163,12 @@ export class Bash { */ export class BashTool { private native: NativeBashToolType; + private maxInputBytes: number; constructor(options?: BashOptions) { const resolved = resolveFilesSync(options?.files); this.native = new NativeBashTool(toNativeOptions(options, resolved)); + this.maxInputBytes = options?.maxInputBytes ?? DEFAULT_MAX_INPUT_BYTES; registerCustomBuiltins(this.native, options?.customBuiltins); } @@ -1122,6 +1179,7 @@ export class BashTool { const resolved = await resolveFiles(options?.files); const instance = Object.create(BashTool.prototype) as BashTool; instance.native = new NativeBashTool(toNativeOptions(options, resolved)); + instance.maxInputBytes = options?.maxInputBytes ?? DEFAULT_MAX_INPUT_BYTES; registerCustomBuiltins(instance.native, options?.customBuiltins); return instance; } @@ -1180,6 +1238,13 @@ export class BashTool { if (signal?.aborted) { return cancelledExecResult(); } + const inputLimitResult = inputTooLargeExecResult( + commands, + this.maxInputBytes, + ); + if (inputLimitResult) { + return inputLimitResult; + } return queueAsyncExecute(this, async () => { if (signal?.aborted) { return cancelledExecResult(); @@ -1642,7 +1707,14 @@ export class ScriptedTool { * tool callbacks require the Node.js event loop to be running. */ async execute(commands: string): Promise { - return this.native.execute(commands); + const inputLimitResult = inputTooLargeExecResult( + commands, + DEFAULT_MAX_INPUT_BYTES, + ); + if (inputLimitResult) { + return inputLimitResult; + } + return queueAsyncExecute(this, () => this.native.execute(commands)); } /** @@ -1662,7 +1734,7 @@ export class ScriptedTool { * Execute asynchronously. Throws `BashError` on non-zero exit. */ async executeOrThrow(commands: string): Promise { - const result = await this.native.execute(commands); + const result = await this.execute(commands); if (result.exitCode !== 0) { throw new BashError(result); }