diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index f765e5f88db5..7e826d3baf2d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -48,6 +48,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.IOException; import java.sql.DatabaseMetaData; import java.sql.PreparedStatement; @@ -89,10 +91,10 @@ public class JdbcCatalog extends AbstractCatalog { private final JdbcClientPool connections; private final String catalogKey; private final Options options; - private final String warehouse; + @Nullable private final String warehouse; protected JdbcCatalog( - FileIO fileIO, String catalogKey, CatalogContext context, String warehouse) { + FileIO fileIO, String catalogKey, CatalogContext context, @Nullable String warehouse) { super(fileIO, context); this.catalogKey = catalogKey; this.options = context.options(); @@ -205,6 +207,19 @@ protected Database getDatabaseImpl(String databaseName) throws DatabaseNotExistE Map options = Maps.newHashMap(); options.putAll(fetchProperties(databaseName)); if (!options.containsKey(DB_LOCATION_PROP)) { + if (warehouse == null) { + LOG.error( + "Database '{}' has no location property in catalog metadata " + + "and no warehouse is configured.", + databaseName); + throw new IllegalStateException( + String.format( + "Cannot resolve location for database '%s': " + + "no warehouse configured and no database location found in catalog metadata. " + + "Set 'warehouse' in catalog options or ensure the database " + + "has a location property.", + databaseName)); + } options.put(DB_LOCATION_PROP, newDatabasePath(databaseName).getName()); } options.remove(DATABASE_EXISTS_PROPERTY); @@ -220,6 +235,10 @@ protected void createDatabaseImpl(String name, Map properties) { } if (!createProps.containsKey(DB_LOCATION_PROP)) { + Preconditions.checkNotNull( + warehouse, + "The 'warehouse' option is required to create databases. " + + "Set 'warehouse' in catalog options."); Path databasePath = newDatabasePath(name); createProps.put(DB_LOCATION_PROP, databasePath.toString()); } @@ -293,6 +312,11 @@ protected List listTablesImpl(String databaseName) { @Override protected void dropTableImpl(Identifier identifier, List externalPaths) { try { + // Retrieve table location and custom-path flag BEFORE deleting JDBC metadata + Optional storedPath = fetchStoredPathIfSyncEnabled(identifier); + Path path = storedPath.map(Path::new).orElse(getTableLocation(identifier)); + boolean customPath = storedPath.isPresent(); + int deletedRecords = execute( connections, @@ -313,7 +337,12 @@ protected void dropTableImpl(Identifier identifier, List externalPaths) { identifier.getDatabaseName(), identifier.getTableName()); } - Path path = getTableLocation(identifier); + + // If custom path: skip filesystem deletion (external table keeps its data) + if (customPath) { + return; + } + try { if (fileIO.exists(path)) { fileIO.deleteDirectoryQuietly(path); @@ -334,12 +363,17 @@ protected void dropTableImpl(Identifier identifier, List externalPaths) { @Override protected void createTableImpl(Identifier identifier, Schema schema) { try { - // create table file - SchemaManager schemaManager = getSchemaManager(identifier); + // Determine table location before table exists in JDBC + Path tableLocation = initialTableLocation(schema.options(), identifier); + boolean externalTable = + syncTableProperties() && schema.options().containsKey(CoreOptions.PATH.key()); + + // create table file using the determined location + SchemaManager schemaManager = new SchemaManager(fileIO, tableLocation); TableSchema tableSchema = - runWithLock(identifier, () -> schemaManager.createTable(schema)); - // Update schema metadata - Path path = getTableLocation(identifier); + runWithLock(identifier, () -> schemaManager.createTable(schema, externalTable)); + + // Insert table record into paimon_tables if (JdbcUtils.insertTable( connections, catalogKey, @@ -348,22 +382,36 @@ protected void createTableImpl(Identifier identifier, Schema schema) { LOG.debug("Successfully committed to new table: {}", identifier); } else { try { - fileIO.deleteDirectoryQuietly(path); + if (!externalTable) { + fileIO.deleteDirectoryQuietly(tableLocation); + } } catch (Exception ee) { - LOG.error("Delete directory[{}] fail for table {}", path, identifier, ee); + LOG.error( + "Delete directory[{}] fail for table {}", + tableLocation, + identifier, + ee); } throw new RuntimeException( String.format( "Failed to create table %s in catalog %s", identifier.getFullName(), catalogKey)); } + if (syncTableProperties()) { + Map propsToStore = collectTableProperties(tableSchema); + // Store custom path in table properties + if (externalTable) { + propsToStore.put(CoreOptions.PATH.key(), tableLocation.toString()); + } else { + propsToStore.remove(CoreOptions.PATH.key()); + } JdbcUtils.insertTableProperties( connections, catalogKey, identifier.getDatabaseName(), identifier.getTableName(), - collectTableProperties(tableSchema)); + propsToStore); } } catch (Exception e) { throw new RuntimeException("Failed to create table " + identifier.getFullName(), e); @@ -373,6 +421,11 @@ protected void createTableImpl(Identifier identifier, Schema schema) { @Override protected void renameTableImpl(Identifier fromTable, Identifier toTable) { try { + // Check custom path BEFORE renaming metadata + Optional storedPath = fetchStoredPathIfSyncEnabled(fromTable); + boolean customPath = storedPath.isPresent(); + Path fromPath = storedPath.map(Path::new).orElse(getTableLocation(fromTable)); + // update table metadata info updateTable(connections, catalogKey, fromTable, toTable); if (syncTableProperties()) { @@ -386,10 +439,14 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) { fromTable.getObjectName()); } - Path fromPath = getTableLocation(fromTable); + // If custom path: skip filesystem rename (data stays at same location) + if (customPath) { + return; + } + if (!new SchemaManager(fileIO, fromPath).listAllIds().isEmpty()) { // Rename the file system's table directory. Maintain consistency between tables in - // the file system and tables in the Hive Metastore. + // the file system and tables in the JDBC catalog. Path toPath = getTableLocation(toTable); try { fileIO.rename(fromPath, toPath); @@ -414,6 +471,15 @@ protected void alterTableImpl(Identifier identifier, List changes) try { runWithLock(identifier, () -> schemaManager.commitChanges(changes)); if (syncTableProperties()) { + // Save custom path before DELETE_ALL so we can re-insert it + Optional customPath = + JdbcUtils.getTableProperty( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getTableName(), + CoreOptions.PATH.key()); + TableSchema updatedSchema = schemaManager.latest().get(); execute( connections, @@ -421,12 +487,18 @@ protected void alterTableImpl(Identifier identifier, List changes) catalogKey, identifier.getDatabaseName(), identifier.getTableName()); - JdbcUtils.insertTableProperties( - connections, - catalogKey, - identifier.getDatabaseName(), - identifier.getTableName(), - collectTableProperties(updatedSchema)); + + Map propsToStore = collectTableProperties(updatedSchema); + // Re-insert custom path if it was stored + customPath.ifPresent(p -> propsToStore.put(CoreOptions.PATH.key(), p)); + if (!propsToStore.isEmpty()) { + JdbcUtils.insertTableProperties( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getTableName(), + propsToStore); + } } } catch (TableNotExistException | ColumnAlreadyExistException @@ -451,6 +523,39 @@ protected TableSchema loadTableSchema(Identifier identifier) throws TableNotExis () -> new RuntimeException("There is no paimon table in " + tableLocation)); } + @Override + protected boolean allowCustomTablePath() { + return syncTableProperties(); + } + + @Override + public Path getTableLocation(Identifier identifier) { + if (syncTableProperties()) { + Optional storedPath = + JdbcUtils.getTableProperty( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getTableName(), + CoreOptions.PATH.key()); + if (storedPath.isPresent()) { + return new Path(storedPath.get()); + } + } + if (warehouse != null) { + return super.getTableLocation(identifier); + } + Map dbProps = fetchProperties(identifier.getDatabaseName()); + String dbLocation = dbProps.get(DB_LOCATION_PROP); + if (dbLocation != null) { + return new Path(dbLocation, identifier.getTableName()); + } + throw new IllegalStateException( + "Cannot resolve table location for " + + identifier.getFullName() + + ": no warehouse configured and no database location found in catalog metadata."); + } + @Override public boolean caseSensitive() { return false; @@ -481,6 +586,10 @@ public T runWithLock(Identifier identifier, Callable callable) throws Exc @Override public void repairCatalog() { + Preconditions.checkNotNull( + warehouse, + "The 'warehouse' option is required to repair catalogs. " + + "Set 'warehouse' in catalog options."); List databases; try { databases = listDatabasesInFileSystem(new Path(warehouse)); @@ -495,6 +604,10 @@ public void repairCatalog() { @Override public void repairDatabase(String databaseName) { checkNotSystemDatabase(databaseName); + Preconditions.checkNotNull( + warehouse, + "The 'warehouse' option is required to repair databases. " + + "Set 'warehouse' in catalog options."); // First check if database exists in file system Path databasePath = newDatabasePath(databaseName); @@ -522,6 +635,15 @@ public void repairDatabase(String databaseName) { } } + /** + * Repair a table by re-syncing JDBC metadata from the filesystem schema. + * + *

Note: Tables created with a custom path ({@code CoreOptions.PATH}) cannot be fully + * repaired if both the {@code paimon_tables} row and the {@code paimon_table_properties} rows + * are lost. In that case, {@code getTableLocation} falls back to the default warehouse path + * where no schema exists, and repair will throw {@link TableNotExistException}. To recover, + * re-create the table pointing to the original custom path. + */ @Override public void repairTable(Identifier identifier) throws TableNotExistException { checkNotBranch(identifier, "repairTable"); @@ -550,6 +672,7 @@ public void repairTable(Identifier identifier) throws TableNotExistException { LOG.error("Failed to repair table: {}", identifier); } } + if (syncTableProperties()) { // Delete existing properties and reinsert from filesystem schema execute( @@ -558,12 +681,21 @@ public void repairTable(Identifier identifier) throws TableNotExistException { catalogKey, identifier.getDatabaseName(), identifier.getTableName()); + + Map propsToStore = collectTableProperties(tableSchema); + // Check if the table has a custom path (path from schema != default location) + Path defaultLocation = super.getTableLocation(identifier); + if (!tableLocation.equals(defaultLocation)) { + propsToStore.put(CoreOptions.PATH.key(), tableLocation.toString()); + } else { + propsToStore.remove(CoreOptions.PATH.key()); + } JdbcUtils.insertTableProperties( connections, catalogKey, identifier.getDatabaseName(), identifier.getTableName(), - collectTableProperties(tableSchema)); + propsToStore); } } @@ -599,6 +731,25 @@ private Map collectTableProperties(TableSchema tableSchema) { return properties; } + private Path initialTableLocation(Map tableOptions, Identifier identifier) { + if (tableOptions.containsKey(CoreOptions.PATH.key())) { + return new Path(tableOptions.get(CoreOptions.PATH.key())); + } + return getTableLocation(identifier); + } + + private Optional fetchStoredPathIfSyncEnabled(Identifier identifier) { + if (syncTableProperties()) { + return JdbcUtils.getTableProperty( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getTableName(), + CoreOptions.PATH.key()); + } + return Optional.empty(); + } + private SchemaManager getSchemaManager(Identifier identifier) { return new SchemaManager(fileIO, getTableLocation(identifier)); } diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java index b2998e55787b..e9e1224bb5f9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java @@ -23,8 +23,13 @@ import org.apache.paimon.catalog.CatalogFactory; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.ResolvingFileIO; +import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; +import java.io.IOException; +import java.io.UncheckedIOException; + /** Factory to create {@link JdbcCatalog}. */ public class JdbcCatalogFactory implements CatalogFactory { @@ -35,6 +40,28 @@ public String identifier() { return IDENTIFIER; } + @Override + public Catalog create(CatalogContext context) { + Options options = context.options(); + String catalogKey = options.get(JdbcCatalogOptions.CATALOG_KEY); + String warehouseStr = options.get(CatalogOptions.WAREHOUSE); + + if (warehouseStr != null) { + Path warehousePath = new Path(warehouseStr); + try { + FileIO fileIO = FileIO.get(warehousePath, context); + fileIO.checkOrMkdirs(warehousePath); + return new JdbcCatalog(fileIO, catalogKey, context, warehouseStr); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + ResolvingFileIO fileIO = new ResolvingFileIO(); + fileIO.configure(context); + return new JdbcCatalog(fileIO, catalogKey, context, null); + } + @Override public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) { Options options = context.options(); diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLoader.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLoader.java index 492d1a84b389..a286abcd6bc9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLoader.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLoader.java @@ -23,6 +23,8 @@ import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.fs.FileIO; +import javax.annotation.Nullable; + /** Loader to create {@link JdbcCatalog}. */ public class JdbcCatalogLoader implements CatalogLoader { @@ -31,10 +33,10 @@ public class JdbcCatalogLoader implements CatalogLoader { private final FileIO fileIO; private final String catalogKey; private final CatalogContext context; - private final String warehouse; + @Nullable private final String warehouse; public JdbcCatalogLoader( - FileIO fileIO, String catalogKey, CatalogContext context, String warehouse) { + FileIO fileIO, String catalogKey, CatalogContext context, @Nullable String warehouse) { this.fileIO = fileIO; this.catalogKey = catalogKey; this.context = context; diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java index 52cf4224f2f7..805aa43d85a4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java @@ -32,6 +32,7 @@ import java.sql.SQLIntegrityConstraintViolationException; import java.util.Collections; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.function.Consumer; @@ -325,6 +326,21 @@ public class JdbcUtils { + TABLE_NAME + " = ? "; + static final String GET_TABLE_PROPERTY_SQL = + "SELECT " + + TABLE_PROPERTY_VALUE + + " FROM " + + TABLE_PROPERTIES_TABLE_NAME + + " WHERE " + + CATALOG_KEY + + " = ? AND " + + TABLE_DATABASE + + " = ? AND " + + TABLE_NAME + + " = ? AND " + + TABLE_PROPERTY_KEY + + " = ?"; + // Distributed locks table static final String DISTRIBUTED_LOCKS_TABLE_NAME = "paimon_distributed_locks"; static final String LOCK_ID = "lock_id"; @@ -554,6 +570,41 @@ public static boolean insertTableProperties( insertedRecords, properties.size())); } + @SuppressWarnings("checkstyle:NestedTryDepth") + public static Optional getTableProperty( + JdbcClientPool connections, + String storeKey, + String databaseName, + String tableName, + String propertyKey) { + try { + return connections.run( + conn -> { + try (PreparedStatement ps = conn.prepareStatement(GET_TABLE_PROPERTY_SQL)) { + ps.setString(1, storeKey); + ps.setString(2, databaseName); + ps.setString(3, tableName); + ps.setString(4, propertyKey); + try (ResultSet rs = ps.executeQuery()) { + if (rs.next()) { + return Optional.of(rs.getString(TABLE_PROPERTY_VALUE)); + } + } + } + return Optional.empty(); + }); + } catch (SQLException e) { + throw new RuntimeException( + String.format( + "Failed to get table property '%s' for %s.%s", + propertyKey, databaseName, tableName), + e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in SQL query", e); + } + } + private static String insertTablePropertiesStatement(int size) { StringBuilder sqlStatement = new StringBuilder(JdbcUtils.INSERT_TABLE_PROPERTIES_SQL); for (int i = 0; i < size; i++) { diff --git a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java index fd3c6fdc5950..73e764c39ff3 100644 --- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java @@ -18,14 +18,17 @@ package org.apache.paimon.jdbc; +import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogTestBase; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.fs.Path; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.Table; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; @@ -625,4 +628,402 @@ public void testInsertTableUtility() throws Exception { .isInstanceOf(RuntimeException.class) .hasMessageContaining("Failed to insert table"); } + + private Schema schemaWithCustomPath(String customPath) { + Map options = new HashMap<>(); + options.put(CoreOptions.PATH.key(), customPath); + return new Schema( + Lists.newArrayList( + new DataField(0, "pk", DataTypes.INT()), + new DataField(1, "col1", DataTypes.STRING())), + Collections.emptyList(), + Collections.emptyList(), + options, + ""); + } + + @Test + public void testCreateTableWithCustomPath() throws Exception { + JdbcCatalog jdbcCatalog = initCatalogWithSync(true); + jdbcCatalog.createDatabase("test_db", false); + + String customDir = warehouse + "/custom_location/my_table"; + Schema schema = schemaWithCustomPath(customDir); + Identifier identifier = Identifier.create("test_db", "custom_table"); + + jdbcCatalog.createTable(identifier, schema, false); + + // Verify schema exists at custom location + Path customPath = new Path(customDir); + SchemaManager sm = new SchemaManager(fileIO, customPath); + assertThat(sm.listAllIds()).isNotEmpty(); + + // Verify getTableLocation returns the custom path + assertThat(jdbcCatalog.getTableLocation(identifier)).isEqualTo(customPath); + + // Verify path is stored in JDBC table properties + Map storedProps = + fetchTableProperties(jdbcCatalog, "test_db", "custom_table"); + assertThat(storedProps).containsEntry(CoreOptions.PATH.key(), customPath.toString()); + + // Verify table is loadable + assertDoesNotThrow(() -> jdbcCatalog.getTable(identifier)); + } + + @Test + public void testCreateTableWithCustomPathSyncDisabled() throws Exception { + JdbcCatalog jdbcCatalog = initCatalogWithSync(false); + jdbcCatalog.createDatabase("test_db", false); + + String customDir = warehouse + "/custom_nosync/my_table"; + Schema schema = schemaWithCustomPath(customDir); + Identifier identifier = Identifier.create("test_db", "nosync_table"); + + // Custom path requires syncTableProperties=true + assertThatThrownBy(() -> jdbcCatalog.createTable(identifier, schema, false)) + .isInstanceOf(UnsupportedOperationException.class); + } + + @Test + public void testDropTableWithCustomPath() throws Exception { + JdbcCatalog jdbcCatalog = initCatalogWithSync(true); + jdbcCatalog.createDatabase("test_db", false); + + String customDir = warehouse + "/custom_drop/my_table"; + Schema schema = schemaWithCustomPath(customDir); + Identifier identifier = Identifier.create("test_db", "drop_custom"); + + jdbcCatalog.createTable(identifier, schema, false); + + // Verify data exists at custom location + Path customPath = new Path(customDir); + assertThat(fileIO.exists(customPath)).isTrue(); + + // Drop the table + jdbcCatalog.dropTable(identifier, false); + + // Verify JDBC metadata is cleaned up + assertThatThrownBy(() -> jdbcCatalog.getTable(identifier)) + .isInstanceOf(Catalog.TableNotExistException.class); + Map storedProps = + fetchTableProperties(jdbcCatalog, "test_db", "drop_custom"); + assertThat(storedProps).isEmpty(); + + // Verify data is NOT deleted (external table keeps its data) + assertThat(fileIO.exists(customPath)).isTrue(); + } + + @Test + public void testDropTableWithDefaultPath() throws Exception { + JdbcCatalog jdbcCatalog = initCatalogWithSync(true); + jdbcCatalog.createDatabase("test_db", false); + + Identifier identifier = Identifier.create("test_db", "drop_default"); + jdbcCatalog.createTable(identifier, DEFAULT_TABLE_SCHEMA, false); + + Path tablePath = jdbcCatalog.getTableLocation(identifier); + assertThat(fileIO.exists(tablePath)).isTrue(); + + jdbcCatalog.dropTable(identifier, false); + + // Data SHOULD be deleted for default-path tables + assertThat(fileIO.exists(tablePath)).isFalse(); + } + + @Test + public void testRenameTableWithCustomPath() throws Exception { + JdbcCatalog jdbcCatalog = initCatalogWithSync(true); + jdbcCatalog.createDatabase("test_db", false); + + String customDir = warehouse + "/custom_rename/my_table"; + Schema schema = schemaWithCustomPath(customDir); + Identifier fromTable = Identifier.create("test_db", "rename_from"); + + jdbcCatalog.createTable(fromTable, schema, false); + + Identifier toTable = Identifier.create("test_db", "rename_to"); + jdbcCatalog.renameTable(fromTable, toTable, false); + + // Verify old table is gone + assertThatThrownBy(() -> jdbcCatalog.getTable(fromTable)) + .isInstanceOf(Catalog.TableNotExistException.class); + + // Verify new table is accessible and still points to the custom path + assertThat(jdbcCatalog.getTableLocation(toTable)).isEqualTo(new Path(customDir)); + + // Verify the path property was moved + Map storedProps = fetchTableProperties(jdbcCatalog, "test_db", "rename_to"); + assertThat(storedProps) + .containsEntry(CoreOptions.PATH.key(), new Path(customDir).toString()); + + // Verify old name has no properties + Map oldProps = fetchTableProperties(jdbcCatalog, "test_db", "rename_from"); + assertThat(oldProps).isEmpty(); + } + + @Test + public void testAlterTableWithCustomPath() throws Exception { + JdbcCatalog jdbcCatalog = initCatalogWithSync(true); + jdbcCatalog.createDatabase("test_db", false); + + String customDir = warehouse + "/custom_alter/my_table"; + Map options = new HashMap<>(); + options.put(CoreOptions.PATH.key(), customDir); + options.put("bucket", "4"); + Schema schema = + new Schema( + Lists.newArrayList( + new DataField(0, "pk", DataTypes.INT()), + new DataField(1, "col1", DataTypes.STRING())), + Collections.emptyList(), + Lists.newArrayList("pk"), + options, + ""); + + Identifier identifier = Identifier.create("test_db", "alter_custom"); + jdbcCatalog.createTable(identifier, schema, false); + + // Alter: add a new option + jdbcCatalog.alterTable( + identifier, + Lists.newArrayList(SchemaChange.setOption("file.format", "orc")), + false); + + // Verify path is preserved after alter + Map storedProps = + fetchTableProperties(jdbcCatalog, "test_db", "alter_custom"); + assertThat(storedProps) + .containsEntry(CoreOptions.PATH.key(), new Path(customDir).toString()); + assertThat(storedProps).containsEntry("file.format", "orc"); + assertThat(storedProps).containsEntry("bucket", "4"); + + // Verify getTableLocation still returns custom path + assertThat(jdbcCatalog.getTableLocation(identifier)).isEqualTo(new Path(customDir)); + } + + @Test + public void testLoadTableSchemaWithCustomPath() throws Exception { + JdbcCatalog jdbcCatalog = initCatalogWithSync(true); + jdbcCatalog.createDatabase("test_db", false); + + String customDir = warehouse + "/custom_load/my_table"; + Schema schema = schemaWithCustomPath(customDir); + Identifier identifier = Identifier.create("test_db", "load_custom"); + + jdbcCatalog.createTable(identifier, schema, false); + + // Verify schema loads from custom location + Table table = jdbcCatalog.getTable(identifier); + assertThat(table).isNotNull(); + assertThat(table.name()).isEqualTo("load_custom"); + } + + @Test + public void testGetTableLocationFallback() throws Exception { + JdbcCatalog jdbcCatalog = (JdbcCatalog) catalog; + jdbcCatalog.createDatabase("test_db", false); + + Identifier identifier = Identifier.create("test_db", "default_table"); + jdbcCatalog.createTable(identifier, DEFAULT_TABLE_SCHEMA, false); + + // Verify getTableLocation returns default when no stored path + Path expected = new Path(new Path(warehouse, "test_db.db"), "default_table"); + assertThat(jdbcCatalog.getTableLocation(identifier)).isEqualTo(expected); + } + + @Test + public void testCrossWarehouseTableAccess() throws Exception { + String sharedUri = "jdbc:sqlite:" + new Path(warehouse, "cross_warehouse.db"); + + // Catalog A: producer with warehouse_a + String warehouseA = new Path(warehouse, "warehouse_a").toString(); + JdbcCatalog catalogA = initCatalogWithWarehouseAndUri(warehouseA, sharedUri, true); + catalogA.createDatabase("mydb", false); + catalogA.createTable(Identifier.create("mydb", "mytable"), DEFAULT_TABLE_SCHEMA, false); + + // Catalog B: reader WITHOUT warehouse (same JDBC) resolves from DB properties + JdbcCatalog catalogB = initCatalogWithoutWarehouse(sharedUri, true); + + // Catalog B resolves table location from database properties + Path tableLocation = catalogB.getTableLocation(Identifier.create("mydb", "mytable")); + assertThat(tableLocation.toString()).contains("warehouse_a"); + + // Catalog B can load the table + Table table = catalogB.getTable(Identifier.create("mydb", "mytable")); + assertThat(table).isNotNull(); + } + + @Test + public void testNoWarehouseCatalogReadsTable() throws Exception { + String sharedUri = "jdbc:sqlite:" + new Path(warehouse, "no_warehouse_read.db"); + + // Producer creates table with warehouse + String warehouseA = new Path(warehouse, "warehouse_a_nw").toString(); + JdbcCatalog producer = initCatalogWithWarehouseAndUri(warehouseA, sharedUri, true); + producer.createDatabase("mydb", false); + producer.createTable(Identifier.create("mydb", "mytable"), DEFAULT_TABLE_SCHEMA, false); + + // Reader has no warehouse + JdbcCatalog reader = initCatalogWithoutWarehouse(sharedUri, true); + + // Reader can resolve and load the table + Table table = reader.getTable(Identifier.create("mydb", "mytable")); + assertThat(table).isNotNull(); + } + + @Test + public void testNoWarehouseCatalogCreatesTable() throws Exception { + String sharedUri = "jdbc:sqlite:" + new Path(warehouse, "no_warehouse_write.db"); + + // Producer creates database (requires warehouse to set location) + String warehouseA = new Path(warehouse, "warehouse_a_write").toString(); + JdbcCatalog producer = initCatalogWithWarehouseAndUri(warehouseA, sharedUri, true); + producer.createDatabase("mydb", false); + + // Writer has no warehouse but creates a new table in existing database + JdbcCatalog writer = initCatalogWithoutWarehouse(sharedUri, true); + Identifier newTable = Identifier.create("mydb", "new_table"); + writer.createTable(newTable, DEFAULT_TABLE_SCHEMA, false); + + // Table location should be resolved from database location property + Path tableLocation = writer.getTableLocation(newTable); + assertThat(tableLocation.toString()).contains("warehouse_a_write"); + assertThat(tableLocation.toString()).contains("mydb.db/new_table"); + + // Table is readable + Table table = writer.getTable(newTable); + assertThat(table).isNotNull(); + } + + @Test + public void testNoWarehouseCatalogDropsTable() throws Exception { + String sharedUri = "jdbc:sqlite:" + new Path(warehouse, "no_warehouse_drop.db"); + + // Producer creates database and table + String warehouseA = new Path(warehouse, "warehouse_a_drop").toString(); + JdbcCatalog producer = initCatalogWithWarehouseAndUri(warehouseA, sharedUri, true); + producer.createDatabase("mydb", false); + Identifier identifier = Identifier.create("mydb", "mytable"); + producer.createTable(identifier, DEFAULT_TABLE_SCHEMA, false); + + // Verify data exists on filesystem + Path tablePath = producer.getTableLocation(identifier); + assertThat(fileIO.exists(tablePath)).isTrue(); + + // No-warehouse catalog drops the table + JdbcCatalog reader = initCatalogWithoutWarehouse(sharedUri, true); + reader.dropTable(identifier, false); + + // Verify metadata is gone + assertThatThrownBy(() -> reader.getTable(identifier)) + .isInstanceOf(Catalog.TableNotExistException.class); + + // Verify filesystem data is deleted + assertThat(fileIO.exists(tablePath)).isFalse(); + } + + @Test + public void testNoWarehouseCatalogRenamesTable() throws Exception { + String sharedUri = "jdbc:sqlite:" + new Path(warehouse, "no_warehouse_rename.db"); + + // Producer creates database and table + String warehouseA = new Path(warehouse, "warehouse_a_rename").toString(); + JdbcCatalog producer = initCatalogWithWarehouseAndUri(warehouseA, sharedUri, true); + producer.createDatabase("mydb", false); + Identifier fromTable = Identifier.create("mydb", "old_name"); + producer.createTable(fromTable, DEFAULT_TABLE_SCHEMA, false); + + Path fromPath = producer.getTableLocation(fromTable); + assertThat(fileIO.exists(fromPath)).isTrue(); + + // No-warehouse catalog renames the table + JdbcCatalog writer = initCatalogWithoutWarehouse(sharedUri, true); + Identifier toTable = Identifier.create("mydb", "new_name"); + writer.renameTable(fromTable, toTable, false); + + // Old name is gone + assertThatThrownBy(() -> writer.getTable(fromTable)) + .isInstanceOf(Catalog.TableNotExistException.class); + + // New name is accessible + Table table = writer.getTable(toTable); + assertThat(table).isNotNull(); + + // Filesystem directory was renamed + Path toPath = writer.getTableLocation(toTable); + assertThat(toPath.toString()).contains("mydb.db/new_name"); + assertThat(fileIO.exists(toPath)).isTrue(); + assertThat(fileIO.exists(fromPath)).isFalse(); + } + + @Test + public void testNoWarehouseCannotCreateDatabase() throws Exception { + String sharedUri = "jdbc:sqlite:" + new Path(warehouse, "no_warehouse_create.db"); + JdbcCatalog reader = initCatalogWithoutWarehouse(sharedUri, true); + + assertThatThrownBy(() -> reader.createDatabase("newdb", false)) + .hasMessageContaining("warehouse"); + } + + @Test + public void testFactoryCreatesNoWarehouseCatalog() throws Exception { + String sharedUri = "jdbc:sqlite:" + new Path(warehouse, "factory.db"); + + // Producer creates database and table via direct constructor (with warehouse) + String warehouseA = new Path(warehouse, "warehouse_factory").toString(); + JdbcCatalog producer = initCatalogWithWarehouseAndUri(warehouseA, sharedUri, true); + producer.createDatabase("mydb", false); + producer.createTable(Identifier.create("mydb", "mytable"), DEFAULT_TABLE_SCHEMA, false); + + // Reader created via factory with no warehouse option + Map readerProps = Maps.newHashMap(); + readerProps.put(CatalogOptions.URI.key(), sharedUri); + readerProps.put(CatalogOptions.METASTORE.key(), "jdbc"); + readerProps.put(JdbcCatalogOptions.CATALOG_KEY.key(), "test-jdbc-catalog"); + readerProps.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user"); + readerProps.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password"); + readerProps.put(CatalogOptions.LOCK_ENABLED.key(), "true"); + readerProps.put(CatalogOptions.LOCK_TYPE.key(), "jdbc"); + readerProps.put(CatalogOptions.SYNC_ALL_PROPERTIES.key(), "true"); + CatalogContext readerContext = CatalogContext.create(Options.fromMap(readerProps)); + JdbcCatalog reader = (JdbcCatalog) new JdbcCatalogFactory().create(readerContext); + + assertThat(reader.warehouse()).isNull(); + + // Reader can load the table via factory-created catalog + Table table = reader.getTable(Identifier.create("mydb", "mytable")); + assertThat(table).isNotNull(); + } + + private JdbcCatalog initCatalogWithWarehouseAndUri( + String catalogWarehouse, String jdbcUri, boolean syncAllProperties) { + Map properties = Maps.newHashMap(); + properties.put(CatalogOptions.URI.key(), jdbcUri); + properties.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user"); + properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password"); + properties.put(CatalogOptions.WAREHOUSE.key(), catalogWarehouse); + properties.put(CatalogOptions.LOCK_ENABLED.key(), "true"); + properties.put(CatalogOptions.LOCK_TYPE.key(), "jdbc"); + properties.put(CatalogOptions.SYNC_ALL_PROPERTIES.key(), String.valueOf(syncAllProperties)); + return new JdbcCatalog( + fileIO, + "test-jdbc-catalog", + CatalogContext.create(Options.fromMap(properties)), + catalogWarehouse); + } + + private JdbcCatalog initCatalogWithoutWarehouse(String jdbcUri, boolean syncAllProperties) { + Map properties = Maps.newHashMap(); + properties.put(CatalogOptions.URI.key(), jdbcUri); + properties.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user"); + properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password"); + properties.put(CatalogOptions.LOCK_ENABLED.key(), "true"); + properties.put(CatalogOptions.LOCK_TYPE.key(), "jdbc"); + properties.put(CatalogOptions.SYNC_ALL_PROPERTIES.key(), String.valueOf(syncAllProperties)); + return new JdbcCatalog( + fileIO, + "test-jdbc-catalog", + CatalogContext.create(Options.fromMap(properties)), + null); + } }