eventcollector: refactor dispatcher registration and session lifecycle #5175
lidezhu wants to merge 17 commits into
📝 Walkthrough
This PR refactors dispatcher session orchestration in TiCDC's event collector by introducing explicit registration entrypoints (…
Changes: Dispatcher Session Registration and State Management Refactoring
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Code Review
This pull request refactors the registration and session management logic for dispatchers in the event collector. It introduces more explicit lifecycle methods (such as startLocalRegistration, retryCurrentRegistration, and startRemoteProbing) and encapsulates control-plane transitions within dispatcherSession and dispatcherStat. A critical race condition was identified in retryCurrentRegistration where a concurrent call to remove() could clear the current event service ID, leading to a panic. A code suggestion has been provided to handle this scenario gracefully.
/gemini review
@coderabbitai review
✅ Actions performed: Review triggered.
Code Review
This pull request refactors the registration, reset, and probing lifecycle of dispatchers in the event collector. It replaces generic registration and reset methods with more specific, context-aware entry points such as startLocalRegistration, retryCurrentRegistrationIfRemovedFrom, and resetCurrentEventService. Additionally, it consolidates heartbeat progress queries into a single getHeartbeatReport method. Feedback on these changes suggests adding a check in retryCurrentRegistrationIfRemovedFrom to prevent unnecessary operations and logging if the dispatcher session has already been removed.
func (s *dispatcherSession) retryCurrentRegistrationIfRemovedFrom(serverID node.ID) bool {
	s.requestMu.Lock()
	defer s.requestMu.Unlock()
	if s.connState.getCurrentEventServiceID() != serverID {
		return false
	}
If the dispatcher session has already been removed, we should avoid logging the retry message and attempting to register. Checking s.connState.isRemoved() at the beginning of retryCurrentRegistrationIfRemovedFrom prevents unnecessary logging and operations on a terminated session.
func (s *dispatcherSession) retryCurrentRegistrationIfRemovedFrom(serverID node.ID) bool {
	s.requestMu.Lock()
	defer s.requestMu.Unlock()
	if s.connState.isRemoved() {
		return false
	}
	if s.connState.getCurrentEventServiceID() != serverID {
		return false
	}
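The effect of the suggested guard can be illustrated with a minimal sketch. It is a simplified model, not the PR's code: `session`, `remove`, `retryIfRemovedFrom`, and the `registers` counter are hypothetical stand-ins for `dispatcherSession` and its connection state, and it assumes that removal clears the current event service ID to its zero value.

```go
package main

import (
	"fmt"
	"sync"
)

// nodeID stands in for node.ID; every name in this file is illustrative.
type nodeID string

// session is a hypothetical, stripped-down stand-in for dispatcherSession.
type session struct {
	mu        sync.Mutex
	removed   bool
	currentID nodeID
	registers int // counts register requests that would have been sent
}

// remove marks the session as terminated and clears the current service ID.
func (s *session) remove() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.removed = true
	s.currentID = ""
}

// retryIfRemovedFrom re-registers only if the session is still alive and
// serverID is still the service the session is bound to.
func (s *session) retryIfRemovedFrom(serverID nodeID) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.removed {
		// Checked first: after remove() the cleared ID would otherwise
		// compare equal to an empty serverID and trigger a bogus retry.
		return false
	}
	if s.currentID != serverID {
		return false
	}
	s.registers++
	return true
}

func main() {
	s := &session{currentID: "node-1"}
	fmt.Println(s.retryIfRemovedFrom("node-1")) // true
	s.remove()
	fmt.Println(s.retryIfRemovedFrom("node-1")) // false
	fmt.Println(s.retryIfRemovedFrom(""))       // false
}
```

In this model the ID comparison alone would let the last call through, because both sides are empty after removal; the explicit removed check is what rejects it.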
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
downstreamadapter/eventcollector/dispatcher_session.go (1)
586-600: ⚠️ Potential issue | 🔴 Critical | ⚡ Quick win
Don't make the init-time remote-ready path reachable until the callback contract is updated.
This now starts remote probing while the session can still be in the `readyCallback != nil` init state, but `handleAcceptedRemoteReadyLocked` still panics on that combination. The state-machine comment above explicitly allows “remote ready first”, so the first successful reusable remote during add will crash instead of resetting the remote stream. Either defer probing until after the initial local commit, or make the remote-ready path tolerate the init callback state.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@downstreamadapter/eventcollector/dispatcher_session.go` around lines 586 - 600, startRemoteProbing may run while the session is still in the init "readyCallback != nil" state, which causes handleAcceptedRemoteReadyLocked to panic; change startRemoteProbing (in dispatcherSession) to not start probing when the init ready callback is present: inside startRemoteProbing (after acquiring s.requestMu and before calling s.connState.beginRemoteProbing / s.sendRegisterRequest) check the connection/init callback state (readyCallback != nil or equivalent on s.connState) and if it's non-nil, defer probing by returning early or by saving the candidate nodes to a pending field to be processed once the init callback completes (ensure the same lock protects the pending field), so the remote probing path is only reachable after the initial local commit; update any tests to cover the deferred-probing behavior.
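One of the two options named above, deferring probing until the init callback has completed, might look roughly like the following sketch. It is a toy model under stated assumptions: `probeSession`, `pending`, `probed`, and `finishInit` are hypothetical names that do not appear in the PR, and the real `startRemoteProbing` sends register requests rather than appending to a slice.

```go
package main

import (
	"fmt"
	"sync"
)

// probeSession is a hypothetical model of a session that may still be in its
// init state (readyCallback != nil) when remote candidates arrive.
type probeSession struct {
	mu            sync.Mutex
	readyCallback func()   // non-nil only while initial registration is in flight
	pending       []string // candidates saved while init is still in progress
	probed        []string // candidates for which probing was actually started
}

// startRemoteProbing starts probing immediately unless the session is still
// initializing, in which case the candidates are saved for later.
func (s *probeSession) startRemoteProbing(candidates []string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.readyCallback != nil {
		// Copy the slice so later caller-side mutation cannot affect it.
		s.pending = append([]string(nil), candidates...)
		return
	}
	s.probed = append(s.probed, candidates...)
}

// finishInit models the point after the initial local commit: the init
// callback is cleared and any deferred candidates are probed.
func (s *probeSession) finishInit() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.readyCallback = nil
	s.probed = append(s.probed, s.pending...)
	s.pending = nil
}

func main() {
	s := &probeSession{readyCallback: func() {}}
	s.startRemoteProbing([]string{"node-2", "node-3"})
	fmt.Println(len(s.probed), len(s.pending)) // 0 2
	s.finishInit()
	fmt.Println(len(s.probed), len(s.pending)) // 2 0
}
```

The pending field is read and written under the same mutex as the callback check, which is the property the review prompt asks for.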
📒 Files selected for processing (6)
- downstreamadapter/eventcollector/dispatcher_session.go
- downstreamadapter/eventcollector/dispatcher_stat.go
- downstreamadapter/eventcollector/dispatcher_stat_test.go
- downstreamadapter/eventcollector/event_collector.go
- downstreamadapter/eventcollector/event_collector_test.go
- downstreamadapter/eventcollector/log_coordinator_client.go
// commitLocalRegistration commits the accepted local registration by sending
// RESET to the local EventService.
func (s *dispatcherSession) commitLocalRegistration() {
	s.doReset(s.localServerID, s.target.GetCheckpointTs())
}
Clear the one-shot readyCallback when local registration is committed.
After commitLocalRegistration() runs, readyCallback is still non-nil. Any later accepted local ready will re-enter the init-only callback branch in handleAcceptedLocalReadyLocked and skip the normal RESET path, which breaks re-registration after removal/reconnect.
Proposed fix
 func (s *dispatcherSession) commitLocalRegistration() {
-	s.doReset(s.localServerID, s.target.GetCheckpointTs())
+	s.requestMu.Lock()
+	defer s.requestMu.Unlock()
+	if s.connState.isRemoved() {
+		return
+	}
+	s.readyCallback = nil
+	s.doResetLocked(s.localServerID, s.target.GetCheckpointTs())
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@downstreamadapter/eventcollector/dispatcher_session.go` around lines 355 -
359, commitLocalRegistration currently calls doReset but leaves the one-shot
readyCallback set, so subsequent accepted local ready messages re-enter the
init-only branch in handleAcceptedLocalReadyLocked and skip the normal
RESET/re-registration path; fix by clearing the one-shot callback (set
s.readyCallback = nil) at the end of commitLocalRegistration (or immediately
after doReset) so the init-only callback cannot be reused and normal
RESET/re-registration logic in handleAcceptedLocalReadyLocked runs on future
events.
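The one-shot behavior this comment asks for can be sketched in isolation. The model below is an assumption-laden simplification: `oneShotSession`, `handleLocalReady`, and the `resets` counter are hypothetical, and the real handlers exchange messages with an EventService rather than incrementing counters.

```go
package main

import (
	"fmt"
	"sync"
)

// oneShotSession is a hypothetical model of a session whose init callback
// must fire at most once.
type oneShotSession struct {
	mu            sync.Mutex
	readyCallback func() // init-only callback, expected to be one-shot
	resets        int    // counts RESET requests that would have been sent
}

// handleLocalReady takes the init-only branch while the callback is set and
// the normal RESET path otherwise.
func (s *oneShotSession) handleLocalReady() {
	s.mu.Lock()
	cb := s.readyCallback
	if cb == nil {
		s.resets++
		s.mu.Unlock()
		return
	}
	s.mu.Unlock()
	cb() // invoked outside the lock so the callback may call back into s
}

// commitLocalRegistration clears the one-shot callback before resetting, so
// later ready messages cannot re-enter the init-only branch.
func (s *oneShotSession) commitLocalRegistration() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.readyCallback = nil
	s.resets++
}

func main() {
	inits := 0
	s := &oneShotSession{readyCallback: func() { inits++ }}
	s.handleLocalReady()        // init branch: callback fires once
	s.commitLocalRegistration() // clears callback, first RESET
	s.handleLocalReady()        // normal path: second RESET
	fmt.Println(inits, s.resets) // 1 2
}
```

If `commitLocalRegistration` left the callback in place, the second `handleLocalReady` call in this model would bump `inits` again and skip the RESET, which is the re-registration failure the comment describes.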
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note