From 276db608ce99807c06071d419e2c168d0ea87d2f Mon Sep 17 00:00:00 2001 From: Mikhail Kot Date: Mon, 13 Apr 2026 17:44:03 +0100 Subject: [PATCH 1/3] initial Signed-off-by: Mikhail Kot --- benchmarks/datafusion-bench/src/main.rs | 2 +- vortex-array/src/stats/stats_set.rs | 27 +++++- .../cpp/include/duckdb_vx/table_function.h | 15 ++- vortex-duckdb/cpp/table_function.cpp | 88 +++++++++++++++++- vortex-duckdb/src/datasource.rs | 54 ++++++++++- .../src/duckdb/table_function/mod.rs | 65 ++++++++++++- .../src/duckdb/table_function/statistics.rs | 54 +++++++++++ .../src/e2e_test/object_cache_test.rs | 13 +++ vortex-duckdb/src/multi_file.rs | 14 +-- vortex-file/public-api.lock | 10 +- vortex-file/src/file.rs | 4 + vortex-file/src/footer/file_statistics.rs | 16 ++++ vortex-file/src/footer/mod.rs | 4 + vortex-file/src/multi/mod.rs | 91 ++++++++++++++++--- vortex-layout/public-api.lock | 2 + vortex-layout/src/scan/multi.rs | 17 ++++ 16 files changed, 437 insertions(+), 39 deletions(-) create mode 100644 vortex-duckdb/src/duckdb/table_function/statistics.rs diff --git a/benchmarks/datafusion-bench/src/main.rs b/benchmarks/datafusion-bench/src/main.rs index ad0df8ea74a..37c93687827 100644 --- a/benchmarks/datafusion-bench/src/main.rs +++ b/benchmarks/datafusion-bench/src/main.rs @@ -316,7 +316,7 @@ async fn register_v2_tables( None => format!("*.{}", format.ext()), }; - let multi_ds = MultiFileDataSource::new(SESSION.clone()) + let (multi_ds, _) = MultiFileDataSource::new(SESSION.clone()) .with_glob(glob_pattern, Some(fs)) .build() .await?; diff --git a/vortex-array/src/stats/stats_set.rs b/vortex-array/src/stats/stats_set.rs index 5ddd0e908b1..7c9cba3b869 100644 --- a/vortex-array/src/stats/stats_set.rs +++ b/vortex-array/src/stats/stats_set.rs @@ -459,11 +459,28 @@ impl MutTypedStatsSetRef<'_, '_> { ) { (Some(m1), Some(m2)) => { // If the combine sum is exact, then we can sum them. - if let Some(scalar_value) = m1.zip(m2).as_exact().and_then(|(s1, s2)| { - s1.as_primitive() - .checked_add(&s2.as_primitive()) - .and_then(|pscalar| pscalar.pvalue().map(ScalarValue::Primitive)) - }) { + if let Some(scalar_value) = + m1.zip(m2).as_exact().and_then(|(s1, s2)| match s1.dtype() { + DType::Primitive(..) => s1 + .as_primitive() + .checked_add(&s2.as_primitive()) + .and_then(|pscalar| pscalar.pvalue().map(ScalarValue::Primitive)), + DType::Decimal(..) => s1 + .as_decimal() + .checked_binary_numeric( + &s2.as_decimal(), + crate::scalar::NumericOperator::Add, + ) + .map(|scalar| { + ScalarValue::Decimal( + scalar + .decimal_value() + .vortex_expect("no decimal value in scalar"), + ) + }), + _ => None, + }) + { self.set(Stat::Sum, Precision::Exact(scalar_value)); } } diff --git a/vortex-duckdb/cpp/include/duckdb_vx/table_function.h b/vortex-duckdb/cpp/include/duckdb_vx/table_function.h index e8f483514a5..52ae1b9bbe5 100644 --- a/vortex-duckdb/cpp/include/duckdb_vx/table_function.h +++ b/vortex-duckdb/cpp/include/duckdb_vx/table_function.h @@ -97,6 +97,15 @@ typedef struct { bool has_max_cardinality; } duckdb_vx_node_statistics; +typedef struct { + duckdb_value min; + duckdb_value max; + // upper bit: "length is set". lower 32 bits: DuckDB's max string length. + uint64_t max_string_length; +} duckdb_column_statistics; + +typedef idx_t column_t; + // A transparent DuckDB table function vtable, which can be used to configure a table function. // See duckdb/include/function/tfunc.hpp for details on each field. typedef struct { @@ -137,7 +146,11 @@ typedef struct { // void *in_out_function; // void *in_out_function_final; - void *statistics; + + void (*statistics)(duckdb_client_context context, + const void *bind_data, + size_t column_index, + duckdb_column_statistics *stats_out); // void *dependency; void (*cardinality)(void *bind_data, duckdb_vx_node_statistics *node_stats_out); diff --git a/vortex-duckdb/cpp/table_function.cpp b/vortex-duckdb/cpp/table_function.cpp index 6154e91f738..717fe038180 100644 --- a/vortex-duckdb/cpp/table_function.cpp +++ b/vortex-duckdb/cpp/table_function.cpp @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +#include "duckdb_vx/table_function.h" #include "duckdb_vx/duckdb_diagnostics.h" DUCKDB_INCLUDES_BEGIN @@ -30,8 +31,10 @@ struct CTableFunctionInfo final : TableFunctionInfo { }; struct CTableBindData final : TableFunctionData { - CTableBindData(unique_ptr info_p, unique_ptr ffi_data_p) - : info(std::move(info_p)), ffi_data(std::move(ffi_data_p)) { + CTableBindData(unique_ptr info_p, + unique_ptr ffi_data_p, + const vector &types) + : info(std::move(info_p)), ffi_data(std::move(ffi_data_p)), types(types) { } unique_ptr Copy() const override { @@ -43,11 +46,13 @@ struct CTableBindData final : TableFunctionData { throw BinderException(IntoErrString(error_out)); } return make_uniq(make_uniq(info->vtab), - unique_ptr(reinterpret_cast(copied_ffi_data))); + unique_ptr(reinterpret_cast(copied_ffi_data)), + types); } unique_ptr info; unique_ptr ffi_data; + vector types; }; struct CTableGlobalData final : GlobalTableFunctionState { @@ -88,6 +93,79 @@ double c_table_scan_progress(ClientContext &context, return bind.info->vtab.table_scan_progress(c_ctx, c_bind_data, c_global_state); } +static Value &UnwrapValue(duckdb_value value) { + return *(reinterpret_cast(value)); +} + +unique_ptr numeric_stats(duckdb_column_statistics &stats, LogicalType type) { + BaseStatistics out = StringStats::CreateUnknown(type); + if (stats.min) { + NumericStats::SetMin(out, UnwrapValue(stats.min)); + duckdb_destroy_value(&stats.min); + } + if (stats.max) { + NumericStats::SetMax(out, UnwrapValue(stats.max)); + duckdb_destroy_value(&stats.max); + } + return out.ToUnique(); +} + +unique_ptr string_stats(duckdb_column_statistics &stats, LogicalType type) { + BaseStatistics out = StringStats::CreateUnknown(type); + if (stats.min) { + StringStats::SetMin(out, StringValue::Get(UnwrapValue(stats.min))); + duckdb_destroy_value(&stats.min); + } + if (stats.max) { + StringStats::SetMax(out, StringValue::Get(UnwrapValue(stats.max))); + duckdb_destroy_value(&stats.max); + } + if (stats.max_string_length >> 63) { + StringStats::SetMaxStringLength(out, uint32_t(stats.max_string_length)); + } + return out.ToUnique(); +} + +unique_ptr +c_statistics(ClientContext &context, const FunctionData *bind_data, column_t column_index) { + if (column_index == COLUMN_IDENTIFIER_EMPTY) { + return BaseStatistics::CreateUnknown(LogicalTypeId::INVALID).ToUnique(); + } + + const auto &bind = bind_data->Cast(); + void *const ffi_bind = bind.ffi_data->DataPtr(); + + duckdb_client_context c_ctx = reinterpret_cast(&context); + duckdb_column_statistics statistics = {}; + const LogicalType type = bind.types[column_index]; + + switch (type.id()) { + case LogicalTypeId::BOOLEAN: + case LogicalTypeId::TINYINT: + case LogicalTypeId::SMALLINT: + case LogicalTypeId::INTEGER: + case LogicalTypeId::BIGINT: + case LogicalTypeId::FLOAT: + case LogicalTypeId::DOUBLE: + case LogicalTypeId::UTINYINT: + case LogicalTypeId::USMALLINT: + case LogicalTypeId::UINTEGER: + case LogicalTypeId::UBIGINT: + case LogicalTypeId::UHUGEINT: + case LogicalTypeId::HUGEINT: { + bind.info->vtab.statistics(c_ctx, ffi_bind, column_index, &statistics); + return numeric_stats(statistics, type); + } + case LogicalTypeId::VARCHAR: + case LogicalTypeId::BLOB: { + bind.info->vtab.statistics(c_ctx, ffi_bind, column_index, &statistics); + return string_stats(statistics, type); + } + default: + return BaseStatistics::CreateUnknown(type).ToUnique(); + } +} + unique_ptr c_bind(ClientContext &context, TableFunctionBindInput &input, vector &return_types, @@ -111,7 +189,8 @@ unique_ptr c_bind(ClientContext &context, } return make_uniq(make_uniq(info.vtab), - unique_ptr(reinterpret_cast(ffi_bind_data))); + unique_ptr(reinterpret_cast(ffi_bind_data)), + return_types); } unique_ptr c_init_global(ClientContext &context, TableFunctionInitInput &input) { @@ -363,6 +442,7 @@ extern "C" duckdb_state duckdb_vx_tfunc_register(duckdb_database ffi_db, const d tf.get_virtual_columns = c_get_virtual_columns; tf.to_string = c_to_string; tf.table_scan_progress = c_table_scan_progress; + tf.statistics = c_statistics; // Set up the parameters tf.arguments.reserve(vtab->parameter_count); diff --git a/vortex-duckdb/src/datasource.rs b/vortex-duckdb/src/datasource.rs index 47a1eba2b4f..107d0da4134 100644 --- a/vortex-duckdb/src/datasource.rs +++ b/vortex-duckdb/src/datasource.rs @@ -37,6 +37,7 @@ use vortex::expr::col; use vortex::expr::root; use vortex::expr::select; use vortex::expr::stats::Precision; +use vortex::file::FileStatistics; use vortex::io::kanal_ext::KanalExt; use vortex::io::runtime::BlockingRuntime; use vortex::io::runtime::current::ThreadSafeIterator; @@ -54,6 +55,7 @@ use crate::duckdb::BindInputRef; use crate::duckdb::BindResultRef; use crate::duckdb::Cardinality; use crate::duckdb::ClientContextRef; +use crate::duckdb::ColumnStatistics; use crate::duckdb::DataChunkRef; use crate::duckdb::ExpressionRef; use crate::duckdb::LogicalType; @@ -80,6 +82,8 @@ use crate::exporter::ConversionCache; static EMPTY_COLUMN_IDX: u64 = 18446744073709551614; static EMPTY_COLUMN_NAME: &str = ""; +pub type DataSourceWithStats = (DataSourceRef, Option); + /// A trait for table functions that resolve to a [`DataSourceRef`]. /// /// Implementors only need to define how parameters are declared and how binding produces a @@ -97,7 +101,14 @@ pub(crate) trait DataSourceTableFunction: Sized + Debug { } /// Bind the table function and return a [`DataSourceRef`]. - fn bind(ctx: &ClientContextRef, input: &BindInputRef) -> VortexResult; + fn bind(ctx: &ClientContextRef, input: &BindInputRef) -> VortexResult; +} + +#[derive(Clone)] +pub enum DataSourceStatistics { + All(Vec), + /// Dummy column to return a reference to + None(ColumnStatistics), } /// Bind data produced by a [`DataSourceTableFunction`]. @@ -106,6 +117,7 @@ pub struct DataSourceBindData { filter_exprs: Vec, column_names: Vec, column_types: Vec, + stats: DataSourceStatistics, } impl Clone for DataSourceBindData { @@ -116,6 +128,7 @@ impl Clone for DataSourceBindData { filter_exprs: vec![], column_names: self.column_names.clone(), column_types: self.column_types.clone(), + stats: self.stats.clone(), } } } @@ -189,19 +202,39 @@ impl TableFunction for T { input: &BindInputRef, result: &mut BindResultRef, ) -> VortexResult { - let data_source = T::bind(ctx, input)?; - + let (data_source, file_stats) = T::bind(ctx, input)?; let (column_names, column_types) = extract_schema_from_dtype(data_source.dtype())?; - for (column_name, column_type) in column_names.iter().zip(&column_types) { - result.add_result_column(column_name, column_type); + if file_stats.is_none() { + for i in 0..column_names.len() { + result.add_result_column(&column_names[i], &column_types[i]); + } + + let stats = DataSourceStatistics::None(ColumnStatistics::default()); + return Ok(DataSourceBindData { + data_source, + filter_exprs: vec![], + column_names, + column_types, + stats, + }); } + let file_stats = file_stats.vortex_expect("no stats"); + + let mut stats = Vec::with_capacity(column_names.len()); + for i in 0..column_names.len() { + result.add_result_column(&column_names[i], &column_types[i]); + let (stats_set, dtype) = file_stats.get(i); + stats.push(ColumnStatistics::new(stats_set, dtype.clone())); + } + let stats = DataSourceStatistics::All(stats); Ok(DataSourceBindData { data_source, filter_exprs: vec![], column_names, column_types, + stats, }) } @@ -412,6 +445,17 @@ impl TableFunction for T { Ok(false) } + fn statistics<'a>( + _client_context: &ClientContextRef, + bind_data: &'a Self::BindData, + column_index: usize, + ) -> &'a ColumnStatistics { + match &bind_data.stats { + DataSourceStatistics::All(items) => &items[column_index], + DataSourceStatistics::None(dummy) => dummy, + } + } + fn cardinality(bind_data: &Self::BindData) -> Cardinality { match bind_data.data_source.row_count() { Some(Precision::Exact(v)) => Cardinality::Maximum(v), diff --git a/vortex-duckdb/src/duckdb/table_function/mod.rs b/vortex-duckdb/src/duckdb/table_function/mod.rs index f20e844d381..5c822c6be12 100644 --- a/vortex-duckdb/src/duckdb/table_function/mod.rs +++ b/vortex-duckdb/src/duckdb/table_function/mod.rs @@ -7,6 +7,7 @@ use std::ffi::c_void; use std::fmt::Debug; use std::ptr; +use vortex::dtype::DType; use vortex::error::VortexExpect; use vortex::error::VortexResult; mod bind; @@ -14,6 +15,7 @@ mod cardinality; mod init; mod partition; mod pushdown_complex_filter; +mod statistics; mod table_scan_progress; mod virtual_columns; @@ -21,6 +23,10 @@ pub use bind::*; pub use init::*; pub use virtual_columns::VirtualColumnsResult; pub use virtual_columns::VirtualColumnsResultRef; +use vortex::array::stats::StatsSet; +use vortex::expr::stats::Precision; +use vortex::expr::stats::Stat; +use vortex::scalar::ScalarValue; use crate::cpp; use crate::cpp::duckdb_client_context; @@ -34,10 +40,61 @@ use crate::duckdb::expr::ExpressionRef; use crate::duckdb::table_function::cardinality::cardinality_callback; use crate::duckdb::table_function::partition::get_partition_data_callback; use crate::duckdb::table_function::pushdown_complex_filter::pushdown_complex_filter_callback; +use crate::duckdb::table_function::statistics::statistics; use crate::duckdb::table_function::table_scan_progress::table_scan_progress_callback; use crate::duckdb::table_function::virtual_columns::get_virtual_columns_callback; use crate::duckdb_try; +#[derive(Clone)] +#[cfg_attr(test, derive(Debug))] +pub struct ColumnStatistics { + pub dtype: DType, + pub min: Option, + pub max: Option, + pub max_string_length: Option, +} + +// Needed for absent file stats in data source +impl Default for ColumnStatistics { + fn default() -> Self { + Self { + dtype: DType::Null, + min: None, + max: None, + max_string_length: None, + } + } +} + +impl ColumnStatistics { + pub fn new(stats: &StatsSet, dtype: DType) -> Self { + let min = match stats.get(Stat::Min) { + Some(Precision::Exact(min)) => Some(min), + _ => None, + }; + let max = match stats.get(Stat::Max) { + Some(Precision::Exact(max)) => Some(max), + _ => None, + }; + + let max_string_length = + if let Some(Precision::Exact(value)) = stats.get(Stat::UncompressedSizeInBytes) { + // DuckDB's string length is u32 + #[allow(clippy::cast_possible_truncation)] + Some(value.as_primitive().as_u64().vortex_expect("not a u64") as u32) + } else { + None + }; + + Self { + dtype, + min, + max, + max_string_length, + } + } +} + /// A trait that defines the supported operations for a table function in DuckDB. /// /// This trait does not yet cover the full C++ API, see table_function.hpp. @@ -84,6 +141,12 @@ pub trait TableFunction: Sized + Debug { result: &mut BindResultRef, ) -> VortexResult; + fn statistics<'a>( + client_context: &ClientContextRef, + bind_data: &'a Self::BindData, + column_index: usize, + ) -> &'a ColumnStatistics; + /// The function is called during query execution and is responsible for producing the output fn scan( client_context: &ClientContextRef, @@ -188,7 +251,7 @@ impl DatabaseRef { init_global: Some(init_global_callback::), init_local: Some(init_local_callback::), function: Some(function::), - statistics: ptr::null_mut::(), + statistics: Some(statistics::), cardinality: Some(cardinality_callback::), pushdown_complex_filter: Some(pushdown_complex_filter_callback::), pushdown_expression: ptr::null_mut::(), diff --git a/vortex-duckdb/src/duckdb/table_function/statistics.rs b/vortex-duckdb/src/duckdb/table_function/statistics.rs new file mode 100644 index 00000000000..93d1148910a --- /dev/null +++ b/vortex-duckdb/src/duckdb/table_function/statistics.rs @@ -0,0 +1,54 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::ffi::c_void; +use std::ptr; + +use vortex::error::VortexExpect; +use vortex::scalar::Scalar; + +use crate::convert::ToDuckDBScalar; +use crate::cpp::duckdb_client_context; +use crate::cpp::{self}; +use crate::duckdb::ClientContext; +use crate::duckdb::TableFunction; +use crate::duckdb::Value; + +pub(crate) unsafe extern "C-unwind" fn statistics( + ctx: duckdb_client_context, + bind_data: *const c_void, + column_index: usize, + stats_out: *mut cpp::duckdb_column_statistics, +) { + let stats_out = unsafe { &mut *stats_out }; + let client_context = unsafe { ClientContext::borrow(ctx) }; + let bind_data = + unsafe { bind_data.cast::().as_ref() }.vortex_expect("bind_data null pointer"); + let stats_ref = T::statistics(client_context, bind_data, column_index); + let dtype = &stats_ref.dtype; + + // By definition dtype matches the value, so use vortex_expect + if let Some(ref value) = stats_ref.min { + let value = Scalar::try_new(dtype.clone(), Some(value.clone())) + .vortex_expect("scalar dtype and value are incompatible") + .try_to_duckdb_scalar() + .vortex_expect("can't convert Scalar to duckdb Value"); + stats_out.min = Value::into_ptr(value); + } else { + stats_out.min = ptr::null_mut(); + } + + if let Some(ref value) = stats_ref.max { + let value = Scalar::try_new(dtype.clone(), Some(value.clone())) + .vortex_expect("scalar dtype and value are incompatible") + .try_to_duckdb_scalar() + .vortex_expect("can't convert Scalar to duckdb Value"); + stats_out.max = Value::into_ptr(value); + } else { + stats_out.max = ptr::null_mut(); + } + + stats_out.max_string_length = stats_ref + .max_string_length + .map_or(0, |len| (1u64 << 63) | (len as u64)); +} diff --git a/vortex-duckdb/src/e2e_test/object_cache_test.rs b/vortex-duckdb/src/e2e_test/object_cache_test.rs index 712d2a41209..2a12f34c6c5 100644 --- a/vortex-duckdb/src/e2e_test/object_cache_test.rs +++ b/vortex-duckdb/src/e2e_test/object_cache_test.rs @@ -5,13 +5,16 @@ use std::ffi::CString; +use vortex::dtype::DType; use vortex::error::VortexResult; use vortex::error::vortex_err; +use vortex_array::stats::StatsSet; use crate::cpp::DUCKDB_TYPE; use crate::duckdb::BindInputRef; use crate::duckdb::BindResultRef; use crate::duckdb::ClientContextRef; +use crate::duckdb::ColumnStatistics; use crate::duckdb::DataChunkRef; use crate::duckdb::LogicalType; use crate::duckdb::TableFunction; @@ -23,6 +26,7 @@ pub struct TestTableFunction; #[derive(Debug, Clone)] pub struct TestBindData { cache_key: String, + stats: ColumnStatistics, } #[derive(Debug)] @@ -61,6 +65,7 @@ impl TableFunction for TestTableFunction { Ok(TestBindData { cache_key: "test_table_function_data".to_string(), + stats: ColumnStatistics::new(&StatsSet::default(), DType::Null), }) } @@ -112,6 +117,14 @@ impl TableFunction for TestTableFunction { ) -> VortexResult { Ok(0) } + + fn statistics<'a>( + _client_context: &ClientContextRef, + bind_data: &'a Self::BindData, + _column_index: usize, + ) -> &'a ColumnStatistics { + &bind_data.stats + } } use crate::duckdb::Database; diff --git a/vortex-duckdb/src/multi_file.rs b/vortex-duckdb/src/multi_file.rs index 14c8e02befb..7ab5bf362d9 100644 --- a/vortex-duckdb/src/multi_file.rs +++ b/vortex-duckdb/src/multi_file.rs @@ -19,6 +19,7 @@ use vortex_utils::aliases::hash_map::HashMap; use crate::RUNTIME; use crate::SESSION; use crate::datasource::DataSourceTableFunction; +use crate::datasource::DataSourceWithStats; use crate::duckdb::BindInputRef; use crate::duckdb::ClientContextRef; use crate::duckdb::ExtractedValue; @@ -76,7 +77,7 @@ impl DataSourceTableFunction for VortexMultiFileScan { vec![LogicalType::varchar()] } - fn bind(ctx: &ClientContextRef, input: &BindInputRef) -> VortexResult { + fn bind(ctx: &ClientContextRef, input: &BindInputRef) -> VortexResult { bind_multi_file_scan(ctx, input) } } @@ -89,7 +90,7 @@ impl DataSourceTableFunction for VortexMultiFileScanList { ] } - fn bind(ctx: &ClientContextRef, input: &BindInputRef) -> VortexResult { + fn bind(ctx: &ClientContextRef, input: &BindInputRef) -> VortexResult { bind_multi_file_scan(ctx, input) } } @@ -98,7 +99,7 @@ impl DataSourceTableFunction for VortexMultiFileScanList { fn bind_multi_file_scan( ctx: &ClientContextRef, input: &BindInputRef, -) -> VortexResult { +) -> VortexResult { let glob_url_parameter = input .get_parameter(0) .ok_or_else(|| vortex_err!("Missing file glob parameter"))?; @@ -139,7 +140,7 @@ fn bind_multi_file_scan( } RUNTIME.block_on(async { - let mut builder = MultiFileDataSource::new(SESSION.clone()); + let mut builder = MultiFileDataSource::new_eager(SESSION.clone()); for glob_url in &glob_urls { let mut base_url = glob_url.clone(); @@ -151,8 +152,9 @@ fn bind_multi_file_scan( builder = builder.with_glob(glob_url.path(), Some(fs)); } - let ds = builder.build().await?; - Ok(Arc::new(ds) as DataSourceRef) + let (data_source, stats) = builder.build().await?; + let ds = Arc::new(data_source) as DataSourceRef; + Ok((ds, stats)) }) } diff --git a/vortex-file/public-api.lock b/vortex-file/public-api.lock index f5be6aa8f23..094ea0c816f 100644 --- a/vortex-file/public-api.lock +++ b/vortex-file/public-api.lock @@ -6,10 +6,12 @@ pub struct vortex_file::multi::MultiFileDataSource impl vortex_file::multi::MultiFileDataSource -pub async fn vortex_file::multi::MultiFileDataSource::build(self) -> vortex_error::VortexResult +pub async fn vortex_file::multi::MultiFileDataSource::build(self) -> vortex_error::VortexResult<(impl vortex_scan::DataSource, core::option::Option)> pub fn vortex_file::multi::MultiFileDataSource::new(session: vortex_session::VortexSession) -> Self +pub fn vortex_file::multi::MultiFileDataSource::new_eager(session: vortex_session::VortexSession) -> Self + pub fn vortex_file::multi::MultiFileDataSource::with_glob(self, glob: impl core::convert::Into, fs: core::option::Option) -> Self pub fn vortex_file::multi::MultiFileDataSource::with_open_options(self, f: impl core::ops::function::Fn(vortex_file::VortexOpenOptions) -> vortex_file::VortexOpenOptions + core::marker::Send + core::marker::Sync + 'static) -> Self @@ -136,6 +138,8 @@ pub fn vortex_file::FileStatistics::from_flatbuffer<'a>(fb: &vortex_flatbuffers: pub fn vortex_file::FileStatistics::get(&self, field_idx: usize) -> (&vortex_array::stats::stats_set::StatsSet, &vortex_array::dtype::DType) +pub fn vortex_file::FileStatistics::merge(self, other: &vortex_file::FileStatistics) -> vortex_error::VortexResult + pub fn vortex_file::FileStatistics::new(stats: alloc::sync::Arc<[vortex_array::stats::stats_set::StatsSet]>, dtypes: alloc::sync::Arc<[vortex_array::dtype::DType]>) -> Self pub fn vortex_file::FileStatistics::new_with_dtype(stats: alloc::sync::Arc<[vortex_array::stats::stats_set::StatsSet]>, file_dtype: &vortex_array::dtype::DType) -> Self @@ -186,6 +190,8 @@ pub fn vortex_file::Footer::segment_map(&self) -> &alloc::sync::Arc<[vortex_file pub fn vortex_file::Footer::statistics(&self) -> core::option::Option<&vortex_file::FileStatistics> +pub fn vortex_file::Footer::take_statistics(&mut self) -> core::option::Option + impl core::clone::Clone for vortex_file::Footer pub fn vortex_file::Footer::clone(&self) -> vortex_file::Footer @@ -280,6 +286,8 @@ pub fn vortex_file::VortexFile::segment_source(&self) -> alloc::sync::Arc vortex_error::VortexResult>> +pub fn vortex_file::VortexFile::take_file_stats(&mut self) -> core::option::Option + impl core::clone::Clone for vortex_file::VortexFile pub fn vortex_file::VortexFile::clone(&self) -> vortex_file::VortexFile diff --git a/vortex-file/src/file.rs b/vortex-file/src/file.rs index 7257676b05c..b64ab6ed918 100644 --- a/vortex-file/src/file.rs +++ b/vortex-file/src/file.rs @@ -73,6 +73,10 @@ impl VortexFile { self.footer.statistics() } + pub fn take_file_stats(&mut self) -> Option { + self.footer.take_statistics() + } + /// Create a new segment source for reading from the file. /// /// This may spawn a background I/O driver that will exit when the returned segment source diff --git a/vortex-file/src/footer/file_statistics.rs b/vortex-file/src/footer/file_statistics.rs index 4fac3ad8482..5126b03a2d4 100644 --- a/vortex-file/src/footer/file_statistics.rs +++ b/vortex-file/src/footer/file_statistics.rs @@ -146,6 +146,22 @@ impl FileStatistics { pub fn get(&self, field_idx: usize) -> (&StatsSet, &DType) { (&self.stats[field_idx], &self.dtypes[field_idx]) } + + pub fn merge(self, other: &FileStatistics) -> VortexResult { + vortex_ensure_eq!(self.stats.len(), other.stats_sets().len()); + + let FileStatistics { mut stats, dtypes } = self; + for (this, other_stat, dtype) in itertools::izip!( + Arc::make_mut(&mut stats).iter_mut(), + other.stats_sets().iter(), + dtypes.iter() + ) { + let owned = std::mem::take(this); + *this = owned.merge_unordered(other_stat, dtype); + } + + Ok(FileStatistics::new(stats, dtypes)) + } } impl<'a> IntoIterator for &'a FileStatistics { diff --git a/vortex-file/src/footer/mod.rs b/vortex-file/src/footer/mod.rs index ac2153f7cd1..bf0a87049b2 100644 --- a/vortex-file/src/footer/mod.rs +++ b/vortex-file/src/footer/mod.rs @@ -146,6 +146,10 @@ impl Footer { self.statistics.as_ref() } + pub fn take_statistics(&mut self) -> Option { + self.statistics.take() + } + /// Returns the [`DType`] of the file. pub fn dtype(&self) -> &DType { self.root_layout.dtype() diff --git a/vortex-file/src/multi/mod.rs b/vortex-file/src/multi/mod.rs index bf687f97ec1..c479adc5c3e 100644 --- a/vortex-file/src/multi/mod.rs +++ b/vortex-file/src/multi/mod.rs @@ -11,6 +11,7 @@ use async_trait::async_trait; use futures::TryStreamExt; use session::MultiFileSessionExt; use tracing::debug; +use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_io::filesystem::FileListing; @@ -21,6 +22,7 @@ use vortex_layout::scan::multi::MultiLayoutDataSource; use vortex_scan::DataSource; use vortex_session::VortexSession; +use crate::FileStatistics; use crate::OpenOptionsSessionExt; use crate::VortexOpenOptions; use crate::v2::FileStatsLayoutReader; @@ -61,8 +63,12 @@ pub struct MultiFileDataSource { /// When the filesystem is None, a local filesystem will be created in build(). glob_sources: Vec<(String, Option)>, open_options_fn: Arc VortexOpenOptions + Send + Sync>, + open_all: bool, } +type FileWithFs = (FileListing, FileSystemRef); +type DataSourceWithStats = (MultiLayoutDataSource, Option); + impl MultiFileDataSource { /// Create a new [`MultiFileDataSource`] builder. pub fn new(session: VortexSession) -> Self { @@ -70,6 +76,16 @@ impl MultiFileDataSource { session, glob_sources: Vec::new(), open_options_fn: Arc::new(|opts| opts), + open_all: false, + } + } + + pub fn new_eager(session: VortexSession) -> Self { + Self { + session, + glob_sources: Vec::new(), + open_options_fn: Arc::new(|opts| opts), + open_all: true, } } @@ -98,7 +114,7 @@ impl MultiFileDataSource { /// /// Discovers files via glob, opens the first file eagerly to determine the schema, /// and creates lazy factories for the remaining files. - pub async fn build(self) -> VortexResult { + pub async fn build(mut self) -> VortexResult<(impl DataSource, Option)> { if self.glob_sources.is_empty() { vortex_bail!("MultiFileDataSource requires at least one glob pattern"); } @@ -138,23 +154,70 @@ impl MultiFileDataSource { let globs: Vec<_> = self.glob_sources.iter().map(|(g, _)| g.as_str()).collect(); debug!(file_count, glob = ?globs, "discovered files"); + if self.open_all { + self.build_eager(all_files).await + } else { + Ok((self.build_lazy(all_files).await?, None)) + } + } + + async fn build_eager(&mut self, files: Vec) -> VortexResult { + let mut all_files_have_stats = true; + let mut stats: Option = None; + + let open_fn = self.open_options_fn.as_ref(); + let files_len = files.len(); + let mut readers = Vec::with_capacity(files_len); + + for (file, fs) in files { + let mut file = open_file(&fs, &file, &self.session, open_fn).await?; + let mut reader = file.layout_reader()?; + + let file_stats = file.take_file_stats(); + if file_stats.is_none() { + all_files_have_stats = false; + readers.push(reader); + continue; + } + let file_stats = file_stats.vortex_expect("no stats"); + + if all_files_have_stats { + stats = Some(match stats.take() { + None => file_stats.clone(), + Some(st) => st.merge(&file_stats)?, + }) + } + + reader = Arc::new(FileStatsLayoutReader::new( + reader, + file_stats, + self.session.clone(), + )); + readers.push(reader); + } + + let inner = MultiLayoutDataSource::new_eager(readers, &self.session); + debug!(file_count = files_len, dtype = %inner.dtype(), "built MultiFileDataSource"); + + if !all_files_have_stats { + stats = None + } + Ok((inner, stats)) + } + + async fn build_lazy(&self, files: Vec) -> VortexResult { // Open first file eagerly for dtype. - let (first_file_listing, first_fs) = &all_files[0]; - let first_file = open_file( - first_fs, - first_file_listing, - &self.session, - self.open_options_fn.as_ref(), - ) - .await?; + let (first_file_listing, first_fs) = &files[0]; + let open_fn = self.open_options_fn.as_ref(); + let first_file = open_file(first_fs, first_file_listing, &self.session, open_fn).await?; let first_reader = layout_reader_with_stats(&first_file)?; - let factories: Vec> = all_files[1..] + let factories: Vec> = files[1..] .iter() - .map(|(file, fs)| { + .map(|(f, fs)| { Arc::new(VortexFileReaderFactory { fs: Arc::clone(fs), - file: file.clone(), + file: f.clone(), session: self.session.clone(), open_options_fn: Arc::clone(&self.open_options_fn), }) as Arc @@ -162,9 +225,7 @@ impl MultiFileDataSource { .collect(); let inner = MultiLayoutDataSource::new_with_first(first_reader, factories, &self.session); - - debug!(file_count, dtype = %inner.dtype(), "built MultiFileDataSource"); - + debug!(file_count = files.len(), dtype = %inner.dtype(), "built MultiFileDataSource"); Ok(inner) } } diff --git a/vortex-layout/public-api.lock b/vortex-layout/public-api.lock index 4458a5a779c..4c5711e71bd 100644 --- a/vortex-layout/public-api.lock +++ b/vortex-layout/public-api.lock @@ -1010,6 +1010,8 @@ impl vortex_layout::scan::multi::MultiLayoutDataSource pub fn vortex_layout::scan::multi::MultiLayoutDataSource::new_deferred(dtype: vortex_array::dtype::DType, factories: alloc::vec::Vec>, session: &vortex_session::VortexSession) -> Self +pub fn vortex_layout::scan::multi::MultiLayoutDataSource::new_eager(readers: alloc::vec::Vec, session: &vortex_session::VortexSession) -> Self + pub fn vortex_layout::scan::multi::MultiLayoutDataSource::new_with_first(first: vortex_layout::LayoutReaderRef, remaining: alloc::vec::Vec>, session: &vortex_session::VortexSession) -> Self pub fn vortex_layout::scan::multi::MultiLayoutDataSource::with_concurrency(self, concurrency: usize) -> Self diff --git a/vortex-layout/src/scan/multi.rs b/vortex-layout/src/scan/multi.rs index 307aca187cc..9af521c978a 100644 --- a/vortex-layout/src/scan/multi.rs +++ b/vortex-layout/src/scan/multi.rs @@ -116,6 +116,23 @@ impl MultiLayoutDataSource { } } + pub fn new_eager(readers: Vec, session: &VortexSession) -> Self { + let dtype = readers[0].dtype().clone(); + let concurrency = std::thread::available_parallelism() + .map(|v| v.get()) + .unwrap_or(DEFAULT_CONCURRENCY); + + let mut children = Vec::with_capacity(readers.len()); + children.extend(readers.into_iter().map(MultiLayoutChild::Opened)); + + Self { + dtype, + session: session.clone(), + children, + concurrency, + } + } + /// Creates a multi-layout data source where all children are deferred. /// /// The dtype must be provided externally since there is no pre-opened reader to infer it From 4f53d8d4841b4f856d72a52a3de78d5b2ea3795a Mon Sep 17 00:00:00 2001 From: Mikhail Kot Date: Wed, 15 Apr 2026 17:37:02 +0100 Subject: [PATCH 2/3] lazy stats evaluation Signed-off-by: Mikhail Kot --- vortex-duckdb/src/datasource.rs | 201 ++++++++++++++---- .../src/duckdb/table_function/mod.rs | 63 +----- .../src/duckdb/table_function/statistics.rs | 42 +--- .../src/e2e_test/object_cache_test.rs | 12 +- vortex-duckdb/src/multi_file.rs | 13 +- vortex-file/src/multi/mod.rs | 43 +--- vortex-file/src/v2/file_stats_reader.rs | 2 +- vortex-layout/src/scan/multi.rs | 4 +- 8 files changed, 195 insertions(+), 185 deletions(-) diff --git a/vortex-duckdb/src/datasource.rs b/vortex-duckdb/src/datasource.rs index 107d0da4134..fb89abd45c7 100644 --- a/vortex-duckdb/src/datasource.rs +++ b/vortex-duckdb/src/datasource.rs @@ -7,6 +7,9 @@ //! to get a blanket [`TableFunction`] implementation covering init, scan, progress, filter //! pushdown, cardinality, partitioning, and virtual columns. +use std::any::Any; +use std::cmp::max; +use std::collections::HashSet; use std::ffi::CString; use std::fmt::Debug; use std::sync::Arc; @@ -26,10 +29,12 @@ use vortex::array::arrays::Struct; use vortex::array::arrays::StructArray; use vortex::array::arrays::scalar_fn::ScalarFnArrayExt; use vortex::array::optimizer::ArrayOptimizer; +use vortex::array::stats::StatsSet; use vortex::dtype::DType; use vortex::dtype::FieldNames; use vortex::error::VortexExpect; use vortex::error::VortexResult; +use vortex::error::vortex_bail; use vortex::error::vortex_err; use vortex::expr::Expression; use vortex::expr::and_collect; @@ -37,18 +42,24 @@ use vortex::expr::col; use vortex::expr::root; use vortex::expr::select; use vortex::expr::stats::Precision; -use vortex::file::FileStatistics; +use vortex::expr::stats::Stat; +use vortex::file::v2::FileStatsLayoutReader; use vortex::io::kanal_ext::KanalExt; use vortex::io::runtime::BlockingRuntime; use vortex::io::runtime::current::ThreadSafeIterator; +use vortex::layout::scan::multi::MultiLayoutChild; +use vortex::layout::scan::multi::MultiLayoutDataSource; use vortex::metrics::tracing::get_global_labels; +use vortex::scalar::Scalar; +use vortex::scalar::ScalarValue; use vortex::scalar_fn::fns::pack::Pack; +use vortex::scan::DataSource; use vortex::scan::DataSourceRef; use vortex::scan::ScanRequest; -use vortex_utils::aliases::hash_set::HashSet; use crate::RUNTIME; use crate::SESSION; +use crate::convert::ToDuckDBScalar; use crate::convert::try_from_bound_expression; use crate::convert::try_from_table_filter; use crate::duckdb::BindInputRef; @@ -82,8 +93,6 @@ use crate::exporter::ConversionCache; static EMPTY_COLUMN_IDX: u64 = 18446744073709551614; static EMPTY_COLUMN_NAME: &str = ""; -pub type DataSourceWithStats = (DataSourceRef, Option); - /// A trait for table functions that resolve to a [`DataSourceRef`]. /// /// Implementors only need to define how parameters are declared and how binding produces a @@ -101,15 +110,11 @@ pub(crate) trait DataSourceTableFunction: Sized + Debug { } /// Bind the table function and return a [`DataSourceRef`]. - fn bind(ctx: &ClientContextRef, input: &BindInputRef) -> VortexResult; + fn bind(ctx: &ClientContextRef, input: &BindInputRef) -> VortexResult; } -#[derive(Clone)] -pub enum DataSourceStatistics { - All(Vec), - /// Dummy column to return a reference to - None(ColumnStatistics), -} +/// Stats set reference for a file from FileStatistics. +type FileStatRef = Arc<[StatsSet]>; /// Bind data produced by a [`DataSourceTableFunction`]. pub struct DataSourceBindData { @@ -117,7 +122,13 @@ pub struct DataSourceBindData { filter_exprs: Vec, column_names: Vec, column_types: Vec, - stats: DataSourceStatistics, + + /// Unmerged statistics for all files forming this DataSource. + /// If any file is missing stats, this vector is empty. + stats: Vec, + /// Merged per-column statistics, aggregated for all files. + //stats_cache: Arc>, + column_dtypes: Vec, } impl Clone for DataSourceBindData { @@ -128,6 +139,7 @@ impl Clone for DataSourceBindData { filter_exprs: vec![], column_names: self.column_names.clone(), column_types: self.column_types.clone(), + column_dtypes: self.column_dtypes.clone(), stats: self.stats.clone(), } } @@ -176,6 +188,84 @@ fn progress(bytes_read: &AtomicU64, bytes_total: &AtomicU64) -> f64 { read as f64 / total as f64 * 100. } +impl ColumnStatistics { + fn from(stats: &ColumnStatisticsAggregate, dtype: DType) -> Self { + let min = stats.min.as_ref().map(|value| { + Scalar::try_new(dtype.clone(), Some(value.clone())) + .vortex_expect("scalar dtype and value are incompatible") + .try_to_duckdb_scalar() + .vortex_expect("can't convert Scalar to duckdb Value") + }); + + let max = stats.max.as_ref().map(|value| { + Scalar::try_new(dtype.clone(), Some(value.clone())) + .vortex_expect("scalar dtype and value are incompatible") + .try_to_duckdb_scalar() + .vortex_expect("can't convert Scalar to duckdb Value") + }); + + let max_string_length = stats + .max_string_length + .map_or(0, |len| (1u64 << 63) | (len as u64)); + + Self { + min, + max, + max_string_length, + } + } +} + +#[derive(Default)] +pub struct ColumnStatisticsAggregate { + pub min: Option, + pub max: Option, + pub max_string_length: Option, +} + +impl ColumnStatisticsAggregate { + pub fn new(stats: &StatsSet) -> Self { + let min = match stats.get(Stat::Min) { + Some(Precision::Exact(min)) => Some(min), + _ => None, + }; + let max = match stats.get(Stat::Max) { + Some(Precision::Exact(max)) => Some(max), + _ => None, + }; + + let max_string_length = + if let Some(Precision::Exact(value)) = stats.get(Stat::UncompressedSizeInBytes) { + // DuckDB's string length is u32 + #[allow(clippy::cast_possible_truncation)] + Some(value.as_primitive().as_u64().vortex_expect("not a u64") as u32) + } else { + None + }; + + Self { + min, + max, + max_string_length, + } + } + + pub fn merge(&mut self, mut other: Self) { + self.min = match (self.min.take(), other.min.take()) { + (Some(left), Some(right)) => Some(if left.lt(&right) { left } else { right }), + _ => None, + }; + self.max = match (self.max.take(), other.max.take()) { + (Some(left), Some(right)) => Some(if left.gt(&right) { left } else { right }), + _ => None, + }; + self.max_string_length = match (self.max_string_length, other.max_string_length) { + (Some(left), Some(right)) => Some(max(left, right)), + _ => None, + }; + } +} + // --------------------------------------------------------------------------- // Blanket TableFunction implementation for any DataSourceTableFunction // --------------------------------------------------------------------------- @@ -202,38 +292,36 @@ impl TableFunction for T { input: &BindInputRef, result: &mut BindResultRef, ) -> VortexResult { - let (data_source, file_stats) = T::bind(ctx, input)?; - let (column_names, column_types) = extract_schema_from_dtype(data_source.dtype())?; - - if file_stats.is_none() { - for i in 0..column_names.len() { - result.add_result_column(&column_names[i], &column_types[i]); - } - - let stats = DataSourceStatistics::None(ColumnStatistics::default()); - return Ok(DataSourceBindData { - data_source, - filter_exprs: vec![], - column_names, - column_types, - stats, - }); + let data_source = T::bind(ctx, input)?; + let (column_names, column_types, column_dtypes) = + extract_schema_from_dtype(data_source.dtype())?; + for (column_name, column_type) in column_names.iter().zip(&column_types) { + result.add_result_column(column_name, column_type); } - let file_stats = file_stats.vortex_expect("no stats"); - let mut stats = Vec::with_capacity(column_names.len()); - for i in 0..column_names.len() { - result.add_result_column(&column_names[i], &column_types[i]); - let (stats_set, dtype) = file_stats.get(i); - stats.push(ColumnStatistics::new(stats_set, dtype.clone())); + let len = column_names.len(); + let mut stats = Vec::with_capacity(len); + for child in &data_source.children { + let MultiLayoutChild::Opened(reader) = child else { + vortex_bail!("Got deferred file in table function bind"); + }; + match (reader as &dyn Any).downcast_ref::() { + Some(reader_inner) => { + stats.push(reader_inner.file_stats.stats_sets().clone()); + } + None => { + stats.clear(); + break; + } + } } - let stats = DataSourceStatistics::All(stats); Ok(DataSourceBindData { - data_source, + data_source: Arc::new(data_source) as DataSourceRef, filter_exprs: vec![], column_names, column_types, + column_dtypes, stats, }) } @@ -445,15 +533,29 @@ impl TableFunction for T { Ok(false) } - fn statistics<'a>( - _client_context: &ClientContextRef, - bind_data: &'a Self::BindData, + fn statistics( + client_context: &ClientContextRef, + bind_data: &Self::BindData, column_index: usize, - ) -> &'a ColumnStatistics { - match &bind_data.stats { - DataSourceStatistics::All(items) => &items[column_index], - DataSourceStatistics::None(dummy) => dummy, + ) -> ColumnStatistics { + let cache = client_context.object_cache(); + // TODO(myrrc) cursed + let key = format!("{:p}-{}", std::ptr::addr_of!(bind_data), column_index); + if let Some(value) = cache.get(&key) { + return ColumnStatistics::from(value, bind_data.column_dtypes[column_index].clone()); } + + if bind_data.stats.is_empty() { + let stats = ColumnStatisticsAggregate::default(); + return ColumnStatistics::from(&stats, bind_data.column_dtypes[column_index].clone()); + }; + let mut stats = ColumnStatisticsAggregate::new(&bind_data.stats[0][column_index]); + for file_stats in &bind_data.stats[1..] { + stats.merge(ColumnStatisticsAggregate::new(&file_stats[column_index])); + } + let stats = cache.put(&key, stats); + let stats = unsafe { &*stats }; + ColumnStatistics::from(stats, bind_data.column_dtypes[column_index].clone()) } fn cardinality(bind_data: &Self::BindData) -> Cardinality { @@ -497,21 +599,26 @@ impl TableFunction for T { // --------------------------------------------------------------------------- /// Extracts DuckDB column names and logical types from a Vortex struct DType. -fn extract_schema_from_dtype(dtype: &DType) -> VortexResult<(Vec, Vec)> { +fn extract_schema_from_dtype( + dtype: &DType, +) -> VortexResult<(Vec, Vec, Vec)> { let struct_dtype = dtype .as_struct_fields_opt() .ok_or_else(|| vortex_err!("Vortex file must contain a struct array at the top level"))?; - let mut column_names = Vec::new(); - let mut column_types = Vec::new(); + let len = struct_dtype.names().len(); + let mut column_names = Vec::with_capacity(len); + let mut column_types = Vec::with_capacity(len); + let mut column_dtypes = Vec::with_capacity(len); for (field_name, field_dtype) in struct_dtype.names().iter().zip(struct_dtype.fields()) { let logical_type = LogicalType::try_from(&field_dtype)?; column_names.push(field_name.to_string()); column_types.push(logical_type); + column_dtypes.push(field_dtype); } - Ok((column_names, column_types)) + Ok((column_names, column_types, column_dtypes)) } /// Creates a projection expression from raw projection/column ID slices and column names. diff --git a/vortex-duckdb/src/duckdb/table_function/mod.rs b/vortex-duckdb/src/duckdb/table_function/mod.rs index 5c822c6be12..7ae7c5901dd 100644 --- a/vortex-duckdb/src/duckdb/table_function/mod.rs +++ b/vortex-duckdb/src/duckdb/table_function/mod.rs @@ -7,7 +7,6 @@ use std::ffi::c_void; use std::fmt::Debug; use std::ptr; -use vortex::dtype::DType; use vortex::error::VortexExpect; use vortex::error::VortexResult; mod bind; @@ -23,10 +22,6 @@ pub use bind::*; pub use init::*; pub use virtual_columns::VirtualColumnsResult; pub use virtual_columns::VirtualColumnsResultRef; -use vortex::array::stats::StatsSet; -use vortex::expr::stats::Precision; -use vortex::expr::stats::Stat; -use vortex::scalar::ScalarValue; use crate::cpp; use crate::cpp::duckdb_client_context; @@ -34,6 +29,7 @@ use crate::duckdb::ClientContext; use crate::duckdb::DataChunk; use crate::duckdb::DatabaseRef; use crate::duckdb::LogicalType; +use crate::duckdb::Value; use crate::duckdb::client_context::ClientContextRef; use crate::duckdb::data_chunk::DataChunkRef; use crate::duckdb::expr::ExpressionRef; @@ -45,54 +41,11 @@ use crate::duckdb::table_function::table_scan_progress::table_scan_progress_call use crate::duckdb::table_function::virtual_columns::get_virtual_columns_callback; use crate::duckdb_try; -#[derive(Clone)] -#[cfg_attr(test, derive(Debug))] +#[derive(Debug, Default)] pub struct ColumnStatistics { - pub dtype: DType, - pub min: Option, - pub max: Option, - pub max_string_length: Option, -} - -// Needed for absent file stats in data source -impl Default for ColumnStatistics { - fn default() -> Self { - Self { - dtype: DType::Null, - min: None, - max: None, - max_string_length: None, - } - } -} - -impl ColumnStatistics { - pub fn new(stats: &StatsSet, dtype: DType) -> Self { - let min = match stats.get(Stat::Min) { - Some(Precision::Exact(min)) => Some(min), - _ => None, - }; - let max = match stats.get(Stat::Max) { - Some(Precision::Exact(max)) => Some(max), - _ => None, - }; - - let max_string_length = - if let Some(Precision::Exact(value)) = stats.get(Stat::UncompressedSizeInBytes) { - // DuckDB's string length is u32 - #[allow(clippy::cast_possible_truncation)] - Some(value.as_primitive().as_u64().vortex_expect("not a u64") as u32) - } else { - None - }; - - Self { - dtype, - min, - max, - max_string_length, - } - } + pub min: Option, + pub max: Option, + pub max_string_length: u64, } /// A trait that defines the supported operations for a table function in DuckDB. @@ -141,11 +94,11 @@ pub trait TableFunction: Sized + Debug { result: &mut BindResultRef, ) -> VortexResult; - fn statistics<'a>( + fn statistics( client_context: &ClientContextRef, - bind_data: &'a Self::BindData, + bind_data: &Self::BindData, column_index: usize, - ) -> &'a ColumnStatistics; + ) -> ColumnStatistics; /// The function is called during query execution and is responsible for producing the output fn scan( diff --git a/vortex-duckdb/src/duckdb/table_function/statistics.rs b/vortex-duckdb/src/duckdb/table_function/statistics.rs index 93d1148910a..1b48389b149 100644 --- a/vortex-duckdb/src/duckdb/table_function/statistics.rs +++ b/vortex-duckdb/src/duckdb/table_function/statistics.rs @@ -5,17 +5,13 @@ use std::ffi::c_void; use std::ptr; use vortex::error::VortexExpect; -use vortex::scalar::Scalar; -use crate::convert::ToDuckDBScalar; -use crate::cpp::duckdb_client_context; -use crate::cpp::{self}; +use crate::cpp; use crate::duckdb::ClientContext; use crate::duckdb::TableFunction; -use crate::duckdb::Value; pub(crate) unsafe extern "C-unwind" fn statistics( - ctx: duckdb_client_context, + ctx: cpp::duckdb_client_context, bind_data: *const c_void, column_index: usize, stats_out: *mut cpp::duckdb_column_statistics, @@ -25,30 +21,14 @@ pub(crate) unsafe extern "C-unwind" fn statistics( let bind_data = unsafe { bind_data.cast::().as_ref() }.vortex_expect("bind_data null pointer"); let stats_ref = T::statistics(client_context, bind_data, column_index); - let dtype = &stats_ref.dtype; - // By definition dtype matches the value, so use vortex_expect - if let Some(ref value) = stats_ref.min { - let value = Scalar::try_new(dtype.clone(), Some(value.clone())) - .vortex_expect("scalar dtype and value are incompatible") - .try_to_duckdb_scalar() - .vortex_expect("can't convert Scalar to duckdb Value"); - stats_out.min = Value::into_ptr(value); - } else { - stats_out.min = ptr::null_mut(); - } - - if let Some(ref value) = stats_ref.max { - let value = Scalar::try_new(dtype.clone(), Some(value.clone())) - .vortex_expect("scalar dtype and value are incompatible") - .try_to_duckdb_scalar() - .vortex_expect("can't convert Scalar to duckdb Value"); - stats_out.max = Value::into_ptr(value); - } else { - stats_out.max = ptr::null_mut(); - } - - stats_out.max_string_length = stats_ref - .max_string_length - .map_or(0, |len| (1u64 << 63) | (len as u64)); + stats_out.min = stats_ref + .min + .as_ref() + .map_or(ptr::null_mut(), |v| v.as_ptr()); + stats_out.max = stats_ref + .max + .as_ref() + .map_or(ptr::null_mut(), |v| v.as_ptr()); + stats_out.max_string_length = stats_ref.max_string_length; } diff --git a/vortex-duckdb/src/e2e_test/object_cache_test.rs b/vortex-duckdb/src/e2e_test/object_cache_test.rs index 2a12f34c6c5..7aba481ada6 100644 --- a/vortex-duckdb/src/e2e_test/object_cache_test.rs +++ b/vortex-duckdb/src/e2e_test/object_cache_test.rs @@ -5,10 +5,8 @@ use std::ffi::CString; -use vortex::dtype::DType; use vortex::error::VortexResult; use vortex::error::vortex_err; -use vortex_array::stats::StatsSet; use crate::cpp::DUCKDB_TYPE; use crate::duckdb::BindInputRef; @@ -26,7 +24,6 @@ pub struct TestTableFunction; #[derive(Debug, Clone)] pub struct TestBindData { cache_key: String, - stats: ColumnStatistics, } #[derive(Debug)] @@ -65,7 +62,6 @@ impl TableFunction for TestTableFunction { Ok(TestBindData { cache_key: "test_table_function_data".to_string(), - stats: ColumnStatistics::new(&StatsSet::default(), DType::Null), }) } @@ -118,12 +114,12 @@ impl TableFunction for TestTableFunction { Ok(0) } - fn statistics<'a>( + fn statistics( _client_context: &ClientContextRef, - bind_data: &'a Self::BindData, + _bind_data: &Self::BindData, _column_index: usize, - ) -> &'a ColumnStatistics { - &bind_data.stats + ) -> ColumnStatistics { + ColumnStatistics::default() } } diff --git a/vortex-duckdb/src/multi_file.rs b/vortex-duckdb/src/multi_file.rs index 7ab5bf362d9..466756cea64 100644 --- a/vortex-duckdb/src/multi_file.rs +++ b/vortex-duckdb/src/multi_file.rs @@ -13,13 +13,12 @@ use vortex::error::vortex_err; use vortex::file::multi::MultiFileDataSource; use vortex::io::filesystem::FileSystemRef; use vortex::io::runtime::BlockingRuntime; -use vortex::scan::DataSourceRef; +use vortex::layout::scan::multi::MultiLayoutDataSource; use vortex_utils::aliases::hash_map::HashMap; use crate::RUNTIME; use crate::SESSION; use crate::datasource::DataSourceTableFunction; -use crate::datasource::DataSourceWithStats; use crate::duckdb::BindInputRef; use crate::duckdb::ClientContextRef; use crate::duckdb::ExtractedValue; @@ -77,7 +76,7 @@ impl DataSourceTableFunction for VortexMultiFileScan { vec![LogicalType::varchar()] } - fn bind(ctx: &ClientContextRef, input: &BindInputRef) -> VortexResult { + fn bind(ctx: &ClientContextRef, input: &BindInputRef) -> VortexResult { bind_multi_file_scan(ctx, input) } } @@ -90,7 +89,7 @@ impl DataSourceTableFunction for VortexMultiFileScanList { ] } - fn bind(ctx: &ClientContextRef, input: &BindInputRef) -> VortexResult { + fn bind(ctx: &ClientContextRef, input: &BindInputRef) -> VortexResult { bind_multi_file_scan(ctx, input) } } @@ -99,7 +98,7 @@ impl DataSourceTableFunction for VortexMultiFileScanList { fn bind_multi_file_scan( ctx: &ClientContextRef, input: &BindInputRef, -) -> VortexResult { +) -> VortexResult { let glob_url_parameter = input .get_parameter(0) .ok_or_else(|| vortex_err!("Missing file glob parameter"))?; @@ -152,9 +151,7 @@ fn bind_multi_file_scan( builder = builder.with_glob(glob_url.path(), Some(fs)); } - let (data_source, stats) = builder.build().await?; - let ds = Arc::new(data_source) as DataSourceRef; - Ok((ds, stats)) + builder.build().await }) } diff --git a/vortex-file/src/multi/mod.rs b/vortex-file/src/multi/mod.rs index c479adc5c3e..61dde06da16 100644 --- a/vortex-file/src/multi/mod.rs +++ b/vortex-file/src/multi/mod.rs @@ -11,7 +11,6 @@ use async_trait::async_trait; use futures::TryStreamExt; use session::MultiFileSessionExt; use tracing::debug; -use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_io::filesystem::FileListing; @@ -22,7 +21,6 @@ use vortex_layout::scan::multi::MultiLayoutDataSource; use vortex_scan::DataSource; use vortex_session::VortexSession; -use crate::FileStatistics; use crate::OpenOptionsSessionExt; use crate::VortexOpenOptions; use crate::v2::FileStatsLayoutReader; @@ -67,7 +65,6 @@ pub struct MultiFileDataSource { } type FileWithFs = (FileListing, FileSystemRef); -type DataSourceWithStats = (MultiLayoutDataSource, Option); impl MultiFileDataSource { /// Create a new [`MultiFileDataSource`] builder. @@ -114,7 +111,7 @@ impl MultiFileDataSource { /// /// Discovers files via glob, opens the first file eagerly to determine the schema, /// and creates lazy factories for the remaining files. - pub async fn build(mut self) -> VortexResult<(impl DataSource, Option)> { + pub async fn build(mut self) -> VortexResult { if self.glob_sources.is_empty() { vortex_bail!("MultiFileDataSource requires at least one glob pattern"); } @@ -157,14 +154,11 @@ impl MultiFileDataSource { if self.open_all { self.build_eager(all_files).await } else { - Ok((self.build_lazy(all_files).await?, None)) + self.build_lazy(all_files).await } } - async fn build_eager(&mut self, files: Vec) -> VortexResult { - let mut all_files_have_stats = true; - let mut stats: Option = None; - + async fn build_eager(&mut self, files: Vec) -> VortexResult { let open_fn = self.open_options_fn.as_ref(); let files_len = files.len(); let mut readers = Vec::with_capacity(files_len); @@ -173,36 +167,19 @@ impl MultiFileDataSource { let mut file = open_file(&fs, &file, &self.session, open_fn).await?; let mut reader = file.layout_reader()?; - let file_stats = file.take_file_stats(); - if file_stats.is_none() { - all_files_have_stats = false; - readers.push(reader); - continue; - } - let file_stats = file_stats.vortex_expect("no stats"); - - if all_files_have_stats { - stats = Some(match stats.take() { - None => file_stats.clone(), - Some(st) => st.merge(&file_stats)?, - }) + if let Some(file_stats) = file.take_file_stats() { + reader = Arc::new(FileStatsLayoutReader::new( + reader, + file_stats, + self.session.clone(), + )); } - - reader = Arc::new(FileStatsLayoutReader::new( - reader, - file_stats, - self.session.clone(), - )); readers.push(reader); } let inner = MultiLayoutDataSource::new_eager(readers, &self.session); debug!(file_count = files_len, dtype = %inner.dtype(), "built MultiFileDataSource"); - - if !all_files_have_stats { - stats = None - } - Ok((inner, stats)) + Ok(inner) } async fn build_lazy(&self, files: Vec) -> VortexResult { diff --git a/vortex-file/src/v2/file_stats_reader.rs b/vortex-file/src/v2/file_stats_reader.rs index fb31b28d61f..495f24a094d 100644 --- a/vortex-file/src/v2/file_stats_reader.rs +++ b/vortex-file/src/v2/file_stats_reader.rs @@ -46,7 +46,7 @@ use crate::FileStatistics; /// (the result is the same regardless of which row range is requested). pub struct FileStatsLayoutReader { child: LayoutReaderRef, - file_stats: FileStatistics, + pub file_stats: FileStatistics, struct_fields: StructFields, session: VortexSession, prune_cache: DashMap, diff --git a/vortex-layout/src/scan/multi.rs b/vortex-layout/src/scan/multi.rs index 9af521c978a..1478c41a1ed 100644 --- a/vortex-layout/src/scan/multi.rs +++ b/vortex-layout/src/scan/multi.rs @@ -80,11 +80,11 @@ pub trait LayoutReaderFactory: 'static + Send + Sync { pub struct MultiLayoutDataSource { dtype: DType, session: VortexSession, - children: Vec, + pub children: Vec, concurrency: usize, } -enum MultiLayoutChild { +pub enum MultiLayoutChild { Opened(LayoutReaderRef), Deferred(Arc), } From b4cf77b73d7d8cad6d9413e33509ff778f006b2b Mon Sep 17 00:00:00 2001 From: Mikhail Kot Date: Wed, 15 Apr 2026 17:43:56 +0100 Subject: [PATCH 3/3] better --- benchmarks/datafusion-bench/src/main.rs | 2 +- vortex-file/src/footer/file_statistics.rs | 16 ---------------- 2 files changed, 1 insertion(+), 17 deletions(-) diff --git a/benchmarks/datafusion-bench/src/main.rs b/benchmarks/datafusion-bench/src/main.rs index 37c93687827..ad0df8ea74a 100644 --- a/benchmarks/datafusion-bench/src/main.rs +++ b/benchmarks/datafusion-bench/src/main.rs @@ -316,7 +316,7 @@ async fn register_v2_tables( None => format!("*.{}", format.ext()), }; - let (multi_ds, _) = MultiFileDataSource::new(SESSION.clone()) + let multi_ds = MultiFileDataSource::new(SESSION.clone()) .with_glob(glob_pattern, Some(fs)) .build() .await?; diff --git a/vortex-file/src/footer/file_statistics.rs b/vortex-file/src/footer/file_statistics.rs index 5126b03a2d4..4fac3ad8482 100644 --- a/vortex-file/src/footer/file_statistics.rs +++ b/vortex-file/src/footer/file_statistics.rs @@ -146,22 +146,6 @@ impl FileStatistics { pub fn get(&self, field_idx: usize) -> (&StatsSet, &DType) { (&self.stats[field_idx], &self.dtypes[field_idx]) } - - pub fn merge(self, other: &FileStatistics) -> VortexResult { - vortex_ensure_eq!(self.stats.len(), other.stats_sets().len()); - - let FileStatistics { mut stats, dtypes } = self; - for (this, other_stat, dtype) in itertools::izip!( - Arc::make_mut(&mut stats).iter_mut(), - other.stats_sets().iter(), - dtypes.iter() - ) { - let owned = std::mem::take(this); - *this = owned.merge_unordered(other_stat, dtype); - } - - Ok(FileStatistics::new(stats, dtypes)) - } } impl<'a> IntoIterator for &'a FileStatistics {