Skip to content

Commit

Permalink
Fix instrumentation for reactor kafka 1.3.21 (#9445)
Browse files Browse the repository at this point in the history
  • Loading branch information
laurit authored Sep 12, 2023
1 parent 7e16e40 commit f0533ae
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ dependencies {

testImplementation("com.fasterxml.jackson.core:jackson-databind:2.14.2")
testImplementation("org.testcontainers:elasticsearch")

latestDepTestLibrary("co.elastic.clients:elasticsearch-java:8.0.+")
}

tasks {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ dependencies {
implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-common:library"))
implementation(project(":instrumentation:reactor:reactor-3.1:library"))

// using 1.3.0 to be able to implement several new KafkaReceiver methods added in 1.3.3
// using 1.3 to be able to implement several new KafkaReceiver methods added in 1.3.3 and 1.3.21
// @NoMuzzle is used to ensure that this does not break muzzle checks
compileOnly("io.projectreactor.kafka:reactor-kafka:1.3.3")
compileOnly("io.projectreactor.kafka:reactor-kafka:1.3.21")

testInstrumentation(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:javaagent"))
testInstrumentation(project(":instrumentation:reactor:reactor-3.1:javaagent"))
Expand Down Expand Up @@ -60,6 +60,27 @@ testing {
}
}
}

val testV1_3_21 by registering(JvmTestSuite::class) {
dependencies {
implementation(project(":instrumentation:reactor:reactor-kafka-1.0:testing"))

if (testLatestDeps) {
implementation("io.projectreactor.kafka:reactor-kafka:+")
implementation("io.projectreactor:reactor-core:3.4.+")
} else {
implementation("io.projectreactor.kafka:reactor-kafka:1.3.21")
}
}

targets {
all {
testTask.configure {
systemProperty("hasConsumerGroupAndId", true)
}
}
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public InstrumentedKafkaReceiver(KafkaReceiver<K, V> actual) {
// added in 1.3.3
@Override
public Flux<ReceiverRecord<K, V>> receive(Integer prefetch) {
return wrap(KafkaReceiver133Access.receive(actual, prefetch));
return wrap(KafkaReceiver13Access.receive(actual, prefetch));
}

@Override
Expand All @@ -36,7 +36,7 @@ public Flux<ReceiverRecord<K, V>> receive() {
// added in 1.3.3
@Override
public Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck(Integer prefetch) {
return KafkaReceiver133Access.receiveAutoAck(actual, prefetch)
return KafkaReceiver13Access.receiveAutoAck(actual, prefetch)
.map(InstrumentedKafkaReceiver::wrap);
}

Expand All @@ -48,7 +48,7 @@ public Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck() {
// added in 1.3.3
@Override
public Flux<ConsumerRecord<K, V>> receiveAtmostOnce(Integer prefetch) {
return wrap(KafkaReceiver133Access.receiveAtmostOnce(actual, prefetch));
return wrap(KafkaReceiver13Access.receiveAtmostOnce(actual, prefetch));
}

@Override
Expand All @@ -66,7 +66,7 @@ public Flux<Flux<ConsumerRecord<K, V>>> receiveExactlyOnce(
@Override
public Flux<Flux<ConsumerRecord<K, V>>> receiveExactlyOnce(
TransactionManager transactionManager, Integer prefetch) {
return KafkaReceiver133Access.receiveExactlyOnce(actual, transactionManager, prefetch)
return KafkaReceiver13Access.receiveExactlyOnce(actual, transactionManager, prefetch)
.map(InstrumentedKafkaReceiver::wrap);
}

Expand All @@ -75,6 +75,19 @@ public <T> Mono<T> doOnConsumer(Function<Consumer<K, V>, ? extends T> function)
return actual.doOnConsumer(function);
}

// added in 1.3.21
@Override
public Flux<Flux<ReceiverRecord<K, V>>> receiveBatch(Integer prefetch) {
return KafkaReceiver13Access.receiveBatch(actual, prefetch)
.map(InstrumentedKafkaReceiver::wrap);
}

// added in 1.3.21
@Override
public Flux<Flux<ReceiverRecord<K, V>>> receiveBatch() {
return KafkaReceiver13Access.receiveBatch(actual).map(InstrumentedKafkaReceiver::wrap);
}

private static <K, V, R extends ConsumerRecord<K, V>> Flux<R> wrap(Flux<R> flux) {
return flux instanceof InstrumentedKafkaFlux ? flux : new InstrumentedKafkaFlux<>(flux);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import reactor.kafka.receiver.ReceiverRecord;
import reactor.kafka.sender.TransactionManager;

final class KafkaReceiver133Access {
final class KafkaReceiver13Access {

@NoMuzzle
static <K, V> Flux<ReceiverRecord<K, V>> receive(KafkaReceiver<K, V> receiver, Integer prefetch) {
Expand All @@ -37,5 +37,16 @@ static <K, V> Flux<Flux<ConsumerRecord<K, V>>> receiveExactlyOnce(
return receiver.receiveExactlyOnce(transactionManager, prefetch);
}

private KafkaReceiver133Access() {}
@NoMuzzle
static <K, V> Flux<Flux<ReceiverRecord<K, V>>> receiveBatch(
KafkaReceiver<K, V> receiver, Integer prefetch) {
return receiver.receiveBatch(prefetch);
}

@NoMuzzle
static <K, V> Flux<Flux<ReceiverRecord<K, V>>> receiveBatch(KafkaReceiver<K, V> receiver) {
return receiver.receiveBatch();
}

private KafkaReceiver13Access() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0;

import org.junit.jupiter.api.Test;

class ReactorKafka1321InstrumentationTest extends AbstractReactorKafkaTest {

@Test
void receiveBatch() {
testSingleRecordProcess(
recordConsumer ->
receiver
.receiveBatch()
.concatMap(r -> r)
.doOnNext(r -> r.receiverOffset().acknowledge())
.subscribe(recordConsumer));
}

@Test
void receiveBatchWithSize() {
testSingleRecordProcess(
recordConsumer ->
receiver
.receiveBatch(1)
.concatMap(r -> r)
.doOnNext(r -> r.receiverOffset().acknowledge())
.subscribe(recordConsumer));
}
}

0 comments on commit f0533ae

Please sign in to comment.