-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathZkClientSpec.scala
125 lines (94 loc) · 3.63 KB
/
ZkClientSpec.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package spinoco.fs2.zk
import fs2.Stream._
import fs2.Task
import fs2.{Chunk, time}
import concurrent.duration._
/**
  * Specification of the ZooKeeper client.
  *
  * The fixtures `standaloneServerAndClient`, `standaloneServer` and `clientTo` are not defined
  * in this file; presumably they are inherited from `Fs2ZkClientSpec` (verify there).
  * Every scenario builds a stream, runs it to completion and compares the collected output.
  */
class ZkClientSpec extends Fs2ZkClientSpec {

  "ZooKeeper Client" - {

    // Node used by every scenario below.
    val node1 = ZkNode.parse("/n1").get

    // One second pause used to space out the individual modifications.
    def sleep1s = time.sleep_[Task](1.second)

    "create and delete Node" in {
      val result =
        standaloneServerAndClient.flatMap { case (_, zkc) =>
          // Right carries the outcome of `create`, Left carries each `existsNow` lookup.
          eval(zkc.create(node1, ZkCreateMode.Persistent, None, List(ZkACL.ACL_OPEN_UNSAFE))).map(Right(_)) ++
          eval(zkc.existsNow(node1)).map(Left(_)) ++
          // The delete half of the scenario: remove the node (result discarded by `eval_`)
          // and look it up once more so its absence can be asserted.
          eval_(zkc.delete(node1, None)) ++
          eval(zkc.existsNow(node1)).map(Left(_))
        }.runLog.unsafeRun

      result should have size(3)
      result(0) shouldBe Right(node1)
      // Present right after creation ...
      result(1).left.map(_.nonEmpty) shouldBe Left(true)
      // ... and gone once it has been deleted.
      result(2).left.map(_.nonEmpty) shouldBe Left(false)
    }

    "receive updates about node created, updated and deleted" in {
      val result =
        standaloneServer.flatMap { zks =>
          // One client observes the node while a second client modifies it.
          val observe = clientTo(zks).flatMap { zkc => zkc.exists(node1) }
          val modify =
            (sleep1s ++ clientTo(zks)).flatMap { zkc =>
              eval_(zkc.create(node1, ZkCreateMode.Persistent, None, List(ZkACL.ACL_OPEN_UNSAFE))) ++
              sleep1s ++ eval_(zkc.setDataOf(node1, Some(Chunk.bytes(Array[Byte](1,2,3), 0, 3)), None)) ++
              sleep1s ++ eval_(zkc.delete(node1, None))
            }
          // Only the observer's output is kept; the modifying side is run for its effects.
          observe.mergeDrainR(modify)
        }
        .map { _.map(_.dataLength) }
        .take(4).runLog.unsafeTimed(5.seconds).unsafeRun

      // Expected sequence: absent, created without data, three bytes written, deleted.
      result shouldBe Vector(
        None,
        Some(0),
        Some(3),
        None
      )
    }

    "Signals updates of the children" in {
      val nodeA = (node1 / "a").get
      val nodeB = (node1 / "b").get
      val nodeC = (node1 / "c").get

      val result =
        standaloneServer.flatMap { zks =>
          // One client observes the children of `node1` while a second client modifies them.
          val observe = clientTo(zks).flatMap { zkc => zkc.childrenOf(node1) }
          val modify =
            (sleep1s ++ clientTo(zks)).flatMap { zkc =>
              eval_(zkc.create(node1, ZkCreateMode.Persistent, None, List(ZkACL.ACL_OPEN_UNSAFE))) ++
              sleep1s ++ eval_(zkc.create(nodeA, ZkCreateMode.Persistent, None, List(ZkACL.ACL_OPEN_UNSAFE))) ++
              sleep1s ++ eval_(zkc.create(nodeB, ZkCreateMode.Persistent, None, List(ZkACL.ACL_OPEN_UNSAFE))) ++
              sleep1s ++ eval_(zkc.create(nodeC, ZkCreateMode.Persistent, None, List(ZkACL.ACL_OPEN_UNSAFE))) ++
              sleep1s ++ eval_(zkc.delete(nodeB, None)) ++
              sleep1s ++ eval_(zkc.delete(nodeC, None)) ++
              sleep1s ++ eval_(zkc.delete(nodeA, None)) ++
              sleep1s ++ eval_(zkc.delete(node1, None))
            }
          observe.mergeDrainR(modify)
        }
        // Keep only the first component of every emitted pair (the list of child nodes).
        .map { _.map(_._1) }
        .take(9).runLog.unsafeTimed(10.seconds).unsafeRun

      // One entry per modification above, preceded by the initial "parent absent" state.
      result shouldBe Vector(
        None,
        Some(List.empty),
        Some(List(nodeA)),
        Some(List(nodeA, nodeB)),
        Some(List(nodeA, nodeB, nodeC)),
        Some(List(nodeA, nodeC)),
        Some(List(nodeA)),
        Some(List.empty),
        None
      )
    }

    "Signals correctly state of the client" in {
      val result =
        standaloneServer.flatMap { zks =>
          val observe = clientTo(zks).flatMap { _.clientState }
          // Stop the server two seconds in, then bring it back one second later.
          val shutdown = time.sleep[Task](2.seconds) ++ eval_(zks.shutdown)
          val startup = time.sleep[Task](1.seconds) ++ eval_(zks.startup)
          observe.mergeDrainR(shutdown ++ startup)
        }
        .take(3)
        .runLog.unsafeTimed(10.seconds).unsafeRun

      result shouldBe Vector(
        ZkClientState.SyncConnected,
        ZkClientState.Disconnected,
        ZkClientState.SyncConnected
      )
    }
  }
}