Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions crates/bashkit-js/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,12 @@ Use `reset()` only when you want to discard state entirely.
For synchronous execution, `executeSync(...)` and `executeSyncOrThrow(...)`
also accept `{ signal }`.

Async `execute(...)` calls are serialized per instance and keep a bounded
backlog. If too many executions are already pending on one `Bash`, `BashTool`,
or `ScriptedTool`, the next async call rejects instead of retaining another
queued command string. Commands larger than `maxInputBytes` are rejected before
entering the async queue.

## BashTool

`BashTool` wraps the interpreter with tool-contract metadata for agent frameworks:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,34 @@ for (const [label, create] of [
assert.equal(result.stdout, "");
});

it("rejects async execute when the per-instance queue is full", async () => {
const shell = create();
const blocker = shell.execute("sleep 0.05");
const accepted = Array.from({ length: 7 }, (_, i) =>
shell.execute(`echo queued-${i}`),
);

await assert.rejects(
() => shell.execute("echo should-not-queue"),
/too many pending async execute calls/,
);

assert.equal((await blocker).exitCode, 0);
for (const pending of accepted) {
assert.equal((await pending).exitCode, 0);
}
});

it("validates maxInputBytes before queueing async execute", async () => {
const shell = create({ maxInputBytes: 4 });

const result = await shell.execute("echo too-large");

assert.equal(result.exitCode, 1);
assert.match(result.error, /input too large/);
assert.equal(result.stdout, "");
});

