
Make Tier-A metadata operations HA when a DataNode is down (metadata lease + self-fencing)#17831

Open
JackieTien97 wants to merge 17 commits into apache:master from JackieTien97:metadata-broadcast-ha-fencing

@JackieTien97
Contributor

Problem

Many cluster metadata operations broadcast a cache invalidation to every registered DataNode and treat any unreachable DataNode as a hard failure. As a result, even with multiple replicas configured, a single down DataNode still makes these operations fail, which contradicts the HA goal. The original report concerned table-model DDL (CREATE TABLE, ALTER TABLE ...), but the same pattern affects nearly all Tier-A metadata ops (tree-model schema, templates, views, TTL, DROP DATABASE, permissions).

The reason the broadcast cannot simply "skip" an unreachable DataNode is correctness: a DataNode caches ConfigNode-pushed metadata (table/tree schema, permissions, TTL, ...). If it misses an invalidation during a network partition and keeps serving the stale cache, it can produce dirty data / stale reads / stale authorization.

Approach — metadata lease + self-fencing + a broadcast verdict

  1. DataNode self-fencing (DN side, fail-closed). The ConfigNode already heartbeats every DataNode. A DataNode records the last ConfigNode-heartbeat time on its own monotonic clock; if none arrives within metadata_lease_fence_ms (T_fence, default 20s, aligned with the failure detector), it considers itself fenced and stops trusting ConfigNode-pushed caches. On the next heartbeat (recovery) it resyncs.
  2. ConfigNode broadcast verdict (CN side, liveness). Instead of "any unreachable DataNode fails the op", the ConfigNode proceeds once every un-acked DataNode is provably self-fenced — out of contact for at least T_proceed = T_fence + margin, and known to support fencing. Such a DataNode is fail-closed and will resync, so it cannot serve dirty data. A DataNode that is reachable-but-unacked (still possibly serving) blocks the op (wait, then fail).

This gives the desired CP behavior: a healthy majority makes progress; a partitioned minority fails closed.

What this PR contains

DataNode side (fail-closed when fenced):

  • MetadataLeaseManager — lease tracking, lazy fence check, recovery listeners (monotonic clock, injectable for tests)
  • Table schema cache (DataNodeTableCache): throws a retryable error; tree schema cache (TreeDeviceSchemaCacheManager): forces a re-fetch from the quorum-backed SchemaRegion; permission cache (ClusterAuthorityFetcher): drops its cache and re-fetches (deny while partitioned); TTL: compaction uses an infinite TTL when fenced (never deletes by a stale TTL)
  • Heartbeat records contact + reports a supportsMetadataLeaseFencing capability bit; a metadata_lease_heartbeat_age_ms metric

ConfigNode side (proceed past provably-fenced DataNodes):

  • MetadataBroadcastVerdict (pure decision logic) + DataNodeContactTracker (sound per-DataNode last-successful-response signal, capability bit) + ClusterCachePropagator (broadcast → verdict → PROCEED/WAIT/FAIL with retry)
  • Leadership/removal lifecycle hooks for the contact tracker
  • All Tier-A procedures wired through the propagator: table DDL (CreateTable + all 8 alter/drop procedures, forward + rollback), tree-model schema (DeleteTimeSeries, AlterTimeSeriesDataType, AlterEncodingCompressor, logical views, DeactivateTemplate), templates (Set/Unset), SetTTL, and DeleteDatabase (sync path). Region-task / physical-deletion broadcasts are deliberately left on region consensus.
  • AuthOperationProcedure: fixed a silent permission-staleness hole (it used to drop an unacked DataNode after a timeout, leaving a live DataNode serving a just-revoked permission); now proceeds only past provably-fenced DataNodes.

Testing

  • Unit tests for every new component and each fail-closed injection (MetadataLeaseManager, DataNodeTableCache, tree schema, permission, TTL-compaction, MetadataBroadcastVerdict, DataNodeContactTracker, ClusterCachePropagator), plus no-regression on the affected procedures.
  • 1C3D integration test (IoTDBTableDDLHAIT): with one DataNode stopped, CREATE TABLE succeeds after ~T_proceed (instead of failing immediately). A new setMetadataLeaseFenceMs IT-config setter keeps the wait short.

Configuration & compatibility

  • New knob metadata_lease_fence_ms (default 20000). T_proceed = T_fence + internal margin (~5s).
  • Takes effect on upgrade; there is no feature switch. Rolling-upgrade safe via the capability bit: an old DataNode (no fencing) is never treated as fenced, so the ConfigNode falls back to the strict (current) semantics for it; correctness is preserved DataNode-by-DataNode.

Not in this PR (follow-up)

Tier-B resource operations (DROP/CREATE FUNCTION/TRIGGER/PIPE PLUGIN, SET SYSTEM STATUS=ReadOnly) and the quota re-pull. These need DataNode-side fail-closed on resource execution (not a read-through cache) plus a resource recovery-resync mechanism, which is a separate, self-contained effort.

Table-model DDL and ~30 other ConfigNode->DataNode metadata broadcasts fail when any single DataNode is unreachable, because the ConfigNode requires every registered DataNode to acknowledge a cache invalidation before committing (to stop a partitioned DataNode from serving stale CN-pushed caches and generating dirty data). This adds the test-covered foundation for a metadata-lease/fencing mechanism that lets such operations tolerate DataNode failures without sacrificing correctness.

DataNode side: MetadataLeaseManager tracks the lease via the ConfigNode heartbeat (monotonic clock); isFenced() when no heartbeat within metadata_lease_fence_ms (default 20s); fires recovery listeners when a heartbeat arrives after a fence. DataNodeTableCache fail-closed (retryable error) in getTableInWrite/getTable while fenced, and invalidateAll() registered as a recovery listener so a recovered DataNode re-fetches fresh schema. getDataNodeHeartBeat records the heartbeat; a metadata_lease_heartbeat_age_ms gauge is exposed.
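
A minimal sketch of the lease tracking described above may help reviewers follow the DataNode side. It is not the PR's MetadataLeaseManager: the class name LeaseSketch, the constructor shape, the `>=` boundary, and starting the lease as fresh at construction are all assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.LongSupplier;

/** Illustrative lease tracker (hypothetical; not the PR's MetadataLeaseManager). */
final class LeaseSketch {
  private final LongSupplier nanoClock; // monotonic clock, e.g. System::nanoTime
  private final long fenceNanos; // metadata_lease_fence_ms converted to nanoseconds
  private final List<Runnable> recoveryListeners = new CopyOnWriteArrayList<>();
  private volatile long lastHeartbeatNanos;

  LeaseSketch(LongSupplier nanoClock, long fenceMs) {
    this.nanoClock = nanoClock;
    this.fenceNanos = fenceMs * 1_000_000L;
    // Assumption: the lease counts as fresh when the tracker is created.
    this.lastHeartbeatNanos = nanoClock.getAsLong();
  }

  /** Lazy check on the local clock: needs no heartbeat to start returning true. */
  boolean isFenced() {
    return nanoClock.getAsLong() - lastHeartbeatNanos >= fenceNanos;
  }

  void addRecoveryListener(Runnable listener) {
    recoveryListeners.add(listener);
  }

  /** Called on every ConfigNode heartbeat; fires listeners only when leaving a fence. */
  void recordHeartbeat() {
    boolean wasFenced = isFenced();
    lastHeartbeatNanos = nanoClock.getAsLong();
    if (wasFenced) {
      for (Runnable listener : recoveryListeners) {
        listener.run();
      }
    }
  }
}
```

With the default of 20000 ms, a tracker built with `new LeaseSketch(System::nanoTime, 20_000)` would report `isFenced()` once 20 s pass without a call to `recordHeartbeat()`.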

ConfigNode side: DataNodeContactTracker records, per DataNode, the time the ConfigNode last received a successful heartbeat response, stamped on the ConfigNode clock only on success and never advanced by onError (recorded in DataNodeHeartbeatHandler.onComplete) - the sound signal for deciding whether an unreachable DataNode has self-fenced. MetadataBroadcastVerdict is the pure decision logic (capability checked first; FENCED_SAFE only via hbAge>=T_proceed or retired-from-routing; no additive fast-path).

No ConfigNode procedure control flow is changed yet (the verdict is not wired into procedures); the DataNode fail-closed is active only while a DataNode is actually fenced. Config: metadata_lease_fence_ms in CommonConfig. 20 new unit tests.

Add an optional supportsMetadataLeaseFencing flag to TDataNodeHeartbeatResp. The DataNode advertises it (true); the ConfigNode records it per-DataNode in DataNodeContactTracker. The verdict checks capability before any liveness/timing test, so a not-yet-upgraded DataNode that omits the flag is recorded as not-capable and never treated as fenced (strict, rolling-upgrade safe). DataNodeContactTracker gains recordCapability/supportsFencing (default false). 3 new unit tests.
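
The per-DataNode decision described in the two messages above could look roughly like the following. This is a sketch under assumptions, not the PR's MetadataBroadcastVerdict: the class VerdictSketch, the ACKED and UNSAFE enum values, the parameter list, and the choice to gate the retired-from-routing path on capability as well are all hypothetical.

```java
/** Illustrative per-DataNode classification (hypothetical; not the PR's class). */
final class VerdictSketch {
  enum NodeVerdict { ACKED, FENCED_SAFE, UNSAFE }

  static NodeVerdict classify(
      boolean acked,
      boolean supportsFencing,
      boolean retiredFromRouting,
      long hbAgeMs,
      long tProceedMs) {
    if (acked) {
      return NodeVerdict.ACKED; // the invalidation was applied; nothing to prove
    }
    // Capability is checked before any liveness/timing test: a DataNode that never
    // advertised fencing support can never be assumed to have fenced itself.
    if (!supportsFencing) {
      return NodeVerdict.UNSAFE;
    }
    // Only two routes to FENCED_SAFE: silent for at least T_proceed, or retired.
    if (retiredFromRouting || hbAgeMs >= tProceedMs) {
      return NodeVerdict.FENCED_SAFE;
    }
    return NodeVerdict.UNSAFE; // recently contacted but unacked: may still be serving
  }
}
```

For example, with T_fence = 20000 ms and an assumed margin of 5000 ms, a capable DataNode last heard from 24999 ms ago is still UNSAFE, and becomes FENCED_SAFE at 25000 ms.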

The DataNode permission cache (ClusterAuthorityFetcher) is invalidated by a
ConfigNode broadcast after GRANT/REVOKE. A DataNode partitioned from the
ConfigNode can miss that broadcast and keep authorizing a privilege that was
already revoked.

The pre-existing refreshToken() timeout did not close this window: it only
marks the cache stale when a heartbeat finally arrives after a long gap, so
during an ongoing partition (no heartbeat at all) refreshToken() is never
called and the stale cache keeps being served until recovery.

checkCacheAvailable() now also drops the cache when MetadataLeaseManager
reports the lease fenced. isFenced() is evaluated on the DataNode's own clock
and needs no heartbeat to fire, so an ongoing partition forces a re-fetch from
the ConfigNode, which fails closed while partitioned (deny, not allow).
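
The deny-while-partitioned behavior can be illustrated with a small stand-in cache. Everything here is hypothetical (AuthCacheSketch, the boolean-per-user model, the Predicate standing in for the ConfigNode fetch); it only shows the shape of "drop the cache when fenced, then fail closed if the re-fetch fails".

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;

/** Illustrative permission cache (hypothetical; not ClusterAuthorityFetcher). */
final class AuthCacheSketch {
  private final Map<String, Boolean> cache = new ConcurrentHashMap<>();
  private final BooleanSupplier leaseFenced; // e.g. the lease manager's isFenced()
  private final Predicate<String> fetchFromConfigNode; // assumed to throw when unreachable

  AuthCacheSketch(BooleanSupplier leaseFenced, Predicate<String> fetchFromConfigNode) {
    this.leaseFenced = leaseFenced;
    this.fetchFromConfigNode = fetchFromConfigNode;
  }

  boolean isAllowed(String user) {
    if (leaseFenced.getAsBoolean()) {
      cache.clear(); // a fenced lease means cached grants can no longer be trusted
    }
    Boolean cached = cache.get(user);
    if (cached != null) {
      return cached;
    }
    try {
      boolean fresh = fetchFromConfigNode.test(user);
      cache.put(user, fresh);
      return fresh;
    } catch (RuntimeException e) {
      return false; // fail closed: deny, not allow, while the ConfigNode is unreachable
    }
  }
}
```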

The tree-model schema cache (TreeDeviceSchemaCacheManager) is read-through:
on a miss the caller re-fetches from the quorum-backed SchemaRegion, and a
ConfigNode broadcast only invalidates entries after a DELETE TIMESERIES /
datatype change. A DataNode partitioned from the ConfigNode can miss that
broadcast and keep a stale cached entry, then validate a write or resolve a
query against schema that no longer exists.

All six cache reads funnel through getDeviceSchema(String[]); route them
through getDeviceSchemaOrMissWhenFenced, which returns null (a miss) while the
lease is fenced so the caller re-fetches from the authoritative SchemaRegion.
This is more available than hard-failing (the op still succeeds whenever the
SchemaRegion quorum is reachable, and only fails closed when it is not) and
keeps the gate tree-scoped, since getDeviceSchema is also used by table-model
fetching. On lease recovery cleanUp() drops entries cached before the
partition that were never re-read while fenced.

Compaction physically deletes data older than its TTL window, reading the TTL
from DataNodeTTLCache (pushed by the ConfigNode). A DataNode partitioned from
the ConfigNode can miss a TTL update; a too-short stale TTL would make
compaction permanently delete data that a missed TTL-increase says to keep -
an irreversible loss.

MultiTsFileDeviceIterator.nextDevice() now uses an infinite TTL
(Long.MAX_VALUE -> timeLowerBound Long.MIN_VALUE -> no TTL-based deletion) when
the lease is fenced, scoped to the compaction path only (query/write TTL
behavior is unchanged). The check runs before the cache reads, so the
table-model path also avoids the now fail-closed DataNodeTableCache. Real TTL
deletion resumes once the lease recovers and the cache resyncs.
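
The TTL substitution above reduces to a small piece of arithmetic. The helper below is a sketch, not the code in MultiTsFileDeviceIterator: the class and method names are hypothetical, and the explicit mapping of Long.MAX_VALUE to Long.MIN_VALUE follows the commit message rather than any code shown here.

```java
/** Illustrative TTL handling for the compaction path (hypothetical helper names). */
final class CompactionTtlSketch {
  /** While fenced, ignore the possibly stale cached TTL and treat it as infinite. */
  static long effectiveTtl(long cachedTtlMs, boolean fenced) {
    return fenced ? Long.MAX_VALUE : cachedTtlMs;
  }

  /** An infinite TTL maps to the lowest possible bound, so nothing falls below it. */
  static long timeLowerBound(long nowMs, long ttlMs) {
    return ttlMs == Long.MAX_VALUE ? Long.MIN_VALUE : nowMs - ttlMs;
  }

  /** A point is eligible for TTL-based deletion only if it is older than the bound. */
  static boolean expired(long timestampMs, long nowMs, long cachedTtlMs, boolean fenced) {
    return timestampMs < timeLowerBound(nowMs, effectiveTtl(cachedTtlMs, fenced));
  }
}
```

With a cached TTL of 5000 ms at time 10000, a point stamped 1000 is expired when the lease is healthy, but is kept when the lease is fenced because no timestamp is below Long.MIN_VALUE.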

The ConfigNode side of the metadata-lease HA change. ClusterCachePropagator
broadcasts a cache-invalidation to all registered DataNodes and turns the
per-DataNode responses into a PROCEED/WAIT/FAIL verdict via the already-built
MetadataBroadcastVerdict, instead of the legacy 'any unreachable DataNode
fails the operation'.

For each registered DataNode it builds a DataNodeState from: acked (SUCCESS
response), supportsFencing and hbAge (from DataNodeContactTracker). A DataNode
that is provably self-fenced (capable + silent past T_proceed) is safe to
proceed past; a non-SUCCESS or recently-contacted unacked DataNode is unsafe.
propagate() retries on WAIT until a DataNode acks/crosses T_proceed or the
wait budget (T_proceed + buffer) runs out.

The caller supplies a CacheBroadcast closure wrapping its specific RPC, so the
propagator is agnostic to the request type. Clock and sleep are injectable;
the verdict construction and the retry loop are covered by unit tests.
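
As a rough illustration of the retry loop with an injectable clock and sleep, consider the sketch below. It is not ClusterCachePropagator: the class PropagateSketch, the Supplier-based round, the retry interval parameter, and the exact deadline check are assumptions.

```java
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Illustrative retry-on-WAIT loop (hypothetical; not the PR's propagator). */
final class PropagateSketch {
  enum Verdict { PROCEED, WAIT, FAIL }

  /**
   * Runs one broadcast round per iteration. Each round is expected to re-broadcast and
   * return the overall verdict. Returns true on PROCEED, false on FAIL or when the wait
   * budget is exhausted while the verdict is still WAIT.
   */
  static boolean propagate(
      Supplier<Verdict> round,
      LongSupplier clockMs,
      LongConsumer sleepMs,
      long waitBudgetMs,
      long retryIntervalMs) {
    long deadline = clockMs.getAsLong() + waitBudgetMs;
    while (true) {
      Verdict verdict = round.get();
      if (verdict == Verdict.PROCEED) {
        return true;
      }
      if (verdict == Verdict.FAIL) {
        return false;
      }
      if (clockMs.getAsLong() >= deadline) {
        return false; // still WAIT after the budget: give up rather than block forever
      }
      sleepMs.accept(retryIntervalMs);
    }
  }
}
```

Injecting the clock and the sleep lets a unit test advance a fake clock inside the sleep callback, so the whole wait budget can be exercised without real delays.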

The metadata-broadcast verdict reads each DataNode's last-successful-response
time from DataNodeContactTracker. Two lifecycle events must keep that signal
sound:

- On (re)acquiring ConfigRegion leadership (notifyLeaderReady), reset every
  registered DataNode's contact time to now. Otherwise a timestamp left from a
  previous leadership term - during which another ConfigNode was contacting the
  DataNodes - could make the verdict wrongly judge a live DataNode as fenced.

- On permanent DataNode removal (removeDataNodePersistence), drop its tracker
  entry so stale contact/capability state is not retained and a future DataNode
  reusing the id cannot inherit it.
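
The two lifecycle hooks can be pictured with a small tracker. This is a sketch only: ContactTrackerSketch and its method names are hypothetical, and reporting an age of 0 for an unknown DataNode (so that it is never judged fenced) is an assumption, not something the PR states.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

/** Illustrative contact tracker (hypothetical; not the PR's DataNodeContactTracker). */
final class ContactTrackerSketch {
  private final Map<Integer, Long> lastSuccessMs = new ConcurrentHashMap<>();
  private final LongSupplier clockMs; // ConfigNode-local clock

  ContactTrackerSketch(LongSupplier clockMs) {
    this.clockMs = clockMs;
  }

  /** Stamped only on a successful heartbeat response, never on an error. */
  void recordSuccess(int dataNodeId) {
    lastSuccessMs.put(dataNodeId, clockMs.getAsLong());
  }

  /** Assumption: an unknown DataNode reports age 0, i.e. it is never judged fenced. */
  long heartbeatAgeMs(int dataNodeId) {
    Long last = lastSuccessMs.get(dataNodeId);
    return last == null ? 0L : clockMs.getAsLong() - last;
  }

  /** On (re)acquiring leadership: discard history from a previous term. */
  void onLeaderReady(Iterable<Integer> registeredDataNodeIds) {
    long now = clockMs.getAsLong();
    for (int id : registeredDataNodeIds) {
      lastSuccessMs.put(id, now);
    }
  }

  /** On permanent removal: a future DataNode reusing the id starts clean. */
  void onDataNodeRemoved(int dataNodeId) {
    lastSuccessMs.remove(dataNodeId);
  }
}
```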

First Tier-A procedure migrated off 'any unreachable DataNode fails the op'.
CreateTableProcedure's PRE_RELEASE step now broadcasts via
ClusterCachePropagator: it proceeds once every unacked DataNode is provably
self-fenced (which, per Phase 1, fails closed on its stale table cache and
resyncs on lease recovery, so it cannot serve dirty schema), and only fails
when an unacked DataNode is not provably fenced.

SchemaUtils gains preUpdateTableReq() (request builder) and
broadcastTableUpdate() (returns the full per-nodeId response map the verdict
needs); the legacy preReleaseTable() is now a thin wrapper returning only
failures, so its other callers are unchanged. The happy path (all DataNodes
ack -> PROCEED) is behaviorally identical; CreateTableProcedureTest still
passes. COMMIT_RELEASE stays best-effort (warn-only) as before.

This is the template for the remaining Tier-A procedures.

End-to-end verification of the metadata-lease/fence HA change. With a short
metadata_lease_fence_ms, the test starts 1 ConfigNode + 3 DataNodes, creates a
database, stops one DataNode, and asserts CREATE TABLE still succeeds - whereas
before the change a table DDL hard-failed whenever any DataNode could not
acknowledge the cache-invalidation broadcast.

Adds setMetadataLeaseFenceMs to the IT CommonConfig framework (interface +
MppCommonConfig / MppSharedCommonConfig / RemoteCommonConfig) so the fence
threshold can be shortened, keeping the proceed-past-fenced wait fast.

AbstractAlterOrDropTableProcedure is the base for all 8 table-mutation
procedures (AddTableColumn, DropTableColumn, DropTable, RenameTable,
RenameTableColumn, SetTableProperties, AlterTableColumnDataType,
DeleteDevices), so wiring it migrates them all at once.

Both the forward pre-release and the rollback pre-release now broadcast via
ClusterCachePropagator and proceed once every unreachable DataNode is provably
self-fenced, instead of hard-failing on the first unreachable DataNode. The
rollback path is included so a down DataNode cannot block rollback either.
commitRelease stays best-effort (warn-only) as before, since the change is
already authoritative once committed.

SchemaUtils gains rollbackUpdateTableReq() to mirror preUpdateTableReq(); both
legacy preReleaseTable()/rollbackPreRelease() remain thin failure-returning
wrappers. All 7 alter/drop procedure serialization tests still pass.

DeleteTimeSeriesProcedure.invalidateCache is the shared static helper that
broadcasts INVALIDATE_MATCHED_SCHEMA_CACHE; AlterEncodingCompressorProcedure
reuses it, so wiring it covers both. It now proceeds once every unreachable
DataNode is provably self-fenced instead of hard-failing on the first one.

It runs before the physical delete in the state machine, so the 'delete only
after PROCEED' ordering holds with no reordering. Because the propagator may
re-broadcast while waiting for unacked DataNodes, the broadcast closure builds
a fresh request with patternTreeBytes.duplicate() on every attempt, so a
consumed buffer can never be re-sent as an empty (silently-successful)
invalidation. DeleteTimeSeries and AlterEncodingCompressor serialization tests
still pass.
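
The consumed-buffer hazard is easy to reproduce with plain java.nio. In the sketch below, send() is a hypothetical stand-in for serializing a request (it reads all remaining bytes, which advances the buffer's position); only ByteBuffer.duplicate() and the other java.nio calls are real APIs.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Illustrates why each re-broadcast needs its own view of the payload. */
final class BufferResendSketch {
  /** Hypothetical stand-in for serialization: consumes the buffer, returns bytes sent. */
  static int send(ByteBuffer payload) {
    byte[] out = new byte[payload.remaining()];
    payload.get(out); // advances position to limit, leaving nothing remaining
    return out.length;
  }

  /** Sends the same payload twice, optionally through a fresh duplicate per attempt. */
  static int[] sendTwice(ByteBuffer payload, boolean duplicatePerAttempt) {
    int first = send(duplicatePerAttempt ? payload.duplicate() : payload);
    int second = send(duplicatePerAttempt ? payload.duplicate() : payload);
    return new int[] {first, second};
  }

  static ByteBuffer examplePattern() {
    return ByteBuffer.wrap("root.sg.**".getBytes(StandardCharsets.UTF_8));
  }
}
```

Without duplicate(), the second attempt sends 0 bytes because the first attempt consumed the buffer; with duplicate(), both attempts send the full 10 bytes, since a duplicate shares the content but has its own position and limit.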

Add SchemaUtils.invalidateMatchedSchemaCache() as the single place that
broadcasts INVALIDATE_MATCHED_SCHEMA_CACHE via ClusterCachePropagator (proceed
once every unreachable DataNode is provably self-fenced) with the
buffer.duplicate() safety against the propagator's re-broadcast on WAIT.

Route all five INVALIDATE_MATCHED_SCHEMA_CACHE callers through it:
- AlterLogicalViewProcedure, DeleteLogicalViewProcedure (views)
- DeactivateTemplateProcedure (template)
- DeleteTimeSeriesProcedure, AlterTimeSeriesDataTypeProcedure (static helpers,
  also used by AlterEncodingCompressorProcedure) - refactored off their inline
  broadcasts onto the shared helper.

Each keeps its own error message and runs before its physical delete/alter
step, so the 'delete/alter only after PROCEED' ordering holds. The region-task
broadcasts (CONSTRUCT_*_BLACK_LIST, ALTER_VIEW, ALTER_TIMESERIES_DATATYPE) are
deliberately untouched - they go through region consensus. Affected procedure
tests still pass.

Add SchemaUtils.broadcastTemplateUpdate(cm, Supplier<TUpdateTemplateReq>): the
single place that broadcasts UPDATE_TEMPLATE via ClusterCachePropagator,
proceeding once every unreachable DataNode is provably self-fenced. The request
is rebuilt from the supplier on each attempt because the propagator may
re-broadcast on WAIT and TUpdateTemplateReq's binary field is ByteBuffer-backed
(reusing one request could re-send a consumed, empty payload).

SetTemplateProcedure (ADD_TEMPLATE_PRE_SET_INFO forward step) and
UnsetTemplateProcedure (INVALIDATE_TEMPLATE_SET_INFO) now use it instead of
hard-failing on the first unreachable DataNode; both keep their own messages and
their state advance / throw-on-failure semantics. The region-task validation
(VALIDATE_TIMESERIES_EXISTENCE) is unchanged. Template procedure tests pass.

SetTTLProcedure's UPDATE_DATANODE_CACHE step (and the symmetric rollback restore)
broadcast SET_TTL to all DataNodes after the authoritative ConfigNode write.
Both used to hard-fail on the first unreachable DataNode, which also forced a
full rollback of the committed TTL whenever any DataNode was down.

Both now go through a new overridable broadcastTTLAndDecide() seam backed by
ClusterCachePropagator: proceed once every unreachable DataNode is provably
self-fenced (it fails closed on TTL in compaction and resyncs on recovery), and
fail (→ rollback) only when a live DataNode is genuinely unacked. TSetTTLReq has
no ByteBuffer field, so the request is reused safely across the propagator's
re-broadcasts.

The test overrides broadcastTTLAndDecide instead of sendTTLRequest, keeping the
rollback-on-live-failure scenario deterministic and fast (no real propagator
sleep). All 6 SetTTL tests pass.
…gator

ConfigNodeProcedureEnv.invalidateCache (the DeleteDatabase INVALIDATE_CACHE
step) synchronously invalidates partition+schema cache on every DataNode. It
used to poll an Unknown DataNode for 5s then hard-fail, so a single down
DataNode broke DROP DATABASE.

It now runs through ClusterCachePropagator: a per-round closure synchronously
invalidates each reachable DataNode (SUCCESS only if both partition and schema
succeed) and reports Unknown/erroring DataNodes as not-acked WITHOUT sync-sending
to them (a sync send to a dead DataNode would block on connect timeouts). The
propagator then proceeds once every not-acked DataNode is provably self-fenced
(it fails closed and resyncs on recovery) and fails only on a live unacked
DataNode. This runs before DELETE_DATABASE_SCHEMA, so the delete-after-PROCEED
ordering holds. Removed the now-dead Unknown-polling loop and unused
getNodeManager() helper. DeleteDatabaseProcedureTest passes.

The DATANODE_AUTHCACHE_INVALIDING step broadcast INVALIDATE_PERMISSION_CACHE
and, after datanode_token_timeout_ms, SILENTLY DROPPED any still-unacked
DataNode from the list - leaving a live DataNode serving a just-revoked
permission until its own token timeout. (Phase 1 already closed the fenced-
DataNode case via DN-side fail-closed; this closes the live-transiently-unacked
case.)

The step now runs through ClusterCachePropagator over the live registered
DataNodes: it proceeds once every unreachable DataNode is provably self-fenced
(it fails closed on auth and resyncs on recovery) and fails only when a live
DataNode stays unacked - never silently dropping one. Unknown DataNodes are not
sync-contacted (avoids connect-timeout stalls) and are reported as not-acked for
the verdict. Fields/serialization are unchanged for procedure-restart
compatibility (dataNodesToInvalid is now vestigial). AuthOperationProcedureTest
passes.

These were accidentally committed by a 'git add -A'; they are informal design
notes, not part of the change, and carry no Apache license header.
@Caideyipi
Collaborator

Caideyipi commented Jun 3, 2026

I reviewed this implementation and the design note. A few safety-semantics points still look worth tightening or proving explicitly:

  1. There is a window during lease recovery before recovery listeners finish.
    MetadataLeaseManager.recordConfigNodeHeartbeat() updates lastConfigNodeHeartbeatNanos before running recovery listeners. During listener execution, and also if a listener throws and the exception is swallowed, isFenced() already returns false, so stale caches may be trusted again. That does not match the safety condition that caches are invalidated/resynced before the DataNode leaves the fenced state.
    I suggest making the lease state explicit, e.g. FENCED -> RECOVERING -> ACTIVE: only switch to ACTIVE after critical recovery listeners/resync complete successfully; otherwise remain fenced.
    Code:

    public void recordConfigNodeHeartbeat() {
      final boolean wasFenced = isFenced();
      this.lastConfigNodeHeartbeatNanos = nanoClock.getAsLong();
      if (wasFenced) {
        for (final Runnable listener : leaseRecoveryListeners) {
          try {
            listener.run();
          } catch (final Exception e) {
            // A misbehaving listener must not break heartbeat processing / lease renewal.
            LOGGER.warn("Metadata lease recovery listener failed", e);

  2. Leader-change safety still needs an epoch/token-level proof.
    A DataNode currently renews the metadata lease on any getDataNodeHeartBeat request, and the heartbeat does not carry a leader term or lease token. Resetting lastResp on the new leader avoids using stale history, but it does not prevent an old leader in a minority partition from continuing to renew some DataNodes. If the new leader cannot receive responses from those DataNodes, it may classify them as FENCED_SAFE after T_proceed, while they are actually being renewed by the old leader and have not self-fenced.
    This needs either a strict proof that an old leader cannot keep sending effective lease-renewing heartbeats after the new leader is ready, or a heartbeat leader epoch/token so a DataNode only renews from the currently valid token.
    Code:

    public TDataNodeHeartbeatResp getDataNodeHeartBeat(TDataNodeHeartbeatReq req) throws TException {
      TDataNodeHeartbeatResp resp = new TDataNodeHeartbeatResp();
      // Renew the metadata lease: receiving a ConfigNode heartbeat means this DataNode is still in
      // contact with the cluster and may keep trusting its ConfigNode-pushed metadata caches.
      MetadataLeaseManager.getInstance().recordConfigNodeHeartbeat();

  3. AuthOperationProcedure still has a post-consensus failure semantics issue.
    Removing the old 180s silent drop is a good improvement. However, writePlan commits to the ConfigNode consensus first, and then ClusterCachePropagator.propagate() may return false and call setFailure. At that point the permission change is already effective on the ConfigNode, while a heartbeat-fresh DataNode whose invalidate RPC keeps failing may still serve with the old permission cache. For security-sensitive operations such as REVOKE / DROP USER / DROP ROLE, failing the procedure after the write does not restore the safety semantics.
    I suggest keeping this step pending/retrying after a successful write until every DataNode has acked or is fenced, or actively fencing the unsafe DataNode, or introducing an auth version/token so the DataNode fails closed when its permission version is not known to be current.
    Code:

        writePlan(env);
        return Flow.HAS_MORE_STATE;
      case DATANODE_AUTHCACHE_INVALIDING:
        final TInvalidatePermissionCacheReq req = new TInvalidatePermissionCacheReq();
        req.setUsername(user);
        req.setRoleName(role);
        // Proceed once every unreachable DataNode is provably self-fenced (it fails closed on auth
        // and resyncs on recovery), instead of silently dropping a DataNode after a timeout, which
        // left a live but un-acked DataNode serving the just-revoked permission. Fail only when a
        // live DataNode stays un-acked. (dataNodesToInvalid is retained for serialization
        // compatibility but no longer drives this step.)
        if (new ClusterCachePropagator(env.getConfigManager())
            .propagate(targets -> invalidatePermissionCacheOnce(env, targets, req))) {
          LOGGER.info(ProcedureMessages.AUTH_PROCEDURE_CLEAN_DATANODE_CACHE_SUCCESSFULLY);
          return Flow.NO_MORE_STATE;
        }
        setFailure(

      setNextState(DATANODE_AUTHCACHE_INVALIDING);
      for (TDataNodeConfiguration item : datanodes) {
        this.dataNodesToInvalid.add(new Pair<>(item, System.currentTimeMillis()));
      }
      LOGGER.info(
          ProcedureMessages.EXECUTE_AUTH_PLAN_SUCCESS_TO_INVALIDATE_DATANODES,
          plan,
          dataNodesToInvalid);
    } else {
      LOGGER.info(ProcedureMessages.FAILED_TO_EXECUTE_PLAN_BECAUSE, plan, res.message);
      setFailure(new ProcedureException(new IoTDBException(res)));

  4. Please add a few negative/race tests for the safety proof.
    The current IT covers CREATE TABLE succeeding with one stopped DataNode, but the more important safety cases are still uncovered:

    • heartbeat succeeds while the metadata broadcast RPC keeps failing;
    • leader change with stale/continued heartbeats from the old leader;
    • recovery listener delay/failure must not make the DataNode leave fenced state early;
    • after REVOKE or another permission change, a partitioned or half-broken DataNode must not continue authorizing with the old cache.

These points do not block the availability goal for a stopped DataNode, but they do affect whether the fencing scheme is fully safe under the failure modes it is meant to cover.
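
To make the suggestion in point 1 concrete, here is one possible shape for an explicit FENCED -> RECOVERING -> ACTIVE lease state. It is a sketch under assumptions, not a proposed patch: the class ExplicitLeaseStateSketch, the "critical listener" notion, and holding the monitor while listeners run are all illustrative choices.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

/** Illustrative explicit lease state machine (hypothetical; not code from the PR). */
final class ExplicitLeaseStateSketch {
  enum State { ACTIVE, RECOVERING, FENCED }

  private final LongSupplier nanoClock;
  private final long fenceNanos;
  private final List<Runnable> criticalListeners = new ArrayList<>();
  private State state = State.ACTIVE;
  private long lastHeartbeatNanos;

  ExplicitLeaseStateSketch(LongSupplier nanoClock, long fenceMs) {
    this.nanoClock = nanoClock;
    this.fenceNanos = fenceMs * 1_000_000L;
    this.lastHeartbeatNanos = nanoClock.getAsLong();
  }

  synchronized void addCriticalListener(Runnable listener) {
    criticalListeners.add(listener);
  }

  /** Anything other than ACTIVE counts as fenced, including RECOVERING. */
  synchronized boolean isFenced() {
    if (state == State.ACTIVE && nanoClock.getAsLong() - lastHeartbeatNanos >= fenceNanos) {
      state = State.FENCED;
    }
    return state != State.ACTIVE;
  }

  /** Leaves the fenced state only after every critical listener has succeeded. */
  synchronized void recordHeartbeat() {
    if (isFenced()) {
      state = State.RECOVERING;
      try {
        for (Runnable listener : criticalListeners) {
          listener.run();
        }
      } catch (RuntimeException e) {
        state = State.FENCED; // resync failed: stay fenced, retry on the next heartbeat
        return;
      }
    }
    lastHeartbeatNanos = nanoClock.getAsLong();
    state = State.ACTIVE;
  }
}
```

In this shape a listener that throws keeps the DataNode fenced, and a cache read that races with recovery still sees isFenced() == true until the resync has finished.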
