Skip to content
This repository has been archived by the owner on Mar 17, 2024. It is now read-only.

Get commit offset for correct group topic partitions #56

Merged
merged 1 commit into from
Sep 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -141,35 +141,36 @@ class KafkaClient private[kafkalagexporter](cluster: KafkaCluster,

/**
* Get last committed Consumer Group offsets for all group topic partitions given a list of consumer groups. When a
* topic partition has no matched Consumer Group offset then a default offset of 0 is provided.
* topic partition has no matching Consumer Group offset then a default of None is provided.
* @return A series of Futures for Consumer Group offset requests to Kafka.
*/
def getGroupOffsets(now: Long, groups: List[String], gtps: List[Domain.GroupTopicPartition]): Future[GroupOffsets] = {
def getGroupOffsets(now: Long, groups: List[String], allGtps: List[Domain.GroupTopicPartition]): Future[GroupOffsets] = {
val groupOffsetsF: Future[List[GroupOffsets]] = Future.sequence {
groups.map { group =>
kafkaFuture(getListConsumerGroupOffsets(group))
val gtps = allGtps.filter(_.id == group)
getListConsumerGroupOffsets(group)
.map(offsetMap => getGroupOffsets(now, gtps, offsetMap.asScala.toMap))
}
}

groupOffsetsF
.map(_.flatten.toMap)
.map(go => getOffsetOrZero(gtps, go))
.map(go => getOffsetOrZero(allGtps, go))
}


/**
* Call to `AdminClient` to get group offset info. This is only its own method so it can be mocked out in a test
* because it's not possible to instantiate or mock the `ListConsumerGroupOffsetsResult` type for some reason.
*/
private[kafkalagexporter] def getListConsumerGroupOffsets(group: String): KafkaFuture[util.Map[KafkaTopicPartition, OffsetAndMetadata]] = {
adminClient
.listConsumerGroupOffsets(group, listConsumerGroupsOptions)
.partitionsToOffsetAndMetadata()
}
private[kafkalagexporter] def getListConsumerGroupOffsets(group: String): Future[util.Map[KafkaTopicPartition, OffsetAndMetadata]] =
kafkaFuture {
adminClient
.listConsumerGroupOffsets(group, listConsumerGroupsOptions)
.partitionsToOffsetAndMetadata()
}

/**
* Backfill any group topic partitions with no offset as 0
* Backfill any group topic partitions with no offset as None
*/
private[kafkalagexporter] def getOffsetOrZero(
gtps: List[Domain.GroupTopicPartition],
Expand Down
55 changes: 41 additions & 14 deletions src/test/scala/com/lightbend/kafkalagexporter/KafkaClientSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@ import java.util.Optional
import com.lightbend.kafkalagexporter.Domain.GroupOffsets
import com.lightbend.kafkalagexporter.KafkaClient.KafkaTopicPartitionOps
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.KafkaFuture
import org.mockito.MockitoSugar
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{FreeSpec, Matchers}

import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
import scala.concurrent.duration.FiniteDuration

class KafkaClientSpec extends FreeSpec with Matchers with TestData with MockitoSugar with ScalaFutures {
Expand All @@ -37,29 +36,57 @@ class KafkaClientSpec extends FreeSpec with Matchers with TestData with MockitoS

val client = spy(new KafkaClient(cluster, groupId, FiniteDuration(0, "ms")))

val offset = new OffsetAndMetadata(1, Optional.empty(), "")

val groupId0Results = KafkaFuture.completedFuture(Map(
topicPartition0.asKafka -> offset
val groupId0Results = Future.successful(Map(
topicPartition0.asKafka -> new OffsetAndMetadata(0, Optional.empty(), "")
// missing topicPartition1
// missing topicPartition2
).asJava)
doReturn(groupId0Results).when(client).getListConsumerGroupOffsets(groupId0)

val groupId1Results = KafkaFuture.completedFuture(Map(
topicPartition0.asKafka -> offset
val groupId1Results = Future.successful(Map(
topicPartition0.asKafka -> new OffsetAndMetadata(1, Optional.empty(), "")
).asJava)

doReturn(groupId1Results).when(client).getListConsumerGroupOffsets(groupId1)


val groupOffsets = client.getGroupOffsets(0, groups, gtps).futureValue

groupOffsets shouldEqual GroupOffsets(
gtp0_0 -> Some(LookupTable.Point(1, 0)),
gtp1_0 -> None, // missing partition
gtp2_0 -> None, // missing partition
gtp0_1 -> Some(LookupTable.Point(1, 0))
)
groupOffsets.size shouldEqual 4
groupOffsets(gtp0_0) shouldEqual Some(LookupTable.Point(0, 0))
groupOffsets(gtp1_0) shouldEqual None // missing partition
groupOffsets(gtp2_0) shouldEqual None // missing partition
groupOffsets(gtp0_1) shouldEqual Some(LookupTable.Point(1, 0))

}

"getGroupOffsets returns distinct offsets when multiple groups subscribe to same partitions" in {
val groupId0 = "testGroupId0"
val groupId1 = "testGroupId1"

val groups = List(groupId0, groupId1)

val gtp0_0 = gtp0.copy(id = groupId0)
val gtp0_1 = gtp0.copy(id = groupId1)

val gtps = List(gtp0_0, gtp0_1)

val client = spy(new KafkaClient(cluster, groupId, FiniteDuration(0, "ms")))

val groupId0Results = Future.successful(Map(
topicPartition0.asKafka -> new OffsetAndMetadata(0, Optional.empty(), "")
).asJava)
doReturn(groupId0Results).when(client).getListConsumerGroupOffsets(groupId0)

val groupId1Results = Future.successful(Map(
topicPartition0.asKafka -> new OffsetAndMetadata(1, Optional.empty(), "")
).asJava)
doReturn(groupId1Results).when(client).getListConsumerGroupOffsets(groupId1)

val groupOffsets = client.getGroupOffsets(0, groups, gtps).futureValue

groupOffsets(gtp0_0) shouldEqual Some(LookupTable.Point(0, 0))
groupOffsets(gtp0_1) shouldEqual Some(LookupTable.Point(1, 0))
}

"getOffsetOrZero returns offsets of None (Option[Point]) for missing partitions" in {
Expand Down