Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ public enum TSStatusCode {
CAN_NOT_CONNECT_AINODE(1011),
NO_AVAILABLE_REPLICA(1012),
NO_AVAILABLE_AINODE(1013),
CONFIG_NODE_LEADER_WARMING_UP(1014),

// Sync, Load TsFile
LOAD_FILE_ERROR(1100),
Expand Down
1 change: 1 addition & 0 deletions iotdb-core/ainode/iotdb/ainode/core/constant.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
class TSStatusCode(Enum):
SUCCESS_STATUS = 200
REDIRECTION_RECOMMEND = 400
CONFIG_NODE_LEADER_WARMING_UP = 1014
MODEL_EXISTED_ERROR = 1503
MODEL_NOT_EXIST_ERROR = 1504
CREATE_MODEL_ERROR = 1505
Expand Down
11 changes: 9 additions & 2 deletions iotdb-core/ainode/iotdb/ainode/core/rpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def __init__(self, config_leader: TEndPoint):
"Fail to connect to any config node. Please check status of ConfigNodes"
)
self._RETRY_NUM = 10
self._STARTUP_RETRY_NUM = 60
self._RETRY_INTERVAL_IN_S = 1

self._try_to_connect()
Expand Down Expand Up @@ -163,6 +164,12 @@ def _update_config_node_leader(self, status: TSStatus) -> bool:
else:
self._config_leader = None
return True
if status.code == TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.get_status_code():
logger.info(
"ConfigNode leader is warming up before serving AINode, will wait and retry. Reason: {}",
status.message,
)
return True
return False

