Skip to content

compute: insert temporal bucketing before ArrangeBy nodes#35935

Open
antiguru wants to merge 10 commits intoMaterializeInc:mainfrom
antiguru:moritz/temporal-bucketing-arrangeby
Open

compute: insert temporal bucketing before ArrangeBy nodes#35935
antiguru wants to merge 10 commits intoMaterializeInc:mainfrom
antiguru:moritz/temporal-bucketing-arrangeby

Conversation

@antiguru
Copy link
Copy Markdown
Member

Summary

  • Adds a has_temporal_predicates() helper to MapFilterProject to detect temporal MFPs (those using mz_now())
  • Propagates a has_future_updates: bool bit through LIR plan lowering — every plan node now knows whether its output may contain future-timestamped updates
  • Stores this as input_has_future_updates: bool on PlanNode::ArrangeBy / RenderPlanNode::ArrangeBy
  • When the flag is set and ENABLE_TEMPORAL_BUCKETING is on, inserts temporal bucketing (via BucketChain) between as_collection_core output and arrangement formation in ensure_collections
  • Removes the old source-level bucketing, which was less targeted (sources don't produce future updates — temporal MFPs do)
  • Adds maybe_apply_temporal_bucketing to RenderTimestamp to handle the TotalOrder bound: real bucketing for mz_repr::Timestamp, no-op for Product<...> in iterative scopes (TODO for future work)

Motivation

Arrangement merge batchers sort/consolidate by (data, time). When a temporal MFP (e.g. WHERE mz_now() > ts) produces updates with far-future timestamps, the batcher does unnecessary work merging data that won't be read for a long time. Temporal bucketing buffers these future updates in a logarithmic BucketChain and releases them as the frontier advances.

The old source-level bucketing applied to all source imports regardless of whether future updates were actually produced. This change targets bucketing precisely at ArrangeBy nodes that receive future updates from upstream temporal MFPs.

Test plan

  • cargo test -p mz-expr mz-compute-types mz-compute passes locally
  • SLT tests with temporal filters (CI)
  • Feature gated behind existing ENABLE_TEMPORAL_BUCKETING dyncfg (default false)

🤖 Generated with Claude Code

@github-actions
Copy link
Copy Markdown
Contributor

Thanks for opening this PR! Here are a few tips to help make the review process smooth for everyone.

PR title guidelines

  • Use imperative mood: "Fix X" not "Fixed X" or "Fixes X"
  • Be specific: "Fix panic in catalog sync when controller restarts" not "Fix bug" or "Update catalog code"
  • Prefix with area if helpful: compute: , storage: , adapter: , sql:

Pre-merge checklist

  • The PR title is descriptive and will make sense in the git log.
  • This PR has adequate test coverage / QA involvement has been duly considered. (trigger-ci for additional test/nightly runs)
  • If this PR includes major user-facing behavior changes, I have pinged the relevant PM to schedule a changelog post.
  • This PR has an associated up-to-date design doc, is a design doc (template), or is sufficiently small to not require a design.
  • If this PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way), then it is tagged with a T-proto label.
  • If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label (example).

@antiguru antiguru force-pushed the moritz/temporal-bucketing-arrangeby branch 2 times, most recently from 650ca9b to fff4565 Compare April 13, 2026 08:27
@antiguru antiguru marked this pull request as ready for review April 13, 2026 09:28
@antiguru antiguru requested a review from a team as a code owner April 13, 2026 09:28
Copy link
Copy Markdown
Contributor

@teskje teskje left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This make sense to me, though somebody who understand the lowering should probably review that part, as it's also the part that has the most potential for logic bugs.

until: Antichain<mz_repr::Timestamp>,
config_set: &ConfigSet,
input_has_future_updates: bool,
as_of: Antichain<mz_repr::Timestamp>,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Move as_of before until.

antiguru and others added 10 commits April 14, 2026 13:16
Add a simple helper method that checks whether any predicate in an MFP
contains a temporal expression (mz_now()), without modifying the MFP.
This will be used in LIR lowering to detect when temporal bucketing is
needed before arrangement-forming operators.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Structural change only — the field is false everywhere. The lowering
logic to set it follows in the next commit.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Tracks whether each plan node's output may contain updates at future
timestamps (from temporal MFPs using mz_now()). Sets
input_has_future_updates on ArrangeBy nodes so the renderer knows
where to insert temporal bucketing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Dispatches temporal bucketing based on timestamp type. For
mz_repr::Timestamp (TotalOrder), applies actual bucketing. For Product
timestamps in iterative scopes, no-op for now.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
When an ArrangeBy node's input_has_future_updates flag is set and the
ENABLE_TEMPORAL_BUCKETING dyncfg is on, applies temporal bucketing to
the collection before forming arrangements. This buffers far-future
updates in a BucketChain, preventing the arrangement merge batcher
from doing unnecessary work on them.
The targeted ArrangeBy-level bucketing from the previous commit
subsumes this. Source data itself doesn't contain future updates;
future updates are produced by temporal MFPs downstream.
The previous commit only bucketed when ensure_collections formed a new
collection from scratch (self.collection.is_none()). In the typical
temporal-MFP -> ArrangeBy flow, the upstream Mfp/Get already produced
the collection, so that path was skipped. Add a second bucketing site
at arrangement consumption for pre-existing collections.
PassArrangements passes through the raw collection unchanged, which may
contain future timestamps from an upstream temporal MFP. Track which ids
have future updates in a BTreeSet during lowering so that a downstream
Get with PassArrangements correctly reports has_future_updates.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Moritz Hoffmann <mh@materialize.com>
Add a benchmark that creates an index on a view with a temporal filter
over 1M rows joined with a table. Exercises the temporal bucketing path
in ArrangeBy rendering.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@antiguru antiguru force-pushed the moritz/temporal-bucketing-arrangeby branch from 489b798 to cfdd140 Compare April 14, 2026 11:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants