Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.HddsUtils;
Expand All @@ -63,6 +64,7 @@
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.util.Time;
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.thirdparty.com.google.protobuf.TextFormat;
import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
import org.apache.ratis.thirdparty.io.grpc.Status;
Expand Down Expand Up @@ -96,6 +98,7 @@ public class XceiverClientGrpc extends XceiverClientSpi {
private final XceiverClientMetrics metrics;
private final Semaphore semaphore;
private long timeout;
private final long streamReadTimeoutNanos;
private final SecurityConfig secConfig;
private final boolean topologyAwareRead;
private final ClientTrustManager trustManager;
Expand All @@ -121,6 +124,8 @@ public XceiverClientGrpc(Pipeline pipeline, ConfigurationSource config,
Objects.requireNonNull(config, "config == null");
setTimeout(config.getTimeDuration(OzoneConfigKeys.OZONE_CLIENT_READ_TIMEOUT,
OzoneConfigKeys.OZONE_CLIENT_READ_TIMEOUT_DEFAULT, TimeUnit.SECONDS));
this.streamReadTimeoutNanos = config.getObject(OzoneClientConfig.class)
.getStreamReadTimeout().toNanos();
this.pipeline = pipeline;
this.config = config;
this.secConfig = new SecurityConfig(config);
Expand Down Expand Up @@ -567,12 +572,33 @@ private XceiverClientReply sendCommandWithRetry(

@Override
public void streamRead(ContainerCommandRequestProto request,
StreamingReadResponse streamObserver) {
StreamingReadResponse streamObserver) throws IOException {
final ClientCallStreamObserver<ContainerCommandRequestProto> obs = streamObserver.getRequestObserver();

if (!obs.isReady()) {
LOG.debug("->{}: flow control stall (isReady=false) for block={} offset={} length={}. Waiting.",
streamObserver,
request.getReadBlock().getBlockID().getLocalID(),
request.getReadBlock().getOffset(),
request.getReadBlock().getLength());
final long deadlineNs = System.nanoTime() + streamReadTimeoutNanos;
while (!obs.isReady() && System.nanoTime() - deadlineNs < 0) {
LockSupport.parkNanos(10_000_000L);
if (Thread.currentThread().isInterrupted()) {
Thread.currentThread().interrupt();
throw new InterruptedIOException("Interrupted while waiting for stream to become ready: " + streamObserver);
}
}
if (!obs.isReady()) {
throw new TimeoutIOException("Timed out waiting for stream to become ready: " + streamObserver);
}
}

if (LOG.isDebugEnabled()) {
LOG.debug("->{}, send onNext request {}",
streamObserver, TextFormat.shortDebugString(request.getReadBlock()));
}
streamObserver.getRequestObserver().onNext(request);
obs.onNext(request);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumData;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
Expand Down Expand Up @@ -367,6 +369,109 @@ public void testReadDoesNotDropQueuedItemsWhenFutureIsDoneOnSecondCall() throws
}
}

/**
 * Reads one byte with a 500 ms stream read timeout while the mocked {@code streamRead}
 * itself blocks for 450 ms and the response only arrives 100 ms after it returns.
 * The read is expected to succeed, i.e. (per the test name) the wait for the response
 * is not charged for the time already spent inside {@code streamRead}.
 */
@Test
public void testReadGetsFreshResponseTimeoutAfterStreamReadWait() throws Exception {
  final OzoneClientConfig readConfig = newStreamReadConfig();
  readConfig.setStreamReadTimeout(Duration.ofMillis(500));
  final BlockID block = new BlockID(1L, 12L);
  final Pipeline standalonePipeline = mockStandalonePipeline();
  final ClientCallStreamObserver<ContainerCommandRequestProto> mockedObserver =
      mock(ClientCallStreamObserver.class);
  final StreamingReadResponse readResponse = new StreamingReadResponse(
      MockDatanodeDetails.randomDatanodeDetails(), mockedObserver);

  final XceiverClientGrpc grpcClient = mock(XceiverClientGrpc.class);
  final AtomicReference<StreamingReaderSpi> capturedReader = new AtomicReference<>();
  final AtomicReference<Thread> responder = new AtomicReference<>();

  // initStreamRead: wire the response holder into the reader and remember the reader.
  doAnswer(invocation -> {
    final StreamingReaderSpi spi = invocation.getArgument(1);
    spi.setStreamingReadResponse(readResponse);
    capturedReader.set(spi);
    return null;
  }).when(grpcClient).initStreamRead(any(BlockID.class), any());

  // streamRead: block for 450 ms, then deliver the data from another thread 100 ms later.
  doAnswer(invocation -> {
    Thread.sleep(450);
    final Runnable deliverLater = () -> {
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      capturedReader.get().onNext(buildResponseProto(new byte[] {1}, 0));
    };
    final Thread worker = new Thread(deliverLater);
    responder.set(worker);
    worker.start();
    return null;
  }).when(grpcClient).streamRead(any(), any());

  final XceiverClientFactory clientFactory = mock(XceiverClientFactory.class);
  when(clientFactory.acquireClientForReadData(any(Pipeline.class)))
      .thenReturn(grpcClient);

  try (StreamBlockInputStream input = new StreamBlockInputStream(
      block, 1L, standalonePipeline, null, clientFactory,
      NO_REFRESH, readConfig)) {
    final ByteBuffer target = ByteBuffer.allocate(1);
    assertEquals(1, input.read(target));
    responder.get().join();
  }
}

/**
 * Performs two one-byte reads with a 500 ms stream read timeout. The first read triggers the
 * only {@code streamRead} call; the test then idles for 600 ms and delivers the second
 * response 100 ms after that. The second read must succeed without another
 * {@code streamRead} call, i.e. (per the test name) it gets its own timeout budget.
 */
@Test
public void testReadWithoutNewRequestGetsFreshTimeoutBudget() throws Exception {
  final OzoneClientConfig readConfig = newStreamReadConfig();
  readConfig.setStreamReadPreReadSize(10);
  readConfig.setStreamReadTimeout(Duration.ofMillis(500));
  final BlockID block = new BlockID(1L, 13L);
  final Pipeline standalonePipeline = mockStandalonePipeline();
  final ClientCallStreamObserver<ContainerCommandRequestProto> mockedObserver =
      mock(ClientCallStreamObserver.class);
  final StreamingReadResponse readResponse = new StreamingReadResponse(
      MockDatanodeDetails.randomDatanodeDetails(), mockedObserver);

  final AtomicReference<StreamingReaderSpi> capturedReader = new AtomicReference<>();
  final AtomicInteger streamReadCalls = new AtomicInteger();
  final XceiverClientGrpc grpcClient = mock(XceiverClientGrpc.class);

  // initStreamRead: wire the response holder into the reader and remember the reader.
  doAnswer(invocation -> {
    final StreamingReaderSpi spi = invocation.getArgument(1);
    spi.setStreamingReadResponse(readResponse);
    capturedReader.set(spi);
    return null;
  }).when(grpcClient).initStreamRead(any(BlockID.class), any());

  // streamRead: count the call and answer synchronously with the first byte.
  doAnswer(invocation -> {
    streamReadCalls.incrementAndGet();
    capturedReader.get().onNext(buildResponseProto(new byte[] {1}, 0));
    return null;
  }).when(grpcClient).streamRead(any(), any());

  final XceiverClientFactory clientFactory = mock(XceiverClientFactory.class);
  when(clientFactory.acquireClientForReadData(any(Pipeline.class)))
      .thenReturn(grpcClient);

  try (StreamBlockInputStream input = new StreamBlockInputStream(
      block, 2L, standalonePipeline, null, clientFactory,
      NO_REFRESH, readConfig)) {
    final ByteBuffer firstByte = ByteBuffer.allocate(1);
    assertEquals(1, input.read(firstByte));

    // Idle for longer than the configured timeout before the next read starts.
    Thread.sleep(600);

    final Runnable deliverSecondByte = () -> {
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      capturedReader.get().onNext(buildResponseProto(new byte[] {2}, 1));
    };
    final Thread lateResponder = new Thread(deliverSecondByte);
    lateResponder.start();

    final ByteBuffer secondByte = ByteBuffer.allocate(1);
    assertEquals(1, input.readFully(secondByte, false));
    lateResponder.join();
    assertEquals(1, streamReadCalls.get(), "second read should use data from the existing request");
  }
}

private ReadBlockResponseProto buildReadBlockResponse(byte[] data) {
return ReadBlockResponseProto.newBuilder()
.setOffset(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ public void initStreamRead(BlockID blockID, StreamingReaderSpi streamObserver) t
throw new UnsupportedOperationException("Stream read is not supported");
}

public void streamRead(ContainerCommandRequestProto request, StreamingReadResponse streamObserver) {
/**
 * Sends a request on an open streaming read. This implementation does not support
 * streaming reads and always throws.
 *
 * @param request the read request to send
 * @param streamObserver the streaming read the request belongs to
 * @throws IOException not thrown here; declared so that overriding implementations can
 *     report failures while sending
 * @throws UnsupportedOperationException always
 */
public void streamRead(
    ContainerCommandRequestProto request,
    StreamingReadResponse streamObserver)
    throws IOException {
  throw new UnsupportedOperationException("Stream read is not supported");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
Expand All @@ -43,6 +44,8 @@
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.thirdparty.io.grpc.stub.ClientCallStreamObserver;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -294,6 +297,175 @@ private void invokeXceiverClientReadSmallFile(XceiverClientSpi client)
ContainerProtocolCalls.readSmallFile(client, bid, null);
}

/**
 * With an observer that is ready on the very first check, streamRead() must forward the
 * request through onNext() exactly once and must query isReady() exactly once.
 */
@Test
public void testStreamReadSendsImmediatelyWhenReady() throws Exception {
  final ContainerProtos.ContainerCommandRequestProto readRequest = buildReadBlockRequest();
  final TrackingStreamObserver tracker = new TrackingStreamObserver(0);
  final StreamingReadResponse readResponse = new StreamingReadResponse(
      MockDatanodeDetails.randomDatanodeDetails(), tracker);

  try (XceiverClientGrpc grpcClient = new XceiverClientGrpc(pipeline, conf)) {
    grpcClient.streamRead(readRequest, readResponse);
  }

  final List<ContainerProtos.ContainerCommandRequestProto> forwarded = tracker.getSent();
  assertEquals(1, forwarded.size(), "onNext must be called exactly once");
  assertEquals(readRequest, forwarded.get(0));

  final int readinessChecks = tracker.getReadyCalls().get();
  assertEquals(1, readinessChecks,
      "isReady() must be checked exactly once when stream is immediately ready");
}

/**
 * With an observer whose first three isReady() checks return false, streamRead() must keep
 * polling, then forward the request through onNext() exactly once.
 */
@Test
public void testStreamReadWaitsUntilReadyThenSends() throws Exception {
  final ContainerProtos.ContainerCommandRequestProto readRequest = buildReadBlockRequest();
  final TrackingStreamObserver tracker = new TrackingStreamObserver(3);
  final StreamingReadResponse readResponse = new StreamingReadResponse(
      MockDatanodeDetails.randomDatanodeDetails(), tracker);

  try (XceiverClientGrpc grpcClient = new XceiverClientGrpc(pipeline, conf)) {
    grpcClient.streamRead(readRequest, readResponse);
  }

  final List<ContainerProtos.ContainerCommandRequestProto> forwarded = tracker.getSent();
  assertEquals(1, forwarded.size(), "onNext must be called exactly once");
  assertEquals(readRequest, forwarded.get(0));

  // Three "not ready" answers plus at least one "ready" answer.
  final int readinessChecks = tracker.getReadyCalls().get();
  assertThat(readinessChecks).isGreaterThanOrEqualTo(4);
}

/**
 * streamRead() honours the stream read timeout and does not send while the stream is not ready.
 *
 * <p>Elapsed time is measured with {@link System#nanoTime()}, the same monotonic clock the
 * code under test uses for its deadline, and only around the {@code streamRead} call itself,
 * so the bounds are not affected by wall-clock adjustments, coarse millisecond granularity,
 * or the time spent closing the client.
 */
@Test
public void testStreamReadFailsAfterTimeoutIfNeverReady() throws Exception {
  OzoneConfiguration timeoutConf = new OzoneConfiguration();
  timeoutConf.set("ozone.client.stream.read.timeout", "1s");
  final long timeoutNanos = 1_000_000_000L;
  final long upperBoundNanos = 10_000_000_000L;

  // Integer.MAX_VALUE "not ready" answers: the observer never becomes ready during the test.
  TrackingStreamObserver obs = new TrackingStreamObserver(Integer.MAX_VALUE);
  StreamingReadResponse response = new StreamingReadResponse(
      MockDatanodeDetails.randomDatanodeDetails(), obs);
  ContainerProtos.ContainerCommandRequestProto request = buildReadBlockRequest();

  long elapsedNanos;
  try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, timeoutConf)) {
    final long startNanos = System.nanoTime();
    assertThrows(TimeoutIOException.class, () -> client.streamRead(request, response));
    elapsedNanos = System.nanoTime() - startNanos;
  }

  assertEquals(0, obs.getSent().size(), "onNext must not be called while the stream is not ready");
  assertThat(elapsedNanos).isGreaterThanOrEqualTo(timeoutNanos);
  assertThat(elapsedNanos).isLessThan(upperBoundNanos);
}

/**
 * streamRead() exits the spin-wait promptly on interrupt and leaves the interrupt flag set.
 *
 * <p>The helper thread that delivers the interrupt is joined during cleanup, so its interrupt
 * cannot arrive after this test has finished and leak into a later test, even when
 * streamRead() returns or fails earlier than expected.
 */
@Test
public void testStreamReadRestoresInterruptFlagOnInterruption() throws Exception {
  TrackingStreamObserver obs = new TrackingStreamObserver(Integer.MAX_VALUE);
  StreamingReadResponse response = new StreamingReadResponse(
      MockDatanodeDetails.randomDatanodeDetails(), obs);
  ContainerProtos.ContainerCommandRequestProto request = buildReadBlockRequest();

  OzoneConfiguration longTimeout = new OzoneConfiguration();
  longTimeout.set("ozone.client.stream.read.timeout", "60s");

  final Thread self = Thread.currentThread();
  final Thread interrupter = new Thread(() -> {
    try {
      Thread.sleep(50);
    } catch (InterruptedException ignored) {
      // Nothing interrupts this helper on purpose; if it happens, just interrupt the test early.
    }
    self.interrupt();
  });
  interrupter.setDaemon(true);

  try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, longTimeout)) {
    interrupter.start();

    // Monotonic clock: immune to wall-clock adjustments while the test is waiting.
    final long startNanos = System.nanoTime();
    assertThrows(InterruptedIOException.class, () -> client.streamRead(request, response));
    final long elapsedNanos = System.nanoTime() - startNanos;

    assertThat(elapsedNanos).isLessThan(5_000_000_000L);
    assertThat(Thread.currentThread().isInterrupted()).isTrue();
    assertEquals(0, obs.getSent().size());
  } finally {
    // Clear the flag first, otherwise join() would throw immediately.
    Thread.interrupted();
    try {
      // join() returns at once if the helper was never started or has already finished.
      interrupter.join();
    } catch (InterruptedException expected) {
      // The helper's interrupt arrived while joining; it has nothing left to do after that.
    }
    Thread.interrupted(); // clear for test cleanup
  }
}

