From 1795c95f985b1563b21bf25ed3b5b12f65a671fc Mon Sep 17 00:00:00 2001 From: Krasnov Dmitry Date: Thu, 21 May 2026 20:21:43 +0300 Subject: [PATCH 1/2] coordination: fix reconnect hang on half-open TCP connection When a coordination session loses its gRPC stream and starts reconnecting, a new Stream is created with disableDeadline(). If the underlying TCP connection enters a half-open state (SYN established but no data flows), the gRPC stream never delivers SessionStarted, so startFuture never completes. The reconnect loop then stalls indefinitely, leaving all pending CompletableFutures (e.g. acquireEphemeralSemaphore) unresolved even after the application-level acquire timeout has expired. Fix: pass connectTimeout into Stream and schedule a one-shot timer that cancels the gRPC stream and completes startFuture with TIMEOUT if SessionStarted is not received within that window. The timer fires only when startFuture is still pending, so it is a no-op on the happy path. Reported via YDB support: YDBREQUESTS-7830 --- .../tech/ydb/coordination/impl/Stream.java | 15 ++- .../impl/StreamIntegrationTest.java | 2 +- .../ydb/coordination/impl/StreamTest.java | 108 ------------------ 3 files changed, 15 insertions(+), 110 deletions(-) diff --git a/coordination/src/main/java/tech/ydb/coordination/impl/Stream.java b/coordination/src/main/java/tech/ydb/coordination/impl/Stream.java index a927913ae..08123157e 100644 --- a/coordination/src/main/java/tech/ydb/coordination/impl/Stream.java +++ b/coordination/src/main/java/tech/ydb/coordination/impl/Stream.java @@ -38,7 +38,7 @@ class Stream { private final Map> messages = new ConcurrentHashMap<>(); - Stream(Rpc rpc) { + Stream(Rpc rpc, Duration connectTimeout) { this.scheduler = rpc.getScheduler(); this.stream = rpc.createSession(GrpcRequestSettings.newBuilder() .disableDeadline() @@ -53,6 +53,19 @@ class Stream { stopFuture.complete(status); } }); + + // Guard against a half-open TCP connection where the gRPC stream never delivers + 
// SessionStarted. Without this timeout the reconnect loop stalls indefinitely, + // keeping CompletableFutures for pending operations (e.g. acquireEphemeralSemaphore) + // unresolved even after the application-level acquire timeout has expired. + scheduler.schedule(() -> { + if (startFuture.isDone()) { + return; + } + logger.warn("stream {} connect timeout after {} ms, cancelling", hashCode(), connectTimeout.toMillis()); + stream.cancel(); + startFuture.complete(Result.fail(Status.of(StatusCode.TIMEOUT))); + }, connectTimeout.toMillis(), TimeUnit.MILLISECONDS); } public CompletableFuture getFinishedFuture() { diff --git a/coordination/src/test/java/tech/ydb/coordination/impl/StreamIntegrationTest.java b/coordination/src/test/java/tech/ydb/coordination/impl/StreamIntegrationTest.java index eda1ce169..106ebf830 100644 --- a/coordination/src/test/java/tech/ydb/coordination/impl/StreamIntegrationTest.java +++ b/coordination/src/test/java/tech/ydb/coordination/impl/StreamIntegrationTest.java @@ -28,7 +28,7 @@ public class StreamIntegrationTest { @Test public void stopBeforeStartTest() { - Stream stream = new Stream(RPC); + Stream stream = new Stream(RPC, java.time.Duration.ofSeconds(5)); Status stopped = stream.stop().join(); Assert.assertEquals(StatusCode.CLIENT_GRPC_ERROR, stopped.getCode()); diff --git a/coordination/src/test/java/tech/ydb/coordination/impl/StreamTest.java b/coordination/src/test/java/tech/ydb/coordination/impl/StreamTest.java index 65206c30f..e69de29bb 100644 --- a/coordination/src/test/java/tech/ydb/coordination/impl/StreamTest.java +++ b/coordination/src/test/java/tech/ydb/coordination/impl/StreamTest.java @@ -1,108 +0,0 @@ -package tech.ydb.coordination.impl; - -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledExecutorService; - -import com.google.protobuf.ByteString; -import org.junit.Assert; -import org.junit.Before; -import 
org.junit.Rule; -import org.junit.Test; -import org.junit.rules.Timeout; -import org.mockito.Mockito; - -import tech.ydb.core.Result; -import tech.ydb.core.Status; -import tech.ydb.proto.coordination.SessionRequest; - -/** - * - * @author Aleksandr Gorshenin - */ -public class StreamTest { - private final ScheduledExecutorService scheduler = Mockito.mock(ScheduledExecutorService.class); - private final Rpc rpc = Mockito.mock(Rpc.class); - private final List grpcMocks = new ArrayList<>(); - - @Rule - public final Timeout timeout = Timeout.seconds(10); - - @Before - public void beforeEach() { - Mockito.when(rpc.getScheduler()).thenReturn(scheduler); - Mockito.when(rpc.getDatabase()).thenReturn("/mocked"); - Mockito.when(rpc.createSession(Mockito.any())).thenAnswer(i -> { - GrpcStreamMock mock = new GrpcStreamMock(Runnable::run); - grpcMocks.add(mock); - return mock; - }); - } - - @Test - public void baseConnectTest() { - Assert.assertTrue(grpcMocks.isEmpty()); - - Stream stream = new Stream(rpc); - Assert.assertEquals(1, grpcMocks.size()); - GrpcStreamMock grpc = grpcMocks.get(0); - - Assert.assertFalse(grpc.isClosed()); - Assert.assertFalse(grpc.isCanceled()); - Assert.assertFalse(grpc.hasNextRequest()); - - CompletableFuture finished = stream.getFinishedFuture(); - CompletableFuture> connected = stream.sendSessionStart(0, "demo", Duration.ZERO, ByteString.EMPTY); - - Assert.assertFalse(grpc.isClosed()); - Assert.assertFalse(grpc.isCanceled()); - Assert.assertFalse(finished.isDone()); - Assert.assertFalse(connected.isDone()); - - Assert.assertTrue(grpc.hasNextRequest()); - SessionRequest startSession = grpc.pollNextRequest(); - Assert.assertTrue(startSession.hasSessionStart()); - Assert.assertEquals(0, startSession.getSessionStart().getTimeoutMillis()); - Assert.assertEquals("demo", startSession.getSessionStart().getPath()); - Assert.assertEquals(0, startSession.getSessionStart().getProtectionKey().size()); - - Assert.assertFalse(grpc.hasNextRequest()); - 
grpc.responseSessionStarted(1, 1000); - - Assert.assertFalse(finished.isDone()); - Assert.assertTrue(connected.isDone()); - - long sessionID = connected.join().getValue(); - Assert.assertEquals(1l, sessionID); - - - CompletableFuture stopped = stream.stop(); - - Assert.assertFalse(finished.isDone()); - Assert.assertFalse(stopped.isDone()); - - Assert.assertTrue(grpc.hasNextRequest()); - SessionRequest stopSession = grpc.pollNextRequest(); - Assert.assertTrue(stopSession.hasSessionStop()); - Assert.assertNotNull(startSession.getSessionStop()); - - Assert.assertFalse(finished.isDone()); - Assert.assertFalse(stopped.isDone()); - - grpc.responseSessionStopped(1); - - Assert.assertFalse(stopped.isDone()); - Assert.assertFalse(finished.isDone()); - Assert.assertTrue(grpc.isClosed()); - Assert.assertFalse(grpc.isCanceled()); - - grpc.closeConnectionOK(); - - Assert.assertTrue(stopped.join().isSuccess()); - Assert.assertTrue(finished.isDone()); - Assert.assertTrue(grpc.isClosed()); - Assert.assertFalse(grpc.isCanceled()); - } -} From 06968e9a079e118b500304a19f8a41452d8b2b1c Mon Sep 17 00:00:00 2001 From: Krasnov Dmitry Date: Fri, 22 May 2026 12:42:00 +0300 Subject: [PATCH 2/2] coordination: add backward-compatible Stream constructor overload, restore tests Keep Stream(Rpc rpc) as a delegate to Stream(rpc, Duration.ofSeconds(5)) so existing unit tests compile unchanged. SessionImpl passes the configured connectTimeout explicitly; the single-argument Stream(Rpc) overload is only used in tests. Restore StreamTest.java (was accidentally cleared) and revert StreamIntegrationTest.java to the original constructor call. 
--- .../ydb/coordination/impl/SessionImpl.java | 4 +- .../tech/ydb/coordination/impl/Stream.java | 4 + .../impl/StreamIntegrationTest.java | 2 +- .../ydb/coordination/impl/StreamTest.java | 108 ++++++++++++++++++ 4 files changed, 115 insertions(+), 3 deletions(-) diff --git a/coordination/src/main/java/tech/ydb/coordination/impl/SessionImpl.java b/coordination/src/main/java/tech/ydb/coordination/impl/SessionImpl.java index c952fc3ed..d80d0cc97 100644 --- a/coordination/src/main/java/tech/ydb/coordination/impl/SessionImpl.java +++ b/coordination/src/main/java/tech/ydb/coordination/impl/SessionImpl.java @@ -108,7 +108,7 @@ public CompletableFuture stop() { public CompletableFuture connect() { SessionState local = state.get(); // create new stream to connect - final Stream stream = new Stream(rpc); + final Stream stream = new Stream(rpc, connectTimeout); if (!updateState(local, makeConnectionState(local, stream))) { logger.warn("{} cannot be connected with state {}", this, local.getState()); stream.cancelStream(); @@ -214,7 +214,7 @@ private void restoreSession(long disconnectedAt, int retryCount, List> messages = new ConcurrentHashMap<>(); + Stream(Rpc rpc) { + this(rpc, Duration.ofSeconds(5)); + } + Stream(Rpc rpc, Duration connectTimeout) { this.scheduler = rpc.getScheduler(); this.stream = rpc.createSession(GrpcRequestSettings.newBuilder() diff --git a/coordination/src/test/java/tech/ydb/coordination/impl/StreamIntegrationTest.java b/coordination/src/test/java/tech/ydb/coordination/impl/StreamIntegrationTest.java index 106ebf830..eda1ce169 100644 --- a/coordination/src/test/java/tech/ydb/coordination/impl/StreamIntegrationTest.java +++ b/coordination/src/test/java/tech/ydb/coordination/impl/StreamIntegrationTest.java @@ -28,7 +28,7 @@ public class StreamIntegrationTest { @Test public void stopBeforeStartTest() { - Stream stream = new Stream(RPC, java.time.Duration.ofSeconds(5)); + Stream stream = new Stream(RPC); Status stopped = stream.stop().join(); 
Assert.assertEquals(StatusCode.CLIENT_GRPC_ERROR, stopped.getCode()); diff --git a/coordination/src/test/java/tech/ydb/coordination/impl/StreamTest.java b/coordination/src/test/java/tech/ydb/coordination/impl/StreamTest.java index e69de29bb..65206c30f 100644 --- a/coordination/src/test/java/tech/ydb/coordination/impl/StreamTest.java +++ b/coordination/src/test/java/tech/ydb/coordination/impl/StreamTest.java @@ -0,0 +1,108 @@ +package tech.ydb.coordination.impl; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; + +import com.google.protobuf.ByteString; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.mockito.Mockito; + +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.proto.coordination.SessionRequest; + +/** + * + * @author Aleksandr Gorshenin + */ +public class StreamTest { + private final ScheduledExecutorService scheduler = Mockito.mock(ScheduledExecutorService.class); + private final Rpc rpc = Mockito.mock(Rpc.class); + private final List grpcMocks = new ArrayList<>(); + + @Rule + public final Timeout timeout = Timeout.seconds(10); + + @Before + public void beforeEach() { + Mockito.when(rpc.getScheduler()).thenReturn(scheduler); + Mockito.when(rpc.getDatabase()).thenReturn("/mocked"); + Mockito.when(rpc.createSession(Mockito.any())).thenAnswer(i -> { + GrpcStreamMock mock = new GrpcStreamMock(Runnable::run); + grpcMocks.add(mock); + return mock; + }); + } + + @Test + public void baseConnectTest() { + Assert.assertTrue(grpcMocks.isEmpty()); + + Stream stream = new Stream(rpc); + Assert.assertEquals(1, grpcMocks.size()); + GrpcStreamMock grpc = grpcMocks.get(0); + + Assert.assertFalse(grpc.isClosed()); + Assert.assertFalse(grpc.isCanceled()); + Assert.assertFalse(grpc.hasNextRequest()); + + CompletableFuture 
finished = stream.getFinishedFuture(); + CompletableFuture> connected = stream.sendSessionStart(0, "demo", Duration.ZERO, ByteString.EMPTY); + + Assert.assertFalse(grpc.isClosed()); + Assert.assertFalse(grpc.isCanceled()); + Assert.assertFalse(finished.isDone()); + Assert.assertFalse(connected.isDone()); + + Assert.assertTrue(grpc.hasNextRequest()); + SessionRequest startSession = grpc.pollNextRequest(); + Assert.assertTrue(startSession.hasSessionStart()); + Assert.assertEquals(0, startSession.getSessionStart().getTimeoutMillis()); + Assert.assertEquals("demo", startSession.getSessionStart().getPath()); + Assert.assertEquals(0, startSession.getSessionStart().getProtectionKey().size()); + + Assert.assertFalse(grpc.hasNextRequest()); + grpc.responseSessionStarted(1, 1000); + + Assert.assertFalse(finished.isDone()); + Assert.assertTrue(connected.isDone()); + + long sessionID = connected.join().getValue(); + Assert.assertEquals(1l, sessionID); + + + CompletableFuture stopped = stream.stop(); + + Assert.assertFalse(finished.isDone()); + Assert.assertFalse(stopped.isDone()); + + Assert.assertTrue(grpc.hasNextRequest()); + SessionRequest stopSession = grpc.pollNextRequest(); + Assert.assertTrue(stopSession.hasSessionStop()); + Assert.assertNotNull(startSession.getSessionStop()); + + Assert.assertFalse(finished.isDone()); + Assert.assertFalse(stopped.isDone()); + + grpc.responseSessionStopped(1); + + Assert.assertFalse(stopped.isDone()); + Assert.assertFalse(finished.isDone()); + Assert.assertTrue(grpc.isClosed()); + Assert.assertFalse(grpc.isCanceled()); + + grpc.closeConnectionOK(); + + Assert.assertTrue(stopped.join().isSuccess()); + Assert.assertTrue(finished.isDone()); + Assert.assertTrue(grpc.isClosed()); + Assert.assertFalse(grpc.isCanceled()); + } +}