Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -736,14 +736,16 @@ public synchronized void bufferMutations(List<Mutation> mutations) throws Spanne
}

/** Execute a batch of updates in a read-write transaction. */
public synchronized long[] executeBatchDml(@Nonnull List<Statement> stmts)
public synchronized long[] executeBatchDml(@Nonnull List<Statement> stmts, Options.UpdateOption... options)
throws SpannerException {
for (int i = 0; i < stmts.size(); i++) {
LOGGER.log(
Level.INFO, String.format("executeBatchDml [%d]: %s", i + 1, stmts.get(i).toString()));
}
List<Options.UpdateOption> allOptions = new ArrayList<>(java.util.Arrays.asList(options));
allOptions.add(Options.tag("batch-update-transaction-tag"));
return getTransactionForWrite()
.batchUpdate(stmts, Options.tag("batch-update-transaction-tag"));
.batchUpdate(stmts, allOptions.toArray(new Options.UpdateOption[0]));
}

/** Finish active transaction in given finishMode, then send outcome back to client. */
Expand Down Expand Up @@ -2198,13 +2200,51 @@ private Status executeGenerateDbPartitionsRead(
}

/** Execute action that generates database partitions for the given query. */

private Options.ReadQueryUpdateTransactionOption buildSecureContextOption(
Map<String, com.google.spanner.executor.v1.Value> secureContextMap) {
if (secureContextMap != null && !secureContextMap.isEmpty()) {
com.google.spanner.v1.RequestOptions.ClientContext.Builder clientContextBuilder =
com.google.spanner.v1.RequestOptions.ClientContext.newBuilder();
for (Map.Entry<String, com.google.spanner.executor.v1.Value> entry :
secureContextMap.entrySet()) {
com.google.protobuf.Value.Builder valueBuilder = com.google.protobuf.Value.newBuilder();
if (entry.getValue().getValueTypeCase() == com.google.spanner.executor.v1.Value.ValueTypeCase.IS_NULL
&& entry.getValue().getIsNull()) {
valueBuilder.setNullValue(com.google.protobuf.NullValue.NULL_VALUE);
} else if (entry.getValue().getValueTypeCase() == com.google.spanner.executor.v1.Value.ValueTypeCase.STRING_VALUE) {
valueBuilder.setStringValue(entry.getValue().getStringValue());
} else {
throw new IllegalArgumentException("Unsupported secure parameter value type in executor proxy");
}
clientContextBuilder.putSecureContext(entry.getKey(), valueBuilder.build());
}
return Options.clientContext(clientContextBuilder.build());
}
return null;
}

@SuppressWarnings("unchecked")
private <T> void addSecureContextOption(
Map<String, com.google.spanner.executor.v1.Value> secureContextMap,
List<T> optionsList,
Class<?> optionClass) {
Options.ReadQueryUpdateTransactionOption secureContextOption =
buildSecureContextOption(secureContextMap);
if (secureContextOption != null && optionClass.isInstance(secureContextOption)) {
optionsList.add((T) secureContextOption);
}
}

