perf(raft): partition etcd dispatcher by message type to isolate heartbeats#577
Conversation
Adds an opt-in 4-lane dispatcher layout (heartbeat / replication / snapshot / other) behind the ELASTICKV_RAFT_DISPATCHER_LANES feature flag. Default behavior (flag unset or "0") is byte-for-byte identical to the current 2-lane (heartbeat + normal) implementation introduced in PR #522. When enabled, each per-peer peerQueues gains three extra channels and three extra goroutines so that:

- MsgApp / MsgAppResp run on their own replication lane, independent of heartbeats and of MsgSnap.
- MsgSnap (bulky, rare) runs on its own snapshot lane and can no longer stall subsequent MsgApps behind a multi-MiB transfer.
- Heartbeat / vote / read-index traffic keeps its dedicated priority lane, so heartbeats still cannot be starved under write load.

Per-peer within-type ordering (the raft invariant that matters for MsgApp) is preserved because a given peer's MsgApp stream still shares one lane and one worker. dispatchDropCount and postDispatchReport firing semantics are unchanged, and the existing drop paths have been factored to close whichever subset of lanes is actually wired up.

Rollout plan: ship default-off, enable in staging with ELASTICKV_RAFT_DISPATCHER_LANES=1, and watch dispatchDropCount, dispatchErrorCount, and the Prometheus heartbeat-drop gauge for 24h before flipping production. The flag can be removed once the 4-lane path has soaked.
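The lane layout this description implies can be sketched in Go. `peerQueues` and `dispatchRequest` below are minimal stand-ins reconstructed from the names in this PR, not the engine's real types; the channel depth and field set are assumptions.

```go
package main

import "fmt"

// Minimal stand-in for the engine's dispatchRequest; the real type carries
// a cloned raft message.
type dispatchRequest struct{ kind string }

// peerQueues sketches the per-peer lane set: heartbeat and normal always
// exist (legacy 2-lane mode), the other three only in 4-lane mode.
type peerQueues struct {
	heartbeat   chan dispatchRequest // priority: heartbeats, votes, read-index
	normal      chan dispatchRequest // legacy lane for everything else
	replication chan dispatchRequest // 4-lane: MsgApp / MsgAppResp
	snapshot    chan dispatchRequest // 4-lane: MsgSnap
	other       chan dispatchRequest // 4-lane: remaining outbound types
}

func newPeerQueues(fourLane bool, depth int) *peerQueues {
	pq := &peerQueues{
		heartbeat: make(chan dispatchRequest, depth),
		normal:    make(chan dispatchRequest, depth),
	}
	if fourLane {
		pq.replication = make(chan dispatchRequest, depth)
		pq.snapshot = make(chan dispatchRequest, depth)
		pq.other = make(chan dispatchRequest, depth)
	}
	return pq
}

func main() {
	pq := newPeerQueues(true, 8)
	// Fill the snapshot lane to capacity: replication is unaffected.
	for i := 0; i < 8; i++ {
		pq.snapshot <- dispatchRequest{kind: "MsgSnap"}
	}
	select {
	case pq.replication <- dispatchRequest{kind: "MsgApp"}:
		fmt.Println("replication lane still accepts MsgApp")
	default:
		fmt.Println("replication lane blocked")
	}
}
```

In legacy mode the three extra channels stay nil, so worker startup can skip them, which matches the flag-off path being byte-for-byte identical to the 2-lane implementation.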
📝 Walkthrough

Added an environment-controlled feature flag to switch between legacy 2-lane per-peer dispatch and a new opt-in 4-lane per-peer dispatch; extended peer queues, lane selection, worker startup/cleanup, and tests to validate routing and concurrent behavior.
Sequence Diagram(s): omitted — changes are internal feature additions with limited multi-actor runtime sequencing.

Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ 5 passed
Code Review
This pull request implements an opt-in 4-lane dispatcher for the Raft engine to prevent large snapshot transfers from blocking replication traffic. It introduces new message lanes for replication, snapshots, and other traffic, controlled by an environment variable. The implementation includes a new routing function, selectDispatchLane, and comprehensive tests for the multi-lane logic. Feedback suggests simplifying the routing logic by utilizing the existing isPriorityMsg helper to reduce code duplication.
```go
func (e *Engine) selectDispatchLane(pd *peerQueues, msgType raftpb.MessageType) chan dispatchRequest {
	if !e.dispatcherLanesEnabled {
		if isPriorityMsg(msgType) {
			return pd.heartbeat
		}
		return pd.normal
	}
	switch msgType {
	case raftpb.MsgHeartbeat, raftpb.MsgHeartbeatResp:
		return pd.heartbeat
	case raftpb.MsgApp, raftpb.MsgAppResp:
		return pd.replication
	case raftpb.MsgSnap:
		return pd.snapshot
	case raftpb.MsgVote, raftpb.MsgVoteResp,
		raftpb.MsgPreVote, raftpb.MsgPreVoteResp,
		raftpb.MsgReadIndex, raftpb.MsgReadIndexResp,
		raftpb.MsgTimeoutNow:
		// Election / read-index traffic is small and latency-sensitive but
		// rare; put it on the heartbeat lane so it keeps its priority
		// treatment and, like today, is never stuck behind MsgApp.
		return pd.heartbeat
	default:
		return pd.other
	}
}
```
The logic in selectDispatchLane can be simplified by leveraging the existing isPriorityMsg helper. Since all priority messages (heartbeats, votes, read-index, etc.) are routed to the heartbeat lane in both legacy and 4-lane modes, checking it first allows for a cleaner implementation and avoids duplicating the list of priority message types in the switch statement.
```go
func (e *Engine) selectDispatchLane(pd *peerQueues, msgType raftpb.MessageType) chan dispatchRequest {
	if isPriorityMsg(msgType) {
		return pd.heartbeat
	}
	if !e.dispatcherLanesEnabled {
		return pd.normal
	}
	switch msgType {
	case raftpb.MsgApp, raftpb.MsgAppResp:
		return pd.replication
	case raftpb.MsgSnap:
		return pd.snapshot
	default:
		return pd.other
	}
}
```

Route priority control traffic through isPriorityMsg up front in both legacy and 4-lane modes, dropping the duplicated message-type list in the switch. Behaviour is identical: heartbeats, votes, read-index and timeout-now still ride pd.heartbeat, MsgApp/MsgAppResp go to pd.replication, MsgSnap to pd.snapshot, and everything else falls through to pd.other (4-lane) or pd.normal (legacy).
Addressed the gemini-code-assist review.

Commit: 95370c7

/gemini review
Code Review
This pull request introduces an optional 4-lane dispatcher layout for Raft messages, splitting traffic into heartbeat, replication, snapshot, and other categories to prevent bulky snapshots from stalling replication. The feature is controlled by the ELASTICKV_RAFT_DISPATCHER_LANES environment variable. Feedback was provided to use strconv.ParseBool for more standard environment variable parsing.
```go
func dispatcherLanesEnabledFromEnv() bool {
	v := os.Getenv(dispatcherLanesEnvVar)
	return v != "" && v != "0"
}
```
The current implementation of dispatcherLanesEnabledFromEnv is quite permissive, enabling the feature for any non-empty value other than "0". While this matches the PR description, using strconv.ParseBool would provide a more standard and predictable behavior for boolean environment variables (e.g., handling "true", "1", "false", "0" consistently). Additionally, if this dispatcher lanes feature is a prototype or limited-scope implementation, ensure that its limitations are explicitly documented and runtime validation is added to prevent misuse in a general context.
```diff
-func dispatcherLanesEnabledFromEnv() bool {
-	v := os.Getenv(dispatcherLanesEnvVar)
-	return v != "" && v != "0"
-}
+func dispatcherLanesEnabledFromEnv() bool {
+	v := os.Getenv(dispatcherLanesEnvVar)
+	enabled, _ := strconv.ParseBool(v)
+	return enabled
+}
```
References
- For prototype or limited-scope implementations, explicitly document the limitations (e.g., single-node only, blocking startup) and add runtime validation to prevent misuse in a general context.
Replace the permissive "non-empty and not 0" check in dispatcherLanesEnabledFromEnv with strconv.ParseBool so the env var accepts only the standard boolean tokens (1/t/T/TRUE/true/True and 0/f/F/FALSE/false/False). Unrecognized or empty values default to false. Addresses Gemini review on PR #577.
/gemini review

Addressed the 2026-04-21T20:52:56Z medium-priority review.

Commit: 6ff5fce
Pull request overview
This PR adds an opt-in, per-peer 4-lane outbound dispatcher to the etcd/raft engine to prevent head-of-line blocking between heartbeats, replication traffic, and large snapshot transfers, while keeping the default (legacy 2-lane) behavior unchanged.
Changes:
- Introduces a 4-lane per-peer dispatcher (heartbeat / replication / snapshot / other) gated by `ELASTICKV_RAFT_DISPATCHER_LANES`.
- Adds `selectDispatchLane` to centralize routing of raft message types to the appropriate lane.
- Expands test coverage to pin lane selection behavior and validate that snapshot dispatch cannot block replication in 4-lane mode.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| internal/raftengine/etcd/engine.go | Adds the opt-in 4-lane dispatcher plumbing (env flag, lane selection, per-peer lane creation, and lane-safe close on peer removal). |
| internal/raftengine/etcd/engine_test.go | Adds tests for lane routing, snapshot vs replication independence, peer removal cleanup in 4-lane mode, and env-flag parsing. |
```go
cases := map[raftpb.MessageType]chan dispatchRequest{
	raftpb.MsgHeartbeat:     pd.heartbeat,
	raftpb.MsgHeartbeatResp: pd.heartbeat,
	raftpb.MsgReadIndex:     pd.heartbeat,
	raftpb.MsgVote:          pd.heartbeat,
	raftpb.MsgTimeoutNow:    pd.heartbeat,
	raftpb.MsgApp:           pd.normal,
	raftpb.MsgAppResp:       pd.normal,
	raftpb.MsgSnap:          pd.normal,
}
```
TestSelectDispatchLane_LegacyTwoLane claims to pin the legacy routing table, but it doesn’t cover several message types that are treated as priority by isPriorityMsg (e.g., MsgReadIndexResp, MsgVoteResp, MsgPreVote, MsgPreVoteResp). Adding these cases would better prevent accidental routing regressions in legacy mode.
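A self-contained sketch of the fuller routing table Copilot asks for. String stand-ins replace the `raftpb` types and the real `pd.heartbeat`/`pd.normal` channels; the actual test would live in engine_test.go against the engine's lanes.

```go
package main

import "fmt"

type messageType string

// Stand-in for isPriorityMsg: the set of control types that ride the
// heartbeat lane in legacy 2-lane mode.
var priority = map[messageType]bool{
	"MsgHeartbeat": true, "MsgHeartbeatResp": true,
	"MsgVote": true, "MsgVoteResp": true,
	"MsgPreVote": true, "MsgPreVoteResp": true,
	"MsgReadIndex": true, "MsgReadIndexResp": true,
	"MsgTimeoutNow": true,
}

// legacyLane mirrors the 2-lane routing under test.
func legacyLane(t messageType) string {
	if priority[t] {
		return "heartbeat"
	}
	return "normal"
}

func main() {
	// The table now pins every priority type, including the ones the review
	// flagged as missing (MsgVoteResp, MsgPreVote, MsgPreVoteResp,
	// MsgReadIndexResp).
	want := map[messageType]string{
		"MsgHeartbeat": "heartbeat", "MsgHeartbeatResp": "heartbeat",
		"MsgVote": "heartbeat", "MsgVoteResp": "heartbeat",
		"MsgPreVote": "heartbeat", "MsgPreVoteResp": "heartbeat",
		"MsgReadIndex": "heartbeat", "MsgReadIndexResp": "heartbeat",
		"MsgTimeoutNow": "heartbeat",
		"MsgApp": "normal", "MsgAppResp": "normal", "MsgSnap": "normal",
	}
	for t, lane := range want {
		if got := legacyLane(t); got != lane {
			fmt.Printf("routing regression: %s -> %s, want %s\n", t, got, lane)
			return
		}
	}
	fmt.Println("legacy routing pinned for all 12 types")
}
```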
/gemini review

Addressed Copilot feedback: added the missing priority-type cases to the legacy routing test.
Code Review
This pull request introduces an optional 4-lane dispatcher layout for Raft messages, designed to prevent bulky snapshot transfers from stalling log replication. It adds specific lanes for replication, snapshots, and other control messages, while maintaining the existing heartbeat lane for priority traffic. The feature is opt-in via the ELASTICKV_RAFT_DISPATCHER_LANES environment variable. The changes include updates to the Engine and peerQueues structures, message routing logic, and peer lifecycle management, supported by new unit tests verifying lane independence and environment variable parsing. I have no feedback to provide as there were no review comments.
🧹 Nitpick comments (1)

internal/raftengine/etcd/engine.go (1)

3045-3055: Optional: consider per-lane drop counters for the 4-lane rollout.

`dispatchDropCount` aggregates drops across all lanes, so dashboards can't distinguish a heartbeat-lane stall (cluster-liveness impacting) from a snapshot-lane stall (benign/expected under retries). Since the whole point of the 4-lane partitioning is isolation, per-lane counters (or at least a lane label on the existing counter, via `msg.Type` bucketed to {heartbeat, replication, snapshot, other}) would make the staging soak and single-replica production enable much easier to evaluate. Not a blocker for default-off merge, but worth adding before the fleetwide enable step in the rollout plan.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `internal/raftengine/etcd/engine.go` around lines 3045-3055, recordDroppedDispatch currently increments a single aggregate counter (dispatchDropCount) which hides which raft lane (heartbeat, replication, snapshot, other) dropped a message; update recordDroppedDispatch to map msg.Type into a lane bucket and increment a per-lane counter instead (e.g. add e.dispatchDropCounts map[string]*atomic.Int64 or a metrics counter with a "lane" label and increment the appropriate bucket), still call shouldLogDispatchEvent using the per-lane count or both aggregate+lane as desired, and include the lane identifier in the slog.Warn fields; update any initialization of dispatchDropCount to create the per-lane counters and adjust uses of dispatchDropCount accordingly.
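A sketch of what per-lane drop counters could look like, assuming one `atomic.Int64` per lane; the type and method names are illustrative, not the engine's actual fields, and the message-type bucketing follows the lanes named in the review.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// laneDropCounters keeps one drop counter per dispatch lane so dashboards
// can tell a heartbeat-lane stall from a benign snapshot-lane stall.
type laneDropCounters struct {
	heartbeat, replication, snapshot, other atomic.Int64
}

// bucket maps an outbound raft message type (string stand-in for
// raftpb.MessageType) to its lane's counter.
func (c *laneDropCounters) bucket(msgType string) *atomic.Int64 {
	switch msgType {
	case "MsgHeartbeat", "MsgHeartbeatResp", "MsgVote", "MsgVoteResp",
		"MsgPreVote", "MsgPreVoteResp", "MsgReadIndex", "MsgReadIndexResp",
		"MsgTimeoutNow":
		return &c.heartbeat
	case "MsgApp", "MsgAppResp":
		return &c.replication
	case "MsgSnap":
		return &c.snapshot
	default:
		return &c.other
	}
}

// recordDrop increments the lane's counter and returns the new per-lane
// count, which could feed a shouldLogDispatchEvent-style throttle.
func (c *laneDropCounters) recordDrop(msgType string) int64 {
	return c.bucket(msgType).Add(1)
}

func main() {
	var c laneDropCounters
	c.recordDrop("MsgSnap")
	c.recordDrop("MsgSnap")
	c.recordDrop("MsgApp")
	fmt.Printf("snapshot=%d replication=%d heartbeat=%d\n",
		c.snapshot.Load(), c.replication.Load(), c.heartbeat.Load())
}
```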
```diff
@@ -65,11 +67,26 @@ const (
 	// upside is that a ~5 s transient pause (election-timeout scale)
 	// no longer drops heartbeats and force the peers' lease to expire.
```

Minor grammar in the constant comment: "no longer drops heartbeats and force the peers' lease to expire" should be "...and forces...".

```diff
-	// no longer drops heartbeats and force the peers' lease to expire.
+	// no longer drops heartbeats and forces the peers' lease to expire.
```
/gemini review

Addressed both review items in 676f1d1.
⚠️ Caution: Some comments are outside the diff and can't be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)

internal/raftengine/etcd/engine.go (1)

1475-1490: ⚠️ Potential issue | 🟠 Major — Report dropped snapshots back to raft.

In 4-lane mode, `MsgSnap` can be dropped before it reaches transport when the snapshot lane is full. Only `recordDroppedDispatch` is called on pre-transport drops, so raft never receives `ReportSnapshot(..., SnapshotFailure)` and the peer remains stuck in snapshot progress. Add snapshot failure reporting on pre-transport drops.

Proposed fix

```diff
 	ch := e.selectDispatchLane(pd, msg.Type)
 	// Avoid the expensive deep-clone in prepareDispatchRequest when the channel
 	// is already full. The len/cap check is safe here because this function is
 	// only ever called from the single engine event-loop goroutine.
 	if len(ch) >= cap(ch) {
 		e.recordDroppedDispatch(msg)
+		e.reportDroppedSnapshot(msg)
 		return nil
 	}
 	dispatchReq := prepareDispatchRequest(msg)
 	select {
 	case ch <- dispatchReq:
 		return nil
 	default:
 		_ = dispatchReq.Close()
 		e.recordDroppedDispatch(msg)
+		e.reportDroppedSnapshot(msg)
 		return nil
 	}
 }
+
+func (e *Engine) reportDroppedSnapshot(msg raftpb.Message) {
+	if msg.Type != raftpb.MsgSnap || e.rawNode == nil {
+		return
+	}
+	e.rawNode.ReportSnapshot(msg.To, etcdraft.SnapshotFailure)
+}
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `internal/raftengine/etcd/engine.go` around lines 1475-1490: when a dispatch is dropped before transport (both the len(ch) >= cap(ch) fast-path and the default case after attempting to send), detect if msg.Type == MsgSnap and invoke ReportSnapshot to notify raft of a snapshot failure; specifically, in the pre-transport drop branches around selectDispatchLane/prepareDispatchRequest/recordDroppedDispatch add a call like ReportSnapshot(msg.To, msg.RegionID, SnapshotFailure) (or the local wrapper that sends ReportSnapshot) before returning, so raft receives SnapshotFailure for dropped MsgSnap messages.
🧹 Nitpick comments (1)

internal/raftengine/etcd/engine.go (1)

1523-1530: Avoid the new `//nolint:exhaustive`. This routing is intentionally partial, so simple conditionals avoid the linter suppression without changing behavior.

♻️ Proposed refactor

```diff
-	switch msgType { //nolint:exhaustive // only MsgApp/MsgAppResp/MsgSnap need dedicated lanes; the rest falls through to pd.other
-	case raftpb.MsgApp, raftpb.MsgAppResp:
+	if msgType == raftpb.MsgApp || msgType == raftpb.MsgAppResp {
 		return pd.replication
-	case raftpb.MsgSnap:
+	}
+	if msgType == raftpb.MsgSnap {
 		return pd.snapshot
-	default:
-		return pd.other
 	}
+	return pd.other
```

As per coding guidelines, Go code should "avoid adding `//nolint` unless absolutely required; prefer refactoring."

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `internal/raftengine/etcd/engine.go` around lines 1523-1530: the switch uses a `//nolint:exhaustive` suppression; remove that by replacing the switch with explicit conditionals that express the same partial routing: check msgType against raftpb.MsgApp || raftpb.MsgAppResp and return pd.replication, else if msgType == raftpb.MsgSnap return pd.snapshot, otherwise return pd.other, so behavior is identical but without the nolint.
Code Review
This pull request introduces an opt-in 4-lane dispatcher for Raft messages, allowing heartbeats, replication, snapshots, and other control traffic to operate on independent goroutines. This prevents large snapshot transfers from stalling critical replication or heartbeat messages. The feedback suggests improving the robustness of the message routing logic by explicitly handling all message types in the dispatcher's switch statement to avoid potential misclassification of future additions.
```go
	switch msgType { //nolint:exhaustive // only MsgApp/MsgAppResp/MsgSnap need dedicated lanes; the rest falls through to pd.other
	case raftpb.MsgApp, raftpb.MsgAppResp:
		return pd.replication
	case raftpb.MsgSnap:
		return pd.snapshot
	default:
		return pd.other
	}
```
The switch statement in selectDispatchLane uses a //nolint:exhaustive comment. To avoid silently dropping or misclassifying entries, which could lead to state inconsistencies, it is better to explicitly handle all raftpb.MessageType entries. This ensures that any future additions are correctly categorized and not just lumped into a default lane, maintaining the integrity of the message dispatching logic.
References
- Avoid silently dropping entries during serialization or normalization to ensure they are preserved on disk and avoid state inconsistencies.
Address gemini medium review on PR #577: replace the //nolint:exhaustive on selectDispatchLane with an exhaustive switch that lists every raftpb.MessageType. Future additions to the upstream enum now fail the exhaustive linter instead of being silently routed to pd.other. Behaviour is preserved: MsgApp/MsgAppResp stay on the replication lane, MsgSnap stays on the snapshot lane, and all local-only / feedback messages continue to route to pd.other. MsgSnapStatus is grouped with MsgSnap for readability (it is a local feedback message and is never dispatched to peers in practice).
/gemini review

Addressed the medium review.

Commit: dbe21a6
Code Review
This pull request introduces an optional 4-lane dispatcher layout for Raft messages to prevent large snapshot transfers from stalling log replication. By enabling the ELASTICKV_RAFT_DISPATCHER_LANES environment variable, traffic is partitioned into heartbeat, replication, snapshot, and other lanes, each handled by a dedicated goroutine. The changes include updates to the Engine and peerQueues structures, new lane selection logic, and comprehensive unit tests. A review comment identifies unreachable message types in the selectDispatchLane switch statement that should be removed to improve maintainability.
```go
	switch msgType {
	case raftpb.MsgApp, raftpb.MsgAppResp:
		return pd.replication
	case raftpb.MsgSnap, raftpb.MsgSnapStatus:
		return pd.snapshot
	case raftpb.MsgHeartbeat, raftpb.MsgHeartbeatResp,
		raftpb.MsgVote, raftpb.MsgVoteResp,
		raftpb.MsgPreVote, raftpb.MsgPreVoteResp,
		raftpb.MsgReadIndex, raftpb.MsgReadIndexResp,
		raftpb.MsgTimeoutNow:
		// Unreachable: isPriorityMsg already short-circuited these above.
		// Listed for exhaustiveness; keep them on the heartbeat lane so any
		// future refactor that removes the short-circuit stays correct.
		return pd.heartbeat
	case raftpb.MsgHup, raftpb.MsgBeat, raftpb.MsgProp,
		raftpb.MsgUnreachable, raftpb.MsgCheckQuorum, raftpb.MsgTransferLeader,
		raftpb.MsgStorageAppend, raftpb.MsgStorageAppendResp,
		raftpb.MsgStorageApply, raftpb.MsgStorageApplyResp,
		raftpb.MsgForgetLeader:
		return pd.other
	}
```
The switch statement in selectDispatchLane includes several message types that are local to the raft node (e.g., MsgHup, MsgBeat, MsgProp, MsgCheckQuorum, MsgSnapStatus, MsgUnreachable). These cases are unreachable because these message types are filtered out by skipDispatchMessage earlier in the call chain. To improve code clarity and maintainability, these unreachable code paths should be removed.
References
- Remove dead or unreachable code paths, such as asynchronous handlers for tasks that are always performed synchronously, to improve code clarity and maintainability.
Remove cases for MsgHup, MsgBeat, MsgProp, MsgUnreachable, MsgSnapStatus,
MsgCheckQuorum and MsgStorage{Append,Apply}{,Resp} from selectDispatchLane.
All of these are unreachable in this code path:
- MsgHup/MsgBeat/MsgUnreachable/MsgSnapStatus/MsgCheckQuorum and the
MsgStorage* family are local messages per etcd/raft's IsLocalMsg table,
and skipDispatchMessage already drops them before enqueueDispatchMessage
calls selectDispatchLane.
- MsgProp is never emitted outbound because DisableProposalForwarding is
set and handleProposal rejects non-leader proposals, so no follower
ever forwards a proposal to the leader.
- The priority control types (MsgHeartbeat/Resp, votes, read-index,
MsgTimeoutNow) are short-circuited by the isPriorityMsg branch above
the switch.
The switch now lists only the types that can actually reach it
(MsgApp/MsgAppResp, MsgSnap, MsgTransferLeader, MsgForgetLeader) plus a
fallback default for any future raftpb additions. The exhaustive linter
is silenced with a targeted //nolint:exhaustive that references
skipDispatchMessage and isPriorityMsg so future reviewers understand why
the switch isn't exhaustive.
Verified the claim. Confirmed unreachable (commit 3076cfb).

Action taken: removed the dead cases; the switch now lists only types that can actually reach it. Tests pass in both modes.

/gemini review
Code Review
This pull request introduces an optional 4-lane dispatcher layout for Raft messages, partitioned into heartbeat, replication, snapshot, and other lanes to prevent large snapshot transfers from blocking replication. The feature is opt-in via the ELASTICKV_RAFT_DISPATCHER_LANES environment variable. The implementation includes updated message routing logic, per-lane worker goroutines, and unit tests verifying lane independence. Review feedback suggests enhancing the robustness of the message routing switch statement by explicitly handling additional message types like MsgProp to guard against future library changes.
```go
	switch msgType { //nolint:exhaustive // filtered types handled by skipDispatchMessage + isPriorityMsg; see comment above.
	case raftpb.MsgApp, raftpb.MsgAppResp:
		return pd.replication
	case raftpb.MsgSnap:
		return pd.snapshot
	case raftpb.MsgTransferLeader, raftpb.MsgForgetLeader:
		return pd.other
	}
```
The selectDispatchLane function uses a switch statement with nolint:exhaustive to route non-priority messages. While the comment explains that many types are filtered out earlier, explicitly handling raftpb.MsgProp (even if unreachable due to current configuration) or providing a more descriptive default case would improve robustness against future changes in the etcd/raft library or engine configuration.
Adds an explicit MsgProp case in selectDispatchLane that panics with a message referencing DisableProposalForwarding. Today the case is unreachable because DisableProposalForwarding=true prevents outbound MsgProp, but the prior default fallback silently routed any future MsgProp to pd.other. Turning that silent misrouting into a loud failure flags the invariant if proposal forwarding is ever re-enabled upstream or in our raft.Config without also updating this switch.
/gemini review

Addressed the latest medium: added an explicit MsgProp case.

Commit: 7d91662
Code Review
This pull request introduces an optional 4-lane dispatcher layout for Raft messages to prevent bulky snapshot transfers from stalling log replication. By opting in via the ELASTICKV_RAFT_DISPATCHER_LANES environment variable, the system partitions traffic into heartbeat, replication, snapshot, and other lanes, each with its own goroutine. The implementation includes updated peer queue structures, dynamic lane selection logic, and comprehensive tests verifying lane independence. Feedback was provided regarding the use of a panic for unexpected message types, suggesting a safer fallback to error logging and a default lane to maintain system availability.
```go
	case raftpb.MsgProp:
		// DisableProposalForwarding=true (see raft.Config construction in this
		// file) guarantees no outbound MsgProp is ever emitted, so this case
		// is unreachable today. If it does fire, proposal forwarding was
		// re-enabled and we must decide the correct lane explicitly rather
		// than silently lumping proposals into pd.other.
		panic("selectDispatchLane: outbound MsgProp requires explicit lane once DisableProposalForwarding is disabled")
```
Panicking in a library or engine component due to an unexpected message type is generally discouraged, as it can crash the entire process. While DisableProposalForwarding is currently set to true, a future configuration change or a bug in the upstream etcd/raft library could trigger this path. It is safer to log a critical error and fall back to a default lane (like pd.other) to maintain system availability.
```diff
 	case raftpb.MsgProp:
-		// DisableProposalForwarding=true (see raft.Config construction in this
-		// file) guarantees no outbound MsgProp is ever emitted, so this case
-		// is unreachable today. If it does fire, proposal forwarding was
-		// re-enabled and we must decide the correct lane explicitly rather
-		// than silently lumping proposals into pd.other.
-		panic("selectDispatchLane: outbound MsgProp requires explicit lane once DisableProposalForwarding is disabled")
+		slog.Error("selectDispatchLane: unexpected outbound MsgProp encountered", "node_id", e.nodeID, "to", pd.heartbeat)
+		return pd.other
```
Panicking inside a raft engine dispatcher goroutine would crash the entire node. DisableProposalForwarding=true still guarantees outbound MsgProp should not reach selectDispatchLane, but if it does (upstream etcd/raft or our config changed), log loudly and route to pd.other rather than taking the process down. Per gemini review on PR #577.
/gemini review

Addressed medium: replaced the panic with slog.Error plus a fallback to pd.other.

Commit: 767f40f
Code Review
This pull request introduces an optional 4-lane dispatcher for Raft messages to isolate heartbeats, replication, and snapshots, preventing bulky transfers from blocking critical traffic. Controlled via the ELASTICKV_RAFT_DISPATCHER_LANES environment variable, the update includes a new message routing function and comprehensive tests. Review feedback suggests removing the handling logic for MsgProp messages to simplify the dispatcher's logic, as they are currently unreachable.
```go
		slog.Error("selectDispatchLane: unexpected outbound MsgProp encountered",
			slog.Uint64("raft_node_id", e.nodeID),
			slog.String("type", msgType.String()),
		)
```
The code indicates that MsgProp is expected to be unreachable due to DisableProposalForwarding=true. To improve code clarity and maintainability, dead or unreachable code paths should be removed rather than just logged. Consider removing this unreachable logic entirely from the engine's hot path.
References
- Remove dead or unreachable code paths, such as asynchronous handlers for tasks that are always performed synchronously, to improve code clarity and maintainability.
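With the unreachable MsgProp branch removed as suggested, the routing collapses to the shape below. This is a standalone sketch using string stand-ins for the `raftpb` message types and lane channels, not the engine's actual code.

```go
package main

import "fmt"

// Stand-in for isPriorityMsg: control traffic that always rides the
// heartbeat lane.
func isPriorityMsg(t string) bool {
	switch t {
	case "MsgHeartbeat", "MsgHeartbeatResp", "MsgVote", "MsgVoteResp",
		"MsgPreVote", "MsgPreVoteResp", "MsgReadIndex", "MsgReadIndexResp",
		"MsgTimeoutNow":
		return true
	}
	return false
}

// selectDispatchLane sketches the final routing: priority short-circuit,
// legacy fallback, then the dedicated 4-lane split with no dead branches.
func selectDispatchLane(fourLane bool, t string) string {
	if isPriorityMsg(t) {
		return "heartbeat"
	}
	if !fourLane {
		return "normal"
	}
	switch t {
	case "MsgApp", "MsgAppResp":
		return "replication"
	case "MsgSnap":
		return "snapshot"
	}
	return "other"
}

func main() {
	fmt.Println(selectDispatchLane(true, "MsgSnap"))
	fmt.Println(selectDispatchLane(true, "MsgHeartbeat"))
	fmt.Println(selectDispatchLane(false, "MsgApp"))
}
```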
/gemini review

Addressed gemini's medium suggestion in cdad492: removed the unreachable MsgProp handling from the hot path.
Code Review
This pull request introduces an optional 4-lane dispatcher for Raft messages, designed to prevent bulky snapshot transfers from stalling log replication. By partitioning traffic into heartbeat, replication, snapshot, and other lanes, the engine can maintain better throughput and stability under load. The feature is opt-in via the ELASTICKV_RAFT_DISPATCHER_LANES environment variable. Feedback was provided regarding a potential compilation error due to the use of a message type that may not exist in the standard Raft package.
I am having trouble creating individual review comments; my feedback is inline below.
internal/raftengine/etcd/engine.go (1532-1539)
The switch statement for message routing in 4-lane mode uses raftpb.MsgForgetLeader. However, this message type is not part of the standard go.etcd.io/raft/v3/raftpb package. If this is a custom extension or from a specific fork, please ensure it is correctly defined in the imported package to avoid compilation errors. If it was intended to be a different standard type, please correct it.
It was added to upstream etcd/raft; `MsgForgetLeader` is defined in `go.etcd.io/raft/v3/raftpb`, and the branch builds cleanly.
## Summary

Adds an `EXTRA_ENV` pass-through to `scripts/rolling-update.sh` so operators can set container environment variables from `deploy.env` without editing this script each time.

## Motivation

Immediate trigger: enabling `ELASTICKV_RAFT_DISPATCHER_LANES=1` (feature flag added in PR #577). We want to flip it on via the deploy envelope, not by rebuilding the image or hand-editing this script. Post-#575 raft metrics are already healthy, so the 4-lane dispatcher is being enabled as a defensive measure: it keeps heartbeats from being starved by MsgApp / MsgSnap bursts under extreme write load, rather than as a fix for anything actively broken.

## Change

- `run_container`: if `EXTRA_ENV` is set, split on whitespace and forward each pair as a single `docker run -e KEY=VALUE` flag.
- A comment documents the whitespace-split semantics (pairs must not contain whitespace; values may contain characters bash would otherwise interpret).

## Usage

```bash
# deploy.env
EXTRA_ENV="ELASTICKV_RAFT_DISPATCHER_LANES=1 ELASTICKV_PEBBLE_CACHE_MB=512"
```

## Test plan

- [x] `bash -n scripts/rolling-update.sh` passes
- [x] `shellcheck` / `make lint`: 0 issues
- [ ] Rolling deploy with `EXTRA_ENV` set; verify via `docker inspect elastickv --format '{{.Config.Env}}'` on each node

## Summary by CodeRabbit

- **New Features**
  - Added the ability to pass arbitrary additional environment variables to containers via a new input; these are injected into the container runtime.
- **Improvements**
  - Extra env entries are validated and safely normalized/escaped for remote transport.
  - Core runtime variables are now sent in escaped/quoted form to the remote execution context for safer handling.
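A minimal sketch of the whitespace-split forwarding described above, assuming a helper called from `run_container` (the function name `build_env_flags` and the fail-fast validation are illustrative; the actual script may differ):

```shell
#!/usr/bin/env bash
set -euo pipefail

# Build docker run -e flags from EXTRA_ENV, relying on bash word
# splitting: each whitespace-separated token must be KEY=VALUE.
build_env_flags() {
  local flags=()
  # Intentionally unquoted so the string splits on whitespace.
  for pair in ${EXTRA_ENV:-}; do
    # Reject tokens that are not KEY=VALUE to fail fast on typos.
    [[ "$pair" == *=* ]] || { echo "bad EXTRA_ENV entry: $pair" >&2; return 1; }
    flags+=(-e "$pair")
  done
  printf '%s\n' "${flags[@]:-}"
}

EXTRA_ENV="ELASTICKV_RAFT_DISPATCHER_LANES=1 ELASTICKV_PEBBLE_CACHE_MB=512"
# Emits one "-e" line followed by each KEY=VALUE pair.
build_env_flags
```

Because the pairs are appended to an array and expanded quoted, values with shell metacharacters survive intact as long as they contain no whitespace, matching the documented semantics.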
Summary
- Adds an opt-in 4-lane dispatcher behind `ELASTICKV_RAFT_DISPATCHER_LANES=1`. Default behavior is unchanged.
- Runs `MsgSnap` on its own goroutine so a multi-MiB snapshot transfer can no longer stall subsequent `MsgApp`s queued to the same peer; addresses the residual failure mode from the 324k-heartbeat-drop incident that PR #575 ("perf(kv): serve ShardStore.GetAt reads via lease, not read-index") partially mitigated.
- Preserves `dispatchDropCount` accounting and `postDispatchReport` firing after every dispatch attempt (so `ReportUnreachable` / `ReportSnapshot` still land in etcd/raft).

What changed
`peerQueues` already had two lanes (heartbeat + normal) from PR #522. With the flag on, it now carries four:

- **heartbeat**: `MsgHeartbeat` / `MsgHeartbeatResp`, Vote / PreVote (+Resp), ReadIndex (+Resp), `MsgTimeoutNow`
- **replication**: `MsgApp` / `MsgAppResp`, buffered to `MaxInflightMsg` (1024)
- **snapshot**: `MsgSnap`
- **other**: everything else

Each lane gets its own goroutine, drained independently. Within-type ordering to a given peer is preserved because all of a peer's messages of one type share exactly one channel and one worker. See `selectDispatchLane`.

When the flag is off we construct the old `heartbeat + normal` pair exactly as today: no behavior change, no goroutine count change, no allocation change.

Why behind a flag
The raft dispatch path is high blast radius: a bug here can drop heartbeats cluster-wide and trigger mass elections. Rather than swap the hot path in one shot, this lands the structural change plumbed but dormant so the default production footprint is unchanged.
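The routing implied by the four-lane table under "What changed" can be sketched as follows. This is a standalone approximation: the message-type constants are local stand-ins for `raftpb.MessageType`, and the real `selectDispatchLane` signature and lane set may differ.

```go
package main

import "fmt"

// MessageType stands in for raftpb.MessageType; only the types the
// dispatcher cares about are listed, with illustrative values.
type MessageType int

const (
	MsgHeartbeat MessageType = iota
	MsgHeartbeatResp
	MsgVote
	MsgVoteResp
	MsgPreVote
	MsgPreVoteResp
	MsgReadIndex
	MsgReadIndexResp
	MsgTimeoutNow
	MsgApp
	MsgAppResp
	MsgSnap
	MsgProp
)

type lane int

const (
	laneHeartbeat lane = iota
	laneReplication
	laneSnapshot
	laneOther
)

// selectDispatchLane routes one outbound message type to a lane,
// mirroring the 4-lane table in the PR description.
func selectDispatchLane(t MessageType) lane {
	switch t {
	case MsgHeartbeat, MsgHeartbeatResp,
		MsgVote, MsgVoteResp, MsgPreVote, MsgPreVoteResp,
		MsgReadIndex, MsgReadIndexResp, MsgTimeoutNow:
		return laneHeartbeat
	case MsgApp, MsgAppResp:
		return laneReplication
	case MsgSnap:
		return laneSnapshot
	default:
		return laneOther // anything unlisted
	}
}

func main() {
	fmt.Println(selectDispatchLane(MsgApp) == laneReplication)     // true
	fmt.Println(selectDispatchLane(MsgSnap) == laneSnapshot)       // true
	fmt.Println(selectDispatchLane(MsgHeartbeat) == laneHeartbeat) // true
}
```

A pure type-to-lane function like this is what makes the routing table pin-able by tests such as `TestSelectDispatchLane_FourLane`: the flag only has to choose between two such tables, keeping the hot path branch-free beyond one switch.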
Rollout plan
1. Ship default-off (this PR).
2. Enable `ELASTICKV_RAFT_DISPATCHER_LANES=1` in staging; watch `elastickv_raft_dispatch_drop_total`, `elastickv_raft_dispatch_errors_total`, and heartbeat loss metrics for 24h under synthetic write load + forced snapshot transfer.
3. Flip production, then delete the flag and the legacy two-lane path from `startPeerDispatcher` / `selectDispatchLane` in a follow-up PR.

Test plan
- `go test -race -count=1 -timeout 300s ./internal/raftengine/etcd/...` passes (flag off)
- `ELASTICKV_RAFT_DISPATCHER_LANES=1 go test -race -count=1 -timeout 300s ./internal/raftengine/etcd/...` passes (flag on)
- `TestSelectDispatchLane_LegacyTwoLane` pins today's routing table
- `TestSelectDispatchLane_FourLane` pins the new routing table
- `TestFourLaneDispatcher_SnapshotDoesNotBlockReplication` exercises the HOL-blocking invariant the flag is meant to fix
- `TestFourLaneDispatcher_RemovePeerClosesAllLanes` ensures no goroutine leak on peer removal in 4-lane mode
- `TestDispatcherLanesEnabledFromEnv` covers env parsing

Summary by CodeRabbit
New Features
Tests