Add client admin wrapper (#77)
* Add a wrapper for Kafka's AdminClient

Signed-off-by: Itamar Ravid <[email protected]>

* Add scaladoc and simplifications
iravid authored Nov 19, 2019
1 parent 26af9c6 commit 5bbd372
Showing 11 changed files with 591 additions and 13 deletions.
146 changes: 146 additions & 0 deletions docs/annotatedTests.md
@@ -0,0 +1,146 @@
# Annotated Tests

By way of examples, we present some annotated tests from the test suite.

Tests are written using zio-test.

## General Notes

The tests make use of KafkaTestUtils.scala, which comprises a number of helper methods
for testing zio-kafka. You may find it useful to copy this file into your own test
folder for writing your Kafka-based tests (there is no zio-test-utils project
at present). Relevant portions of KafkaTestUtils will be introduced as we work
through the tests.

## First Producer Test
```scala
object ProducerTest
    extends DefaultRunnableSpec(
      suite("producer test suite")(
        testM("one record") {
          withProducerStrings { producer =>
            for {
              _ <- producer.produce(new ProducerRecord("topic", "boo", "baa"))
            } yield assertCompletes
          }
        }
      ).provideManagedShared(KafkaTestUtils.embeddedKafkaEnvironment)
    )
```

First, note the .provideManagedShared. This gives the tests a Kafka.Service
added to a full TestEnvironment (this is needed because we want to provide both
the Live clock and the Kafka service).

### Kafka.Service

This follows the module pattern (see the main ZIO docs):
```scala
trait Kafka {
  def kafka: Kafka.Service
}

object Kafka {
  trait Service {
    def bootstrapServers: List[String]
    def stop(): UIO[Unit]
  }

  case class EmbeddedKafkaService(embeddedK: EmbeddedK) extends Kafka.Service {
    override def bootstrapServers: List[String] = List(s"localhost:${embeddedK.config.kafkaPort}")
    override def stop(): UIO[Unit]              = ZIO.effectTotal(embeddedK.stop(true))
  }

  case object DefaultLocal extends Kafka.Service {
    override def bootstrapServers: List[String] = List(s"localhost:9092")

    override def stop(): UIO[Unit] = UIO.unit
  }

  val makeEmbedded: Managed[Nothing, Kafka] =
    ZManaged.make(ZIO.effectTotal(new Kafka {
      override val kafka: Service = EmbeddedKafkaService(EmbeddedKafka.start())
    }))(_.kafka.stop())

  val makeLocal: Managed[Nothing, Kafka] =
    ZManaged.make(ZIO.effectTotal(new Kafka {
      override val kafka: Service = DefaultLocal
    }))(_.kafka.stop())
}
```

In fact, there are two provided implementations of the service. The first is for the unit
tests and makes use of [EmbeddedKafka](https://github.com/embeddedkafka/embedded-kafka).

The second uses the default local port and is suitable for a stand-alone Kafka
(for example, a Docker installation). You could create your own Kafka.Service for testing
against remote servers (but security configuration would need to be added).
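
For instance, a minimal sketch of such a custom service (the RemoteKafkaService name
and shape are illustrative, not part of the library):

```scala
// Illustrative only: a Kafka.Service pointing at a remote cluster. Any
// SASL/SSL configuration would still need to be supplied through the
// producer and consumer settings separately.
case class RemoteKafkaService(servers: List[String]) extends Kafka.Service {
  override def bootstrapServers: List[String] = servers
  override def stop(): UIO[Unit]              = UIO.unit
}
```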

Note the use of ZManaged to ensure the service is also stopped.

### KafkaTestEnvironment

```scala
object KafkaTestUtils {

  // Alias for the combined environment used in the signatures below.
  type KafkaTestEnvironment = TestEnvironment with Kafka

  def kafkaEnvironment(kafkaE: Managed[Nothing, Kafka]): Managed[Nothing, KafkaTestEnvironment] =
    for {
      testEnvironment <- TestEnvironment.Value
      kafkaS          <- kafkaE
    } yield new TestEnvironment(
      testEnvironment.blocking,
      testEnvironment.clock,
      testEnvironment.console,
      testEnvironment.live,
      testEnvironment.random,
      testEnvironment.sized,
      testEnvironment.system
    ) with Kafka {
      val kafka = kafkaS.kafka
    }

  val embeddedKafkaEnvironment: Managed[Nothing, KafkaTestEnvironment] =
    kafkaEnvironment(Kafka.makeEmbedded)
}
```
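
For runs against a stand-alone broker, the same helper can be pointed at Kafka.makeLocal;
a one-line sketch (the localKafkaEnvironment name is illustrative):

```scala
val localKafkaEnvironment: Managed[Nothing, KafkaTestEnvironment] =
  kafkaEnvironment(Kafka.makeLocal)
```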

## Back to the producer
The producer function is wrapped in withProducerStrings:

```scala
def producerSettings =
  for {
    servers <- ZIO.access[Kafka](_.kafka.bootstrapServers)
  } yield ProducerSettings(
    servers,
    5.seconds,
    Map.empty
  )

def withProducer[A, K, V](
  r: Producer[Any, K, V] => RIO[Any with Clock with Kafka with Blocking, A],
  kSerde: Serde[Any, K],
  vSerde: Serde[Any, V]
): RIO[KafkaTestEnvironment, A] =
  for {
    settings <- producerSettings
    producer = Producer.make(settings, kSerde, vSerde)
    lcb      <- Kafka.liveClockBlocking
    produced <- producer.use { p =>
                 r(p).provide(lcb)
               }
  } yield produced

def withProducerStrings[A](r: Producer[Any, String, String] => RIO[Any with Clock with Kafka with Blocking, A]) =
  withProducer(r, Serde.string, Serde.string)
```
withProducerStrings simply wraps withProducer, fixing the key and value types to (String, String).

withProducer creates the settings and a ZManaged\[Producer\]. It then creates liveClockBlocking, a
ZIO environment with the Live clock (which is essential when running code that
uses scheduling or other timing features, as much of zio-kafka does).
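
A plausible sketch of such a helper, assuming the environment shape above (the real
Kafka.liveClockBlocking may differ in detail):

```scala
// Illustrative sketch: build an environment whose Clock is the live clock
// rather than the TestClock, keeping Blocking and the Kafka service.
val liveClockBlocking: ZIO[KafkaTestEnvironment, Nothing, Clock with Blocking with Kafka] =
  ZIO.access[KafkaTestEnvironment] { env =>
    new Clock with Blocking with Kafka {
      val clock    = Clock.Live.clock // live clock, so sleeps and schedules really run
      val blocking = env.blocking
      val kafka    = env.kafka
    }
  }
```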

The actual producer operation is simply wrapped in producer.use:
```scala
_ <- producer.produce(new ProducerRecord("topic", "boo", "baa"))
```
producer.produce takes a ProducerRecord (defined in the Java Kafka client on which
this library is based). In this case the topic is "topic" and the key and value
are "boo" and "baa".
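
The same wrapper handles more than one record; a small sketch reusing the helpers above:

```scala
withProducerStrings { producer =>
  for {
    _ <- producer.produce(new ProducerRecord("topic", "key1", "value1"))
    _ <- producer.produce(new ProducerRecord("topic", "key2", "value2"))
  } yield assertCompletes
}
```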




202 changes: 202 additions & 0 deletions src/main/scala/zio/kafka/client/AdminClient.scala
@@ -0,0 +1,202 @@
package zio.kafka.client

import org.apache.kafka.clients.admin.{
  AdminClient => JAdminClient,
  NewTopic => JNewTopic,
  NewPartitions => JNewPartitions,
  TopicDescription => JTopicDescription,
  _
}
import org.apache.kafka.common.acl.AclOperation
import org.apache.kafka.common.{ KafkaFuture, TopicPartitionInfo }
import zio._

import scala.collection.JavaConverters._

/**
 * Thin wrapper around the Apache Kafka Java AdminClient. See the Java API
 * documentation for descriptions of the individual operations.
 */
case class AdminClient(private val adminClient: JAdminClient, private val semaphore: Semaphore) {
  import AdminClient._

  /**
   * Create multiple topics.
   */
  def createTopics(
    newTopics: Iterable[NewTopic],
    createTopicOptions: Option[CreateTopicsOptions] = None
  ): BlockingTask[Unit] = {
    val asJava = newTopics.map(_.asJava).asJavaCollection
    semaphore.withPermit(
      fromKafkaFutureVoid {
        blocking
          .effectBlocking(
            createTopicOptions
              .fold(adminClient.createTopics(asJava))(opts => adminClient.createTopics(asJava, opts))
              .all()
          )
      }
    )
  }

  /**
   * Create a single topic.
   */
  def createTopic(newTopic: NewTopic, validateOnly: Boolean = false): BlockingTask[Unit] =
    createTopics(List(newTopic), Some(new CreateTopicsOptions().validateOnly(validateOnly)))

  /**
   * Delete multiple topics.
   */
  def deleteTopics(
    topics: Iterable[String],
    deleteTopicsOptions: Option[DeleteTopicsOptions] = None
  ): BlockingTask[Unit] = {
    val asJava = topics.asJavaCollection

    semaphore.withPermit {
      fromKafkaFutureVoid {
        blocking
          .effectBlocking(
            deleteTopicsOptions
              .fold(adminClient.deleteTopics(asJava))(opts => adminClient.deleteTopics(asJava, opts))
              .all()
          )
      }
    }
  }

  /**
   * Delete a single topic.
   */
  def deleteTopic(topic: String): BlockingTask[Unit] =
    deleteTopics(List(topic))

  /**
   * List the topics in the cluster.
   */
  def listTopics(listTopicsOptions: Option[ListTopicsOptions] = None): BlockingTask[Map[String, TopicListing]] =
    semaphore.withPermit {
      fromKafkaFuture {
        blocking.effectBlocking(
          listTopicsOptions.fold(adminClient.listTopics())(opts => adminClient.listTopics(opts)).namesToListings()
        )
      }.map(_.asScala.toMap)
    }

  /**
   * Describe the specified topics.
   */
  def describeTopics(
    topicNames: Iterable[String],
    describeTopicsOptions: Option[DescribeTopicsOptions] = None
  ): BlockingTask[Map[String, TopicDescription]] = {
    val asJava = topicNames.asJavaCollection
    semaphore.withPermit {
      fromKafkaFuture {
        blocking.effectBlocking(
          describeTopicsOptions
            .fold(adminClient.describeTopics(asJava))(opts => adminClient.describeTopics(asJava, opts))
            .all()
        )
      }.map(_.asScala.mapValues(AdminClient.TopicDescription(_)).toMap)
    }
  }

  /**
   * Add new partitions to a topic.
   */
  def createPartitions(
    newPartitions: Map[String, NewPartitions],
    createPartitionsOptions: Option[CreatePartitionsOptions] = None
  ): BlockingTask[Unit] = {
    val asJava = newPartitions.mapValues(_.asJava).asJava

    semaphore.withPermit {
      fromKafkaFutureVoid {
        blocking.effectBlocking(
          createPartitionsOptions
            .fold(adminClient.createPartitions(asJava))(opts => adminClient.createPartitions(asJava, opts))
            .all()
        )
      }
    }
  }
}

object AdminClient {

  /**
   * Bridge a KafkaFuture into ZIO by registering a completion callback.
   */
  def fromKafkaFuture[R, T](kfv: RIO[R, KafkaFuture[T]]): RIO[R, T] =
    kfv.flatMap { f =>
      Task.effectAsync[T] { cb =>
        f.whenComplete {
          new KafkaFuture.BiConsumer[T, Throwable] {
            def accept(t: T, e: Throwable) =
              if (e ne null) cb(Task.fail(e))
              else cb(Task.succeed(t))
          }
        }
        ()
      }
    }

  def fromKafkaFutureVoid[R](kfv: RIO[R, KafkaFuture[Void]]): RIO[R, Unit] =
    fromKafkaFuture(kfv).unit

  case class NewTopic(
    name: String,
    numPartitions: Int,
    replicationFactor: Short,
    configs: Map[String, String] = Map()
  ) {
    def asJava: JNewTopic = {
      val jn = new JNewTopic(name, numPartitions, replicationFactor)

      if (configs.nonEmpty)
        jn.configs(configs.asJava)

      jn
    }
  }

  case class NewPartitions(
    totalCount: Int,
    newAssignments: List[List[Int]] = Nil
  ) {
    def asJava =
      if (newAssignments.nonEmpty)
        JNewPartitions.increaseTo(totalCount, newAssignments.map(_.map(Int.box).asJava).asJava)
      else JNewPartitions.increaseTo(totalCount)
  }

  case class TopicDescription(
    name: String,
    internal: Boolean,
    partitions: List[TopicPartitionInfo],
    authorizedOperations: Set[AclOperation]
  )

  object TopicDescription {
    def apply(jt: JTopicDescription): TopicDescription =
      TopicDescription(jt.name, jt.isInternal, jt.partitions.asScala.toList, jt.authorizedOperations.asScala.toSet)
  }

  case class KafkaAdminClientConfig(
    bootstrapServers: List[String],
    additionalConfig: Map[String, AnyRef] = Map.empty
  )

  def make(config: KafkaAdminClientConfig) =
    ZManaged.make {
      val configMap = (config.additionalConfig + (AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> config.bootstrapServers
        .mkString(","))).asJava
      for {
        ac  <- ZIO(JAdminClient.create(configMap))
        sem <- Semaphore.make(1L)
      } yield AdminClient(ac, sem)
    } { client =>
      ZIO.effectTotal(client.adminClient.close())
    }
}
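
As a usage sketch (not part of this diff; assumes a broker reachable at localhost:9092 and the BlockingTask alias for RIO[Blocking, _]), the wrapper composes naturally with ZManaged:

```scala
import zio.RIO
import zio.blocking.Blocking
import zio.kafka.client.AdminClient
import zio.kafka.client.AdminClient.{ KafkaAdminClientConfig, NewTopic }

// Create a topic, list the topics, then delete the topic again.
val program: RIO[Blocking, Set[String]] =
  AdminClient
    .make(KafkaAdminClientConfig(List("localhost:9092")))
    .use { client =>
      for {
        _      <- client.createTopic(NewTopic("example-topic", 2, 1))
        topics <- client.listTopics()
        _      <- client.deleteTopic("example-topic")
      } yield topics.keySet
    }
```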
8 changes: 6 additions & 2 deletions src/main/scala/zio/kafka/client/Consumer.scala
@@ -16,7 +16,8 @@ class Consumer private (
   private val consumer: ConsumerAccess,
   private val settings: ConsumerSettings,
   private val runloop: Runloop
-) { self =>
+) {
+  self =>
   def assignment: BlockingTask[Set[TopicPartition]] =
     consumer.withConsumer(_.assignment().asScala.toSet)
@@ -30,7 +31,10 @@
     partitions: Set[TopicPartition],
     timeout: Duration = Duration.Infinity
   ): BlockingTask[Map[TopicPartition, Long]] =
-    consumer.withConsumer(_.endOffsets(partitions.asJava, timeout.asJava).asScala.mapValues(_.longValue()).toMap)
+    consumer.withConsumer { eo =>
+      val offs = eo.endOffsets(partitions.asJava, timeout.asJava)
+      offs.asScala.mapValues(_.longValue()).toMap
+    }

/**
* Stops consumption of data, drains buffered records, and ends the attached
