Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ Hazelcast Python Client
.. image:: https://img.shields.io/readthedocs/hazelcast
:target: https://hazelcast.readthedocs.io
:alt: Read the Docs
.. image:: https://img.shields.io/badge/slack-chat-green.svg
:target: https://slack.hazelcast.com
:alt: Join the community on Slack
.. image:: https://img.shields.io/pypi/l/hazelcast-python-client
:target: https://github.com/hazelcast/hazelcast-python-client/blob/master/LICENSE.txt
:alt: License
Expand Down Expand Up @@ -161,7 +158,6 @@ development/usage issues:
- `GitHub
repository <https://github.com/hazelcast/hazelcast-python-client/issues/new>`__
- `Documentation <https://hazelcast.readthedocs.io>`__
- `Slack <https://slack.hazelcast.com>`__

Contributing
------------
Expand Down
1 change: 0 additions & 1 deletion docs/getting_help.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,3 @@ You can use the following channels for your questions and
development/usage issues:

- `Github Repository <https://github.com/hazelcast/hazelcast-python-client/issues/new>`__
- `Slack <https://slack.hazelcast.com>`__
3 changes: 0 additions & 3 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ Hazelcast Python Client
.. image:: https://img.shields.io/static/v1?label=Github&message=Hazelcast%20Python%20client&style=flat&logo=github
:target: https://github.com/hazelcast/hazelcast-python-client
:alt: Github Repository
.. image:: https://img.shields.io/badge/slack-chat-green.svg
:target: https://slack.hazelcast.com
:alt: Join the community on Slack
.. image:: https://img.shields.io/pypi/l/hazelcast-python-client
:target: https://github.com/hazelcast/hazelcast-python-client/blob/master/LICENSE.txt
:alt: License
Expand Down
15 changes: 14 additions & 1 deletion hazelcast/internal/asyncio_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
REPLICATED_MAP_SERVICE,
RINGBUFFER_SERVICE,
SET_SERVICE,
TOPIC_SERVICE,
VECTOR_SERVICE,
)
from hazelcast.internal.asyncio_proxy.base import Proxy
Expand All @@ -42,10 +43,11 @@
from hazelcast.internal.asyncio_proxy.replicated_map import ReplicatedMap
from hazelcast.internal.asyncio_proxy.ringbuffer import Ringbuffer
from hazelcast.internal.asyncio_proxy.set import Set
from hazelcast.internal.asyncio_proxy.topic import Topic
from hazelcast.internal.asyncio_reactor import AsyncioReactor
from hazelcast.serialization import SerializationServiceV1
from hazelcast.internal.asyncio_statistics import Statistics
from hazelcast.types import KeyType, ValueType, ItemType
from hazelcast.types import KeyType, MessageType, ValueType, ItemType
from hazelcast.util import AtomicInteger, RoundRobinLB

__all__ = ("HazelcastClient",)
Expand Down Expand Up @@ -350,6 +352,17 @@ async def get_ringbuffer(self, name: str) -> Ringbuffer[ItemType]:
"""
return await self._proxy_manager.get_or_create(RINGBUFFER_SERVICE, name)

async def get_topic(self, name: str) -> Topic[MessageType]:
"""Returns the distributed topic instance with the specified name.

Args:
name: Name of the distributed topic.

