Skip to content

Commit

Permalink
refactor(Batching)!: Relax arbitrary arrays constraint
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Mar 16, 2024
1 parent 856c655 commit 08fd2d0
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 12 deletions.
16 changes: 8 additions & 8 deletions src/Equinox.Core/Batching.fs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@ type internal AsyncBatch<'Req, 'Res>() =
// sadly there's no way to detect without a try/catch
try queue.TryAdd(item)
with :? InvalidOperationException -> false
let mutable attempt = Unchecked.defaultof<Lazy<Task<'Res[]>>>
let mutable attempt = Unchecked.defaultof<Lazy<Task<'Res>>>

/// Attempt to add a request to the flight
/// Succeeds during linger interval (which commences when the first caller triggers the workflow via AwaitResult)
/// Fails if this flight has closed (caller should initialize a fresh Batch, potentially holding off until the current attempt completes)
member _.TryAdd(req, dispatch: Func<'Req[], CancellationToken, Task<'Res[]>>, lingerMs: int, limiter: System.Threading.SemaphoreSlim voption, ct) =
member _.TryAdd(req, dispatch: Func<'Req[], CancellationToken, Task<'Res>>, lingerMs: int, limiter: System.Threading.SemaphoreSlim voption, ct) =
if not (tryEnqueue req) then false else

// Prepare a new instance, with cancellation under our control (it won't start until the Force triggers it though)
let newInstance: Lazy<Task<'Res[]>> = lazy task {
// Prepare a new instance, with cancellation under our control (it won't start until the .Value triggers it though)
let newInstance: Lazy<Task<'Res>> = lazy task {
do! Task.Delay(lingerMs, ct)
match limiter with ValueNone -> () | ValueSome s -> do! s.WaitAsync(ct)
try queue.CompleteAdding()
Expand All @@ -45,12 +45,12 @@ type internal AsyncBatch<'Req, 'Res>() =
/// Requests are added to pending batch during the wait period, which consists of two phases:
/// 1. a defined linger period (min 1ms)
/// 2. (optionally) a wait to acquire capacity on a limiter semaphore (e.g. one might have a limit on concurrent dispatches across a pool)
type Batcher<'Req, 'Res> private (tryInclude: Func<AsyncBatch<_, _>, 'Req, CancellationToken, bool>) =
type Batcher<'Req, 'Res> private (tryInclude: Func<AsyncBatch<'Req, 'Res>, 'Req, CancellationToken, bool>) =
let mutable cell = AsyncBatch<'Req, 'Res>()
new(dispatch: Func<'Req[], CancellationToken, Task<'Res[]>>, lingerMs, limiter) =
new(dispatch: Func<'Req[], CancellationToken, Task<'Res>>, lingerMs, limiter) =
if lingerMs < 1 then invalidArg (nameof(lingerMs)) "Minimum linger period is 1ms" // concurrent waiters need to add work to the batch across their threads
Batcher(fun cell req ct -> cell.TryAdd(req, dispatch, lingerMs, limiter, ct = ct))
new(dispatch: 'Req[] -> Async<'Res[]>, ?linger : TimeSpan,
new(dispatch: 'Req[] -> Async<'Res>, ?linger: TimeSpan,
// Extends the linger phase to include a period during which we await capacity on an externally managed Semaphore
// The Batcher doesn't care, but a typical use is to enable limiting the number of concurrent in-flight dispatches
?limiter) =
Expand Down Expand Up @@ -99,7 +99,7 @@ type BatcherCache<'Id, 'Entry>(cache: Cache<'Entry>, toKey: Func<'Id, string>, c
let mapKey = Func<'Id, string>(fun id -> "$Batcher-" + string id)
BatcherCache(Cache cache, mapKey, createEntry, ?cacheWindow = cacheWindow)

member _.GetOrAdd(id : 'Id) : 'Entry =
member _.GetOrAdd(id: 'Id) : 'Entry =
// Optimise for low allocations on happy path
let key = toKey.Invoke(id)
match cache.TryGet key with
Expand Down
8 changes: 4 additions & 4 deletions tests/Equinox.Core.Tests/BatchingTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ open Xunit
let ``Batcher correctness`` () = async {
let mutable batches = 0
let mutable active = 0
let dispatch (reqs : int[]) = async {
let dispatch (reqs: int[]) = async {
let concurrency = Interlocked.Increment &active
1 =! concurrency
Interlocked.Increment &batches |> ignore
Expand All @@ -23,15 +23,15 @@ let ``Batcher correctness`` () = async {
}
let cell = Batcher(dispatch, linger = TimeSpan.FromMilliseconds 40)
let! results = [1 .. 100] |> Seq.map cell.Execute |> Async.Parallel
test <@ set (Seq.collect id results) = set [1 .. 100] @>
test <@ set (Seq.concat results) = set [1 .. 100] @>
// Linger of 40ms makes this tend strongly to only be 1 batch, but no guarantees
test <@ 1 <= batches && batches < 3 @>
}

[<Property>]
let ``Batcher error handling`` shouldFail = async {
let fails = ConcurrentBag() // Could be a ResizeArray per spec, but this removes all doubt
let dispatch (reqs : int[]) = async {
let dispatch (reqs: int[]) = async {
if shouldFail () then
reqs |> Seq.iter fails.Add
failwith $"failing %A{reqs}"
Expand All @@ -43,7 +43,7 @@ let ``Batcher error handling`` shouldFail = async {
let oks = results |> Array.choose (function Choice1Of2 r -> Some r | _ -> None)
// Note extraneous exceptions we encounter (e.g. if we remove the catch in TryAdd)
let cancels = results |> Array.choose (function Choice2Of2 e when not (e.Message.Contains "failing") -> Some e | _ -> None)
let inputs, outputs = set input, set (Seq.collect id oks |> Seq.append fails)
let inputs, outputs = set input, set (Seq.concat oks |> Seq.append fails)
test <@ inputs.Count = outputs.Count
&& Array.isEmpty cancels
&& inputs = outputs @>
Expand Down

0 comments on commit 08fd2d0

Please sign in to comment.