diff --git a/camerad/astrocam.cpp b/camerad/astrocam.cpp index 43853898..cb6c547e 100644 --- a/camerad/astrocam.cpp +++ b/camerad/astrocam.cpp @@ -1941,7 +1941,7 @@ namespace AstroCam { std::string timestring; timespec timenow; double mjd0, mjd1, mjd; - double airmass0=NAN, airmass1=NAN, airmass=NAN; + double airmass0, airmass1, airmass; if ( !interface.in_readout() ) { logwrite( function, "sending command to stop clocks!" ); @@ -2618,6 +2618,14 @@ namespace AstroCam { interface.status.can_expose.store(true); interface.publish_status(); logwrite( function, "ready for next exposure" ); + // Republish periodically so a subscriber that missed the single-fire + // transition (e.g. ZMQ reconnect gap) recovers without manual intervention. + std::thread( [&interface]() { + for ( int i = 0; i < 5 && interface.status.can_expose.load(); ++i ) { + std::this_thread::sleep_for( std::chrono::seconds(2) ); + if ( interface.status.can_expose.load() ) interface.publish_status(true); + } + }).detach(); } return; diff --git a/common/common.h b/common/common.h index e5e1ca02..bee87cb4 100644 --- a/common/common.h +++ b/common/common.h @@ -38,6 +38,10 @@ const std::string JEOF = "EOF\n"; ///< used to terminate JSON messa const std::string TELEMREQUEST = "sendtelem"; ///< common daemon command used to request telemetry const std::string SNAPSHOT = "snapshot"; ///< common daemon command forces publish of telemetry +const std::string CID_PREFIX = "#cid:"; ///< correlation ID marker for inter-daemon commands +constexpr size_t CID_HEX_LEN = 8; ///< number of hex chars in a correlation ID +constexpr size_t CID_HEADER_LEN = 5 + CID_HEX_LEN + 1; ///< total prefix length: "#cid:" + 8 hex + 1 space + constexpr bool EXT = true; ///< constant for use_extension arg of Common::Header::add_key() constexpr bool PRI = !EXT; ///< constant for use_extension arg of Common::Header::add_key() @@ -66,7 +70,7 @@ namespace Common { private: zmqpp::context &_context; zmqpp::socket _socket; - mutable std::mutex _publish_mtx; + zmqpp::poller _poller; ///< persistent poller — avoids per-call reconstruction Mode _mode; ///< publisher or subscriber? std::string _topic; ///< publisher topic std::vector _topics; ///< list of subscriber topics @@ -78,17 +82,23 @@ namespace Common { PubSub( zmqpp::context &context, Mode mode ) : _context(context), _socket(context, (mode==Mode::PUB ? zmqpp::socket_type::publish : zmqpp::socket_type::subscribe)), - _mode(mode) { } + _mode(mode) + { + _socket.set( zmqpp::socket_option::linger, 0 ); + _socket.set( zmqpp::socket_option::send_high_water_mark, 0 ); + _socket.set( zmqpp::socket_option::receive_high_water_mark, 0 ); + _poller.add( _socket, zmqpp::poller::poll_in ); + } ~PubSub() { _socket.close(); } /** - * @brief publishers bind to a socket endpoint (not for brokers) + * @brief poll for a waiting message + * @param[in] timeout_ms poll timeout in milliseconds (default 100, 0 = non-blocking) + * @return true if at least one message is ready to receive */ - bool has_message() { - zmqpp::poller poller; - poller.add(_socket, zmqpp::poller::poll_in); - return ( poller.poll(100) > 0 ); + bool has_message( int timeout_ms = 100 ) { + return ( _poller.poll(timeout_ms) > 0 ); } /** @@ -180,7 +190,6 @@ namespace Common { if ( _mode != Mode::PUB ) { throw std::runtime_error( "(Common::PubSub::publish) not a publisher" ); } - std::lock_guard lock( _publish_mtx ); zmqpp::message message_zmq; // Publish to either class default _topic or topic specified as // optional arg. @@ -242,6 +251,7 @@ namespace Common { +" with default topic " +iface.publisher_topic ); iface.publisher = std::make_unique( context, Common::PubSub::Mode::PUB ); iface.publisher->connect_to_broker( iface.publisher_address, iface.publisher_topic ); + std::this_thread::sleep_for( std::chrono::milliseconds(100) ); // publisher slow-joiner settle } catch ( const zmqpp::zmq_internal_exception &e ) { logwrite( function, "ERROR initializing message handler: "+std::string(e.what()) ); @@ -292,13 +302,17 @@ namespace Common { BoolState thread_running( iface.is_subscriber_thread_running ); - // listen for published messages and handle them + // listen for published messages and handle them. + // drain all pending messages per poll wake-up to avoid per-message 100ms stalls + // when several daemons publish simultaneously (e.g. after request_snapshot()). // while ( iface.should_subscriber_thread_run ) { try { if ( iface.subscriber->has_message() ) { - auto [topic,payload] = iface.subscriber->receive(); - process_incoming_message(iface, topic, payload); + do { + auto [topic,payload] = iface.subscriber->receive(); + process_incoming_message(iface, topic, payload); + } while ( iface.subscriber->has_message(0) ); } } catch ( const std::exception &e ) { @@ -424,6 +438,26 @@ namespace Common { void collect_telemetry(const std::pair &provider, std::string &retstring); + + /***** Common::extract_correlation_id ***************************************/ + /** + * @brief detect and strip a correlation ID prefix from an inter-daemon message + * @details The wire format for a tagged message is: + * "#cid:HHHHHHHH " + * where HHHHHHHH is exactly CID_HEX_LEN lowercase hex digits and is + * followed by a single space. Untagged messages (e.g. CLI users) + * are left untouched and the function returns false. + * @param[in] input full message as received from the wire + * @param[out] id_out extracted ID on success; unchanged otherwise + * @param[out] payload_out message with the prefix stripped on success; copy of input otherwise + * @return true if a well-formed correlation ID prefix was found, false otherwise + * + */ + bool extract_correlation_id( const std::string &input, + std::string &id_out, + std::string &payload_out ); + + /***** Common::extract_telemetry_value **************************************/ /** * @brief extract a correctly typed value from a JSON message using a specific key @@ -1189,6 +1223,42 @@ namespace Common { /**************** Common::Queue *********************************************/ + /***** Common::CorrIdCache **************************************************/ + /** + * @class CorrIdCache + * @brief bounded TTL cache of recent inter-daemon replies, keyed by correlation ID + * @details Used by daemons to make command retries idempotent: when a tagged + * command is received whose correlation ID matches a cached entry, + * the previously-computed reply is replayed verbatim and the + * underlying handler is NOT re-invoked. Entries expire after + * TTL_SECONDS or when the cache reaches MAX_ENTRIES, whichever + * comes first. The cache is intentionally small and bounded; it + * guards only against same-second retries by DaemonClient::send. + * + * The stored reply is the daemon's bare retstring as it would have + * been written to the wire WITHOUT any correlation ID prefix; the + * caller is responsible for prepending the prefix on replay. + * + */ + class CorrIdCache { + public: + static constexpr size_t MAX_ENTRIES = 64; ///< max cached replies per server + static constexpr int TTL_SECONDS = 60; ///< per-entry lifetime in seconds + + bool lookup( const std::string &id, std::string &reply_out ); + void insert( const std::string &id, const std::string &reply ); + + private: + struct Entry { + std::string reply; + std::chrono::steady_clock::time_point expires; + }; + std::map cache; + std::mutex mtx; + }; + /**************** Common::CorrIdCache ***************************************/ + + /***** Common::DaemonClient *********************************************************/ /** * @class DaemonClient diff --git a/messaged/messaged.cpp b/messaged/messaged.cpp index d3f6eec3..3dbed96b 100644 --- a/messaged/messaged.cpp +++ b/messaged/messaged.cpp @@ -86,6 +86,17 @@ void runbroker() { void* xsub_socket = zmq_socket(context, ZMQ_XSUB); void* xpub_socket = zmq_socket(context, ZMQ_XPUB); + // unlimited queues: prevent silent drops when any daemon is slow to drain. + // LINGER=0: broker exits cleanly without blocking on pending messages. + // + int zero = 0; + zmq_setsockopt( xsub_socket, ZMQ_SNDHWM, &zero, sizeof(zero) ); + zmq_setsockopt( xsub_socket, ZMQ_RCVHWM, &zero, sizeof(zero) ); + zmq_setsockopt( xsub_socket, ZMQ_LINGER, &zero, sizeof(zero) ); + zmq_setsockopt( xpub_socket, ZMQ_SNDHWM, &zero, sizeof(zero) ); + zmq_setsockopt( xpub_socket, ZMQ_RCVHWM, &zero, sizeof(zero) ); + zmq_setsockopt( xpub_socket, ZMQ_LINGER, &zero, sizeof(zero) ); + // bind the sockets // zmq_bind(xsub_socket, "tcp://127.0.0.1:5555"); diff --git a/sequencerd/sequence.cpp b/sequencerd/sequence.cpp index 8c673580..e7f8ad0b 100644 --- a/sequencerd/sequence.cpp +++ b/sequencerd/sequence.cpp @@ -98,10 +98,10 @@ namespace Sequencer { // when it re-acquires fineacquire_mtx inside fineacquire_cv.wait(). std::lock_guard lock(this->fineacquire_mtx); if ( has_running ) { - this->is_fineacquire_running.store( running, std::memory_order_relaxed ); + this->is_fineacquire_running.store( running ); } if ( has_locked ) { - this->is_fineacquire_locked.store( locked, std::memory_order_relaxed ); + this->is_fineacquire_locked.store( locked ); } this->fineacquire_cv.notify_all(); } @@ -154,11 +154,11 @@ namespace Sequencer { // Store under the mutex so the writes are visible to any waiter's predicate // when it re-acquires acam_mtx inside acam_cv.wait(). std::lock_guard lock(this->acam_mtx); - this->is_acam_guiding.store( acquired, std::memory_order_relaxed ); + this->is_acam_guiding.store( acquired ); if ( has_mode ) { - this->is_acam_acquiring.store( acquiring, std::memory_order_relaxed ); + this->is_acam_acquiring.store( acquiring ); } - this->acam_pubtime.store( pubtime, std::memory_order_relaxed ); + this->acam_pubtime.store( pubtime ); this->acam_cv.notify_all(); } /***** Sequencer::Sequence::handletopic_acamd ******************************/ @@ -834,9 +834,13 @@ namespace Sequencer { if (!this->is_science_frame_transfer) { logwrite( function, "waiting for readout" ); std::unique_lock lock(this->camerad_mtx); - this->camerad_cv.wait( lock, [this]() { - return this->can_expose.load() || this->cancel_flag.load(); - } ); + while ( !this->camerad_cv.wait_for( lock, std::chrono::seconds(15), + [this]() { return this->can_expose.load() || this->cancel_flag.load(); } ) ) { + logwrite( function, "timeout waiting for readout — requesting snapshot" ); + lock.unlock(); + this->request_snapshot(); + lock.lock(); + } } this->wait_state_manager.clear( Sequencer::SEQ_WAIT_READOUT ); @@ -905,9 +909,13 @@ namespace Sequencer { this->broadcast.notice( function, "waiting for camera to be ready to expose"); - this->camerad_cv.wait( lock, [this]() { - return( this->can_expose.load() || this->cancel_flag.load() ); - } ); + while ( !this->camerad_cv.wait_for( lock, std::chrono::seconds(30), + [this]() { return this->can_expose.load() || this->cancel_flag.load(); } ) ) { + logwrite( function, "timeout waiting for camera ready — requesting snapshot" ); + lock.unlock(); + this->request_snapshot(); + lock.lock(); + } if (this->cancel_flag.load()) { logwrite(function, "sequence cancelled"); @@ -935,7 +943,7 @@ namespace Sequencer { // send two commands, one for each if (!activechans.str().empty()) { std::string cmd = CAMERAD_ACTIVATE + activechans.str(); - if (this->camerad.send(cmd, reply)!=NO_ERROR) { + if (this->camerad.send(cmd, reply, 12000)!=NO_ERROR) { logwrite( function, "ERROR sending \""+cmd+"\": "+reply); throw std::runtime_error("camera returned "+reply); } @@ -1964,9 +1972,16 @@ namespace Sequencer { } } - // Ask if all devices use frame transfer + // Ask if all devices use frame transfer. The reply is expected to be a + // "yes"/"no" token followed by " DONE". An empty or non-confirming reply + // would silently leave is_science_frame_transfer in the wrong state, which + // controls whether the readout wait is entered. // - this->camerad.send( CAMERAD_FRAMETRANSFER+" all", reply ); + if ( this->camerad.send( CAMERAD_FRAMETRANSFER+" all", reply ) != NO_ERROR + || reply.find("DONE") == std::string::npos ) { + logwrite( function, "ERROR querying frame transfer state: no confirmation (reply=\""+reply+"\")" ); + throw std::runtime_error("querying camera frame transfer state"); + } this->is_science_frame_transfer = ( reply.find("yes") != std::string::npos ); this->thread_error_manager.clear( THR_CAMERA_INIT ); // success @@ -2015,10 +2030,15 @@ namespace Sequencer { throw std::runtime_error("no connection to camera"); } - // send all of the epilogue commands + // send all of the epilogue commands. Log but do not abort on failure: + // shutdown must continue regardless so power-off can complete. // for ( const auto &cmd : this->camera_epilogue ) { - this->camerad.command( cmd ); + std::string reply; + if ( this->camerad.command( cmd, reply ) != NO_ERROR + || reply.find("DONE") == std::string::npos ) { + logwrite( function, "ERROR sending epilogue command \""+cmd+"\" (reply=\""+reply+"\")" ); + } } // disconnect me from camerad, irrespective of any previous error @@ -2687,12 +2707,15 @@ namespace Sequencer { logwrite( function, "[DEBUG] sending expose command" ); - // Send the EXPOSE command to camera daemon on the non-blocking port and don't wait for reply + // Send the EXPOSE command to camera daemon and wait for the reply. + // Also verify the reply contains "DONE": command_timeout returns NO_ERROR + // whenever the reply does not contain "ERROR", including when the reply is + // empty because the socket was lost and no response was ever received. message.str(""); message << CAMERAD_EXPOSE << " " << this->target.nexp; -// if ( this->camerad.async( message.str() ) != NO_ERROR ) { -// if ( this->camerad.send( message.str(), reply ) != NO_ERROR ) { - if ( this->camerad.command_timeout( message.str(), reply, 30000 ) != NO_ERROR ) { - this->broadcast.error( function, "sending camera "+message.str() ); + if ( this->camerad.command_timeout( message.str(), reply, 30000 ) != NO_ERROR + || reply.find("DONE") == std::string::npos ) { + message.str(""); message << "sending camera expose: no confirmation (reply=\"" << reply << "\")"; + this->broadcast.error( function, message.str() ); this->thread_error_manager.set( THR_TRIGGER_EXPOSURE ); // tell the world this thread had an error this->target.update_state( Sequencer::TARGET_PENDING ); // return the target state to pending this->wait_state_manager.clear( Sequencer::SEQ_WAIT_EXPOSE ); // clear EXPOSE bit @@ -3728,7 +3751,7 @@ namespace Sequencer { cmd << " " << reqstatestr; logwrite( function, "switching plug "+plug+" "+reqstatestr ); error = this->powerd.send( cmd.str(), reply ); - if ( error != NO_ERROR || reply.find(" DONE") != std::string::npos ) { + if ( error != NO_ERROR || reply.find("DONE") == std::string::npos ) { logwrite( function, "ERROR switching plug: "+plug+" "+reqstatestr ); continue; } @@ -4024,6 +4047,7 @@ namespace Sequencer { retstring.append( " isready [ ? ]\n" ); retstring.append( " moveto [ ? | ]\n" ); retstring.append( " notify [ ? ]\n" ); + retstring.append( " ping \n" ); retstring.append( " pause [ ? ]\n" ); retstring.append( " pending [ ? ]\n" ); retstring.append( " targetinfo [ ? ]\n" ); @@ -4897,6 +4921,63 @@ namespace Sequencer { logwrite( function, message.str() ); } } + else + + // --------------------------------------------------------- + // ping -- exercise inter-daemon communication round-trip + // --------------------------------------------------------- + // + if ( testname == "ping" ) { + if ( tokens.size() < 2 ) { + retstring = "usage: test ping "; + logwrite( function, "ERROR no daemon name provided" ); + return ERROR; + } + + const std::map daemon_map = { + { "acamd", &this->acamd }, + { "calibd", &this->calibd }, + { "camerad", &this->camerad }, + { "flexured", &this->flexured }, + { "focusd", &this->focusd }, + { "powerd", &this->powerd }, + { "slicecamd", &this->slicecamd }, + { "slitd", &this->slitd }, + { "tcsd", &this->tcsd } + }; + + auto daemon_it = daemon_map.find( tokens[1] ); + if ( daemon_it == daemon_map.end() ) { + retstring = "unknown daemon: " + tokens[1]; + logwrite( function, "ERROR "+retstring ); + return ERROR; + } + + Common::DaemonClient *daemon_ptr = daemon_it->second; + + if ( this->connect_to_daemon( *daemon_ptr ) == ERROR ) { + retstring = "could not connect to " + tokens[1]; + logwrite( function, "ERROR "+retstring ); + return ERROR; + } + + message.str(""); message << "sending \"isopen\" to " << tokens[1]; + logwrite( function, message.str() ); + + std::string reply; + long send_error = daemon_ptr->send( "isopen", reply ); + + message.str(""); message << "reply from " << tokens[1] << ": \"" << reply << "\""; + logwrite( function, message.str() ); + + retstring = reply; + + if ( send_error != NO_ERROR ) { + message.str(""); message << "ERROR sending \"isopen\" to " << tokens[1]; + logwrite( function, message.str() ); + return ERROR; + } + } else { // ----------------------------------------------------