feat(sqs): leadership-refusal hook + flag flip (Phase 3.D PR 4-B-3b) #723
Conversation
Closes the Phase 3.D PR 4-B-2 → PR 5 chain by adding the §8
leadership-refusal safeguard and flipping htfifoCapabilityAdvertised
to true. This is the final piece of the routing+leadership-refusal
pair the §11 PR 4 contract requires before a binary is "marked
htfifo-eligible".
What changes
raftengine: leader-acquired observer (mirror of leader-loss)
- raftengine.Admin gains RegisterLeaderAcquiredCallback. Same
  contract as RegisterLeaderLossCallback (non-blocking, panic-
  contained, sentinel-pointer deregister) but fires on the
  previous!=Leader → status==Leader edge instead of on leaving
  the leader role (a minimal sketch of the contract follows this list).
- etcd backend: new leaderAcquiredCbs slice + mutex; fires from
refreshStatus on the leader-acquired edge AFTER e.isLeader is
published, so a callback that calls engine.State() observes
StateLeader.
- register / fire helpers extracted (registerLeaderCallback,
  gatherLeaderCallbacks) so the leader-loss and leader-acquired
  paths share one slot-management implementation. The dupl
  lint warning that fired on the first draft is the check that
  keeps this consolidated.
- Test coverage: leader_acquired_callback_test.go mirrors
leader_loss_callback_test.go — panic containment, empty-list
safety, deregister removal, deregister idempotence, nil-fn
safety, nil-receiver safety, identical-fn disambiguation.
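A minimal, self-contained sketch of that observer contract (illustrative toy types only, not the project's raftengine implementation): register returns a deregister handle keyed on the slot's pointer identity, and fire snapshots the callbacks under the lock, then invokes each one outside it with a per-callback recover.

package main

import (
	"fmt"
	"sync"
)

type callbackSlot struct{ fn func() }

type leaderObserver struct {
	mu    sync.Mutex
	slots []*callbackSlot
}

// Register appends fn and returns a deregister func keyed on the slot's
// pointer identity, so two registrations of an identical fn stay distinct.
func (o *leaderObserver) Register(fn func()) (deregister func()) {
	slot := &callbackSlot{fn: fn}
	o.mu.Lock()
	o.slots = append(o.slots, slot)
	o.mu.Unlock()
	return func() {
		o.mu.Lock()
		defer o.mu.Unlock()
		for i, s := range o.slots {
			if s == slot {
				o.slots = append(o.slots[:i], o.slots[i+1:]...)
				return
			}
		}
	}
}

// Fire snapshots the callbacks while holding the lock, then invokes each
// outside it, containing panics so one bad holder cannot break the rest.
func (o *leaderObserver) Fire() {
	o.mu.Lock()
	snapshot := make([]func(), len(o.slots))
	for i, s := range o.slots {
		snapshot[i] = s.fn
	}
	o.mu.Unlock()
	for _, fn := range snapshot {
		func() {
			defer func() {
				if r := recover(); r != nil {
					fmt.Println("leader-acquired callback panic contained:", r)
				}
			}()
			fn()
		}()
	}
}

func main() {
	var obs leaderObserver
	dereg := obs.Register(func() { fmt.Println("became leader") })
	obs.Fire() // became leader
	dereg()
	obs.Fire() // nothing fires; calling dereg() again is also safe
}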
main: SQS leadership-refusal hook
- main_sqs_leadership_refusal.go: installSQSLeadershipRefusal +
installSQSLeadershipRefusalAcrossGroups + partitionedGroupSet.
- On install, if the engine is currently leader of a group
hosting a partitioned queue and the binary lacks htfifo, the
hook calls TransferLeadership immediately. The
leader-acquired observer then keeps catching future
transitions for the same group.
- TransferLeadership runs in a goroutine because the
leader-acquired callback contract is non-blocking — a
synchronous admin RPC inside the callback would stall
refreshStatus.
- sqsLeadershipController is the small interface the helper
  accepts (subset of raftengine.Admin) so test doubles don't
  have to satisfy the full Admin surface; a sketch of the
  helper's shape follows this list.
- run() wires installSQSLeadershipRefusalAcrossGroups after the
coordinator is built; the composite deregister flows through
cleanup.
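A hedged sketch of the hook's shape, following the helper described above but with assumed types for the narrow controller interface (the real sqsLeadershipController and engine-state values may differ):

package refusal

import (
	"context"
	"log/slog"
)

type raftState int

const (
	stateFollower raftState = iota
	stateLeader
)

// sqsLeadershipController is the assumed subset of the engine admin surface
// the hook needs; the real interface lives beside main_sqs_leadership_refusal.go.
type sqsLeadershipController interface {
	State() raftState
	TransferLeadership(ctx context.Context) error
	RegisterLeaderAcquiredCallback(fn func()) (deregister func())
}

// installSQSLeadershipRefusal registers a refusal callback for a group that
// hosts a partitioned queue when this binary does not advertise htfifo.
func installSQSLeadershipRefusal(ctx context.Context, admin sqsLeadershipController,
	gid uint64, partitionedGroups map[uint64]bool, advertisesHTFIFO bool,
	logger *slog.Logger) func() {
	if advertisesHTFIFO || !partitionedGroups[gid] {
		return func() {} // capable binary or unpartitioned group: nothing to refuse
	}
	refuse := func() {
		// Offload the admin RPC so the non-blocking callback returns immediately.
		go func() {
			if err := admin.TransferLeadership(ctx); err != nil {
				logger.Warn("sqs: TransferLeadership failed", "group", gid, "err", err)
			}
		}()
	}
	deregister := admin.RegisterLeaderAcquiredCallback(refuse)
	if admin.State() == stateLeader {
		refuse() // already leading at install time (or won an election mid-install)
	}
	return deregister
}

Usage would mirror installSQSLeadershipRefusalAcrossGroups: call this once per shard runtime and compose the returned deregister funcs into the cleanup path.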
adapter: AdvertisesHTFIFO + flag flip
- adapter.AdvertisesHTFIFO() reports the htfifo capability flag
  so main.go can read it without touching the package-private
  constant (assumed shape sketched below).
- htfifoCapabilityAdvertised = false → true. Both the routing
wiring (PR 4-B-2 #715) and the leadership-refusal hook (this
PR) are now in the binary, so the design's "marked
htfifo-eligible" bar is met.
What's still gated
PR 5 lifts the PartitionCount > 1 dormancy gate AND wires
PollSQSHTFIFOCapability (PR 4-B-3a #721) into the CreateQueue
gate in the same commit. Until PR 5 lands, no partitioned queue
can land in production — the leadership-refusal hook is dormant
in the happy-path runtime (every binary past this PR advertises
htfifo, and the per-group early return keeps the hook out of the
hot path).
Self-review (per CLAUDE.md)
1. Data loss — control-plane only; no FSM/Pebble/retention path.
The hook calls TransferLeadership which is itself an admin
action with the same data-loss profile as a graceful manual
transfer. No issue.
2. Concurrency / distributed failures — leader-acquired callback
contract mirrors leader-loss (non-blocking, panic-contained,
sentinel-pointer deregister). refuse() offloads the actual
TransferLeadership to a goroutine so refreshStatus stays
non-blocking. Multiple goroutines calling refuse() for the
same group queue serially in raft's admin channel; the worst
case is one redundant transfer attempt, which is idempotent
on the raft side. No issue.
3. Performance — leader-acquired callbacks fire only on the
transition edge (rare event); no per-request hot path cost.
The early return on advertisesHTFIFO=true means
production-binary hosts pay zero overhead. No issue.
4. Data consistency — the hook protects against the §8
downgrade scenario: a node rolled back to a pre-htfifo
binary that still gets elected leader of a partitioned-queue
shard would otherwise read/write under the legacy keyspace
and silently corrupt the queue. The hook steps it down via
TransferLeadership before any client request lands. No issue.
5. Test coverage — 7 raftengine observer tests (mirror of
leader-loss panic / empty / deregister / idempotence / nil
guards / sentinel-pointer disambiguation) + 11 main-side
helper tests (htfifo no-op, no-partitioned-queue no-op,
startup-already-leader refuses, startup-follower waits,
per-acquisition fires, deregister propagates, transfer error
logged, nil-admin safe, partitionedGroupSet flatten / empty /
malformed).
@claude review Phase 3.D PR 4-B-3b — leadership-refusal hook (raftengine leader-acquired observer + main-side helper) and
Claude finished @bootjp's task in 3m 33s — Review: Phase 3.D PR 4-B-3b — Leadership-Refusal Hook + Flag Flip
Finding 1 — Bug:
📝 Walkthrough
Enables HT-FIFO capability advertisement in the SQS adapter, adds a leader-acquired callback API to the Raft engine, and installs an SQS leadership-refusal observer that forces non-HT-FIFO nodes hosting partitioned FIFO groups to transfer leadership.
Sequence Diagram
sequenceDiagram
participant Coord as Coordinator
participant Node as Runtime / Node
participant Eng as Raft Engine
participant SQS as SQS Refusal Hook
Coord->>Node: Start runtime (with partition map)
Node->>Eng: WithPartitionResolver(...) initialised
alt Binary advertises HT-FIFO
Note over Coord,Eng: No refusal observers installed
else Binary does NOT advertise HT-FIFO
Coord->>Eng: installSQSLeadershipRefusalAcrossGroups(...)
Eng->>Eng: RegisterLeaderAcquiredCallback(refusalFn)
alt Engine is already Leader
Eng->>SQS: refusalFn invoked immediately (sync)
activate SQS
SQS->>Eng: TransferLeadership() (async)
deactivate SQS
else Starts as Follower
Note over Eng: Wait for leader-acquire event
end
alt Later acquires leadership
Eng->>Eng: refreshStatus → detects Leader
Eng->>SQS: fire registered refusalFn
activate SQS
SQS->>Eng: TransferLeadership() (async)
deactivate SQS
end
end
Estimated Code Review Effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: fe99a436e5
// mutex so callers can fire them without holding the lock.
// Mirrors the snapshot-then-fire pattern used by the per-callback
// invoke helpers.
func gatherLeaderCallbacks(mu *sync.Mutex, cbs []leaderCallbackSlot) []func() {
Lock callback slice before taking snapshot
gatherLeaderCallbacks takes cbs by value, so both fireLeaderLossCallbacks and fireLeaderAcquiredCallbacks read e.leader*Cbs outside the mutex before mu.Lock() runs. That regresses the previous lock-then-copy behavior and introduces a real data race when a callback is registered/deregistered concurrently with refreshStatus firing callbacks (e.g., startup/shutdown churn), which can yield undefined behavior or race-detector failures. Please move the slice read under the lock (e.g., pass a pointer to the slice and dereference only after locking).
Code Review
This pull request enables the SQS HT-FIFO capability and implements a leadership-refusal safeguard by introducing a leader-acquisition callback mechanism in the Raft engine. This allows nodes to automatically transfer leadership if they are ineligible to host partitioned queues. Review feedback highlighted critical data races in the shared callback firing logic and suggested enhancing panic recovery logs with node identity and stack traces for better production debugging.
// invokeLeaderLossCallback) so a bug in one holder cannot break
// subsequent callbacks or crash the process.
func (e *Engine) fireLeaderLossCallbacks() {
	for _, fn := range gatherLeaderCallbacks(&e.leaderLossCbsMu, e.leaderLossCbs) {
This line contains a data race. The slice header of e.leaderLossCbs is read here to be passed by value to gatherLeaderCallbacks, but this read is not protected by e.leaderLossCbsMu. If another goroutine modifies the slice (e.g., via RegisterLeaderLossCallback) concurrently, it can lead to undefined behavior or a crash. The slice should be passed by pointer so that gatherLeaderCallbacks can read the header under the lock.
-	for _, fn := range gatherLeaderCallbacks(&e.leaderLossCbsMu, e.leaderLossCbs) {
+	for _, fn := range gatherLeaderCallbacks(&e.leaderLossCbsMu, &e.leaderLossCbs) {
References
- To prevent race conditions and the use of stale data, ensure that shared variables are accessed and checked while holding the appropriate lock.
func gatherLeaderCallbacks(mu *sync.Mutex, cbs []leaderCallbackSlot) []func() {
	mu.Lock()
	out := make([]func(), len(cbs))
	for i, c := range cbs {
		out[i] = c.fn
	}
	mu.Unlock()
	return out
}
To prevent data races on the slice header, this helper should accept a pointer to the slice. This allows the function to access the slice's pointer, length, and capacity safely while holding the mutex.
-func gatherLeaderCallbacks(mu *sync.Mutex, cbs []leaderCallbackSlot) []func() {
-	mu.Lock()
-	out := make([]func(), len(cbs))
-	for i, c := range cbs {
-		out[i] = c.fn
-	}
-	mu.Unlock()
-	return out
-}
+func gatherLeaderCallbacks(mu *sync.Mutex, cbs *[]leaderCallbackSlot) []func() {
+	mu.Lock()
+	defer mu.Unlock()
+	out := make([]func(), len(*cbs))
+	for i, c := range *cbs {
+		out[i] = c.fn
+	}
+	return out
+}
References
- To prevent race conditions and the use of stale data, ensure that shared variables are accessed and checked while holding the appropriate lock.
// synchronously. Same panic-containment + non-blocking contract
// as fireLeaderLossCallbacks.
func (e *Engine) fireLeaderAcquiredCallbacks() {
	for _, fn := range gatherLeaderCallbacks(&e.leaderAcquiredCbsMu, e.leaderAcquiredCbs) {
This line also has a data race. e.leaderAcquiredCbs is read without holding e.leaderAcquiredCbsMu before being passed to gatherLeaderCallbacks. It should be passed by pointer instead.
-	for _, fn := range gatherLeaderCallbacks(&e.leaderAcquiredCbsMu, e.leaderAcquiredCbs) {
+	for _, fn := range gatherLeaderCallbacks(&e.leaderAcquiredCbsMu, &e.leaderAcquiredCbs) {
References
- To prevent race conditions and the use of stale data, ensure that shared variables are accessed and checked while holding the appropriate lock.
func (e *Engine) invokeLeaderAcquiredCallback(fn func()) {
	defer func() {
		if r := recover(); r != nil {
			slog.Error("etcd raft engine: leader-acquired callback panic",
				"recover", r)
		}
	}()
	fn()
}
The panic recovery logic here is less informative than its counterpart in invokeLeaderLossCallback. It should include the node identity and a stack trace to facilitate debugging of policy hook failures in production.
func (e *Engine) invokeLeaderAcquiredCallback(fn func()) {
	defer func() {
		if r := recover(); r != nil {
			slog.Error("etcd raft engine: leader-acquired callback panicked",
				slog.String("node_id", e.localID),
				slog.Uint64("raft_node_id", e.nodeID),
				slog.Any("panic", r),
				slog.String("stack", string(debug.Stack())),
			)
		}
	}()
	fn()
}
References
- Include node identity in logs and metadata to ensure correct context and prevent incorrect metadata reuse.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/raftengine/etcd/engine.go`:
- Around line 1028-1039: gatherLeaderCallbacks currently takes the callback
slice by value and locks after reading it, which races with concurrent
register/deregister; change gatherLeaderCallbacks to lock before reading the
slice (accept the slice as a pointer or otherwise take a locked reference) so
the snapshot of []leaderCallbackSlot is performed while mu is held, and update
callers fireLeaderLossCallbacks and fireLeaderAcquiredCallbacks to pass
&e.leaderLossCbs and &e.leaderAcquiredCbs (using the same mu) so the function
reads a consistent slice of leaderCallbackSlot entries (and then returns the
copied []func() for use outside the lock).
In `@main_sqs_leadership_refusal.go`:
- Around line 97-103: Register the leader-acquired callback before doing the
immediate startup leader check: call
admin.RegisterLeaderAcquiredCallback(refuse) first to subscribe to future
acquisitions, then check admin.State() == raftengine.StateLeader and call
refuse() if true; this ensures the refuse() callback isn't missed if leadership
is gained between the check and registration. Reference:
admin.RegisterLeaderAcquiredCallback, admin.State(), refuse().
📒 Files selected for processing (7)
adapter/sqs.go, internal/raftengine/engine.go, internal/raftengine/etcd/engine.go, internal/raftengine/etcd/leader_acquired_callback_test.go, main.go, main_sqs_leadership_refusal.go, main_sqs_leadership_refusal_test.go
PR #723 round 1 review caught four items. This commit lands all four (one is a real concurrency bug I introduced when refactoring the leader-callback duplication).

1) P1 (Codex) / HIGH (Gemini) — data race in gatherLeaderCallbacks
My round 1 refactor extracted gatherLeaderCallbacks(mu, cbs []leaderCallbackSlot). Passing the slice by value means the slice header (pointer, length, capacity) is dereferenced at the call site — i.e. BEFORE mu.Lock() runs inside the helper. A concurrent registerLeaderCallback on another goroutine could mutate the header while the caller is reading it, triggering -race detector failures or undefined behaviour.
Fix: gatherLeaderCallbacks now takes *[]leaderCallbackSlot (pointer to slice). The slice is dereferenced INSIDE the locked section, closing the race. Both fireLeaderLossCallbacks and fireLeaderAcquiredCallbacks were updated to pass &e.leader*Cbs. This restores the pre-refactor "lock-then-copy" semantics the original RegisterLeaderLossCallback had. The lesson: passing a slice by value to a "thread-safe" helper does not give you mutex safety, because function-argument evaluation happens before the function body locks anything (a minimal illustration follows this comment).

2) MEDIUM (Gemini) / Claude finding 1 — invokeLeaderAcquiredCallback log fields
Mirror invokeLeaderLossCallback's slog.Error fields:
slog.String("node_id", e.localID)
slog.Uint64("raft_node_id", e.nodeID)
slog.Any("panic", r)
slog.String("stack", string(debug.Stack()))
Without these, an SQS leadership-refusal hook panicking in production would leave operators with only the recovered value to grep on. Same fields the leader-loss path has, so cross-family triage is consistent.

3) Claude finding 2 — TOCTOU window in startup leader check
installSQSLeadershipRefusal previously did:
if admin.State() == StateLeader { refuse() }
return admin.RegisterLeaderAcquiredCallback(refuse)
Window: the engine could win an election between State() and RegisterLeaderAcquiredCallback returning. refreshStatus would fire fireLeaderAcquiredCallbacks before refuse is in the slice, and the hook would miss that acquisition.
Fix: post-registration State() re-check. If the install-time read returned follower but the post-registration read returns leader, refuse() fires for the in-flight election. refuse() is idempotent (TransferLeadership becomes a no-op once a transfer is already in flight), so a double-invocation across the boundary is safe.

4) Claude finding 3 — installSQSLeadershipRefusalAcrossGroups branches
New tests:
- TestInstallSQSLeadershipRefusalAcrossGroups_ComposesDeregisters: multi-group install + composite deregister fan-out.
- TestInstallSQSLeadershipRefusalAcrossGroups_NoPartitionedQueueIsNoOp: empty partition map → no per-group install runs.
- TestInstallSQSLeadershipRefusalAcrossGroups_SkipsNilRuntimes: nil-runtime / nil-engine guard in the iterator.
- TestInstallSQSLeadershipRefusalAcrossGroups_SkipsEngineWithoutObserver: type-assertion-miss log path (engine that doesn't satisfy sqsLeadershipController).
Plus TestInstallSQSLeadershipRefusal_TOCTOUWindowCovered, which pins fix #3 directly: a fake controller whose State() flips during RegisterLeaderAcquiredCallback exercises the post-registration re-check.

5) Audit per the lessons-learned discipline
The semantic change here is the gatherLeaderCallbacks signature. grep -rn confirmed only fireLeaderLossCallbacks and fireLeaderAcquiredCallbacks call it — both are local to this file, both updated together. No external callers to audit.
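The argument-evaluation point from item 1 as a standalone illustration (toy types, not the engine's): the by-value version reads the slice header at the call site before any locking; the by-pointer version dereferences it only under the lock.

package main

import "sync"

type slot struct{ fn func() }

// Racy pattern: cbs (pointer/len/cap) is copied when the caller evaluates the
// argument, before this function ever takes the lock.
func gatherByValue(mu *sync.Mutex, cbs []slot) []func() {
	mu.Lock()
	defer mu.Unlock()
	out := make([]func(), len(cbs))
	for i, c := range cbs {
		out[i] = c.fn
	}
	return out
}

// Safe pattern: only a pointer is passed; the slice header is read under the lock.
func gatherByPointer(mu *sync.Mutex, cbs *[]slot) []func() {
	mu.Lock()
	defer mu.Unlock()
	out := make([]func(), len(*cbs))
	for i, c := range *cbs {
		out[i] = c.fn
	}
	return out
}

func main() {
	var (
		mu   sync.Mutex
		cbs  []slot
		done = make(chan struct{})
	)
	go func() { // concurrent registration mutating the slice header
		for i := 0; i < 1000; i++ {
			mu.Lock()
			cbs = append(cbs, slot{fn: func() {}})
			mu.Unlock()
		}
		close(done)
	}()
	for i := 0; i < 1000; i++ {
		_ = gatherByPointer(&mu, &cbs) // swapping in gatherByValue(&mu, cbs) trips -race
	}
	<-done
}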
@claude review Round 2 — all four round 1 findings addressed:
- P1 (Codex) / HIGH (Gemini) — real data race in gatherLeaderCallbacks: slice now passed by pointer and dereferenced under the lock.
- MEDIUM (Gemini) / Claude finding 1 — invokeLeaderAcquiredCallback panic log now carries node identity and a stack trace.
- Claude finding 2 (TOCTOU) — post-registration State() re-check added.
- Claude finding 3 (branch coverage) — 4 new installSQSLeadershipRefusalAcrossGroups tests plus a TOCTOU-window test.
Lessons-learned audit: semantic change is the gatherLeaderCallbacks signature; only the two local fire helpers call it, both updated.
Claude finished @bootjp's task in 3m 7s — Review: Round 2 — Phase 3.D PR 4-B-3b
All four Round 1 findings are correctly addressed. No new issues introduced. Fix #1 —
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 634fbcf954
if err := admin.TransferLeadership(ctx); err != nil {
	logger.Warn("sqs: TransferLeadership failed",
		"group", gid, "err", err)
}
Fail closed when leadership transfer is rejected
When advertisesHTFIFO is false and this hook fires, a failed TransferLeadership (for example, no eligible transferee in a single-voter group or peers unavailable) is only logged and then ignored. In that case the node remains leader and continues serving despite the hook deciding it is unsafe to lead, which defeats the downgrade-protection guarantee and can still allow incorrect behavior on partitioned queues.
for queue, routing := range partitionMap {
	for _, groupRef := range routing.groups {
		id, err := strconv.ParseUint(groupRef, 10, 64)
Exclude single-partition routes from refusal group set
partitionedGroupSet currently marks groups from every --sqsFifoPartitionMap entry, but PartitionCount=1 is valid input and is not a partitioned queue in the HT-FIFO sense. This makes the refusal hook run for groups that only host single-partition routes, causing unnecessary leadership transfers (and possible avoidable unavailability on constrained groups) without any data-safety benefit.
CI failure on PR #723 (test (ubuntu-latest)): TestServerMapsEngineAdminMethods returned "add voter is not supported by this raft engine" because the type-assertion at internal/raftadmin/server.go:21 (any(engine).(raftengine.Admin)) silently fell back to admin=nil after Phase 3.D PR 4-B-3b extended the Admin interface with RegisterLeaderAcquiredCallback. The test stub fakeEngine never gained the method, so it stopped satisfying the interface — and every admin RPC mapped through NewServer returned Unimplemented.

This is exactly the "semantic-change-without-full-caller-audit" miss the recent lessons-learned discipline targets. PR #723's audit grep was on the production resolver helpers, not on the Admin interface itself; raftadmin's stub slipped through.

Fix: add the missing method on the stub. It returns a no-op deregister — raftadmin doesn't exercise the leader-acquired observer (PR 4-B-3b's SQS leadership-refusal lives in main_sqs_leadership_refusal.go); the stub's only obligation is to satisfy the type assertion.

go test -race ./... and golangci-lint clean.
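The failure mode in miniature (hypothetical names, not the real raftadmin types): extending an interface silently un-satisfies a stub that is only checked through a comma-ok type assertion, so the fallback path is taken without any compile error.

package main

import "fmt"

// Admin grew a method in this PR; the stub below predates the extension.
type Admin interface {
	AddVoter() error
	RegisterLeaderAcquiredCallback(fn func()) (deregister func())
}

type fakeEngine struct{}

func (fakeEngine) AddVoter() error { return nil }

func main() {
	var engine any = fakeEngine{}
	admin, ok := engine.(Admin) // no longer satisfied: admin == nil, ok == false
	fmt.Println(admin, ok)      // <nil> false, so every admin RPC falls back to "unsupported"
}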
@claude review CI failure fix (round 3):
Failure: TestServerMapsEngineAdminMethods — the raftadmin type assertion fell back to a nil admin after the Admin interface gained RegisterLeaderAcquiredCallback.
Lessons-learned miss: my round 2 audit was scoped to the production resolver helpers, not the Admin interface itself, so the raftadmin test stub slipped through.
Fix: add the missing method to the fakeEngine stub, returning a no-op deregister.
Claude finished @bootjp's task in 1m 33s — Review: Round 3 — CI Failure Fix
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/raftengine/etcd/engine.go (1)
2604-2642: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win — Use the published post-close state for the leader-acquired edge.
refreshStatus() rewrites the published state to StateShutdown when e.closed is set, but this new branch still keys off the pre-close local status.State. If fail()/shutdown closes the engine and the loop reaches one more refreshStatus(), previous is shutdown while status.State can still be leader, so fireLeaderAcquiredCallbacks() runs during teardown. That violates the callback contract and can spuriously trigger TransferLeadership from main_sqs_leadership_refusal.go against a closed engine.
Proposed fix
 e.mu.Lock()
 e.status = status
 if e.closed {
 	e.status.State = raftengine.StateShutdown
 }
+effectiveState := e.status.State
 // Use the voter count, NOT len(e.peers), so a 1-voter + N-learner
 // cluster correctly hits the leader-of-1 fast path: a learner does
 // not vote and is not part of the lease-read denominator. See
 // docs/design/2026_04_26_proposed_raft_learner.md §4.6.
 voterCount := e.voterCount
 e.mu.Unlock()
-e.isLeader.Store(status.State == raftengine.StateLeader)
+e.isLeader.Store(effectiveState == raftengine.StateLeader)
-if status.State == raftengine.StateLeader && voterCount <= 1 {
+if effectiveState == raftengine.StateLeader && voterCount <= 1 {
 	e.singleNodeLeaderAckMonoNs.Store(monoclock.Now().Nanos())
 } else {
 	e.singleNodeLeaderAckMonoNs.Store(0)
 }
-if status.State == raftengine.StateLeader {
+if effectiveState == raftengine.StateLeader {
 	e.leaderOnce.Do(func() { close(e.leaderReady) })
 }
-if previous != raftengine.StateLeader && status.State == raftengine.StateLeader {
+if previous != raftengine.StateLeader && effectiveState == raftengine.StateLeader {
 	e.fireLeaderAcquiredCallbacks()
 }
-if previous == raftengine.StateLeader && status.State != raftengine.StateLeader {
+if previous == raftengine.StateLeader && effectiveState != raftengine.StateLeader {
 	e.failPending(errors.WithStack(errNotLeader))
 	e.ackTracker.reset()
 	e.fireLeaderLossCallbacks()
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/raftengine/etcd/engine.go` around lines 2604 - 2642, refreshStatus() currently mutates e.status to StateShutdown when e.closed but later checks still use the local variable status, allowing a shutdown path to look like a leader and call fireLeaderAcquiredCallbacks(); fix by reading the published/post-close state (e.status) for the leader-related decisions: after unlocking (or immediately after updating e.status) capture the canonical current state (e.status.State) into a local like currentState and use that instead of status.State for e.isLeader.Store, the single-node ack store, the leaderOnce/leaderReady logic, and the previous->current comparison that gates e.fireLeaderAcquiredCallbacks().
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@main_sqs_leadership_refusal.go`:
- Around line 143-153: partitionedGroupSet() currently marks every routed group
in partitionMap as partitioned; change it to only add group IDs when the route
is actually partitioned by checking routing.partitionCount > 1 before iterating
routing.groups. Keep the existing strconv.ParseUint error handling and
logger.Warn for non-uint64 groupRef, but wrap the loop that parses and sets
out[id] = true inside an if routing.partitionCount > 1 block (or otherwise skip
routes with partitionCount == 1) so ordinary FIFO groups are not included.
---
Outside diff comments:
In `@internal/raftengine/etcd/engine.go`:
- Around line 2604-2642: refreshStatus() currently mutates e.status to
StateShutdown when e.closed but later checks still use the local variable
status, allowing a shutdown path to look like a leader and call
fireLeaderAcquiredCallbacks(); fix by reading the published/post-close state
(e.status) for the leader-related decisions: after unlocking (or immediately
after updating e.status) capture the canonical current state (e.status.State)
into a local like currentState and use that instead of status.State for
e.isLeader.Store, the single-node ack store, the leaderOnce/leaderReady logic,
and the previous->current comparison that gates e.fireLeaderAcquiredCallbacks().
📒 Files selected for processing (4)
internal/raftadmin/server_test.go, internal/raftengine/etcd/engine.go, main_sqs_leadership_refusal.go, main_sqs_leadership_refusal_test.go
out := make(map[uint64]bool)
for queue, routing := range partitionMap {
	for _, groupRef := range routing.groups {
		id, err := strconv.ParseUint(groupRef, 10, 64)
		if err != nil {
			logger.Warn("sqs: leadership-refusal: skipping non-uint64 group reference (config validation bypass?)",
				"queue", queue, "group_ref", groupRef, "err", err)
			continue
		}
		out[id] = true
	}
Only include genuinely partitioned routes in partitionedGroupSet().
This currently marks every routed group as “partitioned”, even when routing.partitionCount == 1. That makes the refusal hook apply to ordinary FIFO groups too, which can force unnecessary leadership transfers on binaries that don't advertise htfifo. The set should only include routes that are actually split.
Proposed fix
 out := make(map[uint64]bool)
 for queue, routing := range partitionMap {
+	if routing.partitionCount <= 1 {
+		continue
+	}
 	for _, groupRef := range routing.groups {
 		id, err := strconv.ParseUint(groupRef, 10, 64)
 		if err != nil {
 			logger.Warn("sqs: leadership-refusal: skipping non-uint64 group reference (config validation bypass?)",
 				"queue", queue, "group_ref", groupRef, "err", err)
📝 Committable suggestion
out := make(map[uint64]bool)
for queue, routing := range partitionMap {
	if routing.partitionCount <= 1 {
		continue
	}
	for _, groupRef := range routing.groups {
		id, err := strconv.ParseUint(groupRef, 10, 64)
		if err != nil {
			logger.Warn("sqs: leadership-refusal: skipping non-uint64 group reference (config validation bypass?)",
				"queue", queue, "group_ref", groupRef, "err", err)
			continue
		}
		out[id] = true
	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@main_sqs_leadership_refusal.go` around lines 143 - 153, partitionedGroupSet()
currently marks every routed group in partitionMap as partitioned; change it to
only add group IDs when the route is actually partitioned by checking
routing.partitionCount > 1 before iterating routing.groups. Keep the existing
strconv.ParseUint error handling and logger.Warn for non-uint64 groupRef, but
wrap the loop that parses and sets out[id] = true inside an if
routing.partitionCount > 1 block (or otherwise skip routes with partitionCount
== 1) so ordinary FIFO groups are not included.
Summary
Phase 3.D PR 4-B-3b — closes the routing+leadership-refusal pair the §11 PR 4 contract requires before a binary is "marked htfifo-eligible". With this PR landed, every node's /sqs_health advertises htfifo and the §8 downgrade-protection safeguard is in place. PR 5 (next) lifts the PartitionCount > 1 dormancy gate and wires PollSQSHTFIFOCapability (#721) into the CreateQueue gate in the same commit.
What's added
raftengine: leader-acquired observer (mirror of leader-loss)
- raftengine.Admin gains RegisterLeaderAcquiredCallback. Same contract as RegisterLeaderLossCallback (non-blocking, panic-contained, sentinel-pointer deregister) but fires on the previous!=Leader → status==Leader edge in refreshStatus.
- registerLeaderCallback + gatherLeaderCallbacks helpers — this satisfies the dupl lint check and keeps both paths consistent in one place.
main: SQS leadership-refusal hook
- main_sqs_leadership_refusal.go: installSQSLeadershipRefusal(ctx, admin, gid, partitionedGroups, advertisesHTFIFO, logger) func() — startup check + per-acquisition observer.
- installSQSLeadershipRefusalAcrossGroups(...) — composite installer iterating every shard runtime.
- partitionedGroupSet(partitionMap, logger) — flattens --sqsFifoPartitionMap into the {gid → bool} set the hook consumes.
- sqsAdvertisesHTFIFO() — wraps adapter.AdvertisesHTFIFO().
- run() installs the composite refusal across runtimes after the coordinator is built; deregister flows through cleanup.
- TransferLeadership runs in a goroutine because the leader-acquired callback contract is non-blocking — a synchronous admin RPC inside the callback would stall refreshStatus.
- Tests: partitionedGroupSet flatten / empty-input / malformed-ref-skip.
adapter: AdvertisesHTFIFO + flag flip
- adapter.AdvertisesHTFIFO() exposes the package-private flag.
- htfifoCapabilityAdvertised = false → true. Both the routing wiring (PR 4-B-2, feat(sqs): partition resolver for HT-FIFO routing (Phase 3.D PR 4-B-2) #715) and the leadership-refusal hook (this PR) are now in the binary, so the design's "marked htfifo-eligible" bar is met.
What's still gated
PR 5 lifts the PartitionCount > 1 dormancy gate AND wires PollSQSHTFIFOCapability (#721) into the CreateQueue gate in the same commit. Until PR 5 lands, no partitioned queue can land in production — the leadership-refusal hook is dormant in the happy-path runtime (every binary past this PR advertises htfifo, and the per-group early return keeps the hook out of the hot path).
Test plan
- go test -race ./internal/raftengine/etcd/ — 7 new + existing tests pass.
- go test -race ./kv/ — existing 30+ tests pass (verified the tests that previously failed due to the misplaced interface method now pass).
- go test -race ./adapter/ + go test -race . — all pass.
- golangci-lint ./kv/... ./adapter/... ./internal/raftengine/... . — clean.
Self-review (per CLAUDE.md)
- refuse() offloads TransferLeadership to a goroutine so refreshStatus stays non-blocking. Multiple goroutines firing for the same group serialize on raft's admin channel; worst case is one redundant transfer attempt, which is idempotent on the raft side. No issue.
- The early return on advertisesHTFIFO=true means production-binary hosts pay zero overhead. No issue.