Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,9 @@ private void becomeLeader(final long epoch) {
configManager.getLoadManager().startTopologyService();
}

// Start the remaining leader services in parallel and wait for all of them to finish.
// Start the remaining leader services in parallel and wait for all of them to finish. Each
// startup swallows and logs its own failure (see startInParallelIfEpochCurrent), so a single
// misbehaving service cannot abort the whole transition and leave the node stuck warming up.
final CompletableFuture<?>[] startups =
leaderServiceStartups().stream()
.map(startup -> startInParallelIfEpochCurrent(epoch, startup))
Expand Down Expand Up @@ -364,31 +366,47 @@ private void becomeLeader(final long epoch) {
}

/** The leader services that can be started independently, in parallel, within one epoch. */
private List<Runnable> leaderServiceStartups() {
private List<LeaderServiceStartup> leaderServiceStartups() {
return Arrays.asList(
() -> configManager.getProcedureManager().getStore().getProcedureInfo().upgrade(),
() -> configManager.getRetryFailedTasksThread().startRetryFailedTasksService(),
() -> configManager.getPartitionManager().startRegionCleaner(),
new LeaderServiceStartup(
"ProcedureInfo.upgrade",
() -> configManager.getProcedureManager().getStore().getProcedureInfo().upgrade()),
new LeaderServiceStartup(
"RetryFailedTasksService",
() -> configManager.getRetryFailedTasksThread().startRetryFailedTasksService()),
new LeaderServiceStartup(
"RegionCleaner", () -> configManager.getPartitionManager().startRegionCleaner()),
// Add metrics after leader ready.
() -> configManager.addMetrics(),
new LeaderServiceStartup("Metrics", () -> configManager.addMetrics()),
// Activate leader related service for config pipe.
() -> PipeConfigNodeAgent.runtime().notifyLeaderReady(),
new LeaderServiceStartup(
"PipeConfigNodeRuntime", () -> PipeConfigNodeAgent.runtime().notifyLeaderReady()),
// CQ recovery may be time-consuming, so it is just one more parallel startup.
() -> configManager.getCQManager().startCQScheduler(),
() -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync(),
() -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeHeartbeat(),
() ->
configManager
.getPipeManager()
.getPipeRuntimeCoordinator()
.onConfigRegionGroupLeaderChanged(),
() ->
configManager
.getSubscriptionManager()
.getSubscriptionCoordinator()
.startSubscriptionMetaSync(),
new LeaderServiceStartup(
"CQScheduler", () -> configManager.getCQManager().startCQScheduler()),
new LeaderServiceStartup(
"PipeMetaSync",
() -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync()),
new LeaderServiceStartup(
"PipeHeartbeat",
() -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeHeartbeat()),
new LeaderServiceStartup(
"PipeOnLeaderChanged",
() ->
configManager
.getPipeManager()
.getPipeRuntimeCoordinator()
.onConfigRegionGroupLeaderChanged()),
new LeaderServiceStartup(
"SubscriptionMetaSync",
() ->
configManager
.getSubscriptionManager()
.getSubscriptionCoordinator()
.startSubscriptionMetaSync()),
// To adapt old version, we check cluster ID after state machine has been fully recovered.
() -> configManager.getClusterManager().checkClusterId());
new LeaderServiceStartup(
"CheckClusterId", () -> configManager.getClusterManager().checkClusterId()));
}

/** Tear down every leader service. Runs on the single transition thread. */
Expand Down Expand Up @@ -422,16 +440,35 @@ private void stopLeaderServices() {
}

/**
* Run {@code startup} on {@link #leaderServicesStartupPool}, skipping it (and recording success)
* if the epoch has gone stale by the time it is picked up. Returns a future that always completes
* normally so {@link CompletableFuture#allOf} acts as a clean join barrier.
* Run {@code startup} on {@link #leaderServicesStartupPool}, skipping it if the epoch has gone
* stale by the time it is picked up. Any {@link RuntimeException} thrown by the startup is caught
* and logged here instead of being allowed to escape: this keeps one misbehaving service from
* failing the {@link CompletableFuture#allOf} join barrier in {@link #becomeLeader}, which would
* otherwise abort the whole transition before {@link #markLeaderServicesReadyIfEpochCurrent} runs
* and leave the node stuck returning {@code CONFIG_NODE_LEADER_WARMING_UP} forever. The returned
* future therefore always completes normally, so {@code allOf} acts as a clean join barrier.
*/
private CompletableFuture<Void> startInParallelIfEpochCurrent(
final long epoch, final Runnable startup) {
final long epoch, final LeaderServiceStartup startup) {
return CompletableFuture.runAsync(
() -> {
if (isCurrentLeaderServicesEpoch(epoch)) {
if (!isCurrentLeaderServicesEpoch(epoch)) {
return;
}
try {
startup.run();
} catch (final Exception e) {
// Swallow and log so a single failed startup cannot stall leader warm-up. The service
// stays unstarted, but the node still finishes warming up and begins serving; the
// failure is observable through this error log.
LOGGER.error(
"Current ConfigNode(nodeId: {}, ip: {}) failed to start leader service [{}], the"
+ " node will still finish warming up; this service stays unavailable until the"
+ " next leadership transition.",
ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
currentNodeTEndPoint,
startup.name(),
e);
}
},
leaderServicesStartupPool);
Expand Down Expand Up @@ -687,4 +724,27 @@ private static long parseEndIndex(String filename) {
}
return Long.parseLong(endIndexString);
}

/**
* A single leader service startup paired with a human-readable name, so a failure can be logged
* against the service that produced it (see {@link #startInParallelIfEpochCurrent}).
*/
private static class LeaderServiceStartup {

private final String name;
private final Runnable startup;

private LeaderServiceStartup(final String name, final Runnable startup) {
this.name = name;
this.startup = startup;
}

private String name() {
return name;
}

private void run() {
startup.run();
}
}
}
Loading