Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion camerad/astrocam.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1941,7 +1941,7 @@ namespace AstroCam {
std::string timestring;
timespec timenow;
double mjd0, mjd1, mjd;
double airmass0=NAN, airmass1=NAN, airmass=NAN;
double airmass0, airmass1, airmass;

if ( !interface.in_readout() ) {
logwrite( function, "sending command to stop clocks!" );
Expand Down Expand Up @@ -2618,6 +2618,14 @@ namespace AstroCam {
interface.status.can_expose.store(true);
interface.publish_status();
logwrite( function, "ready for next exposure" );
// Republish periodically so a subscriber that missed the single-fire
// transition (e.g. ZMQ reconnect gap) recovers without manual intervention.
std::thread( [&interface]() {
for ( int i = 0; i < 5 && interface.status.can_expose.load(); ++i ) {
std::this_thread::sleep_for( std::chrono::seconds(2) );
if ( interface.status.can_expose.load() ) interface.publish_status(true);
}
}).detach();
}

return;
Expand Down
92 changes: 81 additions & 11 deletions common/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ const std::string JEOF = "EOF\n"; ///< used to terminate JSON messa
const std::string TELEMREQUEST = "sendtelem"; ///< common daemon command used to request telemetry
const std::string SNAPSHOT = "snapshot"; ///< common daemon command forces publish of telemetry

const std::string CID_PREFIX = "#cid:"; ///< correlation ID marker for inter-daemon commands
constexpr size_t CID_HEX_LEN = 8; ///< number of hex chars in a correlation ID
constexpr size_t CID_HEADER_LEN = 5 + CID_HEX_LEN + 1; ///< total prefix length: "#cid:" + 8 hex + 1 space

constexpr bool EXT = true; ///< constant for use_extension arg of Common::Header::add_key()
constexpr bool PRI = !EXT; ///< constant for use_extension arg of Common::Header::add_key()

Expand Down Expand Up @@ -66,7 +70,7 @@ namespace Common {
private:
zmqpp::context &_context;
zmqpp::socket _socket;
mutable std::mutex _publish_mtx;
zmqpp::poller _poller; ///< persistent poller — avoids per-call reconstruction
Mode _mode; ///< publisher or subscriber?
std::string _topic; ///< publisher topic
std::vector<std::string> _topics; ///< list of subscriber topics
Expand All @@ -78,17 +82,23 @@ namespace Common {
PubSub( zmqpp::context &context, Mode mode )
: _context(context),
_socket(context, (mode==Mode::PUB ? zmqpp::socket_type::publish : zmqpp::socket_type::subscribe)),
_mode(mode) { }
_mode(mode)
{
_socket.set( zmqpp::socket_option::linger, 0 );
_socket.set( zmqpp::socket_option::send_high_water_mark, 0 );
_socket.set( zmqpp::socket_option::receive_high_water_mark, 0 );
_poller.add( _socket, zmqpp::poller::poll_in );
}

~PubSub() { _socket.close(); }

/**
* @brief publishers bind to a socket endpoint (not for brokers)
* @brief poll for a waiting message
* @param[in] timeout_ms poll timeout in milliseconds (default 100, 0 = non-blocking)
* @return true if at least one message is ready to receive
*/
bool has_message() {
zmqpp::poller poller;
poller.add(_socket, zmqpp::poller::poll_in);
return ( poller.poll(100) > 0 );
bool has_message( int timeout_ms = 100 ) {
return ( _poller.poll(timeout_ms) > 0 );
}

/**
Expand Down Expand Up @@ -180,7 +190,6 @@ namespace Common {
if ( _mode != Mode::PUB ) {
throw std::runtime_error( "(Common::PubSub::publish) not a publisher" );
}
std::lock_guard<std::mutex> lock( _publish_mtx );
zmqpp::message message_zmq;
// Publish to either class default _topic or topic specified as
// optional arg.
Expand Down Expand Up @@ -242,6 +251,7 @@ namespace Common {
+" with default topic " +iface.publisher_topic );
iface.publisher = std::make_unique<Common::PubSub>( context, Common::PubSub::Mode::PUB );
iface.publisher->connect_to_broker( iface.publisher_address, iface.publisher_topic );
std::this_thread::sleep_for( std::chrono::milliseconds(100) ); // publisher slow-joiner settle
}
catch ( const zmqpp::zmq_internal_exception &e ) {
logwrite( function, "ERROR initializing message handler: "+std::string(e.what()) );
Expand Down Expand Up @@ -292,13 +302,17 @@ namespace Common {

BoolState thread_running( iface.is_subscriber_thread_running );

// listen for published messages and handle them
// listen for published messages and handle them.
// drain all pending messages per poll wake-up to avoid per-message 100ms stalls
// when several daemons publish simultaneously (e.g. after request_snapshot()).
//
while ( iface.should_subscriber_thread_run ) {
try {
if ( iface.subscriber->has_message() ) {
auto [topic,payload] = iface.subscriber->receive();
process_incoming_message(iface, topic, payload);
do {
auto [topic,payload] = iface.subscriber->receive();
process_incoming_message(iface, topic, payload);
} while ( iface.subscriber->has_message(0) );
}
}
catch ( const std::exception &e ) {
Expand Down Expand Up @@ -424,6 +438,26 @@ namespace Common {

void collect_telemetry(const std::pair<std::string,int> &provider, std::string &retstring);


/***** Common::extract_correlation_id ***************************************/
/**
* @brief detect and strip a correlation ID prefix from an inter-daemon message
* @details The wire format for a tagged message is:
* "#cid:HHHHHHHH <payload>"
* where HHHHHHHH is exactly CID_HEX_LEN lowercase hex digits and is
* followed by a single space. Untagged messages (e.g. CLI users)
* are left untouched and the function returns false.
* @param[in] input full message as received from the wire
* @param[out] id_out extracted ID on success; unchanged otherwise
* @param[out] payload_out message with the prefix stripped on success; copy of input otherwise
* @return true if a well-formed correlation ID prefix was found, false otherwise
*
*/
bool extract_correlation_id( const std::string &input,
std::string &id_out,
std::string &payload_out );


/***** Common::extract_telemetry_value **************************************/
/**
* @brief extract a correctly typed value from a JSON message using a specific key
Expand Down Expand Up @@ -1189,6 +1223,42 @@ namespace Common {
/**************** Common::Queue *********************************************/


/***** Common::CorrIdCache **************************************************/
/**
* @class CorrIdCache
* @brief bounded TTL cache of recent inter-daemon replies, keyed by correlation ID
* @details Used by daemons to make command retries idempotent: when a tagged
* command is received whose correlation ID matches a cached entry,
* the previously-computed reply is replayed verbatim and the
* underlying handler is NOT re-invoked. Entries expire after
* TTL_SECONDS or when the cache reaches MAX_ENTRIES, whichever
* comes first. The cache is intentionally small and bounded; it
* guards only against same-second retries by DaemonClient::send.
*
* The stored reply is the daemon's bare retstring as it would have
* been written to the wire WITHOUT any correlation ID prefix; the
* caller is responsible for prepending the prefix on replay.
*
*/
class CorrIdCache {
public:
static constexpr size_t MAX_ENTRIES = 64; ///< max cached replies per server
static constexpr int TTL_SECONDS = 60; ///< per-entry lifetime in seconds

bool lookup( const std::string &id, std::string &reply_out );
void insert( const std::string &id, const std::string &reply );

private:
struct Entry {
std::string reply;
std::chrono::steady_clock::time_point expires;
};
std::map<std::string, Entry> cache;
std::mutex mtx;
};
/**************** Common::CorrIdCache ***************************************/


/***** Common::DaemonClient *********************************************************/
/**
* @class DaemonClient
Expand Down
11 changes: 11 additions & 0 deletions messaged/messaged.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,17 @@ void runbroker() {
void* xsub_socket = zmq_socket(context, ZMQ_XSUB);
void* xpub_socket = zmq_socket(context, ZMQ_XPUB);

// unlimited queues: prevent silent drops when any daemon is slow to drain.
// LINGER=0: broker exits cleanly without blocking on pending messages.
//
int zero = 0;
zmq_setsockopt( xsub_socket, ZMQ_SNDHWM, &zero, sizeof(zero) );
zmq_setsockopt( xsub_socket, ZMQ_RCVHWM, &zero, sizeof(zero) );
zmq_setsockopt( xsub_socket, ZMQ_LINGER, &zero, sizeof(zero) );
zmq_setsockopt( xpub_socket, ZMQ_SNDHWM, &zero, sizeof(zero) );
zmq_setsockopt( xpub_socket, ZMQ_RCVHWM, &zero, sizeof(zero) );
zmq_setsockopt( xpub_socket, ZMQ_LINGER, &zero, sizeof(zero) );

// bind the sockets
//
zmq_bind(xsub_socket, "tcp://127.0.0.1:5555");
Expand Down
Loading
Loading