From e240cdcdf999684bb6302859685870861ad5689d Mon Sep 17 00:00:00 2001 From: David Hale Date: Tue, 19 May 2026 00:27:02 -0700 Subject: [PATCH 1/4] fixes how sequencerd sends commands to camerad --- common/common.cpp | 17 ++-- sequencerd/sequence.cpp | 122 ++++++++++++++++------------ sequencerd/sequence_acquisition.cpp | 35 +++----- 3 files changed, 95 insertions(+), 79 deletions(-) diff --git a/common/common.cpp b/common/common.cpp index bfd417e8..bc632147 100644 --- a/common/common.cpp +++ b/common/common.cpp @@ -789,12 +789,19 @@ namespace Common { reply.erase( std::remove(reply.begin(), reply.end(), '\r' ), reply.end() ); reply.erase( std::remove(reply.begin(), reply.end(), '\n' ), reply.end() ); - // If the reply contains "ERROR" then return ERROR, otherwise NO_ERROR. + // Classify the reply: + // "ERROR" in reply → command failed + // empty reply → socket was lost before any response arrived; treat + // as failure so callers are never silently misled + // anything else → success (covers "DONE", JSON payloads, query + // results such as "yes"/"no", state strings, etc.) // - if ( reply.find( std::string( "ERROR" ) ) != std::string::npos ) { - return( ERROR ); - } - else return( NO_ERROR ); + // Note: CLI help output ("HELP" mode) is never produced on inter-daemon + // command channels, so there is no need to special-case it here. + // + if ( reply.find( "ERROR" ) != std::string::npos ) return ERROR; + if ( reply.empty() ) return ERROR; + return NO_ERROR; } /***** Common::DaemonClient::send ***************************************************/ diff --git a/sequencerd/sequence.cpp b/sequencerd/sequence.cpp index 352d3d12..408d73bf 100644 --- a/sequencerd/sequence.cpp +++ b/sequencerd/sequence.cpp @@ -51,13 +51,11 @@ namespace Sequencer { // when I write to the completed table I will write the actual EXPTIME this->target.column_from_json( "EXPTIME", Key::Camerad::SHUTTERTIME, jmessage ); - // updates my internal state whether the camera allows an exposure + std::lock_guard lock(camerad_mtx); if (jmessage.contains(Key::Camerad::READY)) { int isready = jmessage[Key::Camerad::READY].get(); - this->can_expose.store(isready, std::memory_order_relaxed); + this->can_expose.store(isready); } - - std::lock_guard lock(camerad_mtx); this->camerad_cv.notify_all(); } /***** Sequencer::Sequence::handletopic_camerad ****************************/ @@ -86,21 +84,26 @@ namespace Sequencer { * */ void Sequence::handletopic_slicecamd(const nlohmann::json &jmessage) { - bool changed = false; - if ( jmessage.contains( Key::Slicecamd::FINEACQUIRE_RUNNING ) ) { - this->is_fineacquire_running.store( - jmessage[Key::Slicecamd::FINEACQUIRE_RUNNING].get(), std::memory_order_relaxed ); - changed = true; - } - if ( jmessage.contains( Key::Slicecamd::FINEACQUIRE_LOCKED ) ) { - this->is_fineacquire_locked.store( - jmessage[Key::Slicecamd::FINEACQUIRE_LOCKED].get(), std::memory_order_relaxed ); - changed = true; - } - if ( changed ) { - std::lock_guard lock(this->fineacquire_mtx); - this->fineacquire_cv.notify_all(); - } + const bool has_running = jmessage.contains( Key::Slicecamd::FINEACQUIRE_RUNNING ); + const bool has_locked = jmessage.contains( Key::Slicecamd::FINEACQUIRE_LOCKED ); + if ( !has_running && !has_locked ) return; + + // Parse JSON values before taking the lock (parsing may throw). + const bool running = has_running + ? jmessage[Key::Slicecamd::FINEACQUIRE_RUNNING].get() : false; + const bool locked = has_locked + ? jmessage[Key::Slicecamd::FINEACQUIRE_LOCKED].get() : false; + + // Store under the mutex so the writes are visible to any waiter's predicate + // when it re-acquires fineacquire_mtx inside fineacquire_cv.wait(). + std::lock_guard lock(this->fineacquire_mtx); + if ( has_running ) { + this->is_fineacquire_running.store( running ); + } + if ( has_locked ) { + this->is_fineacquire_locked.store( locked ); + } + this->fineacquire_cv.notify_all(); } /***** Sequencer::Sequence::handletopic_slicecamd **************************/ @@ -133,21 +136,29 @@ namespace Sequencer { * */ void Sequence::handletopic_acamd(const nlohmann::json &jmessage) { - bool acquired; + // Parse JSON values before taking the lock (parsing may throw). + // extract_telemetry_value leaves its out-param unchanged on missing key + // or type mismatch, so default-initialize before the call. + bool acquired = false; Common::extract_telemetry_value( jmessage, Key::Acamd::IS_ACQUIRED, acquired ); - this->is_acam_guiding.store(acquired, std::memory_order_relaxed); // track whether acamd is actively trying to acquire (mode == "acquiring") - if ( jmessage.contains( Key::Acamd::ACQUIRE_MODE ) ) { - const std::string mode = jmessage[Key::Acamd::ACQUIRE_MODE].get(); - this->is_acam_acquiring.store( mode == "acquiring", std::memory_order_relaxed ); - } + const bool has_mode = jmessage.contains( Key::Acamd::ACQUIRE_MODE ); + const bool acquiring = has_mode + ? jmessage[Key::Acamd::ACQUIRE_MODE].get() == "acquiring" + : false; int64_t pubtime=0; Common::extract_telemetry_value( jmessage, Key::PUBTIME, pubtime ); - this->acam_pubtime.store( pubtime, std::memory_order_relaxed ); + // Store under the mutex so the writes are visible to any waiter's predicate + // when it re-acquires acam_mtx inside acam_cv.wait(). std::lock_guard lock(this->acam_mtx); + this->is_acam_guiding.store( acquired ); + if ( has_mode ) { + this->is_acam_acquiring.store( acquiring ); + } + this->acam_pubtime.store( pubtime ); this->acam_cv.notify_all(); } /***** Sequencer::Sequence::handletopic_acamd ******************************/ @@ -465,16 +476,6 @@ namespace Sequencer { {Sequencer::SEQ_WAIT_EXPOSE} ); // clear EXPOSE } - // --------------------------------------------- - // clear READOUT flag on the end-of-frame signal - // --------------------------------------------- - // - if ( statstr.compare( 0, 10, "FRAMECOUNT" ) == 0 ) { // async message tag FRAMECOUNT - if ( seq.wait_state_manager.is_set( Sequencer::SEQ_WAIT_READOUT ) ) { - seq.wait_state_manager.clear( Sequencer::SEQ_WAIT_READOUT ); - } - } - // --------------------- // process TEST messages // --------------------- @@ -825,15 +826,19 @@ namespace Sequencer { << " id " << this->target.obsid << " order " << this->target.obsorder; logwrite( function, message.str() ); - // If not using frame transfer then wait for readout, too + // Wait for all N exposures to complete across all active channels. + // camerad publishes can_expose=true (READY key) only after the last channel + // of the last exposure finishes — the correct completion signal for both + // single and multi-exposure sequences. Skip the wait for frame transfer. // if (!this->is_science_frame_transfer) { logwrite( function, "waiting for readout" ); - while ( !this->cancel_flag.load() && wait_state_manager.is_set( Sequencer::SEQ_WAIT_READOUT ) ) { - std::unique_lock lock(cv_mutex); - this->cv.wait( lock, [this]() { return( !wait_state_manager.is_set(SEQ_WAIT_READOUT) || this->cancel_flag.load() ); } ); - } + std::unique_lock lock(this->camerad_mtx); + this->camerad_cv.wait( lock, [this]() { + return this->can_expose.load() || this->cancel_flag.load(); + } ); } + this->wait_state_manager.clear( Sequencer::SEQ_WAIT_READOUT ); // Now that we're done waiting, check for errors or abort // @@ -1959,9 +1964,16 @@ namespace Sequencer { } } - // Ask if all devices use frame transfer + // Ask if all devices use frame transfer. The reply is expected to be a + // "yes"/"no" token followed by " DONE". An empty or non-confirming reply + // would silently leave is_science_frame_transfer in the wrong state, which + // controls whether the readout wait is entered. // - this->camerad.send( CAMERAD_FRAMETRANSFER+" all", reply ); + if ( this->camerad.send( CAMERAD_FRAMETRANSFER+" all", reply ) != NO_ERROR + || reply.find("DONE") == std::string::npos ) { + logwrite( function, "ERROR querying frame transfer state: no confirmation (reply=\""+reply+"\")" ); + throw std::runtime_error("querying camera frame transfer state"); + } this->is_science_frame_transfer = ( reply.find("yes") != std::string::npos ); this->thread_error_manager.clear( THR_CAMERA_INIT ); // success @@ -2010,10 +2022,15 @@ namespace Sequencer { throw std::runtime_error("no connection to camera"); } - // send all of the epilogue commands + // send all of the epilogue commands. Log but do not abort on failure: + // shutdown must continue regardless so power-off can complete. // for ( const auto &cmd : this->camera_epilogue ) { - this->camerad.command( cmd ); + std::string reply; + if ( this->camerad.command( cmd, reply ) != NO_ERROR + || reply.find("DONE") == std::string::npos ) { + logwrite( function, "ERROR sending epilogue command \""+cmd+"\" (reply=\""+reply+"\")" ); + } } // disconnect me from camerad, irrespective of any previous error @@ -2682,12 +2699,15 @@ namespace Sequencer { logwrite( function, "[DEBUG] sending expose command" ); - // Send the EXPOSE command to camera daemon on the non-blocking port and don't wait for reply + // Send the EXPOSE command to camera daemon and wait for the reply. + // Also verify the reply contains "DONE": command_timeout returns NO_ERROR + // whenever the reply does not contain "ERROR", including when the reply is + // empty because the socket was lost and no response was ever received. message.str(""); message << CAMERAD_EXPOSE << " " << this->target.nexp; -// if ( this->camerad.async( message.str() ) != NO_ERROR ) { -// if ( this->camerad.send( message.str(), reply ) != NO_ERROR ) { - if ( this->camerad.command_timeout( message.str(), reply, 30000 ) != NO_ERROR ) { - this->broadcast.error( function, "sending camera "+message.str() ); + if ( this->camerad.command_timeout( message.str(), reply, 30000 ) != NO_ERROR + || reply.find("DONE") == std::string::npos ) { + message.str(""); message << "sending camera expose: no confirmation (reply=\"" << reply << "\")"; + this->broadcast.error( function, message.str() ); this->thread_error_manager.set( THR_TRIGGER_EXPOSURE ); // tell the world this thread had an error this->target.update_state( Sequencer::TARGET_PENDING ); // return the target state to pending this->wait_state_manager.clear( Sequencer::SEQ_WAIT_EXPOSE ); // clear EXPOSE bit @@ -3723,7 +3743,7 @@ namespace Sequencer { cmd << " " << reqstatestr; logwrite( function, "switching plug "+plug+" "+reqstatestr ); error = this->powerd.send( cmd.str(), reply ); - if ( error != NO_ERROR || reply.find(" DONE") != std::string::npos ) { + if ( error != NO_ERROR || reply.find(" DONE") == std::string::npos ) { logwrite( function, "ERROR switching plug: "+plug+" "+reqstatestr ); continue; } diff --git a/sequencerd/sequence_acquisition.cpp b/sequencerd/sequence_acquisition.cpp index 5aa90a33..fe1cfe57 100644 --- a/sequencerd/sequence_acquisition.cpp +++ b/sequencerd/sequence_acquisition.cpp @@ -45,8 +45,9 @@ namespace Sequencer { // const int64_t freshness_boundary_us = get_time_us() - ACAM_FRESHNESS_GUARD_US; - if ( this->acamd.command( cmd.str(), reply ) != NO_ERROR ) { - logwrite( function, "ERROR sending acquire command to acamd" ); + if ( this->acamd.command( cmd.str(), reply ) != NO_ERROR + || reply.find("DONE") == std::string::npos ) { + logwrite( function, "ERROR sending acquire command to acamd: no confirmation (reply=\""+reply+"\")" ); return ERROR; } @@ -110,13 +111,9 @@ namespace Sequencer { // send ACQUIRE STOP command to ACAM std::string reply; - if ( this->acamd.command( ACAMD_ACQUIRE+" stop", reply ) != NO_ERROR ) { - logwrite( function, "ERROR stopping guiding" ); - return ERROR; - } - - if ( reply.find("ERROR") != std::string::npos ) { - logwrite( function, "ERROR acam: "+reply ); + if ( this->acamd.command( ACAMD_ACQUIRE+" stop", reply ) != NO_ERROR + || reply.find("DONE") == std::string::npos ) { + logwrite( function, "ERROR stopping guiding: no confirmation (reply=\""+reply+"\")" ); return ERROR; } @@ -162,13 +159,9 @@ namespace Sequencer { ScopedState wait_state(wait_state_manager, Sequencer::SEQ_WAIT_FINEACQUIRE); std::string reply; - if (this->slicecamd.command( SLICECAMD_FINEACQUIRE+" start", reply ) != NO_ERROR) { - logwrite( function, "ERROR starting slicecam fine acquisition" ); - return ERROR; - } - - if ( reply.find("ERROR") != std::string::npos ) { - logwrite( function, "ERROR slicecam fine acquisition mode: "+reply ); + if (this->slicecamd.command( SLICECAMD_FINEACQUIRE+" start", reply ) != NO_ERROR + || reply.find("DONE") == std::string::npos ) { + logwrite( function, "ERROR starting slicecam fine acquisition: no confirmation (reply=\""+reply+"\")" ); return ERROR; } @@ -230,13 +223,9 @@ namespace Sequencer { // send STOP command to SLICECAM std::string reply; - if (this->slicecamd.command( SLICECAMD_FINEACQUIRE+" stop", reply ) != NO_ERROR) { - logwrite( function, "ERROR stopping fine acquisition" ); - return ERROR; - } - - if ( reply.find("ERROR") != std::string::npos ) { - logwrite( function, "ERROR slicecam fine acquisition mode: "+reply ); + if (this->slicecamd.command( SLICECAMD_FINEACQUIRE+" stop", reply ) != NO_ERROR + || reply.find("DONE") == std::string::npos ) { + logwrite( function, "ERROR stopping fine acquisition: no confirmation (reply=\""+reply+"\")" ); return ERROR; } From 4882d8af60c4eff83e12b5cba58d69085c9e8a95 Mon Sep 17 00:00:00 2001 From: David Hale Date: Tue, 19 May 2026 03:42:24 -0700 Subject: [PATCH 2/4] adds a correlaton ID to inter-daemon messages to improve reliability --- acamd/acam_server.cpp | 40 ++++++++ acamd/acam_server.h | 1 + calibd/calib_server.cpp | 40 ++++++++ calibd/calib_server.h | 1 + camerad/camerad.cpp | 40 ++++++++ camerad/camerad.h | 1 + common/common.cpp | 168 ++++++++++++++++++++++++++++++++ common/common.h | 60 ++++++++++++ flexured/flexure_server.cpp | 40 ++++++++ flexured/flexure_server.h | 1 + focusd/focus_server.cpp | 40 ++++++++ focusd/focus_server.h | 1 + powerd/power_server.cpp | 40 ++++++++ powerd/power_server.h | 1 + sequencerd/sequence.cpp | 60 +++++++++++- sequencerd/sequencer_server.cpp | 41 ++++++++ sequencerd/sequencer_server.h | 1 + slicecamd/slicecam_server.cpp | 40 ++++++++ slicecamd/slicecam_server.h | 1 + slitd/slit_server.cpp | 40 ++++++++ slitd/slit_server.h | 1 + tcsd/tcs_server.cpp | 40 ++++++++ tcsd/tcs_server.h | 1 + 23 files changed, 698 insertions(+), 1 deletion(-) diff --git a/acamd/acam_server.cpp b/acamd/acam_server.cpp index 571207b2..082cf6b5 100644 --- a/acamd/acam_server.cpp +++ b/acamd/acam_server.cpp @@ -592,6 +592,31 @@ namespace Acam { buf.erase(std::remove(buf.begin(), buf.end(), '\r' ), buf.end()); buf.erase(std::remove(buf.begin(), buf.end(), '\n' ), buf.end()); + // Detect and strip an optional correlation ID prefix. Inter-daemon clients + // tag every command with "#cid:HHHHHHHH " so stale or out-of-order replies + // can be rejected by the client. CLI users send no prefix and corr_id is + // left empty; the server then echoes no prefix on reply. + // + std::string corr_id; + { + std::string payload; + Common::extract_correlation_id( buf, corr_id, payload ); + buf = std::move( payload ); + } + + // Replay a cached reply if this command's ID matches a recent one. + // This makes DaemonClient retries idempotent: the underlying handler + // is invoked at most once per correlation ID within the cache TTL. + // + if ( !corr_id.empty() ) { + std::string cached_reply; + if ( this->corr_cache.lookup( corr_id, cached_reply ) ) { + std::string out = CID_PREFIX + corr_id + " " + cached_reply; + if ( sock.Write( out ) < 0 ) connection_open=false; + continue; + } + } + if (buf.empty()) {sock.Write("\n"); continue;} // acknowledge empty command so client doesn't time out try { @@ -964,6 +989,21 @@ namespace Acam { logwrite( function, message.str() ); } + // Cache the bare reply (without prefix) so retries with the same + // correlation ID can be replayed without re-running the handler. + // + if ( !corr_id.empty() ) { + this->corr_cache.insert( corr_id, retstring ); + } + + // Echo the correlation ID back to inter-daemon clients so they can + // verify the reply belongs to the command they just sent. CLI users + // sent no prefix, so corr_id is empty and nothing is prepended. + // + if ( !corr_id.empty() ) { + retstring = CID_PREFIX + corr_id + " " + retstring; + } + if ( sock.Write( retstring ) < 0 ) connection_open=false; } diff --git a/acamd/acam_server.h b/acamd/acam_server.h index 1f328638..d3210b5e 100644 --- a/acamd/acam_server.h +++ b/acamd/acam_server.h @@ -92,6 +92,7 @@ namespace Acam { std::string asyncgroup; ///< asynchronous multicast group std::atomic cmd_num; ///< keep a running tally of number of commands received by acamd + Common::CorrIdCache corr_cache; ///< dedup cache for tagged inter-daemon commands std::atomic threads_active; ///< number of blocking threads that exist Config config; ///< create a Config object for reading the configuration file diff --git a/calibd/calib_server.cpp b/calibd/calib_server.cpp index 416510fc..73460ae7 100644 --- a/calibd/calib_server.cpp +++ b/calibd/calib_server.cpp @@ -476,6 +476,31 @@ namespace Calib { buf.erase(std::remove(buf.begin(), buf.end(), '\r' ), buf.end()); buf.erase(std::remove(buf.begin(), buf.end(), '\n' ), buf.end()); + // Detect and strip an optional correlation ID prefix. Inter-daemon clients + // tag every command with "#cid:HHHHHHHH " so stale or out-of-order replies + // can be rejected by the client. CLI users send no prefix and corr_id is + // left empty; the server then echoes no prefix on reply. + // + std::string corr_id; + { + std::string payload; + Common::extract_correlation_id( buf, corr_id, payload ); + buf = std::move( payload ); + } + + // Replay a cached reply if this command's ID matches a recent one. + // This makes DaemonClient retries idempotent: the underlying handler + // is invoked at most once per correlation ID within the cache TTL. + // + if ( !corr_id.empty() ) { + std::string cached_reply; + if ( this->corr_cache.lookup( corr_id, cached_reply ) ) { + std::string out = CID_PREFIX + corr_id + " " + cached_reply; + if ( sock.Write( out ) < 0 ) connection_open=false; + continue; + } + } + if (buf.empty()) {sock.Write("\n"); continue;} // acknowledge empty command so client doesn't time out try { @@ -649,6 +674,21 @@ namespace Calib { logwrite( function, message.str() ); } + // Cache the bare reply (without prefix) so retries with the same + // correlation ID can be replayed without re-running the handler. + // + if ( !corr_id.empty() ) { + this->corr_cache.insert( corr_id, retstring ); + } + + // Echo the correlation ID back to inter-daemon clients so they can + // verify the reply belongs to the command they just sent. CLI users + // sent no prefix, so corr_id is empty and nothing is prepended. + // + if ( !corr_id.empty() ) { + retstring = CID_PREFIX + corr_id + " " + retstring; + } + if ( sock.Write( retstring ) < 0 ) connection_open=false; } diff --git a/calibd/calib_server.h b/calibd/calib_server.h index 7f0cda61..9c9d9cb2 100644 --- a/calibd/calib_server.h +++ b/calibd/calib_server.h @@ -81,6 +81,7 @@ namespace Calib { int blocking_socket; std::atomic cmd_num; + Common::CorrIdCache corr_cache; ///< dedup cache for tagged inter-daemon commands Config config; diff --git a/camerad/camerad.cpp b/camerad/camerad.cpp index efc60e02..a026a5bc 100644 --- a/camerad/camerad.cpp +++ b/camerad/camerad.cpp @@ -432,6 +432,31 @@ void doit(Network::TcpSocket &sock) { sbuf.erase(std::remove(sbuf.begin(), sbuf.end(), '\r' ), sbuf.end()); sbuf.erase(std::remove(sbuf.begin(), sbuf.end(), '\n' ), sbuf.end()); + // Detect and strip an optional correlation ID prefix. Inter-daemon clients + // tag every command with "#cid:HHHHHHHH " so stale or out-of-order replies + // can be rejected by the client. CLI users send no prefix and corr_id is + // left empty; the server then echoes no prefix on reply. + // + std::string corr_id; + { + std::string payload; + Common::extract_correlation_id( sbuf, corr_id, payload ); + sbuf = std::move( payload ); + } + + // Replay a cached reply if this command's ID matches a recent one. + // This makes DaemonClient retries idempotent: the underlying handler + // is invoked at most once per correlation ID within the cache TTL. + // + if ( !corr_id.empty() ) { + std::string cached_reply; + if ( server.corr_cache.lookup( corr_id, cached_reply ) ) { + std::string out = CID_PREFIX + corr_id + " " + cached_reply; + if ( sock.Write( out ) < 0 ) connection_open=false; + continue; + } + } + if ( sbuf.empty() ) sbuf="help"; // no command automatically displays help try { @@ -829,6 +854,21 @@ void doit(Network::TcpSocket &sock) { logwrite( function, message.str() ); } + // Cache the bare reply (without prefix) so retries with the same + // correlation ID can be replayed without re-running the handler. + // + if ( !corr_id.empty() ) { + server.corr_cache.insert( corr_id, retstring ); + } + + // Echo the correlation ID back to inter-daemon clients so they can + // verify the reply belongs to the command they just sent. CLI users + // sent no prefix, so corr_id is empty and nothing is prepended. + // + if ( !corr_id.empty() ) { + retstring = CID_PREFIX + corr_id + " " + retstring; + } + if ( sock.Write( retstring ) < 0 ) connection_open=false; } diff --git a/camerad/camerad.h b/camerad/camerad.h index d38dcde8..15618461 100644 --- a/camerad/camerad.h +++ b/camerad/camerad.h @@ -87,6 +87,7 @@ namespace Camera { int blocking_socket; std::atomic cmd_num; + Common::CorrIdCache corr_cache; ///< dedup cache for tagged inter-daemon commands std::vector jclient_ports; diff --git a/common/common.cpp b/common/common.cpp index bc632147..7ef84dfb 100644 --- a/common/common.cpp +++ b/common/common.cpp @@ -7,6 +7,9 @@ #include "common.h" +#include +#include + namespace Common { /***** Common::Broadcaster::emit ********************************************/ @@ -69,6 +72,50 @@ namespace Common { /***** Common::collect_telemetry ********************************************/ + /***** Common::extract_correlation_id ***************************************/ + /** + * @brief detect and strip a correlation ID prefix from an inter-daemon message + * @details Recognizes the wire format "#cid:HHHHHHHH " where the ID + * is exactly CID_HEX_LEN hex digits followed by a single space. + * Tolerates upper- and lower-case hex on input. The function makes + * no assumption about what follows the prefix and never modifies it. + * If no valid prefix is present, payload_out is set to a copy of + * input and id_out is left unchanged so callers may detect the + * "no ID" case via the boolean return. + * @param[in] input full message as received from the wire + * @param[out] id_out extracted ID on success; unchanged otherwise + * @param[out] payload_out message with the prefix stripped on success; copy of input otherwise + * @return true if a well-formed correlation ID prefix was found, false otherwise + * + */ + bool extract_correlation_id( const std::string &input, + std::string &id_out, + std::string &payload_out ) { + if ( input.size() < CID_HEADER_LEN ) { + payload_out = input; + return false; + } + if ( input.compare( 0, CID_PREFIX.size(), CID_PREFIX ) != 0 ) { + payload_out = input; + return false; + } + for ( size_t i = CID_PREFIX.size(); i < CID_PREFIX.size() + CID_HEX_LEN; ++i ) { + if ( !std::isxdigit( static_cast( input[i] ) ) ) { + payload_out = input; + return false; + } + } + if ( input[ CID_PREFIX.size() + CID_HEX_LEN ] != ' ' ) { + payload_out = input; + return false; + } + id_out = input.substr( CID_PREFIX.size(), CID_HEX_LEN ); + payload_out = input.substr( CID_HEADER_LEN ); + return true; + } + /***** Common::extract_correlation_id ***************************************/ + + /***** Common::Queue::enqueue ***********************************************/ /** * @brief puts a message into the queue @@ -604,6 +651,30 @@ namespace Common { /***** Common::DaemonClient::async **************************************************/ + namespace { + /***** generate_cid *******************************************************/ + /** + * @brief generate an 8-char lowercase-hex correlation ID + * @details Uses a thread-local Mersenne Twister seeded once per thread + * from std::random_device. Collisions are not security-sensitive + * here; we only need IDs to differ between adjacent in-flight + * commands so a stale reply can be detected. With 32 bits of + * state per ID and at most one in-flight command per client, + * accidental collisions are vanishingly rare. + * @return 8-character hex string (no prefix, no separator) + * + */ + std::string generate_cid() { + thread_local std::mt19937 rng{ std::random_device{}() }; + std::uniform_int_distribution dist; + std::ostringstream oss; + oss << std::hex << std::setw( static_cast( CID_HEX_LEN ) ) << std::setfill( '0' ) << dist( rng ); + return oss.str(); + } + /***** generate_cid *******************************************************/ + } + + /***** Common::DaemonClient::send ***************************************************/ /** * @brief send a command, read the reply @@ -657,6 +728,16 @@ namespace Common { return ERROR; } + // Generate a correlation ID for this command and prepend it as a wire-level + // prefix. The prefix sits ahead of any user payload and any line terminator + // so daemons can detect, strip, and echo it without disturbing dispatch. + // Receiving a reply that does not carry this exact ID indicates the reply + // is stale (e.g. arrived on a reconnected socket from before the drop) and + // must not be matched to this command. + // + const std::string cid = generate_cid(); + command = CID_PREFIX + cid + " " + command; + // Determine whether to use the override values or not // for this call only. // @@ -789,6 +870,27 @@ namespace Common { reply.erase( std::remove(reply.begin(), reply.end(), '\r' ), reply.end() ); reply.erase( std::remove(reply.begin(), reply.end(), '\n' ), reply.end() ); + // Verify the correlation ID echoed back by the daemon matches the one we + // sent. Any non-empty reply that does not carry the expected prefix is + // stale - typically a delayed reply from a previous command that landed + // on the socket after a reconnect, or an out-of-order frame. Treat such + // a reply as a failure so callers never act on data they cannot attribute. + // Empty replies are left alone here and are handled by the empty-reply + // branch of the classification below. + // + if ( !reply.empty() ) { + const std::string expected_prefix = CID_PREFIX + cid + " "; + if ( reply.compare( 0, expected_prefix.size(), expected_prefix ) == 0 ) { + reply.erase( 0, expected_prefix.size() ); + } + else { + message.str(""); message << "ERROR stale or unrecognized reply from " << this->name + << " (expected ID " << cid << "): \"" << reply << "\""; + logwrite( function, message.str() ); + reply.clear(); + } + } + // Classify the reply: // "ERROR" in reply → command failed // empty reply → socket was lost before any response arrived; treat @@ -806,6 +908,72 @@ namespace Common { /***** Common::DaemonClient::send ***************************************************/ + /***** Common::CorrIdCache::lookup ******************************************/ + /** + * @brief look up a previously-cached reply by correlation ID + * @details Returns true if the ID is present and not yet expired, in which + * case the cached reply is copied to reply_out. If the ID is + * present but expired, the entry is purged and false is returned. + * @param[in] id correlation ID to look up + * @param[out] reply_out reply string written on hit; unchanged on miss + * @return true on cache hit, false on miss or expired + * + */ + bool CorrIdCache::lookup( const std::string &id, std::string &reply_out ) { + std::lock_guard lock( mtx ); + auto it = cache.find( id ); + if ( it == cache.end() ) return false; + if ( std::chrono::steady_clock::now() >= it->second.expires ) { + cache.erase( it ); + return false; + } + reply_out = it->second.reply; + return true; + } + /***** Common::CorrIdCache::lookup ******************************************/ + + + /***** Common::CorrIdCache::insert ******************************************/ + /** + * @brief store a reply under a correlation ID with TTL + * @details Opportunistically prunes expired entries on each call. If the + * cache is still at MAX_ENTRIES capacity after pruning, the + * entry expiring soonest is evicted to make room. An existing + * entry under the same ID is overwritten (which is the desired + * behavior when a slow handler completes and the client has + * already retried). + * @param[in] id correlation ID to store under + * @param[in] reply bare reply string to cache (must NOT include CID prefix) + * + */ + void CorrIdCache::insert( const std::string &id, const std::string &reply ) { + std::lock_guard lock( mtx ); + const auto now = std::chrono::steady_clock::now(); + + // prune expired entries to bound memory growth + // + for ( auto it = cache.begin(); it != cache.end(); ) { + if ( now >= it->second.expires ) it = cache.erase( it ); + else ++it; + } + + // if still at capacity, evict the entry that expires soonest + // + if ( cache.size() >= MAX_ENTRIES ) { + auto oldest = cache.begin(); + for ( auto it = cache.begin(); it != cache.end(); ++it ) { + if ( it->second.expires < oldest->second.expires ) oldest = it; + } + cache.erase( oldest ); + } + + Entry &entry = cache[id]; + entry.reply = reply; + entry.expires = now + std::chrono::seconds( TTL_SECONDS ); + } + /***** Common::CorrIdCache::insert ******************************************/ + + /***** Common::DaemonClient::dothread_command ***************************************/ /** * @brief sends a command to a daemon in a thread diff --git a/common/common.h b/common/common.h index c045cfea..d61b9ccd 100644 --- a/common/common.h +++ b/common/common.h @@ -38,6 +38,10 @@ const std::string JEOF = "EOF\n"; ///< used to terminate JSON messa const std::string TELEMREQUEST = "sendtelem"; ///< common daemon command used to request telemetry const std::string SNAPSHOT = "snapshot"; ///< common daemon command forces publish of telemetry +const std::string CID_PREFIX = "#cid:"; ///< correlation ID marker for inter-daemon commands +constexpr size_t CID_HEX_LEN = 8; ///< number of hex chars in a correlation ID +constexpr size_t CID_HEADER_LEN = 5 + CID_HEX_LEN + 1; ///< total prefix length: "#cid:" + 8 hex + 1 space + constexpr bool EXT = true; ///< constant for use_extension arg of Common::Header::add_key() constexpr bool PRI = !EXT; ///< constant for use_extension arg of Common::Header::add_key() @@ -422,6 +426,26 @@ namespace Common { void collect_telemetry(const std::pair &provider, std::string &retstring); + + /***** Common::extract_correlation_id ***************************************/ + /** + * @brief detect and strip a correlation ID prefix from an inter-daemon message + * @details The wire format for a tagged message is: + * "#cid:HHHHHHHH " + * where HHHHHHHH is exactly CID_HEX_LEN lowercase hex digits and is + * followed by a single space. Untagged messages (e.g. CLI users) + * are left untouched and the function returns false. + * @param[in] input full message as received from the wire + * @param[out] id_out extracted ID on success; unchanged otherwise + * @param[out] payload_out message with the prefix stripped on success; copy of input otherwise + * @return true if a well-formed correlation ID prefix was found, false otherwise + * + */ + bool extract_correlation_id( const std::string &input, + std::string &id_out, + std::string &payload_out ); + + /***** Common::extract_telemetry_value **************************************/ /** * @brief extract a correctly typed value from a JSON message using a specific key @@ -1187,6 +1211,42 @@ namespace Common { /**************** Common::Queue *********************************************/ + /***** Common::CorrIdCache **************************************************/ + /** + * @class CorrIdCache + * @brief bounded TTL cache of recent inter-daemon replies, keyed by correlation ID + * @details Used by daemons to make command retries idempotent: when a tagged + * command is received whose correlation ID matches a cached entry, + * the previously-computed reply is replayed verbatim and the + * underlying handler is NOT re-invoked. Entries expire after + * TTL_SECONDS or when the cache reaches MAX_ENTRIES, whichever + * comes first. The cache is intentionally small and bounded; it + * guards only against same-second retries by DaemonClient::send. + * + * The stored reply is the daemon's bare retstring as it would have + * been written to the wire WITHOUT any correlation ID prefix; the + * caller is responsible for prepending the prefix on replay. + * + */ + class CorrIdCache { + public: + static constexpr size_t MAX_ENTRIES = 64; ///< max cached replies per server + static constexpr int TTL_SECONDS = 60; ///< per-entry lifetime in seconds + + bool lookup( const std::string &id, std::string &reply_out ); + void insert( const std::string &id, const std::string &reply ); + + private: + struct Entry { + std::string reply; + std::chrono::steady_clock::time_point expires; + }; + std::map cache; + std::mutex mtx; + }; + /**************** Common::CorrIdCache ***************************************/ + + /***** Common::DaemonClient *********************************************************/ /** * @class DaemonClient diff --git a/flexured/flexure_server.cpp b/flexured/flexure_server.cpp index f7365737..e3be7f59 100644 --- a/flexured/flexure_server.cpp +++ b/flexured/flexure_server.cpp @@ -387,6 +387,31 @@ namespace Flexure { buf.erase(std::remove(buf.begin(), buf.end(), '\r' ), buf.end()); buf.erase(std::remove(buf.begin(), buf.end(), '\n' ), buf.end()); + // Detect and strip an optional correlation ID prefix. Inter-daemon clients + // tag every command with "#cid:HHHHHHHH " so stale or out-of-order replies + // can be rejected by the client. CLI users send no prefix and corr_id is + // left empty; the server then echoes no prefix on reply. + // + std::string corr_id; + { + std::string payload; + Common::extract_correlation_id( buf, corr_id, payload ); + buf = std::move( payload ); + } + + // Replay a cached reply if this command's ID matches a recent one. + // This makes DaemonClient retries idempotent: the underlying handler + // is invoked at most once per correlation ID within the cache TTL. + // + if ( !corr_id.empty() ) { + std::string cached_reply; + if ( this->corr_cache.lookup( corr_id, cached_reply ) ) { + std::string out = CID_PREFIX + corr_id + " " + cached_reply; + if ( sock.Write( out ) < 0 ) connection_open=false; + continue; + } + } + if (buf.empty()) {sock.Write("\n"); continue;} // acknowledge empty command so client doesn't time out try { @@ -568,6 +593,21 @@ namespace Flexure { logwrite( function, message.str() ); } + // Cache the bare reply (without prefix) so retries with the same + // correlation ID can be replayed without re-running the handler. + // + if ( !corr_id.empty() ) { + this->corr_cache.insert( corr_id, retstring ); + } + + // Echo the correlation ID back to inter-daemon clients so they can + // verify the reply belongs to the command they just sent. CLI users + // sent no prefix, so corr_id is empty and nothing is prepended. + // + if ( !corr_id.empty() ) { + retstring = CID_PREFIX + corr_id + " " + retstring; + } + if ( sock.Write( retstring ) < 0 ) connection_open=false; } diff --git a/flexured/flexure_server.h b/flexured/flexure_server.h index fa7a47f6..5b8faec7 100644 --- a/flexured/flexure_server.h +++ b/flexured/flexure_server.h @@ -73,6 +73,7 @@ namespace Flexure { std::atomic cmd_num; std::atomic threads_active; ///< number of blocking threads that exist + Common::CorrIdCache corr_cache; ///< dedup cache for tagged inter-daemon commands NumberPool id_pool; ///< creates a number pool used for socket ids diff --git a/focusd/focus_server.cpp b/focusd/focus_server.cpp index 18826b22..0a8b4f1b 100644 --- a/focusd/focus_server.cpp +++ b/focusd/focus_server.cpp @@ -454,6 +454,31 @@ namespace Focus { buf.erase(std::remove(buf.begin(), buf.end(), '\r' ), buf.end()); buf.erase(std::remove(buf.begin(), buf.end(), '\n' ), buf.end()); + // Detect and strip an optional correlation ID prefix. Inter-daemon clients + // tag every command with "#cid:HHHHHHHH " so stale or out-of-order replies + // can be rejected by the client. CLI users send no prefix and corr_id is + // left empty; the server then echoes no prefix on reply. + // + std::string corr_id; + { + std::string payload; + Common::extract_correlation_id( buf, corr_id, payload ); + buf = std::move( payload ); + } + + // Replay a cached reply if this command's ID matches a recent one. + // This makes DaemonClient retries idempotent: the underlying handler + // is invoked at most once per correlation ID within the cache TTL. + // + if ( !corr_id.empty() ) { + std::string cached_reply; + if ( this->corr_cache.lookup( corr_id, cached_reply ) ) { + std::string out = CID_PREFIX + corr_id + " " + cached_reply; + if ( sock.Write( out ) < 0 ) connection_open=false; + continue; + } + } + if (buf.empty()) {sock.Write("\n"); continue;} // acknowledge empty command so client doesn't time out try { @@ -641,6 +666,21 @@ namespace Focus { logwrite( function, message.str() ); } + // Cache the bare reply (without prefix) so retries with the same + // correlation ID can be replayed without re-running the handler. + // + if ( !corr_id.empty() ) { + this->corr_cache.insert( corr_id, retstring ); + } + + // Echo the correlation ID back to inter-daemon clients so they can + // verify the reply belongs to the command they just sent. CLI users + // sent no prefix, so corr_id is empty and nothing is prepended. + // + if ( !corr_id.empty() ) { + retstring = CID_PREFIX + corr_id + " " + retstring; + } + if ( sock.Write( retstring ) < 0 ) connection_open=false; } diff --git a/focusd/focus_server.h b/focusd/focus_server.h index c54d26a6..441afd0e 100644 --- a/focusd/focus_server.h +++ b/focusd/focus_server.h @@ -81,6 +81,7 @@ namespace Focus { int blocking_socket; std::atomic cmd_num; + Common::CorrIdCache corr_cache; ///< dedup cache for tagged inter-daemon commands Config config; diff --git a/powerd/power_server.cpp b/powerd/power_server.cpp index 3ebd3d29..48699065 100644 --- a/powerd/power_server.cpp +++ b/powerd/power_server.cpp @@ -465,6 +465,31 @@ namespace Power { buf.erase(std::remove(buf.begin(), buf.end(), '\r' ), buf.end()); buf.erase(std::remove(buf.begin(), buf.end(), '\n' ), buf.end()); + // Detect and strip an optional correlation ID prefix. Inter-daemon clients + // tag every command with "#cid:HHHHHHHH " so stale or out-of-order replies + // can be rejected by the client. CLI users send no prefix and corr_id is + // left empty; the server then echoes no prefix on reply. + // + std::string corr_id; + { + std::string payload; + Common::extract_correlation_id( buf, corr_id, payload ); + buf = std::move( payload ); + } + + // Replay a cached reply if this command's ID matches a recent one. + // This makes DaemonClient retries idempotent: the underlying handler + // is invoked at most once per correlation ID within the cache TTL. + // + if ( !corr_id.empty() ) { + std::string cached_reply; + if ( this->corr_cache.lookup( corr_id, cached_reply ) ) { + std::string out = CID_PREFIX + corr_id + " " + cached_reply; + if ( sock.Write( out ) < 0 ) connection_open=false; + continue; + } + } + if (buf.empty()) {sock.Write("\n"); continue;} // acknowledge empty command so client doesn't time out try { @@ -637,6 +662,21 @@ namespace Power { logwrite( function, message.str() ); } + // Cache the bare reply (without prefix) so retries with the same + // correlation ID can be replayed without re-running the handler. + // + if ( !corr_id.empty() ) { + this->corr_cache.insert( corr_id, retstring ); + } + + // Echo the correlation ID back to inter-daemon clients so they can + // verify the reply belongs to the command they just sent. CLI users + // sent no prefix, so corr_id is empty and nothing is prepended. + // + if ( !corr_id.empty() ) { + retstring = CID_PREFIX + corr_id + " " + retstring; + } + if ( sock.Write( retstring ) < 0 ) connection_open=false; } diff --git a/powerd/power_server.h b/powerd/power_server.h index 89a73c97..3cd1aade 100644 --- a/powerd/power_server.h +++ b/powerd/power_server.h @@ -81,6 +81,7 @@ namespace Power { int blocking_socket; std::atomic cmd_num; + Common::CorrIdCache corr_cache; ///< dedup cache for tagged inter-daemon commands bool open_on_start; ///< should daemon automatically open connection on startup? diff --git a/sequencerd/sequence.cpp b/sequencerd/sequence.cpp index 408d73bf..72ae158c 100644 --- a/sequencerd/sequence.cpp +++ b/sequencerd/sequence.cpp @@ -3743,7 +3743,7 @@ namespace Sequencer { cmd << " " << reqstatestr; logwrite( function, "switching plug "+plug+" "+reqstatestr ); error = this->powerd.send( cmd.str(), reply ); - if ( error != NO_ERROR || reply.find(" DONE") == std::string::npos ) { + if ( error != NO_ERROR || reply.find("DONE") == std::string::npos ) { logwrite( function, "ERROR switching plug: "+plug+" "+reqstatestr ); continue; } @@ -4039,6 +4039,7 @@ namespace Sequencer { retstring.append( " isready [ ? ]\n" ); retstring.append( " moveto [ ? | ]\n" ); retstring.append( " notify [ ? ]\n" ); + retstring.append( " ping \n" ); retstring.append( " pause [ ? ]\n" ); retstring.append( " pending [ ? ]\n" ); retstring.append( " targetinfo [ ? ]\n" ); @@ -4912,6 +4913,63 @@ namespace Sequencer { logwrite( function, message.str() ); } } + else + + // --------------------------------------------------------- + // ping -- exercise inter-daemon communication round-trip + // --------------------------------------------------------- + // + if ( testname == "ping" ) { + if ( tokens.size() < 2 ) { + retstring = "usage: test ping "; + logwrite( function, "ERROR no daemon name provided" ); + return ERROR; + } + + const std::map daemon_map = { + { "acamd", &this->acamd }, + { "calibd", &this->calibd }, + { "camerad", &this->camerad }, + { "flexured", &this->flexured }, + { "focusd", &this->focusd }, + { "powerd", &this->powerd }, + { "slicecamd", &this->slicecamd }, + { "slitd", &this->slitd }, + { "tcsd", &this->tcsd } + }; + + auto daemon_it = daemon_map.find( tokens[1] ); + if ( daemon_it == daemon_map.end() ) { + retstring = "unknown daemon: " + tokens[1]; + logwrite( function, "ERROR "+retstring ); + return ERROR; + } + + Common::DaemonClient *daemon_ptr = daemon_it->second; + + if ( this->connect_to_daemon( *daemon_ptr ) == ERROR ) { + retstring = "could not connect to " + tokens[1]; + logwrite( function, "ERROR "+retstring ); + return ERROR; + } + + message.str(""); message << "sending \"isopen\" to " << tokens[1]; + logwrite( function, message.str() ); + + std::string reply; + long send_error = daemon_ptr->send( "isopen", reply ); + + message.str(""); message << "reply from " << tokens[1] << ": \"" << reply << "\""; + logwrite( function, message.str() ); + + retstring = reply; + + if ( send_error != NO_ERROR ) { + message.str(""); message << "ERROR sending \"isopen\" to " << tokens[1]; + logwrite( function, message.str() ); + return ERROR; + } + } else { // ---------------------------------------------------- diff --git a/sequencerd/sequencer_server.cpp b/sequencerd/sequencer_server.cpp index c24f3b8e..0fc7aed5 100644 --- a/sequencerd/sequencer_server.cpp +++ b/sequencerd/sequencer_server.cpp @@ -1107,6 +1107,31 @@ namespace Sequencer { buf.erase(std::remove(buf.begin(), buf.end(), '\r' ), buf.end()); buf.erase(std::remove(buf.begin(), buf.end(), '\n' ), buf.end()); + // Detect and strip an optional correlation ID prefix. Inter-daemon clients + // tag every command with "#cid:HHHHHHHH " so stale or out-of-order replies + // can be rejected by the client. CLI users send no prefix and corr_id is + // left empty; the server then echoes no prefix on reply. + // + std::string corr_id; + { + std::string payload; + Common::extract_correlation_id( buf, corr_id, payload ); + buf = std::move( payload ); + } + + // Replay a cached reply if this command's ID matches a recent one. + // This makes DaemonClient retries idempotent: the underlying handler + // is invoked at most once per correlation ID within the cache TTL. + // + if ( !corr_id.empty() ) { + std::string cached_reply; + if ( this->corr_cache.lookup( corr_id, cached_reply ) ) { + std::string out = CID_PREFIX + corr_id + " " + cached_reply; + if ( sock.Write( out ) < 0 ) connection_open=false; + continue; + } + } + if (buf.empty()) {sock.Write("\n"); continue;} // acknowledge empty command so client doesn't time out try { @@ -1623,6 +1648,22 @@ namespace Sequencer { } } else retstring.append( "\n" ); + + // Cache the bare reply (without prefix) so retries with the same + // correlation ID can be replayed without re-running the handler. + // + if ( !corr_id.empty() ) { + this->corr_cache.insert( corr_id, retstring ); + } + + // Echo the correlation ID back to inter-daemon clients so they can + // verify the reply belongs to the command they just sent. CLI users + // sent no prefix, so corr_id is empty and nothing is prepended. + // + if ( !corr_id.empty() ) { + retstring = CID_PREFIX + corr_id + " " + retstring; + } + if ( sock.Write( retstring ) < 0 ) connection_open=false; } diff --git a/sequencerd/sequencer_server.h b/sequencerd/sequencer_server.h index 7fff66a5..aacb2ebb 100644 --- a/sequencerd/sequencer_server.h +++ b/sequencerd/sequencer_server.h @@ -114,6 +114,7 @@ namespace Sequencer { int blocking_socket; std::atomic cmd_num; ///< keep a running tally of number of commands received by sequencerd + Common::CorrIdCache corr_cache; ///< dedup cache for tagged inter-daemon commands std::atomic threads_active; ///< number of blocking threads that exist NumberPool id_pool; ///< creates a number pool diff --git a/slicecamd/slicecam_server.cpp b/slicecamd/slicecam_server.cpp index 8ec19e6d..d3b35464 100644 --- a/slicecamd/slicecam_server.cpp +++ b/slicecamd/slicecam_server.cpp @@ -396,6 +396,31 @@ namespace Slicecam { buf.erase(std::remove(buf.begin(), buf.end(), '\r' ), buf.end()); buf.erase(std::remove(buf.begin(), buf.end(), '\n' ), buf.end()); + // Detect and strip an optional correlation ID prefix. Inter-daemon clients + // tag every command with "#cid:HHHHHHHH " so stale or out-of-order replies + // can be rejected by the client. CLI users send no prefix and corr_id is + // left empty; the server then echoes no prefix on reply. + // + std::string corr_id; + { + std::string payload; + Common::extract_correlation_id( buf, corr_id, payload ); + buf = std::move( payload ); + } + + // Replay a cached reply if this command's ID matches a recent one. + // This makes DaemonClient retries idempotent: the underlying handler + // is invoked at most once per correlation ID within the cache TTL. + // + if ( !corr_id.empty() ) { + std::string cached_reply; + if ( this->corr_cache.lookup( corr_id, cached_reply ) ) { + std::string out = CID_PREFIX + corr_id + " " + cached_reply; + if ( sock.Write( out ) < 0 ) connection_open=false; + continue; + } + } + if (buf.empty()) {sock.Write("\n"); continue;} // acknowledge empty command so client doesn't time out try { @@ -644,6 +669,21 @@ namespace Slicecam { logwrite( function, message.str() ); } + // Cache the bare reply (without prefix) so retries with the same + // correlation ID can be replayed without re-running the handler. + // + if ( !corr_id.empty() ) { + this->corr_cache.insert( corr_id, retstring ); + } + + // Echo the correlation ID back to inter-daemon clients so they can + // verify the reply belongs to the command they just sent. CLI users + // sent no prefix, so corr_id is empty and nothing is prepended. + // + if ( !corr_id.empty() ) { + retstring = CID_PREFIX + corr_id + " " + retstring; + } + if ( sock.Write( retstring ) < 0 ) connection_open=false; } diff --git a/slicecamd/slicecam_server.h b/slicecamd/slicecam_server.h index 565e0a90..5123d220 100644 --- a/slicecamd/slicecam_server.h +++ b/slicecamd/slicecam_server.h @@ -94,6 +94,7 @@ namespace Slicecam { std::string asyncgroup; ///< asynchronous multicast group std::atomic cmd_num; ///< keep a running tally of number of commands received by slicecamd + Common::CorrIdCache corr_cache; ///< dedup cache for tagged inter-daemon commands Config config; ///< create a Config object for reading the configuration file diff --git a/slitd/slit_server.cpp b/slitd/slit_server.cpp index 1c39d8f4..489ab199 100644 --- a/slitd/slit_server.cpp +++ b/slitd/slit_server.cpp @@ -463,6 +463,31 @@ namespace Slit { buf.erase(std::remove(buf.begin(), buf.end(), '\r' ), buf.end()); buf.erase(std::remove(buf.begin(), buf.end(), '\n' ), buf.end()); + // Detect and strip an optional correlation ID prefix. Inter-daemon clients + // tag every command with "#cid:HHHHHHHH " so stale or out-of-order replies + // can be rejected by the client. CLI users send no prefix and corr_id is + // left empty; the server then echoes no prefix on reply. + // + std::string corr_id; + { + std::string payload; + Common::extract_correlation_id( buf, corr_id, payload ); + buf = std::move( payload ); + } + + // Replay a cached reply if this command's ID matches a recent one. + // This makes DaemonClient retries idempotent: the underlying handler + // is invoked at most once per correlation ID within the cache TTL. + // + if ( !corr_id.empty() ) { + std::string cached_reply; + if ( this->corr_cache.lookup( corr_id, cached_reply ) ) { + std::string out = CID_PREFIX + corr_id + " " + cached_reply; + if ( sock.Write( out ) < 0 ) connection_open=false; + continue; + } + } + if (buf.empty()) {sock.Write("\n"); continue;} // acknowledge empty command so client doesn't time out try { @@ -626,6 +651,21 @@ namespace Slit { logwrite( function, message.str() ); } + // Cache the bare reply (without prefix) so retries with the same + // correlation ID can be replayed without re-running the handler. + // + if ( !corr_id.empty() ) { + this->corr_cache.insert( corr_id, retstring ); + } + + // Echo the correlation ID back to inter-daemon clients so they can + // verify the reply belongs to the command they just sent. CLI users + // sent no prefix, so corr_id is empty and nothing is prepended. + // + if ( !corr_id.empty() ) { + retstring = CID_PREFIX + corr_id + " " + retstring; + } + if ( sock.Write( retstring ) < 0 ) connection_open=false; } diff --git a/slitd/slit_server.h b/slitd/slit_server.h index 5003023b..939f7124 100644 --- a/slitd/slit_server.h +++ b/slitd/slit_server.h @@ -82,6 +82,7 @@ namespace Slit { int blocking_socket; std::atomic cmd_num; + Common::CorrIdCache corr_cache; ///< dedup cache for tagged inter-daemon commands Config config; diff --git a/tcsd/tcs_server.cpp b/tcsd/tcs_server.cpp index 96882f93..11ff5713 100644 --- a/tcsd/tcs_server.cpp +++ b/tcsd/tcs_server.cpp @@ -578,6 +578,31 @@ void doit(TcsIO &tcs_io, const std::string &client_cmd, bool is_slow_command) { buf.erase(std::remove(buf.begin(), buf.end(), '\r' ), buf.end()); buf.erase(std::remove(buf.begin(), buf.end(), '\n' ), buf.end()); + // Detect and strip an optional correlation ID prefix. Inter-daemon clients + // tag every command with "#cid:HHHHHHHH " so stale or out-of-order replies + // can be rejected by the client. CLI users send no prefix and corr_id is + // left empty; the server then echoes no prefix on reply. + // + std::string corr_id; + { + std::string payload; + Common::extract_correlation_id( buf, corr_id, payload ); + buf = std::move( payload ); + } + + // Replay a cached reply if this command's ID matches a recent one. + // This makes DaemonClient retries idempotent: the underlying handler + // is invoked at most once per correlation ID within the cache TTL. + // + if ( !corr_id.empty() ) { + std::string cached_reply; + if ( this->corr_cache.lookup( corr_id, cached_reply ) ) { + std::string out = CID_PREFIX + corr_id + " " + cached_reply; + if ( sock.Write( out ) < 0 ) connection_open=false; + continue; + } + } + if (buf.empty()) {sock.Write("\n"); continue;} // acknowledge empty command so client doesn't time out bool polling = false; @@ -818,6 +843,21 @@ void doit(TcsIO &tcs_io, const std::string &client_cmd, bool is_slow_command) { if ( polling ) retstring.append("\n"); + // Cache the bare reply (without prefix) so retries with the same + // correlation ID can be replayed without re-running the handler. + // + if ( !corr_id.empty() ) { + this->corr_cache.insert( corr_id, retstring ); + } + + // Echo the correlation ID back to inter-daemon clients so they can + // verify the reply belongs to the command they just sent. CLI users + // sent no prefix, so corr_id is empty and nothing is prepended. + // + if ( !corr_id.empty() ) { + retstring = CID_PREFIX + corr_id + " " + retstring; + } + if ( sock.Write( retstring ) < 0 ) connection_open=false; } diff --git a/tcsd/tcs_server.h b/tcsd/tcs_server.h index 4c6007b5..aaa3aff9 100644 --- a/tcsd/tcs_server.h +++ b/tcsd/tcs_server.h @@ -85,6 +85,7 @@ namespace TCS { uint16_t blkport; ///< blocking port uint16_t asyncport; ///< asynchronous message port std::atomic cmd_num; ///< keep a running tally of number of commands received by tcsd + Common::CorrIdCache corr_cache; ///< dedup cache for tagged inter-daemon commands std::atomic threads_active; ///< number of blocking threads that exist std::string asyncgroup; ///< asynchronous multicast group From fb8311c1341782ce642fdc814e09992fafd21fff Mon Sep 17 00:00:00 2001 From: David Hale Date: Tue, 19 May 2026 04:47:59 -0700 Subject: [PATCH 3/4] in Common::DaemonClient::send retry only on stale fd increase timeout for activate command --- common/common.cpp | 4 +++- sequencerd/sequence.cpp | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/common/common.cpp b/common/common.cpp index 7ef84dfb..c818abcd 100644 --- a/common/common.cpp +++ b/common/common.cpp @@ -789,12 +789,14 @@ namespace Common { << " on fd " << this->socket.getfd() << " for " << this->name; logwrite( function, message.str() ); this->timedout=true; + break; // daemon is busy, not gone — do not resend } else if ( pollret < 0 && errno ) { // this is probably a real error message.str(""); message << "ERROR polling socket " << this->socket.gethost() << "/" << this->socket.getport() << " on fd " << this->socket.getfd() << " for " << this->name << ": " << strerror(errno); logwrite( function, message.str() ); + break; // real socket error — do not resend } else if ( pollret < 0 && !errno ) { // this is probably a stale fd @@ -803,7 +805,7 @@ namespace Common { logwrite( function, message.str() ); } - // if still here then reconnect, sleep 1s, try again + // stale fd only: reconnect and try again // lock.unlock(); error = this->connect(); diff --git a/sequencerd/sequence.cpp b/sequencerd/sequence.cpp index 72ae158c..be865584 100644 --- a/sequencerd/sequence.cpp +++ b/sequencerd/sequence.cpp @@ -935,7 +935,7 @@ namespace Sequencer { // send two commands, one for each if (!activechans.str().empty()) { std::string cmd = CAMERAD_ACTIVATE + activechans.str(); - if (this->camerad.send(cmd, reply)!=NO_ERROR) { + if (this->camerad.send(cmd, reply, 12000)!=NO_ERROR) { logwrite( function, "ERROR sending \""+cmd+"\": "+reply); throw std::runtime_error("camera returned "+reply); } From ccf10f6dc9bcf29e1bad54e5f68067192cf06e55 Mon Sep 17 00:00:00 2001 From: David Hale Date: Tue, 19 May 2026 15:20:53 -0700 Subject: [PATCH 4/4] fix: startup retry loop infinite spin, stale socket, and uninterruptible abort MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three bugs in the slicecam/acam startup retry loops and DaemonClient::send: sequence.cpp — slicecam_init and acam_init retry loops: - Missing break in the else (exceeded-max-attempts) branch caused attempt <= maxattempts (3 <= 3) to remain true forever, spinning indefinitely through power-on and connect cycles. - No disconnect() in that branch left the socket open to a busy slicecamd; subsequent connect_to_daemon() calls skipped reconnect because isconnected() was still true, allowing delayed replies from the previous timed-out open command to accumulate in the receive buffer. - No cancel_flag check anywhere in either loop meant abort_process() could not interrupt a stuck startup. common.cpp — DaemonClient::send(): - On CID mismatch, drain the socket receive buffer with Poll(0) before returning ERROR so stale replies cannot bleed into the next send() call, independent of whether the caller reconnects. --- common/common.cpp | 6 ++++++ sequencerd/sequence.cpp | 5 +++++ 2 files changed, 11 insertions(+) diff --git a/common/common.cpp b/common/common.cpp index c818abcd..149d225c 100644 --- a/common/common.cpp +++ b/common/common.cpp @@ -890,6 +890,12 @@ namespace Common { << " (expected ID " << cid << "): \"" << reply << "\""; logwrite( function, message.str() ); reply.clear(); + // drain any further queued replies so stale data does not persist into the next send() call + while ( this->socket.Poll(0) > 0 ) { + std::string discard; + ( term_with_string_actual ? socket.Read( discard, term_str_read_actual ) + : socket.Read( discard, term_read ) ); + } } } diff --git a/sequencerd/sequence.cpp b/sequencerd/sequence.cpp index be865584..7e31f676 100644 --- a/sequencerd/sequence.cpp +++ b/sequencerd/sequence.cpp @@ -2875,6 +2875,7 @@ namespace Sequencer { long __error=NO_ERROR; // keep track of the error just for this scope int attempt=1; while (attempt <= maxattempts) { + if ( this->cancel_flag.load() ) { __error=ERROR; break; } try { // launch slicecam_init async task and wait for result std::async(std::launch::async, &Sequence::slicecam_init, this).get(); @@ -2904,6 +2905,8 @@ namespace Sequencer { else { this->broadcast.error( function, "exceeded max attempts starting slicecam" ); __error=ERROR; + this->slicecamd.disconnect(); + break; } } catch (const std::exception &e) { @@ -2928,6 +2931,7 @@ namespace Sequencer { long __error=NO_ERROR; // keep track of the error just for this scope int attempt=1; while (attempt <= maxattempts) { + if ( this->cancel_flag.load() ) { __error=ERROR; break; } try { // launch acam_init async task and wait for result std::async(std::launch::async, &Sequence::acam_init, this).get(); @@ -2957,6 +2961,7 @@ namespace Sequencer { else { this->broadcast.error( function, "exceeded max attempts starting acam" ); __error=ERROR; + this->acamd.disconnect(); } } break;