[WIP] columnar: Producer-Consumer Pipeline Read Model #10904
JaySon-Huang wants to merge 1 commit into
Signed-off-by: JaySon-Huang <tshent@qq.com>
[APPROVALNOTIFIER] This PR is NOT APPROVED
📝 Walkthrough
Introduces a producer-consumer pipeline for disaggregated columnar reads under
Changes
Columnar Producer-Consumer Pipeline
Sequence Diagram(s)
sequenceDiagram
participant PipelineBuilder as StorageDisaggregated
participant ColumnarReadSourceOp as ColumnarReadSourceOp (IO)
participant PrefetchColumnarReaderTask as PrefetchTask (IO pool)
participant RNColumnarReaderWork as ReaderWork
participant SharedQueue as SharedQueue
participant RNColumnarSourceOp as RNColumnarSourceOp (CPU)
rect rgba(100, 150, 220, 0.5)
note over PipelineBuilder: Pipeline construction
PipelineBuilder->>ColumnarReadSourceOp: add N producers
PipelineBuilder->>SharedQueue: create bounded queue
PipelineBuilder->>RNColumnarSourceOp: add consumer
end
rect rgba(220, 140, 60, 0.5)
note over ColumnarReadSourceOp: executeIOImpl — reader materialization
ColumnarReadSourceOp->>PrefetchColumnarReaderTask: submit to IO pool (startAsyncMaterializeReader)
PrefetchColumnarReaderTask->>RNColumnarReaderWork: createColumnarReaderWithBackoff → state=Ready
PrefetchColumnarReaderTask->>RNColumnarReaderWork: notify_future.notifyAll()
ColumnarReadSourceOp->>RNColumnarReaderWork: awaitImpl sees Ready, consumeReadyReader
ColumnarReadSourceOp->>SharedQueue: push blocks via SharedQueueSinkOp
end
rect rgba(60, 180, 100, 0.5)
note over RNColumnarSourceOp: readImpl — queue consumption
RNColumnarSourceOp->>SharedQueue: tryPop(block)
alt READY
SharedQueue-->>RNColumnarSourceOp: HAS_OUTPUT
else EMPTY
SharedQueue-->>RNColumnarSourceOp: WAIT_FOR_NOTIFY
else FINISHED
SharedQueue-->>RNColumnarSourceOp: HAS_OUTPUT (empty block = EOF)
end
end
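The consumer branch of the diagram can be sketched as a non-blocking pop whose result is mapped to an operator status. This is a minimal illustration only: `BoundedQueue`, `PopResult`, `Status`, and `readOnce` are hypothetical stand-ins, not the PR's `SharedQueue` or `OperatorStatus`, and notification of waiting consumers is left out.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>
#include <utility>

// Hypothetical stand-ins; the PR's real queue and status types differ.
enum class PopResult { READY, EMPTY, FINISHED };
enum class Status { HAS_OUTPUT, WAIT_FOR_NOTIFY };

template <typename Block>
class BoundedQueue
{
public:
    explicit BoundedQueue(std::size_t capacity_) : capacity(capacity_)
    {
        assert(capacity_ > 0);
    }

    // Producer side: returns false when the queue is full or already finished.
    bool tryPush(Block block)
    {
        std::lock_guard<std::mutex> guard(mutex);
        if (finished || items.size() >= capacity)
            return false;
        items.push_back(std::move(block));
        return true;
    }

    // Called once all producers are done.
    void finish()
    {
        std::lock_guard<std::mutex> guard(mutex);
        finished = true;
    }

    // Consumer side: never blocks. Remaining items are drained before FINISHED is reported.
    PopResult tryPop(Block & out)
    {
        std::lock_guard<std::mutex> guard(mutex);
        if (!items.empty())
        {
            out = std::move(items.front());
            items.pop_front();
            return PopResult::READY;
        }
        return finished ? PopResult::FINISHED : PopResult::EMPTY;
    }

private:
    std::mutex mutex;
    std::deque<Block> items;
    std::size_t capacity;
    bool finished = false;
};

// Maps a pop result to a status as in the diagram; an empty optional plays the role of the EOF block.
template <typename Block>
Status readOnce(BoundedQueue<Block> & queue, std::optional<Block> & out)
{
    Block block{};
    switch (queue.tryPop(block))
    {
    case PopResult::READY:
        out = std::move(block);
        return Status::HAS_OUTPUT;
    case PopResult::FINISHED:
        out.reset();
        return Status::HAS_OUTPUT;
    case PopResult::EMPTY:
        break;
    }
    return Status::WAIT_FOR_NOTIFY;
}
```

In this sketch a consumer that gets `WAIT_FOR_NOTIFY` would be rescheduled by whatever notification mechanism the pipeline provides; that part is intentionally omitted.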
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks | ✅ 3 | ❌ 2
❌ Failed checks (2 warnings)
✅ Passed checks (3 passed)
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
dbms/src/Storages/StorageDisaggregatedColumnar.cpp (1)
1231-1247: ⚠️ Potential issue | 🔴 Critical | ⚡ Quick win
Do not transition to Creating when no pipeline executor is available.
Line 1235 sets reader_work->state = Creating before Line 1246 checks exec_context. In stream mode (exec_context == nullptr), this leaves the work stuck in Creating with no scheduled task, and subsequent getOrCreateReader() can block forever.
Proposed fix
 void RNColumnarReadTask::prefetchReaderWork(const RNColumnarReaderWorkPtr & reader_work)
 {
     RUNTIME_CHECK(reader_work != nullptr);
+    // Stream path has no pipeline scheduler; keep work in NotStarted so inline creation can proceed.
+    if (exec_context == nullptr)
+        return;
+
     {
         auto guard = std::lock_guard(reader_work->mutex);
         if (reader_work->state != RNColumnarReaderMaterializeState::NotStarted)
             return;
         reader_work->state = RNColumnarReaderMaterializeState::Creating;
     }
     const auto region_id = reader_work->plan.region_id;
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@dbms/src/Storages/StorageDisaggregatedColumnar.cpp` around lines 1231 - 1247, The state transition to Creating occurs before checking whether exec_context is available. In stream mode where exec_context is nullptr, this leaves the work stuck in Creating state with no scheduled task to complete it. Move the exec_context nullptr check to occur before the state transition to Creating (before line 1235 where reader_work->state = RNColumnarReaderMaterializeState::Creating is set), so that the function returns early without changing state when there is no pipeline executor available.
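The ordering this finding asks for can be shown in isolation. The sketch below is an assumption-laden miniature, not the PR's code: `PrefetchState`, `PrefetchWork`, `FakeExecContext`, and `tryPrefetch` are hypothetical names, and "scheduling" is reduced to incrementing a counter.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical miniature of the reader work state machine.
enum class PrefetchState { NotStarted, Creating, Ready };

struct PrefetchWork
{
    std::mutex mutex;
    PrefetchState state = PrefetchState::NotStarted;
};

// Hypothetical stand-in for the pipeline executor context.
struct FakeExecContext
{
    int scheduled_tasks = 0;
    void submit() { ++scheduled_tasks; }
};

// Returns true only when a prefetch task was actually scheduled.
bool tryPrefetch(PrefetchWork * work, FakeExecContext * exec_context)
{
    assert(work != nullptr);

    // No scheduler (stream mode): return before touching the state,
    // so the work stays NotStarted and can still be created inline.
    if (exec_context == nullptr)
        return false;

    {
        std::lock_guard<std::mutex> guard(work->mutex);
        if (work->state != PrefetchState::NotStarted)
            return false;
        work->state = PrefetchState::Creating;
    }

    // The state is Creating only when a task exists that will finish it.
    exec_context->submit();
    return true;
}
```

The point of the ordering is the invariant in the last comment: in this sketch, `Creating` is never observable without a matching scheduled task.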
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@dbms/src/Storages/Columnar/ColumnarReadSourceOp.cpp`:
- Around line 176-190: The issue is that both NotStarted and Creating states are
triggering inline materialization, causing concurrent reader creation in
prefetch tasks and source IO paths. Fix this by only materializing when the
state is NotStarted, not when it's already Creating (which indicates
materialization is already in progress). In the switch statement around line 177
in ColumnarReadSourceOp.cpp, remove the Creating case from triggering
should_materialize or only set should_materialize = true for the NotStarted
case. Apply the same logic fix at the sibling location around lines 227-236 to
ensure Creating state is not treated as a trigger for inline materialization at
any point in the code.
In `@dbms/src/Storages/DeltaMerge/ScanContext.h`:
- Line 51: The dm_io_seek_count atomic counter member variable is defined in the
ScanContext class but is not integrated into the serialization and aggregation
operations, causing loss of I/O seek instrumentation data in distributed
queries. Add handling for dm_io_seek_count in the deserialize() method (around
line 180) to read the value from the tipb::TiFlashScanContext protobuf message,
in the serialize() method (around line 269) to write the value to the protobuf
message, in the merge(const ScanContext&) overload (around line 359) to
aggregate counters from another ScanContext instance, and in the merge(const
tipb::TiFlashScanContext&) overload (around line 454) to aggregate from a
protobuf message. Additionally, verify that the protobuf definition for
tipb::TiFlashScanContext includes a dm_io_seek_count field; if it does not
exist, update the .proto file to add this field before implementing the C++
changes.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: cdb4af6f-e15d-48cc-8056-f6c6a72161f6
📒 Files selected for processing (16)
- .gitignore
- dbms/CMakeLists.txt
- dbms/src/Storages/Columnar/ColumnarReadSourceOp.cpp
- dbms/src/Storages/Columnar/ColumnarReadSourceOp.h
- dbms/src/Storages/Columnar/ColumnarSourceOp.cpp
- dbms/src/Storages/Columnar/ColumnarSourceOp.h
- dbms/src/Storages/Columnar/PrefetchColumnarReaderTask.cpp
- dbms/src/Storages/Columnar/PrefetchColumnarReaderTask.h
- dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp
- dbms/src/Storages/DeltaMerge/ScanContext.cpp
- dbms/src/Storages/DeltaMerge/ScanContext.h
- dbms/src/Storages/StorageDisaggregatedColumnar.cpp
- dbms/src/Storages/StorageDisaggregatedColumnar.h
- dbms/src/Storages/tests/gtest_storage_disaggregated_columnar.cpp
- docs/design/2026-06-13-columnar-pipeline-producer-consumer-model.md
- tests/docker/next-gen-utils/Makefile
        case RNColumnarReaderMaterializeState::NotStarted:
        case RNColumnarReaderMaterializeState::Creating:
            current_reader_work->state = RNColumnarReaderMaterializeState::Creating;
            should_materialize = true;
            break;
        }
    }

    if (taken_reader.has_value())
    {
        consumeReadyReader(std::move(taken_reader.value()));
        return OperatorStatus::IO_IN;
    }
    if (should_materialize)
        return OperatorStatus::IO_IN;
Avoid double materialization when a work item is already Creating.
At lines 177 and 227, Creating is handled as “materialize inline”, so a prefetched work item can be created twice concurrently (once by the prefetch task and once by the source IO path). This duplicates remote reader creation and can discard the losing reader's result.
Suggested direction
// In awaitImpl(), NEED_READER branch:
- case RNColumnarReaderMaterializeState::NotStarted:
- case RNColumnarReaderMaterializeState::Creating:
- current_reader_work->state = RNColumnarReaderMaterializeState::Creating;
- should_materialize = true;
- break;
+ case RNColumnarReaderMaterializeState::NotStarted:
+ current_reader_work->state = RNColumnarReaderMaterializeState::Creating;
+ should_materialize = true;
+ break;
+ case RNColumnarReaderMaterializeState::Creating:
+ state = ColumnarReadSourceState::WAIT_READER;
+ setNotifyFuture(&current_reader_work->notify_future);
+ return OperatorStatus::WAIT_FOR_NOTIFY;

// In executeIOImpl(), NEED_READER/WAIT_READER branch:
// only inline-create when this operator owns the NotStarted -> Creating transition.
Also applies to: 227-236
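The "owns the transition" rule can be illustrated with a small decision function. This is a sketch under assumptions, not the operator's actual logic: `WorkState`, `NextStep`, `WorkItem`, `decideNextStep`, and `markReady` are hypothetical names, and waiting is represented only by a return value.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical miniature of the materialization state machine.
enum class WorkState { NotStarted, Creating, Ready };
enum class NextStep { MaterializeInline, WaitForNotify, ConsumeReady };

struct WorkItem
{
    std::mutex mutex;
    WorkState state = WorkState::NotStarted;
};

// Only the caller that performs NotStarted -> Creating may create the reader.
// Any caller that observes Creating waits instead of creating a second reader.
NextStep decideNextStep(WorkItem & work)
{
    std::lock_guard<std::mutex> guard(work.mutex);
    switch (work.state)
    {
    case WorkState::NotStarted:
        work.state = WorkState::Creating;
        return NextStep::MaterializeInline;
    case WorkState::Creating:
        return NextStep::WaitForNotify;
    case WorkState::Ready:
        break;
    }
    return NextStep::ConsumeReady;
}

// Called by whichever party owned the transition once the reader exists.
void markReady(WorkItem & work)
{
    std::lock_guard<std::mutex> guard(work.mutex);
    assert(work.state == WorkState::Creating);
    work.state = WorkState::Ready;
}
```

Because the check and the transition happen under one lock, two concurrent callers cannot both receive `MaterializeInline` for the same work item in this sketch.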
    std::atomic<uint64_t> dmfile_lm_filter_scanned_rows{0};
    std::atomic<uint64_t> dmfile_lm_filter_skipped_rows{0};
    std::atomic<uint64_t> total_dmfile_read_time_ns{0};
    std::atomic<uint64_t> dm_io_seek_count{0};
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify protobuf contract for dm_io_seek_count
# Look for the .proto definition file and check if dm_io_seek_count field exists
# Search for .proto files that define TiFlashScanContext
find . -name "*.proto" -type f | xargs grep -l "TiFlashScanContext" 2>/dev/null
# Then check if dm_io_seek_count is defined in those files
fd -e proto | xargs grep -n "dm_io_seek_count" 2>/dev/null
Repository: pingcap/tiflash
Length of output: 43
🏁 Script executed:
#!/bin/bash
# First, locate the ScanContext files
find . -name "ScanContext.*" -type f
Repository: pingcap/tiflash
Length of output: 151
🏁 Script executed:
#!/bin/bash
# Find .proto files that might define TiFlashScanContext
find . -name "*.proto" -type f | head -20
Repository: pingcap/tiflash
Length of output: 1045
🏁 Script executed:
#!/bin/bash
# Search for TiFlashScanContext in the entire codebase
rg "TiFlashScanContext" --type cpp --type h -l
Repository: pingcap/tiflash
Length of output: 147
🏁 Script executed:
#!/bin/bash
# Search for TiFlashScanContext in .proto files
rg "TiFlashScanContext" --type proto
Repository: pingcap/tiflash
Length of output: 89
🏁 Script executed:
#!/bin/bash
# Read the ScanContext.h file to check line 51 and the serialize/deserialize/merge methods
wc -l ./dbms/src/Storages/DeltaMerge/ScanContext.h
Repository: pingcap/tiflash
Length of output: 108
🏁 Script executed:
#!/bin/bash
# Look for proto files that might contain ScanContext
find . -name "*.proto" -type f -exec grep -l "ScanContext\|scan_context" {} \;
Repository: pingcap/tiflash
Length of output: 41
🏁 Script executed:
#!/bin/bash
# Read the ScanContext.h file - focus on the key sections
cat -n ./dbms/src/Storages/DeltaMerge/ScanContext.h | head -100
Repository: pingcap/tiflash
Length of output: 4890
🏁 Script executed:
#!/bin/bash
# Check around line 51 where the new member is added
cat -n ./dbms/src/Storages/DeltaMerge/ScanContext.h | sed -n '40,70p'
Repository: pingcap/tiflash
Length of output: 1742
🏁 Script executed:
#!/bin/bash
# Check the serialize/deserialize methods around the mentioned lines
cat -n ./dbms/src/Storages/DeltaMerge/ScanContext.h | sed -n '170,190p'
Repository: pingcap/tiflash
Length of output: 1404
🏁 Script executed:
#!/bin/bash
# Check the serialize method around line 269
cat -n ./dbms/src/Storages/DeltaMerge/ScanContext.h | sed -n '260,290p'
Repository: pingcap/tiflash
Length of output: 2579
🏁 Script executed:
#!/bin/bash
# Check the merge methods around lines 359 and 454
cat -n ./dbms/src/Storages/DeltaMerge/ScanContext.h | sed -n '350,370p'
Repository: pingcap/tiflash
Length of output: 1460
🏁 Script executed:
#!/bin/bash
# Check the second merge method around line 454
cat -n ./dbms/src/Storages/DeltaMerge/ScanContext.h | sed -n '445,465p'
Repository: pingcap/tiflash
Length of output: 1547
🏁 Script executed:
#!/bin/bash
# Search for protobuf definitions in tipb directory
find . -name "*.proto" -path "*/tipb/*" -type f
Repository: pingcap/tiflash
Length of output: 41
🏁 Script executed:
#!/bin/bash
# Search for tipb proto files more broadly
find . -name "executor.proto" -o -name "tipb.proto" -o -name "*tipb*.proto" 2>/dev/null | head -20
Repository: pingcap/tiflash
Length of output: 41
🏁 Script executed:
#!/bin/bash
# Check if tipb is an external dependency - search for it in includes
grep -r "tipb/executor" --include="*.h" --include="*.hpp" | head -5
Repository: pingcap/tiflash
Length of output: 442
🏁 Script executed:
#!/bin/bash
# Check the entire deserialize method to see all fields being deserialized
cat -n ./dbms/src/Storages/DeltaMerge/ScanContext.h | sed -n '180,268p'
Repository: pingcap/tiflash
Length of output: 7267
🏁 Script executed:
#!/bin/bash
# Check the entire serialize method to see all fields being serialized
cat -n ./dbms/src/Storages/DeltaMerge/ScanContext.h | sed -n '269,357p'
Repository: pingcap/tiflash
Length of output: 7396
🏁 Script executed:
#!/bin/bash
# Get the complete merge(const ScanContext&) method
cat -n ./dbms/src/Storages/DeltaMerge/ScanContext.h | sed -n '359,453p'
Repository: pingcap/tiflash
Length of output: 5988
🏁 Script executed:
#!/bin/bash
# Get the complete merge(const tipb::TiFlashScanContext&) method
cat -n ./dbms/src/Storages/DeltaMerge/ScanContext.h | sed -n '454,584p'
Repository: pingcap/tiflash
Length of output: 8409
dm_io_seek_count member not wired into serialize/deserialize/merge operations.
The new dm_io_seek_count counter added to the header is missing from deserialize(), serialize(), and both merge() methods. This prevents the counter from being transmitted between instances, aggregated in distributed queries, or restored from remote snapshots, resulting in loss of I/O seek instrumentation data.
Add dm_io_seek_count handling to:
- deserialize() (line 180): deserialize from tipb::TiFlashScanContext
- serialize() (line 269): serialize to tipb::TiFlashScanContext
- merge(const ScanContext&) (line 359): aggregate the counter
- merge(const tipb::TiFlashScanContext&) (line 454): aggregate from proto
Also verify that the protobuf definition (tipb::TiFlashScanContext) includes a dm_io_seek_count field. If not, the .proto file must be updated.
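The four touch points can be pictured on a cut-down counter holder. Everything here is an assumption for illustration: `FakeScanContextPB` is a plain struct standing in for `tipb::TiFlashScanContext` (whose real definition may not have this field, and which would use generated accessors rather than a public member), and `MiniScanContext` is not the real `ScanContext`.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the protobuf message; the field is assumed to exist.
struct FakeScanContextPB
{
    std::uint64_t dm_io_seek_count = 0;
};

// Cut-down holder with a single counter, wired through all four operations.
struct MiniScanContext
{
    std::atomic<std::uint64_t> dm_io_seek_count{0};

    // Restore the counter from a received message.
    void deserialize(const FakeScanContextPB & pb)
    {
        dm_io_seek_count = pb.dm_io_seek_count;
    }

    // Write the counter into an outgoing message.
    FakeScanContextPB serialize() const
    {
        FakeScanContextPB pb;
        pb.dm_io_seek_count = dm_io_seek_count.load();
        return pb;
    }

    // Aggregate from another in-process context.
    void merge(const MiniScanContext & other)
    {
        assert(&other != this); // merging a context into itself would double count
        dm_io_seek_count += other.dm_io_seek_count.load();
    }

    // Aggregate from a message received from a remote node.
    void merge(const FakeScanContextPB & pb)
    {
        dm_io_seek_count += pb.dm_io_seek_count;
    }
};
```

If any one of the four is skipped, the counter is silently dropped on that path, which is the data loss this comment describes.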
@JaySon-Huang: The following tests failed, say
What problem does this PR solve?
Issue Number: close #xxx
Problem Summary:
What is changed and how it works?
Check List
Tests
Side effects
Documentation
Release note
Summary by CodeRabbit
New Features
Documentation
Chores