KAFKA-20428: Fix unsubscribe failure with assignment updates#22011
KAFKA-20428: Fix unsubscribe failure with assignment updates#22011lianetm merged 4 commits intoapache:trunkfrom
Conversation
|
Just for the record, I filed https://issues.apache.org/jira/browse/KAFKA-20429 as follow-up, to properly review the design so we can simplify and be less error-prone in this area (for now just fixing with the same "filter-out" approach that we already have in place) |
|
@kirktrue / @AndrewJSchofield could you take a look? Thanks! |
| // they are not relevant anymore (consumer already unsubscribing). | ||
| processBackgroundEvents(unsubscribeEvent.future(), timer, | ||
| e -> (e instanceof GroupAuthorizationException || e instanceof TopicAuthorizationException), | ||
| true); |
There was a problem hiding this comment.
Since we're already using an anonymous function for Predicate<Exception> ignoreErrorEventException, could we also do that instead of a flag, i.e.:
| true); | |
| e -> (event.type() == BackgroundEvent.Type.PARTITIONS_ASSIGNED || event.type() == BackgroundEvent.Type.STREAMS_TASKS_ASSIGNED)); |
Then we can delete isAssignmentEvent() and the event filtering becomes a little more generalized.
There was a problem hiding this comment.
Agree that it would given some consistency in the params, but at the price of complexity in a case where seems to me we don't really need the flexibitliy of a predicate?
This is only used from the unsubscribe to "skip assignment", not different callers filtering different kinds of events. So I would lean towards simple/YAGNI in this case, makes sense?
| public void testUnsubscribeWithPendingStreamsTasksAssignedEvent() { | ||
| consumer = newConsumer(requiredConsumerConfigAndGroupId("consumerGroup")); | ||
| completeTopicSubscriptionChangeEventSuccessfully(); | ||
| consumer.subscribe(singletonList("topic")); | ||
| completeUnsubscribeApplicationEventSuccessfully(); | ||
|
|
||
| // Add StreamsTasksAssignedEvent to the background queue (simulating an ongoing reconciliation | ||
| // that completed just before unsubscribe was called) | ||
| StreamsTasksAssignedEvent assignedEvent = new StreamsTasksAssignedEvent( | ||
| new TreeSet<>(TOPIC_PARTITION_COMPARATOR), new TreeSet<>(TOPIC_PARTITION_COMPARATOR), | ||
| new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of(), true)); | ||
| backgroundEventQueue.add(assignedEvent); | ||
|
|
||
| // The call to unsubscribe should complete successfully (StreamsTasksAssignedEvent not processed and completed exceptionally) | ||
| assertDoesNotThrow(() -> consumer.unsubscribe()); | ||
| verify(applicationEventHandler, never().description("Reconciled assignment updates shouldn't be processed while unsubscribing")) | ||
| .addAndGet(any(ApplyAssignmentEvent.class)); | ||
| assertTrue(assignedEvent.future().isCompletedExceptionally()); | ||
| } |
There was a problem hiding this comment.
These unit tests only differ by the type of assignedEvent, right? Could we generalize the test to reduce duplication?
There was a problem hiding this comment.
yes, done (consolidated them with params)
| if (event instanceof CompletableEvent) { | ||
| ((CompletableEvent<?>) event).future().completeExceptionally( | ||
| new KafkaException("Assignment event skipped because consumer is unsubscribing")); | ||
| } |
There was a problem hiding this comment.
Both of these events are CompletableEvents. I guess this is defensive?
There was a problem hiding this comment.
yes, just extra check because at this level we don't know about the specific events
|
Thanks for the review @kirktrue ! Comments addressed |
There was a problem hiding this comment.
Pull request overview
Fixes KAFKA-20428 by preventing AsyncKafkaConsumer#unsubscribe() from processing pending “assignment update” background events that can be queued right before unsubscribe and otherwise cause failures during the unsubscribe flow.
Changes:
- Add a
skipAssignmentEventsoption toAsyncKafkaConsumer#processBackgroundEvents(...)and use it fromunsubscribe(). - Skip processing
PARTITIONS_ASSIGNED/STREAMS_TASKS_ASSIGNEDbackground events during unsubscribe (while still draining the queue). - Update/add unit tests to cover unsubscribe with pending assignment events and adapt existing tests to the new method signature.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java |
Adds the ability to skip assignment-update background events during unsubscribe and wires it into unsubscribe(). |
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java |
Updates processBackgroundEvents tests for the new signature and adds a parameterized regression test for pending assignment events during unsubscribe. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
Outdated
Show resolved
Hide resolved
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
AndrewJSchofield
left a comment
There was a problem hiding this comment.
Thanks for the PR. It seems to me that there's a bit of a testing gap because we are completing the assignment events preemptively during unsubscribe and I don't think we have a test to validate. It seems fairly straightforward for the consumer group, but a bit more complicated for the streams group. Do we need to show that the streams group remains in a valid state afterwards?
|
Makes sense @AndrewJSchofield , added tests to show the recovery at the state machine level (showing members can rejoin after unsubscribe when there was a previous reconciliation clashing with the unsubscribe). Both tests pass locally . Thanks! |
Fix to ensure that unsubscribe does not apply any pending assignment update that may exist in the background queue (e.g, if a reconciliation completed right before unsubscribing). Fix by filtering out the assignment update events on unsubscribe (same approach already done for filtering out error events that the unsubscribe should not process). This issue and fix only affects unsubscribe (not close), as unsubscribe is the only one, other than poll, that processes background events. Reviewers: Kirk True <kirk@kirktrue.pro>, Andrew Schofield <aschofield@confluent.io> --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Fix to ensure that unsubscribe does not apply any pending assignment
update that may exist in the background queue (e.g, if a reconciliation
completed right before unsubscribing).
Fix by filtering out the assignment update events on unsubscribe (same
approach already done for filtering out error events that the
unsubscribe should not process).
This issue and fix only affects unsubscribe (not close), as unsubscribe
is the only one, other than poll, that processes background events.
Reviewers: Kirk True kirk@kirktrue.pro, Andrew Schofield
aschofield@confluent.io