[server] Optimize RemoteLogFetcher with async prefetch for recovery #3132
Kaixuan-Duan wants to merge 2 commits into apache:main
Conversation
fresh-borzoni
left a comment
@Kaixuan-Duan Thanks for the contribution. I will help to review this PR
One process point: this issue was already assigned and I was actively working on it. In that situation, please coordinate on the issue before opening an overlapping PR. Assignment is not exclusive ownership, but it is an important coordination signal, and skipping it usually leads to duplicated effort and fragmented review.
We can evaluate this PR on its merits, but for future cases please check on the issue first.
fresh-borzoni
left a comment
Thanks, the direction is right. I left some comments, PTAL.
```java
private void cancelPrefetch() {
    if (nextDownloadedSegmentFuture != null) {
        nextDownloadedSegmentFuture.cancel(true);
```
`cancel(true)` on an already-completed future is a no-op, and it drops the reference to the downloaded `File`, which then lives in `tempDir` until the fetcher-level `close()`.
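One way to reclaim the orphan eagerly, as a sketch (the helper and the `CompletableFuture<File>` shape are assumptions here, not the PR's actual fields): when `cancel(true)` returns false because the download already finished, delete the downloaded file instead of leaving it for `close()`.

```java
import java.io.File;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class PrefetchCleanup {

    /**
     * Cancel a pending prefetch; if the download already completed,
     * delete the file so it does not linger in tempDir until close().
     */
    static void cancelPrefetch(CompletableFuture<File> future) {
        if (future == null) {
            return;
        }
        if (!future.cancel(true) && future.isDone() && !future.isCompletedExceptionally()) {
            // cancel() was a no-op because the download already finished:
            // reclaim the orphaned file eagerly.
            try {
                File downloaded = future.get();
                if (downloaded != null && downloaded.exists()) {
                    downloaded.delete();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException ignored) {
                // download failed; there is no file to clean up
            }
        }
    }

    public static void main(String[] args) throws IOException {
        File tmp = File.createTempFile("segment", ".log");
        CompletableFuture<File> done = CompletableFuture.completedFuture(tmp);
        cancelPrefetch(done);
        System.out.println("file exists after cleanup: " + tmp.exists()); // prints false
    }
}
```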
```java
        activeIterator = null;
    }
} finally {
    downloadExecutor.shutdownNow();
```
`shutdownNow()` doesn't wait: if a prefetch is mid-flush, it can still write to `tempDir` after `deleteDirectoryQuietly` runs. Either call `downloadExecutor.awaitTermination()` with a short timeout before deletion, or make `downloadSegment` interruption-aware (most S3 SDKs don't honor `Thread.isInterrupted()` during socket reads, so the interrupt from `shutdownNow` is effectively decorative).
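A sketch of the first option (`closeQuietly` and `deleteDirectoryQuietly` are stand-ins for the PR's actual cleanup path, not its real signatures): bound the wait with `awaitTermination` so no task can write into `tempDir` after it is deleted.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SafeShutdown {

    /** Drain the download executor before deleting tempDir. */
    static void closeQuietly(ExecutorService downloadExecutor, Path tempDir) {
        downloadExecutor.shutdownNow();
        try {
            // bound the wait: an in-flight task may ignore the interrupt
            // during a socket read and finish its current write first
            if (!downloadExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                System.err.println("download executor did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // only now is it safe to remove the directory
        deleteDirectoryQuietly(tempDir);
    }

    static void deleteDirectoryQuietly(Path dir) {
        try (var paths = Files.walk(dir)) {
            paths.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
        } catch (Exception ignored) {
        }
    }

    public static void main(String[] args) throws Exception {
        Path tempDir = Files.createTempDirectory("fetcher");
        ExecutorService exec = Executors.newSingleThreadExecutor();
        exec.submit(() -> {
            try {
                Thread.sleep(200); // simulate an in-flight flush
                Files.writeString(tempDir.resolve("seg.log"), "data");
            } catch (Exception ignored) {
            }
            return null;
        });
        closeQuietly(exec, tempDir);
        System.out.println("tempDir exists: " + Files.exists(tempDir)); // prints false
    }
}
```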
```java
}

@Override
public boolean hasNext() {
```
If `fetch()` is called twice, the first `Iterable` still wraps the now-closed iterator; iterating it re-enters `advance()` on a closed instance, downloading into the shared `tempDir` and racing with the new iterator.
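A minimal fail-fast guard could look like this (sketch only; the class, messages, and `String` element type are illustrative, the real iterator yields segment data):

```java
import java.util.Iterator;
import java.util.List;

public class GuardedIterator implements Iterator<String> {
    private final Iterator<String> delegate;
    private volatile boolean closed;

    GuardedIterator(Iterator<String> delegate) {
        this.delegate = delegate;
    }

    void close() {
        closed = true;
    }

    @Override
    public boolean hasNext() {
        // fail fast: a stale Iterable from an earlier fetch() must not
        // re-enter advance() and download into the shared tempDir
        if (closed) {
            throw new IllegalStateException("iterator closed by a newer fetch()");
        }
        return delegate.hasNext();
    }

    @Override
    public String next() {
        if (closed) {
            throw new IllegalStateException("iterator closed by a newer fetch()");
        }
        return delegate.next();
    }

    public static void main(String[] args) {
        GuardedIterator it = new GuardedIterator(List.of("seg-0", "seg-1").iterator());
        System.out.println(it.next()); // prints seg-0
        it.close();
        try {
            it.hasNext();
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Throwing loudly is a design choice: a silent `return false` would also stop the race, but it hides the caller's bug of iterating a stale handle.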
```java
}

private File fetchSegmentFile(RemoteLogSegment segment) throws IOException {
    if (segment.equals(prefetchedSegment) && nextDownloadedSegmentFuture != null) {
```
This depends on `RemoteLogSegment` having value-based `equals()`, or on both references coming from the same segments list (reference equality). It works today, but it would be safer to compare by segment id.
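A sketch of the id-based comparison (the nested `RemoteLogSegment` record is a minimal stand-in for the real class, which the PR shows exposing `remoteLogSegmentId()`):

```java
public class SegmentIdMatch {

    // minimal stand-in for RemoteLogSegment; the real class lives in fluss-server
    record RemoteLogSegment(String remoteLogSegmentId, long startOffset) {}

    /** Compare by segment id rather than relying on value-based equals(). */
    static boolean isPrefetched(RemoteLogSegment segment, RemoteLogSegment prefetched) {
        return prefetched != null
                && segment.remoteLogSegmentId().equals(prefetched.remoteLogSegmentId());
    }

    public static void main(String[] args) {
        RemoteLogSegment a = new RemoteLogSegment("seg-1", 0L);
        RemoteLogSegment b = new RemoteLogSegment("seg-1", 0L); // distinct reference, same id
        System.out.println(isPrefetched(a, b)); // prints true
    }
}
```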
```java
if (segment.equals(prefetchedSegment) && nextDownloadedSegmentFuture != null) {
    try {
        return nextDownloadedSegmentFuture.get();
    } catch (InterruptedException e) {
```
Also catch `CancellationException`: it's unchecked (extends `RuntimeException`), and `CompletableFuture.get()` throws it on a cancelled future. It's not a live bug in the current state machine (every `cancelPrefetch` nulls the field), but it's cheap defense-in-depth, especially given `closed` is volatile.
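A sketch of the defensive catch (names are illustrative; `syncDownload` stands in for the PR's `downloadSegment` fallback):

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CancellationFallback {

    static String fetch(CompletableFuture<String> prefetch) {
        try {
            return prefetch.get();
        } catch (CancellationException e) {
            // a concurrent cancelPrefetch() won the race; fall back to sync path
            return syncDownload();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return syncDownload();
        } catch (ExecutionException e) {
            // download itself failed; also fall back
            return syncDownload();
        }
    }

    static String syncDownload() {
        return "sync";
    }

    public static void main(String[] args) {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.cancel(true);
        System.out.println(fetch(f)); // prints sync
    }
}
```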
```diff
@@ -28,10 +28,13 @@
 import org.junit.jupiter.api.Test;
```
Non-blocking: two of the three new tests inject state via reflection (`setPrivateField`) instead of exercising a real async prefetch. They cover the branches in `fetchSegmentFile`, but not close-during-a-real-in-flight-download or the orphan-file cleanup. Consider one integration-style test with a real slow/failing download source.
```java
        "Prefetched segment {} failed, fallback to sync download.",
        segment.remoteLogSegmentId(),
        e.getCause());
return downloadSegment(segment);
```
Non-blocking: there is no retry on transient S3 failure, so one flaky segment fails the entire recovery. In fluss-rust we added exponential backoff (100ms -> 5s with jitter) for this.
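A sketch of that policy in Java, assuming the same 100ms base and 5s cap (the retry helper and its names are hypothetical, not fluss code):

```java
import java.util.Random;
import java.util.concurrent.Callable;

public class Backoff {

    // exponential backoff with jitter: 100ms base, doubled per attempt,
    // capped at 5s, mirroring the fluss-rust policy mentioned above
    static long backoffMillis(int attempt, Random jitter) {
        long base = Math.min(5_000L, 100L * (1L << Math.min(attempt, 10)));
        // "equal jitter": half deterministic, half random
        return base / 2 + (long) (jitter.nextDouble() * (base / 2));
    }

    static <T> T withRetry(int maxAttempts, Callable<T> download) throws Exception {
        Random jitter = new Random();
        for (int attempt = 0; ; attempt++) {
            try {
                return download.call();
            } catch (Exception e) {
                if (attempt + 1 >= maxAttempts) {
                    throw e; // exhausted: surface the last failure
                }
                Thread.sleep(backoffMillis(attempt, jitter));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] failures = {2}; // fail twice, then succeed
        String result = withRetry(5, () -> {
            if (failures[0]-- > 0) {
                throw new RuntimeException("transient S3 error");
            }
            return "segment-bytes";
        });
        System.out.println(result); // prints segment-bytes
    }
}
```

Jitter matters here: during recovery many fetchers may hit the same transient S3 throttle, and unjittered retries re-collide.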
```java
    return downloadSegment(segment);
}

private void prefetchNextSegment() {
```
Prefetch depth is hardcoded to 1. If the S3 p99 download time exceeds the consume time for a segment, the downloader sits idle and the optimization is only half-realized. On the Rust side (fluss-rust #187) we landed on a configurable depth with a default of 4 for exactly this reason. Since this is KV recovery, depth = 1 might be fine, but it's still better to make it configurable and reason about it explicitly.
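A sketch of configurable-depth prefetching using a bounded queue for back-pressure (all names here are illustrative; `download` stands in for the real segment fetch):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class DepthPrefetcher {

    // bounded queue: at most `depth` downloads outstanding ahead of the consumer
    private final BlockingQueue<Future<String>> inFlight;
    private final ExecutorService pool;

    DepthPrefetcher(int depth) {
        this.inFlight = new ArrayBlockingQueue<>(depth);
        this.pool = Executors.newFixedThreadPool(depth);
    }

    /** Blocks when `depth` downloads are already outstanding (back-pressure). */
    void prefetch(String segmentId) throws InterruptedException {
        Future<String> f = pool.submit(() -> download(segmentId));
        inFlight.put(f); // queue capacity == prefetch depth
    }

    /** Consumer side: segments come back in submission order. */
    String nextSegment() throws Exception {
        return inFlight.take().get();
    }

    static String download(String segmentId) {
        return "bytes-of-" + segmentId; // stand-in for the remote fetch
    }

    void close() {
        pool.shutdownNow();
    }

    public static void main(String[] args) throws Exception {
        DepthPrefetcher p = new DepthPrefetcher(4); // 4 = fluss-rust #187 default
        for (int i = 0; i < 4; i++) {
            p.prefetch("seg-" + i);
        }
        System.out.println(p.nextSegment()); // prints bytes-of-seg-0
        p.close();
    }
}
```

The FIFO queue keeps delivery in log order even when later segments finish downloading first, which matters for recovery replay.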
Purpose

Linked issue: close #3091

This PR improves KV recovery performance by reducing wait time between remote log segments in RemoteLogFetcher.

Brief change log

- fetch() to ensure previous iterator cleanup.

Tests

./mvnw -pl fluss-server -Dtest=RemoteLogFetcherTest -DfailIfNoTests=false -Dspotless.check.skip=true test

API and Format

No API change. No storage/log format change.

Documentation

No user-facing feature. No documentation update required.