Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion timely/src/dataflow/operators/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ impl<T: Timestamp> Capability<T> {
// to send data and request notification at the associated timestamp.
impl<T: Timestamp> Drop for Capability<T> {
fn drop(&mut self) {
self.internal.borrow_mut().update(self.time.clone(), -1);
let time = ::std::mem::replace(&mut self.time, T::minimum());
self.internal.borrow_mut().update(time, -1);
}
}

Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/core/capture/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl<T: Timestamp, C: Container> Capture<T, C> for Stream<'_, T, C> {
if !progress.frontiers[0].is_empty() {
// transmit any frontier progress.
let to_send = ::std::mem::replace(&mut progress.frontiers[0], ChangeBatch::new());
event_pusher.push(Event::Progress(to_send.into_inner().to_vec()));
event_pusher.push(Event::Progress(to_send.into_inner().into_vec()));
}

// turn each received message into an event.
Expand Down
5 changes: 3 additions & 2 deletions timely/src/dataflow/operators/generic/builder_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ where
fn outputs(&self) -> usize { self.shape.outputs }

// announce internal topology as fully connected, and hold all default capabilities.
fn initialize(self: Box<Self>) -> (Connectivity<T::Summary>, Rc<RefCell<SharedProgress<T>>>, Box<dyn Schedule>) {
fn initialize(mut self: Box<Self>) -> (Connectivity<T::Summary>, Rc<RefCell<SharedProgress<T>>>, Box<dyn Schedule>) {

// Request the operator to be scheduled at least once.
self.activations.borrow_mut().activate(&self.address[..]);
Expand All @@ -242,7 +242,8 @@ where
.iter_mut()
.for_each(|output| output.update(T::minimum(), self.shape.peers as i64));

(self.summary.clone(), Rc::clone(&self.shared_progress), self)
// The summary is not read again; move it out rather than clone it.
(::std::mem::take(&mut self.summary), Rc::clone(&self.shared_progress), self)
}

fn notify_me(&self) -> &[FrontierInterest] { &self.shape.notify }
Expand Down
17 changes: 11 additions & 6 deletions timely/src/dataflow/operators/generic/notificator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,14 +317,19 @@ impl<T: Timestamp> FrontierNotificator<T> {
}
self.pending.retain(|x| x.1 > 0);

for i in 0 .. self.pending.len() {
if frontiers.iter().all(|f| !f.less_equal(&self.pending[i].0)) {
// TODO : This clones a capability, whereas we could move it instead.
self.available.push(OrderReversed::new(self.pending[i].0.clone(), self.pending[i].1));
self.pending[i].1 = 0;
// Move available capabilities rather than cloning them, which would
// tour a spurious increment and decrement through progress tracking.
// `available` is a heap, so the order disruption is harmless.
let mut index = 0;
while index < self.pending.len() {
if frontiers.iter().all(|f| !f.less_equal(&self.pending[index].0)) {
let (capability, count) = self.pending.swap_remove(index);
self.available.push(OrderReversed::new(capability, count));
}
else {
index += 1;
}
}
self.pending.retain(|x| x.1 > 0);
}
}

Expand Down
6 changes: 4 additions & 2 deletions timely/src/progress/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,10 @@ impl<T:Timestamp+Send> Progcaster<T> {
});
});

// We clone rather than drain to avoid deserialization.
changes.extend(recv_changes.iter().map(|(u,d)| (u.clone(), *d)));
// The puller yields an owned, already deserialized message, which we
// may drain. A zero-copy receive path would hand us a borrowed view
// instead, and this would return to cloning.
changes.extend(recv_changes.drain());
}

}
Expand Down
19 changes: 12 additions & 7 deletions timely/src/progress/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,9 @@ where
// Child 0 has `inputs` outputs and `outputs` inputs, not yet connected.
let summary = (0..outputs).map(|_| PortConnectivity::default()).collect();
builder.add_node(0, outputs, inputs, summary);
for (index, child) in children.iter().enumerate().skip(1) {
builder.add_node(index, child.inputs, child.outputs, child.internal_summary.clone());
for (index, child) in children.iter_mut().enumerate().skip(1) {
let summary = std::mem::take(&mut child.internal_summary);
builder.add_node(index, child.inputs, child.outputs, summary);
}

for (source, target) in self.edge_stash {
Expand Down Expand Up @@ -618,7 +619,7 @@ struct PerOperatorState<T: Timestamp> {

shared_progress: Rc<RefCell<SharedProgress<T>>>,

internal_summary: Connectivity<T::Summary>, // cached result from initialize.
internal_summary: Connectivity<T::Summary>, // from initialize; moved into the reachability builder.

logging: Option<Logger>,
}
Expand Down Expand Up @@ -762,14 +763,18 @@ impl<T: Timestamp> PerOperatorState<T> {
for (output, internal) in shared_progress.internals.iter_mut().enumerate() {
let source = Location::new_source(self.index, output);
for (time, delta) in internal.drain() {
pointstamps.update((source, time.clone()), delta);
pointstamps.update((source, time), delta);
}
}
for (output, produced) in shared_progress.produceds.iter_mut().enumerate() {
for (time, delta) in produced.drain() {
for target in &self.edges[output] {
pointstamps.update((Location::from(*target), time.clone()), delta);
temp_active.push(Reverse(target.node));
if let Some((last, rest)) = self.edges[output].split_last() {
for target in rest {
pointstamps.update((Location::from(*target), time.clone()), delta);
temp_active.push(Reverse(target.node));
}
pointstamps.update((Location::from(*last), time), delta);
temp_active.push(Reverse(last.node));
}
}
}
Expand Down
Loading