From 167e2cc43ab78dd07517080ba0fa355faac7b5a6 Mon Sep 17 00:00:00 2001 From: Sergey Soldatov Date: Fri, 12 Jun 2026 08:04:20 -0700 Subject: [PATCH 1/3] HDDS-15551. Stream reads should respect gRPC flow control backpressure --- .../hadoop/hdds/scm/XceiverClientGrpc.java | 32 +++- .../scm/storage/StreamBlockInputStream.java | 21 ++- .../storage/TestStreamBlockInputStream.java | 105 +++++++++++ .../hdds/scm/StreamingReadResponse.java | 15 ++ .../hadoop/hdds/scm/XceiverClientSpi.java | 3 +- .../hdds/scm/TestXceiverClientGrpc.java | 175 ++++++++++++++++++ 6 files changed, 345 insertions(+), 6 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index 8b1a99a6b668..bcd1e8e576fc 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -37,6 +37,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.LockSupport; import java.util.stream.Collectors; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.HddsUtils; @@ -63,6 +64,7 @@ import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.util.Time; +import org.apache.ratis.protocol.exceptions.TimeoutIOException; import org.apache.ratis.thirdparty.com.google.protobuf.TextFormat; import org.apache.ratis.thirdparty.io.grpc.ManagedChannel; import org.apache.ratis.thirdparty.io.grpc.Status; @@ -567,12 +569,38 @@ private XceiverClientReply sendCommandWithRetry( @Override public void streamRead(ContainerCommandRequestProto request, - StreamingReadResponse streamObserver) { + StreamingReadResponse streamObserver) throws IOException { + final ClientCallStreamObserver obs = streamObserver.getRequestObserver(); + + if (!obs.isReady()) { + LOG.debug("->{}: flow control stall (isReady=false) for block={} offset={} length={}. Waiting.", + streamObserver, + request.getReadBlock().getBlockID().getLocalID(), + request.getReadBlock().getOffset(), + request.getReadBlock().getLength()); + final long now = System.nanoTime(); + final long callerDeadlineNs = streamObserver.getReadDeadlineNs(); + final long waitTimeoutNanos = callerDeadlineNs > 0 ? Math.max(0, callerDeadlineNs - now) + : TimeUnit.SECONDS.toNanos(timeout); + final long deadlineNs = callerDeadlineNs > 0 ? callerDeadlineNs : now + waitTimeoutNanos; + while (!obs.isReady() && System.nanoTime() < deadlineNs) { + LockSupport.parkNanos(10_000_000L); + if (Thread.currentThread().isInterrupted()) { + Thread.currentThread().interrupt(); + throw new InterruptedIOException("Interrupted while waiting for stream to become ready: " + streamObserver); + } + } + if (!obs.isReady()) { + throw new TimeoutIOException("Timed out waiting for stream to become ready after " + + TimeUnit.NANOSECONDS.toMillis(waitTimeoutNanos) + "ms"); + } + } + if (LOG.isDebugEnabled()) { LOG.debug("->{}, send onNext request {}", streamObserver, TextFormat.shortDebugString(request.getReadBlock())); } - streamObserver.getRequestObserver().onNext(request); + obs.onNext(request); } @Override diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java index dd3928f91378..48e22091caf6 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java @@ -322,8 +322,10 @@ synchronized void readBlockImpl(long length) throws IOException { if (r == null) { throw new IOException("Uninitialized StreamingReadResponse: " + blockID); } + r.setReadDeadlineNs(System.nanoTime() + readTimeoutNanos); xceiverClient.streamRead(ContainerProtocolCalls.buildReadBlockCommandProto( blockID, requestedLength, length, responseDataSize, tokenRef.get(), pipelineRef.get()), r); + r.setReadDeadlineNs(System.nanoTime() + readTimeoutNanos); } private void handleExceptions(IOException cause) throws IOException { @@ -406,7 +408,7 @@ void checkError() throws IOException { } ReadBlockResponseProto poll() throws IOException { - final long startTime = System.nanoTime(); + final long deadlineNs = getReadDeadlineNs(); final long pollTimeoutNanos = Math.min(readTimeoutNanos / 10, 100_000_000); while (true) { @@ -430,8 +432,7 @@ ReadBlockResponseProto poll() throws IOException { return null; // Stream ended, queue is empty } - final long elapsedNanos = System.nanoTime() - startTime; - if (elapsedNanos >= readTimeoutNanos) { + if (System.nanoTime() >= deadlineNs) { setFailedAndThrow(new TimeoutIOException( "Timed out waiting for response after " + readTimeout)); return null; @@ -439,6 +440,19 @@ ReadBlockResponseProto poll() throws IOException { } } + private long getReadDeadlineNs() { + final StreamingReadResponse r = getResponse(); + final long deadlineNs = r != null ? r.getReadDeadlineNs() : 0; + return deadlineNs > 0 ? deadlineNs : System.nanoTime() + readTimeoutNanos; + } + + private void refreshReadDeadline() { + final StreamingReadResponse r = getResponse(); + if (r != null) { + r.setReadDeadlineNs(System.nanoTime() + readTimeoutNanos); + } + } + private ByteBuffer read(int length, boolean preRead) throws IOException { checkError(); if (future.isDone()) { @@ -447,6 +461,7 @@ private ByteBuffer read(int length, boolean preRead) throws IOException { return responseQueue.isEmpty() ? null : readFromQueue(); } + refreshReadDeadline(); readBlock(length, preRead); while (true) { diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java index 6275273c9f54..4cdf13032b47 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java @@ -33,11 +33,13 @@ import java.nio.ByteBuffer; import java.time.Duration; import java.util.Collections; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeID; +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumData; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; @@ -367,6 +369,109 @@ public void testReadDoesNotDropQueuedItemsWhenFutureIsDoneOnSecondCall() throws } } + @Test + public void testReadGetsFreshResponseTimeoutAfterStreamReadWait() throws Exception { + OzoneClientConfig clientConfig = newStreamReadConfig(); + clientConfig.setStreamReadTimeout(Duration.ofMillis(500)); + BlockID blockID = new BlockID(1L, 12L); + Pipeline pipeline = mockStandalonePipeline(); + ClientCallStreamObserver requestObserver = + mock(ClientCallStreamObserver.class); + StreamingReadResponse streamingReadResponse = new StreamingReadResponse( + MockDatanodeDetails.randomDatanodeDetails(), requestObserver); + + XceiverClientGrpc xceiverClient = mock(XceiverClientGrpc.class); + AtomicReference readerRef = new AtomicReference<>(); + AtomicReference responseThreadRef = new AtomicReference<>(); + doAnswer(inv -> { + StreamingReaderSpi reader = inv.getArgument(1); + reader.setStreamingReadResponse(streamingReadResponse); + readerRef.set(reader); + return null; + }).when(xceiverClient).initStreamRead(any(BlockID.class), any()); + doAnswer(inv -> { + Thread.sleep(450); + Thread responseThread = new Thread(() -> { + try { + Thread.sleep(100); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + readerRef.get().onNext(buildResponseProto(new byte[] {1}, 0)); + }); + responseThreadRef.set(responseThread); + responseThread.start(); + return null; + }).when(xceiverClient).streamRead(any(), any()); + + XceiverClientFactory xceiverClientFactory = mock(XceiverClientFactory.class); + when(xceiverClientFactory.acquireClientForReadData(any(Pipeline.class))) + .thenReturn(xceiverClient); + + try (StreamBlockInputStream sbis = new StreamBlockInputStream( + blockID, 1L, pipeline, null, xceiverClientFactory, + NO_REFRESH, clientConfig)) { + ByteBuffer buf = ByteBuffer.allocate(1); + assertEquals(1, sbis.read(buf)); + responseThreadRef.get().join(); + } + } + + @Test + public void testReadWithoutNewRequestGetsFreshTimeoutBudget() throws Exception { + OzoneClientConfig clientConfig = newStreamReadConfig(); + clientConfig.setStreamReadPreReadSize(10); + clientConfig.setStreamReadTimeout(Duration.ofMillis(500)); + BlockID blockID = new BlockID(1L, 13L); + Pipeline pipeline = mockStandalonePipeline(); + ClientCallStreamObserver requestObserver = + mock(ClientCallStreamObserver.class); + StreamingReadResponse streamingReadResponse = new StreamingReadResponse( + MockDatanodeDetails.randomDatanodeDetails(), requestObserver); + + AtomicReference readerRef = new AtomicReference<>(); + AtomicInteger streamReads = new AtomicInteger(); + XceiverClientGrpc xceiverClient = mock(XceiverClientGrpc.class); + doAnswer(inv -> { + StreamingReaderSpi reader = inv.getArgument(1); + reader.setStreamingReadResponse(streamingReadResponse); + readerRef.set(reader); + return null; + }).when(xceiverClient).initStreamRead(any(BlockID.class), any()); + doAnswer(inv -> { + streamReads.incrementAndGet(); + readerRef.get().onNext(buildResponseProto(new byte[] {1}, 0)); + return null; + }).when(xceiverClient).streamRead(any(), any()); + + XceiverClientFactory xceiverClientFactory = mock(XceiverClientFactory.class); + when(xceiverClientFactory.acquireClientForReadData(any(Pipeline.class))) + .thenReturn(xceiverClient); + + try (StreamBlockInputStream sbis = new StreamBlockInputStream( + blockID, 2L, pipeline, null, xceiverClientFactory, + NO_REFRESH, clientConfig)) { + ByteBuffer first = ByteBuffer.allocate(1); + assertEquals(1, sbis.read(first)); + Thread.sleep(600); + + Thread delayedResponse = new Thread(() -> { + try { + Thread.sleep(100); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + readerRef.get().onNext(buildResponseProto(new byte[] {2}, 1)); + }); + delayedResponse.start(); + + ByteBuffer second = ByteBuffer.allocate(1); + assertEquals(1, sbis.readFully(second, false)); + delayedResponse.join(); + assertEquals(1, streamReads.get(), "second read should use data from the existing request"); + } + } + private ReadBlockResponseProto buildReadBlockResponse(byte[] data) { return ReadBlockResponseProto.newBuilder() .setOffset(0) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReadResponse.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReadResponse.java index 3018fda7ea61..4813fd307a1f 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReadResponse.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReadResponse.java @@ -31,6 +31,13 @@ public class StreamingReadResponse { private final ClientCallStreamObserver requestObserver; private final String name; + /** + * Deadline (System.nanoTime) for the current wait. Set before streamRead() to bound the + * isReady() flow-control wait, then refreshed afterwards to bound the response wait in poll(). + * Zero falls back to the client-level read timeout. + */ + private volatile long readDeadlineNs; + public StreamingReadResponse(DatanodeDetails dn, ClientCallStreamObserver requestObserver) { this.dn = dn; @@ -40,6 +47,14 @@ public StreamingReadResponse(DatanodeDetails dn, this.name = "dn" + s.substring(s.lastIndexOf('-')) + "_stream"; } + public void setReadDeadlineNs(long deadlineNs) { + this.readDeadlineNs = deadlineNs; + } + + public long getReadDeadlineNs() { + return readDeadlineNs; + } + public DatanodeDetails getDatanodeDetails() { return dn; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java index 54be3c5686a0..98598fb1a7c6 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java @@ -149,7 +149,8 @@ public void initStreamRead(BlockID blockID, StreamingReaderSpi streamObserver) t throw new UnsupportedOperationException("Stream read is not supported"); } - public void streamRead(ContainerCommandRequestProto request, StreamingReadResponse streamObserver) { + public void streamRead(ContainerCommandRequestProto request, + StreamingReadResponse streamObserver) throws IOException { throw new UnsupportedOperationException("Stream read is not supported"); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java index 0f3af071fc54..d8dd3488b68d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.RandomUtils; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.RatisReplicationConfig; @@ -43,6 +44,8 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.ratis.protocol.exceptions.TimeoutIOException; +import org.apache.ratis.thirdparty.io.grpc.stub.ClientCallStreamObserver; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -294,6 +297,178 @@ private void invokeXceiverClientReadSmallFile(XceiverClientSpi client) ContainerProtocolCalls.readSmallFile(client, bid, null); } + /** streamRead() calls onNext() immediately when isReady() is true from the start. */ + @Test + public void testStreamReadSendsImmediatelyWhenReady() throws Exception { + TrackingStreamObserver obs = new TrackingStreamObserver(0); + StreamingReadResponse response = new StreamingReadResponse( + MockDatanodeDetails.randomDatanodeDetails(), obs); + ContainerProtos.ContainerCommandRequestProto request = buildReadBlockRequest(); + + try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf)) { + client.streamRead(request, response); + } + + assertEquals(1, obs.getSent().size(), "onNext must be called exactly once"); + assertEquals(request, obs.getSent().get(0)); + assertEquals(1, obs.getReadyCalls().get(), + "isReady() must be checked exactly once when stream is immediately ready"); + } + + /** streamRead() spin-waits until isReady() becomes true, then calls onNext(). */ + @Test + public void testStreamReadWaitsUntilReadyThenSends() throws Exception { + TrackingStreamObserver obs = new TrackingStreamObserver(3); + StreamingReadResponse response = new StreamingReadResponse( + MockDatanodeDetails.randomDatanodeDetails(), obs); + ContainerProtos.ContainerCommandRequestProto request = buildReadBlockRequest(); + + try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf)) { + client.streamRead(request, response); + } + + assertEquals(1, obs.getSent().size(), "onNext must be called exactly once"); + assertEquals(request, obs.getSent().get(0)); + assertThat(obs.getReadyCalls().get()).isGreaterThanOrEqualTo(4); + } + + /** + * streamRead() honours the caller-supplied deadline rather than the client read timeout + * and does not send while the stream is not ready. + */ + @Test + public void testStreamReadFailsAfterTimeoutIfNeverReady() throws Exception { + OzoneConfiguration longClientTimeout = new OzoneConfiguration(); + longClientTimeout.set(OzoneConfigKeys.OZONE_CLIENT_READ_TIMEOUT, "60s"); + + TrackingStreamObserver obs = new TrackingStreamObserver(Integer.MAX_VALUE); + StreamingReadResponse response = new StreamingReadResponse( + MockDatanodeDetails.randomDatanodeDetails(), obs); + ContainerProtos.ContainerCommandRequestProto request = buildReadBlockRequest(); + + long start; + try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, longClientTimeout)) { + response.setReadDeadlineNs(System.nanoTime() + + java.util.concurrent.TimeUnit.SECONDS.toNanos(1)); + start = System.currentTimeMillis(); + assertThrows(TimeoutIOException.class, () -> client.streamRead(request, response)); + } + long elapsed = System.currentTimeMillis() - start; + + assertEquals(0, obs.getSent().size(), "onNext must not be called while the stream is not ready"); + assertThat(elapsed).isGreaterThanOrEqualTo(1000L); + assertThat(elapsed).isLessThan(10_000L); + } + + /** streamRead() exits the spin-wait immediately on interrupt and restores the interrupt flag. */ + @Test + public void testStreamReadRestoresInterruptFlagOnInterruption() throws Exception { + TrackingStreamObserver obs = new TrackingStreamObserver(Integer.MAX_VALUE); + StreamingReadResponse response = new StreamingReadResponse( + MockDatanodeDetails.randomDatanodeDetails(), obs); + ContainerProtos.ContainerCommandRequestProto request = buildReadBlockRequest(); + + OzoneConfiguration longTimeout = new OzoneConfiguration(); + longTimeout.set(OzoneConfigKeys.OZONE_CLIENT_READ_TIMEOUT, "60s"); + + try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, longTimeout)) { + Thread self = Thread.currentThread(); + new Thread(() -> { + try { + Thread.sleep(50); + } catch (InterruptedException ignored) { + } + self.interrupt(); + }).start(); + + long start = System.currentTimeMillis(); + assertThrows(InterruptedIOException.class, () -> client.streamRead(request, response)); + long elapsed = System.currentTimeMillis() - start; + + assertThat(elapsed).isLessThan(5_000L); + assertThat(Thread.currentThread().isInterrupted()).isTrue(); + assertEquals(0, obs.getSent().size()); + } finally { + Thread.interrupted(); // clear for test cleanup + } + } + + /** Records onNext() calls and controls when isReady() starts returning true. */ + private static final class TrackingStreamObserver + extends ClientCallStreamObserver { + + private final List sent = new ArrayList<>(); + private final AtomicInteger readyCalls = new AtomicInteger(); + private final int readyAfter; + + TrackingStreamObserver(int readyAfter) { + this.readyAfter = readyAfter; + } + + List getSent() { + return sent; + } + + AtomicInteger getReadyCalls() { + return readyCalls; + } + + @Override + public boolean isReady() { + return readyCalls.incrementAndGet() > readyAfter; + } + + @Override + public void onNext(ContainerProtos.ContainerCommandRequestProto value) { + sent.add(value); + } + + @Override + public void cancel(String msg, Throwable cause) { + } + + @Override + public void setOnReadyHandler(Runnable r) { + } + + @Override + public void disableAutoInboundFlowControl() { + } + + @Override + public void request(int count) { + } + + @Override + public void setMessageCompression(boolean enable) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onCompleted() { + } + } + + private ContainerProtos.ContainerCommandRequestProto buildReadBlockRequest() { + return ContainerProtos.ContainerCommandRequestProto.newBuilder() + .setCmdType(ContainerProtos.Type.ReadBlock) + .setContainerID(1L) + .setDatanodeUuid(dns.get(0).getUuidString()) + .setReadBlock(ContainerProtos.ReadBlockRequestProto.newBuilder() + .setBlockID(ContainerProtos.DatanodeBlockID.newBuilder() + .setContainerID(1L) + .setLocalID(1L) + .setBlockCommitSequenceId(1L) + .build()) + .setOffset(0L) + .setLength(1024L) + .build()) + .build(); + } + private XceiverClientReply buildValidResponse() { ContainerProtos.ContainerCommandResponseProto resp = ContainerProtos.ContainerCommandResponseProto.newBuilder() From 517ec22076d6fe56411ae938964c171020ea312f Mon Sep 17 00:00:00 2001 From: Sergey Soldatov Date: Wed, 24 Jun 2026 13:13:55 -0700 Subject: [PATCH 2/3] Address the comments. Make stream read deadline overflow-safe and simplify wait Co-Authored-By: Claude Opus 4.8 --- .../apache/hadoop/hdds/scm/XceiverClientGrpc.java | 12 ++++-------- .../hdds/scm/storage/StreamBlockInputStream.java | 6 +++--- .../hadoop/hdds/scm/StreamingReadResponse.java | 13 ++++++++++--- 3 files changed, 17 insertions(+), 14 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index bcd1e8e576fc..5393276e9fcb 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -578,12 +578,9 @@ public void streamRead(ContainerCommandRequestProto request, request.getReadBlock().getBlockID().getLocalID(), request.getReadBlock().getOffset(), request.getReadBlock().getLength()); - final long now = System.nanoTime(); - final long callerDeadlineNs = streamObserver.getReadDeadlineNs(); - final long waitTimeoutNanos = callerDeadlineNs > 0 ? Math.max(0, callerDeadlineNs - now) - : TimeUnit.SECONDS.toNanos(timeout); - final long deadlineNs = callerDeadlineNs > 0 ? callerDeadlineNs : now + waitTimeoutNanos; - while (!obs.isReady() && System.nanoTime() < deadlineNs) { + final long deadlineNs = streamObserver.hasReadDeadline() ? streamObserver.getReadDeadlineNs() + : System.nanoTime() + TimeUnit.SECONDS.toNanos(timeout); + while (!obs.isReady() && System.nanoTime() - deadlineNs < 0) { LockSupport.parkNanos(10_000_000L); if (Thread.currentThread().isInterrupted()) { Thread.currentThread().interrupt(); @@ -591,8 +588,7 @@ public void streamRead(ContainerCommandRequestProto request, } } if (!obs.isReady()) { - throw new TimeoutIOException("Timed out waiting for stream to become ready after " - + TimeUnit.NANOSECONDS.toMillis(waitTimeoutNanos) + "ms"); + throw new TimeoutIOException("Timed out waiting for stream to become ready: " + streamObserver); } } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java index 48e22091caf6..755b46022e12 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java @@ -432,7 +432,7 @@ ReadBlockResponseProto poll() throws IOException { return null; // Stream ended, queue is empty } - if (System.nanoTime() >= deadlineNs) { + if (System.nanoTime() - deadlineNs >= 0) { setFailedAndThrow(new TimeoutIOException( "Timed out waiting for response after " + readTimeout)); return null; @@ -442,8 +442,8 @@ ReadBlockResponseProto poll() throws IOException { private long getReadDeadlineNs() { final StreamingReadResponse r = getResponse(); - final long deadlineNs = r != null ? r.getReadDeadlineNs() : 0; - return deadlineNs > 0 ? deadlineNs : System.nanoTime() + readTimeoutNanos; + return r != null && r.hasReadDeadline() + ? r.getReadDeadlineNs() : System.nanoTime() + readTimeoutNanos; } private void refreshReadDeadline() { diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReadResponse.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReadResponse.java index 4813fd307a1f..175f63026223 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReadResponse.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReadResponse.java @@ -32,11 +32,13 @@ public class StreamingReadResponse { private final String name; /** - * Deadline (System.nanoTime) for the current wait. Set before streamRead() to bound the - * isReady() flow-control wait, then refreshed afterwards to bound the response wait in poll(). - * Zero falls back to the client-level read timeout. + * Deadline (a {@link System#nanoTime} value) bounding the current wait, used as a single read budget + * for the call: set before streamRead() to bound the isReady() flow-control wait, then refreshed + * afterwards to bound the response wait in poll(). {@link #readDeadlineSet} tracks whether it has been + * set, since nanoTime values may be zero or negative and cannot be used as a sentinel. */ private volatile long readDeadlineNs; + private volatile boolean readDeadlineSet; public StreamingReadResponse(DatanodeDetails dn, ClientCallStreamObserver requestObserver) { @@ -49,12 +51,17 @@ public StreamingReadResponse(DatanodeDetails dn, public void setReadDeadlineNs(long deadlineNs) { this.readDeadlineNs = deadlineNs; + this.readDeadlineSet = true; } public long getReadDeadlineNs() { return readDeadlineNs; } + public boolean hasReadDeadline() { + return readDeadlineSet; + } + public DatanodeDetails getDatanodeDetails() { return dn; } From 92df75b0e1ddf900294dc7b4535732eee39da2de Mon Sep 17 00:00:00 2001 From: Sergey Soldatov Date: Thu, 25 Jun 2026 18:19:41 -0700 Subject: [PATCH 3/3] Restore poll-local timeout tracking. Separate stream read timeout budgets for gRPC isReady() --- .../hadoop/hdds/scm/XceiverClientGrpc.java | 6 +++-- .../scm/storage/StreamBlockInputStream.java | 21 +++--------------- .../hdds/scm/StreamingReadResponse.java | 22 ------------------- .../hdds/scm/TestXceiverClientGrpc.java | 13 +++++------ 4 files changed, 12 insertions(+), 50 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index 5393276e9fcb..90e581e8f4be 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -98,6 +98,7 @@ public class XceiverClientGrpc extends XceiverClientSpi { private final XceiverClientMetrics metrics; private final Semaphore semaphore; private long timeout; + private final long streamReadTimeoutNanos; private final SecurityConfig secConfig; private final boolean topologyAwareRead; private final ClientTrustManager trustManager; @@ -123,6 +124,8 @@ public XceiverClientGrpc(Pipeline pipeline, ConfigurationSource config, Objects.requireNonNull(config, "config == null"); setTimeout(config.getTimeDuration(OzoneConfigKeys.OZONE_CLIENT_READ_TIMEOUT, OzoneConfigKeys.OZONE_CLIENT_READ_TIMEOUT_DEFAULT, TimeUnit.SECONDS)); + this.streamReadTimeoutNanos = config.getObject(OzoneClientConfig.class) + .getStreamReadTimeout().toNanos(); this.pipeline = pipeline; this.config = config; this.secConfig = new SecurityConfig(config); @@ -578,8 +581,7 @@ public void streamRead(ContainerCommandRequestProto request, request.getReadBlock().getBlockID().getLocalID(), request.getReadBlock().getOffset(), request.getReadBlock().getLength()); - final long deadlineNs = streamObserver.hasReadDeadline() ? streamObserver.getReadDeadlineNs() - : System.nanoTime() + TimeUnit.SECONDS.toNanos(timeout); + final long deadlineNs = System.nanoTime() + streamReadTimeoutNanos; while (!obs.isReady() && System.nanoTime() - deadlineNs < 0) { LockSupport.parkNanos(10_000_000L); if (Thread.currentThread().isInterrupted()) { diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java index 755b46022e12..dd3928f91378 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java @@ -322,10 +322,8 @@ synchronized void readBlockImpl(long length) throws IOException { if (r == null) { throw new IOException("Uninitialized StreamingReadResponse: " + blockID); } - r.setReadDeadlineNs(System.nanoTime() + readTimeoutNanos); xceiverClient.streamRead(ContainerProtocolCalls.buildReadBlockCommandProto( blockID, requestedLength, length, responseDataSize, tokenRef.get(), pipelineRef.get()), r); - r.setReadDeadlineNs(System.nanoTime() + readTimeoutNanos); } private void handleExceptions(IOException cause) throws IOException { @@ -408,7 +406,7 @@ void checkError() throws IOException { } ReadBlockResponseProto poll() throws IOException { - final long deadlineNs = getReadDeadlineNs(); + final long startTime = System.nanoTime(); final long pollTimeoutNanos = Math.min(readTimeoutNanos / 10, 100_000_000); while (true) { @@ -432,7 +430,8 @@ ReadBlockResponseProto poll() throws IOException { return null; // Stream ended, queue is empty } - if (System.nanoTime() - deadlineNs >= 0) { + final long elapsedNanos = System.nanoTime() - startTime; + if (elapsedNanos >= readTimeoutNanos) { setFailedAndThrow(new TimeoutIOException( "Timed out waiting for response after " + readTimeout)); return null; @@ -440,19 +439,6 @@ ReadBlockResponseProto poll() throws IOException { } } - private long getReadDeadlineNs() { - final StreamingReadResponse r = getResponse(); - return r != null && r.hasReadDeadline() - ? r.getReadDeadlineNs() : System.nanoTime() + readTimeoutNanos; - } - - private void refreshReadDeadline() { - final StreamingReadResponse r = getResponse(); - if (r != null) { - r.setReadDeadlineNs(System.nanoTime() + readTimeoutNanos); - } - } - private ByteBuffer read(int length, boolean preRead) throws IOException { checkError(); if (future.isDone()) { @@ -461,7 +447,6 @@ private ByteBuffer read(int length, boolean preRead) throws IOException { return responseQueue.isEmpty() ? null : readFromQueue(); } - refreshReadDeadline(); readBlock(length, preRead); while (true) { diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReadResponse.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReadResponse.java index 175f63026223..3018fda7ea61 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReadResponse.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReadResponse.java @@ -31,15 +31,6 @@ public class StreamingReadResponse { private final ClientCallStreamObserver requestObserver; private final String name; - /** - * Deadline (a {@link System#nanoTime} value) bounding the current wait, used as a single read budget - * for the call: set before streamRead() to bound the isReady() flow-control wait, then refreshed - * afterwards to bound the response wait in poll(). {@link #readDeadlineSet} tracks whether it has been - * set, since nanoTime values may be zero or negative and cannot be used as a sentinel. - */ - private volatile long readDeadlineNs; - private volatile boolean readDeadlineSet; - public StreamingReadResponse(DatanodeDetails dn, ClientCallStreamObserver requestObserver) { this.dn = dn; @@ -49,19 +40,6 @@ public StreamingReadResponse(DatanodeDetails dn, this.name = "dn" + s.substring(s.lastIndexOf('-')) + "_stream"; } - public void setReadDeadlineNs(long deadlineNs) { - this.readDeadlineNs = deadlineNs; - this.readDeadlineSet = true; - } - - public long getReadDeadlineNs() { - return readDeadlineNs; - } - - public boolean hasReadDeadline() { - return readDeadlineSet; - } - public DatanodeDetails getDatanodeDetails() { return dn; } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java index d8dd3488b68d..ca346a6bc986 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java @@ -333,13 +333,12 @@ public void testStreamReadWaitsUntilReadyThenSends() throws Exception { } /** - * streamRead() honours the caller-supplied deadline rather than the client read timeout - * and does not send while the stream is not ready. + * streamRead() honours the stream read timeout and does not send while the stream is not ready. */ @Test public void testStreamReadFailsAfterTimeoutIfNeverReady() throws Exception { - OzoneConfiguration longClientTimeout = new OzoneConfiguration(); - longClientTimeout.set(OzoneConfigKeys.OZONE_CLIENT_READ_TIMEOUT, "60s"); + OzoneConfiguration timeoutConf = new OzoneConfiguration(); + timeoutConf.set("ozone.client.stream.read.timeout", "1s"); TrackingStreamObserver obs = new TrackingStreamObserver(Integer.MAX_VALUE); StreamingReadResponse response = new StreamingReadResponse( @@ -347,9 +346,7 @@ public void testStreamReadFailsAfterTimeoutIfNeverReady() throws Exception { ContainerProtos.ContainerCommandRequestProto request = buildReadBlockRequest(); long start; - try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, longClientTimeout)) { - response.setReadDeadlineNs(System.nanoTime() - + java.util.concurrent.TimeUnit.SECONDS.toNanos(1)); + try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, timeoutConf)) { start = System.currentTimeMillis(); assertThrows(TimeoutIOException.class, () -> client.streamRead(request, response)); } @@ -369,7 +366,7 @@ public void testStreamReadRestoresInterruptFlagOnInterruption() throws Exception ContainerProtos.ContainerCommandRequestProto request = buildReadBlockRequest(); OzoneConfiguration longTimeout = new OzoneConfiguration(); - longTimeout.set(OzoneConfigKeys.OZONE_CLIENT_READ_TIMEOUT, "60s"); + longTimeout.set("ozone.client.stream.read.timeout", "60s"); try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, longTimeout)) { Thread self = Thread.currentThread();