From 5863dbacf1e28bf73988bf79980b1a2ce3fc2f06 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Tue, 14 Apr 2026 12:02:53 -0700 Subject: [PATCH 1/6] Add Temporal Nexus Operation Handler --- .../nexus/NexusStartWorkflowHelper.java | 80 +++++++++++ ...perationToken.java => OperationToken.java} | 9 +- .../internal/nexus/OperationTokenUtil.java | 40 ++++-- .../temporal/nexus/TemporalNexusClient.java | 131 +++++++++++++++++ .../nexus/TemporalOperationCancelContext.java | 49 +++++++ .../nexus/TemporalOperationHandler.java | 124 ++++++++++++++++ .../nexus/TemporalOperationResult.java | 82 +++++++++++ .../nexus/TemporalOperationStartContext.java | 49 +++++++ .../nexus/WorkflowRunOperationImpl.java | 57 +------- .../internal/nexus/WorkflowRunTokenTest.java | 28 ++-- .../nexus/AsyncWorkflowOperationTest.java | 8 +- .../nexus/GenericHandlerCancelTest.java | 136 ++++++++++++++++++ .../nexus/GenericHandlerSyncResultTest.java | 64 +++++++++ .../nexus/GenericHandlerTypedProcTest.java | 77 ++++++++++ .../GenericHandlerTypedStartWorkflowTest.java | 108 ++++++++++++++ ...enericHandlerUntypedStartWorkflowTest.java | 77 ++++++++++ 16 files changed, 1031 insertions(+), 88 deletions(-) create mode 100644 temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusStartWorkflowHelper.java rename temporal-sdk/src/main/java/io/temporal/internal/nexus/{WorkflowRunOperationToken.java => OperationToken.java} (82%) create mode 100644 temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClient.java create mode 100644 temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationCancelContext.java create mode 100644 temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationHandler.java create mode 100644 temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationResult.java create mode 100644 temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationStartContext.java create mode 100644 temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerCancelTest.java create mode 100644 temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerSyncResultTest.java create mode 100644 temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerTypedProcTest.java create mode 100644 temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerTypedStartWorkflowTest.java create mode 100644 temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerUntypedStartWorkflowTest.java diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusStartWorkflowHelper.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusStartWorkflowHelper.java new file mode 100644 index 0000000000..c361f304e7 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusStartWorkflowHelper.java @@ -0,0 +1,80 @@ +package io.temporal.internal.nexus; + +import static io.temporal.internal.common.LinkConverter.workflowEventToNexusLink; +import static io.temporal.internal.common.NexusUtil.nexusProtoLinkToLink; + +import io.nexusrpc.handler.HandlerException; +import io.nexusrpc.handler.OperationContext; +import io.nexusrpc.handler.OperationStartDetails; +import io.temporal.api.common.v1.Link; +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.enums.v1.EventType; +import io.temporal.internal.client.NexusStartWorkflowRequest; +import io.temporal.internal.client.NexusStartWorkflowResponse; +import java.net.URISyntaxException; +import java.util.function.Function; + +/** + * Shared helper for starting a workflow from a Nexus operation and attaching workflow links to the + * operation context. Used by both {@code WorkflowRunOperationImpl} and {@code TemporalNexusClient}. + */ +public class NexusStartWorkflowHelper { + + /** + * Starts a workflow via the provided invoker function, attaches workflow links to the operation + * context, and returns the response. + * + * @param ctx the operation context (links will be attached as a side-effect) + * @param details the operation start details containing requestId, callback, links + * @param invoker function that starts the workflow given a {@link NexusStartWorkflowRequest} + * @return the {@link NexusStartWorkflowResponse} containing the operation token and workflow + * execution + */ + public static NexusStartWorkflowResponse startWorkflowAndAttachLinks( + OperationContext ctx, + OperationStartDetails details, + Function invoker) { + InternalNexusOperationContext nexusCtx = CurrentNexusOperationContext.get(); + + NexusStartWorkflowRequest nexusRequest = + new NexusStartWorkflowRequest( + details.getRequestId(), + details.getCallbackUrl(), + details.getCallbackHeaders(), + nexusCtx.getTaskQueue(), + details.getLinks()); + + NexusStartWorkflowResponse response = invoker.apply(nexusRequest); + WorkflowExecution workflowExec = response.getWorkflowExecution(); + + // If the start workflow response returned a link use it, otherwise + // create the link information about the new workflow and return to the caller. + Link.WorkflowEvent workflowEventLink = + nexusCtx.getStartWorkflowResponseLink().hasWorkflowEvent() + ? nexusCtx.getStartWorkflowResponseLink().getWorkflowEvent() + : null; + if (workflowEventLink == null) { + workflowEventLink = + Link.WorkflowEvent.newBuilder() + .setNamespace(nexusCtx.getNamespace()) + .setWorkflowId(workflowExec.getWorkflowId()) + .setRunId(workflowExec.getRunId()) + .setEventRef( + Link.WorkflowEvent.EventReference.newBuilder() + .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED)) + .build(); + } + io.temporal.api.nexus.v1.Link nexusLink = workflowEventToNexusLink(workflowEventLink); + if (nexusLink != null) { + try { + ctx.addLinks(nexusProtoLinkToLink(nexusLink)); + } catch (URISyntaxException e) { + throw new HandlerException(HandlerException.ErrorType.INTERNAL, "failed to parse URI", e); + } + } + + return response; + } + + private NexusStartWorkflowHelper() {} +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/WorkflowRunOperationToken.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationToken.java similarity index 82% rename from temporal-sdk/src/main/java/io/temporal/internal/nexus/WorkflowRunOperationToken.java rename to temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationToken.java index 2c8d1acb87..4bd5635e93 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/WorkflowRunOperationToken.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationToken.java @@ -3,7 +3,8 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; -public class WorkflowRunOperationToken { +/** Deserialized representation of a Nexus operation token. */ +public class OperationToken { @JsonProperty("v") @JsonInclude(JsonInclude.Include.NON_NULL) private final Integer version; @@ -17,7 +18,7 @@ public class WorkflowRunOperationToken { @JsonProperty("wid") private final String workflowId; - public WorkflowRunOperationToken( + public OperationToken( @JsonProperty("t") Integer type, @JsonProperty("ns") String namespace, @JsonProperty("wid") String workflowId, @@ -28,8 +29,8 @@ public WorkflowRunOperationToken( this.version = version; } - public WorkflowRunOperationToken(String namespace, String workflowId) { - this.type = OperationTokenType.WORKFLOW_RUN; + public OperationToken(OperationTokenType type, String namespace, String workflowId) { + this.type = type; this.namespace = namespace; this.workflowId = workflowId; this.version = null; diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationTokenUtil.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationTokenUtil.java index 1f4869bdc4..737a84aad4 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationTokenUtil.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationTokenUtil.java @@ -15,33 +15,45 @@ public class OperationTokenUtil { private static final Base64.Encoder encoder = Base64.getUrlEncoder().withoutPadding(); /** - * Load a workflow run operation token from an operation token. + * Load and validate an operation token without asserting the token type. Use this for cancel + * dispatch where the token type determines the cancel behavior. * - * @throws IllegalArgumentException if the operation token is invalid + * @throws IllegalArgumentException if the operation token is malformed or has invalid structure */ - public static WorkflowRunOperationToken loadWorkflowRunOperationToken(String operationToken) { - WorkflowRunOperationToken token; + public static OperationToken loadOperationToken(String operationToken) { + OperationToken token; try { - JavaType reference = mapper.getTypeFactory().constructType(WorkflowRunOperationToken.class); + JavaType reference = mapper.getTypeFactory().constructType(OperationToken.class); token = mapper.readValue(decoder.decode(operationToken), reference); } catch (Exception e) { throw new IllegalArgumentException("Failed to parse operation token: " + e.getMessage()); } - if (!token.getType().equals(OperationTokenType.WORKFLOW_RUN)) { - throw new IllegalArgumentException( - "Invalid workflow run token: incorrect operation token type: " + token.getType()); - } if (token.getVersion() != null && token.getVersion() != 0) { - throw new IllegalArgumentException("Invalid workflow run token: unexpected version field"); + throw new IllegalArgumentException("Invalid operation token: unexpected version field"); } if (Strings.isNullOrEmpty(token.getWorkflowId())) { - throw new IllegalArgumentException("Invalid workflow run token: missing workflow ID (wid)"); + throw new IllegalArgumentException("Invalid operation token: missing workflow ID (wid)"); + } + return token; + } + + /** + * Load a workflow run operation token, asserting that the token type is {@link + * OperationTokenType#WORKFLOW_RUN}. + * + * @throws IllegalArgumentException if the operation token is invalid or not a workflow run token + */ + public static OperationToken loadWorkflowRunOperationToken(String operationToken) { + OperationToken token = loadOperationToken(operationToken); + if (!token.getType().equals(OperationTokenType.WORKFLOW_RUN)) { + throw new IllegalArgumentException( + "Invalid workflow run token: incorrect operation token type: " + token.getType()); } return token; } /** - * Attempt to extract the workflow Id from an operation token. + * Extract the workflow ID from a workflow run operation token. * * @throws IllegalArgumentException if the operation token is invalid */ @@ -52,7 +64,9 @@ public static String loadWorkflowIdFromOperationToken(String operationToken) { /** Generate a workflow run operation token from a workflow ID and namespace. */ public static String generateWorkflowRunOperationToken(String workflowId, String namespace) throws JsonProcessingException { - String json = ow.writeValueAsString(new WorkflowRunOperationToken(namespace, workflowId)); + String json = + ow.writeValueAsString( + new OperationToken(OperationTokenType.WORKFLOW_RUN, namespace, workflowId)); return encoder.encodeToString(json.getBytes()); } diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClient.java b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClient.java new file mode 100644 index 0000000000..3b6999cc6c --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClient.java @@ -0,0 +1,131 @@ +package io.temporal.nexus; + +import io.nexusrpc.handler.OperationContext; +import io.nexusrpc.handler.OperationStartDetails; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; +import io.temporal.common.Experimental; +import io.temporal.internal.client.NexusStartWorkflowResponse; +import io.temporal.internal.nexus.NexusStartWorkflowHelper; +import io.temporal.workflow.Functions; +import java.util.Objects; +import java.util.function.Consumer; + +/** + * Nexus-aware client wrapping {@link WorkflowClient}. Provides methods for interacting with + * Temporal workflows from within a Nexus operation handler. + * + *

Obtained via the {@link TemporalOperationHandler.StartFunction} parameter. The client creates + * workflow stubs internally — users pass the workflow class, a lambda that calls the workflow + * method, and workflow options. + * + *

Usage example: + * + *

{@code
+ * @OperationImpl
+ * public OperationHandler createOrder() {
+ *   return TemporalOperationHandler.from((context, client, input) -> {
+ *     return client.startWorkflow(
+ *         OrderWorkflow.class,
+ *         wf -> wf.processOrder(input),
+ *         WorkflowOptions.newBuilder()
+ *             .setWorkflowId("order-" + context.getRequestId())
+ *             .build());
+ *   });
+ * }
+ * }
+ * + *

For advanced use cases, the underlying {@link WorkflowClient} can be accessed via {@link + * #getWorkflowClient()}. For example, to send a signal and return a synchronous result: + * + *

{@code
+ * @OperationImpl
+ * public OperationHandler cancelOrder() {
+ *   return TemporalOperationHandler.from((context, client, input) -> {
+ *     client.getWorkflowClient()
+ *         .newUntypedWorkflowStub("order-" + input.getOrderId())
+ *         .signal("requestCancellation", input);
+ *     return TemporalOperationResult.sync(null);
+ *   });
+ * }
+ * }
+ */ +@Experimental +public final class TemporalNexusClient { + + private final WorkflowClient client; + private final OperationContext operationContext; + private final OperationStartDetails operationStartDetails; + + TemporalNexusClient( + WorkflowClient client, + OperationContext operationContext, + OperationStartDetails operationStartDetails) { + this.client = Objects.requireNonNull(client); + this.operationContext = Objects.requireNonNull(operationContext); + this.operationStartDetails = Objects.requireNonNull(operationStartDetails); + } + + /** Returns the underlying {@link WorkflowClient} for advanced use cases. */ + public WorkflowClient getWorkflowClient() { + return client; + } + + /** + * Starts a workflow by invoking a method on a workflow stub. The client creates the stub from the + * given class and options, then passes it to the provided consumer which should call exactly one + * workflow method. Works for both returning and void workflow methods. + * + *

Example (returning): + * + *

{@code
+   * client.startWorkflow(MyWorkflow.class, wf -> wf.run(input), options)
+   * }
+ * + *

Example (void): + * + *

{@code
+   * client.startWorkflow(MyWorkflow.class, wf -> wf.execute(input), options)
+   * }
+ * + * @param workflowClass the workflow interface class + * @param workflowInvocation receives the workflow stub and calls exactly one workflow method + * @param options workflow start options (must include workflowId) + * @param the workflow interface type + * @param the workflow return type (inferred from calling context) + * @return an async {@link TemporalOperationResult} with the workflow-run operation token + */ + public TemporalOperationResult startWorkflow( + Class workflowClass, Consumer workflowInvocation, WorkflowOptions options) { + T stub = client.newWorkflowStub(workflowClass, options); + Functions.Proc bound = () -> workflowInvocation.accept(stub); + return invokeAndReturn(WorkflowHandle.fromWorkflowMethod(bound)); + } + + /** + * Starts a workflow using an untyped workflow type name. + * + * @param workflowType the workflow type name string + * @param resultClass the expected result class + * @param args workflow arguments + * @param options workflow start options (must include workflowId) + * @param the workflow return type + * @return an async {@link TemporalOperationResult} with the workflow-run operation token + */ + public TemporalOperationResult startWorkflow( + String workflowType, Class resultClass, Object[] args, WorkflowOptions options) { + WorkflowStub stub = client.newUntypedWorkflowStub(workflowType, options); + WorkflowHandle handle = WorkflowHandle.fromWorkflowStub(stub, resultClass, args); + return invokeAndReturn(handle); + } + + private TemporalOperationResult invokeAndReturn(WorkflowHandle handle) { + NexusStartWorkflowResponse response = + NexusStartWorkflowHelper.startWorkflowAndAttachLinks( + operationContext, + operationStartDetails, + request -> handle.getInvoker().invoke(request)); + return TemporalOperationResult.async(response.getOperationToken()); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationCancelContext.java b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationCancelContext.java new file mode 100644 index 0000000000..440d18b4f5 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationCancelContext.java @@ -0,0 +1,49 @@ +package io.temporal.nexus; + +import io.nexusrpc.handler.OperationCancelDetails; +import io.nexusrpc.handler.OperationContext; +import io.temporal.common.Experimental; +import java.util.Objects; + +/** + * Context for a Nexus cancel operation. Combines the {@link OperationContext} and {@link + * OperationCancelDetails} into a single object passed to cancel methods on {@link + * TemporalOperationHandler}. + */ +@Experimental +public final class TemporalOperationCancelContext { + + private final OperationContext operationContext; + private final OperationCancelDetails operationCancelDetails; + + TemporalOperationCancelContext( + OperationContext operationContext, OperationCancelDetails operationCancelDetails) { + this.operationContext = Objects.requireNonNull(operationContext); + this.operationCancelDetails = Objects.requireNonNull(operationCancelDetails); + } + + /** Returns the service name for this operation. */ + public String getService() { + return operationContext.getService(); + } + + /** Returns the operation name. */ + public String getOperation() { + return operationContext.getOperation(); + } + + /** Returns the operation token identifying the operation to cancel. */ + public String getOperationToken() { + return operationCancelDetails.getOperationToken(); + } + + /** Returns the underlying {@link OperationContext} for advanced use cases. */ + public OperationContext getOperationContext() { + return operationContext; + } + + /** Returns the underlying {@link OperationCancelDetails} for advanced use cases. */ + public OperationCancelDetails getOperationCancelDetails() { + return operationCancelDetails; + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationHandler.java b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationHandler.java new file mode 100644 index 0000000000..9d24410282 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationHandler.java @@ -0,0 +1,124 @@ +package io.temporal.nexus; + +import io.nexusrpc.handler.*; +import io.nexusrpc.handler.OperationHandler; +import io.temporal.client.WorkflowClient; +import io.temporal.common.Experimental; +import io.temporal.internal.nexus.CurrentNexusOperationContext; +import io.temporal.internal.nexus.InternalNexusOperationContext; +import io.temporal.internal.nexus.OperationToken; +import io.temporal.internal.nexus.OperationTokenType; +import io.temporal.internal.nexus.OperationTokenUtil; + +/** + * Generic Nexus operation handler backed by Temporal. Implements {@link OperationHandler} and + * provides a composable way to map Temporal operations (start workflow, etc.) to Nexus operations. + * + *

Usage example: + * + *

{@code
+ * @OperationImpl
+ * public OperationHandler createOrder() {
+ *   return TemporalOperationHandler.from((context, client, input) -> {
+ *     return client.startWorkflow(
+ *         OrderWorkflow.class,
+ *         wf -> wf.processOrder(input),
+ *         WorkflowOptions.newBuilder()
+ *             .setWorkflowId("order-" + context.getRequestId())
+ *             .build());
+ *   });
+ * }
+ * }
+ * + *

The cancel behavior is overridable. By default, canceling an operation backed by a + * workflow-run token cancels the underlying workflow. + * + * @param the input type + * @param the result type + */ +@Experimental +public class TemporalOperationHandler implements OperationHandler { + + /** + * Function invoked when a Nexus start operation request is received. + * + * @param the input type + * @param the result type + */ + @FunctionalInterface + public interface StartFunction { + TemporalOperationResult apply( + TemporalOperationStartContext context, TemporalNexusClient client, T input); + } + + private final StartFunction startFunction; + + private TemporalOperationHandler(StartFunction startFunction) { + this.startFunction = startFunction; + } + + /** + * Creates an {@link OperationHandler} from a start function. + * + * @param startFunction the function to invoke on start operation requests + * @return an operation handler backed by the given start function + */ + public static OperationHandler from(StartFunction startFunction) { + return new TemporalOperationHandler<>(startFunction); + } + + @Override + public OperationStartResult start( + OperationContext ctx, OperationStartDetails details, T input) { + InternalNexusOperationContext nexusCtx = CurrentNexusOperationContext.get(); + TemporalNexusClient client = + new TemporalNexusClient(nexusCtx.getWorkflowClient(), ctx, details); + + TemporalOperationStartContext startContext = new TemporalOperationStartContext(ctx, details); + TemporalOperationResult result = startFunction.apply(startContext, client, input); + + if (result.isSync()) { + return OperationStartResult.newSyncBuilder(result.getSyncResult()).build(); + } else if (result.isAsync()) { + return OperationStartResult.newAsyncBuilder(result.getAsyncOperationToken()).build(); + } else { + throw new HandlerException( + HandlerException.ErrorType.INTERNAL, + new IllegalStateException("TemporalOperationResult must be either sync or async")); + } + } + + @Override + public void cancel(OperationContext ctx, OperationCancelDetails details) { + OperationToken token; + try { + token = OperationTokenUtil.loadOperationToken(details.getOperationToken()); + } catch (IllegalArgumentException e) { + throw new HandlerException( + HandlerException.ErrorType.BAD_REQUEST, "failed to parse operation token", e); + } + + TemporalOperationCancelContext cancelContext = new TemporalOperationCancelContext(ctx, details); + if (token.getType() == OperationTokenType.WORKFLOW_RUN) { + cancelWorkflowRun(cancelContext, token.getWorkflowId()); + } else { + throw new HandlerException( + HandlerException.ErrorType.BAD_REQUEST, + new IllegalArgumentException("unsupported operation token type: " + token.getType())); + } + } + + /** + * Called when a cancel request is received for a workflow-run token (type=1). Override to + * customize cancel behavior. + * + *

Default behavior: cancels the underlying workflow. + * + * @param context the cancel context + * @param workflowId the workflow ID extracted from the operation token + */ + protected void cancelWorkflowRun(TemporalOperationCancelContext context, String workflowId) { + WorkflowClient client = CurrentNexusOperationContext.get().getWorkflowClient(); + client.newUntypedWorkflowStub(workflowId).cancel(); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationResult.java b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationResult.java new file mode 100644 index 0000000000..3ff3fa2e60 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationResult.java @@ -0,0 +1,82 @@ +package io.temporal.nexus; + +import com.google.common.base.Strings; +import io.temporal.common.Experimental; +import javax.annotation.Nullable; + +/** + * Unified result type for Temporal-backed Nexus operations. Encapsulates either a synchronous + * result or an async operation token. + * + *

Use {@link #sync(Object)} for operations that complete immediately (e.g., signals). Use {@link + * #async(String)} for operations that return an operation token for async completion (e.g., start + * workflow). + */ +@Experimental +public final class TemporalOperationResult { + + private final boolean isSync; + @Nullable private final R syncResult; + @Nullable private final String asyncOperationToken; + + private TemporalOperationResult( + boolean isSync, @Nullable R syncResult, @Nullable String asyncOperationToken) { + this.isSync = isSync; + this.syncResult = syncResult; + this.asyncOperationToken = asyncOperationToken; + } + + /** + * Creates a synchronous result. + * + * @param value the result value, may be null + * @return a sync result wrapping the given value + */ + public static TemporalOperationResult sync(@Nullable R value) { + return new TemporalOperationResult<>(true, value, null); + } + + /** + * Creates an asynchronous result backed by an operation token. + * + * @param operationToken the operation token identifying the async operation + * @return an async result wrapping the given token + * @throws IllegalArgumentException if operationToken is null or empty + */ + public static TemporalOperationResult async(String operationToken) { + if (Strings.isNullOrEmpty(operationToken)) { + throw new IllegalArgumentException("operationToken must not be null or empty"); + } + return new TemporalOperationResult<>(false, null, operationToken); + } + + /** Returns true if this is a synchronous result. */ + public boolean isSync() { + return isSync; + } + + /** Returns true if this is an asynchronous result backed by an operation token. */ + public boolean isAsync() { + return !isSync; + } + + /** + * Returns the synchronous result value, or null if this is an async result. + * + * @return the sync result value, or null + */ + @Nullable + public R getSyncResult() { + return syncResult; + } + + /** + * Returns the async operation token, or null if this is a sync result. + * + * @return the operation token, or null + */ + @Nullable + public String getAsyncOperationToken() { + return asyncOperationToken; + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationStartContext.java b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationStartContext.java new file mode 100644 index 0000000000..f15095b93a --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationStartContext.java @@ -0,0 +1,49 @@ +package io.temporal.nexus; + +import io.nexusrpc.handler.OperationContext; +import io.nexusrpc.handler.OperationStartDetails; +import io.temporal.common.Experimental; +import java.util.Objects; + +/** + * Context for a Nexus start operation. Combines the {@link OperationContext} and {@link + * OperationStartDetails} into a single object passed to {@link + * TemporalOperationHandler.StartFunction}. + */ +@Experimental +public final class TemporalOperationStartContext { + + private final OperationContext operationContext; + private final OperationStartDetails operationStartDetails; + + TemporalOperationStartContext( + OperationContext operationContext, OperationStartDetails operationStartDetails) { + this.operationContext = Objects.requireNonNull(operationContext); + this.operationStartDetails = Objects.requireNonNull(operationStartDetails); + } + + /** Returns the service name for this operation. */ + public String getService() { + return operationContext.getService(); + } + + /** Returns the operation name. */ + public String getOperation() { + return operationContext.getOperation(); + } + + /** Returns the request ID, commonly used as a workflow ID for idempotency. */ + public String getRequestId() { + return operationStartDetails.getRequestId(); + } + + /** Returns the underlying {@link OperationContext} for advanced use cases. */ + public OperationContext getOperationContext() { + return operationContext; + } + + /** Returns the underlying {@link OperationStartDetails} for advanced use cases. */ + public OperationStartDetails getOperationStartDetails() { + return operationStartDetails; + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperationImpl.java b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperationImpl.java index 537235ce53..167cd4061f 100644 --- a/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperationImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperationImpl.java @@ -1,20 +1,12 @@ package io.temporal.nexus; -import static io.temporal.internal.common.LinkConverter.workflowEventToNexusLink; -import static io.temporal.internal.common.NexusUtil.nexusProtoLinkToLink; - import io.nexusrpc.handler.*; import io.nexusrpc.handler.OperationHandler; -import io.temporal.api.common.v1.Link; -import io.temporal.api.common.v1.WorkflowExecution; -import io.temporal.api.enums.v1.EventType; import io.temporal.client.WorkflowClient; -import io.temporal.internal.client.NexusStartWorkflowRequest; import io.temporal.internal.client.NexusStartWorkflowResponse; import io.temporal.internal.nexus.CurrentNexusOperationContext; -import io.temporal.internal.nexus.InternalNexusOperationContext; +import io.temporal.internal.nexus.NexusStartWorkflowHelper; import io.temporal.internal.nexus.OperationTokenUtil; -import java.net.URISyntaxException; class WorkflowRunOperationImpl implements OperationHandler { private final WorkflowHandleFactory handleFactory; @@ -26,52 +18,13 @@ class WorkflowRunOperationImpl implements OperationHandler { @Override public OperationStartResult start( OperationContext ctx, OperationStartDetails operationStartDetails, T input) { - InternalNexusOperationContext nexusCtx = CurrentNexusOperationContext.get(); - WorkflowHandle handle = handleFactory.apply(ctx, operationStartDetails, input); - NexusStartWorkflowRequest nexusRequest = - new NexusStartWorkflowRequest( - operationStartDetails.getRequestId(), - operationStartDetails.getCallbackUrl(), - operationStartDetails.getCallbackHeaders(), - nexusCtx.getTaskQueue(), - operationStartDetails.getLinks()); - - NexusStartWorkflowResponse nexusStartWorkflowResponse = - handle.getInvoker().invoke(nexusRequest); - WorkflowExecution workflowExec = nexusStartWorkflowResponse.getWorkflowExecution(); + NexusStartWorkflowResponse response = + NexusStartWorkflowHelper.startWorkflowAndAttachLinks( + ctx, operationStartDetails, request -> handle.getInvoker().invoke(request)); - // If the start workflow response returned a link use it, otherwise - // create the link information about the new workflow and return to the caller. - Link.WorkflowEvent workflowEventLink = - nexusCtx.getStartWorkflowResponseLink().hasWorkflowEvent() - ? nexusCtx.getStartWorkflowResponseLink().getWorkflowEvent() - : null; - if (workflowEventLink == null) { - workflowEventLink = - Link.WorkflowEvent.newBuilder() - .setNamespace(nexusCtx.getNamespace()) - .setWorkflowId(workflowExec.getWorkflowId()) - .setRunId(workflowExec.getRunId()) - .setEventRef( - Link.WorkflowEvent.EventReference.newBuilder() - .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED)) - .build(); - } - io.temporal.api.nexus.v1.Link nexusLink = workflowEventToNexusLink(workflowEventLink); - // Attach the link to the operation result. - OperationStartResult.Builder result = - OperationStartResult.newAsyncBuilder(nexusStartWorkflowResponse.getOperationToken()); - if (nexusLink != null) { - try { - ctx.addLinks(nexusProtoLinkToLink(nexusLink)); - } catch (URISyntaxException e) { - // Not expected as the link is constructed by the SDK. - throw new HandlerException(HandlerException.ErrorType.INTERNAL, "failed to parse URI", e); - } - } - return result.build(); + return OperationStartResult.newAsyncBuilder(response.getOperationToken()).build(); } @Override diff --git a/temporal-sdk/src/test/java/io/temporal/internal/nexus/WorkflowRunTokenTest.java b/temporal-sdk/src/test/java/io/temporal/internal/nexus/WorkflowRunTokenTest.java index fbf14d217a..4404c2bc4c 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/nexus/WorkflowRunTokenTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/nexus/WorkflowRunTokenTest.java @@ -17,7 +17,8 @@ public class WorkflowRunTokenTest { @Test public void serializeWorkflowRunToken() throws JsonProcessingException { - WorkflowRunOperationToken token = new WorkflowRunOperationToken("namespace", "workflowId"); + OperationToken token = + new OperationToken(OperationTokenType.WORKFLOW_RUN, "namespace", "workflowId"); String json = ow.writeValueAsString(token); final JsonNode node = new ObjectMapper().readTree(json); System.out.println(json); @@ -32,9 +33,8 @@ public void serializeWorkflowRunToken() throws JsonProcessingException { @Test public void deserializeWorkflowRunTokenWithVersion() throws IOException { String json = "{\"t\":1,\"ns\":\"namespace\",\"wid\":\"workflowId\",\"v\":1}"; - JavaType reference = - new ObjectMapper().getTypeFactory().constructType(WorkflowRunOperationToken.class); - WorkflowRunOperationToken token = new ObjectMapper().readValue(json.getBytes(), reference); + JavaType reference = new ObjectMapper().getTypeFactory().constructType(OperationToken.class); + OperationToken token = new ObjectMapper().readValue(json.getBytes(), reference); // Assert that the serialized JSON is as expected Assert.assertEquals(OperationTokenType.WORKFLOW_RUN, token.getType()); Assert.assertEquals(new Integer(1), token.getVersion()); @@ -45,9 +45,8 @@ public void deserializeWorkflowRunTokenWithVersion() throws IOException { @Test public void deserializeWorkflowRunToken() throws IOException { String json = "{\"t\":1,\"ns\":\"namespace\",\"wid\":\"workflowId\"}"; - JavaType reference = - new ObjectMapper().getTypeFactory().constructType(WorkflowRunOperationToken.class); - WorkflowRunOperationToken token = new ObjectMapper().readValue(json.getBytes(), reference); + JavaType reference = new ObjectMapper().getTypeFactory().constructType(OperationToken.class); + OperationToken token = new ObjectMapper().readValue(json.getBytes(), reference); // Assert that the serialized JSON is as expected Assert.assertEquals(OperationTokenType.WORKFLOW_RUN, token.getType()); Assert.assertNull(null, token.getVersion()); @@ -67,8 +66,8 @@ public void failLoadOldWorkflowRunToken() { public void loadWorkflowIdFromOperationToken() { String json = "{\"t\":1,\"ns\":\"namespace\",\"wid\":\"workflowId\"}"; - WorkflowRunOperationToken token = - OperationTokenUtil.loadWorkflowRunOperationToken(encoder.encodeToString(json.getBytes())); + OperationToken token = + OperationTokenUtil.loadOperationToken(encoder.encodeToString(json.getBytes())); Assert.assertEquals("workflowId", token.getWorkflowId()); Assert.assertEquals("namespace", token.getNamespace()); Assert.assertNull(token.getVersion()); @@ -86,8 +85,7 @@ public void loadWorkflowIdFromGoOperationToken() { // across SDKs. String goOperationToken = "eyJ2IjowLCJ0IjoxLCJucyI6Im5zIiwid2lkIjoidyJ9"; - WorkflowRunOperationToken token = - OperationTokenUtil.loadWorkflowRunOperationToken(goOperationToken); + OperationToken token = OperationTokenUtil.loadOperationToken(goOperationToken); Assert.assertEquals("w", token.getWorkflowId()); Assert.assertEquals("ns", token.getNamespace()); Assert.assertEquals(Integer.valueOf(0), token.getVersion()); @@ -101,7 +99,7 @@ public void loadWorkflowIdFromBadOperationToken() { Assert.assertThrows( IllegalArgumentException.class, () -> - OperationTokenUtil.loadWorkflowRunOperationToken( + OperationTokenUtil.loadOperationToken( encoder.encodeToString(badTokenEmptyJson.getBytes()))); // Bad token, missing the "wid" field @@ -109,7 +107,7 @@ public void loadWorkflowIdFromBadOperationToken() { Assert.assertThrows( IllegalArgumentException.class, () -> - OperationTokenUtil.loadWorkflowRunOperationToken( + OperationTokenUtil.loadOperationToken( encoder.encodeToString(badTokenMissingWorkflow.getBytes()))); // Bad token, unknown version @@ -118,7 +116,7 @@ public void loadWorkflowIdFromBadOperationToken() { Assert.assertThrows( IllegalArgumentException.class, () -> - OperationTokenUtil.loadWorkflowRunOperationToken( + OperationTokenUtil.loadOperationToken( encoder.encodeToString(badTokenUnknownVersion.getBytes()))); // Bad token, unknown version @@ -126,7 +124,7 @@ public void loadWorkflowIdFromBadOperationToken() { Assert.assertThrows( IllegalArgumentException.class, () -> - OperationTokenUtil.loadWorkflowRunOperationToken( + OperationTokenUtil.loadOperationToken( encoder.encodeToString(badTokenUnknownType.getBytes()))); } } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/AsyncWorkflowOperationTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/AsyncWorkflowOperationTest.java index a0b5f1f9d9..3ffaca9b7f 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/AsyncWorkflowOperationTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/AsyncWorkflowOperationTest.java @@ -6,8 +6,8 @@ import io.temporal.client.WorkflowOptions; import io.temporal.failure.ApplicationFailure; import io.temporal.failure.NexusOperationFailure; +import io.temporal.internal.nexus.OperationToken; import io.temporal.internal.nexus.OperationTokenUtil; -import io.temporal.internal.nexus.WorkflowRunOperationToken; import io.temporal.nexus.Nexus; import io.temporal.nexus.WorkflowRunOperation; import io.temporal.testing.WorkflowReplayer; @@ -74,8 +74,8 @@ public String execute(String input) { "Operation token should be present", asyncExec.getOperationToken().isPresent()); // Result should only be completed if the operation is completed Assert.assertFalse("Result should not be completed", asyncOpHandle.getResult().isCompleted()); - WorkflowRunOperationToken token = - OperationTokenUtil.loadWorkflowRunOperationToken(asyncExec.getOperationToken().get()); + OperationToken token = + OperationTokenUtil.loadOperationToken(asyncExec.getOperationToken().get()); Assert.assertTrue(token.getWorkflowId().startsWith(WORKFLOW_ID_PREFIX)); // Unblock the operation Workflow.newExternalWorkflowStub(OperationWorkflow.class, token.getWorkflowId()).unblock(); @@ -87,7 +87,7 @@ public String execute(String input) { } catch (NexusOperationFailure e) { Assert.assertEquals("TestNexusService1", e.getService()); Assert.assertEquals("operation", e.getOperation()); - token = OperationTokenUtil.loadWorkflowRunOperationToken(e.getOperationToken()); + token = OperationTokenUtil.loadOperationToken(e.getOperationToken()); Assert.assertTrue(token.getWorkflowId().startsWith(WORKFLOW_ID_PREFIX)); Assert.assertTrue(e.getCause() instanceof ApplicationFailure); ApplicationFailure applicationFailure = (ApplicationFailure) e.getCause(); diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerCancelTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerCancelTest.java new file mode 100644 index 0000000000..b73035743a --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerCancelTest.java @@ -0,0 +1,136 @@ +package io.temporal.workflow.nexus; + +import io.nexusrpc.Operation; +import io.nexusrpc.Service; +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.api.enums.v1.EventType; +import io.temporal.client.WorkflowFailedException; +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; +import io.temporal.failure.CanceledFailure; +import io.temporal.internal.Signal; +import io.temporal.nexus.TemporalOperationHandler; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.*; +import java.time.Duration; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class GenericHandlerCancelTest extends BaseNexusTest { + + private static final Signal opStarted = new Signal(); + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(TestCancelWorkflow.class, WaitForCancelWorkflow.class) + .setNexusServiceImplementation(new TestNexusServiceImpl()) + .build(); + + @Override + protected SDKTestWorkflowRule getTestWorkflowRule() { + return testWorkflowRule; + } + + @Override + public void setUp() { + super.setUp(); + opStarted.clearSignal(); + } + + @Test + public void cancelGenericHandlerOperation() { + WorkflowStub stub = testWorkflowRule.newUntypedWorkflowStubTimeoutOptions("TestCancelWorkflow"); + stub.start(false); + try { + opStarted.waitForSignal(); + } catch (Exception e) { + Assert.fail("test timed out waiting for operation to start."); + } + stub.cancel(); + Assert.assertThrows(WorkflowFailedException.class, () -> stub.getResult(Void.class)); + // Verify the nexus operation cancel was dispatched and completed successfully. + // The parent workflow completes as cancelled (CanceledFailure), same as with + // WorkflowRunOperation — the cancel handler correctly cancels the handler workflow. + testWorkflowRule.assertHistoryEvent( + stub.getExecution().getWorkflowId(), EventType.EVENT_TYPE_NEXUS_OPERATION_CANCELED); + } + + @WorkflowInterface + public interface TestCancelWorkflowInterface { + @WorkflowMethod(name = "TestCancelWorkflow") + void execute(boolean cancelImmediately); + } + + public static class TestCancelWorkflow implements TestCancelWorkflowInterface { + @Override + public void execute(boolean cancelImmediately) { + NexusOperationOptions options = + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(10)) + .setCancellationType(NexusOperationCancellationType.WAIT_COMPLETED) + .build(); + NexusServiceOptions serviceOptions = + NexusServiceOptions.newBuilder() + .setEndpoint(getEndpointName()) + .setOperationOptions(options) + .build(); + + TestNexusCancelService serviceStub = + Workflow.newNexusServiceStub(TestNexusCancelService.class, serviceOptions); + + NexusOperationHandle handle = + Workflow.startNexusOperation(serviceStub::operation, "cancel-test"); + handle.getExecution().get(); + opStarted.signal(); + + try { + Workflow.await(() -> false); + } catch (CanceledFailure f) { + Workflow.newDetachedCancellationScope(() -> handle.getResult().get()).run(); + } + } + } + + @WorkflowInterface + public interface WaitForCancelWorkflowInterface { + @WorkflowMethod + Void execute(String input); + } + + public static class WaitForCancelWorkflow implements WaitForCancelWorkflowInterface { + @Override + public Void execute(String input) { + try { + Workflow.await(() -> false); + } catch (CanceledFailure f) { + // workflow was cancelled as expected + } + return null; + } + } + + @Service + public interface TestNexusCancelService { + @Operation + Void operation(String input); + } + + @ServiceImpl(service = TestNexusCancelService.class) + public class TestNexusServiceImpl { + @OperationImpl + public OperationHandler operation() { + return TemporalOperationHandler.from( + (context, client, input) -> + client.startWorkflow( + WaitForCancelWorkflowInterface.class, + wf -> wf.execute(input), + WorkflowOptions.newBuilder() + .setWorkflowId("generic-cancel-test-" + context.getService()) + .build())); + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerSyncResultTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerSyncResultTest.java new file mode 100644 index 0000000000..55ca9c321c --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerSyncResultTest.java @@ -0,0 +1,64 @@ +package io.temporal.workflow.nexus; + +import io.nexusrpc.Operation; +import io.nexusrpc.Service; +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.nexus.TemporalOperationHandler; +import io.temporal.nexus.TemporalOperationResult; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.*; +import io.temporal.workflow.shared.TestWorkflows; +import java.time.Duration; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class GenericHandlerSyncResultTest { + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(TestNexus.class) + .setNexusServiceImplementation(new TestNexusServiceImpl()) + .build(); + + @Test + public void syncResultTest() { + TestWorkflows.TestWorkflow1 workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class); + String result = workflowStub.execute(testWorkflowRule.getTaskQueue()); + Assert.assertEquals("sync-hello", result); + } + + public static class TestNexus implements TestWorkflows.TestWorkflow1 { + @Override + public String execute(String input) { + NexusOperationOptions options = + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(10)) + .build(); + NexusServiceOptions serviceOptions = + NexusServiceOptions.newBuilder().setOperationOptions(options).build(); + + TestNexusSyncService serviceStub = + Workflow.newNexusServiceStub(TestNexusSyncService.class, serviceOptions); + return serviceStub.operation("hello"); + } + } + + @Service + public interface TestNexusSyncService { + @Operation + String operation(String input); + } + + @ServiceImpl(service = TestNexusSyncService.class) + public class TestNexusServiceImpl { + @OperationImpl + public OperationHandler operation() { + return TemporalOperationHandler.from( + (context, client, input) -> TemporalOperationResult.sync("sync-" + input)); + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerTypedProcTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerTypedProcTest.java new file mode 100644 index 0000000000..def80b0a53 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerTypedProcTest.java @@ -0,0 +1,77 @@ +package io.temporal.workflow.nexus; + +import io.nexusrpc.Operation; +import io.nexusrpc.Service; +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.client.WorkflowOptions; +import io.temporal.nexus.TemporalOperationHandler; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.*; +import io.temporal.workflow.shared.TestMultiArgWorkflowFunctions; +import io.temporal.workflow.shared.TestWorkflows; +import java.time.Duration; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class GenericHandlerTypedProcTest { + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes( + TestNexus.class, TestMultiArgWorkflowFunctions.TestMultiArgWorkflowImpl.class) + .setNexusServiceImplementation(new TestNexusServiceImpl()) + .build(); + + @Test + public void typedProcStartWorkflowTest() { + TestWorkflows.TestWorkflow1 workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class); + String result = workflowStub.execute(testWorkflowRule.getTaskQueue()); + Assert.assertEquals("done", result); + } + + public static class TestNexus implements TestWorkflows.TestWorkflow1 { + @Override + public String execute(String input) { + NexusOperationOptions options = + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(10)) + .build(); + NexusServiceOptions serviceOptions = + NexusServiceOptions.newBuilder().setOperationOptions(options).build(); + + TestNexusServiceProc serviceStub = + Workflow.newNexusServiceStub(TestNexusServiceProc.class, serviceOptions); + serviceStub.operation("input"); + return "done"; + } + } + + @Service + public interface TestNexusServiceProc { + @Operation + Void operation(String input); + } + + @ServiceImpl(service = TestNexusServiceProc.class) + public class TestNexusServiceImpl { + @OperationImpl + public OperationHandler operation() { + return TemporalOperationHandler.from( + (context, client, input) -> + client.startWorkflow( + TestMultiArgWorkflowFunctions.TestNoArgsWorkflowProc.class, + wf -> wf.proc(), + WorkflowOptions.newBuilder() + .setWorkflowId( + "generic-handler-proc-" + + context.getService() + + "-" + + context.getOperation()) + .build())); + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerTypedStartWorkflowTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerTypedStartWorkflowTest.java new file mode 100644 index 0000000000..3b9809d454 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerTypedStartWorkflowTest.java @@ -0,0 +1,108 @@ +package io.temporal.workflow.nexus; + +import io.nexusrpc.Operation; +import io.nexusrpc.Service; +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.client.WorkflowOptions; +import io.temporal.nexus.TemporalOperationHandler; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.*; +import io.temporal.workflow.shared.TestMultiArgWorkflowFunctions; +import io.temporal.workflow.shared.TestWorkflows; +import java.time.Duration; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class GenericHandlerTypedStartWorkflowTest { + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes( + TestNexus.class, TestMultiArgWorkflowFunctions.TestMultiArgWorkflowImpl.class) + .setNexusServiceImplementation(new TestNexusServiceImpl()) + .build(); + + @Test + public void typedStartWorkflowTests() { + TestWorkflows.TestWorkflow1 workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class); + String result = workflowStub.execute(testWorkflowRule.getTaskQueue()); + Assert.assertEquals("funcinputinput2", result); + } + + public static class TestNexus implements TestWorkflows.TestWorkflow1 { + @Override + public String execute(String input) { + NexusOperationOptions options = + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(10)) + .build(); + NexusServiceOptions serviceOptions = + NexusServiceOptions.newBuilder().setOperationOptions(options).build(); + + TestNexusServiceGeneric serviceStub = + Workflow.newNexusServiceStub(TestNexusServiceGeneric.class, serviceOptions); + StringBuilder result = new StringBuilder(); + for (int i = 0; i < 3; i++) { + result.append(serviceStub.operation(i)); + } + return result.toString(); + } + } + + @Service + public interface TestNexusServiceGeneric { + @Operation + String operation(Integer input); + } + + @ServiceImpl(service = TestNexusServiceGeneric.class) + public class TestNexusServiceImpl { + @OperationImpl + public OperationHandler operation() { + return TemporalOperationHandler.from( + (context, client, input) -> { + switch (input) { + case 0: + return client.startWorkflow( + TestMultiArgWorkflowFunctions.TestNoArgsWorkflowFunc.class, + wf -> wf.func(), + WorkflowOptions.newBuilder() + .setWorkflowId( + "generic-handler-test-func0-" + + context.getService() + + "-" + + context.getOperation()) + .build()); + case 1: + return client.startWorkflow( + TestMultiArgWorkflowFunctions.Test1ArgWorkflowFunc.class, + wf -> wf.func1("input"), + WorkflowOptions.newBuilder() + .setWorkflowId( + "generic-handler-test-func1-" + + context.getService() + + "-" + + context.getOperation()) + .build()); + case 2: + return client.startWorkflow( + TestMultiArgWorkflowFunctions.Test2ArgWorkflowFunc.class, + wf -> wf.func2("input", 2), + WorkflowOptions.newBuilder() + .setWorkflowId( + "generic-handler-test-func2-" + + context.getService() + + "-" + + context.getOperation()) + .build()); + default: + throw new IllegalArgumentException("unexpected input: " + input); + } + }); + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerUntypedStartWorkflowTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerUntypedStartWorkflowTest.java new file mode 100644 index 0000000000..410b621013 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerUntypedStartWorkflowTest.java @@ -0,0 +1,77 @@ +package io.temporal.workflow.nexus; + +import io.nexusrpc.Operation; +import io.nexusrpc.Service; +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.client.WorkflowOptions; +import io.temporal.nexus.TemporalOperationHandler; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.*; +import io.temporal.workflow.shared.TestMultiArgWorkflowFunctions; +import io.temporal.workflow.shared.TestWorkflows; +import java.time.Duration; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class GenericHandlerUntypedStartWorkflowTest { + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes( + TestNexus.class, TestMultiArgWorkflowFunctions.TestMultiArgWorkflowImpl.class) + .setNexusServiceImplementation(new TestNexusServiceImpl()) + .build(); + + @Test + public void untypedStartWorkflowTest() { + TestWorkflows.TestWorkflow1 workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class); + String result = workflowStub.execute(testWorkflowRule.getTaskQueue()); + Assert.assertEquals("input", result); + } + + public static class TestNexus implements TestWorkflows.TestWorkflow1 { + @Override + public String execute(String input) { + NexusOperationOptions options = + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(10)) + .build(); + NexusServiceOptions serviceOptions = + NexusServiceOptions.newBuilder().setOperationOptions(options).build(); + + TestNexusServiceUntyped serviceStub = + Workflow.newNexusServiceStub(TestNexusServiceUntyped.class, serviceOptions); + return serviceStub.operation("input"); + } + } + + @Service + public interface TestNexusServiceUntyped { + @Operation + String operation(String input); + } + + @ServiceImpl(service = TestNexusServiceUntyped.class) + public class TestNexusServiceImpl { + @OperationImpl + public OperationHandler operation() { + return TemporalOperationHandler.from( + (context, client, input) -> + client.startWorkflow( + "func1", + String.class, + new Object[] {input}, + WorkflowOptions.newBuilder() + .setWorkflowId( + "generic-handler-untyped-" + + context.getService() + + "-" + + context.getOperation()) + .build())); + } + } +} From 6641b14483173700384fa4548c401b8d4331d8c7 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Tue, 14 Apr 2026 15:31:33 -0700 Subject: [PATCH 2/6] Update --- .../temporal/nexus/TemporalNexusClient.java | 23 +++++++++---------- .../nexus/TemporalOperationHandler.java | 12 ++++++---- .../nexus/GenericHandlerTypedProcTest.java | 5 +++- 3 files changed, 22 insertions(+), 18 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClient.java b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClient.java index 3b6999cc6c..ab4de55d4b 100644 --- a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClient.java +++ b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClient.java @@ -10,7 +10,6 @@ import io.temporal.internal.nexus.NexusStartWorkflowHelper; import io.temporal.workflow.Functions; import java.util.Objects; -import java.util.function.Consumer; /** * Nexus-aware client wrapping {@link WorkflowClient}. Provides methods for interacting with @@ -73,33 +72,33 @@ public WorkflowClient getWorkflowClient() { } /** - * Starts a workflow by invoking a method on a workflow stub. The client creates the stub from the - * given class and options, then passes it to the provided consumer which should call exactly one - * workflow method. Works for both returning and void workflow methods. + * Starts a workflow by invoking a returning method on a workflow stub. The client creates the + * stub from the given class and options, then invokes the workflow method via the provided + * function. * - *

Example (returning): + *

Example: * *

{@code
    * client.startWorkflow(MyWorkflow.class, wf -> wf.run(input), options)
    * }
* - *

Example (void): + *

For void-returning workflow methods, use a block lambda that returns null: * *

{@code
-   * client.startWorkflow(MyWorkflow.class, wf -> wf.execute(input), options)
+   * client.startWorkflow(MyWorkflow.class, wf -> { wf.execute(input); return null; }, options)
    * }
* * @param workflowClass the workflow interface class - * @param workflowInvocation receives the workflow stub and calls exactly one workflow method + * @param workflowMethod receives the workflow stub and calls exactly one workflow method * @param options workflow start options (must include workflowId) * @param the workflow interface type - * @param the workflow return type (inferred from calling context) + * @param the workflow return type * @return an async {@link TemporalOperationResult} with the workflow-run operation token */ public TemporalOperationResult startWorkflow( - Class workflowClass, Consumer workflowInvocation, WorkflowOptions options) { + Class workflowClass, Functions.Func1 workflowMethod, WorkflowOptions options) { T stub = client.newWorkflowStub(workflowClass, options); - Functions.Proc bound = () -> workflowInvocation.accept(stub); + Functions.Func bound = () -> workflowMethod.apply(stub); return invokeAndReturn(WorkflowHandle.fromWorkflowMethod(bound)); } @@ -120,7 +119,7 @@ public TemporalOperationResult startWorkflow( return invokeAndReturn(handle); } - private TemporalOperationResult invokeAndReturn(WorkflowHandle handle) { + private TemporalOperationResult invokeAndReturn(WorkflowHandle handle) { NexusStartWorkflowResponse response = NexusStartWorkflowHelper.startWorkflowAndAttachLinks( operationContext, diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationHandler.java b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationHandler.java index 9d24410282..48f4ef0ca8 100644 --- a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationHandler.java @@ -30,8 +30,9 @@ * } * } * - *

The cancel behavior is overridable. By default, canceling an operation backed by a - * workflow-run token cancels the underlying workflow. + *

This class supports subclassing to customize cancel behavior. Override {@link + * #cancelWorkflowRun} to change how workflow-run cancellations are handled. The {@link #start} and + * {@link #cancel} methods should not be overridden — they contain the core dispatch logic. * * @param the input type * @param the result type @@ -53,17 +54,18 @@ TemporalOperationResult apply( private final StartFunction startFunction; - private TemporalOperationHandler(StartFunction startFunction) { + protected TemporalOperationHandler(StartFunction startFunction) { this.startFunction = startFunction; } /** - * Creates an {@link OperationHandler} from a start function. + * Creates a {@link TemporalOperationHandler} from a start function. Subclass and override {@link + * #cancelWorkflowRun} to customize cancel behavior. * * @param startFunction the function to invoke on start operation requests * @return an operation handler backed by the given start function */ - public static OperationHandler from(StartFunction startFunction) { + public static TemporalOperationHandler from(StartFunction startFunction) { return new TemporalOperationHandler<>(startFunction); } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerTypedProcTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerTypedProcTest.java index def80b0a53..684dc05f78 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerTypedProcTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerTypedProcTest.java @@ -64,7 +64,10 @@ public OperationHandler operation() { (context, client, input) -> client.startWorkflow( TestMultiArgWorkflowFunctions.TestNoArgsWorkflowProc.class, - wf -> wf.proc(), + wf -> { + wf.proc(); + return null; + }, WorkflowOptions.newBuilder() .setWorkflowId( "generic-handler-proc-" From 05a2002c64d8e0b526a8b9f0507d925f4c0264fb Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Tue, 14 Apr 2026 15:51:54 -0700 Subject: [PATCH 3/6] Update some docs --- .../main/java/io/temporal/nexus/TemporalNexusClient.java | 6 ++---- .../io/temporal/nexus/TemporalOperationCancelContext.java | 5 ++--- .../java/io/temporal/nexus/TemporalOperationHandler.java | 2 +- .../io/temporal/nexus/TemporalOperationStartContext.java | 7 +++---- 4 files changed, 8 insertions(+), 12 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClient.java b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClient.java index ab4de55d4b..358d03ec16 100644 --- a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClient.java +++ b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClient.java @@ -15,9 +15,7 @@ * Nexus-aware client wrapping {@link WorkflowClient}. Provides methods for interacting with * Temporal workflows from within a Nexus operation handler. * - *

Obtained via the {@link TemporalOperationHandler.StartFunction} parameter. The client creates - * workflow stubs internally — users pass the workflow class, a lambda that calls the workflow - * method, and workflow options. + *

Obtained via the {@link TemporalOperationHandler.StartFunction} parameter. * *

Usage example: * @@ -29,7 +27,7 @@ * OrderWorkflow.class, * wf -> wf.processOrder(input), * WorkflowOptions.newBuilder() - * .setWorkflowId("order-" + context.getRequestId()) + * .setWorkflowId("order-" + input.getOrderId()) * .build()); * }); * } diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationCancelContext.java b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationCancelContext.java index 440d18b4f5..afee2a8a8e 100644 --- a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationCancelContext.java +++ b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationCancelContext.java @@ -6,9 +6,8 @@ import java.util.Objects; /** - * Context for a Nexus cancel operation. Combines the {@link OperationContext} and {@link - * OperationCancelDetails} into a single object passed to cancel methods on {@link - * TemporalOperationHandler}. + * Context for a Nexus cancel operation request, passed to {@link + * TemporalOperationHandler#cancelWorkflowRun}. */ @Experimental public final class TemporalOperationCancelContext { diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationHandler.java b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationHandler.java index 48f4ef0ca8..04bf4f3598 100644 --- a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationHandler.java @@ -24,7 +24,7 @@ * OrderWorkflow.class, * wf -> wf.processOrder(input), * WorkflowOptions.newBuilder() - * .setWorkflowId("order-" + context.getRequestId()) + * .setWorkflowId("order-" + input.getOrderId()) * .build()); * }); * } diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationStartContext.java b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationStartContext.java index f15095b93a..b02df88804 100644 --- a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationStartContext.java +++ b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationStartContext.java @@ -6,9 +6,8 @@ import java.util.Objects; /** - * Context for a Nexus start operation. Combines the {@link OperationContext} and {@link - * OperationStartDetails} into a single object passed to {@link - * TemporalOperationHandler.StartFunction}. + * Context for a Nexus start operation request, passed to {@link + * TemporalOperationHandler.StartFunction} alongside the {@link TemporalNexusClient} and input. */ @Experimental public final class TemporalOperationStartContext { @@ -32,7 +31,7 @@ public String getOperation() { return operationContext.getOperation(); } - /** Returns the request ID, commonly used as a workflow ID for idempotency. */ + /** Returns the request ID for this operation. */ public String getRequestId() { return operationStartDetails.getRequestId(); } From acfa2e8942c19f02be4220531d8c93d2b8c8ee4c Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Thu, 16 Apr 2026 08:47:45 -0700 Subject: [PATCH 4/6] Update calling convention --- .../temporal/nexus/TemporalNexusClient.java | 387 +++++++++++++++--- .../nexus/TemporalNexusClientImpl.java | 204 +++++++++ .../nexus/TemporalOperationHandler.java | 2 +- .../nexus/GenericHandlerCancelTest.java | 3 +- .../nexus/GenericHandlerTypedProcTest.java | 77 +++- .../GenericHandlerTypedStartWorkflowTest.java | 68 +-- ...enericHandlerUntypedStartWorkflowTest.java | 4 +- 7 files changed, 641 insertions(+), 104 deletions(-) create mode 100644 temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClientImpl.java diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClient.java b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClient.java index 358d03ec16..dcdb57165c 100644 --- a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClient.java +++ b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClient.java @@ -1,33 +1,27 @@ package io.temporal.nexus; -import io.nexusrpc.handler.OperationContext; -import io.nexusrpc.handler.OperationStartDetails; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowOptions; -import io.temporal.client.WorkflowStub; import io.temporal.common.Experimental; -import io.temporal.internal.client.NexusStartWorkflowResponse; -import io.temporal.internal.nexus.NexusStartWorkflowHelper; import io.temporal.workflow.Functions; -import java.util.Objects; /** * Nexus-aware client wrapping {@link WorkflowClient}. Provides methods for interacting with - * Temporal workflows from within a Nexus operation handler. + * Temporal from within a Nexus operation handler. * *

Obtained via the {@link TemporalOperationHandler.StartFunction} parameter. * - *

Usage example: + *

Example usage to start a workflow from an operation handler: * *

{@code
  * @OperationImpl
- * public OperationHandler createOrder() {
+ * public OperationHandler startTransfer() {
  *   return TemporalOperationHandler.from((context, client, input) -> {
  *     return client.startWorkflow(
- *         OrderWorkflow.class,
- *         wf -> wf.processOrder(input),
+ *         TransferWorkflow.class,
+ *         TransferWorkflow::transfer, input.getFromAccount(), input.getToAccount(),
  *         WorkflowOptions.newBuilder()
- *             .setWorkflowId("order-" + input.getOrderId())
+ *             .setWorkflowId("transfer-" + input.getTransferId())
  *             .build());
  *   });
  * }
@@ -49,80 +43,359 @@
  * }
*/ @Experimental -public final class TemporalNexusClient { +public interface TemporalNexusClient { - private final WorkflowClient client; - private final OperationContext operationContext; - private final OperationStartDetails operationStartDetails; + /** Returns the underlying {@link WorkflowClient} for advanced use cases. */ + WorkflowClient getWorkflowClient(); - TemporalNexusClient( - WorkflowClient client, - OperationContext operationContext, - OperationStartDetails operationStartDetails) { - this.client = Objects.requireNonNull(client); - this.operationContext = Objects.requireNonNull(operationContext); - this.operationStartDetails = Objects.requireNonNull(operationStartDetails); - } + /** + * Starts a zero-argument workflow that returns a value. + * + *

Example: + * + *

{@code
+   * client.startWorkflow(MyWorkflow.class, MyWorkflow::run, options)
+   * }
+ * + * @param workflowClass the workflow interface class + * @param workflowMethod unbound method reference to the workflow method + * @param options workflow start options (must include workflowId) + * @param the workflow interface type + * @param the workflow return type + * @return an async {@link TemporalOperationResult} with the workflow-run operation token + */ + TemporalOperationResult startWorkflow( + Class workflowClass, Functions.Func1 workflowMethod, WorkflowOptions options); - /** Returns the underlying {@link WorkflowClient} for advanced use cases. */ - public WorkflowClient getWorkflowClient() { - return client; - } + /** + * Starts a one-argument workflow that returns a value. + * + *

Example: + * + *

{@code
+   * client.startWorkflow(MyWorkflow.class, MyWorkflow::processOrder, input, options)
+   * }
+ * + * @param workflowClass the workflow interface class + * @param workflowMethod unbound method reference to the workflow method + * @param arg1 first workflow argument + * @param options workflow start options (must include workflowId) + * @param the workflow interface type + * @param the type of the first workflow argument + * @param the workflow return type + * @return an async {@link TemporalOperationResult} with the workflow-run operation token + */ + TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Func2 workflowMethod, + A1 arg1, + WorkflowOptions options); + + /** + * Starts a two-argument workflow that returns a value. + * + *

Example: + * + *

{@code
+   * client.startWorkflow(MyWorkflow.class, MyWorkflow::run, arg1, arg2, options)
+   * }
+ * + * @param workflowClass the workflow interface class + * @param workflowMethod unbound method reference to the workflow method + * @param arg1 first workflow argument + * @param arg2 second workflow argument + * @param options workflow start options (must include workflowId) + * @param the workflow interface type + * @param the type of the first workflow argument + * @param the type of the second workflow argument + * @param the workflow return type + * @return an async {@link TemporalOperationResult} with the workflow-run operation token + */ + TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Func3 workflowMethod, + A1 arg1, + A2 arg2, + WorkflowOptions options); /** - * Starts a workflow by invoking a returning method on a workflow stub. The client creates the - * stub from the given class and options, then invokes the workflow method via the provided - * function. + * Starts a three-argument workflow that returns a value. * *

Example: * *

{@code
-   * client.startWorkflow(MyWorkflow.class, wf -> wf.run(input), options)
+   * client.startWorkflow(MyWorkflow.class, MyWorkflow::run, arg1, arg2, arg3, options)
    * }
* - *

For void-returning workflow methods, use a block lambda that returns null: + * @param workflowClass the workflow interface class + * @param workflowMethod unbound method reference to the workflow method + * @param arg1 first workflow argument + * @param arg2 second workflow argument + * @param arg3 third workflow argument + * @param options workflow start options (must include workflowId) + * @param the workflow interface type + * @param the type of the first workflow argument + * @param the type of the second workflow argument + * @param the type of the third workflow argument + * @param the workflow return type + * @return an async {@link TemporalOperationResult} with the workflow-run operation token + */ + TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Func4 workflowMethod, + A1 arg1, + A2 arg2, + A3 arg3, + WorkflowOptions options); + + /** + * Starts a four-argument workflow that returns a value. + * + *

Example: + * + *

{@code
+   * client.startWorkflow(MyWorkflow.class, MyWorkflow::run, arg1, arg2, arg3, arg4, options)
+   * }
+ * + * @param workflowClass the workflow interface class + * @param workflowMethod unbound method reference to the workflow method + * @param arg1 first workflow argument + * @param arg2 second workflow argument + * @param arg3 third workflow argument + * @param arg4 fourth workflow argument + * @param options workflow start options (must include workflowId) + * @param the workflow interface type + * @param the type of the first workflow argument + * @param the type of the second workflow argument + * @param the type of the third workflow argument + * @param the type of the fourth workflow argument + * @param the workflow return type + * @return an async {@link TemporalOperationResult} with the workflow-run operation token + */ + TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Func5 workflowMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + WorkflowOptions options); + + /** + * Starts a five-argument workflow that returns a value. + * + *

Example: * *

{@code
-   * client.startWorkflow(MyWorkflow.class, wf -> { wf.execute(input); return null; }, options)
+   * client.startWorkflow(MyWorkflow.class, MyWorkflow::run, arg1, arg2, arg3, arg4, arg5, options)
    * }
* * @param workflowClass the workflow interface class - * @param workflowMethod receives the workflow stub and calls exactly one workflow method + * @param workflowMethod unbound method reference to the workflow method + * @param arg1 first workflow argument + * @param arg2 second workflow argument + * @param arg3 third workflow argument + * @param arg4 fourth workflow argument + * @param arg5 fifth workflow argument * @param options workflow start options (must include workflowId) * @param the workflow interface type + * @param the type of the first workflow argument + * @param the type of the second workflow argument + * @param the type of the third workflow argument + * @param the type of the fourth workflow argument + * @param the type of the fifth workflow argument * @param the workflow return type * @return an async {@link TemporalOperationResult} with the workflow-run operation token */ - public TemporalOperationResult startWorkflow( - Class workflowClass, Functions.Func1 workflowMethod, WorkflowOptions options) { - T stub = client.newWorkflowStub(workflowClass, options); - Functions.Func bound = () -> workflowMethod.apply(stub); - return invokeAndReturn(WorkflowHandle.fromWorkflowMethod(bound)); - } + TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Func6 workflowMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + A5 arg5, + WorkflowOptions options); + + /** + * Starts a zero-argument workflow with no return value. + * + *

Example: + * + *

{@code
+   * client.startWorkflow(MyWorkflow.class, MyWorkflow::execute, options)
+   * }
+ * + * @param workflowClass the workflow interface class + * @param workflowMethod unbound method reference to the workflow method + * @param options workflow start options (must include workflowId) + * @param the workflow interface type + * @return an async {@link TemporalOperationResult} with the workflow-run operation token + */ + TemporalOperationResult startWorkflow( + Class workflowClass, Functions.Proc1 workflowMethod, WorkflowOptions options); + + /** + * Starts a one-argument workflow with no return value. + * + *

Example: + * + *

{@code
+   * client.startWorkflow(MyWorkflow.class, MyWorkflow::execute, input, options)
+   * }
+ * + * @param workflowClass the workflow interface class + * @param workflowMethod unbound method reference to the workflow method + * @param arg1 first workflow argument + * @param options workflow start options (must include workflowId) + * @param the workflow interface type + * @param the type of the first workflow argument + * @return an async {@link TemporalOperationResult} with the workflow-run operation token + */ + TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Proc2 workflowMethod, + A1 arg1, + WorkflowOptions options); + + /** + * Starts a two-argument workflow with no return value. + * + *

Example: + * + *

{@code
+   * client.startWorkflow(MyWorkflow.class, MyWorkflow::execute, arg1, arg2, options)
+   * }
+ * + * @param workflowClass the workflow interface class + * @param workflowMethod unbound method reference to the workflow method + * @param arg1 first workflow argument + * @param arg2 second workflow argument + * @param options workflow start options (must include workflowId) + * @param the workflow interface type + * @param the type of the first workflow argument + * @param the type of the second workflow argument + * @return an async {@link TemporalOperationResult} with the workflow-run operation token + */ + TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Proc3 workflowMethod, + A1 arg1, + A2 arg2, + WorkflowOptions options); + + /** + * Starts a three-argument workflow with no return value. + * + *

Example: + * + *

{@code
+   * client.startWorkflow(MyWorkflow.class, MyWorkflow::execute, arg1, arg2, arg3, options)
+   * }
+ * + * @param workflowClass the workflow interface class + * @param workflowMethod unbound method reference to the workflow method + * @param arg1 first workflow argument + * @param arg2 second workflow argument + * @param arg3 third workflow argument + * @param options workflow start options (must include workflowId) + * @param the workflow interface type + * @param the type of the first workflow argument + * @param the type of the second workflow argument + * @param the type of the third workflow argument + * @return an async {@link TemporalOperationResult} with the workflow-run operation token + */ + TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Proc4 workflowMethod, + A1 arg1, + A2 arg2, + A3 arg3, + WorkflowOptions options); + + /** + * Starts a four-argument workflow with no return value. + * + *

Example: + * + *

{@code
+   * client.startWorkflow(MyWorkflow.class, MyWorkflow::execute, arg1, arg2, arg3, arg4, options)
+   * }
+ * + * @param workflowClass the workflow interface class + * @param workflowMethod unbound method reference to the workflow method + * @param arg1 first workflow argument + * @param arg2 second workflow argument + * @param arg3 third workflow argument + * @param arg4 fourth workflow argument + * @param options workflow start options (must include workflowId) + * @param the workflow interface type + * @param the type of the first workflow argument + * @param the type of the second workflow argument + * @param the type of the third workflow argument + * @param the type of the fourth workflow argument + * @return an async {@link TemporalOperationResult} with the workflow-run operation token + */ + TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Proc5 workflowMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + WorkflowOptions options); + + /** + * Starts a five-argument workflow with no return value. + * + *

Example: + * + *

{@code
+   * client.startWorkflow(MyWorkflow.class, MyWorkflow::execute, arg1, arg2, arg3, arg4, arg5, options)
+   * }
+ * + * @param workflowClass the workflow interface class + * @param workflowMethod unbound method reference to the workflow method + * @param arg1 first workflow argument + * @param arg2 second workflow argument + * @param arg3 third workflow argument + * @param arg4 fourth workflow argument + * @param arg5 fifth workflow argument + * @param options workflow start options (must include workflowId) + * @param the workflow interface type + * @param the type of the first workflow argument + * @param the type of the second workflow argument + * @param the type of the third workflow argument + * @param the type of the fourth workflow argument + * @param the type of the fifth workflow argument + * @return an async {@link TemporalOperationResult} with the workflow-run operation token + */ + TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Proc6 workflowMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + A5 arg5, + WorkflowOptions options); /** * Starts a workflow using an untyped workflow type name. * + *

Example: + * + *

{@code
+   * client.startWorkflow("MyWorkflow", String.class, options, input)
+   * }
+ * * @param workflowType the workflow type name string * @param resultClass the expected result class - * @param args workflow arguments * @param options workflow start options (must include workflowId) + * @param args workflow arguments * @param the workflow return type * @return an async {@link TemporalOperationResult} with the workflow-run operation token */ - public TemporalOperationResult startWorkflow( - String workflowType, Class resultClass, Object[] args, WorkflowOptions options) { - WorkflowStub stub = client.newUntypedWorkflowStub(workflowType, options); - WorkflowHandle handle = WorkflowHandle.fromWorkflowStub(stub, resultClass, args); - return invokeAndReturn(handle); - } - - private TemporalOperationResult invokeAndReturn(WorkflowHandle handle) { - NexusStartWorkflowResponse response = - NexusStartWorkflowHelper.startWorkflowAndAttachLinks( - operationContext, - operationStartDetails, - request -> handle.getInvoker().invoke(request)); - return TemporalOperationResult.async(response.getOperationToken()); - } + TemporalOperationResult startWorkflow( + String workflowType, Class resultClass, WorkflowOptions options, Object... args); } diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClientImpl.java b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClientImpl.java new file mode 100644 index 0000000000..869dfe3cc5 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClientImpl.java @@ -0,0 +1,204 @@ +package io.temporal.nexus; + +import io.nexusrpc.handler.OperationContext; +import io.nexusrpc.handler.OperationStartDetails; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; +import io.temporal.internal.client.NexusStartWorkflowResponse; +import io.temporal.internal.nexus.NexusStartWorkflowHelper; +import io.temporal.workflow.Functions; +import java.util.Objects; + +/** Package-private implementation of {@link TemporalNexusClient}. */ +final class TemporalNexusClientImpl implements TemporalNexusClient { + + private final WorkflowClient client; + private final OperationContext operationContext; + private final OperationStartDetails operationStartDetails; + + TemporalNexusClientImpl( + WorkflowClient client, + OperationContext operationContext, + OperationStartDetails operationStartDetails) { + this.client = Objects.requireNonNull(client); + this.operationContext = Objects.requireNonNull(operationContext); + this.operationStartDetails = Objects.requireNonNull(operationStartDetails); + } + + @Override + public WorkflowClient getWorkflowClient() { + return client; + } + + // ---------- Returning (Func) overloads ---------- + + @Override + public TemporalOperationResult startWorkflow( + Class workflowClass, Functions.Func1 workflowMethod, WorkflowOptions options) { + T stub = client.newWorkflowStub(workflowClass, options); + return invokeAndReturn(WorkflowHandle.fromWorkflowMethod(() -> workflowMethod.apply(stub))); + } + + @Override + public TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Func2 workflowMethod, + A1 arg1, + WorkflowOptions options) { + T stub = client.newWorkflowStub(workflowClass, options); + return invokeAndReturn( + WorkflowHandle.fromWorkflowMethod(() -> workflowMethod.apply(stub, arg1))); + } + + @Override + public TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Func3 workflowMethod, + A1 arg1, + A2 arg2, + WorkflowOptions options) { + T stub = client.newWorkflowStub(workflowClass, options); + return invokeAndReturn( + WorkflowHandle.fromWorkflowMethod(() -> workflowMethod.apply(stub, arg1, arg2))); + } + + @Override + public TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Func4 workflowMethod, + A1 arg1, + A2 arg2, + A3 arg3, + WorkflowOptions options) { + T stub = client.newWorkflowStub(workflowClass, options); + return invokeAndReturn( + WorkflowHandle.fromWorkflowMethod(() -> workflowMethod.apply(stub, arg1, arg2, arg3))); + } + + @Override + public TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Func5 workflowMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + WorkflowOptions options) { + T stub = client.newWorkflowStub(workflowClass, options); + return invokeAndReturn( + WorkflowHandle.fromWorkflowMethod( + () -> workflowMethod.apply(stub, arg1, arg2, arg3, arg4))); + } + + @Override + public TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Func6 workflowMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + A5 arg5, + WorkflowOptions options) { + T stub = client.newWorkflowStub(workflowClass, options); + return invokeAndReturn( + WorkflowHandle.fromWorkflowMethod( + () -> workflowMethod.apply(stub, arg1, arg2, arg3, arg4, arg5))); + } + + // ---------- Void (Proc) overloads ---------- + + @Override + public TemporalOperationResult startWorkflow( + Class workflowClass, Functions.Proc1 workflowMethod, WorkflowOptions options) { + T stub = client.newWorkflowStub(workflowClass, options); + return invokeAndReturn(WorkflowHandle.fromWorkflowMethod(() -> workflowMethod.apply(stub))); + } + + @Override + public TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Proc2 workflowMethod, + A1 arg1, + WorkflowOptions options) { + T stub = client.newWorkflowStub(workflowClass, options); + return invokeAndReturn( + WorkflowHandle.fromWorkflowMethod(() -> workflowMethod.apply(stub, arg1))); + } + + @Override + public TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Proc3 workflowMethod, + A1 arg1, + A2 arg2, + WorkflowOptions options) { + T stub = client.newWorkflowStub(workflowClass, options); + return invokeAndReturn( + WorkflowHandle.fromWorkflowMethod(() -> workflowMethod.apply(stub, arg1, arg2))); + } + + @Override + public TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Proc4 workflowMethod, + A1 arg1, + A2 arg2, + A3 arg3, + WorkflowOptions options) { + T stub = client.newWorkflowStub(workflowClass, options); + return invokeAndReturn( + WorkflowHandle.fromWorkflowMethod(() -> workflowMethod.apply(stub, arg1, arg2, arg3))); + } + + @Override + public TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Proc5 workflowMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + WorkflowOptions options) { + T stub = client.newWorkflowStub(workflowClass, options); + return invokeAndReturn( + WorkflowHandle.fromWorkflowMethod( + () -> workflowMethod.apply(stub, arg1, arg2, arg3, arg4))); + } + + @Override + public TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Proc6 workflowMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + A5 arg5, + WorkflowOptions options) { + T stub = client.newWorkflowStub(workflowClass, options); + return invokeAndReturn( + WorkflowHandle.fromWorkflowMethod( + () -> workflowMethod.apply(stub, arg1, arg2, arg3, arg4, arg5))); + } + + // ---------- Untyped ---------- + + @Override + public TemporalOperationResult startWorkflow( + String workflowType, Class resultClass, WorkflowOptions options, Object... args) { + WorkflowStub stub = client.newUntypedWorkflowStub(workflowType, options); + WorkflowHandle handle = WorkflowHandle.fromWorkflowStub(stub, resultClass, args); + return invokeAndReturn(handle); + } + + private TemporalOperationResult invokeAndReturn(WorkflowHandle handle) { + NexusStartWorkflowResponse response = + NexusStartWorkflowHelper.startWorkflowAndAttachLinks( + operationContext, + operationStartDetails, + request -> handle.getInvoker().invoke(request)); + return TemporalOperationResult.async(response.getOperationToken()); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationHandler.java b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationHandler.java index 04bf4f3598..126f35a034 100644 --- a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationHandler.java @@ -74,7 +74,7 @@ public OperationStartResult start( OperationContext ctx, OperationStartDetails details, T input) { InternalNexusOperationContext nexusCtx = CurrentNexusOperationContext.get(); TemporalNexusClient client = - new TemporalNexusClient(nexusCtx.getWorkflowClient(), ctx, details); + new TemporalNexusClientImpl(nexusCtx.getWorkflowClient(), ctx, details); TemporalOperationStartContext startContext = new TemporalOperationStartContext(ctx, details); TemporalOperationResult result = startFunction.apply(startContext, client, input); diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerCancelTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerCancelTest.java index b73035743a..904393843e 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerCancelTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerCancelTest.java @@ -127,7 +127,8 @@ public OperationHandler operation() { (context, client, input) -> client.startWorkflow( WaitForCancelWorkflowInterface.class, - wf -> wf.execute(input), + WaitForCancelWorkflowInterface::execute, + input, WorkflowOptions.newBuilder() .setWorkflowId("generic-cancel-test-" + context.getService()) .build())); diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerTypedProcTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerTypedProcTest.java index 684dc05f78..983f5e8039 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerTypedProcTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerTypedProcTest.java @@ -45,7 +45,9 @@ public String execute(String input) { TestNexusServiceProc serviceStub = Workflow.newNexusServiceStub(TestNexusServiceProc.class, serviceOptions); - serviceStub.operation("input"); + for (int i = 0; i < 6; i++) { + serviceStub.operation(i); + } return "done"; } } @@ -53,28 +55,69 @@ public String execute(String input) { @Service public interface TestNexusServiceProc { @Operation - Void operation(String input); + Void operation(Integer input); } @ServiceImpl(service = TestNexusServiceProc.class) public class TestNexusServiceImpl { @OperationImpl - public OperationHandler operation() { + public OperationHandler operation() { return TemporalOperationHandler.from( - (context, client, input) -> - client.startWorkflow( - TestMultiArgWorkflowFunctions.TestNoArgsWorkflowProc.class, - wf -> { - wf.proc(); - return null; - }, - WorkflowOptions.newBuilder() - .setWorkflowId( - "generic-handler-proc-" - + context.getService() - + "-" - + context.getOperation()) - .build())); + (context, client, input) -> { + String prefix = "generic-handler-test-proc" + input + "-"; + String workflowId = prefix + context.getService() + "-" + context.getOperation(); + WorkflowOptions options = + WorkflowOptions.newBuilder().setWorkflowId(workflowId).build(); + switch (input) { + case 0: + return client.startWorkflow( + TestMultiArgWorkflowFunctions.TestNoArgsWorkflowProc.class, + TestMultiArgWorkflowFunctions.TestNoArgsWorkflowProc::proc, + options); + case 1: + return client.startWorkflow( + TestMultiArgWorkflowFunctions.Test1ArgWorkflowProc.class, + TestMultiArgWorkflowFunctions.Test1ArgWorkflowProc::proc1, + "input", + options); + case 2: + return client.startWorkflow( + TestMultiArgWorkflowFunctions.Test2ArgWorkflowProc.class, + TestMultiArgWorkflowFunctions.Test2ArgWorkflowProc::proc2, + "input", + 2, + options); + case 3: + return client.startWorkflow( + TestMultiArgWorkflowFunctions.Test3ArgWorkflowProc.class, + TestMultiArgWorkflowFunctions.Test3ArgWorkflowProc::proc3, + "input", + 2, + 3, + options); + case 4: + return client.startWorkflow( + TestMultiArgWorkflowFunctions.Test4ArgWorkflowProc.class, + TestMultiArgWorkflowFunctions.Test4ArgWorkflowProc::proc4, + "input", + 2, + 3, + 4, + options); + case 5: + return client.startWorkflow( + TestMultiArgWorkflowFunctions.Test5ArgWorkflowProc.class, + TestMultiArgWorkflowFunctions.Test5ArgWorkflowProc::proc5, + "input", + 2, + 3, + 4, + 5, + options); + default: + throw new IllegalArgumentException("unexpected input: " + input); + } + }); } } } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerTypedStartWorkflowTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerTypedStartWorkflowTest.java index 3b9809d454..ddd2faf381 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerTypedStartWorkflowTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerTypedStartWorkflowTest.java @@ -30,7 +30,7 @@ public void typedStartWorkflowTests() { TestWorkflows.TestWorkflow1 workflowStub = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class); String result = workflowStub.execute(testWorkflowRule.getTaskQueue()); - Assert.assertEquals("funcinputinput2", result); + Assert.assertEquals("funcinputinput2input23input234input2345", result); } public static class TestNexus implements TestWorkflows.TestWorkflow1 { @@ -46,7 +46,7 @@ public String execute(String input) { TestNexusServiceGeneric serviceStub = Workflow.newNexusServiceStub(TestNexusServiceGeneric.class, serviceOptions); StringBuilder result = new StringBuilder(); - for (int i = 0; i < 3; i++) { + for (int i = 0; i < 6; i++) { result.append(serviceStub.operation(i)); } return result.toString(); @@ -65,40 +65,56 @@ public class TestNexusServiceImpl { public OperationHandler operation() { return TemporalOperationHandler.from( (context, client, input) -> { + String prefix = "generic-handler-test-func" + input + "-"; + String workflowId = prefix + context.getService() + "-" + context.getOperation(); + WorkflowOptions options = + WorkflowOptions.newBuilder().setWorkflowId(workflowId).build(); switch (input) { case 0: return client.startWorkflow( TestMultiArgWorkflowFunctions.TestNoArgsWorkflowFunc.class, - wf -> wf.func(), - WorkflowOptions.newBuilder() - .setWorkflowId( - "generic-handler-test-func0-" - + context.getService() - + "-" - + context.getOperation()) - .build()); + TestMultiArgWorkflowFunctions.TestNoArgsWorkflowFunc::func, + options); case 1: return client.startWorkflow( TestMultiArgWorkflowFunctions.Test1ArgWorkflowFunc.class, - wf -> wf.func1("input"), - WorkflowOptions.newBuilder() - .setWorkflowId( - "generic-handler-test-func1-" - + context.getService() - + "-" - + context.getOperation()) - .build()); + TestMultiArgWorkflowFunctions.Test1ArgWorkflowFunc::func1, + "input", + options); case 2: return client.startWorkflow( TestMultiArgWorkflowFunctions.Test2ArgWorkflowFunc.class, - wf -> wf.func2("input", 2), - WorkflowOptions.newBuilder() - .setWorkflowId( - "generic-handler-test-func2-" - + context.getService() - + "-" - + context.getOperation()) - .build()); + TestMultiArgWorkflowFunctions.Test2ArgWorkflowFunc::func2, + "input", + 2, + options); + case 3: + return client.startWorkflow( + TestMultiArgWorkflowFunctions.Test3ArgWorkflowFunc.class, + TestMultiArgWorkflowFunctions.Test3ArgWorkflowFunc::func3, + "input", + 2, + 3, + options); + case 4: + return client.startWorkflow( + TestMultiArgWorkflowFunctions.Test4ArgWorkflowFunc.class, + TestMultiArgWorkflowFunctions.Test4ArgWorkflowFunc::func4, + "input", + 2, + 3, + 4, + options); + case 5: + return client.startWorkflow( + TestMultiArgWorkflowFunctions.Test5ArgWorkflowFunc.class, + TestMultiArgWorkflowFunctions.Test5ArgWorkflowFunc::func5, + "input", + 2, + 3, + 4, + 5, + options); default: throw new IllegalArgumentException("unexpected input: " + input); } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerUntypedStartWorkflowTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerUntypedStartWorkflowTest.java index 410b621013..341eb85539 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerUntypedStartWorkflowTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerUntypedStartWorkflowTest.java @@ -64,14 +64,14 @@ public OperationHandler operation() { client.startWorkflow( "func1", String.class, - new Object[] {input}, WorkflowOptions.newBuilder() .setWorkflowId( "generic-handler-untyped-" + context.getService() + "-" + context.getOperation()) - .build())); + .build(), + input)); } } } From d1cd542f9616acd34ee4b68078ad04b44aeaafc7 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Thu, 16 Apr 2026 08:54:35 -0700 Subject: [PATCH 5/6] Code review --- .../internal/nexus/NexusStartWorkflowHelper.java | 3 ++- .../io/temporal/nexus/TemporalNexusClientImpl.java | 2 ++ .../io/temporal/nexus/TemporalOperationHandler.java | 12 ++++++------ 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusStartWorkflowHelper.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusStartWorkflowHelper.java index c361f304e7..5cd24018f9 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusStartWorkflowHelper.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusStartWorkflowHelper.java @@ -16,7 +16,8 @@ /** * Shared helper for starting a workflow from a Nexus operation and attaching workflow links to the - * operation context. Used by both {@code WorkflowRunOperationImpl} and {@code TemporalNexusClient}. + * operation context. Used by both {@code WorkflowRunOperationImpl} and {@code + * TemporalNexusClientImpl}. */ public class NexusStartWorkflowHelper { diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClientImpl.java b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClientImpl.java index 869dfe3cc5..91ff476190 100644 --- a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClientImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClientImpl.java @@ -5,12 +5,14 @@ import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowOptions; import io.temporal.client.WorkflowStub; +import io.temporal.common.Experimental; import io.temporal.internal.client.NexusStartWorkflowResponse; import io.temporal.internal.nexus.NexusStartWorkflowHelper; import io.temporal.workflow.Functions; import java.util.Objects; /** Package-private implementation of {@link TemporalNexusClient}. */ +@Experimental final class TemporalNexusClientImpl implements TemporalNexusClient { private final WorkflowClient client; diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationHandler.java b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationHandler.java index 126f35a034..347f377092 100644 --- a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationHandler.java @@ -18,13 +18,13 @@ * *
{@code
  * @OperationImpl
- * public OperationHandler createOrder() {
+ * public OperationHandler startTransfer() {
  *   return TemporalOperationHandler.from((context, client, input) -> {
  *     return client.startWorkflow(
- *         OrderWorkflow.class,
- *         wf -> wf.processOrder(input),
+ *         TransferWorkflow.class,
+ *         TransferWorkflow::transfer, input.getFromAccount(), input.getToAccount(),
  *         WorkflowOptions.newBuilder()
- *             .setWorkflowId("order-" + input.getOrderId())
+ *             .setWorkflowId("transfer-" + input.getTransferId())
  *             .build());
  *   });
  * }
@@ -70,7 +70,7 @@ public static  TemporalOperationHandler from(StartFunction sta
   }
 
   @Override
-  public OperationStartResult start(
+  public final OperationStartResult start(
       OperationContext ctx, OperationStartDetails details, T input) {
     InternalNexusOperationContext nexusCtx = CurrentNexusOperationContext.get();
     TemporalNexusClient client =
@@ -91,7 +91,7 @@ public OperationStartResult start(
   }
 
   @Override
-  public void cancel(OperationContext ctx, OperationCancelDetails details) {
+  public final void cancel(OperationContext ctx, OperationCancelDetails details) {
     OperationToken token;
     try {
       token = OperationTokenUtil.loadOperationToken(details.getOperationToken());

From ee62ae76da697c2d7c9a833bc9410627a9311537 Mon Sep 17 00:00:00 2001
From: Quinn Klassen 
Date: Thu, 16 Apr 2026 08:56:37 -0700
Subject: [PATCH 6/6] Address feedback

---
 .../temporal/internal/nexus/WorkflowRunTokenTest.java  | 10 +++++++++-
 .../workflow/nexus/AsyncWorkflowOperationTest.java     |  4 ++--
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/temporal-sdk/src/test/java/io/temporal/internal/nexus/WorkflowRunTokenTest.java b/temporal-sdk/src/test/java/io/temporal/internal/nexus/WorkflowRunTokenTest.java
index 4404c2bc4c..1f22fe8c2e 100644
--- a/temporal-sdk/src/test/java/io/temporal/internal/nexus/WorkflowRunTokenTest.java
+++ b/temporal-sdk/src/test/java/io/temporal/internal/nexus/WorkflowRunTokenTest.java
@@ -119,12 +119,20 @@ public void loadWorkflowIdFromBadOperationToken() {
             OperationTokenUtil.loadOperationToken(
                 encoder.encodeToString(badTokenUnknownVersion.getBytes())));
 
-    // Bad token, unknown version
+    // Bad token, unknown type (also has bad version, so loadOperationToken rejects on version)
     String badTokenUnknownType = "{\"t\":4,\"ns\":\"namespace\", \"wid\":\"workflowId\", \"v\":1}";
     Assert.assertThrows(
         IllegalArgumentException.class,
         () ->
             OperationTokenUtil.loadOperationToken(
                 encoder.encodeToString(badTokenUnknownType.getBytes())));
+
+    // Bad token, unknown type with valid version — loadWorkflowRunOperationToken rejects on type
+    String badTokenWrongType = "{\"t\":4,\"ns\":\"namespace\", \"wid\":\"workflowId\"}";
+    Assert.assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            OperationTokenUtil.loadWorkflowRunOperationToken(
+                encoder.encodeToString(badTokenWrongType.getBytes())));
   }
 }
diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/AsyncWorkflowOperationTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/AsyncWorkflowOperationTest.java
index 3ffaca9b7f..3259c428ea 100644
--- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/AsyncWorkflowOperationTest.java
+++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/AsyncWorkflowOperationTest.java
@@ -75,7 +75,7 @@ public String execute(String input) {
       // Result should only be completed if the operation is completed
       Assert.assertFalse("Result should not be completed", asyncOpHandle.getResult().isCompleted());
       OperationToken token =
-          OperationTokenUtil.loadOperationToken(asyncExec.getOperationToken().get());
+          OperationTokenUtil.loadWorkflowRunOperationToken(asyncExec.getOperationToken().get());
       Assert.assertTrue(token.getWorkflowId().startsWith(WORKFLOW_ID_PREFIX));
       // Unblock the operation
       Workflow.newExternalWorkflowStub(OperationWorkflow.class, token.getWorkflowId()).unblock();
@@ -87,7 +87,7 @@ public String execute(String input) {
       } catch (NexusOperationFailure e) {
         Assert.assertEquals("TestNexusService1", e.getService());
         Assert.assertEquals("operation", e.getOperation());
-        token = OperationTokenUtil.loadOperationToken(e.getOperationToken());
+        token = OperationTokenUtil.loadWorkflowRunOperationToken(e.getOperationToken());
         Assert.assertTrue(token.getWorkflowId().startsWith(WORKFLOW_ID_PREFIX));
         Assert.assertTrue(e.getCause() instanceof ApplicationFailure);
         ApplicationFailure applicationFailure = (ApplicationFailure) e.getCause();