Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package com.google.cloud.executor.spanner;

import static com.google.cloud.spanner.TransactionRunner.TransactionCallable;

import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.paging.Page;
Expand Down Expand Up @@ -55,6 +53,7 @@
import com.google.cloud.spanner.Mutation.WriteBuilder;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Partition;
import com.google.cloud.spanner.PartitionOptions;
import com.google.cloud.spanner.ReadContext;
Expand Down Expand Up @@ -156,6 +155,8 @@
import com.google.spanner.executor.v1.UpdateCloudDatabaseDdlAction;
import com.google.spanner.executor.v1.UpdateCloudInstanceAction;
import com.google.spanner.v1.StructType;
import com.google.spanner.v1.TransactionOptions.IsolationLevel;
import com.google.spanner.v1.TransactionOptions.ReadWrite.ReadLockMode;
import com.google.spanner.v1.TypeAnnotationCode;
import com.google.spanner.v1.TypeCode;
import io.grpc.Status;
Expand All @@ -174,6 +175,7 @@
import java.time.Duration;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -248,40 +250,60 @@ public static String unexpectedExceptionResponse(Exception e) {
* again.
*/
private static class ReadWriteTransaction {

private final DatabaseClient dbClient;
private TransactionRunner runner;
private TransactionContext txnContext;
private com.google.protobuf.Timestamp timestamp;
private Mode finishMode;
private SpannerException abortedException;
private SpannerException error;
private final String transactionSeed;
private final boolean optimistic;
private final boolean repeatableRead;
// Set to true when the transaction runner completed, one of these three could happen: runner
// committed, abandoned or threw an error.
private boolean runnerCompleted;

public ReadWriteTransaction(
DatabaseClient dbClient, String transactionSeed, boolean optimistic) {
DatabaseClient dbClient,
String transactionSeed,
boolean optimistic,
boolean repeatableRead) {
this.dbClient = dbClient;
this.transactionSeed = transactionSeed;
this.optimistic = optimistic;
this.repeatableRead = repeatableRead;
this.runnerCompleted = false;
}

/** Set context to be used for executing actions. */
private synchronized void setContext(TransactionContext transaction) {
finishMode = null;
abortedException = null;
txnContext = transaction;
Preconditions.checkNotNull(txnContext);
LOGGER.log(Level.INFO, "Transaction callable created, setting context %s\n", transactionSeed);
notifyAll();
}

private synchronized void setAborted(SpannerException abortedException) {
LOGGER.log(Level.INFO, "Got aborted exception %s\n", abortedException.toString());
this.abortedException = abortedException;
notifyAll();
}

/** Wait for finishAction to be executed and return the requested finish mode. */
private synchronized Mode waitForFinishAction() throws Exception {
while (finishMode == null) {
private synchronized Mode waitForFinishActionOrAbort() throws Exception {
while (finishMode == null && abortedException == null) {
wait();
}
// If a read aborted, throw the exception to the TransactionRunner callable to
// restart the transaction.
if (abortedException != null) {
LOGGER.log(Level.INFO, "Throw aborted exception %s\n", abortedException.toString());
throw abortedException;
}
return finishMode;
}

Expand Down Expand Up @@ -320,9 +342,27 @@ public synchronized com.google.protobuf.Timestamp getTimestamp() {
return timestamp;
}

/** Return the transactionContext to run actions. Must be called after start action. */
/** Return the transactionContext to run actions, waiting until it is set. */
public synchronized TransactionContext getContext() {
Preconditions.checkState(txnContext != null);
while (txnContext == null || abortedException != null) {
// If the transaction was aborted by a read action, the abortedException will
// be thrown to the TransactionRunner callable to restart the transaction.
// The restarted callable will call setContext() to set the new transaction context
// and clear abortedException.
if (abortedException != null) {
LOGGER.log(Level.INFO, "Waiting for new RW transaction context after abort\n");
} else {
LOGGER.log(Level.INFO, "Waiting for RW transaction context.");
}
try {
wait();
} catch (InterruptedException e) {
LOGGER.log(Level.INFO, "Interrupted while waiting for RW transaction context.");
Thread.currentThread().interrupt();
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.CANCELLED, "Interrupted while waiting for transaction context", e);
}
Comment on lines +357 to +364
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Swallowing InterruptedException is a dangerous practice because it clears the thread's interrupted status without taking appropriate action. This can prevent the thread from responding correctly to cancellation or shutdown requests. The interrupted status should be restored by calling Thread.currentThread().interrupt(). Additionally, since this method must return a valid TransactionContext, throwing a SpannerException is appropriate to signal that the operation was interrupted and cannot proceed.

        try {
          wait();
        } catch (InterruptedException e) {
          LOGGER.log(Level.INFO, "Interrupted while waiting for RW transaction context.");
          Thread.currentThread().interrupt();
          throw SpannerExceptionFactory.newSpannerException(
              ErrorCode.CANCELLED, "Interrupted while waiting for transaction context", e);
        }

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gyang-google can you please fix this?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

}
return txnContext;
}

Expand All @@ -339,7 +379,7 @@ public void startRWTransaction() throws Exception {
String.format(
"Transaction context set, executing and waiting for finish %s\n",
transactionSeed));
Mode mode = waitForFinishAction();
Mode mode = waitForFinishActionOrAbort();
if (mode == Mode.ABANDON) {
throw new Exception(TRANSACTION_ABANDONED);
}
Expand All @@ -351,10 +391,21 @@ public void startRWTransaction() throws Exception {
context.wrap(
() -> {
try {
List<TransactionOption> transactionOptions = new ArrayList<>();
if (repeatableRead) {
transactionOptions.add(Options.isolationLevel(IsolationLevel.REPEATABLE_READ));
} else {
transactionOptions.add(Options.isolationLevel(IsolationLevel.SERIALIZABLE));
}
if (optimistic) {
transactionOptions.add(Options.readLockMode(ReadLockMode.OPTIMISTIC));
} else {
transactionOptions.add(Options.readLockMode(ReadLockMode.PESSIMISTIC));
}
runner =
optimistic
? dbClient.readWriteTransaction(Options.optimisticLock())
: dbClient.readWriteTransaction();
dbClient.readWriteTransaction(
transactionOptions.toArray(
new TransactionOption[transactionOptions.size()]));
LOGGER.log(
Level.INFO, String.format("Ready to run callable %s\n", transactionSeed));
runner.run(callable);
Expand Down Expand Up @@ -397,7 +448,7 @@ public synchronized boolean finish(Mode finishMode) throws Exception {
"TxnContext cleared, sending finishMode to finish transaction %s\n",
transactionSeed));
notifyAll();
// Wait for the transaction to finish or restart
// Wait for the transaction to finish or restart due to an abort on COMMIT.
while (txnContext == null && !runnerCompleted) {
wait();
}
Expand Down Expand Up @@ -434,6 +485,7 @@ public synchronized boolean finish(Mode finishMode) throws Exception {
* initialized.
*/
class ExecutionFlowContext {

// Database path from previous action
private String prevDbPath;
// Current read-write transaction
Expand All @@ -448,9 +500,6 @@ class ExecutionFlowContext {
private Metadata metadata;
// Number of pending read/query actions.
private int numPendingReads;
// Indicate whether there's a read/query action got aborted and the transaction need to be
// reset.
private boolean readAborted;
// Log the workid and op pair for tracing the thread.
private String transactionSeed;
// Outgoing stream.
Expand Down Expand Up @@ -588,7 +637,11 @@ public synchronized void startReadWriteTxn(
String.format(
"There's no active transaction, safe to create rwTxn: %s\n", getTransactionSeed()));
this.metadata = metadata;
rwTxn = new ReadWriteTransaction(dbClient, transactionSeed, options.getOptimistic());
boolean optimistic =
options.getSerializableOptimistic() || options.getSnapshotIsolationOptimistic();
boolean repeatableRead =
options.getSnapshotIsolationOptimistic() || options.getSnapshotIsolationPessimistic();
rwTxn = new ReadWriteTransaction(dbClient, transactionSeed, optimistic, repeatableRead);
LOGGER.log(
Level.INFO,
String.format(
Expand Down Expand Up @@ -644,20 +697,17 @@ public synchronized void startRead() {
* Decrease the read count when a read/query is finished, if status is aborted and there's no
* pending read/query, reset the transaction for retry.
*/
public synchronized void finishRead(Status status) {
public synchronized void finishRead(Status status, SpannerException e) {
if (status.getCode() == Status.ABORTED.getCode()) {
readAborted = true;
if (rwTxn != null) {
rwTxn.setAborted(e);
}
}
--numPendingReads;
if (readAborted && numPendingReads <= 0) {
LOGGER.log(Level.FINE, "Transaction reset due to read/query abort");
readAborted = false;
}
}

/** Initialize the read count and aborted status when transaction started. */
public synchronized void initReadState() {
readAborted = false;
numPendingReads = 0;
}

Expand Down Expand Up @@ -724,6 +774,12 @@ public synchronized Status finish(Mode finishMode, OutcomeSender sender) {
if (rwTxn.getTimestamp() != null) {
outcomeBuilder.setCommitTime(rwTxn.getTimestamp());
}
if (finishMode == Mode.COMMIT
&& rwTxn.runner.getCommitResponse().getSnapshotTimestamp() != null) {
outcomeBuilder.setSnapshotIsolationTxnReadTimestamp(
Timestamps.toMicros(
rwTxn.runner.getCommitResponse().getSnapshotTimestamp().toProto()));
}
clear();
}
}
Expand Down Expand Up @@ -761,7 +817,7 @@ public synchronized void closeBatchTxn() throws SpannerException {
}

private Spanner client;
private Spanner clientWithTimeout;
private Map<Long, Spanner> clientWithTimeoutMap = new HashMap<>();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The clientWithTimeoutMap uses Long (representing timeoutSeconds) as its key. However, the client initialization also depends on the useMultiplexedSession parameter. If the executor is called with different useMultiplexedSession values for the same timeout, the current implementation will return a cached client that may not have the expected configuration. Using a String key that combines both parameters would ensure that clients are correctly isolated by their configuration.

Suggested change
private Map<Long, Spanner> clientWithTimeoutMap = new HashMap<>();
private Map<String, Spanner> clientWithTimeoutMap = new HashMap<>();


private static final String TRANSACTION_ABANDONED = "Fake error to abandon transaction";

Expand All @@ -782,23 +838,25 @@ public synchronized void closeBatchTxn() throws SpannerException {

private synchronized Spanner getClientWithTimeout(
long timeoutSeconds, boolean useMultiplexedSession) throws IOException {
if (clientWithTimeout != null) {
return clientWithTimeout;
if (clientWithTimeoutMap.containsKey(timeoutSeconds)) {
return clientWithTimeoutMap.get(timeoutSeconds);
}
clientWithTimeout = getClient(timeoutSeconds, useMultiplexedSession);
return clientWithTimeout;
clientWithTimeoutMap.put(
timeoutSeconds, initializeClient(timeoutSeconds, useMultiplexedSession));
return clientWithTimeoutMap.get(timeoutSeconds);
Comment on lines +841 to +846
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The caching logic here only considers timeoutSeconds. As noted in the map definition, this can lead to returning a client with the wrong useMultiplexedSession configuration if multiple requests with the same timeout but different session settings are processed. The key should incorporate both parameters to ensure correct client retrieval.

    String key = timeoutSeconds + ":" + useMultiplexedSession;
    if (clientWithTimeoutMap.containsKey(key)) {
      return clientWithTimeoutMap.get(key);
    }
    Spanner client = initializeClient(timeoutSeconds, useMultiplexedSession);
    clientWithTimeoutMap.put(key, client);
    return client;

}

private synchronized Spanner getClient(boolean useMultiplexedSession) throws IOException {
if (client != null) {
return client;
}
client = getClient(/* timeoutSeconds= */ 0, useMultiplexedSession);
client = initializeClient(/* timeoutSeconds= */ 0, useMultiplexedSession);
return client;
Comment on lines 850 to 854
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This method caches the client in a single field without considering the useMultiplexedSession parameter. Subsequent calls with a different useMultiplexedSession value will return the previously cached client, which may have the wrong configuration. Delegating to getClientWithTimeout with a timeout of 0 will leverage the keyed caching mechanism and ensure the correct client is used.

    return getClientWithTimeout(0, useMultiplexedSession);

}

// Return the spanner client, create one if not exists.
private synchronized Spanner getClient(long timeoutSeconds, boolean useMultiplexedSession)
// Initializes a newly created spanner client. NEVER CALL THIS METHOD DIRECTLY.
// ALWAYS CALL getClientWithTimeout() or getClient() INSTEAD.
private synchronized Spanner initializeClient(long timeoutSeconds, boolean useMultiplexedSession)
throws IOException {
// Create a cloud spanner client
Credentials credentials;
Expand Down Expand Up @@ -2807,7 +2865,7 @@ private Status processResults(
Level.INFO,
String.format(
"Successfully processed result: %s\n", executionContext.getTransactionSeed()));
executionContext.finishRead(Status.OK);
executionContext.finishRead(Status.OK, null);
return sender.finishWithOK();
} catch (SpannerException e) {
LOGGER.log(Level.WARNING, "Encountered exception: ", e);
Expand All @@ -2817,7 +2875,7 @@ private Status processResults(
String.format(
"Encountered exception: %s %s\n",
status.getDescription(), executionContext.getTransactionSeed()));
executionContext.finishRead(status);
executionContext.finishRead(status, e);
if (status.getCode() == Status.ABORTED.getCode()) {
return sender.finishWithTransactionRestarted();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class WorkerProxy {
public static int proxyPort = 0;
public static String cert = "";
public static String serviceKeyFile = "";
public static double multiplexedSessionOperationsRatio = 0.0;
public static double multiplexedSessionOperationsRatio = 1.0;
public static boolean usePlainTextChannel = false;
public static boolean enableGrpcFaultInjector = false;
public static OpenTelemetrySdk openTelemetrySdk;
Expand Down Expand Up @@ -231,4 +231,4 @@ private static CommandLine buildOptions(String[] args) {
throw new IllegalArgumentException(e.getMessage());
}
}
}
}
Loading