Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions acamd/acam_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,31 @@ namespace Acam {
buf.erase(std::remove(buf.begin(), buf.end(), '\r' ), buf.end());
buf.erase(std::remove(buf.begin(), buf.end(), '\n' ), buf.end());

// Detect and strip an optional correlation ID prefix. Inter-daemon clients
// tag every command with "#cid:HHHHHHHH " so stale or out-of-order replies
// can be rejected by the client. CLI users send no prefix and corr_id is
// left empty; the server then echoes no prefix on reply.
//
std::string corr_id;
{
std::string payload;
Common::extract_correlation_id( buf, corr_id, payload );
buf = std::move( payload );
}

// Replay a cached reply if this command's ID matches a recent one.
// This makes DaemonClient retries idempotent: the underlying handler
// is invoked at most once per correlation ID within the cache TTL.
//
if ( !corr_id.empty() ) {
std::string cached_reply;
if ( this->corr_cache.lookup( corr_id, cached_reply ) ) {
std::string out = CID_PREFIX + corr_id + " " + cached_reply;
if ( sock.Write( out ) < 0 ) connection_open=false;
continue;
}
}

if (buf.empty()) {sock.Write("\n"); continue;} // acknowledge empty command so client doesn't time out

try {
Expand Down Expand Up @@ -964,6 +989,21 @@ namespace Acam {
logwrite( function, message.str() );
}

// Cache the bare reply (without prefix) so retries with the same
// correlation ID can be replayed without re-running the handler.
//
if ( !corr_id.empty() ) {
this->corr_cache.insert( corr_id, retstring );
}

// Echo the correlation ID back to inter-daemon clients so they can
// verify the reply belongs to the command they just sent. CLI users
// sent no prefix, so corr_id is empty and nothing is prepended.
//
if ( !corr_id.empty() ) {
retstring = CID_PREFIX + corr_id + " " + retstring;
}

if ( sock.Write( retstring ) < 0 ) connection_open=false;
}

Expand Down
1 change: 1 addition & 0 deletions acamd/acam_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ namespace Acam {
std::string asyncgroup; ///< asynchronous multicast group

std::atomic<int> cmd_num; ///< keep a running tally of number of commands received by acamd
Common::CorrIdCache corr_cache; ///< dedup cache for tagged inter-daemon commands
std::atomic<int> threads_active; ///< number of blocking threads that exist

Config config; ///< create a Config object for reading the configuration file
Expand Down
40 changes: 40 additions & 0 deletions calibd/calib_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,31 @@ namespace Calib {
buf.erase(std::remove(buf.begin(), buf.end(), '\r' ), buf.end());
buf.erase(std::remove(buf.begin(), buf.end(), '\n' ), buf.end());

// Detect and strip an optional correlation ID prefix. Inter-daemon clients
// tag every command with "#cid:HHHHHHHH " so stale or out-of-order replies
// can be rejected by the client. CLI users send no prefix and corr_id is
// left empty; the server then echoes no prefix on reply.
//
std::string corr_id;
{
std::string payload;
Common::extract_correlation_id( buf, corr_id, payload );
buf = std::move( payload );
}

// Replay a cached reply if this command's ID matches a recent one.
// This makes DaemonClient retries idempotent: the underlying handler
// is invoked at most once per correlation ID within the cache TTL.
//
if ( !corr_id.empty() ) {
std::string cached_reply;
if ( this->corr_cache.lookup( corr_id, cached_reply ) ) {
std::string out = CID_PREFIX + corr_id + " " + cached_reply;
if ( sock.Write( out ) < 0 ) connection_open=false;
continue;
}
}

if (buf.empty()) {sock.Write("\n"); continue;} // acknowledge empty command so client doesn't time out

try {
Expand Down Expand Up @@ -649,6 +674,21 @@ namespace Calib {
logwrite( function, message.str() );
}

// Cache the bare reply (without prefix) so retries with the same
// correlation ID can be replayed without re-running the handler.
//
if ( !corr_id.empty() ) {
this->corr_cache.insert( corr_id, retstring );
}

// Echo the correlation ID back to inter-daemon clients so they can
// verify the reply belongs to the command they just sent. CLI users
// sent no prefix, so corr_id is empty and nothing is prepended.
//
if ( !corr_id.empty() ) {
retstring = CID_PREFIX + corr_id + " " + retstring;
}

if ( sock.Write( retstring ) < 0 ) connection_open=false;
}

Expand Down
1 change: 1 addition & 0 deletions calibd/calib_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ namespace Calib {
int blocking_socket;

std::atomic<int> cmd_num;
Common::CorrIdCache corr_cache; ///< dedup cache for tagged inter-daemon commands

Config config;

Expand Down
40 changes: 40 additions & 0 deletions camerad/camerad.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,31 @@ void doit(Network::TcpSocket &sock) {
sbuf.erase(std::remove(sbuf.begin(), sbuf.end(), '\r' ), sbuf.end());
sbuf.erase(std::remove(sbuf.begin(), sbuf.end(), '\n' ), sbuf.end());

// Detect and strip an optional correlation ID prefix. Inter-daemon clients
// tag every command with "#cid:HHHHHHHH " so stale or out-of-order replies
// can be rejected by the client. CLI users send no prefix and corr_id is
// left empty; the server then echoes no prefix on reply.
//
std::string corr_id;
{
std::string payload;
Common::extract_correlation_id( sbuf, corr_id, payload );
sbuf = std::move( payload );
}

// Replay a cached reply if this command's ID matches a recent one.
// This makes DaemonClient retries idempotent: the underlying handler
// is invoked at most once per correlation ID within the cache TTL.
//
if ( !corr_id.empty() ) {
std::string cached_reply;
if ( server.corr_cache.lookup( corr_id, cached_reply ) ) {
std::string out = CID_PREFIX + corr_id + " " + cached_reply;
if ( sock.Write( out ) < 0 ) connection_open=false;
continue;
}
}

if ( sbuf.empty() ) sbuf="help"; // no command automatically displays help

try {
Expand Down Expand Up @@ -829,6 +854,21 @@ void doit(Network::TcpSocket &sock) {
logwrite( function, message.str() );
}

// Cache the bare reply (without prefix) so retries with the same
// correlation ID can be replayed without re-running the handler.
//
if ( !corr_id.empty() ) {
server.corr_cache.insert( corr_id, retstring );
}

// Echo the correlation ID back to inter-daemon clients so they can
// verify the reply belongs to the command they just sent. CLI users
// sent no prefix, so corr_id is empty and nothing is prepended.
//
if ( !corr_id.empty() ) {
retstring = CID_PREFIX + corr_id + " " + retstring;
}

if ( sock.Write( retstring ) < 0 ) connection_open=false;
}

Expand Down
1 change: 1 addition & 0 deletions camerad/camerad.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ namespace Camera {
int blocking_socket;

std::atomic<int> cmd_num;
Common::CorrIdCache corr_cache; ///< dedup cache for tagged inter-daemon commands

std::vector<int> jclient_ports;

Expand Down
Loading
Loading