diff --git a/docs/content/primary-key-table/pk-clustering-override.md b/docs/content/primary-key-table/pk-clustering-override.md
index 3348bc0482bb..91b3fa14e884 100644
--- a/docs/content/primary-key-table/pk-clustering-override.md
+++ b/docs/content/primary-key-table/pk-clustering-override.md
@@ -50,6 +50,23 @@ CREATE TABLE my_table (
 );
 ```
 
+For the `first-row` merge engine, deletion vectors are not required, so you don't need to enable them explicitly:
+
+```sql
+CREATE TABLE my_table (
+    id BIGINT,
+    dt STRING,
+    city STRING,
+    amount DOUBLE,
+    PRIMARY KEY (id) NOT ENFORCED
+) WITH (
+    'pk-clustering-override' = 'true',
+    'clustering.columns' = 'city',
+    'merge-engine' = 'first-row',
+    'bucket' = '4'
+);
+```
+
 After this, data files within each bucket will be physically sorted by `city` instead of `id`.
 Queries like `SELECT * FROM my_table WHERE city = 'Beijing'` can skip irrelevant data files by checking their min/max statistics
 on the clustering column.
@@ -60,7 +77,7 @@ on the clustering column.
 |--------|-------------|
 | `pk-clustering-override` | `true` |
 | `clustering.columns` | Must be set (one or more non-primary-key columns) |
-| `deletion-vectors.enabled` | Must be `true` |
+| `deletion-vectors.enabled` | Must be `true` (not required for the `first-row` merge engine) |
 | `merge-engine` | `deduplicate` (default) or `first-row` only |
 
 ## When to Use
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
index 1e9b02b32b7a..9aac4ba08ebb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
@@ -43,8 +43,10 @@
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.stream.IntStream;
@@ -204,10 +206,14 @@ private CompactResult compact(boolean fullCompaction) throws Exception {
         // Snapshot sorted files before Phase 1 to avoid including newly created files in Phase 2
         List<DataFileMeta> existingSortedFiles = fileLevels.sortedFiles();
         for (DataFileMeta file : unsortedFiles) {
+            Set<String> originalFileNames = Collections.singleton(file.fileName());
             List<DataFileMeta> sortedFiles =
                     fileRewriter.sortAndRewriteFiles(
-                            singletonList(file), kvSerializer, kvSchemaType);
-            keyIndex.updateIndex(file, sortedFiles);
+                            singletonList(file),
+                            kvSerializer,
+                            kvSchemaType,
+                            keyIndex,
+                            originalFileNames);
             result.before().add(file);
             result.after().addAll(sortedFiles);
         }
@@ -232,19 +238,23 @@ private CompactResult compact(boolean fullCompaction) throws Exception {
                     keyIndex.rebuildIndex(newFile);
                 }
                 // Remove stale deletion vectors for merged-away files
-                for (DataFileMeta file : mergeGroup) {
-                    dvMaintainer.removeDeletionVectorOf(file.fileName());
+                if (dvMaintainer != null) {
+                    for (DataFileMeta file : mergeGroup) {
+                        dvMaintainer.removeDeletionVectorOf(file.fileName());
+                    }
                 }
                 result.before().addAll(mergeGroup);
                 result.after().addAll(mergedFiles);
             }
         }
 
-        CompactDeletionFile deletionFile =
-                lazyGenDeletionFile
-                        ? CompactDeletionFile.lazyGeneration(dvMaintainer)
-                        : CompactDeletionFile.generateFiles(dvMaintainer);
-        result.setDeletionFile(deletionFile);
+        if (dvMaintainer != null) {
+            CompactDeletionFile deletionFile =
+                    lazyGenDeletionFile
+                            ? CompactDeletionFile.lazyGeneration(dvMaintainer)
+                            : CompactDeletionFile.generateFiles(dvMaintainer);
+            result.setDeletionFile(deletionFile);
+        }
 
         return result;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java
index ece48137645d..3689e704833a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java
@@ -27,6 +27,7 @@
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.BinaryRowSerializer;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.data.serializer.RowCompactedSerializer;
 import org.apache.paimon.disk.ChannelReaderInputView;
 import org.apache.paimon.disk.ChannelReaderInputViewIterator;
 import org.apache.paimon.disk.ChannelWithMeta;
@@ -47,6 +48,8 @@
 import org.apache.paimon.utils.KeyValueWithLevelNoReusingSerializer;
 import org.apache.paimon.utils.MutableObjectIterator;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -54,6 +57,7 @@
 import java.util.Comparator;
 import java.util.List;
 import java.util.PriorityQueue;
+import java.util.Set;
 
 /**
  * Handles file rewriting for clustering compaction, including sorting unsorted files (Phase 1) and
@@ -112,10 +116,19 @@ public ClusteringFileRewriter(
 
     /**
      * Sort and rewrite unsorted files by clustering columns. Reads all KeyValue records, sorts them
-     * using an external sort buffer, and writes to new level-1 files.
+     * using an external sort buffer, and writes to new level-1 files. Checks the key index inline
+     * during writing to handle deduplication (FIRST_ROW skips duplicates, DEDUPLICATE marks old
+     * positions in deletion vectors) and updates the index without re-reading the output files.
+     *
+     * @param keyIndex the key index for inline checking and batch update, or null to skip
+     * @param originalFileNames file names of the original files being replaced (for index check)
      */
     public List<DataFileMeta> sortAndRewriteFiles(
-            List<DataFileMeta> inputFiles, KeyValueSerializer kvSerializer, RowType kvSchemaType)
+            List<DataFileMeta> inputFiles,
+            KeyValueSerializer kvSerializer,
+            RowType kvSchemaType,
+            @Nullable ClusteringKeyIndex keyIndex,
+            Set<String> originalFileNames)
             throws Exception {
         int[] sortFieldsInKeyValue =
                 Arrays.stream(clusteringColumns)
@@ -145,6 +158,10 @@ public List<DataFileMeta> sortAndRewriteFiles(
             }
         }
 
+        RowCompactedSerializer keySerializer =
+                keyIndex != null ? new RowCompactedSerializer(keyType) : null;
+        List<byte[]> collectedKeys = keyIndex != null ? new ArrayList<>() : null;
+
         RollingFileWriter<KeyValue, DataFileMeta> writer =
                 writerFactory.createRollingClusteringFileWriter();
         try {
@@ -152,10 +169,18 @@ public List<DataFileMeta> sortAndRewriteFiles(
             BinaryRow binaryRow = new BinaryRow(kvSchemaType.getFieldCount());
             while ((binaryRow = sortedIterator.next(binaryRow)) != null) {
                 KeyValue kv = kvSerializer.fromRow(binaryRow);
-                writer.write(
+                KeyValue copied =
                         kv.copy(
                                 new InternalRowSerializer(keyType),
-                                new InternalRowSerializer(valueType)));
+                                new InternalRowSerializer(valueType));
+                if (keyIndex != null) {
+                    byte[] keyBytes = keySerializer.serializeToBytes(copied.key());
+                    if (!keyIndex.checkKey(keyBytes, originalFileNames)) {
+                        continue;
+                    }
+                    collectedKeys.add(keyBytes);
+                }
+                writer.write(copied);
             }
         } finally {
             sortBuffer.clear();
@@ -170,6 +195,16 @@ public List<DataFileMeta> sortAndRewriteFiles(
             fileLevels.addNewFile(newFile);
         }
 
+        // Batch update index using collected keys, split by file rowCount
+        if (keyIndex != null) {
+            int offset = 0;
+            for (DataFileMeta newFile : newFiles) {
+                int count = (int) newFile.rowCount();
+                keyIndex.batchPutIndex(newFile, collectedKeys.subList(offset, offset + count));
+                offset += count;
+            }
+        }
+
         return newFiles;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java
index d7234345f67c..2b4e41272759 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java
@@ -45,8 +45,6 @@
 import java.io.IOException;
 import java.util.AbstractMap;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -209,61 +207,47 @@ public Map.Entry next() {
     }
 
     /**
-     * Update the key index after a single original file is replaced by new sorted files.
+     * Check a key against the index during sort-and-rewrite writing.
      *
-     * <p>For DEDUPLICATE mode: mark the old position in deletion vectors, keep the new position.
+     * <p>For FIRST_ROW mode: if the key exists pointing to a non-original file, return false (skip
+     * writing this record; it is a duplicate).
      *
-     * <p>For FIRST_ROW mode: if key exists, mark the new position in deletion vectors (keep the
-     * first/old one); if key is new, store the new position.
+     * <p>For DEDUPLICATE mode: if the key exists pointing to a non-original file, mark the old
+     * position in deletion vectors and return true (write the new record).
+     *
+     * @param keyBytes serialized key bytes
+     * @param originalFileNames file names of the original unsorted files being replaced
+     * @return true if the record should be written, false to skip (FIRST_ROW dedup)
      */
-    public void updateIndex(DataFileMeta originalFile, List<DataFileMeta> newSortedFiles)
-            throws Exception {
-        updateIndex(Collections.singletonList(originalFile), newSortedFiles);
+    public boolean checkKey(byte[] keyBytes, Set<String> originalFileNames) throws Exception {
+        byte[] oldValue = kvDb.get(keyBytes);
+        if (oldValue != null) {
+            ByteArrayInputStream valueIn = new ByteArrayInputStream(oldValue);
+            int oldFileId = decodeInt(valueIn);
+            int oldPosition = decodeInt(valueIn);
+            DataFileMeta oldFile = fileLevels.getFileById(oldFileId);
+            if (oldFile != null && !originalFileNames.contains(oldFile.fileName())) {
+                if (firstRow) {
+                    return false;
+                } else {
+                    dvMaintainer.notifyNewDeletion(oldFile.fileName(), oldPosition);
+                }
+            }
+        }
+        return true;
     }
 
     /**
-     * Update the key index after multiple original files are replaced by new sorted files.
-     *
-     * @see #updateIndex(DataFileMeta, List)
+     * Batch update the key index for a new sorted file using pre-collected key bytes. Avoids
+     * re-reading the file.
      */
-    public void updateIndex(List<DataFileMeta> originalFiles, List<DataFileMeta> newSortedFiles)
-            throws Exception {
-        RowCompactedSerializer keySerializer = new RowCompactedSerializer(keyType);
-
-        Set<String> originalFileNames = new HashSet<>();
-        for (DataFileMeta file : originalFiles) {
-            originalFileNames.add(file.fileName());
-        }
-
-        for (DataFileMeta sortedFile : newSortedFiles) {
-            int fileId = fileLevels.getFileIdByName(sortedFile.fileName());
-            int position = 0;
-            try (CloseableIterator<InternalRow> iterator = readKeyIterator(sortedFile)) {
-                while (iterator.hasNext()) {
-                    byte[] key = keySerializer.serializeToBytes(iterator.next());
-                    byte[] oldValue = kvDb.get(key);
-                    if (oldValue != null) {
-                        ByteArrayInputStream valueIn = new ByteArrayInputStream(oldValue);
-                        int oldFileId = decodeInt(valueIn);
-                        int oldPosition = decodeInt(valueIn);
-                        DataFileMeta oldFile = fileLevels.getFileById(oldFileId);
-                        if (oldFile != null && !originalFileNames.contains(oldFile.fileName())) {
-                            if (firstRow) {
-                                dvMaintainer.notifyNewDeletion(sortedFile.fileName(), position);
-                                position++;
-                                continue;
-                            } else {
-                                dvMaintainer.notifyNewDeletion(oldFile.fileName(), oldPosition);
-                            }
-                        }
-                    }
-                    ByteArrayOutputStream value = new ByteArrayOutputStream(8);
-                    encodeInt(value, fileId);
-                    encodeInt(value, position);
-                    kvDb.put(key, value.toByteArray());
-                    position++;
-                }
-            }
+    public void batchPutIndex(DataFileMeta sortedFile, List<byte[]> keyBytesList) throws Exception {
+        int fileId = fileLevels.getFileIdByName(sortedFile.fileName());
+        for (int position = 0; position < keyBytesList.size(); position++) {
+            ByteArrayOutputStream value = new ByteArrayOutputStream(8);
+            encodeInt(value, fileId);
+            encodeInt(value, position);
+            kvDb.put(keyBytesList.get(position), value.toByteArray());
         }
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 2ff1080c4a84..271709c47ef5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -522,7 +522,7 @@ private static void validateForDeletionVectors(CoreOptions options) {
                 || options.changelogProducer() == ChangelogProducer.LOOKUP,
                 "Deletion vectors mode is only supported for NONE/INPUT/LOOKUP changelog producer now.");
 
-        // pk-clustering-override mode requires deletion vectors even for first-row
+        // pk-clustering-override mode allows deletion vectors for first-row
        if (!options.pkClusteringOverride()) {
             checkArgument(
                     !options.mergeEngine().equals(MergeEngine.FIRST_ROW),
@@ -847,7 +847,8 @@ public static void validatePkClusteringOverride(CoreOptions options) {
             throw new IllegalArgumentException(
                     "Cannot support 'pk-clustering-override' mode without 'clustering.columns'.");
         }
-        if (!options.deletionVectorsEnabled()) {
+        if (!options.deletionVectorsEnabled()
+                && options.mergeEngine() != CoreOptions.MergeEngine.FIRST_ROW) {
             throw new UnsupportedOperationException(
                     "Cannot support deletion-vectors disabled in 'pk-clustering-override' mode.");
         }
diff --git a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
index e9ddc9e000eb..dc1aa5febee9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
@@ -28,6 +28,7 @@
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
@@ -519,6 +520,22 @@ public void testFirstRowBasic() throws Exception {
                 .containsExactlyInAnyOrder(GenericRow.of(1, 100), GenericRow.of(2, 200));
     }
 
+    /** Test first-row mode without explicit deletion-vectors enabled. */
+    @Test
+    public void testFirstRowWithoutDeletionVectors() throws Exception {
+        Table firstRowTable = createFirstRowTableWithoutDv();
+
+        // Write initial data
+        writeRows(firstRowTable, Arrays.asList(GenericRow.of(1, 100), GenericRow.of(2, 200)));
+
+        // Write the same keys with different values - should be ignored (first-row keeps first)
+        writeRows(firstRowTable, Arrays.asList(GenericRow.of(1, 999), GenericRow.of(2, 888)));
+
+        // Should still see the first values
+        assertThat(readRows(firstRowTable))
+                .containsExactlyInAnyOrder(GenericRow.of(1, 100), GenericRow.of(2, 200));
+    }
+
     /** Test first-row mode with multiple commits. */
     @Test
     public void testFirstRowMultipleCommits() throws Exception {
@@ -617,6 +634,59 @@ public void testFirstRowDeletionVectorCorrectness() throws Exception {
                 .containsExactlyInAnyOrder(GenericRow.of(1, 10), GenericRow.of(2, 20));
     }
 
+    /**
+     * Test that FIRST_ROW inline dedup actually reduces the number of records written. Duplicate
+     * keys should be dropped during sort-and-rewrite, resulting in fewer total rows across data
+     * files compared to the number of rows written.
+     */
+    @Test
+    public void testFirstRowInlineDedupReducesFileRows() throws Exception {
+        Table firstRowTable = createFirstRowTable();
+
+        // Commit 1: write 5 unique keys
+        writeRows(
+                firstRowTable,
+                Arrays.asList(
+                        GenericRow.of(1, 10),
+                        GenericRow.of(2, 20),
+                        GenericRow.of(3, 30),
+                        GenericRow.of(4, 40),
+                        GenericRow.of(5, 50)));
+
+        // Commit 2: write 5 duplicate keys (all should be dropped inline)
+        writeRows(
+                firstRowTable,
+                Arrays.asList(
+                        GenericRow.of(1, 99),
+                        GenericRow.of(2, 99),
+                        GenericRow.of(3, 99),
+                        GenericRow.of(4, 99),
+                        GenericRow.of(5, 99)));
+
+        // Verify correctness: still see the first values
+        assertThat(readRows(firstRowTable))
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(1, 10),
+                        GenericRow.of(2, 20),
+                        GenericRow.of(3, 30),
+                        GenericRow.of(4, 40),
+                        GenericRow.of(5, 50));
+
+        // Verify optimization: total row count across all data files should be exactly 5
+        // (duplicates dropped during writing, not just DV-marked)
+        List<Split> splits = firstRowTable.newReadBuilder().newScan().plan().splits();
+        long totalRows =
+                splits.stream()
+                        .mapToLong(
+                                split ->
+                                        ((DataSplit) split)
+                                                .dataFiles().stream()
+                                                .mapToLong(DataFileMeta::rowCount)
+                                                .sum())
+                        .sum();
+        assertThat(totalRows).isEqualTo(5);
+    }
+
     /** Test first-row mode with many writes to trigger compaction. */
     @Test
     public void testFirstRowManyWrites() throws Exception {
@@ -915,6 +985,22 @@ private Table createFirstRowTable() throws Exception {
         return catalog.getTable(identifier);
     }
 
+    private Table createFirstRowTableWithoutDv() throws Exception {
+        Identifier identifier = Identifier.create("default", "first_row_no_dv_table");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.INT())
+                        .primaryKey("a")
+                        .option(BUCKET.key(), "1")
+                        .option(CLUSTERING_COLUMNS.key(), "b")
+                        .option(PK_CLUSTERING_OVERRIDE.key(), "true")
+                        .option(MERGE_ENGINE.key(), "first-row")
+                        .build();
+        catalog.createTable(identifier, schema, false);
+        return catalog.getTable(identifier);
+    }
+
     private void writeRows(List<InternalRow> rows) throws Exception {
         writeRows(table, rows);
     }