Skip to content

Commit

Permalink
Remove 'taken' bool, add InitIterator method, remove outer AsyncEnume…
Browse files Browse the repository at this point in the history
…rator from implementing IAsyncDisposable, which it shouldn't
  • Loading branch information
abelbraaksma committed Oct 29, 2022
1 parent dc96a17 commit 5aa168b
Showing 1 changed file with 116 additions and 80 deletions.
196 changes: 116 additions & 80 deletions src/FSharpy.TaskSeq/TaskSeqBuilder.fs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type TaskSeqStateMachineData<'T>() =
[<DefaultValue(false)>]
val mutable cancellationToken: CancellationToken

/// Keeps track of the objects that need to be disposed of when DisposeAsync is called.
[<DefaultValue(false)>]
val mutable disposalStack: ResizeArray<(unit -> Task)>

Expand All @@ -67,21 +68,24 @@ type TaskSeqStateMachineData<'T>() =
[<DefaultValue(false)>]
val mutable promiseOfValueOrEnd: ManualResetValueTaskSourceCore<bool>

/// Helper struct providing methods for awaiting 'next' in async iteration scenarios.
[<DefaultValue(false)>]
val mutable builder: AsyncIteratorMethodBuilder

[<DefaultValue(false)>]
val mutable taken: bool

/// Whether or not a full iteration through the IAsyncEnumerator has completed
[<DefaultValue(false)>]
val mutable completed: bool

/// Used by the AsyncEnumerator interface to return the Current value when
/// IAsyncEnumerator.Current is called
[<DefaultValue(false)>]
val mutable current: ValueOption<'T>

/// A reference to 'self', because otherwise we can't use byref in the resumable code.
[<DefaultValue(false)>]
val mutable boxed: TaskSeq<'T>
// For tailcalls using 'return!'
val mutable boxedSelf: TaskSeq<'T>

/// If set, used for tailcalls using 'return!', contains the target.
[<DefaultValue(false)>]
val mutable tailcallTarget: TaskSeq<'T> option

Expand All @@ -99,12 +103,11 @@ and [<AbstractClass; NoEquality; NoComparison>] TaskSeq<'T>() =

abstract TailcallTarget: TaskSeq<'T> option
abstract MoveNextAsyncResult: unit -> ValueTask<bool>
abstract InitIterator: CancellationToken -> IAsyncEnumerator<'T>

interface IAsyncEnumerator<'T> with
member _.Current = raiseNotImpl ()
member _.MoveNextAsync() = raiseNotImpl ()

interface IAsyncDisposable with
member _.DisposeAsync() = raiseNotImpl ()

interface IAsyncEnumerable<'T> with
Expand All @@ -129,22 +132,62 @@ and [<NoComparison; NoEquality>] TaskSeq<'Machine, 'T
inherit TaskSeq<'T>()
let initialThreadId = Environment.CurrentManagedThreadId

/// Shadows the initial machine, just after it is initialized by the F# compiler-generated state.
/// Used on GetAsyncEnumerator, to ensure a clean state, and a ResumptionPoint of 0.
[<DefaultValue(false)>]
val mutable _initialMachine: 'Machine

/// Keeps the active state machine.
[<DefaultValue(false)>]
val mutable _machine: 'Machine

/// Returns an enumerator for one iteration over this sequence, with 'ct' stored as its cancellation token.
/// When the machine has no data yet, this instance itself is set up and returned; otherwise a
/// memberwise clone, reset to the shadowed initial machine and given fresh data, is returned.
override this.InitIterator(ct) =
// If Data is null, it means it's the first time for this Enumerable to create an Enumerator,
// so, to prevent extra allocations, we just return 'self', with the iterator vars set appropriately.
match this._machine.Data :> obj with
| null ->
// NOTE(review): commented-out remnant of the earlier 'taken' / thread-id check; confirm whether
// the thread-id condition is still wanted before removing it.
//if
// (not data.taken
// && initialThreadId = Environment.CurrentManagedThreadId)
let data = TaskSeqStateMachineData()
// 'boxedSelf' lets the resumable code reach this instance by reference.
data.boxedSelf <- this
data.cancellationToken <- ct
data.builder <- AsyncIteratorMethodBuilder.Create()
this._machine.Data <- data
this :> IAsyncEnumerator<_>

| _ ->
if verbose then
printfn "GetAsyncEnumerator, cloning..."

// We need to reset state, but only to the "initial machine". Resetting the _machine to
// Unchecked.defaultof<_> is wrong, as the compiler uses this to track state. However,
// we do need a zeroed ResumptionPoint, otherwise we would continue after the last iteration,
// returning an empty sequence.
//
// Solution: we shadow the initial machine, which we then re-assign here:
let clone = this.MemberwiseClone() :?> TaskSeq<'Machine, 'T>
clone._machine <- clone._initialMachine
// Fresh data object, so the clone does not reuse the data of the instance it was copied from.
clone._machine.Data <- TaskSeqStateMachineData()
clone._machine.Data.cancellationToken <- ct
clone._machine.Data.builder <- AsyncIteratorMethodBuilder.Create()
// The self-reference must point at the clone, not at the instance it was copied from.
clone._machine.Data.boxedSelf <- clone

clone :> System.Collections.Generic.IAsyncEnumerator<'T>


member internal this.hijack() =
let res = this._machine.Data.tailcallTarget

match res with
| Some tg ->
// we get here only when there are multiple returns (it seems)
// hence the tailcall logic
// We get here only when there are multiple ReturnFroms
// which allows us to do tailcalls.

// This recurses itself, e.g. tg.TailcallTarget calls this.hijack().
match tg.TailcallTarget with
| None -> res
| (Some tg2 as res2) ->
| Some tg2 as res2 ->
// Cut out chains of tailcalls
this._machine.Data.tailcallTarget <- Some tg2
res2
Expand Down Expand Up @@ -211,89 +254,53 @@ and [<NoComparison; NoEquality>] TaskSeq<'Machine, 'T
/// The MoveNext method is called by builder.MoveNext() in the resumable code
member this.MoveNext() =
match this.hijack () with
| None -> moveNextRef &this._machine
| Some tg ->
// jump to the hijacked method
(tg :> IAsyncStateMachine).MoveNext()
| None -> moveNextRef &this._machine

/// SetStatemachine is (currently) never called
member _.SetStateMachine(_state) =
if verbose then
printfn "Setting state machine -- ignored"

() // not needed for reference type
/// SetStatemachine is (currently) never called
member _.SetStateMachine(_state) = () // not needed for reference type

interface IAsyncEnumerable<'T> with
member this.GetAsyncEnumerator(ct) =
let data = this._machine.Data

if
(not data.taken
&& initialThreadId = Environment.CurrentManagedThreadId)
then
let data = this._machine.Data
data.taken <- true
data.cancellationToken <- ct
data.builder <- AsyncIteratorMethodBuilder.Create()

this :> IAsyncEnumerator<_>
else

match this._machine.Data :> obj with
| null ->
//if
// (not data.taken
// && initialThreadId = Environment.CurrentManagedThreadId)
this.InitIterator ct

| _ ->
if verbose then
printfn "GetAsyncEnumerator, cloning..."

// it appears that the issue is possibly caused by the problem
// of having ValueTask all over the place, and by going over the
// iteration twice, we are trying to *await* twice, which is not allowed
// see, for instance: https://itnext.io/why-can-a-valuetask-only-be-awaited-once-31169b324fa4
// We need to reset state, but only to the "initial machine", resetting the _machine to
// Unchecked.defaultof<_> is wrong, as the compiler uses this to track state. However,
// we do need a zeroed ResumptionPoint, otherwise we would continue after the last iteration
// returning an empty sequence.
//
// Solution: we shadow the initial machine, which we then re-assign here:
let clone = this.MemberwiseClone() :?> TaskSeq<'Machine, 'T>
clone._machine <- clone._initialMachine
clone._machine.Data <- TaskSeqStateMachineData()
clone._machine.Data.cancellationToken <- ct
clone._machine.Data.taken <- true
//clone._machine.Data.taken <- true
clone._machine.Data.builder <- AsyncIteratorMethodBuilder.Create()

//// calling reset causes NRE in IValueTaskSource.GetResult above
//clone.Machine.Data.promiseOfValueOrEnd.Reset()
clone._machine.Data.boxed <- clone
////clone.Machine.Data.disposalStack <- null // reference type, would otherwise still reference original stack
//////clone.Machine.Data.tailcallTarget <- Some clone // this will lead to an SO exception
//clone.Machine.Data.awaiter <- null
//clone.Machine.Data.current <- ValueNone
//clone.Machine.Data.completed <- false
clone._machine.Data.boxedSelf <- clone

clone :> System.Collections.Generic.IAsyncEnumerator<'T>

interface IAsyncDisposable with
member this.DisposeAsync() =
match this.hijack () with
| Some tg -> (tg :> IAsyncDisposable).DisposeAsync()
| None ->
if verbose then
printfn "DisposeAsync..."

task {
match this._machine.Data.disposalStack with
| null -> ()
| _ ->
let mutable exn = None

for d in Seq.rev this._machine.Data.disposalStack do
try
do! d ()
with e ->
if exn.IsNone then
exn <- Some e

match exn with
| None -> ()
| Some e -> raise e
}
|> ValueTask

interface System.Collections.Generic.IAsyncEnumerator<'T> with
member this.Current =
match this.hijack () with
| Some tg -> (tg :> IAsyncEnumerator<'T>).Current
| Some tg ->
// recurse, but not really: we jump to a different instance of our taskSeq
// in case there's a tail call target.
(tg :> IAsyncEnumerator<'T>).Current

| None ->
match this._machine.Data.current with
| ValueSome x -> x
Expand All @@ -306,7 +313,11 @@ and [<NoComparison; NoEquality>] TaskSeq<'Machine, 'T

member this.MoveNextAsync() =
match this.hijack () with
| Some tg -> (tg :> IAsyncEnumerator<'T>).MoveNextAsync()
| Some tg ->
// recurse, but not really: we jump to a different instance of our taskSeq
// in case there's a tail call target.
(tg :> IAsyncEnumerator<'T>).MoveNextAsync()

| None ->
if verbose then
printfn "MoveNextAsync..."
Expand Down Expand Up @@ -346,6 +357,33 @@ and [<NoComparison; NoEquality>] TaskSeq<'Machine, 'T
| Some tg -> tg.MoveNextAsyncResult()
| None -> this.MoveNextAsyncResult()

/// Disposes of the IAsyncEnumerator (*not* the IAsyncEnumerable!!!)
/// If a tailcall target is set, disposal is forwarded to that target. Otherwise every callback in
/// the disposal stack is run, in reverse order of the stack. All callbacks are attempted even if
/// one fails; the first exception caught is re-raised once they have all run.
member this.DisposeAsync() =
match this.hijack () with
| Some tg -> (tg :> IAsyncDisposable).DisposeAsync()
| None ->
if verbose then
printfn "DisposeAsync..."

task {
match this._machine.Data.disposalStack with
// a null stack means nothing was registered for disposal
| null -> ()
| _ ->
// holds the first exception thrown by a disposal callback, if any
let mutable exn = None

for d in Seq.rev this._machine.Data.disposalStack do
try
do! d ()
with e ->
// keep only the first failure; later ones are dropped so the remaining callbacks still run
if exn.IsNone then
exn <- Some e

match exn with
| None -> ()
| Some e -> raise e
}
|> ValueTask


override this.MoveNextAsyncResult() =
let data = this._machine.Data
Expand Down Expand Up @@ -439,7 +477,9 @@ type TaskSeqBuilder() =
if verbose then
printfn $"at Run.MoveNext, await"

let boxed = sm.Data.boxed
// don't capture the full object in the next closure (won't work because: byref)
// but only a reference to itself.
let boxed = sm.Data.boxedSelf

sm.Data.awaiter.UnsafeOnCompleted(
Action(fun () ->
Expand All @@ -455,20 +495,14 @@ type TaskSeqBuilder() =
sm.Data.builder.Complete()
//-- RESUMABLE CODE END
))
(SetStateMachineMethodImpl<_>(fun sm state ->
if verbose then
printfn "at SetStatemachingMethodImpl, ignored"

()))
(SetStateMachineMethodImpl<_>(fun sm state -> ())) // not used in reference impl
(AfterCode<_, _>(fun sm ->
if verbose then
printfn "at AfterCode<_, _>, after F# inits the sm, and we can attach extra info"

let ts = TaskSeq<TaskSeqStateMachine<'T>, 'T>()
ts._initialMachine <- sm
ts._machine <- sm
ts._machine.Data <- TaskSeqStateMachineData()
ts._machine.Data.boxed <- ts
ts :> IAsyncEnumerable<'T>))
else
failwith "no dynamic implementation as yet"
Expand Down Expand Up @@ -725,6 +759,8 @@ type TaskSeqBuilder() =
TaskSeqCode<_>(fun sm ->
match other with
| :? TaskSeq<'T> as other ->
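// 'other' may not have been set up as an iterator yet (InitIterator creates its machine data
// when that is still null), so initialize it before using it as the tailcall target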
other.InitIterator sm.Data.cancellationToken |> ignore
sm.Data.tailcallTarget <- Some other
sm.Data.awaiter <- null
sm.Data.current <- ValueNone
Expand Down

0 comments on commit 5aa168b

Please sign in to comment.