Skip to content
Merged
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ rust-version = "1.86"

[workspace.dependencies]
differential-dataflow = { path = "differential-dataflow", default-features = false, version = "0.22.0" }
timely = { version = "0.28", default-features = false }
#timely = { version = "0.28", default-features = false }
columnar = { version = "0.12", default-features = false }
#timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" }
timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" }
#timely = { path = "../timely-dataflow/timely/", default-features = false }

[workspace.lints.clippy]
Expand Down
56 changes: 27 additions & 29 deletions diagnostics/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use differential_dataflow::operators::arrange::TraceAgent;
use differential_dataflow::trace::implementations::{KeySpine, ValSpine};
use differential_dataflow::{AsCollection, VecCollection};

use timely::communication::Allocate;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::capture::{Event, EventLink, Replay, Capture};
Expand Down Expand Up @@ -244,12 +243,11 @@ fn quantize(time: Duration, interval: Duration) -> Duration {
}

/// Quantize timestamps in a collection's inner stream.
fn quantize_collection<S, D>(
collection: VecCollection<S, D, i64>,
fn quantize_collection<D>(
collection: VecCollection<Duration, D, i64>,
interval: Duration,
) -> VecCollection<S, D, i64>
) -> VecCollection<Duration, D, i64>
where
S: Scope<Timestamp = Duration>,
D: differential_dataflow::Data,
{
collection
Expand All @@ -276,7 +274,7 @@ where
///
/// Returns a [`LoggingState`] with trace handles and a [`SinkHandle`] for
/// the WebSocket thread.
pub fn register<A: Allocate>(worker: &mut Worker<A>, log_logging: bool) -> LoggingState {
pub fn register(worker: &mut Worker, log_logging: bool) -> LoggingState {
let start = Instant::now();

// Event links for logging capture (worker-internal, Rc-based).
Expand Down Expand Up @@ -377,8 +375,8 @@ pub fn register<A: Allocate>(worker: &mut Worker<A>, log_logging: bool) -> Loggi
}
}

fn install_loggers<A: Allocate>(
worker: &mut Worker<A>,
fn install_loggers(
worker: &mut Worker,
t_link: Rc<EventLink<Duration, Vec<(Duration, TimelyEvent)>>>,
d_link: Rc<EventLink<Duration, Vec<(Duration, DifferentialEvent)>>>,
) {
Expand All @@ -402,11 +400,11 @@ fn install_loggers<A: Allocate>(
// ============================================================================

/// Internal: collections before arrangement, used for the cross-join.
struct TimelyCollections<S: Scope> {
operators: VecCollection<S, (usize, String, Vec<usize>), i64>,
channels: VecCollection<S, (usize, Vec<usize>, (usize, usize), (usize, usize)), i64>,
elapsed: VecCollection<S, usize, i64>,
messages: VecCollection<S, usize, i64>,
struct TimelyCollections {
operators: VecCollection<Duration, (usize, String, Vec<usize>), i64>,
channels: VecCollection<Duration, (usize, Vec<usize>, (usize, usize), (usize, usize)), i64>,
elapsed: VecCollection<Duration, usize, i64>,
messages: VecCollection<Duration, usize, i64>,
}

#[derive(Default)]
Expand All @@ -416,10 +414,10 @@ struct TimelyDemuxState {
}

/// Build timely logging collections and arrangements.
fn construct_timely<S: Scope<Timestamp = Duration>>(
scope: &mut S,
stream: Stream<S, Vec<(Duration, TimelyEvent)>>,
) -> (TimelyTraces, TimelyCollections<S>) {
fn construct_timely(
scope: &mut Scope<Duration>,
stream: Stream<Duration, Vec<(Duration, TimelyEvent)>>,
) -> (TimelyTraces, TimelyCollections) {
type OpUpdate = ((usize, String, Vec<usize>), Duration, i64);
type ChUpdate = ((usize, Vec<usize>, (usize, usize), (usize, usize)), Duration, i64);
type ElUpdate = (usize, Duration, i64);
Expand Down Expand Up @@ -536,21 +534,21 @@ fn construct_timely<S: Scope<Timestamp = Duration>>(
// ============================================================================

/// Internal: collections before arrangement, used for the cross-join.
struct DifferentialCollections<S: Scope> {
arrangement_batches: VecCollection<S, usize, i64>,
arrangement_records: VecCollection<S, usize, i64>,
sharing: VecCollection<S, usize, i64>,
batcher_records: VecCollection<S, usize, i64>,
batcher_size: VecCollection<S, usize, i64>,
batcher_capacity: VecCollection<S, usize, i64>,
batcher_allocations: VecCollection<S, usize, i64>,
struct DifferentialCollections {
arrangement_batches: VecCollection<Duration, usize, i64>,
arrangement_records: VecCollection<Duration, usize, i64>,
sharing: VecCollection<Duration, usize, i64>,
batcher_records: VecCollection<Duration, usize, i64>,
batcher_size: VecCollection<Duration, usize, i64>,
batcher_capacity: VecCollection<Duration, usize, i64>,
batcher_allocations: VecCollection<Duration, usize, i64>,
}

/// Build differential logging collections and arrangements.
fn construct_differential<S: Scope<Timestamp = Duration>>(
scope: &mut S,
stream: Stream<S, Vec<(Duration, DifferentialEvent)>>,
) -> (DifferentialTraces, DifferentialCollections<S>) {
fn construct_differential(
scope: &mut Scope<Duration>,
stream: Stream<Duration, Vec<(Duration, DifferentialEvent)>>,
) -> (DifferentialTraces, DifferentialCollections) {
type Update = (usize, Duration, i64);

let mut demux = OperatorBuilder::new("Differential Demux".to_string(), scope.clone());
Expand Down
9 changes: 4 additions & 5 deletions differential-dataflow/examples/accumulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ fn main() {
let keys: usize = std::env::args().nth(1).unwrap().parse().unwrap();
let batch: usize = 10_000;

// This computation demonstrates in-place accumulation of arbitrarily large
// This computation demonstrates in-place accumulation of arbitrarily large
// volumes of input data, consuming space bounded by the number of distinct keys.
timely::execute_from_args(std::env::args().skip(2), move |worker| {

Expand All @@ -17,11 +17,10 @@ fn main() {
let mut input = worker.dataflow::<(), _, _>(|scope| {
let (input, data) = scope.new_collection::<_, isize>();

use timely::dataflow::Scope;
scope.iterative::<u32,_,_>(|inner| {
data.enter_at(inner, |_| 0)
.consolidate()
.leave()
.leave(&scope)
});

input
Expand All @@ -41,7 +40,7 @@ fn main() {
}
counter += batch;

worker.step();
worker.step();
let elapsed = timer.elapsed();

if elapsed.as_secs() as usize > last_sec {
Expand All @@ -54,4 +53,4 @@ fn main() {
}

}).unwrap();
}
}
5 changes: 2 additions & 3 deletions differential-dataflow/examples/bfs.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use rand::{Rng, SeedableRng, StdRng};

use timely::dataflow::*;
use timely::dataflow::operators::probe::Handle;

use differential_dataflow::input::Input;
Expand Down Expand Up @@ -91,9 +90,9 @@ fn main() {
}

// returns pairs (n, s) indicating node n can be reached from a root in s steps.
fn bfs<G>(edges: VecCollection<G, Edge>, roots: VecCollection<G, Node>) -> VecCollection<G, (Node, u32)>
fn bfs<T>(edges: VecCollection<T, Edge>, roots: VecCollection<T, Node>) -> VecCollection<T, (Node, u32)>
where
G: Scope<Timestamp: Lattice+Ord>,
T: timely::progress::Timestamp + Lattice + Ord,
{
// initialize roots as reaching themselves at distance 0
let nodes = roots.map(|x| (x, 0));
Expand Down
26 changes: 11 additions & 15 deletions differential-dataflow/examples/columnar/columnar_support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ pub mod arrangement {

impl<U: Update> Merger for TrieMerger<U>
where
U::Time: Ord + PartialOrder + Clone + 'static,
U::Time: 'static,
{
type Chunk = Updates<U>;
type Time = U::Time;
Expand Down Expand Up @@ -646,7 +646,7 @@ pub mod arrangement {

impl<U: Update> TrieMerger<U>
where
U::Time: Ord + PartialOrder + Clone + 'static,
U::Time: 'static,
{
/// Iterator-based merge: flatten, merge, consolidate, form.
/// Correct but slow — used as fallback.
Expand Down Expand Up @@ -1839,13 +1839,12 @@ pub mod updates {
/// joins output times with input times, multiplies output diffs with input diffs.
///
/// This subsumes map, filter, negate, and enter_at for columnar collections.
pub fn join_function<G, U, I, L>(
input: differential_dataflow::Collection<G, RecordedUpdates<U>>,
pub fn join_function<U, I, L>(
input: differential_dataflow::Collection<U::Time, RecordedUpdates<U>>,
mut logic: L,
) -> differential_dataflow::Collection<G, RecordedUpdates<U>>
) -> differential_dataflow::Collection<U::Time, RecordedUpdates<U>>
where
G: timely::dataflow::Scope,
G::Timestamp: differential_dataflow::lattice::Lattice,
U::Time: differential_dataflow::lattice::Lattice,
U: layout::ColumnarUpdate<Diff: differential_dataflow::difference::Multiply<U::Diff, Output = U::Diff>>,
I: IntoIterator<Item = (U::Key, U::Val, U::Time, U::Diff)>,
L: FnMut(
Expand Down Expand Up @@ -1893,12 +1892,11 @@ type DynTime = timely::order::Product<u64, differential_dataflow::dynamic::point
/// that tells timely how the PointStamp is affected (retain `level - 1` coordinates).
///
/// Consolidates after truncation since distinct PointStamp coordinates can collapse.
pub fn leave_dynamic<G, K, V, R>(
input: differential_dataflow::Collection<G, RecordedUpdates<(K, V, DynTime, R)>>,
pub fn leave_dynamic<K, V, R>(
input: differential_dataflow::Collection<DynTime, RecordedUpdates<(K, V, DynTime, R)>>,
level: usize,
) -> differential_dataflow::Collection<G, RecordedUpdates<(K, V, DynTime, R)>>
) -> differential_dataflow::Collection<DynTime, RecordedUpdates<(K, V, DynTime, R)>>
where
G: timely::dataflow::Scope<Timestamp = DynTime>,
K: columnar::Columnar,
V: columnar::Columnar,
R: columnar::Columnar,
Expand Down Expand Up @@ -1972,14 +1970,12 @@ where
///
/// Cursors through each batch and pushes `(key, val, time, diff)` refs into
/// a `ValColBuilder`, which sorts and consolidates on flush.
pub fn as_recorded_updates<G, U>(
pub fn as_recorded_updates<U>(
arranged: differential_dataflow::operators::arrange::Arranged<
G,
differential_dataflow::operators::arrange::TraceAgent<ValSpine<U::Key, U::Val, U::Time, U::Diff>>,
>,
) -> differential_dataflow::Collection<G, RecordedUpdates<U>>
) -> differential_dataflow::Collection<U::Time, RecordedUpdates<U>>
where
G: timely::dataflow::Scope<Timestamp = U::Time>,
U: layout::ColumnarUpdate,
{
use timely::dataflow::operators::generic::Operator;
Expand Down
25 changes: 12 additions & 13 deletions differential-dataflow/examples/columnar/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ fn main() {
mod reachability {

use timely::order::Product;
use timely::dataflow::Scope;
use differential_dataflow::Collection;
use differential_dataflow::AsCollection;
use differential_dataflow::operators::iterate::Variable;
Expand All @@ -110,14 +109,14 @@ mod reachability {
/// Compute the set of nodes reachable from `roots` along directed `edges`.
///
/// Returns `(node, ())` for each reachable node.
pub fn reach<G: Scope<Timestamp = Time>>(
edges: Collection<G, RecordedUpdates<(Node, Node, Time, Diff)>>,
roots: Collection<G, RecordedUpdates<(Node, (), Time, Diff)>>,
) -> Collection<G, RecordedUpdates<(Node, (), Time, Diff)>>
pub fn reach(
edges: Collection<Time, RecordedUpdates<(Node, Node, Time, Diff)>>,
roots: Collection<Time, RecordedUpdates<(Node, (), Time, Diff)>>,
) -> Collection<Time, RecordedUpdates<(Node, (), Time, Diff)>>
{
let mut scope = edges.inner.scope();
let outer = edges.inner.scope();

scope.iterative::<u64, _, _>(|nested| {
outer.iterative::<u64, _, _>(|nested| {
let summary = Product::new(Time::default(), 1);

let roots_inner = roots.enter(nested);
Expand All @@ -128,21 +127,21 @@ mod reachability {
let edges_pact = ValPact { hashfunc: |k: columnar::Ref<'_, Node>| *k as u64 };
let reach_pact = ValPact { hashfunc: |k: columnar::Ref<'_, Node>| *k as u64 };

let edges_arr = arrange_core::<_, _,
let edges_arr = arrange_core::<_,
ValBatcher<Node, Node, IterTime, Diff>,
ValBuilder<Node, Node, IterTime, Diff>,
ValSpine<Node, Node, IterTime, Diff>,
>(edges_inner.inner, edges_pact, "Edges");

let reach_arr = arrange_core::<_, _,
let reach_arr = arrange_core::<_,
ValBatcher<Node, (), IterTime, Diff>,
ValBuilder<Node, (), IterTime, Diff>,
ValSpine<Node, (), IterTime, Diff>,
>(reach.inner, reach_pact, "Reach");

// join_traces with ValColBuilder: produces Stream<_, RecordedUpdates<...>>.
let proposed =
join_traces::<_, _, _, _, ValColBuilder<(Node, (), IterTime, Diff)>>(
join_traces::<_, _, _, ValColBuilder<(Node, (), IterTime, Diff)>>(
edges_arr,
reach_arr,
|_src, dst, (), time, d1, d2, session| {
Expand All @@ -158,7 +157,7 @@ mod reachability {

// Arrange for reduce.
let combined_pact = ValPact { hashfunc: |k: columnar::Ref<'_, Node>| *k as u64 };
let combined_arr = arrange_core::<_, _,
let combined_arr = arrange_core::<_,
ValBatcher<Node, (), IterTime, Diff>,
ValBuilder<Node, (), IterTime, Diff>,
ValSpine<Node, (), IterTime, Diff>,
Expand All @@ -181,12 +180,12 @@ mod reachability {
});

// Extract RecordedUpdates from the Arranged's batch stream.
let result_col = as_recorded_updates::<_, (Node, (), IterTime, Diff)>(result);
let result_col = as_recorded_updates::<(Node, (), IterTime, Diff)>(result);

variable.set(result_col.clone());

// Leave the iterative scope.
result_col.leave()
result_col.leave(&outer)
})
}
}
10 changes: 5 additions & 5 deletions differential-dataflow/examples/dynamic.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use rand::{Rng, SeedableRng, StdRng};

use timely::dataflow::*;
use timely::dataflow::operators::probe::Handle;

use differential_dataflow::input::Input;
Expand Down Expand Up @@ -91,9 +90,9 @@ fn main() {
}

// returns pairs (n, s) indicating node n can be reached from a root in s steps.
fn bfs<G>(edges: VecCollection<G, Edge>, roots: VecCollection<G, Node>) -> VecCollection<G, (Node, u32)>
fn bfs<T>(edges: VecCollection<T, Edge>, roots: VecCollection<T, Node>) -> VecCollection<T, (Node, u32)>
where
G: Scope<Timestamp: Lattice+Ord>,
T: timely::progress::Timestamp + Lattice + Ord,
{
use timely::order::Product;
use iterate::Variable;
Expand All @@ -103,7 +102,8 @@ where
let nodes = roots.map(|x| (x, 0));

// repeatedly update minimal distances each node can be reached from each root
nodes.scope().iterative::<PointStamp<usize>, _, _>(|inner| {
let outer = nodes.scope();
outer.iterative::<PointStamp<usize>, _, _>(|inner| {

// These enter the statically bound scope, rather than any iterative scopes.
// We do not *need* to enter them into the dynamic scope, as they are static
Expand All @@ -126,7 +126,7 @@ where
// Leave the dynamic iteration, stripping off the last timestamp coordinate.
next.leave_dynamic(1)
.inspect(|x| println!("{:?}", x))
.leave()
.leave(&outer)
})

}
Loading
Loading