[fix](be) Avoid mutating shared local shuffle columns#64090
Closed
eldenmoon wants to merge 14 commits into
Closed
Conversation
Previously, local exchange (LE) nodes were inserted exclusively by the BE's `_plan_local_exchange` at pipeline build time. The FE had no visibility into which operators needed a fan-out or shuffle before execution, making it impossible to validate, optimize, or override LE decisions at planning time. This PR introduces a full FE-side local exchange planner that mirrors BE semantics, brings several correctness fixes, and leaves the legacy BE path fully intact behind a feature flag. See "Current architecture notes" at the bottom for what the FE planner does and does not own. A new `AddLocalExchange` pass runs after normal fragment assignment. It walks each fragment's plan tree bottom-up, calling the polymorphic `PlanNode.enforceAndDeriveLocalExchange()` on every node. Nodes declare what distribution they require of their children; the framework inserts `LocalExchangeNode` where needed. `LocalExchangeNode` represents intra-fragment data redistribution and supports the full set of exchange types: PASSTHROUGH, HASH_SHUFFLE, BUCKET_HASH_SHUFFLE, GLOBAL_EXECUTION_HASH_SHUFFLE, BROADCAST, PASS_TO_ONE, ADAPTIVE_PASSTHROUGH, LOCAL_MERGE_SORT, and NOOP. The pass is guarded by `enable_local_shuffle_planner` (default true). When disabled, BE continues to run its own `_plan_local_exchange` as before, keeping the old path fully intact. `maxPerBeInstances` (max pipeline instances assigned to any single BE) is used instead of a global `instanceCount`. Planning is a no-op when `maxPerBeInstances == 1` — inserting LE on a single-threaded pipeline would cause task-count mismatches and pipeline starvation. When a serial operator (e.g. OlapScanNode with a single tablet bucket) feeds a non-serial parent without an intermediate LE, downstream tasks starve waiting for data that never arrives. The framework detects this case and inserts a PASSTHROUGH LE to restore N-task parallelism, exactly matching BE's `required_data_distribution()` serial → PASSTHROUGH rule. `LocalExchangeTypeRequire` abstracts two strategies: - `RequireHash` — always resolves to `LOCAL_EXECUTION_HASH_SHUFFLE` (safe for intra-fragment hash partitioning). - `RequireSpecific` — preserves BUCKET_HASH_SHUFFLE / GLOBAL_EXECUTION_HASH_SHUFFLE without degradation. PR apache#62438 added `enable_local_exchange_before_agg`, but its BE guard `!_needs_finalize && !enable_local_exchange_before_agg → base` conflated two semantically different cases in AggSink and DistinctStreamingAgg: - **AggSink**: `!finalize && hasKeys` covered both LOCAL preagg (performance-only) and FIRST_MERGE dedup (correctness-critical). The flag-gated early-return wrongly skipped HASH for FIRST_MERGE, producing PASSTHROUGH-over-serial-child → wrong aggregation results. - **DistinctStreamingAgg**: `!finalize` covered both streaming preagg (`useStreamingPreagg=true`, performance) and non-streaming dedup (`useStreamingPreagg=false`, correctness). Same class of bug. FE fix: - AggSink: restrict the flag-gated base path to `!isMerge()` LOCAL phases. FIRST_MERGE always emits HASH regardless of the flag. - DistinctStreamingAgg: restrict to `useStreamingPreagg=true`. Non-streaming dedup always emits HASH. Also add `requiresShuffleForCorrectness()` to mirror BE's `is_shuffled_operator()`, so SetOperationNode propagates the "downstream depends on hash" flag correctly instead of using the coarser `parentRequire.preferType().isHashShuffle()` check that over-inserted HASH LE on every union branch under a streaming preagg. These fixes reduce FE/BE consistency mismatches from 8 to 3 (only pre-existing NLJ optimization differences remain). - `enable_local_shuffle_planner` — use FE planner (default true) - `enable_local_shuffle` — master switch for local shuffle - `enable_local_exchange_before_agg` — HASH LE before non-final agg (default true, mirrors apache#62438) `validateNoSerialWithoutLocalExchange()` walks the final plan tree and logs a warning whenever a serial operator feeds a non-serial parent without an intermediate LocalExchangeNode, catching planning gaps before execution. - `test_enable_local_exchange_before_agg.groovy` — 10 agg patterns with the flag on and off; covers the FIRST_MERGE and DistinctStreamingAgg correctness fixes. - `test_local_shuffle_fe_be_consistency.groovy` — runs the same SQL with `enable_local_shuffle_planner=true` and `=false` across the full operator matrix (Agg, Sort, Analytic, HashJoin, NLJ, Set, Union, TableFunction, AssertNumRows, RQG-derived corner cases) and asserts result rows are identical. Only data correctness is asserted — the two planners legitimately differ on the exact exchange counts/types they emit, so plan-shape equality is intentionally not checked. - `test_local_shuffle_rqg_bugs.groovy` — reproduces 20+ RQG-found crashes and wrong-result cases. - `test_old_coordinator_local_shuffle.groovy` — verifies the old coordinator path is unaffected. - `test_multilevel_join_agg_local_shuffle.groovy` — multi-level join and aggregation plan shapes. - `multi_version.h`: replace `atomic_load/atomic_store` (deprecated in libstdc++ C++20 / LLVM 20) with `std::shared_mutex`-based RW locking. - `memory.cpp`: fix `std::max` type mismatch (`long` vs `int64_t`) on macOS. - `bucketed_aggregation_sink_operator.h`: fix `ExchangeType::NOOP` → `TLocalPartitionType::NOOP` after thrift enum rename. This PR puts the FE planner in the driver's seat for LE insertion but intentionally does NOT remove the BE-side machinery — readers should be aware of three pieces the FE planner shares with or defers to BE: 1. **`is_serial_operator` is computed on both sides.** FE computes the flag and writes it into Thrift, but BE's `OperatorBase::is_serial_operator()` is still overridden per operator in C++ and used for BE-side runtime decisions. Any future change to the BE override needs to be mirrored on the FE side (and vice versa) to keep the planner's view consistent with execution. 2. **The legacy BE planner stays as a fallback.** `pipeline_fragment_context.cpp::_plan_local_exchange` is preserved and gated by `runtime_state.h::plan_local_shuffle()`: when `enable_local_shuffle_planner=false`, BE plans LE itself, exactly as before. The two paths are mutually exclusive, never both running on the same query. 3. **`_propagate_local_exchange_num_tasks` is kept as a runtime safety net.** The two propagation passes in `pipeline_fragment_context.cpp` fix up paired pipelines whose `num_tasks` end up mismatched (e.g. when AGG/SORT/JOIN pipeline splits leave a serial Exchange feeding an N-task sink). FE's framework-level serial→non-serial fan-out (`enforceRequire` step 3) and the `validateNoSerialWithoutLocalExchange` check aim to make these mismatches impossible by construction, but the BE-side fixup remains as a defensive guard. Co-authored-by: Gabriel <liwenqiang@selectdb.com>
… DSL - Delete duplicate testAggFromScanUsesLocalExecutionHashShuffle - Rewrite 8 substring-based tests as DSL shape assertions - Add testUnionAllScanAndValues (Tier B from Trino) - Add assertNoLocalExchangeOfType helper for negative checks - Add nestedLoopJoin/partitionSort/olapScan() factories to PlanShape(Dsl)
…_multilevel_join_agg_local_shuffle The local-shuffle FE planner inserts LocalExchange nodes after the Nereids physical plan stage, so `explain shape plan` output is independent of enable_local_shuffle_planner. When the on/off variants did differ, the diff was always driven by stats-sensitive rewrites (e.g. cost-based InferSetOperatorDistinct), not by the planner mode itself — meaning the shape check was effectively asserting stats stability across environments, which it cannot guarantee. Keep result-equality (qt_*_result_on / _result_off) and check_sql_equal between planner modes; drop the shape assertions and the 72 _shape_on blocks from the .out file.
…local-shuffle planner FE-side local-shuffle planner wraps a serial fragment root with a PASSTHROUGH LocalExchangeNode (AddLocalExchange#addLocalExchangeForFragment) so the data sink can fan out across pipeline tasks. That replaces `fragment.getPlanRoot()` with a LocalExchangeNode wherever the original root was a serial FileScanNode. InsertIntoTableCommand#applyInsertPlanStatistic was using `fragment.getPlanRoot() instanceof FileScanNode` to find load-source scans, so after the LE wrap the instanceof check fails, addLoadFileInfo is never called, and LoadStatistic.fileNum / totalFileSizeB stay 0 even though the BE-side scannedRows / loadBytes counters work normally. Symptom: job_p0.streaming_job.test_streaming_insert_job fails with loadStat.fileNumber == 0 (expected 2) and loadStat.fileSize == 0 (expected 256) while scannedRows / loadBytes are correct. Fix: peel any LocalExchangeNode wrappers off the fragment root before the instanceof check, then extract fileNum / totalFileSize from the underlying FileScanNode as before. Verified locally: INSERT INTO t SELECT * FROM LOCAL(...) shows FileNumber=1 FileSize=12 with the fix, FileNumber=0 FileSize=0 without it.
…rk it serial (DORIS-25865)
The FE local-shuffle planner used to insert a LocalExchangeNode directly
under RecursiveCteNode, which broke two RecursiveCte invariants:
1. ThriftPlansBuilder locates the recursive sender fragment via
`recursiveCteNode.getChild(1).getChild(0).getFragment()`. A wrapper LE
between RecCte and the cross-fragment ExchangeNode shifts that path off
the receiver and pulls the recursive producer fragment into
`fragmentsToReset`, so BE rejects with
[INTERNAL_ERROR]Fragment N contains a recursive CTE node
from RecCTESourceOperatorX::prepare().
2. BE's RecCTESourceOperatorX::is_serial_operator() always returns true.
RecursiveCteNode#isSerialNode() on the FE side defaulted to false, so
the planner left the producer fragment with parallel=N sender pipelines
even though only one instance actually emits data. The downstream
cross-fragment Exchange then waits forever on the N-1 silent senders.
Fix in RecursiveCteNode:
- override isSerialNode() to return true so addLocalExchangeForFragment
wraps the fragment root with PASSTHROUGH LE and fans the single
producer out to N parallel sinks (mirrors BE-native behaviour);
- override enforceAndDeriveLocalExchange to call children's own
enforceAndDeriveLocalExchange directly, bypassing the framework's
enforceRequire so no LE gets inserted between RecCte and its
cross-fragment Exchange children — children's subtrees still get LE
planning as normal.
Add regression test test_local_shuffle_recursive_cte covering the three
downstream consumer shapes the JIRA listed plus join / negative control:
rec_cte_agg, rec_cte_window, rec_cte_grouping_sets, rec_cte_select,
rec_cte_join. Each is asserted to produce identical rows under
enable_local_shuffle_planner=true vs =false.
Two regression checks were comparing actual rows against .out in a specific order even though the SQL was order-insensitive at that point. The FE local-shuffle planner can legitimately change row delivery order within a fragment, exposing the latent assumption and failing the test. - unnest_order_by_list_test.groovy: qt_window_function_order_by_unnested_value has a window RANK() over UNNEST without an outer ORDER BY. Switch to order_qt_* (framework sorts the actual rows before comparing) and re-sort the corresponding .out block so the expected side is also in sorted order. - test_python_udaf_complex.groovy: qt_json_array_agg → order_qt_json_array_agg. The query GROUP BY category already returns rows in alphabetical category order, so no .out change is needed. Note: this only stabilises row order; the python UDAF's per-group array contents still depend on row arrival order inside each group, so a stricter pin (ORDER BY id in a subquery or array_sort around the agg) would still be needed if that variability resurfaces.
…E-planned LE Two correctness issues in the FE-planned local-shuffle path, both surfaced by single-tablet POOLING / share-scan fragments. 1. FE planner inserted LE(LOCAL_HASH) below a streaming partial agg with distributeExprLists = child table distribution (e.g. [id]) instead of grouping_exprs (e.g. [category]). BE's AggSinkOperatorX / StreamingAggOperatorX::update_operator picks _partition_exprs = grouping_exprs when the chain is not followed_by_shuffled_operator — the common case for a streaming preagg at fragment root with only a cross-fragment HASH ExchangeSink above. Using child distribution scattered same-group rows across N partial-agg instances, turning the preagg into a no-op and breaking row-arrival order at the downstream merge-finalize (manifests as non-deterministic group_concat / py_json_array_agg output, e.g. test_python_udaf_complex json_array_agg). Fix: add overridable PlanNode#getLocalExchangeDistributeExprs(childIndex, followedByShuffled) defaulting to the child's distribution, and override it on AggregationNode to mirror BE's update_operator: use child distribution only when (followedByShuffled || hasDistinct); otherwise use grouping_exprs. 2. BE _create_deferred_local_exchangers used sender_count = upstream_pipe->num_tasks() with no max(_, _num_instances). When the upstream pipeline has a serial source (POOLING OlapScan, serial Exchange), num_tasks() stays at 1 and _propagate_local_exchange_num_tasks Pass 1 deliberately does not raise it, but the shared exchanger is shared across all _num_instances fragment instances on this BE — each instance closes once, so total close-count = _num_instances. Initial 1 minus _num_instances closes drove _running_sink_operators negative (e.g. -5 for 6 instances, -15 for 16), so the exchanger never reached "all senders done", downstream sources blocked on SHUFFLE_DATA_DEPENDENCY forever, and the query hung. Fragments hold block references through the hang; on BE shutdown mem_tracker_limiter::~MemTrackerLimiter fired FATAL, aborting BE and producing the build-948971 "stop grace fail" — root case being dictionary_p0.test_dict_load_and_get_ip_trie's refresh dictionary running scan + LE(PASSTHROUGH) + cross-fragment DICTIONARY_SINK. Fix: mirror BE-planned _add_local_exchange_impl (~line 1023) which uses std::max(cur_pipe->num_tasks(), _num_instances). Tests - New LocalExchangePlannerTest#testStreamingAggHashShuffleUsesGroupingExprs: with t1 DISTRIBUTED BY HASH(k1) and SELECT k2, count(*) GROUP BY k2 (k2 non-bucket -> two-phase agg), asserts the LE below the streaming partial agg carries [k2] (grouping key) not [k1] (child distribution). Verified failing pre-fix, passing post-fix. Whole class (26 tests) green. - Local cluster (output_test, 29030): group_concat probe stable 1,2,3,4,5 across 20 runs after both fixes; matches BE-planner=false output.
The previous commit ef4ea66 raised _create_deferred_local_exchangers' sender_count to std::max(upstream_pipe->num_tasks(), _num_instances) on the theory that shared_state is shared across all _num_instances fragment instances and each contributes one close. That breaks the inverse case: when upstream_pipe legitimately has 1 task (e.g. PASSTHROUGH below a serial scan inside a single fragment instance), only 1 close actually happens, so the raised initial count never reaches zero — downstream sources block on SHUFFLE_DATA_DEPENDENCY forever. Surfaced by mtmv_up_down_job_p0.load.test_upgrade_downgrade_prepare_job_mtmv (MV refresh hung at RUNNING in build 949402): plan dump shows LE id=5 PASSTHROUGH with _num_senders=6, _running_sink_operators=5 — only one close arrived, exchanger never finalized. Restore the original sender_count = info.upstream_pipe->num_tasks(). The dictionary-refresh "stop grace fail" diagnosis from the previous commit was wrong about the close-count math; that hang has a different root cause and needs separate investigation.
…mistakes Two recent attempts to "fix" _create_deferred_local_exchangers' sender_count have regressed CI: - ef4ea66 changed it to std::max(num_tasks, _num_instances) reasoning about build 948971's _running_sink_operators=-5 hang. That broke MTMV refresh in build 949402 (LE id=5 PASSTHROUGH, _num_senders=6, only 1 close arrived, hang). Reverted in c848ecf. Add a multi-line DANGER ZONE comment right above the assignment that documents the formula, the two failed alternatives with the case names that regressed, why the BE-planned _add_local_exchange_impl analog is not actually analogous, and where to look first if a future hang has _running_sink_operators < 0. No code change — the comment is the patch.
…Plans Dictionary refresh hangs because the FE-side planning inserted LE nodes (via PhysicalDictionarySink special case in NereidsPlanner.distribute), but EnvFactory.createCoordinator dispatched to legacy Coordinator, which forces enable_local_shuffle_planner=false at initQueryOptions. BE then runs its own _plan_local_exchange on a fragment that already has FE- planned LEs, doubling pipeline 1 num_tasks (1 -> 7) and adding a new pipeline. The FE-init exchanger was sized for sender_count=1, but 7 sink LocalStates close it; _running_sink_operators goes negative, source side short-circuits, dependent pipeline tasks never wake up, fragment never finishes, and the dict stays in LOADING forever. The mismatch is between two checks for "did FE plan distribute": - Plan time (NereidsPlanner.distribute, line 704): SessionVariable.canUseNereidsDistributePlanner() OR PhysicalDictionarySink -> true for dict refresh -> addLocalExchange runs -> LE inserted - Dispatch time (EnvFactory.createCoordinator, line 148/156): canUseNereidsDistributePlanner(context), which returns false when statementContext.parsedStatement is not a LogicalPlanAdapter — and dict refresh constructs StmtExecutor with a raw SQL string, leaving parsedStatement null -> dispatched to legacy Coordinator Replace the dispatch-side session-var check with a direct read of NereidsPlanner.getDistributedPlans(): non-empty iff FE actually ran distribute. This aligns dispatch with plan-time reality without depending on parsedStatement state, session var values, or sink type. Apply the same fix to CloudEnvFactory. Verified locally: test_dict_load_and_get_ip_trie was hanging at the first refresh and reproducing the _running_sink_operators=-6 pattern; with this fix the test passes and dict refresh correctly reports the expected duplicate-CIDR error.
…epeat The rollup REPEAT is not distribution-preserving (it NULLs grouping columns per set and produces GROUPING_ID, which the downstream agg hashes on but which does not exist below the repeat). Forwarding the parent HASH require down pushed the local exchange before the 8x row expansion and hashed by the child's single upstream shuffle key instead of the agg's grouping_exprs, collapsing rows onto one instance (tpcds q67: +73%, 65s -> 113s). Recurse with noRequire so the parent inserts its hash local exchange above the repeat using its own grouping_exprs, mirroring BE whose RepeatOperatorX returns a NOOP required_data_distribution.
…ibution behaviors Add a PlanShapeDsl repeat() factory and two LocalExchangePlannerTest cases pinning the two halves of the RepeatNode local-exchange fix: - testRepeatNoRequireKeepsHashLocalExchangeAboveRepeat: with a pooling scan upstream, the hash LE is placed ABOVE the Repeat (Agg <- LE(LOCAL_HASH) <- LE(PT) <- Repeat <- scan). repeat() pins the Repeat position so the buggy below-repeat placement would fail the assertion. - testRepeatReturnsChildDistributionSkipsRedundantHash: with a non-pooling colocate scan, the child BUCKET_HASH distribution propagates through the Repeat and satisfies the agg requirement, so no redundant LOCAL_HASH is inserted (returning NOOP would force one).
…ode for noRequire fix The RepeatNode fix (stop forwarding the parent HASH require) changed this unit test's expected behavior: RepeatNode now passes noRequire to its child (no hash LE below the Repeat) and reports the child's own distribution (NOOP) instead of forcing LOCAL_EXECUTION_HASH_SHUFFLE. Assert the child receives NoRequire, the output type is NOOP, and no LocalExchange is inserted below the Repeat.
### What problem does this PR solve?
Issue Number: None
Related PR: None
Problem Summary: Queries that evaluate complex VARIANT expressions after local exchange can share input blocks across downstream pipeline tasks. Variant casts and Variant serialization finalized the source column in-place, and nested-loop join lazy materialization also reused mutable probe-side columns while expanding rows. This could make local-shuffle query results unstable and could trigger lazy nested-loop join nullable-column size mismatches. This change finalizes private Variant copies for casts and serialization, deep-clones finalized Variant columns, and copies the lazy probe block before nested-loop join materialization appends rows.
### Release note
None
### Check List (For Author)
- Test:
- ./run-be-ut.sh --run --filter=FunctionVariantCast.CastFromVariantDoesNotFinalizeSourceColumn:NLJAppendProbeDataWithNullTest.*:ColumnVariantTest.clone_finalized_deep_copies_columns
- ./run-be-ut.sh --run --filter=ColumnVariantTest.serialize_does_not_finalize_source_column
- ./build.sh --be
- Manual test: local 4BE repro query for nullable nested-loop join lazy materialization, 8 workers x 80 loops, 0 failures and stable output hash
- Manual test: local 4BE RQG-shaped Variant/local-shuffle query, local shuffle off baseline vs 50 local shuffle on runs, 0 mismatches
- Behavior changed: No
- Does this need documentation: No
Contributor
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
Member
Author
|
Superseded by #64092, which contains only the clean fix commit. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What problem does this PR solve?
Issue Number: None
Related PR: None
Problem Summary: Queries that evaluate complex VARIANT expressions after local exchange can share input blocks across downstream pipeline tasks. Variant casts and Variant serialization finalized the source column in-place, and nested-loop join lazy materialization also reused mutable probe-side columns while expanding rows. This could make local-shuffle query results unstable and could trigger lazy nested-loop join nullable-column size mismatches. This change finalizes private Variant copies for casts and serialization, deep-clones finalized Variant columns, and copies the lazy probe block before nested-loop join materialization appends rows.
Release note
None
Check List (For Author)
./run-be-ut.sh --run --filter=FunctionVariantCast.CastFromVariantDoesNotFinalizeSourceColumn:NLJAppendProbeDataWithNullTest.*:ColumnVariantTest.clone_finalized_deep_copies_columns./run-be-ut.sh --run --filter=ColumnVariantTest.serialize_does_not_finalize_source_column./build.sh --be