From 3eeba3be4fedd90937572b70a8dffa6c0640b6d0 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 8 Apr 2026 14:38:08 -0400 Subject: [PATCH 01/13] Track timely's Child changes --- Cargo.toml | 4 +- differential-dataflow/examples/accumulate.rs | 3 +- .../examples/columnar/main.rs | 6 +-- differential-dataflow/examples/dynamic.rs | 5 +- differential-dataflow/examples/graspan.rs | 6 +-- .../examples/iterate_container.rs | 3 +- differential-dataflow/examples/monoid-bfs.rs | 5 +- differential-dataflow/examples/pagerank.rs | 5 +- .../src/algorithms/graphs/bijkstra.rs | 7 ++- .../src/algorithms/graphs/propagate.rs | 5 +- .../src/algorithms/graphs/scc.rs | 10 ++-- differential-dataflow/src/collection.rs | 46 +++++++++++-------- differential-dataflow/src/input.rs | 22 ++++----- .../src/operators/arrange/arrangement.rs | 23 ++++++---- .../src/operators/iterate.rs | 21 +++++---- dogsdogsdogs/examples/delta_query.rs | 5 +- dogsdogsdogs/examples/delta_query_wcoj.rs | 3 +- dogsdogsdogs/src/calculus.rs | 12 ++--- dogsdogsdogs/src/operators/half_join.rs | 6 +-- experiments/src/bin/deals-interactive.rs | 2 +- experiments/src/bin/deals.rs | 12 +++-- experiments/src/bin/graphs-interactive-alt.rs | 7 +-- .../src/bin/graphs-interactive-neu-zwei.rs | 2 +- experiments/src/bin/graphs-interactive-neu.rs | 7 +-- experiments/src/bin/graphs-interactive.rs | 7 +-- experiments/src/bin/graphs-static.rs | 13 ++++-- experiments/src/bin/graspan1.rs | 5 +- experiments/src/bin/graspan2.rs | 6 ++- interactive/src/plan/sfw.rs | 3 +- mdbook/src/chapter_2/chapter_2_7.md | 11 +++-- server/src/lib.rs | 3 +- 31 files changed, 154 insertions(+), 121 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d9d402c79..895c0407e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,9 +23,9 @@ rust-version = "1.86" [workspace.dependencies] differential-dataflow = { path = "differential-dataflow", default-features = false, version = "0.22.0" } -timely = { version = "0.28", default-features = false } +#timely = { version = "0.28", default-features = false } columnar = { version = "0.12", default-features = false } -#timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" } +timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" } #timely = { path = "../timely-dataflow/timely/", default-features = false } [workspace.lints.clippy] diff --git a/differential-dataflow/examples/accumulate.rs b/differential-dataflow/examples/accumulate.rs index 0fa792926..b04fe52b1 100644 --- a/differential-dataflow/examples/accumulate.rs +++ b/differential-dataflow/examples/accumulate.rs @@ -18,10 +18,11 @@ fn main() { let (input, data) = scope.new_collection::<_, isize>(); use timely::dataflow::Scope; + let outer = scope.clone(); scope.iterative::(|inner| { data.enter_at(inner, |_| 0) .consolidate() - .leave() + .leave(&outer) }); input diff --git a/differential-dataflow/examples/columnar/main.rs b/differential-dataflow/examples/columnar/main.rs index 85b573157..66db2fc4e 100644 --- a/differential-dataflow/examples/columnar/main.rs +++ b/differential-dataflow/examples/columnar/main.rs @@ -115,9 +115,9 @@ mod reachability { roots: Collection>, ) -> Collection> { - let mut scope = edges.inner.scope(); + let outer = edges.inner.scope(); - scope.iterative::(|nested| { + outer.iterative::(|nested| { let summary = Product::new(Time::default(), 1); let roots_inner = roots.enter(nested); @@ -186,7 +186,7 @@ mod reachability { variable.set(result_col.clone()); // Leave the iterative scope. - result_col.leave() + result_col.leave(&outer) }) } } diff --git a/differential-dataflow/examples/dynamic.rs b/differential-dataflow/examples/dynamic.rs index a844c28bb..35dbac8ba 100644 --- a/differential-dataflow/examples/dynamic.rs +++ b/differential-dataflow/examples/dynamic.rs @@ -103,7 +103,8 @@ where let nodes = roots.map(|x| (x, 0)); // repeatedly update minimal distances each node can be reached from each root - nodes.scope().iterative::, _, _>(|inner| { + let outer = nodes.scope(); + outer.iterative::, _, _>(|inner| { // These enter the statically bound scope, rather than any iterative scopes. // We do not *need* to enter them into the dynamic scope, as they are static @@ -126,7 +127,7 @@ where // Leave the dynamic iteration, stripping off the last timestamp coordinate. next.leave_dynamic(1) .inspect(|x| println!("{:?}", x)) - .leave() + .leave(&outer) }) } diff --git a/differential-dataflow/examples/graspan.rs b/differential-dataflow/examples/graspan.rs index 79c7fd184..41d82e195 100644 --- a/differential-dataflow/examples/graspan.rs +++ b/differential-dataflow/examples/graspan.rs @@ -5,7 +5,6 @@ use std::fs::File; use timely::progress::Timestamp; use timely::order::Product; use timely::dataflow::Scope; -use timely::dataflow::scopes::ScopeParent; use differential_dataflow::VecCollection; use differential_dataflow::lattice::Lattice; @@ -69,7 +68,7 @@ use differential_dataflow::operators::arrange::{Arranged, TraceAgent}; type TraceKeyHandle = TraceAgent>; type TraceValHandle = TraceAgent>; -type Arrange = Arranged::Timestamp, R>>; +type Arrange = Arranged::Timestamp, R>>; /// An evolving set of edges. /// @@ -162,6 +161,7 @@ impl Query { } // We need a subscope to allow iterative development of variables. + let outer = scope.clone(); scope.iterative::(|subscope| { // create map from relation name to input handle and collection. @@ -171,7 +171,7 @@ impl Query { // create variables and result handles for each named relation. for (name, (input, collection)) in input_map { let edge_variable = EdgeVariable::from(collection.enter(subscope), Product::new(Default::default(), 1)); - let trace = edge_variable.collection.clone().leave().arrange_by_self().trace; + let trace = edge_variable.collection.clone().leave(&outer).arrange_by_self().trace; result_map.insert(name.clone(), RelationHandles { input, trace }); variable_map.insert(name.clone(), edge_variable); } diff --git a/differential-dataflow/examples/iterate_container.rs b/differential-dataflow/examples/iterate_container.rs index f7ef3764e..0a45fd7ab 100644 --- a/differential-dataflow/examples/iterate_container.rs +++ b/differential-dataflow/examples/iterate_container.rs @@ -53,6 +53,7 @@ fn main() { let numbers = scope.new_collection_from(1 .. 10u32).1; let numbers: Collection<_, _> = wrap(numbers.inner).as_collection(); + let outer = scope.clone(); scope.iterative::(|nested| { let summary = Product::new(Default::default(), 1); let (variable, collection) = Variable::new_from(numbers.enter(nested), summary); @@ -77,7 +78,7 @@ fn main() { }).as_collection().consolidate(); let result = wrap(result.inner).as_collection(); variable.set(result); - collection.leave() + collection.leave(&outer) }); }) } diff --git a/differential-dataflow/examples/monoid-bfs.rs b/differential-dataflow/examples/monoid-bfs.rs index cce06674e..00ce5f998 100644 --- a/differential-dataflow/examples/monoid-bfs.rs +++ b/differential-dataflow/examples/monoid-bfs.rs @@ -127,7 +127,8 @@ where G: Scope, { // repeatedly update minimal distances each node can be reached from each root - roots.scope().iterative::(|scope| { + let outer = roots.scope(); + outer.iterative::(|scope| { use differential_dataflow::operators::iterate::Variable; use differential_dataflow::trace::implementations::{KeySpine, KeyBuilder}; @@ -152,6 +153,6 @@ where .as_collection(|k,()| *k); variable.set(result.clone()); - result.leave() + result.leave(&outer) }) } diff --git a/differential-dataflow/examples/pagerank.rs b/differential-dataflow/examples/pagerank.rs index d779ed36e..caf0e5822 100644 --- a/differential-dataflow/examples/pagerank.rs +++ b/differential-dataflow/examples/pagerank.rs @@ -92,7 +92,8 @@ where .map(|(src,_dst)| src) .count(); - edges.scope().iterative::(|inner| { + let outer = edges.scope(); + outer.iterative::(|inner| { // Bring various collections into the scope. let edges = edges.enter(inner); @@ -130,6 +131,6 @@ where // Bind the recursive variable, return its limit. ranks_bind.set(pushed.clone()); - pushed.leave() + pushed.leave(&outer) }) } diff --git a/differential-dataflow/src/algorithms/graphs/bijkstra.rs b/differential-dataflow/src/algorithms/graphs/bijkstra.rs index 9c61716ef..bfc5d7606 100644 --- a/differential-dataflow/src/algorithms/graphs/bijkstra.rs +++ b/differential-dataflow/src/algorithms/graphs/bijkstra.rs @@ -43,9 +43,8 @@ where N: ExchangeData+Hash, Tr: for<'a> TraceReader=&'a N, Val<'a>=&'a N, Diff=isize>+Clone+'static, { - forward - .stream - .scope().iterative::(|inner| { + let outer = forward.stream.scope(); + outer.iterative::(|inner| { let forward_edges = forward.enter(inner); let reverse_edges = reverse.enter(inner); @@ -120,6 +119,6 @@ where reverse_bind.set(reverse_next); - reached.leave() + reached.leave(&outer) }) } diff --git a/differential-dataflow/src/algorithms/graphs/propagate.rs b/differential-dataflow/src/algorithms/graphs/propagate.rs index 2728de6e2..07692e3bb 100644 --- a/differential-dataflow/src/algorithms/graphs/propagate.rs +++ b/differential-dataflow/src/algorithms/graphs/propagate.rs @@ -78,7 +78,8 @@ where // }) use timely::order::Product; - nodes.scope().scoped::,_,_>("Propagate", |scope| { + let outer = nodes.scope(); + outer.scoped::,_,_>("Propagate", |scope| { use crate::operators::iterate::Variable; use crate::trace::implementations::{ValBuilder, ValSpine}; @@ -104,6 +105,6 @@ where labels .as_collection(|k,v| (k.clone(), v.clone())) - .leave() + .leave(&outer) }) } diff --git a/differential-dataflow/src/algorithms/graphs/scc.rs b/differential-dataflow/src/algorithms/graphs/scc.rs index b730302e7..bd1eff850 100644 --- a/differential-dataflow/src/algorithms/graphs/scc.rs +++ b/differential-dataflow/src/algorithms/graphs/scc.rs @@ -21,7 +21,8 @@ where R: From { use timely::order::Product; - graph.scope().scoped::,_,_>("StronglyConnected", |scope| { + let outer = graph.scope(); + outer.scoped::,_,_>("StronglyConnected", |scope| { // Bring in edges and transposed edges. let edges = graph.enter(&scope); let trans = edges.clone().map_in_place(|x| mem::swap(&mut x.0, &mut x.1)); @@ -31,7 +32,7 @@ where let result = trim_edges(trim_edges(inner, edges), trans); variable.set(result.clone()); - result.leave() + result.leave(&outer) }) } @@ -44,7 +45,8 @@ where R: Multiply, R: From { - edges.inner.scope().region_named("TrimEdges", |region| { + let outer = edges.inner.scope(); + outer.region_named("TrimEdges", |region| { let cycle = cycle.enter_region(region); let edges = edges.enter_region(region); @@ -62,6 +64,6 @@ where .join_core(labels, |e2,(e1,l1),l2| [((e1.clone(),e2.clone()),(l1.clone(),l2.clone()))]) .filter(|(_,(l1,l2))| l1 == l2) .map(|((x1,x2),_)| (x2,x1)) - .leave_region() + .leave_region(&outer) }) } diff --git a/differential-dataflow/src/collection.rs b/differential-dataflow/src/collection.rs index 6981cc3da..1201474de 100644 --- a/differential-dataflow/src/collection.rs +++ b/differential-dataflow/src/collection.rs @@ -109,7 +109,7 @@ impl Collection { /// /// This method is a specialization of `enter` to the case where the nested scope is a region. /// It removes the need for an operator that adjusts the timestamp. - pub fn enter_region<'a>(self, child: &Child<'a, G, ::Timestamp>) -> Collection::Timestamp>, C> { + pub fn enter_region<'a>(self, child: &Child<'a, G::Allocator, ::Timestamp>) -> Collection::Timestamp>, C> { self.inner .enter(child) .as_collection() @@ -206,19 +206,20 @@ impl Collection { /// /// let data = scope.new_collection_from(1 .. 10).1; /// + /// let outer = scope.clone(); /// let result = scope.region(|child| { /// data.clone() /// .enter(child) - /// .leave() + /// .leave(&outer) /// }); /// /// data.assert_eq(result); /// }); /// ``` - pub fn enter<'a, T>(self, child: &Child<'a, G, T>) -> Collection, ::Timestamp, T>>::InnerContainer> + pub fn enter<'a, T>(self, child: &Child<'a, G::Allocator, T>) -> Collection, ::Timestamp, T>>::InnerContainer> where - C: containers::Enter<::Timestamp, T, InnerContainer: Container>, - T: Refines<::Timestamp>, + C: containers::Enter<::Timestamp, T, InnerContainer: Container>, + T: Refines<::Timestamp>, { use timely::dataflow::channels::pact::Pipeline; self.inner @@ -262,14 +263,10 @@ impl Collection { } } -use timely::dataflow::scopes::ScopeParent; use timely::progress::timestamp::Refines; /// Methods requiring a nested scope. -impl<'a, G: Scope, T: Timestamp, C: Container> Collection, C> -where - C: containers::Leave, - T: Refines<::Timestamp>, +impl<'a, A: timely::communication::Allocate, T: Timestamp, C: Container> Collection, C> { /// Returns the final value of a Collection from a nested scope to its containing scope. /// @@ -286,16 +283,21 @@ where /// let result = scope.region(|child| { /// data.clone() /// .enter(child) - /// .leave() + /// .leave(scope) /// }); /// /// data.assert_eq(result); /// }); /// ``` - pub fn leave(self) -> Collection>::OuterContainer> { + pub fn leave(self, outer: &G) -> Collection>::OuterContainer> + where + G: Scope, + T: Refines, + C: containers::Leave, + { use timely::dataflow::channels::pact::Pipeline; self.inner - .leave() + .leave(outer) .unary(Pipeline, "Leave", move |_,_| move |input, output| { input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).leave())); }) @@ -304,15 +306,18 @@ where } /// Methods requiring a region as the scope. -impl Collection, C> +impl<'a, A: timely::communication::Allocate, T: Timestamp, C: Container+Clone+'static> Collection, C> { /// Returns the value of a Collection from a nested region to its containing scope. /// /// This method is a specialization of `leave` to the case that of a nested region. /// It removes the need for an operator that adjusts the timestamp. - pub fn leave_region(self) -> Collection { + pub fn leave_region(self, outer: &G) -> Collection + where + G: Scope, + { self.inner - .leave() + .leave(outer) .as_collection() } } @@ -326,7 +331,7 @@ pub mod vec { use timely::progress::Timestamp; use timely::order::Product; use timely::dataflow::scopes::child::Iterative; - use timely::dataflow::{Scope, ScopeParent}; + use timely::dataflow::Scope; use timely::dataflow::operators::*; use timely::dataflow::operators::vec::*; @@ -352,7 +357,7 @@ pub mod vec { /// defaults to) `isize`, representing changes to the occurrence count of each record. /// /// This type definition instantiates the [`Collection`] type with a `Vec<(D, G::Timestamp, R)>`. - pub type Collection = super::Collection::Timestamp, R)>>; + pub type Collection = super::Collection::Timestamp, R)>>; impl Collection { @@ -534,16 +539,17 @@ pub mod vec { /// /// let data = scope.new_collection_from(1 .. 10).1; /// + /// let outer = scope.clone(); /// let result = scope.iterative::(|child| { /// data.clone() /// .enter_at(child, |x| *x) - /// .leave() + /// .leave(&outer) /// }); /// /// data.assert_eq(result); /// }); /// ``` - pub fn enter_at<'a, T, F>(self, child: &Iterative<'a, G, T>, mut initial: F) -> Collection, D, R> + pub fn enter_at<'a, T, F>(self, child: &Iterative<'a, G::Allocator, G::Timestamp, T>, mut initial: F) -> Collection, D, R> where T: Timestamp+Hash, F: FnMut(&D) -> T + Clone + 'static, diff --git a/differential-dataflow/src/input.rs b/differential-dataflow/src/input.rs index 43a4987e8..2d669fb4f 100644 --- a/differential-dataflow/src/input.rs +++ b/differential-dataflow/src/input.rs @@ -9,7 +9,7 @@ use timely::progress::Timestamp; use timely::dataflow::operators::vec::Input as TimelyInput; use timely::dataflow::operators::vec::input::Handle; -use timely::dataflow::scopes::ScopeParent; +use timely::dataflow::Scope; use crate::Data; use crate::difference::Semigroup; @@ -41,7 +41,7 @@ pub trait Input : TimelyInput { /// /// }).unwrap(); /// ``` - fn new_collection(&mut self) -> (InputSession<::Timestamp, D, R>, VecCollection) + fn new_collection(&mut self) -> (InputSession<::Timestamp, D, R>, VecCollection) where D: Data, R: Semigroup+'static; /// Create a new collection and input handle from initial data. /// @@ -67,7 +67,7 @@ pub trait Input : TimelyInput { /// /// }).unwrap(); /// ``` - fn new_collection_from(&mut self, data: I) -> (InputSession<::Timestamp, I::Item, isize>, VecCollection) + fn new_collection_from(&mut self, data: I) -> (InputSession<::Timestamp, I::Item, isize>, VecCollection) where I: IntoIterator + 'static; /// Create a new collection and input handle from initial data. /// @@ -93,28 +93,28 @@ pub trait Input : TimelyInput { /// /// }).unwrap(); /// ``` - fn new_collection_from_raw(&mut self, data: I) -> (InputSession<::Timestamp, D, R>, VecCollection) - where I: IntoIterator::Timestamp,R)>+'static, D: Data, R: Semigroup+'static; + fn new_collection_from_raw(&mut self, data: I) -> (InputSession<::Timestamp, D, R>, VecCollection) + where I: IntoIterator::Timestamp,R)>+'static, D: Data, R: Semigroup+'static; } use crate::lattice::Lattice; -impl Input for G where ::Timestamp: Lattice { - fn new_collection(&mut self) -> (InputSession<::Timestamp, D, R>, VecCollection) +impl Input for G where ::Timestamp: Lattice { + fn new_collection(&mut self) -> (InputSession<::Timestamp, D, R>, VecCollection) where D: Data, R: Semigroup+'static, { let (handle, stream) = self.new_input(); (InputSession::from(handle), stream.as_collection()) } - fn new_collection_from(&mut self, data: I) -> (InputSession<::Timestamp, I::Item, isize>, VecCollection) + fn new_collection_from(&mut self, data: I) -> (InputSession<::Timestamp, I::Item, isize>, VecCollection) where I: IntoIterator+'static, I::Item: Data { self.new_collection_from_raw(data.into_iter().map(|d| (d, ::minimum(), 1))) } - fn new_collection_from_raw(&mut self, data: I) -> (InputSession<::Timestamp, D, R>, VecCollection) + fn new_collection_from_raw(&mut self, data: I) -> (InputSession<::Timestamp, D, R>, VecCollection) where D: Data, R: Semigroup+'static, - I: IntoIterator::Timestamp,R)>+'static, + I: IntoIterator::Timestamp,R)>+'static, { use timely::dataflow::operators::ToStream; @@ -200,7 +200,7 @@ impl InputSession { /// Introduces a handle as collection. pub fn to_collection(&mut self, scope: &mut G) -> VecCollection where - G: ScopeParent, + G: Scope, { scope .input_from(&mut self.handle) diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index cd1bd0894..cd0d00dfc 100644 --- a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -85,8 +85,8 @@ where /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps /// have all been extended with an additional coordinate with the default value. The resulting collection does /// not vary with the new timestamp coordinate. - pub fn enter<'a, TInner>(self, child: &Child<'a, G, TInner>) - -> Arranged, TraceEnter> + pub fn enter<'a, TInner>(self, child: &Child<'a, G::Allocator, TInner>) + -> Arranged, TraceEnter> where TInner: Refines+Lattice+Timestamp+Clone, { @@ -100,8 +100,8 @@ where /// /// This method only applies to *regions*, which are subscopes with the same timestamp /// as their containing scope. In this case, the trace type does not need to change. - pub fn enter_region<'a>(self, child: &Child<'a, G, G::Timestamp>) - -> Arranged, Tr> { + pub fn enter_region<'a>(self, child: &Child<'a, G::Allocator, G::Timestamp>) + -> Arranged, Tr> { Arranged { stream: self.stream.enter(child), trace: self.trace, @@ -113,8 +113,8 @@ where /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps /// have all been extended with an additional coordinate with the default value. The resulting collection does /// not vary with the new timestamp coordinate. - pub fn enter_at<'a, TInner, F, P>(self, child: &Child<'a, G, TInner>, logic: F, prior: P) - -> Arranged, TraceEnterAt> + pub fn enter_at<'a, TInner, F, P>(self, child: &Child<'a, G::Allocator, TInner>, logic: F, prior: P) + -> Arranged, TraceEnterAt> where TInner: Refines+Lattice+Timestamp+Clone+'static, F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>)->TInner+Clone+'static, @@ -315,19 +315,22 @@ where } -impl<'a, G, Tr> Arranged, Tr> +impl<'a, A, Tr> Arranged, Tr> where - G: Scope, + A: timely::communication::Allocate, Tr: TraceReader + Clone, { /// Brings an arranged collection out of a nested region. /// /// This method only applies to *regions*, which are subscopes with the same timestamp /// as their containing scope. In this case, the trace type does not need to change. - pub fn leave_region(self) -> Arranged { + pub fn leave_region(self, outer: &G) -> Arranged + where + G: Scope, + { use timely::dataflow::operators::Leave; Arranged { - stream: self.stream.leave(), + stream: self.stream.leave(outer), trace: self.trace, } } diff --git a/differential-dataflow/src/operators/iterate.rs b/differential-dataflow/src/operators/iterate.rs index e5486bb6e..3dd3412b2 100644 --- a/differential-dataflow/src/operators/iterate.rs +++ b/differential-dataflow/src/operators/iterate.rs @@ -75,15 +75,16 @@ pub trait Iterate, D: Data, R: Semigroup> { /// ``` fn iterate(self, logic: F) -> VecCollection where - for<'a> F: FnOnce(Iterative<'a, G, u64>, VecCollection, D, R>)->VecCollection, D, R>; + for<'a> F: FnOnce(Iterative<'a, G::Allocator, G::Timestamp, u64>, VecCollection, D, R>)->VecCollection, D, R>; } impl, D: Ord+Data+Debug, R: Abelian+'static> Iterate for VecCollection { fn iterate(self, logic: F) -> VecCollection where - for<'a> F: FnOnce(Iterative<'a, G, u64>, VecCollection, D, R>)->VecCollection, D, R>, + for<'a> F: FnOnce(Iterative<'a, G::Allocator, G::Timestamp, u64>, VecCollection, D, R>)->VecCollection, D, R>, { - self.inner.scope().scoped("Iterate", |subgraph| { + let outer = self.inner.scope(); + outer.scoped("Iterate", |subgraph| { // create a new variable, apply logic, bind variable, return. // // this could be much more succinct if we returned the collection @@ -93,16 +94,17 @@ impl, D: Ord+Data+Debug, R: Abelian+'static> Iterat let (variable, collection) = Variable::new_from(self.enter(subgraph), Product::new(Default::default(), 1)); let result = logic(subgraph.clone(), collection); variable.set(result.clone()); - result.leave() + result.leave(&outer) }) } } impl, D: Ord+Data+Debug, R: Semigroup+'static> Iterate for G { - fn iterate(mut self, logic: F) -> VecCollection + fn iterate(self, logic: F) -> VecCollection where - for<'a> F: FnOnce(Iterative<'a, G, u64>, VecCollection, D, R>)->VecCollection, D, R>, + for<'a> F: FnOnce(Iterative<'a, G::Allocator, G::Timestamp, u64>, VecCollection, D, R>)->VecCollection, D, R>, { + let outer = self.clone(); self.scoped("Iterate", |subgraph| { // create a new variable, apply logic, bind variable, return. // @@ -113,7 +115,7 @@ impl, D: Ord+Data+Debug, R: Semigroup+'static> Iter let (variable, collection) = Variable::new(subgraph, Product::new(Default::default(), 1)); let result = logic(subgraph.clone(), collection); variable.set(result.clone()); - result.leave() + result.leave(&outer) } ) } @@ -140,13 +142,14 @@ impl, D: Ord+Data+Debug, R: Semigroup+'static> Iter /// /// let numbers = scope.new_collection_from(1 .. 10u32).1; /// +/// let outer = scope.clone(); /// scope.iterative::(|nested| { /// let summary = Product::new(Default::default(), 1); /// let (variable, collection) = Variable::new_from(numbers.enter(nested), summary); /// let result = collection.map(|x| if x % 2 == 0 { x/2 } else { x }) /// .consolidate(); /// variable.set(result.clone()); -/// result.leave() +/// result.leave(&outer) /// }); /// }) /// ``` @@ -198,7 +201,7 @@ where } /// A `Variable` specialized to a vector container of update triples (data, time, diff). -pub type VecVariable = Variable::Timestamp, R)>>; +pub type VecVariable = Variable::Timestamp, R)>>; impl Variable where diff --git a/dogsdogsdogs/examples/delta_query.rs b/dogsdogsdogs/examples/delta_query.rs index af9eb5bfd..8c46a090b 100644 --- a/dogsdogsdogs/examples/delta_query.rs +++ b/dogsdogsdogs/examples/delta_query.rs @@ -43,6 +43,7 @@ fn main() { // let reverse_count = edges.map(|(x,y)| y).arrange_by_self(); // Q(a,b,c) := E1(a,b), E2(b,c), E3(a,c) + let outer = scope.clone(); let (triangles_prev, triangles_next) = scope.scoped::,_,_>("DeltaQuery (Triangles)", |inner| { // Grab the stream of changes. @@ -90,7 +91,7 @@ fn main() { let changes3 = validate(changes3, reverse_self_alt.clone(), key2.clone()); let changes3 = changes3.map(|((a,c),b)| (a,b,c)); - let prev_changes = changes1.concat(changes2).concat(changes3).leave(); + let prev_changes = changes1.concat(changes2).concat(changes3).leave(&outer); // New ideas let d_edges = edges.differentiate(inner); @@ -116,7 +117,7 @@ fn main() { .join_core(forward_key_alt, |a,c,b| Some(((*c, *b), *a))) .join_core(reverse_self_alt, |(c,b), a, &()| Some((*a,*b,*c))); - let next_changes = changes1.concat(changes2).concat(changes3).integrate(); + let next_changes = changes1.concat(changes2).concat(changes3).integrate(&outer); (prev_changes, next_changes) }); diff --git a/dogsdogsdogs/examples/delta_query_wcoj.rs b/dogsdogsdogs/examples/delta_query_wcoj.rs index afceef1ee..ac8ae3e6c 100644 --- a/dogsdogsdogs/examples/delta_query_wcoj.rs +++ b/dogsdogsdogs/examples/delta_query_wcoj.rs @@ -31,6 +31,7 @@ fn main() { let reverse = edges.map(|(x,y)| (y,x)); // Q(a,b,c) := E1(a,b), E2(b,c), E3(a,c) + let outer = scope.clone(); let triangles = scope.scoped::,_,_>("DeltaQuery (Triangles)", |inner| { // Each relation we'll need. @@ -80,7 +81,7 @@ fn main() { ]) .map(|((a,c),b)| (a,b,c)); - changes1.concat(changes2).concat(changes3).leave() + changes1.concat(changes2).concat(changes3).leave(&outer) }); triangles diff --git a/dogsdogsdogs/src/calculus.rs b/dogsdogsdogs/src/calculus.rs index 94b443231..c6f057dcb 100644 --- a/dogsdogsdogs/src/calculus.rs +++ b/dogsdogsdogs/src/calculus.rs @@ -22,12 +22,12 @@ use crate::altneu::AltNeu; /// Produce a collection containing the changes at the moments they happen. pub trait Differentiate { - fn differentiate<'a>(self, child: &Child<'a, G, AltNeu>) -> VecCollection>, D, R>; + fn differentiate<'a>(self, child: &Child<'a, G::Allocator, AltNeu>) -> VecCollection>, D, R>; } /// Collect instantaneous changes back in to a collection. pub trait Integrate { - fn integrate(self) -> VecCollection; + fn integrate(self, outer: &G) -> VecCollection; } impl Differentiate for VecCollection @@ -37,7 +37,7 @@ where R: Abelian + 'static, { // For each (data, Alt(time), diff) we add a (data, Neu(time), -diff). - fn differentiate<'a>(self, child: &Child<'a, G, AltNeu>) -> VecCollection>, D, R> { + fn differentiate<'a>(self, child: &Child<'a, G::Allocator, AltNeu>) -> VecCollection>, D, R> { self.enter(child) .inner .flat_map(|(data, time, diff)| { @@ -51,17 +51,17 @@ where } } -impl<'a, G, D, R> Integrate for VecCollection>, D, R> +impl<'a, G, D, R> Integrate for VecCollection>, D, R> where G: Scope, D: Data, R: Abelian + 'static, { // We discard each `neu` variant and strip off the `alt` wrapper. - fn integrate(self) -> VecCollection { + fn integrate(self, outer: &G) -> VecCollection { self.inner .filter(|(_d,t,_r)| !t.neu) .as_collection() - .leave() + .leave(outer) } } diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs index 2a18a9aa1..16aa59876 100644 --- a/dogsdogsdogs/src/operators/half_join.rs +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -37,7 +37,7 @@ use std::time::Instant; use timely::ContainerBuilder; use timely::container::CapacityContainerBuilder; -use timely::dataflow::{Scope, ScopeParent, Stream}; +use timely::dataflow::{Scope, Stream}; use timely::dataflow::channels::pact::{Pipeline, Exchange}; use timely::dataflow::operators::{Capability, Operator, generic::Session}; use timely::progress::Antichain; @@ -109,9 +109,9 @@ where /// This is a shorthand primarily for the reson of readability. type SessionFor<'a, 'b, G, CB> = Session<'a, 'b, - ::Timestamp, + ::Timestamp, CB, - Capability<::Timestamp>, + Capability<::Timestamp>, >; /// An unsafe variant of `half_join` where the `output_func` closure takes diff --git a/experiments/src/bin/deals-interactive.rs b/experiments/src/bin/deals-interactive.rs index 3ea53cd14..83c7ae960 100644 --- a/experiments/src/bin/deals-interactive.rs +++ b/experiments/src/bin/deals-interactive.rs @@ -13,7 +13,7 @@ use differential_dataflow::trace::implementations::ValSpine; use differential_dataflow::operators::arrange::TraceAgent; use differential_dataflow::operators::arrange::Arranged; -type Arrange = Arranged::Timestamp, R>>>; +type Arrange = Arranged::Timestamp, R>>>; type Node = u32; diff --git a/experiments/src/bin/deals.rs b/experiments/src/bin/deals.rs index 782941e15..f67e34146 100644 --- a/experiments/src/bin/deals.rs +++ b/experiments/src/bin/deals.rs @@ -13,7 +13,7 @@ use differential_dataflow::operators::arrange::Arrange; use differential_dataflow::operators::iterate::Variable; use differential_dataflow::difference::Present; -type EdgeArranged = Arranged::Timestamp, R>>>; +type EdgeArranged = Arranged::Timestamp, R>>>; type Node = u32; type Edge = (Node, Node); @@ -86,7 +86,8 @@ use timely::order::Product; fn tc>(edges: EdgeArranged) -> VecCollection { // repeatedly update minimal distances each node can be reached from each root - edges.stream.scope().iterative::(|scope| { + let outer = edges.stream.scope(); + outer.iterative::(|scope| { let (inner, inner_collection) = Variable::new(scope, Product::new(Default::default(), 1)); let edges = edges.enter(scope); @@ -102,7 +103,7 @@ fn tc>(edges: EdgeArranged) -> Ve ; inner.set(result.clone()); - result.leave() + result.leave(&outer) } ) } @@ -113,7 +114,8 @@ fn sg>(edges: EdgeArranged) -> Ve let peers = edges.clone().join_core(edges.clone(), |_,&x,&y| Some((x,y))).filter(|&(x,y)| x != y); // repeatedly update minimal distances each node can be reached from each root - peers.scope().iterative::(|scope| { + let outer = peers.scope(); + outer.iterative::(|scope| { let (inner, inner_collection) = Variable::new(scope, Product::new(Default::default(), 1)); let edges = edges.enter(scope); @@ -131,7 +133,7 @@ fn sg>(edges: EdgeArranged) -> Ve ; inner.set(result.clone()); - result.leave() + result.leave(&outer) } ) } diff --git a/experiments/src/bin/graphs-interactive-alt.rs b/experiments/src/bin/graphs-interactive-alt.rs index 6e5b0ffaa..d7533d4a0 100644 --- a/experiments/src/bin/graphs-interactive-alt.rs +++ b/experiments/src/bin/graphs-interactive-alt.rs @@ -258,7 +258,7 @@ use differential_dataflow::trace::implementations::ValSpine; use differential_dataflow::operators::arrange::TraceAgent; use differential_dataflow::operators::arrange::Arranged; -type Arrange = Arranged::Timestamp, R>>>; +type Arrange = Arranged::Timestamp, R>>>; // returns pairs (n, s) indicating node n can be reached from a root in s steps. fn three_hop( @@ -294,7 +294,8 @@ fn _bidijkstra( goals: VecCollection) -> VecCollection where G::Timestamp: Lattice+Ord { - goals.scope().iterative::(|inner| { + let outer = goals.scope(); + outer.iterative::(|inner| { // Our plan is to start evolving distances from both sources and destinations. // The evolution from a source or destination should continue as long as there @@ -356,7 +357,7 @@ where G::Timestamp: Lattice+Ord { reverse.set(reverse_next); - reached.leave() + reached.leave(&outer) }) } diff --git a/experiments/src/bin/graphs-interactive-neu-zwei.rs b/experiments/src/bin/graphs-interactive-neu-zwei.rs index 35a118cd9..62675f93f 100644 --- a/experiments/src/bin/graphs-interactive-neu-zwei.rs +++ b/experiments/src/bin/graphs-interactive-neu-zwei.rs @@ -226,7 +226,7 @@ use differential_dataflow::trace::implementations::ValSpine; use differential_dataflow::operators::arrange::TraceAgent; use differential_dataflow::operators::arrange::Arranged; -type Arrange = Arranged::Timestamp, R>>>; +type Arrange = Arranged::Timestamp, R>>>; // returns pairs (n, s) indicating node n can be reached from a root in s steps. fn three_hop( diff --git a/experiments/src/bin/graphs-interactive-neu.rs b/experiments/src/bin/graphs-interactive-neu.rs index 7e27e7ed8..dcc00b0b9 100644 --- a/experiments/src/bin/graphs-interactive-neu.rs +++ b/experiments/src/bin/graphs-interactive-neu.rs @@ -291,7 +291,7 @@ use differential_dataflow::trace::implementations::ValSpine; use differential_dataflow::operators::arrange::TraceAgent; use differential_dataflow::operators::arrange::Arranged; -type Arrange = Arranged::Timestamp, R>>>; +type Arrange = Arranged::Timestamp, R>>>; // returns pairs (n, s) indicating node n can be reached from a root in s steps. fn three_hop( @@ -327,7 +327,8 @@ fn _bidijkstra( goals: VecCollection) -> VecCollection where G::Timestamp: Lattice+Ord { - goals.scope().iterative::(|inner| { + let outer = goals.scope(); + outer.iterative::(|inner| { // Our plan is to start evolving distances from both sources and destinations. // The evolution from a source or destination should continue as long as there @@ -389,6 +390,6 @@ where G::Timestamp: Lattice+Ord { reverse.set(reverse_next); - reached.leave() + reached.leave(&outer) }) } diff --git a/experiments/src/bin/graphs-interactive.rs b/experiments/src/bin/graphs-interactive.rs index 2ec74e93f..54fc27309 100644 --- a/experiments/src/bin/graphs-interactive.rs +++ b/experiments/src/bin/graphs-interactive.rs @@ -194,7 +194,7 @@ use differential_dataflow::trace::implementations::ValSpine; use differential_dataflow::operators::arrange::TraceAgent; use differential_dataflow::operators::arrange::Arranged; -type Arrange = Arranged::Timestamp, R>>>; +type Arrange = Arranged::Timestamp, R>>>; // returns pairs (n, s) indicating node n can be reached from a root in s steps. @@ -231,7 +231,8 @@ fn _bidijkstra( goals: VecCollection) -> VecCollection where G::Timestamp: Lattice+Ord { - goals.scope().iterative::(|inner| { + let outer = goals.scope(); + outer.iterative::(|inner| { // Our plan is to start evolving distances from both sources and destinations. // The evolution from a source or destination should continue as long as there @@ -293,6 +294,6 @@ where G::Timestamp: Lattice+Ord { reverse.set(reverse_next); - reached.leave() + reached.leave(&outer) }) } diff --git a/experiments/src/bin/graphs-static.rs b/experiments/src/bin/graphs-static.rs index 16cbbd6dd..7c382d645 100644 --- a/experiments/src/bin/graphs-static.rs +++ b/experiments/src/bin/graphs-static.rs @@ -113,7 +113,8 @@ fn reach> ( let graph = graph.import(&roots.scope()); - roots.scope().iterative::(|scope| { + let outer = roots.scope(); + outer.iterative::(|scope| { let graph = graph.enter(scope); let roots = roots.enter(scope); @@ -126,7 +127,7 @@ fn reach> ( .threshold_total(|_,_| 1); inner.set(result.clone()); - result.leave() + result.leave(&outer) }) } @@ -139,7 +140,8 @@ fn bfs> ( let graph = graph.import(&roots.scope()); let roots = roots.map(|r| (r,0)); - roots.scope().iterative::(|scope| { + let outer = roots.scope(); + outer.iterative::(|scope| { let graph = graph.enter(scope); let roots = roots.enter(scope); @@ -151,7 +153,7 @@ fn bfs> ( .reduce(|_key, input, output| output.push((*input[0].0,1))); inner.set(result.clone()); - result.leave() + result.leave(&outer) }) } @@ -169,6 +171,7 @@ fn connected_components>( let nodes_r = reverse.clone().flat_map_ref(|k,v| if k < v { Some(*k) } else { None }); let nodes = nodes_f.concat(nodes_r).consolidate().map(|x| (x,x)); + let outer = scope.clone(); scope.iterative(|scope| { // import arrangements, nodes. @@ -197,6 +200,6 @@ fn connected_components>( .reduce(|_, s, t| { t.push((*s[0].0, 1)); }); inner.set(result.clone()); - result.leave() + result.leave(&outer) }) } diff --git a/experiments/src/bin/graspan1.rs b/experiments/src/bin/graspan1.rs index 507f5c46b..82fbe3ba2 100644 --- a/experiments/src/bin/graspan1.rs +++ b/experiments/src/bin/graspan1.rs @@ -35,8 +35,9 @@ fn main() { // a N c <- a N b && b E c // N(a,c) <- N(a,b), E(b, c) + let outer = nodes.scope(); let reached = - nodes.scope().iterative::(|inner| { + outer.iterative::(|inner| { let nodes = nodes.enter(inner).map(|(a,b)| (b,a)); let edges = edges.enter(inner); @@ -51,7 +52,7 @@ fn main() { .threshold_semigroup(|_,_,x: Option<&Present>| if x.is_none() { Some(Present) } else { None }); labels.set(next.clone()); - next.leave() + next.leave(&outer) }); reached diff --git a/experiments/src/bin/graspan2.rs b/experiments/src/bin/graspan2.rs index 4ce8574a9..64d44dbe0 100644 --- a/experiments/src/bin/graspan2.rs +++ b/experiments/src/bin/graspan2.rs @@ -49,6 +49,7 @@ fn unoptimized() { let dereference = dereference.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + let outer = scope.clone(); let (value_flow, memory_alias, value_alias) = scope .iterative::(|scope| { @@ -108,7 +109,7 @@ fn unoptimized() { value_flow.set(value_flow_next.clone()); memory_alias.set(memory_alias_next.clone()); - (value_flow_next.leave(), memory_alias_next.leave(), value_alias_next.leave()) + (value_flow_next.leave(&outer), memory_alias_next.leave(&outer), value_alias_next.leave(&outer)) }); value_flow.map(|_| ()).consolidate().inspect(|x| println!("VF: {:?}", x)); @@ -174,6 +175,7 @@ fn optimized() { let dereference = dereference.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + let outer = scope.clone(); let (value_flow, memory_alias) = scope .iterative::(|scope| { @@ -233,7 +235,7 @@ fn optimized() { value_flow.set(value_flow_next.clone()); memory_alias.set(memory_alias_next.clone()); - (value_flow_next.leave(), memory_alias_next.leave()) + (value_flow_next.leave(&outer), memory_alias_next.leave(&outer)) }); value_flow.map(|_| ()).consolidate().inspect(|x| println!("VF: {:?}", x)); diff --git a/interactive/src/plan/sfw.rs b/interactive/src/plan/sfw.rs index 6a5c82fc8..c4a8a44a0 100644 --- a/interactive/src/plan/sfw.rs +++ b/interactive/src/plan/sfw.rs @@ -231,6 +231,7 @@ impl Render for MultiwayJoin { use differential_dogs3::altneu::AltNeu; let scope_name = format!("DeltaRule: {}/{}", index, self.sources.len()); + let outer = scope.clone(); let changes = scope.clone().scoped::,_,_>(&scope_name, |inner| { // This should default to an `AltNeu::Alt` timestamp. @@ -288,7 +289,7 @@ impl Render for MultiwayJoin { changes .map(move |tuple| extract_map.iter().map(|&i| tuple[i].clone()).collect::>()) - .leave() + .leave(&outer) }); accumulated_changes.push(changes); diff --git a/mdbook/src/chapter_2/chapter_2_7.md b/mdbook/src/chapter_2/chapter_2_7.md index def1c4dcd..4692fc007 100644 --- a/mdbook/src/chapter_2/chapter_2_7.md +++ b/mdbook/src/chapter_2/chapter_2_7.md @@ -90,19 +90,20 @@ As an example, the implementation of the `iterate` operator looks something like # use differential_dataflow::VecCollection; # use differential_dataflow::operators::{Iterate, iterate::VecVariable}; # use differential_dataflow::lattice::Lattice; -# fn logic<'a, G: Scope>(collection: VecCollection, (u64, u64), isize>) -> VecCollection, (u64, u64)> +# fn logic<'a, G: Scope>(collection: VecCollection, (u64, u64), isize>) -> VecCollection, (u64, u64)> # where G::Timestamp: Lattice # { # collection # } -# fn example<'a, G: Scope>(collection: VecCollection) //, logic: impl Fn(&VecVariable, (u64, u64), isize>) -> VecCollection, (u64, u64)>) +# fn example<'a, G: Scope>(collection: VecCollection) //, logic: impl Fn(&VecVariable, (u64, u64), isize>) -> VecCollection, (u64, u64)>) # where G::Timestamp: Lattice # { - collection.scope().scoped("inner", |subgraph| { + let outer = collection.scope(); + outer.scoped::("inner", |subgraph| { let (variable, collection) = VecVariable::new_from(collection.enter(subgraph), 1); - let result = logic(collection); + let result = logic::(collection); variable.set(result.clone()); - result.leave() + result.leave(&outer) }); # } ``` diff --git a/server/src/lib.rs b/server/src/lib.rs index 745de0bb9..d13579876 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -6,7 +6,6 @@ use std::time::Instant; use libloading::Library; use timely::communication::Allocator; -use timely::worker::Worker; use timely::dataflow::scopes::Child; use timely::dataflow::operators::probe::Handle as ProbeHandle; @@ -21,7 +20,7 @@ pub type TraceHandle = TraceAgent; /// Arguments provided to each shared library to help build their dataflows and register their results. pub type Environment<'a, 'b> = ( - &'a mut Child<'b, Worker,usize>, + &'a mut Child<'b, Allocator, usize>, &'a mut TraceHandler, &'a mut ProbeHandle, &'a Instant, From da1698bc7ef100e583787d8e6e5f982007b94e31 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 8 Apr 2026 23:27:02 -0400 Subject: [PATCH 02/13] Tidy bounds --- differential-dataflow/src/collection.rs | 8 ++++---- differential-dataflow/src/input.rs | 18 +++++++++--------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/differential-dataflow/src/collection.rs b/differential-dataflow/src/collection.rs index 1201474de..e5016ccd4 100644 --- a/differential-dataflow/src/collection.rs +++ b/differential-dataflow/src/collection.rs @@ -109,7 +109,7 @@ impl Collection { /// /// This method is a specialization of `enter` to the case where the nested scope is a region. /// It removes the need for an operator that adjusts the timestamp. - pub fn enter_region<'a>(self, child: &Child<'a, G::Allocator, ::Timestamp>) -> Collection::Timestamp>, C> { + pub fn enter_region<'a>(self, child: &Child<'a, G::Allocator, G::Timestamp>) -> Collection, C> { self.inner .enter(child) .as_collection() @@ -216,10 +216,10 @@ impl Collection { /// data.assert_eq(result); /// }); /// ``` - pub fn enter<'a, T>(self, child: &Child<'a, G::Allocator, T>) -> Collection, ::Timestamp, T>>::InnerContainer> + pub fn enter<'a, T>(self, child: &Child<'a, G::Allocator, T>) -> Collection, >::InnerContainer> where - C: containers::Enter<::Timestamp, T, InnerContainer: Container>, - T: Refines<::Timestamp>, + C: containers::Enter, + T: Refines, { use timely::dataflow::channels::pact::Pipeline; self.inner diff --git a/differential-dataflow/src/input.rs b/differential-dataflow/src/input.rs index 2d669fb4f..024aeb10d 100644 --- a/differential-dataflow/src/input.rs +++ b/differential-dataflow/src/input.rs @@ -41,7 +41,7 @@ pub trait Input : TimelyInput { /// /// }).unwrap(); /// ``` - fn new_collection(&mut self) -> (InputSession<::Timestamp, D, R>, VecCollection) + fn new_collection(&mut self) -> (InputSession, VecCollection) where D: Data, R: Semigroup+'static; /// Create a new collection and input handle from initial data. /// @@ -67,7 +67,7 @@ pub trait Input : TimelyInput { /// /// }).unwrap(); /// ``` - fn new_collection_from(&mut self, data: I) -> (InputSession<::Timestamp, I::Item, isize>, VecCollection) + fn new_collection_from(&mut self, data: I) -> (InputSession, VecCollection) where I: IntoIterator + 'static; /// Create a new collection and input handle from initial data. /// @@ -93,28 +93,28 @@ pub trait Input : TimelyInput { /// /// }).unwrap(); /// ``` - fn new_collection_from_raw(&mut self, data: I) -> (InputSession<::Timestamp, D, R>, VecCollection) - where I: IntoIterator::Timestamp,R)>+'static, D: Data, R: Semigroup+'static; + fn new_collection_from_raw(&mut self, data: I) -> (InputSession, VecCollection) + where I: IntoIterator+'static, D: Data, R: Semigroup+'static; } use crate::lattice::Lattice; -impl Input for G where ::Timestamp: Lattice { - fn new_collection(&mut self) -> (InputSession<::Timestamp, D, R>, VecCollection) +impl Input for G where G::Timestamp: Lattice { + fn new_collection(&mut self) -> (InputSession, VecCollection) where D: Data, R: Semigroup+'static, { let (handle, stream) = self.new_input(); (InputSession::from(handle), stream.as_collection()) } - fn new_collection_from(&mut self, data: I) -> (InputSession<::Timestamp, I::Item, isize>, VecCollection) + fn new_collection_from(&mut self, data: I) -> (InputSession, VecCollection) where I: IntoIterator+'static, I::Item: Data { self.new_collection_from_raw(data.into_iter().map(|d| (d, ::minimum(), 1))) } - fn new_collection_from_raw(&mut self, data: I) -> (InputSession<::Timestamp, D, R>, VecCollection) + fn new_collection_from_raw(&mut self, data: I) -> (InputSession, VecCollection) where D: Data, R: Semigroup+'static, - I: IntoIterator::Timestamp,R)>+'static, + I: IntoIterator+'static, { use timely::dataflow::operators::ToStream; From 8e9ae63351f4752127937a7c4ed09807d7f5b487 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Thu, 9 Apr 2026 09:21:52 -0400 Subject: [PATCH 03/13] Absorb next wave of changes --- Cargo.toml | 4 +- diagnostics/src/logging.rs | 56 ++++--- differential-dataflow/examples/bfs.rs | 2 +- .../examples/columnar/columnar_support.rs | 23 ++- .../examples/columnar/main.rs | 10 +- differential-dataflow/examples/dynamic.rs | 2 +- differential-dataflow/examples/graspan.rs | 12 +- differential-dataflow/examples/interpreted.rs | 2 +- .../examples/iterate_container.rs | 2 +- differential-dataflow/examples/monoid-bfs.rs | 2 +- differential-dataflow/examples/pagerank.rs | 2 +- differential-dataflow/examples/progress.rs | 8 +- .../examples/stackoverflow.rs | 2 +- .../src/algorithms/graphs/bfs.rs | 8 +- .../src/algorithms/graphs/bijkstra.rs | 8 +- .../src/algorithms/graphs/propagate.rs | 10 +- .../src/algorithms/graphs/scc.rs | 6 +- .../src/algorithms/graphs/sequential.rs | 6 +- .../src/algorithms/identifiers.rs | 6 +- .../src/algorithms/prefix_sum.rs | 10 +- differential-dataflow/src/capture.rs | 16 +- differential-dataflow/src/collection.rs | 145 +++++++++--------- differential-dataflow/src/dynamic/mod.rs | 4 +- differential-dataflow/src/input.rs | 25 +-- differential-dataflow/src/logging.rs | 3 +- .../src/operators/arrange/agent.rs | 19 +-- .../src/operators/arrange/arrangement.rs | 77 +++++----- .../src/operators/arrange/upsert.rs | 21 +-- differential-dataflow/src/operators/count.rs | 14 +- .../src/operators/iterate.rs | 28 ++-- differential-dataflow/src/operators/join.rs | 16 +- differential-dataflow/src/operators/reduce.rs | 29 ++-- .../src/operators/threshold.rs | 16 +- differential-dataflow/tests/bfs.rs | 2 +- differential-dataflow/tests/scc.rs | 6 +- dogsdogsdogs/examples/ngo.rs | 4 +- dogsdogsdogs/src/calculus.rs | 20 +-- dogsdogsdogs/src/lib.rs | 16 +- dogsdogsdogs/src/operators/count.rs | 6 +- dogsdogsdogs/src/operators/half_join.rs | 41 ++--- dogsdogsdogs/src/operators/half_join2.rs | 37 +++-- dogsdogsdogs/src/operators/lookup_map.rs | 9 +- dogsdogsdogs/src/operators/propose.rs | 10 +- dogsdogsdogs/src/operators/validate.rs | 6 +- experiments/Cargo.toml | 2 +- experiments/src/bin/deals-interactive.rs | 7 +- experiments/src/bin/deals.rs | 7 +- experiments/src/bin/graphs-interactive-alt.rs | 16 +- .../src/bin/graphs-interactive-neu-zwei.rs | 11 +- experiments/src/bin/graphs-interactive-neu.rs | 11 +- experiments/src/bin/graphs-interactive.rs | 11 +- experiments/src/bin/graphs-static.rs | 18 +-- experiments/src/bin/graphs.rs | 16 +- interactive/src/command.rs | 3 +- interactive/src/logging.rs | 11 +- interactive/src/manager.rs | 3 +- interactive/src/plan/concat.rs | 6 +- interactive/src/plan/filter.rs | 8 +- interactive/src/plan/join.rs | 8 +- interactive/src/plan/map.rs | 8 +- interactive/src/plan/mod.rs | 17 +- interactive/src/plan/sfw.rs | 8 +- mdbook/src/chapter_2/chapter_2_1.md | 10 +- mdbook/src/chapter_2/chapter_2_2.md | 5 +- mdbook/src/chapter_2/chapter_2_3.md | 5 +- mdbook/src/chapter_2/chapter_2_4.md | 10 +- mdbook/src/chapter_2/chapter_2_5.md | 5 +- mdbook/src/chapter_2/chapter_2_6.md | 5 +- mdbook/src/chapter_2/chapter_2_7.md | 20 +-- server/Cargo.toml | 2 +- server/src/lib.rs | 7 +- 71 files changed, 483 insertions(+), 508 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 895c0407e..f1a2b61a1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,8 +25,8 @@ rust-version = "1.86" differential-dataflow = { path = "differential-dataflow", default-features = false, version = "0.22.0" } #timely = { version = "0.28", default-features = false } columnar = { version = "0.12", default-features = false } -timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" } -#timely = { path = "../timely-dataflow/timely/", default-features = false } +#timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" } +timely = { path = "../timely-dataflow/timely/", default-features = false } [workspace.lints.clippy] type_complexity = "allow" diff --git a/diagnostics/src/logging.rs b/diagnostics/src/logging.rs index b47721e62..f0fac8d2b 100644 --- a/diagnostics/src/logging.rs +++ b/diagnostics/src/logging.rs @@ -34,7 +34,6 @@ use differential_dataflow::operators::arrange::TraceAgent; use differential_dataflow::trace::implementations::{KeySpine, ValSpine}; use differential_dataflow::{AsCollection, VecCollection}; -use timely::communication::Allocate; use timely::container::CapacityContainerBuilder; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::capture::{Event, EventLink, Replay, Capture}; @@ -244,12 +243,11 @@ fn quantize(time: Duration, interval: Duration) -> Duration { } /// Quantize timestamps in a collection's inner stream. -fn quantize_collection( - collection: VecCollection, +fn quantize_collection( + collection: VecCollection, interval: Duration, -) -> VecCollection +) -> VecCollection where - S: Scope, D: differential_dataflow::Data, { collection @@ -276,7 +274,7 @@ where /// /// Returns a [`LoggingState`] with trace handles and a [`SinkHandle`] for /// the WebSocket thread. -pub fn register(worker: &mut Worker, log_logging: bool) -> LoggingState { +pub fn register(worker: &mut Worker, log_logging: bool) -> LoggingState { let start = Instant::now(); // Event links for logging capture (worker-internal, Rc-based). @@ -377,8 +375,8 @@ pub fn register(worker: &mut Worker, log_logging: bool) -> Loggi } } -fn install_loggers( - worker: &mut Worker, +fn install_loggers( + worker: &mut Worker, t_link: Rc>>, d_link: Rc>>, ) { @@ -402,11 +400,11 @@ fn install_loggers( // ============================================================================ /// Internal: collections before arrangement, used for the cross-join. -struct TimelyCollections { - operators: VecCollection), i64>, - channels: VecCollection, (usize, usize), (usize, usize)), i64>, - elapsed: VecCollection, - messages: VecCollection, +struct TimelyCollections { + operators: VecCollection), i64>, + channels: VecCollection, (usize, usize), (usize, usize)), i64>, + elapsed: VecCollection, + messages: VecCollection, } #[derive(Default)] @@ -416,10 +414,10 @@ struct TimelyDemuxState { } /// Build timely logging collections and arrangements. -fn construct_timely>( - scope: &mut S, - stream: Stream>, -) -> (TimelyTraces, TimelyCollections) { +fn construct_timely( + scope: &mut Scope, + stream: Stream>, +) -> (TimelyTraces, TimelyCollections) { type OpUpdate = ((usize, String, Vec), Duration, i64); type ChUpdate = ((usize, Vec, (usize, usize), (usize, usize)), Duration, i64); type ElUpdate = (usize, Duration, i64); @@ -536,21 +534,21 @@ fn construct_timely>( // ============================================================================ /// Internal: collections before arrangement, used for the cross-join. -struct DifferentialCollections { - arrangement_batches: VecCollection, - arrangement_records: VecCollection, - sharing: VecCollection, - batcher_records: VecCollection, - batcher_size: VecCollection, - batcher_capacity: VecCollection, - batcher_allocations: VecCollection, +struct DifferentialCollections { + arrangement_batches: VecCollection, + arrangement_records: VecCollection, + sharing: VecCollection, + batcher_records: VecCollection, + batcher_size: VecCollection, + batcher_capacity: VecCollection, + batcher_allocations: VecCollection, } /// Build differential logging collections and arrangements. -fn construct_differential>( - scope: &mut S, - stream: Stream>, -) -> (DifferentialTraces, DifferentialCollections) { +fn construct_differential( + scope: &mut Scope, + stream: Stream>, +) -> (DifferentialTraces, DifferentialCollections) { type Update = (usize, Duration, i64); let mut demux = OperatorBuilder::new("Differential Demux".to_string(), scope.clone()); diff --git a/differential-dataflow/examples/bfs.rs b/differential-dataflow/examples/bfs.rs index af875d5d5..5c78c938b 100644 --- a/differential-dataflow/examples/bfs.rs +++ b/differential-dataflow/examples/bfs.rs @@ -93,7 +93,7 @@ fn main() { // returns pairs (n, s) indicating node n can be reached from a root in s steps. fn bfs(edges: VecCollection, roots: VecCollection) -> VecCollection where - G: Scope, + G: timely::progress::Timestamp + Lattice + Ord, { // initialize roots as reaching themselves at distance 0 let nodes = roots.map(|x| (x, 0)); diff --git a/differential-dataflow/examples/columnar/columnar_support.rs b/differential-dataflow/examples/columnar/columnar_support.rs index 8b6e10543..825bd2a97 100644 --- a/differential-dataflow/examples/columnar/columnar_support.rs +++ b/differential-dataflow/examples/columnar/columnar_support.rs @@ -1839,13 +1839,12 @@ pub mod updates { /// joins output times with input times, multiplies output diffs with input diffs. /// /// This subsumes map, filter, negate, and enter_at for columnar collections. -pub fn join_function( - input: differential_dataflow::Collection>, +pub fn join_function( + input: differential_dataflow::Collection>, mut logic: L, -) -> differential_dataflow::Collection> +) -> differential_dataflow::Collection> where - G: timely::dataflow::Scope, - G::Timestamp: differential_dataflow::lattice::Lattice, + U::Time: differential_dataflow::lattice::Lattice, U: layout::ColumnarUpdate>, I: IntoIterator, L: FnMut( @@ -1893,12 +1892,11 @@ type DynTime = timely::order::Product( - input: differential_dataflow::Collection>, +pub fn leave_dynamic( + input: differential_dataflow::Collection>, level: usize, -) -> differential_dataflow::Collection> +) -> differential_dataflow::Collection> where - G: timely::dataflow::Scope, K: columnar::Columnar, V: columnar::Columnar, R: columnar::Columnar, @@ -1972,14 +1970,13 @@ where /// /// Cursors through each batch and pushes `(key, val, time, diff)` refs into /// a `ValColBuilder`, which sorts and consolidates on flush. -pub fn as_recorded_updates( +pub fn as_recorded_updates( arranged: differential_dataflow::operators::arrange::Arranged< - G, + U::Time, differential_dataflow::operators::arrange::TraceAgent>, >, -) -> differential_dataflow::Collection> +) -> differential_dataflow::Collection> where - G: timely::dataflow::Scope, U: layout::ColumnarUpdate, { use timely::dataflow::operators::generic::Operator; diff --git a/differential-dataflow/examples/columnar/main.rs b/differential-dataflow/examples/columnar/main.rs index 66db2fc4e..9d58f154d 100644 --- a/differential-dataflow/examples/columnar/main.rs +++ b/differential-dataflow/examples/columnar/main.rs @@ -110,10 +110,10 @@ mod reachability { /// Compute the set of nodes reachable from `roots` along directed `edges`. /// /// Returns `(node, ())` for each reachable node. - pub fn reach>( - edges: Collection>, - roots: Collection>, - ) -> Collection> + pub fn reach( + edges: Collection>, + roots: Collection>, + ) -> Collection> { let outer = edges.inner.scope(); @@ -181,7 +181,7 @@ mod reachability { }); // Extract RecordedUpdates from the Arranged's batch stream. - let result_col = as_recorded_updates::<_, (Node, (), IterTime, Diff)>(result); + let result_col = as_recorded_updates::<(Node, (), IterTime, Diff)>(result); variable.set(result_col.clone()); diff --git a/differential-dataflow/examples/dynamic.rs b/differential-dataflow/examples/dynamic.rs index 35dbac8ba..345f40ab4 100644 --- a/differential-dataflow/examples/dynamic.rs +++ b/differential-dataflow/examples/dynamic.rs @@ -93,7 +93,7 @@ fn main() { // returns pairs (n, s) indicating node n can be reached from a root in s steps. fn bfs(edges: VecCollection, roots: VecCollection) -> VecCollection where - G: Scope, + G: timely::progress::Timestamp + Lattice + Ord, { use timely::order::Product; use iterate::Variable; diff --git a/differential-dataflow/examples/graspan.rs b/differential-dataflow/examples/graspan.rs index 41d82e195..9de864eba 100644 --- a/differential-dataflow/examples/graspan.rs +++ b/differential-dataflow/examples/graspan.rs @@ -68,7 +68,7 @@ use differential_dataflow::operators::arrange::{Arranged, TraceAgent}; type TraceKeyHandle = TraceAgent>; type TraceValHandle = TraceAgent>; -type Arrange = Arranged::Timestamp, R>>; +type Arrange = Arranged>; /// An evolving set of edges. /// @@ -78,7 +78,7 @@ type Arrange = Arranged::Timestamp /// /// An edge variable provides arranged representations of its contents, even before they are /// completely defined, in support of recursively defined productions. -pub struct EdgeVariable> { +pub struct EdgeVariable { variable: VecVariable, collection: VecCollection, current: VecCollection, @@ -86,9 +86,9 @@ pub struct EdgeVariable> { reverse: Option>, } -impl> EdgeVariable { +impl EdgeVariable { /// Creates a new variable initialized with `source`. - pub fn from(source: VecCollection, step: ::Summary) -> Self { + pub fn from(source: VecCollection, step: ::Summary) -> Self { let (variable, collection) = VecVariable::new(&mut source.scope(), step); EdgeVariable { variable, @@ -150,9 +150,9 @@ impl Query { } /// Creates a dataflow implementing the query, and returns input and trace handles. - pub fn render_in(&self, scope: &mut G) -> BTreeMap> + pub fn render_in(&self, scope: &mut Scope) -> BTreeMap> where - G: Scope, + G: Timestamp + Lattice + ::timely::order::TotalOrder, { // Create new input (handle, stream) pairs let mut input_map = BTreeMap::new(); diff --git a/differential-dataflow/examples/interpreted.rs b/differential-dataflow/examples/interpreted.rs index 4c6e3429c..b087af00d 100644 --- a/differential-dataflow/examples/interpreted.rs +++ b/differential-dataflow/examples/interpreted.rs @@ -39,7 +39,7 @@ fn main() { fn interpret(edges: VecCollection, relations: &[(usize, usize)]) -> VecCollection> where - G: Scope, + G: timely::progress::Timestamp + Lattice + Hash + Ord, { // arrange the edge relation three ways. diff --git a/differential-dataflow/examples/iterate_container.rs b/differential-dataflow/examples/iterate_container.rs index 0a45fd7ab..b2b6daf97 100644 --- a/differential-dataflow/examples/iterate_container.rs +++ b/differential-dataflow/examples/iterate_container.rs @@ -33,7 +33,7 @@ impl, TS> ResultsIn for ContainerWrapper { #[inline(always)] fn results_in(self, step: &TS) -> Self { ContainerWrapper(self.0.results_in(step)) } } -fn wrap(stream: Stream) -> Stream> { +fn wrap(stream: Stream) -> Stream> { let mut builder = OperatorBuilder::new("Wrap".to_string(), stream.scope()); let (mut output, stream_out) = builder.new_output(); let mut input = builder.new_input(stream, Pipeline); diff --git a/differential-dataflow/examples/monoid-bfs.rs b/differential-dataflow/examples/monoid-bfs.rs index 00ce5f998..9400a1cdd 100644 --- a/differential-dataflow/examples/monoid-bfs.rs +++ b/differential-dataflow/examples/monoid-bfs.rs @@ -124,7 +124,7 @@ fn main() { // returns pairs (n, s) indicating node n can be reached from a root in s steps. fn bfs(edges: VecCollection, roots: VecCollection) -> VecCollection where - G: Scope, + G: timely::progress::Timestamp + Lattice + Ord, { // repeatedly update minimal distances each node can be reached from each root let outer = roots.scope(); diff --git a/differential-dataflow/examples/pagerank.rs b/differential-dataflow/examples/pagerank.rs index caf0e5822..2764dab23 100644 --- a/differential-dataflow/examples/pagerank.rs +++ b/differential-dataflow/examples/pagerank.rs @@ -79,7 +79,7 @@ fn main() { // to its PageRank in the input graph `edges`. fn pagerank(iters: Iter, edges: VecCollection) -> VecCollection where - G: Scope, + G: timely::progress::Timestamp + Lattice, { // initialize many surfers at each node. let nodes = diff --git a/differential-dataflow/examples/progress.rs b/differential-dataflow/examples/progress.rs index 634e43d56..138de0c86 100644 --- a/differential-dataflow/examples/progress.rs +++ b/differential-dataflow/examples/progress.rs @@ -120,7 +120,7 @@ fn frontier( times: VecCollection, ) -> VecCollection where - G: Scope, + G: Timestamp + Lattice + Ord, T: Timestamp+std::hash::Hash, { // Translate node and edge transitions into a common Location to Location edge with an associated Summary. @@ -154,7 +154,7 @@ fn summarize( edges: VecCollection, ) -> VecCollection where - G: Scope, + G: Timestamp + Lattice + Ord, T: Timestamp, { // Start from trivial reachability from each input to itself. @@ -194,12 +194,12 @@ where /// Identifies cycles along paths that do not increment timestamps. -fn find_cycles( +fn find_cycles( nodes: VecCollection, edges: VecCollection, ) -> VecCollection where - G: Scope, + G: Timestamp + Lattice + Ord, T: Timestamp, { // Retain node connections along "default" timestamp summaries. diff --git a/differential-dataflow/examples/stackoverflow.rs b/differential-dataflow/examples/stackoverflow.rs index 3fe521ee2..c88e7ea6f 100644 --- a/differential-dataflow/examples/stackoverflow.rs +++ b/differential-dataflow/examples/stackoverflow.rs @@ -107,7 +107,7 @@ fn main() { // returns pairs (n, s) indicating node n can be reached from a root in s steps. fn bfs(edges: VecCollection, roots: VecCollection) -> VecCollection where - G: Scope, + G: timely::progress::Timestamp + Lattice + Ord, { // initialize roots as reaching themselves at distance 0 let nodes = roots.map(|x| (x, 0)); diff --git a/differential-dataflow/src/algorithms/graphs/bfs.rs b/differential-dataflow/src/algorithms/graphs/bfs.rs index 1513622bd..79ff15e45 100644 --- a/differential-dataflow/src/algorithms/graphs/bfs.rs +++ b/differential-dataflow/src/algorithms/graphs/bfs.rs @@ -2,7 +2,7 @@ use std::hash::Hash; -use timely::dataflow::*; +use timely::progress::Timestamp; use crate::{VecCollection, ExchangeData}; use crate::operators::*; @@ -11,7 +11,7 @@ use crate::lattice::Lattice; /// Returns pairs (node, dist) indicating distance of each node from a root. pub fn bfs(edges: VecCollection, roots: VecCollection) -> VecCollection where - G: Scope, + G: Timestamp + Lattice + Ord, N: ExchangeData+Hash, { let edges = edges.arrange_by_key(); @@ -24,9 +24,9 @@ use crate::operators::arrange::Arranged; /// Returns pairs (node, dist) indicating distance of each node from a root. pub fn bfs_arranged(edges: Arranged, roots: VecCollection) -> VecCollection where - G: Scope, + G: Timestamp + Lattice + Ord, N: ExchangeData+Hash, - Tr: for<'a> TraceReader=&'a N, Val<'a>=&'a N, Diff=isize>+Clone+'static, + Tr: for<'a> TraceReader=&'a N, Val<'a>=&'a N, Time=G, Diff=isize>+Clone+'static, { // initialize roots as reaching themselves at distance 0 let nodes = roots.map(|x| (x, 0)); diff --git a/differential-dataflow/src/algorithms/graphs/bijkstra.rs b/differential-dataflow/src/algorithms/graphs/bijkstra.rs index bfc5d7606..4ff6c32a2 100644 --- a/differential-dataflow/src/algorithms/graphs/bijkstra.rs +++ b/differential-dataflow/src/algorithms/graphs/bijkstra.rs @@ -3,7 +3,7 @@ use std::hash::Hash; use timely::order::Product; -use timely::dataflow::*; +use timely::progress::Timestamp; use crate::{VecCollection, ExchangeData}; use crate::lattice::Lattice; @@ -21,7 +21,7 @@ use crate::operators::iterate::Variable; /// could be good insurance here. pub fn bidijkstra(edges: VecCollection, goals: VecCollection) -> VecCollection where - G: Scope, + G: Timestamp + Lattice + Ord, N: ExchangeData+Hash, { let forward = edges.clone().arrange_by_key(); @@ -39,9 +39,9 @@ pub fn bidijkstra_arranged( goals: VecCollection ) -> VecCollection where - G: Scope, + G: Timestamp + Lattice + Ord, N: ExchangeData+Hash, - Tr: for<'a> TraceReader=&'a N, Val<'a>=&'a N, Diff=isize>+Clone+'static, + Tr: for<'a> TraceReader=&'a N, Val<'a>=&'a N, Time=G, Diff=isize>+Clone+'static, { let outer = forward.stream.scope(); outer.iterative::(|inner| { diff --git a/differential-dataflow/src/algorithms/graphs/propagate.rs b/differential-dataflow/src/algorithms/graphs/propagate.rs index 07692e3bb..e08d788ab 100644 --- a/differential-dataflow/src/algorithms/graphs/propagate.rs +++ b/differential-dataflow/src/algorithms/graphs/propagate.rs @@ -2,7 +2,7 @@ use std::hash::Hash; -use timely::dataflow::*; +use timely::progress::Timestamp; use crate::{VecCollection, ExchangeData}; use crate::lattice::Lattice; @@ -15,7 +15,7 @@ use crate::difference::{Abelian, Multiply}; /// method to limit the introduction of labels. pub fn propagate(edges: VecCollection, nodes: VecCollection) -> VecCollection where - G: Scope, + G: Timestamp + Lattice + Ord + Hash, N: ExchangeData+Hash, R: ExchangeData+Abelian, R: Multiply, @@ -32,7 +32,7 @@ where /// method to limit the introduction of labels. pub fn propagate_at(edges: VecCollection, nodes: VecCollection, logic: F) -> VecCollection where - G: Scope, + G: Timestamp + Lattice + Ord + Hash, N: ExchangeData+Hash, R: ExchangeData+Abelian, R: Multiply, @@ -53,13 +53,13 @@ use crate::operators::arrange::arrangement::Arranged; /// of `logic should be a number in the interval \[0,64\], pub fn propagate_core(edges: Arranged, nodes: VecCollection, logic: F) -> VecCollection where - G: Scope, + G: Timestamp + Lattice + Ord + Hash, N: ExchangeData+Hash, R: ExchangeData+Abelian, R: Multiply, R: From, L: ExchangeData, - Tr: for<'a> TraceReader=&'a N, Val<'a>=&'a N, Time:Hash, Diff=R>+Clone+'static, + Tr: for<'a> TraceReader=&'a N, Val<'a>=&'a N, Time=G, Diff=R>+Clone+'static, F: Fn(&L)->u64+Clone+'static, { // Morally the code performs the following iterative computation. However, in the interest of a simplified diff --git a/differential-dataflow/src/algorithms/graphs/scc.rs b/differential-dataflow/src/algorithms/graphs/scc.rs index bd1eff850..8efaa85c8 100644 --- a/differential-dataflow/src/algorithms/graphs/scc.rs +++ b/differential-dataflow/src/algorithms/graphs/scc.rs @@ -3,7 +3,7 @@ use std::mem; use std::hash::Hash; -use timely::dataflow::*; +use timely::progress::Timestamp; use crate::{VecCollection, ExchangeData}; use crate::lattice::Lattice; @@ -14,7 +14,7 @@ use super::propagate::propagate; /// Returns the subset of edges in the same strongly connected component. pub fn strongly_connected(graph: VecCollection) -> VecCollection where - G: Scope, + G: Timestamp + Lattice + Ord + Hash, N: ExchangeData + Hash, R: ExchangeData + Abelian, R: Multiply, @@ -39,7 +39,7 @@ where fn trim_edges(cycle: VecCollection, edges: VecCollection) -> VecCollection where - G: Scope, + G: Timestamp + Lattice + Ord + Hash, N: ExchangeData + Hash, R: ExchangeData + Abelian, R: Multiply, diff --git a/differential-dataflow/src/algorithms/graphs/sequential.rs b/differential-dataflow/src/algorithms/graphs/sequential.rs index 354ad32c2..930e04930 100644 --- a/differential-dataflow/src/algorithms/graphs/sequential.rs +++ b/differential-dataflow/src/algorithms/graphs/sequential.rs @@ -2,7 +2,7 @@ use std::hash::Hash; -use timely::dataflow::*; +use timely::progress::Timestamp; use crate::{VecCollection, ExchangeData}; use crate::lattice::Lattice; @@ -11,7 +11,7 @@ use crate::hashable::Hashable; fn _color(edges: VecCollection) -> VecCollection)> where - G: Scope, + G: Timestamp + Lattice + Ord + Hash, N: ExchangeData+Hash, { // need some bogus initial values. @@ -45,7 +45,7 @@ pub fn sequence( edges: VecCollection, logic: F) -> VecCollection)> where - G: Scope, + G: Timestamp + Lattice + Hash + Ord, N: ExchangeData+Hashable, V: ExchangeData, F: Fn(&N, &[(&V, isize)])->V+'static diff --git a/differential-dataflow/src/algorithms/identifiers.rs b/differential-dataflow/src/algorithms/identifiers.rs index 267e4c453..d3dd28fd3 100644 --- a/differential-dataflow/src/algorithms/identifiers.rs +++ b/differential-dataflow/src/algorithms/identifiers.rs @@ -1,6 +1,6 @@ //! Assign unique identifiers to records. -use timely::dataflow::Scope; +use timely::progress::Timestamp; use crate::{VecCollection, ExchangeData, Hashable}; use crate::lattice::Lattice; @@ -8,7 +8,7 @@ use crate::operators::*; use crate::difference::Abelian; /// Assign unique identifiers to elements of a collection. -pub trait Identifiers { +pub trait Identifiers { /// Assign unique identifiers to elements of a collection. /// /// # Example @@ -32,7 +32,7 @@ pub trait Identifiers { impl Identifiers for VecCollection where - G: Scope, + G: Timestamp + Lattice, D: ExchangeData + ::std::hash::Hash, R: ExchangeData + Abelian, { diff --git a/differential-dataflow/src/algorithms/prefix_sum.rs b/differential-dataflow/src/algorithms/prefix_sum.rs index f76f59d9c..39c8d56c3 100644 --- a/differential-dataflow/src/algorithms/prefix_sum.rs +++ b/differential-dataflow/src/algorithms/prefix_sum.rs @@ -1,13 +1,13 @@ //! Implementation of Parallel Prefix Sum -use timely::dataflow::Scope; +use timely::progress::Timestamp; use crate::{VecCollection, ExchangeData}; use crate::lattice::Lattice; use crate::operators::*; /// Extension trait for the prefix_sum method. -pub trait PrefixSum { +pub trait PrefixSum { /// Computes the prefix sum for each element in the collection. /// /// The prefix sum is data-parallel, in the sense that the sums are computed independently for @@ -21,7 +21,7 @@ pub trait PrefixSum { impl PrefixSum for VecCollection where - G: Scope, + G: Timestamp + Lattice, K: ExchangeData + ::std::hash::Hash, D: ExchangeData + ::std::hash::Hash, { @@ -42,7 +42,7 @@ where /// Accumulate data in `collection` into all powers-of-two intervals containing them. pub fn aggregate(collection: VecCollection, combine: F) -> VecCollection where - G: Scope, + G: Timestamp + Lattice, K: ExchangeData + ::std::hash::Hash, D: ExchangeData + ::std::hash::Hash, F: Fn(&K,&D,&D)->D + 'static, @@ -79,7 +79,7 @@ pub fn broadcast( zero: D, combine: F) -> VecCollection where - G: Scope, + G: Timestamp + Lattice + Ord + ::std::fmt::Debug, K: ExchangeData + ::std::hash::Hash, D: ExchangeData + ::std::hash::Hash, F: Fn(&K,&D,&D)->D + 'static, diff --git a/differential-dataflow/src/capture.rs b/differential-dataflow/src/capture.rs index 74cced0ca..e9a71f6b8 100644 --- a/differential-dataflow/src/capture.rs +++ b/differential-dataflow/src/capture.rs @@ -230,7 +230,8 @@ pub mod source { use timely::dataflow::{Scope, Stream, operators::{Capability, CapabilitySet}}; use timely::dataflow::operators::generic::OutputBuilder; use timely::progress::Timestamp; - use timely::scheduling::SyncActivator; + use timely::worker::AsWorker; + use timely::scheduling::{Scheduler, SyncActivator}; // TODO(guswynn): implement this generally in timely struct DropActivator { @@ -250,12 +251,11 @@ pub mod source { /// The stream is built in the supplied `scope` and continues to run until /// the returned `Box` token is dropped. The `source_builder` argument /// is invoked with a `SyncActivator` that will re-activate the source. - pub fn build( - scope: G, + pub fn build( + scope: Scope, source_builder: B, - ) -> (Box, Stream>) + ) -> (Box, Stream>) where - G: Scope, B: FnOnce(SyncActivator) -> I, I: Iterator> + 'static, D: ExchangeData + Hash, @@ -563,6 +563,7 @@ pub mod sink { use timely::dataflow::{Scope, Stream}; use timely::dataflow::channels::pact::{Exchange, Pipeline}; use timely::dataflow::operators::generic::{builder_rc::OperatorBuilder, OutputBuilder}; + use timely::scheduling::Scheduler; use crate::{lattice::Lattice, ExchangeData}; use super::{Writer, Message, Progress}; @@ -573,13 +574,12 @@ pub mod sink { /// will *not* perform the consolidation on the stream's behalf. If this is not /// performed before calling the method, the recorded output may not be correctly /// reconstructed by readers. - pub fn build( - stream: Stream>, + pub fn build( + stream: Stream>, sink_hash: u64, updates_sink: Weak>, progress_sink: Weak>, ) where - G: Scope, BS: Writer> + 'static, D: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>, T: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a> + Timestamp + Lattice, diff --git a/differential-dataflow/src/collection.rs b/differential-dataflow/src/collection.rs index e5016ccd4..3720c90e1 100644 --- a/differential-dataflow/src/collection.rs +++ b/differential-dataflow/src/collection.rs @@ -10,7 +10,6 @@ use timely::Container; use timely::progress::Timestamp; -use timely::dataflow::scopes::Child; use timely::dataflow::{Scope, Stream}; use timely::dataflow::operators::*; @@ -22,7 +21,7 @@ use crate::difference::Abelian; /// in order to expose some of this functionality (e.g. negation, timestamp manipulation). Other actions /// on the containers, and streams of containers, are left to the container implementor to describe. #[derive(Clone)] -pub struct Collection { +pub struct Collection { /// The underlying timely dataflow stream. /// /// This field is exposed to support direct timely dataflow manipulation when required, but it is @@ -34,7 +33,7 @@ pub struct Collection { pub inner: Stream, } -impl Collection { +impl Collection { /// Creates a new Collection from a timely dataflow stream. /// /// This method seems to be rarely used, with the `as_collection` method on streams being a more @@ -46,7 +45,7 @@ impl Collection { /// method does not check it. pub fn new(stream: Stream) -> Self { Self { inner: stream } } } -impl Collection { +impl Collection { /// Creates a new collection accumulating the contents of the two collections. /// /// Despite the name, differential dataflow collections are unordered. This method is so named because the @@ -109,7 +108,7 @@ impl Collection { /// /// This method is a specialization of `enter` to the case where the nested scope is a region. /// It removes the need for an operator that adjusts the timestamp. - pub fn enter_region<'a>(self, child: &Child<'a, G::Allocator, G::Timestamp>) -> Collection, C> { + pub fn enter_region(self, child: &Scope) -> Self { self.inner .enter(child) .as_collection() @@ -134,7 +133,7 @@ impl Collection { /// ``` pub fn inspect_container(self, func: F) -> Self where - F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static, + F: FnMut(Result<(&G, &C), &[G]>)+'static, { self.inner .inspect_container(func) @@ -144,7 +143,7 @@ impl Collection { /// /// This probe is used to determine when the state of the Collection has stabilized and can /// be read out. - pub fn probe(self) -> (probe::Handle, Self) { + pub fn probe(self) -> (probe::Handle, Self) { let (handle, stream) = self.inner.probe(); (handle, stream.as_collection()) } @@ -154,11 +153,11 @@ impl Collection { /// In addition, a probe is also often use to limit the number of rounds of input in flight at any moment; a /// computation can wait until the probe has caught up to the input before introducing more rounds of data, to /// avoid swamping the system. - pub fn probe_with(self, handle: &probe::Handle) -> Self { + pub fn probe_with(self, handle: &probe::Handle) -> Self { Self::new(self.inner.probe_with(handle)) } /// The scope containing the underlying timely dataflow stream. - pub fn scope(&self) -> G { + pub fn scope(&self) -> Scope { self.inner.scope() } @@ -216,10 +215,10 @@ impl Collection { /// data.assert_eq(result); /// }); /// ``` - pub fn enter<'a, T>(self, child: &Child<'a, G::Allocator, T>) -> Collection, >::InnerContainer> + pub fn enter(self, child: &Scope) -> Collection>::InnerContainer> where - C: containers::Enter, - T: Refines, + C: containers::Enter, + T: Refines + Timestamp, { use timely::dataflow::channels::pact::Pipeline; self.inner @@ -250,9 +249,9 @@ impl Collection { /// data.results_in(summary1); /// }); /// ``` - pub fn results_in(self, step: ::Summary) -> Self + pub fn results_in(self, step: ::Summary) -> Self where - C: containers::ResultsIn<::Summary>, + C: containers::ResultsIn<::Summary>, { use timely::dataflow::channels::pact::Pipeline; self.inner @@ -266,7 +265,7 @@ impl Collection { use timely::progress::timestamp::Refines; /// Methods requiring a nested scope. -impl<'a, A: timely::communication::Allocate, T: Timestamp, C: Container> Collection, C> +impl Collection { /// Returns the final value of a Collection from a nested scope to its containing scope. /// @@ -289,11 +288,11 @@ impl<'a, A: timely::communication::Allocate, T: Timestamp, C: Container> Collect /// data.assert_eq(result); /// }); /// ``` - pub fn leave(self, outer: &G) -> Collection>::OuterContainer> + pub fn leave(self, outer: &Scope) -> Collection>::OuterContainer> where - G: Scope, - T: Refines, - C: containers::Leave, + TOuter: Timestamp, + T: Refines, + C: containers::Leave, { use timely::dataflow::channels::pact::Pipeline; self.inner @@ -303,18 +302,14 @@ impl<'a, A: timely::communication::Allocate, T: Timestamp, C: Container> Collect }) .as_collection() } -} -/// Methods requiring a region as the scope. -impl<'a, A: timely::communication::Allocate, T: Timestamp, C: Container+Clone+'static> Collection, C> -{ /// Returns the value of a Collection from a nested region to its containing scope. /// /// This method is a specialization of `leave` to the case that of a nested region. /// It removes the need for an operator that adjusts the timestamp. - pub fn leave_region(self, outer: &G) -> Collection + pub fn leave_region(self, outer: &Scope) -> Self where - G: Scope, + C: Clone + 'static, { self.inner .leave(outer) @@ -330,7 +325,7 @@ pub mod vec { use timely::progress::Timestamp; use timely::order::Product; - use timely::dataflow::scopes::child::Iterative; + use timely::dataflow::scope::Iterative; use timely::dataflow::Scope; use timely::dataflow::operators::*; use timely::dataflow::operators::vec::*; @@ -356,11 +351,11 @@ pub mod vec { /// The `R` parameter represents the types of changes that the data undergo, and is most commonly (and /// defaults to) `isize`, representing changes to the occurrence count of each record. /// - /// This type definition instantiates the [`Collection`] type with a `Vec<(D, G::Timestamp, R)>`. - pub type Collection = super::Collection::Timestamp, R)>>; + /// This type definition instantiates the [`Collection`] type with a `Vec<(D, G, R)>`. + pub type Collection = super::Collection>; - impl Collection { + impl Collection { /// Creates a new collection by applying the supplied function to each input element. /// /// # Examples @@ -428,7 +423,7 @@ pub mod vec { /// ``` pub fn flat_map(self, mut logic: L) -> Collection where - G::Timestamp: Clone, + G: Clone, I: IntoIterator, L: FnMut(D) -> I + 'static, { @@ -514,10 +509,10 @@ pub mod vec { /// ``` pub fn join_function(self, mut logic: L) -> Collection>::Output> where - G::Timestamp: Lattice, + G: Lattice, D2: Clone+'static, R2: Semigroup+Multiply, - I: IntoIterator, + I: IntoIterator, L: FnMut(D)->I+'static, { self.inner @@ -549,11 +544,11 @@ pub mod vec { /// data.assert_eq(result); /// }); /// ``` - pub fn enter_at<'a, T, F>(self, child: &Iterative<'a, G::Allocator, G::Timestamp, T>, mut initial: F) -> Collection, D, R> + pub fn enter_at(self, child: &Iterative, mut initial: F) -> Collection, D, R> where T: Timestamp+Hash, F: FnMut(&D) -> T + Clone + 'static, - G::Timestamp: Hash, + G: Hash, { self.inner .enter(child) @@ -573,8 +568,8 @@ pub mod vec { /// to all of the data timestamps). pub fn delay(self, func: F) -> Collection where - G::Timestamp: Hash, - F: FnMut(&G::Timestamp) -> G::Timestamp + Clone + 'static, + G: Hash, + F: FnMut(&G) -> G + Clone + 'static, { let mut func1 = func.clone(); let mut func2 = func.clone(); @@ -610,7 +605,7 @@ pub mod vec { /// ``` pub fn inspect(self, func: F) -> Collection where - F: FnMut(&(D, G::Timestamp, R))+'static, + F: FnMut(&(D, G, R))+'static, { self.inner .inspect(func) @@ -636,7 +631,7 @@ pub mod vec { /// ``` pub fn inspect_batch(self, mut func: F) -> Collection where - F: FnMut(&G::Timestamp, &[(D, G::Timestamp, R)])+'static, + F: FnMut(&G, &[(D, G, R)])+'static, { self.inner .inspect_batch(move |time, data| func(time, data)) @@ -666,7 +661,7 @@ pub mod vec { where D: crate::ExchangeData+Hashable, R: crate::ExchangeData+Hashable + Semigroup, - G::Timestamp: Lattice+Ord, + G: Lattice+Ord, { self.consolidate() .inspect(|x| panic!("Assertion failed: non-empty collection: {:?}", x)); @@ -674,7 +669,7 @@ pub mod vec { } /// Methods requiring an Abelian difference, to support negation. - impl, D: Clone+'static, R: Abelian+'static> Collection { + impl Collection { /// Assert if the collections are ever different. /// /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation @@ -702,7 +697,7 @@ pub mod vec { where D: crate::ExchangeData+Hashable, R: crate::ExchangeData+Hashable, - G::Timestamp: Lattice+Ord, + G: Lattice+Ord, { self.negate() .concat(other) @@ -715,7 +710,7 @@ pub mod vec { impl Collection where - G: Scope, + G: Timestamp + Lattice + Ord, K: crate::ExchangeData+Hashable, V: crate::ExchangeData, R: crate::ExchangeData+Semigroup, @@ -792,7 +787,7 @@ pub mod vec { /// ``` pub fn reduce_abelian(self, name: &str, mut logic: L) -> Arranged> where - T2: for<'a> Trace= &'a K, ValOwn = V, Time=G::Timestamp, Diff: Abelian>+'static, + T2: for<'a> Trace= &'a K, ValOwn = V, Time=G, Diff: Abelian>+'static, Bu: Builder, Output = T2::Batch>, L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static, { @@ -811,7 +806,7 @@ pub mod vec { pub fn reduce_core(self, name: &str, logic: L) -> Arranged> where V: Clone+'static, - T2: for<'a> Trace=&'a K, ValOwn = V, Time=G::Timestamp>+'static, + T2: for<'a> Trace=&'a K, ValOwn = V, Time=G>+'static, Bu: Builder, Output = T2::Batch>, L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { @@ -826,7 +821,7 @@ pub mod vec { impl Collection where - G: Scope, + G: Timestamp + Lattice + Ord, K: crate::ExchangeData+Hashable, R1: crate::ExchangeData+Semigroup { @@ -885,7 +880,7 @@ pub mod vec { use crate::trace::implementations::{KeyBuilder, KeySpine}; self.arrange_by_self_named(&format!("Arrange: {}", name)) - .reduce_abelian::<_,KeyBuilder,KeySpine,_>( + .reduce_abelian::<_,KeyBuilder,KeySpine,_>( name, move |k,s,t| t.push(((), thresh(k, &s[0].1))), |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); }, @@ -897,7 +892,7 @@ pub mod vec { impl Collection where - G: Scope, + G: Timestamp + Lattice + Ord, K: crate::ExchangeData+Hashable, R: crate::ExchangeData+Semigroup { @@ -926,7 +921,7 @@ pub mod vec { pub fn count_core + 'static>(self) -> Collection { use crate::trace::implementations::{ValBuilder, ValSpine}; self.arrange_by_self_named("Arrange: Count") - .reduce_abelian::<_,ValBuilder,ValSpine,_>( + .reduce_abelian::<_,ValBuilder,ValSpine,_>( "Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8))), |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); }, @@ -938,7 +933,7 @@ pub mod vec { /// Methods which require data be arrangeable. impl Collection where - G: Scope, + G: Timestamp + Clone + 'static + Lattice, D: crate::ExchangeData+Hashable, R: crate::ExchangeData+Semigroup, { @@ -972,8 +967,8 @@ pub mod vec { /// and provide the function `reify` to produce owned keys and values.. pub fn consolidate_named(self, name: &str, reify: F) -> Self where - Ba: crate::trace::Batcher, Time=G::Timestamp> + 'static, - Tr: for<'a> crate::trace::Trace+'static, + Ba: crate::trace::Batcher, Time=G> + 'static, + Tr: for<'a> crate::trace::Trace+'static, Bu: crate::trace::Builder, F: Fn(Tr::Key<'_>, Tr::Val<'_>) -> D + 'static, { @@ -1031,35 +1026,35 @@ pub mod vec { use crate::trace::implementations::{KeySpine, KeyBatcher, KeyBuilder}; use crate::operators::arrange::Arrange; - impl Arrange> for Collection + impl Arrange> for Collection where - G: Scope, + G: Timestamp + Lattice, K: crate::ExchangeData + Hashable, V: crate::ExchangeData, R: crate::ExchangeData + Semigroup, { fn arrange_named(self, name: &str) -> Arranged> where - Ba: crate::trace::Batcher, Time=G::Timestamp> + 'static, - Bu: crate::trace::Builder, - Tr: crate::trace::Trace + 'static, + Ba: crate::trace::Batcher, Time=G> + 'static, + Bu: crate::trace::Builder, + Tr: crate::trace::Trace + 'static, { - let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); + let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,V),G,R)| (update.0).0.hashed().into()); crate::operators::arrange::arrangement::arrange_core::<_, _, Ba, Bu, _>(self.inner, exchange, name) } } - impl Arrange> for Collection + impl Arrange> for Collection where - G: Scope, + G: Timestamp + Lattice + Ord, { fn arrange_named(self, name: &str) -> Arranged> where - Ba: crate::trace::Batcher, Time=G::Timestamp> + 'static, - Bu: crate::trace::Builder, - Tr: crate::trace::Trace + 'static, + Ba: crate::trace::Batcher, Time=G> + 'static, + Bu: crate::trace::Builder, + Tr: crate::trace::Trace + 'static, { - let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,()),G::Timestamp,R)| (update.0).0.hashed().into()); + let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,()),G,R)| (update.0).0.hashed().into()); crate::operators::arrange::arrangement::arrange_core::<_,_,Ba,Bu,_>(self.map(|k| (k, ())).inner, exchange, name) } } @@ -1067,38 +1062,38 @@ pub mod vec { impl Collection where - G: Scope, + G: Timestamp + Lattice + Ord, { /// Arranges a collection of `(Key, Val)` records by `Key`. /// /// This operator arranges a stream of values into a shared trace, whose contents it maintains. /// This trace is current for all times completed by the output stream, which can be used to /// safely identify the stable times and values in the trace. - pub fn arrange_by_key(self) -> Arranged>> { + pub fn arrange_by_key(self) -> Arranged>> { self.arrange_by_key_named("ArrangeByKey") } /// As `arrange_by_key` but with the ability to name the arrangement. - pub fn arrange_by_key_named(self, name: &str) -> Arranged>> { + pub fn arrange_by_key_named(self, name: &str) -> Arranged>> { self.arrange_named::,ValBuilder<_,_,_,_>,_>(name) } } impl Collection where - G: Scope, + G: Timestamp + Lattice + Ord, { /// Arranges a collection of `Key` records by `Key`. /// /// This operator arranges a collection of records into a shared trace, whose contents it maintains. /// This trace is current for all times complete in the output stream, which can be used to safely /// identify the stable times and values in the trace. - pub fn arrange_by_self(self) -> Arranged>> { + pub fn arrange_by_self(self) -> Arranged>> { self.arrange_by_self_named("ArrangeBySelf") } /// As `arrange_by_self` but with the ability to name the arrangement. - pub fn arrange_by_self_named(self, name: &str) -> Arranged>> { + pub fn arrange_by_self_named(self, name: &str) -> Arranged>> { self.map(|k| (k, ())) .arrange_named::,KeyBuilder<_,_,_>,_>(name) } @@ -1106,7 +1101,7 @@ pub mod vec { impl Collection where - G: Scope, + G: Timestamp + Lattice + Ord, K: crate::ExchangeData+Hashable, V: crate::ExchangeData, R: crate::ExchangeData+Semigroup, @@ -1252,7 +1247,7 @@ pub mod vec { /// ``` pub fn join_core (self, stream2: Arranged, result: L) -> Collection>::Output> where - Tr2: for<'a> crate::trace::TraceReader=&'a K, Time=G::Timestamp>+Clone+'static, + Tr2: for<'a> crate::trace::TraceReader=&'a K, Time=G>+Clone+'static, R: Multiply, I: IntoIterator, L: FnMut(&K,&V,Tr2::Val<'_>)->I+'static, @@ -1264,12 +1259,12 @@ pub mod vec { } /// Conversion to a differential dataflow Collection. -pub trait AsCollection { +pub trait AsCollection { /// Converts the type to a differential dataflow collection. fn as_collection(self) -> Collection; } -impl AsCollection for Stream { +impl AsCollection for Stream { /// Converts the type to a differential dataflow collection. /// /// By calling this method, you guarantee that the timestamp invariant (as documented on @@ -1300,9 +1295,9 @@ impl AsCollection for Stream { /// .assert_eq(data); /// }); /// ``` -pub fn concatenate(scope: &mut G, iterator: I) -> Collection +pub fn concatenate(scope: &mut Scope, iterator: I) -> Collection where - G: Scope, + G: Timestamp, C: Container, I: IntoIterator>, { diff --git a/differential-dataflow/src/dynamic/mod.rs b/differential-dataflow/src/dynamic/mod.rs index 19c5472a6..9efe58f6c 100644 --- a/differential-dataflow/src/dynamic/mod.rs +++ b/differential-dataflow/src/dynamic/mod.rs @@ -13,7 +13,6 @@ pub mod pointstamp; -use timely::dataflow::Scope; use timely::order::Product; use timely::progress::Timestamp; use timely::dataflow::operators::generic::{OutputBuilder, builder_rc::OperatorBuilder}; @@ -26,9 +25,8 @@ use crate::collection::AsCollection; use crate::dynamic::pointstamp::PointStamp; use crate::dynamic::pointstamp::PointStampSummary; -impl VecCollection +impl VecCollection>, D, R> where - G: Scope>>, D: Data, R: Semigroup+'static, T: Timestamp+Default, diff --git a/differential-dataflow/src/input.rs b/differential-dataflow/src/input.rs index 024aeb10d..0ef606ac7 100644 --- a/differential-dataflow/src/input.rs +++ b/differential-dataflow/src/input.rs @@ -41,7 +41,7 @@ pub trait Input : TimelyInput { /// /// }).unwrap(); /// ``` - fn new_collection(&mut self) -> (InputSession, VecCollection) + fn new_collection(&mut self) -> (InputSession, VecCollection) where D: Data, R: Semigroup+'static; /// Create a new collection and input handle from initial data. /// @@ -67,7 +67,7 @@ pub trait Input : TimelyInput { /// /// }).unwrap(); /// ``` - fn new_collection_from(&mut self, data: I) -> (InputSession, VecCollection) + fn new_collection_from(&mut self, data: I) -> (InputSession, VecCollection) where I: IntoIterator + 'static; /// Create a new collection and input handle from initial data. /// @@ -93,28 +93,28 @@ pub trait Input : TimelyInput { /// /// }).unwrap(); /// ``` - fn new_collection_from_raw(&mut self, data: I) -> (InputSession, VecCollection) + fn new_collection_from_raw(&mut self, data: I) -> (InputSession, VecCollection) where I: IntoIterator+'static, D: Data, R: Semigroup+'static; } use crate::lattice::Lattice; -impl Input for G where G::Timestamp: Lattice { - fn new_collection(&mut self) -> (InputSession, VecCollection) +impl Input for Scope { + fn new_collection(&mut self) -> (InputSession, VecCollection) where D: Data, R: Semigroup+'static, { let (handle, stream) = self.new_input(); (InputSession::from(handle), stream.as_collection()) } - fn new_collection_from(&mut self, data: I) -> (InputSession, VecCollection) + fn new_collection_from(&mut self, data: I) -> (InputSession, VecCollection) where I: IntoIterator+'static, I::Item: Data { - self.new_collection_from_raw(data.into_iter().map(|d| (d, ::minimum(), 1))) + self.new_collection_from_raw(data.into_iter().map(|d| (d, ::minimum(), 1))) } - fn new_collection_from_raw(&mut self, data: I) -> (InputSession, VecCollection) + fn new_collection_from_raw(&mut self, data: I) -> (InputSession, VecCollection) where D: Data, R: Semigroup+'static, - I: IntoIterator+'static, + I: IntoIterator+'static, { use timely::dataflow::operators::ToStream; @@ -122,7 +122,8 @@ impl Input for G where G::Timestamp: Lattice { let source = data.to_stream(self).as_collection(); (InputSession::from(handle), stream.as_collection().concat(source)) - }} + } +} /// An input session wrapping a single timely dataflow capability. /// @@ -198,9 +199,9 @@ impl InputSession { impl InputSession { /// Introduces a handle as collection. - pub fn to_collection(&mut self, scope: &mut G) -> VecCollection + pub fn to_collection(&mut self, scope: &mut Scope) -> VecCollection where - G: Scope, + T: timely::order::TotalOrder, { scope .input_from(&mut self.handle) diff --git a/differential-dataflow/src/logging.rs b/differential-dataflow/src/logging.rs index 4fb6ef430..1ba9833f8 100644 --- a/differential-dataflow/src/logging.rs +++ b/differential-dataflow/src/logging.rs @@ -10,9 +10,8 @@ pub type DifferentialEventBuilder = timely::container::CapacityContainerBuilder< pub type Logger = ::timely::logging_core::TypedLogger; /// Enables logging of differential dataflow events. -pub fn enable(worker: &mut timely::worker::Worker, writer: W) -> Option> +pub fn enable(worker: &mut timely::worker::Worker, writer: W) -> Option> where - A: timely::communication::Allocate, W: std::io::Write + 'static, { worker.log_register().and_then(|mut log_register| { diff --git a/differential-dataflow/src/operators/arrange/agent.rs b/differential-dataflow/src/operators/arrange/agent.rs index 715542f55..7da94dc84 100644 --- a/differential-dataflow/src/operators/arrange/agent.rs +++ b/differential-dataflow/src/operators/arrange/agent.rs @@ -9,6 +9,7 @@ use timely::dataflow::operators::generic::{OperatorInfo, source}; use timely::progress::Timestamp; use timely::progress::{Antichain, frontier::AntichainRef}; use timely::dataflow::operators::CapabilitySet; +use timely::scheduling::Scheduler; use crate::trace::{Trace, TraceReader, BatchReader}; use crate::trace::wrappers::rc::TraceBox; @@ -215,17 +216,13 @@ impl TraceAgent { /// /// }).unwrap(); /// ``` - pub fn import(&mut self, scope: &G) -> Arranged> - where - G: Scope, + pub fn import(&mut self, scope: &Scope) -> Arranged> { self.import_named(scope, "ArrangedSource") } /// Same as `import`, but allows to name the source. - pub fn import_named(&mut self, scope: &G, name: &str) -> Arranged> - where - G: Scope, + pub fn import_named(&mut self, scope: &Scope, name: &str) -> Arranged> { // Drop ShutdownButton and return only the arrangement. self.import_core(scope, name).0 @@ -278,9 +275,7 @@ impl TraceAgent { /// /// }).unwrap(); /// ``` - pub fn import_core(&mut self, scope: &G, name: &str) -> (Arranged>, ShutdownButton>) - where - G: Scope, + pub fn import_core(&mut self, scope: &Scope, name: &str) -> (Arranged>, ShutdownButton>) { let trace = self.clone(); @@ -393,9 +388,8 @@ impl TraceAgent { /// /// }).unwrap(); /// ``` - pub fn import_frontier(&mut self, scope: &G, name: &str) -> (Arranged>>, ShutdownButton>) + pub fn import_frontier(&mut self, scope: &Scope, name: &str) -> (Arranged>>, ShutdownButton>) where - G: Scope, Tr: TraceReader, { // This frontier describes our only guarantee on the compaction frontier. @@ -411,9 +405,8 @@ impl TraceAgent { /// /// Invoking this method with an `until` of `Antichain::new()` will perform no filtering, as the empty /// frontier indicates the end of times. - pub fn import_frontier_core(&mut self, scope: &G, name: &str, since: Antichain, until: Antichain) -> (Arranged>>, ShutdownButton>) + pub fn import_frontier_core(&mut self, scope: &Scope, name: &str, since: Antichain, until: Antichain) -> (Arranged>>, ShutdownButton>) where - G: Scope, Tr: TraceReader, { let trace = self.clone(); diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index cd0d00dfc..dcfa0bcf8 100644 --- a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -25,6 +25,8 @@ use timely::dataflow::channels::pact::{ParallelizationContract, Pipeline}; use timely::progress::Timestamp; use timely::progress::Antichain; use timely::dataflow::operators::Capability; +use timely::scheduling::Scheduler; +use timely::worker::AsWorker; use crate::{Data, VecCollection, AsCollection}; use crate::difference::Semigroup; @@ -43,7 +45,7 @@ use super::TraceAgent; /// computation, memory) required to produce and maintain an indexed representation of a collection. pub struct Arranged where - G: Scope, + G: Timestamp, Tr: TraceReader+Clone, { /// A stream containing arranged updates. @@ -60,8 +62,8 @@ where impl Clone for Arranged where - G: Scope, - Tr: TraceReader + Clone, + G: Timestamp, + Tr: TraceReader) { + pub fn execute(self, manager: &mut Manager, worker: &mut Worker) { match self { diff --git a/interactive/src/logging.rs b/interactive/src/logging.rs index 322dd16d2..0b076f36f 100644 --- a/interactive/src/logging.rs +++ b/interactive/src/logging.rs @@ -3,7 +3,6 @@ use std::hash::Hash; use std::time::Duration; -use timely::communication::Allocate; use timely::worker::Worker; use timely::logging::TimelyEvent; use timely::dataflow::operators::capture::event::EventIterator; @@ -20,16 +19,15 @@ pub trait LoggingValue : VectorFrom+VectorFrom { impl+VectorFrom> LoggingValue for V { } /// Timely logging capture and arrangement. -pub fn publish_timely_logging( +pub fn publish_timely_logging( manager: &mut Manager, - worker: &mut Worker, + worker: &mut Worker, granularity_ns: u64, name: &str, events: I ) where V: ExchangeData+Hash+LoggingValue+Datum, - A: Allocate, I : IntoIterator, ::Item: EventIterator>+'static { @@ -209,16 +207,15 @@ where } /// Timely logging capture and arrangement. -pub fn publish_differential_logging( +pub fn publish_differential_logging( manager: &mut Manager, - worker: &mut Worker, + worker: &mut Worker, granularity_ns: u64, name: &str, events: I ) where V: ExchangeData+Hash+LoggingValue+Datum, - A: Allocate, I : IntoIterator, ::Item: EventIterator>+'static { diff --git a/interactive/src/manager.rs b/interactive/src/manager.rs index 8579a538e..f9a0f3872 100644 --- a/interactive/src/manager.rs +++ b/interactive/src/manager.rs @@ -5,7 +5,6 @@ use std::hash::Hash; // use std::time::Duration; use timely::dataflow::ProbeHandle; -use timely::communication::Allocate; use timely::worker::Worker; use timely::logging::TimelyEventBuilder; @@ -79,7 +78,7 @@ impl Manager // } /// Clear the managed inputs and traces. - pub fn shutdown(&mut self, worker: &mut Worker) { + pub fn shutdown(&mut self, worker: &mut Worker) { self.inputs.sessions.clear(); self.traces.inputs.clear(); self.traces.arrangements.clear(); diff --git a/interactive/src/plan/concat.rs b/interactive/src/plan/concat.rs index 4c27941bb..149b81049 100644 --- a/interactive/src/plan/concat.rs +++ b/interactive/src/plan/concat.rs @@ -19,10 +19,10 @@ impl Render for Concat { type Value = V; - fn render>( + fn render( &self, - scope: &mut S, - arrangements: &mut TraceManager) -> VecCollection, Diff> + scope: &mut Scope