diff --git a/native/core/src/execution/operators/parquet_writer.rs b/native/core/src/execution/operators/parquet_writer.rs index 8ba79098d4..4edd5617d3 100644 --- a/native/core/src/execution/operators/parquet_writer.rs +++ b/native/core/src/execution/operators/parquet_writer.rs @@ -17,20 +17,6 @@ //! Parquet writer operator for writing RecordBatches to Parquet files -use std::{ - any::Any, - collections::HashMap, - fmt, - fmt::{Debug, Formatter}, - fs::File, - sync::Arc, -}; - -#[cfg(feature = "hdfs-opendal")] -use opendal::Operator; -#[cfg(feature = "hdfs-opendal")] -use std::io::Cursor; - use crate::execution::shuffle::CompressionCodec; use crate::parquet::parquet_support::is_hdfs_scheme; #[cfg(feature = "hdfs-opendal")] @@ -38,6 +24,7 @@ use crate::parquet::parquet_support::{create_hdfs_operator, prepare_object_store use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use async_trait::async_trait; +use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::{ error::{DataFusionError, Result}, execution::context::TaskContext, @@ -51,141 +38,325 @@ use datafusion::{ }, }; use futures::TryStreamExt; +#[cfg(feature = "hdfs-opendal")] +use opendal::Operator; +use parquet::errors::ParquetError; use parquet::{ arrow::ArrowWriter, basic::{Compression, ZstdLevel}, file::properties::WriterProperties, }; +use std::fs::create_dir_all; +#[cfg(feature = "hdfs-opendal")] +use std::io::Cursor; +use std::path::Path; +use std::{ + any::Any, + collections::HashMap, + fmt, + fmt::{Debug, Formatter}, + fs::File, + sync::Arc, +}; use url::Url; -/// Enum representing different types of Arrow writers based on storage backend -enum ParquetWriter { - /// Writer for local file system - LocalFile(ArrowWriter), - /// Writer for HDFS or other remote storage (writes to in-memory buffer) - /// Contains the arrow writer, HDFS operator, and destination path - /// an Arrow writer writes to in-memory buffer the data converted to Parquet format - /// The 
opendal::Writer is created lazily on first write - #[cfg(feature = "hdfs-opendal")] - Remote( - ArrowWriter>>, - Option, - Operator, - String, - ), +// A trait abstracting over different Parquet write targets (local filesystem, HDFS, S3, etc.) +#[async_trait] +trait ParquetWriter: Send { + async fn write_batch(&mut self, batch: &RecordBatch) -> Result<(), ParquetError>; + async fn close(self: Box) -> Result<(), ParquetError>; } -impl ParquetWriter { - /// Write a RecordBatch to the underlying writer - async fn write( - &mut self, - batch: &RecordBatch, - ) -> std::result::Result<(), parquet::errors::ParquetError> { - match self { - ParquetWriter::LocalFile(writer) => writer.write(batch), - #[cfg(feature = "hdfs-opendal")] - ParquetWriter::Remote( - arrow_parquet_buffer_writer, - hdfs_writer_opt, - op, - output_path, - ) => { - // Write batch to in-memory buffer - arrow_parquet_buffer_writer.write(batch)?; - - // Flush and get the current buffer content - arrow_parquet_buffer_writer.flush()?; - let cursor = arrow_parquet_buffer_writer.inner_mut(); - let current_data = cursor.get_ref().clone(); - - // Create HDFS writer lazily on first write - if hdfs_writer_opt.is_none() { - let writer = op.writer(output_path.as_str()).await.map_err(|e| { - parquet::errors::ParquetError::External( - format!("Failed to create HDFS writer for '{}': {}", output_path, e) - .into(), - ) - })?; - *hdfs_writer_opt = Some(writer); - } - - // Write the accumulated data to HDFS - if let Some(hdfs_writer) = hdfs_writer_opt { - hdfs_writer.write(current_data).await.map_err(|e| { - parquet::errors::ParquetError::External( - format!( - "Failed to write batch to HDFS file '{}': {}", - output_path, e - ) - .into(), - ) - })?; - } +// A `ParquetWriter` implementation that writes directly to a local file. 
+struct LocalFileWriter {
+    // Arrow-to-Parquet encoder writing straight into the destination file.
+    writer: ArrowWriter<File>,
+}
-                // Clear the buffer after upload
-                cursor.get_mut().clear();
-                cursor.set_position(0);
+
+impl LocalFileWriter {
+    /// Creates the destination file and wraps it in an `ArrowWriter`.
+    ///
+    /// `path` may be a bare filesystem path or carry a `file://` / `file:` prefix,
+    /// which is stripped before the filesystem is touched. The parent directory is
+    /// created if it does not exist yet, and an existing file at `path` is
+    /// truncated by `File::create`.
+    ///
+    /// # Errors
+    /// Returns `DataFusionError::Execution` when the parent directory cannot be
+    /// determined or created, the file cannot be created, or the Arrow writer
+    /// cannot be initialised for `schema`.
+    fn try_new(path: &str, schema: SchemaRef, props: WriterProperties) -> Result<Self> {
+        // Try the longer prefix first so that `file://` does not leave a stray `//`.
+        let local_path = path
+            .strip_prefix("file://")
+            .or_else(|| path.strip_prefix("file:"))
+            .unwrap_or(path);
+
+        let output_dir = Path::new(local_path).parent().ok_or_else(|| {
+            DataFusionError::Execution(format!(
+                "Failed to extract parent directory from path '{local_path}'"
+            ))
+        })?;
-                Ok(())
-            }
+
+        create_dir_all(output_dir).map_err(|e| {
+            DataFusionError::Execution(format!(
+                "Failed to create output directory '{}': {e}",
+                output_dir.display()
+            ))
+        })?;
+
+        let file = File::create(local_path).map_err(|e| {
+            DataFusionError::Execution(format!("Failed to create output file '{local_path}': {e}"))
+        })?;
+
+        let writer = ArrowWriter::try_new(file, schema, Some(props)).map_err(|e| {
+            DataFusionError::Execution(format!("Failed to create local writer: {e}"))
+        })?;
+
+        Ok(Self { writer })
+    }
+}
+
+#[async_trait]
+impl ParquetWriter for LocalFileWriter {
+    /// Encodes `batch` and hands it to the underlying file writer.
+    async fn write_batch(&mut self, batch: &RecordBatch) -> Result<(), ParquetError> {
+        self.writer.write(batch)
+    }
+
+    /// Finalises the Parquet file; the value returned by `ArrowWriter::close` is
+    /// discarded because callers only need success or failure.
+    async fn close(mut self: Box<Self>) -> Result<(), ParquetError> {
+        self.writer.close()?;
+        Ok(())
+    }
+}
+
+// A `ParquetWriter` implementation that streams Parquet data to remote object storage
+// (HDFS, S3, etc.) via the OpenDAL abstraction layer.
+#[cfg(feature = "hdfs-opendal")]
+struct OpendalWriter {
+    // In-memory staging buffer: the Arrow writer encodes Parquet bytes into it and
+    // `write_batch` drains it to remote storage after every batch.
+    arrow_writer: ArrowWriter<Cursor<Vec<u8>>>,
+    // Remote sink, opened lazily on the first non-empty upload.
+    opendal_writer: Option<opendal::Writer>,
+    operator: Operator,
+    path: String,
+}
+
+#[cfg(feature = "hdfs-opendal")]
+impl OpendalWriter {
+    /// Builds a writer that stages Parquet bytes in memory and uploads them to
+    /// `path` through `operator`. The remote writer is not opened here.
+    ///
+    /// # Errors
+    /// Returns `DataFusionError::Execution` if the Arrow writer cannot be created
+    /// for `schema`.
+    fn try_new(
+        operator: Operator,
+        path: String,
+        schema: SchemaRef,
+        props: WriterProperties,
+    ) -> Result<Self> {
+        let cursor = Cursor::new(Vec::new());
+        let arrow_writer = ArrowWriter::try_new(cursor, schema, Some(props)).map_err(|e| {
+            DataFusionError::Execution(format!("Failed to create OpenDAL arrow writer: {e}"))
+        })?;
+        Ok(Self {
+            arrow_writer,
+            opendal_writer: None,
+            operator,
+            path,
+        })
+    }
+
+    /// Opens the remote writer for `path`, mapping failures to
+    /// `ParquetError::External`. Shared by `write_batch` and `close` so both report
+    /// the same error.
+    async fn open_remote_writer(
+        operator: &Operator,
+        path: &str,
+    ) -> Result<opendal::Writer, ParquetError> {
+        operator.writer(path).await.map_err(|e| {
+            ParquetError::External(
+                format!("Failed to create OpenDAL writer for '{path}': {e}").into(),
+            )
+        })
+    }
+}
+
+#[cfg(feature = "hdfs-opendal")]
+#[async_trait]
+impl ParquetWriter for OpendalWriter {
+    /// Encodes `batch`, then uploads whatever bytes the Arrow writer produced.
+    async fn write_batch(&mut self, batch: &RecordBatch) -> Result<(), ParquetError> {
+        self.arrow_writer.write(batch)?;
+        // Flush so the encoded bytes for this batch actually reach the cursor.
+        self.arrow_writer.flush()?;
+
+        // Move the staged bytes out instead of cloning them; this also leaves the
+        // staging buffer empty and rewound for the next batch.
+        let cursor = self.arrow_writer.inner_mut();
+        let data = std::mem::take(cursor.get_mut());
+        cursor.set_position(0);
+
+        if data.is_empty() {
+            return Ok(());
+        }
+
+        if self.opendal_writer.is_none() {
+            self.opendal_writer =
+                Some(Self::open_remote_writer(&self.operator, &self.path).await?);
+        }
+
+        if let Some(remote) = &mut self.opendal_writer {
+            remote.write(data).await.map_err(|e| {
+                ParquetError::External(format!("Failed to write to '{}': {e}", self.path).into())
+            })?;
+        }
+
+        Ok(())
+    }
-        }
-    }
-    /// Close the writer and finalize the file
-    async fn close(self) -> std::result::Result<(), parquet::errors::ParquetError> {
-        match self {
-            ParquetWriter::LocalFile(writer) => {
-                writer.close()?;
-                Ok(())
-            }
-            #[cfg(feature = "hdfs-opendal")]
-            ParquetWriter::Remote(
-                arrow_parquet_buffer_writer,
-                mut hdfs_writer_opt,
-                op,
-                output_path,
-            ) => {
-                // Close the arrow writer to finalize parquet format
-                let cursor = arrow_parquet_buffer_writer.into_inner()?;
-                let final_data = cursor.into_inner();
-
-                // Create HDFS writer if not already created
-                if hdfs_writer_opt.is_none() && !final_data.is_empty() {
-                    let writer = op.writer(output_path.as_str()).await.map_err(|e| {
-                        parquet::errors::ParquetError::External(
-                            format!("Failed to create HDFS writer for '{}': {}", output_path, e)
-                                .into(),
-                        )
-                    })?;
-                    hdfs_writer_opt = Some(writer);
-                }
-
-                // Write any remaining data
-                if !final_data.is_empty() {
-                    if let Some(mut hdfs_writer) = hdfs_writer_opt {
-                        hdfs_writer.write(final_data).await.map_err(|e| {
-                            parquet::errors::ParquetError::External(
-                                format!(
-                                    "Failed to write final data to HDFS file '{}': {}",
-                                    output_path, e
-                                )
-                                .into(),
-                            )
-                        })?;
-
-                        // Close the HDFS writer
-                        hdfs_writer.close().await.map_err(|e| {
-                            parquet::errors::ParquetError::External(
-                                format!("Failed to close HDFS writer for '{}': {}", output_path, e)
-                                    .into(),
-                            )
-                        })?;
-                    }
-                }
-
-                Ok(())
-            }
-        }
+
+    /// Finalises the Parquet stream, uploads the remaining bytes and closes the
+    /// remote writer.
+    ///
+    /// The remote writer is closed whenever it exists, even if no trailing bytes
+    /// are left, so an upload started by `write_batch` is always completed. If
+    /// nothing was ever uploaded and nothing is pending, no remote file is created.
+    async fn close(self: Box<Self>) -> Result<(), ParquetError> {
+        let Self {
+            arrow_writer,
+            opendal_writer,
+            operator,
+            path,
+        } = *self;
+
+        // `into_inner` flushes outstanding data and returns the staging cursor.
+        let final_data = arrow_writer.into_inner()?.into_inner();
+
+        let mut remote = match opendal_writer {
+            Some(remote) => remote,
+            None if final_data.is_empty() => return Ok(()),
+            None => Self::open_remote_writer(&operator, &path).await?,
+        };
+
+        if !final_data.is_empty() {
+            remote.write(final_data).await.map_err(|e| {
+                ParquetError::External(
+                    format!("Failed to write final data to '{path}': {e}").into(),
+                )
+            })?;
+        }
+
+        remote.close().await.map_err(|e| {
+            ParquetError::External(
+                format!("Failed to close OpenDAL writer for '{path}': {e}").into(),
+            )
+        })?;
+
+        Ok(())
+    }
+}
+
+// A factory that inspects the destination URL and produces the appropriate
+// `ParquetWriter` implementation for the target storage backend.
+struct StorageWriterFactory;
+
+impl StorageWriterFactory {
+    /// Selects and constructs a `ParquetWriter` for `output_path`.
+    ///
+    /// Supported backends:
+    /// - **HDFS** – detected via `is_hdfs_scheme`; backed by `OpendalWriter`.
+    /// - **S3 / S3A** – detected by scheme; backed by `OpendalWriter`.
+    /// - **Local filesystem** – `file://`, `file:`, or a bare path; backed by
+    ///   `LocalFileWriter`.
+    ///
+    /// Remote backends need the `hdfs-opendal` feature; without it they yield an
+    /// error instead of failing to compile.
+    ///
+    /// # Errors
+    /// Returns `DataFusionError::Execution` when the path cannot be parsed, uses an
+    /// unsupported scheme, or the selected writer cannot be created.
+    fn create(
+        output_path: &str,
+        schema: SchemaRef,
+        props: WriterProperties,
+        runtime_env: Arc<RuntimeEnv>,
+        object_store_options: &HashMap<String, String>,
+    ) -> Result<Box<dyn ParquetWriter>> {
+        let url = match Url::parse(output_path) {
+            Ok(url) => url,
+            // A bare filesystem path such as `/tmp/out.parquet` is not a valid
+            // absolute URL, so it has to be routed to the local writer here.
+            Err(_) if Self::is_local_path(output_path) => {
+                return Ok(Box::new(LocalFileWriter::try_new(
+                    output_path,
+                    schema,
+                    props,
+                )?));
+            }
+            Err(e) => {
+                return Err(DataFusionError::Execution(format!(
+                    "Failed to parse URL '{output_path}': {e}"
+                )));
+            }
+        };
+
+        let is_hdfs = is_hdfs_scheme(&url, object_store_options);
+        if is_hdfs || Self::is_s3_scheme(&url) {
+            Self::create_remote_writer(
+                output_path,
+                &url,
+                is_hdfs,
+                schema,
+                props,
+                runtime_env,
+                object_store_options,
+            )
+        } else if Self::is_local_path(output_path) {
+            Ok(Box::new(LocalFileWriter::try_new(
+                output_path,
+                schema,
+                props,
+            )?))
+        } else {
+            Err(DataFusionError::Execution(format!(
+                "Unsupported storage scheme in path: {output_path}"
+            )))
+        }
+    }
+
+    /// Resolves the object-store path for a remote destination and builds the
+    /// matching OpenDAL-backed writer. The caller's `object_store_options` are
+    /// forwarded to `prepare_object_store_with_configs`.
+    #[cfg(feature = "hdfs-opendal")]
+    fn create_remote_writer(
+        output_path: &str,
+        url: &Url,
+        is_hdfs: bool,
+        schema: SchemaRef,
+        props: WriterProperties,
+        runtime_env: Arc<RuntimeEnv>,
+        object_store_options: &HashMap<String, String>,
+    ) -> Result<Box<dyn ParquetWriter>> {
+        let (_, object_store_path) = prepare_object_store_with_configs(
+            runtime_env,
+            output_path.to_string(),
+            object_store_options,
+        )
+        .map_err(|e| {
+            DataFusionError::Execution(format!(
+                "Failed to prepare object store for '{output_path}': {e}"
+            ))
+        })?;
+        let remote_path = object_store_path.to_string();
+
+        if is_hdfs {
+            Self::create_hdfs_parquet_writer(schema, props, &remote_path, url)
+        } else {
+            Self::create_s3_parquet_writer(schema, props, &remote_path, url, object_store_options)
+        }
+    }
+
+    /// Stand-in used when the crate is built without remote storage support.
+    #[cfg(not(feature = "hdfs-opendal"))]
+    fn create_remote_writer(
+        _output_path: &str,
+        url: &Url,
+        _is_hdfs: bool,
+        _schema: SchemaRef,
+        _props: WriterProperties,
+        _runtime_env: Arc<RuntimeEnv>,
+        _object_store_options: &HashMap<String, String>,
+    ) -> Result<Box<dyn ParquetWriter>> {
+        Err(DataFusionError::Execution(format!(
+            "Support for '{}' storage is not enabled. Rebuild with the 'hdfs-opendal' feature.",
+            url.scheme()
+        )))
+    }
+
+    /// Builds an `OpendalWriter` on top of the HDFS operator created for `url`.
+    #[cfg(feature = "hdfs-opendal")]
+    fn create_hdfs_parquet_writer(
+        schema: SchemaRef,
+        props: WriterProperties,
+        object_store_path: &str,
+        url: &Url,
+    ) -> Result<Box<dyn ParquetWriter>> {
+        let url_str = url.as_str();
+        let op = create_hdfs_operator(url).map_err(|e| {
+            DataFusionError::Execution(format!(
+                "Failed to create HDFS operator for '{url_str}': {e}"
+            ))
+        })?;
+        Ok(Box::new(OpendalWriter::try_new(
+            op,
+            object_store_path.into(),
+            schema,
+            props,
+        )?))
+    }
+
+    /// Builds an `OpendalWriter` on top of an OpenDAL S3 operator.
+    ///
+    /// The bucket is the host part of `url`. Credentials, endpoint and region are
+    /// read from the `fs.s3a.*` entries of `object_store_options`; all four are
+    /// required and the first missing one is reported.
+    #[cfg(feature = "hdfs-opendal")]
+    fn create_s3_parquet_writer(
+        schema: SchemaRef,
+        props: WriterProperties,
+        object_store_path: &str,
+        url: &Url,
+        object_store_options: &HashMap<String, String>,
+    ) -> Result<Box<dyn ParquetWriter>> {
+        let url_str = url.as_str();
+        let access_key = Self::required_s3_option(object_store_options, "fs.s3a.access.key")?;
+        let secret_key = Self::required_s3_option(object_store_options, "fs.s3a.secret.key")?;
+        let endpoint = Self::required_s3_option(object_store_options, "fs.s3a.endpoint")?;
+        // TODO: try to extract the region from fs.s3a.endpoint when it is not set.
+        let region = Self::required_s3_option(object_store_options, "fs.s3a.endpoint.region")?;
+        let bucket_name = url.host_str().ok_or_else(|| {
+            DataFusionError::Execution(format!("Missing S3 bucket name in '{url_str}'"))
+        })?;
+
+        let builder = opendal::services::S3::default()
+            .endpoint(endpoint)
+            .secret_access_key(secret_key)
+            .access_key_id(access_key)
+            .region(region)
+            .bucket(bucket_name);
+        let op = Operator::new(builder)
+            .map_err(|e| {
+                DataFusionError::Execution(format!(
+                    "Failed to create S3 operator for '{url_str}': {e}"
+                ))
+            })?
+            .finish();
+        Ok(Box::new(OpendalWriter::try_new(
+            op,
+            object_store_path.into(),
+            schema,
+            props,
+        )?))
+    }
+
+    /// Looks up a mandatory S3 setting, naming the missing key in the error.
+    #[cfg(feature = "hdfs-opendal")]
+    fn required_s3_option<'a>(
+        options: &'a HashMap<String, String>,
+        key: &str,
+    ) -> Result<&'a str> {
+        options.get(key).map(String::as_str).ok_or_else(|| {
+            DataFusionError::Execution(format!("Missing required S3 option: {key}"))
+        })
+    }
+
+    /// True for `file:`-prefixed paths (which covers `file://`) and for paths
+    /// without any `://` scheme separator.
+    fn is_local_path(path: &str) -> bool {
+        path.starts_with("file:") || !path.contains("://")
+    }
+
+    /// True when the URL scheme is `s3a` or `s3`.
+    fn is_s3_scheme(url: &Url) -> bool {
+        matches!(url.scheme(), "s3a" | "s3")
     }
 }
