Skip to content

Commit

Permalink
Improve GetAsyncEnumerator, refactor and split logic, ensure ResultFr…
Browse files Browse the repository at this point in the history
…om remains stable
  • Loading branch information
abelbraaksma committed Oct 29, 2022
1 parent 5aa168b commit 43f311d
Showing 1 changed file with 29 additions and 48 deletions.
77 changes: 29 additions & 48 deletions src/FSharpy.TaskSeq/TaskSeqBuilder.fs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ and [<AbstractClass; NoEquality; NoComparison>] TaskSeq<'T>() =

abstract TailcallTarget: TaskSeq<'T> option
abstract MoveNextAsyncResult: unit -> ValueTask<bool>
abstract InitIterator: CancellationToken -> IAsyncEnumerator<'T>

/// Initializes the machine data on 'self'
abstract InitMachineDataForTailcalls: ct: CancellationToken -> unit

interface IAsyncEnumerator<'T> with
member _.Current = raiseNotImpl ()
Expand Down Expand Up @@ -141,39 +143,17 @@ and [<NoComparison; NoEquality>] TaskSeq<'Machine, 'T
[<DefaultValue(false)>]
val mutable _machine: 'Machine

override this.InitIterator(ct) =
// if this is null, it means it's the first time for this Enumerable to create an Enumerator
// so, to prevent extra allocations, we just return 'self', with the iterator vars set appropriately.
override this.InitMachineDataForTailcalls(ct) =
match this._machine.Data :> obj with
| null ->
//if
// (not data.taken
// && initialThreadId = Environment.CurrentManagedThreadId)
let data = TaskSeqStateMachineData()
data.boxedSelf <- this
data.cancellationToken <- ct
data.builder <- AsyncIteratorMethodBuilder.Create()
this._machine.Data <- data
this :> IAsyncEnumerator<_>

| _ ->
if verbose then
printfn "GetAsyncEnumerator, cloning..."

// We need to reset state, but only to the "initial machine", resetting the _machine to
// Unchecked.defaultof<_> is wrong, as the compiler uses this to track state. However,
// we do need a zeroed ResumptionPoint, otherwise we would continue after the last iteration
// returning an empty sequence.
//
// Solution: we shadow the initial machine, which we then re-assign here:
let clone = this.MemberwiseClone() :?> TaskSeq<'Machine, 'T>
clone._machine <- clone._initialMachine
clone._machine.Data <- TaskSeqStateMachineData()
clone._machine.Data.cancellationToken <- ct
clone._machine.Data.builder <- AsyncIteratorMethodBuilder.Create()
clone._machine.Data.boxedSelf <- clone
| null -> this.InitMachineData(ct, &this._machine)
| _ -> ()

clone :> System.Collections.Generic.IAsyncEnumerator<'T>
member this.InitMachineData(ct, machine: 'Machine byref) =
let data = TaskSeqStateMachineData()
data.boxedSelf <- this
data.cancellationToken <- ct
data.builder <- AsyncIteratorMethodBuilder.Create()
machine.Data <- data


member internal this.hijack() =
Expand Down Expand Up @@ -265,13 +245,12 @@ and [<NoComparison; NoEquality>] TaskSeq<'Machine, 'T

interface IAsyncEnumerable<'T> with
member this.GetAsyncEnumerator(ct) =

// if this is null, it means it's the first time for this Enumerable to create an Enumerator
// so, to prevent extra allocations, we just return 'self', with the iterator vars set appropriately.
match this._machine.Data :> obj with
| null ->
//if
// (not data.taken
// && initialThreadId = Environment.CurrentManagedThreadId)
this.InitIterator ct
| null when initialThreadId = Environment.CurrentManagedThreadId ->
this.InitMachineData(ct, &this._machine)
this // just return 'self' here

| _ ->
if verbose then
Expand All @@ -283,15 +262,11 @@ and [<NoComparison; NoEquality>] TaskSeq<'Machine, 'T
// returning an empty sequence.
//
// Solution: we shadow the initial machine, which we then re-assign here:
//
let clone = this.MemberwiseClone() :?> TaskSeq<'Machine, 'T>
clone._machine <- clone._initialMachine
clone._machine.Data <- TaskSeqStateMachineData()
clone._machine.Data.cancellationToken <- ct
//clone._machine.Data.taken <- true
clone._machine.Data.builder <- AsyncIteratorMethodBuilder.Create()
clone._machine.Data.boxedSelf <- clone

clone :> System.Collections.Generic.IAsyncEnumerator<'T>
clone.InitMachineData(ct, &clone._machine)
clone

interface System.Collections.Generic.IAsyncEnumerator<'T> with
member this.Current =
Expand Down Expand Up @@ -505,7 +480,8 @@ type TaskSeqBuilder() =
ts._machine <- sm
ts :> IAsyncEnumerable<'T>))
else
failwith "no dynamic implementation as yet"
NotImplementedException "No dynamic implementation for TaskSeq yet."
|> raise
// let initialResumptionFunc = TaskSeqResumptionFunc<'T>(fun sm -> code.Invoke(&sm))
// let resumptionFuncExecutor = TaskSeqResumptionExecutor<'T>(fun sm f ->
// // TODO: add exception handling?
Expand Down Expand Up @@ -760,11 +736,16 @@ type TaskSeqBuilder() =
match other with
| :? TaskSeq<'T> as other ->
// in cases that this is null
other.InitIterator sm.Data.cancellationToken |> ignore
other.InitMachineDataForTailcalls(sm.Data.cancellationToken)

// set 'self' to point to the 'other', and unset Current
sm.Data.tailcallTarget <- Some other
sm.Data.awaiter <- null
sm.Data.current <- ValueNone

// For tailcalls we return 'false' and re-run from the entry (trampoline)
false

| _ -> b.YieldFrom(other).Invoke(&sm))
| _ ->
// other types of IAsyncEnumerable, just yield
b.YieldFrom(other).Invoke(&sm))

0 comments on commit 43f311d

Please sign in to comment.