From bd34e123ebfe075b4e26d2fa2f7ada7fe6c2c6c7 Mon Sep 17 00:00:00 2001 From: yunhong <337361684@qq.com> Date: Tue, 23 Jun 2026 13:55:16 +0800 Subject: [PATCH 1/4] [server] Add negative cache for non-existent partition IDs in metadata requests --- .../apache/fluss/server/RpcServiceBase.java | 20 ++ .../metadata/PartitionNegativeCache.java | 141 +++++++++++++ .../PartitionNegativeCacheITCase.java | 129 ++++++++++++ .../metadata/PartitionNegativeCacheTest.java | 191 ++++++++++++++++++ 4 files changed, 481 insertions(+) create mode 100644 fluss-server/src/main/java/org/apache/fluss/server/metadata/PartitionNegativeCache.java create mode 100644 fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheITCase.java create mode 100644 fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheTest.java diff --git a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java index f7b6de9eb8..e71e8cb702 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java @@ -17,6 +17,7 @@ package org.apache.fluss.server; +import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.cluster.ServerNode; import org.apache.fluss.cluster.ServerType; import org.apache.fluss.config.cluster.ConfigEntry; @@ -86,6 +87,7 @@ import org.apache.fluss.server.kv.snapshot.CompletedSnapshot; import org.apache.fluss.server.metadata.MetadataProvider; import org.apache.fluss.server.metadata.PartitionMetadata; +import org.apache.fluss.server.metadata.PartitionNegativeCache; import org.apache.fluss.server.metadata.ServerMetadataCache; import org.apache.fluss.server.metadata.TableMetadata; import org.apache.fluss.server.tablet.TabletService; @@ -143,6 +145,7 @@ public abstract class RpcServiceBase extends RpcGatewayService implements AdminR protected final MetadataManager metadataManager; protected final @Nullable Authorizer authorizer; protected final DynamicConfigManager dynamicConfigManager; + protected final PartitionNegativeCache partitionNegativeCache; private long tokenLastUpdateTimeMs = 0; private ObtainedSecurityToken securityToken = null; @@ -164,9 +167,15 @@ public RpcServiceBase( this.metadataManager = metadataManager; this.authorizer = authorizer; this.dynamicConfigManager = dynamicConfigManager; + this.partitionNegativeCache = new PartitionNegativeCache(); this.ioExecutor = ioExecutor; } + @VisibleForTesting + public PartitionNegativeCache getPartitionNegativeCache() { + return partitionNegativeCache; + } + @Override public ServerType providerType() { return provider; @@ -614,6 +623,15 @@ protected MetadataResponse processMetadataRequest( long[] partitionIds = request.getPartitionsIds(); List partitionIdsNotExistsInCache = new ArrayList<>(); for (long partitionId : partitionIds) { + // Fast-path: throw immediately for partition IDs known to not exist, + // avoiding ZK queries while preserving the original exception semantics. + if (partitionNegativeCache.isKnownNonExistent(partitionId)) { + throw new PartitionNotExistException( + String.format( + "The partition id '%d' does not exist or you don't have" + + " permission to access it.", + partitionId)); + } Optional physicalTablePath = metadataProvider.getPhysicalTablePathFromCache(partitionId); if (physicalTablePath.isPresent()) { @@ -634,6 +652,8 @@ protected MetadataResponse processMetadataRequest( if (partitionIdAndPaths.containsKey(partitionId)) { partitionPaths.add(partitionIdAndPaths.get(partitionId)); } else { + // Mark this partition ID in the negative cache to avoid future ZK queries + partitionNegativeCache.markNonExistent(partitionId); throw new PartitionNotExistException( String.format( "The partition id '%d' does not exist or you don't have permission to access it.", diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metadata/PartitionNegativeCache.java b/fluss-server/src/main/java/org/apache/fluss/server/metadata/PartitionNegativeCache.java new file mode 100644 index 0000000000..7f4a63c489 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/metadata/PartitionNegativeCache.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.metadata; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.utils.clock.Clock; +import org.apache.fluss.utils.clock.SystemClock; + +import javax.annotation.concurrent.ThreadSafe; + +import java.time.Duration; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +/** + * A thread-safe negative cache for partition IDs that are known to not exist in ZooKeeper. + * + *

This cache helps reduce ZooKeeper pressure when clients repeatedly request metadata for + * partitions that have been deleted (e.g., during hourly partition rotation). Instead of querying + * ZK every time, we cache the "not exist" result and return it directly. + * + *

The cache uses access-time-based TTL: entries are evicted after a configurable duration of no + * access. As long as clients keep asking for the same non-existent partition, the cache entry stays + * alive and protects ZK. + */ +@ThreadSafe +public class PartitionNegativeCache { + + /** Default TTL for negative cache entries: 10 minutes of no access. */ + private static final Duration DEFAULT_TTL = Duration.ofMinutes(10); + + /** + * Cleanup interval: only run eviction check when at least this many milliseconds have passed + * since the last cleanup. + */ + private static final long CLEANUP_INTERVAL_MS = 60_000; + + private final ConcurrentHashMap cache; + private final long ttlMs; + private final AtomicLong lastCleanupTime; + private final Clock clock; + + public PartitionNegativeCache() { + this(DEFAULT_TTL); + } + + public PartitionNegativeCache(Duration ttl) { + this(ttl, SystemClock.getInstance()); + } + + @VisibleForTesting + PartitionNegativeCache(Duration ttl, Clock clock) { + this.cache = new ConcurrentHashMap<>(); + this.ttlMs = ttl.toMillis(); + this.clock = clock; + this.lastCleanupTime = new AtomicLong(clock.milliseconds()); + } + + /** + * Checks if the given partition ID is known to not exist. + * + *

If the entry exists and is not expired, its access time is refreshed and this method + * returns {@code true}. If the entry is expired or doesn't exist, returns {@code false}. + * + * @param partitionId the partition ID to check + * @return {@code true} if the partition is known to not exist, {@code false} otherwise + */ + public boolean isKnownNonExistent(long partitionId) { + Long lastAccessTime = cache.get(partitionId); + if (lastAccessTime == null) { + return false; + } + long now = clock.milliseconds(); + if (now - lastAccessTime > ttlMs) { + // Entry expired, remove it + cache.remove(partitionId, lastAccessTime); + return false; + } + // Refresh access time + cache.put(partitionId, now); + return true; + } + + /** + * Marks the given partition ID as known to not exist. The entry will remain in the cache until + * it hasn't been accessed for the configured TTL duration. + * + * @param partitionId the partition ID to mark as non-existent + */ + public void markNonExistent(long partitionId) { + cache.put(partitionId, clock.milliseconds()); + maybeCleanup(); + } + + /** + * Returns the current number of entries in the cache. Includes potentially expired entries that + * haven't been cleaned up yet. + */ + @VisibleForTesting + public int size() { + return cache.size(); + } + + /** Removes all entries from the cache. */ + @VisibleForTesting + public void clear() { + cache.clear(); + } + + /** + * Performs lazy cleanup of expired entries. Only runs if enough time has passed since the last + * cleanup to avoid excessive iteration. + */ + private void maybeCleanup() { + long now = clock.milliseconds(); + long lastCleanup = lastCleanupTime.get(); + if (now - lastCleanup < CLEANUP_INTERVAL_MS) { + return; + } + // CAS to ensure only one thread does the cleanup + if (!lastCleanupTime.compareAndSet(lastCleanup, now)) { + return; + } + cache.entrySet().removeIf(entry -> now - entry.getValue() > ttlMs); + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheITCase.java new file mode 100644 index 0000000000..674ba4c56f --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheITCase.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.metadata; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.exception.PartitionNotExistException; +import org.apache.fluss.metadata.PartitionSpec; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.rpc.gateway.CoordinatorGateway; +import org.apache.fluss.rpc.messages.MetadataRequest; +import org.apache.fluss.server.testutils.FlussClusterExtension; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.time.Duration; +import java.util.Collections; + +import static org.apache.fluss.record.TestData.DATA1_SCHEMA; +import static org.apache.fluss.server.testutils.RpcMessageTestUtils.createPartition; +import static org.apache.fluss.server.testutils.RpcMessageTestUtils.createTable; +import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newDropPartitionRequest; +import static org.apache.fluss.testutils.common.CommonTestUtils.retry; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** ITCase for {@link PartitionNegativeCache} integration with metadata request processing. */ +class PartitionNegativeCacheITCase { + + @RegisterExtension + public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION = + FlussClusterExtension.builder().setNumOfTabletServers(1).build(); + + private static CoordinatorGateway coordinatorGateway; + + @BeforeAll + static void setup() { + coordinatorGateway = FLUSS_CLUSTER_EXTENSION.newCoordinatorClient(); + } + + @Test + void testNegativeCacheForDeletedPartition() throws Exception { + FLUSS_CLUSTER_EXTENSION.waitUntilAllGatewayHasSameMetadata(); + + // Create a partitioned table with auto-partition disabled. + TablePath tablePath = TablePath.of("test_db_neg_cache", "partitioned_table"); + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(DATA1_SCHEMA) + .distributedBy(1) + .partitionedBy("b") + .property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, false) + .build(); + createTable(FLUSS_CLUSTER_EXTENSION, tablePath, tableDescriptor); + + // Create a partition and get its ID. + String partitionName = "p_to_delete"; + long partitionId = + createPartition( + FLUSS_CLUSTER_EXTENSION, + tablePath, + new PartitionSpec(Collections.singletonMap("b", partitionName)), + false); + + // Drop the partition. + coordinatorGateway + .dropPartition( + newDropPartitionRequest( + tablePath, + new PartitionSpec(Collections.singletonMap("b", partitionName)), + false)) + .get(); + + // Wait until the partition is actually deleted from ZK (metadata cache update). + retry( + Duration.ofMinutes(1), + () -> { + // Request metadata with the deleted partition ID - should throw. + MetadataRequest request = new MetadataRequest(); + request.addPartitionsId(partitionId); + request.addTablePath() + .setDatabaseName(tablePath.getDatabaseName()) + .setTableName(tablePath.getTableName()); + assertThatThrownBy(() -> coordinatorGateway.metadata(request).get()) + .cause() + .isInstanceOf(PartitionNotExistException.class); + }); + + // At this point, the partition ID should be in the negative cache. + // Verify the negative cache is populated on the coordinator. + PartitionNegativeCache negativeCache = + FLUSS_CLUSTER_EXTENSION + .getCoordinatorServer() + .getCoordinatorService() + .getPartitionNegativeCache(); + assertThat(negativeCache.isKnownNonExistent(partitionId)).isTrue(); + + // Second request should also fail (served from negative cache, no ZK query). + MetadataRequest secondRequest = new MetadataRequest(); + secondRequest.addPartitionsId(partitionId); + secondRequest + .addTablePath() + .setDatabaseName(tablePath.getDatabaseName()) + .setTableName(tablePath.getTableName()); + assertThatThrownBy(() -> coordinatorGateway.metadata(secondRequest).get()) + .cause() + .isInstanceOf(PartitionNotExistException.class); + + // Negative cache should still have the entry. + assertThat(negativeCache.isKnownNonExistent(partitionId)).isTrue(); + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheTest.java b/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheTest.java new file mode 100644 index 0000000000..9c4f4e134f --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheTest.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.metadata; + +import org.apache.fluss.utils.clock.ManualClock; + +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link PartitionNegativeCache}. */ +class PartitionNegativeCacheTest { + + private static final Duration TTL = Duration.ofMinutes(10); + private static final long TTL_MS = TTL.toMillis(); + + @Test + void testMarkAndQueryNonExistent() { + ManualClock clock = new ManualClock(); + PartitionNegativeCache cache = new PartitionNegativeCache(TTL, clock); + + // Initially, partition is not known as non-existent + assertThat(cache.isKnownNonExistent(100L)).isFalse(); + + // Mark it as non-existent + cache.markNonExistent(100L); + + // Now it should be known as non-existent + assertThat(cache.isKnownNonExistent(100L)).isTrue(); + assertThat(cache.size()).isEqualTo(1); + } + + @Test + void testUnknownPartitionReturnsFalse() { + ManualClock clock = new ManualClock(); + PartitionNegativeCache cache = new PartitionNegativeCache(TTL, clock); + + // Never marked partition should return false + assertThat(cache.isKnownNonExistent(999L)).isFalse(); + assertThat(cache.isKnownNonExistent(0L)).isFalse(); + assertThat(cache.isKnownNonExistent(-1L)).isFalse(); + } + + @Test + void testMultiplePartitions() { + ManualClock clock = new ManualClock(); + PartitionNegativeCache cache = new PartitionNegativeCache(TTL, clock); + + cache.markNonExistent(1L); + cache.markNonExistent(2L); + cache.markNonExistent(3L); + + assertThat(cache.isKnownNonExistent(1L)).isTrue(); + assertThat(cache.isKnownNonExistent(2L)).isTrue(); + assertThat(cache.isKnownNonExistent(3L)).isTrue(); + assertThat(cache.isKnownNonExistent(4L)).isFalse(); + assertThat(cache.size()).isEqualTo(3); + } + + @Test + void testTtlExpiration() { + ManualClock clock = new ManualClock(); + PartitionNegativeCache cache = new PartitionNegativeCache(TTL, clock); + + cache.markNonExistent(100L); + assertThat(cache.isKnownNonExistent(100L)).isTrue(); + + // Advance time past TTL from last access time (which is 0) + clock.advanceTime(TTL_MS + 1, TimeUnit.MILLISECONDS); + assertThat(cache.isKnownNonExistent(100L)).isFalse(); + + // Entry should be removed from cache + assertThat(cache.size()).isEqualTo(0); + } + + @Test + void testNotExpiredAtExactBoundary() { + ManualClock clock = new ManualClock(); + PartitionNegativeCache cache = new PartitionNegativeCache(TTL, clock); + + cache.markNonExistent(100L); + + // At exactly TTL_MS, condition is "now - lastAccess > ttl" (strict >), so not expired + clock.advanceTime(TTL_MS, TimeUnit.MILLISECONDS); + assertThat(cache.isKnownNonExistent(100L)).isTrue(); + } + + @Test + void testAccessTimeRefreshKeepsEntryAlive() { + ManualClock clock = new ManualClock(); + PartitionNegativeCache cache = new PartitionNegativeCache(TTL, clock); + + cache.markNonExistent(100L); + + // Access at t = TTL - 1 (within TTL window), refreshes access time to TTL - 1 + clock.advanceTime(TTL_MS - 1, TimeUnit.MILLISECONDS); + assertThat(cache.isKnownNonExistent(100L)).isTrue(); + + // Now TTL is measured from last access (TTL_MS - 1). + // At t = 2*TTL - 2, elapsed = TTL - 1, NOT > TTL, still valid. + clock.advanceTime(TTL_MS - 1, TimeUnit.MILLISECONDS); + assertThat(cache.isKnownNonExistent(100L)).isTrue(); // refreshes to 2*TTL - 2 + + // Access again at t = 3*TTL - 3 to prove indefinite survival + clock.advanceTime(TTL_MS - 1, TimeUnit.MILLISECONDS); + assertThat(cache.isKnownNonExistent(100L)).isTrue(); // refreshes to 3*TTL - 3 + + // Now stop accessing. Advance past TTL from last access (3*TTL - 3). + clock.advanceTime(TTL_MS + 1, TimeUnit.MILLISECONDS); + assertThat(cache.isKnownNonExistent(100L)).isFalse(); + } + + @Test + void testClear() { + ManualClock clock = new ManualClock(); + PartitionNegativeCache cache = new PartitionNegativeCache(TTL, clock); + + cache.markNonExistent(1L); + cache.markNonExistent(2L); + cache.markNonExistent(3L); + assertThat(cache.size()).isEqualTo(3); + + cache.clear(); + assertThat(cache.size()).isEqualTo(0); + assertThat(cache.isKnownNonExistent(1L)).isFalse(); + assertThat(cache.isKnownNonExistent(2L)).isFalse(); + assertThat(cache.isKnownNonExistent(3L)).isFalse(); + } + + @Test + void testRemarkAfterExpiration() { + ManualClock clock = new ManualClock(); + PartitionNegativeCache cache = new PartitionNegativeCache(TTL, clock); + + cache.markNonExistent(100L); + assertThat(cache.isKnownNonExistent(100L)).isTrue(); + + // Expire the entry + clock.advanceTime(TTL_MS + 1, TimeUnit.MILLISECONDS); + assertThat(cache.isKnownNonExistent(100L)).isFalse(); + + // Re-mark at new time should work + cache.markNonExistent(100L); + assertThat(cache.isKnownNonExistent(100L)).isTrue(); + + // The new entry expires TTL after re-mark time (TTL_MS + 1) + clock.advanceTime(TTL_MS + 1, TimeUnit.MILLISECONDS); + assertThat(cache.isKnownNonExistent(100L)).isFalse(); + } + + @Test + void testIndependentExpiration() { + ManualClock clock = new ManualClock(); + PartitionNegativeCache cache = new PartitionNegativeCache(TTL, clock); + + // Mark partition 1 at t=0 + cache.markNonExistent(1L); + + // Mark partition 2 at t=5min + clock.advanceTime(TTL_MS / 2, TimeUnit.MILLISECONDS); + cache.markNonExistent(2L); + + // At t=TTL+1: partition 1 should expire, partition 2 should still be alive + clock.advanceTime(TTL_MS / 2 + 1, TimeUnit.MILLISECONDS); + assertThat(cache.isKnownNonExistent(1L)).isFalse(); + assertThat(cache.isKnownNonExistent(2L)).isTrue(); // refreshes access time to TTL_MS+1 + + // Partition 2's access time was refreshed to TTL_MS+1 above. + // To expire it, advance past TTL from that refreshed time. + clock.advanceTime(TTL_MS + 1, TimeUnit.MILLISECONDS); + assertThat(cache.isKnownNonExistent(2L)).isFalse(); + } +} From 0bf5964c4b2c8297561cb21e7ea8cd87ff8d39d7 Mon Sep 17 00:00:00 2001 From: yunhong <337361684@qq.com> Date: Wed, 24 Jun 2026 09:31:04 +0800 Subject: [PATCH 2/4] address codex's comments --- .../apache/fluss/server/RpcServiceBase.java | 19 +++- .../metadata/PartitionNegativeCache.java | 90 ++++++++----------- .../PartitionNegativeCacheITCase.java | 12 +-- .../metadata/PartitionNegativeCacheTest.java | 41 ++++++--- 4 files changed, 88 insertions(+), 74 deletions(-) diff --git a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java index e71e8cb702..d120de4604 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java @@ -652,8 +652,12 @@ protected MetadataResponse processMetadataRequest( if (partitionIdAndPaths.containsKey(partitionId)) { partitionPaths.add(partitionIdAndPaths.get(partitionId)); } else { - // Mark this partition ID in the negative cache to avoid future ZK queries - partitionNegativeCache.markNonExistent(partitionId); + // Only cache when the authoritative partition assignment is also gone. A miss + // from the scoped table-path lookup may simply mean that the request omitted + // the owning table or the session is not authorized for it. + if (isPartitionAssignmentMissingFromZk(partitionId)) { + partitionNegativeCache.markNonExistent(partitionId); + } throw new PartitionNotExistException( String.format( "The partition id '%d' does not exist or you don't have permission to access it.", @@ -694,4 +698,15 @@ protected MetadataResponse processMetadataRequest( return buildMetadataResponse( coordinatorServer, aliveTabletServers, tablesMetadata, partitionsMetadata); } + + private boolean isPartitionAssignmentMissingFromZk(long partitionId) { + try { + return !zkClient.getPartitionAssignment(partitionId).isPresent(); + } catch (Exception e) { + throw new FlussRuntimeException( + String.format( + "Failed to get partition assignment for partition %d.", partitionId), + e); + } + } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metadata/PartitionNegativeCache.java b/fluss-server/src/main/java/org/apache/fluss/server/metadata/PartitionNegativeCache.java index 7f4a63c489..e0d1212fd8 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metadata/PartitionNegativeCache.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metadata/PartitionNegativeCache.java @@ -18,14 +18,15 @@ package org.apache.fluss.server.metadata; import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.shaded.guava32.com.google.common.base.Ticker; +import org.apache.fluss.shaded.guava32.com.google.common.cache.Cache; +import org.apache.fluss.shaded.guava32.com.google.common.cache.CacheBuilder; import org.apache.fluss.utils.clock.Clock; import org.apache.fluss.utils.clock.SystemClock; import javax.annotation.concurrent.ThreadSafe; import java.time.Duration; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; /** * A thread-safe negative cache for partition IDs that are known to not exist in ZooKeeper. @@ -44,67 +45,62 @@ public class PartitionNegativeCache { /** Default TTL for negative cache entries: 10 minutes of no access. */ private static final Duration DEFAULT_TTL = Duration.ofMinutes(10); - /** - * Cleanup interval: only run eviction check when at least this many milliseconds have passed - * since the last cleanup. - */ - private static final long CLEANUP_INTERVAL_MS = 60_000; + /** Default maximum number of negative cache entries. */ + private static final long DEFAULT_MAXIMUM_SIZE = 100_000L; - private final ConcurrentHashMap cache; - private final long ttlMs; - private final AtomicLong lastCleanupTime; - private final Clock clock; + private final Cache cache; public PartitionNegativeCache() { - this(DEFAULT_TTL); + this(DEFAULT_TTL, DEFAULT_MAXIMUM_SIZE); } - public PartitionNegativeCache(Duration ttl) { - this(ttl, SystemClock.getInstance()); + public PartitionNegativeCache(Duration ttl, long maximumSize) { + this(ttl, maximumSize, SystemClock.getInstance()); } @VisibleForTesting - PartitionNegativeCache(Duration ttl, Clock clock) { - this.cache = new ConcurrentHashMap<>(); - this.ttlMs = ttl.toMillis(); - this.clock = clock; - this.lastCleanupTime = new AtomicLong(clock.milliseconds()); + PartitionNegativeCache(Duration ttl, long maximumSize, Clock clock) { + if (ttl.isZero() || ttl.isNegative()) { + throw new IllegalArgumentException("TTL must be positive."); + } + if (maximumSize <= 0) { + throw new IllegalArgumentException("Maximum size must be positive."); + } + this.cache = + CacheBuilder.newBuilder() + .maximumSize(maximumSize) + .expireAfterAccess(ttl) + .ticker( + new Ticker() { + @Override + public long read() { + return clock.nanoseconds(); + } + }) + .build(); } /** * Checks if the given partition ID is known to not exist. * - *

If the entry exists and is not expired, its access time is refreshed and this method + *

If the entry exists and is not expired, Guava refreshes its access time and this method * returns {@code true}. If the entry is expired or doesn't exist, returns {@code false}. * * @param partitionId the partition ID to check * @return {@code true} if the partition is known to not exist, {@code false} otherwise */ public boolean isKnownNonExistent(long partitionId) { - Long lastAccessTime = cache.get(partitionId); - if (lastAccessTime == null) { - return false; - } - long now = clock.milliseconds(); - if (now - lastAccessTime > ttlMs) { - // Entry expired, remove it - cache.remove(partitionId, lastAccessTime); - return false; - } - // Refresh access time - cache.put(partitionId, now); - return true; + return cache.getIfPresent(partitionId) != null; } /** * Marks the given partition ID as known to not exist. The entry will remain in the cache until - * it hasn't been accessed for the configured TTL duration. + * it hasn't been accessed for the configured TTL duration or the bounded cache evicts it. * * @param partitionId the partition ID to mark as non-existent */ public void markNonExistent(long partitionId) { - cache.put(partitionId, clock.milliseconds()); - maybeCleanup(); + cache.put(partitionId, Boolean.TRUE); } /** @@ -112,30 +108,14 @@ public void markNonExistent(long partitionId) { * haven't been cleaned up yet. */ @VisibleForTesting - public int size() { + public long size() { + cache.cleanUp(); return cache.size(); } /** Removes all entries from the cache. */ @VisibleForTesting public void clear() { - cache.clear(); - } - - /** - * Performs lazy cleanup of expired entries. Only runs if enough time has passed since the last - * cleanup to avoid excessive iteration. - */ - private void maybeCleanup() { - long now = clock.milliseconds(); - long lastCleanup = lastCleanupTime.get(); - if (now - lastCleanup < CLEANUP_INTERVAL_MS) { - return; - } - // CAS to ensure only one thread does the cleanup - if (!lastCleanupTime.compareAndSet(lastCleanup, now)) { - return; - } - cache.entrySet().removeIf(entry -> now - entry.getValue() > ttlMs); + cache.invalidateAll(); } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheITCase.java index 674ba4c56f..b8167da9bc 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheITCase.java @@ -79,6 +79,12 @@ void testNegativeCacheForDeletedPartition() throws Exception { new PartitionSpec(Collections.singletonMap("b", partitionName)), false); + PartitionNegativeCache negativeCache = + FLUSS_CLUSTER_EXTENSION + .getCoordinatorServer() + .getCoordinatorService() + .getPartitionNegativeCache(); + // Drop the partition. coordinatorGateway .dropPartition( @@ -101,15 +107,11 @@ void testNegativeCacheForDeletedPartition() throws Exception { assertThatThrownBy(() -> coordinatorGateway.metadata(request).get()) .cause() .isInstanceOf(PartitionNotExistException.class); + assertThat(negativeCache.isKnownNonExistent(partitionId)).isTrue(); }); // At this point, the partition ID should be in the negative cache. // Verify the negative cache is populated on the coordinator. - PartitionNegativeCache negativeCache = - FLUSS_CLUSTER_EXTENSION - .getCoordinatorServer() - .getCoordinatorService() - .getPartitionNegativeCache(); assertThat(negativeCache.isKnownNonExistent(partitionId)).isTrue(); // Second request should also fail (served from negative cache, no ZK query). diff --git a/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheTest.java b/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheTest.java index 9c4f4e134f..7aeec015ce 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheTest.java @@ -31,11 +31,12 @@ class PartitionNegativeCacheTest { private static final Duration TTL = Duration.ofMinutes(10); private static final long TTL_MS = TTL.toMillis(); + private static final long MAXIMUM_SIZE = 100L; @Test void testMarkAndQueryNonExistent() { ManualClock clock = new ManualClock(); - PartitionNegativeCache cache = new PartitionNegativeCache(TTL, clock); + PartitionNegativeCache cache = new PartitionNegativeCache(TTL, MAXIMUM_SIZE, clock); // Initially, partition is not known as non-existent assertThat(cache.isKnownNonExistent(100L)).isFalse(); @@ -51,7 +52,7 @@ void testMarkAndQueryNonExistent() { @Test void testUnknownPartitionReturnsFalse() { ManualClock clock = new ManualClock(); - PartitionNegativeCache cache = new PartitionNegativeCache(TTL, clock); + PartitionNegativeCache cache = new PartitionNegativeCache(TTL, MAXIMUM_SIZE, clock); // Never marked partition should return false assertThat(cache.isKnownNonExistent(999L)).isFalse(); @@ -62,7 +63,7 @@ void testUnknownPartitionReturnsFalse() { @Test void testMultiplePartitions() { ManualClock clock = new ManualClock(); - PartitionNegativeCache cache = new PartitionNegativeCache(TTL, clock); + PartitionNegativeCache cache = new PartitionNegativeCache(TTL, MAXIMUM_SIZE, clock); cache.markNonExistent(1L); cache.markNonExistent(2L); @@ -78,7 +79,7 @@ void testMultiplePartitions() { @Test void testTtlExpiration() { ManualClock clock = new ManualClock(); - PartitionNegativeCache cache = new PartitionNegativeCache(TTL, clock); + PartitionNegativeCache cache = new PartitionNegativeCache(TTL, MAXIMUM_SIZE, clock); cache.markNonExistent(100L); assertThat(cache.isKnownNonExistent(100L)).isTrue(); @@ -92,21 +93,25 @@ void testTtlExpiration() { } @Test - void testNotExpiredAtExactBoundary() { + void testExpiresAtTtlBoundary() { ManualClock clock = new ManualClock(); - PartitionNegativeCache cache = new PartitionNegativeCache(TTL, clock); + PartitionNegativeCache cache = new PartitionNegativeCache(TTL, MAXIMUM_SIZE, clock); cache.markNonExistent(100L); - // At exactly TTL_MS, condition is "now - lastAccess > ttl" (strict >), so not expired - clock.advanceTime(TTL_MS, TimeUnit.MILLISECONDS); + clock.advanceTime(TTL_MS - 1, TimeUnit.MILLISECONDS); assertThat(cache.isKnownNonExistent(100L)).isTrue(); + + // Guava expireAfterAccess expires the entry once the TTL boundary is reached. + cache.markNonExistent(100L); + clock.advanceTime(TTL_MS, TimeUnit.MILLISECONDS); + assertThat(cache.isKnownNonExistent(100L)).isFalse(); } @Test void testAccessTimeRefreshKeepsEntryAlive() { ManualClock clock = new ManualClock(); - PartitionNegativeCache cache = new PartitionNegativeCache(TTL, clock); + PartitionNegativeCache cache = new PartitionNegativeCache(TTL, MAXIMUM_SIZE, clock); cache.markNonExistent(100L); @@ -131,7 +136,7 @@ void testAccessTimeRefreshKeepsEntryAlive() { @Test void testClear() { ManualClock clock = new ManualClock(); - PartitionNegativeCache cache = new PartitionNegativeCache(TTL, clock); + PartitionNegativeCache cache = new PartitionNegativeCache(TTL, MAXIMUM_SIZE, clock); cache.markNonExistent(1L); cache.markNonExistent(2L); @@ -148,7 +153,7 @@ void testClear() { @Test void testRemarkAfterExpiration() { ManualClock clock = new ManualClock(); - PartitionNegativeCache cache = new PartitionNegativeCache(TTL, clock); + PartitionNegativeCache cache = new PartitionNegativeCache(TTL, MAXIMUM_SIZE, clock); cache.markNonExistent(100L); assertThat(cache.isKnownNonExistent(100L)).isTrue(); @@ -169,7 +174,7 @@ void testRemarkAfterExpiration() { @Test void testIndependentExpiration() { ManualClock clock = new ManualClock(); - PartitionNegativeCache cache = new PartitionNegativeCache(TTL, clock); + PartitionNegativeCache cache = new PartitionNegativeCache(TTL, MAXIMUM_SIZE, clock); // Mark partition 1 at t=0 cache.markNonExistent(1L); @@ -188,4 +193,16 @@ void testIndependentExpiration() { clock.advanceTime(TTL_MS + 1, TimeUnit.MILLISECONDS); assertThat(cache.isKnownNonExistent(2L)).isFalse(); } + + @Test + void testMaximumSizeBoundsCache() { + ManualClock clock = new ManualClock(); + PartitionNegativeCache cache = new PartitionNegativeCache(TTL, 2L, clock); + + cache.markNonExistent(1L); + cache.markNonExistent(2L); + cache.markNonExistent(3L); + + assertThat(cache.size()).isLessThanOrEqualTo(2); + } } From 04e592eb256253fd84b412830fa45dc70b7a788c Mon Sep 17 00:00:00 2001 From: yunhong <337361684@qq.com> Date: Wed, 24 Jun 2026 09:56:40 +0800 Subject: [PATCH 3/4] address hongshun's comments --- .../org/apache/fluss/server/RpcServiceBase.java | 15 +++++++-------- .../server/metadata/PartitionNegativeCache.java | 2 +- .../metadata/PartitionNegativeCacheITCase.java | 13 +++++++++++++ 3 files changed, 21 insertions(+), 9 deletions(-) diff --git a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java index d120de4604..1cb0750fba 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java @@ -623,19 +623,18 @@ protected MetadataResponse processMetadataRequest( long[] partitionIds = request.getPartitionsIds(); List partitionIdsNotExistsInCache = new ArrayList<>(); for (long partitionId : partitionIds) { - // Fast-path: throw immediately for partition IDs known to not exist, - // avoiding ZK queries while preserving the original exception semantics. - if (partitionNegativeCache.isKnownNonExistent(partitionId)) { + Optional physicalTablePath = + metadataProvider.getPhysicalTablePathFromCache(partitionId); + if (physicalTablePath.isPresent()) { + partitionPaths.add(physicalTablePath.get()); + } else if (partitionNegativeCache.isKnownNonExistent(partitionId)) { + // Fast-path only after the positive metadata cache misses. A stale negative-cache + // entry must not hide a partition that has already been synced into metadata cache. throw new PartitionNotExistException( String.format( "The partition id '%d' does not exist or you don't have" + " permission to access it.", partitionId)); - } - Optional physicalTablePath = - metadataProvider.getPhysicalTablePathFromCache(partitionId); - if (physicalTablePath.isPresent()) { - partitionPaths.add(physicalTablePath.get()); } else { partitionIdsNotExistsInCache.add(partitionId); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metadata/PartitionNegativeCache.java b/fluss-server/src/main/java/org/apache/fluss/server/metadata/PartitionNegativeCache.java index e0d1212fd8..c4ddd260e7 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metadata/PartitionNegativeCache.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metadata/PartitionNegativeCache.java @@ -46,7 +46,7 @@ public class PartitionNegativeCache { private static final Duration DEFAULT_TTL = Duration.ofMinutes(10); /** Default maximum number of negative cache entries. */ - private static final long DEFAULT_MAXIMUM_SIZE = 100_000L; + private static final long DEFAULT_MAXIMUM_SIZE = 10_000L; private final Cache cache; diff --git a/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheITCase.java index b8167da9bc..498db45716 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheITCase.java @@ -85,6 +85,19 @@ void testNegativeCacheForDeletedPartition() throws Exception { .getCoordinatorService() .getPartitionNegativeCache(); + // A stale negative-cache entry must not hide a partition that exists in metadata cache. + MetadataRequest existingPartitionRequest = new MetadataRequest(); + existingPartitionRequest.addPartitionsId(partitionId); + existingPartitionRequest + .addTablePath() + .setDatabaseName(tablePath.getDatabaseName()) + .setTableName(tablePath.getTableName()); + negativeCache.markNonExistent(partitionId); + retry( + Duration.ofMinutes(1), + () -> coordinatorGateway.metadata(existingPartitionRequest).get()); + negativeCache.clear(); + // Drop the partition. coordinatorGateway .dropPartition( From 9e7bc05d7d8425d2662f220d0bcdf4d3fb994d32 Mon Sep 17 00:00:00 2001 From: yunhong <337361684@qq.com> Date: Mon, 29 Jun 2026 20:59:49 +0800 Subject: [PATCH 4/4] address hongshun's comments2 --- .../java/org/apache/fluss/server/RpcServiceBase.java | 2 ++ .../server/metadata/PartitionNegativeCache.java | 9 +++++++++ .../metadata/PartitionNegativeCacheITCase.java | 2 +- .../server/metadata/PartitionNegativeCacheTest.java | 12 ++++++++++++ 4 files changed, 24 insertions(+), 1 deletion(-) diff --git a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java index 1cb0750fba..80924c729d 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java @@ -626,6 +626,7 @@ protected MetadataResponse processMetadataRequest( Optional physicalTablePath = metadataProvider.getPhysicalTablePathFromCache(partitionId); if (physicalTablePath.isPresent()) { + partitionNegativeCache.markExistent(partitionId); partitionPaths.add(physicalTablePath.get()); } else if (partitionNegativeCache.isKnownNonExistent(partitionId)) { // Fast-path only after the positive metadata cache misses. A stale negative-cache @@ -649,6 +650,7 @@ protected MetadataResponse processMetadataRequest( } for (long partitionId : partitionIdsNotExistsInCache) { if (partitionIdAndPaths.containsKey(partitionId)) { + partitionNegativeCache.markExistent(partitionId); partitionPaths.add(partitionIdAndPaths.get(partitionId)); } else { // Only cache when the authoritative partition assignment is also gone. A miss diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metadata/PartitionNegativeCache.java b/fluss-server/src/main/java/org/apache/fluss/server/metadata/PartitionNegativeCache.java index c4ddd260e7..dbd38dc626 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metadata/PartitionNegativeCache.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metadata/PartitionNegativeCache.java @@ -103,6 +103,15 @@ public void markNonExistent(long partitionId) { cache.put(partitionId, Boolean.TRUE); } + /** + * Marks the given partition ID as existing by removing any stale negative-cache entry. + * + * @param partitionId the partition ID to mark as existent + */ + public void markExistent(long partitionId) { + cache.invalidate(partitionId); + } + /** * Returns the current number of entries in the cache. Includes potentially expired entries that * haven't been cleaned up yet. diff --git a/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheITCase.java index 498db45716..9aa2d379b7 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheITCase.java @@ -96,7 +96,7 @@ void testNegativeCacheForDeletedPartition() throws Exception { retry( Duration.ofMinutes(1), () -> coordinatorGateway.metadata(existingPartitionRequest).get()); - negativeCache.clear(); + assertThat(negativeCache.isKnownNonExistent(partitionId)).isFalse(); // Drop the partition. coordinatorGateway diff --git a/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheTest.java b/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheTest.java index 7aeec015ce..dacee8cac0 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/metadata/PartitionNegativeCacheTest.java @@ -150,6 +150,18 @@ void testClear() { assertThat(cache.isKnownNonExistent(3L)).isFalse(); } + @Test + void testMarkExistentRemovesStaleNegativeEntry() { + ManualClock clock = new ManualClock(); + PartitionNegativeCache cache = new PartitionNegativeCache(TTL, MAXIMUM_SIZE, clock); + + cache.markNonExistent(100L); + assertThat(cache.isKnownNonExistent(100L)).isTrue(); + + cache.markExistent(100L); + assertThat(cache.isKnownNonExistent(100L)).isFalse(); + } + @Test void testRemarkAfterExpiration() { ManualClock clock = new ManualClock();