From d977cff46645acbef4e2892dd4bb9831f94084f7 Mon Sep 17 00:00:00 2001 From: agrawal-siddharth Date: Fri, 24 Apr 2026 03:30:07 +0000 Subject: [PATCH] fix: data race in ConnectionWorker. RACE DESCRIPTION: Requests are added to inflightRequestQueue before actually being sent. A race occurs when a response arrives on a separate thread, triggering a retry process that clears inflightRequestQueue and nullifies requestSendTimeStamp. Simultaneously, the appendLoop thread might try to send a request from the localQueue, setting requestSendTimeStamp to the current time, leading to conflicting writes. FIX DESCRIPTION: The fix is to ensure only the appendLoop thread modifies requestSendTimeStamp. This thread confinement prevents conflicts with other threads, like the response thread. We can stop nullifying requestSendTimeStamp because appendLoop will always update it to the current time just before sending. The check for exceeding response timeouts happens earlier in the appendLoop, so this change does not cause issues. --- .../google/cloud/bigquery/storage/v1/ConnectionWorker.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 6e353617e36d..f425ee235933 100644 --- a/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -878,7 +878,6 @@ private void appendLoop() { // Consider the backend latency as completed for the current request. requestProfilerHook.endOperation( RequestProfiler.OperationName.RESPONSE_LATENCY, requestWrapper.requestUniqueId); - requestWrapper.requestSendTimeStamp = null; requestProfilerHook.startOperation( RequestProfiler.OperationName.WAIT_QUEUE, requestWrapper.requestUniqueId); waitingRequestQueue.addFirst(requestWrapper); @@ -1454,7 +1453,6 @@ private void doneCallback(Throwable finalStatus) { private AppendRequestAndResponse pollInflightRequestQueue(boolean pollLast) { AppendRequestAndResponse requestWrapper = pollLast ? inflightRequestQueue.pollLast() : inflightRequestQueue.poll(); - requestWrapper.requestSendTimeStamp = null; --this.inflightRequests; this.inflightBytes -= requestWrapper.messageSize; this.inflightReduced.signal(); @@ -1501,7 +1499,7 @@ static final class AppendRequestAndResponse { long recordBatchRowCount = -1; // Time at which request was last sent over the network. - // If a response is no longer expected this is set back to null. + // This is set ONLY by the appendLoop thread. Instant requestSendTimeStamp; AppendRequestAndResponse(