diff --git a/build.sbt b/build.sbt index c97c5d2..29aa6a7 100644 --- a/build.sbt +++ b/build.sbt @@ -59,6 +59,7 @@ lazy val `http4s-consul-middleware` = crossProject(JSPlatform, JVMPlatform) "org.http4s" %%% "http4s-laws" % http4sVersion % Test, "org.scalameta" %%% "munit" % munitVersion % Test, "org.scalameta" %%% "munit-scalacheck" % munitVersion % Test, + "com.comcast" %%% "ip4s-test-kit" % "3.6.0" % Test, ) ++ (if (scalaVersion.value.startsWith("2.")) Seq( compilerPlugin("org.typelevel" %% "kind-projector" % "0.13.3" cross CrossVersion.full), compilerPlugin("com.olegpy" %% "better-monadic-for" % "0.3.1"), diff --git a/core/shared/src/main/scala/com/dwolla/consul/package.scala b/core/shared/src/main/scala/com/dwolla/consul/package.scala index 8de0901..71d4214 100644 --- a/core/shared/src/main/scala/com/dwolla/consul/package.scala +++ b/core/shared/src/main/scala/com/dwolla/consul/package.scala @@ -1,12 +1,14 @@ package com.dwolla -import cats.syntax.all._ +import io.circe.Encoder import monix.newtypes.NewtypeWrapped import org.http4s.Header.Single -import org.http4s.{Header, QueryParam, QueryParamEncoder, Uri} +import org.http4s.QueryParamDecoder.fromUnsafeCast +import org.http4s._ import org.typelevel.ci._ import scala.concurrent.duration._ +import scala.util.matching.Regex package object consul { case object OnlyHealthyServices { @@ -21,20 +23,49 @@ package object consul { package consul { object ServiceName extends NewtypeWrapped[String] { implicit val serviceNameSegmentEncoder: Uri.Path.SegmentEncoder[ServiceName] = Uri.Path.SegmentEncoder[String].contramap(_.value) + + def unapply(str: String): Some[ServiceName] = + Some(ServiceName(str)) + + implicit val serviceNameEncoder: Encoder[ServiceName] = Encoder[String].contramap(_.value) } - object ConsulIndex extends NewtypeWrapped[String] { - implicit val consulIndexHeader: Header[ConsulIndex, Single] = Header.create( + /* The Consul documentation (https://developer.hashicorp.com/consul/api-docs/features/blocking#implementation-details) says + * that "clients should sanity check that their index is at least 1 after each blocking response is handled" + * so it should be safe to use a Long and confirm in the parser that the value is positive. + */ + object ConsulIndex extends NewtypeWrapped[Long] { + implicit val consulIndexHeader: Header[ConsulIndex, Single] = Header.create[ConsulIndex, Single]( ci"X-Consul-Index", - _.value, - ConsulIndex(_).asRight + _.value.toString, + s => + ParseResult.fromTryCatchNonFatal("Consul index must be a positive integer value")(s.toLong) + .flatMap { + case x if x > 0 => Right(ConsulIndex(x)) + case x => Left(ParseFailure("Consul index must be greater than 0", s"Found $x <= 0")) + } ) implicit val consulIndexQueryParam: QueryParam[ConsulIndex] = QueryParam.fromKey("index") - implicit val consulIndexQueryParamEncoder: QueryParamEncoder[ConsulIndex] = QueryParamEncoder[String].contramap(_.value) + implicit val consulIndexQueryParamEncoder: QueryParamEncoder[ConsulIndex] = QueryParamEncoder[Long].contramap(_.value) + implicit val consulIndexQueryParamDecoder: QueryParamDecoder[ConsulIndex] = QueryParamDecoder[Long].map(ConsulIndex(_)) } object WaitPeriod extends NewtypeWrapped[FiniteDuration] { + private val regex: Regex = """(?\d+)(?[sm])""".r implicit val waitQueryParam: QueryParam[WaitPeriod] = QueryParam.fromKey("wait") implicit val waitQueryParamEncoder: QueryParamEncoder[WaitPeriod] = QueryParamEncoder[String].contramap(d => s"${d.value.toSeconds}s") + implicit val waitQueryParamDecoder: QueryParamDecoder[WaitPeriod] = + fromUnsafeCast[WaitPeriod] { _.value match { + case regex(value, unit) => + val valueInt = value.toInt + + val seconds = unit match { + case "s" => valueInt + case "m" => valueInt * 60 + } + + WaitPeriod(seconds.seconds) + case other => throw new MatchError(other) + }}("WaitPeriod") } } diff --git a/core/shared/src/test/scala/cats/effect/std/ArbitraryRandom.scala b/core/shared/src/test/scala/cats/effect/std/ArbitraryRandom.scala new file mode 100644 index 0000000..9cabb35 --- /dev/null +++ b/core/shared/src/test/scala/cats/effect/std/ArbitraryRandom.scala @@ -0,0 +1,17 @@ +package cats.effect.std + +import cats.syntax.all._ +import cats.effect.Sync +import cats.effect.std.Random.ScalaRandom +import org.scalacheck.{Arbitrary, Gen, Shrink} + +trait ArbitraryRandom { + def genRandom[F[_] : Sync]: Gen[Random[F]] = + Gen.long + .map(new scala.util.Random(_).pure[F]) + .map(new ScalaRandom[F](_) {}) + + implicit def arbRandom[F[_] : Sync]: Arbitrary[Random[F]] = Arbitrary(genRandom[F]) + + implicit def shrinkRandom[F[_]]: Shrink[Random[F]] = Shrink.shrinkAny +} diff --git a/core/shared/src/test/scala/com/dwolla/consul/Arbitraries.scala b/core/shared/src/test/scala/com/dwolla/consul/Arbitraries.scala index a3d9ab2..cadffc3 100644 --- a/core/shared/src/test/scala/com/dwolla/consul/Arbitraries.scala +++ b/core/shared/src/test/scala/com/dwolla/consul/Arbitraries.scala @@ -6,7 +6,7 @@ trait ConsulArbitraries { val genServiceName: Gen[ServiceName] = Gen.identifier.map(ServiceName(_)) implicit val arbServiceName: Arbitrary[ServiceName] = Arbitrary(genServiceName) - val genConsulIndex: Gen[ConsulIndex] = Gen.identifier.map(ConsulIndex(_)) + val genConsulIndex: Gen[ConsulIndex] = Gen.posNum[Long].map(ConsulIndex(_)) implicit val arbConsulIndex: Arbitrary[ConsulIndex] = Arbitrary(genConsulIndex) } diff --git a/core/shared/src/test/scala/com/dwolla/consul/ConsulServiceDiscoveryAlgSpec.scala b/core/shared/src/test/scala/com/dwolla/consul/ConsulServiceDiscoveryAlgSpec.scala index c864662..dd50225 100644 --- a/core/shared/src/test/scala/com/dwolla/consul/ConsulServiceDiscoveryAlgSpec.scala +++ b/core/shared/src/test/scala/com/dwolla/consul/ConsulServiceDiscoveryAlgSpec.scala @@ -1,15 +1,42 @@ package com.dwolla.consul +import cats.effect.std._ +import cats.effect.syntax.all._ +import cats.effect.{IO, IOLocal, Temporal} import cats.syntax.all._ +import com.comcast.ip4s.Arbitraries._ +import com.comcast.ip4s.{IpAddress, Port} import com.dwolla.consul.arbitraries._ -import munit.ScalaCheckSuite -import org.http4s.Uri +import com.dwolla.consul.examples.LocalTracing +import io.circe.Encoder +import io.circe.literal._ +import io.circe.syntax._ +import munit.{CatsEffectSuite, ScalaCheckEffectSuite} +import natchez.Span +import natchez.mtl.natchezMtlTraceForLocal +import org.http4s.Uri.Host +import org.http4s._ +import org.http4s.circe._ +import org.http4s.client.Client +import org.http4s.dsl.Http4sDsl import org.http4s.laws.discipline.arbitrary.http4sTestingArbitraryForUri -import org.scalacheck.Prop +import org.http4s.syntax.all._ +import org.scalacheck.Arbitrary.arbitrary +import org.scalacheck.effect.PropF +import org.scalacheck._ +import org.typelevel.log4cats.LoggerFactory +import org.typelevel.log4cats.noop.NoOpFactory -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration._ + +class ConsulServiceDiscoveryAlgSpec + extends CatsEffectSuite + with ScalaCheckEffectSuite + with cats.effect.std.ArbitraryRandom + with LocalTracing { + + private implicit val noopLoggerFactory: LoggerFactory[IO] = NoOpFactory[IO] -class ConsulServiceDiscoveryAlgSpec extends ScalaCheckSuite { test("Consul service lookup URI construction") { Prop.forAll { (consulBase: Uri, serviceName: ServiceName, @@ -31,4 +58,78 @@ class ConsulServiceDiscoveryAlgSpec extends ScalaCheckSuite { assertEquals(output, expected) } } + + test("authoritiesForService returns the URIs for the service returned by Consul") { + PropF.forAllF { (services: Vector[ConsulApi.Service], + randomInstance: Random[IO], + ) => + val http4sClient: Client[IO] = Client.fromHttpApp(new ConsulApi[IO](services).app) + implicit val random: Random[IO] = randomInstance + + IOLocal(Span.noop[IO]).flatMap { implicit ioLocal => + for { + alg <- ConsulServiceDiscoveryAlg(uri"/", 5.seconds, http4sClient) + serviceName <- Random[IO].shuffleVector(services).map(_.headOption.map(_.name).getOrElse(ServiceName("missing"))) + output <- alg.authoritiesForService(serviceName).use(_.delayBy(10.millis)) // delay a bit to make sure the background request is also made + expected = services.collect { + case ConsulApi.Service(`serviceName`, host, port, true) => + Uri.Authority(None, Host.fromIpAddress(host), port.value.some) + } + } yield { + assertEquals(output, expected) + } + } + } + } +} + +object ConsulApi { + case class Service(name: ServiceName, address: IpAddress, port: Port, isHealthy: Boolean) + + object Service { + val genService: Gen[Service] = + for { + serviceName <- arbitrary[ServiceName] + address <- arbitrary[IpAddress] + port <- arbitrary[Port] + healthy <- arbitrary[Boolean] + } yield Service(serviceName, address, port, healthy) + + implicit val arbService: Arbitrary[Service] = Arbitrary(genService) + + implicit val encoder: Encoder[Service] = a => + json"""{ + "Node": {}, + "Service": { + "Service": ${a.name}, + "Address": ${a.address.toString}, + "Port": ${a.port.value} + }, + "Checks": [ + { + "Status": ${(if (a.isHealthy) "passing" else "critical"): String} + } + ] + }""" + } +} + +class ConsulApi[F[_] : Temporal](services: Vector[ConsulApi.Service]) extends Http4sDsl[F] { + object OnlyHealthyServices extends FlagQueryParamMatcher("passing") + object ConsulIndex extends OptionalQueryParamDecoderMatcher[com.dwolla.consul.ConsulIndex]("index") + object WaitPeriod extends OptionalQueryParamDecoderMatcher[com.dwolla.consul.WaitPeriod]("wait") + + def routes: HttpRoutes[F] = HttpRoutes.of { + case GET -> Root / "v1" / "health" / "service" / serviceName :? OnlyHealthyServices(_) :? ConsulIndex(idx) :? WaitPeriod(timeout) => + val json = services.filter { + case ConsulApi.Service(ServiceName(`serviceName`), _, _, true) => true + case _ => false + }.asJson + + val consulIndex = idx.getOrElse(com.dwolla.consul.ConsulIndex(1L)) + + timeout.map(_.value).foldLeft(Ok(json, consulIndex))(_.delayBy(_)) + } + + def app: HttpApp[F] = routes.orNotFound }