Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions dapr/common/pubsub/subscription.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
import json
from typing import Optional, Union
from typing import Mapping, Optional, Union

from google.protobuf.json_format import MessageToDict

from dapr.proto.runtime.v1.appcallback_pb2 import TopicEventRequest


class SubscriptionMessage:
def __init__(self, msg: TopicEventRequest):
"""Message delivered to Dapr pub/sub subscribers.

Delivered by both the streaming subscription API (``DaprClient.subscribe``) and the
App callback API (``@app.subscribe``). ``metadata`` carries delivery metadata (gRPC
invocation metadata and, for bulk deliveries, per-entry publish metadata); it is only
populated for App callback deliveries.
"""

def __init__(self, msg: TopicEventRequest, metadata: Optional[Mapping[str, str]] = None):
self._id: str = msg.id
self._source: str = msg.source
self._type: str = msg.type
Expand All @@ -17,6 +25,7 @@ def __init__(self, msg: TopicEventRequest):
self._pubsub_name: str = msg.pubsub_name
self._raw_data: bytes = msg.data
self._data: Optional[Union[dict, str]] = None
self._metadata: dict[str, str] = dict(metadata) if metadata else {}

try:
self._extensions = MessageToDict(msg.extensions)
Expand Down Expand Up @@ -58,6 +67,9 @@ def extensions(self):
def data(self):
return self._data

def metadata(self) -> dict[str, str]:
return self._metadata

