From 18c53d44193b1ebf7426398aba4503d34d992526 Mon Sep 17 00:00:00 2001 From: Brian Strauch Date: Fri, 12 Jun 2026 10:11:56 -0700 Subject: [PATCH] Sample code for Workflow Streams Co-Authored-By: Claude Fable 5 --- .github/CODEOWNERS | 4 +- README.md | 2 + build.gradle | 2 +- core/build.gradle | 2 + .../workflowstreams/ExternalPublisher.java | 90 +++++++ .../samples/workflowstreams/HubWorkflow.java | 19 ++ .../workflowstreams/HubWorkflowImpl.java | 32 +++ .../temporal/samples/workflowstreams/Llm.java | 71 ++++++ .../workflowstreams/LlmActivities.java | 15 ++ .../workflowstreams/LlmActivitiesImpl.java | 74 ++++++ .../samples/workflowstreams/LlmWorker.java | 29 +++ .../samples/workflowstreams/LlmWorkflow.java | 15 ++ .../workflowstreams/LlmWorkflowImpl.java | 39 +++ .../workflowstreams/OrderWorkflow.java | 15 ++ .../workflowstreams/OrderWorkflowImpl.java | 58 +++++ .../workflowstreams/PaymentActivities.java | 13 + .../PaymentActivitiesImpl.java | 35 +++ .../workflowstreams/PipelineWorkflow.java | 15 ++ .../workflowstreams/PipelineWorkflowImpl.java | 45 ++++ .../samples/workflowstreams/Publisher.java | 64 +++++ .../samples/workflowstreams/README.md | 154 ++++++++++++ .../ReconnectingSubscriber.java | 72 ++++++ .../samples/workflowstreams/Shared.java | 226 ++++++++++++++++++ .../workflowstreams/StreamsWorker.java | 25 ++ .../workflowstreams/TickerWorkflow.java | 16 ++ .../workflowstreams/TickerWorkflowImpl.java | 48 ++++ .../workflowstreams/TruncatingTicker.java | 119 +++++++++ .../WorkflowStreamsSampleTest.java | 60 +++++ 28 files changed, 1357 insertions(+), 2 deletions(-) create mode 100644 core/src/main/java/io/temporal/samples/workflowstreams/ExternalPublisher.java create mode 100644 core/src/main/java/io/temporal/samples/workflowstreams/HubWorkflow.java create mode 100644 core/src/main/java/io/temporal/samples/workflowstreams/HubWorkflowImpl.java create mode 100644 core/src/main/java/io/temporal/samples/workflowstreams/Llm.java create mode 100644 core/src/main/java/io/temporal/samples/workflowstreams/LlmActivities.java create mode 100644 core/src/main/java/io/temporal/samples/workflowstreams/LlmActivitiesImpl.java create mode 100644 core/src/main/java/io/temporal/samples/workflowstreams/LlmWorker.java create mode 100644 core/src/main/java/io/temporal/samples/workflowstreams/LlmWorkflow.java create mode 100644 core/src/main/java/io/temporal/samples/workflowstreams/LlmWorkflowImpl.java create mode 100644 core/src/main/java/io/temporal/samples/workflowstreams/OrderWorkflow.java create mode 100644 core/src/main/java/io/temporal/samples/workflowstreams/OrderWorkflowImpl.java create mode 100644 core/src/main/java/io/temporal/samples/workflowstreams/PaymentActivities.java create mode 100644 core/src/main/java/io/temporal/samples/workflowstreams/PaymentActivitiesImpl.java create mode 100644 core/src/main/java/io/temporal/samples/workflowstreams/PipelineWorkflow.java create mode 100644 core/src/main/java/io/temporal/samples/workflowstreams/PipelineWorkflowImpl.java create mode 100644 core/src/main/java/io/temporal/samples/workflowstreams/Publisher.java create mode 100644 core/src/main/java/io/temporal/samples/workflowstreams/README.md create mode 100644 core/src/main/java/io/temporal/samples/workflowstreams/ReconnectingSubscriber.java create mode 100644 core/src/main/java/io/temporal/samples/workflowstreams/Shared.java create mode 100644 core/src/main/java/io/temporal/samples/workflowstreams/StreamsWorker.java create mode 100644 core/src/main/java/io/temporal/samples/workflowstreams/TickerWorkflow.java create mode 100644 core/src/main/java/io/temporal/samples/workflowstreams/TickerWorkflowImpl.java create mode 100644 core/src/main/java/io/temporal/samples/workflowstreams/TruncatingTicker.java create mode 100644 core/src/test/java/io/temporal/samples/workflowstreams/WorkflowStreamsSampleTest.java diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 55d30357..96c09701 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -7,4 +7,6 @@ # For each one, we add the owning team, as well as # @temporalio/sdk, so the SDK team can continue to # manage repo-wide concerns -/springai/ @temporalio/ai-sdk @temporalio/sdk \ No newline at end of file +/springai/ @temporalio/sdk @temporalio/ai-sdk +/core/src/main/java/io/temporal/samples/workflowstreams/ @temporalio/sdk @temporalio/ai-sdk +/core/src/test/java/io/temporal/samples/workflowstreams/ @temporalio/sdk @temporalio/ai-sdk diff --git a/README.md b/README.md index 589932ec..8c8ae324 100644 --- a/README.md +++ b/README.md @@ -117,6 +117,8 @@ Load client configuration from TOML files with programmatic overrides. #### API demonstrations +- [**Workflow Streams**](/core/src/main/java/io/temporal/samples/workflowstreams): Demonstrates a durable publish/subscribe log hosted inside a workflow, using the experimental `temporal-workflowstreams` module. + - [**Async Untyped Child Workflow**](/core/src/main/java/io/temporal/samples/asyncuntypedchild): Demonstrates how to invoke an untyped child workflow async, that can complete after parent workflow is already completed. - [**Updatable Timer**](/core/src/main/java/io/temporal/samples/updatabletimer): Demonstrates the use of a helper class which relies on `Workflow.await` to implement a blocking sleep that can be updated at any moment. diff --git a/build.gradle b/build.gradle index 8e476ad7..4985ea22 100644 --- a/build.gradle +++ b/build.gradle @@ -21,7 +21,7 @@ subprojects { ext { otelVersion = '1.30.1' otelVersionAlpha = "${otelVersion}-alpha" - javaSDKVersion = '1.35.0' + javaSDKVersion = '1.36.0-SNAPSHOT' camelVersion = '3.22.1' jarVersion = '1.0.0' } diff --git a/core/build.gradle b/core/build.gradle index 3405d505..a06ff7f5 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -2,6 +2,7 @@ dependencies { // Temporal SDK implementation "io.temporal:temporal-sdk:$javaSDKVersion" implementation "io.temporal:temporal-opentracing:$javaSDKVersion" + implementation "io.temporal:temporal-workflowstreams:$javaSDKVersion" testImplementation("io.temporal:temporal-testing:$javaSDKVersion") // Environment configuration @@ -27,6 +28,7 @@ dependencies { implementation 'io.jaegertracing:jaeger-client:1.8.1' // Used in samples + implementation "com.openai:openai-java:4.39.1" implementation group: 'commons-configuration', name: 'commons-configuration', version: '1.10' implementation group: 'io.cloudevents', name: 'cloudevents-core', version: '4.0.1' implementation group: 'io.cloudevents', name: 'cloudevents-api', version: '4.0.1' diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/ExternalPublisher.java b/core/src/main/java/io/temporal/samples/workflowstreams/ExternalPublisher.java new file mode 100644 index 00000000..cf1a0fff --- /dev/null +++ b/core/src/main/java/io/temporal/samples/workflowstreams/ExternalPublisher.java @@ -0,0 +1,90 @@ +package io.temporal.samples.workflowstreams; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.samples.workflowstreams.Shared.HubInput; +import io.temporal.samples.workflowstreams.Shared.NewsEvent; +import io.temporal.workflowstreams.TopicHandle; +import io.temporal.workflowstreams.WorkflowStreamClient; +import io.temporal.workflowstreams.WorkflowStreamItem; +import io.temporal.workflowstreams.WorkflowStreamSubscription; +import java.util.UUID; + +/** + * Scenario 3: external (non-activity) publisher. The hub workflow does no work of its own; it just + * hosts the stream. A separate process publishes news into it using the same client factory used to + * subscribe, then signals the workflow to close. Here the publisher and a subscriber run as two + * threads. + */ +public class ExternalPublisher { + + private static final String[] HEADLINES = { + "markets open higher", "new bridge opens downtown", "local team wins championship", + }; + + /** The sentinel the publisher sends last so the subscriber knows to stop. */ + private static final String DONE_HEADLINE = "-- end of feed --"; + + public static void main(String[] args) throws InterruptedException { + WorkflowClient client = Shared.newWorkflowClient(); + + String workflowId = "workflow-streams-hub-" + UUID.randomUUID(); + HubWorkflow workflow = + client.newWorkflowStub( + HubWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(workflowId) + .setTaskQueue(Shared.TASK_QUEUE) + .build()); + WorkflowClient.start(workflow::host, new HubInput("newsroom")); + System.out.println("Started workflow: " + workflowId); + + Thread subscriber = + new Thread( + () -> { + try (WorkflowStreamClient stream = + WorkflowStreamClient.newInstance(client, workflowId); + WorkflowStreamSubscription subscription = + stream.topic(Shared.TOPIC_NEWS).subscribe(0)) { + for (WorkflowStreamItem item : subscription) { + NewsEvent evt = Shared.decode(item, NewsEvent.class); + if (evt.headline.equals(DONE_HEADLINE)) { + return; + } + System.out.printf("[subscriber] %s%n", evt.headline); + } + } + }); + + Thread publisher = + new Thread( + () -> { + try (WorkflowStreamClient producer = + WorkflowStreamClient.newInstance(client, workflowId)) { + TopicHandle news = producer.topic(Shared.TOPIC_NEWS); + for (String headline : HEADLINES) { + news.publish(new NewsEvent(headline)); + System.out.printf("[publisher] sent: %s%n", headline); + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + } + // Force-flush the sentinel and wait for the server to confirm delivery + // before signaling the workflow to close. + news.publish(new NewsEvent(DONE_HEADLINE), /* forceFlush */ true); + producer.flush(); + } + workflow.close(); + System.out.println("[publisher] signaled close"); + }); + + subscriber.start(); + publisher.start(); + subscriber.join(); + publisher.join(); + System.exit(0); + } +} diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/HubWorkflow.java b/core/src/main/java/io/temporal/samples/workflowstreams/HubWorkflow.java new file mode 100644 index 00000000..95c6f035 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/workflowstreams/HubWorkflow.java @@ -0,0 +1,19 @@ +package io.temporal.samples.workflowstreams; + +import io.temporal.samples.workflowstreams.Shared.HubInput; +import io.temporal.workflow.SignalMethod; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; + +/** + * Scenario 3: does no work of its own; it exists only to host the stream for an external publisher + * and shuts down on a close signal. + */ +@WorkflowInterface +public interface HubWorkflow { + @WorkflowMethod + String host(HubInput input); + + @SignalMethod + void close(); +} diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/HubWorkflowImpl.java b/core/src/main/java/io/temporal/samples/workflowstreams/HubWorkflowImpl.java new file mode 100644 index 00000000..d703da0d --- /dev/null +++ b/core/src/main/java/io/temporal/samples/workflowstreams/HubWorkflowImpl.java @@ -0,0 +1,32 @@ +package io.temporal.samples.workflowstreams; + +import io.temporal.samples.workflowstreams.Shared.HubInput; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInit; +import io.temporal.workflowstreams.WorkflowStream; + +public class HubWorkflowImpl implements HubWorkflow { + + private boolean closed; + + @WorkflowInit + public HubWorkflowImpl(HubInput input) { + WorkflowStream.newInstance(input.streamState); + } + + @Override + public String host(HubInput input) { + Workflow.await(() -> closed); + + // The publisher publishes its own terminator into the stream before signaling close + // (see ExternalPublisher). Hold the run open briefly so subscribers' final poll + // delivers any items still in the log. + Workflow.sleep(OrderWorkflowImpl.DRAIN_DELAY); + return "hub " + input.hubId + " closed"; + } + + @Override + public void close() { + closed = true; + } +} diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/Llm.java b/core/src/main/java/io/temporal/samples/workflowstreams/Llm.java new file mode 100644 index 00000000..bf4e7632 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/workflowstreams/Llm.java @@ -0,0 +1,71 @@ +package io.temporal.samples.workflowstreams; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.samples.workflowstreams.Shared.LlmInput; +import io.temporal.samples.workflowstreams.Shared.RetryEvent; +import io.temporal.samples.workflowstreams.Shared.TextDelta; +import io.temporal.workflowstreams.SubscribeOptions; +import io.temporal.workflowstreams.WorkflowStreamClient; +import io.temporal.workflowstreams.WorkflowStreamItem; +import io.temporal.workflowstreams.WorkflowStreamSubscription; +import java.util.UUID; + +/** + * Scenario 5: LLM token streaming. The workflow hosts the stream while an activity makes the + * streaming OpenAI call and republishes each token delta. On a retry the activity emits a + * RetryEvent and this subscriber rewinds the terminal and re-renders. Run {@link LlmWorker} with + * {@code OPENAI_API_KEY} set before running this. + */ +public class Llm { + + /** + * ANSI escapes to save the cursor position and to restore it while clearing everything below, so + * a retry can re-render the completion from scratch. + */ + private static final String ANSI_SAVE = "\u001b[s"; + + private static final String ANSI_RESTORE_AND_CLEAR = "\u001b[u\u001b[J"; + + public static void main(String[] args) { + String prompt = args.length > 0 ? args[0] : "In one short paragraph, explain what Temporal is."; + + WorkflowClient client = Shared.newWorkflowClient(); + + String workflowId = "workflow-streams-llm-" + UUID.randomUUID(); + LlmWorkflow workflow = + client.newWorkflowStub( + LlmWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(workflowId) + .setTaskQueue(Shared.LLM_TASK_QUEUE) + .build()); + WorkflowClient.start(workflow::complete, new LlmInput(prompt, null)); + System.out.println("Started workflow: " + workflowId); + + try (WorkflowStreamClient stream = WorkflowStreamClient.newInstance(client, workflowId); + WorkflowStreamSubscription subscription = + stream.subscribe( + SubscribeOptions.newBuilder() + .setTopics(Shared.TOPIC_DELTA, Shared.TOPIC_RETRY, Shared.TOPIC_COMPLETE) + .build())) { + System.out.print(ANSI_SAVE); + for (WorkflowStreamItem item : subscription) { + if (item.getTopic().equals(Shared.TOPIC_RETRY)) { + RetryEvent evt = Shared.decode(item, RetryEvent.class); + System.out.print(ANSI_RESTORE_AND_CLEAR); + System.out.printf("[retry attempt %d] resetting output%n%n", evt.attempt); + System.out.print(ANSI_SAVE); + } else if (item.getTopic().equals(Shared.TOPIC_DELTA)) { + TextDelta evt = Shared.decode(item, TextDelta.class); + System.out.print(evt.text); + System.out.flush(); + } else if (item.getTopic().equals(Shared.TOPIC_COMPLETE)) { + System.out.println(); + break; + } + } + } + System.exit(0); + } +} diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/LlmActivities.java b/core/src/main/java/io/temporal/samples/workflowstreams/LlmActivities.java new file mode 100644 index 00000000..375d5bbe --- /dev/null +++ b/core/src/main/java/io/temporal/samples/workflowstreams/LlmActivities.java @@ -0,0 +1,15 @@ +package io.temporal.samples.workflowstreams; + +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; +import io.temporal.samples.workflowstreams.Shared.LlmInput; + +@ActivityInterface +public interface LlmActivities { + /** + * Streams an LLM completion to the parent workflow's stream and returns the accumulated full + * text. + */ + @ActivityMethod + String streamCompletion(LlmInput input); +} diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/LlmActivitiesImpl.java b/core/src/main/java/io/temporal/samples/workflowstreams/LlmActivitiesImpl.java new file mode 100644 index 00000000..92990ad7 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/workflowstreams/LlmActivitiesImpl.java @@ -0,0 +1,74 @@ +package io.temporal.samples.workflowstreams; + +import com.openai.client.OpenAIClient; +import com.openai.client.okhttp.OpenAIOkHttpClient; +import com.openai.core.http.StreamResponse; +import com.openai.models.chat.completions.ChatCompletionChunk; +import com.openai.models.chat.completions.ChatCompletionCreateParams; +import io.temporal.activity.Activity; +import io.temporal.samples.workflowstreams.Shared.LlmInput; +import io.temporal.samples.workflowstreams.Shared.RetryEvent; +import io.temporal.samples.workflowstreams.Shared.TextComplete; +import io.temporal.samples.workflowstreams.Shared.TextDelta; +import io.temporal.workflowstreams.TopicHandle; +import io.temporal.workflowstreams.WorkflowStreamClient; +import io.temporal.workflowstreams.WorkflowStreamClientOptions; +import java.time.Duration; + +/** + * Calls OpenAI with streaming enabled and republishes each token delta to the workflow stream. The + * accumulated text is published on the complete topic and returned as the activity result. Because + * the activity owns the non-deterministic OpenAI call, the workflow stays deterministic. + * + *

