From e6e9785d2a40e46f79d87c1419e6a00e373600d8 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Tue, 24 Mar 2026 12:58:58 +0300 Subject: [PATCH 1/5] Added FlakeID generator --- hazelcast/internal/asyncio_client.py | 13 ++ .../asyncio_proxy/flake_id_generator.py | 180 ++++++++++++++++++ hazelcast/internal/asyncio_proxy/manager.py | 3 + 3 files changed, 196 insertions(+) create mode 100644 hazelcast/internal/asyncio_proxy/flake_id_generator.py diff --git a/hazelcast/internal/asyncio_client.py b/hazelcast/internal/asyncio_client.py index f6e2d62954..393bfa4ada 100644 --- a/hazelcast/internal/asyncio_client.py +++ b/hazelcast/internal/asyncio_client.py @@ -23,6 +23,7 @@ dynamic_config_add_vector_collection_config_codec, ) from hazelcast.internal.asyncio_proxy.manager import ( + FLAKE_ID_GENERATOR_SERVICE, LIST_SERVICE, MAP_SERVICE, ProxyManager, @@ -30,6 +31,7 @@ VECTOR_SERVICE, ) from hazelcast.internal.asyncio_proxy.base import Proxy +from hazelcast.internal.asyncio_proxy.flake_id_generator import FlakeIdGenerator from hazelcast.internal.asyncio_proxy.list import List from hazelcast.internal.asyncio_proxy.map import Map from hazelcast.internal.asyncio_proxy.replicated_map import ReplicatedMap @@ -286,6 +288,17 @@ async def get_replicated_map(self, name: str) -> ReplicatedMap[KeyType, ValueTyp """ return await self._proxy_manager.get_or_create(REPLICATED_MAP_SERVICE, name) + async def get_flake_id_generator(self, name: str) -> FlakeIdGenerator: + """Returns the FlakeIdGenerator instance with the specified name. + + Args: + name: Name of the FlakeIdGenerator. + + Returns: + FlakeIdGenerator instance with the specified name. + """ + return await self._proxy_manager.get_or_create(FLAKE_ID_GENERATOR_SERVICE, name) + async def create_vector_collection_config( self, name: str, diff --git a/hazelcast/internal/asyncio_proxy/flake_id_generator.py b/hazelcast/internal/asyncio_proxy/flake_id_generator.py new file mode 100644 index 0000000000..b57dc690d0 --- /dev/null +++ b/hazelcast/internal/asyncio_proxy/flake_id_generator.py @@ -0,0 +1,180 @@ +import asyncio +import collections +import itertools + +from hazelcast.config import FlakeIdGeneratorConfig +from hazelcast.internal.asyncio_proxy.base import MAX_SIZE, Proxy +from hazelcast.protocol.codec import flake_id_generator_new_id_batch_codec +from hazelcast.util import current_time + + +class FlakeIdGenerator(Proxy): + """A cluster-wide unique ID generator. Generated IDs are int values and + are k-ordered (roughly ordered). IDs are in the range from 0 to 2^63 - 1. + + The IDs contain a timestamp component and a node ID component, which is + assigned when the member joins the cluster. This allows the IDs to be + ordered and unique without any coordination between members, which makes + the generator safe even in split-brain scenario. + + Timestamp component is in milliseconds since 1.1.2018, 0:00 UTC and + has 41 bits. This caps the useful lifespan of the generator to little less + than 70 years (until ~2088). The sequence component is 6 bits. If more + than 64 IDs are requested in single millisecond, IDs will gracefully + overflow to the next millisecond and uniqueness is guaranteed in this case. + The implementation does not allow overflowing by more than 15 seconds, if + IDs are requested at higher rate, the call will block. Note, however, that + clients are able to generate even faster because each call goes to a + different (random) member and the 64 IDs/ms limit is for single member. + + Node ID overflow: + It is possible to generate IDs on any member or client as long as there + is at least one member with join version smaller than 2^16 in the + cluster. The remedy is to restart the cluster: nodeId will be assigned + from zero again. Uniqueness after the restart will be preserved thanks + to the timestamp component. + + Warning: + Asyncio client FlakeIdGenerator proxy is not thread-safe, do not + access it from other threads. + """ + + _BITS_NODE_ID = 16 + _BITS_SEQUENCE = 6 + + def __init__(self, service_name, name, context): + super().__init__(service_name, name, context) + config = context.config.flake_id_generators.get(name, None) + if config is None: + config = FlakeIdGeneratorConfig() + self._auto_batcher = _AutoBatcher( + config.prefetch_count, config.prefetch_validity, self._new_id_batch + ) + + async def new_id(self) -> int: + """Generates and returns a cluster-wide unique ID. + + This method goes to a random member and gets a batch of IDs, which will + then be returned locally for a limited time. The pre-fetch size and + the validity time can be configured. + + Note: + Values returned from this method may not be strictly ordered. + + Returns: + New cluster-wide unique ID. + + Raises: + HazelcastError: If node ID for all members in the cluster is out + of valid range. See ``Node ID overflow`` note above. + """ + return await self._auto_batcher.new_id() + + def _new_id_batch(self, batch_size) -> asyncio.Future: + def handler(message): + response = flake_id_generator_new_id_batch_codec.decode_response(message) + return _IdBatch(response["base"], response["increment"], response["batch_size"]) + + request = flake_id_generator_new_id_batch_codec.encode_request(self.name, batch_size) + return self._invoke(request, handler) + + +class _AutoBatcher: + def __init__(self, batch_size, validity, id_generator): + self._batch_size = batch_size + self._validity = validity + self._batch_id_supplier = id_generator + self._block = _Block(_IdBatch(0, 0, 0), 0) + self._id_queue = collections.deque() + self._request_in_air = False + + async def new_id(self) -> int: + while True: + block = self._block + next_id = block.next_id() + if next_id is not None: + return next_id + + # In asyncio there is no preemption between non-await statements, + # so block cannot change here. The check mirrors the threading + # version for clarity. + if block is not self._block: + continue + + future = asyncio.get_running_loop().create_future() + self._id_queue.append(future) + if not self._request_in_air: + self._request_in_air = True + self._request_new_batch() + return await future + + def _request_new_batch(self): + future = self._batch_id_supplier(self._batch_size) + future.add_done_callback(self._assign_new_block) + + def _assign_new_block(self, future): + try: + new_batch_required = False + id_batch = future.result() + block = _Block(id_batch, self._validity) + while True: + try: + f = self._id_queue.popleft() + next_id = block.next_id() + if next_id is not None: + f.set_result(next_id) + else: + self._id_queue.appendleft(f) + new_batch_required = True + break + except IndexError: + break + if new_batch_required: + self._request_in_air = True + self._request_new_batch() + else: + self._request_in_air = False + self._block = block + except Exception as ex: + while True: + try: + f = self._id_queue.popleft() + f.set_exception(ex) + except IndexError: + break + self._request_in_air = False + + +class _IdBatch: + def __init__(self, base, increment, batch_size): + self._base = base + self._increment = increment + self._batch_size = batch_size + + def __iter__(self): + self._remaining = itertools.count(self._batch_size, -1) + self._next_id = itertools.count(self._base, self._increment) + return self + + def __next__(self): + if next(self._remaining) <= 0: + raise StopIteration + + return next(self._next_id) + + +class _Block: + def __init__(self, id_batch, validity): + self._id_batch = id_batch + self._iterator = iter(self._id_batch) + self._invalid_since = validity + current_time() if validity > 0 else MAX_SIZE + + def next_id(self): + if self._invalid_since <= current_time(): + return None + + return next(self._iterator, None) + + +async def create_flake_id_generator_proxy(service_name, name, context): + return FlakeIdGenerator(service_name, name, context) diff --git a/hazelcast/internal/asyncio_proxy/manager.py b/hazelcast/internal/asyncio_proxy/manager.py index 812d8eeb72..45e205f5b3 100644 --- a/hazelcast/internal/asyncio_proxy/manager.py +++ b/hazelcast/internal/asyncio_proxy/manager.py @@ -1,6 +1,7 @@ import asyncio import typing +from hazelcast.internal.asyncio_proxy.flake_id_generator import create_flake_id_generator_proxy from hazelcast.internal.asyncio_proxy.list import create_list_proxy from hazelcast.internal.asyncio_proxy.vector_collection import ( VectorCollection, @@ -13,6 +14,7 @@ from hazelcast.internal.asyncio_proxy.replicated_map import create_replicated_map_proxy from hazelcast.util import to_list +FLAKE_ID_GENERATOR_SERVICE = "hz:impl:flakeIdGeneratorService" LIST_SERVICE = "hz:impl:listService" MAP_SERVICE = "hz:impl:mapService" REPLICATED_MAP_SERVICE = "hz:impl:replicatedMapService" @@ -22,6 +24,7 @@ str, typing.Callable[[str, str, typing.Any], typing.Coroutine[typing.Any, typing.Any, typing.Any]], ] = { + FLAKE_ID_GENERATOR_SERVICE: create_flake_id_generator_proxy, LIST_SERVICE: create_list_proxy, MAP_SERVICE: create_map_proxy, REPLICATED_MAP_SERVICE: create_replicated_map_proxy, From 3b5f4bd2d08ec5d1224ecf061ddeff83c0c2fd88 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Tue, 31 Mar 2026 14:35:55 +0300 Subject: [PATCH 2/5] Update --- hazelcast/internal/asyncio_proxy/flake_id_generator.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hazelcast/internal/asyncio_proxy/flake_id_generator.py b/hazelcast/internal/asyncio_proxy/flake_id_generator.py index b57dc690d0..fdea1b7bae 100644 --- a/hazelcast/internal/asyncio_proxy/flake_id_generator.py +++ b/hazelcast/internal/asyncio_proxy/flake_id_generator.py @@ -89,6 +89,7 @@ def __init__(self, batch_size, validity, id_generator): self._request_in_air = False async def new_id(self) -> int: + loop = asyncio.get_running_loop() while True: block = self._block next_id = block.next_id() @@ -101,7 +102,7 @@ async def new_id(self) -> int: if block is not self._block: continue - future = asyncio.get_running_loop().create_future() + future = loop.create_future() self._id_queue.append(future) if not self._request_in_air: self._request_in_air = True From fe6bc9c34a9e7634734e1c63620b0da274addef3 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Wed, 6 May 2026 12:22:55 +0300 Subject: [PATCH 3/5] Added flake ID generator tests --- .../asyncio/proxy/flake_id_generator_test.py | 138 ++++++++++++++++++ 1 file changed, 138 insertions(+) create mode 100644 tests/integration/asyncio/proxy/flake_id_generator_test.py diff --git a/tests/integration/asyncio/proxy/flake_id_generator_test.py b/tests/integration/asyncio/proxy/flake_id_generator_test.py new file mode 100644 index 0000000000..233bb50641 --- /dev/null +++ b/tests/integration/asyncio/proxy/flake_id_generator_test.py @@ -0,0 +1,138 @@ +import asyncio +import random +import unittest + +from tests.integration.asyncio.base import SingleMemberTestCase, HazelcastTestCase +from tests.hzrc.ttypes import Lang +from hazelcast.internal.asyncio_client import HazelcastClient +from hazelcast.errors import HazelcastError + +FLAKE_ID_STEP = 1 << 16 +SHORT_TERM_BATCH_SIZE = 3 +SHORT_TERM_VALIDITY_SECONDS = 3 +NUM_TASKS = 4 +NUM_IDS_IN_TASKS = 100000 + + +class FlakeIdGeneratorTest(SingleMemberTestCase): + + @classmethod + def configure_client(cls, config): + config["cluster_name"] = cls.cluster.id + config["flake_id_generators"] = { + "short-term": { + "prefetch_count": SHORT_TERM_BATCH_SIZE, + "prefetch_validity": SHORT_TERM_VALIDITY_SECONDS, + } + } + return config + + async def asyncSetUp(self): + await super().asyncSetUp() + self.flake_id_generator = await self.client.get_flake_id_generator("test") + + async def asyncTearDown(self): + await self.flake_id_generator.destroy() + await super().asyncTearDown() + + async def test_new_id(self): + flake_id = await self.flake_id_generator.new_id() + self.assertIsNotNone(flake_id) + + async def test_new_id_generates_unique_ids(self): + id_set = set() + for i in range(10): + id_set.add(await self.flake_id_generator.new_id()) + + self.assertEqual(10, len(id_set)) + + async def test_new_id_generates_unique_ids_concurrent(self): + id_set = set() + + async def func(): + for i in range(NUM_IDS_IN_TASKS): + id_set.add(await self.flake_id_generator.new_id()) + + async with asyncio.TaskGroup() as tg: + for _ in range(NUM_TASKS): + tg.create_task(func()) + + self.assertEqual(NUM_TASKS * NUM_IDS_IN_TASKS, len(id_set)) + + async def test_subsequent_ids_are_from_same_batch(self): + first_id = await self.flake_id_generator.new_id() + second_id = await self.flake_id_generator.new_id() + self.assertEqual(second_id, first_id + FLAKE_ID_STEP) + + async def test_ids_are_from_new_batch_after_validity_period(self): + flake_id_generator = await self.client.get_flake_id_generator("short-term") + first_id = await flake_id_generator.new_id() + await asyncio.sleep(SHORT_TERM_VALIDITY_SECONDS + 1) + second_id = await flake_id_generator.new_id() + self.assertGreater(second_id, first_id + FLAKE_ID_STEP * SHORT_TERM_BATCH_SIZE) + await flake_id_generator.destroy() + + async def test_ids_are_from_new_batch_after_batch_is_exhausted(self): + flake_id_generator = await self.client.get_flake_id_generator("short-term") + first_id = await flake_id_generator.new_id() + for i in range(1, SHORT_TERM_BATCH_SIZE): + await flake_id_generator.new_id() + + # Batch is exhausted. We should wait for a little so that the member + # sends the new batch with a base greater than the last id + # generated + flake id step size. + await asyncio.sleep(1) + second_id = await flake_id_generator.new_id() + self.assertGreater(second_id, first_id + FLAKE_ID_STEP * SHORT_TERM_BATCH_SIZE) + await flake_id_generator.destroy() + + +class FlakeIdGeneratorIdOutOfRangeTest(unittest.IsolatedAsyncioTestCase, HazelcastTestCase): + + def setUp(self): + self.rc = self.create_rc() + self.cluster = self.create_cluster(self.rc, None) + self.cluster.start_member() + self.cluster.start_member() + + def tearDown(self): + self.rc.terminateCluster(self.cluster.id) + self.rc.exit() + + async def test_new_id_with_at_least_one_suitable_member(self): + response = await self.assign_out_of_range_node_id(self.cluster.id, random.randint(0, 1)) + self.assertTrue(response.success and response.result is not None) + client = await HazelcastClient.create_and_start( + cluster_name=self.cluster.id, smart_routing=False + ) + generator = await client.get_flake_id_generator("test") + + for i in range(100): + await generator.new_id() + + await generator.destroy() + await client.shutdown() + + async def test_new_id_fails_when_all_members_are_out_of_node_id_range(self): + response1 = await self.assign_out_of_range_node_id(self.cluster.id, 0) + self.assertTrue(response1.success and response1.result is not None) + response2 = await self.assign_out_of_range_node_id(self.cluster.id, 1) + self.assertTrue(response2.success and response2.result is not None) + client = await HazelcastClient.create_and_start(cluster_name=self.cluster.id) + generator = await client.get_flake_id_generator("test") + with self.assertRaises(HazelcastError): + await generator.new_id() + + await generator.destroy() + await client.shutdown() + + async def assign_out_of_range_node_id(self, cluster_id, instance_id): + script = """ + instance_%s.getCluster().getLocalMember().setMemberListJoinVersion(100000); + result = "" + instance_%s.getCluster().getLocalMember().getMemberListJoinVersion(); + """ % ( + instance_id, + instance_id, + ) + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, self.rc.executeOnController, cluster_id, script, Lang.JAVASCRIPT) From 638b4d8f1c3995e341c28a6c9e1d7d331eb5dcd6 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Wed, 6 May 2026 13:18:48 +0300 Subject: [PATCH 4/5] Remove Slack link --- README.rst | 4 ---- docs/getting_help.rst | 1 - docs/index.rst | 3 --- 3 files changed, 8 deletions(-) diff --git a/README.rst b/README.rst index ed5136699c..55e6c15244 100644 --- a/README.rst +++ b/README.rst @@ -7,9 +7,6 @@ Hazelcast Python Client .. image:: https://img.shields.io/readthedocs/hazelcast :target: https://hazelcast.readthedocs.io :alt: Read the Docs -.. image:: https://img.shields.io/badge/slack-chat-green.svg - :target: https://slack.hazelcast.com - :alt: Join the community on Slack .. image:: https://img.shields.io/pypi/l/hazelcast-python-client :target: https://github.com/hazelcast/hazelcast-python-client/blob/master/LICENSE.txt :alt: License @@ -161,7 +158,6 @@ development/usage issues: - `GitHub repository `__ - `Documentation `__ -- `Slack `__ Contributing ------------ diff --git a/docs/getting_help.rst b/docs/getting_help.rst index 95de9eecc9..8c651bbf21 100644 --- a/docs/getting_help.rst +++ b/docs/getting_help.rst @@ -5,4 +5,3 @@ You can use the following channels for your questions and development/usage issues: - `Github Repository `__ -- `Slack `__ diff --git a/docs/index.rst b/docs/index.rst index 3f6f779276..f226881b09 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -7,9 +7,6 @@ Hazelcast Python Client .. image:: https://img.shields.io/static/v1?label=Github&message=Hazelcast%20Python%20client&style=flat&logo=github :target: https://github.com/hazelcast/hazelcast-python-client :alt: Github Repository -.. image:: https://img.shields.io/badge/slack-chat-green.svg - :target: https://slack.hazelcast.com - :alt: Join the community on Slack .. image:: https://img.shields.io/pypi/l/hazelcast-python-client :target: https://github.com/hazelcast/hazelcast-python-client/blob/master/LICENSE.txt :alt: License From 63309b09976b2394eabff50ba869cdb6d03ad3e4 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Fri, 8 May 2026 17:30:35 +0300 Subject: [PATCH 5/5] update --- hazelcast/asyncio/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/hazelcast/asyncio/__init__.py b/hazelcast/asyncio/__init__.py index 59f34efa89..ff86b76401 100644 --- a/hazelcast/asyncio/__init__.py +++ b/hazelcast/asyncio/__init__.py @@ -9,6 +9,8 @@ "HazelcastClient", "List", "Map", + "ReliableMessageListener", + "ReliableTopic", "ReplicatedMap", "VectorCollection", ]