diff --git a/storm-client/src/jvm/org/apache/storm/zookeeper/ClientZookeeper.java b/storm-client/src/jvm/org/apache/storm/zookeeper/ClientZookeeper.java index 0eeed37f027..e84742951d9 100644 --- a/storm-client/src/jvm/org/apache/storm/zookeeper/ClientZookeeper.java +++ b/storm-client/src/jvm/org/apache/storm/zookeeper/ClientZookeeper.java @@ -156,8 +156,8 @@ public static void deleteNode(CuratorFramework zk, String path) { zk.delete().deletingChildrenIfNeeded().forPath(normalizePath(path)); } } catch (Exception e) { - if (Utils.exceptionCauseIsInstanceOf(KeeperException.NodeExistsException.class, e)) { - // do nothing + if (Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) { + // Node was deleted concurrently between the exists check and the delete. LOG.info("delete {} failed.", path, e); } else { throw Utils.wrapInRuntime(e); @@ -189,13 +189,13 @@ public static Stat setData(CuratorFramework zk, String path, byte[] data) { public static Integer getVersion(CuratorFramework zk, String path, boolean watch) throws Exception { String npath = normalizePath(path); - Stat stat = null; - if (existsNode(zk, npath, watch)) { - if (watch) { - stat = zk.checkExists().watched().forPath(npath); - } else { - stat = zk.checkExists().forPath(npath); - } + // checkExists returns the Stat directly (null when absent) and still arms the watch when + // watch is true. request 1. + Stat stat; + if (watch) { + stat = zk.checkExists().watched().forPath(npath); + } else { + stat = zk.checkExists().forPath(npath); // request 2. } return stat == null ? null : Integer.valueOf(stat.getVersion()); } @@ -214,23 +214,23 @@ public static List getChildren(CuratorFramework zk, String path, boolean } public static byte[] getData(CuratorFramework zk, String path, boolean watch) { + String npath = normalizePath(path); try { - String npath = normalizePath(path); - if (existsNode(zk, npath, watch)) { - if (watch) { - return zk.getData().watched().forPath(npath); - } else { - return zk.getData().forPath(npath); - } + if (watch) { + return zk.getData().watched().forPath(npath); + } else { + return zk.getData().forPath(npath); } } catch (Exception e) { if (Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) { - // this is fine b/c we still have a watch from the successful exists call - } else { - throw Utils.wrapInRuntime(e); + // Node is absent. A watcher can notify when the node is announced. + if (watch) { + existsNode(zk, npath, true); + } + return null; } + throw Utils.wrapInRuntime(e); } - return null; } /** @@ -243,26 +243,27 @@ public static byte[] getData(CuratorFramework zk, String path, boolean watch) { */ public static VersionedData getDataWithVersion(CuratorFramework zk, String path, boolean watch) { VersionedData data = null; + String npath = normalizePath(path); + Stat stats = new Stat(); try { - byte[] bytes = null; - Stat stats = new Stat(); - String npath = normalizePath(path); - if (existsNode(zk, npath, watch)) { - if (watch) { - bytes = zk.getData().storingStatIn(stats).watched().forPath(npath); - } else { - bytes = zk.getData().storingStatIn(stats).forPath(npath); - } - if (bytes != null) { - int version = stats.getVersion(); - data = new VersionedData<>(version, bytes); - } + byte[] bytes; + if (watch) { + bytes = zk.getData().storingStatIn(stats).watched().forPath(npath); + } else { + bytes = zk.getData().storingStatIn(stats).forPath(npath); + } + if (bytes != null) { + data = new VersionedData<>(stats.getVersion(), bytes); } } catch (Exception e) { if (Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) { - // this is fine b/c we still have a watch from the successful exists call + // Node is absent. getData does not arm a watch when it fails, so when watching we + // set an existence watch separately to be notified if the node is created. + if (watch) { + existsNode(zk, npath, true); + } } else { - Utils.wrapInRuntime(e); + throw Utils.wrapInRuntime(e); } } return data; diff --git a/storm-server/src/test/java/org/apache/storm/cluster/ClusterStateTest.java b/storm-server/src/test/java/org/apache/storm/cluster/ClusterStateTest.java index 0908c016697..fff2fdf4bb4 100644 --- a/storm-server/src/test/java/org/apache/storm/cluster/ClusterStateTest.java +++ b/storm-server/src/test/java/org/apache/storm/cluster/ClusterStateTest.java @@ -49,6 +49,13 @@ import org.apache.storm.utils.ZookeeperAuthInfo; import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework; import org.apache.storm.shade.org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.storm.callback.WatcherCallBack; +import org.apache.storm.shade.org.apache.curator.framework.api.BackgroundVersionable; +import org.apache.storm.shade.org.apache.curator.framework.api.DeleteBuilder; +import org.apache.storm.shade.org.apache.curator.framework.api.ExistsBuilder; +import org.apache.storm.shade.org.apache.zookeeper.KeeperException; +import org.apache.storm.shade.org.apache.zookeeper.data.Stat; +import org.apache.storm.zookeeper.ClientZookeeper; import org.junit.jupiter.api.Test; import org.awaitility.Awaitility; import org.mockito.Mockito; @@ -482,4 +489,117 @@ public void testClusterAuthentication() throws Exception { } } + @Test + public void testGetVersion() throws Exception { + try (InProcessZookeeper zk = new InProcessZookeeper()) { + IStateStorage state = mkState(zk.getPort()); + + // absent node -> null version + assertNull(state.get_version("/v", false)); + + // first set_data creates the node at dataVersion 0 + state.set_data("/v", barr(1), OPEN_ACL); + assertEquals(Integer.valueOf(0), state.get_version("/v", false)); + + // each subsequent update bumps the version + state.set_data("/v", barr(2), OPEN_ACL); + assertEquals(Integer.valueOf(1), state.get_version("/v", false)); + state.set_data("/v", barr(3), OPEN_ACL); + assertEquals(Integer.valueOf(2), state.get_version("/v", false)); + + // deleted -> null again + state.delete_node("/v"); + assertNull(state.get_version("/v", false)); + + state.close(); + } + } + + @Test + public void testGetDataWithVersion() throws Exception { + try (InProcessZookeeper zk = new InProcessZookeeper()) { + IStateStorage state = mkState(zk.getPort()); + + // absent node -> null + assertNull(state.get_data_with_version("/v", false)); + + // created -> data with dataVersion 0 + state.set_data("/v", barr(1), OPEN_ACL); + VersionedData v0 = state.get_data_with_version("/v", false); + assertNotNull(v0); + assertArrayEquals(barr(1), v0.getData()); + assertEquals(0, v0.getVersion()); + + // updated -> new data and bumped version, consistent with get_version + state.set_data("/v", barr(2, 3), OPEN_ACL); + VersionedData v1 = state.get_data_with_version("/v", false); + assertNotNull(v1); + assertArrayEquals(barr(2, 3), v1.getData()); + assertEquals(1, v1.getVersion()); + assertEquals(Integer.valueOf(1), state.get_version("/v", false)); + + // deleted -> null + state.delete_node("/v"); + assertNull(state.get_data_with_version("/v", false)); + + state.close(); + } + } + + // concurrent delete + @Test + public void testDeleteNodeSwallowsConcurrentDelete() throws Exception { + // Simulates the race where the node is present at the exists-check but removed by another + // client before the delete runs: deleteNode must swallow NoNodeException, not rethrow it. + CuratorFramework zk = Mockito.mock(CuratorFramework.class); + + ExistsBuilder existsBuilder = Mockito.mock(ExistsBuilder.class); + Mockito.when(zk.checkExists()).thenReturn(existsBuilder); + Mockito.when(existsBuilder.forPath("/race")).thenReturn(new Stat()); + + DeleteBuilder deleteBuilder = Mockito.mock(DeleteBuilder.class); + BackgroundVersionable childrenDeletable = Mockito.mock(BackgroundVersionable.class); + Mockito.when(zk.delete()).thenReturn(deleteBuilder); + Mockito.when(deleteBuilder.deletingChildrenIfNeeded()).thenReturn(childrenDeletable); + Mockito.when(childrenDeletable.forPath("/race")).thenThrow(new KeeperException.NoNodeException()); + + assertDoesNotThrow(() -> ClientZookeeper.deleteNode(zk, "/race")); + // ensure to delete was actually attempted (the catch branch was exercised, not skipped) + Mockito.verify(deleteBuilder).deletingChildrenIfNeeded(); + } + + @Test + public void testGetDataArmsWatchOnAbsentNode() throws Exception { + try (InProcessZookeeper zk = new InProcessZookeeper()) { + Map conf = mkConfig(zk.getPort()); + AtomicReference> lastEvent = new AtomicReference<>(); + + WatcherCallBack watcher = (state, type, path) -> { + if (type != Watcher.Event.EventType.None) { + lastEvent.set(event(type, path)); + } + }; + + CuratorFramework client = ClientZookeeper.mkClient(conf, List.of("localhost"), + zk.getPort(), "", watcher, null, DaemonType.UNKNOWN); + try { + final String path = String.format("/node-%s", Time.currentTimeMillis()); + // absent node, the fallback must have armed a watch + assertNull(ClientZookeeper.getData(client, path, true)); + assertNull(lastEvent.get()); + + // create the node + ClientZookeeper.createNode(client, path, barr(1, 2, 3), OPEN_ACL); + Awaitility.await().atMost(5, TimeUnit.SECONDS) + .pollInterval(10, TimeUnit.MILLISECONDS) + .until(() -> lastEvent.get() != null); + + // check the node is announced + assertEquals(event(Watcher.Event.EventType.NodeCreated, path), lastEvent.get()); + } finally { + client.close(); + } + } + } + }