Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 41 additions & 5 deletions paimon-api/src/main/java/org/apache/paimon/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public class Snapshot implements Serializable {
protected static final String FIELD_STATISTICS = "statistics";
protected static final String FIELD_PROPERTIES = "properties";
protected static final String FIELD_NEXT_ROW_ID = "nextRowId";
protected static final String FIELD_TOTAL_FILE_SIZE = "totalFileSize";
protected static final String FIELD_TOTAL_DATA_FILES = "totalDataFiles";

// version of snapshot
@JsonProperty(FIELD_VERSION)
Expand Down Expand Up @@ -181,6 +183,16 @@ public class Snapshot implements Serializable {
@Nullable
protected final Long nextRowId;

@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty(FIELD_TOTAL_FILE_SIZE)
@Nullable
protected final Long totalFileSize;

@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty(FIELD_TOTAL_DATA_FILES)
@Nullable
protected final Long totalDataFiles;

public Snapshot(
long id,
long schemaId,
Expand All @@ -201,7 +213,9 @@ public Snapshot(
@Nullable Long watermark,
@Nullable String statistics,
@Nullable Map<String, String> properties,
@Nullable Long nextRowId) {
@Nullable Long nextRowId,
@Nullable Long totalFileSize,
@Nullable Long totalDataFiles) {
this(
CURRENT_VERSION,
id,
Expand All @@ -223,7 +237,9 @@ public Snapshot(
watermark,
statistics,
properties,
nextRowId);
nextRowId,
totalFileSize,
totalDataFiles);
}

@JsonCreator
Expand All @@ -249,7 +265,9 @@ public Snapshot(
@JsonProperty(FIELD_WATERMARK) @Nullable Long watermark,
@JsonProperty(FIELD_STATISTICS) @Nullable String statistics,
@JsonProperty(FIELD_PROPERTIES) @Nullable Map<String, String> properties,
@JsonProperty(FIELD_NEXT_ROW_ID) @Nullable Long nextRowId) {
@JsonProperty(FIELD_NEXT_ROW_ID) @Nullable Long nextRowId,
@JsonProperty(FIELD_TOTAL_FILE_SIZE) @Nullable Long totalFileSize,
@JsonProperty(FIELD_TOTAL_DATA_FILES) @Nullable Long totalDataFiles) {
this.version = version;
this.id = id;
this.schemaId = schemaId;
Expand All @@ -271,6 +289,8 @@ public Snapshot(
this.statistics = statistics;
this.properties = properties;
this.nextRowId = nextRowId;
this.totalFileSize = totalFileSize;
this.totalDataFiles = totalDataFiles;
}

@JsonGetter(FIELD_VERSION)
Expand Down Expand Up @@ -388,6 +408,18 @@ public Long nextRowId() {
return nextRowId;
}

@JsonGetter(FIELD_TOTAL_FILE_SIZE)
@Nullable
public Long totalFileSize() {
return totalFileSize;
}

@JsonGetter(FIELD_TOTAL_DATA_FILES)
@Nullable
public Long totalDataFiles() {
return totalDataFiles;
}

public String toJson() {
return JsonSerdeUtil.toJson(this);
}
Expand Down Expand Up @@ -415,7 +447,9 @@ public int hashCode() {
watermark,
statistics,
properties,
nextRowId);
nextRowId,
totalFileSize,
totalDataFiles);
}

@Override
Expand Down Expand Up @@ -447,7 +481,9 @@ public boolean equals(Object o) {
&& Objects.equals(watermark, that.watermark)
&& Objects.equals(statistics, that.statistics)
&& Objects.equals(properties, that.properties)
&& Objects.equals(nextRowId, that.nextRowId);
&& Objects.equals(nextRowId, that.nextRowId)
&& Objects.equals(totalFileSize, that.totalFileSize)
&& Objects.equals(totalDataFiles, that.totalDataFiles);
}

/** Type of changes in this snapshot. */
Expand Down
12 changes: 9 additions & 3 deletions paimon-core/src/main/java/org/apache/paimon/Changelog.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ public Changelog(Snapshot snapshot) {
snapshot.watermark(),
snapshot.statistics(),
snapshot.properties,
snapshot.nextRowId);
snapshot.nextRowId,
snapshot.totalFileSize(),
snapshot.totalDataFiles());
}

