fix: fix REST client URL construction and SSE stability with HTTP/1.1#901
fix: fix REST client URL construction and SSE stability with HTTP/1.1#901ehsavoie wants to merge 1 commit into
Conversation
ehsavoie
commented
May 29, 2026
- Use string concatenation instead of URI.resolve() for agent card path resolution.
- Make HTTP version configurable in JdkA2AHttpClient, defaulting to HTTP/2 for requests and HTTP/1.1 for SSE
- Handle HTTP/1.1 cancellation exceptions gracefully in SSE stream shutdown.
- Use string concatenation instead of URI.resolve() for agent card path resolution. - Make HTTP version configurable in JdkA2AHttpClient, defaulting to HTTP/2 for requests and HTTP/1.1 for SSE - Handle HTTP/1.1 cancellation exceptions gracefully in SSE stream shutdown. Signed-off-by: Emmanuel Hugonnet <ehugonne@redhat.com>
There was a problem hiding this comment.
Code Review
This pull request configures HTTP/1.1 for SSE streaming in JdkA2AHttpClient, handles cancellation cleanly, and fixes a URL resolution bug in A2ACardResolver by replacing URI.resolve() with a custom buildCardUrl utility. Feedback on these changes suggests catching RuntimeException rather than just UncheckedIOException when quietly cancelling futures, validating that base URLs are absolute to avoid runtime failures, tracking visited exceptions in isCancellation to prevent infinite loops from cyclic references, and ignoring post-completion errors in the HTTP client.
| private static void cancelQuietly(@Nullable Future<Void> future) { | ||
| if (future != null) { | ||
| try { | ||
| future.cancel(true); | ||
| } catch (UncheckedIOException e) { | ||
| log.fine("Exception during SSE connection close: " + e.getMessage()); | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
To ensure that the future cancellation is truly quiet, it is safer to catch RuntimeException instead of only UncheckedIOException. Some custom Future implementations or downstream cancellation handlers might throw other runtime exceptions (such as IllegalStateException or UnsupportedOperationException), which would otherwise propagate and disrupt the stream processing.
| private static void cancelQuietly(@Nullable Future<Void> future) { | |
| if (future != null) { | |
| try { | |
| future.cancel(true); | |
| } catch (UncheckedIOException e) { | |
| log.fine("Exception during SSE connection close: " + e.getMessage()); | |
| } | |
| } | |
| } | |
| private static void cancelQuietly(@Nullable Future<Void> future) { | |
| if (future != null) { | |
| try { | |
| future.cancel(true); | |
| } catch (RuntimeException e) { | |
| log.fine("Exception during SSE connection close: " + e.getMessage()); | |
| } | |
| } | |
| } |
| static boolean isCancellation(Throwable t) { | ||
| while (t != null) { | ||
| if (t instanceof CancellationException) { | ||
| return true; | ||
| } | ||
| // HTTP/1.1: Http1Exchange.cancel() throws IOException synchronously with this | ||
| // hardcoded English message (not locale-sensitive — no resource bundle is used) | ||
| if (t instanceof IOException && "Request cancelled".equals(t.getMessage())) { | ||
| return true; | ||
| } | ||
| t = t.getCause(); | ||
| } | ||
| return false; | ||
| } |
There was a problem hiding this comment.
Traversing exception causes using a simple while (t != null) loop can lead to an infinite loop and thread hang if there is a cyclic reference in the exception chain (e.g., if an exception has itself or a parent exception as its cause). Using a set of visited throwables guards against this scenario.
| static boolean isCancellation(Throwable t) { | |
| while (t != null) { | |
| if (t instanceof CancellationException) { | |
| return true; | |
| } | |
| // HTTP/1.1: Http1Exchange.cancel() throws IOException synchronously with this | |
| // hardcoded English message (not locale-sensitive — no resource bundle is used) | |
| if (t instanceof IOException && "Request cancelled".equals(t.getMessage())) { | |
| return true; | |
| } | |
| t = t.getCause(); | |
| } | |
| return false; | |
| } | |
| static boolean isCancellation(Throwable t) { | |
| java.util.Set<Throwable> visited = java.util.Collections.newSetFromMap(new java.util.IdentityHashMap<>()); | |
| while (t != null && visited.add(t)) { | |
| if (t instanceof CancellationException) { | |
| return true; | |
| } | |
| // HTTP/1.1: Http1Exchange.cancel() throws IOException synchronously with this | |
| // hardcoded English message (not locale-sensitive — no resource bundle is used) | |
| if (t instanceof IOException && "Request cancelled".equals(t.getMessage())) { | |
| return true; | |
| } | |
| t = t.getCause(); | |
| } | |
| return false; | |
| } |
| .<Void>handle((response, throwable) -> { | ||
| if (throwable != null && errorNotified.compareAndSet(false, true)) { | ||
| if (throwable != null && !isCancellation(throwable) | ||
| && errorNotified.compareAndSet(false, true)) { |
There was a problem hiding this comment.
If the stream has already completed cleanly (i.e., completeNotified is true), any subsequent error received in the handle block (such as a connection teardown or socket close exception) should be ignored. Otherwise, it might trigger the errorConsumer after a successful completion, leading to unexpected behavior.
| .<Void>handle((response, throwable) -> { | |
| if (throwable != null && errorNotified.compareAndSet(false, true)) { | |
| if (throwable != null && !isCancellation(throwable) | |
| && errorNotified.compareAndSet(false, true)) { | |
| .<Void>handle((response, throwable) -> { | |
| if (throwable != null && !isCancellation(throwable) | |
| && !completeNotified.get() | |
| && errorNotified.compareAndSet(false, true)) { |