Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
6217834
Add metadata-lease self-fencing foundation for CN->DN broadcast HA
JackieTien97 Jun 2, 2026
bdf1c5c
Wire DataNode metadata-lease-fencing capability into the heartbeat
JackieTien97 Jun 2, 2026
9901f40
Fail closed on permission cache while metadata lease is fenced
JackieTien97 Jun 2, 2026
f3fb4fe
Force tree-schema cache miss while metadata lease is fenced
JackieTien97 Jun 2, 2026
d5bc64c
Use infinite TTL in compaction while metadata lease is fenced
JackieTien97 Jun 2, 2026
f8a9bee
Add ClusterCachePropagator to drive the Tier-A broadcast verdict
JackieTien97 Jun 3, 2026
3b6a6ea
Wire DataNodeContactTracker leadership/removal lifecycle hooks
JackieTien97 Jun 3, 2026
fbaaae6
Wire CreateTable pre-release through ClusterCachePropagator (template)
JackieTien97 Jun 3, 2026
f9accb5
Add 1C3D IT: table DDL succeeds with one DataNode down
JackieTien97 Jun 3, 2026
88c97ba
Wire all alter/drop-table procedures through ClusterCachePropagator
JackieTien97 Jun 3, 2026
d6f0e8e
Wire tree-model schema-cache invalidation through ClusterCachePropagator
JackieTien97 Jun 3, 2026
f87dae6
Centralize schema-cache invalidation through ClusterCachePropagator
JackieTien97 Jun 3, 2026
109c155
Wire SET/UNSET TEMPLATE through ClusterCachePropagator
JackieTien97 Jun 3, 2026
67a9746
Wire SetTTL DataNode broadcast through ClusterCachePropagator
JackieTien97 Jun 3, 2026
8f58c4e
Wire DeleteDatabase sync cache-invalidation through ClusterCachePropa…
JackieTien97 Jun 3, 2026
1e25355
Fix AuthOperationProcedure silent permission-staleness hole
JackieTien97 Jun 3, 2026
cc0cbcc
Remove internal design notes from version control
JackieTien97 Jun 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ public CommonConfig setMemtableSizeThreshold(long memtableSizeThreshold) {
return this;
}

/**
 * Stores the given value under the {@code metadata_lease_fence_ms} property, rendered as a decimal
 * string.
 *
 * @param metadataLeaseFenceMs the value to store (presumably milliseconds, going by the name)
 * @return this config object, so that setter calls can be chained
 */
@Override
public CommonConfig setMetadataLeaseFenceMs(long metadataLeaseFenceMs) {
setProperty("metadata_lease_fence_ms", String.valueOf(metadataLeaseFenceMs));
return this;
}