it("async rejects Promise-returning onOutput", async () => {
const shell = create();

Expand Down
53 changes: 52 additions & 1 deletion crates/bashkit-js/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use std::pin::Pin;
use std::ptr;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex as StdMutex};
use tokio::sync::Mutex;
use tokio::sync::{Mutex, OwnedSemaphorePermit, Semaphore};

// ---------------------------------------------------------------------------
// Shared tokio runtime + concurrency limiter for JS tool callbacks (issue #982).
Expand All @@ -45,6 +45,12 @@ use tokio::sync::Mutex;
// maximum number of concurrent in-flight callbacks to prevent DoS.
// ---------------------------------------------------------------------------
const MAX_CONCURRENT_TOOL_CALLBACKS: usize = 10;
// Decision: JS async execute() is per-instance serialized by RustBash, so only
// a small bounded backlog is useful. Bound it before awaiting the mutex so
// pending futures cannot retain unbounded command strings.
const MAX_PENDING_ASYNC_EXECUTIONS: usize = 8;
const ASYNC_EXECUTE_QUEUE_FULL_ERROR: &str =
"too many pending async execute calls for this instance";

fn callback_runtime() -> &'static tokio::runtime::Runtime {
use std::sync::OnceLock;
Expand Down Expand Up @@ -1099,6 +1105,7 @@ struct SharedState {
/// function check this and fail closed instead of deadlocking on a
/// callback the loop can never service. See [`SYNC_BUILTIN_DEADLOCK_ERROR`].
in_sync_execute_depth: Arc<AtomicUsize>,
async_execute_semaphore: Arc<Semaphore>,
username: Option<String>,
hostname: Option<String>,
max_commands: Option<u32>,
Expand Down Expand Up @@ -1155,6 +1162,32 @@ where
rt_guard.block_on(f(s2))
}

fn max_input_bytes_for_state(state: &SharedState) -> usize {
state.max_input_bytes.map_or_else(
|| ExecutionLimits::default().max_input_bytes,
|v| v as usize,
)
}
Comment on lines +1165 to +1170

fn validate_async_execute_input(state: &SharedState, commands: &str) -> Option<ExecResult> {
let max_input_bytes = max_input_bytes_for_state(state);
let input_len = commands.len();
if input_len > max_input_bytes {
return Some(js_exec_result_from_error(format!(
"input too large: {input_len} bytes exceeds maxInputBytes {max_input_bytes}"
)));
}
None
}

fn acquire_async_execute_slot(state: &Arc<SharedState>) -> napi::Result<OwnedSemaphorePermit> {
state
.async_execute_semaphore
.clone()
.try_acquire_owned()
.map_err(|_| napi::Error::from_reason(ASYNC_EXECUTE_QUEUE_FULL_ERROR))
}

fn canonicalize_path(path: &str, label: &str) -> napi::Result<PathBuf> {
std::fs::canonicalize(path)
.map_err(|e| napi::Error::from_reason(format!("Invalid {label} '{path}': {e}")))
Expand Down Expand Up @@ -1268,7 +1301,11 @@ impl Bash {
#[napi]
pub async fn execute(&self, commands: String) -> napi::Result<ExecResult> {
reject_on_output_reentry(&self.state)?;
if let Some(result) = validate_async_execute_input(&self.state, &commands) {
return Ok(result);
}
let s = self.state.clone();
let _slot = acquire_async_execute_slot(&s)?;
let mut bash = s.inner.lock().await;
execute_rust_bash(&mut bash, &commands, None, None, None, None).await
}
Expand All @@ -1290,6 +1327,10 @@ impl Bash {
raw_env,
async move {
reject_on_output_reentry(&state)?;
if let Some(result) = validate_async_execute_input(&state, &commands) {
return Ok(result);
}
let _slot = acquire_async_execute_slot(&state)?;
let mut bash = state.inner.lock().await;
let cancelled = bash.cancellation_token();
let callback_requested_cancel = Arc::new(AtomicBool::new(false));
Expand Down Expand Up @@ -1838,7 +1879,11 @@ impl BashTool {
#[napi]
pub async fn execute(&self, commands: String) -> napi::Result<ExecResult> {
reject_on_output_reentry(&self.state)?;
if let Some(result) = validate_async_execute_input(&self.state, &commands) {
return Ok(result);
}
let s = self.state.clone();
let _slot = acquire_async_execute_slot(&s)?;
let mut bash = s.inner.lock().await;
execute_rust_bash(&mut bash, &commands, None, None, None, None).await
}
Expand All @@ -1860,6 +1905,10 @@ impl BashTool {
raw_env,
async move {
reject_on_output_reentry(&state)?;
if let Some(result) = validate_async_execute_input(&state, &commands) {
return Ok(result);
}
let _slot = acquire_async_execute_slot(&state)?;
let mut bash = state.inner.lock().await;
let cancelled = bash.cancellation_token();
let callback_requested_cancel = Arc::new(AtomicBool::new(false));
Expand Down Expand Up @@ -2865,6 +2914,7 @@ fn shared_state_from_opts(
cancelled: std::sync::Mutex::new(Arc::new(AtomicBool::new(false))),
on_output_reentry_depth: Arc::new(AtomicUsize::new(0)),
in_sync_execute_depth: Arc::new(AtomicUsize::new(0)),
async_execute_semaphore: Arc::new(Semaphore::new(MAX_PENDING_ASYNC_EXECUTIONS)),
username: opts.username.clone(),
hostname: opts.hostname.clone(),
max_commands: opts.max_commands,
Expand Down Expand Up @@ -2913,6 +2963,7 @@ fn shared_state_from_opts(
cancelled: std::sync::Mutex::new(cancelled),
on_output_reentry_depth: tmp.on_output_reentry_depth,
in_sync_execute_depth: tmp.in_sync_execute_depth,
async_execute_semaphore: Arc::new(Semaphore::new(MAX_PENDING_ASYNC_EXECUTIONS)),
username: opts.username,
hostname: opts.hostname,
max_commands: opts.max_commands,
Expand Down
100 changes: 86 additions & 14 deletions crates/bashkit-js/wrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,13 @@ export interface BashOptions {
maxCommands?: number;
maxLoopIterations?: number;
maxTotalLoopIterations?: number;
/**
* Maximum script input size in UTF-8 bytes.
*
* Async execute validates this before entering the per-instance queue so
* oversized calls cannot wait while retaining large command strings.
*/
maxInputBytes?: number;
/**
* Maximum interpreter memory in bytes (variables, arrays, functions).
*
Expand Down Expand Up @@ -289,7 +296,15 @@ export interface ExecuteOptions {
type NativeOnOutput = (chunkPair: [string, string]) => string | undefined;
const ASYNC_ON_OUTPUT_ERROR =
"onOutput must be synchronous and must not return a Promise";
const asyncExecuteQueues = new WeakMap<object, Promise<void>>();
const DEFAULT_MAX_INPUT_BYTES = 10_000_000;
const MAX_PENDING_ASYNC_EXECUTIONS = 8;
const ASYNC_EXECUTE_QUEUE_FULL_ERROR =
"too many pending async execute calls for this instance";
interface AsyncExecuteQueueState {
tail: Promise<void>;
pending: number;
}
const asyncExecuteQueues = new WeakMap<object, AsyncExecuteQueueState>();

function isAsyncFunction(fn: Function): boolean {
return Object.prototype.toString.call(fn) === "[object AsyncFunction]";
Expand All @@ -303,37 +318,67 @@ function isPromiseLike(value: unknown): value is PromiseLike<unknown> {
);
}

function cancelledExecResult(): ExecResult {
function errorExecResult(error: string): ExecResult {
return {
stdout: "",
stderr: "",
stderr: error,
exitCode: 1,
error: "execution cancelled",
error,
stdoutTruncated: false,
stderrTruncated: false,
finalEnv: undefined,
success: false,
};
}

function cancelledExecResult(): ExecResult {
return errorExecResult("execution cancelled");
}

function inputTooLargeExecResult(
commands: string,
maxInputBytes: number,
): ExecResult | undefined {
const inputBytes = Buffer.byteLength(commands, "utf8");
if (inputBytes <= maxInputBytes) {
return undefined;
}
return errorExecResult(
`input too large: ${inputBytes} bytes exceeds maxInputBytes ${maxInputBytes}`,
);
}

// Decision: serialize async execute() per instance in JS so queued AbortSignal
// listeners only attach once a call reaches the front of the line.
// listeners only attach once a call reaches the front of the line. Also bound
// the backlog before retaining large command strings in queued closures.
function queueAsyncExecute<T>(
owner: object,
run: () => Promise<T>,
): Promise<T> {
const previous = asyncExecuteQueues.get(owner) ?? Promise.resolve();
let state = asyncExecuteQueues.get(owner);
if (!state) {
state = { tail: Promise.resolve(), pending: 0 };
asyncExecuteQueues.set(owner, state);
}
if (state.pending >= MAX_PENDING_ASYNC_EXECUTIONS) {
return Promise.reject(new Error(ASYNC_EXECUTE_QUEUE_FULL_ERROR));
}
state.pending += 1;
const previous = state.tail;
const completion = previous.then(
() => run(),
() => run(),
);
asyncExecuteQueues.set(
owner,
completion.then(
() => undefined,
() => undefined,
),
state.tail = completion.then(
() => undefined,
() => undefined,
);
state.tail.finally(() => {
state.pending -= 1;
if (state.pending === 0 && asyncExecuteQueues.get(owner) === state) {
asyncExecuteQueues.delete(owner);
}
});
return completion;
}

Expand Down Expand Up @@ -690,10 +735,12 @@ function registerCustomBuiltins(
*/
export class Bash {
private native: NativeBashType;
private maxInputBytes: number;

constructor(options?: BashOptions) {
const resolved = resolveFilesSync(options?.files);
this.native = new NativeBash(toNativeOptions(options, resolved));
this.maxInputBytes = options?.maxInputBytes ?? DEFAULT_MAX_INPUT_BYTES;
registerCustomBuiltins(this.native, options?.customBuiltins);
}
Comment on lines 740 to 745

Expand All @@ -715,6 +762,7 @@ export class Bash {
const resolved = await resolveFiles(options?.files);
const instance = Object.create(Bash.prototype) as Bash;
instance.native = new NativeBash(toNativeOptions(options, resolved));
instance.maxInputBytes = options?.maxInputBytes ?? DEFAULT_MAX_INPUT_BYTES;
registerCustomBuiltins(instance.native, options?.customBuiltins);
return instance;
}
Expand Down Expand Up @@ -793,6 +841,13 @@ export class Bash {
if (signal?.aborted) {
return cancelledExecResult();
}
const inputLimitResult = inputTooLargeExecResult(
commands,
this.maxInputBytes,
);
if (inputLimitResult) {
return inputLimitResult;
}
return queueAsyncExecute(this, async () => {
if (signal?.aborted) {
return cancelledExecResult();
Expand Down Expand Up @@ -1108,10 +1163,12 @@ export class Bash {
*/
export class BashTool {
private native: NativeBashToolType;
private maxInputBytes: number;

constructor(options?: BashOptions) {
const resolved = resolveFilesSync(options?.files);
this.native = new NativeBashTool(toNativeOptions(options, resolved));
this.maxInputBytes = options?.maxInputBytes ?? DEFAULT_MAX_INPUT_BYTES;
registerCustomBuiltins(this.native, options?.customBuiltins);
}
Comment on lines 1168 to 1173

Expand All @@ -1122,6 +1179,7 @@ export class BashTool {
const resolved = await resolveFiles(options?.files);
const instance = Object.create(BashTool.prototype) as BashTool;
instance.native = new NativeBashTool(toNativeOptions(options, resolved));
instance.maxInputBytes = options?.maxInputBytes ?? DEFAULT_MAX_INPUT_BYTES;
registerCustomBuiltins(instance.native, options?.customBuiltins);
return instance;
}
Expand Down Expand Up @@ -1180,6 +1238,13 @@ export class BashTool {
if (signal?.aborted) {
return cancelledExecResult();
}
const inputLimitResult = inputTooLargeExecResult(
commands,
this.maxInputBytes,
);
if (inputLimitResult) {
return inputLimitResult;
}
return queueAsyncExecute(this, async () => {
if (signal?.aborted) {
return cancelledExecResult();
Expand Down Expand Up @@ -1642,7 +1707,14 @@ export class ScriptedTool {
* tool callbacks require the Node.js event loop to be running.
*/
async execute(commands: string): Promise<ExecResult> {
return this.native.execute(commands);
const inputLimitResult = inputTooLargeExecResult(
commands,
DEFAULT_MAX_INPUT_BYTES,
);
if (inputLimitResult) {
return inputLimitResult;
}
return queueAsyncExecute(this, () => this.native.execute(commands));
}

/**
Expand All @@ -1662,7 +1734,7 @@ export class ScriptedTool {
* Execute asynchronously. Throws `BashError` on non-zero exit.
*/
async executeOrThrow(commands: string): Promise<ExecResult> {
const result = await this.native.execute(commands);
const result = await this.execute(commands);
if (result.exitCode !== 0) {
throw new BashError(result);
}
Expand Down
Loading