From 9e79e9a4028a62f8f03a9a72c977d05691565ec0 Mon Sep 17 00:00:00 2001 From: ocb3916 Date: Fri, 12 Jun 2026 01:23:13 +0800 Subject: [PATCH 1/4] [FLINK-39931][runtime/tests] Fix flaky AdaptiveBatchSchedulerTest due to unexpected ComponentMainThreadExecutor behavior in async execution Co-authored-by: Yuepeng Pan --- .../AdaptiveBatchSchedulerTest.java | 54 ++----------------- 1 file changed, 3 insertions(+), 51 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java index 9b5b1c3d33403..3bbfe92c53f66 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java @@ -25,7 +25,7 @@ import org.apache.flink.core.failure.FailureEnricher; import org.apache.flink.core.failure.TestingFailureEnricher; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; -import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.concurrent.NoMainThreadCheckComponentMainThreadExecutor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph; import org.apache.flink.runtime.executiongraph.Execution; @@ -49,7 +49,6 @@ import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.testtasks.NoOpInvokable; -import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; import org.apache.flink.testutils.TestingUtils; import org.apache.flink.testutils.executor.TestExecutorExtension; import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor; @@ -68,12 +67,9 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.LongStream; @@ -87,7 +83,7 @@ import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link AdaptiveBatchScheduler}. */ -class AdaptiveBatchSchedulerTest { +public class AdaptiveBatchSchedulerTest { private static final int SOURCE_PARALLELISM_1 = 6; private static final int SOURCE_PARALLELISM_2 = 4; @@ -107,54 +103,10 @@ void setUp() { // failures (FLINK-38970) caused by main thread constraint violations when // CompletableFuture callbacks are dispatched from background IO executor threads. // The synchronous execution is preserved while eliminating the race condition. - mainThreadExecutor = new SynchronousComponentMainThreadExecutor(); + mainThreadExecutor = new NoMainThreadCheckComponentMainThreadExecutor(); taskRestartExecutor = new ManuallyTriggeredScheduledExecutor(); } - /** - * A synchronous ComponentMainThreadExecutor that runs tasks immediately on the calling thread. - * Unlike {@link ComponentMainThreadExecutorServiceAdapter#forMainThread()}, this executor does - * not perform strict thread identity checks, avoiding flaky failures when CompletableFuture - * callbacks are dispatched from background threads. - */ - private static class SynchronousComponentMainThreadExecutor - implements ComponentMainThreadExecutor { - private final DirectScheduledExecutorService executor = - new DirectScheduledExecutorService(); - - @Override - public void assertRunningInMainThread() { - // No-op: Skip thread assertion to avoid flaky failures - } - - @Override - public void execute(Runnable command) { - executor.execute(command); - } - - @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { - return executor.schedule(command, delay, unit); - } - - @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - return executor.schedule(callable, delay, unit); - } - - @Override - public ScheduledFuture scheduleAtFixedRate( - Runnable command, long initialDelay, long period, TimeUnit unit) { - return executor.scheduleAtFixedRate(command, initialDelay, period, unit); - } - - @Override - public ScheduledFuture scheduleWithFixedDelay( - Runnable command, long initialDelay, long delay, TimeUnit unit) { - return executor.scheduleWithFixedDelay(command, initialDelay, delay, unit); - } - } - @Test void testVertexInitializationFailureIsLabeled() throws Exception { final JobGraph jobGraph = createBrokenJobGraph(); From dad788d68bc4e1d8fc0be056e77ece21ed658299 Mon Sep 17 00:00:00 2001 From: ocb3916 <58809383+ocb3916@users.noreply.github.com> Date: Sun, 21 Jun 2026 16:51:27 +0900 Subject: [PATCH 2/4] Apply suggestion from @spuru9 Co-authored-by: Purushottam Sinha --- .../scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java index 3bbfe92c53f66..ba37f8d03a542 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java @@ -83,7 +83,7 @@ import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link AdaptiveBatchScheduler}. */ -public class AdaptiveBatchSchedulerTest { +class AdaptiveBatchSchedulerTest { private static final int SOURCE_PARALLELISM_1 = 6; private static final int SOURCE_PARALLELISM_2 = 4; From aa997c8f541b96dced9707c668ef8040238eba7a Mon Sep 17 00:00:00 2001 From: ocb3916 <58809383+ocb3916@users.noreply.github.com> Date: Sun, 21 Jun 2026 16:51:46 +0900 Subject: [PATCH 3/4] Apply suggestion from @spuru9 Co-authored-by: Purushottam Sinha --- .../scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java index ba37f8d03a542..c6a88bdc53f79 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java @@ -102,7 +102,8 @@ void setUp() { // execution semantics without strict thread identity checks. This fixes flaky test // failures (FLINK-38970) caused by main thread constraint violations when // CompletableFuture callbacks are dispatched from background IO executor threads. - // The synchronous execution is preserved while eliminating the race condition. +// No-op thread check avoids flakiness from FLINK-38970: scheduler callbacks +// can complete on background IO threads rather than the main thread. mainThreadExecutor = new NoMainThreadCheckComponentMainThreadExecutor(); taskRestartExecutor = new ManuallyTriggeredScheduledExecutor(); } From 44076bf2f4ed1bb695c5d5f48796cb3a244a428c Mon Sep 17 00:00:00 2001 From: ocb3916 Date: Sun, 21 Jun 2026 22:53:33 +0900 Subject: [PATCH 4/4] Apply suggestion from @spuru9 Co-authored-by: Purushottam Sinha --- .../scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java index c6a88bdc53f79..143023d91dc54 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java @@ -102,8 +102,8 @@ void setUp() { // execution semantics without strict thread identity checks. This fixes flaky test // failures (FLINK-38970) caused by main thread constraint violations when // CompletableFuture callbacks are dispatched from background IO executor threads. -// No-op thread check avoids flakiness from FLINK-38970: scheduler callbacks -// can complete on background IO threads rather than the main thread. + // No-op thread check avoids flakiness from FLINK-38970: scheduler callbacks + // can complete on background IO threads rather than the main thread. mainThreadExecutor = new NoMainThreadCheckComponentMainThreadExecutor(); taskRestartExecutor = new ManuallyTriggeredScheduledExecutor(); }