[fix][broker] Prevent topic policy initialization race with a buffering listener wrapper#26044
Merged
Merged
Conversation
…apper Introduce TopicPolicyListenerWrapper: during topic-policy initialization it buffers incoming onUpdate notifications and, on completeInitialization, applies the latest received values (preferring them over the loaded values). This fixes a race where a live policy update arriving while PersistentTopic#initTopicPolicy / NonPersistentTopic#initialize is still loading policies could be overwritten by the stale loaded value, leaving the topic policies inconsistent until the next update. PersistentTopic and NonPersistentTopic register the wrapper (via getTopicPolicyListener) and complete its initialization on the per-topic policies-notify thread; non-persistent and internal topics complete it with null loaded values. A null delete arriving during the init window is ignored (onUpdate(null) is a no-op for topics) without NPEing. Assisted-by: Claude Opus 4.8
Use Optional<TopicPolicies> for the buffered latest global/local policies so a delete received during initialization (onUpdate(null)) is recorded (Optional.empty) and propagated downstream, instead of being ignored and the now-stale loaded value re-applied. A null field means no update was received during init (the loaded value is used); a present Optional holds the received policies; an empty Optional records a delete. Since a delete carries no global/local scope through the TopicPolicyListener interface, it is recorded for both scopes and a later scoped update during init overrides its own scope. Assisted-by: Claude Opus 4.8
dc9df65 to
98bb275
Compare
Reword the onUpdate buffering comment to describe the actual code (a value is stored as Optional.of(data), a delete as Optional.empty()) instead of the stale Optional.ofNullable reference, and mark the assign-once createdTimestampNanos field final. Assisted-by: Claude Opus 4.8
merlimat
approved these changes
Jun 18, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Main Issue: #26037
Motivation
When a topic is loaded, its initial topic policies are applied during
PersistentTopic#initTopicPolicy/NonPersistentTopic#initializeby reading the current (global + local) policies and applying them. The topicalso registers as a
TopicPolicyListenerto receive subsequent updates. There is a race: a live policy updatecan arrive via the listener while the initial load is still in flight, and the later-completing initial load
can overwrite the newer live value — leaving the topic with stale topic policies until the next update arrives.
This is a rare corner case, but possible.
Modifications
TopicPolicyListenerWrapper, aTopicPolicyListenerwrapper around the real topic listener. While notyet initialized it buffers the latest received global/local policies instead of forwarding them; on
completeInitialization(loadedGlobal, loadedLocal)it applies, per scope, the latest value received duringloading (falling back to the loaded value), then forwards all subsequent updates directly. A null (delete)
update arriving during the init window is ignored —
onUpdate(null)is a no-op for topics — without NPEing.PersistentTopicandNonPersistentTopicregister the wrapper (via the newAbstractTopic#getTopicPolicyListenerhook) and call
completeInitializationon the per-topic policies-notify thread after the initial load.Internal and non-persistent topics complete with
nullloaded values (they don't load initial topic policies).Verifying this change
This change is already covered by existing tests and adds a unit test:
TopicPolicyListenerWrapperTest— verifies buffering during init, latest-wins precedence over loaded values,application of loaded values when nothing was buffered, and the null-delete guard.
TopicPoliciesTest,SystemTopicBasedTopicPoliciesServiceTest,MessageTTLTest) exercise the wrapper end-to-end for both persistent and non-persistent topics.Does this pull request potentially affect one of the following parts:
This is an internal behavior change (topic-policy initialization) and does not change any public API, schema,
configuration, wire protocol, REST endpoint, CLI option, or metric.