From 50ff048e72e659e05adc3106505ac4a9feba9d5c Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Tue, 24 Feb 2026 22:47:36 -0500 Subject: [PATCH 1/3] Break Variable into (Variable, Collection) --- differential-dataflow/examples/dynamic.rs | 8 ++- differential-dataflow/examples/graspan.rs | 12 +++-- .../examples/iterate_container.rs | 8 +-- differential-dataflow/examples/monoid-bfs.rs | 5 +- differential-dataflow/examples/pagerank.rs | 6 +-- .../src/algorithms/graphs/bijkstra.rs | 24 ++++----- .../src/algorithms/graphs/propagate.rs | 5 +- .../src/operators/iterate.rs | 53 +++++++------------ mdbook/src/chapter_2/chapter_2_7.md | 8 +-- 9 files changed, 55 insertions(+), 74 deletions(-) diff --git a/differential-dataflow/examples/dynamic.rs b/differential-dataflow/examples/dynamic.rs index 3076efc32..a844c28bb 100644 --- a/differential-dataflow/examples/dynamic.rs +++ b/differential-dataflow/examples/dynamic.rs @@ -113,20 +113,18 @@ where // Create a variable for label iteration. let inner = feedback_summary::(1, 1); - let label = Variable::new_from(nodes.clone(), Product { outer: Default::default(), inner }); + let (label_bind, label) = Variable::new_from(nodes.clone(), Product { outer: Default::default(), inner }); let next = label - .collection() .join_map(edges, |_k,l,d| (*d, l+1)) .concat(nodes) .reduce(|_, s, t| t.push((*s[0].0, 1))) ; - label.set(next.clone()); + label_bind.set(next.clone()); // Leave the dynamic iteration, stripping off the last timestamp coordinate. - next - .leave_dynamic(1) + next.leave_dynamic(1) .inspect(|x| println!("{:?}", x)) .leave() }) diff --git a/differential-dataflow/examples/graspan.rs b/differential-dataflow/examples/graspan.rs index f061f6dfe..79c7fd184 100644 --- a/differential-dataflow/examples/graspan.rs +++ b/differential-dataflow/examples/graspan.rs @@ -81,6 +81,7 @@ type Arrange = Arranged::Tim /// completely defined, in support of recursively defined productions. 
pub struct EdgeVariable> { variable: VecVariable, + collection: VecCollection, current: VecCollection, forward: Option>, reverse: Option>, @@ -89,9 +90,10 @@ pub struct EdgeVariable> { impl> EdgeVariable { /// Creates a new variable initialized with `source`. pub fn from(source: VecCollection, step: ::Summary) -> Self { - let variable = VecVariable::new(&mut source.scope(), step); + let (variable, collection) = VecVariable::new(&mut source.scope(), step); EdgeVariable { - variable: variable, + variable, + collection, current: source.clone(), forward: None, reverse: None, @@ -113,14 +115,14 @@ impl> EdgeVariable { /// The collection arranged in the forward direction. pub fn forward(&mut self) -> &Arrange { if self.forward.is_none() { - self.forward = Some(self.variable.collection().arrange_by_key()); + self.forward = Some(self.collection.clone().arrange_by_key()); } self.forward.as_ref().unwrap() } /// The collection arranged in the reverse direction. pub fn reverse(&mut self) -> &Arrange { if self.reverse.is_none() { - self.reverse = Some(self.variable.collection().map(|(x,y)| (y,x)).arrange_by_key()); + self.reverse = Some(self.collection.clone().map(|(x,y)| (y,x)).arrange_by_key()); } self.reverse.as_ref().unwrap() } @@ -169,7 +171,7 @@ impl Query { // create variables and result handles for each named relation. 
for (name, (input, collection)) in input_map { let edge_variable = EdgeVariable::from(collection.enter(subscope), Product::new(Default::default(), 1)); - let trace = edge_variable.variable.collection().leave().arrange_by_self().trace; + let trace = edge_variable.collection.clone().leave().arrange_by_self().trace; result_map.insert(name.clone(), RelationHandles { input, trace }); variable_map.insert(name.clone(), edge_variable); } diff --git a/differential-dataflow/examples/iterate_container.rs b/differential-dataflow/examples/iterate_container.rs index 223382bd0..f7ef3764e 100644 --- a/differential-dataflow/examples/iterate_container.rs +++ b/differential-dataflow/examples/iterate_container.rs @@ -55,8 +55,8 @@ fn main() { scope.iterative::(|nested| { let summary = Product::new(Default::default(), 1); - let variable = Variable::new_from(numbers.enter(nested), summary); - let mapped: Collection<_, _> = variable.collection().inner.unary(Pipeline, "Map", |_,_| { + let (variable, collection) = Variable::new_from(numbers.enter(nested), summary); + let mapped: Collection<_, _> = collection.clone().inner.unary(Pipeline, "Map", |_,_| { |input, output| { input.for_each(|time, data| { let mut session = output.session(&time); @@ -76,8 +76,8 @@ fn main() { } }).as_collection().consolidate(); let result = wrap(result.inner).as_collection(); - variable.set(result) - .leave() + variable.set(result); + collection.leave() }); }) } diff --git a/differential-dataflow/examples/monoid-bfs.rs b/differential-dataflow/examples/monoid-bfs.rs index 8720ff8fb..07d0cee36 100644 --- a/differential-dataflow/examples/monoid-bfs.rs +++ b/differential-dataflow/examples/monoid-bfs.rs @@ -133,14 +133,13 @@ where use differential_dataflow::trace::implementations::{KeySpine, KeyBuilder}; use timely::order::Product; - let variable = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); + let (variable, collection) = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); let 
edges = edges.enter(scope); let roots = roots.enter(scope); let result = - variable - .collection() + collection .map(|n| (n,())) .join_map(edges, |_k,&(),d| *d) .concat(roots) diff --git a/differential-dataflow/examples/pagerank.rs b/differential-dataflow/examples/pagerank.rs index 5c9633191..d779ed36e 100644 --- a/differential-dataflow/examples/pagerank.rs +++ b/differential-dataflow/examples/pagerank.rs @@ -105,11 +105,11 @@ where // Define a recursive variable to track surfers. // We start from `inits` and cycle only `iters`. - let ranks = Variable::new_from(inits, Product::new(Default::default(), 1)); + let (ranks_bind, ranks) = Variable::new_from(inits, Product::new(Default::default(), 1)); // Match each surfer with the degree, scale numbers down. let to_push = - degrs.semijoin(ranks.collection()) + degrs.semijoin(ranks) .threshold(|(_node, degr), rank| (5 * rank) / (6 * degr)) .map(|(node, _degr)| node); @@ -129,7 +129,7 @@ where } // Bind the recursive variable, return its limit. - ranks.set(pushed.clone()); + ranks_bind.set(pushed.clone()); pushed.leave() }) } diff --git a/differential-dataflow/src/algorithms/graphs/bijkstra.rs b/differential-dataflow/src/algorithms/graphs/bijkstra.rs index e4c556d3a..9c61716ef 100644 --- a/differential-dataflow/src/algorithms/graphs/bijkstra.rs +++ b/differential-dataflow/src/algorithms/graphs/bijkstra.rs @@ -55,11 +55,11 @@ where // is a corresponding destination or source that has not yet been reached. 
// forward and reverse (node, (root, dist)) - let forward = Variable::new_from(goals.clone().map(|(x,_)| (x.clone(),(x.clone(),0))).enter(inner), Product::new(Default::default(), 1)); - let reverse = Variable::new_from(goals.clone().map(|(_,y)| (y.clone(),(y.clone(),0))).enter(inner), Product::new(Default::default(), 1)); + let (forward_bind, forward) = Variable::new_from(goals.clone().map(|(x,_)| (x.clone(),(x.clone(),0))).enter(inner), Product::new(Default::default(), 1)); + let (reverse_bind, reverse) = Variable::new_from(goals.clone().map(|(_,y)| (y.clone(),(y.clone(),0))).enter(inner), Product::new(Default::default(), 1)); - forward.collection().map(|_| ()).consolidate().inspect(|x| println!("forward: {:?}", x)); - reverse.collection().map(|_| ()).consolidate().inspect(|x| println!("reverse: {:?}", x)); + forward.clone().map(|_| ()).consolidate().inspect(|x| println!("forward: {:?}", x)); + reverse.clone().map(|_| ()).consolidate().inspect(|x| println!("reverse: {:?}", x)); let goals = goals.enter(inner); // let edges = edges.enter(inner); @@ -71,8 +71,8 @@ where // This is a cyclic join, which should scare us a bunch. 
let reached = forward - .collection() - .join_map(reverse.collection(), |_, (src,d1), (dst,d2)| ((src.clone(), dst.clone()), *d1 + *d2)) + .clone() + .join_map(reverse.clone(), |_, (src,d1), (dst,d2)| ((src.clone(), dst.clone()), *d1 + *d2)) .reduce(|_key, s, t| t.push((*s[0].0, 1))) .semijoin(goals.clone()); @@ -88,37 +88,37 @@ where let forward_active = active.clone().map(|(x,_y)| x).distinct(); let forward_next = forward - .collection() + .clone() .map(|(med, (src, dist))| (src, (med, dist))) .semijoin(forward_active) .map(|(src, (med, dist))| (med, (src, dist))) .join_core(forward_edges, |_med, (src, dist), next| Some((next.clone(), (src.clone(), *dist+1)))) - .concat(forward.collection()) + .concat(forward) .map(|(next, (src, dist))| ((next, src), dist)) .reduce(|_key, s, t| t.push((*s[0].0, 1))) .map(|((next, src), dist)| (next, (src, dist))); forward_next.clone().map(|_| ()).consolidate().inspect(|x| println!("forward_next: {:?}", x)); - forward.set(forward_next); + forward_bind.set(forward_next); // Let's expand out reverse queries that are active. 
let reverse_active = active.map(|(_x,y)| y).distinct(); let reverse_next = reverse - .collection() + .clone() .map(|(med, (rev, dist))| (rev, (med, dist))) .semijoin(reverse_active) .map(|(rev, (med, dist))| (med, (rev, dist))) .join_core(reverse_edges, |_med, (rev, dist), next| Some((next.clone(), (rev.clone(), *dist+1)))) - .concat(reverse.collection()) + .concat(reverse) .map(|(next, (rev, dist))| ((next, rev), dist)) .reduce(|_key, s, t| t.push((*s[0].0, 1))) .map(|((next,rev), dist)| (next, (rev, dist))); reverse_next.clone().map(|_| ()).consolidate().inspect(|x| println!("reverse_next: {:?}", x)); - reverse.set(reverse_next); + reverse_bind.set(reverse_next); reached.leave() }) diff --git a/differential-dataflow/src/algorithms/graphs/propagate.rs b/differential-dataflow/src/algorithms/graphs/propagate.rs index eaff45322..dc88a9bee 100644 --- a/differential-dataflow/src/algorithms/graphs/propagate.rs +++ b/differential-dataflow/src/algorithms/graphs/propagate.rs @@ -87,11 +87,10 @@ where let edges = edges.enter(scope); let nodes = nodes.enter_at(scope, move |r| 256 * (64 - (logic(&r.1)).leading_zeros() as usize)); - let proposals = SemigroupVariable::new(scope, Product::new(Default::default(), 1usize)); + let (proposals_bind, proposals) = SemigroupVariable::new(scope, Product::new(Default::default(), 1usize)); let labels = proposals - .collection() .concat(nodes) .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1_i8)))); @@ -100,7 +99,7 @@ where .clone() .join_core(edges, |_k, l: &L, d| Some((d.clone(), l.clone()))); - proposals.set(propagate); + proposals_bind.set(propagate); labels .as_collection(|k,v| (k.clone(), v.clone())) diff --git a/differential-dataflow/src/operators/iterate.rs b/differential-dataflow/src/operators/iterate.rs index cc2c8471f..00f01c3ac 100644 --- a/differential-dataflow/src/operators/iterate.rs +++ b/differential-dataflow/src/operators/iterate.rs @@ -90,8 +90,8 @@ impl, 
D: Ord+Data+Debug, R: Abelian+'static> Iterat // wrapped by `variable`, but it also results in substantially more // diffs produced; `result` is post-consolidation, and means fewer // records are yielded out of the loop. - let variable = Variable::new_from(self.enter(subgraph), Product::new(Default::default(), 1)); - let result = logic(subgraph.clone(), variable.collection()); + let (variable, collection) = Variable::new_from(self.enter(subgraph), Product::new(Default::default(), 1)); + let result = logic(subgraph.clone(), collection); variable.set(result.clone()); result.leave() }) @@ -110,8 +110,8 @@ impl, D: Ord+Data+Debug, R: Semigroup+'static> Iter // wrapped by `variable`, but it also results in substantially more // diffs produced; `result` is post-consolidation, and means fewer // records are yielded out of the loop. - let variable = SemigroupVariable::new(subgraph, Product::new(Default::default(), 1)); - let result = logic(subgraph.clone(), variable.collection()); + let (variable, collection) = SemigroupVariable::new(subgraph, Product::new(Default::default(), 1)); + let result = logic(subgraph.clone(), collection); variable.set(result.clone()); result.leave() } @@ -142,12 +142,11 @@ impl, D: Ord+Data+Debug, R: Semigroup+'static> Iter /// /// scope.iterative::(|nested| { /// let summary = Product::new(Default::default(), 1); -/// let variable = Variable::new_from(numbers.enter(nested), summary); -/// let result = variable.collection() -/// .map(|x| if x % 2 == 0 { x/2 } else { x }) -/// .consolidate(); -/// variable.set(result) -/// .leave() +/// let (variable, collection) = Variable::new_from(numbers.enter(nested), summary); +/// let result = collection.map(|x| if x % 2 == 0 { x/2 } else { x }) +/// .consolidate(); +/// variable.set(result.clone()); +/// result.leave() /// }); /// }) /// ``` @@ -156,7 +155,6 @@ where G: Scope, C: Container, { - collection: Collection, feedback: Handle, source: Option>, step: ::Summary, @@ -174,24 +172,24 @@ where /// /// 
This method produces a simpler dataflow graph than `new_from`, and should /// be used whenever the variable has an empty input. - pub fn new(scope: &mut G, step: ::Summary) -> Self { + pub fn new(scope: &mut G, step: ::Summary) -> (Self, Collection) { let (feedback, updates) = scope.feedback(step.clone()); let collection = Collection::::new(updates); - Self { collection, feedback, source: None, step } + (Self { feedback, source: None, step }, collection) } /// Creates a new `Variable` from a supplied `source` stream. - pub fn new_from(source: Collection, step: ::Summary) -> Self where C: Clone { + pub fn new_from(source: Collection, step: ::Summary) -> (Self, Collection) where C: Clone { let (feedback, updates) = source.inner.scope().feedback(step.clone()); let collection = Collection::::new(updates).concat(source.clone()); - Variable { collection, feedback, source: Some(source), step } + (Variable { feedback, source: Some(source), step }, collection) } /// Set the definition of the `Variable` to a collection. /// /// This method binds the `Variable` to be equal to the supplied collection, /// which may be recursively defined in terms of the variable itself. - pub fn set(mut self, mut result: Collection) -> Collection where C: Clone { + pub fn set(mut self, mut result: Collection) { if let Some(source) = self.source.take() { result = result.concat(source.negate()); } @@ -207,20 +205,13 @@ where /// /// This behavior can also be achieved by using `new` to create an empty initial /// collection, and then using `self.set(self.concat(result))`. - pub fn set_concat(self, result: Collection) -> Collection { + pub fn set_concat(self, result: Collection) { let step = self.step; result .results_in(step) .inner .connect_loop(self.feedback); - - self.collection } - - /// Acquires a copy of the collection the variable presents as. - /// - /// This is the intended way to use the contents of the `Variable`. 
- pub fn collection(&self) -> Collection where C: Clone { self.collection.clone() } } /// A recursively defined collection that only "grows". @@ -234,7 +225,6 @@ where G: Scope, C: Container, { - collection: Collection, feedback: Handle, step: ::Summary, } @@ -245,25 +235,18 @@ where C: crate::collection::containers::ResultsIn<::Summary>, { /// Creates a new initially empty `SemigroupVariable`. - pub fn new(scope: &mut G, step: ::Summary) -> Self { + pub fn new(scope: &mut G, step: ::Summary) -> (Self, Collection) { let (feedback, updates) = scope.feedback(step.clone()); let collection = Collection::::new(updates); - SemigroupVariable { collection, feedback, step } + (SemigroupVariable { feedback, step }, collection) } /// Adds a new source of data to `self`. - pub fn set(self, result: Collection) -> Collection { + pub fn set(self, result: Collection) { let step = self.step; result .results_in(step) .inner .connect_loop(self.feedback); - - self.collection } - - /// Acquires a copy of the collection the variable presents as. - /// - /// This is the intended way to use the contents of the `SemigroupVariable`. 
- pub fn collection(&self) -> Collection where C: Clone { self.collection.clone() } } diff --git a/mdbook/src/chapter_2/chapter_2_7.md b/mdbook/src/chapter_2/chapter_2_7.md index afa7f8437..def1c4dcd 100644 --- a/mdbook/src/chapter_2/chapter_2_7.md +++ b/mdbook/src/chapter_2/chapter_2_7.md @@ -90,17 +90,17 @@ As an example, the implementation of the `iterate` operator looks something like # use differential_dataflow::VecCollection; # use differential_dataflow::operators::{Iterate, iterate::VecVariable}; # use differential_dataflow::lattice::Lattice; -# fn logic<'a, G: Scope>(variable: &VecVariable, (u64, u64), isize>) -> VecCollection, (u64, u64)> +# fn logic<'a, G: Scope>(collection: VecCollection, (u64, u64), isize>) -> VecCollection, (u64, u64)> # where G::Timestamp: Lattice # { -# (*variable).collection() +# collection # } # fn example<'a, G: Scope>(collection: VecCollection) //, logic: impl Fn(&VecVariable, (u64, u64), isize>) -> VecCollection, (u64, u64)>) # where G::Timestamp: Lattice # { collection.scope().scoped("inner", |subgraph| { - let variable = VecVariable::new_from(collection.enter(subgraph), 1); - let result = logic(&variable); + let (variable, collection) = VecVariable::new_from(collection.enter(subgraph), 1); + let result = logic(collection); variable.set(result.clone()); result.leave() }); From e21b1c3119c847a0ed30d25fef2df87d0ac0ba47 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Tue, 24 Feb 2026 23:07:10 -0500 Subject: [PATCH 2/3] Retire SemigroupVariable --- differential-dataflow/examples/monoid-bfs.rs | 4 +- .../src/algorithms/graphs/propagate.rs | 4 +- .../src/operators/iterate.rs | 63 ++----------------- experiments/src/bin/deals.rs | 6 +- experiments/src/bin/graphs-static.rs | 8 +-- experiments/src/bin/graspan1.rs | 4 +- experiments/src/bin/graspan2.rs | 10 +-- 7 files changed, 24 insertions(+), 75 deletions(-) diff --git a/differential-dataflow/examples/monoid-bfs.rs b/differential-dataflow/examples/monoid-bfs.rs index 
07d0cee36..cce06674e 100644 --- a/differential-dataflow/examples/monoid-bfs.rs +++ b/differential-dataflow/examples/monoid-bfs.rs @@ -129,11 +129,11 @@ where // repeatedly update minimal distances each node can be reached from each root roots.scope().iterative::(|scope| { - use differential_dataflow::operators::iterate::SemigroupVariable; + use differential_dataflow::operators::iterate::Variable; use differential_dataflow::trace::implementations::{KeySpine, KeyBuilder}; use timely::order::Product; - let (variable, collection) = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); + let (variable, collection) = Variable::new(scope, Product::new(Default::default(), 1)); let edges = edges.enter(scope); let roots = roots.enter(scope); diff --git a/differential-dataflow/src/algorithms/graphs/propagate.rs b/differential-dataflow/src/algorithms/graphs/propagate.rs index dc88a9bee..3d5d8c65c 100644 --- a/differential-dataflow/src/algorithms/graphs/propagate.rs +++ b/differential-dataflow/src/algorithms/graphs/propagate.rs @@ -79,7 +79,7 @@ where nodes.scope().iterative::(|scope| { - use crate::operators::iterate::SemigroupVariable; + use crate::operators::iterate::Variable; use crate::trace::implementations::{ValBuilder, ValSpine}; use timely::order::Product; @@ -87,7 +87,7 @@ where let edges = edges.enter(scope); let nodes = nodes.enter_at(scope, move |r| 256 * (64 - (logic(&r.1)).leading_zeros() as usize)); - let (proposals_bind, proposals) = SemigroupVariable::new(scope, Product::new(Default::default(), 1usize)); + let (proposals_bind, proposals) = Variable::new(scope, Product::new(Default::default(), 1usize)); let labels = proposals diff --git a/differential-dataflow/src/operators/iterate.rs b/differential-dataflow/src/operators/iterate.rs index 00f01c3ac..042eae59b 100644 --- a/differential-dataflow/src/operators/iterate.rs +++ b/differential-dataflow/src/operators/iterate.rs @@ -110,7 +110,7 @@ impl, D: Ord+Data+Debug, R: Semigroup+'static> Iter // 
wrapped by `variable`, but it also results in substantially more // diffs produced; `result` is post-consolidation, and means fewer // records are yielded out of the loop. - let (variable, collection) = SemigroupVariable::new(subgraph, Product::new(Default::default(), 1)); + let (variable, collection) = Variable::new(subgraph, Product::new(Default::default(), 1)); let result = logic(subgraph.clone(), collection); variable.set(result.clone()); result.leave() @@ -166,7 +166,7 @@ pub type VecVariable = Variable::Timestam impl Variable where G: Scope, - C: crate::collection::containers::Negate + crate::collection::containers::ResultsIn<::Summary>, + C: crate::collection::containers::ResultsIn<::Summary>, { /// Creates a new initially empty `Variable`. /// @@ -179,10 +179,10 @@ where } /// Creates a new `Variable` from a supplied `source` stream. - pub fn new_from(source: Collection, step: ::Summary) -> (Self, Collection) where C: Clone { + pub fn new_from(source: Collection, step: ::Summary) -> (Self, Collection) where C: Clone + crate::collection::containers::Negate { let (feedback, updates) = source.inner.scope().feedback(step.clone()); let collection = Collection::::new(updates).concat(source.clone()); - (Variable { feedback, source: Some(source), step }, collection) + (Variable { feedback, source: Some(source.negate()), step }, collection) } /// Set the definition of the `Variable` to a collection. @@ -191,61 +191,10 @@ where /// which may be recursively defined in terms of the variable itself. pub fn set(mut self, mut result: Collection) { if let Some(source) = self.source.take() { - result = result.concat(source.negate()); + result = result.concat(source); } - self.set_concat(result) - } - - /// Set the definition of the `Variable` to a collection concatenated to `self`. - /// - /// This method is a specialization of `set` which has the effect of concatenating - /// `result` and `self` before calling `set`. 
This method avoids some dataflow - /// complexity related to retracting the initial input, and will do less work in - /// that case. - /// - /// This behavior can also be achieved by using `new` to create an empty initial - /// collection, and then using `self.set(self.concat(result))`. - pub fn set_concat(self, result: Collection) { - let step = self.step; - result - .results_in(step) - .inner - .connect_loop(self.feedback); - } -} - -/// A recursively defined collection that only "grows". -/// -/// `SemigroupVariable` is a weakening of `Variable` to allow difference types -/// that do not implement `Abelian` and only implement `Semigroup`. This means -/// that it can be used in settings where the difference type does not support -/// negation. -pub struct SemigroupVariable -where - G: Scope, - C: Container, -{ - feedback: Handle, - step: ::Summary, -} - -impl SemigroupVariable -where - G: Scope, - C: crate::collection::containers::ResultsIn<::Summary>, -{ - /// Creates a new initially empty `SemigroupVariable`. - pub fn new(scope: &mut G, step: ::Summary) -> (Self, Collection) { - let (feedback, updates) = scope.feedback(step.clone()); - let collection = Collection::::new(updates); - (SemigroupVariable { feedback, step }, collection) - } - - /// Adds a new source of data to `self`. 
- pub fn set(self, result: Collection) { - let step = self.step; result - .results_in(step) + .results_in(self.step) .inner .connect_loop(self.feedback); } diff --git a/experiments/src/bin/deals.rs b/experiments/src/bin/deals.rs index e2457ebbd..897b3597e 100644 --- a/experiments/src/bin/deals.rs +++ b/experiments/src/bin/deals.rs @@ -10,7 +10,7 @@ use differential_dataflow::trace::implementations::{ValSpine, KeySpine, KeyBatch use differential_dataflow::operators::arrange::TraceAgent; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::operators::arrange::Arrange; -use differential_dataflow::operators::iterate::SemigroupVariable; +use differential_dataflow::operators::iterate::Variable; use differential_dataflow::difference::Present; type EdgeArranged = Arranged::Timestamp, R>>>; @@ -88,7 +88,7 @@ fn tc>(edges: &EdgeArranged) -> V // repeatedly update minimal distances each node can be reached from each root edges.stream.scope().iterative::(|scope| { - let inner = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); + let inner = Variable::new(scope, Product::new(Default::default(), 1)); let edges = edges.enter(&inner.scope()); let result = @@ -115,7 +115,7 @@ fn sg>(edges: &EdgeArranged) -> V // repeatedly update minimal distances each node can be reached from each root peers.scope().iterative::(|scope| { - let inner = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); + let inner = Variable::new(scope, Product::new(Default::default(), 1)); let edges = edges.enter(&inner.scope()); let peers = peers.enter(&inner.scope()); diff --git a/experiments/src/bin/graphs-static.rs b/experiments/src/bin/graphs-static.rs index 3493de8a3..09eab9559 100644 --- a/experiments/src/bin/graphs-static.rs +++ b/experiments/src/bin/graphs-static.rs @@ -6,7 +6,7 @@ use timely::dataflow::operators::ToStream; use differential_dataflow::input::Input; use differential_dataflow::VecCollection; use 
differential_dataflow::operators::*; -use differential_dataflow::operators::iterate::SemigroupVariable; +use differential_dataflow::operators::iterate::Variable; use differential_dataflow::AsCollection; use graph_map::GraphMMap; @@ -118,7 +118,7 @@ fn reach> ( let graph = graph.enter(scope); let roots = roots.enter(scope); - let inner = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); + let inner = Variable::new(scope, Product::new(Default::default(), 1)); let result = graph.join_core(inner.arrange_by_self(), |_src,&dst,&()| Some(dst)) @@ -144,7 +144,7 @@ fn bfs> ( let graph = graph.enter(scope); let roots = roots.enter(scope); - let inner = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); + let inner = Variable::new(scope, Product::new(Default::default(), 1)); let result = graph.join_core(inner.arrange_by_key(), |_src,&dest,&dist| [(dest, dist+1)]) .concat(roots) @@ -176,7 +176,7 @@ fn connected_components>( let reverse = reverse.enter(scope); let nodes = nodes.enter(scope); - let inner = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); + let inner = Variable::new(scope, Product::new(Default::default(), 1)); let labels = inner.arrange_by_key(); let f_prop = labels.join_core(forward, |_k,l,d| Some((*d,*l))); diff --git a/experiments/src/bin/graspan1.rs b/experiments/src/bin/graspan1.rs index be5e5b930..0fdcdb5fb 100644 --- a/experiments/src/bin/graspan1.rs +++ b/experiments/src/bin/graspan1.rs @@ -9,7 +9,7 @@ use differential_dataflow::input::Input; use differential_dataflow::trace::implementations::{ValBatcher, ValBuilder, ValSpine}; use differential_dataflow::operators::*; use differential_dataflow::operators::arrange::Arrange; -use differential_dataflow::operators::iterate::SemigroupVariable; +use differential_dataflow::operators::iterate::Variable; type Node = u32; type Time = (); @@ -41,7 +41,7 @@ fn main() { let nodes = nodes.enter(inner).map(|(a,b)| (b,a)); let edges = edges.enter(inner); - let labels 
= SemigroupVariable::new(inner, Product::new(Default::default(), 1)); + let labels = Variable::new(inner, Product::new(Default::default(), 1)); let next = labels.join_core(edges, |_b, a, c| Some((*c, *a))) diff --git a/experiments/src/bin/graspan2.rs b/experiments/src/bin/graspan2.rs index 7cd9d0dd7..f36db2316 100644 --- a/experiments/src/bin/graspan2.rs +++ b/experiments/src/bin/graspan2.rs @@ -4,7 +4,7 @@ use std::fs::File; use timely::dataflow::Scope; use timely::order::Product; -use differential_dataflow::operators::iterate::SemigroupVariable; +use differential_dataflow::operators::iterate::Variable; use differential_dataflow::VecCollection; use differential_dataflow::input::Input; @@ -57,8 +57,8 @@ fn unoptimized() { let assignment = assignment.enter(scope); let dereference = dereference.enter(scope); - let value_flow = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); - let memory_alias = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); + let value_flow = Variable::new(scope, Product::new(Default::default(), 1)); + let memory_alias = Variable::new(scope, Product::new(Default::default(), 1)); let value_flow_arranged = value_flow.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); let memory_alias_arranged = memory_alias.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); @@ -182,8 +182,8 @@ fn optimized() { let assignment = assignment.enter(scope); let dereference = dereference.enter(scope); - let value_flow = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); - let memory_alias = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); + let value_flow = Variable::new(scope, Product::new(Default::default(), 1)); + let memory_alias = Variable::new(scope, Product::new(Default::default(), 1)); let value_flow_arranged = value_flow.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); let memory_alias_arranged = memory_alias.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); From 
f62da857a48af8e5080110d3d5f9b3fb53e91fa7 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 25 Feb 2026 13:07:35 -0500 Subject: [PATCH 3/3] Improve and extend documentation --- .../src/operators/iterate.rs | 76 ++++++++++++++++++- 1 file changed, 72 insertions(+), 4 deletions(-) diff --git a/differential-dataflow/src/operators/iterate.rs b/differential-dataflow/src/operators/iterate.rs index 042eae59b..e5486bb6e 100644 --- a/differential-dataflow/src/operators/iterate.rs +++ b/differential-dataflow/src/operators/iterate.rs @@ -150,6 +150,43 @@ impl, D: Ord+Data+Debug, R: Semigroup+'static> Iter /// }); /// }) /// ``` +/// +/// Variables support iterative patterns that can be both more flexible, and more efficient. +/// +/// Mutual recursion is when one defines multiple variables in the same iterative context, +/// and their definitions are not independent. For example, odd numbers and even numbers +/// can be determined from each other, iteratively. +/// ``` +/// use timely::order::Product; +/// use timely::dataflow::Scope; +/// +/// use differential_dataflow::input::Input; +/// use differential_dataflow::operators::iterate::Variable; +/// +/// ::timely::example(|scope| { +/// +/// let numbers = scope.new_collection_from(10 .. 
20u32).1; +/// +/// scope.iterative::(|nested| { +/// let summary = Product::new(Default::default(), 1); +/// let (even_v, even) = Variable::new_from(numbers.clone().enter(nested).filter(|x| x % 2 == 0), summary); +/// let (odds_v, odds) = Variable::new_from(numbers.clone().enter(nested).filter(|x| x % 2 == 1), summary); +/// odds_v.set(even.clone().filter(|x| *x > 0).map(|x| x-1).concat(odds.clone()).distinct()); +/// even_v.set(odds.clone().filter(|x| *x > 0).map(|x| x-1).concat(even.clone()).distinct()); +/// }); +/// }) +/// ``` +/// +/// Direct construction can be more efficient than `iterate` when you know a way to directly +/// determine the changes to make to the initial collection, rather than simply adding that +/// collection, running your intended logic, and then subtracting the collection. +/// +/// As an example, the logic in `identifiers.rs` looks for hash collisions, and tweaks the salt +/// for all but one element in each group of collisions. Most elements do not collide, and we +/// don't need to circulate the non-colliding elements to confirm that they subtract away. +/// By iteratively developing a variable of the *edits* to the input, we can produce and circulate +/// a smaller volume of updates. This can be especially impactful when the initial collection is +/// large, and the edits to perform are relatively smaller. pub struct Variable where G: Scope, @@ -168,17 +205,48 @@ where G: Scope, C: crate::collection::containers::ResultsIn<::Summary>, { - /// Creates a new initially empty `Variable`. + /// Creates a new initially empty `Variable` and its associated `Collection`. /// - /// This method produces a simpler dataflow graph than `new_from`, and should - /// be used whenever the variable has an empty input. + /// The collection should be used, along with other potentially recursive collections, + /// to define an output collection to which the variable is then `set`.
+ /// In an iterative context, each collection starts empty and is repeatedly updated by + /// the logic used to produce the collection its variable is bound to. This process + /// continues until no changes occur, at which point we have reached a fixed point (or + /// the range of timestamps has been exhausted). Calling `leave()` on any collection + /// will produce its fixed point in the outer scope. + /// + /// In a non-iterative scope the mechanics are the same, but the interpretation varies. pub fn new(scope: &mut G, step: ::Summary) -> (Self, Collection) { let (feedback, updates) = scope.feedback(step.clone()); let collection = Collection::::new(updates); (Self { feedback, source: None, step }, collection) } - /// Creates a new `Variable` from a supplied `source` stream. + /// Creates a new `Variable` and its associated `Collection`, initially `source`. + /// + /// This method is a short-cut for a pattern that one can write manually with `new()`, + /// but which is easy enough to get wrong that the help is valuable. + /// + /// This pattern uses a variable `x` to develop `x = logic(x + source) - source`, + /// which finds a fixed point `x` that satisfies `x + source = logic(x + source)`. + /// The fixed point equals the repeated application of `logic` to `source`, minus `source`. + /// + /// To implement the pattern one would create a new initially empty variable with `new()`, + /// then concatenate `source` into that collection, and use it as `logic` dictates. + /// Just before the variable is set to the result collection, `source` is subtracted. + /// + /// If using this pattern manually, it is important to bear in mind that the collection + /// that results from `logic` converges to its fixed point, but that once `source` is + /// subtracted the collection converges to this limit minus `source`, a collection that + /// may have records that accumulate to negative multiplicities, and for which the model + /// of them as "data sets" may break down.
Be careful, when applying non-linear operations + /// like `reduce`, that they make sense when updates may have non-positive differences. + /// + /// Finally, implementing this pattern manually makes it possible to more directly implement + /// the logic `x = logic(x + source) - source`. If there is a different mechanism than just + /// adding the source, doing the logic, then subtracting the source, it is appropriate to use it. + /// For example, if the logic modifies a few records it is possible to produce this update + /// directly without using the backstop implementation this method provides. pub fn new_from(source: Collection, step: ::Summary) -> (Self, Collection) where C: Clone + crate::collection::containers::Negate { let (feedback, updates) = source.inner.scope().feedback(step.clone()); let collection = Collection::::new(updates).concat(source.clone());