private Status executeGenerateDbPartitionsQuery(
GenerateDbPartitionsForQueryAction action,
OutcomeSender sender,
ExecutionFlowContext executionContext) {
try {
BatchReadOnlyTransaction batchTxn = executionContext.getBatchTxn();
Statement.Builder stmt = Statement.newBuilder(action.getQuery().getSql());
List<Options.QueryOption> queryOptions = new ArrayList<>();
addSecureContextOption(action.getQuery().getSecureContextMap(), queryOptions, Options.QueryOption.class);
for (int i = 0; i < action.getQuery().getParamsCount(); ++i) {
stmt.bind(action.getQuery().getParams(i).getName())
.to(
Expand All @@ -2216,7 +2256,7 @@ private Status executeGenerateDbPartitionsQuery(
PartitionOptions.newBuilder()
.setPartitionSizeBytes(action.getDesiredBytesPerPartition())
.build();
List<Partition> parts = batchTxn.partitionQuery(partitionOptions, stmt.build());
List<Partition> parts = batchTxn.partitionQuery(partitionOptions, stmt.build(), queryOptions.toArray(new Options.QueryOption[0]));
List<BatchPartition> batchPartitions = new ArrayList<>();
for (Partition part : parts) {
batchPartitions.add(
Expand Down Expand Up @@ -2282,11 +2322,14 @@ private Status executePartitionedUpdate(
PartitionedUpdateAction action, DatabaseClient dbClient, OutcomeSender sender) {
try {
ExecutePartitionedUpdateOptions options = action.getOptions();
List<Options.UpdateOption> optionsList = new ArrayList<>();
optionsList.add(Options.tag(options.getTag()));
optionsList.add(Options.priority(RpcPriority.fromProto(options.getRpcPriority())));
addSecureContextOption(action.getUpdate().getSecureContextMap(), optionsList, Options.UpdateOption.class);
Long count =
dbClient.executePartitionedUpdate(
Statement.of(action.getUpdate().getSql()),
Options.tag(options.getTag()),
Options.priority(RpcPriority.fromProto(options.getRpcPriority())));
optionsList.toArray(new Options.UpdateOption[0]));
SpannerActionOutcome outcome =
SpannerActionOutcome.newBuilder()
.setStatus(toProto(Status.OK))
Expand Down Expand Up @@ -2739,7 +2782,10 @@ private Status executeQuery(
String.format(
"Finish query building, ready to execute %s\n",
executionContext.getTransactionSeed()));
ResultSet result = txn.executeQuery(stmt.build(), Options.tag("query-tag"));
List<Options.QueryOption> queryOptions = new ArrayList<>();
queryOptions.add(Options.tag("query-tag"));
addSecureContextOption(action.getQuery().getSecureContextMap(), queryOptions, Options.QueryOption.class);
ResultSet result = txn.executeQuery(stmt.build(), queryOptions.toArray(new Options.QueryOption[0]));
LOGGER.log(
Level.INFO,
String.format("Parsing query result %s\n", executionContext.getTransactionSeed()));
Expand Down Expand Up @@ -2769,10 +2815,13 @@ private Status executeCloudDmlUpdate(
update.getParams(i).getType(), update.getParams(i).getValue()));
}
sender.initForQuery();
List<Options.QueryOption> queryOptions = new ArrayList<>();
queryOptions.add(Options.tag("dml-transaction-tag"));
addSecureContextOption(action.getUpdate().getSecureContextMap(), queryOptions, Options.QueryOption.class);
ResultSet result =
executionContext
.getTransactionForWrite()
.executeQuery(stmt.build(), Options.tag("dml-transaction-tag"));
.executeQuery(stmt.build(), queryOptions.toArray(new Options.QueryOption[0]));
LOGGER.log(
Level.INFO,
String.format("Parsing Dml result %s\n", executionContext.getTransactionSeed()));
Expand Down Expand Up @@ -2803,7 +2852,13 @@ private Status executeCloudBatchDmlUpdates(
}
queries.add(stmt.build());
}
long[] rowCounts = executionContext.executeBatchDml(queries);
Map<String, com.google.spanner.executor.v1.Value> secureContextMap = new java.util.HashMap<>();
for (int i = 0; i < action.getUpdatesCount(); ++i) {
secureContextMap.putAll(action.getUpdates(i).getSecureContextMap());
}
List<Options.UpdateOption> optionsList = new ArrayList<>();
addSecureContextOption(secureContextMap, optionsList, Options.UpdateOption.class);
long[] rowCounts = executionContext.executeBatchDml(queries, optionsList.toArray(new Options.UpdateOption[0]));
sender.initForQuery();
for (long rowCount : rowCounts) {
sender.appendRowsModifiedInDml(rowCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ message QueryAction {

// Parameters for the SQL string.
repeated Parameter params = 2;
// Secure context parameters.
map<string, Value> secure_context = 3;
}

// A single DML statement.
Expand Down
Loading