From a322a20fa0a35c09f75f14c4f81ca089e4ded43f Mon Sep 17 00:00:00 2001 From: Bartosz Burda Date: Sun, 26 Apr 2026 17:56:09 +0200 Subject: [PATCH 01/11] feat(gateway,#385): nest x-medkit vendor fields in SOVD payloads Per #385, vendor extensions on SOVD-standard endpoint payloads should be nested under a single `x-medkit` object instead of flat top-level keys. This was already the convention for most endpoints (faults, data, ops, bulk-data, ...) but two sites still emitted flat keys: - GET /updates/{id}/status emitted `x-medkit-phase` at the top level - GET /apps|components/{id}/configurations emitted `x-medkit-source` on each item and `x-medkit-source`/`x-medkit-node` on each parameter Both now emit a nested `x-medkit` object. The OpenAPI schemas for UpdateStatus and ConfigurationMetaData declare the nested object explicitly so generated clients (ros2_medkit_clients) get accurate typing. Adds an integration test (test_openapi_response_drift) that validates runtime GET responses against the OpenAPI response schema served at /api/v1/docs. The original drift slipped past every typed client because the schema never declared `x-medkit-phase` even though the handler emitted it; this test catches that class of regression at runtime. The compile-time contract (template-based emitter-to-schema link) lands later under #338. --- docs/api/rest.rst | 16 +- .../updates/update_types.hpp | 8 +- .../src/http/handlers/config_handlers.cpp | 5 +- .../src/openapi/schema_builder.cpp | 33 ++- .../src/updates/update_manager.cpp | 4 +- .../test/test_update_manager.cpp | 10 +- src/ros2_medkit_integration_tests/package.xml | 1 + .../test_openapi_response_drift.test.py | 279 ++++++++++++++++++ 8 files changed, 328 insertions(+), 28 deletions(-) create mode 100644 src/ros2_medkit_integration_tests/test/features/test_openapi_response_drift.test.py diff --git a/docs/api/rest.rst b/docs/api/rest.rst index 8f0c093ef..ac33489b3 100644 --- a/docs/api/rest.rst +++ b/docs/api/rest.rst @@ -1154,25 +1154,27 @@ Without such a plugin, all endpoints return ``501 Not Implemented``. { "status": "inProgress", - "x-medkit-phase": "preparing", "progress": 65, "sub_progress": [ {"name": "download", "progress": 100}, {"name": "verify", "progress": 30} - ] + ], + "x-medkit": { + "phase": "preparing" + } } **Status values:** ``pending``, ``inProgress``, ``completed``, ``failed`` A successful ``POST /api/v1/updates`` seeds a ``pending`` status for the package, - so this endpoint returns ``200`` with ``{"status": "pending"}`` immediately after - registration, before any ``prepare`` or ``execute`` call. + so this endpoint returns ``200`` with ``{"status": "pending", "x-medkit": {"phase": "none"}}`` + immediately after registration, before any ``prepare`` or ``execute`` call. - **Vendor extension ``x-medkit-phase``** (non-standard, SOVD-compatible): + **Vendor extension ``x-medkit.phase``** (non-standard, SOVD-compatible): ``none``, ``preparing``, ``prepared``, ``executing``, ``executed``, ``failed``, ``deleting``. Differentiates "prepare completed" (``status`` - ``completed`` + ``x-medkit-phase`` ``prepared``) from "execute completed" - (``status`` ``completed`` + ``x-medkit-phase`` ``executed``). Clients that + ``completed`` + ``x-medkit.phase`` ``prepared``) from "execute completed" + (``status`` ``completed`` + ``x-medkit.phase`` ``executed``). Clients that only consume the standard ``status`` field continue to work unchanged. When ``status`` is ``failed``, an ``error`` object is included: diff --git a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/updates/update_types.hpp b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/updates/update_types.hpp index 9a233b74d..bb9b4b6bf 100644 --- a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/updates/update_types.hpp +++ b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/updates/update_types.hpp @@ -33,7 +33,7 @@ struct UpdateFilter { /// Status of an update operation (SOVD-compliant enum values) enum class UpdateStatus { Pending, InProgress, Completed, Failed }; -/// Lifecycle phase - exposed as SOVD vendor extension `x-medkit-phase`. +/// Lifecycle phase - exposed as SOVD vendor extension `x-medkit.phase`. /// Differentiates "prepare completed" from "execute completed" which share /// status=completed in the SOVD standard enum. enum class UpdatePhase { None, Preparing, Prepared, Executing, Executed, Failed, Deleting }; @@ -94,7 +94,7 @@ class UpdateProgressReporter { std::mutex & mutex_; }; -/// Serialize UpdatePhase to its `x-medkit-phase` string value. +/// Serialize UpdatePhase to its `x-medkit.phase` string value. inline const char * update_phase_to_string(UpdatePhase phase) { switch (phase) { case UpdatePhase::None: @@ -116,7 +116,7 @@ inline const char * update_phase_to_string(UpdatePhase phase) { } /// Serialize UpdateStatusInfo to SOVD-compliant JSON. -/// Adds vendor extension `x-medkit-phase` to distinguish prepare-completed from +/// Adds vendor extension `x-medkit.phase` to distinguish prepare-completed from /// execute-completed (both report SOVD status=completed). inline nlohmann::json update_status_to_json(const UpdateStatusInfo & status) { nlohmann::json j; @@ -134,7 +134,7 @@ inline nlohmann::json update_status_to_json(const UpdateStatusInfo & status) { j["status"] = "failed"; break; } - j["x-medkit-phase"] = update_phase_to_string(status.phase); + j["x-medkit"] = {{"phase", update_phase_to_string(status.phase)}}; if (status.progress.has_value()) { j["progress"] = *status.progress; } diff --git a/src/ros2_medkit_gateway/src/http/handlers/config_handlers.cpp b/src/ros2_medkit_gateway/src/http/handlers/config_handlers.cpp index 503372323..8b6d034fb 100644 --- a/src/ros2_medkit_gateway/src/http/handlers/config_handlers.cpp +++ b/src/ros2_medkit_gateway/src/http/handlers/config_handlers.cpp @@ -311,15 +311,14 @@ void ConfigHandlers::handle_list_configurations(const httplib::Request & req, ht // Add source info for aggregated configurations if (agg_configs.is_aggregated) { - config_meta["x-medkit-source"] = node_info.app_id; + config_meta["x-medkit"] = {{"source", node_info.app_id}}; } items.push_back(config_meta); // Also track full parameter info json param_with_source = param; - param_with_source["x-medkit-source"] = node_info.app_id; - param_with_source["x-medkit-node"] = node_info.node_fqn; + param_with_source["x-medkit"] = {{"source", node_info.app_id}, {"node", node_info.node_fqn}}; all_parameters.push_back(param_with_source); } } diff --git a/src/ros2_medkit_gateway/src/openapi/schema_builder.cpp b/src/ros2_medkit_gateway/src/openapi/schema_builder.cpp index 1b830982c..5d35e42ff 100644 --- a/src/ros2_medkit_gateway/src/openapi/schema_builder.cpp +++ b/src/ros2_medkit_gateway/src/openapi/schema_builder.cpp @@ -160,12 +160,20 @@ nlohmann::json SchemaBuilder::items_wrapper(const nlohmann::json & item_schema) } nlohmann::json SchemaBuilder::configuration_metadata_schema() { - return {{"type", "object"}, + return { + {"type", "object"}, + {"properties", + {{"id", {{"type", "string"}, {"description", "Configuration parameter ID"}}}, + {"name", {{"type", "string"}, {"description", "Parameter name"}}}, + {"type", {{"type", "string"}, {"description", "Parameter type (e.g. 'parameter')"}}}, + {"x-medkit", + {{"type", "object"}, + {"description", "Vendor extensions (medkit)"}, {"properties", - {{"id", {{"type", "string"}, {"description", "Configuration parameter ID"}}}, - {"name", {{"type", "string"}, {"description", "Parameter name"}}}, - {"type", {{"type", "string"}, {"description", "Parameter type (e.g. 'parameter')"}}}}}, - {"required", {"id", "name", "type"}}}; + {{"source", + {{"type", "string"}, + {"description", "App ID that owns this parameter (only present in aggregated configurations)"}}}}}}}}}, + {"required", {"id", "name", "type"}}}; } nlohmann::json SchemaBuilder::configuration_read_value_schema() { @@ -538,13 +546,24 @@ nlohmann::json SchemaBuilder::update_status_schema() { {"properties", {{"name", {{"type", "string"}}}, {"progress", {{"type", "number"}}}}}, {"required", {"name", "progress"}}}; + nlohmann::json x_medkit_schema = { + {"type", "object"}, + {"description", "Vendor extensions (medkit)"}, + {"properties", + {{"phase", + {{"type", "string"}, + {"enum", {"none", "preparing", "prepared", "executing", "executed", "failed", "deleting"}}, + {"description", "Internal lifecycle phase, distinguishes prepare-completed from execute-completed"}}}}}, + {"required", {"phase"}}}; + return {{"type", "object"}, {"properties", {{"status", {{"type", "string"}, {"enum", {"pending", "inProgress", "completed", "failed"}}}}, {"progress", {{"type", "number"}}}, {"sub_progress", {{"type", "array"}, {"items", sub_progress_schema}}}, - {"error", {{"type", "string"}}}}}, - {"required", {"status"}}}; + {"error", {{"type", "string"}}}, + {"x-medkit", x_medkit_schema}}}, + {"required", {"status", "x-medkit"}}}; } nlohmann::json SchemaBuilder::log_configuration_schema() { diff --git a/src/ros2_medkit_gateway/src/updates/update_manager.cpp b/src/ros2_medkit_gateway/src/updates/update_manager.cpp index 631894288..e0a888ea7 100644 --- a/src/ros2_medkit_gateway/src/updates/update_manager.cpp +++ b/src/ros2_medkit_gateway/src/updates/update_manager.cpp @@ -160,9 +160,9 @@ tl::expected UpdateManager::delete_update(const std::string & if (it != states_.end() && it->second) { // Mark the package as failed consistently across all surfaces // exposed via GET /updates/{id}/status: the SOVD `status` field, - // the `x-medkit-phase` vendor extension, and `error_message`. + // the `x-medkit.phase` vendor extension, and `error_message`. // Without updating all three, clients see payloads like - // {status:pending, x-medkit-phase:failed} with no error context. + // {status:pending, "x-medkit":{phase:failed}} with no error context. it->second->status.status = UpdateStatus::Failed; it->second->status.phase = UpdatePhase::Failed; it->second->status.error_message = err.message; diff --git a/src/ros2_medkit_gateway/test/test_update_manager.cpp b/src/ros2_medkit_gateway/test/test_update_manager.cpp index f2e619c68..59f602d7c 100644 --- a/src/ros2_medkit_gateway/test/test_update_manager.cpp +++ b/src/ros2_medkit_gateway/test/test_update_manager.cpp @@ -635,7 +635,7 @@ TEST(UpdateManagerFailureTest, DeleteRollbackUpdatesStatusPhaseAndErrorMessage) // When delete_update fails for a known package, the rollback must leave // status=Failed, phase=Failed, and error_message populated so the status // endpoint does not emit an inconsistent payload like - // {status:pending, x-medkit-phase:failed} without any error details. + // {status:pending, "x-medkit":{phase:failed}} without any error details. auto backend = std::make_unique(); auto manager = std::make_unique(); manager->set_backend(backend.get()); @@ -656,7 +656,7 @@ TEST(UpdateManagerFailureTest, DeleteRollbackUpdatesStatusPhaseAndErrorMessage) // Serialized payload reflects the rollback end-to-end. auto j = update_status_to_json(*status); EXPECT_EQ(j["status"], "failed"); - EXPECT_EQ(j["x-medkit-phase"], "failed"); + EXPECT_EQ(j["x-medkit"]["phase"], "failed"); EXPECT_EQ(j["error"], "backend delete exploded"); manager.reset(); @@ -667,14 +667,14 @@ TEST(UpdateStatusToJson, SerializesPhaseAsVendorExtension) { UpdateStatusInfo status{UpdateStatus::Completed, UpdatePhase::Prepared, std::nullopt, std::nullopt, std::nullopt}; auto j = update_status_to_json(status); EXPECT_EQ(j["status"], "completed"); - EXPECT_EQ(j["x-medkit-phase"], "prepared"); + EXPECT_EQ(j["x-medkit"]["phase"], "prepared"); } TEST(UpdateStatusToJson, ExposesExecutedPhaseForTerminalCompletion) { UpdateStatusInfo status{UpdateStatus::Completed, UpdatePhase::Executed, 100, std::nullopt, std::nullopt}; auto j = update_status_to_json(status); EXPECT_EQ(j["status"], "completed"); - EXPECT_EQ(j["x-medkit-phase"], "executed"); + EXPECT_EQ(j["x-medkit"]["phase"], "executed"); EXPECT_EQ(j["progress"], 100); } @@ -682,5 +682,5 @@ TEST(UpdateStatusToJson, EmitsNonePhaseForFreshlyRegistered) { UpdateStatusInfo status; auto j = update_status_to_json(status); EXPECT_EQ(j["status"], "pending"); - EXPECT_EQ(j["x-medkit-phase"], "none"); + EXPECT_EQ(j["x-medkit"]["phase"], "none"); } diff --git a/src/ros2_medkit_integration_tests/package.xml b/src/ros2_medkit_integration_tests/package.xml index b37c168fb..bbeecc8d5 100644 --- a/src/ros2_medkit_integration_tests/package.xml +++ b/src/ros2_medkit_integration_tests/package.xml @@ -29,6 +29,7 @@ launch_ros ament_index_python python3-requests + python3-jsonschema ros2_medkit_gateway ros2_medkit_fault_manager ros2_medkit_linux_introspection diff --git a/src/ros2_medkit_integration_tests/test/features/test_openapi_response_drift.test.py b/src/ros2_medkit_integration_tests/test/features/test_openapi_response_drift.test.py new file mode 100644 index 000000000..842c0d145 --- /dev/null +++ b/src/ros2_medkit_integration_tests/test/features/test_openapi_response_drift.test.py @@ -0,0 +1,279 @@ +#!/usr/bin/env python3 +# Copyright 2026 bburda +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""OpenAPI response-schema drift test. + +For every GET endpoint declared in the runtime OpenAPI spec served at +GET /docs, this test fetches a real response from the live gateway and +validates it against the response schema declared in the same spec for the +returned status code. A mismatch means handler output and declared schema +have drifted apart - either the handler emits a field the schema does not +declare (the failure mode that caused issue #385's `x-medkit-phase` to +silently slip past every typed client), or the schema declares a required +field the handler does not emit. + +The test is intentionally narrow: only response *bodies* are validated, +only against the 200 schema, and only on endpoints that return JSON. It +does not check error envelope shapes, status code coverage, or request +schemas - those are covered by other tests. + +The full structural contract (compile-time link from emitter to schema) +will be solved by issue #338. This test is the runtime stop-gap until +that lands - if it fails, fix the schema or the handler; do not silence +the test. +""" + +import os +import re +import unittest + +from ament_index_python.packages import get_package_prefix +from jsonschema.validators import Draft202012Validator +import launch_testing +import requests + +from ros2_medkit_test_utils.constants import ALLOWED_EXIT_CODES +from ros2_medkit_test_utils.gateway_test_case import GatewayTestCase +from ros2_medkit_test_utils.launch_helpers import create_test_launch + + +def _test_update_backend_path(): + pkg_prefix = get_package_prefix('ros2_medkit_gateway') + return os.path.join( + pkg_prefix, 'lib', 'ros2_medkit_gateway', 'libtest_update_backend.so' + ) + + +def generate_test_description(): + return create_test_launch( + demo_nodes=['temp_sensor', 'calibration'], + gateway_params={ + 'updates.enabled': True, + 'plugins': ['test_update_backend'], + 'plugins.test_update_backend.path': _test_update_backend_path(), + }, + fault_manager=True, + ) + + +# Path placeholders -> entity collection used for substitution. +_ENTITY_PARAM_MAP = { + 'area_id': 'areas', + 'component_id': 'components', + 'app_id': 'apps', + 'function_id': 'functions', +} + + +def _inline_refs(schema, schemas, seen=None): + """Recursively inline $refs into a schema so jsonschema can validate it. + + The runtime spec uses $refs that point at #/components/schemas/, but + Draft202012Validator constructed without a registry will not follow them. + Recursive types are guarded via ``seen`` to break cycles. + """ + if seen is None: + seen = set() + if isinstance(schema, dict): + if '$ref' in schema: + ref = schema['$ref'] + if ref in seen: + return {} + if ref.startswith('#/components/schemas/'): + name = ref.rsplit('/', 1)[-1] + target = schemas.get(name) + if target is None: + return {} + return _inline_refs(target, schemas, seen | {ref}) + return {k: _inline_refs(v, schemas, seen) for k, v in schema.items()} + if isinstance(schema, list): + return [_inline_refs(item, schemas, seen) for item in schema] + return schema + + +def _is_sse_endpoint(operation): + """Detect SSE streaming endpoints that would hang on a plain GET.""" + summary = (operation.get('summary') or '') + (operation.get('description') or '') + return 'SSE' in summary or 'stream' in summary.lower() + + +def _substitute_path_params(path, entity_map, resource_id='test_id'): + def replacer(match): + param = match.group(1) + if param in _ENTITY_PARAM_MAP: + return entity_map.get(_ENTITY_PARAM_MAP[param], 'test_entity') + return resource_id + + return re.sub(r'\{([^}]+)\}', replacer, path) + + +class TestOpenApiResponseDrift(GatewayTestCase): + """Catches handler-vs-spec drift on GET response bodies.""" + + MIN_EXPECTED_APPS = 1 + + @classmethod + def setUpClass(cls): + super().setUpClass() + + # Discover real entity IDs to use in path substitution. + cls._entity_map = {} + for etype in ('areas', 'components', 'apps', 'functions'): + resp = requests.get(f'{cls.BASE_URL}/{etype}', timeout=5) + resp.raise_for_status() + items = resp.json().get('items', []) + if items: + cls._entity_map[etype] = items[0].get('id') + + # Register and prepare a test update package so /updates/{id}/status + # returns an interesting payload (status + x-medkit.phase). + cls._update_pkg_id = 'drift-test-pkg' + # Best-effort cleanup of any leftover from a previous interrupted run. + requests.delete( + f'{cls.BASE_URL}/updates/{cls._update_pkg_id}', timeout=5 + ) + post = requests.post( + f'{cls.BASE_URL}/updates', + json={ + 'id': cls._update_pkg_id, + 'update_name': 'Drift test package', + 'automated': False, + 'origins': ['proximity'], + }, + timeout=5, + ) + post.raise_for_status() + prepare = requests.put( + f'{cls.BASE_URL}/updates/{cls._update_pkg_id}/prepare', timeout=5 + ) + prepare.raise_for_status() + + @classmethod + def tearDownClass(cls): + requests.delete( + f'{cls.BASE_URL}/updates/{cls._update_pkg_id}', timeout=5 + ) + super().tearDownClass() + + def _fetch_spec(self): + return self.poll_endpoint_until( + '/docs', lambda d: d if d.get('paths') else None + ) + + def _validate_response(self, schema, body, schemas): + """Validate body against schema with $refs inlined.""" + inlined = _inline_refs(schema, schemas) + validator = Draft202012Validator(inlined) + return sorted(validator.iter_errors(body), key=lambda e: e.path) + + def _resource_for_path(self, path): + """Pick a resource_id that makes a given path return 200 when possible. + + Used for paths like /updates/{id}/status where 'test_id' would 404. + """ + if path.startswith('/updates/{'): + return self._update_pkg_id + return 'test_id' + + def test_get_responses_match_declared_schema(self): + """Every 200 response must validate against its declared schema. + + Iterates GET endpoints from /docs, fetches each, and validates + bodies against the response schema declared for status 200. Schema + drift (handler emits a field schema does not declare as required, + or vice versa) raises an error. + + @verifies REQ_INTEROP_002 + """ + spec = self._fetch_spec() + schemas = spec.get('components', {}).get('schemas', {}) + violations = [] + validated = 0 + + for path, path_item in spec.get('paths', {}).items(): + operation = path_item.get('get') + if not operation or _is_sse_endpoint(operation): + continue + + response_def = operation.get('responses', {}).get('200') + if not response_def: + continue + schema = ( + response_def.get('content', {}) + .get('application/json', {}) + .get('schema') + ) + if not schema: + continue + + uri = _substitute_path_params( + path, self._entity_map, self._resource_for_path(path) + ) + resp = requests.get(f'{self.BASE_URL}{uri}', timeout=10) + + # Non-200 responses fall outside this drift check (handler may + # legitimately return 404/501 for the chosen path params); the + # callability test covers status code shape separately. + if resp.status_code != 200: + continue + if 'application/json' not in resp.headers.get('content-type', ''): + continue + + body = resp.json() + errors = self._validate_response(schema, body, schemas) + if errors: + detail = '; '.join( + f'{".".join(str(p) for p in e.absolute_path) or ""}: {e.message}' + for e in errors[:5] + ) + violations.append(f'GET {uri}: schema drift: {detail}') + else: + validated += 1 + + self.assertGreater( + validated, 0, 'No endpoints validated - test fixture broken?' + ) + self.assertFalse( + violations, + f'{len(violations)} drift violation(s); validated {validated}:\n' + + '\n'.join(violations), + ) + + def test_update_status_payload_uses_nested_x_medkit(self): + """Specific guard for issue #385: /updates/{id}/status payload. + + The handler must emit ``x-medkit: {phase: ...}`` (nested object), not + the legacy flat ``x-medkit-phase`` key. The drift test above already + catches this via the schema declared in update_status_schema(); this + explicit assertion makes the regression intent obvious. + + @verifies REQ_INTEROP_002 + """ + resp = requests.get( + f'{self.BASE_URL}/updates/{self._update_pkg_id}/status', timeout=5 + ) + self.assertEqual(resp.status_code, 200) + body = resp.json() + self.assertNotIn('x-medkit-phase', body, 'Legacy flat key still emitted') + self.assertIn('x-medkit', body) + self.assertIn('phase', body['x-medkit']) + + +@launch_testing.post_shutdown_test() +class TestExitCodes(unittest.TestCase): + + def test_exit_codes(self, proc_info): + for proc in proc_info: + self.assertIn(proc_info[proc].returncode, ALLOWED_EXIT_CODES) From c12a0cbde88683b58f0e0731b4a43e5c0b8a35a8 Mon Sep 17 00:00:00 2001 From: Bartosz Burda Date: Mon, 27 Apr 2026 09:48:21 +0200 Subject: [PATCH 02/11] fix(spec): correct response schema ref for getAppHost endpoint GET /apps/{id}/is-located-on returns a single-element collection ({items, x-medkit, _links}) per the handler in discovery_handlers.cpp, but the spec was advertising it as EntityDetail (single object with required id/name). Aligns the spec to the actual response shape so the new drift test passes and generated clients get the right type. --- src/ros2_medkit_gateway/src/http/rest_server.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ros2_medkit_gateway/src/http/rest_server.cpp b/src/ros2_medkit_gateway/src/http/rest_server.cpp index a0ae098dd..2323e2ec5 100644 --- a/src/ros2_medkit_gateway/src/http/rest_server.cpp +++ b/src/ros2_medkit_gateway/src/http/rest_server.cpp @@ -1127,8 +1127,8 @@ void RESTServer::setup_routes() { }) .tag("Discovery") .summary("Get app host component") - .description("Returns the component hosting this app.") - .response(200, "Host component", SB::ref("EntityDetail")) + .description("Returns the component hosting this app as a single-element collection.") + .response(200, "Host component(s)", SB::ref("EntityList")) .operation_id("getAppHost"); reg.get(entity_path + "/depends-on", From 0b56d573474e3c59a58e042f9f8b1e294ea4915d Mon Sep 17 00:00:00 2001 From: Bartosz Burda Date: Mon, 27 Apr 2026 10:13:58 +0200 Subject: [PATCH 03/11] fix(test): iterate proc_info correctly in drift test exit-code check ProcInfoHandler iteration yields ProcessExited events, not name keys - indexing back into proc_info[proc] raised KeyError on every shutdown. Mirrors the pattern used in test_openapi_callability. --- .../test/features/test_openapi_response_drift.test.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/ros2_medkit_integration_tests/test/features/test_openapi_response_drift.test.py b/src/ros2_medkit_integration_tests/test/features/test_openapi_response_drift.test.py index 842c0d145..0ba4a5d64 100644 --- a/src/ros2_medkit_integration_tests/test/features/test_openapi_response_drift.test.py +++ b/src/ros2_medkit_integration_tests/test/features/test_openapi_response_drift.test.py @@ -275,5 +275,9 @@ def test_update_status_payload_uses_nested_x_medkit(self): class TestExitCodes(unittest.TestCase): def test_exit_codes(self, proc_info): - for proc in proc_info: - self.assertIn(proc_info[proc].returncode, ALLOWED_EXIT_CODES) + for info in proc_info: + self.assertIn( + info.returncode, + ALLOWED_EXIT_CODES, + f'Process {info.process_name} exited with code {info.returncode}', + ) From 0320df6b547874b81b8449cbdd767b3d45b52826 Mon Sep 17 00:00:00 2001 From: Bartosz Burda Date: Mon, 27 Apr 2026 11:02:09 +0200 Subject: [PATCH 04/11] fix(test): fall back to Draft7Validator for Humble's jsonschema 3.2 Ubuntu 22.04 ships python3-jsonschema 3.2 which lacks Draft202012Validator (added in 4.0). The validation surface we use (required, type, properties) behaves identically across drafts, so falling back to Draft7 keeps the drift test working on Humble while preferring Draft202012 on Jazzy/Rolling for OpenAPI 3.1 alignment. --- .../features/test_openapi_response_drift.test.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/ros2_medkit_integration_tests/test/features/test_openapi_response_drift.test.py b/src/ros2_medkit_integration_tests/test/features/test_openapi_response_drift.test.py index 0ba4a5d64..0503a7409 100644 --- a/src/ros2_medkit_integration_tests/test/features/test_openapi_response_drift.test.py +++ b/src/ros2_medkit_integration_tests/test/features/test_openapi_response_drift.test.py @@ -40,10 +40,18 @@ import unittest from ament_index_python.packages import get_package_prefix -from jsonschema.validators import Draft202012Validator import launch_testing import requests +# Humble (Ubuntu 22.04) ships python3-jsonschema 3.2 which only has draft-7; +# Jazzy/Rolling (Ubuntu 24.04) ship 4.10+ with Draft202012. Prefer the newest +# draft for OpenAPI 3.1 alignment, fall back to Draft7 on Humble. The properties +# we validate (required, type, properties) behave identically across drafts. +try: + from jsonschema.validators import Draft202012Validator as _Validator +except ImportError: + from jsonschema.validators import Draft7Validator as _Validator + from ros2_medkit_test_utils.constants import ALLOWED_EXIT_CODES from ros2_medkit_test_utils.gateway_test_case import GatewayTestCase from ros2_medkit_test_utils.launch_helpers import create_test_launch @@ -175,7 +183,7 @@ def _fetch_spec(self): def _validate_response(self, schema, body, schemas): """Validate body against schema with $refs inlined.""" inlined = _inline_refs(schema, schemas) - validator = Draft202012Validator(inlined) + validator = _Validator(inlined) return sorted(validator.iter_errors(body), key=lambda e: e.path) def _resource_for_path(self, path): From 989ef139d38d117a126410096092b4348952e2d1 Mon Sep 17 00:00:00 2001 From: Bartosz Burda Date: Mon, 27 Apr 2026 14:16:07 +0200 Subject: [PATCH 05/11] ci(quality): bump sanitizer-asan job timeout to 60 minutes Cold-cache ASan builds take ~33 min on the GitHub-hosted runner, which left ~12 min for the integration test pass. PR branches that miss the ccache restore-keys lookup were getting cancelled at the last scenario test before the actions/cache post step could persist the build, so each subsequent run hit the same cold cache. 60 min provides clear headroom to complete + save cache once, after which warm-cache runs will return to the typical ~22 min. --- .github/workflows/quality.yml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/.github/workflows/quality.yml b/.github/workflows/quality.yml index 75c6c7a9e..415ec189d 100644 --- a/.github/workflows/quality.yml +++ b/.github/workflows/quality.yml @@ -217,7 +217,11 @@ jobs: runs-on: ubuntu-latest container: image: ubuntu:noble - timeout-minutes: 45 + # Cold-cache ASan builds are ~33 min; with the 12-min test budget this + # left ~0 headroom and was already cancelling at thermal_protection on + # PR branches that miss the ccache restore key. 60 min covers a full + # cold-cache cycle so the cache always gets saved on the post step. + timeout-minutes: 60 defaults: run: shell: bash From 4e480f767040cc55aed79ad7a1cbd3bd6890476f Mon Sep 17 00:00:00 2001 From: Bartosz Burda Date: Mon, 27 Apr 2026 15:44:21 +0200 Subject: [PATCH 06/11] fix(spec): make x-medkit optional in UpdateStatus schema SOVD does not require vendor extensions, and the existing codebase convention (fault_detail_schema, entity_detail_schema) leaves x-medkit out of the required list even when the gateway always emits it. Marking it required in update_status_schema would imply a SOVD-incompatible contract while only the gateway implementation actually depends on it. The explicit handler guard (test_update_status_payload_uses_nested_x_medkit) keeps regression risk covered. --- src/ros2_medkit_gateway/src/openapi/schema_builder.cpp | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/ros2_medkit_gateway/src/openapi/schema_builder.cpp b/src/ros2_medkit_gateway/src/openapi/schema_builder.cpp index 5d35e42ff..98e1e12ee 100644 --- a/src/ros2_medkit_gateway/src/openapi/schema_builder.cpp +++ b/src/ros2_medkit_gateway/src/openapi/schema_builder.cpp @@ -556,6 +556,11 @@ nlohmann::json SchemaBuilder::update_status_schema() { {"description", "Internal lifecycle phase, distinguishes prepare-completed from execute-completed"}}}}}, {"required", {"phase"}}}; + // x-medkit is optional in the schema (matches the convention used by + // fault_detail_schema and entity_detail_schema): SOVD does not require + // vendor extensions, so a SOVD-compliant client must be able to ignore + // it. The gateway always emits it; the explicit handler guard in + // test_openapi_response_drift covers regression risk. return {{"type", "object"}, {"properties", {{"status", {{"type", "string"}, {"enum", {"pending", "inProgress", "completed", "failed"}}}}, @@ -563,7 +568,7 @@ nlohmann::json SchemaBuilder::update_status_schema() { {"sub_progress", {{"type", "array"}, {"items", sub_progress_schema}}}, {"error", {{"type", "string"}}}, {"x-medkit", x_medkit_schema}}}, - {"required", {"status", "x-medkit"}}}; + {"required", {"status"}}}; } nlohmann::json SchemaBuilder::log_configuration_schema() { From c492f9e0f2ca905f0b7d2dfce8d8a8ffc2efb53a Mon Sep 17 00:00:00 2001 From: Bartosz Burda Date: Mon, 27 Apr 2026 16:12:02 +0200 Subject: [PATCH 07/11] fix(test): tighten drift test invariants from review feedback Four review points addressed: - test_update_manager.cpp: switch j["x-medkit"]["phase"] reads to contains() + at(). nlohmann's operator[] on a non-const json inserts missing keys, which can mask a regression where update_status_to_json stops emitting the vendor extension. - test_openapi_response_drift: REQUIRED_APPS = {temp_sensor, calibration} forces deterministic discovery before setUpClass proceeds, instead of relying on MIN_EXPECTED_APPS=1 catching an arbitrary first node. - _inline_refs: raise _RefResolutionError on unresolved or cyclic $refs rather than returning {} (which validates anything and hides drift). Caught in the validation loop so multiple spec issues surface in one run. - Module docstring: spell out what the validator catches (required, type, enum, nested shape) and what it does not (extra undeclared fields, since most schemas don't set additionalProperties: false). Closing the additionalProperties gap belongs in the issue #338 rework. --- .../test/test_update_manager.cpp | 28 ++++--- .../test_openapi_response_drift.test.py | 79 ++++++++++++------- 2 files changed, 69 insertions(+), 38 deletions(-) diff --git a/src/ros2_medkit_gateway/test/test_update_manager.cpp b/src/ros2_medkit_gateway/test/test_update_manager.cpp index 59f602d7c..234693665 100644 --- a/src/ros2_medkit_gateway/test/test_update_manager.cpp +++ b/src/ros2_medkit_gateway/test/test_update_manager.cpp @@ -653,11 +653,14 @@ TEST(UpdateManagerFailureTest, DeleteRollbackUpdatesStatusPhaseAndErrorMessage) ASSERT_TRUE(status->error_message.has_value()); EXPECT_NE(status->error_message->find("backend delete exploded"), std::string::npos); - // Serialized payload reflects the rollback end-to-end. + // Serialized payload reflects the rollback end-to-end. Use at() so a + // missing x-medkit/phase fails the test deterministically instead of being + // inserted as null by operator[]. auto j = update_status_to_json(*status); - EXPECT_EQ(j["status"], "failed"); - EXPECT_EQ(j["x-medkit"]["phase"], "failed"); - EXPECT_EQ(j["error"], "backend delete exploded"); + EXPECT_EQ(j.at("status"), "failed"); + ASSERT_TRUE(j.contains("x-medkit")); + EXPECT_EQ(j.at("x-medkit").at("phase"), "failed"); + EXPECT_EQ(j.at("error"), "backend delete exploded"); manager.reset(); backend.reset(); @@ -666,21 +669,24 @@ TEST(UpdateManagerFailureTest, DeleteRollbackUpdatesStatusPhaseAndErrorMessage) TEST(UpdateStatusToJson, SerializesPhaseAsVendorExtension) { UpdateStatusInfo status{UpdateStatus::Completed, UpdatePhase::Prepared, std::nullopt, std::nullopt, std::nullopt}; auto j = update_status_to_json(status); - EXPECT_EQ(j["status"], "completed"); - EXPECT_EQ(j["x-medkit"]["phase"], "prepared"); + EXPECT_EQ(j.at("status"), "completed"); + ASSERT_TRUE(j.contains("x-medkit")); + EXPECT_EQ(j.at("x-medkit").at("phase"), "prepared"); } TEST(UpdateStatusToJson, ExposesExecutedPhaseForTerminalCompletion) { UpdateStatusInfo status{UpdateStatus::Completed, UpdatePhase::Executed, 100, std::nullopt, std::nullopt}; auto j = update_status_to_json(status); - EXPECT_EQ(j["status"], "completed"); - EXPECT_EQ(j["x-medkit"]["phase"], "executed"); - EXPECT_EQ(j["progress"], 100); + EXPECT_EQ(j.at("status"), "completed"); + ASSERT_TRUE(j.contains("x-medkit")); + EXPECT_EQ(j.at("x-medkit").at("phase"), "executed"); + EXPECT_EQ(j.at("progress"), 100); } TEST(UpdateStatusToJson, EmitsNonePhaseForFreshlyRegistered) { UpdateStatusInfo status; auto j = update_status_to_json(status); - EXPECT_EQ(j["status"], "pending"); - EXPECT_EQ(j["x-medkit"]["phase"], "none"); + EXPECT_EQ(j.at("status"), "pending"); + ASSERT_TRUE(j.contains("x-medkit")); + EXPECT_EQ(j.at("x-medkit").at("phase"), "none"); } diff --git a/src/ros2_medkit_integration_tests/test/features/test_openapi_response_drift.test.py b/src/ros2_medkit_integration_tests/test/features/test_openapi_response_drift.test.py index 0503a7409..72974e5e2 100644 --- a/src/ros2_medkit_integration_tests/test/features/test_openapi_response_drift.test.py +++ b/src/ros2_medkit_integration_tests/test/features/test_openapi_response_drift.test.py @@ -17,22 +17,30 @@ For every GET endpoint declared in the runtime OpenAPI spec served at GET /docs, this test fetches a real response from the live gateway and -validates it against the response schema declared in the same spec for the -returned status code. A mismatch means handler output and declared schema -have drifted apart - either the handler emits a field the schema does not -declare (the failure mode that caused issue #385's `x-medkit-phase` to -silently slip past every typed client), or the schema declares a required -field the handler does not emit. - -The test is intentionally narrow: only response *bodies* are validated, -only against the 200 schema, and only on endpoints that return JSON. It -does not check error envelope shapes, status code coverage, or request -schemas - those are covered by other tests. - -The full structural contract (compile-time link from emitter to schema) -will be solved by issue #338. This test is the runtime stop-gap until -that lands - if it fails, fix the schema or the handler; do not silence -the test. +validates it against the response schema declared in the same spec for +status 200. The validator runs jsonschema in its default mode, so what +this catches is the *subset* of drift that breaks the schema's positive +constraints: + +- ``required`` fields the handler does not emit +- type mismatches (string vs number, etc.) +- ``enum`` values the handler emits but the schema does not list +- malformed nested objects against declared sub-schemas + +What this does **not** catch (because most schemas in +``schema_builder.cpp`` do not set ``additionalProperties: false``): + +- handler emits an extra top-level field the schema never declares + (this was exactly the failure mode that allowed flat + ``x-medkit-phase`` to slip through before issue #385 was filed) + +A closed-object check would surface dozens of pre-existing schema gaps +that are out of scope for #385; the structural fix (compile-time link +from each emitter to its schema) lives under issue #338. Until then, +the explicit ``test_update_status_payload_uses_nested_x_medkit`` guards +the specific regression #385 closes. + +If this test fails, fix the schema or the handler; do not silence it. """ import os @@ -85,12 +93,21 @@ def generate_test_description(): } +class _RefResolutionError(Exception): + """Raised when a $ref in the spec cannot be resolved or forms a cycle. + + Returning an empty schema on these conditions would let jsonschema + validate any payload, masking the drift the test is meant to detect. + """ + + def _inline_refs(schema, schemas, seen=None): """Recursively inline $refs into a schema so jsonschema can validate it. The runtime spec uses $refs that point at #/components/schemas/, but - Draft202012Validator constructed without a registry will not follow them. - Recursive types are guarded via ``seen`` to break cycles. + Draft202012Validator constructed without a registry will not follow + them. Cycles and unresolved refs raise ``_RefResolutionError`` rather + than silently degrading the validator to "anything goes". """ if seen is None: seen = set() @@ -98,13 +115,16 @@ def _inline_refs(schema, schemas, seen=None): if '$ref' in schema: ref = schema['$ref'] if ref in seen: - return {} - if ref.startswith('#/components/schemas/'): - name = ref.rsplit('/', 1)[-1] - target = schemas.get(name) - if target is None: - return {} - return _inline_refs(target, schemas, seen | {ref}) + raise _RefResolutionError( + f'Cycle in $ref chain: {" -> ".join(sorted(seen))} -> {ref}' + ) + if not ref.startswith('#/components/schemas/'): + raise _RefResolutionError(f'Unsupported $ref form: {ref}') + name = ref.rsplit('/', 1)[-1] + target = schemas.get(name) + if target is None: + raise _RefResolutionError(f'Unknown $ref target: {ref}') + return _inline_refs(target, schemas, seen | {ref}) return {k: _inline_refs(v, schemas, seen) for k, v in schema.items()} if isinstance(schema, list): return [_inline_refs(item, schemas, seen) for item in schema] @@ -130,7 +150,8 @@ def replacer(match): class TestOpenApiResponseDrift(GatewayTestCase): """Catches handler-vs-spec drift on GET response bodies.""" - MIN_EXPECTED_APPS = 1 + MIN_EXPECTED_APPS = 2 + REQUIRED_APPS = {'temp_sensor', 'calibration'} @classmethod def setUpClass(cls): @@ -240,7 +261,11 @@ def test_get_responses_match_declared_schema(self): continue body = resp.json() - errors = self._validate_response(schema, body, schemas) + try: + errors = self._validate_response(schema, body, schemas) + except _RefResolutionError as exc: + violations.append(f'GET {uri}: spec error: {exc}') + continue if errors: detail = '; '.join( f'{".".join(str(p) for p in e.absolute_path) or ""}: {e.message}' From 53fddac8692b8cc1c923ec5f8ce63216befcc261 Mon Sep 17 00:00:00 2001 From: Bartosz Burda Date: Mon, 27 Apr 2026 19:11:15 +0200 Subject: [PATCH 08/11] test: add explicit guard for nested x-medkit on configurations Mirror of test_update_status_payload_uses_nested_x_medkit for the configurations endpoint. The drift test cannot catch reintroduction of the legacy flat x-medkit-source / x-medkit-node keys because the ConfigurationMetaData schema is not closed (no additionalProperties: false), so an explicit positive assertion locks in the new shape. The test exercises temp_sensor's four declared ROS 2 parameters, which covers the per-parameter emit path migrated by this PR. --- .../test_openapi_response_drift.test.py | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/src/ros2_medkit_integration_tests/test/features/test_openapi_response_drift.test.py b/src/ros2_medkit_integration_tests/test/features/test_openapi_response_drift.test.py index 72974e5e2..5b3ac0721 100644 --- a/src/ros2_medkit_integration_tests/test/features/test_openapi_response_drift.test.py +++ b/src/ros2_medkit_integration_tests/test/features/test_openapi_response_drift.test.py @@ -303,6 +303,53 @@ def test_update_status_payload_uses_nested_x_medkit(self): self.assertIn('x-medkit', body) self.assertIn('phase', body['x-medkit']) + def test_configurations_payload_uses_nested_x_medkit(self): + """Specific guard for issue #385: configurations vendor fields nested. + + Items at the top level (only in aggregated mode) and parameters + inside ``x-medkit.parameters[]`` (always emitted) must carry vendor + source attribution under a nested ``x-medkit`` object, never as flat + ``x-medkit-source`` / ``x-medkit-node`` keys. The drift test cannot + catch reintroduction of the legacy flat keys because + ``ConfigurationMetaData`` does not set ``additionalProperties: false``. + + ``temp_sensor`` declares four ROS 2 parameters in the test fixture, + which exercises the per-parameter emit path that #385 migrated. + + @verifies REQ_INTEROP_002 + """ + resp = requests.get( + f'{self.BASE_URL}/apps/temp_sensor/configurations', timeout=10 + ) + self.assertEqual(resp.status_code, 200) + body = resp.json() + + items = body.get('items', []) + self.assertGreater( + len(items), 0, + 'Fixture broken: temp_sensor should declare ROS 2 parameters' + ) + for item in items: + self.assertNotIn( + 'x-medkit-source', item, f'Legacy flat key on item: {item}' + ) + self.assertNotIn( + 'x-medkit-node', item, f'Legacy flat key on item: {item}' + ) + + parameters = body.get('x-medkit', {}).get('parameters', []) + self.assertGreater(len(parameters), 0) + for param in parameters: + self.assertNotIn( + 'x-medkit-source', param, f'Legacy flat key on parameter: {param}' + ) + self.assertNotIn( + 'x-medkit-node', param, f'Legacy flat key on parameter: {param}' + ) + self.assertIn('x-medkit', param) + self.assertEqual(param['x-medkit'].get('source'), 'temp_sensor') + self.assertIn('node', param['x-medkit']) + @launch_testing.post_shutdown_test() class TestExitCodes(unittest.TestCase): From e443c2343b7b9faf3474f14a3952e57258911fe0 Mon Sep 17 00:00:00 2001 From: Bartosz Burda Date: Tue, 28 Apr 2026 17:06:43 +0200 Subject: [PATCH 09/11] fix(gateway): drain in-flight graph callbacks in remove_graph_change fire_graph_callbacks() snapshotted GraphCallback copies and posted them to the worker queue; the captured `this` from Ros2TopicDataProvider could be invoked after the consumer's destructor freed members because remove_graph_change() only cleared the slot for future snapshots. Track per-slot in_flight counters: bump under graph_mtx_ when snapshotting, drop them after the wrapper invokes each callback (or roll back if post() rejects the wrapper during shutdown / queue full). remove_graph_change() now waits on a condvar until in_flight[token] reaches zero, so callers can safely tear down captured state. Adds a regression test that verifies the synchronous drain semantic. Surfaced by sanitizer-asan as a heap-use-after-free in DataAccessManagerWithPublisherTest::TearDown -> ~Ros2TopicDataProvider -> on_graph_change running on the executor worker. --- .../ros2_subscription_executor.hpp | 32 ++++++++-- .../ros2_subscription_executor.cpp | 54 ++++++++++++++-- .../test/test_ros2_subscription_executor.cpp | 61 +++++++++++++++++++ 3 files changed, 136 insertions(+), 11 deletions(-) diff --git a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2_common/ros2_subscription_executor.hpp b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2_common/ros2_subscription_executor.hpp index 639e12537..3ddcd7390 100644 --- a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2_common/ros2_subscription_executor.hpp +++ b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2_common/ros2_subscription_executor.hpp @@ -190,16 +190,29 @@ class Ros2SubscriptionExecutor final { * pool entries. * * @warning A registered callback must NOT call remove_graph_change() on its own - * token from inside the callback - graph_mtx_ is non-recursive and a - * self-removal attempt deadlocks. Drop the token through a separate task - * posted to the worker if dynamic deregistration is needed. + * token from inside the callback - graph_mtx_ is non-recursive and the + * synchronous in-flight drain in remove_graph_change() would wait + * indefinitely on the very call holding the slot's in_flight counter. + * Drop the token through a separate task posted to the worker if + * dynamic deregistration is needed. * * @return Opaque token in range [0, kMaxGraphListeners). `kMaxGraphListeners` if * all slots are taken. */ [[nodiscard]] std::size_t on_graph_change(GraphCallback cb); - /// Remove a previously-registered graph callback. Idempotent. See warning on on_graph_change. + /** + * @brief Remove a previously-registered graph callback. Idempotent. + * + * Synchronously drains in-flight execution: returns only after any worker + * thread that already entered this slot's callback has exited it. After + * remove_graph_change() returns, the caller can safely tear down state that + * the callback's captured `this` pointer references; the executor guarantees + * no further dispatch and no in-flight call. + * + * See warning on on_graph_change(): self-removal from inside the callback + * deadlocks. + */ void remove_graph_change(std::size_t token); /// True after shutdown has started. Monotonic. Use to skip re-posting work on teardown. @@ -269,10 +282,19 @@ class Ros2SubscriptionExecutor final { std::atomic watchdog_trips_{0}; std::atomic graph_events_received_{0}; - // Graph callbacks (pre-allocated, Tier 1) + // Graph callbacks (pre-allocated, Tier 1). + // graph_in_flight_[i] tracks how many worker invocations of slot i's + // callback are currently mid-execution. fire_graph_callbacks() increments + // it when snapshotting under graph_mtx_, the snapshot wrapper decrements + // it after the callback returns and notifies graph_in_flight_cv_. This + // lets remove_graph_change() block until in-flight execution drains, so a + // consumer's destructor can safely tear down state that the callback's + // captured `this` references. mutable std::mutex graph_mtx_; std::array graph_callbacks_{}; std::array graph_slot_used_{}; + std::array graph_in_flight_{}; + std::condition_variable graph_in_flight_cv_; // Public rclcpp graph API - stable across Humble / Jazzy / Rolling rclcpp::Event::SharedPtr graph_event_; diff --git a/src/ros2_medkit_gateway/src/ros2_common/ros2_subscription_executor.cpp b/src/ros2_medkit_gateway/src/ros2_common/ros2_subscription_executor.cpp index 46445a885..fe6c43088 100644 --- a/src/ros2_medkit_gateway/src/ros2_common/ros2_subscription_executor.cpp +++ b/src/ros2_medkit_gateway/src/ros2_common/ros2_subscription_executor.cpp @@ -135,9 +135,17 @@ void Ros2SubscriptionExecutor::remove_graph_change(std::size_t token) { if (token >= kMaxGraphListeners) { return; } - std::lock_guard lk(graph_mtx_); + std::unique_lock lk(graph_mtx_); graph_slot_used_[token] = false; graph_callbacks_[token] = nullptr; + // Wait for any worker-thread invocation that already snapshotted this + // slot to finish executing the callback. Without this drain, a wrapper + // task posted by fire_graph_callbacks() could call cb() against a + // captured `this` after the consumer's destructor has freed members, + // producing the heap-use-after-free in the snapshotted callback body. + graph_in_flight_cv_.wait(lk, [this, token] { + return graph_in_flight_[token] == 0; + }); } Ros2SubscriptionExecutor::Stats Ros2SubscriptionExecutor::stats() const { @@ -309,30 +317,64 @@ void Ros2SubscriptionExecutor::fire_graph_callbacks() { // sweep, which can take 15 minutes by default. All-or-nothing delivery // means a single dropped post at most loses one whole graph event, which // graph_poll_tick() will refire on the next change. - std::vector snapshot; + // + // Each entry pairs the callback with its slot token: when the wrapper + // finishes invoking the callback, it decrements graph_in_flight_[token] + // so remove_graph_change() can synchronously wait for in-flight execution + // to drain. + struct PendingCb { + std::size_t token; + GraphCallback cb; + }; + std::vector snapshot; snapshot.reserve(kMaxGraphListeners); { std::lock_guard lk(graph_mtx_); for (std::size_t i = 0; i < kMaxGraphListeners; ++i) { if (graph_slot_used_[i] && graph_callbacks_[i]) { - snapshot.push_back(graph_callbacks_[i]); + snapshot.push_back({i, graph_callbacks_[i]}); + ++graph_in_flight_[i]; } } } if (snapshot.empty()) { return; } - (void)post([cbs = std::move(snapshot)] { - for (const auto & cb : cbs) { + // Capture tokens before moving the snapshot into the wrapper so we can + // roll back graph_in_flight_ if post() rejects the wrapper (queue full + // or shutting down). + std::vector reserved_tokens; + reserved_tokens.reserve(snapshot.size()); + for (const auto & p : snapshot) { + reserved_tokens.push_back(p.token); + } + const bool posted = post([this, cbs = std::move(snapshot)] { + for (const auto & p : cbs) { try { - cb(); + p.cb(); } catch (const std::exception & ex) { RCLCPP_ERROR(rclcpp::get_logger("ros2_subscription_executor"), "Graph callback threw exception: %s", ex.what()); } catch (...) { RCLCPP_ERROR(rclcpp::get_logger("ros2_subscription_executor"), "Graph callback threw unknown exception"); } + { + std::lock_guard lk(graph_mtx_); + --graph_in_flight_[p.token]; + } + graph_in_flight_cv_.notify_all(); } }); + if (!posted) { + // Queue full or shutting down: the wrapper will not run, so we must + // decrement the counters we incremented to keep remove_graph_change() + // from waiting forever. snapshot is in moved-from state here; use + // reserved_tokens captured above the post() call. + std::lock_guard lk(graph_mtx_); + for (auto t : reserved_tokens) { + --graph_in_flight_[t]; + } + graph_in_flight_cv_.notify_all(); + } } } // namespace ros2_medkit_gateway::ros2_common diff --git a/src/ros2_medkit_gateway/test/test_ros2_subscription_executor.cpp b/src/ros2_medkit_gateway/test/test_ros2_subscription_executor.cpp index f9768e0e8..36f35046b 100644 --- a/src/ros2_medkit_gateway/test/test_ros2_subscription_executor.cpp +++ b/src/ros2_medkit_gateway/test/test_ros2_subscription_executor.cpp @@ -16,8 +16,10 @@ #include #include +#include #include #include +#include #include #include @@ -235,6 +237,65 @@ TEST_F(Ros2SubscriptionExecutorTest, RemoveGraphChangeStopsFiring) { EXPECT_EQ(fired.load(), 0); } +TEST_F(Ros2SubscriptionExecutorTest, RemoveGraphChangeWaitsForInFlightCallback) { + // Regression: snapshotted graph callbacks running on the worker thread + // could touch members of a freed consumer (heap-use-after-free seen by + // ASan in DataAccessManagerWithPublisherTest). remove_graph_change() + // now drains in-flight execution synchronously so the consumer's + // destructor can safely tear down state captured by the lambda. + std::mutex m; + std::condition_variable in_cb; + std::condition_variable cb_release; + std::atomic entered{false}; + std::atomic exited{false}; + bool release = false; + + auto token = sub_exec_->on_graph_change([&] { + entered.store(true); + in_cb.notify_all(); + std::unique_lock lk(m); + cb_release.wait(lk, [&] { + return release; + }); + exited.store(true); + }); + ASSERT_LT(token, 16u); + + auto other_node = std::make_shared("other_node_for_in_flight"); + auto pub = other_node->create_publisher("/in_flight_topic", 10); + executor_->add_node(other_node); + + { + std::unique_lock lk(m); + in_cb.wait_for(lk, 3s, [&] { + return entered.load(); + }); + } + ASSERT_TRUE(entered.load()) << "Callback never started; cannot exercise drain"; + EXPECT_FALSE(exited.load()); + + std::atomic remove_returned{false}; + std::thread remover([&] { + sub_exec_->remove_graph_change(token); + remove_returned.store(true); + }); + + std::this_thread::sleep_for(150ms); + EXPECT_FALSE(remove_returned.load()) << "remove_graph_change did not wait for in-flight callback"; + + { + std::lock_guard lk(m); + release = true; + } + cb_release.notify_all(); + + remover.join(); + EXPECT_TRUE(exited.load()); + EXPECT_TRUE(remove_returned.load()); + + executor_->remove_node(other_node); +} + TEST_F(Ros2SubscriptionExecutorTest, WatchdogDetectsStuckTask) { std::promise release; std::shared_future release_f = release.get_future().share(); From dbcd01093b93ba1a90c1f04f71ab6540022ea225 Mon Sep 17 00:00:00 2001 From: Bartosz Burda Date: Tue, 28 Apr 2026 20:29:13 +0200 Subject: [PATCH 10/11] fix(gateway): declare x-medkit.node in configuration schema; harden drift test - configuration_metadata_schema(): declare ``x-medkit.node`` alongside ``source``. config_handlers.cpp emits both on every per-parameter entry in aggregated configurations; clients generated from the spec dropped ``node`` because it was undeclared. Add a static schema test that fails if either property goes missing - the integration drift test cannot catch this because configuration_metadata_schema leaves additionalProperties open by convention. - update_status_schema(): rewrite the comment around the inner ``required: {phase}`` to spell out that ``phase`` is required only when x-medkit is present, not unconditionally. - test_openapi_response_drift: replace the silent ``test_entity`` fallback in _substitute_path_params with an explicit _MissingEntityType exception. The loop now skips paths whose entity type was not discovered and reports them in test output, instead of hitting 404s the loop dropped without validating. Added unit tests for the helper. - test_openapi_response_drift: poll /docs until at least MIN_EXPECTED_PATHS paths are present, so a slow plugin registration cannot let the test read a partial spec and silently miss endpoints registered after the snapshot. - test_openapi_response_drift: clarify class docstring on GET-only scope; POST/PUT/DELETE coverage stays out of scope. - ros2_subscription_executor: strengthen on_graph_change() warning to describe the exact deadlock surface and the symptom (hung worker, no log line). Runtime detection would need thread_local TLS, which is banned in code linked into the gateway plugin MODULE. --- .../ros2_subscription_executor.hpp | 19 ++- .../src/openapi/schema_builder.cpp | 18 ++- .../test/test_schema_builder.cpp | 22 ++++ .../test_openapi_response_drift.test.py | 109 ++++++++++++++++-- 4 files changed, 148 insertions(+), 20 deletions(-) diff --git a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2_common/ros2_subscription_executor.hpp b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2_common/ros2_subscription_executor.hpp index 3ddcd7390..aec601878 100644 --- a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2_common/ros2_subscription_executor.hpp +++ b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2_common/ros2_subscription_executor.hpp @@ -189,12 +189,19 @@ class Ros2SubscriptionExecutor final { * disappearing, types changing). Use to invalidate per-topic caches or evict stale * pool entries. * - * @warning A registered callback must NOT call remove_graph_change() on its own - * token from inside the callback - graph_mtx_ is non-recursive and the - * synchronous in-flight drain in remove_graph_change() would wait - * indefinitely on the very call holding the slot's in_flight counter. - * Drop the token through a separate task posted to the worker if - * dynamic deregistration is needed. + * @warning DO NOT call remove_graph_change() on this token from inside the + * callback body. The deadlock is concrete: remove_graph_change() + * acquires graph_mtx_ and waits for graph_in_flight_[token] to + * drain to zero, but the wrapper that decrements is the very call + * frame that ran your callback. The worker thread is now blocked + * on cv.wait, the decrement never runs, the wait never returns. + * Symptom in production: hung worker thread, /health stops + * updating, no log line. There is no runtime detection (would + * need thread_local TLS, banned in code linked into the gateway + * plugin MODULE; see CLAUDE.md). Workaround for dynamic + * deregistration: have the callback hand the token to a + * non-worker thread via a queue/promise, and call + * remove_graph_change() from there. * * @return Opaque token in range [0, kMaxGraphListeners). `kMaxGraphListeners` if * all slots are taken. diff --git a/src/ros2_medkit_gateway/src/openapi/schema_builder.cpp b/src/ros2_medkit_gateway/src/openapi/schema_builder.cpp index 98e1e12ee..550289195 100644 --- a/src/ros2_medkit_gateway/src/openapi/schema_builder.cpp +++ b/src/ros2_medkit_gateway/src/openapi/schema_builder.cpp @@ -172,7 +172,10 @@ nlohmann::json SchemaBuilder::configuration_metadata_schema() { {"properties", {{"source", {{"type", "string"}, - {"description", "App ID that owns this parameter (only present in aggregated configurations)"}}}}}}}}}, + {"description", "App ID that owns this parameter (only present in aggregated configurations)"}}}, + {"node", + {{"type", "string"}, + {"description", "Node FQN providing this parameter (only present in aggregated configurations)"}}}}}}}}}, {"required", {"id", "name", "type"}}}; } @@ -556,11 +559,14 @@ nlohmann::json SchemaBuilder::update_status_schema() { {"description", "Internal lifecycle phase, distinguishes prepare-completed from execute-completed"}}}}}, {"required", {"phase"}}}; - // x-medkit is optional in the schema (matches the convention used by - // fault_detail_schema and entity_detail_schema): SOVD does not require - // vendor extensions, so a SOVD-compliant client must be able to ignore - // it. The gateway always emits it; the explicit handler guard in - // test_openapi_response_drift covers regression risk. + // x-medkit is optional in the SOVD payload (clients may ignore vendor + // extensions; matches the convention in fault_detail_schema and + // entity_detail_schema). When the gateway DOES emit the x-medkit object, + // however, ``phase`` is mandatory inside it - that scope is enforced by + // the inner ``required: {phase}`` above, NOT by listing x-medkit in the + // parent's required list. The drift test in test_openapi_response_drift + // covers regression on the emit side. If x-medkit is ever dropped from + // the parent properties, the inner required must be revisited too. return {{"type", "object"}, {"properties", {{"status", {{"type", "string"}, {"enum", {"pending", "inProgress", "completed", "failed"}}}}, diff --git a/src/ros2_medkit_gateway/test/test_schema_builder.cpp b/src/ros2_medkit_gateway/test/test_schema_builder.cpp index 5398da781..7c301f571 100644 --- a/src/ros2_medkit_gateway/test/test_schema_builder.cpp +++ b/src/ros2_medkit_gateway/test/test_schema_builder.cpp @@ -174,6 +174,28 @@ TEST(SchemaBuilderStaticTest, ConfigurationMetaDataSchema) { EXPECT_EQ(std::find(required.begin(), required.end(), "value"), required.end()); } +// @verifies REQ_INTEROP_002 +TEST(SchemaBuilderStaticTest, ConfigurationMetaDataXMedkitDeclaresAllEmittedFields) { + // Regression: the x-medkit object emitted by config_handlers.cpp on every + // per-parameter entry contains both `source` (app_id) and `node` (FQN). + // The schema must declare both, otherwise generated typed clients drop + // or fail-type the undeclared field - exactly the drift this PR fixes + // for x-medkit.phase. additionalProperties is intentionally left open + // (other endpoints use the same convention), so the drift integration + // test cannot detect missing properties here; this static check does. + auto schema = SchemaBuilder::configuration_metadata_schema(); + ASSERT_TRUE(schema.contains("properties")); + ASSERT_TRUE(schema.at("properties").contains("x-medkit")); + const auto & x_medkit = schema.at("properties").at("x-medkit"); + EXPECT_EQ(x_medkit.at("type"), "object"); + ASSERT_TRUE(x_medkit.contains("properties")); + const auto & x_props = x_medkit.at("properties"); + ASSERT_TRUE(x_props.contains("source")); + EXPECT_EQ(x_props.at("source").at("type"), "string"); + ASSERT_TRUE(x_props.contains("node")); + EXPECT_EQ(x_props.at("node").at("type"), "string"); +} + // @verifies REQ_INTEROP_002 TEST(SchemaBuilderStaticTest, ConfigurationReadValueSchema) { auto schema = SchemaBuilder::configuration_read_value_schema(); diff --git a/src/ros2_medkit_integration_tests/test/features/test_openapi_response_drift.test.py b/src/ros2_medkit_integration_tests/test/features/test_openapi_response_drift.test.py index 5b3ac0721..4591ede2c 100644 --- a/src/ros2_medkit_integration_tests/test/features/test_openapi_response_drift.test.py +++ b/src/ros2_medkit_integration_tests/test/features/test_openapi_response_drift.test.py @@ -137,27 +137,60 @@ def _is_sse_endpoint(operation): return 'SSE' in summary or 'stream' in summary.lower() +class _MissingEntityType(Exception): + """Raised when a path needs an entity type that was not discovered. + + Substituting a placeholder (e.g. ``'test_entity'``) would yield a path + the gateway returns 404 for; the drift loop skips non-200 responses, + so the endpoint would silently exit unvalidated. Callers must catch + this and explicitly account for the skipped path. + """ + + def _substitute_path_params(path, entity_map, resource_id='test_id'): def replacer(match): param = match.group(1) if param in _ENTITY_PARAM_MAP: - return entity_map.get(_ENTITY_PARAM_MAP[param], 'test_entity') + etype = _ENTITY_PARAM_MAP[param] + if etype not in entity_map: + raise _MissingEntityType( + f'Path param {{{param}}} requires entity type ' + f'{etype!r} but none was discovered' + ) + return entity_map[etype] return resource_id return re.sub(r'\{([^}]+)\}', replacer, path) class TestOpenApiResponseDrift(GatewayTestCase): - """Catches handler-vs-spec drift on GET response bodies.""" + """Catches handler-vs-spec drift on GET response bodies. + + Scope: GET endpoints declared at runtime under /docs only. POST/PUT/ + DELETE request bodies and non-200 response shapes are out of scope; + extending coverage to all verbs is tracked under issue #338. + """ MIN_EXPECTED_APPS = 2 REQUIRED_APPS = {'temp_sensor', 'calibration'} + # Conservative lower bound on the number of paths a fully registered + # gateway exposes (currently ~50 with the test fixture). _fetch_spec + # polls until at least this many paths are present so a slow plugin + # registration cannot let the test read back a partial spec and + # silently skip the endpoints registered after the snapshot. + MIN_EXPECTED_PATHS = 30 @classmethod def setUpClass(cls): super().setUpClass() - # Discover real entity IDs to use in path substitution. + # Discover real entity IDs to use in path substitution. Each + # entity type may legitimately be empty in some fixture + # configurations (e.g. runtime_only mode without manifest-declared + # areas). Endpoints needing a type that was not discovered are + # skipped explicitly in the validation loop and reported in test + # output - no longer silently substituted with a placeholder that + # produces 404s the loop drops. cls._entity_map = {} for etype in ('areas', 'components', 'apps', 'functions'): resp = requests.get(f'{cls.BASE_URL}/{etype}', timeout=5) @@ -198,7 +231,10 @@ def tearDownClass(cls): def _fetch_spec(self): return self.poll_endpoint_until( - '/docs', lambda d: d if d.get('paths') else None + '/docs', + lambda d: d if ( + d.get('paths') and len(d['paths']) >= self.MIN_EXPECTED_PATHS + ) else None, ) def _validate_response(self, schema, body, schemas): @@ -217,18 +253,27 @@ def _resource_for_path(self, path): return 'test_id' def test_get_responses_match_declared_schema(self): - """Every 200 response must validate against its declared schema. + """GET 200 responses must validate against the declared schema. Iterates GET endpoints from /docs, fetches each, and validates bodies against the response schema declared for status 200. Schema drift (handler emits a field schema does not declare as required, or vice versa) raises an error. + Coverage limited to GET verbs; POST/PUT/DELETE request bodies and + non-200 response shapes are out of scope (tracked under issue + #338). Endpoints whose path placeholders need an entity type that + was not discovered (e.g. /functions/{function_id}/... when no + functions exist) are skipped explicitly with an entry in + ``skipped_for_missing_entity`` rather than substituted with a + bogus value that would silently 404 unvalidated. + @verifies REQ_INTEROP_002 """ spec = self._fetch_spec() schemas = spec.get('components', {}).get('schemas', {}) violations = [] + skipped_for_missing_entity = [] validated = 0 for path, path_item in spec.get('paths', {}).items(): @@ -247,9 +292,13 @@ def test_get_responses_match_declared_schema(self): if not schema: continue - uri = _substitute_path_params( - path, self._entity_map, self._resource_for_path(path) - ) + try: + uri = _substitute_path_params( + path, self._entity_map, self._resource_for_path(path) + ) + except _MissingEntityType as exc: + skipped_for_missing_entity.append(f'{path}: {exc}') + continue resp = requests.get(f'{self.BASE_URL}{uri}', timeout=10) # Non-200 responses fall outside this drift check (handler may @@ -278,6 +327,19 @@ def test_get_responses_match_declared_schema(self): self.assertGreater( validated, 0, 'No endpoints validated - test fixture broken?' ) + # Surface paths skipped due to missing optional entity types + # (e.g. /functions/{function_id}/... when no functions were + # declared). The previous fallback substituted 'test_entity', + # which produced 404s the validation loop silently dropped - + # hiding entire entity types from the test. Print is captured + # in test output so the gap is visible without failing CI when + # the fixture legitimately omits an optional type. + if skipped_for_missing_entity: + print( + f'[drift] {len(skipped_for_missing_entity)} endpoint(s) ' + f'unvalidated (missing optional entity type):\n' + + '\n'.join(f' - {s}' for s in skipped_for_missing_entity) + ) self.assertFalse( violations, f'{len(violations)} drift violation(s); validated {validated}:\n' @@ -350,6 +412,37 @@ def test_configurations_payload_uses_nested_x_medkit(self): self.assertEqual(param['x-medkit'].get('source'), 'temp_sensor') self.assertIn('node', param['x-medkit']) + def test_substitute_path_params_uses_discovered_entity_id(self): + """Helper resolves placeholders against the entity_map.""" + uri = _substitute_path_params( + '/areas/{area_id}/components', + {'areas': 'my_area', 'components': 'my_comp'}, + resource_id='resX', + ) + self.assertEqual(uri, '/areas/my_area/components') + + def test_substitute_path_params_resource_id_substituted(self): + """Non-entity placeholders use the resource_id argument.""" + uri = _substitute_path_params( + '/updates/{id}/status', {}, resource_id='pkg-7' + ) + self.assertEqual(uri, '/updates/pkg-7/status') + + def test_substitute_path_params_raises_on_missing_entity_type(self): + """Missing entity type must raise rather than substitute a fake. + + The previous fallback substituted 'test_entity' for unknown + types, producing 404s the validation loop silently dropped. + Now the helper forces callers to handle the missing type + explicitly (skip + report). + """ + with self.assertRaises(_MissingEntityType): + _substitute_path_params('/functions/{function_id}/data', {}) + with self.assertRaises(_MissingEntityType): + _substitute_path_params( + '/areas/{area_id}/components', {'components': 'c1'} + ) + @launch_testing.post_shutdown_test() class TestExitCodes(unittest.TestCase): From 48c2e457938a5b398560ae6e61a6755352ebd0ad Mon Sep 17 00:00:00 2001 From: Bartosz Burda Date: Wed, 29 Apr 2026 08:14:54 +0200 Subject: [PATCH 11/11] fix(fault_manager): explicit ctor for LockedSubscriptionGuard (C++20 aggregate) Rolling's gcc 14 / libstdc++ enforces the C++20 wording of [dcl.init.aggr] that disqualifies a class as an aggregate when it has any user-declared constructors - including ``= delete`` ones (rule tightened from C++17's "user-provided" to C++20's "user-declared"). Even with CMAKE_CXX_STANDARD=17 set, the libstdc++ headers/checks behave as C++20 on rolling, breaking ``LockedSubscriptionGuard{&mtx, sub, cg}`` because the only candidates are the deleted copy/move constructors. Add an explicit 3-arg constructor so brace-init resolves to direct-init regardless of aggregate semantics across distros (Jazzy/Humble C++17 and Rolling effective-C++20). Reproduces locally with ``g++ -std=c++20``. --- .../src/snapshot_capture.cpp | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/ros2_medkit_fault_manager/src/snapshot_capture.cpp b/src/ros2_medkit_fault_manager/src/snapshot_capture.cpp index 2344448ae..f833053df 100644 --- a/src/ros2_medkit_fault_manager/src/snapshot_capture.cpp +++ b/src/ros2_medkit_fault_manager/src/snapshot_capture.cpp @@ -40,6 +40,18 @@ struct LockedSubscriptionGuard { rclcpp::GenericSubscription::SharedPtr subscription; rclcpp::CallbackGroup::SharedPtr callback_group; + // Explicit constructor instead of relying on aggregate initialization: + // C++20 [dcl.init.aggr] disqualifies a class as an aggregate if it has + // any user-declared constructors, including ``= delete`` ones (the rule + // tightened from "user-provided" in C++17 to "user-declared" in C++20). + // Rolling's gcc 14 / libstdc++ enforces the C++20 wording even when + // CMAKE_CXX_STANDARD is 17, so brace-init ``LockedSubscriptionGuard{ + // &mtx, sub, cg}`` would fail to find a matching constructor on rolling. + // An explicit constructor sidesteps the aggregate-init rules entirely. + LockedSubscriptionGuard(std::mutex * m, rclcpp::GenericSubscription::SharedPtr s, rclcpp::CallbackGroup::SharedPtr cg) + : mtx(m), subscription(std::move(s)), callback_group(std::move(cg)) { + } + ~LockedSubscriptionGuard() { if (!subscription && !callback_group) { return;