From 98554e6efd584e164cdab6fd7d720ea6be77f4b9 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 23 Apr 2026 21:37:17 +0900 Subject: [PATCH 01/11] perf(raft): raise MaxInflightMsgs=1024, MaxSizePerMsg=4MB defaults Expose etcd/raft's MaxInflightMsgs and MaxSizePerMsg as runtime-tunable knobs via ELASTICKV_RAFT_MAX_INFLIGHT_MSGS and ELASTICKV_RAFT_MAX_SIZE_PER_MSG, following the same pattern as ELASTICKV_RAFT_SNAPSHOT_COUNT (#589) and ELASTICKV_RAFT_MAX_WAL_FILES (#589). Production defaults bump to 1024 and 4 MiB. Why: - MaxSizePerMsg 1 MiB -> 4 MiB: under small-entry KV workloads (Redis-style, median entry ~500 B) this drastically lowers the MsgApp count per committed byte. Fewer dispatcher wake-ups on the leader, fewer recv syscalls and less apply-vs-read contention on the follower. This is the real win of the change. - MaxInflightMsgs 256 -> 1024: insurance for burst write phases where entries arrive faster than round-trip completion on wide-bandwidth LAN. The in-process defaultMaxInflightMsg was already 1024 (see stepCh sizing in PR #560), but main.go was still passing 256 at the OpenConfig boundary; align the caller with the engine's internal expectation. Semantics: - Env var wins over caller-supplied cfg so operators can retune a running deployment without a rebuild. Invalid values (non-numeric, zero/negative, sub-1KiB size) log a warning and fall back to the compiled-in default rather than leak through to raft.Config. - Chosen values are logged at Open at info level so a misconfigured override is visible in the boot log. - MaxSizePerMsg is parsed as plain integer bytes for consistency with the other numeric knobs in this package. Tests cover: defaults when unset, valid override propagation into raft.Config, invalid values falling back to defaults. 
--- internal/raftengine/etcd/engine.go | 99 +++++++++++++++++++++-- internal/raftengine/etcd/engine_test.go | 101 ++++++++++++++++++++++++ main.go | 11 ++- 3 files changed, 204 insertions(+), 7 deletions(-) diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index 2a8ccc4d..b0ca4ba9 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -46,13 +46,37 @@ const ( // stepCh on followers filling up while their event loop was held // up by adapter-side pebble seek storms (PRs #560, #562, #563, // #565 removed most of that CPU); 1024 is a 4× safety margin. - // Note that with the current defaultMaxSizePerMsg of 1 MiB, the - // true worst-case bound can be much larger (up to roughly 1 GiB + // Note that with the current defaultMaxSizePerMsg of 4 MiB, the + // true worst-case bound can be much larger (up to roughly 4 GiB // per peer if every slot held a max-sized message). In practice, // typical MsgApp payloads are far smaller, so expected steady-state // memory remains much lower than that worst-case bound. + // + // Operators can override both knobs at runtime without a rebuild + // via ELASTICKV_RAFT_MAX_INFLIGHT_MSGS and + // ELASTICKV_RAFT_MAX_SIZE_PER_MSG. defaultMaxInflightMsg = 1024 - defaultMaxSizePerMsg = 1 << 20 + // defaultMaxSizePerMsg caps the byte size of a single MsgApp payload. + // Raised from 1 MiB → 4 MiB so each MsgApp amortises more entries + // under small-entry workloads (Redis-style KV, median entry ~500 B). + // Fewer MsgApps per committed byte means fewer dispatcher wake-ups + // on the leader and fewer recv syscalls on the follower; the + // follower's apply loop also contends less with the read path. + defaultMaxSizePerMsg = 4 << 20 + // maxInflightMsgEnvVar / maxSizePerMsgEnvVar let operators tune the + // Raft-level flow-control knobs without a rebuild. 
Parsed once at + // Open and passed through normalizeLimitConfig; invalid values fall + // back to the defaults with a warning. MaxSizePerMsg is expressed + // as an integer byte count for consistency with the other numeric + // knobs in this package. + maxInflightMsgEnvVar = "ELASTICKV_RAFT_MAX_INFLIGHT_MSGS" + maxSizePerMsgEnvVar = "ELASTICKV_RAFT_MAX_SIZE_PER_MSG" + // minMaxSizePerMsg is the lower bound accepted from the environment + // override. A payload cap below ~1 KiB makes MsgApp batching + // degenerate (one entry per message) which defeats the whole point + // of the knob; clamp to this floor rather than rejecting so that a + // fat-fingered operator doesn't take out the engine. + minMaxSizePerMsg uint64 = 1 << 10 // defaultHeartbeatBufPerPeer is the capacity of the priority dispatch channel. // It carries low-frequency control traffic: heartbeats, votes, read-index, // leader-transfer, and their corresponding response messages @@ -142,13 +166,21 @@ type OpenConfig struct { ElectionTick int HeartbeatTick int StateMachine StateMachine + // MaxSizePerMsg caps the byte size of a single MsgApp payload (Raft-level + // flow control). Default: 4 MiB. Larger values amortise more entries per + // MsgApp under small-entry workloads; smaller values tighten worst-case + // memory. Operators can override at runtime via + // ELASTICKV_RAFT_MAX_SIZE_PER_MSG (integer byte count) without a + // rebuild; the env var takes precedence over the caller-supplied value. MaxSizePerMsg uint64 // MaxInflightMsg controls how many MsgApp messages Raft may have in-flight // per peer before waiting for an acknowledgement (Raft-level flow control). // It also sets the per-peer dispatch channel capacity, so total buffered // memory is bounded by O(numPeers * MaxInflightMsg * avgMsgSize). - // Default: 256. Increase for deeper pipelining on high-bandwidth links; - // lower in memory-constrained clusters. + // Default: 1024. 
Increase for deeper pipelining on high-bandwidth links; + // lower in memory-constrained clusters. Operators can override at + // runtime via ELASTICKV_RAFT_MAX_INFLIGHT_MSGS without a rebuild; the + // env var takes precedence over the caller-supplied value. MaxInflightMsg int } @@ -2588,6 +2620,21 @@ func normalizeLimitConfig(cfg OpenConfig) OpenConfig { if cfg.MaxSizePerMsg == 0 { cfg.MaxSizePerMsg = defaultMaxSizePerMsg } + // Env overrides win over caller-supplied values so that operators can + // retune replication flow-control without a rebuild. This mirrors the + // behaviour of ELASTICKV_RAFT_SNAPSHOT_COUNT and + // ELASTICKV_RAFT_MAX_WAL_FILES. Invalid values fall back to the + // compiled-in defaults with a warning. + if v, ok := maxInflightMsgFromEnv(); ok { + cfg.MaxInflightMsg = v + } + if v, ok := maxSizePerMsgFromEnv(); ok { + cfg.MaxSizePerMsg = v + } + slog.Info("etcd raft engine: message size limits", + "max_inflight_msgs", cfg.MaxInflightMsg, + "max_size_per_msg_bytes", cfg.MaxSizePerMsg, + ) return cfg } @@ -2845,6 +2892,48 @@ func snapshotEveryFromEnv() uint64 { return n } +// maxInflightMsgFromEnv parses ELASTICKV_RAFT_MAX_INFLIGHT_MSGS. Returns +// (value, true) when the env var is set to a valid positive integer. +// Returns (0, false) when the var is unset so the caller can keep the +// existing cfg.MaxInflightMsg (which normalizeLimitConfig has already +// defaulted to defaultMaxInflightMsg). Invalid values (non-numeric, +// negative, zero) are logged at warn level and treated as unset so the +// engine falls back to the compiled-in default rather than an obviously +// broken override. 
+func maxInflightMsgFromEnv() (int, bool) { + v := strings.TrimSpace(os.Getenv(maxInflightMsgEnvVar)) + if v == "" { + return 0, false + } + n, err := strconv.Atoi(v) + if err != nil || n < 1 { + slog.Warn("invalid ELASTICKV_RAFT_MAX_INFLIGHT_MSGS; using default", + "value", v, "default", defaultMaxInflightMsg) + return 0, false + } + return n, true +} + +// maxSizePerMsgFromEnv parses ELASTICKV_RAFT_MAX_SIZE_PER_MSG as a plain +// integer byte count. Returns (value, true) when the env var is set to a +// valid integer >= minMaxSizePerMsg (1 KiB). Returns (0, false) when the +// var is unset so normalizeLimitConfig can keep its earlier default. +// Invalid or too-small values fall back to the compiled-in default with +// a warning; a sub-KiB cap would make MsgApp batching degenerate. +func maxSizePerMsgFromEnv() (uint64, bool) { + v := strings.TrimSpace(os.Getenv(maxSizePerMsgEnvVar)) + if v == "" { + return 0, false + } + n, err := strconv.ParseUint(v, 10, 64) + if err != nil || n < minMaxSizePerMsg { + slog.Warn("invalid ELASTICKV_RAFT_MAX_SIZE_PER_MSG; using default", + "value", v, "min", minMaxSizePerMsg, "default", uint64(defaultMaxSizePerMsg)) + return 0, false + } + return n, true +} + // dispatcherLanesEnabledFromEnv returns true when the 4-lane dispatcher has // been explicitly opted into via ELASTICKV_RAFT_DISPATCHER_LANES. The value // is parsed with strconv.ParseBool, which accepts the standard tokens diff --git a/internal/raftengine/etcd/engine_test.go b/internal/raftengine/etcd/engine_test.go index 3e0119a4..0376425b 100644 --- a/internal/raftengine/etcd/engine_test.go +++ b/internal/raftengine/etcd/engine_test.go @@ -1841,3 +1841,104 @@ func TestDispatcherLanesEnabledFromEnv(t *testing.T) { require.Equalf(t, c.want, dispatcherLanesEnabledFromEnv(), "env=%q", c.val) } } + +// TestMaxInflightMsgFromEnv_Unset pins the "no env var => caller wins" +// contract of maxInflightMsgFromEnv. 
normalizeLimitConfig relies on the +// second return to decide whether to overwrite the caller-supplied value. +func TestMaxInflightMsgFromEnv_Unset(t *testing.T) { + t.Setenv(maxInflightMsgEnvVar, "") + n, ok := maxInflightMsgFromEnv() + require.False(t, ok) + require.Equal(t, 0, n) +} + +// TestMaxInflightMsgFromEnv_ReadsOverride pins that a valid positive +// integer is parsed and surfaced to the caller verbatim. +func TestMaxInflightMsgFromEnv_ReadsOverride(t *testing.T) { + t.Setenv(maxInflightMsgEnvVar, "2048") + n, ok := maxInflightMsgFromEnv() + require.True(t, ok) + require.Equal(t, 2048, n) +} + +// TestMaxInflightMsgFromEnv_FallsBackOnInvalid pins the safety behaviour: +// a non-numeric, zero, or negative value is refused and the caller is +// told to keep the compiled-in default rather than get a broken override. +func TestMaxInflightMsgFromEnv_FallsBackOnInvalid(t *testing.T) { + cases := []string{"not-a-number", "0", "-3"} + for _, v := range cases { + t.Setenv(maxInflightMsgEnvVar, v) + n, ok := maxInflightMsgFromEnv() + require.Falsef(t, ok, "env=%q", v) + require.Equalf(t, 0, n, "env=%q", v) + } +} + +// TestMaxSizePerMsgFromEnv_Unset pins the "no env var => caller wins" +// contract, symmetric with maxInflightMsgFromEnv above. +func TestMaxSizePerMsgFromEnv_Unset(t *testing.T) { + t.Setenv(maxSizePerMsgEnvVar, "") + n, ok := maxSizePerMsgFromEnv() + require.False(t, ok) + require.Equal(t, uint64(0), n) +} + +// TestMaxSizePerMsgFromEnv_ReadsOverride pins that a valid byte count +// >= minMaxSizePerMsg is accepted and surfaced to the caller verbatim. +func TestMaxSizePerMsgFromEnv_ReadsOverride(t *testing.T) { + t.Setenv(maxSizePerMsgEnvVar, "8388608") // 8 MiB + n, ok := maxSizePerMsgFromEnv() + require.True(t, ok) + require.Equal(t, uint64(8388608), n) +} + +// TestMaxSizePerMsgFromEnv_FallsBackOnInvalid covers the three failure +// modes: non-numeric, zero, and below-floor. 
The floor is minMaxSizePerMsg +// (1 KiB) — a smaller cap would make MsgApp batching degenerate. +func TestMaxSizePerMsgFromEnv_FallsBackOnInvalid(t *testing.T) { + cases := []string{"not-a-number", "0", "512"} + for _, v := range cases { + t.Setenv(maxSizePerMsgEnvVar, v) + n, ok := maxSizePerMsgFromEnv() + require.Falsef(t, ok, "env=%q", v) + require.Equalf(t, uint64(0), n, "env=%q", v) + } +} + +// TestNormalizeLimitConfig_DefaultsWhenUnset pins the production defaults +// that reach raft.Config when neither the caller nor the operator has +// overridden them: 1024 inflight msgs and 4 MiB per msg. +func TestNormalizeLimitConfig_DefaultsWhenUnset(t *testing.T) { + t.Setenv(maxInflightMsgEnvVar, "") + t.Setenv(maxSizePerMsgEnvVar, "") + got := normalizeLimitConfig(OpenConfig{}) + require.Equal(t, defaultMaxInflightMsg, got.MaxInflightMsg) + require.Equal(t, uint64(defaultMaxSizePerMsg), got.MaxSizePerMsg) + require.Equal(t, 1024, got.MaxInflightMsg) + require.Equal(t, uint64(4<<20), got.MaxSizePerMsg) +} + +// TestNormalizeLimitConfig_EnvOverridesCaller pins that a valid env var +// takes precedence over any caller-supplied cfg value. This is how an +// operator retunes a running deployment without a rebuild. +func TestNormalizeLimitConfig_EnvOverridesCaller(t *testing.T) { + t.Setenv(maxInflightMsgEnvVar, "2048") + t.Setenv(maxSizePerMsgEnvVar, "8388608") + got := normalizeLimitConfig(OpenConfig{ + MaxInflightMsg: 256, + MaxSizePerMsg: 1 << 20, + }) + require.Equal(t, 2048, got.MaxInflightMsg) + require.Equal(t, uint64(8388608), got.MaxSizePerMsg) +} + +// TestNormalizeLimitConfig_InvalidEnvFallsBackToDefault pins that a +// malformed env override does NOT leak through to raft.Config; the +// caller-supplied defaults remain in effect. 
+func TestNormalizeLimitConfig_InvalidEnvFallsBackToDefault(t *testing.T) { + t.Setenv(maxInflightMsgEnvVar, "not-a-number") + t.Setenv(maxSizePerMsgEnvVar, "-1") + got := normalizeLimitConfig(OpenConfig{}) + require.Equal(t, defaultMaxInflightMsg, got.MaxInflightMsg) + require.Equal(t, uint64(defaultMaxSizePerMsg), got.MaxSizePerMsg) +} diff --git a/main.go b/main.go index 484869c3..e3270f64 100644 --- a/main.go +++ b/main.go @@ -38,8 +38,15 @@ const ( etcdTickInterval = 10 * time.Millisecond etcdHeartbeatMinTicks = 1 etcdElectionMinTicks = 2 - etcdMaxSizePerMsg = 1 << 20 - etcdMaxInflightMsg = 256 + // etcdMaxSizePerMsg caps bytes per MsgApp. 4 MiB reduces MsgApp count + // per committed byte under small-entry KV workloads, cutting dispatcher + // wake-ups on the leader and recv syscalls on the follower. Operators + // can override via ELASTICKV_RAFT_MAX_SIZE_PER_MSG at runtime. + etcdMaxSizePerMsg = 4 << 20 + // etcdMaxInflightMsg caps in-flight MsgApps per peer. 1024 gives deeper + // pipelining on wide-bandwidth LAN during write bursts. Operators can + // override via ELASTICKV_RAFT_MAX_INFLIGHT_MSGS at runtime. + etcdMaxInflightMsg = 1024 ) func newRaftFactory(engineType raftEngineType) (raftengine.Factory, error) { From bb6ff01ff8c5d6f22a738a7b09a7fb239d1fe4f8 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 23 Apr 2026 23:13:55 +0900 Subject: [PATCH 02/11] fix(raft): apply MaxInflightMsg default on invalid env override MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The maxInflightMsgFromEnv / maxSizePerMsgFromEnv helpers logged "using default" when the env var was malformed, but returned (0, false) — telling normalizeLimitConfig to keep whatever the caller supplied. A caller passing MaxInflightMsg=256 plus a garbage env var would silently retain 256, making the warning log a lie. 
Return (defaultMaxInflightMsg, true) and (defaultMaxSizePerMsg, true) on invalid input so the default the operator-visible warning promises is actually the value that reaches raft.Config. Addresses gemini review feedback on #593. --- internal/raftengine/etcd/engine.go | 15 ++++++---- internal/raftengine/etcd/engine_test.go | 39 ++++++++++++++++++++----- 2 files changed, 40 insertions(+), 14 deletions(-) diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index b0ca4ba9..3d90c90c 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -2897,9 +2897,10 @@ func snapshotEveryFromEnv() uint64 { // Returns (0, false) when the var is unset so the caller can keep the // existing cfg.MaxInflightMsg (which normalizeLimitConfig has already // defaulted to defaultMaxInflightMsg). Invalid values (non-numeric, -// negative, zero) are logged at warn level and treated as unset so the -// engine falls back to the compiled-in default rather than an obviously -// broken override. +// negative, zero) are logged at warn level and return +// (defaultMaxInflightMsg, true) so the engine actually applies the +// compiled-in default the log message promises — otherwise a malformed +// env var would silently let an unrelated caller-supplied value win. func maxInflightMsgFromEnv() (int, bool) { v := strings.TrimSpace(os.Getenv(maxInflightMsgEnvVar)) if v == "" { @@ -2909,7 +2910,7 @@ func maxInflightMsgFromEnv() (int, bool) { if err != nil || n < 1 { slog.Warn("invalid ELASTICKV_RAFT_MAX_INFLIGHT_MSGS; using default", "value", v, "default", defaultMaxInflightMsg) - return 0, false + return defaultMaxInflightMsg, true } return n, true } @@ -2919,7 +2920,9 @@ func maxInflightMsgFromEnv() (int, bool) { // valid integer >= minMaxSizePerMsg (1 KiB). Returns (0, false) when the // var is unset so normalizeLimitConfig can keep its earlier default. 
// Invalid or too-small values fall back to the compiled-in default with -// a warning; a sub-KiB cap would make MsgApp batching degenerate. +// a warning and return (defaultMaxSizePerMsg, true) so the override +// actually applies the default the warning promises; a sub-KiB cap +// would make MsgApp batching degenerate. func maxSizePerMsgFromEnv() (uint64, bool) { v := strings.TrimSpace(os.Getenv(maxSizePerMsgEnvVar)) if v == "" { @@ -2929,7 +2932,7 @@ func maxSizePerMsgFromEnv() (uint64, bool) { if err != nil || n < minMaxSizePerMsg { slog.Warn("invalid ELASTICKV_RAFT_MAX_SIZE_PER_MSG; using default", "value", v, "min", minMaxSizePerMsg, "default", uint64(defaultMaxSizePerMsg)) - return 0, false + return uint64(defaultMaxSizePerMsg), true } return n, true } diff --git a/internal/raftengine/etcd/engine_test.go b/internal/raftengine/etcd/engine_test.go index 0376425b..3308c59d 100644 --- a/internal/raftengine/etcd/engine_test.go +++ b/internal/raftengine/etcd/engine_test.go @@ -1862,15 +1862,17 @@ func TestMaxInflightMsgFromEnv_ReadsOverride(t *testing.T) { } // TestMaxInflightMsgFromEnv_FallsBackOnInvalid pins the safety behaviour: -// a non-numeric, zero, or negative value is refused and the caller is -// told to keep the compiled-in default rather than get a broken override. +// a non-numeric, zero, or negative value is refused and the compiled-in +// default is surfaced (ok=true) so that normalizeLimitConfig actually +// applies the default the warning log promises, instead of letting a +// caller-supplied value silently win. 
func TestMaxInflightMsgFromEnv_FallsBackOnInvalid(t *testing.T) { cases := []string{"not-a-number", "0", "-3"} for _, v := range cases { t.Setenv(maxInflightMsgEnvVar, v) n, ok := maxInflightMsgFromEnv() - require.Falsef(t, ok, "env=%q", v) - require.Equalf(t, 0, n, "env=%q", v) + require.Truef(t, ok, "env=%q", v) + require.Equalf(t, defaultMaxInflightMsg, n, "env=%q", v) } } @@ -1894,14 +1896,17 @@ func TestMaxSizePerMsgFromEnv_ReadsOverride(t *testing.T) { // TestMaxSizePerMsgFromEnv_FallsBackOnInvalid covers the three failure // modes: non-numeric, zero, and below-floor. The floor is minMaxSizePerMsg -// (1 KiB) — a smaller cap would make MsgApp batching degenerate. +// (1 KiB) — a smaller cap would make MsgApp batching degenerate. On +// failure the helper returns (defaultMaxSizePerMsg, true) so the caller +// actually applies the compiled-in default the log line promises, rather +// than silently letting a caller-supplied value win. func TestMaxSizePerMsgFromEnv_FallsBackOnInvalid(t *testing.T) { cases := []string{"not-a-number", "0", "512"} for _, v := range cases { t.Setenv(maxSizePerMsgEnvVar, v) n, ok := maxSizePerMsgFromEnv() - require.Falsef(t, ok, "env=%q", v) - require.Equalf(t, uint64(0), n, "env=%q", v) + require.Truef(t, ok, "env=%q", v) + require.Equalf(t, uint64(defaultMaxSizePerMsg), n, "env=%q", v) } } @@ -1934,7 +1939,8 @@ func TestNormalizeLimitConfig_EnvOverridesCaller(t *testing.T) { // TestNormalizeLimitConfig_InvalidEnvFallsBackToDefault pins that a // malformed env override does NOT leak through to raft.Config; the -// caller-supplied defaults remain in effect. +// compiled-in defaults are applied (even when the caller supplied a +// different value) so the operator-visible warning log matches reality. 
func TestNormalizeLimitConfig_InvalidEnvFallsBackToDefault(t *testing.T) { t.Setenv(maxInflightMsgEnvVar, "not-a-number") t.Setenv(maxSizePerMsgEnvVar, "-1") @@ -1942,3 +1948,20 @@ func TestNormalizeLimitConfig_InvalidEnvFallsBackToDefault(t *testing.T) { require.Equal(t, defaultMaxInflightMsg, got.MaxInflightMsg) require.Equal(t, uint64(defaultMaxSizePerMsg), got.MaxSizePerMsg) } + +// TestNormalizeLimitConfig_InvalidEnvOverridesCaller pins the fix for +// the "log message is a lie" gemini reviewer finding: when the env var +// is malformed, the helper warns "using default" — so the default MUST +// actually win, even if the caller supplied a non-default value. Prior +// to the fix the caller's 256 would silently survive, contradicting the +// log line. +func TestNormalizeLimitConfig_InvalidEnvOverridesCaller(t *testing.T) { + t.Setenv(maxInflightMsgEnvVar, "garbage") + t.Setenv(maxSizePerMsgEnvVar, "also-garbage") + got := normalizeLimitConfig(OpenConfig{ + MaxInflightMsg: 256, + MaxSizePerMsg: 1 << 20, + }) + require.Equal(t, defaultMaxInflightMsg, got.MaxInflightMsg) + require.Equal(t, uint64(defaultMaxSizePerMsg), got.MaxSizePerMsg) +} From 0546484d8e22afa798e41ebb1b18e0caab655350 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 23 Apr 2026 23:15:15 +0900 Subject: [PATCH 03/11] fix(raft): size stepCh from resolved MaxInflightMsg, not compiled default MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Open() allocated stepCh and dispatchReportCh at fixed capacity defaultMaxInflightMsg (1024). When operators raised ELASTICKV_RAFT_MAX_INFLIGHT_MSGS (e.g. to 2048) the raft layer honoured the higher inflight limit, but the inbound channels stayed at 1024, so bursty replication still tripped errStepQueueFull at 1024 queued messages — silently negating the tuning knob the PR was meant to expose. 
Size the two channels from prepared.cfg.MaxInflightMsg (already post-normalizeLimitConfig, so env override and compiled default have been applied). A new inboundChannelCap() helper clamps to minInboundChannelCap=256 to preserve a survivable floor if a caller passes a tiny value. Add TestOpen_InboundChannelsHonourMaxInflightEnv (env=2048 -> cap=2048) and TestOpen_InboundChannelsDefaultCap (unset -> cap=1024) plus a unit test for the helper. Addresses codex P1 review feedback on #593. --- internal/raftengine/etcd/engine.go | 65 ++++++++++++++++++------- internal/raftengine/etcd/engine_test.go | 62 +++++++++++++++++++++++ 2 files changed, 109 insertions(+), 18 deletions(-) diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index 3d90c90c..e01fd436 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -56,6 +56,13 @@ const ( // via ELASTICKV_RAFT_MAX_INFLIGHT_MSGS and // ELASTICKV_RAFT_MAX_SIZE_PER_MSG. defaultMaxInflightMsg = 1024 + // minInboundChannelCap is the floor applied when sizing the engine's + // inbound stepCh / dispatchReportCh from the resolved MaxInflightMsg. + // Even if a (misconfigured) caller drops MaxInflightMsg below this, + // we keep at least this much buffering so that a single tick burst + // doesn't trip errStepQueueFull on the inbound side. 256 matches the + // pre-#529 compiled-in default that was known to be survivable. + minInboundChannelCap = 256 // defaultMaxSizePerMsg caps the byte size of a single MsgApp payload. // Raised from 1 MiB → 4 MiB so each MsgApp amortises more entries // under small-entry workloads (Redis-style KV, median entry ~500 B). 
@@ -451,24 +458,33 @@ func Open(ctx context.Context, cfg OpenConfig) (*Engine, error) { }() engine := &Engine{ - nodeID: prepared.cfg.NodeID, - localID: prepared.cfg.LocalID, - localAddress: prepared.cfg.LocalAddress, - dataDir: prepared.cfg.DataDir, - fsmSnapDir: filepath.Join(prepared.cfg.DataDir, fsmSnapDirName), - tickInterval: prepared.cfg.TickInterval, - electionTick: prepared.cfg.ElectionTick, - storage: prepared.disk.Storage, - rawNode: rawNode, - persist: prepared.disk.Persist, - fsm: prepared.cfg.StateMachine, - peers: peerMap, - transport: prepared.cfg.Transport, - proposeCh: make(chan proposalRequest), - readCh: make(chan readRequest), - adminCh: make(chan adminRequest), - stepCh: make(chan raftpb.Message, defaultMaxInflightMsg), - dispatchReportCh: make(chan dispatchReport, defaultMaxInflightMsg), + nodeID: prepared.cfg.NodeID, + localID: prepared.cfg.LocalID, + localAddress: prepared.cfg.LocalAddress, + dataDir: prepared.cfg.DataDir, + fsmSnapDir: filepath.Join(prepared.cfg.DataDir, fsmSnapDirName), + tickInterval: prepared.cfg.TickInterval, + electionTick: prepared.cfg.ElectionTick, + storage: prepared.disk.Storage, + rawNode: rawNode, + persist: prepared.disk.Persist, + fsm: prepared.cfg.StateMachine, + peers: peerMap, + transport: prepared.cfg.Transport, + proposeCh: make(chan proposalRequest), + readCh: make(chan readRequest), + adminCh: make(chan adminRequest), + // Size the inbound step / dispatch-report channels from the + // resolved MaxInflightMsg (post-normalizeLimitConfig, which has + // already applied the env override and compiled-in default) so + // that operators raising ELASTICKV_RAFT_MAX_INFLIGHT_MSGS above + // the default actually get the extra buffering they asked for. + // Using defaultMaxInflightMsg here would silently cap the + // channel at 1024 even when the Raft layer has been told to + // keep 2048 in flight, re-triggering errStepQueueFull under + // the exact bursty conditions this knob is meant to absorb. 
+ stepCh: make(chan raftpb.Message, inboundChannelCap(prepared.cfg.MaxInflightMsg)), + dispatchReportCh: make(chan dispatchReport, inboundChannelCap(prepared.cfg.MaxInflightMsg)), closeCh: make(chan struct{}), doneCh: make(chan struct{}), startedCh: make(chan struct{}), @@ -2613,6 +2629,19 @@ func normalizeTimingConfig(cfg OpenConfig) OpenConfig { return cfg } +// inboundChannelCap returns the capacity to use when allocating the +// engine's inbound stepCh and dispatchReportCh. It mirrors the resolved +// MaxInflightMsg but clamps to minInboundChannelCap so that a caller +// passing a tiny value doesn't shrink the buffers below a survivable +// floor. maxInflight is expected to be the post-normalizeLimitConfig +// value (compiled default or env override applied). +func inboundChannelCap(maxInflight int) int { + if maxInflight < minInboundChannelCap { + return minInboundChannelCap + } + return maxInflight +} + func normalizeLimitConfig(cfg OpenConfig) OpenConfig { if cfg.MaxInflightMsg <= 0 { cfg.MaxInflightMsg = defaultMaxInflightMsg diff --git a/internal/raftengine/etcd/engine_test.go b/internal/raftengine/etcd/engine_test.go index 3308c59d..62708d1f 100644 --- a/internal/raftengine/etcd/engine_test.go +++ b/internal/raftengine/etcd/engine_test.go @@ -1965,3 +1965,65 @@ func TestNormalizeLimitConfig_InvalidEnvOverridesCaller(t *testing.T) { require.Equal(t, defaultMaxInflightMsg, got.MaxInflightMsg) require.Equal(t, uint64(defaultMaxSizePerMsg), got.MaxSizePerMsg) } + +// TestInboundChannelCap verifies the floor/passthrough behaviour of the +// stepCh / dispatchReportCh sizing helper: the resolved MaxInflightMsg +// drives capacity, but never below minInboundChannelCap. 
+func TestInboundChannelCap(t *testing.T) { + require.Equal(t, minInboundChannelCap, inboundChannelCap(0)) + require.Equal(t, minInboundChannelCap, inboundChannelCap(1)) + require.Equal(t, minInboundChannelCap, inboundChannelCap(minInboundChannelCap-1)) + require.Equal(t, minInboundChannelCap, inboundChannelCap(minInboundChannelCap)) + require.Equal(t, 1024, inboundChannelCap(1024)) + require.Equal(t, 2048, inboundChannelCap(2048)) +} + +// TestOpen_InboundChannelsHonourMaxInflightEnv pins the codex P1 fix: +// when ELASTICKV_RAFT_MAX_INFLIGHT_MSGS is raised above the compiled-in +// default, the engine's inbound stepCh and dispatchReportCh must be +// sized from the override, not the default. Previously these were hard- +// wired to defaultMaxInflightMsg (1024), so a 2048 override would still +// hit errStepQueueFull at 1024 inbound messages, silently defeating the +// whole tuning knob. +func TestOpen_InboundChannelsHonourMaxInflightEnv(t *testing.T) { + t.Setenv(maxInflightMsgEnvVar, "2048") + fsm := &testStateMachine{} + engine, err := Open(context.Background(), OpenConfig{ + NodeID: 1, + LocalID: "n1", + LocalAddress: "127.0.0.1:7001", + DataDir: t.TempDir(), + Bootstrap: true, + StateMachine: fsm, + }) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) + require.Equal(t, 2048, cap(engine.stepCh), + "stepCh capacity must reflect the env-overridden MaxInflightMsg") + require.Equal(t, 2048, cap(engine.dispatchReportCh), + "dispatchReportCh capacity must reflect the env-overridden MaxInflightMsg") +} + +// TestOpen_InboundChannelsDefaultCap pins that with no env override the +// inbound channels are sized from the compiled-in default (1024), the +// current production value. 
+func TestOpen_InboundChannelsDefaultCap(t *testing.T) { + t.Setenv(maxInflightMsgEnvVar, "") + fsm := &testStateMachine{} + engine, err := Open(context.Background(), OpenConfig{ + NodeID: 1, + LocalID: "n1", + LocalAddress: "127.0.0.1:7001", + DataDir: t.TempDir(), + Bootstrap: true, + StateMachine: fsm, + }) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) + require.Equal(t, defaultMaxInflightMsg, cap(engine.stepCh)) + require.Equal(t, defaultMaxInflightMsg, cap(engine.dispatchReportCh)) +} From ca6a415103dc60fe865df383919d627f6481d8cc Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 23 Apr 2026 23:46:23 +0900 Subject: [PATCH 04/11] docs(raft): correct clamp wording in minMaxSizePerMsg comment The comment on minMaxSizePerMsg said the override clamps to the floor, but maxSizePerMsgFromEnv actually falls back to defaultMaxSizePerMsg when the env value is below the floor (or unparseable). Clamp usually implies using the floor itself, so the doc misled readers about the behaviour. Rewrite the sentence to match the implementation: fall back to the default rather than rejecting, so a fat-fingered operator doesn't take out the engine. No code change. --- internal/raftengine/etcd/engine.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index e01fd436..a50670e3 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -81,7 +81,7 @@ const ( // minMaxSizePerMsg is the lower bound accepted from the environment // override. A payload cap below ~1 KiB makes MsgApp batching // degenerate (one entry per message) which defeats the whole point - // of the knob; clamp to this floor rather than rejecting so that a + // of the knob; fall back to the default rather than rejecting so that a // fat-fingered operator doesn't take out the engine. 
minMaxSizePerMsg uint64 = 1 << 10 // defaultHeartbeatBufPerPeer is the capacity of the priority dispatch channel. From a248131afe3b9aef1ad65a988a7fdf8fec1bddf6 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 23 Apr 2026 23:50:14 +0900 Subject: [PATCH 05/11] perf(raft): lower default MaxInflightMsgs 1024 -> 512 to cap worst-case memory at 2 GiB/peer Gemini reviewer flagged that 1024 inflight x 4 MiB per message is a 4 GiB per-peer worst-case buffered footprint. With N peers and bursty replication traffic, the aggregate can plausibly reach OOM territory on general-purpose deployments where the operator has not tuned the knob to the workload. In practice TCP backpressure on a typical LAN will cap the actually buffered bytes well below the product, but the reviewer's framing is reasonable: a compiled-in default should be conservative. Take option B from the gemini response matrix: keep MaxSizePerMsg at 4 MiB (the MsgApp-batching win that motivates this PR's narrative on small-entry KV workloads) and drop MaxInflightMsgs back to 512. This halves the worst-case product to 2 GiB per peer while preserving the batching improvement. 512 is still a 2x safety margin over the pre-#529 default of 256 that we observed filling up on followers under the pebble-seek-storm incident (PRs #560/#562/#563/#565 have since removed most of that CPU contention, so the deeper pipelining of 1024 was motivated more by belt-and-suspenders than measured need). Operators who need deeper pipelines (wide-bandwidth LAN, plenty of RAM) can still raise the knob via ELASTICKV_RAFT_MAX_INFLIGHT_MSGS without a rebuild; operators who need a tighter memory budget can lower MaxSizePerMsg via ELASTICKV_RAFT_MAX_SIZE_PER_MSG. Update the in-code rationale comment, the caller-side constant in main.go (so the engine default is actually exercised when no env override is set), and the TestNormalizeLimitConfig_DefaultsWhenUnset pin from 1024 -> 512. 
Drop the stale "(1024)" parenthetical in the TestOpen_InboundChannelsHonourMaxInflightEnv docblock. --- internal/raftengine/etcd/engine.go | 24 +++++++++++++----------- internal/raftengine/etcd/engine_test.go | 14 ++++++++------ main.go | 13 +++++++++---- 3 files changed, 30 insertions(+), 21 deletions(-) diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index a50670e3..de37819f 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -40,22 +40,24 @@ const ( // queue. Total buffered memory is bounded by // O(numPeers × MaxInflightMsg × avgMsgSize). // - // Raised from 256 → 1024 to absorb short CPU bursts without forcing + // Raised from 256 → 512 to absorb short CPU bursts without forcing // peers to reject with "etcd raft inbound step queue is full". // Under production congestion we observed the 256-slot inbound // stepCh on followers filling up while their event loop was held // up by adapter-side pebble seek storms (PRs #560, #562, #563, - // #565 removed most of that CPU); 1024 is a 4× safety margin. - // Note that with the current defaultMaxSizePerMsg of 4 MiB, the - // true worst-case bound can be much larger (up to roughly 4 GiB - // per peer if every slot held a max-sized message). In practice, - // typical MsgApp payloads are far smaller, so expected steady-state - // memory remains much lower than that worst-case bound. + // #565 removed most of that CPU); 512 is a 2× safety margin. // - // Operators can override both knobs at runtime without a rebuild - // via ELASTICKV_RAFT_MAX_INFLIGHT_MSGS and - // ELASTICKV_RAFT_MAX_SIZE_PER_MSG. - defaultMaxInflightMsg = 1024 + // We intentionally do NOT raise this in lock-step with the 4 MiB + // defaultMaxSizePerMsg bump: the two knobs multiply, and 1024 × + // 4 MiB is a 4 GiB per-peer worst-case product that a bursty + // multi-peer deployment can plausibly realise under TCP backpressure + // loss. 
512 × 4 MiB halves that to 2 GiB per peer while preserving + // the MsgApp-batching win that motivates the 4 MiB cap on small-entry + // workloads. Operators who need deeper pipelines (large clusters with + // plenty of RAM) can raise this via ELASTICKV_RAFT_MAX_INFLIGHT_MSGS + // without a rebuild; operators who need a smaller memory ceiling can + // lower MaxSizePerMsg via ELASTICKV_RAFT_MAX_SIZE_PER_MSG. + defaultMaxInflightMsg = 512 // minInboundChannelCap is the floor applied when sizing the engine's // inbound stepCh / dispatchReportCh from the resolved MaxInflightMsg. // Even if a (misconfigured) caller drops MaxInflightMsg below this, diff --git a/internal/raftengine/etcd/engine_test.go b/internal/raftengine/etcd/engine_test.go index 62708d1f..aefd1a2e 100644 --- a/internal/raftengine/etcd/engine_test.go +++ b/internal/raftengine/etcd/engine_test.go @@ -1912,14 +1912,16 @@ func TestMaxSizePerMsgFromEnv_FallsBackOnInvalid(t *testing.T) { // TestNormalizeLimitConfig_DefaultsWhenUnset pins the production defaults // that reach raft.Config when neither the caller nor the operator has -// overridden them: 1024 inflight msgs and 4 MiB per msg. +// overridden them: 512 inflight msgs and 4 MiB per msg. The combination +// bounds worst-case per-peer buffered Raft traffic at 2 GiB (512 × 4 MiB); +// see defaultMaxInflightMsg for the memory-footprint rationale. 
func TestNormalizeLimitConfig_DefaultsWhenUnset(t *testing.T) { t.Setenv(maxInflightMsgEnvVar, "") t.Setenv(maxSizePerMsgEnvVar, "") got := normalizeLimitConfig(OpenConfig{}) require.Equal(t, defaultMaxInflightMsg, got.MaxInflightMsg) require.Equal(t, uint64(defaultMaxSizePerMsg), got.MaxSizePerMsg) - require.Equal(t, 1024, got.MaxInflightMsg) + require.Equal(t, 512, got.MaxInflightMsg) require.Equal(t, uint64(4<<20), got.MaxSizePerMsg) } @@ -1982,9 +1984,9 @@ func TestInboundChannelCap(t *testing.T) { // when ELASTICKV_RAFT_MAX_INFLIGHT_MSGS is raised above the compiled-in // default, the engine's inbound stepCh and dispatchReportCh must be // sized from the override, not the default. Previously these were hard- -// wired to defaultMaxInflightMsg (1024), so a 2048 override would still -// hit errStepQueueFull at 1024 inbound messages, silently defeating the -// whole tuning knob. +// wired to defaultMaxInflightMsg, so a larger override would still hit +// errStepQueueFull at the compiled-in cap, silently defeating the whole +// tuning knob. func TestOpen_InboundChannelsHonourMaxInflightEnv(t *testing.T) { t.Setenv(maxInflightMsgEnvVar, "2048") fsm := &testStateMachine{} @@ -2007,7 +2009,7 @@ func TestOpen_InboundChannelsHonourMaxInflightEnv(t *testing.T) { } // TestOpen_InboundChannelsDefaultCap pins that with no env override the -// inbound channels are sized from the compiled-in default (1024), the +// inbound channels are sized from the compiled-in default (512), the // current production value. func TestOpen_InboundChannelsDefaultCap(t *testing.T) { t.Setenv(maxInflightMsgEnvVar, "") diff --git a/main.go b/main.go index e3270f64..409f0d89 100644 --- a/main.go +++ b/main.go @@ -43,10 +43,15 @@ const ( // wake-ups on the leader and recv syscalls on the follower. Operators // can override via ELASTICKV_RAFT_MAX_SIZE_PER_MSG at runtime. etcdMaxSizePerMsg = 4 << 20 - // etcdMaxInflightMsg caps in-flight MsgApps per peer. 
1024 gives deeper - // pipelining on wide-bandwidth LAN during write bursts. Operators can - // override via ELASTICKV_RAFT_MAX_INFLIGHT_MSGS at runtime. - etcdMaxInflightMsg = 1024 + // etcdMaxInflightMsg caps in-flight MsgApps per peer. 512 gives a 2x + // safety margin over the pre-#529 default of 256 to absorb short CPU + // bursts, without letting the per-peer worst-case buffered memory + // (MaxInflightMsg × MaxSizePerMsg) grow beyond 2 GiB on the current + // 4 MiB MaxSizePerMsg. Operators with wide-bandwidth LAN clusters and + // headroom can raise this via ELASTICKV_RAFT_MAX_INFLIGHT_MSGS at + // runtime; operators with tighter memory budgets can lower the byte + // cap via ELASTICKV_RAFT_MAX_SIZE_PER_MSG. + etcdMaxInflightMsg = 512 ) func newRaftFactory(engineType raftEngineType) (raftengine.Factory, error) { From 4edd02d5c0d1fa7e730baa1dad64d0ff41ba745d Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Fri, 24 Apr 2026 01:14:32 +0900 Subject: [PATCH 06/11] perf(raft): lower MaxSizePerMsg default to 2 MiB to cap per-peer worst case at 1 GiB --- internal/raftengine/etcd/engine.go | 40 +++++++++++++++++-------- internal/raftengine/etcd/engine_test.go | 6 ++-- main.go | 25 +++++++++------- 3 files changed, 45 insertions(+), 26 deletions(-) diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index de37819f..189ab200 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -47,16 +47,20 @@ const ( // up by adapter-side pebble seek storms (PRs #560, #562, #563, // #565 removed most of that CPU); 512 is a 2× safety margin. // - // We intentionally do NOT raise this in lock-step with the 4 MiB - // defaultMaxSizePerMsg bump: the two knobs multiply, and 1024 × - // 4 MiB is a 4 GiB per-peer worst-case product that a bursty - // multi-peer deployment can plausibly realise under TCP backpressure - // loss. 
512 × 4 MiB halves that to 2 GiB per peer while preserving - the MsgApp-batching win that motivates the 4 MiB cap on small-entry - workloads. Operators who need deeper pipelines (large clusters with - plenty of RAM) can raise this via ELASTICKV_RAFT_MAX_INFLIGHT_MSGS - without a rebuild; operators who need a smaller memory ceiling can - lower MaxSizePerMsg via ELASTICKV_RAFT_MAX_SIZE_PER_MSG. + We intentionally do NOT raise this in lock-step with the 2 MiB + defaultMaxSizePerMsg: the two knobs multiply, and 1024 × 2 MiB + is a 2 GiB per-peer worst-case product that a bursty multi-peer + deployment can plausibly realise when TCP backpressure stops + throttling senders (fast LAN, ample socket buffers). + 512 × 2 MiB halves that to 1 GiB per peer (4 GiB on a 5-node + leader with 4 followers), which fits comfortably inside the + 4–16 GiB RAM envelope of typical elastickv nodes while still + preserving the MsgApp-batching win that motivates raising the + byte cap above etcd/raft's 1 MiB upstream default on small-entry + workloads. Operators who need deeper pipelines (large clusters + with plenty of RAM) can raise this via + ELASTICKV_RAFT_MAX_INFLIGHT_MSGS without a rebuild; operators + who need a smaller memory ceiling can lower MaxSizePerMsg via + ELASTICKV_RAFT_MAX_SIZE_PER_MSG. defaultMaxInflightMsg = 512 // minInboundChannelCap is the floor applied when sizing the engine's // inbound stepCh / dispatchReportCh from the resolved MaxInflightMsg. @@ -66,12 +70,22 @@ const ( // pre-#529 compiled-in default that was known to be survivable. minInboundChannelCap = 256 // defaultMaxSizePerMsg caps the byte size of a single MsgApp payload. - // Raised from 1 MiB → 4 MiB so each MsgApp amortises more entries - // under small-entry workloads (Redis-style KV, median entry ~500 B). 
+ // Set to 2 MiB — double etcd/raft's 1 MiB upstream default — so each + // MsgApp amortises more entries under small-entry workloads + // (Redis-style KV, median entry ~500 B; 2 MiB / 500 B ≈ 4000 entries + // per MsgApp already saturates the per-RPC batching benefit). // Fewer MsgApps per committed byte means fewer dispatcher wake-ups // on the leader and fewer recv syscalls on the follower; the // follower's apply loop also contends less with the read path. - defaultMaxSizePerMsg = 4 << 20 + // + // Lowered from 4 MiB → 2 MiB in tandem with defaultMaxInflightMsg=512 + // to cap per-peer worst-case buffered Raft traffic at 1 GiB + // (512 × 2 MiB), i.e. 4 GiB on a 5-node leader with 4 followers. + // The previous 4 MiB cap produced a 2 GiB/peer, 8 GiB/leader + // worst case that was too tight for the 4–16 GiB RAM envelope + // typical elastickv nodes operate in; the batching win of 4 MiB + // over 2 MiB is marginal on small-entry workloads. + defaultMaxSizePerMsg = 2 << 20 // maxInflightMsgEnvVar / maxSizePerMsgEnvVar let operators tune the // Raft-level flow-control knobs without a rebuild. Parsed once at // Open and passed through normalizeLimitConfig; invalid values fall diff --git a/internal/raftengine/etcd/engine_test.go b/internal/raftengine/etcd/engine_test.go index aefd1a2e..1cc98ab6 100644 --- a/internal/raftengine/etcd/engine_test.go +++ b/internal/raftengine/etcd/engine_test.go @@ -1912,8 +1912,8 @@ func TestMaxSizePerMsgFromEnv_FallsBackOnInvalid(t *testing.T) { // TestNormalizeLimitConfig_DefaultsWhenUnset pins the production defaults // that reach raft.Config when neither the caller nor the operator has -// overridden them: 512 inflight msgs and 4 MiB per msg. The combination -// bounds worst-case per-peer buffered Raft traffic at 2 GiB (512 × 4 MiB); +// overridden them: 512 inflight msgs and 2 MiB per msg. 
The combination +// bounds worst-case per-peer buffered Raft traffic at 1 GiB (512 × 2 MiB); // see defaultMaxInflightMsg for the memory-footprint rationale. func TestNormalizeLimitConfig_DefaultsWhenUnset(t *testing.T) { t.Setenv(maxInflightMsgEnvVar, "") @@ -1922,7 +1922,7 @@ func TestNormalizeLimitConfig_DefaultsWhenUnset(t *testing.T) { require.Equal(t, defaultMaxInflightMsg, got.MaxInflightMsg) require.Equal(t, uint64(defaultMaxSizePerMsg), got.MaxSizePerMsg) require.Equal(t, 512, got.MaxInflightMsg) - require.Equal(t, uint64(4<<20), got.MaxSizePerMsg) + require.Equal(t, uint64(2<<20), got.MaxSizePerMsg) } // TestNormalizeLimitConfig_EnvOverridesCaller pins that a valid env var diff --git a/main.go b/main.go index 409f0d89..f52feca6 100644 --- a/main.go +++ b/main.go @@ -38,19 +38,24 @@ const ( etcdTickInterval = 10 * time.Millisecond etcdHeartbeatMinTicks = 1 etcdElectionMinTicks = 2 - // etcdMaxSizePerMsg caps bytes per MsgApp. 4 MiB reduces MsgApp count - // per committed byte under small-entry KV workloads, cutting dispatcher - // wake-ups on the leader and recv syscalls on the follower. Operators - // can override via ELASTICKV_RAFT_MAX_SIZE_PER_MSG at runtime. - etcdMaxSizePerMsg = 4 << 20 + // etcdMaxSizePerMsg caps bytes per MsgApp. 2 MiB (double etcd/raft's + // 1 MiB upstream default) reduces MsgApp count per committed byte + // under small-entry KV workloads, cutting dispatcher wake-ups on the + // leader and recv syscalls on the follower, while keeping the + // per-peer worst-case buffered memory (MaxInflightMsg × MaxSizePerMsg + // = 512 × 2 MiB = 1 GiB) inside the RAM envelope of typical + // elastickv nodes (4–16 GiB). Operators can override via + // ELASTICKV_RAFT_MAX_SIZE_PER_MSG at runtime. + etcdMaxSizePerMsg = 2 << 20 // etcdMaxInflightMsg caps in-flight MsgApps per peer. 
512 gives a 2x // safety margin over the pre-#529 default of 256 to absorb short CPU // bursts, without letting the per-peer worst-case buffered memory - // (MaxInflightMsg × MaxSizePerMsg) grow beyond 2 GiB on the current - // 4 MiB MaxSizePerMsg. Operators with wide-bandwidth LAN clusters and - // headroom can raise this via ELASTICKV_RAFT_MAX_INFLIGHT_MSGS at - // runtime; operators with tighter memory budgets can lower the byte - // cap via ELASTICKV_RAFT_MAX_SIZE_PER_MSG. + // (MaxInflightMsg × MaxSizePerMsg) grow beyond 1 GiB on the current + // 2 MiB MaxSizePerMsg — i.e. 4 GiB on a 5-node leader with 4 + // followers. Operators with wide-bandwidth LAN clusters and headroom + // can raise this via ELASTICKV_RAFT_MAX_INFLIGHT_MSGS at runtime; + // operators with tighter memory budgets can lower the byte cap via + // ELASTICKV_RAFT_MAX_SIZE_PER_MSG. etcdMaxInflightMsg = 512 ) From b5c5298a21f6aafe99776aa2e9a9223d321d08bb Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Fri, 24 Apr 2026 02:11:46 +0900 Subject: [PATCH 07/11] docs(raft): sync OpenConfig default annotations with actual values --- internal/raftengine/etcd/engine.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index 189ab200..a16f8fdf 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -190,9 +190,9 @@ type OpenConfig struct { HeartbeatTick int StateMachine StateMachine // MaxSizePerMsg caps the byte size of a single MsgApp payload (Raft-level - // flow control). Default: 4 MiB. Larger values amortise more entries per - // MsgApp under small-entry workloads; smaller values tighten worst-case - // memory. Operators can override at runtime via + // flow control). Default: 2 MiB (see defaultMaxSizePerMsg). Larger values + // amortise more entries per MsgApp under small-entry workloads; smaller + // values tighten worst-case memory. 
Operators can override at runtime via // ELASTICKV_RAFT_MAX_SIZE_PER_MSG (integer byte count) without a // rebuild; the env var takes precedence over the caller-supplied value. MaxSizePerMsg uint64 @@ -200,10 +200,10 @@ type OpenConfig struct { // per peer before waiting for an acknowledgement (Raft-level flow control). // It also sets the per-peer dispatch channel capacity, so total buffered // memory is bounded by O(numPeers * MaxInflightMsg * avgMsgSize). - // Default: 1024. Increase for deeper pipelining on high-bandwidth links; - // lower in memory-constrained clusters. Operators can override at - // runtime via ELASTICKV_RAFT_MAX_INFLIGHT_MSGS without a rebuild; the - // env var takes precedence over the caller-supplied value. + // Default: 512 (see defaultMaxInflightMsg). Increase for deeper pipelining + // on high-bandwidth links; lower in memory-constrained clusters. Operators + // can override at runtime via ELASTICKV_RAFT_MAX_INFLIGHT_MSGS without a + // rebuild; the env var takes precedence over the caller-supplied value. MaxInflightMsg int } From d60cdcffb22cf3c36662e3f0cc91e229d90e2ef9 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 25 Apr 2026 06:13:51 +0900 Subject: [PATCH 08/11] fix(raft): cap MaxInflight/MaxSizePerMsg env overrides + test hygiene Codex P2s on the env-tuning knobs: maxInflightMsgFromEnv accepted any positive integer and maxSizePerMsgFromEnv accepted anything >= 1 KiB, so an operator typo (e.g. ELASTICKV_RAFT_MAX_INFLIGHT_MSGS=100000000) could either allocate multi-GB of channel memory at Open() and crash the process, or force MsgApp payloads larger than the gRPC transport budget (internal.GRPCMaxMessageBytes = 64 MiB). Both are now clamped to the compiled-in default with a warning. - maxMaxInflightMsg (8192) gates the inflight env override. stepCh, dispatchReportCh, and every per-peer dispatch queue size from this value, so the upper bound is load-bearing for startup memory. 
- maxMaxSizePerMsg (64 MiB) equals GRPCMaxMessageBytes and is pinned by a new test so a transport-budget change that does not update the Raft cap fails CI instead of silently allowing unsendable payloads. - Clamp boundary tests (at cap and cap+1) catch off-by-one regressions. Also addresses CodeRabbit minors flagged on the PR: - Clear ELASTICKV_RAFT_MAX_SIZE_PER_MSG in TestOpen_InboundChannels* to avoid inheriting ambient shell env from the operator running the suite locally. - Replace the stale "1024" inflight-cap comment in Open() (now 512). The upper-bound fallback matches the existing lower-bound fallback policy: return (default, true) so normalizeLimitConfig actually applies the compiled-in default the warning log promises, rather than letting a caller-supplied value silently win. --- internal/raftengine/etcd/engine.go | 71 ++++++++++++++++++++----- internal/raftengine/etcd/engine_test.go | 70 ++++++++++++++++++++++++ 2 files changed, 128 insertions(+), 13 deletions(-) diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index a16f8fdf..24a1a789 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -100,6 +100,29 @@ const ( // of the knob; fall back to the default rather than rejecting so that a // fat-fingered operator doesn't take out the engine. minMaxSizePerMsg uint64 = 1 << 10 + // maxMaxInflightMsg caps the environment override for + // MaxInflightMsg. Open() uses the resolved value to allocate + // stepCh, dispatchReportCh, and every per-peer dispatch queue, so + // a fat-fingered ELASTICKV_RAFT_MAX_INFLIGHT_MSGS=1e8 triggers + // multi-GB channel allocations and crashes the process before the + // node even becomes healthy. 8192 is ~16× the compiled-in default + // (512) — plenty of headroom for pipelining experiments, and well + // below the point where channel allocation alone would OOM a + // 32 GiB runner. 
Values above this clamp back to the default with + // a warning (same policy as sub-1 values) so a misconfigured + // operator never hard-breaks startup. + maxMaxInflightMsg = 8192 + // maxMaxSizePerMsg caps the environment override for + // MaxSizePerMsg at the Raft transport's per-message budget. The + // server- and dial-side gRPC options (see internal.GRPCMaxMessageBytes) + // reject frames larger than 64 MiB, so a MaxSizePerMsg above that + // makes Raft emit MsgApp payloads the transport physically cannot + // carry, producing repeated send failures / unreachable reports + // under large batches. Keeping this equal to GRPCMaxMessageBytes + // makes the transport budget the single source of truth — raise + // BOTH together, never this one alone. Values above this clamp + // back to the default with a warning. + maxMaxSizePerMsg uint64 = 64 << 20 // defaultHeartbeatBufPerPeer is the capacity of the priority dispatch channel. // It carries low-frequency control traffic: heartbeats, votes, read-index, // leader-transfer, and their corresponding response messages @@ -496,9 +519,10 @@ func Open(ctx context.Context, cfg OpenConfig) (*Engine, error) { // that operators raising ELASTICKV_RAFT_MAX_INFLIGHT_MSGS above // the default actually get the extra buffering they asked for. // Using defaultMaxInflightMsg here would silently cap the - // channel at 1024 even when the Raft layer has been told to - // keep 2048 in flight, re-triggering errStepQueueFull under - // the exact bursty conditions this knob is meant to absorb. + // channel at 512 (the current compiled-in default) even when + // the Raft layer has been told to keep 2048 in flight, + // re-triggering errStepQueueFull under the exact bursty + // conditions this knob is meant to absorb. 
stepCh: make(chan raftpb.Message, inboundChannelCap(prepared.cfg.MaxInflightMsg)), dispatchReportCh: make(chan dispatchReport, inboundChannelCap(prepared.cfg.MaxInflightMsg)), closeCh: make(chan struct{}), @@ -2938,14 +2962,21 @@ func snapshotEveryFromEnv() uint64 { } // maxInflightMsgFromEnv parses ELASTICKV_RAFT_MAX_INFLIGHT_MSGS. Returns -// (value, true) when the env var is set to a valid positive integer. -// Returns (0, false) when the var is unset so the caller can keep the -// existing cfg.MaxInflightMsg (which normalizeLimitConfig has already -// defaulted to defaultMaxInflightMsg). Invalid values (non-numeric, -// negative, zero) are logged at warn level and return +// (value, true) when the env var is set to a valid integer in +// [1, maxMaxInflightMsg]. Returns (0, false) when the var is unset so +// the caller can keep the existing cfg.MaxInflightMsg (which +// normalizeLimitConfig has already defaulted to defaultMaxInflightMsg). +// Invalid values (non-numeric, negative, zero, or above +// maxMaxInflightMsg) are logged at warn level and return // (defaultMaxInflightMsg, true) so the engine actually applies the // compiled-in default the log message promises — otherwise a malformed // env var would silently let an unrelated caller-supplied value win. +// The upper cap is load-bearing: Open() allocates stepCh, +// dispatchReportCh, and every per-peer dispatch queue from this +// value, so a fat-fingered `ELASTICKV_RAFT_MAX_INFLIGHT_MSGS=100000000` +// would try to allocate multi-GB of channel memory before the node +// becomes healthy. Clamping to the default preserves operator intent +// ("I asked for a larger pipeline") far better than crashing startup. 
func maxInflightMsgFromEnv() (int, bool) { v := strings.TrimSpace(os.Getenv(maxInflightMsgEnvVar)) if v == "" { @@ -2957,17 +2988,26 @@ func maxInflightMsgFromEnv() (int, bool) { "value", v, "default", defaultMaxInflightMsg) return defaultMaxInflightMsg, true } + if n > maxMaxInflightMsg { + slog.Warn("ELASTICKV_RAFT_MAX_INFLIGHT_MSGS exceeds safe cap; using default", + "value", v, "max", maxMaxInflightMsg, "default", defaultMaxInflightMsg) + return defaultMaxInflightMsg, true + } return n, true } // maxSizePerMsgFromEnv parses ELASTICKV_RAFT_MAX_SIZE_PER_MSG as a plain // integer byte count. Returns (value, true) when the env var is set to a -// valid integer >= minMaxSizePerMsg (1 KiB). Returns (0, false) when the -// var is unset so normalizeLimitConfig can keep its earlier default. -// Invalid or too-small values fall back to the compiled-in default with -// a warning and return (defaultMaxSizePerMsg, true) so the override +// valid integer in [minMaxSizePerMsg, maxMaxSizePerMsg] +// (1 KiB floor; the ceiling tracks the transport budget). Returns +// (0, false) when the var is unset so +// normalizeLimitConfig can keep its earlier default. Invalid, too-small, +// or too-large values fall back to the compiled-in default with a +// warning and return (defaultMaxSizePerMsg, true) so the override // actually applies the default the warning promises; a sub-KiB cap -// would make MsgApp batching degenerate. +// would make MsgApp batching degenerate, and a value above the gRPC +// transport's GRPCMaxMessageBytes budget would make Raft emit payloads +// the transport cannot carry, causing repeated send failures under +// large batches. 
func maxSizePerMsgFromEnv() (uint64, bool) { v := strings.TrimSpace(os.Getenv(maxSizePerMsgEnvVar)) if v == "" { @@ -2979,6 +3019,11 @@ func maxSizePerMsgFromEnv() (uint64, bool) { "value", v, "min", minMaxSizePerMsg, "default", uint64(defaultMaxSizePerMsg)) return uint64(defaultMaxSizePerMsg), true } + if n > maxMaxSizePerMsg { + slog.Warn("ELASTICKV_RAFT_MAX_SIZE_PER_MSG exceeds transport budget; using default", + "value", v, "max", maxMaxSizePerMsg, "default", uint64(defaultMaxSizePerMsg)) + return uint64(defaultMaxSizePerMsg), true + } return n, true } diff --git a/internal/raftengine/etcd/engine_test.go b/internal/raftengine/etcd/engine_test.go index 1cc98ab6..21903882 100644 --- a/internal/raftengine/etcd/engine_test.go +++ b/internal/raftengine/etcd/engine_test.go @@ -1910,6 +1910,67 @@ func TestMaxSizePerMsgFromEnv_FallsBackOnInvalid(t *testing.T) { } } +// TestMaxInflightMsgFromEnv_ClampsAboveCap pins the upper-bound safety +// behaviour: a value above maxMaxInflightMsg is refused (it would +// trigger multi-GB channel allocations at Open() and crash the process +// before the node becomes healthy) and the compiled-in default is +// surfaced with ok=true so normalizeLimitConfig actually applies it. +func TestMaxInflightMsgFromEnv_ClampsAboveCap(t *testing.T) { + cases := []string{ + strconv.Itoa(maxMaxInflightMsg + 1), + "100000000", // fat-fingered value from the Codex P2 report + } + for _, v := range cases { + t.Setenv(maxInflightMsgEnvVar, v) + n, ok := maxInflightMsgFromEnv() + require.Truef(t, ok, "env=%q", v) + require.Equalf(t, defaultMaxInflightMsg, n, "env=%q", v) + } +} + +// TestMaxInflightMsgFromEnv_AcceptsAtCap pins the boundary: exactly +// maxMaxInflightMsg must parse through unchanged. Catches an off-by-one +// regression that would silently clamp an operator-tuned value. 
+func TestMaxInflightMsgFromEnv_AcceptsAtCap(t *testing.T) { + t.Setenv(maxInflightMsgEnvVar, strconv.Itoa(maxMaxInflightMsg)) + n, ok := maxInflightMsgFromEnv() + require.True(t, ok) + require.Equal(t, maxMaxInflightMsg, n) +} + +// TestMaxSizePerMsgFromEnv_ClampsAboveTransportBudget pins that a +// MaxSizePerMsg above the gRPC transport's message-size budget +// (GRPCMaxMessageBytes) is refused. Without this clamp the override +// would make Raft emit MsgApp frames the transport physically cannot +// carry, producing repeated send failures under large batches. +func TestMaxSizePerMsgFromEnv_ClampsAboveTransportBudget(t *testing.T) { + over := strconv.FormatUint(maxMaxSizePerMsg+1, 10) + t.Setenv(maxSizePerMsgEnvVar, over) + n, ok := maxSizePerMsgFromEnv() + require.True(t, ok) + require.Equal(t, uint64(defaultMaxSizePerMsg), n) +} + +// TestMaxSizePerMsgFromEnv_AcceptsAtCap covers the boundary symmetrically +// with the inflight test above. +func TestMaxSizePerMsgFromEnv_AcceptsAtCap(t *testing.T) { + t.Setenv(maxSizePerMsgEnvVar, strconv.FormatUint(maxMaxSizePerMsg, 10)) + n, ok := maxSizePerMsgFromEnv() + require.True(t, ok) + require.Equal(t, maxMaxSizePerMsg, n) +} + +// TestMaxMaxSizePerMsg_MatchesTransportBudget pins the invariant that +// the MaxSizePerMsg upper cap equals GRPCMaxMessageBytes. If someone +// raises the transport budget without updating the Raft cap (or vice +// versa), MaxSizePerMsg overrides would silently allow values that +// cannot traverse the wire. Changing this constant MUST be a +// deliberate, paired action. 
+func TestMaxMaxSizePerMsg_MatchesTransportBudget(t *testing.T) { + require.Equal(t, uint64(internalutil.GRPCMaxMessageBytes), maxMaxSizePerMsg, + "maxMaxSizePerMsg must track internal.GRPCMaxMessageBytes — raise both together") +} + // TestNormalizeLimitConfig_DefaultsWhenUnset pins the production defaults // that reach raft.Config when neither the caller nor the operator has // overridden them: 512 inflight msgs and 2 MiB per msg. The combination @@ -1989,6 +2050,12 @@ func TestInboundChannelCap(t *testing.T) { // tuning knob. func TestOpen_InboundChannelsHonourMaxInflightEnv(t *testing.T) { t.Setenv(maxInflightMsgEnvVar, "2048") + // Open() normalizes BOTH raft limit env vars; leaving the size var + // unset here would let an ambient ELASTICKV_RAFT_MAX_SIZE_PER_MSG + // in the shell the test runs in influence config resolution and + // confuse an unrelated failure diagnosis. Pin it to "" so the size + // path always falls through to the caller's OpenConfig value. + t.Setenv(maxSizePerMsgEnvVar, "") fsm := &testStateMachine{} engine, err := Open(context.Background(), OpenConfig{ NodeID: 1, @@ -2013,6 +2080,9 @@ func TestOpen_InboundChannelsHonourMaxInflightEnv(t *testing.T) { // current production value. func TestOpen_InboundChannelsDefaultCap(t *testing.T) { t.Setenv(maxInflightMsgEnvVar, "") + // See TestOpen_InboundChannelsHonourMaxInflightEnv for why the size + // env var is cleared here too. + t.Setenv(maxSizePerMsgEnvVar, "") fsm := &testStateMachine{} engine, err := Open(context.Background(), OpenConfig{ NodeID: 1, From 639ef081b41db3ed6b185bf09e9bca73a86a426a Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 25 Apr 2026 06:33:46 +0900 Subject: [PATCH 09/11] fix(raft): reserve envelope headroom in MaxSizePerMsg transport cap Codex P2 follow-up on d60cdcff: etcd-raft MaxSizePerMsg caps the entries-data size per MsgApp, not the full serialized raftpb.Message envelope. 
The envelope adds Term/Index/From/To/LogTerm/Commit plus per-entry framing, so a batch that exactly hits MaxSizePerMsg serializes to a frame slightly larger than MaxSizePerMsg. With the previous cap (maxMaxSizePerMsg == GRPCMaxMessageBytes == 64 MiB), a full-sized batch could tip the frame a few KiB past the transport limit and replication would fail with ResourceExhausted even though the env-var value passed our validation. Clamp the override at GRPCMaxMessageBytes - raftMessageEnvelopeHeadroom (1 MiB) so the accepted set is strictly smaller than the transport budget. 1 MiB is an order of magnitude over the observed wire overhead for 8192-entry batches (tens of KiB), leaving the transport budget as the single source of truth while making the env override a safe bound under real batching. Renamed the pinning test to TestMaxMaxSizePerMsg_ReservesEnvelopeHeadroom and tightened it to assert strict-less-than GRPCMaxMessageBytes. --- internal/raftengine/etcd/engine.go | 36 ++++++++++++++++++------- internal/raftengine/etcd/engine_test.go | 25 ++++++++++------- 2 files changed, 43 insertions(+), 18 deletions(-) diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index 24a1a789..457a2469 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -16,6 +16,7 @@ import ( "sync/atomic" "time" + internalutil "github.com/bootjp/elastickv/internal" "github.com/bootjp/elastickv/internal/raftengine" "github.com/cockroachdb/errors" etcdstorage "go.etcd.io/etcd/server/v3/storage" @@ -112,17 +113,34 @@ const ( // a warning (same policy as sub-1 values) so a misconfigured // operator never hard-breaks startup. maxMaxInflightMsg = 8192 + // raftMessageEnvelopeHeadroom is the safety margin subtracted from + // GRPCMaxMessageBytes when computing maxMaxSizePerMsg. etcd/raft's + // MaxSizePerMsg caps the *entries data* per MsgApp, not the + // serialized raftpb.Message envelope that actually crosses the + // transport. 
The envelope adds Term/Index/From/To/LogTerm/Commit + // plus per-entry framing around every Entry, so a batch that + // exactly hits MaxSizePerMsg serializes to a frame slightly + // larger than MaxSizePerMsg. If MaxSizePerMsg == GRPCMaxMessageBytes, + // a full-sized batch can tip the frame a few KiB past the + // transport limit and replication fails with ResourceExhausted + // even though the env-var value passed validation. + // + // 1 MiB is an order-of-magnitude cushion over the observed wire + // overhead for 8192-entry batches at MaxSizePerMsg (~tens of KiB). + // Reserving it here keeps the transport budget as the single + // source of truth while making the env override a *safe* bound. + raftMessageEnvelopeHeadroom uint64 = 1 << 20 // maxMaxSizePerMsg caps the environment override for - // MaxSizePerMsg at the Raft transport's per-message budget. The + // MaxSizePerMsg at the Raft transport's per-message budget MINUS + // the envelope headroom (see raftMessageEnvelopeHeadroom). The // server- and dial-side gRPC options (see internal.GRPCMaxMessageBytes) - // reject frames larger than 64 MiB, so a MaxSizePerMsg above that - // makes Raft emit MsgApp payloads the transport physically cannot - // carry, producing repeated send failures / unreachable reports - // under large batches. Keeping this equal to GRPCMaxMessageBytes - // makes the transport budget the single source of truth — raise - // BOTH together, never this one alone. Values above this clamp - // back to the default with a warning. - maxMaxSizePerMsg uint64 = 64 << 20 + // reject frames larger than 64 MiB, so allowing MaxSizePerMsg to + // reach the transport limit exactly can still make Raft emit + // MsgApp frames the transport cannot carry once envelope overhead + // is added. Clamping to (budget - headroom) makes the override a + // *safe* bound under real batching. Values above this clamp back + // to the default with a warning. 
+ maxMaxSizePerMsg = uint64(internalutil.GRPCMaxMessageBytes) - raftMessageEnvelopeHeadroom // defaultHeartbeatBufPerPeer is the capacity of the priority dispatch channel. // It carries low-frequency control traffic: heartbeats, votes, read-index, // leader-transfer, and their corresponding response messages diff --git a/internal/raftengine/etcd/engine_test.go b/internal/raftengine/etcd/engine_test.go index 21903882..11f65523 100644 --- a/internal/raftengine/etcd/engine_test.go +++ b/internal/raftengine/etcd/engine_test.go @@ -1960,15 +1960,22 @@ func TestMaxSizePerMsgFromEnv_AcceptsAtCap(t *testing.T) { require.Equal(t, maxMaxSizePerMsg, n) } -// TestMaxMaxSizePerMsg_MatchesTransportBudget pins the invariant that -// the MaxSizePerMsg upper cap equals GRPCMaxMessageBytes. If someone -// raises the transport budget without updating the Raft cap (or vice -// versa), MaxSizePerMsg overrides would silently allow values that -// cannot traverse the wire. Changing this constant MUST be a -// deliberate, paired action. -func TestMaxMaxSizePerMsg_MatchesTransportBudget(t *testing.T) { - require.Equal(t, uint64(internalutil.GRPCMaxMessageBytes), maxMaxSizePerMsg, - "maxMaxSizePerMsg must track internal.GRPCMaxMessageBytes — raise both together") +// TestMaxMaxSizePerMsg_ReservesEnvelopeHeadroom pins the invariant that +// the MaxSizePerMsg upper cap is strictly less than GRPCMaxMessageBytes, +// by exactly raftMessageEnvelopeHeadroom bytes. etcd/raft's +// MaxSizePerMsg caps the entries-data size per MsgApp; the full +// serialized raftpb.Message envelope adds Term/Index/From/To plus +// per-entry framing, so a batch that exactly hits MaxSizePerMsg +// serializes to a frame a few KiB larger than MaxSizePerMsg. If the +// cap matched the transport budget exactly, a full-sized batch could +// overflow the transport and fail replication with ResourceExhausted. +// Raising GRPCMaxMessageBytes without updating this cap (or vice +// versa) MUST be a deliberate, paired action. 
+func TestMaxMaxSizePerMsg_ReservesEnvelopeHeadroom(t *testing.T) { + require.Equal(t, uint64(internalutil.GRPCMaxMessageBytes)-raftMessageEnvelopeHeadroom, maxMaxSizePerMsg, + "maxMaxSizePerMsg must be GRPCMaxMessageBytes - raftMessageEnvelopeHeadroom") + require.Less(t, maxMaxSizePerMsg, uint64(internalutil.GRPCMaxMessageBytes), + "maxMaxSizePerMsg must leave strict headroom below the transport budget") } // TestNormalizeLimitConfig_DefaultsWhenUnset pins the production defaults From 8f72769e0190307f686ce4d70eab4cb13eb26673 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 25 Apr 2026 06:47:35 +0900 Subject: [PATCH 10/11] fix(raft): clamp caller-supplied MaxInflight/MaxSize in normalizeLimitConfig MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex P2 follow-up on 639ef081: env overrides go through the maxMaxInflightMsg / maxMaxSizePerMsg upper bounds, but caller-supplied OpenConfig values (programmatic configuration via the Go API) skipped the clamp entirely. With the prior code a fat-fingered OpenConfig{MaxInflightMsg: 100_000_000} would have bypassed the env guard and triggered multi-GB stepCh / dispatchReportCh allocations at Open() — defeating the whole point of the upper bounds. normalizeLimitConfig now applies the same clamp policy to caller values as to env values: exceeding the upper bound logs a warning and falls back to the compiled-in default. The env override step still runs first (operator wins), so a healthy operator-tuned env value is not affected. Three new tests cover: - TestNormalizeLimitConfig_ClampsCallerOverlimitInflight: 100M is refused, default is applied. - TestNormalizeLimitConfig_ClampsCallerOverlimitSize: symmetric. - TestNormalizeLimitConfig_AcceptsCallerAtCap: boundary value (exactly maxMax*) passes through unchanged. Re the other Codex P2 ("derive cap from worst-case protobuf overhead"): declined silently. 
etcd-raft limitSize already enforces the cap inclusive of per-entry framing (it sums ent.Size() which is the encoded protobuf size, not just payload bytes). The MsgApp envelope adds only the outer Message fields (Term/Index/From/To/etc.) which are bounded by ~hundreds of bytes, well within the 1 MiB headroom already reserved. The "tiny entries add many MiB of framing" claim double-counts the per-entry overhead etcd-raft already includes. --- internal/raftengine/etcd/engine.go | 18 ++++++++++ internal/raftengine/etcd/engine_test.go | 44 +++++++++++++++++++++++++ 2 files changed, 62 insertions(+) diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index 457a2469..f92f8865 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -2718,6 +2718,24 @@ func normalizeLimitConfig(cfg OpenConfig) OpenConfig { if v, ok := maxSizePerMsgFromEnv(); ok { cfg.MaxSizePerMsg = v } + // Clamp the resolved values against the same upper bounds we use + // for env overrides. Without this, a programmatic caller passing + // a fat-fingered OpenConfig{MaxInflightMsg: 100_000_000} would + // bypass the env-side guard and trigger multi-GB channel + // allocations at Open() — defeating the whole point of + // maxMaxInflightMsg / maxMaxSizePerMsg. Symmetric to the env + // fallback policy: clamp to the compiled-in default, log a + // warning so the misconfiguration is auditable. 
+ if cfg.MaxInflightMsg > maxMaxInflightMsg { + slog.Warn("OpenConfig.MaxInflightMsg exceeds safe cap; clamping to default", + "value", cfg.MaxInflightMsg, "max", maxMaxInflightMsg, "default", defaultMaxInflightMsg) + cfg.MaxInflightMsg = defaultMaxInflightMsg + } + if cfg.MaxSizePerMsg > maxMaxSizePerMsg { + slog.Warn("OpenConfig.MaxSizePerMsg exceeds transport budget; clamping to default", + "value", cfg.MaxSizePerMsg, "max", maxMaxSizePerMsg, "default", uint64(defaultMaxSizePerMsg)) + cfg.MaxSizePerMsg = uint64(defaultMaxSizePerMsg) + } slog.Info("etcd raft engine: message size limits", "max_inflight_msgs", cfg.MaxInflightMsg, "max_size_per_msg_bytes", cfg.MaxSizePerMsg, diff --git a/internal/raftengine/etcd/engine_test.go b/internal/raftengine/etcd/engine_test.go index 11f65523..e60150eb 100644 --- a/internal/raftengine/etcd/engine_test.go +++ b/internal/raftengine/etcd/engine_test.go @@ -1978,6 +1978,50 @@ func TestMaxMaxSizePerMsg_ReservesEnvelopeHeadroom(t *testing.T) { "maxMaxSizePerMsg must leave strict headroom below the transport budget") } +// TestNormalizeLimitConfig_ClampsCallerOverlimitInflight pins that a +// programmatic caller passing OpenConfig.MaxInflightMsg above +// maxMaxInflightMsg is clamped to defaultMaxInflightMsg, mirroring the +// env-side guard. Without this, Open()'s stepCh / dispatchReportCh / +// per-peer dispatch queue allocations would consume multi-GB on a +// fat-fingered programmatic config and crash the process before the +// node became healthy. 
+func TestNormalizeLimitConfig_ClampsCallerOverlimitInflight(t *testing.T) { + t.Setenv(maxInflightMsgEnvVar, "") + t.Setenv(maxSizePerMsgEnvVar, "") + got := normalizeLimitConfig(OpenConfig{ + MaxInflightMsg: 100_000_000, // fat-fingered value from the Codex P2 report + }) + require.Equal(t, defaultMaxInflightMsg, got.MaxInflightMsg) +} + +// TestNormalizeLimitConfig_ClampsCallerOverlimitSize is the symmetric +// pin for caller-supplied OpenConfig.MaxSizePerMsg above the transport +// envelope budget. +func TestNormalizeLimitConfig_ClampsCallerOverlimitSize(t *testing.T) { + t.Setenv(maxInflightMsgEnvVar, "") + t.Setenv(maxSizePerMsgEnvVar, "") + got := normalizeLimitConfig(OpenConfig{ + MaxSizePerMsg: maxMaxSizePerMsg + 1, + }) + require.Equal(t, uint64(defaultMaxSizePerMsg), got.MaxSizePerMsg) +} + +// TestNormalizeLimitConfig_AcceptsCallerAtCap is the boundary check: +// exactly maxMaxInflightMsg / maxMaxSizePerMsg from a caller must +// pass through unchanged. Catches an off-by-one regression that +// would silently clamp a legitimate operator-tuned programmatic +// config. +func TestNormalizeLimitConfig_AcceptsCallerAtCap(t *testing.T) { + t.Setenv(maxInflightMsgEnvVar, "") + t.Setenv(maxSizePerMsgEnvVar, "") + got := normalizeLimitConfig(OpenConfig{ + MaxInflightMsg: maxMaxInflightMsg, + MaxSizePerMsg: maxMaxSizePerMsg, + }) + require.Equal(t, maxMaxInflightMsg, got.MaxInflightMsg) + require.Equal(t, maxMaxSizePerMsg, got.MaxSizePerMsg) +} + // TestNormalizeLimitConfig_DefaultsWhenUnset pins the production defaults // that reach raft.Config when neither the caller nor the operator has // overridden them: 512 inflight msgs and 2 MiB per msg. 
The combination
From 11cb09bf41ea6228b7b08c00043aa1ab5ccc4558 Mon Sep 17 00:00:00 2001
From: "Yoshiaki Ueda (bootjp)"
Date: Sat, 25 Apr 2026 07:00:37 +0900
Subject: [PATCH 11/11] fix(raft): halve MaxSizePerMsg cap to absorb tiny-entry
 framing growth
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Codex P2 (re-flagged): the prior cap (GRPCMaxMessageBytes - 1 MiB) was
based on a static envelope-overhead estimate, but etcd-raft's limitSize
counts only Entry.Size() per entry. The serialized raftpb.Message that
crosses the wire ALSO carries per-entry framing inside its Entries
repeated field — `1 byte (field tag) + sovRaft(len) + Entry payload`
per entry — which limitSize does not account for.

For minimal entries (Term + Index + Type tags+varints = ~4 B, empty
Data), per-entry framing is 1 + sovRaft(4) = 2 B. So tiny-entry
workloads can encode at up to (4+2)/4 = 1.5x the entries-data budget
on the wire. With the prior 1 MiB headroom (a fixed subtraction), a
config of MaxSizePerMsg = GRPCMaxMessageBytes - 1 MiB serializing tiny
entries could still ResourceExhaust gRPC's 64 MiB transport limit.

Replaced the fixed headroom with a divisor: maxMaxSizePerMsg =
GRPCMaxMessageBytes / 2. This admits the worst-case 1.5x growth
(32 MiB entries → 48 MiB on wire ≪ 64 MiB transport) and uses the
smallest integer divisor that keeps the env override safe under any
entry-size distribution.

Re-derived TestMaxMaxSizePerMsg_ReservesEnvelopeHeadroom against the
new divisor; added TestMaxMaxSizePerMsg_AbsorbsTinyEntryFraming as a
quantitative pin so a future change that lowers the divisor below 2
(which would re-introduce the tiny-entry failure mode) fails CI.

The earlier "1 MiB headroom" reasoning was incomplete — corrected here.
--- internal/raftengine/etcd/engine.go | 54 ++++++++++++------------- internal/raftengine/etcd/engine_test.go | 44 ++++++++++++++------ 2 files changed, 59 insertions(+), 39 deletions(-) diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index f92f8865..644f2d4b 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -113,34 +113,34 @@ const ( // a warning (same policy as sub-1 values) so a misconfigured // operator never hard-breaks startup. maxMaxInflightMsg = 8192 - // raftMessageEnvelopeHeadroom is the safety margin subtracted from - // GRPCMaxMessageBytes when computing maxMaxSizePerMsg. etcd/raft's - // MaxSizePerMsg caps the *entries data* per MsgApp, not the - // serialized raftpb.Message envelope that actually crosses the - // transport. The envelope adds Term/Index/From/To/LogTerm/Commit - // plus per-entry framing around every Entry, so a batch that - // exactly hits MaxSizePerMsg serializes to a frame slightly - // larger than MaxSizePerMsg. If MaxSizePerMsg == GRPCMaxMessageBytes, - // a full-sized batch can tip the frame a few KiB past the - // transport limit and replication fails with ResourceExhausted - // even though the env-var value passed validation. - // - // 1 MiB is an order-of-magnitude cushion over the observed wire - // overhead for 8192-entry batches at MaxSizePerMsg (~tens of KiB). - // Reserving it here keeps the transport budget as the single - // source of truth while making the env override a *safe* bound. - raftMessageEnvelopeHeadroom uint64 = 1 << 20 // maxMaxSizePerMsg caps the environment override for - // MaxSizePerMsg at the Raft transport's per-message budget MINUS - // the envelope headroom (see raftMessageEnvelopeHeadroom). 
The - // server- and dial-side gRPC options (see internal.GRPCMaxMessageBytes) - // reject frames larger than 64 MiB, so allowing MaxSizePerMsg to - // reach the transport limit exactly can still make Raft emit - // MsgApp frames the transport cannot carry once envelope overhead - // is added. Clamping to (budget - headroom) makes the override a - // *safe* bound under real batching. Values above this clamp back - // to the default with a warning. - maxMaxSizePerMsg = uint64(internalutil.GRPCMaxMessageBytes) - raftMessageEnvelopeHeadroom + // MaxSizePerMsg at HALF of the Raft transport's per-message budget + // (internal.GRPCMaxMessageBytes). Half — not an arbitrary subtraction + // — because etcd-raft's limitSize counts only Entry.Size() per entry + // when deciding whether to extend a batch, but the serialized + // raftpb.Message that actually crosses the wire ALSO carries + // per-entry framing inside its Entries repeated field + // (`1 + len(varint) + entry`), plus the outer Message envelope + // (Term/Index/From/To/LogTerm/Commit/etc.). + // + // Worst-case ratio analysis: a minimal Entry (no Data) has + // Size() ≈ 4 B (Term + Index + Type tags+varints). Its outer- + // Message framing is `1 + sovRaft(4)` = 2 B. So tiny-entry + // batches encode at up to 1.5× the entries-data budget on the + // wire. Realistic data-bearing entries (e.g. 100 B+ payloads) + // shrink this ratio to <1.05×, but the cap MUST cover the + // worst case to make the env override a safe bound rather than + // pushing the failure mode out to "ResourceExhausted under + // tiny-entry workloads". + // + // Halving the transport budget admits the worst-case 1.5× growth + // (32 MiB entries-data → 48 MiB on wire ≪ 64 MiB transport limit) + // and leaves ample headroom for realistic batching. Operators + // who genuinely need a higher cap should raise GRPCMaxMessageBytes + // in a deliberate, paired action and adjust this divisor with the + // same worst-case analysis. 
+ maxMaxSizePerMsgDivisor uint64 = 2 + maxMaxSizePerMsg = uint64(internalutil.GRPCMaxMessageBytes) / maxMaxSizePerMsgDivisor // defaultHeartbeatBufPerPeer is the capacity of the priority dispatch channel. // It carries low-frequency control traffic: heartbeats, votes, read-index, // leader-transfer, and their corresponding response messages diff --git a/internal/raftengine/etcd/engine_test.go b/internal/raftengine/etcd/engine_test.go index e60150eb..aa3478c0 100644 --- a/internal/raftengine/etcd/engine_test.go +++ b/internal/raftengine/etcd/engine_test.go @@ -1961,23 +1961,43 @@ func TestMaxSizePerMsgFromEnv_AcceptsAtCap(t *testing.T) { } // TestMaxMaxSizePerMsg_ReservesEnvelopeHeadroom pins the invariant that -// the MaxSizePerMsg upper cap is strictly less than GRPCMaxMessageBytes, -// by exactly raftMessageEnvelopeHeadroom bytes. etcd/raft's -// MaxSizePerMsg caps the entries-data size per MsgApp; the full -// serialized raftpb.Message envelope adds Term/Index/From/To plus -// per-entry framing, so a batch that exactly hits MaxSizePerMsg -// serializes to a frame a few KiB larger than MaxSizePerMsg. If the -// cap matched the transport budget exactly, a full-sized batch could -// overflow the transport and fail replication with ResourceExhausted. -// Raising GRPCMaxMessageBytes without updating this cap (or vice -// versa) MUST be a deliberate, paired action. +// the MaxSizePerMsg upper cap is at most half the gRPC transport budget. +// etcd-raft's limitSize counts only Entry.Size() per entry, but the +// serialized raftpb.Message also carries per-entry framing (`1 + len- +// varint + entry`) inside its Entries repeated field. For minimal +// 4-byte entries, the outer framing is 2 bytes per entry, so the wire +// representation can be up to 1.5x the entries-data budget. 
Halving
+// the transport budget covers that worst case (32 MiB entries-data →
+// 48 MiB on wire, well below the 64 MiB transport limit) and uses the
+// smallest integer divisor that keeps the env override safe under
+// tiny-entry workloads. Raising GRPCMaxMessageBytes without
+// re-deriving this divisor MUST be a deliberate, paired action.
 func TestMaxMaxSizePerMsg_ReservesEnvelopeHeadroom(t *testing.T) {
-	require.Equal(t, uint64(internalutil.GRPCMaxMessageBytes)-raftMessageEnvelopeHeadroom, maxMaxSizePerMsg,
-		"maxMaxSizePerMsg must be GRPCMaxMessageBytes - raftMessageEnvelopeHeadroom")
+	require.Equal(t, uint64(internalutil.GRPCMaxMessageBytes)/maxMaxSizePerMsgDivisor, maxMaxSizePerMsg,
+		"maxMaxSizePerMsg must be GRPCMaxMessageBytes / maxMaxSizePerMsgDivisor")
+	require.LessOrEqual(t, maxMaxSizePerMsg, uint64(internalutil.GRPCMaxMessageBytes)/2,
+		"maxMaxSizePerMsg must be at most half the transport budget to absorb worst-case per-entry framing on tiny-entry batches")
 	require.Less(t, maxMaxSizePerMsg, uint64(internalutil.GRPCMaxMessageBytes),
 		"maxMaxSizePerMsg must leave strict headroom below the transport budget")
 }
 
+// TestMaxMaxSizePerMsg_AbsorbsTinyEntryFraming pins the *quantitative*
+// safety claim: even at the worst-case per-entry framing ratio
+// (minimum Entry.Size() = 4 B + 2 B framing = 1.5x growth), a fully
+// packed MsgApp at maxMaxSizePerMsg fits inside GRPCMaxMessageBytes
+// with margin to spare. Catches a future change that lowers
+// maxMaxSizePerMsgDivisor below 2 (which would re-introduce the
+// "tiny-entry workloads can ResourceExhaust gRPC" failure mode Codex
+// flagged).
+func TestMaxMaxSizePerMsg_AbsorbsTinyEntryFraming(t *testing.T) { + const minEntrySize = uint64(4) // Term + Index + Type tags+varints, no Data + const minFraming = uint64(2) // 1 (field tag) + sovRaft(4) = 1 (length varint) + const worstCaseRatio = minEntrySize + minFraming + maxWireBytes := maxMaxSizePerMsg / minEntrySize * worstCaseRatio + require.LessOrEqual(t, maxWireBytes, uint64(internalutil.GRPCMaxMessageBytes), + "worst-case wire size for tiny-entry batches must fit inside the transport budget") +} + // TestNormalizeLimitConfig_ClampsCallerOverlimitInflight pins that a // programmatic caller passing OpenConfig.MaxInflightMsg above // maxMaxInflightMsg is clamped to defaultMaxInflightMsg, mirroring the