docs/content/primary-key-table/pk-clustering-override.md (19 changes: 18 additions & 1 deletion)
@@ -50,6 +50,23 @@ CREATE TABLE my_table (
);
```

For the `first-row` merge engine, deletion vectors are already built in, so you don't need to enable them explicitly:

```sql
CREATE TABLE my_table (
id BIGINT,
dt STRING,
city STRING,
amount DOUBLE,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'pk-clustering-override' = 'true',
'clustering.columns' = 'city',
'merge-engine' = 'first-row',
'bucket' = '4'
);
```

After this, data files within each bucket will be physically sorted by `city` instead of `id`. Queries like
`SELECT * FROM my_table WHERE city = 'Beijing'` can skip irrelevant data files by checking their min/max statistics
on the clustering column.
@@ -60,7 +77,7 @@
|--------|-------------|
| `pk-clustering-override` | `true` |
| `clustering.columns` | Must be set (one or more non-primary-key columns) |
| `deletion-vectors.enabled` | Must be `true` |
| `deletion-vectors.enabled` | Must be `true` (not required for `first-row` merge engine) |
| `merge-engine` | `deduplicate` (default) or `first-row` only |

## When to Use

@@ -43,8 +43,10 @@

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.stream.IntStream;
@@ -204,10 +206,14 @@ private CompactResult compact(boolean fullCompaction) throws Exception {
// Snapshot sorted files before Phase 1 to avoid including newly created files in Phase 2
List<DataFileMeta> existingSortedFiles = fileLevels.sortedFiles();
for (DataFileMeta file : unsortedFiles) {
Set<String> originalFileNames = Collections.singleton(file.fileName());
List<DataFileMeta> sortedFiles =
fileRewriter.sortAndRewriteFiles(
singletonList(file), kvSerializer, kvSchemaType);
keyIndex.updateIndex(file, sortedFiles);
singletonList(file),
kvSerializer,
kvSchemaType,
keyIndex,
originalFileNames);
result.before().add(file);
result.after().addAll(sortedFiles);
}
@@ -232,19 +238,23 @@ private CompactResult compact(boolean fullCompaction) throws Exception {
keyIndex.rebuildIndex(newFile);
}
// Remove stale deletion vectors for merged-away files
for (DataFileMeta file : mergeGroup) {
dvMaintainer.removeDeletionVectorOf(file.fileName());
if (dvMaintainer != null) {
for (DataFileMeta file : mergeGroup) {
dvMaintainer.removeDeletionVectorOf(file.fileName());
}
}
result.before().addAll(mergeGroup);
result.after().addAll(mergedFiles);
}
}

CompactDeletionFile deletionFile =
lazyGenDeletionFile
? CompactDeletionFile.lazyGeneration(dvMaintainer)
: CompactDeletionFile.generateFiles(dvMaintainer);
result.setDeletionFile(deletionFile);
if (dvMaintainer != null) {
CompactDeletionFile deletionFile =
lazyGenDeletionFile
? CompactDeletionFile.lazyGeneration(dvMaintainer)
: CompactDeletionFile.generateFiles(dvMaintainer);
result.setDeletionFile(deletionFile);
}
return result;
}

@@ -27,6 +27,7 @@
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.BinaryRowSerializer;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.data.serializer.RowCompactedSerializer;
import org.apache.paimon.disk.ChannelReaderInputView;
import org.apache.paimon.disk.ChannelReaderInputViewIterator;
import org.apache.paimon.disk.ChannelWithMeta;
@@ -47,13 +48,16 @@
import org.apache.paimon.utils.KeyValueWithLevelNoReusingSerializer;
import org.apache.paimon.utils.MutableObjectIterator;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;

/**
* Handles file rewriting for clustering compaction, including sorting unsorted files (Phase 1) and
@@ -112,10 +116,19 @@ public ClusteringFileRewriter(

/**
* Sort and rewrite unsorted files by clustering columns. Reads all KeyValue records, sorts them
* using an external sort buffer, and writes to new level-1 files.
* using an external sort buffer, and writes to new level-1 files. Checks the key index inline
* during writing to handle deduplication (FIRST_ROW skips duplicates, DEDUPLICATE marks old
* positions in DV) and updates the index without re-reading the output files.
*
* @param keyIndex the key index for inline checking and batch update, or null to skip
* @param originalFileNames file names of the original files being replaced (for index check)
*/
public List<DataFileMeta> sortAndRewriteFiles(
List<DataFileMeta> inputFiles, KeyValueSerializer kvSerializer, RowType kvSchemaType)
List<DataFileMeta> inputFiles,
KeyValueSerializer kvSerializer,
RowType kvSchemaType,
@Nullable ClusteringKeyIndex keyIndex,
Set<String> originalFileNames)
throws Exception {
int[] sortFieldsInKeyValue =
Arrays.stream(clusteringColumns)
@@ -145,17 +158,29 @@ public List<DataFileMeta> sortAndRewriteFiles(
}
}

RowCompactedSerializer keySerializer =
keyIndex != null ? new RowCompactedSerializer(keyType) : null;
List<byte[]> collectedKeys = keyIndex != null ? new ArrayList<>() : null;

RollingFileWriter<KeyValue, DataFileMeta> writer =
writerFactory.createRollingClusteringFileWriter();
try {
MutableObjectIterator<BinaryRow> sortedIterator = sortBuffer.sortedIterator();
BinaryRow binaryRow = new BinaryRow(kvSchemaType.getFieldCount());
while ((binaryRow = sortedIterator.next(binaryRow)) != null) {
KeyValue kv = kvSerializer.fromRow(binaryRow);
writer.write(
KeyValue copied =
kv.copy(
new InternalRowSerializer(keyType),
new InternalRowSerializer(valueType)));
new InternalRowSerializer(valueType));
if (keyIndex != null) {
byte[] keyBytes = keySerializer.serializeToBytes(copied.key());
if (!keyIndex.checkKey(keyBytes, originalFileNames)) {
continue;
}
collectedKeys.add(keyBytes);
}
writer.write(copied);
}
} finally {
sortBuffer.clear();
Expand All @@ -170,6 +195,16 @@ public List<DataFileMeta> sortAndRewriteFiles(
fileLevels.addNewFile(newFile);
}

// Batch update index using collected keys, split by file rowCount
if (keyIndex != null) {
int offset = 0;
for (DataFileMeta newFile : newFiles) {
int count = (int) newFile.rowCount();
keyIndex.batchPutIndex(newFile, collectedKeys.subList(offset, offset + count));
offset += count;
}
}

return newFiles;
}
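Because the rolling writer emits records in the same order they leave the sort buffer, the collected keys can be split across the rolled output files purely by each file's row count. A minimal standalone sketch of that partitioning, using placeholder string keys and made-up row counts in place of the real serialized key bytes and `DataFileMeta`:

```java
import java.util.Arrays;
import java.util.List;

public class KeyPartitionSketch {
    public static void main(String[] args) {
        // Stand-ins for the serialized key bytes, collected in write order.
        List<String> collectedKeys = Arrays.asList("k0", "k1", "k2", "k3", "k4");
        // Hypothetical row counts of the rolled output files, in creation order.
        long[] rowCounts = {3, 2};

        int offset = 0;
        for (int fileIndex = 0; fileIndex < rowCounts.length; fileIndex++) {
            int count = (int) rowCounts[fileIndex];
            // Keys [offset, offset + count) map to row positions 0..count-1 of this file.
            List<String> keysForFile = collectedKeys.subList(offset, offset + count);
            System.out.println("file " + fileIndex + " -> " + keysForFile);
            offset += count;
        }
        // Prints:
        // file 0 -> [k0, k1, k2]
        // file 1 -> [k3, k4]
    }
}
```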

@@ -45,8 +45,6 @@
import java.io.IOException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -209,61 +207,47 @@ public Map.Entry<byte[], byte[]> next() {
}

/**
* Update the key index after a single original file is replaced by new sorted files.
* Check a key against the index during sort-and-rewrite writing.
*
* <p>For DEDUPLICATE mode: mark the old position in deletion vectors, keep the new position.
* <p>For FIRST_ROW mode: if key exists pointing to a non-original file, return false (skip
* writing this record — it's a duplicate).
*
* <p>For FIRST_ROW mode: if key exists, mark the new position in deletion vectors (keep the
* first/old one); if key is new, store the new position.
* <p>For DEDUPLICATE mode: if key exists pointing to a non-original file, mark the old position
* in deletion vectors, return true (write the new record).
*
* @param keyBytes serialized key bytes
* @param originalFileNames file names of the original unsorted files being replaced
* @return true if the record should be written, false to skip (FIRST_ROW dedup)
*/
public void updateIndex(DataFileMeta originalFile, List<DataFileMeta> newSortedFiles)
throws Exception {
updateIndex(Collections.singletonList(originalFile), newSortedFiles);
public boolean checkKey(byte[] keyBytes, Set<String> originalFileNames) throws Exception {
byte[] oldValue = kvDb.get(keyBytes);
if (oldValue != null) {
ByteArrayInputStream valueIn = new ByteArrayInputStream(oldValue);
int oldFileId = decodeInt(valueIn);
int oldPosition = decodeInt(valueIn);
DataFileMeta oldFile = fileLevels.getFileById(oldFileId);
if (oldFile != null && !originalFileNames.contains(oldFile.fileName())) {
if (firstRow) {
return false;
} else {
dvMaintainer.notifyNewDeletion(oldFile.fileName(), oldPosition);
}
}
}
return true;
}
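A rough in-memory model of this check, with hypothetical names: a `HashMap` stands in for the real key-value store and a plain list stands in for the deletion-vector maintainer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified model of the checkKey contract; not the real index. */
public class CheckKeyModel {
    // key -> "fileName:position" the key currently points to (stand-in for the kv store)
    private final Map<String, String> index = new HashMap<>();
    // stand-in for the deletion-vector maintainer: positions marked as deleted
    private final List<String> markedDeleted = new ArrayList<>();
    private final boolean firstRow;

    CheckKeyModel(boolean firstRow) {
        this.firstRow = firstRow;
    }

    /** Returns true if the record should be written, false to skip it (FIRST_ROW dedup). */
    boolean checkKey(String key, Set<String> originalFileNames) {
        String existing = index.get(key);
        if (existing != null && !originalFileNames.contains(existing.split(":")[0])) {
            if (firstRow) {
                return false; // keep the first occurrence, skip the new record
            }
            markedDeleted.add(existing); // DEDUPLICATE: old position becomes a deletion
        }
        return true;
    }

    public static void main(String[] args) {
        Set<String> originals = new HashSet<>(Arrays.asList("unsorted-0"));
        for (boolean firstRow : new boolean[] {true, false}) {
            CheckKeyModel model = new CheckKeyModel(firstRow);
            model.index.put("k1", "sorted-1:7"); // k1 was already written to a sorted file
            System.out.println(
                    (firstRow ? "FIRST_ROW" : "DEDUPLICATE")
                            + ": write=" + model.checkKey("k1", originals)
                            + ", markedDeleted=" + model.markedDeleted);
        }
        // FIRST_ROW: write=false, markedDeleted=[]
        // DEDUPLICATE: write=true, markedDeleted=[sorted-1:7]
    }
}
```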

/**
* Update the key index after multiple original files are replaced by new sorted files.
*
* @see #updateIndex(DataFileMeta, List)
* Batch update the key index for a new sorted file using pre-collected key bytes. Avoids
* re-reading the file.
*/
public void updateIndex(List<DataFileMeta> originalFiles, List<DataFileMeta> newSortedFiles)
throws Exception {
RowCompactedSerializer keySerializer = new RowCompactedSerializer(keyType);

Set<String> originalFileNames = new HashSet<>();
for (DataFileMeta file : originalFiles) {
originalFileNames.add(file.fileName());
}

for (DataFileMeta sortedFile : newSortedFiles) {
int fileId = fileLevels.getFileIdByName(sortedFile.fileName());
int position = 0;
try (CloseableIterator<InternalRow> iterator = readKeyIterator(sortedFile)) {
while (iterator.hasNext()) {
byte[] key = keySerializer.serializeToBytes(iterator.next());
byte[] oldValue = kvDb.get(key);
if (oldValue != null) {
ByteArrayInputStream valueIn = new ByteArrayInputStream(oldValue);
int oldFileId = decodeInt(valueIn);
int oldPosition = decodeInt(valueIn);
DataFileMeta oldFile = fileLevels.getFileById(oldFileId);
if (oldFile != null && !originalFileNames.contains(oldFile.fileName())) {
if (firstRow) {
dvMaintainer.notifyNewDeletion(sortedFile.fileName(), position);
position++;
continue;
} else {
dvMaintainer.notifyNewDeletion(oldFile.fileName(), oldPosition);
}
}
}
ByteArrayOutputStream value = new ByteArrayOutputStream(8);
encodeInt(value, fileId);
encodeInt(value, position);
kvDb.put(key, value.toByteArray());
position++;
}
}
public void batchPutIndex(DataFileMeta sortedFile, List<byte[]> keyBytesList) throws Exception {
int fileId = fileLevels.getFileIdByName(sortedFile.fileName());
for (int position = 0; position < keyBytesList.size(); position++) {
ByteArrayOutputStream value = new ByteArrayOutputStream(8);
encodeInt(value, fileId);
encodeInt(value, position);
kvDb.put(keyBytesList.get(position), value.toByteArray());
}
}
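The stored index value is just the `(fileId, position)` pair, which `checkKey` above decodes before resolving the file through `fileLevels.getFileById`. A standalone sketch of that round trip, substituting `DataOutputStream`/`DataInputStream` for the actual `encodeInt`/`decodeInt` helpers, so the byte layout here is illustrative rather than the real one:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class IndexValueLayoutSketch {
    // Pack (fileId, position) into the bytes stored as the index value.
    static byte[] encode(int fileId, int position) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream(8);
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(fileId);   // stand-in for encodeInt(value, fileId)
        out.writeInt(position); // stand-in for encodeInt(value, position)
        return bytes.toByteArray();
    }

    // Unpack the value back into (fileId, position), as checkKey does before
    // looking up the old file by id.
    static int[] decode(byte[] value) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(value));
        return new int[] {in.readInt(), in.readInt()};
    }

    public static void main(String[] args) throws IOException {
        byte[] value = encode(42, 7);
        int[] decoded = decode(value);
        System.out.println("fileId=" + decoded[0] + ", position=" + decoded[1]);
    }
}
```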

@@ -522,7 +522,7 @@ private static void validateForDeletionVectors(CoreOptions options) {
|| options.changelogProducer() == ChangelogProducer.LOOKUP,
"Deletion vectors mode is only supported for NONE/INPUT/LOOKUP changelog producer now.");

// pk-clustering-override mode requires deletion vectors even for first-row
// pk-clustering-override mode allows deletion vectors for first-row
if (!options.pkClusteringOverride()) {
checkArgument(
!options.mergeEngine().equals(MergeEngine.FIRST_ROW),
@@ -847,7 +847,8 @@ public static void validatePkClusteringOverride(CoreOptions options) {
throw new IllegalArgumentException(
"Cannot support 'pk-clustering-override' mode without 'clustering.columns'.");
}
if (!options.deletionVectorsEnabled()) {
if (!options.deletionVectorsEnabled()
&& options.mergeEngine() != CoreOptions.MergeEngine.FIRST_ROW) {
throw new UnsupportedOperationException(
"Cannot support deletion-vectors disabled in 'pk-clustering-override' mode.");
}