Test latest version of kafka client and streams #3803
Merged · 1 commit · Aug 11, 2021
@@ -18,7 +18,6 @@ dependencies {
testLibrary("org.springframework.kafka:spring-kafka-test:1.3.3.RELEASE")
testImplementation("javax.xml.bind:jaxb-api:2.2.3")
testLibrary("org.assertj:assertj-core")
testImplementation("org.mockito:mockito-core")

// Include latest version of kafka itself along with latest version of client libs.
// This seems to help with jar compatibility hell.
@@ -28,15 +28,33 @@ abstract class KafkaClientBaseTest extends AgentInstrumentationSpecification {
@Rule
KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SHARED_TOPIC)

abstract containerProperties()

Map<String, Object> senderProps() {
return KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
}

Map<String, Object> consumerProps(String group, String autoCommit) {
return KafkaTestUtils.consumerProps(group, autoCommit, embeddedKafka)
}

void waitForAssignment(Object container) {
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
}

def stopProducerFactory(DefaultKafkaProducerFactory producerFactory) {
producerFactory.stop()
}

@Unroll
def "test kafka client header propagation manual config"() {
setup:
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
def senderProps = senderProps()
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)

// set up the Kafka consumer properties
def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka)
def consumerProperties = consumerProps("sender", "false")

// create a Kafka consumer factory
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
@@ -62,7 +80,7 @@ abstract class KafkaClientBaseTest extends AgentInstrumentationSpecification {
container.start()

// wait until the container has the required number of assigned partitions
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
waitForAssignment(container)

when:
String message = "Testing without headers"
@@ -75,7 +93,7 @@ abstract class KafkaClientBaseTest extends AgentInstrumentationSpecification {
received.headers().iterator().hasNext() == propagationEnabled

cleanup:
producerFactory.stop()
stopProducerFactory(producerFactory)
container?.stop()
}
}
@@ -16,14 +16,12 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.KafkaMessageListenerContainer
import org.springframework.kafka.listener.MessageListener
import org.springframework.kafka.test.utils.ContainerTestUtils
import org.springframework.kafka.test.utils.KafkaTestUtils

class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest {

def "should not read remote context when consuming messages if propagation is disabled"() {
setup:
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
def senderProps = senderProps()
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)

@@ -90,13 +88,13 @@ class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest {
}

cleanup:
producerFactory.stop()
stopProducerFactory(producerFactory)
container?.stop()
}

protected KafkaMessageListenerContainer<Object, Object> startConsumer(String groupId, records) {
// set up the Kafka consumer properties
Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps(groupId, "false", embeddedKafka)
Map<String, Object> consumerProperties = consumerProps(groupId, "false")
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

// create a Kafka consumer factory
@@ -120,11 +118,11 @@ class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest {
container.start()

// wait until the container has the required number of assigned partitions
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
waitForAssignment(container)
container
}


@Override
def containerProperties() {
try {
// Different class names for test and latestDepTest.
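(The body of this try block is collapsed in the diff. As a minimal sketch, assuming the intent is to handle the package move between spring-kafka versions, a version-tolerant lookup could be written as below; the helper name containerPropertiesForTopic and its exact logic are illustrative, not the hidden code itself.)

// Sketch only: spring-kafka 1.x ships org.springframework.kafka.listener.config.ContainerProperties,
// while spring-kafka 2.x moved the class to org.springframework.kafka.listener.ContainerProperties.
def containerPropertiesForTopic(String topic) {
  def clazz
  try {
    clazz = Class.forName("org.springframework.kafka.listener.config.ContainerProperties")
  } catch (ClassNotFoundException ignored) {
    clazz = Class.forName("org.springframework.kafka.listener.ContainerProperties")
  }
  // Groovy's Class.newInstance(Object...) resolves the ContainerProperties(String... topics) constructor
  return clazz.newInstance(topic)
}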
@@ -23,18 +23,17 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.KafkaMessageListenerContainer
import org.springframework.kafka.listener.MessageListener
import org.springframework.kafka.test.utils.ContainerTestUtils
import org.springframework.kafka.test.utils.KafkaTestUtils

class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {

def "test kafka produce and consume"() {
setup:
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
def senderProps = senderProps()
Producer<String, String> producer = new KafkaProducer<>(senderProps, new StringSerializer(), new StringSerializer())

// set up the Kafka consumer properties
def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka)
def consumerProperties = consumerProps("sender", "false")

// create a Kafka consumer factory
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
@@ -61,7 +60,7 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
container.start()

// wait until the container has the required number of assigned partitions
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
waitForAssignment(container)

when:
String greeting = "Hello Spring Kafka Sender!"
@@ -128,12 +127,12 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {

def "test spring kafka template produce and consume"() {
setup:
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
def senderProps = senderProps()
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)

// set up the Kafka consumer properties
def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka)
def consumerProperties = consumerProps("sender", "false")

// create a Kafka consumer factory
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
@@ -159,7 +158,7 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
container.start()

// wait until the container has the required number of assigned partitions
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
waitForAssignment(container)

when:
String greeting = "Hello Spring Kafka Sender!"
@@ -218,18 +217,18 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
}

cleanup:
producerFactory.stop()
stopProducerFactory(producerFactory)
container?.stop()
}

def "test pass through tombstone"() {
setup:
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
def senderProps = senderProps()
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)

// set up the Kafka consumer properties
def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka)
def consumerProperties = consumerProps("sender", "false")

// create a Kafka consumer factory
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
@@ -255,7 +254,7 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
container.start()

// wait until the container has the required number of assigned partitions
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
waitForAssignment(container)

when:
kafkaTemplate.send(SHARED_TOPIC, null)
Expand Down Expand Up @@ -301,7 +300,7 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
}

cleanup:
producerFactory.stop()
stopProducerFactory(producerFactory)
container?.stop()
}

@@ -310,11 +309,11 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {

// set up the Kafka consumer properties
def kafkaPartition = 0
def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka)
def consumerProperties = consumerProps("sender", "false")
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
def consumer = new KafkaConsumer<String, String>(consumerProperties)

def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
def senderProps = senderProps()
def producer = new KafkaProducer(senderProps)

consumer.assign(Arrays.asList(new TopicPartition(SHARED_TOPIC, kafkaPartition)))
@@ -378,7 +377,7 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {

protected KafkaMessageListenerContainer<Object, Object> startConsumer(String groupId, records) {
// set up the Kafka consumer properties
Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps(groupId, "false", embeddedKafka)
Map<String, Object> consumerProperties = consumerProps(groupId, "false")
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

// create a Kafka consumer factory
@@ -402,11 +401,11 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
container.start()

// wait until the container has the required number of assigned partitions
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
waitForAssignment(container)
container
}


@Override
def containerProperties() {
try {
// Different class names for test and latestDepTest.
@@ -0,0 +1,40 @@
plugins {
id("otel.javaagent-testing")
}

dependencies {
library("org.apache.kafka:kafka-clients:2.4.0")

testInstrumentation(project(":instrumentation:kafka-clients-0.11:javaagent"))

testLibrary("org.springframework.kafka:spring-kafka:2.4.0.RELEASE")
testLibrary("org.springframework.kafka:spring-kafka-test:2.4.0.RELEASE")
testLibrary("org.springframework:spring-core:5.2.9.RELEASE")
testImplementation("javax.xml.bind:jaxb-api:2.2.3")

latestDepTestLibrary("org.apache.kafka:kafka_2.13:+")
}

tasks {
withType<Test>().configureEach {
// TODO run tests both with and without experimental span attributes
jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true")
}

val testPropagationDisabled by registering(Test::class) {
filter {
includeTestsMatching("KafkaClientPropagationDisabledTest")
isFailOnNoMatchingTests = false
}
include("**/KafkaClientPropagationDisabledTest.*")
jvmArgs("-Dotel.instrumentation.kafka.client-propagation.enabled=false")
}

named<Test>("test") {
dependsOn(testPropagationDisabled)
filter {
excludeTestsMatching("KafkaClientPropagationDisabledTest")
isFailOnNoMatchingTests = false
}
}
}
@@ -0,0 +1,99 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.junit.Rule
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.KafkaMessageListenerContainer
import org.springframework.kafka.listener.MessageListener
import org.springframework.kafka.test.rule.EmbeddedKafkaRule
import org.springframework.kafka.test.utils.ContainerTestUtils
import org.springframework.kafka.test.utils.KafkaTestUtils
import spock.lang.Unroll

abstract class KafkaClientBaseTest extends AgentInstrumentationSpecification {

protected static final SHARED_TOPIC = "shared.topic"

private static final boolean propagationEnabled = Boolean.parseBoolean(
System.getProperty("otel.instrumentation.kafka.client-propagation.enabled", "true"))

@Rule
EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, SHARED_TOPIC)

abstract containerProperties()

Map<String, Object> senderProps() {
return KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka().getBrokersAsString())
}

Map<String, Object> consumerProps(String group, String autoCommit) {
return KafkaTestUtils.consumerProps(group, autoCommit, embeddedKafka.getEmbeddedKafka())
}

void waitForAssignment(Object container) {
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic())
}

def stopProducerFactory(DefaultKafkaProducerFactory producerFactory) {
producerFactory.destroy()
}

@Unroll
def "test kafka client header propagation manual config"() {
setup:
def senderProps = senderProps()
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)

// set up the Kafka consumer properties
def consumerProperties = consumerProps("sender", "false")

// create a Kafka consumer factory
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)

// set the topic that needs to be consumed
def containerProperties = containerProperties()

// create a Kafka MessageListenerContainer
def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)

// create a thread safe queue to store the received message
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()

// setup a Kafka message listener
container.setupMessageListener(new MessageListener<String, String>() {
@Override
void onMessage(ConsumerRecord<String, String> record) {
records.add(record)
}
})

// start the container and underlying message listener
container.start()

// wait until the container has the required number of assigned partitions
waitForAssignment(container)

when:
String message = "Testing without headers"
kafkaTemplate.send(SHARED_TOPIC, message)

then:
// check that the message was received
def received = records.poll(5, TimeUnit.SECONDS)

received.headers().iterator().hasNext() == propagationEnabled

cleanup:
stopProducerFactory(producerFactory)
container?.stop()
}
}
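For reference, a concrete test class built on this latest-dep base only needs to supply containerProperties(). A minimal sketch follows (the class name is hypothetical; against spring-kafka 2.x, ContainerProperties lives in org.springframework.kafka.listener, so no reflective lookup is needed):

import org.springframework.kafka.listener.ContainerProperties

// Hypothetical subclass, shown for illustration only.
class KafkaClientLatestDepSmokeTest extends KafkaClientBaseTest {
  @Override
  def containerProperties() {
    // listen on the shared topic created by the embedded broker rule
    return new ContainerProperties(SHARED_TOPIC)
  }
}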