Revert transition-heavy stages to Spark row-based execution #4518

@karuppayya

What is the problem the feature request solves?

When Comet converts operators to native columnar execution but several operators in the stage cannot run natively (UDFs, SortAggregate, unsupported expressions), the stage accumulates multiple columnar-to-row (C2R) and row-to-columnar (R2C) transitions. With columnar shuffle enabled, each C2R has a corresponding R2C, so data repeatedly bounces between formats.

When many operators in a stage fall back, the conversion overhead of these round-trips can exceed the benefit of running the few remaining operators natively.

Currently Comet decides per operator whether to convert, with no awareness of how many transitions the stage as a whole accumulates. This proposal introduces a stage-aware fallback: evaluate the stage after all operators have been converted and transitions inserted, then revert the entire stage if the transition count exceeds a configurable threshold.
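
The stage-level check can be sketched on a toy plan tree. This is a minimal Python model, not Comet or Spark code: `Node`, `count_c2r`, `should_revert`, `chain`, and the `STAGE_BOUNDARIES` set are hypothetical names introduced for illustration, and treating `QueryStageExec` as the stage boundary is an assumption.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Node:
    """Toy stand-in for a physical plan node (not a Spark class)."""
    name: str
    children: List["Node"] = field(default_factory=list)


# Assumption: the current stage ends where the previous stage's output begins.
STAGE_BOUNDARIES = {"QueryStageExec"}


def count_c2r(node: Node) -> int:
    """Count ColumnarToRow nodes reachable without crossing a stage boundary."""
    if node.name in STAGE_BOUNDARIES:
        return 0
    own = 1 if node.name == "ColumnarToRow" else 0
    return own + sum(count_c2r(child) for child in node.children)


def should_revert(stage_root: Node, threshold: int) -> bool:
    """Revert only when the C2R count strictly exceeds the threshold."""
    return count_c2r(stage_root) > threshold


def chain(*names: str) -> Node:
    """Build a single-child plan, listed from the stage output down to its input."""
    root = Node(names[0])
    current = root
    for name in names[1:]:
        child = Node(name)
        current.children.append(child)
        current = child
    return root


# Hypothetical stage: two row-based operators, each reading through a ColumnarToRow.
stage = chain(
    "CometShuffleExchangeExec",
    "RowToColumnar",
    "SortAggregate",
    "ColumnarToRow",
    "CometHashAggregate",
    "RowToColumnar",
    "FilterExec",
    "ColumnarToRow",
    "QueryStageExec",
)

print(count_c2r(stage))         # 2
print(should_revert(stage, 1))  # True
print(should_revert(stage, 2))  # False
```

The point of the sketch is that the decision is made once per stage, on the fully converted plan, rather than operator by operator.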

Example of a stage with 3 C2R transitions:

 CometShuffleExchangeExec
    └── CometProject
          └── ColumnarToRow
                └── SortAggregate            (unsupported)
                      └── RowToColumnar
                            └── CometHashAggregate
                                  └── ColumnarToRow
                                        └── UDF Filter          (unsupported)
                                              └── RowToColumnar
                                                    └── CometHashAggregate
                                                          └── ColumnarToRow
                                                                └── SortAggregate  (unsupported)
                                                                      └── QueryStageExec

This stage pays 3 C2R + 3 R2C = 6 format conversions while only running 3 operators natively.

Describe the potential solution

Add a postColumnarTransitions rule that counts C2R transitions per stage and, if the count exceeds a configurable threshold, reverts the stage to Spark row-based execution using each CometExec.originalPlan. A single RowToColumnarExec is then inserted at the stage output to feed the columnar shuffle.

After revert:

  CometShuffleExchangeExec
    └── RowToColumnarExec
          └── ProjectExec
                └── SortAggregate
                      └── HashAggregateExec
                            └── FilterExec
                                  └── HashAggregateExec
                                        └── SortAggregate
                                              └── QueryStageExec
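
A minimal Python sketch of the revert step on a toy plan tree. It is not Comet or Spark code: `Node`, `to_row_plan`, `revert_stage`, and `plan_names` are hypothetical names, the `original` field is only a stand-in for `CometExec.originalPlan`, and the small stage at the bottom is made up for illustration.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Node:
    """Toy stand-in for a physical plan node (not a Spark class)."""
    name: str
    children: List["Node"] = field(default_factory=list)
    # Hypothetical stand-in for CometExec.originalPlan: the Spark operator name.
    original: Optional[str] = None


TRANSITIONS = {"ColumnarToRow", "RowToColumnar"}
STAGE_BOUNDARIES = {"QueryStageExec"}  # assumption: marks the stage input


def to_row_plan(node: Node) -> Node:
    """Drop transitions and swap native operators for their originals."""
    if node.name in STAGE_BOUNDARIES:
        return node  # leave the previous stage untouched
    if node.name in TRANSITIONS:
        return to_row_plan(node.children[0])  # transitions have one child
    return Node(node.original or node.name,
                [to_row_plan(child) for child in node.children])


def revert_stage(shuffle: Node) -> Node:
    """Keep the columnar shuffle and feed it through a single RowToColumnarExec."""
    row_child = to_row_plan(shuffle.children[0])
    return Node(shuffle.name, [Node("RowToColumnarExec", [row_child])])


def plan_names(node: Node) -> List[str]:
    """Flatten a single-child plan into operator names, stage output first."""
    names = []
    current: Optional[Node] = node
    while current is not None:
        names.append(current.name)
        current = current.children[0] if current.children else None
    return names


# Hypothetical stage, built from the innermost node outwards.
plan = Node("QueryStageExec")
plan = Node("FilterExec", [plan])        # unsupported, already row-based
plan = Node("RowToColumnar", [plan])
plan = Node("CometHashAggregate", [plan], original="HashAggregateExec")
plan = Node("ColumnarToRow", [plan])
plan = Node("SortAggregate", [plan])     # unsupported, already row-based
plan = Node("RowToColumnar", [plan])
plan = Node("CometProject", [plan], original="ProjectExec")
stage = Node("CometShuffleExchangeExec", [plan])

print(" -> ".join(plan_names(revert_stage(stage))))
```

In this model every transition inside the stage is dropped, because once all operators are row-based the only conversion still needed is the one feeding the columnar shuffle. The function builds a new tree and leaves the input plan unmodified.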

Additional context

This is conceptually similar to Spark's SimpleCostEvaluator in AQE, which uses the number of shuffle nodes as a cost signal to decide whether to accept a re-optimized plan. Here we use the number of C2R transitions as a cost signal to decide whether native execution is worthwhile for a given stage.

Labels: enhancement, priority:medium
