Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ resolver = "2"

[workspace.dependencies]
differential-dataflow = { path = "differential-dataflow", default-features = false, version = "0.19.1" }
timely = { version = "0.26", default-features = false }
#timely = { version = "0.26", default-features = false }
columnar = { version = "0.11", default-features = false }
#timely = { path = "../timely-dataflow/timely/", default-features = false }
timely = { path = "../timely-dataflow/timely/", default-features = false }

[profile.release]
opt-level = 3
Expand Down
35 changes: 18 additions & 17 deletions differential-dataflow/src/algorithms/graphs/bijkstra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,24 @@ use crate::operators::iterate::Variable;
/// Goals that cannot reach from the source to the target are relatively expensive, as
/// the entire graph must be explored to confirm this. A graph connectivity pre-filter
/// could be good insurance here.
pub fn bidijkstra<G, N>(edges: &VecCollection<G, (N,N)>, goals: &VecCollection<G, (N,N)>) -> VecCollection<G, ((N,N), u32)>
pub fn bidijkstra<G, N>(edges: VecCollection<G, (N,N)>, goals: VecCollection<G, (N,N)>) -> VecCollection<G, ((N,N), u32)>
where
G: Scope<Timestamp: Lattice+Ord>,
N: ExchangeData+Hash,
{
let forward = edges.arrange_by_key();
let forward = edges.clone().arrange_by_key();
let reverse = edges.map(|(x,y)| (y,x)).arrange_by_key();
bidijkstra_arranged(&forward, &reverse, goals)
bidijkstra_arranged(forward, reverse, goals)
}

use crate::trace::TraceReader;
use crate::operators::arrange::Arranged;

/// Bi-directional Dijkstra search using arranged forward and reverse edge collections.
pub fn bidijkstra_arranged<G, N, Tr>(
forward: &Arranged<G, Tr>,
reverse: &Arranged<G, Tr>,
goals: &VecCollection<G, (N,N)>
forward: Arranged<G, Tr>,
reverse: Arranged<G, Tr>,
goals: VecCollection<G, (N,N)>
) -> VecCollection<G, ((N,N), u32)>
where
G: Scope<Timestamp=Tr::Time>,
Expand Down Expand Up @@ -71,50 +71,51 @@ where
// This is a cyclic join, which should scare us a bunch.
let reached =
forward
.join_map(&reverse, |_, (src,d1), (dst,d2)| ((src.clone(), dst.clone()), *d1 + *d2))
.join_map((&*reverse).clone(), |_, (src,d1), (dst,d2)| ((src.clone(), dst.clone()), *d1 + *d2))
.reduce(|_key, s, t| t.push((*s[0].0, 1)))
.semijoin(&goals);
.semijoin(goals);

let active =
reached
.negate()
.map(|(srcdst,_)| srcdst)
.concat(&goals)
.concat(goals)
.consolidate();

// Let's expand out forward queries that are active.
let forward_active = active.map(|(x,_y)| x).distinct();
let forward_next =
forward
.clone()
.map(|(med, (src, dist))| (src, (med, dist)))
.semijoin(&forward_active)
.semijoin(forward_active)
.map(|(src, (med, dist))| (med, (src, dist)))
.join_core(&forward_edges, |_med, (src, dist), next| Some((next.clone(), (src.clone(), *dist+1))))
.concat(&forward)
.join_core(forward_edges, |_med, (src, dist), next| Some((next.clone(), (src.clone(), *dist+1))))
.concat((&*forward).clone())
.map(|(next, (src, dist))| ((next, src), dist))
.reduce(|_key, s, t| t.push((*s[0].0, 1)))
.map(|((next, src), dist)| (next, (src, dist)));

forward_next.map(|_| ()).consolidate().inspect(|x| println!("forward_next: {:?}", x));

forward.set(&forward_next);
forward.set(forward_next);

