diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index 4852b9d116e25..403671ac3c64f 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -85,6 +85,12 @@ public CommonConfig setMemtableSizeThreshold(long memtableSizeThreshold) {
return this;
}
+ @Override
+ public CommonConfig setMetadataLeaseFenceMs(long metadataLeaseFenceMs) {
+ setProperty("metadata_lease_fence_ms", String.valueOf(metadataLeaseFenceMs));
+ return this;
+ }
+
@Override
public CommonConfig setPartitionInterval(long partitionInterval) {
setProperty("time_partition_interval", String.valueOf(partitionInterval));
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index 582c9a049e492..df54eb295be48 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -61,6 +61,13 @@ public CommonConfig setMemtableSizeThreshold(long memtableSizeThreshold) {
return this;
}
+ @Override
+ public CommonConfig setMetadataLeaseFenceMs(long metadataLeaseFenceMs) {
+ cnConfig.setMetadataLeaseFenceMs(metadataLeaseFenceMs);
+ dnConfig.setMetadataLeaseFenceMs(metadataLeaseFenceMs);
+ return this;
+ }
+
@Override
public CommonConfig setPartitionInterval(long partitionInterval) {
cnConfig.setPartitionInterval(partitionInterval);
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index 48c157e957be8..0df0b61ce002f 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -44,6 +44,11 @@ public CommonConfig setMemtableSizeThreshold(long memtableSizeThreshold) {
return this;
}
+ @Override
+ public CommonConfig setMetadataLeaseFenceMs(long metadataLeaseFenceMs) {
+ return this;
+ }
+
@Override
public CommonConfig setPartitionInterval(long partitionInterval) {
return this;
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index dc21234e2bad2..41263510f38d7 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -32,6 +32,8 @@ public interface CommonConfig {
CommonConfig setMemtableSizeThreshold(long memtableSizeThreshold);
+ CommonConfig setMetadataLeaseFenceMs(long metadataLeaseFenceMs);
+
CommonConfig setPartitionInterval(long partitionInterval);
CommonConfig setCompressor(String compressor);
diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableDDLHAIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableDDLHAIT.java
new file mode 100644
index 0000000000000..1df3241b43d72
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableDDLHAIT.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.relational.it.schema;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableClusterIT;
+import org.apache.iotdb.itbase.env.BaseEnv;
+
+import org.awaitility.Awaitility;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * HA behavior of table-model DDL: a table DDL must broadcast a cache-invalidation to every
+ * DataNode. Before the metadata-lease/fence change it hard-failed whenever any DataNode was
+ * unreachable (so a single down DataNode broke CREATE TABLE, contradicting multi-replica HA). With
+ * the change the ConfigNode proceeds once the unreachable DataNode is provably self-fenced (it
+ * fails closed on its stale caches and resyncs on recovery, so it cannot serve dirty schema).
+ *
+ *
This test stops one DataNode and asserts CREATE TABLE still succeeds.
+ */
+@RunWith(IoTDBTestRunner.class)
+@Category({TableClusterIT.class})
+public class IoTDBTableDDLHAIT {
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ // Small fence threshold so the ConfigNode can prove the stopped DataNode is self-fenced quickly
+ // (T_proceed = fence + ~5s internal margin), keeping the test fast. Live DataNodes keep
+ // heartbeating (~1s), so they do not spuriously fence.
+ EnvFactory.getEnv().getConfig().getCommonConfig().setMetadataLeaseFenceMs(2000);
+ EnvFactory.getEnv().initClusterEnvironment(1, 3);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void createTableSucceedsWhileOneDataNodeIsDown() throws Exception {
+ final DataNodeWrapper liveDataNode = EnvFactory.getEnv().getDataNodeWrapper(0);
+ final DataNodeWrapper victimDataNode = EnvFactory.getEnv().getDataNodeWrapper(2);
+
+ // Pin the connection to a DataNode we will keep alive, so stopping the victim cannot break it.
+ try (final Connection connection =
+ EnvFactory.getEnv()
+ .getConnection(liveDataNode, "root", "root", BaseEnv.TABLE_SQL_DIALECT);
+ final Statement statement = connection.createStatement()) {
+ statement.execute("CREATE DATABASE test_ha");
+ statement.execute("USE test_ha");
+
+ // Sanity: with all DataNodes up the DDL broadcast acks everywhere and succeeds immediately.
+ statement.execute("CREATE TABLE t_all_up (region STRING TAG, temperature FLOAT FIELD)");
+
+ // Take one DataNode down. Its last successful ConfigNode contact is now frozen; after
+ // T_proceed the ConfigNode can treat it as self-fenced and stop waiting for its ack.
+ victimDataNode.stop();
+ Assert.assertFalse("victim DataNode should be stopped", victimDataNode.isAlive());
+
+ // The DDL broadcast can no longer reach the stopped DataNode. Previously this hard-failed;
+ // now it must still succeed (after blocking ~T_proceed while the fence is proven).
+ statement.execute("CREATE TABLE t_after_down (region STRING TAG, temperature FLOAT FIELD)");
+
+ // Confirm the new table is really visible on the live DataNode.
+ Awaitility.await()
+ .atMost(30, TimeUnit.SECONDS)
+ .pollInterval(1, TimeUnit.SECONDS)
+ .until(() -> tableExists(statement, "t_after_down"));
+ assertTrue(
+ "CREATE TABLE must succeed with one DataNode down",
+ tableExists(statement, "t_after_down"));
+ }
+ }
+
+ private static boolean tableExists(final Statement statement, final String tableName)
+ throws Exception {
+ try (final ResultSet resultSet = statement.executeQuery("SHOW TABLES")) {
+ while (resultSet.next()) {
+ if (tableName.equalsIgnoreCase(resultSet.getString(1))) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
index e7a31b1dc73eb..c08aee0d1bdc1 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
@@ -26,6 +26,7 @@
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.lease.DataNodeContactTracker;
import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupHeartbeatSample;
import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
@@ -82,6 +83,16 @@ public DataNodeHeartbeatHandler(
@Override
public void onComplete(TDataNodeHeartbeatResp heartbeatResp) {
+ // A successful response confirms ConfigNode->DataNode contact; stamp it on the ConfigNode clock
+ // for the metadata-lease verdict. Kept separate from the load-cache samples (which record the
+ // echoed send-time) and deliberately not touched in onError, so failures never advance it.
+ final DataNodeContactTracker contactTracker = DataNodeContactTracker.getInstance();
+ contactTracker.recordSuccessfulResponse(nodeId);
+ contactTracker.recordCapability(
+ nodeId,
+ heartbeatResp.isSetSupportsMetadataLeaseFencing()
+ && heartbeatResp.isSupportsMetadataLeaseFencing());
+
// Update NodeCache
loadManager
.getLoadCache()
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index fe687d17556f7..cf61bbaea0445 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -37,6 +37,7 @@
import org.apache.iotdb.confignode.i18n.ConfigNodeMessages;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
+import org.apache.iotdb.confignode.manager.lease.DataNodeContactTracker;
import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
import org.apache.iotdb.confignode.persistence.executor.ConfigPlanExecutor;
import org.apache.iotdb.confignode.persistence.schema.ConfigNodeSnapshotParser;
@@ -291,6 +292,14 @@ public void notifyLeaderReady() {
// Always start load services first
configManager.getLoadManager().startLoadServices();
+ // Reset every DataNode's last-contact time to now on (re)acquiring leadership: a stale
+ // timestamp
+ // left from a previous leadership term (while another ConfigNode was contacting the DataNodes)
+ // would otherwise let the metadata-broadcast verdict wrongly judge a live DataNode as fenced.
+ DataNodeContactTracker.getInstance()
+ .onLeadershipAcquired(
+ configManager.getNodeManager().getRegisteredDataNodeLocations().keySet());
+
if (CONF.isEnableTopologyProbing()) {
configManager.getLoadManager().startTopologyService();
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/lease/ClusterCachePropagator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/lease/ClusterCachePropagator.java
new file mode 100644
index 0000000000000..74a49ca5c450d
--- /dev/null
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/lease/ClusterCachePropagator.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.lease;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.lease.MetadataBroadcastVerdict.DataNodeState;
+import org.apache.iotdb.confignode.manager.lease.MetadataBroadcastVerdict.Verdict;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.IntPredicate;
+import java.util.function.IntToLongFunction;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+/**
+ * Drives one Tier-A metadata cache-invalidation broadcast to the cluster and turns "which DataNodes
+ * acknowledged" into a {@link Verdict} via {@link MetadataBroadcastVerdict}. Instead of the legacy
+ * "any unreachable DataNode fails the operation", a DataNode that is provably self-fenced (out of
+ * ConfigNode contact for at least {@code T_proceed} and known to support fencing) is treated as
+ * safe to proceed past, delivering availability without risking dirty data (see the design doc).
+ *
+ *
The caller supplies a {@link CacheBroadcast} closure wrapping its specific RPC (table
+ * pre-release, schema-cache invalidation, permission-cache invalidation, ...); this class is
+ * agnostic to the request type and only interprets the per-DataNode {@link TSStatus} responses.
+ *
+ *
Stateless and cheap to construct per operation. Clock and sleep are injectable for testing.
+ */
+public class ClusterCachePropagator {
+
+ /**
+ * {@code T_proceed = T_fence + margin}. The margin (default 5s) covers heartbeat-recording
+ * granularity and scheduling jitter; see design §2.6. Kept internal (not a user knob).
+ */
+ private static final long DEFAULT_PROCEED_MARGIN_MS = 5_000L;
+
+ /** How often to re-broadcast while waiting for unacked DataNodes to ack or to cross T_proceed. */
+ private static final long RETRY_INTERVAL_MS = 1_000L;
+
+ /**
+ * Extra slack on top of T_proceed before giving up, so a just-died DataNode can cross T_proceed.
+ */
+ private static final long WAIT_BUDGET_BUFFER_MS = 5_000L;
+
+ /** Broadcasts the cache invalidation to {@code targets} and returns the per-nodeId responses. */
+ @FunctionalInterface
+ public interface CacheBroadcast {
+ Map sendTo(Map targets);
+ }
+
+ /** Injectable sleep so the retry loop can be driven deterministically in tests. */
+ @FunctionalInterface
+ interface Sleeper {
+ void sleepMs(long ms) throws InterruptedException;
+ }
+
+ private final Supplier