@@ -263,128 +434,6 @@ impl ParquetWriterExec {
             CompressionCodec::Snappy =>
Ok(Compression::SNAPPY), } } - - /// Create an Arrow writer based on the storage scheme - /// - /// # Arguments - /// * `output_file_path` - The full path to the output file - /// * `schema` - The Arrow schema for the Parquet file - /// * `props` - Writer properties including compression - /// * `runtime_env` - Runtime environment for object store registration - /// * `object_store_options` - Configuration options for object store - /// - /// # Returns - /// * `Ok(ParquetWriter)` - A writer appropriate for the storage scheme - /// * `Err(DataFusionError)` - If writer creation fails - fn create_arrow_writer( - output_file_path: &str, - schema: SchemaRef, - props: WriterProperties, - _runtime_env: Arc, - object_store_options: &HashMap, - ) -> Result { - // Parse URL and match on storage scheme directly - let url = Url::parse(output_file_path).map_err(|e| { - DataFusionError::Execution(format!("Failed to parse URL '{}': {}", output_file_path, e)) - })?; - - if is_hdfs_scheme(&url, object_store_options) { - #[cfg(feature = "hdfs-opendal")] - { - // Use prepare_object_store_with_configs to create and register the object store - let (_object_store_url, object_store_path) = prepare_object_store_with_configs( - _runtime_env, - output_file_path.to_string(), - object_store_options, - ) - .map_err(|e| { - DataFusionError::Execution(format!( - "Failed to prepare object store for '{}': {}", - output_file_path, e - )) - })?; - - // For remote storage (HDFS, S3), write to an in-memory buffer - let buffer = Vec::new(); - let cursor = Cursor::new(buffer); - let arrow_parquet_buffer_writer = ArrowWriter::try_new(cursor, schema, Some(props)) - .map_err(|e| { - DataFusionError::Execution(format!("Failed to create HDFS writer: {}", e)) - })?; - - // Create HDFS operator with configuration options using the helper function - let op = create_hdfs_operator(&url).map_err(|e| { - DataFusionError::Execution(format!( - "Failed to create HDFS operator for '{}': {}", - output_file_path, e - )) 
- })?; - - // HDFS writer will be created lazily on first write - // Use the path from prepare_object_store_with_configs - Ok(ParquetWriter::Remote( - arrow_parquet_buffer_writer, - None, - op, - object_store_path.to_string(), - )) - } - #[cfg(not(feature = "hdfs-opendal"))] - { - Err(DataFusionError::Execution( - "HDFS support is not enabled. Rebuild with the 'hdfs-opendal' feature.".into(), - )) - } - } else if output_file_path.starts_with("file://") - || output_file_path.starts_with("file:") - || !output_file_path.contains("://") - { - // Local file system - { - // For a local file system, write directly to file - // Strip file:// or file: prefix if present - let local_path = output_file_path - .strip_prefix("file://") - .or_else(|| output_file_path.strip_prefix("file:")) - .unwrap_or(output_file_path); - - // Extract the parent directory from the file path - let output_dir = std::path::Path::new(local_path).parent().ok_or_else(|| { - DataFusionError::Execution(format!( - "Failed to extract parent directory from path '{}'", - local_path - )) - })?; - - // Create the parent directory if it doesn't exist - std::fs::create_dir_all(output_dir).map_err(|e| { - DataFusionError::Execution(format!( - "Failed to create output directory '{}': {}", - output_dir.display(), - e - )) - })?; - - let file = File::create(local_path).map_err(|e| { - DataFusionError::Execution(format!( - "Failed to create output file '{}': {}", - local_path, e - )) - })?; - - let writer = ArrowWriter::try_new(file, schema, Some(props)).map_err(|e| { - DataFusionError::Execution(format!("Failed to create local file writer: {}", e)) - })?; - Ok(ParquetWriter::LocalFile(writer)) - } - } else { - // Unsupported storage scheme - Err(DataFusionError::Execution(format!( - "Unsupported storage scheme in path: {}", - output_file_path - ))) - } - } } impl DisplayAs for ParquetWriterExec { @@ -498,7 +547,8 @@ impl ExecutionPlan for ParquetWriterExec { .build(); let object_store_options = 
self.object_store_options.clone(); - let mut writer = Self::create_arrow_writer( + + let mut writer = StorageWriterFactory::create( &part_file, Arc::clone(&output_schema), props, @@ -533,7 +583,7 @@ impl ExecutionPlan for ParquetWriterExec { batch }; - writer.write(&renamed_batch).await.map_err(|e| { + writer.write_batch(&renamed_batch).await.map_err(|e| { DataFusionError::Execution(format!("Failed to write batch: {}", e)) })?; } @@ -755,7 +805,7 @@ mod tests { let full_output_path = format!("hdfs://namenode:9000{}", output_path); let session_ctx = datafusion::prelude::SessionContext::new(); let runtime_env = session_ctx.runtime_env(); - let mut writer = ParquetWriterExec::create_arrow_writer( + let mut writer = StorageWriterFactory::create( &full_output_path, create_test_record_batch(1)?.schema(), props, @@ -767,7 +817,7 @@ mod tests { for i in 1..=5 { let record_batch = create_test_record_batch(i)?; - writer.write(&record_batch).await.map_err(|e| { + writer.write_batch(&record_batch).await.map_err(|e| { DataFusionError::Execution(format!("Failed to write batch {}: {}", i, e)) })?; diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala index 60fb65277e..229d82764f 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala @@ -46,6 +46,8 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec private val supportedCompressionCodes = Set("none", "snappy", "lz4", "zstd") + private val supportedFilesystemProtocols = Set("file", "hdfs", "s3a") + override def enabledConfig: Option[ConfigEntry[Boolean]] = Some(CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED) @@ -58,9 +60,8 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec case cmd: InsertIntoHadoopFsRelationCommand 
=> cmd.fileFormat match { case _: ParquetFileFormat => - if (!cmd.outputPath.toString.startsWith("file:") && !cmd.outputPath.toString - .startsWith("hdfs:")) { - return Unsupported(Some("Supported output filesystems: local, HDFS")) + if (!isSupportedFilesystemProtocol(cmd.outputPath.toString)) { + return Unsupported(Some("Supported output filesystems: local, HDFS, S3")) } if (cmd.bucketSpec.isDefined) { @@ -204,6 +205,11 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec CometNativeWriteExec(nativeOp, childPlan, outputPath, committer, jobId) } + private def isSupportedFilesystemProtocol(outputPath: String) = { + supportedFilesystemProtocols + .exists(protocol => outputPath.startsWith(s"${protocol}:")) + } + private def parseCompressionCodec(cmd: InsertIntoHadoopFsRelationCommand) = { cmd.options .getOrElse(