@JsonCreator
Expand All @@ -89,7 +91,9 @@ public Changelog(
@JsonProperty(FIELD_WATERMARK) @Nullable Long watermark,
@JsonProperty(FIELD_STATISTICS) @Nullable String statistics,
@JsonProperty(FIELD_PROPERTIES) Map<String, String> properties,
@JsonProperty(FIELD_NEXT_ROW_ID) @Nullable Long nextRowId) {
@JsonProperty(FIELD_NEXT_ROW_ID) @Nullable Long nextRowId,
@JsonProperty(FIELD_TOTAL_FILE_SIZE) @Nullable Long totalFileSize,
@JsonProperty(FIELD_TOTAL_DATA_FILES) @Nullable Long totalDataFiles) {
super(
version,
id,
Expand All @@ -111,7 +115,9 @@ public Changelog(
watermark,
statistics,
properties,
nextRowId);
nextRowId,
totalFileSize,
totalDataFiles);
}

public static Changelog fromJson(String json) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -948,9 +948,15 @@ CommitResult tryCommitOnce(
long nextRowIdStart = firstRowIdStart;
try {
long previousTotalRecordCount = 0L;
long previousTotalFileSize = 0L;
long previousTotalDataFiles = 0L;
Long currentWatermark = watermark;
if (latestSnapshot != null) {
previousTotalRecordCount = latestSnapshot.totalRecordCount();
Long previousFileSize = latestSnapshot.totalFileSize();
previousTotalFileSize = previousFileSize == null ? 0L : previousFileSize;
Long previousDataFiles = latestSnapshot.totalDataFiles();
previousTotalDataFiles = previousDataFiles == null ? 0L : previousDataFiles;
// read all previous manifest files
mergeBeforeManifests = manifestList.readDataManifests(latestSnapshot);
Long latestWatermark = latestSnapshot.watermark();
Expand Down Expand Up @@ -989,6 +995,16 @@ CommitResult tryCommitOnce(
long deltaRecordCount = recordCountAdd(deltaFiles) - recordCountDelete(deltaFiles);
long totalRecordCount = previousTotalRecordCount + deltaRecordCount;

long deltaFileSize = 0L;
long deltaDataFiles = 0L;
for (ManifestEntry entry : deltaFiles) {
int sign = FileKind.ADD.equals(entry.kind()) ? 1 : -1;
deltaFileSize += sign * entry.file().fileSize();
deltaDataFiles += sign;
}
long totalFileSize = previousTotalFileSize + deltaFileSize;
long totalDataFiles = previousTotalDataFiles + deltaDataFiles;

// write new delta files into manifest files
deltaStatistics = new ArrayList<>(PartitionEntry.merge(deltaFiles));
deltaManifestList = manifestList.write(manifestFile.write(deltaFiles));
Expand Down Expand Up @@ -1044,7 +1060,9 @@ CommitResult tryCommitOnce(
statsFileName,
// if empty properties, just set to null
properties.isEmpty() ? null : properties,
nextRowIdStart);
nextRowIdStart,
totalFileSize,
totalDataFiles);
} catch (Throwable e) {
// fails when preparing for commit, we should clean up
commitCleaner.cleanUpReuseTmpManifests(
Expand Down Expand Up @@ -1148,7 +1166,9 @@ public boolean replaceManifestList(
latest.statistics(),
// if empty properties, just set to null
latest.properties(),
nextRowId);
nextRowId,
latest.totalFileSize(),
latest.totalDataFiles());

return commitSnapshotImpl(newSnapshot, emptyList());
}
Expand Down Expand Up @@ -1227,7 +1247,9 @@ private boolean compactManifestOnce() {
latestSnapshot.watermark(),
latestSnapshot.statistics(),
latestSnapshot.properties(),
latestSnapshot.nextRowId());
latestSnapshot.nextRowId(),
latestSnapshot.totalFileSize(),
latestSnapshot.totalDataFiles());

return commitSnapshotImpl(newSnapshot, emptyList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ public class SnapshotsTable implements ReadonlyTable {
new DataField(10, "delta_record_count", new BigIntType(true)),
new DataField(11, "changelog_record_count", new BigIntType(true)),
new DataField(12, "watermark", new BigIntType(true)),
new DataField(13, "next_row_id", new BigIntType(true))));
new DataField(13, "next_row_id", new BigIntType(true)),
new DataField(14, "total_file_size", new BigIntType(true)),
new DataField(15, "total_data_files", new BigIntType(true))));

