Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve GetAsyncEnumerator, move inheritance of IAsyncDisposable #51

Merged
merged 4 commits into from
Oct 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ let ``CE empty taskSeq, GetAsyncEnumerator multiple times`` variant = task {
[<Theory; InlineData "do"; InlineData "do!"; InlineData "yield! (seq)"; InlineData "yield! (taskseq)">]
let ``CE empty taskSeq, GetAsyncEnumerator multiple times and then MoveNextAsync`` variant = task {
let tskSeq = getEmptyVariant variant
use enumerator = tskSeq.GetAsyncEnumerator()
use _ = tskSeq.GetAsyncEnumerator()
use _ = tskSeq.GetAsyncEnumerator()
use _ = tskSeq.GetAsyncEnumerator()
use _ = tskSeq.GetAsyncEnumerator()
use enumerator = tskSeq.GetAsyncEnumerator()
do! moveNextAndCheck false enumerator
}
Expand All @@ -75,7 +78,7 @@ let ``CE empty taskSeq, GetAsyncEnumerator + MoveNextAsync multiple times`` vari
// getting the enumerator again
use enumerator2 = tskSeq.GetAsyncEnumerator()
do! moveNextAndCheck false enumerator1 // original should still work without raising
do! moveNextAndCheck false enumerator2 // new hone should also work without raising
do! moveNextAndCheck false enumerator2 // new one should also work without raising
}

