From 7e8c5d8805b91e149b5ff4681446f6f491b12a00 Mon Sep 17 00:00:00 2001 From: "hongli.wwj" Date: Thu, 4 Jun 2026 15:36:30 +0800 Subject: [PATCH] [flink] Expose scan.bucket for single-bucket manifest pruning --- .../flink_connector_configuration.html | 6 + .../paimon/table/source/ReadBuilderImpl.java | 29 +++++ .../paimon/flink/FlinkConnectorOptions.java | 10 ++ .../flink/source/FlinkSourceBuilder.java | 2 + .../paimon/flink/source/FlinkTableSource.java | 20 ++-- .../paimon/flink/utils/ScanBucketUtils.java | 40 +++++++ .../apache/paimon/flink/ScanBucketITCase.java | 104 ++++++++++++++++++ .../flink/utils/ScanBucketUtilsTest.java | 75 +++++++++++++ 8 files changed, 277 insertions(+), 9 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ScanBucketUtils.java create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ScanBucketITCase.java create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/ScanBucketUtilsTest.java diff --git a/docs/generated/flink_connector_configuration.html b/docs/generated/flink_connector_configuration.html index 686349e7bdfc..792f5a699766 100644 --- a/docs/generated/flink_connector_configuration.html +++ b/docs/generated/flink_connector_configuration.html @@ -152,6 +152,12 @@ Boolean Bounded mode for Paimon consumer. By default, Paimon automatically selects bounded mode based on the mode of the Flink job. + +
scan.bucket
+ (none) + Integer + Specify a single bucket to scan. This option filters manifest entries and only plans splits for the given bucket. It is only supported for fixed-bucket primary key tables (bucket > 0). It cannot be used with postpone bucket tables. +
scan.dedicated-split-generation
false
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index 553a8487491e..5b918f79712d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -23,6 +23,8 @@
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.predicate.TopN;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.InnerTable;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Filter;
@@ -37,6 +39,7 @@
 
 import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
 import static org.apache.paimon.partition.PartitionPredicate.fromPredicate;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.Preconditions.checkState;
 
 /** Implementation for {@link ReadBuilder}. */
