Skip to content

[FLINK-39898][runtime] Fire processing-time timers between unaligned checkpoints when mini-batch is enabled#28484

Open
wilmerdooley wants to merge 1 commit into
apache:masterfrom
wilmerdooley:oss/flink-39898
Open

[FLINK-39898][runtime] Fire processing-time timers between unaligned checkpoints when mini-batch is enabled#28484
wilmerdooley wants to merge 1 commit into
apache:masterfrom
wilmerdooley:oss/flink-39898

Conversation

@wilmerdooley

Copy link
Copy Markdown

What is the purpose of the change

When both table.exec.mini-batch.enabled and execution.checkpointing.unaligned.enabled are on, ProcessingTimeService timer callbacks are not fired between checkpoints; they only run during checkpoint barrier handling. This is a regression from 1.20 introduced by the urgent-mail system (FLINK-35796): unaligned checkpoint barriers are submitted as urgent mails, and TaskMailboxImpl then skipped non-urgent mails (the timer callbacks) while an urgent mail was present, starving them until the next checkpoint.

This change lets non-urgent mails make progress when the batch would otherwise be empty, without changing urgent-mail prioritization.

Brief change log

  • TaskMailboxImpl.moveUrgentMailsToBatchIfNeeded: compute shouldOnlyMoveUrgentMails = onlyMoveUrgentMails && !isBatchEmpty and gate the non-urgent put-back/break on it, so when the batch is empty a non-urgent mail is moved into the batch instead of being returned to the queue and skipped. Urgent mails still take FIFO priority via batch.addFirst.

Verifying this change

This change added a unit test: TaskMailboxImplTest.testNonUrgentMailQueuedBehindUrgentMailIsTakenFromBatch asserts that a non-urgent mail queued behind an urgent mail is still taken from the batch (not starved) while the urgent mail is taken first. The test fails on the pre-fix code, where the non-urgent mail was returned to the queue and skipped.

Does this pull request potentially affect one of the following parts

  • Dependencies (does it add or upgrade a dependency): no
  • Public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): yes (the task mailbox take/createBatch path runs once per mailbox-loop iteration; this change adds two local boolean computations and changes only the empty-batch branch, with no new lock or allocation, so no measurable per-record overhead is expected; it does change how often non-urgent mails are drained on the task thread)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes (it interacts with unaligned-checkpoint barrier delivery and processing-time timer firing between checkpoints; the change is confined to the mailbox take path and does not touch JobManager, Kubernetes, Yarn, ZooKeeper, snapshot format, or recovery state)
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

This PR was written with the assistance of generative AI tooling.

  • Was generative AI tooling used to co-author this PR?

Generated-by: Claude Code

JIRA: https://issues.apache.org/jira/browse/FLINK-39898

@flinkbot

flinkbot commented Jun 19, 2026

Copy link
Copy Markdown
Collaborator

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

…checkpoints when mini-batch is enabled

Generated-by: Claude Code
Signed-off-by: wilmerdooley <wilmerdooley1@gmail.com>
@github-actions github-actions Bot added the community-reviewed PR has been reviewed by the community. label Jun 21, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-reviewed PR has been reviewed by the community.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants