forked from microsoft/fsharplu
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Async.fs
358 lines (315 loc) · 15 KB
/
Async.fs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
/// Helpers utilities for F# Asynchronous Workflows
module Microsoft.FSharpLu.Async
open System
open System.Threading
open System.Threading.Tasks
open Microsoft.FSharpLu.Logging
/// Extension methods to work with .Net Tasks
type System.Threading.Tasks.Task with
member x.AsAsync
with get () = Async.AwaitTask(x)
/// Extension methods to work with generic .Net Task<T>
type System.Threading.Tasks.Task<'T> with
member x.AsAsync
with get () = Async.AwaitTask(x)
/// Reraise an exception from a `catch` block of an async exception handler
/// while preserving all the original exception stack trace.
/// Requires .NET 4.5 (ExceptionDispatchInfo)
let inline reraise e =
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(e).Throw()
raise <| System.InvalidProgramException() // Unreachable, used only to match any generic return type
/// Bind operator for Async computation
let bind f asyncOp =
async.Bind(asyncOp, f)
/// Map operator for Async computation
let map f asyncOp =
bind (f >> async.Return) asyncOp
/// Start multiple async-workflows concurrently
/// and return a new async that waits for the first async to return.
/// After one async has returned all the remaining ones are cancelled,
/// and the workflow may return before the cancellations of the remaining workflows complete.
let Compete workflows =
async {
use loosers = new System.Threading.CancellationTokenSource()
// The call to loosers.Cancel in the finally block is not sufficient.
// Doing it from OnCancel() guarantees that the children
// workflows are properly terminated if the parent workflow is cancelled.
use! c = Async.OnCancel(fun() -> loosers.Cancel())
try
let! winningTask =
workflows
|> Seq.map (fun w -> Async.StartAsTask(w, cancellationToken=loosers.Token))
|> Task.WhenAny
|> Async.AwaitTask
return winningTask.Result
finally
// cancell all other tasks
loosers.Cancel()
}
/// Async.Compete between an Async<'A> and a Task<'B>.
///
/// Create an asynchronous workflow that concurrently runs an async workflow and
/// a .Net Task and returns a discriminating union of three possible outcomes depending on
/// which computation finished first or if they finished simultaneously.
/// When one of the computation wins, the other 'loosing' computation
/// is cancelled synchronoulsy.
let CompeteWithTask<'A, 'B> (workflow:Async<'A>) (taskBuilder:CancellationTokenSource -> Task<'B>) =
async {
use looser = new System.Threading.CancellationTokenSource()
// The call to looser.Cancel in the finally block is not sufficient.
// Doing it from OnCancel guarantees that the children
// task/workflow are properly terminated if the parent workflow is cancelled.
use! c = Async.OnCancel(fun() -> looser.Cancel())
let t1 = Async.StartAsTask(workflow, cancellationToken=looser.Token)
let t2 = taskBuilder looser
try
let! competition = Tasks.Task.WhenAny [| t1:> Task; t2:> Task |] |> Async.AwaitTask
()
finally
looser.Cancel()
// Wait for the looser task cancellation to complete (a TaskCanceledException exception will be triggered when this happens)
do! async {
try
if not t1.IsCompleted then
let! _ = Async.AwaitTask t1 in ()
elif not t2.IsCompleted then
let! _ = Async.AwaitTask t2 in ()
with
:? System.AggregateException as e ->
if e.InnerExceptions |> Seq.exists (function e -> e :? TaskCanceledException) then
raise e
}
return
match t1.IsCompleted && not t1.IsCanceled, t2.IsCompleted && not t2.IsCanceled with
| true, false -> Choice1Of3 (t1.Result)
| false, true -> Choice2Of3 t2.Result
| true, true -> Choice3Of3 (t1.Result, t2.Result)
| false, false -> invalidOp "Both competing tasks failed to complete."
}
/// Return an object that when disposed automatically calls the object's Release() method
let inline private releaseOnDispose (eventObject: ^T) =
{ new System.IDisposable with
member __.Dispose() =
(^T : (member Release : unit -> ^R) (eventObject)) |> ignore
}
/// Create an asynchronous workflow that concurrently runs an async workflow and
/// tries to acquire a given .Net threading object (e.g. SlimSemaphore, ...).
/// It returns a discriminating union representing the task that finished first. The other one
/// (either the workflow or the threading object) is properly terminated and disposed.
let inline CompeteWithThreadingObject<'A, ^R, ^T when ^T: (member Release : unit -> ^R)
and ^T: (member WaitAsync : int -> CancellationToken -> Task<bool>)>
(workflow:Async<'A>) (threadingObject: ^T) =
async {
let! r = CompeteWithTask workflow (fun source -> (^T: (member WaitAsync : int -> CancellationToken -> Task<bool>)(threadingObject, -1, source.Token)))
return match r with
| Choice1Of3 x -> Choice1Of3 x
| Choice2Of3 true -> Choice2Of3 (releaseOnDispose threadingObject)
| Choice3Of3 (x,true) -> Choice3Of3 (x, releaseOnDispose threadingObject)
| Choice2Of3 false
| Choice3Of3 (_,false) -> assert false; invalidOp "Both competing tasks failed to complete."
}
/// Execute an asynchronous computation until it succeeds or the specified timeout expires.
let retry (timeout:TimeSpan, retryDelay:TimeSpan, f:unit -> Async<'T>) =
let beginPollTime = DateTime.UtcNow
let endPollTime = beginPollTime + timeout
let rec loop () =
async {
try
return! f()
with
| e when DateTime.UtcNow <= endPollTime ->
Trace.info "Exception in retry loop (will retry in %ds): %s" (int retryDelay.TotalSeconds) e.Message
do! Async.Sleep(int retryDelay.TotalMilliseconds)
return! loop()
}
loop ()
/// Perform an asynchronous computation until either it succeedes, an unexpected exception occurs, or the specified timeout expires.
/// Until the timeout expires, any exception thrown meeting the exception filter condition will not be thrown.
/// After the timeout expires any exception will be thrown, regardless of whether the filtering condition is met.
let retryOnSpecificFailures (timeout:TimeSpan, retryDelay:TimeSpan, f:unit -> Async<'T>, exceptionFilter: Exception -> bool) =
let beginPollTime = DateTime.UtcNow
let endPollTime = beginPollTime + timeout
let rec loop () =
async {
try
return! f()
with
| e when exceptionFilter e && DateTime.UtcNow <= endPollTime ->
do! Async.Sleep(int retryDelay.TotalMilliseconds)
return! loop()
}
loop ()
/// Execute an asynchronous computation until it returns something, an unexpected exception occurs or the specified timeout expires.
let retryUntilSome (timeout:TimeSpan) (retryDelay:TimeSpan) (f:unit -> Async<'T option>) =
let retryDelay = if retryDelay > timeout then timeout else retryDelay
let beginPollTime = DateTime.UtcNow
let endPollTime = beginPollTime + timeout
let rec loop () =
async {
let! r = f()
match r with
| Some v ->
return v
| None when DateTime.UtcNow <= endPollTime ->
do! Async.Sleep(int retryDelay.TotalMilliseconds)
return! loop()
| None ->
return raise <| System.TimeoutException()
}
loop ()
/// Execute an asynchronous computation until it returns something. Returns None if an unexpected exception occurs or the specified timeout expires.
let retryUntilSomeOrTimeout (timeout:TimeSpan) (retryDelay:TimeSpan) (f:unit -> Async<'T option>) =
let retryDelay = if retryDelay > timeout then timeout else retryDelay
let beginPollTime = DateTime.UtcNow
let endPollTime = beginPollTime + timeout
let rec loop () =
async {
let! r = f()
match r with
| None when DateTime.UtcNow <= endPollTime ->
do! Async.Sleep(int retryDelay.TotalMilliseconds)
return! loop()
| Some _ | None ->
return r
}
loop ()
module Synchronization =
/// Interface for a pool-based synchronization object
type IPool =
interface
abstract InternalSemaphore : SemaphoreSlim
abstract AcquireAsync : int option -> Async<System.IDisposable>
abstract TryAcquireAsync : int option -> Async<System.IDisposable option>
end
/// Synchronization object used to limit the total
/// number of requests that can be granted concurrently.
/// Usage:
/// let pool = new Pool(5)
/// ...
/// async {
//// use! token = pool.AcquireAsync()
/// ...
/// }
type Pool(size:int) =
let semaphore = new SemaphoreSlim(initialCount=size, maxCount=size)
interface IDisposable with
member __.Dispose() =
semaphore.Dispose()
interface IPool with
/// Returns the internal semaphore object
member __.InternalSemaphore with get() = semaphore
/// Wait until a token from the pool becomes available and acquire it
/// Return an object that automatically releases the token to the pool when disposed.
member __.AcquireAsync(?timeout) =
async {
let! token = Async.CancellationToken
let! ok = semaphore.WaitAsync(defaultArg timeout -1, token) |> Async.AwaitTask
if ok then
return releaseOnDispose semaphore
else
return failwith "Could not acquire a token from the pool"
}
/// Try acquiring a token from the pool.
/// On success returns an object that automatically releases the token
/// once disposed. Returns None on failure to acquire the token.
member __.TryAcquireAsync(?timeout) =
async {
let! token = Async.CancellationToken
let! entered = semaphore.WaitAsync(defaultArg timeout 0, token) |> Async.AwaitTask
if entered then
return releaseOnDispose semaphore |> Some
else
return None
}
/// Nested Async pool. Behaves like pool but acquires a resource from a parent pool
/// before acquring the token from this pool.
type NestedPool(size:int, parent:IPool) =
let pool = new Pool(size=size) :> IPool
interface IDisposable with
member __.Dispose() =
(pool :?> IDisposable).Dispose()
interface IPool with
/// Returns the internal semaphore object
member __.InternalSemaphore with get() = pool.InternalSemaphore
/// Wait until a token from the parent pool and this pool become available and acquire them.
/// Return an object that automatically releases the tokens when disposed.
member __.AcquireAsync(?timeout) =
async {
let! parent = parent.AcquireAsync(timeout)
let! this = pool.AcquireAsync(timeout)
return
{ new System.IDisposable with
member __.Dispose() =
this.Dispose()
parent.Dispose()
}
}
/// Try acquiring a token from the parent pool and this pool.
/// On success returns an object that automatically releases the tokens
/// once disposed. Returns None on failure to acquire a token from the parent
/// or from this pool.
member __.TryAcquireAsync(?timeout) =
async {
let! parent = parent.TryAcquireAsync(timeout)
match parent with
| None -> return None
| Some parentToken ->
let! thisToken = pool.TryAcquireAsync(timeout)
match thisToken with
| None -> parentToken.Dispose()
return None
| Some token ->
return Some
{ new System.IDisposable with
member x.Dispose() =
token.Dispose()
parentToken.Dispose()
}
}
/// Single-use event object that can be waited on asynchronoulsy
type SingleUseEvent() =
let semaphore = new SemaphoreSlim(initialCount=0, maxCount=1)
interface IDisposable with
member __.Dispose() =
semaphore.Dispose()
// Signal the event
member __.Fire() =
semaphore.Release() |> ignore
// Wait for the event to occur
member __.WaitAsync(?timeout) =
async {
let! token = Async.CancellationToken
let! ok = semaphore.WaitAsync((defaultArg timeout -1), token) |> Async.AwaitTask
if ok then
semaphore.Release() |> ignore
return ()
else
return failwith "Wait on SingleUseEvent timed-out."
}
/// Asynchronous critical section
type CriticalSection() =
inherit Pool(1)
member x.CriticalBlock(f:unit->'A, ?blockName) =
let description = match blockName with None -> "" | Some name -> " (" + name + ")"
async {
//printfn "Entering critical section%s" description
use! block = (x:> IPool).AcquireAsync None
//printfn "Critical section entered%s" description
let ret = f()
//printfn "Leaving critical section%s" description
return ret
}
member x.CriticalAsyncBlock(task:Async<'A>, ?blockName) =
let description = match blockName with None -> "" | Some name -> " (" + name + ")"
async {
use! block = (x:> IPool).AcquireAsync None
return! task
}
/// Asynchronous file copy
let copyFile source target =
async {
use sourceStream = System.IO.File.Open(source, System.IO.FileMode.Open, System.IO.FileAccess.Read, System.IO.FileShare.Read)
use targetStream = System.IO.File.Open(target, System.IO.FileMode.Create, System.IO.FileAccess.Write)
let task = sourceStream.CopyToAsync(targetStream)
return! task |> Async.AwaitTask
}