Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions src/google/adk/agents/remote_a2a_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,88 @@ async def _run_live_impl(
# This makes the function into an async generator but the yield is still unreachable
yield

# Task states that represent in-progress or input-awaiting work.
# Events stamped with one of these states carry intermediate or
# waiting-for-input content, never the final answer, so they must
# not be promoted to the workflow node's output.
_NON_FINAL_TASK_STATES = frozenset(
{"submitted", "working", "input-required", "auth-required", "unknown"}
)

async def _run_impl(
self,
*,
ctx: Any,
node_input: Any,
) -> AsyncGenerator[Any, None]:
"""Runs the agent as a workflow node.

Promotes textual response content to ``event.output`` so the
workflow scheduler propagates it downstream. Without this, a
``JoinNode`` that aggregates parallel ``RemoteA2aAgent`` predecessors
sees ``None`` for each predecessor because ``BaseAgent._run_impl``
never sets ``event.output`` and ``RemoteA2aAgent`` carries its
response only in ``event.content``.

A node may produce at most one output (``Context.output`` raises
``ValueError`` on a second assignment), so promotion is gated to
the first terminal A2A event of the run. Non-final task states and
later events are passed through untouched.
"""
promoted = False
async for event in super()._run_impl(ctx=ctx, node_input=node_input):
if not promoted and self._promote_response_to_output(event):
promoted = True
yield event

def _promote_response_to_output(self, event: Event) -> bool:
"""Sets ``event.output`` from non-thought text parts, if eligible.

Returns True iff this call assigned ``event.output``. Skips:

* partial events and events whose ``event.output`` is already set;
* events not authored by this agent;
* events whose content carries only thoughts, function calls, or
function responses (e.g. ``input_required`` mock function calls);
* events whose A2A task state is non-final (``submitted``,
``working``, ``input-required``, ``auth-required``, ``unknown``).
Streaming converters do not always mark ``working`` text as
``thought=True``, so the task-state check guards against
promoting an intermediate streaming chunk and then raising on the
true final event.
"""
if event.partial or event.output is not None:
return False
if event.author != self.name:
return False
if not event.content or not event.content.parts:
return False

response_meta = (event.custom_metadata or {}).get(
A2A_METADATA_PREFIX + "response"
)
if isinstance(response_meta, dict):
status = response_meta.get("status")
if (
isinstance(status, dict)
and status.get("state") in self._NON_FINAL_TASK_STATES
):
return False

text_chunks = [
part.text
for part in event.content.parts
if part.text
and not part.thought
and not part.function_call
and not part.function_response
]
if not text_chunks:
return False
event.output = "".join(text_chunks)
event.node_info.message_as_output = True
return True

async def cleanup(self) -> None:
"""Clean up resources, especially the HTTP client if owned by this agent."""
if self._httpx_client_needs_cleanup and self._httpx_client:
Expand Down
Loading