diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java index 1062cc21aaab1..fdb3a2ea1db86 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java @@ -300,11 +300,16 @@ public void invoke(IN value, Context context) throws Exception { @Override public void close() throws Exception { - serverThread.close(); - serverThread.join(); + if (serverThread != null) { + serverThread.close(); + serverThread.join(); + } } public void accumulateFinalResults() throws Exception { + if (bufferLock == null) { + return; + } bufferLock.lock(); try { // put results not consumed by the client into the accumulator diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionTest.java index 34efd510a6dfc..9e5d298da20db 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionTest.java @@ -34,6 +34,7 @@ import java.util.Collections; import java.util.List; +import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link CollectSinkFunction}. */ @@ -54,6 +55,19 @@ void after() throws Exception { functionWrapper.closeWrapper(); } + @Test + void testCloseBeforeOpenDoesNotThrow() { + CollectSinkFunction function = + new CollectSinkFunction<>( + serializer, 12, CollectSinkFunctionTestWrapper.ACCUMULATOR_NAME); + assertThatCode( + () -> { + function.accumulateFinalResults(); + function.close(); + }) + .doesNotThrowAnyException(); + } + @Test void testIncreasingToken() throws Exception { functionWrapper.openFunction();