Skip to content
This repository has been archived by the owner on Mar 17, 2024. It is now read-only.

Commit

Permalink
Get commit offset for correct group topic partitions (#56)
Browse files Browse the repository at this point in the history
  • Loading branch information
seglo authored Sep 2, 2019
1 parent e7974d8 commit 76f28a9
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 25 deletions.
23 changes: 12 additions & 11 deletions src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -141,35 +141,36 @@ class KafkaClient private[kafkalagexporter](cluster: KafkaCluster,

/**
* Get last committed Consumer Group offsets for all group topic partitions given a list of consumer groups. When a
* topic partition has no matched Consumer Group offset then a default offset of 0 is provided.
* topic partition has no matched Consumer Group offset then a default of None is provided.
* @return A series of Future's for Consumer Group offsets requests to Kafka.
*/
def getGroupOffsets(now: Long, groups: List[String], gtps: List[Domain.GroupTopicPartition]): Future[GroupOffsets] = {
def getGroupOffsets(now: Long, groups: List[String], allGtps: List[Domain.GroupTopicPartition]): Future[GroupOffsets] = {
val groupOffsetsF: Future[List[GroupOffsets]] = Future.sequence {
groups.map { group =>
kafkaFuture(getListConsumerGroupOffsets(group))
val gtps = allGtps.filter(_.id == group)
getListConsumerGroupOffsets(group)
.map(offsetMap => getGroupOffsets(now, gtps, offsetMap.asScala.toMap))
}
}

groupOffsetsF
.map(_.flatten.toMap)
.map(go => getOffsetOrZero(gtps, go))
.map(go => getOffsetOrZero(allGtps, go))
}


/**
* Call to `AdminClient` to get group offset info. This is only its own method so it can be mocked out in a test
* because it's not possible to instantiate or mock the `ListConsumerGroupOffsetsResult` type for some reason.
*/
private[kafkalagexporter] def getListConsumerGroupOffsets(group: String): KafkaFuture[util.Map[KafkaTopicPartition, OffsetAndMetadata]] = {
adminClient
.listConsumerGroupOffsets(group, listConsumerGroupsOptions)
.partitionsToOffsetAndMetadata()
}
private[kafkalagexporter] def getListConsumerGroupOffsets(group: String): Future[util.Map[KafkaTopicPartition, OffsetAndMetadata]] =
kafkaFuture {
adminClient
.listConsumerGroupOffsets(group, listConsumerGroupsOptions)
.partitionsToOffsetAndMetadata()
}

/**
* Backfill any group topic partitions with no offset as 0
* Backfill any group topic partitions with no offset as None
*/
private[kafkalagexporter] def getOffsetOrZero(
gtps: List[Domain.GroupTopicPartition],
Expand Down
55 changes: 41 additions & 14 deletions src/test/scala/com/lightbend/kafkalagexporter/KafkaClientSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@ import java.util.Optional
import com.lightbend.kafkalagexporter.Domain.GroupOffsets
import com.lightbend.kafkalagexporter.KafkaClient.KafkaTopicPartitionOps
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.KafkaFuture
import org.mockito.MockitoSugar
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{FreeSpec, Matchers}

import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
import scala.concurrent.duration.FiniteDuration

class KafkaClientSpec extends FreeSpec with Matchers with TestData with MockitoSugar with ScalaFutures {
Expand All @@ -37,29 +36,57 @@ class KafkaClientSpec extends FreeSpec with Matchers with TestData with MockitoS

val client = spy(new KafkaClient(cluster, groupId, FiniteDuration(0, "ms")))

val offset = new OffsetAndMetadata(1, Optional.empty(), "")

val groupId0Results = KafkaFuture.completedFuture(Map(
topicPartition0.asKafka -> offset
val groupId0Results = Future.successful(Map(
topicPartition0.asKafka -> new OffsetAndMetadata(0, Optional.empty(), "")
// missing topicPartition1
// missing topicPartition2
).asJava)
doReturn(groupId0Results).when(client).getListConsumerGroupOffsets(groupId0)

val groupId1Results = KafkaFuture.completedFuture(Map(
topicPartition0.asKafka -> offset
val groupId1Results = Future.successful(Map(
topicPartition0.asKafka -> new OffsetAndMetadata(1, Optional.empty(), "")
).asJava)

doReturn(groupId1Results).when(client).getListConsumerGroupOffsets(groupId1)


val groupOffsets = client.getGroupOffsets(0, groups, gtps).futureValue

groupOffsets shouldEqual GroupOffsets(
gtp0_0 -> Some(LookupTable.Point(1, 0)),
gtp1_0 -> None, // missing partition
gtp2_0 -> None, // missing partition
gtp0_1 -> Some(LookupTable.Point(1, 0))
)
groupOffsets.size shouldEqual 4
groupOffsets(gtp0_0) shouldEqual Some(LookupTable.Point(0, 0))
groupOffsets(gtp1_0) shouldEqual None // missing partition
groupOffsets(gtp2_0) shouldEqual None // missing partition
groupOffsets(gtp0_1) shouldEqual Some(LookupTable.Point(1, 0))

}

"getGroupOffsets returns distinct offsets when multiple groups subscribe to same partitions" in {
val groupId0 = "testGroupId0"
val groupId1 = "testGroupId1"

val groups = List(groupId0, groupId1)

val gtp0_0 = gtp0.copy(id = groupId0)
val gtp0_1 = gtp0.copy(id = groupId1)

val gtps = List(gtp0_0, gtp0_1)

val client = spy(new KafkaClient(cluster, groupId, FiniteDuration(0, "ms")))

val groupId0Results = Future.successful(Map(
topicPartition0.asKafka -> new OffsetAndMetadata(0, Optional.empty(), "")
).asJava)
doReturn(groupId0Results).when(client).getListConsumerGroupOffsets(groupId0)

val groupId1Results = Future.successful(Map(
topicPartition0.asKafka -> new OffsetAndMetadata(1, Optional.empty(), "")
).asJava)
doReturn(groupId1Results).when(client).getListConsumerGroupOffsets(groupId1)

val groupOffsets = client.getGroupOffsets(0, groups, gtps).futureValue

groupOffsets(gtp0_0) shouldEqual Some(LookupTable.Point(0, 0))
groupOffsets(gtp0_1) shouldEqual Some(LookupTable.Point(1, 0))
}

"getOffsetOrZero returns offsets of None (Option[Point]) for missing partitions" in {
Expand Down

0 comments on commit 76f28a9

Please sign in to comment.