diff --git a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs index 9d7deedc..c7389115 100644 --- a/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs +++ b/src/FSharpy.TaskSeq.Test/TaskSeq.StateTransitionBug.Tests.CE.fs @@ -99,6 +99,37 @@ let ``CE empty taskSeq, call Current after MoveNextAsync returns false`` variant enumerator.Current |> should equal 0 // we return Unchecked.defaultof, which is Zero in the case of an integer } +[] +let ``CE taskSeq, proper two-item task sequence`` () = task { + let tskSeq = taskSeq { + yield "foo" + yield "bar" + } + + let enum = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheck true enum // first item + enum.Current |> should equal "foo" + do! moveNextAndCheck true enum // second item + enum.Current |> should equal "bar" + do! moveNextAndCheck false enum // third item: false +} + +[] +let ``CE taskSeq, proper two-item task sequence -- async variant`` () = task { + let tskSeq = taskSeq { + yield "foo" + do! delayRandom () + yield "bar" + } + + let enum = tskSeq.GetAsyncEnumerator() + do! moveNextAndCheck true enum // first item + enum.Current |> should equal "foo" + do! moveNextAndCheck true enum // second item + enum.Current |> should equal "bar" + do! moveNextAndCheck false enum // third item: false +} + [] let ``CE taskSeq, call Current before MoveNextAsync`` () = task { let tskSeq = taskSeq { diff --git a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs index 12fa049e..0b25d201 100644 --- a/src/FSharpy.TaskSeq/TaskSeqBuilder.fs +++ b/src/FSharpy.TaskSeq/TaskSeqBuilder.fs @@ -54,6 +54,26 @@ type IPriority2 = [] type TaskSeqStateMachineData<'T>() = + member this.LogDump() = + printfn " CancellationToken: %A" this.cancellationToken + + printfn + " Disposal stack count: %A" + (if isNull this.disposalStack then + 0 + else + this.disposalStack.Count) + + printfn " Awaiter: %A" this.awaiter + + printfn " Promise status: %A" + <| this.promiseOfValueOrEnd.GetStatus(this.promiseOfValueOrEnd.Version) + + printfn " Builder hash: %A" <| this.builder.GetHashCode() + printfn " Taken: %A" this.taken + printfn " Completed: %A" this.completed + printfn " Current: %A" this.current + [] val mutable cancellationToken: CancellationToken @@ -227,18 +247,28 @@ and [] TaskSeq<'Machine, 'T (not data.taken && initialThreadId = Environment.CurrentManagedThreadId) then + //let clone = this.MemberwiseClone() :?> TaskSeq<'Machine, 'T> + let data = this.Machine.Data data.taken <- true data.cancellationToken <- ct data.builder <- AsyncIteratorMethodBuilder.Create() + if verbose then + printfn "All data (no clone):" + data.LogDump() + if verbose then printfn "No cloning, resumption point: %i" this.Machine.ResumptionPoint - (this :> IAsyncEnumerator<_>) + this :> IAsyncEnumerator<_> else if verbose then printfn "GetAsyncEnumerator, cloning..." + if verbose then + printfn "All data before clone:" + data.LogDump() + // it appears that the issue is possibly caused by the problem // of having ValueTask all over the place, and by going over the // iteration twice, we are trying to *await* twice, which is not allowed @@ -259,6 +289,7 @@ and [] TaskSeq<'Machine, 'T // let b = e2.Current // let isTrue = a = b // true with this, false without it clone.Machine <- Unchecked.defaultof<_> + //clone.Machine.ResumptionPoint <- 0 // the following lines just re-initialize the key data fields State. clone.Machine.Data <- TaskSeqStateMachineData() @@ -266,6 +297,11 @@ and [] TaskSeq<'Machine, 'T clone.Machine.Data.taken <- true clone.Machine.Data.builder <- AsyncIteratorMethodBuilder.Create() + if verbose then + printfn "All data after clone:" + clone.Machine.Data.LogDump() + + //// calling reset causes NRE in IValueTaskSource.GetResult above //clone.Machine.Data.promiseOfValueOrEnd.Reset() //clone.Machine.Data.boxed <- clone @@ -281,7 +317,7 @@ and [] TaskSeq<'Machine, 'T this.Machine.ResumptionPoint clone.Machine.ResumptionPoint - (clone :> System.Collections.Generic.IAsyncEnumerator<'T>) + clone :> System.Collections.Generic.IAsyncEnumerator<'T> interface IAsyncDisposable with member this.DisposeAsync() = @@ -501,9 +537,16 @@ type TaskSeqBuilder() = //sm.Start() - member inline _.Zero() : TaskSeqCode<'T> = ResumableCode.Zero() + member inline _.Zero() : TaskSeqCode<'T> = + if verbose then + printfn "at Zero()" + + ResumableCode.Zero() member inline _.Combine(task1: TaskSeqCode<'T>, task2: TaskSeqCode<'T>) : TaskSeqCode<'T> = + if verbose then + printfn "at Combine(.., ..)" + ResumableCode.Combine(task1, task2) member inline _.WhileAsync @@ -521,13 +564,13 @@ type TaskSeqBuilder() = if __stack_vtask.IsCompleted then if verbose then - printfn "Returning completed task (in while)" + printfn "at WhileAsync: returning completed task" __stack_condition_fin <- true condition_res <- __stack_vtask.Result else if verbose then - printfn "Awaiting non-completed task (in while)" + printfn "at WhileAsync: awaiting non-completed task" let task = __stack_vtask.AsTask() let mutable awaiter = task.GetAwaiter() @@ -536,6 +579,11 @@ type TaskSeqBuilder() = let __stack_yield_fin = ResumableCode.Yield().Invoke(&sm) __stack_condition_fin <- __stack_yield_fin + if verbose then + printfn + "at WhileAsync: after Yield().Invoke(sm), __stack_condition_fin=%b" + __stack_condition_fin + if __stack_condition_fin then condition_res <- task.Result else @@ -550,6 +598,9 @@ type TaskSeqBuilder() = ) member inline b.While([] condition: unit -> bool, body: TaskSeqCode<'T>) : TaskSeqCode<'T> = + if verbose then + printfn "at While(...), calling WhileAsync()" + b.WhileAsync((fun () -> ValueTask(condition ())), body) member inline _.TryWith(body: TaskSeqCode<'T>, catch: exn -> TaskSeqCode<'T>) : TaskSeqCode<'T> = @@ -643,7 +694,14 @@ type TaskSeqBuilder() = TaskSeqCode<'T>(fun sm -> // This will yield with __stack_fin = false // This will resume with __stack_fin = true + if verbose then + printfn "at Yield, before Yield().Invoke(sm)" + let __stack_fin = ResumableCode.Yield().Invoke(&sm) + + if verbose then + printfn "at Yield, __stack_fin = %b" __stack_fin + sm.Data.current <- ValueSome v sm.Data.awaiter <- null __stack_fin) @@ -658,18 +716,31 @@ type TaskSeqBuilder() = let mutable awaiter = task.GetAwaiter() let mutable __stack_fin = true + if verbose then + printfn "at Bind" + if not awaiter.IsCompleted then // This will yield with __stack_fin2 = false // This will resume with __stack_fin2 = true let __stack_fin2 = ResumableCode.Yield().Invoke(&sm) __stack_fin <- __stack_fin2 + if verbose then + printfn "at Bind: with __stack_fin = %b" __stack_fin + if __stack_fin then + if verbose then + printfn "at Bind: with getting result from awaiter" + let result = awaiter.GetResult() + + if verbose then + printfn "at Bind: calling continuation" + (continuation result).Invoke(&sm) else if verbose then - printfn "calling AwaitUnsafeOnCompleted" + printfn "at Bind: calling AwaitUnsafeOnCompleted" sm.Data.awaiter <- awaiter sm.Data.current <- ValueNone @@ -680,6 +751,9 @@ type TaskSeqBuilder() = let mutable awaiter = task.GetAwaiter() let mutable __stack_fin = true + if verbose then + printfn "at BindV" + if not awaiter.IsCompleted then // This will yield with __stack_fin2 = false // This will resume with __stack_fin2 = true @@ -691,7 +765,7 @@ type TaskSeqBuilder() = (continuation result).Invoke(&sm) else if verbose then - printfn "calling AwaitUnsafeOnCompleted" + printfn "at BindV: calling AwaitUnsafeOnCompleted" sm.Data.awaiter <- awaiter sm.Data.current <- ValueNone