-
Notifications
You must be signed in to change notification settings - Fork 2
perf(raft): raise MaxInflightMsgs=1024, MaxSizePerMsg=4MB defaults #593
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
98554e6
bb6ff01
0546484
ca6a415
a248131
4edd02d
b5c5298
d60cdcf
639ef08
8f72769
11cb09b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -16,6 +16,7 @@ import ( | |||||||||||||||||||||||||
| "sync/atomic" | ||||||||||||||||||||||||||
| "time" | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| internalutil "github.com/bootjp/elastickv/internal" | ||||||||||||||||||||||||||
| "github.com/bootjp/elastickv/internal/raftengine" | ||||||||||||||||||||||||||
| "github.com/cockroachdb/errors" | ||||||||||||||||||||||||||
| etcdstorage "go.etcd.io/etcd/server/v3/storage" | ||||||||||||||||||||||||||
|
|
@@ -40,19 +41,106 @@ const ( | |||||||||||||||||||||||||
| // queue. Total buffered memory is bounded by | ||||||||||||||||||||||||||
| // O(numPeers × MaxInflightMsg × avgMsgSize). | ||||||||||||||||||||||||||
| // | ||||||||||||||||||||||||||
| // Raised from 256 → 1024 to absorb short CPU bursts without forcing | ||||||||||||||||||||||||||
| // Raised from 256 → 512 to absorb short CPU bursts without forcing | ||||||||||||||||||||||||||
| // peers to reject with "etcd raft inbound step queue is full". | ||||||||||||||||||||||||||
| // Under production congestion we observed the 256-slot inbound | ||||||||||||||||||||||||||
| // stepCh on followers filling up while their event loop was held | ||||||||||||||||||||||||||
| // up by adapter-side pebble seek storms (PRs #560, #562, #563, | ||||||||||||||||||||||||||
| // #565 removed most of that CPU); 1024 is a 4× safety margin. | ||||||||||||||||||||||||||
| // Note that with the current defaultMaxSizePerMsg of 1 MiB, the | ||||||||||||||||||||||||||
| // true worst-case bound can be much larger (up to roughly 1 GiB | ||||||||||||||||||||||||||
| // per peer if every slot held a max-sized message). In practice, | ||||||||||||||||||||||||||
| // typical MsgApp payloads are far smaller, so expected steady-state | ||||||||||||||||||||||||||
| // memory remains much lower than that worst-case bound. | ||||||||||||||||||||||||||
| defaultMaxInflightMsg = 1024 | ||||||||||||||||||||||||||
| defaultMaxSizePerMsg = 1 << 20 | ||||||||||||||||||||||||||
| // #565 removed most of that CPU); 512 is a 2× safety margin. | ||||||||||||||||||||||||||
| // | ||||||||||||||||||||||||||
| // We intentionally do NOT raise this in lock-step with the 2 MiB | ||||||||||||||||||||||||||
| // defaultMaxSizePerMsg: the two knobs multiply, and 1024 × 2 MiB | ||||||||||||||||||||||||||
| // is a 2 GiB per-peer worst-case product that a bursty multi-peer | ||||||||||||||||||||||||||
| // deployment can plausibly realise under TCP backpressure loss. | ||||||||||||||||||||||||||
| // 512 × 2 MiB halves that to 1 GiB per peer (4 GiB on a 5-node | ||||||||||||||||||||||||||
| // leader with 4 followers), which fits comfortably inside the | ||||||||||||||||||||||||||
| // 4–16 GiB RAM envelope of typical elastickv nodes while still | ||||||||||||||||||||||||||
| // preserving the MsgApp-batching win that motivates raising the | ||||||||||||||||||||||||||
| // byte cap above etcd/raft's 1 MiB upstream default on small-entry | ||||||||||||||||||||||||||
| // workloads. Operators who need deeper pipelines (large clusters | ||||||||||||||||||||||||||
| // with plenty of RAM) can raise this via | ||||||||||||||||||||||||||
| // ELASTICKV_RAFT_MAX_INFLIGHT_MSGS without a rebuild; operators | ||||||||||||||||||||||||||
| // who need a smaller memory ceiling can lower MaxSizePerMsg via | ||||||||||||||||||||||||||
| // ELASTICKV_RAFT_MAX_SIZE_PER_MSG. | ||||||||||||||||||||||||||
| defaultMaxInflightMsg = 512 | ||||||||||||||||||||||||||
| // minInboundChannelCap is the floor applied when sizing the engine's | ||||||||||||||||||||||||||
| // inbound stepCh / dispatchReportCh from the resolved MaxInflightMsg. | ||||||||||||||||||||||||||
| // Even if a (misconfigured) caller drops MaxInflightMsg below this, | ||||||||||||||||||||||||||
| // we keep at least this much buffering so that a single tick burst | ||||||||||||||||||||||||||
| // doesn't trip errStepQueueFull on the inbound side. 256 matches the | ||||||||||||||||||||||||||
| // pre-#529 compiled-in default that was known to be survivable. | ||||||||||||||||||||||||||
| minInboundChannelCap = 256 | ||||||||||||||||||||||||||
| // defaultMaxSizePerMsg caps the byte size of a single MsgApp payload. | ||||||||||||||||||||||||||
| // Set to 2 MiB — double etcd/raft's 1 MiB upstream default — so each | ||||||||||||||||||||||||||
| // MsgApp amortises more entries under small-entry workloads | ||||||||||||||||||||||||||
| // (Redis-style KV, median entry ~500 B; 2 MiB / 500 B ≈ 4000 entries | ||||||||||||||||||||||||||
| // per MsgApp already saturates the per-RPC batching benefit). | ||||||||||||||||||||||||||
| // Fewer MsgApps per committed byte means fewer dispatcher wake-ups | ||||||||||||||||||||||||||
| // on the leader and fewer recv syscalls on the follower; the | ||||||||||||||||||||||||||
| // follower's apply loop also contends less with the read path. | ||||||||||||||||||||||||||
| // | ||||||||||||||||||||||||||
| // Lowered from 4 MiB → 2 MiB in tandem with defaultMaxInflightMsg=512 | ||||||||||||||||||||||||||
| // to cap per-peer worst-case buffered Raft traffic at 1 GiB | ||||||||||||||||||||||||||
| // (512 × 2 MiB), i.e. 4 GiB on a 5-node leader with 4 followers. | ||||||||||||||||||||||||||
| // The previous 4 MiB cap produced a 2 GiB/peer, 8 GiB/leader | ||||||||||||||||||||||||||
| // worst case that was too tight for the 4–16 GiB RAM envelope | ||||||||||||||||||||||||||
| // typical elastickv nodes operate in; the batching win of 4 MiB | ||||||||||||||||||||||||||
| // over 2 MiB is marginal on small-entry workloads. | ||||||||||||||||||||||||||
| defaultMaxSizePerMsg = 2 << 20 | ||||||||||||||||||||||||||
| // maxInflightMsgEnvVar / maxSizePerMsgEnvVar let operators tune the | ||||||||||||||||||||||||||
| // Raft-level flow-control knobs without a rebuild. Parsed once at | ||||||||||||||||||||||||||
| // Open and passed through normalizeLimitConfig; invalid values fall | ||||||||||||||||||||||||||
| // back to the defaults with a warning. MaxSizePerMsg is expressed | ||||||||||||||||||||||||||
| // as an integer byte count for consistency with the other numeric | ||||||||||||||||||||||||||
| // knobs in this package. | ||||||||||||||||||||||||||
| maxInflightMsgEnvVar = "ELASTICKV_RAFT_MAX_INFLIGHT_MSGS" | ||||||||||||||||||||||||||
| maxSizePerMsgEnvVar = "ELASTICKV_RAFT_MAX_SIZE_PER_MSG" | ||||||||||||||||||||||||||
| // minMaxSizePerMsg is the lower bound accepted from the environment | ||||||||||||||||||||||||||
| // override. A payload cap below ~1 KiB makes MsgApp batching | ||||||||||||||||||||||||||
| // degenerate (one entry per message) which defeats the whole point | ||||||||||||||||||||||||||
| // of the knob; fall back to the default rather than rejecting so that a | ||||||||||||||||||||||||||
| // fat-fingered operator doesn't take out the engine. | ||||||||||||||||||||||||||
| minMaxSizePerMsg uint64 = 1 << 10 | ||||||||||||||||||||||||||
| // maxMaxInflightMsg caps the environment override for | ||||||||||||||||||||||||||
| // MaxInflightMsg. Open() uses the resolved value to allocate | ||||||||||||||||||||||||||
| // stepCh, dispatchReportCh, and every per-peer dispatch queue, so | ||||||||||||||||||||||||||
| // a fat-fingered ELASTICKV_RAFT_MAX_INFLIGHT_MSGS=1e8 triggers | ||||||||||||||||||||||||||
| // multi-GB channel allocations and crashes the process before the | ||||||||||||||||||||||||||
| // node even becomes healthy. 8192 is ~16× the compiled-in default | ||||||||||||||||||||||||||
| // (512) — plenty of headroom for pipelining experiments, and well | ||||||||||||||||||||||||||
| // below the point where channel allocation alone would OOM a | ||||||||||||||||||||||||||
| // 32 GiB runner. Values above this clamp back to the default with | ||||||||||||||||||||||||||
| // a warning (same policy as sub-1 values) so a misconfigured | ||||||||||||||||||||||||||
| // operator never hard-breaks startup. | ||||||||||||||||||||||||||
| maxMaxInflightMsg = 8192 | ||||||||||||||||||||||||||
| // maxMaxSizePerMsg caps the environment override for | ||||||||||||||||||||||||||
| // MaxSizePerMsg at HALF of the Raft transport's per-message budget | ||||||||||||||||||||||||||
| // (internal.GRPCMaxMessageBytes). Half — not an arbitrary subtraction | ||||||||||||||||||||||||||
| // — because etcd-raft's limitSize counts only Entry.Size() per entry | ||||||||||||||||||||||||||
| // when deciding whether to extend a batch, but the serialized | ||||||||||||||||||||||||||
| // raftpb.Message that actually crosses the wire ALSO carries | ||||||||||||||||||||||||||
| // per-entry framing inside its Entries repeated field | ||||||||||||||||||||||||||
| // (`1 + len(varint) + entry`), plus the outer Message envelope | ||||||||||||||||||||||||||
| // (Term/Index/From/To/LogTerm/Commit/etc.). | ||||||||||||||||||||||||||
| // | ||||||||||||||||||||||||||
| // Worst-case ratio analysis: a minimal Entry (no Data) has | ||||||||||||||||||||||||||
| // Size() ≈ 4 B (Term + Index + Type tags+varints). Its outer- | ||||||||||||||||||||||||||
| // Message framing is `1 + sovRaft(4)` = 2 B. So tiny-entry | ||||||||||||||||||||||||||
| // batches encode at up to 1.5× the entries-data budget on the | ||||||||||||||||||||||||||
| // wire. Realistic data-bearing entries (e.g. 100 B+ payloads) | ||||||||||||||||||||||||||
| // shrink this ratio to <1.05×, but the cap MUST cover the | ||||||||||||||||||||||||||
| // worst case to make the env override a safe bound rather than | ||||||||||||||||||||||||||
| // pushing the failure mode out to "ResourceExhausted under | ||||||||||||||||||||||||||
| // tiny-entry workloads". | ||||||||||||||||||||||||||
| // | ||||||||||||||||||||||||||
| // Halving the transport budget admits the worst-case 1.5× growth | ||||||||||||||||||||||||||
| // (32 MiB entries-data → 48 MiB on wire ≪ 64 MiB transport limit) | ||||||||||||||||||||||||||
| // and leaves ample headroom for realistic batching. Operators | ||||||||||||||||||||||||||
| // who genuinely need a higher cap should raise GRPCMaxMessageBytes | ||||||||||||||||||||||||||
| // in a deliberate, paired action and adjust this divisor with the | ||||||||||||||||||||||||||
| // same worst-case analysis. | ||||||||||||||||||||||||||
| maxMaxSizePerMsgDivisor uint64 = 2 | ||||||||||||||||||||||||||
| maxMaxSizePerMsg = uint64(internalutil.GRPCMaxMessageBytes) / maxMaxSizePerMsgDivisor | ||||||||||||||||||||||||||
| // defaultHeartbeatBufPerPeer is the capacity of the priority dispatch channel. | ||||||||||||||||||||||||||
| // It carries low-frequency control traffic: heartbeats, votes, read-index, | ||||||||||||||||||||||||||
| // leader-transfer, and their corresponding response messages | ||||||||||||||||||||||||||
|
|
@@ -142,13 +230,21 @@ type OpenConfig struct { | |||||||||||||||||||||||||
| ElectionTick int | ||||||||||||||||||||||||||
| HeartbeatTick int | ||||||||||||||||||||||||||
| StateMachine StateMachine | ||||||||||||||||||||||||||
| // MaxSizePerMsg caps the byte size of a single MsgApp payload (Raft-level | ||||||||||||||||||||||||||
| // flow control). Default: 2 MiB (see defaultMaxSizePerMsg). Larger values | ||||||||||||||||||||||||||
| // amortise more entries per MsgApp under small-entry workloads; smaller | ||||||||||||||||||||||||||
| // values tighten worst-case memory. Operators can override at runtime via | ||||||||||||||||||||||||||
| // ELASTICKV_RAFT_MAX_SIZE_PER_MSG (integer byte count) without a | ||||||||||||||||||||||||||
| // rebuild; the env var takes precedence over the caller-supplied value. | ||||||||||||||||||||||||||
| MaxSizePerMsg uint64 | ||||||||||||||||||||||||||
| // MaxInflightMsg controls how many MsgApp messages Raft may have in-flight | ||||||||||||||||||||||||||
| // per peer before waiting for an acknowledgement (Raft-level flow control). | ||||||||||||||||||||||||||
| // It also sets the per-peer dispatch channel capacity, so total buffered | ||||||||||||||||||||||||||
| // memory is bounded by O(numPeers * MaxInflightMsg * avgMsgSize). | ||||||||||||||||||||||||||
| // Default: 256. Increase for deeper pipelining on high-bandwidth links; | ||||||||||||||||||||||||||
| // lower in memory-constrained clusters. | ||||||||||||||||||||||||||
| // Default: 512 (see defaultMaxInflightMsg). Increase for deeper pipelining | ||||||||||||||||||||||||||
| // on high-bandwidth links; lower in memory-constrained clusters. Operators | ||||||||||||||||||||||||||
| // can override at runtime via ELASTICKV_RAFT_MAX_INFLIGHT_MSGS without a | ||||||||||||||||||||||||||
| // rebuild; the env var takes precedence over the caller-supplied value. | ||||||||||||||||||||||||||
| MaxInflightMsg int | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
|
|
@@ -419,24 +515,34 @@ func Open(ctx context.Context, cfg OpenConfig) (*Engine, error) { | |||||||||||||||||||||||||
| }() | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| engine := &Engine{ | ||||||||||||||||||||||||||
| nodeID: prepared.cfg.NodeID, | ||||||||||||||||||||||||||
| localID: prepared.cfg.LocalID, | ||||||||||||||||||||||||||
| localAddress: prepared.cfg.LocalAddress, | ||||||||||||||||||||||||||
| dataDir: prepared.cfg.DataDir, | ||||||||||||||||||||||||||
| fsmSnapDir: filepath.Join(prepared.cfg.DataDir, fsmSnapDirName), | ||||||||||||||||||||||||||
| tickInterval: prepared.cfg.TickInterval, | ||||||||||||||||||||||||||
| electionTick: prepared.cfg.ElectionTick, | ||||||||||||||||||||||||||
| storage: prepared.disk.Storage, | ||||||||||||||||||||||||||
| rawNode: rawNode, | ||||||||||||||||||||||||||
| persist: prepared.disk.Persist, | ||||||||||||||||||||||||||
| fsm: prepared.cfg.StateMachine, | ||||||||||||||||||||||||||
| peers: peerMap, | ||||||||||||||||||||||||||
| transport: prepared.cfg.Transport, | ||||||||||||||||||||||||||
| proposeCh: make(chan proposalRequest), | ||||||||||||||||||||||||||
| readCh: make(chan readRequest), | ||||||||||||||||||||||||||
| adminCh: make(chan adminRequest), | ||||||||||||||||||||||||||
| stepCh: make(chan raftpb.Message, defaultMaxInflightMsg), | ||||||||||||||||||||||||||
| dispatchReportCh: make(chan dispatchReport, defaultMaxInflightMsg), | ||||||||||||||||||||||||||
| nodeID: prepared.cfg.NodeID, | ||||||||||||||||||||||||||
| localID: prepared.cfg.LocalID, | ||||||||||||||||||||||||||
| localAddress: prepared.cfg.LocalAddress, | ||||||||||||||||||||||||||
| dataDir: prepared.cfg.DataDir, | ||||||||||||||||||||||||||
| fsmSnapDir: filepath.Join(prepared.cfg.DataDir, fsmSnapDirName), | ||||||||||||||||||||||||||
| tickInterval: prepared.cfg.TickInterval, | ||||||||||||||||||||||||||
| electionTick: prepared.cfg.ElectionTick, | ||||||||||||||||||||||||||
| storage: prepared.disk.Storage, | ||||||||||||||||||||||||||
| rawNode: rawNode, | ||||||||||||||||||||||||||
| persist: prepared.disk.Persist, | ||||||||||||||||||||||||||
| fsm: prepared.cfg.StateMachine, | ||||||||||||||||||||||||||
| peers: peerMap, | ||||||||||||||||||||||||||
| transport: prepared.cfg.Transport, | ||||||||||||||||||||||||||
| proposeCh: make(chan proposalRequest), | ||||||||||||||||||||||||||
| readCh: make(chan readRequest), | ||||||||||||||||||||||||||
| adminCh: make(chan adminRequest), | ||||||||||||||||||||||||||
| // Size the inbound step / dispatch-report channels from the | ||||||||||||||||||||||||||
| // resolved MaxInflightMsg (post-normalizeLimitConfig, which has | ||||||||||||||||||||||||||
| // already applied the env override and compiled-in default) so | ||||||||||||||||||||||||||
| // that operators raising ELASTICKV_RAFT_MAX_INFLIGHT_MSGS above | ||||||||||||||||||||||||||
| // the default actually get the extra buffering they asked for. | ||||||||||||||||||||||||||
| // Using defaultMaxInflightMsg here would silently cap the | ||||||||||||||||||||||||||
| // channel at 512 (the current compiled-in default) even when | ||||||||||||||||||||||||||
| // the Raft layer has been told to keep 2048 in flight, | ||||||||||||||||||||||||||
| // re-triggering errStepQueueFull under the exact bursty | ||||||||||||||||||||||||||
| // conditions this knob is meant to absorb. | ||||||||||||||||||||||||||
| stepCh: make(chan raftpb.Message, inboundChannelCap(prepared.cfg.MaxInflightMsg)), | ||||||||||||||||||||||||||
| dispatchReportCh: make(chan dispatchReport, inboundChannelCap(prepared.cfg.MaxInflightMsg)), | ||||||||||||||||||||||||||
| closeCh: make(chan struct{}), | ||||||||||||||||||||||||||
| doneCh: make(chan struct{}), | ||||||||||||||||||||||||||
| startedCh: make(chan struct{}), | ||||||||||||||||||||||||||
|
|
@@ -2581,13 +2687,59 @@ func normalizeTimingConfig(cfg OpenConfig) OpenConfig { | |||||||||||||||||||||||||
| return cfg | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // inboundChannelCap returns the capacity to use when allocating the | ||||||||||||||||||||||||||
| // engine's inbound stepCh and dispatchReportCh. It mirrors the resolved | ||||||||||||||||||||||||||
| // MaxInflightMsg but clamps to minInboundChannelCap so that a caller | ||||||||||||||||||||||||||
| // passing a tiny value doesn't shrink the buffers below a survivable | ||||||||||||||||||||||||||
| // floor. maxInflight is expected to be the post-normalizeLimitConfig | ||||||||||||||||||||||||||
| // value (compiled default or env override applied). | ||||||||||||||||||||||||||
| func inboundChannelCap(maxInflight int) int { | ||||||||||||||||||||||||||
| if maxInflight < minInboundChannelCap { | ||||||||||||||||||||||||||
| return minInboundChannelCap | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| return maxInflight | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| func normalizeLimitConfig(cfg OpenConfig) OpenConfig { | ||||||||||||||||||||||||||
| if cfg.MaxInflightMsg <= 0 { | ||||||||||||||||||||||||||
| cfg.MaxInflightMsg = defaultMaxInflightMsg | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| if cfg.MaxSizePerMsg == 0 { | ||||||||||||||||||||||||||
| cfg.MaxSizePerMsg = defaultMaxSizePerMsg | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| // Env overrides win over caller-supplied values so that operators can | ||||||||||||||||||||||||||
| // retune replication flow-control without a rebuild. This mirrors the | ||||||||||||||||||||||||||
| // behaviour of ELASTICKV_RAFT_SNAPSHOT_COUNT and | ||||||||||||||||||||||||||
| // ELASTICKV_RAFT_MAX_WAL_FILES. Invalid values fall back to the | ||||||||||||||||||||||||||
| // compiled-in defaults with a warning. | ||||||||||||||||||||||||||
| if v, ok := maxInflightMsgFromEnv(); ok { | ||||||||||||||||||||||||||
| cfg.MaxInflightMsg = v | ||||||||||||||||||||||||||
|
Comment on lines
+2715
to
+2716
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When Useful? React with 👍 / 👎. |
||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| if v, ok := maxSizePerMsgFromEnv(); ok { | ||||||||||||||||||||||||||
| cfg.MaxSizePerMsg = v | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| // Clamp the resolved values against the same upper bounds we use | ||||||||||||||||||||||||||
| // for env overrides. Without this, a programmatic caller passing | ||||||||||||||||||||||||||
| // a fat-fingered OpenConfig{MaxInflightMsg: 100_000_000} would | ||||||||||||||||||||||||||
| // bypass the env-side guard and trigger multi-GB channel | ||||||||||||||||||||||||||
| // allocations at Open() — defeating the whole point of | ||||||||||||||||||||||||||
| // maxMaxInflightMsg / maxMaxSizePerMsg. Symmetric to the env | ||||||||||||||||||||||||||
| // fallback policy: clamp to the compiled-in default, log a | ||||||||||||||||||||||||||
| // warning so the misconfiguration is auditable. | ||||||||||||||||||||||||||
| if cfg.MaxInflightMsg > maxMaxInflightMsg { | ||||||||||||||||||||||||||
| slog.Warn("OpenConfig.MaxInflightMsg exceeds safe cap; clamping to default", | ||||||||||||||||||||||||||
| "value", cfg.MaxInflightMsg, "max", maxMaxInflightMsg, "default", defaultMaxInflightMsg) | ||||||||||||||||||||||||||
| cfg.MaxInflightMsg = defaultMaxInflightMsg | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| if cfg.MaxSizePerMsg > maxMaxSizePerMsg { | ||||||||||||||||||||||||||
| slog.Warn("OpenConfig.MaxSizePerMsg exceeds transport budget; clamping to default", | ||||||||||||||||||||||||||
| "value", cfg.MaxSizePerMsg, "max", maxMaxSizePerMsg, "default", uint64(defaultMaxSizePerMsg)) | ||||||||||||||||||||||||||
| cfg.MaxSizePerMsg = uint64(defaultMaxSizePerMsg) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| slog.Info("etcd raft engine: message size limits", | ||||||||||||||||||||||||||
| "max_inflight_msgs", cfg.MaxInflightMsg, | ||||||||||||||||||||||||||
| "max_size_per_msg_bytes", cfg.MaxSizePerMsg, | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
|
Comment on lines
+2739
to
+2742
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Logging the message size limits at Info level inside normalizeLimitConfig will produce redundant log entries for every shard opened (e.g., 100+ lines of identical output in a typical multi-shard deployment). Consider logging this once per process or only when the values deviate from the compiled-in defaults to reduce log noise.
Comment on lines
+2739
to
+2742
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In sharded environments where multiple Raft engines may be active, it is helpful to include the
Suggested change
|
||||||||||||||||||||||||||
| return cfg | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
|
|
@@ -2845,6 +2997,72 @@ func snapshotEveryFromEnv() uint64 { | |||||||||||||||||||||||||
| return n | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // maxInflightMsgFromEnv parses ELASTICKV_RAFT_MAX_INFLIGHT_MSGS. Returns | ||||||||||||||||||||||||||
| // (value, true) when the env var is set to a valid integer in | ||||||||||||||||||||||||||
| // [1, maxMaxInflightMsg]. Returns (0, false) when the var is unset so | ||||||||||||||||||||||||||
| // the caller can keep the existing cfg.MaxInflightMsg (which | ||||||||||||||||||||||||||
| // normalizeLimitConfig has already defaulted to defaultMaxInflightMsg). | ||||||||||||||||||||||||||
| // Invalid values (non-numeric, negative, zero, or above | ||||||||||||||||||||||||||
| // maxMaxInflightMsg) are logged at warn level and return | ||||||||||||||||||||||||||
| // (defaultMaxInflightMsg, true) so the engine actually applies the | ||||||||||||||||||||||||||
| // compiled-in default the log message promises — otherwise a malformed | ||||||||||||||||||||||||||
| // env var would silently let an unrelated caller-supplied value win. | ||||||||||||||||||||||||||
| // The upper cap is load-bearing: Open() allocates stepCh, | ||||||||||||||||||||||||||
| // dispatchReportCh, and every per-peer dispatch queue from this | ||||||||||||||||||||||||||
| // value, so a fat-fingered `ELASTICKV_RAFT_MAX_INFLIGHT_MSGS=100000000` | ||||||||||||||||||||||||||
| // would try to allocate multi-GB of channel memory before the node | ||||||||||||||||||||||||||
| // becomes healthy. Clamping to the default preserves operator intent | ||||||||||||||||||||||||||
| // ("I asked for a larger pipeline") far better than crashing startup. | ||||||||||||||||||||||||||
| func maxInflightMsgFromEnv() (int, bool) { | ||||||||||||||||||||||||||
| v := strings.TrimSpace(os.Getenv(maxInflightMsgEnvVar)) | ||||||||||||||||||||||||||
| if v == "" { | ||||||||||||||||||||||||||
| return 0, false | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| n, err := strconv.Atoi(v) | ||||||||||||||||||||||||||
| if err != nil || n < 1 { | ||||||||||||||||||||||||||
| slog.Warn("invalid ELASTICKV_RAFT_MAX_INFLIGHT_MSGS; using default", | ||||||||||||||||||||||||||
|
Comment on lines
+3021
to
+3023
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Useful? React with 👍 / 👎. |
||||||||||||||||||||||||||
| "value", v, "default", defaultMaxInflightMsg) | ||||||||||||||||||||||||||
| return defaultMaxInflightMsg, true | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| if n > maxMaxInflightMsg { | ||||||||||||||||||||||||||
| slog.Warn("ELASTICKV_RAFT_MAX_INFLIGHT_MSGS exceeds safe cap; using default", | ||||||||||||||||||||||||||
| "value", v, "max", maxMaxInflightMsg, "default", defaultMaxInflightMsg) | ||||||||||||||||||||||||||
| return defaultMaxInflightMsg, true | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
Comment on lines
+3021
to
+3026
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The warning message indicates that the engine will fall back to the default value (defaultMaxInflightMsg), but the function returns (0, false). This causes normalizeLimitConfig to retain the caller-supplied value from OpenConfig instead of the compiled-in default. If a caller provides a non-default value (e.g., 256) and the environment variable is malformed, the log message will be misleading as it won't actually use the default 1024. To align the implementation with the log message and the PR description, the function should return the default value and true when an invalid override is detected.
Suggested change
References
|
||||||||||||||||||||||||||
| return n, true | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // maxSizePerMsgFromEnv parses ELASTICKV_RAFT_MAX_SIZE_PER_MSG as a plain | ||||||||||||||||||||||||||
| // integer byte count. Returns (value, true) when the env var is set to a | ||||||||||||||||||||||||||
| // valid integer in [minMaxSizePerMsg, maxMaxSizePerMsg] | ||||||||||||||||||||||||||
| // (1 KiB .. 64 MiB). Returns (0, false) when the var is unset so | ||||||||||||||||||||||||||
| // normalizeLimitConfig can keep its earlier default. Invalid, too-small, | ||||||||||||||||||||||||||
| // or too-large values fall back to the compiled-in default with a | ||||||||||||||||||||||||||
| // warning and return (defaultMaxSizePerMsg, true) so the override | ||||||||||||||||||||||||||
| // actually applies the default the warning promises; a sub-KiB cap | ||||||||||||||||||||||||||
| // would make MsgApp batching degenerate, and a value above the gRPC | ||||||||||||||||||||||||||
| // transport's GRPCMaxMessageBytes budget would make Raft emit payloads | ||||||||||||||||||||||||||
| // the transport cannot carry, causing repeated send failures under | ||||||||||||||||||||||||||
| // large batches. | ||||||||||||||||||||||||||
| func maxSizePerMsgFromEnv() (uint64, bool) { | ||||||||||||||||||||||||||
| v := strings.TrimSpace(os.Getenv(maxSizePerMsgEnvVar)) | ||||||||||||||||||||||||||
| if v == "" { | ||||||||||||||||||||||||||
| return 0, false | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| n, err := strconv.ParseUint(v, 10, 64) | ||||||||||||||||||||||||||
| if err != nil || n < minMaxSizePerMsg { | ||||||||||||||||||||||||||
| slog.Warn("invalid ELASTICKV_RAFT_MAX_SIZE_PER_MSG; using default", | ||||||||||||||||||||||||||
| "value", v, "min", minMaxSizePerMsg, "default", uint64(defaultMaxSizePerMsg)) | ||||||||||||||||||||||||||
| return uint64(defaultMaxSizePerMsg), true | ||||||||||||||||||||||||||
|
Comment on lines
+3052
to
+3056
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Useful? React with 👍 / 👎. |
||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| if n > maxMaxSizePerMsg { | ||||||||||||||||||||||||||
| slog.Warn("ELASTICKV_RAFT_MAX_SIZE_PER_MSG exceeds transport budget; using default", | ||||||||||||||||||||||||||
|
Comment on lines
+3058
to
+3059
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The Useful? React with 👍 / 👎. |
||||||||||||||||||||||||||
| "value", v, "max", maxMaxSizePerMsg, "default", uint64(defaultMaxSizePerMsg)) | ||||||||||||||||||||||||||
| return uint64(defaultMaxSizePerMsg), true | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
Comment on lines
+3052
to
+3062
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to the MaxInflightMsgs helper, this function returns (0, false) on invalid input, which prevents normalizeLimitConfig from applying the compiled-in default if the caller provided a different value. This makes the using default warning message misleading. The function should return the default value and true to ensure the fallback behavior matches the logs.
Suggested change
References
|
||||||||||||||||||||||||||
| return n, true | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // dispatcherLanesEnabledFromEnv returns true when the 4-lane dispatcher has | ||||||||||||||||||||||||||
| // been explicitly opted into via ELASTICKV_RAFT_DISPATCHER_LANES. The value | ||||||||||||||||||||||||||
| // is parsed with strconv.ParseBool, which accepts the standard tokens | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
normalizeLimitConfigonly defaultscfg.MaxInflightMsgwhen it is non-positive, but it never applies the new upper safety bound to caller-supplied values. SinceOpennow allocatesstepChanddispatchReportChfromprepared.cfg.MaxInflightMsg, a fat-fingered programmatic config (for example100000000) can trigger huge channel allocations and crash startup even though env overrides are capped. ApplymaxMaxInflightMsg(or equivalent) to direct config input before these allocations.Useful? React with 👍 / 👎.