From 7201f7a3d6d93190fcabcb035f1101f5931c4349 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Mon, 23 Feb 2026 09:49:20 +0100 Subject: [PATCH] probe: Return input stream The `probe` function returns the input stream it was called on. This avoids the need for the user to clone the stream if they want to create and attach a probe. The function now returns the handle and stream. Signed-off-by: Moritz Hoffmann --- mdbook/src/chapter_0/chapter_0_1.md | 1 + mdbook/src/chapter_1/chapter_1_1.md | 1 + mdbook/src/chapter_1/chapter_1_3.md | 1 + timely/examples/event_driven.rs | 2 +- timely/examples/exchange.rs | 1 + timely/src/dataflow/operators/core/probe.rs | 16 +++++++++------- timely/src/synchronization/barrier.rs | 3 +-- timely/src/worker.rs | 2 ++ timely/tests/gh_523.rs | 1 + 9 files changed, 18 insertions(+), 10 deletions(-) diff --git a/mdbook/src/chapter_0/chapter_0_1.md b/mdbook/src/chapter_0/chapter_0_1.md index 2858d9780..fba86fd1c 100644 --- a/mdbook/src/chapter_0/chapter_0_1.md +++ b/mdbook/src/chapter_0/chapter_0_1.md @@ -22,6 +22,7 @@ timely::execute_from_args(std::env::args(), |worker| { .exchange(|x| *x) .inspect(move |x| println!("worker {}:\thello {}", index, x)) .probe() + .0 ); // introduce data and watch! diff --git a/mdbook/src/chapter_1/chapter_1_1.md b/mdbook/src/chapter_1/chapter_1_1.md index 9d5c75381..196a956fa 100644 --- a/mdbook/src/chapter_1/chapter_1_1.md +++ b/mdbook/src/chapter_1/chapter_1_1.md @@ -28,6 +28,7 @@ fn main() { .exchange(|x| *x) .inspect(move |x| println!("worker {}:\thello {}", index, x)) .probe() + .0 ); // introduce data and watch! diff --git a/mdbook/src/chapter_1/chapter_1_3.md b/mdbook/src/chapter_1/chapter_1_3.md index 0d7107ec7..a7f24a1e4 100644 --- a/mdbook/src/chapter_1/chapter_1_3.md +++ b/mdbook/src/chapter_1/chapter_1_3.md @@ -23,6 +23,7 @@ fn main() { .exchange(|x| *x) .inspect(move |x| println!("worker {}:\thello {}", index, x)) .probe() + .0 ); // introduce data and watch! diff --git a/timely/examples/event_driven.rs b/timely/examples/event_driven.rs index a8956c253..4eb6c7298 100644 --- a/timely/examples/event_driven.rs +++ b/timely/examples/event_driven.rs @@ -24,7 +24,7 @@ fn main() { for _step in 0 .. length { stream = stream.map(|x: ()| x); } - let probe = stream.probe(); + let (probe, _stream) = stream.probe(); inputs.push(input); probes.push(probe); }); diff --git a/timely/examples/exchange.rs b/timely/examples/exchange.rs index 895fc4c34..c31d2709f 100644 --- a/timely/examples/exchange.rs +++ b/timely/examples/exchange.rs @@ -15,6 +15,7 @@ fn main() { .input_from(&mut input) .exchange(|&x| x as u64) .probe() + .0 ); diff --git a/timely/src/dataflow/operators/core/probe.rs b/timely/src/dataflow/operators/core/probe.rs index 7625e390c..f3e22a7d3 100644 --- a/timely/src/dataflow/operators/core/probe.rs +++ b/timely/src/dataflow/operators/core/probe.rs @@ -19,6 +19,8 @@ use crate::dataflow::channels::Message; pub trait Probe { /// Constructs a progress probe which indicates which timestamps have elapsed at the operator. /// + /// Returns a tuple of a probe handle and the input stream. + /// /// # Examples /// ``` /// use timely::*; @@ -31,8 +33,8 @@ pub trait Probe { /// // add an input and base computation off of it /// let (mut input, probe) = worker.dataflow(|scope| { /// let (input, stream) = scope.new_input(); - /// let probe = stream.inspect(|x| println!("hello {:?}", x)) - /// .probe(); + /// let (probe, _) = stream.inspect(|x| println!("hello {:?}", x)) + /// .probe(); /// (input, probe) /// }); /// @@ -44,7 +46,7 @@ pub trait Probe { /// } /// }).unwrap(); /// ``` - fn probe(self) -> Handle; + fn probe(self) -> (Handle, Self); /// Inserts a progress probe in a stream. /// @@ -80,12 +82,12 @@ pub trait Probe { } impl Probe for StreamCore { - fn probe(self) -> Handle { + fn probe(self) -> (Handle, Self) { // the frontier is shared state; scope updates, handle reads. let handle = Handle::::new(); - self.probe_with(&handle); - handle + let stream = self.probe_with(&handle); + (handle, stream) } fn probe_with(self, handle: &Handle) -> StreamCore { @@ -197,7 +199,7 @@ mod tests { // create a new input, and inspect its output let (mut input, probe) = worker.dataflow(move |scope| { let (input, stream) = scope.new_input::(); - (input, stream.probe()) + (input, stream.probe().0) }); // introduce data and watch! diff --git a/timely/src/synchronization/barrier.rs b/timely/src/synchronization/barrier.rs index e7d1fe460..4d51094ed 100644 --- a/timely/src/synchronization/barrier.rs +++ b/timely/src/synchronization/barrier.rs @@ -18,7 +18,7 @@ impl Barrier { use crate::dataflow::operators::{Input, Probe}; let (input, probe) = worker.dataflow(|scope| { let (handle, stream) = scope.new_input::<()>(); - (handle, stream.probe()) + (handle, stream.probe().0) }); Barrier { input, probe, worker: worker.clone() } } @@ -51,4 +51,3 @@ impl Barrier { !self.probe.less_than(self.input.time()) } } - diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 29f40aeda..23eb9e90b 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -448,6 +448,7 @@ impl Worker { /// .to_stream(scope) /// .inspect(|x| println!("{:?}", x)) /// .probe() + /// .0 /// }); /// /// worker.step_while(|| probe.less_than(&0)); @@ -477,6 +478,7 @@ impl Worker { /// .to_stream(scope) /// .inspect(|x| println!("{:?}", x)) /// .probe() + /// .0 /// }); /// /// worker.step_or_park_while(None, || probe.less_than(&0)); diff --git a/timely/tests/gh_523.rs b/timely/tests/gh_523.rs index 589420081..e70dc22da 100644 --- a/timely/tests/gh_523.rs +++ b/timely/tests/gh_523.rs @@ -23,6 +23,7 @@ fn gh_523() { }) .exchange(|x| *x) .probe() + .0 }); for round in 0..2 {