Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Prometheus impl of RabbitMQ MetricsCollector #78

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,27 @@ lazy val xml = project
)
)

lazy val prometheus = project
.settings(name := "com.itv")
.settings(moduleName := "bucky-prometheus")
.settings(kernelSettings: _*)
.aggregate(core, test)
.dependsOn(core, test % "test,it")
.configs(IntegrationTest)
.settings(Defaults.itSettings)
.settings(
internalDependencyClasspath in IntegrationTest += Attributed.blank((classDirectory in Test).value),
parallelExecution in IntegrationTest := false,
)
.settings(
libraryDependencies ++= Seq(
/* "io.chrisdavenport" %% "prometheus" % "0.5.0-M2", */
"io.prometheus" % "simpleclient" % "0.6.0",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test, it",
"org.typelevel" %% "cats-effect-testing-scalatest" % catsEffectScalaTestVersion % "test,it"
)
)

lazy val root = (project in file("."))
.aggregate(xml, circe, kamon, argonaut, example, test, core)
.aggregate(prometheus, xml, circe, kamon, argonaut, example, test, core)
.settings(publishArtifact := false)
131 changes: 76 additions & 55 deletions core/src/main/scala/com/itv/bucky/AmqpClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,21 @@ import java.util.{Collections, UUID}
import scala.concurrent.duration.{FiniteDuration, _}
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}
import scala.language.higherKinds
import com.rabbitmq.client.MetricsCollector
import com.rabbitmq.client.NoOpMetricsCollector

trait AmqpClient[F[_]] {
def declare(declarations: Declaration*): F[Unit]
def declare(declarations: Iterable[Declaration]): F[Unit]
def publisher(): Publisher[F, PublishCommand]
def registerConsumer(queueName: QueueName,
handler: Handler[F, Delivery],
exceptionalAction: ConsumeAction = DeadLetter,
prefetchCount: Int = 1,
shutdownTimeout: FiniteDuration = 1.minutes,
shutdownRetry: FiniteDuration = 500.millis): Resource[F, Unit]
def registerConsumer(
queueName: QueueName,
handler: Handler[F, Delivery],
exceptionalAction: ConsumeAction = DeadLetter,
prefetchCount: Int = 1,
shutdownTimeout: FiniteDuration = 1.minutes,
shutdownRetry: FiniteDuration = 500.millis
): Resource[F, Unit]
def isConnectionOpen: F[Boolean]
}

