diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index 8b1a99a6b668..90e581e8f4be 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -37,6 +37,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.LockSupport; import java.util.stream.Collectors; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.HddsUtils; @@ -63,6 +64,7 @@ import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.util.Time; +import org.apache.ratis.protocol.exceptions.TimeoutIOException; import org.apache.ratis.thirdparty.com.google.protobuf.TextFormat; import org.apache.ratis.thirdparty.io.grpc.ManagedChannel; import org.apache.ratis.thirdparty.io.grpc.Status; @@ -96,6 +98,7 @@ public class XceiverClientGrpc extends XceiverClientSpi { private final XceiverClientMetrics metrics; private final Semaphore semaphore; private long timeout; + private final long streamReadTimeoutNanos; private final SecurityConfig secConfig; private final boolean topologyAwareRead; private final ClientTrustManager trustManager; @@ -121,6 +124,8 @@ public XceiverClientGrpc(Pipeline pipeline, ConfigurationSource config, Objects.requireNonNull(config, "config == null"); setTimeout(config.getTimeDuration(OzoneConfigKeys.OZONE_CLIENT_READ_TIMEOUT, OzoneConfigKeys.OZONE_CLIENT_READ_TIMEOUT_DEFAULT, TimeUnit.SECONDS)); + this.streamReadTimeoutNanos = config.getObject(OzoneClientConfig.class) + .getStreamReadTimeout().toNanos(); this.pipeline = pipeline; this.config = config; this.secConfig = new SecurityConfig(config); @@ -567,12 +572,33 @@ private XceiverClientReply sendCommandWithRetry( @Override public void streamRead(ContainerCommandRequestProto request, - StreamingReadResponse streamObserver) { + StreamingReadResponse streamObserver) throws IOException { + final ClientCallStreamObserver obs = streamObserver.getRequestObserver(); + + if (!obs.isReady()) { + LOG.debug("->{}: flow control stall (isReady=false) for block={} offset={} length={}. Waiting.", + streamObserver, + request.getReadBlock().getBlockID().getLocalID(), + request.getReadBlock().getOffset(), + request.getReadBlock().getLength()); + final long deadlineNs = System.nanoTime() + streamReadTimeoutNanos; + while (!obs.isReady() && System.nanoTime() - deadlineNs < 0) { + LockSupport.parkNanos(10_000_000L); + if (Thread.currentThread().isInterrupted()) { + Thread.currentThread().interrupt(); + throw new InterruptedIOException("Interrupted while waiting for stream to become ready: " + streamObserver); + } + } + if (!obs.isReady()) { + throw new TimeoutIOException("Timed out waiting for stream to become ready: " + streamObserver); + } + } + if (LOG.isDebugEnabled()) { LOG.debug("->{}, send onNext request {}", streamObserver, TextFormat.shortDebugString(request.getReadBlock())); } - streamObserver.getRequestObserver().onNext(request); + obs.onNext(request); } @Override diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java index 6275273c9f54..4cdf13032b47 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java @@ -33,11 +33,13 @@ import java.nio.ByteBuffer; import java.time.Duration; import java.util.Collections; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeID; +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumData; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; @@ -367,6 +369,109 @@ public void testReadDoesNotDropQueuedItemsWhenFutureIsDoneOnSecondCall() throws } } + @Test + public void testReadGetsFreshResponseTimeoutAfterStreamReadWait() throws Exception { + OzoneClientConfig clientConfig = newStreamReadConfig(); + clientConfig.setStreamReadTimeout(Duration.ofMillis(500)); + BlockID blockID = new BlockID(1L, 12L); + Pipeline pipeline = mockStandalonePipeline(); + ClientCallStreamObserver requestObserver = + mock(ClientCallStreamObserver.class); + StreamingReadResponse streamingReadResponse = new StreamingReadResponse( + MockDatanodeDetails.randomDatanodeDetails(), requestObserver); + + XceiverClientGrpc xceiverClient = mock(XceiverClientGrpc.class); + AtomicReference readerRef = new AtomicReference<>(); + AtomicReference responseThreadRef = new AtomicReference<>(); + doAnswer(inv -> { + StreamingReaderSpi reader = inv.getArgument(1); + reader.setStreamingReadResponse(streamingReadResponse); + readerRef.set(reader); + return null; + }).when(xceiverClient).initStreamRead(any(BlockID.class), any()); + doAnswer(inv -> { + Thread.sleep(450); + Thread responseThread = new Thread(() -> { + try { + Thread.sleep(100); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + readerRef.get().onNext(buildResponseProto(new byte[] {1}, 0)); + }); + responseThreadRef.set(responseThread); + responseThread.start(); + return null; + }).when(xceiverClient).streamRead(any(), any()); + + XceiverClientFactory xceiverClientFactory = mock(XceiverClientFactory.class); + when(xceiverClientFactory.acquireClientForReadData(any(Pipeline.class))) + .thenReturn(xceiverClient); + + try (StreamBlockInputStream sbis = new StreamBlockInputStream( + blockID, 1L, pipeline, null, xceiverClientFactory, + NO_REFRESH, clientConfig)) { + ByteBuffer buf = ByteBuffer.allocate(1); + assertEquals(1, sbis.read(buf)); + responseThreadRef.get().join(); + } + } + + @Test + public void testReadWithoutNewRequestGetsFreshTimeoutBudget() throws Exception { + OzoneClientConfig clientConfig = newStreamReadConfig(); + clientConfig.setStreamReadPreReadSize(10); + clientConfig.setStreamReadTimeout(Duration.ofMillis(500)); + BlockID blockID = new BlockID(1L, 13L); + Pipeline pipeline = mockStandalonePipeline(); + ClientCallStreamObserver requestObserver = + mock(ClientCallStreamObserver.class); + StreamingReadResponse streamingReadResponse = new StreamingReadResponse( + MockDatanodeDetails.randomDatanodeDetails(), requestObserver); + + AtomicReference readerRef = new AtomicReference<>(); + AtomicInteger streamReads = new AtomicInteger(); + XceiverClientGrpc xceiverClient = mock(XceiverClientGrpc.class); + doAnswer(inv -> { + StreamingReaderSpi reader = inv.getArgument(1); + reader.setStreamingReadResponse(streamingReadResponse); + readerRef.set(reader); + return null; + }).when(xceiverClient).initStreamRead(any(BlockID.class), any()); + doAnswer(inv -> { + streamReads.incrementAndGet(); + readerRef.get().onNext(buildResponseProto(new byte[] {1}, 0)); + return null; + }).when(xceiverClient).streamRead(any(), any()); + + XceiverClientFactory xceiverClientFactory = mock(XceiverClientFactory.class); + when(xceiverClientFactory.acquireClientForReadData(any(Pipeline.class))) + .thenReturn(xceiverClient); + + try (StreamBlockInputStream sbis = new StreamBlockInputStream( + blockID, 2L, pipeline, null, xceiverClientFactory, + NO_REFRESH, clientConfig)) { + ByteBuffer first = ByteBuffer.allocate(1); + assertEquals(1, sbis.read(first)); + Thread.sleep(600); + + Thread delayedResponse = new Thread(() -> { + try { + Thread.sleep(100); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + readerRef.get().onNext(buildResponseProto(new byte[] {2}, 1)); + }); + delayedResponse.start(); + + ByteBuffer second = ByteBuffer.allocate(1); + assertEquals(1, sbis.readFully(second, false)); + delayedResponse.join(); + assertEquals(1, streamReads.get(), "second read should use data from the existing request"); + } + } + private ReadBlockResponseProto buildReadBlockResponse(byte[] data) { return ReadBlockResponseProto.newBuilder() .setOffset(0) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java index 54be3c5686a0..98598fb1a7c6 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java @@ -149,7 +149,8 @@ public void initStreamRead(BlockID blockID, StreamingReaderSpi streamObserver) t throw new UnsupportedOperationException("Stream read is not supported"); } - public void streamRead(ContainerCommandRequestProto request, StreamingReadResponse streamObserver) { + public void streamRead(ContainerCommandRequestProto request, + StreamingReadResponse streamObserver) throws IOException { throw new UnsupportedOperationException("Stream read is not supported"); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java index 0f3af071fc54..ca346a6bc986 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.RandomUtils; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.RatisReplicationConfig; @@ -43,6 +44,8 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.ratis.protocol.exceptions.TimeoutIOException; +import org.apache.ratis.thirdparty.io.grpc.stub.ClientCallStreamObserver; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -294,6 +297,175 @@ private void invokeXceiverClientReadSmallFile(XceiverClientSpi client) ContainerProtocolCalls.readSmallFile(client, bid, null); } + /** streamRead() calls onNext() immediately when isReady() is true from the start. */ + @Test + public void testStreamReadSendsImmediatelyWhenReady() throws Exception { + TrackingStreamObserver obs = new TrackingStreamObserver(0); + StreamingReadResponse response = new StreamingReadResponse( + MockDatanodeDetails.randomDatanodeDetails(), obs); + ContainerProtos.ContainerCommandRequestProto request = buildReadBlockRequest(); + + try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf)) { + client.streamRead(request, response); + } + + assertEquals(1, obs.getSent().size(), "onNext must be called exactly once"); + assertEquals(request, obs.getSent().get(0)); + assertEquals(1, obs.getReadyCalls().get(), + "isReady() must be checked exactly once when stream is immediately ready"); + } + + /** streamRead() spin-waits until isReady() becomes true, then calls onNext(). */ + @Test + public void testStreamReadWaitsUntilReadyThenSends() throws Exception { + TrackingStreamObserver obs = new TrackingStreamObserver(3); + StreamingReadResponse response = new StreamingReadResponse( + MockDatanodeDetails.randomDatanodeDetails(), obs); + ContainerProtos.ContainerCommandRequestProto request = buildReadBlockRequest(); + + try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf)) { + client.streamRead(request, response); + } + + assertEquals(1, obs.getSent().size(), "onNext must be called exactly once"); + assertEquals(request, obs.getSent().get(0)); + assertThat(obs.getReadyCalls().get()).isGreaterThanOrEqualTo(4); + } + + /** + * streamRead() honours the stream read timeout and does not send while the stream is not ready. + */ + @Test + public void testStreamReadFailsAfterTimeoutIfNeverReady() throws Exception { + OzoneConfiguration timeoutConf = new OzoneConfiguration(); + timeoutConf.set("ozone.client.stream.read.timeout", "1s"); + + TrackingStreamObserver obs = new TrackingStreamObserver(Integer.MAX_VALUE); + StreamingReadResponse response = new StreamingReadResponse( + MockDatanodeDetails.randomDatanodeDetails(), obs); + ContainerProtos.ContainerCommandRequestProto request = buildReadBlockRequest(); + + long start; + try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, timeoutConf)) { + start = System.currentTimeMillis(); + assertThrows(TimeoutIOException.class, () -> client.streamRead(request, response)); + } + long elapsed = System.currentTimeMillis() - start; + + assertEquals(0, obs.getSent().size(), "onNext must not be called while the stream is not ready"); + assertThat(elapsed).isGreaterThanOrEqualTo(1000L); + assertThat(elapsed).isLessThan(10_000L); + } + + /** streamRead() exits the spin-wait immediately on interrupt and restores the interrupt flag. */ + @Test + public void testStreamReadRestoresInterruptFlagOnInterruption() throws Exception { + TrackingStreamObserver obs = new TrackingStreamObserver(Integer.MAX_VALUE); + StreamingReadResponse response = new StreamingReadResponse( + MockDatanodeDetails.randomDatanodeDetails(), obs); + ContainerProtos.ContainerCommandRequestProto request = buildReadBlockRequest(); + + OzoneConfiguration longTimeout = new OzoneConfiguration(); + longTimeout.set("ozone.client.stream.read.timeout", "60s"); + + try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, longTimeout)) { + Thread self = Thread.currentThread(); + new Thread(() -> { + try { + Thread.sleep(50); + } catch (InterruptedException ignored) { + } + self.interrupt(); + }).start(); + + long start = System.currentTimeMillis(); + assertThrows(InterruptedIOException.class, () -> client.streamRead(request, response)); + long elapsed = System.currentTimeMillis() - start; + + assertThat(elapsed).isLessThan(5_000L); + assertThat(Thread.currentThread().isInterrupted()).isTrue(); + assertEquals(0, obs.getSent().size()); + } finally { + Thread.interrupted(); // clear for test cleanup + } + } + + /** Records onNext() calls and controls when isReady() starts returning true. */ + private static final class TrackingStreamObserver + extends ClientCallStreamObserver { + + private final List sent = new ArrayList<>(); + private final AtomicInteger readyCalls = new AtomicInteger(); + private final int readyAfter; + + TrackingStreamObserver(int readyAfter) { + this.readyAfter = readyAfter; + } + + List getSent() { + return sent; + } + + AtomicInteger getReadyCalls() { + return readyCalls; + } + + @Override + public boolean isReady() { + return readyCalls.incrementAndGet() > readyAfter; + } + + @Override + public void onNext(ContainerProtos.ContainerCommandRequestProto value) { + sent.add(value); + } + + @Override + public void cancel(String msg, Throwable cause) { + } + + @Override + public void setOnReadyHandler(Runnable r) { + } + + @Override + public void disableAutoInboundFlowControl() { + } + + @Override + public void request(int count) { + } + + @Override + public void setMessageCompression(boolean enable) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onCompleted() { + } + } + + private ContainerProtos.ContainerCommandRequestProto buildReadBlockRequest() { + return ContainerProtos.ContainerCommandRequestProto.newBuilder() + .setCmdType(ContainerProtos.Type.ReadBlock) + .setContainerID(1L) + .setDatanodeUuid(dns.get(0).getUuidString()) + .setReadBlock(ContainerProtos.ReadBlockRequestProto.newBuilder() + .setBlockID(ContainerProtos.DatanodeBlockID.newBuilder() + .setContainerID(1L) + .setLocalID(1L) + .setBlockCommitSequenceId(1L) + .build()) + .setOffset(0L) + .setLength(1024L) + .build()) + .build(); + } + private XceiverClientReply buildValidResponse() { ContainerProtos.ContainerCommandResponseProto resp = ContainerProtos.ContainerCommandResponseProto.newBuilder()