Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.reader;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.paimon.utils.Preconditions.checkArgument;

/**
 * A {@link RecordReader} that stops after reading a given number of records.
 *
 * <p>The number of records handed out is tracked in a single counter owned by the reader, so the
 * limit applies across all batches returned by {@link #readBatch()}, not per batch.
 */
public final class LimitRecordReader<T> implements RecordReader<T> {

    private final RecordReader<T> reader;
    private final long limit;
    private final AtomicLong recordCount = new AtomicLong(0);

    /**
     * Creates a reader that returns at most {@code limit} records from {@code reader}.
     *
     * @param reader the underlying reader to delegate to
     * @param limit maximum number of records to return; must be strictly positive
     */
    public LimitRecordReader(RecordReader<T> reader, long limit) {
        checkArgument(limit > 0, "Limit must be positive.");
        this.reader = reader;
        this.limit = limit;
    }

    /**
     * Wraps {@code reader} with a limit when one is actually requested.
     *
     * @param reader the underlying reader
     * @param limit the requested limit; {@code null} or a non-positive value means "no limit"
     * @return {@code reader} itself when no limit applies, otherwise a {@link LimitRecordReader}
     */
    public static <T> RecordReader<T> limit(RecordReader<T> reader, @Nullable Integer limit) {
        if (limit == null || limit <= 0) {
            return reader;
        }
        return new LimitRecordReader<>(reader, limit);
    }

    /**
     * Returns the next batch of the underlying reader, wrapped so that it stops at the limit.
     *
     * @return {@code null} once the limit has been reached or the underlying reader is exhausted
     */
    @Override
    @Nullable
    public RecordIterator<T> readBatch() throws IOException {
        // Do not touch the underlying reader at all once enough records were handed out.
        if (recordCount.get() >= limit) {
            return null;
        }
        RecordIterator<T> iterator = reader.readBatch();
        if (iterator == null) {
            return null;
        }
        return new LimitRecordIterator(iterator);
    }

    @Override
    public void close() throws IOException {
        reader.close();
    }

    /**
     * Batch iterator that counts returned records against the enclosing reader's limit.
     *
     * <p>Deliberately a non-static inner class without its own type parameter: it reuses the
     * enclosing reader's {@code T}, {@code limit} and {@code recordCount}.
     */
    private class LimitRecordIterator implements RecordIterator<T> {

        private final RecordIterator<T> iterator;

        private LimitRecordIterator(RecordIterator<T> iterator) {
            this.iterator = iterator;
        }

        /** Returns the next record, or {@code null} when the batch ends or the limit is hit. */
        @Override
        @Nullable
        public T next() throws IOException {
            if (recordCount.get() >= limit) {
                return null;
            }
            T next = iterator.next();
            // Only records actually handed to the caller count towards the limit.
            if (next != null) {
                recordCount.incrementAndGet();
            }
            return next;
        }

