maintainer,dispatcher: fence stale generation requests #5182
hongyunyan wants to merge 10 commits into
Code Review
This pull request introduces a maintainer generation/epoch fencing mechanism to prevent stale maintainer requests from affecting dispatcher managers. It adds generation fields to heartbeat protobuf messages, implements fencing logic in the dispatcher manager and orchestrator, and stamps outgoing requests with the current maintainer generation. The review feedback highlights two critical head-of-line blocking issues in dispatcher_orchestrator.go where the orchestrator-wide lock m.mutex is held while waiting for the per-changefeed lock manager.LockControl(), and provides suggestions to safely release the lock before acquiring the per-changefeed lock.
} else {
	manager.LockControl()
	defer manager.UnlockControl()
	if !manager.TryUpdateMaintainer(from, generation) {
		log.Warn("drop stale maintainer bootstrap request",
			zap.String("changefeed", cfId.Name()),
			zap.String("from", from.String()),
			zap.Uint64("requestGeneration", generation),
			zap.Uint64("currentGeneration", manager.GetMaintainerEpoch()),
			zap.String("currentMaintainer", manager.GetMaintainerID().String()))
		return nil
	}
Holding the orchestrator-wide lock m.mutex while waiting for the per-changefeed lock manager.LockControl() can cause head-of-line blocking. If a single changefeed's dispatcher manager is slow or blocked, all other changefeeds on this node will be blocked from bootstrapping or closing.
To avoid this, we should unlock m.mutex as soon as we retrieve the manager from m.dispatcherManagers, and then acquire manager.LockControl(). To prevent races with concurrent close/delete operations, we can re-verify under m.mutex that the manager is still the active one in m.dispatcherManagers before proceeding.
} else {
	m.mutex.Unlock()
	manager.LockControl()
	defer manager.UnlockControl()
	m.mutex.Lock()
	currentManager, stillExists := m.dispatcherManagers[cfId]
	if !stillExists || currentManager != manager {
		m.mutex.Unlock()
		return nil
	}
	m.mutex.Unlock()
	if !manager.TryUpdateMaintainer(from, generation) {
		log.Warn("drop stale maintainer bootstrap request",
			zap.String("changefeed", cfId.Name()),
			zap.String("from", from.String()),
			zap.Uint64("requestGeneration", generation),
			zap.Uint64("currentGeneration", manager.GetMaintainerEpoch()),
			zap.String("currentMaintainer", manager.GetMaintainerID().String()))
		return nil
	}

 m.mutex.Lock()
 if manager, ok := m.dispatcherManagers[cfId]; ok {
-	if closed := manager.TryClose(req.Removed); closed {
-		delete(m.dispatcherManagers, cfId)
-		metrics.DispatcherManagerGauge.WithLabelValues(cfId.Keyspace(), cfId.Name()).Dec()
-		response.Success = true
+	manager.LockControl()
+	if manager.IsMaintainerRequestAllowed(from, req.Generation) {
+		if closed := manager.TryClose(req.Removed); closed {
+			delete(m.dispatcherManagers, cfId)
+			metrics.DispatcherManagerGauge.WithLabelValues(cfId.Keyspace(), cfId.Name()).Dec()
+			response.Success = true
+		} else {
+			response.Success = false
+		}
+	} else {
+		response.Success = false
+		log.Warn("drop stale maintainer close request",
+			zap.String("changefeed", cfId.Name()),
+			zap.String("from", from.String()),
+			zap.Uint64("requestGeneration", req.Generation),
+			zap.Uint64("currentGeneration", manager.GetMaintainerEpoch()),
+			zap.String("currentMaintainer", manager.GetMaintainerID().String()))
 	}
+	manager.UnlockControl()
 }
 m.mutex.Unlock()
Holding the orchestrator-wide lock m.mutex while waiting for the per-changefeed lock manager.LockControl() can cause head-of-line blocking. If a single changefeed's dispatcher manager is slow or blocked, all other changefeeds on this node will be blocked from bootstrapping or closing.
To avoid this, we should unlock m.mutex as soon as we retrieve the manager from m.dispatcherManagers, and then acquire manager.LockControl(). To prevent races with concurrent close/delete operations, we can re-verify under m.mutex that the manager is still the active one in m.dispatcherManagers before proceeding.
m.mutex.Lock()
manager, ok := m.dispatcherManagers[cfId]
if !ok {
	m.mutex.Unlock()
	return response
}
m.mutex.Unlock()
manager.LockControl()
defer manager.UnlockControl()
m.mutex.Lock()
currentManager, stillExists := m.dispatcherManagers[cfId]
if !stillExists || currentManager != manager {
	m.mutex.Unlock()
	response.Success = false
	return response
}
if manager.IsMaintainerRequestAllowed(from, req.Generation) {
	if closed := manager.TryClose(req.Removed); closed {
		delete(m.dispatcherManagers, cfId)
		metrics.DispatcherManagerGauge.WithLabelValues(cfId.Keyspace(), cfId.Name()).Dec()
		response.Success = true
	} else {
		response.Success = false
	}
} else {
	log.Warn("drop stale maintainer close request",
		zap.String("changefeed", cfId.Name()),
		zap.String("from", from.String()),
		zap.Uint64("requestGeneration", req.Generation),
		zap.Uint64("currentGeneration", manager.GetMaintainerEpoch()),
		zap.String("currentMaintainer", manager.GetMaintainerID().String()))
}
m.mutex.Unlock()

…neration-fence
# Conflicts:
#	coordinator/controller.go
#	heartbeatpb/heartbeat.pb.go
What problem does this PR solve?
Issue Number: close #5083
During maintainer failover, a delayed schedule request from the previous
maintainer can still reach a dispatcher manager after the new maintainer has
already bootstrapped and recreated the same table span. Without a receiver-side
ownership fence, the stale request can create an orphan dispatcher that enters
Working and writes to the downstream sink before the new maintainer observes and removes it.
What is changed and how it works?
This PR adds a receiver-local maintainer generation fence:
… generation to maintainer bootstrap, schedule, post-bootstrap, and close heartbeat messages.
… scheduled through add/move operators, and before resume/retry scheduling.
… ChangeFeedInfo and job status, advancing with max(candidate, persisted+1), preserving stored status by default, and writing info/job under info-key and job-key ModRevision compares.
… of first doing an ordinary no-CAS changefeed update.
… changefeed's generation strictly monotonic with max(candidate, current+1).
… AddMaintainerRequest.Config bytes synchronized with the latest ChangeFeedInfo.
… request generation and reject stale schedule/post-bootstrap/close requests locally.
… changes, and keeps currentOperatorMap keyed by dispatcher ID and generation.
… 0 only while the receiver has not observed a non-zero generation for the changefeed, and only for the current compatibility-mode maintainer owner.
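
The two core ideas in the list above, advancing the generation with max(candidate, persisted+1) and rejecting requests that do not match the receiver's current owner and generation, can be sketched roughly as follows. This is a minimal illustration, not the PR's implementation: the `fence` type, its methods, and `nextGeneration` are hypothetical names, maintainer IDs are simplified to strings, and the handling of generation 0 in compatibility mode is left out.

```go
package main

import "fmt"

// nextGeneration is a hypothetical helper: it returns a generation that is
// strictly greater than the persisted one, i.e. max(candidate, persisted+1).
func nextGeneration(candidate, persisted uint64) uint64 {
	if candidate > persisted+1 {
		return candidate
	}
	return persisted + 1
}

// fence is a hypothetical receiver-local record of the maintainer that
// currently owns a changefeed and the generation it bootstrapped with.
type fence struct {
	owner      string
	generation uint64
}

// tryUpdate models a bootstrap request. A newer generation takes over
// ownership; the current owner may repeat its own generation; anything
// else is treated as stale and rejected.
func (f *fence) tryUpdate(from string, gen uint64) bool {
	switch {
	case gen > f.generation:
		f.owner, f.generation = from, gen
		return true
	case gen == f.generation && from == f.owner:
		return true
	default:
		return false
	}
}

// allowed models schedule, post-bootstrap, and close requests: they pass
// only when they come from the current owner with the current generation.
func (f *fence) allowed(from string, gen uint64) bool {
	return from == f.owner && gen == f.generation
}

func main() {
	f := &fence{}
	fmt.Println(f.tryUpdate("maintainer-1", 1)) // true: first bootstrap
	fmt.Println(f.tryUpdate("maintainer-2", 2)) // true: failover to a newer generation
	fmt.Println(f.allowed("maintainer-1", 1))   // false: delayed request from the old maintainer
	fmt.Println(f.allowed("maintainer-2", 2))   // true
	fmt.Println(nextGeneration(0, 2))           // 3
}
```

In this sketch a delayed schedule request from maintainer-1 is dropped at the receiver once maintainer-2 has bootstrapped, which is the orphan-dispatcher scenario described in the problem statement.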
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
No expected performance regression. The new mutex only serializes per-changefeed
dispatcher-manager control operations such as bootstrap, close, and dispatcher
create/remove scheduling; it is not in the event write path.
The change is wire-compatible. New fields are optional protobuf fields, and a
new receiver still allows generation 0 from the current maintainer owner while
it remains in compatibility mode for that changefeed.
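
One way to picture the compatibility rule is the sketch below. It is an assumption-laden illustration rather than the PR's code: `compatFence` and its methods are hypothetical, maintainer IDs are plain strings, and it assumes that an old-version maintainer sends generation 0 and that a generation-0 bootstrap simply replaces the owner while the receiver has not yet observed a non-zero generation.

```go
package main

import "fmt"

// compatFence is a hypothetical receiver-side record. generation == 0 means
// the receiver has not yet observed a non-zero generation for the changefeed,
// so it is still in compatibility mode.
type compatFence struct {
	owner      string
	generation uint64
}

// bootstrap models a bootstrap request from a maintainer.
func (f *compatFence) bootstrap(from string, gen uint64) bool {
	if gen == 0 {
		// Assumed legacy behaviour: accepted only while still in
		// compatibility mode; the sender becomes the owner.
		if f.generation != 0 {
			return false
		}
		f.owner = from
		return true
	}
	// Non-zero generations must not go backwards, and an equal generation
	// is accepted only from the current owner.
	if gen < f.generation || (gen == f.generation && from != f.owner) {
		return false
	}
	f.owner, f.generation = from, gen
	return true
}

// allowed models later control requests. In compatibility mode this admits
// generation 0 from the current owner only; afterwards generation 0 fails.
func (f *compatFence) allowed(from string, gen uint64) bool {
	return from == f.owner && gen == f.generation
}

func main() {
	f := &compatFence{}
	fmt.Println(f.bootstrap("old-maintainer", 0)) // true: compatibility mode
	fmt.Println(f.allowed("old-maintainer", 0))   // true: current owner, generation 0
	fmt.Println(f.bootstrap("new-maintainer", 5)) // true: first non-zero generation
	fmt.Println(f.allowed("old-maintainer", 0))   // false: compatibility mode has ended
}
```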
Do you need to update user documentation, design documentation or monitoring documentation?
No.
Release note
Validation
make generate-protobuf
make fmt
tools/bin/golangci-lint run --timeout 10m0s --new-from-rev=upstream/master
go test ./coordinator/changefeed ./coordinator/operator ./coordinator ./pkg/pdutil
go test ./downstreamadapter/dispatchermanager ./downstreamadapter/dispatcherorchestrator ./coordinator ./coordinator/changefeed ./coordinator/operator ./pkg/pdutil ./maintainer ./maintainer/replica ./maintainer/operator
go test ./api/v1 ./coordinator ./coordinator/changefeed ./coordinator/drain ./coordinator/operator ./coordinator/scheduler ./downstreamadapter/dispatchermanager ./downstreamadapter/dispatcherorchestrator ./maintainer ./maintainer/replica ./maintainer/operator ./pkg/bootstrap ./pkg/server ./pkg/pdutil
git diff --check