From 96bde4d3c961e375badaed3d750caf171db98a7c Mon Sep 17 00:00:00 2001 From: Arnav Balyan Date: Thu, 4 Jun 2026 11:44:17 +0530 Subject: [PATCH] update --- .../main/java/org/apache/paimon/Snapshot.java | 46 +++++++++++++++++-- .../java/org/apache/paimon/Changelog.java | 12 +++-- .../paimon/operation/FileStoreCommitImpl.java | 28 +++++++++-- .../paimon/table/system/SnapshotsTable.java | 8 +++- .../main/java/org/apache/paimon/tag/Tag.java | 12 ++++- .../catalog/RenamingSnapshotCommitTest.java | 2 + .../table/system/SnapshotsTableTest.java | 4 +- .../apache/paimon/tag/TagAutoManagerTest.java | 4 ++ .../java/org/apache/paimon/tag/TagTest.java | 2 + .../paimon/utils/SnapshotManagerTest.java | 10 ++++ 10 files changed, 112 insertions(+), 16 deletions(-) diff --git a/paimon-api/src/main/java/org/apache/paimon/Snapshot.java b/paimon-api/src/main/java/org/apache/paimon/Snapshot.java index 93f525a7e3fb..b660f692b18d 100644 --- a/paimon-api/src/main/java/org/apache/paimon/Snapshot.java +++ b/paimon-api/src/main/java/org/apache/paimon/Snapshot.java @@ -69,6 +69,8 @@ public class Snapshot implements Serializable { protected static final String FIELD_STATISTICS = "statistics"; protected static final String FIELD_PROPERTIES = "properties"; protected static final String FIELD_NEXT_ROW_ID = "nextRowId"; + protected static final String FIELD_TOTAL_FILE_SIZE = "totalFileSize"; + protected static final String FIELD_TOTAL_DATA_FILES = "totalDataFiles"; // version of snapshot @JsonProperty(FIELD_VERSION) @@ -181,6 +183,16 @@ public class Snapshot implements Serializable { @Nullable protected final Long nextRowId; + @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonProperty(FIELD_TOTAL_FILE_SIZE) + @Nullable + protected final Long totalFileSize; + + @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonProperty(FIELD_TOTAL_DATA_FILES) + @Nullable + protected final Long totalDataFiles; + public Snapshot( long id, long schemaId, @@ -201,7 +213,9 @@ public Snapshot( @Nullable Long watermark, @Nullable String statistics, @Nullable Map properties, - @Nullable Long nextRowId) { + @Nullable Long nextRowId, + @Nullable Long totalFileSize, + @Nullable Long totalDataFiles) { this( CURRENT_VERSION, id, @@ -223,7 +237,9 @@ public Snapshot( watermark, statistics, properties, - nextRowId); + nextRowId, + totalFileSize, + totalDataFiles); } @JsonCreator @@ -249,7 +265,9 @@ public Snapshot( @JsonProperty(FIELD_WATERMARK) @Nullable Long watermark, @JsonProperty(FIELD_STATISTICS) @Nullable String statistics, @JsonProperty(FIELD_PROPERTIES) @Nullable Map properties, - @JsonProperty(FIELD_NEXT_ROW_ID) @Nullable Long nextRowId) { + @JsonProperty(FIELD_NEXT_ROW_ID) @Nullable Long nextRowId, + @JsonProperty(FIELD_TOTAL_FILE_SIZE) @Nullable Long totalFileSize, + @JsonProperty(FIELD_TOTAL_DATA_FILES) @Nullable Long totalDataFiles) { this.version = version; this.id = id; this.schemaId = schemaId; @@ -271,6 +289,8 @@ public Snapshot( this.statistics = statistics; this.properties = properties; this.nextRowId = nextRowId; + this.totalFileSize = totalFileSize; + this.totalDataFiles = totalDataFiles; } @JsonGetter(FIELD_VERSION) @@ -388,6 +408,18 @@ public Long nextRowId() { return nextRowId; } + @JsonGetter(FIELD_TOTAL_FILE_SIZE) + @Nullable + public Long totalFileSize() { + return totalFileSize; + } + + @JsonGetter(FIELD_TOTAL_DATA_FILES) + @Nullable + public Long totalDataFiles() { + return totalDataFiles; + } + public String toJson() { return JsonSerdeUtil.toJson(this); } @@ -415,7 +447,9 @@ public int hashCode() { watermark, statistics, properties, - nextRowId); + nextRowId, + totalFileSize, + totalDataFiles); } @Override @@ -447,7 +481,9 @@ public boolean equals(Object o) { && Objects.equals(watermark, that.watermark) && Objects.equals(statistics, that.statistics) && Objects.equals(properties, that.properties) - && Objects.equals(nextRowId, that.nextRowId); + && Objects.equals(nextRowId, that.nextRowId) + && Objects.equals(totalFileSize, that.totalFileSize) + && Objects.equals(totalDataFiles, that.totalDataFiles); } /** Type of changes in this snapshot. */ diff --git a/paimon-core/src/main/java/org/apache/paimon/Changelog.java b/paimon-core/src/main/java/org/apache/paimon/Changelog.java index 0ab3429dfc1f..8857e1501042 100644 --- a/paimon-core/src/main/java/org/apache/paimon/Changelog.java +++ b/paimon-core/src/main/java/org/apache/paimon/Changelog.java @@ -63,7 +63,9 @@ public Changelog(Snapshot snapshot) { snapshot.watermark(), snapshot.statistics(), snapshot.properties, - snapshot.nextRowId); + snapshot.nextRowId, + snapshot.totalFileSize(), + snapshot.totalDataFiles()); } @JsonCreator @@ -89,7 +91,9 @@ public Changelog( @JsonProperty(FIELD_WATERMARK) @Nullable Long watermark, @JsonProperty(FIELD_STATISTICS) @Nullable String statistics, @JsonProperty(FIELD_PROPERTIES) Map properties, - @JsonProperty(FIELD_NEXT_ROW_ID) @Nullable Long nextRowId) { + @JsonProperty(FIELD_NEXT_ROW_ID) @Nullable Long nextRowId, + @JsonProperty(FIELD_TOTAL_FILE_SIZE) @Nullable Long totalFileSize, + @JsonProperty(FIELD_TOTAL_DATA_FILES) @Nullable Long totalDataFiles) { super( version, id, @@ -111,7 +115,9 @@ public Changelog( watermark, statistics, properties, - nextRowId); + nextRowId, + totalFileSize, + totalDataFiles); } public static Changelog fromJson(String json) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 10c9b20a0467..32d86a663c3d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -948,9 +948,15 @@ CommitResult tryCommitOnce( long nextRowIdStart = firstRowIdStart; try { long previousTotalRecordCount = 0L; + long previousTotalFileSize = 0L; + long previousTotalDataFiles = 0L; Long currentWatermark = watermark; if (latestSnapshot != null) { previousTotalRecordCount = latestSnapshot.totalRecordCount(); + Long previousFileSize = latestSnapshot.totalFileSize(); + previousTotalFileSize = previousFileSize == null ? 0L : previousFileSize; + Long previousDataFiles = latestSnapshot.totalDataFiles(); + previousTotalDataFiles = previousDataFiles == null ? 0L : previousDataFiles; // read all previous manifest files mergeBeforeManifests = manifestList.readDataManifests(latestSnapshot); Long latestWatermark = latestSnapshot.watermark(); @@ -989,6 +995,16 @@ CommitResult tryCommitOnce( long deltaRecordCount = recordCountAdd(deltaFiles) - recordCountDelete(deltaFiles); long totalRecordCount = previousTotalRecordCount + deltaRecordCount; + long deltaFileSize = 0L; + long deltaDataFiles = 0L; + for (ManifestEntry entry : deltaFiles) { + int sign = FileKind.ADD.equals(entry.kind()) ? 1 : -1; + deltaFileSize += sign * entry.file().fileSize(); + deltaDataFiles += sign; + } + long totalFileSize = previousTotalFileSize + deltaFileSize; + long totalDataFiles = previousTotalDataFiles + deltaDataFiles; + // write new delta files into manifest files deltaStatistics = new ArrayList<>(PartitionEntry.merge(deltaFiles)); deltaManifestList = manifestList.write(manifestFile.write(deltaFiles)); @@ -1044,7 +1060,9 @@ CommitResult tryCommitOnce( statsFileName, // if empty properties, just set to null properties.isEmpty() ? null : properties, - nextRowIdStart); + nextRowIdStart, + totalFileSize, + totalDataFiles); } catch (Throwable e) { // fails when preparing for commit, we should clean up commitCleaner.cleanUpReuseTmpManifests( @@ -1148,7 +1166,9 @@ public boolean replaceManifestList( latest.statistics(), // if empty properties, just set to null latest.properties(), - nextRowId); + nextRowId, + latest.totalFileSize(), + latest.totalDataFiles()); return commitSnapshotImpl(newSnapshot, emptyList()); } @@ -1227,7 +1247,9 @@ private boolean compactManifestOnce() { latestSnapshot.watermark(), latestSnapshot.statistics(), latestSnapshot.properties(), - latestSnapshot.nextRowId()); + latestSnapshot.nextRowId(), + latestSnapshot.totalFileSize(), + latestSnapshot.totalDataFiles()); return commitSnapshotImpl(newSnapshot, emptyList()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java index 394dc4d58a83..1e691e330aa9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java @@ -108,7 +108,9 @@ public class SnapshotsTable implements ReadonlyTable { new DataField(10, "delta_record_count", new BigIntType(true)), new DataField(11, "changelog_record_count", new BigIntType(true)), new DataField(12, "watermark", new BigIntType(true)), - new DataField(13, "next_row_id", new BigIntType(true)))); + new DataField(13, "next_row_id", new BigIntType(true)), + new DataField(14, "total_file_size", new BigIntType(true)), + new DataField(15, "total_data_files", new BigIntType(true)))); private final FileIO fileIO; private final Path location; @@ -339,7 +341,9 @@ private InternalRow toRow(Snapshot snapshot) { snapshot.deltaRecordCount(), snapshot.changelogRecordCount(), snapshot.watermark(), - snapshot.nextRowId()); + snapshot.nextRowId(), + snapshot.totalFileSize(), + snapshot.totalDataFiles()); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java index 192df2d4705a..4af697ec99e2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java @@ -80,6 +80,8 @@ public Tag( @JsonProperty(FIELD_STATISTICS) @Nullable String statistics, @JsonProperty(FIELD_PROPERTIES) Map properties, @JsonProperty(FIELD_NEXT_ROW_ID) @Nullable Long nextRowId, + @JsonProperty(FIELD_TOTAL_FILE_SIZE) @Nullable Long totalFileSize, + @JsonProperty(FIELD_TOTAL_DATA_FILES) @Nullable Long totalDataFiles, @JsonProperty(FIELD_TAG_CREATE_TIME) @Nullable LocalDateTime tagCreateTime, @JsonProperty(FIELD_TAG_TIME_RETAINED) @Nullable Duration tagTimeRetained) { super( @@ -103,7 +105,9 @@ public Tag( watermark, statistics, properties, - nextRowId); + nextRowId, + totalFileSize, + totalDataFiles); this.tagCreateTime = tagCreateTime; this.tagTimeRetained = tagTimeRetained; } @@ -142,6 +146,8 @@ public static Tag fromSnapshotAndTagTtl( snapshot.statistics(), snapshot.properties(), snapshot.nextRowId(), + snapshot.totalFileSize(), + snapshot.totalDataFiles(), tagCreateTime, tagTimeRetained); } @@ -168,7 +174,9 @@ public Snapshot trimToSnapshot() { watermark, statistics, properties, - nextRowId); + nextRowId, + totalFileSize, + totalDataFiles); } @Override diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/RenamingSnapshotCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/RenamingSnapshotCommitTest.java index 6e8ae6f36a47..c3133c412ec6 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/RenamingSnapshotCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/RenamingSnapshotCommitTest.java @@ -137,6 +137,8 @@ private static Snapshot createSnapshot(long id) throws IOException { null, null, null, + null, + null, null); } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/SnapshotsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/SnapshotsTableTest.java index 7a3bbf961c5c..f12fbb4fb8ce 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/system/SnapshotsTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/system/SnapshotsTableTest.java @@ -118,7 +118,9 @@ private List getExpectedResult(long[] snapshotIds) { snapshot.deltaRecordCount(), snapshot.changelogRecordCount(), snapshot.watermark(), - snapshot.nextRowId())); + snapshot.nextRowId(), + snapshot.totalFileSize(), + snapshot.totalDataFiles())); } return expectedRow; diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java index ef0e1627ee26..ea92c673d7dc 100644 --- a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java @@ -410,6 +410,8 @@ public void testExpireTagsByTimeRetained() throws Exception { null, null, null, + null, + null, null); tagManager.createTag( snapshot1, @@ -439,6 +441,8 @@ public void testExpireTagsByTimeRetained() throws Exception { null, null, null, + null, + null, null); tagManager.createTag( snapshot2, diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java b/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java index d7dbeacb7f57..c6ddb19adffc 100644 --- a/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java @@ -51,6 +51,8 @@ public class TagTest { null, null, null, + null, + null, null); @Test diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java index 486f0b3fe552..b6b2bb80442a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java @@ -276,6 +276,8 @@ public static Snapshot createSnapshotWithMillis(long id, long millis) { null, null, null, + null, + null, null); } @@ -300,6 +302,8 @@ private Snapshot createSnapshotWithMillis(long id, long millis, long watermark) watermark, null, null, + null, + null, null); } @@ -325,6 +329,8 @@ private Changelog createChangelogWithMillis(long id, long millis) { null, null, null, + null, + null, null)); } @@ -356,6 +362,8 @@ public void testLatestSnapshotOfUser() throws IOException, InterruptedException null, null, null, + null, + null, null); localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson()); } @@ -409,6 +417,8 @@ public void testTraversalSnapshotsFromLatestSafely() throws IOException, Interru null, null, null, + null, + null, null); localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson()); }