-
Notifications
You must be signed in to change notification settings - Fork 18
/
VotingContestSpec.scala
172 lines (136 loc) · 4.49 KB
/
VotingContestSpec.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.contrib.datareplication
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.cluster.Cluster
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
object VotingContestSpec extends MultiNodeConfig {
val node1 = role("node-1")
val node2 = role("node-2")
val node3 = role("node-3")
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.log-dead-letters-during-shutdown = off
"""))
}
object VotingService {
case object Open
case object OpenAck
case object Close
case object CloseAck
case class Vote(participant: String)
case object GetVotes
case class Votes(result: Map[String, Long], open: Boolean)
private case class GetVotesReq(replyTo: ActorRef)
}
class VotingService extends Actor {
import akka.contrib.datareplication.Replicator._
import VotingService._
val replicator = DataReplication(context.system).replicator
implicit val cluster = Cluster(context.system)
val OpenedKey = "contestOpened"
val ClosedKey = "contestClosed"
val CountersKey = "contestCounters"
replicator ! Subscribe(OpenedKey, self)
def receive = {
case Open ⇒
replicator ! Update(OpenedKey, Flag(), WriteAll, 5.seconds)(_.switchOn)
becomeOpen()
case Changed(OpenedKey, flag: Flag) if flag.enabled ⇒
becomeOpen()
case GetVotes ⇒
sender() ! Votes(Map.empty, open = false)
}
def becomeOpen(): Unit = {
replicator ! Unsubscribe(OpenedKey, self)
replicator ! Subscribe(ClosedKey, self)
context.become(open orElse getVotes(open = true))
}
def open: Receive = {
case v @ Vote(participant) ⇒
val update = Update(CountersKey, PNCounterMap(), request = Some(v)) {
_.increment(participant, 1)
}
replicator ! update
case _: UpdateSuccess ⇒
case Close ⇒
replicator ! Update(ClosedKey, Flag(), WriteAll, 5.seconds)(_.switchOn)
context.become(getVotes(open = false))
case Changed(ClosedKey, flag: Flag) if flag.enabled ⇒
context.become(getVotes(open = false))
}
def getVotes(open: Boolean): Receive = {
case GetVotes ⇒
replicator ! Get(CountersKey, ReadAll, 3.seconds, Some(GetVotesReq(sender())))
case GetSuccess(CountersKey, d: PNCounterMap, Some(GetVotesReq(replyTo))) ⇒
replyTo ! Votes(d.entries, open)
case NotFound(CountersKey, Some(GetVotesReq(replyTo))) ⇒
replyTo ! Votes(Map.empty, open)
case _: GetFailure ⇒
case _: UpdateSuccess ⇒
}
}
class VotingContestSpecMultiJvmNode1 extends VotingContestSpec
class VotingContestSpecMultiJvmNode2 extends VotingContestSpec
class VotingContestSpecMultiJvmNode3 extends VotingContestSpec
class VotingContestSpec extends MultiNodeSpec(VotingContestSpec) with STMultiNodeSpec with ImplicitSender {
import VotingContestSpec._
import ShoppingCart._
override def initialParticipants = roles.size
val cluster = Cluster(system)
def join(from: RoleName, to: RoleName): Unit = {
runOn(from) {
cluster join node(to).address
}
enterBarrier(from.name + "-joined")
}
"Demo of a replicated voting" must {
"join cluster" in {
join(node1, node1)
join(node2, node1)
join(node3, node1)
enterBarrier("after-1")
}
"count votes correctly" in within(15.seconds) {
import VotingService._
val votingService = system.actorOf(Props[VotingService], "votingService")
val N = 1000
runOn(node1) {
votingService ! Open
for (n ← 1 to N) {
votingService ! Vote("#" + ((n % 20) + 1))
}
}
runOn(node2, node3) {
// wait for it to open
val p = TestProbe()
awaitAssert {
votingService.tell(GetVotes, p.ref)
p.expectMsgPF(3.seconds) { case Votes(_, true) ⇒ true }
}
for (n ← 1 to N) {
votingService ! Vote("#" + ((n % 20) + 1))
}
}
enterBarrier("voting-done")
runOn(node3) {
votingService ! Close
}
val expected = (1 to 20).map(n ⇒ "#" + n -> (3L * N / 20)).toMap
awaitAssert {
votingService ! GetVotes
expectMsg(3.seconds, Votes(expected, false))
}
enterBarrier("after-2")
}
}
}