Skip to content

Commit

Permalink
Test latest version of kafka client and streams (#3803)
Browse files Browse the repository at this point in the history
  • Loading branch information
laurit authored Aug 11, 2021
1 parent 421fec4 commit d9080a7
Show file tree
Hide file tree
Showing 12 changed files with 1,029 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ dependencies {
testLibrary("org.springframework.kafka:spring-kafka-test:1.3.3.RELEASE")
testImplementation("javax.xml.bind:jaxb-api:2.2.3")
testLibrary("org.assertj:assertj-core")
testImplementation("org.mockito:mockito-core")

// Include latest version of kafka itself along with latest version of client libs.
// This seems to help with jar compatibility hell.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,33 @@ abstract class KafkaClientBaseTest extends AgentInstrumentationSpecification {
@Rule
KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SHARED_TOPIC)

abstract containerProperties()

Map<String, Object> senderProps() {
return KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
}

Map<String, Object> consumerProps(String group, String autoCommit) {
return KafkaTestUtils.consumerProps(group, autoCommit, embeddedKafka)
}

void waitForAssignment(Object container) {
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
}

def stopProducerFactory(DefaultKafkaProducerFactory producerFactory) {
producerFactory.stop()
}

@Unroll
def "test kafka client header propagation manual config"() {
setup:
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
def senderProps = senderProps()
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)

// set up the Kafka consumer properties
def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka)
def consumerProperties = consumerProps("sender", "false")

