diff --git a/docs/api/config.rst b/docs/api/config.rst index 3db052d..d405eeb 100644 --- a/docs/api/config.rst +++ b/docs/api/config.rst @@ -5,7 +5,7 @@ Config Config - Shared =============== -.. automodule:: intersect_sdk.config.shared +.. automodule:: intersect_sdk_common.config :members: :undoc-members: :exclude-members: model_computed_fields, model_config, model_fields diff --git a/docs/api/constants.rst b/docs/api/constants.rst index 0da9538..f369885 100644 --- a/docs/api/constants.rst +++ b/docs/api/constants.rst @@ -1,6 +1,6 @@ Constants ========= -.. automodule:: intersect_sdk.constants +.. automodule:: intersect_sdk_common.constants :members: :undoc-members: diff --git a/docs/api/core_definitions.rst b/docs/api/core_definitions.rst index 481e438..1422117 100644 --- a/docs/api/core_definitions.rst +++ b/docs/api/core_definitions.rst @@ -1,7 +1,7 @@ Core Definitions ================ -.. automodule:: intersect_sdk.core_definitions +.. automodule:: intersect_sdk_common.core_definitions :members: :undoc-members: :exclude-members: model_computed_fields, model_config, model_fields diff --git a/pyproject.toml b/pyproject.toml index 625f8d4..5d9c996 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -2,17 +2,17 @@ name = "intersect-sdk" description = "Python SDK to interact with INTERSECT" authors = [ - { name = "Lance Drane", email = "dranelt@ornl.gov" }, - { name = "Marshall McDonnell", email = "mcdonnellmt@ornl.gov" }, - { name = "Seth Hitefield", email = "hitefieldsd@ornl.gov" }, - { name = "Andrew Ayres", email = "ayresaf@ornl.gov" }, - { name = "Gregory Cage", email = "cagege@ornl.gov" }, - { name = "Jesse McGaha", email = "mcgahajr@ornl.gov" }, - { name = "Robert Smith", email = "smithrw@ornl.gov" }, - { name = "Gavin Wiggins", email = "wigginsg@ornl.gov" }, - { name = "Michael Brim", email = "brimmj@ornl.gov" }, - { name = "Rick Archibald", email = "archibaldrk@ornl.gov" }, - { name = "Addi Malviya Thakur", email = "malviyaa@ornl.gov" }, + { name = "Lance Drane", email = "dranelt@ornl.gov" }, + { name = "Marshall McDonnell", email = "mcdonnellmt@ornl.gov" }, + { name = "Seth Hitefield", email = "hitefieldsd@ornl.gov" }, + { name = "Andrew Ayres", email = "ayresaf@ornl.gov" }, + { name = "Gregory Cage", email = "cagege@ornl.gov" }, + { name = "Jesse McGaha", email = "mcgahajr@ornl.gov" }, + { name = "Robert Smith", email = "smithrw@ornl.gov" }, + { name = "Gavin Wiggins", email = "wigginsg@ornl.gov" }, + { name = "Michael Brim", email = "brimmj@ornl.gov" }, + { name = "Rick Archibald", email = "archibaldrk@ornl.gov" }, + { name = "Addi Malviya Thakur", email = "malviyaa@ornl.gov" }, ] version = "0.9.0" readme = "README.md" @@ -20,21 +20,18 @@ license = { text = "BSD-3-Clause" } requires-python = ">=3.10,<4.0" keywords = ["intersect"] classifiers = [ - "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.10", - "Programming Language :: Python :: 3.11", - "Programming Language :: Python :: 3.12", - "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", ] dependencies = [ - "pydantic>=2.7.0", - "retrying>=1.3.4,<2.0.0", - "paho-mqtt>=2.1.0,<3.0.0", - "pika>=1.3.2,<2.0.0", - "minio>=7.2.3", - "jsonschema[format-nongpl]>=4.21.1", # extras necessary for enforcing formats - #"brotli>=1.1.0", # TODO - add this dependency when we add compression - "psutil>=7.0.0", + "pydantic>=2.7.0", + "intersect-sdk-common>=0.9.5,<0.10.0", + "jsonschema[format-nongpl]>=4.21.1", # extras necessary for enforcing formats + #"brotli>=1.1.0", # TODO - add this dependency when we add compression + "psutil>=7.0.0", ] [project.urls] @@ -44,20 +41,17 @@ Documentation = "https://intersect-python-sdk.readthedocs.io/en/latest/" Issues = "https://github.com/INTERSECT-SDK/python-sdk/issues" [project.optional-dependencies] -docs = [ - "sphinx>=5.3.0", - "furo>=2023.3.27", -] +docs = ["sphinx>=5.3.0", "furo>=2023.3.27"] [dependency-groups] dev = [ - "pre-commit>=3.3.1", - "ruff==0.12.7", - "mypy>=1.10.0", - "codespell>=2.3.0", - "pytest>=7.3.2", - "pytest-cov>=4.1.0", - "httpretty>=1.1.4" + "pre-commit>=3.3.1", + "ruff==0.12.7", + "mypy>=1.10.0", + "codespell>=2.3.0", + "pytest>=7.3.2", + "pytest-cov>=4.1.0", + "httpretty>=1.1.4", ] [build-system] @@ -76,88 +70,94 @@ mccabe = { max-complexity = 20 } pylint = { max-args = 10, max-branches = 20, max-returns = 15, max-statements = 75 } # pyflakes and the relevant pycodestyle rules are already configured extend-select = [ - 'C90', # mccabe complexity - 'I', # isort - 'N', # pep8-naming - 'D', # pydocstyle - 'UP', # pyupgrade - 'YTT', # flake8-2020 - 'ANN', # flake8-annotations - 'ASYNC', # flake8-async - 'S', # flake8-bandit - 'BLE', # flake8-blind-except - 'B', # flake8-bugbear - 'A', # flake8-builtins - 'COM', # flake8-commas - 'C4', # flake8-comprehensions - 'DTZ', # flake8-datetimez - 'T10', # flake8-debugger - 'EM', # flake8-error-message - 'FA', # flake8-future-annotations - 'ISC', # flake8-implicit-string-concat - 'ICN', # flake8-import-conventions - 'G', # flake8-logging-format - 'INP', # flake8-no-pep420 - 'PIE', # flake8-PIE - 'T20', # flake8-T20 - 'PYI', # flake8-pyi - 'PT', # flake8-pytest-style - 'Q', # flake8-quotes - 'RSE', # flake8-raise - 'RET', # flake8-return - 'SLF', # flake8-self - 'SLOT', # flake8-slots - 'SIM', # flake8-simplify - 'TC', # flake8-type-checking - 'ARG', # flake8-unused-arguments - 'PTH', # flake8-use-pathlib - 'PGH', # pygrep-hooks - 'PL', # pylint - 'TRY', # tryceratops - 'FLY', # flynt - 'RUF', # RUFF additional rules - 'INT', # flake8-gettext + 'C90', # mccabe complexity + 'I', # isort + 'N', # pep8-naming + 'D', # pydocstyle + 'UP', # pyupgrade + 'YTT', # flake8-2020 + 'ANN', # flake8-annotations + 'ASYNC', # flake8-async + 'S', # flake8-bandit + 'BLE', # flake8-blind-except + 'B', # flake8-bugbear + 'A', # flake8-builtins + 'COM', # flake8-commas + 'C4', # flake8-comprehensions + 'DTZ', # flake8-datetimez + 'T10', # flake8-debugger + 'EM', # flake8-error-message + 'FA', # flake8-future-annotations + 'ISC', # flake8-implicit-string-concat + 'ICN', # flake8-import-conventions + 'G', # flake8-logging-format + 'INP', # flake8-no-pep420 + 'PIE', # flake8-PIE + 'T20', # flake8-T20 + 'PYI', # flake8-pyi + 'PT', # flake8-pytest-style + 'Q', # flake8-quotes + 'RSE', # flake8-raise + 'RET', # flake8-return + 'SLF', # flake8-self + 'SLOT', # flake8-slots + 'SIM', # flake8-simplify + 'TC', # flake8-type-checking + 'ARG', # flake8-unused-arguments + 'PTH', # flake8-use-pathlib + 'PGH', # pygrep-hooks + 'PL', # pylint + 'TRY', # tryceratops + 'FLY', # flynt + 'RUF', # RUFF additional rules + 'INT', # flake8-gettext ] # If you're seeking to disable a rule, first consider whether the rule is overbearing, or if it should only be turned off for your usecase. ignore = [ - 'COM812', # formatter, handled by Ruff format - 'ISC001', # formatter, handled by Ruff format - 'SIM105', # "with contextlib.suppress():" is slower than try-except-pass - 'ANN401', # allow explicit "Any" typing, use with care - 'PLR2004', # allow "magic numbers" + 'COM812', # formatter, handled by Ruff format + 'ISC001', # formatter, handled by Ruff format + 'SIM105', # "with contextlib.suppress():" is slower than try-except-pass + 'ANN401', # allow explicit "Any" typing, use with care + 'PLR2004', # allow "magic numbers" ] [tool.ruff.lint.flake8-type-checking] -runtime-evaluated-base-classes = ["pydantic.BaseModel", "intersect_sdk.IntersectBaseCapabilityImplementation"] -runtime-evaluated-decorators = ["pydantic.dataclasses.dataclass","pydantic.validate_call"] +runtime-evaluated-base-classes = [ + "pydantic.BaseModel", + "intersect_sdk.IntersectBaseCapabilityImplementation", +] +runtime-evaluated-decorators = [ + "pydantic.dataclasses.dataclass", + "pydantic.validate_call", +] [tool.ruff.lint.extend-per-file-ignores] '__init__.py' = [ - 'F401', # __init__.py commonly has unused imports - 'TC004', # do lazy imports when importing from the base module + 'F401', # __init__.py commonly has unused imports + 'TC004', # do lazy imports when importing from the base module ] 'docs/*' = [ - 'D', # the documentation folder does not need documentation - 'INP001', # docs are not a namespace package + 'D', # the documentation folder does not need documentation + 'INP001', # docs are not a namespace package ] 'examples/*' = [ - 'N999', # module names for examples are not standard - 'T20', # allow print/pprint statements in examples - 'S106', # don't care about credentials in examples - 'D100', # documenting modules in examples is unhelpful - 'D104', # documenting packages in examples is unhelpful - 'TRY002', # examples can raise their own exception + 'N999', # module names for examples are not standard + 'T20', # allow print/pprint statements in examples + 'S106', # don't care about credentials in examples + 'D100', # documenting modules in examples is unhelpful + 'D104', # documenting packages in examples is unhelpful + 'TRY002', # examples can raise their own exception ] 'tests/*' = [ - 'S101', # allow assert statements in tests - 'S106', # don't care about credentials in tests - 'S311', # don't care about cryptographic security in tests - 'SLF001', # allow private member access in tests - 'ANN', # tests in general don't need types, unless they are runtime types. - 'ARG', # allow unused parameters in tests - 'D', # ignore documentation in tests - 'RUF012', # permit "mutable" class attributes to not be annotated with typing.ClassVar (these shouldn't be mutated anyways...) - ] + 'S101', # allow assert statements in tests + 'S106', # don't care about credentials in tests + 'S311', # don't care about cryptographic security in tests + 'SLF001', # allow private member access in tests + 'ANN', # tests in general don't need types, unless they are runtime types. + 'ARG', # allow unused parameters in tests + 'D', # ignore documentation in tests + 'RUF012', # permit "mutable" class attributes to not be annotated with typing.ClassVar (these shouldn't be mutated anyways...) +] # see https://mypy.readthedocs.io/en/stable/config_file.html for a complete reference [tool.mypy] @@ -178,8 +178,8 @@ addopts = "-ra" [tool.coverage.report] omit = [ - '*__init__*', # __init__ files should just re-export other classes and functions - '*/discovery_service.py', # currently unused + '*__init__*', # __init__ files should just re-export other classes and functions + '*/discovery_service.py', # currently unused ] exclude_also = [ 'pragma: no-cover', # standard diff --git a/src/intersect_sdk/__init__.py b/src/intersect_sdk/__init__.py index f512d93..a37564e 100644 --- a/src/intersect_sdk/__init__.py +++ b/src/intersect_sdk/__init__.py @@ -12,6 +12,16 @@ # import everything eagerly for IDEs/LSPs if TYPE_CHECKING: + from intersect_sdk_common import ( + ControlPlaneConfig, + ControlProvider, + DataStoreConfig, + DataStoreConfigMap, + HierarchyConfig, + IntersectDataHandler, + IntersectMimeType, + ) + from .app_lifecycle import default_intersect_lifecycle_loop from .capability.base import IntersectBaseCapabilityImplementation from .client import IntersectClient @@ -22,14 +32,6 @@ ) from .config.client import IntersectClientConfig from .config.service import IntersectServiceConfig - from .config.shared import ( - ControlPlaneConfig, - ControlProvider, - DataStoreConfig, - DataStoreConfigMap, - HierarchyConfig, - ) - from .core_definitions import IntersectDataHandler, IntersectMimeType from .exceptions import IntersectCapabilityError from .schema import get_schema_from_capability_implementations from .service import IntersectService @@ -82,44 +84,71 @@ ) # PEP 562 stuff: do lazy imports for people who just want to import from the top-level module - +# [0] = package, [1] = path to module within package __lazy_imports = { - 'INTERSECT_CLIENT_EVENT_CALLBACK_TYPE': '.client_callback_definitions', - 'INTERSECT_CLIENT_RESPONSE_CALLBACK_TYPE': '.client_callback_definitions', - 'INTERSECT_JSON_VALUE': '.shared_callback_definitions', - 'INTERSECT_RESPONSE_VALUE': '.shared_callback_definitions', - 'INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE': '.service_callback_definitions', - 'ControlPlaneConfig': '.config.shared', - 'ControlProvider': '.config.shared', - 'DataStoreConfig': '.config.shared', - 'DataStoreConfigMap': '.config.shared', - 'HierarchyConfig': '.config.shared', - 'IntersectBaseCapabilityImplementation': '.capability.base', - 'IntersectCapabilityError': '.exceptions', - 'IntersectClient': '.client', - 'IntersectClientCallback': '.client_callback_definitions', - 'IntersectClientConfig': '.config.client', - 'IntersectDataHandler': '.core_definitions', - 'IntersectDirectMessageParams': '.shared_callback_definitions', - 'IntersectEventDefinition': '.service_definitions', - 'IntersectEventMessageParams': '.shared_callback_definitions', - 'IntersectMimeType': '.core_definitions', - 'IntersectService': '.service', - 'IntersectServiceConfig': '.config.service', - '__version__': '.version', - 'default_intersect_lifecycle_loop': '.app_lifecycle', - 'get_schema_from_capability_implementations': '.schema', - 'intersect_message': '.service_definitions', - 'intersect_status': '.service_definitions', - 'version_info': '.version', - 'version_string': '.version', + # COMMON: core types + 'IntersectDataHandler': ('intersect_sdk_common', '.'), + 'IntersectMimeType': ('intersect_sdk_common', '.'), + # COMMON: config types + 'ControlPlaneConfig': ('intersect_sdk_common', '.'), + 'ControlProvider': ('intersect_sdk_common', '.'), + 'DataStoreConfig': ('intersect_sdk_common', '.'), + 'DataStoreConfigMap': ('intersect_sdk_common', '.'), + 'HierarchyConfig': ('intersect_sdk_common', '.'), + # imports not in common + 'INTERSECT_CLIENT_EVENT_CALLBACK_TYPE': ( + __spec__.parent, + '.client_callback_definitions', + ), + 'INTERSECT_CLIENT_RESPONSE_CALLBACK_TYPE': ( + __spec__.parent, + '.client_callback_definitions', + ), + 'INTERSECT_JSON_VALUE': (__spec__.parent, '.shared_callback_definitions'), + 'INTERSECT_RESPONSE_VALUE': ( + __spec__.parent, + '.shared_callback_definitions', + ), + 'INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE': ( + __spec__.parent, + '.service_callback_definitions', + ), + 'IntersectBaseCapabilityImplementation': ( + __spec__.parent, + '.capability.base', + ), + 'IntersectCapabilityError': (__spec__.parent, '.exceptions'), + 'IntersectClient': (__spec__.parent, '.client'), + 'IntersectClientCallback': ( + __spec__.parent, + '.client_callback_definitions', + ), + 'IntersectClientConfig': (__spec__.parent, '.config.client'), + 'IntersectDirectMessageParams': ( + __spec__.parent, + '.shared_callback_definitions', + ), + 'IntersectEventDefinition': (__spec__.parent, '.service_definitions'), + 'IntersectEventMessageParams': ( + __spec__.parent, + '.shared_callback_definitions', + ), + 'IntersectService': (__spec__.parent, '.service'), + 'IntersectServiceConfig': (__spec__.parent, '.config.service'), + '__version__': (__spec__.parent, '.version'), + 'default_intersect_lifecycle_loop': (__spec__.parent, '.app_lifecycle'), + 'get_schema_from_capability_implementations': (__spec__.parent, '.schema'), + 'intersect_message': (__spec__.parent, '.service_definitions'), + 'intersect_status': (__spec__.parent, '.service_definitions'), + 'version_info': (__spec__.parent, '.version'), + 'version_string': (__spec__.parent, '.version'), } def __getattr__(attr_name: str) -> object: attr_module = __lazy_imports.get(attr_name) if attr_module: - module = import_module(attr_module, package=__spec__.parent) + module = import_module(attr_module[1], package=attr_module[0]) return getattr(module, attr_name) msg = f'module {__name__!r} has no attribute {attr_name!r}' diff --git a/src/intersect_sdk/_internal/control_plane/__init__.py b/src/intersect_sdk/_internal/control_plane/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/intersect_sdk/_internal/control_plane/brokers/__init__.py b/src/intersect_sdk/_internal/control_plane/brokers/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/intersect_sdk/_internal/control_plane/brokers/amqp_client.py b/src/intersect_sdk/_internal/control_plane/brokers/amqp_client.py deleted file mode 100644 index 6e8492f..0000000 --- a/src/intersect_sdk/_internal/control_plane/brokers/amqp_client.py +++ /dev/null @@ -1,627 +0,0 @@ -"""This module handles ALL AMQP protocol logic in INTERSECT. We seek to entirely abstract protocols away from users. - -This is a very specific pub-sub model which assumes a single topic exchange. -AMQP topics in INTERSECT generally look like ${ORGANIZATION}.${HIERARCHY}.${SYSTEM}.${SUBSYSTEM}.${SERVICE}.${MESSAGE_TYPE} . -MESSAGE_TYPE refers to INTERSECT domain messages - we do not allow users to determine their own message types directly, and every message has a message type. -SERVICE refers to a specific application. -SYSTEM is generally the level where Auth should occur, and where you should configure access control on the broker itself. -""" - -from __future__ import annotations - -import functools -import threading -from hashlib import sha384 -from typing import TYPE_CHECKING - -import pika -import pika.delivery_mode -import pika.exceptions -import pika.frame - -from ...logger import logger -from ...multi_flag_thread_event import MultiFlagThreadEvent -from .broker_client import BrokerClient - -if TYPE_CHECKING: - from collections.abc import Callable - - from pika.channel import Channel - from pika.frame import Frame - from pika.spec import Basic, BasicProperties - - from ..definitions import MessageCallback - from ..topic_handler import TopicHandler - - -_AMQP_MAX_RETRIES = 10 - -_PREFETCH_COUNT = 1 -"""This determines the maximum amount of messages a single consumer will handle. The consumer must acknowledge the message before it gets another message.""" - -# Note that we deliberately do NOT want this configurable at runtime. Any two INTERSECT services/clients could potentially exchange messages between one another. -_INTERSECT_MESSAGE_EXCHANGE = 'intersect-messages' -"""All INTERSECT messages get published to one exchange on the broker.""" - - -class ConsumerTagInfo: - def __init__(self) -> None: - self.consumer_tag = '' - self.consumer_tag_event = threading.Event() - - def obtain_consumer_tag(self, consumer_tag: str) -> None: - self.consumer_tag = consumer_tag - self.consumer_tag_event.set() - - def consumer_tag_obtained(self) -> bool: - return self.consumer_tag_event.is_set() - - def wait(self, time: float) -> None: - self.consumer_tag_event.wait(time) - - def __repr__(self) -> str: - return f'{self.consumer_tag} -- OBTAINED: {self.consumer_tag_obtained()}' - - -def _get_queue_name(routing_key: str) -> str: - """Generate a valid queue name from the routing key. - - We want to always be able to generate the same queue name from the routing key every time, - so we don't use UUIDs or want the broker to generate a key name. - - We must also keep the length under 128 characters. - - See https://www.rabbitmq.com/docs/queues#names for a complete reference. - """ - return sha384(routing_key.encode()).hexdigest() - - -# TODO we should be handling hierarchy parts as a list of strings until they get to the client -# this will be a breaking change, so only add it when ready to break -def _hierarchy_2_amqp(hierarchy: str) -> str: - """Take the hierarchy string format saved in the Service and map it to the AMQP topic format.""" - return hierarchy.replace('/', '.') - - -# TODO see above -def _amqp_2_hierarchy(amqp_routing_key: str) -> str: - """Convert AMQP topic formats to how we store a key in the ControlPlaneManager.""" - return amqp_routing_key.replace('.', '/') - - -class AMQPClient(BrokerClient): - """Client for performing broker actions backed by a AMQP broker. - - NOTE: Currently, thread safety has been attempted, but may not be guaranteed - - Attributes: - id: A string representation of the client's UUID. - _connection_params: connection information to the AMQP broker (includes credentials) - _publish_connection: AMQP connection dedicated to publishing messages - _consume_connection: AMQP connection dedicated to consuming messages - _topics_to_handlers: Dictionary of string topic names to lists of - Callables to invoke for messages on that topic. - """ - - def __init__( - self, - host: str, - port: int, - username: str, - password: str, - topics_to_handlers: Callable[[], dict[str, TopicHandler]], - ) -> None: - """The default constructor. - - Args: - host: String for hostname of AMQP broker - port: port number of AMQP broker - username: username credentials for AMQP broker - password: password credentials for AMQP broker - topics_to_handlers: callback function which gets the topic to handler map from the channel manager - """ - self._connection_params = pika.ConnectionParameters( - host=host, - port=port, - virtual_host='/', - credentials=pika.PlainCredentials(username, password), - connection_attempts=3, - # if not specified, this value is obtained by the broker. RabbitMQ sets it to 60s by default - heartbeat=10, - blocked_connection_timeout=5.0, - retry_delay=0.5, - ) - - # The pika connection to the broker - self._connection: pika.adapters.SelectConnection = None - self._channel_in: Channel = None - self._channel_out: Channel = None - - self._thread: threading.Thread | None = None - - # Callback to the topics_to_handler list inside of - self._topics_to_handlers = topics_to_handlers - # mapping of topics to callables which can unsubscribe from the topic - self._topics_to_consumer_tags: dict[str, ConsumerTagInfo] = {} - self._consumer_tags_to_threads: dict[str, threading.Thread] = {} - - self._should_disconnect = False - self._connection_retries = 0 - self._unrecoverable = False - # tracking both channels is the best way to handle continuations - self._channel_flags = MultiFlagThreadEvent(2) - - def connect(self) -> None: - """Connect to the defined broker. - - Try to connect to the broker, performing exponential backoff if connection fails. - """ - self._should_disconnect = False - self._channel_flags.unset_all() - - if not self._thread or not self._thread.is_alive(): - self._thread = threading.Thread(target=self._init_connection, daemon=True) - self._thread.start() - - while not self._channel_flags.is_set(): - self._channel_flags.wait(1.0) - - def disconnect(self) -> None: - """Close all connections.""" - self._should_disconnect = True - for topic, tag_info in self._topics_to_consumer_tags.items(): - self._cancel_consumer_tag(topic, tag_info.consumer_tag) - - # since _should_disconnect was set, _connection.ioloop.stop() will now execute after explicit connection close - self._connection.close() - - if self._thread: - # If gracefully shutting down, we should finish up the current job. - self._thread.join(5 if self.considered_unrecoverable() else None) - self._thread = None - - def is_connected(self) -> bool: - """Check if there is an active connection to the broker. - - Returns: - A boolean. True if there is a connection, False if not. - """ - # We are connected to the broker if either the publish or consume connections is open - return self._connection is not None and ( - not self._connection.is_closed or not self._connection.is_closing - ) - - def considered_unrecoverable(self) -> bool: - return self._unrecoverable - - def publish( - self, topic: str, payload: bytes, content_type: str, headers: dict[str, str], persist: bool - ) -> None: - """Publish the given message. - - Publish payload with the pre-existing connection (via connect()) on topic. - - Args: - topic: The topic on which to publish the message as a string - payload: The message to publish, as raw bytes. - content_type: The content type of the message (if the data plane used is the control plane itself), or the value to be retrieved from the data plane (if the message handler is MINIO/etc.) - headers: UTF-8 dictionary which can help parse information about the message - persist: True if message should persist until consumers available, False if message should be removed immediately. - """ - topic = _hierarchy_2_amqp(topic) - while not self._channel_flags.is_set(): - self._channel_flags.wait(1.0) - if self._connection and self._connection.is_open: - self._channel_out.basic_publish( - exchange=_INTERSECT_MESSAGE_EXCHANGE, - routing_key=topic, - body=payload, - properties=pika.BasicProperties( - content_type=content_type, - headers=headers, - delivery_mode=pika.delivery_mode.DeliveryMode.Persistent - if persist - else pika.delivery_mode.DeliveryMode.Transient, - ), - ) - else: - logger.error('Unable to publish message %s on topic %s', payload, topic) - - def subscribe(self, topic: str, persist: bool) -> None: - """Subscribe to a topic. - - topic: system-of-system hierarchy. In AMQP parlance this gets translated to the routing key. - persist: If True, we will create an idempotent queue name which should persist - even on broker or application shutdown. If False, we will allow the server to create a unique - queue name, and the queue will be destroyed once the associated channel is closed. - - """ - topic = _hierarchy_2_amqp(topic) - cb = functools.partial( - self._create_queue, channel=self._channel_in, topic=topic, persist=persist - ) - self._connection.ioloop.add_callback_threadsafe(cb) - - def unsubscribe(self, topic: str) -> None: - """Stop consuming from a topic. - - With INTERSECT's AMQP configuration, each queue will only have one consumer. - Therefore, transient queues will be cleaned up. - """ - amqp_topic = _hierarchy_2_amqp(topic) - consumer_tag_info = self._topics_to_consumer_tags.get(amqp_topic, None) - if consumer_tag_info: - self._cancel_consumer_tag(amqp_topic, consumer_tag_info.consumer_tag) - - def _cancel_consumer_tag(self, topic: str, consumer_tag: str) -> None: - if self._channel_in and self._channel_in.is_open: - cb = functools.partial( - self._cancel_consumer_tag_cb, topic=topic, consumer_tag=consumer_tag - ) - self._channel_in.basic_cancel( - consumer_tag, - callback=cb, - ) - - def _cancel_consumer_tag_cb( - self, _frame: pika.frame.Frame, topic: str, consumer_tag: str - ) -> None: - try: - del self._topics_to_consumer_tags[topic] - except KeyError: - # shouldn't happen because ControlPlaneManager gatekeeps consecutive remove_subscription_channel() calls - logger.error( - 'Unable to clean up consumer tag related to topic %s , please inform an INTERSECT-SDK developer if you somehow see this message.', - topic, - ) - logger.info('Unsubscribed from %s', topic) - try: - thread = self._consumer_tags_to_threads[consumer_tag] - # kill thread immediately if not recoverable, wait to send last message if we are shutting down gracefully - thread.join(0 if self.considered_unrecoverable() else None) - del self._consumer_tags_to_threads[consumer_tag] - logger.debug('Consumer cancelled') - except KeyError: - # This will commonly be encountered, as several topics will often share a single consumer - pass - - # BEGIN CALLBACKS + THREADSAFE FUNCTIONS # - - def _init_connection(self) -> None: - """Open the consuming connection and start its io loop. - - NOTE: ANY functions which are not eventually called from this function - should be called via self._connection.ioloop.add_callback_threadsafe(cb) - """ - self._connection = None - self._channel_in = None - self._channel_out = None - - while not self._should_disconnect and not self.considered_unrecoverable(): - if not self._connection or self._connection.is_closed: - self._connection = pika.adapters.SelectConnection( - parameters=self._connection_params, - on_close_callback=self._on_connection_closed, - on_open_error_callback=self._on_connection_open_error, - on_open_callback=self._on_connection_open, - ) - - # Loops forever until ioloop.stop is called WHEN self._should_disconnect is True - self._connection.ioloop.start() - - def _on_connection_closed(self, connection: pika.SelectConnection, reason: Exception) -> None: - """This method is called if the connection to RabbitMQ closes.""" - if self._should_disconnect: - connection.ioloop.stop() - else: - logger.warning('Connection closed, reopening in 5 seconds: %s', reason) - connection.ioloop.call_later(5, connection.ioloop.stop) - self._channel_flags.unset_all() - self._channel_out = None - self._channel_in = None - - def _on_connection_open_error( - self, connection: pika.SelectConnection, err: pika.exceptions.AMQPConnectionError - ) -> None: - """This gets called if the connection to RabbitMQ can't be established. - - This function usually implies a misconfiguration in the application config. - """ - self._connection_retries += 1 - logger.error( - f'On connect error received (probable broker config error), have tried {self._connection_retries} times' - ) - logger.error(err) - if self._connection_retries >= _AMQP_MAX_RETRIES: - # This will allow us to break out of the while loop - # where we establish the connection, as ioloop.stop - # will now stop the thread for good - logger.error('Giving up AMQP reconnection attempt') - self._should_disconnect = True - self._unrecoverable = True - self._channel_flags.set_all() - connection.ioloop.stop() - else: - logger.error('Reopening in 5 seconds') - connection.ioloop.call_later(5, connection.ioloop.stop) - - def _on_connection_open(self, connection: pika.SelectConnection) -> None: - logger.info('AMQP connection open') - self._connection_retries = 0 - self._topics_to_consumer_tags.clear() - connection.channel(on_open_callback=self._on_input_channel_open) - connection.channel(on_open_callback=self._on_output_channel_open) - - def _on_channel_closed( - self, - channel: Channel, - exception: pika.exceptions.ChannelClosed, - channel_num: int, - ) -> None: - self._channel_flags.unset_nth_flag(channel_num) - if self._connection.is_open: - # This should rarely happen in practice, should only happen if you attempt to do something which violates the protocol. - logger.error( - 'Closing connection due to closed channel %s, please check the usage of the SDK or your configuration. Exception: %s', - channel, - str(exception), - ) - self._connection.close(reply_code=exception.reply_code, reply_text=exception.reply_text) - - # PRODUCER # - def _on_output_channel_open(self, channel: Channel) -> None: - channel_num = 0 - self._channel_out = channel - cb = functools.partial(self._on_channel_closed, channel_num=channel_num) - self._channel_out.add_on_close_callback(cb) - # producer flag should first make sure the exchange exists before publishing - channel.exchange_declare( - exchange=_INTERSECT_MESSAGE_EXCHANGE, - exchange_type='topic', - durable=True, - callback=lambda _frame: self._channel_flags.set_nth_flag(channel_num), - ) - logger.info('AMQP: output channel ready') - - # CONSUMER # - def _on_input_channel_open(self, channel: Channel) -> None: - channel_num = 1 - self._channel_in = channel - # consumer channel flag can be set immediately - self._channel_flags.set_nth_flag(channel_num) - cb_1 = functools.partial(self._on_channel_closed, channel_num=channel_num) - self._channel_in.add_on_close_callback(cb_1) - cb_2 = functools.partial(self._on_exchange_declareok, channel=channel) - channel.exchange_declare( - exchange=_INTERSECT_MESSAGE_EXCHANGE, exchange_type='topic', durable=True, callback=cb_2 - ) - - def _on_exchange_declareok(self, _unused_frame: Frame, channel: Channel) -> None: - """Create a queue on the broker (called from AMQP). - - After verifying that the exchange exists, we can now proceed to execute - "initial subscriptions". - - Args: - _unused_frame: response from declaring the exchange on the broker (irrelevant). - channel: The Channel being instantiated. - """ - for topic, topic_handler in self._topics_to_handlers().items(): - amqp_topic = _hierarchy_2_amqp(topic) - cb = functools.partial( - self._create_queue, - channel=channel, - topic=amqp_topic, - persist=topic_handler.topic_persist, - ) - self._connection.ioloop.add_callback_threadsafe(cb) - - def _create_queue(self, channel: Channel, topic: str, persist: bool) -> None: - """Create a queue on the broker. - - This can be called directly from the AMQP Client if the subscribed connection already has a Channel it's listening to. - - Args: - channel: The Channel being instantiated. - topic: The string name for the Channel on the broker. - persist: boolean value to determine how we manage the queue. - If True, this queue will persist forever, even on application or broker shutdown, and we need a persistent name. - If False, we will generate a temporary queue using the broker's naming scheme. - """ - cb = functools.partial( - self._on_queue_declareok, channel=channel, topic=topic, persist=persist - ) - channel.queue_declare( - queue=_get_queue_name(topic) - if persist - else '', # if we're transient, let the broker generate a name for us - durable=persist, - exclusive=not persist, # transient queues can be exclusive - callback=cb, - ) - - def _on_queue_declareok( - self, frame: Frame, channel: Channel, topic: str, persist: bool - ) -> None: - """Begins listening on the given queue. - - Used as a listener on queue declaration. - - Args: - frame: Response from the queue declare we sent to the AMQP broker. We get the queue name from this. - channel: The Channel being instantiated. - topic: The string name for the Channel on the broker. - persist: Whether or not our queue should persist on either broker or application shutdown. - """ - queue_name = frame.method.queue - cb = functools.partial( - self._on_queue_bindok, - channel=channel, - topic=topic, - queue_name=queue_name, - persist=persist, - ) - channel.queue_bind( - queue=queue_name, - exchange=_INTERSECT_MESSAGE_EXCHANGE, - routing_key=topic, - callback=cb, - ) - - def _on_queue_bindok( - self, - _unused_frame: Frame, - channel: Channel, - topic: str, - queue_name: str, - persist: bool, - ) -> None: - """Sets up basic QOS on the current channel. - - Args: - _unused_frame: AMQP response from binding to the queue. Ignored. - channel: The Channel being instantiated. - topic: Name of the topic on the broker. - queue_name: The name of the queue on the AMQP broker. - persist: Whether or not our queue should persist on either broker or application shutdown. - """ - cb = functools.partial( - self._on_basic_qos_set, - channel=channel, - topic=topic, - queue_name=queue_name, - persist=persist, - ) - channel.basic_qos(prefetch_count=_PREFETCH_COUNT, callback=cb) - - def _on_basic_qos_set( - self, - _unused_frame: Frame, - channel: Channel, - topic: str, - queue_name: str, - persist: bool, - ) -> None: - """Consumes a message from the given channel. - - Used as a listener on queue binding. - - Args: - _unused_frame: AMQP response from binding to the queue. Ignored. - channel: The Channel being instantiated. - topic: Name of the topic on the broker. - queue_name: The name of the queue on the AMQP broker. - persist: Whether or not our queue should persist on either broker or application shutdown. - """ - cb = functools.partial(self._on_consume_ok, topic=topic) - message_cb = functools.partial(self._consume_message, persist=persist) - info = ConsumerTagInfo() - self._topics_to_consumer_tags[topic] = info - consumer_tag = channel.basic_consume( - queue=queue_name, - auto_ack=not persist, # persistent messages should be manually acked and we have no reason to NACK a message for now - on_message_callback=message_cb, - callback=cb, - ) - info.obtain_consumer_tag(consumer_tag) - - def _on_consume_ok(self, _unused_frame: Frame, topic: str) -> None: - """Sets the consume subscription ready event. - - Used as a listener on consuming an initial message on a channel. - - Args: - _unused_frame: AMQP response from successfully beginning consumption. Ignored. - topic: Name of the topic on the broker. - """ - logger.info('ready to start consuming to %s', topic) - - def _consume_message( - self, - channel: Channel, - basic_deliver: Basic.Deliver, - properties: BasicProperties, - body: bytes, - persist: bool, - ) -> None: - """Handles incoming messages and acknowledges them ONLY after code executes on the domain side. - - Looks up all handlers for the topic and delegates message handling to them. - The handlers comprise the Service/Client logic, which includes all domain science logic. - - Args: - channel: The AMQP channel the message was received on. Used to manually acknowledge messages. - basic_deliver: Contains internal AMQP delivery information - i.e. the routing key. - properties: Object from the AMQP call. Contains various metadata. - body: the AMQP message to be handled. - persist: Whether or not our queue should persist on either broker or application shutdown. - """ - # if we got a message when we shouldn't, quickly try to requeue it - if self.considered_unrecoverable() or self._should_disconnect: - logger.warning( - "WARNING: A message for topic %s has been received when it shouldn't, attempting requeue" - ) - channel.basic_reject(basic_deliver.delivery_tag) - return - - # make sure that we have a content-type and headers, note that this does not publish a "reply" message if we fail here - content_type = properties.content_type - if not content_type: - logger.error('Missing message content type') - channel.basic_ack(basic_deliver.delivery_tag) - return - headers = properties.headers - if not headers: - logger.error('Missing message headers') - channel.basic_ack(basic_deliver.delivery_tag) - return - - tth_key = _amqp_2_hierarchy(basic_deliver.routing_key) - topic_handler = self._topics_to_handlers().get(tth_key) - if topic_handler: - consumer_tag_info = self._topics_to_consumer_tags.get(basic_deliver.routing_key) - if not consumer_tag_info: - logger.error( - 'Could not fetch consumer tag for topic %s, please inform an INTERSECT-SDK developer that you saw this message', - tth_key, - ) - return - while not consumer_tag_info.consumer_tag_obtained(): - consumer_tag_info.wait(1.0) - thrd = threading.Thread( - target=self._consume_message_subthread, - args=( - channel, - topic_handler.callbacks, - body, - content_type, - headers, - basic_deliver.delivery_tag, - persist, - ), - ) - self._consumer_tags_to_threads[consumer_tag_info.consumer_tag] = thrd - thrd.start() - elif persist: - # we somehow got a message that we don't want, discard it - channel.basic_ack(basic_deliver.delivery_tag) - - def _consume_message_subthread( - self, - channel: Channel, - callbacks: set[MessageCallback], - body: bytes, - content_type: str, - headers: dict[str, str], - delivery_tag: int, - persist: bool, - ) -> None: - """This is a subthread which executes the consumer code without blocking the IO loop. Without using a subthread, the AMQP heartbeat checker will be blocked.""" - for cb in callbacks: - cb(body, content_type, headers) - # With persistent messages, we only acknowledge the message AFTER we are done processing - # (this removes the message from the broker queue) - # this allows us to retry a message if the broker OR this application goes down - # We currently never NACK or reject a message because in INTERSECT, applications currently never "share" a queue. - if persist: - channel.basic_ack(delivery_tag) diff --git a/src/intersect_sdk/_internal/control_plane/brokers/broker_client.py b/src/intersect_sdk/_internal/control_plane/brokers/broker_client.py deleted file mode 100644 index a69bbee..0000000 --- a/src/intersect_sdk/_internal/control_plane/brokers/broker_client.py +++ /dev/null @@ -1,76 +0,0 @@ -from typing import Protocol - - -class BrokerClient(Protocol): - """Abstract definition of a Broker Client. - - Will abstractly manage interaction between implementation specific message broker - calls and higher level pub/sub features. - """ - - def connect(self) -> None: - """Connect to the defined broker. Credentials should be cached in the constructor.""" - ... - - def disconnect(self) -> None: - """Disconnect from the defined broker.""" - ... - - def is_connected(self) -> bool: - """Checks if there is an active connection to the broker. - - Returns: - A boolean. True if there is a connection, False if not. - """ - ... - - def considered_unrecoverable(self) -> bool: - """Checks if the broker is considered to be in a state where it would be impossible to reconnect. - - Returns: - A boolean. True if can't recover, False otherwise. - """ - ... - - def publish( - self, topic: str, payload: bytes, content_type: str, headers: dict[str, str], persist: bool - ) -> None: - """Publishes the given message. - - Publish payload with the pre-existing connection (via connect()) on topic. - - Args: - topic: The topic on which to publish the message as a string. - payload: The message to publish, as raw bytes. - content_type: The content type of the message (if the data plane used is the control plane itself), or the value to be retrieved from the data plane (if the message handler is MINIO/etc.) - headers: UTF-8 dictionary which can help parse information about the message - persist: - True = message will persist forever in associated queues until consumers are available (usually used for Userspace messages) - False = remove message immediately if no consumers available (usually used for Event messages and Lifecycle messages) - """ - ... - - def subscribe(self, topic: str, persist: bool) -> None: - """Subscribe to a topic over the pre-existing connection (via connect()). - - This function should ALSO be called by reconnect handlers, and not just directly. - When calling the function directly, it's expected that you have already connected. - - Args: - topic: Topic to subscribe to. - persist: Whether or not the queue subscribed to is intended to be long-lived. - """ - ... - - def unsubscribe(self, topic: str) -> None: - """Unsubscribe from a topic over the pre-existing connection (via connect()). - - Note that it should NEVER be expected to call this function as part of routine cleanup. - It should only be called if you want to continue staying connected to the broker, - but do not want to continue listening for certain topics. In general, Services should - NEVER call this, and clients should only call this to help clean up their queues. - - Args: - topic: Topic to unsubscribe from. - """ - ... diff --git a/src/intersect_sdk/_internal/control_plane/brokers/mqtt_client.py b/src/intersect_sdk/_internal/control_plane/brokers/mqtt_client.py deleted file mode 100644 index 236de08..0000000 --- a/src/intersect_sdk/_internal/control_plane/brokers/mqtt_client.py +++ /dev/null @@ -1,288 +0,0 @@ -from __future__ import annotations - -import threading -import uuid -from typing import TYPE_CHECKING, Any - -import paho.mqtt.client as paho_client -from paho.mqtt.enums import CallbackAPIVersion -from paho.mqtt.packettypes import PacketTypes -from paho.mqtt.properties import Properties -from retrying import retry - -from ...logger import logger -from .broker_client import BrokerClient - -if TYPE_CHECKING: - from collections.abc import Callable - - from paho.mqtt.client import DisconnectFlags - from paho.mqtt.reasoncodes import ReasonCode - - from ..topic_handler import TopicHandler - - -_MQTT_MAX_RETRIES = 10 - - -class MQTTClient(BrokerClient): - """Client for performing broker actions backed by a MQTT broker. - - Note that this class may not be thread safe, see https://github.com/eclipse/paho.mqtt.python/issues/358#issuecomment-1880819505 - - Attributes: - uid: String defining this client's unique ID in the broker - host: hostname of MQTT broker - port: port of MQTT broker - _connection: Paho Client used to interact with the broker - _connected: current state of whether or not we're connected to the broker (boolean) - _topics_to_handlers: Dictionary of string topic names to lists of - Callables to invoke for messages on that topic. - """ - - def __init__( - self, - host: str, - port: int, - username: str, - password: str, - topics_to_handlers: Callable[[], dict[str, TopicHandler]], - uid: str | None = None, - ) -> None: - """The default constructor. - - Args: - host: String for hostname of MQTT broker - port: port number of MQTT broker - username: username credentials for MQTT broker - password: password credentials for MQTT broker - topics_to_handlers: callback function which gets the topic to handler map from the channel manager - uid: A string representing the unique id to identify the client. - """ - # Unique id for the MQTT broker to associate this client with - self.uid = uid if uid else str(uuid.uuid4()) - self.host = host - self.port = port - - # Create a client to connect to RabbitMQ - self._connection = paho_client.Client( - callback_api_version=CallbackAPIVersion.VERSION2, - protocol=paho_client.MQTTv5, - client_id=self.uid, - ) - self._connection.username_pw_set(username=username, password=password) - - # Whether the connection is currently active - self._connected = False - self._should_disconnect = False - self._unrecoverable = False - self._connection_retries = 0 - self._connected_flag = threading.Event() - - # ConnectionManager callable state - self._topics_to_handlers = topics_to_handlers - - # MQTT v3.1.1 automatically downgrades a QOS which is too high (good), but MQTT v5 will terminate the connection (bad) - # see https://github.com/rabbitmq/rabbitmq-server/discussions/11842 - self._max_supported_qos = 2 - - # MQTT callback functions - self._connection.on_connect = self._handle_connect - self._connection.on_disconnect = self._handle_disconnect - self._connection.on_message = self._on_message - - @retry(stop_max_attempt_number=5, wait_exponential_multiplier=1000, wait_exponential_max=60000) - def connect(self) -> None: - """Connect to the defined broker.""" - # Create a client to connect to RabbitMQ - self._should_disconnect = False - self._connected_flag.clear() - self._connection.connect( - self.host, - self.port, - 60, - clean_start=False, - ) - self._connection.loop_start() - while not self.is_connected() and not self._connected_flag.is_set(): - self._connected_flag.wait(1.0) - - def disconnect(self) -> None: - """Disconnect from the broker.""" - self._should_disconnect = True - if self._connection: - self._connection.disconnect() - self._connection.loop_stop() - - def is_connected(self) -> bool: - """Check if there is an active connection to the broker. - - Returns: - A boolean. True if there is a connection, False if not. - """ - return self._connected - - def considered_unrecoverable(self) -> bool: - return self._unrecoverable - - def publish( - self, topic: str, payload: bytes, content_type: str, headers: dict[str, str], persist: bool - ) -> None: - """Publish the given message. - - Publish payload with the pre-existing connection (via connect()) on topic. - - Args: - topic: The topic on which to publish the message as a string. - payload: The message to publish, as raw bytes. - content_type: The content type of the message (if the data plane used is the control plane itself), or the value to be retrieved from the data plane (if the message handler is MINIO/etc.) - headers: UTF-8 dictionary which can help parse information about the message - persist: Determine if the message should live until queue consumers or available (True), or - if it should be removed immediately (False) - """ - props = Properties(PacketTypes.PUBLISH) # type: ignore[no-untyped-call] - props.ContentType = content_type - props.UserProperty = list(headers.items()) - self._connection.publish( - topic, payload, qos=self._max_supported_qos if persist else 0, properties=props - ) - - def subscribe(self, topic: str, persist: bool) -> None: - """Subscribe to a topic over the pre-existing connection (via connect()). - - Args: - topic: Topic to subscribe to. - persist: Determine if the associated message queue of the topic is long-lived (True) or not (False) - """ - # NOTE: RabbitMQ only works with QOS of 1 and 0, and seems to convert QOS2 to QOS1 - self._connection.subscribe(topic, qos=2 if persist else 0, properties=None) - - def unsubscribe(self, topic: str) -> None: - """Unsubscribe from a topic over the pre-existing connection. - - Args: - topic: Topic to unsubscribe from. - """ - self._connection.unsubscribe(topic) - - def _on_message( - self, - client: paho_client.Client, # noqa: ARG002 - userdata: Any, # noqa: ARG002 - message: paho_client.MQTTMessage, - ) -> None: - """Handle a message from the MQTT server. - - Args: - client: the Paho client - userdata: MQTT user data - message: MQTT message - """ - topic_handler = self._topics_to_handlers().get(message.topic) - # Note that if we return prior to the callback, there will be no reply message - if not topic_handler: - logger.warning('Incompatible message topic %s, rejecting message', message.topic) - return - try: - content_type = message.properties.ContentType # type: ignore[union-attr] - headers = dict(message.properties.UserProperty) # type: ignore[union-attr] - except AttributeError as e: - logger.warning( - 'Missing mandatory property %s in received message. The message will be rejected', - e.name, - ) - return - except ValueError: - logger.warning( - 'Headers in received message are in improper format. The message will be rejected' - ) - return - for cb in topic_handler.callbacks: - cb(message.payload, content_type, headers) - - def _handle_disconnect( - self, - client: paho_client.Client, - userdata: Any, - flags: DisconnectFlags, - reason_code: ReasonCode, - properties: Properties | None, - ) -> None: - """Handle a disconnection from the MQTT server. - - This callback usually implies a temporary connection fault, so we'll try to handle it. - - Args: - client: The Paho client. - userdata: MQTT user data. - flags: List of MQTT connection flags. - reason_code: MQTT return code. - properties: MQTT user properties. - """ - logger.debug( - 'mqtt disconnected log - uid=%s reason_code=%s flags=%s userdata=%s properties=%s', - self.uid, - reason_code, - flags, - userdata, - properties, - ) - self._connected = False - if not self._should_disconnect: - client.reconnect() - - def _handle_connect( - self, - client: paho_client.Client, # noqa: ARG002 - userdata: Any, - flags: dict[str, Any], - reason_code: ReasonCode, - properties: Properties | None, - ) -> None: - """Set the connection status in response to the result of a Paho connection attempt. - - If rc is 0, connection was successful - if not, - this usually implies a misconfiguration in the application config. - - Args: - client: The Paho MQTT client. - userdata: The MQTT userdata. - flags: List of MQTT connection flags. - reason_code: The MQTT return code. - properties: MQTT user properties - """ - if str(reason_code) == 'Success': - logger.debug( - 'MQTT connected log - reason-code=%s properties=%s userdata=%s flags=%s', - reason_code, - properties, - userdata, - flags, - ) - self._connected = True - self._connection_retries = 0 - self._should_disconnect = False - - # mimic "automatic QoS downgrade" of MQTTv3 for MQTTv5 - if properties and hasattr(properties, 'MaximumQoS'): - logger.info('MQTT: Maximum supported QoS is %s', properties.MaximumQoS) - self._max_supported_qos = properties.MaximumQoS - - self._connected_flag.set() - for topic, topic_handler in self._topics_to_handlers().items(): - self.subscribe(topic, topic_handler.topic_persist) - else: - # This will generally suggest a misconfiguration - self._connected = False - self._connection_retries += 1 - logger.error('Bad connection (reason: %s)', reason_code) - logger.error( - 'On connect error received (probable broker config error), have tried %s times', - self._connection_retries, - ) - logger.error('Connection error userdata: %s', userdata) - logger.error('Connection error flags: %s', flags) - if self._connection_retries >= _MQTT_MAX_RETRIES: - logger.error('Giving up MQTT reconnection attempt') - self._connected_flag.set() - self._unrecoverable = True diff --git a/src/intersect_sdk/_internal/control_plane/control_plane_manager.py b/src/intersect_sdk/_internal/control_plane/control_plane_manager.py deleted file mode 100644 index 6c23cc3..0000000 --- a/src/intersect_sdk/_internal/control_plane/control_plane_manager.py +++ /dev/null @@ -1,182 +0,0 @@ -from __future__ import annotations - -from typing import TYPE_CHECKING, Literal - -from ..exceptions import IntersectInvalidBrokerError -from ..logger import logger -from .topic_handler import TopicHandler - -if TYPE_CHECKING: - from collections.abc import Callable - - from ...config.shared import ControlPlaneConfig - from .brokers.broker_client import BrokerClient - from .definitions import MessageCallback - - -def create_control_provider( - config: ControlPlaneConfig, - topic_handler_callback: Callable[[], dict[str, TopicHandler]], -) -> BrokerClient: - if config.protocol == 'amqp0.9.1': - # only try to import the AMQP client if the user is using an AMQP broker - try: - from .brokers.amqp_client import ( # noqa: PLC0415 (lazy load all AMQP modules) - AMQPClient, - ) - - return AMQPClient( - host=config.host, - port=config.port or 5672, - username=config.username, - password=config.password, - topics_to_handlers=topic_handler_callback, - ) - except ImportError as e: - msg = "Configuration includes AMQP broker, but AMQP dependencies were not installed. Install intersect with the 'amqp' optional dependency to use this backend. (i.e. `pip install intersect_sdk[amqp]`)" - raise IntersectInvalidBrokerError(msg) from e - - # MQTT - from .brokers.mqtt_client import MQTTClient # noqa: PLC0415 (lazy load MQTT modules) - - return MQTTClient( - host=config.host, - port=config.port or 1883, - username=config.username, - password=config.password, - topics_to_handlers=topic_handler_callback, - ) - - -class ControlPlaneManager: - """The ControlPlaneManager class allows for working with multiple brokers from a single function call.""" - - def __init__( - self, - control_configs: list[ControlPlaneConfig] | Literal['discovery'], - ) -> None: - """Basic constructor. - - Some interaction with message brokers can change based on whether or not a Service or a Client is calling it. - """ - if control_configs == 'discovery': - msg = 'Discovery service not implemented yet' - raise NotImplementedError(msg) - self._control_providers = [ - create_control_provider(config, self.get_subscription_channels) - for config in control_configs - ] - - # flag which indicates if we SHOULD be connected. - self._ready = False - # topics_to_handlers are managed here and transcend connections/disconnections to the broker - self._topics_to_handlers: dict[str, TopicHandler] = {} - - def add_subscription_channel( - self, channel: str, callbacks: set[MessageCallback], persist: bool - ) -> None: - """Start listening for messages on a channel on all configured brokers. - - Use the format ${TOPIC}/${TOPIC}/${TOPIC} ... for the channel name, protocol implementations - are responsible for converting the string to a valid channel. - - This function should usually only be called before you've connected, - but it's okay to call it after connecting. (Mostly used for Clients.) - - Params: - channel: string of the channel which we should start listening to - callbacks: functions to call on subscribing to a message - persist: if True, expect the associated message queue to live long; if False, it will only live the duration of the application. - Any queue associated with a Service should always set this to True. Clients will need to subscribe to their own, temporary queues, and should set this to False. - """ - topic_handler = self._topics_to_handlers.get(channel) - if topic_handler is None: - topic_handler = TopicHandler(persist) - topic_handler.callbacks |= callbacks - self._topics_to_handlers[channel] = topic_handler - else: - topic_handler.callbacks |= callbacks - if self.is_connected(): - for provider in self._control_providers: - provider.subscribe(channel, persist) - - def remove_subscription_channel(self, channel: str) -> bool: - """Stop subscribing to a channel on all configured brokers. - - Params: - channel: string of the channel which should no longer be listened to - Returns: True if channel is no longer being listened to, False if the channel wasn't being listened to in the first place - """ - try: - del self._topics_to_handlers[channel] - except KeyError: - return False - else: - if self.is_connected(): - for provider in self._control_providers: - provider.unsubscribe(channel) - return True - - def get_subscription_channels(self) -> dict[str, TopicHandler]: - """Get the subscription channels. - - Note that this function gets accessed as a callback from the direct broker implementations. - - Returns: - the dictionary of topics to topic information - """ - return self._topics_to_handlers - - def connect(self) -> None: - """Connect to all configured brokers. - - Each broker client should utilize their respective on_connect callback to - subscribe to all channels tracked in the ControlPlaneManager. - """ - # TODO - when implementing discovery service, discovery and connection logic should be applied here - for provider in self._control_providers: - provider.connect() - self._ready = True - - def disconnect(self) -> None: - """Disconnect from all configured brokers.""" - self._ready = False - for provider in self._control_providers: - provider.disconnect() - - def publish_message( - self, - channel: str, - payload: bytes, - content_type: str, - headers: dict[str, str], - persist: bool, - ) -> None: - """Publish message on channel for all brokers.""" - if self.is_connected(): - for provider in self._control_providers: - provider.publish(channel, payload, content_type, headers, persist) - else: - # TODO may want more robust error handling here - logger.error('Cannot send message, providers are not connected') - - def is_connected(self) -> bool: - """Check that we are connected to ALL configured brokers. - - Returns: - - True if we are currently connected to all brokers we've configured, False if not - """ - return self._ready and all( - control_provider.is_connected() for control_provider in self._control_providers - ) - - def considered_unrecoverable(self) -> bool: - """Check if any broker is considered to be in an unrecoverable state. - - Returns: - - True if we can't recover, false otherwise - """ - return any( - control_provider.considered_unrecoverable() - for control_provider in self._control_providers - ) diff --git a/src/intersect_sdk/_internal/control_plane/definitions.py b/src/intersect_sdk/_internal/control_plane/definitions.py deleted file mode 100644 index 2ca9371..0000000 --- a/src/intersect_sdk/_internal/control_plane/definitions.py +++ /dev/null @@ -1,10 +0,0 @@ -from collections.abc import Callable - -MessageCallback = Callable[[bytes, str, dict[str, str]], None] -""" -All subscription callback functions take three arguments, provided by the protocol handler: - -1. The PAYLOAD of the message, in raw bytes. -2. The content-type of the PAYLOAD (as a valid utf-8 string). This can be validated prior to the callback function. -3. A UTF-8 mapping of header keys to header values. These should generally be specific to a domain, and will get validated in the callback function. -""" diff --git a/src/intersect_sdk/_internal/control_plane/topic_handler.py b/src/intersect_sdk/_internal/control_plane/topic_handler.py deleted file mode 100644 index d8d09a2..0000000 --- a/src/intersect_sdk/_internal/control_plane/topic_handler.py +++ /dev/null @@ -1,17 +0,0 @@ -from .definitions import MessageCallback - - -class TopicHandler: - """ControlPlaneManager information about a topic, avoids protocol specific information.""" - - callbacks: set[MessageCallback] - """Set of functions to call when consuming a message. - - (In practice there will only be one callback, but it could be helpful to add a debugging function callback in for development.) - """ - topic_persist: bool - """Whether or not a topic queue is expected to persist on the message broker.""" - - def __init__(self, topic_persist: bool) -> None: - self.callbacks = set() - self.topic_persist = topic_persist diff --git a/src/intersect_sdk/_internal/data_plane/__init__.py b/src/intersect_sdk/_internal/data_plane/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/intersect_sdk/_internal/data_plane/data_plane_manager.py b/src/intersect_sdk/_internal/data_plane/data_plane_manager.py deleted file mode 100644 index 98e55b9..0000000 --- a/src/intersect_sdk/_internal/data_plane/data_plane_manager.py +++ /dev/null @@ -1,113 +0,0 @@ -from __future__ import annotations - -import random -from typing import TYPE_CHECKING - -from pydantic import TypeAdapter, ValidationError - -from ...core_definitions import IntersectDataHandler, IntersectMimeType -from ..exceptions import IntersectError -from ..logger import logger -from .minio_utils import MinioPayload, create_minio_store, get_minio_object, send_minio_object - -if TYPE_CHECKING: - from ...config.shared import DataStoreConfigMap, HierarchyConfig - - -MINIO_ADAPTER = TypeAdapter(MinioPayload) - - -class DataPlaneManager: - """The DataPlaneManager serves as a common interface to the data plane. - - The API supports extensive plug-and-play for different data providers. - """ - - def __init__(self, hierarchy: HierarchyConfig, data_configs: DataStoreConfigMap) -> None: - """Inside the constructor, we verify that all data configuration credentials are correct. - - Params: - hierarchy: Hierarchy configuration - data_configs: data configuration - """ - self._hierarchy = hierarchy - self._minio_providers = list(map(create_minio_store, data_configs.minio)) - - # warn users about missing data plane - if not self._minio_providers: - logger.warning('WARNING: This service cannot support any MINIO instances') - - def incoming_message_data_handler( - self, message: bytes, request_data_handler: IntersectDataHandler - ) -> bytes: - """Get data from the request data provider. - - Params: - message: the message sent externally to this location - Returns: - the actual data we want to submit to the user function - Raise: - IntersectException - if we couldn't get the data - """ - if request_data_handler == IntersectDataHandler.MESSAGE: - return message - if request_data_handler == IntersectDataHandler.MINIO: - # TODO - we may want to send additional provider information in the payload - try: - payload: MinioPayload = MINIO_ADAPTER.validate_json(message) - except ValidationError as e: - logger.warning('Invalid MINIO payload format, dropping message') - raise IntersectError from e - provider = None - for store in self._minio_providers: - if store._base_url._url.geturl() == payload['minio_url']: # noqa: SLF001 (only way to get URL from MINIO API) - provider = store - break - if not provider: - logger.error( - f"You did not configure listening to MINIO instance '{payload['minio_url']}'. You must fix this to handle this data." - ) - raise IntersectError - return get_minio_object(provider, payload) - logger.warning(f'Cannot parse data handler {request_data_handler}') - raise IntersectError - - def outgoing_message_data_handler( - self, - function_response: bytes, - content_type: IntersectMimeType, - data_handler: IntersectDataHandler, - ) -> bytes: - """Send the user's response to the appropriate data provider. - - Params: - - function_response - the return value from the user's function - - content_type - content type of function_response - - data_handler - where we're going to send the data off to (i.e. the message, MINIO...) - - Returns: - the payload of the message, this varies based off of the data_handler value - Raise: - IntersectException - if there was any error in submitting the response - """ - # TODO - instead of requiring users to specify the data handler themselves, another idea could be to use - # sys.getsizeof(function_response) and determine the data handler dynamically - # users could perhaps specify T1/T2/T3 data types but not the specific implementation - if data_handler == IntersectDataHandler.MESSAGE: - return function_response - if data_handler == IntersectDataHandler.MINIO: - if not self._minio_providers: - logger.error( - 'No MINIO provider configured, so you cannot set response_data_handler on @intersect_message to equal IntersectDataHandler.MINIO .' - ) - raise IntersectError - provider = random.choice(self._minio_providers) # noqa: S311 (TODO choose a MINIO provider better than at random - this may be determined from external message params) - minio_payload = send_minio_object( - function_response, provider, content_type, self._hierarchy - ) - return MINIO_ADAPTER.dump_json(minio_payload) - - logger.error( - f'No support implemented for code {data_handler}, please upgrade your intersect-sdk version.' - ) - raise IntersectError diff --git a/src/intersect_sdk/_internal/data_plane/minio_utils.py b/src/intersect_sdk/_internal/data_plane/minio_utils.py deleted file mode 100644 index 6f94137..0000000 --- a/src/intersect_sdk/_internal/data_plane/minio_utils.py +++ /dev/null @@ -1,154 +0,0 @@ -from __future__ import annotations - -import mimetypes -from hashlib import sha224 -from io import BytesIO -from typing import TYPE_CHECKING -from uuid import uuid4 - -from minio import Minio -from minio.error import MinioException -from typing_extensions import TypedDict -from urllib3.exceptions import MaxRetryError -from urllib3.util import parse_url - -from ..exceptions import IntersectError -from ..logger import logger -from ..utils import die - -if TYPE_CHECKING: - from ...config.shared import DataStoreConfig, HierarchyConfig - from ...core_definitions import IntersectMimeType - - -class MinioPayload(TypedDict): - """This is a payload which gets sent in the actual userspace message if the data handler is "MINIO".""" - - minio_url: str - """ - The complete URL of the MINIO instance. This is used for finding the correct MINIO instance when retrieving an object. - """ - minio_bucket: str - """ - The name of the bucket where the object is located. Each service should only create objects in one bucket. - """ - minio_object_id: str - """ - The name of the object. This is a random UUID. - """ - - -def _condense_minio_bucket_name(hierarchy: HierarchyConfig) -> str: - """Condense a hierarchy string into a string less than 64 characters. - - This function is needed to handle MINIO bucket names. Collisions should be extremely rare, - and it should be fairly straightforward to identify a specific bucket for MINIO admins. - - Bucket name = first characters (up to 6) of service name + hyphen + sha224 of full hierarchy string - - TODO in the future, MINIO calls should be system-level only, so only the system + facility + organization - should need to be hashed. - Also, potentially come up with a better hashing algorithm (though sha224 is compressed enough, and gives us 56 characters). - """ - return f'{hierarchy.service[:6]}-{sha224(hierarchy.hierarchy_string().encode()).hexdigest()}' - - -def create_minio_store(config: DataStoreConfig) -> Minio: - config_uri = parse_url(config.host) - if not config_uri.host: - die(f'Minio configuration host {config.host} cannot be parsed as a valid hostname.') - - client = Minio( - secure=config_uri.scheme == 'https', - access_key=config.username, - secret_key=config.password, - endpoint=config_uri.host if not config.port else f'{config_uri.host}:{config.port}', - ) - try: - # it doesn't matter if the bucket exists, we're just checking the exception - # this is the fastest way in the public API to perform a HEAD request. - client.bucket_exists('qwop') - except MinioException: - die(f'Invalid credentials for Minio instance: {config}') - return client - - -def send_minio_object( - data: bytes, provider: Minio, content_type: IntersectMimeType, hierarchy: HierarchyConfig -) -> MinioPayload: - """Core function to save data in MINIO. - - Params: - data: user response data, as bytes - provider: the Minio client - content_type: the content type of the body - hierarchy: the hierarchy configuration - Returns: - The MINIO payload which gets sent in the actual message. - - Raises: - IntersectException - if any non-fatal MinIO error is caught - """ - bucket_name = _condense_minio_bucket_name(hierarchy) - # mimetypes.guess_extension() is a nice-to-have for MINIO preview, but isn't essential. - object_id = str(uuid4()) + (mimetypes.guess_extension(content_type) or '') - try: - if not provider.bucket_exists(bucket_name): - provider.make_bucket(bucket_name) - buff_data = BytesIO(data) - provider.put_object( - bucket_name=bucket_name, - object_name=object_id, - data=buff_data, - length=buff_data.getbuffer().nbytes, - content_type=content_type, - ) - return MinioPayload( - minio_url=provider._base_url._url.geturl(), # noqa: SLF001 (only way to get URL from MINIO API) - minio_bucket=bucket_name, - minio_object_id=object_id, - ) - except MaxRetryError as e: - logger.warning( - f'Non-fatal MinIO error when sending object, the server may be under stress but you should double-check your configuration. Details: \n{e}' - ) - raise IntersectError from e - except MinioException as e: - logger.error( - f'Important MinIO error when sending object, this usually indicates a problem with your configuration. Details: \n{e}' - ) - raise IntersectError from e - - -def get_minio_object(provider: Minio, payload: MinioPayload) -> bytes: - """Core function to retrieve data from MINIO. - - Params: - provider: a pre-cached MinIO provider from the data provider store - payload: the payload from the message (at this point, the minio_url should exist) - - Returns: - user response data, as raw bytes - Raises: - IntersectException - if any non-fatal MinIO error is caught - """ - try: - response = provider.get_object( - bucket_name=payload['minio_bucket'], object_name=payload['minio_object_id'] - ) - # TODO - objects should ONLY be removed if they are T1 - provider.remove_object( - bucket_name=payload['minio_bucket'], object_name=payload['minio_object_id'] - ) - except MaxRetryError as e: - logger.warning( - f'Non-fatal MinIO error when retrieving object, the server may be under stress but you should double-check your configuration. Details: \n{e}' - ) - raise IntersectError from e - except MinioException as e: - logger.error( - f'Important MinIO error when retrieving object, this usually indicates a problem with your configuration. Details: \n{e}' - ) - raise IntersectError from e - else: - return response.data diff --git a/src/intersect_sdk/_internal/exceptions.py b/src/intersect_sdk/_internal/exceptions.py deleted file mode 100644 index dacff0d..0000000 --- a/src/intersect_sdk/_internal/exceptions.py +++ /dev/null @@ -1,17 +0,0 @@ -class IntersectError(Exception): - """Generic marker for INTERSECT-specific exceptions.""" - - -class IntersectApplicationError(IntersectError): - """This is a special IntersectException, thrown if user application logic throws ANY kind of Exception. The only caveat is that if a user explicitly throws an IntersectCapabilityException, in which case that logic will be handled instead. - - In general, validation should be expressed through JSON schema as much as possible; however, JSON schema is NOT a complete prescription for input validation. - When this exception is thrown, however, we do not leak any exception information in the error message. On the other hand, if the input fails - JSON schema validation, Pydantic will throw a specific ValidationError, and that exception information will deliberately be exposed in the error message. - - This exception should strictly be used for control flow - it should NEVER be a fatal exception - """ - - -class IntersectInvalidBrokerError(IntersectError): - """Exception when invalid broker backend used.""" diff --git a/src/intersect_sdk/_internal/interfaces.py b/src/intersect_sdk/_internal/interfaces.py index a5844a8..cff5c46 100644 --- a/src/intersect_sdk/_internal/interfaces.py +++ b/src/intersect_sdk/_internal/interfaces.py @@ -5,8 +5,9 @@ if TYPE_CHECKING: from uuid import UUID + from intersect_sdk_common import HierarchyConfig + from ..client_callback_definitions import INTERSECT_CLIENT_EVENT_CALLBACK_TYPE - from ..config.shared import HierarchyConfig from ..service_callback_definitions import ( INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE, ) diff --git a/src/intersect_sdk/_internal/messages/__init__.py b/src/intersect_sdk/_internal/messages/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/intersect_sdk/_internal/messages/event.py b/src/intersect_sdk/_internal/messages/event.py deleted file mode 100644 index 8ec54f5..0000000 --- a/src/intersect_sdk/_internal/messages/event.py +++ /dev/null @@ -1,145 +0,0 @@ -"""This module contains core messaging definitions relating to INTERSECT events. - -This module is internal-facing and should not be used directly by users. - -Event messages are ALWAYS PRODUCED on the events channel. -Services should NEVER be CONSUMING messages on the events channel. -Clients/orchestrators are expected to consume these events themselves. -""" - -import datetime -import uuid -from typing import Annotated - -from pydantic import AwareDatetime, BaseModel, Field, field_serializer - -from ...constants import CAPABILITY_REGEX, SYSTEM_OF_SYSTEM_REGEX -from ...core_definitions import IntersectDataHandler -from ...version import version_string - -# TODO - another property we should consider is an optional max_wait_time for events which are fired from functions. -# This would mostly be useful for clients/orchestrators that are waiting for a specific event. -# This should probably be configured on the schema level... - - -class EventMessageHeaders(BaseModel): - """ALL event messages must include this header. - - We do not include the content type of the message in the header, it is handled separately. - """ - - message_id: Annotated[uuid.UUID, Field(description='Unique message ID')] - """ - ID of the message. - """ - - source: Annotated[ - str, - Field( - description='source of the message', - pattern=SYSTEM_OF_SYSTEM_REGEX, - ), - ] - """ - source of the message - """ - - created_at: Annotated[ - AwareDatetime, - Field( - description='the UTC timestamp of message creation', - ), - ] - """ - created timestamp - should meet ISO-8601 format, always in UTC - """ - - sdk_version: Annotated[ - str, - Field( - pattern=r'^\d+\.\d+\.\d+$', - description="SemVer string of SDK's version, used to check for compatibility", - ), - ] - """ - The version of the SDK sending over the messages. Will be used to determine: - - if two systems can communicate. SDKs will always be compatible if they share minor/bugfix versions. - SDKs should be assumed NOT to be compatible if they don't share the major version. - """ - - data_handler: Annotated[ - IntersectDataHandler, - Field( - IntersectDataHandler.MESSAGE, - description='Code signifying where data is stored.', - ), - ] - """ - Code signifying where data is stored. If this value = IntersectDataHandler.MESSAGE (0), the data is directly in the payload. - - If the value equals anything else, the payload signifies the _pointer_ to the data. For example, if this value signifies MinIO - usage, the payload would indicate the URI to where the data is stored on MinIO. - """ - - capability_name: Annotated[ - str, - Field( - pattern=CAPABILITY_REGEX, - description='The name of the capability which emitted the event originally.', - ), - ] - """ - The name of the capability which emitted the event originally. - """ - - event_name: Annotated[ - str, - Field( - pattern=CAPABILITY_REGEX, - description='The name of the event that was emitted, namespaced to the capability.', - ), - ] - """ - The name of the event that was emitted. This is meaningless without the capability name. - """ - - # make sure all non-string fields are serialized into strings, even in Python code - - @field_serializer('message_id', mode='plain') - def ser_uuid(self, uuid: uuid.UUID) -> str: - return str(uuid) - - @field_serializer('created_at', mode='plain') - def ser_datetime(self, dt: datetime.datetime) -> str: - return dt.isoformat() - - @field_serializer('data_handler', mode='plain') - def ser_enum(self, enum: IntersectDataHandler) -> str: - return enum.value - - -def create_event_message_headers( - source: str, - capability_name: str, - event_name: str, - data_handler: IntersectDataHandler, -) -> dict[str, str]: - """Generate raw headers and write them into a generic data structure which can be handled by any broker protocol.""" - return EventMessageHeaders( - source=source, - message_id=uuid.uuid4(), - created_at=datetime.datetime.now(tz=datetime.timezone.utc), - sdk_version=version_string, - capability_name=capability_name, - event_name=event_name, - data_handler=data_handler, - ).model_dump(by_alias=True) - - -def validate_event_message_headers(raw_headers: dict[str, str]) -> EventMessageHeaders: - """Validate raw headers and return the object. - - Raises: - pydantic.ValidationError - if the headers were missing any essential information - """ - return EventMessageHeaders(**raw_headers) # type: ignore[arg-type] diff --git a/src/intersect_sdk/_internal/messages/lifecycle.py b/src/intersect_sdk/_internal/messages/lifecycle.py deleted file mode 100644 index cae1d9a..0000000 --- a/src/intersect_sdk/_internal/messages/lifecycle.py +++ /dev/null @@ -1,107 +0,0 @@ -"""This module contains core messaging definitions relating to INTERSECT lifecycle actions. - -This module is internal-facing and should not be used directly by users. - -Lifecycle messages are ALWAYS PRODUCED on the lifecycle channel. -Services should NEVER be CONSUMING messages on the lifecycle channel. -(consumption is for INTERSECT core services only). -""" - -import datetime -import uuid -from typing import Annotated, Literal - -from pydantic import AwareDatetime, BaseModel, Field, field_serializer - -from ...constants import SYSTEM_OF_SYSTEM_REGEX -from ...version import version_string - -LifecycleType = Literal[ - 'LCT_STARTUP', - 'LCT_SHUTDOWN', - 'LCT_POLLING', - 'LCT_FUNCTIONS_ALLOWED', - 'LCT_FUNCTIONS_BLOCKED', -] - - -class LifecycleMessageHeaders(BaseModel): - """ALL lifecycle messages must include this header. - - We do not include the content type of the message in the header, it is handled separately. - - A special note about lifecycle messages is that their content type must ALWAYS be "application/json". - """ - - message_id: Annotated[uuid.UUID, Field(description='Unique message ID')] - """UUID of the message.""" - - source: Annotated[ - str, - Field( - description='source of the message', - pattern=SYSTEM_OF_SYSTEM_REGEX, - ), - ] - """ - source of the message - """ - - created_at: Annotated[ - AwareDatetime, - Field( - description='the UTC timestamp of message creation', - ), - ] - """ - created timestamp - should meet ISO-8601 format, always in UTC - """ - - sdk_version: Annotated[ - str, - Field( - pattern=r'^\d+\.\d+\.\d+$', - description="SemVer string of SDK's version, used to check for compatibility", - ), - ] - """ - The version of the SDK sending over the messages. Will be used to determine: - - if two systems can communicate. SDKs will always be compatible if they share minor/bugfix versions. - SDKs should be assumed NOT to be compatible if they don't share the major version. - """ - - lifecycle_type: LifecycleType - """ - The integer code of the lifecycle message being sent/received. - """ - - # make sure all non-string fields are serialized into strings, even in Python code - - @field_serializer('message_id', mode='plain') - def ser_uuid(self, uuid: uuid.UUID) -> str: - return str(uuid) - - @field_serializer('created_at', mode='plain') - def ser_datetime(self, dt: datetime.datetime) -> str: - return dt.isoformat() - - -def create_lifecycle_message_headers( - source: str, - lifecycle_type: LifecycleType, -) -> dict[str, str]: - """Generate raw headers and write them into a generic data structure which can be handled by any broker protocol. - - The contents of the payload should vary based on the lifecycle type. - """ - return LifecycleMessageHeaders( - source=source, - message_id=uuid.uuid4(), - created_at=datetime.datetime.now(tz=datetime.timezone.utc), - sdk_version=version_string, - lifecycle_type=lifecycle_type, - ).model_dump(by_alias=True) - - -def validate_lifecycle_message_headers(raw_headers: dict[str, str]) -> LifecycleMessageHeaders: - return LifecycleMessageHeaders(**raw_headers) # type: ignore[arg-type] diff --git a/src/intersect_sdk/_internal/messages/userspace.py b/src/intersect_sdk/_internal/messages/userspace.py deleted file mode 100644 index 3ae66b7..0000000 --- a/src/intersect_sdk/_internal/messages/userspace.py +++ /dev/null @@ -1,183 +0,0 @@ -"""This module contains core messaging definitions relating to user-defined functions. - -This module is internal-facing and should not be used directly by users. - -Services have two associated channels which handle userspace messages: their request channel -and their response channel. Services always CONSUME messages from these channels, but never PRODUCE messages -on these channels. (A message is always sent in the receiver's namespace). - -The response channel is how the service handles external requests, the request channel is used when this service itself -needs to make external requests through INTERSECT. - -Services should ALWAYS be CONSUMING from their userspace channel. -They should NEVER be PRODUCING messages on their userspace channel. - -Clients should be CONSUMING from their userspace channel, but should only get messages -from services they explicitly messaged. -""" - -import datetime -import uuid -from typing import Annotated - -from pydantic import AwareDatetime, BaseModel, Field, field_serializer - -from ...constants import SYSTEM_OF_SYSTEM_REGEX -from ...core_definitions import ( - IntersectDataHandler, -) -from ...version import version_string - - -class UserspaceMessageHeaders(BaseModel): - """ALL request/response/command messages must contain this header. - - We do not include the content type of the message in the header, it is handled separately. - """ - - message_id: Annotated[uuid.UUID, Field(description='Unique message ID')] - """ - ID of the message. - """ - - campaign_id: Annotated[uuid.UUID, Field(description='ID associated with a campaign')] - """ - ID of the campaign. For Clients, this should be set once per run, and then not changed. For orchestrators, this is associated with a campaign. - """ - - request_id: Annotated[ - uuid.UUID, - Field( - description='ID associated with a specific request message and response message sequence' - ), - ] - """ - ID of the request. A Client/orchestrator generates this ID for each request message it sends, and the Service generates a response message with this ID. - """ - - source: Annotated[ - str, Field(description='source of the message', pattern=SYSTEM_OF_SYSTEM_REGEX) - ] - """ - source of the message - """ - - destination: Annotated[ - str, Field(description='destination of the message', pattern=SYSTEM_OF_SYSTEM_REGEX) - ] - """ - destination of the message - """ - - created_at: Annotated[ - AwareDatetime, - Field( - description='the UTC timestamp of message creation', - ), - ] - """ - created timestamp - should meet ISO-8601 format, always in UTC - """ - - sdk_version: Annotated[ - str, - Field( - pattern=r'^\d+\.\d+\.\d+$', - description="SemVer string of SDK's version, used to check for compatibility", - ), - ] - """ - The version of the SDK sending over the messages. Will be used to determine: - - if two systems can communicate. SDKs will always be compatible if they share minor/bugfix versions. - SDKs should be assumed NOT to be compatible if they don't share the major version. - """ - - operation_id: Annotated[ - str, - Field( - description='Name of capability and operation we want to call, in the format ${CAPABILITY_NAME}.${FUNCTION_NAME}' - ), - ] - """ - The name of the operation we want to call. For Services, this indicates the operation which will be called; for Clients, this is the operation which was called. - - This maps to the format ${CAPABILITY_NAME}.${FUNCTION_NAME} . - """ - - data_handler: Annotated[ - IntersectDataHandler, - Field( - IntersectDataHandler.MESSAGE, - description='Code signifying where data is stored.', - ), - ] - """ - Code signifying where data is stored. If this value = IntersectDataHandler.MESSAGE (0), the data is directly in the payload. - - If the value equals anything else, the payload signifies the _pointer_ to the data. For example, if this value signifies MinIO - usage, the payload would indicate the URI to where the data is stored on MinIO. - """ - - has_error: Annotated[ - bool, - Field( - False, - description='If this value is True, the payload will contain the error message (a string)', - ), - ] - """ - If this flag is set to True, the payload will contain the error message (always a string). - - This should only be set to "True" on return messages sent by services - NEVER clients. - """ - - # make sure all non-string fields are serialized into strings, even in Python code - - @field_serializer('message_id', 'request_id', 'campaign_id', mode='plain') - def ser_uuid(self, uuid: uuid.UUID) -> str: - return str(uuid) - - @field_serializer('created_at', mode='plain') - def ser_datetime(self, dt: datetime.datetime) -> str: - return dt.isoformat() - - @field_serializer('has_error', mode='plain') - def ser_boolean(self, boolean: bool) -> str: - return str(boolean).lower() - - @field_serializer('data_handler', mode='plain') - def ser_enum(self, enum: IntersectDataHandler) -> str: - return enum.value - - -def create_userspace_message_headers( - source: str, - destination: str, - operation_id: str, - data_handler: IntersectDataHandler, - campaign_id: uuid.UUID, - request_id: uuid.UUID, - has_error: bool = False, -) -> dict[str, str]: - """Generate raw headers and write them into a generic data structure which can be handled by any broker protocol.""" - return UserspaceMessageHeaders( - message_id=uuid.uuid4(), - campaign_id=campaign_id, - request_id=request_id, - source=source, - destination=destination, - sdk_version=version_string, - created_at=datetime.datetime.now(tz=datetime.timezone.utc), - operation_id=operation_id, - data_handler=data_handler, - has_error=has_error, - ).model_dump(by_alias=True) - - -def validate_userspace_message_headers(raw_headers: dict[str, str]) -> UserspaceMessageHeaders: - """Validate raw headers and return the object. - - Raises: - pydantic.ValidationError - if the headers were missing any essential information - """ - return UserspaceMessageHeaders(**raw_headers) # type: ignore[arg-type] diff --git a/src/intersect_sdk/_internal/multi_flag_thread_event.py b/src/intersect_sdk/_internal/multi_flag_thread_event.py deleted file mode 100644 index 890b5d6..0000000 --- a/src/intersect_sdk/_internal/multi_flag_thread_event.py +++ /dev/null @@ -1,77 +0,0 @@ -import threading - - -class MultiFlagThreadEvent: - """Only set a single threading event if all flags are set. - - NOTE: This is an internal API and does not do validation or control flow - with arguments to its methods. - """ - - def __init__(self, num_flags: int) -> None: - """Constructor. - - Params: - num_flags: This should be a positive value. - You may suffer performance issues if this is >= 63 - """ - self._flag = threading.Event() - self._bitmask = 0b0 - """Value which gets modified. If it equals bitmask_target, set the flag; otherwise, don't.""" - self._bitmask_target = (1 << num_flags) - 1 - """Immutable number where all bits are set to 1""" - - def unset_all(self) -> None: - """Clear all flags.""" - self._flag.clear() - self._bitmask = 0b0 - - def set_all(self) -> None: - """Set all flags.""" - self._flag.set() - self._bitmask = self._bitmask_target - - def set_nth_flag(self, flag: int) -> None: - """Set a flag. - - If all other flags are set, this sets the threading event. - - Params: - flag: position of flag you want to set (0-indexed), should be less than the value provided in the constructor - """ - self._bitmask |= 1 << flag - if self._bitmask == self._bitmask_target: - self._flag.set() - - def unset_nth_flag(self, flag: int) -> None: - """Unset a flag and the threading event. - - Params: - flag: position of flag you want to set (0-indexed), should be less than the value provided in the constructor - """ - self._bitmask &= ~(1 << flag) - self._flag.clear() - - def is_nth_flag_set(self, flag: int) -> bool: - """Check to see if a specific flag is set. - - Params: - flag: position of flag you want to check (0-indexed), should be less than the value provided in the constructor - """ - return (self._bitmask >> flag) & 1 == 1 - - def wait(self, amount: float) -> None: - """Block until internal flag is True, which will only happen once all other flags are set. - - Params: - amount: time to block - """ - self._flag.wait(amount) - - def is_set(self) -> bool: - """Determine if flag was set. Useful as the "while" condition for application loops. - - Returns: - True if set, False otherwise - """ - return self._flag.is_set() diff --git a/src/intersect_sdk/_internal/schema.py b/src/intersect_sdk/_internal/schema.py index dcfd1c7..d6666a0 100644 --- a/src/intersect_sdk/_internal/schema.py +++ b/src/intersect_sdk/_internal/schema.py @@ -14,10 +14,12 @@ get_origin, ) +from intersect_sdk_common.constants import CAPABILITY_REGEX +from intersect_sdk_common.control_plane.messages.event import EventMessageHeaders +from intersect_sdk_common.control_plane.messages.userspace import UserspaceMessageHeaders from pydantic import Field, PydanticUserError, TypeAdapter from typing_extensions import TypeAliasType -from ..constants import CAPABILITY_REGEX from ..service_definitions import IntersectEventDefinition from ..version import version_string from .constants import ( @@ -32,18 +34,15 @@ from .event_metadata import EventMetadata, definition_metadata_differences from .function_metadata import FunctionMetadata from .logger import logger -from .messages.event import EventMessageHeaders -from .messages.userspace import UserspaceMessageHeaders from .pydantic_schema_generator import GenerateTypedJsonSchema from .status_metadata import StatusMetadata from .utils import die if TYPE_CHECKING: + from intersect_sdk_common import HierarchyConfig, IntersectDataHandler from pydantic.json_schema import JsonSchemaMode from ..capability.base import IntersectBaseCapabilityImplementation - from ..config.shared import HierarchyConfig - from ..core_definitions import IntersectDataHandler ASYNCAPI_VERSION = '2.6.0' diff --git a/src/intersect_sdk/_internal/service_queue_name.py b/src/intersect_sdk/_internal/service_queue_name.py new file mode 100644 index 0000000..3f4b294 --- /dev/null +++ b/src/intersect_sdk/_internal/service_queue_name.py @@ -0,0 +1,14 @@ +from hashlib import sha384 + + +def get_service_queue_name(routing_key: str) -> str: + """Generate a valid queue name from the routing key. + + We want to always be able to generate the same queue name from the routing key every time, + so we don't use UUIDs or want the broker to generate a key name. + + We must also keep the length under 128 characters. + + See https://www.rabbitmq.com/docs/queues#names for a complete reference. + """ + return sha384(routing_key.encode()).hexdigest() diff --git a/src/intersect_sdk/_internal/version_resolver.py b/src/intersect_sdk/_internal/version_resolver.py index 29e8592..a92f64e 100644 --- a/src/intersect_sdk/_internal/version_resolver.py +++ b/src/intersect_sdk/_internal/version_resolver.py @@ -1,4 +1,5 @@ -from ..core_definitions import IntersectDataHandler +from intersect_sdk_common import IntersectDataHandler + from ..version import version_info, version_string from .logger import logger @@ -38,7 +39,9 @@ def _resolve_user_version( def resolve_user_version( - their_version: str, their_source: str, their_data_handler: IntersectDataHandler + their_version: str, + their_source: str, + their_data_handler: IntersectDataHandler, ) -> bool: """This function handles all version compatibilities between our SDK version and an incoming message's SDK version. diff --git a/src/intersect_sdk/client.py b/src/intersect_sdk/client.py index de209bd..d4b9a5a 100644 --- a/src/intersect_sdk/client.py +++ b/src/intersect_sdk/client.py @@ -13,24 +13,27 @@ import time from uuid import uuid4 -from pydantic import ValidationError -from typing_extensions import Self, final - -from intersect_sdk._internal.generic_serializer import GENERIC_MESSAGE_SERIALIZER - -from ._internal.control_plane.control_plane_manager import ( +from intersect_sdk_common import ( ControlPlaneManager, + DataPlaneManager, + HierarchyConfig, + IntersectError, ) -from ._internal.data_plane.data_plane_manager import DataPlaneManager -from ._internal.exceptions import IntersectError -from ._internal.logger import logger -from ._internal.messages.event import ( +from intersect_sdk_common.control_plane.messages.event import ( validate_event_message_headers, ) -from ._internal.messages.userspace import ( +from intersect_sdk_common.control_plane.messages.userspace import ( create_userspace_message_headers, validate_userspace_message_headers, ) +from pydantic import ValidationError +from typing_extensions import Self, final + +from intersect_sdk._internal.generic_serializer import ( + GENERIC_MESSAGE_SERIALIZER, +) + +from ._internal.logger import logger from ._internal.utils import die, send_os_signal from ._internal.version_resolver import resolve_user_version from .client_callback_definitions import ( @@ -39,7 +42,6 @@ IntersectClientCallback, ) from .config.client import IntersectClientConfig -from .config.shared import HierarchyConfig from .shared_callback_definitions import IntersectDirectMessageParams @@ -124,6 +126,7 @@ def __init__( f'{self._hierarchy.hierarchy_string("/")}/response', {self._handle_userspace_message}, persist=False, + queue_name=self._hierarchy.service, ) if event_callback: # Do not persist, as event messages are meant to be short-lived. @@ -135,6 +138,7 @@ def __init__( f'{service.hierarchy.replace(".", "/")}/events/{service.capability_name}/{service.event_name}', {self._handle_event_message}, persist=False, + queue_name=self._hierarchy.service, ) self._user_callback = user_callback self._event_callback = event_callback @@ -386,6 +390,7 @@ def _handle_client_callback(self, user_value: IntersectClientCallback | None) -> self._control_plane_manager.add_subscription_channel( f'{add_event.hierarchy.replace(".", "/")}/events/{add_event.capability_name}/{add_event.event_name}', {self._handle_event_message}, + queue_name=self._hierarchy.service, persist=False, ) for remove_event in validated_result.services_to_stop_listening_for_events: diff --git a/src/intersect_sdk/config/client.py b/src/intersect_sdk/config/client.py index 37ce295..25abd6b 100644 --- a/src/intersect_sdk/config/client.py +++ b/src/intersect_sdk/config/client.py @@ -1,19 +1,19 @@ """Client specific configuration types.""" -from typing import Annotated, Literal +from typing import Annotated +from intersect_sdk_common import ControlPlaneConfig, DataStoreConfigMap from pydantic import BaseModel, ConfigDict, Field from typing_extensions import final from ..client_callback_definitions import IntersectClientCallback -from .shared import ControlPlaneConfig, DataStoreConfigMap @final class IntersectClientConfig(BaseModel): """The user-provided configuration needed to integrate with INTERSECT as a client.""" - brokers: Annotated[list[ControlPlaneConfig], Field(min_length=1)] | Literal['discovery'] + brokers: Annotated[list[ControlPlaneConfig], Field(min_length=1)] """ Configurations for any message brokers the application should attach to diff --git a/src/intersect_sdk/config/service.py b/src/intersect_sdk/config/service.py index d2abb64..96c78e8 100644 --- a/src/intersect_sdk/config/service.py +++ b/src/intersect_sdk/config/service.py @@ -1,11 +1,14 @@ """Service specific configuration types.""" -from typing import Annotated, Literal +from typing import Annotated +from intersect_sdk_common import ( + ControlPlaneConfig, + DataStoreConfigMap, + HierarchyConfig, +) from pydantic import BaseModel, ConfigDict, Field, PositiveFloat -from .shared import ControlPlaneConfig, DataStoreConfigMap, HierarchyConfig - class IntersectServiceConfig(BaseModel): """The user-provided configuration needed to integrate with INTERSECT.""" @@ -15,7 +18,7 @@ class IntersectServiceConfig(BaseModel): Configuration of the System-of-System representation """ - brokers: Annotated[list[ControlPlaneConfig], Field(min_length=1)] | Literal['discovery'] + brokers: Annotated[list[ControlPlaneConfig], Field(min_length=1)] """ Configurations for any message brokers the application should attach to diff --git a/src/intersect_sdk/config/shared.py b/src/intersect_sdk/config/shared.py deleted file mode 100644 index a6babb2..0000000 --- a/src/intersect_sdk/config/shared.py +++ /dev/null @@ -1,168 +0,0 @@ -"""Configuration types shared across both Clients and Services.""" - -from dataclasses import dataclass, field -from typing import Annotated, Literal - -from pydantic import BaseModel, ConfigDict, Field, PositiveInt - -from ..core_definitions import IntersectDataHandler - -HIERARCHY_REGEX = r'^[a-z]((?!--)[a-z0-9-]){2,62}$' -""" -The hierarchy regex needs to be fairly restricted due to the number of different -systems we want to be compatible with. The rules: - -- Only allow unreserved characters (alphanumeric and .-~_): https://datatracker.ietf.org/doc/html/rfc3986#section-2.3 -- Require lowercase letters to avoid incompatibilities with case-insensitive systems. -- MinIO has been found to forbid _ and ~ characters -- MinIO requires an alphanumeric character at the start of the string -- No adjacent non-alphanumeric characters allowed -- Range should be from 3-63 characters - -The following commit tracks several issues with MINIO: https://code.ornl.gov/intersect/additive-manufacturing/ros-intersect-adapter/-/commit/fa71b791be0ccf1a5884910b5be3b5239cf9896f -""" - -ControlProvider = Literal['mqtt5.0', 'amqp0.9.1'] -"""The type of broker we connect to.""" - - -class HierarchyConfig(BaseModel): - """Configuration for registering this service in a system-of-system architecture.""" - - service: Annotated[str, Field(pattern=HIERARCHY_REGEX)] - """ - The name of this application - should be unique within an INTERSECT system - """ - - subsystem: str | None = Field(default=None, pattern=HIERARCHY_REGEX) - """ - An associated subsystem / service-grouping of the system (should be unique within an INTERSECT system) - """ - - system: Annotated[str, Field(pattern=HIERARCHY_REGEX)] - """ - Name of the "system", could also be thought of as a "device" (should be unique within a facility) - """ - - facility: Annotated[str, Field(pattern=HIERARCHY_REGEX)] - """ - Name of the facility (an ORNL institutional designation, i.e. 'neutrons') (NOT abbreviated, should be unique within an organization) - """ - - organization: Annotated[str, Field(pattern=HIERARCHY_REGEX)] - """ - Name of the organization (i.e. 'ornl') (NOT abbreviated) (should be unique in an INTERSECT cluster) - """ - - def hierarchy_string(self, join_str: str = '') -> str: - """Get the full hierarchy string. This is mostly used internally, but if you're developing a client, it could potentially be helpful. - - Params - join_str: String used to separate different hierarchy parts in the full string (default: empty string). - - Returns: - Single string, which will contain all system-of-system parts. For optional parts not configured (i.e. - no subsystem), they will be represented by a "-" character. - """ - if not self.subsystem: - return join_str.join([self.organization, self.facility, self.system, '-', self.service]) - return join_str.join( - [ - self.organization, - self.facility, - self.system, - self.subsystem, - self.service, - ] - ) - - # we need to use the Python regex engine instead of the Rust regex engine here, because Rust's does not support lookaheads - model_config = ConfigDict(regex_engine='python-re') - - -@dataclass -class ControlPlaneConfig: - """Configuration for interacting with a broker.""" - - protocol: ControlProvider - """ - The protocol of the broker you'd like to use (i.e. AMQP, MQTT...) - """ - # TODO - support more protocols and protocol versions as needed - see https://www.asyncapi.com/docs/reference/specification/v2.6.0#serverObject - - username: Annotated[str, Field(min_length=1)] - """ - Username credentials for broker connection. - """ - - password: Annotated[str, Field(min_length=1)] - """ - Password credentials for broker connection. - """ - - host: Annotated[str, Field(min_length=1)] = '127.0.0.1' - """ - Broker hostname (default: 127.0.0.1) - """ - - port: PositiveInt | None = None - """ - Broker port. List of common ports: - - - 1883 (MQTT) - - 4222 (NATS default port) - - 5222 (XMPP) - - 5223 (XMPP over TLS) - - 5671 (AMQP over TLS) - - 5672 (AMQP) - - 7400 (DDS Discovery) - - 7401 (DDS User traffic) - - 8883 (MQTT over TLS) - - 61613 (RabbitMQ STOMP - WARNING: ephemeral port) - - NOTE: INTERSECT currently only supports AMQP and MQTT. - """ - - -@dataclass -class DataStoreConfig: - """Configuration for interacting with a data store.""" - - username: Annotated[str, Field(min_length=1)] - """ - Username credentials for data store connection. - """ - - password: Annotated[str, Field(min_length=1)] - """ - Password credentials for data store connection. - """ - - host: Annotated[str, Field(min_length=1)] = '127.0.0.1' - """ - Data store hostname (default: 127.0.0.1) - """ - - port: PositiveInt | None = None - """ - Data store port - """ - - -@dataclass -class DataStoreConfigMap: - """Configurations for any data stores the application should talk to.""" - - minio: list[DataStoreConfig] = field(default_factory=list) - """ - minio configurations - """ - - def get_missing_data_store_types(self) -> set[IntersectDataHandler]: - """Return a set of IntersectDataHandlers which will not be permitted, due to a configuration type missing. - - If all data configurations exist, returns an empty set - """ - missing = set() - if not self.minio: - missing.add(IntersectDataHandler.MINIO) - return missing diff --git a/src/intersect_sdk/constants.py b/src/intersect_sdk/constants.py deleted file mode 100644 index 2a1c499..0000000 --- a/src/intersect_sdk/constants.py +++ /dev/null @@ -1,28 +0,0 @@ -"""These are miscellaneous constants used in INTERSECT which SDK users may obtain value from knowing about.""" - -SYSTEM_OF_SYSTEM_REGEX = r'^[a-z0-9][-a-z0-9.]*[-a-z0-9]$' -""" -This is the regex used as a representation of a source/destination. -This is only needed externally if you are building a client, services can ignore this. - -NOTE: for future compatibility reasons, we are NOT specifying the number of "parts" (separated by a '.') in this regex. All that matters is that you don't start or end with a period, or start with a hyphen. -""" - -# see the internal schema file for full validation details -CAPABILITY_REGEX = r'^[a-zA-Z0-9]\w*$' -""" -This is the regex used for representing capabilities and event keys. Capabilities should start with an alphanumeric character, and not be longer than 255 characters. - -This regex applies to namespacing local to a Service, so does not have to be unique across the ecosystem. -""" - -MIME_TYPE_REGEX = r'\w+/[-+.\w]+' -""" -Regex used for validating Content Types. - -References can be found at: - -- https://www.iana.org/assignments/media-types/media-types.xhtml - -- https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_types/Common_types -""" diff --git a/src/intersect_sdk/core_definitions.py b/src/intersect_sdk/core_definitions.py deleted file mode 100644 index b9b0047..0000000 --- a/src/intersect_sdk/core_definitions.py +++ /dev/null @@ -1,47 +0,0 @@ -"""Core enumerations and structures used throughout INTERSECT, for both client and service.""" - -from enum import Enum -from typing import Annotated - -from pydantic import Field - -from .constants import MIME_TYPE_REGEX - - -class IntersectDataHandler(Enum): - """What data transfer type do you want to use for handling the request/response? - - Default: MESSAGE - """ - - MESSAGE = 'MESSAGE' - MINIO = 'MINIO' - - -IntersectMimeType = Annotated[str, Field(pattern=MIME_TYPE_REGEX)] -""" -Special typing which represents a "Content-Type" value (i.e. `application/json`). - -The value should be a MIME type; references can be found at: - -- https://www.iana.org/assignments/media-types/media-types.xhtml - -- https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_types/Common_types - -These values are used to help map an output (or a part of an output) from an arbitrary microservice to an input (or a part of an input) of another arbitrary microservice. - -In general, mime types follow one of two rules: -- Complex types (types which cannot be represented as a sequence of bytes) MUST be represented by a Content-Type of 'application/json' (this is default). - - If a complex type has binary data in a field, this field MUST be Base64 encoded. - - You can mark the type with either 'pydantic.Base64Bytes', or if you need the value to be URL safe, 'pydantic.Base64UrlBytes'. You MUST also specify the "contentType" property, like this: - - ``` - field: Annotated[pydantic.Base64Bytes, pydantic.Field(json_schema_extra={"contentType": "image/png"})] - ``` - - INTERSECT is able to handle serialization/deserialization of 'application/json' types for you, though note that you will need to verify binary data (incoming and outgoing) yourself. INTERSECT will handle the Base64 encoding/decoding, though. - -- If your Content-Type value is ANYTHING ELSE, you MUST mark it as "bytes" . In this instance, INTERSECT will not base64-encode or base64-decode the value. -""" diff --git a/src/intersect_sdk/exceptions.py b/src/intersect_sdk/exceptions.py index 685ed25..3afa3a9 100644 --- a/src/intersect_sdk/exceptions.py +++ b/src/intersect_sdk/exceptions.py @@ -1,9 +1,9 @@ """Public exceptions API.""" -from ._internal.exceptions import IntersectError +from intersect_sdk_common import IntersectApplicationError -class IntersectCapabilityError(IntersectError): +class IntersectCapabilityError(IntersectApplicationError): """This is a marker for a special kind of Capability Exception. WARNING: USE THIS WITH CARE. When the SDK catches an Exception from Capability code, it has to decide whether to send information about the Exception in the message, or a generic "Application raised Exception" message. diff --git a/src/intersect_sdk/schema.py b/src/intersect_sdk/schema.py index 3f67463..29b6594 100644 --- a/src/intersect_sdk/schema.py +++ b/src/intersect_sdk/schema.py @@ -38,7 +38,7 @@ from .capability.universal_capability.universal_capability import IntersectSdkCoreCapability if TYPE_CHECKING: - from .config.shared import HierarchyConfig + from intersect_sdk_common import HierarchyConfig def get_schema_from_capability_implementations( diff --git a/src/intersect_sdk/service.py b/src/intersect_sdk/service.py index 8e80b84..99bb218 100644 --- a/src/intersect_sdk/service.py +++ b/src/intersect_sdk/service.py @@ -21,30 +21,37 @@ from typing import Any, Literal from uuid import UUID, uuid1, uuid3, uuid4 -from pydantic import ConfigDict, ValidationError, validate_call -from pydantic_core import PydanticSerializationError -from typing_extensions import Self, final - -from ._internal.control_plane.control_plane_manager import ( +from intersect_sdk_common import ( ControlPlaneManager, + DataPlaneManager, + HierarchyConfig, + IntersectApplicationError, + IntersectDataHandler, + IntersectError, ) -from ._internal.data_plane.data_plane_manager import DataPlaneManager -from ._internal.exceptions import IntersectApplicationError, IntersectError -from ._internal.function_metadata import FunctionMetadata -from ._internal.generic_serializer import GENERIC_MESSAGE_SERIALIZER -from ._internal.interfaces import IntersectEventObserver -from ._internal.logger import logger -from ._internal.messages.event import ( +from intersect_sdk_common.control_plane.messages.event import ( create_event_message_headers, validate_event_message_headers, ) -from ._internal.messages.lifecycle import LifecycleType, create_lifecycle_message_headers -from ._internal.messages.userspace import ( +from intersect_sdk_common.control_plane.messages.lifecycle import ( + LifecycleType, + create_lifecycle_message_headers, +) +from intersect_sdk_common.control_plane.messages.userspace import ( UserspaceMessageHeaders, create_userspace_message_headers, validate_userspace_message_headers, ) +from pydantic import ConfigDict, ValidationError, validate_call +from pydantic_core import PydanticSerializationError +from typing_extensions import Self, final + +from ._internal.function_metadata import FunctionMetadata +from ._internal.generic_serializer import GENERIC_MESSAGE_SERIALIZER +from ._internal.interfaces import IntersectEventObserver +from ._internal.logger import logger from ._internal.schema import get_schema_and_functions_from_capability_implementations +from ._internal.service_queue_name import get_service_queue_name from ._internal.stoppable_thread import StoppableThread from ._internal.utils import die, send_os_signal from ._internal.version_resolver import resolve_user_version @@ -54,8 +61,6 @@ INTERSECT_CLIENT_EVENT_CALLBACK_TYPE, ) from .config.service import IntersectServiceConfig -from .config.shared import HierarchyConfig -from .core_definitions import IntersectDataHandler from .exceptions import IntersectCapabilityError from .service_callback_definitions import ( INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE, @@ -298,10 +303,16 @@ def __init__( ) # our userspace queue should be able to survive shutdown self._control_plane_manager.add_subscription_channel( - self._service_channel_name, {self._handle_service_message}, persist=True + self._service_channel_name, + {self._handle_service_message}, + persist=True, + queue_name=get_service_queue_name(self._service_channel_name), ) self._control_plane_manager.add_subscription_channel( - self._client_channel_name, {self._handle_client_message}, persist=True + self._client_channel_name, + {self._handle_client_message}, + persist=True, + queue_name=get_service_queue_name(self._client_channel_name), ) def _get_capability(self, target: str) -> IntersectBaseCapabilityImplementation | None: @@ -563,10 +574,14 @@ def register_event( hierarchy = service.hierarchy_string('.') self._svc2svc_events[hierarchy][f'{capability_name}.{event_name}'].add(response_handler) + event_channel_name = ( + f'{service.hierarchy_string("/")}/events/{capability_name}/{event_name}' + ) self._control_plane_manager.add_subscription_channel( - f'{service.hierarchy_string("/")}/events/{capability_name}/{event_name}', + event_channel_name, {self._svc2svc_event_callback}, persist=True, + queue_name=get_service_queue_name(event_channel_name), ) def _svc2svc_event_callback( diff --git a/src/intersect_sdk/service_definitions.py b/src/intersect_sdk/service_definitions.py index ece3391..1e9e4fe 100644 --- a/src/intersect_sdk/service_definitions.py +++ b/src/intersect_sdk/service_definitions.py @@ -15,6 +15,7 @@ from collections.abc import Callable, Mapping, Sequence from typing import Any +from intersect_sdk_common import IntersectDataHandler, IntersectMimeType from pydantic import BaseModel, ConfigDict, field_validator, validate_call from typing_extensions import final @@ -27,7 +28,6 @@ SHUTDOWN_KEYS, STRICT_VALIDATION, ) -from .core_definitions import IntersectDataHandler, IntersectMimeType @final diff --git a/src/intersect_sdk/shared_callback_definitions.py b/src/intersect_sdk/shared_callback_definitions.py index 4172cc9..7481e04 100644 --- a/src/intersect_sdk/shared_callback_definitions.py +++ b/src/intersect_sdk/shared_callback_definitions.py @@ -2,11 +2,13 @@ from typing import Annotated, Any, TypeAlias +from intersect_sdk_common import ( + IntersectDataHandler, + IntersectMimeType, +) +from intersect_sdk_common.constants import CAPABILITY_REGEX, SYSTEM_OF_SYSTEM_REGEX from pydantic import BaseModel, ConfigDict, Field -from .constants import CAPABILITY_REGEX, SYSTEM_OF_SYSTEM_REGEX -from .core_definitions import IntersectDataHandler, IntersectMimeType - INTERSECT_JSON_VALUE: TypeAlias = ( list['INTERSECT_JSON_VALUE'] | dict[str, 'INTERSECT_JSON_VALUE'] diff --git a/tests/integration/test_return_type_mismatch.py b/tests/integration/test_return_type_mismatch.py index cf57a2a..2e4f498 100644 --- a/tests/integration/test_return_type_mismatch.py +++ b/tests/integration/test_return_type_mismatch.py @@ -10,6 +10,14 @@ import time from uuid import uuid4 +from intersect_sdk_common import ( + ControlPlaneManager, +) +from intersect_sdk_common.control_plane.messages.userspace import ( + create_userspace_message_headers, + validate_userspace_message_headers, +) + from intersect_sdk import ( ControlPlaneConfig, DataStoreConfig, @@ -20,13 +28,6 @@ IntersectServiceConfig, intersect_message, ) -from intersect_sdk._internal.control_plane.control_plane_manager import ( - ControlPlaneManager, -) -from intersect_sdk._internal.messages.userspace import ( - create_userspace_message_headers, - validate_userspace_message_headers, -) from tests.fixtures.example_schema import FAKE_HIERARCHY_CONFIG # FIXTURE ############################# @@ -96,10 +97,17 @@ def test_call_user_function_with_invalid_payload() -> None: def userspace_msg_callback( payload: bytes, content_type: str, raw_headers: dict[str, str] ) -> None: - msg[0] = (payload, content_type, validate_userspace_message_headers(raw_headers)) + msg[0] = ( + payload, + content_type, + validate_userspace_message_headers(raw_headers), + ) message_interceptor.add_subscription_channel( - 'msg/msg/msg/msg/msg/response', {userspace_msg_callback}, False + 'msg/msg/msg/msg/msg/response', + {userspace_msg_callback}, + False, + 'my_queue_name', ) message_interceptor.connect() intersect_service.startup() diff --git a/tests/integration/test_service.py b/tests/integration/test_service.py index 81d2d43..9f99c23 100644 --- a/tests/integration/test_service.py +++ b/tests/integration/test_service.py @@ -11,6 +11,19 @@ import time from uuid import uuid4 +from intersect_sdk_common import ControlPlaneManager +from intersect_sdk_common.control_plane.messages.lifecycle import ( + validate_lifecycle_message_headers, +) +from intersect_sdk_common.control_plane.messages.userspace import ( + create_userspace_message_headers, + validate_userspace_message_headers, +) +from intersect_sdk_common.data_plane.minio_utils import ( + MinioPayload, + get_minio_object, +) + from intersect_sdk import ( ControlPlaneConfig, DataStoreConfig, @@ -19,16 +32,10 @@ IntersectService, IntersectServiceConfig, ) -from intersect_sdk._internal.control_plane.control_plane_manager import ControlPlaneManager -from intersect_sdk._internal.data_plane.minio_utils import MinioPayload, get_minio_object -from intersect_sdk._internal.messages.lifecycle import ( - validate_lifecycle_message_headers, -) -from intersect_sdk._internal.messages.userspace import ( - create_userspace_message_headers, - validate_userspace_message_headers, +from tests.fixtures.example_schema import ( + FAKE_HIERARCHY_CONFIG, + DummyCapabilityImplementation, ) -from tests.fixtures.example_schema import FAKE_HIERARCHY_CONFIG, DummyCapabilityImplementation # HELPERS ############################# @@ -87,18 +94,18 @@ def test_control_plane_connections() -> None: time.sleep(1.0) assert intersect_service.is_connected() is False - channels = intersect_service._control_plane_manager.get_subscription_channels() + channels = list(intersect_service._control_plane_manager.get_all_subscription_channels()) # we have two channels (even if we're disconnected) ... assert len(channels) == 2 # ... and one callback function for each channel channel_keys = [] - for channel_key in iter(channels): + for channel_key, topic_handler in iter(channels): channel_keys.append(channel_key) - assert len(channels[channel_key].callbacks) == 1 + assert len(topic_handler.callbacks) == 1 for channel_key in channel_keys: intersect_service._control_plane_manager.remove_subscription_channel(channel_key) - assert len(intersect_service._control_plane_manager.get_subscription_channels()) == 0 + assert len(list(intersect_service._control_plane_manager.get_all_subscription_channels())) == 0 # normal test that the user function can be called @@ -118,7 +125,10 @@ def userspace_msg_callback( msg[2] = validate_userspace_message_headers(raw_headers) message_interceptor.add_subscription_channel( - 'msg/msg/msg/msg/msg/response', {userspace_msg_callback}, False + 'msg/msg/msg/msg/msg/response', + {userspace_msg_callback}, + False, + 'my_queue_name', ) message_interceptor.connect() intersect_service.startup() @@ -161,7 +171,10 @@ def userspace_msg_callback( msg[2] = validate_userspace_message_headers(raw_headers) message_interceptor.add_subscription_channel( - 'msg/msg/msg/msg/msg/response', {userspace_msg_callback}, False + 'msg/msg/msg/msg/msg/response', + {userspace_msg_callback}, + False, + 'my_queue_name', ) message_interceptor.connect() intersect_service.startup() @@ -200,7 +213,10 @@ def userspace_msg_callback( msg[2] = validate_userspace_message_headers(raw_headers) message_interceptor.add_subscription_channel( - 'msg/msg/msg/msg/msg/response', {userspace_msg_callback}, False + 'msg/msg/msg/msg/msg/response', + {userspace_msg_callback}, + False, + 'my_queue_name', ) message_interceptor.connect() intersect_service.startup() @@ -240,7 +256,10 @@ def userspace_msg_callback( msg[2] = validate_userspace_message_headers(raw_headers) message_interceptor.add_subscription_channel( - 'msg/msg/msg/msg/msg/response', {userspace_msg_callback}, False + 'msg/msg/msg/msg/msg/response', + {userspace_msg_callback}, + False, + 'my_queue_name', ) message_interceptor.connect() intersect_service.startup() @@ -283,7 +302,10 @@ def userspace_msg_callback( msg[2] = validate_userspace_message_headers(raw_headers) message_interceptor.add_subscription_channel( - 'msg/msg/msg/msg/msg/response', {userspace_msg_callback}, False + 'msg/msg/msg/msg/msg/response', + {userspace_msg_callback}, + False, + 'my_queue_name', ) message_interceptor.connect() intersect_service.startup() @@ -319,10 +341,19 @@ def test_exception_propagation() -> None: def userspace_msg_callback( payload: bytes, content_type: str, raw_headers: dict[str, str] ) -> None: - msg.append((payload, content_type, validate_userspace_message_headers(raw_headers))) + msg.append( + ( + payload, + content_type, + validate_userspace_message_headers(raw_headers), + ) + ) message_interceptor.add_subscription_channel( - 'msg/msg/msg/msg/msg/response', {userspace_msg_callback}, False + 'msg/msg/msg/msg/msg/response', + {userspace_msg_callback}, + False, + 'my_queue_name', ) message_interceptor.connect() intersect_service.startup() @@ -401,7 +432,10 @@ def userspace_msg_callback( msg[2] = validate_userspace_message_headers(raw_headers) message_interceptor.add_subscription_channel( - 'msg/msg/msg/msg/msg/response', {userspace_msg_callback}, False + 'msg/msg/msg/msg/msg/response', + {userspace_msg_callback}, + False, + 'my_queue_name', ) message_interceptor.connect() intersect_service.startup() @@ -446,14 +480,26 @@ def lifecycle_msg_callback( payload: bytes, content_type: str, raw_headers: dict[str, str] ) -> None: messages.append( - (json.loads(payload), content_type, validate_lifecycle_message_headers(raw_headers)) + ( + json.loads(payload), + content_type, + validate_lifecycle_message_headers(raw_headers), + ) ) message_interceptor.add_subscription_channel( - 'test/test/test/test/test/lifecycle', {lifecycle_msg_callback}, False + 'test/test/test/test/test/lifecycle', + {lifecycle_msg_callback}, + False, + 'my_lifecycle_queue', ) # we do not really care about the userspace message response, but we'll listen to it to consume it - message_interceptor.add_subscription_channel('msg/msg/msg/msg/msg/response', set(), False) + message_interceptor.add_subscription_channel( + 'msg/msg/msg/msg/msg/response', + set(), + False, + 'my_lifecycle_queue', + ) message_interceptor.connect() # sleep a moment to make sure message_interceptor catches the startup message time.sleep(1.0) @@ -497,7 +543,10 @@ def lifecycle_msg_callback( # make sure both the universal capability and the test capability show up in the first two messages for i in range(2): - assert list(messages[i][0]['status'].keys()) == ['intersect_sdk', 'DummyCapability'] + assert list(messages[i][0]['status'].keys()) == [ + 'intersect_sdk', + 'DummyCapability', + ] # check the status values of the DummyCapability (the INTERSECT-SDK capability's status values are too variable) assert messages[0][0]['status']['DummyCapability'] == { diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index 4c8118b..3e11d6f 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -113,16 +113,18 @@ def test_missing_client_config(): errors = [{'type': e['type'], 'loc': e['loc']} for e in ex.value.errors()] assert len(errors) == 2 assert {'type': 'missing', 'loc': ('brokers',)} in errors - assert {'type': 'missing', 'loc': ('initial_message_event_config',)} in errors + assert { + 'type': 'missing', + 'loc': ('initial_message_event_config',), + } in errors def test_empty_client_config(): with pytest.raises(ValidationError) as ex: IntersectClientConfig(brokers=[], initial_message_event_config=IntersectClientCallback()) errors = [{'type': e['type'], 'loc': e['loc']} for e in ex.value.errors()] - assert len(errors) == 2 + assert len(errors) == 1 assert {'loc': ('brokers', 'list[ControlPlaneConfig]'), 'type': 'too_short'} - assert {'loc': ('brokers', "literal['discovery']"), 'type': 'literal_error'} in errors def test_missing_service_config(): @@ -143,10 +145,12 @@ def test_invalid_service_config(): status_interval=1, ) errors = [{'type': e['type'], 'loc': e['loc']} for e in ex.value.errors()] - assert len(errors) == 4 + assert len(errors) == 3 assert {'loc': ('hierarchy',), 'type': 'model_type'} in errors - assert {'loc': ('brokers', 'list[ControlPlaneConfig]'), 'type': 'too_short'} in errors - assert {'loc': ('brokers', "literal['discovery']"), 'type': 'literal_error'} in errors + assert { + 'loc': ('brokers',), + 'type': 'too_short', + } in errors assert {'loc': ('status_interval',), 'type': 'greater_than_equal'} in errors @@ -181,7 +185,10 @@ def test_valid_service_config(): minio=[ DataStoreConfig(username='idc', password='idc', host='idc', port='6'), DataStoreConfig( - username='idc', password='idc', host='somewhereelse.com', port='9999' + username='idc', + password='idc', + host='somewhereelse.com', + port='9999', ), ] ), diff --git a/tests/unit/test_event_message_headers.py b/tests/unit/test_event_message_headers.py deleted file mode 100644 index 1100a57..0000000 --- a/tests/unit/test_event_message_headers.py +++ /dev/null @@ -1,119 +0,0 @@ -""" -event message header validation testing -""" - -import datetime -import uuid - -import pytest -from pydantic import ValidationError - -from intersect_sdk import IntersectDataHandler, version_string -from intersect_sdk._internal.messages.event import ( - create_event_message_headers, - validate_event_message_headers, -) - - -def test_valid_event_message_deserializes() -> None: - raw_headers = { - 'message_id': 'cc88a2c9-7e47-409f-82c5-ef49914ae140', - 'source': 'source', - 'sdk_version': '0.5.0', - 'created_at': '2024-01-19T20:21:14.045591Z', - 'capability_name': 'capability', - 'event_name': 'event', - } - headers = validate_event_message_headers(raw_headers) - # check defaults - assert headers.data_handler == IntersectDataHandler.MESSAGE - # check type serializations - assert isinstance(headers.message_id, uuid.UUID) - assert isinstance(headers.created_at, datetime.datetime) - assert headers.created_at.year == 2024 - - -def test_unusual_event_message_deserializes() -> None: - raw_headers = { - 'message_id': 'cc88a2c9-7e47-409f-82c5-ef49914ae140', - 'source': 'source.one', - 'sdk_version': '0.5.0', - 'created_at': '2024', - 'data_handler': 'MINIO', - 'capability_name': 'capability', - 'event_name': 'event', - } - headers = validate_event_message_headers(raw_headers) - assert headers.data_handler == IntersectDataHandler.MINIO - # even on strict mode, Pydantic can validate an integer as a string type, i.e. '"2024"' - it parses this as number of seconds since the Unix epoch - assert headers.created_at.year == 1970 - - -def test_missing_does_not_deserialize() -> None: - raw_headers: dict[str, str] = {} - with pytest.raises(ValidationError) as err: - validate_event_message_headers(raw_headers) - errors = err.value.errors() - assert len(errors) == 6 - assert all(e['type'] == 'missing' for e in errors) - locations = [e['loc'] for e in errors] - assert ('message_id',) in locations - assert ('source',) in locations - assert ('sdk_version',) in locations - assert ('created_at',) in locations - assert ('capability_name',) in locations - assert ('event_name',) in locations - - -def test_invalid_does_not_deserialize() -> None: - raw_headers = { - 'message_id': 'not_a_uuid', - 'source': '/', - 'sdk_version': '1.0.0+20130313144700', - 'created_at': '2024-01-19T20:21:14.045591', - 'data_handler': 'COBOL', - 'capability_name': 'b@d_ch@r$', - 'event_name': 'b@d_ch@r$', - } - with pytest.raises(ValidationError) as err: - validate_event_message_headers(raw_headers) - errors = [{'type': e['type'], 'loc': e['loc']} for e in err.value.errors()] - assert len(errors) == 7 - # value we have is a string, but not a UUID - assert {'type': 'uuid_parsing', 'loc': ('message_id',)} in errors - # '/' is not a valid character in a source string or destination string - assert {'type': 'string_pattern_mismatch', 'loc': ('source',)} in errors - # The datetime in the sample data is ALMOST valid, but lacks zone information! - assert {'type': 'timezone_aware', 'loc': ('created_at',)} in errors - # the sample versions here have build metadata or alpha release data in their strings, this is not valid for INTERSECT - assert {'type': 'string_pattern_mismatch', 'loc': ('sdk_version',)} in errors - # can't transpose these values into the enumerations - assert {'type': 'enum', 'loc': ('data_handler',)} in errors - # invalid capability / event name format - assert {'type': 'string_pattern_mismatch', 'loc': ('capability_name',)} in errors - assert {'type': 'string_pattern_mismatch', 'loc': ('event_name',)} in errors - - -def test_create_event_message() -> None: - msg = create_event_message_headers( - source='source', - data_handler=IntersectDataHandler.MESSAGE, - capability_name='capability', - event_name='event', - ) - - # make sure all values are serialized as strings, this is necessary for some protocols i.e. MQTT5 Properties - for value in msg.values(): - assert isinstance(value, str) - - # rule of UUID-4 generation - assert str(msg['message_id'])[14] == '4' - assert len(msg['message_id']) == 36 - # enforce UTC - assert msg['created_at'][-6:] == '+00:00' - # this should be lowercase for maximum language capability - assert msg['data_handler'] == 'MESSAGE' - assert msg['sdk_version'] == version_string - assert msg['source'] == 'source' - assert msg['capability_name'] == 'capability' - assert msg['event_name'] == 'event' diff --git a/tests/unit/test_lifecycle_message_headers.py b/tests/unit/test_lifecycle_message_headers.py deleted file mode 100644 index 4841375..0000000 --- a/tests/unit/test_lifecycle_message_headers.py +++ /dev/null @@ -1,101 +0,0 @@ -""" -lifecycle message header validation testing -""" - -import datetime -import uuid - -import pytest -from pydantic import ValidationError - -from intersect_sdk import version_string -from intersect_sdk._internal.messages.lifecycle import ( - create_lifecycle_message_headers, - validate_lifecycle_message_headers, -) - - -def test_valid_lifecycle_message_deserializes() -> None: - raw_headers = { - 'message_id': 'cc88a2c9-7e47-409f-82c5-ef49914ae140', - 'source': 'source', - 'sdk_version': '0.5.0', - 'created_at': '2024-01-19T20:21:14.045591Z', - 'lifecycle_type': 'LCT_STARTUP', - } - headers = validate_lifecycle_message_headers(raw_headers) - assert isinstance(headers.message_id, uuid.UUID) - assert isinstance(headers.created_at, datetime.datetime) - assert headers.created_at.year == 2024 - - -def test_unusual_lifecycle_message_deserializes() -> None: - raw_headers = { - 'message_id': 'cc88a2c9-7e47-409f-82c5-ef49914ae140', - 'source': 'source.one', - 'sdk_version': '0.5.0', - 'created_at': '2024', - 'lifecycle_type': 'LCT_STARTUP', - } - headers = validate_lifecycle_message_headers(raw_headers) - # even on strict mode, Pydantic can validate an integer as a string type, i.e. '"2024"' - it parses this as number of seconds since the Unix epoch - assert headers.created_at.year == 1970 - - -def test_missing_does_not_deserialize() -> None: - raw_headers: dict[str, str] = {} - with pytest.raises(ValidationError) as err: - validate_lifecycle_message_headers(raw_headers) - errors = err.value.errors() - assert len(errors) == 5 - assert all(e['type'] == 'missing' for e in errors) - locations = [e['loc'] for e in errors] - assert ('message_id',) in locations - assert ('source',) in locations - assert ('lifecycle_type',) in locations - assert ('sdk_version',) in locations - assert ('created_at',) in locations - - -def test_invalid_does_not_deserialize() -> None: - raw_headers = { - 'message_id': 'not_a_uuid', - 'source': '/', - 'sdk_version': '1.0.0+20130313144700', - 'created_at': '2024-01-19T20:21:14.045591', - 'lifecycle_type': 'NOT_A_LIFECYCLE_TYPE', - } - with pytest.raises(ValidationError) as err: - validate_lifecycle_message_headers(raw_headers) - errors = [{'type': e['type'], 'loc': e['loc']} for e in err.value.errors()] - assert len(errors) == 5 - # value we have is a string, but not a UUID - assert {'type': 'uuid_parsing', 'loc': ('message_id',)} in errors - # '/' is not a valid character in a source string - assert {'type': 'string_pattern_mismatch', 'loc': ('source',)} in errors - # The datetime in the sample data is ALMOST valid, but lacks zone information! - assert {'type': 'timezone_aware', 'loc': ('created_at',)} in errors - # the sample versions here have build metadata or alpha release data in their strings, this is not valid for INTERSECT - assert {'type': 'string_pattern_mismatch', 'loc': ('sdk_version',)} in errors - # can't transpose these values into the enumerations - assert {'type': 'literal_error', 'loc': ('lifecycle_type',)} in errors - - -def test_create_lifecycle_message() -> None: - msg = create_lifecycle_message_headers( - source='source', - lifecycle_type='LCT_SHUTDOWN', - ) - - # make sure all values are serialized as strings, this is necessary for some protocols i.e. MQTT5 Properties - for value in msg.values(): - assert isinstance(value, str) - - # rule of UUID-4 generation - assert msg['message_id'][14] == '4' - assert len(msg['message_id']) == 36 - # enforce UTC - assert msg['created_at'][-6:] == '+00:00' - assert msg['lifecycle_type'] == 'LCT_SHUTDOWN' - assert msg['sdk_version'] == version_string - assert msg['source'] == 'source' diff --git a/tests/unit/test_userspace_message_headers.py b/tests/unit/test_userspace_message_headers.py deleted file mode 100644 index 6bfa47c..0000000 --- a/tests/unit/test_userspace_message_headers.py +++ /dev/null @@ -1,139 +0,0 @@ -""" -userspace message header validation testing -""" - -import datetime -import uuid - -import pytest -from pydantic import ValidationError - -from intersect_sdk import IntersectDataHandler, version_string -from intersect_sdk._internal.messages.userspace import ( - create_userspace_message_headers, - validate_userspace_message_headers, -) - - -def test_valid_userspace_message_deserializes() -> None: - raw_headers = { - 'message_id': 'cc88a2c9-7e47-409f-82c5-ef49914ae140', - 'campaign_id': 'dd88a2c9-7e47-409f-82c5-ef49914ae141', - 'request_id': 'ee88a2c9-7e47-409f-82c5-ef49914ae142', - 'operation_id': 'operation', - 'source': 'source', - 'destination': 'destination', - 'sdk_version': '0.5.0', - 'created_at': '2024-01-19T20:21:14.045591Z', - } - headers = validate_userspace_message_headers(raw_headers) - # check defaults - assert headers.data_handler == IntersectDataHandler.MESSAGE - assert headers.has_error is False - # check type serializations - assert isinstance(headers.message_id, uuid.UUID) - assert isinstance(headers.created_at, datetime.datetime) - assert headers.created_at.year == 2024 - - -def test_unusual_userspace_message_deserializes() -> None: - raw_headers = { - 'message_id': 'cc88a2c9-7e47-409f-82c5-ef49914ae140', - 'campaign_id': 'dd88a2c9-7e47-409f-82c5-ef49914ae141', - 'request_id': 'ee88a2c9-7e47-409f-82c5-ef49914ae142', - 'operation_id': 'operation', - 'source': 'source.one', - 'destination': 'destination.two', - 'sdk_version': '0.5.0', - 'created_at': '2024', - 'data_handler': 'MINIO', - 'has_error': 'true', - } - headers = validate_userspace_message_headers(raw_headers) - assert headers.data_handler == IntersectDataHandler.MINIO - assert headers.has_error is True - # even on strict mode, Pydantic can validate an integer as a string type, i.e. '"2024"' - it parses this as number of seconds since the Unix epoch - assert headers.created_at.year == 1970 - - -def test_missing_does_not_deserialize() -> None: - raw_headers: dict[str, str] = {} - with pytest.raises(ValidationError) as err: - validate_userspace_message_headers(raw_headers) - errors = err.value.errors() - assert len(errors) == 8 - assert all(e['type'] == 'missing' for e in errors) - locations = [e['loc'] for e in errors] - assert ('message_id',) in locations - assert ('campaign_id',) in locations - assert ('request_id',) in locations - assert ('operation_id',) in locations - assert ('source',) in locations - assert ('destination',) in locations - assert ('sdk_version',) in locations - assert ('created_at',) in locations - - -def test_invalid_does_not_deserialize() -> None: - raw_headers = { - 'message_id': 'not_a_uuid', - 'campaign_id': 'also_not_a_uuid', - 'request_id': 'definitely_not_a_uuid', - 'operation_id': 1, - 'source': '/', - 'destination': '/', - 'sdk_version': '1.0.0+20130313144700', - 'created_at': '2024-01-19T20:21:14.045591', - 'data_handler': 'COBOL', - 'has_error': 'I_AM_NOT_A_BOOLEAN', - } - with pytest.raises(ValidationError) as err: - validate_userspace_message_headers(raw_headers) - errors = [{'type': e['type'], 'loc': e['loc']} for e in err.value.errors()] - assert len(errors) == 10 - # value we have is a string, but not a UUID - assert {'type': 'uuid_parsing', 'loc': ('message_id',)} in errors - assert {'type': 'uuid_parsing', 'loc': ('request_id',)} in errors - assert {'type': 'uuid_parsing', 'loc': ('campaign_id',)} in errors - assert {'type': 'string_type', 'loc': ('operation_id',)} in errors - # '/' is not a valid character in a source string or destination string - assert {'type': 'string_pattern_mismatch', 'loc': ('source',)} in errors - assert {'type': 'string_pattern_mismatch', 'loc': ('destination',)} in errors - # The datetime in the sample data is ALMOST valid, but lacks zone information! - assert {'type': 'timezone_aware', 'loc': ('created_at',)} in errors - # the sample versions here have build metadata or alpha release data in their strings, this is not valid for INTERSECT - assert {'type': 'string_pattern_mismatch', 'loc': ('sdk_version',)} in errors - # can't transpose these values into the enumerations - assert {'type': 'enum', 'loc': ('data_handler',)} in errors - - -def test_create_userspace_message() -> None: - msg = create_userspace_message_headers( - source='source', - destination='destination', - operation_id='operation', - data_handler=IntersectDataHandler.MESSAGE, - request_id=uuid.uuid4(), - campaign_id=uuid.uuid4(), - ) - - # make sure all values are serialized as strings, this is necessary for some protocols i.e. MQTT5 Properties - for value in msg.values(): - assert isinstance(value, str) - - # rule of UUID-4 generation - assert str(msg['message_id'])[14] == '4' - assert len(msg['message_id']) == 36 - assert str(msg['request_id'])[14] == '4' - assert len(msg['request_id']) == 36 - assert str(msg['campaign_id'])[14] == '4' - assert len(msg['campaign_id']) == 36 - # enforce UTC - assert msg['created_at'][-6:] == '+00:00' - # this should be lowercase for maximum language capability - assert msg['has_error'] == 'false' - assert msg['operation_id'] == 'operation' - assert msg['data_handler'] == 'MESSAGE' - assert msg['sdk_version'] == version_string - assert msg['source'] == 'source' - assert msg['destination'] == 'destination' diff --git a/uv.lock b/uv.lock index 29a342b..54f5b76 100644 --- a/uv.lock +++ b/uv.lock @@ -571,11 +571,11 @@ wheels = [ [[package]] name = "idna" -version = "3.11" +version = "3.18" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/6f/6d/0703ccc57f3a7233505399edb88de3cbd678da106337b9fcde432b65ed60/idna-3.11.tar.gz", hash = "sha256:795dafcc9c04ed0c1fb032c2aa73654d8e8c5023a7df64a53f39190ada629902", size = 194582, upload-time = "2025-10-12T14:55:20.501Z" } +sdist = { url = "https://files.pythonhosted.org/packages/cd/63/9496c57188a2ee585e0f1db071d75089a11e98aa86eb99d9d7618fc1edce/idna-3.18.tar.gz", hash = "sha256:ffb385a7e039654cef1ab9ef32c6fafe283c0c0467bba1d9029738ce4a14a848", size = 196711, upload-time = "2026-06-02T14:34:07.794Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/0e/61/66938bbb5fc52dbdf84594873d5b51fb1f7c7794e9c0f5bd885f30bc507b/idna-3.11-py3-none-any.whl", hash = "sha256:771a87f49d9defaf64091e6e6fe9c18d4833f140bd19464795bc32d966ca37ea", size = 71008, upload-time = "2025-10-12T14:55:18.883Z" }, + { url = "https://files.pythonhosted.org/packages/1e/5e/d4e9f1a599fb8e573b7b87160658329fbf28d19eac2718f51fc3def3aa5a/idna-3.18-py3-none-any.whl", hash = "sha256:7f952cbe720b688055e3f87de14f5c3e5fdaa8bc3928985c4077ca689de849a2", size = 65455, upload-time = "2026-06-02T14:34:06.319Z" }, ] [[package]] @@ -601,13 +601,10 @@ name = "intersect-sdk" version = "0.9.0" source = { editable = "." } dependencies = [ + { name = "intersect-sdk-common" }, { name = "jsonschema", extra = ["format-nongpl"] }, - { name = "minio" }, - { name = "paho-mqtt" }, - { name = "pika" }, { name = "psutil" }, { name = "pydantic" }, - { name = "retrying" }, ] [package.optional-dependencies] @@ -632,13 +629,10 @@ dev = [ [package.metadata] requires-dist = [ { name = "furo", marker = "extra == 'docs'", specifier = ">=2023.3.27" }, + { name = "intersect-sdk-common", specifier = ">=0.9.5,<0.10.0" }, { name = "jsonschema", extras = ["format-nongpl"], specifier = ">=4.21.1" }, - { name = "minio", specifier = ">=7.2.3" }, - { name = "paho-mqtt", specifier = ">=2.1.0,<3.0.0" }, - { name = "pika", specifier = ">=1.3.2,<2.0.0" }, { name = "psutil", specifier = ">=7.0.0" }, { name = "pydantic", specifier = ">=2.7.0" }, - { name = "retrying", specifier = ">=1.3.4,<2.0.0" }, { name = "sphinx", marker = "extra == 'docs'", specifier = ">=5.3.0" }, ] provides-extras = ["docs"] @@ -654,6 +648,21 @@ dev = [ { name = "ruff", specifier = "==0.12.7" }, ] +[[package]] +name = "intersect-sdk-common" +version = "0.9.5" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "minio" }, + { name = "paho-mqtt" }, + { name = "pika" }, + { name = "pydantic" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/39/4d/283e91e55fb23a7f95c9c5fe9fe95826bc9691ea7ea345efe338638230c0/intersect_sdk_common-0.9.5.tar.gz", hash = "sha256:d36a099f4b53b2e61e8db8e218ac011ae887a00d3975484f74bd59e11f01c646", size = 31602, upload-time = "2026-05-28T21:57:14.573Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d5/97/1128c72328d79656619a89c9e16e4c3b785714d97013deba9cba228307fe/intersect_sdk_common-0.9.5-py3-none-any.whl", hash = "sha256:48d91f06d3f217b4cf08425ebd47cf49ab2a0423728547c8669c6ec2840d05f7", size = 44014, upload-time = "2026-05-28T21:57:15.397Z" }, +] + [[package]] name = "isoduration" version = "20.11.0" @@ -1263,16 +1272,16 @@ wheels = [ [[package]] name = "pygments" -version = "2.19.2" +version = "2.20.0" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/b0/77/a5b8c569bf593b0140bde72ea885a803b82086995367bf2037de0159d924/pygments-2.19.2.tar.gz", hash = "sha256:636cb2477cec7f8952536970bc533bc43743542f70392ae026374600add5b887", size = 4968631, upload-time = "2025-06-21T13:39:12.283Z" } +sdist = { url = "https://files.pythonhosted.org/packages/c3/b2/bc9c9196916376152d655522fdcebac55e66de6603a76a02bca1b6414f6c/pygments-2.20.0.tar.gz", hash = "sha256:6757cd03768053ff99f3039c1a36d6c0aa0b263438fcab17520b30a303a82b5f", size = 4955991, upload-time = "2026-03-29T13:29:33.898Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/c7/21/705964c7812476f378728bdf590ca4b771ec72385c533964653c68e86bdc/pygments-2.19.2-py3-none-any.whl", hash = "sha256:86540386c03d588bb81d44bc3928634ff26449851e99741617ecb9037ee5ec0b", size = 1225217, upload-time = "2025-06-21T13:39:07.939Z" }, + { url = "https://files.pythonhosted.org/packages/f4/7e/a72dd26f3b0f4f2bf1dd8923c85f7ceb43172af56d63c7383eb62b332364/pygments-2.20.0-py3-none-any.whl", hash = "sha256:81a9e26dd42fd28a23a2d169d86d7ac03b46e2f8b59ed4698fb4785f946d0176", size = 1231151, upload-time = "2026-03-29T13:29:30.038Z" }, ] [[package]] name = "pytest" -version = "9.0.2" +version = "9.0.3" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "colorama", marker = "sys_platform == 'win32'" }, @@ -1283,9 +1292,9 @@ dependencies = [ { name = "pygments" }, { name = "tomli", marker = "python_full_version < '3.11'" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/d1/db/7ef3487e0fb0049ddb5ce41d3a49c235bf9ad299b6a25d5780a89f19230f/pytest-9.0.2.tar.gz", hash = "sha256:75186651a92bd89611d1d9fc20f0b4345fd827c41ccd5c299a868a05d70edf11", size = 1568901, upload-time = "2025-12-06T21:30:51.014Z" } +sdist = { url = "https://files.pythonhosted.org/packages/7d/0d/549bd94f1a0a402dc8cf64563a117c0f3765662e2e668477624baeec44d5/pytest-9.0.3.tar.gz", hash = "sha256:b86ada508af81d19edeb213c681b1d48246c1a91d304c6c81a427674c17eb91c", size = 1572165, upload-time = "2026-04-07T17:16:18.027Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/3b/ab/b3226f0bd7cdcf710fbede2b3548584366da3b19b5021e74f5bde2a8fa3f/pytest-9.0.2-py3-none-any.whl", hash = "sha256:711ffd45bf766d5264d487b917733b453d917afd2b0ad65223959f59089f875b", size = 374801, upload-time = "2025-12-06T21:30:49.154Z" }, + { url = "https://files.pythonhosted.org/packages/d4/24/a372aaf5c9b7208e7112038812994107bc65a84cd00e0354a88c2c77a617/pytest-9.0.3-py3-none-any.whl", hash = "sha256:2c5efc453d45394fdd706ade797c0a81091eccd1d6e4bccfcd476e2b8e0ab5d9", size = 375249, upload-time = "2026-04-07T17:16:16.13Z" }, ] [[package]] @@ -1407,7 +1416,7 @@ wheels = [ [[package]] name = "requests" -version = "2.32.5" +version = "2.34.2" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "certifi" }, @@ -1415,18 +1424,9 @@ dependencies = [ { name = "idna" }, { name = "urllib3" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/c9/74/b3ff8e6c8446842c3f5c837e9c3dfcfe2018ea6ecef224c710c85ef728f4/requests-2.32.5.tar.gz", hash = "sha256:dbba0bac56e100853db0ea71b82b4dfd5fe2bf6d3754a8893c3af500cec7d7cf", size = 134517, upload-time = "2025-08-18T20:46:02.573Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/1e/db/4254e3eabe8020b458f1a747140d32277ec7a271daf1d235b70dc0b4e6e3/requests-2.32.5-py3-none-any.whl", hash = "sha256:2462f94637a34fd532264295e186976db0f5d453d1cdd31473c85a6a161affb6", size = 64738, upload-time = "2025-08-18T20:46:00.542Z" }, -] - -[[package]] -name = "retrying" -version = "1.4.2" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/c8/5a/b17e1e257d3e6f2e7758930e1256832c9ddd576f8631781e6a072914befa/retrying-1.4.2.tar.gz", hash = "sha256:d102e75d53d8d30b88562d45361d6c6c934da06fab31bd81c0420acb97a8ba39", size = 11411, upload-time = "2025-08-03T03:35:25.189Z" } +sdist = { url = "https://files.pythonhosted.org/packages/ac/c3/e2a2b89f2d3e2179abd6d00ebd70bff6273f37fb3e0cc209f48b39d00cbf/requests-2.34.2.tar.gz", hash = "sha256:f288924cae4e29463698d6d60bc6a4da69c89185ad1e0bcc4104f584e960b9ed", size = 142856, upload-time = "2026-05-14T19:25:27.735Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/67/f3/6cd296376653270ac1b423bb30bd70942d9916b6978c6f40472d6ac038e7/retrying-1.4.2-py3-none-any.whl", hash = "sha256:bbc004aeb542a74f3569aeddf42a2516efefcdaff90df0eb38fbfbf19f179f59", size = 10859, upload-time = "2025-08-03T03:35:23.829Z" }, + { url = "https://files.pythonhosted.org/packages/a0/f4/c67b0b3f1b9245e8d266f0f112c500d50e5b4e83cb6f3b71b6528104182a/requests-2.34.2-py3-none-any.whl", hash = "sha256:2a0d60c172f83ac6ab31e4554906c0f3b3588d37b5cb939b1c061f4907e278e0", size = 73075, upload-time = "2026-05-14T19:25:26.443Z" }, ] [[package]] @@ -1902,11 +1902,11 @@ wheels = [ [[package]] name = "urllib3" -version = "2.6.3" +version = "2.7.0" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/c7/24/5f1b3bdffd70275f6661c76461e25f024d5a38a46f04aaca912426a2b1d3/urllib3-2.6.3.tar.gz", hash = "sha256:1b62b6884944a57dbe321509ab94fd4d3b307075e0c2eae991ac71ee15ad38ed", size = 435556, upload-time = "2026-01-07T16:24:43.925Z" } +sdist = { url = "https://files.pythonhosted.org/packages/53/0c/06f8b233b8fd13b9e5ee11424ef85419ba0d8ba0b3138bf360be2ff56953/urllib3-2.7.0.tar.gz", hash = "sha256:231e0ec3b63ceb14667c67be60f2f2c40a518cb38b03af60abc813da26505f4c", size = 433602, upload-time = "2026-05-07T16:13:18.596Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/39/08/aaaad47bc4e9dc8c725e68f9d04865dbcb2052843ff09c97b08904852d84/urllib3-2.6.3-py3-none-any.whl", hash = "sha256:bf272323e553dfb2e87d9bfd225ca7b0f467b919d7bbd355436d3fd37cb0acd4", size = 131584, upload-time = "2026-01-07T16:24:42.685Z" }, + { url = "https://files.pythonhosted.org/packages/7f/3e/5db95bcf282c52709639744ca2a8b149baccf648e39c8cc87553df9eae0c/urllib3-2.7.0-py3-none-any.whl", hash = "sha256:9fb4c81ebbb1ce9531cce37674bbc6f1360472bc18ca9a553ede278ef7276897", size = 131087, upload-time = "2026-05-07T16:13:17.151Z" }, ] [[package]]