Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.a2aproject.sdk.client.transport.spi.sse;

import java.io.UncheckedIOException;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.logging.Logger;
Expand Down Expand Up @@ -76,9 +77,7 @@ public void onError(Throwable throwable, @Nullable Future<Void> future) {
if (errorHandler != null) {
errorHandler.accept(throwable);
}
if (future != null) {
future.cancel(true); // close SSE channel
}
cancelQuietly(future);
}

/**
Expand All @@ -97,7 +96,22 @@ protected void handleEvent(StreamingEventKind event, @Nullable Future<Void> futu
// This covers late subscriptions to completed tasks and ensures no connection leaks
if (shouldAutoClose(event) && future != null) {
log.fine("Auto-closing SSE connection for final event: " + event.getClass().getSimpleName());
future.cancel(true); // close SSE channel
cancelQuietly(future);
}
}

/**
* Cancels a future without propagating exceptions.
* HTTP/1.1 connections may throw {@link java.io.IOException} synchronously
* when cancelled, unlike HTTP/2 which uses a clean RST_STREAM.
*/
private static void cancelQuietly(@Nullable Future<Void> future) {
if (future != null) {
try {
future.cancel(true);
} catch (UncheckedIOException e) {
log.fine("Exception during SSE connection close: " + e.getMessage());
}
}
}
Comment on lines +108 to 116
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To ensure that the future cancellation is truly quiet, it is safer to catch RuntimeException instead of only UncheckedIOException. Some custom Future implementations or downstream cancellation handlers might throw other runtime exceptions (such as IllegalStateException or UnsupportedOperationException), which would otherwise propagate and disrupt the stream processing.

