
perf: avoid FFI import/export when passing batches between native plans #3930

Open
andygrove wants to merge 20 commits into apache:main from andygrove:batch-stash-shuffle-optimization

Conversation


@andygrove andygrove commented Apr 12, 2026

Which issue does this PR close?

Closes #3925.

Rationale for this change

When Comet has a native ShuffleWriter with a native child plan, batches are created in native code, exported to the JVM via Arrow FFI, then imported back into native code for the shuffle writer. The JVM never reads the data, so the FFI round-trip is pure overhead.

This PR improves performance and reduces JVM memory allocations. The impact may be larger for more complex schemas or larger benchmarks, but there is already a clear win in a local TPC-H @ SF100 comparison against the main branch.

[Image: tpch_queries_compare — per-query TPC-H SF100 timings vs. the main branch]

What changes are included in this PR?

Introduces a "batch stash" optimization that passes batches as opaque u64 handles through the JVM instead of doing full Arrow FFI export/import. This reduces the per-batch overhead from 4 FFI boundary crossings to 2 lightweight JNI calls passing a single long.

New components:

  • BatchStash (native/core/src/execution/batch_stash.rs) -- global Mutex<HashMap<u64, RecordBatch>> registry. The child plan stashes its output batch, and the shuffle writer's ScanExec retrieves it by handle.
  • CometHandleBatchIterator (Java + Rust JNI bridge) -- passes handles between the two native execution contexts through the JVM.
  • executePlanBatchHandle JNI function -- like executePlan but stashes the output batch instead of FFI-exporting it. Shared execution logic is extracted into execute_plan_impl with an OutputMode enum to avoid duplication.
  • CometShuffleWriterInputIterator -- preserves the CometExecIterator reference through Spark's shuffle dependency RDD so the shuffle writer can detect native input.

Modified components:

  • CometExecIterator -- gains stash mode (enableStashMode(), nextHandle()) for producing handles instead of ColumnarBatch.
  • ScanExec -- gains handle_mode flag. When true, retrieves batches from the BatchStash instead of via Arrow FFI import.
  • Scan protobuf message -- new bool batch_stash_handle field signals handle mode to the native planner.
  • CometNativeShuffleWriter -- detects native child plans automatically and enables the stash path.

Detection is automatic -- no configuration needed. When a CometExecIterator feeds into a native shuffle writer, the optimization activates. Non-native child plans fall back to the existing FFI path.

How are these changes tested?

  • Existing CometNativeShuffleSuite (22 tests) passes -- validates correctness of the stash path since all native shuffle queries now use it.
  • Existing CometShuffleSuite passes -- validates the fallback FFI path for columnar shuffle.
  • New Rust unit tests for BatchStash (stash/take semantics, handle uniqueness, cleanup).
  • Clippy clean, formatted.

Design for avoiding unnecessary Arrow FFI import/export when passing batches between two native plans (issue apache#3925). Uses a native-side batch registry to pass opaque handles through the JVM instead of full Arrow FFI round-trips.

  • Extract shared execution logic into execute_plan_impl with an OutputMode enum
  • Replace stringly-typed handle mode detection with a HANDLE_SCAN_SOURCE constant
  • Remove the no-op catch/throw in CometExecIterator.nextHandle()
  • Remove the unnecessary #![allow(dead_code)] from the batch_stash module
  • Remove the unnecessary @volatile from the stashMode field
arrow_ffi_safe: bool,
/// When true, input comes from a CometHandleBatchIterator and batches are
/// retrieved from the BatchStash instead of via Arrow FFI import.
pub handle_mode: bool,

@andygrove (author) commented:

We should probably combine arrow_ffi_safe and handle_mode into an enum, but I would prefer to do that as a follow-on PR. Something like:

enum TransferMode {
  FfiSafe,
  FfiUnsafe,
  Handle,
}
@andygrove andygrove changed the title feat: avoid FFI import/export when passing batches between native plans perf: avoid FFI import/export when passing batches between native plans Apr 12, 2026
…conciliation

Stashed RecordBatches are already fully formed by the child plan.
Decomposing them into columns and rebuilding via build_record_batch
caused assertion failures when the ScanExec schema (from protobuf
data_types) didn't exactly match the batch schema.
…ches

Stashed batches may have type differences from the ScanExec schema
(e.g., Timestamp without timezone vs with timezone). When column
counts match, delegate to build_record_batch for casting. When they
don't match, return the batch as-is.
@andygrove andygrove marked this pull request as ready for review April 12, 2026 19:19
@andygrove andygrove requested a review from wForget April 12, 2026 19:21
.intConf
.createWithDefault(1)

val COMET_SHUFFLE_BATCH_STASH_ENABLED: ConfigEntry[Boolean] =

@andygrove (author) commented:

Added this config for now in case we discover bugs, but I plan to remove it in the future.

static NEXT_HANDLE: AtomicU64 = AtomicU64::new(1);

/// Global stash mapping handles to RecordBatch values.
static STASH: Lazy<Mutex<HashMap<u64, RecordBatch>>> = Lazy::new(|| Mutex::new(HashMap::new()));

A reviewer commented:

Must this be a global? Is there any leak risk? Do we need some cleanup to remove stale entries?

let handle = NEXT_HANDLE.fetch_add(1, Ordering::Relaxed);
STASH
.lock()
.expect("batch_stash lock poisoned")

A reviewer commented:

Should this return an error instead of panicking? e.g., lock().unwrap_or_else(|e| e.into_inner()).

if columns.len() == self.schema.fields().len() {
// Column counts match. Use build_record_batch to handle any
// type differences (e.g., timestamp timezone casting).
let maybe_batch = self.build_record_batch(columns, num_rows);

A reviewer commented:

If the batch is already complete, why does it need build_record_batch?

/// A complete RecordBatch retrieved from the BatchStash. Bypasses
/// `build_record_batch` since the batch is already fully formed.
Complete(RecordBatch),


A reviewer commented:

And in shuffle_scan.rs, the Complete batch is returned directly. That looks inconsistent.

} else {
// Column count mismatch (e.g., empty schema scan).
// Return the stashed batch as-is since it's already valid.
Poll::Ready(Some(Ok(batch.clone())))

A reviewer commented:

It might be semantically clearer to take the batch here instead of cloning it?


@viirya left a comment:

A high-quality performance-improvement PR. Only a few comments.

Thanks @andygrove
