From 276494b7f7560f6ba3d235da3d563dfb2eaa4e2a Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Mon, 24 Oct 2022 03:41:21 +0200 Subject: [PATCH] Cherry-pick the verbose-logging statements and non-rule-changing refactorings from #42 for smaller diffing --- src/FSharpy.TaskSeq/TaskSeqBuilder.fs | 127 ++++++++++++++++++++------ 1 file changed, 98 insertions(+), 29 deletions(-) diff --git a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs index 250f0ab7..7b835749 100644 --- a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs +++ b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs @@ -17,7 +17,8 @@ open FSharp.Core.CompilerServices.StateMachineHelpers module Internal = // cannot be marked with 'internal' scope let verbose = false - let inline MoveNext (x: byref<'T> when 'T :> IAsyncStateMachine) = x.MoveNext() + /// Call MoveNext on an IAsyncStateMachine by reference + let inline moveNextRef (x: byref<'T> when 'T :> IAsyncStateMachine) = x.MoveNext() // F# requires that we implement interfaces even on an abstract class let inline raiseNotImpl () = @@ -63,17 +64,15 @@ type TaskSeqStateMachineData<'T>() = [] val mutable tailcallTarget: TaskSeq<'T> option - member data.PushDispose(f: unit -> Task) = - match data.disposalStack with - | null -> data.disposalStack <- ResizeArray() - | _ -> () + member data.PushDispose(disposer: unit -> Task) = + if isNull data.disposalStack then + data.disposalStack <- ResizeArray() - data.disposalStack.Add(f) + data.disposalStack.Add disposer member data.PopDispose() = - match data.disposalStack with - | null -> () - | _ -> data.disposalStack.RemoveAt(data.disposalStack.Count - 1) + if not (isNull data.disposalStack) then + data.disposalStack.RemoveAt(data.disposalStack.Count - 1) and [] TaskSeq<'T>() = @@ -117,6 +116,8 @@ and [] TaskSeq<'Machine, 'T match res with | Some tg -> + // we get here only when there are multiple returns (it seems) + // hence the tailcall logic match tg.TailcallTarget with | None -> res | (Some tg2 as res2) -> @@ -153,21 +154,46 @@ and [] TaskSeq<'Machine, 'T member this.GetResult(token: int16) = match this.hijack () with - | Some tg -> (tg :> IValueTaskSource).GetResult(token) - | None -> this.Machine.Data.promiseOfValueOrEnd.GetResult(token) - + | Some tg -> + if verbose then + printfn + "Getting result for token on 'Some' branch, status: %A" + ((tg :> IValueTaskSource).GetStatus(token)) + (tg :> IValueTaskSource).GetResult(token) + | None -> + try + if verbose then + printfn + "Getting result for token on 'None' branch, status: %A" + (this.Machine.Data.promiseOfValueOrEnd.GetStatus(token)) + this.Machine.Data.promiseOfValueOrEnd.GetResult(token) + with e -> + // FYI: an exception here is usually caused by the CE statement (user code) throwing an exception + // We're just logging here because the following error would also be caught right here: + // "An attempt was made to transition a task to a final state when it had already completed." + if verbose then + printfn "Error '%s' for token: %i" e.Message token + + reraise () member this.OnCompleted(continuation, state, token, flags) = match this.hijack () with | Some tg -> (tg :> IValueTaskSource).OnCompleted(continuation, state, token, flags) | None -> this.Machine.Data.promiseOfValueOrEnd.OnCompleted(continuation, state, token, flags) interface IAsyncStateMachine with + /// The MoveNext method is called by builder.MoveNext() in the resumable code member this.MoveNext() = match this.hijack () with - | Some tg -> (tg :> IAsyncStateMachine).MoveNext() - | None -> MoveNext(&this.Machine) + | Some tg -> + // jump to the hijacked method + (tg :> IAsyncStateMachine).MoveNext() + | None -> moveNextRef &this.Machine - member _.SetStateMachine(_state) = () // not needed for reference type + /// SetStatemachine is (currently) never called + member _.SetStateMachine(_state) = + if verbose then + printfn "Setting state machine -- ignored" + () // not needed for reference type interface IAsyncEnumerable<'T> with member this.GetAsyncEnumerator(ct) = @@ -180,11 +206,17 @@ and [] TaskSeq<'Machine, 'T data.taken <- true data.cancellationToken <- ct data.builder <- AsyncIteratorMethodBuilder.Create() + if verbose then + printfn "No cloning, resumption point: %i" this.Machine.ResumptionPoint (this :> IAsyncEnumerator<_>) else if verbose then printfn "GetAsyncEnumerator, cloning..." + // it appears that the issue is possibly caused by the problem + // of having ValueTask all over the place, and by going over the + // iteration twice, we are trying to *await* twice, which is not allowed + // see, for instance: https://itnext.io/why-can-a-valuetask-only-be-awaited-once-31169b324fa4 let clone = this.MemberwiseClone() :?> TaskSeq<'Machine, 'T> data.taken <- true clone.Machine.Data.cancellationToken <- ct @@ -234,13 +266,21 @@ and [] TaskSeq<'Machine, 'T printfn "MoveNextAsync..." if this.Machine.ResumptionPoint = -1 then // can't use as IAsyncEnumerator before IAsyncEnumerable + if verbose then + printfn "at MoveNextAsync: Resumption point = -1" ValueTask() else + if verbose then + printfn "at MoveNextAsync: normal resumption scenario" let data = this.Machine.Data data.promiseOfValueOrEnd.Reset() let mutable ts = this + if verbose then + printfn "at MoveNextAsync: start calling builder.MoveNext()" data.builder.MoveNext(&ts) + if verbose then + printfn "at MoveNextAsync: done calling builder.MoveNext()" // If the move did a hijack then get the result from the final one match this.hijack () with @@ -253,14 +293,25 @@ and [] TaskSeq<'Machine, 'T let version = data.promiseOfValueOrEnd.Version let status = data.promiseOfValueOrEnd.GetStatus(version) - if status = ValueTaskSourceStatus.Succeeded then + match status with + | ValueTaskSourceStatus.Succeeded -> + if verbose then + printfn "at MoveNextAsyncResult: case succeeded..." let result = data.promiseOfValueOrEnd.GetResult(version) ValueTask(result) - else + + | ValueTaskSourceStatus.Faulted + | ValueTaskSourceStatus.Canceled + | ValueTaskSourceStatus.Pending -> if verbose then - printfn "MoveNextAsync pending/faulted/cancelled..." + printfn "at MoveNextAsyncResult: case pending/faulted/cancelled..." ValueTask(this, version) // uses IValueTaskSource<'T> + | _ -> + if verbose then + printfn "at MoveNextAsyncResult: Unexpected state" + // assume it's a possibly new, not yet supported case, treat as default + ValueTask(this, version) // uses IValueTaskSource<'T> override cr.TailcallTarget = cr.hijack () @@ -282,43 +333,57 @@ type TaskSeqBuilder() = //-- RESUMABLE CODE START __resumeAt sm.ResumptionPoint + if verbose then + printfn "Resuming at resumption point %i" sm.ResumptionPoint try - //printfn "at Run.MoveNext start" - //Console.WriteLine("[{0}] resuming by invoking {1}....", sm.MethodBuilder.Task.Id, hashq sm.ResumptionFunc ) + if verbose then + printfn "at Run.MoveNext start" + let __stack_code_fin = code.Invoke(&sm) - //printfn $"at Run.MoveNext, __stack_code_fin={__stack_code_fin}" + if verbose then + printfn $"at Run.MoveNext, __stack_code_fin={__stack_code_fin}" if __stack_code_fin then - //printfn $"at Run.MoveNext, done" + if verbose then + printfn $"at Run.MoveNext, done" sm.Data.promiseOfValueOrEnd.SetResult(false) sm.Data.builder.Complete() elif sm.Data.current.IsSome then - //printfn $"at Run.MoveNext, yield" + if verbose then + printfn $"at Run.MoveNext, yield" sm.Data.promiseOfValueOrEnd.SetResult(true) else // Goto request match sm.Data.tailcallTarget with | Some tg -> - //printfn $"at Run.MoveNext, hijack" + if verbose then + printfn $"at Run.MoveNext, hijack" let mutable tg = tg - MoveNext(&tg) + moveNextRef &tg | None -> - //printfn $"at Run.MoveNext, await" + if verbose then + printfn $"at Run.MoveNext, await" let boxed = sm.Data.boxed sm.Data.awaiter.UnsafeOnCompleted( Action(fun () -> let mutable boxed = boxed - MoveNext(&boxed)) + moveNextRef &boxed) ) with exn -> - //Console.WriteLine("[{0}] SetException {1}", sm.MethodBuilder.Task.Id, exn) + if verbose then + printfn "Setting exception of PromiseOfValueOrEnd to: %s" exn.Message sm.Data.promiseOfValueOrEnd.SetException(exn) sm.Data.builder.Complete() //-- RESUMABLE CODE END )) - (SetStateMachineMethodImpl<_>(fun sm state -> ())) + (SetStateMachineMethodImpl<_>(fun sm state -> + if verbose then + printfn "at SetStatemachingMethodImpl, ignored" + ())) (AfterCode<_, _>(fun sm -> + if verbose then + printfn "at AfterCode<_, _>, setting the Machine field to the StateMachine" let ts = TaskSeq, 'T>() ts.Machine <- sm ts.Machine.Data <- TaskSeqStateMachineData() @@ -355,9 +420,13 @@ type TaskSeqBuilder() = let __stack_vtask = condition () if __stack_vtask.IsCompleted then + if verbose then + printfn "Returning completed task (in while)" __stack_condition_fin <- true condition_res <- __stack_vtask.Result else + if verbose then + printfn "Awaiting non-completed task (in while)" let task = __stack_vtask.AsTask() let mutable awaiter = task.GetAwaiter() // This will yield with __stack_fin = false