Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions crates/bashkit-js/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,12 @@ Use `reset()` only when you want to discard state entirely.
For synchronous execution, `executeSync(...)` and `executeSyncOrThrow(...)`
also accept `{ signal }`.

Async `execute(...)` calls are serialized per instance and keep a bounded
backlog. If too many executions are already pending on one `Bash`, `BashTool`,
or `ScriptedTool`, the next async call rejects instead of retaining another
queued command string. Commands larger than `maxInputBytes` are rejected before
entering the async queue.
Comment thread
chaliy marked this conversation as resolved.

## BashTool

`BashTool` wraps the interpreter with tool-contract metadata for agent frameworks:
Expand Down Expand Up @@ -565,6 +571,7 @@ import {
- `maxCommands?: number`
- `maxLoopIterations?: number`
- `maxMemory?: number`
- `maxInputBytes?: number`
- `timeoutMs?: number`
- `files?: Record<string, string | (() => string) | (() => Promise<string>)>`
- `mounts?: Array<{ path: string; root: string; writable?: boolean }>`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,34 @@ for (const [label, create] of [
assert.equal(result.stdout, "");
});

it("rejects async execute when the per-instance queue is full", async () => {
const shell = create();
const blocker = shell.execute("sleep 0.05");
const accepted = Array.from({ length: 7 }, (_, i) =>
shell.execute(`echo queued-${i}`),
);

await assert.rejects(
() => shell.execute("echo should-not-queue"),
/too many pending async execute calls/,
);

assert.equal((await blocker).exitCode, 0);
for (const pending of accepted) {
assert.equal((await pending).exitCode, 0);
}
});

it("validates maxInputBytes before queueing async execute", async () => {
const shell = create({ maxInputBytes: 4 });

const result = await shell.execute("echo too-large");

assert.equal(result.exitCode, 1);
assert.match(result.error, /input too large/);
assert.equal(result.stdout, "");
});

it("async rejects Promise-returning onOutput", async () => {
const shell = create();

Expand Down
53 changes: 52 additions & 1 deletion crates/bashkit-js/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use std::pin::Pin;
use std::ptr;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex as StdMutex};
use tokio::sync::Mutex;
use tokio::sync::{Mutex, OwnedSemaphorePermit, Semaphore};

// ---------------------------------------------------------------------------
// Shared tokio runtime + concurrency limiter for JS tool callbacks (issue #982).
Expand All @@ -46,6 +46,12 @@ use tokio::sync::Mutex;
// maximum number of concurrent in-flight callbacks to prevent DoS.
// ---------------------------------------------------------------------------
const MAX_CONCURRENT_TOOL_CALLBACKS: usize = 10;
// Decision: JS async execute() is per-instance serialized by RustBash, so only
// a small bounded backlog is useful. Bound it before awaiting the mutex so
// pending futures cannot retain unbounded command strings.
const MAX_PENDING_ASYNC_EXECUTIONS: usize = 8;
const ASYNC_EXECUTE_QUEUE_FULL_ERROR: &str =
"too many pending async execute calls for this instance";

fn callback_runtime() -> &'static tokio::runtime::Runtime {
use std::sync::OnceLock;
Expand Down Expand Up @@ -1388,6 +1394,7 @@ struct SharedState {
/// function check this and fail closed instead of deadlocking on a
/// callback the loop can never service. See [`SYNC_BUILTIN_DEADLOCK_ERROR`].
in_sync_execute_depth: Arc<AtomicUsize>,
async_execute_semaphore: Arc<Semaphore>,
username: Option<String>,
hostname: Option<String>,
max_commands: Option<u32>,
Expand Down Expand Up @@ -1446,6 +1453,32 @@ where
rt_guard.block_on(f(s2))
}

fn max_input_bytes_for_state(state: &SharedState) -> usize {
state.max_input_bytes.map_or_else(
|| ExecutionLimits::default().max_input_bytes,
|v| v as usize,
)
}

fn validate_async_execute_input(state: &SharedState, commands: &str) -> Option<ExecResult> {
let max_input_bytes = max_input_bytes_for_state(state);
let input_len = commands.len();
if input_len > max_input_bytes {
return Some(js_exec_result_from_error(format!(
"input too large: {input_len} bytes exceeds maxInputBytes {max_input_bytes}"
)));
}
None
}

fn acquire_async_execute_slot(state: &Arc<SharedState>) -> napi::Result<OwnedSemaphorePermit> {
state
.async_execute_semaphore
.clone()
.try_acquire_owned()
.map_err(|_| napi::Error::from_reason(ASYNC_EXECUTE_QUEUE_FULL_ERROR))
}

fn canonicalize_path(path: &str, label: &str) -> napi::Result<PathBuf> {
std::fs::canonicalize(path)
.map_err(|e| napi::Error::from_reason(format!("Invalid {label} '{path}': {e}")))
Expand Down Expand Up @@ -1559,7 +1592,11 @@ impl Bash {
#[napi]
pub async fn execute(&self, commands: String) -> napi::Result<ExecResult> {
reject_on_output_reentry(&self.state)?;
if let Some(result) = validate_async_execute_input(&self.state, &commands) {
return Ok(result);
}
let s = self.state.clone();
let _slot = acquire_async_execute_slot(&s)?;
let mut bash = s.inner.lock().await;
execute_rust_bash(&mut bash, &commands, None, None, None, None).await
}
Expand All @@ -1581,6 +1618,10 @@ impl Bash {
raw_env,
async move {
reject_on_output_reentry(&state)?;
if let Some(result) = validate_async_execute_input(&state, &commands) {
return Ok(result);
}
let _slot = acquire_async_execute_slot(&state)?;
let mut bash = state.inner.lock().await;
let cancelled = bash.cancellation_token();
let callback_requested_cancel = Arc::new(AtomicBool::new(false));
Expand Down Expand Up @@ -2144,7 +2185,11 @@ impl BashTool {
#[napi]
pub async fn execute(&self, commands: String) -> napi::Result<ExecResult> {
reject_on_output_reentry(&self.state)?;
if let Some(result) = validate_async_execute_input(&self.state, &commands) {
return Ok(result);
}
let s = self.state.clone();
let _slot = acquire_async_execute_slot(&s)?;
let mut bash = s.inner.lock().await;
execute_rust_bash(&mut bash, &commands, None, None, None, None).await
}
Expand All @@ -2166,6 +2211,10 @@ impl BashTool {
raw_env,
async move {
reject_on_output_reentry(&state)?;
if let Some(result) = validate_async_execute_input(&state, &commands) {
return Ok(result);
}
let _slot = acquire_async_execute_slot(&state)?;
let mut bash = state.inner.lock().await;
let cancelled = bash.cancellation_token();
let callback_requested_cancel = Arc::new(AtomicBool::new(false));
Expand Down Expand Up @@ -3226,6 +3275,7 @@ fn shared_state_from_opts(
cancelled: std::sync::Mutex::new(Arc::new(AtomicBool::new(false))),
on_output_reentry_depth: Arc::new(AtomicUsize::new(0)),
in_sync_execute_depth: Arc::new(AtomicUsize::new(0)),
async_execute_semaphore: Arc::new(Semaphore::new(MAX_PENDING_ASYNC_EXECUTIONS)),
username: opts.username.clone(),
hostname: opts.hostname.clone(),
max_commands: opts.max_commands,
Expand Down Expand Up @@ -3275,6 +3325,7 @@ fn shared_state_from_opts(
cancelled: std::sync::Mutex::new(cancelled),
on_output_reentry_depth: tmp.on_output_reentry_depth,
in_sync_execute_depth: tmp.in_sync_execute_depth,
async_execute_semaphore: Arc::new(Semaphore::new(MAX_PENDING_ASYNC_EXECUTIONS)),
username: opts.username,
hostname: opts.hostname,
max_commands: opts.max_commands,
Expand Down
Loading
Loading