diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index d22e8f5520776..a8e2ef8d12e63 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -209,6 +209,7 @@ public enum TSStatusCode { CAN_NOT_CONNECT_AINODE(1011), NO_AVAILABLE_REPLICA(1012), NO_AVAILABLE_AINODE(1013), + CONFIG_NODE_LEADER_WARMING_UP(1014), // Sync, Load TsFile LOAD_FILE_ERROR(1100), diff --git a/iotdb-core/ainode/iotdb/ainode/core/constant.py b/iotdb-core/ainode/iotdb/ainode/core/constant.py index 4a2ee543d1f8d..1eb79085bcfa0 100644 --- a/iotdb-core/ainode/iotdb/ainode/core/constant.py +++ b/iotdb-core/ainode/iotdb/ainode/core/constant.py @@ -80,6 +80,7 @@ class TSStatusCode(Enum): SUCCESS_STATUS = 200 REDIRECTION_RECOMMEND = 400 + CONFIG_NODE_LEADER_WARMING_UP = 1014 MODEL_EXISTED_ERROR = 1503 MODEL_NOT_EXIST_ERROR = 1504 CREATE_MODEL_ERROR = 1505 diff --git a/iotdb-core/ainode/iotdb/ainode/core/rpc/client.py b/iotdb-core/ainode/iotdb/ainode/core/rpc/client.py index ea6362ef080af..81bb81d50bb34 100644 --- a/iotdb-core/ainode/iotdb/ainode/core/rpc/client.py +++ b/iotdb-core/ainode/iotdb/ainode/core/rpc/client.py @@ -66,6 +66,7 @@ def __init__(self, config_leader: TEndPoint): "Fail to connect to any config node. Please check status of ConfigNodes" ) self._RETRY_NUM = 10 + self._STARTUP_RETRY_NUM = 60 self._RETRY_INTERVAL_IN_S = 1 self._try_to_connect() @@ -163,6 +164,12 @@ def _update_config_node_leader(self, status: TSStatus) -> bool: else: self._config_leader = None return True + if status.code == TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.get_status_code(): + logger.info( + "ConfigNode leader is warming up before serving AINode, will wait and retry. Reason: {}", + status.message, + ) + return True return False def node_register( @@ -177,7 +184,7 @@ def node_register( versionInfo=version_info, ) - for _ in range(0, self._RETRY_NUM): + for _ in range(0, self._STARTUP_RETRY_NUM): try: resp = self._client.registerAINode(req) if not self._update_config_node_leader(resp.status): @@ -208,7 +215,7 @@ def node_restart( versionInfo=version_info, ) - for _ in range(0, self._RETRY_NUM): + for _ in range(0, self._STARTUP_RETRY_NUM): try: resp = self._client.restartAINode(req) if not self._update_config_node_leader(resp.status): diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/AINodeHeartbeatHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/AINodeHeartbeatHandler.java index 9d8e0b6e8474f..03e6c0bfe3111 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/AINodeHeartbeatHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/AINodeHeartbeatHandler.java @@ -50,7 +50,7 @@ public void onComplete(TAIHeartbeatResp aiHeartbeatResp) { public void onError(Exception e) { if (ThriftClient.isConnectionBroken(e)) { loadManager.forceUpdateNodeCache( - NodeType.DataNode, nodeId, new NodeHeartbeatSample(NodeStatus.Unknown)); + NodeType.AINode, nodeId, new NodeHeartbeatSample(NodeStatus.Unknown)); } loadManager.getLoadCache().resetHeartbeatProcessing(nodeId); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java index e7a31b1dc73eb..52053cdab4564 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java @@ -19,6 +19,7 @@ package org.apache.iotdb.confignode.client.async.handlers.heartbeat; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.cluster.NodeStatus; @@ -36,6 +37,7 @@ import org.apache.thrift.async.AsyncMethodCallback; +import java.util.Collections; import java.util.Map; import java.util.function.Consumer; @@ -82,54 +84,86 @@ public DataNodeHeartbeatHandler( @Override public void onComplete(TDataNodeHeartbeatResp heartbeatResp) { - // Update NodeCache + cacheNodeHeartbeatSample(heartbeatResp); + cacheRegionGroupHeartbeatSamples(heartbeatResp); + cacheUsageSamples(heartbeatResp); + cachePipeHeartbeat(heartbeatResp); + cacheConfirmedConfigNodeEndPoints(heartbeatResp); + cacheRegionSizeSamples(heartbeatResp); + } + + private void cacheNodeHeartbeatSample(TDataNodeHeartbeatResp heartbeatResp) { loadManager .getLoadCache() .cacheDataNodeHeartbeatSample(nodeId, new NodeHeartbeatSample(heartbeatResp)); + } + private void cacheRegionGroupHeartbeatSamples(TDataNodeHeartbeatResp heartbeatResp) { RegionStatus regionStatus = RegionStatus.valueOf(heartbeatResp.getStatus()); - heartbeatResp - .getJudgedLeaders() - .forEach( - (regionGroupId, isLeader) -> { - - // Do not allow regions to inherit the Removing state from datanode - RegionStatus nextRegionStatus = regionStatus; - if (nextRegionStatus == RegionStatus.Removing) { - nextRegionStatus = - loadManager - .getLoadCache() - .getRegionCacheLastSampleStatus(regionGroupId, nodeId); - } - - // Update RegionGroupCache - loadManager - .getLoadCache() - .cacheRegionHeartbeatSample( - regionGroupId, - nodeId, - new RegionHeartbeatSample( - heartbeatResp.getHeartbeatTimestamp(), - // Region will inherit DataNode's status - nextRegionStatus), - false); - - if (((TConsensusGroupType.SchemaRegion.equals(regionGroupId.getType()) - && SCHEMA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE) - || (TConsensusGroupType.DataRegion.equals(regionGroupId.getType()) - && DATA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE)) - && Boolean.TRUE.equals(isLeader)) { - // Update ConsensusGroupCache when necessary - loadManager - .getLoadCache() - .cacheConsensusSample( - regionGroupId, - new ConsensusGroupHeartbeatSample( - heartbeatResp.getConsensusLogicalTimeMap().get(regionGroupId), nodeId)); - } - }); + Map judgedLeaders = + heartbeatResp.isSetJudgedLeaders() + ? heartbeatResp.getJudgedLeaders() + : Collections.emptyMap(); + judgedLeaders.forEach( + (regionGroupId, isLeader) -> { + cacheRegionHeartbeatSample(heartbeatResp, regionStatus, regionGroupId); + cacheConsensusSampleIfNeeded(heartbeatResp, regionGroupId, isLeader); + }); + } + + private void cacheRegionHeartbeatSample( + TDataNodeHeartbeatResp heartbeatResp, + RegionStatus dataNodeRegionStatus, + TConsensusGroupId regionGroupId) { + loadManager + .getLoadCache() + .cacheRegionHeartbeatSample( + regionGroupId, + nodeId, + new RegionHeartbeatSample( + heartbeatResp.getHeartbeatTimestamp(), + getRegionHeartbeatStatus(regionGroupId, dataNodeRegionStatus)), + false); + } + + private RegionStatus getRegionHeartbeatStatus( + TConsensusGroupId regionGroupId, RegionStatus dataNodeRegionStatus) { + return dataNodeRegionStatus == RegionStatus.Removing + ? loadManager.getLoadCache().getRegionCacheLastSampleStatus(regionGroupId, nodeId) + : dataNodeRegionStatus; + } + + private void cacheConsensusSampleIfNeeded( + TDataNodeHeartbeatResp heartbeatResp, TConsensusGroupId regionGroupId, Boolean isLeader) { + if (!Boolean.TRUE.equals(isLeader) + || !shouldCacheConsensusSample(regionGroupId) + || !hasConsensusLogicalTimestamp(heartbeatResp, regionGroupId)) { + return; + } + + loadManager + .getLoadCache() + .cacheConsensusSample( + regionGroupId, + new ConsensusGroupHeartbeatSample( + heartbeatResp.getConsensusLogicalTimeMap().get(regionGroupId), nodeId)); + } + + private boolean shouldCacheConsensusSample(TConsensusGroupId regionGroupId) { + return (TConsensusGroupType.SchemaRegion.equals(regionGroupId.getType()) + && SCHEMA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE) + || (TConsensusGroupType.DataRegion.equals(regionGroupId.getType()) + && DATA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE); + } + + private boolean hasConsensusLogicalTimestamp( + TDataNodeHeartbeatResp heartbeatResp, TConsensusGroupId regionGroupId) { + return heartbeatResp.isSetConsensusLogicalTimeMap() + && heartbeatResp.getConsensusLogicalTimeMap().containsKey(regionGroupId); + } + private void cacheUsageSamples(TDataNodeHeartbeatResp heartbeatResp) { if (heartbeatResp.getRegionDeviceUsageMap() != null) { deviceNum.putAll(heartbeatResp.getRegionDeviceUsageMap()); deviceUsageRespProcess.accept(heartbeatResp.getRegionDeviceUsageMap()); @@ -141,6 +175,9 @@ public void onComplete(TDataNodeHeartbeatResp heartbeatResp) { if (heartbeatResp.getRegionDisk() != null) { regionDisk.putAll(heartbeatResp.getRegionDisk()); } + } + + private void cachePipeHeartbeat(TDataNodeHeartbeatResp heartbeatResp) { if (heartbeatResp.getPipeMetaList() != null) { pipeRuntimeCoordinator.parseHeartbeat( nodeId, @@ -149,12 +186,18 @@ public void onComplete(TDataNodeHeartbeatResp heartbeatResp) { heartbeatResp.getPipeRemainingEventCountList(), heartbeatResp.getPipeRemainingTimeList()); } + } + + private void cacheConfirmedConfigNodeEndPoints(TDataNodeHeartbeatResp heartbeatResp) { if (heartbeatResp.isSetConfirmedConfigNodeEndPoints()) { loadManager .getLoadCache() .updateConfirmedConfigNodeEndPoints( nodeId, heartbeatResp.getConfirmedConfigNodeEndPoints()); } + } + + private void cacheRegionSizeSamples(TDataNodeHeartbeatResp heartbeatResp) { if (heartbeatResp.isSetRegionDisk()) { loadManager.getLoadCache().updateRegionSizeMap(nodeId, heartbeatResp.getRegionDisk()); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java index fe687d17556f7..251db87dbdcd8 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java @@ -60,22 +60,53 @@ import java.nio.file.StandardCopyOption; import java.util.Arrays; import java.util.Comparator; -import java.util.Optional; +import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; /** {@link IStateMachine} for ConfigRegion. */ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.EventApi { private static final Logger LOGGER = LoggerFactory.getLogger(ConfigRegionStateMachine.class); - private static final ExecutorService threadPool = + /** + * Serializes leadership transitions (become-leader / resign-leader). A single worker thread is + * the barrier that keeps epochs strictly serial: the orchestration of one transition runs to + * completion before the next one begins. + */ + private static final ExecutorService leaderServicesTransitionExecutor = + IoTDBThreadPoolFactory.newSingleThreadExecutor( + ThreadName.CONFIG_NODE_LEADER_SERVICES_TRANSITION.getName()); + + /** Runs the individual leader services in parallel within a single become-leader epoch. */ + private static final ExecutorService leaderServicesStartupPool = IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.CONFIG_NODE_RECOVER.getName()); + private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); + private static final long WAIT_LOAD_READY_TIMEOUT_MS = + CommonDescriptor.getInstance().getConfig().getCnConnectionTimeoutInMS() / 2; + private static final long WAIT_LOAD_READY_INTERVAL_MS = 100; private final ConfigPlanExecutor executor; + + /** + * Whether the leader services of the {@link #leaderServicesEpoch current epoch} have finished + * starting up. Read by {@link ConsensusManager#confirmLeader()} to gate external serving. + */ + private final AtomicBoolean leaderServicesReady; + + /** + * Monotonically increasing leadership generation. Every become-leader / resign-leader transition + * bumps it, so any work submitted for an older epoch can detect it is stale and bail out. + */ + private final AtomicLong leaderServicesEpoch; + + /** Guards {@link #leaderServicesReady} and {@link #leaderServicesEpoch} as a unit. */ + private final Object leaderServicesLock; + private ConfigManager configManager; /** Variables for {@link ConfigNode} Simple Consensus. */ @@ -87,17 +118,20 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev private static final String CURRENT_FILE_DIR = ConsensusManager.getConfigRegionDir() + File.separator + "current"; + private static final String LOG_INPROGRESS_FILE_PREFIX = "log_inprogress_"; + private static final String LOG_FILE_PREFIX = "log_"; private static final String PROGRESS_FILE_PATH = - CURRENT_FILE_DIR + File.separator + "log_inprogress_"; - private static final String FILE_PATH = CURRENT_FILE_DIR + File.separator + "log_"; + CURRENT_FILE_DIR + File.separator + LOG_INPROGRESS_FILE_PREFIX; + private static final String FILE_PATH = CURRENT_FILE_DIR + File.separator + LOG_FILE_PREFIX; private static final long LOG_FILE_MAX_SIZE = CONF.getConfigNodeSimpleConsensusLogSegmentSizeMax(); private final TEndPoint currentNodeTEndPoint; - private static Pattern LOG_INPROGRESS_PATTERN = Pattern.compile("\\d+"); - private static Pattern LOG_PATTERN = Pattern.compile("(?<=_)(\\d+)$"); public ConfigRegionStateMachine(ConfigManager configManager, ConfigPlanExecutor executor) { this.executor = executor; + this.leaderServicesReady = new AtomicBoolean(false); + this.leaderServicesEpoch = new AtomicLong(0); + this.leaderServicesLock = new Object(); this.configManager = configManager; this.currentNodeTEndPoint = new TEndPoint() @@ -115,9 +149,9 @@ public void setConfigManager(ConfigManager configManager) { @Override public TSStatus write(IConsensusRequest request) { - return Optional.ofNullable(request) - .map(o -> write((ConfigPhysicalPlan) request)) - .orElseGet(() -> new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())); + return request == null + ? new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()) + : write((ConfigPhysicalPlan) request); } /** Transmit {@link ConfigPhysicalPlan} to {@link ConfigPlanExecutor} */ @@ -233,7 +267,7 @@ public void loadSnapshot(final File latestSnapshotRootDir) { public void notifyLeaderChanged(ConsensusGroupId groupId, int newLeaderId) { // We get currentNodeId here because the currentNodeId // couldn't initialize earlier than the ConfigRegionStateMachine - int currentNodeId = ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(); + final int currentNodeId = ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(); if (currentNodeId != newLeaderId) { LOGGER.info( ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_IS_NO_LONGER_THE_LEADER @@ -241,6 +275,7 @@ public void notifyLeaderChanged(ConsensusGroupId groupId, int newLeaderId) { currentNodeId, currentNodeTEndPoint, newLeaderId); + resignLeaderAsync(); } } @@ -248,12 +283,117 @@ public void notifyLeaderChanged(ConsensusGroupId groupId, int newLeaderId) { public void notifyNotLeader() { // We get currentNodeId here because the currentNodeId // couldn't initialize earlier than the ConfigRegionStateMachine - int currentNodeId = ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(); + final int currentNodeId = ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(); LOGGER.info( ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_IS_NO_LONGER_THE_LEADER + "start cleaning up related services", currentNodeId, currentNodeTEndPoint); + resignLeaderAsync(); + } + + @Override + public void notifyLeaderReady() { + LOGGER.info( + ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_BECOMES_CONFIG_REGION_LEADER, + ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), + currentNodeTEndPoint); + // Bump the epoch eagerly so that any in-flight services of an older epoch are invalidated + // immediately, even before the (serialized) become-leader orchestration gets to run. + final long epoch = nextLeaderServicesEpoch(); + leaderServicesTransitionExecutor.submit(() -> becomeLeader(epoch)); + } + + /** + * Submit a resign-leader transition. The epoch is bumped eagerly (on the consensus thread) so + * that stale leader work is invalidated at once, while the teardown itself is serialized behind + * any in-flight transition on {@link #leaderServicesTransitionExecutor}. + */ + private void resignLeaderAsync() { + invalidateLeaderServices(); + leaderServicesTransitionExecutor.submit(this::stopLeaderServices); + } + + /** + * Bring up the leader services for {@code epoch}. Runs on the single transition thread, so it is + * strictly serialized against every other transition. Within the epoch, the load services start + * first (to warm up as early as possible), then the remaining services start in parallel and are + * joined before the epoch is marked ready. + */ + private void becomeLeader(final long epoch) { + if (!isCurrentLeaderServicesEpoch(epoch)) { + LOGGER.info( + ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_IS_NO_LONGER_THE_LEADER + + "skip starting leader services because the leader epoch is stale", + ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), + currentNodeTEndPoint); + return; + } + + // Always start load services first. ConsensusManager gates external serving until warm-up. + configManager.getLoadManager().startLoadServices(); + if (CONF.isEnableTopologyProbing()) { + configManager.getLoadManager().startTopologyService(); + } + + // Start the remaining leader services in parallel and wait for all of them to finish. + final CompletableFuture[] startups = + leaderServiceStartups().stream() + .map(startup -> startInParallelIfEpochCurrent(epoch, startup)) + .toArray(CompletableFuture[]::new); + CompletableFuture.allOf(startups).join(); + + if (!isCurrentLeaderServicesEpoch(epoch)) { + return; + } + // The procedure executor may report readiness asynchronously once it has caught up. + configManager + .getProcedureManager() + .startExecutor(() -> markLeaderServicesReadyIfEpochCurrent(epoch)); + markLeaderServicesReadyIfEpochCurrent(epoch); + + final boolean loadReady = waitForLoadReady(epoch); + if (!isCurrentLeaderServicesEpoch(epoch)) { + return; + } + logLoadWarmUpIfNeeded(loadReady); + LOGGER.info( + ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_AS_CONFIG_REGION_LEADER_IS, + ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), + currentNodeTEndPoint); + } + + /** The leader services that can be started independently, in parallel, within one epoch. */ + private List leaderServiceStartups() { + return Arrays.asList( + () -> configManager.getProcedureManager().getStore().getProcedureInfo().upgrade(), + () -> configManager.getRetryFailedTasksThread().startRetryFailedTasksService(), + () -> configManager.getPartitionManager().startRegionCleaner(), + // Add metrics after leader ready. + () -> configManager.addMetrics(), + // Activate leader related service for config pipe. + () -> PipeConfigNodeAgent.runtime().notifyLeaderReady(), + // CQ recovery may be time-consuming, so it is just one more parallel startup. + () -> configManager.getCQManager().startCQScheduler(), + () -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync(), + () -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeHeartbeat(), + () -> + configManager + .getPipeManager() + .getPipeRuntimeCoordinator() + .onConfigRegionGroupLeaderChanged(), + () -> + configManager + .getSubscriptionManager() + .getSubscriptionCoordinator() + .startSubscriptionMetaSync(), + // To adapt old version, we check cluster ID after state machine has been fully recovered. + () -> configManager.getClusterManager().checkClusterId()); + } + + /** Tear down every leader service. Runs on the single transition thread. */ + private void stopLeaderServices() { + final int currentNodeId = ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(); // Stop leader scheduling services configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeMetaSync(); configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeHeartbeat(); @@ -281,63 +421,89 @@ public void notifyNotLeader() { currentNodeTEndPoint); } - @Override - public void notifyLeaderReady() { - LOGGER.info( - ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_BECOMES_CONFIG_REGION_LEADER, - ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), - currentNodeTEndPoint); + /** + * Run {@code startup} on {@link #leaderServicesStartupPool}, skipping it (and recording success) + * if the epoch has gone stale by the time it is picked up. Returns a future that always completes + * normally so {@link CompletableFuture#allOf} acts as a clean join barrier. + */ + private CompletableFuture startInParallelIfEpochCurrent( + final long epoch, final Runnable startup) { + return CompletableFuture.runAsync( + () -> { + if (isCurrentLeaderServicesEpoch(epoch)) { + startup.run(); + } + }, + leaderServicesStartupPool); + } - // Always start load services first - configManager.getLoadManager().startLoadServices(); + private void markLeaderServicesReadyIfEpochCurrent(final long epoch) { + synchronized (leaderServicesLock) { + if (isCurrentLeaderServicesEpoch(epoch)) { + leaderServicesReady.set(true); + } + } + } - if (CONF.isEnableTopologyProbing()) { - configManager.getLoadManager().startTopologyService(); + private void logLoadWarmUpIfNeeded(final boolean loadReady) { + if (!loadReady) { + LOGGER.info( + "Current ConfigNode(nodeId: {}, ip: {}) finished starting leader services while load" + + " warm-up is still in progress: {}", + ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), + currentNodeTEndPoint, + configManager.getLoadManager().getLoadReadyReason()); } + } - // Start leader scheduling services - configManager.getProcedureManager().startExecutor(); - threadPool.submit( - () -> configManager.getProcedureManager().getStore().getProcedureInfo().upgrade()); - configManager.getRetryFailedTasksThread().startRetryFailedTasksService(); - configManager.getPartitionManager().startRegionCleaner(); - // Add Metric after leader ready - configManager.addMetrics(); - - // Activate leader related service for config pipe - PipeConfigNodeAgent.runtime().notifyLeaderReady(); - - // we do cq recovery async for performance: - // cq recovery may be time-consuming, we use another thread to do it in - // make notifyLeaderChanged not blocked by it - threadPool.submit(() -> configManager.getCQManager().startCQScheduler()); - - threadPool.submit( - () -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync()); - threadPool.submit( - () -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeHeartbeat()); - threadPool.submit( - () -> - configManager - .getPipeManager() - .getPipeRuntimeCoordinator() - .onConfigRegionGroupLeaderChanged()); + private boolean waitForLoadReady(final long epoch) { + long startTime = System.currentTimeMillis(); + while (isCurrentLeaderServicesEpoch(epoch) + && System.currentTimeMillis() - startTime < WAIT_LOAD_READY_TIMEOUT_MS) { + if (configManager.getLoadManager().isLoadReady()) { + return true; + } + if (!sleepForLoadReady()) { + return false; + } + } + return isCurrentLeaderServicesEpoch(epoch) && configManager.getLoadManager().isLoadReady(); + } - threadPool.submit( - () -> - configManager - .getSubscriptionManager() - .getSubscriptionCoordinator() - .startSubscriptionMetaSync()); + private boolean sleepForLoadReady() { + try { + TimeUnit.MILLISECONDS.sleep(WAIT_LOAD_READY_INTERVAL_MS); + return true; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn("Unexpected interruption while waiting for ConfigNode leader load warm-up.", e); + return false; + } + } - // To adapt old version, we check cluster ID after state machine has been fully recovered. - // Do check async because sync will be slow and block every other things. - threadPool.submit(() -> configManager.getClusterManager().checkClusterId()); + public boolean areLeaderServicesReady() { + return leaderServicesReady.get(); + } - LOGGER.info( - ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_AS_CONFIG_REGION_LEADER_IS, - ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), - currentNodeTEndPoint); + /** Open a new leadership generation, invalidating the previous one. */ + private long nextLeaderServicesEpoch() { + synchronized (leaderServicesLock) { + leaderServicesReady.set(false); + return leaderServicesEpoch.incrementAndGet(); + } + } + + /** Invalidate the current leadership generation without opening a serving one. */ + private void invalidateLeaderServices() { + synchronized (leaderServicesLock) { + leaderServicesReady.set(false); + leaderServicesEpoch.incrementAndGet(); + } + } + + private boolean isCurrentLeaderServicesEpoch(final long epoch) { + return leaderServicesEpoch.get() == epoch + && configManager.getConsensusManager().isLeaderReady(); } @Override @@ -413,7 +579,7 @@ private void initStandAloneConfigNode() { dir.mkdirs(); String[] list = new File(CURRENT_FILE_DIR).list(); if (list != null && list.length != 0) { - Arrays.sort(list, new FileComparator()); + Arrays.sort(list, Comparator.comparingLong(ConfigRegionStateMachine::parseEndIndex)); for (String logFileName : list) { File logFile = SystemFileFactory.INSTANCE.getFile(CURRENT_FILE_DIR + File.separator + logFileName); @@ -497,28 +663,28 @@ private void createLogFile(int startIndex) { } } - static class FileComparator implements Comparator { - - @Override - public int compare(String filename1, String filename2) { - long id1 = parseEndIndex(filename1); - long id2 = parseEndIndex(filename2); - return Long.compare(id1, id2); - } - } - - static long parseEndIndex(String filename) { - if (filename.startsWith("log_inprogress_")) { - Matcher matcher = LOG_INPROGRESS_PATTERN.matcher(filename); - if (matcher.find()) { - return Long.parseLong(matcher.group()); + private static long parseEndIndex(String filename) { + final String endIndexString; + if (filename.startsWith(LOG_INPROGRESS_FILE_PREFIX)) { + endIndexString = filename.substring(LOG_INPROGRESS_FILE_PREFIX.length()); + } else if (filename.startsWith(LOG_FILE_PREFIX)) { + final int lastSeparatorIndex = filename.lastIndexOf('_'); + if (lastSeparatorIndex < LOG_FILE_PREFIX.length()) { + return 0; } + endIndexString = filename.substring(lastSeparatorIndex + 1); } else { - Matcher matcher = LOG_PATTERN.matcher(filename); - if (matcher.find()) { - return Long.parseLong(matcher.group()); + return 0; + } + + if (endIndexString.isEmpty()) { + return 0; + } + for (int i = 0; i < endIndexString.length(); i++) { + if (!Character.isDigit(endIndexString.charAt(i))) { + return 0; } } - return 0; + return Long.parseLong(endIndexString); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 64ee0c54791cc..c5bafc550f642 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -1256,6 +1256,10 @@ protected TSStatus confirmLeader() { "ConsensusManager of target-ConfigNode is not initialized, " + "please make sure the target-ConfigNode has been started successfully."); } + // Procedure recovery replays metadata writes before external load warm-up is complete. + if (procedureManager.isProcedureExecutionThread()) { + return getConsensusManager().confirmLeaderForInternalProcedure(); + } return getConsensusManager().confirmLeader(); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index a974f9e1d7cb7..cb76a9550afe8 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java @@ -228,14 +228,21 @@ public ProcedureManager(ConfigManager configManager, ProcedureInfo procedureInfo } public void startExecutor() { + startExecutor(null); + } + + public void startExecutor(final Runnable beforeStartingWorkers) { if (!executor.isRunning()) { executor.init(CONFIG_NODE_CONFIG.getProcedureCoreWorkerThreadsCount()); - executor.startWorkers(); executor.startCompletedCleaner( CONFIG_NODE_CONFIG.getProcedureCompletedCleanInterval(), CONFIG_NODE_CONFIG.getProcedureCompletedEvictTTL()); executor.addInternalProcedure(partitionTableCleaner); store.start(); + if (beforeStartingWorkers != null) { + beforeStartingWorkers.run(); + } + executor.startWorkers(); LOGGER.info(ManagerMessages.PROCEDUREMANAGER_IS_STARTED_SUCCESSFULLY); } } @@ -252,6 +259,10 @@ public void stopExecutor() { } } + public boolean isProcedureExecutionThread() { + return ProcedureExecutor.isProcedureExecutionThread(); + } + @TestOnly public TSStatus createManyDatabases() { this.executor.submitProcedure(new CreateManyDatabasesProcedure()); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java index 84594b0d7a85d..d7024c357b188 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java @@ -81,12 +81,14 @@ public class ConsensusManager { new ConfigRegionId(CONF.getConfigRegionId()); private final IManager configManager; + private final ConfigRegionStateMachine stateMachine; private IConsensus consensusImpl; private boolean isInitialized; public ConsensusManager(IManager configManager, ConfigRegionStateMachine stateMachine) { this.configManager = configManager; + this.stateMachine = stateMachine; setConsensusLayer(stateMachine); } @@ -94,6 +96,7 @@ public ConsensusManager(IManager configManager, ConfigRegionStateMachine stateMa ConsensusManager(IManager configManager, IConsensus consensusImpl) { this.configManager = configManager; this.consensusImpl = consensusImpl; + this.stateMachine = null; } public void start() throws IOException { @@ -445,39 +448,59 @@ public boolean isLeaderExist() { * NEED_REDIRECTION otherwise */ public TSStatus confirmLeader() { - TSStatus result = new TSStatus(); - if (isLeaderReady()) { - result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()); - } else { - result.setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()); - if (isLeader()) { - long startTime = System.currentTimeMillis(); - while (System.currentTimeMillis() - startTime < MAX_WAIT_READY_TIME_MS) { - if (isLeaderReady()) { - result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()); - return result; - } - try { - Thread.sleep(RETRY_WAIT_TIME_MS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOGGER.warn( - ManagerMessages.UNEXPECTED_INTERRUPTION_DURING_WAITING_FOR_CONFIGNODE_LEADER_READY); - break; - } - } - result.setMessage( - "The current ConfigNode is leader but not ready yet, please try again later."); - } else { - result.setMessage( - "The current ConfigNode is not leader, please redirect to a new ConfigNode."); - } + return confirmLeader(true); + } + + private TSStatus confirmLeader(final boolean checkLoadReady) { + if (!isLeader()) { + TSStatus result = new TSStatus(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()); + result.setMessage( + "The current ConfigNode is not leader, please redirect to a new ConfigNode."); TConfigNodeLocation leaderLocation = getLeaderLocation(); if (leaderLocation != null) { result.setRedirectNode(leaderLocation.getInternalEndPoint()); } + return result; + } + + waitForLeaderReady(); + + if (!isLeaderReady()) { + return getLeaderWarmingUpStatus( + "The current ConfigNode is leader but consensus is not ready yet."); + } + if (!stateMachine.areLeaderServicesReady()) { + return getLeaderWarmingUpStatus( + "The current ConfigNode is leader but leader services are not ready yet."); + } + if (checkLoadReady && !configManager.getLoadManager().isLoadReady()) { + return getLeaderWarmingUpStatus(configManager.getLoadManager().getLoadReadyReason()); + } + + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + } + + public TSStatus confirmLeaderForInternalProcedure() { + return confirmLeader(false); + } + + private void waitForLeaderReady() { + long startTime = System.currentTimeMillis(); + while (!isLeaderReady() && System.currentTimeMillis() - startTime < MAX_WAIT_READY_TIME_MS) { + try { + Thread.sleep(RETRY_WAIT_TIME_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn( + ManagerMessages.UNEXPECTED_INTERRUPTION_DURING_WAITING_FOR_CONFIGNODE_LEADER_READY); + return; + } } - return result; + } + + private TSStatus getLeaderWarmingUpStatus(String message) { + return new TSStatus(TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.getStatusCode()) + .setMessage(message); } public ConsensusGroupId getConsensusGroupId() { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java index e0547bf640d16..63d5ff8befdb6 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java @@ -50,7 +50,10 @@ import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; /** @@ -59,6 +62,8 @@ */ public class LoadManager { + private static final long FIRST_HEARTBEAT_READY_TOLERANCE_MS = TimeUnit.SECONDS.toMillis(30); + protected final IManager configManager; /** Balancers. */ @@ -74,6 +79,10 @@ public class LoadManager { private final StatisticsService statisticsService; private final EventService eventService; private final TopologyService topologyService; + private final AtomicBoolean loadServicesStarted; + private final AtomicLong loadReadyStartTimeMillis; + private final AtomicBoolean loadReady; + private volatile String loadReadyReason; public LoadManager(IManager configManager) { this.configManager = configManager; @@ -92,6 +101,10 @@ public LoadManager(IManager configManager) { configManager.getSubscriptionManager().getSubscriptionLeaderChangeHandler()); this.eventService.register(routeBalancer); this.eventService.register(topologyService); + this.loadServicesStarted = new AtomicBoolean(false); + this.loadReadyStartTimeMillis = new AtomicLong(0); + this.loadReady = new AtomicBoolean(false); + this.loadReadyReason = "ConfigNode leader load services are not started."; } protected void setHeartbeatService(IManager configManager, LoadCache loadCache) { @@ -149,7 +162,11 @@ public void reBalanceDataPartitionPolicy(String database) { } public void startLoadServices() { + loadReady.set(false); + loadReadyStartTimeMillis.set(System.currentTimeMillis()); + loadReadyReason = "ConfigNode leader is waiting for cluster heartbeat sampling."; loadCache.initHeartbeatCache(configManager); + loadServicesStarted.set(true); heartbeatService.startHeartbeatService(); statisticsService.startLoadStatisticsService(); eventService.startEventService(); @@ -157,6 +174,10 @@ public void startLoadServices() { } public void stopLoadServices() { + loadServicesStarted.set(false); + loadReadyStartTimeMillis.set(0); + loadReady.set(false); + loadReadyReason = "ConfigNode leader load services are stopped."; heartbeatService.stopHeartbeatService(); statisticsService.stopLoadStatisticsService(); eventService.stopEventService(); @@ -165,6 +186,50 @@ public void stopLoadServices() { routeBalancer.clearRegionPriority(); } + public boolean isLoadReady() { + return loadReady.get() || tryUpdateLoadReady(); + } + + public String getLoadReadyReason() { + return loadReadyReason; + } + + private synchronized boolean tryUpdateLoadReady() { + if (loadReady.get()) { + return true; + } + if (!loadServicesStarted.get()) { + loadReadyReason = "ConfigNode leader load services are not started."; + return false; + } + + loadCache.updateNodeStatistics(false); + eventService.checkAndBroadcastNodeStatisticsChangeEventIfNecessary(); + + List unreadyReasons = loadCache.getNodeHeartbeatUnreadyReasons(); + if (unreadyReasons.isEmpty()) { + loadReadyReason = "ConfigNode leader load services are ready."; + loadReady.set(true); + return true; + } + + long elapsedMillis = System.currentTimeMillis() - loadReadyStartTimeMillis.get(); + if (elapsedMillis < FIRST_HEARTBEAT_READY_TOLERANCE_MS) { + loadReadyReason = + "ConfigNode leader is waiting for first heartbeat from registered ConfigNodes/DataNodes: " + + unreadyReasons; + return false; + } + + loadReadyReason = + "ConfigNode leader load services are ready after waiting " + + FIRST_HEARTBEAT_READY_TOLERANCE_MS + + "ms for first heartbeat. Missing heartbeats: " + + unreadyReasons; + loadReady.set(true); + return true; + } + public void startTopologyService() { topologyService.startTopologyService(); } @@ -489,6 +554,14 @@ public RouteBalancer getRouteBalancer() { return routeBalancer; } + @TestOnly + void markLoadServicesStartedForTest(long loadReadyStartTimeMillis) { + loadServicesStarted.set(true); + loadReady.set(false); + this.loadReadyStartTimeMillis.set(loadReadyStartTimeMillis); + loadReadyReason = "ConfigNode leader is waiting for cluster heartbeat sampling."; + } + @TestOnly public EventService getEventService() { return eventService; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java index d61a004352033..e5ab6445f98e7 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java @@ -97,6 +97,10 @@ public AbstractHeartbeatSample getLastSample() { return slidingWindow.isEmpty() ? null : slidingWindow.get(slidingWindow.size() - 1); } + public boolean hasHeartbeatSample() { + return getLastSample() != null; + } + /** * Update currentStatistics based on the latest heartbeat sample that cached in the slidingWindow. */ diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java index 818171c89bcc8..3416246811edc 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java @@ -82,6 +82,7 @@ public class LoadCache { Math.max( ProcedureManager.PROCEDURE_WAIT_TIME_OUT - TimeUnit.SECONDS.toMillis(2), TimeUnit.SECONDS.toMillis(10)); + private static final int MAX_UNREADY_ENTITY_PRINT = 10; private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); @@ -496,6 +497,32 @@ public Map getCurrentConsensusGroup return consensusGroupStatisticsMap; } + public List getNodeHeartbeatUnreadyReasons() { + List unreadyNodes = new ArrayList<>(); + nodeCacheMap.forEach( + (nodeId, nodeCache) -> { + if (nodeId == ConfigNodeHeartbeatCache.CURRENT_NODE_ID) { + return; + } + if ((nodeCache instanceof ConfigNodeHeartbeatCache + || nodeCache instanceof DataNodeHeartbeatCache) + && !nodeCache.hasHeartbeatSample()) { + unreadyNodes.add(nodeId); + } + }); + if (unreadyNodes.isEmpty()) { + return Collections.emptyList(); + } + Collections.sort(unreadyNodes); + List nodesToPrint = + unreadyNodes.subList(0, Math.min(MAX_UNREADY_ENTITY_PRINT, unreadyNodes.size())); + String suffix = + unreadyNodes.size() > MAX_UNREADY_ENTITY_PRINT + ? "...(" + (unreadyNodes.size() - MAX_UNREADY_ENTITY_PRINT) + " more)" + : ""; + return Collections.singletonList("nodes=" + nodesToPrint + suffix); + } + /** * Safely get NodeStatus by NodeId. * diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupCache.java index aa924dc29b585..7dcacc4fd805c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupCache.java @@ -41,7 +41,7 @@ public synchronized void updateCurrentStatistics(boolean forceUpdate) { synchronized (slidingWindow) { lastSample = (ConsensusGroupHeartbeatSample) getLastSample(); } - if (lastSample != null && lastSample.getLeaderId() != UN_READY_LEADER_ID) { + if (lastSample != null) { currentStatistics.set( new ConsensusGroupStatistics(System.nanoTime(), lastSample.getLeaderId())); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java index 11bb7f382d41e..82afea3859fd4 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java @@ -56,6 +56,8 @@ public class ProcedureExecutor { private static final Logger LOG = LoggerFactory.getLogger(ProcedureExecutor.class); + private static final ThreadLocal PROCEDURE_EXECUTION_CONTEXT = + ThreadLocal.withInitial(() -> false); private final ConcurrentHashMap> completed = new ConcurrentHashMap<>(); @@ -96,6 +98,10 @@ public ProcedureExecutor(final Env environment, final IProcedureStore store this(environment, store, new SimpleProcedureScheduler()); } + public static boolean isProcedureExecutionThread() { + return PROCEDURE_EXECUTION_CONTEXT.get(); + } + public void init(int numThreads) { this.corePoolSize = numThreads; this.maxPoolSize = 10 * numThreads; @@ -784,7 +790,12 @@ public void run() { this.activeProcedure.set(procedure); activeExecutorCount.incrementAndGet(); startTime.set(System.currentTimeMillis()); - executeProcedure(procedure); + PROCEDURE_EXECUTION_CONTEXT.set(true); + try { + executeProcedure(procedure); + } finally { + PROCEDURE_EXECUTION_CONTEXT.remove(); + } activeExecutorCount.decrementAndGet(); LOG.trace( "Halt pid={}, activeCount={}", procedure.getProcId(), activeExecutorCount.get()); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java index 037f138286a18..da766541786cb 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java @@ -89,7 +89,7 @@ public class ConfigNode extends ServerCommandLine implements ConfigNodeMBean { private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig(); - private static final int STARTUP_RETRY_NUM = 10; + private static final int STARTUP_RETRY_NUM = 20; private static final long STARTUP_RETRY_INTERVAL_IN_MS = TimeUnit.SECONDS.toMillis(3); private static final int SCHEDULE_WAITING_RETRY_NUM = (int) (COMMON_CONFIG.getCnConnectionTimeoutInMS() / STARTUP_RETRY_INTERVAL_IN_MS); @@ -414,6 +414,12 @@ private void sendRegisterConfigNodeRequest() throws StartupException, IOExceptio } else if (status.getCode() == TSStatusCode.INTERNAL_REQUEST_RETRY_ERROR.getStatusCode()) { LOGGER.warn( ConfigNodeMessages.THE_RESULT_OF_REGISTER_SELF_CONFIGNODE_IS_RETRY, status, retry); + } else if (status.getCode() == TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.getStatusCode()) { + LOGGER.info( + "ConfigNode leader is warming up before serving the registering ConfigNode, will wait" + + " and retry. Status: {}, retry: {}", + status, + retry); } else { throw new StartupException(status.getMessage()); } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java index fd62c31ae3678..e2c51df9693be 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java @@ -48,6 +48,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -292,4 +293,73 @@ public void testConsensusGroupCache() throws InterruptedException { new Pair<>(new ConsensusGroupStatistics(newLeaderId), null), differentConsensusGroupStatisticsMap.get(regionGroupId)); } + + @Test + public void testLoadWarmUpRequiresOnlyConfigNodeAndDataNodeSamples() { + LoadCache loadCache = new LoadCache(); + TConsensusGroupId regionGroupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, 100); + Set dataNodeIds = Stream.of(11, 12).collect(Collectors.toSet()); + + loadCache.createNodeHeartbeatCache(NodeType.ConfigNode, 10); + dataNodeIds.forEach( + dataNodeId -> loadCache.createNodeHeartbeatCache(NodeType.DataNode, dataNodeId)); + loadCache.createNodeHeartbeatCache(NodeType.AINode, 13); + loadCache.createRegionGroupHeartbeatCache("root.warmup", regionGroupId, dataNodeIds); + + Assert.assertEquals( + Collections.singletonList("nodes=[10, 11, 12]"), + loadCache.getNodeHeartbeatUnreadyReasons()); + + loadCache.cacheConfigNodeHeartbeatSample(10, new NodeHeartbeatSample(NodeStatus.Unknown)); + + dataNodeIds.forEach( + dataNodeId -> + loadCache.cacheDataNodeHeartbeatSample( + dataNodeId, new NodeHeartbeatSample(NodeStatus.Running))); + loadCache.updateNodeStatistics(false); + + Assert.assertTrue(loadCache.getNodeHeartbeatUnreadyReasons().isEmpty()); + } + + @Test + public void testRegionAndConsensusGroupsDoNotBlockLoadWarmUp() { + LoadCache loadCache = new LoadCache(); + TConsensusGroupId regionGroupId = new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 101); + Set dataNodeIds = Stream.of(21, 22).collect(Collectors.toSet()); + + dataNodeIds.forEach( + dataNodeId -> loadCache.createNodeHeartbeatCache(NodeType.DataNode, dataNodeId)); + loadCache.createRegionGroupHeartbeatCache("root.warmup", regionGroupId, dataNodeIds); + dataNodeIds.forEach( + dataNodeId -> { + loadCache.cacheDataNodeHeartbeatSample( + dataNodeId, new NodeHeartbeatSample(NodeStatus.Running)); + }); + loadCache.updateNodeStatistics(false); + loadCache.updateRegionGroupStatistics(); + loadCache.updateConsensusGroupStatistics(); + + Assert.assertTrue(loadCache.getNodeHeartbeatUnreadyReasons().isEmpty()); + } + + @Test + public void testLoadWarmUpToleratesMissingFirstHeartbeatAfterThirtySeconds() { + LOAD_CACHE.clearHeartbeatCache(); + LOAD_CACHE.createNodeHeartbeatCache(NodeType.DataNode, 31); + + try { + LOAD_MANAGER.markLoadServicesStartedForTest(System.currentTimeMillis()); + + Assert.assertFalse(LOAD_MANAGER.isLoadReady()); + Assert.assertTrue(LOAD_MANAGER.getLoadReadyReason().contains("waiting for first heartbeat")); + + LOAD_MANAGER.markLoadServicesStartedForTest( + System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(31)); + + Assert.assertTrue(LOAD_MANAGER.isLoadReady()); + Assert.assertTrue(LOAD_MANAGER.getLoadReadyReason().contains("Missing heartbeats")); + } finally { + LOAD_MANAGER.stopLoadServices(); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java index b21d3a3a79209..5c7fa59f78d5b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java @@ -224,6 +224,7 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie "Failed to connect to ConfigNode %s from DataNode %s when executing %s, Exception:"; private static final long RETRY_INTERVAL_MS = 1000L; private static final long WAIT_CN_LEADER_ELECTION_INTERVAL_MS = 2000L; + private static final long REGISTER_LEADER_WARMING_UP_RETRY_TIMEOUT_MS = 60_000L; private static final String UNSUPPORTED_INVOCATION = "This method is not supported for invocation by DataNode"; @@ -403,12 +404,33 @@ private boolean updateConfigNodeLeader(TSStatus status) { } return true; } + if (status.getCode() == TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.getStatusCode()) { + if (!isFirstInitiated) { + logger.info( + "ConfigNode leader {} is warming up before serving DataNode {}, will wait and retry." + + " Reason: {}", + configNode, + config.getAddressAndPort(), + status.getMessage()); + } + try { + Thread.sleep(WAIT_CN_LEADER_ELECTION_INTERVAL_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.warn(DataNodeMiscMessages.UNEXPECTED_INTERRUPTION_CONNECT_CONFIG_NODE_BREAK); + } + return true; + } return false; } finally { isFirstInitiated = false; } } + private boolean isConfigNodeLeaderWarmingUp(TSStatus status) { + return status.getCode() == TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.getStatusCode(); + } + /** * The frame of execute RPC, include logic of retry and exception handling. * @@ -480,20 +502,33 @@ public TGetClusterIdResp getClusterId() throws TException { @Override public TDataNodeRegisterResp registerDataNode(TDataNodeRegisterReq req) throws TException { - for (int i = 0; i < RETRY_NUM; i++) { + boolean leaderWarmingUpObserved = false; + long leaderWarmingUpRetryDeadline = 0; + for (int i = 0; + i < RETRY_NUM + || (leaderWarmingUpObserved + && System.currentTimeMillis() < leaderWarmingUpRetryDeadline); + i++) { try { TDataNodeRegisterResp resp = client.registerDataNode(req); if (!updateConfigNodeLeader(resp.status)) { return resp; } + if (isConfigNodeLeaderWarmingUp(resp.status) && !leaderWarmingUpObserved) { + leaderWarmingUpObserved = true; + leaderWarmingUpRetryDeadline = + System.currentTimeMillis() + REGISTER_LEADER_WARMING_UP_RETRY_TIMEOUT_MS; + } // set latest config node list - List newConfigNodes = new ArrayList<>(); - for (TConfigNodeLocation configNodeLocation : resp.getConfigNodeList()) { - newConfigNodes.add(configNodeLocation.getInternalEndPoint()); + if (resp.isSetConfigNodeList()) { + List newConfigNodes = new ArrayList<>(); + for (TConfigNodeLocation configNodeLocation : resp.getConfigNodeList()) { + newConfigNodes.add(configNodeLocation.getInternalEndPoint()); + } + configNodes = newConfigNodes; } - configNodes = newConfigNodes; } catch (TException e) { String message = String.format( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java index 978fd3f1f0ef2..e9e01efd2d48b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java @@ -98,6 +98,7 @@ public enum ThreadName { CONFIG_NODE_REGION_MAINTAINER("IoTDB-Region-Maintainer"), // -------------------------- ConfigNode-Recover -------------------------- CONFIG_NODE_RECOVER("ConfigNode-Manager-Recovery"), + CONFIG_NODE_LEADER_SERVICES_TRANSITION("ConfigNode-Leader-Services-Transition"), // -------------------------- ConfigNode-Procedure ------------------------ // TODO: Use Thread Pool to manage the procedure thread @Potato CONFIG_NODE_PROCEDURE_WORKER("ProcedureWorkerGroup"), @@ -374,7 +375,7 @@ public enum ThreadName { new HashSet<>(Arrays.asList(CONFIG_NODE_REGION_MAINTAINER)); private static final Set configNodeRecoverThreadNames = - new HashSet<>(Arrays.asList(CONFIG_NODE_RECOVER)); + new HashSet<>(Arrays.asList(CONFIG_NODE_RECOVER, CONFIG_NODE_LEADER_SERVICES_TRANSITION)); private static final Set configNodeProcedureThreadNames = new HashSet<>(