From 5d182557a164d3adce3c6683a728dd8575c7261e Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Thu, 9 Feb 2023 08:45:39 +0200 Subject: [PATCH] Ensure kafka configuration remains serializable (#7754) Resolves https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/7597 I wasn't able to reproduce this. Figuring out how to run beam, flink and kafka together feels like too much effort. Without reproducing it is too hard to tell why the configuration is serialized, but my hunch is that it is enough to ensure that the configuration can be serialized. --- .../kafkaclients/KafkaSingletons.java | 4 +- .../kafkaclients/KafkaTelemetry.java | 5 ++- .../OpenTelemetryMetricsReporterTest.java | 40 +++++++++++++++---- .../OpenTelemetryMetricsReporter.java | 7 ++-- .../kafka/internal/OpenTelemetrySupplier.java | 33 +++++++++++++++ 5 files changed, 76 insertions(+), 13 deletions(-) create mode 100644 instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetrySupplier.java diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java index 5a72bca9dee9..129b10886359 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java @@ -9,6 +9,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory; import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter; +import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetrySupplier; import io.opentelemetry.javaagent.bootstrap.internal.DeprecatedConfigProperties; import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig; import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig; @@ -75,7 +76,8 @@ public static void enhanceConfig(Map config) { OpenTelemetryMetricsReporter.class.getName(), (class1, class2) -> class1 + "," + class2); config.put( - OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE, GlobalOpenTelemetry.get()); + OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER, + new OpenTelemetrySupplier(GlobalOpenTelemetry.get())); config.put( OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME, INSTRUMENTATION_NAME); diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTelemetry.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTelemetry.java index 5531d1b145c6..6c2a8e093348 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTelemetry.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTelemetry.java @@ -18,6 +18,7 @@ import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerRecordGetter; import io.opentelemetry.instrumentation.kafka.internal.KafkaHeadersSetter; import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter; +import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetrySupplier; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Proxy; import java.util.Collections; @@ -162,7 +163,9 @@ public Consumer wrap(Consumer consumer) { config.put( CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, OpenTelemetryMetricsReporter.class.getName()); - config.put(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE, openTelemetry); + config.put( + OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER, + new OpenTelemetrySupplier(openTelemetry)); config.put( OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME, KafkaTelemetryBuilder.INSTRUMENTATION_NAME); diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporterTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporterTest.java index 578ab3824d75..244a6426aaaf 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporterTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporterTest.java @@ -10,6 +10,11 @@ import io.opentelemetry.instrumentation.kafkaclients.KafkaTelemetry; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.util.Map; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; @@ -37,21 +42,21 @@ void badConfig() { assertThatThrownBy( () -> { Map producerConfig = producerConfig(); - producerConfig.remove(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE); + producerConfig.remove(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER); new KafkaProducer<>(producerConfig).close(); }) .hasRootCauseInstanceOf(IllegalStateException.class) - .hasRootCauseMessage("Missing required configuration property: opentelemetry.instance"); + .hasRootCauseMessage("Missing required configuration property: opentelemetry.supplier"); assertThatThrownBy( () -> { Map producerConfig = producerConfig(); producerConfig.put( - OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE, "foo"); + OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER, "foo"); new KafkaProducer<>(producerConfig).close(); }) .hasRootCauseInstanceOf(IllegalStateException.class) .hasRootCauseMessage( - "Configuration property opentelemetry.instance is not instance of OpenTelemetry"); + "Configuration property opentelemetry.supplier is not instance of OpenTelemetrySupplier"); assertThatThrownBy( () -> { Map producerConfig = producerConfig(); @@ -77,21 +82,21 @@ void badConfig() { assertThatThrownBy( () -> { Map consumerConfig = consumerConfig(); - consumerConfig.remove(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE); + consumerConfig.remove(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER); new KafkaConsumer<>(consumerConfig).close(); }) .hasRootCauseInstanceOf(IllegalStateException.class) - .hasRootCauseMessage("Missing required configuration property: opentelemetry.instance"); + .hasRootCauseMessage("Missing required configuration property: opentelemetry.supplier"); assertThatThrownBy( () -> { Map consumerConfig = consumerConfig(); consumerConfig.put( - OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE, "foo"); + OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER, "foo"); new KafkaConsumer<>(consumerConfig).close(); }) .hasRootCauseInstanceOf(IllegalStateException.class) .hasRootCauseMessage( - "Configuration property opentelemetry.instance is not instance of OpenTelemetry"); + "Configuration property opentelemetry.supplier is not instance of OpenTelemetrySupplier"); assertThatThrownBy( () -> { Map consumerConfig = consumerConfig(); @@ -113,4 +118,23 @@ void badConfig() { .hasRootCauseMessage( "Configuration property opentelemetry.instrumentation_name is not instance of String"); } + + @Test + void serializableConfig() throws IOException, ClassNotFoundException { + testSerialize(producerConfig()); + testSerialize(consumerConfig()); + } + + @SuppressWarnings("unchecked") + private static Map testSerialize(Map map) + throws IOException, ClassNotFoundException { + ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream(); + try (ObjectOutputStream outputStream = new ObjectOutputStream(byteOutputStream)) { + outputStream.writeObject(map); + } + try (ObjectInputStream inputStream = + new ObjectInputStream(new ByteArrayInputStream(byteOutputStream.toByteArray()))) { + return (Map) inputStream.readObject(); + } + } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporter.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporter.java index baf6e11c1960..1c98642c3a4b 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporter.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporter.java @@ -35,7 +35,7 @@ */ public final class OpenTelemetryMetricsReporter implements MetricsReporter { - public static final String CONFIG_KEY_OPENTELEMETRY_INSTANCE = "opentelemetry.instance"; + public static final String CONFIG_KEY_OPENTELEMETRY_SUPPLIER = "opentelemetry.supplier"; public static final String CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME = "opentelemetry.instrumentation_name"; @@ -150,8 +150,9 @@ private static void closeInstrument(AutoCloseable observable) { @Override public void configure(Map configs) { - OpenTelemetry openTelemetry = - getProperty(configs, CONFIG_KEY_OPENTELEMETRY_INSTANCE, OpenTelemetry.class); + OpenTelemetrySupplier openTelemetrySupplier = + getProperty(configs, CONFIG_KEY_OPENTELEMETRY_SUPPLIER, OpenTelemetrySupplier.class); + OpenTelemetry openTelemetry = openTelemetrySupplier.get(); String instrumentationName = getProperty(configs, CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME, String.class); String instrumentationVersion = diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetrySupplier.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetrySupplier.java new file mode 100644 index 000000000000..b3809370f902 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetrySupplier.java @@ -0,0 +1,33 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafka.internal; + +import io.opentelemetry.api.OpenTelemetry; +import java.io.Serializable; +import java.util.Objects; +import java.util.function.Supplier; + +/** + * Wrapper for OpenTelemetry that can be injected into kafka configuration without breaking + * serialization. https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/7597 + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public final class OpenTelemetrySupplier implements Supplier, Serializable { + private static final long serialVersionUID = 1L; + private final transient OpenTelemetry openTelemetry; + + public OpenTelemetrySupplier(OpenTelemetry openTelemetry) { + Objects.requireNonNull(openTelemetry); + this.openTelemetry = openTelemetry; + } + + @Override + public OpenTelemetry get() { + return openTelemetry; + } +}