// Let's expand out reverse queries that are active.
let reverse_active = active.map(|(_x,y)| y).distinct();
let reverse_next =
reverse
.map(|(med, (rev, dist))| (rev, (med, dist)))
.semijoin(&reverse_active)
.semijoin(reverse_active)
.map(|(rev, (med, dist))| (med, (rev, dist)))
.join_core(&reverse_edges, |_med, (rev, dist), next| Some((next.clone(), (rev.clone(), *dist+1))))
.concat(&reverse)
.join_core(reverse_edges, |_med, (rev, dist), next| Some((next.clone(), (rev.clone(), *dist+1))))
.concat(reverse)
.map(|(next, (rev, dist))| ((next, rev), dist))
.reduce(|_key, s, t| t.push((*s[0].0, 1)))
.map(|((next,rev), dist)| (next, (rev, dist)));

reverse_next.map(|_| ()).consolidate().inspect(|x| println!("reverse_next: {:?}", x));

reverse.set(&reverse_next);
reverse.set(reverse_next);

reached.leave()
})
Expand Down
16 changes: 8 additions & 8 deletions differential-dataflow/src/algorithms/graphs/propagate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::difference::{Abelian, Multiply};
/// This algorithm naively propagates all labels at once, much like standard label propagation.
/// To more carefully control the label propagation, consider `propagate_core` which supports a
/// method to limit the introduction of labels.
pub fn propagate<G, N, L, R>(edges: &VecCollection<G, (N,N), R>, nodes: &VecCollection<G,(N,L),R>) -> VecCollection<G,(N,L),R>
pub fn propagate<G, N, L, R>(edges: VecCollection<G, (N,N), R>, nodes: VecCollection<G,(N,L),R>) -> VecCollection<G,(N,L),R>
where
G: Scope<Timestamp: Lattice+Ord+Hash>,
N: ExchangeData+Hash,
Expand All @@ -22,15 +22,15 @@ where
R: From<i8>,
L: ExchangeData,
{
propagate_core(&edges.arrange_by_key(), nodes, |_label| 0)
propagate_core(edges.arrange_by_key(), nodes, |_label| 0)
}

/// Propagates labels forward, retaining the minimum label.
///
/// This algorithm naively propagates all labels at once, much like standard label propagation.
/// To more carefully control the label propagation, consider `propagate_core` which supports a
/// method to limit the introduction of labels.
pub fn propagate_at<G, N, L, F, R>(edges: &VecCollection<G, (N,N), R>, nodes: &VecCollection<G,(N,L),R>, logic: F) -> VecCollection<G,(N,L),R>
pub fn propagate_at<G, N, L, F, R>(edges: VecCollection<G, (N,N), R>, nodes: VecCollection<G,(N,L),R>, logic: F) -> VecCollection<G,(N,L),R>
where
G: Scope<Timestamp: Lattice+Ord+Hash>,
N: ExchangeData+Hash,
Expand All @@ -40,7 +40,7 @@ where
L: ExchangeData,
F: Fn(&L)->u64+Clone+'static,
{
propagate_core(&edges.arrange_by_key(), nodes, logic)
propagate_core(edges.arrange_by_key(), nodes, logic)
}

use crate::trace::TraceReader;
Expand All @@ -51,7 +51,7 @@ use crate::operators::arrange::arrangement::Arranged;
/// This variant takes a pre-arranged edge collection, to facilitate re-use, and allows
/// a method `logic` to specify the rounds in which we introduce various labels. The output
/// of `logic should be a number in the interval \[0,64\],
pub fn propagate_core<G, N, L, Tr, F, R>(edges: &Arranged<G,Tr>, nodes: &VecCollection<G,(N,L),R>, logic: F) -> VecCollection<G,(N,L),R>
pub fn propagate_core<G, N, L, Tr, F, R>(edges: Arranged<G,Tr>, nodes: VecCollection<G,(N,L),R>, logic: F) -> VecCollection<G,(N,L),R>
where
G: Scope<Timestamp=Tr::Time>,
N: ExchangeData+Hash,
Expand Down Expand Up @@ -91,14 +91,14 @@ where

