diff --git a/src/google/adk/agents/remote_a2a_agent.py b/src/google/adk/agents/remote_a2a_agent.py index 495a715d76..b702f8c715 100644 --- a/src/google/adk/agents/remote_a2a_agent.py +++ b/src/google/adk/agents/remote_a2a_agent.py @@ -815,6 +815,88 @@ async def _run_live_impl( # This makes the function into an async generator but the yield is still unreachable yield + # Task states that represent in-progress or input-awaiting work. + # Events stamped with one of these states carry intermediate or + # waiting-for-input content, never the final answer, so they must + # not be promoted to the workflow node's output. + _NON_FINAL_TASK_STATES = frozenset( + {"submitted", "working", "input-required", "auth-required", "unknown"} + ) + + async def _run_impl( + self, + *, + ctx: Any, + node_input: Any, + ) -> AsyncGenerator[Any, None]: + """Runs the agent as a workflow node. + + Promotes textual response content to ``event.output`` so the + workflow scheduler propagates it downstream. Without this, a + ``JoinNode`` that aggregates parallel ``RemoteA2aAgent`` predecessors + sees ``None`` for each predecessor because ``BaseAgent._run_impl`` + never sets ``event.output`` and ``RemoteA2aAgent`` carries its + response only in ``event.content``. + + A node may produce at most one output (``Context.output`` raises + ``ValueError`` on a second assignment), so promotion is gated to + the first terminal A2A event of the run. Non-final task states and + later events are passed through untouched. + """ + promoted = False + async for event in super()._run_impl(ctx=ctx, node_input=node_input): + if not promoted and self._promote_response_to_output(event): + promoted = True + yield event + + def _promote_response_to_output(self, event: Event) -> bool: + """Sets ``event.output`` from non-thought text parts, if eligible. + + Returns True iff this call assigned ``event.output``. Skips: + + * partial events and events whose ``event.output`` is already set; + * events not authored by this agent; + * events whose content carries only thoughts, function calls, or + function responses (e.g. ``input_required`` mock function calls); + * events whose A2A task state is non-final (``submitted``, + ``working``, ``input-required``, ``auth-required``, ``unknown``). + Streaming converters do not always mark ``working`` text as + ``thought=True``, so the task-state check guards against + promoting an intermediate streaming chunk and then raising on the + true final event. + """ + if event.partial or event.output is not None: + return False + if event.author != self.name: + return False + if not event.content or not event.content.parts: + return False + + response_meta = (event.custom_metadata or {}).get( + A2A_METADATA_PREFIX + "response" + ) + if isinstance(response_meta, dict): + status = response_meta.get("status") + if ( + isinstance(status, dict) + and status.get("state") in self._NON_FINAL_TASK_STATES + ): + return False + + text_chunks = [ + part.text + for part in event.content.parts + if part.text + and not part.thought + and not part.function_call + and not part.function_response + ] + if not text_chunks: + return False + event.output = "".join(text_chunks) + event.node_info.message_as_output = True + return True + async def cleanup(self) -> None: """Clean up resources, especially the HTTP client if owned by this agent.""" if self._httpx_client_needs_cleanup and self._httpx_client: diff --git a/tests/unittests/agents/test_remote_a2a_agent.py b/tests/unittests/agents/test_remote_a2a_agent.py index 073ad36d9f..ac783d040e 100644 --- a/tests/unittests/agents/test_remote_a2a_agent.py +++ b/tests/unittests/agents/test_remote_a2a_agent.py @@ -2943,3 +2943,314 @@ def test_deepcopy_config(self): copied_config.request_interceptors[0] is not config.request_interceptors[0] ) + + +class TestRemoteA2aAgentWorkflowOutput: + """Tests that RemoteA2aAgent surfaces a workflow-node output value. + + Without ``_promote_response_to_output``, a ``RemoteA2aAgent`` used as + a Workflow node leaves ``ctx.output`` as None, which causes + downstream JoinNode aggregation to record ``None`` for that + predecessor. + """ + + def _make_agent(self) -> RemoteA2aAgent: + return RemoteA2aAgent( + name="remote_agent", + agent_card=create_test_agent_card(), + ) + + def test_promotes_text_content_to_output(self): + agent = self._make_agent() + event = Event( + author="remote_agent", + content=genai_types.Content( + role="model", + parts=[genai_types.Part(text="Findings: ok")], + ), + ) + + assert agent._promote_response_to_output(event) is True + assert event.output == "Findings: ok" + assert event.node_info.message_as_output is True + + def test_joins_multiple_text_parts(self): + agent = self._make_agent() + event = Event( + author="remote_agent", + content=genai_types.Content( + role="model", + parts=[ + genai_types.Part(text="line1\n"), + genai_types.Part(text="line2"), + ], + ), + ) + + agent._promote_response_to_output(event) + + assert event.output == "line1\nline2" + + def test_skips_thought_parts(self): + agent = self._make_agent() + event = Event( + author="remote_agent", + content=genai_types.Content( + role="model", + parts=[ + genai_types.Part(text="streaming update", thought=True), + ], + ), + ) + + agent._promote_response_to_output(event) + + assert event.output is None + assert event.node_info.message_as_output is None + + def test_skips_function_call_parts(self): + """input-required events carry a mock function call and no text.""" + agent = self._make_agent() + event = Event( + author="remote_agent", + content=genai_types.Content( + role="model", + parts=[ + genai_types.Part( + function_call=genai_types.FunctionCall( + id="fc1", + name="mock_function_call_for_required_user_input", + args={"input_required": "Please confirm"}, + ) + ), + ], + ), + ) + + agent._promote_response_to_output(event) + + assert event.output is None + + def test_skips_partial_events(self): + agent = self._make_agent() + event = Event( + author="remote_agent", + partial=True, + content=genai_types.Content( + role="model", + parts=[genai_types.Part(text="streaming...")], + ), + ) + + agent._promote_response_to_output(event) + + assert event.output is None + + def test_skips_events_not_authored_by_agent(self): + agent = self._make_agent() + event = Event( + author="some_other_agent", + content=genai_types.Content( + role="model", + parts=[genai_types.Part(text="Not mine")], + ), + ) + + agent._promote_response_to_output(event) + + assert event.output is None + + def test_preserves_existing_output(self): + agent = self._make_agent() + event = Event( + author="remote_agent", + output="preset", + content=genai_types.Content( + role="model", + parts=[genai_types.Part(text="text")], + ), + ) + + agent._promote_response_to_output(event) + + assert event.output == "preset" + + def test_no_content_no_output(self): + agent = self._make_agent() + event = Event(author="remote_agent") + + assert agent._promote_response_to_output(event) is False + assert event.output is None + + def _make_text_event( + self, text: str = "reply", task_state: str | None = None + ) -> Event: + event = Event( + author="remote_agent", + content=genai_types.Content( + role="model", + parts=[genai_types.Part(text=text)], + ), + ) + if task_state is not None: + event.custom_metadata = { + A2A_METADATA_PREFIX + "response": {"status": {"state": task_state}} + } + return event + + @pytest.mark.parametrize( + "state", + [ + "submitted", + "working", + "input-required", + "auth-required", + "unknown", + ], + ) + def test_skips_non_final_task_states(self, state): + """Streaming converters may leave non-final text un-thoughted. + + The task-state check on ``custom_metadata['a2a:response']`` is the + guard that prevents ``ctx.output`` from being overwritten by an + intermediate event and then raising on the real final event. + """ + agent = self._make_agent() + event = self._make_text_event(text="in-progress chunk", task_state=state) + + assert agent._promote_response_to_output(event) is False + assert event.output is None + + @pytest.mark.parametrize( + "state", + ["completed", "failed", "canceled", "rejected"], + ) + def test_promotes_terminal_task_states(self, state): + agent = self._make_agent() + event = self._make_text_event(text="final answer", task_state=state) + + assert agent._promote_response_to_output(event) is True + assert event.output == "final answer" + + def test_promotes_when_response_metadata_absent(self): + """Non-Task A2A responses (plain Message) carry no task status.""" + agent = self._make_agent() + event = self._make_text_event(text="message reply") + + assert agent._promote_response_to_output(event) is True + assert event.output == "message reply" + + @pytest.mark.asyncio + async def test_run_impl_promotes_only_first_terminal_event(self): + """Guards against ``ValueError: Output already set``. + + When the v2 converter path emits a ``working`` text event followed + by a ``completed`` text event, the first must be passed through + untouched and only the terminal event promoted. After that, any + further promotable event must also be left alone. + """ + + working = self._make_text_event( + text="thinking out loud", task_state="working" + ) + completed = self._make_text_event( + text="final answer", task_state="completed" + ) + trailing = self._make_text_event( + text="ignored trailing artifact", task_state="completed" + ) + + class _StubRemoteAgent(RemoteA2aAgent): + + async def _run_async_impl(self, ctx): + yield working + yield completed + yield trailing + + agent = _StubRemoteAgent( + name="remote_agent", + agent_card=create_test_agent_card(), + ) + + from google.adk.apps.app import App + from google.adk.workflow._join_node import JoinNode + from google.adk.workflow._workflow import Workflow + + from tests.unittests import testing_utils + + workflow = Workflow( + name="wf", + edges=[("START", agent, JoinNode(name="join"))], + ) + app_instance = App(name="t", root_agent=workflow) + runner = testing_utils.InMemoryRunner(app=app_instance) + + events = await runner.run_async(testing_utils.get_user_content("start")) + + # No "Output already set" raised, and the JoinNode aggregates the + # terminal event's text — not the working intermediate, not the + # trailing artifact. + join_outputs = [ + e + for e in events + if isinstance(e, Event) + and e.output is not None + and "join" in (e.node_info.path or "") + ] + assert join_outputs + assert join_outputs[0].output == {"remote_agent": "final answer"} + + assert working.output is None + assert completed.output == "final answer" + assert trailing.output is None + + @pytest.mark.asyncio + async def test_run_impl_promotes_output_for_each_event(self): + """``_run_impl`` calls ``_promote_response_to_output`` per event. + + Uses a subclass that overrides ``_run_async_impl`` to yield a + deterministic event, then drives ``_run_impl`` through the public + workflow node entry point. + """ + + yielded_event = Event( + author="remote_agent", + content=genai_types.Content( + role="model", + parts=[genai_types.Part(text="agent reply")], + ), + ) + + class _StubRemoteAgent(RemoteA2aAgent): + + async def _run_async_impl(self, ctx): + yield yielded_event + + agent = _StubRemoteAgent( + name="remote_agent", + agent_card=create_test_agent_card(), + ) + + from google.adk.apps.app import App + from google.adk.workflow._join_node import JoinNode + from google.adk.workflow._workflow import Workflow + + from tests.unittests import testing_utils + + workflow = Workflow( + name="wf", + edges=[("START", agent, JoinNode(name="join"))], + ) + app_instance = App(name="t", root_agent=workflow) + runner = testing_utils.InMemoryRunner(app=app_instance) + events = await runner.run_async(testing_utils.get_user_content("start")) + + join_outputs = [ + e + for e in events + if isinstance(e, Event) + and e.output is not None + and "join" in (e.node_info.path or "") + ] + assert join_outputs, "JoinNode should emit an aggregated output event" + assert join_outputs[0].output == {"remote_agent": "agent reply"}