feat: on-demand partial segment loading in SegmentLocalCacheManager #19535
clintropolis wants to merge 10 commits into
changes:
- adds `acquirePartialSegment` / `acquireCachedPartialSegment` to `SegmentCacheManager` and `SegmentManager` so callers can opt in to async partial segments; MSQ `RegularLoadableSegment` uses the new partial path
- `PartialSegmentMetadataCacheEntry` and `PartialSegmentBundleCacheEntry` are now wired into `SegmentLocalCacheManager`, along with a new `PartialBundleAcquirer` helper that passes along things like the download thread pool and the ability to acquire reference holds on cache entries
- adds `PartialQueryableIndexSegment`, `PartialQueryableIndexCursorFactory`, and `V10TimeBoundaryInspector` for references acquired from the now wired-up metadata cache entry, implementing the async cursor holder contract
- moves `DirectoryBackedRangeReader` out of tests to serve as the implementation of the `LocalLoadSpec` range reader
- adds `FilteredCursorFactory` / `RestrictedCursorFactory` async cursor implementations
- adds config `druid.segmentCache.virtualStoragePartialDownloadsEnabled` to enable the feature; on iff `druid.segmentCache.virtualStorage` is on
| if (t instanceof Error) { | ||
| throw (Error) t; | ||
| } | ||
| Throwables.propagateIfInstanceOf(t, IOException.class); |
| throw (Error) t; | ||
| } | ||
| Throwables.propagateIfInstanceOf(t, IOException.class); | ||
| Throwables.propagateIfPossible(t); |
| catch (Throwable cleanupError) { | ||
| t.addSuppressed(cleanupError); | ||
| } | ||
| Throwables.propagateIfInstanceOf(t, IOException.class); |
| t.addSuppressed(cleanupError); | ||
| } | ||
| Throwables.propagateIfInstanceOf(t, IOException.class); | ||
| Throwables.propagateIfPossible(t); |
| }; | ||
|
|
||
| // DataSegment with a LocalLoadSpec pointing at the deep storage directory (unzipped V10 layout). | ||
| partialSegment = DataSegment.builder() |
| location = manager.getLocations().get(0); | ||
| // Drive eviction off hold-release so tests don't need to provoke SIEVE via capacity pressure. | ||
| location.setAreWeakEntriesEphemeral(true); | ||
| dataSegment = DataSegment.builder() |
FrankChen021
left a comment
I have reviewed the code for correctness, edge cases, concurrency, and integration risks; no issues found.
Reviewed 37 of 37 changed files.
This is an automated review by Codex GPT-5.5
| @SuppressWarnings("unused") | ||
| ListenableFuture<?> unused = gatedExec.submit(() -> { | ||
| try { | ||
| gate.await(); | ||
| } | ||
| catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| } | ||
| }); |
| @SuppressWarnings("unchecked") | ||
| @Nullable | ||
| @Override | ||
| public <T> T as(@Nonnull Class<T> clazz) |
Is it possible to include a PhysicalSegmentInspector? It would be useful to get the number of rows, which is used for incrementing certain counters.
#19577 split out RowCountInspector, which is now wired up in this PR, so we can answer the row count questions. Those are much more widely used than the other parts of this interface (and are answerable purely from metadata with v10 segments).
| } | ||
|
|
||
| /** | ||
| * Partial-load variant of {@link #acquireSegment(DataSegment)}, returns an {@link AcquireSegmentAction} that |
Is the segment returned by acquireSegment guaranteed to be fully-loaded? (Its javadocs do not seem to make that clear.)
the methods are now combined and accept an AcquireMode enum argument; I think it is a bit clearer what happens
| } | ||
|
|
||
| /** | ||
| * Partial-load variant of {@link #acquireCachedSegment(SegmentId)}, returns a {@link Segment} when the cache holds |
Is the segment returned by acquireCachedSegment guaranteed to be fully-loaded? (Its javadocs do not seem to make that clear.)
same thing here with the AcquireMode enum argument; I think it should be more obvious now
| } | ||
|
|
||
| final Optional<Segment> cachedSegment = segmentManager.acquireCachedSegment(segmentId); | ||
| final Optional<Segment> cachedSegment = segmentManager.acquireCachedPartialSegment(segmentId); |
Make it an option whether RegularLoadableSegment does a partial fetch or full fetch? It may not be a safe assumption that all MSQ implementations of query processing are going to be able to handle partial segments / async stuff. Even if all existing ones do, there's also future ones, ones from extensions, etc.
this now accepts the AcquireMode enum argument and passes it through, and the engine decides which mode it wants when creating the ReadableInputQueue (see BaseLeafStageProcessor.makeReadableInputQueue)
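To make the mode-based lookup discussed in this thread concrete, here is a minimal sketch, for illustration only. The `AcquireMode` constants (`FULL`, `PARTIAL`), the `CachedSegment` stand-in, and the string segment ids are assumptions; the PR's real signatures are not shown in this conversation.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

public class AcquireModeSketch
{
  /** Hypothetical constants; the PR's actual AcquireMode values are not shown here. */
  public enum AcquireMode { FULL, PARTIAL }

  /** Stand-in for a cached segment; it only records whether every file is on disk. */
  public static final class CachedSegment
  {
    public final String id;
    public final boolean fullyLoaded;

    public CachedSegment(String id, boolean fullyLoaded)
    {
      this.id = id;
      this.fullyLoaded = fullyLoaded;
    }
  }

  private final Map<String, CachedSegment> cache = new ConcurrentHashMap<>();

  public void put(CachedSegment segment)
  {
    cache.put(segment.id, segment);
  }

  /**
   * Single entry point: the caller states which guarantee it needs instead of choosing between two methods.
   * FULL callers never see a partially loaded segment; PARTIAL callers accept either kind.
   */
  public Optional<CachedSegment> acquireCachedSegment(String segmentId, AcquireMode mode)
  {
    CachedSegment segment = cache.get(segmentId);
    if (segment == null) {
      return Optional.empty();
    }
    if (mode == AcquireMode.FULL && !segment.fullyLoaded) {
      return Optional.empty();
    }
    return Optional.of(segment);
  }
}
```

With this shape, an engine that cannot handle async partial segments simply passes the full mode and keeps its existing behavior.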
| // partial-load on-disk state survived a toggle of druid.segmentCache.virtualStoragePartialDownloadsEnabled | ||
| // to false (getCachedSegments reserves an on-disk partial layout regardless of the flag). The eager path | ||
| // cannot serve a partial layout; surface a clear operator error rather than a ClassCastException. | ||
| throw DruidException.forPersona(DruidException.Persona.OPERATOR) |
Two concerns:
- Won't this also be an issue on downgrade?
- Is it possible to repair this rather than throwing an error? As is, it will be difficult to turn off partial downloads once they are enabled.
I am also interested in these ^
agree this was a wack failure mode. I have reworked things so that if we detect partials on disk during startup and partials are not enabled, we now delete them instead of exploding at query time. I have also switched the config so that this partial load functionality is disabled until we have had some more time to mature things.
With regards to rollback to older versions: on startup, mount will fail with an exception because the path will not contain the expected v9 or v10 segment files, and the handling of that exception will trigger deletion because the path exists, unless druid.segmentCache.deleteOnRemove is false (it is true by default). We could do something kind of tricky to ensure that they are always deleted, namely always writing the DOWNLOAD_START_MARKER_FILE_NAME for partial segments so that trying to mount one as a regular segment triggers the 'corrupted segment' handling, but I have not done that yet in this PR because it feels kind of strange.
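The startup behavior described above (delete on-disk partials when the feature is off, rather than failing at query time) can be sketched as a small policy function. This is an in-memory illustration under stated assumptions: the `Layout` enum, the path strings, and the method name are hypothetical, and no real file deletion is performed.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class StartupCleanupSketch
{
  /** Hypothetical classification of what a cache directory holds on disk. */
  public enum Layout { FULL, PARTIAL }

  /**
   * Removes partial layouts from the in-memory view of the cache when partial downloads are disabled, and
   * returns the paths a real implementation would delete from disk. Full layouts are always kept.
   */
  public static List<String> dropStalePartials(Map<String, Layout> onDisk, boolean partialDownloadsEnabled)
  {
    List<String> deleted = new ArrayList<>();
    if (partialDownloadsEnabled) {
      return deleted;
    }
    Iterator<Map.Entry<String, Layout>> it = onDisk.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, Layout> entry = it.next();
      if (entry.getValue() == Layout.PARTIAL) {
        deleted.add(entry.getKey());
        it.remove();
      }
    }
    return deleted;
  }
}
```

Making the decision once at startup means the eager query path never has to encounter a layout it cannot serve.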
| return required; | ||
| } | ||
| // Conservative fallback when physicalColumns isn't declared, fetch every column on the chosen row selector | ||
| // plus __time (which is special-cased and not enumerated by getColumnNames()). |
Seems weird. Can you consider changing it in this PR, so getColumnNames() does include __time? Looking around at the usages of getColumnNames(), it doesn't seem like anything is super-dependent on them not including __time.
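For readers following the fallback being discussed, a minimal sketch of that logic is below. It assumes the declared physical columns arrive as a nullable set and that the column-name listing omits `__time`, as the quoted code comment states; the class, method shape, and parameter names are hypothetical and not the PR's code.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class RequiredColumnsSketch
{
  public static final String TIME_COLUMN = "__time";

  /**
   * Returns the declared physical columns when present. Otherwise falls back conservatively to every
   * listed column plus __time, which the listing is assumed not to enumerate.
   */
  public static Set<String> requiredColumns(Set<String> declaredPhysicalColumns, List<String> columnNames)
  {
    if (declaredPhysicalColumns != null) {
      return declaredPhysicalColumns;
    }
    Set<String> all = new LinkedHashSet<>(columnNames);
    all.add(TIME_COLUMN);
    return all;
  }
}
```

If the listing were changed to include `__time`, as the reviewer suggests, the explicit `add` would become a harmless no-op because the result is a set.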
| /** | ||
| * Determine the set of physical column names to required from the chosen row selector given a {@link CursorBuildSpec} | ||
| */ | ||
| private static Set<String> requiredColumns( |
nit: the private static is typically placed after all non-static methods
| * {@code __time} column) and the partial-aware {@link PartialQueryableIndexCursorFactory} (which downloads | ||
| * required files on the supplied download executor before handing back a cursor). | ||
| * <p> | ||
| * Lifecycle: this segment is intended to exist as a transient reference-hold scope over an externally-owned |
What typically owns the PartialQueryableIndex?
| * is absent, is a {@link #staticCacheEntries} entry, or still has outstanding holds. | ||
| * <p> | ||
| * This exists for callers that register a weak entry <em>without</em> a {@link ReservationHold} (the bootstrap | ||
| * reserve path uses {@link #reserveWeak}) and need to clean it up after a failed mount. The normal runtime path |
I think this will be useful in #19539: lingering entries on failed mount was a problem raised in review (#19539 (comment))
| private AcquireSegmentAction acquirePartialInternal( | ||
| DataSegment dataSegment, | ||
| SegmentRangeReader rangeReader, | ||
| boolean fullDownload |
On the fullDownload path I don't see stuff that is creating a cache entry and hold on the StorageLocation to account for the storage space. The logic seems to be fetching things outside of a hold. Is that right or did I miss it?
yea, this was an oversight in a refactor, good catch; it should be handled correctly now
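The concern in this thread is that bytes should be accounted for against the storage location before anything is fetched. A minimal sketch of that ordering follows; the class and its byte-counting model are hypothetical simplifications and do not reflect the actual `StorageLocation` API.

```java
public class StorageHoldSketch
{
  private final long capacityBytes;
  private long reservedBytes;

  public StorageHoldSketch(long capacityBytes)
  {
    this.capacityBytes = capacityBytes;
  }

  /** Reserves space up front; returns false when the location cannot fit the request. */
  public synchronized boolean reserve(long bytes)
  {
    if (bytes < 0 || reservedBytes + bytes > capacityBytes) {
      return false;
    }
    reservedBytes += bytes;
    return true;
  }

  public synchronized void release(long bytes)
  {
    reservedBytes -= bytes;
  }

  public synchronized long reservedBytes()
  {
    return reservedBytes;
  }

  /**
   * Runs the download only after the reservation succeeds, and gives the space back if the download fails.
   * On success the reservation stays in place for as long as the files remain cached.
   */
  public boolean downloadWithHold(long bytes, Runnable download)
  {
    if (!reserve(bytes)) {
      return false;
    }
    try {
      download.run();
      return true;
    }
    catch (RuntimeException | Error e) {
      release(bytes);
      throw e;
    }
  }
}
```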
FrankChen021
left a comment
| Severity | Findings |
|---|---|
| P0 | 0 |
| P1 | 1 |
| P2 | 0 |
| P3 | 0 |
| Total | 1 |
Reviewed 38 of 38 changed files.
This is an automated review by Codex GPT-5.5
| for (ListenableFuture<?> columnDownload : columnDownloads) { | ||
| columnDownload.cancel(true); | ||
| } | ||
| CloseableUtils.closeAndSuppressExceptions(releaseHoldOnce, ignored -> {}); |
[P1] Keep bundle holds until running downloads finish
When an async cursor is closed after a column download has already entered rowSelector.getColumnHolder/mapFile, cancel(true) does not wait for that callable to stop; the code even documents that in-flight read/write loops may run to completion. The canceler still releases the bundle hold immediately, which drops the bundle reference and storage hold while mapper work may still be using the container. That allows SIEVE/ephemeral eviction or a pending unmount to reach PartialSegmentFileMapperV10.evictContainer despite mapFile/ensureFilesAvailable still being in flight, violating that mapper's concurrency contract and risking deletion/unmap of a container being read or written. Defer releasing the bundle hold/reference until submitted downloads have actually returned, or add per-download protection so eviction cannot run while a started download continues.
FrankChen021
left a comment
| Severity | Findings |
|---|---|
| P0 | 0 |
| P1 | 0 |
| P2 | 1 |
| P3 | 0 |
| Total | 1 |
Reviewed 62 of 62 changed files.
Findings that could not be attached inline:
- server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java:119 - [P2] Stop factory-owned loading pools. After moving the on-demand executor into StorageLoadingThreadPool, this shutdown path no longer stops virtualStorageLoadingThreadPool. That is OK for the lifecycle-managed singleton from StorageNodeModule, but SegmentCacheManagerFactory.manufacturate creates a StorageLoadingThreadPool manually for task/MSQ segment caches and callers only shut down the SegmentCacheManager/SegmentManager. Those per-task virtual-storage managers now leak their loading executor after task completion; with fixed platform threads this can keep task JVMs alive, and repeated tasks can accumulate idle pools. Please either have SegmentLocalCacheManager.shutdown stop factory-owned pools or arrange for the factory-created pool to be lifecycle-owned and stopped.
This is an automated review by Codex GPT-5.5
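The first option in the P2 finding (have shutdown stop factory-owned pools while leaving the lifecycle-managed singleton alone) can be sketched with an explicit ownership flag. The class and factory method names below are hypothetical and use only `java.util.concurrent`; they are not the Druid classes named in the finding.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolOwnershipSketch
{
  private final ExecutorService loadingPool;
  private final boolean ownsPool;

  private PoolOwnershipSketch(ExecutorService loadingPool, boolean ownsPool)
  {
    this.loadingPool = loadingPool;
    this.ownsPool = ownsPool;
  }

  /** The pool is managed elsewhere (for example by a lifecycle), so shutdown() must leave it running. */
  public static PoolOwnershipSketch withSharedPool(ExecutorService lifecycleManagedPool)
  {
    return new PoolOwnershipSketch(lifecycleManagedPool, false);
  }

  /** The pool is created here (the per-task case), so shutdown() is responsible for stopping it. */
  public static PoolOwnershipSketch withOwnedPool(int threads)
  {
    return new PoolOwnershipSketch(Executors.newFixedThreadPool(threads), true);
  }

  public void shutdown()
  {
    if (ownsPool) {
      loadingPool.shutdownNow();
    }
  }

  public boolean isPoolShutdown()
  {
    return loadingPool.isShutdown();
  }
}
```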
capistrant
left a comment
nothing overly concerning is jumping out at me so far. I am still processing some of the larger app file changes, but a first glance didn't catch any glaring blockers
| { | ||
| /** | ||
| * Build an inspector that reads from the given base projection metadata, falling back to {@code fallbackInterval} | ||
| * when the writer didn't persist min/max times (e.g. pre-Phase-1.5 segments). |
"pre-Phase-1.5" callout feels unnecessary with no added context
| } | ||
| } | ||
|
|
||
| private File buildTimeOrderedProjectionSegment(String projectionName) |
nit: private helper method interleaved within tests
| // is responsible for closing the holder. Use the real (single-threaded) executor with a gate that pauses the | ||
| // download task before it builds the cursor, then close the wrapper, then release the gate. | ||
| final CountingRangeReader rangeReader = new CountingRangeReader(segmentDir); | ||
| final AtomicReference<RuntimeException> bgErr = new AtomicReference<>(); |
bgErr doesn't actually seem to be wired into the test at all. It is just created and then read in the get assertion below.
oops, this was a stale test left over after refactoring some stuff; it no longer had a point, so I deleted it
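The gate technique mentioned in the quoted test comment (pause a single-threaded executor so an action can happen while the download is still queued) looks roughly like the sketch below. It uses a plain `CountDownLatch` and `ExecutorService`; the event strings and class name are illustrative assumptions, not the PR's test code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class GatedExecutorSketch
{
  /** Returns the order of events observed when a close happens while the download is still queued. */
  public static List<String> runGated()
  {
    ExecutorService exec = Executors.newSingleThreadExecutor();
    CountDownLatch gate = new CountDownLatch(1);
    List<String> events = Collections.synchronizedList(new ArrayList<>());
    try {
      // The blocker occupies the only thread until the gate opens, so nothing queued behind it can run.
      exec.submit(() -> {
        try {
          gate.await();
        }
        catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      Future<?> download = exec.submit(() -> events.add("download ran"));
      // Stand-in for closing the wrapper while the download task is still waiting in the queue.
      events.add("closed while download queued");
      gate.countDown();
      download.get(5, TimeUnit.SECONDS);
      return new ArrayList<>(events);
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
    finally {
      exec.shutdownNow();
    }
  }
}
```

Because the executor has exactly one thread, the ordering is deterministic without sleeps: the download cannot start until the gate is released.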
| catch (Throwable t) { | ||
| // Failure between acquire and wiring up the downloads (submitDownload shut-down rejection, etc.). Ownership of | ||
| // the bundle hold hasn't transferred to the holder yet, so release it here. | ||
| throw CloseableUtils.closeAndWrapInCatch(t, holdRelease.releaser()); |
why do we use releaser() here, which is documented as a "success-path" releaser, instead of requesting a release? also, do we need to worry about what is in columnDownloads at all at this point?
this was a mistake actually, good catch
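As an illustration of the failure path this thread is about, the sketch below cancels any downloads already submitted and releases the hold when wiring fails, attaching cleanup errors as suppressed exceptions. It is written with plain JDK types under stated assumptions; `runOrRelease` and its parameters are hypothetical and do not reproduce `CloseableUtils` or the PR's hold-release API.

```java
import java.util.List;
import java.util.concurrent.Future;
import java.util.function.Supplier;

public class FailurePathReleaseSketch
{
  /**
   * Runs the wiring step. On success, ownership of the hold passes to the returned object and nothing is
   * released here. On failure, submitted downloads are cancelled, the hold is released, and the original
   * error is rethrown with any cleanup error attached as suppressed.
   */
  public static <T> T runOrRelease(Supplier<T> wiring, List<Future<?>> submitted, Runnable releaseHold)
  {
    try {
      return wiring.get();
    }
    catch (Throwable t) {
      for (Future<?> download : submitted) {
        download.cancel(true);
      }
      try {
        releaseHold.run();
      }
      catch (Throwable cleanupError) {
        t.addSuppressed(cleanupError);
      }
      throw t;
    }
  }
}
```

Note that cancelling a future does not wait for a task that is already running, so a real implementation would also need to consider in-flight work before the hold is dropped.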
FrankChen021
left a comment
I have reviewed the code for correctness, edge cases, concurrency, and integration risks; no issues found.
Reviewed 64 of 64 changed files.
This is an automated review by Codex GPT-5.5