From 62d5d270b735525dc7e2cae078f6afbcfcdd6ef2 Mon Sep 17 00:00:00 2001
From: wilmerdooley
Date: Sun, 21 Jun 2026 00:29:13 +0000
Subject: [PATCH] [FLINK-39898][runtime] Fire processing-time timers between unaligned checkpoints when mini-batch is enabled

Generated-by: Claude Code
Signed-off-by: wilmerdooley
---
 .../runtime/tasks/mailbox/TaskMailboxImpl.java     |  7 ++++++-
 .../runtime/tasks/mailbox/TaskMailboxImplTest.java | 12 ++++++++++++
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java
index 56188c3167f38..275de1dd50e0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java
@@ -227,6 +227,7 @@ public boolean createBatch() {
     private void moveUrgentMailsToBatchIfNeeded(boolean onlyMoveUrgentMails) {
         checkIsMailboxThread();
         Mail peek = batch.peek();
+        final boolean isBatchEmpty = peek == null;
         if (peek != null) {
             if (peek.getMailOptions().isUrgent()) {
                 // To ensure that urgent mails are executed in FIFO order, moving mails from queue
@@ -247,6 +248,10 @@ private void moveUrgentMailsToBatchIfNeeded(boolean onlyMoveUrgentMails) {
             }
         }
 
+        // When the batch is empty, also pull non-urgent mails in (not just urgent ones): otherwise a
+        // non-urgent mail queued behind an urgent one is put back and stranded until the next batch,
+        // starving processing-time timers between unaligned checkpoints.
+        final boolean shouldOnlyMoveUrgentMails = onlyMoveUrgentMails && !isBatchEmpty;
         final ReentrantLock lock = this.lock;
         lock.lock();
         try {
@@ -255,7 +260,7 @@ private void moveUrgentMailsToBatchIfNeeded(boolean onlyMoveUrgentMails) {
                 if (mail.getMailOptions().isUrgent()) {
                     batch.addFirst(mail);
                 } else {
-                    if (onlyMoveUrgentMails) {
+                    if (shouldOnlyMoveUrgentMails) {
                         // Put non-urgent mail back into the queue, and stop the loop
                         queue.addFirst(mail);
                         break;
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImplTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImplTest.java
index 6b9cfb3d2b3cc..0f353a755d7f2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImplTest.java
@@ -395,6 +395,18 @@ void testPutHighPriorityMailDuringTakingFromBatch(boolean isAsyncPut) throws Exc
         assertThat(taskMailbox.tryTakeFromBatch()).hasValue(mailB);
     }
 
+    @Test
+    void testNonUrgentMailQueuedBehindUrgentMailIsTakenFromBatch() {
+        Mail urgentMail = new Mail(urgent(), () -> {}, DEFAULT_PRIORITY, "urgentMail");
+        Mail mail = new Mail(() -> {}, DEFAULT_PRIORITY, "mail");
+
+        taskMailbox.put(urgentMail);
+        taskMailbox.put(mail);
+
+        assertThat(taskMailbox.tryTakeFromBatch()).hasValue(urgentMail);
+        assertThat(taskMailbox.tryTakeFromBatch()).hasValue(mail);
+    }
+
     @ValueSource(booleans = {true, false})
     @ParameterizedTest
     void testMailOrderWithBatch(boolean isAsyncPut) throws Exception {