Retries are disabled on the OpenAI client so transient failures surface as Temporal activity + * retries instead. On a retry (attempt > 1) it publishes a RetryEvent so subscribers can reset + * partially rendered output. + */ +public class LlmActivitiesImpl implements LlmActivities { + + static final String DEFAULT_MODEL = "gpt-4o-mini"; + + @Override + public String streamCompletion(LlmInput input) { + WorkflowStreamClientOptions options = + WorkflowStreamClientOptions.newBuilder().setBatchInterval(Duration.ofMillis(200)).build(); + try (WorkflowStreamClient streamClient = WorkflowStreamClient.fromActivity(options)) { + TopicHandle deltas = streamClient.topic(Shared.TOPIC_DELTA); + TopicHandle complete = streamClient.topic(Shared.TOPIC_COMPLETE); + TopicHandle retry = streamClient.topic(Shared.TOPIC_RETRY); + + int attempt = Activity.getExecutionContext().getInfo().getAttempt(); + if (attempt > 1) { + retry.publish(new RetryEvent(attempt), /* forceFlush */ true); + } + + String model = input.model != null && !input.model.isEmpty() ? input.model : DEFAULT_MODEL; + + // Reads OPENAI_API_KEY from the environment. + OpenAIClient openai = OpenAIOkHttpClient.builder().fromEnv().maxRetries(0).build(); + ChatCompletionCreateParams params = + ChatCompletionCreateParams.builder().model(model).addUserMessage(input.prompt).build(); + + StringBuilder full = new StringBuilder(); + try (StreamResponse stream = + openai.chat().completions().createStreaming(params)) { + stream.stream() + .forEach( + chunk -> + chunk.choices().stream() + .findFirst() + .flatMap(choice -> choice.delta().content()) + .filter(text -> !text.isEmpty()) + .ifPresent( + text -> { + deltas.publish(new TextDelta(text)); + full.append(text); + })); + } + + String fullText = full.toString(); + complete.publish(new TextComplete(fullText), /* forceFlush */ true); + return fullText; + } + } +} diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/LlmWorker.java b/core/src/main/java/io/temporal/samples/workflowstreams/LlmWorker.java new file mode 100644 index 00000000..d814f12d --- /dev/null +++ b/core/src/main/java/io/temporal/samples/workflowstreams/LlmWorker.java @@ -0,0 +1,29 @@ +package io.temporal.samples.workflowstreams; + +import io.temporal.client.WorkflowClient; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; + +/** + * Worker for the LLM-streaming scenario. Runs separately from {@link StreamsWorker} so the OpenAI + * dependency and the {@code OPENAI_API_KEY} requirement stay isolated to this one scenario. + * Different task queue too — the other four scenarios won't route work to this worker. + * + *

Kill this worker mid-stream while {@link Llm} is running (and restart it) to trigger a retry: + * Temporal restarts the activity, the activity publishes a RetryEvent on its second attempt, and + * the consumer resets its rendered output. + */ +public class LlmWorker { + + public static void main(String[] args) { + WorkflowClient client = Shared.newWorkflowClient(); + WorkerFactory factory = WorkerFactory.newInstance(client); + + Worker worker = factory.newWorker(Shared.LLM_TASK_QUEUE); + worker.registerWorkflowImplementationTypes(LlmWorkflowImpl.class); + worker.registerActivitiesImplementations(new LlmActivitiesImpl()); + + factory.start(); + System.out.println("LLM worker started for task queue: " + Shared.LLM_TASK_QUEUE); + } +} diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/LlmWorkflow.java b/core/src/main/java/io/temporal/samples/workflowstreams/LlmWorkflow.java new file mode 100644 index 00000000..67aeca0c --- /dev/null +++ b/core/src/main/java/io/temporal/samples/workflowstreams/LlmWorkflow.java @@ -0,0 +1,15 @@ +package io.temporal.samples.workflowstreams; + +import io.temporal.samples.workflowstreams.Shared.LlmInput; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; + +/** + * Scenario 5: hosts the stream while a streaming activity owns the non-deterministic OpenAI call + * and publishes token deltas back to subscribers. + */ +@WorkflowInterface +public interface LlmWorkflow { + @WorkflowMethod + String complete(LlmInput input); +} diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/LlmWorkflowImpl.java b/core/src/main/java/io/temporal/samples/workflowstreams/LlmWorkflowImpl.java new file mode 100644 index 00000000..fb92a39c --- /dev/null +++ b/core/src/main/java/io/temporal/samples/workflowstreams/LlmWorkflowImpl.java @@ -0,0 +1,39 @@ +package io.temporal.samples.workflowstreams; + +import io.temporal.activity.ActivityOptions; +import io.temporal.common.RetryOptions; +import io.temporal.samples.workflowstreams.Shared.LlmInput; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInit; +import io.temporal.workflowstreams.WorkflowStream; +import java.time.Duration; + +public class LlmWorkflowImpl implements LlmWorkflow { + + private final LlmActivities activities = + Workflow.newActivityStub( + LlmActivities.class, + ActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofMinutes(2)) + .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(3).build()) + .build()); + + /** + * Construct the stream from a {@code @WorkflowInit} constructor so the publish-signal handler is + * registered before any external publisher (the activity, here) tries to publish. + */ + @WorkflowInit + public LlmWorkflowImpl(LlmInput input) { + WorkflowStream.newInstance(input.streamState); + } + + @Override + public String complete(LlmInput input) { + String result = activities.streamCompletion(input); + + // Hold the run open briefly so the consumer's next poll delivers the activity's + // terminal "complete" event before the workflow exits and the in-memory log is gone. + Workflow.sleep(OrderWorkflowImpl.DRAIN_DELAY); + return result; + } +} diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/OrderWorkflow.java b/core/src/main/java/io/temporal/samples/workflowstreams/OrderWorkflow.java new file mode 100644 index 00000000..ec356c48 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/workflowstreams/OrderWorkflow.java @@ -0,0 +1,15 @@ +package io.temporal.samples.workflowstreams; + +import io.temporal.samples.workflowstreams.Shared.OrderInput; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; + +/** + * Scenario 1: publishes status events directly from workflow code and runs an activity that + * publishes fine-grained progress events to the same stream. A subscriber consumes both topics. + */ +@WorkflowInterface +public interface OrderWorkflow { + @WorkflowMethod + String processOrder(OrderInput input); +} diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/OrderWorkflowImpl.java b/core/src/main/java/io/temporal/samples/workflowstreams/OrderWorkflowImpl.java new file mode 100644 index 00000000..42321618 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/workflowstreams/OrderWorkflowImpl.java @@ -0,0 +1,58 @@ +package io.temporal.samples.workflowstreams; + +import io.temporal.activity.ActivityOptions; +import io.temporal.samples.workflowstreams.Shared.OrderInput; +import io.temporal.samples.workflowstreams.Shared.ProgressEvent; +import io.temporal.samples.workflowstreams.Shared.StatusEvent; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInit; +import io.temporal.workflowstreams.WorkflowStream; +import io.temporal.workflowstreams.WorkflowTopicHandle; +import java.time.Duration; + +public class OrderWorkflowImpl implements OrderWorkflow { + + /** + * Gives subscribers a moment to poll for the final published items before the workflow completes + * and the stream stops serving polls. + */ + static final Duration DRAIN_DELAY = Duration.ofMillis(500); + + private final WorkflowStream stream; + private final WorkflowTopicHandle status; + private final WorkflowTopicHandle progress; + + private final PaymentActivities activities = + Workflow.newActivityStub( + PaymentActivities.class, + ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofMinutes(1)).build()); + + /** + * Construct the stream from a {@code @WorkflowInit} constructor so its handlers are registered + * before the workflow accepts any messages. Threading {@code streamState} lets the workflow + * survive continue-as-new without losing buffered items. + */ + @WorkflowInit + public OrderWorkflowImpl(OrderInput input) { + stream = WorkflowStream.newInstance(input.streamState); + status = stream.topic(Shared.TOPIC_STATUS); + progress = stream.topic(Shared.TOPIC_PROGRESS); + } + + @Override + public String processOrder(OrderInput input) { + status.publish(new StatusEvent("received", input.orderId)); + + String chargeId = activities.chargeCard(input.orderId); + + status.publish(new StatusEvent("shipped", input.orderId)); + progress.publish(new ProgressEvent("charge id: " + chargeId)); + status.publish(new StatusEvent("complete", input.orderId)); + + // The "complete" status event above is the in-band terminator subscribers break on + // (see Publisher). Hold the run open briefly so subscribers' next poll delivers it + // before this run completes and the in-memory log is gone. + Workflow.sleep(DRAIN_DELAY); + return chargeId; + } +} diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/PaymentActivities.java b/core/src/main/java/io/temporal/samples/workflowstreams/PaymentActivities.java new file mode 100644 index 00000000..6512319f --- /dev/null +++ b/core/src/main/java/io/temporal/samples/workflowstreams/PaymentActivities.java @@ -0,0 +1,13 @@ +package io.temporal.samples.workflowstreams; + +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; + +@ActivityInterface +public interface PaymentActivities { + /** + * Charges a card and publishes fine-grained progress events back to its parent workflow's stream. + */ + @ActivityMethod + String chargeCard(String orderId); +} diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/PaymentActivitiesImpl.java b/core/src/main/java/io/temporal/samples/workflowstreams/PaymentActivitiesImpl.java new file mode 100644 index 00000000..9c3ec43e --- /dev/null +++ b/core/src/main/java/io/temporal/samples/workflowstreams/PaymentActivitiesImpl.java @@ -0,0 +1,35 @@ +package io.temporal.samples.workflowstreams; + +import io.temporal.samples.workflowstreams.Shared.ProgressEvent; +import io.temporal.workflowstreams.TopicHandle; +import io.temporal.workflowstreams.WorkflowStreamClient; +import io.temporal.workflowstreams.WorkflowStreamClientOptions; +import java.time.Duration; + +public class PaymentActivitiesImpl implements PaymentActivities { + + /** + * {@code WorkflowStreamClient.fromActivity()} reads the parent workflow id and the Temporal + * client from the activity context, so this activity can push events back without any wiring. + * Closing the client flushes any buffered items before the activity returns. + */ + @Override + public String chargeCard(String orderId) { + WorkflowStreamClientOptions options = + WorkflowStreamClientOptions.newBuilder().setBatchInterval(Duration.ofMillis(200)).build(); + try (WorkflowStreamClient client = WorkflowStreamClient.fromActivity(options)) { + TopicHandle progress = client.topic(Shared.TOPIC_PROGRESS); + progress.publish(new ProgressEvent("charging card...")); + + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + + progress.publish(new ProgressEvent("card charged")); + } + return "charge-" + orderId; + } +} diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/PipelineWorkflow.java b/core/src/main/java/io/temporal/samples/workflowstreams/PipelineWorkflow.java new file mode 100644 index 00000000..28fce920 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/workflowstreams/PipelineWorkflow.java @@ -0,0 +1,15 @@ +package io.temporal.samples.workflowstreams; + +import io.temporal.samples.workflowstreams.Shared.PipelineInput; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; + +/** + * Scenario 2: publishes a sequence of stage events with delays between them, giving a subscriber + * time to disconnect and reconnect mid-stream. + */ +@WorkflowInterface +public interface PipelineWorkflow { + @WorkflowMethod + String runPipeline(PipelineInput input); +} diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/PipelineWorkflowImpl.java b/core/src/main/java/io/temporal/samples/workflowstreams/PipelineWorkflowImpl.java new file mode 100644 index 00000000..b1419012 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/workflowstreams/PipelineWorkflowImpl.java @@ -0,0 +1,45 @@ +package io.temporal.samples.workflowstreams; + +import io.temporal.samples.workflowstreams.Shared.PipelineInput; +import io.temporal.samples.workflowstreams.Shared.StageEvent; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInit; +import io.temporal.workflowstreams.WorkflowStream; +import io.temporal.workflowstreams.WorkflowTopicHandle; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; + +public class PipelineWorkflowImpl implements PipelineWorkflow { + + private final WorkflowTopicHandle status; + + @WorkflowInit + public PipelineWorkflowImpl(PipelineInput input) { + WorkflowStream stream = WorkflowStream.newInstance(input.streamState); + status = stream.topic(Shared.TOPIC_STATUS); + } + + @Override + public String runPipeline(PipelineInput input) { + List stages = + Arrays.asList( + "validating", + "loading data", + "transforming", + "writing output", + "verifying", + "complete"); + for (String stage : stages) { + status.publish(new StageEvent(stage)); + if (!stage.equals("complete")) { + Workflow.sleep(Duration.ofSeconds(2)); + } + } + + // The "complete" stage above is the in-band terminator subscribers break on. Hold the + // run open briefly so the final poll delivers it. + Workflow.sleep(OrderWorkflowImpl.DRAIN_DELAY); + return "pipeline " + input.pipelineId + " done"; + } +} diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/Publisher.java b/core/src/main/java/io/temporal/samples/workflowstreams/Publisher.java new file mode 100644 index 00000000..f070c99d --- /dev/null +++ b/core/src/main/java/io/temporal/samples/workflowstreams/Publisher.java @@ -0,0 +1,64 @@ +package io.temporal.samples.workflowstreams; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; +import io.temporal.samples.workflowstreams.Shared.OrderInput; +import io.temporal.samples.workflowstreams.Shared.ProgressEvent; +import io.temporal.samples.workflowstreams.Shared.StatusEvent; +import io.temporal.workflowstreams.SubscribeOptions; +import io.temporal.workflowstreams.WorkflowStreamClient; +import io.temporal.workflowstreams.WorkflowStreamItem; +import io.temporal.workflowstreams.WorkflowStreamSubscription; +import java.util.UUID; + +/** + * Scenario 1: basic publish/subscribe. Start an order workflow that publishes status events itself + * and runs an activity that publishes progress events to the same stream, then subscribe to both + * topics until the order completes. + */ +public class Publisher { + + public static void main(String[] args) { + WorkflowClient client = Shared.newWorkflowClient(); + + String workflowId = "workflow-streams-order-" + UUID.randomUUID(); + OrderWorkflow workflow = + client.newWorkflowStub( + OrderWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(workflowId) + .setTaskQueue(Shared.TASK_QUEUE) + .build()); + WorkflowClient.start(workflow::processOrder, new OrderInput("order-42")); + System.out.println("Started workflow: " + workflowId); + + // Single subscription over both topics. The loop ends on the in-band "complete" + // terminator (break) or because the subscription exhausts when the workflow reaches a + // terminal state without one (e.g. on failure). Either way we then fetch the workflow + // result, which throws if the workflow failed. + try (WorkflowStreamClient stream = WorkflowStreamClient.newInstance(client, workflowId); + WorkflowStreamSubscription subscription = + stream.subscribe( + SubscribeOptions.newBuilder() + .setTopics(Shared.TOPIC_STATUS, Shared.TOPIC_PROGRESS) + .build())) { + for (WorkflowStreamItem item : subscription) { + if (item.getTopic().equals(Shared.TOPIC_STATUS)) { + StatusEvent evt = Shared.decode(item, StatusEvent.class); + System.out.printf("[status] %s: order=%s%n", evt.kind, evt.orderId); + if (evt.kind.equals("complete")) { + break; + } + } else if (item.getTopic().equals(Shared.TOPIC_PROGRESS)) { + ProgressEvent evt = Shared.decode(item, ProgressEvent.class); + System.out.printf("[progress] %s%n", evt.message); + } + } + } + + String result = WorkflowStub.fromTyped(workflow).getResult(String.class); + System.out.println("workflow result: " + result); + System.exit(0); + } +} diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/README.md b/core/src/main/java/io/temporal/samples/workflowstreams/README.md new file mode 100644 index 00000000..517c5356 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/workflowstreams/README.md @@ -0,0 +1,154 @@ +# Workflow Streams + +A **workflow stream** is a durable publish/subscribe log hosted inside a Temporal +workflow, provided by the experimental `io.temporal:temporal-workflowstreams` +contrib module. External code (activities, starters, other workflows) publishes +messages to named topics via **signals**; subscribers long-poll for new items via +**updates**; a **query** exposes the current offset. Because it is backed by +Temporal's durable execution, delivery is ordered, durable, and exactly-once, with +client-side batching, publisher dedup, continue-as-new survival, and truncation. + +This sample mirrors the +[Go](https://github.com/temporalio/samples-go/tree/main/workflowstreams) and +[Python](https://github.com/temporalio/samples-python/tree/main/workflow_streams) +workflow streams samples. It contains five scenarios. + +> **Note:** This sample currently depends on the unreleased +> `temporal-workflowstreams` module and a reserved-name carve-out in the SDK +> itself, so the repo's `javaSDKVersion` is pinned to `1.36.0-SNAPSHOT` resolved +> from your local Maven repository. Publish the SDK locally first from a sibling +> `../sdk-java` checkout on the `workflow-streams` branch: +> +> ``` +> cd ../sdk-java && ./gradlew publishToMavenLocal +> ``` +> +> Drop the `-SNAPSHOT` pin once a tagged SDK release ships. + +### Key APIs + +Workflow side — construct a stream once in a `@WorkflowInit` constructor and publish to topics: + +```java +@WorkflowInit +public MyWorkflowImpl(MyInput input) { + stream = WorkflowStream.newInstance(input.streamState); +} + +stream.topic("status").publish(new StatusEvent("received")); +``` + +Client side (activities, starters, external code) — publish and subscribe: + +```java +try (WorkflowStreamClient client = WorkflowStreamClient.newInstance(workflowClient, workflowId); + WorkflowStreamSubscription subscription = client.topic("status").subscribe(0)) { + for (WorkflowStreamItem item : subscription) { + StatusEvent evt = + DefaultDataConverter.STANDARD_INSTANCE.fromPayload( + item.getPayload(), StatusEvent.class, StatusEvent.class); + } +} +``` + +Offsets are **global** across topics. To resume a subscription from where a +previous one left off, pass `subscribe` an offset one past the last item you +consumed: + +```java +try (WorkflowStreamClient client = WorkflowStreamClient.newInstance(workflowClient, workflowId); + WorkflowStreamSubscription subscription = + client.topic("status").subscribe(lastItem.getOffset() + 1)) { // zero starts from the beginning + for (WorkflowStreamItem item : subscription) { + // ... + } +} +``` + +### Steps to run this sample + +1) Run a [Temporal service](https://github.com/temporalio/samples-java/tree/main/#How-to-use) + (for example, `temporal server start-dev`). + +2) Start the worker (serves scenarios 1–4): + +``` +./gradlew -q execute -PmainClass=io.temporal.samples.workflowstreams.StreamsWorker +``` + +3) Run any of the scenarios below in a separate terminal. + +#### Scenario 1 — basic publish/subscribe + +An order workflow publishes status events itself while an activity publishes +fine-grained progress events to the same stream. A subscriber consumes both topics. + +``` +./gradlew -q execute -PmainClass=io.temporal.samples.workflowstreams.Publisher +``` + +Expected output (interleaving may vary): + +``` +[status] received: order=order-42 +[progress] charging card... +[progress] card charged +[status] shipped: order=order-42 +[progress] charge id: charge-order-42 +[status] complete: order=order-42 +workflow result: charge-order-42 +``` + +#### Scenario 2 — reconnecting subscriber + +A subscriber reads a few pipeline stage events, disconnects, then a brand-new +client resumes from the saved offset without missing events or seeing duplicates. + +``` +./gradlew -q execute -PmainClass=io.temporal.samples.workflowstreams.ReconnectingSubscriber +``` + +#### Scenario 3 — external publisher + +The hub workflow does no work of its own; it just hosts the stream. A separate +publisher pushes news into it (using the same client factory used to subscribe) and +then signals the workflow to close. Here a publisher and subscriber run concurrently. + +``` +./gradlew -q execute -PmainClass=io.temporal.samples.workflowstreams.ExternalPublisher +``` + +#### Scenario 4 — truncating ticker + +The ticker workflow periodically truncates old entries to bound its history, trading +complete history for a bounded log. A *fast* subscriber that reads from the start keeps +up and sees every tick. A *late* subscriber joins after truncation and resumes from a +stale offset; the stream fast-forwards it to the current base offset, so it cannot see +the truncated ticks. + +``` +./gradlew -q execute -PmainClass=io.temporal.samples.workflowstreams.TruncatingTicker +``` + +Expected output (the late subscriber's first line shows the fast-forward): + +``` +[late] requested offset 1 but it was truncated; fast-forwarded to offset 5 (skipped 4 tick(s)) +[late] offset= 5 n=5 +... +``` + +#### Scenario 5 — LLM token streaming + +The workflow hosts the stream while an activity makes a streaming OpenAI call and +republishes each token delta. On a retry it emits a retry event and the subscriber +rewinds the terminal and re-renders. This scenario runs on its own worker and task +queue, and requires `OPENAI_API_KEY`. + +``` +# Terminal A +OPENAI_API_KEY=sk-... ./gradlew -q execute -PmainClass=io.temporal.samples.workflowstreams.LlmWorker + +# Terminal B +./gradlew -q execute -PmainClass=io.temporal.samples.workflowstreams.Llm -Pargs="'Explain durable execution in one sentence.'" +``` diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/ReconnectingSubscriber.java b/core/src/main/java/io/temporal/samples/workflowstreams/ReconnectingSubscriber.java new file mode 100644 index 00000000..f6556374 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/workflowstreams/ReconnectingSubscriber.java @@ -0,0 +1,72 @@ +package io.temporal.samples.workflowstreams; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.samples.workflowstreams.Shared.PipelineInput; +import io.temporal.samples.workflowstreams.Shared.StageEvent; +import io.temporal.workflowstreams.WorkflowStreamClient; +import io.temporal.workflowstreams.WorkflowStreamItem; +import io.temporal.workflowstreams.WorkflowStreamSubscription; +import java.util.UUID; + +/** + * Scenario 2: reconnecting subscriber. A subscriber reads a few events, drops its connection, then + * a brand-new client resumes from the saved offset without missing events or seeing duplicates — + * because the events are durable in workflow history, not just held in memory. + */ +public class ReconnectingSubscriber { + + /** How many events the first subscriber reads before disconnecting. */ + private static final int PHASE_1_EVENTS = 2; + + public static void main(String[] args) { + WorkflowClient client = Shared.newWorkflowClient(); + + String workflowId = "workflow-streams-pipeline-" + UUID.randomUUID(); + PipelineWorkflow workflow = + client.newWorkflowStub( + PipelineWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(workflowId) + .setTaskQueue(Shared.TASK_QUEUE) + .build()); + WorkflowClient.start(workflow::runPipeline, new PipelineInput("pipeline-7")); + System.out.println("Started workflow: " + workflowId); + + // next is the offset to resume from: one past the last item we consumed. + long next = 0; + + // Phase 1: connect, read a couple of events, remember our position, disconnect. + System.out.println("--- phase 1: initial subscriber ---"); + try (WorkflowStreamClient stream = WorkflowStreamClient.newInstance(client, workflowId); + WorkflowStreamSubscription subscription = stream.topic(Shared.TOPIC_STATUS).subscribe(0)) { + int seen = 0; + for (WorkflowStreamItem item : subscription) { + StageEvent evt = Shared.decode(item, StageEvent.class); + next = item.getOffset() + 1; + System.out.printf("offset=%d stage=%s%n", item.getOffset(), evt.stage); + seen++; + if (seen >= PHASE_1_EVENTS) { + break; + } + } + } + + System.out.printf("--- disconnected; will resume from offset %d ---%n", next); + + // Phase 2: a new client resumes from the saved offset until the pipeline completes. + System.out.println("--- phase 2: reconnected subscriber ---"); + try (WorkflowStreamClient stream = WorkflowStreamClient.newInstance(client, workflowId); + WorkflowStreamSubscription subscription = + stream.topic(Shared.TOPIC_STATUS).subscribe(next)) { + for (WorkflowStreamItem item : subscription) { + StageEvent evt = Shared.decode(item, StageEvent.class); + System.out.printf("offset=%d stage=%s%n", item.getOffset(), evt.stage); + if (evt.stage.equals("complete")) { + break; + } + } + } + System.exit(0); + } +} diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/Shared.java b/core/src/main/java/io/temporal/samples/workflowstreams/Shared.java new file mode 100644 index 00000000..4f4f5ec8 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/workflowstreams/Shared.java @@ -0,0 +1,226 @@ +package io.temporal.samples.workflowstreams; + +import io.temporal.client.WorkflowClient; +import io.temporal.common.converter.DefaultDataConverter; +import io.temporal.envconfig.ClientConfigProfile; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.workflowstreams.WorkflowStreamItem; +import io.temporal.workflowstreams.WorkflowStreamState; +import java.io.IOException; + +/** + * Constants and shared types for the workflow streams sample. A workflow stream is a durable + * publish/subscribe log hosted inside a Temporal workflow: external code publishes to named topics + * via signals, subscribers long-poll for new items via updates, and a query exposes the current + * offset. See the experimental {@code io.temporal:temporal-workflowstreams} module. + */ +public final class Shared { + + /** + * Task queues. The LLM scenario runs on its own queue so its OpenAI dependency and API key + * requirement stay isolated from the other workflows. + */ + public static final String TASK_QUEUE = "workflow-streams"; + + public static final String LLM_TASK_QUEUE = "workflow-streams-llm"; + + /** Topic names used across the scenarios. */ + public static final String TOPIC_STATUS = "status"; + + public static final String TOPIC_PROGRESS = "progress"; + public static final String TOPIC_NEWS = "news"; + public static final String TOPIC_TICK = "tick"; + public static final String TOPIC_DELTA = "delta"; + public static final String TOPIC_COMPLETE = "complete"; + public static final String TOPIC_RETRY = "retry"; + + /** Creates a WorkflowClient from environment configuration. */ + public static WorkflowClient newWorkflowClient() { + ClientConfigProfile profile; + try { + profile = ClientConfigProfile.load(); + } catch (IOException e) { + throw new RuntimeException("Failed to load client configuration", e); + } + WorkflowServiceStubs service = + WorkflowServiceStubs.newServiceStubs(profile.toWorkflowServiceStubsOptions()); + return WorkflowClient.newInstance(service, profile.toWorkflowClientOptions()); + } + + /** Decodes a subscribed item's payload with the default data converter. */ + public static T decode(WorkflowStreamItem item, Class type) { + return DefaultDataConverter.STANDARD_INSTANCE.fromPayload(item.getPayload(), type, type); + } + + // Each workflow input carries an optional WorkflowStreamState so the stream can survive + // continue-as-new: thread the prior run's state back in and pass it to + // WorkflowStream.newInstance. It is null on a fresh start. + + /** Input to OrderWorkflow (scenario 1). */ + public static class OrderInput { + public String orderId; + public WorkflowStreamState streamState; + + public OrderInput() {} + + public OrderInput(String orderId) { + this.orderId = orderId; + } + } + + /** Input to PipelineWorkflow (scenario 2). */ + public static class PipelineInput { + public String pipelineId; + public WorkflowStreamState streamState; + + public PipelineInput() {} + + public PipelineInput(String pipelineId) { + this.pipelineId = pipelineId; + } + } + + /** Input to HubWorkflow (scenario 3). */ + public static class HubInput { + public String hubId; + public WorkflowStreamState streamState; + + public HubInput() {} + + public HubInput(String hubId) { + this.hubId = hubId; + } + } + + /** + * Input to TickerWorkflow (scenario 4). Zero-valued fields fall back to the defaults applied in + * the workflow. + */ + public static class TickerInput { + public int count; + public int keepLast; + public int truncateEvery; + public long intervalMs; + public WorkflowStreamState streamState; + + public TickerInput() {} + + public TickerInput(int count, int keepLast, int truncateEvery) { + this.count = count; + this.keepLast = keepLast; + this.truncateEvery = truncateEvery; + } + } + + /** Input to LlmWorkflow (scenario 5). */ + public static class LlmInput { + public String prompt; + public String model; + public WorkflowStreamState streamState; + + public LlmInput() {} + + public LlmInput(String prompt, String model) { + this.prompt = prompt; + this.model = model; + } + } + + // Event types published to the stream. They are JSON-encoded by the default data converter on + // the way in and decoded by subscribers on the way out. + + /** Reports an order's lifecycle stage on TOPIC_STATUS. */ + public static class StatusEvent { + public String kind; + public String orderId; + + public StatusEvent() {} + + public StatusEvent(String kind, String orderId) { + this.kind = kind; + this.orderId = orderId; + } + } + + /** Reports fine-grained progress on TOPIC_PROGRESS. */ + public static class ProgressEvent { + public String message; + + public ProgressEvent() {} + + public ProgressEvent(String message) { + this.message = message; + } + } + + /** Reports a pipeline stage on TOPIC_STATUS. */ + public static class StageEvent { + public String stage; + + public StageEvent() {} + + public StageEvent(String stage) { + this.stage = stage; + } + } + + /** Published by an external publisher on TOPIC_NEWS. */ + public static class NewsEvent { + public String headline; + + public NewsEvent() {} + + public NewsEvent(String headline) { + this.headline = headline; + } + } + + /** Published by the ticker on TOPIC_TICK. */ + public static class TickEvent { + public int n; + + public TickEvent() {} + + public TickEvent(int n) { + this.n = n; + } + } + + /** A single streamed token chunk on TOPIC_DELTA. */ + public static class TextDelta { + public String text; + + public TextDelta() {} + + public TextDelta(String text) { + this.text = text; + } + } + + /** The final accumulated completion on TOPIC_COMPLETE. */ + public static class TextComplete { + public String fullText; + + public TextComplete() {} + + public TextComplete(String fullText) { + this.fullText = fullText; + } + } + + /** + * Signals that the streaming activity is on a retry attempt, so subscribers can reset any + * partially rendered output. + */ + public static class RetryEvent { + public int attempt; + + public RetryEvent() {} + + public RetryEvent(int attempt) { + this.attempt = attempt; + } + } + + private Shared() {} +} diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/StreamsWorker.java b/core/src/main/java/io/temporal/samples/workflowstreams/StreamsWorker.java new file mode 100644 index 00000000..37c8c2a3 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/workflowstreams/StreamsWorker.java @@ -0,0 +1,25 @@ +package io.temporal.samples.workflowstreams; + +import io.temporal.client.WorkflowClient; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; + +/** Worker serving scenarios 1-4. */ +public class StreamsWorker { + + public static void main(String[] args) { + WorkflowClient client = Shared.newWorkflowClient(); + WorkerFactory factory = WorkerFactory.newInstance(client); + + Worker worker = factory.newWorker(Shared.TASK_QUEUE); + worker.registerWorkflowImplementationTypes( + OrderWorkflowImpl.class, + PipelineWorkflowImpl.class, + HubWorkflowImpl.class, + TickerWorkflowImpl.class); + worker.registerActivitiesImplementations(new PaymentActivitiesImpl()); + + factory.start(); + System.out.println("Worker started for task queue: " + Shared.TASK_QUEUE); + } +} diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/TickerWorkflow.java b/core/src/main/java/io/temporal/samples/workflowstreams/TickerWorkflow.java new file mode 100644 index 00000000..1cdae00c --- /dev/null +++ b/core/src/main/java/io/temporal/samples/workflowstreams/TickerWorkflow.java @@ -0,0 +1,16 @@ +package io.temporal.samples.workflowstreams; + +import io.temporal.samples.workflowstreams.Shared.TickerInput; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; + +/** + * Scenario 4: publishes a long run of tick events and bounds the log by periodically truncating + * everything but the most recent entries. Fast subscribers see every tick; subscribers that fall + * behind the truncation point silently jump forward to the new base offset. + */ +@WorkflowInterface +public interface TickerWorkflow { + @WorkflowMethod + String tick(TickerInput input); +} diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/TickerWorkflowImpl.java b/core/src/main/java/io/temporal/samples/workflowstreams/TickerWorkflowImpl.java new file mode 100644 index 00000000..ee99475b --- /dev/null +++ b/core/src/main/java/io/temporal/samples/workflowstreams/TickerWorkflowImpl.java @@ -0,0 +1,48 @@ +package io.temporal.samples.workflowstreams; + +import io.temporal.samples.workflowstreams.Shared.TickEvent; +import io.temporal.samples.workflowstreams.Shared.TickerInput; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInit; +import io.temporal.workflowstreams.WorkflowStream; +import io.temporal.workflowstreams.WorkflowTopicHandle; +import java.time.Duration; + +public class TickerWorkflowImpl implements TickerWorkflow { + + private final WorkflowStream stream; + private final WorkflowTopicHandle tick; + + @WorkflowInit + public TickerWorkflowImpl(TickerInput input) { + stream = WorkflowStream.newInstance(input.streamState); + tick = stream.topic(Shared.TOPIC_TICK); + } + + @Override + public String tick(TickerInput input) { + int count = input.count != 0 ? input.count : 50; + int keepLast = input.keepLast != 0 ? input.keepLast : 10; + int truncateEvery = input.truncateEvery != 0 ? input.truncateEvery : 5; + long intervalMs = input.intervalMs != 0 ? input.intervalMs : 200; + + int published = 0; + for (int n = 0; n < count; n++) { + tick.publish(new TickEvent(n)); + published++; + Workflow.sleep(Duration.ofMillis(intervalMs)); + + if (published % truncateEvery == 0 && published > keepLast) { + // Drop everything except the last keepLast entries. Future polls positioned + // before the new base offset are fast-forwarded. + stream.truncate(published - keepLast); + } + } + + // The final tick (n == count - 1) is the in-band terminator subscribers break on. + // keepLast guarantees that final offset survives the last truncation so even slow + // consumers eventually see it. Hold the run open briefly so the final poll delivers it. + Workflow.sleep(OrderWorkflowImpl.DRAIN_DELAY); + return "ticker emitted " + published + " events"; + } +} diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/TruncatingTicker.java b/core/src/main/java/io/temporal/samples/workflowstreams/TruncatingTicker.java new file mode 100644 index 00000000..378742f5 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/workflowstreams/TruncatingTicker.java @@ -0,0 +1,119 @@ +package io.temporal.samples.workflowstreams; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.samples.workflowstreams.Shared.TickEvent; +import io.temporal.samples.workflowstreams.Shared.TickerInput; +import io.temporal.workflowstreams.WorkflowStreamClient; +import io.temporal.workflowstreams.WorkflowStreamItem; +import io.temporal.workflowstreams.WorkflowStreamSubscription; +import java.util.UUID; + +/** + * Scenario 4: bounded log via truncation. The ticker workflow periodically truncates old entries to + * bound its history, trading complete history for a bounded log. A "fast" subscriber that reads + * from the start keeps up and sees every tick. A "late" subscriber that joins after truncation and + * resumes from a stale offset is fast-forwarded to the current base offset — it cannot see the + * truncated ticks. + */ +public class TruncatingTicker { + + private static final int TICK_COUNT = 30; + + /** + * keepLast bounds the workflow's log to its most recent entries; truncateEvery controls how often + * it truncates. They are deliberately small so the early offsets are dropped quickly. + */ + private static final int KEEP_LAST = 5; + + private static final int TRUNCATE_EVERY = 5; + + /** + * An early offset the late subscriber deliberately resumes from. By the time it subscribes, the + * workflow has truncated past it. + */ + private static final long STALE_OFFSET = 1; + + public static void main(String[] args) throws InterruptedException { + WorkflowClient client = Shared.newWorkflowClient(); + + String workflowId = "workflow-streams-ticker-" + UUID.randomUUID(); + TickerWorkflow workflow = + client.newWorkflowStub( + TickerWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(workflowId) + .setTaskQueue(Shared.TASK_QUEUE) + .build()); + WorkflowClient.start(workflow::tick, new TickerInput(TICK_COUNT, KEEP_LAST, TRUNCATE_EVERY)); + System.out.println("Started workflow: " + workflowId); + + int lastN = TICK_COUNT - 1; + + Thread fast = new Thread(() -> fastSubscriber(client, workflowId, lastN)); + Thread late = new Thread(() -> lateSubscriber(client, workflowId, lastN)); + fast.start(); + late.start(); + fast.join(); + late.join(); + System.exit(0); + } + + /** Reads from the beginning and keeps up with every tick. */ + private static void fastSubscriber(WorkflowClient client, String workflowId, int lastN) { + try (WorkflowStreamClient stream = WorkflowStreamClient.newInstance(client, workflowId); + WorkflowStreamSubscription subscription = stream.topic(Shared.TOPIC_TICK).subscribe(0)) { + for (WorkflowStreamItem item : subscription) { + TickEvent evt = Shared.decode(item, TickEvent.class); + System.out.printf("[fast] offset=%3d n=%d%n", item.getOffset(), evt.n); + if (evt.n == lastN) { + return; + } + } + } + } + + /** + * Waits until the workflow has truncated past STALE_OFFSET, then resumes from that + * (now-truncated) offset. The stream fast-forwards it to the current base offset, so its first + * item necessarily skips the truncated ticks. + */ + private static void lateSubscriber(WorkflowClient client, String workflowId, int lastN) { + try (WorkflowStreamClient stream = WorkflowStreamClient.newInstance(client, workflowId)) { + // The first truncation only fires once published reaches the first multiple of + // TRUNCATE_EVERY greater than KEEP_LAST; after it, the base offset is + // published - KEEP_LAST. Waiting until the head passes that point guarantees the + // base has advanced beyond STALE_OFFSET. + int firstTruncate = ((KEEP_LAST / TRUNCATE_EVERY) + 1) * TRUNCATE_EVERY; + while (stream.getOffset() <= firstTruncate) { + try { + Thread.sleep(200); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + } + + try (WorkflowStreamSubscription subscription = + stream.topic(Shared.TOPIC_TICK).subscribe(STALE_OFFSET)) { + boolean first = true; + for (WorkflowStreamItem item : subscription) { + TickEvent evt = Shared.decode(item, TickEvent.class); + if (first) { + if (item.getOffset() > STALE_OFFSET) { + System.out.printf( + "[late] requested offset %d but it was truncated; fast-forwarded to offset %d" + + " (skipped %d tick(s))%n", + STALE_OFFSET, item.getOffset(), item.getOffset() - STALE_OFFSET); + } + first = false; + } + System.out.printf("[late] offset=%3d n=%d%n", item.getOffset(), evt.n); + if (evt.n == lastN) { + return; + } + } + } + } + } +} diff --git a/core/src/test/java/io/temporal/samples/workflowstreams/WorkflowStreamsSampleTest.java b/core/src/test/java/io/temporal/samples/workflowstreams/WorkflowStreamsSampleTest.java new file mode 100644 index 00000000..21289e69 --- /dev/null +++ b/core/src/test/java/io/temporal/samples/workflowstreams/WorkflowStreamsSampleTest.java @@ -0,0 +1,60 @@ +package io.temporal.samples.workflowstreams; + +import static org.junit.Assert.assertEquals; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; +import io.temporal.samples.workflowstreams.Shared.HubInput; +import io.temporal.samples.workflowstreams.Shared.OrderInput; +import io.temporal.samples.workflowstreams.Shared.PipelineInput; +import io.temporal.testing.TestWorkflowRule; +import org.junit.Rule; +import org.junit.Test; + +/** + * Workflow-side tests. The client subscribe path needs a live Temporal service, so it is exercised + * by running the scenarios against a dev server (see README). + */ +public class WorkflowStreamsSampleTest { + + @Rule + public TestWorkflowRule testWorkflowRule = + TestWorkflowRule.newBuilder() + .setWorkflowTypes( + OrderWorkflowImpl.class, PipelineWorkflowImpl.class, HubWorkflowImpl.class) + .setActivityImplementations(new PaymentActivitiesImpl()) + .build(); + + private T newWorkflowStub(Class type) { + return testWorkflowRule + .getTestEnvironment() + .getWorkflowClient() + .newWorkflowStub( + type, + WorkflowOptions.newBuilder().setTaskQueue(testWorkflowRule.getTaskQueue()).build()); + } + + @Test + public void testOrderWorkflow() { + OrderWorkflow workflow = newWorkflowStub(OrderWorkflow.class); + String result = workflow.processOrder(new OrderInput("order-42")); + assertEquals("charge-order-42", result); + } + + @Test + public void testPipelineWorkflow() { + PipelineWorkflow workflow = newWorkflowStub(PipelineWorkflow.class); + String result = workflow.runPipeline(new PipelineInput("p1")); + assertEquals("pipeline p1 done", result); + } + + @Test + public void testHubWorkflow() { + HubWorkflow workflow = newWorkflowStub(HubWorkflow.class); + WorkflowClient.start(workflow::host, new HubInput("newsroom")); + workflow.close(); + String result = WorkflowStub.fromTyped(workflow).getResult(String.class); + assertEquals("hub newsroom closed", result); + } +}