@Override
public CommonConfig setPartitionInterval(long partitionInterval) {
setProperty("time_partition_interval", String.valueOf(partitionInterval));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ public CommonConfig setMemtableSizeThreshold(long memtableSizeThreshold) {
return this;
}

/**
 * Forwards the given value to both {@code cnConfig} and {@code dnConfig}, so the two delegates
 * receive the same setting.
 *
 * @param metadataLeaseFenceMs the value to forward (presumably milliseconds, going by the name)
 * @return this config object, so that setter calls can be chained
 */
@Override
public CommonConfig setMetadataLeaseFenceMs(long metadataLeaseFenceMs) {
cnConfig.setMetadataLeaseFenceMs(metadataLeaseFenceMs);
dnConfig.setMetadataLeaseFenceMs(metadataLeaseFenceMs);
return this;
}

@Override
public CommonConfig setPartitionInterval(long partitionInterval) {
cnConfig.setPartitionInterval(partitionInterval);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ public CommonConfig setMemtableSizeThreshold(long memtableSizeThreshold) {
return this;
}

/**
 * No-op in this implementation: the argument is ignored and nothing is stored.
 *
 * <p>NOTE(review): presumably this config flavor cannot change the setting at all - confirm that
 * silently ignoring the value is intended.
 *
 * @param metadataLeaseFenceMs ignored
 * @return this config object, so that setter calls can be chained
 */
@Override
public CommonConfig setMetadataLeaseFenceMs(long metadataLeaseFenceMs) {
return this;
}

@Override
public CommonConfig setPartitionInterval(long partitionInterval) {
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public interface CommonConfig {

CommonConfig setMemtableSizeThreshold(long memtableSizeThreshold);

CommonConfig setMetadataLeaseFenceMs(long metadataLeaseFenceMs);

CommonConfig setPartitionInterval(long partitionInterval);

CommonConfig setCompressor(String compressor);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.relational.it.schema;

import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.TableClusterIT;
import org.apache.iotdb.itbase.env.BaseEnv;

import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertTrue;

/**
 * Availability test for table-model DDL on a 1 ConfigNode / 3 DataNode cluster.
 *
 * <p>A table DDL has to broadcast a cache invalidation to every DataNode. Before the
 * metadata-lease/fence change, one unreachable DataNode was enough to fail the statement, which
 * contradicts multi-replica HA. With the change, the ConfigNode goes ahead once the unreachable
 * DataNode is provably self-fenced: that DataNode fails closed on its stale caches and resyncs on
 * recovery, so it cannot serve dirty schema.
 *
 * <p>The single test case stops one DataNode and checks that CREATE TABLE still succeeds.
 */
@RunWith(IoTDBTestRunner.class)
@Category({TableClusterIT.class})
public class IoTDBTableDDLHAIT {

  @BeforeClass
  public static void setUp() throws Exception {
    // A short fence threshold lets the ConfigNode prove quickly that the stopped DataNode is
    // self-fenced (T_proceed = fence + ~5s internal margin), which keeps the test fast. DataNodes
    // that stay up keep heartbeating (~1s) and therefore do not fence spuriously.
    EnvFactory.getEnv().getConfig().getCommonConfig().setMetadataLeaseFenceMs(2_000L);
    EnvFactory.getEnv().initClusterEnvironment(1, 3);
  }

  @AfterClass
  public static void tearDown() throws Exception {
    EnvFactory.getEnv().cleanClusterEnvironment();
  }

  @Test
  public void createTableSucceedsWhileOneDataNodeIsDown() throws Exception {
    final DataNodeWrapper survivor = EnvFactory.getEnv().getDataNodeWrapper(0);
    final DataNodeWrapper stoppedNode = EnvFactory.getEnv().getDataNodeWrapper(2);
    final String tableCreatedWhileDown = "t_after_down";

    // The session is bound to a DataNode that stays up, so stopping the other one cannot cut it.
    try (final Connection conn =
            EnvFactory.getEnv().getConnection(survivor, "root", "root", BaseEnv.TABLE_SQL_DIALECT);
        final Statement stmt = conn.createStatement()) {
      stmt.execute("CREATE DATABASE test_ha");
      stmt.execute("USE test_ha");

      // Baseline: every DataNode is up, so the DDL broadcast is acked everywhere right away.
      stmt.execute("CREATE TABLE t_all_up (region STRING TAG, temperature FLOAT FIELD)");

      // Stop one DataNode. Its last successful ConfigNode contact is frozen from here on; once
      // T_proceed has elapsed the ConfigNode may treat it as self-fenced and stop waiting for it.
      stoppedNode.stop();
      Assert.assertFalse("victim DataNode should be stopped", stoppedNode.isAlive());

      // The broadcast cannot reach the stopped DataNode any more. This used to be a hard failure;
      // now the statement must succeed after blocking for roughly T_proceed.
      stmt.execute("CREATE TABLE t_after_down (region STRING TAG, temperature FLOAT FIELD)");

      // Wait until the surviving DataNode actually lists the new table, then assert it once more.
      Awaitility.await()
          .atMost(30, TimeUnit.SECONDS)
          .pollInterval(1, TimeUnit.SECONDS)
          .until(() -> tableExists(stmt, tableCreatedWhileDown));
      assertTrue(
          "CREATE TABLE must succeed with one DataNode down",
          tableExists(stmt, tableCreatedWhileDown));
    }
  }

  /**
   * Runs {@code SHOW TABLES} on the given statement and reports whether the first column of any
   * returned row equals {@code tableName}, ignoring case.
   */
  private static boolean tableExists(final Statement statement, final String tableName)
      throws Exception {
    boolean found = false;
    try (final ResultSet rows = statement.executeQuery("SHOW TABLES")) {
      // Stop scanning at the first match; the result set is closed by try-with-resources.
      while (!found && rows.next()) {
        found = tableName.equalsIgnoreCase(rows.getString(1));
      }
    }
    return found;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.manager.lease.DataNodeContactTracker;
import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupHeartbeatSample;
import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
Expand Down Expand Up @@ -82,6 +83,16 @@ public DataNodeHeartbeatHandler(

@Override
public void onComplete(TDataNodeHeartbeatResp heartbeatResp) {
// A successful response confirms ConfigNode->DataNode contact; stamp it on the ConfigNode clock
// for the metadata-lease verdict. Kept separate from the load-cache samples (which record the
// echoed send-time) and deliberately not touched in onError, so failures never advance it.
final DataNodeContactTracker contactTracker = DataNodeContactTracker.getInstance();
contactTracker.recordSuccessfulResponse(nodeId);
contactTracker.recordCapability(
nodeId,
heartbeatResp.isSetSupportsMetadataLeaseFencing()
&& heartbeatResp.isSupportsMetadataLeaseFencing());

// Update NodeCache
loadManager
.getLoadCache()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.iotdb.confignode.i18n.ConfigNodeMessages;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.manager.lease.DataNodeContactTracker;
import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
import org.apache.iotdb.confignode.persistence.executor.ConfigPlanExecutor;
import org.apache.iotdb.confignode.persistence.schema.ConfigNodeSnapshotParser;
Expand Down Expand Up @@ -291,6 +292,14 @@ public void notifyLeaderReady() {
// Always start load services first
configManager.getLoadManager().startLoadServices();

// Reset every DataNode's last-contact time to now on (re)acquiring leadership: a stale
// timestamp left from a previous leadership term (while another ConfigNode was contacting the
// DataNodes) would otherwise let the metadata-broadcast verdict wrongly judge a live DataNode
// as fenced.
DataNodeContactTracker.getInstance()
.onLeadershipAcquired(
configManager.getNodeManager().getRegisteredDataNodeLocations().keySet());

if (CONF.isEnableTopologyProbing()) {
configManager.getLoadManager().startTopologyService();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.confignode.manager.lease;

import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.lease.MetadataBroadcastVerdict.DataNodeState;
import org.apache.iotdb.confignode.manager.lease.MetadataBroadcastVerdict.Verdict;
import org.apache.iotdb.rpc.TSStatusCode;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.IntPredicate;
import java.util.function.IntToLongFunction;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
 * Drives one Tier-A metadata cache-invalidation broadcast to the cluster and turns "which DataNodes
 * acknowledged" into a {@link Verdict} via {@link MetadataBroadcastVerdict}. Instead of the legacy
 * "any unreachable DataNode fails the operation", a DataNode that is provably self-fenced (out of
 * ConfigNode contact for at least {@code T_proceed} and known to support fencing) is treated as
 * safe to proceed past, delivering availability without risking dirty data.
 *
 * <p>The caller supplies a {@link CacheBroadcast} closure wrapping its specific RPC (table
 * pre-release, schema-cache invalidation, permission-cache invalidation, ...); this class is
 * agnostic to the request type and only interprets the per-DataNode {@link TSStatus} responses.
 *
 * <p>Instances hold no mutable state of their own (only the injected lookups), so they are cheap
 * to construct per operation. Clock and sleep are injectable for testing.
 */
public class ClusterCachePropagator {

  /**
   * {@code T_proceed = T_fence + margin}. The margin covers heartbeat-recording granularity and
   * scheduling jitter. Kept internal (not a user knob).
   */
  private static final long DEFAULT_PROCEED_MARGIN_MS = 5_000L;

  /** How often to re-broadcast while waiting for unacked DataNodes to ack or to cross T_proceed. */
  private static final long RETRY_INTERVAL_MS = 1_000L;

  /**
   * Extra slack on top of T_proceed before giving up, so a just-died DataNode can cross T_proceed.
   */
  private static final long WAIT_BUDGET_BUFFER_MS = 5_000L;

  /** Broadcasts the cache invalidation to {@code targets} and returns the per-nodeId responses. */
  @FunctionalInterface
  public interface CacheBroadcast {
    Map<Integer, TSStatus> sendTo(Map<Integer, TDataNodeLocation> targets);
  }

  /** Injectable sleep so the retry loop can be driven deterministically in tests. */
  @FunctionalInterface
  interface Sleeper {
    void sleepMs(long ms) throws InterruptedException;
  }

  /** Supplies the current broadcast targets; re-read on every broadcast round. */
  private final Supplier<Map<Integer, TDataNodeLocation>> registeredDataNodes;

  /** Tells whether the DataNode with the given id is known to support fencing. */
  private final IntPredicate supportsFencing;

  /** Milliseconds since the last successful response from the DataNode with the given id. */
  private final IntToLongFunction hbAgeMs;

  /** Supplies {@code T_proceed} in milliseconds; re-read on every use. */
  private final LongSupplier tProceedMs;

  /** Nanosecond clock used only to measure the wait budget in {@link #propagate}. */
  private final LongSupplier nanoClock;

  /** Sleeps between retry rounds in {@link #propagate}. */
  private final Sleeper sleeper;

  /**
   * Production wiring: targets come from the NodeManager, fencing capability and contact age from
   * {@link DataNodeContactTracker}, {@code T_proceed} from the configured metadata lease fence plus
   * the internal margin, and time from {@link System#nanoTime()} / {@link Thread#sleep(long)}.
   *
   * @param configManager manager used to look up the registered DataNode locations
   */
  public ClusterCachePropagator(final IManager configManager) {
    this(
        () -> configManager.getNodeManager().getRegisteredDataNodeLocations(),
        nodeId -> DataNodeContactTracker.getInstance().supportsFencing(nodeId),
        nodeId -> DataNodeContactTracker.getInstance().getMillisSinceLastSuccessfulResponse(nodeId),
        () ->
            CommonDescriptor.getInstance().getConfig().getMetadataLeaseFenceMs()
                + DEFAULT_PROCEED_MARGIN_MS,
        System::nanoTime,
        Thread::sleep);
  }

  /** Fully injectable constructor; package-private so tests can supply fake clocks and lookups. */
  ClusterCachePropagator(
      final Supplier<Map<Integer, TDataNodeLocation>> registeredDataNodes,
      final IntPredicate supportsFencing,
      final IntToLongFunction hbAgeMs,
      final LongSupplier tProceedMs,
      final LongSupplier nanoClock,
      final Sleeper sleeper) {
    this.registeredDataNodes = registeredDataNodes;
    this.supportsFencing = supportsFencing;
    this.hbAgeMs = hbAgeMs;
    this.tProceedMs = tProceedMs;
    this.nanoClock = nanoClock;
    this.sleeper = sleeper;
  }

  /**
   * Broadcast once and classify the result. {@code waitBudgetExhausted} turns a would-be {@link
   * Verdict#WAIT} into {@link Verdict#FAIL} (the caller's retry budget ran out).
   *
   * <p>A DataNode counts as acknowledged only if the response map holds a status for it whose code
   * is {@code SUCCESS_STATUS}. A missing entry, or a {@code null} response map, counts as "not
   * acknowledged".
   *
   * @param broadcast closure that performs the actual RPC fan-out
   * @param waitBudgetExhausted whether the caller has no retry budget left
   * @return the verdict computed by {@link MetadataBroadcastVerdict#decide}
   */
  public Verdict propagateOnce(final CacheBroadcast broadcast, final boolean waitBudgetExhausted) {
    final Map<Integer, TDataNodeLocation> targets = registeredDataNodes.get();
    final Map<Integer, TSStatus> responses = broadcast.sendTo(targets);
    final long tProceed = tProceedMs.getAsLong();
    final int successCode = TSStatusCode.SUCCESS_STATUS.getStatusCode();
    final List<DataNodeState> states = new ArrayList<>(targets.size());
    for (final Integer nodeId : targets.keySet()) {
      // Fail safe: a broadcast that returned no response map at all acknowledged nothing, so every
      // target is classified as unacked instead of crashing the calling thread with an NPE.
      final TSStatus status = responses == null ? null : responses.get(nodeId);
      final boolean acked = status != null && status.getCode() == successCode;
      // retiredOrFenceAcked is left false: there is no explicit fence-ack signal yet, and a
      // Removing DataNode may still serve clients, so it must ack or be provably fenced like any
      // other (see MetadataBroadcastVerdict's SAFE_GONE rule).
      states.add(
          new DataNodeState(
              acked, false, supportsFencing.test(nodeId), hbAgeMs.applyAsLong(nodeId)));
    }
    return MetadataBroadcastVerdict.decide(states, tProceed, waitBudgetExhausted);
  }

  /**
   * Broadcast and retry until the verdict is {@link Verdict#PROCEED} (returns {@code true}) or the
   * wait budget is exhausted with at least one DataNode still unsafe ({@link Verdict#FAIL}, returns
   * {@code false}). Blocks the calling (procedure) thread for up to {@code T_proceed + buffer}.
   *
   * <p>If the thread is interrupted while sleeping between rounds, the interrupt flag is restored
   * and {@code false} is returned.
   *
   * @param broadcast closure that performs the actual RPC fan-out; invoked once per round
   * @return {@code true} if it is safe to proceed, {@code false} otherwise
   */
  public boolean propagate(final CacheBroadcast broadcast) {
    final long deadlineNanos =
        nanoClock.getAsLong()
            + TimeUnit.MILLISECONDS.toNanos(tProceedMs.getAsLong() + WAIT_BUDGET_BUFFER_MS);
    while (true) {
      // nanoTime-style values may wrap around, so compare by subtraction rather than with >=.
      final boolean waitBudgetExhausted = nanoClock.getAsLong() - deadlineNanos >= 0;
      final Verdict verdict = propagateOnce(broadcast, waitBudgetExhausted);
      if (verdict == Verdict.PROCEED) {
        return true;
      }
      if (verdict == Verdict.FAIL) {
        return false;
      }
      // Verdict is WAIT: pause, then re-broadcast in the next round.
      try {
        sleeper.sleepMs(RETRY_INTERVAL_MS);
      } catch (final InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }
}
Loading
Loading