diff --git a/coordination/src/main/java/tech/ydb/coordination/impl/SessionImpl.java b/coordination/src/main/java/tech/ydb/coordination/impl/SessionImpl.java index c952fc3ed..d80d0cc97 100644 --- a/coordination/src/main/java/tech/ydb/coordination/impl/SessionImpl.java +++ b/coordination/src/main/java/tech/ydb/coordination/impl/SessionImpl.java @@ -108,7 +108,7 @@ public CompletableFuture stop() { public CompletableFuture connect() { SessionState local = state.get(); // create new stream to connect - final Stream stream = new Stream(rpc); + final Stream stream = new Stream(rpc, connectTimeout); if (!updateState(local, makeConnectionState(local, stream))) { logger.warn("{} cannot be connected with state {}", this, local.getState()); stream.cancelStream(); @@ -214,7 +214,7 @@ private void restoreSession(long disconnectedAt, int retryCount, List> messages = new ConcurrentHashMap<>(); Stream(Rpc rpc) { + this(rpc, Duration.ofSeconds(5)); + } + + Stream(Rpc rpc, Duration connectTimeout) { this.scheduler = rpc.getScheduler(); this.stream = rpc.createSession(GrpcRequestSettings.newBuilder() .disableDeadline() @@ -53,6 +57,19 @@ class Stream { stopFuture.complete(status); } }); + + // Guard against a half-open TCP connection where the gRPC stream never delivers + // SessionStarted. Without this timeout the reconnect loop stalls indefinitely, + // keeping CompletableFutures for pending operations (e.g. acquireEphemeralSemaphore) + // unresolved even after the application-level acquire timeout has expired. + scheduler.schedule(() -> { + if (startFuture.isDone()) { + return; + } + logger.warn("stream {} connect timeout after {} ms, cancelling", hashCode(), connectTimeout.toMillis()); + stream.cancel(); + startFuture.complete(Result.fail(Status.of(StatusCode.TIMEOUT))); + }, connectTimeout.toMillis(), TimeUnit.MILLISECONDS); } public CompletableFuture getFinishedFuture() {