Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
### 4.13.0

* Added `AsyncSeq.ofList` β€” creates an async sequence from an F# list with an optimised direct-enumerator implementation (avoids `IEnumerator<T>` boxing).
* Added `AsyncSeq.ofArray` β€” creates an async sequence from an array with an optimised index-based enumerator (avoids `IEnumerator<T>` boxing).
* Added `AsyncSeq.cycle` β€” infinitely cycles through all elements of a source async sequence; returns empty if the source is empty.

### 4.12.0

* Tests: Added tests for `mapiAsync`, `tryPickAsync`, `pickAsync`, and `groupByAsync` β€” these four async functions previously had no test coverage.
Expand Down
38 changes: 38 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,35 @@
dispose e
| _ -> () }) :> AsyncSeq<'T>

let ofList (source: 'T list) : AsyncSeq<'T> =
AsyncSeqImpl(fun () ->
let mutable remaining = source
{ new IAsyncSeqEnumerator<'T> with
member _.MoveNext() =
async {
match remaining with
| [] -> return None
| h :: t ->
remaining <- t
return Some h
}
member _.Dispose() = () }) :> AsyncSeq<'T>

let ofArray (source: 'T []) : AsyncSeq<'T> =
AsyncSeqImpl(fun () ->
let mutable i = 0
{ new IAsyncSeqEnumerator<'T> with
member _.MoveNext() =
async {
if i < source.Length then
let v = source.[i]
i <- i + 1
return Some v
else
return None
}
member _.Dispose() = () }) :> AsyncSeq<'T>

let appendSeq (seq2: seq<'T>) (source: AsyncSeq<'T>) : AsyncSeq<'T> =
append source (ofSeq seq2)

Expand Down Expand Up @@ -2160,6 +2189,15 @@
let toArraySynchronously (source:AsyncSeq<'T>) = toArrayAsync source |> Async.RunSynchronously
#endif

let cycle (source: AsyncSeq<'T>) : AsyncSeq<'T> =
asyncSeq {
let! arr = source |> toArrayAsync
if arr.Length > 0 then
while true do
for x in arr do
yield x
}

let partitionAsync (predicate: 'T -> Async<bool>) (source: AsyncSeq<'T>) : Async<'T[] * 'T[]> = async {
let trues = ResizeArray<'T>()
let falses = ResizeArray<'T>()
Expand Down Expand Up @@ -2665,7 +2703,7 @@

[<CompilerMessage("The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.", 9999)>]
let groupBy (p:'a -> 'k) (s:AsyncSeq<'a>) : AsyncSeq<'k * AsyncSeq<'a>> =
groupByAsync (p >> async.Return) s

Check warning on line 2706 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.

Check warning on line 2706 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
#endif
#endif

Expand Down
13 changes: 13 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ module AsyncSeq =
/// Creates an async sequence given by evaluating the specified async computation until it returns None.
val replicateUntilNoneAsync : Async<'T option> -> AsyncSeq<'T>

/// Returns an async sequence which infinitely cycles through all elements of the source sequence.
/// The source is materialised into an array on first enumeration. Returns an empty sequence if
/// the source is empty.
val cycle : source:AsyncSeq<'T> -> AsyncSeq<'T>

/// Returns an async sequence which emits an element on a specified period.
val intervalMs : periodMs:int -> AsyncSeq<DateTime>

Expand Down Expand Up @@ -488,6 +493,14 @@ module AsyncSeq =
/// input synchronous sequence and returns them one-by-one.
val ofSeq : source:seq<'T> -> AsyncSeq<'T>

/// Creates an asynchronous sequence that lazily takes elements from an
/// F# list and returns them one-by-one.
val ofList : source:'T list -> AsyncSeq<'T>

/// Creates an asynchronous sequence that lazily takes elements from an
/// array and returns them one-by-one.
val ofArray : source:'T [] -> AsyncSeq<'T>

/// Creates an asynchronous sequence that lazily takes element from an
/// input synchronous sequence of asynchronous computation and returns them one-by-one.
val ofSeqAsync : seq<Async<'T>> -> AsyncSeq<'T>
Expand Down
66 changes: 66 additions & 0 deletions tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2047,7 +2047,7 @@
let actual =
ls
|> AsyncSeq.ofSeq
|> AsyncSeq.groupBy p

Check warning on line 2050 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand All @@ -2056,7 +2056,7 @@
let expected = asyncSeq { raise (exn("test")) }
let actual =
asyncSeq { raise (exn("test")) }
|> AsyncSeq.groupBy (fun i -> i % 3)

Check warning on line 2059 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand Down Expand Up @@ -4407,7 +4407,7 @@
let ``AsyncSeq.groupByAsync groups elements by async projection`` () =
let result =
AsyncSeq.ofSeq [1..6]
|> AsyncSeq.groupByAsync (fun x -> async { return x % 2 })

Check warning on line 4410 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (fun (key, grp) -> async {
let! items = AsyncSeq.toArrayAsync grp
return key, Array.sort items })
Expand All @@ -4420,7 +4420,7 @@
let ``AsyncSeq.groupByAsync on empty sequence returns empty`` () =
let result =
AsyncSeq.empty<int>
|> AsyncSeq.groupByAsync (fun x -> async { return x % 2 })

Check warning on line 4423 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([||], result)
Expand All @@ -4429,10 +4429,76 @@
let ``AsyncSeq.groupByAsync with all-same key produces single group`` () =
let result =
AsyncSeq.ofSeq [1; 2; 3]
|> AsyncSeq.groupByAsync (fun _ -> async { return "same" })

Check warning on line 4432 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (fun (key, grp) -> async {
let! items = AsyncSeq.toArrayAsync grp
return key, Array.sort items })
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([| ("same", [|1;2;3|]) |], result)

// ===== ofList =====

[<Test>]
let ``AsyncSeq.ofList returns elements in order`` () =
let result = AsyncSeq.ofList [1; 2; 3; 4; 5] |> AsyncSeq.toArrayAsync |> Async.RunSynchronously
Assert.AreEqual([| 1; 2; 3; 4; 5 |], result)

[<Test>]
let ``AsyncSeq.ofList on empty list returns empty`` () =
let result = AsyncSeq.ofList ([] : int list) |> AsyncSeq.toArrayAsync |> Async.RunSynchronously
Assert.AreEqual([||], result)

[<Test>]
let ``AsyncSeq.ofList produces same result as ofSeq`` () =
let xs = [10; 20; 30]
let fromList = AsyncSeq.ofList xs |> AsyncSeq.toArrayAsync |> Async.RunSynchronously
let fromSeq = AsyncSeq.ofSeq xs |> AsyncSeq.toArrayAsync |> Async.RunSynchronously
Assert.AreEqual(fromSeq, fromList)

// ===== ofArray =====

[<Test>]
let ``AsyncSeq.ofArray returns elements in order`` () =
let result = AsyncSeq.ofArray [| 10; 20; 30 |] |> AsyncSeq.toArrayAsync |> Async.RunSynchronously
Assert.AreEqual([| 10; 20; 30 |], result)

[<Test>]
let ``AsyncSeq.ofArray on empty array returns empty`` () =
let result = AsyncSeq.ofArray ([||] : int []) |> AsyncSeq.toArrayAsync |> Async.RunSynchronously
Assert.AreEqual([||], result)

[<Test>]
let ``AsyncSeq.ofArray produces same result as ofSeq`` () =
let xs = [| 1; 2; 3; 4 |]
let fromArray = AsyncSeq.ofArray xs |> AsyncSeq.toArrayAsync |> Async.RunSynchronously
let fromSeq = AsyncSeq.ofSeq xs |> AsyncSeq.toArrayAsync |> Async.RunSynchronously
Assert.AreEqual(fromSeq, fromArray)

// ===== cycle =====

[<Test>]
let ``AsyncSeq.cycle repeats elements indefinitely`` () =
let result =
AsyncSeq.cycle (AsyncSeq.ofList [1; 2; 3])
|> AsyncSeq.take 7
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([| 1; 2; 3; 1; 2; 3; 1 |], result)

[<Test>]
let ``AsyncSeq.cycle on empty sequence returns empty`` () =
let result =
AsyncSeq.cycle AsyncSeq.empty<int>
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([||], result)

[<Test>]
let ``AsyncSeq.cycle on singleton repeats single element`` () =
let result =
AsyncSeq.cycle (AsyncSeq.singleton 42)
|> AsyncSeq.take 5
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([| 42; 42; 42; 42; 42 |], result)
Loading