From f6cb334e05c19ccb4a72abaa4ed1027591340da5 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 25 Jun 2026 11:55:47 -0700 Subject: [PATCH] consumer: track current leader epoch in addition to record epoch --- kafka/consumer/subscription_state.py | 44 ++++++++++-- test/consumer/test_fetcher_mock_broker.py | 86 +++++++++++++++++++++++ 2 files changed, 125 insertions(+), 5 deletions(-) diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index 03863f769..d8283e349 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -564,6 +564,18 @@ def __init__(self): # until OffsetForLeaderEpoch confirms the position is consistent with # the current leader's log; mutually exclusive with awaiting_reset. self._awaiting_validation = False + # Highest cluster leader epoch this position has already been reconciled + # against (a completed validation, or adoption of a position whose + # record epoch is unknown). This is deliberately distinct from the + # position's *record* epoch (``OffsetAndMetadata.leader_epoch``, which + # is sent as last_fetched_epoch for KIP-595 divergence detection): a + # leader election bumps the cluster epoch without changing the epoch of + # the record we're positioned on, and OffsetForLeaderEpoch reports the + # epoch of the requested (older) point, so the record epoch alone can't + # tell us whether we've validated against the current leader. Tracking + # it separately is what stops poll() from re-validating forever and + # stalling the consumer. See #3106. + self._current_leader_epoch = -1 # KIP-392: preferred read replica chosen by the broker (rack-aware). # ``_preferred_read_replica_expiration`` is a monotonic deadline; after # it passes we fall back to the leader and re-learn. Cleared on @@ -597,6 +609,7 @@ def reset(self, strategy): self._position = None self.next_allowed_retry_time = None self._awaiting_validation = False + self._current_leader_epoch = -1 self.clear_preferred_read_replica() def is_reset_allowed(self): @@ -625,6 +638,11 @@ def seek(self, offset): self.drop_pending_record_batch = True self.next_allowed_retry_time = None self._awaiting_validation = False + # A freshly-seeked position has not been reconciled against any leader; + # force the next maybe_validate_position to validate it against the + # current leader (the prior position's validation says nothing about + # this offset). + self._current_leader_epoch = -1 self.clear_preferred_read_replica() def pause(self): @@ -686,7 +704,8 @@ def awaiting_validation(self): return self._awaiting_validation def maybe_validate_position(self, current_leader_epoch): - """Mark for validation if current leader has advanced beyond our position's epoch. + """Mark for validation if the current leader has advanced beyond the + leader epoch this position was last reconciled against. Returns True if the partition is now awaiting validation. """ @@ -696,11 +715,26 @@ def maybe_validate_position(self, current_leader_epoch): return False if current_leader_epoch is None or current_leader_epoch < 0: return False - # Positions without a known epoch (legacy data, post-seek to bare offset) - # can't be validated; treat as already-fetchable. - if self._position.leader_epoch < 0: + # Have we already reconciled this position against this leader epoch (or + # a newer one)? The floor is the max of: + # - the record epoch of the position itself: a record we actually + # fetched at this offset proves the position is valid through its + # epoch (so a position fetched under the current leader needs no + # validation), and + # - the leader epoch a completed validation reconciled us against, + # which can exceed the record epoch because OffsetForLeaderEpoch + # reports the epoch of the requested (older) point, not the current + # leader's. Without this second term the position is re-marked every + # poll and the partition never becomes fetchable again (#3106). + reconciled_epoch = max(self._current_leader_epoch, self._position.leader_epoch) + if current_leader_epoch <= reconciled_epoch: return False - if self._position.leader_epoch >= current_leader_epoch: + # We are now tracking this leader epoch regardless of whether the + # position itself carries a record epoch we can validate against. + self._current_leader_epoch = current_leader_epoch + # Positions without a known record epoch (legacy data, post-seek to bare + # offset) can't be validated; treat as already-fetchable. + if self._position.leader_epoch < 0: return False self.clear_preferred_read_replica() self._awaiting_validation = True diff --git a/test/consumer/test_fetcher_mock_broker.py b/test/consumer/test_fetcher_mock_broker.py index 952890cad..fae3ae8b6 100644 --- a/test/consumer/test_fetcher_mock_broker.py +++ b/test/consumer/test_fetcher_mock_broker.py @@ -143,6 +143,92 @@ def handler(api_key, api_version, correlation_id, request_bytes): assert fetcher._subscriptions.assignment[tp].position.leader_epoch == 5 assert fetcher._subscriptions.assignment[tp].position.offset == 50 + def test_validated_position_not_revalidated_forever( + self, broker, manager, fetcher): + """Regression for #3106: consumer stalls after one fetched batch. + + After a leader election the cluster epoch advances (3 -> 5), but + OffsetForLeaderEpoch reports the epoch of the *requested* (older) + point - here 3, below the cluster epoch - because that's the epoch + the position's record actually belongs to. The position must be + reconciled against the current leader epoch (5) so the *next* poll + does not re-mark it for validation. Otherwise the partition is gated + out of fetching on every poll and the consumer stalls forever with a + frozen position and growing lag. + """ + tp = TopicPartition(TOPIC, PARTITION) + # Position recorded under leader epoch 3 (e.g. last fetched batch). + fetcher._subscriptions.seek(tp, OffsetAndMetadata(50, '', 3)) + + # Cluster leader epoch advances to 5; refresh the consumer's cache. + _broker_metadata(broker, leader_epoch=5) + manager._net.run(manager.wait_for, manager.cluster.request_update(), 1000) + + # No truncation (end_offset 100 >= position 50), and the broker + # reports the requested epoch (3), NOT the current cluster epoch (5). + ofle_requests = [0] + + def handler(api_key, api_version, correlation_id, request_bytes): + ofle_requests[0] += 1 + return _ofle_response(error_code=0, leader_epoch=3, end_offset=100) + broker.respond_fn(OffsetForLeaderEpochRequest, handler) + broker.respond_fn(OffsetForLeaderEpochRequest, handler) # guard 2nd call + + # Round 1: poll marks + drives validation to completion. + fetcher.maybe_validate_positions() + assert fetcher._subscriptions.assignment[tp].awaiting_validation + manager.run(fetcher._validate_offsets_async, 1000) + + assert fetcher._cached_log_truncation is None + assert not fetcher._subscriptions.assignment[tp].awaiting_validation + assert ofle_requests[0] == 1 + + # Round 2: the next poll must NOT re-mark the partition - it has + # already been validated against cluster leader epoch 5 - and the + # partition must be fetchable again. + fetcher.maybe_validate_positions() + state = fetcher._subscriptions.assignment[tp] + assert not state.awaiting_validation, \ + 'position re-marked for validation -> consumer stalls (#3106)' + assert state.is_fetchable() + + # And no further OffsetForLeaderEpoch request is issued. + manager.run(fetcher._validate_offsets_async, 1000) + assert ofle_requests[0] == 1 + + def test_seek_forces_revalidation_of_new_position( + self, broker, manager, fetcher): + """A seek must re-arm validation even after a prior validation. + + Companion to #3106: the per-partition reconciled-leader-epoch must be + reset on seek. A position validated against leader epoch 5 says nothing + about a *different* offset the user then seeks to - that offset may sit + in a truncated region of an older epoch. If the stale reconciliation + leaked across the seek, the new position would be (wrongly) treated as + already validated and fetched without an OffsetForLeaderEpoch check. + """ + tp = TopicPartition(TOPIC, PARTITION) + fetcher._subscriptions.seek(tp, OffsetAndMetadata(50, '', 3)) + + _broker_metadata(broker, leader_epoch=5) + manager._net.run(manager.wait_for, manager.cluster.request_update(), 1000) + + broker.respond(OffsetForLeaderEpochRequest, + _ofle_response(error_code=0, leader_epoch=3, end_offset=100)) + + # Validate the first position against the current leader (epoch 5). + fetcher.maybe_validate_positions() + manager.run(fetcher._validate_offsets_async, 1000) + assert not fetcher._subscriptions.assignment[tp].awaiting_validation + + # User seeks to a different offset whose record epoch (4) is below the + # current cluster leader epoch (5). This new position has never been + # reconciled and must be re-marked for validation. + fetcher._subscriptions.seek(tp, OffsetAndMetadata(200, '', 4)) + fetcher.maybe_validate_positions() + assert fetcher._subscriptions.assignment[tp].awaiting_validation, \ + 'seeked position not re-validated -> could consume past truncation' + def test_diverged_seeks_to_endpoint_with_policy( self, broker, manager, fetcher): """end_offset < position.offset (valid epoch) on the wire triggers