diff --git a/temporalio/bridge/proto/nexus/nexus_pb2.py b/temporalio/bridge/proto/nexus/nexus_pb2.py index 2a1d8b786..d932c3571 100644 --- a/temporalio/bridge/proto/nexus/nexus_pb2.py +++ b/temporalio/bridge/proto/nexus/nexus_pb2.py @@ -34,7 +34,7 @@ ) DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n#temporal/sdk/core/nexus/nexus.proto\x12\rcoresdk.nexus\x1a\x1fgoogle/protobuf/timestamp.proto\x1a$temporal/api/common/v1/message.proto\x1a%temporal/api/failure/v1/message.proto\x1a#temporal/api/nexus/v1/message.proto\x1a\x36temporal/api/workflowservice/v1/request_response.proto\x1a%temporal/sdk/core/common/common.proto"\xf8\x01\n\x14NexusOperationResult\x12\x34\n\tcompleted\x18\x01 \x01(\x0b\x32\x1f.temporal.api.common.v1.PayloadH\x00\x12\x32\n\x06\x66\x61iled\x18\x02 \x01(\x0b\x32 .temporal.api.failure.v1.FailureH\x00\x12\x35\n\tcancelled\x18\x03 \x01(\x0b\x32 .temporal.api.failure.v1.FailureH\x00\x12\x35\n\ttimed_out\x18\x04 \x01(\x0b\x32 .temporal.api.failure.v1.FailureH\x00\x42\x08\n\x06status"\xee\x01\n\x13NexusTaskCompletion\x12\x12\n\ntask_token\x18\x01 \x01(\x0c\x12\x34\n\tcompleted\x18\x02 \x01(\x0b\x32\x1f.temporal.api.nexus.v1.ResponseH\x00\x12\x38\n\x05\x65rror\x18\x03 \x01(\x0b\x32#.temporal.api.nexus.v1.HandlerErrorB\x02\x18\x01H\x00\x12\x14\n\nack_cancel\x18\x04 \x01(\x08H\x00\x12\x33\n\x07\x66\x61ilure\x18\x05 \x01(\x0b\x32 .temporal.api.failure.v1.FailureH\x00\x42\x08\n\x06status"\xd0\x01\n\tNexusTask\x12K\n\x04task\x18\x01 \x01(\x0b\x32;.temporal.api.workflowservice.v1.PollNexusTaskQueueResponseH\x00\x12\x35\n\x0b\x63\x61ncel_task\x18\x02 \x01(\x0b\x32\x1e.coresdk.nexus.CancelNexusTaskH\x00\x12\x34\n\x10request_deadline\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.TimestampB\t\n\x07variant"[\n\x0f\x43\x61ncelNexusTask\x12\x12\n\ntask_token\x18\x01 \x01(\x0c\x12\x34\n\x06reason\x18\x02 \x01(\x0e\x32$.coresdk.nexus.NexusTaskCancelReason*;\n\x15NexusTaskCancelReason\x12\r\n\tTIMED_OUT\x10\x00\x12\x13\n\x0fWORKER_SHUTDOWN\x10\x01*\x7f\n\x1eNexusOperationCancellationType\x12\x1f\n\x1bWAIT_CANCELLATION_COMPLETED\x10\x00\x12\x0b\n\x07\x41\x42\x41NDON\x10\x01\x12\x0e\n\nTRY_CANCEL\x10\x02\x12\x1f\n\x1bWAIT_CANCELLATION_REQUESTED\x10\x03\x42+\xea\x02(Temporalio::Internal::Bridge::Api::Nexusb\x06proto3' + b'\n#temporal/sdk/core/nexus/nexus.proto\x12\rcoresdk.nexus\x1a\x1fgoogle/protobuf/timestamp.proto\x1a$temporal/api/common/v1/message.proto\x1a%temporal/api/failure/v1/message.proto\x1a#temporal/api/nexus/v1/message.proto\x1a\x36temporal/api/workflowservice/v1/request_response.proto\x1a%temporal/sdk/core/common/common.proto"\xf8\x01\n\x14NexusOperationResult\x12\x34\n\tcompleted\x18\x01 \x01(\x0b\x32\x1f.temporal.api.common.v1.PayloadH\x00\x12\x32\n\x06\x66\x61iled\x18\x02 \x01(\x0b\x32 .temporal.api.failure.v1.FailureH\x00\x12\x35\n\tcancelled\x18\x03 \x01(\x0b\x32 .temporal.api.failure.v1.FailureH\x00\x12\x35\n\ttimed_out\x18\x04 \x01(\x0b\x32 .temporal.api.failure.v1.FailureH\x00\x42\x08\n\x06status"\xee\x01\n\x13NexusTaskCompletion\x12\x12\n\ntask_token\x18\x01 \x01(\x0c\x12\x34\n\tcompleted\x18\x02 \x01(\x0b\x32\x1f.temporal.api.nexus.v1.ResponseH\x00\x12\x38\n\x05\x65rror\x18\x03 \x01(\x0b\x32#.temporal.api.nexus.v1.HandlerErrorB\x02\x18\x01H\x00\x12\x14\n\nack_cancel\x18\x04 \x01(\x08H\x00\x12\x33\n\x07\x66\x61ilure\x18\x05 \x01(\x0b\x32 .temporal.api.failure.v1.FailureH\x00\x42\x08\n\x06status"\xe2\x01\n\tNexusTask\x12K\n\x04task\x18\x01 \x01(\x0b\x32;.temporal.api.workflowservice.v1.PollNexusTaskQueueResponseH\x00\x12\x35\n\x0b\x63\x61ncel_task\x18\x02 \x01(\x0b\x32\x1e.coresdk.nexus.CancelNexusTaskH\x00\x12\x34\n\x10request_deadline\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x10\n\x08\x65ndpoint\x18\x04 \x01(\tB\t\n\x07variant"[\n\x0f\x43\x61ncelNexusTask\x12\x12\n\ntask_token\x18\x01 \x01(\x0c\x12\x34\n\x06reason\x18\x02 \x01(\x0e\x32$.coresdk.nexus.NexusTaskCancelReason*;\n\x15NexusTaskCancelReason\x12\r\n\tTIMED_OUT\x10\x00\x12\x13\n\x0fWORKER_SHUTDOWN\x10\x01*\x7f\n\x1eNexusOperationCancellationType\x12\x1f\n\x1bWAIT_CANCELLATION_COMPLETED\x10\x00\x12\x0b\n\x07\x41\x42\x41NDON\x10\x01\x12\x0e\n\nTRY_CANCEL\x10\x02\x12\x1f\n\x1bWAIT_CANCELLATION_REQUESTED\x10\x03\x42+\xea\x02(Temporalio::Internal::Bridge::Api::Nexusb\x06proto3' ) _NEXUSTASKCANCELREASON = DESCRIPTOR.enum_types_by_name["NexusTaskCancelReason"] @@ -108,16 +108,16 @@ ) _NEXUSTASKCOMPLETION.fields_by_name["error"]._options = None _NEXUSTASKCOMPLETION.fields_by_name["error"]._serialized_options = b"\030\001" - _NEXUSTASKCANCELREASON._serialized_start = 1092 - _NEXUSTASKCANCELREASON._serialized_end = 1151 - _NEXUSOPERATIONCANCELLATIONTYPE._serialized_start = 1153 - _NEXUSOPERATIONCANCELLATIONTYPE._serialized_end = 1280 + _NEXUSTASKCANCELREASON._serialized_start = 1110 + _NEXUSTASKCANCELREASON._serialized_end = 1169 + _NEXUSOPERATIONCANCELLATIONTYPE._serialized_start = 1171 + _NEXUSOPERATIONCANCELLATIONTYPE._serialized_end = 1298 _NEXUSOPERATIONRESULT._serialized_start = 297 _NEXUSOPERATIONRESULT._serialized_end = 545 _NEXUSTASKCOMPLETION._serialized_start = 548 _NEXUSTASKCOMPLETION._serialized_end = 786 _NEXUSTASK._serialized_start = 789 - _NEXUSTASK._serialized_end = 997 - _CANCELNEXUSTASK._serialized_start = 999 - _CANCELNEXUSTASK._serialized_end = 1090 + _NEXUSTASK._serialized_end = 1015 + _CANCELNEXUSTASK._serialized_start = 1017 + _CANCELNEXUSTASK._serialized_end = 1108 # @@protoc_insertion_point(module_scope) diff --git a/temporalio/bridge/proto/nexus/nexus_pb2.pyi b/temporalio/bridge/proto/nexus/nexus_pb2.pyi index 94f390595..3cfafac26 100644 --- a/temporalio/bridge/proto/nexus/nexus_pb2.pyi +++ b/temporalio/bridge/proto/nexus/nexus_pb2.pyi @@ -240,6 +240,7 @@ class NexusTask(google.protobuf.message.Message): TASK_FIELD_NUMBER: builtins.int CANCEL_TASK_FIELD_NUMBER: builtins.int REQUEST_DEADLINE_FIELD_NUMBER: builtins.int + ENDPOINT_FIELD_NUMBER: builtins.int @property def task( self, @@ -265,6 +266,10 @@ class NexusTask(google.protobuf.message.Message): Only set when variant is `task` and the header was present with a valid value. Represented as an absolute timestamp. """ + endpoint: builtins.str + """The endpoint this request was addressed to. Extracted from the request for convenient access. + Only set when variant is `task`. + """ def __init__( self, *, @@ -272,6 +277,7 @@ class NexusTask(google.protobuf.message.Message): | None = ..., cancel_task: global___CancelNexusTask | None = ..., request_deadline: google.protobuf.timestamp_pb2.Timestamp | None = ..., + endpoint: builtins.str = ..., ) -> None: ... def HasField( self, @@ -291,6 +297,8 @@ class NexusTask(google.protobuf.message.Message): field_name: typing_extensions.Literal[ "cancel_task", b"cancel_task", + "endpoint", + b"endpoint", "request_deadline", b"request_deadline", "task", diff --git a/temporalio/bridge/sdk-core b/temporalio/bridge/sdk-core index 71a5caa57..b544f95da 160000 --- a/temporalio/bridge/sdk-core +++ b/temporalio/bridge/sdk-core @@ -1 +1 @@ -Subproject commit 71a5caa57118848bd60843dd7fa867ed73704108 +Subproject commit b544f95da46b21e8a642229b8d7f1b017c88e84e diff --git a/temporalio/nexus/_operation_context.py b/temporalio/nexus/_operation_context.py index ae310f070..04462c900 100644 --- a/temporalio/nexus/_operation_context.py +++ b/temporalio/nexus/_operation_context.py @@ -79,6 +79,9 @@ class Info: Retrieved inside a Nexus operation handler via :py:func:`info`. """ + endpoint: str + """The endpoint this Nexus request was addressed to.""" + namespace: str """The namespace of the worker handling this Nexus operation.""" diff --git a/temporalio/worker/_nexus.py b/temporalio/worker/_nexus.py index d324a0c4c..a189278b8 100644 --- a/temporalio/worker/_nexus.py +++ b/temporalio/worker/_nexus.py @@ -140,6 +140,7 @@ async def raise_from_exception_queue() -> NoReturn: headers=dict(task.request.header), task_cancellation=task_cancellation, request_deadline=request_deadline, + endpoint=nexus_task.endpoint, ) ) self._running_tasks[task.task_token] = _RunningNexusTask( @@ -154,6 +155,7 @@ async def raise_from_exception_queue() -> NoReturn: headers=dict(task.request.header), task_cancellation=task_cancellation, request_deadline=request_deadline, + endpoint=nexus_task.endpoint, ) ) self._running_tasks[task.task_token] = _RunningNexusTask( @@ -224,6 +226,7 @@ async def _handle_cancel_operation_task( headers: Mapping[str, str], task_cancellation: nexusrpc.handler.OperationTaskCancellation, request_deadline: datetime | None, + endpoint: str, ) -> None: """Handle a cancel operation task. @@ -244,7 +247,11 @@ async def _handle_cancel_operation_task( request_deadline=request_deadline, ) temporalio.nexus._operation_context._TemporalCancelOperationContext( - info=lambda: Info(namespace=self._namespace, task_queue=self._task_queue), + info=lambda: Info( + endpoint=endpoint, + namespace=self._namespace, + task_queue=self._task_queue, + ), nexus_context=ctx, client=self._client, _runtime_metric_meter=self._metric_meter, @@ -293,6 +300,7 @@ async def _handle_start_operation_task( headers: Mapping[str, str], task_cancellation: nexusrpc.handler.OperationTaskCancellation, request_deadline: datetime | None, + endpoint: str, ) -> None: """Handle a start operation task. @@ -302,7 +310,11 @@ async def _handle_start_operation_task( try: try: start_response = await self._start_operation( - start_request, headers, task_cancellation, request_deadline + start_request, + headers, + task_cancellation, + request_deadline, + endpoint, ) except asyncio.CancelledError: completion = temporalio.bridge.proto.nexus.NexusTaskCompletion( @@ -346,6 +358,7 @@ async def _start_operation( headers: Mapping[str, str], cancellation: nexusrpc.handler.OperationTaskCancellation, request_deadline: datetime | None, + endpoint: str, ) -> temporalio.api.nexus.v1.StartOperationResponse: """Invoke the Nexus handler's start_operation method and construct the StartOperationResponse. @@ -375,7 +388,11 @@ async def _start_operation( temporalio.nexus._operation_context._TemporalStartOperationContext( nexus_context=ctx, client=self._client, - info=lambda: Info(namespace=self._namespace, task_queue=self._task_queue), + info=lambda: Info( + endpoint=endpoint, + namespace=self._namespace, + task_queue=self._task_queue, + ), _runtime_metric_meter=self._metric_meter, _worker_shutdown_event=self._worker_shutdown_event, ).set() diff --git a/tests/nexus/test_workflow_caller.py b/tests/nexus/test_workflow_caller.py index 2b9699089..18cfb40c0 100644 --- a/tests/nexus/test_workflow_caller.py +++ b/tests/nexus/test_workflow_caller.py @@ -703,7 +703,11 @@ async def get_info( self, _ctx: StartOperationContext, _input: None ) -> dict[str, str]: info = nexus.info() - return {"namespace": info.namespace, "task_queue": info.task_queue} + return { + "endpoint": info.endpoint, + "namespace": info.namespace, + "task_queue": info.task_queue, + } @workflow.defn @@ -733,6 +737,9 @@ async def test_nexus_info_includes_namespace(client: Client, env: WorkflowEnviro id=str(uuid.uuid4()), task_queue=task_queue, ) + if not env.supports_time_skipping: + # Time-skipping server doesn't send the endpoint yet. + assert result["endpoint"] == endpoint_name assert result["namespace"] == client.namespace assert result["task_queue"] == task_queue