Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@
# For each one, we add the owning team, as well as
# @temporalio/sdk, so the SDK team can continue to
# manage repo-wide concerns
/springai/ @temporalio/ai-sdk @temporalio/sdk
/springai/ @temporalio/sdk @temporalio/ai-sdk
/core/src/main/java/io/temporal/samples/workflowstreams/ @temporalio/sdk @temporalio/ai-sdk
/core/src/test/java/io/temporal/samples/workflowstreams/ @temporalio/sdk @temporalio/ai-sdk
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ Load client configuration from TOML files with programmatic overrides.

#### API demonstrations

- [**Workflow Streams**](/core/src/main/java/io/temporal/samples/workflowstreams): Demonstrates a durable publish/subscribe log hosted inside a workflow, using the experimental `temporal-workflowstreams` module.

- [**Async Untyped Child Workflow**](/core/src/main/java/io/temporal/samples/asyncuntypedchild): Demonstrates how to invoke an untyped child workflow async, that can complete after parent workflow is already completed.

- [**Updatable Timer**](/core/src/main/java/io/temporal/samples/updatabletimer): Demonstrates the use of a helper class which relies on `Workflow.await` to implement a blocking sleep that can be updated at any moment.
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ subprojects {
ext {
otelVersion = '1.30.1'
otelVersionAlpha = "${otelVersion}-alpha"
javaSDKVersion = '1.35.0'
javaSDKVersion = '1.36.0-SNAPSHOT'
camelVersion = '3.22.1'
jarVersion = '1.0.0'
}
Expand Down
2 changes: 2 additions & 0 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ dependencies {
// Temporal SDK
implementation "io.temporal:temporal-sdk:$javaSDKVersion"
implementation "io.temporal:temporal-opentracing:$javaSDKVersion"
implementation "io.temporal:temporal-workflowstreams:$javaSDKVersion"
testImplementation("io.temporal:temporal-testing:$javaSDKVersion")

// Environment configuration
Expand All @@ -27,6 +28,7 @@ dependencies {
implementation 'io.jaegertracing:jaeger-client:1.8.1'

// Used in samples
implementation "com.openai:openai-java:4.39.1"
implementation group: 'commons-configuration', name: 'commons-configuration', version: '1.10'
implementation group: 'io.cloudevents', name: 'cloudevents-core', version: '4.0.1'
implementation group: 'io.cloudevents', name: 'cloudevents-api', version: '4.0.1'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package io.temporal.samples.workflowstreams;

import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.samples.workflowstreams.Shared.HubInput;
import io.temporal.samples.workflowstreams.Shared.NewsEvent;
import io.temporal.workflowstreams.TopicHandle;
import io.temporal.workflowstreams.WorkflowStreamClient;
import io.temporal.workflowstreams.WorkflowStreamItem;
import io.temporal.workflowstreams.WorkflowStreamSubscription;
import java.util.UUID;

/**
* Scenario 3: external (non-activity) publisher. The hub workflow does no work of its own; it just
* hosts the stream. A separate process publishes news into it using the same client factory used to
* subscribe, then signals the workflow to close. Here the publisher and a subscriber run as two
* threads.
*/
public class ExternalPublisher {

private static final String[] HEADLINES = {
"markets open higher", "new bridge opens downtown", "local team wins championship",
};

/** The sentinel the publisher sends last so the subscriber knows to stop. */
private static final String DONE_HEADLINE = "-- end of feed --";

public static void main(String[] args) throws InterruptedException {
WorkflowClient client = Shared.newWorkflowClient();

String workflowId = "workflow-streams-hub-" + UUID.randomUUID();
HubWorkflow workflow =
client.newWorkflowStub(
HubWorkflow.class,
WorkflowOptions.newBuilder()
.setWorkflowId(workflowId)
.setTaskQueue(Shared.TASK_QUEUE)
.build());
WorkflowClient.start(workflow::host, new HubInput("newsroom"));
System.out.println("Started workflow: " + workflowId);

Thread subscriber =
new Thread(
() -> {
try (WorkflowStreamClient stream =
WorkflowStreamClient.newInstance(client, workflowId);
WorkflowStreamSubscription subscription =
stream.topic(Shared.TOPIC_NEWS).subscribe(0)) {
for (WorkflowStreamItem item : subscription) {
NewsEvent evt = Shared.decode(item, NewsEvent.class);
if (evt.headline.equals(DONE_HEADLINE)) {
return;
}
System.out.printf("[subscriber] %s%n", evt.headline);
}
}
});

Thread publisher =
new Thread(
() -> {
try (WorkflowStreamClient producer =
WorkflowStreamClient.newInstance(client, workflowId)) {
TopicHandle news = producer.topic(Shared.TOPIC_NEWS);
for (String headline : HEADLINES) {
news.publish(new NewsEvent(headline));
System.out.printf("[publisher] sent: %s%n", headline);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
// Force-flush the sentinel and wait for the server to confirm delivery
// before signaling the workflow to close.
news.publish(new NewsEvent(DONE_HEADLINE), /* forceFlush */ true);
producer.flush();
}
workflow.close();
System.out.println("[publisher] signaled close");
});

subscriber.start();
publisher.start();
subscriber.join();
publisher.join();
System.exit(0);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.temporal.samples.workflowstreams;

import io.temporal.samples.workflowstreams.Shared.HubInput;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;

/**
* Scenario 3: does no work of its own; it exists only to host the stream for an external publisher
* and shuts down on a close signal.
*/
@WorkflowInterface
public interface HubWorkflow {
@WorkflowMethod
String host(HubInput input);

@SignalMethod
void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.temporal.samples.workflowstreams;

import io.temporal.samples.workflowstreams.Shared.HubInput;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInit;
import io.temporal.workflowstreams.WorkflowStream;

public class HubWorkflowImpl implements HubWorkflow {

private boolean closed;

@WorkflowInit
public HubWorkflowImpl(HubInput input) {
WorkflowStream.newInstance(input.streamState);
}

@Override
public String host(HubInput input) {
Workflow.await(() -> closed);

// The publisher publishes its own terminator into the stream before signaling close
// (see ExternalPublisher). Hold the run open briefly so subscribers' final poll
// delivers any items still in the log.
Workflow.sleep(OrderWorkflowImpl.DRAIN_DELAY);
return "hub " + input.hubId + " closed";
}

@Override
public void close() {
closed = true;
}
}
71 changes: 71 additions & 0 deletions core/src/main/java/io/temporal/samples/workflowstreams/Llm.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package io.temporal.samples.workflowstreams;

import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.samples.workflowstreams.Shared.LlmInput;
import io.temporal.samples.workflowstreams.Shared.RetryEvent;
import io.temporal.samples.workflowstreams.Shared.TextDelta;
import io.temporal.workflowstreams.SubscribeOptions;
import io.temporal.workflowstreams.WorkflowStreamClient;
import io.temporal.workflowstreams.WorkflowStreamItem;
import io.temporal.workflowstreams.WorkflowStreamSubscription;
import java.util.UUID;

/**
* Scenario 5: LLM token streaming. The workflow hosts the stream while an activity makes the
* streaming OpenAI call and republishes each token delta. On a retry the activity emits a
* RetryEvent and this subscriber rewinds the terminal and re-renders. Run {@link LlmWorker} with
* {@code OPENAI_API_KEY} set before running this.
*/
public class Llm {

/**
* ANSI escapes to save the cursor position and to restore it while clearing everything below, so
* a retry can re-render the completion from scratch.
*/
private static final String ANSI_SAVE = "\u001b[s";

private static final String ANSI_RESTORE_AND_CLEAR = "\u001b[u\u001b[J";

public static void main(String[] args) {
String prompt = args.length > 0 ? args[0] : "In one short paragraph, explain what Temporal is.";

WorkflowClient client = Shared.newWorkflowClient();

String workflowId = "workflow-streams-llm-" + UUID.randomUUID();
LlmWorkflow workflow =
client.newWorkflowStub(
LlmWorkflow.class,
WorkflowOptions.newBuilder()
.setWorkflowId(workflowId)
.setTaskQueue(Shared.LLM_TASK_QUEUE)
.build());
WorkflowClient.start(workflow::complete, new LlmInput(prompt, null));
System.out.println("Started workflow: " + workflowId);

try (WorkflowStreamClient stream = WorkflowStreamClient.newInstance(client, workflowId);
WorkflowStreamSubscription subscription =
stream.subscribe(
SubscribeOptions.newBuilder()
.setTopics(Shared.TOPIC_DELTA, Shared.TOPIC_RETRY, Shared.TOPIC_COMPLETE)
.build())) {
System.out.print(ANSI_SAVE);
for (WorkflowStreamItem item : subscription) {
if (item.getTopic().equals(Shared.TOPIC_RETRY)) {
RetryEvent evt = Shared.decode(item, RetryEvent.class);
System.out.print(ANSI_RESTORE_AND_CLEAR);
System.out.printf("[retry attempt %d] resetting output%n%n", evt.attempt);
System.out.print(ANSI_SAVE);
} else if (item.getTopic().equals(Shared.TOPIC_DELTA)) {
TextDelta evt = Shared.decode(item, TextDelta.class);
System.out.print(evt.text);
System.out.flush();
} else if (item.getTopic().equals(Shared.TOPIC_COMPLETE)) {
System.out.println();
break;
}
}
}
System.exit(0);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.temporal.samples.workflowstreams;

import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import io.temporal.samples.workflowstreams.Shared.LlmInput;

@ActivityInterface
public interface LlmActivities {
/**
* Streams an LLM completion to the parent workflow's stream and returns the accumulated full
* text.
*/
@ActivityMethod
String streamCompletion(LlmInput input);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package io.temporal.samples.workflowstreams;

import com.openai.client.OpenAIClient;
import com.openai.client.okhttp.OpenAIOkHttpClient;
import com.openai.core.http.StreamResponse;
import com.openai.models.chat.completions.ChatCompletionChunk;
import com.openai.models.chat.completions.ChatCompletionCreateParams;
import io.temporal.activity.Activity;
import io.temporal.samples.workflowstreams.Shared.LlmInput;
import io.temporal.samples.workflowstreams.Shared.RetryEvent;
import io.temporal.samples.workflowstreams.Shared.TextComplete;
import io.temporal.samples.workflowstreams.Shared.TextDelta;
import io.temporal.workflowstreams.TopicHandle;
import io.temporal.workflowstreams.WorkflowStreamClient;
import io.temporal.workflowstreams.WorkflowStreamClientOptions;
import java.time.Duration;

/**
* Calls OpenAI with streaming enabled and republishes each token delta to the workflow stream. The
* accumulated text is published on the complete topic and returned as the activity result. Because
* the activity owns the non-deterministic OpenAI call, the workflow stays deterministic.
*
* <p>Retries are disabled on the OpenAI client so transient failures surface as Temporal activity
* retries instead. On a retry (attempt &gt; 1) it publishes a RetryEvent so subscribers can reset
* partially rendered output.
*/
public class LlmActivitiesImpl implements LlmActivities {

static final String DEFAULT_MODEL = "gpt-4o-mini";

@Override
public String streamCompletion(LlmInput input) {
WorkflowStreamClientOptions options =
WorkflowStreamClientOptions.newBuilder().setBatchInterval(Duration.ofMillis(200)).build();
try (WorkflowStreamClient streamClient = WorkflowStreamClient.fromActivity(options)) {
TopicHandle deltas = streamClient.topic(Shared.TOPIC_DELTA);
TopicHandle complete = streamClient.topic(Shared.TOPIC_COMPLETE);
TopicHandle retry = streamClient.topic(Shared.TOPIC_RETRY);

int attempt = Activity.getExecutionContext().getInfo().getAttempt();
if (attempt > 1) {
retry.publish(new RetryEvent(attempt), /* forceFlush */ true);
}

String model = input.model != null && !input.model.isEmpty() ? input.model : DEFAULT_MODEL;

// Reads OPENAI_API_KEY from the environment.
OpenAIClient openai = OpenAIOkHttpClient.builder().fromEnv().maxRetries(0).build();
ChatCompletionCreateParams params =
ChatCompletionCreateParams.builder().model(model).addUserMessage(input.prompt).build();

StringBuilder full = new StringBuilder();
try (StreamResponse<ChatCompletionChunk> stream =
openai.chat().completions().createStreaming(params)) {
stream.stream()
.forEach(
chunk ->
chunk.choices().stream()
.findFirst()
.flatMap(choice -> choice.delta().content())
.filter(text -> !text.isEmpty())
.ifPresent(
text -> {
deltas.publish(new TextDelta(text));
full.append(text);
}));
}

String fullText = full.toString();
complete.publish(new TextComplete(fullText), /* forceFlush */ true);
return fullText;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.temporal.samples.workflowstreams;

import io.temporal.client.WorkflowClient;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;

/**
* Worker for the LLM-streaming scenario. Runs separately from {@link StreamsWorker} so the OpenAI
* dependency and the {@code OPENAI_API_KEY} requirement stay isolated to this one scenario.
* Different task queue too — the other four scenarios won't route work to this worker.
*
* <p>Kill this worker mid-stream while {@link Llm} is running (and restart it) to trigger a retry:
* Temporal restarts the activity, the activity publishes a RetryEvent on its second attempt, and
* the consumer resets its rendered output.
*/
public class LlmWorker {

public static void main(String[] args) {
WorkflowClient client = Shared.newWorkflowClient();
WorkerFactory factory = WorkerFactory.newInstance(client);

Worker worker = factory.newWorker(Shared.LLM_TASK_QUEUE);
worker.registerWorkflowImplementationTypes(LlmWorkflowImpl.class);
worker.registerActivitiesImplementations(new LlmActivitiesImpl());

factory.start();
System.out.println("LLM worker started for task queue: " + Shared.LLM_TASK_QUEUE);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.temporal.samples.workflowstreams;

import io.temporal.samples.workflowstreams.Shared.LlmInput;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;

/**
* Scenario 5: hosts the stream while a streaming activity owns the non-deterministic OpenAI call
* and publishes token deltas back to subscribers.
*/
@WorkflowInterface
public interface LlmWorkflow {
@WorkflowMethod
String complete(LlmInput input);
}
Loading
Loading