Add PrometheusMetricsCollector
alexcardell committed Jan 10, 2023
1 parent b0b8cc8 commit d002ace
Showing 4 changed files with 276 additions and 55 deletions.
21 changes: 21 additions & 0 deletions build.sbt
@@ -284,6 +284,27 @@ lazy val xml = project
)
)

lazy val prometheus = project
.settings(name := "com.itv")
.settings(moduleName := "bucky-prometheus")
.settings(kernelSettings: _*)
.aggregate(core, test)
.dependsOn(core, test % "test,it")
.configs(IntegrationTest)
.settings(Defaults.itSettings)
.settings(
internalDependencyClasspath in IntegrationTest += Attributed.blank((classDirectory in Test).value),
parallelExecution in IntegrationTest := false
)
.settings(
libraryDependencies ++= Seq(
/* "io.chrisdavenport" %% "prometheus" % "0.5.0-M2", */
"io.prometheus" % "simpleclient" % "0.6.0",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test, it",
"org.typelevel" %% "cats-effect-testing-scalatest" % catsEffectScalaTestVersion % "test,it"
)
)

lazy val root = (project in file("."))
.aggregate(xml, circe, kamon, argonaut, example, test, core)
.settings(publishArtifact := false)
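
For a downstream build, the new module would be consumed like the other bucky modules. A minimal sketch, assuming the artifact is published under the com.itv organisation with the bucky-prometheus module name and that buckyVersion is defined elsewhere in the consumer's build:

// Hypothetical dependency entry in a consumer's build.sbt (coordinates assumed, not taken from this commit).
libraryDependencies += "com.itv" %% "bucky-prometheus" % buckyVersion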
131 changes: 76 additions & 55 deletions core/src/main/scala/com/itv/bucky/AmqpClient.scala
@@ -15,17 +15,21 @@ import java.util.{Collections, UUID}
import scala.concurrent.duration.{FiniteDuration, _}
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}
import scala.language.higherKinds
import com.rabbitmq.client.MetricsCollector
import com.rabbitmq.client.NoOpMetricsCollector

trait AmqpClient[F[_]] {
def declare(declarations: Declaration*): F[Unit]
def declare(declarations: Iterable[Declaration]): F[Unit]
def publisher(): Publisher[F, PublishCommand]
def registerConsumer(queueName: QueueName,
handler: Handler[F, Delivery],
exceptionalAction: ConsumeAction = DeadLetter,
prefetchCount: Int = 1,
shutdownTimeout: FiniteDuration = 1.minutes,
shutdownRetry: FiniteDuration = 500.millis): Resource[F, Unit]
def registerConsumer(
queueName: QueueName,
handler: Handler[F, Delivery],
exceptionalAction: ConsumeAction = DeadLetter,
prefetchCount: Int = 1,
shutdownTimeout: FiniteDuration = 1.minutes,
shutdownRetry: FiniteDuration = 500.millis
): Resource[F, Unit]
def isConnectionOpen: F[Boolean]
}

@@ -36,18 +40,17 @@ object AmqpClient extends StrictLogging {
): Resource[F, RabbitChannel] = {
val make =
F.blocking {
logger.info(s"Starting Channel")
val channel = connection.createChannel()
channel.addShutdownListener(new ShutdownListener {
override def shutdownCompleted(cause: ShutdownSignalException): Unit =
if (cause.isInitiatedByApplication)
logger.info(s"Channel shut down due to explicit application action: ${cause.getMessage}")
else
logger.error(s"Channel shut down by broker or because of detectable non-deliberate application failure", cause)
})
channel
}
.attempt
logger.info(s"Starting Channel")
val channel = connection.createChannel()
channel.addShutdownListener(new ShutdownListener {
override def shutdownCompleted(cause: ShutdownSignalException): Unit =
if (cause.isInitiatedByApplication)
logger.info(s"Channel shut down due to explicit application action: ${cause.getMessage}")
else
logger.error(s"Channel shut down by broker or because of detectable non-deliberate application failure", cause)
})
channel
}.attempt
.flatTap {
case Right(_) =>
F.delay(logger.info(s"Channel has been started successfully!"))
@@ -59,38 +62,38 @@
Resource.make(make.evalOn(executionContext))(channel => F.blocking(channel.close()))
}

private def createConnection[F[_]](config: AmqpClientConfig, executionContext: ExecutionContext)(implicit
private def createConnection[F[_]](config: AmqpClientConfig, executionContext: ExecutionContext, metricsCollector: MetricsCollector)(implicit
F: Async[F]
): Resource[F, RabbitConnection] = {
val make =
F.blocking {
logger.info(s"Starting AmqpClient")
val connectionFactory = new ConnectionFactory()
connectionFactory.setHost(config.host)
connectionFactory.setPort(config.port)
connectionFactory.setUsername(config.username)
connectionFactory.setPassword(config.password)
connectionFactory.setAutomaticRecoveryEnabled(config.networkRecoveryInterval.isDefined)
connectionFactory.setSharedExecutor(executionContext match {
case null => throw null
case eces: ExecutionContextExecutorService => eces
case other =>
new AbstractExecutorService with ExecutionContextExecutorService {
override def prepare(): ExecutionContext = other
override def isShutdown = false
override def isTerminated = false
override def shutdown() = ()
override def shutdownNow() = Collections.emptyList[Runnable]
override def execute(runnable: Runnable): Unit = other execute runnable
override def reportFailure(t: Throwable): Unit = other reportFailure t
override def awaitTermination(length: Long, unit: TimeUnit): Boolean = false
}
})
config.networkRecoveryInterval.map(_.toMillis.toInt).foreach(connectionFactory.setNetworkRecoveryInterval)
config.virtualHost.foreach(connectionFactory.setVirtualHost)
connectionFactory.newConnection()
}
.attempt
logger.info(s"Starting AmqpClient")
val connectionFactory = new ConnectionFactory()
connectionFactory.setHost(config.host)
connectionFactory.setPort(config.port)
connectionFactory.setUsername(config.username)
connectionFactory.setPassword(config.password)
connectionFactory.setMetricsCollector(metricsCollector)
connectionFactory.setAutomaticRecoveryEnabled(config.networkRecoveryInterval.isDefined)
connectionFactory.setSharedExecutor(executionContext match {
case null => throw null
case eces: ExecutionContextExecutorService => eces
case other =>
new AbstractExecutorService with ExecutionContextExecutorService {
override def prepare(): ExecutionContext = other
override def isShutdown = false
override def isTerminated = false
override def shutdown() = ()
override def shutdownNow() = Collections.emptyList[Runnable]
override def execute(runnable: Runnable): Unit = other execute runnable
override def reportFailure(t: Throwable): Unit = other reportFailure t
override def awaitTermination(length: Long, unit: TimeUnit): Boolean = false
}
})
config.networkRecoveryInterval.map(_.toMillis.toInt).foreach(connectionFactory.setNetworkRecoveryInterval)
config.virtualHost.foreach(connectionFactory.setVirtualHost)
connectionFactory.newConnection()
}.attempt
.flatTap {
case Right(_) =>
logger.info(s"AmqpClient has been started successfully!").pure[F]
@@ -112,13 +115,28 @@
client <- apply[F](config, connectionExecutionContext, channelEc)
} yield client

def apply[F[_]](config: AmqpClientConfig, connectionExecutionContext: ExecutionContext, channelExecutionContext: ExecutionContext)(implicit
def apply[F[_]](config: AmqpClientConfig, metricsCollector: MetricsCollector)(implicit
F: Async[F],
t: Temporal[F],
connectionExecutionContext: ExecutionContext
): Resource[F, AmqpClient[F]] =
for {
channelEc <- defaultChannelExecutionContext
client <- apply[F](config, connectionExecutionContext, channelEc, metricsCollector)
} yield client

def apply[F[_]](
config: AmqpClientConfig,
connectionExecutionContext: ExecutionContext,
channelExecutionContext: ExecutionContext,
metricsCollector: MetricsCollector = new NoOpMetricsCollector()
)(implicit
F: Async[F],
t: Temporal[F]
): Resource[F, AmqpClient[F]] =
for {
dispatcher <- Dispatcher[F]
connection <- createConnection(config, connectionExecutionContext)
connection <- createConnection(config, connectionExecutionContext, metricsCollector)
publishChannel = createChannel(connection, channelExecutionContext).map(Channel.apply[F](_, dispatcher, channelExecutionContext))
buildChannel = () => createChannel(connection, channelExecutionContext).map(Channel.apply[F](_, dispatcher, channelExecutionContext))
client <- apply[F](config, buildChannel, publishChannel, dispatcher, channelExecutionContext)
@@ -157,16 +175,19 @@

override def publisher(): Publisher[F, PublishCommand] = cmd => connectionManager.publish(cmd).evalOn(executionContext)

override def registerConsumer(queueName: QueueName,
handler: Handler[F, Delivery],
exceptionalAction: ConsumeAction,
prefetchCount: Int,
shutdownTimeout: FiniteDuration,
shutdownRetry: FiniteDuration): Resource[F, Unit] =
override def registerConsumer(
queueName: QueueName,
handler: Handler[F, Delivery],
exceptionalAction: ConsumeAction,
prefetchCount: Int,
shutdownTimeout: FiniteDuration,
shutdownRetry: FiniteDuration
): Resource[F, Unit] =
for {
channel <- buildChannel()
handling <- Resource.make(Ref.of[F, Set[UUID]](Set.empty))(set =>
repeatUntil(F.blocking(logger.debug("Verifying running handlers.")) *> set.get)(_.isEmpty)(shutdownRetry).timeout(shutdownTimeout))
repeatUntil(F.blocking(logger.debug("Verifying running handlers.")) *> set.get)(_.isEmpty)(shutdownRetry).timeout(shutdownTimeout)
)
newHandler = (delivery: Delivery) =>
for {
id <- F.delay(UUID.randomUUID())
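
A minimal usage sketch of the new metrics-aware overloads. The AmqpClientConfig arguments, the use of the global execution context, and the shared default collector registry are assumptions for illustration only, not taken from this commit:

import scala.concurrent.ExecutionContext
import cats.effect.{IO, Resource}
import com.itv.bucky.{AmqpClient, AmqpClientConfig}
import com.itv.bucky.prometheus.PrometheusMetricsCollector
import io.prometheus.client.CollectorRegistry

// Register a collector against a Prometheus registry (assumed: the shared default registry).
val collector = new PrometheusMetricsCollector(CollectorRegistry.defaultRegistry)

// The two-argument overload takes the connection execution context implicitly
// and creates a channel execution context internally.
implicit val connectionEc: ExecutionContext = ExecutionContext.global

val client: Resource[IO, AmqpClient[IO]] =
  AmqpClient[IO](AmqpClientConfig("localhost", 5672, "guest", "guest"), collector)

Callers of the existing three-argument overload are unaffected: metricsCollector defaults to NoOpMetricsCollector, so behaviour without an explicit collector is unchanged.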
@@ -0,0 +1,150 @@
package com.itv.bucky.prometheus;

import java.util.Collections;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.impl.AbstractMetricsCollector;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.CollectorRegistry;

public class PrometheusMetricsCollector extends AbstractMetricsCollector {

private final Gauge connections;

private final Gauge channels;

private final Counter publishedMessages;

private final Counter failedToPublishMessages;

private final Counter ackedPublishedMessages;

private final Counter nackedPublishedMessages;

private final Counter unroutedPublishedMessages;

private final Counter consumedMessages;

private final Counter acknowledgedMessages;

private final Counter rejectedMessages;

public PrometheusMetricsCollector(final CollectorRegistry registry) {
this(registry, "rabbitmq");
}

public PrometheusMetricsCollector(final CollectorRegistry registry, final String prefix) {
this.connections = Gauge.build().name(prefix + "_connections").create();
registry.register(this.connections);

this.channels = Gauge.build()
.name(prefix + "_channels")
.create();
registry.register(this.channels);

this.publishedMessages = Counter.build()
.name(prefix + "_published_messages")
.create();
registry.register(this.publishedMessages);

this.failedToPublishMessages = Counter.build()
.name(prefix + "_failed_to_publish_messages")
.create();
registry.register(failedToPublishMessages);

this.ackedPublishedMessages = Counter.build()
.name(prefix + "_acked_published_messages")
.create();
registry.register(ackedPublishedMessages);

this.nackedPublishedMessages = Counter.build()
.name(prefix + "_nacked_published_messages")
.create();
registry.register(nackedPublishedMessages);

this.unroutedPublishedMessages = Counter.build()
.name(prefix + "_unrouted_published_messages")
.create();
registry.register(unroutedPublishedMessages);

this.consumedMessages = Counter.build()
.name(prefix + "_consumed_messages")
.create();
registry.register(consumedMessages);

this.acknowledgedMessages = Counter.build()
.name(prefix + "_acknowledged_messages")
.create();
registry.register(acknowledgedMessages);

this.rejectedMessages = Counter.build()
.name(prefix + "_rejected_messages")
.create();
registry.register(rejectedMessages);

}

@Override
protected void incrementConnectionCount(Connection connection) {
connections.inc();
}

@Override
protected void decrementConnectionCount(Connection connection) {
connections.dec();
}

@Override
protected void incrementChannelCount(Channel channel) {
channels.inc();
}

@Override
protected void decrementChannelCount(Channel channel) {
channels.dec();
}

@Override
protected void markPublishedMessage() {
publishedMessages.inc();
}

@Override
protected void markMessagePublishFailed() {
failedToPublishMessages.inc();
}

@Override
protected void markConsumedMessage() {
consumedMessages.inc();
}

@Override
protected void markAcknowledgedMessage() {
acknowledgedMessages.inc();
}

@Override
protected void markRejectedMessage() {
rejectedMessages.inc();
}

@Override
protected void markMessagePublishAcknowledged() {
ackedPublishedMessages.inc();
}

@Override
protected void markMessagePublishNotAcknowledged() {
nackedPublishedMessages.inc();
}

@Override
protected void markPublishedMessageUnrouted() {
unroutedPublishedMessages.inc();
}
}
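
The collector extends AbstractMetricsCollector, which performs the per-connection and per-channel bookkeeping and invokes these protected hooks; the subclass only maps each hook onto a gauge or counter. A small sketch of reading the resulting samples back, assuming the default "rabbitmq" prefix and that simpleclient reports these counters under their bare names:

import io.prometheus.client.CollectorRegistry
import com.itv.bucky.prometheus.PrometheusMetricsCollector

// Sketch only: a private registry, e.g. in a test; metric names assume the default "rabbitmq" prefix.
val registry  = new CollectorRegistry()
val collector = new PrometheusMetricsCollector(registry)

// After wiring the collector into an AmqpClient and publishing/consuming some messages,
// the samples can be scraped by Prometheus or read back programmatically:
val publishedSoFar: Double =
  Option(registry.getSampleValue("rabbitmq_published_messages")).map(_.doubleValue).getOrElse(0.0)
val openConnections: Double =
  Option(registry.getSampleValue("rabbitmq_connections")).map(_.doubleValue).getOrElse(0.0)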
@@ -0,0 +1,29 @@
// package com.itv.bucky.epimetheus

// import scala.language.higherKinds

// import com.itv.bucky.AmqpClient
// import io.chrisdavenport.epimetheus.CollectorRegistry
// import io.chrisdavenport.epimetheus.Counter
// import io.chrisdavenport.epimetheus.Gauge

// case class PrometheusMetrics[F[_]](
// connections: Gauge[F],
// channels: Gauge[F],
// publishedMessages: Counter[F],
// consumedMessages: Counter[F],
// acknowledgedMessages: Counter[F],
// rejectedMessages: Counter[F],
// failedToPublishMessages: Counter[F],
// ackedPublishedMessages: Counter[F],
// nackedPublishedMessages: Counter[F],
// unroutedPublishedMessages: Counter[F]
// )

// object Prometheus {

// def apply[F[_]](cr: CollectorRegistry[F])(amqpClient: AmqpClient[F]): AmqpClient[F] =
// amqpClient
// }

