Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ public static void deleteNode(CuratorFramework zk, String path) {
zk.delete().deletingChildrenIfNeeded().forPath(normalizePath(path));
}
} catch (Exception e) {
if (Utils.exceptionCauseIsInstanceOf(KeeperException.NodeExistsException.class, e)) {
// do nothing
if (Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
// Node was deleted concurrently between the exists check and the delete.
LOG.info("delete {} failed.", path, e);
} else {
throw Utils.wrapInRuntime(e);
Expand Down Expand Up @@ -189,13 +189,13 @@ public static Stat setData(CuratorFramework zk, String path, byte[] data) {

public static Integer getVersion(CuratorFramework zk, String path, boolean watch) throws Exception {
String npath = normalizePath(path);
Stat stat = null;
if (existsNode(zk, npath, watch)) {
if (watch) {
stat = zk.checkExists().watched().forPath(npath);
} else {
stat = zk.checkExists().forPath(npath);
}
// checkExists returns the Stat directly (null when absent) and still arms the watch when
// watch is true. request 1.
Stat stat;
if (watch) {
stat = zk.checkExists().watched().forPath(npath);
} else {
stat = zk.checkExists().forPath(npath); // request 2.
}
return stat == null ? null : Integer.valueOf(stat.getVersion());
}
Expand All @@ -214,23 +214,23 @@ public static List<String> getChildren(CuratorFramework zk, String path, boolean
}

public static byte[] getData(CuratorFramework zk, String path, boolean watch) {
String npath = normalizePath(path);
try {
String npath = normalizePath(path);
if (existsNode(zk, npath, watch)) {
if (watch) {
return zk.getData().watched().forPath(npath);
} else {
return zk.getData().forPath(npath);
}
if (watch) {
return zk.getData().watched().forPath(npath);
} else {
return zk.getData().forPath(npath);
}
} catch (Exception e) {
if (Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
// this is fine b/c we still have a watch from the successful exists call
} else {
throw Utils.wrapInRuntime(e);
// Node is absent. A watcher can notify when the node is announced.
if (watch) {
existsNode(zk, npath, true);
}
return null;
}
throw Utils.wrapInRuntime(e);
}
return null;
}

/**
Expand All @@ -243,26 +243,27 @@ public static byte[] getData(CuratorFramework zk, String path, boolean watch) {
*/
public static VersionedData<byte[]> getDataWithVersion(CuratorFramework zk, String path, boolean watch) {
VersionedData<byte[]> data = null;
String npath = normalizePath(path);
Stat stats = new Stat();
try {
byte[] bytes = null;
Stat stats = new Stat();
String npath = normalizePath(path);
if (existsNode(zk, npath, watch)) {
if (watch) {
bytes = zk.getData().storingStatIn(stats).watched().forPath(npath);
} else {
bytes = zk.getData().storingStatIn(stats).forPath(npath);
}
if (bytes != null) {
int version = stats.getVersion();
data = new VersionedData<>(version, bytes);
}
byte[] bytes;
if (watch) {
bytes = zk.getData().storingStatIn(stats).watched().forPath(npath);
} else {
bytes = zk.getData().storingStatIn(stats).forPath(npath);
}
if (bytes != null) {
data = new VersionedData<>(stats.getVersion(), bytes);
}
} catch (Exception e) {
if (Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
// this is fine b/c we still have a watch from the successful exists call
// Node is absent. getData does not arm a watch when it fails, so when watching we
// set an existence watch separately to be notified if the node is created.
if (watch) {
existsNode(zk, npath, true);
}
} else {
Utils.wrapInRuntime(e);
throw Utils.wrapInRuntime(e);
}
}
return data;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@
import org.apache.storm.utils.ZookeeperAuthInfo;
import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework;
import org.apache.storm.shade.org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.storm.callback.WatcherCallBack;
import org.apache.storm.shade.org.apache.curator.framework.api.BackgroundVersionable;
import org.apache.storm.shade.org.apache.curator.framework.api.DeleteBuilder;
import org.apache.storm.shade.org.apache.curator.framework.api.ExistsBuilder;
import org.apache.storm.shade.org.apache.zookeeper.KeeperException;
import org.apache.storm.shade.org.apache.zookeeper.data.Stat;
import org.apache.storm.zookeeper.ClientZookeeper;
import org.junit.jupiter.api.Test;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
Expand Down Expand Up @@ -482,4 +489,117 @@ public void testClusterAuthentication() throws Exception {
}
}

@Test
public void testGetVersion() throws Exception {
try (InProcessZookeeper zk = new InProcessZookeeper()) {
IStateStorage state = mkState(zk.getPort());

// absent node -> null version
assertNull(state.get_version("/v", false));

// first set_data creates the node at dataVersion 0
state.set_data("/v", barr(1), OPEN_ACL);
assertEquals(Integer.valueOf(0), state.get_version("/v", false));

// each subsequent update bumps the version
state.set_data("/v", barr(2), OPEN_ACL);
assertEquals(Integer.valueOf(1), state.get_version("/v", false));
state.set_data("/v", barr(3), OPEN_ACL);
assertEquals(Integer.valueOf(2), state.get_version("/v", false));

// deleted -> null again
state.delete_node("/v");
assertNull(state.get_version("/v", false));

state.close();
}
}

@Test
public void testGetDataWithVersion() throws Exception {
try (InProcessZookeeper zk = new InProcessZookeeper()) {
IStateStorage state = mkState(zk.getPort());

// absent node -> null
assertNull(state.get_data_with_version("/v", false));

// created -> data with dataVersion 0
state.set_data("/v", barr(1), OPEN_ACL);
VersionedData<byte[]> v0 = state.get_data_with_version("/v", false);
assertNotNull(v0);
assertArrayEquals(barr(1), v0.getData());
assertEquals(0, v0.getVersion());

// updated -> new data and bumped version, consistent with get_version
state.set_data("/v", barr(2, 3), OPEN_ACL);
VersionedData<byte[]> v1 = state.get_data_with_version("/v", false);
assertNotNull(v1);
assertArrayEquals(barr(2, 3), v1.getData());
assertEquals(1, v1.getVersion());
assertEquals(Integer.valueOf(1), state.get_version("/v", false));

// deleted -> null
state.delete_node("/v");
assertNull(state.get_data_with_version("/v", false));

state.close();
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no test case that does: getData(path, watch=true) on a missing node, then creates the node, and verifies the callback was triggered,
What do you thinking about adding it?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you! I've committed the missing test case.
This test is crucial for preventing regressions. It verifies that if a getData call to zk fails with watched clause enabled, the fallback mechanism successfully registers the watcher manually (calling the method existsNode).


// concurrent delete
@Test
public void testDeleteNodeSwallowsConcurrentDelete() throws Exception {
// Simulates the race where the node is present at the exists-check but removed by another
// client before the delete runs: deleteNode must swallow NoNodeException, not rethrow it.
CuratorFramework zk = Mockito.mock(CuratorFramework.class);

ExistsBuilder existsBuilder = Mockito.mock(ExistsBuilder.class);
Mockito.when(zk.checkExists()).thenReturn(existsBuilder);
Mockito.when(existsBuilder.forPath("/race")).thenReturn(new Stat());

DeleteBuilder deleteBuilder = Mockito.mock(DeleteBuilder.class);
BackgroundVersionable childrenDeletable = Mockito.mock(BackgroundVersionable.class);
Mockito.when(zk.delete()).thenReturn(deleteBuilder);
Mockito.when(deleteBuilder.deletingChildrenIfNeeded()).thenReturn(childrenDeletable);
Mockito.when(childrenDeletable.forPath("/race")).thenThrow(new KeeperException.NoNodeException());

assertDoesNotThrow(() -> ClientZookeeper.deleteNode(zk, "/race"));
// ensure to delete was actually attempted (the catch branch was exercised, not skipped)
Mockito.verify(deleteBuilder).deletingChildrenIfNeeded();
}

@Test
public void testGetDataArmsWatchOnAbsentNode() throws Exception {
try (InProcessZookeeper zk = new InProcessZookeeper()) {
Map<String, Object> conf = mkConfig(zk.getPort());
AtomicReference<Map<String, Object>> lastEvent = new AtomicReference<>();

WatcherCallBack watcher = (state, type, path) -> {
if (type != Watcher.Event.EventType.None) {
lastEvent.set(event(type, path));
}
};

CuratorFramework client = ClientZookeeper.mkClient(conf, List.of("localhost"),
zk.getPort(), "", watcher, null, DaemonType.UNKNOWN);
try {
final String path = String.format("/node-%s", Time.currentTimeMillis());
// absent node, the fallback must have armed a watch
assertNull(ClientZookeeper.getData(client, path, true));
assertNull(lastEvent.get());

// create the node
ClientZookeeper.createNode(client, path, barr(1, 2, 3), OPEN_ACL);
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.pollInterval(10, TimeUnit.MILLISECONDS)
.until(() -> lastEvent.get() != null);

// check the node is announced
assertEquals(event(Watcher.Event.EventType.NodeCreated, path), lastEvent.get());
} finally {
client.close();
}
}
}

}
Loading