Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TaskSeq.fold and TaskSeq.foldAsync functions #18

Merged
merged 2 commits into from
Oct 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/FSharpy.TaskSeq.Test/FSharpy.TaskSeq.Test.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
<Compile Include="TaskSeq.Iter.Tests.fs" />
<Compile Include="TaskSeq.Map.Tests.fs" />
<Compile Include="TaskSeq.Collect.Tests.fs" />
<Compile Include="TaskSeq.Fold.Tests.fs" />
<Compile Include="TaskSeq.Tests.Utility.fs" />
<Compile Include="TaskSeq.Tests.fs" />
<Compile Include="TaskSeq.PocTests.fs" />
Expand Down
49 changes: 49 additions & 0 deletions src/FSharpy.TaskSeq.Test/TaskSeq.Fold.Tests.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
module FSharpy.TaskSeq.Tests.Fold

open System.Text
open Xunit
open FsUnit.Xunit
open FsToolkit.ErrorHandling

open FSharpy


[<Fact>]
let ``TaskSeq-fold folds with every item`` () = task {
let! alphabet =
createDummyTaskSeqWith 50L<µs> 1000L<µs> 26
|> TaskSeq.fold (fun (state: StringBuilder) item -> state.Append(char item + '@')) (StringBuilder())

alphabet.ToString()
|> should equal "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
}

[<Fact>]
let ``TaskSeq-foldAsync folds with every item`` () = task {
let! alphabet =
createDummyTaskSeqWith 50L<µs> 1000L<µs> 26
|> TaskSeq.foldAsync
(fun (state: StringBuilder) item -> task { return state.Append(char item + '@') })
(StringBuilder())

alphabet.ToString()
|> should equal "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
}

[<Fact>]
let ``TaskSeq-fold takes state on empty IAsyncEnumberable`` () = task {
let! empty =
TaskSeq.empty
|> TaskSeq.fold (fun _ item -> char (item + 64)) '_'

empty |> should equal '_'
}

[<Fact>]
let ``TaskSeq-foldAsync takes state on empty IAsyncEnumerable`` () = task {
let! alphabet =
TaskSeq.empty
|> TaskSeq.foldAsync (fun _ item -> task { return char (item + 64) }) '_'

alphabet |> should equal '_'
}
2 changes: 1 addition & 1 deletion src/FSharpy.TaskSeq.Test/TestUtils.fs
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,4 @@ module TestUtils =
}

/// Create a bunch of dummy tasks, each lasting between 10-30ms with spin-wait delays.
let createDummyTaskSeq = createDummyTaskSeqWith 10_0000L<µs> 30_0000L<µs>
let createDummyTaskSeq = createDummyTaskSeqWith 10_000L<µs> 30_000L<µs>
6 changes: 6 additions & 0 deletions src/FSharpy.TaskSeq/TaskSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,9 @@ module TaskSeq =
/// Zips two task sequences, returning a taskSeq of the tuples of each sequence, in order. May raise ArgumentException
/// if the sequences are or unequal length.
let zip taskSeq1 taskSeq2 = Internal.zip taskSeq1 taskSeq2

/// Applies a function to each element of the task sequence, threading an accumulator argument through the computation.
let fold folder state taskSeq = Internal.fold (FolderAction folder) state taskSeq

/// Applies an async function to each element of the task sequence, threading an accumulator argument through the computation.
let foldAsync folder state taskSeq = Internal.fold (AsyncFolderAction folder) state taskSeq
32 changes: 23 additions & 9 deletions src/FSharpy.TaskSeq/TaskSeqInternal.fs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,15 @@ module ExtraTaskSeqOperators =

[<Struct>]
type Action<'T, 'U, 'TaskU when 'TaskU :> Task<'U>> =
| CountableAction of c_action: (int -> 'T -> 'U)
| SimpleAction of s_action: ('T -> 'U)
| AsyncCountableAction of ac_action: (int -> 'T -> 'TaskU)
| AsyncSimpleAction of as_action: ('T -> 'TaskU)
| CountableAction of countable_action: (int -> 'T -> 'U)
| SimpleAction of simple_action: ('T -> 'U)
| AsyncCountableAction of async_countable_action: (int -> 'T -> 'TaskU)
| AsyncSimpleAction of async_simple_action: ('T -> 'TaskU)

[<Struct>]
type FolderAction<'T, 'State, 'TaskState when 'TaskState :> Task<'State>> =
| FolderAction of state_action: ('State -> 'T -> 'State)
| AsyncFolderAction of async_state_action: ('State -> 'T -> 'TaskState)

module internal TaskSeqInternal =
let iter action (taskSeq: taskSeq<_>) = task {
Expand Down Expand Up @@ -59,17 +64,26 @@ module internal TaskSeqInternal =
go <- step
}

let fold (action: 'State -> 'T -> 'State) initial (taskSeq: taskSeq<_>) = task {
let fold folder initial (taskSeq: taskSeq<_>) = task {
let e = taskSeq.GetAsyncEnumerator(CancellationToken())
let mutable go = true
let mutable result = initial
let! step = e.MoveNextAsync()
go <- step

while go do
result <- action result e.Current
let! step = e.MoveNextAsync()
go <- step
match folder with
| FolderAction folder ->
while go do
result <- folder result e.Current
let! step = e.MoveNextAsync()
go <- step

| AsyncFolderAction folder ->
while go do
let! tempResult = folder result e.Current
result <- tempResult
let! step = e.MoveNextAsync()
go <- step

return result
}
Expand Down