
Add Metadata poll timer metric #105

Merged · 2 commits · Jan 26, 2020

11 changes: 11 additions & 0 deletions README.md
@@ -113,6 +113,12 @@ Labels: `cluster_name, topic, partition`

The earliest offset available for a topic partition. Kafka Lag Exporter calculates the set of partitions for all available consumer groups and then polls for the earliest available offset. The earliest available offset is used in the calculation of other provided metrics, so it is exported for informational purposes. For example, the accompanying Grafana dashboard uses it to visualize the offset-based volume of a partition in certain panels.
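
For illustration, this is the calculation such a panel performs (a sketch with hypothetical values; `earliestOffset` and `latestOffset` stand for scraped values of the earliest-offset metric documented here and the corresponding latest-offset metric):

```scala
// Hypothetical scraped values for a single topic partition.
val earliestOffset = 300L // earliest offset still retained by the broker
val latestOffset = 1500L  // next offset to be written (log end offset)

// Offset-based volume: the number of records the partition currently retains.
val volume = latestOffset - earliestOffset // 1200
```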

**`kafka_consumergroup_poll_time_ms`**

Labels: `cluster_name`

The time taken, in milliseconds, to poll the information for all consumer groups in a cluster.

### Labels

Each metric may include the following labels when reported. If you define the `labels` property in a cluster's configuration, those labels will also be included.
@@ -389,6 +395,11 @@ kafka_consumergroup_group_max_lag{cluster_name="pipelines-strimzi",group="variab
This is an undocumented feature of the Prometheus HTTP server. For reference consult the [`parseQuery` method](https://github.com/prometheus/client_java/blob/4e0e7527b048f1ffd0382dcb74c0b9dab23b4d9f/simpleclient_httpserver/src/main/java/io/prometheus/client/exporter/HTTPServer.java#L101) for the
HTTP server in the [`prometheus/client_java`](https://github.com/prometheus/client_java/) GitHub repository.

## Health Check

The `kafka_consumergroup_poll_time_ms` metric exposes the time taken to poll the consumer group information for each cluster. It can be used as a health check endpoint, optionally failing the check when the value exceeds some threshold (for example, when the poll takes longer than the poll interval).

Example:

```
$ curl -X GET -g http://localhost:8000/metrics?name[]=kafka_consumergroup_poll_time_ms
```
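
A health check around this endpoint could look like the following sketch (an illustration, not part of the exporter; it assumes the default port 8000, and `PollTimeHealthCheck` and the 30-second threshold are hypothetical):

```scala
import scala.io.Source

object PollTimeHealthCheck extends App {
  // Scrape only the poll time metric, using the name[] filter described above.
  val url = "http://localhost:8000/metrics?name[]=kafka_consumergroup_poll_time_ms"
  val thresholdMs = 30000d // arbitrary example threshold, e.g. the configured poll interval

  val body = Source.fromURL(url).mkString

  // Exposition lines look like: kafka_consumergroup_poll_time_ms{cluster_name="..."} 123.0
  // Comment lines start with '#', so filtering on the metric name skips them.
  val pollTimes = body.linesIterator
    .filter(_.startsWith("kafka_consumergroup_poll_time_ms"))
    .map(_.split(' ').last.toDouble)
    .toList

  // Healthy only if at least one cluster reported and all stayed under the threshold.
  val healthy = pollTimes.nonEmpty && pollTimes.forall(_ < thresholdMs)
  sys.exit(if (healthy) 0 else 1)
}
```

Exiting non-zero lets a container orchestrator or a cron-based monitor treat a slow poll as unhealthy.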

## Development

### Tests

src/main/scala/com/lightbend/kafkalagexporter/ConsumerGroupCollector.scala

@@ -26,12 +26,13 @@ object ConsumerGroupCollector {
sealed trait Stop extends Message
final case object Stop extends Stop
final case class StopWithError(throwable: Throwable) extends Message
final case class MetaData(pollTime: Long) extends Message
final case class OffsetsSnapshot(
timestamp: Long,
groups: List[String],
earliestOffsets: PartitionOffsets,
latestOffsets: PartitionOffsets,
lastGroupOffsets: GroupOffsets
) extends Message {
private val TpoFormat = " %-64s%-11s%s"
private val GtpFormat = " %-64s%-64s%-11s%s"
@@ -136,15 +137,17 @@ object ConsumerGroupCollector {
}

context.log.info("Collecting offsets")

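// Record when the poll starts; the elapsed time is reported as the
// kafka_consumergroup_poll_time_ms metric once collection completes.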
val startPollingTime = config.clock.instant().toEpochMilli
val f = for {
(groups, groupTopicPartitions) <- client.getGroups()
offsetSnapshot <- getOffsetSnapshot(groups, groupTopicPartitions)
} yield offsetSnapshot

f.onComplete {
case Success(newOffsets) =>
val pollTimeMs = config.clock.instant().toEpochMilli - startPollingTime
context.self ! newOffsets
context.self ! MetaData(pollTimeMs)
case Failure(t) =>
context.self ! StopWithError(t)
}(ec)
@@ -176,6 +179,12 @@ object ConsumerGroupCollector {
)

collector(config, client, reporter, newState)

case (context, metaData: MetaData) =>
context.log.debug("Received Meta data:\n{}", metaData)
reportPollTimeMetrics(config, reporter, metaData)
Behaviors.same

case (context, _: Stop) =>
state.scheduledCollect.cancel()
Behaviors.stopped { () =>
@@ -248,10 +257,10 @@ object ConsumerGroupCollector {
}

private def reportEarliestOffsetMetrics(
config: CollectorConfig,
reporter: ActorRef[MetricsSink.Message],
offsetsSnapshot: OffsetsSnapshot
): Unit = {
for {(tp, topicPoint) <- offsetsSnapshot.earliestOffsets} yield {
reporter ! Metrics.TopicPartitionValueMessage(Metrics.EarliestOffsetMetric, config.cluster.name, tp, topicPoint.offset)
}
@@ -292,4 +301,12 @@ object ConsumerGroupCollector {
} reporter ! Metrics.GroupTopicRemoveMetricMessage(Metrics.SumGroupTopicOffsetLagMetric, config.cluster.name, group, topic)
}
}

private def reportPollTimeMetrics(
config: CollectorConfig,
reporter: ActorRef[MetricsSink.Message],
metaData: MetaData
): Unit = {
reporter ! Metrics.ClusterValueMessage(Metrics.PollTimeMetric, config.cluster.name, metaData.pollTime)
}
}
26 changes: 24 additions & 2 deletions src/main/scala/com/lightbend/kafkalagexporter/Metrics.scala
@@ -7,13 +7,25 @@ package com.lightbend.kafkalagexporter
import com.lightbend.kafkalagexporter.MetricsSink._

object Metrics {
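// Metric message scoped to an entire cluster; its only label is the cluster name.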
sealed trait ClusterMessage extends Message with Metric {
def definition: GaugeDefinition
def clusterName: String
override def labels: List[String] =
List(
clusterName
)
}

final case class ClusterValueMessage(definition: GaugeDefinition, clusterName: String, value: Double) extends ClusterMessage with MetricValue

sealed trait TopicPartitionMessage extends Message with Metric {
def definition: GaugeDefinition
def clusterName: String
def topicPartition: Domain.TopicPartition
override def labels: List[String] =
List(
clusterName,
topicPartition.topic,
topicPartition.partition.toString
)
}
@@ -85,6 +97,9 @@ object Metrics {

val groupLabels = List("cluster_name", "group")

val ClusterLabels = List("cluster_name")

val MaxGroupOffsetLagMetric = GaugeDefinition(
"kafka_consumergroup_group_max_lag",
"Max group offset lag",
@@ -131,6 +146,12 @@ object Metrics {
groupTopicLabels
)

val PollTimeMetric = GaugeDefinition(
"kafka_consumergroup_poll_time_ms",
"Group poll time in milliseconds",
ClusterLabels
)

val definitions = List(
LatestOffsetMetric,
EarliestOffsetMetric,
@@ -140,6 +161,7 @@
OffsetLagMetric,
TimeLagMetric,
SumGroupOffsetLagMetric,
SumGroupTopicOffsetLagMetric,
PollTimeMetric
)
}
src/test/scala/com/lightbend/kafkalagexporter/IntegrationSpec.scala

@@ -103,5 +103,12 @@ class IntegrationSpec extends SpecBase(kafkaPort = 9094, exporterPort = 8000) wi

eventually(scrapeAndAssertDne(exporterPort, "Assert offset-based metrics no longer exist", rules: _*))
}

"report poll time metric greater than 0 ms" in {
assertAllStagesStopped {
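// Pair the poll time gauge with an assertion on its scraped value: it must be strictly positive.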
val rule = Rule.create(PollTimeMetric, (actual: String) => actual.toDouble should be > 0d, clusterName)
eventually(scrapeAndAssert(exporterPort, "Assert poll time metric", rule))
}
}
}
}