From 43f311d10fef38486f4c0b65485819e2dbd70e95 Mon Sep 17 00:00:00 2001
From: Abel Braaksma
Date: Sat, 29 Oct 2022 03:54:48 +0200
Subject: [PATCH] Improve GetAsyncEnumerator, refactor and split logic, ensure ResultFrom remains stable

---
 src/FSharpy.TaskSeq/TaskSeqBuilder.fs | 77 ++++++++++-----------------
 1 file changed, 29 insertions(+), 48 deletions(-)

diff --git a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs
index 29f455b4..422c2c2d 100644
--- a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs
+++ b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs
@@ -103,7 +103,9 @@ and [] TaskSeq<'T>() =
 
     abstract TailcallTarget: TaskSeq<'T> option
     abstract MoveNextAsyncResult: unit -> ValueTask
-    abstract InitIterator: CancellationToken -> IAsyncEnumerator<'T>
+
+    /// Initializes the machine data on 'self'
+    abstract InitMachineDataForTailcalls: ct: CancellationToken -> unit
 
     interface IAsyncEnumerator<'T> with
         member _.Current = raiseNotImpl ()
@@ -141,39 +143,17 @@ and [] TaskSeq<'Machine, 'T
     []
     val mutable _machine: 'Machine
 
-    override this.InitIterator(ct) =
-        // if this is null, it means it's the first time for this Enumerable to create an Enumerator
-        // so, to prevent extra allocations, we just return 'self', with the iterator vars set appropriately.
+    override this.InitMachineDataForTailcalls(ct) =
         match this._machine.Data :> obj with
-        | null ->
-            //if
-            //    (not data.taken
-            //     && initialThreadId = Environment.CurrentManagedThreadId)
-            let data = TaskSeqStateMachineData()
-            data.boxedSelf <- this
-            data.cancellationToken <- ct
-            data.builder <- AsyncIteratorMethodBuilder.Create()
-            this._machine.Data <- data
-            this :> IAsyncEnumerator<_>
-
-        | _ ->
-            if verbose then
-                printfn "GetAsyncEnumerator, cloning..."
-
-            // We need to reset state, but only to the "initial machine", resetting the _machine to
-            // Unchecked.defaultof<_> is wrong, as the compiler uses this to track state. However,
-            // we do need a zeroed ResumptionPoint, otherwise we would continue after the last iteration
-            // returning an empty sequence.
-            //
-            // Solution: we shadow the initial machine, which we then re-assign here:
-            let clone = this.MemberwiseClone() :?> TaskSeq<'Machine, 'T>
-            clone._machine <- clone._initialMachine
-            clone._machine.Data <- TaskSeqStateMachineData()
-            clone._machine.Data.cancellationToken <- ct
-            clone._machine.Data.builder <- AsyncIteratorMethodBuilder.Create()
-            clone._machine.Data.boxedSelf <- clone
+        | null -> this.InitMachineData(ct, &this._machine)
+        | _ -> ()
 
-            clone :> System.Collections.Generic.IAsyncEnumerator<'T>
+    member this.InitMachineData(ct, machine: 'Machine byref) =
+        let data = TaskSeqStateMachineData()
+        data.boxedSelf <- this
+        data.cancellationToken <- ct
+        data.builder <- AsyncIteratorMethodBuilder.Create()
+        machine.Data <- data
 
     member internal this.hijack() =
 
@@ -265,13 +245,12 @@ and [] TaskSeq<'Machine, 'T
 
     interface IAsyncEnumerable<'T> with
         member this.GetAsyncEnumerator(ct) =
-
+            // if this is null, it means it's the first time for this Enumerable to create an Enumerator
+            // so, to prevent extra allocations, we just return 'self', with the iterator vars set appropriately.
             match this._machine.Data :> obj with
-            | null ->
-                //if
-                //    (not data.taken
-                //     && initialThreadId = Environment.CurrentManagedThreadId)
-                this.InitIterator ct
+            | null when initialThreadId = Environment.CurrentManagedThreadId ->
+                this.InitMachineData(ct, &this._machine)
+                this // just return 'self' here
 
             | _ ->
                 if verbose then
@@ -283,15 +262,11 @@ and [] TaskSeq<'Machine, 'T
                 // returning an empty sequence.
                 //
                 // Solution: we shadow the initial machine, which we then re-assign here:
+                //
                 let clone = this.MemberwiseClone() :?> TaskSeq<'Machine, 'T>
                 clone._machine <- clone._initialMachine
-                clone._machine.Data <- TaskSeqStateMachineData()
-                clone._machine.Data.cancellationToken <- ct
-                //clone._machine.Data.taken <- true
-                clone._machine.Data.builder <- AsyncIteratorMethodBuilder.Create()
-                clone._machine.Data.boxedSelf <- clone
-
-                clone :> System.Collections.Generic.IAsyncEnumerator<'T>
+                clone.InitMachineData(ct, &clone._machine)
+                clone
 
     interface System.Collections.Generic.IAsyncEnumerator<'T> with
         member this.Current =
@@ -505,7 +480,8 @@ type TaskSeqBuilder() =
                         ts._machine <- sm
                         ts :> IAsyncEnumerable<'T>))
         else
-            failwith "no dynamic implementation as yet"
+            NotImplementedException "No dynamic implementation for TaskSeq yet."
+            |> raise
         // let initialResumptionFunc = TaskSeqResumptionFunc<'T>(fun sm -> code.Invoke(&sm))
         // let resumptionFuncExecutor = TaskSeqResumptionExecutor<'T>(fun sm f ->
         //     // TODO: add exception handling?
@@ -760,11 +736,16 @@ type TaskSeqBuilder() =
                 match other with
                 | :? TaskSeq<'T> as other ->
                     // in cases that this is null
-                    other.InitIterator sm.Data.cancellationToken |> ignore
+                    other.InitMachineDataForTailcalls(sm.Data.cancellationToken)
+
+                    // set 'self' to point to the 'other', and unset Current
                     sm.Data.tailcallTarget <- Some other
                     sm.Data.awaiter <- null
                     sm.Data.current <- ValueNone
 
+                    // For tailcalls we return 'false' and re-run from the entry (trampoline)
                     false
 
-                | _ -> b.YieldFrom(other).Invoke(&sm))
+                | _ ->
+                    // other types of IAsyncEnumerable, just yield
+                    b.YieldFrom(other).Invoke(&sm))