[client] Fix race condition in RemoteLogDownloader causing flaky testPrefetchNum#3158

Open
ankit-khare-2015 wants to merge 1 commit into apache:main from ankit-khare-2015:fix/unstable-remote-log-downloader-test

@ankit-khare-2015 ankit-khare-2015 commented Apr 22, 2026

Summary

Fixes #3145. RemoteLogDownloaderTest.testPrefetchNum was failing non-deterministically.

Root cause: In RemoteLogDownloader.fetchOnce(), the prefetch semaphore was acquired before polling the work queue:

// old order
prefetchSemaphore.acquire();                                   // holds permit
RemoteLogDownloadRequest request = segmentsToFetch.poll(...); // may return null
if (request == null) {
    prefetchSemaphore.release();  // held for up to pollTimeout (10 ms)
    return;
}

When the queue was empty the download thread held a permit for the full pollTimeout (10 ms in tests) before releasing it. The test assertion availablePermits() == 2 ran during that 10 ms window and saw 1 instead of 2, causing a spurious failure.

Fix: Swap the order — poll the queue first, acquire the semaphore only when there is actual work to do. The semaphore is never borrowed for null-polls, so availablePermits() accurately reflects true available capacity at all times.

// new order
RemoteLogDownloadRequest request = segmentsToFetch.poll(...); // check for work first
if (request == null) {
    return;  // no permit touched
}
prefetchSemaphore.acquire();  // only acquired when there is something to download

Side effect on testDownloadLogInParallelAndInPriority: With poll-first semantics the single download thread polls one extra item from the queue before blocking on the semaphore (4 downloading + 1 pending acquire). The hard isEqualTo(totalSegments - 4) queue-size assertion is relaxed to isBetween(totalSegments - 5, totalSegments - 4) to cover both cases.
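
The off-by-one in the queue size can be reproduced with a small single-threaded simulation. This is only a sketch under stated assumptions: the class and method names are hypothetical, Integer stands in for RemoteLogDownloadRequest, tryAcquire() marks the point where the real download thread would block in acquire(), and the permit count of 4 is taken from the "4 downloading" figure above. Downloads never complete in this model, so no permit is ever released.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.Semaphore;

public class PollOrderSimulation {

    /**
     * Runs a stand-in for the download loop until it would block (or runs out of
     * work), then returns how many requests are still sitting in the queue.
     */
    public static int remainingAfterSaturation(int totalSegments, int permits, boolean pollFirst) {
        PriorityBlockingQueue<Integer> segmentsToFetch = new PriorityBlockingQueue<>();
        for (int i = 0; i < totalSegments; i++) {
            segmentsToFetch.add(i);
        }
        Semaphore prefetchSemaphore = new Semaphore(permits);

        while (true) {
            if (pollFirst) {
                // new order: dequeue first, then wait for capacity
                Integer request = segmentsToFetch.poll();
                if (request == null) {
                    break; // queue drained
                }
                if (!prefetchSemaphore.tryAcquire()) {
                    break; // the real thread would block here, still holding 'request'
                }
            } else {
                // old order: wait for capacity first, then dequeue
                if (!prefetchSemaphore.tryAcquire()) {
                    break; // the real thread would block here with nothing dequeued
                }
                Integer request = segmentsToFetch.poll();
                if (request == null) {
                    prefetchSemaphore.release();
                    break; // queue drained
                }
            }
        }
        return segmentsToFetch.size();
    }

    public static void main(String[] args) {
        int totalSegments = 10; // arbitrary value for illustration
        System.out.println("acquire-first: " + remainingAfterSaturation(totalSegments, 4, false));
        System.out.println("poll-first:    " + remainingAfterSaturation(totalSegments, 4, true));
    }
}
```

With 10 segments and 4 permits, the acquire-first order leaves 6 requests queued (totalSegments - 4), while the poll-first order leaves 5 (totalSegments - 5), because one request has been dequeued but cannot start downloading yet.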

Test Plan

  • ./mvnw test -Dtest=RemoteLogDownloaderTest -pl fluss-client — all 3 tests pass
  • ./mvnw spotless:check -pl fluss-client — no formatting violations

…PrefetchNum

The prefetch semaphore was acquired before polling the work queue in
fetchOnce(). When the queue was empty the download thread held a permit
for the full pollTimeout (10 ms) before releasing it, creating a window
where availablePermits() transiently returned 1 instead of 2 immediately
after two recycleRemoteLog() calls in testPrefetchNum.

Fix: poll the queue first and only acquire the semaphore when there is
actual work to do. The semaphore is never borrowed for null-polls, so
availablePermits() accurately reflects true available capacity at all
times.

Also updates testDownloadLogInParallelAndInPriority: with poll-first
semantics the download thread polls one extra item before blocking on
the semaphore, so the queue size is totalSegments-5 or totalSegments-4
(not strictly totalSegments-4 as before). The assertion is relaxed to
isBetween() to cover both cases.

Fixes apache#3145
@ankit-khare-2015 ankit-khare-2015 force-pushed the fix/unstable-remote-log-downloader-test branch from b671bb6 to a82249a Compare April 22, 2026 04:08
Member

@zuston zuston left a comment

lgtm.

@ankit-khare-2015
Author

Thanks, @zuston. Could you please merge these changes to resolve the issue?

Contributor

Copilot AI left a comment

Pull request overview

This PR addresses flaky behavior in RemoteLogDownloaderTest.testPrefetchNum by changing RemoteLogDownloader.fetchOnce() to only acquire the prefetch semaphore when there is actual work, avoiding temporarily “borrowing” permits during empty-queue polls.

Changes:

  • Reordered fetchOnce() to poll the request queue before acquiring prefetchSemaphore.
  • Relaxed a brittle queue-size assertion in testDownloadLogInParallelAndInPriority to accommodate the new polling behavior.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

| File | Description |
| --- | --- |
| fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemoteLogDownloader.java | Adjusts fetch ordering between queue poll and semaphore acquire to eliminate a timing window in tests. |
| fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteLogDownloaderTest.java | Updates assertion to tolerate the updated fetch/prefetch behavior. |

Comment on lines 150 to +153

// blocks until there is capacity (the fetched file is consumed)
prefetchSemaphore.acquire();

Copilot AI Apr 24, 2026

fetchOnce() now polls from the PriorityBlockingQueue before acquiring the prefetchSemaphore. If no permits are available, the thread can block on acquire() while holding a dequeued request, which (1) can violate the intended priority ordering if higher-priority requests are added while blocked, and (2) can drop the request on shutdown/interrupt (InterruptedException from acquire() exits the thread without re-adding). Consider not removing an element from the priority queue until after a permit is obtained (e.g., poll to detect work, reinsert it, acquire permit, then take/poll again), and ensure InterruptedException preserves/requeues the request.

Suggested change
- // blocks until there is capacity (the fetched file is consumed)
- prefetchSemaphore.acquire();
+ // Put the request back before waiting for capacity so it is not lost on interruption
+ // and higher-priority requests can still be selected while this thread is blocked.
+ segmentsToFetch.add(request);
+ // blocks until there is capacity (the fetched file is consumed)
+ prefetchSemaphore.acquire();
+ request = segmentsToFetch.poll();
+ if (request == null) {
+     prefetchSemaphore.release();
+     return;
+ }
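
The two hazards described in this comment can be illustrated in isolation. The sketch below is not the project's code: the class and method names are hypothetical, Integer stands in for a download request, and a smaller number is assumed to mean higher priority (the natural ordering of PriorityBlockingQueue). The first method shows a request dequeued early being overtaken by a more urgent arrival; the second shows one possible way to put a held request back when acquire() is interrupted.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.Semaphore;

public class HeldRequestHazards {

    /** Returns {held, head}: the request dequeued early and the queue head after a late arrival. */
    public static int[] heldVersusHead() {
        PriorityBlockingQueue<Integer> segmentsToFetch = new PriorityBlockingQueue<>();
        segmentsToFetch.add(5);
        segmentsToFetch.add(7);

        // poll-first: the thread removes the current head, then would block on acquire()
        Integer held = segmentsToFetch.poll();

        // while it is blocked, a more urgent request arrives
        segmentsToFetch.add(1);
        Integer head = segmentsToFetch.peek();

        // the thread is committed to 'held' although 'head' should now be fetched first
        return new int[] {held, head};
    }

    /** Tries to take a permit; on interruption the request is re-added instead of being dropped. */
    public static boolean acquireOrRequeue(
            Semaphore permits, PriorityBlockingQueue<Integer> queue, Integer request) {
        try {
            permits.acquire();
            return true;
        } catch (InterruptedException e) {
            queue.add(request); // keep the request visible to whoever drains the queue later
            Thread.currentThread().interrupt(); // restore the interrupt status for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        int[] result = heldVersusHead();
        System.out.println("held=" + result[0] + ", head=" + result[1]);

        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        Thread.currentThread().interrupt(); // simulate a shutdown signal before acquire()
        boolean acquired = acquireOrRequeue(new Semaphore(0), queue, 5);
        Thread.interrupted(); // clear the flag again so the demo can finish normally
        System.out.println("acquired=" + acquired + ", requeued=" + queue.contains(5));
    }
}
```

Acquiring the permit before polling, as the original code did, avoids both problems by construction, since no request is ever held while the thread waits for capacity.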

Member

I agree with Copilot. Calling acquire() before poll() is intended to guarantee the consuming order of RemoteLogSegments. The consuming order is essential (see #1421).

@wuchong
Member

wuchong commented Apr 24, 2026

Hi @ankit-khare-2015, I think the root cause analysis provided by Claude Code is correct (pasted below). I suggest asserting that the number of available permits is either 1 or 2, and adding a comment before the assert line to explain the rationale.

            // Both permits have been released, so 2 permits should be available. However,
            // the download thread may call fetchOnce() at this point and temporarily
            // acquire a permit, so we only check that the permits are between 1 and 2.
            assertThat(remoteLogDownloader.getPrefetchSemaphore().availablePermits())
                    .isBetween(1, 2);

AI-generated analysis:

Root Cause: Race Condition Between recycleRemoteLog and fetchOnce

Here's the sequence of events that causes the flakiness:

The test at line 127-128:

futures.get(1).getRecycleCallback().run();
futures.get(2).getRecycleCallback().run();
assertThat(remoteLogDownloader.getPrefetchSemaphore().availablePermits()).isEqualTo(2);

Each getRecycleCallback().run() calls recycleRemoteLog() (line 133-136 of RemoteLogDownloader):

void recycleRemoteLog(RemoteLogSegment segment) {
    segmentsToRecycle.add(segment);
    prefetchSemaphore.release();  // increments permits
}

Meanwhile, the DownloadRemoteLogThread is continuously running fetchOnce() in a loop (via doWork()). The fetchOnce() method starts with:

void fetchOnce() throws Exception {
    prefetchSemaphore.acquire();  // decrements permits (blocks if 0)
    // ...
}

The race condition:

  1. futures.get(1).getRecycleCallback().run() → calls prefetchSemaphore.release() → permits becomes 1
  2. The DownloadRemoteLogThread is spinning in its loop. It calls prefetchSemaphore.acquire() → permits drops back to 0. But there's nothing left in segmentsToFetch (all 5 segments were already submitted and consumed), so segmentsToFetch.poll(pollTimeout, ...) returns null, and the thread calls prefetchSemaphore.release() → permits back to 1
  3. futures.get(2).getRecycleCallback().run() → calls prefetchSemaphore.release() → permits becomes 2
  4. Test asserts availablePermits() == 2 → Happy path

But in the failing scenario:

  1. futures.get(1).getRecycleCallback().run() → permits becomes 1
  2. futures.get(2).getRecycleCallback().run() → permits becomes 2
  3. Before the assertion, the download thread wakes up and calls prefetchSemaphore.acquire() → permits drops to 1
  4. Test asserts availablePermits() == 2 → FAILS with expected 2 but was 1

The download thread acquired a permit between the two release() calls and the assertion check. Even though the thread will eventually release it back (because segmentsToFetch is empty and poll returns null), the assertion fires at the wrong moment.
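
The failing interleaving can be replayed deterministically on a single thread. This is a sketch only: the class and method names are hypothetical, the semaphore starts at 0 permits to match the "permits becomes 1" step above, and the download thread's behavior is modelled by one explicit acquire followed by a later release.

```java
import java.util.concurrent.Semaphore;

public class PermitInterleavingReplay {

    /**
     * Replays the steps from the analysis in a fixed order and returns the value
     * that the test's availablePermits() check would observe.
     */
    public static int observedPermits(boolean downloaderRunsBeforeAssert) {
        Semaphore prefetchSemaphore = new Semaphore(0);

        prefetchSemaphore.release(); // first recycle callback: permits becomes 1
        prefetchSemaphore.release(); // second recycle callback: permits becomes 2

        if (downloaderRunsBeforeAssert) {
            // fetchOnce() starts: a permit is taken before the (empty) queue is polled
            prefetchSemaphore.acquireUninterruptibly();
        }

        // the moment at which the test reads the permit count
        int observed = prefetchSemaphore.availablePermits();

        if (downloaderRunsBeforeAssert) {
            // poll(...) returned null, so the download thread hands the permit back
            prefetchSemaphore.release();
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println("happy path:   " + observedPermits(false));
        System.out.println("failing path: " + observedPermits(true));
    }
}
```

The happy path observes 2 permits and the failing path observes 1, which is why an assertion that accepts either value tolerates the race without changing the acquire-before-poll order.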


Development

Successfully merging this pull request may close these issues.

[test] Unstable test RemoteLogDownloaderTest.testPrefetchNum