Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions temporalio/bridge/proto/nexus/nexus_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions temporalio/bridge/proto/nexus/nexus_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ class NexusTask(google.protobuf.message.Message):
TASK_FIELD_NUMBER: builtins.int
CANCEL_TASK_FIELD_NUMBER: builtins.int
REQUEST_DEADLINE_FIELD_NUMBER: builtins.int
ENDPOINT_FIELD_NUMBER: builtins.int
@property
def task(
self,
Expand All @@ -265,13 +266,18 @@ class NexusTask(google.protobuf.message.Message):
Only set when variant is `task` and the header was present with a valid value.
Represented as an absolute timestamp.
"""
endpoint: builtins.str
"""The endpoint this request was addressed to. Extracted from the request for convenient access.
Only set when variant is `task`.
"""
def __init__(
self,
*,
task: temporalio.api.workflowservice.v1.request_response_pb2.PollNexusTaskQueueResponse
| None = ...,
cancel_task: global___CancelNexusTask | None = ...,
request_deadline: google.protobuf.timestamp_pb2.Timestamp | None = ...,
endpoint: builtins.str = ...,
) -> None: ...
def HasField(
self,
Expand All @@ -291,6 +297,8 @@ class NexusTask(google.protobuf.message.Message):
field_name: typing_extensions.Literal[
"cancel_task",
b"cancel_task",
"endpoint",
b"endpoint",
"request_deadline",
b"request_deadline",
"task",
Expand Down
2 changes: 1 addition & 1 deletion temporalio/bridge/sdk-core
Submodule sdk-core updated 72 files
+0 −0 .github/CODEOWNERS
+36 −0 .github/workflows/per-pr.yml
+1 −1 ARCHITECTURE.md
+3 −0 crates/client/Cargo.toml
+38 −1 crates/client/README.md
+410 −0 crates/client/src/envconfig.rs
+3 −0 crates/client/src/lib.rs
+106 −0 crates/client/src/proxy.rs
+3 −0 crates/common/protos/local/temporal/sdk/core/nexus/nexus.proto
+4 −4 crates/common/src/protos/mod.rs
+96 −7 crates/sdk-core-c-bridge/src/worker.rs
+72 −0 crates/sdk-core/src/core_tests/queries.rs
+9 −0 crates/sdk-core/src/worker/nexus.rs
+7 −6 crates/sdk-core/src/worker/workflow/managed_run.rs
+48 −4 crates/sdk-core/src/worker/workflow/mod.rs
+74 −0 crates/sdk-core/tests/integ_tests/metrics_tests.rs
+2 −3 crates/sdk-core/tests/integ_tests/update_tests.rs
+139 −1 crates/sdk/Cargo.toml
+18 −18 crates/sdk/README.md
+46 −0 crates/sdk/examples/README.md
+22 −0 crates/sdk/examples/activity_heartbeating/README.md
+33 −0 crates/sdk/examples/activity_heartbeating/starter.rs
+33 −0 crates/sdk/examples/activity_heartbeating/worker.rs
+61 −0 crates/sdk/examples/activity_heartbeating/workflows.rs
+22 −0 crates/sdk/examples/cancellation/README.md
+37 −0 crates/sdk/examples/cancellation/starter.rs
+32 −0 crates/sdk/examples/cancellation/worker.rs
+79 −0 crates/sdk/examples/cancellation/workflows.rs
+22 −0 crates/sdk/examples/child_workflows/README.md
+36 −0 crates/sdk/examples/child_workflows/starter.rs
+33 −0 crates/sdk/examples/child_workflows/worker.rs
+48 −0 crates/sdk/examples/child_workflows/workflows.rs
+24 −0 crates/sdk/examples/continue_as_new/README.md
+32 −0 crates/sdk/examples/continue_as_new/starter.rs
+33 −0 crates/sdk/examples/continue_as_new/worker.rs
+35 −0 crates/sdk/examples/continue_as_new/workflows.rs
+22 −0 crates/sdk/examples/hello_world/README.md
+32 −0 crates/sdk/examples/hello_world/starter.rs
+33 −0 crates/sdk/examples/hello_world/worker.rs
+40 −0 crates/sdk/examples/hello_world/workflows.rs
+25 −0 crates/sdk/examples/local_activities/README.md
+33 −0 crates/sdk/examples/local_activities/starter.rs
+33 −0 crates/sdk/examples/local_activities/worker.rs
+56 −0 crates/sdk/examples/local_activities/workflows.rs
+26 −0 crates/sdk/examples/message_passing/README.md
+60 −0 crates/sdk/examples/message_passing/starter.rs
+32 −0 crates/sdk/examples/message_passing/worker.rs
+48 −0 crates/sdk/examples/message_passing/workflows.rs
+24 −0 crates/sdk/examples/polling/README.md
+32 −0 crates/sdk/examples/polling/starter.rs
+33 −0 crates/sdk/examples/polling/worker.rs
+54 −0 crates/sdk/examples/polling/workflows.rs
+24 −0 crates/sdk/examples/saga/README.md
+36 −0 crates/sdk/examples/saga/starter.rs
+32 −0 crates/sdk/examples/saga/worker.rs
+175 −0 crates/sdk/examples/saga/workflows.rs
+22 −0 crates/sdk/examples/schedules/README.md
+55 −0 crates/sdk/examples/schedules/starter.rs
+32 −0 crates/sdk/examples/schedules/worker.rs
+40 −0 crates/sdk/examples/schedules/workflows.rs
+34 −0 crates/sdk/examples/search_attributes/README.md
+42 −0 crates/sdk/examples/search_attributes/starter.rs
+33 −0 crates/sdk/examples/search_attributes/worker.rs
+36 −0 crates/sdk/examples/search_attributes/workflows.rs
+26 −0 crates/sdk/examples/timer_examples/README.md
+32 −0 crates/sdk/examples/timer_examples/starter.rs
+33 −0 crates/sdk/examples/timer_examples/worker.rs
+57 −0 crates/sdk/examples/timer_examples/workflows.rs
+22 −0 crates/sdk/examples/updatable_timer/README.md
+63 −0 crates/sdk/examples/updatable_timer/starter.rs
+32 −0 crates/sdk/examples/updatable_timer/worker.rs
+62 −0 crates/sdk/examples/updatable_timer/workflows.rs
3 changes: 3 additions & 0 deletions temporalio/nexus/_operation_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ class Info:
Retrieved inside a Nexus operation handler via :py:func:`info`.
"""

endpoint: str
"""The endpoint this Nexus request was addressed to."""

namespace: str
"""The namespace of the worker handling this Nexus operation."""

Expand Down
23 changes: 20 additions & 3 deletions temporalio/worker/_nexus.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ async def raise_from_exception_queue() -> NoReturn:
headers=dict(task.request.header),
task_cancellation=task_cancellation,
request_deadline=request_deadline,
endpoint=nexus_task.endpoint,
)
)
self._running_tasks[task.task_token] = _RunningNexusTask(
Expand All @@ -154,6 +155,7 @@ async def raise_from_exception_queue() -> NoReturn:
headers=dict(task.request.header),
task_cancellation=task_cancellation,
request_deadline=request_deadline,
endpoint=nexus_task.endpoint,
)
)
self._running_tasks[task.task_token] = _RunningNexusTask(
Expand Down Expand Up @@ -224,6 +226,7 @@ async def _handle_cancel_operation_task(
headers: Mapping[str, str],
task_cancellation: nexusrpc.handler.OperationTaskCancellation,
request_deadline: datetime | None,
endpoint: str,
) -> None:
"""Handle a cancel operation task.

Expand All @@ -244,7 +247,11 @@ async def _handle_cancel_operation_task(
request_deadline=request_deadline,
)
temporalio.nexus._operation_context._TemporalCancelOperationContext(
info=lambda: Info(namespace=self._namespace, task_queue=self._task_queue),
info=lambda: Info(
endpoint=endpoint,
namespace=self._namespace,
task_queue=self._task_queue,
),
nexus_context=ctx,
client=self._client,
_runtime_metric_meter=self._metric_meter,
Expand Down Expand Up @@ -293,6 +300,7 @@ async def _handle_start_operation_task(
headers: Mapping[str, str],
task_cancellation: nexusrpc.handler.OperationTaskCancellation,
request_deadline: datetime | None,
endpoint: str,
) -> None:
"""Handle a start operation task.

Expand All @@ -302,7 +310,11 @@ async def _handle_start_operation_task(
try:
try:
start_response = await self._start_operation(
start_request, headers, task_cancellation, request_deadline
start_request,
headers,
task_cancellation,
request_deadline,
endpoint,
)
except asyncio.CancelledError:
completion = temporalio.bridge.proto.nexus.NexusTaskCompletion(
Expand Down Expand Up @@ -346,6 +358,7 @@ async def _start_operation(
headers: Mapping[str, str],
cancellation: nexusrpc.handler.OperationTaskCancellation,
request_deadline: datetime | None,
endpoint: str,
) -> temporalio.api.nexus.v1.StartOperationResponse:
"""Invoke the Nexus handler's start_operation method and construct the StartOperationResponse.

Expand Down Expand Up @@ -375,7 +388,11 @@ async def _start_operation(
temporalio.nexus._operation_context._TemporalStartOperationContext(
nexus_context=ctx,
client=self._client,
info=lambda: Info(namespace=self._namespace, task_queue=self._task_queue),
info=lambda: Info(
endpoint=endpoint,
namespace=self._namespace,
task_queue=self._task_queue,
),
_runtime_metric_meter=self._metric_meter,
_worker_shutdown_event=self._worker_shutdown_event,
).set()
Expand Down
9 changes: 8 additions & 1 deletion tests/nexus/test_workflow_caller.py
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,11 @@ async def get_info(
self, _ctx: StartOperationContext, _input: None
) -> dict[str, str]:
info = nexus.info()
return {"namespace": info.namespace, "task_queue": info.task_queue}
return {
"endpoint": info.endpoint,
"namespace": info.namespace,
"task_queue": info.task_queue,
}


@workflow.defn
Expand Down Expand Up @@ -733,6 +737,9 @@ async def test_nexus_info_includes_namespace(client: Client, env: WorkflowEnviro
id=str(uuid.uuid4()),
task_queue=task_queue,
)
if not env.supports_time_skipping:
# Time-skipping server doesn't send the endpoint yet.
assert result["endpoint"] == endpoint_name
assert result["namespace"] == client.namespace
assert result["task_queue"] == task_queue

Expand Down
Loading