From 78bed771030c331f1d4cb9ad0a80d23d3414f3f1 Mon Sep 17 00:00:00 2001 From: Emmanuel Hugonnet Date: Fri, 29 May 2026 14:14:57 +0200 Subject: [PATCH] fix: fix REST client URL construction and SSE stability with HTTP/1.1 - Use string concatenation instead of URI.resolve() for agent card path resolution. - Make HTTP version configurable in JdkA2AHttpClient, defaulting to HTTP/2 for requests and HTTP/1.1 for SSE - Handle HTTP/1.1 cancellation exceptions gracefully in SSE stream shutdown. Signed-off-by: Emmanuel Hugonnet --- .../spi/sse/AbstractSSEEventListener.java | 22 +++++-- .../client/http/A2ACardResolver_v0_3.java | 8 ++- .../http/A2ACardResolver_v0_3_Test.java | 13 ++++ .../sdk/client/http/A2ACardResolver.java | 26 +++++--- .../sdk/client/http/JdkA2AHttpClient.java | 63 ++++++++++++++++--- .../sdk/client/http/A2ACardResolverTest.java | 23 +++++-- .../sdk/grpc/utils/JSONRPCUtilsTest.java | 54 ++++++++++++++++ 7 files changed, 184 insertions(+), 25 deletions(-) diff --git a/client/transport/spi/src/main/java/org/a2aproject/sdk/client/transport/spi/sse/AbstractSSEEventListener.java b/client/transport/spi/src/main/java/org/a2aproject/sdk/client/transport/spi/sse/AbstractSSEEventListener.java index b587fdb5d..ad535c604 100644 --- a/client/transport/spi/src/main/java/org/a2aproject/sdk/client/transport/spi/sse/AbstractSSEEventListener.java +++ b/client/transport/spi/src/main/java/org/a2aproject/sdk/client/transport/spi/sse/AbstractSSEEventListener.java @@ -1,5 +1,6 @@ package org.a2aproject.sdk.client.transport.spi.sse; +import java.io.UncheckedIOException; import java.util.concurrent.Future; import java.util.function.Consumer; import java.util.logging.Logger; @@ -76,9 +77,7 @@ public void onError(Throwable throwable, @Nullable Future future) { if (errorHandler != null) { errorHandler.accept(throwable); } - if (future != null) { - future.cancel(true); // close SSE channel - } + cancelQuietly(future); } /** @@ -97,7 +96,22 @@ protected void handleEvent(StreamingEventKind event, @Nullable Future futu // This covers late subscriptions to completed tasks and ensures no connection leaks if (shouldAutoClose(event) && future != null) { log.fine("Auto-closing SSE connection for final event: " + event.getClass().getSimpleName()); - future.cancel(true); // close SSE channel + cancelQuietly(future); + } + } + + /** + * Cancels a future without propagating exceptions. + * HTTP/1.1 connections may throw {@link java.io.IOException} synchronously + * when cancelled, unlike HTTP/2 which uses a clean RST_STREAM. + */ + private static void cancelQuietly(@Nullable Future future) { + if (future != null) { + try { + future.cancel(true); + } catch (UncheckedIOException e) { + log.fine("Exception during SSE connection close: " + e.getMessage()); + } } } diff --git a/compat-0.3/client/transport/spi/src/main/java/org/a2aproject/sdk/compat03/client/http/A2ACardResolver_v0_3.java b/compat-0.3/client/transport/spi/src/main/java/org/a2aproject/sdk/compat03/client/http/A2ACardResolver_v0_3.java index 2d91eb195..17a56a3dd 100644 --- a/compat-0.3/client/transport/spi/src/main/java/org/a2aproject/sdk/compat03/client/http/A2ACardResolver_v0_3.java +++ b/compat-0.3/client/transport/spi/src/main/java/org/a2aproject/sdk/compat03/client/http/A2ACardResolver_v0_3.java @@ -5,6 +5,9 @@ import java.net.URISyntaxException; import java.util.Map; +import static org.a2aproject.sdk.util.Assert.checkNotNullParam; + +import org.a2aproject.sdk.client.http.A2ACardResolver; import org.a2aproject.sdk.client.http.A2AHttpClient; import org.a2aproject.sdk.client.http.A2AHttpClientFactory; import org.a2aproject.sdk.client.http.A2AHttpResponse; @@ -65,13 +68,16 @@ public A2ACardResolver_v0_3(A2AHttpClient httpClient, String baseUrl, String age */ public A2ACardResolver_v0_3(A2AHttpClient httpClient, String baseUrl, @Nullable String agentCardPath, @Nullable Map authHeaders) throws A2AClientError_v0_3 { + checkNotNullParam("httpClient", httpClient); + checkNotNullParam("baseUrl", baseUrl); this.httpClient = httpClient; String effectiveAgentCardPath = agentCardPath == null || agentCardPath.isEmpty() ? DEFAULT_AGENT_CARD_PATH : agentCardPath; try { - this.url = new URI(baseUrl).resolve(effectiveAgentCardPath).toString(); + new URI(baseUrl); // validate syntax only — URI.resolve() would silently drop sub-paths } catch (URISyntaxException e) { throw new A2AClientError_v0_3("Invalid agent URL", e); } + this.url = A2ACardResolver.buildCardUrl(baseUrl, effectiveAgentCardPath); this.authHeaders = authHeaders; } diff --git a/compat-0.3/client/transport/spi/src/test/java/org/a2aproject/sdk/compat03/client/http/A2ACardResolver_v0_3_Test.java b/compat-0.3/client/transport/spi/src/test/java/org/a2aproject/sdk/compat03/client/http/A2ACardResolver_v0_3_Test.java index 1b6e2736e..6fe4ca3b0 100644 --- a/compat-0.3/client/transport/spi/src/test/java/org/a2aproject/sdk/compat03/client/http/A2ACardResolver_v0_3_Test.java +++ b/compat-0.3/client/transport/spi/src/test/java/org/a2aproject/sdk/compat03/client/http/A2ACardResolver_v0_3_Test.java @@ -62,6 +62,19 @@ public void testConstructorStripsSlashes() throws Exception { card = resolver.getAgentCard(); assertEquals("http://example.com" + AGENT_CARD_PATH, client.url); + + // baseUrl with sub-path and trailing slash — the original URI.resolve() bug silently + // dropped the sub-path, producing http://example.com/.well-known/agent-card.json instead + resolver = new A2ACardResolver_v0_3(client, "http://example.com/jsonrpc/", AGENT_CARD_PATH); + card = resolver.getAgentCard(); + + assertEquals("http://example.com/jsonrpc" + AGENT_CARD_PATH, client.url); + + // baseUrl with sub-path, no trailing slash + resolver = new A2ACardResolver_v0_3(client, "http://example.com/jsonrpc", AGENT_CARD_PATH); + card = resolver.getAgentCard(); + + assertEquals("http://example.com/jsonrpc" + AGENT_CARD_PATH, client.url); } diff --git a/http-client/src/main/java/org/a2aproject/sdk/client/http/A2ACardResolver.java b/http-client/src/main/java/org/a2aproject/sdk/client/http/A2ACardResolver.java index e8ee10397..2bf8d5509 100644 --- a/http-client/src/main/java/org/a2aproject/sdk/client/http/A2ACardResolver.java +++ b/http-client/src/main/java/org/a2aproject/sdk/client/http/A2ACardResolver.java @@ -2,8 +2,6 @@ import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; import java.util.Map; import org.a2aproject.sdk.grpc.utils.JSONRPCUtils; @@ -145,14 +143,28 @@ public A2ACardResolver(A2AHttpClient httpClient, String baseUrl, @Nullable Strin this.httpClient = httpClient; String effectiveAgentCardPath = (agentCardPath == null || agentCardPath.isEmpty()) ? DEFAULT_AGENT_CARD_PATH : agentCardPath; - try { - this.url = new URI(org.a2aproject.sdk.util.Utils.buildBaseUrl(new AgentInterface("JSONRPC", baseUrl, ""), tenant)).resolve(effectiveAgentCardPath).toString(); - } catch (URISyntaxException e) { - throw new A2AClientError("Invalid agent URL", e); - } + String baseUrlStr = org.a2aproject.sdk.util.Utils.buildBaseUrl(new AgentInterface("JSONRPC", baseUrl, ""), tenant); + this.url = buildCardUrl(baseUrlStr, effectiveAgentCardPath); this.authHeaders = authHeaders; } + /** + * Normalizes {@code baseUrl} and {@code cardPath} and concatenates them into a full card URL. + * + *

Strips any trailing slash from {@code baseUrl} and ensures {@code cardPath} starts with + * a leading slash before concatenating, so both {@code http://host/base/} and + * {@code http://host/base} produce the same result. + * + * @param baseUrl the agent base URL, must not be null + * @param cardPath the card endpoint path, must not be null + * @return the normalized card URL + */ + public static String buildCardUrl(String baseUrl, String cardPath) { + String cleanBase = baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl; + String normalizedPath = cardPath.startsWith("/") ? cardPath : "/" + cardPath; + return cleanBase + normalizedPath; + } + /** * Get the agent card for the configured A2A agent. * diff --git a/http-client/src/main/java/org/a2aproject/sdk/client/http/JdkA2AHttpClient.java b/http-client/src/main/java/org/a2aproject/sdk/client/http/JdkA2AHttpClient.java index 7499d52ac..9880bb0fb 100644 --- a/http-client/src/main/java/org/a2aproject/sdk/client/http/JdkA2AHttpClient.java +++ b/http-client/src/main/java/org/a2aproject/sdk/client/http/JdkA2AHttpClient.java @@ -19,6 +19,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Flow; import java.util.concurrent.atomic.AtomicBoolean; @@ -33,7 +34,7 @@ *

This is the fallback implementation used when no higher-priority * {@link A2AHttpClientProvider} is available. It provides full support for: *

    - *
  • HTTP/2 with automatic fallback to HTTP/1.1
  • + *
  • HTTP/2 for regular requests, HTTP/1.1 for SSE streaming (configurable per request type)
  • *
  • Synchronous GET, POST, and DELETE requests
  • *
  • Asynchronous Server-Sent Events (SSE) streaming
  • *
  • Automatic redirect following
  • @@ -52,19 +53,21 @@ public class JdkA2AHttpClient implements A2AHttpClient { private final HttpClient httpClient; + private final HttpClient.Version requestVersion; + private final HttpClient.Version sseVersion; /** * Creates a new JDK-based HTTP client. * *

    Configures the client with: *

      - *
    • HTTP/2 preferred (with HTTP/1.1 fallback)
    • + *
    • HTTP/2 for regular requests (with HTTP/1.1 fallback)
    • + *
    • HTTP/1.1 for SSE streaming connections
    • *
    • Normal redirect following
    • *
    */ public JdkA2AHttpClient() { this(HttpClient.newBuilder() - .version(HttpClient.Version.HTTP_2) .followRedirects(HttpClient.Redirect.NORMAL) .build()); } @@ -72,11 +75,27 @@ public JdkA2AHttpClient() { /** * Creates a new JDK-based HTTP client using a caller-provided JDK {@link HttpClient}. * + *

    Uses HTTP/2 for regular requests and HTTP/1.1 for SSE streaming connections. + * * @param httpClient the JDK HTTP client to delegate requests to * @throws IllegalArgumentException if {@code httpClient} is {@code null} */ public JdkA2AHttpClient(HttpClient httpClient) { + this(httpClient, HttpClient.Version.HTTP_2, HttpClient.Version.HTTP_1_1); + } + + /** + * Creates a new JDK-based HTTP client with configurable HTTP versions. + * + * @param httpClient the JDK HTTP client to delegate requests to + * @param requestVersion the HTTP version to use for regular (non-SSE) requests + * @param sseVersion the HTTP version to use for SSE streaming connections + * @throws IllegalArgumentException if any parameter is {@code null} + */ + public JdkA2AHttpClient(HttpClient httpClient, HttpClient.Version requestVersion, HttpClient.Version sseVersion) { this.httpClient = checkNotNullParam("httpClient", httpClient); + this.requestVersion = checkNotNullParam("requestVersion", requestVersion); + this.sseVersion = checkNotNullParam("sseVersion", sseVersion); } @Override @@ -125,9 +144,25 @@ T self() { return (T) this; } + static boolean isCancellation(Throwable t) { + while (t != null) { + if (t instanceof CancellationException) { + return true; + } + // HTTP/1.1: Http1Exchange.cancel() throws IOException synchronously with this + // hardcoded English message (not locale-sensitive — no resource bundle is used) + if (t instanceof IOException && "Request cancelled".equals(t.getMessage())) { + return true; + } + t = t.getCause(); + } + return false; + } + protected HttpRequest.Builder createRequestBuilder() throws IOException { HttpRequest.Builder builder = HttpRequest.newBuilder() - .uri(URI.create(url)); + .uri(URI.create(url)) + .version(requestVersion); for (Map.Entry headerEntry : headers.entrySet()) { builder.header(headerEntry.getKey(), headerEntry.getValue()); } @@ -143,6 +178,7 @@ protected CompletableFuture asyncRequest( ServerSentEventParser sseParser = new ServerSentEventParser(messageConsumer, errorConsumer); AtomicBoolean useSseParser = new AtomicBoolean(false); AtomicBoolean errorNotified = new AtomicBoolean(false); + AtomicBoolean completeNotified = new AtomicBoolean(false); StringBuilder nonSseBodyBuffer = new StringBuilder(); Flow.Subscriber subscriber = new Flow.Subscriber() { @@ -174,6 +210,14 @@ public void onNext(String item) { @Override public void onError(Throwable throwable) { + if (isCancellation(throwable)) { + // Treat cancellation as clean completion so completeRunnable fires and + // callers can distinguish a deliberate cancel from a real error. + // onComplete() guards against double-invocation via completeNotified, so + // calling it here is safe even if the publisher also delivers onComplete(). + onComplete(); + return; + } if (errorNotified.compareAndSet(false, true)) { errorConsumer.accept(throwable); } @@ -184,7 +228,7 @@ public void onError(Throwable throwable) { @Override public void onComplete() { - if (!errorNotified.get()) { + if (!errorNotified.get() && completeNotified.compareAndSet(false, true)) { if (useSseParser.get()) { sseParser.flush(); } else { @@ -248,7 +292,8 @@ public void onComplete() { // normally — consistent with the Vert.x and Android implementations. return httpClient.sendAsync(request, bodyHandler) .handle((response, throwable) -> { - if (throwable != null && errorNotified.compareAndSet(false, true)) { + if (throwable != null && !isCancellation(throwable) + && errorNotified.compareAndSet(false, true)) { Throwable cause = throwable.getCause() != null ? throwable.getCause() : throwable; errorConsumer.accept(cause); } @@ -262,7 +307,8 @@ private class JdkGetBuilder extends JdkBuilder implements A2AHttpCli private HttpRequest.Builder createRequestBuilder(boolean SSE) throws IOException { HttpRequest.Builder builder = super.createRequestBuilder().GET(); if (SSE) { - builder.header(ACCEPT, EVENT_STREAM); + builder.header(ACCEPT, EVENT_STREAM) + .version(sseVersion); } return builder; } @@ -327,7 +373,8 @@ private HttpRequest.Builder createRequestBuilder(boolean SSE) throws IOException HttpRequest.Builder builder = super.createRequestBuilder() .POST(HttpRequest.BodyPublishers.ofString(body, StandardCharsets.UTF_8)); if (SSE) { - builder.header(ACCEPT, EVENT_STREAM); + builder.header(ACCEPT, EVENT_STREAM) + .version(sseVersion); } return builder; } diff --git a/http-client/src/test/java/org/a2aproject/sdk/client/http/A2ACardResolverTest.java b/http-client/src/test/java/org/a2aproject/sdk/client/http/A2ACardResolverTest.java index b89476185..a8fd6f1bf 100644 --- a/http-client/src/test/java/org/a2aproject/sdk/client/http/A2ACardResolverTest.java +++ b/http-client/src/test/java/org/a2aproject/sdk/client/http/A2ACardResolverTest.java @@ -39,29 +39,42 @@ public void testConstructorStripsSlashes() throws Exception { assertEquals("http://example.com" + AGENT_CARD_PATH, client.url); - // baseUrl with trailing slash, agentCardParth with leading slash - resolver = new A2ACardResolver(client, "http://example.com/", AGENT_CARD_PATH); + // baseUrl with trailing slash, agentCardPath with leading slash + resolver = new A2ACardResolver(client, "http://example.com/", null, AGENT_CARD_PATH); card = resolver.getAgentCard(); assertEquals("http://example.com" + AGENT_CARD_PATH, client.url); // baseUrl without trailing slash, agentCardPath with leading slash - resolver = new A2ACardResolver(client, "http://example.com", AGENT_CARD_PATH); + resolver = new A2ACardResolver(client, "http://example.com", null, AGENT_CARD_PATH); card = resolver.getAgentCard(); assertEquals("http://example.com" + AGENT_CARD_PATH, client.url); // baseUrl with trailing slash, agentCardPath without leading slash - resolver = new A2ACardResolver(client, "http://example.com/", AGENT_CARD_PATH.substring(1)); + resolver = new A2ACardResolver(client, "http://example.com/", null, AGENT_CARD_PATH.substring(1)); card = resolver.getAgentCard(); assertEquals("http://example.com" + AGENT_CARD_PATH, client.url); // baseUrl without trailing slash, agentCardPath without leading slash - resolver = new A2ACardResolver(client, "http://example.com", AGENT_CARD_PATH.substring(1)); + resolver = new A2ACardResolver(client, "http://example.com", null, AGENT_CARD_PATH.substring(1)); card = resolver.getAgentCard(); assertEquals("http://example.com" + AGENT_CARD_PATH, client.url); + + // baseUrl with sub-path and trailing slash — the original URI.resolve() bug silently + // dropped the sub-path, producing http://example.com/.well-known/agent-card.json instead + resolver = new A2ACardResolver(client, "http://example.com/jsonrpc/", null, AGENT_CARD_PATH); + card = resolver.getAgentCard(); + + assertEquals("http://example.com/jsonrpc" + AGENT_CARD_PATH, client.url); + + // baseUrl with sub-path, no trailing slash + resolver = new A2ACardResolver(client, "http://example.com/jsonrpc", null, AGENT_CARD_PATH); + card = resolver.getAgentCard(); + + assertEquals("http://example.com/jsonrpc" + AGENT_CARD_PATH, client.url); } diff --git a/spec-grpc/src/test/java/org/a2aproject/sdk/grpc/utils/JSONRPCUtilsTest.java b/spec-grpc/src/test/java/org/a2aproject/sdk/grpc/utils/JSONRPCUtilsTest.java index 0a89eb194..782d41093 100644 --- a/spec-grpc/src/test/java/org/a2aproject/sdk/grpc/utils/JSONRPCUtilsTest.java +++ b/spec-grpc/src/test/java/org/a2aproject/sdk/grpc/utils/JSONRPCUtilsTest.java @@ -22,6 +22,7 @@ import org.a2aproject.sdk.jsonrpc.common.wrappers.GetTaskPushNotificationConfigResponse; import org.a2aproject.sdk.jsonrpc.common.wrappers.CreateTaskPushNotificationConfigRequest; import org.a2aproject.sdk.jsonrpc.common.wrappers.CreateTaskPushNotificationConfigResponse; +import org.a2aproject.sdk.spec.AgentCard; import org.a2aproject.sdk.spec.InvalidParamsError; import org.a2aproject.sdk.util.ErrorDetail; import org.a2aproject.sdk.spec.JSONParseError; @@ -484,4 +485,57 @@ public void testToJsonRPCErrorResponse_RoundTrip() throws Exception { assertEquals(-32001, response.getError().getCode()); assertEquals("Custom message", response.getError().getMessage()); } + + @Test + public void testParseAgentCard() throws Exception { + String agentCardPayload = """ + { + "defaultInputModes": [ + "text" + ], + "defaultOutputModes": [ + "text" + ], + "description": "Multi-transport Go agent with A2A v0.3 compatibility.", + "name": "ITK v10 Agent", + "version": "1.0.0-alpha", + "skills": [], + "capabilities": { + "streaming": true + }, + "supportedInterfaces": [ + { + "url": "http://127.0.0.1:59679/jsonrpc", + "protocolBinding": "JSONRPC", + "protocolVersion": "1.0" + }, + { + "url": "http://127.0.0.1:59679", + "protocolBinding": "JSONRPC", + "protocolVersion": "0.3" + }, + { + "url": "http://127.0.0.1:59679/rest", + "protocolBinding": "HTTP+JSON", + "protocolVersion": "1.0" + }, + { + "url": "127.0.0.1:39915", + "protocolBinding": "GRPC", + "protocolVersion": "1.0" + } + ], + "url": "http://127.0.0.1:59679", + "protocolVersion": "0.3", + "preferredTransport": "JSONRPC" + } + """; + org.a2aproject.sdk.grpc.AgentCard.Builder agentCardBuilder = org.a2aproject.sdk.grpc.AgentCard.newBuilder(); + JSONRPCUtils.parseJsonString(agentCardPayload, agentCardBuilder, "", true); + AgentCard agentCard = ProtoUtils.FromProto.agentCard(agentCardBuilder); + assertEquals("ITK v10 Agent", agentCard.name()); + assertEquals("Multi-transport Go agent with A2A v0.3 compatibility.", agentCard.description()); + assertEquals("1.0.0-alpha", agentCard.version()); + assertEquals(4, agentCard.supportedInterfaces().size()); + } }