Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
3763370
enable CometLocalTableScanExec by default
mbutrovich May 21, 2026
6830b09
Merge branch 'main' into enable_localtablescan
mbutrovich May 22, 2026
810e5d5
add NullType to toArrowType
mbutrovich May 22, 2026
174c939
add NullType to shuffles
mbutrovich May 22, 2026
3790c10
fix windowexec test and nulltype. fix timetype issues
mbutrovich May 22, 2026
18cd14b
Fix TimeType test.
mbutrovich May 22, 2026
fc40d59
fix null value type in map in native shuffle
mbutrovich May 22, 2026
92cc260
Merge branch 'main' into enable_localtablescan
mbutrovich May 22, 2026
cf0c1df
Merge branch 'main' into enable_localtablescan
mbutrovich May 26, 2026
8c088a7
avoid reuse in LocalTableScanExec
mbutrovich May 26, 2026
0cdfafe
Merge branch 'main' into enable_localtablescan
mbutrovich May 26, 2026
bd04fb4
Replace Comet's bespoke CometBatchIterator JNI input path with the ca…
mbutrovich May 27, 2026
2742b49
Merge branch 'main' into enable_localtablescan
mbutrovich May 27, 2026
04597c0
Merge branch 'main' into enable_localtablescan
mbutrovich May 27, 2026
b6db996
Unpack dictionaries.
mbutrovich May 27, 2026
5ca923f
Merge branch 'main' into enable_localtablescan
mbutrovich May 27, 2026
cf7bb6e
Fix shading issue.
mbutrovich May 27, 2026
2560e6f
Merge remote-tracking branch 'origin/enable_localtablescan' into enab…
mbutrovich May 27, 2026
82c9a1b
Try again to fix shading issue.
mbutrovich May 27, 2026
6adf124
Fix alignment issue for FFI Decimal128 with ArrowArrayStreamReader
mbutrovich May 27, 2026
3da08dc
Merge branch 'refs/heads/main' into enable_localtablescan
mbutrovich May 28, 2026
0e08018
Fix schema mismatch in CometArrowStream.
mbutrovich May 28, 2026
a5046e3
Fix nullability mismatch in CometArrowStreamSuite.
mbutrovich May 28, 2026
8f1c35a
Fix format.
mbutrovich May 28, 2026
5c41215
Passes CometFuzzTestSuite, CometNativeShuffleSuite, CometExecSuite.
mbutrovich May 29, 2026
8c99fc5
Passes CometFuzzTestSuite, CometNativeShuffleSuite, CometExecSuite.
mbutrovich May 29, 2026
443a1c7
Cleanup, update docs.
mbutrovich May 29, 2026
07e7944
remove non-ascii
mbutrovich May 29, 2026
cc7c5be
handle arrow type mismatches on child stream in native shuffle.
mbutrovich May 29, 2026
c76a263
stash
mbutrovich May 29, 2026
1b55b97
refactor to handle on JVM side.
mbutrovich May 29, 2026
d736dd5
remove instrumentation.
mbutrovich May 29, 2026
214a75b
cleanup.
mbutrovich May 29, 2026
e8e438f
cleanup.
mbutrovich May 29, 2026
29d9e19
Merge branch 'main' into opt_native_shuffle
mbutrovich May 29, 2026
cdbb0e6
Merge branch 'main' into opt_native_shuffle
mbutrovich May 29, 2026
1913208
fix aggregation wrapping now that we don't have an extra CometExecIte…
mbutrovich May 29, 2026
327a653
Remove archeology comments.
mbutrovich May 29, 2026
0b71de4
Undo stricter tests since they're not happy on Spark 3.x.
mbutrovich May 29, 2026
a6744eb
Merge branch 'main' into opt_native_shuffle
mbutrovich May 29, 2026
1ecfd8a
Remove unintended change.
mbutrovich May 29, 2026
51d4d42
Merge remote-tracking branch 'apache/main' into opt_native_shuffle
mbutrovich May 30, 2026
4dba7ea
Merge branch 'main' into opt_native_shuffle
mbutrovich Jun 2, 2026
14633f7
Merge branch 'main' into enable_localtablescan
mbutrovich Jun 2, 2026
6afbdce
add CometArrowStreamSuite to CI workflows
mbutrovich Jun 2, 2026
deb697a
fix withInfo use
mbutrovich Jun 2, 2026
51a3a0d
Merge branch 'main' into enable_localtablescan
mbutrovich Jun 2, 2026
b43b5da
Merge branch 'main' into opt_native_shuffle
mbutrovich Jun 2, 2026
f7fd3bd
Merge branch 'main' into enable_localtablescan
mbutrovich Jun 2, 2026
bfbae18
Merge branch 'main' into opt_native_shuffle
mbutrovich Jun 2, 2026
1f5b757
mbutrovich Jun 2, 2026
6f913f9
Don't enable LocalTableScan by default, cruft from #4393.
mbutrovich Jun 2, 2026
d162399
cleanup
mbutrovich Jun 2, 2026
9f32157
cleanup
mbutrovich Jun 2, 2026
bf63e3d
cleanup
mbutrovich Jun 2, 2026
04c0825
Address PR feedback from #4507.
mbutrovich Jun 2, 2026
6aa6621
Remove inadvertent test change brought over from #4393.
mbutrovich Jun 2, 2026
c5d7f3b
Merge branch 'main' into arrow-stream-reader
mbutrovich Jun 2, 2026
95ae067
Merge branch 'main' into arrow-stream-reader
mbutrovich Jun 3, 2026
249a5e9
Merge branch 'main' into arrow-stream-reader
mbutrovich Jun 4, 2026
b4cb826
Fix batch size calculation in native shuffle writer, and task metrics…
mbutrovich Jun 4, 2026
e1a9491
Move SchemaAlignExec under shuffle.
mbutrovich Jun 5, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/pr_build_linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ jobs:
org.apache.spark.sql.CometToPrettyStringSuite
org.apache.spark.sql.CometCollationSuite
org.apache.comet.CometFuzzAggregateSuite
org.apache.spark.sql.comet.execution.arrow.CometArrowStreamSuite
- name: "expressions"
value: |
org.apache.comet.CometExpressionSuite
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/pr_build_macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ jobs:
org.apache.spark.sql.CometToPrettyStringSuite
org.apache.spark.sql.CometCollationSuite
org.apache.comet.CometFuzzAggregateSuite
org.apache.spark.sql.comet.execution.arrow.CometArrowStreamSuite
- name: "expressions"
value: |
org.apache.comet.CometExpressionSuite
Expand Down
38 changes: 24 additions & 14 deletions docs/source/contributor-guide/native_shuffle.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,9 @@ Native shuffle (`CometExchange`) is selected when all of the following condition
┌─────────────────────────────────────────────────────────────────────────────┐
│ CometNativeShuffleWriter │
│ - Constructs protobuf operator plan │
│ - Invokes native execution via CometExec.getCometIterator() │
│ - Builds protobuf operator plan: ShuffleWriter(child = childNativeOp) │
│ - Reads per-partition leaf iterators from CometNativeShuffleInputIterator │
│ - Drives one CometExecIterator per partition │
└─────────────────────────────────────────────────────────────────────────────┘
▼ (JNI)
Expand Down Expand Up @@ -103,13 +104,14 @@ Native shuffle (`CometExchange`) is selected when all of the following condition

