
[fix](be) Avoid mutating shared local shuffle columns#64090

Closed
eldenmoon wants to merge 14 commits into apache:master from eldenmoon:fe_local_shuffle_rebase3


@eldenmoon
Member

What problem does this PR solve?

Issue Number: None

Related PR: None

Problem Summary: Queries that evaluate complex VARIANT expressions after local exchange can share input blocks across downstream pipeline tasks. Variant casts and Variant serialization finalized the source column in-place, and nested-loop join lazy materialization also reused mutable probe-side columns while expanding rows. This could make local-shuffle query results unstable and could trigger lazy nested-loop join nullable-column size mismatches. This change finalizes private Variant copies for casts and serialization, deep-clones finalized Variant columns, and copies the lazy probe block before nested-loop join materialization appends rows.
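A minimal sketch of the aliasing hazard and the copy-before-finalize fix, in a toy model: `ToyVariantColumn`, `ColumnPtr`, and `cast_variant_to_strings` are hypothetical names, not Doris's `ColumnVariant` API. Two downstream tasks share one `std::shared_ptr` to the same column; the cast finalizes a deep-copied private column, so the shared input is never mutated.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for a Variant column (NOT Doris's ColumnVariant):
// finalize() rewrites the column's own storage in place.
struct ToyVariantColumn {
    std::vector<std::string> rows;
    bool finalized = false;

    void finalize() {
        for (auto& r : rows) r = "typed:" + r;
        finalized = true;
    }

    // Deep copy: the clone owns its own row storage.
    std::shared_ptr<ToyVariantColumn> clone() const {
        return std::make_shared<ToyVariantColumn>(*this);
    }
};

// Downstream pipeline tasks may hold the same immutable column pointer.
using ColumnPtr = std::shared_ptr<const ToyVariantColumn>;

// Fixed shape of a cast: finalize a private copy, never the shared input.
// (The buggy shape called finalize() on the shared column itself, so one
// task's cast changed the data other tasks were still reading.)
std::vector<std::string> cast_variant_to_strings(const ColumnPtr& src) {
    assert(src != nullptr);
    std::shared_ptr<ToyVariantColumn> priv = src->clone();
    if (!priv->finalized) {
        priv->finalize();
    }
    return priv->rows;
}
```

The nested-loop join part of the fix follows the same rule: copy the lazy probe block before materialization appends rows, instead of appending to columns another task may still reference.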

Release note

None

Check List (For Author)

  • Test:
    • ./run-be-ut.sh --run --filter=FunctionVariantCast.CastFromVariantDoesNotFinalizeSourceColumn:NLJAppendProbeDataWithNullTest.*:ColumnVariantTest.clone_finalized_deep_copies_columns
    • ./run-be-ut.sh --run --filter=ColumnVariantTest.serialize_does_not_finalize_source_column
    • ./build.sh --be
    • Manual test: local 4BE repro query for nullable nested-loop join lazy materialization, 8 workers x 80 loops, 0 failures and stable output hash
    • Manual test: local 4BE RQG-shaped Variant/local-shuffle query, local shuffle off baseline vs 50 local shuffle on runs, 0 mismatches
  • Behavior changed: No
  • Does this need documentation: No

924060929 and others added 14 commits May 21, 2026 17:42
Previously, local exchange (LE) nodes were inserted exclusively by the
BE's `_plan_local_exchange` at pipeline build time.  The FE had no
visibility into which operators needed a fan-out or shuffle before
execution, making it impossible to validate, optimize, or override LE
decisions at planning time.

This PR introduces a full FE-side local exchange planner that mirrors
BE semantics, brings several correctness fixes, and leaves the legacy
BE path fully intact behind a feature flag.  See "Current architecture
notes" at the bottom for what the FE planner does and does not own.

A new `AddLocalExchange` pass runs after normal fragment assignment.
It walks each fragment's plan tree bottom-up, calling the polymorphic
`PlanNode.enforceAndDeriveLocalExchange()` on every node.  Nodes
declare what distribution they require of their children; the framework
inserts `LocalExchangeNode` where needed.
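The bottom-up require/derive contract can be sketched as follows. This is a simplified model under stated assumptions: a single `Dist` enum stands in for the full set of exchange types, and `Node`, `make_local_exchange`, and `enforce_and_derive` are hypothetical C++ stand-ins for the Java FE classes.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, heavily simplified model of the pass (not the FE classes).
enum class Dist { ANY, PASSTHROUGH, HASH };

struct Node {
    std::string name;
    Dist required_from_children = Dist::ANY;  // what this node needs below it
    Dist produces = Dist::ANY;                // what this node emits upward
    std::vector<std::shared_ptr<Node>> children;
};
using NodePtr = std::shared_ptr<Node>;

NodePtr make_local_exchange(Dist type, NodePtr child) {
    auto le = std::make_shared<Node>();
    le->name = "LocalExchange";
    le->produces = type;
    le->children.push_back(std::move(child));
    return le;
}

// Bottom-up: plan each child subtree first, then insert a LocalExchange
// between this node and any child whose output does not meet the requirement.
Dist enforce_and_derive(const NodePtr& node) {
    for (NodePtr& child : node->children) {
        Dist got = enforce_and_derive(child);
        Dist need = node->required_from_children;
        if (need != Dist::ANY && got != need) {
            child = make_local_exchange(need, child);
        }
    }
    return node->produces;
}
```

In this model a scan feeding an agg that requires `HASH` ends up as `Agg <- LocalExchange(HASH) <- Scan`.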

`LocalExchangeNode` represents intra-fragment data redistribution and
supports the full set of exchange types: PASSTHROUGH, HASH_SHUFFLE,
BUCKET_HASH_SHUFFLE, GLOBAL_EXECUTION_HASH_SHUFFLE, BROADCAST,
PASS_TO_ONE, ADAPTIVE_PASSTHROUGH, LOCAL_MERGE_SORT, and NOOP.

The pass is guarded by `enable_local_shuffle_planner` (default true).
When disabled, BE continues to run its own `_plan_local_exchange` as
before, keeping the old path fully intact.

`maxPerBeInstances` (max pipeline instances assigned to any single BE)
is used instead of a global `instanceCount`.  Planning is a no-op when
`maxPerBeInstances == 1` — inserting LE on a single-threaded pipeline
would cause task-count mismatches and pipeline starvation.
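A sketch of why the per-BE maximum, rather than a global count, is the right gate: local exchange only moves rows between tasks on one BE. The helper names are hypothetical, and instances are identified only by their BE host for illustration.

```cpp
#include <algorithm>
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical helper: given the BE host of every pipeline instance in a
// fragment, return the largest number of instances placed on one BE.
int max_per_be_instances(const std::vector<std::string>& instance_hosts) {
    std::map<std::string, int> per_be;
    int best = 0;
    for (const auto& host : instance_hosts) {
        best = std::max(best, ++per_be[host]);
    }
    return best;
}

// Local exchange redistributes rows only between tasks on the same BE, so
// planning is skipped when no BE runs more than one instance.
bool should_plan_local_exchange(const std::vector<std::string>& instance_hosts) {
    return max_per_be_instances(instance_hosts) > 1;
}
```

Three instances spread over three BEs give a global count of 3 but a per-BE maximum of 1, so no local exchange is planned.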

When a serial operator (e.g. OlapScanNode with a single tablet bucket)
feeds a non-serial parent without an intermediate LE, downstream tasks
starve waiting for data that never arrives.  The framework detects this
case and inserts a PASSTHROUGH LE to restore N-task parallelism, exactly
matching BE's `required_data_distribution()` serial → PASSTHROUGH rule.
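The fan-out rule reduces to a small predicate. A hypothetical sketch (`Op` and `fan_out_for` are illustrative names, and serial-ness is taken as a precomputed flag):

```cpp
#include <cassert>
#include <optional>

enum class LocalExchangeType { PASSTHROUGH, LOCAL_EXECUTION_HASH_SHUFFLE };

// Hypothetical operator model: only the serial flag matters here.
struct Op {
    bool is_serial;
};

// A serial child runs as a single task; a non-serial parent runs N tasks,
// so without a fan-out the other N-1 parent tasks never receive data.
std::optional<LocalExchangeType> fan_out_for(const Op& parent, const Op& child,
                                             bool child_already_has_le) {
    if (child.is_serial && !parent.is_serial && !child_already_has_le) {
        return LocalExchangeType::PASSTHROUGH;  // restore N-task parallelism
    }
    return std::nullopt;  // nothing to insert
}
```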

`LocalExchangeTypeRequire` abstracts two strategies:
- `RequireHash` — always resolves to `LOCAL_EXECUTION_HASH_SHUFFLE`
  (safe for intra-fragment hash partitioning).
- `RequireSpecific` — preserves BUCKET_HASH_SHUFFLE /
  GLOBAL_EXECUTION_HASH_SHUFFLE without degradation.
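One way to picture the two strategies, assuming that any hash-partitioned child output already satisfies `RequireHash` while `RequireSpecific` accepts only the exact type it names. The types below are hypothetical C++ mirrors of the FE classes, and `satisfied_by` is an illustrative helper, not an FE method.

```cpp
#include <cassert>

enum class ExchangeType {
    LOCAL_EXECUTION_HASH_SHUFFLE,
    BUCKET_HASH_SHUFFLE,
    GLOBAL_EXECUTION_HASH_SHUFFLE,
    PASSTHROUGH,
};

bool is_hash_shuffle(ExchangeType t) {
    return t == ExchangeType::LOCAL_EXECUTION_HASH_SHUFFLE ||
           t == ExchangeType::BUCKET_HASH_SHUFFLE ||
           t == ExchangeType::GLOBAL_EXECUTION_HASH_SHUFFLE;
}

struct RequireHash {
    // Assumption: any hash-partitioned child output already satisfies it.
    bool satisfied_by(ExchangeType child) const { return is_hash_shuffle(child); }
    // When an LE must be inserted, it is always the intra-fragment hash.
    ExchangeType resolve() const { return ExchangeType::LOCAL_EXECUTION_HASH_SHUFFLE; }
};

struct RequireSpecific {
    ExchangeType wanted;
    bool satisfied_by(ExchangeType child) const { return child == wanted; }
    ExchangeType resolve() const { return wanted; }  // never degraded
};
```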

PR apache#62438 added `enable_local_exchange_before_agg`, but its BE guard
`!_needs_finalize && !enable_local_exchange_before_agg → base` conflated
two semantically different cases in AggSink and DistinctStreamingAgg:

- **AggSink**: `!finalize && hasKeys` covered both LOCAL preagg
  (performance-only) and FIRST_MERGE dedup (correctness-critical).
  The flag-gated early-return wrongly skipped HASH for FIRST_MERGE,
  producing PASSTHROUGH-over-serial-child → wrong aggregation results.

- **DistinctStreamingAgg**: `!finalize` covered both streaming preagg
  (`useStreamingPreagg=true`, performance) and non-streaming dedup
  (`useStreamingPreagg=false`, correctness).  Same class of bug.

FE fix:
- AggSink: restrict the flag-gated base path to `!isMerge()` LOCAL
  phases.  FIRST_MERGE always emits HASH regardless of the flag.
- DistinctStreamingAgg: restrict to `useStreamingPreagg=true`.
  Non-streaming dedup always emits HASH.
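The fixed decision for a non-finalizing aggregation with grouping keys, condensed into two hypothetical functions (`agg_sink_require`, `distinct_streaming_agg_require`). `Require::BASE` is an illustrative name for "use the child's distribution".

```cpp
#include <cassert>

// Only the phases of a non-finalizing agg are modeled here.
enum class AggPhase { LOCAL, FIRST_MERGE };
enum class Require { BASE, HASH };

// AggSink: only the performance-only LOCAL pre-aggregation may fall back to
// the base distribution when enable_local_exchange_before_agg is off.
Require agg_sink_require(AggPhase phase, bool enable_le_before_agg) {
    bool is_merge = phase != AggPhase::LOCAL;
    if (!is_merge && !enable_le_before_agg) {
        return Require::BASE;
    }
    return Require::HASH;  // FIRST_MERGE dedup is correctness-critical
}

// DistinctStreamingAgg: only streaming pre-aggregation may skip the HASH LE.
Require distinct_streaming_agg_require(bool use_streaming_preagg,
                                       bool enable_le_before_agg) {
    if (use_streaming_preagg && !enable_le_before_agg) {
        return Require::BASE;
    }
    return Require::HASH;  // non-streaming dedup always hashes
}
```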

Also add `requiresShuffleForCorrectness()` to mirror BE's
`is_shuffled_operator()`, so SetOperationNode propagates the
"downstream depends on hash" flag correctly instead of using the
coarser `parentRequire.preferType().isHashShuffle()` check that
over-inserted HASH LE on every union branch under a streaming preagg.

These fixes reduce FE/BE consistency mismatches from 8 to 3
(only pre-existing NLJ optimization differences remain).

- `enable_local_shuffle_planner` — use FE planner (default true)
- `enable_local_shuffle` — master switch for local shuffle
- `enable_local_exchange_before_agg` — HASH LE before non-final agg
  (default true, mirrors apache#62438)

`validateNoSerialWithoutLocalExchange()` walks the final plan tree and
logs a warning whenever a serial operator feeds a non-serial parent
without an intermediate LocalExchangeNode, catching planning gaps
before execution.
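A sketch of the validation walk over a hypothetical `PlanNode` model. It collects offending parent/child edges instead of logging, so the result is easy to assert on; treating a LocalExchange parent as the required intermediate is an assumption of the sketch.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <vector>

// Hypothetical plan-node model for the validation walk.
struct PlanNode {
    std::string name;
    bool serial = false;
    bool is_local_exchange = false;
    std::vector<std::shared_ptr<PlanNode>> children;
};

// Records "parent <- child" for every serial child that feeds a non-serial
// parent directly (i.e. with no LocalExchange in between), then recurses.
void find_serial_gaps(const PlanNode& node, std::vector<std::string>& out) {
    for (const auto& child : node.children) {
        if (child->serial && !node.serial && !node.is_local_exchange) {
            out.push_back(node.name + " <- " + child->name);
        }
        find_serial_gaps(*child, out);
    }
}
```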

- `test_enable_local_exchange_before_agg.groovy` — 10 agg patterns
  with the flag on and off; covers the FIRST_MERGE and
  DistinctStreamingAgg correctness fixes.
- `test_local_shuffle_fe_be_consistency.groovy` — runs the same SQL
  with `enable_local_shuffle_planner=true` and `=false` across the
  full operator matrix (Agg, Sort, Analytic, HashJoin, NLJ, Set, Union,
  TableFunction, AssertNumRows, RQG-derived corner cases) and asserts
  result rows are identical.  Only data correctness is asserted — the
  two planners legitimately differ on the exact exchange counts/types
  they emit, so plan-shape equality is intentionally not checked.
- `test_local_shuffle_rqg_bugs.groovy` — reproduces 20+ RQG-found
  crashes and wrong-result cases.
- `test_old_coordinator_local_shuffle.groovy` — verifies the old
  coordinator path is unaffected.
- `test_multilevel_join_agg_local_shuffle.groovy` — multi-level join
  and aggregation plan shapes.

- `multi_version.h`: replace `atomic_load/atomic_store` (deprecated in
  libstdc++ C++20 / LLVM 20) with `std::shared_mutex`-based RW locking.
- `memory.cpp`: fix `std::max` type mismatch (`long` vs `int64_t`)
  on macOS.
- `bucketed_aggregation_sink_operator.h`: fix `ExchangeType::NOOP` →
  `TLocalPartitionType::NOOP` after thrift enum rename.
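For the `multi_version.h` change, a minimal copy-on-write holder guarded by `std::shared_mutex` could look like the following. `MultiVersionSketch` is a hypothetical name, not the actual class in that header: readers copy the current `shared_ptr` under a shared lock, and writers swap it under an exclusive lock.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <utility>

template <typename T>
class MultiVersionSketch {
public:
    explicit MultiVersionSketch(std::shared_ptr<const T> initial)
            : _current(std::move(initial)) {}

    // Readers hold a shared lock only long enough to copy the pointer; the
    // returned snapshot stays valid after later updates.
    std::shared_ptr<const T> get() const {
        std::shared_lock<std::shared_mutex> lock(_mutex);
        return _current;
    }

    // Writers publish a new version under the exclusive lock. After the
    // swap, `next` holds the previous version, which is released only after
    // the lock has been dropped.
    void set(std::shared_ptr<const T> next) {
        std::unique_lock<std::shared_mutex> lock(_mutex);
        _current.swap(next);
    }

private:
    mutable std::shared_mutex _mutex;
    std::shared_ptr<const T> _current;
};
```

Swapping instead of assigning means a writer never runs an old version's destructor while holding the exclusive lock.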

This PR puts the FE planner in the driver's seat for LE insertion but
intentionally does NOT remove the BE-side machinery — readers should be
aware of three pieces the FE planner shares with or defers to BE:

1. **`is_serial_operator` is computed on both sides.**  FE computes the
   flag and writes it into Thrift, but BE's
   `OperatorBase::is_serial_operator()` is still overridden per operator
   in C++ and used for BE-side runtime decisions.  Any future change to
   the BE override needs to be mirrored on the FE side (and vice versa)
   to keep the planner's view consistent with execution.

2. **The legacy BE planner stays as a fallback.**
   `pipeline_fragment_context.cpp::_plan_local_exchange` is preserved
   and gated by `runtime_state.h::plan_local_shuffle()`: when
   `enable_local_shuffle_planner=false`, BE plans LE itself, exactly as
   before.  The two paths are mutually exclusive, never both running on
   the same query.

3. **`_propagate_local_exchange_num_tasks` is kept as a runtime safety
   net.**  The two propagation passes in
   `pipeline_fragment_context.cpp` fix up paired pipelines whose
   `num_tasks` end up mismatched (e.g. when AGG/SORT/JOIN pipeline
   splits leave a serial Exchange feeding an N-task sink).  FE's
   framework-level serial→non-serial fan-out (`enforceRequire` step 3)
   and the `validateNoSerialWithoutLocalExchange` check aim to make
   these mismatches impossible by construction, but the BE-side fixup
   remains as a defensive guard.

Co-authored-by: Gabriel <liwenqiang@selectdb.com>
… DSL

- Delete duplicate testAggFromScanUsesLocalExecutionHashShuffle
- Rewrite 8 substring-based tests as DSL shape assertions
- Add testUnionAllScanAndValues (Tier B from Trino)
- Add assertNoLocalExchangeOfType helper for negative checks
- Add nestedLoopJoin/partitionSort/olapScan() factories to PlanShape(Dsl)
…_multilevel_join_agg_local_shuffle

The local-shuffle FE planner inserts LocalExchange nodes after the Nereids
physical plan stage, so `explain shape plan` output is independent of
enable_local_shuffle_planner.  When the on/off variants did differ, the
diff was always driven by stats-sensitive rewrites (e.g. cost-based
InferSetOperatorDistinct), not by the planner mode itself — meaning the
shape check was effectively asserting stats stability across environments,
which it cannot guarantee.

Keep result-equality (qt_*_result_on / _result_off) and check_sql_equal
between planner modes; drop the shape assertions and the 72 _shape_on
blocks from the .out file.
…local-shuffle planner

FE-side local-shuffle planner wraps a serial fragment root with a PASSTHROUGH
LocalExchangeNode (AddLocalExchange#addLocalExchangeForFragment) so the data
sink can fan out across pipeline tasks.  That replaces `fragment.getPlanRoot()`
with a LocalExchangeNode wherever the original root was a serial FileScanNode.

InsertIntoTableCommand#applyInsertPlanStatistic was using
`fragment.getPlanRoot() instanceof FileScanNode` to find load-source scans, so
after the LE wrap the instanceof check fails, addLoadFileInfo is never called,
and LoadStatistic.fileNum / totalFileSizeB stay 0 even though the BE-side
scannedRows / loadBytes counters work normally.

Symptom: job_p0.streaming_job.test_streaming_insert_job fails with
  loadStat.fileNumber == 0 (expected 2) and loadStat.fileSize == 0 (expected 256)
while scannedRows / loadBytes are correct.

Fix: peel any LocalExchangeNode wrappers off the fragment root before the
instanceof check, then extract fileNum / totalFileSize from the underlying
FileScanNode as before.
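The peel-then-check logic, sketched with a hypothetical C++ class hierarchy standing in for the FE plan nodes (`dynamic_cast` plays the role of Java's `instanceof`; `find_load_scan` is an illustrative name):

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-ins for FE plan nodes; one child is enough here.
struct PlanNode {
    virtual ~PlanNode() = default;
    std::shared_ptr<PlanNode> child;
};
struct LocalExchangeNode : PlanNode {};
struct FileScanNode : PlanNode {
    int file_num = 0;
};

// Strip any chain of LocalExchangeNode wrappers, then test the type.
// Returns nullptr when the unwrapped root is not a FileScanNode.
const FileScanNode* find_load_scan(const std::shared_ptr<PlanNode>& root) {
    const PlanNode* node = root.get();
    while (auto le = dynamic_cast<const LocalExchangeNode*>(node)) {
        node = le->child.get();
    }
    return dynamic_cast<const FileScanNode*>(node);
}
```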

Verified locally: INSERT INTO t SELECT * FROM LOCAL(...) shows FileNumber=1
FileSize=12 with the fix, FileNumber=0 FileSize=0 without it.
…rk it serial (DORIS-25865)

The FE local-shuffle planner used to insert a LocalExchangeNode directly
under RecursiveCteNode, which broke two RecursiveCte invariants:

1. ThriftPlansBuilder locates the recursive sender fragment via
   `recursiveCteNode.getChild(1).getChild(0).getFragment()`.  A wrapper LE
   between RecCte and the cross-fragment ExchangeNode shifts that path off
   the receiver and pulls the recursive producer fragment into
   `fragmentsToReset`, so BE rejects with
     [INTERNAL_ERROR]Fragment N contains a recursive CTE node
   from RecCTESourceOperatorX::prepare().

2. BE's RecCTESourceOperatorX::is_serial_operator() always returns true.
   RecursiveCteNode#isSerialNode() on the FE side defaulted to false, so
   the planner left the producer fragment with parallel=N sender pipelines
   even though only one instance actually emits data.  The downstream
   cross-fragment Exchange then waits forever on the N-1 silent senders.

Fix in RecursiveCteNode:
  - override isSerialNode() to return true so addLocalExchangeForFragment
    wraps the fragment root with PASSTHROUGH LE and fans the single
    producer out to N parallel sinks (mirrors BE-native behaviour);
  - override enforceAndDeriveLocalExchange to call children's own
    enforceAndDeriveLocalExchange directly, bypassing the framework's
    enforceRequire so no LE gets inserted between RecCte and its
    cross-fragment Exchange children — children's subtrees still get LE
    planning as normal.

Add regression test test_local_shuffle_recursive_cte covering the three
downstream consumer shapes the JIRA listed plus join / negative control:
  rec_cte_agg, rec_cte_window, rec_cte_grouping_sets, rec_cte_select,
  rec_cte_join.  Each is asserted to produce identical rows under
  enable_local_shuffle_planner=true vs =false.
Two regression checks were comparing actual rows against .out in a
specific order even though the SQL was order-insensitive at that point.
The FE local-shuffle planner can legitimately change row delivery order
within a fragment, exposing the latent assumption and failing the test.

- unnest_order_by_list_test.groovy: qt_window_function_order_by_unnested_value
  has a window RANK() over UNNEST without an outer ORDER BY.  Switch to
  order_qt_* (framework sorts the actual rows before comparing) and re-sort
  the corresponding .out block so the expected side is also in sorted order.

- test_python_udaf_complex.groovy: qt_json_array_agg → order_qt_json_array_agg.
  The query GROUP BY category already returns rows in alphabetical category
  order, so no .out change is needed.  Note: this only stabilises row order;
  the python UDAF's per-group array contents still depend on row arrival
  order inside each group, so a stricter pin (ORDER BY id in a subquery or
  array_sort around the agg) would still be needed if that variability
  resurfaces.
…E-planned LE

Two correctness issues in the FE-planned local-shuffle path, both surfaced
by single-tablet POOLING / share-scan fragments.

1. FE planner inserted LE(LOCAL_HASH) below a streaming partial agg with
   distributeExprLists = child table distribution (e.g. [id]) instead of
   grouping_exprs (e.g. [category]).  BE's AggSinkOperatorX /
   StreamingAggOperatorX::update_operator picks _partition_exprs =
   grouping_exprs when the chain is not followed_by_shuffled_operator —
   the common case for a streaming preagg at fragment root with only a
   cross-fragment HASH ExchangeSink above.  Using child distribution
   scattered same-group rows across N partial-agg instances, turning the
   preagg into a no-op and breaking row-arrival order at the downstream
   merge-finalize (manifests as non-deterministic group_concat /
   py_json_array_agg output, e.g. test_python_udaf_complex json_array_agg).

   Fix: add overridable PlanNode#getLocalExchangeDistributeExprs(childIndex,
   followedByShuffled) defaulting to the child's distribution, and override
   it on AggregationNode to mirror BE's update_operator: use child
   distribution only when (followedByShuffled || hasDistinct); otherwise
   use grouping_exprs.

2. BE _create_deferred_local_exchangers used sender_count =
   upstream_pipe->num_tasks() with no max(_, _num_instances).  When the
   upstream pipeline has a serial source (POOLING OlapScan, serial
   Exchange), num_tasks() stays at 1 and _propagate_local_exchange_num_tasks
   Pass 1 deliberately does not raise it, but the shared exchanger is
   shared across all _num_instances fragment instances on this BE — each
   instance closes once, so total close-count = _num_instances.  Initial 1
   minus _num_instances closes drove _running_sink_operators negative
   (e.g. -5 for 6 instances, -15 for 16), so the exchanger never reached
   "all senders done", downstream sources blocked on
   SHUFFLE_DATA_DEPENDENCY forever, and the query hung.  Fragments hold
   block references through the hang; on BE shutdown
   mem_tracker_limiter::~MemTrackerLimiter fired FATAL, aborting BE and
   producing the build-948971 "stop grace fail". The triggering case was
   dictionary_p0.test_dict_load_and_get_ip_trie's dictionary refresh, which
   runs scan + LE(PASSTHROUGH) + cross-fragment DICTIONARY_SINK.

   Fix: mirror BE-planned _add_local_exchange_impl (~line 1023) which uses
   std::max(cur_pipe->num_tasks(), _num_instances).
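Fix 1 above amounts to a small choice of hash keys. A hypothetical sketch of the `AggregationNode` override's logic, with expressions reduced to strings (`local_exchange_distribute_exprs` is an illustrative name):

```cpp
#include <cassert>
#include <string>
#include <vector>

using Exprs = std::vector<std::string>;

// Which exprs the LE below a partial agg should hash on.
Exprs local_exchange_distribute_exprs(const Exprs& child_distribution,
                                      const Exprs& grouping_exprs,
                                      bool followed_by_shuffled,
                                      bool has_distinct) {
    if (followed_by_shuffled || has_distinct) {
        return child_distribution;
    }
    return grouping_exprs;  // keep same-group rows on the same instance
}
```

With the `LocalExchangePlannerTest` setup (`DISTRIBUTED BY HASH(k1)`, `GROUP BY k2`), the streaming partial agg is not followed by a shuffled operator and has no distinct, so the LE hashes on `[k2]`.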

Tests
- New LocalExchangePlannerTest#testStreamingAggHashShuffleUsesGroupingExprs:
  with t1 DISTRIBUTED BY HASH(k1) and SELECT k2, count(*) GROUP BY k2
  (k2 non-bucket -> two-phase agg), asserts the LE below the streaming
  partial agg carries [k2] (grouping key) not [k1] (child distribution).
  Verified failing pre-fix, passing post-fix.  Whole class (26 tests) green.
- Local cluster (output_test, 29030): group_concat probe stable 1,2,3,4,5
  across 20 runs after both fixes; matches BE-planner=false output.
The previous commit ef4ea66 raised _create_deferred_local_exchangers'
sender_count to std::max(upstream_pipe->num_tasks(), _num_instances) on
the theory that shared_state is shared across all _num_instances fragment
instances and each contributes one close.

That breaks the inverse case: when upstream_pipe legitimately has 1 task
(e.g. PASSTHROUGH below a serial scan inside a single fragment instance),
only 1 close actually happens, so the raised initial count never reaches
zero — downstream sources block on SHUFFLE_DATA_DEPENDENCY forever.
Surfaced by mtmv_up_down_job_p0.load.test_upgrade_downgrade_prepare_job_mtmv
(MV refresh hung at RUNNING in build 949402): plan dump shows LE id=5
PASSTHROUGH with _num_senders=6, _running_sink_operators=5 — only one
close arrived, exchanger never finalized.

Restore the original sender_count = info.upstream_pipe->num_tasks().
The dictionary-refresh "stop grace fail" diagnosis from the previous
commit was wrong about the close-count math; that hang has a different
root cause and needs separate investigation.
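The close-count arithmetic behind these hangs (6 senders with 1 close leaves 5 outstanding; 1 sender with 7 closes reaches -6) can be modeled with a toy counter. `ExchangerSketch` and `simulate` are hypothetical; the point is only that `sender_count` must equal the number of `close()` calls that actually arrive.

```cpp
#include <cassert>

// Hypothetical model of the exchanger's sender bookkeeping: the counter
// starts at sender_count, each sink close() decrements it, and downstream
// sources are woken only when it reaches exactly zero.
class ExchangerSketch {
public:
    explicit ExchangerSketch(int sender_count) : _running_sink_operators(sender_count) {}
    void close_one_sink() { --_running_sink_operators; }
    int running() const { return _running_sink_operators; }
    bool all_senders_done() const { return _running_sink_operators == 0; }

private:
    int _running_sink_operators;
};

// Applies `closes` sink closes to an exchanger sized for `sender_count`.
ExchangerSketch simulate(int sender_count, int closes) {
    assert(sender_count >= 0 && closes >= 0);
    ExchangerSketch ex(sender_count);
    for (int i = 0; i < closes; ++i) {
        ex.close_one_sink();
    }
    return ex;
}
```

Over-sizing and under-sizing fail the same way: the counter never lands on zero, so the source side never sees "all senders done".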
…mistakes

Two recent attempts to "fix" _create_deferred_local_exchangers' sender_count
have regressed CI:

  - ef4ea66 changed it to std::max(num_tasks, _num_instances) reasoning
    about build 948971's _running_sink_operators=-5 hang.  That broke MTMV
    refresh in build 949402 (LE id=5 PASSTHROUGH, _num_senders=6, only 1
    close arrived, hang).  Reverted in c848ecf.

Add a multi-line DANGER ZONE comment right above the assignment that
documents the formula, the two failed alternatives with the case names
that regressed, why the BE-planned _add_local_exchange_impl analog is
not actually analogous, and where to look first if a future hang has
_running_sink_operators < 0.

No code change — the comment is the patch.
…Plans

Dictionary refresh hangs because the FE-side planning inserted LE nodes
(via PhysicalDictionarySink special case in NereidsPlanner.distribute),
but EnvFactory.createCoordinator dispatched to legacy Coordinator, which
forces enable_local_shuffle_planner=false at initQueryOptions. BE then
runs its own _plan_local_exchange on a fragment that already has FE-
planned LEs, doubling pipeline 1 num_tasks (1 -> 7) and adding a new
pipeline. The FE-init exchanger was sized for sender_count=1, but 7 sink
LocalStates close it; _running_sink_operators goes negative, source side
short-circuits, dependent pipeline tasks never wake up, fragment never
finishes, and the dict stays in LOADING forever.

The mismatch is between two checks for "did FE plan distribute":
- Plan time (NereidsPlanner.distribute, line 704):
  SessionVariable.canUseNereidsDistributePlanner() OR PhysicalDictionarySink
  -> true for dict refresh -> addLocalExchange runs -> LE inserted
- Dispatch time (EnvFactory.createCoordinator, line 148/156):
  canUseNereidsDistributePlanner(context), which returns false when
  statementContext.parsedStatement is not a LogicalPlanAdapter — and dict
  refresh constructs StmtExecutor with a raw SQL string, leaving
  parsedStatement null -> dispatched to legacy Coordinator

Replace the dispatch-side session-var check with a direct read of
NereidsPlanner.getDistributedPlans(): non-empty iff FE actually ran
distribute. This aligns dispatch with plan-time reality without depending
on parsedStatement state, session var values, or sink type.
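The dispatch rule, reduced to a hypothetical function: the coordinator is chosen from what planning actually produced, so the plan-time and dispatch-time views cannot disagree. `CoordinatorKind` and `choose_coordinator` are illustrative names, and the element type of the plan list is left generic.

```cpp
#include <cassert>
#include <vector>

enum class CoordinatorKind { DISTRIBUTED_PLAN, LEGACY };

// Non-empty distributed plans <=> FE ran distribute (and may have inserted
// LEs), so only then may the query skip the legacy path, whose BE side
// would run _plan_local_exchange again.
template <typename DistributedPlan>
CoordinatorKind choose_coordinator(const std::vector<DistributedPlan>& distributed_plans) {
    return distributed_plans.empty() ? CoordinatorKind::LEGACY
                                     : CoordinatorKind::DISTRIBUTED_PLAN;
}
```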

Apply the same fix to CloudEnvFactory.

Verified locally: test_dict_load_and_get_ip_trie was hanging at the first
refresh and reproducing the _running_sink_operators=-6 pattern; with this
fix the test passes and dict refresh correctly reports the expected
duplicate-CIDR error.
…epeat

The rollup REPEAT is not distribution-preserving (it NULLs grouping columns per
set and produces GROUPING_ID, which the downstream agg hashes on but which does
not exist below the repeat). Forwarding the parent HASH require down pushed the
local exchange before the 8x row expansion and hashed by the child's single
upstream shuffle key instead of the agg's grouping_exprs, collapsing rows onto
one instance (tpcds q67: +73%, 65s -> 113s).

Recurse with noRequire so the parent inserts its hash local exchange above the
repeat using its own grouping_exprs, mirroring BE whose RepeatOperatorX returns
a NOOP required_data_distribution.
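A hypothetical sketch of the fixed RepeatNode contract in a three-value model (`Require`, `Dist`, `plan_repeat`, and `agg_inserts_hash_le_above_repeat` are illustrative names): Repeat asks nothing of its child and reports the child's distribution, and the agg above decides whether to add its own hash LE over grouping_exprs.

```cpp
#include <cassert>

enum class Require { NO_REQUIRE, HASH };
enum class Dist { NOOP, BUCKET_HASH_SHUFFLE, LOCAL_EXECUTION_HASH_SHUFFLE };

struct RepeatPlan {
    Require passed_to_child;  // what Repeat asks of its child
    Dist output;              // what Repeat reports upward
};

// After the fix, Repeat ignores the parent's HASH require (its output adds
// GROUPING_ID and NULLs grouping columns per set) and reports whatever
// distribution its child already has.
RepeatPlan plan_repeat(Require /*parent_require*/, Dist child_output) {
    return RepeatPlan{Require::NO_REQUIRE, child_output};
}

// In this three-value model, the agg inserts its own hash LE above the
// repeat only when the reported output carries no hash distribution.
bool agg_inserts_hash_le_above_repeat(Dist repeat_output) {
    return repeat_output == Dist::NOOP;
}
```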
…ibution behaviors

Add a PlanShapeDsl repeat() factory and two LocalExchangePlannerTest cases pinning
the two halves of the RepeatNode local-exchange fix:
- testRepeatNoRequireKeepsHashLocalExchangeAboveRepeat: with a pooling scan upstream,
  the hash LE is placed ABOVE the Repeat (Agg <- LE(LOCAL_HASH) <- LE(PT) <- Repeat
  <- scan). repeat() pins the Repeat position so the buggy below-repeat placement
  would fail the assertion.
- testRepeatReturnsChildDistributionSkipsRedundantHash: with a non-pooling colocate
  scan, the child BUCKET_HASH distribution propagates through the Repeat and satisfies
  the agg requirement, so no redundant LOCAL_HASH is inserted (returning NOOP would
  force one).
…ode for noRequire fix

The RepeatNode fix (stop forwarding the parent HASH require) changed this unit
test's expected behavior: RepeatNode now passes noRequire to its child (no hash LE
below the Repeat) and reports the child's own distribution (NOOP) instead of forcing
LOCAL_EXECUTION_HASH_SHUFFLE.  Assert the child receives NoRequire, the output type
is NOOP, and no LocalExchange is inserted below the Repeat.
@hello-stephen
Contributor

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (it's best to include specific error reporting information). How it was fixed.
  2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be.
  3. What features were added. Why was this function added?
  4. Which code was refactored and why was this part of the code refactored?
  5. Which functions were optimized and what is the difference before and after the optimization?

@eldenmoon
Member Author

Superseded by #64092, which contains only the clean fix commit.

@eldenmoon eldenmoon closed this Jun 3, 2026