From b2feda0299b5831116e7c3748853556672db4710 Mon Sep 17 00:00:00 2001 From: Toby Cole Date: Wed, 18 Mar 2026 10:50:49 -0700 Subject: [PATCH 1/4] [core] Support custom table paths in JdbcCatalog Enable users to specify custom storage locations for individual tables via CoreOptions.PATH, replicating the pattern already used by HiveCatalog. The catalog-level warehouse becomes a default, but individual tables can live at arbitrary paths persisted in paimon_table_properties. Key changes: - JdbcUtils: add getTableProperty() to fetch a single table property - JdbcCatalog: override allowCustomTablePath(), getTableLocation() - createTableImpl: use initialTableLocation(), store path for custom tables - dropTableImpl: skip filesystem deletion for custom-path tables - renameTableImpl: skip filesystem rename for custom-path tables - alterTableImpl: preserve custom path across property refresh - dropDatabaseImpl/repairTable: always manage table properties Co-Authored-By: Claude Opus 4.6 --- .../org/apache/paimon/jdbc/JdbcCatalog.java | 193 ++++++++++++---- .../org/apache/paimon/jdbc/JdbcUtils.java | 51 +++++ .../apache/paimon/jdbc/JdbcCatalogTest.java | 212 ++++++++++++++++++ 3 files changed, 407 insertions(+), 49 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index f765e5f88db5..bb603ae432ea 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -232,14 +232,8 @@ protected void dropDatabaseImpl(String name) { execute(connections, JdbcUtils.DELETE_TABLES_SQL, catalogKey, name); // Delete properties from paimon_database_properties execute(connections, JdbcUtils.DELETE_ALL_DATABASE_PROPERTIES_SQL, catalogKey, name); - // Delete table properties from paimon_table_properties - if (syncTableProperties()) { - execute( - connections, - JdbcUtils.DELETE_ALL_TABLE_PROPERTIES_FOR_DB_SQL, - catalogKey, - name); - } + // Always delete table properties (needed for custom path entries) + execute(connections, JdbcUtils.DELETE_ALL_TABLE_PROPERTIES_FOR_DB_SQL, catalogKey, name); } @Override @@ -293,6 +287,10 @@ protected List listTablesImpl(String databaseName) { @Override protected void dropTableImpl(Identifier identifier, List externalPaths) { try { + // Retrieve table location and custom-path flag BEFORE deleting JDBC metadata + Path path = getTableLocation(identifier); + boolean customPath = isCustomTablePath(identifier); + int deletedRecords = execute( connections, @@ -305,15 +303,20 @@ protected void dropTableImpl(Identifier identifier, List externalPaths) { LOG.info("Skipping drop, table does not exist: {}", identifier); return; } - if (syncTableProperties()) { - execute( - connections, - JdbcUtils.DELETE_ALL_TABLE_PROPERTIES_SQL, - catalogKey, - identifier.getDatabaseName(), - identifier.getTableName()); + + // Always delete table properties (needed for custom path entries) + execute( + connections, + JdbcUtils.DELETE_ALL_TABLE_PROPERTIES_SQL, + catalogKey, + identifier.getDatabaseName(), + identifier.getTableName()); + + // If custom path: skip filesystem deletion (external table keeps its data) + if (customPath) { + return; } - Path path = getTableLocation(identifier); + try { if (fileIO.exists(path)) { fileIO.deleteDirectoryQuietly(path); @@ -334,12 +337,16 @@ protected void dropTableImpl(Identifier identifier, List externalPaths) { @Override protected void createTableImpl(Identifier identifier, Schema schema) { try { - // create table file - SchemaManager schemaManager = getSchemaManager(identifier); + // Determine table location before table exists in JDBC + Path tableLocation = initialTableLocation(schema.options(), identifier); + boolean externalTable = schema.options().containsKey(CoreOptions.PATH.key()); + + // create table file using the determined location + SchemaManager schemaManager = new SchemaManager(fileIO, tableLocation); TableSchema tableSchema = - runWithLock(identifier, () -> schemaManager.createTable(schema)); - // Update schema metadata - Path path = getTableLocation(identifier); + runWithLock(identifier, () -> schemaManager.createTable(schema, externalTable)); + + // Insert table record into paimon_tables if (JdbcUtils.insertTable( connections, catalogKey, @@ -348,22 +355,40 @@ protected void createTableImpl(Identifier identifier, Schema schema) { LOG.debug("Successfully committed to new table: {}", identifier); } else { try { - fileIO.deleteDirectoryQuietly(path); + if (!externalTable) { + fileIO.deleteDirectoryQuietly(tableLocation); + } } catch (Exception ee) { - LOG.error("Delete directory[{}] fail for table {}", path, identifier, ee); + LOG.error( + "Delete directory[{}] fail for table {}", + tableLocation, + identifier, + ee); } throw new RuntimeException( String.format( "Failed to create table %s in catalog %s", identifier.getFullName(), catalogKey)); } + + // Always store path property for custom-path tables + Map propsToStore = new HashMap<>(); + if (externalTable) { + propsToStore.put(CoreOptions.PATH.key(), tableLocation.toString()); + } if (syncTableProperties()) { + Map tableProps = collectTableProperties(tableSchema); + // Avoid duplicate path entry + tableProps.remove(CoreOptions.PATH.key()); + propsToStore.putAll(tableProps); + } + if (!propsToStore.isEmpty()) { JdbcUtils.insertTableProperties( connections, catalogKey, identifier.getDatabaseName(), identifier.getTableName(), - collectTableProperties(tableSchema)); + propsToStore); } } catch (Exception e) { throw new RuntimeException("Failed to create table " + identifier.getFullName(), e); @@ -373,24 +398,32 @@ protected void createTableImpl(Identifier identifier, Schema schema) { @Override protected void renameTableImpl(Identifier fromTable, Identifier toTable) { try { + // Check custom path BEFORE renaming metadata + boolean customPath = isCustomTablePath(fromTable); + Path fromPath = getTableLocation(fromTable); + // update table metadata info updateTable(connections, catalogKey, fromTable, toTable); - if (syncTableProperties()) { - execute( - connections, - JdbcUtils.RENAME_TABLE_PROPERTIES_SQL, - toTable.getDatabaseName(), - toTable.getObjectName(), - catalogKey, - fromTable.getDatabaseName(), - fromTable.getObjectName()); + + // Always update table properties (needed for custom path entries) + execute( + connections, + JdbcUtils.RENAME_TABLE_PROPERTIES_SQL, + toTable.getDatabaseName(), + toTable.getObjectName(), + catalogKey, + fromTable.getDatabaseName(), + fromTable.getObjectName()); + + // If custom path: skip filesystem rename (data stays at same location) + if (customPath) { + return; } - Path fromPath = getTableLocation(fromTable); if (!new SchemaManager(fileIO, fromPath).listAllIds().isEmpty()) { // Rename the file system's table directory. Maintain consistency between tables in // the file system and tables in the Hive Metastore. - Path toPath = getTableLocation(toTable); + Path toPath = super.getTableLocation(toTable); try { fileIO.rename(fromPath, toPath); } catch (IOException e) { @@ -414,6 +447,15 @@ protected void alterTableImpl(Identifier identifier, List changes) try { runWithLock(identifier, () -> schemaManager.commitChanges(changes)); if (syncTableProperties()) { + // Save custom path before DELETE_ALL so we can re-insert it + Optional customPath = + JdbcUtils.getTableProperty( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getTableName(), + CoreOptions.PATH.key()); + TableSchema updatedSchema = schemaManager.latest().get(); execute( connections, @@ -421,12 +463,18 @@ protected void alterTableImpl(Identifier identifier, List changes) catalogKey, identifier.getDatabaseName(), identifier.getTableName()); - JdbcUtils.insertTableProperties( - connections, - catalogKey, - identifier.getDatabaseName(), - identifier.getTableName(), - collectTableProperties(updatedSchema)); + + Map propsToStore = collectTableProperties(updatedSchema); + // Re-insert custom path if it was stored + customPath.ifPresent(p -> propsToStore.put(CoreOptions.PATH.key(), p)); + if (!propsToStore.isEmpty()) { + JdbcUtils.insertTableProperties( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getTableName(), + propsToStore); + } } } catch (TableNotExistException | ColumnAlreadyExistException @@ -451,6 +499,23 @@ protected TableSchema loadTableSchema(Identifier identifier) throws TableNotExis () -> new RuntimeException("There is no paimon table in " + tableLocation)); } + @Override + protected boolean allowCustomTablePath() { + return true; + } + + @Override + public Path getTableLocation(Identifier identifier) { + Optional storedPath = + JdbcUtils.getTableProperty( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getTableName(), + CoreOptions.PATH.key()); + return storedPath.map(Path::new).orElse(super.getTableLocation(identifier)); + } + @Override public boolean caseSensitive() { return false; @@ -550,20 +615,33 @@ public void repairTable(Identifier identifier) throws TableNotExistException { LOG.error("Failed to repair table: {}", identifier); } } + + // Always clean and re-insert table properties during repair + execute( + connections, + JdbcUtils.DELETE_ALL_TABLE_PROPERTIES_SQL, + catalogKey, + identifier.getDatabaseName(), + identifier.getTableName()); + + Map propsToStore = new HashMap<>(); + // Check if the table has a custom path (path from schema != default location) + Path defaultLocation = super.getTableLocation(identifier); + if (!tableLocation.equals(defaultLocation)) { + propsToStore.put(CoreOptions.PATH.key(), tableLocation.toString()); + } if (syncTableProperties()) { - // Delete existing properties and reinsert from filesystem schema - execute( - connections, - JdbcUtils.DELETE_ALL_TABLE_PROPERTIES_SQL, - catalogKey, - identifier.getDatabaseName(), - identifier.getTableName()); + Map tableProps = collectTableProperties(tableSchema); + tableProps.remove(CoreOptions.PATH.key()); + propsToStore.putAll(tableProps); + } + if (!propsToStore.isEmpty()) { JdbcUtils.insertTableProperties( connections, catalogKey, identifier.getDatabaseName(), identifier.getTableName(), - collectTableProperties(tableSchema)); + propsToStore); } } @@ -599,6 +677,23 @@ private Map collectTableProperties(TableSchema tableSchema) { return properties; } + private Path initialTableLocation(Map tableOptions, Identifier identifier) { + if (tableOptions.containsKey(CoreOptions.PATH.key())) { + return new Path(tableOptions.get(CoreOptions.PATH.key())); + } + return super.getTableLocation(identifier); + } + + private boolean isCustomTablePath(Identifier identifier) { + return JdbcUtils.getTableProperty( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getTableName(), + CoreOptions.PATH.key()) + .isPresent(); + } + private SchemaManager getSchemaManager(Identifier identifier) { return new SchemaManager(fileIO, getTableLocation(identifier)); } diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java index 52cf4224f2f7..805aa43d85a4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java @@ -32,6 +32,7 @@ import java.sql.SQLIntegrityConstraintViolationException; import java.util.Collections; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.function.Consumer; @@ -325,6 +326,21 @@ public class JdbcUtils { + TABLE_NAME + " = ? "; + static final String GET_TABLE_PROPERTY_SQL = + "SELECT " + + TABLE_PROPERTY_VALUE + + " FROM " + + TABLE_PROPERTIES_TABLE_NAME + + " WHERE " + + CATALOG_KEY + + " = ? AND " + + TABLE_DATABASE + + " = ? AND " + + TABLE_NAME + + " = ? AND " + + TABLE_PROPERTY_KEY + + " = ?"; + // Distributed locks table static final String DISTRIBUTED_LOCKS_TABLE_NAME = "paimon_distributed_locks"; static final String LOCK_ID = "lock_id"; @@ -554,6 +570,41 @@ public static boolean insertTableProperties( insertedRecords, properties.size())); } + @SuppressWarnings("checkstyle:NestedTryDepth") + public static Optional getTableProperty( + JdbcClientPool connections, + String storeKey, + String databaseName, + String tableName, + String propertyKey) { + try { + return connections.run( + conn -> { + try (PreparedStatement ps = conn.prepareStatement(GET_TABLE_PROPERTY_SQL)) { + ps.setString(1, storeKey); + ps.setString(2, databaseName); + ps.setString(3, tableName); + ps.setString(4, propertyKey); + try (ResultSet rs = ps.executeQuery()) { + if (rs.next()) { + return Optional.of(rs.getString(TABLE_PROPERTY_VALUE)); + } + } + } + return Optional.empty(); + }); + } catch (SQLException e) { + throw new RuntimeException( + String.format( + "Failed to get table property '%s' for %s.%s", + propertyKey, databaseName, tableName), + e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in SQL query", e); + } + } + private static String insertTablePropertiesStatement(int size) { StringBuilder sqlStatement = new StringBuilder(JdbcUtils.INSERT_TABLE_PROPERTIES_SQL); for (int i = 0; i < size; i++) { diff --git a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java index fd3c6fdc5950..7e2ed08d10bd 100644 --- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java @@ -18,14 +18,17 @@ package org.apache.paimon.jdbc; +import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogTestBase; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.fs.Path; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.Table; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; @@ -625,4 +628,213 @@ public void testInsertTableUtility() throws Exception { .isInstanceOf(RuntimeException.class) .hasMessageContaining("Failed to insert table"); } + + private Schema schemaWithCustomPath(String customPath) { + Map options = new HashMap<>(); + options.put(CoreOptions.PATH.key(), customPath); + return new Schema( + Lists.newArrayList( + new DataField(0, "pk", DataTypes.INT()), + new DataField(1, "col1", DataTypes.STRING())), + Collections.emptyList(), + Collections.emptyList(), + options, + ""); + } + + @Test + public void testCreateTableWithCustomPath() throws Exception { + JdbcCatalog jdbcCatalog = initCatalogWithSync(true); + jdbcCatalog.createDatabase("test_db", false); + + String customDir = warehouse + "/custom_location/my_table"; + Schema schema = schemaWithCustomPath(customDir); + Identifier identifier = Identifier.create("test_db", "custom_table"); + + jdbcCatalog.createTable(identifier, schema, false); + + // Verify schema exists at custom location + Path customPath = new Path(customDir); + SchemaManager sm = new SchemaManager(fileIO, customPath); + assertThat(sm.listAllIds()).isNotEmpty(); + + // Verify getTableLocation returns the custom path + assertThat(jdbcCatalog.getTableLocation(identifier)).isEqualTo(customPath); + + // Verify path is stored in JDBC table properties + Map storedProps = + fetchTableProperties(jdbcCatalog, "test_db", "custom_table"); + assertThat(storedProps).containsEntry(CoreOptions.PATH.key(), customPath.toString()); + + // Verify table is loadable + assertDoesNotThrow(() -> jdbcCatalog.getTable(identifier)); + } + + @Test + public void testCreateTableWithCustomPathSyncDisabled() throws Exception { + JdbcCatalog jdbcCatalog = initCatalogWithSync(false); + jdbcCatalog.createDatabase("test_db", false); + + String customDir = warehouse + "/custom_nosync/my_table"; + Schema schema = schemaWithCustomPath(customDir); + Identifier identifier = Identifier.create("test_db", "nosync_table"); + + jdbcCatalog.createTable(identifier, schema, false); + + // Path should still be stored even when sync is disabled + Map storedProps = + fetchTableProperties(jdbcCatalog, "test_db", "nosync_table"); + assertThat(storedProps) + .containsEntry(CoreOptions.PATH.key(), new Path(customDir).toString()); + + // Other properties should NOT be stored (sync disabled) + assertThat(storedProps).hasSize(1); + } + + @Test + public void testDropTableWithCustomPath() throws Exception { + JdbcCatalog jdbcCatalog = initCatalogWithSync(true); + jdbcCatalog.createDatabase("test_db", false); + + String customDir = warehouse + "/custom_drop/my_table"; + Schema schema = schemaWithCustomPath(customDir); + Identifier identifier = Identifier.create("test_db", "drop_custom"); + + jdbcCatalog.createTable(identifier, schema, false); + + // Verify data exists at custom location + Path customPath = new Path(customDir); + assertThat(fileIO.exists(customPath)).isTrue(); + + // Drop the table + jdbcCatalog.dropTable(identifier, false); + + // Verify JDBC metadata is cleaned up + assertThatThrownBy(() -> jdbcCatalog.getTable(identifier)) + .isInstanceOf(Catalog.TableNotExistException.class); + Map storedProps = + fetchTableProperties(jdbcCatalog, "test_db", "drop_custom"); + assertThat(storedProps).isEmpty(); + + // Verify data is NOT deleted (external table keeps its data) + assertThat(fileIO.exists(customPath)).isTrue(); + } + + @Test + public void testDropTableWithDefaultPath() throws Exception { + JdbcCatalog jdbcCatalog = initCatalogWithSync(true); + jdbcCatalog.createDatabase("test_db", false); + + Identifier identifier = Identifier.create("test_db", "drop_default"); + jdbcCatalog.createTable(identifier, DEFAULT_TABLE_SCHEMA, false); + + Path tablePath = jdbcCatalog.getTableLocation(identifier); + assertThat(fileIO.exists(tablePath)).isTrue(); + + jdbcCatalog.dropTable(identifier, false); + + // Data SHOULD be deleted for default-path tables + assertThat(fileIO.exists(tablePath)).isFalse(); + } + + @Test + public void testRenameTableWithCustomPath() throws Exception { + JdbcCatalog jdbcCatalog = initCatalogWithSync(true); + jdbcCatalog.createDatabase("test_db", false); + + String customDir = warehouse + "/custom_rename/my_table"; + Schema schema = schemaWithCustomPath(customDir); + Identifier fromTable = Identifier.create("test_db", "rename_from"); + + jdbcCatalog.createTable(fromTable, schema, false); + + Identifier toTable = Identifier.create("test_db", "rename_to"); + jdbcCatalog.renameTable(fromTable, toTable, false); + + // Verify old table is gone + assertThatThrownBy(() -> jdbcCatalog.getTable(fromTable)) + .isInstanceOf(Catalog.TableNotExistException.class); + + // Verify new table is accessible and still points to the custom path + assertThat(jdbcCatalog.getTableLocation(toTable)).isEqualTo(new Path(customDir)); + + // Verify the path property was moved + Map storedProps = fetchTableProperties(jdbcCatalog, "test_db", "rename_to"); + assertThat(storedProps) + .containsEntry(CoreOptions.PATH.key(), new Path(customDir).toString()); + + // Verify old name has no properties + Map oldProps = fetchTableProperties(jdbcCatalog, "test_db", "rename_from"); + assertThat(oldProps).isEmpty(); + } + + @Test + public void testAlterTableWithCustomPath() throws Exception { + JdbcCatalog jdbcCatalog = initCatalogWithSync(true); + jdbcCatalog.createDatabase("test_db", false); + + String customDir = warehouse + "/custom_alter/my_table"; + Map options = new HashMap<>(); + options.put(CoreOptions.PATH.key(), customDir); + options.put("bucket", "4"); + Schema schema = + new Schema( + Lists.newArrayList( + new DataField(0, "pk", DataTypes.INT()), + new DataField(1, "col1", DataTypes.STRING())), + Collections.emptyList(), + Lists.newArrayList("pk"), + options, + ""); + + Identifier identifier = Identifier.create("test_db", "alter_custom"); + jdbcCatalog.createTable(identifier, schema, false); + + // Alter: add a new option + jdbcCatalog.alterTable( + identifier, + Lists.newArrayList(SchemaChange.setOption("file.format", "orc")), + false); + + // Verify path is preserved after alter + Map storedProps = + fetchTableProperties(jdbcCatalog, "test_db", "alter_custom"); + assertThat(storedProps) + .containsEntry(CoreOptions.PATH.key(), new Path(customDir).toString()); + assertThat(storedProps).containsEntry("file.format", "orc"); + assertThat(storedProps).containsEntry("bucket", "4"); + + // Verify getTableLocation still returns custom path + assertThat(jdbcCatalog.getTableLocation(identifier)).isEqualTo(new Path(customDir)); + } + + @Test + public void testLoadTableSchemaWithCustomPath() throws Exception { + JdbcCatalog jdbcCatalog = initCatalogWithSync(false); + jdbcCatalog.createDatabase("test_db", false); + + String customDir = warehouse + "/custom_load/my_table"; + Schema schema = schemaWithCustomPath(customDir); + Identifier identifier = Identifier.create("test_db", "load_custom"); + + jdbcCatalog.createTable(identifier, schema, false); + + // Verify schema loads from custom location + Table table = jdbcCatalog.getTable(identifier); + assertThat(table).isNotNull(); + assertThat(table.name()).isEqualTo("load_custom"); + } + + @Test + public void testGetTableLocationFallback() throws Exception { + JdbcCatalog jdbcCatalog = (JdbcCatalog) catalog; + jdbcCatalog.createDatabase("test_db", false); + + Identifier identifier = Identifier.create("test_db", "default_table"); + jdbcCatalog.createTable(identifier, DEFAULT_TABLE_SCHEMA, false); + + // Verify getTableLocation returns default when no stored path + Path expected = new Path(new Path(warehouse, "test_db.db"), "default_table"); + assertThat(jdbcCatalog.getTableLocation(identifier)).isEqualTo(expected); + } } From 5796d5c465d00625952d2419fc7e7d5f49d43f49 Mon Sep 17 00:00:00 2001 From: Toby Cole Date: Wed, 18 Mar 2026 11:09:18 -0700 Subject: [PATCH 2/4] [core] Gate custom table path behind syncTableProperties Custom table path support is now only available when syncTableProperties is enabled, preserving the existing default behavior when sync is off. Co-Authored-By: Claude Opus 4.6 --- .../org/apache/paimon/jdbc/JdbcCatalog.java | 129 +++++++++--------- .../apache/paimon/jdbc/JdbcCatalogTest.java | 15 +- 2 files changed, 71 insertions(+), 73 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index bb603ae432ea..b7ad27ebfb50 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -232,8 +232,14 @@ protected void dropDatabaseImpl(String name) { execute(connections, JdbcUtils.DELETE_TABLES_SQL, catalogKey, name); // Delete properties from paimon_database_properties execute(connections, JdbcUtils.DELETE_ALL_DATABASE_PROPERTIES_SQL, catalogKey, name); - // Always delete table properties (needed for custom path entries) - execute(connections, JdbcUtils.DELETE_ALL_TABLE_PROPERTIES_FOR_DB_SQL, catalogKey, name); + // Delete table properties from paimon_table_properties + if (syncTableProperties()) { + execute( + connections, + JdbcUtils.DELETE_ALL_TABLE_PROPERTIES_FOR_DB_SQL, + catalogKey, + name); + } } @Override @@ -289,7 +295,7 @@ protected void dropTableImpl(Identifier identifier, List externalPaths) { try { // Retrieve table location and custom-path flag BEFORE deleting JDBC metadata Path path = getTableLocation(identifier); - boolean customPath = isCustomTablePath(identifier); + boolean customPath = syncTableProperties() && isCustomTablePath(identifier); int deletedRecords = execute( @@ -303,14 +309,14 @@ protected void dropTableImpl(Identifier identifier, List externalPaths) { LOG.info("Skipping drop, table does not exist: {}", identifier); return; } - - // Always delete table properties (needed for custom path entries) - execute( - connections, - JdbcUtils.DELETE_ALL_TABLE_PROPERTIES_SQL, - catalogKey, - identifier.getDatabaseName(), - identifier.getTableName()); + if (syncTableProperties()) { + execute( + connections, + JdbcUtils.DELETE_ALL_TABLE_PROPERTIES_SQL, + catalogKey, + identifier.getDatabaseName(), + identifier.getTableName()); + } // If custom path: skip filesystem deletion (external table keeps its data) if (customPath) { @@ -339,7 +345,8 @@ protected void createTableImpl(Identifier identifier, Schema schema) { try { // Determine table location before table exists in JDBC Path tableLocation = initialTableLocation(schema.options(), identifier); - boolean externalTable = schema.options().containsKey(CoreOptions.PATH.key()); + boolean externalTable = + syncTableProperties() && schema.options().containsKey(CoreOptions.PATH.key()); // create table file using the determined location SchemaManager schemaManager = new SchemaManager(fileIO, tableLocation); @@ -371,18 +378,14 @@ protected void createTableImpl(Identifier identifier, Schema schema) { identifier.getFullName(), catalogKey)); } - // Always store path property for custom-path tables - Map propsToStore = new HashMap<>(); - if (externalTable) { - propsToStore.put(CoreOptions.PATH.key(), tableLocation.toString()); - } if (syncTableProperties()) { - Map tableProps = collectTableProperties(tableSchema); - // Avoid duplicate path entry - tableProps.remove(CoreOptions.PATH.key()); - propsToStore.putAll(tableProps); - } - if (!propsToStore.isEmpty()) { + Map propsToStore = collectTableProperties(tableSchema); + // Store custom path in table properties + if (externalTable) { + propsToStore.put(CoreOptions.PATH.key(), tableLocation.toString()); + } else { + propsToStore.remove(CoreOptions.PATH.key()); + } JdbcUtils.insertTableProperties( connections, catalogKey, @@ -399,21 +402,21 @@ protected void createTableImpl(Identifier identifier, Schema schema) { protected void renameTableImpl(Identifier fromTable, Identifier toTable) { try { // Check custom path BEFORE renaming metadata - boolean customPath = isCustomTablePath(fromTable); + boolean customPath = syncTableProperties() && isCustomTablePath(fromTable); Path fromPath = getTableLocation(fromTable); // update table metadata info updateTable(connections, catalogKey, fromTable, toTable); - - // Always update table properties (needed for custom path entries) - execute( - connections, - JdbcUtils.RENAME_TABLE_PROPERTIES_SQL, - toTable.getDatabaseName(), - toTable.getObjectName(), - catalogKey, - fromTable.getDatabaseName(), - fromTable.getObjectName()); + if (syncTableProperties()) { + execute( + connections, + JdbcUtils.RENAME_TABLE_PROPERTIES_SQL, + toTable.getDatabaseName(), + toTable.getObjectName(), + catalogKey, + fromTable.getDatabaseName(), + fromTable.getObjectName()); + } // If custom path: skip filesystem rename (data stays at same location) if (customPath) { @@ -501,19 +504,24 @@ protected TableSchema loadTableSchema(Identifier identifier) throws TableNotExis @Override protected boolean allowCustomTablePath() { - return true; + return syncTableProperties(); } @Override public Path getTableLocation(Identifier identifier) { - Optional storedPath = - JdbcUtils.getTableProperty( - connections, - catalogKey, - identifier.getDatabaseName(), - identifier.getTableName(), - CoreOptions.PATH.key()); - return storedPath.map(Path::new).orElse(super.getTableLocation(identifier)); + if (syncTableProperties()) { + Optional storedPath = + JdbcUtils.getTableProperty( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getTableName(), + CoreOptions.PATH.key()); + if (storedPath.isPresent()) { + return new Path(storedPath.get()); + } + } + return super.getTableLocation(identifier); } @Override @@ -616,26 +624,23 @@ public void repairTable(Identifier identifier) throws TableNotExistException { } } - // Always clean and re-insert table properties during repair - execute( - connections, - JdbcUtils.DELETE_ALL_TABLE_PROPERTIES_SQL, - catalogKey, - identifier.getDatabaseName(), - identifier.getTableName()); - - Map propsToStore = new HashMap<>(); - // Check if the table has a custom path (path from schema != default location) - Path defaultLocation = super.getTableLocation(identifier); - if (!tableLocation.equals(defaultLocation)) { - propsToStore.put(CoreOptions.PATH.key(), tableLocation.toString()); - } if (syncTableProperties()) { - Map tableProps = collectTableProperties(tableSchema); - tableProps.remove(CoreOptions.PATH.key()); - propsToStore.putAll(tableProps); - } - if (!propsToStore.isEmpty()) { + // Delete existing properties and reinsert from filesystem schema + execute( + connections, + JdbcUtils.DELETE_ALL_TABLE_PROPERTIES_SQL, + catalogKey, + identifier.getDatabaseName(), + identifier.getTableName()); + + Map propsToStore = collectTableProperties(tableSchema); + // Check if the table has a custom path (path from schema != default location) + Path defaultLocation = super.getTableLocation(identifier); + if (!tableLocation.equals(defaultLocation)) { + propsToStore.put(CoreOptions.PATH.key(), tableLocation.toString()); + } else { + propsToStore.remove(CoreOptions.PATH.key()); + } JdbcUtils.insertTableProperties( connections, catalogKey, diff --git a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java index 7e2ed08d10bd..be3e8db37776 100644 --- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java @@ -679,16 +679,9 @@ public void testCreateTableWithCustomPathSyncDisabled() throws Exception { Schema schema = schemaWithCustomPath(customDir); Identifier identifier = Identifier.create("test_db", "nosync_table"); - jdbcCatalog.createTable(identifier, schema, false); - - // Path should still be stored even when sync is disabled - Map storedProps = - fetchTableProperties(jdbcCatalog, "test_db", "nosync_table"); - assertThat(storedProps) - .containsEntry(CoreOptions.PATH.key(), new Path(customDir).toString()); - - // Other properties should NOT be stored (sync disabled) - assertThat(storedProps).hasSize(1); + // Custom path requires syncTableProperties=true + assertThatThrownBy(() -> jdbcCatalog.createTable(identifier, schema, false)) + .isInstanceOf(UnsupportedOperationException.class); } @Test @@ -810,7 +803,7 @@ public void testAlterTableWithCustomPath() throws Exception { @Test public void testLoadTableSchemaWithCustomPath() throws Exception { - JdbcCatalog jdbcCatalog = initCatalogWithSync(false); + JdbcCatalog jdbcCatalog = initCatalogWithSync(true); jdbcCatalog.createDatabase("test_db", false); String customDir = warehouse + "/custom_load/my_table"; From 12e38b805fdccaddc859ac2b90f2d6c2a1b8f3b3 Mon Sep 17 00:00:00 2001 From: Toby Cole Date: Thu, 19 Mar 2026 04:03:37 -0700 Subject: [PATCH 3/4] [core] Fix redundant JDBC queries and document repair limitation - Consolidate getTableLocation + isCustomTablePath into a single fetchStoredPathIfSyncEnabled call in dropTableImpl and renameTableImpl - Document that repairTable cannot recover custom-path tables when all JDBC metadata is lost - Fix stale "Hive Metastore" comment in renameTableImpl Co-Authored-By: Claude Opus 4.6 --- .../org/apache/paimon/jdbc/JdbcCatalog.java | 39 ++++++++++++------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index b7ad27ebfb50..a1ff6714c7f5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -294,8 +294,9 @@ protected List listTablesImpl(String databaseName) { protected void dropTableImpl(Identifier identifier, List externalPaths) { try { // Retrieve table location and custom-path flag BEFORE deleting JDBC metadata - Path path = getTableLocation(identifier); - boolean customPath = syncTableProperties() && isCustomTablePath(identifier); + Optional storedPath = fetchStoredPathIfSyncEnabled(identifier); + Path path = storedPath.map(Path::new).orElse(super.getTableLocation(identifier)); + boolean customPath = storedPath.isPresent(); int deletedRecords = execute( @@ -402,8 +403,9 @@ protected void createTableImpl(Identifier identifier, Schema schema) { protected void renameTableImpl(Identifier fromTable, Identifier toTable) { try { // Check custom path BEFORE renaming metadata - boolean customPath = syncTableProperties() && isCustomTablePath(fromTable); - Path fromPath = getTableLocation(fromTable); + Optional storedPath = fetchStoredPathIfSyncEnabled(fromTable); + boolean customPath = storedPath.isPresent(); + Path fromPath = storedPath.map(Path::new).orElse(super.getTableLocation(fromTable)); // update table metadata info updateTable(connections, catalogKey, fromTable, toTable); @@ -425,7 +427,7 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) { if (!new SchemaManager(fileIO, fromPath).listAllIds().isEmpty()) { // Rename the file system's table directory. Maintain consistency between tables in - // the file system and tables in the Hive Metastore. + // the file system and tables in the JDBC catalog. Path toPath = super.getTableLocation(toTable); try { fileIO.rename(fromPath, toPath); @@ -595,6 +597,15 @@ public void repairDatabase(String databaseName) { } } + /** + * Repair a table by re-syncing JDBC metadata from the filesystem schema. + * + *

Note: Tables created with a custom path ({@code CoreOptions.PATH}) cannot be fully + * repaired if both the {@code paimon_tables} row and the {@code paimon_table_properties} rows + * are lost. In that case, {@code getTableLocation} falls back to the default warehouse path + * where no schema exists, and repair will throw {@link TableNotExistException}. To recover, + * re-create the table pointing to the original custom path. + */ @Override public void repairTable(Identifier identifier) throws TableNotExistException { checkNotBranch(identifier, "repairTable"); @@ -689,14 +700,16 @@ private Path initialTableLocation(Map tableOptions, Identifier i return super.getTableLocation(identifier); } - private boolean isCustomTablePath(Identifier identifier) { - return JdbcUtils.getTableProperty( - connections, - catalogKey, - identifier.getDatabaseName(), - identifier.getTableName(), - CoreOptions.PATH.key()) - .isPresent(); + private Optional fetchStoredPathIfSyncEnabled(Identifier identifier) { + if (syncTableProperties()) { + return JdbcUtils.getTableProperty( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getTableName(), + CoreOptions.PATH.key()); + } + return Optional.empty(); } private SchemaManager getSchemaManager(Identifier identifier) { From 5f11dbf3b537822f8020f0c3a9d7277ffcc14fb2 Mon Sep 17 00:00:00 2001 From: Nick Del Nano Date: Wed, 3 Jun 2026 17:22:07 -0700 Subject: [PATCH 4/4] jdbc catalog: make warehouse path optional --- .../org/apache/paimon/jdbc/JdbcCatalog.java | 52 ++++- .../paimon/jdbc/JdbcCatalogFactory.java | 27 +++ .../apache/paimon/jdbc/JdbcCatalogLoader.java | 6 +- .../apache/paimon/jdbc/JdbcCatalogTest.java | 196 ++++++++++++++++++ 4 files changed, 272 insertions(+), 9 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index a1ff6714c7f5..7e826d3baf2d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -48,6 +48,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.IOException; import java.sql.DatabaseMetaData; import java.sql.PreparedStatement; @@ -89,10 +91,10 @@ public class JdbcCatalog extends AbstractCatalog { private final JdbcClientPool connections; private final String catalogKey; private final Options options; - private final String warehouse; + @Nullable private final String warehouse; protected JdbcCatalog( - FileIO fileIO, String catalogKey, CatalogContext context, String warehouse) { + FileIO fileIO, String catalogKey, CatalogContext context, @Nullable String warehouse) { super(fileIO, context); this.catalogKey = catalogKey; this.options = context.options(); @@ -205,6 +207,19 @@ protected Database getDatabaseImpl(String databaseName) throws DatabaseNotExistE Map options = Maps.newHashMap(); options.putAll(fetchProperties(databaseName)); if (!options.containsKey(DB_LOCATION_PROP)) { + if (warehouse == null) { + LOG.error( + "Database '{}' has no location property in catalog metadata " + + "and no warehouse is configured.", + databaseName); + throw new IllegalStateException( + String.format( + "Cannot resolve location for database '%s': " + + "no warehouse configured and no database location found in catalog metadata. " + + "Set 'warehouse' in catalog options or ensure the database " + + "has a location property.", + databaseName)); + } options.put(DB_LOCATION_PROP, newDatabasePath(databaseName).getName()); } options.remove(DATABASE_EXISTS_PROPERTY); @@ -220,6 +235,10 @@ protected void createDatabaseImpl(String name, Map properties) { } if (!createProps.containsKey(DB_LOCATION_PROP)) { + Preconditions.checkNotNull( + warehouse, + "The 'warehouse' option is required to create databases. " + + "Set 'warehouse' in catalog options."); Path databasePath = newDatabasePath(name); createProps.put(DB_LOCATION_PROP, databasePath.toString()); } @@ -295,7 +314,7 @@ protected void dropTableImpl(Identifier identifier, List externalPaths) { try { // Retrieve table location and custom-path flag BEFORE deleting JDBC metadata Optional storedPath = fetchStoredPathIfSyncEnabled(identifier); - Path path = storedPath.map(Path::new).orElse(super.getTableLocation(identifier)); + Path path = storedPath.map(Path::new).orElse(getTableLocation(identifier)); boolean customPath = storedPath.isPresent(); int deletedRecords = @@ -405,7 +424,7 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) { // Check custom path BEFORE renaming metadata Optional storedPath = fetchStoredPathIfSyncEnabled(fromTable); boolean customPath = storedPath.isPresent(); - Path fromPath = storedPath.map(Path::new).orElse(super.getTableLocation(fromTable)); + Path fromPath = storedPath.map(Path::new).orElse(getTableLocation(fromTable)); // update table metadata info updateTable(connections, catalogKey, fromTable, toTable); @@ -428,7 +447,7 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) { if (!new SchemaManager(fileIO, fromPath).listAllIds().isEmpty()) { // Rename the file system's table directory. Maintain consistency between tables in // the file system and tables in the JDBC catalog. - Path toPath = super.getTableLocation(toTable); + Path toPath = getTableLocation(toTable); try { fileIO.rename(fromPath, toPath); } catch (IOException e) { @@ -523,7 +542,18 @@ public Path getTableLocation(Identifier identifier) { return new Path(storedPath.get()); } } - return super.getTableLocation(identifier); + if (warehouse != null) { + return super.getTableLocation(identifier); + } + Map dbProps = fetchProperties(identifier.getDatabaseName()); + String dbLocation = dbProps.get(DB_LOCATION_PROP); + if (dbLocation != null) { + return new Path(dbLocation, identifier.getTableName()); + } + throw new IllegalStateException( + "Cannot resolve table location for " + + identifier.getFullName() + + ": no warehouse configured and no database location found in catalog metadata."); } @Override @@ -556,6 +586,10 @@ public T runWithLock(Identifier identifier, Callable callable) throws Exc @Override public void repairCatalog() { + Preconditions.checkNotNull( + warehouse, + "The 'warehouse' option is required to repair catalogs. " + + "Set 'warehouse' in catalog options."); List databases; try { databases = listDatabasesInFileSystem(new Path(warehouse)); @@ -570,6 +604,10 @@ public void repairCatalog() { @Override public void repairDatabase(String databaseName) { checkNotSystemDatabase(databaseName); + Preconditions.checkNotNull( + warehouse, + "The 'warehouse' option is required to repair databases. " + + "Set 'warehouse' in catalog options."); // First check if database exists in file system Path databasePath = newDatabasePath(databaseName); @@ -697,7 +735,7 @@ private Path initialTableLocation(Map tableOptions, Identifier i if (tableOptions.containsKey(CoreOptions.PATH.key())) { return new Path(tableOptions.get(CoreOptions.PATH.key())); } - return super.getTableLocation(identifier); + return getTableLocation(identifier); } private Optional fetchStoredPathIfSyncEnabled(Identifier identifier) { diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java index b2998e55787b..e9e1224bb5f9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java @@ -23,8 +23,13 @@ import org.apache.paimon.catalog.CatalogFactory; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.ResolvingFileIO; +import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; +import java.io.IOException; +import java.io.UncheckedIOException; + /** Factory to create {@link JdbcCatalog}. */ public class JdbcCatalogFactory implements CatalogFactory { @@ -35,6 +40,28 @@ public String identifier() { return IDENTIFIER; } + @Override + public Catalog create(CatalogContext context) { + Options options = context.options(); + String catalogKey = options.get(JdbcCatalogOptions.CATALOG_KEY); + String warehouseStr = options.get(CatalogOptions.WAREHOUSE); + + if (warehouseStr != null) { + Path warehousePath = new Path(warehouseStr); + try { + FileIO fileIO = FileIO.get(warehousePath, context); + fileIO.checkOrMkdirs(warehousePath); + return new JdbcCatalog(fileIO, catalogKey, context, warehouseStr); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + ResolvingFileIO fileIO = new ResolvingFileIO(); + fileIO.configure(context); + return new JdbcCatalog(fileIO, catalogKey, context, null); + } + @Override public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) { Options options = context.options(); diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLoader.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLoader.java index 492d1a84b389..a286abcd6bc9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLoader.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLoader.java @@ -23,6 +23,8 @@ import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.fs.FileIO; +import javax.annotation.Nullable; + /** Loader to create {@link JdbcCatalog}. */ public class JdbcCatalogLoader implements CatalogLoader { @@ -31,10 +33,10 @@ public class JdbcCatalogLoader implements CatalogLoader { private final FileIO fileIO; private final String catalogKey; private final CatalogContext context; - private final String warehouse; + @Nullable private final String warehouse; public JdbcCatalogLoader( - FileIO fileIO, String catalogKey, CatalogContext context, String warehouse) { + FileIO fileIO, String catalogKey, CatalogContext context, @Nullable String warehouse) { this.fileIO = fileIO; this.catalogKey = catalogKey; this.context = context; diff --git a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java index be3e8db37776..73e764c39ff3 100644 --- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java @@ -830,4 +830,200 @@ public void testGetTableLocationFallback() throws Exception { Path expected = new Path(new Path(warehouse, "test_db.db"), "default_table"); assertThat(jdbcCatalog.getTableLocation(identifier)).isEqualTo(expected); } + + @Test + public void testCrossWarehouseTableAccess() throws Exception { + String sharedUri = "jdbc:sqlite:" + new Path(warehouse, "cross_warehouse.db"); + + // Catalog A: producer with warehouse_a + String warehouseA = new Path(warehouse, "warehouse_a").toString(); + JdbcCatalog catalogA = initCatalogWithWarehouseAndUri(warehouseA, sharedUri, true); + catalogA.createDatabase("mydb", false); + catalogA.createTable(Identifier.create("mydb", "mytable"), DEFAULT_TABLE_SCHEMA, false); + + // Catalog B: reader WITHOUT warehouse (same JDBC) resolves from DB properties + JdbcCatalog catalogB = initCatalogWithoutWarehouse(sharedUri, true); + + // Catalog B resolves table location from database properties + Path tableLocation = catalogB.getTableLocation(Identifier.create("mydb", "mytable")); + assertThat(tableLocation.toString()).contains("warehouse_a"); + + // Catalog B can load the table + Table table = catalogB.getTable(Identifier.create("mydb", "mytable")); + assertThat(table).isNotNull(); + } + + @Test + public void testNoWarehouseCatalogReadsTable() throws Exception { + String sharedUri = "jdbc:sqlite:" + new Path(warehouse, "no_warehouse_read.db"); + + // Producer creates table with warehouse + String warehouseA = new Path(warehouse, "warehouse_a_nw").toString(); + JdbcCatalog producer = initCatalogWithWarehouseAndUri(warehouseA, sharedUri, true); + producer.createDatabase("mydb", false); + producer.createTable(Identifier.create("mydb", "mytable"), DEFAULT_TABLE_SCHEMA, false); + + // Reader has no warehouse + JdbcCatalog reader = initCatalogWithoutWarehouse(sharedUri, true); + + // Reader can resolve and load the table + Table table = reader.getTable(Identifier.create("mydb", "mytable")); + assertThat(table).isNotNull(); + } + + @Test + public void testNoWarehouseCatalogCreatesTable() throws Exception { + String sharedUri = "jdbc:sqlite:" + new Path(warehouse, "no_warehouse_write.db"); + + // Producer creates database (requires warehouse to set location) + String warehouseA = new Path(warehouse, "warehouse_a_write").toString(); + JdbcCatalog producer = initCatalogWithWarehouseAndUri(warehouseA, sharedUri, true); + producer.createDatabase("mydb", false); + + // Writer has no warehouse but creates a new table in existing database + JdbcCatalog writer = initCatalogWithoutWarehouse(sharedUri, true); + Identifier newTable = Identifier.create("mydb", "new_table"); + writer.createTable(newTable, DEFAULT_TABLE_SCHEMA, false); + + // Table location should be resolved from database location property + Path tableLocation = writer.getTableLocation(newTable); + assertThat(tableLocation.toString()).contains("warehouse_a_write"); + assertThat(tableLocation.toString()).contains("mydb.db/new_table"); + + // Table is readable + Table table = writer.getTable(newTable); + assertThat(table).isNotNull(); + } + + @Test + public void testNoWarehouseCatalogDropsTable() throws Exception { + String sharedUri = "jdbc:sqlite:" + new Path(warehouse, "no_warehouse_drop.db"); + + // Producer creates database and table + String warehouseA = new Path(warehouse, "warehouse_a_drop").toString(); + JdbcCatalog producer = initCatalogWithWarehouseAndUri(warehouseA, sharedUri, true); + producer.createDatabase("mydb", false); + Identifier identifier = Identifier.create("mydb", "mytable"); + producer.createTable(identifier, DEFAULT_TABLE_SCHEMA, false); + + // Verify data exists on filesystem + Path tablePath = producer.getTableLocation(identifier); + assertThat(fileIO.exists(tablePath)).isTrue(); + + // No-warehouse catalog drops the table + JdbcCatalog reader = initCatalogWithoutWarehouse(sharedUri, true); + reader.dropTable(identifier, false); + + // Verify metadata is gone + assertThatThrownBy(() -> reader.getTable(identifier)) + .isInstanceOf(Catalog.TableNotExistException.class); + + // Verify filesystem data is deleted + assertThat(fileIO.exists(tablePath)).isFalse(); + } + + @Test + public void testNoWarehouseCatalogRenamesTable() throws Exception { + String sharedUri = "jdbc:sqlite:" + new Path(warehouse, "no_warehouse_rename.db"); + + // Producer creates database and table + String warehouseA = new Path(warehouse, "warehouse_a_rename").toString(); + JdbcCatalog producer = initCatalogWithWarehouseAndUri(warehouseA, sharedUri, true); + producer.createDatabase("mydb", false); + Identifier fromTable = Identifier.create("mydb", "old_name"); + producer.createTable(fromTable, DEFAULT_TABLE_SCHEMA, false); + + Path fromPath = producer.getTableLocation(fromTable); + assertThat(fileIO.exists(fromPath)).isTrue(); + + // No-warehouse catalog renames the table + JdbcCatalog writer = initCatalogWithoutWarehouse(sharedUri, true); + Identifier toTable = Identifier.create("mydb", "new_name"); + writer.renameTable(fromTable, toTable, false); + + // Old name is gone + assertThatThrownBy(() -> writer.getTable(fromTable)) + .isInstanceOf(Catalog.TableNotExistException.class); + + // New name is accessible + Table table = writer.getTable(toTable); + assertThat(table).isNotNull(); + + // Filesystem directory was renamed + Path toPath = writer.getTableLocation(toTable); + assertThat(toPath.toString()).contains("mydb.db/new_name"); + assertThat(fileIO.exists(toPath)).isTrue(); + assertThat(fileIO.exists(fromPath)).isFalse(); + } + + @Test + public void testNoWarehouseCannotCreateDatabase() throws Exception { + String sharedUri = "jdbc:sqlite:" + new Path(warehouse, "no_warehouse_create.db"); + JdbcCatalog reader = initCatalogWithoutWarehouse(sharedUri, true); + + assertThatThrownBy(() -> reader.createDatabase("newdb", false)) + .hasMessageContaining("warehouse"); + } + + @Test + public void testFactoryCreatesNoWarehouseCatalog() throws Exception { + String sharedUri = "jdbc:sqlite:" + new Path(warehouse, "factory.db"); + + // Producer creates database and table via direct constructor (with warehouse) + String warehouseA = new Path(warehouse, "warehouse_factory").toString(); + JdbcCatalog producer = initCatalogWithWarehouseAndUri(warehouseA, sharedUri, true); + producer.createDatabase("mydb", false); + producer.createTable(Identifier.create("mydb", "mytable"), DEFAULT_TABLE_SCHEMA, false); + + // Reader created via factory with no warehouse option + Map readerProps = Maps.newHashMap(); + readerProps.put(CatalogOptions.URI.key(), sharedUri); + readerProps.put(CatalogOptions.METASTORE.key(), "jdbc"); + readerProps.put(JdbcCatalogOptions.CATALOG_KEY.key(), "test-jdbc-catalog"); + readerProps.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user"); + readerProps.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password"); + readerProps.put(CatalogOptions.LOCK_ENABLED.key(), "true"); + readerProps.put(CatalogOptions.LOCK_TYPE.key(), "jdbc"); + readerProps.put(CatalogOptions.SYNC_ALL_PROPERTIES.key(), "true"); + CatalogContext readerContext = CatalogContext.create(Options.fromMap(readerProps)); + JdbcCatalog reader = (JdbcCatalog) new JdbcCatalogFactory().create(readerContext); + + assertThat(reader.warehouse()).isNull(); + + // Reader can load the table via factory-created catalog + Table table = reader.getTable(Identifier.create("mydb", "mytable")); + assertThat(table).isNotNull(); + } + + private JdbcCatalog initCatalogWithWarehouseAndUri( + String catalogWarehouse, String jdbcUri, boolean syncAllProperties) { + Map properties = Maps.newHashMap(); + properties.put(CatalogOptions.URI.key(), jdbcUri); + properties.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user"); + properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password"); + properties.put(CatalogOptions.WAREHOUSE.key(), catalogWarehouse); + properties.put(CatalogOptions.LOCK_ENABLED.key(), "true"); + properties.put(CatalogOptions.LOCK_TYPE.key(), "jdbc"); + properties.put(CatalogOptions.SYNC_ALL_PROPERTIES.key(), String.valueOf(syncAllProperties)); + return new JdbcCatalog( + fileIO, + "test-jdbc-catalog", + CatalogContext.create(Options.fromMap(properties)), + catalogWarehouse); + } + + private JdbcCatalog initCatalogWithoutWarehouse(String jdbcUri, boolean syncAllProperties) { + Map properties = Maps.newHashMap(); + properties.put(CatalogOptions.URI.key(), jdbcUri); + properties.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user"); + properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password"); + properties.put(CatalogOptions.LOCK_ENABLED.key(), "true"); + properties.put(CatalogOptions.LOCK_TYPE.key(), "jdbc"); + properties.put(CatalogOptions.SYNC_ALL_PROPERTIES.key(), String.valueOf(syncAllProperties)); + return new JdbcCatalog( + fileIO, + "test-jdbc-catalog", + CatalogContext.create(Options.fromMap(properties)), + null); + } }