### Scala Side

| Class | Location | Description |
| ------------------------------ | ------------------------------------------------ | --------------------------------------------------------------------------------------------- |
| `CometShuffleExchangeExec` | `.../shuffle/CometShuffleExchangeExec.scala` | Physical plan node. Validates types and partitioning, creates `CometShuffleDependency`. |
| `CometNativeShuffleWriter` | `.../shuffle/CometNativeShuffleWriter.scala` | Implements `ShuffleWriter`. Builds protobuf plan and invokes native execution. |
| `CometShuffleDependency` | `.../shuffle/CometShuffleDependency.scala` | Extends `ShuffleDependency`. Holds shuffle type, schema, and range partition bounds. |
| `CometBlockStoreShuffleReader` | `.../shuffle/CometBlockStoreShuffleReader.scala` | Reads shuffle blocks via `ShuffleBlockFetcherIterator`. Decodes Arrow IPC to `ColumnarBatch`. |
| `NativeBatchDecoderIterator` | `.../shuffle/NativeBatchDecoderIterator.scala` | Reads compressed Arrow IPC from input stream. Calls native decode via JNI. |
| Class | Location | Description |
| ------------------------------ | ------------------------------------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------- |
| `CometShuffleExchangeExec` | `.../shuffle/CometShuffleExchangeExec.scala` | Physical plan node. Validates types and partitioning, creates `CometShuffleDependency`. |
| `CometNativeShuffleWriter` | `.../shuffle/CometNativeShuffleWriter.scala` | Implements `ShuffleWriter`. Builds the unified `ShuffleWriter(child = childNativeOp)` plan and runs it in one `CometExecIterator` per partition. |
| `CometShuffleDependency` | `.../shuffle/CometShuffleDependency.scala` | Extends `ShuffleDependency`. Holds shuffle type, schema, range partition bounds, and (native shuffle only) a `NativeShuffleSpec`. |
| `CometNativeShuffleInputRDD` | `.../shuffle/CometNativeShuffleInputRDD.scala` | Thin scheduling-anchor RDD on the native-shuffle path. `compute` returns a `CometNativeShuffleInputIterator` carrying per-partition leaf iterators. |
| `CometBlockStoreShuffleReader` | `.../shuffle/CometBlockStoreShuffleReader.scala` | Reads shuffle blocks via `ShuffleBlockFetcherIterator`. Decodes Arrow IPC to `ColumnarBatch`. |
| `NativeBatchDecoderIterator` | `.../shuffle/NativeBatchDecoderIterator.scala` | Reads compressed Arrow IPC from input stream. Calls native decode via JNI. |

