diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/LimitRecordReader.java b/paimon-common/src/main/java/org/apache/paimon/reader/LimitRecordReader.java new file mode 100644 index 000000000000..59e1a767b8ff --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/reader/LimitRecordReader.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.reader; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** A {@link RecordReader} that stops after reading a given number of records. */ +public final class LimitRecordReader implements RecordReader { + + private final RecordReader reader; + private final long limit; + private final AtomicLong recordCount = new AtomicLong(0); + + public LimitRecordReader(RecordReader reader, long limit) { + checkArgument(limit > 0, "Limit must be positive."); + this.reader = reader; + this.limit = limit; + } + + public static RecordReader limit(RecordReader reader, @Nullable Integer limit) { + if (limit == null || limit <= 0) { + return reader; + } + return new LimitRecordReader<>(reader, limit); + } + + @Override + @Nullable + public RecordIterator readBatch() throws IOException { + if (recordCount.get() >= limit) { + return null; + } + RecordIterator iterator = reader.readBatch(); + if (iterator == null) { + return null; + } + return new LimitRecordIterator<>(iterator); + } + + @Override + public void close() throws IOException { + reader.close(); + } + + private class LimitRecordIterator implements RecordIterator { + + private final RecordIterator iterator; + + private LimitRecordIterator(RecordIterator iterator) { + this.iterator = iterator; + } + + @Override + @Nullable + public T next() throws IOException { + if (recordCount.get() >= limit) { + return null; + } + T next = iterator.next(); + if (next != null) { + recordCount.incrementAndGet(); + } + return next; + } + + @Override + public void releaseBatch() { + iterator.releaseBatch(); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java index 689117f1d22c..242f5022a6ae 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java @@ -19,6 +19,7 @@ package org.apache.paimon.operation; import org.apache.paimon.CoreOptions; +import org.apache.paimon.CoreOptions.MergeEngine; import org.apache.paimon.KeyValue; import org.apache.paimon.KeyValueFileStore; import org.apache.paimon.data.BinaryRow; @@ -41,6 +42,7 @@ import org.apache.paimon.mergetree.compact.MergeFunctionWrapper; import org.apache.paimon.mergetree.compact.ReducerMergeFunctionWrapper; import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.reader.LimitRecordReader; import org.apache.paimon.reader.ReaderSupplier; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.TableSchema; @@ -54,6 +56,9 @@ import org.apache.paimon.utils.ProjectedRow; import org.apache.paimon.utils.UserDefinedSeqComparator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nullable; import java.io.IOException; @@ -74,6 +79,8 @@ */ public class MergeFileSplitRead implements SplitRead { + private static final Logger LOG = LoggerFactory.getLogger(MergeFileSplitRead.class); + private final TableSchema tableSchema; private final FileIO fileIO; private final KeyValueFileReaderFactory.Builder readerFactoryBuilder; @@ -82,6 +89,7 @@ public class MergeFileSplitRead implements SplitRead { private final MergeSorter mergeSorter; private final List sequenceFields; private final boolean sequenceOrder; + private final CoreOptions coreOptions; @Nullable private RowType readKeyType; @Nullable private RowType outerReadType; @@ -89,7 +97,10 @@ public class MergeFileSplitRead implements SplitRead { @Nullable private List filtersForKeys; @Nullable private List filtersForAll; + @Nullable private Integer limit; + private boolean forceKeepDelete = false; + private boolean mergeReadLimitLogEmitted = false; public MergeFileSplitRead( CoreOptions options, @@ -100,6 +111,7 @@ public MergeFileSplitRead( MergeFunctionFactory mfFactory, KeyValueFileReaderFactory.Builder readerFactoryBuilder) { this.tableSchema = schema; + this.coreOptions = options; this.readerFactoryBuilder = readerFactoryBuilder; this.fileIO = readerFactoryBuilder.fileIO(); this.keyComparator = keyComparator; @@ -177,6 +189,12 @@ public MergeFileSplitRead forceKeepDelete() { return this; } + @Override + public MergeFileSplitRead withLimit(@Nullable Integer limit) { + this.limit = limit; + return this; + } + @Override public MergeFileSplitRead withFilter(Predicate predicate) { if (predicate == null) { @@ -312,6 +330,7 @@ public RecordReader createMergeReader( reader = new DropDeleteReader(reader); } + reader = LimitRecordReader.limit(reader, effectiveReadLimit()); return projectOuter(projectKey(reader)); } @@ -334,7 +353,89 @@ public RecordReader createNoMergeReader( suppliers.add(() -> readerFactory.createRecordReader(file)); } - return projectOuter(ConcatRecordReader.create(suppliers)); + return LimitRecordReader.limit( + projectOuter(ConcatRecordReader.create(suppliers)), effectiveReadLimit()); + } + + /** + * Limit on merge read is only safe when filters are fully applied before truncation. This + * aligns with {@link KeyValueFileStoreScan#limitPushdownEnabled()} and additionally rejects + * non-primary-key filters, which are not pushed down to overlapping L0 sections (see {@link + * #withFilter(Predicate)}). + */ + @Nullable + private Integer effectiveReadLimit() { + if (limit == null || limit <= 0) { + return null; + } + + String disabledReason = mergeReadLimitDisabledReason(); + if (disabledReason != null) { + logMergeReadLimitOnce(disabledReason); + return null; + } + + logMergeReadLimitOnce(null); + return limit; + } + + /** Returns why merge read limit is unsafe, or {@code null} if limit can be applied. */ + @Nullable + private String mergeReadLimitDisabledReason() { + if (forceKeepDelete) { + return "forceKeepDelete is enabled"; + } + + MergeEngine mergeEngine = coreOptions.mergeEngine(); + if (mergeEngine == MergeEngine.PARTIAL_UPDATE) { + return "merge-engine is partial-update"; + } + if (mergeEngine == MergeEngine.AGGREGATE) { + return "merge-engine is aggregation"; + } + + if (coreOptions.deletionVectorsEnabled()) { + return "deletion-vectors is enabled"; + } + + if (hasNonPrimaryKeyFilter()) { + return "non-primary-key filter is present"; + } + + return null; + } + + private void logMergeReadLimitOnce(@Nullable String disabledReason) { + if (mergeReadLimitLogEmitted) { + return; + } + mergeReadLimitLogEmitted = true; + if (disabledReason != null) { + LOG.info( + "Merge read limit {} is disabled: {}. Limit will not be applied during merge read.", + limit, + disabledReason); + } else { + LOG.info("Applying merge read limit {} during merge read.", limit); + } + } + + private boolean hasNonPrimaryKeyFilter() { + if (filtersForAll == null || filtersForAll.isEmpty()) { + return false; + } + + List primaryKeys = tableSchema.trimmedPrimaryKeys(); + Set nonPrimaryKeys = + tableSchema.fieldNames().stream() + .filter(name -> !primaryKeys.contains(name)) + .collect(Collectors.toSet()); + for (Predicate filter : filtersForAll) { + if (containsFields(filter, nonPrimaryKeys)) { + return true; + } + } + return false; } /** diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableRead.java index 765cfd2c9a37..9939b1b75057 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableRead.java @@ -23,6 +23,7 @@ import org.apache.paimon.metrics.MetricRegistry; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateProjectionConverter; +import org.apache.paimon.reader.LimitRecordReader; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.table.FormatTable; import org.apache.paimon.table.source.Split; @@ -31,7 +32,6 @@ import java.io.IOException; import java.util.Optional; -import java.util.concurrent.atomic.AtomicLong; /** A {@link TableRead} implementation for {@link FormatTable}. */ public class FormatTableRead implements TableRead { @@ -80,11 +80,7 @@ public RecordReader createReader(Split split) throws IOException { if (executeFilter) { reader = executeFilter(reader); } - if (limit != null && limit > 0) { - reader = applyLimit(reader, limit); - } - - return reader; + return LimitRecordReader.limit(reader, limit); } private RecordReader executeFilter(RecordReader reader) { @@ -106,44 +102,4 @@ private RecordReader executeFilter(RecordReader reader Predicate finalFilter = predicate; return reader.filter(finalFilter::test); } - - private RecordReader applyLimit(RecordReader reader, int limit) { - return new RecordReader() { - private final AtomicLong recordCount = new AtomicLong(0); - - @Override - public RecordIterator readBatch() throws IOException { - if (recordCount.get() >= limit) { - return null; - } - RecordIterator iterator = reader.readBatch(); - if (iterator == null) { - return null; - } - return new RecordIterator() { - @Override - public InternalRow next() throws IOException { - if (recordCount.get() >= limit) { - return null; - } - InternalRow next = iterator.next(); - if (next != null) { - recordCount.incrementAndGet(); - } - return next; - } - - @Override - public void releaseBatch() { - iterator.releaseBatch(); - } - }; - } - - @Override - public void close() throws IOException { - reader.close(); - } - }; - } } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java index db65eb8d1660..8bb673bc9f15 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java @@ -31,6 +31,8 @@ import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction; import org.apache.paimon.mergetree.compact.MergeFunction; import org.apache.paimon.mergetree.compact.MergeFunctionFactory; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.reader.RecordReaderIterator; import org.apache.paimon.schema.KeyValueFieldsExtractor; @@ -164,6 +166,115 @@ public List valueFields(TableSchema schema) { assertThat(actual).isEqualTo(expected); } + @Test + public void testWithLimit() throws Exception { + TestKeyValueGenerator gen = new TestKeyValueGenerator(); + int numRecords = 100; + List data = new ArrayList<>(); + for (int i = 0; i < numRecords; i++) { + data.add(gen.next()); + } + TestFileStore store = + createStore( + TestKeyValueGenerator.DEFAULT_PART_TYPE, + TestKeyValueGenerator.KEY_TYPE, + TestKeyValueGenerator.DEFAULT_ROW_TYPE, + TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR, + DeduplicateMergeFunction.factory()); + + // Write multiple times to create overlapping level-0 files in the same bucket. + for (int i = 0; i < 5; i++) { + store.commitData(data.subList(i * 20, (i + 1) * 20), gen::getPartition, kv -> 0); + } + + Long snapshotId = store.snapshotManager().latestSnapshotId(); + FileStoreScan scan = store.newScan(); + Map> filesGroupedByPartition = + scan.withSnapshot(snapshotId).plan().files().stream() + .collect(Collectors.groupingBy(ManifestEntry::partition)); + assertThat(filesGroupedByPartition.values().iterator().next()).hasSizeGreaterThan(1); + + int limit = 10; + MergeFileSplitRead read = store.newRead().withLimit(limit); + Map.Entry> entry = + filesGroupedByPartition.entrySet().iterator().next(); + RecordReader reader = + read.createReader( + DataSplit.builder() + .withSnapshot(snapshotId) + .withPartition(entry.getKey()) + .withBucket(0) + .withDataFiles( + entry.getValue().stream() + .map(ManifestEntry::file) + .collect(Collectors.toList())) + .withBucketPath("not used") + .build()); + int count = 0; + RecordReaderIterator iterator = new RecordReaderIterator<>(reader); + while (iterator.hasNext()) { + iterator.next(); + count++; + } + iterator.close(); + assertThat(count).isEqualTo(limit); + } + + @Test + public void testWithLimitDisabledByNonPrimaryKeyFilter() throws Exception { + TestKeyValueGenerator gen = new TestKeyValueGenerator(); + int numRecords = 100; + List data = new ArrayList<>(); + for (int i = 0; i < numRecords; i++) { + data.add(gen.next()); + } + TestFileStore store = + createStore( + TestKeyValueGenerator.DEFAULT_PART_TYPE, + TestKeyValueGenerator.KEY_TYPE, + TestKeyValueGenerator.DEFAULT_ROW_TYPE, + TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR, + DeduplicateMergeFunction.factory()); + + for (int i = 0; i < 5; i++) { + store.commitData(data.subList(i * 20, (i + 1) * 20), gen::getPartition, kv -> 0); + } + + Long snapshotId = store.snapshotManager().latestSnapshotId(); + Map> filesGroupedByPartition = + store.newScan().withSnapshot(snapshotId).plan().files().stream() + .collect(Collectors.groupingBy(ManifestEntry::partition)); + Map.Entry> entry = + filesGroupedByPartition.entrySet().iterator().next(); + + int limit = 10; + Predicate filter = + new PredicateBuilder(TestKeyValueGenerator.DEFAULT_ROW_TYPE).isNotNull(6); + MergeFileSplitRead read = store.newRead().withFilter(filter).withLimit(limit); + RecordReader reader = + read.createReader( + DataSplit.builder() + .withSnapshot(snapshotId) + .withPartition(entry.getKey()) + .withBucket(0) + .withDataFiles( + entry.getValue().stream() + .map(ManifestEntry::file) + .collect(Collectors.toList())) + .withBucketPath("not used") + .build()); + int count = 0; + RecordReaderIterator iterator = new RecordReaderIterator<>(reader); + while (iterator.hasNext()) { + iterator.next(); + count++; + } + iterator.close(); + assertThat(count) + .as("Non-PK filter must disable merge read limit to preserve SQL semantics") + .isGreaterThan(limit); + } + @Test public void testValueProjection() throws Exception { // (dt, hr, shopId, orderId, itemId, priceAmount, comment) -> (shopId, itemId, dt, hr) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/MergeFileSplitReadLimitITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/MergeFileSplitReadLimitITCase.java new file mode 100644 index 000000000000..f1dfe5796568 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/MergeFileSplitReadLimitITCase.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink; + +import org.apache.flink.types.Row; +import org.junit.jupiter.api.Test; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.stream.IntStream; + +import static java.util.Collections.singletonList; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Batch IT for LIMIT push-down on primary key tables with multiple level-0 files (merge read path). + */ +public class MergeFileSplitReadLimitITCase extends CatalogITCaseBase { + + private static final int TOTAL_ROWS = 100; + private static final int COMMITS = 5; + private static final int LIMIT = 10; + + @Override + protected List ddl() { + return singletonList( + "CREATE TABLE IF NOT EXISTS pk_l0_limit (" + + "id INT, v INT, PRIMARY KEY (id) NOT ENFORCED) " + + "WITH (" + + "'bucket' = '1', " + + "'write-only' = 'true', " + + "'changelog-producer' = 'none', " + + "'merge-engine' = 'deduplicate')"); + } + + @Nullable + @Override + protected Boolean sqlSyncMode() { + return true; + } + + @Test + public void testLimitOnMultiLevel0PrimaryKeyTable() { + writeMultipleCommits(); + + long level0FileCount = + batchSql("SELECT file_path FROM `pk_l0_limit$files` WHERE level = 0").size(); + assertThat(level0FileCount) + .as("Need multiple level-0 files to exercise merge read path") + .isGreaterThan(1); + + List limited = + batchSql( + String.format( + "SELECT * FROM pk_l0_limit /*+ OPTIONS('scan.infer-parallelism'='false', 'scan.parallelism'='1') */ LIMIT %d", + LIMIT)); + assertThat(limited).hasSize(LIMIT); + + List all = batchSql("SELECT * FROM pk_l0_limit"); + assertThat(all).hasSize(TOTAL_ROWS); + } + + @Test + public void testLimitWithValueFilterReturnsCorrectRows() { + writeMultipleCommits(); + + List filtered = + batchSql( + String.format( + "SELECT * FROM pk_l0_limit /*+ OPTIONS('scan.infer-parallelism'='false', 'scan.parallelism'='1') */ " + + "WHERE v > %d LIMIT %d", + TOTAL_ROWS / 2, LIMIT)); + assertThat(filtered).hasSize(LIMIT); + for (Row row : filtered) { + assertThat((int) row.getField(1)).isGreaterThan(TOTAL_ROWS / 2); + } + } + + private void writeMultipleCommits() { + int rowsPerCommit = TOTAL_ROWS / COMMITS; + IntStream.range(0, COMMITS) + .forEach( + commit -> { + StringBuilder values = new StringBuilder(); + for (int i = 0; i < rowsPerCommit; i++) { + int id = commit * rowsPerCommit + i; + if (i > 0) { + values.append(", "); + } + values.append(String.format("(%d, %d)", id, id)); + } + batchSql("INSERT INTO pk_l0_limit VALUES " + values); + }); + } +}