diff --git a/crates/core/src/context.rs b/crates/core/src/context.rs index e46d359d6..aed3a7472 100644 --- a/crates/core/src/context.rs +++ b/crates/core/src/context.rs @@ -16,7 +16,7 @@ // under the License. use std::collections::{HashMap, HashSet}; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::ptr::NonNull; use std::str::FromStr; use std::sync::Arc; @@ -463,7 +463,7 @@ impl PySessionContext { pub fn register_listing_table( &self, name: &str, - path: &str, + path: PathBuf, table_partition_cols: Vec<(String, PyArrowType)>, file_extension: &str, schema: Option>, @@ -472,20 +472,9 @@ impl PySessionContext { ) -> PyDataFusionResult<()> { let options = ListingOptions::new(Arc::new(ParquetFormat::new())) .with_file_extension(file_extension) - .with_table_partition_cols( - table_partition_cols - .into_iter() - .map(|(name, ty)| (name, ty.0)) - .collect::>(), - ) - .with_file_sort_order( - file_sort_order - .unwrap_or_default() - .into_iter() - .map(|e| e.into_iter().map(|f| f.into()).collect()) - .collect(), - ); - let table_path = ListingTableUrl::parse(path)?; + .with_table_partition_cols(convert_partition_cols(table_partition_cols)) + .with_file_sort_order(convert_file_sort_order(file_sort_order)); + let table_path = ListingTableUrl::parse(path_to_str(&path)?)?; let resolved_schema: SchemaRef = match schema { Some(s) => Arc::new(s.0), None => { @@ -842,7 +831,7 @@ impl PySessionContext { pub fn register_parquet( &self, name: &str, - path: &str, + path: PathBuf, table_partition_cols: Vec<(String, PyArrowType)>, parquet_pruning: bool, file_extension: &str, @@ -851,25 +840,19 @@ impl PySessionContext { file_sort_order: Option>>, py: Python, ) -> PyDataFusionResult<()> { - let mut options = ParquetReadOptions::default() - .table_partition_cols( - table_partition_cols - .into_iter() - .map(|(name, ty)| (name, ty.0)) - .collect::>(), - ) - .parquet_pruning(parquet_pruning) - .skip_metadata(skip_metadata); - options.file_extension = file_extension; - options.schema = schema.as_ref().map(|x| &x.0); - options.file_sort_order = file_sort_order - .unwrap_or_default() - .into_iter() - .map(|e| e.into_iter().map(|f| f.into()).collect()) - .collect(); - - let result = self.ctx.register_parquet(name, path, options); - wait_for_future(py, result)??; + let options = build_parquet_options( + table_partition_cols, + parquet_pruning, + file_extension, + skip_metadata, + &schema, + file_sort_order, + ); + wait_for_future( + py, + self.ctx + .register_parquet(name, path_to_str(&path)?, options), + )??; Ok(()) } @@ -883,19 +866,24 @@ impl PySessionContext { options: Option<&PyCsvReadOptions>, py: Python, ) -> PyDataFusionResult<()> { - let options = options - .map(|opts| opts.try_into()) - .transpose()? - .unwrap_or_default(); + let options = convert_csv_options(options)?; if path.is_instance_of::() { - let paths = path.extract::>()?; - let result = self.register_csv_from_multiple_paths(name, paths, options); - wait_for_future(py, result)??; + let paths = path + .extract::>()? + .iter() + .map(|p| path_to_str(p).map(str::to_owned)) + .collect::>>()?; + wait_for_future( + py, + self.register_csv_from_multiple_paths(name, paths, options), + )??; } else { - let path = path.extract::()?; - let result = self.ctx.register_csv(name, &path, options); - wait_for_future(py, result)??; + let path = path.extract::()?; + wait_for_future( + py, + self.ctx.register_csv(name, path_to_str(&path)?, options), + )??; } Ok(()) @@ -920,25 +908,17 @@ impl PySessionContext { file_compression_type: Option, py: Python, ) -> PyDataFusionResult<()> { - let path = path - .to_str() - .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?; - - let mut options = JsonReadOptions::default() - .file_compression_type(parse_file_compression_type(file_compression_type)?) - .table_partition_cols( - table_partition_cols - .into_iter() - .map(|(name, ty)| (name, ty.0)) - .collect::>(), - ); - options.schema_infer_max_records = schema_infer_max_records; - options.file_extension = file_extension; - options.schema = schema.as_ref().map(|x| &x.0); - - let result = self.ctx.register_json(name, path, options); - wait_for_future(py, result)??; - + let options = build_json_options( + table_partition_cols, + file_compression_type, + schema_infer_max_records, + file_extension, + &schema, + )?; + wait_for_future( + py, + self.ctx.register_json(name, path_to_str(&path)?, options), + )??; Ok(()) } @@ -957,22 +937,11 @@ impl PySessionContext { table_partition_cols: Vec<(String, PyArrowType)>, py: Python, ) -> PyDataFusionResult<()> { - let path = path - .to_str() - .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?; - - let mut options = AvroReadOptions::default().table_partition_cols( - table_partition_cols - .into_iter() - .map(|(name, ty)| (name, ty.0)) - .collect::>(), - ); - options.file_extension = file_extension; - options.schema = schema.as_ref().map(|x| &x.0); - - let result = self.ctx.register_avro(name, path, options); - wait_for_future(py, result)??; - + let options = build_avro_options(table_partition_cols, file_extension, &schema); + wait_for_future( + py, + self.ctx.register_avro(name, path_to_str(&path)?, options), + )??; Ok(()) } @@ -980,23 +949,17 @@ impl PySessionContext { pub fn register_arrow( &self, name: &str, - path: &str, + path: PathBuf, schema: Option>, file_extension: &str, table_partition_cols: Vec<(String, PyArrowType)>, py: Python, ) -> PyDataFusionResult<()> { - let mut options = ArrowReadOptions::default().table_partition_cols( - table_partition_cols - .into_iter() - .map(|(name, ty)| (name, ty.0)) - .collect::>(), - ); - options.file_extension = file_extension; - options.schema = schema.as_ref().map(|x| &x.0); - - let result = self.ctx.register_arrow(name, path, options); - wait_for_future(py, result)??; + let options = build_arrow_options(table_partition_cols, file_extension, &schema); + wait_for_future( + py, + self.ctx.register_arrow(name, path_to_str(&path)?, options), + )??; Ok(()) } @@ -1155,27 +1118,14 @@ impl PySessionContext { file_compression_type: Option, py: Python, ) -> PyDataFusionResult { - let path = path - .to_str() - .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?; - let mut options = JsonReadOptions::default() - .table_partition_cols( - table_partition_cols - .into_iter() - .map(|(name, ty)| (name, ty.0)) - .collect::>(), - ) - .file_compression_type(parse_file_compression_type(file_compression_type)?); - options.schema_infer_max_records = schema_infer_max_records; - options.file_extension = file_extension; - let df = if let Some(schema) = schema { - options.schema = Some(&schema.0); - let result = self.ctx.read_json(path, options); - wait_for_future(py, result)?? - } else { - let result = self.ctx.read_json(path, options); - wait_for_future(py, result)?? - }; + let options = build_json_options( + table_partition_cols, + file_compression_type, + schema_infer_max_records, + file_extension, + &schema, + )?; + let df = wait_for_future(py, self.ctx.read_json(path_to_str(&path)?, options))??; Ok(PyDataFrame::new(df)) } @@ -1188,23 +1138,18 @@ impl PySessionContext { options: Option<&PyCsvReadOptions>, py: Python, ) -> PyDataFusionResult { - let options = options - .map(|opts| opts.try_into()) - .transpose()? - .unwrap_or_default(); + let options = convert_csv_options(options)?; - if path.is_instance_of::() { - let paths = path.extract::>()?; - let paths = paths.iter().map(|p| p as &str).collect::>(); - let result = self.ctx.read_csv(paths, options); - let df = PyDataFrame::new(wait_for_future(py, result)??); - Ok(df) + let paths: Vec = if path.is_instance_of::() { + path.extract::>()? + .iter() + .map(|p| path_to_str(p).map(str::to_owned)) + .collect::>()? } else { - let path = path.extract::()?; - let result = self.ctx.read_csv(path, options); - let df = PyDataFrame::new(wait_for_future(py, result)??); - Ok(df) - } + vec![path_to_str(&path.extract::()?)?.to_owned()] + }; + let df = wait_for_future(py, self.ctx.read_csv(paths, options))??; + Ok(PyDataFrame::new(df)) } #[allow(clippy::too_many_arguments)] @@ -1218,7 +1163,7 @@ impl PySessionContext { file_sort_order=None))] pub fn read_parquet( &self, - path: &str, + path: PathBuf, table_partition_cols: Vec<(String, PyArrowType)>, parquet_pruning: bool, file_extension: &str, @@ -1227,25 +1172,18 @@ impl PySessionContext { file_sort_order: Option>>, py: Python, ) -> PyDataFusionResult { - let mut options = ParquetReadOptions::default() - .table_partition_cols( - table_partition_cols - .into_iter() - .map(|(name, ty)| (name, ty.0)) - .collect::>(), - ) - .parquet_pruning(parquet_pruning) - .skip_metadata(skip_metadata); - options.file_extension = file_extension; - options.schema = schema.as_ref().map(|x| &x.0); - options.file_sort_order = file_sort_order - .unwrap_or_default() - .into_iter() - .map(|e| e.into_iter().map(|f| f.into()).collect()) - .collect(); - - let result = self.ctx.read_parquet(path, options); - let df = PyDataFrame::new(wait_for_future(py, result)??); + let options = build_parquet_options( + table_partition_cols, + parquet_pruning, + file_extension, + skip_metadata, + &schema, + file_sort_order, + ); + let df = PyDataFrame::new(wait_for_future( + py, + self.ctx.read_parquet(path_to_str(&path)?, options), + )??); Ok(df) } @@ -1253,50 +1191,28 @@ impl PySessionContext { #[pyo3(signature = (path, schema=None, table_partition_cols=vec![], file_extension=".avro"))] pub fn read_avro( &self, - path: &str, + path: PathBuf, schema: Option>, table_partition_cols: Vec<(String, PyArrowType)>, file_extension: &str, py: Python, ) -> PyDataFusionResult { - let mut options = AvroReadOptions::default().table_partition_cols( - table_partition_cols - .into_iter() - .map(|(name, ty)| (name, ty.0)) - .collect::>(), - ); - options.file_extension = file_extension; - let df = if let Some(schema) = schema { - options.schema = Some(&schema.0); - let read_future = self.ctx.read_avro(path, options); - wait_for_future(py, read_future)?? - } else { - let read_future = self.ctx.read_avro(path, options); - wait_for_future(py, read_future)?? - }; + let options = build_avro_options(table_partition_cols, file_extension, &schema); + let df = wait_for_future(py, self.ctx.read_avro(path_to_str(&path)?, options))??; Ok(PyDataFrame::new(df)) } #[pyo3(signature = (path, schema=None, file_extension=".arrow", table_partition_cols=vec![]))] pub fn read_arrow( &self, - path: &str, + path: PathBuf, schema: Option>, file_extension: &str, table_partition_cols: Vec<(String, PyArrowType)>, py: Python, ) -> PyDataFusionResult { - let mut options = ArrowReadOptions::default().table_partition_cols( - table_partition_cols - .into_iter() - .map(|(name, ty)| (name, ty.0)) - .collect::>(), - ); - options.file_extension = file_extension; - options.schema = schema.as_ref().map(|x| &x.0); - - let result = self.ctx.read_arrow(path, options); - let df = wait_for_future(py, result)??; + let options = build_arrow_options(table_partition_cols, file_extension, &schema); + let df = wait_for_future(py, self.ctx.read_arrow(path_to_str(&path)?, options))??; Ok(PyDataFrame::new(df)) } @@ -1396,7 +1312,7 @@ impl PySessionContext { // check if the file extension matches the expected extension for path in &table_paths { let file_path = path.as_str(); - if !file_path.ends_with(option_extension.clone().as_str()) && !path.is_collection() { + if !file_path.ends_with(option_extension.as_str()) && !path.is_collection() { return exec_err!( "File path '{file_path}' does not match the expected extension '{option_extension}'" ); @@ -1437,6 +1353,97 @@ pub fn parse_file_compression_type( }) } +fn path_to_str(path: &Path) -> PyDataFusionResult<&str> { + path.to_str() + .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string").into()) +} + +fn convert_csv_options( + options: Option<&PyCsvReadOptions>, +) -> PyDataFusionResult> { + Ok(options + .map(|opts| opts.try_into()) + .transpose()? + .unwrap_or_default()) +} + +fn convert_partition_cols( + table_partition_cols: Vec<(String, PyArrowType)>, +) -> Vec<(String, DataType)> { + table_partition_cols + .into_iter() + .map(|(name, ty)| (name, ty.0)) + .collect() +} + +fn convert_file_sort_order( + file_sort_order: Option>>, +) -> Vec> { + file_sort_order + .unwrap_or_default() + .into_iter() + .map(|e| e.into_iter().map(|f| f.into()).collect()) + .collect() +} + +fn build_parquet_options<'a>( + table_partition_cols: Vec<(String, PyArrowType)>, + parquet_pruning: bool, + file_extension: &'a str, + skip_metadata: bool, + schema: &'a Option>, + file_sort_order: Option>>, +) -> ParquetReadOptions<'a> { + let mut options = ParquetReadOptions::default() + .table_partition_cols(convert_partition_cols(table_partition_cols)) + .parquet_pruning(parquet_pruning) + .skip_metadata(skip_metadata); + options.file_extension = file_extension; + options.schema = schema.as_ref().map(|x| &x.0); + options.file_sort_order = convert_file_sort_order(file_sort_order); + options +} + +fn build_json_options<'a>( + table_partition_cols: Vec<(String, PyArrowType)>, + file_compression_type: Option, + schema_infer_max_records: usize, + file_extension: &'a str, + schema: &'a Option>, +) -> Result, PyErr> { + let mut options = JsonReadOptions::default() + .table_partition_cols(convert_partition_cols(table_partition_cols)) + .file_compression_type(parse_file_compression_type(file_compression_type)?); + options.schema_infer_max_records = schema_infer_max_records; + options.file_extension = file_extension; + options.schema = schema.as_ref().map(|x| &x.0); + Ok(options) +} + +fn build_arrow_options<'a>( + table_partition_cols: Vec<(String, PyArrowType)>, + file_extension: &'a str, + schema: &'a Option>, +) -> ArrowReadOptions<'a> { + let mut options = ArrowReadOptions::default() + .table_partition_cols(convert_partition_cols(table_partition_cols)); + options.file_extension = file_extension; + options.schema = schema.as_ref().map(|x| &x.0); + options +} + +fn build_avro_options<'a>( + table_partition_cols: Vec<(String, PyArrowType)>, + file_extension: &'a str, + schema: &'a Option>, +) -> AvroReadOptions<'a> { + let mut options = AvroReadOptions::default() + .table_partition_cols(convert_partition_cols(table_partition_cols)); + options.file_extension = file_extension; + options.schema = schema.as_ref().map(|x| &x.0); + options +} + impl From for SessionContext { fn from(ctx: PySessionContext) -> SessionContext { ctx.ctx.as_ref().clone() diff --git a/python/datafusion/context.py b/python/datafusion/context.py index c3f94cc16..827acbd50 100644 --- a/python/datafusion/context.py +++ b/python/datafusion/context.py @@ -603,7 +603,7 @@ def register_listing_table( table_partition_cols = _convert_table_partition_cols(table_partition_cols) self.ctx.register_listing_table( name, - str(path), + path, table_partition_cols, file_extension, schema, @@ -971,7 +971,7 @@ def register_parquet( table_partition_cols = _convert_table_partition_cols(table_partition_cols) self.ctx.register_parquet( name, - str(path), + path, table_partition_cols, parquet_pruning, file_extension, @@ -1013,8 +1013,6 @@ def register_csv( options: Set advanced options for CSV reading. This cannot be combined with any of the other options in this method. """ - path_arg = [str(p) for p in path] if isinstance(path, list) else str(path) - if options is not None and ( schema is not None or not has_header @@ -1048,7 +1046,7 @@ def register_csv( self.ctx.register_csv( name, - path_arg, + path, options.to_inner(), ) @@ -1083,7 +1081,7 @@ def register_json( table_partition_cols = _convert_table_partition_cols(table_partition_cols) self.ctx.register_json( name, - str(path), + path, schema, schema_infer_max_records, file_extension, @@ -1114,9 +1112,7 @@ def register_avro( if table_partition_cols is None: table_partition_cols = [] table_partition_cols = _convert_table_partition_cols(table_partition_cols) - self.ctx.register_avro( - name, str(path), schema, file_extension, table_partition_cols - ) + self.ctx.register_avro(name, path, schema, file_extension, table_partition_cols) def register_arrow( self, @@ -1195,7 +1191,7 @@ def register_arrow( table_partition_cols = [] table_partition_cols = _convert_table_partition_cols(table_partition_cols) self.ctx.register_arrow( - name, str(path), schema, file_extension, table_partition_cols + name, path, schema, file_extension, table_partition_cols ) def register_dataset(self, name: str, dataset: pa.dataset.Dataset) -> None: @@ -1407,7 +1403,7 @@ def read_json( table_partition_cols = _convert_table_partition_cols(table_partition_cols) return DataFrame( self.ctx.read_json( - str(path), + path, schema, schema_infer_max_records, file_extension, @@ -1450,8 +1446,6 @@ def read_csv( Returns: DataFrame representation of the read CSV files """ - path_arg = [str(p) for p in path] if isinstance(path, list) else str(path) - if options is not None and ( schema is not None or not has_header @@ -1487,7 +1481,7 @@ def read_csv( return DataFrame( self.ctx.read_csv( - path_arg, + path, options.to_inner(), ) ) @@ -1530,7 +1524,7 @@ def read_parquet( file_sort_order = self._convert_file_sort_order(file_sort_order) return DataFrame( self.ctx.read_parquet( - str(path), + path, table_partition_cols, parquet_pruning, file_extension, @@ -1562,7 +1556,7 @@ def read_avro( file_partition_cols = [] file_partition_cols = _convert_table_partition_cols(file_partition_cols) return DataFrame( - self.ctx.read_avro(str(path), schema, file_partition_cols, file_extension) + self.ctx.read_avro(path, schema, file_partition_cols, file_extension) ) def read_arrow( @@ -1634,7 +1628,7 @@ def read_arrow( file_partition_cols = [] file_partition_cols = _convert_table_partition_cols(file_partition_cols) return DataFrame( - self.ctx.read_arrow(str(path), schema, file_extension, file_partition_cols) + self.ctx.read_arrow(path, schema, file_extension, file_partition_cols) ) def read_empty(self) -> DataFrame: