diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/InstrumentedKafkaReceiver.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/InstrumentedKafkaReceiver.java index a8342732177d..1ecb17c1ff77 100644 --- a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/InstrumentedKafkaReceiver.java +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/InstrumentedKafkaReceiver.java @@ -59,7 +59,7 @@ public Flux> receiveAtmostOnce() { @Override public Flux>> receiveExactlyOnce( TransactionManager transactionManager) { - return actual.receiveAutoAck().map(InstrumentedKafkaReceiver::wrap); + return actual.receiveExactlyOnce(transactionManager).map(InstrumentedKafkaReceiver::wrap); } // added in 1.3.3