Skip to content

Commit

Permalink
Add histogram-based metrics (#445)
Browse files Browse the repository at this point in the history
* Add histogram-based metrics (#440)

* Extract ProducerMetrics implementations into named classes

* Add Histograms implementation for ProducerMetrics

* Extract ConsumerMetrics implementations into named classes

* Add Histogram implementation of ConsumerMetrics

* Name Throwables in MonadCancel

* Docs for latencyBuckets

* Apply suggestions from code review

Co-authored-by: Mareks Rampāns <[email protected]>

* Update skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetrics.scala

Co-authored-by: Mareks Rampāns <[email protected]>

* changed name

* fix scala 3

* Apply suggestions from code review

Co-authored-by: Mareks Rampāns <[email protected]>

* bump minor version

---------

Co-authored-by: Mareks Rampāns <[email protected]>
Co-authored-by: Denys Fakhritdinov <[email protected]>

(cherry picked from commit 0b37012)

* bump stuff
  • Loading branch information
vilunov authored Sep 25, 2024
1 parent d976f4b commit 46ee112
Show file tree
Hide file tree
Showing 8 changed files with 453 additions and 135 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ jobs:
strategy:
matrix:
scala:
- 2.13.8
- 2.12.15
- 2.13.14
- 2.12.20

steps:
- uses: actions/checkout@v3
Expand Down
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ lazy val commonSettings = Seq(
organizationName := "Evolution",
organizationHomepage := Some(url("https://evolution.com")),
scalaVersion := crossScalaVersions.value.head,
crossScalaVersions := Seq("2.13.8", "2.12.15"),
crossScalaVersions := Seq("2.13.14", "2.12.20"),
licenses := Seq(("MIT", url("https://opensource.org/licenses/MIT"))),
releaseCrossBuild := true,
Compile / doc / scalacOptions += "-no-link-warnings",
Expand All @@ -22,7 +22,7 @@ lazy val commonSettings = Seq(
// KeyRanks.Invisible to suppress sbt warning about key not being used in root/tests where MiMa plugin is disabled
mimaPreviousArtifacts.withRank(KeyRanks.Invisible) := {
val versions = List(
"11.0.0",
"11.17.0",
)
versions.map(organization.value %% moduleName.value % _).toSet
},
Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ object Dependencies {
val `scala-java8-compat` = "org.scala-lang.modules" %% "scala-java8-compat" % "1.0.2"
val `collection-compat` = "org.scala-lang.modules" %% "scala-collection-compat" % "2.8.1"
val scalatest = "org.scalatest" %% "scalatest" % "3.2.13"
val `kind-projector` = "org.typelevel" % "kind-projector" % "0.13.2"
val `kind-projector` = "org.typelevel" % "kind-projector" % "0.13.3"
val discipline = "org.typelevel" %% "discipline-scalatest" % "2.2.0"

object Kafka {
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.7.1
sbt.version=1.10.1
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.9.3")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.2.0")

addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.3.2")

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package com.evolutiongaming.skafka.consumer

import cats.data.NonEmptyList
import cats.effect.Resource
import cats.implicits._
import cats.{Applicative, Monad, ~>}
import com.evolutiongaming.skafka.producer.ProducerMetrics
import com.evolutiongaming.skafka.{ClientId, Topic, TopicPartition}
import com.evolutiongaming.smetrics.MetricsHelper._
import com.evolutiongaming.smetrics.{CollectorRegistry, LabelNames, Quantiles}
import com.evolutiongaming.smetrics._

import scala.concurrent.duration.FiniteDuration

Expand All @@ -32,17 +34,18 @@ object ConsumerMetrics {

def empty[F[_]: Applicative]: ConsumerMetrics[F] = const(().pure[F])

def const[F[_]](unit: F[Unit]): ConsumerMetrics[F] = new ConsumerMetrics[F] {
def const[F[_]](unit: F[Unit]): ConsumerMetrics[F] = new Const(unit)

def call(name: String, topic: Topic, latency: FiniteDuration, success: Boolean) = unit
private final class Const[F[_]](unit: F[Unit]) extends ConsumerMetrics[F] {
override def call(name: String, topic: Topic, latency: FiniteDuration, success: Boolean): F[Unit] = unit

def poll(topic: Topic, bytes: Int, records: Int, age: Option[FiniteDuration]) = unit
override def poll(topic: Topic, bytes: Int, records: Int, age: Option[FiniteDuration]): F[Unit] = unit

def count(name: String, topic: Topic) = unit
override def count(name: String, topic: Topic): F[Unit] = unit

def rebalance(name: String, topicPartition: TopicPartition) = unit
override def rebalance(name: String, topicPartition: TopicPartition): F[Unit] = unit

def topics(latency: FiniteDuration) = unit
override def topics(latency: FiniteDuration): F[Unit] = unit
}

def of[F[_]: Monad](
Expand Down Expand Up @@ -104,57 +107,233 @@ object ConsumerMetrics {
bytesSummary <- bytesSummary
rebalancesCounter <- rebalancesCounter
topicsLatency <- topicsLatency
ageSummary <- registry.summary(
name = s"${ prefix }_poll_age",
help = "Poll records age, time since record.timestamp",
ageSummary <- registry.summary(
name = s"${prefix}_poll_age",
help = "Poll records age, time since record.timestamp",
quantiles = Quantiles.Default,
labels = LabelNames("client", "topic")
labels = LabelNames("client", "topic")
)
} yield { clientId: ClientId =>
new ConsumerMetrics[F] {
} yield { (clientId: ClientId) =>
new Summaries(
callsCounter,
resultCounter,
latencySummary,
recordsSummary,
bytesSummary,
rebalancesCounter,
topicsLatency,
ageSummary,
clientId
)
}
}

def call(name: String, topic: Topic, latency: FiniteDuration, success: Boolean) = {
val result = if (success) "success" else "failure"
for {
_ <- latencySummary.labels(clientId, topic, name).observe(latency.toNanos.nanosToSeconds)
_ <- resultCounter.labels(clientId, topic, name, result).inc()
} yield {}
}
private final class Summaries[F[_]: Monad](
callsCounter: LabelValues.`3`[Counter[F]],
resultCounter: LabelValues.`4`[Counter[F]],
latencySummary: LabelValues.`3`[Summary[F]],
recordsSummary: LabelValues.`2`[Summary[F]],
bytesSummary: LabelValues.`2`[Summary[F]],
rebalancesCounter: LabelValues.`3`[Counter[F]],
topicsLatency: LabelValues.`1`[Summary[F]],
ageSummary: LabelValues.`2`[Summary[F]],
clientId: ClientId,
) extends ConsumerMetrics[F] {
override def call(name: String, topic: Topic, latency: FiniteDuration, success: Boolean): F[Unit] = {
val result = if (success) "success" else "failure"
for {
_ <- latencySummary.labels(clientId, topic, name).observe(latency.toNanos.nanosToSeconds)
_ <- resultCounter.labels(clientId, topic, name, result).inc()
} yield {}
}

def poll(topic: Topic, bytes: Int, records: Int, age: Option[FiniteDuration]) = {
for {
_ <- recordsSummary
.labels(clientId, topic)
.observe(records.toDouble)
_ <- bytesSummary
.labels(clientId, topic)
.observe(bytes.toDouble)
a <- age.foldMapM { age =>
ageSummary
.labels(clientId, topic)
.observe(age.toNanos.nanosToSeconds)
}
} yield a
override def poll(topic: Topic, bytes: Int, records: Int, age: Option[FiniteDuration]): F[Unit] = {
for {
_ <- recordsSummary
.labels(clientId, topic)
.observe(records.toDouble)
_ <- bytesSummary
.labels(clientId, topic)
.observe(bytes.toDouble)
a <- age.foldMapM { age =>
ageSummary
.labels(clientId, topic)
.observe(age.toNanos.nanosToSeconds)
}
} yield a
}

def count(name: String, topic: Topic) = {
callsCounter
.labels(clientId, topic, name)
.inc()
}
override def count(name: String, topic: Topic): F[Unit] = {
callsCounter
.labels(clientId, topic, name)
.inc()
}

def rebalance(name: String, topicPartition: TopicPartition) = {
rebalancesCounter
.labels(clientId, topicPartition.topic, name)
.inc()
}
override def rebalance(name: String, topicPartition: TopicPartition): F[Unit] = {
rebalancesCounter
.labels(clientId, topicPartition.topic, name)
.inc()
}

def topics(latency: FiniteDuration) = {
topicsLatency
.labels(clientId)
.observe(latency.toNanos.nanosToSeconds)
override def topics(latency: FiniteDuration): F[Unit] = {
topicsLatency
.labels(clientId)
.observe(latency.toNanos.nanosToSeconds)
}
}

def histograms[F[_]: Monad](
registry: CollectorRegistry[F],
prefix: Prefix = Prefix.Default
): Resource[F, ClientId => ConsumerMetrics[F]] = {

val callsCounter = registry.counter(
name = s"${prefix}_calls_total",
help = "Number of topic calls",
labels = LabelNames("client", "topic", "type")
)

val resultCounter = registry.counter(
name = s"${prefix}_results_total",
help = "Topic call result: success or failure",
labels = LabelNames("client", "topic", "type", "result")
)

val latencyHistogram = registry.histogram(
name = s"${prefix}_latency_seconds",
help = "Topic call latency in seconds",
buckets = latencyBuckets,
labels = LabelNames("client", "topic", "type")
)

val recordsHistogram = registry.histogram(
name = s"${prefix}_poll_records_total",
help = "Number of records per poll",
buckets = pollCountBuckets,
labels = LabelNames("client", "topic")
)

val bytesHistogram = registry.histogram(
name = s"${prefix}_poll_histogram_bytes",
help = "Number of bytes per poll",
buckets = pollBytesBuckets,
labels = LabelNames("client", "topic")
)

val rebalancesCounter = registry.counter(
name = s"${prefix}_rebalances_total",
help = "Number of rebalances",
labels = LabelNames("client", "topic", "type")
)

val topicsLatency = registry.histogram(
name = s"${prefix}_topics_latency_seconds",
help = "List topics latency in seconds",
buckets = latencyBuckets,
labels = LabelNames("client")
)
val ageHistogram = registry.histogram(
name = s"${prefix}_poll_age_seconds",
help = "Poll records age, time since record.timestamp",
buckets = pollAgeBuckets,
labels = LabelNames("client", "topic")
)

for {
callsCounter <- callsCounter
resultCounter <- resultCounter
latencyHistogram <- latencyHistogram
recordsHistogram <- recordsHistogram
bytesHistogram <- bytesHistogram
rebalancesCounter <- rebalancesCounter
topicsLatency <- topicsLatency
ageHistogram <- ageHistogram
} yield { (clientId: ClientId) =>
new Histograms(
callsCounter,
resultCounter,
latencyHistogram,
recordsHistogram,
bytesHistogram,
rebalancesCounter,
topicsLatency,
ageHistogram,
clientId
)
}
}
private val latencyBuckets =
ProducerMetrics.latencyBuckets
private val pollCountBuckets =
Buckets(NonEmptyList.of(1, 20, 50, 200, 500))
private val pollBytesBuckets =
Buckets(
NonEmptyList.of(
2 * 1024,
32 * 1024,
64 * 1024,
128 * 1024,
512 * 1024,
1024 * 1024, // 1 MB – max record size
2 * 1024 * 1024,
32 * 1024 * 1024,
512 * 1024 * 1024, // 500 records per poll – the max poll size defaults
)
)
private val pollAgeBuckets =
Buckets(NonEmptyList.of(10e-3, 20e-3, 50e-3, 200e-3, 500e-3, 2, 5, 20, 60, 120))

private final class Histograms[F[_]: Monad](
callsCounter: LabelValues.`3`[Counter[F]],
resultCounter: LabelValues.`4`[Counter[F]],
latencySummary: LabelValues.`3`[Histogram[F]],
recordsSummary: LabelValues.`2`[Histogram[F]],
bytesSummary: LabelValues.`2`[Histogram[F]],
rebalancesCounter: LabelValues.`3`[Counter[F]],
topicsLatency: LabelValues.`1`[Histogram[F]],
ageSummary: LabelValues.`2`[Histogram[F]],
clientId: ClientId,
) extends ConsumerMetrics[F] {
override def call(name: String, topic: Topic, latency: FiniteDuration, success: Boolean): F[Unit] = {
val result = if (success) "success" else "failure"
for {
_ <- latencySummary.labels(clientId, topic, name).observe(latency.toNanos.nanosToSeconds)
_ <- resultCounter.labels(clientId, topic, name, result).inc()
} yield {}
}

override def poll(topic: Topic, bytes: Int, records: Int, age: Option[FiniteDuration]): F[Unit] = {
for {
_ <- recordsSummary
.labels(clientId, topic)
.observe(records.toDouble)
_ <- bytesSummary
.labels(clientId, topic)
.observe(bytes.toDouble)
a <- age.foldMapM { age =>
ageSummary
.labels(clientId, topic)
.observe(age.toNanos.nanosToSeconds)
}
}
} yield a
}

override def count(name: String, topic: Topic): F[Unit] = {
callsCounter
.labels(clientId, topic, name)
.inc()
}

override def rebalance(name: String, topicPartition: TopicPartition): F[Unit] = {
rebalancesCounter
.labels(clientId, topicPartition.topic, name)
.inc()
}

override def topics(latency: FiniteDuration): F[Unit] = {
topicsLatency
.labels(clientId)
.observe(latency.toNanos.nanosToSeconds)
}
}

Expand Down
Loading

0 comments on commit 46ee112

Please sign in to comment.