let labels =
proposals
.concat(&nodes)
.concat(nodes)
.reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1_i8))));

let propagate: VecCollection<_, (N, L), R> =
labels
.join_core(&edges, |_k, l: &L, d| Some((d.clone(), l.clone())));
.join_core(edges, |_k, l: &L, d| Some((d.clone(), l.clone())));

proposals.set(&propagate);
proposals.set(propagate);

labels
.as_collection(|k,v| (k.clone(), v.clone()))
Expand Down
18 changes: 9 additions & 9 deletions differential-dataflow/src/algorithms/graphs/scc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,19 @@ where
R: Multiply<R, Output=R>,
R: From<i8>,
{
graph.iterate(|edges| {
graph.clone().iterate(|edges| {
// keep edges from active edge destinations.
let active =
edges.map(|(_src,dst)| dst)
.threshold(|_,c| if c.is_zero() { R::from(0_i8) } else { R::from(1_i8) });

graph.enter(&edges.scope())
.semijoin(&active)
.semijoin(active)
})
}

/// Returns the subset of edges in the same strongly connected component.
pub fn strongly_connected<G, N, R>(graph: &VecCollection<G, (N,N), R>) -> VecCollection<G, (N,N), R>
pub fn strongly_connected<G, N, R>(graph: VecCollection<G, (N,N), R>) -> VecCollection<G, (N,N), R>
where
G: Scope<Timestamp: Lattice+Ord+Hash>,
N: ExchangeData + Hash,
Expand All @@ -43,12 +43,12 @@ where
{
graph.iterate(|inner| {
let edges = graph.enter(&inner.scope());
let trans = edges.map_in_place(|x| mem::swap(&mut x.0, &mut x.1));
trim_edges(&trim_edges(inner, &edges), &trans)
let trans = edges.clone().map_in_place(|x| mem::swap(&mut x.0, &mut x.1));
trim_edges(trim_edges(inner, edges), trans)
})
}

fn trim_edges<G, N, R>(cycle: &VecCollection<G, (N,N), R>, edges: &VecCollection<G, (N,N), R>)
fn trim_edges<G, N, R>(cycle: VecCollection<G, (N,N), R>, edges: VecCollection<G, (N,N), R>)
-> VecCollection<G, (N,N), R>
where
G: Scope<Timestamp: Lattice+Ord+Hash>,
Expand All @@ -62,10 +62,10 @@ where

// NOTE: With a node -> int function, can be improved by:
// let labels = propagate_at(&cycle, &nodes, |x| *x as u64);
let labels = propagate(cycle, &nodes);
let labels = propagate(cycle, nodes);

edges.join_map(&labels, |e1,e2,l1| (e2.clone(),(e1.clone(),l1.clone())))
.join_map(&labels, |e2,(e1,l1),l2| ((e1.clone(),e2.clone()),(l1.clone(),l2.clone())))
edges.join_map(labels.clone(), |e1,e2,l1| (e2.clone(),(e1.clone(),l1.clone())))
.join_map(labels, |e2,(e1,l1),l2| ((e1.clone(),e2.clone()),(l1.clone(),l2.clone())))
.filter(|(_,(l1,l2))| l1 == l2)
.map(|((x1,x2),_)| (x2,x1))
}
22 changes: 11 additions & 11 deletions differential-dataflow/src/algorithms/graphs/sequential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ use crate::lattice::Lattice;
use crate::operators::*;
use crate::hashable::Hashable;

fn _color<G, N>(edges: &VecCollection<G, (N,N)>) -> VecCollection<G,(N,Option<u32>)>
fn _color<G, N>(edges: VecCollection<G, (N,N)>) -> VecCollection<G,(N,Option<u32>)>
where
G: Scope<Timestamp: Lattice+Ord+Hash>,
N: ExchangeData+Hash,
{
// need some bogus initial values.
let start = edges.map(|(x,_y)| (x,u32::max_value()))
let start = edges.clone().map(|(x,_y)| (x,u32::max_value()))
.distinct();

// repeatedly apply color-picking logic.
sequence(&start, edges, |_node, vals| {
sequence(start, edges, |_node, vals| {

// look for the first absent positive integer.
// start at 1 in case we ever use NonZero<u32>.
Expand All @@ -40,8 +40,8 @@ where
/// fired, and we apply `logic` to the new state of lower neighbors and
/// the old state (input) of higher neighbors.
pub fn sequence<G, N, V, F>(
state: &VecCollection<G, (N,V)>,
edges: &VecCollection<G, (N,N)>,
state: VecCollection<G, (N,V)>,
edges: VecCollection<G, (N,N)>,
logic: F) -> VecCollection<G, (N,Option<V>)>
where
G: Scope<Timestamp: Lattice+Hash+Ord>,
Expand All @@ -60,18 +60,18 @@ where
// .map(|x| (x.0, Some(x.1)));

// break edges into forward and reverse directions.
let forward = edges.filter(|edge| edge.0 < edge.1);
let forward = edges.clone().filter(|edge| edge.0 < edge.1);
let reverse = edges.filter(|edge| edge.0 > edge.1);

// new state goes along forward edges, old state along reverse edges
let new_messages = new_state.join_map(&forward, |_k,v,d| (d.clone(),v.clone()));
let new_messages = new_state.join_map(forward, |_k,v,d| (d.clone(),v.clone()));

let incomplete = new_messages.filter(|x| x.1.is_none()).map(|x| x.0).distinct();
let incomplete = new_messages.clone().filter(|x| x.1.is_none()).map(|x| x.0).distinct();
let new_messages = new_messages.filter(|x| x.1.is_some()).map(|x| (x.0, x.1.unwrap()));

let old_messages = old_state.join_map(&reverse, |_k,v,d| (d.clone(),v.clone()));
let old_messages = old_state.join_map(reverse, |_k,v,d| (d.clone(),v.clone()));

let messages = new_messages.concat(&old_messages).antijoin(&incomplete);
let messages = new_messages.concat(old_messages).antijoin(incomplete.clone());

// // determine who has incoming `None` messages, and suppress all of them.
// let incomplete = new_messages.filter(|x| x.1.is_none()).map(|x| x.0).distinct();
Expand All @@ -81,6 +81,6 @@ where
// .concat(&old_messages) // /-- possibly too clever: None if any inputs None.
// .antijoin(&incomplete)
.reduce(move |k, vs, t| t.push((Some(logic(k,vs)),1)))
.concat(&incomplete.map(|x| (x, None)))
.concat(incomplete.map(|x| (x, None)))
})
}
12 changes: 7 additions & 5 deletions differential-dataflow/src/algorithms/identifiers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ where

use crate::collection::AsCollection;

let init = self.map(|record| (0, record));
let init = self.clone().map(|record| (0, record));
timely::dataflow::operators::generic::operator::empty(&init.scope())
.as_collection()
.iterate(|diff|
Expand All @@ -80,7 +80,7 @@ where
})
.map(|(_hash, pair)| pair)
)
.concat(&init)
.concat(init)
.map(|pair| { let hash = pair.hashed(); (pair.1, hash) })
}
}
Expand Down Expand Up @@ -109,8 +109,10 @@ mod tests {
timely::dataflow::operators::generic::operator::empty(&init.scope())
.as_collection()
.iterate(|diff|
init.enter(&diff.scope())
.concat(&diff)
init
.clone()
.enter(&diff.scope())
.concat(diff)
.map(|(round, num)| ((round + num) / 10, (round, num)))
.reduce(|_hash, input, output| {
println!("Input: {:?}", input);
Expand All @@ -129,7 +131,7 @@ mod tests {
.inspect(|x| println!("{:?}", x))
.map(|(_hash, pair)| pair)
)
.concat(&init)
.concat(init)
.map(|(round, num)| { (num, (round + num) / 10) })
.map(|(_data, id)| id)
.threshold(|_id,cnt| if cnt > &1 { *cnt } else { 0 })
Expand Down
Loading
Loading