From 5b6431a0e955a8c78eac636b8818658614afb79c Mon Sep 17 00:00:00 2001 From: Davide Faconti Date: Thu, 2 Jul 2026 12:59:20 +0200 Subject: [PATCH 1/2] Fix review findings: pause ref-count invariant, QoS adaptation, and more Correctness fixes from a full-application review, all covered by new unit tests (174 total, TSAN/ASAN clean): - BridgeServer: track per-topic middleware refs in the session (ref_held) and guard all subscription state transitions with cleanup_mutex_. Fixes three related bugs: unsubscribe-while-paused double-decrementing shared ref counts, subscribe-while-paused leaking a ref on resume, and the pause vs disconnect race destroying subscriptions other clients still use. - BridgeServer: resume now keeps subscriptions whose topic is temporarily missing from discovery (per the documented pause contract) and reports them via a new unavailable_topics field. - BridgeServer: clamp client-supplied max_rate_hz to [0.001, 1e6] Hz (previously UB int cast above ~2.1e6 Hz; rates below 1 mHz silently meant unlimited) and make bare-string re-subscribes preserve an existing rate limit instead of resetting it to unlimited. - BridgeServer: process_requests() now drains the pending queue (bounded) instead of handling one request per timer tick, removing the ~100 req/s cap and heartbeat starvation under bursts. - GenericSubscriptionManager: adapt subscription QoS to publishers (BEST_EFFORT if any publisher is best-effort, TRANSIENT_LOCAL if all are latched). RELIABLE-only subscriptions silently received nothing from sensor-data publishers. - ros2 main: use executor.spin() instead of a spin_some() busy-loop that pinned a CPU core. - WebSocketMiddleware: copy the server shared_ptr under state_mutex_ in the client callback (shutdown TOCTOU null deref); join the stop thread in the destructor instead of detaching (exit-time UB); make receive_request truly non-blocking (was parking callers 10 ms per empty poll). - MessageBuffer: TTL cleanup no longer purges the whole buffer when the wall clock steps backwards (unsigned underflow); clock is injectable for tests. - SchemaExtractor: bounded string fields (string<=N) no longer make the whole schema extraction fail; try_get_message_definition() distinguishes failure from legitimately empty definitions so std_msgs/msg/Empty topics can be subscribed. Docs: API.md documents rate clamping, bare-string semantics, actual rate-limit selection (first eligible message), and resume's unavailable_topics; CLAUDE.md's wire-format section now includes the 16-byte frame header and the stale test count is fixed. Co-Authored-By: Claude Fable 5 --- .gitignore | 7 + CLAUDE.md | 17 +- app/include/pj_bridge/bridge_server.hpp | 5 +- app/include/pj_bridge/message_buffer.hpp | 9 +- .../middleware/websocket_middleware.hpp | 5 +- app/include/pj_bridge/session_manager.hpp | 8 + .../pj_bridge/topic_source_interface.hpp | 5 +- app/src/bridge_server.cpp | 330 +++++++++------- app/src/message_buffer.cpp | 16 +- app/src/middleware/websocket_middleware.cpp | 53 ++- app/src/session_manager.cpp | 36 ++ docs/API.md | 13 +- .../generic_subscription_manager.hpp | 2 + .../pj_bridge_ros2/schema_extractor.hpp | 17 +- ros2/src/generic_subscription_manager.cpp | 37 +- ros2/src/main.cpp | 8 +- ros2/src/ros2_topic_source.cpp | 12 +- ros2/src/schema_extractor.cpp | 16 +- tests/unit/test_bridge_server.cpp | 368 ++++++++++++++++++ .../test_generic_subscription_manager.cpp | 36 ++ tests/unit/test_message_buffer.cpp | 44 +++ tests/unit/test_schema_extractor.cpp | 78 ++++ 22 files changed, 947 insertions(+), 175 deletions(-) diff --git a/.gitignore b/.gitignore index 9c59f5e..ce8fd53 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,10 @@ CMakeUserPresets.json /build/* /install/* /log/* + +# Sanitizer build/install dirs (in-repo colcon builds) +build_tsan/ +install_tsan/ +build_asan/ +install_asan/ +log/ diff --git a/CLAUDE.md b/CLAUDE.md index 740719a..c7285c9 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -141,8 +141,17 @@ BridgeServer does NOT own timers. The entry point (`main.cpp`) drives the event ### Message Serialization Format +Each binary frame starts with a fixed 16-byte header, followed by the +ZSTD-compressed message stream (see docs/API.md for the full layout): + ``` -For each message (streamed, no header): +Frame header (16 bytes, uncompressed): + - Magic "PJRB" (uint32_t LE, 0x42524A50) + - Message count (uint32_t LE) + - Uncompressed payload size (uint32_t LE) + - Flags (uint32_t LE) + +Then, for each message in the (compressed) payload: - Topic name length (uint16_t LE) - Topic name (N bytes UTF-8) - Timestamp (uint64_t ns since epoch, LE) @@ -188,7 +197,7 @@ For each message (streamed, no header): ## Testing -### Test Count: 154 unit tests across 10 test suites +### Test Count: 174 unit tests across 10 test suites ### Commands ```bash @@ -252,7 +261,7 @@ pj_bridge_fastdds --domains 0 1 --port 9090 --publish-rate 50 --session-timeout --- -**Last Updated**: 2026-02-26 +**Last Updated**: 2026-07-02 **Project Phase**: Unified multi-backend architecture -**Test Status**: 154 unit tests passing (all sanitizers clean) +**Test Status**: 174 unit tests passing (all sanitizers clean) **Executables**: `pj_bridge_ros2` (ROS2), `pj_bridge_rti` (RTI DDS, disabled), `pj_bridge_fastdds` (FastDDS) diff --git a/app/include/pj_bridge/bridge_server.hpp b/app/include/pj_bridge/bridge_server.hpp index 92c1eaa..9ac03dc 100644 --- a/app/include/pj_bridge/bridge_server.hpp +++ b/app/include/pj_bridge/bridge_server.hpp @@ -83,10 +83,10 @@ class BridgeServer { /** * @brief Process incoming API requests * - * Non-blocking call that checks for pending requests, + * Non-blocking call that drains pending requests (bounded per call), * processes them, and sends responses. * - * @return true if a request was processed, false if no requests pending + * @return true if at least one request was processed, false if none pending */ bool process_requests(); @@ -123,6 +123,7 @@ class BridgeServer { StatsSnapshot snapshot_and_reset_stats(); private: + void process_single_request(const std::vector& request_data, const std::string& client_id); std::string handle_get_topics(const std::string& client_id, const nlohmann::json& request); std::string handle_subscribe(const std::string& client_id, const nlohmann::json& request); std::string handle_unsubscribe(const std::string& client_id, const nlohmann::json& request); diff --git a/app/include/pj_bridge/message_buffer.hpp b/app/include/pj_bridge/message_buffer.hpp index 20bbc40..2ef2406 100644 --- a/app/include/pj_bridge/message_buffer.hpp +++ b/app/include/pj_bridge/message_buffer.hpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -52,8 +53,12 @@ class MessageBuffer { public: static constexpr uint64_t kDefaultMaxMessageAgeNs = 1'000'000'000; ///< 1 second + /// Clock source returning nanoseconds since epoch; injectable for testing. + using ClockFn = std::function; + /// @param max_message_age_ns TTL for buffered messages (default 1 second). - explicit MessageBuffer(uint64_t max_message_age_ns = kDefaultMaxMessageAgeNs); + /// @param clock_fn Clock override for tests (default: wall clock). + explicit MessageBuffer(uint64_t max_message_age_ns = kDefaultMaxMessageAgeNs, ClockFn clock_fn = {}); /// Add a message to the buffer for the given topic. /// Triggers cleanup of stale messages before inserting. @@ -77,7 +82,9 @@ class MessageBuffer { mutable std::mutex mutex_; std::unordered_map> topic_buffers_; uint64_t max_message_age_ns_; + ClockFn clock_fn_; + uint64_t now_ns() const; void cleanup_old_messages(); }; diff --git a/app/include/pj_bridge/middleware/websocket_middleware.hpp b/app/include/pj_bridge/middleware/websocket_middleware.hpp index 2c2cefa..5cf96a6 100644 --- a/app/include/pj_bridge/middleware/websocket_middleware.hpp +++ b/app/include/pj_bridge/middleware/websocket_middleware.hpp @@ -22,7 +22,6 @@ #include #include -#include #include #include #include @@ -65,7 +64,6 @@ class WebSocketMiddleware : public MiddlewareInterface { std::queue incoming_queue_; mutable std::mutex queue_mutex_; - std::condition_variable queue_cv_; std::unordered_map> clients_; mutable std::mutex clients_mutex_; @@ -75,8 +73,9 @@ class WebSocketMiddleware : public MiddlewareInterface { mutable std::mutex state_mutex_; bool initialized_; + /// Stop thread that exceeded the shutdown timeout; joined in the destructor. + std::thread pending_stop_thread_; - static constexpr int kReceiveTimeoutMs = 10; static constexpr int kShutdownTimeoutSeconds = 3; static constexpr size_t kMaxIncomingQueueSize = 1024; }; diff --git a/app/include/pj_bridge/session_manager.hpp b/app/include/pj_bridge/session_manager.hpp index 5d23681..6f70314 100644 --- a/app/include/pj_bridge/session_manager.hpp +++ b/app/include/pj_bridge/session_manager.hpp @@ -23,6 +23,7 @@ #include #include #include +#include #include namespace pj_bridge { @@ -30,6 +31,11 @@ namespace pj_bridge { struct Session { std::string client_id; std::unordered_map subscribed_topics; + /// Topics for which this session currently holds a middleware subscription + /// ref. Invariant: a ref is held iff the topic is in this set — pause + /// releases all refs, resume re-acquires them, and cleanup releases exactly + /// the held ones (never relying on the paused flag). + std::unordered_set ref_held_topics; std::chrono::steady_clock::time_point last_heartbeat; std::chrono::steady_clock::time_point created_at; bool paused{false}; @@ -53,6 +59,8 @@ class SessionManager { bool session_exists(const std::string& client_id) const; bool set_paused(const std::string& client_id, bool paused); bool is_paused(const std::string& client_id) const; + bool set_ref_held(const std::string& client_id, const std::string& topic, bool held); + std::unordered_set get_ref_held_topics(const std::string& client_id) const; private: std::unordered_map sessions_; diff --git a/app/include/pj_bridge/topic_source_interface.hpp b/app/include/pj_bridge/topic_source_interface.hpp index 684dd2c..3dff34a 100644 --- a/app/include/pj_bridge/topic_source_interface.hpp +++ b/app/include/pj_bridge/topic_source_interface.hpp @@ -50,8 +50,9 @@ class TopicSourceInterface { /// Return the full schema definition for a topic. /// @param topic_name fully-qualified topic name /// @return the schema text (e.g. concatenated .msg definitions for ROS2, - /// OMG IDL for RTI). Empty string if the schema cannot be resolved. - /// @throws std::exception on unrecoverable schema extraction errors. + /// OMG IDL for RTI). An empty string is a VALID schema — some types + /// have legitimately empty definitions (e.g. std_msgs/msg/Empty). + /// @throws std::exception when the schema cannot be resolved. virtual std::string get_schema(const std::string& topic_name) = 0; /// Return the encoding identifier for schemas produced by this source. diff --git a/app/src/bridge_server.cpp b/app/src/bridge_server.cpp index 6bff5b5..1846084 100644 --- a/app/src/bridge_server.cpp +++ b/app/src/bridge_server.cpp @@ -23,6 +23,8 @@ #include #include +#include +#include #include "pj_bridge/message_serializer.hpp" #include "pj_bridge/protocol_constants.hpp" @@ -31,6 +33,30 @@ using json = nlohmann::json; namespace pj_bridge { +namespace { + +// Bounds for client-supplied max_rate_hz. The publish path stores rates as +// integer milli-hertz in an int, so values above kMaxRateHz would overflow +// the cast (UB) and values below kMinRateHz would truncate to 0 == unlimited, +// the opposite of the client's intent. +constexpr double kMaxRateHz = 1e6; +constexpr double kMinRateHz = 0.001; + +double clamp_rate_hz(double rate_hz) { + if (rate_hz <= 0.0) { + return 0.0; // 0 (or negative) = no rate limit + } + if (rate_hz < kMinRateHz) { + return kMinRateHz; + } + if (rate_hz > kMaxRateHz) { + return kMaxRateHz; + } + return rate_hz; +} + +} // namespace + BridgeServer::BridgeServer( std::shared_ptr topic_source, std::shared_ptr subscription_manager, std::shared_ptr middleware, @@ -116,19 +142,28 @@ bool BridgeServer::process_requests() { return false; } - // Receive request (non-blocking with timeout) - std::vector request_data; - std::string client_id; - bool received = middleware_->receive_request(request_data, client_id); - - if (!received || request_data.empty()) { - return false; + // Drain all pending requests. A single-request-per-call policy driven by a + // 10 ms timer caps throughput at ~100 req/s and lets one chatty client + // delay everyone else's heartbeats. The cap bounds one tick's latency. + constexpr int kMaxRequestsPerCall = 256; + bool processed_any = false; + for (int i = 0; i < kMaxRequestsPerCall; i++) { + std::vector request_data; + std::string client_id; + if (!middleware_->receive_request(request_data, client_id) || request_data.empty()) { + break; + } + process_single_request(request_data, client_id); + processed_any = true; } + return processed_any; +} +void BridgeServer::process_single_request(const std::vector& request_data, const std::string& client_id) { std::string request(request_data.begin(), request_data.end()); if (client_id.empty()) { spdlog::warn("Received request with no client identity"); - return true; + return; } // Parse request JSON @@ -174,7 +209,6 @@ bool BridgeServer::process_requests() { spdlog::warn("Failed to send response to client '{}', cleaning up session", client_id); cleanup_session(client_id); } - return true; } std::string BridgeServer::handle_get_topics(const std::string& client_id, const nlohmann::json& request) { @@ -225,17 +259,16 @@ std::string BridgeServer::handle_subscribe(const std::string& client_id, const n // Parse requested topics — supports mixed array of strings and objects: // ["topic1", {"name": "topic2", "max_rate_hz": 10.0}] - std::unordered_map requested_topics; + // A bare string means "no rate specified" (nullopt), which must not + // overwrite a previously configured rate on re-subscribe. + std::unordered_map> requested_topics; for (const auto& topic : request["topics"]) { if (topic.is_string()) { - requested_topics[topic.get()] = 0.0; + requested_topics[topic.get()] = std::nullopt; } else if (topic.is_object() && topic.contains("name") && topic["name"].is_string()) { - double rate_hz = 0.0; + std::optional rate_hz; if (topic.contains("max_rate_hz") && topic["max_rate_hz"].is_number()) { - rate_hz = topic["max_rate_hz"].get(); - if (rate_hz < 0.0) { - rate_hz = 0.0; - } + rate_hz = clamp_rate_hz(topic["max_rate_hz"].get()); } requested_topics[topic["name"].get()] = rate_hz; } @@ -256,9 +289,12 @@ std::string BridgeServer::handle_subscribe(const std::string& client_id, const n topic_types[topic.name] = topic.type; } - // Subscribe to new topics — track successes and failures + // Validate topics and extract schemas BEFORE taking any refs, so a schema + // failure can never corrupt the reference count. json schemas = json::object(); json failures = json::array(); + std::vector> validated; // (name, type) + std::unordered_map extracted_schemas; for (const auto& topic_name : topics_to_add) { if (topic_types.find(topic_name) == topic_types.end()) { @@ -272,8 +308,8 @@ std::string BridgeServer::handle_subscribe(const std::string& client_id, const n std::string topic_type = topic_types[topic_name]; - // Get schema BEFORE subscribing to avoid corrupting the reference count - // if schema extraction fails after subscribe() increments it. + // get_schema throws on failure; an empty return is a legitimately empty + // definition (e.g. std_msgs/msg/Empty) and must not fail the subscribe. std::string schema; try { schema = topic_source_->get_schema(topic_name); @@ -286,46 +322,58 @@ std::string BridgeServer::handle_subscribe(const std::string& client_id, const n continue; } - if (schema.empty()) { - spdlog::error("Empty schema for topic '{}' (type: {})", topic_name, topic_type); - json failure; - failure["topic"] = topic_name; - failure["reason"] = "Schema extraction returned empty definition"; - failures.push_back(failure); - continue; - } + validated.emplace_back(topic_name, topic_type); + extracted_schemas[topic_name] = std::move(schema); + } - // Subscribe via subscription manager (ref-counted) - bool success = subscription_manager_->subscribe(topic_name, topic_type); - if (!success) { - spdlog::error("Failed to subscribe to topic '{}'", topic_name); - json failure; - failure["topic"] = topic_name; - failure["reason"] = "Subscription manager failed to create subscription"; - failures.push_back(failure); - continue; - } + // Acquire refs and record subscriptions atomically with respect to + // pause/resume and cleanup_session (which run on other threads). + // Invariant: a middleware ref is held iff set_ref_held(topic, true). + // Paused clients hold no refs — their topics are acquired on resume. + { + std::lock_guard lock(cleanup_mutex_); + const bool paused = session_manager_->is_paused(client_id); + + for (const auto& [topic_name, topic_type] : validated) { + bool ref_acquired = false; + if (!paused) { + if (!subscription_manager_->subscribe(topic_name, topic_type)) { + spdlog::error("Failed to subscribe to topic '{}'", topic_name); + json failure; + failure["topic"] = topic_name; + failure["reason"] = "Subscription manager failed to create subscription"; + failures.push_back(failure); + continue; + } + ref_acquired = true; + } - // Immediately add to session. If session is gone (client disconnected - // between subscribe and now), roll back the ref count. - if (!session_manager_->add_subscription(client_id, topic_name, requested_topics[topic_name])) { - spdlog::warn("Session gone for client '{}', rolling back subscribe for '{}'", client_id, topic_name); - subscription_manager_->unsubscribe(topic_name); - continue; - } + // Add to session. If the session is gone (client disconnected between + // request and now), roll back the ref. + const double rate = requested_topics[topic_name].value_or(0.0); + if (!session_manager_->add_subscription(client_id, topic_name, rate)) { + spdlog::warn("Session gone for client '{}', rolling back subscribe for '{}'", client_id, topic_name); + if (ref_acquired) { + subscription_manager_->unsubscribe(topic_name); + } + continue; + } + session_manager_->set_ref_held(client_id, topic_name, ref_acquired); - nlohmann::json schema_obj; - schema_obj["encoding"] = topic_source_->schema_encoding(); - schema_obj["definition"] = schema; - schemas[topic_name] = schema_obj; + nlohmann::json schema_obj; + schema_obj["encoding"] = topic_source_->schema_encoding(); + schema_obj["definition"] = extracted_schemas[topic_name]; + schemas[topic_name] = schema_obj; - spdlog::info("Client '{}' subscribed to topic '{}' (type: {})", client_id, topic_name, topic_type); - } + spdlog::info("Client '{}' subscribed to topic '{}' (type: {})", client_id, topic_name, topic_type); + } - // Update rates for topics that were already subscribed but may have a new rate - for (const auto& [topic, rate] : requested_topics) { - if (current_subs.find(topic) != current_subs.end()) { - session_manager_->add_subscription(client_id, topic, rate); + // Update rates for already-subscribed topics, only where a rate was + // explicitly given (bare strings must not reset an existing limit). + for (const auto& [topic, rate] : requested_topics) { + if (rate.has_value() && current_subs.find(topic) != current_subs.end()) { + session_manager_->add_subscription(client_id, topic, *rate); + } } } @@ -376,31 +424,38 @@ std::string BridgeServer::handle_unsubscribe(const std::string& client_id, const return create_error_response("INVALID_REQUEST", "Missing or invalid 'topics' array", request); } - auto current_subs = session_manager_->get_subscriptions(client_id); - std::vector removed; - for (const auto& topic_item : request["topics"]) { - std::string topic_name; - if (topic_item.is_string()) { - topic_name = topic_item.get(); - } else if (topic_item.is_object() && topic_item.contains("name") && topic_item["name"].is_string()) { - topic_name = topic_item["name"].get(); - } else { - continue; - } + { + std::lock_guard lock(cleanup_mutex_); + auto current_subs = session_manager_->get_subscriptions(client_id); + auto held_refs = session_manager_->get_ref_held_topics(client_id); + + for (const auto& topic_item : request["topics"]) { + std::string topic_name; + if (topic_item.is_string()) { + topic_name = topic_item.get(); + } else if (topic_item.is_object() && topic_item.contains("name") && topic_item["name"].is_string()) { + topic_name = topic_item["name"].get(); + } else { + continue; + } - if (current_subs.find(topic_name) != current_subs.end()) { - subscription_manager_->unsubscribe(topic_name); - current_subs.erase(topic_name); - removed.push_back(topic_name); + if (current_subs.find(topic_name) != current_subs.end()) { + // Release the middleware ref only if this session actually holds one + // (a paused client's refs were already released by pause). + if (held_refs.count(topic_name) > 0) { + subscription_manager_->unsubscribe(topic_name); + } + session_manager_->remove_subscription(client_id, topic_name); + current_subs.erase(topic_name); + removed.push_back(topic_name); - spdlog::info("Client '{}' unsubscribed from topic '{}'", client_id, topic_name); + spdlog::info("Client '{}' unsubscribed from topic '{}'", client_id, topic_name); + } } } - session_manager_->update_subscriptions(client_id, current_subs); - json response; response["status"] = "success"; response["removed"] = removed; @@ -434,25 +489,32 @@ std::string BridgeServer::handle_pause(const std::string& client_id, const nlohm session_manager_->update_heartbeat(client_id); - if (session_manager_->is_paused(client_id)) { - spdlog::debug("Client '{}' already paused", client_id); - json response; - response["status"] = "ok"; - response["paused"] = true; - inject_response_fields(response, request); - return response.dump(); - } + size_t released = 0; + { + std::lock_guard lock(cleanup_mutex_); + + if (session_manager_->is_paused(client_id)) { + spdlog::debug("Client '{}' already paused", client_id); + json response; + response["status"] = "ok"; + response["paused"] = true; + inject_response_fields(response, request); + return response.dump(); + } - session_manager_->set_paused(client_id, true); + session_manager_->set_paused(client_id, true); - // Decrement ref counts for all subscribed topics - auto subs = session_manager_->get_subscriptions(client_id); - for (const auto& [topic, rate] : subs) { - subscription_manager_->unsubscribe(topic); - spdlog::debug("Decremented ref count for topic '{}' (client '{}' paused)", topic, client_id); + // Release exactly the refs this session holds + auto held_refs = session_manager_->get_ref_held_topics(client_id); + for (const auto& topic : held_refs) { + subscription_manager_->unsubscribe(topic); + session_manager_->set_ref_held(client_id, topic, false); + spdlog::debug("Released ref for topic '{}' (client '{}' paused)", topic, client_id); + } + released = held_refs.size(); } - spdlog::info("Client '{}' paused ({} topics refs decremented)", client_id, subs.size()); + spdlog::info("Client '{}' paused ({} topic refs released)", client_id, released); json response; response["status"] = "ok"; @@ -469,55 +531,66 @@ std::string BridgeServer::handle_resume(const std::string& client_id, const nloh session_manager_->update_heartbeat(client_id); - if (!session_manager_->is_paused(client_id)) { - spdlog::debug("Client '{}' not paused", client_id); - json response; - response["status"] = "ok"; - response["paused"] = false; - inject_response_fields(response, request); - return response.dump(); - } - - session_manager_->set_paused(client_id, false); - - // Build topic name → type lookup for subscribe calls + // Build topic name → type lookup for subscribe calls (graph query — keep + // it outside the cleanup lock) auto available_topics = topic_source_->get_topics(); std::unordered_map topic_types; for (const auto& t : available_topics) { topic_types[t.name] = t.type; } - // Increment ref counts for all subscribed topics - auto subs = session_manager_->get_subscriptions(client_id); std::vector failed_topics; - for (const auto& [topic, rate] : subs) { - auto type_it = topic_types.find(topic); - if (type_it == topic_types.end()) { - spdlog::warn("Topic '{}' no longer exists, cannot re-subscribe for client '{}'", topic, client_id); - failed_topics.push_back(topic); - continue; - } - bool ok = subscription_manager_->subscribe(topic, type_it->second); - if (ok) { - spdlog::debug("Incremented ref count for topic '{}' (client '{}' resumed)", topic, client_id); - } else { - spdlog::warn("Topic '{}' subscription failed on resume for client '{}'", topic, client_id); - failed_topics.push_back(topic); + size_t resubscribed = 0; + { + std::lock_guard lock(cleanup_mutex_); + + if (!session_manager_->is_paused(client_id)) { + spdlog::debug("Client '{}' not paused", client_id); + json response; + response["status"] = "ok"; + response["paused"] = false; + inject_response_fields(response, request); + return response.dump(); } - } - // Remove failed topics from session to prevent ref-count underflow on disconnect - for (const auto& topic : failed_topics) { - session_manager_->remove_subscription(client_id, topic); + session_manager_->set_paused(client_id, false); + + // Re-acquire refs for all session topics not already held. Topics that + // are temporarily missing from discovery stay in the session (per the + // pause contract: subscriptions are preserved) with no ref held — they + // are re-acquired on a later resume once the publisher is back. + auto subs = session_manager_->get_subscriptions(client_id); + auto held_refs = session_manager_->get_ref_held_topics(client_id); + for (const auto& [topic, rate] : subs) { + if (held_refs.count(topic) > 0) { + continue; + } + auto type_it = topic_types.find(topic); + if (type_it == topic_types.end()) { + spdlog::warn("Topic '{}' currently unavailable, keeping subscription for client '{}'", topic, client_id); + failed_topics.push_back(topic); + continue; + } + if (subscription_manager_->subscribe(topic, type_it->second)) { + session_manager_->set_ref_held(client_id, topic, true); + resubscribed++; + spdlog::debug("Re-acquired ref for topic '{}' (client '{}' resumed)", topic, client_id); + } else { + spdlog::warn("Topic '{}' subscription failed on resume for client '{}'", topic, client_id); + failed_topics.push_back(topic); + } + } } spdlog::info( - "Client '{}' resumed ({} topics re-subscribed, {} failed)", client_id, subs.size() - failed_topics.size(), - failed_topics.size()); + "Client '{}' resumed ({} topics re-subscribed, {} unavailable)", client_id, resubscribed, failed_topics.size()); json response; response["status"] = "ok"; response["paused"] = false; + if (!failed_topics.empty()) { + response["unavailable_topics"] = failed_topics; + } inject_response_fields(response, request); return response.dump(); } @@ -560,15 +633,12 @@ void BridgeServer::cleanup_session(const std::string& client_id) { return; } - auto subscriptions = session_manager_->get_subscriptions(client_id); - bool was_paused = session_manager_->is_paused(client_id); - - // Unsubscribe from all topics (only if not paused — paused clients already decremented ref counts) - if (!was_paused) { - for (const auto& [topic, rate] : subscriptions) { - subscription_manager_->unsubscribe(topic); - spdlog::debug("Unsubscribed client '{}' from topic '{}'", client_id, topic); - } + // Release exactly the refs this session holds. Paused clients hold none; + // there is no separate paused check to race against. + auto held_refs = session_manager_->get_ref_held_topics(client_id); + for (const auto& topic : held_refs) { + subscription_manager_->unsubscribe(topic); + spdlog::debug("Unsubscribed client '{}' from topic '{}'", client_id, topic); } session_manager_->remove_session(client_id); @@ -578,7 +648,7 @@ void BridgeServer::cleanup_session(const std::string& client_id) { last_sent_times_.erase(client_id); } - spdlog::info("Cleaned up session for client '{}' ({} topics unsubscribed)", client_id, subscriptions.size()); + spdlog::info("Cleaned up session for client '{}' ({} topic refs released)", client_id, held_refs.size()); } size_t BridgeServer::get_active_session_count() const { diff --git a/app/src/message_buffer.cpp b/app/src/message_buffer.cpp index ce03343..653f80f 100644 --- a/app/src/message_buffer.cpp +++ b/app/src/message_buffer.cpp @@ -23,7 +23,12 @@ namespace pj_bridge { -MessageBuffer::MessageBuffer(uint64_t max_message_age_ns) : max_message_age_ns_(max_message_age_ns) {} +MessageBuffer::MessageBuffer(uint64_t max_message_age_ns, ClockFn clock_fn) + : max_message_age_ns_(max_message_age_ns), clock_fn_(std::move(clock_fn)) {} + +uint64_t MessageBuffer::now_ns() const { + return clock_fn_ ? clock_fn_() : get_current_time_ns(); +} void MessageBuffer::add_message( const std::string& topic_name, uint64_t timestamp_ns, std::shared_ptr> data) { @@ -33,7 +38,7 @@ void MessageBuffer::add_message( BufferedMessage msg; msg.timestamp_ns = timestamp_ns; - msg.received_at_ns = get_current_time_ns(); + msg.received_at_ns = now_ns(); msg.data = std::move(data); topic_buffers_[topic_name].push_back(std::move(msg)); @@ -62,12 +67,15 @@ size_t MessageBuffer::size() const { } void MessageBuffer::cleanup_old_messages() { - uint64_t current_time = get_current_time_ns(); + uint64_t current_time = now_ns(); for (auto& [topic, buffer] : topic_buffers_) { while (!buffer.empty()) { const auto& oldest_msg = buffer.front(); - if (current_time - oldest_msg.received_at_ns > max_message_age_ns_) { + // received_at_ns can be ahead of current_time after a backwards + // wall-clock step (NTP); unsigned subtraction would wrap and purge + // fresh messages, so only age messages the clock is actually past. + if (current_time > oldest_msg.received_at_ns && current_time - oldest_msg.received_at_ns > max_message_age_ns_) { buffer.pop_front(); } else { break; diff --git a/app/src/middleware/websocket_middleware.cpp b/app/src/middleware/websocket_middleware.cpp index d48f693..d8e325d 100644 --- a/app/src/middleware/websocket_middleware.cpp +++ b/app/src/middleware/websocket_middleware.cpp @@ -30,6 +30,18 @@ WebSocketMiddleware::WebSocketMiddleware() : initialized_(false) {} WebSocketMiddleware::~WebSocketMiddleware() { shutdown(); + + // Join any stop thread that outlived its shutdown timeout. Blocking here + // is deliberate: letting it run detached past main() races static + // destruction. + std::thread pending; + { + std::lock_guard lock(state_mutex_); + pending = std::move(pending_stop_thread_); + } + if (pending.joinable()) { + pending.join(); + } } tl::expected WebSocketMiddleware::initialize(uint16_t port) { @@ -44,11 +56,16 @@ tl::expected WebSocketMiddleware::initialize(uint16_t port) { server_->setOnClientMessageCallback([this]( std::shared_ptr connection_state, ix::WebSocket& web_socket, const ix::WebSocketMessagePtr& msg) { + // Copy the server pointer under the same lock as the initialized_ check: + // shutdown() concurrently moves server_ away, so touching the member + // after releasing the lock would dereference a null shared_ptr. + std::shared_ptr server; { std::lock_guard state_lock(state_mutex_); - if (!initialized_) { + if (!initialized_ || !server_) { return; } + server = server_; } std::string client_id = connection_state->getId(); @@ -56,7 +73,7 @@ tl::expected WebSocketMiddleware::initialize(uint16_t port) { if (msg->type == ix::WebSocketMessageType::Open) { { std::lock_guard clients_lock(clients_mutex_); - for (const auto& client : server_->getClients()) { + for (const auto& client : server->getClients()) { if (client.get() == &web_socket) { clients_[client_id] = client; break; @@ -104,7 +121,6 @@ tl::expected WebSocketMiddleware::initialize(uint16_t port) { } incoming_queue_.push(std::move(req)); } - queue_cv_.notify_one(); } } }); @@ -138,12 +154,10 @@ void WebSocketMiddleware::shutdown() { server_to_stop = std::move(server_); } - queue_cv_.notify_all(); - if (server_to_stop) { // Replace the message callback with a no-op before stopping. The original - // callback captures `this`, so it must not fire after the WebSocketMiddleware - // instance is destroyed (which can happen if the stop thread is detached on timeout). + // callback captures `this`, so it must not fire while the stop thread + // outlives this shutdown call (it is joined in the destructor on timeout). server_to_stop->setOnClientMessageCallback( [](std::shared_ptr, ix::WebSocket&, const ix::WebSocketMessagePtr&) {}); @@ -173,14 +187,19 @@ void WebSocketMiddleware::shutdown() { stop_thread.join(); spdlog::debug("[shutdown] Server stop completed (took {}ms)", elapsed_ms); } else { - // Timeout reached. Detach the thread and let it finish in the background. - // The thread captures server_to_stop by shared_ptr, so the server - // will be destroyed when stop() finishes and the thread exits. - // The message callback was already cleared above, so no use-after-free. + // Timeout reached. Keep the thread joinable and hand it to the + // destructor: a detached thread still running IXWebSocket teardown + // during static destruction (after main() returns) is undefined + // behavior. shutdown() itself stays bounded; only final destruction + // waits for a hung stop() to complete. spdlog::warn( - "[shutdown] Server stop timed out after {}s ({}ms elapsed), detaching shutdown thread", + "[shutdown] Server stop timed out after {}s ({}ms elapsed), deferring join to destructor", kShutdownTimeoutSeconds, elapsed_ms); - stop_thread.detach(); + std::lock_guard lock(state_mutex_); + if (pending_stop_thread_.joinable()) { + pending_stop_thread_.join(); + } + pending_stop_thread_ = std::move(stop_thread); } } @@ -204,10 +223,12 @@ bool WebSocketMiddleware::receive_request(std::vector& data, std::strin } } - std::unique_lock lock(queue_mutex_); + // Truly non-blocking: callers poll from timers/event loops, so waiting on + // the condition variable here would park the caller's thread (up to 10 ms + // per empty poll) and delay everything else it drives. + std::lock_guard lock(queue_mutex_); - if (!queue_cv_.wait_for( - lock, std::chrono::milliseconds(kReceiveTimeoutMs), [this]() { return !incoming_queue_.empty(); })) { + if (incoming_queue_.empty()) { return false; } diff --git a/app/src/session_manager.cpp b/app/src/session_manager.cpp index dc3f605..a8c7008 100644 --- a/app/src/session_manager.cpp +++ b/app/src/session_manager.cpp @@ -75,6 +75,14 @@ bool SessionManager::update_subscriptions( } it->second.subscribed_topics = topics; + // Refs can only be held for subscribed topics + for (auto held_it = it->second.ref_held_topics.begin(); held_it != it->second.ref_held_topics.end();) { + if (topics.find(*held_it) == topics.end()) { + held_it = it->second.ref_held_topics.erase(held_it); + } else { + ++held_it; + } + } return true; } @@ -98,9 +106,37 @@ bool SessionManager::remove_subscription(const std::string& client_id, const std return false; } + it->second.ref_held_topics.erase(topic); return it->second.subscribed_topics.erase(topic) > 0; } +bool SessionManager::set_ref_held(const std::string& client_id, const std::string& topic, bool held) { + std::lock_guard lock(mutex_); + + auto it = sessions_.find(client_id); + if (it == sessions_.end()) { + return false; + } + + if (held) { + it->second.ref_held_topics.insert(topic); + } else { + it->second.ref_held_topics.erase(topic); + } + return true; +} + +std::unordered_set SessionManager::get_ref_held_topics(const std::string& client_id) const { + std::lock_guard lock(mutex_); + + auto it = sessions_.find(client_id); + if (it == sessions_.end()) { + return {}; + } + + return it->second.ref_held_topics; +} + std::unordered_map SessionManager::get_subscriptions(const std::string& client_id) const { std::lock_guard lock(mutex_); diff --git a/docs/API.md b/docs/API.md index e193b78..ebfcc90 100644 --- a/docs/API.md +++ b/docs/API.md @@ -92,9 +92,11 @@ Discover available ROS2 topics. Subscribe to one or more topics. **Breaking change:** Subscribe now uses an additive model - it only adds topics without removing existing subscriptions. Use the `unsubscribe` command to remove topics. -Each topic in the array can be either a plain string (unlimited rate) or an object with a `max_rate_hz` field for per-topic rate limiting. Both formats can be mixed in the same request. +Each topic in the array can be either a plain string or an object with a `max_rate_hz` field for per-topic rate limiting. Both formats can be mixed in the same request. -When `max_rate_hz` is set, the server decimates messages for that topic, sending at most one message per rate interval (the latest available). A value of `0` or omitting the field means unlimited (all messages forwarded). +When `max_rate_hz` is set, the server decimates messages for that topic, sending at most one message per rate interval (the first eligible buffered message). An explicit value of `0` means unlimited (all messages forwarded). A plain string leaves the rate unspecified: new subscriptions default to unlimited, and re-subscribing to an already-subscribed topic with a plain string preserves its previously configured rate limit. + +Rates are clamped server-side to the representable range `[0.001, 1000000]` Hz (values below 0.001 are raised to 0.001; values above 1e6 are lowered to 1e6). The effective rate is echoed in `rate_limits`. **Request (string-only, backward compatible):** ```json @@ -172,7 +174,7 @@ Topics not currently subscribed are silently ignored. ## Pause / Resume -Pause stops binary frame delivery to the client. Subscriptions and rate limits are preserved. +Pause stops binary frame delivery to the client. Subscriptions and rate limits are preserved — including topics whose publisher disappears while paused: they stay subscribed and are re-acquired on a later resume once the publisher is back. Resume restarts binary frame delivery. **Pause Request:** @@ -195,6 +197,11 @@ Resume restarts binary frame delivery. {"status": "ok", "id": "r1", "protocol_version": 1, "paused": false} ``` +If some subscribed topics are not currently available (publisher down) or fail to re-subscribe at resume time, the response includes an `unavailable_topics` array listing them. These subscriptions are kept and re-acquired on a later resume: +```json +{"status": "ok", "id": "r1", "protocol_version": 1, "paused": false, "unavailable_topics": ["/camera/image"]} +``` + Both commands are idempotent. Smart ROS2 management: when all clients interested in a topic are paused, the ROS2 subscription is released. ## Heartbeat diff --git a/ros2/include/pj_bridge_ros2/generic_subscription_manager.hpp b/ros2/include/pj_bridge_ros2/generic_subscription_manager.hpp index 5df3bfb..8343555 100644 --- a/ros2/include/pj_bridge_ros2/generic_subscription_manager.hpp +++ b/ros2/include/pj_bridge_ros2/generic_subscription_manager.hpp @@ -56,6 +56,8 @@ class GenericSubscriptionManager { void unsubscribe_all(); private: + rclcpp::QoS adapt_qos(const std::string& topic_name) const; + struct SubscriptionInfo { std::shared_ptr subscription; size_t reference_count; diff --git a/ros2/include/pj_bridge_ros2/schema_extractor.hpp b/ros2/include/pj_bridge_ros2/schema_extractor.hpp index 1c8c778..5d21954 100644 --- a/ros2/include/pj_bridge_ros2/schema_extractor.hpp +++ b/ros2/include/pj_bridge_ros2/schema_extractor.hpp @@ -26,6 +26,8 @@ #include #include +#include "tl/expected.hpp" + namespace pj_bridge { /** @@ -44,10 +46,23 @@ class SchemaExtractor { * @brief Get message definition text * * @param message_type Full message type name (e.g., "std_msgs/msg/String") - * @return Message definition text, or empty string on failure + * @return Message definition text, or empty string on failure. + * Note: a legitimately empty definition (std_msgs/msg/Empty) is + * indistinguishable from failure here — use + * try_get_message_definition() to tell them apart. */ std::string get_message_definition(const std::string& message_type); + /** + * @brief Get message definition text, distinguishing failure from a + * legitimately empty definition + * + * @param message_type Full message type name (e.g., "std_msgs/msg/String") + * @return Definition text (possibly empty, e.g. std_msgs/msg/Empty) on + * success, or an error description on failure. + */ + tl::expected try_get_message_definition(const std::string& message_type); + private: bool parse_message_type(const std::string& message_type, std::string& library_name, std::string& type_name) const; diff --git a/ros2/src/generic_subscription_manager.cpp b/ros2/src/generic_subscription_manager.cpp index f79223f..9e27b73 100644 --- a/ros2/src/generic_subscription_manager.cpp +++ b/ros2/src/generic_subscription_manager.cpp @@ -25,6 +25,41 @@ namespace pj_bridge { GenericSubscriptionManager::GenericSubscriptionManager(rclcpp::Node::SharedPtr node) : node_(node) {} +rclcpp::QoS GenericSubscriptionManager::adapt_qos(const std::string& topic_name) const { + // Match the QoS the publishers actually offer (same policy as rosbag2): + // a RELIABLE subscription never matches a BEST_EFFORT publisher (sensor + // topics!) and a VOLATILE one misses latched (TRANSIENT_LOCAL) samples. + rclcpp::QoS qos(100); + + auto publishers = node_->get_publishers_info_by_topic(topic_name); + if (publishers.empty()) { + return qos; + } + + bool any_best_effort = false; + bool all_transient_local = true; + for (const auto& info : publishers) { + const auto& profile = info.qos_profile(); + if (profile.reliability() == rclcpp::ReliabilityPolicy::BestEffort) { + any_best_effort = true; + } + if (profile.durability() != rclcpp::DurabilityPolicy::TransientLocal) { + all_transient_local = false; + } + } + + // BEST_EFFORT matches both kinds of publisher; TRANSIENT_LOCAL only + // matches if every publisher offers it. + if (any_best_effort) { + qos.best_effort(); + } + if (all_transient_local) { + qos.transient_local(); + } + + return qos; +} + bool GenericSubscriptionManager::subscribe( const std::string& topic_name, const std::string& topic_type, Ros2MessageCallback callback) { std::lock_guard lock(mutex_); @@ -42,7 +77,7 @@ bool GenericSubscriptionManager::subscribe( callback(topic_name, msg, receive_time); }; - auto subscription = node_->create_generic_subscription(topic_name, topic_type, rclcpp::QoS(100), sub_callback); + auto subscription = node_->create_generic_subscription(topic_name, topic_type, adapt_qos(topic_name), sub_callback); subscriptions_[topic_name] = SubscriptionInfo{subscription, 1}; diff --git a/ros2/src/main.cpp b/ros2/src/main.cpp index a773368..6cc8607 100644 --- a/ros2/src/main.cpp +++ b/ros2/src/main.cpp @@ -81,13 +81,13 @@ int main(int argc, char** argv) { auto timeout_timer = node->create_wall_timer(1s, [&server]() { server.check_session_timeouts(); }); - // Spin until shutdown + // Spin until shutdown. spin() blocks waiting for work and returns when + // rclcpp::shutdown() runs (e.g. on SIGINT) — unlike spin_some() in a + // loop, which returns immediately when idle and busy-spins a full core. rclcpp::executors::SingleThreadedExecutor executor; executor.add_node(node); - while (rclcpp::ok()) { - executor.spin_some(100ms); - } + executor.spin(); // Graceful shutdown RCLCPP_INFO(node->get_logger(), "Shutting down bridge server..."); diff --git a/ros2/src/ros2_topic_source.cpp b/ros2/src/ros2_topic_source.cpp index bfde758..6b6753e 100644 --- a/ros2/src/ros2_topic_source.cpp +++ b/ros2/src/ros2_topic_source.cpp @@ -19,6 +19,8 @@ #include "pj_bridge_ros2/ros2_topic_source.hpp" +#include + #include "pj_bridge/protocol_constants.hpp" namespace pj_bridge { @@ -40,10 +42,16 @@ std::vector Ros2TopicSource::get_topics() { std::string Ros2TopicSource::get_schema(const std::string& topic_name) { auto it = topic_type_cache_.find(topic_name); if (it == topic_type_cache_.end()) { - return ""; + throw std::runtime_error("unknown topic '" + topic_name + "'"); + } + + auto definition = schema_extractor_.try_get_message_definition(it->second); + if (!definition.has_value()) { + throw std::runtime_error(definition.error()); } - return schema_extractor_.get_message_definition(it->second); + // May be legitimately empty (e.g. std_msgs/msg/Empty) + return *definition; } std::string Ros2TopicSource::schema_encoding() const { diff --git a/ros2/src/schema_extractor.cpp b/ros2/src/schema_extractor.cpp index 848d4e9..d920092 100644 --- a/ros2/src/schema_extractor.cpp +++ b/ros2/src/schema_extractor.cpp @@ -30,6 +30,11 @@ namespace pj_bridge { std::string SchemaExtractor::get_message_definition(const std::string& message_type) { + auto result = try_get_message_definition(message_type); + return result.has_value() ? *result : ""; +} + +tl::expected SchemaExtractor::try_get_message_definition(const std::string& message_type) { { std::shared_lock lock(cache_mutex_); auto it = definition_cache_.find(message_type); @@ -42,14 +47,14 @@ std::string SchemaExtractor::get_message_definition(const std::string& message_t std::string type_name; if (!parse_message_type(message_type, package_name, type_name)) { - return ""; + return tl::unexpected("invalid message type format: '" + message_type + "'"); } std::ostringstream result; std::unordered_set processed_types; if (!build_message_definition_recursive(package_name, type_name, result, processed_types, true)) { - return ""; + return tl::unexpected("failed to resolve message definition for '" + message_type + "'"); } std::string definition = result.str(); @@ -143,6 +148,13 @@ bool SchemaExtractor::build_message_definition_recursive( base_type = base_type.substr(0, bracket_pos); } + // Strip a bound suffix ("string<=256" is a bounded builtin string, + // not a nested message type) + size_t bound_pos = base_type.find('<'); + if (bound_pos != std::string::npos) { + base_type = base_type.substr(0, bound_pos); + } + static const std::unordered_set kBuiltinTypes = {"bool", "byte", "char", "float32", "float64", "int8", "uint8", "int16", "uint16", "int32", "uint32", "int64", "uint64", "string", "wstring"}; diff --git a/tests/unit/test_bridge_server.cpp b/tests/unit/test_bridge_server.cpp index 12f3d51..781eee0 100644 --- a/tests/unit/test_bridge_server.cpp +++ b/tests/unit/test_bridge_server.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -182,6 +183,9 @@ class MockTopicSource : public TopicSourceInterface { } std::string get_schema(const std::string& topic_name) override { + if (empty_schema_topics_.count(topic_name) > 0) { + return ""; // legitimately empty definition (e.g. std_msgs/msg/Empty) + } for (const auto& t : topics_) { if (t.name == topic_name) { return "uint32 seq\nstring data\n"; @@ -206,8 +210,14 @@ class MockTopicSource : public TopicSourceInterface { topics_.end()); } + // Test helper: mark a topic whose schema is legitimately empty + void set_empty_schema_topic(const std::string& name) { + empty_schema_topics_.insert(name); + } + private: std::vector topics_; + std::unordered_set empty_schema_topics_; }; // --------------------------------------------------------------------------- @@ -216,10 +226,12 @@ class MockTopicSource : public TopicSourceInterface { class MockSubscriptionManager : public SubscriptionManagerInterface { public: void set_message_callback(MessageCallback callback) override { + std::lock_guard lock(mutex_); callback_ = std::move(callback); } bool subscribe(const std::string& topic_name, const std::string& /*topic_type*/) override { + std::lock_guard lock(mutex_); if (known_topics_.find(topic_name) == known_topics_.end()) { return false; } @@ -228,6 +240,7 @@ class MockSubscriptionManager : public SubscriptionManagerInterface { } bool unsubscribe(const std::string& topic_name) override { + std::lock_guard lock(mutex_); auto it = ref_counts_.find(topic_name); if (it == ref_counts_.end() || it->second <= 0) { underflow_detected_ = true; @@ -241,33 +254,42 @@ class MockSubscriptionManager : public SubscriptionManagerInterface { } void unsubscribe_all() override { + std::lock_guard lock(mutex_); ref_counts_.clear(); } // Test helpers void add_known_topic(const std::string& topic_name) { + std::lock_guard lock(mutex_); known_topics_.insert(topic_name); } bool is_subscribed(const std::string& topic_name) const { + std::lock_guard lock(mutex_); auto it = ref_counts_.find(topic_name); return it != ref_counts_.end() && it->second > 0; } int ref_count(const std::string& topic_name) const { + std::lock_guard lock(mutex_); auto it = ref_counts_.find(topic_name); return (it != ref_counts_.end()) ? it->second : 0; } bool has_underflow() const { + std::lock_guard lock(mutex_); return underflow_detected_; } void reset_underflow() { + std::lock_guard lock(mutex_); underflow_detected_ = false; } private: + // Locked like the real backend managers so concurrency tests can drive + // BridgeServer from multiple threads without racing inside the mock. + mutable std::mutex mutex_; MessageCallback callback_; std::unordered_set known_topics_; std::unordered_map ref_counts_; @@ -1635,3 +1657,349 @@ TEST_F(BridgeServerTest, InitializeRejectsNegativePublishRate) { std::make_unique(mock_topic_source_, mock_sub_manager_, mock_, 19999, 10.0, -1.0); EXPECT_FALSE(neg_rate_server->initialize()); } + +// --------------------------------------------------------------------------- +// UnsubscribeWhilePausedDoesNotDoubleDecrement +// +// A paused client's subscription refs were already released by pause. +// Unsubscribing while paused must NOT decrement again, or a topic shared +// with another client would be destroyed under it. +// --------------------------------------------------------------------------- +TEST_F(BridgeServerTest, UnsubscribeWhilePausedDoesNotDoubleDecrement) { + ASSERT_TRUE(server_->initialize()); + mock_topic_source_->set_topics({{"/shared", "std_msgs/msg/String"}}); + mock_sub_manager_->add_known_topic("/shared"); + + json sub; + sub["command"] = "subscribe"; + sub["topics"] = json::array({"/shared"}); + + // Client A and client B both subscribe to /shared (ref = 2) + mock_->push_request("client_a", sub.dump()); + server_->process_requests(); + mock_->push_request("client_b", sub.dump()); + server_->process_requests(); + ASSERT_EQ(mock_sub_manager_->ref_count("/shared"), 2); + + // Client A pauses (releases its ref, ref = 1) + json pause_req; + pause_req["command"] = "pause"; + mock_->push_request("client_a", pause_req.dump()); + server_->process_requests(); + ASSERT_EQ(mock_sub_manager_->ref_count("/shared"), 1); + + // Client A unsubscribes while paused — its ref is already released, + // so the count must stay at 1 (client B's ref). + json unsub; + unsub["command"] = "unsubscribe"; + unsub["topics"] = json::array({"/shared"}); + mock_->push_request("client_a", unsub.dump()); + server_->process_requests(); + + EXPECT_EQ(mock_sub_manager_->ref_count("/shared"), 1); + EXPECT_TRUE(mock_sub_manager_->is_subscribed("/shared")); + EXPECT_FALSE(mock_sub_manager_->has_underflow()); + + // Client A resuming afterwards must not re-acquire the unsubscribed topic. + json resume_req; + resume_req["command"] = "resume"; + mock_->push_request("client_a", resume_req.dump()); + server_->process_requests(); + EXPECT_EQ(mock_sub_manager_->ref_count("/shared"), 1); +} + +// --------------------------------------------------------------------------- +// SubscribeWhilePausedDefersRefUntilResume +// +// A paused client holds zero subscription refs by invariant. Subscribing +// while paused must record the topic in the session but only acquire the +// middleware ref on resume — otherwise resume double-acquires and the ref +// leaks forever after disconnect. +// --------------------------------------------------------------------------- +TEST_F(BridgeServerTest, SubscribeWhilePausedDefersRefUntilResume) { + ASSERT_TRUE(server_->initialize()); + mock_topic_source_->set_topics({{"/x", "std_msgs/msg/String"}}); + mock_sub_manager_->add_known_topic("/x"); + + // Pause first (creates the session) + json pause_req; + pause_req["command"] = "pause"; + mock_->push_request("client_p", pause_req.dump()); + server_->process_requests(); + + // Subscribe while paused: session records the topic, no ref acquired yet + json sub; + sub["command"] = "subscribe"; + sub["topics"] = json::array({"/x"}); + mock_->push_request("client_p", sub.dump()); + server_->process_requests(); + EXPECT_EQ(mock_sub_manager_->ref_count("/x"), 0); + + // Resume: exactly one ref acquired + json resume_req; + resume_req["command"] = "resume"; + mock_->push_request("client_p", resume_req.dump()); + server_->process_requests(); + EXPECT_EQ(mock_sub_manager_->ref_count("/x"), 1); + + // Disconnect: the ref is fully released, nothing leaks + mock_->simulate_disconnect("client_p"); + EXPECT_EQ(mock_sub_manager_->ref_count("/x"), 0); + EXPECT_FALSE(mock_sub_manager_->has_underflow()); +} + +// --------------------------------------------------------------------------- +// PauseVsDisconnectRaceKeepsRefCountsConsistent +// +// cleanup_session runs on the middleware (WebSocket) thread while pause +// runs on the event-loop thread. If pause's state flip and ref release are +// not atomic with respect to cleanup, both paths release the same refs and +// the count underflows (destroying subscriptions other clients still use). +// --------------------------------------------------------------------------- +TEST_F(BridgeServerTest, PauseVsDisconnectRaceKeepsRefCountsConsistent) { + ASSERT_TRUE(server_->initialize()); + mock_topic_source_->set_topics({{"/r", "std_msgs/msg/String"}}); + mock_sub_manager_->add_known_topic("/r"); + + json sub; + sub["command"] = "subscribe"; + sub["topics"] = json::array({"/r"}); + json pause_req; + pause_req["command"] = "pause"; + + constexpr int kIterations = 2000; + for (int i = 0; i < kIterations; i++) { + const std::string client = "racer_" + std::to_string(i); + + mock_->push_request(client, sub.dump()); + server_->process_requests(); + ASSERT_EQ(mock_sub_manager_->ref_count("/r"), 1); + + // Pause on the "event loop" thread, disconnect on the "middleware" thread + mock_->push_request(client, pause_req.dump()); + std::thread event_loop([&]() { server_->process_requests(); }); + std::thread ws_thread([&]() { mock_->simulate_disconnect(client); }); + event_loop.join(); + ws_thread.join(); + + ASSERT_FALSE(mock_sub_manager_->has_underflow()) << "double ref release on iteration " << i; + ASSERT_EQ(mock_sub_manager_->ref_count("/r"), 0) << "leaked ref on iteration " << i; + + // A pause that lost the race may have recreated an empty session; drop it + // so the next iteration starts clean. + mock_->simulate_disconnect(client); + } +} + +// --------------------------------------------------------------------------- +// SubscribeClampsExcessiveRateLimit +// +// max_rate_hz is client-controlled. Values whose milli-hertz representation +// exceeds int range would be undefined behavior in the rate-limit math, so +// the server must clamp to a sane maximum (1e6 Hz) at parse time. +// --------------------------------------------------------------------------- +TEST_F(BridgeServerTest, SubscribeClampsExcessiveRateLimit) { + ASSERT_TRUE(server_->initialize()); + mock_topic_source_->set_topics({{"/t", "std_msgs/msg/String"}}); + mock_sub_manager_->add_known_topic("/t"); + + json sub; + sub["command"] = "subscribe"; + sub["topics"] = json::array({{{"name", "/t"}, {"max_rate_hz", 1e15}}}); + mock_->push_request("client_clamp_hi", sub.dump()); + server_->process_requests(); + + json response = mock_->pop_reply("client_clamp_hi"); + ASSERT_FALSE(response.is_discarded()); + ASSERT_TRUE(response.contains("rate_limits")); + ASSERT_TRUE(response["rate_limits"].contains("/t")); + EXPECT_LE(response["rate_limits"]["/t"].get(), 1e6); +} + +// --------------------------------------------------------------------------- +// SubscribeClampsTinyRateLimitUp +// +// Rates in (0, 0.001) truncate to zero milli-hertz, which the publish path +// treats as "unlimited" — the opposite of the client's intent. Such rates +// must be clamped UP to the smallest representable limit (0.001 Hz). +// --------------------------------------------------------------------------- +TEST_F(BridgeServerTest, SubscribeClampsTinyRateLimitUp) { + ASSERT_TRUE(server_->initialize()); + mock_topic_source_->set_topics({{"/t", "std_msgs/msg/String"}}); + mock_sub_manager_->add_known_topic("/t"); + + json sub; + sub["command"] = "subscribe"; + sub["topics"] = json::array({{{"name", "/t"}, {"max_rate_hz", 0.0005}}}); + mock_->push_request("client_clamp_lo", sub.dump()); + server_->process_requests(); + + json response = mock_->pop_reply("client_clamp_lo"); + ASSERT_FALSE(response.is_discarded()); + ASSERT_TRUE(response.contains("rate_limits")); + ASSERT_TRUE(response["rate_limits"].contains("/t")); + EXPECT_GE(response["rate_limits"]["/t"].get(), 0.001); +} + +// --------------------------------------------------------------------------- +// ResubscribeWithBareStringPreservesRateLimit +// +// A bare-string topic entry means "no rate specified", not "unlimited". +// Re-asserting a subscription with a plain string (a natural client pattern) +// must not silently wipe a previously configured rate limit. +// --------------------------------------------------------------------------- +TEST_F(BridgeServerTest, ResubscribeWithBareStringPreservesRateLimit) { + ASSERT_TRUE(server_->initialize()); + mock_topic_source_->set_topics({{"/scan", "sensor_msgs/msg/LaserScan"}}); + mock_sub_manager_->add_known_topic("/scan"); + + json sub_with_rate; + sub_with_rate["command"] = "subscribe"; + sub_with_rate["topics"] = json::array({{{"name", "/scan"}, {"max_rate_hz", 10.0}}}); + mock_->push_request("client_rate_keep", sub_with_rate.dump()); + server_->process_requests(); + mock_->pop_reply("client_rate_keep"); + + json sub_bare; + sub_bare["command"] = "subscribe"; + sub_bare["topics"] = json::array({"/scan"}); + mock_->push_request("client_rate_keep", sub_bare.dump()); + server_->process_requests(); + + json response = mock_->pop_reply("client_rate_keep"); + ASSERT_FALSE(response.is_discarded()); + ASSERT_TRUE(response.contains("rate_limits")) << "bare-string re-subscribe wiped the rate limit"; + ASSERT_TRUE(response["rate_limits"].contains("/scan")); + EXPECT_DOUBLE_EQ(response["rate_limits"]["/scan"].get(), 10.0); +} + +// --------------------------------------------------------------------------- +// ResubscribeWithExplicitZeroClearsRateLimit +// +// Companion to the bare-string test: an explicit max_rate_hz of 0 is the +// documented way to remove a rate limit, and must keep working. +// --------------------------------------------------------------------------- +TEST_F(BridgeServerTest, ResubscribeWithExplicitZeroClearsRateLimit) { + ASSERT_TRUE(server_->initialize()); + mock_topic_source_->set_topics({{"/scan", "sensor_msgs/msg/LaserScan"}}); + mock_sub_manager_->add_known_topic("/scan"); + + json sub_with_rate; + sub_with_rate["command"] = "subscribe"; + sub_with_rate["topics"] = json::array({{{"name", "/scan"}, {"max_rate_hz", 10.0}}}); + mock_->push_request("client_rate_clear", sub_with_rate.dump()); + server_->process_requests(); + mock_->pop_reply("client_rate_clear"); + + json sub_zero; + sub_zero["command"] = "subscribe"; + sub_zero["topics"] = json::array({{{"name", "/scan"}, {"max_rate_hz", 0.0}}}); + mock_->push_request("client_rate_clear", sub_zero.dump()); + server_->process_requests(); + + json response = mock_->pop_reply("client_rate_clear"); + ASSERT_FALSE(response.is_discarded()); + EXPECT_FALSE(response.contains("rate_limits") && response["rate_limits"].contains("/scan")) + << "explicit max_rate_hz=0 must clear the limit"; +} + +// --------------------------------------------------------------------------- +// ResumePreservesTopicsMissingFromDiscovery +// +// docs/API.md: pause "preserves subscriptions and rate limits". A publisher +// that is briefly down while the client resumes must not cost the client its +// subscription: the topic stays in the session (without a middleware ref) +// and is re-acquired on a later resume once the publisher is back. +// --------------------------------------------------------------------------- +TEST_F(BridgeServerTest, ResumePreservesTopicsMissingFromDiscovery) { + ASSERT_TRUE(server_->initialize()); + mock_topic_source_->set_topics({{"/gone", "std_msgs/msg/String"}}); + mock_sub_manager_->add_known_topic("/gone"); + + json sub; + sub["command"] = "subscribe"; + sub["topics"] = json::array({"/gone"}); + mock_->push_request("client_flaky", sub.dump()); + server_->process_requests(); + ASSERT_EQ(mock_sub_manager_->ref_count("/gone"), 1); + + json pause_req; + pause_req["command"] = "pause"; + json resume_req; + resume_req["command"] = "resume"; + + // Pause, then the publisher restarts: topic vanishes from discovery + mock_->push_request("client_flaky", pause_req.dump()); + server_->process_requests(); + ASSERT_EQ(mock_sub_manager_->ref_count("/gone"), 0); + mock_topic_source_->remove_topic("/gone"); + + // Resume while the topic is missing: no ref, no underflow, but the + // subscription must survive in the session + mock_->push_request("client_flaky", resume_req.dump()); + server_->process_requests(); + EXPECT_EQ(mock_sub_manager_->ref_count("/gone"), 0); + EXPECT_FALSE(mock_sub_manager_->has_underflow()); + + // Publisher comes back; a later pause/resume cycle re-acquires the topic + mock_topic_source_->set_topics({{"/gone", "std_msgs/msg/String"}}); + mock_->push_request("client_flaky", pause_req.dump()); + server_->process_requests(); + mock_->push_request("client_flaky", resume_req.dump()); + server_->process_requests(); + EXPECT_EQ(mock_sub_manager_->ref_count("/gone"), 1) << "subscription was dropped while the publisher was away"; + + // Disconnect releases exactly the refs that are held + mock_->simulate_disconnect("client_flaky"); + EXPECT_EQ(mock_sub_manager_->ref_count("/gone"), 0); + EXPECT_FALSE(mock_sub_manager_->has_underflow()); +} + +// --------------------------------------------------------------------------- +// ProcessRequestsDrainsAllPendingRequests +// +// process_requests() is driven by a 10 ms timer, so handling a single +// request per invocation caps command throughput at ~100 req/s and lets a +// burst (or one chatty client) starve other clients' heartbeats into false +// session timeouts. One invocation must drain the pending queue. +// --------------------------------------------------------------------------- +TEST_F(BridgeServerTest, ProcessRequestsDrainsAllPendingRequests) { + ASSERT_TRUE(server_->initialize()); + + json hb; + hb["command"] = "heartbeat"; + for (int i = 0; i < 5; i++) { + mock_->push_request("drain_client_" + std::to_string(i), hb.dump()); + } + + server_->process_requests(); + + EXPECT_EQ(server_->get_active_session_count(), 5u) << "one process_requests call must handle all queued requests"; +} + +// --------------------------------------------------------------------------- +// SubscribeAcceptsEmptySchemaDefinition +// +// Some types have legitimately empty definitions (std_msgs/msg/Empty is a +// 0-byte .msg). The topic source signals extraction FAILURE by throwing; +// an empty string is a valid schema and must not fail the subscription. +// --------------------------------------------------------------------------- +TEST_F(BridgeServerTest, SubscribeAcceptsEmptySchemaDefinition) { + ASSERT_TRUE(server_->initialize()); + mock_topic_source_->set_topics({{"/trigger", "std_msgs/msg/Empty"}}); + mock_topic_source_->set_empty_schema_topic("/trigger"); + mock_sub_manager_->add_known_topic("/trigger"); + + json sub; + sub["command"] = "subscribe"; + sub["topics"] = json::array({"/trigger"}); + mock_->push_request("client_empty_schema", sub.dump()); + server_->process_requests(); + + json response = mock_->pop_reply("client_empty_schema"); + ASSERT_FALSE(response.is_discarded()); + EXPECT_EQ(response["status"], "success") << response.dump(); + ASSERT_TRUE(response["schemas"].contains("/trigger")); + EXPECT_EQ(response["schemas"]["/trigger"]["definition"], ""); + EXPECT_EQ(mock_sub_manager_->ref_count("/trigger"), 1); +} diff --git a/tests/unit/test_generic_subscription_manager.cpp b/tests/unit/test_generic_subscription_manager.cpp index 78c04c7..46c40e6 100644 --- a/tests/unit/test_generic_subscription_manager.cpp +++ b/tests/unit/test_generic_subscription_manager.cpp @@ -19,7 +19,11 @@ #include +#include +#include #include +#include +#include #include "pj_bridge_ros2/generic_subscription_manager.hpp" @@ -211,3 +215,35 @@ TEST_F(GenericSubscriptionManagerTest, MultipleTopicsIndependent) { EXPECT_FALSE(manager_->is_subscribed("/topic1")); EXPECT_TRUE(manager_->is_subscribed("/topic2")); } + +// --------------------------------------------------------------------------- +// QoS adaptation +// +// Sensor publishers (cameras, lidars, IMUs) typically offer BEST_EFFORT +// reliability. A RELIABLE subscription is QoS-incompatible with them in +// ROS2 — it matches nothing and silently receives zero messages — so the +// manager must adapt its subscription QoS to what publishers actually offer. +// --------------------------------------------------------------------------- +TEST_F(GenericSubscriptionManagerTest, AdaptsQosToBestEffortPublisher) { + auto publisher = node_->create_publisher("/qos_be_topic", rclcpp::SensorDataQoS()); + + std::atomic received{0}; + auto callback = [&received](const std::string&, const std::shared_ptr&, uint64_t) { + received++; + }; + + ASSERT_TRUE(manager_->subscribe("/qos_be_topic", "sensor_msgs/msg/Imu", callback)); + + rclcpp::executors::SingleThreadedExecutor executor; + executor.add_node(node_); + + sensor_msgs::msg::Imu msg; + auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); + while (received.load() == 0 && std::chrono::steady_clock::now() < deadline) { + publisher->publish(msg); + executor.spin_some(); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + EXPECT_GT(received.load(), 0) << "RELIABLE subscription never matches a BEST_EFFORT publisher"; +} diff --git a/tests/unit/test_message_buffer.cpp b/tests/unit/test_message_buffer.cpp index c5b5015..31ab4a4 100644 --- a/tests/unit/test_message_buffer.cpp +++ b/tests/unit/test_message_buffer.cpp @@ -353,3 +353,47 @@ TEST_F(MessageBufferTest, MoveMessagesOverwritesExistingOutput) { // Buffer should be empty after move EXPECT_EQ(buffer_.size(), 0); } + +// --------------------------------------------------------------------------- +// BackwardsClockStepDoesNotPurgeFreshMessages +// +// TTL cleanup uses the wall clock, which is not monotonic: an NTP step can +// move it backwards between add_message() calls. Fresh messages whose +// received_at_ns is ahead of the stepped-back clock must NOT be evicted +// (unsigned subtraction would wrap to a huge age and purge everything). +// --------------------------------------------------------------------------- +TEST_F(MessageBufferTest, BackwardsClockStepDoesNotPurgeFreshMessages) { + uint64_t fake_now = 10'000'000'000ULL; // t = 10 s + MessageBuffer buffer(MessageBuffer::kDefaultMaxMessageAgeNs, [&fake_now]() { return fake_now; }); + + buffer.add_message("topic1", 1, create_test_data({1})); + ASSERT_EQ(buffer.size(), 1); + + // NTP steps the clock back 500 ms — well within the 1 s TTL + fake_now -= 500'000'000ULL; + buffer.add_message("topic1", 2, create_test_data({2})); + + // Both messages are fresh; neither may be purged + EXPECT_EQ(buffer.size(), 2); +} + +// --------------------------------------------------------------------------- +// ExpiredMessagesStillPurgedWithInjectedClock +// +// Companion to the backwards-step test: normal forward aging must still +// evict messages older than the TTL. +// --------------------------------------------------------------------------- +TEST_F(MessageBufferTest, ExpiredMessagesStillPurgedWithInjectedClock) { + uint64_t fake_now = 10'000'000'000ULL; + MessageBuffer buffer(MessageBuffer::kDefaultMaxMessageAgeNs, [&fake_now]() { return fake_now; }); + + buffer.add_message("topic1", 1, create_test_data({1})); + ASSERT_EQ(buffer.size(), 1); + + // Advance past the 1 s TTL + fake_now += 1'500'000'000ULL; + buffer.add_message("topic1", 2, create_test_data({2})); + + // The first message is stale and must be gone; the new one remains + EXPECT_EQ(buffer.size(), 1); +} diff --git a/tests/unit/test_schema_extractor.cpp b/tests/unit/test_schema_extractor.cpp index 461e320..24ef7d6 100644 --- a/tests/unit/test_schema_extractor.cpp +++ b/tests/unit/test_schema_extractor.cpp @@ -19,6 +19,8 @@ #include +#include +#include #include #include @@ -258,3 +260,79 @@ TEST_F(SchemaExtractorTest, ImuSchemaContainsExpectedFields) { EXPECT_NE(actual_no_comments.find("MSG: std_msgs/Header"), std::string::npos); EXPECT_NE(actual_no_comments.find("MSG: builtin_interfaces/Time"), std::string::npos); } + +// --------------------------------------------------------------------------- +// Bounded-string handling +// +// "string<=256 name" is a bounded string — a builtin type with a bound +// suffix. It must not be mistaken for a nested message type (which makes the +// whole schema extraction fail and returns ""). Uses a fixture package +// registered via AMENT_PREFIX_PATH. +// --------------------------------------------------------------------------- +class SchemaExtractorFixturePackageTest : public ::testing::Test { + protected: + void SetUp() override { + const char* old_path = getenv("AMENT_PREFIX_PATH"); + old_ament_prefix_path_ = old_path ? old_path : ""; + + fixture_prefix_ = std::filesystem::temp_directory_path() / "pj_bridge_schema_fixture"; + std::filesystem::remove_all(fixture_prefix_); + + // Minimal ament index entry: marker file + share//msg/.msg + std::filesystem::create_directories(fixture_prefix_ / "share/ament_index/resource_index/packages"); + std::ofstream(fixture_prefix_ / "share/ament_index/resource_index/packages/pj_fixture_msgs").close(); + std::filesystem::create_directories(fixture_prefix_ / "share/pj_fixture_msgs/msg"); + std::ofstream msg(fixture_prefix_ / "share/pj_fixture_msgs/msg/BoundedField.msg"); + msg << "string<=256 name\n"; + msg << "string<=10[<=5] tags\n"; + msg << "uint32 id\n"; + msg.close(); + + std::string new_path = fixture_prefix_.string(); + if (!old_ament_prefix_path_.empty()) { + new_path += ":" + old_ament_prefix_path_; + } + setenv("AMENT_PREFIX_PATH", new_path.c_str(), 1); + + extractor_ = std::make_unique(); + } + + void TearDown() override { + setenv("AMENT_PREFIX_PATH", old_ament_prefix_path_.c_str(), 1); + std::filesystem::remove_all(fixture_prefix_); + extractor_.reset(); + } + + std::filesystem::path fixture_prefix_; + std::string old_ament_prefix_path_; + std::unique_ptr extractor_; +}; + +TEST_F(SchemaExtractorFixturePackageTest, BoundedStringFieldDoesNotEmptySchema) { + std::string schema = extractor_->get_message_definition("pj_fixture_msgs/msg/BoundedField"); + + ASSERT_FALSE(schema.empty()) << "bounded string field made the whole schema extraction fail"; + EXPECT_NE(schema.find("string<=256 name"), std::string::npos); + EXPECT_NE(schema.find("uint32 id"), std::string::npos); + // The bounded string must NOT have been expanded as a nested type + EXPECT_EQ(schema.find("MSG: pj_fixture_msgs/string<=256"), std::string::npos); +} + +// --------------------------------------------------------------------------- +// Empty definition vs extraction failure +// +// std_msgs/msg/Empty has a legitimately empty definition (0-byte .msg file). +// The extractor must distinguish that from a failed extraction so that +// callers can accept the topic instead of rejecting it as an error. +// --------------------------------------------------------------------------- +TEST_F(SchemaExtractorTest, EmptyMessageDefinitionDistinguishedFromFailure) { + auto ok = extractor_->try_get_message_definition("std_msgs/msg/Empty"); + ASSERT_TRUE(ok.has_value()) << "std_msgs/msg/Empty is a valid type: " << ok.error(); + EXPECT_TRUE(ok->empty()); + + auto bad = extractor_->try_get_message_definition("definitely_not_a_pkg/msg/Nope"); + EXPECT_FALSE(bad.has_value()); + + auto malformed = extractor_->try_get_message_definition("not-a-type"); + EXPECT_FALSE(malformed.has_value()); +} From eec7fca02c6f9de6ebacd52e894fdfa593418251 Mon Sep 17 00:00:00 2001 From: Davide Faconti Date: Thu, 2 Jul 2026 13:17:17 +0200 Subject: [PATCH 2/2] Make large-message stripping opt-in (default: full data forwarded) Stripping uint8[] data from Image/PointCloud2/LaserScan/OccupancyGrid messages was previously on by default, so clients received metadata-only messages unless the operator knew to disable it. Flip the default: messages are forwarded intact, and stripping is enabled explicitly with strip_large_messages:=true for low-bandwidth deployments. - ros2 main: strip_large_messages parameter default true -> false - Ros2SubscriptionManager: constructor default flipped to match - New test suite covering both defaults-include-data and opt-in-strips - README/CLAUDE.md updated to document the opt-in semantics Co-Authored-By: Claude Fable 5 --- CLAUDE.md | 6 +- CMakeLists.txt | 1 + README.md | 4 +- .../ros2_subscription_manager.hpp | 2 +- ros2/src/main.cpp | 2 +- tests/unit/test_ros2_subscription_manager.cpp | 106 ++++++++++++++++++ 6 files changed, 114 insertions(+), 7 deletions(-) create mode 100644 tests/unit/test_ros2_subscription_manager.cpp diff --git a/CLAUDE.md b/CLAUDE.md index c7285c9..b741737 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -197,7 +197,7 @@ Then, for each message in the (compressed) payload: ## Testing -### Test Count: 174 unit tests across 10 test suites +### Test Count: 176 unit tests across 11 test suites ### Commands ```bash @@ -232,7 +232,7 @@ pre-commit run -a port: 9090 # WebSocket port publish_rate: 50.0 # Hz session_timeout: 10.0 # seconds -strip_large_messages: true # Strip Image/PointCloud2/etc data fields +strip_large_messages: false # Opt-in: strip Image/PointCloud2/etc data fields ``` ### RTI (via CLI flags): @@ -263,5 +263,5 @@ pj_bridge_fastdds --domains 0 1 --port 9090 --publish-rate 50 --session-timeout **Last Updated**: 2026-07-02 **Project Phase**: Unified multi-backend architecture -**Test Status**: 174 unit tests passing (all sanitizers clean) +**Test Status**: 176 unit tests passing (all sanitizers clean) **Executables**: `pj_bridge_ros2` (ROS2), `pj_bridge_rti` (RTI DDS, disabled), `pj_bridge_fastdds` (FastDDS) diff --git a/CMakeLists.txt b/CMakeLists.txt index 7a207ef..1fc0143 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -249,6 +249,7 @@ if(BUILD_TESTING AND ament_cmake_FOUND) tests/unit/test_topic_discovery.cpp tests/unit/test_schema_extractor.cpp tests/unit/test_generic_subscription_manager.cpp + tests/unit/test_ros2_subscription_manager.cpp tests/unit/test_message_stripper.cpp ) diff --git a/README.md b/README.md index f7dfd17..752f8a6 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ independently. - **High Performance**: 50 Hz message aggregation with ZSTD compression. The original message timestamp is preserved, and less bandwidth is used. - **Multi-Client Support**: Multiple clients can connect simultaneously with shared subscriptions - **Runtime Schema Discovery**: Automatic extraction of message schemas from installed ROS2 packages on the server side. -- **Large Message Stripping**: Automatic stripping of large array fields (Image, PointCloud2, LaserScan, OccupancyGrid) to reduce bandwidth while preserving metadata +- **Large Message Stripping** (opt-in): Optional stripping of large array fields (Image, PointCloud2, LaserScan, OccupancyGrid) to reduce bandwidth while preserving metadata. Disabled by default — full message data is forwarded; enable with `strip_large_messages:=true` for low-bandwidth links ## CI Status @@ -38,7 +38,7 @@ independently. | `port` | int | 9090 | WebSocket server port | | `publish_rate` | double | 50.0 | Aggregation publish rate in Hz | | `session_timeout` | double | 10.0 | Client timeout duration in seconds | -| `strip_large_messages` | bool | true | Strip large arrays from Image, PointCloud2, LaserScan, OccupancyGrid messages | +| `strip_large_messages` | bool | false | Opt-in: strip large arrays from Image, PointCloud2, LaserScan, OccupancyGrid messages | ## Just "Download and Run" diff --git a/ros2/include/pj_bridge_ros2/ros2_subscription_manager.hpp b/ros2/include/pj_bridge_ros2/ros2_subscription_manager.hpp index 510cbcf..d3f6f95 100644 --- a/ros2/include/pj_bridge_ros2/ros2_subscription_manager.hpp +++ b/ros2/include/pj_bridge_ros2/ros2_subscription_manager.hpp @@ -37,7 +37,7 @@ namespace pj_bridge { */ class Ros2SubscriptionManager : public SubscriptionManagerInterface { public: - explicit Ros2SubscriptionManager(rclcpp::Node::SharedPtr node, bool strip_large_messages = true); + explicit Ros2SubscriptionManager(rclcpp::Node::SharedPtr node, bool strip_large_messages = false); Ros2SubscriptionManager(const Ros2SubscriptionManager&) = delete; Ros2SubscriptionManager& operator=(const Ros2SubscriptionManager&) = delete; diff --git a/ros2/src/main.cpp b/ros2/src/main.cpp index 6cc8607..53900db 100644 --- a/ros2/src/main.cpp +++ b/ros2/src/main.cpp @@ -39,7 +39,7 @@ int main(int argc, char** argv) { node->declare_parameter("port", 9090); node->declare_parameter("publish_rate", 50.0); node->declare_parameter("session_timeout", 10.0); - node->declare_parameter("strip_large_messages", true); + node->declare_parameter("strip_large_messages", false); int port = node->get_parameter("port").as_int(); double publish_rate = node->get_parameter("publish_rate").as_double(); diff --git a/tests/unit/test_ros2_subscription_manager.cpp b/tests/unit/test_ros2_subscription_manager.cpp new file mode 100644 index 0000000..ace2f2d --- /dev/null +++ b/tests/unit/test_ros2_subscription_manager.cpp @@ -0,0 +1,106 @@ +/* + * Copyright (C) 2026 Davide Faconti + * + * This file is part of pj_bridge. + * + * pj_bridge is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * pj_bridge is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with pj_bridge. If not, see . + */ + +#include + +#include +#include + +#include +#include +#include + +#include "pj_bridge_ros2/ros2_subscription_manager.hpp" + +using namespace pj_bridge; + +namespace { + +// Publish an Image with a large data payload through the manager and return +// the size of the serialized message delivered to the bridge callback. +size_t roundtrip_image_bytes(bool strip_large_messages, bool use_default_config) { + auto node = std::make_shared( + use_default_config ? "test_strip_default" : (strip_large_messages ? "test_strip_on" : "test_strip_off")); + auto manager = use_default_config ? std::make_shared(node) + : std::make_shared(node, strip_large_messages); + + std::atomic received_size{0}; + manager->set_message_callback( + [&received_size](const std::string&, std::shared_ptr> data, uint64_t) { + received_size = data->size(); + }); + + const std::string topic = "/strip_test_image_" + std::string(node->get_name()); + auto publisher = node->create_publisher(topic, rclcpp::QoS(10)); + EXPECT_TRUE(manager->subscribe(topic, "sensor_msgs/msg/Image")); + + sensor_msgs::msg::Image img; + img.width = 100; + img.height = 100; + img.step = 300; + img.encoding = "rgb8"; + img.data.assign(static_cast(img.height) * img.step, 0xAB); + + rclcpp::executors::SingleThreadedExecutor executor; + executor.add_node(node); + + auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); + while (received_size.load() == 0 && std::chrono::steady_clock::now() < deadline) { + publisher->publish(img); + executor.spin_some(); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + manager->unsubscribe_all(); + return received_size.load(); +} + +constexpr size_t kImagePayloadBytes = 100 * 300; + +} // namespace + +class Ros2SubscriptionManagerTest : public ::testing::Test { + protected: + void SetUp() override { + rclcpp::init(0, nullptr); + } + + void TearDown() override { + rclcpp::shutdown(); + } +}; + +// --------------------------------------------------------------------------- +// Stripping is opt-in: by default, large data fields are forwarded intact. +// --------------------------------------------------------------------------- +TEST_F(Ros2SubscriptionManagerTest, DataFieldsIncludedByDefault) { + size_t received = roundtrip_image_bytes(false, /*use_default_config=*/true); + ASSERT_GT(received, 0u) << "no message received"; + EXPECT_GE(received, kImagePayloadBytes) << "image data was stripped despite default (opt-in) configuration"; +} + +// --------------------------------------------------------------------------- +// Opting in still strips: with strip_large_messages=true the payload is +// removed and only the metadata remains. +// --------------------------------------------------------------------------- +TEST_F(Ros2SubscriptionManagerTest, OptInStrippingRemovesData) { + size_t received = roundtrip_image_bytes(/*strip_large_messages=*/true, /*use_default_config=*/false); + ASSERT_GT(received, 0u) << "no message received"; + EXPECT_LT(received, kImagePayloadBytes) << "opt-in stripping did not remove the data payload"; +}