add epoch to the add maintainer related opeartions#5181
Conversation
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
📝 WalkthroughWalkthroughThis PR propagates a uint64 maintainer_epoch through heartbeat protos and coordinator flows: Changefeeds expose epochs; operators capture and include epochs in add/remove messages and require epoch matches to finish; maintainers echo and validate epochs and ignore stale responses; dispatcher components accept/replace maintainer requests based on epoch comparisons. ChangesMaintainer Epoch Fencing
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 3 | ❌ 2❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Warning Tools execution failed with the following error: Failed to run tools: 13 INTERNAL: Received RST_STREAM with code 2 (Internal server error) Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request introduces a maintainer_epoch mechanism to fence maintainer scheduling generations from one another, updating protobuf definitions, coordinator operators, and maintainer components to propagate and validate the epoch. The review feedback identifies several critical issues: a potential runtime panic in handleAddMaintainer if info is nil, a compilation error in GetMaintainerEpoch due to comparing an atomic.Pointer directly to nil, and potential nil pointer dereferences in the Check methods of both AddMaintainerOperator and MoveMaintainerOperator if the status parameter is nil.
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@coordinator/operator/operator_add.go`:
- Around line 87-90: The current maintainerEpochMatches function incorrectly
treats actual == 0 as a match; update it so only expected == 0 (operator opting
out) bypasses the fencing check and the receiver reporting zero does not count
as a match. Replace the logic in maintainerEpochMatches to return true only when
expected == 0 or when actual != 0 and expected == actual (i.e., remove the
actual == 0 branch) so Check no longer treats a zero-reported epoch as a
successful match.
In `@coordinator/operator/operator_move.go`:
- Around line 90-96: When m.originNodeStopped is true we must always initialize
m.maintainerEpoch from m.changefeed.GetMaintainerEpoch() before constructing the
AddMaintainer message; currently that assignment only runs when !m.bind so the
fallback path sends epoch 0. Move the call m.maintainerEpoch =
m.changefeed.GetMaintainerEpoch() so it executes unconditionally inside the
m.originNodeStopped branch, but keep the DB bind call
m.db.BindChangefeedToNode(m.origin, m.dest, m.changefeed) guarded by if !m.bind
and still set m.bind = true only there; then return
m.changefeed.NewAddMaintainerMessage(m.dest, m.maintainerEpoch).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: a9ce3ffe-7823-41c7-bd00-1fb124290f54
⛔ Files ignored due to path filters (1)
heartbeatpb/heartbeat.pb.gois excluded by!**/*.pb.go
📒 Files selected for processing (12)
coordinator/changefeed/changefeed.gocoordinator/changefeed/changefeed_test.gocoordinator/controller.gocoordinator/operator/operator_add.gocoordinator/operator/operator_add_test.gocoordinator/operator/operator_move.gocoordinator/operator/operator_move_test.goheartbeatpb/heartbeat.protomaintainer/maintainer.gomaintainer/maintainer_manager_maintainers.gomaintainer/maintainer_test.gopkg/common/format.go
|
/test all |
|
/test all |
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
coordinator/changefeed/changefeed.go (1)
273-301:⚠️ Potential issue | 🟠 Major | ⚡ Quick winPass
MaintainerEpochinto coordinator remove requests instead of always using epoch-0.
coordinator/controller.gostill callschangefeed.RemoveMaintainerMessage(...)(epoch-0 wrapper) in bothhandleNonExistentChangefeedand the stale-removal loop infinishBootstrap. This setsheartbeatpb.RemoveMaintainerRequest.MaintainerEpoch = 0, and maintainer-sideshouldApplyMaintainerRemovetreatsrequestEpoch==0as always-apply, bypassing epoch fencing. Switch those callsites toRemoveMaintainerMessageWithEpoch(..., status.GetMaintainerEpoch())/RemoveMaintainerMessageWithEpoch(..., rm.status.GetMaintainerEpoch())(the coordinator already has these epochs from the incomingMaintainerStatus) to preserve the fencing behavior.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@coordinator/changefeed/changefeed.go` around lines 273 - 301, The coordinator currently calls RemoveMaintainerMessage(...) which sets MaintainerEpoch=0 and bypasses epoch fencing; update the two call sites in coordinator/controller.go — inside handleNonExistentChangefeed and inside the stale-removal loop in finishBootstrap — to call RemoveMaintainerMessageWithEpoch(...) and pass the correct epoch from the maintainer status (use status.GetMaintainerEpoch() in handleNonExistentChangefeed and rm.status.GetMaintainerEpoch() in the finishBootstrap loop) so heartbeatpb.RemoveMaintainerRequest.MaintainerEpoch carries the actual epoch and preserves fencing semantics.downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go (1)
379-406:⚠️ Potential issue | 🟠 Major | ⚡ Quick winFix stale maintainer close requests replying
Success: true(no-op ack)
handleCloseRequestinitializesMaintainerCloseResponse{Success: true}and, on!shouldAcceptMaintainerRequest(...), only logs and then still falls through tosendResponse—so the sender maintainer can receive a success ack even thoughTryClosewas not executed.
Maintainer.onMaintainerCloseResponsetreatsresponse.Successas an acknowledgement whenresponse.MaintainerEpochmatches the maintainer’s current epoch, so this can incorrectly triggeronRemoveMaintainerfor a no-op close. Align stale handling with bootstrap/post-bootstrap by not replying (or setSuccess=false).🔧 Proposed fix
m.mutex.Lock() if manager, ok := m.dispatcherManagers[cfId]; ok { if !shouldAcceptMaintainerRequest(req.MaintainerEpoch, manager.GetMaintainerEpoch()) { log.Info("ignore stale maintainer close request", zap.Stringer("changefeedID", cfId), zap.Stringer("from", from), zap.Uint64("requestMaintainerEpoch", req.MaintainerEpoch), zap.Uint64("localMaintainerEpoch", manager.GetMaintainerEpoch())) + m.mutex.Unlock() + return nil } else if closed := manager.TryClose(req.Removed); closed { delete(m.dispatcherManagers, cfId) metrics.DispatcherManagerGauge.WithLabelValues(cfId.Keyspace(), cfId.Name()).Dec() response.Success = true } else { response.Success = false } } m.mutex.Unlock()🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go` around lines 379 - 406, The code currently initializes response.Success=true and on a stale request (when !shouldAcceptMaintainerRequest(...)) only logs, allowing a success ack to be sent; change the stale handling inside handleCloseRequest so that when !shouldAcceptMaintainerRequest(req.MaintainerEpoch, manager.GetMaintainerEpoch()) you set response.Success = false and immediately send the response (via m.sendResponse) and return (ensuring you still unlock m.mutex before sending/returning), instead of falling through to the TryClose branch; reference symbols: shouldAcceptMaintainerRequest, manager.GetMaintainerEpoch, response (heartbeatpb.MaintainerCloseResponse), TryClose, m.sendResponse, and m.mutex.
🧹 Nitpick comments (1)
coordinator/operator/operator_controller.go (1)
130-130: 💤 Low valueLocal variable
changefeedshadows the imported package.
changefeed := oc.changefeedDB.GetByID(cfID)shadows thegithub.com/pingcap/ticdc/coordinator/changefeedimport. It's safe here since the package isn't referenced inside this function, but a rename (e.g.cf, matchingAddOperatorabove) avoids confusion and future breakage.♻️ Suggested rename
- changefeed := oc.changefeedDB.GetByID(cfID) - if changefeed == nil { + cf := oc.changefeedDB.GetByID(cfID) + if cf == nil { log.Warn("stop changefeed failed, changefeed not found", zap.String("role", oc.role), zap.Bool("removed", removed), zap.String("changefeed", cfID.Name())) if old, ok := oc.operators[cfID]; ok { return old.OP } return nil } - keyspaceID := changefeed.GetKeyspaceID() - maintainerEpoch := changefeed.GetMaintainerEpoch() + keyspaceID := cf.GetKeyspaceID() + maintainerEpoch := cf.GetMaintainerEpoch()🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@coordinator/operator/operator_controller.go` at line 130, The local variable name changefeed shadows the imported package changefeed; rename the local variable returned by oc.changefeedDB.GetByID(cfID) (e.g., to cf to match AddOperator) to avoid confusion and potential future bugs—update all subsequent references in this function from changefeed to cf and ensure no other identifiers collide with the imported package name.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@coordinator/changefeed/changefeed.go`:
- Around line 273-301: The coordinator currently calls
RemoveMaintainerMessage(...) which sets MaintainerEpoch=0 and bypasses epoch
fencing; update the two call sites in coordinator/controller.go — inside
handleNonExistentChangefeed and inside the stale-removal loop in finishBootstrap
— to call RemoveMaintainerMessageWithEpoch(...) and pass the correct epoch from
the maintainer status (use status.GetMaintainerEpoch() in
handleNonExistentChangefeed and rm.status.GetMaintainerEpoch() in the
finishBootstrap loop) so heartbeatpb.RemoveMaintainerRequest.MaintainerEpoch
carries the actual epoch and preserves fencing semantics.
In `@downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go`:
- Around line 379-406: The code currently initializes response.Success=true and
on a stale request (when !shouldAcceptMaintainerRequest(...)) only logs,
allowing a success ack to be sent; change the stale handling inside
handleCloseRequest so that when
!shouldAcceptMaintainerRequest(req.MaintainerEpoch,
manager.GetMaintainerEpoch()) you set response.Success = false and immediately
send the response (via m.sendResponse) and return (ensuring you still unlock
m.mutex before sending/returning), instead of falling through to the TryClose
branch; reference symbols: shouldAcceptMaintainerRequest,
manager.GetMaintainerEpoch, response (heartbeatpb.MaintainerCloseResponse),
TryClose, m.sendResponse, and m.mutex.
---
Nitpick comments:
In `@coordinator/operator/operator_controller.go`:
- Line 130: The local variable name changefeed shadows the imported package
changefeed; rename the local variable returned by oc.changefeedDB.GetByID(cfID)
(e.g., to cf to match AddOperator) to avoid confusion and potential future
bugs—update all subsequent references in this function from changefeed to cf and
ensure no other identifiers collide with the imported package name.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 0ee957cf-114f-4672-a5f0-3a84cf22a15d
⛔ Files ignored due to path filters (1)
heartbeatpb/heartbeat.pb.gois excluded by!**/*.pb.go
📒 Files selected for processing (17)
coordinator/changefeed/changefeed.gocoordinator/changefeed/changefeed_test.gocoordinator/operator/operator_add.gocoordinator/operator/operator_add_test.gocoordinator/operator/operator_controller.gocoordinator/operator/operator_move.gocoordinator/operator/operator_move_test.gocoordinator/operator/operator_stop.gocoordinator/operator/operator_stop_test.godownstreamadapter/dispatchermanager/dispatcher_manager_info.godownstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.godownstreamadapter/dispatcherorchestrator/dispatcher_orchestrator_test.godownstreamadapter/dispatcherorchestrator/helper.goheartbeatpb/heartbeat.protomaintainer/maintainer.gomaintainer/maintainer_epoch_test.gomaintainer/maintainer_manager_maintainers.go
🚧 Files skipped from review as they are similar to previous changes (4)
- coordinator/operator/operator_move_test.go
- coordinator/operator/operator_add_test.go
- coordinator/operator/operator_move.go
- coordinator/operator/operator_add.go
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@maintainer/maintainer.go`:
- Around line 1087-1089: The predicate in maintainerResponseEpochMatches
currently rejects responseEpoch == 0 when the local epoch is non-zero, breaking
bootstrap compatibility; update the function maintainerResponseEpochMatches to
accept a responseEpoch of 0 as a compatibility wildcard (in addition to the
existing localEpoch == 0 case) so compatibility responses are not dropped —
i.e., treat responseEpoch == 0 as matching the local epoch and preserve the
existing localEpoch == 0 behavior.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: e3e3c97f-01b4-4696-8219-ec7240e96676
📒 Files selected for processing (7)
coordinator/operator/operator_stop.gocoordinator/operator/operator_stop_test.godownstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.godownstreamadapter/dispatcherorchestrator/dispatcher_orchestrator_test.gomaintainer/maintainer.gomaintainer/maintainer_epoch_test.gomaintainer/maintainer_manager_maintainers.go
🚧 Files skipped from review as they are similar to previous changes (6)
- maintainer/maintainer_manager_maintainers.go
- maintainer/maintainer_epoch_test.go
- coordinator/operator/operator_stop.go
- downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go
- coordinator/operator/operator_stop_test.go
- downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator_test.go
|
/test all |
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
coordinator/operator/operator_stop.go (1)
65-79:⚠️ Potential issue | 🔴 Critical | ⚡ Quick winMake the node match and finish transition atomic.
Line 69 snapshots
nodeID, but Line 79 can still mark the operator finished afterOnNodeRemovehas already switched the target tocoordinatorNodeID. That lets a late non-working report from the removed node retire the operator and skip the rescheduled remove on the coordinator.Proposed fix
func (m *StopChangefeedOperator) Check(from node.ID, status *heartbeatpb.MaintainerStatus) { if status == nil { return } - if from != m.getNodeID() { + m.mu.RLock() + defer m.mu.RUnlock() + + if from != m.nodeID { return } if !m.finished.Load() && status.State != heartbeatpb.ComponentState_Working && maintainerEpochCanBeStopped(m.maintainerEpoch, status.MaintainerEpoch) { log.Info("maintainer report non-working status", zap.Stringer("maintainer", m.cfID), zap.Uint64("operatorMaintainerEpoch", m.maintainerEpoch), zap.Uint64("statusMaintainerEpoch", status.MaintainerEpoch)) m.finished.Store(true) } }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@coordinator/operator/operator_stop.go` around lines 65 - 79, The Check method may race with OnNodeRemove because it calls m.getNodeID() twice implicitly (once for the comparison and again when deciding to finish); capture the current target node ID into a local variable (e.g., nodeID := m.getNodeID()) at the start of Check and use that single snapshot for the from comparison and the finished transition so the non-working report only retires the operator if the node still matches that snapshot; ensure you only call m.finished.Store(true) when from == nodeID and maintainerEpochCanBeStopped(...) and status checks all pass.coordinator/controller.go (1)
646-657:⚠️ Potential issue | 🟠 Major | ⚡ Quick winDon't overwrite a newer persisted epoch with an older remote epoch.
allChangefeedscomes from metastore, soinfo.Epochis the durable generation. If a remote maintainer reports a smaller non-zero epoch here, this branch downgrades the in-memory changefeed and then records that stale maintainer as authoritative. That re-admits exactly the generation this PR is trying to fence after recovery. Please only adopt the remote epoch for compatibility/newer-generation cases, and treatremote < localas stale instead.Possible direction
- if epoch := rm.status.GetMaintainerEpoch(); epoch != 0 && info.Epoch != epoch { + if epoch := rm.status.GetMaintainerEpoch(); epoch != 0 && (info.Epoch == 0 || epoch > info.Epoch) { clonedInfo, err := info.Clone() if err != nil { log.Panic("clone changefeed info failed", zap.Stringer("changefeed", cfID), zap.Error(err)) } clonedInfo.Epoch = epoch info = clonedInfo + } else if epoch != 0 && info.Epoch > epoch { + _ = c.messageCenter.SendCommand(removeStaleMaintainerMessage(cfID, rm.nodeID, rm.status)) + continue }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@coordinator/controller.go` around lines 646 - 657, The current code unconditionally replaces the durable in-memory epoch (info.Epoch) with the remote maintainer epoch from rm.status.GetMaintainerEpoch(), which can downgrade a newer persisted generation; change the logic in the branch around rm.status.GetMaintainerEpoch() so you only adopt the remote epoch when it is non-zero and strictly greater than info.Epoch (i.e., remote > local). If remote <= info.Epoch treat the remote as stale: do not clone/modify info.Epoch and do not call c.changefeedDB.AddReplicatingMaintainer based on the stale epoch; continue creating the changefeed with the existing info and only add the maintainer when the epoch check passes. Ensure this uses the existing symbols rm.status.GetMaintainerEpoch(), info.Clone(), changefeed.NewChangefeed(...), and c.changefeedDB.AddReplicatingMaintainer(...) to locate and update the code.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@coordinator/controller.go`:
- Around line 646-657: The current code unconditionally replaces the durable
in-memory epoch (info.Epoch) with the remote maintainer epoch from
rm.status.GetMaintainerEpoch(), which can downgrade a newer persisted
generation; change the logic in the branch around rm.status.GetMaintainerEpoch()
so you only adopt the remote epoch when it is non-zero and strictly greater than
info.Epoch (i.e., remote > local). If remote <= info.Epoch treat the remote as
stale: do not clone/modify info.Epoch and do not call
c.changefeedDB.AddReplicatingMaintainer based on the stale epoch; continue
creating the changefeed with the existing info and only add the maintainer when
the epoch check passes. Ensure this uses the existing symbols
rm.status.GetMaintainerEpoch(), info.Clone(), changefeed.NewChangefeed(...), and
c.changefeedDB.AddReplicatingMaintainer(...) to locate and update the code.
In `@coordinator/operator/operator_stop.go`:
- Around line 65-79: The Check method may race with OnNodeRemove because it
calls m.getNodeID() twice implicitly (once for the comparison and again when
deciding to finish); capture the current target node ID into a local variable
(e.g., nodeID := m.getNodeID()) at the start of Check and use that single
snapshot for the from comparison and the finished transition so the non-working
report only retires the operator if the node still matches that snapshot; ensure
you only call m.finished.Store(true) when from == nodeID and
maintainerEpochCanBeStopped(...) and status checks all pass.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 81789fb6-2656-4e0f-9de4-cd6cee16a38e
📒 Files selected for processing (7)
coordinator/controller.gocoordinator/controller_test.gocoordinator/operator/operator_stop.godownstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.godownstreamadapter/dispatcherorchestrator/dispatcher_orchestrator_test.gomaintainer/maintainer.gomaintainer/maintainer_epoch_test.go
🚧 Files skipped from review as they are similar to previous changes (2)
- maintainer/maintainer_epoch_test.go
- downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go
|
@3AceShowHand: The following test failed, say
Full PR test history. Your PR dashboard. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here. |
What problem does this PR solve?
Issue Number: close #5179
After the CDC owner is network-isolated from other CDC nodes and later recovers, an old maintainer generation can still be alive while a new owner/coordinator generation is scheduling the same changefeed. Without generation fencing, stale maintainer scheduling messages and dispatcher-manager lifecycle messages can race with the new generation.
The visible symptom is not only a short maintainer nodeID mismatch window. Stale add/remove/bootstrap/post-bootstrap/close messages can keep affecting the current control path after recovery, so the changefeed may stay stuck and its Kafka sink lag keeps growing for much longer.
What is changed and how it works?
This PR adds maintainer generation/epoch fencing across maintainer scheduling and dispatcher-manager ownership paths.
maintainer_epochthrough maintainer scheduling messages, including add, remove, bootstrap, post-bootstrap, close, and the corresponding dispatcher-manager responses.Check List
Tests
Commands run locally:
make generate-protobufmake fmtgo test ./coordinator/changefeed ./coordinator/operator ./maintainer ./downstreamadapter/dispatcherorchestrator ./downstreamadapter/dispatchermanager ./heartbeatpbgo test ./coordinator/...git diff --checkQuestions
Will it cause performance regression or break compatibility?
No expected performance regression. The runtime overhead is limited to passing a uint64 epoch and doing simple integer comparisons on maintainer control messages.
The protobuf changes are additive. Epoch
0is treated as the compatibility path for requests or responses that do not participate in maintainer epoch fencing yet.Do you need to update user documentation, design documentation or monitoring documentation?
No user-facing behavior or configuration is changed.
Release note
Summary by CodeRabbit
New Features
Improvements
Tests