Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package io.temporal.internal.nexus;

import static io.temporal.internal.common.LinkConverter.workflowEventToNexusLink;
import static io.temporal.internal.common.NexusUtil.nexusProtoLinkToLink;

import io.nexusrpc.handler.HandlerException;
import io.nexusrpc.handler.OperationContext;
import io.nexusrpc.handler.OperationStartDetails;
import io.temporal.api.common.v1.Link;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.EventType;
import io.temporal.internal.client.NexusStartWorkflowRequest;
import io.temporal.internal.client.NexusStartWorkflowResponse;
import java.net.URISyntaxException;
import java.util.function.Function;

/**
* Shared helper for starting a workflow from a Nexus operation and attaching workflow links to the
* operation context. Used by both {@code WorkflowRunOperationImpl} and {@code
* TemporalNexusClientImpl}.
*/
public class NexusStartWorkflowHelper {

/**
* Starts a workflow via the provided invoker function, attaches workflow links to the operation
* context, and returns the response.
*
* @param ctx the operation context (links will be attached as a side-effect)
* @param details the operation start details containing requestId, callback, links
* @param invoker function that starts the workflow given a {@link NexusStartWorkflowRequest}
* @return the {@link NexusStartWorkflowResponse} containing the operation token and workflow
* execution
*/
public static NexusStartWorkflowResponse startWorkflowAndAttachLinks(
OperationContext ctx,
OperationStartDetails details,
Function<NexusStartWorkflowRequest, NexusStartWorkflowResponse> invoker) {
InternalNexusOperationContext nexusCtx = CurrentNexusOperationContext.get();

NexusStartWorkflowRequest nexusRequest =
new NexusStartWorkflowRequest(
details.getRequestId(),
details.getCallbackUrl(),
details.getCallbackHeaders(),
nexusCtx.getTaskQueue(),
details.getLinks());

NexusStartWorkflowResponse response = invoker.apply(nexusRequest);
WorkflowExecution workflowExec = response.getWorkflowExecution();

// If the start workflow response returned a link use it, otherwise
// create the link information about the new workflow and return to the caller.
Link.WorkflowEvent workflowEventLink =
nexusCtx.getStartWorkflowResponseLink().hasWorkflowEvent()
? nexusCtx.getStartWorkflowResponseLink().getWorkflowEvent()
: null;
if (workflowEventLink == null) {
workflowEventLink =
Link.WorkflowEvent.newBuilder()
.setNamespace(nexusCtx.getNamespace())
.setWorkflowId(workflowExec.getWorkflowId())
.setRunId(workflowExec.getRunId())
.setEventRef(
Link.WorkflowEvent.EventReference.newBuilder()
.setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED))
.build();
}
io.temporal.api.nexus.v1.Link nexusLink = workflowEventToNexusLink(workflowEventLink);
if (nexusLink != null) {
try {
ctx.addLinks(nexusProtoLinkToLink(nexusLink));
} catch (URISyntaxException e) {
throw new HandlerException(HandlerException.ErrorType.INTERNAL, "failed to parse URI", e);
}
}

return response;
}

private NexusStartWorkflowHelper() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

public class WorkflowRunOperationToken {
/** Deserialized representation of a Nexus operation token. */
public class OperationToken {
@JsonProperty("v")
@JsonInclude(JsonInclude.Include.NON_NULL)
private final Integer version;
Expand All @@ -17,7 +18,7 @@ public class WorkflowRunOperationToken {
@JsonProperty("wid")
private final String workflowId;

public WorkflowRunOperationToken(
public OperationToken(
@JsonProperty("t") Integer type,
@JsonProperty("ns") String namespace,
@JsonProperty("wid") String workflowId,
Expand All @@ -28,8 +29,8 @@ public WorkflowRunOperationToken(
this.version = version;
}

public WorkflowRunOperationToken(String namespace, String workflowId) {
this.type = OperationTokenType.WORKFLOW_RUN;
public OperationToken(OperationTokenType type, String namespace, String workflowId) {
this.type = type;
this.namespace = namespace;
this.workflowId = workflowId;
this.version = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,45 @@ public class OperationTokenUtil {
private static final Base64.Encoder encoder = Base64.getUrlEncoder().withoutPadding();

/**
* Load a workflow run operation token from an operation token.
* Load and validate an operation token without asserting the token type. Use this for cancel
* dispatch where the token type determines the cancel behavior.
*
* @throws IllegalArgumentException if the operation token is invalid
* @throws IllegalArgumentException if the operation token is malformed or has invalid structure
*/
public static WorkflowRunOperationToken loadWorkflowRunOperationToken(String operationToken) {
WorkflowRunOperationToken token;
public static OperationToken loadOperationToken(String operationToken) {
OperationToken token;
try {
JavaType reference = mapper.getTypeFactory().constructType(WorkflowRunOperationToken.class);
JavaType reference = mapper.getTypeFactory().constructType(OperationToken.class);
token = mapper.readValue(decoder.decode(operationToken), reference);
} catch (Exception e) {
throw new IllegalArgumentException("Failed to parse operation token: " + e.getMessage());
}
if (!token.getType().equals(OperationTokenType.WORKFLOW_RUN)) {
throw new IllegalArgumentException(
"Invalid workflow run token: incorrect operation token type: " + token.getType());
}
if (token.getVersion() != null && token.getVersion() != 0) {
throw new IllegalArgumentException("Invalid workflow run token: unexpected version field");
throw new IllegalArgumentException("Invalid operation token: unexpected version field");
}
if (Strings.isNullOrEmpty(token.getWorkflowId())) {
throw new IllegalArgumentException("Invalid workflow run token: missing workflow ID (wid)");
throw new IllegalArgumentException("Invalid operation token: missing workflow ID (wid)");
}
return token;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generic token loader validates workflow-specific field

Low Severity

loadOperationToken is documented as a type-agnostic method for "cancel dispatch where the token type determines the cancel behavior," yet it unconditionally validates that workflowId is present. This is a WORKFLOW_RUN-specific invariant baked into a method meant to handle any token type. When a new OperationTokenType is added that doesn't carry a workflowId, this method will incorrectly reject it. The workflowId check belongs in loadWorkflowRunOperationToken or in the type-specific dispatch branch of TemporalOperationHandler.cancel.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit ee62ae7. Configure here.

}

/**
* Load a workflow run operation token, asserting that the token type is {@link
* OperationTokenType#WORKFLOW_RUN}.
*
* @throws IllegalArgumentException if the operation token is invalid or not a workflow run token
*/
public static OperationToken loadWorkflowRunOperationToken(String operationToken) {
OperationToken token = loadOperationToken(operationToken);
if (!token.getType().equals(OperationTokenType.WORKFLOW_RUN)) {
throw new IllegalArgumentException(
"Invalid workflow run token: incorrect operation token type: " + token.getType());
}
return token;
}

/**
* Attempt to extract the workflow Id from an operation token.
* Extract the workflow ID from a workflow run operation token.
*
* @throws IllegalArgumentException if the operation token is invalid
*/
Expand All @@ -52,7 +64,9 @@ public static String loadWorkflowIdFromOperationToken(String operationToken) {
/** Generate a workflow run operation token from a workflow ID and namespace. */
public static String generateWorkflowRunOperationToken(String workflowId, String namespace)
throws JsonProcessingException {
String json = ow.writeValueAsString(new WorkflowRunOperationToken(namespace, workflowId));
String json =
ow.writeValueAsString(
new OperationToken(OperationTokenType.WORKFLOW_RUN, namespace, workflowId));
return encoder.encodeToString(json.getBytes());
}

Expand Down
Loading
Loading