From 1b14ca7e6fa82137e3c264350664b04fb3946b77 Mon Sep 17 00:00:00 2001 From: Evan Date: Wed, 24 Jun 2026 18:06:21 +0300 Subject: [PATCH 1/2] perf(write): skip auto-partition work for non-partitioned tables Building the AutoPartitionStrategy on every record needlessly cost CPU in the hot write path, even for tables without partition keys. Guard the dynamic partition creation call in WriterClient.doSend behind a partition-keys check so the strategy is only resolved for partitioned tables, and cache AutoPartitionStrategy lazily in TableConfig via a volatile field rather than rebuilding it on each access. Signed-off-by: Evan --- .../org/apache/fluss/client/write/WriterClient.java | 13 +++++++++---- .../java/org/apache/fluss/config/TableConfig.java | 12 +++++++++++- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java b/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java index b4c96b0ac4..e727a5d0b0 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java @@ -176,10 +176,15 @@ private void doSend(WriteRecord record, WriteCallback callback) { TableInfo tableInfo = record.getTableInfo(); PhysicalTablePath physicalTablePath = record.getPhysicalTablePath(); - dynamicPartitionCreator.checkAndCreatePartitionAsync( - physicalTablePath, - tableInfo.getPartitionKeys(), - tableInfo.getTableConfig().getAutoPartitionStrategy()); + // Skip on non-partitioned tables: the callee returns immediately when + // partitionName is null, but the expensive AutoPartitionStrategy argument + // would still be evaluated per record without this guard. + if (!tableInfo.getPartitionKeys().isEmpty()) { + dynamicPartitionCreator.checkAndCreatePartitionAsync( + physicalTablePath, + tableInfo.getPartitionKeys(), + tableInfo.getTableConfig().getAutoPartitionStrategy()); + } // maybe create bucket assigner. Cluster cluster = metadataUpdater.getCluster(); diff --git a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java index fbf8c77264..c367628dc7 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java @@ -44,6 +44,11 @@ public class TableConfig { // the table properties configuration private final Configuration config; + // Cached, lazily-built strategy. Safe because TableConfig is immutable after construction + // and AutoPartitionStrategy itself is fully immutable. A benign race on first call + // produces the same value, so a plain volatile is sufficient. + private volatile AutoPartitionStrategy autoPartitionStrategy; + /** * Creates a new table config. * @@ -158,7 +163,12 @@ public ArrowCompressionInfo getArrowCompressionInfo() { /** Gets the auto partition strategy of the table. */ public AutoPartitionStrategy getAutoPartitionStrategy() { - return AutoPartitionStrategy.from(config); + AutoPartitionStrategy s = autoPartitionStrategy; + if (s == null) { + s = AutoPartitionStrategy.from(config); + autoPartitionStrategy = s; + } + return s; } /** Gets the number of auto-increment IDs cached per segment. */ From be9e5c470043a0be98d7ed5380f9060ada95e61a Mon Sep 17 00:00:00 2001 From: Evan Date: Thu, 25 Jun 2026 19:43:48 +0300 Subject: [PATCH 2/2] address comments - test the strategy is memoized, so repeated calls return the same instancetestAutoPartitionStrategyIsCached - use tableInfo.isPartitioned instead of !tableInfo.getPartitionKeys().isEmpty() - address comments Signed-off-by: Evan --- .../java/org/apache/fluss/client/write/WriterClient.java | 2 +- .../main/java/org/apache/fluss/config/TableConfig.java | 7 ++++++- .../java/org/apache/fluss/config/TableConfigTest.java | 9 +++++++++ 3 files changed, 16 insertions(+), 2 deletions(-) diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java b/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java index e727a5d0b0..19df2ff6fa 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java @@ -179,7 +179,7 @@ private void doSend(WriteRecord record, WriteCallback callback) { // Skip on non-partitioned tables: the callee returns immediately when // partitionName is null, but the expensive AutoPartitionStrategy argument // would still be evaluated per record without this guard. - if (!tableInfo.getPartitionKeys().isEmpty()) { + if (tableInfo.isPartitioned()) { dynamicPartitionCreator.checkAndCreatePartitionAsync( physicalTablePath, tableInfo.getPartitionKeys(), diff --git a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java index c367628dc7..4c2e39f779 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java @@ -161,7 +161,12 @@ public ArrowCompressionInfo getArrowCompressionInfo() { return ArrowCompressionInfo.fromConf(config); } - /** Gets the auto partition strategy of the table. */ + /** + * Gets the auto partition strategy of the table. + * + *

The result is cached on first access; this assumes the underlying config is not mutated + * after this {@code TableConfig} is constructed. + */ public AutoPartitionStrategy getAutoPartitionStrategy() { AutoPartitionStrategy s = autoPartitionStrategy; if (s == null) { diff --git a/fluss-common/src/test/java/org/apache/fluss/config/TableConfigTest.java b/fluss-common/src/test/java/org/apache/fluss/config/TableConfigTest.java index 5d18fcd1c9..c77c9a3913 100644 --- a/fluss-common/src/test/java/org/apache/fluss/config/TableConfigTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/config/TableConfigTest.java @@ -44,4 +44,13 @@ void testDeleteBehavior() { TableConfig tableConfig3 = new TableConfig(conf); assertThat(tableConfig3.getDeleteBehavior()).hasValue(DeleteBehavior.IGNORE); } + + @Test + void testAutoPartitionStrategyIsCached() { + TableConfig tableConfig = new TableConfig(new Configuration()); + + // the strategy is memoized, so repeated calls return the same instance + assertThat(tableConfig.getAutoPartitionStrategy()) + .isSameAs(tableConfig.getAutoPartitionStrategy()); + } }