[FLINK-39330][runtime] Make CollectSinkFunction teardown null-safe to fix flaky FileSourceTextLinesITCase JM failover #28474
Draft
abhijeet2096 wants to merge 2 commits into
…own null-safe

When a CollectSinkOperator is torn down before initializeState/open ran (e.g. during a JobManager failover), CollectSinkOperator#close() invokes accumulateFinalResults() on a sink function whose bufferLock (and serverThread) were never initialized, throwing a NullPointerException. That NPE surfaces in the task's exception-handler path and is escalated to a fatal error that shuts down the TaskManager. In a single-TM MiniCluster the recovered job can then never reacquire slots, starves for the full slot request timeout, and fails with NoResourceAvailableException.

Guard accumulateFinalResults() and close() against being called before the function was opened so the failover teardown no longer raises an NPE.

Co-Authored-By: abhijeet2096 <sharma.abhijeet2096@gmail.com>
…inkFunction close-before-open

Verifies that accumulateFinalResults() and close() do not throw when the function is closed before it was ever opened, covering the JobManager failover teardown path that previously raised a NullPointerException.

Co-Authored-By: abhijeet2096 <sharma.abhijeet2096@gmail.com>
Screenshots: Working Logs | Flaky Logs


What is the purpose of the change
Fixes FLINK-39330: FileSourceTextLinesITCase.testBoundedTextFileSourceWithJobManagerFailover is flaky.

The flakiness is a latent NullPointerException in CollectSinkFunction teardown that is triggered by the JobManager-failover timing the test exercises. When the collect-sink operator is torn down before initializeState()/open() has run, CollectSinkOperator#close() calls CollectSinkFunction#accumulateFinalResults() while bufferLock (and serverThread) are still null, throwing an NPE. The NPE surfaces in the task's exception-handler path and is escalated to a fatal error that shuts down the TaskManager. In the single-TaskManager MiniCluster used by the test, the recovered job can then never reacquire slots, starves for the full slot.request.timeout (5 min), and finally fails with NoResourceAvailableException; with maxNumberRestartAttempts=1 recovery is suppressed and the job dies, surfacing to the test as RuntimeException: Failed to fetch next result.

bufferLock is only assigned in initBuffer() (reached from initializeState()/open()), so it is null whenever the operator is closed before it was ever opened, which is exactly what happens on the failover teardown race.

Root-cause / failure chain (from a reproduced failing iteration, DEBUG logging)
1. The test triggers a JobManager failover (triggerJobManagerFailover); the TaskExecutor closes the job's tasks.
2. CollectSinkOperator.close() → CollectSinkFunction.accumulateFinalResults() throws a NullPointerException because bufferLock is null.
3. The NPE is escalated to a fatal error and the TaskManager shuts down (Stopping TaskExecutor …, then Ignoring the freeing of slot … because the TaskExecutor is shutting down).
4. The MiniCluster has setNumberTaskManagers(1), so its only TM is gone → recovered JM logs Matching resource requirements … Current resources: (none).
5. After slot.request.timeout (300000 ms in the logs), the slot pool fails the requests → NoResourceAvailableException → Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=1).

Brief change log
- CollectSinkFunction#accumulateFinalResults() returns early when bufferLock == null (function never opened → nothing buffered to accumulate).
- CollectSinkFunction#close() guards serverThread != null (no server thread to stop if never opened).
- Added CollectSinkFunctionTest#testCloseBeforeOpenDoesNotThrow regression test.

Verifying this change
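The close-before-open scenario can be sketched in isolation. The class below is a simplified, hypothetical stand-in (NullSafeSinkSketch, with invented helper names), not Flink's actual CollectSinkFunction; it only assumes that the lock and the server thread are created lazily in open(), as the description above states for bufferLock and serverThread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for a sink function; NOT Flink's CollectSinkFunction. */
public class NullSafeSinkSketch {
    // Both fields stay null until open() runs (assumed lazy initialization).
    private ReentrantLock bufferLock;
    private Thread serverThread;

    private final List<String> buffer = new ArrayList<>();
    private final List<String> accumulated = new ArrayList<>();

    /** Creates the lock and starts a placeholder background thread. */
    public void open() {
        bufferLock = new ReentrantLock();
        serverThread = new Thread(() -> {
            try {
                Thread.sleep(Long.MAX_VALUE); // idle until interrupted by close()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        serverThread.setDaemon(true);
        serverThread.start();
    }

    /** Buffers one value; only valid after open(). */
    public void invoke(String value) {
        bufferLock.lock();
        try {
            buffer.add(value);
        } finally {
            bufferLock.unlock();
        }
    }

    /** Guard 1: never opened means nothing was buffered, so return early. */
    public void accumulateFinalResults() {
        if (bufferLock == null) {
            return;
        }
        bufferLock.lock();
        try {
            accumulated.addAll(buffer);
            buffer.clear();
        } finally {
            bufferLock.unlock();
        }
    }

    /** Guard 2: only stop the server thread if it was ever started. */
    public void close() {
        if (serverThread != null) {
            serverThread.interrupt();
            try {
                serverThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public List<String> getAccumulated() {
        return accumulated;
    }

    public static void main(String[] args) {
        // Teardown without open(): must not throw.
        NullSafeSinkSketch neverOpened = new NullSafeSinkSketch();
        neverOpened.accumulateFinalResults();
        neverOpened.close();

        // Normal lifecycle still accumulates buffered values.
        NullSafeSinkSketch opened = new NullSafeSinkSketch();
        opened.open();
        opened.invoke("a");
        opened.accumulateFinalResults();
        opened.close();
        System.out.println(opened.getAccumulated());
    }
}
```

Removing either null check in this sketch makes the never-opened path throw a NullPointerException, which is the behavior the regression test is meant to rule out.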
This change added tests and can be verified as follows:
- Added CollectSinkFunctionTest#testCloseBeforeOpenDoesNotThrow, which constructs a CollectSinkFunction that is never opened and asserts that accumulateFinalResults() and close() do not throw (Tests run: 1, Failures: 0).
- Reproduced the flake by running testBoundedTextFileSourceWithJobManagerFailover with IntelliJ's Repeat → Until Failure (failed by ~iteration 7 without the fix). With the fix the TaskManager is no longer killed during the JobManager failover and the loop stays green.

Debugging steps used to localize the root cause:
1. The test failed with Failed to fetch next result at CollectResultIterator.hasNext.
2. The job log showed Recovery is suppressed … NoResourceAvailableException, which indicated the job died for lack of slots rather than a data error.
3. Earlier in the log was the FATAL … bufferLock is null NPE that killed the TaskManager before the resource starvation, i.e. the starvation was a downstream symptom.
4. Traced bufferLock initialization to initBuffer() (only reached via initializeState/open), confirming the unguarded close-before-open path.

log4j entries used during investigation (flink-connector-files/src/test/resources/log4j2-test.properties, investigation-only, not part of this PR):

Full DEBUG logs of passing vs. failing iterations (the FATAL … bufferLock is null stack and the Current resources: (none) → NoResourceAvailableException window): screenshots attached below.

Does this pull request potentially affect one of the following parts:

- The public API, i.e., is any changed class annotated with @Public(Evolving): no

Documentation
Was generative AI tooling used to co-author this PR?
Generated-by: Claude Code (Claude Opus 4.8)