Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
33a2f15
Add temporalio.contrib.pubsub — reusable pub/sub for workflows
jssmith Apr 6, 2026
e2712e2
Fix PubSubState CAN serialization and simplify subscribe error handling
jssmith Apr 6, 2026
17952ae
Polish pub/sub contrib: README, flush safety, init guard, factory method
jssmith Apr 6, 2026
d1dfce7
Add init guards to poll/query handlers and fix README CAN example
jssmith Apr 6, 2026
f20ba36
Guard validator against missing init_pubsub, fix PubSubState docstring
jssmith Apr 6, 2026
70bf747
Guard get_pubsub_state/drain_pubsub, add replay and max_batch_size tests
jssmith Apr 6, 2026
70898d0
Add review comments and design addenda for pubsub redesign
jssmith Apr 7, 2026
5ff7e27
Implement pubsub redesign: dedup, base_offset, flush safety, API cleanup
jssmith Apr 7, 2026
6fbb168
TLA+-verified dedup rewrite, TTL pruning, truncation, API improvements
jssmith Apr 7, 2026
42b0df1
Remove TLA+ proof references from implementation code
jssmith Apr 7, 2026
c87a65a
Update uv.lock
jssmith Apr 7, 2026
d5a23c3
Add signal vs update dedup analysis; clarify ordering guarantees
jssmith Apr 7, 2026
3089b12
Add end-to-end dedup analysis: proper layering for three duplicate types
jssmith Apr 7, 2026
f06a53e
Expand DESIGN-v2 with offset model rationale and BFF/SSE reconnection…
jssmith Apr 7, 2026
990a6a7
pubsub: use base64 wire format with native bytes API
jssmith Apr 7, 2026
f2c6e55
pubsub: remove poll timeout and update design doc
jssmith Apr 8, 2026
a9abc20
Add token-level streaming to OpenAI and ADK Temporal plugins
jssmith Apr 8, 2026
20dafc0
pubsub: replace PubSubState Pydantic model with plain dataclass
jssmith Apr 9, 2026
5a8716c
pubsub: add per-item offsets to PubSubItem and _WireItem
jssmith Apr 10, 2026
eda55d5
pubsub: add design addendum for per-item offsets
jssmith Apr 10, 2026
7bc830a
pubsub: fix truncated offset crash and add recovery
jssmith Apr 10, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions temporalio/bridge/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

114 changes: 106 additions & 8 deletions temporalio/contrib/google_adk_agents/_model.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,30 @@
import json
import logging
from collections.abc import AsyncGenerator
from datetime import timedelta
from datetime import datetime, timedelta, timezone

from google.adk.models import BaseLlm, LLMRegistry
from google.adk.models.llm_request import LlmRequest
from google.adk.models.llm_response import LlmResponse

from temporalio import activity, workflow
from temporalio.contrib.pubsub import PubSubClient
from temporalio.workflow import ActivityConfig

logger = logging.getLogger(__name__)

EVENTS_TOPIC = "events"


def _make_event(event_type: str, **data: object) -> bytes:
return json.dumps(
{
"type": event_type,
"timestamp": datetime.now(timezone.utc).isoformat(),
"data": data,
}
).encode()


@activity.defn
async def invoke_model(llm_request: LlmRequest) -> list[LlmResponse]:
Expand Down Expand Up @@ -35,20 +52,93 @@ async def invoke_model(llm_request: LlmRequest) -> list[LlmResponse]:
]


