[fix][broker]Do not trigger topic GC if replication is still active #25915

Open
poorbarcode wants to merge 20 commits into apache:master from poorbarcode:fix/topic_gc_replication

Conversation

@poorbarcode

@poorbarcode poorbarcode commented Jun 1, 2026

Contributor

Motivation

Replication is stuck due to the Topic GC

  • Enable two-way replication between the primary cluster and the backup cluster.
  • primary cluster: messages are published to the primary cluster.
    • backup cluster: no consumer or producer is registered.
  • backup cluster: the topic GC check runs.
    • No subscriptions
    • No producers except the remote producer
    • The topic is considered eligible for GC
      • Replication is disabled
      • The topic is not deleted because the remote-side producer is still registered
  • backup cluster: the topic GC process waits for the remote producer to be disconnected.
    • The deletion is never carried out, since nothing else asks to delete the topic.
  • backup cluster: the backlog grows because the replicator was closed.
    • Although messages copied from the remote cluster are not copied back again, the replicator still needs to check them and then mark them deleted.

Modifications

  • Topic GC will only be triggered if there is no producer (including the remote producer) and no replicator
  • Replicator producer will be closed after there are no messages to be replicated anymore.
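
The first rule can be sketched as a single predicate. This is a minimal illustration only: `TopicGcCheck` and `isEligibleForGc` are hypothetical names, not taken from the Pulsar code base, and the real check works on topic state rather than plain counters.

```java
// Hypothetical sketch; names are illustrative, not Pulsar's actual API.
final class TopicGcCheck {
    private TopicGcCheck() {}

    /**
     * Returns true only when nothing references the topic any more.
     * Remote (replicator-owned) producers count the same as local ones.
     */
    static boolean isEligibleForGc(int subscriptions, int localProducers,
                                   int remoteProducers, int replicators) {
        return subscriptions == 0
                && localProducers + remoteProducers == 0
                && replicators == 0;
    }
}
```

With this shape, a backup-cluster topic that still has a registered remote producer or an active replicator is never picked for GC, so the GC never disables replication on it.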

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

@poorbarcode poorbarcode added this to the 5.0.0-M1 milestone Jun 1, 2026
@poorbarcode poorbarcode self-assigned this Jun 1, 2026
@poorbarcode poorbarcode added type/bug The PR fixed a bug or issue reported a bug release/4.2.2 release/4.0.11 ready-to-test labels Jun 1, 2026

@codelipenghui codelipenghui left a comment

Contributor

@poorbarcode I'm thinking of another solution:

  1. Close the replicator producer if the producer is idle for a while (e.g. 10 mins)
  2. Check all the producers when detecting an inactive topic (the current logic is more complicated because it skips the replicator producer)
  3. Only delete the inactive topic if there are no producers (including the replicator producer)

Now, your solution adds a 7-day delay to inactive topic deletion if geo-replication is enabled. But if there have been no messages in the last 7 days, the issue can still happen, right?

Comment thread pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java Outdated
@void-ptr974

Contributor

@poorbarcode I'm thinking of another solution:

  1. Close the replicator producer if the producer is idle for a while (e.g. 10 mins)
  2. Check all the producers when detecting an inactive topic (the current logic is more complicated because it skips the replicator producer)
  3. Only delete the inactive topic if there are no producers (including the replicator producer)

Now, your solution adds a 7-day delay to inactive topic deletion if geo-replication is enabled. But if there have been no messages in the last 7 days, the issue can still happen, right?

I agree with this concern. A replicated topic can be quiet for longer than the threshold while the replication relationship is still valid.

Using latestPublishTime here makes topic deletion depend on the traffic pattern instead of the producer lifecycle. It seems cleaner to let the replicator close its producer explicitly when it is really idle, and let topic GC only check whether producers still exist.
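
A rough sketch of the lifecycle-based alternative, assuming a hypothetical `IdleReplicatorProducer` class (not Pulsar's API) and the 10-minute threshold from the example above: the replicator closes its own producer once it has been idle with nothing left to replicate, and reopens it when traffic resumes.

```java
import java.time.Duration;

// Hypothetical sketch; class and method names are illustrative, not Pulsar's API.
final class IdleReplicatorProducer {
    private final long idleThresholdMillis;
    private long lastSendMillis;
    private boolean connected = true;

    IdleReplicatorProducer(Duration idleThreshold, long nowMillis) {
        this.idleThresholdMillis = idleThreshold.toMillis();
        this.lastSendMillis = nowMillis;
    }

    /** Records a replicated message; a closed producer is reopened on demand. */
    void onMessageReplicated(long nowMillis) {
        lastSendMillis = nowMillis;
        connected = true;
    }

    /** Closes the producer only when it is idle and nothing is left to replicate. */
    void closeIfIdle(long nowMillis, long backlog) {
        if (connected && backlog == 0 && nowMillis - lastSendMillis >= idleThresholdMillis) {
            connected = false;
        }
    }

    boolean isConnected() {
        return connected;
    }
}
```

Topic GC then only has to ask whether any producer is still connected; how long the topic has been quiet no longer matters to the deletion decision.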

@poorbarcode

Contributor Author

@codelipenghui @void-ptr974 Changed the solution as @codelipenghui suggested, please review again

Comment thread pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java Outdated
@poorbarcode poorbarcode requested a review from lhotari June 3, 2026 08:58
Comment thread pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java Outdated
Comment thread pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java Outdated
Comment thread pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java Outdated
@poorbarcode

Contributor Author

@codelipenghui @lhotari @void-ptr974

Please review the PR again. I have changed the solution:

  • Topic GC will only be triggered if there is no producer (including the remote producer) and no replicator
  • The replicator producer will be closed once there are no more messages to replicate; it no longer relies on the Persistent Topic

@poorbarcode poorbarcode requested review from codelipenghui and lhotari and removed request for codelipenghui, lhotari and void-ptr974 June 11, 2026 09:49
@poorbarcode poorbarcode requested a review from void-ptr974 June 11, 2026 09:49
@poorbarcode poorbarcode requested a review from void-ptr974 June 15, 2026 06:25
void-ptr974

This comment was marked as duplicate.

Comment on lines +422 to +426
// Start producer and retry.
if (state == Disconnected) {
    startProducer();
    retryReplicateEntries.run();
    return;

Member

[BUG] Auto-resume can replicate newer positions before older ones → reordering, and silent message loss when dedup is enabled

After an idle disconnect, this resume path can replicate a newer batch before the older one it is holding, which breaks replication ordering and — with deduplication enabled — silently drops the older message.

Sequence: when a wait-read completes while state == Disconnected, this branch holds that batch (call it R1, the older positions) and reschedules it ~100ms later, while also calling startProducer(). startProducer() → setProducerAndTriggerReadEntries() flips the state to Started and immediately calls readMoreEntries():

protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer) {
    /**
     * 1. Try change state to {@link Started}.
     * 2. Atoms modify multiple properties if change state success, to avoid another thread get a null value
     *    producer when the state is {@link Started}.
     */
    Pair<Boolean, State> changeStateRes;
    changeStateRes = compareSetAndGetState(Starting, Started);
    if (changeStateRes.getLeft()) {
        if (!(producer instanceof ProducerImpl)) {
            log.error("The partitions count between two clusters is not the same, "
                    + "the replicator can not be created successfully");
            doCloseProducerAsync(producer, () -> {});
            throw new ClassCastException(producer.getClass().getName() + " can not be cast to ProducerImpl");
        }
        this.producer = (ProducerImpl) producer;
        // Trigger a new read.
        log.info("Created replicator producer");
        backOff.reset();
        // activate cursor: so, entries can be cached.
        this.cursor.setActive();
        // read entries
        readMoreEntries();
    } else {

Because R1's InFlightTask already has entries != null, it is no longer counted as a pending read, so a fresh read R2 (strictly newer positions — the cursor read position already advanced past R1 and is no longer rewound) is issued and can be sent by doReplicateEntries(R2) before R1's 100ms-deferred retry fires.

Two consequences:

  • Replication order across the reconnect can be violated.
  • The receiving cluster dedups replicated messages by source position, not wire order:

synchronized (highestSequencedPushed) {
    Long lastSequenceLIdPushed = highestSequencedPushed.get(lastSequenceLIdKey);
    Long lastSequenceEIdPushed = highestSequencedPushed.get(lastSequenceEIdKey);
    if (lastSequenceLIdPushed != null && lastSequenceEIdPushed != null
            && (replSequenceLId < lastSequenceLIdPushed.longValue()
            || (replSequenceLId == lastSequenceLIdPushed.longValue()
            && replSequenceEId <= lastSequenceEIdPushed.longValue()))) {
        log.debug()

Once R2's higher position is recorded, R1 (lower position) is classified a duplicate and discarded on the remote, while the source side mark-deletes R1 as delivered on the (successful) send callback:

public void sendComplete(Throwable exception, OpSendMsgStats opSendMsgStats) {
    if (exception != null && !(exception instanceof PulsarClientException.InvalidMessageException)) {
        replicator.log.error()
                .attr("inFlightTasks", replicator.inFlightTasks)
                .attr("pendingQueueSize", replicator.producer.getPendingQueueSize())
                .exception(exception)
                .log("Error producing on remote broker");
        // cursor should be rewound since it was incremented when readMoreEntries
        replicator.beforeTerminateOrCursorRewinding(ReasonOfWaitForCursorRewinding.Failed_Publishing);
        replicator.doRewindCursor(false);
    } else {
        replicator.log.debug()
                .exception(exception)
                .log("Message persisted on remote broker");
        inFlightTask.incCompletedEntries();
        replicator.cursor.asyncDelete(entry.getPosition(), replicator, entry.getPosition());
    }

i.e. permanent silent message loss across the reconnect. (Loss requires topic dedup enabled; without it the symptom is out-of-order replication, which is still a correctness issue for ordered consumers.)

The previous design avoided this: cursor.rewind() on (re)connect plus pending-read cancellation guaranteed in-order resume from markDelete + 1. Could we hold the new read until the held batch has been replicated (or rewind / cancel the pending read on disconnect) so resume is strictly in order?
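
To make the loss scenario concrete, here is a small simulation of a high-water-mark dedup keyed by source position. It mirrors only the comparison quoted above; `ReplDedupSketch` and `offer` are hypothetical names, and the real broker code tracks more state (for example pushed versus persisted positions).

```java
// Hypothetical sketch of position-based dedup; not the broker's actual class.
final class ReplDedupSketch {
    private long lastLedgerId = -1;
    private long lastEntryId = -1;

    /**
     * Returns true when the message is accepted, false when it is treated as a
     * duplicate because its source position is not above the high-water mark.
     */
    boolean offer(long ledgerId, long entryId) {
        boolean duplicate = ledgerId < lastLedgerId
                || (ledgerId == lastLedgerId && entryId <= lastEntryId);
        if (duplicate) {
            return false;
        }
        lastLedgerId = ledgerId;
        lastEntryId = entryId;
        return true;
    }
}
```

Offering position 5:10 and then 5:11 accepts both. Offering 5:11 first (R2) and then 5:10 (R1) rejects R1 as a duplicate, which is the case where the source would still mark-delete R1 after a successful ack.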

Member

There's also a replicator dedup related PR in #25860 by @void-ptr974

Contributor Author

Because R1's InFlightTask already has entries != null, it is no longer counted as a pending read, so a fresh read R2 (strictly newer positions — the cursor read position already advanced past R1 and is no longer rewound) is issued and can be sent by doReplicateEntries(R2) before R1's 100ms-deferred retry fires.

Could you point out why entries is not null?

Member

@poorbarcode entries is set as the very first action of the read callback, before any state is checked. In readEntriesComplete:

InFlightTask inFlightTask = (InFlightTask) ctx;
inFlightTask.setEntries(entries);   // entries becomes non-null here

So by the time we get into replicateEntries and hit the Disconnected branch (startProducer(); retryReplicateEntries.run(); return;), inFlightTask.entries is already populated, and that branch reschedules R1 ~100 ms later without resetting entries to null. R1 also can't be recycled while it's parked (isDone() is false: completedEntries == 0 < entries.size()), so it stays in inFlightTasks with non-null entries.

That's what removes the gate: the "pending read" test is exactly readPos != null && entries == null in both getPermitsIfNoPendingRead (L961) and hasPendingRead (L1054). With entries != null, R1 no longer counts as a pending read, so when startProducer() flips the state to Started and setProducerAndTriggerReadEntries() calls readMoreEntries(), a fresh read R2 is issued at the advanced cursor.getReadPosition() — and R2 can be sent before R1's deferred retry fires.

Worth noting: the GC/idle disconnect() path goes through beforeDisconnect() (backlog check only), so unlike beforeTerminateOrCursorRewinding() it does not cancelPendingReadRequest() or rewind the cursor — which is also why R1's wait-read survives the producer close and completes while Disconnected in the first place. This is essentially what the cursor.rewind() + pending-read cancellation that this PR removed from setProducerAndTriggerReadEntries used to prevent.

On the consequence: repl dedup is a high-water-mark keyed by source position (isDuplicateReplV2), so R2 (newer) being recorded before R1 (older) is enough — once the mark is past R1, R1 comes back as a duplicate. If R2 is already persisted, R1 gets Dup, which PersistentTopic acks as success, so the source mark-deletes it → silent loss; if R2 is only pushed-not-persisted, R1 gets Unknown and the source retries (recoverable). With dedup off it's out-of-order delivery. The simplest fix is to keep resume strictly in order — don't let the deferred retry leave a held batch counting as "not a pending read" while a fresh read is issued ahead of it (e.g. gate the new read on the held task, or rewind/cancel on disconnect as before).
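
The gate described above can be reduced to a few lines. This is a simplified sketch with hypothetical types (`InFlightTaskSketch`, `PendingReadGate`) and string positions; only the predicate `readPos != null && entries == null` is taken from the discussion.

```java
import java.util.List;

// Hypothetical, simplified stand-in for the replicator's in-flight task.
final class InFlightTaskSketch {
    final String readPos;   // position the read started from; null if no read was issued
    List<String> entries;   // null until the read callback populates it

    InFlightTaskSketch(String readPos) {
        this.readPos = readPos;
    }

    /** A task counts as a pending read only while its entries are still missing. */
    boolean isPendingRead() {
        return readPos != null && entries == null;
    }
}

final class PendingReadGate {
    private PendingReadGate() {}

    /** A new read is allowed only when no task is still a pending read. */
    static boolean hasPendingRead(List<InFlightTaskSketch> tasks) {
        for (InFlightTaskSketch task : tasks) {
            if (task.isPendingRead()) {
                return true;
            }
        }
        return false;
    }
}
```

Once the callback sets `entries` on R1, `hasPendingRead` returns false even though R1 has not been sent, so nothing stops a fresh read R2 from being issued ahead of it.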

@poorbarcode poorbarcode Jun 16, 2026

Contributor Author

Thanks for the explanation. I should do the check in readEntriesComplete rather than in replicateEntries.

Fixed


There's also a replicator dedup related PR in #25860 by @void-ptr974

I will review it tomorrow ❤️

Comment on lines +286 to +290
// Has backlog.
long backlog = getNumberOfEntriesInBacklog();
if (backlog > 0) {
    return;
}

Member

[QUALITY] Loss-safety now rests entirely on the backlog == 0 precondition — worth documenting the invariant

It would help to document (a comment, or an assertion) that this backlog == 0 precondition is now load-bearing for correctness. Since both the cursor.rewind() on reconnect and the afterDisconnected() rewind were removed, nothing re-reads entries that are read-but-not-yet-acked at producer restart. No-loss safety now rests entirely on (a) this backlog == 0 check and (b) beforeDisconnect() rejecting any in-flight task below the last confirmed entry:

protected CompletableFuture<Void> beforeDisconnect() {
    // Ensure no in-flight task.
    synchronized (inFlightTasks) {
        for (PersistentReplicator.InFlightTask task : inFlightTasks) {
            if (!task.isDone() && task.readPos.compareTo(cursor.getManagedLedger().getLastConfirmedEntry()) < 0) {
                return CompletableFuture.failedFuture(new BrokerServiceException
                        .TopicBusyException("Cannot close a replicator with backlog"));
            }
        }
        return CompletableFuture.completedFuture(null);
    }
}

No loss case can be constructed under these guards currently, so this is fine as written. However, the invariant is implicit and easy to break: a future disconnect trigger that doesn't enforce backlog == 0, or a regression in beforeDisconnect(), would silently drop those entries, with no rewind left as a safety net. A short comment stating the requirement would make that explicit.
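
As an illustration of what such a documented invariant might look like, here is a sketch using a hypothetical helper (`IdleDisconnectGuard` is not part of the PR). It simplifies the real condition, which only rejects in-flight tasks below the last confirmed entry.

```java
// Hypothetical sketch of the invariant; names are illustrative, not Pulsar's API.
final class IdleDisconnectGuard {
    private IdleDisconnectGuard() {}

    /**
     * INVARIANT: an idle disconnect is loss-safe only if there is no backlog and
     * no unfinished in-flight task, because nothing rewinds the cursor afterwards.
     * Any new disconnect trigger must go through this check.
     */
    static void checkMayDisconnect(long backlog, int unfinishedInFlightTasks) {
        if (backlog != 0 || unfinishedInFlightTasks != 0) {
            throw new IllegalStateException(
                    "Idle disconnect requires backlog == 0 and no unfinished in-flight task");
        }
    }
}
```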

Contributor Author

I do not understand what the concern is.

@poorbarcode poorbarcode requested a review from lhotari June 16, 2026 04:03
long estimatedTimeStampProducerConnected = this.estimatedTimeStampProducerConnected;
long delayMillis;
if (estimatedTimeStampProducerConnected > System.currentTimeMillis()) {
    delayMillis = (estimatedTimeStampProducerConnected - System.currentTimeMillis()) + 100;

Member

could delayMillis be negative?

Contributor Author

line-412 prevents the situation you mentioned
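
One way to make non-negativity evident at the point of computation is to read the clock once and clamp the remaining time. This is only a sketch: `RetryDelay` is a hypothetical helper, and the fallback of 100 ms when the estimated time has already passed is an assumption, since the other branch of the quoted snippet is not shown.

```java
// Hypothetical helper; not part of the PR.
final class RetryDelay {
    private RetryDelay() {}

    /**
     * Delay until the producer is expected to be connected, plus a 100 ms margin.
     * The clock value is passed in once, so the check and the subtraction cannot disagree.
     */
    static long delayMillis(long estimatedConnectedAtMillis, long nowMillis) {
        long remaining = Math.max(0L, estimatedConnectedAtMillis - nowMillis);
        return remaining + 100L;
    }
}
```

A caller would pass `System.currentTimeMillis()` as `nowMillis`, so the result is always at least 100.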

@poorbarcode poorbarcode requested a review from lhotari June 16, 2026 14:22
@gaoran10

Contributor

Please check the code style.

@poorbarcode poorbarcode force-pushed the fix/topic_gc_replication branch from 56026a4 to cd58d4d Compare June 17, 2026 13:53

Labels

ready-to-test release/4.0.12 release/4.2.3 type/bug The PR fixed a bug or issue reported a bug

5 participants