diff --git a/timely/src/dataflow/operators/capability.rs b/timely/src/dataflow/operators/capability.rs index f4711761e..e2cf53043 100644 --- a/timely/src/dataflow/operators/capability.rs +++ b/timely/src/dataflow/operators/capability.rs @@ -166,7 +166,8 @@ impl Capability { // to send data and request notification at the associated timestamp. impl Drop for Capability { fn drop(&mut self) { - self.internal.borrow_mut().update(self.time.clone(), -1); + let time = ::std::mem::replace(&mut self.time, T::minimum()); + self.internal.borrow_mut().update(time, -1); } } diff --git a/timely/src/dataflow/operators/core/capture/capture.rs b/timely/src/dataflow/operators/core/capture/capture.rs index 1832ddbc5..1a484581c 100644 --- a/timely/src/dataflow/operators/core/capture/capture.rs +++ b/timely/src/dataflow/operators/core/capture/capture.rs @@ -133,7 +133,7 @@ impl Capture for Stream<'_, T, C> { if !progress.frontiers[0].is_empty() { // transmit any frontier progress. let to_send = ::std::mem::replace(&mut progress.frontiers[0], ChangeBatch::new()); - event_pusher.push(Event::Progress(to_send.into_inner().to_vec())); + event_pusher.push(Event::Progress(to_send.into_inner().into_vec())); } // turn each received message into an event. diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs index 50ebee4aa..82420ea72 100644 --- a/timely/src/dataflow/operators/generic/builder_raw.rs +++ b/timely/src/dataflow/operators/generic/builder_raw.rs @@ -230,7 +230,7 @@ where fn outputs(&self) -> usize { self.shape.outputs } // announce internal topology as fully connected, and hold all default capabilities. - fn initialize(self: Box) -> (Connectivity, Rc>>, Box) { + fn initialize(mut self: Box) -> (Connectivity, Rc>>, Box) { // Request the operator to be scheduled at least once. self.activations.borrow_mut().activate(&self.address[..]); @@ -242,7 +242,8 @@ where .iter_mut() .for_each(|output| output.update(T::minimum(), self.shape.peers as i64)); - (self.summary.clone(), Rc::clone(&self.shared_progress), self) + // The summary is not read again; move it out rather than clone it. + (::std::mem::take(&mut self.summary), Rc::clone(&self.shared_progress), self) } fn notify_me(&self) -> &[FrontierInterest] { &self.shape.notify } diff --git a/timely/src/dataflow/operators/generic/notificator.rs b/timely/src/dataflow/operators/generic/notificator.rs index 1364a9241..681609814 100644 --- a/timely/src/dataflow/operators/generic/notificator.rs +++ b/timely/src/dataflow/operators/generic/notificator.rs @@ -317,14 +317,19 @@ impl FrontierNotificator { } self.pending.retain(|x| x.1 > 0); - for i in 0 .. self.pending.len() { - if frontiers.iter().all(|f| !f.less_equal(&self.pending[i].0)) { - // TODO : This clones a capability, whereas we could move it instead. - self.available.push(OrderReversed::new(self.pending[i].0.clone(), self.pending[i].1)); - self.pending[i].1 = 0; + // Move available capabilities rather than cloning them, which would + // tour a spurious increment and decrement through progress tracking. + // `available` is a heap, so the order disruption is harmless. + let mut index = 0; + while index < self.pending.len() { + if frontiers.iter().all(|f| !f.less_equal(&self.pending[index].0)) { + let (capability, count) = self.pending.swap_remove(index); + self.available.push(OrderReversed::new(capability, count)); + } + else { + index += 1; } } - self.pending.retain(|x| x.1 > 0); } } diff --git a/timely/src/progress/broadcast.rs b/timely/src/progress/broadcast.rs index 918ee70f4..2c77f223f 100644 --- a/timely/src/progress/broadcast.rs +++ b/timely/src/progress/broadcast.rs @@ -143,8 +143,10 @@ impl Progcaster { }); }); - // We clone rather than drain to avoid deserialization. - changes.extend(recv_changes.iter().map(|(u,d)| (u.clone(), *d))); + // The puller yields an owned, already deserialized message, which we + // may drain. A zero-copy receive path would hand us a borrowed view + // instead, and this would return to cloning. + changes.extend(recv_changes.drain()); } } diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index d5d8ef81d..735f8d122 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -167,8 +167,9 @@ where // Child 0 has `inputs` outputs and `outputs` inputs, not yet connected. let summary = (0..outputs).map(|_| PortConnectivity::default()).collect(); builder.add_node(0, outputs, inputs, summary); - for (index, child) in children.iter().enumerate().skip(1) { - builder.add_node(index, child.inputs, child.outputs, child.internal_summary.clone()); + for (index, child) in children.iter_mut().enumerate().skip(1) { + let summary = std::mem::take(&mut child.internal_summary); + builder.add_node(index, child.inputs, child.outputs, summary); } for (source, target) in self.edge_stash { @@ -618,7 +619,7 @@ struct PerOperatorState { shared_progress: Rc>>, - internal_summary: Connectivity, // cached result from initialize. + internal_summary: Connectivity, // from initialize; moved into the reachability builder. logging: Option, } @@ -762,14 +763,18 @@ impl PerOperatorState { for (output, internal) in shared_progress.internals.iter_mut().enumerate() { let source = Location::new_source(self.index, output); for (time, delta) in internal.drain() { - pointstamps.update((source, time.clone()), delta); + pointstamps.update((source, time), delta); } } for (output, produced) in shared_progress.produceds.iter_mut().enumerate() { for (time, delta) in produced.drain() { - for target in &self.edges[output] { - pointstamps.update((Location::from(*target), time.clone()), delta); - temp_active.push(Reverse(target.node)); + if let Some((last, rest)) = self.edges[output].split_last() { + for target in rest { + pointstamps.update((Location::from(*target), time.clone()), delta); + temp_active.push(Reverse(target.node)); + } + pointstamps.update((Location::from(*last), time), delta); + temp_active.push(Reverse(last.node)); } } }