Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ public class MetricNames {
"delayedFetchFromFollowerExpiresPerSecond";
public static final String DELAYED_FETCH_FROM_CLIENT_EXPIRES_RATE =
"delayedFetchFromClientExpiresPerSecond";
/** Timeouts in follower replica fetcher threads waiting for leader fetchLog responses. */
public static final String REPLICA_FETCH_TIMEOUT_RATE = "replicaFetchTimeoutPerSecond";

public static final String SERVER_LOGICAL_STORAGE_LOG_SIZE = "logSize";
public static final String SERVER_LOGICAL_STORAGE_KV_SIZE = "kvSize";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class TabletServerMetricGroup extends AbstractMetricGroup {
private final Counter delayedWriteExpireCount;
private final Counter delayedFetchFromFollowerExpireCount;
private final Counter delayedFetchFromClientExpireCount;
private final Counter replicaFetchTimeoutCount;

// aggregated metrics
private final Counter messagesIn;
Expand Down Expand Up @@ -98,6 +99,8 @@ public TabletServerMetricGroup(
meter(
MetricNames.DELAYED_FETCH_FROM_CLIENT_EXPIRES_RATE,
new MeterView(delayedFetchFromClientExpireCount));
replicaFetchTimeoutCount = new ThreadSafeSimpleCounter();
meter(MetricNames.REPLICA_FETCH_TIMEOUT_RATE, new MeterView(replicaFetchTimeoutCount));

messagesIn = new ThreadSafeSimpleCounter();
meter(MetricNames.MESSAGES_IN_RATE, new MeterView(messagesIn));
Expand Down Expand Up @@ -191,6 +194,10 @@ public Counter delayedFetchFromClientExpireCount() {
return delayedFetchFromClientExpireCount;
}

public Counter replicaFetchTimeoutCount() {
return replicaFetchTimeoutCount;
}

public Counter messageIn() {
return messagesIn;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -242,6 +243,9 @@ private void processFetchLogRequest(FetchLogContext fetchLogContext) {
responseData = fetchFuture.get(timeoutSeconds, TimeUnit.SECONDS);
} catch (Throwable t) {
if (isRunning()) {
if (t instanceof TimeoutException) {
serverMetricGroup.replicaFetchTimeoutCount().inc();
}
LOG.warn(
"Error in response from leader server {} for fetch log request from table ids: {}",
leader.leaderServerId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,16 @@ void testFetchTimeoutReleasesPooledByteBuf() throws Exception {
// then the delayed responses arrive after 3s
timeoutFetcher.start();

retry(
Duration.ofSeconds(10),
() ->
assertThat(
followerRM
.getServerMetricGroup()
.replicaFetchTimeoutCount()
.getCount())
.isGreaterThan(0));

// Wait until at least one delayed response has been allocated
retry(
Duration.ofSeconds(10),
Expand Down