From 08dc659074f2840f1d81aad1a96ff6c771fafffc Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Fri, 12 Jun 2026 14:23:27 -0400 Subject: [PATCH] Add a consuming iterator for PortConnectivity; move summaries rather than clone Tracker::allocate_from now moves builder and computed summaries into its columnar form rather than cloning every element, and Subgraph::initialize takes self.scope_summary (read only here) and moves its elements through TInner::summarize, releasing the compiled scope summary at initialization. Co-Authored-By: Claude Fable 5 --- timely/src/progress/operate.rs | 7 +++++++ timely/src/progress/reachability.rs | 24 ++++++++++++------------ timely/src/progress/subgraph.rs | 6 +++--- 3 files changed, 22 insertions(+), 15 deletions(-) diff --git a/timely/src/progress/operate.rs b/timely/src/progress/operate.rs index c9453b49a..dcd377d7e 100644 --- a/timely/src/progress/operate.rs +++ b/timely/src/progress/operate.rs @@ -153,6 +153,13 @@ impl Default for PortConnectivity { } } +impl IntoIterator for PortConnectivity { + type Item = (usize, Antichain); + type IntoIter = std::vec::IntoIter<(usize, Antichain)>; + /// Consumes the connectivity, yielding each port and its antichain. + fn into_iter(self) -> Self::IntoIter { self.entries.into_iter() } +} + impl PortConnectivity { /// Borrowing iterator of port identifiers and antichains. pub fn iter_ports(&self) -> impl Iterator)> { diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index 8cea273ef..4ef1b0c32 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -579,10 +579,10 @@ impl Tracker { } // Build columnar nodes: Vecs>>. - let nodes = build_nested_vecs(builder.nodes.iter().map(|connectivity| { - connectivity.iter().map(|port_conn| { - port_conn.iter_ports().flat_map(|(port, antichain)| { - antichain.elements().iter().map(move |s| (port, s.clone())) + let nodes = build_nested_vecs(builder.nodes.into_iter().map(|connectivity| { + connectivity.into_iter().map(|port_conn| { + port_conn.into_iter().flat_map(|(port, antichain)| { + antichain.into_iter().map(move |s| (port, s)) }) }) })); @@ -593,17 +593,17 @@ impl Tracker { })); // Build columnar target and source summaries. - let target_summaries = build_nested_vecs(target_sum.iter().map(|ports| { - ports.iter().map(|port_conn| { - port_conn.iter_ports().flat_map(|(port, antichain)| { - antichain.elements().iter().map(move |s| (port, s.clone())) + let target_summaries = build_nested_vecs(target_sum.into_iter().map(|ports| { + ports.into_iter().map(|port_conn| { + port_conn.into_iter().flat_map(|(port, antichain)| { + antichain.into_iter().map(move |s| (port, s)) }) }) })); - let source_summaries = build_nested_vecs(source_sum.iter().map(|ports| { - ports.iter().map(|port_conn| { - port_conn.iter_ports().flat_map(|(port, antichain)| { - antichain.elements().iter().map(move |s| (port, s.clone())) + let source_summaries = build_nested_vecs(source_sum.into_iter().map(|ports| { + ports.into_iter().map(|port_conn| { + port_conn.into_iter().flat_map(|(port, antichain)| { + antichain.into_iter().map(move |s| (port, s)) }) }) })); diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index d5d8ef81d..5fbf127fb 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -566,9 +566,9 @@ where // with each element containing `self.outputs()` antichains regardless // of how long `self.scope_summary` is let mut internal_summary = vec![PortConnectivityBuilder::default(); self.inputs()]; - for (input_idx, input) in self.scope_summary.iter().enumerate() { - for (output_idx, output) in input.iter_ports() { - for outer in output.elements().iter().cloned().map(TInner::summarize) { + for (input_idx, input) in std::mem::take(&mut self.scope_summary).into_iter().enumerate() { + for (output_idx, output) in input { + for outer in output.into_iter().map(TInner::summarize) { internal_summary[input_idx].insert(output_idx, outer); } }