diff --git a/dapr/common/pubsub/subscription.py b/dapr/common/pubsub/subscription.py index eb22a48d..51d1b122 100644 --- a/dapr/common/pubsub/subscription.py +++ b/dapr/common/pubsub/subscription.py @@ -1,5 +1,5 @@ import json -from typing import Optional, Union +from typing import Mapping, Optional, Union from google.protobuf.json_format import MessageToDict @@ -7,7 +7,15 @@ class SubscriptionMessage: - def __init__(self, msg: TopicEventRequest): + """Message delivered to Dapr pub/sub subscribers. + + Delivered by both the streaming subscription API (``DaprClient.subscribe``) and the + App callback API (``@app.subscribe``). ``metadata`` carries delivery metadata (gRPC + invocation metadata and, for bulk deliveries, per-entry publish metadata); it is only + populated for App callback deliveries. + """ + + def __init__(self, msg: TopicEventRequest, metadata: Optional[Mapping[str, str]] = None): self._id: str = msg.id self._source: str = msg.source self._type: str = msg.type @@ -17,6 +25,7 @@ def __init__(self, msg: TopicEventRequest): self._pubsub_name: str = msg.pubsub_name self._raw_data: bytes = msg.data self._data: Optional[Union[dict, str]] = None + self._metadata: dict[str, str] = dict(metadata) if metadata else {} try: self._extensions = MessageToDict(msg.extensions) @@ -58,6 +67,9 @@ def extensions(self): def data(self): return self._data + def metadata(self) -> dict[str, str]: + return self._metadata + def _parse_data_content(self): try: if self._data_content_type == 'application/json': diff --git a/dapr/ext/grpc/AGENTS.md b/dapr/ext/grpc/AGENTS.md index a802e6f5..5289f130 100644 --- a/dapr/ext/grpc/AGENTS.md +++ b/dapr/ext/grpc/AGENTS.md @@ -27,6 +27,7 @@ Installed via the `grpc` extra on core dapr: `pip install "dapr[grpc]"`. from dapr.ext.grpc import ( App, # Main entry point — decorator-based gRPC server Rule, # CEL-based topic rule with priority + SubscriptionMessage, # Event type received by pub/sub topic handlers (preferred) InvokeMethodRequest, # Request object for service invocation handlers InvokeMethodResponse, # Response object for service invocation handlers BindingRequest, # Request object for input binding handlers @@ -56,7 +57,7 @@ def handle_method(request: InvokeMethodRequest) -> InvokeMethodResponse: @app.subscribe(pubsub_name='pubsub', topic='orders', metadata={}, dead_letter_topic=None, rule=Rule('event.type == "order"', priority=1), disable_topic_validation=False) -def handle_event(event: v1.Event) -> Optional[TopicEventResponse]: +def handle_event(event: SubscriptionMessage) -> Optional[TopicEventResponse]: ... @app.binding('binding_name') @@ -115,7 +116,7 @@ app.register_health_check(lambda: None) # Not a decorator — direct registrati ## Dependencies (declared via the `grpc` extra in the root `pyproject.toml`) - `dapr` (core, same wheel as this extension) -- `cloudevents >= 1.0.0, < 2.0.0` +- `cloudevents >= 1.0.0, < 2.0.0` (deprecated, only used for the legacy handler event type) ## Testing @@ -133,6 +134,6 @@ Test patterns: - **Synchronous only**: Uses `grpc.server()` with `ThreadPoolExecutor(10)`. No async handler support. - **Default port**: 3010 (from `dapr.conf.global_settings.GRPC_APP_PORT`) -- **CloudEvents**: Requires `cloudevents >= 1.0.0` for pub/sub event handling +- **Topic handler event type**: inferred from the handler annotation. Annotating the event parameter with `dapr.ext.grpc.SubscriptionMessage` — the same SDK-owned type the streaming subscription API (`DaprClient.subscribe`) delivers, with `metadata()` populated from the gRPC invocation metadata — delivers that type. Unannotated or otherwise-annotated handlers receive the DEPRECATED `cloudevents.sdk.event.v1.Event` and `subscribe()` emits a `DeprecationWarning` at registration. Deprecation timeline: 1.20 delivers `SubscriptionMessage` to unannotated handlers (legacy only via explicit `v1.Event` annotation), 1.21 drops `cloudevents` from the `grpc` extra (import becomes conditional), 1.22 removes the legacy path entirely (same release the `flask_dapr` shim goes away). New code must annotate with `SubscriptionMessage`. (Internally the choice is plumbed through `_CallbackServicer.register_topic(legacy_cloudevent=...)`.) - **Duplicate registration**: Registering the same method/topic/binding name twice raises `ValueError` - **Missing handlers**: Calling an unregistered method/topic/binding raises `NotImplementedError` (gRPC UNIMPLEMENTED) diff --git a/dapr/ext/grpc/__init__.py b/dapr/ext/grpc/__init__.py index 8e521687..14e3b66f 100644 --- a/dapr/ext/grpc/__init__.py +++ b/dapr/ext/grpc/__init__.py @@ -16,6 +16,7 @@ from dapr.clients.grpc._jobs import ConstantFailurePolicy, DropFailurePolicy, FailurePolicy, Job from dapr.clients.grpc._request import BindingRequest, InvokeMethodRequest, JobEvent from dapr.clients.grpc._response import InvokeMethodResponse, TopicEventResponse +from dapr.common.pubsub.subscription import SubscriptionMessage try: from dapr.ext.grpc.app import App, Rule # type:ignore @@ -30,6 +31,7 @@ __all__ = [ 'App', 'Rule', + 'SubscriptionMessage', 'InvokeMethodRequest', 'InvokeMethodResponse', 'BindingRequest', diff --git a/dapr/ext/grpc/_servicer.py b/dapr/ext/grpc/_servicer.py index 5a16cc62..589ff001 100644 --- a/dapr/ext/grpc/_servicer.py +++ b/dapr/ext/grpc/_servicer.py @@ -25,6 +25,7 @@ from dapr.clients._constants import DEFAULT_JSON_CONTENT_TYPE from dapr.clients.grpc._request import BindingRequest, InvokeMethodRequest, JobEvent from dapr.clients.grpc._response import InvokeMethodResponse, TopicEventResponse +from dapr.common.pubsub.subscription import SubscriptionMessage from dapr.proto import appcallback_service_v1, appcallback_v1, common_v1 from dapr.proto.common.v1.common_pb2 import InvokeRequest from dapr.proto.runtime.v1.appcallback_pb2 import ( @@ -36,7 +37,9 @@ ) InvokeMethodCallable = Callable[[InvokeMethodRequest], Union[str, bytes, InvokeMethodResponse]] -TopicSubscribeCallable = Callable[[v1.Event], Optional[TopicEventResponse]] +TopicSubscribeCallable = Callable[ + [Union[v1.Event, SubscriptionMessage]], Optional[TopicEventResponse] +] BindingCallable = Callable[[BindingRequest], None] JobEventCallable = Callable[[JobEvent], None] @@ -74,6 +77,7 @@ class _CallbackServicer( def __init__(self): self._invoke_method_map: Dict[str, InvokeMethodCallable] = {} self._topic_map: Dict[str, TopicSubscribeCallable] = {} + self._topic_legacy_event: Dict[str, bool] = {} self._binding_map: Dict[str, BindingCallable] = {} self._job_event_map: Dict[str, JobEventCallable] = {} @@ -96,8 +100,15 @@ def register_topic( dead_letter_topic: Optional[str] = None, rule: Optional[Rule] = None, disable_topic_validation: Optional[bool] = False, + legacy_cloudevent: bool = True, ) -> None: - """Registers topic subscription for pubsub.""" + """Registers topic subscription for pubsub. + + Args: + legacy_cloudevent (bool): when True (deprecated default), the handler receives a + ``cloudevents.sdk.event.v1.Event``; when False, it receives a + :class:`dapr.common.pubsub.subscription.SubscriptionMessage`. + """ if not disable_topic_validation: topic_key = pubsub_name + DELIMITER + topic else: @@ -109,6 +120,7 @@ def register_topic( if pubsub_topic in self._topic_map: raise ValueError(f'{topic} is already registered with {pubsub_name}') self._topic_map[pubsub_topic] = cb + self._topic_legacy_event[pubsub_topic] = legacy_cloudevent registered_topic = self._registered_topics_map.get(topic_key) sub: appcallback_v1.TopicSubscription = appcallback_v1.TopicSubscription() @@ -206,21 +218,27 @@ def OnTopicEvent(self, request: TopicEventRequest, context): context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore raise NotImplementedError(f'topic {request.topic} is not implemented!') - customdata: Struct = request.extensions - extensions = dict() - for k, v in customdata.items(): - extensions[k] = v - for k, v in context.invocation_metadata(): - extensions['_metadata_' + k] = v - - event = v1.Event() - event.SetEventType(request.type) - event.SetEventID(request.id) - event.SetSource(request.source) - event.SetData(request.data) - event.SetContentType(request.data_content_type) - event.SetSubject(request.topic) - event.SetExtensions(extensions) + invocation_metadata = dict(context.invocation_metadata()) + + event: Union[v1.Event, SubscriptionMessage] + if self._topic_legacy_event.get(pubsub_topic, True): + customdata: Struct = request.extensions + extensions = dict() + for k, v in customdata.items(): + extensions[k] = v + for k, v in invocation_metadata.items(): + extensions['_metadata_' + k] = v + + event = v1.Event() + event.SetEventType(request.type) + event.SetEventID(request.id) + event.SetSource(request.source) + event.SetData(request.data) + event.SetContentType(request.data_content_type) + event.SetSubject(request.topic) + event.SetExtensions(extensions) + else: + event = SubscriptionMessage(request, invocation_metadata) response = self._topic_map[pubsub_topic](event) if isinstance(response, TopicEventResponse): @@ -300,36 +318,20 @@ def _handle_bulk_topic_event( handler_key = topic_key if topic_key in self._topic_map else no_validation_key cb = self._topic_map[handler_key] # callback + use_legacy_event = self._topic_legacy_event.get(handler_key, True) + invocation_metadata = dict(context.invocation_metadata()) statuses = [] for entry in request.entries: entry_id = entry.entry_id try: - # Build event from entry & send req with many entries - event = v1.Event() - extensions = dict() - if entry.HasField('cloud_event') and entry.cloud_event: - ce = entry.cloud_event - event.SetEventType(ce.type) - event.SetEventID(ce.id) - event.SetSource(ce.source) - event.SetData(ce.data) - event.SetContentType(ce.data_content_type) - if ce.extensions: - for k, v in ce.extensions.items(): - extensions[k] = v + event: Union[v1.Event, SubscriptionMessage] + if use_legacy_event: + event = self._bulk_entry_legacy_event(entry, request, invocation_metadata) else: - event.SetEventID(entry_id) - event.SetData(entry.bytes if entry.HasField('bytes') else b'') - event.SetContentType(entry.content_type or '') - event.SetSubject(request.topic) - if entry.metadata: - for k, v in entry.metadata.items(): - extensions[k] = v - for k, v in context.invocation_metadata(): - extensions['_metadata_' + k] = v - if extensions: - event.SetExtensions(extensions) + event = self._bulk_entry_subscription_message( + entry, request, invocation_metadata + ) response = cb(event) # invoke app registered handler and send event if isinstance(response, TopicEventResponse): @@ -343,6 +345,70 @@ def _handle_bulk_topic_event( ) return appcallback_v1.TopicEventBulkResponse(statuses=statuses) + def _bulk_entry_legacy_event( + self, + entry, + request: TopicEventBulkRequest, + invocation_metadata: Dict[str, str], + ) -> v1.Event: + """Builds the deprecated cloudevents v1.Event for a bulk entry.""" + event = v1.Event() + extensions = dict() + if entry.HasField('cloud_event') and entry.cloud_event: + ce = entry.cloud_event + event.SetEventType(ce.type) + event.SetEventID(ce.id) + event.SetSource(ce.source) + event.SetData(ce.data) + event.SetContentType(ce.data_content_type) + if ce.extensions: + for k, v in ce.extensions.items(): + extensions[k] = v + else: + event.SetEventID(entry.entry_id) + event.SetData(entry.bytes if entry.HasField('bytes') else b'') + event.SetContentType(entry.content_type or '') + event.SetSubject(request.topic) + if entry.metadata: + for k, v in entry.metadata.items(): + extensions[k] = v + for k, v in invocation_metadata.items(): + extensions['_metadata_' + k] = v + if extensions: + event.SetExtensions(extensions) + return event + + def _bulk_entry_subscription_message( + self, + entry, + request: TopicEventBulkRequest, + invocation_metadata: Dict[str, str], + ) -> SubscriptionMessage: + """Builds a SubscriptionMessage for a bulk entry via a synthesized TopicEventRequest.""" + if entry.HasField('cloud_event') and entry.cloud_event: + ce = entry.cloud_event + entry_request = TopicEventRequest( + id=ce.id, + source=ce.source, + type=ce.type, + spec_version=ce.spec_version, + data_content_type=ce.data_content_type, + data=ce.data, + topic=request.topic, + pubsub_name=request.pubsub_name, + extensions=ce.extensions, + ) + else: + entry_request = TopicEventRequest( + id=entry.entry_id, + data=entry.bytes if entry.HasField('bytes') else b'', + data_content_type=entry.content_type or '', + topic=request.topic, + pubsub_name=request.pubsub_name, + ) + metadata = {**invocation_metadata, **dict(entry.metadata)} + return SubscriptionMessage(entry_request, metadata) + def OnBulkTopicEvent(self, request: TopicEventBulkRequest, context): """Subscribes bulk events from Pubsub""" response = self._handle_bulk_topic_event(request, context) diff --git a/dapr/ext/grpc/app.py b/dapr/ext/grpc/app.py index 5c48d4a3..c59a05cd 100644 --- a/dapr/ext/grpc/app.py +++ b/dapr/ext/grpc/app.py @@ -13,17 +13,37 @@ limitations under the License. """ +import inspect +import warnings from concurrent import futures -from typing import Dict, Optional +from typing import Callable, Dict, Optional, get_type_hints import grpc +from dapr.common.pubsub.subscription import SubscriptionMessage from dapr.conf import settings from dapr.ext.grpc._health_servicer import _HealthCheckServicer # type: ignore from dapr.ext.grpc._servicer import Rule, _CallbackServicer # type: ignore from dapr.proto import appcallback_service_v1 +def _wants_subscription_message(func: Callable) -> bool: + """True if the handler's first parameter is annotated with SubscriptionMessage. + + Falls back to False (legacy cloudevents delivery) whenever the signature or the + annotation cannot be resolved, so inference never breaks registration. + """ + try: + parameters = list(inspect.signature(func).parameters.values()) + resolved_hints = get_type_hints(func) + except Exception: + return False + if not parameters: + return False + annotation = resolved_hints.get(parameters[0].name) + return isinstance(annotation, type) and issubclass(annotation, SubscriptionMessage) + + class App: """App object implements a Dapr application callback which can interact with Dapr runtime. Once its object is initiated, it will act as a central registry for service invocation, @@ -162,13 +182,18 @@ def subscribe( ): """A decorator that is used to register the subscribing topic method. + The event type the handler receives is inferred from its annotation: annotate the + event parameter with :class:`dapr.ext.grpc.SubscriptionMessage` to receive that type. + Unannotated (or otherwise-annotated) handlers receive the deprecated + ``cloudevents.sdk.event.v1.Event`` and trigger a :class:`DeprecationWarning`. + The below example registers 'topic' subscription topic and pass custom metadata to pubsub component:: - from cloudevents.sdk.event import v1 + from dapr.ext.grpc import SubscriptionMessage - @app.subscribe('pubsub_name', 'topic', metadata=(('session-id', 'session-id-value'),)) - def topic(event: v1.Event) -> None: + @app.subscribe('pubsub_name', 'topic', metadata={'session-id': 'session-id-value'}) + def topic(event: SubscriptionMessage) -> None: ... Args: @@ -180,6 +205,17 @@ def topic(event: v1.Event) -> None: """ def decorator(func): + handler_wants_subscription_message = _wants_subscription_message(func) + if not handler_wants_subscription_message: + warnings.warn( + 'Topic handlers receive a deprecated cloudevents.sdk.event.v1.Event unless ' + 'their event parameter is annotated with dapr.ext.grpc.SubscriptionMessage. ' + 'Annotate the handler to adopt SubscriptionMessage and silence this warning; ' + 'a future release will deliver SubscriptionMessage to all handlers and drop ' + 'the cloudevents dependency.', + DeprecationWarning, + stacklevel=2, + ) self._servicer.register_topic( pubsub_name, topic, @@ -188,6 +224,7 @@ def decorator(func): dead_letter_topic, rule, disable_topic_validation, + legacy_cloudevent=not handler_wants_subscription_message, ) return decorator diff --git a/examples/pubsub-simple/subscriber.py b/examples/pubsub-simple/subscriber.py index 7dab9e92..fa699838 100644 --- a/examples/pubsub-simple/subscriber.py +++ b/examples/pubsub-simple/subscriber.py @@ -11,29 +11,30 @@ # limitations under the License. # ------------------------------------------------------------ -import json from time import sleep -from cloudevents.sdk.event import v1 - from dapr.clients.grpc._response import TopicEventResponse -from dapr.ext.grpc import App +from dapr.ext.grpc import App, SubscriptionMessage from dapr.proto import appcallback_v1 +# Handlers annotated with SubscriptionMessage receive that type. Unannotated handlers +# receive the deprecated cloudevents.sdk.event.v1.Event and a DeprecationWarning. app = App() should_retry = True # To control whether dapr should retry sending a message @app.subscribe(pubsub_name='pubsub', topic='TOPIC_A') -def mytopic(event: v1.Event) -> TopicEventResponse: +def mytopic(event: SubscriptionMessage) -> TopicEventResponse: global should_retry - data = json.loads(event.Data()) + # event.data() is already parsed based on the content type (dict for application/json) + data = event.data() print( f'Subscriber received: id={data["id"]}, message="{data["message"]}", ' - f'content_type="{event.content_type}"', + f'content_type="{event.data_content_type()}"', flush=True, ) - # event.Metadata() contains a dictionary of cloud event extensions and publish metadata + # event.extensions() contains the cloud event extensions and + # event.metadata() the delivery metadata if should_retry: should_retry = False # we only retry once in this example sleep(0.5) # add some delay to help with ordering of expected logs @@ -42,25 +43,21 @@ def mytopic(event: v1.Event) -> TopicEventResponse: @app.subscribe(pubsub_name='pubsub', topic='TOPIC_CE') -def receive_cloud_events(event: v1.Event) -> TopicEventResponse: - print('Subscriber received: ' + event.Subject(), flush=True) +def receive_cloud_events(event: SubscriptionMessage) -> TopicEventResponse: + print('Subscriber received: ' + event.topic(), flush=True) - content_type = event.content_type - data = event.Data() + content_type = event.data_content_type() + # event.data() is parsed by content type: dict for JSON, str for plain text + data = event.data() try: if content_type == 'application/json': - # Handle JSON data - json_data = json.loads(data) print( - f'Subscriber received a json cloud event: id={json_data["id"]}, message="{json_data["message"]}", ' - f'content_type="{event.content_type}"', + f'Subscriber received a json cloud event: id={data["id"]}, message="{data["message"]}", ' + f'content_type="{content_type}"', flush=True, ) elif content_type == 'text/plain': - # Handle plain text data - if isinstance(data, bytes): - data = data.decode('utf-8') print( f'Subscriber received plain text cloud event: {data}, ' f'content_type="{content_type}"', @@ -78,21 +75,21 @@ def receive_cloud_events(event: v1.Event) -> TopicEventResponse: @app.subscribe(pubsub_name='pubsub', topic='TOPIC_D', dead_letter_topic='TOPIC_D_DEAD') -def fail_and_send_to_dead_topic(event: v1.Event) -> TopicEventResponse: +def fail_and_send_to_dead_topic(event: SubscriptionMessage) -> TopicEventResponse: return TopicEventResponse('retry') @app.subscribe(pubsub_name='pubsub', topic='TOPIC_D_DEAD') -def mytopic_dead(event: v1.Event) -> TopicEventResponse: - data = json.loads(event.Data()) +def mytopic_dead(event: SubscriptionMessage) -> TopicEventResponse: + data = event.data() print( f'Dead-Letter Subscriber received: id={data["id"]}, message="{data["message"]}", ' - f'content_type="{event.content_type}"', + f'content_type="{event.data_content_type()}"', flush=True, ) - print('Dead-Letter Subscriber. Received via deadletter topic: ' + event.Subject(), flush=True) + print('Dead-Letter Subscriber. Received via deadletter topic: ' + event.topic(), flush=True) print( - 'Dead-Letter Subscriber. Originally intended topic: ' + event.Extensions()['topic'], + 'Dead-Letter Subscriber. Originally intended topic: ' + event.extensions()['topic'], flush=True, ) return TopicEventResponse('success') @@ -110,11 +107,11 @@ def mytopic_dead(event: v1.Event) -> TopicEventResponse: # this allows subscribing to all events sent to this app - useful for wildcard topics @app.subscribe(pubsub_name='pubsub', topic='topic/#', disable_topic_validation=True) -def mytopic_wildcard(event: v1.Event) -> TopicEventResponse: - data = json.loads(event.Data()) +def mytopic_wildcard(event: SubscriptionMessage) -> TopicEventResponse: + data = event.data() print( f'Wildcard-Subscriber received: id={data["id"]}, message="{data["message"]}", ' - f'content_type="{event.content_type}"', + f'content_type="{event.data_content_type()}"', flush=True, ) return TopicEventResponse('success') diff --git a/tests/ext/grpc/test_app.py b/tests/ext/grpc/test_app.py index e25e6a27..0bb91b34 100644 --- a/tests/ext/grpc/test_app.py +++ b/tests/ext/grpc/test_app.py @@ -14,12 +14,12 @@ """ import unittest +import warnings from unittest.mock import MagicMock, patch -from cloudevents.sdk.event import v1 - from dapr.conf import settings -from dapr.ext.grpc import App, BindingRequest, InvokeMethodRequest, Rule +from dapr.ext.grpc import App, BindingRequest, InvokeMethodRequest, Rule, SubscriptionMessage +from dapr.proto import appcallback_v1 class AppTests(unittest.TestCase): @@ -51,17 +51,17 @@ def binding1(request: BindingRequest): def test_subscribe_decorator(self): @self._app.subscribe(pubsub_name='pubsub', topic='topic') - def handle_default(event: v1.Event) -> None: + def handle_default(event: SubscriptionMessage) -> None: pass @self._app.subscribe( pubsub_name='pubsub', topic='topic', rule=Rule('event.type == "test"', 1) ) - def handle_test_event(event: v1.Event) -> None: + def handle_test_event(event: SubscriptionMessage) -> None: pass @self._app.subscribe(pubsub_name='pubsub', topic='topic2', dead_letter_topic='topic2_dead') - def handle_dead_letter(event: v1.Event) -> None: + def handle_dead_letter(event: SubscriptionMessage) -> None: pass subscription_map = self._app._servicer._topic_map @@ -93,6 +93,84 @@ def test_no_health_check(self): self.assertIsNone(registered_cb) +class SubscribeEventTypeInferenceTests(unittest.TestCase): + """subscribe() infers the delivered event type from the handler's annotation.""" + + def setUp(self): + self._app = App() + self.fake_context = MagicMock() + self.fake_context.invocation_metadata.return_value = () + + def _subscribe_and_deliver(self, handler): + received = [] + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter('always') + wrapped = self._app.subscribe(pubsub_name='pubsub', topic='topic')(handler) + del wrapped + + def recording_handler(event): + received.append(event) + return handler(event) + + self._app._servicer._topic_map['pubsub:topic:'] = recording_handler + request = appcallback_v1.TopicEventRequest( + id='event-1', + data_content_type='application/json', + data=b'{"a": 1}', + topic='topic', + pubsub_name='pubsub', + ) + self._app._servicer.OnTopicEvent(request, self.fake_context) + deprecations = [w for w in caught if issubclass(w.category, DeprecationWarning)] + return received[0], deprecations + + def test_unannotated_handler_warns_and_gets_legacy_event(self): + def handler(event): + pass + + event, deprecations = self._subscribe_and_deliver(handler) + self.assertNotIsInstance(event, SubscriptionMessage) + self.assertEqual('event-1', event.EventID()) + self.assertEqual(1, len(deprecations)) + self.assertIn('SubscriptionMessage', str(deprecations[0].message)) + + def test_annotated_handler_gets_subscription_message_without_warning(self): + def handler(event: SubscriptionMessage): + pass + + event, deprecations = self._subscribe_and_deliver(handler) + self.assertIsInstance(event, SubscriptionMessage) + self.assertEqual('event-1', event.id()) + self.assertEqual([], deprecations) + + def test_legacy_annotated_handler_warns_and_gets_legacy_event(self): + from cloudevents.sdk.event import v1 + + def handler(event: v1.Event): + pass + + event, deprecations = self._subscribe_and_deliver(handler) + self.assertIsInstance(event, v1.Event) + self.assertEqual(1, len(deprecations)) + + def test_string_annotation_resolves(self): + def handler(event: 'SubscriptionMessage'): + pass + + event, deprecations = self._subscribe_and_deliver(handler) + self.assertIsInstance(event, SubscriptionMessage) + self.assertEqual([], deprecations) + + def test_unresolvable_annotation_falls_back_to_legacy(self): + def handler(event): + pass + + handler.__annotations__ = {'event': 'NotARealType'} + event, deprecations = self._subscribe_and_deliver(handler) + self.assertNotIsInstance(event, SubscriptionMessage) + self.assertEqual(1, len(deprecations)) + + class AppGrpcOptionsTests(unittest.TestCase): """Exercises options passed to grpc.server() based on env var / constructor arg.""" diff --git a/tests/ext/grpc/test_servicier.py b/tests/ext/grpc/test_servicier.py index 38ad812f..53279cbc 100644 --- a/tests/ext/grpc/test_servicier.py +++ b/tests/ext/grpc/test_servicier.py @@ -393,3 +393,127 @@ def test_non_registered_job_event(self): if __name__ == '__main__': unittest.main() + + +class TopicEventTypeDeliveryTests(unittest.TestCase): + """Handlers registered with legacy_cloudevent=False receive SubscriptionMessage objects.""" + + def setUp(self): + self._servicer = _CallbackServicer() + self._handler = Mock() + self._handler.return_value = TopicEventResponse('success') + + self.fake_context = MagicMock() + self.fake_context.invocation_metadata.return_value = ( + ('key1', 'value1'), + ('key2', 'value2'), + ) + + def _topic_request(self): + return appcallback_v1.TopicEventRequest( + id='event-1', + source='app-1', + type='com.example.test', + spec_version='1.0', + data_content_type='application/json', + data=b'{"message": "hello"}', + topic='topic1', + pubsub_name='pubsub1', + ) + + def test_legacy_default_delivers_cloudevent(self): + from cloudevents.sdk.event import v1 + + self._servicer.register_topic('pubsub1', 'topic1', self._handler, {}) + self._servicer.OnTopicEvent(self._topic_request(), self.fake_context) + + event = self._handler.call_args[0][0] + self.assertIsInstance(event, v1.Event) + self.assertEqual('event-1', event.EventID()) + + def test_opt_out_delivers_topic_event(self): + from dapr.ext.grpc import SubscriptionMessage + + self._servicer.register_topic( + 'pubsub1', 'topic1', self._handler, {}, legacy_cloudevent=False + ) + self._servicer.OnTopicEvent(self._topic_request(), self.fake_context) + + event = self._handler.call_args[0][0] + self.assertIsInstance(event, SubscriptionMessage) + self.assertEqual('event-1', event.id()) + self.assertEqual('app-1', event.source()) + self.assertEqual('com.example.test', event.type()) + self.assertEqual('topic1', event.topic()) + self.assertEqual('pubsub1', event.pubsub_name()) + self.assertEqual('application/json', event.data_content_type()) + self.assertEqual(b'{"message": "hello"}', event.raw_data()) + self.assertEqual({'message': 'hello'}, event.data()) + self.assertEqual({'key1': 'value1', 'key2': 'value2'}, event.metadata()) + + def test_opt_out_bulk_cloud_event_entry(self): + from dapr.ext.grpc import SubscriptionMessage + from dapr.proto.runtime.v1.appcallback_pb2 import ( + TopicEventBulkRequest, + TopicEventBulkRequestEntry, + TopicEventCERequest, + ) + + self._servicer.register_topic( + 'pubsub1', 'topic1', self._handler, {}, legacy_cloudevent=False + ) + ce = TopicEventCERequest( + id='ce-1', + source='app-1', + type='com.example.test', + spec_version='1.0', + data_content_type='text/plain', + data=b'payload', + ) + entry = TopicEventBulkRequestEntry( + entry_id='entry1', cloud_event=ce, metadata={'prop': 'val'} + ) + request = TopicEventBulkRequest( + id='bulk1', pubsub_name='pubsub1', topic='topic1', path='', entries=[entry] + ) + + resp = self._servicer.OnBulkTopicEvent(request, self.fake_context) + + self.assertEqual( + appcallback_v1.TopicEventResponse.TopicEventResponseStatus.SUCCESS, + resp.statuses[0].status, + ) + event = self._handler.call_args[0][0] + self.assertIsInstance(event, SubscriptionMessage) + self.assertEqual('ce-1', event.id()) + self.assertEqual('topic1', event.topic()) + self.assertEqual('pubsub1', event.pubsub_name()) + self.assertEqual('payload', event.data()) + self.assertEqual('val', event.metadata()['prop']) + self.assertEqual('value1', event.metadata()['key1']) + + def test_opt_out_bulk_raw_entry(self): + from dapr.ext.grpc import SubscriptionMessage + from dapr.proto.runtime.v1.appcallback_pb2 import ( + TopicEventBulkRequest, + TopicEventBulkRequestEntry, + ) + + self._servicer.register_topic( + 'pubsub1', 'topic1', self._handler, {}, legacy_cloudevent=False + ) + entry = TopicEventBulkRequestEntry( + entry_id='entry1', bytes=b'{"a": 1}', content_type='application/json' + ) + request = TopicEventBulkRequest( + id='bulk1', pubsub_name='pubsub1', topic='topic1', path='', entries=[entry] + ) + + resp = self._servicer.OnBulkTopicEvent(request, self.fake_context) + + self.assertEqual(1, len(resp.statuses)) + event = self._handler.call_args[0][0] + self.assertIsInstance(event, SubscriptionMessage) + self.assertEqual('entry1', event.id()) + self.assertEqual({'a': 1}, event.data()) + self.assertEqual('topic1', event.topic()) diff --git a/tests/integration/apps/pubsub_subscriber.py b/tests/integration/apps/pubsub_subscriber.py index 4e89bfb7..23defc9c 100644 --- a/tests/integration/apps/pubsub_subscriber.py +++ b/tests/integration/apps/pubsub_subscriber.py @@ -3,23 +3,19 @@ Used by integration tests to verify message delivery without relying on stdout. """ -import json - -from cloudevents.sdk.event import v1 - from dapr.clients import DaprClient from dapr.clients.grpc._response import TopicEventResponse -from dapr.ext.grpc import App +from dapr.ext.grpc import App, SubscriptionMessage app = App() @app.subscribe(pubsub_name='pubsub', topic='TOPIC_A') -def handle_topic_a(event: v1.Event) -> TopicEventResponse: - data = json.loads(event.Data()) +def handle_topic_a(event: SubscriptionMessage) -> TopicEventResponse: + data = event.data() key = f'received-{data["run_id"]}-{data["id"]}' with DaprClient() as d: - d.save_state('statestore', key, event.Data()) + d.save_state('statestore', key, event.raw_data()) return TopicEventResponse('success')