feat: add sorted_series column for DataFusion streaming aggregation #6290

Open
g-talbot wants to merge 8 commits into gtt/sorted-series-column from gtt/sorted-series-key

@g-talbot
Contributor

Summary

  • Compute a composite, lexicographically sortable sorted_series binary column at Parquet write time using storekey order-preserving encoding
  • For each row, encodes non-null sort schema tag columns as (ordinal: u8, value: str) pairs, then appends timeseries_id (i64) as final discriminator
  • Identical timeseries always produce identical byte keys regardless of timestamp or value, enabling DataFusion's streaming AggregateExec and BoundedWindowAggExec with O(1) memory instead of O(N) hash tables
  • Column is placed after sort columns in physical layout (Phase 1b in reorder_columns) for optimal streaming read
  • Fixes a create_nullable_dict_array bug: dictionary keys now index into the unique values array. Previously each key was the row's original array index, which caused out-of-bounds panics for mixed null/non-null inputs
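
The dictionary-key fix in the last bullet can be illustrated without Arrow. The sketch below is a stand-in, not the PR's code: `dictionary_encode` is a hypothetical helper that returns plain vectors, where the real function builds an Arrow dictionary array. It shows the corrected rule, namely that each non-null key is the value's position in the unique values list.

```rust
use std::collections::HashMap;

/// Dictionary-encodes nullable strings and returns (keys, unique_values).
/// Hypothetical stand-in for the Arrow-based helper named in the summary.
/// Each non-null key is the position of the value in `unique_values`,
/// never the row's position in `input`.
fn dictionary_encode(input: &[Option<&str>]) -> (Vec<Option<u32>>, Vec<String>) {
    let mut unique_values: Vec<String> = Vec::new();
    let mut positions: HashMap<String, u32> = HashMap::new();
    let mut keys = Vec::with_capacity(input.len());

    for value in input {
        match value {
            // Nulls get a null key and never touch the dictionary.
            None => keys.push(None),
            Some(s) => {
                // Reuse the existing slot, or append a new unique value.
                let key = *positions.entry((*s).to_string()).or_insert_with(|| {
                    unique_values.push((*s).to_string());
                    (unique_values.len() - 1) as u32
                });
                keys.push(Some(key));
            }
        }
    }
    (keys, unique_values)
}
```

For the input `[null, "a", null, "b", "a"]` the unique values are `["a", "b"]`, so valid keys are 0 and 1. Using the row index instead would give "b" the key 3, which points past the end of a two-entry dictionary.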

Stacked on top of #6287 (column ordering) and timeseries_id work.

Design

Based on the Sorted Series Column design doc:

Key structure for sort schema [metric_name(0), service(1), ..., host(5)]:

┌──────────┬────────────────┬──────────┬──────────────┬─────────────────┐
│ ordinal 0│ "cpu.usage"    │ ordinal 1│ "api"        │ timeseries_id   │
│ (u8)     │ (storekey str) │ (u8)     │ (storekey)   │ (storekey i64)  │
└──────────┴────────────────┴──────────┴──────────────┴─────────────────┘

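Null columns are skipped. The ordinal prefix prevents cross-column byte collisions for sparse schemas: without it, a value in one column could produce the same bytes as the same value in a different column once nulls are skipped. Later commits in this PR also give timeseries_id an ordinal prefix (its sort schema position, 6 in the tests), which the diagram above does not show.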
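
The layout in the diagram can be sketched in plain Rust. This is a minimal illustration, not the PR's implementation: `encode_str` and `encode_i64` are hypothetical stand-ins for the storekey encoding (NUL-terminated UTF-8 and sign-flipped big-endian, chosen here only because both preserve byte order), and `sorted_series_key` is an illustrative name. It follows the diagram as drawn, with timeseries_id appended directly after the tag pairs.

```rust
/// Stand-in for an order-preserving string encoding: UTF-8 bytes plus a
/// 0x00 terminator. Assumes tag values contain no NUL bytes.
fn encode_str(out: &mut Vec<u8>, value: &str) {
    out.extend_from_slice(value.as_bytes());
    out.push(0x00);
}

/// Stand-in for an order-preserving i64 encoding: big-endian with the
/// sign bit flipped, so byte-wise order matches numeric order.
fn encode_i64(out: &mut Vec<u8>, value: i64) {
    out.extend_from_slice(&((value as u64) ^ (1u64 << 63)).to_be_bytes());
}

/// Builds one row's key: an (ordinal, value) pair for every non-null tag
/// in sort-schema order, then timeseries_id. `tags[i]` is the tag at sort
/// schema position `i`; the sketch assumes fewer than 256 positions.
fn sorted_series_key(tags: &[Option<&str>], timeseries_id: i64) -> Vec<u8> {
    let mut key = Vec::new();
    for (ordinal, tag) in tags.iter().enumerate() {
        // Null tags are skipped entirely: no ordinal, no value bytes.
        if let Some(value) = tag {
            key.push(ordinal as u8);
            encode_str(&mut key, value);
        }
    }
    encode_i64(&mut key, timeseries_id);
    key
}
```

Timestamp and metric value are not inputs, so every row of a series yields the same bytes. The ordinal also keeps `["x", null]` and `[null, "x"]` apart, since the first key starts with 0x00 and the second with 0x01.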

Test plan

  • 19 tests (identity, discrimination, sort-order, null handling, stability, Parquet round-trip, structural ordinal verification, 2 proptests)
  • All 195 quickwit-parquet-engine tests pass
  • Clippy clean, formatted, no unused deps
  • License headers pass
  • Docs compile

🤖 Generated with Claude Code

@g-talbot g-talbot changed the base branch from main to gtt/sorted-series-column April 10, 2026 14:12
@g-talbot g-talbot changed the base branch from gtt/sorted-series-column to main April 10, 2026 14:12
@g-talbot g-talbot changed the base branch from main to gtt/sorted-series-column April 10, 2026 14:14
@g-talbot g-talbot force-pushed the gtt/sorted-series-column branch from 60d859c to 9522326 Compare April 10, 2026 14:17
@g-talbot g-talbot force-pushed the gtt/sorted-series-key branch from 53bc3e0 to cb1b4d2 Compare April 10, 2026 14:25
g-talbot and others added 3 commits April 10, 2026 10:42
Compute a composite, lexicographically sortable binary column
(sorted_series) at Parquet write time using storekey order-preserving
encoding. For each row the key encodes:

  1. Non-null sort schema tag columns as (ordinal: u8, value: str)
  2. timeseries_id (i64) as final discriminator

Identical timeseries always produce identical byte keys regardless of
timestamp or value, enabling DataFusion's streaming AggregateExec and
BoundedWindowAggExec with O(1) memory instead of O(N) hash tables.

Also fixes create_nullable_dict_array which used the original array
index as dictionary key instead of the position in the unique values
array, causing out-of-bounds panics for mixed null/non-null inputs.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@g-talbot g-talbot force-pushed the gtt/sorted-series-column branch from 9522326 to b0344ba Compare April 10, 2026 14:42
@g-talbot g-talbot force-pushed the gtt/sorted-series-key branch from 9a8b9a5 to 58f4810 Compare April 10, 2026 14:42
g-talbot and others added 5 commits April 10, 2026 10:46
Without the ordinal, the timeseries_id bytes could collide with a
subsequent tag column's ordinal+string encoding. Every component in
the key now consistently gets an ordinal prefix from its sort schema
position.
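
One way such a collision could arise is sketched below. The encoders are hypothetical stand-ins for storekey (NUL-terminated strings, sign-flipped big-endian integers), and `build_key` and `colliding_id` are illustrative names, so the exact bytes are an assumption; the point is only that an unprefixed 8-byte id can imitate an ordinal followed by a string.

```rust
/// Stand-in string encoding: UTF-8 bytes plus a 0x00 terminator.
fn encode_str(out: &mut Vec<u8>, value: &str) {
    out.extend_from_slice(value.as_bytes());
    out.push(0x00);
}

/// Stand-in i64 encoding: big-endian with the sign bit flipped.
fn encode_i64(out: &mut Vec<u8>, value: i64) {
    out.extend_from_slice(&((value as u64) ^ (1u64 << 63)).to_be_bytes());
}

/// Inverse of `encode_i64`, used only to construct the colliding id.
fn decode_i64(bytes: [u8; 8]) -> i64 {
    (u64::from_be_bytes(bytes) ^ (1u64 << 63)) as i64
}

/// `id_ordinal = Some(n)` prefixes timeseries_id with ordinal `n`;
/// `None` appends the raw id bytes with no prefix.
fn build_key(
    tags: &[Option<&str>],
    timeseries_id: Option<i64>,
    id_ordinal: Option<u8>,
) -> Vec<u8> {
    let mut key = Vec::new();
    for (ordinal, tag) in tags.iter().enumerate() {
        if let Some(value) = tag {
            key.push(ordinal as u8);
            encode_str(&mut key, value);
        }
    }
    // An absent timeseries_id contributes nothing, not even an ordinal.
    if let Some(id) = timeseries_id {
        if let Some(ordinal) = id_ordinal {
            key.push(ordinal);
        }
        encode_i64(&mut key, id);
    }
    key
}

/// An id whose 8 encoded bytes read as ordinal 5, "abcdef", terminator.
fn colliding_id() -> i64 {
    decode_i64([0x05, b'a', b'b', b'c', b'd', b'e', b'f', 0x00])
}
```

Take a row with metric_name "m", a null host and `colliding_id()`, and a second row with metric_name "m", host "abcdef" at ordinal 5 and no timeseries_id. With `id_ordinal = None` both produce the same bytes. With `id_ordinal = Some(6)` the first key carries 0x06 where the second carries 0x05, so they differ.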

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Add tests that assert:
- timeseries_id gets ordinal 6 prefix (its sort schema position)
- key length is exact: ordinal(1) + str(2) + ordinal(1) + i64(8) = 12
- when timeseries_id is absent, no trailing ordinal appears

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Writes a 6-row batch with 4 distinct series (including null tags)
through the ParquetWriter pipeline, reads back, and verifies:

- 4 distinct keys produced (series identity)
- series with 3 rows produces 3 identical keys
- null host differs from present host (ordinal skipping)
- all-null tags differ from partial-null tags
- ordinal bytes are correct (0x00 for metric_name, 0x01 for service,
  0x06 for timeseries_id) even when intermediate tags are null
- equal keys are contiguous after sort (streaming aggregation ready)
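
The last check matters because contiguous equal keys let an aggregation finish each group as soon as the key changes. The sketch below shows that idea in plain Rust. It is not DataFusion's AggregateExec; `streaming_sum` is a hypothetical helper, and it collects results into a vector only so they can be inspected, where a real operator would emit each finished group downstream.

```rust
/// Sums values per key over rows that are already sorted by key.
/// Only the current group's key and running sum are held as state;
/// there is no hash table. Equal keys must be contiguous in `rows`.
fn streaming_sum<'a, I>(rows: I) -> Vec<(Vec<u8>, f64)>
where
    I: IntoIterator<Item = (&'a [u8], f64)>,
{
    let mut out = Vec::new();
    let mut current: Option<(&'a [u8], f64)> = None;

    for (key, value) in rows {
        // Same key as the open group: fold the value in and move on.
        if let Some((cur_key, sum)) = current.as_mut() {
            if *cur_key == key {
                *sum += value;
                continue;
            }
        }
        // Key changed: the previous group is complete and can be emitted.
        if let Some((done_key, done_sum)) = current.take() {
            out.push((done_key.to_vec(), done_sum));
        }
        current = Some((key, value));
    }
    // Flush the final group.
    if let Some((key, sum)) = current {
        out.push((key.to_vec(), sum));
    }
    out
}
```

If equal keys were not contiguous, the same series would be emitted more than once, which is why the round-trip test asserts contiguity after the sort.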

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

- Regenerate storekey entry via dd-rust-license-tool (correct authors)
- Fix 4 rustfmt nightly formatting diffs in sorted_series tests

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