### Rust Side

Expand All @@ -123,11 +125,19 @@ Native shuffle (`CometExchange`) is selected when all of the following condition

### Write Path

1. **Plan construction**: `CometNativeShuffleWriter` builds a protobuf operator plan containing:
- A scan operator reading from the input iterator
- A `ShuffleWriter` operator with partitioning config and compression codec

2. **Native execution**: `CometExec.getCometIterator()` executes the plan in Rust.
1. **Plan construction**: `CometNativeShuffleWriter` builds a protobuf operator tree with a
`ShuffleWriter` operator at the root and `childNativeOp` as its child. `childNativeOp` takes
one of two shapes:
- The child plan's `nativeOp` directly, when `CometShuffleExchangeExec`'s child is a
`CometNativeExec` subtree. The upstream operators run inside the same `CometExecIterator`
as the writer, with no JVM-to-native batch boundary between them.
- A synthetic `Scan("ShuffleWriterInput")` placeholder, when the dep was built via the
convenience `prepareShuffleDependency(rdd, ...)` overload (used by
`CometCollectLimitExec` and `CometTakeOrderedAndProjectExec`), or when the
exchange's child is a non-native `CometPlan` such as `CometSparkToColumnarExec`. Native
code reads `ColumnarBatch`es from the JVM input iterator via the Arrow C Stream Interface.

2. **Native execution**: A single `CometExecIterator` per partition runs the unified plan.

3. **Partitioning**: `ShuffleWriterExec` receives batches and routes to the appropriate partitioner:
- `MultiPartitionShuffleRepartitioner`: For hash/range/round-robin partitioning
Expand Down
110 changes: 110 additions & 0 deletions native/core/src/execution/operators/aligned_stream_reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::{RecordBatch, RecordBatchOptions, StructArray};
use arrow::datatypes::{DataType, Schema, SchemaRef};
use arrow::error::ArrowError;
use arrow::ffi::{from_ffi_and_data_type, FFI_ArrowArray, FFI_ArrowSchema};
use arrow::ffi_stream::FFI_ArrowArrayStream;
use std::ffi::CStr;
use std::sync::Arc;

/// Arrow C Stream Interface reader that calls [`arrow::array::ArrayData::align_buffers`] on
/// every imported batch before constructing typed arrays.
///
/// Stock `ArrowArrayStreamReader` panics when a JVM producer hands us a `Decimal128` buffer
/// at an offset that is 8-byte but not 16-byte aligned, which Java's allocator does not
/// guarantee. Tracked upstream: <https://github.com/apache/arrow-rs/issues/10028>.
#[derive(Debug)]
pub struct AlignedArrowStreamReader {
/// Imported C stream that batches are pulled from.
stream: FFI_ArrowArrayStream,
/// Schema read from `stream` once at construction and attached to every produced batch.
schema: SchemaRef,
}

