Skip to content

Commit

Permalink
Add histogram-based metrics (#440)
Browse files Browse the repository at this point in the history
* Extract ProducerMetrics implementations into named classes

* Add Histograms implementation for ProducerMetrics

* Extract MapK into a named class

* Extract ConsumerMetrics implementations into named classes

* Add Histogram implementation of ConsumerMetrics

* Name Throwables in MonadCancel

* Docs for latencyBuckets

* Apply suggestions from code review

Co-authored-by: Mareks Rampāns <[email protected]>

* Update skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetrics.scala

Co-authored-by: Mareks Rampāns <[email protected]>

* changed name

* fix scala 3

* Apply suggestions from code review

Co-authored-by: Mareks Rampāns <[email protected]>

* bump minor version

---------

Co-authored-by: Mareks Rampāns <[email protected]>
Co-authored-by: Denys Fakhritdinov <[email protected]>
  • Loading branch information
3 people authored Sep 25, 2024
1 parent 3ae0e56 commit 0b37012
Show file tree
Hide file tree
Showing 4 changed files with 514 additions and 184 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,27 +51,7 @@ object ConsumerMetricsOf {
private def consumerMetricsOf[F[_]](
source: ConsumerMetrics[F],
registry: KafkaMetricsRegistry[F],
): ConsumerMetrics[F] =
new ConsumerMetrics[F] {
override def call(name: String, topic: Topic, latency: FiniteDuration, success: Boolean): F[Unit] =
source.call(name, topic, latency, success)

override def poll(topic: Topic, bytes: Int, records: Int, age: Option[FiniteDuration]): F[Unit] =
source.poll(topic, bytes, records, age)

override def count(name: String, topic: Topic): F[Unit] =
source.count(name, topic)

override def rebalance(name: String, topicPartition: TopicPartition): F[Unit] =
source.rebalance(name, topicPartition)

override def topics(latency: FiniteDuration): F[Unit] =
source.topics(latency)

override def exposeJavaMetrics[K, V](consumer: Consumer[F, K, V]): Resource[F, Unit] =
registry.register(consumer.clientMetrics)

}
): ConsumerMetrics[F] = new WithJavaClientMetrics(source, registry)

implicit final class ConsumerMetricsOps[F[_]](val source: ConsumerMetrics[F]) extends AnyVal {

Expand All @@ -89,4 +69,25 @@ object ConsumerMetricsOf {
withJavaClientMetrics(source, prometheus, prefix)

}

private final class WithJavaClientMetrics[F[_]](source: ConsumerMetrics[F], registry: KafkaMetricsRegistry[F])
extends ConsumerMetrics[F] {
override def call(name: String, topic: Topic, latency: FiniteDuration, success: Boolean): F[Unit] =
source.call(name, topic, latency, success)

override def poll(topic: Topic, bytes: Int, records: Int, age: Option[FiniteDuration]): F[Unit] =
source.poll(topic, bytes, records, age)

override def count(name: String, topic: Topic): F[Unit] =
source.count(name, topic)

override def rebalance(name: String, topicPartition: TopicPartition): F[Unit] =
source.rebalance(name, topicPartition)

override def topics(latency: FiniteDuration): F[Unit] =
source.topics(latency)

override def exposeJavaMetrics[K, V](consumer: Consumer[F, K, V]): Resource[F, Unit] =
registry.register(consumer.clientMetrics)
}
}
Loading

0 comments on commit 0b37012

Please sign in to comment.