Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions communication/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ timely_bytes = { path = "../bytes", version = "0.29" }
timely_container = { path = "../container", version = "0.29.0" }
timely_logging = { path = "../logging", version = "0.29" }

[dev-dependencies]
tempfile = "3"

# Lgalloc only supports linux and macos, don't depend on any other OS.
[target.'cfg(any(target_os = "linux", target_os = "macos"))'.dev-dependencies]
lgalloc = "0.6"
3 changes: 2 additions & 1 deletion communication/examples/lgalloc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ mod example {

// extract the configuration from user-supplied arguments, initialize the computation.
let config = timely_communication::Config::ProcessBinary(4);
let (allocators, others) = config.try_build_with(refill).unwrap();
let hooks = timely_communication::Hooks { refill, ..Default::default() };
let (allocators, others) = config.try_build_with(hooks).unwrap();
let guards = timely_communication::initialize_from(allocators, others, |mut allocator| {

println!("worker {} of {} started", allocator.index(), allocator.peers());
Expand Down
238 changes: 238 additions & 0 deletions communication/examples/spill_stress.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
//! Pushes a configurable volume of data through a MergeQueue with spill
//! enabled, printing RSS at intervals. Useful for verifying that the spill
//! mechanism keeps memory bounded under load.
//!
//! Also serves as a reference implementation of a file-backed `BytesSpill`
//! strategy, demonstrating how to implement the spill traits against a
//! concrete storage backend.
//!
//! Usage:
//! cargo run --example spill_stress -p timely_communication -- [OPTIONS]
//!
//! Options:
//! --total-mb N Total data to push (default: 512)
//! --chunk-kb N Size of each pushed Bytes (default: 256)
//! --threshold-mb N Spill threshold (default: 32)
//! --head-reserve-mb N Head reserve / prefetch budget (default: 16)
//! --spill-dir PATH Directory for tempfiles (default: std::env::temp_dir())
//! --drain-every N Drain after every N pushes (default: 0 = push all then drain)

use std::time::Instant;

use timely_bytes::arc::BytesMut;
use timely_communication::allocator::zero_copy::bytes_exchange::{MergeQueue, BytesPush, BytesPull};
use timely_communication::allocator::zero_copy::spill::*;

fn main() {
    let args: Vec<String> = std::env::args().collect();

    // Knobs, all overridable on the command line (see file docs above).
    let total_mb: usize = parse_arg(&args, "--total-mb", 512);
    let chunk_kb: usize = parse_arg(&args, "--chunk-kb", 256);
    let threshold_mb: usize = parse_arg(&args, "--threshold-mb", 32);
    let head_reserve_mb: usize = parse_arg(&args, "--head-reserve-mb", 16);
    let drain_every: usize = parse_arg(&args, "--drain-every", 0);
    let spill_dir: std::path::PathBuf = args.iter()
        .position(|a| a == "--spill-dir")
        .and_then(|i| args.get(i + 1))
        .map(std::path::PathBuf::from)
        .unwrap_or_else(std::env::temp_dir);

    // A zero chunk size would divide by zero when computing `num_chunks`
    // below; fail with a clear message instead of panicking.
    if chunk_kb == 0 {
        eprintln!("--chunk-kb must be greater than zero");
        std::process::exit(1);
    }

    let total_bytes = total_mb << 20;
    let chunk_bytes = chunk_kb << 10;
    let threshold_bytes = threshold_mb << 20;
    let head_reserve = head_reserve_mb << 20;
    let num_chunks = total_bytes / chunk_bytes;

    println!("spill_stress configuration:");
    println!(" total: {} MB ({} chunks of {} KB)", total_mb, num_chunks, chunk_kb);
    println!(" threshold: {} MB", threshold_mb);
    println!(" head_reserve: {} MB", head_reserve_mb);
    println!(" drain_every: {}", if drain_every == 0 { "push-all-then-drain".to_string() } else { format!("{}", drain_every) });
    println!(" spill_dir: {}", spill_dir.display());
    println!();

    // Build writer + reader pair. The writer spills once buffered bytes
    // exceed `threshold_bytes` (keeping `head_reserve` of head data in
    // memory); the reader prefetches with the same budget.
    let strategy: Box<dyn BytesSpill> =
        Box::new(file_spill::FileSpillStrategy::new(spill_dir));
    let mut tp = Threshold::new(strategy);
    tp.threshold_bytes = threshold_bytes;
    tp.head_reserve_bytes = head_reserve;
    let writer_policy: Box<dyn SpillPolicy> = Box::new(tp);
    let reader_policy: Box<dyn SpillPolicy> = Box::new(PrefetchPolicy::new(head_reserve));

    let buzzer = timely_communication::buzzer::Buzzer::default();
    let (mut writer, mut reader) = MergeQueue::new_pair(buzzer, Some(writer_policy), Some(reader_policy));

    let start = Instant::now();
    let mut pushed = 0usize;
    let mut drained_bytes = 0usize;
    let mut drain_buf: Vec<timely_bytes::arc::Bytes> = Vec::new();

    print_rss("start");

    // Push phase (optionally interleaved with drains).
    for i in 0..num_chunks {
        // Per-chunk byte pattern (mod a prime) so chunks are distinguishable.
        let data = vec![(i % 251) as u8; chunk_bytes];
        let bytes = BytesMut::from(data).freeze();
        writer.extend(Some(bytes));
        pushed += chunk_bytes;

        if drain_every > 0 && (i + 1) % drain_every == 0 {
            reader.drain_into(&mut drain_buf);
            for b in drain_buf.drain(..) { drained_bytes += b.len(); }
        }

        // Report RSS roughly eight times across the push phase.
        if (i + 1) % (num_chunks / 8).max(1) == 0 {
            print_rss(&format!("pushed {}/{} MB", pushed >> 20, total_mb));
        }
    }

    print_rss("push complete, starting drain");

    // Drain phase: loop until a drain yields nothing new.
    loop {
        let before = drain_buf.len();
        reader.drain_into(&mut drain_buf);
        if drain_buf.len() == before { break; }
        for b in drain_buf.drain(..) { drained_bytes += b.len(); }

        // Report RSS roughly eight times across the drain phase.
        if drained_bytes % (total_bytes / 8).max(1) < chunk_bytes {
            print_rss(&format!("drained {}/{} MB", drained_bytes >> 20, total_mb));
        }
    }

    let elapsed = start.elapsed();
    print_rss("drain complete");
    println!();
    println!("pushed: {} MB", pushed >> 20);
    println!("drained: {} MB", drained_bytes >> 20);
    println!("elapsed: {:.2?}", elapsed);

    // Every byte pushed must come back out; anything else is data loss.
    assert_eq!(pushed, drained_bytes, "data loss: pushed {} but drained {}", pushed, drained_bytes);
    println!("OK — all bytes recovered.");
}

/// Looks up `flag` in `args` and parses the argument that follows it.
///
/// Falls back to `default` when the flag is absent, is the last argument,
/// or the following value does not parse as a `usize`.
fn parse_arg(args: &[String], flag: &str, default: usize) -> usize {
    let mut it = args.iter();
    while let Some(arg) = it.next() {
        if arg == flag {
            return it.next().and_then(|v| v.parse().ok()).unwrap_or(default);
        }
    }
    default
}

/// Returns this process's resident set size in kilobytes, as reported by
/// the `ps` utility, or `None` if `ps` cannot be run or its output does
/// not parse as an integer.
fn get_rss_kb() -> Option<u64> {
    let me = std::process::id().to_string();
    let out = std::process::Command::new("ps")
        .args(["-o", "rss=", "-p", &me])
        .output()
        .ok()?;
    String::from_utf8_lossy(&out.stdout).trim().parse().ok()
}

/// Prints `label` prefixed with the current RSS (in KB and MB), or a
/// fallback marker when RSS cannot be measured on this platform.
fn print_rss(label: &str) {
    if let Some(kb) = get_rss_kb() {
        println!("[RSS {:>8} KB / {:>6} MB] {}", kb, kb / 1024, label);
    } else {
        println!("[RSS unavailable] {}", label);
    }
}

/// File-backed BytesSpill implementation.
///
/// One tempfile per spill batch. Writes chunks sequentially; reads by
/// slurping the whole file on first fetch. Reference implementation for
/// anyone writing their own BytesSpill backend.
mod file_spill {
    use std::fs::File;
    use std::io::{Read, Seek, SeekFrom, Write};
    use std::path::PathBuf;
    use std::sync::{Arc, Mutex};
    use timely_bytes::arc::{Bytes, BytesMut};
    use timely_communication::allocator::zero_copy::spill::{BytesSpill, BytesFetch};

    /// Spill strategy that writes each batch of chunks to a fresh
    /// anonymous tempfile created under `dir`.
    pub struct FileSpillStrategy {
        // Directory in which tempfiles are created.
        dir: PathBuf,
    }

    impl FileSpillStrategy {
        /// Creates a strategy that spills into `dir`.
        pub fn new(dir: PathBuf) -> Self {
            FileSpillStrategy { dir }
        }
    }

    impl BytesSpill for FileSpillStrategy {
        /// Writes `chunks` sequentially into one tempfile and pushes one
        /// fetch handle per chunk onto `handles`.
        ///
        /// On any I/O error the chunks are left untouched in `chunks` so
        /// the caller can retain them in memory; errors are reported on
        /// stderr rather than propagated, making spilling best-effort.
        fn spill(&mut self, chunks: &mut Vec<Bytes>, handles: &mut Vec<Box<dyn BytesFetch>>) {
            if chunks.is_empty() { return; }
            let mut file = match tempfile::tempfile_in(&self.dir) {
                Ok(f) => f,
                Err(e) => { eprintln!("timely: file spill failed: {}", e); return; }
            };
            // Record each chunk's length so the reader can re-slice the
            // concatenated file contents.
            let mut lens = Vec::with_capacity(chunks.len());
            for chunk in chunks.iter() {
                if let Err(e) = file.write_all(&chunk[..]) {
                    eprintln!("timely: file spill write failed: {}", e);
                    // Leave remaining chunks for the caller to retain.
                    return;
                }
                lens.push(chunk.len());
            }
            // All writes succeeded; drain chunks and produce handles.
            chunks.clear();
            // Capture the count before `lens` moves into the shared state,
            // which avoids cloning the whole vector just to read its length.
            let count = lens.len();
            let state = Arc::new(Mutex::new(FileState::OnDisk { file, lens }));
            handles.extend((0..count)
                .map(|i| Box::new(ChunkHandle {
                    state: Arc::clone(&state),
                    index: i,
                }) as Box<dyn BytesFetch>));
        }
    }

    /// State shared by all handles of one spilled batch.
    enum FileState {
        /// Data still on disk; `lens` records each chunk's byte length in
        /// file order.
        OnDisk { file: File, lens: Vec<usize> },
        /// The whole file has been read back; chunks indexed by position.
        Slurped { chunks: Vec<Bytes> },
        /// Transient value used only during the OnDisk -> Slurped swap.
        Placeholder,
    }

    /// Fetch handle for a single chunk of a spilled batch.
    struct ChunkHandle {
        state: Arc<Mutex<FileState>>,
        index: usize,
    }

    impl BytesFetch for ChunkHandle {
        /// Returns this handle's chunk, reading the entire backing file
        /// back into memory on the first fetch of the batch.
        ///
        /// On seek/read failure the on-disk state is restored and the
        /// handle is returned in `Err` so the fetch can be retried later.
        fn fetch(self: Box<Self>) -> Result<Vec<Bytes>, Box<dyn BytesFetch>> {
            let mut state = self.state.lock().expect("spill state poisoned");

            if matches!(*state, FileState::OnDisk { .. }) {
                // Take ownership of the file for reading; `Placeholder`
                // keeps the enum valid while we work on the contents.
                let (mut file, lens) = match std::mem::replace(&mut *state, FileState::Placeholder) {
                    FileState::OnDisk { file, lens } => (file, lens),
                    _ => unreachable!(),
                };
                if let Err(e) = file.seek(SeekFrom::Start(0)) {
                    eprintln!("spill fetch: seek failed: {}", e);
                    *state = FileState::OnDisk { file, lens };
                    drop(state);
                    return Err(self);
                }
                let mut chunks = Vec::with_capacity(lens.len());
                for &len in &lens {
                    let mut data = vec![0u8; len];
                    if let Err(e) = file.read_exact(&mut data) {
                        eprintln!("spill fetch: read failed: {}", e);
                        // Restore so a later fetch can retry from scratch
                        // (it seeks back to the start of the file).
                        *state = FileState::OnDisk { file, lens };
                        drop(state);
                        return Err(self);
                    }
                    chunks.push(BytesMut::from(data).freeze());
                }
                *state = FileState::Slurped { chunks };
            }

            let result = match &*state {
                FileState::Slurped { chunks } => Ok(vec![chunks[self.index].clone()]),
                _ => unreachable!("state should be Slurped after slurp transition"),
            };
            drop(state);
            result
        }
    }
}
16 changes: 11 additions & 5 deletions communication/src/allocator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,19 @@ impl<T: Clone> Push<T> for Broadcaster<T> {
}

use crate::allocator::zero_copy::bytes_slab::BytesRefill;
use crate::allocator::zero_copy::spill::SpillPolicyFn;

/// A builder for vectors of peers.
pub(crate) trait PeerBuilder {
/// The peer type.
type Peer: AllocateBuilder + Sized;
/// Allocate a list of `Self::Peer` of length `peers`.
fn new_vector(peers: usize, refill: BytesRefill) -> Vec<Self::Peer>;
///
/// `spill` is an optional factory for spill policies; one fresh policy
/// per `MergeQueue` that the resulting peers construct. Implementors
/// that don't use `MergeQueue` (e.g. `Typed` mpsc-based intra-process)
/// ignore it.
fn new_vector(peers: usize, refill: BytesRefill, spill: Option<SpillPolicyFn>) -> Vec<Self::Peer>;
}


Expand All @@ -146,16 +152,16 @@ impl ProcessBuilder {
}

/// Constructs a vector of regular (mpsc-based, "Typed") intra-process builders.
pub fn new_typed_vector(peers: usize, refill: BytesRefill) -> Vec<Self> {
<TypedProcess as PeerBuilder>::new_vector(peers, refill)
pub fn new_typed_vector(peers: usize, refill: BytesRefill, spill: Option<SpillPolicyFn>) -> Vec<Self> {
<TypedProcess as PeerBuilder>::new_vector(peers, refill, spill)
.into_iter()
.map(ProcessBuilder::Typed)
.collect()
}

/// Constructs a vector of binary (zero-copy serialized, "Bytes") intra-process builders.
pub fn new_bytes_vector(peers: usize, refill: BytesRefill) -> Vec<Self> {
<BytesProcessBuilder as PeerBuilder>::new_vector(peers, refill)
pub fn new_bytes_vector(peers: usize, refill: BytesRefill, spill: Option<SpillPolicyFn>) -> Vec<Self> {
<BytesProcessBuilder as PeerBuilder>::new_vector(peers, refill, spill)
.into_iter()
.map(ProcessBuilder::Bytes)
.collect()
Expand Down
6 changes: 5 additions & 1 deletion communication/src/allocator/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@ impl Process {
impl PeerBuilder for Process {
type Peer = ProcessBuilder;
/// Allocate a list of connected intra-process allocators.
fn new_vector(peers: usize, _refill: crate::allocator::BytesRefill) -> Vec<ProcessBuilder> {
fn new_vector(
peers: usize,
_refill: crate::allocator::BytesRefill,
_spill: Option<crate::allocator::zero_copy::spill::SpillPolicyFn>,
) -> Vec<ProcessBuilder> {

let mut counters_send = Vec::with_capacity(peers);
let mut counters_recv = Vec::with_capacity(peers);
Expand Down
19 changes: 16 additions & 3 deletions communication/src/allocator/zero_copy/allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::{Allocate, Push, Pull};
use crate::allocator::{Process, ProcessBuilder, Exchangeable};
use crate::allocator::canary::Canary;
use crate::allocator::zero_copy::bytes_slab::BytesRefill;
use crate::allocator::zero_copy::spill::SpillPolicyFn;
use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue};
use super::push_pull::{Pusher, PullerInner};

Expand All @@ -29,6 +30,8 @@ pub struct TcpBuilder {
promises: Vec<Sender<MergeQueue>>, // to send queues from each network thread.
/// Byte slab refill function.
refill: BytesRefill,
/// Optional spill factory for recv queues constructed in `build()`.
spill: Option<SpillPolicyFn>,
}

/// Creates a vector of builders, sharing appropriate state.
Expand All @@ -48,6 +51,7 @@ pub(crate) fn new_vector(
my_process: usize,
processes: usize,
refill: BytesRefill,
spill: Option<SpillPolicyFn>,
) -> (Vec<TcpBuilder>,
Vec<Vec<Sender<MergeQueue>>>,
Vec<Vec<Receiver<MergeQueue>>>)
Expand All @@ -72,6 +76,7 @@ pub(crate) fn new_vector(
promises,
futures,
refill: refill.clone(),
spill: spill.clone(),
}})
.collect();

Expand All @@ -87,9 +92,17 @@ impl TcpBuilder {
let mut recvs = Vec::with_capacity(self.peers);
for promise in self.promises.into_iter() {
let buzzer = crate::buzzer::Buzzer::default();
let queue = MergeQueue::new(buzzer);
promise.send(queue.clone()).expect("Failed to send MergeQueue");
recvs.push(queue.clone());
let (writer, reader) = match self.spill.as_ref() {
Some(build_fn) => {
let (w, r) = build_fn();
MergeQueue::new_pair(buzzer, Some(w), Some(r))
}
None => MergeQueue::new_pair(buzzer, None, None),
};
// `recv_loop` is the writer here (it `extend`s with bytes from
// TCP), so it keeps the original; the worker is the reader.
recvs.push(reader);
promise.send(writer).expect("Failed to send MergeQueue");
}

// Extract pusher commitments.
Expand Down
Loading
Loading