Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
### 4.11.0

* Added `AsyncSeq.tryFindBack` / `findBack` — returns the **last** element in a sequence for which the predicate returns `true`. Mirrors `Seq.tryFindBack` / `Seq.findBack`.
* Added `AsyncSeq.tryFindBackAsync` / `findBackAsync` — async-predicate variants of the above.

### 4.10.0

* Added `AsyncSeq.withCancellation` — returns a new `AsyncSeq` that passes the given `CancellationToken` to `GetAsyncEnumerator`, overriding whatever token would otherwise be supplied. Mirrors `TaskSeq.withCancellation` and is useful when consuming sequences from libraries (e.g. Entity Framework) that accept a cancellation token through `GetAsyncEnumerator`. Part of ongoing design-parity work with FSharp.Control.TaskSeq (see #277).
Expand Down
32 changes: 32 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1311,6 +1311,38 @@
let forallAsync f (source : AsyncSeq<'T>) =
source |> existsAsync (fun v -> async { let! b = f v in return not b }) |> Async.map not

/// Returns the last element for which the given async predicate returns true, or None if no
/// such element exists. The entire sequence is consumed.
let tryFindBackAsync (predicate: 'T -> Async<bool>) (source: AsyncSeq<'T>) : Async<'T option> = async {
use ie = source.GetEnumerator()
let! move = ie.MoveNext()
let mutable b = move
let mutable result = None
while b.IsSome do
let! ok = predicate b.Value
if ok then result <- b
let! next = ie.MoveNext()
b <- next
return result }

/// Returns the last element for which the given predicate returns true, or None if no
/// such element exists. The entire sequence is consumed.
let tryFindBack (predicate: 'T -> bool) (source: AsyncSeq<'T>) : Async<'T option> =
tryFindBackAsync (predicate >> async.Return) source

/// Returns the last element for which the given async predicate returns true.
/// Raises <c>KeyNotFoundException</c> if no such element exists.
let findBackAsync (predicate: 'T -> Async<bool>) (source: AsyncSeq<'T>) : Async<'T> = async {
let! result = tryFindBackAsync predicate source
match result with
| None -> return raise (System.Collections.Generic.KeyNotFoundException("An element satisfying the predicate was not found in the collection."))
| Some v -> return v }

/// Returns the last element for which the given predicate returns true.
/// Raises <c>KeyNotFoundException</c> if no such element exists.
let findBack (predicate: 'T -> bool) (source: AsyncSeq<'T>) : Async<'T> =
findBackAsync (predicate >> async.Return) source

let foldAsync f (state:'State) (source : AsyncSeq<'T>) =
match source with
| :? AsyncSeqOp<'T> as source -> source.FoldAsync f state
Expand Down Expand Up @@ -2450,7 +2482,7 @@

