diff --git a/build.sbt b/build.sbt index 47ecd204..597d6d47 100644 --- a/build.sbt +++ b/build.sbt @@ -284,6 +284,27 @@ lazy val xml = project ) ) +lazy val prometheus = project + .settings(name := "com.itv") + .settings(moduleName := "bucky-prometheus") + .settings(kernelSettings: _*) + .aggregate(core, test) + .dependsOn(core, test % "test,it") + .configs(IntegrationTest) + .settings(Defaults.itSettings) + .settings( + internalDependencyClasspath in IntegrationTest += Attributed.blank((classDirectory in Test).value), + parallelExecution in IntegrationTest := false, + ) + .settings( + libraryDependencies ++= Seq( + /* "io.chrisdavenport" %% "prometheus" % "0.5.0-M2", */ + "io.prometheus" % "simpleclient" % "0.6.0", + "org.scalatest" %% "scalatest" % scalaTestVersion % "test, it", + "org.typelevel" %% "cats-effect-testing-scalatest" % catsEffectScalaTestVersion % "test,it" + ) + ) + lazy val root = (project in file(".")) - .aggregate(xml, circe, kamon, argonaut, example, test, core) + .aggregate(prometheus, xml, circe, kamon, argonaut, example, test, core) .settings(publishArtifact := false) diff --git a/core/src/main/scala/com/itv/bucky/AmqpClient.scala b/core/src/main/scala/com/itv/bucky/AmqpClient.scala index 697de4d4..c83fd78b 100644 --- a/core/src/main/scala/com/itv/bucky/AmqpClient.scala +++ b/core/src/main/scala/com/itv/bucky/AmqpClient.scala @@ -15,17 +15,21 @@ import java.util.{Collections, UUID} import scala.concurrent.duration.{FiniteDuration, _} import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService} import scala.language.higherKinds +import com.rabbitmq.client.MetricsCollector +import com.rabbitmq.client.NoOpMetricsCollector trait AmqpClient[F[_]] { def declare(declarations: Declaration*): F[Unit] def declare(declarations: Iterable[Declaration]): F[Unit] def publisher(): Publisher[F, PublishCommand] - def registerConsumer(queueName: QueueName, - handler: Handler[F, Delivery], - exceptionalAction: ConsumeAction = DeadLetter, - prefetchCount: Int = 1, - shutdownTimeout: FiniteDuration = 1.minutes, - shutdownRetry: FiniteDuration = 500.millis): Resource[F, Unit] + def registerConsumer( + queueName: QueueName, + handler: Handler[F, Delivery], + exceptionalAction: ConsumeAction = DeadLetter, + prefetchCount: Int = 1, + shutdownTimeout: FiniteDuration = 1.minutes, + shutdownRetry: FiniteDuration = 500.millis + ): Resource[F, Unit] def isConnectionOpen: F[Boolean] } @@ -36,18 +40,17 @@ object AmqpClient extends StrictLogging { ): Resource[F, RabbitChannel] = { val make = F.blocking { - logger.info(s"Starting Channel") - val channel = connection.createChannel() - channel.addShutdownListener(new ShutdownListener { - override def shutdownCompleted(cause: ShutdownSignalException): Unit = - if (cause.isInitiatedByApplication) - logger.info(s"Channel shut down due to explicit application action: ${cause.getMessage}") - else - logger.error(s"Channel shut down by broker or because of detectable non-deliberate application failure", cause) - }) - channel - } - .attempt + logger.info(s"Starting Channel") + val channel = connection.createChannel() + channel.addShutdownListener(new ShutdownListener { + override def shutdownCompleted(cause: ShutdownSignalException): Unit = + if (cause.isInitiatedByApplication) + logger.info(s"Channel shut down due to explicit application action: ${cause.getMessage}") + else + logger.error(s"Channel shut down by broker or because of detectable non-deliberate application failure", cause) + }) + channel + }.attempt .flatTap { case Right(_) => F.delay(logger.info(s"Channel has been started successfully!")) @@ -59,38 +62,38 @@ object AmqpClient extends StrictLogging { Resource.make(make.evalOn(executionContext))(channel => F.blocking(channel.close())) } - private def createConnection[F[_]](config: AmqpClientConfig, executionContext: ExecutionContext)(implicit + private def createConnection[F[_]](config: AmqpClientConfig, executionContext: ExecutionContext, metricsCollector: MetricsCollector)(implicit F: Async[F] ): Resource[F, RabbitConnection] = { val make = F.blocking { - logger.info(s"Starting AmqpClient") - val connectionFactory = new ConnectionFactory() - connectionFactory.setHost(config.host) - connectionFactory.setPort(config.port) - connectionFactory.setUsername(config.username) - connectionFactory.setPassword(config.password) - connectionFactory.setAutomaticRecoveryEnabled(config.networkRecoveryInterval.isDefined) - connectionFactory.setSharedExecutor(executionContext match { - case null => throw null - case eces: ExecutionContextExecutorService => eces - case other => - new AbstractExecutorService with ExecutionContextExecutorService { - override def prepare(): ExecutionContext = other - override def isShutdown = false - override def isTerminated = false - override def shutdown() = () - override def shutdownNow() = Collections.emptyList[Runnable] - override def execute(runnable: Runnable): Unit = other execute runnable - override def reportFailure(t: Throwable): Unit = other reportFailure t - override def awaitTermination(length: Long, unit: TimeUnit): Boolean = false - } - }) - config.networkRecoveryInterval.map(_.toMillis.toInt).foreach(connectionFactory.setNetworkRecoveryInterval) - config.virtualHost.foreach(connectionFactory.setVirtualHost) - connectionFactory.newConnection() - } - .attempt + logger.info(s"Starting AmqpClient") + val connectionFactory = new ConnectionFactory() + connectionFactory.setHost(config.host) + connectionFactory.setPort(config.port) + connectionFactory.setUsername(config.username) + connectionFactory.setPassword(config.password) + connectionFactory.setMetricsCollector(metricsCollector) + connectionFactory.setAutomaticRecoveryEnabled(config.networkRecoveryInterval.isDefined) + connectionFactory.setSharedExecutor(executionContext match { + case null => throw null + case eces: ExecutionContextExecutorService => eces + case other => + new AbstractExecutorService with ExecutionContextExecutorService { + override def prepare(): ExecutionContext = other + override def isShutdown = false + override def isTerminated = false + override def shutdown() = () + override def shutdownNow() = Collections.emptyList[Runnable] + override def execute(runnable: Runnable): Unit = other execute runnable + override def reportFailure(t: Throwable): Unit = other reportFailure t + override def awaitTermination(length: Long, unit: TimeUnit): Boolean = false + } + }) + config.networkRecoveryInterval.map(_.toMillis.toInt).foreach(connectionFactory.setNetworkRecoveryInterval) + config.virtualHost.foreach(connectionFactory.setVirtualHost) + connectionFactory.newConnection() + }.attempt .flatTap { case Right(_) => logger.info(s"AmqpClient has been started successfully!").pure[F] @@ -112,13 +115,28 @@ object AmqpClient extends StrictLogging { client <- apply[F](config, connectionExecutionContext, channelEc) } yield client - def apply[F[_]](config: AmqpClientConfig, connectionExecutionContext: ExecutionContext, channelExecutionContext: ExecutionContext)(implicit + def apply[F[_]](config: AmqpClientConfig, metricsCollector: MetricsCollector)(implicit + F: Async[F], + t: Temporal[F], + connectionExecutionContext: ExecutionContext + ): Resource[F, AmqpClient[F]] = + for { + channelEc <- defaultChannelExecutionContext + client <- apply[F](config, connectionExecutionContext, channelEc, metricsCollector) + } yield client + + def apply[F[_]]( + config: AmqpClientConfig, + connectionExecutionContext: ExecutionContext, + channelExecutionContext: ExecutionContext, + metricsCollector: MetricsCollector = new NoOpMetricsCollector() + )(implicit F: Async[F], t: Temporal[F] ): Resource[F, AmqpClient[F]] = for { dispatcher <- Dispatcher[F] - connection <- createConnection(config, connectionExecutionContext) + connection <- createConnection(config, connectionExecutionContext, metricsCollector) publishChannel = createChannel(connection, channelExecutionContext).map(Channel.apply[F](_, dispatcher, channelExecutionContext)) buildChannel = () => createChannel(connection, channelExecutionContext).map(Channel.apply[F](_, dispatcher, channelExecutionContext)) client <- apply[F](config, buildChannel, publishChannel, dispatcher, channelExecutionContext) @@ -157,16 +175,19 @@ object AmqpClient extends StrictLogging { override def publisher(): Publisher[F, PublishCommand] = cmd => connectionManager.publish(cmd).evalOn(executionContext) - override def registerConsumer(queueName: QueueName, - handler: Handler[F, Delivery], - exceptionalAction: ConsumeAction, - prefetchCount: Int, - shutdownTimeout: FiniteDuration, - shutdownRetry: FiniteDuration): Resource[F, Unit] = + override def registerConsumer( + queueName: QueueName, + handler: Handler[F, Delivery], + exceptionalAction: ConsumeAction, + prefetchCount: Int, + shutdownTimeout: FiniteDuration, + shutdownRetry: FiniteDuration + ): Resource[F, Unit] = for { channel <- buildChannel() handling <- Resource.make(Ref.of[F, Set[UUID]](Set.empty))(set => - repeatUntil(F.blocking(logger.debug("Verifying running handlers.")) *> set.get)(_.isEmpty)(shutdownRetry).timeout(shutdownTimeout)) + repeatUntil(F.blocking(logger.debug("Verifying running handlers.")) *> set.get)(_.isEmpty)(shutdownRetry).timeout(shutdownTimeout) + ) newHandler = (delivery: Delivery) => for { id <- F.delay(UUID.randomUUID()) diff --git a/prometheus/src/main/java/com/itv/bucky/prometheus/PrometheusMetricsCollector.java b/prometheus/src/main/java/com/itv/bucky/prometheus/PrometheusMetricsCollector.java new file mode 100644 index 00000000..a4da6612 --- /dev/null +++ b/prometheus/src/main/java/com/itv/bucky/prometheus/PrometheusMetricsCollector.java @@ -0,0 +1,162 @@ +package com.itv.bucky.prometheus; + +import java.util.Collections; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.impl.AbstractMetricsCollector; +import io.prometheus.client.Counter; +import io.prometheus.client.Gauge; +import io.prometheus.client.CollectorRegistry; + +public class PrometheusMetricsCollector extends AbstractMetricsCollector { + + private final Gauge connections; + + private final Gauge channels; + + private final Counter publishedMessages; + + private final Counter failedToPublishMessages; + + private final Counter ackedPublishedMessages; + + private final Counter nackedPublishedMessages; + + private final Counter unroutedPublishedMessages; + + private final Counter consumedMessages; + + private final Counter acknowledgedMessages; + + private final Counter rejectedMessages; + + public PrometheusMetricsCollector(final CollectorRegistry registry) { + this(registry, "rabbitmq"); + } + + public PrometheusMetricsCollector(final CollectorRegistry registry, final String prefix) { + this.connections = Gauge.build() + .name(prefix + "_connections") + .help("Current connections") + .create(); + registry.register(this.connections); + + this.channels = Gauge.build() + .name(prefix + "_channels") + .help("Current channels") + .create(); + registry.register(this.channels); + + this.publishedMessages = Counter.build() + .name(prefix + "_published_messages") + .help("Count of published messages") + .create(); + registry.register(this.publishedMessages); + + this.failedToPublishMessages = Counter.build() + .name(prefix + "_failed_to_publish_messages") + .help("Count of failed to publish messages") + .create(); + registry.register(failedToPublishMessages); + + this.ackedPublishedMessages = Counter.build() + .name(prefix + "_acked_published_messages") + .help("Count of acknowledged publish messages") + .create(); + registry.register(ackedPublishedMessages); + + this.nackedPublishedMessages = Counter.build() + .name(prefix + "_nacked_published_messages") + .help("Count of not acknowledged publish messages") + .create(); + registry.register(nackedPublishedMessages); + + this.unroutedPublishedMessages = Counter.build() + .name(prefix + "_unrouted_published_messages") + .help("Count of unrouted publish messages") + .create(); + registry.register(unroutedPublishedMessages); + + this.consumedMessages = Counter.build() + .name(prefix + "_consumed_messages") + .help("Count of consumed messages") + .create(); + registry.register(consumedMessages); + + this.acknowledgedMessages = Counter.build() + .name(prefix + "_acknowledged_messages") + .help("Count of acknowledged consumed messages") + .create(); + registry.register(acknowledgedMessages); + + this.rejectedMessages = Counter.build() + .name(prefix + "_rejected_messages") + .help("Count of rejected consumed messages") + .create(); + registry.register(rejectedMessages); + + } + + @Override + protected void incrementConnectionCount(Connection connection) { + connections.inc(); + } + + @Override + protected void decrementConnectionCount(Connection connection) { + connections.dec(); + } + + @Override + protected void incrementChannelCount(Channel channel) { + channels.inc(); + } + + @Override + protected void decrementChannelCount(Channel channel) { + channels.dec(); + } + + @Override + protected void markPublishedMessage() { + publishedMessages.inc(); + } + + @Override + protected void markMessagePublishFailed() { + failedToPublishMessages.inc(); + } + + @Override + protected void markConsumedMessage() { + consumedMessages.inc(); + } + + @Override + protected void markAcknowledgedMessage() { + acknowledgedMessages.inc(); + } + + @Override + protected void markRejectedMessage() { + rejectedMessages.inc(); + } + + @Override + protected void markMessagePublishAcknowledged() { + ackedPublishedMessages.inc(); + } + + @Override + protected void markMessagePublishNotAcknowledged() { + nackedPublishedMessages.inc(); + } + + @Override + protected void markPublishedMessageUnrouted() { + unroutedPublishedMessages.inc(); + } +}