diff --git a/src/bin/electrs.rs b/src/bin/electrs.rs index 4f46ed2cf..74d06f806 100644 --- a/src/bin/electrs.rs +++ b/src/bin/electrs.rs @@ -74,6 +74,7 @@ fn run_server(config: Arc, salt_rwlock: Arc>) -> Result<( config.network_type, signal.clone(), &metrics, + config.daemon_conn_max_age, )?); info!("opening database at {}", config.db_path.display()); let store = Arc::new(Store::open(&config, &metrics, true)); diff --git a/src/bin/tx-fingerprint-stats.rs b/src/bin/tx-fingerprint-stats.rs index 220c2bd14..c31202665 100644 --- a/src/bin/tx-fingerprint-stats.rs +++ b/src/bin/tx-fingerprint-stats.rs @@ -40,6 +40,7 @@ fn main() { config.network_type, signal, &metrics, + config.daemon_conn_max_age, ) .unwrap(), ); diff --git a/src/config.rs b/src/config.rs index d1408f610..ba21be7fc 100644 --- a/src/config.rs +++ b/src/config.rs @@ -5,6 +5,7 @@ use std::net::SocketAddr; use std::net::ToSocketAddrs; use std::path::{Path, PathBuf}; use std::sync::Arc; +use std::time::Duration; use stderrlog; use crate::chain::Network; @@ -27,6 +28,7 @@ pub struct Config { pub daemon_rpc_addr: SocketAddr, pub daemon_rpc_fallback_addr: Option, pub daemon_parallelism: usize, + pub daemon_conn_max_age: Option, pub cookie: Option, pub electrum_rpc_addr: SocketAddr, pub http_addr: SocketAddr, @@ -177,6 +179,13 @@ impl Config { .help("Number of JSONRPC requests to send in parallel") .default_value("4") ) + .arg( + Arg::with_name("daemon_rpc_conn_max_age") + .long("daemon-rpc-conn-max-age") + .help("Max age (in seconds) of a daemon RPC TCP connection before it is proactively recycled. Recycling re-establishes the connection, letting a load balancer (e.g. a Kubernetes ClusterSetIP) re-select a backend after node rotations. The reconnect happens inline on the next request, so prefer a generous value (minutes, not seconds) to avoid periodic latency spikes. 0 = unlimited / never recycle (default)") + .default_value("0") + .takes_value(true), + ) .arg( Arg::with_name("monitoring_addr") .long("monitoring-addr") @@ -425,6 +434,12 @@ impl Config { .value_of("daemon_rpc_fallback_addr") .map(|e| str_to_socketaddr(e, "Bitcoin Fallback RPC")); + let daemon_conn_max_age: Option = + match value_t_or_exit!(m, "daemon_rpc_conn_max_age", u64) { + 0 => None, // 0 = unlimited / never recycle + secs => Some(Duration::from_secs(secs)), + }; + let electrum_rpc_addr: SocketAddr = str_to_socketaddr( m.value_of("electrum_rpc_addr") .unwrap_or(&format!("127.0.0.1:{}", default_electrum_port)), @@ -494,6 +509,7 @@ impl Config { daemon_rpc_addr, daemon_rpc_fallback_addr, daemon_parallelism: value_t_or_exit!(m, "daemon_parallelism", usize), + daemon_conn_max_age, cookie, utxos_limit: value_t_or_exit!(m, "utxos_limit", usize), electrum_rpc_addr, diff --git a/src/daemon.rs b/src/daemon.rs index 1e7ae152a..2e336949d 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -6,7 +6,7 @@ use std::net::{SocketAddr, TcpStream}; use std::path::PathBuf; use std::str::FromStr; use std::sync::{Arc, Mutex}; -use std::time::Duration; +use std::time::{Duration, Instant}; use std::{env, fs, io}; use base64::prelude::{Engine, BASE64_STANDARD}; @@ -24,7 +24,7 @@ use elements::encode::{deserialize, serialize_hex}; use electrs_macros::trace; use crate::chain::{Block, BlockHash, BlockHeader, Network, Transaction, Txid}; -use crate::metrics::{HistogramOpts, HistogramVec, Metrics}; +use crate::metrics::{CounterVec, HistogramOpts, HistogramVec, MetricOpts, Metrics}; use crate::signal::Waiter; use crate::util::{HeaderList, DEFAULT_BLOCKHASH}; @@ -40,6 +40,11 @@ lazy_static! { static ref DAEMON_WRITE_TIMEOUT: Duration = Duration::from_secs( env::var("DAEMON_WRITE_TIMEOUT").map_or(10 * 60, |s| s.parse().unwrap()) ); + // Minimum delay between *failed* proactive max-age recycle attempts, so that a sustained + // inability to open new connections doesn't make every request pay a connect timeout. + static ref DAEMON_CONN_RECYCLE_COOLDOWN: Duration = Duration::from_secs( + env::var("DAEMON_CONN_RECYCLE_COOLDOWN").map_or(30, |s| s.parse().unwrap()) + ); } const MAX_ATTEMPTS: u32 = 5; @@ -187,9 +192,72 @@ struct Connection { cookie_getter: Arc, addr: SocketAddr, fallback: Option, + // The address this connection is actually established to: either `addr` (primary) + // or `fallback`. Used for accurate operational logging. + active_addr: SocketAddr, signal: Waiter, + // When the TCP connection was (re)established, used together with `max_age` to + // proactively recycle long-lived connections (see `is_expired`). + established: Instant, + // Maximum age of a connection before it is proactively recycled, or None for unlimited. + max_age: Option, + // When the last *failed* proactive recycle attempt happened, used to rate-limit retries + // (see `DAEMON_CONN_RECYCLE_COOLDOWN`). None until a recycle attempt fails. + last_recycle_attempt: Option, +} + +fn configure_stream(conn: &TcpStream) { + // can only fail if DAEMON_TIMEOUT is 0 + conn.set_read_timeout(Some(*DAEMON_READ_TIMEOUT)).unwrap(); + conn.set_write_timeout(Some(*DAEMON_WRITE_TIMEOUT)).unwrap(); } +/// Attempt a single connection to the primary address, falling back to the fallback +/// address once. Returns an error if neither is reachable. Does not retry or back off, +/// so callers that must stay available (e.g. proactive max-age recycling) can give up +/// and keep using their existing connection. +#[trace] +fn tcp_connect_once( + primary: SocketAddr, + fallback: Option, +) -> Result<(TcpStream, SocketAddr)> { + let primary_err = match TcpStream::connect_timeout(&primary, *DAEMON_CONNECTION_TIMEOUT) { + Ok(conn) => { + configure_stream(&conn); + return Ok((conn, primary)); + } + Err(err) => err, + }; + // Return a single descriptive error and let the caller decide how to log it, rather than + // warning per-attempt here (which would double-log on the best-effort recycle path). + match fallback { + Some(fallback_addr) => { + debug!( + "primary daemon at {} unreachable ({}), trying fallback {}", + primary, primary_err, fallback_addr + ); + match TcpStream::connect_timeout(&fallback_addr, *DAEMON_CONNECTION_TIMEOUT) { + Ok(conn) => { + info!("connected to fallback daemon at {}", fallback_addr); + configure_stream(&conn); + Ok((conn, fallback_addr)) + } + Err(fallback_err) => bail!(ErrorKind::Connection(format!( + "failed to connect to primary daemon at {} ({}) and fallback at {} ({})", + primary, primary_err, fallback_addr, fallback_err + ))), + } + } + None => bail!(ErrorKind::Connection(format!( + "failed to connect to daemon at {}: {}", + primary, primary_err + ))), + } +} + +/// Connect to the daemon, retrying indefinitely (with backoff) until a connection +/// succeeds. Used for startup and for reconnecting after a real send/recv failure, +/// where there is no usable connection to fall back to. #[trace] fn tcp_connect( primary: SocketAddr, @@ -197,37 +265,13 @@ fn tcp_connect( signal: &Waiter, ) -> Result<(TcpStream, SocketAddr)> { loop { - match TcpStream::connect_timeout(&primary, *DAEMON_CONNECTION_TIMEOUT) { - Ok(conn) => { - // can only fail if DAEMON_TIMEOUT is 0 - conn.set_read_timeout(Some(*DAEMON_READ_TIMEOUT)).unwrap(); - conn.set_write_timeout(Some(*DAEMON_WRITE_TIMEOUT)).unwrap(); - return Ok((conn, primary)); - } + match tcp_connect_once(primary, fallback) { + Ok(res) => return Ok(res), Err(err) => { - let suffix = if fallback.is_some() { - " (trying fallback...)" - } else { - "" - }; warn!( - "failed to connect to primary daemon at {}: {}{}", - primary, err, suffix + "{}; backoff 3 seconds before next attempt", + err.display_chain() ); - if let Some(fallback_addr) = fallback { - match TcpStream::connect_timeout(&fallback_addr, *DAEMON_CONNECTION_TIMEOUT) { - Ok(conn) => { - info!("connected to fallback daemon at {}", fallback_addr); - conn.set_read_timeout(Some(*DAEMON_READ_TIMEOUT)).unwrap(); - conn.set_write_timeout(Some(*DAEMON_WRITE_TIMEOUT)).unwrap(); - return Ok((conn, fallback_addr)); - } - Err(err) => { - warn!("failed to connect to fallback daemon at {}: {}", fallback_addr, err); - } - } - } - warn!("backoff 3 seconds before next attempt"); signal.wait(Duration::from_secs(3), false)?; continue; } @@ -235,6 +279,24 @@ fn tcp_connect( } } +/// Decide whether an expired connection is due for a (re)attempt at proactive recycling. +/// Returns false when no max age is configured, when the connection is younger than the max +/// age, or when a previous recycle attempt failed less than `cooldown` ago (to avoid paying a +/// connect timeout on every request during a sustained connect failure). Pure for testability. +fn recycle_due( + age: Duration, + max_age: Option, + since_last_attempt: Option, + cooldown: Duration, +) -> bool { + match max_age { + None => false, + Some(max_age) => { + age >= max_age && since_last_attempt.map_or(true, |since| since >= cooldown) + } + } +} + impl Connection { #[trace] fn new( @@ -242,8 +304,22 @@ impl Connection { fallback: Option, cookie_getter: Arc, signal: Waiter, + max_age: Option, ) -> Result { let (conn, active_addr) = tcp_connect(addr, fallback, &signal)?; + Connection::from_stream(conn, active_addr, addr, fallback, cookie_getter, signal, max_age) + } + + /// Build a `Connection` wrapper around an already-established TCP stream. + fn from_stream( + conn: TcpStream, + active_addr: SocketAddr, + addr: SocketAddr, + fallback: Option, + cookie_getter: Arc, + signal: Waiter, + max_age: Option, + ) -> Result { debug!("connected to bitcoind at {}", active_addr); let reader = BufReader::new( conn.try_clone() @@ -255,7 +331,11 @@ impl Connection { cookie_getter, addr, fallback, + active_addr, signal, + established: Instant::now(), + max_age, + last_recycle_attempt: None, }) } @@ -266,6 +346,37 @@ impl Connection { self.fallback, self.cookie_getter.clone(), self.signal.clone(), + self.max_age, + ) + } + + /// Attempt a single reconnect for proactive max-age recycling. Unlike `reconnect`, + /// this makes one bounded attempt (primary then fallback) and returns an error + /// instead of looping, so the caller can keep using the existing healthy connection + /// if no fresh socket is available. + #[trace] + fn try_reconnect_once(&self) -> Result { + let (conn, active_addr) = tcp_connect_once(self.addr, self.fallback)?; + Connection::from_stream( + conn, + active_addr, + self.addr, + self.fallback, + self.cookie_getter.clone(), + self.signal.clone(), + self.max_age, + ) + } + + /// Whether this connection is due to be proactively recycled now: it has exceeded its + /// configured `max_age` and no recent recycle attempt has failed within the cooldown. + /// Always false when no max age is configured (unlimited). + fn should_recycle(&self) -> bool { + recycle_due( + self.established.elapsed(), + self.max_age, + self.last_recycle_attempt.map(|at| at.elapsed()), + *DAEMON_CONN_RECYCLE_COOLDOWN, ) } @@ -372,12 +483,14 @@ pub struct Daemon { conn: Mutex, message_id: Counter, // for monotonic JSONRPC 'id' signal: Waiter, + conn_max_age: Option, rpc_threads: Arc, // monitoring latency: HistogramVec, size: HistogramVec, + conn_recycle: CounterVec, } impl Daemon { @@ -391,6 +504,7 @@ impl Daemon { network: Network, signal: Waiter, metrics: &Metrics, + conn_max_age: Option, ) -> Result { let daemon = Daemon { daemon_dir: daemon_dir.clone(), @@ -401,9 +515,11 @@ impl Daemon { daemon_rpc_fallback_addr, cookie_getter, signal.clone(), + conn_max_age, )?), message_id: Counter::new(), signal: signal.clone(), + conn_max_age, rpc_threads: Arc::new( rayon::ThreadPoolBuilder::new() .num_threads(daemon_parallelism) @@ -419,6 +535,13 @@ impl Daemon { HistogramOpts::new("daemon_bytes", "Bitcoind RPC size (in bytes)"), &["method", "dir"], ), + conn_recycle: metrics.counter_vec( + MetricOpts::new( + "daemon_rpc_conn_recycled", + "Proactive daemon RPC connection recycle attempts (by result)", + ), + &["result"], + ), }; let network_info = daemon.getnetworkinfo()?; info!("{:?}", network_info); @@ -460,9 +583,11 @@ impl Daemon { conn: Mutex::new(self.conn.lock().unwrap().reconnect()?), message_id: Counter::new(), signal: self.signal.clone(), + conn_max_age: self.conn_max_age, rpc_threads: self.rpc_threads.clone(), latency: self.latency.clone(), size: self.size.clone(), + conn_recycle: self.conn_recycle.clone(), }) } @@ -505,6 +630,36 @@ impl Daemon { #[trace] fn call_jsonrpc(&self, method: &str, request: &Value) -> Result { let mut conn = self.conn.lock().unwrap(); + // Proactively recycle connections older than the configured max age. Re-establishing + // the TCP connection lets a fronting load balancer (e.g. a Kubernetes ClusterSetIP) + // re-select a backend, so a long-lived connection does not stay pinned to a stale + // endpoint after node rotations. No-op when no max age is configured (the default). + if conn.should_recycle() { + match conn.try_reconnect_once() { + Ok(new_conn) => { + debug!( + "recycled expired daemon RPC connection to {} after {:?}", + conn.active_addr, + conn.established.elapsed() + ); + *conn = new_conn; + self.conn_recycle.with_label_values(&["ok"]).inc(); + } + Err(err) => { + // Recycling is best-effort: if no fresh socket is available (e.g. a + // transient load-balancer hiccup), keep using the existing healthy + // connection rather than blocking requests while it is still usable. + // Record the failed attempt so we don't retry (and pay a connect timeout) + // on every subsequent request; the next attempt waits out the cooldown. + conn.last_recycle_attempt = Some(Instant::now()); + self.conn_recycle.with_label_values(&["failed"]).inc(); + warn!( + "failed recycling expired daemon RPC connection, keeping existing connection: {}", + err.display_chain() + ); + } + } + } let timer = self.latency.with_label_values(&[method]).start_timer(); let request = request.to_string(); conn.send(&request)?; @@ -882,3 +1037,44 @@ impl Daemon { Ok(relayfee * 100_000f64) } } + +#[cfg(test)] +mod tests { + use super::recycle_due; + use std::time::Duration; + + const COOLDOWN: Duration = Duration::from_secs(30); + const MAX_AGE: Option = Some(Duration::from_secs(60)); + + fn secs(n: u64) -> Duration { + Duration::from_secs(n) + } + + #[test] + fn no_max_age_never_recycles() { + // Unlimited (the default): never recycle, regardless of age. + assert!(!recycle_due(secs(10_000), None, None, COOLDOWN)); + } + + #[test] + fn younger_than_max_age_does_not_recycle() { + assert!(!recycle_due(secs(5), MAX_AGE, None, COOLDOWN)); + } + + #[test] + fn expired_with_no_prior_attempt_recycles() { + assert!(recycle_due(secs(61), MAX_AGE, None, COOLDOWN)); + } + + #[test] + fn expired_within_cooldown_waits() { + // A recent failed attempt should suppress retries until the cooldown elapses, + // even though the connection is well past its max age. + assert!(!recycle_due(secs(600), MAX_AGE, Some(secs(5)), COOLDOWN)); + } + + #[test] + fn expired_after_cooldown_retries() { + assert!(recycle_due(secs(600), MAX_AGE, Some(secs(31)), COOLDOWN)); + } +} diff --git a/tests/common.rs b/tests/common.rs index 3fb3a87e7..56e05c595 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -94,6 +94,7 @@ impl TestRunner { db_path: electrsdb.path().to_path_buf(), daemon_dir: daemon_subdir.clone(), daemon_parallelism: 3, + daemon_conn_max_age: None, blocks_dir: daemon_subdir.join("blocks"), daemon_rpc_addr: params.rpc_socket.into(), daemon_rpc_fallback_addr: None, @@ -145,6 +146,7 @@ impl TestRunner { config.network_type, signal.clone(), &metrics, + config.daemon_conn_max_age, )?); let store = Arc::new(Store::open(&config, &metrics, true));