Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,10 @@ CMakeUserPresets.json
/build/*
/install/*
/log/*

# Sanitizer build/install dirs (in-repo colcon builds)
build_tsan/
install_tsan/
build_asan/
install_asan/
log/
19 changes: 14 additions & 5 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,17 @@ BridgeServer does NOT own timers. The entry point (`main.cpp`) drives the event

### Message Serialization Format

Each binary frame starts with a fixed 16-byte header, followed by the
ZSTD-compressed message stream (see docs/API.md for the full layout):

```
For each message (streamed, no header):
Frame header (16 bytes, uncompressed):
- Magic "PJRB" (uint32_t LE, 0x42524A50)
- Message count (uint32_t LE)
- Uncompressed payload size (uint32_t LE)
- Flags (uint32_t LE)

Then, for each message in the (compressed) payload:
- Topic name length (uint16_t LE)
- Topic name (N bytes UTF-8)
- Timestamp (uint64_t ns since epoch, LE)
Expand Down Expand Up @@ -188,7 +197,7 @@ For each message (streamed, no header):

## Testing

### Test Count: 154 unit tests across 10 test suites
### Test Count: 176 unit tests across 11 test suites

### Commands
```bash
Expand Down Expand Up @@ -223,7 +232,7 @@ pre-commit run -a
port: 9090 # WebSocket port
publish_rate: 50.0 # Hz
session_timeout: 10.0 # seconds
strip_large_messages: true # Strip Image/PointCloud2/etc data fields
strip_large_messages: false # Opt-in: strip Image/PointCloud2/etc data fields
```

### RTI (via CLI flags):
Expand Down Expand Up @@ -252,7 +261,7 @@ pj_bridge_fastdds --domains 0 1 --port 9090 --publish-rate 50 --session-timeout

---

**Last Updated**: 2026-02-26
**Last Updated**: 2026-07-02
**Project Phase**: Unified multi-backend architecture
**Test Status**: 154 unit tests passing (all sanitizers clean)
**Test Status**: 176 unit tests passing (all sanitizers clean)
**Executables**: `pj_bridge_ros2` (ROS2), `pj_bridge_rti` (RTI DDS, disabled), `pj_bridge_fastdds` (FastDDS)
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ if(BUILD_TESTING AND ament_cmake_FOUND)
tests/unit/test_topic_discovery.cpp
tests/unit/test_schema_extractor.cpp
tests/unit/test_generic_subscription_manager.cpp
tests/unit/test_ros2_subscription_manager.cpp
tests/unit/test_message_stripper.cpp
)

Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ independently.
- **High Performance**: 50 Hz message aggregation with ZSTD compression. The original message timestamp is preserved, and less bandwidth is used.
- **Multi-Client Support**: Multiple clients can connect simultaneously with shared subscriptions
- **Runtime Schema Discovery**: Automatic extraction of message schemas from installed ROS2 packages on the server side.
- **Large Message Stripping**: Automatic stripping of large array fields (Image, PointCloud2, LaserScan, OccupancyGrid) to reduce bandwidth while preserving metadata
- **Large Message Stripping** (opt-in): Optional stripping of large array fields (Image, PointCloud2, LaserScan, OccupancyGrid) to reduce bandwidth while preserving metadata. Disabled by default — full message data is forwarded; enable with `strip_large_messages:=true` for low-bandwidth links

## CI Status

Expand All @@ -38,7 +38,7 @@ independently.
| `port` | int | 9090 | WebSocket server port |
| `publish_rate` | double | 50.0 | Aggregation publish rate in Hz |
| `session_timeout` | double | 10.0 | Client timeout duration in seconds |
| `strip_large_messages` | bool | true | Strip large arrays from Image, PointCloud2, LaserScan, OccupancyGrid messages |
| `strip_large_messages` | bool | false | Opt-in: strip large arrays from Image, PointCloud2, LaserScan, OccupancyGrid messages |

## Just "Download and Run"

Expand Down
5 changes: 3 additions & 2 deletions app/include/pj_bridge/bridge_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ class BridgeServer {
/**
* @brief Process incoming API requests
*
* Non-blocking call that checks for pending requests,
* Non-blocking call that drains pending requests (bounded per call),
* processes them, and sends responses.
*
* @return true if a request was processed, false if no requests pending
* @return true if at least one request was processed, false if none pending
*/
bool process_requests();

