From 34e99ffe6da15c7e04809543a39256df7a1883f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Fri, 5 Jun 2026 20:55:17 +0800 Subject: [PATCH] [core] Improve BTree global index option handling --- .../globalindex/btree/BTreeIndexOptions.java | 2 +- .../btree/BTreeIndexOptionsTest.java | 33 ++++++++++++ .../procedure/CreateGlobalIndexProcedure.java | 11 +++- .../paimon/flink/procedure/ProcedureBase.java | 2 +- .../CreateGlobalIndexProcedureTest.java | 53 +++++++++++++++++++ .../procedure/CreateGlobalIndexProcedure.java | 15 ++++-- .../CreateGlobalIndexProcedureTest.java | 22 ++++++++ 7 files changed, 131 insertions(+), 7 deletions(-) create mode 100644 paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeIndexOptionsTest.java create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateGlobalIndexProcedureTest.java diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexOptions.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexOptions.java index 8d0758a1fcb1..ab8636592ac3 100644 --- a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexOptions.java @@ -58,7 +58,7 @@ public class BTreeIndexOptions { public static final ConfigOption BTREE_INDEX_RECORDS_PER_RANGE = ConfigOptions.key("btree-index.records-per-range") .longType() - .defaultValue(1000_000L) + .defaultValue(10_000_000L) .withDescription("The expected number of records per BTree Index File."); public static final ConfigOption BTREE_INDEX_BUILD_MAX_PARALLELISM = diff --git a/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeIndexOptionsTest.java b/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeIndexOptionsTest.java new file mode 100644 index 000000000000..27ae90f98e9e --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeIndexOptionsTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.globalindex.btree; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link BTreeIndexOptions}. */ +class BTreeIndexOptionsTest { + + @Test + void testDefaultRecordsPerRange() { + assertThat(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE.defaultValue()) + .isEqualTo(10_000_000L); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateGlobalIndexProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateGlobalIndexProcedure.java index 5f4855567047..ad62ad8f7654 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateGlobalIndexProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateGlobalIndexProcedure.java @@ -43,6 +43,14 @@ public class CreateGlobalIndexProcedure extends ProcedureBase { public static final String IDENTIFIER = "create_global_index"; + static Options createUserOptions(FileStoreTable table, String optionString) { + return createUserOptions(table.options(), optionString); + } + + static Options createUserOptions(Map tableOptions, String optionString) { + return new Options(tableOptions, optionalConfigMap(optionString)); + } + @Override public String identifier() { return IDENTIFIER; @@ -87,8 +95,7 @@ public String[] call( PartitionPredicate partitionPredicate = parsePartitionPredicate(table, partitions); // Parse options - Map parsedOptions = optionalConfigMap(options); - Options userOptions = Options.fromMap(parsedOptions); + Options userOptions = createUserOptions(table, options); // Build global index based on index type indexType = indexType.toLowerCase().trim(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java index efb6aa50c219..59d49132f225 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java @@ -102,7 +102,7 @@ private String[] execute(JobClient jobClient, boolean dmlSync) { } } - protected Map optionalConfigMap(String configStr) { + protected static Map optionalConfigMap(String configStr) { if (StringUtils.isNullOrWhitespaceOnly(configStr)) { return Collections.emptyMap(); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateGlobalIndexProcedureTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateGlobalIndexProcedureTest.java new file mode 100644 index 000000000000..5b879628b241 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateGlobalIndexProcedureTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.globalindex.btree.BTreeIndexOptions; +import org.apache.paimon.options.Options; + +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link CreateGlobalIndexProcedure}. */ +public class CreateGlobalIndexProcedureTest { + + @Test + public void testCreateUserOptionsUsesTableOptionsAndParsedOptionsOverride() { + Map tableOptions = new HashMap<>(); + tableOptions.put(BTreeIndexOptions.BTREE_INDEX_COMPRESSION.key(), "zstd"); + tableOptions.put(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE.key(), "100"); + tableOptions.put("unrelated-table-option", "table-value"); + + Options userOptions = + CreateGlobalIndexProcedure.createUserOptions( + tableOptions, + BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE.key() + + "=200;procedure-only=procedure-value"); + + assertThat(userOptions.get(BTreeIndexOptions.BTREE_INDEX_COMPRESSION)).isEqualTo("zstd"); + assertThat(userOptions.get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE)) + .isEqualTo(200L); + assertThat(userOptions.get("unrelated-table-option")).isEqualTo("table-value"); + assertThat(userOptions.get("procedure-only")).isEqualTo("procedure-value"); + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java index e25464b173d7..b447cdbd33f8 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java @@ -47,6 +47,7 @@ import java.util.HashMap; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.UUID; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -91,6 +92,16 @@ public String description() { return "Create global index files for a given column."; } + static Options createUserOptions(FileStoreTable table, String optionString) { + return createUserOptions(table.options(), optionString); + } + + static Options createUserOptions(Map tableOptions, String optionString) { + HashMap parsedOptions = new HashMap<>(); + ProcedureUtils.putAllOptions(parsedOptions, optionString); + return new Options(tableOptions, parsedOptions); + } + @Override public InternalRow[] call(InternalRow args) { Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); @@ -139,9 +150,7 @@ public InternalRow[] call(InternalRow args) { rowType.project(Collections.singletonList(column)); RowType readRowType = SpecialFields.rowTypeWithRowId(projectedRowType); - HashMap parsedOptions = new HashMap<>(); - ProcedureUtils.putAllOptions(parsedOptions, optionString); - Options userOptions = Options.fromMap(parsedOptions); + Options userOptions = createUserOptions(table, optionString); GlobalIndexTopologyBuilder topoBuilder = GlobalIndexTopologyBuilderUtils.createTopoBuilder(indexType); diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.java index 796ab7c54243..0a13f0619797 100644 --- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.java +++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.java @@ -22,11 +22,13 @@ import org.apache.paimon.data.BinaryRowWriter; import org.apache.paimon.fs.Path; import org.apache.paimon.globalindex.IndexedSplit; +import org.apache.paimon.globalindex.btree.BTreeIndexOptions; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.PojoDataFileMeta; import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.PojoManifestEntry; +import org.apache.paimon.options.Options; import org.apache.paimon.spark.globalindex.DefaultGlobalIndexTopoBuilder; import org.apache.paimon.stats.SimpleStats; import org.apache.paimon.table.source.DataSplit; @@ -51,6 +53,26 @@ public class CreateGlobalIndexProcedureTest { private final BiFunction pathFactory = (a, b) -> new Path(UUID.randomUUID().toString()); + @Test + void testCreateUserOptionsUsesTableOptionsAndParsedOptionsOverride() { + Map tableOptions = new HashMap<>(); + tableOptions.put(BTreeIndexOptions.BTREE_INDEX_COMPRESSION.key(), "zstd"); + tableOptions.put(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE.key(), "100"); + tableOptions.put("unrelated-table-option", "table-value"); + + Options userOptions = + CreateGlobalIndexProcedure.createUserOptions( + tableOptions, + BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE.key() + + "=200, procedure-only=procedure-value"); + + assertThat(userOptions.get(BTreeIndexOptions.BTREE_INDEX_COMPRESSION)).isEqualTo("zstd"); + assertThat(userOptions.get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE)) + .isEqualTo(200L); + assertThat(userOptions.get("unrelated-table-option")).isEqualTo("table-value"); + assertThat(userOptions.get("procedure-only")).isEqualTo("procedure-value"); + } + @Test void testGroupFilesIntoShardsByPartitionSingleFileInSingleShard() { // Create a partition