Skip to content

Commit

Permalink
Second attempt at fixing serializing kafka configuration (#7789)
Browse files Browse the repository at this point in the history
Hopefully resolves
#7597
Without reproducing the issue it is hard to tell whether this will help.
Another issue that could arise is that we add our metrics class in
`metric.reporters` property which will probably break if this
configuration is used to build consumer or producer after deserializing
as our classes don't seem to be available there. If this fails we'll
need to ask the issue reporter for instructions how to reproduce and
find a different strategy for fixing this.
  • Loading branch information
laurit authored Feb 13, 2023
1 parent 54d7241 commit 39e7ed4
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package io.opentelemetry.instrumentation.kafka.internal;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import io.opentelemetry.instrumentation.kafkaclients.KafkaTelemetry;
Expand All @@ -13,8 +14,10 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
Expand Down Expand Up @@ -126,15 +129,39 @@ void serializableConfig() throws IOException, ClassNotFoundException {
}

@SuppressWarnings("unchecked")
private static Map<String, Object> testSerialize(Map<String, Object> map)
private static void testSerialize(Map<String, Object> map)
throws IOException, ClassNotFoundException {
OpenTelemetrySupplier supplier =
(OpenTelemetrySupplier)
map.get(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER);
assertThat(supplier).isNotNull();
assertThat(supplier.get()).isNotNull();
ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
try (ObjectOutputStream outputStream = new ObjectOutputStream(byteOutputStream)) {
outputStream.writeObject(map);
}

class CustomObjectInputStream extends ObjectInputStream {
CustomObjectInputStream(InputStream inputStream) throws IOException {
super(inputStream);
}

@Override
protected Class<?> resolveClass(ObjectStreamClass desc)
throws IOException, ClassNotFoundException {
if (desc.getName().startsWith("io.opentelemetry.")) {
throw new IllegalStateException(
"Serial form contains opentelemetry class " + desc.getName());
}
return super.resolveClass(desc);
}
}

try (ObjectInputStream inputStream =
new ObjectInputStream(new ByteArrayInputStream(byteOutputStream.toByteArray()))) {
return (Map<String, Object>) inputStream.readObject();
new CustomObjectInputStream(new ByteArrayInputStream(byteOutputStream.toByteArray()))) {
Map<String, Object> result = (Map<String, Object>) inputStream.readObject();
assertThat(result.get(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER))
.isNull();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,9 @@ public OpenTelemetrySupplier(OpenTelemetry openTelemetry) {
public OpenTelemetry get() {
return openTelemetry;
}

private Object writeReplace() {
// serialize this object to null
return null;
}
}

0 comments on commit 39e7ed4

Please sign in to comment.