Skip to content

feat: clustered segments pt.2 (write side support)#19579

Open
clintropolis wants to merge 6 commits into
apache:masterfrom
clintropolis:clustered-segment-writer-stuff
Open

feat: clustered segments pt.2 (write side support)#19579
clintropolis wants to merge 6 commits into
apache:masterfrom
clintropolis:clustered-segment-writer-stuff

Conversation

@clintropolis

@clintropolis clintropolis commented Jun 13, 2026

Copy link
Copy Markdown
Member

Description

Follow-up to #19460, this PR introduces the writer side stuff so that the segments can actually be created.

As part of this I'm experimenting with a new V10 oriented way to express DataSchema, where the ingest spec looks a lot more like how the V10 segment metadata is organized, as 1 or more projections. Using the wikipedia example, a clustered segment ingest spec looks something like this:

    "dataSchema": {
      "dataSource": "wikipedia_clustered",
      "segmentGranularitySpec": {
        "segmentGranularity": "day"
      },
      "timestampSpec": {
        "column": "timestamp",
        "format": "iso"
      },
      "baseTable": {
        "type": "clusteredValueGroups",
        "virtualColumns": [
          {
            "type": "expression",
            "name": "__virtualGranularity",
            "expression": "timestamp_floor(__time,'PT1H')",
            "outputType": "LONG"
          }
        ],
        "clusteringColumns": [
          "channel"
        ],
        "dimensions": [
          "page",
          "comment"
...
        ]
      }
    }

Like projections, it requires expressing the query granularity as a virtual column transformation of __time (though a bit more fragile than expressions since it requires the column be named __virtualGranularity right now, thinking about changing this to 'resolve' it like we do for projections, or something else, not sure yet). To support this, when using baseTable there is also a segmentGranularity field which splits out the segment granularity parts of the existing GranularitySpec. To be less disruptive for now, these can be computed into a GranularitySpec, but over time i'd like to migrate to this model of expressing the schema.

Includes the fix part o #19578 since this is the branch where I ran into that problem.

changes:

  • adds BaseTableProjectionSpec interface to capture the operator facing shape of V10 base table schemas
  • adds ClusteredValueGroupsBaseTableSchema implementation for ingesting clustered segments
  • adds DataSchema.baseTable, a BaseTableProjectionSpec which when set puts the DataSchema into a new mode where the majority of the schema is defined via the baseTable, rejecting other top level fields
  • adds DataSchema.segmentGranularity to use when baseTable is set, which captures the segment granularity and intervals (query granularity is defined by the baseTable)
  • adds AdaptedBaseTableProjectionSpec implementation for converting classic DataSchema fields to a BaseTableProjectionSpec
  • adds OnHeapClusteredBaseTable, OnHeapClusterGroup used by OnHeapIncrementalIndex to build clustered segments
  • adds IndexMergerV10.makeClusteredIndexFiles which merges and builds clustered v10 segments
  • Sink/BatchAppenderator/StreamAppenderator wiring for clustered segments so that the cluster group tuples appear on the DataSegment
  • known issues: unbounded, no aggregate projections, no compaction support, no time ordered cursor support

changes:
* adds `BaseTableProjectionSpec` interface to capture the operator facing shape of V10 base table schemas
* adds `ClusteredValueGroupsBaseTableSchema` implementation for ingesting clustered segments
* adds `DataSchema.baseTable`, a `BaseTableProjectionSpec` which when set puts the DataSchema into a new mode where the majority of the schema is defined via the baseTable, rejecting other top level fields
* adds `DataSchema.segmentGranularity` to use when `baseTable` is set, which captures the segment granularity and intervals (query granularity is defined by the baseTable)
* adds `AdaptedBaseTableProjectionSpec` implementation for converting classic `DataSchema` fields to a `BaseTableProjectionSpec`
* adds `OnHeapClusteredBaseTable`, `OnHeapClusterGroup` used by `OnHeapIncrementalIndex` to build clustered segments
* adds `IndexMergerV10.makeClusteredIndexFiles` which merges and builds clustered v10 segments
* `Sink`/`BatchAppenderator`/`StreamAppenderator` wiring for clustered segments so that the cluster group tuples appear on the `DataSegment`
* known issues: unbounded, no aggregate projections, no compaction support, no time ordered cursor support

@FrankChen021 FrankChen021 left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Severity Findings
P0 0
P1 0
P2 1
P3 0
Total 1

Reviewed 53 of 53 changed files.


This is an automated review by Codex GPT-5.5

this.metrics = metrics == null ? new AggregatorFactory[0] : metrics;
validateNoOverlap(this.clusteringColumns, this.nonClusteringDimensions);
this.dimensionsSpec = computeDimensionsSpec(this.clusteringColumns, this.nonClusteringDimensions);
this.ordering = declaredOrdering != null

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Declared ordering can be advertised without being honored

The spec accepts any declaredOrdering here, but the clustered write path does not use it to build the actual per-group row comparator: OnHeapClusterGroup derives ordering from timePosition/DimensionsSpec, while IndexMergerV10 emits groups in ascending tuple order and then stores firstSchema.getOrdering() in metadata. A spec such as [tenant ASC, page ASC, __time ASC], or any DESC order, can therefore be persisted with advertised ordering that the data does not actually satisfy. Query engines may trust CursorHolder/DataSegment ordering and skip sorting, producing misordered results. Please reject unsupported ordering or wire the declared order through ingestion and merge.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah this is fair, i should probably make it purely computed for now like AggregateProjectionSpec since its a bit of work to actually wire this up to honor it

{
final int numClusteringColumns = clusteringColumns.size();
return OnheapIncrementalIndex.ROUGH_OVERHEAD_PER_MAP_ENTRY
+ (long) numClusteringColumns * (Long.BYTES * 2 + Long.BYTES)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants