[FLINK-38614][runtime] Defer EndOfData until final checkpoint committables are emitted #28476

Draft
MartijnVisser wants to merge 1 commit into apache:master from MartijnVisser:FLINK-38614-global-committer-eoi-fix

@MartijnVisser

Contributor

What is the purpose of the change

SinkV2 jobs with a post-commit (global) committer can silently lose the final checkpoint's committables under unaligned checkpoints — an exactly-once data-loss bug. It surfaced as the intermittently failing SinkV2ITCase.writerAndCommitterExecuteInStreamingModeWithScaling (FLINK-38614).

Root cause: the CommitterOperator forwards the final checkpoint's committables to the downstream global committer only on notifyCheckpointComplete, which (per FLIP-147) runs after endInput. With unaligned checkpoints those committables can belong to the post-end-of-input final checkpoint. The task broadcasts EndOfData right after endInput (before that final notifyCheckpointComplete), so the downstream global committer finishes consuming its input before the committables arrive and silently drops them.

This change defers the downstream EndOfData broadcast — for tasks whose operator chain emits records on the final checkpoint — until the final checkpoint completes, so those records are delivered before the downstream finishes.
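The ordering problem can be illustrated with a toy model. This is only a sketch under stated assumptions: the channel is a plain list of strings, `consume` and `upstream` are hypothetical helpers, and nothing here uses Flink's real classes. It shows that a downstream which stops at EndOfData loses any record placed on the channel afterwards, and that moving EndOfData behind the final committable avoids the loss.

```java
import java.util.ArrayList;
import java.util.List;

public class EndOfDataOrderingSketch {

    /** Toy downstream (hypothetical): stops consuming as soon as it sees EndOfData. */
    static List<String> consume(List<String> channel) {
        List<String> received = new ArrayList<>();
        for (String element : channel) {
            if (element.equals("EndOfData")) {
                break; // the downstream considers its input finished here
            }
            received.add(element);
        }
        return received;
    }

    /** Toy upstream (hypothetical): what the committer task writes, in order. */
    static List<String> upstream(boolean deferEndOfData) {
        List<String> channel = new ArrayList<>();
        channel.add("committable-1"); // forwarded on an earlier checkpoint
        // endInput would run at this point
        if (!deferEndOfData) {
            channel.add("EndOfData"); // broadcast right after endInput
        }
        channel.add("committable-final"); // forwarded on the final notifyCheckpointComplete
        if (deferEndOfData) {
            channel.add("EndOfData"); // broadcast only after the final committable
        }
        return channel;
    }

    public static void main(String[] args) {
        System.out.println("immediate: " + consume(upstream(false)));
        System.out.println("deferred:  " + consume(upstream(true)));
    }
}
```

In this model the immediate variant delivers only `committable-1`, while the deferred variant delivers both committables.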

Brief change log

  • New @Internal marker interface EmitsRecordsOnFinalCheckpoint in org.apache.flink.streaming.api.operators (beside BoundedOneInput).
  • CommitterOperator implements it; it returns true only when it forwards downstream and a final checkpoint will be taken (streaming + checkpointing). The condition is centralized in hasFinalCheckpoint(), shared with endInput.
  • StreamTask.endData defers the downstream EndOfData broadcast when checkpoints-after-tasks-finished is enabled and the chain contains such an operator; StreamTask.notifyCheckpointComplete flushes the deferred broadcast once the final checkpoint completes.
  • Added StreamTaskDeferredEndOfDataTest.
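As a rough illustration of the opt-in check described in the change log, the following self-contained sketch models the decision with stand-in types. It is not the code from this PR: the method name `emitsRecordsOnFinalCheckpoint()`, the `ToyCommitter` class, its three flags, and the parameters of `shouldDeferEndOfData` are assumptions made for the example. Only the names `EmitsRecordsOnFinalCheckpoint`, `hasFinalCheckpoint()` and `shouldDeferEndOfData()` come from the description.

```java
import java.util.Arrays;
import java.util.List;

public class DeferralDecisionSketch {

    /** Stand-in for the PR's interface; the method name is an assumption. */
    interface EmitsRecordsOnFinalCheckpoint {
        boolean emitsRecordsOnFinalCheckpoint();
    }

    /** Stand-in for CommitterOperator, reduced to the flags the description mentions. */
    static final class ToyCommitter implements EmitsRecordsOnFinalCheckpoint {
        private final boolean forwardsDownstream;
        private final boolean streaming;
        private final boolean checkpointingEnabled;

        ToyCommitter(boolean forwardsDownstream, boolean streaming, boolean checkpointingEnabled) {
            this.forwardsDownstream = forwardsDownstream;
            this.streaming = streaming;
            this.checkpointingEnabled = checkpointingEnabled;
        }

        /** A final checkpoint is assumed to exist only for streaming jobs with checkpointing. */
        boolean hasFinalCheckpoint() {
            return streaming && checkpointingEnabled;
        }

        @Override
        public boolean emitsRecordsOnFinalCheckpoint() {
            return forwardsDownstream && hasFinalCheckpoint();
        }
    }

    /** Defers only if the feature flag is on and some operator in the chain opts in. */
    static boolean shouldDeferEndOfData(boolean checkpointsAfterTasksFinish, List<?> chain) {
        if (!checkpointsAfterTasksFinish) {
            return false; // short-circuit: tasks without final checkpoints are unaffected
        }
        for (Object operator : chain) {
            if (operator instanceof EmitsRecordsOnFinalCheckpoint
                    && ((EmitsRecordsOnFinalCheckpoint) operator).emitsRecordsOnFinalCheckpoint()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<Object> chain = Arrays.asList(new Object(), new ToyCommitter(true, true, true));
        System.out.println(shouldDeferEndOfData(true, chain));
        System.out.println(shouldDeferEndOfData(false, chain));
    }
}
```

Checking an interface instead of a concrete class keeps the task-level check independent of any sink type, which matches the motivation given for the marker interface.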

Notes for reviewers:

  • Why a marker interface rather than instanceof CommitterOperator: the deferral is a generic task-lifecycle concern, and StreamTask should not depend on the sink package. CommitterOperator is the only in-tree implementor today.
  • Non-sink tasks are unaffected: shouldDeferEndOfData() short-circuits unless checkpoints-after-tasks-finished is enabled and an operator opts in. endData runs once per task and is not on a per-record path.
  • No new hang: afterInvoke already gates termination on finalCheckpointCompleted under the identical condition, so the deferral adds no new termination dependency. The flush is gated on the final checkpoint completing; an uncompleted/aborted checkpoint does not flush, and an early flush is deliberately avoided because it would re-introduce the loss.
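The flush gating from the last note can be sketched as a small state machine. This is a toy model, not the StreamTask change itself: the class, its fields, and the string-based output are hypothetical, and it simplifies by treating the first checkpoint that completes after `endData` as the final checkpoint.

```java
import java.util.ArrayList;
import java.util.List;

public class DeferredFlushSketch {

    /** Everything the toy task sends downstream, in order. */
    final List<String> output = new ArrayList<>();

    private final boolean deferEndOfData;
    private boolean endOfDataPending;

    DeferredFlushSketch(boolean deferEndOfData) {
        this.deferEndOfData = deferEndOfData;
    }

    /** Called once when the task's input ends. */
    void endData() {
        if (deferEndOfData) {
            endOfDataPending = true; // hold EndOfData back for now
        } else {
            output.add("EndOfData");
        }
    }

    /** An aborted checkpoint emits nothing and must not release EndOfData. */
    void notifyCheckpointAborted(long checkpointId) {
        // intentionally empty: flushing here would re-introduce the loss
    }

    /** Operators emit their records first, then the deferred EndOfData is flushed. */
    void notifyCheckpointComplete(long checkpointId) {
        output.add("committable@" + checkpointId);
        if (endOfDataPending) {
            endOfDataPending = false;
            output.add("EndOfData");
        }
    }

    public static void main(String[] args) {
        DeferredFlushSketch task = new DeferredFlushSketch(true);
        task.endData();
        task.notifyCheckpointAborted(5L);
        task.notifyCheckpointComplete(6L);
        System.out.println(task.output);
    }
}
```

In this model an aborted checkpoint leaves the output untouched, and the later completed checkpoint emits its committable before EndOfData.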

Verifying this change

This change added tests and can be verified as follows:

  • StreamTaskDeferredEndOfDataTest (new) deterministically verifies (a) the final-checkpoint record is emitted before EndOfData downstream, and (b) the deferred EndOfData is not flushed by an uncompleted/aborted final checkpoint but is flushed (after the record) once a later one completes. Both assertions fail without the fix.
  • Existing StreamTaskFinalCheckpointsTest, SinkV2CommitterOperatorTest, SinkV2SinkWriterOperatorTest, CheckpointAfterAllTasksFinishedITCase, and SinkV2ITCase remain green.
  • The end-to-end loss is inherently a cross-task scheduling race (committer task vs global-committer task) and is not deterministically reproducible as a plain ITCase; it was reproduced under controlled scheduling and confirmed fixed. The new unit test guards the single-task invariant the fix establishes (records emitted before EndOfData).

This change affects checkpoint/end-of-input coordination in StreamTask (FLIP-147), so specialist review from a checkpointing/runtime maintainer is requested.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes (end-of-input / final-checkpoint coordination)
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no

Was generative AI tooling used to co-author this PR?
  • Yes (Claude Opus 4.8)

Generated-by: Claude Opus 4.8

MartijnVisser marked this pull request as draft on June 17, 2026 21:47
@flinkbot

flinkbot commented Jun 17, 2026

Collaborator

CI report:

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot run azure: re-run the last Azure build

…ables are emitted

The sink CommitterOperator forwards the final checkpoint's committables to a
downstream global committer on notifyCheckpointComplete, i.e. after endInput.
With unaligned checkpoints these committables can land in the post-end-of-input
final checkpoint, which completes after the task has already broadcast EndOfData.
The global committer then finishes before receiving them and drops them, causing
exactly-once data loss.

Defer the downstream EndOfData broadcast for tasks whose operator chain emits
records on the final checkpoint until that checkpoint completes.

Generated-by: Claude Opus 4.8
MartijnVisser force-pushed the FLINK-38614-global-committer-eoi-fix branch from 40ce6f4 to 6ca2ae0 on June 18, 2026 06:05
2 participants