coordination: fix reconnect hang on half-open TCP connection #652
krasnovdm wants to merge 2 commits into
When a coordination session loses its gRPC stream and starts reconnecting, a new Stream is created with disableDeadline(). If the underlying TCP connection enters a half-open state (the TCP handshake completes but no application data flows), the gRPC stream never delivers SessionStarted, so startFuture never completes. The reconnect loop then stalls indefinitely, leaving all pending CompletableFutures (e.g. acquireEphemeralSemaphore) unresolved even after the application-level acquire timeout has expired.

Fix: pass connectTimeout into Stream and schedule a one-shot timer that cancels the gRPC stream and completes startFuture with TIMEOUT if SessionStarted is not received within that window. The timer acts only when startFuture is still pending, so it is a no-op on the happy path.

Reported via YDB support: YDBREQUESTS-7830
Pull request overview
This PR addresses coordination session reconnect hangs caused by half-open TCP connections by adding a client-side connect watchdog that cancels the gRPC stream if SessionStarted is not received within connectTimeout, allowing the reconnect loop (and pending operations) to terminate instead of stalling indefinitely.
Changes:
- Updated `Stream` to accept a `connectTimeout` and schedule a one-shot timeout that cancels the stream and fails `startFuture` if startup hangs.
- Updated `StreamIntegrationTest` to pass the new `connectTimeout` constructor argument.
- Removed/emptied `StreamTest` instead of updating it (contradicts the PR description and drops unit coverage).
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| coordination/src/main/java/tech/ydb/coordination/impl/Stream.java | Adds connect-time watchdog logic and changes constructor signature to require connectTimeout. |
| coordination/src/test/java/tech/ydb/coordination/impl/StreamIntegrationTest.java | Updates test to use the new Stream(Rpc, Duration) constructor. |
| coordination/src/test/java/tech/ydb/coordination/impl/StreamTest.java | Test contents removed (file now empty), reducing unit coverage and diverging from PR description. |
Comments suppressed due to low confidence (1)
coordination/src/main/java/tech/ydb/coordination/impl/Stream.java:45
- Stream constructor now requires `connectTimeout`, but there are still call sites using the old `new Stream(rpc)` signature (e.g., `SessionImpl.connect()` and `SessionImpl.restoreSession()`), which will fail compilation. Update those call sites to pass the appropriate timeout (likely the existing `connectTimeout` field).
    Stream(Rpc rpc, Duration connectTimeout) {
        this.scheduler = rpc.getScheduler();
        this.stream = rpc.createSession(GrpcRequestSettings.newBuilder()
                .disableDeadline()
                .build());
    // Guard against a half-open TCP connection where the gRPC stream never delivers
    // SessionStarted. Without this timeout the reconnect loop stalls indefinitely,
    // keeping CompletableFutures for pending operations (e.g. acquireEphemeralSemaphore)
    // unresolved even after the application-level acquire timeout has expired.
    scheduler.schedule(() -> {
        if (startFuture.isDone()) {
            return;
        }
        logger.warn("stream {} connect timeout after {} ms, cancelling", hashCode(), connectTimeout.toMillis());
        stream.cancel();
        startFuture.complete(Result.fail(Status.of(StatusCode.TIMEOUT)));
    }, connectTimeout.toMillis(), TimeUnit.MILLISECONDS);
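The watchdog in this hunk can be tried in isolation. The following is a minimal sketch, not the SDK code: `ConnectWatchdogSketch`, its `Outcome` enum and `onSessionStarted()` are hypothetical stand-ins for `Stream`, `Result`/`Status` and the gRPC observer. It also cancels the pending timer once `startFuture` completes, which is an assumption added for illustration and is not part of the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for Stream; it only models the start handshake. */
final class ConnectWatchdogSketch {
    /** Hypothetical outcome type standing in for the SDK's Result/Status. */
    enum Outcome { STARTED, TIMEOUT }

    final CompletableFuture<Outcome> startFuture = new CompletableFuture<>();
    private volatile boolean streamCancelled;

    ConnectWatchdogSketch(ScheduledExecutorService scheduler, long connectTimeoutMillis) {
        ScheduledFuture<?> watchdog = scheduler.schedule(() -> {
            if (startFuture.isDone()) {
                return; // start already resolved in time: nothing to do
            }
            streamCancelled = true; // stands in for stream.cancel()
            startFuture.complete(Outcome.TIMEOUT);
        }, connectTimeoutMillis, TimeUnit.MILLISECONDS);

        // Assumption, not part of the PR: release the pending timer as soon as
        // startFuture completes instead of leaving it queued until it expires.
        startFuture.whenComplete((outcome, error) -> watchdog.cancel(false));
    }

    /** Simulates the server delivering SessionStarted. */
    void onSessionStarted() {
        startFuture.complete(Outcome.STARTED);
    }

    boolean isStreamCancelled() {
        return streamCancelled;
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            // Half-open case: SessionStarted never arrives, so the watchdog fires.
            ConnectWatchdogSketch hung = new ConnectWatchdogSketch(scheduler, 100);
            System.out.println("hung stream: " + hung.startFuture.join());

            // Happy path: SessionStarted arrives first and the watchdog is cancelled.
            ConnectWatchdogSketch healthy = new ConnectWatchdogSketch(scheduler, 100);
            healthy.onSessionStarted();
            System.out.println("healthy stream: " + healthy.startFuture.join());
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

Cancelling the stream before completing the future mirrors the order used in the hunk, so a caller that observes `TIMEOUT` can assume the stream has already been told to stop.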
      @Test
      public void stopBeforeStartTest() {
    -     Stream stream = new Stream(RPC);
    +     Stream stream = new Stream(RPC, java.time.Duration.ofSeconds(5));
          Status stopped = stream.stop().join();
…store tests

Keep Stream(Rpc rpc) as a delegate to Stream(rpc, Duration.ofSeconds(5)) so existing unit tests compile unchanged. SessionImpl passes the configured connectTimeout explicitly; the single-argument overload is only used in tests.

Restore StreamTest.java (was accidentally cleared) and revert StreamIntegrationTest.java to the original constructor call.
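The delegation described in this commit might look roughly like the sketch below. `StreamConstructorSketch` is a hypothetical class, the `String rpc` parameter is a placeholder for the SDK's `Rpc` type, and the null checks are an illustrative addition; only the 5-second default comes from the commit message.

```java
import java.time.Duration;
import java.util.Objects;

/** Hypothetical illustration of the two-constructor layout; not the SDK's Stream. */
final class StreamConstructorSketch {
    // Default taken from the commit message: Stream(rpc, Duration.ofSeconds(5)).
    static final Duration DEFAULT_CONNECT_TIMEOUT = Duration.ofSeconds(5);

    private final String rpc; // placeholder for the real Rpc dependency
    private final Duration connectTimeout;

    /** Single-argument overload kept so that existing tests compile unchanged. */
    StreamConstructorSketch(String rpc) {
        this(rpc, DEFAULT_CONNECT_TIMEOUT);
    }

    /** Overload used by production code, which passes its configured timeout. */
    StreamConstructorSketch(String rpc, Duration connectTimeout) {
        this.rpc = Objects.requireNonNull(rpc, "rpc");
        this.connectTimeout = Objects.requireNonNull(connectTimeout, "connectTimeout");
    }

    Duration connectTimeout() {
        return connectTimeout;
    }

    public static void main(String[] args) {
        System.out.println(new StreamConstructorSketch("rpc").connectTimeout());
        System.out.println(new StreamConstructorSketch("rpc", Duration.ofSeconds(30)).connectTimeout());
    }
}
```

Keeping the default in one named constant means tests and production code cannot silently drift apart on the fallback value.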
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #652 +/- ##
============================================
- Coverage 71.31% 71.22% -0.10%
+ Complexity 3365 3364 -1
============================================
Files 379 379
Lines 15920 15929 +9
Branches 1669 1669
============================================
- Hits 11353 11345 -8
- Misses 3917 3929 +12
- Partials 650 655 +5
Problem
When a coordination session loses its gRPC stream and begins reconnecting, a new `Stream` is created with `disableDeadline()` (intentional: the session stream is long-lived). If the underlying TCP connection enters a half-open state (TCP handshake completes, but no application data flows), the gRPC stream never delivers `SessionStarted`, so `startFuture` never completes.

The reconnect loop then stalls indefinitely:
- `restoreSession` → creates `new Stream(rpc)` → calls `connectToSession(stream, sessionId)`
- `connectToSession` returns `startFuture`, which never resolves
- `whenCompleteAsync` callback in `reconnect()` never fires
- `messagesToRetry` (containing the original `acquireEphemeralSemaphore`) are frozen forever
- `.get()` hangs; it was observed hanging for hours until `Thread.interrupt()`

The `connectTimeout` field existed in `SessionImpl` but was only passed to the server via `SessionStart.setTimeoutMillis` (server-side session keepalive), not used as a Java-side deadline.

Reported via YDB L2 support: YDBREQUESTS-7830
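The stall described above does not depend on gRPC and can be reproduced with plain `CompletableFuture`s. The sketch below is illustrative only: `StalledReconnectSketch` and `isStalled` are hypothetical names, and the `String` payload stands in for the SDK's result type. It shows that a callback chained with `whenCompleteAsync` never runs while the upstream future stays pending.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical reproduction of the stall; no SDK classes are involved. */
final class StalledReconnectSketch {
    /**
     * Chains a callback on startFuture, waits up to waitMillis, and returns true
     * if the callback still has not run, i.e. the reconnect step is stalled.
     */
    static boolean isStalled(CompletableFuture<String> startFuture, long waitMillis) {
        AtomicBoolean callbackRan = new AtomicBoolean(false);
        CompletableFuture<String> next =
                startFuture.whenCompleteAsync((value, error) -> callbackRan.set(true));
        try {
            next.get(waitMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Expected when startFuture never completes (the half-open case).
        } catch (ExecutionException e) {
            // startFuture failed; the callback has still run by this point.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !callbackRan.get();
    }

    public static void main(String[] args) {
        // A future nobody completes models a stream that never sees SessionStarted.
        System.out.println("pending start stalls: "
                + isStalled(new CompletableFuture<>(), 200));
        // A completed future models a healthy start.
        System.out.println("completed start stalls: "
                + isStalled(CompletableFuture.completedFuture("SessionStarted"), 1000));
    }
}
```

An unbounded `get()` in place of the timed `get(waitMillis, …)` would block until the thread is interrupted, which matches the behaviour reported in the list above.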
Fix
Pass `connectTimeout` into `Stream` and schedule a one-shot watchdog timer:
- `SessionStarted` arrives before the deadline → timer is a no-op (`startFuture.isDone()` is true)
- Deadline expires first → the timer cancels the gRPC stream and completes `startFuture` with `TIMEOUT`

This unblocks the reconnect loop: the `connectToSession` future resolves with failure, and `reconnect()` proceeds to either the next retry attempt or `completeMessagesWithBadSession`, ensuring pending operations always terminate.

Testing
- Updated `StreamTest` and `StreamIntegrationTest` to pass the required `connectTimeout` argument
- Build passes (`BUILD SUCCESS`); integration tests skipped (no Docker)