-
Notifications
You must be signed in to change notification settings - Fork 16
/
ExampleRediscalaBlocking.scala
41 lines (32 loc) · 1.03 KB
/
ExampleRediscalaBlocking.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import redis.{RedisBlockingClient, RedisClient}
import scala.concurrent.{Future, Await}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
object ExampleRediscalaBlocking extends App {
implicit val akkaSystem = akka.actor.ActorSystem()
val redis = RedisClient()
val redisBlocking = RedisBlockingClient()
val r = redis.del("workList").flatMap(_ => {
consumer()
publisher()
})
Await.result(r, 15 seconds)
akkaSystem.shutdown()
def publisher() = {
redis.lpush("workList", "doSomeWork")
Thread.sleep(2000)
redis.rpush("otherKeyWithWork", "doSomeWork1", "doSomeWork2")
}
def consumer() = Future {
val waitWork = 3
val sequenceFuture = for {i <- 0 to waitWork}
yield {
redisBlocking.blpop(Seq("workList", "otherKeyWithWork"), 5 seconds).map(result => {
result.map({
case (key, work) => println(s"list $key has work : ${work.utf8String}")
})
})
}
Await.result(Future.sequence(sequenceFuture), 10 seconds)
}
}