Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 171 additions & 20 deletions paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
Expand Down Expand Up @@ -89,10 +91,10 @@ public class JdbcCatalog extends AbstractCatalog {
private final JdbcClientPool connections;
private final String catalogKey;
private final Options options;
private final String warehouse;
@Nullable private final String warehouse;

protected JdbcCatalog(
FileIO fileIO, String catalogKey, CatalogContext context, String warehouse) {
FileIO fileIO, String catalogKey, CatalogContext context, @Nullable String warehouse) {
super(fileIO, context);
this.catalogKey = catalogKey;
this.options = context.options();
Expand Down Expand Up @@ -205,6 +207,19 @@ protected Database getDatabaseImpl(String databaseName) throws DatabaseNotExistE
Map<String, String> options = Maps.newHashMap();
options.putAll(fetchProperties(databaseName));
if (!options.containsKey(DB_LOCATION_PROP)) {
if (warehouse == null) {
LOG.error(
"Database '{}' has no location property in catalog metadata "
+ "and no warehouse is configured.",
databaseName);
throw new IllegalStateException(
String.format(
"Cannot resolve location for database '%s': "
+ "no warehouse configured and no database location found in catalog metadata. "
+ "Set 'warehouse' in catalog options or ensure the database "
+ "has a location property.",
databaseName));
}
options.put(DB_LOCATION_PROP, newDatabasePath(databaseName).getName());
}
options.remove(DATABASE_EXISTS_PROPERTY);
Expand All @@ -220,6 +235,10 @@ protected void createDatabaseImpl(String name, Map<String, String> properties) {
}

if (!createProps.containsKey(DB_LOCATION_PROP)) {
Preconditions.checkNotNull(
warehouse,
"The 'warehouse' option is required to create databases. "
+ "Set 'warehouse' in catalog options.");
Path databasePath = newDatabasePath(name);
createProps.put(DB_LOCATION_PROP, databasePath.toString());
}
Expand Down Expand Up @@ -293,6 +312,11 @@ protected List<String> listTablesImpl(String databaseName) {
@Override
protected void dropTableImpl(Identifier identifier, List<Path> externalPaths) {
try {
// Retrieve table location and custom-path flag BEFORE deleting JDBC metadata
Optional<String> storedPath = fetchStoredPathIfSyncEnabled(identifier);
Path path = storedPath.map(Path::new).orElse(getTableLocation(identifier));
boolean customPath = storedPath.isPresent();

int deletedRecords =
execute(
connections,
Expand All @@ -313,7 +337,12 @@ protected void dropTableImpl(Identifier identifier, List<Path> externalPaths) {
identifier.getDatabaseName(),
identifier.getTableName());
}
Path path = getTableLocation(identifier);

// If custom path: skip filesystem deletion (external table keeps its data)
if (customPath) {
return;
}

try {
if (fileIO.exists(path)) {
fileIO.deleteDirectoryQuietly(path);
Expand All @@ -334,12 +363,17 @@ protected void dropTableImpl(Identifier identifier, List<Path> externalPaths) {
@Override
protected void createTableImpl(Identifier identifier, Schema schema) {
try {
// create table file
SchemaManager schemaManager = getSchemaManager(identifier);
// Determine table location before table exists in JDBC
Path tableLocation = initialTableLocation(schema.options(), identifier);
boolean externalTable =
syncTableProperties() && schema.options().containsKey(CoreOptions.PATH.key());

// create table file using the determined location
SchemaManager schemaManager = new SchemaManager(fileIO, tableLocation);
TableSchema tableSchema =
runWithLock(identifier, () -> schemaManager.createTable(schema));
// Update schema metadata
Path path = getTableLocation(identifier);
runWithLock(identifier, () -> schemaManager.createTable(schema, externalTable));

// Insert table record into paimon_tables
if (JdbcUtils.insertTable(
connections,
catalogKey,
Expand All @@ -348,22 +382,36 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
LOG.debug("Successfully committed to new table: {}", identifier);
} else {
try {
fileIO.deleteDirectoryQuietly(path);
if (!externalTable) {
fileIO.deleteDirectoryQuietly(tableLocation);
}
} catch (Exception ee) {
LOG.error("Delete directory[{}] fail for table {}", path, identifier, ee);
LOG.error(
"Delete directory[{}] fail for table {}",
tableLocation,
identifier,
ee);
}
throw new RuntimeException(
String.format(
"Failed to create table %s in catalog %s",
identifier.getFullName(), catalogKey));
}

if (syncTableProperties()) {
Map<String, String> propsToStore = collectTableProperties(tableSchema);
// Store custom path in table properties
if (externalTable) {
propsToStore.put(CoreOptions.PATH.key(), tableLocation.toString());
} else {
propsToStore.remove(CoreOptions.PATH.key());
}
JdbcUtils.insertTableProperties(
connections,
catalogKey,
identifier.getDatabaseName(),
identifier.getTableName(),
collectTableProperties(tableSchema));
propsToStore);
}
} catch (Exception e) {
throw new RuntimeException("Failed to create table " + identifier.getFullName(), e);
Expand All @@ -373,6 +421,11 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
@Override
protected void renameTableImpl(Identifier fromTable, Identifier toTable) {
try {
// Check custom path BEFORE renaming metadata
Optional<String> storedPath = fetchStoredPathIfSyncEnabled(fromTable);
boolean customPath = storedPath.isPresent();
Path fromPath = storedPath.map(Path::new).orElse(getTableLocation(fromTable));

// update table metadata info
updateTable(connections, catalogKey, fromTable, toTable);
if (syncTableProperties()) {
Expand All @@ -386,10 +439,14 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) {
fromTable.getObjectName());
}

Path fromPath = getTableLocation(fromTable);
// If custom path: skip filesystem rename (data stays at same location)
if (customPath) {
return;
}

if (!new SchemaManager(fileIO, fromPath).listAllIds().isEmpty()) {
// Rename the file system's table directory. Maintain consistency between tables in
// the file system and tables in the Hive Metastore.
// the file system and tables in the JDBC catalog.
Path toPath = getTableLocation(toTable);
try {
fileIO.rename(fromPath, toPath);
Expand All @@ -414,19 +471,34 @@ protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
try {
runWithLock(identifier, () -> schemaManager.commitChanges(changes));
if (syncTableProperties()) {
// Save custom path before DELETE_ALL so we can re-insert it
Optional<String> customPath =
JdbcUtils.getTableProperty(
connections,
catalogKey,
identifier.getDatabaseName(),
identifier.getTableName(),
CoreOptions.PATH.key());

TableSchema updatedSchema = schemaManager.latest().get();
execute(
connections,
JdbcUtils.DELETE_ALL_TABLE_PROPERTIES_SQL,
catalogKey,
identifier.getDatabaseName(),
identifier.getTableName());
JdbcUtils.insertTableProperties(
connections,
catalogKey,
identifier.getDatabaseName(),
identifier.getTableName(),
collectTableProperties(updatedSchema));

Map<String, String> propsToStore = collectTableProperties(updatedSchema);
// Re-insert custom path if it was stored
customPath.ifPresent(p -> propsToStore.put(CoreOptions.PATH.key(), p));
if (!propsToStore.isEmpty()) {
JdbcUtils.insertTableProperties(
connections,
catalogKey,
identifier.getDatabaseName(),
identifier.getTableName(),
propsToStore);
}
}
} catch (TableNotExistException
| ColumnAlreadyExistException
Expand All @@ -451,6 +523,39 @@ protected TableSchema loadTableSchema(Identifier identifier) throws TableNotExis
() -> new RuntimeException("There is no paimon table in " + tableLocation));
}

@Override
protected boolean allowCustomTablePath() {
return syncTableProperties();
}

@Override
public Path getTableLocation(Identifier identifier) {
if (syncTableProperties()) {
Optional<String> storedPath =
JdbcUtils.getTableProperty(
connections,
catalogKey,
identifier.getDatabaseName(),
identifier.getTableName(),
CoreOptions.PATH.key());
if (storedPath.isPresent()) {
return new Path(storedPath.get());
}
}
if (warehouse != null) {
return super.getTableLocation(identifier);
}
Map<String, String> dbProps = fetchProperties(identifier.getDatabaseName());
String dbLocation = dbProps.get(DB_LOCATION_PROP);
if (dbLocation != null) {
return new Path(dbLocation, identifier.getTableName());
}
throw new IllegalStateException(
"Cannot resolve table location for "
+ identifier.getFullName()
+ ": no warehouse configured and no database location found in catalog metadata.");
}

@Override
public boolean caseSensitive() {
return false;
Expand Down Expand Up @@ -481,6 +586,10 @@ public <T> T runWithLock(Identifier identifier, Callable<T> callable) throws Exc

@Override
public void repairCatalog() {
Preconditions.checkNotNull(
warehouse,
"The 'warehouse' option is required to repair catalogs. "
+ "Set 'warehouse' in catalog options.");
List<String> databases;
try {
databases = listDatabasesInFileSystem(new Path(warehouse));
Expand All @@ -495,6 +604,10 @@ public void repairCatalog() {
@Override
public void repairDatabase(String databaseName) {
checkNotSystemDatabase(databaseName);
Preconditions.checkNotNull(
warehouse,
"The 'warehouse' option is required to repair databases. "
+ "Set 'warehouse' in catalog options.");

// First check if database exists in file system
Path databasePath = newDatabasePath(databaseName);
Expand Down Expand Up @@ -522,6 +635,15 @@ public void repairDatabase(String databaseName) {
}
}

/**
* Repair a table by re-syncing JDBC metadata from the filesystem schema.
*
* <p>Note: Tables created with a custom path ({@code CoreOptions.PATH}) cannot be fully
* repaired if both the {@code paimon_tables} row and the {@code paimon_table_properties} rows
* are lost. In that case, {@code getTableLocation} falls back to the default warehouse path
* where no schema exists, and repair will throw {@link TableNotExistException}. To recover,
* re-create the table pointing to the original custom path.
*/
@Override
public void repairTable(Identifier identifier) throws TableNotExistException {
checkNotBranch(identifier, "repairTable");
Expand Down Expand Up @@ -550,6 +672,7 @@ public void repairTable(Identifier identifier) throws TableNotExistException {
LOG.error("Failed to repair table: {}", identifier);
}
}

if (syncTableProperties()) {
// Delete existing properties and reinsert from filesystem schema
execute(
Expand All @@ -558,12 +681,21 @@ public void repairTable(Identifier identifier) throws TableNotExistException {
catalogKey,
identifier.getDatabaseName(),
identifier.getTableName());

Map<String, String> propsToStore = collectTableProperties(tableSchema);
// Check if the table has a custom path (path from schema != default location)
Path defaultLocation = super.getTableLocation(identifier);
if (!tableLocation.equals(defaultLocation)) {
propsToStore.put(CoreOptions.PATH.key(), tableLocation.toString());
} else {
propsToStore.remove(CoreOptions.PATH.key());
}
JdbcUtils.insertTableProperties(
connections,
catalogKey,
identifier.getDatabaseName(),
identifier.getTableName(),
collectTableProperties(tableSchema));
propsToStore);
}
}

Expand Down Expand Up @@ -599,6 +731,25 @@ private Map<String, String> collectTableProperties(TableSchema tableSchema) {
return properties;
}

private Path initialTableLocation(Map<String, String> tableOptions, Identifier identifier) {
if (tableOptions.containsKey(CoreOptions.PATH.key())) {
return new Path(tableOptions.get(CoreOptions.PATH.key()));
}
return getTableLocation(identifier);
}

private Optional<String> fetchStoredPathIfSyncEnabled(Identifier identifier) {
if (syncTableProperties()) {
return JdbcUtils.getTableProperty(
connections,
catalogKey,
identifier.getDatabaseName(),
identifier.getTableName(),
CoreOptions.PATH.key());
}
return Optional.empty();
}

private SchemaManager getSchemaManager(Identifier identifier) {
return new SchemaManager(fileIO, getTableLocation(identifier));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,13 @@
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.ResolvingFileIO;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;

import java.io.IOException;
import java.io.UncheckedIOException;

/** Factory to create {@link JdbcCatalog}. */
public class JdbcCatalogFactory implements CatalogFactory {

Expand All @@ -35,6 +40,28 @@ public String identifier() {
return IDENTIFIER;
}

@Override
public Catalog create(CatalogContext context) {
Options options = context.options();
String catalogKey = options.get(JdbcCatalogOptions.CATALOG_KEY);
String warehouseStr = options.get(CatalogOptions.WAREHOUSE);

if (warehouseStr != null) {
Path warehousePath = new Path(warehouseStr);
try {
FileIO fileIO = FileIO.get(warehousePath, context);
fileIO.checkOrMkdirs(warehousePath);
return new JdbcCatalog(fileIO, catalogKey, context, warehouseStr);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

ResolvingFileIO fileIO = new ResolvingFileIO();
fileIO.configure(context);
return new JdbcCatalog(fileIO, catalogKey, context, null);
}

@Override
public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) {
Options options = context.options();
Expand Down
Loading