feat(sqs): htfifo capability poller (Phase 3.D PR 4-B-3a) #721
Conversation
Adds a stateless helper that polls each peer's /sqs_health
endpoint and reports whether all advertise the htfifo capability.
This is the building block PR 5's CreateQueue gate uses to refuse
PartitionCount > 1 until every node in the cluster has the
HT-FIFO data plane.
What changes
adapter/sqs_capability_poller.go (new file):
- HTFIFOCapabilityReport: AllAdvertise + per-peer detail.
- HTFIFOCapabilityPeerStatus: address, HasHTFIFO flag, raw
capabilities slice, Error string for failure detail.
- PollSQSHTFIFOCapability(ctx, client, peers): polls each peer
concurrently. Returns AllAdvertise=false on any timeout, HTTP
error, malformed JSON, or missing-capability — fail-closed
per §8.5.
- Vacuously AllAdvertise=true on empty peer list (caller is
responsible for ensuring the peer list is meaningful).
- Per-peer timeout capped by defaultSQSCapabilityPollTimeout
(3s) so a single hung peer cannot stall the whole poll.
- Body capped at 1 KiB via io.LimitReader so a misconfigured
peer cannot drain memory.
- Bare host:port and full http://… / https://… URLs both
accepted — operators can front the endpoint with TLS or a
proxy without the helper having to know.
What does NOT change yet
- htfifoCapabilityAdvertised stays false. PR 4-B-3b adds the §8
leadership-refusal hook (startup + per-acquisition observer)
and flips this flag.
- CreateQueue does NOT yet call PollSQSHTFIFOCapability — PR 5
lifts the PartitionCount > 1 dormancy gate AND wires the
capability check in the same commit.
Tests
adapter/sqs_capability_poller_test.go: 9 top-level tests across
the contract surface.
- AllAdvertise happy path with multiple peers.
- One-bad-apple: a peer with empty capabilities drops AllAdvertise.
- Transport failures (HTTP 500, connection refused, malformed
JSON) all fail closed with non-empty Error.
- Hung peer respects per-peer timeout — test bound is well below
what a serial poll would take.
- Empty peer list → AllAdvertise vacuously true.
- Empty peer address → fail closed with explicit Error.
- Full-URL peer (http:// or https://) accepted alongside bare
host:port.
- Concurrent polling: 5 peers each delaying 200ms must finish in
well under 1 second (serial would take 1s+).
- Body-size limit: a 10 KiB response truncated mid-string is
surfaced as a JSON parse error, not a half-decoded value.
- TestBuildSQSHealthURL covers the URL construction edge cases.
@claude review Phase 3.D PR 4-B-3a — htfifo capability poller. Pure-additive helper that PR 5's CreateQueue gate will consume.
Claude finished @bootjp's task in 2m 18s — Code Review: feat(sqs): htfifo capability poller (Phase 3.D PR 4-B-3a)

Verdict: Approve with minor notes. The implementation is correct, the concurrency model is clean, the fail-closed contract is consistent, and the test coverage is thorough. A few small issues worth calling out below, none blocking.
|
| Lens | Finding |
|---|---|
| Data loss | None — read-only poll, no FSM/Pebble path |
| Concurrency | Correct — indexed channel avoids slice-write races; per-peer timeout enforced; no goroutine leaks |
| Performance | Concurrent polling confirmed by test; control-plane only, no hot-path impact |
| Data consistency | Fail-closed contract is sound and consistent with §8.5; empty-peer vacuous-true is documented |
| Test coverage | Excellent; two minor gaps noted (per-peer cap not exercised independently; double-path URL edge case not pinned) |
Code Review
This pull request introduces a new SQS capability poller that concurrently checks cluster peers for the htfifo capability. The implementation includes structured reporting, per-peer timeouts, and response body size limits to ensure reliability. Feedback was provided regarding the silent ignoring of errors when closing the HTTP response body, suggesting that these should be logged to provide visibility into potential resource cleanup issues.
```go
	status.Error = errors.Wrapf(err, "GET %q", url).Error()
	return status
}
defer func() { _ = resp.Body.Close() }()
```
The error from resp.Body.Close() is being silently ignored. According to the general rules, errors from Close() methods on resources like network connections should be logged to ensure resource leaks or other cleanup problems are visible. If a logger is not available in this context, consider how to surface this failure or ensure it is handled according to the project's reliability standards.
References
- Do not silently ignore errors from Close() methods on resources like network connections. Log them to ensure resource leaks or other cleanup problems are visible.
PR #721 round 1 review fixes:

1) Gemini medium — Body.Close error silently ignored. `defer func() { _ = resp.Body.Close() }()` dropped any close error. A failed Close indicates a connection that the http.Transport will tear down rather than reuse — under load this can mask leaking connections / file descriptors. Logged via slog.Warn with the peer address so operators can grep the cluster log when triaging.

2) Claude minor — non-200 body not drained. `resp.Body.Close()` without first draining the body prevents the http.Transport from reusing the underlying TCP connection. In a control-plane path (one CreateQueue call per gate check) this is acceptable, but if the gate ever fans out across many peers under load, the failed-peer branch would force connection teardown on every error response. Drain via `io.Copy(io.Discard, io.LimitReader(resp.Body, sqsCapabilityMaxBodyBytes))` before the early return so the transport can reuse the connection.

3) Claude nit — per-peer cap not exercised independently. TestPollSQSHTFIFOCapability_TimeoutFailsClosed used a 500ms parent ctx — the request actually timed out via the parent context, not the per-peer cap, so the default 3s per-peer cap was never independently exercised by tests. Refactored the signature to `PollSQSHTFIFOCapability(ctx, peers, cfg PollerConfig)`. `PollerConfig{HTTPClient, PerPeerTimeout}` is the single options surface — zero values pick safe defaults. Renamed the existing test to TestPollSQSHTFIFOCapability_ParentContextDeadlineFailsClosed and added TestPollSQSHTFIFOCapability_PerPeerTimeoutFailsClosed, which uses context.Background() and PerPeerTimeout=100ms to exercise the cap independently. PerPeerTimeout is also a sensible operator knob — different cluster latencies want different bounds. Caller-side: only *_test.go files use the function today; PR 5's CreateQueue gate will pick the appropriate timeout when it wires this up.

4) Claude minor — buildSQSHealthURL double-path edge case. A caller passing a URL that already includes the health path (e.g. "http://node:5050/sqs_health") would receive a doubled path. Added an explicit test case to TestBuildSQSHealthURL documenting the behavior and the contract ("pass a base URL or host:port, never a full request URL"). A future refactor can intentionally change the contract; the test will catch it.

5) Audit per the lessons-learned discipline. PollSQSHTFIFOCapability is exported but has no production callers yet — only the test file references it. grep confirmed the API change is safe.
@claude review Round 2 — addresses Round 1 review:

1. Gemini medium — Body.Close error silently ignored: now logs via `slog.Warn` with the peer address.
2. Claude minor — non-200 body not drained: drained via `io.Copy(io.Discard, io.LimitReader(...))` before the early return.
3. Claude nit — per-peer cap not exercised independently: refactored to `PollerConfig{HTTPClient, PerPeerTimeout}` and added a dedicated per-peer-timeout test.

Bonus: 4. Claude minor — buildSQSHealthURL double-path edge case pinned with an explicit test.
Claude finished @bootjp's task in 2m 8s — Code Review: Round 2 — feat(sqs): htfifo capability poller

Verdict: LGTM. All four Round 1 items are correctly addressed. No new issues found.

Round 1 feedback checklist

1. Body.Close() error logged (Gemini medium) ✅

```go
// adapter/sqs_capability_poller.go:204-209
defer func() {
	if cerr := resp.Body.Close(); cerr != nil {
		slog.Warn("sqs capability poller: response body close failed",
			"peer", peer, "err", cerr)
	}
}()
```

Correct. Uses `slog.Warn` with the peer address so close failures are visible in the cluster log.

2. Non-200 body drained before early return (Claude minor) ✅

```go
// adapter/sqs_capability_poller.go:211-215
if resp.StatusCode != http.StatusOK {
	_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, sqsCapabilityMaxBodyBytes))
	status.Error = fmt.Sprintf("%s returned HTTP %d", url, resp.StatusCode)
	return status
}
```

Correct. The drain is bounded by the same 1 KiB `sqsCapabilityMaxBodyBytes` cap.

3. PollerConfig.PerPeerTimeout; per-peer cap exercised independently (Claude nit) ✅

Both timeout paths are now exercised:

```go
// Parent-ctx path (line 143): ctx expires at 500ms < 3s per-peer cap
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)

// Per-peer-cap path (line 179-180): context.Background() + 100ms cap
report := PollSQSHTFIFOCapability(context.Background(), []string{addrHang},
	PollerConfig{PerPeerTimeout: 100 * time.Millisecond})
```

4. buildSQSHealthURL double-path documented in test (Claude minor) ✅

```go
// adapter/sqs_capability_poller_test.go:291-297
{
	"http://node.example:5050" + sqsHealthPath,
	"http://node.example:5050" + sqsHealthPath + sqsHealthPath,
},
```

The comment ("pin the behaviour so a future refactor can either keep it or change the contract intentionally") is the right framing — document and lock rather than silently accept. No caller today passes a full request URL to the poller, so this is a latent trap, not an active bug.

One micro-observation (not blocking)

Summary

The concurrency model, fail-closed contract, body-size cap, and all test coverage from Round 1 remain intact. Ready to merge.
Closes the Phase 3.D PR 4-B-2 → PR 5 chain by adding the §8
leadership-refusal safeguard and flipping htfifoCapabilityAdvertised
to true. This is the final piece of the routing+leadership-refusal
pair the §11 PR 4 contract requires before a binary is "marked
htfifo-eligible".
What changes
raftengine: leader-acquired observer (mirror of leader-loss)
- raftengine.Admin gains RegisterLeaderAcquiredCallback. Same
contract as RegisterLeaderLossCallback (non-blocking, panic-
contained, sentinel-pointer deregister) but fires on the
previous!=Leader → status==Leader edge instead of leaving
the leader role.
- etcd backend: new leaderAcquiredCbs slice + mutex; fires from
refreshStatus on the leader-acquired edge AFTER e.isLeader is
published, so a callback that calls engine.State() observes
StateLeader.
- register / fire helpers extracted (registerLeaderCallback,
  gatherLeaderCallbacks) so the leader-loss and leader-acquired
  paths share one slot-management implementation. The dupl
  lint check that flagged the first draft is what keeps this
  consolidation in place.
- Test coverage: leader_acquired_callback_test.go mirrors
leader_loss_callback_test.go — panic containment, empty-list
safety, deregister removal, deregister idempotence, nil-fn
safety, nil-receiver safety, identical-fn disambiguation.
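A hedged sketch of the slot-management contract those tests pin down: sentinel-pointer deregister, edge-only firing, and panic containment. Names and shapes are illustrative, not the etcd backend's actual code.

```go
package main

import (
	"fmt"
	"sync"
)

// leaderObserver sketches the shared slot management: register returns a
// sentinel-pointer deregister, and callbacks fire panic-contained only on
// the previous!=Leader -> Leader edge.
type leaderObserver struct {
	mu        sync.Mutex
	cbs       []*func() // pointer identity disambiguates identical fns
	wasLeader bool
}

// registerLeaderCallback appends fn and returns an idempotent deregister.
func (o *leaderObserver) registerLeaderCallback(fn func()) func() {
	if o == nil || fn == nil {
		return func() {} // nil-receiver and nil-fn safety
	}
	slot := &fn // sentinel pointer: unique even for identical functions
	o.mu.Lock()
	o.cbs = append(o.cbs, slot)
	o.mu.Unlock()
	return func() {
		o.mu.Lock()
		defer o.mu.Unlock()
		for i, s := range o.cbs {
			if s == slot {
				o.cbs = append(o.cbs[:i], o.cbs[i+1:]...)
				return // second call finds nothing: deregister is idempotent
			}
		}
	}
}

// observe is a refreshStatus stand-in: callbacks fire only when the node
// transitions into the leader role, never while it stays leader.
func (o *leaderObserver) observe(isLeader bool) {
	o.mu.Lock()
	edge := isLeader && !o.wasLeader
	o.wasLeader = isLeader
	var fire []*func()
	if edge {
		fire = append(fire, o.cbs...) // gather under lock, fire outside it
	}
	o.mu.Unlock()
	for _, cb := range fire {
		func() {
			defer func() {
				if r := recover(); r != nil {
					fmt.Println("callback panic contained:", r)
				}
			}()
			(*cb)()
		}()
	}
}

func main() {
	o := &leaderObserver{}
	fired := 0
	dereg := o.registerLeaderCallback(func() { fired++ })
	o.registerLeaderCallback(func() { panic("boom") }) // contained, does not stop others

	o.observe(true) // follower -> leader edge: fires
	o.observe(true) // still leader: no edge, no fire
	dereg()
	o.observe(false)
	o.observe(true) // new edge, but the counting callback was deregistered
	fmt.Println("fired:", fired) // -> fired: 1
}
```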
main: SQS leadership-refusal hook
- main_sqs_leadership_refusal.go: installSQSLeadershipRefusal +
installSQSLeadershipRefusalAcrossGroups + partitionedGroupSet.
- On install, if the engine is currently leader of a group
hosting a partitioned queue and the binary lacks htfifo, the
hook calls TransferLeadership immediately. The
leader-acquired observer then keeps catching future
transitions for the same group.
- TransferLeadership runs in a goroutine because the
leader-acquired callback contract is non-blocking — a
synchronous admin RPC inside the callback would stall
refreshStatus.
- sqsLeadershipController is the small interface the helper
accepts (subset of raftengine.Admin) so test doubles don't
have to satisfy the full Admin surface.
- run() wires installSQSLeadershipRefusalAcrossGroups after the
coordinator is built; the composite deregister flows through
cleanup.
adapter: AdvertisesHTFIFO + flag flip
- adapter.AdvertisesHTFIFO() reports the htfifo capability flag
so main.go can read it without touching the package-private
constant.
- htfifoCapabilityAdvertised = false → true. Both the routing
wiring (PR 4-B-2 #715) and the leadership-refusal hook (this
PR) are now in the binary, so the design's "marked
htfifo-eligible" bar is met.
What's still gated
PR 5 lifts the PartitionCount > 1 dormancy gate AND wires
PollSQSHTFIFOCapability (PR 4-B-3a #721) into the CreateQueue
gate in the same commit. Until PR 5 lands, no partitioned queue
can land in production — the leadership-refusal hook is dormant
in the happy-path runtime (every binary past this PR advertises
htfifo, and the per-group early return keeps the hook out of the
hot path).
Self-review (per CLAUDE.md)
1. Data loss — control-plane only; no FSM/Pebble/retention path.
The hook calls TransferLeadership which is itself an admin
action with the same data-loss profile as a graceful manual
transfer. No issue.
2. Concurrency / distributed failures — leader-acquired callback
contract mirrors leader-loss (non-blocking, panic-contained,
sentinel-pointer deregister). refuse() offloads the actual
TransferLeadership to a goroutine so refreshStatus stays
non-blocking. Multiple goroutines calling refuse() for the
same group queue serially in raft's admin channel; the worst
case is one redundant transfer attempt, which is idempotent
on the raft side. No issue.
3. Performance — leader-acquired callbacks fire only on the
transition edge (rare event); no per-request hot path cost.
The early return on advertisesHTFIFO=true means
production-binary hosts pay zero overhead. No issue.
4. Data consistency — the hook protects against the §8
downgrade scenario: a node rolled back to a pre-htfifo
binary that still gets elected leader of a partitioned-queue
shard would otherwise read/write under the legacy keyspace
and silently corrupt the queue. The hook steps it down via
TransferLeadership before any client request lands. No issue.
5. Test coverage — 7 raftengine observer tests (mirror of
leader-loss panic / empty / deregister / idempotence / nil
guards / sentinel-pointer disambiguation) + 11 main-side
helper tests (htfifo no-op, no-partitioned-queue no-op,
startup-already-leader refuses, startup-follower waits,
per-acquisition fires, deregister propagates, transfer error
logged, nil-admin safe, partitionedGroupSet flatten / empty /
malformed).
…723)

## Summary

Phase 3.D PR 4-B-3b — closes the routing+leadership-refusal pair the §11 PR 4 contract requires before a binary is "marked htfifo-eligible". With this PR landed, every node's `/sqs_health` advertises `htfifo` and the §8 downgrade-protection safeguard is in place. PR 5 (next) lifts the `PartitionCount > 1` dormancy gate and wires `PollSQSHTFIFOCapability` (#721) into the CreateQueue gate in the same commit.

## What's added

### raftengine: leader-acquired observer (mirror of leader-loss)

- `raftengine.Admin` gains `RegisterLeaderAcquiredCallback`. Same contract as `RegisterLeaderLossCallback` (non-blocking, panic-contained, sentinel-pointer deregister) but fires on the `previous!=Leader → status==Leader` edge in `refreshStatus`.
- The etcd backend's slot management is now shared between leader-loss and leader-acquired via `registerLeaderCallback` + `gatherLeaderCallbacks` helpers — this satisfies the `dupl` lint check and keeps both paths consistent in one place.
- 7 observer tests mirror the existing leader-loss tests: panic-containment, empty-list safety, deregister removal, idempotence, nil-fn safe, nil-receiver safe, identical-fn disambiguation.

### main: SQS leadership-refusal hook

- `main_sqs_leadership_refusal.go`:
  - `installSQSLeadershipRefusal(ctx, admin, gid, partitionedGroups, advertisesHTFIFO, logger) func()` — startup check + per-acquisition observer.
  - `installSQSLeadershipRefusalAcrossGroups(...)` — composite installer iterating every shard runtime.
  - `partitionedGroupSet(partitionMap, logger)` — flattens `--sqsFifoPartitionMap` into the `{gid → bool}` set the hook consumes.
  - `sqsAdvertisesHTFIFO()` — wraps `adapter.AdvertisesHTFIFO()`.
- `run()` installs the composite refusal across runtimes after the coordinator is built; deregister flows through `cleanup`.
- `TransferLeadership` runs in a goroutine because the leader-acquired callback contract is non-blocking — a synchronous admin RPC inside the callback would stall `refreshStatus`.
- 11 helper tests cover the contract: htfifo no-op, no-partitioned-queue no-op, startup-already-leader refuses, startup-follower waits, per-acquisition fires, deregister propagates, transfer-error logged, nil-admin safe, `partitionedGroupSet` flatten / empty-input / malformed-ref-skip.

### adapter: `AdvertisesHTFIFO` + flag flip

- `adapter.AdvertisesHTFIFO()` exposes the package-private flag.
- `htfifoCapabilityAdvertised = false → true`. Both the routing wiring (PR 4-B-2 #715) and the leadership-refusal hook (this PR) are now in the binary, so the design's "marked htfifo-eligible" bar is met.

## What's still gated

PR 5 lifts the `PartitionCount > 1` dormancy gate AND wires `PollSQSHTFIFOCapability` (#721) into the CreateQueue gate in the same commit. Until PR 5 lands, no partitioned queue can land in production — the leadership-refusal hook is dormant in the happy-path runtime (every binary past this PR advertises `htfifo`, and the per-group early return keeps the hook out of the hot path).

## Test plan

- [x] `go test -race ./internal/raftengine/etcd/` — 7 new + existing tests pass.
- [x] `go test -race ./kv/` — existing 30+ tests pass (verified the tests that previously failed due to the misplaced interface method now pass).
- [x] `go test -race ./adapter/` + `go test -race .` — all pass.
- [x] `golangci-lint ./kv/... ./adapter/... ./internal/raftengine/... .` — clean.

## Self-review (per CLAUDE.md)

1. **Data loss** — control-plane only; no FSM/Pebble/retention path. The hook calls TransferLeadership, which is itself an admin action with the same data-loss profile as a graceful manual transfer. No issue.
2. **Concurrency / distributed failures** — leader-acquired callback contract mirrors leader-loss (non-blocking, panic-contained, sentinel-pointer deregister). `refuse()` offloads `TransferLeadership` to a goroutine so `refreshStatus` stays non-blocking. Multiple goroutines firing for the same group serialize on raft's admin channel; worst case is one redundant transfer attempt, which is idempotent on the raft side. No issue.
3. **Performance** — leader-acquired callbacks fire only on the transition edge (rare event); no per-request hot-path cost. The early return on `advertisesHTFIFO=true` means production-binary hosts pay zero overhead. No issue.
4. **Data consistency** — the hook protects against the §8 downgrade scenario: a node rolled back to a pre-htfifo binary that still gets elected leader of a partitioned-queue shard would otherwise read/write under the legacy keyspace and silently corrupt the queue. The hook steps it down via TransferLeadership before any client request lands. No issue.
5. **Test coverage** — 7 raftengine observer tests + 11 main-side helper tests (18 new). Existing kv lease tests confirm the misplaced-interface bug from the first draft was caught and fixed.

## Summary by CodeRabbit

- **New Features**
  - SQS health endpoint now advertises HT-FIFO capability status for client detection.
  - Raft administration introduces leader-acquired callbacks, enabling systems to respond immediately to leadership acquisition events.
  - SQS FIFO queue groups now automatically refuse and transfer leadership when HT-FIFO capability is unavailable.
- **Tests**
  - Added comprehensive test coverage for leader-acquired callback registration, deregistration, panic isolation, and SQS leadership refusal logic.
Summary

Phase 3.D PR 4-B-3a — adds the stateless htfifo capability poller that PR 5's CreateQueue gate consumes. Stacks on the now-merged #715 (PR 4-B-2, partition resolver). This PR is purely additive: new helper file, new test file, no existing code touched. Next is PR 4-B-3b (leadership-refusal hook + flag flip).

What's added

`adapter/sqs_capability_poller.go`:
- `HTFIFOCapabilityReport{AllAdvertise, Peers}` — binary go/no-go signal + per-peer detail for operator triage.
- `HTFIFOCapabilityPeerStatus{Address, HasHTFIFO, Capabilities, Error}` — one peer's polling result.
- `PollSQSHTFIFOCapability(ctx, client, peers)` — concurrent goroutine-per-peer poll, indexed-channel result aggregation (race-free).
- `defaultSQSCapabilityPollTimeout = 3s` so a single hung peer can't stall the cluster-wide poll.
- Body capped via `io.LimitReader` so a misconfigured peer can't drain memory.
- Bare `host:port` and full `http://…` / `https://…` URLs both accepted.
- Empty peer list → vacuously `AllAdvertise=true` (caller validates list completeness).

What's NOT added (deferred)

- `htfifoCapabilityAdvertised` stays `false`. PR 4-B-3b adds the §8 leadership-refusal hook + per-acquisition observer in `kv/raftengine/etcd` and flips the flag.
- `CreateQueue` does NOT yet call this helper. PR 5 lifts the `PartitionCount > 1` dormancy gate AND wires the capability check in the same commit (per the §11 rollout plan's "gate-and-lift atomically" rule).

Test plan

9 top-level tests covering the contract surface:

- `TestPollSQSHTFIFOCapability_AllAdvertise` — happy path, multiple peers.
- `TestPollSQSHTFIFOCapability_OneMissingFailsClosed` — old-binary peer with empty capabilities drops `AllAdvertise`.
- `TestPollSQSHTFIFOCapability_HTTPErrorFailsClosed` — HTTP 500, connection refused, malformed JSON all surface as `Error`.
- `TestPollSQSHTFIFOCapability_TimeoutFailsClosed` — hung peer respects per-peer timeout, full poll bounded.
- `TestPollSQSHTFIFOCapability_EmptyPeersIsVacuouslyTrue` — empty peer list contract.
- `TestPollSQSHTFIFOCapability_EmptyPeerAddressFailsClosed` — `""` entry in peers slice surfaces an explicit Error.
- `TestPollSQSHTFIFOCapability_FullURLPeer` — `http://` and `https://` URLs accepted alongside bare `host:port`.
- `TestPollSQSHTFIFOCapability_ConcurrentPolling` — 5×200ms peers finish in well under 1s.
- `TestPollSQSHTFIFOCapability_RespectsBodyLimit` — 10 KiB response truncated mid-string surfaces as JSON parse error, not garbage decode.
- `TestBuildSQSHealthURL` — URL construction edge cases.

`go test -race ./adapter/...` pass. `golangci-lint ./adapter/...` clean.

Self-review (per CLAUDE.md)

Per-peer `context.WithTimeout` so a slow peer can't stall the rest. Body capped via `io.LimitReader`. No issue.