perf(raft): raise MaxInflightMsgs=1024, MaxSizePerMsg=4MB defaults#593
Conversation
Expose etcd/raft's MaxInflightMsgs and MaxSizePerMsg as runtime-tunable knobs via ELASTICKV_RAFT_MAX_INFLIGHT_MSGS and ELASTICKV_RAFT_MAX_SIZE_PER_MSG, following the same pattern as ELASTICKV_RAFT_SNAPSHOT_COUNT (#589) and ELASTICKV_RAFT_MAX_WAL_FILES (#589). Production defaults bump to 1024 and 4 MiB.

Why:

- MaxSizePerMsg 1 MiB -> 4 MiB: under small-entry KV workloads (Redis-style, median entry ~500 B) this drastically lowers the MsgApp count per committed byte. Fewer dispatcher wake-ups on the leader, fewer recv syscalls, and less apply-vs-read contention on the follower. This is the real win of the change.
- MaxInflightMsgs 256 -> 1024: insurance for burst write phases where entries arrive faster than round-trip completion on a wide-bandwidth LAN. The in-process defaultMaxInflightMsg was already 1024 (see stepCh sizing in PR #560), but main.go was still passing 256 at the OpenConfig boundary; align the caller with the engine's internal expectation.

Semantics:

- The env var wins over caller-supplied cfg so operators can retune a running deployment without a rebuild. Invalid values (non-numeric, zero/negative, sub-1KiB size) log a warning and fall back to the compiled-in default rather than leak through to raft.Config.
- Chosen values are logged at Open at info level so a misconfigured override is visible in the boot log.
- MaxSizePerMsg is parsed as plain integer bytes for consistency with the other numeric knobs in this package.

Tests cover: defaults when unset, valid override propagation into raft.Config, invalid values falling back to defaults.
📝 Walkthrough: The pull request re-tunes etcd/raft inbound buffering and message size limits, introducing environment variable overrides for both limits.
Code Review
This pull request increases the default Raft message size limits from 1 MiB to 4 MiB and the maximum inflight messages from 256 to 1024 to optimize performance for small-entry KV workloads. It also introduces environment variable overrides for these parameters to allow operators to tune replication flow control at runtime without a rebuild. Review feedback identifies a logic discrepancy in the environment variable parsing functions: when an invalid value is provided, the code returns a failure signal that causes the system to retain caller-supplied configuration rather than falling back to the compiled-in defaults as indicated by the warning logs. Suggestions were provided to ensure the fallback behavior matches the logged intent.
```go
n, err := strconv.Atoi(v)
if err != nil || n < 1 {
	slog.Warn("invalid ELASTICKV_RAFT_MAX_INFLIGHT_MSGS; using default",
		"value", v, "default", defaultMaxInflightMsg)
	return 0, false
}
```
The warning message indicates that the engine will fall back to the default value (defaultMaxInflightMsg), but the function returns (0, false). This causes normalizeLimitConfig to retain the caller-supplied value from OpenConfig instead of the compiled-in default. If a caller provides a non-default value (e.g., 256) and the environment variable is malformed, the log message will be misleading as it won't actually use the default 1024. To align the implementation with the log message and the PR description, the function should return the default value and true when an invalid override is detected.
```diff
 n, err := strconv.Atoi(v)
 if err != nil || n < 1 {
 	slog.Warn("invalid ELASTICKV_RAFT_MAX_INFLIGHT_MSGS; using default",
 		"value", v, "default", defaultMaxInflightMsg)
-	return 0, false
+	return defaultMaxInflightMsg, true
 }
```
References
- Avoid state inconsistencies during normalization by ensuring that invalid inputs are handled explicitly and do not lead to misleading states or logs.
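The reason `(0, false)` misbehaves is the precedence that normalization applies: env override first, then caller value, then compiled-in default. A minimal sketch of that precedence (the function body is an assumption; only the behavior under review is modeled):

```go
package main

import "fmt"

const defaultMaxInflightMsg = 1024

// normalize models the precedence normalizeLimitConfig applies: an env
// override wins, otherwise the caller-supplied value, otherwise the
// compiled-in default.
func normalize(caller int, envVal int, envOK bool) int {
	if envOK {
		return envVal
	}
	if caller > 0 {
		return caller
	}
	return defaultMaxInflightMsg
}

func main() {
	// Buggy convention: a malformed env var reported as (0, false) keeps the
	// caller-supplied 256, contradicting the "using default" warning.
	fmt.Println(normalize(256, 0, false))
	// Fixed convention: reporting (default, true) makes the warned-about
	// default the value that actually reaches raft.Config.
	fmt.Println(normalize(256, defaultMaxInflightMsg, true))
}
```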
```go
n, err := strconv.ParseUint(v, 10, 64)
if err != nil || n < minMaxSizePerMsg {
	slog.Warn("invalid ELASTICKV_RAFT_MAX_SIZE_PER_MSG; using default",
		"value", v, "min", minMaxSizePerMsg, "default", uint64(defaultMaxSizePerMsg))
	return 0, false
}
```
Similar to the MaxInflightMsgs helper, this function returns (0, false) on invalid input, which prevents normalizeLimitConfig from applying the compiled-in default if the caller provided a different value. This makes the using default warning message misleading. The function should return the default value and true to ensure the fallback behavior matches the logs.
```diff
 n, err := strconv.ParseUint(v, 10, 64)
 if err != nil || n < minMaxSizePerMsg {
 	slog.Warn("invalid ELASTICKV_RAFT_MAX_SIZE_PER_MSG; using default",
 		"value", v, "min", minMaxSizePerMsg, "default", uint64(defaultMaxSizePerMsg))
-	return 0, false
+	return uint64(defaultMaxSizePerMsg), true
 }
```
References
- Avoid state inconsistencies during normalization by ensuring that invalid inputs are handled explicitly and do not lead to misleading states or logs.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 98554e6efd
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
```go
if v, ok := maxInflightMsgFromEnv(); ok {
	cfg.MaxInflightMsg = v
```
Size inbound queues from overridden MaxInflightMsg
When ELASTICKV_RAFT_MAX_INFLIGHT_MSGS is set, this code allows values above 1024, but Open still allocates stepCh and dispatchReportCh with a fixed capacity of defaultMaxInflightMsg (1024). In clusters that tune inflight to values like 2048 (as the new tests demonstrate), inbound traffic can hit the smaller fixed queue and return errStepQueueFull, effectively dropping/deferring raft messages and negating the intended throughput gain under bursty replication load.
The maxInflightMsgFromEnv / maxSizePerMsgFromEnv helpers logged "using default" when the env var was malformed, but returned (0, false) — telling normalizeLimitConfig to keep whatever the caller supplied. A caller passing MaxInflightMsg=256 plus a garbage env var would silently retain 256, making the warning log a lie. Return (defaultMaxInflightMsg, true) and (defaultMaxSizePerMsg, true) on invalid input so the default the operator-visible warning promises is actually the value that reaches raft.Config. Addresses gemini review feedback on #593.
…ault Open() allocated stepCh and dispatchReportCh at fixed capacity defaultMaxInflightMsg (1024). When operators raised ELASTICKV_RAFT_MAX_INFLIGHT_MSGS (e.g. to 2048) the raft layer honoured the higher inflight limit, but the inbound channels stayed at 1024, so bursty replication still tripped errStepQueueFull at 1024 queued messages — silently negating the tuning knob the PR was meant to expose. Size the two channels from prepared.cfg.MaxInflightMsg (already post-normalizeLimitConfig, so env override and compiled default have been applied). A new inboundChannelCap() helper clamps to minInboundChannelCap=256 to preserve a survivable floor if a caller passes a tiny value. Add TestOpen_InboundChannelsHonourMaxInflightEnv (env=2048 -> cap=2048) and TestOpen_InboundChannelsDefaultCap (unset -> cap=1024) plus a unit test for the helper. Addresses codex P1 review feedback on #593.
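The `inboundChannelCap()` helper described in the commit message can be sketched as follows (constants per the commit message; the exact implementation is assumed):

```go
package main

import "fmt"

// Illustrative constants matching the commit message; the real values live in
// internal/raftengine/etcd.
const (
	defaultMaxInflightMsg = 1024
	minInboundChannelCap  = 256
)

// inboundChannelCap sizes stepCh / dispatchReportCh from the resolved
// MaxInflightMsg, clamping to a survivable floor so a tiny caller value
// cannot starve the inbound queues.
func inboundChannelCap(maxInflight int) int {
	if maxInflight < minInboundChannelCap {
		return minInboundChannelCap
	}
	return maxInflight
}

func main() {
	// With the fix, an env override of 2048 reaches the channel capacity
	// instead of being silently capped at the compiled-in default.
	stepCh := make(chan struct{}, inboundChannelCap(2048))
	fmt.Println(cap(stepCh))
}
```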
Addressed the two review findings from gemini and codex on this PR.

Commits

Channel capacity before/after

Tests added / updated

/gemini review
Code Review
This pull request increases the default Raft message size from 1 MiB to 4 MiB and the inflight message limit from 256 to 1024 to optimize performance for small-entry workloads. It introduces environment variable overrides (ELASTICKV_RAFT_MAX_INFLIGHT_MSGS and ELASTICKV_RAFT_MAX_SIZE_PER_MSG) and ensures that internal channel capacities are dynamically sized to match these limits. Review feedback highlighted the increased worst-case memory footprint, which could lead to OOM issues in some environments, and noted a documentation inconsistency where the code falls back to defaults instead of clamping to a floor value.
```go
// Note that with the current defaultMaxSizePerMsg of 4 MiB, the
// true worst-case bound can be much larger (up to roughly 4 GiB
```
The increase in defaultMaxInflightMsg (1024) and defaultMaxSizePerMsg (4 MiB) significantly raises the worst-case memory footprint. As noted in the comments, this can reach ~4 GiB per peer. In a cluster with multiple peers, the aggregate memory buffered in stepCh and the per-peer outbound replication lanes could lead to OOM under congestion if entries are large. While this improves performance for small-entry workloads, consider if these defaults are too aggressive for general deployments, or if a more conservative MaxSizePerMsg (e.g., 1-2 MiB) would be safer while still providing significant batching benefits.
References
- To prevent unbounded memory growth and potential OOM issues, apply a fixed bound to collections that can grow from external requests, such as pending configuration changes. Reject new requests when the bound is reached.
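The worst-case bound the reviewer is reasoning about is a straight product of the two knobs. A small sketch of the arithmetic (the helper name is illustrative; real traffic sits far below this bound because TCP backpressure caps actually buffered bytes):

```go
package main

import "fmt"

// worstCaseBufferedBytes computes the O(numPeers * MaxInflightMsg *
// MaxSizePerMsg) upper bound on buffered replication traffic discussed in
// the review.
func worstCaseBufferedBytes(peers, maxInflight int, maxSizePerMsg uint64) uint64 {
	return uint64(peers) * uint64(maxInflight) * maxSizePerMsg
}

func main() {
	// Proposed defaults: 1024 inflight x 4 MiB = 4 GiB per peer.
	fmt.Println(worstCaseBufferedBytes(1, 1024, 4<<20) >> 30, "GiB per peer")
	// Halving inflight to 512 (option B) halves the product to 2 GiB.
	fmt.Println(worstCaseBufferedBytes(1, 512, 4<<20) >> 30, "GiB per peer")
}
```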
```go
// of the knob; clamp to this floor rather than rejecting so that a
// fat-fingered operator doesn't take out the engine.
```
The comment mentions "clamping" to the floor, but the implementation in maxSizePerMsgFromEnv (line 2961) falls back to the defaultMaxSizePerMsg (4 MiB) if the value is below the floor (1 KiB). Clamping usually implies using the floor value itself. Given the large gap between the floor and the default, this behavior might be unexpected for an operator trying to tune for low memory.
```diff
-// of the knob; clamp to this floor rather than rejecting so that a
-// fat-fingered operator doesn't take out the engine.
+// of the knob; fall back to the default rather than rejecting so that a
+// fat-fingered operator doesn't take out the engine.
```
Codex Review: Didn't find any major issues. Hooray!
The comment on minMaxSizePerMsg said the override clamps to the floor, but maxSizePerMsgFromEnv actually falls back to defaultMaxSizePerMsg when the env value is below the floor (or unparseable). Clamp usually implies using the floor itself, so the doc misled readers about the behaviour. Rewrite the sentence to match the implementation: fall back to the default rather than rejecting, so a fat-fingered operator doesn't take out the engine. No code change.
…se memory at 2 GiB/peer

Gemini reviewer flagged that 1024 inflight x 4 MiB per message is a 4 GiB per-peer worst-case buffered footprint. With N peers and bursty replication traffic, the aggregate can plausibly reach OOM territory on general-purpose deployments where the operator has not tuned the knob to the workload. In practice TCP backpressure on a typical LAN will cap the actually buffered bytes well below the product, but the reviewer's framing is reasonable: a compiled-in default should be conservative.

Take option B from the gemini response matrix: keep MaxSizePerMsg at 4 MiB (the MsgApp-batching win that motivates this PR's narrative on small-entry KV workloads) and drop MaxInflightMsgs back to 512. This halves the worst-case product to 2 GiB per peer while preserving the batching improvement. 512 is still a 2x safety margin over the pre-#529 default of 256 that we observed filling up on followers under the pebble-seek-storm incident (PRs #560/#562/#563/#565 have since removed most of that CPU contention, so the deeper pipelining of 1024 was motivated more by belt-and-suspenders than measured need).

Operators who need deeper pipelines (wide-bandwidth LAN, plenty of RAM) can still raise the knob via ELASTICKV_RAFT_MAX_INFLIGHT_MSGS without a rebuild; operators who need a tighter memory budget can lower MaxSizePerMsg via ELASTICKV_RAFT_MAX_SIZE_PER_MSG.

Update the in-code rationale comment, the caller-side constant in main.go (so the engine default is actually exercised when no env override is set), and the TestNormalizeLimitConfig_DefaultsWhenUnset pin from 1024 -> 512. Drop the stale "(1024)" parenthetical in the TestOpen_InboundChannelsHonourMaxInflightEnv docblock.
Addressed both gemini medium items.

Item 1 — memory-footprint concern (option B)

Why B over A: the existing code comment explicitly frames the 4 MiB bump as a batching win (amortise more entries per MsgApp, cut dispatcher wake-ups / recv syscalls on small-entry KV).

Item 2 — clamping doc fix

Verification
/gemini review

@codex review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: a248131afe
```diff
 // memory is bounded by O(numPeers * MaxInflightMsg * avgMsgSize).
-// Default: 256. Increase for deeper pipelining on high-bandwidth links;
+// Default: 1024. Increase for deeper pipelining on high-bandwidth links;
 // lower in memory-constrained clusters.
```
Update documented MaxInflightMsg default to match code
The OpenConfig.MaxInflightMsg docstring says the default is 1024, but this commit sets defaultMaxInflightMsg to 512 and tests pin that value. This mismatch can mislead operators and library users who rely on GoDoc/comments for tuning decisions, especially when diagnosing queue pressure or memory footprint. Please update the comment to reflect the actual default.
Code Review
This pull request updates the Raft engine's default flow-control parameters, increasing the maximum message size to 4 MiB and reducing the maximum in-flight messages to 512. It introduces environment variable overrides for these settings to allow runtime tuning and ensures that internal channel capacities are dynamically sized based on these limits. Review feedback identifies a documentation mismatch regarding the new default in-flight message count and suggests refining the logging logic to avoid redundancy in multi-shard deployments.
```diff
 // memory is bounded by O(numPeers * MaxInflightMsg * avgMsgSize).
-// Default: 256. Increase for deeper pipelining on high-bandwidth links;
+// Default: 1024. Increase for deeper pipelining on high-bandwidth links;
 // lower in memory-constrained clusters.
```
The comment indicates a default value of 1024, but the actual compiled-in default defaultMaxInflightMsg has been set to 512 in this PR (line 60). This mismatch can mislead callers who rely on the documentation to understand the behavior when the field is left at its zero value.
```diff
-// Default: 1024. Increase for deeper pipelining on high-bandwidth links;
+// Default: 512. Increase for deeper pipelining on high-bandwidth links;
```
```go
slog.Info("etcd raft engine: message size limits",
	"max_inflight_msgs", cfg.MaxInflightMsg,
	"max_size_per_msg_bytes", cfg.MaxSizePerMsg,
)
```
Logging the message size limits at Info level inside normalizeLimitConfig will produce redundant log entries for every shard opened (e.g., 100+ lines of identical output in a typical multi-shard deployment). Consider logging this once per process or only when the values deviate from the compiled-in defaults to reduce log noise.
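One way to implement the once-per-process suggestion is `sync.Once` (the names and the demonstration counter are illustrative, not from the repo):

```go
package main

import (
	"fmt"
	"log/slog"
	"sync"
)

var (
	logLimitsOnce sync.Once
	limitLogCount int // for demonstration only
)

// logLimitsOncePerProcess emits the limits line the first time any shard
// opens; subsequent shards skip it, avoiding 100+ identical log lines in a
// multi-shard deployment.
func logLimitsOncePerProcess(maxInflight int, maxSizePerMsg uint64) {
	logLimitsOnce.Do(func() {
		limitLogCount++
		slog.Info("etcd raft engine: message size limits",
			"max_inflight_msgs", maxInflight,
			"max_size_per_msg_bytes", maxSizePerMsg)
	})
}

func main() {
	// Opening 100 shards logs the line exactly once.
	for i := 0; i < 100; i++ {
		logLimitsOncePerProcess(512, 2<<20)
	}
	fmt.Println(limitLogCount)
}
```

The alternative mentioned in the comment, logging only when values deviate from the compiled-in defaults, is a simple `if` guard instead of the `Once`.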
Lowered defaultMaxSizePerMsg from 4 MiB to 2 MiB.

Worst-case memory math

With MaxInflightMsgs = 512 and MaxSizePerMsg = 2 MiB, the worst-case buffered product is 512 x 2 MiB = 1 GiB per peer.

This fits comfortably inside the 4–16 GiB RAM envelope of typical elastickv nodes, while still doubling etcd/raft's 1 MiB upstream default.

Batching impact is marginal

On the small-entry KV workload this PR targets (median ~500 B), 2 MiB / 500 B ≈ 4000 entries per MsgApp — well past the point where per-RPC amortisation saturates. 4 MiB bought essentially nothing on top of that.

Changes

Verification

Per the user's explicit confirmation that

/gemini review
Code Review
This pull request adjusts Raft flow control parameters by setting defaultMaxInflightMsg to 512 and defaultMaxSizePerMsg to 2 MiB to optimize memory usage within typical node envelopes. It introduces environment variable overrides for these settings, allowing runtime tuning without rebuilds, and ensures that internal channel capacities are sized according to the resolved limits. Review feedback correctly identifies discrepancies in the documentation regarding these new default values and suggests enhancing log messages with the local engine ID for better observability in sharded environments.
```go
HeartbeatTick int
StateMachine  StateMachine
// MaxSizePerMsg caps the byte size of a single MsgApp payload (Raft-level
// flow control). Default: 4 MiB. Larger values amortise more entries per
```
The docstring for MaxSizePerMsg states a default of 4 MiB, but the constant defaultMaxSizePerMsg is defined as 2 MiB (line 88). Please update the documentation to match the implementation. Maintaining an accurate and strictly enforced maximum size limit is essential when pre-allocating buffers for deserialization.
```diff
-// flow control). Default: 4 MiB. Larger values amortise more entries per
+// flow control). Default: 2 MiB. Larger values amortise more entries per
```
References
- Pre-allocating a buffer is acceptable if a strictly enforced maximum size limit is checked before allocation and the entire data must be materialized in memory.
```diff
 // memory is bounded by O(numPeers * MaxInflightMsg * avgMsgSize).
-// Default: 256. Increase for deeper pipelining on high-bandwidth links;
+// Default: 1024. Increase for deeper pipelining on high-bandwidth links;
 // lower in memory-constrained clusters.
```
The docstring for MaxInflightMsg states a default of 1024, but the constant defaultMaxInflightMsg is set to 512 (line 64). This should be updated to avoid confusing callers. Applying fixed bounds to collections that grow from external requests is necessary to prevent unbounded memory growth and OOM issues.
```diff
-// Default: 1024. Increase for deeper pipelining on high-bandwidth links;
+// Default: 512. Increase for deeper pipelining on high-bandwidth links;
```
References
- To prevent unbounded memory growth and potential OOM issues, apply a fixed bound to collections that can grow from external requests.
```go
slog.Info("etcd raft engine: message size limits",
	"max_inflight_msgs", cfg.MaxInflightMsg,
	"max_size_per_msg_bytes", cfg.MaxSizePerMsg,
)
```
In sharded environments where multiple Raft engines may be active, it is helpful to include the local_id in this log message to identify which engine's limits are being reported.
```diff
 slog.Info("etcd raft engine: message size limits",
+	"local_id", cfg.LocalID,
 	"max_inflight_msgs", cfg.MaxInflightMsg,
 	"max_size_per_msg_bytes", cfg.MaxSizePerMsg,
 )
```
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 4edd02d5c0
```diff
 // memory is bounded by O(numPeers * MaxInflightMsg * avgMsgSize).
-// Default: 256. Increase for deeper pipelining on high-bandwidth links;
+// Default: 1024. Increase for deeper pipelining on high-bandwidth links;
 // lower in memory-constrained clusters.
```
Correct stale defaults in OpenConfig docs
The exported OpenConfig field comments now advertise MaxSizePerMsg defaulting to 4 MiB and MaxInflightMsg defaulting to 1024, but normalizeLimitConfig and the package constants set the real defaults to 2 MiB and 512. This mismatch can mislead operators and integrators doing memory sizing or troubleshooting, because they will believe the runtime is using a different operating point than it actually is.
Addressed codex P3: Files corrected:
/gemini review

Warning: You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again!
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/raftengine/etcd/engine_test.go`:
- Around line 1990-2024: The tests call Open() which normalizes both raft env
vars and can pick up an ambient ELASTICKV_RAFT_MAX_SIZE_PER_MSG; ensure test
isolation by clearing that size env var before calling Open() in both
TestOpen_InboundChannelsHonourMaxInflightEnv and
TestOpen_InboundChannelsDefaultCap—add t.Setenv(maxSizePerMsgEnvVar, "") (in
addition to the existing t.Setenv(maxInflightMsgEnvVar, ...)) immediately before
creating the engine so Open() sees an empty/cleared max size env var.
In `@internal/raftengine/etcd/engine.go`:
- Around line 493-501: Update the outdated numeric value in the explanatory
comment referencing channel sizing: instead of saying the channel would be
silently capped at 1024, update the wording to reflect the current
defaultMaxInflightMsg value (512) and ensure the comment still explains that
using defaultMaxInflightMsg would cap the channel at that default even when the
Raft layer (MaxInflightMsg via normalizeLimitConfig /
ELASTICKV_RAFT_MAX_INFLIGHT_MSGS) requests a larger buffer, potentially
re-triggering errStepQueueFull under bursty conditions.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 99d7c0ca-4f3e-48e8-a666-69a3894c3c45
📒 Files selected for processing (3)
- internal/raftengine/etcd/engine.go
- internal/raftengine/etcd/engine_test.go
- main.go
```go
func TestOpen_InboundChannelsHonourMaxInflightEnv(t *testing.T) {
	t.Setenv(maxInflightMsgEnvVar, "2048")
	fsm := &testStateMachine{}
	engine, err := Open(context.Background(), OpenConfig{
		NodeID:       1,
		LocalID:      "n1",
		LocalAddress: "127.0.0.1:7001",
		DataDir:      t.TempDir(),
		Bootstrap:    true,
		StateMachine: fsm,
	})
	require.NoError(t, err)
	t.Cleanup(func() {
		require.NoError(t, engine.Close())
	})
	require.Equal(t, 2048, cap(engine.stepCh),
		"stepCh capacity must reflect the env-overridden MaxInflightMsg")
	require.Equal(t, 2048, cap(engine.dispatchReportCh),
		"dispatchReportCh capacity must reflect the env-overridden MaxInflightMsg")
}

// TestOpen_InboundChannelsDefaultCap pins that with no env override the
// inbound channels are sized from the compiled-in default (512), the
// current production value.
func TestOpen_InboundChannelsDefaultCap(t *testing.T) {
	t.Setenv(maxInflightMsgEnvVar, "")
	fsm := &testStateMachine{}
	engine, err := Open(context.Background(), OpenConfig{
		NodeID:       1,
		LocalID:      "n1",
		LocalAddress: "127.0.0.1:7001",
		DataDir:      t.TempDir(),
		Bootstrap:    true,
		StateMachine: fsm,
	})
```
Clear the size env var in these Open() tests.
Open() normalizes both raft limit env vars, so these tests can accidentally inherit an ambient ELASTICKV_RAFT_MAX_SIZE_PER_MSG.
Proposed test isolation fix
```diff
 func TestOpen_InboundChannelsHonourMaxInflightEnv(t *testing.T) {
 	t.Setenv(maxInflightMsgEnvVar, "2048")
+	t.Setenv(maxSizePerMsgEnvVar, "")
 	fsm := &testStateMachine{}
 	engine, err := Open(context.Background(), OpenConfig{
@@
 func TestOpen_InboundChannelsDefaultCap(t *testing.T) {
 	t.Setenv(maxInflightMsgEnvVar, "")
+	t.Setenv(maxSizePerMsgEnvVar, "")
 	fsm := &testStateMachine{}
 	engine, err := Open(context.Background(), OpenConfig{
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/raftengine/etcd/engine_test.go` around lines 1990 - 2024, The tests
call Open() which normalizes both raft env vars and can pick up an ambient
ELASTICKV_RAFT_MAX_SIZE_PER_MSG; ensure test isolation by clearing that size env
var before calling Open() in both TestOpen_InboundChannelsHonourMaxInflightEnv
and TestOpen_InboundChannelsDefaultCap—add t.Setenv(maxSizePerMsgEnvVar, "") (in
addition to the existing t.Setenv(maxInflightMsgEnvVar, ...)) immediately before
creating the engine so Open() sees an empty/cleared max size env var.
```go
// Size the inbound step / dispatch-report channels from the
// resolved MaxInflightMsg (post-normalizeLimitConfig, which has
// already applied the env override and compiled-in default) so
// that operators raising ELASTICKV_RAFT_MAX_INFLIGHT_MSGS above
// the default actually get the extra buffering they asked for.
// Using defaultMaxInflightMsg here would silently cap the
// channel at 1024 even when the Raft layer has been told to
// keep 2048 in flight, re-triggering errStepQueueFull under
// the exact bursty conditions this knob is meant to absorb.
```
Update the stale 1024 comment.
defaultMaxInflightMsg is now 512, so this comment is misleading.
Proposed comment fix
```diff
-// Using defaultMaxInflightMsg here would silently cap the
-// channel at 1024 even when the Raft layer has been told to
+// Using defaultMaxInflightMsg here would silently cap the
+// channel at the compiled default even when the Raft layer has been told to
 // keep 2048 in flight, re-triggering errStepQueueFull under
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/raftengine/etcd/engine.go` around lines 493 - 501, Update the
outdated numeric value in the explanatory comment referencing channel sizing:
instead of saying the channel would be silently capped at 1024, update the
wording to reflect the current defaultMaxInflightMsg value (512) and ensure the
comment still explains that using defaultMaxInflightMsg would cap the channel at
that default even when the Raft layer (MaxInflightMsg via normalizeLimitConfig /
ELASTICKV_RAFT_MAX_INFLIGHT_MSGS) requests a larger buffer, potentially
re-triggering errStepQueueFull under bursty conditions.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: b5c5298a21
```go
n, err := strconv.Atoi(v)
if err != nil || n < 1 {
	slog.Warn("invalid ELASTICKV_RAFT_MAX_INFLIGHT_MSGS; using default",
```
Cap MaxInflight env override to avoid startup OOM
maxInflightMsgFromEnv accepts any positive integer, but Open uses the resolved value to allocate stepCh, dispatchReportCh, and per-peer dispatch queues, so a fat-fingered value like ELASTICKV_RAFT_MAX_INFLIGHT_MSGS=100000000 can trigger massive channel allocations and crash the process before the node becomes healthy. Since this knob is now operator-facing at runtime, it needs a sane upper bound (or rejection) rather than only checking n < 1.
```go
n, err := strconv.ParseUint(v, 10, 64)
if err != nil || n < minMaxSizePerMsg {
	slog.Warn("invalid ELASTICKV_RAFT_MAX_SIZE_PER_MSG; using default",
		"value", v, "min", minMaxSizePerMsg, "default", uint64(defaultMaxSizePerMsg))
	return uint64(defaultMaxSizePerMsg), true
```
Enforce transport-safe upper bound for MaxSizePerMsg env
maxSizePerMsgFromEnv currently accepts any value >= 1 KiB, but the transport path in this repo is bounded by gRPC message-size limits (internal.GRPCMaxMessageBytes = 64 MiB via server/dial options), so setting ELASTICKV_RAFT_MAX_SIZE_PER_MSG above that can make Raft emit MsgApp payloads the transport cannot carry, causing repeated send failures/unreachable reports under large batches. The env parser should reject or clamp values above the transport budget.
Summary

- Expose `MaxInflightMsgs` and `MaxSizePerMsg` as runtime-tunable env vars (`ELASTICKV_RAFT_MAX_INFLIGHT_MSGS`, `ELASTICKV_RAFT_MAX_SIZE_PER_MSG`), following the same pattern as `ELASTICKV_RAFT_SNAPSHOT_COUNT` (#589) and `ELASTICKV_RAFT_MAX_WAL_FILES` (#589).
- Production defaults bump: `MaxInflightMsgs` 256 -> 1024, `MaxSizePerMsg` 1 MiB -> 4 MiB.
- Align `main.go`'s `FactoryConfig` with the engine's internal `defaultMaxInflightMsg` (which was already 1024 for `stepCh` sizing per PR #560 but was being overridden by the caller-supplied 256).

Why
- `MaxSizePerMsg` bump (the main win). Under small-entry KV workloads (Redis-style, median entry ~500 B), a 1 MiB cap per MsgApp forces one MsgApp per ~2000 entries; at 4 MiB, each MsgApp amortises ~4x more entries. Concrete effects: fewer dispatcher wake-ups on the leader, fewer recv syscalls, and less apply-vs-read contention on the follower.
- `MaxInflightMsgs` bump (insurance). In-rack RTT is ~0.2 ms, so the replication-rate cap `MaxInflight * MaxSize / RTT` is already well above the NIC at the old defaults. The bump is for burst write phases where entries arrive faster than round-trip completion, and to align with the engine-internal 1024 already used for `stepCh` / `dispatchReportCh` sizing.
- Worst-case memory bound. Per-peer buffered memory is `MaxInflight * MaxSize` = 1024 * 4 MiB = 4 GiB worst-case (was 256 * 1 MiB = 256 MiB). In practice MsgApp payloads are far smaller than the cap, so steady-state is a small fraction of this.

Env vars
- `ELASTICKV_RAFT_MAX_INFLIGHT_MSGS`
- `ELASTICKV_RAFT_MAX_SIZE_PER_MSG`

Env var wins over the caller-supplied `OpenConfig` value so operators can retune a running deployment without a rebuild. Invalid values (non-numeric, zero/negative, sub-1 KiB size) log a warning and fall back to the compiled-in default. Chosen values are logged at info level at Open.

Before / after
- `raft.Config.MaxInflightMsgs`: 256 -> 1024
- `raft.Config.MaxSizePerMsg`: 1 MiB -> 4 MiB
- `raft.Config.MaxCommittedSizePerReady`

Qualitative: fewer dispatch wake-ups per committed byte under small-entry workloads; deeper pipelining during write bursts. No expected change on wire bandwidth or correctness.
Test plan

- `go test ./internal/raftengine/etcd/... -count=1` passes
- `go test ./... -count=1` passes across all packages
- `golangci-lint run ./internal/raftengine/...` clean
- Unit tests: valid overrides propagate into `raft.Config`; invalid values fall back to defaults with a warning
- `TestSnapshotEveryFromEnv_*` / `TestMaxWALFilesFromEnv_*` still pass (pattern mirrored)

Surprising findings
- `defaultMaxInflightMsg` inside `internal/raftengine/etcd/engine.go` had already been bumped to 1024 (for `stepCh` sizing) and documented as such, but `main.go` was still passing 256 at the `FactoryConfig` boundary. The normalizer only fills defaults when the caller passes 0/<=0, so 256 leaked straight through to `raft.Config.MaxInflightMsgs`. This PR fixes that drift.
- `MaxInflightMsgs` / `MaxSizePerMsg` were not previously overridden anywhere else in the tree.

Summary by CodeRabbit
New Features

- `ELASTICKV_RAFT_MAX_INFLIGHT_MSGS` and `ELASTICKV_RAFT_MAX_SIZE_PER_MSG` now allow runtime Raft configuration tuning.

Chores

Tests