forked from microsoft/fsharplu
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Async.fs
136 lines (129 loc) · 4.82 KB
/
Async.fs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
namespace Microsoft.FSharpLu.Tests
open System.Threading
open Microsoft.FSharpLu
open Microsoft.VisualStudio.TestTools.UnitTesting
open System.Threading.Tasks
[<TestClass>]
type AsyncTest() =
[<TestMethod>]
[<Description("Competing worfklows should terminate in the expected order")>]
member __.CompeteOnSleep () =
async {
let tasks =
[ async { do! Async.Sleep(100)
printfn "Task1 done" }
async { do! Async.Sleep(5000)
Assert.Fail("Task2 finished first") }
] |> Seq.ofList
do! Async.Compete tasks
printfn "Done"
}
|> Async.RunSynchronously
[<TestMethod>]
[<Description("Cancelling the parent worfklow should cancel competing worfklows started with Async.Compete")>]
member __.CompeteCancellable () =
let task1Cancelled = ref false
let task2Cancelled = ref false
let compete =
let jobs =
[ async {
try
do! Async.Sleep(-1)
Assert.Fail("task1 finished")
finally
task1Cancelled := true
}
async {
try
do! Async.Sleep(-1)
Assert.Fail("task2 finished")
finally
task2Cancelled := true
}
] |> Seq.ofList
Async.Compete jobs
let main = async {
use cts = new CancellationTokenSource()
Async.Start(compete, cts.Token)
do! Async.Sleep(100)
cts.Cancel()
do! Async.Sleep(100)
}
Async.RunSynchronously main
Assert.IsTrue !task1Cancelled
Assert.IsTrue !task2Cancelled
[<TestMethod>]
[<Description("Async created with Async.CompeteWithThreadingObject returns expected result when the threading object is acquired")>]
member __.``CompeteWithThreadingObject when semaphore wins``() =
let t1 = async {
do! Async.Sleep(1000)
printfn "t1 done"
Assert.Fail()
}
use s = new SemaphoreSlim(1)
s.Wait()
let main =
async {
let compete = Async.CompeteWithThreadingObject t1 s
let! c = Async.StartChild(compete)
do! Async.Sleep(100)
printfn "Releasing"
let z = s.Release()
printfn "semaphore released. %d" z
let! r = c
match r with
| Choice1Of3 _
| Choice3Of3 _ -> Assert.Fail("Unexpected result")
| Choice2Of3 y -> y.Dispose()
}
Async.RunSynchronously main
[<TestMethod>]
[<Description("Async created with Async.CompeteWithThreadingObject returns expected result when the threading object is acquired")>]
member __.``CompeteWithThreadingObject when workflow wins``() =
let t1 = async {
do! Async.Sleep(1000)
printfn "t1 done"
return 7
}
use s = new SemaphoreSlim(0)
let main =
async {
let compete = Async.CompeteWithThreadingObject t1 s
let! r = compete
match r with
| Choice2Of3 _
| Choice3Of3 _ -> Assert.Fail("Unexpected result")
| Choice1Of3 y -> Assert.AreEqual(y, 7)
}
Async.RunSynchronously main
[<TestMethod>]
[<Description("It should be possible to cancel a workflow created with Async.CompeteWithThreadingObject and the underlying computations should also be cancelled")>]
member __.``CompeteWithThreadingObject propagates cancellation`` () =
let t1 = async {
do! Async.Sleep(1000)
printfn "t1 done"
Assert.Fail()
}
use s = new SemaphoreSlim(1)
s.Wait()
let taskCancelled = ref false
let compete =
async {
try
let! newEvent = Async.CompeteWithThreadingObject t1 s
printfn "compete done"
Assert.Fail("compting workflow should not complete")
finally
taskCancelled := true
}
use cts = new CancellationTokenSource()
Async.Start(compete, cts.Token)
let main =
async {
do! Async.Sleep(100)
printfn "cancelling"
cts.Cancel()
do! Async.Sleep(100)
}
Async.RunSynchronously main
Assert.IsTrue(!taskCancelled)