From acf8e978ddc792be3d8fd71f03b840d53fa406a7 Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Sat, 29 Oct 2022 02:41:34 +0200 Subject: [PATCH 1/4] Fix/improve/clarify tests --- ...askSeq.StateTransitionBug-delayed.Tests.CE.fs | 7 +++++-- src/FSharpy.TaskSeq.Test/TaskSeq.Tests.CE.fs | 16 ++++++++-------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug-delayed.Tests.CE.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug-delayed.Tests.CE.fs index 08ce0641..72e62a80 100644 --- a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug-delayed.Tests.CE.fs +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug-delayed.Tests.CE.fs @@ -60,7 +60,10 @@ let ``CE empty taskSeq, GetAsyncEnumerator multiple times`` variant = task { [] let ``CE empty taskSeq, GetAsyncEnumerator multiple times and then MoveNextAsync`` variant = task { let tskSeq = getEmptyVariant variant - use enumerator = tskSeq.GetAsyncEnumerator() + use _ = tskSeq.GetAsyncEnumerator() + use _ = tskSeq.GetAsyncEnumerator() + use _ = tskSeq.GetAsyncEnumerator() + use _ = tskSeq.GetAsyncEnumerator() use enumerator = tskSeq.GetAsyncEnumerator() do! moveNextAndCheck false enumerator } @@ -75,7 +78,7 @@ let ``CE empty taskSeq, GetAsyncEnumerator + MoveNextAsync multiple times`` vari // getting the enumerator again use enumerator2 = tskSeq.GetAsyncEnumerator() do! moveNextAndCheck false enumerator1 // original should still work without raising - do! moveNextAndCheck false enumerator2 // new hone should also work without raising + do! moveNextAndCheck false enumerator2 // new one should also work without raising } // Note: this test used to cause xUnit to crash (#42), please leave it in, no matter how silly it looks diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.Tests.CE.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.Tests.CE.fs index af74e4bf..b821ef6b 100644 --- a/src/FSharpy.TaskSeq.Test/TaskSeq.Tests.CE.fs +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.Tests.CE.fs @@ -57,20 +57,20 @@ let ``CE taskSeq with nested yield!`` () = task { [] let ``CE taskSeq with nested deeply yield! perf test 8521 nested tasks`` () = task { - let control = seq { + let expected = seq { yield! [ 1..10 ] - - // original: yield! Seq.concat <| Seq.init 4251 (fun _ -> [ 1; 2 ]) - //yield! Seq.concat <| Seq.init 120 (fun _ -> [ 1; 2 ]) } let createTasks = createDummyTaskSeqWith 1L<µs> 10L<µs> - // FIXME: it appears that deeply nesting adds to performance degradation, need to benchmark/profile this - // probably cause: since this is *fast* with DirectTask, the reason is likely the way the Task.Delay causes + // + // NOTES: it appears that deeply nesting adds to performance degradation, need to benchmark/profile this + // probable cause: since this is *fast* with DirectTask, the reason is likely the way the Task.Delay causes // *many* subtasks to be delayed, resulting in exponential delay. Reason: max accuracy of Delay is about 15ms (!) - + // // RESOLUTION: seems to have been caused by erratic Task.Delay which has only a 15ms resolution + // + let tskSeq = taskSeq { yield! createTasks 10 @@ -107,7 +107,7 @@ let ``CE taskSeq with nested deeply yield! perf test 8521 nested tasks`` () = ta let! data = tskSeq |> TaskSeq.toListAsync data |> List.length |> should equal 8512 - data |> should equal (List.ofSeq control) + data |> should equal (List.ofSeq expected) // cannot compare seq this way, so, List.ofSeq it is } [] From 214846ee84af00e60248bad511b7479408809dd6 Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Sat, 29 Oct 2022 02:42:48 +0200 Subject: [PATCH 2/4] Remove 'taken' bool, add InitIterator method, remove outer AsyncEnumerator from implementing IAsyncDisposable, which is shouldn't --- src/FSharpy.TaskSeq/TaskSeqBuilder.fs | 196 +++++++++++++++----------- 1 file changed, 116 insertions(+), 80 deletions(-) diff --git a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs index 2ab3c239..29f455b4 100644 --- a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs +++ b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs @@ -58,6 +58,7 @@ type TaskSeqStateMachineData<'T>() = [] val mutable cancellationToken: CancellationToken + /// Keeps track of the objects that need to be disposed off on IAsyncDispose. [] val mutable disposalStack: ResizeArray<(unit -> Task)> @@ -67,21 +68,24 @@ type TaskSeqStateMachineData<'T>() = [] val mutable promiseOfValueOrEnd: ManualResetValueTaskSourceCore + /// Helper struct providing methods for awaiting 'next' in async iteration scenarios. [] val mutable builder: AsyncIteratorMethodBuilder - [] - val mutable taken: bool - + /// Whether or not a full iteration through the IAsyncEnumerator has completed [] val mutable completed: bool + /// Used by the AsyncEnumerator interface to return the Current value when + /// IAsyncEnumerator.Current is called [] val mutable current: ValueOption<'T> + /// A reference to 'self', because otherwise we can't use byref in the resumable code. [] - val mutable boxed: TaskSeq<'T> - // For tailcalls using 'return!' + val mutable boxedSelf: TaskSeq<'T> + + /// If set, used for tailcalls using 'return!', contains the target. [] val mutable tailcallTarget: TaskSeq<'T> option @@ -99,12 +103,11 @@ and [] TaskSeq<'T>() = abstract TailcallTarget: TaskSeq<'T> option abstract MoveNextAsyncResult: unit -> ValueTask + abstract InitIterator: CancellationToken -> IAsyncEnumerator<'T> interface IAsyncEnumerator<'T> with member _.Current = raiseNotImpl () member _.MoveNextAsync() = raiseNotImpl () - - interface IAsyncDisposable with member _.DisposeAsync() = raiseNotImpl () interface IAsyncEnumerable<'T> with @@ -129,22 +132,62 @@ and [] TaskSeq<'Machine, 'T inherit TaskSeq<'T>() let initialThreadId = Environment.CurrentManagedThreadId + /// Shadows the initial machine, just after it is initialized by the F# compiler-generated state. + /// Used on GetAsyncEnumerator, to ensure a clean state, and a ResumptionPoint of 0. [] val mutable _initialMachine: 'Machine + /// Keeps the active state machine. [] val mutable _machine: 'Machine + override this.InitIterator(ct) = + // if this is null, it means it's the first time for this Enumerable to create an Enumerator + // so, to prevent extra allocations, we just return 'self', with the iterator vars set appropriately. + match this._machine.Data :> obj with + | null -> + //if + // (not data.taken + // && initialThreadId = Environment.CurrentManagedThreadId) + let data = TaskSeqStateMachineData() + data.boxedSelf <- this + data.cancellationToken <- ct + data.builder <- AsyncIteratorMethodBuilder.Create() + this._machine.Data <- data + this :> IAsyncEnumerator<_> + + | _ -> + if verbose then + printfn "GetAsyncEnumerator, cloning..." + + // We need to reset state, but only to the "initial machine", resetting the _machine to + // Unchecked.defaultof<_> is wrong, as the compiler uses this to track state. However, + // we do need a zeroed ResumptionPoint, otherwise we would continue after the last iteration + // returning an empty sequence. + // + // Solution: we shadow the initial machine, which we then re-assign here: + let clone = this.MemberwiseClone() :?> TaskSeq<'Machine, 'T> + clone._machine <- clone._initialMachine + clone._machine.Data <- TaskSeqStateMachineData() + clone._machine.Data.cancellationToken <- ct + clone._machine.Data.builder <- AsyncIteratorMethodBuilder.Create() + clone._machine.Data.boxedSelf <- clone + + clone :> System.Collections.Generic.IAsyncEnumerator<'T> + + member internal this.hijack() = let res = this._machine.Data.tailcallTarget match res with | Some tg -> - // we get here only when there are multiple returns (it seems) - // hence the tailcall logic + // We get here only when there are multiple ReturnFroms + // which allows us to do tailcalls. + + // This recurses itself, e.g. tg.TailcallTarget calls this.hijack(). match tg.TailcallTarget with | None -> res - | (Some tg2 as res2) -> + | Some tg2 as res2 -> // Cut out chains of tailcalls this._machine.Data.tailcallTarget <- Some tg2 res2 @@ -211,89 +254,53 @@ and [] TaskSeq<'Machine, 'T /// The MoveNext method is called by builder.MoveNext() in the resumable code member this.MoveNext() = match this.hijack () with + | None -> moveNextRef &this._machine | Some tg -> // jump to the hijacked method (tg :> IAsyncStateMachine).MoveNext() - | None -> moveNextRef &this._machine - /// SetStatemachine is (currently) never called - member _.SetStateMachine(_state) = - if verbose then - printfn "Setting state machine -- ignored" - () // not needed for reference type + /// SetStatemachine is (currently) never called + member _.SetStateMachine(_state) = () // not needed for reference type interface IAsyncEnumerable<'T> with member this.GetAsyncEnumerator(ct) = - let data = this._machine.Data - - if - (not data.taken - && initialThreadId = Environment.CurrentManagedThreadId) - then - let data = this._machine.Data - data.taken <- true - data.cancellationToken <- ct - data.builder <- AsyncIteratorMethodBuilder.Create() - - this :> IAsyncEnumerator<_> - else + + match this._machine.Data :> obj with + | null -> + //if + // (not data.taken + // && initialThreadId = Environment.CurrentManagedThreadId) + this.InitIterator ct + + | _ -> if verbose then printfn "GetAsyncEnumerator, cloning..." - // it appears that the issue is possibly caused by the problem - // of having ValueTask all over the place, and by going over the - // iteration twice, we are trying to *await* twice, which is not allowed - // see, for instance: https://itnext.io/why-can-a-valuetask-only-be-awaited-once-31169b324fa4 + // We need to reset state, but only to the "initial machine", resetting the _machine to + // Unchecked.defaultof<_> is wrong, as the compiler uses this to track state. However, + // we do need a zeroed ResumptionPoint, otherwise we would continue after the last iteration + // returning an empty sequence. + // + // Solution: we shadow the initial machine, which we then re-assign here: let clone = this.MemberwiseClone() :?> TaskSeq<'Machine, 'T> clone._machine <- clone._initialMachine clone._machine.Data <- TaskSeqStateMachineData() clone._machine.Data.cancellationToken <- ct - clone._machine.Data.taken <- true + //clone._machine.Data.taken <- true clone._machine.Data.builder <- AsyncIteratorMethodBuilder.Create() - - //// calling reset causes NRE in IValueTaskSource.GetResult above - //clone.Machine.Data.promiseOfValueOrEnd.Reset() - clone._machine.Data.boxed <- clone - ////clone.Machine.Data.disposalStack <- null // reference type, would otherwise still reference original stack - //////clone.Machine.Data.tailcallTarget <- Some clone // this will lead to an SO exception - //clone.Machine.Data.awaiter <- null - //clone.Machine.Data.current <- ValueNone - //clone.Machine.Data.completed <- false + clone._machine.Data.boxedSelf <- clone clone :> System.Collections.Generic.IAsyncEnumerator<'T> - interface IAsyncDisposable with - member this.DisposeAsync() = - match this.hijack () with - | Some tg -> (tg :> IAsyncDisposable).DisposeAsync() - | None -> - if verbose then - printfn "DisposeAsync..." - - task { - match this._machine.Data.disposalStack with - | null -> () - | _ -> - let mutable exn = None - - for d in Seq.rev this._machine.Data.disposalStack do - try - do! d () - with e -> - if exn.IsNone then - exn <- Some e - - match exn with - | None -> () - | Some e -> raise e - } - |> ValueTask - interface System.Collections.Generic.IAsyncEnumerator<'T> with member this.Current = match this.hijack () with - | Some tg -> (tg :> IAsyncEnumerator<'T>).Current + | Some tg -> + // recurse, but not really: we jump to a different instance of our taskSeq + // in case there's a tail call target. + (tg :> IAsyncEnumerator<'T>).Current + | None -> match this._machine.Data.current with | ValueSome x -> x @@ -306,7 +313,11 @@ and [] TaskSeq<'Machine, 'T member this.MoveNextAsync() = match this.hijack () with - | Some tg -> (tg :> IAsyncEnumerator<'T>).MoveNextAsync() + | Some tg -> + // recurse, but not really: we jump to a different instance of our taskSeq + // in case there's a tail call target. + (tg :> IAsyncEnumerator<'T>).MoveNextAsync() + | None -> if verbose then printfn "MoveNextAsync..." @@ -346,6 +357,33 @@ and [] TaskSeq<'Machine, 'T | Some tg -> tg.MoveNextAsyncResult() | None -> this.MoveNextAsyncResult() + /// Disposes of the IAsyncEnumerator (*not* the IAsyncEnumerable!!!) + member this.DisposeAsync() = + match this.hijack () with + | Some tg -> (tg :> IAsyncDisposable).DisposeAsync() + | None -> + if verbose then + printfn "DisposeAsync..." + + task { + match this._machine.Data.disposalStack with + | null -> () + | _ -> + let mutable exn = None + + for d in Seq.rev this._machine.Data.disposalStack do + try + do! d () + with e -> + if exn.IsNone then + exn <- Some e + + match exn with + | None -> () + | Some e -> raise e + } + |> ValueTask + override this.MoveNextAsyncResult() = let data = this._machine.Data @@ -439,7 +477,9 @@ type TaskSeqBuilder() = if verbose then printfn $"at Run.MoveNext, await" - let boxed = sm.Data.boxed + // don't capture the full object in the next closure (won't work because: byref) + // but only a reference to itself. + let boxed = sm.Data.boxedSelf sm.Data.awaiter.UnsafeOnCompleted( Action(fun () -> @@ -455,11 +495,7 @@ type TaskSeqBuilder() = sm.Data.builder.Complete() //-- RESUMABLE CODE END )) - (SetStateMachineMethodImpl<_>(fun sm state -> - if verbose then - printfn "at SetStatemachingMethodImpl, ignored" - - ())) + (SetStateMachineMethodImpl<_>(fun sm state -> ())) // not used in reference impl (AfterCode<_, _>(fun sm -> if verbose then printfn "at AfterCode<_, _>, after F# inits the sm, and we can attach extra info" @@ -467,8 +503,6 @@ type TaskSeqBuilder() = let ts = TaskSeq, 'T>() ts._initialMachine <- sm ts._machine <- sm - ts._machine.Data <- TaskSeqStateMachineData() - ts._machine.Data.boxed <- ts ts :> IAsyncEnumerable<'T>)) else failwith "no dynamic implementation as yet" @@ -725,6 +759,8 @@ type TaskSeqBuilder() = TaskSeqCode<_>(fun sm -> match other with | :? TaskSeq<'T> as other -> + // in cases that this is null + other.InitIterator sm.Data.cancellationToken |> ignore sm.Data.tailcallTarget <- Some other sm.Data.awaiter <- null sm.Data.current <- ValueNone From 80c217b98cf150eeebeae38505387f7aebf9ff85 Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Sat, 29 Oct 2022 03:54:48 +0200 Subject: [PATCH 3/4] Improve GetAsyncEnumerator, refactor and split logic, ensure ResultFrom remains stable --- src/FSharpy.TaskSeq/TaskSeqBuilder.fs | 77 ++++++++++----------------- 1 file changed, 29 insertions(+), 48 deletions(-) diff --git a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs index 29f455b4..422c2c2d 100644 --- a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs +++ b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs @@ -103,7 +103,9 @@ and [] TaskSeq<'T>() = abstract TailcallTarget: TaskSeq<'T> option abstract MoveNextAsyncResult: unit -> ValueTask - abstract InitIterator: CancellationToken -> IAsyncEnumerator<'T> + + /// Initializes the machine data on 'self' + abstract InitMachineDataForTailcalls: ct: CancellationToken -> unit interface IAsyncEnumerator<'T> with member _.Current = raiseNotImpl () @@ -141,39 +143,17 @@ and [] TaskSeq<'Machine, 'T [] val mutable _machine: 'Machine - override this.InitIterator(ct) = - // if this is null, it means it's the first time for this Enumerable to create an Enumerator - // so, to prevent extra allocations, we just return 'self', with the iterator vars set appropriately. + override this.InitMachineDataForTailcalls(ct) = match this._machine.Data :> obj with - | null -> - //if - // (not data.taken - // && initialThreadId = Environment.CurrentManagedThreadId) - let data = TaskSeqStateMachineData() - data.boxedSelf <- this - data.cancellationToken <- ct - data.builder <- AsyncIteratorMethodBuilder.Create() - this._machine.Data <- data - this :> IAsyncEnumerator<_> - - | _ -> - if verbose then - printfn "GetAsyncEnumerator, cloning..." - - // We need to reset state, but only to the "initial machine", resetting the _machine to - // Unchecked.defaultof<_> is wrong, as the compiler uses this to track state. However, - // we do need a zeroed ResumptionPoint, otherwise we would continue after the last iteration - // returning an empty sequence. - // - // Solution: we shadow the initial machine, which we then re-assign here: - let clone = this.MemberwiseClone() :?> TaskSeq<'Machine, 'T> - clone._machine <- clone._initialMachine - clone._machine.Data <- TaskSeqStateMachineData() - clone._machine.Data.cancellationToken <- ct - clone._machine.Data.builder <- AsyncIteratorMethodBuilder.Create() - clone._machine.Data.boxedSelf <- clone + | null -> this.InitMachineData(ct, &this._machine) + | _ -> () - clone :> System.Collections.Generic.IAsyncEnumerator<'T> + member this.InitMachineData(ct, machine: 'Machine byref) = + let data = TaskSeqStateMachineData() + data.boxedSelf <- this + data.cancellationToken <- ct + data.builder <- AsyncIteratorMethodBuilder.Create() + machine.Data <- data member internal this.hijack() = @@ -265,13 +245,12 @@ and [] TaskSeq<'Machine, 'T interface IAsyncEnumerable<'T> with member this.GetAsyncEnumerator(ct) = - + // if this is null, it means it's the first time for this Enumerable to create an Enumerator + // so, to prevent extra allocations, we just return 'self', with the iterator vars set appropriately. match this._machine.Data :> obj with - | null -> - //if - // (not data.taken - // && initialThreadId = Environment.CurrentManagedThreadId) - this.InitIterator ct + | null when initialThreadId = Environment.CurrentManagedThreadId -> + this.InitMachineData(ct, &this._machine) + this // just return 'self' here | _ -> if verbose then @@ -283,15 +262,11 @@ and [] TaskSeq<'Machine, 'T // returning an empty sequence. // // Solution: we shadow the initial machine, which we then re-assign here: + // let clone = this.MemberwiseClone() :?> TaskSeq<'Machine, 'T> clone._machine <- clone._initialMachine - clone._machine.Data <- TaskSeqStateMachineData() - clone._machine.Data.cancellationToken <- ct - //clone._machine.Data.taken <- true - clone._machine.Data.builder <- AsyncIteratorMethodBuilder.Create() - clone._machine.Data.boxedSelf <- clone - - clone :> System.Collections.Generic.IAsyncEnumerator<'T> + clone.InitMachineData(ct, &clone._machine) + clone interface System.Collections.Generic.IAsyncEnumerator<'T> with member this.Current = @@ -505,7 +480,8 @@ type TaskSeqBuilder() = ts._machine <- sm ts :> IAsyncEnumerable<'T>)) else - failwith "no dynamic implementation as yet" + NotImplementedException "No dynamic implementation for TaskSeq yet." + |> raise // let initialResumptionFunc = TaskSeqResumptionFunc<'T>(fun sm -> code.Invoke(&sm)) // let resumptionFuncExecutor = TaskSeqResumptionExecutor<'T>(fun sm f -> // // TODO: add exception handling? @@ -760,11 +736,16 @@ type TaskSeqBuilder() = match other with | :? TaskSeq<'T> as other -> // in cases that this is null - other.InitIterator sm.Data.cancellationToken |> ignore + other.InitMachineDataForTailcalls(sm.Data.cancellationToken) + + // set 'self' to point to the 'other', and unset Current sm.Data.tailcallTarget <- Some other sm.Data.awaiter <- null sm.Data.current <- ValueNone + // For tailcalls we return 'false' and re-run from the entry (trampoline) false - | _ -> b.YieldFrom(other).Invoke(&sm)) + | _ -> + // other types of IAsyncEnumerable, just yield + b.YieldFrom(other).Invoke(&sm)) From f8229590abff1d63bd267210580a67f1dd932c33 Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Sat, 29 Oct 2022 03:59:19 +0200 Subject: [PATCH 4/4] Remove dependence on MemberwiseClone. TODO: check if this is approach is more performant --- src/FSharpy.TaskSeq/TaskSeqBuilder.fs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs index 422c2c2d..1b42f7c7 100644 --- a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs +++ b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs @@ -263,8 +263,9 @@ and [] TaskSeq<'Machine, 'T // // Solution: we shadow the initial machine, which we then re-assign here: // - let clone = this.MemberwiseClone() :?> TaskSeq<'Machine, 'T> - clone._machine <- clone._initialMachine + let clone = TaskSeq<'Machine, 'T>() // we used MemberwiseClone, TODO: test difference in perf, but this should be faster + clone._machine <- this._initialMachine + clone._initialMachine <- this._initialMachine // TODO: proof with a test that this is necessary: probably not clone.InitMachineData(ct, &clone._machine) clone