From d4b193e5221c2915bc3f8d7ff2e2b2f8d16e3b4c Mon Sep 17 00:00:00 2001 From: Abhijeet Sharma Date: Wed, 17 Jun 2026 14:10:59 +0530 Subject: [PATCH 1/2] [FLINK-39330] | [FLINK-39330][runtime] Make CollectSinkFunction teardown null-safe When a CollectSinkOperator is torn down before initializeState/open ran (e.g. during a JobManager failover), CollectSinkOperator#close() invokes accumulateFinalResults() on a sink function whose bufferLock (and serverThread) were never initialized, throwing a NullPointerException. That NPE surfaces in the task's exception-handler path and is escalated to a fatal error that shuts down the TaskManager. In a single-TM MiniCluster the recovered job can then never reacquire slots, starves for the full slot request timeout, and fails with NoResourceAvailableException. Guard accumulateFinalResults() and close() against being called before the function was opened so the failover teardown no longer raises an NPE. Co-Authored-By: abhijeet2096 --- .../api/operators/collect/CollectSinkFunction.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java index 1062cc21aaab1..fdb3a2ea1db86 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java @@ -300,11 +300,16 @@ public void invoke(IN value, Context context) throws Exception { @Override public void close() throws Exception { - serverThread.close(); - serverThread.join(); + if (serverThread != null) { + serverThread.close(); + serverThread.join(); + } } public void accumulateFinalResults() throws Exception { + if (bufferLock == null) { + return; + } bufferLock.lock(); try { // put results not consumed by the client into the accumulator From 5a1c7d5e48361ad3cfe7e4e495efaad528832da8 Mon Sep 17 00:00:00 2001 From: Abhijeet Sharma Date: Wed, 17 Jun 2026 14:10:59 +0530 Subject: [PATCH 2/2] [FLINK-39330] | [FLINK-39330][tests] Add regression test for CollectSinkFunction close-before-open Verifies that accumulateFinalResults() and close() do not throw when the function is closed before it was ever opened, covering the JobManager failover teardown path that previously raised a NullPointerException. Co-Authored-By: abhijeet2096 --- .../operators/collect/CollectSinkFunctionTest.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionTest.java index 34efd510a6dfc..9e5d298da20db 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionTest.java @@ -34,6 +34,7 @@ import java.util.Collections; import java.util.List; +import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link CollectSinkFunction}. */ @@ -54,6 +55,19 @@ void after() throws Exception { functionWrapper.closeWrapper(); } + @Test + void testCloseBeforeOpenDoesNotThrow() { + CollectSinkFunction function = + new CollectSinkFunction<>( + serializer, 12, CollectSinkFunctionTestWrapper.ACCUMULATOR_NAME); + assertThatCode( + () -> { + function.accumulateFinalResults(); + function.close(); + }) + .doesNotThrowAnyException(); + } + @Test void testIncreasingToken() throws Exception { functionWrapper.openFunction();