From eb8e1089b57907a9bb9074061ddef7580e345617 Mon Sep 17 00:00:00 2001 From: zhangjunfan Date: Wed, 10 Jun 2026 14:58:39 +0800 Subject: [PATCH] [server] add replica fetch timeout metric --- .../java/org/apache/fluss/metrics/MetricNames.java | 2 ++ .../server/metrics/group/TabletServerMetricGroup.java | 7 +++++++ .../server/replica/fetcher/ReplicaFetcherThread.java | 4 ++++ .../replica/fetcher/ReplicaFetcherThreadTest.java | 10 ++++++++++ 4 files changed, 23 insertions(+) diff --git a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java index fe34deba95..e23a212a27 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java +++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java @@ -88,6 +88,8 @@ public class MetricNames { "delayedFetchFromFollowerExpiresPerSecond"; public static final String DELAYED_FETCH_FROM_CLIENT_EXPIRES_RATE = "delayedFetchFromClientExpiresPerSecond"; + /** Timeouts in follower replica fetcher threads waiting for leader fetchLog responses. */ + public static final String REPLICA_FETCH_TIMEOUT_RATE = "replicaFetchTimeoutPerSecond"; public static final String SERVER_LOGICAL_STORAGE_LOG_SIZE = "logSize"; public static final String SERVER_LOGICAL_STORAGE_KV_SIZE = "kvSize"; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java index 22215bc6de..33206e98a3 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java @@ -54,6 +54,7 @@ public class TabletServerMetricGroup extends AbstractMetricGroup { private final Counter delayedWriteExpireCount; private final Counter delayedFetchFromFollowerExpireCount; private final Counter delayedFetchFromClientExpireCount; + private final Counter replicaFetchTimeoutCount; // aggregated metrics private final Counter messagesIn; @@ -98,6 +99,8 @@ public TabletServerMetricGroup( meter( MetricNames.DELAYED_FETCH_FROM_CLIENT_EXPIRES_RATE, new MeterView(delayedFetchFromClientExpireCount)); + replicaFetchTimeoutCount = new ThreadSafeSimpleCounter(); + meter(MetricNames.REPLICA_FETCH_TIMEOUT_RATE, new MeterView(replicaFetchTimeoutCount)); messagesIn = new ThreadSafeSimpleCounter(); meter(MetricNames.MESSAGES_IN_RATE, new MeterView(messagesIn)); @@ -191,6 +194,10 @@ public Counter delayedFetchFromClientExpireCount() { return delayedFetchFromClientExpireCount; } + public Counter replicaFetchTimeoutCount() { + return replicaFetchTimeoutCount; + } + public Counter messageIn() { return messagesIn; } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThread.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThread.java index cc430a19f9..105785ece3 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThread.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThread.java @@ -63,6 +63,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -242,6 +243,9 @@ private void processFetchLogRequest(FetchLogContext fetchLogContext) { responseData = fetchFuture.get(timeoutSeconds, TimeUnit.SECONDS); } catch (Throwable t) { if (isRunning()) { + if (t instanceof TimeoutException) { + serverMetricGroup.replicaFetchTimeoutCount().inc(); + } LOG.warn( "Error in response from leader server {} for fetch log request from table ids: {}", leader.leaderServerId(), diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java index 5dc17acf14..dd386b7f0d 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java @@ -439,6 +439,16 @@ void testFetchTimeoutReleasesPooledByteBuf() throws Exception { // then the delayed responses arrive after 3s timeoutFetcher.start(); + retry( + Duration.ofSeconds(10), + () -> + assertThat( + followerRM + .getServerMetricGroup() + .replicaFetchTimeoutCount() + .getCount()) + .isGreaterThan(0)); + // Wait until at least one delayed response has been allocated retry( Duration.ofSeconds(10),