private final FileIO fileIO;
private final Path location;
Expand Down Expand Up @@ -339,7 +341,9 @@ private InternalRow toRow(Snapshot snapshot) {
snapshot.deltaRecordCount(),
snapshot.changelogRecordCount(),
snapshot.watermark(),
snapshot.nextRowId());
snapshot.nextRowId(),
snapshot.totalFileSize(),
snapshot.totalDataFiles());
}
}
}
12 changes: 10 additions & 2 deletions paimon-core/src/main/java/org/apache/paimon/tag/Tag.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public Tag(
@JsonProperty(FIELD_STATISTICS) @Nullable String statistics,
@JsonProperty(FIELD_PROPERTIES) Map<String, String> properties,
@JsonProperty(FIELD_NEXT_ROW_ID) @Nullable Long nextRowId,
@JsonProperty(FIELD_TOTAL_FILE_SIZE) @Nullable Long totalFileSize,
@JsonProperty(FIELD_TOTAL_DATA_FILES) @Nullable Long totalDataFiles,
@JsonProperty(FIELD_TAG_CREATE_TIME) @Nullable LocalDateTime tagCreateTime,
@JsonProperty(FIELD_TAG_TIME_RETAINED) @Nullable Duration tagTimeRetained) {
super(
Expand All @@ -103,7 +105,9 @@ public Tag(
watermark,
statistics,
properties,
nextRowId);
nextRowId,
totalFileSize,
totalDataFiles);
this.tagCreateTime = tagCreateTime;
this.tagTimeRetained = tagTimeRetained;
}
Expand Down Expand Up @@ -142,6 +146,8 @@ public static Tag fromSnapshotAndTagTtl(
snapshot.statistics(),
snapshot.properties(),
snapshot.nextRowId(),
snapshot.totalFileSize(),
snapshot.totalDataFiles(),
tagCreateTime,
tagTimeRetained);
}
Expand All @@ -168,7 +174,9 @@ public Snapshot trimToSnapshot() {
watermark,
statistics,
properties,
nextRowId);
nextRowId,
totalFileSize,
totalDataFiles);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ private static Snapshot createSnapshot(long id) throws IOException {
null,
null,
null,
null,
null,
null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,9 @@ private List<InternalRow> getExpectedResult(long[] snapshotIds) {
snapshot.deltaRecordCount(),
snapshot.changelogRecordCount(),
snapshot.watermark(),
snapshot.nextRowId()));
snapshot.nextRowId(),
snapshot.totalFileSize(),
snapshot.totalDataFiles()));
}

return expectedRow;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,8 @@ public void testExpireTagsByTimeRetained() throws Exception {
null,
null,
null,
null,
null,
null);
tagManager.createTag(
snapshot1,
Expand Down Expand Up @@ -439,6 +441,8 @@ public void testExpireTagsByTimeRetained() throws Exception {
null,
null,
null,
null,
null,
null);
tagManager.createTag(
snapshot2,
Expand Down
2 changes: 2 additions & 0 deletions paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public class TagTest {
null,
null,
null,
null,
null,
null);

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,8 @@ public static Snapshot createSnapshotWithMillis(long id, long millis) {
null,
null,
null,
null,
null,
null);
}

Expand All @@ -300,6 +302,8 @@ private Snapshot createSnapshotWithMillis(long id, long millis, long watermark)
watermark,
null,
null,
null,
null,
null);
}

Expand All @@ -325,6 +329,8 @@ private Changelog createChangelogWithMillis(long id, long millis) {
null,
null,
null,
null,
null,
null));
}

Expand Down Expand Up @@ -356,6 +362,8 @@ public void testLatestSnapshotOfUser() throws IOException, InterruptedException
null,
null,
null,
null,
null,
null);
localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson());
}
Expand Down Expand Up @@ -409,6 +417,8 @@ public void testTraversalSnapshotsFromLatestSafely() throws IOException, Interru
null,
null,
null,
null,
null,
null);
localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson());
}
Expand Down