Skip to content

Commit

Permalink
Add Kafka instrumentation to the Spring Boot starter (#6371)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mateusz Rzeszutek authored Jul 28, 2022
1 parent 561ce5e commit 54b8b6a
Show file tree
Hide file tree
Showing 6 changed files with 241 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ dependencies {
implementation("javax.validation:validation-api:2.0.1.Final")

implementation(project(":instrumentation-annotations-support"))
implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-2.6:library"))
implementation(project(":instrumentation:spring:spring-kafka-2.7:library"))
implementation(project(":instrumentation:spring:spring-web-3.1:library"))
implementation(project(":instrumentation:spring:spring-webmvc-3.1:library"))
implementation(project(":instrumentation:spring:spring-webflux-5.0:library"))
Expand All @@ -22,6 +24,7 @@ dependencies {
exclude("io.micrometer", "micrometer-core")
}

compileOnly("org.springframework.kafka:spring-kafka:2.7.1")
compileOnly("org.springframework.boot:spring-boot-starter-actuator:$springBootVersion")
compileOnly("org.springframework.boot:spring-boot-starter-aop:$springBootVersion")
compileOnly("org.springframework.boot:spring-boot-starter-web:$springBootVersion")
Expand All @@ -38,13 +41,15 @@ dependencies {
compileOnly("io.opentelemetry:opentelemetry-exporter-zipkin")
compileOnly(project(":instrumentation-annotations"))

testImplementation("org.springframework.kafka:spring-kafka:2.7.1")
testImplementation("org.springframework.boot:spring-boot-starter-actuator:$springBootVersion")
testImplementation("org.springframework.boot:spring-boot-starter-aop:$springBootVersion")
testImplementation("org.springframework.boot:spring-boot-starter-webflux:$springBootVersion")
testImplementation("org.springframework.boot:spring-boot-starter-web:$springBootVersion")
testImplementation("org.springframework.boot:spring-boot-starter-test:$springBootVersion") {
exclude("org.junit.vintage", "junit-vintage-engine")
}
testImplementation("org.testcontainers:kafka")

testImplementation(project(":testing-common"))
testImplementation("io.opentelemetry:opentelemetry-sdk")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.spring.autoconfigure.kafka;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.spring.kafka.v2_7.SpringKafkaTelemetry;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

class ConcurrentKafkaListenerContainerFactoryPostProcessor implements BeanPostProcessor {

private final OpenTelemetry openTelemetry;

ConcurrentKafkaListenerContainerFactoryPostProcessor(OpenTelemetry openTelemetry) {
this.openTelemetry = openTelemetry;
}

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (!(bean instanceof ConcurrentKafkaListenerContainerFactory)) {
return bean;
}

ConcurrentKafkaListenerContainerFactory<?, ?> listenerContainerFactory =
(ConcurrentKafkaListenerContainerFactory<?, ?>) bean;
SpringKafkaTelemetry springKafkaTelemetry = SpringKafkaTelemetry.create(openTelemetry);
listenerContainerFactory.setBatchInterceptor(springKafkaTelemetry.createBatchInterceptor());
listenerContainerFactory.setRecordInterceptor(springKafkaTelemetry.createRecordInterceptor());

return listenerContainerFactory;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.spring.autoconfigure.kafka;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.kafkaclients.KafkaTelemetry;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaTemplate;

@Configuration
@EnableConfigurationProperties(KafkaInstrumentationProperties.class)
@ConditionalOnProperty(name = "otel.springboot.kafka.enabled", matchIfMissing = true)
@ConditionalOnClass({KafkaTemplate.class, ConcurrentKafkaListenerContainerFactory.class})
public class KafkaInstrumentationAutoConfiguration {

@Bean
public DefaultKafkaProducerFactoryCustomizer producerInstrumentation(
OpenTelemetry openTelemetry) {
KafkaTelemetry kafkaTelemetry = KafkaTelemetry.create(openTelemetry);
return producerFactory -> producerFactory.addPostProcessor(kafkaTelemetry::wrap);
}

@Bean
public ConcurrentKafkaListenerContainerFactoryPostProcessor
concurrentKafkaListenerContainerFactoryPostProcessor(OpenTelemetry openTelemetry) {
return new ConcurrentKafkaListenerContainerFactoryPostProcessor(openTelemetry);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.spring.autoconfigure.kafka;

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "otel.springboot.kafka")
public class KafkaInstrumentationProperties {

private boolean enabled = true;

public boolean isEnabled() {
return enabled;
}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ io.opentelemetry.instrumentation.spring.autoconfigure.exporters.otlp.OtlpSpanExp
io.opentelemetry.instrumentation.spring.autoconfigure.exporters.zipkin.ZipkinSpanExporterAutoConfiguration,\
io.opentelemetry.instrumentation.spring.autoconfigure.httpclients.resttemplate.RestTemplateAutoConfiguration,\
io.opentelemetry.instrumentation.spring.autoconfigure.httpclients.webclient.WebClientAutoConfiguration,\
io.opentelemetry.instrumentation.spring.autoconfigure.kafka.KafkaInstrumentationAutoConfiguration,\
io.opentelemetry.instrumentation.spring.autoconfigure.metrics.MicrometerShimAutoConfiguration,\
io.opentelemetry.instrumentation.spring.autoconfigure.propagators.PropagationAutoConfiguration,\
io.opentelemetry.instrumentation.spring.autoconfigure.resources.OtelResourceAutoConfiguration,\
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.spring.autoconfigure.kafka;

import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.time.Duration;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.assertj.core.api.AbstractLongAssert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerImageName;

class KafkaIntegrationTest {

@RegisterExtension
static final LibraryInstrumentationExtension testing = LibraryInstrumentationExtension.create();

static KafkaContainer kafka;

private ApplicationContextRunner contextRunner;

@BeforeAll
static void setUpKafka() {
kafka =
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"))
.waitingFor(Wait.forLogMessage(".*started \\(kafka.server.KafkaServer\\).*", 1))
.withStartupTimeout(Duration.ofMinutes(1));
kafka.start();
}

@AfterAll
static void tearDownKafka() {
kafka.stop();
}

@BeforeEach
void setUpContext() {
contextRunner =
new ApplicationContextRunner()
.withConfiguration(
AutoConfigurations.of(
KafkaAutoConfiguration.class,
KafkaInstrumentationAutoConfiguration.class,
TestConfig.class))
.withBean("openTelemetry", OpenTelemetry.class, testing::getOpenTelemetry)
.withPropertyValues(
"spring.kafka.bootstrap-servers=" + kafka.getBootstrapServers(),
"spring.kafka.consumer.auto-offset-reset=earliest",
"spring.kafka.consumer.linger-ms=10",
"spring.kafka.listener.idle-between-polls=1000",
"spring.kafka.producer.transaction-id-prefix=test-");
}

@Test
void shouldInstrumentProducerAndConsumer() {
contextRunner.run(KafkaIntegrationTest::runShouldInstrumentProducerAndConsumer);
}

@SuppressWarnings("unchecked")
private static void runShouldInstrumentProducerAndConsumer(
ConfigurableApplicationContext applicationContext) {
KafkaTemplate<String, String> kafkaTemplate = applicationContext.getBean(KafkaTemplate.class);

testing.runWithSpan(
"producer",
() -> {
kafkaTemplate.executeInTransaction(
ops -> {
ops.send("testTopic", "10", "testSpan");
return 0;
});
});

testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("producer"),
span ->
span.hasName("testTopic send")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testTopic"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")),
span ->
span.hasName("testTopic process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(1))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testTopic"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES,
AbstractLongAssert::isNotNegative),
satisfies(
SemanticAttributes.MESSAGING_KAFKA_PARTITION,
AbstractLongAssert::isNotNegative)),
span -> span.hasName("consumer").hasParent(trace.getSpan(2))));
}

@Configuration
static class TestConfig {

@Bean
public NewTopic testTopic() {
return TopicBuilder.name("testTopic").partitions(1).replicas(1).build();
}

@KafkaListener(id = "testListener", topics = "testTopic")
public void listener(ConsumerRecord<String, String> record) {
testing.runWithSpan("consumer", () -> {});
}
}
}

0 comments on commit 54b8b6a

Please sign in to comment.