diff --git a/core/src/main/scala/io/chrisdavenport/rediculous/RedisCommands.scala b/core/src/main/scala/io/chrisdavenport/rediculous/RedisCommands.scala index d9c0e1a..e7fdda0 100644 --- a/core/src/main/scala/io/chrisdavenport/rediculous/RedisCommands.scala +++ b/core/src/main/scala/io/chrisdavenport/rediculous/RedisCommands.scala @@ -315,8 +315,11 @@ object RedisCommands { val block = xreadOpts.blockMillisecond.toList.flatMap(l => List("BLOCK", l.encode)) val count = xreadOpts.count.toList.flatMap(l => List("COUNT", l.encode)) val noAck = Alternative[List].guard(xreadOpts.noAck).as("NOACK") - val streamKeys = streams.map(_.stream.encode).toList - val streamOffsets = streams.map(_.offset.encode).toList + val (streamKeys, streamOffsets) = streams + .foldLeft((List.empty[String], List.empty[String])){ + case ((keys, offsets), streamOffset) => + (streamOffset.stream :: keys, streamOffset.offset :: offsets) + } val streamPairs = "STREAMS" :: streamKeys ::: streamOffsets RedisCtx[F].unkeyed(NEL("XREAD", block ::: count ::: noAck ::: streamPairs)) diff --git a/core/src/main/scala/io/chrisdavenport/rediculous/RedisStream.scala b/core/src/main/scala/io/chrisdavenport/rediculous/RedisStream.scala index 8228cec..82c4e5f 100644 --- a/core/src/main/scala/io/chrisdavenport/rediculous/RedisStream.scala +++ b/core/src/main/scala/io/chrisdavenport/rediculous/RedisStream.scala @@ -1,7 +1,7 @@ package io.chrisdavenport.rediculous import cats.implicits._ -import fs2.{Stream, Pipe, Chunk} +import fs2.{Stream, Chunk} import scala.concurrent.duration.Duration import RedisCommands.{XAddOpts, XReadOpts, StreamOffset, Trimming, xadd, xread} import cats.effect._ @@ -41,7 +41,7 @@ object RedisStream { key => msg => StreamOffset.From(key, msg.recordId) private val offsetsByKey: List[RedisCommands.XReadResponse] => Map[String, Option[StreamOffset]] = - list => list.groupBy(_.stream).map { case (k, values) => k -> values.flatMap(_.records).lastOption.map(nextOffset(k)) } + list => list.groupBy(_.stream).map { case (k, values) => k -> values.lastOption.flatMap(_.records.lastOption).map(nextOffset(k)) } def read(keys: Set[String], initialOffset: String => StreamOffset, block: Duration, count: Option[Long]): Stream[F, RedisCommands.XReadResponse] = { val initial = keys.map(k => k -> initialOffset(k)).toMap diff --git a/core/src/test/scala/io/chrisdavenport/rediculous/RedisStreamSpec.scala b/core/src/test/scala/io/chrisdavenport/rediculous/RedisStreamSpec.scala index 8138f9a..5ad955b 100644 --- a/core/src/test/scala/io/chrisdavenport/rediculous/RedisStreamSpec.scala +++ b/core/src/test/scala/io/chrisdavenport/rediculous/RedisStreamSpec.scala @@ -28,7 +28,7 @@ class RedisStreamSpec extends CatsEffectSuite { (hostS, portI) = t host <- Resource.eval(Host.fromString(hostS).liftTo[IO](new Throwable("Invalid Host"))) port <- Resource.eval(Port.fromInt(portI).liftTo[IO](new Throwable("Invalid Port"))) - connection <- RedisConnection.pool[IO].withHost(host).withPort(port).build + connection <- RedisConnection.queued[IO].withHost(host).withPort(port).build } yield connection ) @@ -51,7 +51,6 @@ class RedisStreamSpec extends CatsEffectSuite { rStream.read(Set("foo")).take(1).compile.lastOrError }.map{ xrr => - val i = xrr.stream assertEquals(xrr.stream, "foo") val i2 = xrr.records.flatMap(sr => sr.keyValues) assertEquals(i2.toSet, messages.toList.flatMap(_.body).toSet) @@ -60,24 +59,50 @@ class RedisStreamSpec extends CatsEffectSuite { test("consume messages from offset"){ //connection => val messages = fs2.Chunk( - RedisStream.XAddMessage("fee", List("zoom" -> "zad")), - RedisStream.XAddMessage("fee", List("bar" -> "baz")) + RedisStream.XAddMessage("fee", List("1" -> "1")), + RedisStream.XAddMessage("fee", List("2" -> "2")), + RedisStream.XAddMessage("fee", List("3" -> "3")), ) redisConnection().flatMap{connection => val rStream = RedisStream.fromConnection(connection) rStream.append(messages) >> rStream - .read(Set("fee"), (_ => RedisCommands.StreamOffset.From("fee", "0-0")), Duration.Zero, 100L.some) + .read(Set("fee"), (_ => RedisCommands.StreamOffset.From("fee", "0-0")), Duration.Zero, 1L.some) .take(4) - .timeout(100.milli) + .timeout(250.milli) .handleErrorWith(_ => fs2.Stream.empty) .compile .toList }.map{ resps => - val records = resps.flatMap(_.records) - assertEquals(records.length, 2) + val records = resps.flatMap(_.records).flatMap(_.keyValues.map(_._1)) + assertEquals(records, List("1", "2", "3")) + } + } + + test("consume messages from multiple streams"){ //connection => + val messages = fs2.Chunk( + RedisStream.XAddMessage("baf", List("1" -> "1")), + RedisStream.XAddMessage("baz", List("2" -> "2")), + RedisStream.XAddMessage("bar", List("3" -> "3")), + RedisStream.XAddMessage("baf", List("4" -> "4")), + RedisStream.XAddMessage("baz", List("5" -> "5")), + RedisStream.XAddMessage("bar", List("6" -> "6")), + ) + redisConnection().flatMap{connection => + + val rStream = RedisStream.fromConnection(connection) + rStream.append(messages) >> + rStream + .read(Set("baf", "baz", "bar"), stream => RedisCommands.StreamOffset.From(stream, "0"), Duration.Zero, 1L.some) + .take(6) + .compile + .toList + + }.map{ resps => + val records = resps.flatMap(_.records).flatMap(_.keyValues.map(_._1)).toSet + assertEquals(records, Set("1", "2", "3", "4", "5", "6")) } } }