diff --git a/hazelcast/asyncio/__init__.py b/hazelcast/asyncio/__init__.py
index e06313e9c2..2eeec9ed90 100644
--- a/hazelcast/asyncio/__init__.py
+++ b/hazelcast/asyncio/__init__.py
@@ -5,6 +5,7 @@ __all__ = [
     "EntryEventCallable",
     "Executor",
+    "FlakeIdGenerator",
     "HazelcastClient",
     "List",
     "Map",
     "MultiMap",
@@ -21,6 +22,7 @@
 ]
 
 from hazelcast.internal.asyncio_client import HazelcastClient
 from hazelcast.internal.asyncio_proxy.executor import Executor
+from hazelcast.internal.asyncio_proxy.flake_id_generator import FlakeIdGenerator
 from hazelcast.internal.asyncio_proxy.list import List
 from hazelcast.internal.asyncio_proxy.map import Map, EntryEventCallable
diff --git a/hazelcast/internal/asyncio_client.py b/hazelcast/internal/asyncio_client.py
index 5d7bd327f7..e726a65666 100644
--- a/hazelcast/internal/asyncio_client.py
+++ b/hazelcast/internal/asyncio_client.py
@@ -24,6 +24,7 @@
 )
 from hazelcast.internal.asyncio_proxy.manager import (
     EXECUTOR_SERVICE,
+    FLAKE_ID_GENERATOR_SERVICE,
     LIST_SERVICE,
     MAP_SERVICE,
     MULTI_MAP_SERVICE,
@@ -39,6 +40,7 @@
 )
 from hazelcast.internal.asyncio_proxy.base import Proxy
 from hazelcast.internal.asyncio_proxy.executor import Executor
+from hazelcast.internal.asyncio_proxy.flake_id_generator import FlakeIdGenerator
 from hazelcast.internal.asyncio_proxy.list import List
 from hazelcast.internal.asyncio_proxy.map import Map
 from hazelcast.internal.asyncio_proxy.multi_map import MultiMap
