Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update guide ("Embedding Kafka" -> "Dockerizing Kafka") #777

Merged
merged 12 commits into from
Jul 20, 2023
Merged
3 changes: 3 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ managed-kafka-streams = { module = 'org.apache.kafka:kafka-streams', version.ref
opentracing-kafka-client = { module = 'io.opentracing.contrib:opentracing-kafka-client', version.ref = 'opentracing-kafka-client' }
opentracing-mock = { module = 'io.opentracing:opentracing-mock', version.ref = 'opentracing-mock' }

testcontainers-kafka = { module = "org.testcontainers:kafka", version.ref = 'testcontainers' }

zipkin-brave-kafka-clients = { module = 'io.zipkin.brave:brave-instrumentation-kafka-clients', version.ref = 'zipkin-brave-kafka-clients' }
awaitility = { module = 'org.awaitility:awaitility', version.ref = 'awaitility' }

Expand All @@ -51,6 +53,7 @@ micronaut-reactor = { module = "io.micronaut.reactor:micronaut-reactor-bom", ver
micronaut-rxjava2 = { module = "io.micronaut.rxjava2:micronaut-rxjava2-bom", version.ref = "micronaut-rxjava2" }
micronaut-serde = { module = "io.micronaut.serde:micronaut-serde-bom", version.ref = "micronaut-serde" }
micronaut-tracing = { module = "io.micronaut.tracing:micronaut-tracing-bom", version.ref = "micronaut-tracing" }
junit-jupiter-engine = { module = 'org.junit.jupiter:junit-jupiter-engine' }

micronaut-gradle-plugin = { module = "io.micronaut.gradle:micronaut-gradle-plugin", version.ref = "micronaut-gradle-plugin" }

3 changes: 3 additions & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,6 @@ include 'kafka'
include 'kafka-streams'
include 'tests:tasks-sasl-plaintext'

include 'test-suite'
include 'test-suite-groovy'
include 'test-suite-kotlin'
2 changes: 1 addition & 1 deletion src/main/docs/guide/kafkaClient/kafkaClientScope.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ The previous example can be tested in JUnit with the following test:
include::{testskafka}/producer/inject/BookSenderTest.java[tags=test, indent=0]
----

<1> An embedded version of Kafka is used
<1> A Kafka docker container is used
<2> The `BookSender` is retrieved from the api:context.ApplicationContext[] and a `ProducerRecord` sent

By using the link:{kafkaapi}/org/apache/kafka/clients/producer/KafkaProducer.html[KafkaProducer] API directly you open up even more options if you require transactions (exactly-once delivery) or want control over when records are flushed etc.
13 changes: 13 additions & 0 deletions src/main/docs/guide/kafkaClient/kafkaDockerized.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
https://micronaut-projects.github.io/micronaut-test-resources/latest/guide/#modules-kafka[Micronaut Test Resources] simplifies running Kafka for local development and testing.

> Micronaut Test Resources Kafka support will automatically start a Kafka container and provide the value of the `kafka.bootstrap.servers` property.

https://micronaut.io/launch[Micronaut Launch] and CLI already apply Test Resources to your build when you https://micronaut.io/launch?features=kafka[select the `kafka` feature].

Micronaut Test Resources uses https://testcontainers.com[Test Containers] under the hood. If you prefer to use Test Containers directly, you can create a https://www.testcontainers.org/test_framework_integration/manual_lifecycle_control/#singleton-containers[Singleton Container] and combine it with https://micronaut-projects.github.io/micronaut-test/latest/api/io/micronaut/test/support/TestPropertyProvider.html[Micronaut Test `TestPropertyProvider`]:

snippet::io.micronaut.kafka.docs.AbstractKafkaTest[]

And then test:

snippet::io.micronaut.kafka.docs.MyTest[]
35 changes: 0 additions & 35 deletions src/main/docs/guide/kafkaClient/kafkaEmbedded.adoc

This file was deleted.

2 changes: 1 addition & 1 deletion src/main/docs/guide/toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ kafkaClient:
kafkaClientBatch: Sending Records in Batch
kafkaClientScope: Injecting Kafka Producer Beans
kafkaClientTx: Transactions
kafkaEmbedded: Embedding Kafka
kafkaDockerized: Running Kafka while testing and developing
kafkaListener:
title: Kafka Consumers Using @KafkaListener
kafkaListenerMethods: Defining @KafkaListener Methods
Expand Down
17 changes: 17 additions & 0 deletions test-suite-groovy/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
plugins {
groovy
}

dependencies {
testImplementation(platform(mn.micronaut.core.bom))
testCompileOnly(mn.micronaut.inject.groovy)
testImplementation(libs.testcontainers.kafka)
testImplementation(mnTest.micronaut.test.spock)
testRuntimeOnly(libs.junit.jupiter.engine)
testImplementation(libs.awaitility)
testImplementation(projects.micronautKafka)
}

tasks.withType<Test> {
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.micronaut.kafka.docs

import io.micronaut.test.support.TestPropertyProvider
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.utility.DockerImageName
import spock.lang.Specification

/**
* @see <a href="https://www.testcontainers.org/test_framework_integration/manual_lifecycle_control/#singleton-containers">Singleton containers</a>
*/
abstract class AbstractKafkaTest extends Specification implements TestPropertyProvider {

static final KafkaContainer MY_KAFKA

static {
MY_KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"))
MY_KAFKA.start()
}

@Override
Map<String, String> getProperties() {
["kafka.bootstrap.servers": MY_KAFKA.getBootstrapServers()]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package io.micronaut.kafka.docs

import io.micronaut.configuration.kafka.annotation.*
import io.micronaut.context.annotation.*
import io.micronaut.test.extensions.spock.annotation.MicronautTest
import jakarta.inject.Inject

import static java.util.concurrent.TimeUnit.SECONDS
import static org.awaitility.Awaitility.await

@Property(name = "spec.name", value = "MyTest")
@MicronautTest
class MyTest extends AbstractKafkaTest {

@Inject
MyProducer producer
@Inject
MyConsumer consumer

void "test kafka running"() {
given:
String message = "hello"

when:
producer.produce(message)

then:
await().atMost(5, SECONDS).until(() -> consumer.consumed == message)

cleanup:
MY_KAFKA.stop()
}

@Requires(property = "spec.name", value = "MyTest")
@KafkaClient
static interface MyProducer {
@Topic("my-topic")
void produce(String message)
}

@Requires(property = "spec.name", value = "MyTest")
@KafkaListener(offsetReset = OffsetReset.EARLIEST)
static class MyConsumer {
String consumed

@Topic("my-topic")
void consume(String message) {
consumed = message
}
}

}
18 changes: 18 additions & 0 deletions test-suite-kotlin/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
plugins {
id("org.jetbrains.kotlin.jvm") version "1.8.22"
id("org.jetbrains.kotlin.kapt") version "1.8.22"
}

dependencies {
kaptTest(platform(mn.micronaut.core.bom))
kaptTest(mn.micronaut.inject.java)
testImplementation(libs.testcontainers.kafka)
testImplementation(mnTest.micronaut.test.junit5)
testRuntimeOnly(libs.junit.jupiter.engine)
testImplementation(libs.awaitility)
testImplementation(projects.micronautKafka)
}

tasks.withType<Test> {
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.micronaut.kafka.docs

import io.micronaut.test.support.TestPropertyProvider
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.utility.DockerImageName
import java.util.*

/**
* @see <a href="https://www.testcontainers.org/test_framework_integration/manual_lifecycle_control/#singleton-containers">Singleton containers</a>
*/
abstract class AbstractKafkaTest : TestPropertyProvider {

companion object {
var MY_KAFKA: KafkaContainer
init {
MY_KAFKA = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"))
MY_KAFKA.start()
}
}

override fun getProperties(): MutableMap<String, String> {
return Collections.singletonMap(
"kafka.bootstrap.servers", MY_KAFKA.getBootstrapServers()
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.micronaut.kafka.docs

import io.micronaut.configuration.kafka.annotation.*
import io.micronaut.context.annotation.*
import io.micronaut.test.extensions.junit5.annotation.MicronautTest
import org.awaitility.Awaitility.await
import org.junit.jupiter.api.*
import java.util.concurrent.*

@Property(name = "spec.name", value = "MyTest")
@MicronautTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
internal class MyTest : AbstractKafkaTest() {
@Test
fun testKafkaRunning(producer: MyProducer, consumer: MyConsumer) {
val message = "hello"
producer.produce(message)
await().atMost(5, TimeUnit.SECONDS)
.until(Callable<Boolean> { message == consumer.consumed })
MY_KAFKA.stop()
}

@Requires(property = "spec.name", value = "MyTest")
@KafkaClient
interface MyProducer {
@Topic("my-topic")
fun produce(message: String)
}

@Requires(property = "spec.name", value = "MyTest")
@KafkaListener(offsetReset = OffsetReset.EARLIEST)
class MyConsumer {
var consumed: String? = null

@Topic("my-topic")
fun consume(message: String) {
consumed = message
}
}
}
17 changes: 17 additions & 0 deletions test-suite/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
plugins {
java
}

dependencies {
testAnnotationProcessor(platform(mn.micronaut.core.bom))
testAnnotationProcessor(mn.micronaut.inject.java)
testImplementation(libs.testcontainers.kafka)
testImplementation(mnTest.micronaut.test.junit5)
testRuntimeOnly(libs.junit.jupiter.engine)
testImplementation(libs.awaitility)
testImplementation(projects.micronautKafka)
}

tasks.withType<Test> {
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.micronaut.kafka.docs;

import io.micronaut.test.support.TestPropertyProvider;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

import java.util.*;

/**
* @see <a href="https://www.testcontainers.org/test_framework_integration/manual_lifecycle_control/#singleton-containers">Singleton containers</a>
*/
abstract class AbstractKafkaTest implements TestPropertyProvider {

static final KafkaContainer MY_KAFKA;

static {
MY_KAFKA = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:latest"));
MY_KAFKA.start();
}

@Override
public Map<String, String> getProperties() {
return Collections.singletonMap(
"kafka.bootstrap.servers", MY_KAFKA.getBootstrapServers()
);
}
}
39 changes: 39 additions & 0 deletions test-suite/src/test/java/io/micronaut/kafka/docs/MyTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package io.micronaut.kafka.docs;

import io.micronaut.configuration.kafka.annotation.*;
import io.micronaut.context.annotation.*;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.*;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;

@Property(name = "spec.name", value = "MyTest")
@MicronautTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class MyTest extends AbstractKafkaTest {
@Test
void testKafkaRunning(MyProducer producer, MyConsumer consumer) {
final String message = "hello";
producer.produce(message);
await().atMost(5, SECONDS).until(() -> message.equals(consumer.consumed));
MY_KAFKA.stop();
}

@Requires(property = "spec.name", value = "MyTest")
@KafkaClient
interface MyProducer {
@Topic("my-topic")
void produce(String message);
}

@Requires(property = "spec.name", value = "MyTest")
@KafkaListener(offsetReset = OffsetReset.EARLIEST)
static class MyConsumer {
String consumed;
@Topic("my-topic")
public void consume(String message) {
consumed = message;
}
}
}