def node_register(
Expand All @@ -177,7 +184,7 @@ def node_register(
versionInfo=version_info,
)

for _ in range(0, self._RETRY_NUM):
for _ in range(0, self._STARTUP_RETRY_NUM):
try:
resp = self._client.registerAINode(req)
if not self._update_config_node_leader(resp.status):
Expand Down Expand Up @@ -208,7 +215,7 @@ def node_restart(
versionInfo=version_info,
)

for _ in range(0, self._RETRY_NUM):
for _ in range(0, self._STARTUP_RETRY_NUM):
try:
resp = self._client.restartAINode(req)
if not self._update_config_node_leader(resp.status):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void onComplete(TAIHeartbeatResp aiHeartbeatResp) {
public void onError(Exception e) {
if (ThriftClient.isConnectionBroken(e)) {
loadManager.forceUpdateNodeCache(
NodeType.DataNode, nodeId, new NodeHeartbeatSample(NodeStatus.Unknown));
NodeType.AINode, nodeId, new NodeHeartbeatSample(NodeStatus.Unknown));
}
loadManager.getLoadCache().resetHeartbeatProcessing(nodeId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.confignode.client.async.handlers.heartbeat;

import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.commons.client.ThriftClient;
import org.apache.iotdb.commons.cluster.NodeStatus;
Expand All @@ -36,6 +37,7 @@

import org.apache.thrift.async.AsyncMethodCallback;

import java.util.Collections;
import java.util.Map;
import java.util.function.Consumer;

Expand Down Expand Up @@ -82,54 +84,86 @@ public DataNodeHeartbeatHandler(

/**
 * Processes a DataNode heartbeat response by running each caching step in a fixed order: node
 * sample, region-group samples, usage samples, pipe heartbeat, confirmed ConfigNode endpoints and
 * region size samples.
 *
 * @param heartbeatResp the heartbeat response received from the DataNode
 */
@Override
public void onComplete(TDataNodeHeartbeatResp heartbeatResp) {
// The node-level sample is cached first; the remaining steps each handle one part of the response.
cacheNodeHeartbeatSample(heartbeatResp);
cacheRegionGroupHeartbeatSamples(heartbeatResp);
cacheUsageSamples(heartbeatResp);
cachePipeHeartbeat(heartbeatResp);
cacheConfirmedConfigNodeEndPoints(heartbeatResp);
cacheRegionSizeSamples(heartbeatResp);
}

/**
 * Wraps the heartbeat response in a {@code NodeHeartbeatSample} and stores it in the load cache
 * under this handler's node id.
 *
 * @param heartbeatResp the heartbeat response received from the DataNode
 */
private void cacheNodeHeartbeatSample(TDataNodeHeartbeatResp heartbeatResp) {
  final NodeHeartbeatSample nodeSample = new NodeHeartbeatSample(heartbeatResp);
  loadManager.getLoadCache().cacheDataNodeHeartbeatSample(nodeId, nodeSample);
}

/**
 * Caches a region heartbeat sample, and a consensus sample where applicable, for every region
 * group listed in the response's judged-leader map.
 *
 * <p>Each region group is processed exactly once through the dedicated helpers. An unset
 * judged-leader map is treated as empty, so a response without that field caches nothing here.
 *
 * @param heartbeatResp the heartbeat response received from the DataNode
 */
private void cacheRegionGroupHeartbeatSamples(TDataNodeHeartbeatResp heartbeatResp) {
  RegionStatus regionStatus = RegionStatus.valueOf(heartbeatResp.getStatus());

  // Guard the optional field instead of dereferencing it directly.
  Map<TConsensusGroupId, Boolean> judgedLeaders =
      heartbeatResp.isSetJudgedLeaders()
          ? heartbeatResp.getJudgedLeaders()
          : Collections.emptyMap();
  judgedLeaders.forEach(
      (regionGroupId, isLeader) -> {
        cacheRegionHeartbeatSample(heartbeatResp, regionStatus, regionGroupId);
        cacheConsensusSampleIfNeeded(heartbeatResp, regionGroupId, isLeader);
      });
}

/**
 * Builds a {@code RegionHeartbeatSample} for one region group and stores it in the load cache
 * under this handler's node id.
 *
 * <p>The sample carries the response's heartbeat timestamp and the status chosen by
 * {@code getRegionHeartbeatStatus}. The final argument passed to the cache call is {@code false}.
 *
 * @param heartbeatResp the heartbeat response received from the DataNode
 * @param dataNodeRegionStatus the region status derived from the DataNode's reported status
 * @param regionGroupId the region group the sample belongs to
 */
private void cacheRegionHeartbeatSample(
    TDataNodeHeartbeatResp heartbeatResp,
    RegionStatus dataNodeRegionStatus,
    TConsensusGroupId regionGroupId) {
  final RegionStatus sampledStatus = getRegionHeartbeatStatus(regionGroupId, dataNodeRegionStatus);
  final RegionHeartbeatSample regionSample =
      new RegionHeartbeatSample(heartbeatResp.getHeartbeatTimestamp(), sampledStatus);
  loadManager.getLoadCache().cacheRegionHeartbeatSample(regionGroupId, nodeId, regionSample, false);
}

/**
 * Chooses the status to record for a region in this heartbeat.
 *
 * <p>A region does not take over the {@code Removing} status from its DataNode: in that case the
 * last sampled status held by the load cache for this region and node is returned instead.
 *
 * @param regionGroupId the region group being sampled
 * @param dataNodeRegionStatus the region status derived from the DataNode's reported status
 * @return the status to store in the region heartbeat sample
 */
private RegionStatus getRegionHeartbeatStatus(
    TConsensusGroupId regionGroupId, RegionStatus dataNodeRegionStatus) {
  if (dataNodeRegionStatus != RegionStatus.Removing) {
    return dataNodeRegionStatus;
  }
  return loadManager.getLoadCache().getRegionCacheLastSampleStatus(regionGroupId, nodeId);
}

/**
 * Stores a consensus-group heartbeat sample for the region group when all of the following hold:
 * this node is flagged as leader, the group's type is configured to cache consensus samples, and
 * the response carries a logical timestamp for the group. Otherwise nothing is cached.
 *
 * @param heartbeatResp the heartbeat response received from the DataNode
 * @param regionGroupId the region group being sampled
 * @param isLeader the judged-leader flag for the group; {@code null} is treated as not leader
 */
private void cacheConsensusSampleIfNeeded(
    TDataNodeHeartbeatResp heartbeatResp, TConsensusGroupId regionGroupId, Boolean isLeader) {
  // Checks run in the same short-circuit order as the three preconditions listed above.
  final boolean eligible =
      Boolean.TRUE.equals(isLeader)
          && shouldCacheConsensusSample(regionGroupId)
          && hasConsensusLogicalTimestamp(heartbeatResp, regionGroupId);
  if (eligible) {
    loadManager
        .getLoadCache()
        .cacheConsensusSample(
            regionGroupId,
            new ConsensusGroupHeartbeatSample(
                heartbeatResp.getConsensusLogicalTimeMap().get(regionGroupId), nodeId));
  }
}

/**
 * Tells whether consensus samples are cached for the given region group's type.
 *
 * @param regionGroupId the region group being sampled
 * @return the schema-region flag for schema regions, the data-region flag for data regions, and
 *     {@code false} for any other type
 */
private boolean shouldCacheConsensusSample(TConsensusGroupId regionGroupId) {
  final TConsensusGroupType groupType = regionGroupId.getType();
  if (TConsensusGroupType.SchemaRegion.equals(groupType)) {
    return SCHEMA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE;
  }
  if (TConsensusGroupType.DataRegion.equals(groupType)) {
    return DATA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE;
  }
  return false;
}

/**
 * Tells whether the response carries a consensus logical timestamp for the region group.
 *
 * @param heartbeatResp the heartbeat response received from the DataNode
 * @param regionGroupId the region group to look up
 * @return {@code true} only if the logical-time map is set and contains the group as a key
 */
private boolean hasConsensusLogicalTimestamp(
    TDataNodeHeartbeatResp heartbeatResp, TConsensusGroupId regionGroupId) {
  if (!heartbeatResp.isSetConsensusLogicalTimeMap()) {
    return false;
  }
  return heartbeatResp.getConsensusLogicalTimeMap().containsKey(regionGroupId);
}

private void cacheUsageSamples(TDataNodeHeartbeatResp heartbeatResp) {
if (heartbeatResp.getRegionDeviceUsageMap() != null) {
deviceNum.putAll(heartbeatResp.getRegionDeviceUsageMap());
deviceUsageRespProcess.accept(heartbeatResp.getRegionDeviceUsageMap());
Expand All @@ -141,6 +175,9 @@ public void onComplete(TDataNodeHeartbeatResp heartbeatResp) {
if (heartbeatResp.getRegionDisk() != null) {
regionDisk.putAll(heartbeatResp.getRegionDisk());
}
}

private void cachePipeHeartbeat(TDataNodeHeartbeatResp heartbeatResp) {
if (heartbeatResp.getPipeMetaList() != null) {
pipeRuntimeCoordinator.parseHeartbeat(
nodeId,
Expand All @@ -149,12 +186,18 @@ public void onComplete(TDataNodeHeartbeatResp heartbeatResp) {
heartbeatResp.getPipeRemainingEventCountList(),
heartbeatResp.getPipeRemainingTimeList());
}
}

/**
 * Forwards the confirmed ConfigNode endpoints from the response to the load cache for this node.
 * Does nothing when the response does not set that field.
 *
 * @param heartbeatResp the heartbeat response received from the DataNode
 */
private void cacheConfirmedConfigNodeEndPoints(TDataNodeHeartbeatResp heartbeatResp) {
  if (!heartbeatResp.isSetConfirmedConfigNodeEndPoints()) {
    return;
  }
  loadManager
      .getLoadCache()
      .updateConfirmedConfigNodeEndPoints(nodeId, heartbeatResp.getConfirmedConfigNodeEndPoints());
}

private void cacheRegionSizeSamples(TDataNodeHeartbeatResp heartbeatResp) {
if (heartbeatResp.isSetRegionDisk()) {
loadManager.getLoadCache().updateRegionSizeMap(nodeId, heartbeatResp.getRegionDisk());
}
Expand Down
Loading
Loading