diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTelemetry.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTelemetry.java index 62cf703a80aa..5230b4782171 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTelemetry.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTelemetry.java @@ -18,6 +18,7 @@ import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerRecordGetter; import io.opentelemetry.instrumentation.kafka.internal.KafkaHeadersSetter; import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter; +import java.lang.reflect.Proxy; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -75,13 +76,46 @@ private TextMapPropagator propagator() { } /** Returns a decorated {@link Producer} that emits spans for each sent message. */ + @SuppressWarnings("unchecked") public Producer wrap(Producer producer) { - return new TracingProducer<>(producer, this); + return (Producer) + Proxy.newProxyInstance( + KafkaTelemetry.class.getClassLoader(), + new Class[] {Producer.class}, + (proxy, method, args) -> { + // Future send(ProducerRecord record) + // Future send(ProducerRecord record, Callback callback) + if ("send".equals(method.getName()) + && method.getParameterCount() >= 1 + && method.getParameterTypes()[0] == ProducerRecord.class) { + ProducerRecord record = (ProducerRecord) args[0]; + Callback callback = + method.getParameterCount() >= 2 + && method.getParameterTypes()[1] == Callback.class + ? (Callback) args[1] + : null; + return buildAndInjectSpan(record, callback, producer::send); + } + return method.invoke(producer, args); + }); } /** Returns a decorated {@link Consumer} that consumes spans for each received message. */ + @SuppressWarnings("unchecked") public Consumer wrap(Consumer consumer) { - return new TracingConsumer<>(consumer, this); + return (Consumer) + Proxy.newProxyInstance( + KafkaTelemetry.class.getClassLoader(), + new Class[] {Consumer.class}, + (proxy, method, args) -> { + Object result = method.invoke(consumer, args); + // ConsumerRecords poll(long timeout) + // ConsumerRecords poll(Duration duration) + if ("poll".equals(method.getName()) && result instanceof ConsumerRecords) { + buildAndFinishSpan((ConsumerRecords) result); + } + return result; + }); } /** diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/TracingConsumer.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/TracingConsumer.java deleted file mode 100644 index 857f438d60b3..000000000000 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/TracingConsumer.java +++ /dev/null @@ -1,282 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.kafkaclients; - -import java.time.Duration; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.regex.Pattern; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.consumer.OffsetAndTimestamp; -import org.apache.kafka.clients.consumer.OffsetCommitCallback; -import org.apache.kafka.common.Metric; -import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; - -class TracingConsumer implements Consumer { - private final Consumer consumer; - private final KafkaTelemetry telemetry; - - TracingConsumer(Consumer consumer, KafkaTelemetry telemetry) { - this.consumer = consumer; - this.telemetry = telemetry; - } - - @Override - public Set assignment() { - return consumer.assignment(); - } - - @Override - public Set subscription() { - return consumer.subscription(); - } - - @Override - public void subscribe(Collection topics, ConsumerRebalanceListener listener) { - consumer.subscribe(topics, listener); - } - - @Override - public void subscribe(Collection topics) { - consumer.subscribe(topics); - } - - @Override - public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { - consumer.subscribe(pattern, listener); - } - - @Override - public void subscribe(Pattern pattern) { - consumer.subscribe(pattern); - } - - @Override - public void unsubscribe() { - consumer.unsubscribe(); - } - - @Override - public void assign(Collection partitions) { - consumer.assign(partitions); - } - - @Override - @Deprecated - public ConsumerRecords poll(long timeout) { - return poll(Duration.ofMillis(timeout)); - } - - @Override - public ConsumerRecords poll(Duration duration) { - ConsumerRecords records = consumer.poll(duration); - telemetry.buildAndFinishSpan(records); - return records; - } - - @Override - public void commitSync() { - consumer.commitSync(); - } - - @Override - public void commitSync(Duration duration) { - consumer.commitSync(duration); - } - - @Override - public void commitSync(Map offsets) { - consumer.commitSync(offsets); - } - - @Override - public void commitSync(Map map, Duration duration) { - consumer.commitSync(map, duration); - } - - @Override - public void commitAsync() { - consumer.commitAsync(); - } - - @Override - public void commitAsync(OffsetCommitCallback callback) { - consumer.commitAsync(callback); - } - - @Override - public void commitAsync( - Map offsets, OffsetCommitCallback callback) { - consumer.commitAsync(offsets, callback); - } - - @Override - public void seek(TopicPartition partition, long offset) { - consumer.seek(partition, offset); - } - - @Override - public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata) { - consumer.seek(partition, offsetAndMetadata); - } - - @Override - public void seekToBeginning(Collection partitions) { - consumer.seekToBeginning(partitions); - } - - @Override - public void seekToEnd(Collection partitions) { - consumer.seekToEnd(partitions); - } - - @Override - public long position(TopicPartition partition) { - return consumer.position(partition); - } - - @Override - public long position(TopicPartition topicPartition, Duration duration) { - return consumer.position(topicPartition, duration); - } - - @Override - @Deprecated - public OffsetAndMetadata committed(TopicPartition partition) { - return consumer.committed(partition); - } - - @Override - @Deprecated - public OffsetAndMetadata committed(TopicPartition topicPartition, Duration duration) { - return consumer.committed(topicPartition, duration); - } - - @Override - public Map committed(Set partitions) { - return consumer.committed(partitions); - } - - @Override - public Map committed( - Set partitions, Duration timeout) { - return consumer.committed(partitions, timeout); - } - - @Override - public Map metrics() { - return consumer.metrics(); - } - - @Override - public List partitionsFor(String topic) { - return consumer.partitionsFor(topic); - } - - @Override - public List partitionsFor(String s, Duration duration) { - return consumer.partitionsFor(s, duration); - } - - @Override - public Map> listTopics() { - return consumer.listTopics(); - } - - @Override - public Map> listTopics(Duration duration) { - return consumer.listTopics(duration); - } - - @Override - public void pause(Collection partitions) { - consumer.pause(partitions); - } - - @Override - public void resume(Collection partitions) { - consumer.resume(partitions); - } - - @Override - public Set paused() { - return consumer.paused(); - } - - @Override - public Map offsetsForTimes( - Map timestampsToSearch) { - return consumer.offsetsForTimes(timestampsToSearch); - } - - @Override - public Map offsetsForTimes( - Map map, Duration duration) { - return consumer.offsetsForTimes(map, duration); - } - - @Override - public Map beginningOffsets(Collection partitions) { - return consumer.beginningOffsets(partitions); - } - - @Override - public Map beginningOffsets( - Collection collection, Duration duration) { - return consumer.beginningOffsets(collection, duration); - } - - @Override - public Map endOffsets(Collection partitions) { - return consumer.endOffsets(partitions); - } - - @Override - public Map endOffsets( - Collection collection, Duration duration) { - return consumer.endOffsets(collection, duration); - } - - @Override - public ConsumerGroupMetadata groupMetadata() { - return consumer.groupMetadata(); - } - - @Override - public void enforceRebalance() { - consumer.enforceRebalance(); - } - - @Override - public void close() { - consumer.close(); - } - - @Override - @Deprecated - public void close(long l, TimeUnit timeUnit) { - consumer.close(l, timeUnit); - } - - @Override - public void close(Duration duration) { - consumer.close(duration); - } - - @Override - public void wakeup() { - consumer.wakeup(); - } -} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/TracingProducer.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/TracingProducer.java deleted file mode 100644 index f6c6551b01b1..000000000000 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/TracingProducer.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.kafkaclients; - -import java.time.Duration; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Future; -import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.producer.Callback; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.Metric; -import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; - -class TracingProducer implements Producer { - private final Producer producer; - private final KafkaTelemetry telemetry; - - TracingProducer(Producer producer, KafkaTelemetry telemetry) { - this.producer = producer; - this.telemetry = telemetry; - } - - @Override - public void initTransactions() { - producer.initTransactions(); - } - - @Override - public void beginTransaction() { - producer.beginTransaction(); - } - - @Override - public void sendOffsetsToTransaction( - Map offsets, String consumerGroupId) { - producer.sendOffsetsToTransaction(offsets, consumerGroupId); - } - - @Override - public void sendOffsetsToTransaction( - Map offsets, ConsumerGroupMetadata groupMetadata) { - producer.sendOffsetsToTransaction(offsets, groupMetadata); - } - - @Override - public void commitTransaction() { - producer.commitTransaction(); - } - - @Override - public void abortTransaction() { - producer.abortTransaction(); - } - - @Override - public Future send(ProducerRecord record) { - return send(record, null); - } - - @Override - public Future send(ProducerRecord record, Callback callback) { - return telemetry.buildAndInjectSpan(record, callback, producer::send); - } - - @Override - public void flush() { - producer.flush(); - } - - @Override - public List partitionsFor(String topic) { - return producer.partitionsFor(topic); - } - - @Override - public Map metrics() { - return producer.metrics(); - } - - @Override - public void close() { - producer.close(); - } - - @Override - public void close(Duration duration) { - producer.close(duration); - } -}