        @Override
        public void releaseBatch() {
            iterator.releaseBatch();
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.operation;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.KeyValue;
import org.apache.paimon.KeyValueFileStore;
import org.apache.paimon.data.BinaryRow;
Expand All @@ -41,6 +42,7 @@
import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
import org.apache.paimon.mergetree.compact.ReducerMergeFunctionWrapper;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.LimitRecordReader;
import org.apache.paimon.reader.ReaderSupplier;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;
Expand All @@ -54,6 +56,9 @@
import org.apache.paimon.utils.ProjectedRow;
import org.apache.paimon.utils.UserDefinedSeqComparator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
Expand All @@ -74,6 +79,8 @@
*/
public class MergeFileSplitRead implements SplitRead<KeyValue> {

private static final Logger LOG = LoggerFactory.getLogger(MergeFileSplitRead.class);

private final TableSchema tableSchema;
private final FileIO fileIO;
private final KeyValueFileReaderFactory.Builder readerFactoryBuilder;
Expand All @@ -82,14 +89,18 @@ public class MergeFileSplitRead implements SplitRead<KeyValue> {
private final MergeSorter mergeSorter;
private final List<String> sequenceFields;
private final boolean sequenceOrder;
private final CoreOptions coreOptions;

@Nullable private RowType readKeyType;
@Nullable private RowType outerReadType;

@Nullable private List<Predicate> filtersForKeys;
@Nullable private List<Predicate> filtersForAll;

@Nullable private Integer limit;

private boolean forceKeepDelete = false;
private boolean mergeReadLimitLogEmitted = false;

public MergeFileSplitRead(
CoreOptions options,
Expand All @@ -100,6 +111,7 @@ public MergeFileSplitRead(
MergeFunctionFactory<KeyValue> mfFactory,
KeyValueFileReaderFactory.Builder readerFactoryBuilder) {
this.tableSchema = schema;
this.coreOptions = options;
this.readerFactoryBuilder = readerFactoryBuilder;
this.fileIO = readerFactoryBuilder.fileIO();
this.keyComparator = keyComparator;
Expand Down Expand Up @@ -177,6 +189,12 @@ public MergeFileSplitRead forceKeepDelete() {
return this;
}

/**
 * Records the requested row limit and returns this instance for call chaining.
 *
 * <p>The value is stored as-is; {@code effectiveReadLimit()} later treats {@code null} or a
 * non-positive value as "no limit".
 */
@Override
public MergeFileSplitRead withLimit(@Nullable Integer limit) {
this.limit = limit;
return this;
}

@Override
public MergeFileSplitRead withFilter(Predicate predicate) {
if (predicate == null) {
Expand Down Expand Up @@ -312,6 +330,7 @@ public RecordReader<KeyValue> createMergeReader(
reader = new DropDeleteReader(reader);
}

reader = LimitRecordReader.limit(reader, effectiveReadLimit());
return projectOuter(projectKey(reader));
}

Expand All @@ -334,7 +353,89 @@ public RecordReader<KeyValue> createNoMergeReader(
suppliers.add(() -> readerFactory.createRecordReader(file));
}

return projectOuter(ConcatRecordReader.create(suppliers));
return LimitRecordReader.limit(
projectOuter(ConcatRecordReader.create(suppliers)), effectiveReadLimit());
}

/**
 * Resolves the limit that may actually be applied while reading.
 *
 * <p>Truncating a merge read is only safe when filters are fully applied before truncation. This
 * aligns with {@link KeyValueFileStoreScan#limitPushdownEnabled()} and additionally rejects
 * non-primary-key filters, which are not pushed down to overlapping L0 sections (see {@link
 * #withFilter(Predicate)}).
 *
 * @return the configured limit, or {@code null} when no limit was requested or applying it is
 *     unsafe
 */
@Nullable
private Integer effectiveReadLimit() {
    boolean limitRequested = limit != null && limit > 0;
    if (!limitRequested) {
        return null;
    }

    // A null reason means the limit is safe; the decision is logged (once) either way.
    String reason = mergeReadLimitDisabledReason();
    logMergeReadLimitOnce(reason);
    return reason == null ? limit : null;
}

/**
 * Explains why the limit must not be applied during merge read.
 *
 * <p>Checks run in a fixed order and the first matching condition wins.
 *
 * @return a human-readable reason, or {@code null} if limit can be applied
 */
@Nullable
private String mergeReadLimitDisabledReason() {
    final String reason;
    if (forceKeepDelete) {
        reason = "forceKeepDelete is enabled";
    } else {
        MergeEngine engine = coreOptions.mergeEngine();
        if (engine == MergeEngine.PARTIAL_UPDATE) {
            reason = "merge-engine is partial-update";
        } else if (engine == MergeEngine.AGGREGATE) {
            reason = "merge-engine is aggregation";
        } else if (coreOptions.deletionVectorsEnabled()) {
            reason = "deletion-vectors is enabled";
        } else if (hasNonPrimaryKeyFilter()) {
            reason = "non-primary-key filter is present";
        } else {
            reason = null;
        }
    }
    return reason;
}

/**
 * Logs, at most once per instance, whether the merge read limit is applied or disabled.
 *
 * @param disabledReason why the limit is disabled, or {@code null} when it is applied
 */
private void logMergeReadLimitOnce(@Nullable String disabledReason) {
    if (!mergeReadLimitLogEmitted) {
        // Flip the flag first so later calls are silent regardless of which branch logs.
        mergeReadLimitLogEmitted = true;
        if (disabledReason == null) {
            LOG.info("Applying merge read limit {} during merge read.", limit);
        } else {
            LOG.info(
                    "Merge read limit {} is disabled: {}. Limit will not be applied during merge read.",
                    limit,
                    disabledReason);
        }
    }
}

/**
 * Tells whether any predicate in {@code filtersForAll} references a field that is not one of
 * the table's trimmed primary keys.
 *
 * @return {@code false} when there are no such filters (including when none are set)
 */
private boolean hasNonPrimaryKeyFilter() {
    boolean noFilters = filtersForAll == null || filtersForAll.isEmpty();
    if (noFilters) {
        return false;
    }

    List<String> keyNames = tableSchema.trimmedPrimaryKeys();
    // Every schema field that is not a (trimmed) primary key.
    Set<String> valueFieldNames =
            tableSchema.fieldNames().stream()
                    .filter(field -> !keyNames.contains(field))
                    .collect(Collectors.toSet());

    return filtersForAll.stream()
            .anyMatch(predicate -> containsFields(predicate, valueFieldNames));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateProjectionConverter;
import org.apache.paimon.reader.LimitRecordReader;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.source.Split;
Expand All @@ -31,7 +32,6 @@

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

/** A {@link TableRead} implementation for {@link FormatTable}. */
public class FormatTableRead implements TableRead {
Expand Down Expand Up @@ -80,11 +80,7 @@ public RecordReader<InternalRow> createReader(Split split) throws IOException {
if (executeFilter) {
reader = executeFilter(reader);
}
if (limit != null && limit > 0) {
reader = applyLimit(reader, limit);
}

return reader;
return LimitRecordReader.limit(reader, limit);
}

private RecordReader<InternalRow> executeFilter(RecordReader<InternalRow> reader) {
Expand All @@ -106,44 +102,4 @@ private RecordReader<InternalRow> executeFilter(RecordReader<InternalRow> reader
Predicate finalFilter = predicate;
return reader.filter(finalFilter::test);
}

/**
 * Wraps {@code reader} so that, across all of its batches, no more than {@code limit} non-null
 * rows are returned.
 *
 * @param reader the reader to delegate to; it is closed when the returned reader is closed
 * @param limit maximum number of rows to hand out
 */
private RecordReader<InternalRow> applyLimit(RecordReader<InternalRow> reader, int limit) {
    return new RecordReader<InternalRow>() {
        // Rows handed out so far, shared by every batch iterator created below.
        private final AtomicLong emitted = new AtomicLong(0);

        @Override
        public RecordIterator<InternalRow> readBatch() throws IOException {
            // Only ask the delegate for another batch while the limit is not reached.
            RecordIterator<InternalRow> batch =
                    emitted.get() < limit ? reader.readBatch() : null;
            if (batch == null) {
                return null;
            }
            return new RecordIterator<InternalRow>() {
                @Override
                public InternalRow next() throws IOException {
                    if (emitted.get() < limit) {
                        InternalRow row = batch.next();
                        if (row != null) {
                            emitted.incrementAndGet();
                        }
                        return row;
                    }
                    return null;
                }

                @Override
                public void releaseBatch() {
                    batch.releaseBatch();
                }
            };
        }

        @Override
        public void close() throws IOException {
            reader.close();
        }
    };
}
}
Loading