// create a Kafka consumer factory
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
Expand All @@ -62,7 +80,7 @@ abstract class KafkaClientBaseTest extends AgentInstrumentationSpecification {
container.start()

// wait until the container has the required number of assigned partitions
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
waitForAssignment(container)

when:
String message = "Testing without headers"
Expand All @@ -75,7 +93,7 @@ abstract class KafkaClientBaseTest extends AgentInstrumentationSpecification {
received.headers().iterator().hasNext() == propagationEnabled

cleanup:
producerFactory.stop()
stopProducerFactory(producerFactory)
container?.stop()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.KafkaMessageListenerContainer
import org.springframework.kafka.listener.MessageListener
import org.springframework.kafka.test.utils.ContainerTestUtils
import org.springframework.kafka.test.utils.KafkaTestUtils

class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest {

def "should not read remote context when consuming messages if propagation is disabled"() {
setup:
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
def senderProps = senderProps()
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)

Expand Down Expand Up @@ -90,13 +88,13 @@ class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest {
}

cleanup:
producerFactory.stop()
stopProducerFactory(producerFactory)
container?.stop()
}

protected KafkaMessageListenerContainer<Object, Object> startConsumer(String groupId, records) {
// set up the Kafka consumer properties
Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps(groupId, "false", embeddedKafka)
Map<String, Object> consumerProperties = consumerProps(groupId, "false")
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

// create a Kafka consumer factory
Expand All @@ -120,11 +118,11 @@ class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest {
container.start()

// wait until the container has the required number of assigned partitions
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
waitForAssignment(container)
container
}


@Override
def containerProperties() {
try {
// Different class names for test and latestDepTest.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,17 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.KafkaMessageListenerContainer
import org.springframework.kafka.listener.MessageListener
import org.springframework.kafka.test.utils.ContainerTestUtils
import org.springframework.kafka.test.utils.KafkaTestUtils

class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {

def "test kafka produce and consume"() {
setup:
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
def senderProps = senderProps()
Producer<String, String> producer = new KafkaProducer<>(senderProps, new StringSerializer(), new StringSerializer())

// set up the Kafka consumer properties
def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka)
def consumerProperties = consumerProps("sender", "false")

// create a Kafka consumer factory
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
Expand All @@ -61,7 +60,7 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
container.start()

// wait until the container has the required number of assigned partitions
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
waitForAssignment(container)

when:
String greeting = "Hello Spring Kafka Sender!"
Expand Down Expand Up @@ -128,12 +127,12 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {

def "test spring kafka template produce and consume"() {
setup:
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
def senderProps = senderProps()
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)

// set up the Kafka consumer properties
def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka)
def consumerProperties = consumerProps("sender", "false")

// create a Kafka consumer factory
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
Expand All @@ -159,7 +158,7 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
container.start()

// wait until the container has the required number of assigned partitions
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
waitForAssignment(container)

when:
String greeting = "Hello Spring Kafka Sender!"
Expand Down Expand Up @@ -218,18 +217,18 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
}

cleanup:
producerFactory.stop()
stopProducerFactory(producerFactory)
container?.stop()
}

def "test pass through tombstone"() {
setup:
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
def senderProps = senderProps()
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)

// set up the Kafka consumer properties
def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka)
def consumerProperties = consumerProps("sender", "false")

// create a Kafka consumer factory
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
Expand All @@ -255,7 +254,7 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
container.start()

// wait until the container has the required number of assigned partitions
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
waitForAssignment(container)

when:
kafkaTemplate.send(SHARED_TOPIC, null)
Expand Down Expand Up @@ -301,7 +300,7 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
}

cleanup:
producerFactory.stop()
stopProducerFactory(producerFactory)
container?.stop()
}

Expand All @@ -310,11 +309,11 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
// set up the Kafka consumer properties
def kafkaPartition = 0
def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka)
def consumerProperties = consumerProps("sender", "false")
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
def consumer = new KafkaConsumer<String, String>(consumerProperties)
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
def senderProps = senderProps()
def producer = new KafkaProducer(senderProps)
consumer.assign(Arrays.asList(new TopicPartition(SHARED_TOPIC, kafkaPartition)))
Expand Down Expand Up @@ -378,7 +377,7 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
protected KafkaMessageListenerContainer<Object, Object> startConsumer(String groupId, records) {
// set up the Kafka consumer properties
Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps(groupId, "false", embeddedKafka)
Map<String, Object> consumerProperties = consumerProps(groupId, "false")
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
// create a Kafka consumer factory
Expand All @@ -402,11 +401,11 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
container.start()
// wait until the container has the required number of assigned partitions
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
waitForAssignment(container)
container
}
@Override
def containerProperties() {
try {
// Different class names for test and latestDepTest.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
plugins {
id("otel.javaagent-testing")
}

dependencies {
library("org.apache.kafka:kafka-clients:2.4.0")

testInstrumentation(project(":instrumentation:kafka-clients-0.11:javaagent"))

testLibrary("org.springframework.kafka:spring-kafka:2.4.0.RELEASE")
testLibrary("org.springframework.kafka:spring-kafka-test:2.4.0.RELEASE")
testLibrary("org.springframework:spring-core:5.2.9.RELEASE")
testImplementation("javax.xml.bind:jaxb-api:2.2.3")

latestDepTestLibrary("org.apache.kafka:kafka_2.13:+")
}

tasks {
withType<Test>().configureEach {
// TODO run tests both with and without experimental span attributes
jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true")
}

val testPropagationDisabled by registering(Test::class) {
filter {
includeTestsMatching("KafkaClientPropagationDisabledTest")
isFailOnNoMatchingTests = false
}
include("**/KafkaClientPropagationDisabledTest.*")
jvmArgs("-Dotel.instrumentation.kafka.client-propagation.enabled=false")
}

named<Test>("test") {
dependsOn(testPropagationDisabled)
filter {
excludeTestsMatching("KafkaClientPropagationDisabledTest")
isFailOnNoMatchingTests = false
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.junit.Rule
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.KafkaMessageListenerContainer
import org.springframework.kafka.listener.MessageListener
import org.springframework.kafka.test.rule.EmbeddedKafkaRule
import org.springframework.kafka.test.utils.ContainerTestUtils
import org.springframework.kafka.test.utils.KafkaTestUtils
import spock.lang.Unroll

abstract class KafkaClientBaseTest extends AgentInstrumentationSpecification {

protected static final SHARED_TOPIC = "shared.topic"

private static final boolean propagationEnabled = Boolean.parseBoolean(
System.getProperty("otel.instrumentation.kafka.client-propagation.enabled", "true"))

@Rule
EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, SHARED_TOPIC)

abstract containerProperties()

Map<String, Object> senderProps() {
return KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka().getBrokersAsString())
}

Map<String, Object> consumerProps(String group, String autoCommit) {
return KafkaTestUtils.consumerProps(group, autoCommit, embeddedKafka.getEmbeddedKafka())
}

void waitForAssignment(Object container) {
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic())
}

def stopProducerFactory(DefaultKafkaProducerFactory producerFactory) {
producerFactory.destroy()
}

@Unroll
def "test kafka client header propagation manual config"() {
setup:
def senderProps = senderProps()
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)

// set up the Kafka consumer properties
def consumerProperties = consumerProps("sender", "false")

// create a Kafka consumer factory
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)

// set the topic that needs to be consumed
def containerProperties = containerProperties()

// create a Kafka MessageListenerContainer
def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)

// create a thread safe queue to store the received message
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()

// setup a Kafka message listener
container.setupMessageListener(new MessageListener<String, String>() {
@Override
void onMessage(ConsumerRecord<String, String> record) {
records.add(record)
}
})

// start the container and underlying message listener
container.start()

// wait until the container has the required number of assigned partitions
waitForAssignment(container)

when:
String message = "Testing without headers"
kafkaTemplate.send(SHARED_TOPIC, message)

then:
// check that the message was received
def received = records.poll(5, TimeUnit.SECONDS)

received.headers().iterator().hasNext() == propagationEnabled

cleanup:
stopProducerFactory(producerFactory)
container?.stop()
}
}
Loading

0 comments on commit d9080a7

Please sign in to comment.