diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java index 4981d9545305..fca17b5b9800 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java @@ -86,6 +86,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -102,7 +103,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -322,6 +322,8 @@ public void testDiscardDuplicateFilesMultiThread() throws Exception { createFileStoreTable( options -> { options.set(CoreOptions.COMMIT_DISCARD_DUPLICATE_FILES, true); + options.set(CoreOptions.COMMIT_MAX_RETRIES, 50); + options.set(CoreOptions.COMMIT_MAX_RETRY_WAIT, Duration.ofMillis(100)); // Keep all snapshots so concurrent expiry does not race readers. options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 1000); options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 1000); @@ -334,33 +336,43 @@ public void testDiscardDuplicateFilesMultiThread() throws Exception { messages.add(write.prepareCommit()); } } - Runnable doCommit = - () -> { - ThreadLocalRandom rnd = ThreadLocalRandom.current(); - for (int i = 0; i < 10; i++) { - try (BatchTableCommit commit = writeBuilder.newCommit()) { - commit.commit(messages.get(rnd.nextInt(messages.size()))); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - + int commitThreadNum = 10; + int commitsPerThread = 10; Runnable asserter = () -> { List splits = table.newReadBuilder().newScan().plan().splits(); assertThat(splits.size()).isEqualTo(1); - assertTrue(splits.get(0).convertToRawFiles().get().size() <= 10); + assertThat(splits.get(0).convertToRawFiles().get().size()) + .isLessThanOrEqualTo(messages.size()); }; - // test multiple threads - ExecutorService pool = Executors.newCachedThreadPool(); - List> futures = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - futures.add(pool.submit(doCommit)); - } - for (Future future : futures) { - future.get(); + ExecutorService pool = Executors.newFixedThreadPool(commitThreadNum); + try { + List> futures = new ArrayList<>(); + for (int thread = 0; thread < commitThreadNum; thread++) { + int threadId = thread; + futures.add( + pool.submit( + () -> { + for (int round = 0; round < commitsPerThread; round++) { + int messageIndex = (threadId + round) % messages.size(); + try (BatchTableCommit commit = writeBuilder.newCommit()) { + commit.commit(messages.get(messageIndex)); + } catch (Exception e) { + throw new RuntimeException( + String.format( + "Failed to commit message %s in thread %s round %s.", + messageIndex, threadId, round), + e); + } + } + })); + } + for (Future future : futures) { + future.get(); + } + } finally { + pool.shutdownNow(); } asserter.run(); }