Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 39 additions & 5 deletions kafka/consumer/subscription_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,18 @@ def __init__(self):
# until OffsetForLeaderEpoch confirms the position is consistent with
# the current leader's log; mutually exclusive with awaiting_reset.
self._awaiting_validation = False
# Highest cluster leader epoch this position has already been reconciled
# against (a completed validation, or adoption of a position whose
# record epoch is unknown). This is deliberately distinct from the
# position's *record* epoch (``OffsetAndMetadata.leader_epoch``, which
# is sent as last_fetched_epoch for KIP-595 divergence detection): a
# leader election bumps the cluster epoch without changing the epoch of
# the record we're positioned on, and OffsetForLeaderEpoch reports the
# epoch of the requested (older) point, so the record epoch alone can't
# tell us whether we've validated against the current leader. Tracking
# it separately is what stops poll() from re-validating forever and
# stalling the consumer. See #3106.
self._current_leader_epoch = -1
# KIP-392: preferred read replica chosen by the broker (rack-aware).
# ``_preferred_read_replica_expiration`` is a monotonic deadline; after
# it passes we fall back to the leader and re-learn. Cleared on
Expand Down Expand Up @@ -597,6 +609,7 @@ def reset(self, strategy):
self._position = None
self.next_allowed_retry_time = None
self._awaiting_validation = False
self._current_leader_epoch = -1
self.clear_preferred_read_replica()

def is_reset_allowed(self):
Expand Down Expand Up @@ -625,6 +638,11 @@ def seek(self, offset):
self.drop_pending_record_batch = True
self.next_allowed_retry_time = None
self._awaiting_validation = False
# A freshly-seeked position has not been reconciled against any leader;
# force the next maybe_validate_position to validate it against the
# current leader (the prior position's validation says nothing about
# this offset).
self._current_leader_epoch = -1
self.clear_preferred_read_replica()

def pause(self):
Expand Down Expand Up @@ -686,7 +704,8 @@ def awaiting_validation(self):
return self._awaiting_validation

def maybe_validate_position(self, current_leader_epoch):
"""Mark for validation if current leader has advanced beyond our position's epoch.
"""Mark for validation if the current leader has advanced beyond the
leader epoch this position was last reconciled against.

Returns True if the partition is now awaiting validation.
"""
Expand All @@ -696,11 +715,26 @@ def maybe_validate_position(self, current_leader_epoch):
return False
if current_leader_epoch is None or current_leader_epoch < 0:
return False
# Positions without a known epoch (legacy data, post-seek to bare offset)
# can't be validated; treat as already-fetchable.
if self._position.leader_epoch < 0:
# Have we already reconciled this position against this leader epoch (or
# a newer one)? The floor is the max of:
# - the record epoch of the position itself: a record we actually
# fetched at this offset proves the position is valid through its
# epoch (so a position fetched under the current leader needs no
# validation), and
# - the leader epoch a completed validation reconciled us against,
# which can exceed the record epoch because OffsetForLeaderEpoch
# reports the epoch of the requested (older) point, not the current
# leader's. Without this second term the position is re-marked every
# poll and the partition never becomes fetchable again (#3106).
reconciled_epoch = max(self._current_leader_epoch, self._position.leader_epoch)
if current_leader_epoch <= reconciled_epoch:
return False
if self._position.leader_epoch >= current_leader_epoch:
# We are now tracking this leader epoch regardless of whether the
# position itself carries a record epoch we can validate against.
self._current_leader_epoch = current_leader_epoch
# Positions without a known record epoch (legacy data, post-seek to bare
# offset) can't be validated; treat as already-fetchable.
if self._position.leader_epoch < 0:
return False
self.clear_preferred_read_replica()
self._awaiting_validation = True
Expand Down
86 changes: 86 additions & 0 deletions test/consumer/test_fetcher_mock_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,92 @@ def handler(api_key, api_version, correlation_id, request_bytes):
assert fetcher._subscriptions.assignment[tp].position.leader_epoch == 5
assert fetcher._subscriptions.assignment[tp].position.offset == 50

def test_validated_position_not_revalidated_forever(
self, broker, manager, fetcher):
"""Regression for #3106: consumer stalls after one fetched batch.

