diff --git a/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala b/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala index feccd2a..d66a963 100644 --- a/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala +++ b/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala @@ -35,7 +35,7 @@ object RedisConnection{ chunk.traverse(resp => Deferred[F, Either[Throwable, Resp]].map(d => (d, ({(e: Either[Throwable, Resp]) => d.complete(e).void}, resp)))).flatMap{ c => queue.offer(c.map(_._2)) >> { val x: F[Chunk[Either[Throwable, Resp]]] = c.traverse{ case (d, _) => d.get } - val y: F[Chunk[Resp]] = x.flatMap(_.sequence.liftTo[F]) + val y: F[Chunk[Resp]] = x.flatMap(_.sequence.liftTo[F].adaptError{case e => RedisError.QueuedExceptionError(e)}) y } } @@ -69,7 +69,7 @@ object RedisConnection{ val chunk = Chunk.seq(inputs.toList.map(Resp.renderRequest)) chunk.traverse(resp => Deferred[F, Either[Throwable, Resp]].map(d => (d, ({(e: Either[Throwable, Resp]) => d.complete(e).void}, key, None, 0, resp)))).flatMap{ c => queue.offer(c.map(_._2)) >> { - c.traverse(_._1.get).flatMap(_.sequence.liftTo[F]) + c.traverse(_._1.get).flatMap(_.sequence.liftTo[F].adaptError{case e => RedisError.QueuedExceptionError(e)}) } } } @@ -311,13 +311,13 @@ object RedisConnection{ } case l@Left(_) => l.rightCast[Chunk[Resp]].pure[F] }.flatMap{ - case Right(n) => + case Right(n) => n.zipWithIndex.traverse_{ case (ref, i) => val (toSet, _) = chunk(i) toSet(Either.right(ref)) } - case e@Left(_) => + case e@Left(_) => chunk.traverse_{ case (deff, _) => deff(e.asInstanceOf[Either[Throwable, Resp]])} }) } else { diff --git a/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisError.scala b/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisError.scala index ad0fdd3..27e40c1 100644 --- a/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisError.scala +++ b/core/shared/src/main/scala/io/chrisdavenport/rediculous/RedisError.scala @@ -18,4 +18,9 @@ object RedisError { final case class Generic(message: String) extends RedisError{ val cause: Option[Throwable] = None } + + final case class QueuedExceptionError(baseCase: Throwable) extends RedisError { + override val message: String = s"Error encountered in queue: ${baseCase.getMessage()}" + override val cause: Option[Throwable] = Some(baseCase) + } } \ No newline at end of file