// Note: this test used to cause xUnit to crash (#42), please leave it in, no matter how silly it looks
Expand Down
16 changes: 8 additions & 8 deletions src/FSharpy.TaskSeq.Test/TaskSeq.Tests.CE.fs
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,20 @@ let ``CE taskSeq with nested yield!`` () = task {

[<Fact>]
let ``CE taskSeq with nested deeply yield! perf test 8521 nested tasks`` () = task {
let control = seq {
let expected = seq {
yield! [ 1..10 ]

// original:
yield! Seq.concat <| Seq.init 4251 (fun _ -> [ 1; 2 ])
//yield! Seq.concat <| Seq.init 120 (fun _ -> [ 1; 2 ])
}

let createTasks = createDummyTaskSeqWith 1L<µs> 10L<µs>
// FIXME: it appears that deeply nesting adds to performance degradation, need to benchmark/profile this
// probably cause: since this is *fast* with DirectTask, the reason is likely the way the Task.Delay causes
//
// NOTES: it appears that deeply nesting adds to performance degradation, need to benchmark/profile this
// probable cause: since this is *fast* with DirectTask, the reason is likely the way the Task.Delay causes
// *many* subtasks to be delayed, resulting in exponential delay. Reason: max accuracy of Delay is about 15ms (!)

//
// RESOLUTION: seems to have been caused by erratic Task.Delay which has only a 15ms resolution
//

let tskSeq = taskSeq {
yield! createTasks 10

Expand Down Expand Up @@ -107,7 +107,7 @@ let ``CE taskSeq with nested deeply yield! perf test 8521 nested tasks`` () = ta

let! data = tskSeq |> TaskSeq.toListAsync
data |> List.length |> should equal 8512
data |> should equal (List.ofSeq control)
data |> should equal (List.ofSeq expected) // cannot compare seq this way, so, List.ofSeq it is
}

[<Fact>]
Expand Down
196 changes: 107 additions & 89 deletions src/FSharpy.TaskSeq/TaskSeqBuilder.fs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type TaskSeqStateMachineData<'T>() =
[<DefaultValue(false)>]
val mutable cancellationToken: CancellationToken

/// Keeps track of the objects that need to be disposed off on IAsyncDispose.
[<DefaultValue(false)>]
val mutable disposalStack: ResizeArray<(unit -> Task)>

Expand All @@ -67,21 +68,24 @@ type TaskSeqStateMachineData<'T>() =
[<DefaultValue(false)>]
val mutable promiseOfValueOrEnd: ManualResetValueTaskSourceCore<bool>

/// Helper struct providing methods for awaiting 'next' in async iteration scenarios.
[<DefaultValue(false)>]
val mutable builder: AsyncIteratorMethodBuilder

[<DefaultValue(false)>]
val mutable taken: bool

/// Whether or not a full iteration through the IAsyncEnumerator has completed
[<DefaultValue(false)>]
val mutable completed: bool

/// Used by the AsyncEnumerator interface to return the Current value when
/// IAsyncEnumerator.Current is called
[<DefaultValue(false)>]
val mutable current: ValueOption<'T>

/// A reference to 'self', because otherwise we can't use byref in the resumable code.
[<DefaultValue(false)>]
val mutable boxed: TaskSeq<'T>
// For tailcalls using 'return!'
val mutable boxedSelf: TaskSeq<'T>

/// If set, used for tailcalls using 'return!', contains the target.
[<DefaultValue(false)>]
val mutable tailcallTarget: TaskSeq<'T> option

Expand All @@ -100,11 +104,12 @@ and [<AbstractClass; NoEquality; NoComparison>] TaskSeq<'T>() =
abstract TailcallTarget: TaskSeq<'T> option
abstract MoveNextAsyncResult: unit -> ValueTask<bool>

/// Initializes the machine data on 'self'
abstract InitMachineDataForTailcalls: ct: CancellationToken -> unit

interface IAsyncEnumerator<'T> with
member _.Current = raiseNotImpl ()
member _.MoveNextAsync() = raiseNotImpl ()

interface IAsyncDisposable with
member _.DisposeAsync() = raiseNotImpl ()

interface IAsyncEnumerable<'T> with
Expand All @@ -129,22 +134,40 @@ and [<NoComparison; NoEquality>] TaskSeq<'Machine, 'T
inherit TaskSeq<'T>()
let initialThreadId = Environment.CurrentManagedThreadId

/// Shadows the initial machine, just after it is initialized by the F# compiler-generated state.
/// Used on GetAsyncEnumerator, to ensure a clean state, and a ResumptionPoint of 0.
[<DefaultValue(false)>]
val mutable _initialMachine: 'Machine

/// Keeps the active state machine.
[<DefaultValue(false)>]
val mutable _machine: 'Machine

override this.InitMachineDataForTailcalls(ct) =
match this._machine.Data :> obj with
| null -> this.InitMachineData(ct, &this._machine)
| _ -> ()

member this.InitMachineData(ct, machine: 'Machine byref) =
let data = TaskSeqStateMachineData()
data.boxedSelf <- this
data.cancellationToken <- ct
data.builder <- AsyncIteratorMethodBuilder.Create()
machine.Data <- data


member internal this.hijack() =
let res = this._machine.Data.tailcallTarget

match res with
| Some tg ->
// we get here only when there are multiple returns (it seems)
// hence the tailcall logic
// We get here only when there are multiple ReturnFroms
// which allows us to do tailcalls.

// This recurses itself, e.g. tg.TailcallTarget calls this.hijack().
match tg.TailcallTarget with
| None -> res
| (Some tg2 as res2) ->
| Some tg2 as res2 ->
// Cut out chains of tailcalls
this._machine.Data.tailcallTarget <- Some tg2
res2
Expand Down Expand Up @@ -211,89 +234,49 @@ and [<NoComparison; NoEquality>] TaskSeq<'Machine, 'T
/// The MoveNext method is called by builder.MoveNext() in the resumable code
member this.MoveNext() =
match this.hijack () with
| None -> moveNextRef &this._machine
| Some tg ->
// jump to the hijacked method
(tg :> IAsyncStateMachine).MoveNext()
| None -> moveNextRef &this._machine

/// SetStatemachine is (currently) never called
member _.SetStateMachine(_state) =
if verbose then
printfn "Setting state machine -- ignored"

() // not needed for reference type
/// SetStatemachine is (currently) never called
member _.SetStateMachine(_state) = () // not needed for reference type

interface IAsyncEnumerable<'T> with
member this.GetAsyncEnumerator(ct) =
let data = this._machine.Data

if
(not data.taken
&& initialThreadId = Environment.CurrentManagedThreadId)
then
let data = this._machine.Data
data.taken <- true
data.cancellationToken <- ct
data.builder <- AsyncIteratorMethodBuilder.Create()

this :> IAsyncEnumerator<_>
else
// if this is null, it means it's the first time for this Enumerable to create an Enumerator
// so, to prevent extra allocations, we just return 'self', with the iterator vars set appropriately.
match this._machine.Data :> obj with
| null when initialThreadId = Environment.CurrentManagedThreadId ->
this.InitMachineData(ct, &this._machine)
this // just return 'self' here

| _ ->
if verbose then
printfn "GetAsyncEnumerator, cloning..."

// it appears that the issue is possibly caused by the problem
// of having ValueTask all over the place, and by going over the
// iteration twice, we are trying to *await* twice, which is not allowed
// see, for instance: https://itnext.io/why-can-a-valuetask-only-be-awaited-once-31169b324fa4
let clone = this.MemberwiseClone() :?> TaskSeq<'Machine, 'T>
clone._machine <- clone._initialMachine
clone._machine.Data <- TaskSeqStateMachineData()
clone._machine.Data.cancellationToken <- ct
clone._machine.Data.taken <- true
clone._machine.Data.builder <- AsyncIteratorMethodBuilder.Create()

//// calling reset causes NRE in IValueTaskSource.GetResult above
//clone.Machine.Data.promiseOfValueOrEnd.Reset()
clone._machine.Data.boxed <- clone
////clone.Machine.Data.disposalStack <- null // reference type, would otherwise still reference original stack
//////clone.Machine.Data.tailcallTarget <- Some clone // this will lead to an SO exception
//clone.Machine.Data.awaiter <- null
//clone.Machine.Data.current <- ValueNone
//clone.Machine.Data.completed <- false

clone :> System.Collections.Generic.IAsyncEnumerator<'T>

interface IAsyncDisposable with
member this.DisposeAsync() =
match this.hijack () with
| Some tg -> (tg :> IAsyncDisposable).DisposeAsync()
| None ->
if verbose then
printfn "DisposeAsync..."

task {
match this._machine.Data.disposalStack with
| null -> ()
| _ ->
let mutable exn = None

for d in Seq.rev this._machine.Data.disposalStack do
try
do! d ()
with e ->
if exn.IsNone then
exn <- Some e

match exn with
| None -> ()
| Some e -> raise e
}
|> ValueTask
// We need to reset state, but only to the "initial machine", resetting the _machine to
// Unchecked.defaultof<_> is wrong, as the compiler uses this to track state. However,
// we do need a zeroed ResumptionPoint, otherwise we would continue after the last iteration
// returning an empty sequence.
//
// Solution: we shadow the initial machine, which we then re-assign here:
//
let clone = TaskSeq<'Machine, 'T>() // we used MemberwiseClone, TODO: test difference in perf, but this should be faster
clone._machine <- this._initialMachine
clone._initialMachine <- this._initialMachine // TODO: proof with a test that this is necessary: probably not
clone.InitMachineData(ct, &clone._machine)
clone

interface System.Collections.Generic.IAsyncEnumerator<'T> with
member this.Current =
match this.hijack () with
| Some tg -> (tg :> IAsyncEnumerator<'T>).Current
| Some tg ->
// recurse, but not really: we jump to a different instance of our taskSeq
// in case there's a tail call target.
(tg :> IAsyncEnumerator<'T>).Current

| None ->
match this._machine.Data.current with
| ValueSome x -> x
Expand All @@ -306,7 +289,11 @@ and [<NoComparison; NoEquality>] TaskSeq<'Machine, 'T

member this.MoveNextAsync() =
match this.hijack () with
| Some tg -> (tg :> IAsyncEnumerator<'T>).MoveNextAsync()
| Some tg ->
// recurse, but not really: we jump to a different instance of our taskSeq
// in case there's a tail call target.
(tg :> IAsyncEnumerator<'T>).MoveNextAsync()

| None ->
if verbose then
printfn "MoveNextAsync..."
Expand Down Expand Up @@ -346,6 +333,33 @@ and [<NoComparison; NoEquality>] TaskSeq<'Machine, 'T
| Some tg -> tg.MoveNextAsyncResult()
| None -> this.MoveNextAsyncResult()

/// Disposes of the IAsyncEnumerator (*not* the IAsyncEnumerable!!!)
member this.DisposeAsync() =
match this.hijack () with
| Some tg -> (tg :> IAsyncDisposable).DisposeAsync()
| None ->
if verbose then
printfn "DisposeAsync..."

task {
match this._machine.Data.disposalStack with
| null -> ()
| _ ->
let mutable exn = None

for d in Seq.rev this._machine.Data.disposalStack do
try
do! d ()
with e ->
if exn.IsNone then
exn <- Some e

match exn with
| None -> ()
| Some e -> raise e
}
|> ValueTask


override this.MoveNextAsyncResult() =
let data = this._machine.Data
Expand Down Expand Up @@ -439,7 +453,9 @@ type TaskSeqBuilder() =
if verbose then
printfn $"at Run.MoveNext, await"

let boxed = sm.Data.boxed
// don't capture the full object in the next closure (won't work because: byref)
// but only a reference to itself.
let boxed = sm.Data.boxedSelf

sm.Data.awaiter.UnsafeOnCompleted(
Action(fun () ->
Expand All @@ -455,23 +471,18 @@ type TaskSeqBuilder() =
sm.Data.builder.Complete()
//-- RESUMABLE CODE END
))
(SetStateMachineMethodImpl<_>(fun sm state ->
if verbose then
printfn "at SetStatemachingMethodImpl, ignored"

()))
(SetStateMachineMethodImpl<_>(fun sm state -> ())) // not used in reference impl
(AfterCode<_, _>(fun sm ->
if verbose then
printfn "at AfterCode<_, _>, after F# inits the sm, and we can attach extra info"

let ts = TaskSeq<TaskSeqStateMachine<'T>, 'T>()
ts._initialMachine <- sm
ts._machine <- sm
ts._machine.Data <- TaskSeqStateMachineData()
ts._machine.Data.boxed <- ts
ts :> IAsyncEnumerable<'T>))
else
failwith "no dynamic implementation as yet"
NotImplementedException "No dynamic implementation for TaskSeq yet."
|> raise
// let initialResumptionFunc = TaskSeqResumptionFunc<'T>(fun sm -> code.Invoke(&sm))
// let resumptionFuncExecutor = TaskSeqResumptionExecutor<'T>(fun sm f ->
// // TODO: add exception handling?
Expand Down Expand Up @@ -725,10 +736,17 @@ type TaskSeqBuilder() =
TaskSeqCode<_>(fun sm ->
match other with
| :? TaskSeq<'T> as other ->
// in cases that this is null
other.InitMachineDataForTailcalls(sm.Data.cancellationToken)

// set 'self' to point to the 'other', and unset Current
sm.Data.tailcallTarget <- Some other
sm.Data.awaiter <- null
sm.Data.current <- ValueNone

// For tailcalls we return 'false' and re-run from the entry (trampoline)
false

| _ -> b.YieldFrom(other).Invoke(&sm))
| _ ->
// other types of IAsyncEnumerable, just yield
b.YieldFrom(other).Invoke(&sm))