-
Notifications
You must be signed in to change notification settings - Fork 565
[lake] Fix DV readable snapshot scanning and premature snapshot deletion #3519
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,7 +32,6 @@ | |
| import org.apache.fluss.utils.ExceptionUtils; | ||
| import org.apache.fluss.utils.types.Tuple2; | ||
|
|
||
| import org.apache.paimon.CoreOptions; | ||
| import org.apache.paimon.Snapshot; | ||
| import org.apache.paimon.data.BinaryRow; | ||
| import org.apache.paimon.data.BinaryString; | ||
|
|
@@ -256,19 +255,19 @@ public ReadableSnapshotResult getReadableSnapshotAndOffsets(long tieredSnapshotI | |
| return null; | ||
| } | ||
|
|
||
| // We keep snapshots because for a compacted snapshot, if a bucket has L0, we find the | ||
| // snapshot that exactly holds those L0, then use that snapshot's previous APPEND's tiered | ||
| // offset as the readable offset (that offset is safe to read). When the current compacted | ||
| // snapshot has no L0 in any bucket, we do not traverse; for any later compact we would | ||
| // traverse to (going backwards in time), if some bucket has L0, the snapshot that exactly | ||
| // holds that L0 must be after the current compacted snapshot on the timeline. So that | ||
| // snapshot's previous APPEND cannot be earlier than the current compacted snapshot's | ||
| // previous APPEND. Therefore the minimum snapshot we need to keep is the current compact's | ||
| // previous APPEND; set earliestSnapshotIdToKeep to it so it is not deleted. Earlier | ||
| // snapshots may be safely deleted. | ||
| if (bucketsWithL0.isEmpty()) { | ||
| earliestSnapshotIdToKeep = compactedSnapshotPreviousAppendSnapshot.id(); | ||
| } | ||
| // The earliest snapshot we must keep is bounded by EVERY bucket's base anchor, i.e. the | ||
| // previous APPEND of the latest snapshot that exactly holds the L0 files of the bucket's | ||
| // most recent flush. A bucket's base anchor only moves forward when the bucket is flushed | ||
| // again; until then, a future recomputation (triggered once the bucket receives new L0) | ||
| // will trace back to that same anchor, so the anchor snapshot must stay retained. | ||
| // | ||
| // Buckets with L0 in the current compacted snapshot have their anchor found by the | ||
| // traversal below (which also derives their readable offset). Buckets without L0 take their | ||
| // readable offset from the latest tiered snapshot and are NOT traversed there, so their | ||
| // anchors are computed separately afterwards. We must not assume "no L0 in any bucket" | ||
| // means earlier snapshots are deletable: a bucket can be clean in the current compacted | ||
| // snapshot yet still be anchored to an older snapshot (it was flushed earlier and has not | ||
| // been flushed since). | ||
|
|
||
| // for all buckets with l0, we need to find the latest compacted snapshot which flushed | ||
| // the buckets, the per-bucket offset should be updated to the corresponding compacted | ||
|
|
@@ -305,52 +304,26 @@ public ReadableSnapshotResult getReadableSnapshotAndOffsets(long tieredSnapshotI | |
| continue; | ||
| } | ||
| if (!readableOffsets.containsKey(tb)) { | ||
| Snapshot sourceSnapshot = | ||
| findLatestSnapshotExactlyHoldingL0Files( | ||
| fileStoreTable, currentSnapshot); | ||
| // it happens if there is a compacted snapshot flush l0 files for a bucket, | ||
| // but the snapshot from which the compacted snapshot compact is expired | ||
| // it should happen rarely, we can't determine the readable offsets for this | ||
| // bucket, currently, we just return null to stop readable offset advance | ||
| // if it happen, compaction should work unexpected, warn it and reminds to | ||
| // increase snapshot retention | ||
| if (sourceSnapshot == null) { | ||
| LOG.warn( | ||
| "Cannot find snapshot holding L0 files flushed by compacted snapshot {} for bucket {}, " | ||
| + "the snapshot may have been expired. Consider increasing snapshot retention.", | ||
| currentSnapshot.id(), | ||
| tb); | ||
| return null; | ||
| } | ||
|
|
||
| // we already find that for this bucket, which snapshot do the latest flush, | ||
| // the offset for the previous one append snapshot should be the readable | ||
| // offset | ||
| Snapshot previousAppendSnapshot = | ||
| sourceSnapshot.commitKind() == Snapshot.CommitKind.APPEND | ||
| ? sourceSnapshot | ||
| : findPreviousSnapshot( | ||
| sourceSnapshot.id(), Snapshot.CommitKind.APPEND); | ||
|
|
||
| // Can't find previous APPEND snapshot, likely due to snapshot expiration. | ||
| // This happens when the snapshot holding flushed L0 files is a COMPACT | ||
| // snapshot, | ||
| // and all APPEND snapshots before it have been expired. | ||
| // The previous APPEND of the latest snapshot that still exactly holds the L0 | ||
| // files flushed by currentSnapshot is this bucket's base anchor; its tiered | ||
| // offset is the bucket's readable offset. | ||
| Snapshot previousAppendSnapshot = findBaseAnchorAppendSnapshot(currentSnapshot); | ||
|
|
||
| // Can't determine the base anchor, likely due to snapshot expiration: either | ||
| // the snapshot holding the flushed L0 files, or all earlier APPEND snapshots, | ||
| // have been expired. We can't determine this bucket's readable offset, so stop | ||
| // advancing and return null. | ||
| // | ||
| // TODO: Optimization - Store compacted snapshot offsets in Fluss | ||
| // Currently, we rely on Paimon to find the previous APPEND snapshot to get its | ||
| // offset. If Fluss stores offsets for all snapshots (including COMPACT | ||
| // snapshots), | ||
| // we could: | ||
| // 1. Use the sourceSnapshot's offset directly if it's stored in Fluss | ||
| // 2. Find any previous snapshot (COMPACT or APPEND) and use its offset | ||
| // 3. This would make the system more resilient to snapshot expiration | ||
| // TODO: Optimization - Store compacted snapshot offsets in Fluss so we don't | ||
| // need to find the previous APPEND snapshot to get its offset, making this | ||
| // resilient to snapshot expiration. | ||
| if (previousAppendSnapshot == null) { | ||
| LOG.warn( | ||
| "Cannot find previous APPEND snapshot before snapshot {} for bucket {}. " | ||
| + "This may be due to snapshot expiration. Consider increasing paimon snapshot retention.", | ||
| sourceSnapshot.id(), | ||
| tb); | ||
| "Cannot determine base anchor (previous APPEND) for bucket {} flushed by " | ||
| + "compacted snapshot {}. Snapshot history may have expired; " | ||
| + "consider increasing paimon snapshot retention.", | ||
| tb, | ||
| currentSnapshot.id()); | ||
| return null; | ||
| } | ||
|
|
||
|
|
@@ -404,6 +377,23 @@ public ReadableSnapshotResult getReadableSnapshotAndOffsets(long tieredSnapshotI | |
| return null; | ||
| } | ||
|
|
||
| // Tighten earliestSnapshotIdToKeep using the base anchors of buckets without L0 as well. | ||
| // Their readable offsets come from the latest tiered snapshot, but their base was | ||
| // established by an earlier flush; once such a bucket receives new L0, a later | ||
| // recomputation traces back to that flush's anchor, so the anchor snapshot must be kept. | ||
| // | ||
| // This is best-effort: if a bucket's flush history has expired and its anchor cannot be | ||
| // determined, we conservatively keep all previous snapshots (KEEP_ALL_PREVIOUS) rather than | ||
| // risk deleting one that is still needed. We never fail the whole readable-offset advance | ||
| // here, since these buckets' offsets are already resolved from the latest tiered snapshot. | ||
| earliestSnapshotIdToKeep = | ||
| tightenEarliestSnapshotIdToKeepForBucketsWithoutL0( | ||
| bucketsWithoutL0, | ||
| flussTableBucketMapper, | ||
| latestCompactedSnapshot.id(), | ||
| earliestSnapshotId, | ||
| earliestSnapshotIdToKeep); | ||
|
|
||
| // we use the previous append snapshot tiered offset of the compacted snapshot as the | ||
| // compacted snapshot tiered offsets | ||
| LakeSnapshot tieredLakeSnapshot = | ||
|
|
@@ -426,6 +416,112 @@ public ReadableSnapshotResult getReadableSnapshotAndOffsets(long tieredSnapshotI | |
| earliestSnapshotIdToKeep); | ||
| } | ||
|
|
||
| /** | ||
| * Lowers {@code earliestSnapshotIdToKeep} to also cover the base anchors of buckets that have | ||
| * no L0 files in the latest compacted snapshot. | ||
| * | ||
| * <p>A bucket without L0 has all of its data in base files, and its readable offset is taken | ||
| * from the latest tiered snapshot. However, that base was established by the bucket's most | ||
| * recent flush, and a later recomputation (once the bucket receives new L0) will trace back to | ||
| * that flush's anchor snapshot (the previous APPEND of the latest snapshot that exactly holds | ||
| * the flushed L0). That anchor snapshot must therefore stay retained until the bucket is | ||
| * flushed again. | ||
| * | ||
| * <p>This is best-effort: if a bucket's flush history has expired and its anchor cannot be | ||
| * determined, this conservatively returns {@link LakeCommitResult#KEEP_ALL_PREVIOUS} so that no | ||
| * snapshot is deleted, rather than risk deleting one that is still needed. | ||
| * | ||
| * @param bucketsWithoutL0 buckets with no L0 in the latest compacted snapshot | ||
| * @param flussTableBucketMapper mapper from Paimon partition-bucket to Fluss table bucket | ||
| * @param latestCompactedSnapshotId the latest compacted snapshot id (traversal start) | ||
| * @param earliestSnapshotId the earliest snapshot id still present in Paimon (traversal end) | ||
| * @param earliestSnapshotIdToKeep the current value computed from buckets with L0 | ||
| * @return the tightened earliest snapshot id to keep | ||
| */ | ||
| private long tightenEarliestSnapshotIdToKeepForBucketsWithoutL0( | ||
| Set<PaimonPartitionBucket> bucketsWithoutL0, | ||
| FlussTableBucketMapper flussTableBucketMapper, | ||
| long latestCompactedSnapshotId, | ||
| long earliestSnapshotId, | ||
| long earliestSnapshotIdToKeep) | ||
| throws IOException { | ||
| if (bucketsWithoutL0.isEmpty()) { | ||
| return earliestSnapshotIdToKeep; | ||
| } | ||
|
|
||
| // Only track buckets that map to a Fluss bucket; unmappable ones (e.g. a partition not in | ||
| // Fluss) never need recomputation and must not pin retention. | ||
| Set<PaimonPartitionBucket> bucketsToAnchor = new HashSet<>(); | ||
| for (PaimonPartitionBucket bucket : bucketsWithoutL0) { | ||
| if (flussTableBucketMapper.toTableBucket(bucket) != null) { | ||
| bucketsToAnchor.add(bucket); | ||
| } | ||
| } | ||
| if (bucketsToAnchor.isEmpty()) { | ||
| return earliestSnapshotIdToKeep; | ||
| } | ||
| boolean allAnchorsResolved = true; | ||
|
|
||
| for (long currentSnapshotId = latestCompactedSnapshotId; | ||
| currentSnapshotId >= earliestSnapshotId && !bucketsToAnchor.isEmpty(); | ||
| currentSnapshotId--) { | ||
| Snapshot currentSnapshot = snapshotManager.tryGetSnapshot(currentSnapshotId); | ||
| if (currentSnapshot == null | ||
| || currentSnapshot.commitKind() != Snapshot.CommitKind.COMPACT) { | ||
| continue; | ||
| } | ||
| // The first flush encountered going backwards is the bucket's most recent flush. | ||
| for (PaimonPartitionBucket partitionBucket : getBucketsWithFlushedL0(currentSnapshot)) { | ||
| if (!bucketsToAnchor.remove(partitionBucket)) { | ||
| continue; | ||
| } | ||
| Snapshot previousAppendSnapshot = findBaseAnchorAppendSnapshot(currentSnapshot); | ||
| if (previousAppendSnapshot == null) { | ||
| // can't determine this bucket's base anchor; don't tighten retention | ||
| allAnchorsResolved = false; | ||
| continue; | ||
| } | ||
| if (earliestSnapshotIdToKeep <= 0 | ||
| || previousAppendSnapshot.id() < earliestSnapshotIdToKeep) { | ||
| earliestSnapshotIdToKeep = previousAppendSnapshot.id(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if (!bucketsToAnchor.isEmpty()) { | ||
| // some bucket's most recent flush was not found within the retained snapshots | ||
| allAnchorsResolved = false; | ||
| } | ||
|
|
||
| return allAnchorsResolved ? earliestSnapshotIdToKeep : LakeCommitResult.KEEP_ALL_PREVIOUS; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If one bucket's flush history has aged out, this returns KEEP_ALL_PREVIOUS and throws away the bound the other buckets already gave us - so retention never shrinks while that bucket stays cold. The unresolvable anchor is by definition older than earliestSnapshotId (already gone from Paimon), so keep-all can't recover it anyway, flooring at earliestSnapshotId is just as safe and lets retention track Paimon's expiry. WDYT? |
||
| } | ||
|
|
||
| /** | ||
| * Finds the base anchor APPEND snapshot for the bucket(s) whose L0 files were flushed by the | ||
| * given compacted snapshot. | ||
| * | ||
| * <p>The anchor is the previous APPEND of the latest snapshot that still exactly holds those | ||
| * flushed L0 files (see {@link PaimonDvTableUtils#findLatestSnapshotExactlyHoldingL0Files}). | ||
| * Its tiered offset is the bucket's readable offset, and it must stay retained until the bucket | ||
| * is flushed again. | ||
| * | ||
| * @param flushingCompactedSnapshot the COMPACT snapshot that flushed the bucket's L0 files | ||
| * @return the base anchor APPEND snapshot, or {@code null} if it cannot be determined (e.g. the | ||
| * holding snapshot or all earlier APPEND snapshots have been expired) | ||
| */ | ||
| @Nullable | ||
| private Snapshot findBaseAnchorAppendSnapshot(Snapshot flushingCompactedSnapshot) | ||
| throws IOException { | ||
| Snapshot sourceSnapshot = | ||
| findLatestSnapshotExactlyHoldingL0Files(fileStoreTable, flushingCompactedSnapshot); | ||
| if (sourceSnapshot == null) { | ||
| return null; | ||
| } | ||
| return sourceSnapshot.commitKind() == Snapshot.CommitKind.APPEND | ||
| ? sourceSnapshot | ||
| : findPreviousSnapshot(sourceSnapshot.id(), Snapshot.CommitKind.APPEND); | ||
| } | ||
|
|
||
| /** | ||
| * Checks that the given lake snapshot belongs to the current table (same table id). Throws when | ||
| * the table may have been dropped and re-created with a different id; the tiering committer | ||
|
|
@@ -495,16 +591,10 @@ private LakeSnapshot getOrFetchLakeSnapshot(long snapshotId, Map<Long, LakeSnaps | |
| Set<PaimonPartitionBucket> bucketsWithoutL0 = new HashSet<>(); | ||
| Set<PaimonPartitionBucket> bucketsWithL0 = new HashSet<>(); | ||
|
|
||
| // Scan the snapshot to get all splits including level0 | ||
| Map<String, String> scanOptions = new HashMap<>(); | ||
| scanOptions.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), String.valueOf(snapshot.id())); | ||
| // hacky: set batch scan mode to compact to make sure we can get l0 level files | ||
| scanOptions.put( | ||
| CoreOptions.BATCH_SCAN_MODE.key(), CoreOptions.BatchScanMode.COMPACT.getValue()); | ||
|
|
||
| // Scan the snapshot to get all data files including L0 level files | ||
| Map<BinaryRow, Map<Integer, List<ManifestEntry>>> manifestsByBucket = | ||
| FileStoreScan.Plan.groupByPartFiles( | ||
| fileStoreTable.copy(scanOptions).store().newScan().plan().files()); | ||
| fileStoreTable.store().newScan().withSnapshot(snapshot).plan().files()); | ||
|
|
||
| for (Map.Entry<BinaryRow, Map<Integer, List<ManifestEntry>>> manifestsByBucketEntry : | ||
| manifestsByBucket.entrySet()) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This re-walks the same snapshots as the with-L0 loop above, so getBucketsWithFlushedL0 scans manifests twice per compacted snapshot. Could it fold into that loop: one pass, offsets for the with-L0 buckets, anchor-only for the rest?