@@ -345,6 +347,17 @@ async def get_replicated_map(self, name: str) -> ReplicatedMap[KeyType, ValueTyp
         """
         return await self._proxy_manager.get_or_create(REPLICATED_MAP_SERVICE, name)
 
+    async def get_flake_id_generator(self, name: str) -> FlakeIdGenerator:
+        """Returns the FlakeIdGenerator instance with the specified name.
+
+        Args:
+            name: Name of the FlakeIdGenerator.
+
+        Returns:
+            FlakeIdGenerator instance with the specified name.
+        """
+        return await self._proxy_manager.get_or_create(FLAKE_ID_GENERATOR_SERVICE, name)
+
     async def get_reliable_topic(self, name: str) -> ReliableTopic:
         """Returns the ReliableTopic instance with the specified name.
 
@@ -356,7 +369,7 @@ async def get_reliable_topic(self, name: str) -> ReliableTopic:
         """
         return await self._proxy_manager.get_or_create(RELIABLE_TOPIC_SERVICE, name)
 
-    async def get_ringbuffer(self, name: str) -> Ringbuffer:
+    async def get_ringbuffer(self, name: str) -> Ringbuffer[ItemType]:
         """Returns the distributed Ringbuffer instance with the specified name.
 
         Args:
diff --git a/hazelcast/internal/asyncio_proxy/flake_id_generator.py b/hazelcast/internal/asyncio_proxy/flake_id_generator.py
new file mode 100644
index 0000000000..fdea1b7bae
--- /dev/null
+++ b/hazelcast/internal/asyncio_proxy/flake_id_generator.py
@@ -0,0 +1,181 @@
+import asyncio
+import collections
+import itertools
+
+from hazelcast.config import FlakeIdGeneratorConfig
+from hazelcast.internal.asyncio_proxy.base import MAX_SIZE, Proxy
+from hazelcast.protocol.codec import flake_id_generator_new_id_batch_codec
+from hazelcast.util import current_time
+
+
+class FlakeIdGenerator(Proxy):
+    """A cluster-wide unique ID generator. Generated IDs are int values and
+    are k-ordered (roughly ordered). IDs are in the range from 0 to 2^63 - 1.
+
+    The IDs contain a timestamp component and a node ID component, which is
+    assigned when the member joins the cluster. This allows the IDs to be
+    ordered and unique without any coordination between members, which makes
+    the generator safe even in split-brain scenario.
+
+    Timestamp component is in milliseconds since 1.1.2018, 0:00 UTC and
+    has 41 bits. This caps the useful lifespan of the generator to little less
+    than 70 years (until ~2088). The sequence component is 6 bits. If more
+    than 64 IDs are requested in single millisecond, IDs will gracefully
+    overflow to the next millisecond and uniqueness is guaranteed in this case.
+    The implementation does not allow overflowing by more than 15 seconds, if
+    IDs are requested at higher rate, the call will block. Note, however, that
+    clients are able to generate even faster because each call goes to a
+    different (random) member and the 64 IDs/ms limit is for single member.
+
+    Node ID overflow:
+        It is possible to generate IDs on any member or client as long as there
+        is at least one member with join version smaller than 2^16 in the
+        cluster. The remedy is to restart the cluster: nodeId will be assigned
+        from zero again. Uniqueness after the restart will be preserved thanks
+        to the timestamp component.
+
+    Warning:
+        Asyncio client FlakeIdGenerator proxy is not thread-safe, do not
+        access it from other threads.
+    """
+
+    _BITS_NODE_ID = 16
+    _BITS_SEQUENCE = 6
+
+    def __init__(self, service_name, name, context):
+        super().__init__(service_name, name, context)
+        config = context.config.flake_id_generators.get(name, None)
+        if config is None:
+            config = FlakeIdGeneratorConfig()
+        self._auto_batcher = _AutoBatcher(
+            config.prefetch_count, config.prefetch_validity, self._new_id_batch
+        )
+
+    async def new_id(self) -> int:
+        """Generates and returns a cluster-wide unique ID.
+
+        This method goes to a random member and gets a batch of IDs, which will
+        then be returned locally for a limited time. The pre-fetch size and
+        the validity time can be configured.
+
+        Note:
+            Values returned from this method may not be strictly ordered.
+
+        Returns:
+            New cluster-wide unique ID.
+
+        Raises:
+            HazelcastError: If node ID for all members in the cluster is out
+                of valid range. See ``Node ID overflow`` note above.
+        """
+        return await self._auto_batcher.new_id()
+
+    def _new_id_batch(self, batch_size) -> asyncio.Future:
+        def handler(message):
+            response = flake_id_generator_new_id_batch_codec.decode_response(message)
+            return _IdBatch(response["base"], response["increment"], response["batch_size"])
+
+        request = flake_id_generator_new_id_batch_codec.encode_request(self.name, batch_size)
+        return self._invoke(request, handler)
+
+
+class _AutoBatcher:
+    def __init__(self, batch_size, validity, id_generator):
+        self._batch_size = batch_size
+        self._validity = validity
+        self._batch_id_supplier = id_generator
+        self._block = _Block(_IdBatch(0, 0, 0), 0)
+        self._id_queue = collections.deque()
+        self._request_in_air = False
+
+    async def new_id(self) -> int:
+        loop = asyncio.get_running_loop()
+        while True:
+            block = self._block
+            next_id = block.next_id()
+            if next_id is not None:
+                return next_id
+
+            # In asyncio there is no preemption between non-await statements,
+            # so block cannot change here. The check mirrors the threading
+            # version for clarity.
+            if block is not self._block:
+                continue
+
+            future = loop.create_future()
+            self._id_queue.append(future)
+            if not self._request_in_air:
+                self._request_in_air = True
+                self._request_new_batch()
+            return await future
+
+    def _request_new_batch(self):
+        future = self._batch_id_supplier(self._batch_size)
+        future.add_done_callback(self._assign_new_block)
+
+    def _assign_new_block(self, future):
+        try:
+            new_batch_required = False
+            id_batch = future.result()
+            block = _Block(id_batch, self._validity)
+            while True:
+                try:
+                    f = self._id_queue.popleft()
+                    next_id = block.next_id()
+                    if next_id is not None:
+                        f.set_result(next_id)
+                    else:
+                        self._id_queue.appendleft(f)
+                        new_batch_required = True
+                        break
+                except IndexError:
+                    break
+            if new_batch_required:
+                self._request_in_air = True
+                self._request_new_batch()
+            else:
+                self._request_in_air = False
+                self._block = block
+        except Exception as ex:
+            while True:
+                try:
+                    f = self._id_queue.popleft()
+                    f.set_exception(ex)
+                except IndexError:
+                    break
+            self._request_in_air = False
+
+
+class _IdBatch:
+    def __init__(self, base, increment, batch_size):
+        self._base = base
+        self._increment = increment
+        self._batch_size = batch_size
+
+    def __iter__(self):
+        self._remaining = itertools.count(self._batch_size, -1)
+        self._next_id = itertools.count(self._base, self._increment)
+        return self
+
+    def __next__(self):
+        if next(self._remaining) <= 0:
+            raise StopIteration
+
+        return next(self._next_id)
+
+
+class _Block:
+    def __init__(self, id_batch, validity):
+        self._id_batch = id_batch
+        self._iterator = iter(self._id_batch)
+        self._invalid_since = validity + current_time() if validity > 0 else MAX_SIZE
+
+    def next_id(self):
+        if self._invalid_since <= current_time():
+            return None
+
+        return next(self._iterator, None)
+
+
+async def create_flake_id_generator_proxy(service_name, name, context):
+    return FlakeIdGenerator(service_name, name, context)
diff --git a/hazelcast/internal/asyncio_proxy/manager.py b/hazelcast/internal/asyncio_proxy/manager.py
index e2f4d0616b..0cf6e342c0 100644
--- a/hazelcast/internal/asyncio_proxy/manager.py
+++ b/hazelcast/internal/asyncio_proxy/manager.py
@@ -2,6 +2,7 @@
 import typing
 
 from hazelcast.internal.asyncio_proxy.executor import create_executor_proxy
+from hazelcast.internal.asyncio_proxy.flake_id_generator import create_flake_id_generator_proxy
 from hazelcast.internal.asyncio_proxy.list import create_list_proxy
 from hazelcast.internal.asyncio_proxy.multi_map import create_multi_map_proxy
 from hazelcast.internal.asyncio_proxy.pn_counter import create_pn_counter_proxy
@@ -22,6 +23,7 @@
 from hazelcast.util import to_list
 
 EXECUTOR_SERVICE = "hz:impl:executorService"
+FLAKE_ID_GENERATOR_SERVICE = "hz:impl:flakeIdGeneratorService"
 LIST_SERVICE = "hz:impl:listService"
 MAP_SERVICE = "hz:impl:mapService"
 MULTI_MAP_SERVICE = "hz:impl:multiMapService"
@@ -100,6 +102,7 @@ async def create_reliable_topic_proxy(service_name, name, context):
     typing.Callable[[str, str, typing.Any], typing.Coroutine[typing.Any, typing.Any, typing.Any]],
 ] = {
     EXECUTOR_SERVICE: create_executor_proxy,
+    FLAKE_ID_GENERATOR_SERVICE: create_flake_id_generator_proxy,
     LIST_SERVICE: create_list_proxy,
     MAP_SERVICE: create_map_proxy,
     MULTI_MAP_SERVICE: create_multi_map_proxy,
diff --git a/tests/integration/asyncio/proxy/flake_id_generator_test.py b/tests/integration/asyncio/proxy/flake_id_generator_test.py
new file mode 100644
index 0000000000..a25bb237b9
--- /dev/null
+++ b/tests/integration/asyncio/proxy/flake_id_generator_test.py
@@ -0,0 +1,138 @@
+import asyncio
+import random
+import unittest
+
+from tests.integration.asyncio.base import SingleMemberTestCase, HazelcastTestCase
+from tests.hzrc.ttypes import Lang
+from hazelcast.internal.asyncio_client import HazelcastClient
+from hazelcast.errors import HazelcastError
+
+FLAKE_ID_STEP = 1 << 16
+SHORT_TERM_BATCH_SIZE = 3
+SHORT_TERM_VALIDITY_SECONDS = 3
+NUM_TASKS = 4
+NUM_IDS_IN_TASKS = 100000
+
+
+class FlakeIdGeneratorTest(SingleMemberTestCase):
+    @classmethod
+    def configure_client(cls, config):
+        config["cluster_name"] = cls.cluster.id
+        config["flake_id_generators"] = {
+            "short-term": {
+                "prefetch_count": SHORT_TERM_BATCH_SIZE,
+                "prefetch_validity": SHORT_TERM_VALIDITY_SECONDS,
+            }
+        }
+        return config
+
+    async def asyncSetUp(self):
+        await super().asyncSetUp()
+        self.flake_id_generator = await self.client.get_flake_id_generator("test")
+
+    async def asyncTearDown(self):
+        await self.flake_id_generator.destroy()
+        await super().asyncTearDown()
+
+    async def test_new_id(self):
+        flake_id = await self.flake_id_generator.new_id()
+        self.assertIsNotNone(flake_id)
+
+    async def test_new_id_generates_unique_ids(self):
+        id_set = set()
+        for i in range(10):
+            id_set.add(await self.flake_id_generator.new_id())
+
+        self.assertEqual(10, len(id_set))
+
+    async def test_new_id_generates_unique_ids_concurrent(self):
+        id_set = set()
+
+        async def func():
+            for i in range(NUM_IDS_IN_TASKS):
+                id_set.add(await self.flake_id_generator.new_id())
+
+        async with asyncio.TaskGroup() as tg:
+            for _ in range(NUM_TASKS):
+                tg.create_task(func())
+
+        self.assertEqual(NUM_TASKS * NUM_IDS_IN_TASKS, len(id_set))
+
+    async def test_subsequent_ids_are_from_same_batch(self):
+        first_id = await self.flake_id_generator.new_id()
+        second_id = await self.flake_id_generator.new_id()
+        self.assertEqual(second_id, first_id + FLAKE_ID_STEP)
+
+    async def test_ids_are_from_new_batch_after_validity_period(self):
+        flake_id_generator = await self.client.get_flake_id_generator("short-term")
+        first_id = await flake_id_generator.new_id()
+        await asyncio.sleep(SHORT_TERM_VALIDITY_SECONDS + 1)
+        second_id = await flake_id_generator.new_id()
+        self.assertGreater(second_id, first_id + FLAKE_ID_STEP * SHORT_TERM_BATCH_SIZE)
+        await flake_id_generator.destroy()
+
+    async def test_ids_are_from_new_batch_after_batch_is_exhausted(self):
+        flake_id_generator = await self.client.get_flake_id_generator("short-term")
+        first_id = await flake_id_generator.new_id()
+        for i in range(1, SHORT_TERM_BATCH_SIZE):
+            await flake_id_generator.new_id()
+
+        # Batch is exhausted. We should wait for a little so that the member
+        # sends the new batch with a base greater than the last id
+        # generated + flake id step size.
+        await asyncio.sleep(1)
+        second_id = await flake_id_generator.new_id()
+        self.assertGreater(second_id, first_id + FLAKE_ID_STEP * SHORT_TERM_BATCH_SIZE)
+        await flake_id_generator.destroy()
+
+
+class FlakeIdGeneratorIdOutOfRangeTest(unittest.IsolatedAsyncioTestCase, HazelcastTestCase):
+    def setUp(self):
+        self.rc = self.create_rc()
+        self.cluster = self.create_cluster(self.rc, None)
+        self.cluster.start_member()
+        self.cluster.start_member()
+
+    def tearDown(self):
+        self.rc.terminateCluster(self.cluster.id)
+        self.rc.exit()
+
+    async def test_new_id_with_at_least_one_suitable_member(self):
+        response = await self.assign_out_of_range_node_id(self.cluster.id, random.randint(0, 1))
+        self.assertTrue(response.success and response.result is not None)
+        client = await HazelcastClient.create_and_start(
+            cluster_name=self.cluster.id, smart_routing=False
+        )
+        generator = await client.get_flake_id_generator("test")
+
+        for i in range(100):
+            await generator.new_id()
+
+        await generator.destroy()
+        await client.shutdown()
+
+    async def test_new_id_fails_when_all_members_are_out_of_node_id_range(self):
+        response1 = await self.assign_out_of_range_node_id(self.cluster.id, 0)
+        self.assertTrue(response1.success and response1.result is not None)
+        response2 = await self.assign_out_of_range_node_id(self.cluster.id, 1)
+        self.assertTrue(response2.success and response2.result is not None)
+        client = await HazelcastClient.create_and_start(cluster_name=self.cluster.id)
+        generator = await client.get_flake_id_generator("test")
+        with self.assertRaises(HazelcastError):
+            await generator.new_id()
+
+        await generator.destroy()
+        await client.shutdown()
+
+    async def assign_out_of_range_node_id(self, cluster_id, instance_id):
+        script = """
+        instance_%s.getCluster().getLocalMember().setMemberListJoinVersion(100000);
+        result = "" + instance_%s.getCluster().getLocalMember().getMemberListJoinVersion();
+        """ % (
+            instance_id,
+            instance_id,
+        )
+        loop = asyncio.get_running_loop()
+        return await loop.run_in_executor(
+            None, self.rc.executeOnController, cluster_id, script, Lang.JAVASCRIPT
+        )