@activity.defn
async def invoke_model_streaming(llm_request: LlmRequest) -> list[LlmResponse]:
    """Streaming-aware model activity.

    Invokes the LLM with ``stream=True``, publishing TEXT_DELTA (and
    TOOL_CALL_START) events through a PubSubClient as chunks arrive, then
    returns every collected response to the caller.

    The PubSubClient auto-detects the activity context to locate the parent
    workflow that receives the published events.

    Args:
        llm_request: The LLM request containing model name and parameters.

    Returns:
        List of LLM responses from the model.

    Raises:
        ValueError: If no model name is set or the LLM cannot be created.
    """
    model_name = llm_request.model
    if model_name is None:
        raise ValueError("No model name provided, could not create LLM.")

    llm = LLMRegistry.new_llm(model_name)
    if not llm:
        raise ValueError(f"Failed to create LLM for model: {model_name}")

    client = PubSubClient.create(batch_interval=0.1)
    collected: list[LlmResponse] = []
    text_parts: list[str] = []

    async with client:
        # Priority publish so the start marker is not batched behind deltas.
        client.publish(EVENTS_TOPIC, _make_event("LLM_CALL_START"), priority=True)

        stream = llm.generate_content_async(llm_request=llm_request, stream=True)
        async for chunk in stream:
            # Heartbeat per chunk so long generations don't time out.
            activity.heartbeat()
            collected.append(chunk)

            content = chunk.content
            if not (content and content.parts):
                continue
            for piece in content.parts:
                if piece.text:
                    text_parts.append(piece.text)
                    client.publish(
                        EVENTS_TOPIC,
                        _make_event("TEXT_DELTA", delta=piece.text),
                    )
                if piece.function_call:
                    client.publish(
                        EVENTS_TOPIC,
                        _make_event(
                            "TOOL_CALL_START",
                            tool_name=piece.function_call.name,
                        ),
                    )

        if text_parts:
            client.publish(
                EVENTS_TOPIC,
                _make_event("TEXT_COMPLETE", text="".join(text_parts)),
                priority=True,
            )
        client.publish(
            EVENTS_TOPIC, _make_event("LLM_CALL_COMPLETE"), priority=True
        )

    return collected


class TemporalModel(BaseLlm):
"""A Temporal-based LLM model that executes model invocations as activities."""

def __init__(
self, model_name: str, activity_config: ActivityConfig | None = None
self,
model_name: str,
activity_config: ActivityConfig | None = None,
streaming: bool = False,
) -> None:
"""Initialize the TemporalModel.

Args:
model_name: The name of the model to use.
activity_config: Configuration options for the activity execution.
streaming: When True, the model activity uses the streaming LLM
endpoint and publishes token events via PubSubClient. The
workflow is unaffected -- it still receives complete responses.
"""
super().__init__(model=model_name)
self._model_name = model_name
self._streaming = streaming
self._activity_config = ActivityConfig(
start_to_close_timeout=timedelta(seconds=60)
)
Expand All @@ -62,15 +152,23 @@ async def generate_content_async(

Args:
llm_request: The LLM request containing model parameters and content.
stream: Whether to stream the response (currently ignored).
stream: Whether to stream the response (currently ignored; use the
``streaming`` constructor parameter instead).

Yields:
The responses from the model.
"""
responses = await workflow.execute_activity(
invoke_model,
args=[llm_request],
**self._activity_config,
)
if self._streaming:
responses = await workflow.execute_activity(
invoke_model_streaming,
args=[llm_request],
**self._activity_config,
)
else:
responses = await workflow.execute_activity(
invoke_model,
args=[llm_request],
**self._activity_config,
)
for response in responses:
yield response
7 changes: 5 additions & 2 deletions temporalio/contrib/google_adk_agents/_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@

from temporalio import workflow
from temporalio.contrib.google_adk_agents._mcp import TemporalMcpToolSetProvider
from temporalio.contrib.google_adk_agents._model import invoke_model
from temporalio.contrib.google_adk_agents._model import (
invoke_model,
invoke_model_streaming,
)
from temporalio.contrib.pydantic import (
PydanticPayloadConverter as _DefaultPydanticPayloadConverter,
)
Expand Down Expand Up @@ -94,7 +97,7 @@ def workflow_runner(runner: WorkflowRunner | None) -> WorkflowRunner:
)
return runner

new_activities = [invoke_model]
new_activities = [invoke_model, invoke_model_streaming]
if toolset_providers is not None:
for toolset_provider in toolset_providers:
new_activities.extend(toolset_provider._get_activities())
Expand Down
Loading