From 10b552b3e643589d7275dd48a30030a5c5872708 Mon Sep 17 00:00:00 2001 From: David Hale Date: Wed, 20 May 2026 00:02:04 -0700 Subject: [PATCH 01/12] adds the missing subscribers to camerad for header data collection --- camerad/astrocam.cpp | 282 ++++++++++++++++++++----------------------- camerad/astrocam.h | 103 +++++----------- camerad/camerad.cpp | 2 +- 3 files changed, 160 insertions(+), 227 deletions(-) diff --git a/camerad/astrocam.cpp b/camerad/astrocam.cpp index cb6c547e..2899c355 100644 --- a/camerad/astrocam.cpp +++ b/camerad/astrocam.cpp @@ -67,6 +67,68 @@ namespace AstroCam { /***** AstroCam::Interface::handletopic_snapshot ****************************/ + // Each subscriber handler caches the latest full JSON snapshot from its + // provider, keyed by topic. The JSON->FITS-keyword conversion is deferred + // to exposure lock-in (see do_expose / add_cached_telem). + // + void Interface::handletopic_calib( const nlohmann::json &jmessage ) { + std::unique_lock lock(live_telemetry_mtx); + this->live_telemetry[Topic::CALIBD] = jmessage; + } + void Interface::handletopic_flexure( const nlohmann::json &jmessage ) { + std::unique_lock lock(live_telemetry_mtx); + this->live_telemetry[Topic::FLEXURED] = jmessage; + } + void Interface::handletopic_focus( const nlohmann::json &jmessage ) { + std::unique_lock lock(live_telemetry_mtx); + this->live_telemetry[Topic::FOCUSD] = jmessage; + } + void Interface::handletopic_power( const nlohmann::json &jmessage ) { + std::unique_lock lock(live_telemetry_mtx); + this->live_telemetry[Topic::POWERD] = jmessage; + } + void Interface::handletopic_slit( const nlohmann::json &jmessage ) { + std::unique_lock lock(live_telemetry_mtx); + this->live_telemetry[Topic::SLITD] = jmessage; + } + void Interface::handletopic_targetinfo( const nlohmann::json &jmessage ) { + std::unique_lock lock(live_telemetry_mtx); + this->live_telemetry[Topic::TARGETINFO] = jmessage; + } + void Interface::handletopic_tcs( const nlohmann::json &jmessage ) { + std::unique_lock lock(live_telemetry_mtx); + this->live_telemetry[Topic::TCSD] = jmessage; + } + void Interface::handletopic_thermal( const nlohmann::json &jmessage ) { + std::unique_lock lock(live_telemetry_mtx); + this->live_telemetry[Topic::THERMALD] = jmessage; + } + + + /***** AstroCam::Interface::get_live_airmass ********************************/ + /** + * @brief return the latest airmass from cached tcsd pub-sub telemetry + * @details AIRMASS is averaged over the exposure in dothread_shutter and + * written to systemkeys, so it is intentionally not part of the + * FITS telemkeys. Returns NAN when no valid (on-sky) value is + * available. + * @return airmass as double, or NAN if unavailable + * + */ + double Interface::get_live_airmass() { + std::unique_lock lock(live_telemetry_mtx); + auto it = this->live_telemetry.find( Topic::TCSD ); + if ( it != this->live_telemetry.end() ) { + const auto &jmsg = it->second; + if ( jmsg.contains( Key::Tcsd::AIRMASS ) && jmsg.at( Key::Tcsd::AIRMASS ).is_number() ) { + return jmsg.at( Key::Tcsd::AIRMASS ).get(); + } + } + return NAN; + } + /***** AstroCam::Interface::get_live_airmass ********************************/ + + long NewAstroCam::new_expose( std::string nseq_in ) { logwrite( "NewAstroCam::new_expose", nseq_in ); return( NO_ERROR ); @@ -1948,9 +2010,9 @@ namespace AstroCam { interface.do_native( "SPC" ); } - // get the airmass now + // get the latest airmass collected from tcsd telemetry now // - interface.collect_telemetry_key( "tcsd", "AIRMASS", airmass0 ); + airmass0 = interface.get_live_airmass(); // If configured, send a command to the ARC controller to open // the shutter. This is not connected to the shutter but can be @@ -2010,9 +2072,9 @@ namespace AstroCam { interface.broadcast.notice( function, "external shutter closed at "+timestring ); } - // get the airmass again + // get the latest airmass again // - interface.collect_telemetry_key( "tcsd", "AIRMASS", airmass1 ); + airmass1 = interface.get_live_airmass(); // average airmass // @@ -2633,6 +2695,34 @@ namespace AstroCam { /***** AstroCam::Interface::dothread_monitor_exposure_pending ***************/ + /***** AstroCam::add_cached_telem ******************************************/ + /** + * @brief add one provider's cached JSON telemetry into a FITS Header + * @details Primary tables route to the primary header; Extension tables + * carry a channel and route to the extension (elmo) map. This is + * the same keyinfo routing the subscriber handlers used to do, + * now applied once at lock-in from the cached JSON snapshot. + * @param[in,out] telem Header to populate + * @param[in] jmsg cached JSON snapshot for one provider + * @param[in] keys keyinfo table (Primary[] or Extension[]) + * + */ + template + static void add_cached_telem( Common::Header &telem, + const nlohmann::json &jmsg, + const KeyT (&keys)[N] ) { + for ( const auto &k : keys ) { + if constexpr ( std::is_same_v ) { + telem.add_json_key( jmsg, k.jkey, k.keyword, k.comment, k.type, EXT, k.chan ); + } + else { + telem.add_json_key( jmsg, k.jkey, k.jkey, k.comment, k.type, PRI ); + } + } + } + /***** AstroCam::add_cached_telem ******************************************/ + + /***** AstroCam::Interface::do_expose ***************************************/ /** * @brief initiate an exposure @@ -2705,9 +2795,32 @@ namespace AstroCam { logwrite( function, message.str() ); #endif - // Collect telemetry, which will be stored in camera_info.telemkeys + // telemetry is locked-in here -- + // build the FITS telemetry header from the latest JSON snapshots cached by + // the subscriber handlers. Built fresh so a provider that has gone silent + // does not leave stale keys behind. // - this->collect_telemetry(); + { + std::unique_lock lock(live_telemetry_mtx); + + Common::Header telem; + + auto add = [&]( const std::string &topic, const auto &keytable ) { + auto it = this->live_telemetry.find( topic ); + if ( it != this->live_telemetry.end() ) add_cached_telem( telem, it->second, keytable ); + }; + + add( Topic::CALIBD, FitsHeaderKeys::CalibInfoKeys ); // primary + add( Topic::POWERD, FitsHeaderKeys::PowerInfoKeys ); // primary + add( Topic::SLITD, FitsHeaderKeys::SlitInfoKeys ); // primary + add( Topic::TARGETINFO, FitsHeaderKeys::TargetInfoKeys ); // primary + add( Topic::TCSD, FitsHeaderKeys::TcsInfoKeys ); // primary + add( Topic::FLEXURED, FitsHeaderKeys::FlexureInfoKeys ); // extension + add( Topic::FOCUSD, FitsHeaderKeys::FocusInfoKeys ); // extension + add( Topic::THERMALD, FitsHeaderKeys::ThermalInfoKeys ); // extension + + this->camera_info.telemkeys = telem; + } // Make a copy of this->camera_info for this particular exposure buffer number. // This expinfo will be used for this particular exposure. @@ -2810,11 +2923,9 @@ namespace AstroCam { timespec timenow = Time::getTimeNow(); // get the time NOW std::string timestring = timestamp_from( timenow ); // format that time as YYYY-MM-DDTHH:MM:SS.sss double mjd = mjd_from( timenow ); // modified Julian date of start - double airmass=NAN; - - // get the airmass from tcsd telemetry now + // get the latest airmass collected from tcsd telemetry now // - this->collect_telemetry_key( "tcsd", "AIRMASS", airmass ); + double airmass = this->get_live_airmass(); this->fitsinfo[this_expbuf]->systemkeys.primary().addkey( "EXPSTART", timestring, "exposure start time" ); this->fitsinfo[this_expbuf]->systemkeys.primary().addkey( "MJD0", mjd, "exposure start time (modified Julian Date)" ); @@ -3093,27 +3204,6 @@ namespace AstroCam { /***** AstroCam::Interface::make_telemetry_message **************************/ - /***** AstroCam::Interface::collect_telemetry *******************************/ - /** - * @brief send the TELEMREQUEST command to each configured daemon to get telemetry - * @details This overloaded version accepts a name, for the case where - * telemetry is needed from one provider only (e.g. TCS) - * @param[in] name name of provider from TELEM_PROVIDER config key - * @param[out] retstring serialized string of json telemetry message - * - */ - void Interface::collect_telemetry(const std::string name, std::string &retstring) { - Common::DaemonClient jclient("", "\n", JEOF ); - auto it = this->telemetry_providers.find(name); - if ( it != this->telemetry_providers.end() ) { - jclient.set_name(it->first); - jclient.set_port(it->second); - jclient.connect(); - jclient.command(TELEMREQUEST, retstring); - jclient.disconnect(); - } - return; - } /***** AstroCam::Interface::collect_telemetry *******************************/ /** * @brief send the TELEMREQUEST command to each configured daemon to get telemetry @@ -3172,35 +3262,6 @@ namespace AstroCam { return ERROR; } - /** - * @struct PrimaryInfo - * @brief holds info for extracting primary header keys from json message - * @details The value in jmessage with key jkey will be added to the primary - * FITS header, using comment and optional keyword. If keyword is - * not specified then the header keyword uses jkey. - */ - struct PrimaryInfo { - std::string jkey; // key to extract from jmessage - std::string keyword; // optional FITS keyword (uses jkey if not specified) - std::string comment; // FITS key comment - std::string type=""; // optional keyword datatype - }; - - /** - * @struct ExtensionInfo - * @brief holds info for extracting extension header keys from json message - * @details The value in jmessage with key jkey will be added to the FITS - * header specified by channel chan, using comment and optional keyword. - * If keyword is not specified then the header keyword uses jkey. - */ - struct ExtensionInfo { - std::string chan; // chan name identifies which extension - std::string jkey; // key to extract from jmessage - std::string keyword; // optional FITS keyword (uses jkey if not specified) - std::string comment; // FITS key comment - std::string type=""; // optional keyword datatype - }; - auto &telemkeys = this->camera_info.telemkeys; // use to select whether to write to extension or primary @@ -3238,17 +3299,7 @@ namespace AstroCam { // telemetry from calibd goes in the primary header // if ( messagetype == "calibinfo" ) { - const PrimaryInfo keyarray[] = { - {"MODFEAR", "", "FeAr lamp modulator pow dut per"}, - {"MODTHAR", "", "ThAr lamp modulator pow dut per"}, - {"MODBLCON", "", "Blue continuum modulator pow dut per"}, - {"MODBLBYP", "", "Blue bypass modulator pow dut per"}, - {"MODRDCON", "", "Red continuum modulator pow dut per"}, - {"MODRDBYP", "", "Red bypass modulator pow dut per"}, - {"CALCOVER", "", "calib cover state"}, - {"CALDOOR", "", "calib door state"} - }; - for ( const auto &keyinfo : keyarray ) { + for ( const auto &keyinfo : FitsHeaderKeys::CalibInfoKeys ) { telemkeys.add_json_key(jmessage, keyinfo.jkey, keyinfo.jkey, keyinfo.comment, keyinfo.type, pri); } } @@ -3257,21 +3308,7 @@ namespace AstroCam { // telemetry from flexured goes in the extension header corresponding to the channel // if ( messagetype == "flexureinfo" ) { - const ExtensionInfo keyarray[] = { - {"I", "FLXSPE_I", "FLXSPE", "I flexure spectral axis 2 (X) in um"}, - {"I", "FLXSPA_I", "FLXSPA", "I flexure spatial axis 3 (Y) in um"}, - {"I", "FLXPIS_I", "FLXPIS", "I flexure piston axis 1 (Z) in um"}, - {"R", "FLXSPE_R", "FLXSPE", "R flexure spectral axis 2 (X) in um"}, - {"R", "FLXSPA_R", "FLXSPA", "R flexure spatial axis 3 (Y) in um"}, - {"R", "FLXPIS_R", "FLXPIS", "R flexure piston axis 1 (Z) in um"}, - {"G", "FLXSPE_G", "FLXSPE", "G flexure spectral axis 2 (X) in um"}, - {"G", "FLXSPA_G", "FLXSPA", "G flexure spatial axis 3 (Y) in um"}, - {"G", "FLXPIS_G", "FLXPIS", "G flexure piston axis 1 (Z) in um"}, - {"U", "FLXSPE_U", "FLXSPE", "U flexure spectral axis 2 (X) in um"}, - {"U", "FLXSPA_U", "FLXSPA", "U flexure spatial axis 3 (Y) in um"}, - {"U", "FLXPIS_U", "FLXPIS", "U flexure piston axis 1 (Z) in um"} - }; - for ( const auto &keyinfo : keyarray ) { + for ( const auto &keyinfo : FitsHeaderKeys::FlexureInfoKeys ) { telemkeys.add_json_key(jmessage, keyinfo.jkey, keyinfo.keyword, keyinfo.comment, keyinfo.type, ext, keyinfo.chan); } } @@ -3280,13 +3317,7 @@ namespace AstroCam { // telemetry from focusd goes in the extension header corresponding to the channel // if ( messagetype == "focusinfo" ) { - const ExtensionInfo keyarray[] = { - {"I", "FOCUSI", "FOCUS", "science camera I focus position in mm" }, - {"R", "FOCUSR", "FOCUS", "science camera R focus position in mm" }, - {"G", "FOCUSG", "FOCUS", "science camera G focus position in mm" }, - {"U", "FOCUSU", "FOCUS", "science camera U focus position in mm" } - }; - for ( const auto &keyinfo : keyarray ) { + for ( const auto &keyinfo : FitsHeaderKeys::FocusInfoKeys ) { telemkeys.add_json_key(jmessage, keyinfo.jkey, keyinfo.keyword, keyinfo.comment, keyinfo.type, ext, keyinfo.chan); } } @@ -3295,15 +3326,7 @@ namespace AstroCam { // telemetry from powerd goes in the primary header // if ( messagetype == "powerinfo" ) { - const PrimaryInfo keyarray[] = { - {"LAMPTHAR", "", "is ThAr lamp on"}, - {"LAMPFEAR", "", "is FeAr lamp on"}, - {"LAMPBLUC", "", "is blue Xe continuum lamp on"}, - {"LAMPREDC", "", "is red continuum lamp on"}, - {"LAMPXE", "", "is Xe lamp on"}, - {"LAMPINCA", "", "is Incandescent lamp on"} - }; - for ( const auto &keyinfo : keyarray ) { + for ( const auto &keyinfo : FitsHeaderKeys::PowerInfoKeys ) { telemkeys.add_json_key(jmessage, keyinfo.jkey, keyinfo.jkey, keyinfo.comment, keyinfo.type, pri); } } @@ -3312,14 +3335,7 @@ namespace AstroCam { // telemetry from calibd goes in the primary header // if ( messagetype == "slitinfo" ) { - const PrimaryInfo keyarray[] = { - {"SLITW", "", "slit width in arcsec"}, - {"SLITO", "", "slit offset in arcsec"}, - {"SLITPOSA", "", "slit actuator A position in mm"}, - {"SLITPOSA", "", "slit actuator A position in mm"}, - {"SLITPOSB", "", "slit actuator B position in mm"} - }; - for ( const auto &keyinfo : keyarray ) { + for ( const auto &keyinfo : FitsHeaderKeys::SlitInfoKeys ) { telemkeys.add_json_key(jmessage, keyinfo.jkey, keyinfo.jkey, keyinfo.comment, keyinfo.type, pri); } } @@ -3328,19 +3344,7 @@ namespace AstroCam { // targetinfo telemetry comes from sequencerd and goes in the primary header // if ( messagetype == "targetinfo" ) { - const PrimaryInfo keyarray[] = { - {"OBS_ID", "", "Observation ID", "INT"}, - {"NAME", "", "target name", "STRING"}, -// {"BINSPECT", "", "binning in spectral direction"}, -// {"BINSPAT", "", "binning in spatial direction"}, - {"SLITA", "", "slit angle in deg", "FLOAT"}, - {"POINTMDE", "", "pointing mode", "STRING"}, - {"RA", "", "requested Right Ascension in J2000", "STRING"}, - {"DECL", "", "requested Declination in J2000", "STRING"} - }; - for ( const auto &keyinfo : keyarray ) { - message.str(""); message << "[DEBUG] targetinfo key " << keyinfo.jkey << "=" << jmessage[keyinfo.jkey]; - logwrite(function,message.str()); + for ( const auto &keyinfo : FitsHeaderKeys::TargetInfoKeys ) { telemkeys.add_json_key(jmessage, keyinfo.jkey, keyinfo.jkey, keyinfo.comment, keyinfo.type, pri); } } @@ -3350,20 +3354,7 @@ namespace AstroCam { // AIRMASS is intentionally left out since it is handled differently // if ( messagetype == "tcsinfo" ) { - const PrimaryInfo keyarray[] = { - {"CASANGLE", "", "TCS reported Cassegrain angle in deg", "FLOAT"}, - {"HA", "", "hour angle"}, - {"RAOFFSET", "", "offset Right Ascension"}, - {"DECLOFFS", "", "offset Declination"}, - {"TELRA", "", "TCS reported Right Ascension"}, - {"TELDEC", "", "TCS reported Declination"}, - {"AZ", "", "TCS reported azimuth"}, - {"ZENANGLE", "", "TCS reported Zenith angle", "FLOAT"}, - {"DOMEAZ", "", "TCS reported dome azimuth", "FLOAT"}, - {"DOMESHUT", "", "dome shutters"}, - {"TELFOCUS", "", "TCS reported telescope focus position in mm", "FLOAT"} - }; - for ( const auto &keyinfo : keyarray ) { + for ( const auto &keyinfo : FitsHeaderKeys::TcsInfoKeys ) { telemkeys.add_json_key(jmessage, keyinfo.jkey, keyinfo.jkey, keyinfo.comment, keyinfo.type, pri); } } @@ -3372,22 +3363,7 @@ namespace AstroCam { // telemetry from thermald // if ( messagetype == "thermalinfo" ) { - const ExtensionInfo keyarray[] = { - {"I", "TCCD_I", "CCDTEMP", "I CCD temperature in Kelvin", "FLOAT"}, - {"R", "TCCD_R", "CCDTEMP", "R CCD temperature in Kelvin", "FLOAT"}, - {"G", "TCCD_G", "CCDTEMP", "G CCD temperature in Kelvin", "FLOAT"}, - {"U", "TCCD_U", "CCDTEMP", "U CCD temperature in Kelvin", "FLOAT"}, - - {"I", "TCOLL_I", "COLTEMP", "I collimator temp in deg C", "FLOAT"}, - {"R", "TCOLL_R", "COLTEMP", "R collimator temp in deg C", "FLOAT"}, - {"G", "TCOLL_G", "COLTEMP", "G collimator temp in deg C", "FLOAT"}, - - {"I", "TFOCUS_I", "FOCTEMP", "I focus temp in deg C", "FLOAT"}, - {"R", "TFOCUS_R", "FOCTEMP", "R focus temp in deg C", "FLOAT"}, - {"G", "TFOCUS_G", "FOCTEMP", "G focus temp in deg C", "FLOAT"}, - {"U", "TFOCUS_U", "FOCTEMP", "U focus temp in deg C", "FLOAT"} - }; - for ( const auto &keyinfo : keyarray ) { + for ( const auto &keyinfo : FitsHeaderKeys::ThermalInfoKeys ) { telemkeys.add_json_key(jmessage, keyinfo.jkey, keyinfo.keyword, keyinfo.comment, keyinfo.type, ext, keyinfo.chan); } } diff --git a/camerad/astrocam.h b/camerad/astrocam.h index 116072ba..0ee13a61 100644 --- a/camerad/astrocam.h +++ b/camerad/astrocam.h @@ -23,6 +23,7 @@ #include #include +#include "fits_header_defs.h" #include "utilities.h" #include "common.h" #include "camera.h" @@ -637,7 +638,23 @@ namespace AstroCam { useframes(true) { topic_handlers = { { Topic::SNAPSHOT, std::function( - [this](const nlohmann::json &msg) { handletopic_snapshot(msg); } ) } + [this](const nlohmann::json &msg) { handletopic_snapshot(msg); } ) }, + { Topic::CALIBD, std::function( + [this](const nlohmann::json &msg) { handletopic_calib(msg); } ) }, + { Topic::FLEXURED, std::function( + [this](const nlohmann::json &msg) { handletopic_flexure(msg); } ) }, + { Topic::FOCUSD, std::function( + [this](const nlohmann::json &msg) { handletopic_focus(msg); } ) }, + { Topic::POWERD, std::function( + [this](const nlohmann::json &msg) { handletopic_power(msg); } ) }, + { Topic::SLITD, std::function( + [this](const nlohmann::json &msg) { handletopic_slit(msg); } ) }, + { Topic::TARGETINFO, std::function( + [this](const nlohmann::json &msg) { handletopic_targetinfo(msg); } ) }, + { Topic::TCSD, std::function( + [this](const nlohmann::json &msg) { handletopic_tcs(msg); } ) }, + { Topic::THERMALD, std::function( + [this](const nlohmann::json &msg) { handletopic_thermal(msg); } ) } }; this->pFits.resize( NUM_EXPBUF ); // pre-allocate FITS_file object pointers for each exposure buffer @@ -670,6 +687,9 @@ namespace AstroCam { Camera::Camera camera; /// instantiate a Camera object Camera::Information camera_info; /// this is the main camera_info object + std::map live_telemetry; ///< latest JSON snapshot per provider, keyed by Topic + std::mutex live_telemetry_mtx; + std::unique_ptr publisher; ///< publisher object std::string publisher_address; ///< publish socket endpoint std::string publisher_topic; ///< my default topic for publishing @@ -693,6 +713,14 @@ namespace AstroCam { void publish_status(bool force=false); void request_snapshot(); void handletopic_snapshot(const nlohmann::json &jmessage_in); + void handletopic_calib(const nlohmann::json &jmessage_in); + void handletopic_flexure(const nlohmann::json &jmessage_in); + void handletopic_focus(const nlohmann::json &jmessage_in); + void handletopic_power(const nlohmann::json &jmessage_in); + void handletopic_slit(const nlohmann::json &jmessage_in); + void handletopic_targetinfo(const nlohmann::json &jmessage_in); + void handletopic_tcs(const nlohmann::json &jmessage_in); + void handletopic_thermal(const nlohmann::json &jmessage_in); Common::Broadcaster broadcast { this->publisher, Daemon::CAMERAD }; @@ -1212,7 +1240,7 @@ std::vector> fitsinfo; long do_expose(int nexp_in); void make_telemetry_message( std::string &retstring ); void collect_telemetry(); - void collect_telemetry(std::string name, std::string &retstring); + double get_live_airmass(); ///< latest airmass from cached tcsd telemetry, or NAN long native(std::string cmdstr); long native(std::string cmdstr, std::string &retstring); @@ -1254,77 +1282,6 @@ std::vector> fitsinfo; // int get_image_rows() { return this->rows; }; // REMOVE // int get_image_cols() { return this->cols; }; // REMOVE - using json = nlohmann::json; - template - void collect_telemetry_key( const std::string &name, const std::string &key, T &value ) { - const std::string function="AstroCam::Interface::collect_telemetry_key"; - std::stringstream message; - - std::string retstring; - - // collect the telemetry from this one named provider - // - collect_telemetry(name, retstring); - - // extract the correct typed value for the requested key from that - // telemetry message - // - try { - // get a JSON message from the serialized return string - // - nlohmann::json jmessage = nlohmann::json::parse( retstring ); - - // extract the value from the JSON message using jkey as the key - // - auto jvalue = jmessage.at( key ); - - if ( jvalue == nullptr ) return; - - if constexpr ( std::is_same::value ) { - if ( jvalue.type() == json::value_t::boolean ) { - value = jvalue.template get(); - } - } - else - if constexpr ( std::is_same::value ) { - if ( jvalue.type() == json::value_t::number_integer ) { - value = jvalue.template get(); - } - } - else - if constexpr ( std::is_same::value ) { - if ( jvalue.type() == json::value_t::number_unsigned ) { - value = jvalue.template get(); - } - } - else - if constexpr ( std::is_same::value || std::is_same::value ) { - if ( jvalue.type() == json::value_t::number_float ) { - value = jvalue.template get(); - } - } - else - if constexpr ( std::is_same::value ) { - if ( jvalue.type() == json::value_t::string ) { - value = jvalue.template get(); - } - } - else { - message << "ERROR unknown type for key " << key << " from provider " << name; - logwrite( function, message.str() ); - return; - } - } - catch( const json::exception &e ) { - message << "JSON exception parsing value for key " << key << " from provider " << name << ": " << e.what(); - logwrite( function, message.str() ); - } - catch( const std::exception &e ) { - message << "ERROR exception parsing value for key " << key << " from provider " << name << ": " << e.what(); - logwrite( function, message.str() ); - } - return; - } }; /***** AstroCam::Interface **************************************************/ diff --git a/camerad/camerad.cpp b/camerad/camerad.cpp index a026a5bc..3c115610 100644 --- a/camerad/camerad.cpp +++ b/camerad/camerad.cpp @@ -184,8 +184,8 @@ int main(int argc, char **argv) { Topic::FLEXURED, Topic::FOCUSD, Topic::POWERD, - Topic::TARGETINFO, Topic::SLITD, + Topic::TARGETINFO, Topic::TCSD, Topic::THERMALD } ) == ERROR ) { logwrite(function, "ERROR initializing publisher-subscriber handler"); From 32f0bb92e9dfaec7ba58cd98e67938ed0bbb5cf8 Mon Sep 17 00:00:00 2001 From: David Hale Date: Wed, 20 May 2026 00:11:45 -0700 Subject: [PATCH 02/12] fix: update missed home state in slitd --- slitd/slit_interface.cpp | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/slitd/slit_interface.cpp b/slitd/slit_interface.cpp index 96edc275..42fa3d6f 100644 --- a/slitd/slit_interface.cpp +++ b/slitd/slit_interface.cpp @@ -218,9 +218,23 @@ namespace Slit { return ERROR; } - // All the work is done by the PI motor interface class + // The work is done by the PI motor interface class, which blocks until + // homing is complete. + // + long error = this->motorinterface.home( arg, retstring ); + + // Homing changes the home state and the actuator positions. Refresh the + // home state (as open() does) and read back the positions with get(), + // which refreshes width/offset/posA/posB and publishes the new status. // - return this->motorinterface.home( arg, retstring ); + if ( error == NO_ERROR ) { + std::string homestate; + this->is_home( "", homestate ); + status.ishome = ( homestate=="true" ? true : false ); + error = this->get( retstring ); + } + + return error; } /***** Slit::Interface::home ************************************************/ From b0834ab9da864dcf4a072db21c99c5318e6df64f Mon Sep 17 00:00:00 2001 From: David Hale Date: Wed, 20 May 2026 00:45:06 -0700 Subject: [PATCH 03/12] fix: lamp modulator naming was off by 1 --- sequencerd/sequencer_interface.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sequencerd/sequencer_interface.cpp b/sequencerd/sequencer_interface.cpp index 810feb63..1821616b 100644 --- a/sequencerd/sequencer_interface.cpp +++ b/sequencerd/sequencer_interface.cpp @@ -1000,9 +1000,9 @@ namespace Sequencer { info.domelamp[i] = on_off(tokens.at(11+i)); } - // tokens 13-19 + // tokens 13-19 -- modulator numbers are {1:6} for (size_t i=0; i<6; i++) { - info.lampmod[i] = on_off(tokens.at(13+i)); + info.lampmod[i+1] = on_off(tokens.at(13+i)); } } catch (const std::exception &e) { From fe320762c58b9b1ce104d0aa60f5dc000f57340b Mon Sep 17 00:00:00 2001 From: David Hale Date: Wed, 20 May 2026 01:09:53 -0700 Subject: [PATCH 04/12] powerd publishes status --- powerd/power_interface.cpp | 72 ++++++++++++++++++-------------------- powerd/power_interface.h | 20 ++++++++--- powerd/power_server.cpp | 20 +---------- powerd/powerd.cpp | 6 ++-- 4 files changed, 55 insertions(+), 63 deletions(-) diff --git a/powerd/power_interface.cpp b/powerd/power_interface.cpp index bfa2dfdb..316b7bb1 100644 --- a/powerd/power_interface.cpp +++ b/powerd/power_interface.cpp @@ -286,15 +286,15 @@ namespace Power { /***** Power::Interface::list ***********************************************/ - /***** Power::Interface::status *********************************************/ + /***** Power::Interface::get_status *****************************************/ /** * @brief list status of all plug devices * @param[out] retstring reference to string to contain the status of plug devices * @return ERROR | NO_ERROR | HELP * */ - long Interface::status( std::string args, std::string &retstring ) { - std::string function = "Power::Interface::status"; + long Interface::get_status( std::string args, std::string &retstring ) { + std::string function = "Power::Interface::get_status"; std::stringstream message, plugid; // Help @@ -344,7 +344,7 @@ namespace Power { message << plugid.str() << " " << status_string << " " << this->plugname[ plugid.str() ] << "\n"; - this->telemetry_map[this->plugname[plugid.str()]] = status; + this->status.plugstate[this->plugname[plugid.str()]] = status; } } message << this->missing; // notify of missing hardware, if any @@ -357,9 +357,13 @@ namespace Power { retstring = message.str(); + // status has been refreshed from hardware; publish if it changed + // + this->publish_status(); + return NO_ERROR; } - /***** Power::Interface::status *********************************************/ + /***** Power::Interface::get_status *****************************************/ /***** Power::Interface::command ********************************************/ @@ -580,6 +584,15 @@ namespace Power { return( ERROR ); break; } + + // After a successful set (ON/OFF/BOOT), re-read state which refreshes the + // status struct and publishes the change. Reads (command==-1) don't change state. + // + if ( command >= 0 && error == NO_ERROR ) { + std::string dontcare; + this->get_status( "", dontcare ); + } + return( error ); } catch ( const std::exception &e ) { @@ -591,62 +604,47 @@ namespace Power { /***** Power::Interface::command ********************************************/ - /***** Power::Interface::publish_snapshot ***********************************/ + /***** Power::Interface::publish_status *************************************/ /** - * @brief assembles a telemetry message - * @details This creates a JSON message for my telemetry info, then serializes - * it into a std::string ready to be sent over a socket. - * @param[out] retstring string containing the serialization of the JSON message + * @brief publish the power state, but only if it changed (or forced) + * @param[in] force optional (default=false) publish irrespective of change * * powerd telemetry is reported as true|false if the plug is on * */ - void Interface::publish_snapshot() { - std::string dontcare; - this->publish_snapshot(dontcare); - } - void Interface::publish_snapshot( std::string &retstring ) { + void Interface::publish_status( bool force ) { - // assemble the telemetry into a json message - // Set a messagetype keyword to indicate what kind of message this is. + // unless forced, only publish if the power state changed // - nlohmann::json jmessage_out; - jmessage_out["source"] = "powerd"; // source of this telemetry + if ( !force && this->status == this->last_published_status ) return; - // get power status - // - this->status("", retstring); + nlohmann::json jmessage_out; + jmessage_out[Key::SOURCE] = Topic::POWERD; - // fill the jmessage_out with boolean values for the key/val pairs just retrieved - // to represent the powered state of the plug ("on" is true) + // a plug is reported true when it is on // - for ( const auto &[key,val] : this->telemetry_map ) { - jmessage_out[key]=(val==1?true:false); + for ( const auto &[key,val] : this->status.plugstate ) { + jmessage_out[key] = ( val==1 ? true : false ); } - // for backwards compatibility - jmessage_out["messagetype"]="powerinfo"; - retstring = jmessage_out.dump(); // serialize the json message into retstring - retstring.append(JEOF); // append the JSON message terminator + this->last_published_status = this->status; - // publish the jmessage - // try { this->publisher->publish( jmessage_out ); } catch( const std::exception &e ) { - logwrite( "Power::Interface::publish_snapshot", + logwrite( "Power::Interface::publish_status", "ERROR publishing message: "+std::string(e.what()) ); } } - /***** Power::Interface::publish_snapshot ***********************************/ + /***** Power::Interface::publish_status *************************************/ void Interface::handletopic_snapshot( const nlohmann::json &jmessage ) { - // If my name is in the jmessage then publish my snapshot + // If my topic is in the jmessage then publish my status // - if ( jmessage.contains( Power::DAEMON_NAME ) ) { - this->publish_snapshot(); + if ( jmessage.contains( Topic::POWERD ) ) { + this->publish_status(); } } } diff --git a/powerd/power_interface.h b/powerd/power_interface.h index b6a21874..e8dff43c 100644 --- a/powerd/power_interface.h +++ b/powerd/power_interface.h @@ -184,7 +184,18 @@ namespace Power { zmqpp::context context; bool class_initialized; size_t numdev; ///< number of NPS devices, or "units" - std::map telemetry_map; ///< map of plug status 0|1 indexed by plug nam + + /** + * @struct Status + * @brief published power state: plug power (0=off,1=on,-1=err) indexed by plug name + */ + struct Status { + std::map plugstate; + bool operator==(const Status &o) const { return plugstate == o.plugstate; } + bool operator!=(const Status &o) const { return !(*this == o); } + }; + Status status; ///< current power state + Status last_published_status; ///< last published power state public: Interface() @@ -195,7 +206,7 @@ namespace Power { should_subscriber_thread_run(false) { topic_handlers = { - { "_snapshot", std::function( + { Topic::SNAPSHOT, std::function( [this](const nlohmann::json &msg) { handletopic_snapshot(msg); } ) } }; } @@ -249,9 +260,8 @@ namespace Power { bool isopen(); ///< is the NPS socket connection open? long command( std::string cmd, std::string &retstring ); ///< parse and form a command to send to the NPS unit void list( std::string args, std::string &retstring ); ///< list plug devices - long status( std::string args, std::string &retstring ); ///< status of all plug devices - void publish_snapshot(); ///< make serialized JSON telemetry message - void publish_snapshot( std::string &retstring ); ///< make serialized JSON telemetry message + long get_status( std::string args, std::string &retstring ); ///< status of all plug devices + void publish_status( bool force=false ); ///< publish power state on change (or force) }; /***** Power::Interface *****************************************************/ diff --git a/powerd/power_server.cpp b/powerd/power_server.cpp index 48699065..1d8a2e56 100644 --- a/powerd/power_server.cpp +++ b/powerd/power_server.cpp @@ -606,28 +606,12 @@ namespace Power { // power status // if ( cmd == POWERD_STATUS ) { - ret = this->interface.status( args, retstring ); + ret = this->interface.get_status( args, retstring ); if ( ret==NO_ERROR ) { ret=NOTHING; if ( sock.Write( retstring ) < 0 ) connection_open=false; } } - else - - // telemetry request - // - if ( cmd == SNAPSHOT || cmd == TELEMREQUEST ) { - if ( args=="?" || args=="help" ) { - retstring=TELEMREQUEST+"\n"; - retstring.append( " Returns a serialized JSON message containing telemetry\n" ); - retstring.append( " information, terminated with \"EOF\\n\".\n" ); - ret=HELP; - } - else { - this->interface.publish_snapshot( retstring ); - ret = JSON; - } - } // all other commands go to the powerd interface for parsing // @@ -680,8 +664,6 @@ namespace Power { if ( sock.Write( retstring ) < 0 ) connection_open=false; } - if ( ret==NO_ERROR ) this->interface.publish_snapshot(); - if (!sock.isblocking()) break; // Non-blocking connection exits immediately. // Keep blocking connection open for interactive session. } diff --git a/powerd/powerd.cpp b/powerd/powerd.cpp index 3763fd7f..a4154096 100644 --- a/powerd/powerd.cpp +++ b/powerd/powerd.cpp @@ -126,8 +126,10 @@ int main(int argc, char **argv) { } std::this_thread::sleep_for( std::chrono::milliseconds(500) ); - // publish snapshot of my telemetry so the world knows I'm online - powerd.interface.publish_snapshot(); + // read current state, then force-publish so the world knows I'm online + std::string dontcare; + powerd.interface.get_status( "", dontcare ); + powerd.interface.publish_status( true ); // This will pre-thread N_THREADS threads. // The 0th thread is reserved for the blocking port, and the rest are for the non-blocking port. From 0cdabe1b72805f53c518c8310bc5fb2dbc588bba Mon Sep 17 00:00:00 2001 From: David Hale Date: Wed, 20 May 2026 01:45:56 -0700 Subject: [PATCH 05/12] sequencer writes IMGTYPE keyword --- Config/sequencerd.cfg.in | 29 ++++++++++++++-------------- sequencerd/sequence.cpp | 31 +++++++++++++++++++++++++++++- sequencerd/sequence.h | 1 + sequencerd/sequencer_interface.cpp | 11 +++++++---- sequencerd/sequencer_interface.h | 1 + 5 files changed, 54 insertions(+), 19 deletions(-) diff --git a/Config/sequencerd.cfg.in b/Config/sequencerd.cfg.in index 5a9cbbce..bb89e6f1 100644 --- a/Config/sequencerd.cfg.in +++ b/Config/sequencerd.cfg.in @@ -162,7 +162,7 @@ ACQUIRE_MIN_REPEAT=2 # minimum number of sequential successful a ACQUIRE_TCS_MAX_OFFSET=60 # the maximum allowable offset sent to the TCS, in arcsec # Calibration Settings -# CAL_TARGET=(name caldoor calcover U G R I lampthar lampfear lampbluc lampredc lolamp hilamp mod1 mod2 ... mod6) +# CAL_TARGET=(name caldoor calcover U G R I lampthar lampfear lampbluc lampredc lolamp hilamp mod1 mod2 ... mod6 imgtype) # # where name must be "DEFAULT" or start with "CAL_" # caldoor = open | close @@ -170,21 +170,22 @@ ACQUIRE_TCS_MAX_OFFSET=60 # the maximum allowable offset sent to the # U,G,R,I = on | off # indicates which channels to enable/disable # lamp* = on | off # lamp power # mod* = on | off # lamp modulator -# for a total of 19 required parameters +# imgtype = # FITS IMGTYPE keyword +# for a total of 20 required parameters # name=SCIENCE defines science target operation # -# name door cover U G R I thar fear bluc redc llmp hlmp mod1 mod2 mod3 mod4 mod5 mod6 -CAL_TARGET=(CAL_THAR open close on on on on on on on on off off off off off off off on ) -CAL_TARGET=(CAL_FEAR open close on on on on on on on on off off on off off off off off) -CAL_TARGET=(CAL_THAR_UG open close on on off off on on on on off off off off off off off on ) -CAL_TARGET=(CAL_FEAR_UG open close on on off off on on on on off off on off off off off off) -CAL_TARGET=(CAL_CONTR open close on on on on on on on on off off off off off on off off) -CAL_TARGET=(CAL_CONTB open close on on on on on on on on off off off off off off on off) -CAL_TARGET=(CAL_DOME close open on on on on off off off off off on off off off off off off) -CAL_TARGET=(CAL_DOME_UG close open on on off off off off off off off on off off off off off off) -CAL_TARGET=(CAL_BIAS close close on on on on off off off off off off off off off off off off) -CAL_TARGET=(CAL_DARK close close on on on on off off off off off off off off off off off off) -CAL_TARGET=(SCIENCE close open on on on on off off off off off off off off off off off off) +# name door cover U G R I thar fear bluc redc llmp hlmp mod1 mod2 mod3 mod4 mod5 mod6 imgtype +CAL_TARGET=(CAL_THAR open close on on on on on on on on off off off off off off off on THAR ) +CAL_TARGET=(CAL_FEAR open close on on on on on on on on off off on off off off off off FEAR ) +CAL_TARGET=(CAL_THAR_UG open close on on off off on on on on off off off off off off off on THAR ) +CAL_TARGET=(CAL_FEAR_UG open close on on off off on on on on off off on off off off off off FEAR ) +CAL_TARGET=(CAL_CONTR open close on on on on on on on on off off off off off on off off CONT ) +CAL_TARGET=(CAL_CONTB open close on on on on on on on on off off off off off off on off CONT ) +CAL_TARGET=(CAL_DOME close open on on on on off off off off off on off off off off off off DOMEFLAT) +CAL_TARGET=(CAL_DOME_UG close open on on off off off off off off off on off off off off off off DOMEFLAT) +CAL_TARGET=(CAL_BIAS close close on on on on off off off off off off off off off off off off BIAS ) +CAL_TARGET=(CAL_DARK close close on on on on off off off off off off off off off off off off DARK ) +CAL_TARGET=(SCIENCE close open on on on on off off off off off off off off off off off off SCI ) # miscellaneous # diff --git a/sequencerd/sequence.cpp b/sequencerd/sequence.cpp index fe18ade3..52b3a4ad 100644 --- a/sequencerd/sequence.cpp +++ b/sequencerd/sequence.cpp @@ -2705,7 +2705,9 @@ namespace Sequencer { this->arm_readout_flag = true; // enables the async_listener to look for the readout and clear the EXPOSE bit - logwrite( function, "[DEBUG] sending expose command" ); + this->set_imgtype(); + + logwrite( function, "sending expose command" ); // Send the EXPOSE command to camera daemon and wait for the reply. // Also verify the reply contains "DONE": command_timeout returns NO_ERROR @@ -2786,6 +2788,33 @@ namespace Sequencer { /***** Sequencer::Sequence::modify_exptime **********************************/ + /***** Sequencer::Sequence::set_imgtype *************************************/ + /** + * @brief set IMGTYPE FITS keyword in camerad before each exposure + * @details Looks up the imgtype field from the CalibrationTarget config for + * the current target and sends it to camerad as a key command. + * @return ERROR|NO_ERROR + * + */ + long Sequence::set_imgtype() { + const std::string function("Sequencer::Sequence::set_imgtype"); + std::string reply; + + const std::string calname = std::string(this->target.iscal ? this->target.name : "SCIENCE"); + const std::string imgtype = this->caltarget.get_info(calname).imgtype; + + const std::string cmd = CAMERAD_KEY + " IMGTYPE=" + imgtype + + (this->target.iscal ? "//Calibration" : ""); + if ( this->camerad.send( cmd, reply ) != NO_ERROR ) { + logwrite( function, "ERROR sending '"+cmd+"': "+reply ); + return ERROR; + } + + return NO_ERROR; + } + /***** Sequencer::Sequence::set_imgtype *************************************/ + + /***** Sequencer::Sequence::startup *****************************************/ /** * @brief performs nightly startup diff --git a/sequencerd/sequence.h b/sequencerd/sequence.h index 2e04272c..fac6b2cd 100644 --- a/sequencerd/sequence.h +++ b/sequencerd/sequence.h @@ -579,6 +579,7 @@ namespace Sequencer { // These are various jobs that are done in their own threads // long trigger_exposure(); ///< trigger and wait for exposure + long set_imgtype(); ///< set IMGTYPE void abort_process(); ///< tries to abort everything void stop_exposure(); ///< stop exposure timer in progress long repeat_exposure(); ///< repeat the last exposure diff --git a/sequencerd/sequencer_interface.cpp b/sequencerd/sequencer_interface.cpp index 1821616b..9ac519c4 100644 --- a/sequencerd/sequencer_interface.cpp +++ b/sequencerd/sequencer_interface.cpp @@ -959,9 +959,9 @@ namespace Sequencer { auto size = Tokenize( args, tokens, " \t" ); - // there must be 19 args. see cfg file for complete description - if ( size != 19 ) { - logwrite(function, "ERROR bad config file. expected 19 but received " + // there must be 20 args. see cfg file for complete description + if ( size != 20 ) { + logwrite(function, "ERROR bad config file. expected 20 but received " +std::to_string(size)+" parameters"); return ERROR; } @@ -1000,10 +1000,13 @@ namespace Sequencer { info.domelamp[i] = on_off(tokens.at(11+i)); } - // tokens 13-19 -- modulator numbers are {1:6} + // tokens 13-18 -- modulator numbers are {1:6} for (size_t i=0; i<6; i++) { info.lampmod[i+1] = on_off(tokens.at(13+i)); } + + // token[19] is FITS IMGTYPE + info.imgtype = tokens.at(19); } catch (const std::exception &e) { logwrite(function, "ERROR: "+std::string(e.what())); diff --git a/sequencerd/sequencer_interface.h b/sequencerd/sequencer_interface.h index f4569e7c..d2ba283b 100644 --- a/sequencerd/sequencer_interface.h +++ b/sequencerd/sequencer_interface.h @@ -139,6 +139,7 @@ namespace Sequencer { ///< struct holds all calibration parameters not in the target database typedef struct { std::string name; // calibration target name + std::string imgtype; // FITS IMGTYPE keyword for target std::map channel_active; // true=on bool caldoor; // true=open bool calcover; // true=open From a574eadbf94d25c4b49b36571b0020665f7b4293 Mon Sep 17 00:00:00 2001 From: David Hale Date: Wed, 20 May 2026 02:35:32 -0700 Subject: [PATCH 06/12] updates default slit width --- Config/sequencerd.cfg.in | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Config/sequencerd.cfg.in b/Config/sequencerd.cfg.in index bb89e6f1..69d2f9a2 100644 --- a/Config/sequencerd.cfg.in +++ b/Config/sequencerd.cfg.in @@ -87,7 +87,7 @@ CAMERA_EPILOGUE=(close) # slit init and shutdown states # formatted as (width offset) # -SLIT__INIT=(0.5 3.0) +SLIT__INIT=(0.4 3.0) SLIT__SHUTDOWN= # ACAM init and shutdown states @@ -107,7 +107,7 @@ CALIB_DOOR__SHUTDOWN=close # Virtual Slit Mode slit offset positions # units are arcseconds # -VIRTUAL_SLITW_ACQUIRE=0.5 # slit width during acquire +VIRTUAL_SLITW_ACQUIRE=0.4 # slit width during acquire VIRTUAL_SLITO_ACQUIRE=-3.0 # slit offset for acquiring target VIRTUAL_SLITO_EXPOSE=3.0 # slit offset for science exposure From 5859cf400d13e4948d02aeae9efdf74c4e659251 Mon Sep 17 00:00:00 2001 From: David Hale Date: Wed, 20 May 2026 02:40:20 -0700 Subject: [PATCH 07/12] fix: drain DaemonClient send buffer from DONTWAIT or timed-out sends --- common/common.cpp | 17 +++++++++++++++++ sequencerd/sequence.cpp | 7 ++++++- 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/common/common.cpp b/common/common.cpp index 149d225c..ec54f431 100644 --- a/common/common.cpp +++ b/common/common.cpp @@ -758,6 +758,23 @@ namespace Common { command += this->term_write; } + // Drain any stale reply that was buffered from a prior send whose reply was + // never read (e.g. a DONTWAIT send, or a send that timed out before the reply + // arrived and reconnected without draining). Without this, the stale CID-tagged + // reply would be read as the response to the current command and cause a + // mismatch, propagating through every subsequent send until the socket is cycled. + // Safe: client_access mutex is held, so no concurrent sender touches this socket; + // and in the command/reply protocol daemons never push unsolicited TCP data. + // + { + std::string discard; + while ( this->socket.Poll(0) > 0 ) { + if ( this->socket.Read( discard, this->term_read ) <= 0 ) break; + message.str(""); message << "drained stale buffered reply from " << this->name << ": \"" << discard << "\""; + logwrite( function, message.str() ); + } + } + int trys=0; int retry_limit=3; int pollret=0; diff --git a/sequencerd/sequence.cpp b/sequencerd/sequence.cpp index 52b3a4ad..17d7bc64 100644 --- a/sequencerd/sequence.cpp +++ b/sequencerd/sequence.cpp @@ -2187,7 +2187,12 @@ namespace Sequencer { } // Send casangle using tcsd wrapper for RINGGO command - // do not wait for reply + // do not wait for reply — intentional: Cassegrain rotation can take tens of + // seconds and the sequence continues while the operator guides on-target. + // + // WATCH: tcsd still sends a CID-tagged reply that accumulates in the socket + // receive buffer unread. DaemonClient::send() drains stale data before each + // new write, which prevents that orphaned reply from poisoning the next send. // { std::stringstream ringgo_cmd; From 1d7dd2031f1f687e15e53657de175a2a0885a452 Mon Sep 17 00:00:00 2001 From: David Hale Date: Wed, 20 May 2026 02:46:09 -0700 Subject: [PATCH 08/12] calibd publishes status --- calibd/calib_interface.cpp | 90 ++++++++++++++++++++------------------ calibd/calib_interface.h | 19 ++++++-- calibd/calib_server.cpp | 18 +------- calibd/calibd.cpp | 4 +- 4 files changed, 67 insertions(+), 64 deletions(-) diff --git a/calibd/calib_interface.cpp b/calibd/calib_interface.cpp index 216412ce..96eee47b 100644 --- a/calibd/calib_interface.cpp +++ b/calibd/calib_interface.cpp @@ -687,89 +687,95 @@ namespace Calib { /***** Calib::Interface::close **********************************************/ - /***** Calib::Interface::publish_snapshot ***********************************/ + /***** Calib::Interface::get_status *****************************************/ /** - * @brief publishes a snapshot of my telemetry - * @details This publishes a JSON message containing a snapshot of my - * telemetry. + * @brief read current calib state (modulators + actuators) into status * */ - void Interface::publish_snapshot() { - std::string dontcare; - this->publish_snapshot(dontcare); - } - void Interface::publish_snapshot( std::string &retstring ) { - const std::string function("Calib::Interface::publish_snapshot"); + void Interface::get_status() { std::string ret_isopen; - // assemble the telemetry into a json message - // - nlohmann::json jmessage_out; - - jmessage_out["source"] = "calibd"; // source of this telemetry + this->status.values.clear(); - // get the status for each modulator in the map - // assemble message string of "pow dut per" indexed by name + // modulator state: "pow dut per" indexed by modulator name // - this->is_open("lampmod",ret_isopen); + this->is_open( "lampmod", ret_isopen ); for ( const auto &[num,name] : this->modulator.modmap_num ) { - // initialize these on each pass double dut=NAN; double per=NAN; int pow=-1; std::stringstream retstream; - if (ret_isopen=="true") this->modulator.status( num, dut, per, pow ); + if ( ret_isopen=="true" ) this->modulator.status( num, dut, per, pow ); switch(pow) { case 0 : retstream << "off "; break; case 1 : retstream << "on " ; break; default: retstream << "err "; break; } - if ( std::isnan(dut) ) retstream << "nan "; else retstream << dut << " "; if ( std::isnan(per) ) retstream << "nan "; else retstream << per; - jmessage_out[name] = retstream.str(); + this->status.values[name] = retstream.str(); } - // get a copy of the motormap and - // loop through all motors, getting their actuator position + // actuator state: CAL indexed by uppercase motor name // - auto _motormap = this->motion.motorinterface.get_motormap(); // local copy of motormap - + auto _motormap = this->motion.motorinterface.get_motormap(); std::string retmotion; - this->is_open("motion",retmotion); - bool ismotion = (retmotion=="true" ? true : false); - + this->is_open( "motion", retmotion ); + bool ismotion = ( retmotion=="true" ); for ( const auto &mot : _motormap ) { - if (ismotion) this->motion.get( mot.first, retstring ); // get position of actuator - std::string key="CAL"+mot.first; // key = CALxxxx - make_uppercase(key); // make key uppercase - jmessage_out[ key ] = (ismotion?retstring:"not_connected"); // store in JSON message + std::string posret; + if ( ismotion ) this->motion.get( mot.first, posret ); + std::string key = "CAL" + mot.first; + make_uppercase( key ); + this->status.values[key] = ( ismotion ? posret : "not_connected" ); + } + } + /***** Calib::Interface::get_status *****************************************/ + + + /***** Calib::Interface::publish_status *************************************/ + /** + * @brief publish calib state, but only if it changed (or forced) + * @param[in] force optional (default=false) publish irrespective of change + * + */ + void Interface::publish_status( bool force ) { + + // refresh current state from hardware + // + this->get_status(); + + // unless forced, only publish if the state changed + // + if ( !force && this->status == this->last_published_status ) return; + + nlohmann::json jmessage_out; + jmessage_out[Key::SOURCE] = Topic::CALIBD; + for ( const auto &[key,val] : this->status.values ) { + jmessage_out[key] = val; } - // for backwards compatibility - jmessage_out["messagetype"]="calibinfo"; - retstring = jmessage_out.dump(); // serialize the json message into retstring - retstring.append(JEOF); // append the JSON message terminator + this->last_published_status = this->status; try { this->publisher->publish( jmessage_out ); } catch( const std::exception &e ) { - logwrite( "Calib::Interface::publish_snapshot", + logwrite( "Calib::Interface::publish_status", "ERROR publishing message: "+std::string(e.what()) ); } } - /***** Calib::Interface::publish_snapshot ***********************************/ + /***** Calib::Interface::publish_status *************************************/ void Interface::handletopic_snapshot( const nlohmann::json &jmessage ) { - // If my name is in the jmessage then publish my snapshot + // If my topic is in the jmessage then publish my status // - if ( jmessage.contains( Calib::DAEMON_NAME ) ) { - this->publish_snapshot(); + if ( jmessage.contains( Topic::CALIBD ) ) { + this->publish_status(); } else if ( jmessage.contains( "test" ) ) { diff --git a/calibd/calib_interface.h b/calibd/calib_interface.h index 430b67be..2639653e 100644 --- a/calibd/calib_interface.h +++ b/calibd/calib_interface.h @@ -145,6 +145,20 @@ namespace Calib { private: zmqpp::context context; + /** + * @struct Status + * @brief published calib state: value strings indexed by key (modulators + actuators) + */ + struct Status { + std::map values; + bool operator==(const Status &o) const { return values == o.values; } + bool operator!=(const Status &o) const { return !(*this == o); } + }; + Status status; ///< current calib state + Status last_published_status; ///< last published calib state + + void get_status(); ///< refresh status from hardware + public: Interface() : context(), @@ -153,7 +167,7 @@ namespace Calib { should_subscriber_thread_run(false) { topic_handlers = { - { "_snapshot", std::function( + { Topic::SNAPSHOT, std::function( [this](const nlohmann::json &msg) { handletopic_snapshot(msg); } ) } }; } @@ -188,8 +202,7 @@ namespace Calib { void handletopic_snapshot( const nlohmann::json &jmessage ); - void publish_snapshot(); - void publish_snapshot(std::string &retstring); + void publish_status( bool force=false ); ///< publish calib state on change (or force) long open(std::string args, std::string &retstring); long is_open(std::string args, std::string &retstring); diff --git a/calibd/calib_server.cpp b/calibd/calib_server.cpp index 73460ae7..0dc335fa 100644 --- a/calibd/calib_server.cpp +++ b/calibd/calib_server.cpp @@ -629,22 +629,6 @@ namespace Calib { if ( cmd == CALIBD_LAMPMOD ) { ret = this->interface.modulator.control( args, retstring ); } - else - - // telemetry request - // - if ( cmd == SNAPSHOT || cmd == TELEMREQUEST ) { - if ( args=="?" || args=="help" ) { - retstring=TELEMREQUEST+"\n"; - retstring.append( " Returns a serialized JSON message containing telemetry\n" ); - retstring.append( " information, terminated with \"EOF\\n\".\n" ); - ret=HELP; - } - else { - this->interface.publish_snapshot( retstring ); - ret = JSON; - } - } // unknown commands generate an error // @@ -692,7 +676,7 @@ namespace Calib { if ( sock.Write( retstring ) < 0 ) connection_open=false; } - if ( ret==NO_ERROR ) this->interface.publish_snapshot(); + if ( ret==NO_ERROR ) this->interface.publish_status(); if (!sock.isblocking()) break; // Non-blocking connection exits immediately. // Keep blocking connection open for interactive session. diff --git a/calibd/calibd.cpp b/calibd/calibd.cpp index 2c9da1cd..70663f55 100644 --- a/calibd/calibd.cpp +++ b/calibd/calibd.cpp @@ -126,8 +126,8 @@ int main(int argc, char **argv) { } std::this_thread::sleep_for( std::chrono::milliseconds(500) ); - // publish snapshot of my telemetry so the world knows I'm online - calibd.interface.publish_snapshot(); + // read current state and force-publish so the world knows I'm online + calibd.interface.publish_status( true ); // This will pre-thread N_THREADS threads. // The 0th thread is reserved for the blocking port, and the rest are for the non-blocking port. From 925b063ce5d14334ec9b4f290a60b7fca104ecab Mon Sep 17 00:00:00 2001 From: David Hale Date: Wed, 20 May 2026 03:44:51 -0700 Subject: [PATCH 09/12] sequencer skips targets if nexp==0 --- sequencerd/sequence.cpp | 6 ++++++ sequencerd/sequencer_interface.cpp | 5 +++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/sequencerd/sequence.cpp b/sequencerd/sequence.cpp index 17d7bc64..d0e10941 100644 --- a/sequencerd/sequence.cpp +++ b/sequencerd/sequence.cpp @@ -628,6 +628,12 @@ namespace Sequencer { if ( targetstate == TargetInfo::TARGET_FOUND ) { // target found, get the threads going + if (this->target.nexp==0) { // skip target if nexp==0 + message.str(""); message << "skipping target " << this->target.name; + logwrite(function, message.str()); + continue; + } + // If the TCS is not ready and the target contains TCS coordinates, // then we cannot proceed. // diff --git a/sequencerd/sequencer_interface.cpp b/sequencerd/sequencer_interface.cpp index 9ac519c4..a4787c2b 100644 --- a/sequencerd/sequencer_interface.cpp +++ b/sequencerd/sequencer_interface.cpp @@ -735,9 +735,10 @@ namespace Sequencer { } } - // number of exposures must be >= 1 + // number of exposures must be >= 0 + // class constructed with 1 but an intentional 0 means skip this target // - if (this->nexp <= 0) this->nexp=1; + if (this->nexp < 0) this->nexp=1; return NO_ERROR; } From 6ff50a981869c3510a0e5db785d7087592bbdaa2 Mon Sep 17 00:00:00 2001 From: David Hale Date: Wed, 20 May 2026 14:59:43 -0700 Subject: [PATCH 10/12] fix: focusd publishes on status change --- focusd/focus_interface.cpp | 67 +++++++++++++++++++++----------------- focusd/focus_interface.h | 29 +++++++++++++++-- focusd/focus_server.cpp | 18 ++-------- focusd/focusd.cpp | 4 +++ 4 files changed, 71 insertions(+), 47 deletions(-) diff --git a/focusd/focus_interface.cpp b/focusd/focus_interface.cpp index 0c735641..2c3b4aef 100644 --- a/focusd/focus_interface.cpp +++ b/focusd/focus_interface.cpp @@ -537,25 +537,13 @@ namespace Focus { /***** Focus::Interface::send_command ***************************************/ - /***** Focus::Interface::make_telemetry_message *****************************/ + /***** Focus::Interface::get_status *****************************************/ /** - * @brief assembles a telemetry message - * @details This creates a JSON message for telemetry info, then serializes - * it into a std::string ready to be sent over a socket. - * @param[out] retstring string containing the serialization of the JSON message + * @brief read current focus positions into status * */ - void Interface::make_telemetry_message( std::string &retstring ) { - const std::string function="Focus::Interface::make_telemetry_message"; - - // assemble the telemetry into a json message - // Set a messagetype keyword to indicate what kind of message this is. - // - nlohmann::json jmessage; - jmessage["messagetype"]="focusinfo"; - - // get focus position for each motor - // + void Interface::get_status() { + this->status.positions.clear(); auto _motormap = this->motorinterface.get_motormap(); for ( const auto &mot : _motormap ) { auto name = mot.second.name; @@ -564,29 +552,50 @@ namespace Focus { float position = NAN; std::string posname; this->motorinterface.get_pos( name, axis, addr, position, posname ); + this->status.positions[ "FOCUS"+mot.first ] = position; + } + } + /***** Focus::Interface::get_status *****************************************/ - std::string key = "FOCUS" + mot.first; - // assign the position or NaN to a key in the JSON jmessage - // - if ( !std::isnan(position) ) jmessage[key]=position; else jmessage[key]="NAN"; - } + /***** Focus::Interface::publish_status ***********************************/ + /** + * @brief publish focus state, but only if it changed (or forced) + * @param[in] force optional (default=false) publish irrespective of change + * + */ + void Interface::publish_status( bool force ) { - retstring = jmessage.dump(); // serialize the json message into retstring + // refresh current state from hardware + // + this->get_status(); - this->publisher->publish(retstring); + // unless forced, only publish if the state changed + // + if ( !force && this->status == this->last_published_status ) return; - retstring.append(JEOF); // append the JSON message terminator + nlohmann::json jmessage; + jmessage[Key::SOURCE] = Topic::FOCUSD; + for ( const auto &[key,pos] : this->status.positions ) { + if ( !std::isnan(pos) ) jmessage[key] = pos; else jmessage[key] = "NAN"; + } + + this->last_published_status = this->status; - return; + try { + this->publisher->publish( jmessage ); + } + catch( const std::exception &e ) { + logwrite( "Focus::Interface::publish_status", + "ERROR publishing message: "+std::string(e.what()) ); + } } - /***** Focus::Interface::make_telemetry_message *****************************/ + /***** Focus::Interface::publish_status ***********************************/ void Interface::handletopic_snapshot( const nlohmann::json &jmessage ) { - if ( jmessage.contains( Focus::DAEMON_NAME ) ) { - std::string dontcare; - this->make_telemetry_message(dontcare); + if ( jmessage.contains( Topic::FOCUSD ) ) { + this->publish_status(); } else if ( jmessage.contains( "test" ) ) { diff --git a/focusd/focus_interface.h b/focusd/focus_interface.h index 5bc6183e..1e8d2858 100644 --- a/focusd/focus_interface.h +++ b/focusd/focus_interface.h @@ -17,6 +17,7 @@ #include #include #include +#include #define FOCUS_MOVE_TIMEOUT 5000 ///< timeout in msec for moves #define FOCUS_HOME_TIMEOUT 5000 ///< timeout in msec for home @@ -46,6 +47,30 @@ namespace Focus { zmqpp::context context; size_t numdev; bool class_initialized; + + /** + * @struct Status + * @brief published focus state: focus position (mm) by channel; NaN if unavailable + */ + struct Status { + std::map positions; + bool operator==(const Status &o) const { + if ( positions.size() != o.positions.size() ) return false; + for ( const auto &[k,v] : positions ) { + auto it = o.positions.find(k); + if ( it == o.positions.end() ) return false; + if ( std::isnan(v) && std::isnan(it->second) ) continue; // NaN==NaN treated equal + if ( v != it->second ) return false; + } + return true; + } + bool operator!=(const Status &o) const { return !(*this == o); } + }; + Status status; ///< current focus state + Status last_published_status; ///< last published focus state + + void get_status(); ///< refresh status from hardware + public: Interface() : context(), @@ -56,7 +81,7 @@ namespace Focus { should_subscriber_thread_run(false) { topic_handlers = { - { "_snapshot", std::function( + { Topic::SNAPSHOT, std::function( [this](const nlohmann::json &msg) { handletopic_snapshot(msg); } ) } }; } @@ -107,7 +132,7 @@ namespace Focus { long stop(); ///< send the stop-all-motion command to all controllers long send_command( const std::string &name, std::string cmd ); ///< writes the raw command as received to the master controller, no reply long send_command( const std::string &name, std::string cmd, std::string &retstring ); ///< writes command?, reads reply - void make_telemetry_message( std::string &retstring ); ///< assembles a telemetry message + void publish_status( bool force=false ); ///< publish focus state on change (or force) long test( std::string args, std::string &retstring ); diff --git a/focusd/focus_server.cpp b/focusd/focus_server.cpp index 0a8b4f1b..b46949a7 100644 --- a/focusd/focus_server.cpp +++ b/focusd/focus_server.cpp @@ -616,22 +616,6 @@ namespace Focus { } else - // send telemetry upon request - // - if ( cmd == TELEMREQUEST ) { - if ( args=="?" || args=="help" ) { - retstring=TELEMREQUEST+"\n"; - retstring.append( " Returns a serialized JSON message containing telemetry\n" ); - retstring.append( " information, terminated with \"EOF\\n\".\n" ); - ret=HELP; - } - else { - this->interface.make_telemetry_message( retstring ); - ret = JSON; - } - } - else - // test routines // if ( cmd == FOCUSD_TEST ) { @@ -684,6 +668,8 @@ namespace Focus { if ( sock.Write( retstring ) < 0 ) connection_open=false; } + if ( ret==NO_ERROR ) this->interface.publish_status(); + if (!sock.isblocking()) break; // Non-blocking connection exits immediately. // Keep blocking connection open for interactive session. } diff --git a/focusd/focusd.cpp b/focusd/focusd.cpp index 4926a650..321aa45d 100644 --- a/focusd/focusd.cpp +++ b/focusd/focusd.cpp @@ -126,6 +126,10 @@ int main(int argc, char **argv) { logwrite(function, "ERROR initializing publisher-subscriber handler"); focusd.exit_cleanly(); } + std::this_thread::sleep_for(std::chrono::milliseconds(250)); + + // read current state and force-publish so the world knows I'm online + focusd.interface.publish_status( true ); // This will pre-thread N_THREADS threads. // The 0th thread is reserved for the blocking port, and the rest are for the non-blocking port. From 4bb6f457467e62f5f4885ee0ab2c2cbbb15eb8f7 Mon Sep 17 00:00:00 2001 From: David Hale Date: Wed, 20 May 2026 15:49:06 -0700 Subject: [PATCH 11/12] fix: sequencerd publishes targetinfo remove legacy telemetry from camerad --- acamd/acam_interface.cpp | 10 +- camerad/astrocam.cpp | 268 +------------------------------- camerad/astrocam.h | 2 - common/fits_header_defs.h | 12 +- common/message_keys.h | 11 ++ sequencerd/sequence.cpp | 61 ++++---- sequencerd/sequence.h | 3 +- sequencerd/sequencer_server.cpp | 17 -- 8 files changed, 60 insertions(+), 324 deletions(-) diff --git a/acamd/acam_interface.cpp b/acamd/acam_interface.cpp index 71ab89c7..2b8a7fd4 100644 --- a/acamd/acam_interface.cpp +++ b/acamd/acam_interface.cpp @@ -1639,11 +1639,11 @@ namespace Acam { * */ void Interface::handletopic_targetinfo( const nlohmann::json &jmessage ) { - this->database.add_from_json( jmessage, "OBS_ID" ); - this->database.add_from_json( jmessage, "NAME" ); - this->database.add_from_json( jmessage, "POINTMODE" ); - this->database.add_from_json( jmessage, "RA" ); - this->database.add_from_json( jmessage, "DECL" ); + this->database.add_from_json( jmessage, Key::TargetInfo::OBS_ID ); + this->database.add_from_json( jmessage, Key::TargetInfo::NAME ); + this->database.add_from_json( jmessage, Key::TargetInfo::POINTMODE ); + this->database.add_from_json( jmessage, Key::TargetInfo::RA ); + this->database.add_from_json( jmessage, Key::TargetInfo::DECL ); } /***** Acam::Interface::handletopic_targetinfo ******************************/ diff --git a/camerad/astrocam.cpp b/camerad/astrocam.cpp index 2899c355..b65cc729 100644 --- a/camerad/astrocam.cpp +++ b/camerad/astrocam.cpp @@ -2716,7 +2716,7 @@ namespace AstroCam { telem.add_json_key( jmsg, k.jkey, k.keyword, k.comment, k.type, EXT, k.chan ); } else { - telem.add_json_key( jmsg, k.jkey, k.jkey, k.comment, k.type, PRI ); + telem.add_json_key( jmsg, k.jkey, k.keyword, k.comment, k.type, PRI ); } } } @@ -3204,197 +3204,8 @@ namespace AstroCam { /***** AstroCam::Interface::make_telemetry_message **************************/ - /***** AstroCam::Interface::collect_telemetry *******************************/ - /** - * @brief send the TELEMREQUEST command to each configured daemon to get telemetry - * - */ - void Interface::collect_telemetry() { - std::string retstring; - - // Instantiate a client to communicate with each daemon, - // constructed with no name, newline termination on command writes, - // and JEOF termination on reply reads. - // - Common::DaemonClient jclient("", "\n", JEOF ); - - // Loop through each configured telemetry provider, which is a map of - // ports indexed by daemon name, both of which are used to update - // the jclient object. - // - // Send the command TELEMREQUEST to each daemon and read back the reply into - // retstring, which will be the serialized JSON telemetry message. - // - // handle_json_message() will parse the reply and set the FITS header - // keys in the telemkeys database. - // - for ( const auto &[name, port] : this->telemetry_providers ) { - jclient.set_name(name); - jclient.set_port(port); - jclient.connect(); - jclient.command(TELEMREQUEST, retstring); - jclient.disconnect(); - handle_json_message(retstring); - } - - return; - } - /***** AstroCam::Interface::collect_telemetry *******************************/ - - - /***** AstroCam::Interface::handle_json_message *****************************/ - /** - * @brief parses incoming telemetry messages - * @param[in] message_in serialized JSON message string - * @return ERROR | NO_ERROR - * - */ - long Interface::handle_json_message( std::string message_in ) { - const std::string function="AstroCam::Interface::handle_json_message"; - std::stringstream message; - std::string messagetype; - long error; - - // nothing to do if the message is empty - // - if ( message_in.empty() ) { - logwrite( function, "empty JSON message" ); - return ERROR; - } - - auto &telemkeys = this->camera_info.telemkeys; - - // use to select whether to write to extension or primary - // - bool ext = true; - bool pri = !ext; - - size_t eof_pos = message_in.find(JEOF); - if ( eof_pos != std::string::npos ) message_in.erase(eof_pos); - - try { - nlohmann::json jmessage = nlohmann::json::parse( message_in ); - - // jmessage must not contain key "error" and must contain key "messagetype" - // - if ( !jmessage.contains("error") ) { - if ( jmessage.contains("messagetype") ) { - messagetype = jmessage["messagetype"]; - error = NO_ERROR; - } - else { - logwrite( function, "ERROR received JSON message with no messagetype" ); - error = ERROR; - } - } - else { - logwrite( function, "ERROR in JSON message" ); - error = ERROR; - } - - // If jmessage contained error or no messagetype then get out now. - // - if ( error != NO_ERROR ) return error; - - // telemetry from calibd goes in the primary header - // - if ( messagetype == "calibinfo" ) { - for ( const auto &keyinfo : FitsHeaderKeys::CalibInfoKeys ) { - telemkeys.add_json_key(jmessage, keyinfo.jkey, keyinfo.jkey, keyinfo.comment, keyinfo.type, pri); - } - } - else - - // telemetry from flexured goes in the extension header corresponding to the channel - // - if ( messagetype == "flexureinfo" ) { - for ( const auto &keyinfo : FitsHeaderKeys::FlexureInfoKeys ) { - telemkeys.add_json_key(jmessage, keyinfo.jkey, keyinfo.keyword, keyinfo.comment, keyinfo.type, ext, keyinfo.chan); - } - } - else - - // telemetry from focusd goes in the extension header corresponding to the channel - // - if ( messagetype == "focusinfo" ) { - for ( const auto &keyinfo : FitsHeaderKeys::FocusInfoKeys ) { - telemkeys.add_json_key(jmessage, keyinfo.jkey, keyinfo.keyword, keyinfo.comment, keyinfo.type, ext, keyinfo.chan); - } - } - else - - // telemetry from powerd goes in the primary header - // - if ( messagetype == "powerinfo" ) { - for ( const auto &keyinfo : FitsHeaderKeys::PowerInfoKeys ) { - telemkeys.add_json_key(jmessage, keyinfo.jkey, keyinfo.jkey, keyinfo.comment, keyinfo.type, pri); - } - } - else - // telemetry from calibd goes in the primary header - // - if ( messagetype == "slitinfo" ) { - for ( const auto &keyinfo : FitsHeaderKeys::SlitInfoKeys ) { - telemkeys.add_json_key(jmessage, keyinfo.jkey, keyinfo.jkey, keyinfo.comment, keyinfo.type, pri); - } - } - else - - // targetinfo telemetry comes from sequencerd and goes in the primary header - // - if ( messagetype == "targetinfo" ) { - for ( const auto &keyinfo : FitsHeaderKeys::TargetInfoKeys ) { - telemkeys.add_json_key(jmessage, keyinfo.jkey, keyinfo.jkey, keyinfo.comment, keyinfo.type, pri); - } - } - else - - // telemetry from tcsd goes into primary header - // AIRMASS is intentionally left out since it is handled differently - // - if ( messagetype == "tcsinfo" ) { - for ( const auto &keyinfo : FitsHeaderKeys::TcsInfoKeys ) { - telemkeys.add_json_key(jmessage, keyinfo.jkey, keyinfo.jkey, keyinfo.comment, keyinfo.type, pri); - } - } - else - - // telemetry from thermald - // - if ( messagetype == "thermalinfo" ) { - for ( const auto &keyinfo : FitsHeaderKeys::ThermalInfoKeys ) { - telemkeys.add_json_key(jmessage, keyinfo.jkey, keyinfo.keyword, keyinfo.comment, keyinfo.type, ext, keyinfo.chan); - } - } - else - - // test message - // - if ( messagetype == "test" ) { - message.str(""); message << "received JSON test message: \"" << jmessage["test"].get() << "\""; - logwrite( function, message.str() ); - } - else { - message.str(""); message << "ERROR received unhandled JSON message type \"" << messagetype << "\""; - logwrite( function, message.str() ); - error = ERROR; - } - } - catch ( const nlohmann::json::parse_error &e ) { - message.str(""); message << "ERROR json exception parsing message: " << e.what(); - logwrite( function, message.str() ); - error = ERROR; - } - catch ( const std::exception &e ) { - message.str(""); message << "ERROR parsing message: " << e.what(); - logwrite( function, message.str() ); - error = ERROR; - } - return error; - } - /***** AstroCam::Interface::handle_json_message *****************************/ /***** AstroCam::Interface::do_load_firmware ********************************/ @@ -6659,83 +6470,6 @@ logwrite(function, message.str()); } else // ---------------------------------------------------- - // telem - // ---------------------------------------------------- - // test sending the telem command - // - if ( testname == "telem" ) { - if ( tokens.size() < 2 ) { - logwrite( function, "ERROR expected an argument" ); - retstring="invalid_argument"; - return ERROR; - } - - if ( tokens[1] == "?" || tokens[1] == "help" ) { - retstring = CAMERAD_TEST; - retstring.append( " telem collect | test | calibd | flexured | focusd | tcsd\n" ); - retstring.append( " collect collects telemetry from all daemons\n" ); - retstring.append( " test sends a test JSON message back to myself (camerad)\n" ); - retstring.append( " all other args collect telemetry from named daemon only\n" ); - return HELP; - } - - if ( tokens[1] == "collect" ) { - this->collect_telemetry(); - return NO_ERROR; - } - - Common::DaemonClient jclient("", "\n", JEOF ); - - if ( tokens[1]=="calibd" ) { - jclient.set_name("calibd"); - jclient.set_port(9101); - jclient.connect(); - jclient.command(TELEMREQUEST, retstring); - jclient.disconnect(); - } - else - if ( tokens[1]=="flexured" ) { - jclient.set_name("flexured"); - jclient.set_port(9103); - jclient.connect(); - jclient.command(TELEMREQUEST, retstring); - jclient.disconnect(); - } - else - if ( tokens[1]=="focusd" ) { - jclient.set_name("focusd"); - jclient.set_port(9104); - jclient.connect(); - jclient.command(TELEMREQUEST, retstring); - jclient.disconnect(); - } - else - if ( tokens[1]=="tcsd" ) { - jclient.set_name("tcsd"); - jclient.set_port(9107); - jclient.connect(); - jclient.command(TELEMREQUEST, retstring); - jclient.disconnect(); - } - else - if ( tokens[1]=="test" ) { - nlohmann::json jmessage; - jmessage["messagetype"] = "test"; - jmessage["test"] = "Hello, world!"; - logwrite( function, "returning JSON test message" ); - retstring = jmessage.dump(); - } - else { - jclient.set_name("camerd"); - jclient.set_port(server.nbport); - jclient.connect(); - jclient.command("test json test", retstring); - jclient.disconnect(); - } - this->handle_json_message( retstring ); - } - else - // ---------------------------------------------------- // isready // ---------------------------------------------------- // am I ready for an exposure? diff --git a/camerad/astrocam.h b/camerad/astrocam.h index 0ee13a61..3e948e78 100644 --- a/camerad/astrocam.h +++ b/camerad/astrocam.h @@ -1178,7 +1178,6 @@ std::vector> fitsinfo; Controller* get_active_controller(const int dev); void exposure_progress(); void make_image_keywords( int dev ); - long handle_json_message( std::string message_in ); long parse_spec_info( std::string args ); long parse_det_geometry( std::string args ); long parse_controller_config( std::string args ); @@ -1239,7 +1238,6 @@ std::vector> fitsinfo; long expose(std::string nexp_in); long do_expose(int nexp_in); void make_telemetry_message( std::string &retstring ); - void collect_telemetry(); double get_live_airmass(); ///< latest airmass from cached tcsd telemetry, or NAN long native(std::string cmdstr); long native(std::string cmdstr, std::string &retstring); diff --git a/common/fits_header_defs.h b/common/fits_header_defs.h index a439ef3d..26f8c4a7 100644 --- a/common/fits_header_defs.h +++ b/common/fits_header_defs.h @@ -83,12 +83,12 @@ namespace FitsHeaderKeys { }; const Primary TargetInfoKeys[] = { - { "OBS_ID", "", "Observation ID", "INT" }, - { "NAME", "", "target name", "STRING" }, - { "SLITA", "", "slit angle in deg", "FLOAT" }, - { "POINTMDE", "", "pointing mode", "STRING" }, - { "RA", "", "requested Right Ascension in J2000", "STRING" }, - { "DECL", "", "requested Declination in J2000", "STRING" } + { Key::TargetInfo::OBS_ID.c_str(), "", "Observation ID", "INT" }, + { Key::TargetInfo::NAME.c_str(), "", "target name", "STRING" }, + { Key::TargetInfo::SLITA.c_str(), "", "slit angle in deg", "FLOAT" }, + { Key::TargetInfo::POINTMODE.c_str(), "POINTMDE", "pointing mode", "STRING" }, + { Key::TargetInfo::RA.c_str(), "", "requested Right Ascension in J2000", "STRING" }, + { Key::TargetInfo::DECL.c_str(), "", "requested Declination in J2000", "STRING" } }; const Primary TcsInfoKeys[] = { diff --git a/common/message_keys.h b/common/message_keys.h index 328ef249..7863ed49 100644 --- a/common/message_keys.h +++ b/common/message_keys.h @@ -63,6 +63,17 @@ namespace Key { inline const std::string SHOULD_FINEACQUIRE = "should_fineacquire"; } + namespace TargetInfo { + inline const std::string OBS_ID = "OBS_ID"; + inline const std::string NAME = "NAME"; + inline const std::string SLITA = "SLITA"; + inline const std::string BINSPECT = "BINSPECT"; + inline const std::string BINSPAT = "BINSPAT"; + inline const std::string POINTMODE = "POINTMODE"; + inline const std::string RA = "RA"; + inline const std::string DECL = "DECL"; + } + namespace Camerad { inline const std::string READY = "ready"; inline const std::string SHUTTERTIME = "shuttime_sec"; diff --git a/sequencerd/sequence.cpp b/sequencerd/sequence.cpp index d0e10941..ad06af46 100644 --- a/sequencerd/sequence.cpp +++ b/sequencerd/sequence.cpp @@ -175,6 +175,7 @@ namespace Sequencer { this->publish_seqstate(); this->publish_waitstate(); this->publish_daemonstate(); + this->publish_targetinfo( true ); } /***** Sequencer::Sequence::publish_snapshot *******************************/ @@ -365,6 +366,7 @@ namespace Sequencer { // publish the structured seqstate topic // this->publish_seqstate(); + this->publish_targetinfo(); // targetinfo content is gated on seq_state (READY/RUNNING) this->cv.notify_all(); // emit a NOTICE on Topic::BROADCAST only when the lifecycle state has @@ -652,6 +654,8 @@ namespace Sequencer { this->thread_error_manager.set( THR_SEQUENCE_START ); // report any error break; } + + this->publish_targetinfo(); // publish the now-active target } else // targetstate not TARGET_FOUND if ( targetstate == TargetInfo::TARGET_NOT_FOUND ) { // no target found is an automatic stop @@ -3683,44 +3687,49 @@ namespace Sequencer { /***** Sequencer::Sequence::target_offset ***********************************/ - /***** Sequencer::Sequence::make_telemetry_message **************************/ + /***** Sequencer::Sequence::publish_targetinfo *****************************/ /** - * @brief assembles a telemetry message - * @details This creates a JSON message for my telemetry info, then serializes - * it into a std::string ready to be sent over a socket. - * @param[out] retstring string containing the serialization of the JSON message + * @brief publish target info on Topic::TARGETINFO, on change (or force) + * @details Builds a JSON message of the current target and publishes it + * only when it differs from the last published message, unless + * force is set. The message is empty unless seq state is + * READY or RUNNING. + * @param[in] force optional (default=false) publish irrespective of change * */ - void Sequence::make_telemetry_message( std::string &retstring ) { - // assemble the telemetry I want to report into a json message - // Set a messagetype keyword to indicate what kind of message this is. - // + void Sequence::publish_targetinfo( bool force ) { nlohmann::json jmessage; - jmessage["messagetype"] = "targetinfo"; + jmessage[Key::SOURCE] = Sequencer::DAEMON_NAME; - // fill telemetry message only when READY or RUNNING + // fill telemetry only when READY or RUNNING; otherwise an empty (no-target) message // if ( this->seq_state_manager.are_any_set( Sequencer::SEQ_READY, Sequencer::SEQ_RUNNING ) ) { - // Store unconfigured values as NAN. - // NAN values are not logged to the database. + // unconfigured values are stored as NAN // - jmessage["OBS_ID"] = this->target.obsid < 0 ? NAN : this->target.obsid; // OBSERVATION_ID - jmessage["NAME"] = this->target.name; // NAME - jmessage["SLITA"] = this->target.slitangle; // *OTMslitangle - jmessage["BINSPECT"] = this->target.binspect < 1 ? NAN : this->target.binspect; // *BINSPECT - jmessage["BINSPAT"] = this->target.binspat < 1 ? NAN : this->target.binspat; // *BINSPAT - jmessage["POINTMODE"] = this->target.pointmode; // *POINTMODE - jmessage["RA"] = this->target.ra_hms; // *RA - jmessage["DECL"] = this->target.dec_dms; // *DECL + jmessage[Key::TargetInfo::OBS_ID] = this->target.obsid < 0 ? NAN : this->target.obsid; + jmessage[Key::TargetInfo::NAME] = this->target.name; + jmessage[Key::TargetInfo::SLITA] = this->target.slitangle; + jmessage[Key::TargetInfo::BINSPECT] = this->target.binspect < 1 ? NAN : this->target.binspect; + jmessage[Key::TargetInfo::BINSPAT] = this->target.binspat < 1 ? NAN : this->target.binspat; + jmessage[Key::TargetInfo::POINTMODE] = this->target.pointmode; + jmessage[Key::TargetInfo::RA] = this->target.ra_hms; + jmessage[Key::TargetInfo::DECL] = this->target.dec_dms; } - retstring = jmessage.dump(); // serialize the json message into a string - - retstring.append(JEOF); // append JSON message terminator + // unless forced, only publish if the target info changed + // + if ( !force && jmessage == this->last_published_targetinfo ) return; + this->last_published_targetinfo = jmessage; - return; + try { + this->publisher->publish( jmessage, Topic::TARGETINFO ); + } + catch ( const std::exception &e ) { + logwrite( "Sequencer::Sequence::publish_targetinfo", + "ERROR publishing message: "+std::string(e.what()) ); + } } - /***** Sequencer::Sequence::make_telemetry_message **************************/ + /***** Sequencer::Sequence::publish_targetinfo *****************************/ /***** Sequencer::Sequence::dothread_test_fpoffset **************************/ diff --git a/sequencerd/sequence.h b/sequencerd/sequence.h index fac6b2cd..24cfd378 100644 --- a/sequencerd/sequence.h +++ b/sequencerd/sequence.h @@ -527,6 +527,7 @@ namespace Sequencer { Common::Broadcaster broadcast { this->publisher, Sequencer::DAEMON_NAME }; ///< logs and publishes a narrative message on Topic::BROADCAST std::string last_seqstate_str; ///< last seqstate string announced via broadcast_seqstate() (for change detection) + nlohmann::json last_published_targetinfo; ///< last published targetinfo (for change detection) uint32_t get_reqstate(); ///< get the reqstate word @@ -564,7 +565,7 @@ namespace Sequencer { long get_tcs_cass( double &cass ); long target_offset(); - void make_telemetry_message( std::string &retstring ); ///< assembles my telemetry message + void publish_targetinfo( bool force=false ); ///< publish target info on change (or force) long set_power_switch( PowerState state, const std::string which, std::chrono::seconds delay ); long check_power_switch( PowerState checkstate, const std::string which, bool &is_set ); diff --git a/sequencerd/sequencer_server.cpp b/sequencerd/sequencer_server.cpp index 0fc7aed5..997fa79b 100644 --- a/sequencerd/sequencer_server.cpp +++ b/sequencerd/sequencer_server.cpp @@ -1594,23 +1594,6 @@ namespace Sequencer { if ( ret != NO_ERROR ) logwrite(function, "ERROR: unable to load config file"); else ret = this->configure_sequencer(); } - else - - // send my telemetry upon request - // - if ( cmd == TELEMREQUEST ) { - if ( args=="?" || args=="help" ) { - retstring=TELEMREQUEST+"\n"; - retstring.append( " Returns a serialized JSON message containing my telemetry\n" ); - retstring.append( " information, terminated with \"EOF\\n\".\n" ); - ret=HELP; - } - else { - this->sequence.make_telemetry_message( retstring ); - ret = JSON; - } - } - // Unknown commands generate an error // else { From ca90f196cb75bcdfb00ee98ec134bebaa63ee5fb Mon Sep 17 00:00:00 2001 From: David Hale Date: Sun, 24 May 2026 15:05:10 -0700 Subject: [PATCH 12/12] finish migrating telemetry from TELEMREQUEST pull to ZMQ pub/sub --- Config/acamd.cfg.in | 9 +- Config/camerad.cfg.in | 19 +-- Config/flexured.cfg.in | 9 +- Config/thermald.cfg.in | 9 +- acamd/acam_interface.cpp | 28 +++- acamd/acam_interface.h | 4 +- acamd/acam_server.cpp | 24 ---- acamd/acamd.cpp | 13 ++ camerad/astrocam.cpp | 26 ---- camerad/astrocam.h | 3 - camerad/camerad.cpp | 15 --- camerad/camerad.h | 24 ---- common/acamd_commands.h | 1 - common/calibd_commands.h | 4 +- common/common.cpp | 28 ---- common/common.h | 4 - common/flexured_commands.h | 1 - common/focusd_commands.h | 1 - common/powerd_commands.h | 1 - common/sequencerd_commands.h | 1 - common/slitd_commands.h | 3 +- common/tcsd_commands.h | 1 - common/thermald_commands.h | 1 - flexured/flexure_interface.cpp | 168 +++++++---------------- flexured/flexure_interface.h | 65 ++++++++- flexured/flexure_server.cpp | 68 ++++------ flexured/flexured.cpp | 12 ++ slicecamd/slicecam_interface.cpp | 33 +++++ slicecamd/slicecam_interface.h | 1 + slicecamd/slicecamd.cpp | 15 ++- slitd/slitd.cpp | 2 +- tcsd/tcs_server.cpp | 4 +- thermald/thermal_interface.cpp | 221 +++++++++++-------------------- thermald/thermal_interface.h | 49 ++++++- thermald/thermal_server.cpp | 84 ++++++------ thermald/thermald.cpp | 18 +++ 36 files changed, 433 insertions(+), 536 deletions(-) diff --git a/Config/acamd.cfg.in b/Config/acamd.cfg.in index ce354d8a..3007f567 100644 --- a/Config/acamd.cfg.in +++ b/Config/acamd.cfg.in @@ -169,14 +169,11 @@ ACQUIRE_TCS_MAX_OFFSET=60 # the maximum allowable offset sent to the TCS, in SKYSIM_IMAGE_SIZE=1024 # ----------------------------------------------------------------------------- -# TELEM_PROVIDER=( ) +# SUBSCRIBE_TO=( ) # -# This is a list of telemetry providers where is the daemon name, -# and is the port on which to send the telemetry request. +# This is a list of pub/sub sources to subscribe to, where is the daemon +# name and is its ZeroMQ publish endpoint. # Provide one per line. # -TELEM_PROVIDER=(tcsd @TCSD_NB_PORT@) -TELEM_PROVIDER=(sequencerd @SEQUENCERD_NB_PORT@) -# SUBSCRIBE_TO=(tcsd "tcp://127.0.0.1:@TCSD_PUB_PORT@") SUBSCRIBE_TO=(sequencerd "tcp://127.0.0.1:@SEQUENCERD_PUB_PORT@") diff --git a/Config/camerad.cfg.in b/Config/camerad.cfg.in index e90faefd..10ff478a 100644 --- a/Config/camerad.cfg.in +++ b/Config/camerad.cfg.in @@ -155,20 +155,11 @@ ACTIVATE_COMMANDS=(G PON, ERS 1000 1000, EPG 500, CLR) ACTIVATE_COMMANDS=(U PON, CLR) # ----------------------------------------------------------------------------- -# TELEM_PROVIDER=( ) -# -# This is a list of telemetry providers where is the daemon name, -# and is the port on which to send the telemetry request. These are the -# sources for telemetry information for FITS headers. Provide one per line. -# -TELEM_PROVIDER=(calibd @CALIBD_NB_PORT@) -TELEM_PROVIDER=(flexured @FLEXURED_NB_PORT@) -TELEM_PROVIDER=(focusd @FOCUSD_NB_PORT@) -TELEM_PROVIDER=(powerd @POWERD_NB_PORT@) -TELEM_PROVIDER=(sequencerd @SEQUENCERD_NB_PORT@) -TELEM_PROVIDER=(slitd @SLITD_NB_PORT@) -TELEM_PROVIDER=(tcsd @TCSD_NB_PORT@) -TELEM_PROVIDER=(thermald @THERMALD_NB_PORT@) +# SUBSCRIBE_TO=( ) +# +# This is a list of pub/sub sources to subscribe to, where is the daemon +# name and is its ZeroMQ publish endpoint. These are the sources for +# telemetry information for FITS headers. Provide one per line. # SUBSCRIBE_TO=(calibd "tcp://127.0.0.1:@CALIBD_PUB_PORT@") SUBSCRIBE_TO=(flexured "tcp://127.0.0.1:@FLEXURED_PUB_PORT@") diff --git a/Config/flexured.cfg.in b/Config/flexured.cfg.in index c71cbe23..5a0c19ca 100644 --- a/Config/flexured.cfg.in +++ b/Config/flexured.cfg.in @@ -61,14 +61,11 @@ MOTOR_AXIS="I 3 -1000 1000 0 na 300.0" # # ----------------------------------------------------------------------------- -# TELEM_PROVIDER=( ) +# SUBSCRIBE_TO=( ) # -# This is a list of telemetry providers where is the daemon name, -# and is the port on which to send the telemetry request. +# This is a list of pub/sub sources to subscribe to, where is the daemon +# name and is its ZeroMQ publish endpoint. # Provide one per line. # -TELEM_PROVIDER=(tcsd @TCSD_NB_PORT@) -TELEM_PROVIDER=(thermald @THERMALD_NB_PORT@) -# SUBSCRIBE_TO=(tcsd "tcp://127.0.0.1:@TCSD_PUB_PORT@") SUBSCRIBE_TO=(thermald "tcp://127.0.0.1:@THERMALD_PUB_PORT@") diff --git a/Config/thermald.cfg.in b/Config/thermald.cfg.in index 3fa71973..8b54556a 100644 --- a/Config/thermald.cfg.in +++ b/Config/thermald.cfg.in @@ -102,15 +102,12 @@ CAMP_CHAN="22 TFLEXCON_UR" CAMP_CHAN="23 TACAM" # ----------------------------------------------------------------------------- -# TELEM_PROVIDER=( ) +# SUBSCRIBE_TO=( ) # -# This is a list of telemetry providers where is the daemon name, -# and is the port on which to send the telemetry request. +# This is a list of pub/sub sources to subscribe to, where is the daemon +# name and is its ZeroMQ publish endpoint. # Provide one per line. # -TELEM_PROVIDER=(acamd @ACAMD_NB_PORT@) -TELEM_PROVIDER=(slicecamd @SLICECAMD_NB_PORT@) -# SUBSCRIBE_TO=(acamd "tcp://127.0.0.1:@ACAMD_PUB_PORT@") SUBSCRIBE_TO=(slicecamd "tcp://127.0.0.1:@SLICECAMD_PUB_PORT@") diff --git a/acamd/acam_interface.cpp b/acamd/acam_interface.cpp index 2b8a7fd4..2bd34d66 100644 --- a/acamd/acam_interface.cpp +++ b/acamd/acam_interface.cpp @@ -1446,7 +1446,7 @@ namespace Acam { "not_connected" ); try { - this->publisher->publish( jmessage_out, Topic::SNAPSHOT ); + this->publisher->publish( jmessage_out, Topic::ACAMD ); } catch ( const std::exception &e ) { logwrite( "Acam::Interface::publish_snapshot", @@ -1507,6 +1507,32 @@ namespace Acam { /***** Acam::Interface::publish_status **************************************/ + /***** Acam::Interface::publish_temperature ********************************/ + /** + * @brief publish only the andor CCD temperature on Topic::ACAMD + * @details Published on a fixed interval (see acamd.cpp), not on change, + * since the CCD temperature varies continuously. + * + */ + void Interface::publish_temperature() { + int ccdtemp=99; + this->camera.andor.get_temperature( ccdtemp ); // temp is int + nlohmann::json jmessage; + jmessage[Key::SOURCE] = Topic::ACAMD; + jmessage[Key::Acamd::TANDOR] = ( this->isopen("camera") ? + static_cast(ccdtemp) : // database wants float + NAN ); + try { + this->publisher->publish( jmessage, Topic::ACAMD ); + } + catch ( const std::exception &e ) { + logwrite( "Acam::Interface::publish_temperature", + "ERROR publishing message: "+std::string(e.what()) ); + } + } + /***** Acam::Interface::publish_temperature ********************************/ + + /***** Acam::Interface::request_snapshot ************************************/ /** * @brief [obsolete] publises request for snapshot diff --git a/acamd/acam_interface.h b/acamd/acam_interface.h index 04e29eb8..9793d863 100644 --- a/acamd/acam_interface.h +++ b/acamd/acam_interface.h @@ -540,8 +540,6 @@ namespace Acam { std::vector db_info; ///< info for constructing telemetry Database object - std::map telemetry_providers; ///< map of port[daemon_name] for external telemetry providers - struct { std::string tcsname; bool is_tcs_open; @@ -664,9 +662,9 @@ namespace Acam { long bin( std::string args, std::string &retstring ); void publish_snapshot(); void publish_status(bool force=false); + void publish_temperature(); ///< publish only the andor temperature on Topic::ACAMD (periodic) void request_snapshot(); bool wait_for_snapshots(); - long handle_json_message( std::string message_in ); long initialize_python_objects(); /// provides interface to initialize all Python modules for objects in this class long test_image(); /// long open( std::string args, std::string &help); /// wrapper to open all acam-related hardware components diff --git a/acamd/acam_server.cpp b/acamd/acam_server.cpp index 082cf6b5..2cf80001 100644 --- a/acamd/acam_server.cpp +++ b/acamd/acam_server.cpp @@ -365,30 +365,6 @@ namespace Acam { applied++; } - // TELEM_PROVIDER : contains daemon name and port to contact for header telemetry info - // (these don't get counted with "applied++") - // - if ( config.param[entry] == "TELEM_PROVIDER" ) { - std::vector tokens; - Tokenize( config.arg[entry], tokens, " " ); - try { - if ( tokens.size() == 2 ) { - this->interface.telemetry_providers[tokens.at(0)] = std::stod(tokens.at(1)); - } - else { - message.str(""); message << "ERROR bad format TELEM_PROVIDER=\"" << config.arg[entry] << "\": expected "; - logwrite( function, message.str() ); - return ERROR; - } - } - catch ( const std::exception &e ) { - message.str(""); message << "ERROR parsing TELEM_PROVIDER from " << config.arg[entry] << ": " << e.what(); - logwrite( function, message.str() ); - return ERROR; - } - message.str(""); message << "config:" << config.param[entry] << "=" << config.arg[entry]; - this->interface.async.enqueue_and_log( to_uppercase(DAEMON_NAME), function, message.str() ); - } } // end loop through the entries in the configuration file diff --git a/acamd/acamd.cpp b/acamd/acamd.cpp index 0bc34dcb..3664b7b8 100644 --- a/acamd/acamd.cpp +++ b/acamd/acamd.cpp @@ -185,6 +185,19 @@ int main(int argc, char **argv) { // publish snapshot of my telemetry so the world knows I'm online acamd.interface.publish_snapshot(); + std::this_thread::sleep_for( std::chrono::milliseconds(250) ); + acamd.interface.request_snapshot(); + + // publish the andor CCD temperature on a fixed 60-second interval + // (temperature varies continuously, so it is not published on change) + // + std::thread( []( Acam::Interface &iface ) { + while ( true ) { + iface.publish_temperature(); + std::this_thread::sleep_for( std::chrono::seconds(60) ); + } + }, std::ref(acamd.interface) ).detach(); + // This will pre-thread N_THREADS threads. // There will be N_THREADS-1 non-blocking threads, then // loop forever on Accept to dynamically spawn a new thread for each diff --git a/camerad/astrocam.cpp b/camerad/astrocam.cpp index b65cc729..963c27e6 100644 --- a/camerad/astrocam.cpp +++ b/camerad/astrocam.cpp @@ -3178,32 +3178,6 @@ namespace AstroCam { /***** AstroCam::Interface::do_expose ***************************************/ - /***** AstroCam::Interface::make_telemetry_message **************************/ - /** - * @brief assembles my telemetry message - * @details This creates a JSON message for my telemetry info, then serializes - * it into a std::string ready to be sent over a socket. - * @param[out] retstring string containing the serialization of the JSON message - * - */ - void Interface::make_telemetry_message( std::string &retstring ) { - // assemble the telemetry I want to report into a json message - // Set a messagetype keyword to indicate what kind of message this is. - // - nlohmann::json jmessage; - jmessage["messagetype"] = "camerainfo"; - - jmessage["SHUTTIME_SEC"] = this->camera.shutter.get_duration(); // shutter open time in sec - - retstring = jmessage.dump(); // serialize the json message into a string - - retstring.append(JEOF); // append JSON message terminator - - return; - } - /***** AstroCam::Interface::make_telemetry_message **************************/ - - diff --git a/camerad/astrocam.h b/camerad/astrocam.h index 3e948e78..10048284 100644 --- a/camerad/astrocam.h +++ b/camerad/astrocam.h @@ -1167,8 +1167,6 @@ std::vector> fitsinfo; std::map< std::string, readout_info_t > readout_source; //!< STL map of readout sources indexed by readout name - std::map telemetry_providers; //!< a map of port[daemon_name] for telemetry providers - // Functions // void get_logical(Controller* pcontroller, @@ -1237,7 +1235,6 @@ std::vector> fitsinfo; long expose(std::string nexp_in); long do_expose(int nexp_in); - void make_telemetry_message( std::string &retstring ); double get_live_airmass(); ///< latest airmass from cached tcsd telemetry, or NAN long native(std::string cmdstr); long native(std::string cmdstr, std::string &retstring); diff --git a/camerad/camerad.cpp b/camerad/camerad.cpp index 3c115610..e8c795cc 100644 --- a/camerad/camerad.cpp +++ b/camerad/camerad.cpp @@ -518,21 +518,6 @@ void doit(Network::TcpSocket &sock) { sock.Write( " " ); ret = NO_ERROR; } - // send telemetry as json message - // - if ( cmd == TELEMREQUEST ) { - if ( args=="?" || args=="help" ) { - retstring=TELEMREQUEST+"\n"; - retstring.append( " Returns a serialized JSON message containing my telemetry\n" ); - retstring.append( " information, terminated with \"EOF\\n\".\n" ); - ret=HELP; - } - else { - server.make_telemetry_message( retstring ); - ret = JSON; - } - } - else if ( cmd == CAMERAD_OPEN ) { ret = server.connect_controller(args, retstring); } diff --git a/camerad/camerad.h b/camerad/camerad.h index 15618461..5d67c9fb 100644 --- a/camerad/camerad.h +++ b/camerad/camerad.h @@ -169,30 +169,6 @@ namespace Camera { applied++; } - // TELEM_PROVIDER : contains daemon name and port to contact for header telemetry info - // - if ( config.param[entry] == "TELEM_PROVIDER" ) { - std::vector tokens; - Tokenize( config.arg[entry], tokens, " " ); - try { - if ( tokens.size() == 2 ) { - this->telemetry_providers[tokens.at(0)] = std::stod(tokens.at(1)); - } - else { - message.str(""); message << "bad format \"" << config.arg[entry] << "\": expected "; - this->camera.log_error( function, message.str() ); - return ERROR; - } - } - catch ( const std::exception &e ) { - message.str(""); message << "parsing TELEM_PROVIDER from " << config.arg[entry] << ": " << e.what(); - this->camera.log_error( function, message.str() ); - return ERROR; - } - message.str(""); message << "config:" << config.param[entry] << "=" << config.arg[entry]; - this->camera.async.enqueue_and_log( "CAMERAD", function, message.str() ); - applied++; - } // ASYNCPORT if (config.param[entry].compare(0, 9, "ASYNCPORT")==0) { diff --git a/common/acamd_commands.h b/common/acamd_commands.h index ae7fd68b..2a22cb66 100644 --- a/common/acamd_commands.h +++ b/common/acamd_commands.h @@ -100,7 +100,6 @@ const std::vector ACAMD_SYNTAX = { ACAMD_OFFSETGOAL+" [ ? | ]", ACAMD_OFFSETPERIOD+" [ ? | ]", ACAMD_PUTONSLIT+" [ ? | ]", - TELEMREQUEST+" [ ? ]", ACAMD_SHUTDOWN+" [ ? ]", ACAMD_TEST+" ? | ..." }; diff --git a/common/calibd_commands.h b/common/calibd_commands.h index 88ca0e03..35129ad3 100644 --- a/common/calibd_commands.h +++ b/common/calibd_commands.h @@ -31,7 +31,5 @@ const std::vector CALIBD_SYNTAX = { CALIBD_NATIVE+" ", CALIBD_SET+" [ =open|close ... ] | [?]", " LAMP MODULATOR CONTROL", - CALIBD_LAMPMOD+" ? | open | close | reconnect | default | [ [ on|off ] | [ ] ]", - " OTHER", - TELEMREQUEST+" [ ? ]" + CALIBD_LAMPMOD+" ? | open | close | reconnect | default | [ [ on|off ] | [ ] ]" }; diff --git a/common/common.cpp b/common/common.cpp index ec54f431..0cd9ee22 100644 --- a/common/common.cpp +++ b/common/common.cpp @@ -44,34 +44,6 @@ namespace Common { /***** Common::Broadcaster::emit ********************************************/ - /***** Common::collect_telemetry ********************************************/ - /** - * @brief send the TELEMREQUEST command to daemon to get telemetry - * @param[in] provider pair contains <"provider", port> - * @param[out] retstring serialized string of json telemetry message - * - */ - void collect_telemetry(const std::pair &provider, std::string &retstring) { - // Instantiate a client to communicate with each daemon, - // constructed with no name, newline termination on command writes, - // and JEOF termination on reply reads. - // - Common::DaemonClient jclient("", "\n", JEOF ); - - // Send the command TELEMREQUEST to each daemon and read back the reply into - // retstring, which will be the serialized JSON telemetry message. - // - jclient.set_name(provider.first); - jclient.set_port(provider.second); - jclient.connect(); - jclient.command(TELEMREQUEST, retstring); - jclient.disconnect(); - - return; - } - /***** Common::collect_telemetry ********************************************/ - - /***** Common::extract_correlation_id ***************************************/ /** * @brief detect and strip a correlation ID prefix from an inter-daemon message diff --git a/common/common.h b/common/common.h index bee87cb4..692ecb5f 100644 --- a/common/common.h +++ b/common/common.h @@ -35,7 +35,6 @@ const long ABORT = 6; const long EXIT = 999; const std::string JEOF = "EOF\n"; ///< used to terminate JSON messages -const std::string TELEMREQUEST = "sendtelem"; ///< common daemon command used to request telemetry const std::string SNAPSHOT = "snapshot"; ///< common daemon command forces publish of telemetry const std::string CID_PREFIX = "#cid:"; ///< correlation ID marker for inter-daemon commands @@ -436,9 +435,6 @@ namespace Common { /**************** Common::Broadcaster ***************************************/ - void collect_telemetry(const std::pair &provider, std::string &retstring); - - /***** Common::extract_correlation_id ***************************************/ /** * @brief detect and strip a correlation ID prefix from an inter-daemon message diff --git a/common/flexured_commands.h b/common/flexured_commands.h index e3a9eb07..46b71935 100644 --- a/common/flexured_commands.h +++ b/common/flexured_commands.h @@ -27,7 +27,6 @@ const std::vector FLEXURED_SYNTAX = { FLEXURED_NATIVE+" ? | ", FLEXURED_OPEN, FLEXURED_SET+" ? | ", - TELEMREQUEST+" [ ? ]", FLEXURED_TEST+" ? | ...", " motormap", " posmap", diff --git a/common/focusd_commands.h b/common/focusd_commands.h index c6647ef9..05456b32 100644 --- a/common/focusd_commands.h +++ b/common/focusd_commands.h @@ -29,7 +29,6 @@ const std::vector FOCUSD_SYNTAX = { FOCUSD_NATIVE+" ? | ", FOCUSD_OPEN, FOCUSD_SET+" ? | { | nominal }", - TELEMREQUEST+" [ ? ]", FOCUSD_TEST+" ...", " motormap", " posmap", diff --git a/common/powerd_commands.h b/common/powerd_commands.h index da8ef3fa..ae45aa67 100644 --- a/common/powerd_commands.h +++ b/common/powerd_commands.h @@ -21,7 +21,6 @@ const std::vector POWERD_SYNTAX = { POWERD_OPEN, POWERD_REOPEN+" [?]", POWERD_STATUS+" [?]", - TELEMREQUEST+" [?]", "", " [ ON | OFF | BOOT ]", " [ ON | OFF | BOOT ]" diff --git a/common/sequencerd_commands.h b/common/sequencerd_commands.h index e42d9202..b69dce30 100644 --- a/common/sequencerd_commands.h +++ b/common/sequencerd_commands.h @@ -59,7 +59,6 @@ const std::vector SEQUENCERD_SYNTAX = { SEQUENCERD_PAUSE, SEQUENCERD_REPEAT, SEQUENCERD_RESUME, - TELEMREQUEST+" [?]", SEQUENCERD_USERCONTINUE, SEQUENCERD_SHUTDOWN, SEQUENCERD_START, diff --git a/common/slitd_commands.h b/common/slitd_commands.h index ffe3b121..d892dea0 100644 --- a/common/slitd_commands.h +++ b/common/slitd_commands.h @@ -27,8 +27,7 @@ const std::vector SLITD_SYNTAX = { SLITD_NATIVE+" ? | [ ]", SLITD_OFFSET+" ? | ", SLITD_OPEN+" [ ? ]", - SLITD_SET+" ? | [ ]", - TELEMREQUEST+" [?]" + SLITD_SET+" ? | [ ]" }; #endif diff --git a/common/tcsd_commands.h b/common/tcsd_commands.h index 01fa1f85..a34968c6 100644 --- a/common/tcsd_commands.h +++ b/common/tcsd_commands.h @@ -56,7 +56,6 @@ const std::vector TCSD_SYNTAX = { TCSD_RETOFFSETS+" [ ? ]", TCSD_RINGGO+" ? | ", TCSD_SET_FOCUS+" ? | ", - TELEMREQUEST+" [ ? ]", TCSD_WEATHER_COORDS+" [ ? ]", TCSD_ZERO_OFFSETS+" [ ? ]" }; diff --git a/common/thermald_commands.h b/common/thermald_commands.h index 136c837a..00161418 100644 --- a/common/thermald_commands.h +++ b/common/thermald_commands.h @@ -25,7 +25,6 @@ const std::vector THERMALD_SYNTAX = { THERMALD_NATIVE+" [ ? | [] ]", THERMALD_PRINTLABELS+" [?]", THERMALD_RECONNECT+" [ ? ]", - TELEMREQUEST+" [?]", THERMALD_SETPOINT+" [ ? | [ ] ]", THERMALD_SHOWTELEM+" [ ? | force ]", THERMALD_TELEMETRY+" ? | start | stop | status " diff --git a/flexured/flexure_interface.cpp b/flexured/flexure_interface.cpp index 626eaa9c..5ee67ffe 100644 --- a/flexured/flexure_interface.cpp +++ b/flexured/flexure_interface.cpp @@ -322,10 +322,6 @@ namespace Flexure { return HELP; } - // get the needed telemetry (telescope position and temperatures) - // - this->get_external_telemetry(); - // perform the calculations // retstring="not_yet_implemented"; @@ -390,31 +386,19 @@ namespace Flexure { /***** Flexure::Interface::send_command *************************************/ - /***** Flexure::Interface::make_telemetry_message ***************************/ + /***** Flexure::Interface::get_status **************************************/ /** - * @brief assembles a telemetry message - * @details This creates a JSON message for telemetry info, then serializes - * it into a std::string ready to be sent over a socket. - * @param[out] retstring string containing the serialization of the JSON message + * @brief read all flexure actuator positions into status * */ - void Interface::make_telemetry_message( std::string &retstring ) { - const std::string function="Flexure::Interface::make_telemetry_message"; + void Interface::get_status() { + const std::string function="Flexure::Interface::get_status"; std::stringstream message; - // assemble the telemetry into a json message - // Set a messagetype keyword to indicate what kind of message this is. - // - nlohmann::json jmessage; - jmessage["messagetype"]="flexureinfo"; + this->status.positions.clear(); - // get all flexure actuator positions - // auto _motormap = this->motorinterface.get_motormap(); - - // loop through all motors in motormap for ( const auto &mot : _motormap ) { - // loop through all axes for each motor for ( const auto &axis : mot.second.axes ) { auto chan = mot.second.name; auto addr = mot.second.addr; @@ -424,130 +408,70 @@ namespace Flexure { this->motorinterface.get_pos( chan, axis.second.axisnum, addr, position, posname ); switch ( axis.second.axisnum ) { case 1 : key = "FLXPIS_" + chan; break; - case 2: key = "FLXSPE_" + chan; break; - case 3: key = "FLXSPA_" + chan; break; - default: key = "error"; - message.str(""); message << "ERROR unknown axis " << axis.second.axisnum; + case 2 : key = "FLXSPE_" + chan; break; + case 3 : key = "FLXSPA_" + chan; break; + default: message.str(""); message << "ERROR unknown axis " << axis.second.axisnum; logwrite( function, message.str() ); + continue; } - - // assign the position or NaN to a key in the JSON jmessage - // - if ( !std::isnan(position) ) jmessage[key]=position; else jmessage[key]="NAN"; + this->status.positions[key] = position; } } - - retstring = jmessage.dump(); // serialize the json message into retstring - - retstring.append(JEOF); // append the JSON message terminator - - return; } - /***** Flexure::Interface::make_telemetry_message ***************************/ + /***** Flexure::Interface::get_status **************************************/ - /***** Flexure::Interface::get_external_telemetry ***************************/ + /***** Flexure::Interface::publish_status **********************************/ /** - * @brief collect telemetry from another daemon - * @details This is used for any telemetry that I need to collect from - * another daemon. Send the command "sendtelem" to the daemon, which - * will respond with a JSON message. The daemon(s) to contact - * are configured with the TELEM_PROVIDER key in the config file. + * @brief publish flexure state, but only if it changed (or forced) + * @param[in] force optional (default=false) publish irrespective of change * */ - void Interface::get_external_telemetry() { + void Interface::publish_status( bool force ) { - // Loop through each configured telemetry provider. This requests - // their telemetry which is returned as a serialized json string - // held in retstring. + // refresh current state from hardware // - // handle_json_message() will parse the serialized json string. + this->get_status(); + + // unless forced, only publish if the state changed // - std::string retstring; - for ( const auto &provider : this->telemetry_providers ) { - Common::collect_telemetry( provider, retstring ); - handle_json_message(retstring); + if ( !force && this->status == this->last_published_status ) return; + + nlohmann::json jmessage; + jmessage[Key::SOURCE] = Topic::FLEXURED; + for ( const auto &[key,pos] : this->status.positions ) { + if ( !std::isnan(pos) ) jmessage[key] = pos; else jmessage[key] = "NAN"; + } + + this->last_published_status = this->status; + + try { + this->publisher->publish( jmessage, Topic::FLEXURED ); + } + catch( const std::exception &e ) { + logwrite( "Flexure::Interface::publish_status", + "ERROR publishing message: "+std::string(e.what()) ); } - return; } - /***** Flexure::Interface::get_external_telemetry ***************************/ + /***** Flexure::Interface::publish_status **********************************/ - /***** Flexure::Interface::handle_json_message ******************************/ + /***** Flexure::Interface::handletopic_snapshot ****************************/ /** - * @brief parses incoming telemetry messages - * @details Requesting telemetry from another daemon returns a serialized - * JSON message which needs to be passed in here to parse it. - * @param[in] message_in incoming serialized JSON message (as a string) - * @return ERROR | NO_ERROR + * @brief respond to a snapshot request by publishing my status + * @param[in] jmessage subscribed-received JSON message * */ - long Interface::handle_json_message( std::string message_in ) { - const std::string function="Flexure::Interface::handle_json_message"; - std::stringstream message; - - try { - nlohmann::json jmessage = nlohmann::json::parse( message_in ); - std::string messagetype; - - // jmessage must not contain key "error" and must contain key "messagetype" - // - if ( !jmessage.contains("error") ) { - if ( jmessage.contains("messagetype") && jmessage["messagetype"].is_string() ) { - messagetype = jmessage["messagetype"]; - } - else { - logwrite( function, "ERROR received JSON message with missing or invalid messagetype" ); - return ERROR; - } - } - else { - logwrite( function, "ERROR in JSON message" ); - return ERROR; - } - - // no errors, so disseminate the message contents based on the message type - // - if ( messagetype == "thermalinfo" ) { - double TCOLL_I=NAN, TCOLL_R=NAN, TCOLL_G=NAN, TCOLL_U=NAN; - Common::extract_telemetry_value( message_in, "TCOLL_I", TCOLL_I ); - Common::extract_telemetry_value( message_in, "TCOLL_R", TCOLL_R ); - Common::extract_telemetry_value( message_in, "TCOLL_G", TCOLL_G ); - Common::extract_telemetry_value( message_in, "TCOLL_U", TCOLL_U ); - message.str(""); message << "TCOLL_I=" << TCOLL_I << " TCOLL_R=" << TCOLL_R << " TCOLL_G=" << TCOLL_G << " TCOLL_U=" << TCOLL_U; - logwrite( function, message.str() ); - } - else - if ( messagetype == "tcsinfo" ) { - double casangle=NAN, alt=NAN; - Common::extract_telemetry_value( message_in, Key::Tcsd::CASANGLE, casangle ); - Common::extract_telemetry_value( message_in, Key::Tcsd::ALT, alt ); - message.str(""); message << "casangle=" << casangle << " alt=" << alt; - logwrite( function, message.str() ); - } - else - if ( messagetype == "test" ) { - } - else { - message.str(""); message << "ERROR received unhandled JSON message type \"" << messagetype << "\""; - logwrite( function, message.str() ); - return ERROR; - } - } - catch ( const nlohmann::json::parse_error &e ) { - message.str(""); message << "ERROR json exception parsing message: " << e.what(); - logwrite( function, message.str() ); - return ERROR; + void Interface::handletopic_snapshot( const nlohmann::json &jmessage ) { + if ( jmessage.contains( Topic::FLEXURED ) ) { + this->publish_status(); } - catch ( const std::exception &e ) { - message.str(""); message << "ERROR parsing message: " << e.what(); - logwrite( function, message.str() ); - return ERROR; + else + if ( jmessage.contains( "test" ) ) { + logwrite( "Flexure::Interface::handletopic_snapshot", jmessage.dump() ); } - - return NO_ERROR; } - /***** Flexure::Interface::handle_json_message ******************************/ + /***** Flexure::Interface::handletopic_snapshot ****************************/ /***** Flexure::Interface::test *********************************************/ diff --git a/flexured/flexure_interface.h b/flexured/flexure_interface.h index 291f22e9..18777bfe 100644 --- a/flexured/flexure_interface.h +++ b/flexured/flexure_interface.h @@ -17,6 +17,7 @@ #include #include #include +#include #define FLEXURE_MOVE_TIMEOUT 1000 ///< timeout in msec for moves #define FLEXURE_POSNAME_TOLERANCE 0.0001 ///< tolerance to determine posname from position @@ -42,13 +43,68 @@ namespace Flexure { */ class Interface { private: + zmqpp::context context; size_t numdev; bool class_initialized; - public: - Interface() : numdev(-1), motorinterface( FLEXURE_MOVE_TIMEOUT, 0, FLEXURE_POSNAME_TOLERANCE ) {} + /** + * @struct Status + * @brief published flexure state: actuator position (um) by FLX_ key; NaN if unavailable + */ + struct Status { + std::map positions; + bool operator==(const Status &o) const { + if ( positions.size() != o.positions.size() ) return false; + for ( const auto &[k,v] : positions ) { + auto it = o.positions.find(k); + if ( it == o.positions.end() ) return false; + if ( std::isnan(v) && std::isnan(it->second) ) continue; // NaN==NaN treated equal + if ( v != it->second ) return false; + } + return true; + } + bool operator!=(const Status &o) const { return !(*this == o); } + }; + Status status; ///< current flexure state + Status last_published_status; ///< last published flexure state + + public: - std::map telemetry_providers; ///< map of port[daemon_name] for external telemetry providers + Interface() + : context(), + numdev(-1), + subscriber(std::make_unique(context, Common::PubSub::Mode::SUB)), + is_subscriber_thread_running(false), + should_subscriber_thread_run(false), + motorinterface( FLEXURE_MOVE_TIMEOUT, 0, FLEXURE_POSNAME_TOLERANCE ) + { + topic_handlers = { + { Topic::SNAPSHOT, std::function( + [this](const nlohmann::json &msg) { handletopic_snapshot(msg); } ) } + }; + } + + std::unique_ptr publisher; ///< publisher object + std::string publisher_address; ///< publish socket endpoint + std::string publisher_topic; ///< my default topic for publishing + std::unique_ptr subscriber; ///< subscriber object + std::string subscriber_address; ///< subscribe socket endpoint + std::vector subscriber_topics; ///< list of topics I subscribe to + std::atomic is_subscriber_thread_running; ///< is my subscriber thread running? + std::atomic should_subscriber_thread_run; ///< should my subscriber thread run? + std::unordered_map> topic_handlers; + ///< maps a handler function to each topic + + long init_pubsub(const std::initializer_list &topics={}) { + return Common::PubSubHandler::init_pubsub(context, *this, topics); + } + void start_subscriber_thread() { Common::PubSubHandler::start_subscriber_thread(*this); } + void stop_subscriber_thread() { Common::PubSubHandler::stop_subscriber_thread(*this); } + + void handletopic_snapshot( const nlohmann::json &jmessage ); ///< respond to a snapshot request + void get_status(); ///< refresh status from hardware + void publish_status( bool force=false ); ///< publish flexure state on change (or force) Common::Queue async; @@ -68,9 +124,6 @@ namespace Flexure { long stop(); ///< send the stop-all-motion command to all controllers long send_command( const std::string &name, std::string cmd ); ///< writes the raw command as received to the master controller, no reply long send_command( const std::string &name, std::string cmd, std::string &retstring ); ///< writes command?, reads reply - void make_telemetry_message( std::string &retstring ); ///< assembles a telemetry message - void get_external_telemetry(); ///< collect telemetry from other daemon(s) - long handle_json_message( std::string message_in ); ///< parses incoming telemetry messages long test( std::string args, std::string &retstring ); ///< test routines std::mutex pi_mutex; ///< mutex to protect multi-threaded access to PI controller diff --git a/flexured/flexure_server.cpp b/flexured/flexure_server.cpp index e3be7f59..4bcf3156 100644 --- a/flexured/flexure_server.cpp +++ b/flexured/flexure_server.cpp @@ -17,6 +17,9 @@ namespace Flexure { */ void Server::exit_cleanly(void) { std::string function = "Flexure::Server::exit_cleanly"; + + this->interface.stop_subscriber_thread(); + logwrite( function, "exiting" ); exit(EXIT_SUCCESS); @@ -116,6 +119,28 @@ namespace Flexure { applied++; } + // PUB_ENDPOINT -- my ZeroMQ socket endpoint for publishing telemetry + // SUB_ENDPOINT -- the broker endpoint I subscribe to (for snapshot requests) + // + // NOTE: these two keys must be present in the flexured config file for + // publishing to work. Without PUB_ENDPOINT, init_pubsub() fails and + // no telemetry is published on Topic::FLEXURED. + // + if ( config.param[entry] == "PUB_ENDPOINT" ) { + this->interface.publisher_address = config.arg[entry]; + this->interface.publisher_topic = DAEMON_NAME; // default publish topic is my name + message.str(""); message << "FLEXURED:config:" << config.param[entry] << "=" << config.arg[entry]; + this->interface.async.enqueue_and_log( function, message.str() ); + applied++; + } + + if ( config.param[entry] == "SUB_ENDPOINT" ) { + this->interface.subscriber_address = config.arg[entry]; + message.str(""); message << "FLEXURED:config:" << config.param[entry] << "=" << config.arg[entry]; + this->interface.async.enqueue_and_log( function, message.str() ); + applied++; + } + // MOTOR_CONTROLLER -- address and name of each PI motor controller in daisy-chain // Each CONTROLLER is stored in an STL map indexed by motorname // @@ -161,31 +186,6 @@ namespace Flexure { } } - // TELEM_PROVIDER : contains daemon name and port to contact for header telemetry info - // - if ( config.param[entry] == "TELEM_PROVIDER" ) { - std::vector tokens; - Tokenize( config.arg[entry], tokens, " " ); - try { - if ( tokens.size() == 2 ) { - this->interface.telemetry_providers[tokens.at(0)] = std::stod(tokens.at(1)); - } - else { - message.str(""); message << "ERROR bad format TELEM_PROVIDER=\"" << config.arg[entry] << "\": expected "; - logwrite( function, message.str() ); - return ERROR; - } - } - catch ( const std::exception &e ) { - message.str(""); message << "ERROR parsing TELEM_PROVIDER from " << config.arg[entry] << ": " << e.what(); - logwrite( function, message.str() ); - return ERROR; - } - message.str(""); message << "config:" << config.param[entry] << "=" << config.arg[entry]; - this->interface.async.enqueue_and_log( to_uppercase(DAEMON_NAME), function, message.str() ); - applied++; - } - } // end loop through the entries in the configuration file message.str(""); @@ -543,22 +543,6 @@ namespace Flexure { } else - // send telemetry upon request - // - if ( cmd == TELEMREQUEST ) { - if ( args=="?" || args=="help" ) { - retstring=TELEMREQUEST+"\n"; - retstring.append( " Returns a serialized JSON message containing telemetry\n" ); - retstring.append( " information, terminated with \"EOF\\n\".\n" ); - ret=HELP; - } - else { - this->interface.make_telemetry_message( retstring ); - ret = JSON; - } - } - else - // test routines // if ( cmd == FLEXURED_TEST ) { @@ -611,6 +595,8 @@ namespace Flexure { if ( sock.Write( retstring ) < 0 ) connection_open=false; } + if ( ret==NO_ERROR ) this->interface.publish_status(); + if (!sock.isblocking()) break; // Non-blocking connection exits immediately. // Keep blocking connection open for interactive session. } diff --git a/flexured/flexured.cpp b/flexured/flexured.cpp index f78cefb6..8ee6fd6a 100644 --- a/flexured/flexured.cpp +++ b/flexured/flexured.cpp @@ -122,6 +122,18 @@ int main(int argc, char **argv) { flexured.exit_cleanly(); } + // initialize the pub/sub handler + // + if ( flexured.interface.init_pubsub() == ERROR ) { + logwrite(function, "ERROR initializing publisher-subscriber handler"); + flexured.exit_cleanly(); + } + std::this_thread::sleep_for(std::chrono::milliseconds(250)); + + // publish current state so the world knows I'm online + // + flexured.interface.publish_status( true ); + // This will pre-thread N_THREADS threads. // The 0th thread is reserved for the blocking port, and the rest are for the non-blocking port. // Each thread gets a socket object. All of the socket objects are stored in a vector container. diff --git a/slicecamd/slicecam_interface.cpp b/slicecamd/slicecam_interface.cpp index 8ed2ea8a..fbd689a6 100644 --- a/slicecamd/slicecam_interface.cpp +++ b/slicecamd/slicecam_interface.cpp @@ -639,6 +639,39 @@ namespace Slicecam { /***** Slicecam::Interface::publish_snapshot ********************************/ + /***** Slicecam::Interface::publish_temperature *****************************/ + /** + * @brief publish only the andor CCD temperatures on Topic::SLICECAMD + * @details Published on a fixed interval (see slicecamd.cpp), not on + * change, since the CCD temperature varies continuously. + * + */ + void Interface::publish_temperature() { + nlohmann::json jmessage; + jmessage[Key::SOURCE] = Topic::SLICECAMD; + + for ( const auto &[name, cam] : this->camera.andor ) { + std::string key="TANDOR_SCAM_"+name; + if ( cam->is_open() ) { + int ccdtemp=99; + cam->get_temperature(ccdtemp); + jmessage[key] = static_cast(ccdtemp); // the database wants a float + } + else { + jmessage[key] = NAN; + } + } + try { + this->publisher->publish( jmessage, Topic::SLICECAMD ); + } + catch ( const std::exception &e ) { + logwrite( "Slicecam::Interface::publish_temperature", + "ERROR publishing message: "+std::string(e.what()) ); + } + } + /***** Slicecam::Interface::publish_temperature *****************************/ + + /***** Slicecam::Interface::request_snapshot ********************************/ /** * @brief sends request for snapshot diff --git a/slicecamd/slicecam_interface.h b/slicecamd/slicecam_interface.h index 0785aa84..c6a3d21a 100644 --- a/slicecamd/slicecam_interface.h +++ b/slicecamd/slicecam_interface.h @@ -263,6 +263,7 @@ namespace Slicecam { void handletopic_tcsd( const nlohmann::json &jmessage ); void publish_status(bool force=false); void publish_snapshot(); + void publish_temperature(); ///< publish only the andor temperatures on Topic::SLICECAMD (periodic) void request_snapshot(); bool wait_for_snapshots(); diff --git a/slicecamd/slicecamd.cpp b/slicecamd/slicecamd.cpp index 82279803..e4aa0006 100644 --- a/slicecamd/slicecamd.cpp +++ b/slicecamd/slicecamd.cpp @@ -153,9 +153,22 @@ int main(int argc, char **argv) { slicecamd.exit_cleanly(); } - std::this_thread::sleep_for( std::chrono::milliseconds(100) ); + std::this_thread::sleep_for( std::chrono::milliseconds(250) ); + slicecamd.interface.publish_snapshot(); + + std::this_thread::sleep_for( std::chrono::milliseconds(250) ); slicecamd.interface.request_snapshot(); + // publish the andor CCD temperatures on a fixed 60-second interval + // (temperature varies continuously, so it is not published on change) + // + std::thread( []( Slicecam::Interface &iface ) { + while ( true ) { + iface.publish_temperature(); + std::this_thread::sleep_for( std::chrono::seconds(60) ); + } + }, std::ref(slicecamd.interface) ).detach(); + // This will pre-thread N_THREADS threads. // The 0th thread is reserved for the blocking port, and the rest are for the non-blocking port. // Each thread gets a socket object. All of the socket objects are stored in a vector container. diff --git a/slitd/slitd.cpp b/slitd/slitd.cpp index 4fd6a1fb..9b215537 100644 --- a/slitd/slitd.cpp +++ b/slitd/slitd.cpp @@ -126,7 +126,7 @@ int main(int argc, char **argv) { slitd.exit_cleanly(); } - std::this_thread::sleep_for( std::chrono::milliseconds(100) ); + std::this_thread::sleep_for( std::chrono::milliseconds(250) ); slitd.interface.publish_status(true); // This will pre-thread N_THREADS threads. diff --git a/tcsd/tcs_server.cpp b/tcsd/tcs_server.cpp index 11ff5713..751cf972 100644 --- a/tcsd/tcs_server.cpp +++ b/tcsd/tcs_server.cpp @@ -800,9 +800,9 @@ void doit(TcsIO &tcs_io, const std::string &client_cmd, bool is_slow_command) { ret = this->interface.native( args, retstring ); } else - if ( cmd == SNAPSHOT || cmd == TELEMREQUEST ) { + if ( cmd == SNAPSHOT ) { if ( args=="?" || args=="help" ) { - retstring=TELEMREQUEST+"\n"; + retstring=SNAPSHOT+"\n"; retstring.append( " Returns a serialized JSON message containing telemetry\n" ); retstring.append( " information, terminated with \"EOF\\n\".\n" ); ret=HELP; diff --git a/thermald/thermal_interface.cpp b/thermald/thermal_interface.cpp index 9f59a09c..e144c7f1 100644 --- a/thermald/thermal_interface.cpp +++ b/thermald/thermal_interface.cpp @@ -12,180 +12,114 @@ namespace Thermal { - /***** Thermal::Interface::make_telemetry_message ***************************/ + + + /***** Thermal::Interface::publish_status **********************************/ /** - * @brief assembles a telemetry message - * @details This creates a JSON message for telemetry info, then serializes - * it into a std::string ready to be sent over a socket so that - * outside clients can ask for my telemetry. - * @param[out] retstring string containing the serialization of the JSON message + * @brief publish a thermalinfo snapshot on Topic::THERMALD + * @details Publishes the current merged telemdata (Lakeshore + Campbell + + * external) as float values, blocking NaNs. Called periodically + * from the telemetry loop and on snapshot request. * */ - void Interface::make_telemetry_message( std::string &retstring ) { - - // read the data only if the maps are empty - // - if ( this->lakeshoredata.empty() ) this->lakeshore_readall(); - if ( this->campbell.datamap.empty() ) this->campbell.read_data(); - - // assemble the telemetry into a json message - // Set a messagetype keyword to indicate what kind of message this is. - // + void Interface::publish_status() { nlohmann::json jmessage; - jmessage["messagetype"] = "thermalinfo"; + jmessage[Key::SOURCE] = Topic::THERMALD; - // Loop through the two datamaps, campbell.datamap and lakeshoredata + // copy the latest readings under lock, then build the message from the copy // - try { - - // Make a copy of telemdata which contains all the latest readings - // - auto showdata = this->telemdata; - - // If that is empty, or the arg is "force" then read all sensors now - // - if ( showdata.empty() ) { - this->get_external_telemetry(); - this->lakeshore_readall(); - this->campbell.read_data(); - showdata.merge( this->externaldata ); - showdata.merge( this->campbell.datamap ); - showdata.merge( this->lakeshoredata ); - } - - // Now loop through that map and if the value is a float then - // add it to the jmessage (this blocks NANs). - // - for ( const auto &[key,val] : showdata ) { - if ( val.getType() == mysqlx::Value::FLOAT ) { - jmessage[key] = val.get(); - } - } + std::map showdata; + { + std::lock_guard lock( this->telemdata_mtx ); + showdata = this->telemdata; + } - retstring = jmessage.dump(); // serialize the json message into retstring + for ( const auto &[key,val] : showdata ) { + if ( val.getType() == mysqlx::Value::FLOAT ) jmessage[key] = val.get(); + } - retstring.append(JEOF); // append the JSON message terminator + try { + this->publisher->publish( jmessage, Topic::THERMALD ); } catch( const std::exception &e ) { - logwrite( "Thermal::Interface::make_telemetry_message", - "ERROR assembling telemetry message: "+std::string(e.what()) ); + logwrite( "Thermal::Interface::publish_status", + "ERROR publishing message: "+std::string(e.what()) ); } - - return; } - /***** Thermal::Interface::make_telemetry_message ***************************/ + /***** Thermal::Interface::publish_status **********************************/ - /***** Thermal::Interface::get_external_telemetry ***************************/ + /***** Thermal::Interface::handletopic_snapshot ****************************/ /** - * @brief collect telemetry from another daemon - * @details This is used for any telemetry that I need to collect from - * another daemon. Send the command "sendtelem" to the daemon, which - * will respond with a JSON message. The daemon(s) to contact - * are configured with the TELEM_PROVIDER key in the config file. + * @brief respond to a snapshot request by publishing my status + * @param[in] jmessage subscribed-received JSON message * */ - void Interface::get_external_telemetry() { + void Interface::handletopic_snapshot( const nlohmann::json &jmessage ) { + if ( jmessage.contains( Topic::THERMALD ) ) { + this->publish_status(); + } + else + if ( jmessage.contains( "test" ) ) { + logwrite( "Thermal::Interface::handletopic_snapshot", jmessage.dump() ); + } + } + /***** Thermal::Interface::handletopic_snapshot ****************************/ - // protects externaldata from simultaneous access - // - std::lock_guard lock( this->externaldata_mtx ); - // clear the external telemetry map - // any external telemetry collected here gets put into this - // map by handle_json_message() - // - this->externaldata.clear(); + /***** Thermal::Interface::handletopic_acamd ******************************/ + /** + * @brief stash the acam CCD temperature into externaldata + * @param[in] jmessage subscribed-received JSON message on Topic::ACAMD + * + */ + void Interface::handletopic_acamd( const nlohmann::json &jmessage ) { + std::lock_guard lock( this->externaldata_mtx ); + this->process_key( jmessage, Key::Acamd::TANDOR ); + } + /***** Thermal::Interface::handletopic_acamd ******************************/ - // Loop through each configured telemetry provider. This requests - // their telemetry which is returned as a serialized json string - // held in retstring. - // - // handle_json_message() will parse the serialized json string. - // - std::string retstring; -/*** - for ( const auto &provider : this->telemetry_providers ) { - Common::collect_telemetry( provider, retstring ); - if ( !retstring.empty() ) handle_json_message(retstring); - } -***/ - return; + /***** Thermal::Interface::handletopic_slicecamd **************************/ + /** + * @brief stash the slicecam CCD temperatures into externaldata + * @param[in] jmessage subscribed-received JSON message on Topic::SLICECAMD + * + */ + void Interface::handletopic_slicecamd( const nlohmann::json &jmessage ) { + std::lock_guard lock( this->externaldata_mtx ); + this->process_key( jmessage, "TANDOR_SCAM_L" ); + this->process_key( jmessage, "TANDOR_SCAM_R" ); } - /***** Thermal::Interface::get_external_telemetry ***************************/ + /***** Thermal::Interface::handletopic_slicecamd **************************/ - /***** Thermal::Interface::handle_json_message ******************************/ + /***** Thermal::Interface::request_snapshot *******************************/ /** - * @brief parses incoming telemetry messages - * @details The Interface::get_external_telemetry() will receive telemetry - * from another daemon in a JSON message. Pass that message - * to this function to parse it. The process_key() function - * verifies the key before storing it in the externaldata map. - * @param[in] message_in incoming JSON message - * @return ERROR | NO_ERROR + * @brief ask subscribed daemons to re-publish their current status + * @details Publishes a SNAPSHOT request naming each daemon whose topic + * this daemon subscribes to. Each named daemon responds by + * publishing its own status, ensuring current telemetry is + * received even if the daemon published before I subscribed. * */ - long Interface::handle_json_message( std::string message_in ) { - const std::string function="Thermal::Interface::handle_json_message"; - std::stringstream message; - + void Interface::request_snapshot() { + nlohmann::json jmessage; + jmessage[Topic::ACAMD] = true; + jmessage[Topic::SLICECAMD] = true; try { - nlohmann::json jmessage = nlohmann::json::parse( message_in ); - std::string messagetype; - - // jmessage must not contain key "error" and must contain key "messagetype" - // - if ( !jmessage.contains("error") ) { - if ( jmessage.contains("messagetype") ) { - messagetype = jmessage["messagetype"]; - } - else { - logwrite( function, "ERROR received JSON message with no messagetype" ); - return ERROR; - } - } - else { - logwrite( function, "ERROR in JSON message" ); - return ERROR; - } - - // no errors, so disseminate the message contents based on the message type - // - if ( messagetype == "acaminfo" ) { - this->process_key( jmessage, Key::Acamd::TANDOR ); - } - else - if ( messagetype == "slicecaminfo" ) { - this->process_key( jmessage, "TANDOR_SCAM_L" ); - this->process_key( jmessage, "TANDOR_SCAM_R" ); - } - else - if ( messagetype == "test" ) { - message.str(""); message << "received JSON test message: \"" << jmessage["test"].get() << "\""; - logwrite( function, message.str() ); - } - else { - message.str(""); message << "ERROR received unhandled JSON message type \"" << messagetype << "\""; - logwrite( function, message.str() ); - return ERROR; - } - } - catch ( const nlohmann::json::parse_error &e ) { - message.str(""); message << "ERROR json exception parsing message: " << e.what(); - logwrite( function, message.str() ); - return ERROR; + this->publisher->publish( jmessage, Topic::SNAPSHOT ); } - catch ( const std::exception &e ) { - message.str(""); message << "ERROR parsing message: " << e.what(); - logwrite( function, message.str() ); - return ERROR; + catch( const std::exception &e ) { + logwrite( "Thermal::Interface::request_snapshot", + "ERROR publishing message: "+std::string(e.what()) ); } - - return NO_ERROR; } - /***** Thermal::Interface::handle_json_message ******************************/ + /***** Thermal::Interface::request_snapshot *******************************/ + + + + /***** Thermal::Interface::open_campbell ***********************************/ @@ -482,7 +416,6 @@ namespace Thermal { // If that is empty, or the arg is "force" then read all sensors now // if ( args=="force" || showdata.empty() ) { - this->get_external_telemetry(); this->lakeshore_readall(); this->campbell.read_data(); showdata.merge( this->externaldata ); diff --git a/thermald/thermal_interface.h b/thermald/thermal_interface.h index 9d3d0c8d..21dfecc5 100644 --- a/thermald/thermal_interface.h +++ b/thermald/thermal_interface.h @@ -110,15 +110,56 @@ namespace Thermal { * */ class Interface { + private: + zmqpp::context context; + public: + Interface() + : context(), + subscriber(std::make_unique(context, Common::PubSub::Mode::SUB)), + is_subscriber_thread_running(false), + should_subscriber_thread_run(false) + { + topic_handlers = { + { Topic::SNAPSHOT, std::function( + [this](const nlohmann::json &msg) { handletopic_snapshot(msg); } ) }, + { Topic::ACAMD, std::function( + [this](const nlohmann::json &msg) { handletopic_acamd(msg); } ) }, + { Topic::SLICECAMD, std::function( + [this](const nlohmann::json &msg) { handletopic_slicecamd(msg); } ) } + }; + } + + std::unique_ptr publisher; ///< publisher object + std::string publisher_address; ///< publish socket endpoint + std::string publisher_topic; ///< my default topic for publishing + std::unique_ptr subscriber; ///< subscriber object + std::string subscriber_address; ///< subscribe socket endpoint + std::vector subscriber_topics; ///< list of topics I subscribe to + std::atomic is_subscriber_thread_running; ///< is my subscriber thread running? + std::atomic should_subscriber_thread_run; ///< should my subscriber thread run? + std::unordered_map> topic_handlers; + ///< maps a handler function to each topic + + long init_pubsub(const std::initializer_list &topics={}) { + return Common::PubSubHandler::init_pubsub(context, *this, topics); + } + void start_subscriber_thread() { Common::PubSubHandler::start_subscriber_thread(*this); } + void stop_subscriber_thread() { Common::PubSubHandler::stop_subscriber_thread(*this); } + + void handletopic_snapshot( const nlohmann::json &jmessage ); ///< respond to a snapshot request + void handletopic_acamd( const nlohmann::json &jmessage ); ///< stash acam CCD temperature into externaldata + void handletopic_slicecamd( const nlohmann::json &jmessage ); ///< stash slicecam CCD temperatures into externaldata + void publish_status(); ///< publish thermalinfo on Topic::THERMALD + void request_snapshot(); ///< ask subscribed daemons to publish their status + Common::Queue async; std::map lakeshore; ///< STL map of all Lakeshores indexed by LKS# Thermal::Campbell campbell; ///< Campbell object for datalogger - std::map telemetry_providers; ///< map of port[daemon_name] for external telemetry providers - std::mutex lakeshoredata_mtx; std::mutex telemdata_mtx; std::mutex externaldata_mtx; @@ -179,10 +220,6 @@ namespace Thermal { std::map thermal_info; ///< thermal info database, indexed by channel label - void make_telemetry_message( std::string &retstring ); ///< assembles JSON telemetry message - void get_external_telemetry(); ///< collect telemetry from other daemons - long handle_json_message( std::string message_in ); ///< parses incoming telemetry messages - long reconnect( std::string args, std::string &retstring ); ///< close,open all hardware devices /** diff --git a/thermald/thermal_server.cpp b/thermald/thermal_server.cpp index 6876f6cb..a411b464 100644 --- a/thermald/thermal_server.cpp +++ b/thermald/thermal_server.cpp @@ -18,6 +18,8 @@ namespace Thermal { void Server::exit_cleanly(void) { std::string function = "Thermal::Server::exit_cleanly"; + this->interface.stop_subscriber_thread(); + logwrite( function, "closing Lakeshores" ); this->interface.close_lakeshores(); this->interface.close_campbell(); @@ -108,28 +110,25 @@ namespace Thermal { applied++; } - // TELEM_PROVIDER : contains daemon name and port to contact for header telemetry info + // PUB_ENDPOINT -- my ZeroMQ socket endpoint for publishing telemetry + // SUB_ENDPOINT -- the broker endpoint I subscribe to (for snapshot requests) // - if ( config.param[entry] == "TELEM_PROVIDER" ) { - std::vector tokens; - Tokenize( config.arg[entry], tokens, " " ); - try { - if ( tokens.size() == 2 ) { - this->interface.telemetry_providers[tokens.at(0)] = std::stod(tokens.at(1)); - } - else { - message.str(""); message << "ERROR bad format TELEM_PROVIDER=\"" << config.arg[entry] << "\": expected "; - logwrite( function, message.str() ); - return ERROR; - } - } - catch ( const std::exception &e ) { - message.str(""); message << "ERROR parsing TELEM_PROVIDER from " << config.arg[entry] << ": " << e.what(); - logwrite( function, message.str() ); - return ERROR; - } - message.str(""); message << "config:" << config.param[entry] << "=" << config.arg[entry]; - this->interface.async.enqueue_and_log( "THERMALD", function, message.str() ); + // NOTE: these two keys must be present in the thermald config file for + // publishing to work. Without PUB_ENDPOINT, init_pubsub() fails and + // no telemetry is published on Topic::THERMALD. + // + if ( config.param[entry] == "PUB_ENDPOINT" ) { + this->interface.publisher_address = config.arg[entry]; + this->interface.publisher_topic = DAEMON_NAME; // default publish topic is my name + message.str(""); message << DAEMON_NAME << ":config:" << config.param[entry] << "=" << config.arg[entry]; + this->interface.async.enqueue_and_log( function, message.str() ); + applied++; + } + + if ( config.param[entry] == "SUB_ENDPOINT" ) { + this->interface.subscriber_address = config.arg[entry]; + message.str(""); message << DAEMON_NAME << ":config:" << config.param[entry] << "=" << config.arg[entry]; + this->interface.async.enqueue_and_log( function, message.str() ); applied++; } @@ -610,23 +609,42 @@ namespace Thermal { while ( server.telem_sleeptimer.running() ) { // Gather the data, each source stores in its own map // - server.interface.get_external_telemetry(); // collect telemetry from other daemons server.interface.lakeshore_readall(); // read all Lakeshores server.interface.campbell.read_data(); // read Campbell CR1000 - // erase the telemdata map, - // timestamp it now, then merge each source into that + // snapshot externaldata under its own lock, then copy (not move) so + // the values received asynchronously via pub/sub persist between the + // updates that populate them. (merge() would move the nodes out, + // emptying externaldata.) + // + std::map extcopy; + { + std::lock_guard extlock( server.interface.externaldata_mtx ); + extcopy = server.interface.externaldata; + } + + // erase the telemdata map, timestamp it now, then merge each source + // into that. Done under lock to exclude readers (publish_status() and + // show_telemdata()) from the concurrent map mutation. + // extcopy is a throwaway local, so it is safe to merge (move) from. // + { + std::lock_guard lock( server.interface.telemdata_mtx ); server.interface.telemdata.clear(); server.interface.telemdata["datetime"] = get_datetime(); - server.interface.telemdata.merge( server.interface.externaldata ); + server.interface.telemdata.merge( extcopy ); server.interface.telemdata.merge( server.interface.campbell.datamap ); server.interface.telemdata.merge( server.interface.lakeshoredata ); + } // insert the telemdata map to the database // database.insert( server.interface.telemdata ); + // publish the latest readings to subscribers on Topic::THERMALD + // + server.interface.publish_status(); + server.telem_sleeptimer.sleepFor( std::chrono::seconds( duration ) ); timeout( 0, "sec" ); } @@ -1047,22 +1065,6 @@ namespace Thermal { if ( cmd == THERMALD_SHOWTELEM ) { ret = this->interface.show_telemdata( args, retstring ); } - else - - // send telemetry upon request - // - if ( cmd == TELEMREQUEST ) { - if ( args=="?" || args=="help" ) { - retstring=TELEMREQUEST+"\n"; - retstring.append( " Returns a serialized JSON message containing telemetry\n" ); - retstring.append( " information, terminated with \"EOF\\n\".\n" ); - ret=HELP; - } - else { - this->interface.make_telemetry_message( retstring ); - ret = JSON; - } - } // unknown commands generate an error // diff --git a/thermald/thermald.cpp b/thermald/thermald.cpp index 26fccb1f..d58d49d1 100644 --- a/thermald/thermald.cpp +++ b/thermald/thermald.cpp @@ -123,6 +123,24 @@ int main(int argc, char **argv) { thermald.exit_cleanly(); } + // initialize the pub/sub handler, subscribing to the camera daemons whose + // andor CCD temperatures I fold into my telemetry + // + if ( thermald.interface.init_pubsub( { Topic::ACAMD, + Topic::SLICECAMD } ) == ERROR ) { + logwrite(function, "ERROR initializing publisher-subscriber handler"); + thermald.exit_cleanly(); + } + + // unconditionally publish current telemetry so the world knows I'm online, + // then request a snapshot so I collect the current status of those I + // subscribe to (in case they came online before I subscribed) + // + std::this_thread::sleep_for(std::chrono::milliseconds(250)); + thermald.interface.publish_status(); + std::this_thread::sleep_for(std::chrono::milliseconds(250)); + thermald.interface.request_snapshot(); + // This will pre-thread N_THREADS threads. // The 0th thread is reserved for the blocking port, and the rest are for the non-blocking port. // Each thread gets a socket object. All of the socket objects are stored in a vector container.