diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java index 816d0a8547df..8fb437f6120d 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java @@ -61,6 +61,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; /** * An implementation of {@link java.sql.Connection} for establishing a connection with BigQuery and @@ -72,6 +73,8 @@ public class BigQueryConnection extends BigQueryNoOpsConnection { private final BigQueryJdbcCustomLogger LOG = new BigQueryJdbcCustomLogger(this.toString()); String connectionClassName = this.toString(); + private final String connectionId; + private static final AtomicLong connectionIdCounter = new AtomicLong(1); private static final String DEFAULT_JDBC_TOKEN_VALUE = "Google-BigQuery-JDBC-Driver"; private static final String DEFAULT_VERSION = "0.0.0"; private HeaderProvider headerProvider; @@ -146,113 +149,126 @@ public class BigQueryConnection extends BigQueryNoOpsConnection { } BigQueryConnection(String url, DataSource ds) throws IOException { - this.connectionUrl = url; - this.openStatements = ConcurrentHashMap.newKeySet(); - this.autoCommit = true; - this.sqlWarnings = new ArrayList<>(); - this.transactionStarted = false; - this.isClosed = false; - - this.labels = ds.getLabels() != null ? ds.getLabels() : new java.util.HashMap<>(); - this.maxBytesBilled = ds.getMaximumBytesBilled(); - this.retryTimeoutInSeconds = ds.getTimeout(); - this.retryTimeoutDuration = Duration.ofMillis(retryTimeoutInSeconds * 1000L); - this.retryInitialDelayInSeconds = ds.getRetryInitialDelay(); - this.retryInitialDelayDuration = Duration.ofMillis(retryInitialDelayInSeconds * 1000L); - this.retryMaxDelayInSeconds = ds.getRetryMaxDelay(); - this.retryMaxDelayDuration = Duration.ofMillis(retryMaxDelayInSeconds * 1000L); - this.jobTimeoutInSeconds = ds.getJobTimeout(); - this.authProperties = - BigQueryJdbcOAuthUtility.parseOAuthProperties(ds, this.connectionClassName); - this.catalog = ds.getProjectId(); - this.universeDomain = ds.getUniverseDomain(); - - this.overrideProperties = ds.getOverrideProperties(); - if (this.universeDomain != null) { - this.overrideProperties.put( - BigQueryJdbcUrlUtility.UNIVERSE_DOMAIN_OVERRIDE_PROPERTY_NAME, this.universeDomain); - } + try { + this.connectionId = String.valueOf(connectionIdCounter.getAndIncrement()); + BigQueryJdbcMdc.registerInstance(this, this.connectionId); + LOG.finest("++enter++"); + + this.connectionUrl = url; + this.openStatements = ConcurrentHashMap.newKeySet(); + this.autoCommit = true; + this.sqlWarnings = new ArrayList<>(); + this.transactionStarted = false; + this.isClosed = false; + + this.labels = ds.getLabels() != null ? ds.getLabels() : new java.util.HashMap<>(); + this.maxBytesBilled = ds.getMaximumBytesBilled(); + this.retryTimeoutInSeconds = ds.getTimeout(); + this.retryTimeoutDuration = Duration.ofMillis(retryTimeoutInSeconds * 1000L); + this.retryInitialDelayInSeconds = ds.getRetryInitialDelay(); + this.retryInitialDelayDuration = Duration.ofMillis(retryInitialDelayInSeconds * 1000L); + this.retryMaxDelayInSeconds = ds.getRetryMaxDelay(); + this.retryMaxDelayDuration = Duration.ofMillis(retryMaxDelayInSeconds * 1000L); + this.jobTimeoutInSeconds = ds.getJobTimeout(); + this.authProperties = + BigQueryJdbcOAuthUtility.parseOAuthProperties(ds, this.connectionClassName); + this.catalog = ds.getProjectId(); + this.universeDomain = ds.getUniverseDomain(); + + this.overrideProperties = ds.getOverrideProperties(); + if (this.universeDomain != null) { + this.overrideProperties.put( + BigQueryJdbcUrlUtility.UNIVERSE_DOMAIN_OVERRIDE_PROPERTY_NAME, this.universeDomain); + } - this.reqGoogleDriveScope = - BigQueryJdbcUrlUtility.convertIntToBoolean( - String.valueOf(ds.getRequestGoogleDriveScope()), - BigQueryJdbcUrlUtility.REQUEST_GOOGLE_DRIVE_SCOPE_PROPERTY_NAME); - - this.credentials = - BigQueryJdbcOAuthUtility.getCredentials( - authProperties, overrideProperties, this.reqGoogleDriveScope, this.connectionClassName); - String defaultDatasetString = ds.getDefaultDataset(); - if (defaultDatasetString == null || defaultDatasetString.trim().isEmpty()) { - this.defaultDataset = null; - } else { - String[] parts = defaultDatasetString.split("\\."); - if (parts.length == 2) { - this.defaultDataset = DatasetId.of(parts[0], parts[1]); - } else if (parts.length == 1) { - this.defaultDataset = DatasetId.of(parts[0]); + this.reqGoogleDriveScope = + BigQueryJdbcUrlUtility.convertIntToBoolean( + String.valueOf(ds.getRequestGoogleDriveScope()), + BigQueryJdbcUrlUtility.REQUEST_GOOGLE_DRIVE_SCOPE_PROPERTY_NAME); + + this.credentials = + BigQueryJdbcOAuthUtility.getCredentials( + authProperties, + overrideProperties, + this.reqGoogleDriveScope, + this.connectionClassName); + String defaultDatasetString = ds.getDefaultDataset(); + if (defaultDatasetString == null || defaultDatasetString.trim().isEmpty()) { + this.defaultDataset = null; } else { - throw new IllegalArgumentException( - "DefaultDataset format is invalid. Supported options are datasetId or" - + " projectId.datasetId"); + String[] parts = defaultDatasetString.split("\\."); + if (parts.length == 2) { + this.defaultDataset = DatasetId.of(parts[0], parts[1]); + } else if (parts.length == 1) { + this.defaultDataset = DatasetId.of(parts[0]); + } else { + throw new IllegalArgumentException( + "DefaultDataset format is invalid. Supported options are datasetId or" + + " projectId.datasetId"); + } } + this.location = ds.getLocation(); + this.enableHighThroughputAPI = ds.getEnableHighThroughputAPI(); + this.highThroughputMinTableSize = ds.getHighThroughputMinTableSize(); + this.highThroughputActivationRatio = ds.getHighThroughputActivationRatio(); + this.useQueryCache = ds.getUseQueryCache(); + this.useStatelessQueryMode = ds.getUseStatelessQueryMode(); + + this.queryDialect = ds.getQueryDialect(); + this.allowLargeResults = ds.getAllowLargeResults(); + this.destinationTable = ds.getDestinationTable(); + this.destinationDataset = ds.getDestinationDataset(); + this.destinationDatasetExpirationTime = ds.getDestinationDatasetExpirationTime(); + this.kmsKeyName = ds.getKmsKeyName(); + Map proxyProperties = + BigQueryJdbcProxyUtility.parseProxyProperties(ds, this.connectionClassName); + + this.sslTrustStorePath = ds.getSSLTrustStorePath(); + this.sslTrustStorePassword = ds.getSSLTrustStorePassword(); + this.httpConnectTimeout = ds.getHttpConnectTimeout(); + this.httpReadTimeout = ds.getHttpReadTimeout(); + + this.httpTransportOptions = + BigQueryJdbcProxyUtility.getHttpTransportOptions( + proxyProperties, + this.sslTrustStorePath, + this.sslTrustStorePassword, + this.httpConnectTimeout, + this.httpReadTimeout, + this.connectionClassName); + this.transportChannelProvider = + BigQueryJdbcProxyUtility.getTransportChannelProvider( + proxyProperties, + this.sslTrustStorePath, + this.sslTrustStorePassword, + this.connectionClassName); + this.enableSession = ds.getEnableSession(); + this.unsupportedHTAPIFallback = ds.getUnsupportedHTAPIFallback(); + this.maxResults = ds.getMaxResults(); + Map queryPropertiesMap = ds.getQueryProperties(); + this.sessionInfoConnectionProperty = + getSessionPropertyFromQueryProperties(queryPropertiesMap); + this.queryProperties = convertMapToConnectionPropertiesList(queryPropertiesMap); + this.enableWriteAPI = ds.getEnableWriteAPI(); + this.writeAPIActivationRowCount = ds.getSwaActivationRowCount(); + this.writeAPIAppendRowCount = ds.getSwaAppendRowCount(); + + this.additionalProjects = ds.getAdditionalProjects(); + + this.filterTablesOnDefaultDataset = ds.getFilterTablesOnDefaultDataset(); + this.requestGoogleDriveScope = ds.getRequestGoogleDriveScope(); + this.metadataFetchThreadCount = ds.getMetadataFetchThreadCount(); + this.requestReason = ds.getRequestReason(); + this.connectionPoolSize = ds.getConnectionPoolSize(); + this.listenerPoolSize = ds.getListenerPoolSize(); + this.partnerToken = ds.getPartnerToken(); + + this.headerProvider = createHeaderProvider(); + this.bigQuery = getBigQueryConnection(); + } finally { + BigQueryJdbcMdc.clear(); + ; } - this.location = ds.getLocation(); - this.enableHighThroughputAPI = ds.getEnableHighThroughputAPI(); - this.highThroughputMinTableSize = ds.getHighThroughputMinTableSize(); - this.highThroughputActivationRatio = ds.getHighThroughputActivationRatio(); - this.useQueryCache = ds.getUseQueryCache(); - this.useStatelessQueryMode = ds.getUseStatelessQueryMode(); - - this.queryDialect = ds.getQueryDialect(); - this.allowLargeResults = ds.getAllowLargeResults(); - this.destinationTable = ds.getDestinationTable(); - this.destinationDataset = ds.getDestinationDataset(); - this.destinationDatasetExpirationTime = ds.getDestinationDatasetExpirationTime(); - this.kmsKeyName = ds.getKmsKeyName(); - Map proxyProperties = - BigQueryJdbcProxyUtility.parseProxyProperties(ds, this.connectionClassName); - - this.sslTrustStorePath = ds.getSSLTrustStorePath(); - this.sslTrustStorePassword = ds.getSSLTrustStorePassword(); - this.httpConnectTimeout = ds.getHttpConnectTimeout(); - this.httpReadTimeout = ds.getHttpReadTimeout(); - - this.httpTransportOptions = - BigQueryJdbcProxyUtility.getHttpTransportOptions( - proxyProperties, - this.sslTrustStorePath, - this.sslTrustStorePassword, - this.httpConnectTimeout, - this.httpReadTimeout, - this.connectionClassName); - this.transportChannelProvider = - BigQueryJdbcProxyUtility.getTransportChannelProvider( - proxyProperties, - this.sslTrustStorePath, - this.sslTrustStorePassword, - this.connectionClassName); - this.enableSession = ds.getEnableSession(); - this.unsupportedHTAPIFallback = ds.getUnsupportedHTAPIFallback(); - this.maxResults = ds.getMaxResults(); - Map queryPropertiesMap = ds.getQueryProperties(); - this.sessionInfoConnectionProperty = getSessionPropertyFromQueryProperties(queryPropertiesMap); - this.queryProperties = convertMapToConnectionPropertiesList(queryPropertiesMap); - this.enableWriteAPI = ds.getEnableWriteAPI(); - this.writeAPIActivationRowCount = ds.getSwaActivationRowCount(); - this.writeAPIAppendRowCount = ds.getSwaAppendRowCount(); - - this.additionalProjects = ds.getAdditionalProjects(); - - this.filterTablesOnDefaultDataset = ds.getFilterTablesOnDefaultDataset(); - this.requestGoogleDriveScope = ds.getRequestGoogleDriveScope(); - this.metadataFetchThreadCount = ds.getMetadataFetchThreadCount(); - this.requestReason = ds.getRequestReason(); - this.connectionPoolSize = ds.getConnectionPoolSize(); - this.listenerPoolSize = ds.getListenerPoolSize(); - this.partnerToken = ds.getPartnerToken(); - - this.headerProvider = createHeaderProvider(); - this.bigQuery = getBigQueryConnection(); } String getLibraryVersion(Class libraryClass) { @@ -323,6 +339,10 @@ String getConnectionUrl() { return connectionUrl; } + String getConnectionId() { + return this.connectionId; + } + /** * Creates and returns a new {@code Statement} object for executing BigQuery SQL queries * @@ -331,11 +351,17 @@ String getConnectionUrl() { */ @Override public Statement createStatement() throws SQLException { - checkClosed(); - BigQueryStatement currentStatement = new BigQueryStatement(this); - LOG.fine("Statement %s created.", currentStatement); - addOpenStatements(currentStatement); - return currentStatement; + try { + BigQueryJdbcMdc.registerInstance(this, this.connectionId); + LOG.finest("++enter++"); + checkClosed(); + BigQueryStatement currentStatement = new BigQueryStatement(this); + LOG.fine("Statement %s created.", currentStatement); + addOpenStatements(currentStatement); + return currentStatement; + } finally { + BigQueryJdbcMdc.clear(); + } } /** @@ -354,12 +380,19 @@ public Statement createStatement() throws SQLException { @Override public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { - checkClosed(); - if (resultSetType != ResultSet.TYPE_FORWARD_ONLY - || resultSetConcurrency != ResultSet.CONCUR_READ_ONLY) { - throw new BigQueryJdbcSqlFeatureNotSupportedException("Unsupported createStatement feature."); + try { + BigQueryJdbcMdc.registerInstance(this, this.connectionId); + LOG.finest("++enter++"); + checkClosed(); + if (resultSetType != ResultSet.TYPE_FORWARD_ONLY + || resultSetConcurrency != ResultSet.CONCUR_READ_ONLY) { + throw new BigQueryJdbcSqlFeatureNotSupportedException( + "Unsupported createStatement feature."); + } + return createStatement(); + } finally { + BigQueryJdbcMdc.clear(); } - return createStatement(); } /** @@ -378,31 +411,49 @@ public Statement createStatement(int resultSetType, int resultSetConcurrency) @Override public Statement createStatement( int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { - LOG.finest("++enter++"); - checkClosed(); - if (resultSetType != ResultSet.TYPE_FORWARD_ONLY - || resultSetConcurrency != ResultSet.CONCUR_READ_ONLY - || resultSetHoldability != ResultSet.CLOSE_CURSORS_AT_COMMIT) { - throw new BigQueryJdbcSqlFeatureNotSupportedException("Unsupported createStatement feature"); + try { + BigQueryJdbcMdc.registerInstance(this, this.connectionId); + LOG.finest("++enter++"); + checkClosed(); + if (resultSetType != ResultSet.TYPE_FORWARD_ONLY + || resultSetConcurrency != ResultSet.CONCUR_READ_ONLY + || resultSetHoldability != ResultSet.CLOSE_CURSORS_AT_COMMIT) { + throw new BigQueryJdbcSqlFeatureNotSupportedException( + "Unsupported createStatement feature"); + } + return createStatement(); + } finally { + BigQueryJdbcMdc.clear(); } - return createStatement(); } @Override public PreparedStatement prepareStatement(String sql) throws SQLException { - checkClosed(); - PreparedStatement currentStatement = new BigQueryPreparedStatement(this, sql); - LOG.fine("Prepared Statement %s created.", currentStatement); - addOpenStatements(currentStatement); - return currentStatement; + try { + BigQueryJdbcMdc.registerInstance(this, this.connectionId); + LOG.finest("++enter++"); + checkClosed(); + PreparedStatement currentStatement = new BigQueryPreparedStatement(this, sql); + LOG.fine("Prepared Statement %s created.", currentStatement); + addOpenStatements(currentStatement); + return currentStatement; + } finally { + BigQueryJdbcMdc.clear(); + } } @Override public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { - if (autoGeneratedKeys != Statement.NO_GENERATED_KEYS) { - throw new BigQueryJdbcSqlFeatureNotSupportedException("autoGeneratedKeys is not supported"); + try { + BigQueryJdbcMdc.registerInstance(this, this.connectionId); + LOG.finest("++enter++"); + if (autoGeneratedKeys != Statement.NO_GENERATED_KEYS) { + throw new BigQueryJdbcSqlFeatureNotSupportedException("autoGeneratedKeys is not supported"); + } + return prepareStatement(sql); + } finally { + BigQueryJdbcMdc.clear(); } - return prepareStatement(sql); } @Override @@ -414,23 +465,36 @@ public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throw public PreparedStatement prepareStatement( String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { - if (resultSetType != ResultSet.TYPE_FORWARD_ONLY - || resultSetConcurrency != ResultSet.CONCUR_READ_ONLY - || resultSetHoldability != ResultSet.CLOSE_CURSORS_AT_COMMIT) { - throw new BigQueryJdbcSqlFeatureNotSupportedException("Unsupported prepareStatement feature"); + try { + BigQueryJdbcMdc.registerInstance(this, this.connectionId); + LOG.finest("++enter++"); + if (resultSetType != ResultSet.TYPE_FORWARD_ONLY + || resultSetConcurrency != ResultSet.CONCUR_READ_ONLY + || resultSetHoldability != ResultSet.CLOSE_CURSORS_AT_COMMIT) { + throw new BigQueryJdbcSqlFeatureNotSupportedException( + "Unsupported prepareStatement feature"); + } + return prepareStatement(sql); + } finally { + BigQueryJdbcMdc.clear(); } - return prepareStatement(sql); } @Override public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { - LOG.finest("++enter++"); - if (resultSetType != ResultSet.TYPE_FORWARD_ONLY - || resultSetConcurrency != ResultSet.CONCUR_READ_ONLY) { - throw new BigQueryJdbcSqlFeatureNotSupportedException("Unsupported prepareStatement feature"); + try { + BigQueryJdbcMdc.registerInstance(this, this.connectionId); + LOG.finest("++enter++"); + if (resultSetType != ResultSet.TYPE_FORWARD_ONLY + || resultSetConcurrency != ResultSet.CONCUR_READ_ONLY) { + throw new BigQueryJdbcSqlFeatureNotSupportedException( + "Unsupported prepareStatement feature"); + } + return prepareStatement(sql); + } finally { + BigQueryJdbcMdc.clear(); } - return prepareStatement(sql); } public DatasetId getDefaultDataset() { @@ -645,36 +709,46 @@ Long getListenerPoolSize() { @Override public boolean isValid(int timeout) throws SQLException { - if (timeout < 0) { - throw new BigQueryJdbcException("timeout must be >= 0"); - } - if (!isClosed()) { - try (Statement statement = createStatement(); - ResultSet rs = statement.executeQuery("SELECT 1")) { - LOG.finest("Running validation query"); - // TODO(obada): set query timeout when it's implemented - // TODO(obada): use dry run - if (rs.next()) { - if (rs.getInt(1) == 1) { - return true; + try { + BigQueryJdbcMdc.registerInstance(this, this.connectionId); + LOG.finest("++enter++"); + if (timeout < 0) { + throw new BigQueryJdbcException("timeout must be >= 0"); + } + if (!isClosed()) { + try (Statement statement = createStatement(); + ResultSet rs = statement.executeQuery("SELECT 1")) { + LOG.finest("Running validation query"); + if (rs.next()) { + if (rs.getInt(1) == 1) { + return true; + } } + } catch (SQLException ex) { + // Ignore } - } catch (SQLException ex) { - // Ignore } + return false; + } finally { + BigQueryJdbcMdc.clear(); } - return false; } @Override public void abort(Executor executor) throws SQLException { - LOG.finest("++enter++"); - close(); + try { + BigQueryJdbcMdc.registerInstance(this, this.connectionId); + LOG.finest("++enter++"); + close(); + } finally { + BigQueryJdbcMdc.clear(); + } } - // TODO: Throw exception translation of BigQueryJdbcSqlClientInfoException when implementing below @Override - public void setClientInfo(String name, String value) {} + public void setClientInfo(String name, String value) { + // no-op + } @Override public String getClientInfo(String name) { @@ -692,7 +766,9 @@ public Properties getClientInfo() { } @Override - public void setClientInfo(Properties properties) {} + public void setClientInfo(Properties properties) { + // no-op + } @Override public SQLWarning getWarnings() { @@ -706,102 +782,125 @@ public void clearWarnings() { @Override public boolean getAutoCommit() { - checkClosed(); - return this.autoCommit; + try { + BigQueryJdbcMdc.registerInstance(this, this.connectionId); + LOG.finest("++enter++"); + checkClosed(); + return this.autoCommit; + } finally { + BigQueryJdbcMdc.clear(); + } } - /** - * Sets this connection's auto-commit mode to the given state.
- * If this method is called during a transaction and the auto-commit mode is changed, the - * transaction is committed. If setAutoCommit is called and the auto-commit mode is not changed, - * the call is a no-op. - * - * @param autoCommit {@code true} to enable auto-commit mode; {@code false} to disable it - * @see Connection#setAutoCommit(boolean) - */ @Override public void setAutoCommit(boolean autoCommit) throws SQLException { - LOG.finest("++enter++"); - checkClosed(); - checkIfEnabledSession("setAutoCommit"); - if (this.autoCommit == autoCommit) { - return; - } + try { + BigQueryJdbcMdc.registerInstance(this, this.connectionId); + LOG.finest("++enter++"); + checkClosed(); + checkIfEnabledSession("setAutoCommit"); + if (this.autoCommit == autoCommit) { + return; + } - if (isTransactionStarted()) { - commitTransaction(); - } + if (isTransactionStarted()) { + commitTransaction(); + } - this.autoCommit = autoCommit; - if (!this.autoCommit) { - beginTransaction(); + this.autoCommit = autoCommit; + if (!this.autoCommit) { + beginTransaction(); + } + } finally { + BigQueryJdbcMdc.clear(); } } @Override public void commit() { - LOG.finest("++enter++"); - checkClosed(); - checkIfEnabledSession("commit"); - if (!isTransactionStarted()) { - throw new IllegalStateException( - "Cannot commit without an active transaction. Please set setAutoCommit to false to start" - + " a transaction."); - } - commitTransaction(); - if (!getAutoCommit()) { - beginTransaction(); + try { + BigQueryJdbcMdc.registerInstance(this, this.connectionId); + LOG.finest("++enter++"); + checkClosed(); + checkIfEnabledSession("commit"); + if (!isTransactionStarted()) { + throw new IllegalStateException( + "Cannot commit without an active transaction. Please set setAutoCommit to false to start" + + " a transaction."); + } + commitTransaction(); + if (!getAutoCommit()) { + beginTransaction(); + } + } finally { + BigQueryJdbcMdc.clear(); } } @Override public void rollback() throws SQLException { - LOG.finest("++enter++"); - checkClosed(); - checkIfEnabledSession("rollback"); - if (!isTransactionStarted()) { - throw new IllegalStateException( - "Cannot rollback without an active transaction. Please set setAutoCommit to false to" - + " start a transaction."); - } try { - QueryJobConfiguration transactionRollbackJobConfig = - QueryJobConfiguration.newBuilder("ROLLBACK TRANSACTION;") - .setConnectionProperties(this.queryProperties) - .build(); - Job rollbackJob = this.bigQuery.create(JobInfo.of(transactionRollbackJobConfig)); - rollbackJob.waitFor(); - this.transactionStarted = false; - if (!getAutoCommit()) { - beginTransaction(); + BigQueryJdbcMdc.registerInstance(this, this.connectionId); + LOG.finest("++enter++"); + checkClosed(); + checkIfEnabledSession("rollback"); + if (!isTransactionStarted()) { + throw new IllegalStateException( + "Cannot rollback without an active transaction. Please set setAutoCommit to false to" + + " start a transaction."); } - } catch (InterruptedException | BigQueryException ex) { - LOG.severe(ex, "Failed to rollback transaction"); - throw new BigQueryJdbcException(ex); + try { + QueryJobConfiguration transactionRollbackJobConfig = + QueryJobConfiguration.newBuilder("ROLLBACK TRANSACTION;") + .setConnectionProperties(this.queryProperties) + .build(); + Job rollbackJob = this.bigQuery.create(JobInfo.of(transactionRollbackJobConfig)); + rollbackJob.waitFor(); + this.transactionStarted = false; + if (!getAutoCommit()) { + beginTransaction(); + } + } catch (InterruptedException | BigQueryException ex) { + LOG.severe(ex, "Failed to rollback transaction"); + throw new BigQueryJdbcException(ex); + } + } finally { + BigQueryJdbcMdc.clear(); } } @Override public DatabaseMetaData getMetaData() throws SQLException { - if (databaseMetaData == null) { - databaseMetaData = new BigQueryDatabaseMetaData(this); + try { + BigQueryJdbcMdc.registerInstance(this, this.connectionId); + LOG.finest("++enter++"); + if (databaseMetaData == null) { + databaseMetaData = new BigQueryDatabaseMetaData(this); + } + return databaseMetaData; + } finally { + BigQueryJdbcMdc.clear(); } - return databaseMetaData; } @Override public int getTransactionIsolation() { - // only supports Connection.TRANSACTION_SERIALIZABLE return Connection.TRANSACTION_SERIALIZABLE; } @Override public void setTransactionIsolation(int level) throws SQLException { - if (level != Connection.TRANSACTION_SERIALIZABLE) { - throw new BigQueryJdbcSqlFeatureNotSupportedException( - "Transaction serializable not supported"); + try { + BigQueryJdbcMdc.registerInstance(this, this.connectionId); + LOG.finest("++enter++"); + if (level != Connection.TRANSACTION_SERIALIZABLE) { + throw new BigQueryJdbcSqlFeatureNotSupportedException( + "Unsupported transaction isolation level"); + } + this.transactionIsolation = level; + } finally { + BigQueryJdbcMdc.clear(); } - this.transactionIsolation = level; } @Override @@ -827,38 +926,46 @@ public void setHoldability(int holdability) throws SQLException { */ @Override public void close() throws SQLException { - LOG.fine("Closing Connection " + this); - // TODO(neenu-postMVP): Release all connection state objects - // check for and close all existing transactions - - if (isClosed()) { - return; - } try { - if (this.bigQueryReadClient != null) { - this.bigQueryReadClient.shutdown(); - this.bigQueryReadClient.awaitTermination(1, TimeUnit.MINUTES); - this.bigQueryReadClient.close(); + if (isClosed()) { + return; } - if (this.bigQueryWriteClient != null) { - this.bigQueryWriteClient.shutdown(); - this.bigQueryWriteClient.awaitTermination(1, TimeUnit.MINUTES); - this.bigQueryWriteClient.close(); - } + BigQueryJdbcMdc.registerInstance(this, this.connectionId); + LOG.finest("++enter++"); + LOG.fine("Closing Connection " + this); - for (Statement statement : this.openStatements) { - statement.close(); + try { + if (this.bigQueryReadClient != null) { + this.bigQueryReadClient.shutdown(); + this.bigQueryReadClient.awaitTermination(1, TimeUnit.MINUTES); + this.bigQueryReadClient.close(); + } + + if (this.bigQueryWriteClient != null) { + this.bigQueryWriteClient.shutdown(); + this.bigQueryWriteClient.awaitTermination(1, TimeUnit.MINUTES); + this.bigQueryWriteClient.close(); + } + + for (Statement statement : this.openStatements) { + statement.close(); + } + this.openStatements.clear(); + } catch (ConcurrentModificationException ex) { + LOG.severe(ex, "Concurrent modification during close"); + throw new BigQueryJdbcException(ex); + } catch (InterruptedException e) { + LOG.severe(e, "Interrupted during close"); + throw new BigQueryJdbcRuntimeException(e); + } finally { + BigQueryJdbcMdc.removeInstance(this); + BigQueryJdbcRootLogger.closeConnectionHandler(this.connectionId); } - this.openStatements.clear(); - } catch (ConcurrentModificationException ex) { - LOG.severe(ex, "Concurrent modification during close"); - throw new BigQueryJdbcException(ex); - } catch (InterruptedException e) { - LOG.severe(e, "Interrupted during close"); - throw new BigQueryJdbcRuntimeException(e); + this.isClosed = true; + } finally { + BigQueryJdbcMdc.clear(); } - this.isClosed = true; } @Override @@ -868,6 +975,7 @@ public boolean isClosed() { private void checkClosed() { if (isClosed()) { + LOG.severe("This " + getClass().getName() + " has been closed"); throw new IllegalStateException("This " + getClass().getName() + " has been closed"); } } diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcRootLogger.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcRootLogger.java index cb9b6d0835dc..32772521e9c2 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcRootLogger.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcRootLogger.java @@ -19,14 +19,10 @@ import com.google.common.base.Strings; import java.io.IOException; import java.lang.management.ManagementFactory; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.StandardCopyOption; -import java.text.SimpleDateFormat; -import java.util.Date; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; import java.util.logging.ConsoleHandler; -import java.util.logging.FileHandler; import java.util.logging.Formatter; import java.util.logging.Handler; import java.util.logging.Level; @@ -46,13 +42,11 @@ class BigQueryJdbcRootLogger { private static final boolean isTest = Boolean.getBoolean("JDBC_TESTS"); private static Handler fileHandler = null; - private static Path currentLogPath = null; - private static int fileCounter = 0; static final String PROCESS_ID = ManagementFactory.getRuntimeMXBean().getName().split("@")[0]; - private static final ThreadLocal DATE_FORMATTER = - ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")); + private static final DateTimeFormatter DATE_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.systemDefault()); static String getThreadName(long threadId) { Thread current = Thread.currentThread(); @@ -92,7 +86,10 @@ public static Formatter getFormatter() { @Override public String format(LogRecord record) { - String date = DATE_FORMATTER.get().format(new Date(record.getMillis())); + String date = DATE_FORMATTER.format(Instant.ofEpochMilli(record.getMillis())); + String connectionId = BigQueryJdbcMdc.getConnectionId(); + String connStr = + (connectionId != null && !connectionId.isEmpty()) ? connectionId : "NO_CONN"; long threadId = record.getThreadID(); String threadName = getThreadName(threadId); @@ -104,13 +101,13 @@ public String format(LogRecord record) { String sourceClassName = record.getLoggerName(); String sourceMethodName = record.getSourceMethodName(); - // Expected log format: yyyy-MM-dd HH:mm:ss.SSS LEVEL PID --- [THREAD] CLASS METHOD: MESSAGE - // Example: 2026-04-22 10:16:00.123 INFO 12345 --- [main ] - // com.google.cloud.bigquery.jdbc.BigQueryConnection connect : Connection - // successful + // Expected log format: yyyy-MM-dd HH:mm:ss.SSS [CONNECTION_ID] LEVEL PID --- [THREAD] CLASS + // METHOD: MESSAGE StringBuilder sb = new StringBuilder(256); sb.append(date) - .append(" ") + .append(" [") + .append(connStr) + .append("] ") .append(Strings.padStart(record.getLevel().getName(), 5, ' ')) .append(" ") .append(PROCESS_ID) @@ -139,41 +136,9 @@ public static Logger getRootLogger() { return logger; } - private static void setHandler() throws IOException { - // If Console handler exists, remove it. - // If File handler exists, use it. Else create new one. - for (Handler h : logger.getHandlers()) { - if (h instanceof ConsoleHandler) { - if (!isTest) { - h.close(); - logger.removeHandler(h); - } - } else if (h instanceof FileHandler) { - fileHandler = h; - } - } - - if (fileHandler == null) { - String fileName = String.format("BigQueryJdbc%d", fileCounter); - fileCounter++; - - currentLogPath = Files.createTempFile(fileName, ".log"); - currentLogPath.toFile().deleteOnExit(); - - fileHandler = new FileHandler(currentLogPath.toString(), 0, 1, true); - logger.addHandler(fileHandler); - } - } - public static void setLevel(Level level, String logPath) throws IOException { if (level != Level.OFF) { - setPath(logPath); - if (logger.getHandlers().length == 0) { - setHandler(); - fileHandler.setFormatter(getFormatter()); - logger.setUseParentHandlers(false); - } - fileHandler.setLevel(level); + setPath(logPath, level); logger.setLevel(level); } else { for (Handler h : logger.getHandlers()) { @@ -181,45 +146,36 @@ public static void setLevel(Level level, String logPath) throws IOException { logger.removeHandler(h); } fileHandler = null; - currentLogPath = null; } } - static void setPath(String logPath) { + static void setPath(String logPath, Level level) { try { + if (logPath == null) { + logPath = ""; + } if (!logPath.isEmpty() && !logPath.endsWith("/")) { logPath = logPath + "/"; } - Path dir = Paths.get(logPath); - if (!Files.exists(dir)) { - Files.createDirectory(dir); - } - - String fileName = String.format("BigQueryJdbc%d.log", fileCounter); - fileCounter++; - Path destination = Paths.get(logPath + fileName).toAbsolutePath(); - if (currentLogPath != null && !currentLogPath.equals(destination)) { - Path source = Paths.get(currentLogPath.toUri()); - Files.move(source, destination, StandardCopyOption.REPLACE_EXISTING); - } - - currentLogPath = destination; - fileHandler = new FileHandler(currentLogPath.toString(), 0, 1, true); - fileHandler.setFormatter(getFormatter()); - - for (Handler h : logger.getHandlers()) { - if (h instanceof FileHandler) { - h.close(); - logger.removeHandler(h); - break; - } + if (fileHandler != null) { + fileHandler.close(); + logger.removeHandler(fileHandler); } + fileHandler = new PerConnectionFileHandler(logPath, level); + fileHandler.setLevel(level); logger.addHandler(fileHandler); + logger.setUseParentHandlers(false); - } catch (IOException ex) { + } catch (Exception ex) { logger.warning("Log File warning : " + ex); } } + + public static void closeConnectionHandler(String connectionId) { + if (fileHandler instanceof PerConnectionFileHandler) { + ((PerConnectionFileHandler) fileHandler).closeHandler(connectionId); + } + } } diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java index ac2ee99fdb53..cb42308d9449 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java @@ -104,6 +104,7 @@ public class BigQueryStatement extends BigQueryNoOpsStatement { protected int currentJobIdIndex = -1; protected List batchQueries = new ArrayList<>(); protected BigQueryConnection connection; + protected String connectionId; protected int maxFieldSize = 0; protected int maxRows = 0; protected boolean isClosed = false; @@ -149,6 +150,7 @@ public class BigQueryStatement extends BigQueryNoOpsStatement { @VisibleForTesting public BigQueryStatement(BigQueryConnection connection) { this.connection = connection; + this.connectionId = connection.getConnectionId(); this.bigQuery = connection.getBigQuery(); this.querySettings = generateBigQuerySettings(); } @@ -233,48 +235,65 @@ private BigQuerySettings generateBigQuerySettings() { */ @Override public ResultSet executeQuery(String sql) throws SQLException { - // TODO: write method to return state variables to original state. - LOG.finest("++enter++"); - logQueryExecutionStart(sql); + checkClosed(); try { - QueryJobConfiguration jobConfiguration = - setDestinationDatasetAndTableInJobConfig(getJobConfig(sql).build()); - runQuery(sql, jobConfiguration); - } catch (InterruptedException ex) { - LOG.severe(ex, "Interrupted during executeQuery"); - throw new BigQueryJdbcException(ex); - } + BigQueryJdbcMdc.registerInstance(this.connection, this.connectionId); + // TODO: write method to return state variables to original state. + LOG.finest("++enter++"); + logQueryExecutionStart(sql); + try { + QueryJobConfiguration jobConfiguration = + setDestinationDatasetAndTableInJobConfig(getJobConfig(sql).build()); + runQuery(sql, jobConfiguration); + } catch (InterruptedException ex) { + LOG.severe(ex, "Interrupted during executeQuery"); + throw new BigQueryJdbcException(ex); + } - if (!isSingularResultSet()) { - throw new BigQueryJdbcException( - "Query returned more than one or didn't return any ResultSet."); + if (!isSingularResultSet()) { + throw new BigQueryJdbcException( + "Query returned more than one or didn't return any ResultSet."); + } + // This contains all the other assertions spec required on this method + return getCurrentResultSet(); + } finally { + BigQueryJdbcMdc.clear(); } - // This contains all the other assertions spec required on this method - return getCurrentResultSet(); } @Override public long executeLargeUpdate(String sql) throws SQLException { - LOG.finest("++enter++"); - logQueryExecutionStart(sql); + checkClosed(); try { - QueryJobConfiguration.Builder jobConfiguration = getJobConfig(sql); - runQuery(sql, jobConfiguration.build()); - } catch (InterruptedException ex) { - LOG.severe(ex, "Interrupted during executeLargeUpdate"); - throw new BigQueryJdbcRuntimeException(ex); - } - if (this.currentUpdateCount == -1) { - throw new BigQueryJdbcException( - "Update query expected to return affected row count. Double check query type."); + BigQueryJdbcMdc.registerInstance(this.connection, this.connectionId); + LOG.finest("++enter++"); + logQueryExecutionStart(sql); + try { + QueryJobConfiguration.Builder jobConfiguration = getJobConfig(sql); + runQuery(sql, jobConfiguration.build()); + } catch (InterruptedException ex) { + LOG.severe(ex, "Interrupted during executeLargeUpdate"); + throw new BigQueryJdbcRuntimeException(ex); + } + if (this.currentUpdateCount == -1) { + throw new BigQueryJdbcException( + "Update query expected to return affected row count. Double check query type."); + } + return this.currentUpdateCount; + } finally { + BigQueryJdbcMdc.clear(); } - return this.currentUpdateCount; } @Override public int executeUpdate(String sql) throws SQLException { - LOG.finest("++enter++"); - return checkUpdateCount(executeLargeUpdate(sql)); + try { + BigQueryJdbcMdc.registerInstance(this.connection, this.connectionId); + LOG.finest("++enter++"); + return checkUpdateCount(executeLargeUpdate(sql)); + } finally { + BigQueryJdbcMdc.clear(); + } } int checkUpdateCount(long updateCount) { @@ -289,19 +308,25 @@ int checkUpdateCount(long updateCount) { @Override public boolean execute(String sql) throws SQLException { - LOG.finest("++enter++"); - logQueryExecutionStart(sql); + checkClosed(); try { - QueryJobConfiguration jobConfiguration = getJobConfig(sql).build(); - // If Large Results are enabled, ensure query type is SELECT - if (isLargeResultsEnabled() && getQueryType(jobConfiguration, null) == SqlType.SELECT) { - jobConfiguration = setDestinationDatasetAndTableInJobConfig(jobConfiguration); + BigQueryJdbcMdc.registerInstance(this.connection, this.connectionId); + LOG.finest("++enter++"); + logQueryExecutionStart(sql); + try { + QueryJobConfiguration jobConfiguration = getJobConfig(sql).build(); + // If Large Results are enabled, ensure query type is SELECT + if (isLargeResultsEnabled() && getQueryType(jobConfiguration, null) == SqlType.SELECT) { + jobConfiguration = setDestinationDatasetAndTableInJobConfig(jobConfiguration); + } + runQuery(sql, jobConfiguration); + } catch (InterruptedException ex) { + throw new BigQueryJdbcRuntimeException(ex); } - runQuery(sql, jobConfiguration); - } catch (InterruptedException ex) { - throw new BigQueryJdbcRuntimeException(ex); + return getCurrentResultSet() != null; + } finally { + BigQueryJdbcMdc.clear(); } - return getCurrentResultSet() != null; } StatementType getStatementType(QueryJobConfiguration queryJobConfiguration) throws SQLException { @@ -363,23 +388,28 @@ QueryStatistics getQueryStatistics(QueryJobConfiguration queryJobConfiguration) */ @Override public void close() throws SQLException { - LOG.fine("Closing Statement %s.", this); if (isClosed()) { return; } - - boolean cancelSucceeded = false; try { - cancel(); // This attempts to cancel jobs and calls closeStatementResources() - cancelSucceeded = true; - } catch (SQLException e) { - LOG.warning("Failed to cancel statement during close().", e); - } finally { - if (!cancelSucceeded) { - closeStatementResources(); + BigQueryJdbcMdc.registerInstance(this.connection, this.connectionId); + LOG.fine("Closing Statement %s.", this); + + boolean cancelSucceeded = false; + try { + cancel(); // This attempts to cancel jobs and calls closeStatementResources() + cancelSucceeded = true; + } catch (SQLException e) { + LOG.warning("Failed to cancel statement during close().", e); + } finally { + if (!cancelSucceeded) { + closeStatementResources(); + } + this.connection = null; + this.isClosed = true; } - this.connection = null; - this.isClosed = true; + } finally { + BigQueryJdbcMdc.clear(); } } @@ -429,28 +459,33 @@ public void setQueryTimeout(int seconds) { */ @Override public void cancel() throws SQLException { - LOG.finest("Statement %s cancelled", this); - synchronized (cancelLock) { - this.isCanceled = true; - for (JobId jobId : this.jobIds) { - try { - this.bigQuery.cancel(jobId); - LOG.info("Job " + jobId + "cancelled."); - } catch (BigQueryException e) { - if (e.getMessage() != null - && (e.getMessage().contains("Job is already in state DONE") - || e.getMessage().contains("Error: 3848323"))) { - LOG.warning("Attempted to cancel a job that was already done: " + jobId); - } else { - throw new BigQueryJdbcException(e); + try { + BigQueryJdbcMdc.registerInstance(this.connection, this.connectionId); + LOG.finest("Statement %s cancelled", this); + synchronized (cancelLock) { + this.isCanceled = true; + for (JobId jobId : this.jobIds) { + try { + this.bigQuery.cancel(jobId); + LOG.info("Job " + jobId + "cancelled."); + } catch (BigQueryException e) { + if (e.getMessage() != null + && (e.getMessage().contains("Job is already in state DONE") + || e.getMessage().contains("Error: 3848323"))) { + LOG.warning("Attempted to cancel a job that was already done: " + jobId); + } else { + throw new BigQueryJdbcException(e); + } } } + jobIds.clear(); } - jobIds.clear(); + // If a ResultSet exists, then it will be closed as well, closing the + // ownedThreads + closeStatementResources(); + } finally { + BigQueryJdbcMdc.clear(); } - // If a ResultSet exists, then it will be closed as well, closing the - // ownedThreads - closeStatementResources(); } @Override @@ -1539,6 +1574,7 @@ protected void logQueryExecutionStart(String sql) { /** Throws a {@link BigQueryJdbcException} if this object is closed */ void checkClosed() throws SQLException { if (isClosed()) { + LOG.severe("This " + getClass().getName() + " has been closed"); throw new BigQueryJdbcException("This " + getClass().getName() + " has been closed"); } }