diff --git a/communication/Cargo.toml b/communication/Cargo.toml index e2d490897..aded24255 100644 --- a/communication/Cargo.toml +++ b/communication/Cargo.toml @@ -29,6 +29,9 @@ timely_bytes = { path = "../bytes", version = "0.29" } timely_container = { path = "../container", version = "0.29.0" } timely_logging = { path = "../logging", version = "0.29" } +[dev-dependencies] +tempfile = "3" + # Lgalloc only supports linux and macos, don't depend on any other OS. [target.'cfg(any(target_os = "linux", target_os = "macos"))'.dev-dependencies] lgalloc = "0.6" diff --git a/communication/examples/lgalloc.rs b/communication/examples/lgalloc.rs index e93e8c99b..3f805d95e 100644 --- a/communication/examples/lgalloc.rs +++ b/communication/examples/lgalloc.rs @@ -73,7 +73,8 @@ mod example { // extract the configuration from user-supplied arguments, initialize the computation. let config = timely_communication::Config::ProcessBinary(4); - let (allocators, others) = config.try_build_with(refill).unwrap(); + let hooks = timely_communication::Hooks { refill, ..Default::default() }; + let (allocators, others) = config.try_build_with(hooks).unwrap(); let guards = timely_communication::initialize_from(allocators, others, |mut allocator| { println!("worker {} of {} started", allocator.index(), allocator.peers()); diff --git a/communication/examples/spill_stress.rs b/communication/examples/spill_stress.rs new file mode 100644 index 000000000..4794302e8 --- /dev/null +++ b/communication/examples/spill_stress.rs @@ -0,0 +1,238 @@ +//! Pushes a configurable volume of data through a MergeQueue with spill +//! enabled, printing RSS at intervals. Useful for verifying that the spill +//! mechanism keeps memory bounded under load. +//! +//! Also serves as a reference implementation of a file-backed `BytesSpill` +//! strategy, demonstrating how to implement the spill traits against a +//! concrete storage backend. +//! +//! Usage: +//! 
cargo run --example spill_stress -p timely_communication -- [OPTIONS] +//! +//! Options: +//! --total-mb N Total data to push (default: 512) +//! --chunk-kb N Size of each pushed Bytes (default: 256) +//! --threshold-mb N Spill threshold (default: 32) +//! --head-reserve-mb N Head reserve / prefetch budget (default: 16) +//! --spill-dir PATH Directory for tempfiles (default: std::env::temp_dir()) +//! --drain-every N Drain after every N pushes (default: 0 = push all then drain) + +use std::time::Instant; + +use timely_bytes::arc::BytesMut; +use timely_communication::allocator::zero_copy::bytes_exchange::{MergeQueue, BytesPush, BytesPull}; +use timely_communication::allocator::zero_copy::spill::*; + +fn main() { + let args: Vec = std::env::args().collect(); + + let total_mb: usize = parse_arg(&args, "--total-mb", 512); + let chunk_kb: usize = parse_arg(&args, "--chunk-kb", 256); + let threshold_mb: usize = parse_arg(&args, "--threshold-mb", 32); + let head_reserve_mb:usize = parse_arg(&args, "--head-reserve-mb",16); + let drain_every: usize = parse_arg(&args, "--drain-every", 0); + let spill_dir: std::path::PathBuf = args.iter() + .position(|a| a == "--spill-dir") + .and_then(|i| args.get(i + 1)) + .map(std::path::PathBuf::from) + .unwrap_or_else(std::env::temp_dir); + + let total_bytes = total_mb << 20; + let chunk_bytes = chunk_kb << 10; + let threshold_bytes = threshold_mb << 20; + let head_reserve = head_reserve_mb << 20; + let num_chunks = total_bytes / chunk_bytes; + + println!("spill_stress configuration:"); + println!(" total: {} MB ({} chunks of {} KB)", total_mb, num_chunks, chunk_kb); + println!(" threshold: {} MB", threshold_mb); + println!(" head_reserve: {} MB", head_reserve_mb); + println!(" drain_every: {}", if drain_every == 0 { "push-all-then-drain".to_string() } else { format!("{}", drain_every) }); + println!(" spill_dir: {}", spill_dir.display()); + println!(); + + // Build writer + reader pair. 
+ let strategy: Box = + Box::new(file_spill::FileSpillStrategy::new(spill_dir)); + let mut tp = Threshold::new(strategy); + tp.threshold_bytes = threshold_bytes; + tp.head_reserve_bytes = head_reserve; + let writer_policy: Box = Box::new(tp); + let reader_policy: Box = Box::new(PrefetchPolicy::new(head_reserve)); + + let buzzer = timely_communication::buzzer::Buzzer::default(); + let (mut writer, mut reader) = MergeQueue::new_pair(buzzer, Some(writer_policy), Some(reader_policy)); + + let start = Instant::now(); + let mut pushed = 0usize; + let mut drained_bytes = 0usize; + let mut drain_buf: Vec = Vec::new(); + + print_rss("start"); + + // Push phase (optionally interleaved with drains). + for i in 0..num_chunks { + let data = vec![(i % 251) as u8; chunk_bytes]; + let bytes = BytesMut::from(data).freeze(); + writer.extend(Some(bytes)); + pushed += chunk_bytes; + + if drain_every > 0 && (i + 1) % drain_every == 0 { + reader.drain_into(&mut drain_buf); + for b in drain_buf.drain(..) { drained_bytes += b.len(); } + } + + if (i + 1) % (num_chunks / 8).max(1) == 0 { + print_rss(&format!("pushed {}/{} MB", pushed >> 20, total_mb)); + } + } + + print_rss("push complete, starting drain"); + + // Drain phase. + loop { + let before = drain_buf.len(); + reader.drain_into(&mut drain_buf); + if drain_buf.len() == before { break; } + for b in drain_buf.drain(..) 
{ drained_bytes += b.len(); } + + if drained_bytes % (total_bytes / 8).max(1) < chunk_bytes { + print_rss(&format!("drained {}/{} MB", drained_bytes >> 20, total_mb)); + } + } + + let elapsed = start.elapsed(); + print_rss("drain complete"); + println!(); + println!("pushed: {} MB", pushed >> 20); + println!("drained: {} MB", drained_bytes >> 20); + println!("elapsed: {:.2?}", elapsed); + + assert_eq!(pushed, drained_bytes, "data loss: pushed {} but drained {}", pushed, drained_bytes); + println!("OK — all bytes recovered."); +} + +fn parse_arg(args: &[String], flag: &str, default: usize) -> usize { + args.iter() + .position(|a| a == flag) + .and_then(|i| args.get(i + 1)) + .and_then(|v| v.parse().ok()) + .unwrap_or(default) +} + +fn get_rss_kb() -> Option { + let pid = std::process::id(); + let output = std::process::Command::new("ps") + .args(["-o", "rss=", "-p", &pid.to_string()]) + .output() + .ok()?; + let text = String::from_utf8_lossy(&output.stdout); + text.trim().parse().ok() +} + +fn print_rss(label: &str) { + match get_rss_kb() { + Some(kb) => println!("[RSS {:>8} KB / {:>6} MB] {}", kb, kb / 1024, label), + None => println!("[RSS unavailable] {}", label), + } +} + +/// File-backed BytesSpill implementation. +/// +/// One tempfile per spill batch. Writes chunks sequentially; reads by +/// slurping the whole file on first fetch. Reference implementation for +/// anyone writing their own BytesSpill backend. 
+mod file_spill { + use std::fs::File; + use std::io::{Read, Seek, SeekFrom, Write}; + use std::path::PathBuf; + use std::sync::{Arc, Mutex}; + use timely_bytes::arc::{Bytes, BytesMut}; + use timely_communication::allocator::zero_copy::spill::{BytesSpill, BytesFetch}; + + pub struct FileSpillStrategy { + dir: PathBuf, + } + + impl FileSpillStrategy { + pub fn new(dir: PathBuf) -> Self { + FileSpillStrategy { dir } + } + } + + impl BytesSpill for FileSpillStrategy { + fn spill(&mut self, chunks: &mut Vec, handles: &mut Vec>) { + if chunks.is_empty() { return; } + let mut file = match tempfile::tempfile_in(&self.dir) { + Ok(f) => f, + Err(e) => { eprintln!("timely: file spill failed: {}", e); return; } + }; + let mut lens = Vec::with_capacity(chunks.len()); + for chunk in chunks.iter() { + if let Err(e) = file.write_all(&chunk[..]) { + eprintln!("timely: file spill write failed: {}", e); + // Leave remaining chunks for the caller to retain. + return; + } + lens.push(chunk.len()); + } + // All writes succeeded; drain chunks and produce handles. + chunks.clear(); + let state = Arc::new(Mutex::new(FileState::OnDisk { file, lens: lens.clone() })); + handles.extend((0..lens.len()) + .map(|i| Box::new(ChunkHandle { + state: Arc::clone(&state), + index: i, + }) as Box)); + } + } + + enum FileState { + OnDisk { file: File, lens: Vec }, + Slurped { chunks: Vec }, + Placeholder, + } + + struct ChunkHandle { + state: Arc>, + index: usize, + } + + impl BytesFetch for ChunkHandle { + fn fetch(self: Box) -> Result, Box> { + let mut state = self.state.lock().expect("spill state poisoned"); + + if matches!(*state, FileState::OnDisk { .. 
}) { + let (mut file, lens) = match std::mem::replace(&mut *state, FileState::Placeholder) { + FileState::OnDisk { file, lens } => (file, lens), + _ => unreachable!(), + }; + if let Err(e) = file.seek(SeekFrom::Start(0)) { + eprintln!("spill fetch: seek failed: {}", e); + *state = FileState::OnDisk { file, lens }; + drop(state); + return Err(self); + } + let mut chunks = Vec::with_capacity(lens.len()); + for &len in &lens { + let mut data = vec![0u8; len]; + if let Err(e) = file.read_exact(&mut data) { + eprintln!("spill fetch: read failed: {}", e); + *state = FileState::OnDisk { file, lens }; + drop(state); + return Err(self); + } + chunks.push(BytesMut::from(data).freeze()); + } + *state = FileState::Slurped { chunks }; + } + + let result = match &*state { + FileState::Slurped { chunks } => Ok(vec![chunks[self.index].clone()]), + _ => unreachable!("state should be Slurped after slurp transition"), + }; + drop(state); + result + } + } +} diff --git a/communication/src/allocator/mod.rs b/communication/src/allocator/mod.rs index 78ba664f8..8c7cf6893 100644 --- a/communication/src/allocator/mod.rs +++ b/communication/src/allocator/mod.rs @@ -117,13 +117,19 @@ impl Push for Broadcaster { } use crate::allocator::zero_copy::bytes_slab::BytesRefill; +use crate::allocator::zero_copy::spill::SpillPolicyFn; /// A builder for vectors of peers. pub(crate) trait PeerBuilder { /// The peer type. type Peer: AllocateBuilder + Sized; /// Allocate a list of `Self::Peer` of length `peers`. - fn new_vector(peers: usize, refill: BytesRefill) -> Vec; + /// + /// `spill` is an optional factory for spill policies; one fresh policy + /// per `MergeQueue` that the resulting peers construct. Implementors + /// that don't use `MergeQueue` (e.g. `Typed` mpsc-based intra-process) + /// ignore it. 
+ fn new_vector(peers: usize, refill: BytesRefill, spill: Option) -> Vec; } @@ -146,16 +152,16 @@ impl ProcessBuilder { } /// Constructs a vector of regular (mpsc-based, "Typed") intra-process builders. - pub fn new_typed_vector(peers: usize, refill: BytesRefill) -> Vec { - ::new_vector(peers, refill) + pub fn new_typed_vector(peers: usize, refill: BytesRefill, spill: Option) -> Vec { + ::new_vector(peers, refill, spill) .into_iter() .map(ProcessBuilder::Typed) .collect() } /// Constructs a vector of binary (zero-copy serialized, "Bytes") intra-process builders. - pub fn new_bytes_vector(peers: usize, refill: BytesRefill) -> Vec { - ::new_vector(peers, refill) + pub fn new_bytes_vector(peers: usize, refill: BytesRefill, spill: Option) -> Vec { + ::new_vector(peers, refill, spill) .into_iter() .map(ProcessBuilder::Bytes) .collect() diff --git a/communication/src/allocator/process.rs b/communication/src/allocator/process.rs index cc7471f9e..5080d040c 100644 --- a/communication/src/allocator/process.rs +++ b/communication/src/allocator/process.rs @@ -75,7 +75,11 @@ impl Process { impl PeerBuilder for Process { type Peer = ProcessBuilder; /// Allocate a list of connected intra-process allocators. 
- fn new_vector(peers: usize, _refill: crate::allocator::BytesRefill) -> Vec { + fn new_vector( + peers: usize, + _refill: crate::allocator::BytesRefill, + _spill: Option, + ) -> Vec { let mut counters_send = Vec::with_capacity(peers); let mut counters_recv = Vec::with_capacity(peers); diff --git a/communication/src/allocator/zero_copy/allocator.rs b/communication/src/allocator/zero_copy/allocator.rs index 489109d03..59ce805a1 100644 --- a/communication/src/allocator/zero_copy/allocator.rs +++ b/communication/src/allocator/zero_copy/allocator.rs @@ -12,6 +12,7 @@ use crate::{Allocate, Push, Pull}; use crate::allocator::{Process, ProcessBuilder, Exchangeable}; use crate::allocator::canary::Canary; use crate::allocator::zero_copy::bytes_slab::BytesRefill; +use crate::allocator::zero_copy::spill::SpillPolicyFn; use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue}; use super::push_pull::{Pusher, PullerInner}; @@ -29,6 +30,8 @@ pub struct TcpBuilder { promises: Vec>, // to send queues from each network thread. /// Byte slab refill function. refill: BytesRefill, + /// Optional spill factory for recv queues constructed in `build()`. + spill: Option, } /// Creates a vector of builders, sharing appropriate state. 
@@ -48,6 +51,7 @@ pub(crate) fn new_vector( my_process: usize, processes: usize, refill: BytesRefill, + spill: Option, ) -> (Vec, Vec>>, Vec>>) @@ -72,6 +76,7 @@ pub(crate) fn new_vector( promises, futures, refill: refill.clone(), + spill: spill.clone(), }}) .collect(); @@ -87,9 +92,17 @@ impl TcpBuilder { let mut recvs = Vec::with_capacity(self.peers); for promise in self.promises.into_iter() { let buzzer = crate::buzzer::Buzzer::default(); - let queue = MergeQueue::new(buzzer); - promise.send(queue.clone()).expect("Failed to send MergeQueue"); - recvs.push(queue.clone()); + let (writer, reader) = match self.spill.as_ref() { + Some(build_fn) => { + let (w, r) = build_fn(); + MergeQueue::new_pair(buzzer, Some(w), Some(r)) + } + None => MergeQueue::new_pair(buzzer, None, None), + }; + // `recv_loop` is the writer here (it `extend`s with bytes from + // TCP), so it keeps the original; the worker is the reader. + recvs.push(reader); + promise.send(writer).expect("Failed to send MergeQueue"); } // Extract pusher commitments. diff --git a/communication/src/allocator/zero_copy/allocator_process.rs b/communication/src/allocator/zero_copy/allocator_process.rs index 8aee89b23..72a1e94b8 100644 --- a/communication/src/allocator/zero_copy/allocator_process.rs +++ b/communication/src/allocator/zero_copy/allocator_process.rs @@ -13,6 +13,7 @@ use crate::{Allocate, Push, Pull}; use crate::allocator::{AllocateBuilder, Exchangeable, PeerBuilder}; use crate::allocator::canary::Canary; use crate::allocator::zero_copy::bytes_slab::BytesRefill; +use crate::allocator::zero_copy::spill::SpillPolicyFn; use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue}; use super::push_pull::{Pusher, Puller}; @@ -29,6 +30,7 @@ pub struct ProcessBuilder { pushers: Vec>, // for pushing bytes at other workers. pullers: Vec>, // for pulling bytes from other workers. refill: BytesRefill, + spill: Option, // optional spill factory for recv queues. 
} impl PeerBuilder for ProcessBuilder { @@ -36,7 +38,7 @@ impl PeerBuilder for ProcessBuilder { /// Creates a vector of builders, sharing appropriate state. /// /// This method requires access to a byte exchanger, from which it mints channels. - fn new_vector(count: usize, refill: BytesRefill) -> Vec { + fn new_vector(count: usize, refill: BytesRefill, spill: Option) -> Vec { // Channels for the exchange of `MergeQueue` endpoints. let (pullers_vec, pushers_vec) = crate::promise_futures(count, count); @@ -52,6 +54,7 @@ impl PeerBuilder for ProcessBuilder { pushers, pullers, refill: refill.clone(), + spill: spill.clone(), } ) .collect() @@ -66,9 +69,17 @@ impl ProcessBuilder { let mut recvs = Vec::with_capacity(self.peers); for puller in self.pullers.into_iter() { let buzzer = crate::buzzer::Buzzer::default(); - let queue = MergeQueue::new(buzzer); - puller.send(queue.clone()).expect("Failed to send MergeQueue"); - recvs.push(queue.clone()); + let (writer, reader) = match self.spill.as_ref() { + Some(build_fn) => { + let (w, r) = build_fn(); + MergeQueue::new_pair(buzzer, Some(w), Some(r)) + } + None => MergeQueue::new_pair(buzzer, None, None), + }; + // The puller side is the writer here (the producing worker + // `extend`s into it); this builder owns the reader handle. + recvs.push(reader); + puller.send(writer).expect("Failed to send MergeQueue"); } // Extract pusher commitments. diff --git a/communication/src/allocator/zero_copy/bytes_exchange.rs b/communication/src/allocator/zero_copy/bytes_exchange.rs index d3b61fa6e..bcd0d247c 100644 --- a/communication/src/allocator/zero_copy/bytes_exchange.rs +++ b/communication/src/allocator/zero_copy/bytes_exchange.rs @@ -5,32 +5,36 @@ use std::collections::VecDeque; use timely_bytes::arc::Bytes; use super::bytes_slab::{BytesRefill, BytesSlab}; +use super::spill::{BytesFetch, SpillPolicy}; + +/// An entry in a `MergeQueue`. 
Either `Bytes` resident in memory, or a +/// handle to bytes previously written out via a `SpillPolicy`. +pub enum QueueEntry { + /// Bytes resident in memory, ready to be consumed directly. + Bytes(Bytes), + /// Bytes spilled to a backing store, fetched via the handle. + Paged(Box), +} /// A target for `Bytes`. pub trait BytesPush { - // /// Pushes bytes at the instance. - // fn push(&mut self, bytes: Bytes); /// Pushes many bytes at the instance. fn extend>(&mut self, iter: I); } /// A source for `Bytes`. pub trait BytesPull { - // /// Pulls bytes from the instance. - // fn pull(&mut self) -> Option; /// Drains many bytes from the instance. fn drain_into(&mut self, vec: &mut Vec); } use std::sync::atomic::{AtomicBool, Ordering}; -/// An unbounded queue of bytes intended for point-to-point communication -/// between threads. Cloning returns another handle to the same queue. -/// -/// TODO: explain "extend" -#[derive(Clone)] +/// An unbounded queue of bytes intended for point-to-point communication between threads. +/// Writer/reader handle pairs are obtained via [`MergeQueue::new_pair`]. pub struct MergeQueue { - queue: Arc>>, // queue of bytes. - buzzer: crate::buzzer::Buzzer, // awakens receiver thread. - panic: Arc, + queue: Arc>>, // queue of entries. + buzzer: crate::buzzer::Buzzer, // awakens receiver thread. + panic: Arc, // used to poison the queue. + policy: Option>, // local policy; extend or drain dispatches it. } impl MergeQueue { @@ -40,8 +44,34 @@ impl MergeQueue { queue: Arc::new(Mutex::new(VecDeque::new())), buzzer, panic: Arc::new(AtomicBool::new(false)), + policy: None, } } + /// Allocates a matched pair of handles on the same underlying queue, + /// each carrying its own policy. The first (writer) runs its policy + /// after each `extend`; the second (reader) runs its policy before + /// each `drain_into`. 
+ pub fn new_pair( + buzzer: crate::buzzer::Buzzer, + writer_policy: Option>, + reader_policy: Option>, + ) -> (MergeQueue, MergeQueue) { + let queue = Arc::new(Mutex::new(VecDeque::new())); + let panic = Arc::new(AtomicBool::new(false)); + let writer = MergeQueue { + queue: Arc::clone(&queue), + buzzer: buzzer.clone(), + panic: Arc::clone(&panic), + policy: writer_policy, + }; + let reader = MergeQueue { + queue, + buzzer, + panic, + policy: reader_policy, + }; + (writer, reader) + } /// Indicates that all input handles to the queue have dropped. pub fn is_complete(&self) -> bool { if self.panic.load(Ordering::SeqCst) { panic!("MergeQueue poisoned."); } @@ -64,25 +94,34 @@ impl BytesPush for MergeQueue { let mut iterator = iterator.into_iter(); let mut should_ping = false; if let Some(bytes) = iterator.next() { - let mut tail = if let Some(mut tail) = queue.pop_back() { - if let Err(bytes) = tail.try_merge(bytes) { - queue.push_back(::std::mem::replace(&mut tail, bytes)); + let mut tail = match queue.pop_back() { + Some(QueueEntry::Bytes(mut tail)) => { + if let Err(bytes) = tail.try_merge(bytes) { + queue.push_back(QueueEntry::Bytes(::std::mem::replace(&mut tail, bytes))); + } + tail + } + Some(paged @ QueueEntry::Paged(_)) => { + queue.push_back(paged); + bytes + } + None => { + should_ping = true; + bytes } - tail - } - else { - should_ping = true; - bytes }; for more_bytes in iterator { if let Err(more_bytes) = tail.try_merge(more_bytes) { - queue.push_back(::std::mem::replace(&mut tail, more_bytes)); + queue.push_back(QueueEntry::Bytes(::std::mem::replace(&mut tail, more_bytes))); } } - queue.push_back(tail); + queue.push_back(QueueEntry::Bytes(tail)); } + // Dispatch the spill policy, if any, while the lock is still held. 
+ if let Some(policy) = self.policy.as_mut() { policy.apply(&mut queue); } + // Wakeup corresponding thread *after* releasing the lock ::std::mem::drop(queue); if should_ping { @@ -102,7 +141,24 @@ impl BytesPull for MergeQueue { } let mut queue = lock_ok.expect("MergeQueue mutex poisoned."); - vec.extend(queue.drain(..)); + // If a reader-side policy is installed, let it materialize Paged + // entries near the front of the queue (up to its own budget). + if let Some(policy) = self.policy.as_mut() { policy.apply(&mut queue); } + + // Drain Bytes entries from the front. Stop at the first Paged entry + // (which the policy chose not to materialize) or the empty queue. + while let Some(QueueEntry::Bytes(_)) = queue.front() { + if let Some(QueueEntry::Bytes(b)) = queue.pop_front() { + vec.push(b); + } + } + + // If we produced nothing but the queue isn't empty, something is + // stuck (failed fetch, no reader policy, budget exhausted). Buzz + // to ensure the consumer retries rather than parking. + if vec.is_empty() && !queue.is_empty() { + self.buzzer.buzz(); + } } } @@ -120,6 +176,7 @@ impl Drop for MergeQueue { } // Drop the queue before pinging. self.queue = Arc::new(Mutex::new(VecDeque::new())); + self.policy = None; self.buzzer.buzz(); } } diff --git a/communication/src/allocator/zero_copy/initialize.rs b/communication/src/allocator/zero_copy/initialize.rs index 12a008dc8..02aa45afd 100644 --- a/communication/src/allocator/zero_copy/initialize.rs +++ b/communication/src/allocator/zero_copy/initialize.rs @@ -1,9 +1,6 @@ //! Network initialization. 
use std::sync::Arc; -use timely_logging::Logger; -use crate::allocator::zero_copy::bytes_slab::BytesRefill; -use crate::logging::CommunicationEventBuilder; use crate::networking::create_sockets; use super::tcp::{send_loop, recv_loop}; use crate::allocator::ProcessBuilder; @@ -47,13 +44,12 @@ pub fn initialize_networking( my_index: usize, threads: usize, noisy: bool, - refill: BytesRefill, - log_sender: ArcOption>+Send+Sync>, + hooks: crate::Hooks, ) -> ::std::io::Result<(Vec, CommsGuard)> { let sockets = create_sockets(addresses, my_index, noisy)?; - initialize_networking_from_sockets(process_allocators, sockets, my_index, threads, refill, log_sender) + initialize_networking_from_sockets(process_allocators, sockets, my_index, threads, hooks) } /// Initialize send and recv threads from sockets. @@ -68,8 +64,7 @@ pub fn initialize_networking_from_sockets( mut sockets: Vec>, my_index: usize, threads: usize, - refill: BytesRefill, - log_sender: ArcOption>+Send+Sync>, + hooks: crate::Hooks, ) -> ::std::io::Result<(Vec, CommsGuard)> { @@ -80,22 +75,22 @@ pub fn initialize_networking_from_sockets( let processes = sockets.len(); - let (builders, promises, futures) = new_vector(process_allocators, my_index, processes, refill.clone()); + let (builders, promises, futures) = new_vector(process_allocators, my_index, processes, hooks.refill.clone(), hooks.spill.clone()); let mut promises_iter = promises.into_iter(); let mut futures_iter = futures.into_iter(); let mut send_guards = Vec::with_capacity(sockets.len()); let mut recv_guards = Vec::with_capacity(sockets.len()); - let refill = refill.clone(); // for each process, if a stream exists (i.e. not local) ... 
for (index, stream) in sockets.into_iter().enumerate().filter_map(|(i, s)| s.map(|s| (i, s))) { let remote_recv = promises_iter.next().unwrap(); { - let log_sender = Arc::clone(&log_sender); + let log_sender = Arc::clone(&hooks.log_fn); let stream = stream.try_clone()?; + let spill = hooks.spill.clone(); let join_guard = ::std::thread::Builder::new() .name(format!("timely:send-{}", index)) @@ -107,7 +102,7 @@ pub fn initialize_networking_from_sockets( remote: Some(index), }); - send_loop(stream, remote_recv, my_index, index, logger); + send_loop(stream, remote_recv, my_index, index, spill, logger); })?; send_guards.push(join_guard); @@ -117,9 +112,9 @@ pub fn initialize_networking_from_sockets( { // let remote_sends = remote_sends.clone(); - let log_sender = Arc::clone(&log_sender); + let log_sender = Arc::clone(&hooks.log_fn); let stream = stream.try_clone()?; - let refill = refill.clone(); + let refill = hooks.refill.clone(); let join_guard = ::std::thread::Builder::new() .name(format!("timely:recv-{}", index)) diff --git a/communication/src/allocator/zero_copy/mod.rs b/communication/src/allocator/zero_copy/mod.rs index 7d0de7c01..8d92f05d4 100644 --- a/communication/src/allocator/zero_copy/mod.rs +++ b/communication/src/allocator/zero_copy/mod.rs @@ -10,6 +10,7 @@ pub mod bytes_slab; pub mod bytes_exchange; +pub mod spill; pub mod tcp; pub mod allocator; pub mod allocator_process; diff --git a/communication/src/allocator/zero_copy/spill.rs b/communication/src/allocator/zero_copy/spill.rs new file mode 100644 index 000000000..65d62780b --- /dev/null +++ b/communication/src/allocator/zero_copy/spill.rs @@ -0,0 +1,287 @@ +//! Spill strategies and policies for `MergeQueue` entries under memory pressure. +//! +//! Three traits compose here: +//! +//! - [`SpillPolicy`] decides *whether and how* a queue should be reshaped at +//! each `extend`. It is handed the raw `VecDeque` under the +//! queue's mutex and may replace entries freely. +//! 
- [`BytesSpill`] decides *where* bytes go when a policy chooses to spill. +//! Pluggable: file, object store, mlock pool, in-memory mock for tests. +//! - [`BytesFetch`] is the handle returned by a `BytesSpill`; it reads the +//! spilled bytes back, consuming itself in the process. +//! +//! The shipped [`threshold::Threshold`] pairs a `BytesSpill` strategy with +//! threshold/reserve/budget knobs and encodes the "spill the middle of the +//! queue when resident bytes get too large" heuristic. Other policies can +//! make entirely different decisions (memory-pressure-driven, periodic, +//! manual trigger, adaptive) using the same strategies. + +use std::collections::VecDeque; +use std::sync::Arc; + +use timely_bytes::arc::Bytes; + +use super::bytes_exchange::QueueEntry; + +/// A function that produces pairs of writer and reader [`SpillPolicy`]s. +/// +/// This type is the entry point to spilling, and the two returned policies +/// contain the opinions about how to handle excess data for an instance of +/// a `MergeQueue`. +pub type SpillPolicyFn = Arc (Box, Box) + Send + Sync>; + +/// Inspects and optionally rewrites a `MergeQueue`'s entries. +pub trait SpillPolicy: Send { + /// Optionally transforms some (ranges of) queue entries. + /// + /// This trait is used both for spilling and rehydrating, and just acts + /// on the list of queue entries, rewriting ranges of them as it sees fit. + /// This is intented for spilling data to secondary storage, but can also + /// be used for compression, or other mechanisms to reduce resource load. + fn apply(&mut self, queue: &mut VecDeque); +} + +/// A type that can convert runs of bytes into runs of boxed bytes retrieval. +pub trait BytesSpill: Send { + /// Move entries from `chunks` into `handles`, spilling each to backing storage. 
+ /// + /// The implementor should drain from `chunks` and push to `handles`as it goes; + /// on failure it may stop partway, leaving the data in a consistent state that + /// will be retried in the future. If it cannot leave the lists in a consistent + /// state it should panic. + fn spill(&mut self, chunks: &mut Vec, handles: &mut Vec>); +} + +/// A consume-once handle to bytes previously written via a [`BytesSpill`]. +pub trait BytesFetch: Send { + /// Consume the handle and return the spilled payload as `Bytes`. + /// + /// On failure, the handle is returned so the caller can retry later. + fn fetch(self: Box) -> Result, Box>; +} + +/// Writer-side spill policy: threshold-based, spills the middle of the queue. +pub mod threshold { + use super::*; + + /// A threshold-based [`SpillPolicy`]: when a queue's resident-byte count + /// exceeds `head_reserve_bytes + threshold_bytes`, spill all entries past + /// the head reserve (except the last entry, which stays as the `try_merge` + /// target). + pub struct Threshold { + strategy: Box, + /// Spillable surplus: spill is considered when resident bytes exceed + /// `head_reserve_bytes + threshold_bytes`. + pub threshold_bytes: usize, + /// Bytes near the head of the queue stay resident, protecting the + /// consumer from an immediate page-in stall. + pub head_reserve_bytes: usize, + } + + impl Threshold { + /// Create a new threshold policy with default knobs, dispatching spills + /// through `strategy`. 
+ pub fn new(strategy: Box) -> Self { + Threshold { + strategy, + threshold_bytes: 256 << 20, // 256 MB + head_reserve_bytes: 64 << 20, // 64 MB + } + } + } + + impl SpillPolicy for Threshold { + fn apply(&mut self, queue: &mut VecDeque) { + let resident: usize = queue.iter().map(|e| match e { + QueueEntry::Bytes(b) => b.len(), + QueueEntry::Paged(_) => 0, + }).sum(); + if resident <= self.head_reserve_bytes + self.threshold_bytes { + return; + } + + let head_reserve = self.head_reserve_bytes; + + let mut cumulative: usize = 0; + let last_index = queue.len().saturating_sub(1); + let mut target_indices: Vec = Vec::new(); + let mut target_bytes: Vec = Vec::new(); + for (i, entry) in queue.iter().enumerate() { + if i == last_index { break; } + match entry { + QueueEntry::Bytes(b) => { + if cumulative >= head_reserve { + target_indices.push(i); + target_bytes.push(b.clone()); + } + cumulative += b.len(); + } + QueueEntry::Paged(_) => {} + } + } + + if target_bytes.is_empty() { + return; + } + + let mut handles: Vec> = Vec::new(); + self.strategy.spill(&mut target_bytes, &mut handles); + // Replace queue entries for however many chunks were spilled. + for (i, handle) in target_indices.into_iter().zip(handles) { + queue[i] = QueueEntry::Paged(handle); + } + // Remaining entries in target_bytes (if any) stay Resident. + } + } +} + +/// Reader-side policy: materializes `Paged` entries near the front. +pub mod prefetch { + use super::*; + + /// A reader-side [`SpillPolicy`] that materializes `Paged` entries near + /// the front of the queue up to a byte budget. The writer-side dual of + /// [`super::threshold::Threshold`]: the writer pages data out, the reader + /// pages data back in — both through `SpillPolicy::apply`. + pub struct PrefetchPolicy { + /// Maximum bytes to materialize per `apply` invocation. + pub budget: usize, + } + + impl PrefetchPolicy { + /// Create a prefetch policy with the given byte budget. 
+ pub fn new(budget: usize) -> Self { + PrefetchPolicy { budget } + } + } + + impl SpillPolicy for PrefetchPolicy { + fn apply(&mut self, queue: &mut VecDeque) { + let mut resident_head = 0; + let mut i = 0; + while i < queue.len() && resident_head < self.budget { + match &queue[i] { + QueueEntry::Bytes(b) => { + resident_head += b.len(); + i += 1; + } + QueueEntry::Paged(_) => { + let entry = queue.remove(i).expect("index valid"); + if let QueueEntry::Paged(h) = entry { + match h.fetch() { + Ok(fetched) => { + let n = fetched.len(); + for (j, b) in fetched.into_iter().enumerate() { + resident_head += b.len(); + queue.insert(i + j, QueueEntry::Bytes(b)); + } + i += n; + } + Err(h) => { + // Fetch failed; put the handle back and stop. + queue.insert(i, QueueEntry::Paged(h)); + break; + } + } + } + } + } + } + } + } +} + +// Re-export the key types at the spill module level. +pub use threshold::Threshold; +pub use prefetch::PrefetchPolicy; + +#[cfg(test)] +mod tests { + use super::*; + + fn bytes_of(data: &[u8]) -> Bytes { + timely_bytes::arc::BytesMut::from(data.to_vec()).freeze() + } + + struct MockStrategy; + struct MockHandle { data: Bytes } + impl BytesSpill for MockStrategy { + fn spill(&mut self, chunks: &mut Vec, handles: &mut Vec>) { + handles.extend(chunks.drain(..) 
+ .map(|b| Box::new(MockHandle { data: b }) as Box)); + } + } + impl BytesFetch for MockHandle { + fn fetch(self: Box) -> Result, Box> { Ok(vec![self.data]) } + } + + #[test] + fn eager_policy_moves_middle_entries() { + struct EagerPolicy { strategy: Box } + impl SpillPolicy for EagerPolicy { + fn apply(&mut self, queue: &mut VecDeque) { + let last = queue.len().saturating_sub(1); + let mut indices = Vec::new(); + let mut bytes = Vec::new(); + for (i, entry) in queue.iter().enumerate() { + if i == last { break; } + if let QueueEntry::Bytes(b) = entry { + indices.push(i); + bytes.push(b.clone()); + } + } + if bytes.is_empty() { return; } + let mut handles = Vec::new(); + self.strategy.spill(&mut bytes, &mut handles); + for (i, h) in indices.into_iter().zip(handles) { + queue[i] = QueueEntry::Paged(h); + } + } + } + + let mut p = EagerPolicy { strategy: Box::new(MockStrategy) }; + let mut queue: VecDeque = VecDeque::new(); + for i in 0..4 { + queue.push_back(QueueEntry::Bytes(bytes_of(&[i as u8; 8]))); + } + p.apply(&mut queue); + assert!(matches!(queue[0], QueueEntry::Paged(_))); + assert!(matches!(queue[1], QueueEntry::Paged(_))); + assert!(matches!(queue[2], QueueEntry::Paged(_))); + assert!(matches!(queue[3], QueueEntry::Bytes(_))); + } + + #[test] + fn merge_queue_spill_roundtrip_mock() { + use super::super::bytes_exchange::{MergeQueue, BytesPush, BytesPull}; + + let head_reserve = 128; + let mut tp = Threshold::new(Box::new(MockStrategy)); + tp.threshold_bytes = 512; + tp.head_reserve_bytes = head_reserve; + let writer_policy: Box = Box::new(tp); + let reader_policy: Box = Box::new(PrefetchPolicy::new(head_reserve)); + + let buzzer = crate::buzzer::Buzzer::default(); + let (mut writer, mut reader) = + MergeQueue::new_pair(buzzer, Some(writer_policy), Some(reader_policy)); + + let mut expected: Vec> = Vec::new(); + for i in 0..100 { + let data = vec![(i % 251) as u8; 64]; + expected.push(data.clone()); + writer.extend(Some(bytes_of(&data))); + } + + let mut 
received: Vec = Vec::new(); + loop { + let before = received.len(); + reader.drain_into(&mut received); + if received.len() == before { break; } + } + + let expected_flat: Vec = expected.into_iter().flatten().collect(); + let received_flat: Vec = received.iter().flat_map(|b| b.iter().copied()).collect(); + assert_eq!(expected_flat, received_flat); + } +} diff --git a/communication/src/allocator/zero_copy/tcp.rs b/communication/src/allocator/zero_copy/tcp.rs index bbed78cfb..918b718dd 100644 --- a/communication/src/allocator/zero_copy/tcp.rs +++ b/communication/src/allocator/zero_copy/tcp.rs @@ -7,6 +7,7 @@ use crate::networking::MessageHeader; use super::bytes_slab::{BytesRefill, BytesSlab}; use super::bytes_exchange::MergeQueue; +use super::spill::SpillPolicyFn; use super::stream::Stream; use timely_logging::Logger; @@ -139,17 +140,26 @@ pub fn send_loop( sources: Vec>, process: usize, remote: usize, + spill: Option, logger: Option>) { let mut logger = logger.map(|logger| logger.into_typed::()); // Log the send thread's start. logger.as_mut().map(|l| l.log(StateEvent { send: true, process, remote, start: true, })); + // Send the policy-bearing queue to the worker (writer); keep a + // reader-side handle for ourselves. 
     let mut sources: Vec<MergeQueue> = sources.into_iter().map(|x| {
         let buzzer = crate::buzzer::Buzzer::default();
-        let queue = MergeQueue::new(buzzer);
-        x.send(queue.clone()).expect("failed to send MergeQueue");
-        queue
+        let (writer, reader) = match spill.as_ref() {
+            Some(build_fn) => {
+                let (w, r) = build_fn();
+                MergeQueue::new_pair(buzzer, Some(w), Some(r))
+            }
+            None => MergeQueue::new_pair(buzzer, None, None),
+        };
+        x.send(writer).expect("failed to send MergeQueue");
+        reader
     }).collect();
 
     let mut writer = ::std::io::BufWriter::with_capacity(1 << 16, writer);
diff --git a/communication/src/initialize.rs b/communication/src/initialize.rs
index 914afba5f..dcd856917 100644
--- a/communication/src/initialize.rs
+++ b/communication/src/initialize.rs
@@ -38,8 +38,6 @@ pub enum Config {
         report: bool,
         /// Enable intra-process zero-copy
         zerocopy: bool,
-        /// Closure to create a new logger for a communication thread
-        log_fn: Arc<dyn Fn(CommunicationSetup) -> Option<Logger<CommunicationEventBuilder>> + Send + Sync>,
     }
 }
@@ -49,7 +47,7 @@ impl Debug for Config {
             Config::Thread => write!(f, "Config::Thread()"),
             Config::Process(n) => write!(f, "Config::Process({})", n),
             Config::ProcessBinary(n) => write!(f, "Config::ProcessBinary({})", n),
-            Config::Cluster { threads, process, addresses, report, zerocopy, log_fn: _ } => f
+            Config::Cluster { threads, process, addresses, report, zerocopy } => f
                 .debug_struct("Config::Cluster")
                 .field("threads", threads)
                 .field("process", process)
@@ -61,6 +59,32 @@
     }
 }
 
+/// Configuration hooks that (currently) live outside the configuration.
+///
+/// Fields are public so callers can mutate `Hooks::default()` before
+/// passing it to `try_build_with`.
+pub struct Hooks {
+    /// A mechanism to set up loggers for each communication thread.
+    pub log_fn: Arc<dyn Fn(CommunicationSetup) -> Option<Logger<CommunicationEventBuilder>> + Send + Sync>,
+    /// A strategy for refreshing bytes for `BytesSlab`.
+    pub refill: BytesRefill,
+    /// A mechanism to get a matched pair of spill policies (writer, reader) per queue.
+    pub spill: Option<SpillPolicyFn>,
+}
+
+impl Default for Hooks {
+    fn default() -> Self {
+        Self {
+            log_fn: Arc::new(|_| None),
+            refill: BytesRefill {
+                logic: Arc::new(|size| Box::new(vec![0_u8; size]) as Box<dyn std::ops::DerefMut<Target = [u8]>>),
+                limit: None,
+            },
+            spill: None,
+        }
+    }
+}
+
 impl Config {
     /// Installs options into a [`getopts::Options`] struct that corresponds
     /// to the parameters in the configuration.
@@ -122,7 +146,6 @@
                     addresses,
                     report,
                     zerocopy,
-                    log_fn: Arc::new(|_| None),
                 })
             }
             else if threads > 1 {
                 if zerocopy {
@@ -152,45 +175,41 @@
 
     /// Attempts to assemble the described communication infrastructure.
     pub fn try_build(self) -> Result<(Vec<AllocatorBuilder>, Box<dyn Any+Send>), String> {
-        let refill = BytesRefill {
-            logic: Arc::new(|size| Box::new(vec![0_u8; size]) as Box<dyn std::ops::DerefMut<Target = [u8]>>),
-            limit: None,
-        };
-        self.try_build_with(refill)
+        self.try_build_with(Hooks::default())
     }
 
-    /// Attempts to assemble the described communication infrastructure, using the supplied refill function.
-    pub fn try_build_with(self, refill: BytesRefill) -> Result<(Vec<AllocatorBuilder>, Box<dyn Any+Send>), String> {
+    /// Attempts to assemble the described communication infrastructure, using the supplied hooks.
+    pub fn try_build_with(self, hooks: Hooks) -> Result<(Vec<AllocatorBuilder>, Box<dyn Any+Send>), String> {
         match self {
             Config::Thread => {
                 Ok((vec![AllocatorBuilder::Thread(ThreadBuilder)], Box::new(())))
             },
             Config::Process(threads) => {
-                let builders = ProcessBuilder::new_typed_vector(threads, refill)
+                let builders = ProcessBuilder::new_typed_vector(threads, hooks.refill, hooks.spill)
                     .into_iter()
                     .map(AllocatorBuilder::Process)
                     .collect();
                 Ok((builders, Box::new(())))
             },
             Config::ProcessBinary(threads) => {
-                let builders = ProcessBuilder::new_bytes_vector(threads, refill)
+                let builders = ProcessBuilder::new_bytes_vector(threads, hooks.refill, hooks.spill)
                     .into_iter()
                     .map(AllocatorBuilder::Process)
                     .collect();
                 Ok((builders, Box::new(())))
             },
-            Config::Cluster { threads, process, addresses, report, zerocopy: false, log_fn } => {
-                let process_allocators = ProcessBuilder::new_typed_vector(threads, refill.clone());
-                match initialize_networking(process_allocators,
addresses, process, threads, report, refill, log_fn) {
+            Config::Cluster { threads, process, addresses, report, zerocopy: false } => {
+                let process_allocators = ProcessBuilder::new_typed_vector(threads, hooks.refill.clone(), hooks.spill.clone());
+                match initialize_networking(process_allocators, addresses, process, threads, report, hooks) {
                     Ok((stuff, guard)) => {
                         Ok((stuff.into_iter().map(AllocatorBuilder::Tcp).collect(), Box::new(guard)))
                     },
                     Err(err) => Err(format!("failed to initialize networking: {}", err))
                 }
             },
-            Config::Cluster { threads, process, addresses, report, zerocopy: true, log_fn } => {
-                let process_allocators = ProcessBuilder::new_bytes_vector(threads, refill.clone());
-                match initialize_networking(process_allocators, addresses, process, threads, report, refill, log_fn) {
+            Config::Cluster { threads, process, addresses, report, zerocopy: true } => {
+                let process_allocators = ProcessBuilder::new_bytes_vector(threads, hooks.refill.clone(), hooks.spill.clone());
+                match initialize_networking(process_allocators, addresses, process, threads, report, hooks) {
                     Ok((stuff, guard)) => {
                         Ok((stuff.into_iter().map(AllocatorBuilder::Tcp).collect(), Box::new(guard)))
                     },
diff --git a/communication/src/lib.rs b/communication/src/lib.rs
index c594c37da..3daac7b43 100644
--- a/communication/src/lib.rs
+++ b/communication/src/lib.rs
@@ -102,7 +102,7 @@ pub mod buzzer;
 
 use allocator::Allocate;
 pub use allocator::{Allocator, AllocatorBuilder, Exchangeable};
-pub use initialize::{initialize, initialize_from, Config, WorkerGuards};
+pub use initialize::{initialize, initialize_from, Config, Hooks, WorkerGuards};
 
 use std::sync::mpsc::{Sender, Receiver};