Skip to content

Commit

Permalink
Ensure kafka configuration remains serializable (#7754)
Browse files Browse the repository at this point in the history
Resolves
#7597
I wasn't able to reproduce this. Figuring out how to run beam, flink and
kafka together feels like too much effort. Without reproducing it is too
hard to tell why the configuration is serialized, but my hunch is that
it is enough to ensure that the configuration can be serialized.
  • Loading branch information
laurit authored Feb 9, 2023
1 parent 1d3752f commit 5d18255
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory;
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter;
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetrySupplier;
import io.opentelemetry.javaagent.bootstrap.internal.DeprecatedConfigProperties;
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
Expand Down Expand Up @@ -75,7 +76,8 @@ public static void enhanceConfig(Map<? super String, Object> config) {
OpenTelemetryMetricsReporter.class.getName(),
(class1, class2) -> class1 + "," + class2);
config.put(
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE, GlobalOpenTelemetry.get());
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER,
new OpenTelemetrySupplier(GlobalOpenTelemetry.get()));
config.put(
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME,
INSTRUMENTATION_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerRecordGetter;
import io.opentelemetry.instrumentation.kafka.internal.KafkaHeadersSetter;
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter;
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetrySupplier;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.Collections;
Expand Down Expand Up @@ -162,7 +163,9 @@ public <K, V> Consumer<K, V> wrap(Consumer<K, V> consumer) {
config.put(
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
OpenTelemetryMetricsReporter.class.getName());
config.put(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE, openTelemetry);
config.put(
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER,
new OpenTelemetrySupplier(openTelemetry));
config.put(
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME,
KafkaTelemetryBuilder.INSTRUMENTATION_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
import io.opentelemetry.instrumentation.kafkaclients.KafkaTelemetry;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
Expand Down Expand Up @@ -37,21 +42,21 @@ void badConfig() {
assertThatThrownBy(
() -> {
Map<String, Object> producerConfig = producerConfig();
producerConfig.remove(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE);
producerConfig.remove(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER);
new KafkaProducer<>(producerConfig).close();
})
.hasRootCauseInstanceOf(IllegalStateException.class)
.hasRootCauseMessage("Missing required configuration property: opentelemetry.instance");
.hasRootCauseMessage("Missing required configuration property: opentelemetry.supplier");
assertThatThrownBy(
() -> {
Map<String, Object> producerConfig = producerConfig();
producerConfig.put(
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE, "foo");
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER, "foo");
new KafkaProducer<>(producerConfig).close();
})
.hasRootCauseInstanceOf(IllegalStateException.class)
.hasRootCauseMessage(
"Configuration property opentelemetry.instance is not instance of OpenTelemetry");
"Configuration property opentelemetry.supplier is not instance of OpenTelemetrySupplier");
assertThatThrownBy(
() -> {
Map<String, Object> producerConfig = producerConfig();
Expand All @@ -77,21 +82,21 @@ void badConfig() {
assertThatThrownBy(
() -> {
Map<String, Object> consumerConfig = consumerConfig();
consumerConfig.remove(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE);
consumerConfig.remove(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER);
new KafkaConsumer<>(consumerConfig).close();
})
.hasRootCauseInstanceOf(IllegalStateException.class)
.hasRootCauseMessage("Missing required configuration property: opentelemetry.instance");
.hasRootCauseMessage("Missing required configuration property: opentelemetry.supplier");
assertThatThrownBy(
() -> {
Map<String, Object> consumerConfig = consumerConfig();
consumerConfig.put(
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE, "foo");
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER, "foo");
new KafkaConsumer<>(consumerConfig).close();
})
.hasRootCauseInstanceOf(IllegalStateException.class)
.hasRootCauseMessage(
"Configuration property opentelemetry.instance is not instance of OpenTelemetry");
"Configuration property opentelemetry.supplier is not instance of OpenTelemetrySupplier");
assertThatThrownBy(
() -> {
Map<String, Object> consumerConfig = consumerConfig();
Expand All @@ -113,4 +118,23 @@ void badConfig() {
.hasRootCauseMessage(
"Configuration property opentelemetry.instrumentation_name is not instance of String");
}

@Test
void serializableConfig() throws IOException, ClassNotFoundException {
testSerialize(producerConfig());
testSerialize(consumerConfig());
}

@SuppressWarnings("unchecked")
private static Map<String, Object> testSerialize(Map<String, Object> map)
throws IOException, ClassNotFoundException {
ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
try (ObjectOutputStream outputStream = new ObjectOutputStream(byteOutputStream)) {
outputStream.writeObject(map);
}
try (ObjectInputStream inputStream =
new ObjectInputStream(new ByteArrayInputStream(byteOutputStream.toByteArray()))) {
return (Map<String, Object>) inputStream.readObject();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
*/
public final class OpenTelemetryMetricsReporter implements MetricsReporter {

public static final String CONFIG_KEY_OPENTELEMETRY_INSTANCE = "opentelemetry.instance";
public static final String CONFIG_KEY_OPENTELEMETRY_SUPPLIER = "opentelemetry.supplier";
public static final String CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME =
"opentelemetry.instrumentation_name";

Expand Down Expand Up @@ -150,8 +150,9 @@ private static void closeInstrument(AutoCloseable observable) {

@Override
public void configure(Map<String, ?> configs) {
OpenTelemetry openTelemetry =
getProperty(configs, CONFIG_KEY_OPENTELEMETRY_INSTANCE, OpenTelemetry.class);
OpenTelemetrySupplier openTelemetrySupplier =
getProperty(configs, CONFIG_KEY_OPENTELEMETRY_SUPPLIER, OpenTelemetrySupplier.class);
OpenTelemetry openTelemetry = openTelemetrySupplier.get();
String instrumentationName =
getProperty(configs, CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME, String.class);
String instrumentationVersion =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.kafka.internal;

import io.opentelemetry.api.OpenTelemetry;
import java.io.Serializable;
import java.util.Objects;
import java.util.function.Supplier;

/**
* Wrapper for OpenTelemetry that can be injected into kafka configuration without breaking
* serialization. https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/7597
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class OpenTelemetrySupplier implements Supplier<OpenTelemetry>, Serializable {
private static final long serialVersionUID = 1L;
private final transient OpenTelemetry openTelemetry;

public OpenTelemetrySupplier(OpenTelemetry openTelemetry) {
Objects.requireNonNull(openTelemetry);
this.openTelemetry = openTelemetry;
}

@Override
public OpenTelemetry get() {
return openTelemetry;
}
}

0 comments on commit 5d18255

Please sign in to comment.