diff --git a/src/Equinox.DynamoStore/DynamoStore.fs b/src/Equinox.DynamoStore/DynamoStore.fs index 9068c6e09..84c4cdd2c 100644 --- a/src/Equinox.DynamoStore/DynamoStore.fs +++ b/src/Equinox.DynamoStore/DynamoStore.fs @@ -452,7 +452,7 @@ type Container(tableName, createContext : (RequestMetrics -> unit) -> TableConte let pk = Batch.tableKeyForStreamTip stream let! item = context.UpdateItemAsync(pk, updateExpr, ?precondition = precondition) |> Async.startImmediateAsTask ct return item |> Batch.ofSchema, rm.Consumed } - member _.QueryBatches(stream, consistentRead, minN, maxI, backwards, batchSize, ct) : IAsyncEnumerable = + member _.QueryBatches(stream, consistentRead, minN, maxI, backwards, batchSize, ct) : IAsyncEnumerable = taskSeq { let compile = (createContext ignore).Template.PrecomputeConditionalExpr let kc = match maxI with | Some maxI -> compile <@ fun (b : Batch.Schema) -> b.p = stream && b.i < maxI @> @@ -460,20 +460,21 @@ type Container(tableName, createContext : (RequestMetrics -> unit) -> TableConte let fc = match minN with | Some minN -> compile <@ fun (b : Batch.Schema) -> b.n > minN @> |> Some | None -> None - let rec aux (i, le) = taskSeq { + let mutable index, lastEvaluated, more = 0, None, true + while more do // TOCONSIDER could avoid projecting `p` let rm = Metrics() let context = createContext rm.Add - let! t, res = context.QueryPaginatedAsync(kc, ?filterCondition = fc, limit = batchSize, ?exclusiveStartKey = le, + let! t, res = context.QueryPaginatedAsync(kc, ?filterCondition = fc, limit = batchSize, ?exclusiveStartKey = lastEvaluated, scanIndexForward = not backwards, consistentRead = consistentRead) |> Stopwatch.timeAsync ct - yield i, t, Array.map Batch.ofSchema res.Records, rm.Consumed - match res.LastEvaluatedKey with - | None -> () - | le -> yield! aux (i + 1, le) } - aux (0, None) - member internal _.QueryIAndNOrderByNAscending(stream, maxItems, ct) : IAsyncEnumerable = - let rec aux (index, lastEvaluated) = taskSeq { + yield index, t, Array.map Batch.ofSchema res.Records, rm.Consumed + lastEvaluated <- res.LastEvaluatedKey + more <- Option.isSome lastEvaluated + index <- index + 1 } + member internal _.QueryIAndNOrderByNAscending(stream, maxItems, ct) : IAsyncEnumerable = taskSeq { + let mutable index, lastEvaluated, more = 0, None, true + while more do let rm = Metrics() let context = createContext rm.Add let keyCond = <@ fun (b : Batch.Schema) -> b.p = stream @> @@ -481,10 +482,9 @@ type Container(tableName, createContext : (RequestMetrics -> unit) -> TableConte let! t, res = context.QueryProjectedPaginatedAsync(keyCond, proj, ?exclusiveStartKey = lastEvaluated, scanIndexForward = true, limit = maxItems) |> Stopwatch.timeAsync ct yield index, t, [| for i, c, n in res -> { isTip = Batch.isTip i; index = n - int64 c.Length; n = n } |], rm.Consumed - match res.LastEvaluatedKey with - | None -> () - | le -> yield! aux (index + 1, le) } - aux (0, None) + lastEvaluated <- res.LastEvaluatedKey + more <- Option.isSome lastEvaluated + index <- index + 1 } member x.DeleteItem(stream : string, i, ct) : Task = task { let rm = Metrics() let context = createContext rm.Add