[feat] PIP-468: Basic end-to-end tests for V5 Queue/Stream/Checkpoint consumers + async APIs#25587
Merged
merlimat merged 7 commits intoapache:masterfrom Apr 28, 2026
Merged
Conversation
ScalableQueueConsumer and ScalableStreamConsumer were creating fresh ConsumerConfigurationData per segment that only carried over the subscription name and consumer name. Every other builder knob — receiverQueueSize, ackTimeout, acknowledgmentGroupTime, maxAcknowledgmentGroupSize, negativeAckRedeliveryBackoff, ackTimeoutRedeliveryBackoff, deadLetterPolicy, readCompacted, replicateSubscriptionState, encryption — was silently dropped on the floor. Clone the user-facing ConsumerConfigurationData and only override the fields that must be per-segment (topicNames, subscription type) and the consumer name suffix. This makes the V5 consumer builders behave the way users expect: setting a knob on the V5 builder actually applies to the underlying per-segment v4 consumers. The same structural issue exists in ScalableTopicProducer and ScalableCheckpointConsumer; those will be fixed in follow-ups since they need a small builder refactor to retain the user-facing config.
Five end-to-end scenarios for QueueConsumer on a single-segment topic: - testProduceAndAckMany — 50 messages, sequential receive + ack, then a short-timeout receive confirms nothing leaks back. - testNegativeAckCausesRedelivery — verifies negativeAcknowledge triggers redelivery, with a tight backoff so the test stays fast. - testReceiveTimeoutReturnsNullWhenNoMessages — receive(timeout) must return null on an idle topic, not block or throw. - testKeyedMessagesPreserveKeyAndProperties — keys and properties roundtrip through the wire without loss. - testTopicSubscriptionAndConsumerNameAccessors — consumer accessors return what was configured (and a generated name when not set). V5ClientBaseTest now exposes a shared v5Client (initialized in @BeforeClass, closed in @afterclass) — most tests use it instead of spinning up a new client per test. Tests use Lombok @cleanup on local producer/consumer variables for resource lifecycle. Total wall-clock ~1.2s of test work on top of the shared cluster startup, which is amortized across the class.
Four end-to-end scenarios for StreamConsumer on a single-segment topic: - testProduceAndCumulativeAck — 50 messages received in order, single cumulative-ack of the last MessageId, verify nothing redelivers. - testReceiveMultiReturnsBatch — receiveMulti(20, 500ms) returns the available 10 messages once the timeout fires; iteration count matches Messages.count(); lastId is set. - testReceiveTimeoutReturnsNullWhenNoMessages — receive(timeout) returns null on an idle topic. - testTopicSubscriptionAndConsumerNameAccessors — consumer accessors return what was configured. Uses the shared v5Client and @cleanup, same pattern as the queue test.
…t on wire read Checkpoint.checkpoint() captured per-segment positions from the wire read loop, but the read loop pre-fetches into an in-memory queue. So a checkpoint taken right after receive() returned message N could already point past message N+1 (or further) if the read loop had run ahead. Calling seek(savedCheckpoint) would then skip messages that the application had not yet processed. Move the per-segment position update into receive() / receive(timeout) / receiveMulti(...), i.e., the moment a message crosses from the wire buffer into application code. Now checkpoint() reflects "I have processed up to here" and seek(checkpoint) redelivers everything after. Surfaced by V5CheckpointConsumerBasicTest.testSeekRewindsToEarlierCheckpoint.
Seven end-to-end scenarios for CheckpointConsumer on a single-segment topic, covering the unmanaged reader-style API used by connector frameworks (Flink / Spark): - testReadFromEarliest — Checkpoint.earliest() sentinel reads the full history. - testReadFromLatestSkipsExistingMessages — Checkpoint.latest() must not see pre-existing messages; only what's published after subscribe. - testCheckpointAndResume — checkpoint() + close + reopen with the saved position resumes exactly at the next unread message. - testCheckpointSerializationRoundtrip — checkpoint().toByteArray() + Checkpoint.fromByteArray() preserves position across an out-of-process restart shape. - testSeekRewindsToEarlierCheckpoint — seek(checkpoint) rewinds the consumer and redelivers every message published after that point. - testReceiveTimeoutReturnsNullWhenNoMessages — idle topic handling. - testTopicAccessor — basic accessor. Cross-segment position-vector behavior is intentionally deferred to the multi-segment scalable suite.
Six end-to-end scenarios exercising the V5 async API surface: - testAsyncProducerSendAndFlush — N async sends + flush(); verify all futures complete with non-null MessageIds and the messages land in order on a Shared subscription. - testAsyncProducerSendCarriesMessageId — sendAsync returns a future carrying the broker-assigned MessageId. - testAsyncQueueConsumerReceiveAndAck — async receive() future, ack via the async view, no redelivery after. - testAsyncStreamConsumerReceiveAndCumulativeAck — multiple async receives + acknowledgeCumulative on the last id. - testAsyncCheckpointConsumerCheckpointAndSeek — async checkpoint() and seek() futures complete; seek redelivers everything after the saved position. - testAsyncCloseCompletes — async close() futures on producer and consumer complete cleanly. All single-segment, ~1s total wall-clock on the shared cluster.
The single-letter generic 'M' on advanceCheckpoint() (added in the previous CheckpointConsumer position fix) violates Pulsar's MethodTypeParameterName checkstyle rule, which restricts single-letter type parameters to T/K/V/W/X/R. The genericity wasn't needed — every caller passes a plain Message<T>, so just take and return Message<T>.
10 tasks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Phase 2 of the V5 end-to-end test suite: per-consumer-type basic
behaviors plus the async API surface. Builds on the V5 smoke-test infra
landed in #25586. Single-segment scalable topics throughout — multi-
segment / split / merge / multi-broker scenarios live in dedicated
follow-up suites.
New test classes
negativeAck redelivery, idle receive timeout, keyed-message metadata
roundtrip, accessors.
receive, `receiveMulti` batch shape, idle receive timeout,
accessors.
and `Checkpoint.latest` sentinels, checkpoint + close + reopen-with-
saved-position, `toByteArray` / `fromByteArray` roundtrip,
in-place `seek(checkpoint)` rewind, idle timeout, topic accessor.
async receive on Queue / Stream / Checkpoint consumers, async ack /
cumulative ack / seek / checkpoint, async close.
22 new tests, ~6 s of test wall-clock on top of the shared cluster
startup that's amortized across the class.
Real fixes surfaced by writing the tests
Both are squashable: the test commits depend on the fix commits.
fix: propagate user consumer config to per-segment consumers
(`025da64b4f`) — `ScalableQueueConsumer` and `ScalableStreamConsumer`
were creating a fresh `ConsumerConfigurationData` per segment that
only carried over `subscriptionName` and `consumerName`. Every
other builder knob — `receiverQueueSize`, `ackTimeout`,
`acknowledgmentGroupTime`, `maxAcknowledgmentGroupSize`,
`negativeAckRedeliveryBackoff`, `ackTimeoutRedeliveryBackoff`,
`deadLetterPolicy`, `readCompacted`,
`replicateSubscriptionState`, encryption — was silently dropped.
Surfaced by the queue test's nack-backoff scenario. Fix: clone the
user-facing config and override only what must be per-segment
(`topicNames`, subscription type, consumer-name suffix). The same
structural issue still exists in `ScalableTopicProducer` and
`ScalableCheckpointConsumer` — left for follow-ups since they need
a small builder refactor to retain the user-facing config.
fix: advance CheckpointConsumer position in `receive()`, not on
wire read (`0a2f834a5e`) — `checkpoint()` was capturing the
read-loop's pre-fetched wire position, but the read loop pre-fetches
into an in-memory queue. So a checkpoint taken right after `receive()`
returned message N could already point past N+1 (or further) if the
loop had run ahead, and `seek(savedCheckpoint)` would skip messages
the application had not yet processed. Surfaced by
`testSeekRewindsToEarlierCheckpoint`. Fix: advance the per-segment
positions inside `receive()` / `receive(timeout)` /
`receiveMulti(...)` — i.e., the moment a message crosses from the
wire buffer into application code.
Things deliberately left for later phases
range routing under load, cumulative ack with position vector across
segments).
reconnect — exercises the `SubscriptionCoordinator` end-to-end.
cross-broker ownership, controller failover. Needs a multi-broker base
class; will land in its own PR.
modes / dedup / event time / delivery timing).
Test plan
"org.apache.pulsar.client.api.v5.*"` — 23/23 pass
(smoke + queue + stream + checkpoint + async).
(`ScalableTopicControllerTest`, `ScalableTopicServiceTest`,
etc.).