diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java index 251db87dbdcd8..b3029e6602808 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java @@ -336,7 +336,9 @@ private void becomeLeader(final long epoch) { configManager.getLoadManager().startTopologyService(); } - // Start the remaining leader services in parallel and wait for all of them to finish. + // Start the remaining leader services in parallel and wait for all of them to finish. Each + // startup swallows and logs its own failure (see startInParallelIfEpochCurrent), so a single + // misbehaving service cannot abort the whole transition and leave the node stuck warming up. final CompletableFuture[] startups = leaderServiceStartups().stream() .map(startup -> startInParallelIfEpochCurrent(epoch, startup)) @@ -364,31 +366,47 @@ private void becomeLeader(final long epoch) { } /** The leader services that can be started independently, in parallel, within one epoch. 
*/ - private List<Runnable> leaderServiceStartups() { + private List<LeaderServiceStartup> leaderServiceStartups() { return Arrays.asList( - () -> configManager.getProcedureManager().getStore().getProcedureInfo().upgrade(), - () -> configManager.getRetryFailedTasksThread().startRetryFailedTasksService(), - () -> configManager.getPartitionManager().startRegionCleaner(), + new LeaderServiceStartup( + "ProcedureInfo.upgrade", + () -> configManager.getProcedureManager().getStore().getProcedureInfo().upgrade()), + new LeaderServiceStartup( + "RetryFailedTasksService", + () -> configManager.getRetryFailedTasksThread().startRetryFailedTasksService()), + new LeaderServiceStartup( + "RegionCleaner", () -> configManager.getPartitionManager().startRegionCleaner()), // Add metrics after leader ready. - () -> configManager.addMetrics(), + new LeaderServiceStartup("Metrics", () -> configManager.addMetrics()), // Activate leader related service for config pipe. - () -> PipeConfigNodeAgent.runtime().notifyLeaderReady(), + new LeaderServiceStartup( + "PipeConfigNodeRuntime", () -> PipeConfigNodeAgent.runtime().notifyLeaderReady()), // CQ recovery may be time-consuming, so it is just one more parallel startup. 
- () -> configManager.getCQManager().startCQScheduler(), - () -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync(), - () -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeHeartbeat(), - () -> - configManager - .getPipeManager() - .getPipeRuntimeCoordinator() - .onConfigRegionGroupLeaderChanged(), - () -> - configManager - .getSubscriptionManager() - .getSubscriptionCoordinator() - .startSubscriptionMetaSync(), + new LeaderServiceStartup( + "CQScheduler", () -> configManager.getCQManager().startCQScheduler()), + new LeaderServiceStartup( + "PipeMetaSync", + () -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync()), + new LeaderServiceStartup( + "PipeHeartbeat", + () -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeHeartbeat()), + new LeaderServiceStartup( + "PipeOnLeaderChanged", + () -> + configManager + .getPipeManager() + .getPipeRuntimeCoordinator() + .onConfigRegionGroupLeaderChanged()), + new LeaderServiceStartup( + "SubscriptionMetaSync", + () -> + configManager + .getSubscriptionManager() + .getSubscriptionCoordinator() + .startSubscriptionMetaSync()), // To adapt old version, we check cluster ID after state machine has been fully recovered. - () -> configManager.getClusterManager().checkClusterId()); + new LeaderServiceStartup( + "CheckClusterId", () -> configManager.getClusterManager().checkClusterId())); } /** Tear down every leader service. Runs on the single transition thread. */ @@ -422,16 +440,35 @@ private void stopLeaderServices() { } /** - * Run {@code startup} on {@link #leaderServicesStartupPool}, skipping it (and recording success) - * if the epoch has gone stale by the time it is picked up. Returns a future that always completes - * normally so {@link CompletableFuture#allOf} acts as a clean join barrier. + * Run {@code startup} on {@link #leaderServicesStartupPool}, skipping it if the epoch has gone + * stale by the time it is picked up. 
Any {@link Exception} thrown by the startup is caught + * and logged here instead of being allowed to escape: this keeps one misbehaving service from + * failing the {@link CompletableFuture#allOf} join barrier in {@link #becomeLeader}, which would + * otherwise abort the whole transition before {@link #markLeaderServicesReadyIfEpochCurrent} runs + * and leave the node stuck returning {@code CONFIG_NODE_LEADER_WARMING_UP} forever. Only an + * {@link Error} from the startup is not caught and still fails the future {@code allOf} joins. */ private CompletableFuture startInParallelIfEpochCurrent( - final long epoch, final Runnable startup) { + final long epoch, final LeaderServiceStartup startup) { return CompletableFuture.runAsync( () -> { - if (isCurrentLeaderServicesEpoch(epoch)) { + if (!isCurrentLeaderServicesEpoch(epoch)) { + return; + } + try { startup.run(); + } catch (final Exception e) { + // Swallow and log so a single failed startup cannot stall leader warm-up. The service + // stays unstarted, but the node still finishes warming up and begins serving; the + // failure is observable through this error log. + LOGGER.error( + "Current ConfigNode(nodeId: {}, ip: {}) failed to start leader service [{}], the" + + " node will still finish warming up; this service stays unavailable until the" + + " next leadership transition.", + ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), + currentNodeTEndPoint, + startup.name(), + e); } }, leaderServicesStartupPool); @@ -687,4 +724,27 @@ private static long parseEndIndex(String filename) { } return Long.parseLong(endIndexString); } + + /** + * A single leader service startup paired with a human-readable name, so a failure can be logged + * against the service that produced it (see {@link #startInParallelIfEpochCurrent}). 
+ */ + private static class LeaderServiceStartup { + + private final String name; + private final Runnable startup; + + private LeaderServiceStartup(final String name, final Runnable startup) { + this.name = name; + this.startup = startup; + } + + private String name() { + return name; + } + + private void run() { + startup.run(); + } + } }