diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileSorter.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileSorter.java index 39ef0bab5299..bbdf5e14bc4d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileSorter.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileSorter.java @@ -66,7 +66,16 @@ static class CompactionContext { final boolean fullCompaction; final RecordComparator fieldComparator; final Set deleteEntries; - final Map defaultCompactionMap; + /** + * Manifest files that need unsorted compaction. + * + *

Key: manifest file metadata + * + *

Value: true if fullCompaction is true and the file overlaps with delete partitions. It + * means the file needs to eliminate delete entries file + */ + final Map compactWithoutSort; + final List levelRuns; final List pickedRuns; @@ -74,31 +83,44 @@ static class CompactionContext { boolean fullCompaction, RecordComparator fieldComparator, Set deleteEntries, - Map defaultCompactionMap, + Map compactWithoutSort, List levelRuns, List pickedRuns) { this.fullCompaction = fullCompaction; this.fieldComparator = fieldComparator; this.deleteEntries = deleteEntries; - this.defaultCompactionMap = defaultCompactionMap; + this.compactWithoutSort = compactWithoutSort; this.levelRuns = levelRuns; this.pickedRuns = pickedRuns; } + + /** Check whether the given manifest file is marked for unsorted compaction. */ + boolean isMarkedForUnsortedCompaction(ManifestFileMeta file) { + return compactWithoutSort.containsKey(file); + } } /** Result of classifying manifest files. */ private static class ClassifyResult { final List lsmFiles; final Set deleteEntries; - final Map defaultCompactionMap; + /** + * Manifest files that need unsorted compaction. + * + *

Key: manifest file metadata + * + *

Value: true if fullCompaction is true and the file overlaps with delete partitions. It + * means the file needs to eliminate delete entries file + */ + final Map compactWithoutSort; ClassifyResult( List lsmFiles, Set deleteEntries, - Map defaultCompactionMap) { + Map compactWithoutSort) { this.lsmFiles = lsmFiles; this.deleteEntries = deleteEntries; - this.defaultCompactionMap = defaultCompactionMap; + this.compactWithoutSort = compactWithoutSort; } } @@ -159,7 +181,7 @@ static List trySortCompaction( /** * Full compaction path: totalDeltaFileSize >= sizeTrigger. * - *

Does not build index mapping. sortAndRewriteSection writes all entries (ADD+DELETE merged) + *

Does not build index mapping. rewriteSection writes all entries (ADD+DELETE merged) * together without separating them. */ private static Optional> tryFullCompaction( @@ -201,19 +223,19 @@ private static Optional> tryFullCompaction( List levelRuns = ctx.levelRuns; List pickedRuns = ctx.pickedRuns; - if (pickedRuns.isEmpty() && ctx.defaultCompactionMap.isEmpty()) { + if (pickedRuns.isEmpty() && ctx.compactWithoutSort.isEmpty()) { LOG.debug( - "Manifest sort full compact skipped: no runs picked and no defaultCompaction files."); + "Manifest sort full compact skipped: no runs picked and no compactWithoutSort files."); return Optional.empty(); } LOG.info( "Manifest sort full compact: input={} files, lsm={} runs, picked={} runs, " - + "defaultCompaction={} files.", + + "compactWithoutSort={} files.", input.size(), levelRuns.size(), pickedRuns.size(), - ctx.defaultCompactionMap.size()); + ctx.compactWithoutSort.size()); // Step 3: Collect reused files (not picked) and picked files Set pickedSet = new HashSet<>(pickedRuns); @@ -227,11 +249,10 @@ private static Optional> tryFullCompaction( for (ManifestAdjacentSortedRun run : pickedRuns) { pickedFiles.addAll(run.files()); } - pickedFiles.addAll(ctx.defaultCompactionMap.keySet()); + pickedFiles.addAll(ctx.compactWithoutSort.keySet()); // Step 4: Split into sections and merge small adjacent sections - List

sections = - splitIntoSections(pickedFiles, ctx.fieldComparator, ctx.defaultCompactionMap); + List
sections = splitIntoSections(pickedFiles, ctx); sections = mergeSmallAdjacentSections(sections, suggestedMetaSize); LOG.info( @@ -262,8 +283,8 @@ private static Optional> tryFullCompaction( /** * Minor compaction path: totalDeltaFileSize < sizeTrigger. * - *

Builds index mapping to preserve original positions. sortAndRewriteSection separates ADD - * and DELETE entries, placing ADD at result[minIdx] and DELETE at result[maxIdx]. + *

Builds index mapping to preserve original positions. rewriteSection separates ADD and + * DELETE entries, placing ADD at result[minIdx] and DELETE at result[maxIdx]. */ private static List tryMinorCompaction( List input, @@ -293,19 +314,19 @@ private static List tryMinorCompaction( List levelRuns = ctx.levelRuns; List pickedRuns = ctx.pickedRuns; - if (pickedRuns.isEmpty() && ctx.defaultCompactionMap.isEmpty()) { + if (pickedRuns.isEmpty() && ctx.compactWithoutSort.isEmpty()) { LOG.debug( - "Manifest sort minor compact skipped: no runs picked and no defaultCompaction files."); + "Manifest sort minor compact skipped: no runs picked and no compactWithoutSort files."); return input; } LOG.info( "Manifest sort minor compact: input={} files, lsm={} runs, picked={} runs, " - + "defaultCompaction={} files.", + + "compactWithoutSort={} files.", input.size(), levelRuns.size(), pickedRuns.size(), - ctx.defaultCompactionMap.size()); + ctx.compactWithoutSort.size()); // Step 2: Build fileName -> index mapping and initialize 2D result Map fileNameToIndex = new HashMap<>(); @@ -332,7 +353,7 @@ private static List tryMinorCompaction( for (ManifestAdjacentSortedRun run : pickedRuns) { pickedFiles.addAll(run.files()); } - pickedFiles.addAll(ctx.defaultCompactionMap.keySet()); + pickedFiles.addAll(ctx.compactWithoutSort.keySet()); // Step 4: Compute index range int minIdx = Integer.MAX_VALUE; @@ -347,8 +368,7 @@ private static List tryMinorCompaction( Pair indexRange = Pair.of(minIdx, maxIdx); // Step 5: Split into sections and merge small adjacent sections - List

sections = - splitIntoSections(pickedFiles, ctx.fieldComparator, ctx.defaultCompactionMap); + List
sections = splitIntoSections(pickedFiles, ctx); sections = mergeSmallAdjacentSections(sections, suggestedMetaSize); LOG.info( @@ -436,7 +456,7 @@ private static CompactionContext prepareCompaction( fullCompaction, fieldComparator, classifyResult.deleteEntries, - classifyResult.defaultCompactionMap, + classifyResult.compactWithoutSort, levelRuns, pickedRuns); } @@ -445,12 +465,12 @@ private static CompactionContext prepareCompaction( * Classify manifest files into default-compaction group and LSM group. * *

Full compaction: small files and files overlapping delete partitions go into - * defaultCompactionMap; the rest are returned as lsmFiles. + * compactWithoutSort; the rest are returned as lsmFiles. * - *

Non-full compaction: small files go to defaultCompactionMap for minor-style merge; the - * rest are returned as lsmFiles. + *

Non-full compaction: small files go to compactWithoutSort for minor-style merge; the rest + * are returned as lsmFiles. * - * @return ClassifyResult containing lsmFiles, deleteEntries, and defaultCompactionMap + * @return ClassifyResult containing lsmFiles, deleteEntries, and compactWithoutSort */ private static ClassifyResult classifyManifests( List input, @@ -460,7 +480,7 @@ private static ClassifyResult classifyManifests( long suggestedMetaSize, @Nullable Integer manifestReadParallelism) { // Initialize classification containers and read delete entries - Map classifiedDefaultMap = new LinkedHashMap<>(); + Map compactWithoutSort = new LinkedHashMap<>(); List lsmFiles = new LinkedList<>(input); Set classifiedDeleteEntries = Collections.emptySet(); PartitionPredicate predicate = null; @@ -496,11 +516,11 @@ private static ClassifyResult classifyManifests( file.partitionStats().nullCounts()); if (small || inDeleteRange) { iterator.remove(); - classifiedDefaultMap.put(file, inDeleteRange); + compactWithoutSort.put(file, inDeleteRange); } } - return new ClassifyResult(lsmFiles, classifiedDeleteEntries, classifiedDefaultMap); + return new ClassifyResult(lsmFiles, classifiedDeleteEntries, compactWithoutSort); } /** @@ -588,12 +608,11 @@ static List buildLevelSortedRuns( /** * Split picked files into sections. Files with overlapping sort-key intervals go into the same - * section. Each section is built with pre-computed totalSize and hasDefaultCompactMeta. + * section. Each section is built with pre-computed totalSize and hasUnsortedCompactMeta. */ static List

splitIntoSections( - List pickedFiles, - RecordComparator fieldComparator, - Map defaultCompactionMap) { + List pickedFiles, CompactionContext ctx) { + RecordComparator fieldComparator = ctx.fieldComparator; pickedFiles.sort( (a, b) -> { int cmp = @@ -607,13 +626,13 @@ static List
splitIntoSections( }); List
sections = new ArrayList<>(); - List currentFiles = new ArrayList<>(); - long currentTotalSize = 0; - boolean currentHasDefault = false; + List currentSectionFiles = new ArrayList<>(); + long currentSectionTotalSize = 0; ManifestFileMeta first = pickedFiles.get(0); - currentFiles.add(first); - currentTotalSize += first.fileSize(); - currentHasDefault = defaultCompactionMap.containsKey(first); + + currentSectionFiles.add(first); + currentSectionTotalSize += first.fileSize(); + boolean currentSectionHasUnsortedCompactMeta = ctx.isMarkedForUnsortedCompaction(first); BinaryRow sectionMaxBound = first.partitionStats().maxValues(); for (int i = 1; i < pickedFiles.size(); i++) { @@ -633,18 +652,24 @@ static List
splitIntoSections( // they may be placed in the same SortedRun during buildLevelSortedRuns (which uses >= 0 // comparison). This dual behavior is intentional and documented in class comments. if (fieldComparator.compare(file.partitionStats().minValues(), sectionMaxBound) >= 0) { - sections.add(new Section(currentFiles, currentTotalSize, currentHasDefault)); - currentFiles = new ArrayList<>(); - currentTotalSize = 0; - currentFiles.add(file); - currentTotalSize += file.fileSize(); - currentHasDefault = defaultCompactionMap.containsKey(file); + sections.add( + new Section( + currentSectionFiles, + currentSectionTotalSize, + currentSectionHasUnsortedCompactMeta)); + // start a new section + currentSectionFiles = new ArrayList<>(); + currentSectionTotalSize = 0; + currentSectionFiles.add(file); + currentSectionTotalSize += file.fileSize(); + currentSectionHasUnsortedCompactMeta = ctx.isMarkedForUnsortedCompaction(file); sectionMaxBound = file.partitionStats().maxValues(); } else { - currentFiles.add(file); - currentTotalSize += file.fileSize(); - if (!currentHasDefault && defaultCompactionMap.containsKey(file)) { - currentHasDefault = true; + currentSectionFiles.add(file); + currentSectionTotalSize += file.fileSize(); + if (!currentSectionHasUnsortedCompactMeta + && ctx.isMarkedForUnsortedCompaction(file)) { + currentSectionHasUnsortedCompactMeta = true; } if (fieldComparator.compare(file.partitionStats().maxValues(), sectionMaxBound) > 0) { @@ -652,7 +677,11 @@ static List
splitIntoSections( } } } - sections.add(new Section(currentFiles, currentTotalSize, currentHasDefault)); + sections.add( + new Section( + currentSectionFiles, + currentSectionTotalSize, + currentSectionHasUnsortedCompactMeta)); return sections; } @@ -695,14 +724,14 @@ private static List
mergeSmallAdjacentSections( *
  • First overflow: The current section is split. The rewritable part is sorted and * rewritten. The remaining part is appended back to the sections queue for later * processing. - *
  • Subsequent overflows: If the section has files in defaultCompactionMap (needs default - * compaction), rewriteSubSegments is called to process it in smaller chunks. Otherwise, - * the section is skipped. + *
  • Subsequent overflows: If the section has files in compactWithoutSort (needs unsorted + * compaction), unsortedCompactSection is called to process it in smaller chunks. + * Otherwise, the section is skipped. * * *

    This design ensures that the budget only limits the aggressive sort rewrite, while still * allowing necessary cleanup operations (delete entry elimination, small file merge) through - * the rewriteSubSegments fallback path. + * the unsortedCompactSection fallback path. */ private static void rewriteSections( List

    sections, @@ -715,13 +744,16 @@ private static void rewriteSections( long maxRewriteSize, @Nullable Integer manifestReadParallelism) throws Exception { - long processedSize = 0; - boolean reachedLimit = false; + // Total data size that has been sort-rewritten so far, used to enforce maxRewriteSize. + long currentRewrittenSize = 0; + boolean budgetExhausted = false; // Whether currentRewrittenSize reaches maxRewriteSize. for (int i = 0; i < sections.size(); i++) { Section section = sections.get(i); + + // A single-file section is always handled directly, regardless of the budget. if (section.files.size() == 1) { - sortAndRewriteSection( + rewriteSection( section.files, output, sortNewFiles, @@ -731,55 +763,45 @@ private static void rewriteSections( continue; } - if (processedSize + section.totalSize <= maxRewriteSize) { - processedSize += section.totalSize; - sortAndRewriteSection( - section.files, - output, - sortNewFiles, - ctx, - manifestFile, - manifestReadParallelism); - } else if (!reachedLimit) { - long rewriteTotalSize = maxRewriteSize - processedSize; - processedSize += section.totalSize; - List rewriteFiles = new ArrayList<>(); - List remainingFiles = new ArrayList<>(); - long rewriteSize = 0; - long remainingSize = 0; - boolean remainingHasDefault = false; - - for (ManifestFileMeta file : section.files) { - if (rewriteSize + file.fileSize() <= rewriteTotalSize) { - rewriteFiles.add(file); - rewriteSize += file.fileSize(); - } else { - remainingFiles.add(file); - remainingSize += file.fileSize(); - if (ctx.defaultCompactionMap.containsKey(file)) { - remainingHasDefault = true; - } + // Phase 1: budget not yet exhausted -- perform aggressive sort rewrite. + if (!budgetExhausted) { + // Phase 1a: section fits within the remaining budget -- sort and rewrite it + // wholly. + if (currentRewrittenSize + section.totalSize <= maxRewriteSize) { + currentRewrittenSize += section.totalSize; + rewriteSection( + section.files, + output, + sortNewFiles, + ctx, + manifestFile, + manifestReadParallelism); + } else { + // Phase 1b: first overflow -- split the section at the budget boundary, + // rewrite the affordable head, and append the remaining tail back for later + // (Phase 2) handling. + long remainingBudget = maxRewriteSize - currentRewrittenSize; + currentRewrittenSize += section.totalSize; + Section remaining = + splitSectionAndRewriteHead( + section, + remainingBudget, + output, + sortNewFiles, + ctx, + manifestFile, + manifestReadParallelism); + if (remaining != null) { + // global ManifestMeta section order by sort key is not a required invariant + sections.add(remaining); } + budgetExhausted = true; } - - sortAndRewriteSection( - rewriteFiles, - output, - sortNewFiles, - ctx, - manifestFile, - manifestReadParallelism); - - if (!remainingFiles.isEmpty()) { - Section remainingSection = - new Section(remainingFiles, remainingSize, remainingHasDefault); - // global manifest file metas order by sort key is not a required invariant - sections.add(remainingSection); - } - reachedLimit = true; - } else if (section.hasDefaultCompactMeta) { - rewriteSubSegments( - section.files, + } else { + // Phase 2: budget already exhausted -- only do unsorted compact, skip aggressive + // sort rewrite. + rewriteSectionBeyondBudget( + section, output, sortNewFiles, ctx, @@ -787,10 +809,81 @@ private static void rewriteSections( suggestedMetaSize, suggestedMinMetaCount, manifestReadParallelism); + } + } + } + + /** + * Split a section at the rewrite budget boundary: sort and rewrite the head part that fits + * within the remaining budget, and return the remaining tail as a new Section (or null if the + * whole section fits and no tail is left). + */ + private static Section splitSectionAndRewriteHead( + Section section, + long remainingBudget, + RewriteOutput output, + List sortNewFiles, + CompactionContext ctx, + ManifestFile manifestFile, + @Nullable Integer manifestReadParallelism) + throws Exception { + List headFiles = new ArrayList<>(); + List tailFiles = new ArrayList<>(); + long headSize = 0; + long tailSize = 0; + // Whether tail section has files in compactWithoutSort, if true, the section need to + // be rewritten. + boolean tailHasUnsortedCompactMeta = false; + + for (ManifestFileMeta file : section.files) { + if (headSize + file.fileSize() <= remainingBudget) { + headFiles.add(file); + headSize += file.fileSize(); } else { - output.addAllUnchanged(section.files); + tailFiles.add(file); + tailSize += file.fileSize(); + if (ctx.isMarkedForUnsortedCompaction(file)) { + tailHasUnsortedCompactMeta = true; + } } } + + rewriteSection(headFiles, output, sortNewFiles, ctx, manifestFile, manifestReadParallelism); + + if (tailFiles.isEmpty()) { + return null; + } + return new Section(tailFiles, tailSize, tailHasUnsortedCompactMeta); + } + + /** + * Handle a section after the sort rewrite budget is exhausted. Sections that contain + * default-compaction files (small files / delete entries) still go through + * unsortedCompactSection for necessary cleanup; otherwise they are kept unchanged. + */ + private static void rewriteSectionBeyondBudget( + Section section, + RewriteOutput output, + List sortNewFiles, + CompactionContext ctx, + ManifestFile manifestFile, + long suggestedMetaSize, + int suggestedMinMetaCount, + @Nullable Integer manifestReadParallelism) + throws Exception { + if (section.hasUnsortedCompactMeta) { + unsortedCompactSection( + section.files, + output, + sortNewFiles, + ctx, + manifestFile, + suggestedMetaSize, + suggestedMinMetaCount, + manifestReadParallelism); + } else { + output.addAllUnchanged(section.files); + } } /** @@ -798,8 +891,8 @@ private static void rewriteSections( * *

    Semantics difference from old minor merge: In the old ManifestFileMerger path, the * trailing candidates are kept unchanged when their count is below manifest.merge-min-count. In - * this sort path, rewriteSubSegments is triggered when defaultCompactionMap is non-empty, - * regardless of the manifest count. This is because files in defaultCompactionMap either: + * this sort path, unsortedCompactSection is triggered when compactWithoutSort is non-empty, + * regardless of the manifest count. This is because files in compactWithoutSort either: * *

      *
    • Are small files needing consolidation @@ -810,7 +903,7 @@ private static void rewriteSections( * acting as a conservative gate to avoid unnecessary rewrite when there are no delete entries * and the tail is too small. */ - private static void rewriteSubSegments( + private static void unsortedCompactSection( List section, RewriteOutput output, List sortNewFiles, @@ -820,47 +913,47 @@ private static void rewriteSubSegments( int suggestedMinMetaCount, @Nullable Integer manifestReadParallelism) throws Exception { - List subSegment = new ArrayList<>(); - long subSegmentSize = 0; + List candidates = new ArrayList<>(); + long candidatesSize = 0; for (ManifestFileMeta m : section) { - subSegmentSize += m.fileSize(); - subSegment.add(m); + candidatesSize += m.fileSize(); + candidates.add(m); - if (subSegmentSize >= suggestedMetaSize) { - sortAndRewriteSection( - subSegment, + if (candidatesSize >= suggestedMetaSize) { + rewriteSection( + candidates, output, sortNewFiles, ctx, manifestFile, manifestReadParallelism); - subSegment.clear(); - subSegmentSize = 0; + candidates.clear(); + candidatesSize = 0; } } // Flush tail only if delete entries exist or file count >= minCount. - if (!subSegment.isEmpty()) { - if (!ctx.deleteEntries.isEmpty() || subSegment.size() >= suggestedMinMetaCount) { - sortAndRewriteSection( - subSegment, + if (!candidates.isEmpty()) { + if (!ctx.deleteEntries.isEmpty() || candidates.size() >= suggestedMinMetaCount) { + rewriteSection( + candidates, output, sortNewFiles, ctx, manifestFile, manifestReadParallelism); } else { - output.addAllUnchanged(subSegment); + output.addAllUnchanged(candidates); } } } /** - * Sort and rewrite a section. Dispatches to full or minor compact path. + * Rewrite a section. Dispatches to full or minor compact path. * *

      sortNewFiles is the same reference as newFilesForAbort, ensuring newly written files are * cleaned up on exception by the caller's catch block. */ - private static void sortAndRewriteSection( + private static void rewriteSection( List section, RewriteOutput output, List sortNewFiles, @@ -869,17 +962,15 @@ private static void sortAndRewriteSection( @Nullable Integer manifestReadParallelism) throws Exception { // Skip rewrite for single file not in delete-range. - if (section.size() == 1 && !ctx.defaultCompactionMap.getOrDefault(section.get(0), false)) { + if (section.size() == 1 && !ctx.compactWithoutSort.getOrDefault(section.get(0), false)) { output.addUnchanged(section.get(0)); return; } if (ctx.fullCompaction) { - sortAndRewriteFull( - section, output, sortNewFiles, ctx, manifestFile, manifestReadParallelism); + rewriteFull(section, output, sortNewFiles, ctx, manifestFile, manifestReadParallelism); } else { - sortAndRewriteMinor( - section, output, sortNewFiles, ctx, manifestFile, manifestReadParallelism); + rewriteMinor(section, output, sortNewFiles, ctx, manifestFile, manifestReadParallelism); } } @@ -887,7 +978,7 @@ private static void sortAndRewriteSection( * Full compaction path: read all surviving entries (ADD merged with DELETE), sort them * together, and write to output as a single sorted stream. */ - private static void sortAndRewriteFull( + private static void rewriteFull( List section, RewriteOutput output, List sortNewFiles, @@ -934,7 +1025,7 @@ private static void sortAndRewriteFull( * entries into ADD and DELETE within each file, returning a Pair. Results are merged in the * main thread. */ - private static void sortAndRewriteMinor( + private static void rewriteMinor( List section, RewriteOutput output, List sortNewFiles, @@ -1128,12 +1219,12 @@ public void addDeleteFiles(List files) { static class Section { final List files; final long totalSize; - final boolean hasDefaultCompactMeta; + final boolean hasUnsortedCompactMeta; - Section(List files, long totalSize, boolean hasDefaultCompactMeta) { + Section(List files, long totalSize, boolean hasUnsortedCompactMeta) { this.files = files; this.totalSize = totalSize; - this.hasDefaultCompactMeta = hasDefaultCompactMeta; + this.hasUnsortedCompactMeta = hasUnsortedCompactMeta; } /** Create a merged section from two sections. */ @@ -1143,7 +1234,7 @@ static Section merge(Section a, Section b) { return new Section( merged, a.totalSize + b.totalSize, - a.hasDefaultCompactMeta || b.hasDefaultCompactMeta); + a.hasUnsortedCompactMeta || b.hasUnsortedCompactMeta); } } } diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java index 75a1ab0a84df..2b91824d9afd 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java @@ -964,7 +964,7 @@ public void testManifestSortWithOverlappingPartitions() { * Test that sort rewrite correctly eliminates DELETE entries and their corresponding ADD * entries. The key condition is that totalDeltaFileSize must reach manifestFullCompactionSize * to trigger the full compaction path inside trySortRewrite, which reads deleteEntries and - * passes them to sortAndRewriteSection for elimination. + * passes them to rewriteSection for elimination. * *

      Design: *