Expand All @@ -36,18 +40,17 @@ object AmqpClient extends StrictLogging {
): Resource[F, RabbitChannel] = {
val make =
F.blocking {
logger.info(s"Starting Channel")
val channel = connection.createChannel()
channel.addShutdownListener(new ShutdownListener {
override def shutdownCompleted(cause: ShutdownSignalException): Unit =
if (cause.isInitiatedByApplication)
logger.info(s"Channel shut down due to explicit application action: ${cause.getMessage}")
else
logger.error(s"Channel shut down by broker or because of detectable non-deliberate application failure", cause)
})
channel
}
.attempt
logger.info(s"Starting Channel")
val channel = connection.createChannel()
channel.addShutdownListener(new ShutdownListener {
override def shutdownCompleted(cause: ShutdownSignalException): Unit =
if (cause.isInitiatedByApplication)
logger.info(s"Channel shut down due to explicit application action: ${cause.getMessage}")
else
logger.error(s"Channel shut down by broker or because of detectable non-deliberate application failure", cause)
})
channel
}.attempt
.flatTap {
case Right(_) =>
F.delay(logger.info(s"Channel has been started successfully!"))
Expand All @@ -59,38 +62,38 @@ object AmqpClient extends StrictLogging {
Resource.make(make.evalOn(executionContext))(channel => F.blocking(channel.close()))
}

private def createConnection[F[_]](config: AmqpClientConfig, executionContext: ExecutionContext)(implicit
private def createConnection[F[_]](config: AmqpClientConfig, executionContext: ExecutionContext, metricsCollector: MetricsCollector)(implicit
F: Async[F]
): Resource[F, RabbitConnection] = {
val make =
F.blocking {
logger.info(s"Starting AmqpClient")
val connectionFactory = new ConnectionFactory()
connectionFactory.setHost(config.host)
connectionFactory.setPort(config.port)
connectionFactory.setUsername(config.username)
connectionFactory.setPassword(config.password)
connectionFactory.setAutomaticRecoveryEnabled(config.networkRecoveryInterval.isDefined)
connectionFactory.setSharedExecutor(executionContext match {
case null => throw null
case eces: ExecutionContextExecutorService => eces
case other =>
new AbstractExecutorService with ExecutionContextExecutorService {
override def prepare(): ExecutionContext = other
override def isShutdown = false
override def isTerminated = false
override def shutdown() = ()
override def shutdownNow() = Collections.emptyList[Runnable]
override def execute(runnable: Runnable): Unit = other execute runnable
override def reportFailure(t: Throwable): Unit = other reportFailure t
override def awaitTermination(length: Long, unit: TimeUnit): Boolean = false
}
})
config.networkRecoveryInterval.map(_.toMillis.toInt).foreach(connectionFactory.setNetworkRecoveryInterval)
config.virtualHost.foreach(connectionFactory.setVirtualHost)
connectionFactory.newConnection()
}
.attempt
logger.info(s"Starting AmqpClient")
val connectionFactory = new ConnectionFactory()
connectionFactory.setHost(config.host)
connectionFactory.setPort(config.port)
connectionFactory.setUsername(config.username)
connectionFactory.setPassword(config.password)
connectionFactory.setMetricsCollector(metricsCollector)
connectionFactory.setAutomaticRecoveryEnabled(config.networkRecoveryInterval.isDefined)
connectionFactory.setSharedExecutor(executionContext match {
case null => throw null
case eces: ExecutionContextExecutorService => eces
case other =>
new AbstractExecutorService with ExecutionContextExecutorService {
override def prepare(): ExecutionContext = other
override def isShutdown = false
override def isTerminated = false
override def shutdown() = ()
override def shutdownNow() = Collections.emptyList[Runnable]
override def execute(runnable: Runnable): Unit = other execute runnable
override def reportFailure(t: Throwable): Unit = other reportFailure t
override def awaitTermination(length: Long, unit: TimeUnit): Boolean = false
}
})
config.networkRecoveryInterval.map(_.toMillis.toInt).foreach(connectionFactory.setNetworkRecoveryInterval)
config.virtualHost.foreach(connectionFactory.setVirtualHost)
connectionFactory.newConnection()
}.attempt
.flatTap {
case Right(_) =>
logger.info(s"AmqpClient has been started successfully!").pure[F]
Expand All @@ -112,13 +115,28 @@ object AmqpClient extends StrictLogging {
client <- apply[F](config, connectionExecutionContext, channelEc)
} yield client

def apply[F[_]](config: AmqpClientConfig, connectionExecutionContext: ExecutionContext, channelExecutionContext: ExecutionContext)(implicit
def apply[F[_]](config: AmqpClientConfig, metricsCollector: MetricsCollector)(implicit
F: Async[F],
t: Temporal[F],
connectionExecutionContext: ExecutionContext
): Resource[F, AmqpClient[F]] =
for {
channelEc <- defaultChannelExecutionContext
client <- apply[F](config, connectionExecutionContext, channelEc, metricsCollector)
} yield client

def apply[F[_]](
config: AmqpClientConfig,
connectionExecutionContext: ExecutionContext,
channelExecutionContext: ExecutionContext,
metricsCollector: MetricsCollector = new NoOpMetricsCollector()
)(implicit
F: Async[F],
t: Temporal[F]
): Resource[F, AmqpClient[F]] =
for {
dispatcher <- Dispatcher[F]
connection <- createConnection(config, connectionExecutionContext)
connection <- createConnection(config, connectionExecutionContext, metricsCollector)
publishChannel = createChannel(connection, channelExecutionContext).map(Channel.apply[F](_, dispatcher, channelExecutionContext))
buildChannel = () => createChannel(connection, channelExecutionContext).map(Channel.apply[F](_, dispatcher, channelExecutionContext))
client <- apply[F](config, buildChannel, publishChannel, dispatcher, channelExecutionContext)
Expand Down Expand Up @@ -157,16 +175,19 @@ object AmqpClient extends StrictLogging {

override def publisher(): Publisher[F, PublishCommand] = cmd => connectionManager.publish(cmd).evalOn(executionContext)

override def registerConsumer(queueName: QueueName,
handler: Handler[F, Delivery],
exceptionalAction: ConsumeAction,
prefetchCount: Int,
shutdownTimeout: FiniteDuration,
shutdownRetry: FiniteDuration): Resource[F, Unit] =
override def registerConsumer(
queueName: QueueName,
handler: Handler[F, Delivery],
exceptionalAction: ConsumeAction,
prefetchCount: Int,
shutdownTimeout: FiniteDuration,
shutdownRetry: FiniteDuration
): Resource[F, Unit] =
for {
channel <- buildChannel()
handling <- Resource.make(Ref.of[F, Set[UUID]](Set.empty))(set =>
repeatUntil(F.blocking(logger.debug("Verifying running handlers.")) *> set.get)(_.isEmpty)(shutdownRetry).timeout(shutdownTimeout))
repeatUntil(F.blocking(logger.debug("Verifying running handlers.")) *> set.get)(_.isEmpty)(shutdownRetry).timeout(shutdownTimeout)
)
newHandler = (delivery: Delivery) =>
for {
id <- F.delay(UUID.randomUUID())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package com.itv.bucky.prometheus;

import java.util.Collections;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.impl.AbstractMetricsCollector;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.CollectorRegistry;

public class PrometheusMetricsCollector extends AbstractMetricsCollector {

private final Gauge connections;

private final Gauge channels;

private final Counter publishedMessages;

private final Counter failedToPublishMessages;

private final Counter ackedPublishedMessages;

private final Counter nackedPublishedMessages;

private final Counter unroutedPublishedMessages;

private final Counter consumedMessages;

private final Counter acknowledgedMessages;

private final Counter rejectedMessages;

public PrometheusMetricsCollector(final CollectorRegistry registry) {
this(registry, "rabbitmq");
}

public PrometheusMetricsCollector(final CollectorRegistry registry, final String prefix) {
this.connections = Gauge.build()
.name(prefix + "_connections")
.help("Current connections")
.create();
registry.register(this.connections);

this.channels = Gauge.build()
.name(prefix + "_channels")
.help("Current channels")
.create();
registry.register(this.channels);

this.publishedMessages = Counter.build()
.name(prefix + "_published_messages")
.help("Count of published messages")
.create();
registry.register(this.publishedMessages);

this.failedToPublishMessages = Counter.build()
.name(prefix + "_failed_to_publish_messages")
.help("Count of failed to publish messages")
.create();
registry.register(failedToPublishMessages);

this.ackedPublishedMessages = Counter.build()
.name(prefix + "_acked_published_messages")
.help("Count of acknowledged publish messages")
.create();
registry.register(ackedPublishedMessages);

this.nackedPublishedMessages = Counter.build()
.name(prefix + "_nacked_published_messages")
.help("Count of not acknowledged publish messages")
.create();
registry.register(nackedPublishedMessages);

this.unroutedPublishedMessages = Counter.build()
.name(prefix + "_unrouted_published_messages")
.help("Count of unrouted publish messages")
.create();
registry.register(unroutedPublishedMessages);

this.consumedMessages = Counter.build()
.name(prefix + "_consumed_messages")
.help("Count of consumed messages")
.create();
registry.register(consumedMessages);

this.acknowledgedMessages = Counter.build()
.name(prefix + "_acknowledged_messages")
.help("Count of acknowledged consumed messages")
.create();
registry.register(acknowledgedMessages);

this.rejectedMessages = Counter.build()
.name(prefix + "_rejected_messages")
.help("Count of rejected consumed messages")
.create();
registry.register(rejectedMessages);

}

@Override
protected void incrementConnectionCount(Connection connection) {
connections.inc();
}

@Override
protected void decrementConnectionCount(Connection connection) {
connections.dec();
}

@Override
protected void incrementChannelCount(Channel channel) {
channels.inc();
}

@Override
protected void decrementChannelCount(Channel channel) {
channels.dec();
}

@Override
protected void markPublishedMessage() {
publishedMessages.inc();
}

@Override
protected void markMessagePublishFailed() {
failedToPublishMessages.inc();
}

@Override
protected void markConsumedMessage() {
consumedMessages.inc();
}

@Override
protected void markAcknowledgedMessage() {
acknowledgedMessages.inc();
}

@Override
protected void markRejectedMessage() {
rejectedMessages.inc();
}

@Override
protected void markMessagePublishAcknowledged() {
ackedPublishedMessages.inc();
}

@Override
protected void markMessagePublishNotAcknowledged() {
nackedPublishedMessages.inc();
}

@Override
protected void markPublishedMessageUnrouted() {
unroutedPublishedMessages.inc();
}
}