From a5dacb0b7721d505e15692598f6bf28cc76557f7 Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Tue, 12 Jan 2021 09:52:41 -0800 Subject: [PATCH 1/3] Use Full Cluster to Refresh Topology --- .../rediculous/RedisConnection.scala | 41 +++++++++++++++++-- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala b/core/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala index 70ee4ff..31e11a6 100644 --- a/core/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala +++ b/core/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala @@ -206,16 +206,24 @@ object RedisConnection{ ( refTopology.get .flatMap{ case (topo, setAt) => - if (useDynamicRefreshSource) topo.random.map((_, setAt)) - else Applicative[F].pure(((host, port), setAt)) + if (useDynamicRefreshSource) + Applicative[F].pure((topo.l.flatMap(c => c.replicas).map(r => (r.host, r.port)).toNel.getOrElse(NonEmptyList.of((host, port))), setAt)) + // topo.random.map((_, setAt)) + else Applicative[F].pure((NonEmptyList.of((host, port)), setAt)) }, Clock[F].instantNow ).tupled .flatMap{ case ((_, setAt), now) if setAt.isAfter(now.minusSeconds(cacheTopologySeconds.toSeconds)) => Applicative[F].unit - case (((host, port), _), _) => + case ((NonEmptyList((host, port), Nil), _), _) => keypool.take((host, port)).map(_.value._1).map(DirectConnection(_)).use(ClusterCommands.clusterslots[Redis[F, *]].run(_)) .flatMap(s => Clock[F].instantNow.flatMap(now => refTopology.set((s,now)))) + case ((l, _), _) => + val nelActions: NonEmptyList[F[Unit]] = l.map{ case (host, port) => + keypool.take((host, port)).map(_.value._1).map(DirectConnection(_)).use(ClusterCommands.clusterslots[Redis[F, *]].run(_)) + .flatMap(s => Clock[F].instantNow.flatMap(now => refTopology.set((s,now)))) + } + raceNThrowFirst(nelActions) } ) @@ -305,4 +313,31 @@ object RedisConnection{ Either.catchNonFatal(port.toInt).toOption.map((host, _)) } else None } + + def raceN[F[_]: Concurrent, A](nel: NonEmptyList[F[A]]): F[Either[NonEmptyList[Throwable], A]] = { + for { + deferred <- Deferred[F, A] + out <- Bracket[F, Throwable].bracket( + nel.traverse(fa => + Concurrent[F].start(fa.flatMap(a => deferred.complete(a).as(a)).attempt) + ) + ){ + fibers: NonEmptyList[Fiber[F, Either[Throwable, A]]] => + Concurrent[F].race( + fibers.traverse(_.join).map( + _.traverse(_.swap).swap + ), + deferred.get + ) + }( + fibers => fibers.traverse_(_.cancel.attempt) + ) + } yield out.fold(identity, Either.right) + } + + def raceNThrowFirst[F[_]: Concurrent, A](nel: NonEmptyList[F[A]]): F[A] = + raceN(nel).flatMap{ + case Left(NonEmptyList(a, _)) => Concurrent[F].raiseError(a) + case Right(a) => Concurrent[F].pure(a) + } } \ No newline at end of file From 689833d228333d38dee61a6da46e9ac83a035d20 Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Tue, 12 Jan 2021 09:56:38 -0800 Subject: [PATCH 2/3] Use All Cluster Topology to Refresh Faster, and If Any of the topology is valid --- .../io/chrisdavenport/rediculous/RedisConnection.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala b/core/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala index 31e11a6..8f49561 100644 --- a/core/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala +++ b/core/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala @@ -16,6 +16,7 @@ import _root_.io.chrisdavenport.rediculous.cluster.HashSlot import _root_.io.chrisdavenport.rediculous.cluster.ClusterCommands import fs2.io.tls.TLSContext import fs2.io.tls.TLSParameters +import _root_.io.chrisdavenport.rediculous.cluster.ClusterCommands.ClusterSlots sealed trait RedisConnection[F[_]] object RedisConnection{ @@ -207,8 +208,7 @@ object RedisConnection{ refTopology.get .flatMap{ case (topo, setAt) => if (useDynamicRefreshSource) - Applicative[F].pure((topo.l.flatMap(c => c.replicas).map(r => (r.host, r.port)).toNel.getOrElse(NonEmptyList.of((host, port))), setAt)) - // topo.random.map((_, setAt)) + Applicative[F].pure((NonEmptyList((host, port), topo.l.flatMap(c => c.replicas).map(r => (r.host, r.port))), setAt)) else Applicative[F].pure((NonEmptyList.of((host, port)), setAt)) }, Clock[F].instantNow @@ -219,11 +219,11 @@ object RedisConnection{ keypool.take((host, port)).map(_.value._1).map(DirectConnection(_)).use(ClusterCommands.clusterslots[Redis[F, *]].run(_)) .flatMap(s => Clock[F].instantNow.flatMap(now => refTopology.set((s,now)))) case ((l, _), _) => - val nelActions: NonEmptyList[F[Unit]] = l.map{ case (host, port) => + val nelActions: NonEmptyList[F[ClusterSlots]] = l.map{ case (host, port) => keypool.take((host, port)).map(_.value._1).map(DirectConnection(_)).use(ClusterCommands.clusterslots[Redis[F, *]].run(_)) - .flatMap(s => Clock[F].instantNow.flatMap(now => refTopology.set((s,now)))) } raceNThrowFirst(nelActions) + .flatMap(s => Clock[F].instantNow.flatMap(now => refTopology.set((s,now)))) } ) From 6dc4b4eb19d8803778f905374acc5d31af7f32a3 Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Tue, 12 Jan 2021 09:58:47 -0800 Subject: [PATCH 3/3] Same Logic for Both --- .../scala/io/chrisdavenport/rediculous/RedisConnection.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/core/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala b/core/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala index 8f49561..8dffd21 100644 --- a/core/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala +++ b/core/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala @@ -215,9 +215,6 @@ object RedisConnection{ ).tupled .flatMap{ case ((_, setAt), now) if setAt.isAfter(now.minusSeconds(cacheTopologySeconds.toSeconds)) => Applicative[F].unit - case ((NonEmptyList((host, port), Nil), _), _) => - keypool.take((host, port)).map(_.value._1).map(DirectConnection(_)).use(ClusterCommands.clusterslots[Redis[F, *]].run(_)) - .flatMap(s => Clock[F].instantNow.flatMap(now => refTopology.set((s,now)))) case ((l, _), _) => val nelActions: NonEmptyList[F[ClusterSlots]] = l.map{ case (host, port) => keypool.take((host, port)).map(_.value._1).map(DirectConnection(_)).use(ClusterCommands.clusterslots[Redis[F, *]].run(_))