From ae3983b0dcff8121f3ea2723f3b0af4eeac99513 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 27 May 2026 15:29:35 -0600 Subject: [PATCH] feat: custom Rust UDFs via arrow-ffi (alternative to #4283) [skip ci] Adds custom Rust scalar UDF support using arrow-only FFI surfaces, as suggested by paleolimbot and timsaucer in feedback on #4283. This is an alternative implementation for comparison; see #4283 for the bespoke-ABI version. Two ABI flavors are provided side-by-side so reviewers can compare: 1. C ABI (sedona-style): pure C-callable struct of function pointers, parameterized only by Arrow C Data Interface (FFI_ArrowSchema / FFI_ArrowArray). Decoupled from datafusion versions; future-portable to C/C++. Modeled on apache/sedona-db's SedonaCScalarKernel header. 2. datafusion-ffi (FFI_ScalarUDF): wraps user's ScalarUDFImpl as FFI_ScalarUDF. Inherits full ScalarUDFImpl surface (variadic signatures, type coercion, metadata-aware return types) for free, at the cost of a major-version pin against datafusion-ffi. A single library may export either or both; loader walks both discovery functions. `comet-test-udfs` exposes `add_one_c` (C ABI) and `add_one_df` (datafusion-ffi) and the e2e suite drives both through Spark. Scope is intentionally minimal for comparison: scalar-only, happy-path e2e tests only (no panic/error/signature-mismatch coverage). The Scala JVM API mirrors #4283 exactly so the comparison is apples-to-apples. Pieces: - native/comet-udf-sdk: SDK with both ABI flavors + export macros - native/comet-test-udfs: cdylib exposing one UDF per ABI - native/core/src/execution/rust_udf: loader, cache, ImportedCScalarUdf - native/core/src/comet_rust_udf_bridge.rs: JNI for validateLibrary/listUdfs - native/proto: RustUdfCall message - spark/.../udf: CometRustUDF.register, registry, exception classes, JNI bridge stub - spark/.../serde/CometScalaUDF: dispatch ScalaUDF to RustUdfCall when udfName is in the registry --- native/Cargo.lock | 243 +++++- native/Cargo.toml | 5 +- native/comet-test-udfs/Cargo.toml | 36 + native/comet-test-udfs/src/lib.rs | 142 ++++ native/comet-udf-sdk/Cargo.toml | 41 + native/comet-udf-sdk/src/c_abi.rs | 707 ++++++++++++++++++ native/comet-udf-sdk/src/df_abi.rs | 239 ++++++ native/comet-udf-sdk/src/lib.rs | 71 ++ native/core/Cargo.toml | 4 + native/core/build.rs | 43 ++ native/core/src/comet_rust_udf_bridge.rs | 97 +++ native/core/src/execution/mod.rs | 1 + native/core/src/execution/planner.rs | 41 + native/core/src/execution/rust_udf/cache.rs | 87 +++ .../core/src/execution/rust_udf/imported_c.rs | 312 ++++++++ native/core/src/execution/rust_udf/loader.rs | 350 +++++++++ native/core/src/execution/rust_udf/mod.rs | 43 ++ .../src/execution/rust_udf/test_support.rs | 24 + native/core/src/lib.rs | 1 + native/proto/src/proto/expr.proto | 20 + pom.xml | 2 + .../apache/comet/udf/CometRustUdfBridge.java | 41 + .../apache/comet/serde/CometScalaUDF.scala | 39 +- .../org/apache/comet/udf/CometRustUDF.scala | 140 ++++ .../comet/udf/CometRustUdfExceptions.scala | 51 ++ .../comet/udf/CometRustUdfRegistry.scala | 59 ++ .../org/apache/comet/CometRustUdfSuite.scala | 66 ++ 27 files changed, 2899 insertions(+), 6 deletions(-) create mode 100644 native/comet-test-udfs/Cargo.toml create mode 100644 native/comet-test-udfs/src/lib.rs create mode 100644 native/comet-udf-sdk/Cargo.toml create mode 100644 native/comet-udf-sdk/src/c_abi.rs create mode 100644 native/comet-udf-sdk/src/df_abi.rs create mode 100644 native/comet-udf-sdk/src/lib.rs create mode 100644 native/core/build.rs create mode 100644 native/core/src/comet_rust_udf_bridge.rs create mode 100644 native/core/src/execution/rust_udf/cache.rs create mode 100644 native/core/src/execution/rust_udf/imported_c.rs create mode 100644 native/core/src/execution/rust_udf/loader.rs create mode 100644 native/core/src/execution/rust_udf/mod.rs create mode 100644 native/core/src/execution/rust_udf/test_support.rs create mode 100644 spark/src/main/java/org/apache/comet/udf/CometRustUdfBridge.java create mode 100644 spark/src/main/scala/org/apache/comet/udf/CometRustUDF.scala create mode 100644 spark/src/main/scala/org/apache/comet/udf/CometRustUdfExceptions.scala create mode 100644 spark/src/main/scala/org/apache/comet/udf/CometRustUdfRegistry.scala create mode 100644 spark/src/test/scala/org/apache/comet/CometRustUdfSuite.scala diff --git a/native/Cargo.lock b/native/Cargo.lock index 11e9b1ccff..80ff09ddf7 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -2,6 +2,54 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "abi_stable" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69d6512d3eb05ffe5004c59c206de7f99c34951504056ce23fc953842f12c445" +dependencies = [ + "abi_stable_derive", + "abi_stable_shared", + "const_panic", + "core_extensions", + "crossbeam-channel", + "generational-arena", + "libloading 0.7.4", + "lock_api", + "parking_lot", + "paste", + "repr_offset", + "rustc_version", + "serde", + "serde_derive", + "serde_json", +] + +[[package]] +name = "abi_stable_derive" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7178468b407a4ee10e881bc7a328a65e739f0863615cca4429d43916b05e898" +dependencies = [ + "abi_stable_shared", + "as_derive_utils", + "core_extensions", + "proc-macro2", + "quote", + "rustc_version", + "syn 1.0.109", + "typed-arena", +] + +[[package]] +name = "abi_stable_shared" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2b5df7688c123e63f4d4d649cba63f2967ba7f7861b1664fca3f77d3dad2b63" +dependencies = [ + "core_extensions", +] + [[package]] name = "addr2line" version = "0.25.1" @@ -458,6 +506,18 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b0f477b951e452a0b6b4a10b53ccd569042d1d01729b519e02074a9c0958a063" +[[package]] +name = "as_derive_utils" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff3c96645900a44cf11941c111bd08a6573b0e2f9f69bc9264b179d8fae753c4" +dependencies = [ + "core_extensions", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "assertables" version = "9.9.0" @@ -517,6 +577,15 @@ dependencies = [ "slab", ] +[[package]] +name = "async-ffi" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4de21c0feef7e5a556e51af767c953f0501f7f300ba785cc99c47bdc8081a50" +dependencies = [ + "abi_stable", +] + [[package]] name = "async-global-executor" version = "2.4.1" @@ -1427,7 +1496,7 @@ checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4" dependencies = [ "glob", "libc", - "libloading", + "libloading 0.8.9", ] [[package]] @@ -1501,6 +1570,24 @@ dependencies = [ "memchr", ] +[[package]] +name = "comet-test-udfs" +version = "0.17.0" +dependencies = [ + "arrow", + "comet-udf-sdk", + "datafusion", +] + +[[package]] +name = "comet-udf-sdk" +version = "0.17.0" +dependencies = [ + "arrow", + "datafusion", + "datafusion-ffi", +] + [[package]] name = "comfy-table" version = "7.2.2" @@ -1573,6 +1660,15 @@ dependencies = [ "tiny-keccak", ] +[[package]] +name = "const_panic" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e262cdaac42494e3ae34c43969f9cdeb7da178bdb4b66fa6a1ea2edb4c8ae652" +dependencies = [ + "typewit", +] + [[package]] name = "constant_time_eq" version = "0.4.2" @@ -1595,6 +1691,21 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "core_extensions" +version = "1.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42bb5e5d0269fd4f739ea6cedaf29c16d81c27a7ce7582008e90eb50dcd57003" +dependencies = [ + "core_extensions_proc_macros", +] + +[[package]] +name = "core_extensions_proc_macros" +version = "1.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "533d38ecd2709b7608fb8e18e4504deb99e9a72879e6aa66373a76d8dc4259ea" + [[package]] name = "cpp_demangle" version = "0.5.1" @@ -1982,6 +2093,8 @@ dependencies = [ "aws-config", "aws-credential-types", "bytes", + "comet-test-udfs", + "comet-udf-sdk", "criterion", "datafusion", "datafusion-comet-common", @@ -1991,6 +2104,7 @@ dependencies = [ "datafusion-comet-shuffle", "datafusion-comet-spark-expr", "datafusion-datasource", + "datafusion-ffi", "datafusion-functions-nested", "datafusion-physical-expr-adapter", "datafusion-spark", @@ -2003,6 +2117,7 @@ dependencies = [ "itertools 0.14.0", "jni 0.22.4", "lazy_static", + "libloading 0.8.9", "log", "log4rs", "mimalloc", @@ -2382,6 +2497,36 @@ dependencies = [ "paste", ] +[[package]] +name = "datafusion-ffi" +version = "53.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b95173344d04ba62755c949bf44f8d1a6e4414cf6392a635db96c07e711b9a3c" +dependencies = [ + "abi_stable", + "arrow", + "arrow-schema", + "async-ffi", + "async-trait", + "datafusion-catalog", + "datafusion-common", + "datafusion-datasource", + "datafusion-execution", + "datafusion-expr", + "datafusion-functions-aggregate-common", + "datafusion-physical-expr", + "datafusion-physical-expr-common", + "datafusion-physical-plan", + "datafusion-proto", + "datafusion-proto-common", + "datafusion-session", + "futures", + "log", + "prost", + "semver", + "tokio", +] + [[package]] name = "datafusion-functions" version = "53.1.0" @@ -2653,6 +2798,45 @@ dependencies = [ "tokio", ] +[[package]] +name = "datafusion-proto" +version = "53.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a387aaef949dc16bb6abc81bd1af850ec7449183aef011214f9724957495738" +dependencies = [ + "arrow", + "chrono", + "datafusion-catalog", + "datafusion-catalog-listing", + "datafusion-common", + "datafusion-datasource", + "datafusion-datasource-arrow", + "datafusion-datasource-csv", + "datafusion-datasource-json", + "datafusion-datasource-parquet", + "datafusion-execution", + "datafusion-expr", + "datafusion-functions-table", + "datafusion-physical-expr", + "datafusion-physical-expr-common", + "datafusion-physical-plan", + "datafusion-proto-common", + "object_store", + "prost", + "rand 0.9.4", +] + +[[package]] +name = "datafusion-proto-common" +version = "53.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e614c7c53a9c304c6a850b821010bb492e57300311835f1180613f9d2c63d9" +dependencies = [ + "arrow", + "datafusion-common", + "prost", +] + [[package]] name = "datafusion-pruning" version = "53.1.0" @@ -3196,6 +3380,15 @@ dependencies = [ "slab", ] +[[package]] +name = "generational-arena" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877e94aff08e743b651baaea359664321055749b398adff8740a7399af7796e7" +dependencies = [ + "cfg-if", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -3971,7 +4164,7 @@ dependencies = [ "java-locator", "jni-macros", "jni-sys 0.4.1", - "libloading", + "libloading 0.8.9", "log", "simd_cesu8", "thiserror 2.0.18", @@ -4156,6 +4349,16 @@ version = "0.2.186" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68ab91017fe16c622486840e4c83c9a37afeff978bd239b5293d61ece587de66" +[[package]] +name = "libloading" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b67380fd3b2fbe7527a606e18729d21c6f3951633d0500574c4dc22d2d638b9f" +dependencies = [ + "cfg-if", + "winapi", +] + [[package]] name = "libloading" version = "0.8.9" @@ -5555,6 +5758,15 @@ version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" +[[package]] +name = "repr_offset" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb1070755bd29dffc19d0971cab794e607839ba2ef4b69a9e6fbc8733c1b72ea" +dependencies = [ + "tstr", +] + [[package]] name = "reqsign" version = "0.16.5" @@ -6752,6 +6964,21 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tstr" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f8e0294f14baae476d0dd0a2d780b2e24d66e349a9de876f5126777a37bdba7" +dependencies = [ + "tstr_proc_macros", +] + +[[package]] +name = "tstr_proc_macros" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e78122066b0cb818b8afd08f7ed22f7fdbc3e90815035726f0840d0d26c0747a" + [[package]] name = "twox-hash" version = "2.1.2" @@ -6761,6 +6988,12 @@ dependencies = [ "rand 0.9.4", ] +[[package]] +name = "typed-arena" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6af6ae20167a9ece4bcb41af5b80f8a1f1df981f6391189ce00fd257af04126a" + [[package]] name = "typed-builder" version = "0.20.1" @@ -6826,6 +7059,12 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "typewit" +version = "1.15.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "214ca0b2191785cbc06209b9ca1861e048e39b5ba33574b3cedd58363d5bb5f6" + [[package]] name = "unicode-ident" version = "1.0.24" diff --git a/native/Cargo.toml b/native/Cargo.toml index 9e252a796c..35942caa80 100644 --- a/native/Cargo.toml +++ b/native/Cargo.toml @@ -16,8 +16,8 @@ # under the License. [workspace] -default-members = ["core", "spark-expr", "common", "proto", "jni-bridge", "shuffle"] -members = ["core", "spark-expr", "common", "proto", "jni-bridge", "shuffle", "hdfs", "fs-hdfs"] +default-members = ["core", "spark-expr", "common", "proto", "jni-bridge", "shuffle", "comet-udf-sdk", "comet-test-udfs"] +members = ["core", "spark-expr", "common", "proto", "jni-bridge", "shuffle", "hdfs", "fs-hdfs", "comet-udf-sdk", "comet-test-udfs"] resolver = "2" [workspace.package] @@ -40,6 +40,7 @@ bytes = { version = "1.11.1" } parquet = { version = "58.3.0", default-features = false, features = ["experimental"] } datafusion = { version = "53.1.0", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] } datafusion-datasource = { version = "53.1.0" } +datafusion-ffi = { version = "53.1.0" } datafusion-physical-expr-adapter = { version = "53.1.0" } datafusion-spark = { version = "53.1.0", features = ["core"] } datafusion-comet-spark-expr = { path = "spark-expr" } diff --git a/native/comet-test-udfs/Cargo.toml b/native/comet-test-udfs/Cargo.toml new file mode 100644 index 0000000000..90dc50b8a5 --- /dev/null +++ b/native/comet-test-udfs/Cargo.toml @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "comet-test-udfs" +version = { workspace = true } +edition = { workspace = true } +rust-version = { workspace = true } +license = { workspace = true } +homepage = { workspace = true } +repository = { workspace = true } +authors = { workspace = true } +publish = false +description = "Test UDF cdylib used by Comet's Rust UDF host tests (arrow-ffi based)" + +[lib] +crate-type = ["cdylib"] + +[dependencies] +arrow = { workspace = true } +datafusion = { workspace = true } +comet-udf-sdk = { path = "../comet-udf-sdk" } diff --git a/native/comet-test-udfs/src/lib.rs b/native/comet-test-udfs/src/lib.rs new file mode 100644 index 0000000000..a5031e503a --- /dev/null +++ b/native/comet-test-udfs/src/lib.rs @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Test UDF cdylib for Comet's Rust UDF host tests. +//! +//! Exports the same `add_one` semantics through both ABI flavors so the +//! host can verify them side-by-side: +//! +//! - `add_one_c` — exposed via the C ABI (arrow-only FFI, sedona-style) +//! - `add_one_df` — exposed via the datafusion-ffi ABI (`FFI_ScalarUDF`) +//! +//! Built as `libcomet_test_udfs.{so,dylib}`. + +use std::any::Any; +use std::sync::Arc; + +use arrow::array::{ArrayRef, Int64Array}; +use arrow::datatypes::{DataType, Field}; +use datafusion::common::DataFusionError; +use datafusion::logical_expr::{ + ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, TypeSignature, + Volatility, +}; + +use comet_udf_sdk::c_abi::CometCScalarUdf; +use comet_udf_sdk::{comet_c_udf_export, comet_df_udf_export}; + +// ---------- C ABI implementation ------------------------------------ + +/// `add_one` exposed via the C ABI. +pub struct AddOneC; + +impl Default for AddOneC { + fn default() -> Self { + AddOneC + } +} + +impl CometCScalarUdf for AddOneC { + fn name(&self) -> &str { + "add_one_c" + } + + fn return_field(&self, args: &[Field]) -> Result { + if args.len() != 1 { + return Err(format!("add_one_c expects 1 arg, got {}", args.len())); + } + if args[0].data_type() != &DataType::Int64 { + return Err(format!( + "add_one_c expects Int64, got {}", + args[0].data_type() + )); + } + Ok(Field::new("add_one_c", DataType::Int64, true)) + } + + fn invoke(&self, args: &[ArrayRef], _n_rows: usize) -> Result { + let arr = args[0] + .as_any() + .downcast_ref::() + .ok_or_else(|| "expected Int64Array".to_string())?; + let out: Int64Array = arr.iter().map(|v| v.map(|x| x + 1)).collect(); + Ok(Arc::new(out)) + } +} + +// ---------- datafusion-ffi implementation --------------------------- + +/// `add_one` exposed via datafusion-ffi. +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct AddOneDf { + signature: Signature, +} + +impl AddOneDf { + fn new() -> Self { + Self { + signature: Signature::new( + TypeSignature::Exact(vec![DataType::Int64]), + Volatility::Immutable, + ), + } + } +} + +impl ScalarUDFImpl for AddOneDf { + fn as_any(&self) -> &dyn Any { + self + } + fn name(&self) -> &str { + "add_one_df" + } + fn signature(&self) -> &Signature { + &self.signature + } + fn return_type(&self, _args: &[DataType]) -> datafusion::common::Result { + Ok(DataType::Int64) + } + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion::common::Result { + let arr: ArrayRef = match &args.args[0] { + ColumnarValue::Array(a) => Arc::clone(a), + ColumnarValue::Scalar(s) => s.to_array_of_size(args.number_rows)?, + }; + let a = arr + .as_any() + .downcast_ref::() + .ok_or_else(|| DataFusionError::Execution("expected Int64".into()))?; + let out: Int64Array = a.iter().map(|v| v.map(|x| x + 1)).collect(); + Ok(ColumnarValue::Array(Arc::new(out))) + } +} + +/// Factory used by the discovery macro. +pub fn make_add_one_df() -> Arc { + Arc::new(ScalarUDF::from(AddOneDf::new())) +} + +// ---------- discovery exports -------------------------------------- + +// `comet_c_udf_export!` emits both `comet_c_udf_list_v1` and +// `comet_udf_abi_version`. `comet_df_udf_export!` would emit +// `comet_udf_abi_version` again, so we use the `no_abi_version` form to +// avoid the duplicate symbol. +comet_c_udf_export!(AddOneC); +comet_df_udf_export!(no_abi_version => make_add_one_df); diff --git a/native/comet-udf-sdk/Cargo.toml b/native/comet-udf-sdk/Cargo.toml new file mode 100644 index 0000000000..766a095358 --- /dev/null +++ b/native/comet-udf-sdk/Cargo.toml @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "comet-udf-sdk" +version = { workspace = true } +homepage = { workspace = true } +repository = { workspace = true } +authors = { workspace = true } +edition = { workspace = true } +rust-version = { workspace = true } +license = { workspace = true } +description = "SDK for writing custom Rust UDFs that run inside Apache DataFusion Comet (arrow-ffi based)" + +publish = false + +[features] +# Enable the C ABI flavor (Arrow C Data Interface only, sedona-style). +c-abi = [] +# Enable the datafusion-ffi flavor (FFI_ScalarUDF / ForeignScalarUDF). +df-abi = ["dep:datafusion", "dep:datafusion-ffi"] +default = ["c-abi", "df-abi"] + +[dependencies] +arrow = { workspace = true } +datafusion = { workspace = true, optional = true } +datafusion-ffi = { workspace = true, optional = true } diff --git a/native/comet-udf-sdk/src/c_abi.rs b/native/comet-udf-sdk/src/c_abi.rs new file mode 100644 index 0000000000..e8069eb369 --- /dev/null +++ b/native/comet-udf-sdk/src/c_abi.rs @@ -0,0 +1,707 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Pure C ABI flavor — sedona-style. +//! +//! The wire format is two `#[repr(C)]` structs of function pointers, +//! parameterized only by Arrow's C Data Interface +//! (`FFI_ArrowSchema` / `FFI_ArrowArray`). No DataFusion types appear in +//! the FFI surface, so the user's cdylib only needs a matching `arrow` +//! crate, not a matching `datafusion` version. +//! +//! # Authoring a UDF +//! +//! Implement [`CometCScalarUdf`] for a unit struct and use the +//! [`comet_c_udf_export!`] macro to emit the discovery entry point: +//! +//! ```ignore +//! use comet_udf_sdk::c_abi::*; +//! use arrow::array::{ArrayRef, Int64Array}; +//! use arrow::datatypes::{DataType, Field}; +//! use std::sync::Arc; +//! +//! pub struct AddOne; +//! impl CometCScalarUdf for AddOne { +//! fn name(&self) -> &str { "add_one_c" } +//! fn return_field(&self, args: &[Field]) -> Result { +//! if args.len() != 1 || args[0].data_type() != &DataType::Int64 { +//! return Err("expected (Int64) -> Int64".into()); +//! } +//! Ok(Field::new("add_one_c", DataType::Int64, true)) +//! } +//! fn invoke(&self, args: &[ArrayRef], _n: usize) -> Result { +//! let a = args[0].as_any().downcast_ref::().unwrap(); +//! Ok(Arc::new(a.iter().map(|v| v.map(|x| x + 1)).collect::())) +//! } +//! } +//! +//! comet_udf_sdk::comet_c_udf_export!(AddOne); +//! ``` + +use std::ffi::{c_char, c_int, c_void}; + +use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema}; + +/// Generic non-zero error code returned by `init` / `execute` to signal +/// failure. The host treats any non-zero return as an error and calls +/// `get_last_error` for the message; the specific code is informational. +const C_ABI_ERR: c_int = 1; + +// -- factory struct -------------------------------------------------------- + +/// Factory for [`CometCScalarKernelImpl`] instances. +/// +/// Lives in a registry, may be cloned across an FFI boundary. Calls to +/// `function_name` and `new_impl` must be thread-safe (the implementation +/// is responsible for any internal synchronization). +/// +/// `#[repr(C)]` layout, matched by the host loader. Adding new fields +/// requires bumping `COMET_UDF_ABI_VERSION`. +#[repr(C)] +pub struct CometCScalarKernel { + /// Return the function name this kernel implements as a NUL-terminated + /// UTF-8 C string. The pointer must remain valid for the lifetime of + /// the [`CometCScalarKernel`]. + /// + /// May be `None`, in which case the kernel is treated as anonymous and + /// won't be discoverable by name. (Comet always sets this; field is + /// optional for parity with sedona's design.) + pub function_name: Option *const c_char>, + + /// Initialize a new [`CometCScalarKernelImpl`] into `out`. Called once + /// per execution, on the thread that will then drive `init`/`execute`. + pub new_impl: Option< + unsafe extern "C" fn(*const CometCScalarKernel, out: *mut CometCScalarKernelImpl), + >, + + /// Release this kernel. After release, all callbacks must be set to + /// `None`. Called when the host's `LoadedLibrary` is dropped. + pub release: Option, + + /// Implementation-private data, opaque to the host. + pub private_data: *mut c_void, +} + +// SAFETY: `CometCScalarKernel` is a thin wrapper around C function +// pointers with caller-defined synchronization semantics; the trait impls +// are required so loaded kernels can be referenced from multi-threaded +// host code. Implementations of the FFI must respect thread safety as +// described in the doc comments. +unsafe impl Send for CometCScalarKernel {} +unsafe impl Sync for CometCScalarKernel {} + +impl Default for CometCScalarKernel { + fn default() -> Self { + Self { + function_name: None, + new_impl: None, + release: None, + private_data: std::ptr::null_mut(), + } + } +} + +impl Drop for CometCScalarKernel { + fn drop(&mut self) { + if let Some(release) = self.release.take() { + // SAFETY: release is the FFI-defined cleanup callback; + // implementations must reset `release` to None per the contract. + unsafe { release(self) }; + } + } +} + +// -- per-execution instance struct ---------------------------------------- + +/// Per-execution instance produced by [`CometCScalarKernel::new_impl`]. +/// +/// Not thread-safe; the caller must serialize access. Typically used on +/// one thread for one batch then dropped. +#[repr(C)] +pub struct CometCScalarKernelImpl { + /// Compute the return type from arg types and (optionally) bound + /// scalar arguments. + /// + /// On success, `out` is populated with the return type as an + /// `FFI_ArrowSchema` and the function returns 0. On failure, returns + /// a non-zero errno and the host calls `get_last_error` to retrieve + /// the message. + /// + /// `arg_types` points to an array of `n_args` `*const FFI_ArrowSchema`. + /// `scalar_args` may be NULL (no scalars) or point to an array of + /// `n_args` `*mut FFI_ArrowArray`, each of length 1 (or NULL when + /// the corresponding argument is not a scalar). Implementations may + /// take ownership of scalar entries by replacing them with released + /// arrays. + pub init: Option< + unsafe extern "C" fn( + *mut CometCScalarKernelImpl, + arg_types: *const *const FFI_ArrowSchema, + scalar_args: *const *mut FFI_ArrowArray, + n_args: i64, + out: *mut FFI_ArrowSchema, + ) -> c_int, + >, + + /// Execute one batch. + /// + /// `args` points to an array of `n_args` `*mut FFI_ArrowArray`. + /// Each input must have length `n_rows` or length 1 (scalar broadcast). + /// On success writes the result into `out` and returns 0. + pub execute: Option< + unsafe extern "C" fn( + *mut CometCScalarKernelImpl, + args: *const *mut FFI_ArrowArray, + n_args: i64, + n_rows: i64, + out: *mut FFI_ArrowArray, + ) -> c_int, + >, + + /// Return the last error message produced by `init` or `execute`. + /// + /// Returns NULL if there is no error. The pointer is valid until the + /// next call to any method on this instance (or `release`). + pub get_last_error: + Option *const c_char>, + + /// Release this instance. After release `release` must be `None`. + pub release: Option, + + /// Implementation-private data, opaque to the host. + pub private_data: *mut c_void, +} + +impl Default for CometCScalarKernelImpl { + fn default() -> Self { + Self { + init: None, + execute: None, + get_last_error: None, + release: None, + private_data: std::ptr::null_mut(), + } + } +} + +impl Drop for CometCScalarKernelImpl { + fn drop(&mut self) { + if let Some(release) = self.release.take() { + // SAFETY: per the FFI contract `release` cleans up + // private_data and resets `release` to None. + unsafe { release(self) }; + } + } +} + +// -- discovery list -------------------------------------------------------- + +/// List of kernels exposed by a cdylib via `comet_c_udf_list_v1`. +/// +/// Ownership of the underlying `CometCScalarKernel` array is transferred +/// to the host: the host invokes each kernel's `release` and then frees +/// the list via `release_list`. +#[repr(C)] +pub struct CometCScalarKernelList { + /// Pointer to the kernel array, or null if `len == 0`. + pub kernels: *mut CometCScalarKernel, + /// Number of kernels in `kernels`. + pub len: i64, + /// Free the array of kernels. Implementations must invoke each + /// kernel's `release` first, then release the array storage. + pub release: Option, +} + +impl Default for CometCScalarKernelList { + fn default() -> Self { + Self { kernels: std::ptr::null_mut(), len: 0, release: None } + } +} + +impl Drop for CometCScalarKernelList { + fn drop(&mut self) { + if let Some(release) = self.release.take() { + // SAFETY: `release` is responsible for freeing each kernel and + // the array storage that backs `kernels`. + unsafe { release(self) }; + } + } +} + +// -- high-level Rust trait + adapter -------------------------------------- + +use arrow::array::ArrayRef; +use arrow::datatypes::Field; + +/// High-level Rust trait the user implements to author a UDF. +/// +/// Adapted to the C ABI by [`ExportedScalarKernel`]. +pub trait CometCScalarUdf: Send + Sync { + /// Stable function name. Returned via `function_name` over the FFI. + fn name(&self) -> &str; + + /// Compute the output `Field` from the input `Field`s. + /// + /// Called once per execution, before `invoke`. May reject input + /// arities or types by returning an error; the host then surfaces + /// the message to the planner. + fn return_field(&self, args: &[Field]) -> Result; + + /// Evaluate one batch of `n_rows` rows. + fn invoke(&self, args: &[ArrayRef], n_rows: usize) -> Result; +} + +/// Wraps a user `CometCScalarUdf` impl as a [`CometCScalarKernel`] +/// suitable for emission via the C ABI discovery list. +pub struct ExportedScalarKernel { + inner: std::sync::Arc, + /// NUL-terminated C string holding the function name. Lifetime is + /// tied to `self` so the pointer returned to the host stays valid. + name_c: std::ffi::CString, +} + +impl ExportedScalarKernel { + /// Wrap `udf` for export. + pub fn new(udf: U) -> Self { + let name_c = std::ffi::CString::new(udf.name().to_string()) + .expect("UDF name must not contain interior NUL bytes"); + Self { + inner: std::sync::Arc::new(udf), + name_c, + } + } +} + +impl From for CometCScalarKernel { + fn from(value: ExportedScalarKernel) -> Self { + let boxed: Box = Box::new(value); + let private = Box::into_raw(boxed) as *mut c_void; + CometCScalarKernel { + function_name: Some(c_factory_function_name), + new_impl: Some(c_factory_new_impl), + release: Some(c_factory_release), + private_data: private, + } + } +} + +unsafe extern "C" fn c_factory_function_name(this: *const CometCScalarKernel) -> *const c_char { + debug_assert!(!this.is_null()); + let this = unsafe { &*this }; + debug_assert!(!this.private_data.is_null()); + let exp = unsafe { &*(this.private_data as *const ExportedScalarKernel) }; + exp.name_c.as_ptr() +} + +unsafe extern "C" fn c_factory_new_impl( + this: *const CometCScalarKernel, + out: *mut CometCScalarKernelImpl, +) { + debug_assert!(!this.is_null()); + debug_assert!(!out.is_null()); + let this = unsafe { &*this }; + let exp = unsafe { &*(this.private_data as *const ExportedScalarKernel) }; + let impl_state = ExportedScalarKernelImpl { + inner: std::sync::Arc::clone(&exp.inner), + last_arg_fields: None, + last_return_field: None, + last_error: std::ffi::CString::default(), + }; + unsafe { + std::ptr::write(out, CometCScalarKernelImpl::from(impl_state)); + } +} + +unsafe extern "C" fn c_factory_release(this: *mut CometCScalarKernel) { + debug_assert!(!this.is_null()); + let this_ref = unsafe { &mut *this }; + if !this_ref.private_data.is_null() { + // SAFETY: private_data was set via Box::into_raw in + // From; reclaim and drop. + let _ = unsafe { + Box::from_raw(this_ref.private_data as *mut ExportedScalarKernel) + }; + this_ref.private_data = std::ptr::null_mut(); + } + this_ref.function_name = None; + this_ref.new_impl = None; + this_ref.release = None; +} + +struct ExportedScalarKernelImpl { + inner: std::sync::Arc, + last_arg_fields: Option>, + last_return_field: Option, + last_error: std::ffi::CString, +} + +impl From for CometCScalarKernelImpl { + fn from(value: ExportedScalarKernelImpl) -> Self { + let boxed = Box::new(value); + let private = Box::into_raw(boxed) as *mut c_void; + CometCScalarKernelImpl { + init: Some(c_kernel_init), + execute: Some(c_kernel_execute), + get_last_error: Some(c_kernel_get_last_error), + release: Some(c_kernel_release), + private_data: private, + } + } +} + +unsafe extern "C" fn c_kernel_init( + this: *mut CometCScalarKernelImpl, + arg_types: *const *const FFI_ArrowSchema, + _scalar_args: *const *mut FFI_ArrowArray, + n_args: i64, + out: *mut FFI_ArrowSchema, +) -> c_int { + debug_assert!(!this.is_null()); + let this_ref = unsafe { &mut *this }; + let priv_ptr = this_ref.private_data as *mut ExportedScalarKernelImpl; + debug_assert!(!priv_ptr.is_null()); + let priv_ref = unsafe { &mut *priv_ptr }; + + let n = n_args as usize; + let mut fields = Vec::with_capacity(n); + for i in 0..n { + let schema_ptr = unsafe { *arg_types.add(i) }; + if schema_ptr.is_null() { + priv_ref.last_error = + std::ffi::CString::new(format!("arg #{i} has null FFI_ArrowSchema")) + .unwrap_or_default(); + return C_ABI_ERR; + } + let schema = unsafe { &*schema_ptr }; + match Field::try_from(schema) { + Ok(f) => fields.push(f), + Err(e) => { + priv_ref.last_error = + std::ffi::CString::new(format!("arg #{i}: {e}")).unwrap_or_default(); + return C_ABI_ERR; + } + } + } + + match priv_ref.inner.return_field(&fields) { + Ok(ret_field) => { + match FFI_ArrowSchema::try_from(&ret_field) { + Ok(ffi_schema) => { + unsafe { std::ptr::write(out, ffi_schema) }; + priv_ref.last_arg_fields = Some(fields); + priv_ref.last_return_field = Some(ret_field); + 0 + } + Err(e) => { + priv_ref.last_error = std::ffi::CString::new(format!( + "encoding return type: {e}" + )) + .unwrap_or_default(); + C_ABI_ERR + } + } + } + Err(msg) => { + priv_ref.last_error = std::ffi::CString::new(msg).unwrap_or_default(); + C_ABI_ERR + } + } +} + +unsafe extern "C" fn c_kernel_execute( + this: *mut CometCScalarKernelImpl, + args: *const *mut FFI_ArrowArray, + n_args: i64, + n_rows: i64, + out: *mut FFI_ArrowArray, +) -> c_int { + debug_assert!(!this.is_null()); + let this_ref = unsafe { &mut *this }; + let priv_ptr = this_ref.private_data as *mut ExportedScalarKernelImpl; + debug_assert!(!priv_ptr.is_null()); + let priv_ref = unsafe { &mut *priv_ptr }; + + let arg_fields = match priv_ref.last_arg_fields.as_ref() { + Some(f) => f, + None => { + priv_ref.last_error = + std::ffi::CString::new("execute called before init").unwrap_or_default(); + return C_ABI_ERR; + } + }; + if arg_fields.len() != n_args as usize { + priv_ref.last_error = std::ffi::CString::new(format!( + "execute n_args={} disagrees with init n_args={}", + n_args, + arg_fields.len() + )) + .unwrap_or_default(); + return C_ABI_ERR; + } + + // Take ownership of each input FFI_ArrowArray. + let n = n_args as usize; + let mut arrays: Vec = Vec::with_capacity(n); + for i in 0..n { + let raw = unsafe { *args.add(i) }; + if raw.is_null() { + priv_ref.last_error = + std::ffi::CString::new(format!("arg #{i} FFI_ArrowArray is null")) + .unwrap_or_default(); + return C_ABI_ERR; + } + // SAFETY: raw points at an FFI_ArrowArray owned by the caller; we + // take ownership by reading and zeroing it. + let owned = unsafe { std::ptr::read(raw) }; + unsafe { std::ptr::write(raw, FFI_ArrowArray::empty()) }; + let dt = arg_fields[i].data_type().clone(); + let data = match unsafe { arrow::ffi::from_ffi_and_data_type(owned, dt) } { + Ok(d) => d, + Err(e) => { + priv_ref.last_error = std::ffi::CString::new(format!( + "arg #{i} from_ffi: {e}" + )) + .unwrap_or_default(); + return C_ABI_ERR; + } + }; + arrays.push(arrow::array::make_array(data)); + } + + let result = match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + priv_ref.inner.invoke(&arrays, n_rows as usize) + })) { + Ok(Ok(arr)) => arr, + Ok(Err(msg)) => { + priv_ref.last_error = std::ffi::CString::new(msg).unwrap_or_default(); + return C_ABI_ERR; + } + Err(panic) => { + let msg = match panic.downcast_ref::<&'static str>() { + Some(s) => format!("panic: {s}"), + None => match panic.downcast_ref::() { + Some(s) => format!("panic: {s}"), + None => "panic: ".to_string(), + }, + }; + priv_ref.last_error = std::ffi::CString::new(msg).unwrap_or_default(); + return C_ABI_ERR; + } + }; + + let ffi_out = FFI_ArrowArray::new(&result.to_data()); + unsafe { std::ptr::write(out, ffi_out) }; + 0 +} + +unsafe extern "C" fn c_kernel_get_last_error( + this: *mut CometCScalarKernelImpl, +) -> *const c_char { + debug_assert!(!this.is_null()); + let this_ref = unsafe { &*this }; + let priv_ptr = this_ref.private_data as *mut ExportedScalarKernelImpl; + if priv_ptr.is_null() { + return std::ptr::null(); + } + let priv_ref = unsafe { &*priv_ptr }; + priv_ref.last_error.as_ptr() +} + +unsafe extern "C" fn c_kernel_release(this: *mut CometCScalarKernelImpl) { + debug_assert!(!this.is_null()); + let this_ref = unsafe { &mut *this }; + if !this_ref.private_data.is_null() { + let _ = unsafe { + Box::from_raw(this_ref.private_data as *mut ExportedScalarKernelImpl) + }; + this_ref.private_data = std::ptr::null_mut(); + } + this_ref.init = None; + this_ref.execute = None; + this_ref.get_last_error = None; + this_ref.release = None; +} + +// -- discovery list construction ------------------------------------------ + +/// Build a heap-allocated [`CometCScalarKernelList`] from a vector of +/// [`CometCScalarKernel`]s. Ownership transfers to the caller, who must +/// free via the list's `release` callback. +pub fn build_kernel_list(kernels: Vec) -> CometCScalarKernelList { + if kernels.is_empty() { + return CometCScalarKernelList::default(); + } + let mut boxed = kernels.into_boxed_slice(); + let len = boxed.len() as i64; + let kernels_ptr = boxed.as_mut_ptr(); + std::mem::forget(boxed); + CometCScalarKernelList { + kernels: kernels_ptr, + len, + release: Some(c_list_release), + } +} + +unsafe extern "C" fn c_list_release(list: *mut CometCScalarKernelList) { + debug_assert!(!list.is_null()); + let list_ref = unsafe { &mut *list }; + if list_ref.kernels.is_null() || list_ref.len == 0 { + list_ref.release = None; + return; + } + let len = list_ref.len as usize; + // SAFETY: kernels was a Box<[CometCScalarKernel]> turned into raw ptr + + // forgotten in build_kernel_list; reconstruct and drop. Each kernel's + // own Drop runs its `release` callback. + let _ = unsafe { + Box::from_raw(std::slice::from_raw_parts_mut(list_ref.kernels, len)) + }; + list_ref.kernels = std::ptr::null_mut(); + list_ref.len = 0; + list_ref.release = None; +} + +// -- export macro --------------------------------------------------------- + +/// Emit the C-ABI discovery entry points for a list of UDF types. +/// +/// Each type passed must implement [`CometCScalarUdf`] and `Default`. The +/// macro produces: +/// +/// - `extern "C" fn comet_udf_abi_version() -> u32` +/// - `extern "C" fn comet_c_udf_list_v1(out: *mut CometCScalarKernelList) -> i32` +/// +/// Your `Cargo.toml` must declare `crate-type = ["cdylib"]`. +#[macro_export] +macro_rules! comet_c_udf_export { + ( $( $ty:ty ),+ $(,)? ) => { + const _: () = { + #[no_mangle] + pub extern "C" fn comet_udf_abi_version() -> u32 { + $crate::COMET_UDF_ABI_VERSION + } + + #[no_mangle] + pub unsafe extern "C" fn comet_c_udf_list_v1( + out: *mut $crate::c_abi::CometCScalarKernelList, + ) -> i32 { + if out.is_null() { return -1; } + let kernels: Vec<$crate::c_abi::CometCScalarKernel> = vec![ + $( + $crate::c_abi::CometCScalarKernel::from( + $crate::c_abi::ExportedScalarKernel::new(<$ty as Default>::default()) + ), + )+ + ]; + let list = $crate::c_abi::build_kernel_list(kernels); + std::ptr::write(out, list); + 0 + } + }; + }; +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::{ArrayRef, Int64Array}; + use arrow::datatypes::DataType; + use std::sync::Arc; + + struct AddOne; + impl CometCScalarUdf for AddOne { + fn name(&self) -> &str { + "add_one" + } + fn return_field(&self, args: &[Field]) -> Result { + if args.len() != 1 || args[0].data_type() != &DataType::Int64 { + return Err("expected (Int64) -> Int64".into()); + } + Ok(Field::new("add_one", DataType::Int64, true)) + } + fn invoke(&self, args: &[ArrayRef], _n: usize) -> Result { + let a = args[0] + .as_any() + .downcast_ref::() + .ok_or("not an Int64Array")?; + Ok(Arc::new( + a.iter().map(|v| v.map(|x| x + 1)).collect::(), + )) + } + } + + #[test] + fn adapter_roundtrip() { + let exp = ExportedScalarKernel::new(AddOne); + let kernel: CometCScalarKernel = exp.into(); + + // function_name lookup. + let name_ptr = unsafe { (kernel.function_name.unwrap())(&kernel) }; + let name = unsafe { std::ffi::CStr::from_ptr(name_ptr) }; + assert_eq!(name.to_str().unwrap(), "add_one"); + + // new_impl + init + execute. + let mut impl_state = CometCScalarKernelImpl::default(); + unsafe { + (kernel.new_impl.unwrap())(&kernel, &mut impl_state); + } + + let arg_field = Field::new("x", DataType::Int64, true); + let arg_schema = FFI_ArrowSchema::try_from(&arg_field).unwrap(); + let arg_schema_ptr: *const FFI_ArrowSchema = &arg_schema; + let mut out_schema = FFI_ArrowSchema::empty(); + let rc = unsafe { + (impl_state.init.unwrap())( + &mut impl_state, + &arg_schema_ptr as *const *const FFI_ArrowSchema, + std::ptr::null(), + 1, + &mut out_schema, + ) + }; + assert_eq!(rc, 0); + let out_field = Field::try_from(&out_schema).unwrap(); + assert_eq!(out_field.data_type(), &DataType::Int64); + + // execute. + let input: Arc = + Arc::new(Int64Array::from(vec![1, 2, 3])); + let mut input_ffi = FFI_ArrowArray::new(&input.to_data()); + let input_ffi_ptr: *mut FFI_ArrowArray = &mut input_ffi; + let mut out_arr = FFI_ArrowArray::empty(); + let rc = unsafe { + (impl_state.execute.unwrap())( + &mut impl_state, + &input_ffi_ptr as *const *mut FFI_ArrowArray, + 1, + 3, + &mut out_arr, + ) + }; + assert_eq!(rc, 0); + let result_data = + unsafe { arrow::ffi::from_ffi_and_data_type(out_arr, DataType::Int64) }.unwrap(); + let result = arrow::array::make_array(result_data); + let result = result.as_any().downcast_ref::().unwrap(); + assert_eq!(result.values(), &[2, 3, 4]); + + // Releasing impl_state and kernel runs cleanup callbacks; ensure + // their function pointers are cleared. + drop(impl_state); + drop(kernel); + } +} diff --git a/native/comet-udf-sdk/src/df_abi.rs b/native/comet-udf-sdk/src/df_abi.rs new file mode 100644 index 0000000000..735ca84842 --- /dev/null +++ b/native/comet-udf-sdk/src/df_abi.rs @@ -0,0 +1,239 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! datafusion-ffi flavor. +//! +//! Discovery returns a list of `FFI_ScalarUDF` values produced by +//! `datafusion_ffi`. The host imports each via +//! `ForeignScalarUDF::try_from`, yielding a `ScalarUDFImpl` it can plug +//! straight into its existing planner — no further adaptation needed. +//! +//! The user's library inherits the entire `ScalarUDFImpl` surface +//! (signature with type coercion, metadata-aware return types, aliases, +//! etc.) for free, at the cost of a hard major-version pin against +//! `datafusion-ffi`. +//! +//! # Authoring a UDF +//! +//! Implement `ScalarUDFImpl` as you would for any DataFusion UDF, then: +//! +//! ```ignore +//! use comet_udf_sdk::comet_df_udf_export; +//! use datafusion::logical_expr::ScalarUDF; +//! use std::sync::Arc; +//! +//! fn make_add_one_df() -> Arc { +//! // ... build ScalarUDF from your impl ... +//! # unimplemented!() +//! } +//! +//! comet_df_udf_export!(make_add_one_df); +//! ``` + +use std::sync::Arc; + +use datafusion::logical_expr::ScalarUDF; +use datafusion_ffi::udf::FFI_ScalarUDF; + +/// List of UDFs exposed by a cdylib via `comet_df_udf_list_v1`. +/// +/// Ownership of the array transfers to the host, which frees via the +/// list's `release` callback after consuming each FFI_ScalarUDF. +#[repr(C)] +pub struct CometDfUdfList { + /// Pointer to the FFI_ScalarUDF array, or null if `len == 0`. + pub udfs: *mut FFI_ScalarUDF, + /// Number of entries. + pub len: i64, + /// Release the array storage. Each FFI_ScalarUDF entry is released + /// individually by its own Drop / clone_release machinery; this + /// callback only frees the heap-allocated array itself. + pub release: Option, +} + +impl Default for CometDfUdfList { + fn default() -> Self { + Self { udfs: std::ptr::null_mut(), len: 0, release: None } + } +} + +impl Drop for CometDfUdfList { + fn drop(&mut self) { + if let Some(release) = self.release.take() { + // SAFETY: release frees the array storage. The FFI_ScalarUDF + // entries themselves were already dropped (or moved out) by + // the host before invoking release. + unsafe { release(self) }; + } + } +} + +/// Build a heap-allocated `CometDfUdfList` from a vector of `Arc`. +pub fn build_udf_list(udfs: Vec>) -> CometDfUdfList { + if udfs.is_empty() { + return CometDfUdfList::default(); + } + let ffi_udfs: Vec = + udfs.into_iter().map(FFI_ScalarUDF::from).collect(); + let mut boxed = ffi_udfs.into_boxed_slice(); + let len = boxed.len() as i64; + let udfs_ptr = boxed.as_mut_ptr(); + std::mem::forget(boxed); + CometDfUdfList { + udfs: udfs_ptr, + len, + release: Some(c_list_release), + } +} + +unsafe extern "C" fn c_list_release(list: *mut CometDfUdfList) { + debug_assert!(!list.is_null()); + let list_ref = unsafe { &mut *list }; + if list_ref.udfs.is_null() || list_ref.len == 0 { + list_ref.release = None; + return; + } + let len = list_ref.len as usize; + // SAFETY: udfs was a Box<[FFI_ScalarUDF]> turned into raw ptr + + // forgotten in build_udf_list. Reconstruct and drop. Each + // FFI_ScalarUDF entry's Drop runs its own release callback. + let _ = unsafe { + Box::from_raw(std::slice::from_raw_parts_mut(list_ref.udfs, len)) + }; + list_ref.udfs = std::ptr::null_mut(); + list_ref.len = 0; + list_ref.release = None; +} + +/// Emit the datafusion-ffi discovery entry points. +/// +/// Each item is the path of a function `fn() -> Arc`. The +/// macro produces: +/// +/// - `extern "C" fn comet_udf_abi_version() -> u32` (if not already exported) +/// - `extern "C" fn comet_df_udf_list_v1(out: *mut CometDfUdfList) -> i32` +/// +/// Use the `_no_abi_version` variant to suppress the abi_version export +/// when emitting both `comet_c_udf_export!` and `comet_df_udf_export!` from +/// the same crate (`comet_c_udf_export!` already emits it). +#[macro_export] +macro_rules! comet_df_udf_export { + ( $( $factory:path ),+ $(,)? ) => { + $crate::comet_df_udf_export!(@list $($factory),+); + const _: () = { + #[no_mangle] + pub extern "C" fn comet_udf_abi_version() -> u32 { + $crate::COMET_UDF_ABI_VERSION + } + }; + }; + (@list $( $factory:path ),+ $(,)? ) => { + const _: () = { + #[no_mangle] + pub unsafe extern "C" fn comet_df_udf_list_v1( + out: *mut $crate::df_abi::CometDfUdfList, + ) -> i32 { + if out.is_null() { return -1; } + let udfs: Vec> = vec![ + $( $factory(), )+ + ]; + let list = $crate::df_abi::build_udf_list(udfs); + std::ptr::write(out, list); + 0 + } + }; + }; + (no_abi_version => $( $factory:path ),+ $(,)? ) => { + $crate::comet_df_udf_export!(@list $($factory),+); + }; +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::{ArrayRef, Int64Array}; + use arrow::datatypes::DataType; + use datafusion::common::DataFusionError; + use datafusion::logical_expr::{ + ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, TypeSignature, + Volatility, + }; + use std::any::Any; + + #[derive(Debug, PartialEq, Eq, Hash)] + struct AddOne { + signature: Signature, + } + impl AddOne { + fn new() -> Self { + Self { + signature: Signature::new( + TypeSignature::Exact(vec![DataType::Int64]), + Volatility::Immutable, + ), + } + } + } + impl ScalarUDFImpl for AddOne { + fn as_any(&self) -> &dyn Any { + self + } + fn name(&self) -> &str { + "add_one_df" + } + fn signature(&self) -> &Signature { + &self.signature + } + fn return_type(&self, _: &[DataType]) -> datafusion::common::Result { + Ok(DataType::Int64) + } + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion::common::Result { + let arr: ArrayRef = match &args.args[0] { + ColumnarValue::Array(a) => Arc::clone(a), + ColumnarValue::Scalar(s) => s.to_array_of_size(args.number_rows)?, + }; + let a = arr + .as_any() + .downcast_ref::() + .ok_or_else(|| DataFusionError::Execution("not Int64".into()))?; + let out: Int64Array = a.iter().map(|v| v.map(|x| x + 1)).collect(); + Ok(ColumnarValue::Array(Arc::new(out))) + } + } + + #[test] + fn build_list_and_round_trip_through_ffi() { + let udf = Arc::new(ScalarUDF::from(AddOne::new())); + let mut list = build_udf_list(vec![udf]); + assert_eq!(list.len, 1); + + // Read the FFI_ScalarUDF entry and import it via the canonical + // From<&FFI_ScalarUDF> for Arc. + let entry: &FFI_ScalarUDF = unsafe { &*list.udfs }; + let foreign: Arc = entry.into(); + assert_eq!(foreign.name(), "add_one_df"); + + // Drop list to exercise the release path. + drop(foreign); + if let Some(rel) = list.release.take() { + unsafe { rel(&mut list) }; + } + } +} diff --git a/native/comet-udf-sdk/src/lib.rs b/native/comet-udf-sdk/src/lib.rs new file mode 100644 index 0000000000..9f4d8ef118 --- /dev/null +++ b/native/comet-udf-sdk/src/lib.rs @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! SDK for writing scalar UDFs in Rust that are loaded and executed by +//! Apache DataFusion Comet, using only Arrow's stable FFI surface. +//! +//! Two ABI flavors are available, behind cargo features: +//! +//! - **C ABI** (`c-abi` feature, on by default) — pure C-callable struct of +//! function pointers built only on the Arrow C Data Interface +//! (`FFI_ArrowSchema` / `FFI_ArrowArray`). Modeled on Apache Sedona's +//! `SedonaCScalarKernel`. Works with any Arrow producer that speaks the +//! C Data Interface — including C/C++ implementations. +//! +//! - **DataFusion FFI ABI** (`df-abi` feature, on by default) — wraps the +//! user's `ScalarUDFImpl` as `datafusion_ffi::udf::FFI_ScalarUDF`. Larger +//! surface, full DataFusion features (variadic signatures, type coercion, +//! metadata-aware return types) for free, but the user's library is +//! coupled to the matching `datafusion-ffi` major version. +//! +//! A single library may export either or both ABIs; the host loader tries +//! both discovery functions and prefers the C ABI on duplicate names. +//! +//! See `docs/source/user-guide/latest/custom-rust-udfs.md` for an +//! end-to-end walkthrough. + +#![warn(missing_docs)] + +/// Discovery ABI version. Bumped on any backwards-incompatible change to +/// the discovery entry-point signatures or to the FFI structs they yield. +pub const COMET_UDF_ABI_VERSION: u32 = 1; + +#[cfg(feature = "c-abi")] +pub mod c_abi; + +#[cfg(feature = "df-abi")] +pub mod df_abi; + +/// Symbol name of the discovery entry point exported by C-ABI cdylibs. +pub const C_ABI_DISCOVERY_SYMBOL: &str = "comet_c_udf_list_v1"; + +/// Symbol name of the discovery entry point exported by datafusion-ffi +/// cdylibs. +pub const DF_ABI_DISCOVERY_SYMBOL: &str = "comet_df_udf_list_v1"; + +/// Symbol name of the ABI version probe exported by every cdylib. +pub const ABI_VERSION_SYMBOL: &str = "comet_udf_abi_version"; + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn abi_version_is_one() { + assert_eq!(COMET_UDF_ABI_VERSION, 1); + } +} diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index c58d446917..ef3befb4bc 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -56,6 +56,7 @@ paste = "1.0.14" datafusion = { workspace = true, features = ["parquet_encryption", "sql"] } datafusion-physical-expr-adapter = { workspace = true } datafusion-datasource = { workspace = true } +datafusion-ffi = { workspace = true } datafusion-spark = { workspace = true } once_cell = "1.18.0" datafusion-comet-common = { workspace = true } @@ -77,6 +78,8 @@ iceberg = { workspace = true } iceberg-storage-opendal = { workspace = true } serde_json = "1.0" uuid = "1.23.0" +libloading = "0.8" +comet-udf-sdk = { path = "../comet-udf-sdk" } [target.'cfg(target_os = "linux")'.dependencies] procfs = "0.18.0" @@ -92,6 +95,7 @@ lazy_static = "1.4" assertables = "9" hex = "0.4.3" datafusion-functions-nested = { version = "53.1.0" } +comet-test-udfs = { path = "../comet-test-udfs" } [features] backtrace = ["datafusion/backtrace"] diff --git a/native/core/build.rs b/native/core/build.rs new file mode 100644 index 0000000000..b896fdf82a --- /dev/null +++ b/native/core/build.rs @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +fn main() { + // Expose the path of the comet-test-udfs cdylib to test code via + // COMET_TEST_UDFS_LIB. Cargo doesn't propagate cdylib outputs as + // DEP_<...>_OUT_DIR for non-rlib crates, so we compute the path + // from OUT_DIR. + if let Ok(out_dir) = std::env::var("OUT_DIR") { + let out_path = std::path::PathBuf::from(out_dir); + // OUT_DIR is .../target//build//out + let target_profile_dir = out_path + .ancestors() + .nth(3) + .map(|p| p.to_path_buf()) + .unwrap_or_default(); + let dylib_ext = if cfg!(target_os = "macos") { + "dylib" + } else if cfg!(target_os = "windows") { + "dll" + } else { + "so" + }; + let lib_path = target_profile_dir.join(format!("libcomet_test_udfs.{dylib_ext}")); + println!("cargo:rustc-env=COMET_TEST_UDFS_LIB={}", lib_path.display()); + } + println!("cargo:rerun-if-changed=../comet-test-udfs/src/lib.rs"); + println!("cargo:rerun-if-changed=../comet-test-udfs/Cargo.toml"); +} diff --git a/native/core/src/comet_rust_udf_bridge.rs b/native/core/src/comet_rust_udf_bridge.rs new file mode 100644 index 0000000000..7e6341a775 --- /dev/null +++ b/native/core/src/comet_rust_udf_bridge.rs @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! JNI entry points for driver-side validation of Rust UDF cdylibs. +//! Used by `org.apache.comet.udf.CometRustUdfBridge` on the driver. + +use crate::errors::{try_unwrap_or_throw, CometError}; +use crate::execution::rust_udf::cache::get_or_load; +use crate::execution::rust_udf::loader::LoadedUdf; +use jni::objects::{JClass, JString}; +use jni::sys::jobject; +use jni::EnvUnowned; + +/// Best-effort serialization of a single discovered UDF as JSON. The +/// `args`/`return_type` fields require calling the C-ABI kernel's `init` +/// to discover a return type — for the comparison PR we punt on that and +/// return only the bits the Scala registry needs (`name`, `abi`). +fn udf_to_json(udf: &LoadedUdf) -> serde_json::Value { + let abi = match udf.abi { + crate::execution::rust_udf::loader::UdfAbi::C => "c-abi", + crate::execution::rust_udf::loader::UdfAbi::DataFusion => "datafusion-ffi", + }; + serde_json::json!({ + "name": udf.name, + "abi": abi, + }) +} + +/// Validate that `library_path` loads, exposes a UDF named +/// `expected_name`, and return a JSON description of that UDF. Throws +/// on any error. +#[no_mangle] +pub extern "system" fn Java_org_apache_comet_udf_CometRustUdfBridge_validateLibrary( + e: EnvUnowned, + _class: JClass, + library_path: JString, + expected_name: JString, +) -> jobject { + try_unwrap_or_throw(&e, |env| { + let path: String = library_path + .try_to_string(env) + .map_err(|e| CometError::Internal(e.to_string()))?; + let name: String = expected_name + .try_to_string(env) + .map_err(|e| CometError::Internal(e.to_string()))?; + let lib = get_or_load(&path) + .map_err(|e| CometError::Internal(e.to_string()))?; + let udf = lib + .udfs + .iter() + .find(|u| u.name == name) + .ok_or_else(|| { + CometError::Internal(format!("UDF '{name}' not found in {path}")) + })?; + let json = udf_to_json(udf).to_string(); + let jstr = env + .new_string(json) + .map_err(|e| CometError::Internal(e.to_string()))?; + Ok(jstr.into_raw()) + }) +} + +/// Return a JSON array describing every UDF exposed by `library_path`. +#[no_mangle] +pub extern "system" fn Java_org_apache_comet_udf_CometRustUdfBridge_listUdfs( + e: EnvUnowned, + _class: JClass, + library_path: JString, +) -> jobject { + try_unwrap_or_throw(&e, |env| { + let path: String = library_path + .try_to_string(env) + .map_err(|e| CometError::Internal(e.to_string()))?; + let lib = get_or_load(&path) + .map_err(|e| CometError::Internal(e.to_string()))?; + let entries: Vec = lib.udfs.iter().map(udf_to_json).collect(); + let json = serde_json::Value::Array(entries).to_string(); + let jstr = env + .new_string(json) + .map_err(|e| CometError::Internal(e.to_string()))?; + Ok(jstr.into_raw()) + }) +} diff --git a/native/core/src/execution/mod.rs b/native/core/src/execution/mod.rs index ec247f72b7..9291eafdaa 100644 --- a/native/core/src/execution/mod.rs +++ b/native/core/src/execution/mod.rs @@ -23,6 +23,7 @@ pub(crate) mod merge_as_partial; pub(crate) mod metrics; pub mod operators; pub(crate) mod planner; +pub mod rust_udf; pub mod serde; pub use datafusion_comet_shuffle as shuffle; pub(crate) mod sort; diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 542c3d9536..c172d422d7 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -764,6 +764,47 @@ impl PhysicalPlanner { self.task_context.clone(), ))) } + ExprStruct::RustUdfCall(call) => { + let arg_exprs: Vec> = call + .args + .iter() + .map(|e| self.create_expr(e, Arc::clone(&input_schema))) + .collect::, _>>()?; + + let lib = crate::execution::rust_udf::cache::get_or_load(&call.library_path) + .map_err(|e| { + GeneralError(format!( + "Rust UDF load '{}': {e}", + call.library_path + )) + })?; + + let loaded = lib + .udfs + .iter() + .find(|u| u.name == call.name) + .ok_or_else(|| { + GeneralError(format!( + "Rust UDF '{}' not found in '{}'", + call.name, call.library_path + )) + })?; + + let udf = Arc::new(ScalarUDF::new_from_shared_impl(Arc::clone(&loaded.udf_impl))); + + let return_type = to_arrow_datatype(call.return_type.as_ref().ok_or_else( + || GeneralError("RustUdfCall missing return_type".into()), + )?); + let return_field = Arc::new(Field::new(&call.name, return_type, true)); + let expr = Arc::new(ScalarFunctionExpr::new( + &call.name, + udf, + arg_exprs, + return_field, + Arc::new(ConfigOptions::default()), + )); + Ok(expr) + } expr => Err(GeneralError(format!("Not implemented: {expr:?}"))), } } diff --git a/native/core/src/execution/rust_udf/cache.rs b/native/core/src/execution/rust_udf/cache.rs new file mode 100644 index 0000000000..ecc5ea6ea4 --- /dev/null +++ b/native/core/src/execution/rust_udf/cache.rs @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Process-wide cache of loaded UDF cdylibs. +//! +//! Same-path lookups always return the same `Arc` for +//! the lifetime of the process — libraries are deliberately never +//! unloaded. Calling `dlclose` while a thread is mid-call would be a +//! use-after-free, and there is no safe point to unload without +//! per-invocation refcounting we don't want on the hot path. + +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, OnceLock, RwLock}; + +use super::loader::{load, LoadedLibrary, LoaderError}; + +static CACHE: OnceLock>>> = OnceLock::new(); + +fn cache() -> &'static RwLock>> { + CACHE.get_or_init(|| RwLock::new(HashMap::new())) +} + +/// Get an already-loaded library, or load and cache it. +pub fn get_or_load(path: impl AsRef) -> Result, LoaderError> { + let raw = path.as_ref().to_path_buf(); + + if let Some(lib) = cache().read().unwrap().get(&raw).cloned() { + return Ok(lib); + } + + let canonical = raw.canonicalize().unwrap_or_else(|_| raw.clone()); + if canonical != raw { + if let Some(lib) = cache().read().unwrap().get(&canonical).cloned() { + cache().write().unwrap().insert(raw, lib.clone()); + return Ok(lib); + } + } + + let mut w = cache().write().unwrap(); + if let Some(lib) = w.get(&canonical).cloned() { + if canonical != raw { + w.insert(raw, lib.clone()); + } + return Ok(lib); + } + let loaded = Arc::new(load(&canonical)?); + w.insert(canonical.clone(), loaded.clone()); + if canonical != raw { + w.insert(raw, loaded.clone()); + } + Ok(loaded) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::execution::rust_udf::test_support::test_udfs_path; + + #[test] + fn same_path_returns_same_arc() { + let p = test_udfs_path(); + let a = get_or_load(&p).unwrap(); + let b = get_or_load(&p).unwrap(); + assert!(Arc::ptr_eq(&a, &b)); + } + + #[test] + fn missing_path_propagates_error() { + let err = get_or_load("/no/such/file.dylib").unwrap_err(); + assert!(matches!(err, LoaderError::Open { .. })); + } +} diff --git a/native/core/src/execution/rust_udf/imported_c.rs b/native/core/src/execution/rust_udf/imported_c.rs new file mode 100644 index 0000000000..11823d2ec7 --- /dev/null +++ b/native/core/src/execution/rust_udf/imported_c.rs @@ -0,0 +1,312 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Adapter wrapping a C-ABI [`CometCScalarKernel`] as a DataFusion +//! [`ScalarUDFImpl`]. +//! +//! Lifecycle inside `invoke_with_args`: +//! +//! 1. Build a fresh [`CometCScalarKernelImpl`] via the kernel's `new_impl`. +//! 2. Call `init` with the input field types (and any scalar args) to get +//! the return type. +//! 3. Call `execute` once with the batch. +//! 4. Drop the impl (its `release` callback runs). + +use std::any::Any; +use std::ffi::CStr; +use std::sync::Mutex; + +use arrow::array::ArrayRef; +use arrow::datatypes::{DataType, Field}; +use arrow::ffi::{from_ffi_and_data_type, FFI_ArrowArray, FFI_ArrowSchema}; +use comet_udf_sdk::c_abi::{CometCScalarKernel, CometCScalarKernelImpl}; +use datafusion::common::DataFusionError; +use datafusion::logical_expr::{ + ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility, +}; + +/// Adapter wrapping a [`CometCScalarKernel`] as a DataFusion +/// [`ScalarUDFImpl`]. +pub struct ImportedCScalarUdf { + name: String, + /// Boxed so the kernel's address is stable; held inside a Mutex + /// because the FFI Drop is not Sync-safe under concurrent invocation. + /// The kernel itself is logically immutable post-load — the lock only + /// protects the FFI calls' aliasing rules. (DataFusion serializes + /// invocations of a given ScalarUDFImpl per-batch through + /// invoke_with_args anyway; the lock is defensive.) + kernel: Mutex>, + signature: Signature, +} + +impl ImportedCScalarUdf { + /// Construct from an owned C kernel. + /// + /// Reads the kernel's name via its `function_name` callback and + /// stores it for `name()` lookups; the kernel itself is held inside + /// a mutex. + pub fn try_new(kernel: Box) -> Result { + let function_name_cb = kernel + .function_name + .ok_or_else(|| "kernel.function_name is null".to_string())?; + let _ = kernel + .new_impl + .ok_or_else(|| "kernel.new_impl is null".to_string())?; + + // SAFETY: function_name_cb is the FFI-supplied callback; + // implementations promise the returned pointer is a NUL-terminated + // UTF-8 string valid for the lifetime of the kernel. + let name_ptr = unsafe { function_name_cb(kernel.as_ref() as *const _) }; + if name_ptr.is_null() { + return Err("function_name returned null".into()); + } + let name = unsafe { CStr::from_ptr(name_ptr) } + .to_str() + .map_err(|e| format!("function_name not UTF-8: {e}"))? + .to_string(); + + // Use UserDefined signature: per-call init() is what decides + // whether the input types are acceptable. `coerce_types` is not + // implemented; user must pass exact types from the JVM register call. + let signature = Signature::new(TypeSignature::UserDefined, Volatility::Immutable); + + Ok(Self { + name, + kernel: Mutex::new(kernel), + signature, + }) + } +} + +impl std::fmt::Debug for ImportedCScalarUdf { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ImportedCScalarUdf") + .field("name", &self.name) + .finish() + } +} + +impl PartialEq for ImportedCScalarUdf { + fn eq(&self, other: &Self) -> bool { + self.name == other.name + } +} + +impl Eq for ImportedCScalarUdf {} + +impl std::hash::Hash for ImportedCScalarUdf { + fn hash(&self, state: &mut H) { + self.name.hash(state); + } +} + +impl ScalarUDFImpl for ImportedCScalarUdf { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + &self.name + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, args: &[DataType]) -> datafusion::common::Result { + // Build a fresh impl, call init, drop. Done at planning time so + // the planner can know the output type before execution. + let kernel = self.kernel.lock().unwrap(); + let mut impl_state = CometCScalarKernelImpl::default(); + let new_impl_cb = kernel + .new_impl + .ok_or_else(|| DataFusionError::Internal("new_impl is null".into()))?; + // SAFETY: new_impl_cb is the FFI-supplied factory; impl_state is a + // caller-allocated default value the cdylib writes into. + unsafe { + new_impl_cb(kernel.as_ref() as *const _, &mut impl_state); + } + + // Build input fields and FFI schemas. + let fields: Vec = args + .iter() + .map(|dt| Field::new("", dt.clone(), true)) + .collect(); + let ffi_schemas = build_ffi_schemas(&fields)?; + let ffi_schema_ptrs: Vec<*const FFI_ArrowSchema> = + ffi_schemas.iter().map(|s| s as *const _).collect(); + + let init_cb = impl_state + .init + .ok_or_else(|| DataFusionError::Internal("kernel impl missing init".into()))?; + let mut out_schema = FFI_ArrowSchema::empty(); + // SAFETY: pointers are valid for the duration of the call. + let rc = unsafe { + init_cb( + &mut impl_state, + ffi_schema_ptrs.as_ptr(), + std::ptr::null(), + fields.len() as i64, + &mut out_schema, + ) + }; + if rc != 0 { + let msg = read_last_error(&mut impl_state); + return Err(DataFusionError::Plan(format!( + "{}: init failed: {msg}", + self.name + ))); + } + let return_field = Field::try_from(&out_schema) + .map_err(|e| DataFusionError::Internal(format!("decoding return type: {e}")))?; + Ok(return_field.data_type().clone()) + } + + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion::common::Result { + let n_rows = args.number_rows; + let kernel = self.kernel.lock().unwrap(); + + // Build a fresh impl_state; init then execute. + let new_impl_cb = kernel + .new_impl + .ok_or_else(|| DataFusionError::Internal("new_impl is null".into()))?; + let mut impl_state = CometCScalarKernelImpl::default(); + // SAFETY: see return_type. + unsafe { + new_impl_cb(kernel.as_ref() as *const _, &mut impl_state); + } + + // Resolve args to Arrays of length n_rows or 1. + let mut arrays: Vec = Vec::with_capacity(args.args.len()); + for a in args.args { + let arr = match a { + ColumnarValue::Array(arr) => arr, + ColumnarValue::Scalar(s) => s.to_array_of_size(n_rows)?, + }; + arrays.push(arr); + } + + // Build input fields + schemas (the kernel needs init to remember + // the arg types for execute). + let fields: Vec = arrays + .iter() + .map(|a| Field::new("", a.data_type().clone(), true)) + .collect(); + let ffi_schemas = build_ffi_schemas(&fields)?; + let ffi_schema_ptrs: Vec<*const FFI_ArrowSchema> = + ffi_schemas.iter().map(|s| s as *const _).collect(); + + let init_cb = impl_state + .init + .ok_or_else(|| DataFusionError::Internal("kernel impl missing init".into()))?; + let mut out_schema = FFI_ArrowSchema::empty(); + // SAFETY: ffi_schema_ptrs lives for the duration of this call. + let rc = unsafe { + init_cb( + &mut impl_state, + ffi_schema_ptrs.as_ptr(), + std::ptr::null(), + fields.len() as i64, + &mut out_schema, + ) + }; + if rc != 0 { + let msg = read_last_error(&mut impl_state); + return Err(DataFusionError::Execution(format!( + "{}: init failed: {msg}", + self.name + ))); + } + let return_field = Field::try_from(&out_schema) + .map_err(|e| DataFusionError::Internal(format!("decoding return type: {e}")))?; + + // Build FFI arrays. + let mut ffi_arrays: Vec = arrays + .iter() + .map(|a| FFI_ArrowArray::new(&a.to_data())) + .collect(); + let ffi_array_ptrs: Vec<*mut FFI_ArrowArray> = + ffi_arrays.iter_mut().map(|x| x as *mut _).collect(); + + let execute_cb = impl_state + .execute + .ok_or_else(|| DataFusionError::Internal("kernel impl missing execute".into()))?; + let mut out_arr = FFI_ArrowArray::empty(); + // SAFETY: ffi_array_ptrs live for the duration of this call. The + // kernel takes ownership of each input by replacing it with an + // empty FFI_ArrowArray (no-op Drop). + let rc = unsafe { + execute_cb( + &mut impl_state, + ffi_array_ptrs.as_ptr(), + arrays.len() as i64, + n_rows as i64, + &mut out_arr, + ) + }; + + if rc != 0 { + let msg = read_last_error(&mut impl_state); + return Err(DataFusionError::Execution(format!( + "{}: execute failed: {msg}", + self.name + ))); + } + + // Import result. + // SAFETY: out_arr was filled by the cdylib. + let data = unsafe { + from_ffi_and_data_type(out_arr, return_field.data_type().clone()) + } + .map_err(|e| DataFusionError::Execution(format!("from_ffi: {e}")))?; + let array = arrow::array::make_array(data); + if array.len() != n_rows { + return Err(DataFusionError::Execution(format!( + "{}: returned {} rows, expected {n_rows}", + self.name, + array.len() + ))); + } + Ok(ColumnarValue::Array(array)) + } +} + +fn build_ffi_schemas(fields: &[Field]) -> datafusion::common::Result> { + fields + .iter() + .map(FFI_ArrowSchema::try_from) + .collect::, _>>() + .map_err(|e| DataFusionError::Internal(format!("encoding arg type: {e}"))) +} + +fn read_last_error(impl_state: &mut CometCScalarKernelImpl) -> String { + let cb = match impl_state.get_last_error { + Some(cb) => cb, + None => return "(no get_last_error)".to_string(), + }; + // SAFETY: cb is the FFI-supplied callback. + let ptr = unsafe { cb(impl_state) }; + if ptr.is_null() { + return "(empty)".to_string(); + } + unsafe { CStr::from_ptr(ptr) } + .to_string_lossy() + .into_owned() +} diff --git a/native/core/src/execution/rust_udf/loader.rs b/native/core/src/execution/rust_udf/loader.rs new file mode 100644 index 0000000000..eaa5bff1c4 --- /dev/null +++ b/native/core/src/execution/rust_udf/loader.rs @@ -0,0 +1,350 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Loader: open a UDF cdylib via libloading, validate the ABI version, +//! discover UDFs via the C-ABI and/or datafusion-ffi entry points, and +//! produce DataFusion `ScalarUDFImpl` impls for each. +//! +//! See `super::mod.rs` for an overview of the two flavors. + +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use comet_udf_sdk::c_abi::CometCScalarKernelList; +use comet_udf_sdk::df_abi::CometDfUdfList; +use comet_udf_sdk::{ + ABI_VERSION_SYMBOL, COMET_UDF_ABI_VERSION, C_ABI_DISCOVERY_SYMBOL, DF_ABI_DISCOVERY_SYMBOL, +}; +use datafusion::logical_expr::ScalarUDFImpl; +use datafusion_ffi::udf::FFI_ScalarUDF; +use libloading::{Library, Symbol}; + +use super::imported_c::ImportedCScalarUdf; + +/// Which ABI flavor a given UDF was loaded through. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum UdfAbi { + /// Loaded via the pure-C arrow-ffi flavor (sedona-style). + C, + /// Loaded via datafusion-ffi (`FFI_ScalarUDF`). + DataFusion, +} + +/// One loaded UDF: name, ABI flavor, and a `ScalarUDFImpl` ready to plug +/// into the planner. +pub struct LoadedUdf { + /// UDF name as exposed by the cdylib. + pub name: String, + /// Which ABI this UDF was discovered through. + pub abi: UdfAbi, + /// The `ScalarUDFImpl` adapter the planner will wrap. + pub udf_impl: Arc, +} + +impl std::fmt::Debug for LoadedUdf { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("LoadedUdf") + .field("name", &self.name) + .field("abi", &self.abi) + .finish() + } +} + +/// Result of loading a UDF cdylib: the live `Library` plus per-UDF +/// adapters. +pub struct LoadedLibrary { + /// Canonicalized path the library was loaded from. + pub path: PathBuf, + /// The loaded `Library`. Held inside an `Arc` so loaded UDFs can + /// outlive lookups. Library is never unloaded for the process lifetime. + pub library: Arc, + /// One entry per UDF, with name and ScalarUDFImpl already built. + pub udfs: Vec, +} + +impl std::fmt::Debug for LoadedLibrary { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("LoadedLibrary") + .field("path", &self.path) + .field("udfs", &self.udfs) + .finish() + } +} + +/// Errors returned by the loader. +#[derive(Debug)] +pub enum LoaderError { + /// `libloading::Library::new` failed. + Open { + /// Path that was passed to `Library::new`. + path: PathBuf, + /// Underlying error. + source: libloading::Error, + }, + /// `comet_udf_abi_version` is missing or returned an unexpected value. + AbiMismatch { + /// Path of the offending library. + path: PathBuf, + /// Version reported by the cdylib (or `None` if the symbol is missing). + found: Option, + /// Version this host expects. + expected: u32, + }, + /// Library exposes neither `comet_c_udf_list_v1` nor + /// `comet_df_udf_list_v1`. + NoDiscovery { + /// Path of the offending library. + path: PathBuf, + }, + /// One of the discovery functions returned a non-zero rc, or a + /// kernel/UDF entry was malformed. + Discovery { + /// Path of the library. + path: PathBuf, + /// Human-readable reason. + reason: String, + }, +} + +impl std::fmt::Display for LoaderError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + use LoaderError::*; + match self { + Open { path, source } => write!(f, "failed to open {}: {source}", path.display()), + AbiMismatch { path, found, expected } => match found { + Some(v) => write!( + f, + "{} reports ABI v{v}, host expects v{expected}", + path.display() + ), + None => write!( + f, + "{} missing required symbol {ABI_VERSION_SYMBOL}", + path.display() + ), + }, + NoDiscovery { path } => write!( + f, + "{} exposes neither {C_ABI_DISCOVERY_SYMBOL} nor \ + {DF_ABI_DISCOVERY_SYMBOL}", + path.display() + ), + Discovery { path, reason } => write!(f, "{}: {reason}", path.display()), + } + } +} + +impl std::error::Error for LoaderError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + LoaderError::Open { source, .. } => Some(source), + _ => None, + } + } +} + +/// Open and validate a UDF cdylib. +pub fn load(path: impl AsRef) -> Result { + let path = path.as_ref().to_path_buf(); + // SAFETY: `Library::new` runs the cdylib's static initializers. We + // accept this risk because user UDF cdylibs are explicitly registered + // by an operator via `CometRustUDF.register`. + let library = unsafe { Library::new(&path) } + .map_err(|source| LoaderError::Open { path: path.clone(), source })?; + + // ABI version probe. + let v = read_abi_version(&library, &path)?; + if v != COMET_UDF_ABI_VERSION { + return Err(LoaderError::AbiMismatch { + path, + found: Some(v), + expected: COMET_UDF_ABI_VERSION, + }); + } + + let mut udfs: Vec = Vec::new(); + let mut have_any = false; + + if let Some(c_kernels) = read_c_kernels(&library, &path)? { + have_any = true; + for udf in c_kernels { + udfs.push(udf); + } + } + + if let Some(df_udfs) = read_df_udfs(&library, &path)? { + have_any = true; + for udf in df_udfs { + // Prefer C-ABI registration on collision. + if udfs.iter().any(|u| u.name == udf.name) { + continue; + } + udfs.push(udf); + } + } + + if !have_any { + return Err(LoaderError::NoDiscovery { path }); + } + + Ok(LoadedLibrary { + path, + library: Arc::new(library), + udfs, + }) +} + +fn read_abi_version(lib: &Library, path: &Path) -> Result { + let sym: Symbol u32> = + unsafe { lib.get(ABI_VERSION_SYMBOL.as_bytes()) } + .map_err(|_| LoaderError::AbiMismatch { + path: path.to_path_buf(), + found: None, + expected: COMET_UDF_ABI_VERSION, + })?; + // SAFETY: comet_udf_abi_version takes no arguments, returns u32, no side effects. + Ok(unsafe { sym() }) +} + +fn read_c_kernels( + lib: &Library, + path: &Path, +) -> Result>, LoaderError> { + let sym: Symbol< + unsafe extern "C" fn(*mut CometCScalarKernelList) -> i32, + > = match unsafe { lib.get(C_ABI_DISCOVERY_SYMBOL.as_bytes()) } { + Ok(s) => s, + Err(_) => return Ok(None), + }; + let mut list = CometCScalarKernelList::default(); + // SAFETY: list is caller-allocated; the cdylib writes into it via `out`. + let rc = unsafe { sym(&mut list) }; + if rc != 0 { + return Err(LoaderError::Discovery { + path: path.to_path_buf(), + reason: format!("{C_ABI_DISCOVERY_SYMBOL} returned rc={rc}"), + }); + } + let mut udfs = Vec::with_capacity(list.len.max(0) as usize); + if !list.kernels.is_null() && list.len > 0 { + // Move each kernel out of the array into a Box so it owns itself. + // We can't simply read each entry because they implement Drop; + // doing it via std::ptr::read transfers ownership cleanly. + let len = list.len as usize; + for i in 0..len { + // SAFETY: the kernel array was produced by the cdylib's + // `comet_c_udf_export!` and contains `len` valid entries. + // We move each entry out into a Box so its Drop runs when + // the host releases the loaded library. + let raw = unsafe { list.kernels.add(i) }; + let kernel = unsafe { std::ptr::read(raw) }; + // Replace the slot with a default kernel (no callbacks) so + // the array's release doesn't double-free. + unsafe { + std::ptr::write( + raw, + comet_udf_sdk::c_abi::CometCScalarKernel::default(), + ); + } + let imported = ImportedCScalarUdf::try_new(Box::new(kernel)).map_err(|e| { + LoaderError::Discovery { + path: path.to_path_buf(), + reason: format!("import C kernel idx={i}: {e}"), + } + })?; + udfs.push(LoadedUdf { + name: imported.name().to_string(), + abi: UdfAbi::C, + udf_impl: Arc::new(imported), + }); + } + } + // list's Drop releases the array storage. + drop(list); + Ok(Some(udfs)) +} + +fn read_df_udfs( + lib: &Library, + path: &Path, +) -> Result>, LoaderError> { + let sym: Symbol i32> = + match unsafe { lib.get(DF_ABI_DISCOVERY_SYMBOL.as_bytes()) } { + Ok(s) => s, + Err(_) => return Ok(None), + }; + let mut list = CometDfUdfList::default(); + // SAFETY: list is caller-allocated; the cdylib writes into it. + let rc = unsafe { sym(&mut list) }; + if rc != 0 { + return Err(LoaderError::Discovery { + path: path.to_path_buf(), + reason: format!("{DF_ABI_DISCOVERY_SYMBOL} returned rc={rc}"), + }); + } + let mut udfs = Vec::with_capacity(list.len.max(0) as usize); + if !list.udfs.is_null() && list.len > 0 { + let len = list.len as usize; + for i in 0..len { + // SAFETY: the FFI_ScalarUDF array was produced by the cdylib's + // `comet_df_udf_export!` and contains `len` valid entries. + let entry: &FFI_ScalarUDF = unsafe { &*list.udfs.add(i) }; + // Convert to ScalarUDFImpl via the canonical From impl; + // the resulting Arc clones the FFI wrapper internally. + let imp: Arc = entry.into(); + udfs.push(LoadedUdf { + name: imp.name().to_string(), + abi: UdfAbi::DataFusion, + udf_impl: imp, + }); + } + } + // list's Drop releases the array storage; each FFI_ScalarUDF entry + // remains valid for the lifetime of the loaded Library, since the + // resulting ScalarUDFImpls cloned the wrappers via the From impl. + drop(list); + Ok(Some(udfs)) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::execution::rust_udf::test_support::test_udfs_path; + + #[test] + fn load_test_udfs_succeeds() { + let lib = load(test_udfs_path()).expect("load"); + let names: Vec<_> = lib.udfs.iter().map(|u| u.name.as_str()).collect(); + assert!(names.contains(&"add_one_c"), "names: {names:?}"); + assert!(names.contains(&"add_one_df"), "names: {names:?}"); + } + + #[test] + fn missing_path_errors_open() { + let err = load("/no/such/path.so").unwrap_err(); + assert!(matches!(err, LoaderError::Open { .. }), "got: {err:?}"); + } + + #[test] + fn c_and_df_abi_have_expected_flavors() { + let lib = load(test_udfs_path()).expect("load"); + let c_udf = lib.udfs.iter().find(|u| u.name == "add_one_c").unwrap(); + let df_udf = lib.udfs.iter().find(|u| u.name == "add_one_df").unwrap(); + assert_eq!(c_udf.abi, UdfAbi::C); + assert_eq!(df_udf.abi, UdfAbi::DataFusion); + } +} diff --git a/native/core/src/execution/rust_udf/mod.rs b/native/core/src/execution/rust_udf/mod.rs new file mode 100644 index 0000000000..9bb1fb345f --- /dev/null +++ b/native/core/src/execution/rust_udf/mod.rs @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Loader and adapters for user-supplied Rust UDF cdylibs registered +//! through `CometRustUDF` on the JVM side. +//! +//! Two ABI flavors are supported, both built only on Arrow's stable FFI +//! (the C Data Interface): +//! +//! - **C ABI** — `comet_c_udf_list_v1` returns sedona-style +//! `CometCScalarKernel` factory structs. Decoupled from datafusion +//! versions; suitable for cross-language (C/C++) extensions. +//! +//! - **datafusion-ffi ABI** — `comet_df_udf_list_v1` returns +//! `FFI_ScalarUDF` values. The user's library inherits the entire +//! `ScalarUDFImpl` surface for free, at the cost of a major-version +//! pin against `datafusion-ffi`. +//! +//! A single library may export either or both. The loader walks both +//! discovery functions and exposes whichever it finds; in the rare case +//! of a name collision the C ABI flavor wins (favoring the +//! version-decoupled implementation). + +pub mod cache; +pub mod imported_c; +pub mod loader; + +#[cfg(test)] +pub(crate) mod test_support; diff --git a/native/core/src/execution/rust_udf/test_support.rs b/native/core/src/execution/rust_udf/test_support.rs new file mode 100644 index 0000000000..a1ea8ac643 --- /dev/null +++ b/native/core/src/execution/rust_udf/test_support.rs @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Test helpers shared across the rust_udf submodules. + +/// Path to the `comet-test-udfs` cdylib, baked in at build time by +/// `core/build.rs`. +pub(crate) fn test_udfs_path() -> std::path::PathBuf { + std::path::PathBuf::from(env!("COMET_TEST_UDFS_LIB")) +} diff --git a/native/core/src/lib.rs b/native/core/src/lib.rs index f4ae0b8834..aaa08e0fc2 100644 --- a/native/core/src/lib.rs +++ b/native/core/src/lib.rs @@ -67,6 +67,7 @@ use errors::{try_unwrap_or_throw, CometError, CometResult}; pub mod execution; pub mod parquet; +pub mod comet_rust_udf_bridge; // this module is for non release only. Intended for debugging/profiling purposes #[cfg(debug_assertions)] pub mod debug; diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto index 90e3d87032..7ff3b0f462 100644 --- a/native/proto/src/proto/expr.proto +++ b/native/proto/src/proto/expr.proto @@ -91,6 +91,7 @@ message Expr { HoursTransform hours_transform = 68; ArraysZip arrays_zip = 69; JvmScalarUdf jvm_scalar_udf = 70; + RustUdfCall rust_udf_call = 71; } // Optional QueryContext for error reporting (contains SQL text and position) @@ -530,3 +531,22 @@ message JvmScalarUdf { // Whether the result column may contain nulls. bool return_nullable = 4; } + +// Call to a user-supplied Rust UDF loaded from a cdylib. +// +// The native side resolves (library_path, name) against its loaded-library +// cache, looks up the kernel by name, and invokes it through whichever ABI +// flavor (C ABI / datafusion-ffi) the cdylib registered the kernel under. +message RustUdfCall { + // Function name as registered through CometRustUDF.register on the JVM + // side; matched against names exposed by the cdylib. + string name = 1; + // Filesystem path of the cdylib. + string library_path = 2; + // Argument expressions, evaluated before invocation. + repeated Expr args = 3; + // Expected return type, declared at register time on the JVM side. + DataType return_type = 4; + // Whether the call is deterministic (mirrors Spark's deterministic flag). + bool deterministic = 5; +} diff --git a/pom.xml b/pom.xml index 7419fecc92..9e83cae5f5 100644 --- a/pom.xml +++ b/pom.xml @@ -917,6 +917,8 @@ under the License. file:src/test/resources/log4j2.properties true ${project.build.directory}/tmp + + ${comet.test.udfs.lib} diff --git a/spark/src/main/java/org/apache/comet/udf/CometRustUdfBridge.java b/spark/src/main/java/org/apache/comet/udf/CometRustUdfBridge.java new file mode 100644 index 0000000000..3d40f7b13a --- /dev/null +++ b/spark/src/main/java/org/apache/comet/udf/CometRustUdfBridge.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.udf; + +import org.apache.comet.NativeBase; + +/** JNI bridge for driver-side Rust UDF library validation. */ +public final class CometRustUdfBridge extends NativeBase { + private CometRustUdfBridge() {} + + /** + * Validate that {@code libraryPath} loads, exposes a UDF named {@code expectedName}, and return a + * JSON description of that UDF. Throws RuntimeException on any error. + * + *

The returned JSON has the form: {@code {"name":"add_one_c","abi":"c-abi"}} + */ + public static native String validateLibrary(String libraryPath, String expectedName); + + /** + * Return a JSON array describing every UDF exposed by {@code libraryPath}. Each element has the + * same shape as the return value of {@link #validateLibrary}. + */ + public static native String listUdfs(String libraryPath); +} diff --git a/spark/src/main/scala/org/apache/comet/serde/CometScalaUDF.scala b/spark/src/main/scala/org/apache/comet/serde/CometScalaUDF.scala index 852e80ae44..2ca0c183cc 100644 --- a/spark/src/main/scala/org/apache/comet/serde/CometScalaUDF.scala +++ b/spark/src/main/scala/org/apache/comet/serde/CometScalaUDF.scala @@ -28,6 +28,7 @@ import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.codegen.CometBatchKernelCodegen import org.apache.comet.serde.ExprOuterClass.Expr import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, serializeDataType} +import org.apache.comet.udf.CometRustUdfRegistry import org.apache.comet.udf.codegen.CometScalaUDFCodegen /** @@ -52,8 +53,42 @@ import org.apache.comet.udf.codegen.CometScalaUDFCodegen */ object CometScalaUDF extends CometExpressionSerde[ScalaUDF] { - override def convert(expr: ScalaUDF, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = - emitJvmCodegenDispatch(expr, inputs, binding) + override def convert(expr: ScalaUDF, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = { + // First check if this udfName is a registered Rust UDF -- those get emitted as RustUdfCall + // and dispatched to the loaded cdylib rather than the JVM codegen dispatcher. + expr.udfName.flatMap(CometRustUdfRegistry.instance.get) match { + case Some(meta) => + emitRustUdfCall(expr, meta.libraryPath, meta.returnType, inputs, binding) + case None => + emitJvmCodegenDispatch(expr, inputs, binding) + } + } + + private def emitRustUdfCall( + expr: ScalaUDF, + libraryPath: String, + returnType: org.apache.spark.sql.types.DataType, + inputs: Seq[Attribute], + binding: Boolean): Option[Expr] = { + val name = expr.udfName.get + val argProtos = expr.children.map(c => exprToProtoInternal(c, inputs, binding)) + if (argProtos.exists(_.isEmpty)) { + withInfo(expr, "one or more Rust UDF arguments are not supported", expr.children: _*) + return None + } + val returnTypeProto = serializeDataType(returnType).getOrElse { + withInfo(expr, s"return type $returnType not serializable", expr) + return None + } + val callBuilder = ExprOuterClass.RustUdfCall + .newBuilder() + .setName(name) + .setLibraryPath(libraryPath) + .setReturnType(returnTypeProto) + .setDeterministic(expr.deterministic) + argProtos.foreach(a => callBuilder.addArgs(a.get)) + Some(ExprOuterClass.Expr.newBuilder().setRustUdfCall(callBuilder.build()).build()) + } /** * Bind `expr`, closure-serialize it, and emit a `JvmScalarUdf` proto routed through diff --git a/spark/src/main/scala/org/apache/comet/udf/CometRustUDF.scala b/spark/src/main/scala/org/apache/comet/udf/CometRustUDF.scala new file mode 100644 index 0000000000..44d41c2e3d --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/udf/CometRustUDF.scala @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.udf + +import scala.util.Try + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.expressions.UserDefinedFunction +import org.apache.spark.sql.functions.udf +import org.apache.spark.sql.types.DataType + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.databind.node.ObjectNode + +/** + * Public entry point for registering Rust scalar UDFs with Comet. + * + * Two ABI flavors are supported transparently -- the cdylib registers each UDF under one of: + * - `c-abi`: pure C / Arrow C Data Interface (sedona-style) + * - `datafusion-ffi`: `FFI_ScalarUDF` + * + * The user always calls `register` / `registerAll`; the native side picks the right ABI by name. + */ +object CometRustUDF { + + /** Spark conf key under which registered Rust UDF entries are propagated to executors. */ + val RUST_UDFS_CONF_KEY = "spark.comet.rustUdfs" + + private val mapper: ObjectMapper = new ObjectMapper() + + /** + * Register a single Rust UDF with an explicit signature. + * + * Validates the library on the driver (loads it, confirms a UDF named `name` exists). On + * success a stub Spark catalog UDF is installed (so SQL/DataFrame name resolution succeeds), + * the driver-side registry is updated, and `spark.comet.rustUdfs` is updated so executors see + * the registration. + */ + def register( + spark: SparkSession, + name: String, + libraryPath: String, + inputTypes: Seq[DataType], + returnType: DataType, + deterministic: Boolean = true): Unit = { + val described = describeOne(libraryPath, name) + require(described.name == name, s"unexpected name from native: ${described.name}") + installCatalogStub(spark, name, inputTypes, returnType, deterministic) + val meta = RustUdfMetadata(libraryPath, inputTypes, returnType, deterministic) + CometRustUdfRegistry.instance.register(name, meta) + propagateConf(spark) + } + + // -------- internals -------- + + private case class Described(name: String, abi: String) + + private def describeOne(libraryPath: String, name: String): Described = { + val json = + invokeBridge(() => CometRustUdfBridge.validateLibrary(libraryPath, name), libraryPath) + parseDescribed(json) + } + + private def invokeBridge(call: () => String, libraryPath: String): String = { + Try(call()).recover { case t: Throwable => throw classifyNativeError(libraryPath, t) }.get + } + + private def parseDescribed(json: String): Described = { + val node = mapper.readTree(json).asInstanceOf[ObjectNode] + Described(name = node.get("name").asText(), abi = node.get("abi").asText()) + } + + private def classifyNativeError(libraryPath: String, t: Throwable): RuntimeException = { + val m = Option(t.getMessage).getOrElse("") + if (m.contains("ABI") || m.contains("missing required symbol") || + m.contains("comet_udf_abi_version") || m.contains("exposes neither")) { + new CometRustUdfAbiException(m) + } else if (m.contains("not found in")) { + new java.util.NoSuchElementException(m) + } else { + new CometRustUdfLoadException(s"failed to load $libraryPath: $m", t) + } + } + + private def installCatalogStub( + spark: SparkSession, + name: String, + inputTypes: Seq[DataType], + returnType: DataType, + deterministic: Boolean): Unit = { + val arity = inputTypes.size + val u: UserDefinedFunction = arity match { + case 0 => + udf(() => throw new CometRustUdfNotEvaluatedException(name), returnType) + case 1 => + udf((_: Any) => throw new CometRustUdfNotEvaluatedException(name), returnType) + case 2 => + udf((_: Any, _: Any) => throw new CometRustUdfNotEvaluatedException(name), returnType) + case 3 => + udf( + (_: Any, _: Any, _: Any) => throw new CometRustUdfNotEvaluatedException(name), + returnType) + case 4 => + udf( + (_: Any, _: Any, _: Any, _: Any) => throw new CometRustUdfNotEvaluatedException(name), + returnType) + case n => + throw new IllegalArgumentException( + s"Rust UDF '$name' arity $n not supported by stub. Reduce arity " + + s"or open a feature request to extend stub coverage.") + } + val finalUdf = if (deterministic) u else u.asNondeterministic() + spark.udf.register(name, finalUdf) + } + + private def propagateConf(spark: SparkSession): Unit = { + val snapshot = CometRustUdfRegistry.instance.snapshot + val entries = snapshot.toSeq.map { case (name, meta) => + mapper.writeValueAsString(java.util.Map.of("name", name, "libraryPath", meta.libraryPath)) + } + spark.conf.set(RUST_UDFS_CONF_KEY, entries.mkString(";")) + } +} diff --git a/spark/src/main/scala/org/apache/comet/udf/CometRustUdfExceptions.scala b/spark/src/main/scala/org/apache/comet/udf/CometRustUdfExceptions.scala new file mode 100644 index 0000000000..3b239aa6a5 --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/udf/CometRustUdfExceptions.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.udf + +import org.apache.comet.{CometNativeException, CometRuntimeException} + +/** Thrown when a Rust UDF dynamic library cannot be opened. */ +class CometRustUdfLoadException(msg: String, cause: Throwable = null) + extends CometNativeException(msg) { + if (cause != null) initCause(cause) +} + +/** + * Thrown when a Rust UDF library exposes the wrong ABI version or is missing required discovery + * symbols. + */ +class CometRustUdfAbiException(msg: String) extends CometNativeException(msg) + +/** + * Thrown when the declared signature does not match what the library reports via + * `comet_*_udf_list_v1`. + */ +class CometRustUdfSignatureException(msg: String) extends CometRuntimeException(msg) + +/** + * Thrown by the catalog stub if a registered Rust UDF is invoked on the JVM (which means Comet's + * plan rule did not replace it). + */ +class CometRustUdfNotEvaluatedException(name: String) + extends CometRuntimeException( + s"Rust UDF '$name' must run inside Comet native execution; the JVM " + + s"stub was invoked, which means Comet did not replace this expression " + + s"with a native call. Check that Comet is enabled for the operator " + + s"hosting this expression.") diff --git a/spark/src/main/scala/org/apache/comet/udf/CometRustUdfRegistry.scala b/spark/src/main/scala/org/apache/comet/udf/CometRustUdfRegistry.scala new file mode 100644 index 0000000000..ecc12ab458 --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/udf/CometRustUdfRegistry.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.udf + +import java.util.concurrent.ConcurrentHashMap + +import scala.jdk.CollectionConverters._ + +import org.apache.spark.sql.types.DataType + +/** Metadata for a registered Rust UDF. */ +case class RustUdfMetadata( + libraryPath: String, + inputTypes: Seq[DataType], + returnType: DataType, + deterministic: Boolean) + +/** + * Driver-side registry of Rust UDFs. Looked up by `QueryPlanSerde` to recognize names that should + * be emitted as `RustUdfCall` instead of attempted as JVM-evaluated `ScalaUDF`s. + */ +class CometRustUdfRegistry { + private val byName = new ConcurrentHashMap[String, RustUdfMetadata]() + + /** Register or replace metadata for a name. */ + def register(name: String, meta: RustUdfMetadata): Unit = + byName.put(name, meta) + + /** Return metadata for a name, if registered. */ + def get(name: String): Option[RustUdfMetadata] = + Option(byName.get(name)) + + /** Snapshot the registered set as an immutable Map. */ + def snapshot: Map[String, RustUdfMetadata] = + byName.asScala.toMap +} + +object CometRustUdfRegistry { + + /** Process-wide singleton. */ + lazy val instance: CometRustUdfRegistry = new CometRustUdfRegistry +} diff --git a/spark/src/test/scala/org/apache/comet/CometRustUdfSuite.scala b/spark/src/test/scala/org/apache/comet/CometRustUdfSuite.scala new file mode 100644 index 0000000000..fd4f8553a3 --- /dev/null +++ b/spark/src/test/scala/org/apache/comet/CometRustUdfSuite.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet + +import org.apache.spark.sql.CometTestBase +import org.apache.spark.sql.types.LongType + +import org.apache.comet.udf.CometRustUDF + +/** + * End-to-end integration suite: register a Rust UDF, run a Spark query, verify the result. + * + * Exercises both ABI flavors that the test cdylib exports: + * - `add_one_c` via the pure C ABI (Arrow C Data Interface only) + * - `add_one_df` via datafusion-ffi (`FFI_ScalarUDF`) + * + * Requires the test cdylib at the path given by the system property `comet.test.udfs.lib`. + * + * To run locally: + * {{{ + * cargo build -p comet-test-udfs --manifest-path native/Cargo.toml + * ./mvnw test -pl spark -DwildcardSuites="CometRustUdfSuite" \ + * -Dcomet.test.udfs.lib=$PWD/native/target/debug/libcomet_test_udfs.dylib + * }}} + */ +class CometRustUdfSuite extends CometTestBase { + + private lazy val libPath: String = { + val p = System.getProperty("comet.test.udfs.lib") + if (p == null) { + cancel("set -Dcomet.test.udfs.lib=; skipping without cdylib") + } + p + } + + test("C ABI: add_one_c returns id + 1 for a range") { + CometRustUDF.register(spark, "add_one_c", libPath, Seq(LongType), LongType) + val df = spark.range(0, 5).selectExpr("add_one_c(id) AS y") + val out = df.collect().map(_.getLong(0)).sorted.toSeq + assert(out == Seq(1L, 2L, 3L, 4L, 5L)) + } + + test("datafusion-ffi: add_one_df returns id + 1 for a range") { + CometRustUDF.register(spark, "add_one_df", libPath, Seq(LongType), LongType) + val df = spark.range(0, 5).selectExpr("add_one_df(id) AS y") + val out = df.collect().map(_.getLong(0)).sorted.toSeq + assert(out == Seq(1L, 2L, 3L, 4L, 5L)) + } +}