From 25c34c2bfe8ecb68172ef2dc351e49cbad42f62a Mon Sep 17 00:00:00 2001 From: OpenAI Codex Date: Thu, 28 May 2026 05:53:30 +0000 Subject: [PATCH 1/3] Subscription: add topic owner epoch fencing --- .../local/IoTDBSubscriptionTopicOwnerIT.java | 150 ++++++++++++++++ .../org/apache/iotdb/rpc/TSStatusCode.java | 5 + .../subscription/config/ConsumerConfig.java | 10 ++ .../subscription/config/ConsumerConstant.java | 2 + .../subscription/config/TopicConstant.java | 4 + .../SubscriptionOwnerFencedException.java | 45 +++++ .../base/AbstractSubscriptionConsumer.java | 26 +++ .../AbstractSubscriptionConsumerBuilder.java | 23 +++ .../base/AbstractSubscriptionProvider.java | 24 +++ .../table/SubscriptionTableProvider.java | 4 + .../table/SubscriptionTablePullConsumer.java | 4 + .../SubscriptionTablePullConsumerBuilder.java | 18 ++ .../table/SubscriptionTablePushConsumer.java | 4 + .../SubscriptionTablePushConsumerBuilder.java | 18 ++ .../tree/SubscriptionTreeProvider.java | 4 + .../tree/SubscriptionTreePullConsumer.java | 21 +++ .../SubscriptionTreePullConsumerBuilder.java | 18 ++ .../tree/SubscriptionTreePushConsumer.java | 21 +++ .../SubscriptionTreePushConsumerBuilder.java | 18 ++ .../subscription/SubscriptionInfo.java | 8 + .../subscription/SubscriptionInfoTest.java | 70 ++++++++ .../agent/SubscriptionTopicAgent.java | 72 ++++++++ .../receiver/SubscriptionReceiverV1.java | 57 ++++++ .../receiver/SubscriptionReceiverV1Test.java | 94 ++++++++++ .../subscription/meta/topic/TopicMeta.java | 167 +++++++++++++++++- .../subscription/topic/TopicDeSerTest.java | 61 +++++++ 26 files changed, 946 insertions(+), 2 deletions(-) create mode 100644 integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionTopicOwnerIT.java create mode 100644 iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/exception/SubscriptionOwnerFencedException.java create mode 100644 iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfoTest.java diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionTopicOwnerIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionTopicOwnerIT.java new file mode 100644 index 0000000000000..4e89755048665 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionTopicOwnerIT.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.subscription.it.local; + +import org.apache.iotdb.isession.ISession; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.LocalStandaloneIT; +import org.apache.iotdb.rpc.subscription.config.TopicConstant; +import org.apache.iotdb.rpc.subscription.exception.SubscriptionOwnerFencedException; +import org.apache.iotdb.session.subscription.SubscriptionTreeSession; +import org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePullConsumer; +import org.apache.iotdb.session.subscription.payload.SubscriptionMessage; +import org.apache.iotdb.session.subscription.payload.SubscriptionRecordHandler; +import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant; + +import org.apache.tsfile.read.query.dataset.ResultSet; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; + +@RunWith(IoTDBTestRunner.class) +@Category({LocalStandaloneIT.class}) +public class IoTDBSubscriptionTopicOwnerIT extends AbstractSubscriptionLocalIT { + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + } + + @Ignore + @Test + public void testTopicOwnerFencingRejectsStaleOwnerAndAllowsCurrentOwner() throws Exception { + final String host = EnvFactory.getEnv().getIP(); + final int port = Integer.parseInt(EnvFactory.getEnv().getPort()); + final String topicName = "topic_owner_fencing"; + + try (final SubscriptionTreeSession session = new SubscriptionTreeSession(host, port)) { + session.open(); + final Properties properties = new Properties(); + properties.put(TopicConstant.PATH_KEY, "root.topic_owner.**"); + properties.put(TopicConstant.START_TIME_KEY, "0"); + properties.put(TopicConstant.OWNER_ID_KEY, "sn2"); + properties.put(TopicConstant.OWNER_EPOCH_KEY, "6"); + session.createTopic(topicName, properties); + } + + try { + try (final SubscriptionTreePullConsumer staleOwnerConsumer = + new SubscriptionTreePullConsumer.Builder() + .host(host) + .port(port) + .consumerId("stale_sn") + .consumerGroupId("topic_owner_group") + .ownerId("sn1") + .ownerEpoch(5L) + .autoCommit(false) + .buildPullConsumer()) { + staleOwnerConsumer.open(); + Assert.assertThrows( + SubscriptionOwnerFencedException.class, () -> staleOwnerConsumer.subscribe(topicName)); + } + + try (final SubscriptionTreePullConsumer currentOwnerConsumer = + new SubscriptionTreePullConsumer.Builder() + .host(host) + .port(port) + .consumerId("current_sn") + .consumerGroupId("topic_owner_group") + .ownerId("sn2") + .ownerEpoch(6L) + .autoCommit(false) + .buildPullConsumer()) { + currentOwnerConsumer.open(); + currentOwnerConsumer.subscribe(topicName); + + insertData(); + + final AtomicReference> polledMessages = + new AtomicReference<>(Collections.emptyList()); + IoTDBSubscriptionITConstant.AWAIT.untilAsserted( + () -> { + final List messages = + currentOwnerConsumer.poll(Duration.ofMillis(1000)); + polledMessages.set(messages); + Assert.assertFalse(messages.isEmpty()); + Assert.assertTrue(countRows(messages) > 0); + }); + + currentOwnerConsumer.commitSync(polledMessages.get()); + currentOwnerConsumer.unsubscribe(topicName); + } + } finally { + try (final SubscriptionTreeSession session = new SubscriptionTreeSession(host, port)) { + session.open(); + session.dropTopicIfExists(topicName); + } + } + } + + private void insertData() throws Exception { + try (final ISession session = EnvFactory.getEnv().getSessionConnection()) { + for (int i = 0; i < 10; i++) { + session.executeNonQueryStatement( + String.format("insert into root.topic_owner.d1(time, s1) values (%s, %s)", i, i)); + } + session.executeNonQueryStatement("flush"); + } + } + + private static int countRows(final List messages) throws Exception { + int rowCount = 0; + for (final SubscriptionMessage message : messages) { + for (final ResultSet resultSet : message.getResultSets()) { + while (((SubscriptionRecordHandler.SubscriptionResultSet) resultSet).hasNext()) { + resultSet.next(); + rowCount++; + } + } + } + return rowCount; + } +} diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index e5014681fa724..97a70963d720c 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -316,6 +316,11 @@ public enum TSStatusCode { SHOW_SUBSCRIPTION_ERROR(1910), SUBSCRIPTION_PIPE_TIMEOUT_ERROR(1911), SUBSCRIPTION_NOT_ENABLED_ERROR(1912), + SUBSCRIPTION_OWNER_FENCED(1913), + SUBSCRIPTION_OWNER_REQUIRED(1914), + SUBSCRIPTION_OWNER_EPOCH_REQUIRED(1915), + SUBSCRIPTION_OWNER_LEASE_EXPIRED(1916), + SUBSCRIPTION_OWNER_EPOCH_CONFLICT(1917), // Topic CREATE_TOPIC_ERROR(2000), diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java index 3cb0087d6827e..13f2a9ee3fb03 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java @@ -68,6 +68,16 @@ public String getConsumerGroupId() { return getString(ConsumerConstant.CONSUMER_GROUP_ID_KEY); } + public String getOwnerId() { + return getString(ConsumerConstant.OWNER_ID_KEY); + } + + public Long getOwnerEpoch() { + return hasAttribute(ConsumerConstant.OWNER_EPOCH_KEY) + ? getLong(ConsumerConstant.OWNER_EPOCH_KEY) + : null; + } + public String getUsername() { return getString(ConsumerConstant.USERNAME_KEY); } diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java index 90d2ea7a01fb0..3df95facf367c 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java @@ -40,6 +40,8 @@ public class ConsumerConstant { public static final String CONSUMER_ID_KEY = "consumer-id"; public static final String CONSUMER_GROUP_ID_KEY = "group-id"; + public static final String OWNER_ID_KEY = "owner-id"; + public static final String OWNER_EPOCH_KEY = "owner-epoch"; public static final String HEARTBEAT_INTERVAL_MS_KEY = "heartbeat-interval-ms"; public static final long HEARTBEAT_INTERVAL_MS_DEFAULT_VALUE = 30_000L; diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java index 52c8e4de75221..73036684ccde9 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java @@ -61,6 +61,10 @@ public class TopicConstant { public static final String STRICT_KEY = "strict"; public static final String STRICT_DEFAULT_VALUE = "true"; + public static final String OWNER_ID_KEY = "owner-id"; + public static final String OWNER_EPOCH_KEY = "owner-epoch"; + public static final String OWNER_LEASE_EXPIRE_TIME_MS_KEY = "owner-lease-expire-time-ms"; + private TopicConstant() { throw new IllegalStateException(SubscriptionMessages.UTILITY_CLASS); } diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/exception/SubscriptionOwnerFencedException.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/exception/SubscriptionOwnerFencedException.java new file mode 100644 index 0000000000000..f00c3ba1dcb0c --- /dev/null +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/exception/SubscriptionOwnerFencedException.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.rpc.subscription.exception; + +import java.util.Objects; + +public class SubscriptionOwnerFencedException extends SubscriptionRuntimeNonCriticalException { + + public SubscriptionOwnerFencedException(final String message) { + super(message); + } + + public SubscriptionOwnerFencedException(final String message, final Throwable cause) { + super(message, cause); + } + + @Override + public boolean equals(final Object obj) { + return obj instanceof SubscriptionOwnerFencedException + && Objects.equals(getMessage(), ((SubscriptionOwnerFencedException) obj).getMessage()) + && Objects.equals(getTimeStamp(), ((SubscriptionOwnerFencedException) obj).getTimeStamp()); + } + + @Override + public int hashCode() { + return super.hashCode(); + } +} diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java index 63e5a263f22b3..ed59935b7f2b4 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java @@ -25,6 +25,7 @@ import org.apache.iotdb.rpc.subscription.config.TopicConfig; import org.apache.iotdb.rpc.subscription.exception.SubscriptionConnectionException; import org.apache.iotdb.rpc.subscription.exception.SubscriptionException; +import org.apache.iotdb.rpc.subscription.exception.SubscriptionOwnerFencedException; import org.apache.iotdb.rpc.subscription.exception.SubscriptionPipeTimeoutException; import org.apache.iotdb.rpc.subscription.exception.SubscriptionPollTimeoutException; import org.apache.iotdb.rpc.subscription.exception.SubscriptionRuntimeCriticalException; @@ -105,6 +106,8 @@ abstract class AbstractSubscriptionConsumer implements AutoCloseable { protected String consumerId; protected String consumerGroupId; + protected String ownerId; + protected Long ownerEpoch; private final long heartbeatIntervalMs; private final long endpointsSyncIntervalMs; @@ -154,6 +157,14 @@ public String getConsumerGroupId() { return consumerGroupId; } + public String getOwnerId() { + return ownerId; + } + + public Long getOwnerEpoch() { + return ownerEpoch; + } + /////////////////////////////// ctor /////////////////////////////// protected AbstractSubscriptionConsumer(final AbstractSubscriptionConsumerBuilder builder) { @@ -183,6 +194,8 @@ protected AbstractSubscriptionConsumer(final AbstractSubscriptionConsumerBuilder this.consumerId = builder.consumerId; this.consumerGroupId = builder.consumerGroupId; + this.ownerId = builder.ownerId; + this.ownerEpoch = builder.ownerEpoch; this.heartbeatIntervalMs = builder.heartbeatIntervalMs; this.endpointsSyncIntervalMs = builder.endpointsSyncIntervalMs; @@ -213,6 +226,8 @@ protected AbstractSubscriptionConsumer( .encryptedPassword((String) properties.get(ConsumerConstant.ENCRYPTED_PASSWORD_KEY)) .consumerId((String) properties.get(ConsumerConstant.CONSUMER_ID_KEY)) .consumerGroupId((String) properties.get(ConsumerConstant.CONSUMER_GROUP_ID_KEY)) + .ownerId((String) properties.get(ConsumerConstant.OWNER_ID_KEY)) + .ownerEpoch((Long) properties.get(ConsumerConstant.OWNER_EPOCH_KEY)) .heartbeatIntervalMs( (Long) properties.getOrDefault( @@ -394,6 +409,8 @@ protected abstract AbstractSubscriptionProvider constructSubscriptionProvider( final String encryptedPassword, final String consumerId, final String consumerGroupId, + final String ownerId, + final Long ownerEpoch, final int thriftMaxFrameSize, final long heartbeatIntervalMs, final int connectionTimeoutInMs); @@ -408,6 +425,8 @@ AbstractSubscriptionProvider constructProviderAndHandshake(final TEndPoint endPo this.encryptedPassword, this.consumerId, this.consumerGroupId, + this.ownerId, + this.ownerEpoch, this.thriftMaxFrameSize, this.heartbeatIntervalMs, this.connectionTimeoutInMs); @@ -1341,6 +1360,9 @@ private void subscribeWithRedirection(final Set topicNames) throws Subsc subscribedTopics = provider.subscribe(topicNames); return; } catch (final Exception e) { + if (e instanceof SubscriptionOwnerFencedException) { + throw (SubscriptionOwnerFencedException) e; + } if (e instanceof SubscriptionPipeTimeoutException) { // degrade exception to log for pipe timeout LOGGER.warn(e.getMessage()); @@ -1429,6 +1451,8 @@ protected Map coreReportMessage() { final Map result = new HashMap<>(); result.put("consumerId", consumerId); result.put("consumerGroupId", consumerGroupId); + result.put("ownerId", ownerId); + result.put("ownerEpoch", String.valueOf(ownerEpoch)); result.put("isClosed", isClosed.toString()); result.put("fileSaveDir", fileSaveDir); result.put( @@ -1443,6 +1467,8 @@ protected Map allReportMessage() { final Map result = new HashMap<>(); result.put("consumerId", consumerId); result.put("consumerGroupId", consumerGroupId); + result.put("ownerId", ownerId); + result.put("ownerEpoch", String.valueOf(ownerEpoch)); result.put("heartbeatIntervalMs", String.valueOf(heartbeatIntervalMs)); result.put("endpointsSyncIntervalMs", String.valueOf(endpointsSyncIntervalMs)); result.put("providers", providers.toString()); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java index 991d237ed2b89..a0c4b421ed18b 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java @@ -40,6 +40,8 @@ public class AbstractSubscriptionConsumerBuilder { protected String consumerId; protected String consumerGroupId; + protected String ownerId; + protected Long ownerEpoch; protected long heartbeatIntervalMs = ConsumerConstant.HEARTBEAT_INTERVAL_MS_DEFAULT_VALUE; protected long endpointsSyncIntervalMs = @@ -111,6 +113,27 @@ public AbstractSubscriptionConsumerBuilder consumerGroupId( return this; } + public AbstractSubscriptionConsumerBuilder ownerId(@Nullable final String ownerId) { + if (Objects.isNull(ownerId)) { + return this; + } + this.ownerId = ownerId; + return this; + } + + public AbstractSubscriptionConsumerBuilder ownerEpoch(final long ownerEpoch) { + this.ownerEpoch = ownerEpoch; + return this; + } + + public AbstractSubscriptionConsumerBuilder ownerEpoch(@Nullable final Long ownerEpoch) { + if (Objects.isNull(ownerEpoch)) { + return this; + } + this.ownerEpoch = ownerEpoch; + return this; + } + public AbstractSubscriptionConsumerBuilder heartbeatIntervalMs(final long heartbeatIntervalMs) { this.heartbeatIntervalMs = Math.max(heartbeatIntervalMs, ConsumerConstant.HEARTBEAT_INTERVAL_MS_MIN_VALUE); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java index 413c609abbff3..3751d019fe779 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java @@ -27,6 +27,7 @@ import org.apache.iotdb.rpc.subscription.config.TopicConfig; import org.apache.iotdb.rpc.subscription.exception.SubscriptionConnectionException; import org.apache.iotdb.rpc.subscription.exception.SubscriptionException; +import org.apache.iotdb.rpc.subscription.exception.SubscriptionOwnerFencedException; import org.apache.iotdb.rpc.subscription.exception.SubscriptionPipeTimeoutException; import org.apache.iotdb.rpc.subscription.exception.SubscriptionRuntimeCriticalException; import org.apache.iotdb.rpc.subscription.exception.SubscriptionRuntimeNonCriticalException; @@ -81,6 +82,8 @@ public abstract class AbstractSubscriptionProvider { private String consumerId; private String consumerGroupId; + private final String ownerId; + private final Long ownerEpoch; private final AtomicBoolean isClosed = new AtomicBoolean(true); private final AtomicBoolean isAvailable = new AtomicBoolean(false); @@ -109,6 +112,8 @@ protected AbstractSubscriptionProvider( final String encryptedPassword, final String consumerId, final String consumerGroupId, + final String ownerId, + final Long ownerEpoch, final int thriftMaxFrameSize, final long heartbeatIntervalMs, final int connectionTimeoutInMs) { @@ -124,6 +129,8 @@ protected AbstractSubscriptionProvider( this.endPoint = endPoint; this.consumerId = consumerId; this.consumerGroupId = consumerGroupId; + this.ownerId = ownerId; + this.ownerEpoch = ownerEpoch; this.username = username; this.password = password; this.encryptedPassword = encryptedPassword; @@ -176,6 +183,12 @@ synchronized void handshake() throws SubscriptionException, IoTDBConnectionExcep final Map consumerAttributes = new HashMap<>(); consumerAttributes.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, consumerGroupId); consumerAttributes.put(ConsumerConstant.CONSUMER_ID_KEY, consumerId); + if (ownerId != null) { + consumerAttributes.put(ConsumerConstant.OWNER_ID_KEY, ownerId); + } + if (ownerEpoch != null) { + consumerAttributes.put(ConsumerConstant.OWNER_EPOCH_KEY, String.valueOf(ownerEpoch)); + } consumerAttributes.put(ConsumerConstant.USERNAME_KEY, username); consumerAttributes.put(ConsumerConstant.PASSWORD_KEY, password); if (encryptedPassword != null) { @@ -451,6 +464,17 @@ private static void verifyPipeSubscribeSuccess(final TSStatus status) case 1911: // SUBSCRIPTION_PIPE_TIMEOUT_ERROR throw new SubscriptionPipeTimeoutException( String.format(SUBSCRIPTION_PIPE_TIMEOUT_FORMATTER, status.code, status.message)); + case 1913: // SUBSCRIPTION_OWNER_FENCED + case 1914: // SUBSCRIPTION_OWNER_REQUIRED + case 1915: // SUBSCRIPTION_OWNER_EPOCH_REQUIRED + case 1916: // SUBSCRIPTION_OWNER_LEASE_EXPIRED + case 1917: // SUBSCRIPTION_OWNER_EPOCH_CONFLICT + { + final String errorMessage = + String.format(INTERNAL_ERROR_FORMATTER, status.code, status.message); + LOGGER.warn(errorMessage); + throw new SubscriptionOwnerFencedException(errorMessage); + } case 1900: // SUBSCRIPTION_VERSION_ERROR case 1901: // SUBSCRIPTION_TYPE_ERROR case 1909: // SUBSCRIPTION_MISSING_CUSTOMER diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java index 84470d283c21b..ff67e3e532c3d 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java @@ -33,6 +33,8 @@ final class SubscriptionTableProvider extends AbstractSubscriptionProvider { final String encryptedPassword, final String consumerId, final String consumerGroupId, + final String ownerId, + final Long ownerEpoch, final int thriftMaxFrameSize, final long heartbeatIntervalMs, final int connectionTimeoutInMs) { @@ -43,6 +45,8 @@ final class SubscriptionTableProvider extends AbstractSubscriptionProvider { encryptedPassword, consumerId, consumerGroupId, + ownerId, + ownerEpoch, thriftMaxFrameSize, heartbeatIntervalMs, connectionTimeoutInMs); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java index 8f712782fb5f0..4c390c96420e3 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java @@ -45,6 +45,8 @@ protected AbstractSubscriptionProvider constructSubscriptionProvider( final String encryptedPassword, final String consumerId, final String consumerGroupId, + final String ownerId, + final Long ownerEpoch, final int thriftMaxFrameSize, final long heartbeatIntervalMs, final int connectionTimeoutInMs) { @@ -55,6 +57,8 @@ protected AbstractSubscriptionProvider constructSubscriptionProvider( encryptedPassword, consumerId, consumerGroupId, + ownerId, + ownerEpoch, thriftMaxFrameSize, heartbeatIntervalMs, connectionTimeoutInMs); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java index 939228a7f49e9..efc99debb4977 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java @@ -74,6 +74,24 @@ public SubscriptionTablePullConsumerBuilder consumerGroupId(final String consume return this; } + @Override + public SubscriptionTablePullConsumerBuilder ownerId(final String ownerId) { + super.ownerId(ownerId); + return this; + } + + @Override + public SubscriptionTablePullConsumerBuilder ownerEpoch(final long ownerEpoch) { + super.ownerEpoch(ownerEpoch); + return this; + } + + @Override + public SubscriptionTablePullConsumerBuilder ownerEpoch(final Long ownerEpoch) { + super.ownerEpoch(ownerEpoch); + return this; + } + @Override public SubscriptionTablePullConsumerBuilder heartbeatIntervalMs(final long heartbeatIntervalMs) { super.heartbeatIntervalMs(heartbeatIntervalMs); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java index e90afc1d8d175..4c85993a933dc 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java @@ -41,6 +41,8 @@ protected AbstractSubscriptionProvider constructSubscriptionProvider( final String encryptedPassword, final String consumerId, final String consumerGroupId, + final String ownerId, + final Long ownerEpoch, final int thriftMaxFrameSize, final long heartbeatIntervalMs, final int connectionTimeoutInMs) { @@ -51,6 +53,8 @@ protected AbstractSubscriptionProvider constructSubscriptionProvider( encryptedPassword, consumerId, consumerGroupId, + ownerId, + ownerEpoch, thriftMaxFrameSize, heartbeatIntervalMs, connectionTimeoutInMs); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java index 27bf328fea9e6..143b34056af9e 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java @@ -76,6 +76,24 @@ public SubscriptionTablePushConsumerBuilder consumerGroupId(final String consume return this; } + @Override + public SubscriptionTablePushConsumerBuilder ownerId(final String ownerId) { + super.ownerId(ownerId); + return this; + } + + @Override + public SubscriptionTablePushConsumerBuilder ownerEpoch(final long ownerEpoch) { + super.ownerEpoch(ownerEpoch); + return this; + } + + @Override + public SubscriptionTablePushConsumerBuilder ownerEpoch(final Long ownerEpoch) { + super.ownerEpoch(ownerEpoch); + return this; + } + @Override public SubscriptionTablePushConsumerBuilder heartbeatIntervalMs(final long heartbeatIntervalMs) { super.heartbeatIntervalMs(heartbeatIntervalMs); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java index 3589fbbcf749a..8720c577d4dfa 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java @@ -33,6 +33,8 @@ final class SubscriptionTreeProvider extends AbstractSubscriptionProvider { final String encryptedPassword, final String consumerId, final String consumerGroupId, + final String ownerId, + final Long ownerEpoch, final int thriftMaxFrameSize, final long heartbeatIntervalMs, final int connectionTimeoutInMs) { @@ -43,6 +45,8 @@ final class SubscriptionTreeProvider extends AbstractSubscriptionProvider { encryptedPassword, consumerId, consumerGroupId, + ownerId, + ownerEpoch, thriftMaxFrameSize, heartbeatIntervalMs, connectionTimeoutInMs); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java index 7225036aaa4cf..d4d615130625b 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java @@ -52,6 +52,8 @@ protected AbstractSubscriptionProvider constructSubscriptionProvider( final String encryptedPassword, final String consumerId, final String consumerGroupId, + final String ownerId, + final Long ownerEpoch, final int thriftMaxFrameSize, final long heartbeatIntervalMs, final int connectionTimeoutInMs) { @@ -62,6 +64,8 @@ protected AbstractSubscriptionProvider constructSubscriptionProvider( encryptedPassword, consumerId, consumerGroupId, + ownerId, + ownerEpoch, thriftMaxFrameSize, heartbeatIntervalMs, connectionTimeoutInMs); @@ -85,6 +89,8 @@ private SubscriptionTreePullConsumer(final SubscriptionTreePullConsumer.Builder .encryptedPassword(builder.encryptedPassword) .consumerId(builder.consumerId) .consumerGroupId(builder.consumerGroupId) + .ownerId(builder.ownerId) + .ownerEpoch(builder.ownerEpoch) .heartbeatIntervalMs(builder.heartbeatIntervalMs) .endpointsSyncIntervalMs(builder.endpointsSyncIntervalMs) .fileSaveDir(builder.fileSaveDir) @@ -238,6 +244,8 @@ public static class Builder { private String consumerId; private String consumerGroupId; + private String ownerId; + private Long ownerEpoch; private long heartbeatIntervalMs = ConsumerConstant.HEARTBEAT_INTERVAL_MS_DEFAULT_VALUE; private long endpointsSyncIntervalMs = @@ -299,6 +307,19 @@ public Builder consumerGroupId(@Nullable final String consumerGroupId) { return this; } + public Builder ownerId(@Nullable final String ownerId) { + if (Objects.isNull(ownerId)) { + return this; + } + this.ownerId = ownerId; + return this; + } + + public Builder ownerEpoch(final long ownerEpoch) { + this.ownerEpoch = ownerEpoch; + return this; + } + public Builder heartbeatIntervalMs(final long heartbeatIntervalMs) { this.heartbeatIntervalMs = Math.max(heartbeatIntervalMs, ConsumerConstant.HEARTBEAT_INTERVAL_MS_MIN_VALUE); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java index cbceb95d77f90..2d057b7bfbd80 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java @@ -74,6 +74,24 @@ public SubscriptionTreePullConsumerBuilder consumerGroupId(final String consumer return this; } + @Override + public SubscriptionTreePullConsumerBuilder ownerId(final String ownerId) { + super.ownerId(ownerId); + return this; + } + + @Override + public SubscriptionTreePullConsumerBuilder ownerEpoch(final long ownerEpoch) { + super.ownerEpoch(ownerEpoch); + return this; + } + + @Override + public SubscriptionTreePullConsumerBuilder ownerEpoch(final Long ownerEpoch) { + super.ownerEpoch(ownerEpoch); + return this; + } + @Override public SubscriptionTreePullConsumerBuilder heartbeatIntervalMs(final long heartbeatIntervalMs) { super.heartbeatIntervalMs(heartbeatIntervalMs); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java index 4d8a5ef3e169f..8fbc33d14ce8a 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java @@ -51,6 +51,8 @@ protected AbstractSubscriptionProvider constructSubscriptionProvider( final String encryptedPassword, final String consumerId, final String consumerGroupId, + final String ownerId, + final Long ownerEpoch, final int thriftMaxFrameSize, final long heartbeatIntervalMs, final int connectionTimeoutInMs) { @@ -61,6 +63,8 @@ protected AbstractSubscriptionProvider constructSubscriptionProvider( encryptedPassword, consumerId, consumerGroupId, + ownerId, + ownerEpoch, thriftMaxFrameSize, heartbeatIntervalMs, connectionTimeoutInMs); @@ -84,6 +88,8 @@ private SubscriptionTreePushConsumer(final Builder builder) { .encryptedPassword(builder.encryptedPassword) .consumerId(builder.consumerId) .consumerGroupId(builder.consumerGroupId) + .ownerId(builder.ownerId) + .ownerEpoch(builder.ownerEpoch) .heartbeatIntervalMs(builder.heartbeatIntervalMs) .endpointsSyncIntervalMs(builder.endpointsSyncIntervalMs) .fileSaveDir(builder.fileSaveDir) @@ -192,6 +198,8 @@ public static class Builder { private String consumerId; private String consumerGroupId; + private String ownerId; + private Long ownerEpoch; private long heartbeatIntervalMs = ConsumerConstant.HEARTBEAT_INTERVAL_MS_DEFAULT_VALUE; private long endpointsSyncIntervalMs = @@ -256,6 +264,19 @@ public Builder consumerGroupId(@Nullable final String consumerGroupId) { return this; } + public Builder ownerId(@Nullable final String ownerId) { + if (Objects.isNull(ownerId)) { + return this; + } + this.ownerId = ownerId; + return this; + } + + public Builder ownerEpoch(final long ownerEpoch) { + this.ownerEpoch = ownerEpoch; + return this; + } + public Builder heartbeatIntervalMs(final long heartbeatIntervalMs) { this.heartbeatIntervalMs = Math.max(heartbeatIntervalMs, ConsumerConstant.HEARTBEAT_INTERVAL_MS_MIN_VALUE); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java index 86594433e77e0..9067f3e7ae564 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java @@ -76,6 +76,24 @@ public SubscriptionTreePushConsumerBuilder consumerGroupId(final String consumer return this; } + @Override + public SubscriptionTreePushConsumerBuilder ownerId(final String ownerId) { + super.ownerId(ownerId); + return this; + } + + @Override + public SubscriptionTreePushConsumerBuilder ownerEpoch(final long ownerEpoch) { + super.ownerEpoch(ownerEpoch); + return this; + } + + @Override + public SubscriptionTreePushConsumerBuilder ownerEpoch(final Long ownerEpoch) { + super.ownerEpoch(ownerEpoch); + return this; + } + @Override public SubscriptionTreePushConsumerBuilder heartbeatIntervalMs(final long heartbeatIntervalMs) { super.heartbeatIntervalMs(heartbeatIntervalMs); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java index 6fcdcf28ebd0f..1849823888f0a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java @@ -338,6 +338,14 @@ public TSStatus alterTopic(AlterTopicPlan plan) { } private TSStatus alterTopicInternal(final AlterTopicPlan plan) { + try { + TopicMeta.validateOwnerProgression( + topicMetaKeeper.getTopicMeta(plan.getTopicMeta().getTopicName()), plan.getTopicMeta()); + } catch (final IllegalArgumentException e) { + return new TSStatus(TSStatusCode.SUBSCRIPTION_OWNER_EPOCH_CONFLICT.getStatusCode()) + .setMessage(e.getMessage()); + } + topicMetaKeeper.removeTopicMeta(plan.getTopicMeta().getTopicName()); topicMetaKeeper.addTopicMeta(plan.getTopicMeta().getTopicName(), plan.getTopicMeta()); return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfoTest.java new file mode 100644 index 0000000000000..768eab6ea50d8 --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfoTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.persistence.subscription; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta; +import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.AlterTopicPlan; +import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.CreateTopicPlan; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.rpc.subscription.config.TopicConstant; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +public class SubscriptionInfoTest { + + @Test + public void testAlterTopicRejectsOwnerEpochRollback() { + final String topicName = "topic-" + UUID.randomUUID(); + final SubscriptionInfo subscriptionInfo = new SubscriptionInfo(); + + final TopicMeta initialTopicMeta = createTopicMeta(topicName, "sn1", 5L); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + subscriptionInfo.createTopic(new CreateTopicPlan(initialTopicMeta)).getCode()); + + final TopicMeta transferredTopicMeta = initialTopicMeta.deepCopy(); + transferredTopicMeta.transferOwner("sn2", 6L); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + subscriptionInfo.alterTopic(new AlterTopicPlan(transferredTopicMeta)).getCode()); + + final TSStatus rollbackStatus = + subscriptionInfo.alterTopic(new AlterTopicPlan(createTopicMeta(topicName, "sn1", 5L))); + + Assert.assertEquals( + TSStatusCode.SUBSCRIPTION_OWNER_EPOCH_CONFLICT.getStatusCode(), rollbackStatus.getCode()); + Assert.assertEquals("sn2", subscriptionInfo.getTopicMeta(topicName).getOwnerId()); + Assert.assertEquals(6L, subscriptionInfo.getTopicMeta(topicName).getOwnerEpoch()); + } + + private TopicMeta createTopicMeta( + final String topicName, final String ownerId, final long ownerEpoch) { + final Map topicAttributes = new HashMap<>(); + topicAttributes.put(TopicConstant.OWNER_ID_KEY, ownerId); + topicAttributes.put(TopicConstant.OWNER_EPOCH_KEY, String.valueOf(ownerEpoch)); + return new TopicMeta(topicName, 1, topicAttributes); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java index 9b629ab9c8dde..d524b38d4c13c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java @@ -19,10 +19,14 @@ package org.apache.iotdb.db.subscription.agent; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta; import org.apache.iotdb.commons.subscription.meta.topic.TopicMetaKeeper; import org.apache.iotdb.db.i18n.DataNodeMiscMessages; import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaRespExceptionMessage; +import org.apache.iotdb.rpc.RpcUtils; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.rpc.subscription.config.ConsumerConfig; import org.apache.iotdb.rpc.subscription.config.TopicConfig; import org.apache.iotdb.rpc.subscription.config.TopicConstant; @@ -31,6 +35,7 @@ import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @@ -87,6 +92,8 @@ public TPushTopicMetaRespExceptionMessage handleSingleTopicMetaChanges( private void handleSingleTopicMetaChangesInternal(final TopicMeta metaFromCoordinator) { final String topicName = metaFromCoordinator.getTopicName(); + TopicMeta.validateOwnerProgression( + topicMetaKeeper.getTopicMeta(topicName), metaFromCoordinator); topicMetaKeeper.removeTopicMeta(topicName); topicMetaKeeper.addTopicMeta(topicName, metaFromCoordinator); } @@ -188,4 +195,69 @@ public Map getTopicConfigs(final Set topicNames) { releaseReadLock(); } } + + public TSStatus checkTopicOwner(final ConsumerConfig consumerConfig, final String topicName) { + acquireReadLock(); + try { + final TopicMeta topicMeta = topicMetaKeeper.getTopicMeta(topicName); + if (Objects.isNull(topicMeta) || !topicMeta.isOwnerFencingEnabled()) { + return RpcUtils.SUCCESS_STATUS; + } + + final String requestOwnerId = consumerConfig.getOwnerId(); + if (Objects.isNull(requestOwnerId)) { + return RpcUtils.getStatus( + TSStatusCode.SUBSCRIPTION_OWNER_REQUIRED, + String.format( + "Subscription: topic %s enables owner fencing, but consumer %s does not carry owner-id.", + topicName, consumerConfig)); + } + + final Long requestOwnerEpoch = consumerConfig.getOwnerEpoch(); + if (Objects.isNull(requestOwnerEpoch)) { + return RpcUtils.getStatus( + TSStatusCode.SUBSCRIPTION_OWNER_EPOCH_REQUIRED, + String.format( + "Subscription: topic %s enables owner fencing, but consumer %s does not carry owner-epoch.", + topicName, consumerConfig)); + } + + if (Objects.nonNull(topicMeta.getOwnerLeaseExpireTimeMs()) + && System.currentTimeMillis() > topicMeta.getOwnerLeaseExpireTimeMs()) { + return RpcUtils.getStatus( + TSStatusCode.SUBSCRIPTION_OWNER_LEASE_EXPIRED, + String.format( + "Subscription: owner lease for topic %s has expired, owner-id: %s, owner-epoch: %s.", + topicName, topicMeta.getOwnerId(), topicMeta.getOwnerEpoch())); + } + + if (!topicMeta.matchesOwner(requestOwnerId, requestOwnerEpoch)) { + return RpcUtils.getStatus( + TSStatusCode.SUBSCRIPTION_OWNER_FENCED, + String.format( + "Subscription: consumer owner is fenced for topic %s, request owner-id: %s," + + " request owner-epoch: %s, current owner-id: %s, current owner-epoch: %s.", + topicName, + requestOwnerId, + requestOwnerEpoch, + topicMeta.getOwnerId(), + topicMeta.getOwnerEpoch())); + } + + return RpcUtils.SUCCESS_STATUS; + } finally { + releaseReadLock(); + } + } + + public TSStatus checkTopicOwners( + final ConsumerConfig consumerConfig, final Iterable topicNames) { + for (final String topicName : topicNames) { + final TSStatus status = checkTopicOwner(consumerConfig, topicName); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; + } + } + return RpcUtils.SUCCESS_STATUS; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java index 2c4accecf45f0..fe08ffc43ca39 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java @@ -326,6 +326,16 @@ private TPipeSubscribeResp handlePipeSubscribeHeartbeatInternal( } // TODO: do something + final TSStatus ownerStatus = + SubscriptionAgent.topic() + .checkTopicOwners( + consumerConfig, + SubscriptionAgent.consumer() + .getTopicNamesSubscribedByConsumer( + consumerConfig.getConsumerGroupId(), consumerConfig.getConsumerId())); + if (ownerStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return PipeSubscribeHeartbeatResp.toTPipeSubscribeResp(ownerStatus); + } LOGGER.info(DataNodeMiscMessages.SUBSCRIPTION_CONSUMER_HEARTBEAT_SUCCESS, consumerConfig); @@ -406,6 +416,11 @@ private TPipeSubscribeResp handlePipeSubscribeSubscribeInternal( // subscribe topics final Set topicNames = req.getTopicNames(); + final TSStatus ownerStatus = + SubscriptionAgent.topic().checkTopicOwners(consumerConfig, topicNames); + if (ownerStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return PipeSubscribeSubscribeResp.toTPipeSubscribeResp(ownerStatus); + } subscribe(consumerConfig, topicNames); LOGGER.info( @@ -498,16 +513,48 @@ private TPipeSubscribeResp handlePipeSubscribePollInternal(final PipeSubscribePo if (SubscriptionPollRequestType.isValidatedRequestType(requestType)) { switch (SubscriptionPollRequestType.valueOf(requestType)) { case POLL: + final Set pollTopicNames = ((PollPayload) request.getPayload()).getTopicNames(); + final Set subscribedTopicNames = + SubscriptionAgent.consumer() + .getTopicNamesSubscribedByConsumer( + consumerConfig.getConsumerGroupId(), consumerConfig.getConsumerId()); + final Set topicNamesToCheck = new HashSet<>(pollTopicNames); + topicNamesToCheck.removeIf(topicName -> !subscribedTopicNames.contains(topicName)); + final TSStatus ownerStatus = + SubscriptionAgent.topic().checkTopicOwners(consumerConfig, topicNamesToCheck); + if (ownerStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return PipeSubscribePollResp.toTPipeSubscribeResp(ownerStatus, Collections.emptyList()); + } events = handlePipeSubscribePollRequest( consumerConfig, (PollPayload) request.getPayload(), maxBytes); break; case POLL_FILE: + final TSStatus tsFileOwnerStatus = + SubscriptionAgent.topic() + .checkTopicOwner( + consumerConfig, + ((PollFilePayload) request.getPayload()).getCommitContext().getTopicName()); + if (tsFileOwnerStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return PipeSubscribePollResp.toTPipeSubscribeResp( + tsFileOwnerStatus, Collections.emptyList()); + } events = handlePipeSubscribePollTsFileRequest( consumerConfig, (PollFilePayload) request.getPayload()); break; case POLL_TABLETS: + final TSStatus tabletsOwnerStatus = + SubscriptionAgent.topic() + .checkTopicOwner( + consumerConfig, + ((PollTabletsPayload) request.getPayload()) + .getCommitContext() + .getTopicName()); + if (tabletsOwnerStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return PipeSubscribePollResp.toTPipeSubscribeResp( + tabletsOwnerStatus, Collections.emptyList()); + } events = handlePipeSubscribePollTabletsRequest( consumerConfig, (PollTabletsPayload) request.getPayload()); @@ -666,6 +713,16 @@ private TPipeSubscribeResp handlePipeSubscribeCommitInternal(final PipeSubscribe // commit (ack or nack) final List commitContexts = req.getCommitContexts(); + final TSStatus ownerStatus = + SubscriptionAgent.topic() + .checkTopicOwners( + consumerConfig, + commitContexts.stream() + .map(SubscriptionCommitContext::getTopicName) + .collect(Collectors.toSet())); + if (ownerStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return PipeSubscribeCommitResp.toTPipeSubscribeResp(ownerStatus); + } final boolean nack = req.isNack(); final List successfulCommitContexts = SubscriptionAgent.broker().commit(consumerConfig, commitContexts, nack); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1Test.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1Test.java index ba0070187e368..c28111ec9d110 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1Test.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1Test.java @@ -20,14 +20,19 @@ package org.apache.iotdb.db.subscription.receiver; import org.apache.iotdb.commons.subscription.config.SubscriptionConfig; +import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta; +import org.apache.iotdb.db.subscription.agent.SubscriptionAgent; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.rpc.subscription.config.ConsumerConfig; import org.apache.iotdb.rpc.subscription.config.ConsumerConstant; +import org.apache.iotdb.rpc.subscription.config.TopicConstant; import org.junit.Assert; import org.junit.Test; import java.lang.reflect.Field; import java.lang.reflect.Method; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -82,6 +87,76 @@ public void testCalculateConsumerInactivityTimeoutUsesHeartbeatMultiple() throws invokeCalculateConsumerInactivityTimeoutMs(receiver, createConsumerConfig(5_000L))); } + @Test + public void testTopicOwnerFencingStatus() { + final String topicName = "topic-" + UUID.randomUUID(); + + SubscriptionAgent.topic().handleSingleTopicMetaChanges(createTopicMeta(topicName, "sn1", 7L)); + try { + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + SubscriptionAgent.topic() + .checkTopicOwner(createConsumerConfig(1_000L, "sn1", 7L), topicName) + .getCode()); + Assert.assertEquals( + TSStatusCode.SUBSCRIPTION_OWNER_FENCED.getStatusCode(), + SubscriptionAgent.topic() + .checkTopicOwner(createConsumerConfig(1_000L, "sn2", 7L), topicName) + .getCode()); + Assert.assertEquals( + TSStatusCode.SUBSCRIPTION_OWNER_REQUIRED.getStatusCode(), + SubscriptionAgent.topic() + .checkTopicOwner(createConsumerConfig(1_000L), topicName) + .getCode()); + } finally { + SubscriptionAgent.topic().handleDropTopic(topicName); + } + } + + @Test + public void testOldOwnerFencedAfterNetworkPartitionAndTopicOwnerTransfer() { + final String topicName = "topic-" + UUID.randomUUID(); + final TopicMeta topicMeta = createTopicMeta(topicName, "sn1", 5L); + final ConsumerConfig oldSnConsumer = createConsumerConfig(1_000L, "sn1", 5L); + final ConsumerConfig newSnConsumer = createConsumerConfig(1_000L, "sn2", 6L); + + SubscriptionAgent.topic().handleSingleTopicMetaChanges(topicMeta); + try { + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + SubscriptionAgent.topic().checkTopicOwner(oldSnConsumer, topicName).getCode()); + + final TopicMeta transferredTopicMeta = topicMeta.deepCopy(); + transferredTopicMeta.transferOwner("sn2", 6L); + SubscriptionAgent.topic().handleSingleTopicMetaChanges(transferredTopicMeta); + + Assert.assertEquals( + TSStatusCode.SUBSCRIPTION_OWNER_FENCED.getStatusCode(), + SubscriptionAgent.topic().checkTopicOwner(oldSnConsumer, topicName).getCode()); + Assert.assertEquals( + TSStatusCode.SUBSCRIPTION_OWNER_FENCED.getStatusCode(), + SubscriptionAgent.topic() + .checkTopicOwners(oldSnConsumer, Collections.singleton(topicName)) + .getCode()); + Assert.assertNotNull( + SubscriptionAgent.topic() + .handleSingleTopicMetaChanges(createTopicMeta(topicName, "sn1", 5L))); + Assert.assertEquals( + TSStatusCode.SUBSCRIPTION_OWNER_FENCED.getStatusCode(), + SubscriptionAgent.topic().checkTopicOwner(oldSnConsumer, topicName).getCode()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + SubscriptionAgent.topic().checkTopicOwner(newSnConsumer, topicName).getCode()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + SubscriptionAgent.topic() + .checkTopicOwners(newSnConsumer, Collections.singleton(topicName)) + .getCode()); + } finally { + SubscriptionAgent.topic().handleDropTopic(topicName); + } + } + private long invokeCalculateConsumerInactivityTimeoutMs( final SubscriptionReceiverV1 receiver, final ConsumerConfig consumerConfig) throws Exception { final Method method = @@ -91,11 +166,30 @@ private long invokeCalculateConsumerInactivityTimeoutMs( return (long) method.invoke(receiver, consumerConfig); } + private TopicMeta createTopicMeta( + final String topicName, final String ownerId, final long ownerEpoch) { + final Map topicAttributes = new HashMap<>(); + topicAttributes.put(TopicConstant.OWNER_ID_KEY, ownerId); + topicAttributes.put(TopicConstant.OWNER_EPOCH_KEY, String.valueOf(ownerEpoch)); + return new TopicMeta(topicName, 1, topicAttributes); + } + private ConsumerConfig createConsumerConfig(final long heartbeatIntervalMs) { + return createConsumerConfig(heartbeatIntervalMs, null, null); + } + + private ConsumerConfig createConsumerConfig( + final long heartbeatIntervalMs, final String ownerId, final Long ownerEpoch) { final Map attributes = new HashMap<>(); attributes.put(ConsumerConstant.CONSUMER_ID_KEY, "consumer-" + UUID.randomUUID()); attributes.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "group-" + UUID.randomUUID()); attributes.put(ConsumerConstant.HEARTBEAT_INTERVAL_MS_KEY, String.valueOf(heartbeatIntervalMs)); + if (ownerId != null) { + attributes.put(ConsumerConstant.OWNER_ID_KEY, ownerId); + } + if (ownerEpoch != null) { + attributes.put(ConsumerConstant.OWNER_EPOCH_KEY, String.valueOf(ownerEpoch)); + } return new ConsumerConfig(attributes); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java index badb77d6f486d..44222a5a697fc 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.rpc.subscription.config.TopicConfig; +import org.apache.iotdb.rpc.subscription.config.TopicConstant; import org.apache.tsfile.utils.PublicBAOS; import org.apache.tsfile.utils.ReadWriteIOUtils; @@ -45,11 +46,18 @@ public class TopicMeta { private long creationTime; // unit in ms private TopicConfig config; + private String ownerId; + private long ownerEpoch; + private long ownerLastTransferTimeMs; + private Long ownerLeaseExpireTimeMs; + // TODO: remove this variable later private Set subscribedConsumerGroupIds; // unused now private TopicMeta() { this.config = new TopicConfig(new HashMap<>()); + this.ownerEpoch = -1L; + this.ownerLastTransferTimeMs = -1L; this.subscribedConsumerGroupIds = new HashSet<>(); } @@ -59,6 +67,9 @@ public TopicMeta( this.topicName = topicName; this.creationTime = creationTime; this.config = new TopicConfig(topicAttributes); + this.ownerEpoch = -1L; + this.ownerLastTransferTimeMs = -1L; + initOwnerFromTopicAttributes(topicAttributes); this.subscribedConsumerGroupIds = new HashSet<>(); } @@ -68,6 +79,10 @@ public TopicMeta deepCopy() { copied.topicName = topicName; copied.creationTime = creationTime; copied.config = new TopicConfig(new HashMap<>(config.getAttribute())); + copied.ownerId = ownerId; + copied.ownerEpoch = ownerEpoch; + copied.ownerLastTransferTimeMs = ownerLastTransferTimeMs; + copied.ownerLeaseExpireTimeMs = ownerLeaseExpireTimeMs; copied.subscribedConsumerGroupIds = new HashSet<>(subscribedConsumerGroupIds); return copied; @@ -85,6 +100,81 @@ public TopicConfig getConfig() { return config; } + public boolean isOwnerFencingEnabled() { + return Objects.nonNull(ownerId) && ownerEpoch >= 0; + } + + public String getOwnerId() { + return ownerId; + } + + public long getOwnerEpoch() { + return ownerEpoch; + } + + public long getOwnerLastTransferTimeMs() { + return ownerLastTransferTimeMs; + } + + public Long getOwnerLeaseExpireTimeMs() { + return ownerLeaseExpireTimeMs; + } + + public void transferOwner(final String ownerId, final long ownerEpoch) { + transferOwner(ownerId, ownerEpoch, null); + } + + public void transferOwner( + final String ownerId, final long ownerEpoch, final Long ownerLeaseExpireTimeMs) { + if (Objects.isNull(ownerId) || ownerId.isEmpty()) { + throw new IllegalArgumentException("Subscription topic owner id should not be empty"); + } + if (ownerEpoch < 0) { + throw new IllegalArgumentException("Subscription topic owner epoch should not be negative"); + } + if (isOwnerFencingEnabled() && ownerEpoch <= this.ownerEpoch) { + throw new IllegalArgumentException( + String.format( + "Subscription topic owner epoch should increase monotonically, current epoch is %s," + + " incoming epoch is %s", + this.ownerEpoch, ownerEpoch)); + } + + this.ownerId = ownerId; + this.ownerEpoch = ownerEpoch; + this.ownerLeaseExpireTimeMs = ownerLeaseExpireTimeMs; + this.ownerLastTransferTimeMs = System.currentTimeMillis(); + + config.getAttribute().put(TopicConstant.OWNER_ID_KEY, ownerId); + config.getAttribute().put(TopicConstant.OWNER_EPOCH_KEY, String.valueOf(ownerEpoch)); + if (Objects.nonNull(ownerLeaseExpireTimeMs)) { + config + .getAttribute() + .put( + TopicConstant.OWNER_LEASE_EXPIRE_TIME_MS_KEY, String.valueOf(ownerLeaseExpireTimeMs)); + } else { + config.getAttribute().remove(TopicConstant.OWNER_LEASE_EXPIRE_TIME_MS_KEY); + } + } + + public void clearOwner() { + ownerId = null; + ownerEpoch = -1L; + ownerLastTransferTimeMs = -1L; + ownerLeaseExpireTimeMs = null; + config.getAttribute().remove(TopicConstant.OWNER_ID_KEY); + config.getAttribute().remove(TopicConstant.OWNER_EPOCH_KEY); + config.getAttribute().remove(TopicConstant.OWNER_LEASE_EXPIRE_TIME_MS_KEY); + } + + public boolean matchesOwner(final String requestOwnerId, final Long requestOwnerEpoch) { + return !isOwnerFencingEnabled() + || (Objects.equals(ownerId, requestOwnerId) + && Objects.equals(ownerEpoch, requestOwnerEpoch) + && (Objects.isNull(ownerLeaseExpireTimeMs) + || System.currentTimeMillis() <= ownerLeaseExpireTimeMs)); + } + /** * @return true if the consumer group did not already subscribe this topic */ @@ -156,6 +246,8 @@ public static TopicMeta deserialize(final InputStream inputStream) throws IOExce topicMeta.subscribedConsumerGroupIds.add(ReadWriteIOUtils.readString(inputStream)); } + topicMeta.initOwnerFromTopicAttributes(topicMeta.config.getAttribute()); + return topicMeta; } @@ -177,9 +269,68 @@ public static TopicMeta deserialize(final ByteBuffer byteBuffer) { topicMeta.subscribedConsumerGroupIds.add(ReadWriteIOUtils.readString(byteBuffer)); } + topicMeta.initOwnerFromTopicAttributes(topicMeta.config.getAttribute()); + return topicMeta; } + public static void validateOwnerProgression( + final TopicMeta currentTopicMeta, final TopicMeta updatedTopicMeta) { + if (Objects.isNull(currentTopicMeta) + || Objects.isNull(updatedTopicMeta) + || !currentTopicMeta.isOwnerFencingEnabled()) { + return; + } + + if (!updatedTopicMeta.isOwnerFencingEnabled()) { + throw new IllegalArgumentException( + String.format( + "Subscription topic owner should not be cleared by stale topic meta, topic: %s," + + " current owner-id: %s, current owner-epoch: %s", + currentTopicMeta.getTopicName(), + currentTopicMeta.getOwnerId(), + currentTopicMeta.getOwnerEpoch())); + } + + final boolean epochRollback = + updatedTopicMeta.getOwnerEpoch() < currentTopicMeta.getOwnerEpoch(); + final boolean sameEpochOwnerChanged = + updatedTopicMeta.getOwnerEpoch() == currentTopicMeta.getOwnerEpoch() + && !Objects.equals(updatedTopicMeta.getOwnerId(), currentTopicMeta.getOwnerId()); + if (epochRollback || sameEpochOwnerChanged) { + throw new IllegalArgumentException( + String.format( + "Subscription topic owner epoch should not roll back, topic: %s, current owner-id:" + + " %s, current owner-epoch: %s, incoming owner-id: %s, incoming owner-epoch:" + + " %s", + currentTopicMeta.getTopicName(), + currentTopicMeta.getOwnerId(), + currentTopicMeta.getOwnerEpoch(), + updatedTopicMeta.getOwnerId(), + updatedTopicMeta.getOwnerEpoch())); + } + } + + private void initOwnerFromTopicAttributes(final Map topicAttributes) { + final TopicConfig topicConfig = new TopicConfig(topicAttributes); + final String configuredOwnerId = topicConfig.getString(TopicConstant.OWNER_ID_KEY); + if (Objects.isNull(configuredOwnerId)) { + return; + } + + final Long configuredOwnerEpoch = topicConfig.getLong(TopicConstant.OWNER_EPOCH_KEY); + if (Objects.isNull(configuredOwnerEpoch)) { + throw new IllegalArgumentException( + String.format( + "Subscription topic owner epoch should be set when %s is set", + TopicConstant.OWNER_ID_KEY)); + } + transferOwner( + configuredOwnerId, + configuredOwnerEpoch, + topicConfig.getLong(TopicConstant.OWNER_LEASE_EXPIRE_TIME_MS_KEY)); + } + /////////////////////////////// utilities /////////////////////////////// public Map generateExtractorAttributes(final String username) { @@ -257,12 +408,16 @@ public boolean equals(final Object obj) { final TopicMeta that = (TopicMeta) obj; return creationTime == that.creationTime && Objects.equals(topicName, that.topicName) - && Objects.equals(config, that.config); + && Objects.equals(config, that.config) + && Objects.equals(ownerId, that.ownerId) + && ownerEpoch == that.ownerEpoch + && Objects.equals(ownerLeaseExpireTimeMs, that.ownerLeaseExpireTimeMs); } @Override public int hashCode() { - return Objects.hash(topicName, creationTime, config); + return Objects.hash( + topicName, creationTime, config, ownerId, ownerEpoch, ownerLeaseExpireTimeMs); } @Override @@ -274,6 +429,14 @@ public String toString() { + creationTime + ", config=" + config + + ", ownerId='" + + ownerId + + "', ownerEpoch=" + + ownerEpoch + + ", ownerLastTransferTimeMs=" + + ownerLastTransferTimeMs + + ", ownerLeaseExpireTimeMs=" + + ownerLeaseExpireTimeMs + '}'; } } diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java index d9c280e14938c..0511edff6203e 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java @@ -20,10 +20,13 @@ package org.apache.iotdb.commons.subscription.topic; import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta; +import org.apache.iotdb.rpc.subscription.config.TopicConstant; import org.junit.Assert; import org.junit.Test; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.HashMap; @@ -55,6 +58,64 @@ public void test() throws IOException { topicMeta.getSubscribedConsumerGroupIds(), topicMeta2.getSubscribedConsumerGroupIds()); } + @Test + public void testTopicOwnerDeSer() throws IOException { + Map topicAttributes = new HashMap<>(); + topicAttributes.put(TopicConstant.OWNER_ID_KEY, "sn1"); + topicAttributes.put(TopicConstant.OWNER_EPOCH_KEY, "5"); + + TopicMeta topicMeta = new TopicMeta("test_topic", 1, topicAttributes); + + Assert.assertTrue(topicMeta.isOwnerFencingEnabled()); + Assert.assertEquals("sn1", topicMeta.getOwnerId()); + Assert.assertEquals(5L, topicMeta.getOwnerEpoch()); + Assert.assertTrue(topicMeta.matchesOwner("sn1", 5L)); + Assert.assertFalse(topicMeta.matchesOwner("sn2", 5L)); + Assert.assertFalse(topicMeta.matchesOwner("sn1", 4L)); + + TopicMeta topicMeta1 = TopicMeta.deserialize(topicMeta.serialize()); + TopicMeta topicMeta2 = topicMeta1.deepCopy(); + + Assert.assertEquals(topicMeta, topicMeta1); + Assert.assertEquals(topicMeta, topicMeta2); + Assert.assertEquals(topicMeta.getOwnerId(), topicMeta2.getOwnerId()); + Assert.assertEquals(topicMeta.getOwnerEpoch(), topicMeta2.getOwnerEpoch()); + Assert.assertEquals( + topicMeta.getOwnerLeaseExpireTimeMs(), topicMeta2.getOwnerLeaseExpireTimeMs()); + + topicMeta.transferOwner("sn2", 6L, 100L); + Assert.assertEquals("sn2", topicMeta.getOwnerId()); + Assert.assertEquals(6L, topicMeta.getOwnerEpoch()); + Assert.assertEquals("sn2", topicMeta.getConfig().getString(TopicConstant.OWNER_ID_KEY)); + Assert.assertEquals( + 6L, topicMeta.getConfig().getLong(TopicConstant.OWNER_EPOCH_KEY).longValue()); + Assert.assertEquals( + 100L, + topicMeta.getConfig().getLong(TopicConstant.OWNER_LEASE_EXPIRE_TIME_MS_KEY).longValue()); + + topicMeta.clearOwner(); + Assert.assertFalse(topicMeta.isOwnerFencingEnabled()); + Assert.assertFalse(topicMeta.getConfig().hasAttribute(TopicConstant.OWNER_ID_KEY)); + Assert.assertFalse(topicMeta.getConfig().hasAttribute(TopicConstant.OWNER_EPOCH_KEY)); + Assert.assertFalse( + topicMeta.getConfig().hasAttribute(TopicConstant.OWNER_LEASE_EXPIRE_TIME_MS_KEY)); + } + + @Test + public void testSequentialTopicMetaDeserializeDoesNotConsumeNextTopic() throws IOException { + final TopicMeta firstTopicMeta = new TopicMeta("first_topic", 1, new HashMap<>()); + final TopicMeta secondTopicMeta = new TopicMeta("second_topic", 2, new HashMap<>()); + + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + firstTopicMeta.serialize(outputStream); + secondTopicMeta.serialize(outputStream); + + final ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray()); + Assert.assertEquals(firstTopicMeta, TopicMeta.deserialize(inputStream)); + Assert.assertEquals(secondTopicMeta, TopicMeta.deserialize(inputStream)); + Assert.assertEquals(0, inputStream.available()); + } + @Test public void testGenerateExtractorAttributesWithEncryptedPassword() { final TopicMeta topicMeta = new TopicMeta("test_topic", 1, new HashMap<>()); From 0ea029f71a490f4f2854bf734cef2f9292cf1780 Mon Sep 17 00:00:00 2001 From: OpenAI Codex Date: Thu, 4 Jun 2026 05:16:21 +0000 Subject: [PATCH 2/3] Implement alter topic owner entrypoints --- .../AbstractSubscriptionSession.java | 82 +++++++++++++---- .../ISubscriptionTableSession.java | 43 +++++++++ .../ISubscriptionTreeSession.java | 43 +++++++++ .../SubscriptionTableSession.java | 22 +++++ .../subscription/SubscriptionTreeSession.java | 22 +++++ .../apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 6 +- .../confignode/manager/ConfigManager.java | 9 ++ .../iotdb/confignode/manager/IManager.java | 4 + .../confignode/manager/ProcedureManager.java | 32 +++++++ .../subscription/SubscriptionCoordinator.java | 21 +++++ .../thrift/ConfigNodeRPCServiceProcessor.java | 6 ++ .../subscription/SubscriptionInfoTest.java | 26 ++++++ .../db/protocol/client/ConfigNodeClient.java | 7 ++ .../config/TableConfigTaskVisitor.java | 13 +++ .../config/TreeConfigTaskVisitor.java | 12 +++ .../executor/ClusterConfigTaskExecutor.java | 30 ++++++ .../config/executor/IConfigTaskExecutor.java | 3 + .../sys/subscription/AlterTopicTask.java | 49 ++++++++++ .../queryengine/plan/parser/ASTVisitor.java | 18 ++++ .../analyzer/StatementAnalyzer.java | 6 ++ .../security/TreeAccessCheckVisitor.java | 7 ++ .../plan/relational/sql/ast/AlterTopic.java | 92 +++++++++++++++++++ .../plan/relational/sql/ast/AstVisitor.java | 4 + .../relational/sql/parser/AstBuilder.java | 8 ++ .../sql/util/DataNodeSqlFormatter.java | 26 ++++++ .../plan/statement/StatementType.java | 1 + .../plan/statement/StatementVisitor.java | 5 + .../subscription/AlterTopicStatement.java | 73 +++++++++++++++ .../plan/parser/StatementGeneratorTest.java | 15 +++ .../sql/parser/TopicStatementTest.java | 48 ++++++++++ .../subscription/meta/topic/TopicMeta.java | 11 +++ .../subscription/topic/TopicDeSerTest.java | 23 +++++ .../relational/grammar/sql/RelationalSql.g4 | 5 + .../src/main/thrift/confignode.thrift | 4 +- 34 files changed, 758 insertions(+), 18 deletions(-) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/AlterTopicTask.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AlterTopic.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/AlterTopicStatement.java create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/TopicStatementTest.java diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/AbstractSubscriptionSession.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/AbstractSubscriptionSession.java index 6683a1c5adf2e..5bddd082e4e78 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/AbstractSubscriptionSession.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/AbstractSubscriptionSession.java @@ -22,6 +22,7 @@ import org.apache.iotdb.isession.SessionDataSet; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.rpc.subscription.config.TopicConstant; import org.apache.iotdb.rpc.subscription.exception.SubscriptionException; import org.apache.iotdb.session.subscription.model.Subscription; import org.apache.iotdb.session.subscription.model.Topic; @@ -103,27 +104,53 @@ private void createTopic( } return; } - final StringBuilder sb = new StringBuilder(); - sb.append('('); - properties.forEach( - (k, v) -> - sb.append('\'') - .append(k) - .append('\'') - .append('=') - .append('\'') - .append(v) - .append('\'') - .append(',')); - sb.deleteCharAt(sb.length() - 1); - sb.append(')'); final String sql = isSetIfNotExistsCondition - ? String.format("CREATE TOPIC IF NOT EXISTS %s WITH %s", topicName, sb) - : String.format("CREATE TOPIC %s WITH %s", topicName, sb); + ? String.format( + "CREATE TOPIC IF NOT EXISTS %s WITH %s", + topicName, buildTopicAttributesClause(properties)) + : String.format( + "CREATE TOPIC %s WITH %s", topicName, buildTopicAttributesClause(properties)); session.executeNonQueryStatement(sql); } + protected void alterTopic(final String topicName, final Properties properties) + throws IoTDBConnectionException, StatementExecutionException { + IdentifierUtils.checkAndParseIdentifier(topicName); // ignore the parse result + if (Objects.isNull(properties) || properties.isEmpty()) { + throw new StatementExecutionException("Topic attributes should not be empty in ALTER TOPIC."); + } + final String sql = + String.format("ALTER TOPIC %s WITH %s", topicName, buildTopicAttributesClause(properties)); + session.executeNonQueryStatement(sql); + } + + protected void alterTopicOwner( + final String topicName, final String ownerId, final long ownerEpoch) + throws IoTDBConnectionException, StatementExecutionException { + alterTopicOwner(topicName, ownerId, ownerEpoch, null); + } + + protected void alterTopicOwner( + final String topicName, + final String ownerId, + final long ownerEpoch, + final Long ownerLeaseExpireTimeMs) + throws IoTDBConnectionException, StatementExecutionException { + if (Objects.isNull(ownerId) || ownerId.isEmpty()) { + throw new StatementExecutionException("Topic owner id should not be empty."); + } + + final Properties properties = new Properties(); + properties.put(TopicConstant.OWNER_ID_KEY, ownerId); + properties.put(TopicConstant.OWNER_EPOCH_KEY, String.valueOf(ownerEpoch)); + if (Objects.nonNull(ownerLeaseExpireTimeMs)) { + properties.put( + TopicConstant.OWNER_LEASE_EXPIRE_TIME_MS_KEY, String.valueOf(ownerLeaseExpireTimeMs)); + } + alterTopic(topicName, properties); + } + protected void dropTopic(final String topicName) throws IoTDBConnectionException, StatementExecutionException { IdentifierUtils.checkAndParseIdentifier(topicName); // ignore the parse result @@ -231,4 +258,27 @@ private Set convertDataSetToSubscriptions(final SessionDataSet dat } return subscriptions; } + + private static String buildTopicAttributesClause(final Properties properties) { + final StringBuilder builder = new StringBuilder(); + builder.append('('); + properties.forEach( + (key, value) -> + builder + .append('\'') + .append(escapeSqlStringLiteral(String.valueOf(key))) + .append('\'') + .append('=') + .append('\'') + .append(escapeSqlStringLiteral(String.valueOf(value))) + .append('\'') + .append(',')); + builder.deleteCharAt(builder.length() - 1); + builder.append(')'); + return builder.toString(); + } + + private static String escapeSqlStringLiteral(final String value) { + return value.replace("'", "''"); + } } diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/ISubscriptionTableSession.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/ISubscriptionTableSession.java index 2dac0f5106b96..2ec7471127add 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/ISubscriptionTableSession.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/ISubscriptionTableSession.java @@ -99,6 +99,49 @@ void createTopic(final String topicName, final Properties properties) void createTopicIfNotExists(final String topicName, final Properties properties) throws IoTDBConnectionException, StatementExecutionException; + /** + * Alters a topic with the specified properties. + * + * @param topicName If the topic name contains single quotes, the passed parameter needs to be + * enclosed in backticks. + * @param properties A {@link Properties} object containing the topic properties to alter. + * @throws IoTDBConnectionException If a connection issue occurs. + * @throws StatementExecutionException If the SQL statement execution fails. + */ + void alterTopic(final String topicName, final Properties properties) + throws IoTDBConnectionException, StatementExecutionException; + + /** + * Transfers a topic owner to the specified owner id and epoch. + * + * @param topicName If the topic name contains single quotes, the passed parameter needs to be + * enclosed in backticks. + * @param ownerId The new topic owner id. + * @param ownerEpoch The new monotonically increasing topic owner epoch. + * @throws IoTDBConnectionException If a connection issue occurs. + * @throws StatementExecutionException If the SQL statement execution fails. + */ + void alterTopicOwner(final String topicName, final String ownerId, final long ownerEpoch) + throws IoTDBConnectionException, StatementExecutionException; + + /** + * Transfers a topic owner to the specified owner id, epoch, and lease expire time. + * + * @param topicName If the topic name contains single quotes, the passed parameter needs to be + * enclosed in backticks. + * @param ownerId The new topic owner id. + * @param ownerEpoch The new monotonically increasing topic owner epoch. + * @param ownerLeaseExpireTimeMs The owner lease expire time in milliseconds. + * @throws IoTDBConnectionException If a connection issue occurs. + * @throws StatementExecutionException If the SQL statement execution fails. + */ + void alterTopicOwner( + final String topicName, + final String ownerId, + final long ownerEpoch, + final Long ownerLeaseExpireTimeMs) + throws IoTDBConnectionException, StatementExecutionException; + /** * Drops the specified topic. * diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/ISubscriptionTreeSession.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/ISubscriptionTreeSession.java index 799314769e8eb..7a827a207e843 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/ISubscriptionTreeSession.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/ISubscriptionTreeSession.java @@ -99,6 +99,49 @@ void createTopic(final String topicName, final Properties properties) void createTopicIfNotExists(final String topicName, final Properties properties) throws IoTDBConnectionException, StatementExecutionException; + /** + * Alters a topic with the specified properties. + * + * @param topicName If the topic name contains single quotes, the passed parameter needs to be + * enclosed in backticks. + * @param properties A {@link Properties} object containing the topic properties to alter. + * @throws IoTDBConnectionException If a connection issue occurs. + * @throws StatementExecutionException If the SQL statement execution fails. + */ + void alterTopic(final String topicName, final Properties properties) + throws IoTDBConnectionException, StatementExecutionException; + + /** + * Transfers a topic owner to the specified owner id and epoch. + * + * @param topicName If the topic name contains single quotes, the passed parameter needs to be + * enclosed in backticks. + * @param ownerId The new topic owner id. + * @param ownerEpoch The new monotonically increasing topic owner epoch. + * @throws IoTDBConnectionException If a connection issue occurs. + * @throws StatementExecutionException If the SQL statement execution fails. + */ + void alterTopicOwner(final String topicName, final String ownerId, final long ownerEpoch) + throws IoTDBConnectionException, StatementExecutionException; + + /** + * Transfers a topic owner to the specified owner id, epoch, and lease expire time. + * + * @param topicName If the topic name contains single quotes, the passed parameter needs to be + * enclosed in backticks. + * @param ownerId The new topic owner id. + * @param ownerEpoch The new monotonically increasing topic owner epoch. + * @param ownerLeaseExpireTimeMs The owner lease expire time in milliseconds. + * @throws IoTDBConnectionException If a connection issue occurs. + * @throws StatementExecutionException If the SQL statement execution fails. + */ + void alterTopicOwner( + final String topicName, + final String ownerId, + final long ownerEpoch, + final Long ownerLeaseExpireTimeMs) + throws IoTDBConnectionException, StatementExecutionException; + /** * Drops the specified topic. * diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTableSession.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTableSession.java index bf21a43b26f81..b666297052d83 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTableSession.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTableSession.java @@ -74,6 +74,28 @@ public void createTopicIfNotExists(final String topicName, final Properties prop super.createTopicIfNotExists(topicName, properties); } + @Override + public void alterTopic(final String topicName, final Properties properties) + throws IoTDBConnectionException, StatementExecutionException { + super.alterTopic(topicName, properties); + } + + @Override + public void alterTopicOwner(final String topicName, final String ownerId, final long ownerEpoch) + throws IoTDBConnectionException, StatementExecutionException { + super.alterTopicOwner(topicName, ownerId, ownerEpoch); + } + + @Override + public void alterTopicOwner( + final String topicName, + final String ownerId, + final long ownerEpoch, + final Long ownerLeaseExpireTimeMs) + throws IoTDBConnectionException, StatementExecutionException { + super.alterTopicOwner(topicName, ownerId, ownerEpoch, ownerLeaseExpireTimeMs); + } + @Override public void dropTopic(final String topicName) throws IoTDBConnectionException, StatementExecutionException { diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTreeSession.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTreeSession.java index 900a243434d1c..3b921c5270a3e 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTreeSession.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTreeSession.java @@ -97,6 +97,28 @@ public void createTopicIfNotExists(final String topicName, final Properties prop super.createTopicIfNotExists(topicName, properties); } + @Override + public void alterTopic(final String topicName, final Properties properties) + throws IoTDBConnectionException, StatementExecutionException { + super.alterTopic(topicName, properties); + } + + @Override + public void alterTopicOwner(final String topicName, final String ownerId, final long ownerEpoch) + throws IoTDBConnectionException, StatementExecutionException { + super.alterTopicOwner(topicName, ownerId, ownerEpoch); + } + + @Override + public void alterTopicOwner( + final String topicName, + final String ownerId, + final long ownerEpoch, + final Long ownerLeaseExpireTimeMs) + throws IoTDBConnectionException, StatementExecutionException { + super.alterTopicOwner(topicName, ownerId, ownerEpoch, ownerLeaseExpireTimeMs); + } + @Override public void dropTopic(final String topicName) throws IoTDBConnectionException, StatementExecutionException { diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 index c6271c134cb00..186675d88a3c9 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 @@ -59,7 +59,7 @@ ddlStatement // Pipe Plugin | createPipePlugin | dropPipePlugin | showPipePlugins // Subscription - | createTopic | dropTopic | showTopics | showSubscriptions | dropSubscription + | createTopic | alterTopic | dropTopic | showTopics | showSubscriptions | dropSubscription // CQ | createContinuousQuery | dropContinuousQuery | showContinuousQueries // Cluster @@ -718,6 +718,10 @@ createTopic : CREATE TOPIC (IF NOT EXISTS)? topicName=identifier topicAttributesClause? ; +alterTopic + : ALTER TOPIC topicName=identifier topicAttributesClause + ; + topicAttributesClause : WITH LR_BRACKET topicAttributeClause (COMMA topicAttributeClause)* RR_BRACKET ; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 2db0255e35ac4..9e59bb9e51565 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -149,6 +149,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TAlterSchemaTemplateReq; import org.apache.iotdb.confignode.rpc.thrift.TAlterTimeSeriesReq; +import org.apache.iotdb.confignode.rpc.thrift.TAlterTopicReq; import org.apache.iotdb.confignode.rpc.thrift.TAuthizedPatternTreeResp; import org.apache.iotdb.confignode.rpc.thrift.TCloseConsumerReq; import org.apache.iotdb.confignode.rpc.thrift.TClusterParameters; @@ -2459,6 +2460,14 @@ public TSStatus createTopic(TCreateTopicReq req) { : status; } + @Override + public TSStatus alterTopic(TAlterTopicReq req) { + TSStatus status = confirmLeader(); + return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() + ? subscriptionManager.getSubscriptionCoordinator().alterTopic(req) + : status; + } + @Override public TSStatus dropTopic(TDropTopicReq req) { TSStatus status = confirmLeader(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java index 4dce39a9e98f0..fa708dbaeca83 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java @@ -70,6 +70,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TAlterSchemaTemplateReq; import org.apache.iotdb.confignode.rpc.thrift.TAlterTimeSeriesReq; +import org.apache.iotdb.confignode.rpc.thrift.TAlterTopicReq; import org.apache.iotdb.confignode.rpc.thrift.TCloseConsumerReq; import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq; import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp; @@ -803,6 +804,9 @@ TPermissionInfoResp login( /** Create Topic. */ TSStatus createTopic(TCreateTopicReq topic); + /** Alter Topic. */ + TSStatus alterTopic(TAlterTopicReq req); + /** Drop Topic. */ TSStatus dropTopic(TDropTopicReq req); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index 3f0ba82fa768c..78e43ce8f470a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java @@ -42,6 +42,7 @@ import org.apache.iotdb.commons.schema.template.Template; import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression; import org.apache.iotdb.commons.service.metric.MetricService; +import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta; import org.apache.iotdb.commons.trigger.TriggerInformation; import org.apache.iotdb.commons.utils.StatusUtils; import org.apache.iotdb.commons.utils.TestOnly; @@ -119,6 +120,7 @@ import org.apache.iotdb.confignode.procedure.impl.subscription.consumer.runtime.ConsumerGroupMetaSyncProcedure; import org.apache.iotdb.confignode.procedure.impl.subscription.subscription.CreateSubscriptionProcedure; import org.apache.iotdb.confignode.procedure.impl.subscription.subscription.DropSubscriptionProcedure; +import org.apache.iotdb.confignode.procedure.impl.subscription.topic.AlterTopicProcedure; import org.apache.iotdb.confignode.procedure.impl.subscription.topic.CreateTopicProcedure; import org.apache.iotdb.confignode.procedure.impl.subscription.topic.DropTopicProcedure; import org.apache.iotdb.confignode.procedure.impl.subscription.topic.runtime.TopicMetaSyncProcedure; @@ -136,6 +138,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TAlterLogicalViewReq; import org.apache.iotdb.confignode.rpc.thrift.TAlterOrDropTableReq; import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq; +import org.apache.iotdb.confignode.rpc.thrift.TAlterTopicReq; import org.apache.iotdb.confignode.rpc.thrift.TCloseConsumerReq; import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq; import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq; @@ -1733,6 +1736,35 @@ public TSStatus createTopic(TCreateTopicReq req) { } } + public TSStatus alterTopic(TAlterTopicReq req) { + try { + final TopicMeta updatedTopicMeta = + configManager + .getSubscriptionManager() + .getSubscriptionCoordinator() + .buildAlteredTopicMeta(req); + if (updatedTopicMeta == null) { + return new TSStatus(TSStatusCode.ALTER_TOPIC_ERROR.getStatusCode()) + .setMessage( + String.format( + "Failed to alter topic %s, the topic is not existed", req.getTopicName())); + } + + AlterTopicProcedure procedure = new AlterTopicProcedure(updatedTopicMeta); + executor.submitProcedure(procedure); + TSStatus status = waitingProcedureFinished(procedure); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; + } else { + return new TSStatus(TSStatusCode.ALTER_TOPIC_ERROR.getStatusCode()) + .setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage())); + } + } catch (Exception e) { + return new TSStatus(TSStatusCode.ALTER_TOPIC_ERROR.getStatusCode()) + .setMessage(e.getMessage()); + } + } + public TSStatus dropTopic(String topicName) { try { DropTopicProcedure procedure = new DropTopicProcedure(topicName); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java index 83031570fa61e..5f0cabfff8a53 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java @@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.manager.subscription; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta; import org.apache.iotdb.confignode.consensus.request.read.subscription.ShowSubscriptionPlan; import org.apache.iotdb.confignode.consensus.request.read.subscription.ShowTopicPlan; import org.apache.iotdb.confignode.consensus.response.subscription.SubscriptionTableResp; @@ -28,6 +29,7 @@ import org.apache.iotdb.confignode.manager.ConfigManager; import org.apache.iotdb.confignode.manager.pipe.coordinator.task.PipeTaskCoordinatorLock; import org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo; +import org.apache.iotdb.confignode.rpc.thrift.TAlterTopicReq; import org.apache.iotdb.confignode.rpc.thrift.TCloseConsumerReq; import org.apache.iotdb.confignode.rpc.thrift.TCreateConsumerReq; import org.apache.iotdb.confignode.rpc.thrift.TCreateTopicReq; @@ -149,6 +151,25 @@ public TSStatus createTopic(TCreateTopicReq req) { return status; } + public TSStatus alterTopic(TAlterTopicReq req) { + final TSStatus status = configManager.getProcedureManager().alterTopic(req); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOGGER.warn( + "Failed to alter topic {} with attributes {}, result status is {}.", + req.getTopicName(), + req.getTopicAttributes(), + status); + } + return status; + } + + public TopicMeta buildAlteredTopicMeta(TAlterTopicReq req) { + final TopicMeta existedTopicMeta = subscriptionInfo.deepCopyTopicMeta(req.getTopicName()); + return existedTopicMeta == null + ? null + : existedTopicMeta.deepCopyWithUpdatedAttributes(req.getTopicAttributes()); + } + public TSStatus dropTopic(TDropTopicReq req) { final String topicName = req.getTopicName(); final boolean isSetIfExistsCondition = diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index dbb35839045d0..4a4c4f59e2161 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@ -102,6 +102,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TAlterSchemaTemplateReq; import org.apache.iotdb.confignode.rpc.thrift.TAlterTimeSeriesReq; +import org.apache.iotdb.confignode.rpc.thrift.TAlterTopicReq; import org.apache.iotdb.confignode.rpc.thrift.TAuthizedPatternTreeResp; import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerRelationalReq; import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq; @@ -1268,6 +1269,11 @@ public TSStatus createTopic(TCreateTopicReq req) { return configManager.createTopic(req); } + @Override + public TSStatus alterTopic(TAlterTopicReq req) { + return configManager.alterTopic(req); + } + @Override public TSStatus dropTopic(String topicName) { return configManager.dropTopic( diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfoTest.java index 768eab6ea50d8..d78fe77af9d86 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfoTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfoTest.java @@ -60,6 +60,32 @@ public void testAlterTopicRejectsOwnerEpochRollback() { Assert.assertEquals(6L, subscriptionInfo.getTopicMeta(topicName).getOwnerEpoch()); } + @Test + public void testAlterTopicTransfersOwnerWithUpdatedAttributes() { + final String topicName = "topic-" + UUID.randomUUID(); + final SubscriptionInfo subscriptionInfo = new SubscriptionInfo(); + + final TopicMeta initialTopicMeta = createTopicMeta(topicName, "sn1", 5L); + initialTopicMeta.getConfig().getAttribute().put(TopicConstant.PATH_KEY, "root.sg.**"); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + subscriptionInfo.createTopic(new CreateTopicPlan(initialTopicMeta)).getCode()); + + final Map updatedAttributes = new HashMap<>(); + updatedAttributes.put(TopicConstant.OWNER_ID_KEY, "sn2"); + updatedAttributes.put(TopicConstant.OWNER_EPOCH_KEY, "6"); + final TSStatus alterStatus = + subscriptionInfo.alterTopic( + new AlterTopicPlan(initialTopicMeta.deepCopyWithUpdatedAttributes(updatedAttributes))); + + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), alterStatus.getCode()); + Assert.assertEquals("sn2", subscriptionInfo.getTopicMeta(topicName).getOwnerId()); + Assert.assertEquals(6L, subscriptionInfo.getTopicMeta(topicName).getOwnerEpoch()); + Assert.assertEquals( + "root.sg.**", + subscriptionInfo.getTopicMeta(topicName).getConfig().getString(TopicConstant.PATH_KEY)); + } + private TopicMeta createTopicMeta( final String topicName, final String ownerId, final long ownerEpoch) { final Map topicAttributes = new HashMap<>(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java index 1f0b09f0b8689..7685864080164 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java @@ -59,6 +59,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TAlterSchemaTemplateReq; import org.apache.iotdb.confignode.rpc.thrift.TAlterTimeSeriesReq; +import org.apache.iotdb.confignode.rpc.thrift.TAlterTopicReq; import org.apache.iotdb.confignode.rpc.thrift.TAuthizedPatternTreeResp; import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerRelationalReq; import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq; @@ -1207,6 +1208,12 @@ public TSStatus createTopic(TCreateTopicReq req) throws TException { () -> client.createTopic(req), status -> !updateConfigNodeLeader(status)); } + @Override + public TSStatus alterTopic(TAlterTopicReq req) throws TException { + return executeRemoteCallWithRetry( + () -> client.alterTopic(req), status -> !updateConfigNodeLeader(status)); + } + @Override public TSStatus dropTopic(String topicName) throws TException { return executeRemoteCallWithRetry( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java index 827590e86a011..4eff93bfc12d2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java @@ -140,6 +140,7 @@ import org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe.ShowPipeTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe.StartPipeTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe.StopPipeTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.AlterTopicTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.CreateTopicTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.DropSubscriptionTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.DropTopicTask; @@ -154,6 +155,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AlterColumnDataType; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AlterDB; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AlterPipe; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AlterTopic; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ClearCache; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ColumnDefinition; @@ -1392,6 +1394,17 @@ public IConfigTask visitCreateTopic(CreateTopic node, MPPQueryContext context) { return new CreateTopicTask(node); } + @Override + public IConfigTask visitAlterTopic(AlterTopic node, MPPQueryContext context) { + context.setQueryType(QueryType.OTHER); + accessControl.checkUserGlobalSysPrivilege(context); + + node.getTopicAttributes() + .put(SystemConstant.SQL_DIALECT_KEY, SystemConstant.SQL_DIALECT_TABLE_VALUE); + + return new AlterTopicTask(node); + } + @Override public IConfigTask visitDropTopic(DropTopic node, MPPQueryContext context) { context.setQueryType(QueryType.OTHER); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java index 8b7792bca985a..28745712d220c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java @@ -123,6 +123,7 @@ import org.apache.iotdb.db.queryengine.plan.execution.config.sys.quota.SetThrottleQuotaTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.quota.ShowSpaceQuotaTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.quota.ShowThrottleQuotaTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.AlterTopicTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.CreateTopicTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.DropSubscriptionTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.DropTopicTask; @@ -192,6 +193,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.region.MigrateRegionStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.region.ReconstructRegionStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.region.RemoveRegionStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.AlterTopicStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.CreateTopicStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropSubscriptionStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropTopicStatement; @@ -730,6 +732,16 @@ public IConfigTask visitCreateTopic( return new CreateTopicTask(createTopicStatement); } + @Override + public IConfigTask visitAlterTopic( + AlterTopicStatement alterTopicStatement, MPPQueryContext context) { + alterTopicStatement + .getTopicAttributes() + .put(SystemConstant.SQL_DIALECT_KEY, SystemConstant.SQL_DIALECT_TREE_VALUE); + + return new AlterTopicTask(alterTopicStatement); + } + @Override public IConfigTask visitDropTopic( DropTopicStatement dropTopicStatement, MPPQueryContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index 15a538b98b42b..59f50d8d6cb69 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -106,6 +106,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TAlterSchemaTemplateReq; import org.apache.iotdb.confignode.rpc.thrift.TAlterTimeSeriesReq; +import org.apache.iotdb.confignode.rpc.thrift.TAlterTopicReq; import org.apache.iotdb.confignode.rpc.thrift.TCountDatabaseResp; import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListReq; import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListResp; @@ -291,6 +292,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.ShowPipesStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StartPipeStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StopPipeStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.AlterTopicStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.CreateTopicStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropSubscriptionStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropTopicStatement; @@ -2879,6 +2881,34 @@ public SettableFuture createTopic( return future; } + @Override + public SettableFuture alterTopic( + final AlterTopicStatement alterTopicStatement) { + if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) { + return SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE; + } + + final SettableFuture future = SettableFuture.create(); + + try (final ConfigNodeClient configNodeClient = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + final TSStatus tsStatus = + configNodeClient.alterTopic( + new TAlterTopicReq() + .setTopicName(alterTopicStatement.getTopicName()) + .setTopicAttributes(alterTopicStatement.getTopicAttributes()) + .setSubscribedConsumerGroupIds(Collections.emptySet())); + if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { + future.setException(new IoTDBException(tsStatus)); + } else { + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } + } catch (Exception e) { + future.setException(e); + } + return future; + } + @Override public SettableFuture dropTopic(final DropTopicStatement dropTopicStatement) { if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java index b45209aeffed8..4e551983d735d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java @@ -76,6 +76,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.ShowPipesStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StartPipeStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StopPipeStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.AlterTopicStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.CreateTopicStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropSubscriptionStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropTopicStatement; @@ -243,6 +244,8 @@ SettableFuture dropSubscription( SettableFuture createTopic(CreateTopicStatement createTopicStatement); + SettableFuture alterTopic(AlterTopicStatement alterTopicStatement); + SettableFuture dropTopic(DropTopicStatement dropTopicStatement); SettableFuture showTopics(ShowTopicsStatement showTopicsStatement); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/AlterTopicTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/AlterTopicTask.java new file mode 100644 index 0000000000000..33ae342267be2 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/AlterTopicTask.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription; + +import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; +import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AlterTopic; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.AlterTopicStatement; + +import com.google.common.util.concurrent.ListenableFuture; + +public class AlterTopicTask implements IConfigTask { + + private final AlterTopicStatement alterTopicStatement; + + public AlterTopicTask(final AlterTopicStatement alterTopicStatement) { + this.alterTopicStatement = alterTopicStatement; + } + + public AlterTopicTask(final AlterTopic alterTopic) { + this.alterTopicStatement = new AlterTopicStatement(); + this.alterTopicStatement.setTopicName(alterTopic.getTopicName()); + this.alterTopicStatement.setTopicAttributes(alterTopic.getTopicAttributes()); + } + + @Override + public ListenableFuture execute(final IConfigTaskExecutor configTaskExecutor) + throws InterruptedException { + return configTaskExecutor.alterTopic(alterTopicStatement); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java index 4a82438b5deec..1df09d9641142 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java @@ -214,6 +214,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.region.MigrateRegionStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.region.ReconstructRegionStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.region.RemoveRegionStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.AlterTopicStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.CreateTopicStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropSubscriptionStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropTopicStatement; @@ -4425,6 +4426,23 @@ public Statement visitCreateTopic(IoTDBSqlParser.CreateTopicContext ctx) { return createTopicStatement; } + @Override + public Statement visitAlterTopic(IoTDBSqlParser.AlterTopicContext ctx) { + final AlterTopicStatement alterTopicStatement = new AlterTopicStatement(); + + if (ctx.topicName != null) { + alterTopicStatement.setTopicName(parseIdentifier(ctx.topicName.getText())); + } else { + throw new SemanticException( + "Not support for this sql in ALTER TOPIC, please enter topicName."); + } + + alterTopicStatement.setTopicAttributes( + parseTopicAttributesClause(ctx.topicAttributesClause().topicAttributeClause())); + + return alterTopicStatement; + } + private Map parseTopicAttributesClause( List contexts) { final Map collectorMap = new HashMap<>(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java index d0779ee24a5b8..ac1e8dfc473bf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java @@ -147,6 +147,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AddColumn; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AlterDB; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AlterPipe; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AlterTopic; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AsofJoinOn; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CopyTo; @@ -4837,6 +4838,11 @@ public Scope visitCreateTopic(CreateTopic node, Optional context) { return createAndAssignScope(node, context); } + @Override + public Scope visitAlterTopic(AlterTopic node, Optional context) { + return createAndAssignScope(node, context); + } + @Override public Scope visitDropTopic(DropTopic node, Optional context) { return createAndAssignScope(node, context); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java index 68658529e71c4..49544f28d1080 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java @@ -120,6 +120,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.region.MigrateRegionStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.region.ReconstructRegionStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.region.RemoveRegionStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.AlterTopicStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.CreateTopicStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropSubscriptionStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropTopicStatement; @@ -882,6 +883,12 @@ public TSStatus visitCreateTopic(CreateTopicStatement statement, TreeAccessCheck context.setAuditLogOperation(AuditLogOperation.DDL), statement::getTopicName); } + @Override + public TSStatus visitAlterTopic(AlterTopicStatement statement, TreeAccessCheckContext context) { + return checkPipeManagement( + context.setAuditLogOperation(AuditLogOperation.DDL), statement::getTopicName); + } + @Override public TSStatus visitShowTopics(ShowTopicsStatement statement, TreeAccessCheckContext context) { return checkPipeManagement( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AlterTopic.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AlterTopic.java new file mode 100644 index 0000000000000..8598c1525266d --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AlterTopic.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.sql.ast; + +import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.AstMemoryEstimationHelper; +import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.IAstVisitor; + +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.Map; +import java.util.Objects; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static java.util.Objects.requireNonNull; + +public class AlterTopic extends SubscriptionStatement { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(AlterTopic.class); + + private final String topicName; + private final Map topicAttributes; + + public AlterTopic(final String topicName, final Map topicAttributes) { + this.topicName = requireNonNull(topicName, "topic name can not be null"); + this.topicAttributes = requireNonNull(topicAttributes, "topic attributes can not be null"); + } + + public String getTopicName() { + return topicName; + } + + public Map getTopicAttributes() { + return topicAttributes; + } + + @Override + public R accept(final IAstVisitor visitor, final C context) { + return ((AstVisitor) visitor).visitAlterTopic(this, context); + } + + @Override + public int hashCode() { + return Objects.hash(topicName, topicAttributes); + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + final AlterTopic that = (AlterTopic) obj; + return Objects.equals(this.topicName, that.topicName) + && Objects.equals(this.topicAttributes, that.topicAttributes); + } + + @Override + public String toString() { + return toStringHelper(this) + .add("topicName", topicName) + .add("topicAttributes", topicAttributes) + .toString(); + } + + @Override + public long ramBytesUsed() { + long size = INSTANCE_SIZE; + size += AstMemoryEstimationHelper.getEstimatedSizeOfNodeLocation(getLocationInternal()); + size += RamUsageEstimator.sizeOf(topicName); + size += RamUsageEstimator.sizeOfMap(topicAttributes); + return size; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java index 7c311fa7f62b6..c00b7f4d492a8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java @@ -343,6 +343,10 @@ default R visitCreateTopic(CreateTopic node, C context) { return visitStatement(node, context); } + default R visitAlterTopic(AlterTopic node, C context) { + return visitStatement(node, context); + } + default R visitDropTopic(DropTopic node, C context) { return visitStatement(node, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java index 25d68eba6beb5..30b61967e1a10 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java @@ -158,6 +158,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AlterColumnDataType; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AlterDB; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AlterPipe; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AlterTopic; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AsofJoinOn; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ClearCache; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ColumnDefinition; @@ -1390,6 +1391,13 @@ public Node visitCreateTopicStatement(RelationalSqlParser.CreateTopicStatementCo return new CreateTopic(topicName, hasIfNotExistsCondition, topicAttributes); } + @Override + public Node visitAlterTopicStatement(RelationalSqlParser.AlterTopicStatementContext ctx) { + final String topicName = ((Identifier) visit(ctx.identifier())).getValue(); + return new AlterTopic( + topicName, parseTopicAttributesClause(ctx.topicAttributesClause().topicAttributeClause())); + } + private Map parseTopicAttributesClause( List contexts) { final Map tppicMap = new HashMap<>(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/DataNodeSqlFormatter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/DataNodeSqlFormatter.java index b806fd725928e..887a6082a5d5d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/DataNodeSqlFormatter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/DataNodeSqlFormatter.java @@ -29,6 +29,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AddColumn; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AlterDB; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AlterPipe; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AlterTopic; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ColumnDefinition; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CopyTo; @@ -771,6 +772,31 @@ public Void visitCreateTopic(CreateTopic node, Integer context) { return null; } + @Override + public Void visitAlterTopic(AlterTopic node, Integer context) { + builder.append("ALTER TOPIC "); + builder.append(node.getTopicName()); + builder.append(" \n"); + + builder + .append("WITH (") + .append("\n") + .append( + node.getTopicAttributes().entrySet().stream() + .map( + entry -> + indentString(1) + + "\"" + + entry.getKey() + + "\" = \"" + + entry.getValue() + + "\"") + .collect(joining(", " + "\n"))) + .append(")\n"); + + return null; + } + @Override public Void visitDropTopic(DropTopic node, Integer context) { builder.append("DROP TOPIC "); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java index b40c6444816fc..88a2959f1475b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java @@ -184,6 +184,7 @@ public enum StatementType { REPAIR_DATA_PARTITION_TABLE, CREATE_TOPIC, + ALTER_TOPIC, DROP_TOPIC, SHOW_TOPICS, SHOW_SUBSCRIPTIONS, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java index 847e850c52172..39af259928b6b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java @@ -107,6 +107,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.region.MigrateRegionStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.region.ReconstructRegionStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.region.RemoveRegionStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.AlterTopicStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.CreateTopicStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropSubscriptionStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropTopicStatement; @@ -644,6 +645,10 @@ public R visitCreateTopic(CreateTopicStatement createTopicStatement, C context) return visitStatement(createTopicStatement, context); } + public R visitAlterTopic(AlterTopicStatement alterTopicStatement, C context) { + return visitStatement(alterTopicStatement, context); + } + public R visitDropTopic(DropTopicStatement dropTopicStatement, C context) { return visitStatement(dropTopicStatement, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/AlterTopicStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/AlterTopicStatement.java new file mode 100644 index 0000000000000..dd06b3034b00b --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/AlterTopicStatement.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription; + +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; +import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement; +import org.apache.iotdb.db.queryengine.plan.statement.Statement; +import org.apache.iotdb.db.queryengine.plan.statement.StatementType; +import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class AlterTopicStatement extends Statement implements IConfigStatement { + + private String topicName; + private Map topicAttributes; + + public AlterTopicStatement() { + super(); + statementType = StatementType.ALTER_TOPIC; + } + + public String getTopicName() { + return topicName; + } + + public Map getTopicAttributes() { + return topicAttributes; + } + + public void setTopicName(final String topicName) { + this.topicName = topicName; + } + + public void setTopicAttributes(final Map topicAttributes) { + this.topicAttributes = topicAttributes; + } + + @Override + public QueryType getQueryType() { + return QueryType.OTHER; + } + + @Override + public List getPaths() { + return Collections.emptyList(); + } + + @Override + public R accept(final StatementVisitor visitor, final C context) { + return visitor.visitAlterTopic(this, context); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGeneratorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGeneratorTest.java index b98f34a2484d9..574308f5ec45d 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGeneratorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGeneratorTest.java @@ -56,6 +56,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.DatabaseSchemaStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.DeleteDatabaseStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.DeleteTimeSeriesStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.AlterTopicStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.template.BatchActivateTemplateStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.template.CreateSchemaTemplateStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.template.DropSchemaTemplateStatement; @@ -121,6 +122,20 @@ public class StatementGeneratorTest { + @Test + public void testAlterTopicStatement() { + final Statement statement = + StatementGenerator.createStatement( + "ALTER TOPIC topic1 WITH ('owner-id'='sn2','owner-epoch'='6')", + ZonedDateTime.now().getOffset()); + + Assert.assertTrue(statement instanceof AlterTopicStatement); + final AlterTopicStatement alterTopicStatement = (AlterTopicStatement) statement; + Assert.assertEquals("topic1", alterTopicStatement.getTopicName()); + Assert.assertEquals("sn2", alterTopicStatement.getTopicAttributes().get("owner-id")); + Assert.assertEquals("6", alterTopicStatement.getTopicAttributes().get("owner-epoch")); + } + @Test public void testShowDiskUsage() { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/TopicStatementTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/TopicStatementTest.java new file mode 100644 index 0000000000000..66aac145968ef --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/TopicStatementTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.sql.parser; + +import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Statement; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AlterTopic; + +import org.junit.Assert; +import org.junit.Test; + +import java.time.ZoneId; + +public class TopicStatementTest { + + private final SqlParser sqlParser = new SqlParser(); + + @Test + public void testAlterTopicStatement() { + final Statement statement = + sqlParser.createStatement( + "ALTER TOPIC topic1 WITH ('owner-id'='sn2','owner-epoch'='6')", + ZoneId.systemDefault(), + null); + + Assert.assertTrue(statement instanceof AlterTopic); + final AlterTopic alterTopic = (AlterTopic) statement; + Assert.assertEquals("topic1", alterTopic.getTopicName()); + Assert.assertEquals("sn2", alterTopic.getTopicAttributes().get("owner-id")); + Assert.assertEquals("6", alterTopic.getTopicAttributes().get("owner-epoch")); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java index 44222a5a697fc..2c1ddc6e763d8 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java @@ -88,6 +88,17 @@ public TopicMeta deepCopy() { return copied; } + public TopicMeta deepCopyWithUpdatedAttributes(final Map updatedAttributes) { + final Map copiedAttributes = new HashMap<>(config.getAttribute()); + if (Objects.nonNull(updatedAttributes)) { + copiedAttributes.putAll(updatedAttributes); + } + + final TopicMeta copied = new TopicMeta(topicName, creationTime, copiedAttributes); + copied.subscribedConsumerGroupIds = new HashSet<>(subscribedConsumerGroupIds); + return copied; + } + public String getTopicName() { return topicName; } diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java index 0511edff6203e..d20464ee9c6ee 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java @@ -101,6 +101,29 @@ public void testTopicOwnerDeSer() throws IOException { topicMeta.getConfig().hasAttribute(TopicConstant.OWNER_LEASE_EXPIRE_TIME_MS_KEY)); } + @Test + public void testDeepCopyWithUpdatedAttributesTransfersOwnerAndPreservesExistingConfig() { + final Map topicAttributes = new HashMap<>(); + topicAttributes.put(TopicConstant.PATH_KEY, "root.sg.**"); + topicAttributes.put(TopicConstant.OWNER_ID_KEY, "sn1"); + topicAttributes.put(TopicConstant.OWNER_EPOCH_KEY, "5"); + final TopicMeta topicMeta = new TopicMeta("test_topic", 1, topicAttributes); + topicMeta.addSubscribedConsumerGroup("group1"); + + final Map updatedAttributes = new HashMap<>(); + updatedAttributes.put(TopicConstant.OWNER_ID_KEY, "sn2"); + updatedAttributes.put(TopicConstant.OWNER_EPOCH_KEY, "6"); + final TopicMeta updatedTopicMeta = topicMeta.deepCopyWithUpdatedAttributes(updatedAttributes); + + Assert.assertEquals( + "root.sg.**", updatedTopicMeta.getConfig().getString(TopicConstant.PATH_KEY)); + Assert.assertEquals("sn2", updatedTopicMeta.getOwnerId()); + Assert.assertEquals(6L, updatedTopicMeta.getOwnerEpoch()); + Assert.assertTrue(updatedTopicMeta.isSubscribedByConsumerGroup("group1")); + Assert.assertEquals("sn1", topicMeta.getOwnerId()); + Assert.assertEquals(5L, topicMeta.getOwnerEpoch()); + } + @Test public void testSequentialTopicMetaDeserializeDoesNotConsumeNextTopic() throws IOException { final TopicMeta firstTopicMeta = new TopicMeta("first_topic", 1, new HashMap<>()); diff --git a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 index f984e825fc7ad..af2d4986a4917 100644 --- a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 +++ b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 @@ -108,6 +108,7 @@ statement // Subscription Statement | createTopicStatement + | alterTopicStatement | dropTopicStatement | showTopicsStatement | showSubscriptionsStatement @@ -537,6 +538,10 @@ createTopicStatement : CREATE TOPIC (IF NOT EXISTS)? topicName=identifier topicAttributesClause? ; +alterTopicStatement + : ALTER TOPIC topicName=identifier topicAttributesClause + ; + topicAttributesClause : WITH '(' topicAttributeClause (',' topicAttributeClause)* ')' ; diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index 22529ffbb737a..2af5e4c6ccc10 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -1930,6 +1930,9 @@ service IConfigNodeRPCService { /** Create Topic */ common.TSStatus createTopic(TCreateTopicReq req) + /** Alter Topic */ + common.TSStatus alterTopic(TAlterTopicReq req) + /** Drop Topic */ common.TSStatus dropTopic(string topicName) @@ -2071,4 +2074,3 @@ service IConfigNodeRPCService { common.TSStatus createTableView(TCreateTableViewReq req) } - From dbbf7977f18aff606fb90baf9b52470ccc9d2a56 Mon Sep 17 00:00:00 2001 From: OpenAI Codex Date: Thu, 4 Jun 2026 05:45:04 +0000 Subject: [PATCH 3/3] Add topic owner update show test --- .../subscription/SubscriptionInfoTest.java | 48 +++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfoTest.java index d78fe77af9d86..a197c1a46f3f7 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfoTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfoTest.java @@ -23,6 +23,9 @@ import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta; import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.AlterTopicPlan; import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.CreateTopicPlan; +import org.apache.iotdb.confignode.consensus.response.subscription.TopicTableResp; +import org.apache.iotdb.confignode.rpc.thrift.TShowTopicInfo; +import org.apache.iotdb.confignode.rpc.thrift.TShowTopicResp; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.rpc.subscription.config.TopicConstant; @@ -86,6 +89,51 @@ public void testAlterTopicTransfersOwnerWithUpdatedAttributes() { subscriptionInfo.getTopicMeta(topicName).getConfig().getString(TopicConstant.PATH_KEY)); } + @Test + public void testAlterTopicOwnerAndShowTopicOwner() { + final String topicName = "topic-" + UUID.randomUUID(); + final long ownerLeaseExpireTimeMs = 123456789L; + final SubscriptionInfo subscriptionInfo = new SubscriptionInfo(); + + final TopicMeta initialTopicMeta = createTopicMeta(topicName, "sn1", 5L); + initialTopicMeta.getConfig().getAttribute().put(TopicConstant.PATH_KEY, "root.sg.**"); + initialTopicMeta.getConfig().getAttribute().put(TopicConstant.START_TIME_KEY, "0"); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + subscriptionInfo.createTopic(new CreateTopicPlan(initialTopicMeta)).getCode()); + + final Map updatedAttributes = new HashMap<>(); + updatedAttributes.put(TopicConstant.OWNER_ID_KEY, "sn2"); + updatedAttributes.put(TopicConstant.OWNER_EPOCH_KEY, "6"); + updatedAttributes.put( + TopicConstant.OWNER_LEASE_EXPIRE_TIME_MS_KEY, String.valueOf(ownerLeaseExpireTimeMs)); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + subscriptionInfo + .alterTopic( + new AlterTopicPlan( + initialTopicMeta.deepCopyWithUpdatedAttributes(updatedAttributes))) + .getCode()); + + final TShowTopicResp showTopicResp = + ((TopicTableResp) subscriptionInfo.showTopics()).convertToTShowTopicResp(); + + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), showTopicResp.status.code); + Assert.assertEquals(1, showTopicResp.getTopicInfoListSize()); + + final TShowTopicInfo showTopicInfo = showTopicResp.getTopicInfoList().get(0); + Assert.assertEquals(topicName, showTopicInfo.getTopicName()); + Assert.assertEquals(1L, showTopicInfo.getCreationTime()); + Assert.assertTrue(showTopicInfo.getTopicAttributes().contains("path=root.sg.**")); + Assert.assertTrue(showTopicInfo.getTopicAttributes().contains("start-time=0")); + Assert.assertTrue(showTopicInfo.getTopicAttributes().contains("owner-id=sn2")); + Assert.assertTrue(showTopicInfo.getTopicAttributes().contains("owner-epoch=6")); + Assert.assertTrue( + showTopicInfo + .getTopicAttributes() + .contains("owner-lease-expire-time-ms=" + ownerLeaseExpireTimeMs)); + } + private TopicMeta createTopicMeta( final String topicName, final String ownerId, final long ownerEpoch) { final Map topicAttributes = new HashMap<>();