From 768b3e90f261c7aea58bdb98dc698b90deeeae34 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Sun, 14 Dec 2025 16:24:01 +0400 Subject: [PATCH 1/5] impl map_from_entries --- native/core/src/execution/jni_api.rs | 2 + .../apache/comet/serde/QueryPlanSerde.scala | 3 +- .../scala/org/apache/comet/serde/maps.scala | 29 +++++++++++- .../comet/CometMapExpressionSuite.scala | 45 +++++++++++++++++++ 4 files changed, 77 insertions(+), 2 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index a24d993059..4f53cea3e6 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -46,6 +46,7 @@ use datafusion_spark::function::datetime::date_add::SparkDateAdd; use datafusion_spark::function::datetime::date_sub::SparkDateSub; use datafusion_spark::function::hash::sha1::SparkSha1; use datafusion_spark::function::hash::sha2::SparkSha2; +use datafusion_spark::function::map::map_from_entries::MapFromEntries; use datafusion_spark::function::math::expm1::SparkExpm1; use datafusion_spark::function::string::char::CharFunc; use datafusion_spark::function::string::concat::SparkConcat; @@ -337,6 +338,7 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) { session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha1::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkConcat::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitwiseNot::default())); + session_ctx.register_udf(ScalarUDF::new_from_impl(MapFromEntries::default())); } /// Prepares arrow arrays for output. diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 54df2f1688..a99cf3824b 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -125,7 +125,8 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[MapKeys] -> CometMapKeys, classOf[MapEntries] -> CometMapEntries, classOf[MapValues] -> CometMapValues, - classOf[MapFromArrays] -> CometMapFromArrays) + classOf[MapFromArrays] -> CometMapFromArrays, + classOf[MapFromEntries] -> CometMapFromEntries) private val structExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[CreateNamedStruct] -> CometCreateNamedStruct, diff --git a/spark/src/main/scala/org/apache/comet/serde/maps.scala b/spark/src/main/scala/org/apache/comet/serde/maps.scala index 2e217f6af0..498aa3594c 100644 --- a/spark/src/main/scala/org/apache/comet/serde/maps.scala +++ b/spark/src/main/scala/org/apache/comet/serde/maps.scala @@ -19,9 +19,12 @@ package org.apache.comet.serde +import scala.annotation.tailrec + import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{ArrayType, MapType} +import org.apache.spark.sql.types.{ArrayType, BinaryType, DataType, MapType, StructType} +import org.apache.comet.serde.CometArrayReverse.containsBinary import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType} object CometMapKeys extends CometExpressionSerde[MapKeys] { @@ -89,3 +92,27 @@ object CometMapFromArrays extends CometExpressionSerde[MapFromArrays] { optExprWithInfo(mapFromArraysExpr, expr, expr.children: _*) } } + +object CometMapFromEntries extends CometScalarFunction[MapFromEntries]("map_from_entries") { + val keyUnsupportedReason = "Using BinaryType as Map keys is not allowed in map_from_entries" + val valueUnsupportedReason = "Using BinaryType as Map values is not allowed in map_from_entries" + + private def containsBinary(dataType: DataType): Boolean = { + dataType match { + case BinaryType => true + case StructType(fields) => fields.exists(field => containsBinary(field.dataType)) + case ArrayType(elementType, _) => containsBinary(elementType) + case _ => false + } + } + + override def getSupportLevel(expr: MapFromEntries): SupportLevel = { + if (containsBinary(expr.dataType.keyType)) { + return Incompatible(Some(keyUnsupportedReason)) + } + if (containsBinary(expr.dataType.valueType)) { + return Incompatible(Some(valueUnsupportedReason)) + } + Compatible(None) + } +} diff --git a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala index 88c13391a6..01b9744ed6 100644 --- a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala @@ -25,7 +25,9 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.BinaryType +import org.apache.comet.serde.CometMapFromEntries import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions} class CometMapExpressionSuite extends CometTestBase { @@ -125,4 +127,47 @@ class CometMapExpressionSuite extends CometTestBase { } } + test("map_from_entries") { + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + val filename = path.toString + val random = new Random(42) + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + val schemaGenOptions = + SchemaGenOptions( + generateArray = true, + generateStruct = true, + primitiveTypes = SchemaGenOptions.defaultPrimitiveTypes.filterNot(_ == BinaryType)) + val dataGenOptions = DataGenOptions(allowNull = false, generateNegativeZero = false) + ParquetGenerator.makeParquetFile( + random, + spark, + filename, + 100, + schemaGenOptions, + dataGenOptions) + } + val df = spark.read.parquet(filename) + df.createOrReplaceTempView("t1") + for (field <- df.schema.fieldNames) { + checkSparkAnswerAndOperator( + spark.sql(s"SELECT map_from_entries(array(struct($field as a, $field as b))) FROM t1")) + } + } + } + + test("map_from_entries - fallback for binary type") { + val table = "t2" + withTable(table) { + sql( + s"create table $table using parquet as select cast(array() as array) as c1 from range(10)") + checkSparkAnswerAndFallbackReason( + sql(s"select map_from_entries(array(struct(c1, 0))) from $table"), + CometMapFromEntries.keyUnsupportedReason) + checkSparkAnswerAndFallbackReason( + sql(s"select map_from_entries(array(struct(0, c1))) from $table"), + CometMapFromEntries.valueUnsupportedReason) + } + } + } From c68c3428676b5d991e7ba9e13464bf2ce1ec84e8 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Tue, 16 Dec 2025 16:10:43 +0400 Subject: [PATCH 2/5] Revert "impl map_from_entries" This reverts commit 768b3e90f261c7aea58bdb98dc698b90deeeae34. --- native/core/src/execution/jni_api.rs | 2 - .../apache/comet/serde/QueryPlanSerde.scala | 3 +- .../scala/org/apache/comet/serde/maps.scala | 29 +----------- .../comet/CometMapExpressionSuite.scala | 45 ------------------- 4 files changed, 2 insertions(+), 77 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 4f53cea3e6..a24d993059 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -46,7 +46,6 @@ use datafusion_spark::function::datetime::date_add::SparkDateAdd; use datafusion_spark::function::datetime::date_sub::SparkDateSub; use datafusion_spark::function::hash::sha1::SparkSha1; use datafusion_spark::function::hash::sha2::SparkSha2; -use datafusion_spark::function::map::map_from_entries::MapFromEntries; use datafusion_spark::function::math::expm1::SparkExpm1; use datafusion_spark::function::string::char::CharFunc; use datafusion_spark::function::string::concat::SparkConcat; @@ -338,7 +337,6 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) { session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha1::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkConcat::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitwiseNot::default())); - session_ctx.register_udf(ScalarUDF::new_from_impl(MapFromEntries::default())); } /// Prepares arrow arrays for output. diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index a99cf3824b..54df2f1688 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -125,8 +125,7 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[MapKeys] -> CometMapKeys, classOf[MapEntries] -> CometMapEntries, classOf[MapValues] -> CometMapValues, - classOf[MapFromArrays] -> CometMapFromArrays, - classOf[MapFromEntries] -> CometMapFromEntries) + classOf[MapFromArrays] -> CometMapFromArrays) private val structExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[CreateNamedStruct] -> CometCreateNamedStruct, diff --git a/spark/src/main/scala/org/apache/comet/serde/maps.scala b/spark/src/main/scala/org/apache/comet/serde/maps.scala index 498aa3594c..2e217f6af0 100644 --- a/spark/src/main/scala/org/apache/comet/serde/maps.scala +++ b/spark/src/main/scala/org/apache/comet/serde/maps.scala @@ -19,12 +19,9 @@ package org.apache.comet.serde -import scala.annotation.tailrec - import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{ArrayType, BinaryType, DataType, MapType, StructType} +import org.apache.spark.sql.types.{ArrayType, MapType} -import org.apache.comet.serde.CometArrayReverse.containsBinary import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType} object CometMapKeys extends CometExpressionSerde[MapKeys] { @@ -92,27 +89,3 @@ object CometMapFromArrays extends CometExpressionSerde[MapFromArrays] { optExprWithInfo(mapFromArraysExpr, expr, expr.children: _*) } } - -object CometMapFromEntries extends CometScalarFunction[MapFromEntries]("map_from_entries") { - val keyUnsupportedReason = "Using BinaryType as Map keys is not allowed in map_from_entries" - val valueUnsupportedReason = "Using BinaryType as Map values is not allowed in map_from_entries" - - private def containsBinary(dataType: DataType): Boolean = { - dataType match { - case BinaryType => true - case StructType(fields) => fields.exists(field => containsBinary(field.dataType)) - case ArrayType(elementType, _) => containsBinary(elementType) - case _ => false - } - } - - override def getSupportLevel(expr: MapFromEntries): SupportLevel = { - if (containsBinary(expr.dataType.keyType)) { - return Incompatible(Some(keyUnsupportedReason)) - } - if (containsBinary(expr.dataType.valueType)) { - return Incompatible(Some(valueUnsupportedReason)) - } - Compatible(None) - } -} diff --git a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala index 01b9744ed6..88c13391a6 100644 --- a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala @@ -25,9 +25,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.BinaryType -import org.apache.comet.serde.CometMapFromEntries import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions} class CometMapExpressionSuite extends CometTestBase { @@ -127,47 +125,4 @@ class CometMapExpressionSuite extends CometTestBase { } } - test("map_from_entries") { - withTempDir { dir => - val path = new Path(dir.toURI.toString, "test.parquet") - val filename = path.toString - val random = new Random(42) - withSQLConf(CometConf.COMET_ENABLED.key -> "false") { - val schemaGenOptions = - SchemaGenOptions( - generateArray = true, - generateStruct = true, - primitiveTypes = SchemaGenOptions.defaultPrimitiveTypes.filterNot(_ == BinaryType)) - val dataGenOptions = DataGenOptions(allowNull = false, generateNegativeZero = false) - ParquetGenerator.makeParquetFile( - random, - spark, - filename, - 100, - schemaGenOptions, - dataGenOptions) - } - val df = spark.read.parquet(filename) - df.createOrReplaceTempView("t1") - for (field <- df.schema.fieldNames) { - checkSparkAnswerAndOperator( - spark.sql(s"SELECT map_from_entries(array(struct($field as a, $field as b))) FROM t1")) - } - } - } - - test("map_from_entries - fallback for binary type") { - val table = "t2" - withTable(table) { - sql( - s"create table $table using parquet as select cast(array() as array) as c1 from range(10)") - checkSparkAnswerAndFallbackReason( - sql(s"select map_from_entries(array(struct(c1, 0))) from $table"), - CometMapFromEntries.keyUnsupportedReason) - checkSparkAnswerAndFallbackReason( - sql(s"select map_from_entries(array(struct(0, c1))) from $table"), - CometMapFromEntries.valueUnsupportedReason) - } - } - } From 9da0edb7f4e7801611bd7167b896cb9f0801a59f Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Sat, 2 May 2026 21:36:01 +0400 Subject: [PATCH 3/5] WIP --- native/Cargo.lock | 53 +++++++++++++++++++ native/core/Cargo.toml | 2 +- .../src/execution/operators/parquet_writer.rs | 4 +- native/core/src/parquet/parquet_support.rs | 15 ++++++ .../operator/CometDataWritingCommand.scala | 12 +++-- 5 files changed, 81 insertions(+), 5 deletions(-) diff --git a/native/Cargo.lock b/native/Cargo.lock index b7416c3bbe..02c12ac3b6 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -4635,6 +4635,7 @@ dependencies = [ "opendal-layer-retry", "opendal-layer-timeout", "opendal-service-hdfs", + "opendal-service-s3", ] [[package]] @@ -4717,6 +4718,26 @@ dependencies = [ "tokio", ] +[[package]] +name = "opendal-service-s3" +version = "0.55.0" +source = "git+https://github.com/apache/opendal?rev=6909efcdfd12b3b2ac3a76f654c35ee576811512#6909efcdfd12b3b2ac3a76f654c35ee576811512" +dependencies = [ + "base64", + "bytes", + "crc32c", + "http 1.4.0", + "log", + "md-5", + "opendal-core", + "quick-xml 0.38.4", + "reqsign-aws-v4", + "reqsign-core", + "reqsign-file-read-tokio", + "serde", + "url", +] + [[package]] name = "openssl-probe" version = "0.2.1" @@ -5558,6 +5579,27 @@ dependencies = [ "tokio", ] +[[package]] +name = "reqsign-aws-v4" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44eaca382e94505a49f1a4849658d153aebf79d9c1a58e5dd3b10361511e9f43" +dependencies = [ + "anyhow", + "bytes", + "form_urlencoded", + "http 1.4.0", + "log", + "percent-encoding", + "quick-xml 0.39.2", + "reqsign-core", + "rust-ini", + "serde", + "serde_json", + "serde_urlencoded", + "sha1", +] + [[package]] name = "reqsign-core" version = "3.0.0" @@ -5580,6 +5622,17 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "reqsign-file-read-tokio" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2d89295b3d17abea31851cc8de55d843d89c52132c864963c38d41920613dc5" +dependencies = [ + "anyhow", + "reqsign-core", + "tokio", +] + [[package]] name = "reqwest" version = "0.12.28" diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index d54a03b7b6..3a26a7b72d 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -72,7 +72,7 @@ datafusion-comet-objectstore-hdfs = { path = "../hdfs", optional = true, default reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-native-roots", "http2"] } object_store_opendal = { git = "https://github.com/apache/opendal", rev = "6909efcdfd12b3b2ac3a76f654c35ee576811512", package = "object_store_opendal", optional = true} hdfs-sys = {version = "0.3", optional = true, features = ["hdfs_3_3"]} -opendal = { git = "https://github.com/apache/opendal", rev = "6909efcdfd12b3b2ac3a76f654c35ee576811512", optional = true, features = ["services-hdfs"] } +opendal = { git = "https://github.com/apache/opendal", rev = "6909efcdfd12b3b2ac3a76f654c35ee576811512", optional = true, features = ["services-hdfs", "services-s3"] } iceberg = { workspace = true } iceberg-storage-opendal = { workspace = true } serde_json = "1.0" diff --git a/native/core/src/execution/operators/parquet_writer.rs b/native/core/src/execution/operators/parquet_writer.rs index 8ba79098d4..284095e113 100644 --- a/native/core/src/execution/operators/parquet_writer.rs +++ b/native/core/src/execution/operators/parquet_writer.rs @@ -32,7 +32,7 @@ use opendal::Operator; use std::io::Cursor; use crate::execution::shuffle::CompressionCodec; -use crate::parquet::parquet_support::is_hdfs_scheme; +use crate::parquet::parquet_support::{is_hdfs_scheme, is_s3_scheme}; #[cfg(feature = "hdfs-opendal")] use crate::parquet::parquet_support::{create_hdfs_operator, prepare_object_store_with_configs}; use arrow::datatypes::{Schema, SchemaRef}; @@ -335,6 +335,8 @@ impl ParquetWriterExec { "HDFS support is not enabled. Rebuild with the 'hdfs-opendal' feature.".into(), )) } + } else if is_s3_scheme(&url) { + } else if output_file_path.starts_with("file://") || output_file_path.starts_with("file:") || !output_file_path.contains("://") diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 3418a17c43..7ee2c2de33 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -372,6 +372,11 @@ pub fn is_hdfs_scheme(url: &Url, object_store_configs: &HashMap) } } +pub fn is_s3_scheme(url: &Url) -> bool { + let scheme = url.scheme(); + scheme == "s3a" +} + // Creates an HDFS object store from a URL using the native HDFS implementation #[cfg(all(feature = "hdfs", not(feature = "hdfs-opendal")))] fn create_hdfs_object_store( @@ -404,6 +409,16 @@ pub(crate) fn create_hdfs_operator(url: &Url) -> Result Result { + let builder = opendal::services::S3::default(); + opendal::Operator::new(builder) + .map_err(|error| object_store::Error::Generic { + store: "hdfs-opendal", + source: error.into(), + }) + .map(|op| op.finish()) +} + // Creates an HDFS object store from a URL using OpenDAL #[cfg(feature = "hdfs-opendal")] pub(crate) fn create_hdfs_object_store( diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala index 69b9bd5f85..b600740cb0 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala @@ -46,6 +46,8 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec private val supportedCompressionCodes = Set("none", "snappy", "lz4", "zstd") + private val supportedFilesystemProtocols = Set("file", "hdfs", "s3a") + override def enabledConfig: Option[ConfigEntry[Boolean]] = Some(CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED) @@ -58,9 +60,8 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec case cmd: InsertIntoHadoopFsRelationCommand => cmd.fileFormat match { case _: ParquetFileFormat => - if (!cmd.outputPath.toString.startsWith("file:") && !cmd.outputPath.toString - .startsWith("hdfs:")) { - return Unsupported(Some("Supported output filesystems: local, HDFS")) + if (!isSupportedFilesystemProtocol(cmd.outputPath.toString)) { + return Unsupported(Some("Supported output filesystems: local, HDFS, S3")) } if (cmd.bucketSpec.isDefined) { @@ -204,6 +205,11 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec CometNativeWriteExec(nativeOp, childPlan, outputPath, committer, jobId) } + private def isSupportedFilesystemProtocol(outputPath: String) = { + supportedFilesystemProtocols + .exists(protocol => outputPath.startsWith(s"${protocol}:")) + } + private def parseCompressionCodec(cmd: InsertIntoHadoopFsRelationCommand) = { cmd.options .getOrElse( From f2bce23062d47b56e7c64cf66ab3fbf876a0fdd4 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Tue, 5 May 2026 22:01:06 +0400 Subject: [PATCH 4/5] work --- .../src/execution/operators/parquet_writer.rs | 37 ++++++++++++++++++- native/core/src/parquet/parquet_support.rs | 4 +- 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/native/core/src/execution/operators/parquet_writer.rs b/native/core/src/execution/operators/parquet_writer.rs index 284095e113..fe51dd4a8b 100644 --- a/native/core/src/execution/operators/parquet_writer.rs +++ b/native/core/src/execution/operators/parquet_writer.rs @@ -34,7 +34,7 @@ use std::io::Cursor; use crate::execution::shuffle::CompressionCodec; use crate::parquet::parquet_support::{is_hdfs_scheme, is_s3_scheme}; #[cfg(feature = "hdfs-opendal")] -use crate::parquet::parquet_support::{create_hdfs_operator, prepare_object_store_with_configs}; +use crate::parquet::parquet_support::{create_hdfs_operator, prepare_object_store_with_configs, create_s3_operator}; use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use async_trait::async_trait; @@ -336,7 +336,42 @@ impl ParquetWriterExec { )) } } else if is_s3_scheme(&url) { + let (_object_store_url, object_store_path) = prepare_object_store_with_configs( + _runtime_env, + output_file_path.to_string(), + object_store_options, + ) + .map_err(|e| { + DataFusionError::Execution(format!( + "Failed to prepare object store for '{}': {}", + output_file_path, e + )) + })?; + // For remote storage (HDFS, S3), write to an in-memory buffer + let buffer = Vec::new(); + let cursor = Cursor::new(buffer); + let arrow_parquet_buffer_writer = ArrowWriter::try_new(cursor, schema, Some(props)) + .map_err(|e| { + DataFusionError::Execution(format!("Failed to create S3 writer: {}", e)) + })?; + + // Create S3 operator with configuration options using the helper function + let op = create_s3_operator(&url).map_err(|e| { + DataFusionError::Execution(format!( + "Failed to create S3 operator for '{}': {}", + output_file_path, e + )) + })?; + + // HDFS writer will be created lazily on first write + // Use the path from prepare_object_store_with_configs + Ok(ParquetWriter::Remote( + arrow_parquet_buffer_writer, + None, + op, + object_store_path.to_string(), + )) } else if output_file_path.starts_with("file://") || output_file_path.starts_with("file:") || !output_file_path.contains("://") diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 7ee2c2de33..51a68cf271 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -409,11 +409,11 @@ pub(crate) fn create_hdfs_operator(url: &Url) -> Result Result { +pub(crate) fn create_s3_operator(_url: &Url) -> Result { let builder = opendal::services::S3::default(); opendal::Operator::new(builder) .map_err(|error| object_store::Error::Generic { - store: "hdfs-opendal", + store: "s3-opendal", source: error.into(), }) .map(|op| op.finish()) From 2b2acb1ebfe540292881a60814c18381c03642df Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Sun, 31 May 2026 19:29:39 +0400 Subject: [PATCH 5/5] Complete native s3 write draft feature --- .../src/execution/operators/parquet_writer.rs | 607 +++++++++--------- native/core/src/parquet/parquet_support.rs | 15 - 2 files changed, 310 insertions(+), 312 deletions(-) diff --git a/native/core/src/execution/operators/parquet_writer.rs b/native/core/src/execution/operators/parquet_writer.rs index fe51dd4a8b..4edd5617d3 100644 --- a/native/core/src/execution/operators/parquet_writer.rs +++ b/native/core/src/execution/operators/parquet_writer.rs @@ -17,27 +17,14 @@ //! Parquet writer operator for writing RecordBatches to Parquet files -use std::{ - any::Any, - collections::HashMap, - fmt, - fmt::{Debug, Formatter}, - fs::File, - sync::Arc, -}; - -#[cfg(feature = "hdfs-opendal")] -use opendal::Operator; -#[cfg(feature = "hdfs-opendal")] -use std::io::Cursor; - use crate::execution::shuffle::CompressionCodec; -use crate::parquet::parquet_support::{is_hdfs_scheme, is_s3_scheme}; +use crate::parquet::parquet_support::is_hdfs_scheme; #[cfg(feature = "hdfs-opendal")] -use crate::parquet::parquet_support::{create_hdfs_operator, prepare_object_store_with_configs, create_s3_operator}; +use crate::parquet::parquet_support::{create_hdfs_operator, prepare_object_store_with_configs}; use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use async_trait::async_trait; +use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::{ error::{DataFusionError, Result}, execution::context::TaskContext, @@ -51,141 +38,325 @@ use datafusion::{ }, }; use futures::TryStreamExt; +#[cfg(feature = "hdfs-opendal")] +use opendal::Operator; +use parquet::errors::ParquetError; use parquet::{ arrow::ArrowWriter, basic::{Compression, ZstdLevel}, file::properties::WriterProperties, }; +use std::fs::create_dir_all; +#[cfg(feature = "hdfs-opendal")] +use std::io::Cursor; +use std::path::Path; +use std::{ + any::Any, + collections::HashMap, + fmt, + fmt::{Debug, Formatter}, + fs::File, + sync::Arc, +}; use url::Url; -/// Enum representing different types of Arrow writers based on storage backend -enum ParquetWriter { - /// Writer for local file system - LocalFile(ArrowWriter), - /// Writer for HDFS or other remote storage (writes to in-memory buffer) - /// Contains the arrow writer, HDFS operator, and destination path - /// an Arrow writer writes to in-memory buffer the data converted to Parquet format - /// The opendal::Writer is created lazily on first write - #[cfg(feature = "hdfs-opendal")] - Remote( - ArrowWriter>>, - Option, - Operator, - String, - ), +// A trait abstracting over different Parquet write targets (local filesystem, HDFS, S3, etc.) +#[async_trait] +trait ParquetWriter: Send { + async fn write_batch(&mut self, batch: &RecordBatch) -> Result<(), ParquetError>; + async fn close(self: Box) -> Result<(), ParquetError>; } -impl ParquetWriter { - /// Write a RecordBatch to the underlying writer - async fn write( - &mut self, - batch: &RecordBatch, - ) -> std::result::Result<(), parquet::errors::ParquetError> { - match self { - ParquetWriter::LocalFile(writer) => writer.write(batch), - #[cfg(feature = "hdfs-opendal")] - ParquetWriter::Remote( - arrow_parquet_buffer_writer, - hdfs_writer_opt, - op, - output_path, - ) => { - // Write batch to in-memory buffer - arrow_parquet_buffer_writer.write(batch)?; - - // Flush and get the current buffer content - arrow_parquet_buffer_writer.flush()?; - let cursor = arrow_parquet_buffer_writer.inner_mut(); - let current_data = cursor.get_ref().clone(); - - // Create HDFS writer lazily on first write - if hdfs_writer_opt.is_none() { - let writer = op.writer(output_path.as_str()).await.map_err(|e| { - parquet::errors::ParquetError::External( - format!("Failed to create HDFS writer for '{}': {}", output_path, e) - .into(), - ) - })?; - *hdfs_writer_opt = Some(writer); - } - - // Write the accumulated data to HDFS - if let Some(hdfs_writer) = hdfs_writer_opt { - hdfs_writer.write(current_data).await.map_err(|e| { - parquet::errors::ParquetError::External( - format!( - "Failed to write batch to HDFS file '{}': {}", - output_path, e - ) - .into(), - ) - })?; - } +// A `ParquetWriter` implementation that writes directly to a local file. +struct LocalFileWriter { + writer: ArrowWriter, +} - // Clear the buffer after upload - cursor.get_mut().clear(); - cursor.set_position(0); +impl LocalFileWriter { + fn try_new(path: &str, schema: SchemaRef, props: WriterProperties) -> Result { + let local_path = path + .strip_prefix("file://") + .or_else(|| path.strip_prefix("file:")) + .unwrap_or(path); - Ok(()) - } + let output_dir = Path::new(local_path).parent().ok_or_else(|| { + DataFusionError::Execution(format!( + "Failed to extract parent directory from path '{local_path}'" + )) + })?; + + create_dir_all(output_dir).map_err(|e| { + DataFusionError::Execution(format!( + "Failed to create output directory '{}': {e}", + output_dir.display() + )) + })?; + + let file = File::create(local_path).map_err(|e| { + DataFusionError::Execution(format!("Failed to create output file '{local_path}': {e}")) + })?; + + let writer = ArrowWriter::try_new(file, schema, Some(props)).map_err(|e| { + DataFusionError::Execution(format!("Failed to create local writer: {e}")) + })?; + + Ok(Self { writer }) + } +} + +#[async_trait] +impl ParquetWriter for LocalFileWriter { + async fn write_batch(&mut self, batch: &RecordBatch) -> Result<(), ParquetError> { + self.writer.write(batch) + } + + async fn close(mut self: Box) -> Result<(), ParquetError> { + self.writer.close()?; + Ok(()) + } +} + +// A `ParquetWriter` implementation that streams Parquet data to remote object storage +// (HDFS, S3, etc.) via the OpenDAL abstraction layer. +struct OpendalWriter { + arrow_writer: ArrowWriter>>, + opendal_writer: Option, + operator: Operator, + path: String, +} + +impl OpendalWriter { + fn try_new( + operator: Operator, + path: String, + schema: SchemaRef, + props: WriterProperties, + ) -> Result { + let cursor = Cursor::new(Vec::new()); + let arrow_writer = ArrowWriter::try_new(cursor, schema, Some(props)).map_err(|e| { + DataFusionError::Execution(format!("Failed to create OpenDAL arrow writer: {e}")) + })?; + Ok(Self { + arrow_writer, + opendal_writer: None, + operator, + path, + }) + } +} + +#[async_trait] +impl ParquetWriter for OpendalWriter { + async fn write_batch(&mut self, batch: &RecordBatch) -> Result<(), ParquetError> { + self.arrow_writer.write(batch)?; + self.arrow_writer.flush()?; + + let cursor = self.arrow_writer.inner_mut(); + let data = cursor.get_ref().clone(); + + if data.is_empty() { + return Ok(()); + } + + if self.opendal_writer.is_none() { + let writer = self.operator.writer(&self.path).await.map_err(|e| { + ParquetError::External( + format!("Failed to create OpenDAL writer for '{}': {e}", self.path).into(), + ) + })?; + self.opendal_writer = Some(writer); + } + + if let Some(w) = &mut self.opendal_writer { + w.write(data).await.map_err(|e| { + ParquetError::External(format!("Failed to write to '{}': {e}", self.path).into()) + })?; } + + let cursor = self.arrow_writer.inner_mut(); + cursor.get_mut().clear(); + cursor.set_position(0); + + Ok(()) } - /// Close the writer and finalize the file - async fn close(self) -> std::result::Result<(), parquet::errors::ParquetError> { - match self { - ParquetWriter::LocalFile(writer) => { - writer.close()?; - Ok(()) - } - #[cfg(feature = "hdfs-opendal")] - ParquetWriter::Remote( - arrow_parquet_buffer_writer, - mut hdfs_writer_opt, - op, - output_path, - ) => { - // Close the arrow writer to finalize parquet format - let cursor = arrow_parquet_buffer_writer.into_inner()?; - let final_data = cursor.into_inner(); - - // Create HDFS writer if not already created - if hdfs_writer_opt.is_none() && !final_data.is_empty() { - let writer = op.writer(output_path.as_str()).await.map_err(|e| { - parquet::errors::ParquetError::External( - format!("Failed to create HDFS writer for '{}': {}", output_path, e) - .into(), + async fn close(mut self: Box) -> Result<(), ParquetError> { + let cursor = self.arrow_writer.into_inner()?; + let final_data = cursor.into_inner(); + + if self.opendal_writer.is_none() && !final_data.is_empty() { + let writer = self.operator.writer(&self.path).await.map_err(|e| { + ParquetError::External( + format!("Failed to create OpenDAL writer for '{}': {e}", self.path).into(), + ) + })?; + self.opendal_writer = Some(writer); + } + + if !final_data.is_empty() { + if let Some(mut writer) = self.opendal_writer { + writer.write(final_data).await.map_err(|e| { + ParquetError::External( + format!( + "Failed to write final data to HDFS file '{}': {}", + self.path, e ) - })?; - hdfs_writer_opt = Some(writer); - } - - // Write any remaining data - if !final_data.is_empty() { - if let Some(mut hdfs_writer) = hdfs_writer_opt { - hdfs_writer.write(final_data).await.map_err(|e| { - parquet::errors::ParquetError::External( - format!( - "Failed to write final data to HDFS file '{}': {}", - output_path, e - ) - .into(), - ) - })?; - - // Close the HDFS writer - hdfs_writer.close().await.map_err(|e| { - parquet::errors::ParquetError::External( - format!("Failed to close HDFS writer for '{}': {}", output_path, e) - .into(), - ) - })?; - } - } - - Ok(()) + .into(), + ) + })?; + + writer.close().await.map_err(|e| { + ParquetError::External( + format!("Failed to close HDFS writer for '{}': {}", self.path, e).into(), + ) + })?; } } + + Ok(()) + } +} + +// A factory that inspects the destination URL and produces the appropriate +// `ParquetWriter` implementation for the target storage backend. +struct StorageWriterFactory; + +impl StorageWriterFactory { + // Selects and constructs a `ParquetWriter` based on the URL scheme of `output_path`. + // Supported backends: + // - **HDFS** – detected via `is_hdfs_scheme`; backed by `OpendalWriter`. + // - **S3 / S3A** – detected by scheme; backed by `OpendalWriter`. + // - **Local filesystem** – `file://`, `file:`, or a bare path; backed by `LocalFileWriter`. + fn create( + output_path: &str, + schema: SchemaRef, + props: WriterProperties, + runtime_env: Arc, + object_store_options: &HashMap, + ) -> Result> { + let (_, object_store_path) = prepare_object_store_with_configs( + runtime_env, + output_path.to_string(), + &HashMap::new(), + ) + .map_err(|e| { + DataFusionError::Execution(format!( + "Failed to prepare object store for '{output_path}': {e}" + )) + })?; + + let url = Url::parse(output_path).map_err(|e| { + DataFusionError::Execution(format!("Failed to parse URL '{output_path}': {e}")) + })?; + + if is_hdfs_scheme(&url, object_store_options) { + Self::create_hdfs_parquet_writer(schema, props, &object_store_path.to_string(), &url) + } else if Self::is_s3_scheme(&url) { + Self::create_s3_parquet_writer( + schema, + props, + &object_store_path.to_string(), + &url, + object_store_options, + ) + } else if Self::is_local_path(output_path) { + Ok(Box::new(LocalFileWriter::try_new( + output_path, + schema, + props, + )?)) + } else { + Err(DataFusionError::Execution(format!( + "Unsupported storage scheme in path: {output_path}" + ))) + } + } + + fn create_hdfs_parquet_writer( + schema: SchemaRef, + props: WriterProperties, + object_store_path: &String, + url: &Url, + ) -> Result> { + let url_str = url.as_str(); + let op = create_hdfs_operator(url).map_err(|e| { + DataFusionError::Execution(format!( + "Failed to create HDFS operator for '{url_str}': {e}" + )) + })?; + Ok(Box::new(OpendalWriter::try_new( + op, + object_store_path.into(), + schema, + props, + )?)) + } + + fn create_s3_parquet_writer( + schema: SchemaRef, + props: WriterProperties, + object_store_path: &String, + url: &Url, + object_store_options: &HashMap, + ) -> Result> { + let url_str = url.as_str(); + let access_key = object_store_options.get("fs.s3a.access.key"); + if access_key.is_none() { + return Err(DataFusionError::Execution( + "Missing required S3 access key: fs.s3a.access.key".to_string(), + )); + } + let secret_key = object_store_options.get("fs.s3a.secret.key"); + if secret_key.is_none() { + return Err(DataFusionError::Execution( + "Missing required S3 access key: fs.s3a.secret.key".to_string(), + )); + } + let endpoint = object_store_options.get("fs.s3a.endpoint"); + if endpoint.is_none() { + return Err(DataFusionError::Execution( + "Missing required S3 access key: fs.s3a.endpoint".to_string(), + )); + } + let region = object_store_options.get("fs.s3a.endpoint.region"); + if region.is_none() { + //todo try extract region from fs.s3a.endpoint + return Err(DataFusionError::Execution( + "Missing required S3 access key: fs.s3a.endpoint.region".to_string(), + )); + } + let bucket_name = url.host_str().unwrap(); + let builder = opendal::services::S3::default() + .endpoint(endpoint.unwrap()) + .secret_access_key(secret_key.unwrap()) + .access_key_id(access_key.unwrap()) + .region(region.unwrap()) + .bucket(bucket_name); + let op = Operator::new(builder) + .map_err(|error| object_store::Error::Generic { + store: "s3-opendal", + source: error.into(), + }) + .map(|op| op.finish()) + .map_err(|e| { + DataFusionError::Execution(format!( + "Failed to create S3 operator for '{url_str}': {e}" + )) + })?; + Ok(Box::new(OpendalWriter::try_new( + op, + object_store_path.into(), + schema, + props, + )?)) + } + + fn is_local_path(path: &str) -> bool { + path.starts_with("file://") || path.starts_with("file:") || !path.contains("://") + } + + fn is_s3_scheme(url: &Url) -> bool { + url.scheme() == "s3a" || url.scheme() == "s3" } } @@ -263,165 +434,6 @@ impl ParquetWriterExec { CompressionCodec::Snappy => Ok(Compression::SNAPPY), } } - - /// Create an Arrow writer based on the storage scheme - /// - /// # Arguments - /// * `output_file_path` - The full path to the output file - /// * `schema` - The Arrow schema for the Parquet file - /// * `props` - Writer properties including compression - /// * `runtime_env` - Runtime environment for object store registration - /// * `object_store_options` - Configuration options for object store - /// - /// # Returns - /// * `Ok(ParquetWriter)` - A writer appropriate for the storage scheme - /// * `Err(DataFusionError)` - If writer creation fails - fn create_arrow_writer( - output_file_path: &str, - schema: SchemaRef, - props: WriterProperties, - _runtime_env: Arc, - object_store_options: &HashMap, - ) -> Result { - // Parse URL and match on storage scheme directly - let url = Url::parse(output_file_path).map_err(|e| { - DataFusionError::Execution(format!("Failed to parse URL '{}': {}", output_file_path, e)) - })?; - - if is_hdfs_scheme(&url, object_store_options) { - #[cfg(feature = "hdfs-opendal")] - { - // Use prepare_object_store_with_configs to create and register the object store - let (_object_store_url, object_store_path) = prepare_object_store_with_configs( - _runtime_env, - output_file_path.to_string(), - object_store_options, - ) - .map_err(|e| { - DataFusionError::Execution(format!( - "Failed to prepare object store for '{}': {}", - output_file_path, e - )) - })?; - - // For remote storage (HDFS, S3), write to an in-memory buffer - let buffer = Vec::new(); - let cursor = Cursor::new(buffer); - let arrow_parquet_buffer_writer = ArrowWriter::try_new(cursor, schema, Some(props)) - .map_err(|e| { - DataFusionError::Execution(format!("Failed to create HDFS writer: {}", e)) - })?; - - // Create HDFS operator with configuration options using the helper function - let op = create_hdfs_operator(&url).map_err(|e| { - DataFusionError::Execution(format!( - "Failed to create HDFS operator for '{}': {}", - output_file_path, e - )) - })?; - - // HDFS writer will be created lazily on first write - // Use the path from prepare_object_store_with_configs - Ok(ParquetWriter::Remote( - arrow_parquet_buffer_writer, - None, - op, - object_store_path.to_string(), - )) - } - #[cfg(not(feature = "hdfs-opendal"))] - { - Err(DataFusionError::Execution( - "HDFS support is not enabled. Rebuild with the 'hdfs-opendal' feature.".into(), - )) - } - } else if is_s3_scheme(&url) { - let (_object_store_url, object_store_path) = prepare_object_store_with_configs( - _runtime_env, - output_file_path.to_string(), - object_store_options, - ) - .map_err(|e| { - DataFusionError::Execution(format!( - "Failed to prepare object store for '{}': {}", - output_file_path, e - )) - })?; - - // For remote storage (HDFS, S3), write to an in-memory buffer - let buffer = Vec::new(); - let cursor = Cursor::new(buffer); - let arrow_parquet_buffer_writer = ArrowWriter::try_new(cursor, schema, Some(props)) - .map_err(|e| { - DataFusionError::Execution(format!("Failed to create S3 writer: {}", e)) - })?; - - // Create S3 operator with configuration options using the helper function - let op = create_s3_operator(&url).map_err(|e| { - DataFusionError::Execution(format!( - "Failed to create S3 operator for '{}': {}", - output_file_path, e - )) - })?; - - // HDFS writer will be created lazily on first write - // Use the path from prepare_object_store_with_configs - Ok(ParquetWriter::Remote( - arrow_parquet_buffer_writer, - None, - op, - object_store_path.to_string(), - )) - } else if output_file_path.starts_with("file://") - || output_file_path.starts_with("file:") - || !output_file_path.contains("://") - { - // Local file system - { - // For a local file system, write directly to file - // Strip file:// or file: prefix if present - let local_path = output_file_path - .strip_prefix("file://") - .or_else(|| output_file_path.strip_prefix("file:")) - .unwrap_or(output_file_path); - - // Extract the parent directory from the file path - let output_dir = std::path::Path::new(local_path).parent().ok_or_else(|| { - DataFusionError::Execution(format!( - "Failed to extract parent directory from path '{}'", - local_path - )) - })?; - - // Create the parent directory if it doesn't exist - std::fs::create_dir_all(output_dir).map_err(|e| { - DataFusionError::Execution(format!( - "Failed to create output directory '{}': {}", - output_dir.display(), - e - )) - })?; - - let file = File::create(local_path).map_err(|e| { - DataFusionError::Execution(format!( - "Failed to create output file '{}': {}", - local_path, e - )) - })?; - - let writer = ArrowWriter::try_new(file, schema, Some(props)).map_err(|e| { - DataFusionError::Execution(format!("Failed to create local file writer: {}", e)) - })?; - Ok(ParquetWriter::LocalFile(writer)) - } - } else { - // Unsupported storage scheme - Err(DataFusionError::Execution(format!( - "Unsupported storage scheme in path: {}", - output_file_path - ))) - } - } } impl DisplayAs for ParquetWriterExec { @@ -535,7 +547,8 @@ impl ExecutionPlan for ParquetWriterExec { .build(); let object_store_options = self.object_store_options.clone(); - let mut writer = Self::create_arrow_writer( + + let mut writer = StorageWriterFactory::create( &part_file, Arc::clone(&output_schema), props, @@ -570,7 +583,7 @@ impl ExecutionPlan for ParquetWriterExec { batch }; - writer.write(&renamed_batch).await.map_err(|e| { + writer.write_batch(&renamed_batch).await.map_err(|e| { DataFusionError::Execution(format!("Failed to write batch: {}", e)) })?; } @@ -792,7 +805,7 @@ mod tests { let full_output_path = format!("hdfs://namenode:9000{}", output_path); let session_ctx = datafusion::prelude::SessionContext::new(); let runtime_env = session_ctx.runtime_env(); - let mut writer = ParquetWriterExec::create_arrow_writer( + let mut writer = StorageWriterFactory::create( &full_output_path, create_test_record_batch(1)?.schema(), props, @@ -804,7 +817,7 @@ mod tests { for i in 1..=5 { let record_batch = create_test_record_batch(i)?; - writer.write(&record_batch).await.map_err(|e| { + writer.write_batch(&record_batch).await.map_err(|e| { DataFusionError::Execution(format!("Failed to write batch {}: {}", i, e)) })?; diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 9b0fb40f60..06abe5b539 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -427,11 +427,6 @@ pub fn is_hdfs_scheme(url: &Url, object_store_configs: &HashMap) } } -pub fn is_s3_scheme(url: &Url) -> bool { - let scheme = url.scheme(); - scheme == "s3a" -} - // Creates an HDFS object store from a URL using the native HDFS implementation #[cfg(all(feature = "hdfs", not(feature = "hdfs-opendal")))] fn create_hdfs_object_store( @@ -464,16 +459,6 @@ pub(crate) fn create_hdfs_operator(url: &Url) -> Result Result { - let builder = opendal::services::S3::default(); - opendal::Operator::new(builder) - .map_err(|error| object_store::Error::Generic { - store: "s3-opendal", - source: error.into(), - }) - .map(|op| op.finish()) -} - // Creates an HDFS object store from a URL using OpenDAL #[cfg(feature = "hdfs-opendal")] pub(crate) fn create_hdfs_object_store(