def _parse_data_content(self):
try:
if self._data_content_type == 'application/json':
Expand Down
7 changes: 4 additions & 3 deletions dapr/ext/grpc/AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ Installed via the `grpc` extra on core dapr: `pip install "dapr[grpc]"`.
from dapr.ext.grpc import (
App, # Main entry point — decorator-based gRPC server
Rule, # CEL-based topic rule with priority
SubscriptionMessage, # Event type received by pub/sub topic handlers (preferred)
InvokeMethodRequest, # Request object for service invocation handlers
InvokeMethodResponse, # Response object for service invocation handlers
BindingRequest, # Request object for input binding handlers
Expand Down Expand Up @@ -56,7 +57,7 @@ def handle_method(request: InvokeMethodRequest) -> InvokeMethodResponse:

@app.subscribe(pubsub_name='pubsub', topic='orders', metadata={}, dead_letter_topic=None,
rule=Rule('event.type == "order"', priority=1), disable_topic_validation=False)
def handle_event(event: v1.Event) -> Optional[TopicEventResponse]:
def handle_event(event: SubscriptionMessage) -> Optional[TopicEventResponse]:
...

@app.binding('binding_name')
Expand Down Expand Up @@ -115,7 +116,7 @@ app.register_health_check(lambda: None) # Not a decorator — direct registrati
## Dependencies (declared via the `grpc` extra in the root `pyproject.toml`)

- `dapr` (core, same wheel as this extension)
- `cloudevents >= 1.0.0, < 2.0.0`
- `cloudevents >= 1.0.0, < 2.0.0` (deprecated, only used for the legacy handler event type)

## Testing

Expand All @@ -133,6 +134,6 @@ Test patterns:

- **Synchronous only**: Uses `grpc.server()` with `ThreadPoolExecutor(10)`. No async handler support.
- **Default port**: 3010 (from `dapr.conf.global_settings.GRPC_APP_PORT`)
- **CloudEvents**: Requires `cloudevents >= 1.0.0` for pub/sub event handling
- **Topic handler event type**: inferred from the handler annotation. Annotating the event parameter with `dapr.ext.grpc.SubscriptionMessage` — the same SDK-owned type the streaming subscription API (`DaprClient.subscribe`) delivers, with `metadata()` populated from the gRPC invocation metadata — delivers that type. Unannotated or otherwise-annotated handlers receive the DEPRECATED `cloudevents.sdk.event.v1.Event` and `subscribe()` emits a `DeprecationWarning` at registration. Deprecation timeline: 1.20 delivers `SubscriptionMessage` to unannotated handlers (legacy only via explicit `v1.Event` annotation), 1.21 drops `cloudevents` from the `grpc` extra (import becomes conditional), 1.22 removes the legacy path entirely (same release the `flask_dapr` shim goes away). New code must annotate with `SubscriptionMessage`. (Internally the choice is plumbed through `_CallbackServicer.register_topic(legacy_cloudevent=...)`.)
- **Duplicate registration**: Registering the same method/topic/binding name twice raises `ValueError`
- **Missing handlers**: Calling an unregistered method/topic/binding raises `NotImplementedError` (gRPC UNIMPLEMENTED)
2 changes: 2 additions & 0 deletions dapr/ext/grpc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from dapr.clients.grpc._jobs import ConstantFailurePolicy, DropFailurePolicy, FailurePolicy, Job
from dapr.clients.grpc._request import BindingRequest, InvokeMethodRequest, JobEvent
from dapr.clients.grpc._response import InvokeMethodResponse, TopicEventResponse
from dapr.common.pubsub.subscription import SubscriptionMessage

try:
from dapr.ext.grpc.app import App, Rule # type:ignore
Expand All @@ -30,6 +31,7 @@
__all__ = [
'App',
'Rule',
'SubscriptionMessage',
'InvokeMethodRequest',
'InvokeMethodResponse',
'BindingRequest',
Expand Down
148 changes: 107 additions & 41 deletions dapr/ext/grpc/_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from dapr.clients._constants import DEFAULT_JSON_CONTENT_TYPE
from dapr.clients.grpc._request import BindingRequest, InvokeMethodRequest, JobEvent
from dapr.clients.grpc._response import InvokeMethodResponse, TopicEventResponse
from dapr.common.pubsub.subscription import SubscriptionMessage
from dapr.proto import appcallback_service_v1, appcallback_v1, common_v1
from dapr.proto.common.v1.common_pb2 import InvokeRequest
from dapr.proto.runtime.v1.appcallback_pb2 import (
Expand All @@ -36,7 +37,9 @@
)

InvokeMethodCallable = Callable[[InvokeMethodRequest], Union[str, bytes, InvokeMethodResponse]]
TopicSubscribeCallable = Callable[[v1.Event], Optional[TopicEventResponse]]
TopicSubscribeCallable = Callable[
[Union[v1.Event, SubscriptionMessage]], Optional[TopicEventResponse]
]
BindingCallable = Callable[[BindingRequest], None]
JobEventCallable = Callable[[JobEvent], None]

Expand Down Expand Up @@ -74,6 +77,7 @@ class _CallbackServicer(
def __init__(self):
self._invoke_method_map: Dict[str, InvokeMethodCallable] = {}
self._topic_map: Dict[str, TopicSubscribeCallable] = {}
self._topic_legacy_event: Dict[str, bool] = {}
self._binding_map: Dict[str, BindingCallable] = {}
self._job_event_map: Dict[str, JobEventCallable] = {}

Expand All @@ -96,8 +100,15 @@ def register_topic(
dead_letter_topic: Optional[str] = None,
rule: Optional[Rule] = None,
disable_topic_validation: Optional[bool] = False,
legacy_cloudevent: bool = True,
) -> None:
"""Registers topic subscription for pubsub."""
"""Registers topic subscription for pubsub.

Args:
legacy_cloudevent (bool): when True (deprecated default), the handler receives a
``cloudevents.sdk.event.v1.Event``; when False, it receives a
:class:`dapr.common.pubsub.subscription.SubscriptionMessage`.
"""
if not disable_topic_validation:
topic_key = pubsub_name + DELIMITER + topic
else:
Expand All @@ -109,6 +120,7 @@ def register_topic(
if pubsub_topic in self._topic_map:
raise ValueError(f'{topic} is already registered with {pubsub_name}')
self._topic_map[pubsub_topic] = cb
self._topic_legacy_event[pubsub_topic] = legacy_cloudevent

registered_topic = self._registered_topics_map.get(topic_key)
sub: appcallback_v1.TopicSubscription = appcallback_v1.TopicSubscription()
Expand Down Expand Up @@ -206,21 +218,27 @@ def OnTopicEvent(self, request: TopicEventRequest, context):
context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore
raise NotImplementedError(f'topic {request.topic} is not implemented!')

customdata: Struct = request.extensions
extensions = dict()
for k, v in customdata.items():
extensions[k] = v
for k, v in context.invocation_metadata():
extensions['_metadata_' + k] = v

event = v1.Event()
event.SetEventType(request.type)
event.SetEventID(request.id)
event.SetSource(request.source)
event.SetData(request.data)
event.SetContentType(request.data_content_type)
event.SetSubject(request.topic)
event.SetExtensions(extensions)
invocation_metadata = dict(context.invocation_metadata())

event: Union[v1.Event, SubscriptionMessage]
if self._topic_legacy_event.get(pubsub_topic, True):
customdata: Struct = request.extensions
extensions = dict()
for k, v in customdata.items():
extensions[k] = v
for k, v in invocation_metadata.items():
extensions['_metadata_' + k] = v

event = v1.Event()
event.SetEventType(request.type)
event.SetEventID(request.id)
event.SetSource(request.source)
event.SetData(request.data)
event.SetContentType(request.data_content_type)
event.SetSubject(request.topic)
event.SetExtensions(extensions)
else:
event = SubscriptionMessage(request, invocation_metadata)

response = self._topic_map[pubsub_topic](event)
if isinstance(response, TopicEventResponse):
Expand Down Expand Up @@ -300,36 +318,20 @@ def _handle_bulk_topic_event(

handler_key = topic_key if topic_key in self._topic_map else no_validation_key
cb = self._topic_map[handler_key] # callback
use_legacy_event = self._topic_legacy_event.get(handler_key, True)
invocation_metadata = dict(context.invocation_metadata())

statuses = []
for entry in request.entries:
entry_id = entry.entry_id
try:
# Build event from entry & send req with many entries
event = v1.Event()
extensions = dict()
if entry.HasField('cloud_event') and entry.cloud_event:
ce = entry.cloud_event
event.SetEventType(ce.type)
event.SetEventID(ce.id)
event.SetSource(ce.source)
event.SetData(ce.data)
event.SetContentType(ce.data_content_type)
if ce.extensions:
for k, v in ce.extensions.items():
extensions[k] = v
event: Union[v1.Event, SubscriptionMessage]
if use_legacy_event:
event = self._bulk_entry_legacy_event(entry, request, invocation_metadata)
else:
event.SetEventID(entry_id)
event.SetData(entry.bytes if entry.HasField('bytes') else b'')
event.SetContentType(entry.content_type or '')
event.SetSubject(request.topic)
if entry.metadata:
for k, v in entry.metadata.items():
extensions[k] = v
for k, v in context.invocation_metadata():
extensions['_metadata_' + k] = v
if extensions:
event.SetExtensions(extensions)
event = self._bulk_entry_subscription_message(
entry, request, invocation_metadata
)

response = cb(event) # invoke app registered handler and send event
if isinstance(response, TopicEventResponse):
Expand All @@ -343,6 +345,70 @@ def _handle_bulk_topic_event(
)
return appcallback_v1.TopicEventBulkResponse(statuses=statuses)

def _bulk_entry_legacy_event(
self,
entry,
request: TopicEventBulkRequest,
invocation_metadata: Dict[str, str],
) -> v1.Event:
"""Builds the deprecated cloudevents v1.Event for a bulk entry."""
event = v1.Event()
extensions = dict()
if entry.HasField('cloud_event') and entry.cloud_event:
ce = entry.cloud_event
event.SetEventType(ce.type)
event.SetEventID(ce.id)
event.SetSource(ce.source)
event.SetData(ce.data)
event.SetContentType(ce.data_content_type)
if ce.extensions:
for k, v in ce.extensions.items():
extensions[k] = v
else:
event.SetEventID(entry.entry_id)
event.SetData(entry.bytes if entry.HasField('bytes') else b'')
event.SetContentType(entry.content_type or '')
event.SetSubject(request.topic)
if entry.metadata:
for k, v in entry.metadata.items():
extensions[k] = v
for k, v in invocation_metadata.items():
extensions['_metadata_' + k] = v
if extensions:
event.SetExtensions(extensions)
return event

def _bulk_entry_subscription_message(
self,
entry,
request: TopicEventBulkRequest,
invocation_metadata: Dict[str, str],
) -> SubscriptionMessage:
"""Builds a SubscriptionMessage for a bulk entry via a synthesized TopicEventRequest."""
if entry.HasField('cloud_event') and entry.cloud_event:
ce = entry.cloud_event
entry_request = TopicEventRequest(
id=ce.id,
source=ce.source,
type=ce.type,
spec_version=ce.spec_version,
data_content_type=ce.data_content_type,
data=ce.data,
topic=request.topic,
pubsub_name=request.pubsub_name,
extensions=ce.extensions,
)
else:
entry_request = TopicEventRequest(
id=entry.entry_id,
data=entry.bytes if entry.HasField('bytes') else b'',
data_content_type=entry.content_type or '',
topic=request.topic,
pubsub_name=request.pubsub_name,
)
metadata = {**invocation_metadata, **dict(entry.metadata)}
return SubscriptionMessage(entry_request, metadata)

def OnBulkTopicEvent(self, request: TopicEventBulkRequest, context):
"""Subscribes bulk events from Pubsub"""
response = self._handle_bulk_topic_event(request, context)
Expand Down
45 changes: 41 additions & 4 deletions dapr/ext/grpc/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,37 @@
limitations under the License.
"""

import inspect
import warnings
from concurrent import futures
from typing import Dict, Optional
from typing import Callable, Dict, Optional, get_type_hints

import grpc

from dapr.common.pubsub.subscription import SubscriptionMessage
from dapr.conf import settings
from dapr.ext.grpc._health_servicer import _HealthCheckServicer # type: ignore
from dapr.ext.grpc._servicer import Rule, _CallbackServicer # type: ignore
from dapr.proto import appcallback_service_v1


def _wants_subscription_message(func: Callable) -> bool:
"""True if the handler's first parameter is annotated with SubscriptionMessage.

Falls back to False (legacy cloudevents delivery) whenever the signature or the
annotation cannot be resolved, so inference never breaks registration.
"""
try:
parameters = list(inspect.signature(func).parameters.values())
resolved_hints = get_type_hints(func)
except Exception:
return False
if not parameters:
return False
annotation = resolved_hints.get(parameters[0].name)
return isinstance(annotation, type) and issubclass(annotation, SubscriptionMessage)


class App:
"""App object implements a Dapr application callback which can interact with Dapr runtime.
Once its object is initiated, it will act as a central registry for service invocation,
Expand Down Expand Up @@ -162,13 +182,18 @@ def subscribe(
):
"""A decorator that is used to register the subscribing topic method.

The event type the handler receives is inferred from its annotation: annotate the
event parameter with :class:`dapr.ext.grpc.SubscriptionMessage` to receive that type.
Unannotated (or otherwise-annotated) handlers receive the deprecated
``cloudevents.sdk.event.v1.Event`` and trigger a :class:`DeprecationWarning`.

The below example registers 'topic' subscription topic and pass custom
metadata to pubsub component::

from cloudevents.sdk.event import v1
from dapr.ext.grpc import SubscriptionMessage

@app.subscribe('pubsub_name', 'topic', metadata=(('session-id', 'session-id-value'),))
def topic(event: v1.Event) -> None:
@app.subscribe('pubsub_name', 'topic', metadata={'session-id': 'session-id-value'})
def topic(event: SubscriptionMessage) -> None:
Comment thread
acroca marked this conversation as resolved.
...

Args:
Expand All @@ -180,6 +205,17 @@ def topic(event: v1.Event) -> None:
"""

def decorator(func):
handler_wants_subscription_message = _wants_subscription_message(func)
if not handler_wants_subscription_message:
warnings.warn(
'Topic handlers receive a deprecated cloudevents.sdk.event.v1.Event unless '
'their event parameter is annotated with dapr.ext.grpc.SubscriptionMessage. '
'Annotate the handler to adopt SubscriptionMessage and silence this warning; '
'a future release will deliver SubscriptionMessage to all handlers and drop '
'the cloudevents dependency.',
DeprecationWarning,
stacklevel=2,
)
self._servicer.register_topic(
pubsub_name,
topic,
Expand All @@ -188,6 +224,7 @@ def decorator(func):
dead_letter_topic,
rule,
disable_topic_validation,
legacy_cloudevent=not handler_wants_subscription_message,
)

return decorator
Expand Down
Loading
Loading