Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.core.failure.TestingFailureEnricher;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.NoMainThreadCheckComponentMainThreadExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.Execution;
Expand All @@ -49,7 +49,6 @@
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
Expand All @@ -68,12 +67,9 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

Expand Down Expand Up @@ -106,55 +102,12 @@ void setUp() {
// execution semantics without strict thread identity checks. This fixes flaky test
// failures (FLINK-38970) caused by main thread constraint violations when
// CompletableFuture callbacks are dispatched from background IO executor threads.
// The synchronous execution is preserved while eliminating the race condition.
mainThreadExecutor = new SynchronousComponentMainThreadExecutor();
// No-op thread check avoids flakiness from FLINK-38970: scheduler callbacks
// can complete on background IO threads rather than the main thread.
mainThreadExecutor = new NoMainThreadCheckComponentMainThreadExecutor();
taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
}

/**
* A synchronous ComponentMainThreadExecutor that runs tasks immediately on the calling thread.
* Unlike {@link ComponentMainThreadExecutorServiceAdapter#forMainThread()}, this executor does
* not perform strict thread identity checks, avoiding flaky failures when CompletableFuture
* callbacks are dispatched from background threads.
*/
private static class SynchronousComponentMainThreadExecutor
implements ComponentMainThreadExecutor {
private final DirectScheduledExecutorService executor =
new DirectScheduledExecutorService();

@Override
public void assertRunningInMainThread() {
// No-op: Skip thread assertion to avoid flaky failures
}

@Override
public void execute(Runnable command) {
executor.execute(command);
}

@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return executor.schedule(command, delay, unit);
}

@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return executor.schedule(callable, delay, unit);
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(
Runnable command, long initialDelay, long period, TimeUnit unit) {
return executor.scheduleAtFixedRate(command, initialDelay, period, unit);
}

@Override
public ScheduledFuture<?> scheduleWithFixedDelay(
Runnable command, long initialDelay, long delay, TimeUnit unit) {
return executor.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
}

@Test
void testVertexInitializationFailureIsLabeled() throws Exception {
final JobGraph jobGraph = createBrokenJobGraph();
Expand Down