fix(a2a): a2a-sdk 1.x compat + input/output capture#170
Open
SuhaniNagpal7 wants to merge 1 commit into
Open
Conversation
A2AInstrumentor silently no-op'd on a2a-sdk 1.x. The SDK restructured
the client surface in 1.0:
* The class `A2AClient` was renamed to `Client` (re-exported from
`a2a.client`).
* `send_task` and `send_task_streaming` were unified into a single
`Client.send_message(request, *, context=None)` which is `async def`
and returns `AsyncIterator[StreamResponse]` — streaming is the only
mode now.
* Request/response shapes moved to protobuf (`a2a_pb2.SendMessageRequest`
in, `a2a_pb2.StreamResponse` out, oneof of `task` / `message` /
`status_update` / `artifact_update`). The `Part.type` discriminator
was dropped — content type is now encoded by which oneof field is
populated (`text` / `raw` / `url` / `data` / `filename`).
The instrumentor's `_get_client_class` couldn't find `A2AClient`, so
`_instrument()` logged a single warning ("Could not locate A2AClient
class") and returned without patching anything. `instrument()` itself
returned cleanly, so the breakage was invisible — every subsequent A2A
call produced zero spans on a2a-sdk 1.x.
This PR makes the instrumentor work on both API generations and also
fills the Input/Output dashboard panels that were never populated on
either version.
Compat changes (items 1-9)
1. `_resolve_client_class` now also tries `from a2a.client import Client`
and returns a `(class, api_version)` tuple so the patching code knows
which method set to target. Old `A2AClient` path is kept as the v0
fallback.
2. `_methods_for_api` returns `[("send_message", True)]` on v1 and
`[("send_task", False), ("send_task_streaming", True)]` on v0. The
`_a2a_streaming` marker is preserved for symmetry; on v1 it's
always True.
3. `A2AClientWrapper.__call__` now dispatches between `_sync_call` and
`_async_call` based on `inspect.iscoroutinefunction(wrapped)`.
`Client.send_message` is async-only, so it routes to `_async_call`.
The old `__call_async__` was dead code — wrapt only invokes
`__call__` — so it's gone. Switched from the deprecated
`asyncio.iscoroutinefunction` to `inspect.iscoroutinefunction`.
4. `_async_call` awaits the wrapped coroutine and wraps the returned
`AsyncIterator` with `_AsyncStreamingSpanWrapper`, which now also
handles the protobuf event shapes (see #6-#8).
5. W3C header injection (`kwargs["headers"]`) is skipped on v1 because
`Client.send_message` has no `headers` kwarg. The W3C trace-id is
still recorded locally as `gen_ai.a2a.propagated_trace_id`;
cross-wire propagation on v1 will need a `ClientCallInterceptor`
in a follow-up.
6. `get_send_message_request_attributes` reads the protobuf
`SendMessageRequest.message` and yields task_id / role / parts.count
for the pre-call span attributes.
7. `_extract_event_state` walks the new oneof: tries
`event.status_update.status`, then `event.task.status`, then the
v0 `event.status` shape. Returns the state value (string for v0
enums, int for protobuf enums).
8. `_extract_event_artifact` walks the new oneof: tries
`event.artifact_update.artifact`, then `event.task.artifacts[-1]`,
then the v0 `event.artifact` shape.
9. `get_artifact_type` falls back to inferring "text" / "file" /
"data" from the populated v1 `Part` field name when the v0
`.type` discriminator is absent.
Input/Output capture (items 10-14)
Pre-existing gap, not v1-specific: the 0.x wrapper never set FI's
canonical `input.value` / `output.value`, so the dashboard's Input
and Output panels were always blank. Filled in for both versions
because the surface area is the same:
10. Shared text helpers in `_attributes.py` (`_part_text`,
`extract_parts_text`, `extract_message_text`,
`extract_artifact_text`) handle both v0 and v1 part shapes.
11. `_INPUT_VALUE_KEY` / `_OUTPUT_VALUE_KEY` constants source the
keys from `fi_instrumentation.fi_types.SpanAttributes` with a
raw-string fallback so the helpers still work when fi-core
isn't installed.
12. `input.value` is emitted up-front in both
`get_send_task_payload_attributes` (v0 dict payload) and
`get_send_message_request_attributes` (v1 protobuf request),
derived from the user message's text parts.
13. Both streaming wrappers (`_StreamingSpanWrapper` and
`_AsyncStreamingSpanWrapper`) now accumulate text from each
incoming artifact into `_output_fragments` and emit a single
concatenated `output.value` in `_finalize_span`.
14. `get_task_attributes` also emits `output.value` from
`task.artifacts[*].parts` for the v0 non-streaming case (where
a Task is returned directly).
Verified end-to-end against a2a-sdk 1.0.2: a `FakeClient` subclass
returning canned `StreamResponse` events (status_update +
artifact_update + status_update) produces a single `Client.send_message`
span in Future AGI Observe with:
* status OK
* gen_ai.span.kind = A2A_CLIENT
* gen_ai.a2a.task.id = task-smoke-1 (from protobuf SendMessageRequest)
* gen_ai.a2a.streaming = true
* gen_ai.a2a.task.state = 3 (TASK_STATE_COMPLETED)
* gen_ai.a2a.artifact.type = "text"
* gen_ai.a2a.propagated_trace_id = <w3c trace id>
* input.value = the user prompt text
* output.value = the accumulated artifact text
Known limitations (out of scope for this PR)
* Cross-wire W3C propagation on v1 needs a `ClientCallInterceptor`.
* Server-side instrumentation (`_a2a_server.py`) wasn't audited or
changed — if the inbound shape also moved in 1.x it's a separate fix.
* Protobuf enum integers (state=3, role=1) are surfaced as integers,
not their readable names — a small follow-up using
`pb.TaskState.Name(3)` etc.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
A2AInstrumentorsilently no-op'd ona2a-sdk1.x —instrument()returned cleanly with a single WARNING log, no methods were wrapped, every A2A call produced zero spans. The SDK restructured its client surface in 1.0: the class was renamedA2AClient → Client, the two send methods collapsed into a single async-streamingClient.send_message, and request/response types moved to protobuf. The wrapper's_get_client_classcouldn't findA2AClientanywhere, hit its "SDK structure may have changed" warning path, and bailed.This PR (a) makes the instrumentor work on both v0 and v1, and (b) fills the FI dashboard's Input/Output panels which were never populated even on v0. Verified end-to-end against
a2a-sdk1.0.2.What was broken on a2a-sdk 1.x
A2AClientClient_get_client_classreturns None → warning, no patchingsend_task+send_task_streamingsend_message(always async, always streaming)SendMessageRequest_extract_payload'sdict.get(\"payload\")finds nothingevent.artifact/event.statustask/message/status_update/artifact_update_extract_event_state/ artifact access all return NonePart.typeget_artifact_typealways returns Nonekwargs[\"headers\"]passes through to httpxClient.send_messagehas noheaderskwargasync defreturningAsyncIterator__call__would return the coroutine without awaiting; the dead__call_async__was never reached by wraptWhat this patch changes
All three files in
traceai_a2a/:__init__.py_resolve_client_classreturns(class, api_version)and triesa2a.client.Clientfirst, falls back toa2a.client.A2AClient, then togetattr(a2a_module, \"A2AClient\", None)._methods_for_api(api_version)returns[(\"send_message\", True)]on v1 and[(\"send_task\", False), (\"send_task_streaming\", True)]on v0._instrument/_uninstrumentiterate that list instead of hard-coding the two v0 methods._a2a_streamingmarker is still attached to each wrapped function for symmetry — always True on v1._a2a_client.pyA2AClientWrapper.__init__takes anapi_version(default v0 for backward compat with tests that don't pass it).__call__dispatches to_sync_callor_async_callbased oninspect.iscoroutinefunction(wrapped). Removed the dead__call_async__method._async_callawaits the wrapped coroutine, wraps the returnedAsyncIteratorwith_AsyncStreamingSpanWrapper, handles the streaming-only v1 path.headerskwarg). The W3C trace-id is still recorded locally asgen_ai.a2a.propagated_trace_id.Client.send_messageon v1, unchanged on v0._extract_event_statewalks the v1 oneof (event.status_update.status→event.task.status→ fallback to v0'sevent.status)._extract_event_artifactwalks the v1 oneof (event.artifact_update.artifact→event.task.artifacts[-1]→ fallback to v0'sevent.artifact)._output_fragments, joined and emitted asoutput.valuein_finalize_span._attributes.py_part_text,extract_parts_text,extract_message_text,extract_artifact_text(handle both v0 and v1 part shapes).get_send_message_request_attributesextracts task_id / role / parts.count / input text from a protobufSendMessageRequest.get_send_task_payload_attributes(v0) now also emits input text frompayload[\"message\"]parts.get_task_attributes(v0 non-streaming) now also emits output text fromtask.artifacts[*].parts.get_artifact_typefalls back to inferring\"text\"/\"file\"/\"data\"from the populated v1Partfield when the v0.typediscriminator is absent.What's NOT changed (out of scope)
traceparentbecause there's noheaderskwarg. The proper hook isClientCallInterceptor— separate PR._a2a_server.py. Not audited or changed. If the inbound A2A server shape also moved in 1.x, it's a separate fix.gen_ai.a2a.task.state = 3andgen_ai.a2a.message.role = 1surface as integers; readable names would need apb.TaskState.Name(value)translation in_enum_or_str. Cosmetic.[tool.poetry.extras] a2a = [\"a2a-sdk\"]is unpinned and the package now works on both lines, so left alone.Test plan
python -m py_compileon all three changed files — syntax clean.a2a-sdk1.0.2,fi-instrumentation-otel, and the localtraceai-a2ainstalled editable.A2AInstrumentor().instrument()logsWARNING traceai_a2a: Could not locate A2AClient class in a2a module;Client.send_message.__wrapped__is False; zero spans on any call.instrument()runs cleanly;Client.send_message.__wrapped__is True;_a2a_streaming = True.FakeClient(Client)subclass returning a cannedAsyncIteratorofStreamResponse(status_update WORKING → artifact_update with a text Part → status_update COMPLETED). All three events consumed, span flushed.traceai-a2a-smokeproject) shows:gen_ai.span.kind = \"A2A_CLIENT\"gen_ai.a2a.agent.url = \"http://weather-agent.local\"gen_ai.a2a.streaming = truegen_ai.a2a.task.id = \"task-smoke-1\"(extracted from protobufSendMessageRequest.message.task_id)gen_ai.a2a.message.role = 1(ROLE_USER)gen_ai.a2a.message.parts.count = 1gen_ai.a2a.task.state = 3(TASK_STATE_COMPLETED)gen_ai.a2a.artifact.type = \"text\"(inferred from populatedPart.textfield)gen_ai.a2a.propagated_trace_idmatches the trace's own W3C idinput.value)output.value)Files changed
python/frameworks/a2a/traceai_a2a/__init__.py— 78 / 57 ins/delpython/frameworks/a2a/traceai_a2a/_a2a_client.py— 196 / 34 ins/delpython/frameworks/a2a/traceai_a2a/_attributes.py— 137 / 8 ins/del