Returns:
Distributed topic instance with the specified name.
"""
return await self._proxy_manager.get_or_create(TOPIC_SERVICE, name)

async def create_vector_collection_config(
self,
name: str,
Expand Down
3 changes: 3 additions & 0 deletions hazelcast/internal/asyncio_proxy/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from hazelcast.internal.asyncio_proxy.multi_map import create_multi_map_proxy
from hazelcast.internal.asyncio_proxy.queue import create_queue_proxy
from hazelcast.internal.asyncio_proxy.set import create_set_proxy
from hazelcast.internal.asyncio_proxy.topic import create_topic_proxy
from hazelcast.internal.asyncio_proxy.vector_collection import (
create_vector_collection_proxy,
)
Expand All @@ -25,6 +26,7 @@
REPLICATED_MAP_SERVICE = "hz:impl:replicatedMapService"
RINGBUFFER_SERVICE = "hz:impl:ringbufferService"
SET_SERVICE = "hz:impl:setService"
TOPIC_SERVICE = "hz:impl:topicService"
VECTOR_SERVICE = "hz:service:vector"

_proxy_init: typing.Dict[
Expand All @@ -39,6 +41,7 @@
REPLICATED_MAP_SERVICE: create_replicated_map_proxy,
RINGBUFFER_SERVICE: create_ringbuffer_proxy,
SET_SERVICE: create_set_proxy,
TOPIC_SERVICE: create_topic_proxy,
VECTOR_SERVICE: create_vector_collection_proxy,
}

Expand Down
117 changes: 117 additions & 0 deletions hazelcast/internal/asyncio_proxy/topic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import typing

from hazelcast.protocol.codec import (
topic_add_message_listener_codec,
topic_publish_codec,
topic_publish_all_codec,
topic_remove_message_listener_codec,
)
from hazelcast.internal.asyncio_proxy.base import PartitionSpecificProxy
from hazelcast.proxy.base import TopicMessage
from hazelcast.serialization.compact import SchemaNotReplicatedError
from hazelcast.types import MessageType
from hazelcast.util import check_not_none


class Topic(PartitionSpecificProxy, typing.Generic[MessageType]):
"""Hazelcast provides distribution mechanism for publishing messages that
are delivered to multiple subscribers, which is also known as a
publish/subscribe (pub/sub) messaging model.

Publish and subscriptions are cluster-wide. When a member subscribes to
a topic, it is actually registering for messages published by any member
in the cluster, including the new members joined after you added the
listener.

Messages are ordered, meaning that listeners(subscribers) will process the
messages in the order they are actually published.

Example:
>>> my_topic = await client.get_topic("my_topic")
>>> await my_topic.publish("hello")

Warning:
Asyncio client topic proxy is not thread-safe, do not access it from other threads.
"""

async def add_listener(
self, on_message: typing.Callable[[TopicMessage[MessageType]], None] = None
Comment thread
emreyigit marked this conversation as resolved.
) -> str:
"""Subscribes to this topic.

When someone publishes a message on this topic, ``on_message`` function
is called if provided.

Args:
on_message: Function to be called when a message is published. This function must not block.

Returns:
A registration id which is used as a key to remove the listener.
"""
check_not_none(on_message, "on_message can't be None")
codec = topic_add_message_listener_codec
request = codec.encode_request(self.name, self._is_smart)

def handle(item_data, publish_time, uuid):
member = self._context.cluster_service.get_member(uuid)
item_event = TopicMessage(
self.name, self._to_object(item_data), publish_time / 1000.0, member
)
on_message(item_event)

return await self._register_listener(
request,
lambda r: codec.decode_response(r),
lambda reg_id: topic_remove_message_listener_codec.encode_request(self.name, reg_id),
lambda m: codec.handle(m, handle),
)

async def publish(self, message: MessageType) -> None:
"""Publishes the message to all subscribers of this topic.

Args:
message: The message to be published.
"""
try:
message_data = self._to_data(message)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.publish, message)

request = topic_publish_codec.encode_request(self.name, message_data)
return await self._invoke(request)

async def publish_all(self, messages: typing.Sequence[MessageType]) -> None:
"""Publishes the messages to all subscribers of this topic.

Args:
messages: The messages to be published.
"""
check_not_none(messages, "Messages cannot be None")
try:
topic_messages = []
for m in messages:
check_not_none(m, "Message cannot be None")
data = self._to_data(m)
topic_messages.append(data)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.publish_all, messages)

request = topic_publish_all_codec.encode_request(self.name, topic_messages)
return await self._invoke(request)

async def remove_listener(self, registration_id: str) -> bool:
"""Stops receiving messages for the given message listener.

If the given listener already removed, this method does nothing.

Args:
registration_id: Registration id of the listener to be removed.

