diff --git a/java-spanner/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java b/java-spanner/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java index fd38ad6b286d..aa2ffd20497a 100644 --- a/java-spanner/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java +++ b/java-spanner/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java @@ -736,14 +736,16 @@ public synchronized void bufferMutations(List mutations) throws Spanne } /** Execute a batch of updates in a read-write transaction. */ - public synchronized long[] executeBatchDml(@Nonnull List stmts) + public synchronized long[] executeBatchDml(@Nonnull List stmts, Options.UpdateOption... options) throws SpannerException { for (int i = 0; i < stmts.size(); i++) { LOGGER.log( Level.INFO, String.format("executeBatchDml [%d]: %s", i + 1, stmts.get(i).toString())); } + List allOptions = new ArrayList<>(java.util.Arrays.asList(options)); + allOptions.add(Options.tag("batch-update-transaction-tag")); return getTransactionForWrite() - .batchUpdate(stmts, Options.tag("batch-update-transaction-tag")); + .batchUpdate(stmts, allOptions.toArray(new Options.UpdateOption[0])); } /** Finish active transaction in given finishMode, then send outcome back to client. */ @@ -2198,6 +2200,42 @@ private Status executeGenerateDbPartitionsRead( } /** Execute action that generates database partitions for the given query. */ + + private Options.ReadQueryUpdateTransactionOption buildSecureContextOption( + Map secureContextMap) { + if (secureContextMap != null && !secureContextMap.isEmpty()) { + com.google.spanner.v1.RequestOptions.ClientContext.Builder clientContextBuilder = + com.google.spanner.v1.RequestOptions.ClientContext.newBuilder(); + for (Map.Entry entry : + secureContextMap.entrySet()) { + com.google.protobuf.Value.Builder valueBuilder = com.google.protobuf.Value.newBuilder(); + if (entry.getValue().getValueTypeCase() == com.google.spanner.executor.v1.Value.ValueTypeCase.IS_NULL + && entry.getValue().getIsNull()) { + valueBuilder.setNullValue(com.google.protobuf.NullValue.NULL_VALUE); + } else if (entry.getValue().getValueTypeCase() == com.google.spanner.executor.v1.Value.ValueTypeCase.STRING_VALUE) { + valueBuilder.setStringValue(entry.getValue().getStringValue()); + } else { + throw new IllegalArgumentException("Unsupported secure parameter value type in executor proxy"); + } + clientContextBuilder.putSecureContext(entry.getKey(), valueBuilder.build()); + } + return Options.clientContext(clientContextBuilder.build()); + } + return null; + } + + @SuppressWarnings("unchecked") + private void addSecureContextOption( + Map secureContextMap, + List optionsList, + Class optionClass) { + Options.ReadQueryUpdateTransactionOption secureContextOption = + buildSecureContextOption(secureContextMap); + if (secureContextOption != null && optionClass.isInstance(secureContextOption)) { + optionsList.add((T) secureContextOption); + } + } + private Status executeGenerateDbPartitionsQuery( GenerateDbPartitionsForQueryAction action, OutcomeSender sender, @@ -2205,6 +2243,8 @@ private Status executeGenerateDbPartitionsQuery( try { BatchReadOnlyTransaction batchTxn = executionContext.getBatchTxn(); Statement.Builder stmt = Statement.newBuilder(action.getQuery().getSql()); + List queryOptions = new ArrayList<>(); + addSecureContextOption(action.getQuery().getSecureContextMap(), queryOptions, Options.QueryOption.class); for (int i = 0; i < action.getQuery().getParamsCount(); ++i) { stmt.bind(action.getQuery().getParams(i).getName()) .to( @@ -2216,7 +2256,7 @@ private Status executeGenerateDbPartitionsQuery( PartitionOptions.newBuilder() .setPartitionSizeBytes(action.getDesiredBytesPerPartition()) .build(); - List parts = batchTxn.partitionQuery(partitionOptions, stmt.build()); + List parts = batchTxn.partitionQuery(partitionOptions, stmt.build(), queryOptions.toArray(new Options.QueryOption[0])); List batchPartitions = new ArrayList<>(); for (Partition part : parts) { batchPartitions.add( @@ -2282,11 +2322,14 @@ private Status executePartitionedUpdate( PartitionedUpdateAction action, DatabaseClient dbClient, OutcomeSender sender) { try { ExecutePartitionedUpdateOptions options = action.getOptions(); + List optionsList = new ArrayList<>(); + optionsList.add(Options.tag(options.getTag())); + optionsList.add(Options.priority(RpcPriority.fromProto(options.getRpcPriority()))); + addSecureContextOption(action.getUpdate().getSecureContextMap(), optionsList, Options.UpdateOption.class); Long count = dbClient.executePartitionedUpdate( Statement.of(action.getUpdate().getSql()), - Options.tag(options.getTag()), - Options.priority(RpcPriority.fromProto(options.getRpcPriority()))); + optionsList.toArray(new Options.UpdateOption[0])); SpannerActionOutcome outcome = SpannerActionOutcome.newBuilder() .setStatus(toProto(Status.OK)) @@ -2739,7 +2782,10 @@ private Status executeQuery( String.format( "Finish query building, ready to execute %s\n", executionContext.getTransactionSeed())); - ResultSet result = txn.executeQuery(stmt.build(), Options.tag("query-tag")); + List queryOptions = new ArrayList<>(); + queryOptions.add(Options.tag("query-tag")); + addSecureContextOption(action.getQuery().getSecureContextMap(), queryOptions, Options.QueryOption.class); + ResultSet result = txn.executeQuery(stmt.build(), queryOptions.toArray(new Options.QueryOption[0])); LOGGER.log( Level.INFO, String.format("Parsing query result %s\n", executionContext.getTransactionSeed())); @@ -2769,10 +2815,13 @@ private Status executeCloudDmlUpdate( update.getParams(i).getType(), update.getParams(i).getValue())); } sender.initForQuery(); + List queryOptions = new ArrayList<>(); + queryOptions.add(Options.tag("dml-transaction-tag")); + addSecureContextOption(action.getUpdate().getSecureContextMap(), queryOptions, Options.QueryOption.class); ResultSet result = executionContext .getTransactionForWrite() - .executeQuery(stmt.build(), Options.tag("dml-transaction-tag")); + .executeQuery(stmt.build(), queryOptions.toArray(new Options.QueryOption[0])); LOGGER.log( Level.INFO, String.format("Parsing Dml result %s\n", executionContext.getTransactionSeed())); @@ -2803,7 +2852,13 @@ private Status executeCloudBatchDmlUpdates( } queries.add(stmt.build()); } - long[] rowCounts = executionContext.executeBatchDml(queries); + Map secureContextMap = new java.util.HashMap<>(); + for (int i = 0; i < action.getUpdatesCount(); ++i) { + secureContextMap.putAll(action.getUpdates(i).getSecureContextMap()); + } + List optionsList = new ArrayList<>(); + addSecureContextOption(secureContextMap, optionsList, Options.UpdateOption.class); + long[] rowCounts = executionContext.executeBatchDml(queries, optionsList.toArray(new Options.UpdateOption[0])); sender.initForQuery(); for (long rowCount : rowCounts) { sender.appendRowsModifiedInDml(rowCount); diff --git a/java-spanner/proto-google-cloud-spanner-executor-v1/src/main/proto/google/spanner/executor/v1/cloud_executor.proto b/java-spanner/proto-google-cloud-spanner-executor-v1/src/main/proto/google/spanner/executor/v1/cloud_executor.proto index 5ca3b25ac2a2..11a394ede998 100644 --- a/java-spanner/proto-google-cloud-spanner-executor-v1/src/main/proto/google/spanner/executor/v1/cloud_executor.proto +++ b/java-spanner/proto-google-cloud-spanner-executor-v1/src/main/proto/google/spanner/executor/v1/cloud_executor.proto @@ -178,6 +178,8 @@ message QueryAction { // Parameters for the SQL string. repeated Parameter params = 2; + // Secure context parameters. + map secure_context = 3; } // A single DML statement.