diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java index 5a72bca9dee9..129b10886359 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java @@ -9,6 +9,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory; import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter; +import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetrySupplier; import io.opentelemetry.javaagent.bootstrap.internal.DeprecatedConfigProperties; import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig; import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig; @@ -75,7 +76,8 @@ public static void enhanceConfig(Map config) { OpenTelemetryMetricsReporter.class.getName(), (class1, class2) -> class1 + "," + class2); config.put( - OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE, GlobalOpenTelemetry.get()); + OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER, + new OpenTelemetrySupplier(GlobalOpenTelemetry.get())); config.put( OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME, INSTRUMENTATION_NAME); diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTelemetry.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTelemetry.java index 5531d1b145c6..6c2a8e093348 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTelemetry.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTelemetry.java @@ -18,6 +18,7 @@ import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerRecordGetter; import io.opentelemetry.instrumentation.kafka.internal.KafkaHeadersSetter; import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter; +import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetrySupplier; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Proxy; import java.util.Collections; @@ -162,7 +163,9 @@ public Consumer wrap(Consumer consumer) { config.put( CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, OpenTelemetryMetricsReporter.class.getName()); - config.put(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE, openTelemetry); + config.put( + OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER, + new OpenTelemetrySupplier(openTelemetry)); config.put( OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME, KafkaTelemetryBuilder.INSTRUMENTATION_NAME); diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporterTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporterTest.java index 578ab3824d75..244a6426aaaf 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporterTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporterTest.java @@ -10,6 +10,11 @@ import io.opentelemetry.instrumentation.kafkaclients.KafkaTelemetry; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.util.Map; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; @@ -37,21 +42,21 @@ void badConfig() { assertThatThrownBy( () -> { Map producerConfig = producerConfig(); - producerConfig.remove(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE); + producerConfig.remove(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER); new KafkaProducer<>(producerConfig).close(); }) .hasRootCauseInstanceOf(IllegalStateException.class) - .hasRootCauseMessage("Missing required configuration property: opentelemetry.instance"); + .hasRootCauseMessage("Missing required configuration property: opentelemetry.supplier"); assertThatThrownBy( () -> { Map producerConfig = producerConfig(); producerConfig.put( - OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE, "foo"); + OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER, "foo"); new KafkaProducer<>(producerConfig).close(); }) .hasRootCauseInstanceOf(IllegalStateException.class) .hasRootCauseMessage( - "Configuration property opentelemetry.instance is not instance of OpenTelemetry"); + "Configuration property opentelemetry.supplier is not instance of OpenTelemetrySupplier"); assertThatThrownBy( () -> { Map producerConfig = producerConfig(); @@ -77,21 +82,21 @@ void badConfig() { assertThatThrownBy( () -> { Map consumerConfig = consumerConfig(); - consumerConfig.remove(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE); + consumerConfig.remove(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER); new KafkaConsumer<>(consumerConfig).close(); }) .hasRootCauseInstanceOf(IllegalStateException.class) - .hasRootCauseMessage("Missing required configuration property: opentelemetry.instance"); + .hasRootCauseMessage("Missing required configuration property: opentelemetry.supplier"); assertThatThrownBy( () -> { Map consumerConfig = consumerConfig(); consumerConfig.put( - OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE, "foo"); + OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER, "foo"); new KafkaConsumer<>(consumerConfig).close(); }) .hasRootCauseInstanceOf(IllegalStateException.class) .hasRootCauseMessage( - "Configuration property opentelemetry.instance is not instance of OpenTelemetry"); + "Configuration property opentelemetry.supplier is not instance of OpenTelemetrySupplier"); assertThatThrownBy( () -> { Map consumerConfig = consumerConfig(); @@ -113,4 +118,23 @@ void badConfig() { .hasRootCauseMessage( "Configuration property opentelemetry.instrumentation_name is not instance of String"); } + + @Test + void serializableConfig() throws IOException, ClassNotFoundException { + testSerialize(producerConfig()); + testSerialize(consumerConfig()); + } + + @SuppressWarnings("unchecked") + private static Map testSerialize(Map map) + throws IOException, ClassNotFoundException { + ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream(); + try (ObjectOutputStream outputStream = new ObjectOutputStream(byteOutputStream)) { + outputStream.writeObject(map); + } + try (ObjectInputStream inputStream = + new ObjectInputStream(new ByteArrayInputStream(byteOutputStream.toByteArray()))) { + return (Map) inputStream.readObject(); + } + } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporter.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporter.java index baf6e11c1960..1c98642c3a4b 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporter.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporter.java @@ -35,7 +35,7 @@ */ public final class OpenTelemetryMetricsReporter implements MetricsReporter { - public static final String CONFIG_KEY_OPENTELEMETRY_INSTANCE = "opentelemetry.instance"; + public static final String CONFIG_KEY_OPENTELEMETRY_SUPPLIER = "opentelemetry.supplier"; public static final String CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME = "opentelemetry.instrumentation_name"; @@ -150,8 +150,9 @@ private static void closeInstrument(AutoCloseable observable) { @Override public void configure(Map configs) { - OpenTelemetry openTelemetry = - getProperty(configs, CONFIG_KEY_OPENTELEMETRY_INSTANCE, OpenTelemetry.class); + OpenTelemetrySupplier openTelemetrySupplier = + getProperty(configs, CONFIG_KEY_OPENTELEMETRY_SUPPLIER, OpenTelemetrySupplier.class); + OpenTelemetry openTelemetry = openTelemetrySupplier.get(); String instrumentationName = getProperty(configs, CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME, String.class); String instrumentationVersion = diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetrySupplier.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetrySupplier.java new file mode 100644 index 000000000000..b3809370f902 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetrySupplier.java @@ -0,0 +1,33 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafka.internal; + +import io.opentelemetry.api.OpenTelemetry; +import java.io.Serializable; +import java.util.Objects; +import java.util.function.Supplier; + +/** + * Wrapper for OpenTelemetry that can be injected into kafka configuration without breaking + * serialization. https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/7597 + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public final class OpenTelemetrySupplier implements Supplier, Serializable { + private static final long serialVersionUID = 1L; + private final transient OpenTelemetry openTelemetry; + + public OpenTelemetrySupplier(OpenTelemetry openTelemetry) { + Objects.requireNonNull(openTelemetry); + this.openTelemetry = openTelemetry; + } + + @Override + public OpenTelemetry get() { + return openTelemetry; + } +}