diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
index 55d30357..96c09701 100644
--- a/.github/CODEOWNERS
+++ b/.github/CODEOWNERS
@@ -7,4 +7,6 @@
# For each one, we add the owning team, as well as
# @temporalio/sdk, so the SDK team can continue to
# manage repo-wide concerns
-/springai/ @temporalio/ai-sdk @temporalio/sdk
\ No newline at end of file
+/springai/ @temporalio/sdk @temporalio/ai-sdk
+/core/src/main/java/io/temporal/samples/workflowstreams/ @temporalio/sdk @temporalio/ai-sdk
+/core/src/test/java/io/temporal/samples/workflowstreams/ @temporalio/sdk @temporalio/ai-sdk
diff --git a/README.md b/README.md
index 589932ec..8c8ae324 100644
--- a/README.md
+++ b/README.md
@@ -117,6 +117,8 @@ Load client configuration from TOML files with programmatic overrides.
#### API demonstrations
+- [**Workflow Streams**](/core/src/main/java/io/temporal/samples/workflowstreams): Demonstrates a durable publish/subscribe log hosted inside a workflow, using the experimental `temporal-workflowstreams` module.
+
- [**Async Untyped Child Workflow**](/core/src/main/java/io/temporal/samples/asyncuntypedchild): Demonstrates how to invoke an untyped child workflow async, that can complete after parent workflow is already completed.
- [**Updatable Timer**](/core/src/main/java/io/temporal/samples/updatabletimer): Demonstrates the use of a helper class which relies on `Workflow.await` to implement a blocking sleep that can be updated at any moment.
diff --git a/build.gradle b/build.gradle
index 8e476ad7..4985ea22 100644
--- a/build.gradle
+++ b/build.gradle
@@ -21,7 +21,7 @@ subprojects {
ext {
otelVersion = '1.30.1'
otelVersionAlpha = "${otelVersion}-alpha"
- javaSDKVersion = '1.35.0'
+ javaSDKVersion = '1.36.0-SNAPSHOT'
camelVersion = '3.22.1'
jarVersion = '1.0.0'
}
diff --git a/core/build.gradle b/core/build.gradle
index 3405d505..a06ff7f5 100644
--- a/core/build.gradle
+++ b/core/build.gradle
@@ -2,6 +2,7 @@ dependencies {
// Temporal SDK
implementation "io.temporal:temporal-sdk:$javaSDKVersion"
implementation "io.temporal:temporal-opentracing:$javaSDKVersion"
+ implementation "io.temporal:temporal-workflowstreams:$javaSDKVersion"
testImplementation("io.temporal:temporal-testing:$javaSDKVersion")
// Environment configuration
@@ -27,6 +28,7 @@ dependencies {
implementation 'io.jaegertracing:jaeger-client:1.8.1'
// Used in samples
+ implementation "com.openai:openai-java:4.39.1"
implementation group: 'commons-configuration', name: 'commons-configuration', version: '1.10'
implementation group: 'io.cloudevents', name: 'cloudevents-core', version: '4.0.1'
implementation group: 'io.cloudevents', name: 'cloudevents-api', version: '4.0.1'
diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/ExternalPublisher.java b/core/src/main/java/io/temporal/samples/workflowstreams/ExternalPublisher.java
new file mode 100644
index 00000000..cf1a0fff
--- /dev/null
+++ b/core/src/main/java/io/temporal/samples/workflowstreams/ExternalPublisher.java
@@ -0,0 +1,90 @@
+package io.temporal.samples.workflowstreams;
+
+import io.temporal.client.WorkflowClient;
+import io.temporal.client.WorkflowOptions;
+import io.temporal.samples.workflowstreams.Shared.HubInput;
+import io.temporal.samples.workflowstreams.Shared.NewsEvent;
+import io.temporal.workflowstreams.TopicHandle;
+import io.temporal.workflowstreams.WorkflowStreamClient;
+import io.temporal.workflowstreams.WorkflowStreamItem;
+import io.temporal.workflowstreams.WorkflowStreamSubscription;
+import java.util.UUID;
+
+/**
+ * Scenario 3: external (non-activity) publisher. The hub workflow does no work of its own; it just
+ * hosts the stream. A separate process publishes news into it using the same client factory used to
+ * subscribe, then signals the workflow to close. Here the publisher and a subscriber run as two
+ * threads.
+ */
+public class ExternalPublisher {
+
+ private static final String[] HEADLINES = {
+ "markets open higher", "new bridge opens downtown", "local team wins championship",
+ };
+
+ /** The sentinel the publisher sends last so the subscriber knows to stop. */
+ private static final String DONE_HEADLINE = "-- end of feed --";
+
+ public static void main(String[] args) throws InterruptedException {
+ WorkflowClient client = Shared.newWorkflowClient();
+
+ String workflowId = "workflow-streams-hub-" + UUID.randomUUID();
+ HubWorkflow workflow =
+ client.newWorkflowStub(
+ HubWorkflow.class,
+ WorkflowOptions.newBuilder()
+ .setWorkflowId(workflowId)
+ .setTaskQueue(Shared.TASK_QUEUE)
+ .build());
+ WorkflowClient.start(workflow::host, new HubInput("newsroom"));
+ System.out.println("Started workflow: " + workflowId);
+
+ Thread subscriber =
+ new Thread(
+ () -> {
+ try (WorkflowStreamClient stream =
+ WorkflowStreamClient.newInstance(client, workflowId);
+ WorkflowStreamSubscription subscription =
+ stream.topic(Shared.TOPIC_NEWS).subscribe(0)) {
+ for (WorkflowStreamItem item : subscription) {
+ NewsEvent evt = Shared.decode(item, NewsEvent.class);
+ if (evt.headline.equals(DONE_HEADLINE)) {
+ return;
+ }
+ System.out.printf("[subscriber] %s%n", evt.headline);
+ }
+ }
+ });
+
+ Thread publisher =
+ new Thread(
+ () -> {
+ try (WorkflowStreamClient producer =
+ WorkflowStreamClient.newInstance(client, workflowId)) {
+ TopicHandle news = producer.topic(Shared.TOPIC_NEWS);
+ for (String headline : HEADLINES) {
+ news.publish(new NewsEvent(headline));
+ System.out.printf("[publisher] sent: %s%n", headline);
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ // Force-flush the sentinel and wait for the server to confirm delivery
+ // before signaling the workflow to close.
+ news.publish(new NewsEvent(DONE_HEADLINE), /* forceFlush */ true);
+ producer.flush();
+ }
+ workflow.close();
+ System.out.println("[publisher] signaled close");
+ });
+
+ subscriber.start();
+ publisher.start();
+ subscriber.join();
+ publisher.join();
+ System.exit(0);
+ }
+}
diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/HubWorkflow.java b/core/src/main/java/io/temporal/samples/workflowstreams/HubWorkflow.java
new file mode 100644
index 00000000..95c6f035
--- /dev/null
+++ b/core/src/main/java/io/temporal/samples/workflowstreams/HubWorkflow.java
@@ -0,0 +1,19 @@
+package io.temporal.samples.workflowstreams;
+
+import io.temporal.samples.workflowstreams.Shared.HubInput;
+import io.temporal.workflow.SignalMethod;
+import io.temporal.workflow.WorkflowInterface;
+import io.temporal.workflow.WorkflowMethod;
+
+/**
+ * Scenario 3: does no work of its own; it exists only to host the stream for an external publisher
+ * and shuts down on a close signal.
+ */
+@WorkflowInterface
+public interface HubWorkflow {
+ @WorkflowMethod
+ String host(HubInput input);
+
+ @SignalMethod
+ void close();
+}
diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/HubWorkflowImpl.java b/core/src/main/java/io/temporal/samples/workflowstreams/HubWorkflowImpl.java
new file mode 100644
index 00000000..d703da0d
--- /dev/null
+++ b/core/src/main/java/io/temporal/samples/workflowstreams/HubWorkflowImpl.java
@@ -0,0 +1,32 @@
+package io.temporal.samples.workflowstreams;
+
+import io.temporal.samples.workflowstreams.Shared.HubInput;
+import io.temporal.workflow.Workflow;
+import io.temporal.workflow.WorkflowInit;
+import io.temporal.workflowstreams.WorkflowStream;
+
+public class HubWorkflowImpl implements HubWorkflow {
+
+ private boolean closed;
+
+ @WorkflowInit
+ public HubWorkflowImpl(HubInput input) {
+ WorkflowStream.newInstance(input.streamState);
+ }
+
+ @Override
+ public String host(HubInput input) {
+ Workflow.await(() -> closed);
+
+ // The publisher publishes its own terminator into the stream before signaling close
+ // (see ExternalPublisher). Hold the run open briefly so subscribers' final poll
+ // delivers any items still in the log.
+ Workflow.sleep(OrderWorkflowImpl.DRAIN_DELAY);
+ return "hub " + input.hubId + " closed";
+ }
+
+ @Override
+ public void close() {
+ closed = true;
+ }
+}
diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/Llm.java b/core/src/main/java/io/temporal/samples/workflowstreams/Llm.java
new file mode 100644
index 00000000..bf4e7632
--- /dev/null
+++ b/core/src/main/java/io/temporal/samples/workflowstreams/Llm.java
@@ -0,0 +1,71 @@
+package io.temporal.samples.workflowstreams;
+
+import io.temporal.client.WorkflowClient;
+import io.temporal.client.WorkflowOptions;
+import io.temporal.samples.workflowstreams.Shared.LlmInput;
+import io.temporal.samples.workflowstreams.Shared.RetryEvent;
+import io.temporal.samples.workflowstreams.Shared.TextDelta;
+import io.temporal.workflowstreams.SubscribeOptions;
+import io.temporal.workflowstreams.WorkflowStreamClient;
+import io.temporal.workflowstreams.WorkflowStreamItem;
+import io.temporal.workflowstreams.WorkflowStreamSubscription;
+import java.util.UUID;
+
+/**
+ * Scenario 5: LLM token streaming. The workflow hosts the stream while an activity makes the
+ * streaming OpenAI call and republishes each token delta. On a retry the activity emits a
+ * RetryEvent and this subscriber rewinds the terminal and re-renders. Run {@link LlmWorker} with
+ * {@code OPENAI_API_KEY} set before running this.
+ */
+public class Llm {
+
+ /**
+ * ANSI escapes to save the cursor position and to restore it while clearing everything below, so
+ * a retry can re-render the completion from scratch.
+ */
+ private static final String ANSI_SAVE = "\u001b[s";
+
+ private static final String ANSI_RESTORE_AND_CLEAR = "\u001b[u\u001b[J";
+
+ public static void main(String[] args) {
+ String prompt = args.length > 0 ? args[0] : "In one short paragraph, explain what Temporal is.";
+
+ WorkflowClient client = Shared.newWorkflowClient();
+
+ String workflowId = "workflow-streams-llm-" + UUID.randomUUID();
+ LlmWorkflow workflow =
+ client.newWorkflowStub(
+ LlmWorkflow.class,
+ WorkflowOptions.newBuilder()
+ .setWorkflowId(workflowId)
+ .setTaskQueue(Shared.LLM_TASK_QUEUE)
+ .build());
+ WorkflowClient.start(workflow::complete, new LlmInput(prompt, null));
+ System.out.println("Started workflow: " + workflowId);
+
+ try (WorkflowStreamClient stream = WorkflowStreamClient.newInstance(client, workflowId);
+ WorkflowStreamSubscription subscription =
+ stream.subscribe(
+ SubscribeOptions.newBuilder()
+ .setTopics(Shared.TOPIC_DELTA, Shared.TOPIC_RETRY, Shared.TOPIC_COMPLETE)
+ .build())) {
+ System.out.print(ANSI_SAVE);
+ for (WorkflowStreamItem item : subscription) {
+ if (item.getTopic().equals(Shared.TOPIC_RETRY)) {
+ RetryEvent evt = Shared.decode(item, RetryEvent.class);
+ System.out.print(ANSI_RESTORE_AND_CLEAR);
+ System.out.printf("[retry attempt %d] resetting output%n%n", evt.attempt);
+ System.out.print(ANSI_SAVE);
+ } else if (item.getTopic().equals(Shared.TOPIC_DELTA)) {
+ TextDelta evt = Shared.decode(item, TextDelta.class);
+ System.out.print(evt.text);
+ System.out.flush();
+ } else if (item.getTopic().equals(Shared.TOPIC_COMPLETE)) {
+ System.out.println();
+ break;
+ }
+ }
+ }
+ System.exit(0);
+ }
+}
diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/LlmActivities.java b/core/src/main/java/io/temporal/samples/workflowstreams/LlmActivities.java
new file mode 100644
index 00000000..375d5bbe
--- /dev/null
+++ b/core/src/main/java/io/temporal/samples/workflowstreams/LlmActivities.java
@@ -0,0 +1,15 @@
+package io.temporal.samples.workflowstreams;
+
+import io.temporal.activity.ActivityInterface;
+import io.temporal.activity.ActivityMethod;
+import io.temporal.samples.workflowstreams.Shared.LlmInput;
+
+@ActivityInterface
+public interface LlmActivities {
+ /**
+ * Streams an LLM completion to the parent workflow's stream and returns the accumulated full
+ * text.
+ */
+ @ActivityMethod
+ String streamCompletion(LlmInput input);
+}
diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/LlmActivitiesImpl.java b/core/src/main/java/io/temporal/samples/workflowstreams/LlmActivitiesImpl.java
new file mode 100644
index 00000000..92990ad7
--- /dev/null
+++ b/core/src/main/java/io/temporal/samples/workflowstreams/LlmActivitiesImpl.java
@@ -0,0 +1,74 @@
+package io.temporal.samples.workflowstreams;
+
+import com.openai.client.OpenAIClient;
+import com.openai.client.okhttp.OpenAIOkHttpClient;
+import com.openai.core.http.StreamResponse;
+import com.openai.models.chat.completions.ChatCompletionChunk;
+import com.openai.models.chat.completions.ChatCompletionCreateParams;
+import io.temporal.activity.Activity;
+import io.temporal.samples.workflowstreams.Shared.LlmInput;
+import io.temporal.samples.workflowstreams.Shared.RetryEvent;
+import io.temporal.samples.workflowstreams.Shared.TextComplete;
+import io.temporal.samples.workflowstreams.Shared.TextDelta;
+import io.temporal.workflowstreams.TopicHandle;
+import io.temporal.workflowstreams.WorkflowStreamClient;
+import io.temporal.workflowstreams.WorkflowStreamClientOptions;
+import java.time.Duration;
+
+/**
+ * Calls OpenAI with streaming enabled and republishes each token delta to the workflow stream. The
+ * accumulated text is published on the complete topic and returned as the activity result. Because
+ * the activity owns the non-deterministic OpenAI call, the workflow stays deterministic.
+ *
+ *
Retries are disabled on the OpenAI client so transient failures surface as Temporal activity
+ * retries instead. On a retry (attempt > 1) it publishes a RetryEvent so subscribers can reset
+ * partially rendered output.
+ */
+public class LlmActivitiesImpl implements LlmActivities {
+
+ static final String DEFAULT_MODEL = "gpt-4o-mini";
+
+ @Override
+ public String streamCompletion(LlmInput input) {
+ WorkflowStreamClientOptions options =
+ WorkflowStreamClientOptions.newBuilder().setBatchInterval(Duration.ofMillis(200)).build();
+ try (WorkflowStreamClient streamClient = WorkflowStreamClient.fromActivity(options)) {
+ TopicHandle deltas = streamClient.topic(Shared.TOPIC_DELTA);
+ TopicHandle complete = streamClient.topic(Shared.TOPIC_COMPLETE);
+ TopicHandle retry = streamClient.topic(Shared.TOPIC_RETRY);
+
+ int attempt = Activity.getExecutionContext().getInfo().getAttempt();
+ if (attempt > 1) {
+ retry.publish(new RetryEvent(attempt), /* forceFlush */ true);
+ }
+
+ String model = input.model != null && !input.model.isEmpty() ? input.model : DEFAULT_MODEL;
+
+ // Reads OPENAI_API_KEY from the environment.
+ OpenAIClient openai = OpenAIOkHttpClient.builder().fromEnv().maxRetries(0).build();
+ ChatCompletionCreateParams params =
+ ChatCompletionCreateParams.builder().model(model).addUserMessage(input.prompt).build();
+
+ StringBuilder full = new StringBuilder();
+ try (StreamResponse stream =
+ openai.chat().completions().createStreaming(params)) {
+ stream.stream()
+ .forEach(
+ chunk ->
+ chunk.choices().stream()
+ .findFirst()
+ .flatMap(choice -> choice.delta().content())
+ .filter(text -> !text.isEmpty())
+ .ifPresent(
+ text -> {
+ deltas.publish(new TextDelta(text));
+ full.append(text);
+ }));
+ }
+
+ String fullText = full.toString();
+ complete.publish(new TextComplete(fullText), /* forceFlush */ true);
+ return fullText;
+ }
+ }
+}
diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/LlmWorker.java b/core/src/main/java/io/temporal/samples/workflowstreams/LlmWorker.java
new file mode 100644
index 00000000..d814f12d
--- /dev/null
+++ b/core/src/main/java/io/temporal/samples/workflowstreams/LlmWorker.java
@@ -0,0 +1,29 @@
+package io.temporal.samples.workflowstreams;
+
+import io.temporal.client.WorkflowClient;
+import io.temporal.worker.Worker;
+import io.temporal.worker.WorkerFactory;
+
+/**
+ * Worker for the LLM-streaming scenario. Runs separately from {@link StreamsWorker} so the OpenAI
+ * dependency and the {@code OPENAI_API_KEY} requirement stay isolated to this one scenario.
+ * Different task queue too — the other four scenarios won't route work to this worker.
+ *
+ * Kill this worker mid-stream while {@link Llm} is running (and restart it) to trigger a retry:
+ * Temporal restarts the activity, the activity publishes a RetryEvent on its second attempt, and
+ * the consumer resets its rendered output.
+ */
+public class LlmWorker {
+
+ public static void main(String[] args) {
+ WorkflowClient client = Shared.newWorkflowClient();
+ WorkerFactory factory = WorkerFactory.newInstance(client);
+
+ Worker worker = factory.newWorker(Shared.LLM_TASK_QUEUE);
+ worker.registerWorkflowImplementationTypes(LlmWorkflowImpl.class);
+ worker.registerActivitiesImplementations(new LlmActivitiesImpl());
+
+ factory.start();
+ System.out.println("LLM worker started for task queue: " + Shared.LLM_TASK_QUEUE);
+ }
+}
diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/LlmWorkflow.java b/core/src/main/java/io/temporal/samples/workflowstreams/LlmWorkflow.java
new file mode 100644
index 00000000..67aeca0c
--- /dev/null
+++ b/core/src/main/java/io/temporal/samples/workflowstreams/LlmWorkflow.java
@@ -0,0 +1,15 @@
+package io.temporal.samples.workflowstreams;
+
+import io.temporal.samples.workflowstreams.Shared.LlmInput;
+import io.temporal.workflow.WorkflowInterface;
+import io.temporal.workflow.WorkflowMethod;
+
+/**
+ * Scenario 5: hosts the stream while a streaming activity owns the non-deterministic OpenAI call
+ * and publishes token deltas back to subscribers.
+ */
+@WorkflowInterface
+public interface LlmWorkflow {
+ @WorkflowMethod
+ String complete(LlmInput input);
+}
diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/LlmWorkflowImpl.java b/core/src/main/java/io/temporal/samples/workflowstreams/LlmWorkflowImpl.java
new file mode 100644
index 00000000..fb92a39c
--- /dev/null
+++ b/core/src/main/java/io/temporal/samples/workflowstreams/LlmWorkflowImpl.java
@@ -0,0 +1,39 @@
+package io.temporal.samples.workflowstreams;
+
+import io.temporal.activity.ActivityOptions;
+import io.temporal.common.RetryOptions;
+import io.temporal.samples.workflowstreams.Shared.LlmInput;
+import io.temporal.workflow.Workflow;
+import io.temporal.workflow.WorkflowInit;
+import io.temporal.workflowstreams.WorkflowStream;
+import java.time.Duration;
+
+public class LlmWorkflowImpl implements LlmWorkflow {
+
+ private final LlmActivities activities =
+ Workflow.newActivityStub(
+ LlmActivities.class,
+ ActivityOptions.newBuilder()
+ .setStartToCloseTimeout(Duration.ofMinutes(2))
+ .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(3).build())
+ .build());
+
+ /**
+ * Construct the stream from a {@code @WorkflowInit} constructor so the publish-signal handler is
+ * registered before any external publisher (the activity, here) tries to publish.
+ */
+ @WorkflowInit
+ public LlmWorkflowImpl(LlmInput input) {
+ WorkflowStream.newInstance(input.streamState);
+ }
+
+ @Override
+ public String complete(LlmInput input) {
+ String result = activities.streamCompletion(input);
+
+ // Hold the run open briefly so the consumer's next poll delivers the activity's
+ // terminal "complete" event before the workflow exits and the in-memory log is gone.
+ Workflow.sleep(OrderWorkflowImpl.DRAIN_DELAY);
+ return result;
+ }
+}
diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/OrderWorkflow.java b/core/src/main/java/io/temporal/samples/workflowstreams/OrderWorkflow.java
new file mode 100644
index 00000000..ec356c48
--- /dev/null
+++ b/core/src/main/java/io/temporal/samples/workflowstreams/OrderWorkflow.java
@@ -0,0 +1,15 @@
+package io.temporal.samples.workflowstreams;
+
+import io.temporal.samples.workflowstreams.Shared.OrderInput;
+import io.temporal.workflow.WorkflowInterface;
+import io.temporal.workflow.WorkflowMethod;
+
+/**
+ * Scenario 1: publishes status events directly from workflow code and runs an activity that
+ * publishes fine-grained progress events to the same stream. A subscriber consumes both topics.
+ */
+@WorkflowInterface
+public interface OrderWorkflow {
+ @WorkflowMethod
+ String processOrder(OrderInput input);
+}
diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/OrderWorkflowImpl.java b/core/src/main/java/io/temporal/samples/workflowstreams/OrderWorkflowImpl.java
new file mode 100644
index 00000000..42321618
--- /dev/null
+++ b/core/src/main/java/io/temporal/samples/workflowstreams/OrderWorkflowImpl.java
@@ -0,0 +1,58 @@
+package io.temporal.samples.workflowstreams;
+
+import io.temporal.activity.ActivityOptions;
+import io.temporal.samples.workflowstreams.Shared.OrderInput;
+import io.temporal.samples.workflowstreams.Shared.ProgressEvent;
+import io.temporal.samples.workflowstreams.Shared.StatusEvent;
+import io.temporal.workflow.Workflow;
+import io.temporal.workflow.WorkflowInit;
+import io.temporal.workflowstreams.WorkflowStream;
+import io.temporal.workflowstreams.WorkflowTopicHandle;
+import java.time.Duration;
+
+public class OrderWorkflowImpl implements OrderWorkflow {
+
+ /**
+ * Gives subscribers a moment to poll for the final published items before the workflow completes
+ * and the stream stops serving polls.
+ */
+ static final Duration DRAIN_DELAY = Duration.ofMillis(500);
+
+ private final WorkflowStream stream;
+ private final WorkflowTopicHandle status;
+ private final WorkflowTopicHandle progress;
+
+ private final PaymentActivities activities =
+ Workflow.newActivityStub(
+ PaymentActivities.class,
+ ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofMinutes(1)).build());
+
+ /**
+ * Construct the stream from a {@code @WorkflowInit} constructor so its handlers are registered
+ * before the workflow accepts any messages. Threading {@code streamState} lets the workflow
+ * survive continue-as-new without losing buffered items.
+ */
+ @WorkflowInit
+ public OrderWorkflowImpl(OrderInput input) {
+ stream = WorkflowStream.newInstance(input.streamState);
+ status = stream.topic(Shared.TOPIC_STATUS);
+ progress = stream.topic(Shared.TOPIC_PROGRESS);
+ }
+
+ @Override
+ public String processOrder(OrderInput input) {
+ status.publish(new StatusEvent("received", input.orderId));
+
+ String chargeId = activities.chargeCard(input.orderId);
+
+ status.publish(new StatusEvent("shipped", input.orderId));
+ progress.publish(new ProgressEvent("charge id: " + chargeId));
+ status.publish(new StatusEvent("complete", input.orderId));
+
+ // The "complete" status event above is the in-band terminator subscribers break on
+ // (see Publisher). Hold the run open briefly so subscribers' next poll delivers it
+ // before this run completes and the in-memory log is gone.
+ Workflow.sleep(DRAIN_DELAY);
+ return chargeId;
+ }
+}
diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/PaymentActivities.java b/core/src/main/java/io/temporal/samples/workflowstreams/PaymentActivities.java
new file mode 100644
index 00000000..6512319f
--- /dev/null
+++ b/core/src/main/java/io/temporal/samples/workflowstreams/PaymentActivities.java
@@ -0,0 +1,13 @@
+package io.temporal.samples.workflowstreams;
+
+import io.temporal.activity.ActivityInterface;
+import io.temporal.activity.ActivityMethod;
+
+@ActivityInterface
+public interface PaymentActivities {
+ /**
+ * Charges a card and publishes fine-grained progress events back to its parent workflow's stream.
+ */
+ @ActivityMethod
+ String chargeCard(String orderId);
+}
diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/PaymentActivitiesImpl.java b/core/src/main/java/io/temporal/samples/workflowstreams/PaymentActivitiesImpl.java
new file mode 100644
index 00000000..9c3ec43e
--- /dev/null
+++ b/core/src/main/java/io/temporal/samples/workflowstreams/PaymentActivitiesImpl.java
@@ -0,0 +1,35 @@
+package io.temporal.samples.workflowstreams;
+
+import io.temporal.samples.workflowstreams.Shared.ProgressEvent;
+import io.temporal.workflowstreams.TopicHandle;
+import io.temporal.workflowstreams.WorkflowStreamClient;
+import io.temporal.workflowstreams.WorkflowStreamClientOptions;
+import java.time.Duration;
+
+public class PaymentActivitiesImpl implements PaymentActivities {
+
+ /**
+ * {@code WorkflowStreamClient.fromActivity()} reads the parent workflow id and the Temporal
+ * client from the activity context, so this activity can push events back without any wiring.
+ * Closing the client flushes any buffered items before the activity returns.
+ */
+ @Override
+ public String chargeCard(String orderId) {
+ WorkflowStreamClientOptions options =
+ WorkflowStreamClientOptions.newBuilder().setBatchInterval(Duration.ofMillis(200)).build();
+ try (WorkflowStreamClient client = WorkflowStreamClient.fromActivity(options)) {
+ TopicHandle progress = client.topic(Shared.TOPIC_PROGRESS);
+ progress.publish(new ProgressEvent("charging card..."));
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+
+ progress.publish(new ProgressEvent("card charged"));
+ }
+ return "charge-" + orderId;
+ }
+}
diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/PipelineWorkflow.java b/core/src/main/java/io/temporal/samples/workflowstreams/PipelineWorkflow.java
new file mode 100644
index 00000000..28fce920
--- /dev/null
+++ b/core/src/main/java/io/temporal/samples/workflowstreams/PipelineWorkflow.java
@@ -0,0 +1,15 @@
+package io.temporal.samples.workflowstreams;
+
+import io.temporal.samples.workflowstreams.Shared.PipelineInput;
+import io.temporal.workflow.WorkflowInterface;
+import io.temporal.workflow.WorkflowMethod;
+
+/**
+ * Scenario 2: publishes a sequence of stage events with delays between them, giving a subscriber
+ * time to disconnect and reconnect mid-stream.
+ */
+@WorkflowInterface
+public interface PipelineWorkflow {
+ @WorkflowMethod
+ String runPipeline(PipelineInput input);
+}
diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/PipelineWorkflowImpl.java b/core/src/main/java/io/temporal/samples/workflowstreams/PipelineWorkflowImpl.java
new file mode 100644
index 00000000..b1419012
--- /dev/null
+++ b/core/src/main/java/io/temporal/samples/workflowstreams/PipelineWorkflowImpl.java
@@ -0,0 +1,45 @@
+package io.temporal.samples.workflowstreams;
+
+import io.temporal.samples.workflowstreams.Shared.PipelineInput;
+import io.temporal.samples.workflowstreams.Shared.StageEvent;
+import io.temporal.workflow.Workflow;
+import io.temporal.workflow.WorkflowInit;
+import io.temporal.workflowstreams.WorkflowStream;
+import io.temporal.workflowstreams.WorkflowTopicHandle;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+
+public class PipelineWorkflowImpl implements PipelineWorkflow {
+
+ private final WorkflowTopicHandle status;
+
+ @WorkflowInit
+ public PipelineWorkflowImpl(PipelineInput input) {
+ WorkflowStream stream = WorkflowStream.newInstance(input.streamState);
+ status = stream.topic(Shared.TOPIC_STATUS);
+ }
+
+ @Override
+ public String runPipeline(PipelineInput input) {
+ List stages =
+ Arrays.asList(
+ "validating",
+ "loading data",
+ "transforming",
+ "writing output",
+ "verifying",
+ "complete");
+ for (String stage : stages) {
+ status.publish(new StageEvent(stage));
+ if (!stage.equals("complete")) {
+ Workflow.sleep(Duration.ofSeconds(2));
+ }
+ }
+
+ // The "complete" stage above is the in-band terminator subscribers break on. Hold the
+ // run open briefly so the final poll delivers it.
+ Workflow.sleep(OrderWorkflowImpl.DRAIN_DELAY);
+ return "pipeline " + input.pipelineId + " done";
+ }
+}
diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/Publisher.java b/core/src/main/java/io/temporal/samples/workflowstreams/Publisher.java
new file mode 100644
index 00000000..f070c99d
--- /dev/null
+++ b/core/src/main/java/io/temporal/samples/workflowstreams/Publisher.java
@@ -0,0 +1,64 @@
+package io.temporal.samples.workflowstreams;
+
+import io.temporal.client.WorkflowClient;
+import io.temporal.client.WorkflowOptions;
+import io.temporal.client.WorkflowStub;
+import io.temporal.samples.workflowstreams.Shared.OrderInput;
+import io.temporal.samples.workflowstreams.Shared.ProgressEvent;
+import io.temporal.samples.workflowstreams.Shared.StatusEvent;
+import io.temporal.workflowstreams.SubscribeOptions;
+import io.temporal.workflowstreams.WorkflowStreamClient;
+import io.temporal.workflowstreams.WorkflowStreamItem;
+import io.temporal.workflowstreams.WorkflowStreamSubscription;
+import java.util.UUID;
+
+/**
+ * Scenario 1: basic publish/subscribe. Start an order workflow that publishes status events itself
+ * and runs an activity that publishes progress events to the same stream, then subscribe to both
+ * topics until the order completes.
+ */
+public class Publisher {
+
+ public static void main(String[] args) {
+ WorkflowClient client = Shared.newWorkflowClient();
+
+ String workflowId = "workflow-streams-order-" + UUID.randomUUID();
+ OrderWorkflow workflow =
+ client.newWorkflowStub(
+ OrderWorkflow.class,
+ WorkflowOptions.newBuilder()
+ .setWorkflowId(workflowId)
+ .setTaskQueue(Shared.TASK_QUEUE)
+ .build());
+ WorkflowClient.start(workflow::processOrder, new OrderInput("order-42"));
+ System.out.println("Started workflow: " + workflowId);
+
+ // Single subscription over both topics. The loop ends on the in-band "complete"
+ // terminator (break) or because the subscription exhausts when the workflow reaches a
+ // terminal state without one (e.g. on failure). Either way we then fetch the workflow
+ // result, which throws if the workflow failed.
+ try (WorkflowStreamClient stream = WorkflowStreamClient.newInstance(client, workflowId);
+ WorkflowStreamSubscription subscription =
+ stream.subscribe(
+ SubscribeOptions.newBuilder()
+ .setTopics(Shared.TOPIC_STATUS, Shared.TOPIC_PROGRESS)
+ .build())) {
+ for (WorkflowStreamItem item : subscription) {
+ if (item.getTopic().equals(Shared.TOPIC_STATUS)) {
+ StatusEvent evt = Shared.decode(item, StatusEvent.class);
+ System.out.printf("[status] %s: order=%s%n", evt.kind, evt.orderId);
+ if (evt.kind.equals("complete")) {
+ break;
+ }
+ } else if (item.getTopic().equals(Shared.TOPIC_PROGRESS)) {
+ ProgressEvent evt = Shared.decode(item, ProgressEvent.class);
+ System.out.printf("[progress] %s%n", evt.message);
+ }
+ }
+ }
+
+ String result = WorkflowStub.fromTyped(workflow).getResult(String.class);
+ System.out.println("workflow result: " + result);
+ System.exit(0);
+ }
+}
diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/README.md b/core/src/main/java/io/temporal/samples/workflowstreams/README.md
new file mode 100644
index 00000000..517c5356
--- /dev/null
+++ b/core/src/main/java/io/temporal/samples/workflowstreams/README.md
@@ -0,0 +1,154 @@
+# Workflow Streams
+
+A **workflow stream** is a durable publish/subscribe log hosted inside a Temporal
+workflow, provided by the experimental `io.temporal:temporal-workflowstreams`
+contrib module. External code (activities, starters, other workflows) publishes
+messages to named topics via **signals**; subscribers long-poll for new items via
+**updates**; a **query** exposes the current offset. Because it is backed by
+Temporal's durable execution, delivery is ordered, durable, and exactly-once, with
+client-side batching, publisher dedup, continue-as-new survival, and truncation.
+
+This sample mirrors the
+[Go](https://github.com/temporalio/samples-go/tree/main/workflowstreams) and
+[Python](https://github.com/temporalio/samples-python/tree/main/workflow_streams)
+workflow streams samples. It contains five scenarios.
+
+> **Note:** This sample currently depends on the unreleased
+> `temporal-workflowstreams` module and a reserved-name carve-out in the SDK
+> itself, so the repo's `javaSDKVersion` is pinned to `1.36.0-SNAPSHOT` resolved
+> from your local Maven repository. Publish the SDK locally first from a sibling
+> `../sdk-java` checkout on the `workflow-streams` branch:
+>
+> ```
+> cd ../sdk-java && ./gradlew publishToMavenLocal
+> ```
+>
+> Drop the `-SNAPSHOT` pin once a tagged SDK release ships.
+
+### Key APIs
+
+Workflow side — construct a stream once in a `@WorkflowInit` constructor and publish to topics:
+
+```java
+@WorkflowInit
+public MyWorkflowImpl(MyInput input) {
+ stream = WorkflowStream.newInstance(input.streamState);
+}
+
+stream.topic("status").publish(new StatusEvent("received"));
+```
+
+Client side (activities, starters, external code) — publish and subscribe:
+
+```java
+try (WorkflowStreamClient client = WorkflowStreamClient.newInstance(workflowClient, workflowId);
+ WorkflowStreamSubscription subscription = client.topic("status").subscribe(0)) {
+ for (WorkflowStreamItem item : subscription) {
+ StatusEvent evt =
+ DefaultDataConverter.STANDARD_INSTANCE.fromPayload(
+ item.getPayload(), StatusEvent.class, StatusEvent.class);
+ }
+}
+```
+
+Offsets are **global** across topics. To resume a subscription from where a
+previous one left off, pass `subscribe` an offset one past the last item you
+consumed:
+
+```java
+try (WorkflowStreamClient client = WorkflowStreamClient.newInstance(workflowClient, workflowId);
+ WorkflowStreamSubscription subscription =
+ client.topic("status").subscribe(lastItem.getOffset() + 1)) { // zero starts from the beginning
+ for (WorkflowStreamItem item : subscription) {
+ // ...
+ }
+}
+```
+
+### Steps to run this sample
+
+1) Run a [Temporal service](https://github.com/temporalio/samples-java/tree/main/#How-to-use)
+ (for example, `temporal server start-dev`).
+
+2) Start the worker (serves scenarios 1–4):
+
+```
+./gradlew -q execute -PmainClass=io.temporal.samples.workflowstreams.StreamsWorker
+```
+
+3) Run any of the scenarios below in a separate terminal.
+
+#### Scenario 1 — basic publish/subscribe
+
+An order workflow publishes status events itself while an activity publishes
+fine-grained progress events to the same stream. A subscriber consumes both topics.
+
+```
+./gradlew -q execute -PmainClass=io.temporal.samples.workflowstreams.Publisher
+```
+
+Expected output (interleaving may vary):
+
+```
+[status] received: order=order-42
+[progress] charging card...
+[progress] card charged
+[status] shipped: order=order-42
+[progress] charge id: charge-order-42
+[status] complete: order=order-42
+workflow result: charge-order-42
+```
+
+#### Scenario 2 — reconnecting subscriber
+
+A subscriber reads a few pipeline stage events, disconnects, then a brand-new
+client resumes from the saved offset without missing events or seeing duplicates.
+
+```
+./gradlew -q execute -PmainClass=io.temporal.samples.workflowstreams.ReconnectingSubscriber
+```
+
+#### Scenario 3 — external publisher
+
+The hub workflow does no work of its own; it just hosts the stream. A separate
+publisher pushes news into it (using the same client factory used to subscribe) and
+then signals the workflow to close. Here a publisher and subscriber run concurrently.
+
+```
+./gradlew -q execute -PmainClass=io.temporal.samples.workflowstreams.ExternalPublisher
+```
+
+#### Scenario 4 — truncating ticker
+
+The ticker workflow periodically truncates old entries to bound its history, trading
+complete history for a bounded log. A *fast* subscriber that reads from the start keeps
+up and sees every tick. A *late* subscriber joins after truncation and resumes from a
+stale offset; the stream fast-forwards it to the current base offset, so it cannot see
+the truncated ticks.
+
+```
+./gradlew -q execute -PmainClass=io.temporal.samples.workflowstreams.TruncatingTicker
+```
+
+Expected output (the late subscriber's first line shows the fast-forward):
+
+```
+[late] requested offset 1 but it was truncated; fast-forwarded to offset 5 (skipped 4 tick(s))
+[late] offset= 5 n=5
+...
+```
+
+#### Scenario 5 — LLM token streaming
+
+The workflow hosts the stream while an activity makes a streaming OpenAI call and
+republishes each token delta. On a retry it emits a retry event and the subscriber
+rewinds the terminal and re-renders. This scenario runs on its own worker and task
+queue, and requires `OPENAI_API_KEY`.
+
+```
+# Terminal A
+OPENAI_API_KEY=sk-... ./gradlew -q execute -PmainClass=io.temporal.samples.workflowstreams.LlmWorker
+
+# Terminal B
+./gradlew -q execute -PmainClass=io.temporal.samples.workflowstreams.Llm -Pargs="'Explain durable execution in one sentence.'"
+```
diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/ReconnectingSubscriber.java b/core/src/main/java/io/temporal/samples/workflowstreams/ReconnectingSubscriber.java
new file mode 100644
index 00000000..f6556374
--- /dev/null
+++ b/core/src/main/java/io/temporal/samples/workflowstreams/ReconnectingSubscriber.java
@@ -0,0 +1,72 @@
+package io.temporal.samples.workflowstreams;
+
+import io.temporal.client.WorkflowClient;
+import io.temporal.client.WorkflowOptions;
+import io.temporal.samples.workflowstreams.Shared.PipelineInput;
+import io.temporal.samples.workflowstreams.Shared.StageEvent;
+import io.temporal.workflowstreams.WorkflowStreamClient;
+import io.temporal.workflowstreams.WorkflowStreamItem;
+import io.temporal.workflowstreams.WorkflowStreamSubscription;
+import java.util.UUID;
+
+/**
+ * Scenario 2: reconnecting subscriber. A subscriber reads a few events, drops its connection, then
+ * a brand-new client resumes from the saved offset without missing events or seeing duplicates —
+ * because the events are durable in workflow history, not just held in memory.
+ */
+public class ReconnectingSubscriber {
+
+ /** How many events the first subscriber reads before disconnecting. */
+ private static final int PHASE_1_EVENTS = 2;
+
+ public static void main(String[] args) {
+ WorkflowClient client = Shared.newWorkflowClient();
+
+ String workflowId = "workflow-streams-pipeline-" + UUID.randomUUID();
+ PipelineWorkflow workflow =
+ client.newWorkflowStub(
+ PipelineWorkflow.class,
+ WorkflowOptions.newBuilder()
+ .setWorkflowId(workflowId)
+ .setTaskQueue(Shared.TASK_QUEUE)
+ .build());
+ WorkflowClient.start(workflow::runPipeline, new PipelineInput("pipeline-7"));
+ System.out.println("Started workflow: " + workflowId);
+
+ // next is the offset to resume from: one past the last item we consumed.
+ long next = 0;
+
+ // Phase 1: connect, read a couple of events, remember our position, disconnect.
+ System.out.println("--- phase 1: initial subscriber ---");
+ try (WorkflowStreamClient stream = WorkflowStreamClient.newInstance(client, workflowId);
+ WorkflowStreamSubscription subscription = stream.topic(Shared.TOPIC_STATUS).subscribe(0)) {
+ int seen = 0;
+ for (WorkflowStreamItem item : subscription) {
+ StageEvent evt = Shared.decode(item, StageEvent.class);
+ next = item.getOffset() + 1;
+ System.out.printf("offset=%d stage=%s%n", item.getOffset(), evt.stage);
+ seen++;
+ if (seen >= PHASE_1_EVENTS) {
+ break;
+ }
+ }
+ }
+
+ System.out.printf("--- disconnected; will resume from offset %d ---%n", next);
+
+ // Phase 2: a new client resumes from the saved offset until the pipeline completes.
+ System.out.println("--- phase 2: reconnected subscriber ---");
+ try (WorkflowStreamClient stream = WorkflowStreamClient.newInstance(client, workflowId);
+ WorkflowStreamSubscription subscription =
+ stream.topic(Shared.TOPIC_STATUS).subscribe(next)) {
+ for (WorkflowStreamItem item : subscription) {
+ StageEvent evt = Shared.decode(item, StageEvent.class);
+ System.out.printf("offset=%d stage=%s%n", item.getOffset(), evt.stage);
+ if (evt.stage.equals("complete")) {
+ break;
+ }
+ }
+ }
+ System.exit(0);
+ }
+}
diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/Shared.java b/core/src/main/java/io/temporal/samples/workflowstreams/Shared.java
new file mode 100644
index 00000000..4f4f5ec8
--- /dev/null
+++ b/core/src/main/java/io/temporal/samples/workflowstreams/Shared.java
@@ -0,0 +1,226 @@
+package io.temporal.samples.workflowstreams;
+
+import io.temporal.client.WorkflowClient;
+import io.temporal.common.converter.DefaultDataConverter;
+import io.temporal.envconfig.ClientConfigProfile;
+import io.temporal.serviceclient.WorkflowServiceStubs;
+import io.temporal.workflowstreams.WorkflowStreamItem;
+import io.temporal.workflowstreams.WorkflowStreamState;
+import java.io.IOException;
+
+/**
+ * Constants and shared types for the workflow streams sample. A workflow stream is a durable
+ * publish/subscribe log hosted inside a Temporal workflow: external code publishes to named topics
+ * via signals, subscribers long-poll for new items via updates, and a query exposes the current
+ * offset. See the experimental {@code io.temporal:temporal-workflowstreams} module.
+ */
+public final class Shared {
+
+ /**
+ * Task queues. The LLM scenario runs on its own queue so its OpenAI dependency and API key
+ * requirement stay isolated from the other workflows.
+ */
+ public static final String TASK_QUEUE = "workflow-streams";
+
+ public static final String LLM_TASK_QUEUE = "workflow-streams-llm";
+
+ /** Topic names used across the scenarios. */
+ public static final String TOPIC_STATUS = "status";
+
+ public static final String TOPIC_PROGRESS = "progress";
+ public static final String TOPIC_NEWS = "news";
+ public static final String TOPIC_TICK = "tick";
+ public static final String TOPIC_DELTA = "delta";
+ public static final String TOPIC_COMPLETE = "complete";
+ public static final String TOPIC_RETRY = "retry";
+
+ /** Creates a WorkflowClient from environment configuration. */
+ public static WorkflowClient newWorkflowClient() {
+ ClientConfigProfile profile;
+ try {
+ profile = ClientConfigProfile.load();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to load client configuration", e);
+ }
+ WorkflowServiceStubs service =
+ WorkflowServiceStubs.newServiceStubs(profile.toWorkflowServiceStubsOptions());
+ return WorkflowClient.newInstance(service, profile.toWorkflowClientOptions());
+ }
+
+ /** Decodes a subscribed item's payload with the default data converter. */
+ public static T decode(WorkflowStreamItem item, Class type) {
+ return DefaultDataConverter.STANDARD_INSTANCE.fromPayload(item.getPayload(), type, type);
+ }
+
+ // Each workflow input carries an optional WorkflowStreamState so the stream can survive
+ // continue-as-new: thread the prior run's state back in and pass it to
+ // WorkflowStream.newInstance. It is null on a fresh start.
+
+ /** Input to OrderWorkflow (scenario 1). */
+ public static class OrderInput {
+ public String orderId;
+ public WorkflowStreamState streamState;
+
+ public OrderInput() {}
+
+ public OrderInput(String orderId) {
+ this.orderId = orderId;
+ }
+ }
+
+ /** Input to PipelineWorkflow (scenario 2). */
+ public static class PipelineInput {
+ public String pipelineId;
+ public WorkflowStreamState streamState;
+
+ public PipelineInput() {}
+
+ public PipelineInput(String pipelineId) {
+ this.pipelineId = pipelineId;
+ }
+ }
+
+ /** Input to HubWorkflow (scenario 3). */
+ public static class HubInput {
+ public String hubId;
+ public WorkflowStreamState streamState;
+
+ public HubInput() {}
+
+ public HubInput(String hubId) {
+ this.hubId = hubId;
+ }
+ }
+
+ /**
+ * Input to TickerWorkflow (scenario 4). Zero-valued fields fall back to the defaults applied in
+ * the workflow.
+ */
+ public static class TickerInput {
+ public int count;
+ public int keepLast;
+ public int truncateEvery;
+ public long intervalMs;
+ public WorkflowStreamState streamState;
+
+ public TickerInput() {}
+
+ public TickerInput(int count, int keepLast, int truncateEvery) {
+ this.count = count;
+ this.keepLast = keepLast;
+ this.truncateEvery = truncateEvery;
+ }
+ }
+
+ /** Input to LlmWorkflow (scenario 5). */
+ public static class LlmInput {
+ public String prompt;
+ public String model;
+ public WorkflowStreamState streamState;
+
+ public LlmInput() {}
+
+ public LlmInput(String prompt, String model) {
+ this.prompt = prompt;
+ this.model = model;
+ }
+ }
+
+ // Event types published to the stream. They are JSON-encoded by the default data converter on
+ // the way in and decoded by subscribers on the way out.
+
+ /** Reports an order's lifecycle stage on TOPIC_STATUS. */
+ public static class StatusEvent {
+ public String kind;
+ public String orderId;
+
+ public StatusEvent() {}
+
+ public StatusEvent(String kind, String orderId) {
+ this.kind = kind;
+ this.orderId = orderId;
+ }
+ }
+
+ /** Reports fine-grained progress on TOPIC_PROGRESS. */
+ public static class ProgressEvent {
+ public String message;
+
+ public ProgressEvent() {}
+
+ public ProgressEvent(String message) {
+ this.message = message;
+ }
+ }
+
+ /** Reports a pipeline stage on TOPIC_STATUS. */
+ public static class StageEvent {
+ public String stage;
+
+ public StageEvent() {}
+
+ public StageEvent(String stage) {
+ this.stage = stage;
+ }
+ }
+
+ /** Published by an external publisher on TOPIC_NEWS. */
+ public static class NewsEvent {
+ public String headline;
+
+ public NewsEvent() {}
+
+ public NewsEvent(String headline) {
+ this.headline = headline;
+ }
+ }
+
+ /** Published by the ticker on TOPIC_TICK. */
+ public static class TickEvent {
+ public int n;
+
+ public TickEvent() {}
+
+ public TickEvent(int n) {
+ this.n = n;
+ }
+ }
+
+ /** A single streamed token chunk on TOPIC_DELTA. */
+ public static class TextDelta {
+ public String text;
+
+ public TextDelta() {}
+
+ public TextDelta(String text) {
+ this.text = text;
+ }
+ }
+
+ /** The final accumulated completion on TOPIC_COMPLETE. */
+ public static class TextComplete {
+ public String fullText;
+
+ public TextComplete() {}
+
+ public TextComplete(String fullText) {
+ this.fullText = fullText;
+ }
+ }
+
+ /**
+ * Signals that the streaming activity is on a retry attempt, so subscribers can reset any
+ * partially rendered output.
+ */
+ public static class RetryEvent {
+ public int attempt;
+
+ public RetryEvent() {}
+
+ public RetryEvent(int attempt) {
+ this.attempt = attempt;
+ }
+ }
+
+ private Shared() {}
+}
diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/StreamsWorker.java b/core/src/main/java/io/temporal/samples/workflowstreams/StreamsWorker.java
new file mode 100644
index 00000000..37c8c2a3
--- /dev/null
+++ b/core/src/main/java/io/temporal/samples/workflowstreams/StreamsWorker.java
@@ -0,0 +1,25 @@
+package io.temporal.samples.workflowstreams;
+
+import io.temporal.client.WorkflowClient;
+import io.temporal.worker.Worker;
+import io.temporal.worker.WorkerFactory;
+
+/** Worker serving scenarios 1-4. */
+public class StreamsWorker {
+
+ public static void main(String[] args) {
+ WorkflowClient client = Shared.newWorkflowClient();
+ WorkerFactory factory = WorkerFactory.newInstance(client);
+
+ Worker worker = factory.newWorker(Shared.TASK_QUEUE);
+ worker.registerWorkflowImplementationTypes(
+ OrderWorkflowImpl.class,
+ PipelineWorkflowImpl.class,
+ HubWorkflowImpl.class,
+ TickerWorkflowImpl.class);
+ worker.registerActivitiesImplementations(new PaymentActivitiesImpl());
+
+ factory.start();
+ System.out.println("Worker started for task queue: " + Shared.TASK_QUEUE);
+ }
+}
diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/TickerWorkflow.java b/core/src/main/java/io/temporal/samples/workflowstreams/TickerWorkflow.java
new file mode 100644
index 00000000..1cdae00c
--- /dev/null
+++ b/core/src/main/java/io/temporal/samples/workflowstreams/TickerWorkflow.java
@@ -0,0 +1,16 @@
+package io.temporal.samples.workflowstreams;
+
+import io.temporal.samples.workflowstreams.Shared.TickerInput;
+import io.temporal.workflow.WorkflowInterface;
+import io.temporal.workflow.WorkflowMethod;
+
+/**
+ * Scenario 4: publishes a long run of tick events and bounds the log by periodically truncating
+ * everything but the most recent entries. Fast subscribers see every tick; subscribers that fall
+ * behind the truncation point silently jump forward to the new base offset.
+ */
+@WorkflowInterface
+public interface TickerWorkflow {
+ @WorkflowMethod
+ String tick(TickerInput input);
+}
diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/TickerWorkflowImpl.java b/core/src/main/java/io/temporal/samples/workflowstreams/TickerWorkflowImpl.java
new file mode 100644
index 00000000..ee99475b
--- /dev/null
+++ b/core/src/main/java/io/temporal/samples/workflowstreams/TickerWorkflowImpl.java
@@ -0,0 +1,48 @@
+package io.temporal.samples.workflowstreams;
+
+import io.temporal.samples.workflowstreams.Shared.TickEvent;
+import io.temporal.samples.workflowstreams.Shared.TickerInput;
+import io.temporal.workflow.Workflow;
+import io.temporal.workflow.WorkflowInit;
+import io.temporal.workflowstreams.WorkflowStream;
+import io.temporal.workflowstreams.WorkflowTopicHandle;
+import java.time.Duration;
+
+public class TickerWorkflowImpl implements TickerWorkflow {
+
+ private final WorkflowStream stream;
+ private final WorkflowTopicHandle tick;
+
+ @WorkflowInit
+ public TickerWorkflowImpl(TickerInput input) {
+ stream = WorkflowStream.newInstance(input.streamState);
+ tick = stream.topic(Shared.TOPIC_TICK);
+ }
+
+ @Override
+ public String tick(TickerInput input) {
+ int count = input.count != 0 ? input.count : 50;
+ int keepLast = input.keepLast != 0 ? input.keepLast : 10;
+ int truncateEvery = input.truncateEvery != 0 ? input.truncateEvery : 5;
+ long intervalMs = input.intervalMs != 0 ? input.intervalMs : 200;
+
+ int published = 0;
+ for (int n = 0; n < count; n++) {
+ tick.publish(new TickEvent(n));
+ published++;
+ Workflow.sleep(Duration.ofMillis(intervalMs));
+
+ if (published % truncateEvery == 0 && published > keepLast) {
+ // Drop everything except the last keepLast entries. Future polls positioned
+ // before the new base offset are fast-forwarded.
+ stream.truncate(published - keepLast);
+ }
+ }
+
+ // The final tick (n == count - 1) is the in-band terminator subscribers break on.
+ // keepLast guarantees that final offset survives the last truncation so even slow
+ // consumers eventually see it. Hold the run open briefly so the final poll delivers it.
+ Workflow.sleep(OrderWorkflowImpl.DRAIN_DELAY);
+ return "ticker emitted " + published + " events";
+ }
+}
diff --git a/core/src/main/java/io/temporal/samples/workflowstreams/TruncatingTicker.java b/core/src/main/java/io/temporal/samples/workflowstreams/TruncatingTicker.java
new file mode 100644
index 00000000..378742f5
--- /dev/null
+++ b/core/src/main/java/io/temporal/samples/workflowstreams/TruncatingTicker.java
@@ -0,0 +1,119 @@
+package io.temporal.samples.workflowstreams;
+
+import io.temporal.client.WorkflowClient;
+import io.temporal.client.WorkflowOptions;
+import io.temporal.samples.workflowstreams.Shared.TickEvent;
+import io.temporal.samples.workflowstreams.Shared.TickerInput;
+import io.temporal.workflowstreams.WorkflowStreamClient;
+import io.temporal.workflowstreams.WorkflowStreamItem;
+import io.temporal.workflowstreams.WorkflowStreamSubscription;
+import java.util.UUID;
+
+/**
+ * Scenario 4: bounded log via truncation. The ticker workflow periodically truncates old entries to
+ * bound its history, trading complete history for a bounded log. A "fast" subscriber that reads
+ * from the start keeps up and sees every tick. A "late" subscriber that joins after truncation and
+ * resumes from a stale offset is fast-forwarded to the current base offset — it cannot see the
+ * truncated ticks.
+ */
+public class TruncatingTicker {
+
+ private static final int TICK_COUNT = 30;
+
+ /**
+ * keepLast bounds the workflow's log to its most recent entries; truncateEvery controls how often
+ * it truncates. They are deliberately small so the early offsets are dropped quickly.
+ */
+ private static final int KEEP_LAST = 5;
+
+ private static final int TRUNCATE_EVERY = 5;
+
+ /**
+ * An early offset the late subscriber deliberately resumes from. By the time it subscribes, the
+ * workflow has truncated past it.
+ */
+ private static final long STALE_OFFSET = 1;
+
+ public static void main(String[] args) throws InterruptedException {
+ WorkflowClient client = Shared.newWorkflowClient();
+
+ String workflowId = "workflow-streams-ticker-" + UUID.randomUUID();
+ TickerWorkflow workflow =
+ client.newWorkflowStub(
+ TickerWorkflow.class,
+ WorkflowOptions.newBuilder()
+ .setWorkflowId(workflowId)
+ .setTaskQueue(Shared.TASK_QUEUE)
+ .build());
+ WorkflowClient.start(workflow::tick, new TickerInput(TICK_COUNT, KEEP_LAST, TRUNCATE_EVERY));
+ System.out.println("Started workflow: " + workflowId);
+
+ int lastN = TICK_COUNT - 1;
+
+ Thread fast = new Thread(() -> fastSubscriber(client, workflowId, lastN));
+ Thread late = new Thread(() -> lateSubscriber(client, workflowId, lastN));
+ fast.start();
+ late.start();
+ fast.join();
+ late.join();
+ System.exit(0);
+ }
+
+ /** Reads from the beginning and keeps up with every tick. */
+ private static void fastSubscriber(WorkflowClient client, String workflowId, int lastN) {
+ try (WorkflowStreamClient stream = WorkflowStreamClient.newInstance(client, workflowId);
+ WorkflowStreamSubscription subscription = stream.topic(Shared.TOPIC_TICK).subscribe(0)) {
+ for (WorkflowStreamItem item : subscription) {
+ TickEvent evt = Shared.decode(item, TickEvent.class);
+ System.out.printf("[fast] offset=%3d n=%d%n", item.getOffset(), evt.n);
+ if (evt.n == lastN) {
+ return;
+ }
+ }
+ }
+ }
+
+ /**
+ * Waits until the workflow has truncated past STALE_OFFSET, then resumes from that
+ * (now-truncated) offset. The stream fast-forwards it to the current base offset, so its first
+ * item necessarily skips the truncated ticks.
+ */
+ private static void lateSubscriber(WorkflowClient client, String workflowId, int lastN) {
+ try (WorkflowStreamClient stream = WorkflowStreamClient.newInstance(client, workflowId)) {
+ // The first truncation only fires once published reaches the first multiple of
+ // TRUNCATE_EVERY greater than KEEP_LAST; after it, the base offset is
+ // published - KEEP_LAST. Waiting until the head passes that point guarantees the
+ // base has advanced beyond STALE_OFFSET.
+ int firstTruncate = ((KEEP_LAST / TRUNCATE_EVERY) + 1) * TRUNCATE_EVERY;
+ while (stream.getOffset() <= firstTruncate) {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+
+ try (WorkflowStreamSubscription subscription =
+ stream.topic(Shared.TOPIC_TICK).subscribe(STALE_OFFSET)) {
+ boolean first = true;
+ for (WorkflowStreamItem item : subscription) {
+ TickEvent evt = Shared.decode(item, TickEvent.class);
+ if (first) {
+ if (item.getOffset() > STALE_OFFSET) {
+ System.out.printf(
+ "[late] requested offset %d but it was truncated; fast-forwarded to offset %d"
+ + " (skipped %d tick(s))%n",
+ STALE_OFFSET, item.getOffset(), item.getOffset() - STALE_OFFSET);
+ }
+ first = false;
+ }
+ System.out.printf("[late] offset=%3d n=%d%n", item.getOffset(), evt.n);
+ if (evt.n == lastN) {
+ return;
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/core/src/test/java/io/temporal/samples/workflowstreams/WorkflowStreamsSampleTest.java b/core/src/test/java/io/temporal/samples/workflowstreams/WorkflowStreamsSampleTest.java
new file mode 100644
index 00000000..21289e69
--- /dev/null
+++ b/core/src/test/java/io/temporal/samples/workflowstreams/WorkflowStreamsSampleTest.java
@@ -0,0 +1,60 @@
+package io.temporal.samples.workflowstreams;
+
+import static org.junit.Assert.assertEquals;
+
+import io.temporal.client.WorkflowClient;
+import io.temporal.client.WorkflowOptions;
+import io.temporal.client.WorkflowStub;
+import io.temporal.samples.workflowstreams.Shared.HubInput;
+import io.temporal.samples.workflowstreams.Shared.OrderInput;
+import io.temporal.samples.workflowstreams.Shared.PipelineInput;
+import io.temporal.testing.TestWorkflowRule;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Workflow-side tests. The client subscribe path needs a live Temporal service, so it is exercised
+ * by running the scenarios against a dev server (see README).
+ */
+public class WorkflowStreamsSampleTest {
+
+ @Rule
+ public TestWorkflowRule testWorkflowRule =
+ TestWorkflowRule.newBuilder()
+ .setWorkflowTypes(
+ OrderWorkflowImpl.class, PipelineWorkflowImpl.class, HubWorkflowImpl.class)
+ .setActivityImplementations(new PaymentActivitiesImpl())
+ .build();
+
+ private T newWorkflowStub(Class type) {
+ return testWorkflowRule
+ .getTestEnvironment()
+ .getWorkflowClient()
+ .newWorkflowStub(
+ type,
+ WorkflowOptions.newBuilder().setTaskQueue(testWorkflowRule.getTaskQueue()).build());
+ }
+
+ @Test
+ public void testOrderWorkflow() {
+ OrderWorkflow workflow = newWorkflowStub(OrderWorkflow.class);
+ String result = workflow.processOrder(new OrderInput("order-42"));
+ assertEquals("charge-order-42", result);
+ }
+
+ @Test
+ public void testPipelineWorkflow() {
+ PipelineWorkflow workflow = newWorkflowStub(PipelineWorkflow.class);
+ String result = workflow.runPipeline(new PipelineInput("p1"));
+ assertEquals("pipeline p1 done", result);
+ }
+
+ @Test
+ public void testHubWorkflow() {
+ HubWorkflow workflow = newWorkflowStub(HubWorkflow.class);
+ WorkflowClient.start(workflow::host, new HubInput("newsroom"));
+ workflow.close();
+ String result = WorkflowStub.fromTyped(workflow).getResult(String.class);
+ assertEquals("hub newsroom closed", result);
+ }
+}