impl AlignedArrowStreamReader {
    /// Takes over the stream behind `raw` and eagerly reads its schema.
    ///
    /// # Errors
    /// Returns [`ArrowError::CDataInterface`] if `raw` is null or the stream has already been
    /// released, and propagates any error raised while reading the stream's schema.
    ///
    /// # Safety
    /// A non-null `raw` must point at a valid `FFI_ArrowArrayStream` whose ownership is being
    /// transferred to this reader. The stream's release callback fires when the reader is
    /// dropped.
    pub unsafe fn from_raw(raw: *mut FFI_ArrowArrayStream) -> Result<Self, ArrowError> {
        // A null pointer cannot satisfy the safety contract; report it instead of
        // dereferencing it.
        if raw.is_null() {
            return Err(ArrowError::CDataInterface(
                "input stream pointer is null".to_string(),
            ));
        }
        // SAFETY: `raw` is non-null (checked above) and the caller guarantees it points at a
        // valid stream whose ownership is handed to us.
        let mut stream = FFI_ArrowArrayStream::from_raw(raw);
        if stream.release.is_none() {
            return Err(ArrowError::CDataInterface(
                "input stream is already released".to_string(),
            ));
        }
        let schema = read_schema(&mut stream)?;
        Ok(Self { stream, schema })
    }

    /// Returns a shared handle to the schema that was read when the reader was created.
    pub fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    /// Returns the producer's description of its most recent error, or `None` when the
    /// stream has no `get_last_error` callback or the callback reports no message.
    fn last_error(&mut self) -> Option<String> {
        let get_last_error = self.stream.get_last_error?;
        // SAFETY: `self.stream` is the live stream this reader owns, and the callback was
        // supplied by that same stream.
        let message = unsafe { get_last_error(&mut self.stream) };
        if message.is_null() {
            return None;
        }
        // SAFETY: `message` is non-null. The Arrow C stream interface specifies it as a
        // NUL-terminated string that stays valid until the next callback invocation, and it
        // is copied into an owned `String` before any further call is made.
        let text = unsafe { CStr::from_ptr(message) };
        Some(text.to_string_lossy().into_owned())
    }
}

impl Iterator for AlignedArrowStreamReader {
type Item = Result<RecordBatch, ArrowError>;

fn next(&mut self) -> Option<Self::Item> {
let mut array = FFI_ArrowArray::empty();
let ret = unsafe { self.stream.get_next.unwrap()(&mut self.stream, &mut array) };
if ret != 0 {
let msg = self
.last_error()
.unwrap_or_else(|| format!("get_next returned {ret}"));
return Some(Err(ArrowError::CDataInterface(msg)));
}
if array.is_released() {
return None;
}

let dt = DataType::Struct(self.schema.fields().clone());
Some(
unsafe { from_ffi_and_data_type(array, dt) }.and_then(|mut data| {
data.align_buffers();
let len = data.len();
RecordBatch::try_new_with_options(
Arc::clone(&self.schema),
StructArray::from(data).into_parts().1,
&RecordBatchOptions::new().with_row_count(Some(len)),
)
}),
)
}
}

/// Asks the stream for its schema and converts it into an Arrow [`SchemaRef`].
///
/// # Errors
/// Returns [`ArrowError::CDataInterface`] when the stream has no `get_schema` callback or
/// the callback returns a non-zero code, and propagates any failure converting the exported
/// C schema into a [`Schema`].
fn read_schema(stream: &mut FFI_ArrowArrayStream) -> Result<SchemaRef, ArrowError> {
    // The callback is an optional function pointer supplied by the foreign producer;
    // report a missing one as an error instead of panicking.
    let get_schema = stream.get_schema.ok_or_else(|| {
        ArrowError::CDataInterface("input stream has no get_schema callback".to_string())
    })?;

    let mut schema = FFI_ArrowSchema::empty();
    // SAFETY: `stream` is an exclusive reference to a stream object, and `schema` is a
    // valid, empty out-parameter for the producer to fill.
    let ret = unsafe { get_schema(stream, &mut schema) };
    if ret != 0 {
        return Err(ArrowError::CDataInterface(format!(
            "Cannot get schema from input stream. Error code: {ret}"
        )));
    }
    Ok(Arc::new(Schema::try_from(&schema)?))
}
2 changes: 2 additions & 0 deletions native/core/src/execution/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

pub use crate::errors::ExecutionError;

pub use aligned_stream_reader::*;
pub use copy::*;
pub use iceberg_scan::*;
pub use scan::*;

mod aligned_stream_reader;
mod copy;
mod expand;
pub use expand::ExpandExec;
Expand Down
Loading
Loading