After a leader election the cluster epoch advances (3 -> 5), but
OffsetForLeaderEpoch reports the epoch of the *requested* (older)
point - here 3, below the cluster epoch - because that's the epoch
the position's record actually belongs to. The position must be
reconciled against the current leader epoch (5) so the *next* poll
does not re-mark it for validation. Otherwise the partition is gated
out of fetching on every poll and the consumer stalls forever with a
frozen position and growing lag.
"""
tp = TopicPartition(TOPIC, PARTITION)
# Position recorded under leader epoch 3 (e.g. last fetched batch).
fetcher._subscriptions.seek(tp, OffsetAndMetadata(50, '', 3))

# Cluster leader epoch advances to 5; refresh the consumer's cache.
_broker_metadata(broker, leader_epoch=5)
manager._net.run(manager.wait_for, manager.cluster.request_update(), 1000)

# No truncation (end_offset 100 >= position 50), and the broker
# reports the requested epoch (3), NOT the current cluster epoch (5).
ofle_requests = [0]

def handler(api_key, api_version, correlation_id, request_bytes):
ofle_requests[0] += 1
return _ofle_response(error_code=0, leader_epoch=3, end_offset=100)
broker.respond_fn(OffsetForLeaderEpochRequest, handler)
broker.respond_fn(OffsetForLeaderEpochRequest, handler) # guard 2nd call

# Round 1: poll marks + drives validation to completion.
fetcher.maybe_validate_positions()
assert fetcher._subscriptions.assignment[tp].awaiting_validation
manager.run(fetcher._validate_offsets_async, 1000)

assert fetcher._cached_log_truncation is None
assert not fetcher._subscriptions.assignment[tp].awaiting_validation
assert ofle_requests[0] == 1

# Round 2: the next poll must NOT re-mark the partition - it has
# already been validated against cluster leader epoch 5 - and the
# partition must be fetchable again.
fetcher.maybe_validate_positions()
state = fetcher._subscriptions.assignment[tp]
assert not state.awaiting_validation, \
'position re-marked for validation -> consumer stalls (#3106)'
assert state.is_fetchable()

# And no further OffsetForLeaderEpoch request is issued.
manager.run(fetcher._validate_offsets_async, 1000)
assert ofle_requests[0] == 1

def test_seek_forces_revalidation_of_new_position(
self, broker, manager, fetcher):
"""A seek must re-arm validation even after a prior validation.

Companion to #3106: the per-partition reconciled-leader-epoch must be
reset on seek. A position validated against leader epoch 5 says nothing
about a *different* offset the user then seeks to - that offset may sit
in a truncated region of an older epoch. If the stale reconciliation
leaked across the seek, the new position would be (wrongly) treated as
already validated and fetched without an OffsetForLeaderEpoch check.
"""
tp = TopicPartition(TOPIC, PARTITION)
fetcher._subscriptions.seek(tp, OffsetAndMetadata(50, '', 3))

_broker_metadata(broker, leader_epoch=5)
manager._net.run(manager.wait_for, manager.cluster.request_update(), 1000)

broker.respond(OffsetForLeaderEpochRequest,
_ofle_response(error_code=0, leader_epoch=3, end_offset=100))

# Validate the first position against the current leader (epoch 5).
fetcher.maybe_validate_positions()
manager.run(fetcher._validate_offsets_async, 1000)
assert not fetcher._subscriptions.assignment[tp].awaiting_validation

# User seeks to a different offset whose record epoch (4) is below the
# current cluster leader epoch (5). This new position has never been
# reconciled and must be re-marked for validation.
fetcher._subscriptions.seek(tp, OffsetAndMetadata(200, '', 4))
fetcher.maybe_validate_positions()
assert fetcher._subscriptions.assignment[tp].awaiting_validation, \
'seeked position not re-validated -> could consume past truncation'

def test_diverged_seeks_to_endpoint_with_policy(
self, broker, manager, fetcher):
"""end_offset < position.offset (valid epoch) on the wire triggers
Expand Down