Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 65 additions & 1 deletion vortex-datafusion/src/persistent/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@ use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::file_stream::FileOpener;
use datafusion_execution::cache::cache_manager::FileMetadataCache;
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_expr::LexOrdering;
use datafusion_physical_expr::PhysicalExprRef;
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_expr::conjunction;
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
use datafusion_physical_expr_common::physical_expr::fmt_sql;
use datafusion_physical_plan::DisplayFormatType;
use datafusion_physical_plan::PhysicalExpr;
use datafusion_physical_plan::SortOrderPushdownResult;
use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
use datafusion_physical_plan::filter_pushdown::PushedDown;
use datafusion_physical_plan::filter_pushdown::PushedDownPredicate;
Expand Down Expand Up @@ -190,6 +194,7 @@ pub struct VortexSource {
natural_split_ranges: Arc<DashMap<Path, Arc<[Range<u64>]>>>,
expression_convertor: Arc<dyn ExpressionConvertor>,
pub(crate) vortex_reader_factory: Option<Arc<dyn VortexReaderFactory>>,
pub(crate) ordered: bool,
vx_metrics_registry: Arc<dyn MetricsRegistry>,
file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
/// Whether to enable expression pushdown into the underlying Vortex scan.
Expand Down Expand Up @@ -224,6 +229,7 @@ impl VortexSource {
vortex_reader_factory: None,
vx_metrics_registry: Arc::new(DefaultMetricsRegistry::default()),
file_metadata_cache: None,
ordered: false,
options: VortexTableOptions::default(),
}
}
Expand Down Expand Up @@ -336,7 +342,7 @@ impl VortexSource {
metrics_registry: Arc::clone(&self.vx_metrics_registry),
layout_readers: Arc::clone(&self.layout_readers),
natural_split_ranges: Arc::clone(&self.natural_split_ranges),
has_output_ordering: !base_config.output_ordering.is_empty(),
has_output_ordering: !base_config.output_ordering.is_empty() || self.ordered,
expression_convertor: Arc::clone(&self.expression_convertor),
file_metadata_cache: self.file_metadata_cache.clone(),
projection_pushdown: self.options.projection_pushdown,
Expand Down Expand Up @@ -383,6 +389,64 @@ impl FileSource for VortexSource {
VORTEX_FILE_EXTENSION
}

/// Attempts to push the requested sort `order` down into this Vortex source.
///
/// The checks run in this sequence:
/// - `Unsupported` when `order` is empty.
/// - `Exact` when `eq_properties` already satisfies the whole of `order`; the returned source
///   is a clone of `self` with `ordered` set.
/// - `Unsupported` when a strict, non-empty prefix of `order` is already satisfied.
/// - `Inexact` when the leading sort key is an ascending `Column` whose name is present in the
///   file schema; the returned source is a clone of `self` with `ordered` set.
/// - `Unsupported` in every other case (non-column leading key, column missing from the file
///   schema, or descending leading key).
///
/// # Errors
///
/// Propagates any error returned by `EquivalenceProperties::ordering_satisfy`.
fn try_pushdown_sort(
    &self,
    order: &[PhysicalSortExpr],
    eq_properties: &EquivalenceProperties,
) -> DFResult<SortOrderPushdownResult<Arc<dyn FileSource>>> {
    if order.is_empty() {
        return Ok(SortOrderPushdownResult::Unsupported);
    }

    // The full requested ordering is already guaranteed: only the `ordered` flag needs setting.
    if eq_properties.ordering_satisfy(order.iter().cloned())? {
        let mut this = self.clone();
        this.ordered = true;

        return Ok(SortOrderPushdownResult::Exact {
            inner: Arc::new(this) as Arc<dyn FileSource>,
        });
    }

    // Decline when only a strict prefix of the ordering is satisfied.
    // NOTE(review): presumably this leaves the partially-sorted case to the planner — confirm.
    // The prefix slice is iterated directly; no intermediate `Vec` is needed.
    for prefix_len in 1..order.len() {
        if eq_properties.ordering_satisfy(order[..prefix_len].iter().cloned())? {
            return Ok(SortOrderPushdownResult::Unsupported);
        }
    }

    // `LexOrdering::new` yields `None` when it ends up with no sort expressions; treat that the
    // same as "nothing we can push down".
    let Some(sort_order) = LexOrdering::new(order.iter().cloned()) else {
        return Ok(SortOrderPushdownResult::Unsupported);
    };
    let leading = sort_order.first();

    // Only a plain column reference that exists (by name) in the file schema qualifies.
    let column_in_file_schema = leading
        .expr
        .as_any()
        .downcast_ref::<datafusion_physical_expr::expressions::Column>()
        .is_some_and(|col| {
            self.table_schema
                .file_schema()
                .field_with_name(col.name())
                .is_ok()
        });

    if !column_in_file_schema || leading.options.descending {
        return Ok(SortOrderPushdownResult::Unsupported);
    }

    // Per the change author's review note, `ordered` here just means "the order of the file",
    // instead of returning batches in whichever order we get them.
    // NOTE(review): a reviewer asked whether always answering `Inexact` for an ascending sort on
    // an in-file column is too optimistic — confirm how DataFusion's sort operators treat
    // `Inexact` before relying on it.
    let mut this = self.clone();
    this.ordered = true;
    Ok(SortOrderPushdownResult::Inexact {
        inner: Arc::new(this) as Arc<dyn FileSource>,
    })
}

fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
Expand Down
Loading