diff --git a/src/domain_fronter.rs b/src/domain_fronter.rs
index ed8e355..e3472af 100644
--- a/src/domain_fronter.rs
+++ b/src/domain_fronter.rs
@@ -131,6 +131,14 @@ const H2_OPEN_TIMEOUT_SECS: u64 = 8;
 /// long. Prevents every concurrent caller during an h2 outage from
 /// paying its own full handshake-timeout cost in turn.
 const H2_OPEN_FAILURE_BACKOFF_SECS: u64 = 15;
+/// Same idea as `H2_OPEN_TIMEOUT_SECS` but for the legacy h1 socket
+/// path. Without this, a stuck TCP connect or TLS handshake to a
+/// blackholed `connect_host:443` would block `acquire()` (and the
+/// `warm()` prewarm loop) until the outer batch budget elapsed —
+/// the same symptom #924 hit during the warm-race window. Bounded
+/// here so a single hung handshake aborts fast and the loop / caller
+/// makes progress on the next attempt.
+const H1_OPEN_TIMEOUT_SECS: u64 = 8;
 /// Cadence for Apps Script container keepalive pings. Apps Script
 /// containers go cold after ~5min idle and cost 1-3s on the first
 /// request to wake back up — most painful on YouTube / streaming where
@@ -156,10 +164,23 @@ struct PoolEntry {
 /// `generation` is monotonic per fronter and lets `poison_h2_if_gen`
 /// avoid the race where task A's stale failure clears task B's
 /// freshly-reopened healthy cell.
+///
+/// `dead` is set by the spawned connection-driver task when the h2
+/// `Connection` future ends (GOAWAY, network error, normal close).
+/// Without this, the cell silently held a dead `SendRequest` after a
+/// mid-session disconnect — the next request paid a wasted h2 round
+/// trip to detect it via `ready()` failure, AND `run_pool_refill`
+/// kept maintaining the small `POOL_MIN_H2_FALLBACK` (2-socket) pool
+/// instead of expanding to `POOL_MIN` (8). With the flag,
+/// `run_pool_refill` notices h2 is dead within one tick (≤5 s) and
+/// pre-warms the larger fallback pool before the next request burst,
+/// and `ensure_h2` short-circuits the `H2_CONN_TTL_SECS`-based
+/// liveness check on a known-dead cell.
 struct H2Cell {
     send: h2::client::SendRequest<Bytes>,
     created: Instant,
     generation: u64,
+    dead: Arc<AtomicBool>,
 }
 
 /// "Did this request reach Apps Script?" signal carried out of every
@@ -864,44 +885,63 @@ impl DomainFronter {
     }
 
     async fn open(&self) -> Result<PooledStream, FronterError> {
-        let tcp = TcpStream::connect((self.connect_host.as_str(), 443u16)).await?;
-        let _ = tcp.set_nodelay(true);
-        let sni = self.next_sni();
-        let name = ServerName::try_from(sni)?;
-        // Always use the h1-only connector here — the pool only holds
-        // sockets that the raw HTTP/1.1 fallback path can write to.
-        // Using the shared connector would let some pooled sockets
-        // negotiate h2, which would then misframe every fallback
-        // request that lands on them.
-        let tls = self.tls_connector_h1.connect(name, tcp).await?;
-        Ok(tls)
+        // Bounded TCP+TLS open. See `H1_OPEN_TIMEOUT_SECS`.
+        let work = async {
+            let tcp = TcpStream::connect((self.connect_host.as_str(), 443u16)).await?;
+            let _ = tcp.set_nodelay(true);
+            let sni = self.next_sni();
+            let name = ServerName::try_from(sni)?;
+            // Always use the h1-only connector here — the pool only holds
+            // sockets that the raw HTTP/1.1 fallback path can write to.
+            // Using the shared connector would let some pooled sockets
+            // negotiate h2, which would then misframe every fallback
+            // request that lands on them.
+            let tls = self.tls_connector_h1.connect(name, tcp).await?;
+            Ok::<_, FronterError>(tls)
+        };
+        match tokio::time::timeout(Duration::from_secs(H1_OPEN_TIMEOUT_SECS), work).await {
+            Ok(r) => r,
+            Err(_) => Err(FronterError::Relay(format!(
+                "h1 open timed out after {}s",
+                H1_OPEN_TIMEOUT_SECS
+            ))),
+        }
     }
 
     /// Open outbound TLS connections eagerly so the first relay request
     /// doesn't pay a cold handshake.
     ///
-    /// When h2 is enabled, attempts to open the multiplexed h2 cell
-    /// first. Success there means one TCP/TLS handshake serves all
-    /// future requests, so we only need a tiny fallback h1 pool
-    /// (clamped to 2) instead of the full `n` requested. On h2 failure
-    /// (ALPN refusal, network error), falls back to the legacy
-    /// behavior: warm the full `n` h1 sockets.
+    /// h2 and h1 prewarm run in parallel: a request that arrives while
+    /// the h2 handshake is still in flight (or has just hit its 8 s
+    /// timeout) needs a warm h1 socket waiting for it, otherwise the
+    /// h1 fallback path pays a cold handshake on the same slow network
+    /// and the 30 s outer batch budget elapses (#924). v1.9.14 warmed
+    /// h1 unconditionally; v1.9.15 (PR #799) accidentally gated the h1
+    /// prewarm behind `ensure_h2()` so the h1 pool stayed empty during
+    /// the h2 init window.
     ///
-    /// Staggered 500 ms apart so we don't burst N TLS handshakes at the
-    /// Google edge simultaneously, and each connection gets an 8 s
-    /// expiry offset so they roll off gradually instead of all hitting
-    /// POOL_TTL_SECS at once.
+    /// The spawned h2 handshake races h1[0] — boot fires two TLS
+    /// handshakes back-to-back. The 500 ms stagger only applies between
+    /// h1[i] and h1[i+1] for i ≥ 1, so we don't burst the remaining
+    /// h1[1..n] handshakes at the Google edge simultaneously. Each
+    /// connection gets an 8 s expiry offset so they roll off gradually
+    /// instead of all hitting POOL_TTL_SECS at once. If h2 ends up the
+    /// active fast path, `run_pool_refill` trims the pool back down to
+    /// `POOL_MIN_H2_FALLBACK` on the next tick — the extra warm h1
+    /// sockets just age out naturally instead of being kept alive.
     pub async fn warm(self: &Arc<Self>, n: usize) {
-        // Try to bring up the h2 fast path first. If that succeeds,
-        // shrink the h1 pool warm count to the fallback minimum — the
-        // multiplexed h2 conn handles all real traffic, so the h1 pool
-        // only needs to cover the rare case where h2 dies mid-session.
-        let h2_alive = !self.h2_disabled.load(Ordering::Relaxed)
-            && self.ensure_h2().await.is_some();
-        let h1_target = if h2_alive { 2.min(n) } else { n };
+        // Spawn the h2 prewarm in parallel so the h1 prewarm loop
+        // below isn't blocked on it. Capturing the join handle lets
+        // us still log "h2 fast path active" / "h1 fallback only"
+        // accurately at the end.
+        let h2_self = self.clone();
+        let h2_handle = tokio::spawn(async move {
+            !h2_self.h2_disabled.load(Ordering::Relaxed)
+                && h2_self.ensure_h2().await.is_some()
+        });
 
         let mut warmed = 0usize;
-        for i in 0..h1_target {
+        for i in 0..n {
             if i > 0 {
                 tokio::time::sleep(Duration::from_millis(500)).await;
             }
@@ -922,6 +962,17 @@ impl DomainFronter {
                 }
             }
         }
+        // Join the h2 prewarm here only to log whether it landed; the
+        // h1 pool above is already populated either way. A panic in
+        // the spawned task surfaces as `JoinError` — log it explicitly
+        // so it isn't indistinguishable from a clean ALPN refusal.
+        let h2_alive = match h2_handle.await {
+            Ok(v) => v,
+            Err(e) => {
+                tracing::warn!("h2 prewarm task failed to join: {}", e);
+                false
+            }
+        };
         if h2_alive {
             tracing::info!(
                 "h2 fast path active; h1 fallback pool pre-warmed with {} connection(s)",
@@ -970,7 +1021,10 @@ impl DomainFronter {
            let cell = self.h2_cell.lock().await;
            let h2_alive = cell
                .as_ref()
-                .map(|c| c.created.elapsed().as_secs() < H2_CONN_TTL_SECS)
+                .map(|c| {
+                    c.created.elapsed().as_secs() < H2_CONN_TTL_SECS
+                        && !c.dead.load(Ordering::Relaxed)
+                })
                .unwrap_or(false);
            if h2_alive { POOL_MIN_H2_FALLBACK } else { POOL_MIN }
        };
@@ -1115,16 +1169,18 @@ impl DomainFronter {
            return None;
        }
 
-        // Fast path: existing cell, within TTL. Clone (Arc bump) and
-        // return without touching the open machinery. We can't peek at
-        // SendRequest liveness directly (h2 0.4 doesn't expose
-        // `is_closed`), so a request against a dead conn fails at
-        // `ready()`/`send_request` and the caller poisons by
-        // generation from there.
+        // Fast path: existing cell, within TTL and not flagged dead by
+        // the connection driver. We can't peek at SendRequest liveness
+        // synchronously (h2 0.4 doesn't expose `is_closed`), but the
+        // driver task does flip `dead` when the underlying connection
+        // ends — so a known-dead cell is rejected here without paying
+        // a wasted h2 round trip to discover it.
        {
            let cell = self.h2_cell.lock().await;
            if let Some(c) = cell.as_ref() {
-                if c.created.elapsed().as_secs() < H2_CONN_TTL_SECS {
+                if c.created.elapsed().as_secs() < H2_CONN_TTL_SECS
+                    && !c.dead.load(Ordering::Relaxed)
+                {
                    return Some((c.send.clone(), c.generation));
                }
            }
@@ -1155,7 +1211,9 @@ impl DomainFronter {
        {
            let cell = self.h2_cell.lock().await;
            if let Some(c) = cell.as_ref() {
-                if c.created.elapsed().as_secs() < H2_CONN_TTL_SECS {
+                if c.created.elapsed().as_secs() < H2_CONN_TTL_SECS
+                    && !c.dead.load(Ordering::Relaxed)
+                {
                    return Some((c.send.clone(), c.generation));
                }
            }
@@ -1168,8 +1226,8 @@ impl DomainFronter {
        let open_result =
            tokio::time::timeout(Duration::from_secs(H2_OPEN_TIMEOUT_SECS), self.open_h2())
                .await;
-        let send = match open_result {
-            Ok(Ok(s)) => s,
+        let (send, dead) = match open_result {
+            Ok(Ok(pair)) => pair,
            Ok(Err(OpenH2Error::AlpnRefused)) => {
                // Definitive: this peer doesn't speak h2. Sticky-disable
                // so we never re-attempt the handshake.
@@ -1206,6 +1264,7 @@ impl DomainFronter {
            send: send.clone(),
            created: Instant::now(),
            generation,
+            dead,
        });
        Some((send, generation))
    }
@@ -1213,7 +1272,11 @@ impl DomainFronter {
    /// Open one TLS connection and run the h2 handshake. Returns a
    /// typed `OpenH2Error` so the caller can recognize ALPN refusal
    /// (sticky disable) without string-matching across boundaries.
-    async fn open_h2(&self) -> Result<h2::client::SendRequest<Bytes>, OpenH2Error> {
+    /// The returned `Arc<AtomicBool>` is the death flag the connection
+    /// driver flips when the h2 `Connection` future ends.
+    async fn open_h2(
+        &self,
+    ) -> Result<(h2::client::SendRequest<Bytes>, Arc<AtomicBool>), OpenH2Error> {
        let tcp = TcpStream::connect((self.connect_host.as_str(), 443u16)).await?;
        let _ = tcp.set_nodelay(true);
        let sni = self.next_sni();
@@ -1228,7 +1291,7 @@ impl DomainFronter {
    /// bypassing the hard-coded `connect_host:443` target.
    async fn h2_handshake_post_tls(
        tls: PooledStream,
-    ) -> Result<h2::client::SendRequest<Bytes>, OpenH2Error> {
+    ) -> Result<(h2::client::SendRequest<Bytes>, Arc<AtomicBool>), OpenH2Error> {
        let alpn_h2 = tls
            .get_ref()
            .1
@@ -1251,15 +1314,19 @@ impl DomainFronter {
            .map_err(|e| OpenH2Error::Handshake(e.to_string()))?;
        // The connection task drives frame I/O independently of any
        // SendRequest handle. When it ends (GOAWAY, network error, TTL),
-        // existing handles will start failing on `ready()` / `send_request`
-        // and `ensure_h2` will reopen on the next call.
+        // we flip the `dead` flag so `ensure_h2` and `run_pool_refill`
+        // can react within one refill tick instead of waiting for a
+        // request to discover the breakage via `ready()` failure.
+        let dead = Arc::new(AtomicBool::new(false));
+        let dead_for_driver = dead.clone();
        tokio::spawn(async move {
            if let Err(e) = conn.await {
                tracing::debug!("h2 connection closed: {}", e);
            }
+            dead_for_driver.store(true, Ordering::Relaxed);
        });
        tracing::info!("h2 connection established to relay edge");
-        Ok(send)
+        Ok((send, dead))
    }
 
    /// React to an h2-fronting-incompatibility HTTP response (status
@@ -5120,6 +5187,7 @@ hello";
                send: send_v2.clone(),
                created: Instant::now(),
                generation: 2,
+                dead: Arc::new(AtomicBool::new(false)),
            });
        }
        // Task A poisons with stale gen=1.
@@ -5141,6 +5209,52 @@ hello";
        server_handle.abort();
    }
 
+    #[tokio::test(flavor = "current_thread")]
+    async fn ensure_h2_rejects_dead_cell_within_ttl() {
+        // Cell is within H2_CONN_TTL_SECS but the connection driver
+        // already flipped `dead` (e.g., upstream sent GOAWAY). Without
+        // the dead-flag check `ensure_h2` would happily hand out the
+        // stale SendRequest and the next request would pay a wasted
+        // h2 round trip to discover the breakage. With the check in
+        // place a second pre-existing healthy cell still works fine —
+        // the dead one is replaced via the open-lock path.
+        let (addr, server_handle) = spawn_h2c_server(|_req| {
+            let resp = http::Response::builder().status(200).body(()).unwrap();
+            (resp, Vec::new())
+        })
+        .await;
+        let send = h2c_client(addr).await;
+
+        let fronter = fronter_for_test(false);
+        let dead = Arc::new(AtomicBool::new(true)); // simulate driver having exited
+        {
+            let mut cell = fronter.h2_cell.lock().await;
+            *cell = Some(H2Cell {
+                send,
+                created: Instant::now(), // well within TTL
+                generation: 1,
+                dead: dead.clone(),
+            });
+        }
+
+        // The fast path normally returns Some(send, gen) when the cell
+        // is within TTL. With dead=true it must NOT return the stale
+        // SendRequest. Pre-set the failure-backoff timestamp so
+        // ensure_h2 short-circuits at the backoff check (no network
+        // I/O) regardless of whatever's bound on 127.0.0.1:443 on the
+        // dev/CI host. This isolates the assertion to the new
+        // dead-flag check.
+        *fronter.h2_open_failed_at.lock().await = Some(Instant::now());
+
+        let result = fronter.ensure_h2().await;
+        assert!(
+            result.is_none(),
+            "ensure_h2 must not serve a cell whose driver flipped `dead`"
+        );
+
+        server_handle.abort();
+    }
+
    #[tokio::test(flavor = "current_thread")]
    async fn ensure_h2_skips_reopen_during_failure_backoff() {
        // After an open failure, ensure_h2 must return None for at
@@ -5566,6 +5680,7 @@ hello";
                send: send.clone(),
                created: Instant::now(),
                generation: 7,
+                dead: Arc::new(AtomicBool::new(false)),
            });
        }
        // Pretend a round-trip just incremented h2_calls (which is
@@ -5682,7 +5797,7 @@ hello";
        match result {
            Err(OpenH2Error::AlpnRefused) => {} // expected
            Err(other) => panic!("expected AlpnRefused, got {:?}", other),
-            Ok(_) => panic!("expected AlpnRefused, got Ok"),
+            Ok((_send, _dead)) => panic!("expected AlpnRefused, got Ok"),
        }
        server.await.unwrap();
    }
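
Not part of the patch: a minimal, standalone sketch of the driver-owned `dead` flag the diff introduces, assuming tokio, `bytes`, and h2 0.4. The function name `handshake_with_dead_flag` and its free-standing shape are invented for illustration and do not come from `domain_fronter.rs`. The idea is the same as in `h2_handshake_post_tls`: the spawned connection-driver task owns all frame I/O, and flipping a shared `AtomicBool` when its future resolves lets callers and pool maintenance reject the stale `SendRequest` without issuing a probe request.

use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

use bytes::Bytes;
use h2::client::SendRequest;
use tokio::io::{AsyncRead, AsyncWrite};

/// Illustrative only: run the h2 handshake over an already-established
/// stream, spawn the connection driver, and return the SendRequest
/// together with a `dead` flag the driver flips when its future ends
/// (GOAWAY, I/O error, clean close).
async fn handshake_with_dead_flag<T>(
    io: T,
) -> Result<(SendRequest<Bytes>, Arc<AtomicBool>), h2::Error>
where
    T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
    let (send, connection) = h2::client::handshake(io).await?;
    let dead = Arc::new(AtomicBool::new(false));
    let dead_for_driver = dead.clone();
    tokio::spawn(async move {
        // The driver owns all frame I/O; once it returns, every handle
        // cloned from `send` is permanently unusable.
        let _ = connection.await;
        dead_for_driver.store(true, Ordering::Relaxed);
    });
    Ok((send, dead))
}

Relaxed ordering is enough for this kind of flag because it is a one-way false-to-true latch used only as a reuse hint; no other data is published through it.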