From 24a3ed7752ed923004a8db4ac6999a03357721e6 Mon Sep 17 00:00:00 2001 From: goutamadwant Date: Fri, 15 May 2026 23:02:16 -0700 Subject: [PATCH] Fix streaming timeout exception decoration --- .../bugfix-NettyNIOHTTPClient-0bec94c.json | 6 ++++ .../nio/netty/internal/ResponseHandler.java | 7 ++-- .../netty/internal/PublisherAdapterTest.java | 36 ++++++++++++++++--- 3 files changed, 42 insertions(+), 7 deletions(-) create mode 100644 .changes/next-release/bugfix-NettyNIOHTTPClient-0bec94c.json diff --git a/.changes/next-release/bugfix-NettyNIOHTTPClient-0bec94c.json b/.changes/next-release/bugfix-NettyNIOHTTPClient-0bec94c.json new file mode 100644 index 000000000000..6c2ad6e04481 --- /dev/null +++ b/.changes/next-release/bugfix-NettyNIOHTTPClient-0bec94c.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "Netty NIO HTTP Client", + "description": "Decorate streaming response publisher failures so S3AsyncClient getObject can retry Netty read timeouts.", + "contributor": "adwantg" +} diff --git a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ResponseHandler.java b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ResponseHandler.java index a7e9a42a8cc4..f939790b3499 100644 --- a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ResponseHandler.java +++ b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ResponseHandler.java @@ -342,11 +342,12 @@ public void onError(Throwable t) { if (!isDone.compareAndSet(false, true)) { return; } + Throwable throwable = NettyUtils.decorateException(channelContext.channel(), t); try { runAndLogError(channelContext.channel(), () -> String.format("Subscriber %s threw an exception in onError.", subscriber), - () -> subscriber.onError(t)); - notifyError(t); + () -> subscriber.onError(throwable)); + notifyError(throwable); } finally { runAndLogError(channelContext.channel(), () -> "Could not release channel back to the pool", () -> closeAndRelease(channelContext)); @@ -513,4 +514,4 @@ public void onComplete() { }); } } -} \ No newline at end of file +} diff --git a/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/PublisherAdapterTest.java b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/PublisherAdapterTest.java index 4098aec8eac2..73642ce80e8e 100644 --- a/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/PublisherAdapterTest.java +++ b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/PublisherAdapterTest.java @@ -37,13 +37,16 @@ import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.timeout.ReadTimeoutException; import io.reactivex.Flowable; +import java.io.IOException; import java.net.URI; import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; import org.reactivestreams.Publisher; @@ -120,8 +123,6 @@ public void successfulStreaming_shouldNotInvokeChannelRead() { HttpResponseStatus.ACCEPTED, testPublisher); - - ResponseHandler.PublisherAdapter publisherAdapter = new ResponseHandler.PublisherAdapter(streamedHttpResponse, ctx, requestContext, @@ -148,8 +149,6 @@ public void errorOccurred_shouldInvokeResponseHandler() { HttpResponseStatus.ACCEPTED, testPublisher); - - ResponseHandler.PublisherAdapter publisherAdapter = new ResponseHandler.PublisherAdapter(streamedHttpResponse, ctx, requestContext, @@ -167,6 +166,33 @@ public void errorOccurred_shouldInvokeResponseHandler() { verify(responseHandler).onError(exception); } + @Test + public void streamingReadTimeout_shouldDecorateExceptionBeforeNotifyingHandlers() { + Flowable testPublisher = Flowable.error(ReadTimeoutException.INSTANCE); + + StreamedHttpResponse streamedHttpResponse = new DefaultStreamedHttpResponse(HttpVersion.HTTP_1_1, + HttpResponseStatus.ACCEPTED, + testPublisher); + + + + ResponseHandler.PublisherAdapter publisherAdapter = new ResponseHandler.PublisherAdapter(streamedHttpResponse, + ctx, + requestContext, + executeFuture + ); + TestSubscriber subscriber = new TestSubscriber(); + + publisherAdapter.subscribe(subscriber); + + ArgumentCaptor errorCaptor = ArgumentCaptor.forClass(Throwable.class); + verify(responseHandler).onError(errorCaptor.capture()); + assertThat(errorCaptor.getValue()).isInstanceOf(IOException.class) + .hasCauseInstanceOf(ReadTimeoutException.class); + assertThat(subscriber.error).isSameAs(errorCaptor.getValue()); + assertThat(executeFuture).isCompletedExceptionally(); + } + @Test public void subscriptionCancelled_upstreamPublisherCallsOnNext_httpContentReleased() { HttpContent firstContent = mock(HttpContent.class); @@ -290,6 +316,7 @@ static final class TestSubscriber implements Subscriber { private Subscription subscription; private boolean isCompleted = false; private boolean errorOccurred = false; + private Throwable error; @Override public void onSubscribe(Subscription s) { @@ -305,6 +332,7 @@ public void onNext(ByteBuffer byteBuffer) { @Override public void onError(Throwable t) { errorOccurred = true; + error = t; } @Override