Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/generated/flink_connector_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,12 @@
<td>Boolean</td>
<td>Bounded mode for Paimon consumer. By default, Paimon automatically selects bounded mode based on the mode of the Flink job.</td>
</tr>
<tr>
<td><h5>scan.bucket</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>Specify a single bucket to scan. This option filters manifest entries and only plans splits for the given bucket. It is only supported for fixed-bucket primary key tables (bucket &gt; 0). It cannot be used with postpone bucket tables.</td>
</tr>
<tr>
<td><h5>scan.dedicated-split-generation</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.predicate.TopN;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.InnerTable;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;
Expand All @@ -37,6 +39,7 @@

import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.partition.PartitionPredicate.fromPredicate;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkState;

/** Implementation for {@link ReadBuilder}. */
Expand Down Expand Up @@ -161,10 +164,36 @@ public ReadBuilder withRowRangeIndex(RowRangeIndex rowRangeIndex) {

@Override
public ReadBuilder withBucket(int bucket) {
validateSpecifiedBucket(table, bucket);
this.specifiedBucket = bucket;
return this;
}

/**
* Validates bucket id before manifest pruning ({@link InnerTableScan#withBucket(int)}). Callers
* such as Flink {@code scan.bucket} should route through {@link #withBucket(int)}.
*/
static void validateSpecifiedBucket(InnerTable table, int bucket) {
checkArgument(bucket >= 0, "Bucket id must be non-negative, but is %s.", bucket);
if (!(table instanceof FileStoreTable)) {
throw new IllegalArgumentException(
"Bucket scan is only supported for FileStoreTable, but got "
+ table.getClass().getName());
}
FileStoreTable fileStoreTable = (FileStoreTable) table;
checkArgument(
fileStoreTable.bucketMode() != BucketMode.POSTPONE_MODE,
"Bucket scan is not supported for postpone bucket tables.");
int numBuckets = CoreOptions.fromMap(fileStoreTable.options()).bucket();
if (numBuckets > 0) {
checkArgument(
bucket < numBuckets,
"Bucket id %s must be less than table bucket number %s.",
bucket,
numBuckets);
}
}

@Override
public ReadBuilder withBucketFilter(Filter<Integer> bucketFilter) {
this.bucketFilter = bucketFilter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,16 @@ public class FlinkConnectorOptions {
+ "normal source, the max partition(s) will be determined before job running "
+ "without refreshing even for streaming jobs.");

public static final ConfigOption<Integer> SCAN_BUCKET =
ConfigOptions.key("scan.bucket")
.intType()
.noDefaultValue()
.withDescription(
"Specify a single bucket to scan. This option filters manifest entries "
+ "and only plans splits for the given bucket. It is only supported "
+ "for fixed-bucket primary key tables (bucket > 0). It cannot be used "
+ "with postpone bucket tables.");

public static final ConfigOption<Duration> LOOKUP_DYNAMIC_PARTITION_REFRESH_INTERVAL =
ConfigOptions.key("lookup.dynamic-partition.refresh-interval")
.durationType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.paimon.flink.sink.FlinkSink;
import org.apache.paimon.flink.source.align.AlignedContinuousFileStoreSource;
import org.apache.paimon.flink.source.operator.MonitorSource;
import org.apache.paimon.flink.utils.ScanBucketUtils;
import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
Expand Down Expand Up @@ -200,6 +201,7 @@ private ReadBuilder createReadBuilder(@Nullable org.apache.paimon.types.RowType
if (limit != null) {
readBuilder.withLimit(limit.intValue());
}
ScanBucketUtils.applyScanBucket(table, readBuilder, conf);
return readBuilder.dropStats();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.paimon.flink.lookup.DynamicPartitionLoader;
import org.apache.paimon.flink.lookup.PartitionLoader;
import org.apache.paimon.flink.lookup.StaticPartitionLoader;
import org.apache.paimon.flink.utils.ScanBucketUtils;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
Expand All @@ -35,6 +36,7 @@
import org.apache.paimon.predicate.PredicateVisitor;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;

Expand Down Expand Up @@ -245,13 +247,14 @@ protected Integer inferSourceParallelism(StreamExecutionEnvironment env) {
protected void scanSplitsForInference() {
if (splitStatistics == null) {
if (table instanceof DataTable) {
List<PartitionEntry> partitionEntries =
ReadBuilder readBuilder =
table.newReadBuilder()
.dropStats()
.withFilter(predicate)
.withPartitionFilter(partitionPredicate)
.newScan()
.listPartitionEntries();
.withPartitionFilter(partitionPredicate);
ScanBucketUtils.applyScanBucket(table, readBuilder, options);
List<PartitionEntry> partitionEntries =
readBuilder.newScan().listPartitionEntries();
long totalSize = 0;
long rowCount = 0;
for (PartitionEntry entry : partitionEntries) {
Expand All @@ -262,15 +265,14 @@ protected void scanSplitsForInference() {
splitStatistics =
new SplitStatistics((int) (totalSize / splitTargetSize + 1), rowCount);
} else {
List<Split> splits =
ReadBuilder readBuilder =
table.newReadBuilder()
.dropStats()
.withFilter(predicate)
.withPartitionFilter(partitionPredicate)
.withProjection(new int[0])
.newScan()
.plan()
.splits();
.withProjection(new int[0]);
ScanBucketUtils.applyScanBucket(table, readBuilder, options);
List<Split> splits = readBuilder.newScan().plan().splits();
splitStatistics =
new SplitStatistics(
splits.size(), splits.stream().mapToLong(Split::rowCount).sum());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.utils;

import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;

/** Utilities for {@link FlinkConnectorOptions#SCAN_BUCKET}. */
public class ScanBucketUtils {

private ScanBucketUtils() {}

/** Apply {@link FlinkConnectorOptions#SCAN_BUCKET} to the given {@link ReadBuilder}. */
public static ReadBuilder applyScanBucket(
Table table, ReadBuilder readBuilder, Options options) {
Integer scanBucket = options.get(FlinkConnectorOptions.SCAN_BUCKET);
if (scanBucket == null) {
return readBuilder;
}
return readBuilder.withBucket(scanBucket);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;

import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;

/** ITCase for {@link org.apache.paimon.flink.FlinkConnectorOptions#SCAN_BUCKET}. */
public class ScanBucketITCase extends CatalogITCaseBase {

@Override
protected List<String> ddl() {
return singletonList(
"CREATE TABLE IF NOT EXISTS T (id INT, v INT, PRIMARY KEY (id) NOT ENFORCED) "
+ "WITH ('bucket' = '4')");
}

@Nullable
@Override
protected Boolean sqlSyncMode() {
return true;
}

@Test
public void testScanBucketFilter() throws Exception {
sql(
"INSERT INTO T VALUES (1, 10), (2, 20), (3, 30), (4, 40), (5, 50), (6, 60), (7, 70), (8, 80)");

FileStoreTable table = paimonTable("T");

int targetBucket = 0;
for (int bucket = 0; bucket < 4; bucket++) {
List<ManifestEntry> files = table.store().newScan().withBucket(bucket).plan().files();
if (!files.isEmpty()) {
targetBucket = bucket;
break;
}
}

List<Row> expected = readRowsFromBucket(table, targetBucket);

List<Row> actual =
batchSql(
String.format(
"SELECT * FROM T /*+ OPTIONS('scan.bucket' = '%s') */",
targetBucket));

assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
}

private List<Row> readRowsFromBucket(FileStoreTable table, int bucket) throws Exception {
List<ManifestEntry> files = table.store().newScan().withBucket(bucket).plan().files();
List<Row> rows = new ArrayList<>();
for (ManifestEntry file : files) {
DataSplit split =
DataSplit.builder()
.withPartition(file.partition())
.withBucket(file.bucket())
.withDataFiles(Collections.singletonList(file.file()))
.withBucketPath("not used")
.build();
RecordReader<InternalRow> reader = table.newReadBuilder().newRead().createReader(split);
RecordReaderIterator<InternalRow> iterator = new RecordReaderIterator<>(reader);
while (iterator.hasNext()) {
InternalRow row = iterator.next();
rows.add(Row.of(row.getInt(0), row.getInt(1)));
}
iterator.close();
}
return rows;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.utils;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.FileSystemCatalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.DataTypes;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.util.HashMap;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link ScanBucketUtils}. */
public class ScanBucketUtilsTest extends AbstractTestBase {

@TempDir java.nio.file.Path tempDir;

@Test
public void testInvalidBucket() throws Exception {
FileStoreTable table = createTable(4);
Options options = new Options();
options.set(FlinkConnectorOptions.SCAN_BUCKET, 5);
ReadBuilder readBuilder = table.newReadBuilder();
assertThatThrownBy(() -> ScanBucketUtils.applyScanBucket(table, readBuilder, options))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Bucket id 5 must be less than table bucket number 4");
}

private FileStoreTable createTable(int numBuckets) throws Exception {
Map<String, String> options = new HashMap<>();
options.put(CoreOptions.BUCKET.key(), String.valueOf(numBuckets));
Schema schema =
Schema.newBuilder()
.column("id", DataTypes.INT())
.column("v", DataTypes.INT())
.primaryKey("id")
.options(options)
.build();
Catalog catalog = new FileSystemCatalog(LocalFileIO.create(), new Path(tempDir.toString()));
catalog.createDatabase("default", true);
Identifier identifier = Identifier.create("default", "test_bucket");
catalog.createTable(identifier, schema, false);
return (FileStoreTable) catalog.getTable(identifier);
}
}
Loading