Returns:
``True`` if the listener is removed, ``False`` otherwise.
"""
return await self._deregister_listener(registration_id)


async def create_topic_proxy(service_name, name, context):
return Topic(service_name, name, context)
3 changes: 2 additions & 1 deletion hazelcast/proxy/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class Topic(PartitionSpecificProxy["BlockingTopic"], typing.Generic[MessageType]
are delivered to multiple subscribers, which is also known as a
publish/subscribe (pub/sub) messaging model.

Publish and subscriptions are cluster-wide. When a member subscribes for
Publish and subscriptions are cluster-wide. When a member subscribes to
a topic, it is actually registering for messages published by any member
in the cluster, including the new members joined after you added the
listener.
Expand All @@ -41,6 +41,7 @@ def add_listener(
Returns:
A registration id which is used as a key to remove the listener.
"""
check_not_none(on_message, "on_message can't be None")
codec = topic_add_message_listener_codec
request = codec.encode_request(self.name, self._is_smart)

Expand Down
87 changes: 87 additions & 0 deletions tests/integration/asyncio/proxy/topic_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
from tests.integration.asyncio.base import SingleMemberTestCase
from tests.util import (
random_string,
event_collector,
skip_if_client_version_older_than,
skip_if_server_version_older_than,
)


class TopicTest(SingleMemberTestCase):
@classmethod
def configure_client(cls, config):
config["cluster_name"] = cls.cluster.id
return config

async def asyncSetUp(self):
await super().asyncSetUp()
self.topic = await self.client.get_topic(random_string())

async def asyncTearDown(self):
await self.topic.destroy()
await super().asyncTearDown()

async def test_add_listener(self):
collector = event_collector()
await self.topic.add_listener(on_message=collector)
await self.topic.publish("item-value")

def assert_event():
self.assertEqual(len(collector.events), 1)
event = collector.events[0]
self.assertEqual(event.message, "item-value")
self.assertGreater(event.publish_time, 0)

await self.assertTrueEventually(assert_event, 5)

async def test_remove_listener(self):
collector = event_collector()
reg_id = await self.topic.add_listener(on_message=collector)
await self.topic.remove_listener(reg_id)
await self.topic.publish("item-value")

def assert_event():
self.assertEqual(len(collector.events), 0)
if len(collector.events) > 0:
event = collector.events[0]
self.assertEqual(event.message, "item-value")
self.assertGreater(event.publish_time, 0)

await self.assertTrueEventually(assert_event, 5)

async def test_str(self):
self.assertTrue(str(self.topic).startswith("Topic"))

async def test_publish_all(self):
skip_if_client_version_older_than(self, "5.2")
skip_if_server_version_older_than(self, self.client, "4.1")

collector = event_collector()
await self.topic.add_listener(on_message=collector)

messages = ["message1", "message2", "message3"]
await self.topic.publish_all(messages)

def assert_event():
self.assertEqual(len(collector.events), 3)

await self.assertTrueEventually(assert_event, 5)

async def test_publish_all_none_messages(self):
skip_if_client_version_older_than(self, "5.2")
skip_if_server_version_older_than(self, self.client, "4.1")

with self.assertRaises(AssertionError):
await self.topic.publish_all(None)

async def test_publish_all_none_message(self):
skip_if_client_version_older_than(self, "5.2")
skip_if_server_version_older_than(self, self.client, "4.1")

messages = ["message1", None, "message3"]
with self.assertRaises(AssertionError):
await self.topic.publish_all(messages)

async def test_ensure_on_messsage_is_not_none(self):
with self.assertRaises(AssertionError):
await self.topic.add_listener(None)
4 changes: 4 additions & 0 deletions tests/integration/backward_compatible/proxy/topic_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,7 @@ def test_publish_all_none_message(self):
messages = ["message1", None, "message3"]
with self.assertRaises(AssertionError):
self.topic.publish_all(messages)

def test_ensure_on_messsage_is_not_none(self):
with self.assertRaises(AssertionError):
self.topic.add_listener(None)
Loading