diff --git a/acamd/acam_server.cpp b/acamd/acam_server.cpp index 571207b2..082cf6b5 100644 --- a/acamd/acam_server.cpp +++ b/acamd/acam_server.cpp @@ -592,6 +592,31 @@ namespace Acam { buf.erase(std::remove(buf.begin(), buf.end(), '\r' ), buf.end()); buf.erase(std::remove(buf.begin(), buf.end(), '\n' ), buf.end()); + // Detect and strip an optional correlation ID prefix. Inter-daemon clients + // tag every command with "#cid:HHHHHHHH " so stale or out-of-order replies + // can be rejected by the client. CLI users send no prefix and corr_id is + // left empty; the server then echoes no prefix on reply. + // + std::string corr_id; + { + std::string payload; + Common::extract_correlation_id( buf, corr_id, payload ); + buf = std::move( payload ); + } + + // Replay a cached reply if this command's ID matches a recent one. + // This makes DaemonClient retries idempotent: the underlying handler + // is invoked at most once per correlation ID within the cache TTL. + // + if ( !corr_id.empty() ) { + std::string cached_reply; + if ( this->corr_cache.lookup( corr_id, cached_reply ) ) { + std::string out = CID_PREFIX + corr_id + " " + cached_reply; + if ( sock.Write( out ) < 0 ) connection_open=false; + continue; + } + } + if (buf.empty()) {sock.Write("\n"); continue;} // acknowledge empty command so client doesn't time out try { @@ -964,6 +989,21 @@ namespace Acam { logwrite( function, message.str() ); } + // Cache the bare reply (without prefix) so retries with the same + // correlation ID can be replayed without re-running the handler. + // + if ( !corr_id.empty() ) { + this->corr_cache.insert( corr_id, retstring ); + } + + // Echo the correlation ID back to inter-daemon clients so they can + // verify the reply belongs to the command they just sent. CLI users + // sent no prefix, so corr_id is empty and nothing is prepended. + // + if ( !corr_id.empty() ) { + retstring = CID_PREFIX + corr_id + " " + retstring; + } + if ( sock.Write( retstring ) < 0 ) connection_open=false; } diff --git a/acamd/acam_server.h b/acamd/acam_server.h index 1f328638..d3210b5e 100644 --- a/acamd/acam_server.h +++ b/acamd/acam_server.h @@ -92,6 +92,7 @@ namespace Acam { std::string asyncgroup; ///< asynchronous multicast group std::atomic cmd_num; ///< keep a running tally of number of commands received by acamd + Common::CorrIdCache corr_cache; ///< dedup cache for tagged inter-daemon commands std::atomic threads_active; ///< number of blocking threads that exist Config config; ///< create a Config object for reading the configuration file diff --git a/calibd/calib_server.cpp b/calibd/calib_server.cpp index 416510fc..73460ae7 100644 --- a/calibd/calib_server.cpp +++ b/calibd/calib_server.cpp @@ -476,6 +476,31 @@ namespace Calib { buf.erase(std::remove(buf.begin(), buf.end(), '\r' ), buf.end()); buf.erase(std::remove(buf.begin(), buf.end(), '\n' ), buf.end()); + // Detect and strip an optional correlation ID prefix. Inter-daemon clients + // tag every command with "#cid:HHHHHHHH " so stale or out-of-order replies + // can be rejected by the client. CLI users send no prefix and corr_id is + // left empty; the server then echoes no prefix on reply. + // + std::string corr_id; + { + std::string payload; + Common::extract_correlation_id( buf, corr_id, payload ); + buf = std::move( payload ); + } + + // Replay a cached reply if this command's ID matches a recent one. + // This makes DaemonClient retries idempotent: the underlying handler + // is invoked at most once per correlation ID within the cache TTL. + // + if ( !corr_id.empty() ) { + std::string cached_reply; + if ( this->corr_cache.lookup( corr_id, cached_reply ) ) { + std::string out = CID_PREFIX + corr_id + " " + cached_reply; + if ( sock.Write( out ) < 0 ) connection_open=false; + continue; + } + } + if (buf.empty()) {sock.Write("\n"); continue;} // acknowledge empty command so client doesn't time out try { @@ -649,6 +674,21 @@ namespace Calib { logwrite( function, message.str() ); } + // Cache the bare reply (without prefix) so retries with the same + // correlation ID can be replayed without re-running the handler. + // + if ( !corr_id.empty() ) { + this->corr_cache.insert( corr_id, retstring ); + } + + // Echo the correlation ID back to inter-daemon clients so they can + // verify the reply belongs to the command they just sent. CLI users + // sent no prefix, so corr_id is empty and nothing is prepended. + // + if ( !corr_id.empty() ) { + retstring = CID_PREFIX + corr_id + " " + retstring; + } + if ( sock.Write( retstring ) < 0 ) connection_open=false; } diff --git a/calibd/calib_server.h b/calibd/calib_server.h index 7f0cda61..9c9d9cb2 100644 --- a/calibd/calib_server.h +++ b/calibd/calib_server.h @@ -81,6 +81,7 @@ namespace Calib { int blocking_socket; std::atomic cmd_num; + Common::CorrIdCache corr_cache; ///< dedup cache for tagged inter-daemon commands Config config; diff --git a/camerad/camerad.cpp b/camerad/camerad.cpp index efc60e02..a026a5bc 100644 --- a/camerad/camerad.cpp +++ b/camerad/camerad.cpp @@ -432,6 +432,31 @@ void doit(Network::TcpSocket &sock) { sbuf.erase(std::remove(sbuf.begin(), sbuf.end(), '\r' ), sbuf.end()); sbuf.erase(std::remove(sbuf.begin(), sbuf.end(), '\n' ), sbuf.end()); + // Detect and strip an optional correlation ID prefix. Inter-daemon clients + // tag every command with "#cid:HHHHHHHH " so stale or out-of-order replies + // can be rejected by the client. CLI users send no prefix and corr_id is + // left empty; the server then echoes no prefix on reply. + // + std::string corr_id; + { + std::string payload; + Common::extract_correlation_id( sbuf, corr_id, payload ); + sbuf = std::move( payload ); + } + + // Replay a cached reply if this command's ID matches a recent one. + // This makes DaemonClient retries idempotent: the underlying handler + // is invoked at most once per correlation ID within the cache TTL. + // + if ( !corr_id.empty() ) { + std::string cached_reply; + if ( server.corr_cache.lookup( corr_id, cached_reply ) ) { + std::string out = CID_PREFIX + corr_id + " " + cached_reply; + if ( sock.Write( out ) < 0 ) connection_open=false; + continue; + } + } + if ( sbuf.empty() ) sbuf="help"; // no command automatically displays help try { @@ -829,6 +854,21 @@ void doit(Network::TcpSocket &sock) { logwrite( function, message.str() ); } + // Cache the bare reply (without prefix) so retries with the same + // correlation ID can be replayed without re-running the handler. + // + if ( !corr_id.empty() ) { + server.corr_cache.insert( corr_id, retstring ); + } + + // Echo the correlation ID back to inter-daemon clients so they can + // verify the reply belongs to the command they just sent. CLI users + // sent no prefix, so corr_id is empty and nothing is prepended. + // + if ( !corr_id.empty() ) { + retstring = CID_PREFIX + corr_id + " " + retstring; + } + if ( sock.Write( retstring ) < 0 ) connection_open=false; } diff --git a/camerad/camerad.h b/camerad/camerad.h index d38dcde8..15618461 100644 --- a/camerad/camerad.h +++ b/camerad/camerad.h @@ -87,6 +87,7 @@ namespace Camera { int blocking_socket; std::atomic cmd_num; + Common::CorrIdCache corr_cache; ///< dedup cache for tagged inter-daemon commands std::vector jclient_ports; diff --git a/common/common.cpp b/common/common.cpp index bfd417e8..149d225c 100644 --- a/common/common.cpp +++ b/common/common.cpp @@ -7,6 +7,9 @@ #include "common.h" +#include +#include + namespace Common { /***** Common::Broadcaster::emit ********************************************/ @@ -69,6 +72,50 @@ namespace Common { /***** Common::collect_telemetry ********************************************/ + /***** Common::extract_correlation_id ***************************************/ + /** + * @brief detect and strip a correlation ID prefix from an inter-daemon message + * @details Recognizes the wire format "#cid:HHHHHHHH " where the ID + * is exactly CID_HEX_LEN hex digits followed by a single space. + * Tolerates upper- and lower-case hex on input. The function makes + * no assumption about what follows the prefix and never modifies it. + * If no valid prefix is present, payload_out is set to a copy of + * input and id_out is left unchanged so callers may detect the + * "no ID" case via the boolean return. + * @param[in] input full message as received from the wire + * @param[out] id_out extracted ID on success; unchanged otherwise + * @param[out] payload_out message with the prefix stripped on success; copy of input otherwise + * @return true if a well-formed correlation ID prefix was found, false otherwise + * + */ + bool extract_correlation_id( const std::string &input, + std::string &id_out, + std::string &payload_out ) { + if ( input.size() < CID_HEADER_LEN ) { + payload_out = input; + return false; + } + if ( input.compare( 0, CID_PREFIX.size(), CID_PREFIX ) != 0 ) { + payload_out = input; + return false; + } + for ( size_t i = CID_PREFIX.size(); i < CID_PREFIX.size() + CID_HEX_LEN; ++i ) { + if ( !std::isxdigit( static_cast( input[i] ) ) ) { + payload_out = input; + return false; + } + } + if ( input[ CID_PREFIX.size() + CID_HEX_LEN ] != ' ' ) { + payload_out = input; + return false; + } + id_out = input.substr( CID_PREFIX.size(), CID_HEX_LEN ); + payload_out = input.substr( CID_HEADER_LEN ); + return true; + } + /***** Common::extract_correlation_id ***************************************/ + + /***** Common::Queue::enqueue ***********************************************/ /** * @brief puts a message into the queue @@ -604,6 +651,30 @@ namespace Common { /***** Common::DaemonClient::async **************************************************/ + namespace { + /***** generate_cid *******************************************************/ + /** + * @brief generate an 8-char lowercase-hex correlation ID + * @details Uses a thread-local Mersenne Twister seeded once per thread + * from std::random_device. Collisions are not security-sensitive + * here; we only need IDs to differ between adjacent in-flight + * commands so a stale reply can be detected. With 32 bits of + * state per ID and at most one in-flight command per client, + * accidental collisions are vanishingly rare. + * @return 8-character hex string (no prefix, no separator) + * + */ + std::string generate_cid() { + thread_local std::mt19937 rng{ std::random_device{}() }; + std::uniform_int_distribution dist; + std::ostringstream oss; + oss << std::hex << std::setw( static_cast( CID_HEX_LEN ) ) << std::setfill( '0' ) << dist( rng ); + return oss.str(); + } + /***** generate_cid *******************************************************/ + } + + /***** Common::DaemonClient::send ***************************************************/ /** * @brief send a command, read the reply @@ -657,6 +728,16 @@ namespace Common { return ERROR; } + // Generate a correlation ID for this command and prepend it as a wire-level + // prefix. The prefix sits ahead of any user payload and any line terminator + // so daemons can detect, strip, and echo it without disturbing dispatch. + // Receiving a reply that does not carry this exact ID indicates the reply + // is stale (e.g. arrived on a reconnected socket from before the drop) and + // must not be matched to this command. + // + const std::string cid = generate_cid(); + command = CID_PREFIX + cid + " " + command; + // Determine whether to use the override values or not // for this call only. // @@ -708,12 +789,14 @@ namespace Common { << " on fd " << this->socket.getfd() << " for " << this->name; logwrite( function, message.str() ); this->timedout=true; + break; // daemon is busy, not gone — do not resend } else if ( pollret < 0 && errno ) { // this is probably a real error message.str(""); message << "ERROR polling socket " << this->socket.gethost() << "/" << this->socket.getport() << " on fd " << this->socket.getfd() << " for " << this->name << ": " << strerror(errno); logwrite( function, message.str() ); + break; // real socket error — do not resend } else if ( pollret < 0 && !errno ) { // this is probably a stale fd @@ -722,7 +805,7 @@ namespace Common { logwrite( function, message.str() ); } - // if still here then reconnect, sleep 1s, try again + // stale fd only: reconnect and try again // lock.unlock(); error = this->connect(); @@ -789,16 +872,116 @@ namespace Common { reply.erase( std::remove(reply.begin(), reply.end(), '\r' ), reply.end() ); reply.erase( std::remove(reply.begin(), reply.end(), '\n' ), reply.end() ); - // If the reply contains "ERROR" then return ERROR, otherwise NO_ERROR. + // Verify the correlation ID echoed back by the daemon matches the one we + // sent. Any non-empty reply that does not carry the expected prefix is + // stale - typically a delayed reply from a previous command that landed + // on the socket after a reconnect, or an out-of-order frame. Treat such + // a reply as a failure so callers never act on data they cannot attribute. + // Empty replies are left alone here and are handled by the empty-reply + // branch of the classification below. // - if ( reply.find( std::string( "ERROR" ) ) != std::string::npos ) { - return( ERROR ); + if ( !reply.empty() ) { + const std::string expected_prefix = CID_PREFIX + cid + " "; + if ( reply.compare( 0, expected_prefix.size(), expected_prefix ) == 0 ) { + reply.erase( 0, expected_prefix.size() ); + } + else { + message.str(""); message << "ERROR stale or unrecognized reply from " << this->name + << " (expected ID " << cid << "): \"" << reply << "\""; + logwrite( function, message.str() ); + reply.clear(); + // drain any further queued replies so stale data does not persist into the next send() call + while ( this->socket.Poll(0) > 0 ) { + std::string discard; + ( term_with_string_actual ? socket.Read( discard, term_str_read_actual ) + : socket.Read( discard, term_read ) ); + } + } } - else return( NO_ERROR ); + + // Classify the reply: + // "ERROR" in reply → command failed + // empty reply → socket was lost before any response arrived; treat + // as failure so callers are never silently misled + // anything else → success (covers "DONE", JSON payloads, query + // results such as "yes"/"no", state strings, etc.) + // + // Note: CLI help output ("HELP" mode) is never produced on inter-daemon + // command channels, so there is no need to special-case it here. + // + if ( reply.find( "ERROR" ) != std::string::npos ) return ERROR; + if ( reply.empty() ) return ERROR; + return NO_ERROR; } /***** Common::DaemonClient::send ***************************************************/ + /***** Common::CorrIdCache::lookup ******************************************/ + /** + * @brief look up a previously-cached reply by correlation ID + * @details Returns true if the ID is present and not yet expired, in which + * case the cached reply is copied to reply_out. If the ID is + * present but expired, the entry is purged and false is returned. + * @param[in] id correlation ID to look up + * @param[out] reply_out reply string written on hit; unchanged on miss + * @return true on cache hit, false on miss or expired + * + */ + bool CorrIdCache::lookup( const std::string &id, std::string &reply_out ) { + std::lock_guard lock( mtx ); + auto it = cache.find( id ); + if ( it == cache.end() ) return false; + if ( std::chrono::steady_clock::now() >= it->second.expires ) { + cache.erase( it ); + return false; + } + reply_out = it->second.reply; + return true; + } + /***** Common::CorrIdCache::lookup ******************************************/ + + + /***** Common::CorrIdCache::insert ******************************************/ + /** + * @brief store a reply under a correlation ID with TTL + * @details Opportunistically prunes expired entries on each call. If the + * cache is still at MAX_ENTRIES capacity after pruning, the + * entry expiring soonest is evicted to make room. An existing + * entry under the same ID is overwritten (which is the desired + * behavior when a slow handler completes and the client has + * already retried). + * @param[in] id correlation ID to store under + * @param[in] reply bare reply string to cache (must NOT include CID prefix) + * + */ + void CorrIdCache::insert( const std::string &id, const std::string &reply ) { + std::lock_guard lock( mtx ); + const auto now = std::chrono::steady_clock::now(); + + // prune expired entries to bound memory growth + // + for ( auto it = cache.begin(); it != cache.end(); ) { + if ( now >= it->second.expires ) it = cache.erase( it ); + else ++it; + } + + // if still at capacity, evict the entry that expires soonest + // + if ( cache.size() >= MAX_ENTRIES ) { + auto oldest = cache.begin(); + for ( auto it = cache.begin(); it != cache.end(); ++it ) { + if ( it->second.expires < oldest->second.expires ) oldest = it; + } + cache.erase( oldest ); + } + + Entry &entry = cache[id]; + entry.reply = reply; + entry.expires = now + std::chrono::seconds( TTL_SECONDS ); + } + /***** Common::CorrIdCache::insert ******************************************/ + + /***** Common::DaemonClient::dothread_command ***************************************/ /** * @brief sends a command to a daemon in a thread diff --git a/common/common.h b/common/common.h index e5e1ca02..a560de89 100644 --- a/common/common.h +++ b/common/common.h @@ -38,6 +38,10 @@ const std::string JEOF = "EOF\n"; ///< used to terminate JSON messa const std::string TELEMREQUEST = "sendtelem"; ///< common daemon command used to request telemetry const std::string SNAPSHOT = "snapshot"; ///< common daemon command forces publish of telemetry +const std::string CID_PREFIX = "#cid:"; ///< correlation ID marker for inter-daemon commands +constexpr size_t CID_HEX_LEN = 8; ///< number of hex chars in a correlation ID +constexpr size_t CID_HEADER_LEN = 5 + CID_HEX_LEN + 1; ///< total prefix length: "#cid:" + 8 hex + 1 space + constexpr bool EXT = true; ///< constant for use_extension arg of Common::Header::add_key() constexpr bool PRI = !EXT; ///< constant for use_extension arg of Common::Header::add_key() @@ -424,6 +428,26 @@ namespace Common { void collect_telemetry(const std::pair &provider, std::string &retstring); + + /***** Common::extract_correlation_id ***************************************/ + /** + * @brief detect and strip a correlation ID prefix from an inter-daemon message + * @details The wire format for a tagged message is: + * "#cid:HHHHHHHH " + * where HHHHHHHH is exactly CID_HEX_LEN lowercase hex digits and is + * followed by a single space. Untagged messages (e.g. CLI users) + * are left untouched and the function returns false. + * @param[in] input full message as received from the wire + * @param[out] id_out extracted ID on success; unchanged otherwise + * @param[out] payload_out message with the prefix stripped on success; copy of input otherwise + * @return true if a well-formed correlation ID prefix was found, false otherwise + * + */ + bool extract_correlation_id( const std::string &input, + std::string &id_out, + std::string &payload_out ); + + /***** Common::extract_telemetry_value **************************************/ /** * @brief extract a correctly typed value from a JSON message using a specific key @@ -1189,6 +1213,42 @@ namespace Common { /**************** Common::Queue *********************************************/ + /***** Common::CorrIdCache **************************************************/ + /** + * @class CorrIdCache + * @brief bounded TTL cache of recent inter-daemon replies, keyed by correlation ID + * @details Used by daemons to make command retries idempotent: when a tagged + * command is received whose correlation ID matches a cached entry, + * the previously-computed reply is replayed verbatim and the + * underlying handler is NOT re-invoked. Entries expire after + * TTL_SECONDS or when the cache reaches MAX_ENTRIES, whichever + * comes first. The cache is intentionally small and bounded; it + * guards only against same-second retries by DaemonClient::send. + * + * The stored reply is the daemon's bare retstring as it would have + * been written to the wire WITHOUT any correlation ID prefix; the + * caller is responsible for prepending the prefix on replay. + * + */ + class CorrIdCache { + public: + static constexpr size_t MAX_ENTRIES = 64; ///< max cached replies per server + static constexpr int TTL_SECONDS = 60; ///< per-entry lifetime in seconds + + bool lookup( const std::string &id, std::string &reply_out ); + void insert( const std::string &id, const std::string &reply ); + + private: + struct Entry { + std::string reply; + std::chrono::steady_clock::time_point expires; + }; + std::map cache; + std::mutex mtx; + }; + /**************** Common::CorrIdCache ***************************************/ + + /***** Common::DaemonClient *********************************************************/ /** * @class DaemonClient diff --git a/flexured/flexure_server.cpp b/flexured/flexure_server.cpp index f7365737..e3be7f59 100644 --- a/flexured/flexure_server.cpp +++ b/flexured/flexure_server.cpp @@ -387,6 +387,31 @@ namespace Flexure { buf.erase(std::remove(buf.begin(), buf.end(), '\r' ), buf.end()); buf.erase(std::remove(buf.begin(), buf.end(), '\n' ), buf.end()); + // Detect and strip an optional correlation ID prefix. Inter-daemon clients + // tag every command with "#cid:HHHHHHHH " so stale or out-of-order replies + // can be rejected by the client. CLI users send no prefix and corr_id is + // left empty; the server then echoes no prefix on reply. + // + std::string corr_id; + { + std::string payload; + Common::extract_correlation_id( buf, corr_id, payload ); + buf = std::move( payload ); + } + + // Replay a cached reply if this command's ID matches a recent one. + // This makes DaemonClient retries idempotent: the underlying handler + // is invoked at most once per correlation ID within the cache TTL. + // + if ( !corr_id.empty() ) { + std::string cached_reply; + if ( this->corr_cache.lookup( corr_id, cached_reply ) ) { + std::string out = CID_PREFIX + corr_id + " " + cached_reply; + if ( sock.Write( out ) < 0 ) connection_open=false; + continue; + } + } + if (buf.empty()) {sock.Write("\n"); continue;} // acknowledge empty command so client doesn't time out try { @@ -568,6 +593,21 @@ namespace Flexure { logwrite( function, message.str() ); } + // Cache the bare reply (without prefix) so retries with the same + // correlation ID can be replayed without re-running the handler. + // + if ( !corr_id.empty() ) { + this->corr_cache.insert( corr_id, retstring ); + } + + // Echo the correlation ID back to inter-daemon clients so they can + // verify the reply belongs to the command they just sent. CLI users + // sent no prefix, so corr_id is empty and nothing is prepended. + // + if ( !corr_id.empty() ) { + retstring = CID_PREFIX + corr_id + " " + retstring; + } + if ( sock.Write( retstring ) < 0 ) connection_open=false; } diff --git a/flexured/flexure_server.h b/flexured/flexure_server.h index fa7a47f6..5b8faec7 100644 --- a/flexured/flexure_server.h +++ b/flexured/flexure_server.h @@ -73,6 +73,7 @@ namespace Flexure { std::atomic cmd_num; std::atomic threads_active; ///< number of blocking threads that exist + Common::CorrIdCache corr_cache; ///< dedup cache for tagged inter-daemon commands NumberPool id_pool; ///< creates a number pool used for socket ids diff --git a/focusd/focus_server.cpp b/focusd/focus_server.cpp index 18826b22..0a8b4f1b 100644 --- a/focusd/focus_server.cpp +++ b/focusd/focus_server.cpp @@ -454,6 +454,31 @@ namespace Focus { buf.erase(std::remove(buf.begin(), buf.end(), '\r' ), buf.end()); buf.erase(std::remove(buf.begin(), buf.end(), '\n' ), buf.end()); + // Detect and strip an optional correlation ID prefix. Inter-daemon clients + // tag every command with "#cid:HHHHHHHH " so stale or out-of-order replies + // can be rejected by the client. CLI users send no prefix and corr_id is + // left empty; the server then echoes no prefix on reply. + // + std::string corr_id; + { + std::string payload; + Common::extract_correlation_id( buf, corr_id, payload ); + buf = std::move( payload ); + } + + // Replay a cached reply if this command's ID matches a recent one. + // This makes DaemonClient retries idempotent: the underlying handler + // is invoked at most once per correlation ID within the cache TTL. + // + if ( !corr_id.empty() ) { + std::string cached_reply; + if ( this->corr_cache.lookup( corr_id, cached_reply ) ) { + std::string out = CID_PREFIX + corr_id + " " + cached_reply; + if ( sock.Write( out ) < 0 ) connection_open=false; + continue; + } + } + if (buf.empty()) {sock.Write("\n"); continue;} // acknowledge empty command so client doesn't time out try { @@ -641,6 +666,21 @@ namespace Focus { logwrite( function, message.str() ); } + // Cache the bare reply (without prefix) so retries with the same + // correlation ID can be replayed without re-running the handler. + // + if ( !corr_id.empty() ) { + this->corr_cache.insert( corr_id, retstring ); + } + + // Echo the correlation ID back to inter-daemon clients so they can + // verify the reply belongs to the command they just sent. CLI users + // sent no prefix, so corr_id is empty and nothing is prepended. + // + if ( !corr_id.empty() ) { + retstring = CID_PREFIX + corr_id + " " + retstring; + } + if ( sock.Write( retstring ) < 0 ) connection_open=false; } diff --git a/focusd/focus_server.h b/focusd/focus_server.h index c54d26a6..441afd0e 100644 --- a/focusd/focus_server.h +++ b/focusd/focus_server.h @@ -81,6 +81,7 @@ namespace Focus { int blocking_socket; std::atomic cmd_num; + Common::CorrIdCache corr_cache; ///< dedup cache for tagged inter-daemon commands Config config; diff --git a/powerd/power_server.cpp b/powerd/power_server.cpp index 3ebd3d29..48699065 100644 --- a/powerd/power_server.cpp +++ b/powerd/power_server.cpp @@ -465,6 +465,31 @@ namespace Power { buf.erase(std::remove(buf.begin(), buf.end(), '\r' ), buf.end()); buf.erase(std::remove(buf.begin(), buf.end(), '\n' ), buf.end()); + // Detect and strip an optional correlation ID prefix. Inter-daemon clients + // tag every command with "#cid:HHHHHHHH " so stale or out-of-order replies + // can be rejected by the client. CLI users send no prefix and corr_id is + // left empty; the server then echoes no prefix on reply. + // + std::string corr_id; + { + std::string payload; + Common::extract_correlation_id( buf, corr_id, payload ); + buf = std::move( payload ); + } + + // Replay a cached reply if this command's ID matches a recent one. + // This makes DaemonClient retries idempotent: the underlying handler + // is invoked at most once per correlation ID within the cache TTL. + // + if ( !corr_id.empty() ) { + std::string cached_reply; + if ( this->corr_cache.lookup( corr_id, cached_reply ) ) { + std::string out = CID_PREFIX + corr_id + " " + cached_reply; + if ( sock.Write( out ) < 0 ) connection_open=false; + continue; + } + } + if (buf.empty()) {sock.Write("\n"); continue;} // acknowledge empty command so client doesn't time out try { @@ -637,6 +662,21 @@ namespace Power { logwrite( function, message.str() ); } + // Cache the bare reply (without prefix) so retries with the same + // correlation ID can be replayed without re-running the handler. + // + if ( !corr_id.empty() ) { + this->corr_cache.insert( corr_id, retstring ); + } + + // Echo the correlation ID back to inter-daemon clients so they can + // verify the reply belongs to the command they just sent. CLI users + // sent no prefix, so corr_id is empty and nothing is prepended. + // + if ( !corr_id.empty() ) { + retstring = CID_PREFIX + corr_id + " " + retstring; + } + if ( sock.Write( retstring ) < 0 ) connection_open=false; } diff --git a/powerd/power_server.h b/powerd/power_server.h index 89a73c97..3cd1aade 100644 --- a/powerd/power_server.h +++ b/powerd/power_server.h @@ -81,6 +81,7 @@ namespace Power { int blocking_socket; std::atomic cmd_num; + Common::CorrIdCache corr_cache; ///< dedup cache for tagged inter-daemon commands bool open_on_start; ///< should daemon automatically open connection on startup? diff --git a/sequencerd/sequence.cpp b/sequencerd/sequence.cpp index 8c673580..7e31f676 100644 --- a/sequencerd/sequence.cpp +++ b/sequencerd/sequence.cpp @@ -98,10 +98,10 @@ namespace Sequencer { // when it re-acquires fineacquire_mtx inside fineacquire_cv.wait(). std::lock_guard lock(this->fineacquire_mtx); if ( has_running ) { - this->is_fineacquire_running.store( running, std::memory_order_relaxed ); + this->is_fineacquire_running.store( running ); } if ( has_locked ) { - this->is_fineacquire_locked.store( locked, std::memory_order_relaxed ); + this->is_fineacquire_locked.store( locked ); } this->fineacquire_cv.notify_all(); } @@ -154,11 +154,11 @@ namespace Sequencer { // Store under the mutex so the writes are visible to any waiter's predicate // when it re-acquires acam_mtx inside acam_cv.wait(). std::lock_guard lock(this->acam_mtx); - this->is_acam_guiding.store( acquired, std::memory_order_relaxed ); + this->is_acam_guiding.store( acquired ); if ( has_mode ) { - this->is_acam_acquiring.store( acquiring, std::memory_order_relaxed ); + this->is_acam_acquiring.store( acquiring ); } - this->acam_pubtime.store( pubtime, std::memory_order_relaxed ); + this->acam_pubtime.store( pubtime ); this->acam_cv.notify_all(); } /***** Sequencer::Sequence::handletopic_acamd ******************************/ @@ -935,7 +935,7 @@ namespace Sequencer { // send two commands, one for each if (!activechans.str().empty()) { std::string cmd = CAMERAD_ACTIVATE + activechans.str(); - if (this->camerad.send(cmd, reply)!=NO_ERROR) { + if (this->camerad.send(cmd, reply, 12000)!=NO_ERROR) { logwrite( function, "ERROR sending \""+cmd+"\": "+reply); throw std::runtime_error("camera returned "+reply); } @@ -1964,9 +1964,16 @@ namespace Sequencer { } } - // Ask if all devices use frame transfer + // Ask if all devices use frame transfer. The reply is expected to be a + // "yes"/"no" token followed by " DONE". An empty or non-confirming reply + // would silently leave is_science_frame_transfer in the wrong state, which + // controls whether the readout wait is entered. // - this->camerad.send( CAMERAD_FRAMETRANSFER+" all", reply ); + if ( this->camerad.send( CAMERAD_FRAMETRANSFER+" all", reply ) != NO_ERROR + || reply.find("DONE") == std::string::npos ) { + logwrite( function, "ERROR querying frame transfer state: no confirmation (reply=\""+reply+"\")" ); + throw std::runtime_error("querying camera frame transfer state"); + } this->is_science_frame_transfer = ( reply.find("yes") != std::string::npos ); this->thread_error_manager.clear( THR_CAMERA_INIT ); // success @@ -2015,10 +2022,15 @@ namespace Sequencer { throw std::runtime_error("no connection to camera"); } - // send all of the epilogue commands + // send all of the epilogue commands. Log but do not abort on failure: + // shutdown must continue regardless so power-off can complete. // for ( const auto &cmd : this->camera_epilogue ) { - this->camerad.command( cmd ); + std::string reply; + if ( this->camerad.command( cmd, reply ) != NO_ERROR + || reply.find("DONE") == std::string::npos ) { + logwrite( function, "ERROR sending epilogue command \""+cmd+"\" (reply=\""+reply+"\")" ); + } } // disconnect me from camerad, irrespective of any previous error @@ -2687,12 +2699,15 @@ namespace Sequencer { logwrite( function, "[DEBUG] sending expose command" ); - // Send the EXPOSE command to camera daemon on the non-blocking port and don't wait for reply + // Send the EXPOSE command to camera daemon and wait for the reply. + // Also verify the reply contains "DONE": command_timeout returns NO_ERROR + // whenever the reply does not contain "ERROR", including when the reply is + // empty because the socket was lost and no response was ever received. message.str(""); message << CAMERAD_EXPOSE << " " << this->target.nexp; -// if ( this->camerad.async( message.str() ) != NO_ERROR ) { -// if ( this->camerad.send( message.str(), reply ) != NO_ERROR ) { - if ( this->camerad.command_timeout( message.str(), reply, 30000 ) != NO_ERROR ) { - this->broadcast.error( function, "sending camera "+message.str() ); + if ( this->camerad.command_timeout( message.str(), reply, 30000 ) != NO_ERROR + || reply.find("DONE") == std::string::npos ) { + message.str(""); message << "sending camera expose: no confirmation (reply=\"" << reply << "\")"; + this->broadcast.error( function, message.str() ); this->thread_error_manager.set( THR_TRIGGER_EXPOSURE ); // tell the world this thread had an error this->target.update_state( Sequencer::TARGET_PENDING ); // return the target state to pending this->wait_state_manager.clear( Sequencer::SEQ_WAIT_EXPOSE ); // clear EXPOSE bit @@ -2860,6 +2875,7 @@ namespace Sequencer { long __error=NO_ERROR; // keep track of the error just for this scope int attempt=1; while (attempt <= maxattempts) { + if ( this->cancel_flag.load() ) { __error=ERROR; break; } try { // launch slicecam_init async task and wait for result std::async(std::launch::async, &Sequence::slicecam_init, this).get(); @@ -2889,6 +2905,8 @@ namespace Sequencer { else { this->broadcast.error( function, "exceeded max attempts starting slicecam" ); __error=ERROR; + this->slicecamd.disconnect(); + break; } } catch (const std::exception &e) { @@ -2913,6 +2931,7 @@ namespace Sequencer { long __error=NO_ERROR; // keep track of the error just for this scope int attempt=1; while (attempt <= maxattempts) { + if ( this->cancel_flag.load() ) { __error=ERROR; break; } try { // launch acam_init async task and wait for result std::async(std::launch::async, &Sequence::acam_init, this).get(); @@ -2942,6 +2961,7 @@ namespace Sequencer { else { this->broadcast.error( function, "exceeded max attempts starting acam" ); __error=ERROR; + this->acamd.disconnect(); } } break; @@ -3728,7 +3748,7 @@ namespace Sequencer { cmd << " " << reqstatestr; logwrite( function, "switching plug "+plug+" "+reqstatestr ); error = this->powerd.send( cmd.str(), reply ); - if ( error != NO_ERROR || reply.find(" DONE") != std::string::npos ) { + if ( error != NO_ERROR || reply.find("DONE") == std::string::npos ) { logwrite( function, "ERROR switching plug: "+plug+" "+reqstatestr ); continue; } @@ -4024,6 +4044,7 @@ namespace Sequencer { retstring.append( " isready [ ? ]\n" ); retstring.append( " moveto [ ? | ]\n" ); retstring.append( " notify [ ? ]\n" ); + retstring.append( " ping \n" ); retstring.append( " pause [ ? ]\n" ); retstring.append( " pending [ ? ]\n" ); retstring.append( " targetinfo [ ? ]\n" ); @@ -4897,6 +4918,63 @@ namespace Sequencer { logwrite( function, message.str() ); } } + else + + // --------------------------------------------------------- + // ping -- exercise inter-daemon communication round-trip + // --------------------------------------------------------- + // + if ( testname == "ping" ) { + if ( tokens.size() < 2 ) { + retstring = "usage: test ping "; + logwrite( function, "ERROR no daemon name provided" ); + return ERROR; + } + + const std::map daemon_map = { + { "acamd", &this->acamd }, + { "calibd", &this->calibd }, + { "camerad", &this->camerad }, + { "flexured", &this->flexured }, + { "focusd", &this->focusd }, + { "powerd", &this->powerd }, + { "slicecamd", &this->slicecamd }, + { "slitd", &this->slitd }, + { "tcsd", &this->tcsd } + }; + + auto daemon_it = daemon_map.find( tokens[1] ); + if ( daemon_it == daemon_map.end() ) { + retstring = "unknown daemon: " + tokens[1]; + logwrite( function, "ERROR "+retstring ); + return ERROR; + } + + Common::DaemonClient *daemon_ptr = daemon_it->second; + + if ( this->connect_to_daemon( *daemon_ptr ) == ERROR ) { + retstring = "could not connect to " + tokens[1]; + logwrite( function, "ERROR "+retstring ); + return ERROR; + } + + message.str(""); message << "sending \"isopen\" to " << tokens[1]; + logwrite( function, message.str() ); + + std::string reply; + long send_error = daemon_ptr->send( "isopen", reply ); + + message.str(""); message << "reply from " << tokens[1] << ": \"" << reply << "\""; + logwrite( function, message.str() ); + + retstring = reply; + + if ( send_error != NO_ERROR ) { + message.str(""); message << "ERROR sending \"isopen\" to " << tokens[1]; + logwrite( function, message.str() ); + return ERROR; + } + } else { // ---------------------------------------------------- diff --git a/sequencerd/sequence_acquisition.cpp b/sequencerd/sequence_acquisition.cpp index 5aa90a33..fe1cfe57 100644 --- a/sequencerd/sequence_acquisition.cpp +++ b/sequencerd/sequence_acquisition.cpp @@ -45,8 +45,9 @@ namespace Sequencer { // const int64_t freshness_boundary_us = get_time_us() - ACAM_FRESHNESS_GUARD_US; - if ( this->acamd.command( cmd.str(), reply ) != NO_ERROR ) { - logwrite( function, "ERROR sending acquire command to acamd" ); + if ( this->acamd.command( cmd.str(), reply ) != NO_ERROR + || reply.find("DONE") == std::string::npos ) { + logwrite( function, "ERROR sending acquire command to acamd: no confirmation (reply=\""+reply+"\")" ); return ERROR; } @@ -110,13 +111,9 @@ namespace Sequencer { // send ACQUIRE STOP command to ACAM std::string reply; - if ( this->acamd.command( ACAMD_ACQUIRE+" stop", reply ) != NO_ERROR ) { - logwrite( function, "ERROR stopping guiding" ); - return ERROR; - } - - if ( reply.find("ERROR") != std::string::npos ) { - logwrite( function, "ERROR acam: "+reply ); + if ( this->acamd.command( ACAMD_ACQUIRE+" stop", reply ) != NO_ERROR + || reply.find("DONE") == std::string::npos ) { + logwrite( function, "ERROR stopping guiding: no confirmation (reply=\""+reply+"\")" ); return ERROR; } @@ -162,13 +159,9 @@ namespace Sequencer { ScopedState wait_state(wait_state_manager, Sequencer::SEQ_WAIT_FINEACQUIRE); std::string reply; - if (this->slicecamd.command( SLICECAMD_FINEACQUIRE+" start", reply ) != NO_ERROR) { - logwrite( function, "ERROR starting slicecam fine acquisition" ); - return ERROR; - } - - if ( reply.find("ERROR") != std::string::npos ) { - logwrite( function, "ERROR slicecam fine acquisition mode: "+reply ); + if (this->slicecamd.command( SLICECAMD_FINEACQUIRE+" start", reply ) != NO_ERROR + || reply.find("DONE") == std::string::npos ) { + logwrite( function, "ERROR starting slicecam fine acquisition: no confirmation (reply=\""+reply+"\")" ); return ERROR; } @@ -230,13 +223,9 @@ namespace Sequencer { // send STOP command to SLICECAM std::string reply; - if (this->slicecamd.command( SLICECAMD_FINEACQUIRE+" stop", reply ) != NO_ERROR) { - logwrite( function, "ERROR stopping fine acquisition" ); - return ERROR; - } - - if ( reply.find("ERROR") != std::string::npos ) { - logwrite( function, "ERROR slicecam fine acquisition mode: "+reply ); + if (this->slicecamd.command( SLICECAMD_FINEACQUIRE+" stop", reply ) != NO_ERROR + || reply.find("DONE") == std::string::npos ) { + logwrite( function, "ERROR stopping fine acquisition: no confirmation (reply=\""+reply+"\")" ); return ERROR; } diff --git a/sequencerd/sequencer_server.cpp b/sequencerd/sequencer_server.cpp index c24f3b8e..0fc7aed5 100644 --- a/sequencerd/sequencer_server.cpp +++ b/sequencerd/sequencer_server.cpp @@ -1107,6 +1107,31 @@ namespace Sequencer { buf.erase(std::remove(buf.begin(), buf.end(), '\r' ), buf.end()); buf.erase(std::remove(buf.begin(), buf.end(), '\n' ), buf.end()); + // Detect and strip an optional correlation ID prefix. Inter-daemon clients + // tag every command with "#cid:HHHHHHHH " so stale or out-of-order replies + // can be rejected by the client. CLI users send no prefix and corr_id is + // left empty; the server then echoes no prefix on reply. + // + std::string corr_id; + { + std::string payload; + Common::extract_correlation_id( buf, corr_id, payload ); + buf = std::move( payload ); + } + + // Replay a cached reply if this command's ID matches a recent one. + // This makes DaemonClient retries idempotent: the underlying handler + // is invoked at most once per correlation ID within the cache TTL. + // + if ( !corr_id.empty() ) { + std::string cached_reply; + if ( this->corr_cache.lookup( corr_id, cached_reply ) ) { + std::string out = CID_PREFIX + corr_id + " " + cached_reply; + if ( sock.Write( out ) < 0 ) connection_open=false; + continue; + } + } + if (buf.empty()) {sock.Write("\n"); continue;} // acknowledge empty command so client doesn't time out try { @@ -1623,6 +1648,22 @@ namespace Sequencer { } } else retstring.append( "\n" ); + + // Cache the bare reply (without prefix) so retries with the same + // correlation ID can be replayed without re-running the handler. + // + if ( !corr_id.empty() ) { + this->corr_cache.insert( corr_id, retstring ); + } + + // Echo the correlation ID back to inter-daemon clients so they can + // verify the reply belongs to the command they just sent. CLI users + // sent no prefix, so corr_id is empty and nothing is prepended. + // + if ( !corr_id.empty() ) { + retstring = CID_PREFIX + corr_id + " " + retstring; + } + if ( sock.Write( retstring ) < 0 ) connection_open=false; } diff --git a/sequencerd/sequencer_server.h b/sequencerd/sequencer_server.h index 7fff66a5..aacb2ebb 100644 --- a/sequencerd/sequencer_server.h +++ b/sequencerd/sequencer_server.h @@ -114,6 +114,7 @@ namespace Sequencer { int blocking_socket; std::atomic cmd_num; ///< keep a running tally of number of commands received by sequencerd + Common::CorrIdCache corr_cache; ///< dedup cache for tagged inter-daemon commands std::atomic threads_active; ///< number of blocking threads that exist NumberPool id_pool; ///< creates a number pool diff --git a/slicecamd/slicecam_server.cpp b/slicecamd/slicecam_server.cpp index 8ec19e6d..d3b35464 100644 --- a/slicecamd/slicecam_server.cpp +++ b/slicecamd/slicecam_server.cpp @@ -396,6 +396,31 @@ namespace Slicecam { buf.erase(std::remove(buf.begin(), buf.end(), '\r' ), buf.end()); buf.erase(std::remove(buf.begin(), buf.end(), '\n' ), buf.end()); + // Detect and strip an optional correlation ID prefix. Inter-daemon clients + // tag every command with "#cid:HHHHHHHH " so stale or out-of-order replies + // can be rejected by the client. CLI users send no prefix and corr_id is + // left empty; the server then echoes no prefix on reply. + // + std::string corr_id; + { + std::string payload; + Common::extract_correlation_id( buf, corr_id, payload ); + buf = std::move( payload ); + } + + // Replay a cached reply if this command's ID matches a recent one. + // This makes DaemonClient retries idempotent: the underlying handler + // is invoked at most once per correlation ID within the cache TTL. + // + if ( !corr_id.empty() ) { + std::string cached_reply; + if ( this->corr_cache.lookup( corr_id, cached_reply ) ) { + std::string out = CID_PREFIX + corr_id + " " + cached_reply; + if ( sock.Write( out ) < 0 ) connection_open=false; + continue; + } + } + if (buf.empty()) {sock.Write("\n"); continue;} // acknowledge empty command so client doesn't time out try { @@ -644,6 +669,21 @@ namespace Slicecam { logwrite( function, message.str() ); } + // Cache the bare reply (without prefix) so retries with the same + // correlation ID can be replayed without re-running the handler. + // + if ( !corr_id.empty() ) { + this->corr_cache.insert( corr_id, retstring ); + } + + // Echo the correlation ID back to inter-daemon clients so they can + // verify the reply belongs to the command they just sent. CLI users + // sent no prefix, so corr_id is empty and nothing is prepended. + // + if ( !corr_id.empty() ) { + retstring = CID_PREFIX + corr_id + " " + retstring; + } + if ( sock.Write( retstring ) < 0 ) connection_open=false; } diff --git a/slicecamd/slicecam_server.h b/slicecamd/slicecam_server.h index 565e0a90..5123d220 100644 --- a/slicecamd/slicecam_server.h +++ b/slicecamd/slicecam_server.h @@ -94,6 +94,7 @@ namespace Slicecam { std::string asyncgroup; ///< asynchronous multicast group std::atomic cmd_num; ///< keep a running tally of number of commands received by slicecamd + Common::CorrIdCache corr_cache; ///< dedup cache for tagged inter-daemon commands Config config; ///< create a Config object for reading the configuration file diff --git a/slitd/slit_server.cpp b/slitd/slit_server.cpp index 1c39d8f4..489ab199 100644 --- a/slitd/slit_server.cpp +++ b/slitd/slit_server.cpp @@ -463,6 +463,31 @@ namespace Slit { buf.erase(std::remove(buf.begin(), buf.end(), '\r' ), buf.end()); buf.erase(std::remove(buf.begin(), buf.end(), '\n' ), buf.end()); + // Detect and strip an optional correlation ID prefix. Inter-daemon clients + // tag every command with "#cid:HHHHHHHH " so stale or out-of-order replies + // can be rejected by the client. CLI users send no prefix and corr_id is + // left empty; the server then echoes no prefix on reply. + // + std::string corr_id; + { + std::string payload; + Common::extract_correlation_id( buf, corr_id, payload ); + buf = std::move( payload ); + } + + // Replay a cached reply if this command's ID matches a recent one. + // This makes DaemonClient retries idempotent: the underlying handler + // is invoked at most once per correlation ID within the cache TTL. + // + if ( !corr_id.empty() ) { + std::string cached_reply; + if ( this->corr_cache.lookup( corr_id, cached_reply ) ) { + std::string out = CID_PREFIX + corr_id + " " + cached_reply; + if ( sock.Write( out ) < 0 ) connection_open=false; + continue; + } + } + if (buf.empty()) {sock.Write("\n"); continue;} // acknowledge empty command so client doesn't time out try { @@ -626,6 +651,21 @@ namespace Slit { logwrite( function, message.str() ); } + // Cache the bare reply (without prefix) so retries with the same + // correlation ID can be replayed without re-running the handler. + // + if ( !corr_id.empty() ) { + this->corr_cache.insert( corr_id, retstring ); + } + + // Echo the correlation ID back to inter-daemon clients so they can + // verify the reply belongs to the command they just sent. CLI users + // sent no prefix, so corr_id is empty and nothing is prepended. + // + if ( !corr_id.empty() ) { + retstring = CID_PREFIX + corr_id + " " + retstring; + } + if ( sock.Write( retstring ) < 0 ) connection_open=false; } diff --git a/slitd/slit_server.h b/slitd/slit_server.h index 5003023b..939f7124 100644 --- a/slitd/slit_server.h +++ b/slitd/slit_server.h @@ -82,6 +82,7 @@ namespace Slit { int blocking_socket; std::atomic cmd_num; + Common::CorrIdCache corr_cache; ///< dedup cache for tagged inter-daemon commands Config config; diff --git a/tcsd/tcs_server.cpp b/tcsd/tcs_server.cpp index 96882f93..11ff5713 100644 --- a/tcsd/tcs_server.cpp +++ b/tcsd/tcs_server.cpp @@ -578,6 +578,31 @@ void doit(TcsIO &tcs_io, const std::string &client_cmd, bool is_slow_command) { buf.erase(std::remove(buf.begin(), buf.end(), '\r' ), buf.end()); buf.erase(std::remove(buf.begin(), buf.end(), '\n' ), buf.end()); + // Detect and strip an optional correlation ID prefix. Inter-daemon clients + // tag every command with "#cid:HHHHHHHH " so stale or out-of-order replies + // can be rejected by the client. CLI users send no prefix and corr_id is + // left empty; the server then echoes no prefix on reply. + // + std::string corr_id; + { + std::string payload; + Common::extract_correlation_id( buf, corr_id, payload ); + buf = std::move( payload ); + } + + // Replay a cached reply if this command's ID matches a recent one. + // This makes DaemonClient retries idempotent: the underlying handler + // is invoked at most once per correlation ID within the cache TTL. + // + if ( !corr_id.empty() ) { + std::string cached_reply; + if ( this->corr_cache.lookup( corr_id, cached_reply ) ) { + std::string out = CID_PREFIX + corr_id + " " + cached_reply; + if ( sock.Write( out ) < 0 ) connection_open=false; + continue; + } + } + if (buf.empty()) {sock.Write("\n"); continue;} // acknowledge empty command so client doesn't time out bool polling = false; @@ -818,6 +843,21 @@ void doit(TcsIO &tcs_io, const std::string &client_cmd, bool is_slow_command) { if ( polling ) retstring.append("\n"); + // Cache the bare reply (without prefix) so retries with the same + // correlation ID can be replayed without re-running the handler. + // + if ( !corr_id.empty() ) { + this->corr_cache.insert( corr_id, retstring ); + } + + // Echo the correlation ID back to inter-daemon clients so they can + // verify the reply belongs to the command they just sent. CLI users + // sent no prefix, so corr_id is empty and nothing is prepended. + // + if ( !corr_id.empty() ) { + retstring = CID_PREFIX + corr_id + " " + retstring; + } + if ( sock.Write( retstring ) < 0 ) connection_open=false; } diff --git a/tcsd/tcs_server.h b/tcsd/tcs_server.h index 4c6007b5..aaa3aff9 100644 --- a/tcsd/tcs_server.h +++ b/tcsd/tcs_server.h @@ -85,6 +85,7 @@ namespace TCS { uint16_t blkport; ///< blocking port uint16_t asyncport; ///< asynchronous message port std::atomic cmd_num; ///< keep a running tally of number of commands received by tcsd + Common::CorrIdCache corr_cache; ///< dedup cache for tagged inter-daemon commands std::atomic threads_active; ///< number of blocking threads that exist std::string asyncgroup; ///< asynchronous multicast group