@@ -161,10 +164,48 @@ public ReadBuilder withRowRangeIndex(RowRangeIndex rowRangeIndex) {
 
     @Override
     public ReadBuilder withBucket(int bucket) {
+        validateSpecifiedBucket(table, bucket);
         this.specifiedBucket = bucket;
         return this;
     }
 
+    /**
+     * Validates a bucket id before it is used for manifest pruning ({@link
+     * InnerTableScan#withBucket(int)}). Callers such as Flink {@code scan.bucket} should route
+     * through {@link #withBucket(int)} so that this check always runs.
+     *
+     * <p>NOTE(review): {@link #withBucket(int)} used to accept any table. Confirm that no
+     * existing caller passes a non-{@link FileStoreTable} and relies on the lenient behavior.
+     *
+     * @param table table the bucket id is validated against
+     * @param bucket bucket id requested by the caller
+     * @throws IllegalArgumentException if {@code bucket} is negative, {@code table} is not a
+     *     {@link FileStoreTable}, the table uses postpone bucket mode, or {@code bucket} is not
+     *     smaller than a positive configured bucket number
+     */
+    static void validateSpecifiedBucket(InnerTable table, int bucket) {
+        checkArgument(bucket >= 0, "Bucket id must be non-negative, but is %s.", bucket);
+        checkArgument(
+                table instanceof FileStoreTable,
+                "Bucket scan is only supported for FileStoreTable, but got %s",
+                table.getClass().getName());
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
+        checkArgument(
+                fileStoreTable.bucketMode() != BucketMode.POSTPONE_MODE,
+                "Bucket scan is not supported for postpone bucket tables.");
+        // Reuse the table's parsed options instead of re-parsing the raw option map.
+        int numBuckets = fileStoreTable.coreOptions().bucket();
+        // NOTE(review): the option docs promise fixed-bucket tables only, but a non-postpone
+        // table with bucket <= 0 skips the upper-bound check and is accepted; confirm intent.
+        if (numBuckets > 0) {
+            checkArgument(
+                    bucket < numBuckets,
+                    "Bucket id %s must be less than table bucket number %s.",
+                    bucket,
+                    numBuckets);
+        }
+    }
+
     @Override
     public ReadBuilder withBucketFilter(Filter<Integer> bucketFilter) {
         this.bucketFilter = bucketFilter;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 61c741fee288..c35e8ef04af0 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -274,6 +274,16 @@ public class FlinkConnectorOptions {
                                     + "normal source, the max partition(s) will be determined before job running "
                                     + "without refreshing even for streaming jobs.");
 
+    public static final ConfigOption<Integer> SCAN_BUCKET =
+            ConfigOptions.key("scan.bucket")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Specify a single bucket to scan. This option filters manifest entries "
+                                    + "and only plans splits for the given bucket. It is only supported "
+                                    + "for fixed-bucket primary key tables (bucket > 0). 
It cannot be used " + + "with postpone bucket tables."); + public static final ConfigOption LOOKUP_DYNAMIC_PARTITION_REFRESH_INTERVAL = ConfigOptions.key("lookup.dynamic-partition.refresh-interval") .durationType() diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java index 3e96dec1ea50..149f699a6381 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java @@ -26,6 +26,7 @@ import org.apache.paimon.flink.sink.FlinkSink; import org.apache.paimon.flink.source.align.AlignedContinuousFileStoreSource; import org.apache.paimon.flink.source.operator.MonitorSource; +import org.apache.paimon.flink.utils.ScanBucketUtils; import org.apache.paimon.flink.utils.TableScanUtils; import org.apache.paimon.options.Options; import org.apache.paimon.partition.PartitionPredicate; @@ -200,6 +201,7 @@ private ReadBuilder createReadBuilder(@Nullable org.apache.paimon.types.RowType if (limit != null) { readBuilder.withLimit(limit.intValue()); } + ScanBucketUtils.applyScanBucket(table, readBuilder, conf); return readBuilder.dropStats(); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java index 66cb49798aa0..e1f509582e26 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java @@ -26,6 +26,7 @@ import org.apache.paimon.flink.lookup.DynamicPartitionLoader; import org.apache.paimon.flink.lookup.PartitionLoader; import 
org.apache.paimon.flink.lookup.StaticPartitionLoader; +import org.apache.paimon.flink.utils.ScanBucketUtils; import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.options.Options; import org.apache.paimon.partition.PartitionPredicate; @@ -35,6 +36,7 @@ import org.apache.paimon.predicate.PredicateVisitor; import org.apache.paimon.table.DataTable; import org.apache.paimon.table.Table; +import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.table.source.Split; import org.apache.paimon.utils.RowDataToObjectArrayConverter; @@ -245,13 +247,14 @@ protected Integer inferSourceParallelism(StreamExecutionEnvironment env) { protected void scanSplitsForInference() { if (splitStatistics == null) { if (table instanceof DataTable) { - List partitionEntries = + ReadBuilder readBuilder = table.newReadBuilder() .dropStats() .withFilter(predicate) - .withPartitionFilter(partitionPredicate) - .newScan() - .listPartitionEntries(); + .withPartitionFilter(partitionPredicate); + ScanBucketUtils.applyScanBucket(table, readBuilder, options); + List partitionEntries = + readBuilder.newScan().listPartitionEntries(); long totalSize = 0; long rowCount = 0; for (PartitionEntry entry : partitionEntries) { @@ -262,15 +265,14 @@ protected void scanSplitsForInference() { splitStatistics = new SplitStatistics((int) (totalSize / splitTargetSize + 1), rowCount); } else { - List splits = + ReadBuilder readBuilder = table.newReadBuilder() .dropStats() .withFilter(predicate) .withPartitionFilter(partitionPredicate) - .withProjection(new int[0]) - .newScan() - .plan() - .splits(); + .withProjection(new int[0]); + ScanBucketUtils.applyScanBucket(table, readBuilder, options); + List splits = readBuilder.newScan().plan().splits(); splitStatistics = new SplitStatistics( splits.size(), splits.stream().mapToLong(Split::rowCount).sum()); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ScanBucketUtils.java 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ScanBucketUtils.java
new file mode 100644
index 000000000000..19f251233e6a
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ScanBucketUtils.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+
+/** Helper that wires {@link FlinkConnectorOptions#SCAN_BUCKET} into Paimon read builders. */
+public class ScanBucketUtils {
+
+    private ScanBucketUtils() {}
+
+    /**
+     * Restricts {@code readBuilder} to the bucket configured through {@link
+     * FlinkConnectorOptions#SCAN_BUCKET}, or leaves it untouched when the option is not set.
+     *
+     * @param table table being read; not consulted by this method
+     * @param readBuilder builder to restrict
+     * @param options options that may carry {@code scan.bucket}
+     * @return {@code readBuilder} when the option is absent, otherwise the result of {@link
+     *     ReadBuilder#withBucket(int)}
+     */
+    public static ReadBuilder applyScanBucket(
+            Table table, ReadBuilder readBuilder, Options options) {
+        Integer configuredBucket = options.get(FlinkConnectorOptions.SCAN_BUCKET);
+        return configuredBucket == null ? readBuilder : readBuilder.withBucket(configuredBucket);
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ScanBucketITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ScanBucketITCase.java
new file mode 100644
index 000000000000..417f0032dacd
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ScanBucketITCase.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static java.util.Collections.singletonList;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for {@link org.apache.paimon.flink.FlinkConnectorOptions#SCAN_BUCKET}. */
+public class ScanBucketITCase extends CatalogITCaseBase {
+
+    @Override
+    protected List<String> ddl() {
+        return singletonList(
+                "CREATE TABLE IF NOT EXISTS T (id INT, v INT, PRIMARY KEY (id) NOT ENFORCED) "
+                        + "WITH ('bucket' = '4')");
+    }
+
+    @Nullable
+    @Override
+    protected Boolean sqlSyncMode() {
+        return true;
+    }
+
+    /** Checks that {@code scan.bucket} returns exactly the rows stored in the chosen bucket. */
+    @Test
+    public void testScanBucketFilter() throws Exception {
+        sql(
+                "INSERT INTO T VALUES (1, 10), (2, 20), (3, 30), (4, 40), (5, 50), (6, 60), (7, 70), (8, 80)");
+        FileStoreTable table = paimonTable("T");
+
+        // Use the first bucket that holds data; fail instead of comparing two empty results.
+        int targetBucket = -1;
+        for (int bucket = 0; bucket < 4 && targetBucket < 0; bucket++) {
+            if (!table.store().newScan().withBucket(bucket).plan().files().isEmpty()) {
+                targetBucket = bucket;
+            }
+        }
+        assertThat(targetBucket).isGreaterThanOrEqualTo(0);
+
+        List<Row> expected = readRowsFromBucket(table, targetBucket);
+        List<Row> actual =
+                batchSql(
+                        String.format(
+                                "SELECT * FROM T /*+ OPTIONS('scan.bucket' = '%s') */",
+                                targetBucket));
+        assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
+    }
+
+    /** Reads one bucket's rows directly from its data files through the core read API. */
+    private List<Row> readRowsFromBucket(FileStoreTable table, int bucket) throws Exception {
+        List<ManifestEntry> files = table.store().newScan().withBucket(bucket).plan().files();
+        List<Row> rows = new ArrayList<>();
+        for (ManifestEntry file : files) {
+            DataSplit split =
+                    DataSplit.builder()
+                            .withPartition(file.partition())
+                            .withBucket(file.bucket())
+                            .withDataFiles(Collections.singletonList(file.file()))
+                            .withBucketPath("not used")
+                            .build();
+            RecordReader<InternalRow> reader = table.newReadBuilder().newRead().createReader(split);
+            // try-with-resources closes the iterator even when reading a row fails.
+            try (RecordReaderIterator<InternalRow> it = new RecordReaderIterator<>(reader)) {
+                while (it.hasNext()) {
+                    InternalRow row = it.next();
+                    rows.add(Row.of(row.getInt(0), row.getInt(1)));
+                }
+            }
+        }
+        return rows;
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/ScanBucketUtilsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/ScanBucketUtilsTest.java
new file mode 100644
index 000000000000..5fa81f4fc0ea
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/ScanBucketUtilsTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.FileSystemCatalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.util.AbstractTestBase;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link ScanBucketUtils}. */
+public class ScanBucketUtilsTest extends AbstractTestBase {
+
+    /** Root directory of the throwaway catalog; injected by JUnit via {@link TempDir}. */
+    @TempDir java.nio.file.Path tempDir;
+
+    /** A bucket id that is not below the table's bucket number must be rejected. */
+    @Test
+    public void testInvalidBucket() throws Exception {
+        // The table has 4 buckets, so bucket id 5 must fail validation.
+        FileStoreTable table = createTable(4);
+        Options options = new Options();
+        options.set(FlinkConnectorOptions.SCAN_BUCKET, 5);
+        ReadBuilder readBuilder = table.newReadBuilder();
+        assertThatThrownBy(() -> ScanBucketUtils.applyScanBucket(table, readBuilder, options))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Bucket id 5 must be less than table bucket number 4");
+    }
+
+    /** Creates a primary key table with {@code numBuckets} buckets in a catalog under tempDir. */
+    private FileStoreTable createTable(int numBuckets) throws Exception {
+        Schema schema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("v", DataTypes.INT())
+                        .primaryKey("id")
+                        .option(CoreOptions.BUCKET.key(), String.valueOf(numBuckets))
+                        .build();
+        // The catalog is rooted in the per-test temp directory.
+        Catalog catalog = new FileSystemCatalog(LocalFileIO.create(), new Path(tempDir.toString()));
+        catalog.createDatabase("default", true);
+        Identifier identifier = Identifier.create("default", "test_bucket");
+        catalog.createTable(identifier, schema, false);
+        return (FileStoreTable) catalog.getTable(identifier);
+    }
+}