Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions patchbay/src/nft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::net::{Ipv4Addr, Ipv6Addr};

use anyhow::{anyhow, Context, Result};
use ipnet::Ipv6Net;
use tracing::debug;
use tracing::{debug, trace};

use crate::{
core::RouterConfig, netns, qdisc, wiring::set_sysctl_root, ConntrackTimeouts, LinkCondition,
Expand Down Expand Up @@ -43,7 +43,7 @@ async fn run_nft(rules: &str) -> Result<()> {

/// Applies nftables rules inside `ns` on the namespace's async worker.
pub(crate) async fn run_nft_in(netns: &netns::NetnsManager, ns: &str, rules: &str) -> Result<()> {
debug!(ns = %ns, rules = %rules, "nft: apply rules");
trace!(ns = %ns, rules = %rules, "nft: apply rules");
let rules = rules.to_string();
let rt = netns.rt_handle_for(ns)?;
rt.spawn(async move { run_nft(&rules).await })
Expand Down
28 changes: 24 additions & 4 deletions patchbay/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ pub(crate) fn udp_roundtrip(reflector: SocketAddr) -> Result<SocketAddr> {
}

/// Returns UDP round-trip time to `reflector` (blocking).
///
/// Retries up to 3 times on transient EAGAIN errors, which can occur
/// under high-latency impairment when the socket buffer is temporarily
/// full on a loaded CI runner.
pub(crate) fn udp_rtt_sync(reflector: SocketAddr) -> Result<Duration> {
let bind = if reflector.is_ipv4() {
"0.0.0.0:0"
Expand All @@ -104,10 +108,26 @@ pub(crate) fn udp_rtt_sync(reflector: SocketAddr) -> Result<Duration> {
let sock = UdpSocket::bind(bind).context("udp_rtt_sync bind")?;
sock.set_read_timeout(Some(Duration::from_secs(2)))?;
let mut buf = [0u8; 256];
let start = Instant::now();
sock.send_to(b"PING", reflector)?;
let _ = sock.recv_from(&mut buf)?;
Ok(start.elapsed())
for attempt in 0..3 {
let start = Instant::now();
match sock.send_to(b"PING", reflector) {
Ok(_) => {}
Err(e) if e.kind() == ErrorKind::WouldBlock && attempt < 2 => {
std::thread::sleep(Duration::from_millis(100));
continue;
}
Err(e) => return Err(e).context("udp_rtt_sync send"),
}
match sock.recv_from(&mut buf) {
Ok(_) => return Ok(start.elapsed()),
Err(e) if e.kind() == ErrorKind::WouldBlock && attempt < 2 => {
std::thread::sleep(Duration::from_millis(100));
continue;
}
Err(e) => return Err(e).context("udp_rtt_sync recv"),
}
}
anyhow::bail!("udp_rtt_sync: all retries exhausted")
}

/// Async UDP round-trip time measurement.
Expand Down
21 changes: 13 additions & 8 deletions patchbay/src/wiring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
use anyhow::{anyhow, bail, Context, Result};
use ipnet::{Ipv4Net, Ipv6Net};
use tokio_util::sync::CancellationToken;
use tracing::{debug, instrument, Instrument as _};
use tracing::{debug, instrument, trace, Instrument as _};

use crate::{
core::{CoreConfig, DeviceData, IfaceBuild, NodeId, RaRuntimeCfg, RouterData},
Expand Down Expand Up @@ -190,7 +190,7 @@ pub(crate) async fn setup_router_async(
.await?;

if let Some(upstream_ip4) = router.upstream_ip {
debug!(nat = ?router.cfg.nat, ip = %upstream_ip4, "router: apply NAT");
trace!(nat = ?router.cfg.nat, ip = %upstream_ip4, "router: apply NAT");
apply_nat_for_router(netns, &router.ns, &router.cfg, &ns_if, upstream_ip4).await?;
}

Expand All @@ -202,7 +202,7 @@ pub(crate) async fn setup_router_async(
let lan_pfx = Ipv6Net::new(dl_gw_v6, dl_prefix)
.unwrap_or_else(|_| Ipv6Net::new(dl_gw_v6, 64).unwrap());
let wan_pfx = nptv6_wan_prefix(up_v6, lan_pfx.prefix_len());
debug!(nat_v6 = ?router.cfg.nat_v6, %wan_pfx, %lan_pfx, "router: apply NAT v6");
trace!(nat_v6 = ?router.cfg.nat_v6, %wan_pfx, %lan_pfx, "router: apply NAT v6");
apply_nat_v6(
netns,
&router.ns,
Expand Down Expand Up @@ -313,7 +313,7 @@ pub(crate) async fn setup_router_async(
.await?;

if let Some(upstream_ip4) = router.upstream_ip {
debug!(nat = ?router.cfg.nat, ip = %upstream_ip4, "router: apply NAT");
trace!(nat = ?router.cfg.nat, ip = %upstream_ip4, "router: apply NAT");
apply_nat_for_router(netns, &router.ns, &router.cfg, &wan_if, upstream_ip4).await?;
}

Expand All @@ -325,7 +325,7 @@ pub(crate) async fn setup_router_async(
let lan_pfx = Ipv6Net::new(dl_gw_v6, dl_prefix)
.unwrap_or_else(|_| Ipv6Net::new(dl_gw_v6, 64).unwrap());
let wan_pfx = nptv6_wan_prefix(up_v6, lan_pfx.prefix_len());
debug!(nat_v6 = ?router.cfg.nat_v6, %wan_pfx, %lan_pfx, "router: apply NAT v6");
trace!(nat_v6 = ?router.cfg.nat_v6, %wan_pfx, %lan_pfx, "router: apply NAT v6");
apply_nat_v6(
netns,
&router.ns,
Expand Down Expand Up @@ -457,6 +457,8 @@ pub(crate) async fn setup_router_async(
)?;
}

debug!(name = %router.name, ns = %router.ns, wan_ip = ?router.upstream_ip, nat = ?router.cfg.nat, "router: ready");

Ok(())
}

Expand Down Expand Up @@ -669,6 +671,7 @@ pub(crate) async fn setup_device_async(
Vec::new()
};
debug!(id = dev.id.0, name = %dev.name, ns = %dev.ns, "device: setup");
let dev_ip = ifaces.iter().find(|i| i.is_default).and_then(|i| i.dev_ip);
let log_prefix = format!("{}.{}", crate::consts::KIND_DEVICE, dev.name);
create_named_netns(netns, &dev.ns, dns_overlay, Some(log_prefix), dad_mode)?;

Expand Down Expand Up @@ -700,13 +703,15 @@ pub(crate) async fn setup_device_async(
.await?;
}

debug!(name = %dev.name, ns = %dev.ns, ip = ?dev_ip, "device: ready");

Ok(())
}

/// Wire a dummy interface inside a device namespace.
#[instrument(name = "iface_dummy", skip_all, fields(iface = %build.ifname))]
async fn wire_dummy_async(netns: &Arc<netns::NetnsManager>, build: &IfaceBuild) -> Result<()> {
debug!(ip = ?build.dev_ip, ip6 = ?build.dev_ip_v6, "iface_dummy: setup");
trace!(ip = ?build.dev_ip, ip6 = ?build.dev_ip_v6, "iface_dummy: setup");
nl_run(netns, &build.dev_ns, {
let ifname = build.ifname.clone();
let dev_ip = build.dev_ip;
Expand Down Expand Up @@ -747,7 +752,7 @@ pub(crate) async fn wire_iface_async(
if dev.dummy {
return wire_dummy_async(netns, &dev).await;
}
debug!(ip = ?dev.dev_ip, ip6 = ?dev.dev_ip_v6, gw = ?dev.gw_ip, gw6 = ?dev.gw_ip_v6, "iface: assigned addresses");
trace!(ip = ?dev.dev_ip, ip6 = ?dev.dev_ip_v6, gw = ?dev.gw_ip, gw6 = ?dev.gw_ip_v6, "iface: assigned addresses");
let root_gw = format!("{}g{}", prefix, dev.idx);
let root_dev = format!("{}e{}", prefix, dev.idx);

Expand Down Expand Up @@ -898,7 +903,7 @@ pub(crate) fn create_named_netns(

/// Sets a sysctl value in the current namespace (caller must already be in the ns).
pub(crate) fn set_sysctl_root(path: &str, val: &str) -> Result<()> {
debug!(path = %path, val = %val, "sysctl: set");
trace!(path = %path, val = %val, "sysctl: set");
std::fs::write(format!("/proc/sys/{}", path), val)
.with_context(|| format!("sysctl write {}", path))
}
Expand Down
Loading