Skip to content

Commit

Permalink
Merge pull request #16 from ChristopherDavenport/useAllClusterToRefresh
Browse files Browse the repository at this point in the history
Use all cluster to refresh
  • Loading branch information
ChristopherDavenport authored Jan 12, 2021
2 parents ac104d6 + 6dc4b4e commit e9c66b3
Showing 1 changed file with 36 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import _root_.io.chrisdavenport.rediculous.cluster.HashSlot
import _root_.io.chrisdavenport.rediculous.cluster.ClusterCommands
import fs2.io.tls.TLSContext
import fs2.io.tls.TLSParameters
import _root_.io.chrisdavenport.rediculous.cluster.ClusterCommands.ClusterSlots

sealed trait RedisConnection[F[_]]
object RedisConnection{
Expand Down Expand Up @@ -206,15 +207,19 @@ object RedisConnection{
(
refTopology.get
.flatMap{ case (topo, setAt) =>
if (useDynamicRefreshSource) topo.random.map((_, setAt))
else Applicative[F].pure(((host, port), setAt))
if (useDynamicRefreshSource)
Applicative[F].pure((NonEmptyList((host, port), topo.l.flatMap(c => c.replicas).map(r => (r.host, r.port))), setAt))
else Applicative[F].pure((NonEmptyList.of((host, port)), setAt))
},
Clock[F].instantNow
).tupled
.flatMap{
case ((_, setAt), now) if setAt.isAfter(now.minusSeconds(cacheTopologySeconds.toSeconds)) => Applicative[F].unit
case (((host, port), _), _) =>
keypool.take((host, port)).map(_.value._1).map(DirectConnection(_)).use(ClusterCommands.clusterslots[Redis[F, *]].run(_))
case ((l, _), _) =>
val nelActions: NonEmptyList[F[ClusterSlots]] = l.map{ case (host, port) =>
keypool.take((host, port)).map(_.value._1).map(DirectConnection(_)).use(ClusterCommands.clusterslots[Redis[F, *]].run(_))
}
raceNThrowFirst(nelActions)
.flatMap(s => Clock[F].instantNow.flatMap(now => refTopology.set((s,now))))
}
)
Expand Down Expand Up @@ -305,4 +310,31 @@ object RedisConnection{
Either.catchNonFatal(port.toInt).toOption.map((host, _))
} else None
}

def raceN[F[_]: Concurrent, A](nel: NonEmptyList[F[A]]): F[Either[NonEmptyList[Throwable], A]] = {
for {
deferred <- Deferred[F, A]
out <- Bracket[F, Throwable].bracket(
nel.traverse(fa =>
Concurrent[F].start(fa.flatMap(a => deferred.complete(a).as(a)).attempt)
)
){
fibers: NonEmptyList[Fiber[F, Either[Throwable, A]]] =>
Concurrent[F].race(
fibers.traverse(_.join).map(
_.traverse(_.swap).swap
),
deferred.get
)
}(
fibers => fibers.traverse_(_.cancel.attempt)
)
} yield out.fold(identity, Either.right)
}

def raceNThrowFirst[F[_]: Concurrent, A](nel: NonEmptyList[F[A]]): F[A] =
raceN(nel).flatMap{
case Left(NonEmptyList(a, _)) => Concurrent[F].raiseError(a)
case Right(a) => Concurrent[F].pure(a)
}
}

0 comments on commit e9c66b3

Please sign in to comment.