diff --git a/vortex-datafusion/src/persistent/source.rs b/vortex-datafusion/src/persistent/source.rs index f8ad847d5e7..086c75adda7 100644 --- a/vortex-datafusion/src/persistent/source.rs +++ b/vortex-datafusion/src/persistent/source.rs @@ -13,13 +13,16 @@ use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfig; use datafusion_datasource::file_stream::FileOpener; use datafusion_execution::cache::cache_manager::FileMetadataCache; +use datafusion_physical_expr::EquivalenceProperties; use datafusion_physical_expr::PhysicalExprRef; +use datafusion_physical_expr::PhysicalSortExpr; use datafusion_physical_expr::conjunction; use datafusion_physical_expr::projection::ProjectionExprs; use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory; use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_plan::DisplayFormatType; use datafusion_physical_plan::PhysicalExpr; +use datafusion_physical_plan::SortOrderPushdownResult; use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation; use datafusion_physical_plan::filter_pushdown::PushedDown; use datafusion_physical_plan::filter_pushdown::PushedDownPredicate; @@ -189,6 +192,7 @@ pub struct VortexSource { natural_split_ranges: Arc]>>>, expression_convertor: Arc, pub(crate) vortex_reader_factory: Option>, + pub(crate) ordered: bool, vx_metrics_registry: Arc, file_metadata_cache: Option>, /// Whether to enable expression pushdown into the underlying Vortex scan. @@ -223,6 +227,7 @@ impl VortexSource { vortex_reader_factory: None, vx_metrics_registry: Arc::new(DefaultMetricsRegistry::default()), file_metadata_cache: None, + ordered: false, options: VortexTableOptions::default(), } } @@ -335,7 +340,7 @@ impl VortexSource { metrics_registry: Arc::clone(&self.vx_metrics_registry), layout_readers: Arc::clone(&self.layout_readers), natural_split_ranges: Arc::clone(&self.natural_split_ranges), - has_output_ordering: !base_config.output_ordering.is_empty(), + has_output_ordering: !base_config.output_ordering.is_empty() || self.ordered, expression_convertor: Arc::clone(&self.expression_convertor), file_metadata_cache: self.file_metadata_cache.clone(), projection_pushdown: self.options.projection_pushdown, @@ -378,6 +383,27 @@ impl FileSource for VortexSource { VORTEX_FILE_EXTENSION } + fn try_pushdown_sort( + &self, + order: &[PhysicalSortExpr], + eq_properties: &EquivalenceProperties, + ) -> DFResult>> { + if order.is_empty() { + return Ok(SortOrderPushdownResult::Unsupported); + } + + if eq_properties.ordering_satisfy(order.iter().cloned())? { + let mut this = self.clone(); + this.ordered = true; + + return Ok(SortOrderPushdownResult::Exact { + inner: Arc::new(this) as Arc, + }); + } + + Ok(SortOrderPushdownResult::Unsupported) + } + fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { @@ -493,6 +519,7 @@ mod tests { use arrow_schema::Schema; use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_execution::object_store::ObjectStoreUrl; + use datafusion_physical_expr::expressions::Column; use object_store::memory::InMemory; use vortex::VortexSessionDefault; @@ -532,6 +559,53 @@ mod tests { } } + fn sort_column(name: &str, index: usize) -> PhysicalSortExpr { + let expr: PhysicalExprRef = Arc::new(Column::new(name, index)); + PhysicalSortExpr::new_default(expr) + } + + fn sort_test_schema() -> Arc { + Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])) + } + + fn sort_test_source(schema: Arc) -> VortexSource { + VortexSource::new( + TableSchema::from_file_schema(schema), + VortexSession::default(), + ) + } + + fn assert_ordered_source(inner: Arc) -> anyhow::Result<()> { + let source = inner + .downcast_ref::() + .ok_or_else(|| anyhow::anyhow!("expected VortexSource"))?; + + assert!(source.ordered); + Ok(()) + } + + #[test] + fn try_pushdown_sort_returns_exact_when_ordering_is_satisfied() -> anyhow::Result<()> { + let schema = sort_test_schema(); + let source = sort_test_source(Arc::clone(&schema)); + let order = vec![sort_column("a", 0), sort_column("b", 1)]; + let eq_properties = EquivalenceProperties::new_with_orderings(schema, [order.clone()]); + + let result = source.try_pushdown_sort(&order, &eq_properties)?; + + match result { + SortOrderPushdownResult::Exact { inner } => assert_ordered_source(inner)?, + SortOrderPushdownResult::Inexact { .. } | SortOrderPushdownResult::Unsupported => { + anyhow::bail!("expected exact sort pushdown") + } + } + assert!(!source.ordered); + Ok(()) + } + #[test] fn create_vortex_opener_preserves_expression_convertor() -> anyhow::Result<()> { let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); diff --git a/vortex-sqllogictest/slt/datafusion/order_pushdown.slt b/vortex-sqllogictest/slt/datafusion/order_pushdown.slt new file mode 100644 index 00000000000..49ecba8acb5 --- /dev/null +++ b/vortex-sqllogictest/slt/datafusion/order_pushdown.slt @@ -0,0 +1,44 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright the Vortex contributors + +include ../setup.slt.no + +statement ok +CREATE EXTERNAL TABLE ordered_pushdown ( + c1 VARCHAR NOT NULL, + c2 INT NOT NULL +) +STORED AS vortex +WITH ORDER (c1 ASC) +LOCATION '$__TEST_DIR__/ordered_pushdown/'; + +statement ok +INSERT INTO ordered_pushdown VALUES + ('air', 5), + ('balloon', 42); + +statement ok +INSERT INTO ordered_pushdown VALUES + ('zebra', 5); + +statement ok +INSERT INTO ordered_pushdown VALUES + ('texas', 2000), + ('alabama', 2000); + +query TI +SELECT c1, c2 FROM ordered_pushdown ORDER BY c1 ASC LIMIT 3; +---- +air 5 +alabama 2000 +balloon 42 + +query TT +EXPLAIN SELECT c1, c2 FROM ordered_pushdown ORDER BY c1 ASC LIMIT 3; +---- +logical_plan +01)Sort: ordered_pushdown.c1 ASC NULLS LAST, fetch=3 +02)--TableScan: ordered_pushdown projection=[c1, c2] +physical_plan +01)SortPreservingMergeExec: [c1@0 ASC NULLS LAST], fetch=3 +02)--DataSourceExec: file_groups={}, projection=[c1, c2], limit=3, output_ordering=[c1@0 ASC NULLS LAST], file_type=vortex