diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java index 9b5b1c3d33403..143023d91dc54 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java @@ -25,7 +25,7 @@ import org.apache.flink.core.failure.FailureEnricher; import org.apache.flink.core.failure.TestingFailureEnricher; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; -import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.concurrent.NoMainThreadCheckComponentMainThreadExecutor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph; import org.apache.flink.runtime.executiongraph.Execution; @@ -49,7 +49,6 @@ import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.testtasks.NoOpInvokable; -import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; import org.apache.flink.testutils.TestingUtils; import org.apache.flink.testutils.executor.TestExecutorExtension; import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor; @@ -68,12 +67,9 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.LongStream; @@ -106,55 +102,12 @@ void setUp() { // execution semantics without strict thread identity checks. This fixes flaky test // failures (FLINK-38970) caused by main thread constraint violations when // CompletableFuture callbacks are dispatched from background IO executor threads. - // The synchronous execution is preserved while eliminating the race condition. - mainThreadExecutor = new SynchronousComponentMainThreadExecutor(); + // No-op thread check avoids flakiness from FLINK-38970: scheduler callbacks + // can complete on background IO threads rather than the main thread. + mainThreadExecutor = new NoMainThreadCheckComponentMainThreadExecutor(); taskRestartExecutor = new ManuallyTriggeredScheduledExecutor(); } - /** - * A synchronous ComponentMainThreadExecutor that runs tasks immediately on the calling thread. - * Unlike {@link ComponentMainThreadExecutorServiceAdapter#forMainThread()}, this executor does - * not perform strict thread identity checks, avoiding flaky failures when CompletableFuture - * callbacks are dispatched from background threads. - */ - private static class SynchronousComponentMainThreadExecutor - implements ComponentMainThreadExecutor { - private final DirectScheduledExecutorService executor = - new DirectScheduledExecutorService(); - - @Override - public void assertRunningInMainThread() { - // No-op: Skip thread assertion to avoid flaky failures - } - - @Override - public void execute(Runnable command) { - executor.execute(command); - } - - @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { - return executor.schedule(command, delay, unit); - } - - @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - return executor.schedule(callable, delay, unit); - } - - @Override - public ScheduledFuture scheduleAtFixedRate( - Runnable command, long initialDelay, long period, TimeUnit unit) { - return executor.scheduleAtFixedRate(command, initialDelay, period, unit); - } - - @Override - public ScheduledFuture scheduleWithFixedDelay( - Runnable command, long initialDelay, long delay, TimeUnit unit) { - return executor.scheduleWithFixedDelay(command, initialDelay, delay, unit); - } - } - @Test void testVertexInitializationFailureIsLabeled() throws Exception { final JobGraph jobGraph = createBrokenJobGraph();