Suggested change
private static void cancelQuietly(@Nullable Future<Void> future) {
if (future != null) {
try {
future.cancel(true);
} catch (UncheckedIOException e) {
log.fine("Exception during SSE connection close: " + e.getMessage());
}
}
}
private static void cancelQuietly(@Nullable Future<Void> future) {
if (future != null) {
try {
future.cancel(true);
} catch (RuntimeException e) {
log.fine("Exception during SSE connection close: " + e.getMessage());
}
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import java.net.URISyntaxException;
import java.util.Map;

import static org.a2aproject.sdk.util.Assert.checkNotNullParam;

import org.a2aproject.sdk.client.http.A2ACardResolver;
import org.a2aproject.sdk.client.http.A2AHttpClient;
import org.a2aproject.sdk.client.http.A2AHttpClientFactory;
import org.a2aproject.sdk.client.http.A2AHttpResponse;
Expand Down Expand Up @@ -65,13 +68,16 @@ public A2ACardResolver_v0_3(A2AHttpClient httpClient, String baseUrl, String age
*/
public A2ACardResolver_v0_3(A2AHttpClient httpClient, String baseUrl, @Nullable String agentCardPath,
@Nullable Map<String, String> authHeaders) throws A2AClientError_v0_3 {
checkNotNullParam("httpClient", httpClient);
checkNotNullParam("baseUrl", baseUrl);
this.httpClient = httpClient;
String effectiveAgentCardPath = agentCardPath == null || agentCardPath.isEmpty() ? DEFAULT_AGENT_CARD_PATH : agentCardPath;
try {
this.url = new URI(baseUrl).resolve(effectiveAgentCardPath).toString();
new URI(baseUrl); // validate syntax only — URI.resolve() would silently drop sub-paths
} catch (URISyntaxException e) {
throw new A2AClientError_v0_3("Invalid agent URL", e);
}
Comment thread
ehsavoie marked this conversation as resolved.
this.url = A2ACardResolver.buildCardUrl(baseUrl, effectiveAgentCardPath);
this.authHeaders = authHeaders;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,19 @@ public void testConstructorStripsSlashes() throws Exception {
card = resolver.getAgentCard();

assertEquals("http://example.com" + AGENT_CARD_PATH, client.url);

// baseUrl with sub-path and trailing slash — the original URI.resolve() bug silently
// dropped the sub-path, producing http://example.com/.well-known/agent-card.json instead
resolver = new A2ACardResolver_v0_3(client, "http://example.com/jsonrpc/", AGENT_CARD_PATH);
card = resolver.getAgentCard();

assertEquals("http://example.com/jsonrpc" + AGENT_CARD_PATH, client.url);

// baseUrl with sub-path, no trailing slash
resolver = new A2ACardResolver_v0_3(client, "http://example.com/jsonrpc", AGENT_CARD_PATH);
card = resolver.getAgentCard();

assertEquals("http://example.com/jsonrpc" + AGENT_CARD_PATH, client.url);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@


import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;

import org.a2aproject.sdk.grpc.utils.JSONRPCUtils;
Expand Down Expand Up @@ -145,14 +143,28 @@ public A2ACardResolver(A2AHttpClient httpClient, String baseUrl, @Nullable Strin

this.httpClient = httpClient;
String effectiveAgentCardPath = (agentCardPath == null || agentCardPath.isEmpty()) ? DEFAULT_AGENT_CARD_PATH : agentCardPath;
try {
this.url = new URI(org.a2aproject.sdk.util.Utils.buildBaseUrl(new AgentInterface("JSONRPC", baseUrl, ""), tenant)).resolve(effectiveAgentCardPath).toString();
} catch (URISyntaxException e) {
throw new A2AClientError("Invalid agent URL", e);
}
String baseUrlStr = org.a2aproject.sdk.util.Utils.buildBaseUrl(new AgentInterface("JSONRPC", baseUrl, ""), tenant);
this.url = buildCardUrl(baseUrlStr, effectiveAgentCardPath);
Comment thread
ehsavoie marked this conversation as resolved.
this.authHeaders = authHeaders;
}

/**
* Normalizes {@code baseUrl} and {@code cardPath} and concatenates them into a full card URL.
*
* <p>Strips any trailing slash from {@code baseUrl} and ensures {@code cardPath} starts with
* a leading slash before concatenating, so both {@code http://host/base/} and
* {@code http://host/base} produce the same result.
*
* @param baseUrl the agent base URL, must not be null
* @param cardPath the card endpoint path, must not be null
* @return the normalized card URL
*/
public static String buildCardUrl(String baseUrl, String cardPath) {
String cleanBase = baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl;
String normalizedPath = cardPath.startsWith("/") ? cardPath : "/" + cardPath;
return cleanBase + normalizedPath;
}

/**
* Get the agent card for the configured A2A agent.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -33,7 +34,7 @@
* <p>This is the fallback implementation used when no higher-priority
* {@link A2AHttpClientProvider} is available. It provides full support for:
* <ul>
* <li>HTTP/2 with automatic fallback to HTTP/1.1</li>
* <li>HTTP/2 for regular requests, HTTP/1.1 for SSE streaming (configurable per request type)</li>
* <li>Synchronous GET, POST, and DELETE requests</li>
* <li>Asynchronous Server-Sent Events (SSE) streaming</li>
* <li>Automatic redirect following</li>
Expand All @@ -52,31 +53,49 @@
public class JdkA2AHttpClient implements A2AHttpClient {

private final HttpClient httpClient;
private final HttpClient.Version requestVersion;
private final HttpClient.Version sseVersion;

/**
* Creates a new JDK-based HTTP client.
*
* <p>Configures the client with:
* <ul>
* <li>HTTP/2 preferred (with HTTP/1.1 fallback)</li>
* <li>HTTP/2 for regular requests (with HTTP/1.1 fallback)</li>
* <li>HTTP/1.1 for SSE streaming connections</li>
* <li>Normal redirect following</li>
* </ul>
*/
public JdkA2AHttpClient() {
this(HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_2)
.followRedirects(HttpClient.Redirect.NORMAL)
.build());
}

/**
* Creates a new JDK-based HTTP client using a caller-provided JDK {@link HttpClient}.
*
* <p>Uses HTTP/2 for regular requests and HTTP/1.1 for SSE streaming connections.
*
* @param httpClient the JDK HTTP client to delegate requests to
* @throws IllegalArgumentException if {@code httpClient} is {@code null}
*/
public JdkA2AHttpClient(HttpClient httpClient) {
this(httpClient, HttpClient.Version.HTTP_2, HttpClient.Version.HTTP_1_1);
}

/**
* Creates a new JDK-based HTTP client with configurable HTTP versions.
*
* @param httpClient the JDK HTTP client to delegate requests to
* @param requestVersion the HTTP version to use for regular (non-SSE) requests
* @param sseVersion the HTTP version to use for SSE streaming connections
* @throws IllegalArgumentException if any parameter is {@code null}
*/
public JdkA2AHttpClient(HttpClient httpClient, HttpClient.Version requestVersion, HttpClient.Version sseVersion) {
this.httpClient = checkNotNullParam("httpClient", httpClient);
this.requestVersion = checkNotNullParam("requestVersion", requestVersion);
this.sseVersion = checkNotNullParam("sseVersion", sseVersion);
}

@Override
Expand Down Expand Up @@ -125,9 +144,25 @@ T self() {
return (T) this;
}

static boolean isCancellation(Throwable t) {
while (t != null) {
if (t instanceof CancellationException) {
return true;
}
// HTTP/1.1: Http1Exchange.cancel() throws IOException synchronously with this
// hardcoded English message (not locale-sensitive — no resource bundle is used)
if (t instanceof IOException && "Request cancelled".equals(t.getMessage())) {
return true;
}
t = t.getCause();
}
return false;
}
Comment on lines +147 to +160
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Traversing exception causes using a simple while (t != null) loop can lead to an infinite loop and thread hang if there is a cyclic reference in the exception chain (e.g., if an exception has itself or a parent exception as its cause). Using a set of visited throwables guards against this scenario.

Suggested change
static boolean isCancellation(Throwable t) {
while (t != null) {
if (t instanceof CancellationException) {
return true;
}
// HTTP/1.1: Http1Exchange.cancel() throws IOException synchronously with this
// hardcoded English message (not locale-sensitive — no resource bundle is used)
if (t instanceof IOException && "Request cancelled".equals(t.getMessage())) {
return true;
}
t = t.getCause();
}
return false;
}
static boolean isCancellation(Throwable t) {
java.util.Set<Throwable> visited = java.util.Collections.newSetFromMap(new java.util.IdentityHashMap<>());
while (t != null && visited.add(t)) {
if (t instanceof CancellationException) {
return true;
}
// HTTP/1.1: Http1Exchange.cancel() throws IOException synchronously with this
// hardcoded English message (not locale-sensitive — no resource bundle is used)
if (t instanceof IOException && "Request cancelled".equals(t.getMessage())) {
return true;
}
t = t.getCause();
}
return false;
}


protected HttpRequest.Builder createRequestBuilder() throws IOException {
HttpRequest.Builder builder = HttpRequest.newBuilder()
.uri(URI.create(url));
.uri(URI.create(url))
.version(requestVersion);
for (Map.Entry<String, String> headerEntry : headers.entrySet()) {
builder.header(headerEntry.getKey(), headerEntry.getValue());
}
Expand All @@ -143,6 +178,7 @@ protected CompletableFuture<Void> asyncRequest(
ServerSentEventParser sseParser = new ServerSentEventParser(messageConsumer, errorConsumer);
AtomicBoolean useSseParser = new AtomicBoolean(false);
AtomicBoolean errorNotified = new AtomicBoolean(false);
AtomicBoolean completeNotified = new AtomicBoolean(false);
StringBuilder nonSseBodyBuffer = new StringBuilder();

Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {
Expand Down Expand Up @@ -174,6 +210,14 @@ public void onNext(String item) {

@Override
public void onError(Throwable throwable) {
if (isCancellation(throwable)) {
// Treat cancellation as clean completion so completeRunnable fires and
// callers can distinguish a deliberate cancel from a real error.
// onComplete() guards against double-invocation via completeNotified, so
// calling it here is safe even if the publisher also delivers onComplete().
onComplete();
return;
}
if (errorNotified.compareAndSet(false, true)) {
errorConsumer.accept(throwable);
}
Expand All @@ -184,7 +228,7 @@ public void onError(Throwable throwable) {

@Override
public void onComplete() {
if (!errorNotified.get()) {
if (!errorNotified.get() && completeNotified.compareAndSet(false, true)) {
if (useSseParser.get()) {
sseParser.flush();
} else {
Expand Down Expand Up @@ -248,7 +292,8 @@ public void onComplete() {
// normally — consistent with the Vert.x and Android implementations.
return httpClient.sendAsync(request, bodyHandler)
.<Void>handle((response, throwable) -> {
if (throwable != null && errorNotified.compareAndSet(false, true)) {
if (throwable != null && !isCancellation(throwable)
&& errorNotified.compareAndSet(false, true)) {
Comment on lines 294 to +296
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

If the stream has already completed cleanly (i.e., completeNotified is true), any subsequent error received in the handle block (such as a connection teardown or socket close exception) should be ignored. Otherwise, it might trigger the errorConsumer after a successful completion, leading to unexpected behavior.

Suggested change
.<Void>handle((response, throwable) -> {
if (throwable != null && errorNotified.compareAndSet(false, true)) {
if (throwable != null && !isCancellation(throwable)
&& errorNotified.compareAndSet(false, true)) {
.<Void>handle((response, throwable) -> {
if (throwable != null && !isCancellation(throwable)
&& !completeNotified.get()
&& errorNotified.compareAndSet(false, true)) {

Throwable cause = throwable.getCause() != null ? throwable.getCause() : throwable;
errorConsumer.accept(cause);
}
Expand All @@ -262,7 +307,8 @@ private class JdkGetBuilder extends JdkBuilder<GetBuilder> implements A2AHttpCli
private HttpRequest.Builder createRequestBuilder(boolean SSE) throws IOException {
HttpRequest.Builder builder = super.createRequestBuilder().GET();
if (SSE) {
builder.header(ACCEPT, EVENT_STREAM);
builder.header(ACCEPT, EVENT_STREAM)
.version(sseVersion);
}
return builder;
}
Expand Down Expand Up @@ -327,7 +373,8 @@ private HttpRequest.Builder createRequestBuilder(boolean SSE) throws IOException
HttpRequest.Builder builder = super.createRequestBuilder()
.POST(HttpRequest.BodyPublishers.ofString(body, StandardCharsets.UTF_8));
if (SSE) {
builder.header(ACCEPT, EVENT_STREAM);
builder.header(ACCEPT, EVENT_STREAM)
.version(sseVersion);
}
return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,29 +39,42 @@ public void testConstructorStripsSlashes() throws Exception {

assertEquals("http://example.com" + AGENT_CARD_PATH, client.url);

// baseUrl with trailing slash, agentCardParth with leading slash
resolver = new A2ACardResolver(client, "http://example.com/", AGENT_CARD_PATH);
// baseUrl with trailing slash, agentCardPath with leading slash
resolver = new A2ACardResolver(client, "http://example.com/", null, AGENT_CARD_PATH);
card = resolver.getAgentCard();

assertEquals("http://example.com" + AGENT_CARD_PATH, client.url);

// baseUrl without trailing slash, agentCardPath with leading slash
resolver = new A2ACardResolver(client, "http://example.com", AGENT_CARD_PATH);
resolver = new A2ACardResolver(client, "http://example.com", null, AGENT_CARD_PATH);
card = resolver.getAgentCard();

assertEquals("http://example.com" + AGENT_CARD_PATH, client.url);

// baseUrl with trailing slash, agentCardPath without leading slash
resolver = new A2ACardResolver(client, "http://example.com/", AGENT_CARD_PATH.substring(1));
resolver = new A2ACardResolver(client, "http://example.com/", null, AGENT_CARD_PATH.substring(1));
card = resolver.getAgentCard();

assertEquals("http://example.com" + AGENT_CARD_PATH, client.url);

// baseUrl without trailing slash, agentCardPath without leading slash
resolver = new A2ACardResolver(client, "http://example.com", AGENT_CARD_PATH.substring(1));
resolver = new A2ACardResolver(client, "http://example.com", null, AGENT_CARD_PATH.substring(1));
card = resolver.getAgentCard();

assertEquals("http://example.com" + AGENT_CARD_PATH, client.url);

// baseUrl with sub-path and trailing slash — the original URI.resolve() bug silently
// dropped the sub-path, producing http://example.com/.well-known/agent-card.json instead
resolver = new A2ACardResolver(client, "http://example.com/jsonrpc/", null, AGENT_CARD_PATH);
card = resolver.getAgentCard();

assertEquals("http://example.com/jsonrpc" + AGENT_CARD_PATH, client.url);

// baseUrl with sub-path, no trailing slash
resolver = new A2ACardResolver(client, "http://example.com/jsonrpc", null, AGENT_CARD_PATH);
card = resolver.getAgentCard();

assertEquals("http://example.com/jsonrpc" + AGENT_CARD_PATH, client.url);
}


Expand Down
Loading
Loading