Expand Down Expand Up @@ -123,6 +123,7 @@ class BridgeServer {
StatsSnapshot snapshot_and_reset_stats();

private:
void process_single_request(const std::vector<uint8_t>& request_data, const std::string& client_id);
std::string handle_get_topics(const std::string& client_id, const nlohmann::json& request);
std::string handle_subscribe(const std::string& client_id, const nlohmann::json& request);
std::string handle_unsubscribe(const std::string& client_id, const nlohmann::json& request);
Expand Down
9 changes: 8 additions & 1 deletion app/include/pj_bridge/message_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <chrono>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
Expand Down Expand Up @@ -52,8 +53,12 @@ class MessageBuffer {
public:
static constexpr uint64_t kDefaultMaxMessageAgeNs = 1'000'000'000; ///< 1 second

/// Clock source returning nanoseconds since epoch; injectable for testing.
using ClockFn = std::function<uint64_t()>;

/// @param max_message_age_ns TTL for buffered messages (default 1 second).
explicit MessageBuffer(uint64_t max_message_age_ns = kDefaultMaxMessageAgeNs);
/// @param clock_fn Clock override for tests (default: wall clock).
explicit MessageBuffer(uint64_t max_message_age_ns = kDefaultMaxMessageAgeNs, ClockFn clock_fn = {});

/// Add a message to the buffer for the given topic.
/// Triggers cleanup of stale messages before inserting.
Expand All @@ -77,7 +82,9 @@ class MessageBuffer {
mutable std::mutex mutex_;
std::unordered_map<std::string, std::deque<BufferedMessage>> topic_buffers_;
uint64_t max_message_age_ns_;
ClockFn clock_fn_;

uint64_t now_ns() const;
void cleanup_old_messages();
};

Expand Down
5 changes: 2 additions & 3 deletions app/include/pj_bridge/middleware/websocket_middleware.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include <ixwebsocket/IXWebSocketServer.h>

#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
Expand Down Expand Up @@ -65,7 +64,6 @@ class WebSocketMiddleware : public MiddlewareInterface {

std::queue<IncomingRequest> incoming_queue_;
mutable std::mutex queue_mutex_;
std::condition_variable queue_cv_;

std::unordered_map<std::string, std::shared_ptr<ix::WebSocket>> clients_;
mutable std::mutex clients_mutex_;
Expand All @@ -75,8 +73,9 @@ class WebSocketMiddleware : public MiddlewareInterface {

mutable std::mutex state_mutex_;
bool initialized_;
/// Stop thread that exceeded the shutdown timeout; joined in the destructor.
std::thread pending_stop_thread_;

static constexpr int kReceiveTimeoutMs = 10;
static constexpr int kShutdownTimeoutSeconds = 3;
static constexpr size_t kMaxIncomingQueueSize = 1024;
};
Expand Down
8 changes: 8 additions & 0 deletions app/include/pj_bridge/session_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,19 @@
#include <mutex>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

namespace pj_bridge {

struct Session {
std::string client_id;
std::unordered_map<std::string, double> subscribed_topics;
/// Topics for which this session currently holds a middleware subscription
/// ref. Invariant: a ref is held iff the topic is in this set — pause
/// releases all refs, resume re-acquires them, and cleanup releases exactly
/// the held ones (never relying on the paused flag).
std::unordered_set<std::string> ref_held_topics;
std::chrono::steady_clock::time_point last_heartbeat;
std::chrono::steady_clock::time_point created_at;
bool paused{false};
Expand All @@ -53,6 +59,8 @@ class SessionManager {
bool session_exists(const std::string& client_id) const;
bool set_paused(const std::string& client_id, bool paused);
bool is_paused(const std::string& client_id) const;
bool set_ref_held(const std::string& client_id, const std::string& topic, bool held);
std::unordered_set<std::string> get_ref_held_topics(const std::string& client_id) const;

private:
std::unordered_map<std::string, Session> sessions_;
Expand Down
5 changes: 3 additions & 2 deletions app/include/pj_bridge/topic_source_interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ class TopicSourceInterface {
/// Return the full schema definition for a topic.
/// @param topic_name fully-qualified topic name
/// @return the schema text (e.g. concatenated .msg definitions for ROS2,
/// OMG IDL for RTI). Empty string if the schema cannot be resolved.
/// @throws std::exception on unrecoverable schema extraction errors.
/// OMG IDL for RTI). An empty string is a VALID schema — some types
/// have legitimately empty definitions (e.g. std_msgs/msg/Empty).
/// @throws std::exception when the schema cannot be resolved.
virtual std::string get_schema(const std::string& topic_name) = 0;

/// Return the encoding identifier for schemas produced by this source.
Expand Down
Loading
Loading