From 08fd2d0d25bd982122b37fa8f5d48d3e3f61db1d Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Sat, 16 Mar 2024 07:55:26 +0000 Subject: [PATCH] refactor(Batching)!: Relax arbitrary arrays constraint --- src/Equinox.Core/Batching.fs | 16 ++++++++-------- tests/Equinox.Core.Tests/BatchingTests.fs | 8 ++++---- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/Equinox.Core/Batching.fs b/src/Equinox.Core/Batching.fs index 73e0c6e10..30d8d0998 100644 --- a/src/Equinox.Core/Batching.fs +++ b/src/Equinox.Core/Batching.fs @@ -18,16 +18,16 @@ type internal AsyncBatch<'Req, 'Res>() = // sadly there's no way to detect without a try/catch try queue.TryAdd(item) with :? InvalidOperationException -> false - let mutable attempt = Unchecked.defaultof>> + let mutable attempt = Unchecked.defaultof>> /// Attempt to add a request to the flight /// Succeeds during linger interval (which commences when the first caller triggers the workflow via AwaitResult) /// Fails if this flight has closed (caller should initialize a fresh Batch, potentially holding off until the current attempt completes) - member _.TryAdd(req, dispatch: Func<'Req[], CancellationToken, Task<'Res[]>>, lingerMs: int, limiter: System.Threading.SemaphoreSlim voption, ct) = + member _.TryAdd(req, dispatch: Func<'Req[], CancellationToken, Task<'Res>>, lingerMs: int, limiter: System.Threading.SemaphoreSlim voption, ct) = if not (tryEnqueue req) then false else - // Prepare a new instance, with cancellation under our control (it won't start until the Force triggers it though) - let newInstance: Lazy> = lazy task { + // Prepare a new instance, with cancellation under our control (it won't start until the .Value triggers it though) + let newInstance: Lazy> = lazy task { do! Task.Delay(lingerMs, ct) match limiter with ValueNone -> () | ValueSome s -> do! s.WaitAsync(ct) try queue.CompleteAdding() @@ -45,12 +45,12 @@ type internal AsyncBatch<'Req, 'Res>() = /// Requests are added to pending batch during the wait period, which consists of two phases: /// 1. a defined linger period (min 1ms) /// 2. (optionally) a wait to acquire capacity on a limiter semaphore (e.g. one might have a limit on concurrent dispatches across a pool) -type Batcher<'Req, 'Res> private (tryInclude: Func, 'Req, CancellationToken, bool>) = +type Batcher<'Req, 'Res> private (tryInclude: Func, 'Req, CancellationToken, bool>) = let mutable cell = AsyncBatch<'Req, 'Res>() - new(dispatch: Func<'Req[], CancellationToken, Task<'Res[]>>, lingerMs, limiter) = + new(dispatch: Func<'Req[], CancellationToken, Task<'Res>>, lingerMs, limiter) = if lingerMs < 1 then invalidArg (nameof(lingerMs)) "Minimum linger period is 1ms" // concurrent waiters need to add work to the batch across their threads Batcher(fun cell req ct -> cell.TryAdd(req, dispatch, lingerMs, limiter, ct = ct)) - new(dispatch: 'Req[] -> Async<'Res[]>, ?linger : TimeSpan, + new(dispatch: 'Req[] -> Async<'Res>, ?linger: TimeSpan, // Extends the linger phase to include a period during which we await capacity on an externally managed Semaphore // The Batcher doesn't care, but a typical use is to enable limiting the number of concurrent in-flight dispatches ?limiter) = @@ -99,7 +99,7 @@ type BatcherCache<'Id, 'Entry>(cache: Cache<'Entry>, toKey: Func<'Id, string>, c let mapKey = Func<'Id, string>(fun id -> "$Batcher-" + string id) BatcherCache(Cache cache, mapKey, createEntry, ?cacheWindow = cacheWindow) - member _.GetOrAdd(id : 'Id) : 'Entry = + member _.GetOrAdd(id: 'Id) : 'Entry = // Optimise for low allocations on happy path let key = toKey.Invoke(id) match cache.TryGet key with diff --git a/tests/Equinox.Core.Tests/BatchingTests.fs b/tests/Equinox.Core.Tests/BatchingTests.fs index d20844fda..893f7fbd5 100644 --- a/tests/Equinox.Core.Tests/BatchingTests.fs +++ b/tests/Equinox.Core.Tests/BatchingTests.fs @@ -12,7 +12,7 @@ open Xunit let ``Batcher correctness`` () = async { let mutable batches = 0 let mutable active = 0 - let dispatch (reqs : int[]) = async { + let dispatch (reqs: int[]) = async { let concurrency = Interlocked.Increment &active 1 =! concurrency Interlocked.Increment &batches |> ignore @@ -23,7 +23,7 @@ let ``Batcher correctness`` () = async { } let cell = Batcher(dispatch, linger = TimeSpan.FromMilliseconds 40) let! results = [1 .. 100] |> Seq.map cell.Execute |> Async.Parallel - test <@ set (Seq.collect id results) = set [1 .. 100] @> + test <@ set (Seq.concat results) = set [1 .. 100] @> // Linger of 40ms makes this tend strongly to only be 1 batch, but no guarantees test <@ 1 <= batches && batches < 3 @> } @@ -31,7 +31,7 @@ let ``Batcher correctness`` () = async { [] let ``Batcher error handling`` shouldFail = async { let fails = ConcurrentBag() // Could be a ResizeArray per spec, but this removes all doubt - let dispatch (reqs : int[]) = async { + let dispatch (reqs: int[]) = async { if shouldFail () then reqs |> Seq.iter fails.Add failwith $"failing %A{reqs}" @@ -43,7 +43,7 @@ let ``Batcher error handling`` shouldFail = async { let oks = results |> Array.choose (function Choice1Of2 r -> Some r | _ -> None) // Note extraneous exceptions we encounter (e.g. if we remove the catch in TryAdd) let cancels = results |> Array.choose (function Choice2Of2 e when not (e.Message.Contains "failing") -> Some e | _ -> None) - let inputs, outputs = set input, set (Seq.collect id oks |> Seq.append fails) + let inputs, outputs = set input, set (Seq.concat oks |> Seq.append fails) test <@ inputs.Count = outputs.Count && Array.isEmpty cancels && inputs = outputs @>