From a428889c3c11cc28b382e221f475c7bd2f81ea87 Mon Sep 17 00:00:00 2001 From: Harineko0 Date: Tue, 26 May 2026 15:10:26 +0900 Subject: [PATCH 1/2] fix(a2a): Promote RemoteA2aAgent response to workflow node output RemoteA2aAgent inherits BaseAgent._run_impl, which never sets event.output or message_as_output, so NodeRunner leaves ctx.output as None for A2A agent nodes. When a JoinNode aggregates parallel RemoteA2aAgent predecessors, every value in the joined dict comes back as None. Override _run_impl on RemoteA2aAgent to mirror LlmAgent: join the non-thought, non-function-call/response text parts of each yielded event into event.output and set message_as_output=True. Partial, foreign-author, and input-required (mock function call) events are skipped. --- src/google/adk/agents/remote_a2a_agent.py | 46 +++++ .../unittests/agents/test_remote_a2a_agent.py | 191 ++++++++++++++++++ 2 files changed, 237 insertions(+) diff --git a/src/google/adk/agents/remote_a2a_agent.py b/src/google/adk/agents/remote_a2a_agent.py index 495a715d76..6d34cb4302 100644 --- a/src/google/adk/agents/remote_a2a_agent.py +++ b/src/google/adk/agents/remote_a2a_agent.py @@ -815,6 +815,52 @@ async def _run_live_impl( # This makes the function into an async generator but the yield is still unreachable yield + async def _run_impl( + self, + *, + ctx: Any, + node_input: Any, + ) -> AsyncGenerator[Any, None]: + """Runs the agent as a workflow node. + + Promotes textual response content to ``event.output`` so the + workflow scheduler propagates it downstream. Without this, a + ``JoinNode`` that aggregates parallel ``RemoteA2aAgent`` predecessors + sees ``None`` for each predecessor because ``BaseAgent._run_impl`` + never sets ``event.output`` and ``RemoteA2aAgent`` carries its + response only in ``event.content``. + """ + async for event in super()._run_impl(ctx=ctx, node_input=node_input): + self._promote_response_to_output(event) + yield event + + def _promote_response_to_output(self, event: Event) -> None: + """Sets ``event.output`` from non-thought text parts, if any. + + Skips partial events, events not authored by this agent, and events + whose content carries only thoughts, function calls, or function + responses (e.g. input-required mock function calls). + """ + if event.partial or event.output is not None: + return + if event.author != self.name: + return + if not event.content or not event.content.parts: + return + + text_chunks = [ + part.text + for part in event.content.parts + if part.text + and not part.thought + and not part.function_call + and not part.function_response + ] + if not text_chunks: + return + event.output = "".join(text_chunks) + event.node_info.message_as_output = True + async def cleanup(self) -> None: """Clean up resources, especially the HTTP client if owned by this agent.""" if self._httpx_client_needs_cleanup and self._httpx_client: diff --git a/tests/unittests/agents/test_remote_a2a_agent.py b/tests/unittests/agents/test_remote_a2a_agent.py index 073ad36d9f..60a64cc400 100644 --- a/tests/unittests/agents/test_remote_a2a_agent.py +++ b/tests/unittests/agents/test_remote_a2a_agent.py @@ -2943,3 +2943,194 @@ def test_deepcopy_config(self): copied_config.request_interceptors[0] is not config.request_interceptors[0] ) + + +class TestRemoteA2aAgentWorkflowOutput: + """Tests that RemoteA2aAgent surfaces a workflow-node output value. + + Without ``_promote_response_to_output``, a ``RemoteA2aAgent`` used as + a Workflow node leaves ``ctx.output`` as None, which causes + downstream JoinNode aggregation to record ``None`` for that + predecessor. + """ + + def _make_agent(self) -> RemoteA2aAgent: + return RemoteA2aAgent( + name="remote_agent", + agent_card=create_test_agent_card(), + ) + + def test_promotes_text_content_to_output(self): + agent = self._make_agent() + event = Event( + author="remote_agent", + content=genai_types.Content( + role="model", + parts=[genai_types.Part(text="Findings: ok")], + ), + ) + + agent._promote_response_to_output(event) + + assert event.output == "Findings: ok" + assert event.node_info.message_as_output is True + + def test_joins_multiple_text_parts(self): + agent = self._make_agent() + event = Event( + author="remote_agent", + content=genai_types.Content( + role="model", + parts=[ + genai_types.Part(text="line1\n"), + genai_types.Part(text="line2"), + ], + ), + ) + + agent._promote_response_to_output(event) + + assert event.output == "line1\nline2" + + def test_skips_thought_parts(self): + agent = self._make_agent() + event = Event( + author="remote_agent", + content=genai_types.Content( + role="model", + parts=[ + genai_types.Part(text="streaming update", thought=True), + ], + ), + ) + + agent._promote_response_to_output(event) + + assert event.output is None + assert event.node_info.message_as_output is None + + def test_skips_function_call_parts(self): + """input-required events carry a mock function call and no text.""" + agent = self._make_agent() + event = Event( + author="remote_agent", + content=genai_types.Content( + role="model", + parts=[ + genai_types.Part( + function_call=genai_types.FunctionCall( + id="fc1", + name="mock_function_call_for_required_user_input", + args={"input_required": "Please confirm"}, + ) + ), + ], + ), + ) + + agent._promote_response_to_output(event) + + assert event.output is None + + def test_skips_partial_events(self): + agent = self._make_agent() + event = Event( + author="remote_agent", + partial=True, + content=genai_types.Content( + role="model", + parts=[genai_types.Part(text="streaming...")], + ), + ) + + agent._promote_response_to_output(event) + + assert event.output is None + + def test_skips_events_not_authored_by_agent(self): + agent = self._make_agent() + event = Event( + author="some_other_agent", + content=genai_types.Content( + role="model", + parts=[genai_types.Part(text="Not mine")], + ), + ) + + agent._promote_response_to_output(event) + + assert event.output is None + + def test_preserves_existing_output(self): + agent = self._make_agent() + event = Event( + author="remote_agent", + output="preset", + content=genai_types.Content( + role="model", + parts=[genai_types.Part(text="text")], + ), + ) + + agent._promote_response_to_output(event) + + assert event.output == "preset" + + def test_no_content_no_output(self): + agent = self._make_agent() + event = Event(author="remote_agent") + + agent._promote_response_to_output(event) + + assert event.output is None + + @pytest.mark.asyncio + async def test_run_impl_promotes_output_for_each_event(self): + """``_run_impl`` calls ``_promote_response_to_output`` per event. + + Uses a subclass that overrides ``_run_async_impl`` to yield a + deterministic event, then drives ``_run_impl`` through the public + workflow node entry point. + """ + + yielded_event = Event( + author="remote_agent", + content=genai_types.Content( + role="model", + parts=[genai_types.Part(text="agent reply")], + ), + ) + + class _StubRemoteAgent(RemoteA2aAgent): + + async def _run_async_impl(self, ctx): + yield yielded_event + + agent = _StubRemoteAgent( + name="remote_agent", + agent_card=create_test_agent_card(), + ) + + from google.adk.apps.app import App + from google.adk.workflow._join_node import JoinNode + from google.adk.workflow._workflow import Workflow + + from tests.unittests import testing_utils + + workflow = Workflow( + name="wf", + edges=[("START", agent, JoinNode(name="join"))], + ) + app_instance = App(name="t", root_agent=workflow) + runner = testing_utils.InMemoryRunner(app=app_instance) + events = await runner.run_async(testing_utils.get_user_content("start")) + + join_outputs = [ + e + for e in events + if isinstance(e, Event) + and e.output is not None + and "join" in (e.node_info.path or "") + ] + assert join_outputs, "JoinNode should emit an aggregated output event" + assert join_outputs[0].output == {"remote_agent": "agent reply"} From bb6690fd6681fb0ce654640d2c3501fb6888cbd4 Mon Sep 17 00:00:00 2001 From: Harineko0 Date: Tue, 26 May 2026 15:44:28 +0900 Subject: [PATCH 2/2] fix(a2a): Gate workflow output promotion to first terminal A2A event The v2 A2A response handler delegates to converters that do not mark streaming working-state text as thought=True, so the prior fix promoted every non-partial text event to event.output. NodeRunner sets ctx.output from the first one and raises "Output already set" on the next, breaking streaming RemoteA2aAgent workflow nodes before they reach the real final answer. Skip events whose A2A task state is submitted, working, input-required, auth-required, or unknown (read from custom_metadata['a2a:response']), and short-circuit further promotion in _run_impl after the first terminal event so trailing artifact updates on a completed task don't trigger the double-set. --- src/google/adk/agents/remote_a2a_agent.py | 58 ++++++-- .../unittests/agents/test_remote_a2a_agent.py | 126 +++++++++++++++++- 2 files changed, 170 insertions(+), 14 deletions(-) diff --git a/src/google/adk/agents/remote_a2a_agent.py b/src/google/adk/agents/remote_a2a_agent.py index 6d34cb4302..b702f8c715 100644 --- a/src/google/adk/agents/remote_a2a_agent.py +++ b/src/google/adk/agents/remote_a2a_agent.py @@ -815,6 +815,14 @@ async def _run_live_impl( # This makes the function into an async generator but the yield is still unreachable yield + # Task states that represent in-progress or input-awaiting work. + # Events stamped with one of these states carry intermediate or + # waiting-for-input content, never the final answer, so they must + # not be promoted to the workflow node's output. + _NON_FINAL_TASK_STATES = frozenset( + {"submitted", "working", "input-required", "auth-required", "unknown"} + ) + async def _run_impl( self, *, @@ -829,24 +837,51 @@ async def _run_impl( sees ``None`` for each predecessor because ``BaseAgent._run_impl`` never sets ``event.output`` and ``RemoteA2aAgent`` carries its response only in ``event.content``. + + A node may produce at most one output (``Context.output`` raises + ``ValueError`` on a second assignment), so promotion is gated to + the first terminal A2A event of the run. Non-final task states and + later events are passed through untouched. """ + promoted = False async for event in super()._run_impl(ctx=ctx, node_input=node_input): - self._promote_response_to_output(event) + if not promoted and self._promote_response_to_output(event): + promoted = True yield event - def _promote_response_to_output(self, event: Event) -> None: - """Sets ``event.output`` from non-thought text parts, if any. - - Skips partial events, events not authored by this agent, and events - whose content carries only thoughts, function calls, or function - responses (e.g. input-required mock function calls). + def _promote_response_to_output(self, event: Event) -> bool: + """Sets ``event.output`` from non-thought text parts, if eligible. + + Returns True iff this call assigned ``event.output``. Skips: + + * partial events and events whose ``event.output`` is already set; + * events not authored by this agent; + * events whose content carries only thoughts, function calls, or + function responses (e.g. ``input_required`` mock function calls); + * events whose A2A task state is non-final (``submitted``, + ``working``, ``input-required``, ``auth-required``, ``unknown``). + Streaming converters do not always mark ``working`` text as + ``thought=True``, so the task-state check guards against + promoting an intermediate streaming chunk and then raising on the + true final event. """ if event.partial or event.output is not None: - return + return False if event.author != self.name: - return + return False if not event.content or not event.content.parts: - return + return False + + response_meta = (event.custom_metadata or {}).get( + A2A_METADATA_PREFIX + "response" + ) + if isinstance(response_meta, dict): + status = response_meta.get("status") + if ( + isinstance(status, dict) + and status.get("state") in self._NON_FINAL_TASK_STATES + ): + return False text_chunks = [ part.text @@ -857,9 +892,10 @@ def _promote_response_to_output(self, event: Event) -> None: and not part.function_response ] if not text_chunks: - return + return False event.output = "".join(text_chunks) event.node_info.message_as_output = True + return True async def cleanup(self) -> None: """Clean up resources, especially the HTTP client if owned by this agent.""" diff --git a/tests/unittests/agents/test_remote_a2a_agent.py b/tests/unittests/agents/test_remote_a2a_agent.py index 60a64cc400..ac783d040e 100644 --- a/tests/unittests/agents/test_remote_a2a_agent.py +++ b/tests/unittests/agents/test_remote_a2a_agent.py @@ -2970,8 +2970,7 @@ def test_promotes_text_content_to_output(self): ), ) - agent._promote_response_to_output(event) - + assert agent._promote_response_to_output(event) is True assert event.output == "Findings: ok" assert event.node_info.message_as_output is True @@ -3080,10 +3079,131 @@ def test_no_content_no_output(self): agent = self._make_agent() event = Event(author="remote_agent") - agent._promote_response_to_output(event) + assert agent._promote_response_to_output(event) is False + assert event.output is None + + def _make_text_event( + self, text: str = "reply", task_state: str | None = None + ) -> Event: + event = Event( + author="remote_agent", + content=genai_types.Content( + role="model", + parts=[genai_types.Part(text=text)], + ), + ) + if task_state is not None: + event.custom_metadata = { + A2A_METADATA_PREFIX + "response": {"status": {"state": task_state}} + } + return event + + @pytest.mark.parametrize( + "state", + [ + "submitted", + "working", + "input-required", + "auth-required", + "unknown", + ], + ) + def test_skips_non_final_task_states(self, state): + """Streaming converters may leave non-final text un-thoughted. + + The task-state check on ``custom_metadata['a2a:response']`` is the + guard that prevents ``ctx.output`` from being overwritten by an + intermediate event and then raising on the real final event. + """ + agent = self._make_agent() + event = self._make_text_event(text="in-progress chunk", task_state=state) + assert agent._promote_response_to_output(event) is False assert event.output is None + @pytest.mark.parametrize( + "state", + ["completed", "failed", "canceled", "rejected"], + ) + def test_promotes_terminal_task_states(self, state): + agent = self._make_agent() + event = self._make_text_event(text="final answer", task_state=state) + + assert agent._promote_response_to_output(event) is True + assert event.output == "final answer" + + def test_promotes_when_response_metadata_absent(self): + """Non-Task A2A responses (plain Message) carry no task status.""" + agent = self._make_agent() + event = self._make_text_event(text="message reply") + + assert agent._promote_response_to_output(event) is True + assert event.output == "message reply" + + @pytest.mark.asyncio + async def test_run_impl_promotes_only_first_terminal_event(self): + """Guards against ``ValueError: Output already set``. + + When the v2 converter path emits a ``working`` text event followed + by a ``completed`` text event, the first must be passed through + untouched and only the terminal event promoted. After that, any + further promotable event must also be left alone. + """ + + working = self._make_text_event( + text="thinking out loud", task_state="working" + ) + completed = self._make_text_event( + text="final answer", task_state="completed" + ) + trailing = self._make_text_event( + text="ignored trailing artifact", task_state="completed" + ) + + class _StubRemoteAgent(RemoteA2aAgent): + + async def _run_async_impl(self, ctx): + yield working + yield completed + yield trailing + + agent = _StubRemoteAgent( + name="remote_agent", + agent_card=create_test_agent_card(), + ) + + from google.adk.apps.app import App + from google.adk.workflow._join_node import JoinNode + from google.adk.workflow._workflow import Workflow + + from tests.unittests import testing_utils + + workflow = Workflow( + name="wf", + edges=[("START", agent, JoinNode(name="join"))], + ) + app_instance = App(name="t", root_agent=workflow) + runner = testing_utils.InMemoryRunner(app=app_instance) + + events = await runner.run_async(testing_utils.get_user_content("start")) + + # No "Output already set" raised, and the JoinNode aggregates the + # terminal event's text — not the working intermediate, not the + # trailing artifact. + join_outputs = [ + e + for e in events + if isinstance(e, Event) + and e.output is not None + and "join" in (e.node_info.path or "") + ] + assert join_outputs + assert join_outputs[0].output == {"remote_agent": "final answer"} + + assert working.output is None + assert completed.output == "final answer" + assert trailing.output is None + @pytest.mark.asyncio async def test_run_impl_promotes_output_for_each_event(self): """``_run_impl`` calls ``_promote_response_to_output`` per event.