/**
 * Request observer used by the streamRead() tests.
 *
 * <p>It records every request passed to {@link #onNext} and counts calls to
 * {@link #isReady()}. The first {@code readyAfter} calls to {@code isReady()} return
 * {@code false}; later calls return {@code true}. All other callbacks are no-ops.
 */
private static final class TrackingStreamObserver
    extends ClientCallStreamObserver<ContainerProtos.ContainerCommandRequestProto> {

  /** Number of initial isReady() calls that report "not ready". */
  private final int notReadyChecks;
  /** How often isReady() has been called so far. */
  private final AtomicInteger readyCheckCount = new AtomicInteger();
  /** Requests received through onNext(), in call order. */
  private final List<ContainerProtos.ContainerCommandRequestProto> sentRequests = new ArrayList<>();

  TrackingStreamObserver(int readyAfter) {
    notReadyChecks = readyAfter;
  }

  /** Returns the live list of requests received through onNext(). */
  List<ContainerProtos.ContainerCommandRequestProto> getSent() {
    return sentRequests;
  }

  /** Returns the counter of isReady() calls. */
  AtomicInteger getReadyCalls() {
    return readyCheckCount;
  }

  @Override
  public boolean isReady() {
    final int checksSoFar = readyCheckCount.incrementAndGet();
    return checksSoFar > notReadyChecks;
  }

  @Override
  public void onNext(ContainerProtos.ContainerCommandRequestProto value) {
    sentRequests.add(value);
  }

  @Override
  public void cancel(String msg, Throwable cause) {
    // not needed by the tests
  }

  @Override
  public void setOnReadyHandler(Runnable r) {
    // not needed by the tests
  }

  @Override
  public void disableAutoInboundFlowControl() {
    // not needed by the tests
  }

  @Override
  public void request(int count) {
    // not needed by the tests
  }

  @Override
  public void setMessageCompression(boolean enable) {
    // not needed by the tests
  }

  @Override
  public void onError(Throwable t) {
    // not needed by the tests
  }

  @Override
  public void onCompleted() {
    // not needed by the tests
  }
}

