From 261f7ceff85ba2226af0b2c419eee3bf580a8e56 Mon Sep 17 00:00:00 2001 From: wzx140 Date: Thu, 11 Jun 2026 19:43:00 +0800 Subject: [PATCH 1/5] [lake/paimon] Support custom Paimon lake table path --- .../apache/fluss/config/ConfigOptions.java | 14 ++ .../apache/fluss/config/FlussConfigUtils.java | 2 + .../apache/fluss/metadata/LakeTableUtil.java | 101 +++++++++ .../org/apache/fluss/metadata/TableInfo.java | 16 ++ .../fluss/flink/catalog/FlinkCatalog.java | 28 ++- .../fluss/flink/lake/LakeTableFactory.java | 37 +++- .../fluss/flink/utils/LakeSourceUtils.java | 4 +- .../fluss/flink/catalog/FlinkCatalogTest.java | 40 ++++ fluss-lake/fluss-lake-paimon/pom.xml | 54 ++++- .../paimon/tiering/PaimonLakeCommitter.java | 6 +- .../lake/paimon/tiering/PaimonLakeWriter.java | 3 +- .../lake/paimon/utils/PaimonConversions.java | 1 - .../paimon/LakeEnabledTableCreateITCase.java | 108 ++++++++++ ...abledTableCreateWithHiveCatalogITCase.java | 198 ++++++++++++++++++ .../FlinkUnionReadPrimaryKeyTableITCase.java | 40 ++++ .../paimon/tiering/PaimonTieringITCase.java | 26 +++ .../CoordinatorEventProcessor.java | 22 ++ .../coordinator/CoordinatorService.java | 5 +- .../server/coordinator/MetadataManager.java | 16 +- .../utils/TableDescriptorValidation.java | 53 +++++ .../fluss/server/zk/ZooKeeperClient.java | 10 + .../server/zk/data/lake/LakeTableHelper.java | 18 ++ .../CoordinatorEventProcessorTest.java | 66 ++++++ .../zk/data/lake/LakeTableHelperTest.java | 29 +++ .../org/apache/fluss/spark/SparkTable.scala | 6 +- pom.xml | 6 + 26 files changed, 876 insertions(+), 33 deletions(-) create mode 100644 fluss-common/src/main/java/org/apache/fluss/metadata/LakeTableUtil.java create mode 100644 fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateWithHiveCatalogITCase.java diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index fafcc2905a..b2ed84b283 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -1646,6 +1646,20 @@ public class ConfigOptions { + "The `table.datalake.format` can be pre-defined before enabling `table.datalake.enabled`. This allows the data lake feature to be dynamically enabled on the table without requiring table recreation. " + "If `table.datalake.format` is not explicitly set during table creation, the table will default to the format specified by the `datalake.format` configuration in the Fluss cluster."); + public static final ConfigOption TABLE_DATALAKE_DATABASE_NAME = + key("table.datalake.database-name") + .stringType() + .noDefaultValue() + .withDescription( + "The database name of the datalake table. If not set, it will be the same as the Fluss database."); + + public static final ConfigOption TABLE_DATALAKE_TABLE_NAME = + key("table.datalake.table-name") + .stringType() + .noDefaultValue() + .withDescription( + "The table name of the datalake table. If not set, it will be the same as the Fluss table."); + public static final ConfigOption TABLE_DATALAKE_FRESHNESS = key("table.datalake.freshness") .durationType() diff --git a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java index 237acf5287..1cabf2dc1c 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java @@ -47,6 +47,8 @@ public class FlussConfigUtils { ALTERABLE_TABLE_OPTIONS = Arrays.asList( ConfigOptions.TABLE_DATALAKE_ENABLED.key(), + ConfigOptions.TABLE_DATALAKE_DATABASE_NAME.key(), + ConfigOptions.TABLE_DATALAKE_TABLE_NAME.key(), ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(), ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION.key(), ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key(), diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/LakeTableUtil.java b/fluss-common/src/main/java/org/apache/fluss/metadata/LakeTableUtil.java new file mode 100644 index 0000000000..b4602ad9ad --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/LakeTableUtil.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metadata; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; + +import java.util.Map; +import java.util.Optional; + +/** Utility methods for resolving external lake table metadata. */ +@Internal +public final class LakeTableUtil { + + private LakeTableUtil() {} + + /** Returns the table path used to access the external datalake table. */ + public static TablePath getLakeTablePath( + TablePath flussTablePath, Map tableProperties) { + return getLakeTablePath(flussTablePath, Configuration.fromMap(tableProperties)); + } + + /** Returns the table path used to access the external datalake table. */ + public static TablePath getLakeTablePath(TablePath flussTablePath, Configuration tableConf) { + String lakeDatabaseName = + getDataLakeDatabaseName(tableConf).orElse(flussTablePath.getDatabaseName()); + String lakeTableName = + getDataLakeTableName(tableConf).orElse(flussTablePath.getTableName()); + return TablePath.of(lakeDatabaseName, lakeTableName); + } + + /** Returns whether the table has explicit custom datalake path options. */ + public static boolean hasCustomLakePath(Map tableProperties) { + return hasCustomLakePath(Configuration.fromMap(tableProperties)); + } + + /** Returns whether the table has explicit custom datalake path options. */ + public static boolean hasCustomLakePath(Configuration tableConf) { + return getDataLakeDatabaseName(tableConf).isPresent() + || getDataLakeTableName(tableConf).isPresent(); + } + + /** Returns the lake table name with the metadata table suffix from the requested table name. */ + public static String getLakeTableName(String lakeTableName, String requestedTableName) { + if (lakeTableName == null) { + return requestedTableName; + } + + int metadataTableIndex = requestedTableName.indexOf('$'); + if (metadataTableIndex < 0) { + return lakeTableName; + } + + String metadataTableSuffix = requestedTableName.substring(metadataTableIndex); + if (lakeTableName.endsWith(metadataTableSuffix)) { + return lakeTableName; + } + return lakeTableName + metadataTableSuffix; + } + + /** + * Returns the lake table name with the metadata table suffix from a table name containing the + * given lake table splitter. + */ + public static String getLakeTableName( + String lakeTableName, String requestedTableName, String lakeTableSplitter) { + int splitterIndex = requestedTableName.indexOf(lakeTableSplitter); + if (splitterIndex < 0) { + return requestedTableName; + } + + String requestedLakeTableName = + requestedTableName.substring(0, splitterIndex) + + requestedTableName.substring(splitterIndex + lakeTableSplitter.length()); + return getLakeTableName(lakeTableName, requestedLakeTableName); + } + + private static Optional getDataLakeDatabaseName(Configuration tableConf) { + return tableConf.getOptional(ConfigOptions.TABLE_DATALAKE_DATABASE_NAME); + } + + private static Optional getDataLakeTableName(Configuration tableConf) { + return tableConf.getOptional(ConfigOptions.TABLE_DATALAKE_TABLE_NAME); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java b/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java index 00f58b81f0..6dbb4c9d83 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java @@ -112,6 +112,22 @@ public TablePath getTablePath() { return tablePath; } + /** + * Returns the table path used to access the external datalake table. + * + *

The returned path is derived from the Fluss table path and table-level datalake path + * options. If no custom datalake path option is configured, this returns the same path as + * {@link #getTablePath()}. + */ + public TablePath getLakeTablePath() { + return LakeTableUtil.getLakeTablePath(tablePath, properties); + } + + /** Returns whether the table has explicit custom datalake path options. */ + public boolean hasCustomLakePath() { + return LakeTableUtil.hasCustomLakePath(properties); + } + /** * Returns the unique identifier for the table within the cluster. * diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java index 103bf311b4..68f324543a 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java @@ -34,6 +34,7 @@ import org.apache.fluss.flink.utils.FlinkConversions; import org.apache.fluss.metadata.DatabaseChange; import org.apache.fluss.metadata.DatabaseDescriptor; +import org.apache.fluss.metadata.LakeTableUtil; import org.apache.fluss.metadata.PartitionInfo; import org.apache.fluss.metadata.PartitionSpec; import org.apache.fluss.metadata.TableChange; @@ -404,9 +405,10 @@ public CatalogBaseTable getTable(ObjectPath objectPath) objectPath.getDatabaseName(), tableName.split("\\" + LAKE_TABLE_SPLITTER)[0]))); } + TablePath lakeTablePath = tableInfo.getLakeTablePath(); return getLakeTable( - objectPath.getDatabaseName(), + lakeTablePath, tableName, tableInfo.getProperties(), getLakeCatalogProperties()); @@ -455,23 +457,19 @@ public CatalogBaseTable getTable(ObjectPath objectPath) } protected CatalogBaseTable getLakeTable( - String databaseName, - String tableName, + TablePath lakeTablePath, + String requestedTableName, Configuration properties, Map lakeCatalogProperties) throws TableNotExistException, CatalogException { - String[] tableComponents = tableName.split("\\" + LAKE_TABLE_SPLITTER); - if (tableComponents.length == 1) { - // should be pattern like table_name$lake - tableName = tableComponents[0]; - } else { - // pattern is table_name$lake$snapshots - // Need to reconstruct: table_name + $snapshots - tableName = String.join("", tableComponents); - } - return lakeFlinkCatalog - .getLakeCatalog(properties, lakeCatalogProperties) - .getTable(new ObjectPath(databaseName, tableName)); + String lakeObjectName = + LakeTableUtil.getLakeTableName( + lakeTablePath.getTableName(), requestedTableName, LAKE_TABLE_SPLITTER); + CatalogBaseTable lakeTable = + lakeFlinkCatalog + .getLakeCatalog(properties, lakeCatalogProperties) + .getTable(new ObjectPath(lakeTablePath.getDatabaseName(), lakeObjectName)); + return lakeTable; } @Override diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java index ad9918f389..4c410f7bb8 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java @@ -17,7 +17,9 @@ package org.apache.fluss.flink.lake; +import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; +import org.apache.fluss.metadata.LakeTableUtil; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.connector.source.DynamicTableSource; @@ -26,9 +28,17 @@ import org.apache.flink.table.factories.FactoryUtil; import java.util.Collections; +import java.util.Map; /** A factory to create {@link DynamicTableSource} for lake table. */ public class LakeTableFactory { + + private static final String FLUSS_CONF_PREFIX = "fluss."; + private static final String FLUSS_TABLE_DATALAKE_DATABASE_NAME = + FLUSS_CONF_PREFIX + ConfigOptions.TABLE_DATALAKE_DATABASE_NAME.key(); + private static final String FLUSS_TABLE_DATALAKE_TABLE_NAME = + FLUSS_CONF_PREFIX + ConfigOptions.TABLE_DATALAKE_TABLE_NAME.key(); + private final LakeFlinkCatalog lakeFlinkCatalog; public LakeTableFactory(LakeFlinkCatalog lakeFlinkCatalog) { @@ -36,13 +46,12 @@ public LakeTableFactory(LakeFlinkCatalog lakeFlinkCatalog) { } public DynamicTableSource createDynamicTableSource( - DynamicTableFactory.Context context, String tableName) { - ObjectIdentifier originIdentifier = context.getObjectIdentifier(); + DynamicTableFactory.Context context, String requestedTableName) { ObjectIdentifier lakeIdentifier = - ObjectIdentifier.of( - originIdentifier.getCatalogName(), - originIdentifier.getDatabaseName(), - tableName); + toLakeIdentifier( + context.getObjectIdentifier(), + context.getCatalogTable().getOptions(), + requestedTableName); // For Iceberg and Paimon, pass the table name as-is to their factory. // Metadata tables will be handled internally by their respective factories. @@ -60,6 +69,22 @@ public DynamicTableSource createDynamicTableSource( return factory.createDynamicTableSource(newContext); } + static ObjectIdentifier toLakeIdentifier( + ObjectIdentifier originIdentifier, + Map lakeTableOptions, + String requestedTableName) { + String lakeDatabaseName = + lakeTableOptions.getOrDefault( + FLUSS_TABLE_DATALAKE_DATABASE_NAME, originIdentifier.getDatabaseName()); + String lakeObjectName = + LakeTableUtil.getLakeTableName( + lakeTableOptions.getOrDefault( + FLUSS_TABLE_DATALAKE_TABLE_NAME, requestedTableName), + requestedTableName); + return ObjectIdentifier.of( + originIdentifier.getCatalogName(), lakeDatabaseName, lakeObjectName); + } + private DynamicTableSourceFactory getLakeTableFactory() { switch (lakeFlinkCatalog.getLakeFormat()) { case PAIMON: diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/LakeSourceUtils.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/LakeSourceUtils.java index fcc8db29cd..bd08ed6b52 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/LakeSourceUtils.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/LakeSourceUtils.java @@ -24,6 +24,7 @@ import org.apache.fluss.lake.lakestorage.LakeStoragePluginSetUp; import org.apache.fluss.lake.source.LakeSource; import org.apache.fluss.lake.source.LakeSplit; +import org.apache.fluss.metadata.LakeTableUtil; import org.apache.fluss.metadata.TablePath; import org.slf4j.Logger; @@ -67,8 +68,9 @@ public static LakeSource createLakeSource( dataLake, dataLake.toLowerCase())); } LakeStorage lakeStorage = checkNotNull(lakeStoragePlugin).createLakeStorage(lakeConfig); + TablePath lakeTablePath = LakeTableUtil.getLakeTablePath(tablePath, properties); try { - return (LakeSource) lakeStorage.createLakeSource(tablePath); + return (LakeSource) lakeStorage.createLakeSource(lakeTablePath); } catch (UnsupportedOperationException e) { throw new UnsupportedOperationException( String.format( diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java index 8354fbf20c..16c45c8580 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java @@ -25,6 +25,7 @@ import org.apache.fluss.flink.adapter.ResolvedCatalogMaterializedTableAdapter; import org.apache.fluss.flink.lake.LakeFlinkCatalog; import org.apache.fluss.flink.utils.FlinkConversionsTest; +import org.apache.fluss.metadata.TablePath; import org.apache.fluss.server.testutils.FlussClusterExtension; import org.apache.fluss.utils.ExceptionUtils; @@ -74,8 +75,10 @@ import java.util.Optional; import static org.apache.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS; +import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_DATABASE_NAME; import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_ENABLED; import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_FORMAT; +import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_TABLE_NAME; import static org.apache.fluss.flink.FlinkConnectorOptions.BUCKET_KEY; import static org.apache.fluss.flink.FlinkConnectorOptions.BUCKET_NUMBER; import static org.apache.fluss.flink.FlinkConnectorOptions.SCAN_STARTUP_MODE; @@ -344,6 +347,43 @@ void testCreateAlreadyExistsLakeTable() throws Exception { catalog.createTable(lakeTablePath, table, false); } + @Test + void testGetLakeTableWithCustomLakePath() throws Exception { + String flussTableName = "fluss_table"; + TablePath lakeTablePath = TablePath.of("custom_lake_db", "custom_lake_table"); + + Map options = new HashMap<>(); + options.put(TABLE_DATALAKE_ENABLED.key(), "true"); + options.put(TABLE_DATALAKE_FORMAT.key(), PAIMON.name()); + options.put(TABLE_DATALAKE_DATABASE_NAME.key(), lakeTablePath.getDatabaseName()); + options.put(TABLE_DATALAKE_TABLE_NAME.key(), lakeTablePath.getTableName()); + + ObjectPath flussTablePath = new ObjectPath(DEFAULT_DB, flussTableName); + catalog.createTable(flussTablePath, newCatalogTable(options), false); + + CatalogTable lakeTable = newCatalogTable(Collections.emptyMap()); + mockLakeCatalog.catalog.createDatabase( + lakeTablePath.getDatabaseName(), + new CatalogDatabaseImpl(Collections.emptyMap(), null), + true); + mockLakeCatalog.registerLakeTable( + new ObjectPath(lakeTablePath.getDatabaseName(), lakeTablePath.getTableName()), + lakeTable); + CatalogBaseTable gottenLakeTable = + catalog.getTable(new ObjectPath(DEFAULT_DB, flussTableName + "$lake")); + assertThat(gottenLakeTable).isEqualTo(lakeTable); + + CatalogTable snapshotsTable = newCatalogTable(Collections.emptyMap()); + mockLakeCatalog.registerLakeTable( + new ObjectPath( + lakeTablePath.getDatabaseName(), + lakeTablePath.getTableName() + "$snapshots"), + snapshotsTable); + CatalogBaseTable gottenSnapshotsTable = + catalog.getTable(new ObjectPath(DEFAULT_DB, flussTableName + "$lake$snapshots")); + assertThat(gottenSnapshotsTable).isEqualTo(snapshotsTable); + } + @Test void testCreateTableWithBucket() throws Exception { // for pk table; diff --git a/fluss-lake/fluss-lake-paimon/pom.xml b/fluss-lake/fluss-lake-paimon/pom.xml index a1e6d1abca..4f904f5777 100644 --- a/fluss-lake/fluss-lake-paimon/pom.xml +++ b/fluss-lake/fluss-lake-paimon/pom.xml @@ -184,6 +184,58 @@ test + + org.apache.hive + hive-exec + ${hive.version} + test + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + + + + org.datanucleus + datanucleus-api-jdo + ${datanucleus-api-jdo.version} + test + + + + org.datanucleus + datanucleus-core + ${datanucleus-core.version} + test + + + + org.datanucleus + datanucleus-rdbms + ${datanucleus-rdbms.version} + test + + + + org.datanucleus + javax.jdo + ${javax-jdo.version} + test + + + + org.apache.derby + derby + ${derby.version} + test + + org.apache.fluss fluss-common @@ -237,4 +289,4 @@ - \ No newline at end of file + diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java index 339ec2b2b5..e223451a7b 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java @@ -64,6 +64,7 @@ public class PaimonLakeCommitter implements LakeCommitter snapshot = paimonCatalog.loadSnapshot(identifier); if (!snapshot.isPresent()) { diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java index 5c2738e233..83109d4f83 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java @@ -50,9 +50,10 @@ public PaimonLakeWriter( PaimonCatalogProvider paimonCatalogProvider, WriterInitContext writerInitContext) throws IOException { this.paimonCatalog = paimonCatalogProvider.get(); + TablePath lakeTablePath = writerInitContext.tableInfo().getLakeTablePath(); FileStoreTable fileStoreTable = getTable( - writerInitContext.tablePath(), + lakeTablePath, writerInitContext.tableInfo().getTableConfig().isDataLakeAutoCompaction()); List partitionKeys = fileStoreTable.partitionKeys(); diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java index 94166e2d05..87ce7b0439 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java @@ -69,7 +69,6 @@ public class PaimonConversions { static { PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.BUCKET.key()); PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.BUCKET_KEY.key()); - PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.PATH.key()); PAIMON_UNSETTABLE_OPTIONS.add(PARTITION_GENERATE_LEGACY_NAME_OPTION_KEY); } diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java index 0efa057b89..a4a3051285 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java @@ -23,6 +23,7 @@ import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.exception.InvalidAlterTableException; import org.apache.fluss.exception.InvalidConfigException; import org.apache.fluss.exception.InvalidTableException; import org.apache.fluss.exception.LakeTableAlreadyExistException; @@ -30,6 +31,7 @@ import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableChange; import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.server.replica.Replica; import org.apache.fluss.server.testutils.FlussClusterExtension; @@ -919,6 +921,112 @@ void testEnableLakeTableAfterAlterTableProperties() throws Exception { admin.alterTable(tablePath, Collections.singletonList(enableLake), false).get(); } + @Test + void testAlterLakePathOptionsValidation() throws Exception { + TablePath lakeDisabledTablePath = TablePath.of(DATABASE, "lake_path_disabled"); + Map disabledProperties = new HashMap<>(); + disabledProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "false"); + disabledProperties.put(ConfigOptions.TABLE_DATALAKE_DATABASE_NAME.key(), "lake_db"); + TableDescriptor disabledTableDescriptor = + TableDescriptor.builder() + .schema( + Schema.newBuilder() + .column("c1", DataTypes.INT()) + .column("c2", DataTypes.STRING()) + .build()) + .properties(disabledProperties) + .distributedBy(BUCKET_NUM, "c1", "c2") + .build(); + admin.createTable(lakeDisabledTablePath, disabledTableDescriptor, false).get(); + + admin.alterTable( + lakeDisabledTablePath, + Collections.singletonList( + TableChange.set( + ConfigOptions.TABLE_DATALAKE_TABLE_NAME.key(), + "lake_table")), + false) + .get(); + + TableInfo disabledTable = admin.getTableInfo(lakeDisabledTablePath).get(); + assertThat(disabledTable.getProperties().toMap()) + .containsEntry(ConfigOptions.TABLE_DATALAKE_TABLE_NAME.key(), "lake_table"); + assertThatThrownBy( + () -> + paimonCatalog.getTable( + Identifier.create( + DATABASE, lakeDisabledTablePath.getTableName()))) + .isInstanceOf(Catalog.TableNotExistException.class); + + admin.alterTable( + lakeDisabledTablePath, + Collections.singletonList( + TableChange.set( + ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")), + false) + .get(); + Table paimonTable = paimonCatalog.getTable(Identifier.create("lake_db", "lake_table")); + Map enabledDisabledTableProperties = new HashMap<>(disabledProperties); + enabledDisabledTableProperties.put( + ConfigOptions.TABLE_DATALAKE_TABLE_NAME.key(), "lake_table"); + enabledDisabledTableProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true"); + verifyPaimonTable( + paimonTable, + disabledTableDescriptor.withProperties(enabledDisabledTableProperties), + RowType.of( + new DataType[] { + org.apache.paimon.types.DataTypes.INT(), + org.apache.paimon.types.DataTypes.STRING(), + org.apache.paimon.types.DataTypes.INT(), + org.apache.paimon.types.DataTypes.BIGINT(), + org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS() + }, + new String[] { + "c1", + "c2", + BUCKET_COLUMN_NAME, + OFFSET_COLUMN_NAME, + TIMESTAMP_COLUMN_NAME + }), + "c1,c2", + BUCKET_NUM); + + TablePath lakeEnabledTablePath = TablePath.of(DATABASE, "lake_path_enabled"); + Map enabledProperties = new HashMap<>(); + enabledProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true"); + enabledProperties.put(ConfigOptions.TABLE_DATALAKE_DATABASE_NAME.key(), "lake_db"); + enabledProperties.put(ConfigOptions.TABLE_DATALAKE_TABLE_NAME.key(), "lake_table_enabled"); + TableDescriptor enabledTableDescriptor = + TableDescriptor.builder() + .schema( + Schema.newBuilder() + .column("c1", DataTypes.INT()) + .column("c2", DataTypes.STRING()) + .build()) + .properties(enabledProperties) + .distributedBy(BUCKET_NUM, "c1", "c2") + .build(); + admin.createTable(lakeEnabledTablePath, enabledTableDescriptor, false).get(); + paimonCatalog.getTable(Identifier.create("lake_db", "lake_table_enabled")); + + assertThatThrownBy( + () -> + admin.alterTable( + lakeEnabledTablePath, + Collections.singletonList( + TableChange.set( + ConfigOptions + .TABLE_DATALAKE_TABLE_NAME + .key(), + "another_lake_table")), + false) + .get()) + .cause() + .isInstanceOf(InvalidAlterTableException.class) + .hasMessageContaining("cannot be altered for datalake enabled tables") + .hasMessageContaining(ConfigOptions.TABLE_DATALAKE_TABLE_NAME.key()); + } + @Test void testAlterLakeEnabledTableSchema() throws Exception { // create table diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateWithHiveCatalogITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateWithHiveCatalogITCase.java new file mode 100644 index 0000000000..bbf0881e7d --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateWithHiveCatalogITCase.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.paimon; + +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.exception.InvalidAlterTableException; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableChange; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.testutils.FlussClusterExtension; +import org.apache.fluss.types.DataTypes; + +import org.apache.paimon.catalog.AbstractCatalog; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.Table; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.net.URI; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.fluss.server.utils.LakeStorageUtils.extractLakeProperties; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** ITCase for create lake enabled table with Paimon Hive catalog. */ +class LakeEnabledTableCreateWithHiveCatalogITCase { + + @RegisterExtension + public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION = + FlussClusterExtension.builder() + .setNumOfTabletServers(3) + .setClusterConf(initConfig()) + .build(); + + private static final String DATABASE = "fluss"; + private static final int BUCKET_NUM = 3; + + private static Catalog paimonCatalog; + private static String warehousePath; + private static String customTablePath; + + private Connection conn; + private Admin admin; + + @BeforeEach + protected void setup() { + conn = ConnectionFactory.createConnection(FLUSS_CLUSTER_EXTENSION.getClientConfig()); + admin = conn.getAdmin(); + } + + @AfterEach + protected void teardown() throws Exception { + if (admin != null) { + admin.close(); + admin = null; + } + + if (conn != null) { + conn.close(); + conn = null; + } + } + + private static Configuration initConfig() { + Configuration conf = new Configuration(); + conf.setString("datalake.format", "paimon"); + conf.setString("datalake.paimon.metastore", "hive"); + conf.setString("datalake.paimon.cache-enabled", "false"); + try { + java.nio.file.Path baseDir = Files.createTempDirectory("fluss-testing-hms-paimon"); + warehousePath = baseDir.resolve("warehouse").toString(); + customTablePath = baseDir.resolve("custom_lake_table_path").toUri().toString(); + conf.setString( + "datalake.paimon.javax.jdo.option.ConnectionURL", + "jdbc:derby:memory:" + baseDir.resolve("metastore_db") + ";create=true"); + } catch (Exception e) { + throw new FlussRuntimeException("Failed to create hive catalog test path", e); + } + conf.setString("datalake.paimon.warehouse", warehousePath); + conf.setString( + "datalake.paimon.javax.jdo.option.ConnectionDriverName", + "org.apache.derby.jdbc.EmbeddedDriver"); + conf.setString("datalake.paimon.datanucleus.schema.autoCreateAll", "true"); + conf.setString("datalake.paimon.hive.metastore.schema.verification", "false"); + + paimonCatalog = + CatalogFactory.createCatalog( + CatalogContext.create(Options.fromMap(extractLakeProperties(conf)))); + + return conf; + } + + @Test + void testAlterPaimonPathRequiresLakePathOptions() throws Exception { + TablePath tablePath = TablePath.of(DATABASE, "lake_paimon_path_without_mapping"); + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema( + Schema.newBuilder() + .column("c1", DataTypes.INT()) + .column("c2", DataTypes.STRING()) + .build()) + .distributedBy(BUCKET_NUM, "c1", "c2") + .build(); + admin.createTable(tablePath, tableDescriptor, false).get(); + + assertThatThrownBy( + () -> + admin.alterTable( + tablePath, + Collections.singletonList( + TableChange.set( + "paimon.path", customTablePath)), + false) + .get()) + .cause() + .isInstanceOf(InvalidAlterTableException.class) + .hasMessageContaining( + "'paimon.path' can only be altered together with lake table path options"); + + List paimonPathWithLakePath = + Arrays.asList( + TableChange.set("paimon.path", customTablePath), + TableChange.set( + ConfigOptions.TABLE_DATALAKE_TABLE_NAME.key(), "hms_lake_table")); + admin.alterTable(tablePath, paimonPathWithLakePath, false).get(); + + TableInfo tableInfo = admin.getTableInfo(tablePath).get(); + assertThat(tableInfo.getProperties().toMap()) + .containsEntry(ConfigOptions.TABLE_DATALAKE_TABLE_NAME.key(), "hms_lake_table"); + assertThat(tableInfo.toTableDescriptor().getCustomProperties()) + .containsEntry("paimon.path", customTablePath); + assertThatThrownBy( + () -> paimonCatalog.getTable(Identifier.create(DATABASE, "hms_lake_table"))) + .isInstanceOf(Catalog.TableNotExistException.class); + + admin.alterTable( + tablePath, + Collections.singletonList( + TableChange.set( + ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")), + false) + .get(); + + tableInfo = admin.getTableInfo(tablePath).get(); + assertThat(tableInfo.getProperties().toMap()) + .containsEntry(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true") + .containsEntry(ConfigOptions.TABLE_DATALAKE_TABLE_NAME.key(), "hms_lake_table"); + assertThat(tableInfo.toTableDescriptor().getCustomProperties()) + .containsEntry("paimon.path", customTablePath); + + Table paimonTable = paimonCatalog.getTable(Identifier.create(DATABASE, "hms_lake_table")); + assertThat(paimonTable.name()).isEqualTo("hms_lake_table"); + assertThat( + URI.create( + ((AbstractCatalog) paimonCatalog) + .getTableLocation( + Identifier.create( + DATABASE, "hms_lake_table")) + .toString()) + .getPath()) + .isEqualTo(URI.create(customTablePath).getPath()); + assertThat(URI.create(paimonTable.options().get("path")).getPath()) + .isEqualTo(URI.create(customTablePath).getPath()); + } +} diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java index 9837cbcd34..dc558b5097 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java @@ -45,6 +45,7 @@ import org.apache.flink.types.RowKind; import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.CollectionUtil; +import org.apache.paimon.catalog.Identifier; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -917,6 +918,45 @@ void testUnionReadPrimaryKeyTableFailover(boolean isPartitioned) throws Exceptio jobClient.cancel().get(); } + @Test + void testUnionReadWithCustomLakeTablePath() throws Exception { + String tableName = "pk_table_custom_lake_mapping"; + TablePath tablePath = TablePath.of(DEFAULT_DB, tableName); + TablePath lakeTablePath = TablePath.of("custom_db", "pk_table_custom_lake_mapping_target"); + Map tableProperties = new HashMap<>(); + tableProperties.put( + ConfigOptions.TABLE_DATALAKE_DATABASE_NAME.key(), lakeTablePath.getDatabaseName()); + tableProperties.put( + ConfigOptions.TABLE_DATALAKE_TABLE_NAME.key(), lakeTablePath.getTableName()); + + long tableId = createPkTable(tablePath, tableProperties, Collections.emptyMap()); + TableBucket tableBucket = new TableBucket(tableId, 0); + + List initialRows = Arrays.asList(row(1, "v1"), row(2, "v2")); + writeRows(tablePath, initialRows, false); + + JobClient jobClient = buildTieringJob(execEnv); + try { + assertReplicaStatus(tableBucket, 2); + paimonCatalog.getTable( + Identifier.create( + lakeTablePath.getDatabaseName(), lakeTablePath.getTableName())); + + List lakeRows = + toSortedRows(batchTEnv.executeSql("select a, b from " + tableName + "$lake")); + assertThat(lakeRows.toString().replace("+U", "+I")).isEqualTo("[+I[1, v1], +I[2, v2]]"); + + writeRows(tablePath, Collections.singletonList(row(3, "v3")), false); + + List unionRows = + toSortedRows(batchTEnv.executeSql("select * from " + tableName)); + assertThat(unionRows.toString().replace("+U", "+I")) + .isEqualTo("[+I[1, v1], +I[2, v2], +I[3, v3]]"); + } finally { + jobClient.cancel().get(); + } + } + @Test void testUnionReadWithAddColumn() throws Exception { TablePath tablePath = TablePath.of(DEFAULT_DB, "unionReadAddColumnPKTable"); diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java index 3d4da4fe50..5b0eb5dfe6 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java @@ -188,6 +188,32 @@ void testTiering() throws Exception { } } + @Test + void testTieringWithCustomLakeTablePath() throws Exception { + TablePath flussTablePath = TablePath.of(DEFAULT_DB, "customLakePathPkTable"); + TablePath lakeTablePath = TablePath.of("custom_db", "custom_lake_path_target"); + Map tableProperties = new HashMap<>(); + tableProperties.put( + ConfigOptions.TABLE_DATALAKE_DATABASE_NAME.key(), lakeTablePath.getDatabaseName()); + tableProperties.put( + ConfigOptions.TABLE_DATALAKE_TABLE_NAME.key(), lakeTablePath.getTableName()); + long tableId = createPkTable(flussTablePath, tableProperties, Collections.emptyMap()); + TableBucket tableBucket = new TableBucket(tableId, 0); + + List rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3")); + writeRows(flussTablePath, rows, false); + triggerAndWaitSnapshot(tableId, 1); + + JobClient jobClient = buildTieringJob(execEnv); + try { + assertReplicaStatus(tableBucket, 3); + checkDataInPaimonPrimaryKeyTable(lakeTablePath, rows); + checkFlussOffsetsInSnapshot(lakeTablePath, Collections.singletonMap(tableBucket, 3L)); + } finally { + jobClient.cancel().get(); + } + } + private static Stream tieringAllTypesWriteArgs() { return Stream.of(Arguments.of(true), Arguments.of(false)); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java index 6c68a55835..ebac8efe0b 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java @@ -831,6 +831,12 @@ private void postAlterTableProperties(TableInfo oldTableInfo, TableInfo newTable boolean toDisableDataLake = oldTableInfo.getTableConfig().isDataLakeEnabled() && !newTableInfo.getTableConfig().isDataLakeEnabled(); + boolean lakeTablePathChanged = + !Objects.equals(oldTableInfo.getLakeTablePath(), newTableInfo.getLakeTablePath()); + + if (lakeTablePathChanged) { + clearLakeTableProgress(oldTableInfo, newTableInfo); + } if (toEnableDataLake) { // if the table is lake table, we need to add it to lake table tiering manager @@ -887,6 +893,22 @@ private void triggerReElectionForTable(long tableId) { } } + private void clearLakeTableProgress(TableInfo oldTableInfo, TableInfo newTableInfo) { + try { + lakeTableHelper.clearLakeTableProgress(newTableInfo.getTableId()); + } catch (Exception e) { + throw new FlussRuntimeException( + "Failed to clear lake table progress for table " + + newTableInfo.getTablePath() + + " after lake table path changed from " + + oldTableInfo.getLakeTablePath() + + " to " + + newTableInfo.getLakeTablePath() + + ".", + e); + } + } + private void processCreatePartition(CreatePartitionEvent createPartitionEvent) { long partitionId = createPartitionEvent.getPartitionId(); // skip the partition if it already exists diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index 8c122cb785..3e341d9ff6 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java @@ -47,6 +47,7 @@ import org.apache.fluss.metadata.DatabaseChange; import org.apache.fluss.metadata.DatabaseDescriptor; import org.apache.fluss.metadata.DeleteBehavior; +import org.apache.fluss.metadata.LakeTableUtil; import org.apache.fluss.metadata.MergeEngineType; import org.apache.fluss.metadata.PartitionSpec; import org.apache.fluss.metadata.ResolvedPartitionSpec; @@ -485,10 +486,12 @@ public CompletableFuture createTable(CreateTableRequest req // before create table in fluss, we may create in lake if (isDataLakeEnabled(tableDescriptor)) { + TablePath lakeTablePath = + LakeTableUtil.getLakeTablePath(tablePath, tableDescriptor.getProperties()); try { checkNotNull(lakeCatalogContainer.getLakeCatalog()) .createTable( - tablePath, + lakeTablePath, tableDescriptor, new DefaultLakeCatalogContext( true, diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java index 106d4ef66f..5658e3cc04 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java @@ -39,6 +39,7 @@ import org.apache.fluss.metadata.DatabaseDescriptor; import org.apache.fluss.metadata.DatabaseInfo; import org.apache.fluss.metadata.DatabaseSummary; +import org.apache.fluss.metadata.LakeTableUtil; import org.apache.fluss.metadata.ResolvedPartitionSpec; import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.SchemaInfo; @@ -492,7 +493,7 @@ private void syncSchemaChangesToLake( } try { - lakeCatalog.alterTable(tablePath, schemaChanges, lakeCatalogContext); + lakeCatalog.alterTable(tableInfo.getLakeTablePath(), schemaChanges, lakeCatalogContext); } catch (TableNotExistException e) { throw new FlussRuntimeException( "Lake table doesn't exist for lake-enabled table " @@ -517,7 +518,10 @@ public void alterTableProperties( TableInfo tableInfo = tableReg.toTableInfo(tablePath, schemaInfo); // validate the changes - validateAlterTableProperties(tableInfo, tablePropertyChanges.tableKeysToChange()); + validateAlterTableProperties( + tableInfo, + tablePropertyChanges.tableKeysToChange(), + tablePropertyChanges.customKeysToChange()); TableDescriptor tableDescriptor = tableInfo.toTableDescriptor(); TableDescriptor newDescriptor = @@ -597,8 +601,10 @@ private void preAlterTableProperties( // to enable lake table if (!isDataLakeEnabled(tableDescriptor)) { // before create table in fluss, we may create in lake + TablePath lakeTablePath = + LakeTableUtil.getLakeTablePath(tablePath, newDescriptor.getProperties()); try { - lakeCatalog.createTable(tablePath, newDescriptor, lakeCatalogContext); + lakeCatalog.createTable(lakeTablePath, newDescriptor, lakeCatalogContext); } catch (TableAlreadyExistException e) { throw new LakeTableAlreadyExistException(e.getMessage(), e); } @@ -614,8 +620,10 @@ private void preAlterTableProperties( && tableDescriptor .getProperties() .containsKey(ConfigOptions.TABLE_DATALAKE_ENABLED.key())) { + TablePath lakeTablePath = + LakeTableUtil.getLakeTablePath(tablePath, newDescriptor.getProperties()); try { - lakeCatalog.alterTable(tablePath, tableChanges, lakeCatalogContext); + lakeCatalog.alterTable(lakeTablePath, tableChanges, lakeCatalogContext); } catch (TableNotExistException e) { // only throw TableNotExistException if datalake is enabled if (isDataLakeEnabled(newDescriptor)) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java index 949afd8c67..84bc278a3b 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java @@ -31,6 +31,7 @@ import org.apache.fluss.metadata.DataLakeFormat; import org.apache.fluss.metadata.DeleteBehavior; import org.apache.fluss.metadata.KvFormat; +import org.apache.fluss.metadata.LakeTableUtil; import org.apache.fluss.metadata.LogFormat; import org.apache.fluss.metadata.MergeEngineType; import org.apache.fluss.metadata.Schema; @@ -68,6 +69,8 @@ /** Validator of {@link TableDescriptor}. */ public class TableDescriptorValidation { + private static final String PAIMON_PATH_KEY = "paimon.path"; + private static final Set SYSTEM_COLUMNS = Collections.unmodifiableSet( new LinkedHashSet<>( @@ -127,6 +130,7 @@ public static void validateTableDescriptor( checkSystemColumns(schema.getRowType()); validateStatisticsConfig(tableDescriptor); checkTableLakeFormatMatchesCluster(tableConf, clusterDataLakeFormat); + checkCustomLakePathSupported(tableConf, clusterDataLakeFormat); } private static void checkTableLakeFormatMatchesCluster( @@ -153,8 +157,32 @@ private static void checkTableLakeFormatMatchesCluster( } } + private static void checkCustomLakePathSupported( + Configuration tableConf, @Nullable DataLakeFormat clusterDataLakeFormat) { + if (!LakeTableUtil.hasCustomLakePath(tableConf)) { + return; + } + if (!tableConf.get(ConfigOptions.TABLE_DATALAKE_ENABLED)) { + return; + } + + DataLakeFormat dataLakeFormat = + tableConf + .getOptional(ConfigOptions.TABLE_DATALAKE_FORMAT) + .orElse(clusterDataLakeFormat); + if (dataLakeFormat != DataLakeFormat.PAIMON) { + throw new InvalidConfigException( + "Custom lake table path is only supported for Paimon."); + } + } + public static void validateAlterTableProperties( TableInfo currentTable, Set tableKeysToChange) { + validateAlterTableProperties(currentTable, tableKeysToChange, Collections.emptySet()); + } + + public static void validateAlterTableProperties( + TableInfo currentTable, Set tableKeysToChange, Set customKeysToChange) { TableConfig currentConfig = currentTable.getTableConfig(); List unsupportedKeys = @@ -179,6 +207,26 @@ public static void validateAlterTableProperties( ConfigOptions.TABLE_KV_STANDBY_REPLICA_ENABLED.key())); } + List lakePathKeys = + tableKeysToChange.stream() + .filter(TableDescriptorValidation::isLakePathOption) + .collect(Collectors.toList()); + if (currentConfig.isDataLakeEnabled() && !lakePathKeys.isEmpty()) { + throw new InvalidAlterTableException( + String.format( + "The following options cannot be altered for datalake enabled tables: %s.", + lakePathKeys.stream() + .map(k -> "'" + k + "'") + .collect(Collectors.joining(", ")))); + } + + if (customKeysToChange.contains(PAIMON_PATH_KEY) && lakePathKeys.isEmpty()) { + throw new InvalidAlterTableException( + String.format( + "'%s' can only be altered together with lake table path options.", + PAIMON_PATH_KEY)); + } + if (!currentConfig.getDataLakeFormat().isPresent()) { List datalakeKeys = tableKeysToChange.stream() @@ -212,6 +260,11 @@ public static void validateAlterTableProperties( } } + private static boolean isLakePathOption(String key) { + return ConfigOptions.TABLE_DATALAKE_DATABASE_NAME.key().equals(key) + || ConfigOptions.TABLE_DATALAKE_TABLE_NAME.key().equals(key); + } + private static void checkSystemColumns(RowType schema) { List fieldNames = schema.getFieldNames(); List unsupportedColumns = diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java index cb34a2f2ed..6c18b1d937 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java @@ -1365,6 +1365,16 @@ public Optional getLakeTable(long tableId) throws Exception { return getOrEmpty(zkPath).map(LakeTableZNode::decode); } + /** Deletes the {@link LakeTable} for the given table ID if it exists. */ + public void deleteLakeTable(long tableId) throws Exception { + String zkPath = LakeTableZNode.path(tableId); + try { + zkClient.delete().forPath(zkPath); + } catch (KeeperException.NoNodeException ignored) { + // Ignore if the lake table progress has not been committed yet. + } + } + /** * Gets the {@link LakeTableSnapshot} for the given table ID. * diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java index 0e70d80767..3013d63f92 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java @@ -79,6 +79,24 @@ public void registerLakeTableSnapshotV2( registerLakeTableSnapshotV2(tableId, lakeSnapshotMetadata, null); } + /** Clears the committed lake table progress for the given table. */ + public void clearLakeTableProgress(long tableId) throws Exception { + Optional optLakeTable = zkClient.getLakeTable(tableId); + if (!optLakeTable.isPresent()) { + return; + } + + zkClient.deleteLakeTable(tableId); + LakeTable lakeTable = optLakeTable.get(); + List lakeSnapshotMetadatas = + lakeTable.getLakeSnapshotMetadatas(); + if (lakeSnapshotMetadatas != null) { + for (LakeTable.LakeSnapshotMetadata lakeSnapshotMetadata : lakeSnapshotMetadatas) { + lakeSnapshotMetadata.discard(); + } + } + } + /** * Register a lake table snapshot and clean up old snapshots based on the table type. * diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java index 8f27cdd106..8f0af62fea 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java @@ -27,6 +27,7 @@ import org.apache.fluss.exception.InvalidAlterTableException; import org.apache.fluss.exception.InvalidCoordinatorException; import org.apache.fluss.fs.FsPath; +import org.apache.fluss.lake.committer.LakeCommitResult; import org.apache.fluss.metadata.DatabaseDescriptor; import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.TableBucket; @@ -82,11 +83,14 @@ import org.apache.fluss.server.zk.data.ZkData; import org.apache.fluss.server.zk.data.ZkData.PartitionIdsZNode; import org.apache.fluss.server.zk.data.ZkData.TableIdsZNode; +import org.apache.fluss.server.zk.data.lake.LakeTable; +import org.apache.fluss.server.zk.data.lake.LakeTableHelper; import org.apache.fluss.testutils.common.AllCallbackWrapper; import org.apache.fluss.types.DataTypes; import org.apache.fluss.utils.ExceptionUtils; import org.apache.fluss.utils.clock.SystemClock; import org.apache.fluss.utils.concurrent.ExecutorThreadFactory; +import org.apache.fluss.utils.json.TableBucketOffsets; import org.apache.fluss.utils.types.Tuple2; import org.junit.jupiter.api.AfterEach; @@ -1394,6 +1398,57 @@ void testAlterStandbyReplicaEnabledForLogTable() throws Exception { .hasMessageContaining("can only be altered on primary key tables"); } + @Test + void testAlterLakePathClearsProgressWhenDatalakeDisabled() throws Exception { + initCoordinatorChannel(); + TablePath tablePath = TablePath.of(defaultDatabase, "test_lake_path_progress_cleanup"); + String oldLakeDatabaseName = "lake_db_1"; + String oldLakeTableName = "lake_table_1"; + String lakeDatabaseName = "lake_db_2"; + String lakeTableName = "lake_table_2"; + TableDescriptor tableDescriptor = + tableDescriptorWithLakePath(false, oldLakeDatabaseName, oldLakeTableName); + TableAssignment tableAssignment = + generateAssignment( + 1, + REPLICATION_FACTOR, + new TabletServerInfo[] { + new TabletServerInfo(0, "rack0"), + new TabletServerInfo(1, "rack1"), + new TabletServerInfo(2, "rack2") + }); + long tableId = + metadataManager.createTable( + tablePath, remoteDataDir, tableDescriptor, tableAssignment, false); + + LakeTableHelper lakeTableHelper = new LakeTableHelper(zookeeperClient, remoteDataDir); + Map offsets = new HashMap<>(); + offsets.put(new TableBucket(tableId, 0), 100L); + FsPath offsetsPath = + lakeTableHelper.storeLakeTableOffsetsFile( + tablePath, new TableBucketOffsets(tableId, offsets)); + lakeTableHelper.registerLakeTableSnapshotV2( + tableId, + new LakeTable.LakeSnapshotMetadata(1L, offsetsPath, offsetsPath), + LakeCommitResult.KEEP_ALL_PREVIOUS); + assertThat(zookeeperClient.getLakeTable(tableId)).isPresent(); + + TablePropertyChanges.Builder builder = TablePropertyChanges.builder(); + builder.setTableProperty( + ConfigOptions.TABLE_DATALAKE_DATABASE_NAME.key(), lakeDatabaseName); + builder.setTableProperty(ConfigOptions.TABLE_DATALAKE_TABLE_NAME.key(), lakeTableName); + metadataManager.alterTableProperties( + tablePath, Collections.emptyList(), builder.build(), false, null); + + retry( + Duration.ofMinutes(1), + () -> { + assertThat(metadataManager.getTable(tablePath).getLakeTablePath()) + .isEqualTo(TablePath.of(lakeDatabaseName, lakeTableName)); + assertThat(zookeeperClient.getLakeTable(tableId)).isNotPresent(); + }); + } + @Test void testDoBucketReassignment() throws Exception { zookeeperClient.registerTabletServer( @@ -1927,6 +1982,17 @@ private void alterTable(TablePath tablePath, List schemaChanges) { metadataManager.alterTableSchema(tablePath, schemaChanges, true, null); } + private TableDescriptor tableDescriptorWithLakePath( + boolean dataLakeEnabled, String lakeDatabaseName, String lakeTableName) { + Map properties = + new HashMap<>(CoordinatorEventProcessorTest.TEST_TABLE.getProperties()); + properties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), String.valueOf(dataLakeEnabled)); + properties.put(ConfigOptions.TABLE_DATALAKE_FORMAT.key(), "paimon"); + properties.put(ConfigOptions.TABLE_DATALAKE_DATABASE_NAME.key(), lakeDatabaseName); + properties.put(ConfigOptions.TABLE_DATALAKE_TABLE_NAME.key(), lakeTableName); + return CoordinatorEventProcessorTest.TEST_TABLE.withProperties(properties); + } + private TableDescriptor getPartitionedTable() { return TableDescriptor.builder() .schema( diff --git a/fluss-server/src/test/java/org/apache/fluss/server/zk/data/lake/LakeTableHelperTest.java b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/lake/LakeTableHelperTest.java index e3b0f78694..2ec759dab4 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/zk/data/lake/LakeTableHelperTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/lake/LakeTableHelperTest.java @@ -253,6 +253,35 @@ void testRegisterLakeTableSnapshotWithRetention(@TempDir Path tempDir) throws Ex .containsExactly(4L, 5L, 6L); } + @Test + void testClearLakeTableProgress(@TempDir Path tempDir) throws Exception { + LakeTableHelper lakeTableHelper = new LakeTableHelper(zookeeperClient, tempDir.toString()); + long tableId = 1L; + TablePath tablePath = TablePath.of("test_db", "clear_progress_test"); + zookeeperClient.registerTable(tablePath, createTableReg(tableId)); + + FsPath path1 = storeOffsetFile(lakeTableHelper, tablePath, tableId, 100L); + FsPath path2 = storeOffsetFile(lakeTableHelper, tablePath, tableId, 200L); + lakeTableHelper.registerLakeTableSnapshotV2( + tableId, + new LakeTable.LakeSnapshotMetadata(1L, path1, path1), + LakeCommitResult.KEEP_ALL_PREVIOUS); + lakeTableHelper.registerLakeTableSnapshotV2( + tableId, + new LakeTable.LakeSnapshotMetadata(2L, path2, path2), + LakeCommitResult.KEEP_ALL_PREVIOUS); + + assertThat(zookeeperClient.getLakeTable(tableId)).isPresent(); + assertThat(LocalFileSystem.getSharedInstance().exists(path1)).isTrue(); + assertThat(LocalFileSystem.getSharedInstance().exists(path2)).isTrue(); + + lakeTableHelper.clearLakeTableProgress(tableId); + + assertThat(zookeeperClient.getLakeTable(tableId)).isNotPresent(); + assertThat(LocalFileSystem.getSharedInstance().exists(path1)).isFalse(); + assertThat(LocalFileSystem.getSharedInstance().exists(path2)).isFalse(); + } + /** Helper to store offset files and return the FsPath. */ private FsPath storeOffsetFile( LakeTableHelper helper, TablePath path, long tableId, long offset) throws Exception { diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala index 0a9d9663f6..1e9168ac82 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala @@ -19,7 +19,7 @@ package org.apache.fluss.spark import org.apache.fluss.client.admin.Admin import org.apache.fluss.config.{Configuration => FlussConfiguration} -import org.apache.fluss.metadata.{TableInfo, TablePath} +import org.apache.fluss.metadata.{LakeTableUtil, TableInfo, TablePath} import org.apache.fluss.spark.catalog.{AbstractSparkTable, SupportsFlussPartitionManagement} import org.apache.fluss.spark.read.{FlussAppendScanBuilder, FlussLakeAppendScanBuilder, FlussLakeUpsertScanBuilder, FlussUpsertScanBuilder} import org.apache.fluss.spark.write.{FlussAppendWriteBuilder, FlussUpsertWriteBuilder} @@ -68,6 +68,10 @@ class SparkTable( flussConfig.get(SparkFlussConf.SCAN_START_UP_MODE)) .toUpperCase val isFullMode = startupMode == SparkFlussConf.StartUpMode.FULL.toString + if (isDataLakeEnabled && LakeTableUtil.hasCustomLakePath(tableInfo.getProperties)) { + throw new UnsupportedOperationException( + "Custom lake table path is not supported in Spark connector yet.") + } if (tableInfo.getPrimaryKeys.isEmpty) { if (isDataLakeEnabled && isFullMode) { new FlussLakeAppendScanBuilder(tablePath, tableInfo, options, flussConfig) diff --git a/pom.xml b/pom.xml index d39a91d9b3..d618702b43 100644 --- a/pom.xml +++ b/pom.xml @@ -105,6 +105,12 @@ 1.3.1 1.10.1 1.1.0 + 2.3.10 + 4.2.4 + 4.1.17 + 4.1.19 + 3.2.0-m3 + 10.14.2.0 1.3.0 From 1014a03795d731d29de6c1e690fb125f330e9e85 Mon Sep 17 00:00:00 2001 From: wzx140 Date: Fri, 12 Jun 2026 12:02:49 +0800 Subject: [PATCH 2/5] [lake/paimon] Exclude commons-io from hive exec --- fluss-lake/fluss-lake-paimon/pom.xml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/fluss-lake/fluss-lake-paimon/pom.xml b/fluss-lake/fluss-lake-paimon/pom.xml index 4f904f5777..5b836ea3df 100644 --- a/fluss-lake/fluss-lake-paimon/pom.xml +++ b/fluss-lake/fluss-lake-paimon/pom.xml @@ -198,6 +198,10 @@ org.slf4j slf4j-log4j12 + + commons-io + commons-io + From 1368c8f750ab6e438106934105a14272e149768d Mon Sep 17 00:00:00 2001 From: wzx140 Date: Fri, 12 Jun 2026 14:35:45 +0800 Subject: [PATCH 3/5] [lake/paimon] Address custom lake path review comments --- ...LakeEnabledTableCreateWithHiveCatalogITCase.java | 9 +++++++++ .../org/apache/fluss/server/zk/ZooKeeperClient.java | 2 +- .../fluss/server/zk/data/lake/LakeTableHelper.java | 2 +- .../scala/org/apache/fluss/spark/SparkTable.scala | 13 +++++++++---- 4 files changed, 20 insertions(+), 6 deletions(-) diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateWithHiveCatalogITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateWithHiveCatalogITCase.java index bbf0881e7d..2f1998be5c 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateWithHiveCatalogITCase.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateWithHiveCatalogITCase.java @@ -39,6 +39,7 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.options.Options; import org.apache.paimon.table.Table; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -74,6 +75,14 @@ class LakeEnabledTableCreateWithHiveCatalogITCase { private Connection conn; private Admin admin; + @AfterAll + static void afterAll() throws Exception { + if (paimonCatalog != null) { + paimonCatalog.close(); + paimonCatalog = null; + } + } + @BeforeEach protected void setup() { conn = ConnectionFactory.createConnection(FLUSS_CLUSTER_EXTENSION.getClientConfig()); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java index 6c18b1d937..6433ecd19b 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java @@ -1369,7 +1369,7 @@ public Optional getLakeTable(long tableId) throws Exception { public void deleteLakeTable(long tableId) throws Exception { String zkPath = LakeTableZNode.path(tableId); try { - zkClient.delete().forPath(zkPath); + zkClient.delete().deletingChildrenIfNeeded().forPath(zkPath); } catch (KeeperException.NoNodeException ignored) { // Ignore if the lake table progress has not been committed yet. } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java index 3013d63f92..9fb1a960f1 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java @@ -86,7 +86,6 @@ public void clearLakeTableProgress(long tableId) throws Exception { return; } - zkClient.deleteLakeTable(tableId); LakeTable lakeTable = optLakeTable.get(); List lakeSnapshotMetadatas = lakeTable.getLakeSnapshotMetadatas(); @@ -95,6 +94,7 @@ public void clearLakeTableProgress(long tableId) throws Exception { lakeSnapshotMetadata.discard(); } } + zkClient.deleteLakeTable(tableId); } /** diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala index 1e9168ac82..c60fe2c63f 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala @@ -68,18 +68,23 @@ class SparkTable( flussConfig.get(SparkFlussConf.SCAN_START_UP_MODE)) .toUpperCase val isFullMode = startupMode == SparkFlussConf.StartUpMode.FULL.toString - if (isDataLakeEnabled && LakeTableUtil.hasCustomLakePath(tableInfo.getProperties)) { - throw new UnsupportedOperationException( - "Custom lake table path is not supported in Spark connector yet.") - } + val hasCustomLakePath = LakeTableUtil.hasCustomLakePath(tableInfo.getProperties) if (tableInfo.getPrimaryKeys.isEmpty) { if (isDataLakeEnabled && isFullMode) { + if (hasCustomLakePath) { + throw new UnsupportedOperationException( + "Custom lake table path is not supported for Spark lake reads yet.") + } new FlussLakeAppendScanBuilder(tablePath, tableInfo, options, flussConfig) } else { new FlussAppendScanBuilder(tablePath, tableInfo, options, flussConfig) } } else { if (isDataLakeEnabled) { + if (hasCustomLakePath) { + throw new UnsupportedOperationException( + "Custom lake table path is not supported for Spark lake reads yet.") + } new FlussLakeUpsertScanBuilder(tablePath, tableInfo, options, flussConfig) } else { new FlussUpsertScanBuilder(tablePath, tableInfo, options, flussConfig) From 29616e28ec2000ffb4c2e26f37005099eb6ad737 Mon Sep 17 00:00:00 2001 From: wzx140 Date: Fri, 12 Jun 2026 15:16:12 +0800 Subject: [PATCH 4/5] [lake/paimon] Exclude Hive Janino test dependencies --- fluss-lake/fluss-lake-paimon/pom.xml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/fluss-lake/fluss-lake-paimon/pom.xml b/fluss-lake/fluss-lake-paimon/pom.xml index 5b836ea3df..909dca2dc3 100644 --- a/fluss-lake/fluss-lake-paimon/pom.xml +++ b/fluss-lake/fluss-lake-paimon/pom.xml @@ -202,6 +202,14 @@ commons-io commons-io + + org.codehaus.janino + commons-compiler + + + org.codehaus.janino + janino + From 1e54ae341f62a11bae5a541f67304e78897f886e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AD=90=E6=97=8B?= Date: Wed, 24 Jun 2026 20:24:30 +0800 Subject: [PATCH 5/5] fix --- fluss-lake/fluss-lake-paimon/pom.xml | 66 +----- .../lake/paimon/utils/PaimonConversions.java | 1 + ...abledTableCreateWithHiveCatalogITCase.java | 207 ------------------ .../server/coordinator/MetadataManager.java | 5 +- .../utils/TableDescriptorValidation.java | 14 -- pom.xml | 6 - 6 files changed, 3 insertions(+), 296 deletions(-) delete mode 100644 fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateWithHiveCatalogITCase.java diff --git a/fluss-lake/fluss-lake-paimon/pom.xml b/fluss-lake/fluss-lake-paimon/pom.xml index 909dca2dc3..a1e6d1abca 100644 --- a/fluss-lake/fluss-lake-paimon/pom.xml +++ b/fluss-lake/fluss-lake-paimon/pom.xml @@ -184,70 +184,6 @@ test - - org.apache.hive - hive-exec - ${hive.version} - test - - - log4j - log4j - - - org.slf4j - slf4j-log4j12 - - - commons-io - commons-io - - - org.codehaus.janino - commons-compiler - - - org.codehaus.janino - janino - - - - - - org.datanucleus - datanucleus-api-jdo - ${datanucleus-api-jdo.version} - test - - - - org.datanucleus - datanucleus-core - ${datanucleus-core.version} - test - - - - org.datanucleus - datanucleus-rdbms - ${datanucleus-rdbms.version} - test - - - - org.datanucleus - javax.jdo - ${javax-jdo.version} - test - - - - org.apache.derby - derby - ${derby.version} - test - - org.apache.fluss fluss-common @@ -301,4 +237,4 @@ - + \ No newline at end of file diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java index 87ce7b0439..94166e2d05 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java @@ -69,6 +69,7 @@ public class PaimonConversions { static { PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.BUCKET.key()); PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.BUCKET_KEY.key()); + PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.PATH.key()); PAIMON_UNSETTABLE_OPTIONS.add(PARTITION_GENERATE_LEGACY_NAME_OPTION_KEY); } diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateWithHiveCatalogITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateWithHiveCatalogITCase.java deleted file mode 100644 index 2f1998be5c..0000000000 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateWithHiveCatalogITCase.java +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.fluss.lake.paimon; - -import org.apache.fluss.client.Connection; -import org.apache.fluss.client.ConnectionFactory; -import org.apache.fluss.client.admin.Admin; -import org.apache.fluss.config.ConfigOptions; -import org.apache.fluss.config.Configuration; -import org.apache.fluss.exception.FlussRuntimeException; -import org.apache.fluss.exception.InvalidAlterTableException; -import org.apache.fluss.metadata.Schema; -import org.apache.fluss.metadata.TableChange; -import org.apache.fluss.metadata.TableDescriptor; -import org.apache.fluss.metadata.TableInfo; -import org.apache.fluss.metadata.TablePath; -import org.apache.fluss.server.testutils.FlussClusterExtension; -import org.apache.fluss.types.DataTypes; - -import org.apache.paimon.catalog.AbstractCatalog; -import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.catalog.CatalogContext; -import org.apache.paimon.catalog.CatalogFactory; -import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.options.Options; -import org.apache.paimon.table.Table; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; - -import java.net.URI; -import java.nio.file.Files; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -import static org.apache.fluss.server.utils.LakeStorageUtils.extractLakeProperties; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; - -/** ITCase for create lake enabled table with Paimon Hive catalog. */ -class LakeEnabledTableCreateWithHiveCatalogITCase { - - @RegisterExtension - public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION = - FlussClusterExtension.builder() - .setNumOfTabletServers(3) - .setClusterConf(initConfig()) - .build(); - - private static final String DATABASE = "fluss"; - private static final int BUCKET_NUM = 3; - - private static Catalog paimonCatalog; - private static String warehousePath; - private static String customTablePath; - - private Connection conn; - private Admin admin; - - @AfterAll - static void afterAll() throws Exception { - if (paimonCatalog != null) { - paimonCatalog.close(); - paimonCatalog = null; - } - } - - @BeforeEach - protected void setup() { - conn = ConnectionFactory.createConnection(FLUSS_CLUSTER_EXTENSION.getClientConfig()); - admin = conn.getAdmin(); - } - - @AfterEach - protected void teardown() throws Exception { - if (admin != null) { - admin.close(); - admin = null; - } - - if (conn != null) { - conn.close(); - conn = null; - } - } - - private static Configuration initConfig() { - Configuration conf = new Configuration(); - conf.setString("datalake.format", "paimon"); - conf.setString("datalake.paimon.metastore", "hive"); - conf.setString("datalake.paimon.cache-enabled", "false"); - try { - java.nio.file.Path baseDir = Files.createTempDirectory("fluss-testing-hms-paimon"); - warehousePath = baseDir.resolve("warehouse").toString(); - customTablePath = baseDir.resolve("custom_lake_table_path").toUri().toString(); - conf.setString( - "datalake.paimon.javax.jdo.option.ConnectionURL", - "jdbc:derby:memory:" + baseDir.resolve("metastore_db") + ";create=true"); - } catch (Exception e) { - throw new FlussRuntimeException("Failed to create hive catalog test path", e); - } - conf.setString("datalake.paimon.warehouse", warehousePath); - conf.setString( - "datalake.paimon.javax.jdo.option.ConnectionDriverName", - "org.apache.derby.jdbc.EmbeddedDriver"); - conf.setString("datalake.paimon.datanucleus.schema.autoCreateAll", "true"); - conf.setString("datalake.paimon.hive.metastore.schema.verification", "false"); - - paimonCatalog = - CatalogFactory.createCatalog( - CatalogContext.create(Options.fromMap(extractLakeProperties(conf)))); - - return conf; - } - - @Test - void testAlterPaimonPathRequiresLakePathOptions() throws Exception { - TablePath tablePath = TablePath.of(DATABASE, "lake_paimon_path_without_mapping"); - TableDescriptor tableDescriptor = - TableDescriptor.builder() - .schema( - Schema.newBuilder() - .column("c1", DataTypes.INT()) - .column("c2", DataTypes.STRING()) - .build()) - .distributedBy(BUCKET_NUM, "c1", "c2") - .build(); - admin.createTable(tablePath, tableDescriptor, false).get(); - - assertThatThrownBy( - () -> - admin.alterTable( - tablePath, - Collections.singletonList( - TableChange.set( - "paimon.path", customTablePath)), - false) - .get()) - .cause() - .isInstanceOf(InvalidAlterTableException.class) - .hasMessageContaining( - "'paimon.path' can only be altered together with lake table path options"); - - List paimonPathWithLakePath = - Arrays.asList( - TableChange.set("paimon.path", customTablePath), - TableChange.set( - ConfigOptions.TABLE_DATALAKE_TABLE_NAME.key(), "hms_lake_table")); - admin.alterTable(tablePath, paimonPathWithLakePath, false).get(); - - TableInfo tableInfo = admin.getTableInfo(tablePath).get(); - assertThat(tableInfo.getProperties().toMap()) - .containsEntry(ConfigOptions.TABLE_DATALAKE_TABLE_NAME.key(), "hms_lake_table"); - assertThat(tableInfo.toTableDescriptor().getCustomProperties()) - .containsEntry("paimon.path", customTablePath); - assertThatThrownBy( - () -> paimonCatalog.getTable(Identifier.create(DATABASE, "hms_lake_table"))) - .isInstanceOf(Catalog.TableNotExistException.class); - - admin.alterTable( - tablePath, - Collections.singletonList( - TableChange.set( - ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")), - false) - .get(); - - tableInfo = admin.getTableInfo(tablePath).get(); - assertThat(tableInfo.getProperties().toMap()) - .containsEntry(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true") - .containsEntry(ConfigOptions.TABLE_DATALAKE_TABLE_NAME.key(), "hms_lake_table"); - assertThat(tableInfo.toTableDescriptor().getCustomProperties()) - .containsEntry("paimon.path", customTablePath); - - Table paimonTable = paimonCatalog.getTable(Identifier.create(DATABASE, "hms_lake_table")); - assertThat(paimonTable.name()).isEqualTo("hms_lake_table"); - assertThat( - URI.create( - ((AbstractCatalog) paimonCatalog) - .getTableLocation( - Identifier.create( - DATABASE, "hms_lake_table")) - .toString()) - .getPath()) - .isEqualTo(URI.create(customTablePath).getPath()); - assertThat(URI.create(paimonTable.options().get("path")).getPath()) - .isEqualTo(URI.create(customTablePath).getPath()); - } -} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java index 5658e3cc04..83c8e6cf0f 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java @@ -518,10 +518,7 @@ public void alterTableProperties( TableInfo tableInfo = tableReg.toTableInfo(tablePath, schemaInfo); // validate the changes - validateAlterTableProperties( - tableInfo, - tablePropertyChanges.tableKeysToChange(), - tablePropertyChanges.customKeysToChange()); + validateAlterTableProperties(tableInfo, tablePropertyChanges.tableKeysToChange()); TableDescriptor tableDescriptor = tableInfo.toTableDescriptor(); TableDescriptor newDescriptor = diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java index 84bc278a3b..f7f57a1196 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java @@ -69,8 +69,6 @@ /** Validator of {@link TableDescriptor}. */ public class TableDescriptorValidation { - private static final String PAIMON_PATH_KEY = "paimon.path"; - private static final Set SYSTEM_COLUMNS = Collections.unmodifiableSet( new LinkedHashSet<>( @@ -178,11 +176,6 @@ private static void checkCustomLakePathSupported( public static void validateAlterTableProperties( TableInfo currentTable, Set tableKeysToChange) { - validateAlterTableProperties(currentTable, tableKeysToChange, Collections.emptySet()); - } - - public static void validateAlterTableProperties( - TableInfo currentTable, Set tableKeysToChange, Set customKeysToChange) { TableConfig currentConfig = currentTable.getTableConfig(); List unsupportedKeys = @@ -220,13 +213,6 @@ public static void validateAlterTableProperties( .collect(Collectors.joining(", ")))); } - if (customKeysToChange.contains(PAIMON_PATH_KEY) && lakePathKeys.isEmpty()) { - throw new InvalidAlterTableException( - String.format( - "'%s' can only be altered together with lake table path options.", - PAIMON_PATH_KEY)); - } - if (!currentConfig.getDataLakeFormat().isPresent()) { List datalakeKeys = tableKeysToChange.stream() diff --git a/pom.xml b/pom.xml index d618702b43..d39a91d9b3 100644 --- a/pom.xml +++ b/pom.xml @@ -105,12 +105,6 @@ 1.3.1 1.10.1 1.1.0 - 2.3.10 - 4.2.4 - 4.1.17 - 4.1.19 - 3.2.0-m3 - 10.14.2.0 1.3.0