diff --git a/java-spanner/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java b/java-spanner/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java index a9323fbdc25f..fd38ad6b286d 100644 --- a/java-spanner/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java +++ b/java-spanner/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java @@ -16,8 +16,6 @@ package com.google.cloud.executor.spanner; -import static com.google.cloud.spanner.TransactionRunner.TransactionCallable; - import com.google.api.gax.core.FixedCredentialsProvider; import com.google.api.gax.longrunning.OperationFuture; import com.google.api.gax.paging.Page; @@ -55,6 +53,7 @@ import com.google.cloud.spanner.Mutation.WriteBuilder; import com.google.cloud.spanner.Options; import com.google.cloud.spanner.Options.RpcPriority; +import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Partition; import com.google.cloud.spanner.PartitionOptions; import com.google.cloud.spanner.ReadContext; @@ -156,6 +155,8 @@ import com.google.spanner.executor.v1.UpdateCloudDatabaseDdlAction; import com.google.spanner.executor.v1.UpdateCloudInstanceAction; import com.google.spanner.v1.StructType; +import com.google.spanner.v1.TransactionOptions.IsolationLevel; +import com.google.spanner.v1.TransactionOptions.ReadWrite.ReadLockMode; import com.google.spanner.v1.TypeAnnotationCode; import com.google.spanner.v1.TypeCode; import io.grpc.Status; @@ -174,6 +175,7 @@ import java.time.Duration; import java.time.LocalDate; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -248,40 +250,60 @@ public static String unexpectedExceptionResponse(Exception e) { * again. */ private static class ReadWriteTransaction { + private final DatabaseClient dbClient; private TransactionRunner runner; private TransactionContext txnContext; private com.google.protobuf.Timestamp timestamp; private Mode finishMode; + private SpannerException abortedException; private SpannerException error; private final String transactionSeed; private final boolean optimistic; + private final boolean repeatableRead; // Set to true when the transaction runner completed, one of these three could happen: runner // committed, abandoned or threw an error. private boolean runnerCompleted; public ReadWriteTransaction( - DatabaseClient dbClient, String transactionSeed, boolean optimistic) { + DatabaseClient dbClient, + String transactionSeed, + boolean optimistic, + boolean repeatableRead) { this.dbClient = dbClient; this.transactionSeed = transactionSeed; this.optimistic = optimistic; + this.repeatableRead = repeatableRead; this.runnerCompleted = false; } /** Set context to be used for executing actions. */ private synchronized void setContext(TransactionContext transaction) { finishMode = null; + abortedException = null; txnContext = transaction; Preconditions.checkNotNull(txnContext); LOGGER.log(Level.INFO, "Transaction callable created, setting context %s\n", transactionSeed); notifyAll(); } + private synchronized void setAborted(SpannerException abortedException) { + LOGGER.log(Level.INFO, "Got aborted exception %s\n", abortedException.toString()); + this.abortedException = abortedException; + notifyAll(); + } + /** Wait for finishAction to be executed and return the requested finish mode. */ - private synchronized Mode waitForFinishAction() throws Exception { - while (finishMode == null) { + private synchronized Mode waitForFinishActionOrAbort() throws Exception { + while (finishMode == null && abortedException == null) { wait(); } + // If a read aborted, throw the exception to the TransactionRunner callable to + // restart the transaction. + if (abortedException != null) { + LOGGER.log(Level.INFO, "Throw aborted exception %s\n", abortedException.toString()); + throw abortedException; + } return finishMode; } @@ -320,9 +342,27 @@ public synchronized com.google.protobuf.Timestamp getTimestamp() { return timestamp; } - /** Return the transactionContext to run actions. Must be called after start action. */ + /** Return the transactionContext to run actions, waiting until it is set. */ public synchronized TransactionContext getContext() { - Preconditions.checkState(txnContext != null); + while (txnContext == null || abortedException != null) { + // If the transaction was aborted by a read action, the abortedException will + // be thrown to the TransactionRunner callable to restart the transaction. + // The restarted callable will call setContext() to set the new transaction context + // and clear abortedException. + if (abortedException != null) { + LOGGER.log(Level.INFO, "Waiting for new RW transaction context after abort\n"); + } else { + LOGGER.log(Level.INFO, "Waiting for RW transaction context."); + } + try { + wait(); + } catch (InterruptedException e) { + LOGGER.log(Level.INFO, "Interrupted while waiting for RW transaction context."); + Thread.currentThread().interrupt(); + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.CANCELLED, "Interrupted while waiting for transaction context", e); + } + } return txnContext; } @@ -339,7 +379,7 @@ public void startRWTransaction() throws Exception { String.format( "Transaction context set, executing and waiting for finish %s\n", transactionSeed)); - Mode mode = waitForFinishAction(); + Mode mode = waitForFinishActionOrAbort(); if (mode == Mode.ABANDON) { throw new Exception(TRANSACTION_ABANDONED); } @@ -351,10 +391,21 @@ public void startRWTransaction() throws Exception { context.wrap( () -> { try { + List transactionOptions = new ArrayList<>(); + if (repeatableRead) { + transactionOptions.add(Options.isolationLevel(IsolationLevel.REPEATABLE_READ)); + } else { + transactionOptions.add(Options.isolationLevel(IsolationLevel.SERIALIZABLE)); + } + if (optimistic) { + transactionOptions.add(Options.readLockMode(ReadLockMode.OPTIMISTIC)); + } else { + transactionOptions.add(Options.readLockMode(ReadLockMode.PESSIMISTIC)); + } runner = - optimistic - ? dbClient.readWriteTransaction(Options.optimisticLock()) - : dbClient.readWriteTransaction(); + dbClient.readWriteTransaction( + transactionOptions.toArray( + new TransactionOption[transactionOptions.size()])); LOGGER.log( Level.INFO, String.format("Ready to run callable %s\n", transactionSeed)); runner.run(callable); @@ -397,7 +448,7 @@ public synchronized boolean finish(Mode finishMode) throws Exception { "TxnContext cleared, sending finishMode to finish transaction %s\n", transactionSeed)); notifyAll(); - // Wait for the transaction to finish or restart + // Wait for the transaction to finish or restart due to an abort on COMMIT. while (txnContext == null && !runnerCompleted) { wait(); } @@ -434,6 +485,7 @@ public synchronized boolean finish(Mode finishMode) throws Exception { * initialized. */ class ExecutionFlowContext { + // Database path from previous action private String prevDbPath; // Current read-write transaction @@ -448,9 +500,6 @@ class ExecutionFlowContext { private Metadata metadata; // Number of pending read/query actions. private int numPendingReads; - // Indicate whether there's a read/query action got aborted and the transaction need to be - // reset. - private boolean readAborted; // Log the workid and op pair for tracing the thread. private String transactionSeed; // Outgoing stream. @@ -588,7 +637,11 @@ public synchronized void startReadWriteTxn( String.format( "There's no active transaction, safe to create rwTxn: %s\n", getTransactionSeed())); this.metadata = metadata; - rwTxn = new ReadWriteTransaction(dbClient, transactionSeed, options.getOptimistic()); + boolean optimistic = + options.getSerializableOptimistic() || options.getSnapshotIsolationOptimistic(); + boolean repeatableRead = + options.getSnapshotIsolationOptimistic() || options.getSnapshotIsolationPessimistic(); + rwTxn = new ReadWriteTransaction(dbClient, transactionSeed, optimistic, repeatableRead); LOGGER.log( Level.INFO, String.format( @@ -644,20 +697,17 @@ public synchronized void startRead() { * Decrease the read count when a read/query is finished, if status is aborted and there's no * pending read/query, reset the transaction for retry. */ - public synchronized void finishRead(Status status) { + public synchronized void finishRead(Status status, SpannerException e) { if (status.getCode() == Status.ABORTED.getCode()) { - readAborted = true; + if (rwTxn != null) { + rwTxn.setAborted(e); + } } --numPendingReads; - if (readAborted && numPendingReads <= 0) { - LOGGER.log(Level.FINE, "Transaction reset due to read/query abort"); - readAborted = false; - } } /** Initialize the read count and aborted status when transaction started. */ public synchronized void initReadState() { - readAborted = false; numPendingReads = 0; } @@ -724,6 +774,12 @@ public synchronized Status finish(Mode finishMode, OutcomeSender sender) { if (rwTxn.getTimestamp() != null) { outcomeBuilder.setCommitTime(rwTxn.getTimestamp()); } + if (finishMode == Mode.COMMIT + && rwTxn.runner.getCommitResponse().getSnapshotTimestamp() != null) { + outcomeBuilder.setSnapshotIsolationTxnReadTimestamp( + Timestamps.toMicros( + rwTxn.runner.getCommitResponse().getSnapshotTimestamp().toProto())); + } clear(); } } @@ -761,7 +817,7 @@ public synchronized void closeBatchTxn() throws SpannerException { } private Spanner client; - private Spanner clientWithTimeout; + private Map clientWithTimeoutMap = new HashMap<>(); private static final String TRANSACTION_ABANDONED = "Fake error to abandon transaction"; @@ -782,23 +838,25 @@ public synchronized void closeBatchTxn() throws SpannerException { private synchronized Spanner getClientWithTimeout( long timeoutSeconds, boolean useMultiplexedSession) throws IOException { - if (clientWithTimeout != null) { - return clientWithTimeout; + if (clientWithTimeoutMap.containsKey(timeoutSeconds)) { + return clientWithTimeoutMap.get(timeoutSeconds); } - clientWithTimeout = getClient(timeoutSeconds, useMultiplexedSession); - return clientWithTimeout; + clientWithTimeoutMap.put( + timeoutSeconds, initializeClient(timeoutSeconds, useMultiplexedSession)); + return clientWithTimeoutMap.get(timeoutSeconds); } private synchronized Spanner getClient(boolean useMultiplexedSession) throws IOException { if (client != null) { return client; } - client = getClient(/* timeoutSeconds= */ 0, useMultiplexedSession); + client = initializeClient(/* timeoutSeconds= */ 0, useMultiplexedSession); return client; } - // Return the spanner client, create one if not exists. - private synchronized Spanner getClient(long timeoutSeconds, boolean useMultiplexedSession) + // Initializes a newly created spanner client. NEVER CALL THIS METHOD DIRECTLY. + // ALWAYS CALL getClientWithTimeout() or getClient() INSTEAD. + private synchronized Spanner initializeClient(long timeoutSeconds, boolean useMultiplexedSession) throws IOException { // Create a cloud spanner client Credentials credentials; @@ -2807,7 +2865,7 @@ private Status processResults( Level.INFO, String.format( "Successfully processed result: %s\n", executionContext.getTransactionSeed())); - executionContext.finishRead(Status.OK); + executionContext.finishRead(Status.OK, null); return sender.finishWithOK(); } catch (SpannerException e) { LOGGER.log(Level.WARNING, "Encountered exception: ", e); @@ -2817,7 +2875,7 @@ private Status processResults( String.format( "Encountered exception: %s %s\n", status.getDescription(), executionContext.getTransactionSeed())); - executionContext.finishRead(status); + executionContext.finishRead(status, e); if (status.getCode() == Status.ABORTED.getCode()) { return sender.finishWithTransactionRestarted(); } else { diff --git a/java-spanner/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/WorkerProxy.java b/java-spanner/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/WorkerProxy.java index 0da30f82d23c..a1b532aca5ec 100644 --- a/java-spanner/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/WorkerProxy.java +++ b/java-spanner/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/WorkerProxy.java @@ -70,7 +70,7 @@ public class WorkerProxy { public static int proxyPort = 0; public static String cert = ""; public static String serviceKeyFile = ""; - public static double multiplexedSessionOperationsRatio = 0.0; + public static double multiplexedSessionOperationsRatio = 1.0; public static boolean usePlainTextChannel = false; public static boolean enableGrpcFaultInjector = false; public static OpenTelemetrySdk openTelemetrySdk; @@ -231,4 +231,4 @@ private static CommandLine buildOptions(String[] args) { throw new IllegalArgumentException(e.getMessage()); } } -} +} \ No newline at end of file