diff --git a/Cargo.toml b/Cargo.toml index f7ec724d8..86343ddb3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,9 +17,9 @@ resolver = "2" [workspace.dependencies] differential-dataflow = { path = "differential-dataflow", default-features = false, version = "0.19.1" } -timely = { version = "0.26", default-features = false } +#timely = { version = "0.26", default-features = false } columnar = { version = "0.11", default-features = false } -#timely = { path = "../timely-dataflow/timely/", default-features = false } +timely = { path = "../timely-dataflow/timely/", default-features = false } [profile.release] opt-level = 3 diff --git a/differential-dataflow/src/algorithms/graphs/bijkstra.rs b/differential-dataflow/src/algorithms/graphs/bijkstra.rs index 96425dad0..31102654a 100644 --- a/differential-dataflow/src/algorithms/graphs/bijkstra.rs +++ b/differential-dataflow/src/algorithms/graphs/bijkstra.rs @@ -19,14 +19,14 @@ use crate::operators::iterate::Variable; /// Goals that cannot reach from the source to the target are relatively expensive, as /// the entire graph must be explored to confirm this. A graph connectivity pre-filter /// could be good insurance here. -pub fn bidijkstra(edges: &VecCollection, goals: &VecCollection) -> VecCollection +pub fn bidijkstra(edges: VecCollection, goals: VecCollection) -> VecCollection where G: Scope, N: ExchangeData+Hash, { - let forward = edges.arrange_by_key(); + let forward = edges.clone().arrange_by_key(); let reverse = edges.map(|(x,y)| (y,x)).arrange_by_key(); - bidijkstra_arranged(&forward, &reverse, goals) + bidijkstra_arranged(forward, reverse, goals) } use crate::trace::TraceReader; @@ -34,9 +34,9 @@ use crate::operators::arrange::Arranged; /// Bi-directional Dijkstra search using arranged forward and reverse edge collections. pub fn bidijkstra_arranged( - forward: &Arranged, - reverse: &Arranged, - goals: &VecCollection + forward: Arranged, + reverse: Arranged, + goals: VecCollection ) -> VecCollection where G: Scope, @@ -71,50 +71,51 @@ where // This is a cyclic join, which should scare us a bunch. let reached = forward - .join_map(&reverse, |_, (src,d1), (dst,d2)| ((src.clone(), dst.clone()), *d1 + *d2)) + .join_map((&*reverse).clone(), |_, (src,d1), (dst,d2)| ((src.clone(), dst.clone()), *d1 + *d2)) .reduce(|_key, s, t| t.push((*s[0].0, 1))) - .semijoin(&goals); + .semijoin(goals); let active = reached .negate() .map(|(srcdst,_)| srcdst) - .concat(&goals) + .concat(goals) .consolidate(); // Let's expand out forward queries that are active. let forward_active = active.map(|(x,_y)| x).distinct(); let forward_next = forward + .clone() .map(|(med, (src, dist))| (src, (med, dist))) - .semijoin(&forward_active) + .semijoin(forward_active) .map(|(src, (med, dist))| (med, (src, dist))) - .join_core(&forward_edges, |_med, (src, dist), next| Some((next.clone(), (src.clone(), *dist+1)))) - .concat(&forward) + .join_core(forward_edges, |_med, (src, dist), next| Some((next.clone(), (src.clone(), *dist+1)))) + .concat((&*forward).clone()) .map(|(next, (src, dist))| ((next, src), dist)) .reduce(|_key, s, t| t.push((*s[0].0, 1))) .map(|((next, src), dist)| (next, (src, dist))); forward_next.map(|_| ()).consolidate().inspect(|x| println!("forward_next: {:?}", x)); - forward.set(&forward_next); + forward.set(forward_next); // Let's expand out reverse queries that are active. let reverse_active = active.map(|(_x,y)| y).distinct(); let reverse_next = reverse .map(|(med, (rev, dist))| (rev, (med, dist))) - .semijoin(&reverse_active) + .semijoin(reverse_active) .map(|(rev, (med, dist))| (med, (rev, dist))) - .join_core(&reverse_edges, |_med, (rev, dist), next| Some((next.clone(), (rev.clone(), *dist+1)))) - .concat(&reverse) + .join_core(reverse_edges, |_med, (rev, dist), next| Some((next.clone(), (rev.clone(), *dist+1)))) + .concat(reverse) .map(|(next, (rev, dist))| ((next, rev), dist)) .reduce(|_key, s, t| t.push((*s[0].0, 1))) .map(|((next,rev), dist)| (next, (rev, dist))); reverse_next.map(|_| ()).consolidate().inspect(|x| println!("reverse_next: {:?}", x)); - reverse.set(&reverse_next); + reverse.set(reverse_next); reached.leave() }) diff --git a/differential-dataflow/src/algorithms/graphs/propagate.rs b/differential-dataflow/src/algorithms/graphs/propagate.rs index 635bb38e9..2a9c1332b 100644 --- a/differential-dataflow/src/algorithms/graphs/propagate.rs +++ b/differential-dataflow/src/algorithms/graphs/propagate.rs @@ -13,7 +13,7 @@ use crate::difference::{Abelian, Multiply}; /// This algorithm naively propagates all labels at once, much like standard label propagation. /// To more carefully control the label propagation, consider `propagate_core` which supports a /// method to limit the introduction of labels. -pub fn propagate(edges: &VecCollection, nodes: &VecCollection) -> VecCollection +pub fn propagate(edges: VecCollection, nodes: VecCollection) -> VecCollection where G: Scope, N: ExchangeData+Hash, @@ -22,7 +22,7 @@ where R: From, L: ExchangeData, { - propagate_core(&edges.arrange_by_key(), nodes, |_label| 0) + propagate_core(edges.arrange_by_key(), nodes, |_label| 0) } /// Propagates labels forward, retaining the minimum label. @@ -30,7 +30,7 @@ where /// This algorithm naively propagates all labels at once, much like standard label propagation. /// To more carefully control the label propagation, consider `propagate_core` which supports a /// method to limit the introduction of labels. -pub fn propagate_at(edges: &VecCollection, nodes: &VecCollection, logic: F) -> VecCollection +pub fn propagate_at(edges: VecCollection, nodes: VecCollection, logic: F) -> VecCollection where G: Scope, N: ExchangeData+Hash, @@ -40,7 +40,7 @@ where L: ExchangeData, F: Fn(&L)->u64+Clone+'static, { - propagate_core(&edges.arrange_by_key(), nodes, logic) + propagate_core(edges.arrange_by_key(), nodes, logic) } use crate::trace::TraceReader; @@ -51,7 +51,7 @@ use crate::operators::arrange::arrangement::Arranged; /// This variant takes a pre-arranged edge collection, to facilitate re-use, and allows /// a method `logic` to specify the rounds in which we introduce various labels. The output /// of `logic should be a number in the interval \[0,64\], -pub fn propagate_core(edges: &Arranged, nodes: &VecCollection, logic: F) -> VecCollection +pub fn propagate_core(edges: Arranged, nodes: VecCollection, logic: F) -> VecCollection where G: Scope, N: ExchangeData+Hash, @@ -91,14 +91,14 @@ where let labels = proposals - .concat(&nodes) + .concat(nodes) .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1_i8)))); let propagate: VecCollection<_, (N, L), R> = labels - .join_core(&edges, |_k, l: &L, d| Some((d.clone(), l.clone()))); + .join_core(edges, |_k, l: &L, d| Some((d.clone(), l.clone()))); - proposals.set(&propagate); + proposals.set(propagate); labels .as_collection(|k,v| (k.clone(), v.clone())) diff --git a/differential-dataflow/src/algorithms/graphs/scc.rs b/differential-dataflow/src/algorithms/graphs/scc.rs index c3ad20a81..2d406db84 100644 --- a/differential-dataflow/src/algorithms/graphs/scc.rs +++ b/differential-dataflow/src/algorithms/graphs/scc.rs @@ -21,19 +21,19 @@ where R: Multiply, R: From, { - graph.iterate(|edges| { + graph.clone().iterate(|edges| { // keep edges from active edge destinations. let active = edges.map(|(_src,dst)| dst) .threshold(|_,c| if c.is_zero() { R::from(0_i8) } else { R::from(1_i8) }); graph.enter(&edges.scope()) - .semijoin(&active) + .semijoin(active) }) } /// Returns the subset of edges in the same strongly connected component. -pub fn strongly_connected(graph: &VecCollection) -> VecCollection +pub fn strongly_connected(graph: VecCollection) -> VecCollection where G: Scope, N: ExchangeData + Hash, @@ -43,12 +43,12 @@ where { graph.iterate(|inner| { let edges = graph.enter(&inner.scope()); - let trans = edges.map_in_place(|x| mem::swap(&mut x.0, &mut x.1)); - trim_edges(&trim_edges(inner, &edges), &trans) + let trans = edges.clone().map_in_place(|x| mem::swap(&mut x.0, &mut x.1)); + trim_edges(trim_edges(inner, edges), trans) }) } -fn trim_edges(cycle: &VecCollection, edges: &VecCollection) +fn trim_edges(cycle: VecCollection, edges: VecCollection) -> VecCollection where G: Scope, @@ -62,10 +62,10 @@ where // NOTE: With a node -> int function, can be improved by: // let labels = propagate_at(&cycle, &nodes, |x| *x as u64); - let labels = propagate(cycle, &nodes); + let labels = propagate(cycle, nodes); - edges.join_map(&labels, |e1,e2,l1| (e2.clone(),(e1.clone(),l1.clone()))) - .join_map(&labels, |e2,(e1,l1),l2| ((e1.clone(),e2.clone()),(l1.clone(),l2.clone()))) + edges.join_map(labels.clone(), |e1,e2,l1| (e2.clone(),(e1.clone(),l1.clone()))) + .join_map(labels, |e2,(e1,l1),l2| ((e1.clone(),e2.clone()),(l1.clone(),l2.clone()))) .filter(|(_,(l1,l2))| l1 == l2) .map(|((x1,x2),_)| (x2,x1)) } diff --git a/differential-dataflow/src/algorithms/graphs/sequential.rs b/differential-dataflow/src/algorithms/graphs/sequential.rs index 3f4bbcf29..f9c70b4f2 100644 --- a/differential-dataflow/src/algorithms/graphs/sequential.rs +++ b/differential-dataflow/src/algorithms/graphs/sequential.rs @@ -9,17 +9,17 @@ use crate::lattice::Lattice; use crate::operators::*; use crate::hashable::Hashable; -fn _color(edges: &VecCollection) -> VecCollection)> +fn _color(edges: VecCollection) -> VecCollection)> where G: Scope, N: ExchangeData+Hash, { // need some bogus initial values. - let start = edges.map(|(x,_y)| (x,u32::max_value())) + let start = edges.clone().map(|(x,_y)| (x,u32::max_value())) .distinct(); // repeatedly apply color-picking logic. - sequence(&start, edges, |_node, vals| { + sequence(start, edges, |_node, vals| { // look for the first absent positive integer. // start at 1 in case we ever use NonZero. @@ -40,8 +40,8 @@ where /// fired, and we apply `logic` to the new state of lower neighbors and /// the old state (input) of higher neighbors. pub fn sequence( - state: &VecCollection, - edges: &VecCollection, + state: VecCollection, + edges: VecCollection, logic: F) -> VecCollection)> where G: Scope, @@ -60,18 +60,18 @@ where // .map(|x| (x.0, Some(x.1))); // break edges into forward and reverse directions. - let forward = edges.filter(|edge| edge.0 < edge.1); + let forward = edges.clone().filter(|edge| edge.0 < edge.1); let reverse = edges.filter(|edge| edge.0 > edge.1); // new state goes along forward edges, old state along reverse edges - let new_messages = new_state.join_map(&forward, |_k,v,d| (d.clone(),v.clone())); + let new_messages = new_state.join_map(forward, |_k,v,d| (d.clone(),v.clone())); - let incomplete = new_messages.filter(|x| x.1.is_none()).map(|x| x.0).distinct(); + let incomplete = new_messages.clone().filter(|x| x.1.is_none()).map(|x| x.0).distinct(); let new_messages = new_messages.filter(|x| x.1.is_some()).map(|x| (x.0, x.1.unwrap())); - let old_messages = old_state.join_map(&reverse, |_k,v,d| (d.clone(),v.clone())); + let old_messages = old_state.join_map(reverse, |_k,v,d| (d.clone(),v.clone())); - let messages = new_messages.concat(&old_messages).antijoin(&incomplete); + let messages = new_messages.concat(old_messages).antijoin(incomplete.clone()); // // determine who has incoming `None` messages, and suppress all of them. // let incomplete = new_messages.filter(|x| x.1.is_none()).map(|x| x.0).distinct(); @@ -81,6 +81,6 @@ where // .concat(&old_messages) // /-- possibly too clever: None if any inputs None. // .antijoin(&incomplete) .reduce(move |k, vs, t| t.push((Some(logic(k,vs)),1))) - .concat(&incomplete.map(|x| (x, None))) + .concat(incomplete.map(|x| (x, None))) }) } diff --git a/differential-dataflow/src/algorithms/identifiers.rs b/differential-dataflow/src/algorithms/identifiers.rs index e44dbe92c..e226382d9 100644 --- a/differential-dataflow/src/algorithms/identifiers.rs +++ b/differential-dataflow/src/algorithms/identifiers.rs @@ -54,7 +54,7 @@ where use crate::collection::AsCollection; - let init = self.map(|record| (0, record)); + let init = self.clone().map(|record| (0, record)); timely::dataflow::operators::generic::operator::empty(&init.scope()) .as_collection() .iterate(|diff| @@ -80,7 +80,7 @@ where }) .map(|(_hash, pair)| pair) ) - .concat(&init) + .concat(init) .map(|pair| { let hash = pair.hashed(); (pair.1, hash) }) } } @@ -109,8 +109,10 @@ mod tests { timely::dataflow::operators::generic::operator::empty(&init.scope()) .as_collection() .iterate(|diff| - init.enter(&diff.scope()) - .concat(&diff) + init + .clone() + .enter(&diff.scope()) + .concat(diff) .map(|(round, num)| ((round + num) / 10, (round, num))) .reduce(|_hash, input, output| { println!("Input: {:?}", input); @@ -129,7 +131,7 @@ mod tests { .inspect(|x| println!("{:?}", x)) .map(|(_hash, pair)| pair) ) - .concat(&init) + .concat(init) .map(|(round, num)| { (num, (round + num) / 10) }) .map(|(_data, id)| id) .threshold(|_id,cnt| if cnt > &1 { *cnt } else { 0 }) diff --git a/differential-dataflow/src/algorithms/prefix_sum.rs b/differential-dataflow/src/algorithms/prefix_sum.rs index e479c8229..5823054e0 100644 --- a/differential-dataflow/src/algorithms/prefix_sum.rs +++ b/differential-dataflow/src/algorithms/prefix_sum.rs @@ -13,10 +13,10 @@ pub trait PrefixSum { /// The prefix sum is data-parallel, in the sense that the sums are computed independently for /// each key of type `K`. For a single prefix sum this type can be `()`, but this permits the /// more general accumulation of multiple independent sequences. - fn prefix_sum(&self, zero: D, combine: F) -> Self where F: Fn(&K,&D,&D)->D + 'static; + fn prefix_sum(self, zero: D, combine: F) -> Self where F: Fn(&K,&D,&D)->D + 'static; /// Determine the prefix sum at each element of `location`. - fn prefix_sum_at(&self, locations: VecCollection, zero: D, combine: F) -> Self where F: Fn(&K,&D,&D)->D + 'static; + fn prefix_sum_at(self, locations: VecCollection, zero: D, combine: F) -> Self where F: Fn(&K,&D,&D)->D + 'static; } impl PrefixSum for VecCollection @@ -25,11 +25,11 @@ where K: ExchangeData + ::std::hash::Hash, D: ExchangeData + ::std::hash::Hash, { - fn prefix_sum(&self, zero: D, combine: F) -> Self where F: Fn(&K,&D,&D)->D + 'static { - self.prefix_sum_at(self.map(|(x,_)| x), zero, combine) + fn prefix_sum(self, zero: D, combine: F) -> Self where F: Fn(&K,&D,&D)->D + 'static { + self.clone().prefix_sum_at(self.map(|(x,_)| x), zero, combine) } - fn prefix_sum_at(&self, locations: VecCollection, zero: D, combine: F) -> Self where F: Fn(&K,&D,&D)->D + 'static { + fn prefix_sum_at(self, locations: VecCollection, zero: D, combine: F) -> Self where F: Fn(&K,&D,&D)->D + 'static { let combine1 = ::std::rc::Rc::new(combine); let combine2 = combine1.clone(); @@ -51,13 +51,13 @@ where let unit_ranges = collection.map(|((index, key), data)| ((index, 0, key), data)); unit_ranges - .iterate(|ranges| + .iterate(|ranges| { // Each available range, of size less than usize::max_value(), advertises itself as the range // twice as large, aligned to integer multiples of its size. Each range, which may contain at // most two elements, then summarizes itself using the `combine` function. Finally, we re-add // the initial `unit_ranges` intervals, so that the set of ranges grows monotonically. - + let scope = ranges.scope(); ranges .filter(|&((_pos, log, _), _)| log < 64) .map(|((pos, log, key), data)| ((pos >> 1, log + 1, key), (pos, data))) @@ -66,8 +66,8 @@ where if input.len() > 1 { result = combine(key, &result, &(input[1].0).1); } output.push((result, 1)); }) - .concat(&unit_ranges.enter(&ranges.scope())) - ) + .concat(unit_ranges.enter(&scope)) + }) } /// Produces the accumulated values at each of the `usize` locations in `queries`. @@ -115,17 +115,17 @@ where // Acquire each requested range. let full_ranges = ranges - .semijoin(&requests); + .semijoin(requests); // Each requested range should exist, even if as a zero range, for correct reconstruction. let zero_ranges = full_ranges .map(move |((idx, log, key), _)| ((idx, log, key), zero0.clone())) .negate() - .concat(&requests.map(move |(idx, log, key)| ((idx, log, key), zero1.clone()))); + .concat(requests.map(move |(idx, log, key)| ((idx, log, key), zero1.clone()))); // Merge occupied and empty ranges. - let used_ranges = full_ranges.concat(&zero_ranges); + let used_ranges = full_ranges.concat(zero_ranges); // Each key should initiate a value of `zero` at position `0`. let init_states = @@ -141,8 +141,8 @@ where .map(|((pos, log, key), data)| ((pos << log, key), (log, data))) .join_map(states, move |&(pos, ref key), &(log, ref data), state| ((pos + (1 << log), key.clone()), combine(key, state, data))) - .concat(&init_states.enter(&states.scope())) + .concat(init_states.enter(&states.scope())) .distinct() }) - .semijoin(&queries) + .semijoin(queries) } diff --git a/differential-dataflow/src/capture.rs b/differential-dataflow/src/capture.rs index 23d9ffd9e..d2a24cb5a 100644 --- a/differential-dataflow/src/capture.rs +++ b/differential-dataflow/src/capture.rs @@ -390,7 +390,7 @@ pub mod source { // Step 2: The UPDATES operator. let mut updates_op = OperatorBuilder::new("CDCV2_Updates".to_string(), scope.clone()); - let mut input = updates_op.new_input(&updates, Exchange::new(|x: &(D, T, R)| x.hashed())); + let mut input = updates_op.new_input(updates, Exchange::new(|x: &(D, T, R)| x.hashed())); let (changes_out, changes) = updates_op.new_output(); let mut changes_out = OutputBuilder::from(changes_out); let (counts_out, counts) = updates_op.new_output(); @@ -440,11 +440,11 @@ pub mod source { // Step 3: The PROGRESS operator. let mut progress_op = OperatorBuilder::new("CDCV2_Progress".to_string(), scope.clone()); let mut input = progress_op.new_input( - &progress, + progress, Exchange::new(|x: &(usize, Progress)| x.0 as u64), ); let mut counts = - progress_op.new_input(&counts, Exchange::new(|x: &(T, i64)| (x.0).hashed())); + progress_op.new_input(counts, Exchange::new(|x: &(T, i64)| (x.0).hashed())); let (frontier_out, frontier) = progress_op.new_output(); let mut frontier_out = OutputBuilder::from(frontier_out); progress_op.build(move |_capability| { @@ -468,12 +468,12 @@ pub mod source { // Drain all relevant update counts in to the mutable antichain tracking its frontier. counts.for_each(|cap, counts| { updates_frontier.update_iter(counts.iter().cloned()); - capability = Some(cap.retain()); + capability = Some(cap.retain(0)); }); // Drain all progress statements into the queue out of which we will work. input.for_each(|cap, progress| { progress_queue.extend(progress.iter().map(|x| (x.1).clone())); - capability = Some(cap.retain()); + capability = Some(cap.retain(0)); }); // Extract and act on actionable progress messages. @@ -524,7 +524,7 @@ pub mod source { // Step 4: The FEEDBACK operator. let mut feedback_op = OperatorBuilder::new("CDCV2_Feedback".to_string(), scope.clone()); let mut input = feedback_op.new_input( - &frontier, + frontier, Exchange::new(|x: &(usize, ChangeBatch)| x.0 as u64), ); feedback_op.build(move |_capability| { @@ -574,7 +574,7 @@ pub mod sink { /// performed before calling the method, the recorded output may not be correctly /// reconstructed by readers. pub fn build( - stream: &Stream, + stream: Stream, sink_hash: u64, updates_sink: Weak>, progress_sink: Weak>, @@ -585,11 +585,12 @@ pub mod sink { T: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a> + Timestamp + Lattice, R: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>, { + let scope = stream.scope(); // First we record the updates that stream in. // We can simply record all updates, under the presumption that the have been consolidated // and so any record we see is in fact guaranteed to happen. - let mut builder = OperatorBuilder::new("UpdatesWriter".to_owned(), stream.scope()); - let reactivator = stream.scope().activator_for(builder.operator_info().address); + let mut builder = OperatorBuilder::new("UpdatesWriter".to_owned(), scope.clone()); + let reactivator = scope.activator_for(builder.operator_info().address); let mut input = builder.new_input(stream, Pipeline); let (updates_out, updates) = builder.new_output(); let mut updates_out = OutputBuilder::from(updates_out); @@ -643,10 +644,10 @@ pub mod sink { ); // We use a lower-level builder here to get access to the operator address, for rescheduling. - let mut builder = OperatorBuilder::new("ProgressWriter".to_owned(), stream.scope()); - let reactivator = stream.scope().activator_for(builder.operator_info().address); - let mut input = builder.new_input(&updates, Exchange::new(move |_| sink_hash)); - let should_write = stream.scope().index() == (sink_hash as usize) % stream.scope().peers(); + let mut builder = OperatorBuilder::new("ProgressWriter".to_owned(), scope.clone()); + let reactivator = scope.activator_for(builder.operator_info().address); + let mut input = builder.new_input(updates, Exchange::new(move |_| sink_hash)); + let should_write = scope.index() == (sink_hash as usize) % scope.peers(); // We now record the numbers of updates at each timestamp between lower and upper bounds. // Track the advancing frontier, to know when to produce utterances. diff --git a/differential-dataflow/src/collection.rs b/differential-dataflow/src/collection.rs index 6fc091cab..3859b4710 100644 --- a/differential-dataflow/src/collection.rs +++ b/differential-dataflow/src/collection.rs @@ -8,7 +8,7 @@ //! manually. The higher-level of programming allows differential dataflow to provide efficient //! implementations, and to support efficient incremental updates to the collections. -use timely::{Container, Data}; +use timely::Container; use timely::progress::Timestamp; use timely::dataflow::scopes::Child; use timely::dataflow::Scope; @@ -23,7 +23,7 @@ use crate::difference::Abelian; /// in order to expose some of this functionality (e.g. negation, timestamp manipulation). Other actions /// on the containers, and streams of containers, are left to the container implementor to describe. #[derive(Clone)] -pub struct Collection { +pub struct Collection { /// The underlying timely dataflow stream. /// /// This field is exposed to support direct timely dataflow manipulation when required, but it is @@ -70,46 +70,16 @@ impl Collection { /// .assert_eq(&data); /// }); /// ``` - pub fn concat(&self, other: &Self) -> Self { + pub fn concat(self, other: Self) -> Self { self.inner - .concat(&other.inner) - .as_collection() - } - /// Creates a new collection accumulating the contents of the two collections. - /// - /// Despite the name, differential dataflow collections are unordered. This method is so named because the - /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the - /// two collections. - /// - /// # Examples - /// - /// ``` - /// use differential_dataflow::input::Input; - /// - /// ::timely::example(|scope| { - /// - /// let data = scope.new_collection_from(1 .. 10).1; - /// - /// let odds = data.filter(|x| x % 2 == 1); - /// let evens = data.filter(|x| x % 2 == 0); - /// - /// odds.concatenate(Some(evens)) - /// .assert_eq(&data); - /// }); - /// ``` - pub fn concatenate(&self, sources: I) -> Self - where - I: IntoIterator - { - self.inner - .concatenate(sources.into_iter().map(|x| x.inner)) + .concat(other.inner) .as_collection() } // Brings a Collection into a nested region. /// /// This method is a specialization of `enter` to the case where the nested scope is a region. /// It removes the need for an operator that adjusts the timestamp. - pub fn enter_region<'a>(&self, child: &Child<'a, G, ::Timestamp>) -> Collection::Timestamp>, C> { + pub fn enter_region<'a>(self, child: &Child<'a, G, ::Timestamp>) -> Collection::Timestamp>, C> { self.inner .enter(child) .as_collection() @@ -132,7 +102,7 @@ impl Collection { /// .inspect_container(|event| println!("event: {:?}", event)); /// }); /// ``` - pub fn inspect_container(&self, func: F) -> Self + pub fn inspect_container(self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static, { @@ -144,7 +114,7 @@ impl Collection { /// /// This probe is used to determine when the state of the Collection has stabilized and can /// be read out. - pub fn probe(&self) -> probe::Handle { + pub fn probe(self) -> probe::Handle { self.inner .probe() } @@ -154,7 +124,7 @@ impl Collection { /// In addition, a probe is also often use to limit the number of rounds of input in flight at any moment; a /// computation can wait until the probe has caught up to the input before introducing more rounds of data, to /// avoid swamping the system. - pub fn probe_with(&self, handle: &probe::Handle) -> Self { + pub fn probe_with(self, handle: &probe::Handle) -> Self { Self::new(self.inner.probe_with(handle)) } /// The scope containing the underlying timely dataflow stream. @@ -185,7 +155,7 @@ impl Collection { /// .assert_eq(&evens); /// }); /// ``` - pub fn negate(&self) -> Self where C: containers::Negate { + pub fn negate(self) -> Self where C: containers::Negate { use timely::dataflow::channels::pact::Pipeline; self.inner .unary(Pipeline, "Negate", move |_,_| move |input, output| { @@ -214,7 +184,7 @@ impl Collection { /// data.assert_eq(&result); /// }); /// ``` - pub fn enter<'a, T>(&self, child: &Child<'a, G, T>) -> Collection, ::Timestamp, T>>::InnerContainer> + pub fn enter<'a, T>(self, child: &Child<'a, G, T>) -> Collection, ::Timestamp, T>>::InnerContainer> where C: containers::Enter<::Timestamp, T, InnerContainer: Container>, T: Refines<::Timestamp>, @@ -248,7 +218,7 @@ impl Collection { /// data.results_in(summary1); /// }); /// ``` - pub fn results_in(&self, step: ::Summary) -> Self + pub fn results_in(self, step: ::Summary) -> Self where C: containers::ResultsIn<::Summary>, { @@ -290,7 +260,7 @@ where /// data.assert_eq(&result); /// }); /// ``` - pub fn leave(&self) -> Collection>::OuterContainer> { + pub fn leave(self) -> Collection>::OuterContainer> { use timely::dataflow::channels::pact::Pipeline; self.inner .leave() @@ -302,13 +272,13 @@ where } /// Methods requiring a region as the scope. -impl Collection, C> +impl Collection, C> { /// Returns the value of a Collection from a nested region to its containing scope. /// /// This method is a specialization of `leave` to the case that of a nested region. /// It removes the need for an operator that adjusts the timestamp. - pub fn leave_region(&self) -> Collection { + pub fn leave_region(self) -> Collection { self.inner .leave() .as_collection() @@ -321,7 +291,6 @@ pub mod vec { use std::hash::Hash; - use timely::Data; use timely::progress::Timestamp; use timely::order::Product; use timely::dataflow::scopes::child::Iterative; @@ -368,9 +337,9 @@ pub mod vec { /// .assert_empty(); /// }); /// ``` - pub fn map(&self, mut logic: L) -> Collection + pub fn map(self, mut logic: L) -> Collection where - D2: Data, + D2: Clone + 'static, L: FnMut(D) -> D2 + 'static, { self.inner @@ -395,7 +364,7 @@ pub mod vec { /// .assert_empty(); /// }); /// ``` - pub fn map_in_place(&self, mut logic: L) -> Collection + pub fn map_in_place(self, mut logic: L) -> Collection where L: FnMut(&mut D) + 'static, { @@ -419,10 +388,10 @@ pub mod vec { /// .flat_map(|x| 0 .. x); /// }); /// ``` - pub fn flat_map(&self, mut logic: L) -> Collection + pub fn flat_map(self, mut logic: L) -> Collection where G::Timestamp: Clone, - I: IntoIterator, + I: IntoIterator, L: FnMut(D) -> I + 'static, { self.inner @@ -443,7 +412,7 @@ pub mod vec { /// .assert_empty(); /// }); /// ``` - pub fn filter(&self, mut logic: L) -> Collection + pub fn filter(self, mut logic: L) -> Collection where L: FnMut(&D) -> bool + 'static, { @@ -471,9 +440,9 @@ pub mod vec { /// x1.assert_eq(&x2); /// }); /// ``` - pub fn explode(&self, mut logic: L) -> Collection>::Output> + pub fn explode(self, mut logic: L) -> Collection>::Output> where - D2: Data, + D2: Clone + 'static, R2: Semigroup+Multiply, I: IntoIterator, L: FnMut(D)->I+'static, @@ -505,10 +474,10 @@ pub mod vec { /// ); /// }); /// ``` - pub fn join_function(&self, mut logic: L) -> Collection>::Output> + pub fn join_function(self, mut logic: L) -> Collection>::Output> where G::Timestamp: Lattice, - D2: Data, + D2: Clone + 'static, R2: Semigroup+Multiply, I: IntoIterator, L: FnMut(D)->I+'static, @@ -540,7 +509,7 @@ pub mod vec { /// data.assert_eq(&result); /// }); /// ``` - pub fn enter_at<'a, T, F>(&self, child: &Iterative<'a, G, T>, mut initial: F) -> Collection, D, R> + pub fn enter_at<'a, T, F>(self, child: &Iterative<'a, G, T>, mut initial: F) -> Collection, D, R> where T: Timestamp+Hash, F: FnMut(&D) -> T + Clone + 'static, @@ -562,7 +531,7 @@ pub mod vec { /// ordered, they should have the same order or compare equal once `func` is applied to them (this /// is because we advance the timely capability with the same logic, and it must remain `less_equal` /// to all of the data timestamps). - pub fn delay(&self, func: F) -> Collection + pub fn delay(self, func: F) -> Collection where G::Timestamp: Hash, F: FnMut(&G::Timestamp) -> G::Timestamp + Clone + 'static, @@ -599,7 +568,7 @@ pub mod vec { /// .inspect(|x| println!("error: {:?}", x)); /// }); /// ``` - pub fn inspect(&self, func: F) -> Collection + pub fn inspect(self, func: F) -> Collection where F: FnMut(&(D, G::Timestamp, R))+'static, { @@ -625,7 +594,7 @@ pub mod vec { /// .inspect_batch(|t,xs| println!("errors @ {:?}: {:?}", t, xs)); /// }); /// ``` - pub fn inspect_batch(&self, mut func: F) -> Collection + pub fn inspect_batch(self, mut func: F) -> Collection where F: FnMut(&G::Timestamp, &[(D, G::Timestamp, R)])+'static, { @@ -653,7 +622,7 @@ pub mod vec { /// .assert_empty(); /// }); /// ``` - pub fn assert_empty(&self) + pub fn assert_empty(self) where D: crate::ExchangeData+Hashable, R: crate::ExchangeData+Hashable + Semigroup, @@ -665,7 +634,7 @@ pub mod vec { } /// Methods requiring an Abelian difference, to support negation. - impl, D: Clone+'static, R: Abelian+'static> Collection { + impl, D: Clone+'static, R: Abelian+'static> Collection { /// Assert if the collections are ever different. /// /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation @@ -689,7 +658,7 @@ pub mod vec { /// .assert_eq(&data); /// }); /// ``` - pub fn assert_eq(&self, other: &Self) + pub fn assert_eq(self, other: Self) where D: crate::ExchangeData+Hashable, R: crate::ExchangeData+Hashable, @@ -738,13 +707,13 @@ pub mod vec { /// }); /// }); /// ``` - pub fn reduce(&self, logic: L) -> Collection + pub fn reduce(self, logic: L) -> Collection where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static { self.reduce_named("Reduce", logic) } /// As `reduce` with the ability to name the operator. - pub fn reduce_named(&self, name: &str, logic: L) -> Collection + pub fn reduce_named(self, name: &str, logic: L) -> Collection where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static { use crate::trace::implementations::{ValBuilder, ValSpine}; @@ -777,7 +746,7 @@ pub mod vec { /// .trace; /// }); /// ``` - pub fn reduce_abelian(&self, name: &str, mut logic: L) -> Arranged> + pub fn reduce_abelian(self, name: &str, mut logic: L) -> Arranged> where T2: for<'a> Trace= &'a K, KeyOwn = K, ValOwn = V, Time=G::Timestamp, Diff: Abelian>+'static, Bu: Builder, Output = T2::Batch>, @@ -795,9 +764,9 @@ pub mod vec { /// Unlike `reduce_arranged`, this method may be called with an empty `input`, /// and it may not be safe to index into the first element. /// At least one of the two collections will be non-empty. - pub fn reduce_core(&self, name: &str, logic: L) -> Arranged> + pub fn reduce_core(self, name: &str, logic: L) -> Arranged> where - V: Data, + V: Clone + 'static, T2: for<'a> Trace=&'a K, KeyOwn = K, ValOwn = V, Time=G::Timestamp>+'static, Bu: Builder, Output = T2::Batch>, L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, @@ -828,7 +797,7 @@ pub mod vec { /// .distinct(); /// }); /// ``` - pub fn distinct(&self) -> Collection { + pub fn distinct(self) -> Collection { self.distinct_core() } @@ -837,7 +806,7 @@ pub mod vec { /// This method allows `distinct` to produce collections whose difference /// type is something other than an `isize` integer, for example perhaps an /// `i32`. - pub fn distinct_core>(&self) -> Collection { + pub fn distinct_core>(self) -> Collection { self.threshold_named("Distinct", |_,_| R2::from(1i8)) } @@ -859,12 +828,12 @@ pub mod vec { /// .threshold(|_,c| c % 2); /// }); /// ``` - pub fn thresholdR2+'static>(&self, thresh: F) -> Collection { + pub fn thresholdR2+'static>(self, thresh: F) -> Collection { self.threshold_named("Threshold", thresh) } /// A `threshold` with the ability to name the operator. - pub fn threshold_namedR2+'static>(&self, name: &str, mut thresh: F) -> Collection { + pub fn threshold_namedR2+'static>(self, name: &str, mut thresh: F) -> Collection { use crate::trace::implementations::{KeyBuilder, KeySpine}; self.arrange_by_self_named(&format!("Arrange: {}", name)) @@ -895,14 +864,14 @@ pub mod vec { /// .count(); /// }); /// ``` - pub fn count(&self) -> Collection { self.count_core() } + pub fn count(self) -> Collection { self.count_core() } /// Count for general integer differences. /// /// This method allows `count` to produce collections whose difference /// type is something other than an `isize` integer, for example perhaps an /// `i32`. - pub fn count_core + 'static>(&self) -> Collection { + pub fn count_core + 'static>(self) -> Collection { use crate::trace::implementations::{ValBuilder, ValSpine}; self.arrange_by_self_named("Arrange: Count") .reduce_abelian::<_,ValBuilder,ValSpine>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8)))) @@ -913,7 +882,7 @@ pub mod vec { /// Methods which require data be arrangeable. impl Collection where - G: Scope, + G: Scope, D: crate::ExchangeData+Hashable, R: crate::ExchangeData+Semigroup, { @@ -937,14 +906,14 @@ pub mod vec { /// .assert_empty(); /// }); /// ``` - pub fn consolidate(&self) -> Self { + pub fn consolidate(self) -> Self { use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine}; self.consolidate_named::,KeyBuilder<_,_,_>, KeySpine<_,_,_>,_>("Consolidate", |key,&()| key.clone()) } /// As `consolidate` but with the ability to name the operator, specify the trace type, /// and provide the function `reify` to produce owned keys and values.. - pub fn consolidate_named(&self, name: &str, reify: F) -> Self + pub fn consolidate_named(self, name: &str, reify: F) -> Self where Ba: crate::trace::Batcher, Time=G::Timestamp> + 'static, Tr: for<'a> crate::trace::Trace+'static, @@ -980,7 +949,7 @@ pub mod vec { /// .consolidate_stream(); /// }); /// ``` - pub fn consolidate_stream(&self) -> Self { + pub fn consolidate_stream(self) -> Self { use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::Operator; @@ -1011,14 +980,14 @@ pub mod vec { V: crate::ExchangeData, R: crate::ExchangeData + Semigroup, { - fn arrange_named(&self, name: &str) -> Arranged> + fn arrange_named(self, name: &str) -> Arranged> where Ba: crate::trace::Batcher, Time=G::Timestamp> + 'static, Bu: crate::trace::Builder, Tr: crate::trace::Trace + 'static, { let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); - crate::operators::arrange::arrangement::arrange_core::<_, _, Ba, Bu, _>(&self.inner, exchange, name) + crate::operators::arrange::arrangement::arrange_core::<_, _, Ba, Bu, _>(self.inner, exchange, name) } } @@ -1026,14 +995,14 @@ pub mod vec { where G: Scope, { - fn arrange_named(&self, name: &str) -> Arranged> + fn arrange_named(self, name: &str) -> Arranged> where Ba: crate::trace::Batcher, Time=G::Timestamp> + 'static, Bu: crate::trace::Builder, Tr: crate::trace::Trace + 'static, { let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,()),G::Timestamp,R)| (update.0).0.hashed().into()); - crate::operators::arrange::arrangement::arrange_core::<_,_,Ba,Bu,_>(&self.map(|k| (k, ())).inner, exchange, name) + crate::operators::arrange::arrangement::arrange_core::<_,_,Ba,Bu,_>(self.map(|k| (k, ())).inner, exchange, name) } } @@ -1047,12 +1016,12 @@ pub mod vec { /// This operator arranges a stream of values into a shared trace, whose contents it maintains. /// This trace is current for all times completed by the output stream, which can be used to /// safely identify the stable times and values in the trace. - pub fn arrange_by_key(&self) -> Arranged>> { + pub fn arrange_by_key(self) -> Arranged>> { self.arrange_by_key_named("ArrangeByKey") } /// As `arrange_by_key` but with the ability to name the arrangement. - pub fn arrange_by_key_named(&self, name: &str) -> Arranged>> { + pub fn arrange_by_key_named(self, name: &str) -> Arranged>> { self.arrange_named::,ValBuilder<_,_,_,_>,_>(name) } } @@ -1066,12 +1035,12 @@ pub mod vec { /// This operator arranges a collection of records into a shared trace, whose contents it maintains. /// This trace is current for all times complete in the output stream, which can be used to safely /// identify the stable times and values in the trace. - pub fn arrange_by_self(&self) -> Arranged>> { + pub fn arrange_by_self(self) -> Arranged>> { self.arrange_by_self_named("ArrangeBySelf") } /// As `arrange_by_self` but with the ability to name the arrangement. - pub fn arrange_by_self_named(&self, name: &str) -> Arranged>> { + pub fn arrange_by_self_named(self, name: &str) -> Arranged>> { self.map(|k| (k, ())) .arrange_named::,KeyBuilder<_,_,_>,_>(name) } @@ -1103,7 +1072,7 @@ pub mod vec { /// .assert_eq(&z); /// }); /// ``` - pub fn join(&self, other: &Collection) -> Collection>::Output> + pub fn join(self, other: Collection) -> Collection>::Output> where K: crate::ExchangeData, V2: crate::ExchangeData, @@ -1130,11 +1099,11 @@ pub mod vec { /// .assert_eq(&z); /// }); /// ``` - pub fn join_map(&self, other: &Collection, mut logic: L) -> Collection>::Output> + pub fn join_map(self, other: Collection, mut logic: L) -> Collection>::Output> where R: Multiply, L: FnMut(&K, &V, &V2)->D+'static { let arranged1 = self.arrange_by_key(); let arranged2 = other.arrange_by_key(); - arranged1.join_core(&arranged2, move |k,v1,v2| Some(logic(k,v1,v2))) + arranged1.join_core(arranged2, move |k,v1,v2| Some(logic(k,v1,v2))) } /// Matches pairs `(key, val)` and `key` based on `key`, producing the former with frequencies multiplied. @@ -1158,11 +1127,11 @@ pub mod vec { /// .assert_eq(&z); /// }); /// ``` - pub fn semijoin(&self, other: &Collection) -> Collection>::Output> + pub fn semijoin(self, other: Collection) -> Collection>::Output> where R: Multiply { let arranged1 = self.arrange_by_key(); let arranged2 = other.arrange_by_self(); - arranged1.join_core(&arranged2, |k,v,_| Some((k.clone(), v.clone()))) + arranged1.join_core(arranged2, |k,v,_| Some((k.clone(), v.clone()))) } /// Subtracts the semijoin with `other` from `self`. @@ -1190,9 +1159,9 @@ pub mod vec { /// .assert_eq(&z); /// }); /// ``` - pub fn antijoin(&self, other: &Collection) -> Collection + pub fn antijoin(self, other: Collection) -> Collection where R: Multiply, R: Abelian+'static { - self.concat(&self.semijoin(other).negate()) + self.clone().concat(self.semijoin(other).negate()) } /// Joins two arranged collections with the same key type. @@ -1223,7 +1192,7 @@ pub mod vec { /// .assert_eq(&z); /// }); /// ``` - pub fn join_core (&self, stream2: &Arranged, result: L) -> Collection>::Output> + pub fn join_core (self, stream2: Arranged, result: L) -> Collection>::Output> where Tr2: for<'a> crate::trace::TraceReader=&'a K, Time=G::Timestamp>+Clone+'static, R: Multiply, @@ -1239,16 +1208,16 @@ pub mod vec { /// Conversion to a differential dataflow Collection. pub trait AsCollection { /// Converts the type to a differential dataflow collection. - fn as_collection(&self) -> Collection; + fn as_collection(self) -> Collection; } -impl AsCollection for StreamCore { +impl AsCollection for StreamCore { /// Converts the type to a differential dataflow collection. /// /// By calling this method, you guarantee that the timestamp invariant (as documented on /// [Collection]) is upheld. This method will not check it. - fn as_collection(&self) -> Collection { - Collection::::new(self.clone()) + fn as_collection(self) -> Collection { + Collection::::new(self) } } diff --git a/differential-dataflow/src/dynamic/mod.rs b/differential-dataflow/src/dynamic/mod.rs index 27217b778..19c5472a6 100644 --- a/differential-dataflow/src/dynamic/mod.rs +++ b/differential-dataflow/src/dynamic/mod.rs @@ -35,16 +35,16 @@ where TOuter: Timestamp, { /// Enters a dynamically created scope which has `level` timestamp coordinates. - pub fn enter_dynamic(&self, _level: usize) -> Self { - (*self).clone() + pub fn enter_dynamic(self, _level: usize) -> Self { + self } /// Leaves a dynamically created scope which has `level` timestamp coordinates. - pub fn leave_dynamic(&self, level: usize) -> Self { + pub fn leave_dynamic(self, level: usize) -> Self { // Create a unary operator that will strip all but `level-1` timestamp coordinates. let mut builder = OperatorBuilder::new("LeaveDynamic".to_string(), self.scope()); let (output, stream) = builder.new_output(); let mut output = OutputBuilder::from(output); - let mut input = builder.new_input_connection(&self.inner, Pipeline, [(0, Antichain::from_elem(Product { outer: Default::default(), inner: PointStampSummary { retain: Some(level - 1), actions: Vec::new() } }))]); + let mut input = builder.new_input_connection(self.inner, Pipeline, [(0, Antichain::from_elem(Product { outer: Default::default(), inner: PointStampSummary { retain: Some(level - 1), actions: Vec::new() } }))]); builder.build(move |_capability| move |_frontier| { let mut output = output.activate(); @@ -53,7 +53,7 @@ where let mut vec = std::mem::take(&mut new_time.inner).into_inner(); vec.truncate(level - 1); new_time.inner = PointStamp::new(vec); - let new_cap = cap.delayed(&new_time); + let new_cap = cap.delayed(&new_time, 0); for (_data, time, _diff) in data.iter_mut() { let mut vec = std::mem::take(&mut time.inner).into_inner(); vec.truncate(level - 1); diff --git a/differential-dataflow/src/input.rs b/differential-dataflow/src/input.rs index ae0f69eb7..98474038f 100644 --- a/differential-dataflow/src/input.rs +++ b/differential-dataflow/src/input.rs @@ -121,7 +121,7 @@ impl Input for G where ::Timestamp: Lattice { let (handle, stream) = self.new_input(); let source = data.to_stream(self).as_collection(); - (InputSession::from(handle), stream.as_collection().concat(&source)) + (InputSession::from(handle), stream.as_collection().concat(source)) }} /// An input session wrapping a single timely dataflow capability. diff --git a/differential-dataflow/src/lib.rs b/differential-dataflow/src/lib.rs index a2a265664..8ec202f58 100644 --- a/differential-dataflow/src/lib.rs +++ b/differential-dataflow/src/lib.rs @@ -85,12 +85,12 @@ pub use difference::Abelian as Diff; /// Most differential dataflow operators require the ability to cancel corresponding updates, and the /// way that they do this is by putting the data in a canonical form. The `Ord` trait allows us to sort /// the data, at which point we can consolidate updates for equivalent records. -pub trait Data : timely::Data + Ord + Debug { } -impl Data for T { } +pub trait Data : Ord + Debug + Clone + 'static { } +impl Data for T { } /// Data types exchangeable in differential dataflow. -pub trait ExchangeData : timely::ExchangeData + Ord + Debug { } -impl ExchangeData for T { } +pub trait ExchangeData : timely::ExchangeData + Ord + Debug + Clone + 'static { } +impl ExchangeData for T { } pub mod hashable; pub mod operators; diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index 8a1a1fbe3..ea14f7388 100644 --- a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -87,7 +87,7 @@ where /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps /// have all been extended with an additional coordinate with the default value. The resulting collection does /// not vary with the new timestamp coordinate. - pub fn enter<'a, TInner>(&self, child: &Child<'a, G, TInner>) + pub fn enter<'a, TInner>(self, child: &Child<'a, G, TInner>) -> Arranged, TraceEnter> where TInner: Refines+Lattice+Timestamp+Clone, @@ -102,7 +102,7 @@ where /// /// This method only applies to *regions*, which are subscopes with the same timestamp /// as their containing scope. In this case, the trace type does not need to change. - pub fn enter_region<'a>(&self, child: &Child<'a, G, G::Timestamp>) + pub fn enter_region<'a>(self, child: &Child<'a, G, G::Timestamp>) -> Arranged, Tr> { Arranged { stream: self.stream.enter(child), @@ -115,7 +115,7 @@ where /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps /// have all been extended with an additional coordinate with the default value. The resulting collection does /// not vary with the new timestamp coordinate. - pub fn enter_at<'a, TInner, F, P>(&self, child: &Child<'a, G, TInner>, logic: F, prior: P) + pub fn enter_at<'a, TInner, F, P>(self, child: &Child<'a, G, TInner>, logic: F, prior: P) -> Arranged, TraceEnterAt> where TInner: Refines+Lattice+Timestamp+Clone+'static, @@ -135,7 +135,7 @@ where /// The underlying `Stream>` is a much more efficient way to access the data, /// and this method should only be used when the data need to be transformed or exchanged, rather than /// supplied as arguments to an operator using the same key-value structure. - pub fn as_collection(&self, mut logic: L) -> VecCollection + pub fn as_collection(self, mut logic: L) -> VecCollection where L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> D+'static, { @@ -147,7 +147,7 @@ where /// The underlying `Stream>` is a much more efficient way to access the data, /// and this method should only be used when the data need to be transformed or exchanged, rather than /// supplied as arguments to an operator using the same key-value structure. - pub fn as_vecs(&self) -> VecCollection + pub fn as_vecs(self) -> VecCollection where Tr::KeyOwn: crate::ExchangeData, Tr::ValOwn: crate::ExchangeData, @@ -159,12 +159,12 @@ where /// /// The supplied logic may produce an iterator over output values, allowing either /// filtering or flat mapping as part of the extraction. - pub fn flat_map_ref(&self, logic: L) -> VecCollection + pub fn flat_map_ref(self, logic: L) -> VecCollection where I: IntoIterator, L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static, { - Self::flat_map_batches(&self.stream, logic) + Self::flat_map_batches(self.stream, logic) } /// Extracts elements from a stream of batches as a collection. @@ -174,7 +174,7 @@ where /// /// This method exists for streams of batches without the corresponding arrangement. /// If you have the arrangement, its `flat_map_ref` method is equivalent to this. - pub fn flat_map_batches(stream: &Stream, mut logic: L) -> VecCollection + pub fn flat_map_batches(stream: Stream, mut logic: L) -> VecCollection where I: IntoIterator, L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static, @@ -214,7 +214,7 @@ where /// A convenience method to join and produce `VecCollection` output. /// /// Avoid this method, as it is likely to evolve into one without the `VecCollection` opinion. - pub fn join_core(&self, other: &Arranged, mut result: L) -> VecCollection>::Output> + pub fn join_core(self, other: Arranged, mut result: L) -> VecCollection>::Output> where T2: for<'a> TraceReader=T1::Key<'a>,Time=T1::Time>+Clone+'static, T1::Diff: Multiply, @@ -249,7 +249,7 @@ where T1: TraceReader + Clone + 'static, { /// A direct implementation of `ReduceCore::reduce_abelian`. - pub fn reduce_abelian(&self, name: &str, mut logic: L) -> Arranged> + pub fn reduce_abelian(self, name: &str, mut logic: L) -> Arranged> where T1: TraceReader, T2: for<'a> Trace< @@ -272,7 +272,7 @@ where } /// A direct implementation of `ReduceCore::reduce_core`. - pub fn reduce_core(&self, name: &str, logic: L) -> Arranged> + pub fn reduce_core(self, name: &str, logic: L) -> Arranged> where T1: TraceReader, T2: for<'a> Trace< @@ -299,7 +299,7 @@ where /// /// This method only applies to *regions*, which are subscopes with the same timestamp /// as their containing scope. In this case, the trace type does not need to change. - pub fn leave_region(&self) -> Arranged { + pub fn leave_region(self) -> Arranged { use timely::dataflow::operators::Leave; Arranged { stream: self.stream.leave(), @@ -309,12 +309,12 @@ where } /// A type that can be arranged as if a collection of updates. -pub trait Arrange +pub trait Arrange : Sized where G: Scope, { /// Arranges updates into a shared trace. - fn arrange(&self) -> Arranged> + fn arrange(self) -> Arranged> where Ba: Batcher + 'static, Bu: Builder, @@ -324,7 +324,7 @@ where } /// Arranges updates into a shared trace, with a supplied name. - fn arrange_named(&self, name: &str) -> Arranged> + fn arrange_named(self, name: &str) -> Arranged> where Ba: Batcher + 'static, Bu: Builder, @@ -337,7 +337,7 @@ where /// This operator arranges a stream of values into a shared trace, whose contents it maintains. /// It uses the supplied parallelization contract to distribute the data, which does not need to /// be consistently by key (though this is the most common). -pub fn arrange_core(stream: &StreamCore, pact: P, name: &str) -> Arranged> +pub fn arrange_core(stream: StreamCore, pact: P, name: &str) -> Arranged> where G: Scope, P: ParallelizationContract, @@ -398,7 +398,7 @@ where // when we realize that time intervals are complete. input.for_each(|cap, data| { - capabilities.insert(cap.retain()); + capabilities.insert(cap.retain(0)); batcher.push_container(data); }); diff --git a/differential-dataflow/src/operators/arrange/upsert.rs b/differential-dataflow/src/operators/arrange/upsert.rs index a2aaed772..335db3776 100644 --- a/differential-dataflow/src/operators/arrange/upsert.rs +++ b/differential-dataflow/src/operators/arrange/upsert.rs @@ -128,7 +128,7 @@ use super::TraceAgent; /// understand what a "sequence" of upserts would mean for partially ordered /// timestamps. pub fn arrange_from_upsert( - stream: &Stream, G::Timestamp)>, + stream: Stream, G::Timestamp)>, name: &str, ) -> Arranged> where @@ -149,19 +149,20 @@ where let reader = &mut reader; let exchange = Exchange::new(move |update: &(Tr::KeyOwn,Option,G::Timestamp)| (update.0).hashed().into()); + let scope = stream.scope(); stream.unary_frontier(exchange, name, move |_capability, info| { // Acquire a logger for arrange events. - let logger = stream.scope().logger_for::("differential/arrange").map(Into::into); + let logger = scope.logger_for::("differential/arrange").map(Into::into); // Tracks the lower envelope of times in `priority_queue`. let mut capabilities = Antichain::>::new(); // Form the trace we will both use internally and publish. - let activator = Some(stream.scope().activator_for(info.address.clone())); + let activator = Some(scope.activator_for(info.address.clone())); let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator); - if let Some(exert_logic) = stream.scope().config().get::("differential/default_exert_logic").cloned() { + if let Some(exert_logic) = scope.config().get::("differential/default_exert_logic").cloned() { empty_trace.set_exert_logic(exert_logic); } @@ -180,7 +181,7 @@ where // Stash capabilities and associated data (ordered by time). input.for_each(|cap, data| { - capabilities.insert(cap.retain()); + capabilities.insert(cap.retain(0)); for (key, val, time) in data.drain(..) { priority_queue.push(std::cmp::Reverse((time, key, val))) } diff --git a/differential-dataflow/src/operators/count.rs b/differential-dataflow/src/operators/count.rs index c7455bc3e..713c73c86 100644 --- a/differential-dataflow/src/operators/count.rs +++ b/differential-dataflow/src/operators/count.rs @@ -14,7 +14,7 @@ use crate::operators::arrange::Arranged; use crate::trace::{BatchReader, Cursor, TraceReader}; /// Extension trait for the `count` differential dataflow method. -pub trait CountTotal, K: ExchangeData, R: Semigroup> { +pub trait CountTotal, K: ExchangeData, R: Semigroup> : Sized { /// Counts the number of occurrences of each element. /// /// # Examples @@ -30,7 +30,7 @@ pub trait CountTotal, K: ExchangeDat /// .count_total(); /// }); /// ``` - fn count_total(&self) -> VecCollection { + fn count_total(self) -> VecCollection { self.count_total_core() } @@ -39,14 +39,14 @@ pub trait CountTotal, K: ExchangeDat /// This method allows `count_total` to produce collections whose difference /// type is something other than an `isize` integer, for example perhaps an /// `i32`. - fn count_total_core + 'static>(&self) -> VecCollection; + fn count_total_core + 'static>(self) -> VecCollection; } impl CountTotal for VecCollection where G: Scope, { - fn count_total_core + 'static>(&self) -> VecCollection { + fn count_total_core + 'static>(self) -> VecCollection { self.arrange_by_self_named("Arrange: CountTotal") .count_total_core() } @@ -63,7 +63,7 @@ where >+Clone+'static, K: ExchangeData, { - fn count_total_core + 'static>(&self) -> VecCollection { + fn count_total_core + 'static>(self) -> VecCollection { let mut trace = self.trace.clone(); @@ -85,7 +85,7 @@ where let mut cap = None; input.for_each(|capability, batches| { if cap.is_none() { // NB: Assumes batches are in-order - cap = Some(capability.retain()); + cap = Some(capability.retain(0)); } for batch in batches.drain(..) { upper_limit.clone_from(batch.upper()); // NB: Assumes batches are in-order diff --git a/differential-dataflow/src/operators/iterate.rs b/differential-dataflow/src/operators/iterate.rs index 5dc11042a..8ef46eabd 100644 --- a/differential-dataflow/src/operators/iterate.rs +++ b/differential-dataflow/src/operators/iterate.rs @@ -71,15 +71,15 @@ pub trait Iterate, D: Data, R: Semigroup> { /// }); /// }); /// ``` - fn iterate(&self, logic: F) -> VecCollection + fn iterate(self, logic: F) -> VecCollection where - for<'a> F: FnOnce(&VecCollection, D, R>)->VecCollection, D, R>; + for<'a> F: FnOnce(VecCollection, D, R>)->VecCollection, D, R>; } impl, D: Ord+Data+Debug, R: Abelian+'static> Iterate for VecCollection { - fn iterate(&self, logic: F) -> VecCollection + fn iterate(self, logic: F) -> VecCollection where - for<'a> F: FnOnce(&VecCollection, D, R>)->VecCollection, D, R>, + for<'a> F: FnOnce(VecCollection, D, R>)->VecCollection, D, R>, { self.inner.scope().scoped("Iterate", |subgraph| { // create a new variable, apply logic, bind variable, return. @@ -89,17 +89,17 @@ impl, D: Ord+Data+Debug, R: Abelian+'static> Iterat // diffs produced; `result` is post-consolidation, and means fewer // records are yielded out of the loop. let variable = Variable::new_from(self.enter(subgraph), Product::new(Default::default(), 1)); - let result = logic(&variable); - variable.set(&result); + let result = logic(variable.collection.clone()); + variable.set(result.clone()); result.leave() }) } } impl, D: Ord+Data+Debug, R: Semigroup+'static> Iterate for G { - fn iterate(&self, logic: F) -> VecCollection + fn iterate(self, logic: F) -> VecCollection where - for<'a> F: FnOnce(&VecCollection, D, R>)->VecCollection, D, R>, + for<'a> F: FnOnce(VecCollection, D, R>)->VecCollection, D, R>, { // TODO: This makes me think we have the wrong ownership pattern here. let mut clone = self.clone(); @@ -112,8 +112,8 @@ impl, D: Ord+Data+Debug, R: Semigroup+'static> Iter // diffs produced; `result` is post-consolidation, and means fewer // records are yielded out of the loop. let variable = SemigroupVariable::new(subgraph, Product::new(Default::default(), 1)); - let result = logic(&variable); - variable.set(&result); + let result = logic(variable.collection.clone()); + variable.set(result.clone()); result.leave() } ) @@ -165,7 +165,7 @@ where /// A `Variable` specialized to a vector container of update triples (data, time, diff). pub type VecVariable = Variable::Timestamp, R)>>; -impl Variable +impl Variable where G: Scope, C: crate::collection::containers::Negate + crate::collection::containers::ResultsIn<::Summary>, @@ -183,7 +183,7 @@ where /// Creates a new `Variable` from a supplied `source` stream. pub fn new_from(source: Collection, step: ::Summary) -> Self { let (feedback, updates) = source.inner.scope().feedback(step.clone()); - let collection = Collection::::new(updates).concat(&source); + let collection = Collection::::new(updates).concat(source.clone()); Variable { collection, feedback, source: Some(source), step } } @@ -191,12 +191,12 @@ where /// /// This method binds the `Variable` to be equal to the supplied collection, /// which may be recursively defined in terms of the variable itself. - pub fn set(self, result: &Collection) -> Collection { - let mut in_result = result.clone(); + pub fn set(self, result: Collection) -> Collection { + let mut in_result = result; if let Some(source) = &self.source { - in_result = in_result.concat(&source.negate()); + in_result = in_result.concat(source.clone().negate()); } - self.set_concat(&in_result) + self.set_concat(in_result) } /// Set the definition of the `Variable` to a collection concatenated to `self`. @@ -208,7 +208,7 @@ where /// /// This behavior can also be achieved by using `new` to create an empty initial /// collection, and then using `self.set(self.concat(result))`. - pub fn set_concat(self, result: &Collection) -> Collection { + pub fn set_concat(self, result: Collection) -> Collection { let step = self.step; result .results_in(step) @@ -255,7 +255,7 @@ where } /// Adds a new source of data to `self`. - pub fn set(self, result: &Collection) -> Collection { + pub fn set(self, result: Collection) -> Collection { let step = self.step; result .results_in(step) diff --git a/differential-dataflow/src/operators/join.rs b/differential-dataflow/src/operators/join.rs index 65d353d5c..3ce9cafd1 100644 --- a/differential-dataflow/src/operators/join.rs +++ b/differential-dataflow/src/operators/join.rs @@ -66,7 +66,7 @@ impl, D> PushInto for EffortBuilder { /// The "correctness" of this method depends heavily on the behavior of the supplied `result` function. /// /// [`AsCollection`]: crate::collection::AsCollection -pub fn join_traces(arranged1: &Arranged, arranged2: &Arranged, mut result: L) -> StreamCore +pub fn join_traces(arranged1: Arranged, arranged2: Arranged, mut result: L) -> StreamCore where G: Scope, T1: TraceReader+Clone+'static, @@ -77,12 +77,13 @@ where // Rename traces for symmetry from here on out. let mut trace1 = arranged1.trace.clone(); let mut trace2 = arranged2.trace.clone(); + let scope = arranged1.stream.scope(); - arranged1.stream.binary_frontier(&arranged2.stream, Pipeline, Pipeline, "Join", move |capability, info| { + arranged1.stream.binary_frontier(arranged2.stream, Pipeline, Pipeline, "Join", move |capability, info| { // Acquire an activator to reschedule the operator when it has unfinished work. use timely::scheduling::Activator; - let activations = arranged1.stream.scope().activations().clone(); + let activations = scope.activations().clone(); let activator = Activator::new(info.address, activations); // Our initial invariants are that for each trace, physical compaction is less or equal the trace's upper bound. @@ -165,7 +166,7 @@ where input1.for_each(|capability, data| { // This test *should* always pass, as we only drop a trace in response to the other input emptying. if let Some(ref mut trace2) = trace2_option { - let capability = capability.retain(); + let capability = capability.retain(0); for batch1 in data.drain(..) { // Ignore any pre-loaded data. if PartialOrder::less_equal(&acknowledged1, batch1.lower()) { @@ -192,7 +193,7 @@ where input2.for_each(|capability, data| { // This test *should* always pass, as we only drop a trace in response to the other input emptying. if let Some(ref mut trace1) = trace1_option { - let capability = capability.retain(); + let capability = capability.retain(0); for batch2 in data.drain(..) { // Ignore any pre-loaded data. if PartialOrder::less_equal(&acknowledged2, batch2.lower()) { diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index 7423d6e02..ef9e4a191 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -26,7 +26,7 @@ use crate::trace::TraceReader; /// A key-wise reduction of values in an input trace. /// /// This method exists to provide reduce functionality without opinions about qualifying trace types. -pub fn reduce_trace(trace: &Arranged, name: &str, mut logic: L) -> Arranged> +pub fn reduce_trace(trace: Arranged, name: &str, mut logic: L) -> Arranged> where G: Scope, T1: TraceReader + Clone + 'static, @@ -38,17 +38,17 @@ where // fabricate a data-parallel operator using the `unary_notify` pattern. let stream = { - + let scope = trace.stream.scope(); let result_trace = &mut result_trace; trace.stream.unary_frontier(Pipeline, name, move |_capability, operator_info| { // Acquire a logger for arrange events. - let logger = trace.stream.scope().logger_for::("differential/arrange").map(Into::into); + let logger = scope.logger_for::("differential/arrange").map(Into::into); - let activator = Some(trace.stream.scope().activator_for(operator_info.address.clone())); + let activator = Some(scope.activator_for(operator_info.address.clone())); let mut empty = T2::new(operator_info.clone(), logger.clone(), activator); // If there is default exert logic set, install it. - if let Some(exert_logic) = trace.stream.scope().config().get::("differential/default_exert_logic").cloned() { + if let Some(exert_logic) = scope.config().get::("differential/default_exert_logic").cloned() { empty.set_exert_logic(exert_logic); } @@ -80,7 +80,7 @@ where let mut output_upper = Antichain::from_elem(::minimum()); let mut output_lower = Antichain::from_elem(::minimum()); - let id = trace.stream.scope().index(); + let id = scope.index(); move |(input, _frontier), output| { @@ -123,7 +123,7 @@ where // Ensure that `capabilities` covers the capability of the batch. capabilities.retain(|cap| !capability.time().less_than(cap.time())); if !capabilities.iter().any(|cap| cap.time().less_equal(capability.time())) { - capabilities.push(capability.retain()); + capabilities.push(capability.retain(0)); } }); diff --git a/differential-dataflow/src/operators/threshold.rs b/differential-dataflow/src/operators/threshold.rs index 39d5b9041..91bbd16fe 100644 --- a/differential-dataflow/src/operators/threshold.rs +++ b/differential-dataflow/src/operators/threshold.rs @@ -17,9 +17,9 @@ use crate::operators::arrange::Arranged; use crate::trace::{BatchReader, Cursor, TraceReader}; /// Extension trait for the `distinct` differential dataflow method. -pub trait ThresholdTotal, K: ExchangeData, R: ExchangeData+Semigroup> { +pub trait ThresholdTotal, K: ExchangeData, R: ExchangeData+Semigroup> : Sized { /// Reduces the collection to one occurrence of each distinct element. - fn threshold_semigroup(&self, thresh: F) -> VecCollection + fn threshold_semigroup(self, thresh: F) -> VecCollection where R2: Semigroup+'static, F: FnMut(&K,&R,Option<&R>)->Option+'static, @@ -39,7 +39,7 @@ pub trait ThresholdTotal, K: Exchang /// .threshold_total(|_,c| c % 2); /// }); /// ``` - fn threshold_totalR2+'static>(&self, mut thresh: F) -> VecCollection { + fn threshold_totalR2+'static>(self, mut thresh: F) -> VecCollection { self.threshold_semigroup(move |key, new, old| { let mut new = thresh(key, new); if let Some(old) = old { @@ -69,7 +69,7 @@ pub trait ThresholdTotal, K: Exchang /// .distinct_total(); /// }); /// ``` - fn distinct_total(&self) -> VecCollection { + fn distinct_total(self) -> VecCollection { self.distinct_total_core() } @@ -78,7 +78,7 @@ pub trait ThresholdTotal, K: Exchang /// This method allows `distinct` to produce collections whose difference /// type is something other than an `isize` integer, for example perhaps an /// `i32`. - fn distinct_total_core+'static>(&self) -> VecCollection { + fn distinct_total_core+'static>(self) -> VecCollection { self.threshold_total(|_,_| R2::from(1i8)) } @@ -88,7 +88,7 @@ impl ThresholdTot where G: Scope, { - fn threshold_semigroup(&self, thresh: F) -> VecCollection + fn threshold_semigroup(self, thresh: F) -> VecCollection where R2: Semigroup+'static, F: FnMut(&K,&R,Option<&R>)->Option+'static, @@ -109,7 +109,7 @@ where >+Clone+'static, K: ExchangeData, { - fn threshold_semigroup(&self, mut thresh: F) -> VecCollection + fn threshold_semigroup(self, mut thresh: F) -> VecCollection where R2: Semigroup+'static, F: for<'a> FnMut(T1::Key<'a>,&T1::Diff,Option<&T1::Diff>)->Option+'static, @@ -135,7 +135,7 @@ where let mut cap = None; input.for_each(|capability, batches| { if cap.is_none() { // NB: Assumes batches are in-order - cap = Some(capability.retain()); + cap = Some(capability.retain(0)); } for batch in batches.drain(..) { upper_limit.clone_from(batch.upper()); // NB: Assumes batches are in-order diff --git a/differential-dataflow/src/trace/implementations/merge_batcher.rs b/differential-dataflow/src/trace/implementations/merge_batcher.rs index 3ef79f2df..5c932af9f 100644 --- a/differential-dataflow/src/trace/implementations/merge_batcher.rs +++ b/differential-dataflow/src/trace/implementations/merge_batcher.rs @@ -239,7 +239,7 @@ pub mod container { use std::marker::PhantomData; use timely::container::{PushInto, SizableContainer}; use timely::progress::frontier::{Antichain, AntichainRef}; - use timely::{Accountable, Data, PartialOrder}; + use timely::{Accountable, PartialOrder}; use timely::container::DrainContainer; use crate::trace::implementations::merge_batcher::Merger; @@ -325,7 +325,7 @@ pub mod container { impl Merger for ContainerMerger where - for<'a> MC: MergerChunk + Clone + PushInto<::Item<'a>> + 'static, + for<'a> MC: MergerChunk + Clone + PushInto<::Item<'a>> + 'static, CQ: ContainerQueue, { type Time = MC::TimeOwned;