Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions differential-dataflow/examples/dynamic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,20 +113,18 @@ where

// Create a variable for label iteration.
let inner = feedback_summary::<usize>(1, 1);
let label = Variable::new_from(nodes.clone(), Product { outer: Default::default(), inner });
let (label_bind, label) = Variable::new_from(nodes.clone(), Product { outer: Default::default(), inner });

let next =
label
.collection()
.join_map(edges, |_k,l,d| (*d, l+1))
.concat(nodes)
.reduce(|_, s, t| t.push((*s[0].0, 1)))
;

label.set(next.clone());
label_bind.set(next.clone());
// Leave the dynamic iteration, stripping off the last timestamp coordinate.
next
.leave_dynamic(1)
next.leave_dynamic(1)
.inspect(|x| println!("{:?}", x))
.leave()
})
Expand Down
12 changes: 7 additions & 5 deletions differential-dataflow/examples/graspan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type Arrange<G,K,V,R> = Arranged<G, TraceValHandle<K, V, <G as ScopeParent>::Tim
/// completely defined, in support of recursively defined productions.
pub struct EdgeVariable<G: Scope<Timestamp: Lattice>> {
variable: VecVariable<G, Edge, Diff>,
collection: VecCollection<G, Edge, Diff>,
current: VecCollection<G, Edge, Diff>,
forward: Option<Arrange<G, Node, Node, Diff>>,
reverse: Option<Arrange<G, Node, Node, Diff>>,
Expand All @@ -89,9 +90,10 @@ pub struct EdgeVariable<G: Scope<Timestamp: Lattice>> {
impl<G: Scope<Timestamp: Lattice>> EdgeVariable<G> {
/// Creates a new variable initialized with `source`.
pub fn from(source: VecCollection<G, Edge>, step: <G::Timestamp as Timestamp>::Summary) -> Self {
let variable = VecVariable::new(&mut source.scope(), step);
let (variable, collection) = VecVariable::new(&mut source.scope(), step);
EdgeVariable {
variable: variable,
variable,
collection,
current: source.clone(),
forward: None,
reverse: None,
Expand All @@ -113,14 +115,14 @@ impl<G: Scope<Timestamp: Lattice>> EdgeVariable<G> {
/// The collection arranged in the forward direction.
pub fn forward(&mut self) -> &Arrange<G, Node, Node, Diff> {
if self.forward.is_none() {
self.forward = Some(self.variable.collection().arrange_by_key());
self.forward = Some(self.collection.clone().arrange_by_key());
}
self.forward.as_ref().unwrap()
}
/// The collection arranged in the reverse direction.
pub fn reverse(&mut self) -> &Arrange<G, Node, Node, Diff> {
if self.reverse.is_none() {
self.reverse = Some(self.variable.collection().map(|(x,y)| (y,x)).arrange_by_key());
self.reverse = Some(self.collection.clone().map(|(x,y)| (y,x)).arrange_by_key());
}
self.reverse.as_ref().unwrap()
}
Expand Down Expand Up @@ -169,7 +171,7 @@ impl Query {
// create variables and result handles for each named relation.
for (name, (input, collection)) in input_map {
let edge_variable = EdgeVariable::from(collection.enter(subscope), Product::new(Default::default(), 1));
let trace = edge_variable.variable.collection().leave().arrange_by_self().trace;
let trace = edge_variable.collection.clone().leave().arrange_by_self().trace;
result_map.insert(name.clone(), RelationHandles { input, trace });
variable_map.insert(name.clone(), edge_variable);
}
Expand Down
8 changes: 4 additions & 4 deletions differential-dataflow/examples/iterate_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ fn main() {

scope.iterative::<u64,_,_>(|nested| {
let summary = Product::new(Default::default(), 1);
let variable = Variable::new_from(numbers.enter(nested), summary);
let mapped: Collection<_, _> = variable.collection().inner.unary(Pipeline, "Map", |_,_| {
let (variable, collection) = Variable::new_from(numbers.enter(nested), summary);
let mapped: Collection<_, _> = collection.clone().inner.unary(Pipeline, "Map", |_,_| {
|input, output| {
input.for_each(|time, data| {
let mut session = output.session(&time);
Expand All @@ -76,8 +76,8 @@ fn main() {
}
}).as_collection().consolidate();
let result = wrap(result.inner).as_collection();
variable.set(result)
.leave()
variable.set(result);
collection.leave()
});
})
}
7 changes: 3 additions & 4 deletions differential-dataflow/examples/monoid-bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,18 +129,17 @@ where
// repeatedly update minimal distances each node can be reached from each root
roots.scope().iterative::<u32,_,_>(|scope| {

use differential_dataflow::operators::iterate::SemigroupVariable;
use differential_dataflow::operators::iterate::Variable;
use differential_dataflow::trace::implementations::{KeySpine, KeyBuilder};

use timely::order::Product;
let variable = SemigroupVariable::new(scope, Product::new(Default::default(), 1));
let (variable, collection) = Variable::new(scope, Product::new(Default::default(), 1));

let edges = edges.enter(scope);
let roots = roots.enter(scope);

let result =
variable
.collection()
collection
.map(|n| (n,()))
.join_map(edges, |_k,&(),d| *d)
.concat(roots)
Expand Down
6 changes: 3 additions & 3 deletions differential-dataflow/examples/pagerank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,11 @@ where

// Define a recursive variable to track surfers.
// We start from `inits` and cycle only `iters`.
let ranks = Variable::new_from(inits, Product::new(Default::default(), 1));
let (ranks_bind, ranks) = Variable::new_from(inits, Product::new(Default::default(), 1));

// Match each surfer with the degree, scale numbers down.
let to_push =
degrs.semijoin(ranks.collection())
degrs.semijoin(ranks)
.threshold(|(_node, degr), rank| (5 * rank) / (6 * degr))
.map(|(node, _degr)| node);

Expand All @@ -129,7 +129,7 @@ where
}

// Bind the recursive variable, return its limit.
ranks.set(pushed.clone());
ranks_bind.set(pushed.clone());
pushed.leave()
})
}
24 changes: 12 additions & 12 deletions differential-dataflow/src/algorithms/graphs/bijkstra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ where
// is a corresponding destination or source that has not yet been reached.

// forward and reverse (node, (root, dist))
let forward = Variable::new_from(goals.clone().map(|(x,_)| (x.clone(),(x.clone(),0))).enter(inner), Product::new(Default::default(), 1));
let reverse = Variable::new_from(goals.clone().map(|(_,y)| (y.clone(),(y.clone(),0))).enter(inner), Product::new(Default::default(), 1));
let (forward_bind, forward) = Variable::new_from(goals.clone().map(|(x,_)| (x.clone(),(x.clone(),0))).enter(inner), Product::new(Default::default(), 1));
let (reverse_bind, reverse) = Variable::new_from(goals.clone().map(|(_,y)| (y.clone(),(y.clone(),0))).enter(inner), Product::new(Default::default(), 1));

forward.collection().map(|_| ()).consolidate().inspect(|x| println!("forward: {:?}", x));
reverse.collection().map(|_| ()).consolidate().inspect(|x| println!("reverse: {:?}", x));
forward.clone().map(|_| ()).consolidate().inspect(|x| println!("forward: {:?}", x));
reverse.clone().map(|_| ()).consolidate().inspect(|x| println!("reverse: {:?}", x));

let goals = goals.enter(inner);
// let edges = edges.enter(inner);
Expand All @@ -71,8 +71,8 @@ where
// This is a cyclic join, which should scare us a bunch.
let reached =
forward
.collection()
.join_map(reverse.collection(), |_, (src,d1), (dst,d2)| ((src.clone(), dst.clone()), *d1 + *d2))
.clone()
.join_map(reverse.clone(), |_, (src,d1), (dst,d2)| ((src.clone(), dst.clone()), *d1 + *d2))
.reduce(|_key, s, t| t.push((*s[0].0, 1)))
.semijoin(goals.clone());

Expand All @@ -88,37 +88,37 @@ where
let forward_active = active.clone().map(|(x,_y)| x).distinct();
let forward_next =
forward
.collection()
.clone()
.map(|(med, (src, dist))| (src, (med, dist)))
.semijoin(forward_active)
.map(|(src, (med, dist))| (med, (src, dist)))
.join_core(forward_edges, |_med, (src, dist), next| Some((next.clone(), (src.clone(), *dist+1))))
.concat(forward.collection())
.concat(forward)
.map(|(next, (src, dist))| ((next, src), dist))
.reduce(|_key, s, t| t.push((*s[0].0, 1)))
.map(|((next, src), dist)| (next, (src, dist)));

forward_next.clone().map(|_| ()).consolidate().inspect(|x| println!("forward_next: {:?}", x));

forward.set(forward_next);
forward_bind.set(forward_next);

// Let's expand out reverse queries that are active.
let reverse_active = active.map(|(_x,y)| y).distinct();
let reverse_next =
reverse
.collection()
.clone()
.map(|(med, (rev, dist))| (rev, (med, dist)))
.semijoin(reverse_active)
.map(|(rev, (med, dist))| (med, (rev, dist)))
.join_core(reverse_edges, |_med, (rev, dist), next| Some((next.clone(), (rev.clone(), *dist+1))))
.concat(reverse.collection())
.concat(reverse)
.map(|(next, (rev, dist))| ((next, rev), dist))
.reduce(|_key, s, t| t.push((*s[0].0, 1)))
.map(|((next,rev), dist)| (next, (rev, dist)));

reverse_next.clone().map(|_| ()).consolidate().inspect(|x| println!("reverse_next: {:?}", x));

reverse.set(reverse_next);
reverse_bind.set(reverse_next);

reached.leave()
})
Expand Down
7 changes: 3 additions & 4 deletions differential-dataflow/src/algorithms/graphs/propagate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,18 @@ where

nodes.scope().iterative::<usize,_,_>(|scope| {

use crate::operators::iterate::SemigroupVariable;
use crate::operators::iterate::Variable;
use crate::trace::implementations::{ValBuilder, ValSpine};

use timely::order::Product;

let edges = edges.enter(scope);
let nodes = nodes.enter_at(scope, move |r| 256 * (64 - (logic(&r.1)).leading_zeros() as usize));

let proposals = SemigroupVariable::new(scope, Product::new(Default::default(), 1usize));
let (proposals_bind, proposals) = Variable::new(scope, Product::new(Default::default(), 1usize));

let labels =
proposals
.collection()
.concat(nodes)
.reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1_i8))));

Expand All @@ -100,7 +99,7 @@ where
.clone()
.join_core(edges, |_k, l: &L, d| Some((d.clone(), l.clone())));

proposals.set(propagate);
proposals_bind.set(propagate);

labels
.as_collection(|k,v| (k.clone(), v.clone()))
Expand Down
Loading
Loading