/**
 * Builds a ReadBlock command for container 1, local block 1 (commit sequence id 1),
 * offset 0 and length 1024, addressed to the first datanode in {@code dns}.
 */
private ContainerProtos.ContainerCommandRequestProto buildReadBlockRequest() {
  final ContainerProtos.DatanodeBlockID blockId =
      ContainerProtos.DatanodeBlockID.newBuilder()
          .setContainerID(1L)
          .setLocalID(1L)
          .setBlockCommitSequenceId(1L)
          .build();

  final ContainerProtos.ReadBlockRequestProto readBlock =
      ContainerProtos.ReadBlockRequestProto.newBuilder()
          .setBlockID(blockId)
          .setOffset(0L)
          .setLength(1024L)
          .build();

  final ContainerProtos.ContainerCommandRequestProto.Builder command =
      ContainerProtos.ContainerCommandRequestProto.newBuilder();
  command.setCmdType(ContainerProtos.Type.ReadBlock);
  command.setContainerID(1L);
  command.setDatanodeUuid(dns.get(0).getUuidString());
  command.setReadBlock(readBlock);
  return command.build();
}

private XceiverClientReply buildValidResponse() {
ContainerProtos.ContainerCommandResponseProto resp =
ContainerProtos.ContainerCommandResponseProto.newBuilder()
Expand Down