[<CompilerMessage("The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.", 9999)>]
let groupBy (p:'a -> 'k) (s:AsyncSeq<'a>) : AsyncSeq<'k * AsyncSeq<'a>> =
groupByAsync (p >> async.Return) s

Check warning on line 2485 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.

Check warning on line 2485 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
#endif
#endif

Expand Down
16 changes: 16 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,22 @@ module AsyncSeq =
/// Asynchronously determine if the async predicate returns true for all values in the sequence
val forallAsync : predicate:('T -> Async<bool>) -> source:AsyncSeq<'T> -> Async<bool>

/// Returns the last element in the sequence for which the given async predicate returns true,
/// or <c>None</c> if no such element exists. The entire sequence is consumed.
val tryFindBackAsync : predicate:('T -> Async<bool>) -> source:AsyncSeq<'T> -> Async<'T option>

/// Returns the last element in the sequence for which the given predicate returns true,
/// or <c>None</c> if no such element exists. The entire sequence is consumed.
val tryFindBack : predicate:('T -> bool) -> source:AsyncSeq<'T> -> Async<'T option>

/// Returns the last element in the sequence for which the given async predicate returns true.
/// Raises <c>KeyNotFoundException</c> if no such element exists.
val findBackAsync : predicate:('T -> Async<bool>) -> source:AsyncSeq<'T> -> Async<'T>

/// Returns the last element in the sequence for which the given predicate returns true.
/// Raises <c>KeyNotFoundException</c> if no such element exists.
val findBack : predicate:('T -> bool) -> source:AsyncSeq<'T> -> Async<'T>

/// Return an asynchronous sequence which, when iterated, includes an integer indicating the index of each element in the sequence.
val indexed : source:AsyncSeq<'T> -> AsyncSeq<int64 * 'T>

Expand Down
84 changes: 84 additions & 0 deletions tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2000,7 +2000,7 @@
let actual =
ls
|> AsyncSeq.ofSeq
|> AsyncSeq.groupBy p

Check warning on line 2003 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand All @@ -2009,7 +2009,7 @@
let expected = asyncSeq { raise (exn("test")) }
let actual =
asyncSeq { raise (exn("test")) }
|> AsyncSeq.groupBy (fun i -> i % 3)

Check warning on line 2012 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand Down Expand Up @@ -3719,3 +3719,87 @@
|> Async.RunSynchronously
|> ignore)
|> ignore

// ===== tryFindBack / findBack / tryFindBackAsync / findBackAsync =====

[<Test>]
let ``AsyncSeq.tryFindBack returns last matching element`` () =
let result =
AsyncSeq.ofSeq [1; 3; 5; 2; 4; 6; 7]
|> AsyncSeq.tryFindBack (fun x -> x % 2 = 0)
|> Async.RunSynchronously
Assert.AreEqual(Some 6, result)

[<Test>]
let ``AsyncSeq.tryFindBack returns None when no element matches`` () =
let result =
AsyncSeq.ofSeq [1; 3; 5]
|> AsyncSeq.tryFindBack (fun x -> x % 2 = 0)
|> Async.RunSynchronously
Assert.AreEqual(None, result)

[<Test>]
let ``AsyncSeq.tryFindBack returns None for empty sequence`` () =
let result =
AsyncSeq.empty<int>
|> AsyncSeq.tryFindBack (fun _ -> true)
|> Async.RunSynchronously
Assert.AreEqual(None, result)

[<Test>]
let ``AsyncSeq.tryFindBack returns last element when all match`` () =
let result =
AsyncSeq.ofSeq [10; 20; 30]
|> AsyncSeq.tryFindBack (fun _ -> true)
|> Async.RunSynchronously
Assert.AreEqual(Some 30, result)

[<Test>]
let ``AsyncSeq.findBack returns last matching element`` () =
let result =
AsyncSeq.ofSeq [2; 4; 1; 3; 6; 5]
|> AsyncSeq.findBack (fun x -> x % 2 = 0)
|> Async.RunSynchronously
Assert.AreEqual(6, result)

[<Test>]
let ``AsyncSeq.findBack raises KeyNotFoundException when no match`` () =
Assert.Throws<System.Collections.Generic.KeyNotFoundException>(fun () ->
AsyncSeq.ofSeq [1; 3; 5]
|> AsyncSeq.findBack (fun x -> x % 2 = 0)
|> Async.RunSynchronously
|> ignore)
|> ignore

[<Test>]
let ``AsyncSeq.tryFindBackAsync returns last element satisfying async predicate`` () =
let result =
AsyncSeq.ofSeq [1; 2; 3; 4; 5]
|> AsyncSeq.tryFindBackAsync (fun x -> async { return x < 4 })
|> Async.RunSynchronously
Assert.AreEqual(Some 3, result)

[<Test>]
let ``AsyncSeq.tryFindBackAsync returns None when nothing matches`` () =
let result =
AsyncSeq.ofSeq [1; 2; 3]
|> AsyncSeq.tryFindBackAsync (fun x -> async { return x > 99 })
|> Async.RunSynchronously
Assert.AreEqual(None, result)

[<Test>]
let ``AsyncSeq.findBackAsync returns last matching element`` () =
let result =
AsyncSeq.ofSeq [10; 20; 5; 15; 30]
|> AsyncSeq.findBackAsync (fun x -> async { return x > 10 })
|> Async.RunSynchronously
Assert.AreEqual(30, result)

[<Test>]
let ``AsyncSeq.findBackAsync raises KeyNotFoundException when no match`` () =
Assert.Throws<System.Collections.Generic.KeyNotFoundException>(fun () ->
AsyncSeq.ofSeq [1; 2; 3]
|> AsyncSeq.findBackAsync (fun x -> async { return x > 99 })
|> Async.RunSynchronously
|> ignore)
|> ignore
Loading