From 30493b46d784dab68f1ca466713ec75963056f4d Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Fri, 15 Mar 2024 10:50:40 +0100 Subject: [PATCH] Retreive OpenTelemetry instance through CDI injection instead of relying on GlobalOpenTelemetry.get Still falls back to "GlobalOpenTelemetry.get" Fixes #2539 --- .../messaging/amqp/AmqpConnector.java | 9 ++- .../messaging/amqp/AmqpCreditBasedSender.java | 8 ++- .../AmqpOpenTelemetryInstrumenter.java | 16 +++-- .../messaging/kafka/KafkaConnector.java | 11 ++- .../messaging/kafka/impl/KafkaSink.java | 7 +- .../messaging/kafka/impl/KafkaSource.java | 4 +- .../kafka/reply/KafkaRequestReplyFactory.java | 6 +- .../kafka/reply/KafkaRequestReplyImpl.java | 4 +- .../KafkaOpenTelemetryInstrumenter.java | 16 +++-- .../messaging/kafka/KafkaSinkTest.java | 10 ++- .../messaging/kafka/KafkaSourceTest.java | 18 +++-- .../messaging/kafka/SourceCloseTest.java | 6 +- .../ce/KafkaSinkWithCloudEventsTest.java | 68 +++++++++---------- .../KafkaSourceBatchWithCloudEventsTest.java | 18 +++-- .../ce/KafkaSourceWithCloudEventsTest.java | 33 ++++++--- .../kafka/client/ClientTestBase.java | 19 +++--- .../kafka/client/HighLatencyTest.java | 3 +- ...fkaClientReactiveStreamsPublisherTest.java | 4 +- .../kafka/client/LazyInitializedTest.java | 8 ++- .../kafka/client/PauseResumeTest.java | 16 +++-- .../ReactiveKafkaBatchConsumerTest.java | 2 +- .../client/ReactiveKafkaConsumerTest.java | 5 +- .../client/ReactiveKafkaProducerTest.java | 12 ++-- .../commit/BatchCommitStrategiesTest.java | 16 +++-- .../kafka/commit/CommitStrategiesTest.java | 22 ++++-- .../DeprecatedCommitStrategiesTest.java | 22 ++++-- .../commit/FileCheckpointStateStoreTest.java | 2 + .../kafka/commit/KafkaCommitHandlerTest.java | 16 ++--- .../messaging/kafka/commit/RebalanceTest.java | 3 +- .../commit/RedisCheckpointStateStoreTest.java | 2 + .../KeyDeserializerConfigurationTest.java | 27 +++++--- .../serde/SerializerConfigurationTest.java | 10 +-- .../ValueDeserializerConfigurationTest.java | 33 ++++++--- .../messaging/tracing/TracingUtils.java | 11 +++ .../messaging/pulsar/PulsarConnector.java | 8 ++- .../pulsar/PulsarIncomingChannel.java | 38 ++++------- .../pulsar/PulsarOutgoingChannel.java | 39 ++++------- .../PulsarOpenTelemetryInstrumenter.java | 59 ++++++++++++++++ .../pulsar/IncomingMetadataTest.java | 3 +- .../pulsar/OutgoingMetadataTest.java | 8 ++- .../pulsar/PulsarIncomingChannelTest.java | 4 +- .../pulsar/PulsarOutgoingChannelTest.java | 7 +- .../pulsar/SubscriptionTypeTest.java | 4 +- .../messaging/rabbitmq/RabbitMQConnector.java | 8 ++- .../internals/IncomingRabbitMQChannel.java | 6 +- .../internals/OutgoingRabbitMQChannel.java | 8 ++- .../internals/RabbitMQMessageSender.java | 10 ++- .../RabbitMQOpenTelemetryInstrumenter.java | 17 ++--- 48 files changed, 438 insertions(+), 248 deletions(-) create mode 100644 smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/tracing/PulsarOpenTelemetryInstrumenter.java diff --git a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpConnector.java b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpConnector.java index da838351cd..32546bbdeb 100644 --- a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpConnector.java +++ b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpConnector.java @@ -33,6 +33,7 @@ import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.messaging.spi.Connector; +import io.opentelemetry.api.OpenTelemetry; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor; @@ -121,6 +122,9 @@ public class AmqpConnector implements InboundConnector, OutboundConnector, Healt @Any private Instance clientSslContexts; + @Inject + private Instance openTelemetryInstance; + private final List clients = new CopyOnWriteArrayList<>(); /** @@ -230,7 +234,7 @@ public Flow.Publisher> getPublisher(Config config) { AmqpFailureHandler onNack = createFailureHandler(ic); if (tracing && amqpInstrumenter == null) { - amqpInstrumenter = AmqpOpenTelemetryInstrumenter.createForConnector(); + amqpInstrumenter = AmqpOpenTelemetryInstrumenter.createForConnector(openTelemetryInstance); } Multi> multi = holder.getOrEstablishConnection() @@ -318,7 +322,8 @@ public Flow.Subscriber> getSubscriber(Config config) { this, holder, oc, - getSender); + getSender, + openTelemetryInstance); processors.put(oc.getChannel(), processor); return MultiUtils.via(processor, m -> m.onFailure().invoke(t -> { diff --git a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpCreditBasedSender.java b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpCreditBasedSender.java index cc766cae36..800e625597 100644 --- a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpCreditBasedSender.java +++ b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpCreditBasedSender.java @@ -13,8 +13,11 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import jakarta.enterprise.inject.Instance; + import org.eclipse.microprofile.reactive.messaging.Message; +import io.opentelemetry.api.OpenTelemetry; import io.smallrye.common.annotation.CheckReturnValue; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.helpers.Subscriptions; @@ -57,7 +60,8 @@ public class AmqpCreditBasedSender implements Processor, Message>, private volatile boolean creditRetrievalInProgress = false; public AmqpCreditBasedSender(AmqpConnector connector, ConnectionHolder holder, - AmqpConnectorOutgoingConfiguration configuration, Uni retrieveSender) { + AmqpConnectorOutgoingConfiguration configuration, Uni retrieveSender, + Instance openTelemetryInstance) { this.connector = connector; this.holder = holder; this.retrieveSender = retrieveSender; @@ -75,7 +79,7 @@ public AmqpCreditBasedSender(AmqpConnector connector, ConnectionHolder holder, this.retryInterval = configuration.getReconnectInterval(); if (tracingEnabled) { - amqpInstrumenter = AmqpOpenTelemetryInstrumenter.createForSender(); + amqpInstrumenter = AmqpOpenTelemetryInstrumenter.createForSender(openTelemetryInstance); } else { amqpInstrumenter = null; } diff --git a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/tracing/AmqpOpenTelemetryInstrumenter.java b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/tracing/AmqpOpenTelemetryInstrumenter.java index 2201be8a2d..49a1b1f542 100644 --- a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/tracing/AmqpOpenTelemetryInstrumenter.java +++ b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/tracing/AmqpOpenTelemetryInstrumenter.java @@ -1,8 +1,10 @@ package io.smallrye.reactive.messaging.amqp.tracing; +import jakarta.enterprise.inject.Instance; + import org.eclipse.microprofile.reactive.messaging.Message; -import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; @@ -19,20 +21,20 @@ private AmqpOpenTelemetryInstrumenter(Instrumenter, Void> instrum this.instrumenter = instrumenter; } - public static AmqpOpenTelemetryInstrumenter createForConnector() { - return create(false); + public static AmqpOpenTelemetryInstrumenter createForConnector(Instance openTelemetryInstance) { + return create(TracingUtils.getOpenTelemetry(openTelemetryInstance), false); } - public static AmqpOpenTelemetryInstrumenter createForSender() { - return create(true); + public static AmqpOpenTelemetryInstrumenter createForSender(Instance openTelemetryInstance) { + return create(TracingUtils.getOpenTelemetry(openTelemetryInstance), true); } - private static AmqpOpenTelemetryInstrumenter create(boolean sender) { + private static AmqpOpenTelemetryInstrumenter create(OpenTelemetry openTelemetry, boolean sender) { MessageOperation messageOperation = sender ? MessageOperation.PUBLISH : MessageOperation.RECEIVE; AmqpAttributesExtractor amqpAttributesExtractor = new AmqpAttributesExtractor(); MessagingAttributesGetter, Void> messagingAttributesGetter = amqpAttributesExtractor .getMessagingAttributesGetter(); - InstrumenterBuilder, Void> builder = Instrumenter.builder(GlobalOpenTelemetry.get(), + InstrumenterBuilder, Void> builder = Instrumenter.builder(openTelemetry, "io.smallrye.reactive.messaging", MessagingSpanNameExtractor.create(messagingAttributesGetter, messageOperation)); diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConnector.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConnector.java index b42e9e7221..223e9284a2 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConnector.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConnector.java @@ -24,6 +24,7 @@ import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.messaging.spi.Connector; +import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.trace.SpanBuilder; import io.opentelemetry.api.trace.Tracer; import io.smallrye.mutiny.Multi; @@ -161,6 +162,9 @@ public SpanBuilder spanBuilder(final String spanName) { @Any Instance failureHandlerFactories; + @Inject + Instance openTelemetryInstance; + @Inject KafkaCDIEvents kafkaCDIEvents; @@ -209,7 +213,7 @@ public Flow.Publisher> getPublisher(Config config) { }); if (partitions == 1) { - KafkaSource source = new KafkaSource<>(vertx, group, ic, + KafkaSource source = new KafkaSource<>(vertx, group, ic, openTelemetryInstance, commitHandlerFactories, failureHandlerFactories, consumerRebalanceListeners, kafkaCDIEvents, deserializationFailureHandlers, -1); @@ -231,7 +235,7 @@ public Flow.Publisher> getPublisher(Config config) { // create an instance of source per partitions. List>> streams = new ArrayList<>(); for (int i = 0; i < partitions; i++) { - KafkaSource source = new KafkaSource<>(vertx, group, ic, + KafkaSource source = new KafkaSource<>(vertx, group, ic, openTelemetryInstance, commitHandlerFactories, failureHandlerFactories, consumerRebalanceListeners, kafkaCDIEvents, deserializationFailureHandlers, i); @@ -268,7 +272,8 @@ public Flow.Subscriber> getSubscriber(Config config) { if (oc.getHealthReadinessTimeout().isPresent()) { log.deprecatedConfig("health-readiness-timeout", "health-topic-verification-timeout"); } - KafkaSink sink = new KafkaSink(oc, kafkaCDIEvents, serializationFailureHandlers, producerInterceptors); + KafkaSink sink = new KafkaSink(oc, kafkaCDIEvents, openTelemetryInstance, + serializationFailureHandlers, producerInterceptors); sinks.add(sink); return sink.getSink(); } diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSink.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSink.java index 7034a2e2f7..1c1a614743 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSink.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSink.java @@ -32,6 +32,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.eclipse.microprofile.reactive.messaging.Message; +import io.opentelemetry.api.OpenTelemetry; import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.OutgoingMessageMetadata; import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata; @@ -78,7 +79,9 @@ public class KafkaSink { private final KafkaOpenTelemetryInstrumenter kafkaInstrumenter; - public KafkaSink(KafkaConnectorOutgoingConfiguration config, KafkaCDIEvents kafkaCDIEvents, + public KafkaSink(KafkaConnectorOutgoingConfiguration config, + KafkaCDIEvents kafkaCDIEvents, + Instance openTelemetryInstance, Instance> serializationFailureHandlers, Instance> producerInterceptors) { this.isTracingEnabled = config.getTracingEnabled(); @@ -134,7 +137,7 @@ public KafkaSink(KafkaConnectorOutgoingConfiguration config, KafkaCDIEvents kafk })); if (isTracingEnabled) { - kafkaInstrumenter = KafkaOpenTelemetryInstrumenter.createForSink(); + kafkaInstrumenter = KafkaOpenTelemetryInstrumenter.createForSink(openTelemetryInstance); } else { kafkaInstrumenter = null; } diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java index 06c6d35dd1..872e10c3ac 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.header.Header; +import io.opentelemetry.api.OpenTelemetry; import io.smallrye.common.annotation.Identifier; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; @@ -73,6 +74,7 @@ public class KafkaSource { public KafkaSource(Vertx vertx, String consumerGroup, KafkaConnectorIncomingConfiguration config, + Instance openTelemetryInstance, Instance commitHandlerFactories, Instance failureHandlerFactories, Instance consumerRebalanceListeners, @@ -227,7 +229,7 @@ public KafkaSource(Vertx vertx, } if (isTracingEnabled) { - kafkaInstrumenter = KafkaOpenTelemetryInstrumenter.createForSource(); + kafkaInstrumenter = KafkaOpenTelemetryInstrumenter.createForSource(openTelemetryInstance); } else { kafkaInstrumenter = null; } diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyFactory.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyFactory.java index f67898df05..fa85071a20 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyFactory.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyFactory.java @@ -15,6 +15,7 @@ import org.eclipse.microprofile.config.Config; import org.eclipse.microprofile.reactive.messaging.Channel; +import io.opentelemetry.api.OpenTelemetry; import io.smallrye.reactive.messaging.ChannelRegistry; import io.smallrye.reactive.messaging.EmitterConfiguration; import io.smallrye.reactive.messaging.EmitterFactory; @@ -75,10 +76,13 @@ public class KafkaRequestReplyFactory implements EmitterFactory> configurations; + @Inject + Instance openTelemetryInstance; + @Override public KafkaRequestReplyImpl createEmitter(EmitterConfiguration configuration, long defaultBufferSize) { return new KafkaRequestReplyImpl<>(configuration, defaultBufferSize, config.get(), configurations, holder.vertx(), - kafkaCDIEvents, commitStrategyFactories, failureStrategyFactories, failureHandlers, + kafkaCDIEvents, openTelemetryInstance, commitStrategyFactories, failureStrategyFactories, failureHandlers, correlationIdHandlers, replyFailureHandlers, rebalanceListeners); } diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.java index 42866b9049..a269c674cd 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.java @@ -26,6 +26,7 @@ import org.eclipse.microprofile.config.Config; import org.eclipse.microprofile.reactive.messaging.Message; +import io.opentelemetry.api.OpenTelemetry; import io.smallrye.common.annotation.Experimental; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; @@ -80,6 +81,7 @@ public KafkaRequestReplyImpl(EmitterConfiguration config, Instance> configurations, Vertx vertx, KafkaCDIEvents kafkaCDIEvents, + Instance openTelemetryInstance, Instance commitHandlerFactory, Instance failureHandlerFactories, Instance> deserializationFailureHandlers, @@ -116,7 +118,7 @@ public KafkaRequestReplyImpl(EmitterConfiguration config, String consumerGroup = consumerConfig.getGroupId().orElseGet(() -> UUID.randomUUID().toString()); this.waitForPartitions = getWaitForPartitions(consumerConfig); this.gracefulShutdown = consumerConfig.getGracefulShutdown(); - this.replySource = new KafkaSource<>(vertx, consumerGroup, consumerConfig, + this.replySource = new KafkaSource<>(vertx, consumerGroup, consumerConfig, openTelemetryInstance, commitHandlerFactory, failureHandlerFactories, rebalanceListeners, kafkaCDIEvents, deserializationFailureHandlers, -1); diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/tracing/KafkaOpenTelemetryInstrumenter.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/tracing/KafkaOpenTelemetryInstrumenter.java index 6b19f49537..4f64fbf397 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/tracing/KafkaOpenTelemetryInstrumenter.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/tracing/KafkaOpenTelemetryInstrumenter.java @@ -1,8 +1,10 @@ package io.smallrye.reactive.messaging.kafka.tracing; +import jakarta.enterprise.inject.Instance; + import org.eclipse.microprofile.reactive.messaging.Message; -import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; @@ -23,22 +25,22 @@ private KafkaOpenTelemetryInstrumenter(Instrumenter instrument this.instrumenter = instrumenter; } - public static KafkaOpenTelemetryInstrumenter createForSource() { - return create(true); + public static KafkaOpenTelemetryInstrumenter createForSource(Instance openTelemetryInstance) { + return create(TracingUtils.getOpenTelemetry(openTelemetryInstance), true); } - public static KafkaOpenTelemetryInstrumenter createForSink() { - return create(false); + public static KafkaOpenTelemetryInstrumenter createForSink(Instance openTelemetryInstance) { + return create(TracingUtils.getOpenTelemetry(openTelemetryInstance), false); } - private static KafkaOpenTelemetryInstrumenter create(boolean source) { + private static KafkaOpenTelemetryInstrumenter create(OpenTelemetry openTelemetry, boolean source) { MessageOperation messageOperation = source ? MessageOperation.RECEIVE : MessageOperation.PUBLISH; KafkaAttributesExtractor kafkaAttributesExtractor = new KafkaAttributesExtractor(); MessagingAttributesGetter messagingAttributesGetter = kafkaAttributesExtractor .getMessagingAttributesGetter(); - InstrumenterBuilder builder = Instrumenter.builder(GlobalOpenTelemetry.get(), + InstrumenterBuilder builder = Instrumenter.builder(openTelemetry, "io.smallrye.reactive.messaging", MessagingSpanNameExtractor.create(messagingAttributesGetter, messageOperation)); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkTest.java index b016a29acf..271787e057 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkTest.java @@ -63,6 +63,7 @@ public void testSinkUsingInteger() { .with("channel-name", "testSinkUsingInteger"); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); Flow.Subscriber> subscriber = sink.getSink(); @@ -84,6 +85,7 @@ public void testSinkUsingIntegerAndChannelName() { .with("partition", 0); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); Flow.Subscriber> subscriber = sink.getSink(); @@ -106,6 +108,7 @@ public void testSinkUsingString() { .with("channel-name", "testSinkUsingString"); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); Flow.Subscriber> subscriber = sink.getSink(); @@ -237,7 +240,8 @@ public void testInvalidPayloadType() { .with("retries", 0L); // disable retry. KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); CountKafkaCdiEvents testCdiEvents = new CountKafkaCdiEvents(); - sink = new KafkaSink(oc, testCdiEvents, UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, testCdiEvents, UnsatisfiedInstance.instance(), + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); await().until(() -> { HealthReport.HealthReportBuilder builder = HealthReport.builder(); @@ -286,8 +290,8 @@ public void testInvalidTypeWithDefaultInflightMessages() { .with("retries", 0L) .with("channel-name", "testInvalidTypeWithDefaultInflightMessages"); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), - UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); Flow.Subscriber subscriber = sink.getSink(); Multi.createFrom().range(0, 5) diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java index 8cdfd29ab5..22656acb20 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java @@ -74,7 +74,8 @@ public void testSource() { MapBasedConfig config = newCommonConfigForSource() .with("value.deserializer", IntegerDeserializer.class.getName()); KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config); - source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, commitHandlerFactories, failureHandlerFactories, + source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); List> messages = new ArrayList<>(); @@ -96,7 +97,8 @@ public void testSourceWithPartitions() { companion.topics().createAndWait(topic, 3, Duration.ofMinutes(1)); KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config); - source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, commitHandlerFactories, failureHandlerFactories, + source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); List> messages = new ArrayList<>(); @@ -118,7 +120,8 @@ public void testSourceWithChannelName() { MapBasedConfig config = newCommonConfigForSource() .with("value.deserializer", IntegerDeserializer.class.getName()); KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config); - source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, commitHandlerFactories, failureHandlerFactories, + source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); List messages = new ArrayList<>(); @@ -227,7 +230,8 @@ public void testRetry() { KafkaCompanion kafkaCompanion = new KafkaCompanion(kafka.getBootstrapServers()); KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config); - source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, commitHandlerFactories, failureHandlerFactories, + source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); List messages1 = new ArrayList<>(); @@ -495,7 +499,8 @@ public void testInvalidIncomingType() { MapBasedConfig config = newCommonConfigForSource() .with("value.deserializer", IntegerDeserializer.class.getName()); KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config); - source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, commitHandlerFactories, failureHandlerFactories, + source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); List> messages = new ArrayList<>(); @@ -559,7 +564,8 @@ public void testSourceWithEmptyOptionalConfiguration() { .with("sasl.jaas.config", "") //optional configuration .with("sasl.mechanism", ""); //optional configuration KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config); - source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, commitHandlerFactories, failureHandlerFactories, + source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); List> messages = new ArrayList<>(); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/SourceCloseTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/SourceCloseTest.java index 2c95c24a68..0a1fef0128 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/SourceCloseTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/SourceCloseTest.java @@ -49,11 +49,13 @@ public void testNoLostMessagesOnClose() { List list = new ArrayList<>(); KafkaSource source1 = new KafkaSource<>(vertx, groupId, - new KafkaConnectorIncomingConfiguration(config1), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config1), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), 0); KafkaSource source2 = new KafkaSource<>(vertx, groupId, - new KafkaConnectorIncomingConfiguration(config2), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config2), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), 0); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ce/KafkaSinkWithCloudEventsTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ce/KafkaSinkWithCloudEventsTest.java index 677d4bbb47..09e728ffd6 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ce/KafkaSinkWithCloudEventsTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ce/KafkaSinkWithCloudEventsTest.java @@ -59,8 +59,8 @@ public void testSendingStructuredCloudEvents() { config.put("channel-name", topic); config.put("cloud-events-mode", "structured"); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), - UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); try (ConsumerTask records = companion.consumeStrings().fromTopics(topic)) { @@ -103,8 +103,8 @@ public void testSendingStructuredCloudEventsWithComplexPayload() { config.put("channel-name", topic); config.put("cloud-events-mode", "structured"); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), - UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); try (ConsumerTask records = companion.consumeStrings().fromTopics(topic)) { @@ -146,8 +146,8 @@ public void testSendingStructuredCloudEventsWithTimestampAndSubject() { config.put("channel-name", topic); config.put("cloud-events-mode", "structured"); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), - UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); try (ConsumerTask records = companion.consumeStrings().fromTopics(topic)) { @@ -191,8 +191,8 @@ public void testSendingStructuredCloudEventsMissingMandatoryAttribute() { config.put("channel-name", topic); config.put("cloud-events-mode", "structured"); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), - UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); Message message = Message.of("hello").addMetadata(OutgoingCloudEventMetadata.builder() .withSource(URI.create("test://test")) @@ -225,8 +225,8 @@ public void testSendingStructuredCloudEventsWithWrongSerializer() { config.put("cloud-events-mode", "structured"); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - assertThatThrownBy(() -> new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), - UnsatisfiedInstance.instance())) + assertThatThrownBy(() -> new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance())) .isInstanceOf(IllegalStateException.class); } @@ -239,8 +239,8 @@ public void testSendingStructuredCloudEventsWithKey() { config.put("channel-name", topic); config.put("cloud-events-mode", "structured"); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), - UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); try (ConsumerTask records = companion.consumeStrings().fromTopics(topic)) { @@ -282,8 +282,8 @@ public void testSendingStructuredCloudEventsWithConfiguredTypeAndSource() { config.put("cloud-events-type", "my type"); config.put("cloud-events-source", "http://acme.org"); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), - UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); try (ConsumerTask records = companion.consumeStrings().fromTopics(topic)) { @@ -321,8 +321,8 @@ public void testSendingStructuredCloudEventsWithConfiguredTypeAndSourceAndNoClou config.put("cloud-events-type", "my type"); config.put("cloud-events-source", "http://acme.org"); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), - UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); try (ConsumerTask records = companion.consumeStrings().fromTopics(topic)) { @@ -356,8 +356,8 @@ public void testSendingStructuredCloudEventsWithExtensions() { config.put("channel-name", topic); config.put("cloud-events-mode", "structured"); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), - UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); try (ConsumerTask records = companion.consumeStrings().fromTopics(topic)) { @@ -398,8 +398,8 @@ public void testSendingBinaryCloudEvents() { config.put("value.serializer", StringSerializer.class.getName()); config.put("channel-name", topic); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), - UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); try (ConsumerTask records = companion.consumeStrings().fromTopics(topic)) { @@ -435,8 +435,8 @@ public void testSendingBinaryCloudEventsWithContentType() { config.put("value.serializer", StringSerializer.class.getName()); config.put("channel-name", topic); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), - UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); try (ConsumerTask records = companion.consumeStrings().fromTopics(topic)) { @@ -476,8 +476,8 @@ public void testSendingBinaryCloudEventsWithKey() { config.put("value.serializer", StringSerializer.class.getName()); config.put("channel-name", topic); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), - UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); try (ConsumerTask records = companion.consumeStrings().fromTopics(topic)) { @@ -518,8 +518,8 @@ public void testSendingBinaryCloudEventsWithConfiguredTypeAndSource() { config.put("cloud-events-type", "my type"); config.put("cloud-events-source", "http://acme.org"); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), - UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); try (ConsumerTask records = companion.consumeStrings().fromTopics(topic)) { @@ -556,8 +556,8 @@ public void testSendingBinaryCloudEventsWithConfiguredTypeAndSourceButNoMetadata config.put("cloud-events-type", "my type"); config.put("cloud-events-source", "http://acme.org"); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), - UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); try (ConsumerTask records = companion.consumeStrings().fromTopics(topic)) { @@ -589,8 +589,8 @@ public void testSendingBinaryCloudEventsMissingMandatoryAttribute() { config.put("value.serializer", StringSerializer.class.getName()); config.put("channel-name", topic); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), - UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); Message message = Message.of("hello").addMetadata(OutgoingCloudEventMetadata.builder() .withSource(URI.create("test://test")) @@ -624,8 +624,8 @@ public void testWithCloudEventDisabled() { config.put("key", "my-key"); config.put("cloud-events", false); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), - UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); try (ConsumerTask records = companion.consumeStrings().fromTopics(topic)) { @@ -655,8 +655,8 @@ public void testSendingBinaryCloudEventsWithExtensions() { config.put("value.serializer", StringSerializer.class.getName()); config.put("channel-name", topic); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); - sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), - UnsatisfiedInstance.instance()); + sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); try (ConsumerTask records = companion.consumeStrings().fromTopics(topic)) { diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ce/KafkaSourceBatchWithCloudEventsTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ce/KafkaSourceBatchWithCloudEventsTest.java index 267e7772aa..bfb54ed406 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ce/KafkaSourceBatchWithCloudEventsTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ce/KafkaSourceBatchWithCloudEventsTest.java @@ -72,7 +72,8 @@ public void testReceivingStructuredCloudEventsWithJsonObjectDeserializer() { config.put("value.deserializer", JsonObjectSerde.JsonObjectDeserializer.class.getName()); config.put("channel-name", topic); KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config); - source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, commitHandlerFactories, failureHandlerFactories, + source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); List> messages = new ArrayList<>(); @@ -126,7 +127,8 @@ public void testReceivingStructuredCloudEventsWithUnsupportedDeserializer() { config.put("value.deserializer", BufferSerde.BufferDeserializer.class.getName()); config.put("channel-name", topic); KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config); - source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, commitHandlerFactories, failureHandlerFactories, + source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); List> messages = new ArrayList<>(); @@ -159,7 +161,8 @@ public void testReceivingBinaryCloudEvents() { config.put("value.deserializer", StringDeserializer.class.getName()); config.put("channel-name", topic); KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config); - source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, commitHandlerFactories, failureHandlerFactories, + source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); List> messages = new ArrayList<>(); @@ -305,7 +308,8 @@ public void testReceivingBinaryCloudEventsWithSupportDisabled() { config.put("channel-name", topic); config.put("cloud-events", false); KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config); - source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, commitHandlerFactories, failureHandlerFactories, + source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); List> messages = new ArrayList<>(); @@ -343,7 +347,8 @@ public void testReceivingStructuredCloudEventsWithSupportDisabled() { config.put("channel-name", topic); config.put("cloud-events", false); KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config); - source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, commitHandlerFactories, failureHandlerFactories, + source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); List> messages = new ArrayList<>(); @@ -385,7 +390,8 @@ public void testReceivingStructuredCloudEventsNoData() { config.put("value.deserializer", StringDeserializer.class.getName()); config.put("channel-name", topic); KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config); - source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, commitHandlerFactories, failureHandlerFactories, + source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); List> messages = new ArrayList<>(); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ce/KafkaSourceWithCloudEventsTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ce/KafkaSourceWithCloudEventsTest.java index 42c55caab3..f7cb0f6928 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ce/KafkaSourceWithCloudEventsTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ce/KafkaSourceWithCloudEventsTest.java @@ -67,7 +67,8 @@ public void testReceivingStructuredCloudEventsWithStringDeserializer() { config.put("value.deserializer", StringDeserializer.class.getName()); config.put("channel-name", topic); KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config); - source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, commitHandlerFactories, failureHandlerFactories, + source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); List> messages = new ArrayList<>(); @@ -124,7 +125,8 @@ public void testReceivingStructuredCloudEventsWithJsonObjectDeserializer() { config.put("value.deserializer", JsonObjectSerde.JsonObjectDeserializer.class.getName()); config.put("channel-name", topic); KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config); - source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, commitHandlerFactories, failureHandlerFactories, + source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); List> messages = new ArrayList<>(); @@ -177,7 +179,8 @@ public void testReceivingStructuredCloudEventsWithByteArrayDeserializer() { config.put("value.deserializer", ByteArrayDeserializer.class.getName()); config.put("channel-name", topic); KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config); - source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, commitHandlerFactories, failureHandlerFactories, + source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); List> messages = new ArrayList<>(); @@ -227,7 +230,8 @@ public void testReceivingStructuredCloudEventsWithUnsupportedDeserializer() { config.put("value.deserializer", BufferSerde.BufferDeserializer.class.getName()); config.put("channel-name", topic); KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config); - source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, commitHandlerFactories, failureHandlerFactories, + source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); List> messages = new ArrayList<>(); @@ -259,7 +263,8 @@ public void testReceivingStructuredCloudEventsWithoutSource() { config.put("value.deserializer", JsonObjectSerde.JsonObjectDeserializer.class.getName()); config.put("channel-name", topic); KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config); - source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, commitHandlerFactories, failureHandlerFactories, + source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); List> messages = new ArrayList<>(); @@ -293,7 +298,8 @@ public void testReceivingBinaryCloudEvents() { config.put("value.deserializer", StringDeserializer.class.getName()); config.put("channel-name", topic); KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config); - source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, commitHandlerFactories, failureHandlerFactories, + source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); List> messages = new ArrayList<>(); @@ -353,7 +359,8 @@ public void testReceivingBinaryCloudEventsWithoutKey() { config.put("value.deserializer", StringDeserializer.class.getName()); config.put("channel-name", topic); KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config); - source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, commitHandlerFactories, failureHandlerFactories, + source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); List> messages = new ArrayList<>(); @@ -413,7 +420,8 @@ public void testReceivingStructuredCloudEventsWithoutMatchingContentTypeIsNotRea config.put("value.deserializer", StringDeserializer.class.getName()); config.put("channel-name", topic); KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config); - source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, commitHandlerFactories, failureHandlerFactories, + source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); List> messages = new ArrayList<>(); @@ -543,7 +551,8 @@ public void testReceivingBinaryCloudEventsWithSupportDisabled() { config.put("channel-name", topic); config.put("cloud-events", false); KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config); - source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, commitHandlerFactories, failureHandlerFactories, + source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); List> messages = new ArrayList<>(); @@ -582,7 +591,8 @@ public void testReceivingStructuredCloudEventsWithSupportDisabled() { config.put("channel-name", topic); config.put("cloud-events", false); KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config); - source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, commitHandlerFactories, failureHandlerFactories, + source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); List> messages = new ArrayList<>(); @@ -624,7 +634,8 @@ public void testReceivingStructuredCloudEventsNoData() { config.put("value.deserializer", StringDeserializer.class.getName()); config.put("channel-name", topic); KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config); - source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, commitHandlerFactories, failureHandlerFactories, + source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); List> messages = new ArrayList<>(); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ClientTestBase.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ClientTestBase.java index 11c73175c3..68f7641e08 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ClientTestBase.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ClientTestBase.java @@ -130,8 +130,8 @@ public KafkaSource createSource(MapBasedConfig config, String g SingletonInstance listeners = new SingletonInstance<>(groupId, getKafkaConsumerRebalanceListenerAwaitingAssignation()); - source = new KafkaSource<>(vertx, groupId, new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, - failureHandlerFactories, + source = new KafkaSource<>(vertx, groupId, new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, listeners, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), 0); sources.add(source); return source; @@ -145,8 +145,8 @@ public KafkaSource createSourceSeekToBeginning() { SingletonInstance listeners = new SingletonInstance<>(groupId, getKafkaConsumerRebalanceListenerAwaitingAssignationAndSeekToBeginning()); - source = new KafkaSource<>(vertx, groupId, new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, - failureHandlerFactories, + source = new KafkaSource<>(vertx, groupId, new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, listeners, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), 0); sources.add(source); return source; @@ -161,8 +161,9 @@ public KafkaSource createSourceAssignAndSeek() { SingletonInstance listeners = new SingletonInstance<>(groupId, getKafkaConsumerRebalanceListenerAwaitingAssignation()); - source = new KafkaSource<>(vertx, groupId, new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, - failureHandlerFactories, listeners, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), 0); + source = new KafkaSource<>(vertx, groupId, new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, + listeners, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), 0); sources.add(source); return source; } @@ -176,7 +177,7 @@ public KafkaSource createSourceSeekToEnd() { getKafkaConsumerRebalanceListenerAwaitingAssignationAndSeekToEnd()); source = new KafkaSource<>(vertx, groupId, new KafkaConnectorIncomingConfiguration(config), - commitHandlerFactories, failureHandlerFactories, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, listeners, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), 0); sources.add(source); return source; @@ -191,7 +192,7 @@ public KafkaSource createSourceSeekToOffset() { getKafkaConsumerRebalanceListenerAwaitingAssignationAndSeekToOffset()); source = new KafkaSource<>(vertx, groupId, new KafkaConnectorIncomingConfiguration(config), - commitHandlerFactories, failureHandlerFactories, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, listeners, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), 0); sources.add(source); return source; @@ -199,7 +200,7 @@ public KafkaSource createSourceSeekToOffset() { public KafkaSource createSource(MapBasedConfig config, int index) { source = new KafkaSource<>(vertx, "groupId", new KafkaConnectorIncomingConfiguration(config), - commitHandlerFactories, failureHandlerFactories, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), index); sources.add(source); return source; diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/HighLatencyTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/HighLatencyTest.java index 590a568ed5..afe4ac0fd2 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/HighLatencyTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/HighLatencyTest.java @@ -66,7 +66,8 @@ public void testHighLatency() throws InterruptedException, IOException { .with("retry-max-wait", 30); KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config); - source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, commitHandlerFactories, failureHandlerFactories, + source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); List> messages1 = new ArrayList<>(); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/KafkaClientReactiveStreamsPublisherTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/KafkaClientReactiveStreamsPublisherTest.java index 3fab2ba25e..722dc7f695 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/KafkaClientReactiveStreamsPublisherTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/KafkaClientReactiveStreamsPublisherTest.java @@ -121,8 +121,8 @@ public KafkaSource createSource() { MapBasedConfig config = createConsumerConfig(groupId) .put("topic", topic); - source = new KafkaSource<>(vertx, groupId, new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, - failureHandlerFactories, + source = new KafkaSource<>(vertx, groupId, new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), 0); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/LazyInitializedTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/LazyInitializedTest.java index 3df6b28650..88879f62b6 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/LazyInitializedTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/LazyInitializedTest.java @@ -52,7 +52,8 @@ void testLazyInitializedProducer() { MapBasedConfig config = new MapBasedConfig(props); KafkaSink sink = new KafkaSink(new KafkaConnectorOutgoingConfiguration(config), - CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); + CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); KafkaProducer producer = sink.getProducer(); assertThat(producer).isNotNull(); @@ -91,7 +92,8 @@ void testEagerInitializedProducer() { assertThatThrownBy(() -> { KafkaSink sink = new KafkaSink(new KafkaConnectorOutgoingConfiguration(config), - CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); + CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); }).hasCauseInstanceOf(KafkaException.class); } @@ -111,6 +113,7 @@ void testLazyInitializedConsumer() { KafkaSource source = new KafkaSource<>(Vertx.vertx(), groupId, new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), @@ -150,6 +153,7 @@ void testEagerInitializedConsumer() { assertThatThrownBy(() -> { KafkaSource source = new KafkaSource<>(Vertx.vertx(), groupId, new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/PauseResumeTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/PauseResumeTest.java index 05772a006d..846562555f 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/PauseResumeTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/PauseResumeTest.java @@ -23,6 +23,7 @@ import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.helpers.test.AssertSubscriber; import io.smallrye.reactive.messaging.kafka.*; +import io.smallrye.reactive.messaging.kafka.base.UnsatisfiedInstance; import io.smallrye.reactive.messaging.kafka.base.WeldTestBase; import io.smallrye.reactive.messaging.kafka.impl.KafkaSource; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; @@ -58,7 +59,8 @@ void testPauseResumeBuffer() { .with("client.id", UUID.randomUUID().toString()); String group = UUID.randomUUID().toString(); source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, getConsumerRebalanceListeners(), CountKafkaCdiEvents.noCdiEvents, getDeserializationFailureHandlers(), -1); injectMockConsumer(source, consumer); @@ -123,7 +125,8 @@ void testRebalanceDuringPaused() { .with("client.id", UUID.randomUUID().toString()); String group = UUID.randomUUID().toString(); source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, getConsumerRebalanceListeners(), CountKafkaCdiEvents.noCdiEvents, getDeserializationFailureHandlers(), -1); injectMockConsumer(source, consumer); @@ -195,7 +198,8 @@ void testRebalanceDuringPausedWithDifferentPartitions() { .with("client.id", UUID.randomUUID().toString()); String group = UUID.randomUUID().toString(); source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, getConsumerRebalanceListeners(), CountKafkaCdiEvents.noCdiEvents, getDeserializationFailureHandlers(), -1); injectMockConsumer(source, consumer); @@ -259,7 +263,8 @@ void testPauseResumeWithBlockingConsumption() { .with("client.id", UUID.randomUUID().toString()); String group = UUID.randomUUID().toString(); source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, getConsumerRebalanceListeners(), CountKafkaCdiEvents.noCdiEvents, getDeserializationFailureHandlers(), -1); injectMockConsumer(source, consumer); @@ -309,7 +314,8 @@ void testPauseResumeWithBlockingConsumptionAndConcurrency() { .with("client.id", UUID.randomUUID().toString()); String group = UUID.randomUUID().toString(); source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, getConsumerRebalanceListeners(), CountKafkaCdiEvents.noCdiEvents, getDeserializationFailureHandlers(), -1); injectMockConsumer(source, consumer); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaBatchConsumerTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaBatchConsumerTest.java index a64e29a404..548b5df7d4 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaBatchConsumerTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaBatchConsumerTest.java @@ -48,7 +48,7 @@ public KafkaSource createSource(MapBasedConfig config, String g getKafkaConsumerRebalanceListenerAwaitingAssignation()); source = new KafkaSource<>(vertx, groupId, new KafkaConnectorIncomingConfiguration(config), - commitHandlerFactories, failureHandlerFactories, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, listeners, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), 0); return source; } diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaConsumerTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaConsumerTest.java index e7259b162c..a9975b9c7b 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaConsumerTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaConsumerTest.java @@ -235,7 +235,7 @@ public void testOffsetResetLatest() throws Exception { getKafkaConsumerRebalanceListenerAwaitingAssignation()); source = new KafkaSource<>(vertx, groupId, new KafkaConnectorIncomingConfiguration(config), - commitHandlerFactories, failureHandlerFactories, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, listeners, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), 0); AssertSubscriber> subscriber = source.getStream() @@ -418,7 +418,8 @@ public void testRebalanceWhilePausedAndPendingCommit() throws Exception { // The rebalance will split the partitions between the 2 sources, but both will restaert from offset 0, as nothing // has been acked. KafkaSource source2 = new KafkaSource<>(vertx, groupId, - new KafkaConnectorIncomingConfiguration(config2), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config2), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), 3); source2.getStream() diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaProducerTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaProducerTest.java index 8140cfea54..5683ec0fb5 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaProducerTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaProducerTest.java @@ -165,15 +165,15 @@ public KafkaSink createSink() { .put("channel-name", "test-" + ThreadLocalRandom.current().nextInt()) .put("topic", topic); - KafkaSink sink = new KafkaSink(new KafkaConnectorOutgoingConfiguration(config), - CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); + KafkaSink sink = new KafkaSink(new KafkaConnectorOutgoingConfiguration(config), CountKafkaCdiEvents.noCdiEvents, + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); this.sinks.add(sink); return sink; } public KafkaSink createSink(MapBasedConfig config) { - KafkaSink sink = new KafkaSink(new KafkaConnectorOutgoingConfiguration(config), - CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); + KafkaSink sink = new KafkaSink(new KafkaConnectorOutgoingConfiguration(config), CountKafkaCdiEvents.noCdiEvents, + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); this.sinks.add(sink); return sink; } @@ -186,8 +186,8 @@ public KafkaSink createTransactionalSink() { .with(ProducerConfig.ACKS_CONFIG, "all") .with("topic", topic); - KafkaSink sink = new KafkaSink(new KafkaConnectorOutgoingConfiguration(config), - CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); + KafkaSink sink = new KafkaSink(new KafkaConnectorOutgoingConfiguration(config), CountKafkaCdiEvents.noCdiEvents, + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); this.sinks.add(sink); return sink; } diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/BatchCommitStrategiesTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/BatchCommitStrategiesTest.java index d54bd46dd2..f0aeb0401a 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/BatchCommitStrategiesTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/BatchCommitStrategiesTest.java @@ -25,6 +25,7 @@ import io.smallrye.reactive.messaging.health.HealthReport; import io.smallrye.reactive.messaging.kafka.*; import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordBatchMetadata; +import io.smallrye.reactive.messaging.kafka.base.UnsatisfiedInstance; import io.smallrye.reactive.messaging.kafka.base.WeldTestBase; import io.smallrye.reactive.messaging.kafka.impl.KafkaSource; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; @@ -60,7 +61,8 @@ void testLatestCommitStrategy() { .with("client.id", UUID.randomUUID().toString()); String group = UUID.randomUUID().toString(); source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, getConsumerRebalanceListeners(), CountKafkaCdiEvents.noCdiEvents, getDeserializationFailureHandlers(), -1); injectMockConsumer(source, consumer); @@ -141,7 +143,8 @@ void testThrottledStrategy() { .with("auto.commit.interval.ms", 100); String group = UUID.randomUUID().toString(); source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, getConsumerRebalanceListeners(), CountKafkaCdiEvents.noCdiEvents, getDeserializationFailureHandlers(), -1); injectMockConsumer(source, consumer); @@ -198,7 +201,8 @@ void testThrottledStrategyWithManyRecords() { .with("auto.commit.interval.ms", 100); String group = UUID.randomUUID().toString(); source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, getConsumerRebalanceListeners(), CountKafkaCdiEvents.noCdiEvents, getDeserializationFailureHandlers(), -1); injectMockConsumer(source, consumer); @@ -289,7 +293,8 @@ void testThrottledStrategyWithTooManyUnackedMessages() { .with("auto.commit.interval.ms", 100); String group = UUID.randomUUID().toString(); source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, getConsumerRebalanceListeners(), CountKafkaCdiEvents.noCdiEvents, getDeserializationFailureHandlers(), -1); injectMockConsumer(source, consumer); @@ -357,7 +362,8 @@ public void testWithRebalanceListenerMatchGivenName() { .with("client.id", UUID.randomUUID().toString()); String group = UUID.randomUUID().toString(); source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, getConsumerRebalanceListeners(), CountKafkaCdiEvents.noCdiEvents, getDeserializationFailureHandlers(), -1); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/CommitStrategiesTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/CommitStrategiesTest.java index 0b944cda61..e9acc24462 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/CommitStrategiesTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/CommitStrategiesTest.java @@ -28,6 +28,7 @@ import io.smallrye.reactive.messaging.health.HealthReport; import io.smallrye.reactive.messaging.kafka.*; import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata; +import io.smallrye.reactive.messaging.kafka.base.UnsatisfiedInstance; import io.smallrye.reactive.messaging.kafka.base.WeldTestBase; import io.smallrye.reactive.messaging.kafka.impl.KafkaSource; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; @@ -63,7 +64,8 @@ void testLatestCommitStrategy() { .with("lazy-client", true) .with("client.id", UUID.randomUUID().toString()); source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, getConsumerRebalanceListeners(), CountKafkaCdiEvents.noCdiEvents, getDeserializationFailureHandlers(), -1); injectMockConsumer(source, consumer); @@ -167,7 +169,8 @@ void testThrottledStrategy() { .with("auto.commit.interval.ms", 100); String group = UUID.randomUUID().toString(); source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, getConsumerRebalanceListeners(), CountKafkaCdiEvents.noCdiEvents, getDeserializationFailureHandlers(), -1); injectMockConsumer(source, consumer); @@ -234,7 +237,8 @@ void testThrottledStrategyWithManyRecords() { .with("auto.commit.interval.ms", 100); String group = UUID.randomUUID().toString(); source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, getConsumerRebalanceListeners(), CountKafkaCdiEvents.noCdiEvents, getDeserializationFailureHandlers(), -1); injectMockConsumer(source, consumer); @@ -317,7 +321,8 @@ void testThrottledStrategyWithTooManyUnackedMessages() throws Exception { .with("auto.commit.interval.ms", 100); String group = UUID.randomUUID().toString(); source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, getConsumerRebalanceListeners(), CountKafkaCdiEvents.noCdiEvents, getDeserializationFailureHandlers(), -1); injectMockConsumer(source, consumer); @@ -385,7 +390,8 @@ public void testFailureWhenNoRebalanceListenerMatchGivenName() { String group = UUID.randomUUID().toString(); assertThatThrownBy(() -> { source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, getConsumerRebalanceListeners(), CountKafkaCdiEvents.noCdiEvents, getDeserializationFailureHandlers(), -1); }).isInstanceOf(UnsatisfiedResolutionException.class); @@ -400,7 +406,8 @@ public void testFailureWhenMultipleRebalanceListenerMatchGivenName() { .with("client.id", UUID.randomUUID().toString()); String group = UUID.randomUUID().toString(); assertThatThrownBy(() -> source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, getConsumerRebalanceListeners(), CountKafkaCdiEvents.noCdiEvents, getDeserializationFailureHandlers(), -1)) .isInstanceOf(AmbiguousResolutionException.class).hasMessageContaining("mine"); @@ -416,7 +423,8 @@ public void testWithRebalanceListenerMatchGivenName() { .with("client.id", UUID.randomUUID().toString()); String group = UUID.randomUUID().toString(); source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, getConsumerRebalanceListeners(), CountKafkaCdiEvents.noCdiEvents, getDeserializationFailureHandlers(), -1); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/DeprecatedCommitStrategiesTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/DeprecatedCommitStrategiesTest.java index 4ccdec792e..7e81f7274c 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/DeprecatedCommitStrategiesTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/DeprecatedCommitStrategiesTest.java @@ -42,6 +42,7 @@ import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration; import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener; import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata; +import io.smallrye.reactive.messaging.kafka.base.UnsatisfiedInstance; import io.smallrye.reactive.messaging.kafka.base.WeldTestBase; import io.smallrye.reactive.messaging.kafka.impl.KafkaSource; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; @@ -84,7 +85,8 @@ void testLatestCommitStrategy() { .with("client.id", UUID.randomUUID().toString()); String group = UUID.randomUUID().toString(); source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, getConsumerRebalanceListeners(), CountKafkaCdiEvents.noCdiEvents, getDeserializationFailureHandlers(), -1); injectMockConsumer(source, consumer); @@ -188,7 +190,8 @@ void testThrottledStrategy() { .with("auto.commit.interval.ms", 100); String group = UUID.randomUUID().toString(); source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, getConsumerRebalanceListeners(), CountKafkaCdiEvents.noCdiEvents, getDeserializationFailureHandlers(), -1); injectMockConsumer(source, consumer); @@ -254,7 +257,8 @@ void testThrottledStrategyWithManyRecords() { .with("auto.commit.interval.ms", 100); String group = UUID.randomUUID().toString(); source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, getConsumerRebalanceListeners(), CountKafkaCdiEvents.noCdiEvents, getDeserializationFailureHandlers(), -1); injectMockConsumer(source, consumer); @@ -335,7 +339,8 @@ void testThrottledStrategyWithTooManyUnackedMessages() throws Exception { .with("auto.commit.interval.ms", 100); String group = UUID.randomUUID().toString(); source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, getConsumerRebalanceListeners(), CountKafkaCdiEvents.noCdiEvents, getDeserializationFailureHandlers(), -1); injectMockConsumer(source, consumer); @@ -403,7 +408,8 @@ public void testFailureWhenNoRebalanceListenerMatchGivenName() { String group = UUID.randomUUID().toString(); assertThatThrownBy(() -> { new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, getConsumerRebalanceListeners(), CountKafkaCdiEvents.noCdiEvents, getDeserializationFailureHandlers(), -1); }).isInstanceOf(UnsatisfiedResolutionException.class); @@ -418,7 +424,8 @@ public void testFailureWhenMultipleRebalanceListenerMatchGivenName() { .with("client.id", UUID.randomUUID().toString()); String group = UUID.randomUUID().toString(); assertThatThrownBy(() -> new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, getConsumerRebalanceListeners(), CountKafkaCdiEvents.noCdiEvents, getDeserializationFailureHandlers(), -1)) .isInstanceOf(DeploymentException.class).hasMessageContaining("mine"); @@ -434,7 +441,8 @@ public void testWithRebalanceListenerMatchGivenName() { .with("client.id", UUID.randomUUID().toString()); String group = UUID.randomUUID().toString(); source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, getConsumerRebalanceListeners(), CountKafkaCdiEvents.noCdiEvents, getDeserializationFailureHandlers(), -1); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/FileCheckpointStateStoreTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/FileCheckpointStateStoreTest.java index 44f7ba8131..96f2463939 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/FileCheckpointStateStoreTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/FileCheckpointStateStoreTest.java @@ -122,6 +122,7 @@ public void testMultipleIndependentConsumers(@TempDir File tempDir) { source = new KafkaSource<>(vertx, groupId, ic, + UnsatisfiedInstance.instance(), checkpointFactory, failureHandlerFactories, UnsatisfiedInstance.instance(), @@ -148,6 +149,7 @@ public void testMultipleIndependentConsumers(@TempDir File tempDir) { source2 = new KafkaSource<>(vertx, groupId, ic2, + UnsatisfiedInstance.instance(), checkpointFactory, failureHandlerFactories, UnsatisfiedInstance.instance(), diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/KafkaCommitHandlerTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/KafkaCommitHandlerTest.java index feb0716270..121b099398 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/KafkaCommitHandlerTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/KafkaCommitHandlerTest.java @@ -64,7 +64,7 @@ public void testSourceWithAutoCommitEnabled() throws ExecutionException, Timeout source = new KafkaSource<>(vertx, "test-source-with-auto-commit-enabled", ic, - commitHandlerFactories, failureHandlerFactories, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); @@ -107,7 +107,7 @@ public void testSourceWithAutoCommitDisabled() throws ExecutionException, Interr KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config); source = new KafkaSource<>(vertx, "test-source-with-auto-commit-disabled", ic, - commitHandlerFactories, failureHandlerFactories, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); @@ -146,7 +146,7 @@ public void testSourceWithThrottledLatestProcessedCommitEnabled() { KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config); source = new KafkaSource<>(vertx, "test-source-with-throttled-latest-processed-commit", ic, - commitHandlerFactories, failureHandlerFactories, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); @@ -195,7 +195,7 @@ public void testSourceWithThrottledLatestProcessedCommitEnabledWithoutAck() { KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config); source = new KafkaSource<>(vertx, "test-source-with-throttled-latest-processed-commit-without-acking", ic, - commitHandlerFactories, failureHandlerFactories, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); @@ -249,14 +249,14 @@ public void testSourceWithThrottledAndRebalance() { KafkaConnectorIncomingConfiguration ic2 = new KafkaConnectorIncomingConfiguration(config2); source = new KafkaSource<>(vertx, "test-source-with-throttled-latest-processed-commit", ic1, - commitHandlerFactories, failureHandlerFactories, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); KafkaSource source2 = new KafkaSource<>(vertx, "test-source-with-throttled-latest-processed-commit", ic2, - commitHandlerFactories, failureHandlerFactories, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); @@ -343,14 +343,14 @@ void testSourceWithThrottledAndRebalanceWithPartitionsConfig() { KafkaConnectorIncomingConfiguration ic2 = new KafkaConnectorIncomingConfiguration(config2); source = new KafkaSource<>(vertx, "test-source-with-throttled-latest-processed-commit", ic1, - commitHandlerFactories, failureHandlerFactories, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); KafkaSource source2 = new KafkaSource<>(vertx, "test-source-with-throttled-latest-processed-commit", ic2, - commitHandlerFactories, failureHandlerFactories, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/RebalanceTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/RebalanceTest.java index c2b88e9838..f516618418 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/RebalanceTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/RebalanceTest.java @@ -20,6 +20,7 @@ import io.smallrye.reactive.messaging.health.HealthReport; import io.smallrye.reactive.messaging.kafka.*; import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata; +import io.smallrye.reactive.messaging.kafka.base.UnsatisfiedInstance; import io.smallrye.reactive.messaging.kafka.base.WeldTestBase; import io.smallrye.reactive.messaging.kafka.impl.KafkaSource; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; @@ -58,7 +59,7 @@ void testRebalance() throws InterruptedException { .with("auto.commit.interval.ms", 100); source = new KafkaSource<>(vertx, group, new KafkaConnectorIncomingConfiguration(config), - commitHandlerFactories, failureHandlerFactories, + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, getConsumerRebalanceListeners(), CountKafkaCdiEvents.noCdiEvents, getDeserializationFailureHandlers(), -1); injectMockConsumer(source, consumer); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/RedisCheckpointStateStoreTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/RedisCheckpointStateStoreTest.java index 8c60e50685..18c0919dc9 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/RedisCheckpointStateStoreTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/RedisCheckpointStateStoreTest.java @@ -157,6 +157,7 @@ public void testMultipleIndependentConsumers() { source = new KafkaSource<>(vertx, groupId, ic, + UnsatisfiedInstance.instance(), checkpointFactory, failureHandlerFactories, UnsatisfiedInstance.instance(), @@ -183,6 +184,7 @@ public void testMultipleIndependentConsumers() { source2 = new KafkaSource<>(vertx, groupId, ic2, + UnsatisfiedInstance.instance(), checkpointFactory, failureHandlerFactories, UnsatisfiedInstance.instance(), diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/serde/KeyDeserializerConfigurationTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/serde/KeyDeserializerConfigurationTest.java index 063787d77c..dfff25c2f5 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/serde/KeyDeserializerConfigurationTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/serde/KeyDeserializerConfigurationTest.java @@ -51,7 +51,8 @@ public void testThatWhenKeyDeserializerIsNotSetStringIsUsed() { String group = UUID.randomUUID().toString(); MapBasedConfig config = commonConsumerConfiguration(); source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); @@ -79,7 +80,8 @@ public void testKeyDeserializationFailureWhenNoDeserializerSet() { String group = UUID.randomUUID().toString(); MapBasedConfig config = commonConsumerConfiguration(); source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); @@ -119,7 +121,8 @@ public void testKeyDeserializationFailureWithDeserializerSet() { .with("key.deserializer", JsonObjectSerde.JsonObjectDeserializer.class.getName()) .with("fail-on-deserialization-failure", false); source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); @@ -160,7 +163,8 @@ public void testThatUnderlyingDeserializerReceiveTheConfiguration() { .with("key.deserializer", ConstantDeserializer.class.getName()) .with("deserializer.value", "constant"); source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); @@ -202,7 +206,8 @@ public void testKeyDeserializationFailureWithMatchingHandler() { JsonObject fallback = new JsonObject().put("fallback", "fallback"); source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, new SingletonInstance<>("my-deserialization-handler", new DeserializationFailureHandler() { @@ -272,7 +277,8 @@ public void testKeyDeserializationFailureWithMatchingHandlerReturningNull() { .with("key-deserialization-failure-handler", "my-deserialization-handler"); source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, new SingletonInstance<>("my-deserialization-handler", new DeserializationFailureHandler() { @@ -324,7 +330,8 @@ public void testKeyDeserializationFailureWithNoMatchingHandler() { assertThatThrownBy(() -> { source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, new SingletonInstance<>("not-matching", new DeserializationFailureHandler() { @@ -368,7 +375,8 @@ public JsonObject handleDeserializationFailure(String topic, boolean isKey, Stri String group = UUID.randomUUID().toString(); assertThatThrownBy(() -> { source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, new DoubleInstance<>("my-deserialization-handler", i1, i2), -1); @@ -381,7 +389,8 @@ public void testKeyDeserializerFailsDuringConfig() { .with("key.deserializer", BrokenDeserializerFailingDuringConfig.class.getName()); String group = UUID.randomUUID().toString(); assertThatThrownBy(() -> source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1)) .isInstanceOf(KafkaException.class) diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/serde/SerializerConfigurationTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/serde/SerializerConfigurationTest.java index a9d6552a3f..518909d788 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/serde/SerializerConfigurationTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/serde/SerializerConfigurationTest.java @@ -44,7 +44,7 @@ public void cleanup() { public void testThatWhenNotSetKeySerializerIsString() { MapBasedConfig config = commonConsumerConfiguration(); sink = new KafkaSink(new KafkaConnectorOutgoingConfiguration(config), CountKafkaCdiEvents.noCdiEvents, - UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); ConsumerTask consumed = companion.consumeStrings().fromTopics(topic, 4, Duration.ofSeconds(10)); @@ -76,7 +76,7 @@ public void testKeySerializationFailure() { .with("key.serializer", JsonObjectSerde.JsonObjectSerializer.class.getName()) .with("retries", 0L); sink = new KafkaSink(new KafkaConnectorOutgoingConfiguration(config), CountKafkaCdiEvents.noCdiEvents, - UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); Flow.Subscriber> subscriber = sink.getSink(); AtomicBoolean nacked = new AtomicBoolean(); Multi.createFrom().items( @@ -95,7 +95,7 @@ public void testValueSerializationFailure() { .with("key.serializer", JsonObjectSerde.JsonObjectSerializer.class.getName()) .with("retries", 0L); sink = new KafkaSink(new KafkaConnectorOutgoingConfiguration(config), CountKafkaCdiEvents.noCdiEvents, - UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); Flow.Subscriber> subscriber = sink.getSink(); AtomicBoolean nacked = new AtomicBoolean(); Multi.createFrom().items( @@ -114,7 +114,7 @@ public void testFailureWhenValueSerializerIsNotSet() { assertThatThrownBy(() -> { sink = new KafkaSink(new KafkaConnectorOutgoingConfiguration(config), CountKafkaCdiEvents.noCdiEvents, - UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); }).isInstanceOf(IllegalArgumentException.class).hasMessageContaining("value.serializer"); } @@ -126,7 +126,7 @@ public void testFailureWhenSerializerFailsDuringConfiguration() { assertThatThrownBy(() -> { sink = new KafkaSink(new KafkaConnectorOutgoingConfiguration(config), CountKafkaCdiEvents.noCdiEvents, - UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); + UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance()); }).isInstanceOf(KafkaException.class) .hasCauseInstanceOf(IllegalStateException.class) .hasStackTraceContaining("boom"); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/serde/ValueDeserializerConfigurationTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/serde/ValueDeserializerConfigurationTest.java index d4ea0cc32f..016027d244 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/serde/ValueDeserializerConfigurationTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/serde/ValueDeserializerConfigurationTest.java @@ -62,7 +62,8 @@ public void testMissingValueDeserializerInConfig() { String group = UUID.randomUUID().toString(); assertThatThrownBy(() -> { source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); }).isInstanceOf(IllegalArgumentException.class).hasMessageContaining("value.deserializer"); @@ -73,7 +74,8 @@ public void testValueDeserializationFailureWhenNoDeserializerSet() { MapBasedConfig config = commonConsumerConfiguration(); String group = UUID.randomUUID().toString(); source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); @@ -113,7 +115,8 @@ public void testValueDeserializationFailureWithDeserializerSet() { .with("fail-on-deserialization-failure", false); String group = UUID.randomUUID().toString(); source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); @@ -153,7 +156,8 @@ public void testValueDeserializationFailureWithDeserializerSetWithFatalFailureOn .with("health-enabled", true); String group = UUID.randomUUID().toString(); source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); @@ -177,7 +181,8 @@ public void testThatUnderlyingDeserializerReceiveTheConfiguration() { .with("deserializer.value", "constant"); String group = UUID.randomUUID().toString(); source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); @@ -219,7 +224,8 @@ public void testValueDeserializationFailureWithMatchingHandler() { JsonObject fallback = new JsonObject().put("fallback", "fallback"); String group = UUID.randomUUID().toString(); source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), UnsatisfiedInstance.instance(), + commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, new SingletonInstance<>("my-deserialization-handler", new DeserializationFailureHandler() { @@ -290,7 +296,8 @@ public void testWhenBothValueAndKeyFailureHandlerAreSetToTheSameHandler() { JsonObject fallbackForKey = new JsonObject().put("fallback", "key"); String group = UUID.randomUUID().toString(); source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, new SingletonInstance<>("my-deserialization-handler", new DeserializationFailureHandler() { @@ -356,7 +363,8 @@ public void testValueDeserializationFailureWithMatchingHandlerReturningNull() { String group = UUID.randomUUID().toString(); source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, new SingletonInstance<>("my-deserialization-handler", new DeserializationFailureHandler() { @@ -408,7 +416,8 @@ public void testValueDeserializationFailureWithNoMatchingHandler() { String group = UUID.randomUUID().toString(); assertThatThrownBy(() -> { source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, new SingletonInstance<>("not-matching", new DeserializationFailureHandler() { @@ -452,7 +461,8 @@ public JsonObject handleDeserializationFailure(String topic, boolean isKey, Stri String group = UUID.randomUUID().toString(); assertThatThrownBy(() -> { source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, new DoubleInstance<>("my-deserialization-handler", i1, i2), -1); @@ -465,7 +475,8 @@ public void testWhenValueDeserializerFailsDuringConfig() { .with("value.deserializer", BrokenDeserializerFailingDuringConfig.class.getName()); String group = UUID.randomUUID().toString(); assertThatThrownBy(() -> source = new KafkaSource<>(vertx, group, - new KafkaConnectorIncomingConfiguration(config), commitHandlerFactories, failureHandlerFactories, + new KafkaConnectorIncomingConfiguration(config), + UnsatisfiedInstance.instance(), commitHandlerFactories, failureHandlerFactories, UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1)) .isInstanceOf(KafkaException.class) diff --git a/smallrye-reactive-messaging-otel/src/main/java/io/smallrye/reactive/messaging/tracing/TracingUtils.java b/smallrye-reactive-messaging-otel/src/main/java/io/smallrye/reactive/messaging/tracing/TracingUtils.java index 74bc791822..4e0303e380 100644 --- a/smallrye-reactive-messaging-otel/src/main/java/io/smallrye/reactive/messaging/tracing/TracingUtils.java +++ b/smallrye-reactive-messaging-otel/src/main/java/io/smallrye/reactive/messaging/tracing/TracingUtils.java @@ -2,8 +2,12 @@ import java.util.Optional; +import jakarta.enterprise.inject.Instance; + import org.eclipse.microprofile.reactive.messaging.Message; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; @@ -15,6 +19,13 @@ public class TracingUtils { private TracingUtils() { } + public static OpenTelemetry getOpenTelemetry(Instance openTelemetryInstance) { + if (openTelemetryInstance.isResolvable()) { + return openTelemetryInstance.get(); + } + return GlobalOpenTelemetry.get(); + } + public static void traceOutgoing(Instrumenter instrumenter, Message message, T trace) { Optional tracingMetadata = TracingMetadata.fromMessage(message); diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarConnector.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarConnector.java index df9fcbadd2..2b5db2b62d 100644 --- a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarConnector.java +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarConnector.java @@ -32,6 +32,7 @@ import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.messaging.spi.Connector; +import io.opentelemetry.api.OpenTelemetry; import io.smallrye.reactive.messaging.annotations.ConnectorAttribute; import io.smallrye.reactive.messaging.connector.InboundConnector; import io.smallrye.reactive.messaging.connector.OutboundConnector; @@ -94,6 +95,9 @@ public class PulsarConnector implements InboundConnector, OutboundConnector, Hea @Any private Instance failureHandlerFactories; + @Inject + private Instance openTelemetryInstance; + @PostConstruct void init() { this.vertx = executionHolder.vertx(); @@ -111,7 +115,7 @@ public Flow.Publisher> getPublisher(Config config) { PulsarIncomingChannel channel = new PulsarIncomingChannel<>(client, vertx, schemaResolver.getSchema(ic), CDIUtils.getInstanceById(ackHandlerFactories, ic.getAckStrategy()).get(), CDIUtils.getInstanceById(failureHandlerFactories, ic.getFailureStrategy()).get(), - ic, configResolver); + ic, configResolver, openTelemetryInstance); incomingChannels.add(channel); return channel.getPublisher(); } catch (PulsarClientException e) { @@ -129,7 +133,7 @@ public Flow.Subscriber> getSubscriber(Config config) { try { PulsarOutgoingChannel channel = new PulsarOutgoingChannel<>(client, schemaResolver.getSchema(oc), oc, - configResolver); + configResolver, openTelemetryInstance); outgoingChannels.add(channel); return channel.getSubscriber(); } catch (PulsarClientException e) { diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingChannel.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingChannel.java index 2da71b1516..4e3764ea7c 100644 --- a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingChannel.java +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingChannel.java @@ -10,6 +10,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; +import jakarta.enterprise.inject.Instance; + import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; @@ -19,19 +21,11 @@ import org.apache.pulsar.client.impl.schema.generic.GenericProtobufNativeSchema; import org.eclipse.microprofile.reactive.messaging.Message; -import io.opentelemetry.api.GlobalOpenTelemetry; -import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; -import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; +import io.opentelemetry.api.OpenTelemetry; import io.smallrye.mutiny.Multi; import io.smallrye.reactive.messaging.health.HealthReport; -import io.smallrye.reactive.messaging.pulsar.tracing.PulsarAttributesExtractor; +import io.smallrye.reactive.messaging.pulsar.tracing.PulsarOpenTelemetryInstrumenter; import io.smallrye.reactive.messaging.pulsar.tracing.PulsarTrace; -import io.smallrye.reactive.messaging.pulsar.tracing.PulsarTraceTextMapGetter; -import io.smallrye.reactive.messaging.tracing.TracingUtils; import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.VertxInternal; import io.vertx.mutiny.core.Vertx; @@ -57,13 +51,14 @@ public class PulsarIncomingChannel { private final boolean tracingEnabled; - private final Instrumenter instrumenter; + private final PulsarOpenTelemetryInstrumenter instrumenter; public PulsarIncomingChannel(PulsarClient client, Vertx vertx, Schema schema, PulsarAckHandler.Factory ackHandlerFactory, PulsarFailureHandler.Factory failureHandlerFactory, PulsarConnectorIncomingConfiguration ic, - ConfigResolver configResolver) throws PulsarClientException { + ConfigResolver configResolver, + Instance openTelemetryInstance) throws PulsarClientException { this.channel = ic.getChannel(); this.healthEnabled = ic.getHealthEnabled(); this.tracingEnabled = ic.getTracingEnabled(); @@ -159,18 +154,11 @@ public PulsarIncomingChannel(PulsarClient client, Vertx vertx, Schema schema, this.publisher = batchReceiveMulti; } - PulsarAttributesExtractor attributesExtractor = new PulsarAttributesExtractor(); - MessagingAttributesGetter messagingAttributesGetter = attributesExtractor - .getMessagingAttributesGetter(); - InstrumenterBuilder instrumenterBuilder = Instrumenter.builder(GlobalOpenTelemetry.get(), - "io.smallrye.reactive.messaging", - MessagingSpanNameExtractor.create(messagingAttributesGetter, MessageOperation.RECEIVE)); - - instrumenter = instrumenterBuilder - .addAttributesExtractor( - MessagingAttributesExtractor.create(messagingAttributesGetter, MessageOperation.RECEIVE)) - .addAttributesExtractor(attributesExtractor) - .buildConsumerInstrumenter(PulsarTraceTextMapGetter.INSTANCE); + if (tracingEnabled) { + instrumenter = PulsarOpenTelemetryInstrumenter.createForSource(openTelemetryInstance); + } else { + instrumenter = null; + } } private static boolean schemaRequiresBlockingFetch(Schema schema) { @@ -183,7 +171,7 @@ private static boolean schemaRequiresBlockingFetch(Schema schema) { public void incomingTrace(PulsarMessage pulsarMessage) { PulsarIncomingMessageMetadata metadata = pulsarMessage.getMetadata(PulsarIncomingMessageMetadata.class).get(); - TracingUtils.traceIncoming(instrumenter, pulsarMessage, new PulsarTrace.Builder() + instrumenter.traceIncoming(pulsarMessage, new PulsarTrace.Builder() .withConsumerName(consumer.getConsumerName()) .withMessage(metadata.getMessage()) .build()); diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarOutgoingChannel.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarOutgoingChannel.java index 4d049ce597..8a3c3513f3 100644 --- a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarOutgoingChannel.java +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarOutgoingChannel.java @@ -9,6 +9,8 @@ import java.util.concurrent.Flow; import java.util.stream.Collectors; +import jakarta.enterprise.inject.Instance; + import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; @@ -19,23 +21,15 @@ import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.eclipse.microprofile.reactive.messaging.Message; -import io.opentelemetry.api.GlobalOpenTelemetry; -import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; -import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; +import io.opentelemetry.api.OpenTelemetry; import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.OutgoingMessageMetadata; import io.smallrye.reactive.messaging.health.HealthReport; import io.smallrye.reactive.messaging.providers.helpers.MultiUtils; import io.smallrye.reactive.messaging.providers.helpers.SenderProcessor; -import io.smallrye.reactive.messaging.pulsar.tracing.PulsarAttributesExtractor; +import io.smallrye.reactive.messaging.pulsar.tracing.PulsarOpenTelemetryInstrumenter; import io.smallrye.reactive.messaging.pulsar.tracing.PulsarTrace; -import io.smallrye.reactive.messaging.pulsar.tracing.PulsarTraceTextMapSetter; import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactionMetadata; -import io.smallrye.reactive.messaging.tracing.TracingUtils; public class PulsarOutgoingChannel { @@ -46,10 +40,10 @@ public class PulsarOutgoingChannel { private final boolean healthEnabled; private final List failures = new ArrayList<>(); private final boolean tracingEnabled; - private final Instrumenter instrumenter; + private final PulsarOpenTelemetryInstrumenter instrumenter; public PulsarOutgoingChannel(PulsarClient client, Schema schema, PulsarConnectorOutgoingConfiguration oc, - ConfigResolver configResolver) throws PulsarClientException { + ConfigResolver configResolver, Instance openTelemetryInstance) throws PulsarClientException { this.channel = oc.getChannel(); this.healthEnabled = oc.getHealthEnabled(); this.tracingEnabled = oc.getTracingEnabled(); @@ -85,18 +79,11 @@ public PulsarOutgoingChannel(PulsarClient client, Schema schema, PulsarConnec reportFailure(f); })); - PulsarAttributesExtractor attributesExtractor = new PulsarAttributesExtractor(); - MessagingAttributesGetter messagingAttributesGetter = attributesExtractor - .getMessagingAttributesGetter(); - InstrumenterBuilder instrumenterBuilder = Instrumenter.builder(GlobalOpenTelemetry.get(), - "io.smallrye.reactive.messaging", - MessagingSpanNameExtractor.create(messagingAttributesGetter, MessageOperation.PUBLISH)); - - instrumenter = instrumenterBuilder - .addAttributesExtractor( - MessagingAttributesExtractor.create(messagingAttributesGetter, MessageOperation.PUBLISH)) - .addAttributesExtractor(attributesExtractor) - .buildProducerInstrumenter(PulsarTraceTextMapSetter.INSTANCE); + if (tracingEnabled) { + instrumenter = PulsarOpenTelemetryInstrumenter.createForSink(openTelemetryInstance); + } else { + instrumenter = null; + } } private Uni sendMessage(Message message) { @@ -133,7 +120,7 @@ private TypedMessageBuilder toMessageBuilder(Message message, Producer .withTopic(producer.getTopic()) .build(); properties = trace.getProperties(); - TracingUtils.traceOutgoing(instrumenter, message, trace); + instrumenter.traceOutgoing(message, trace); } messageBuilder = createMessageBuilder(message, metadata.getTransaction()); @@ -171,7 +158,7 @@ private TypedMessageBuilder toMessageBuilder(Message message, Producer PulsarTrace trace = new PulsarTrace.Builder() .withTopic(producer.getTopic()) .build(); - TracingUtils.traceOutgoing(instrumenter, message, trace); + instrumenter.traceOutgoing(message, trace); messageBuilder.properties(trace.getProperties()); } } diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/tracing/PulsarOpenTelemetryInstrumenter.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/tracing/PulsarOpenTelemetryInstrumenter.java new file mode 100644 index 0000000000..9c4df7dd5a --- /dev/null +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/tracing/PulsarOpenTelemetryInstrumenter.java @@ -0,0 +1,59 @@ +package io.smallrye.reactive.messaging.pulsar.tracing; + +import jakarta.enterprise.inject.Instance; + +import org.eclipse.microprofile.reactive.messaging.Message; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; +import io.smallrye.reactive.messaging.tracing.TracingUtils; + +public class PulsarOpenTelemetryInstrumenter { + private final Instrumenter instrumenter; + + public PulsarOpenTelemetryInstrumenter(Instrumenter instrumenter) { + this.instrumenter = instrumenter; + } + + public static PulsarOpenTelemetryInstrumenter createForSource(Instance openTelemetryInstance) { + return create(TracingUtils.getOpenTelemetry(openTelemetryInstance), true); + } + + public static PulsarOpenTelemetryInstrumenter createForSink(Instance openTelemetryInstance) { + return create(TracingUtils.getOpenTelemetry(openTelemetryInstance), false); + } + + private static PulsarOpenTelemetryInstrumenter create(OpenTelemetry openTelemetry, boolean source) { + + MessageOperation messageOperation = source ? MessageOperation.RECEIVE : MessageOperation.PUBLISH; + + PulsarAttributesExtractor attributesExtractor = new PulsarAttributesExtractor(); + MessagingAttributesGetter messagingAttributesGetter = attributesExtractor + .getMessagingAttributesGetter(); + InstrumenterBuilder builder = Instrumenter.builder( + openTelemetry, "io.smallrye.reactive.messaging", + MessagingSpanNameExtractor.create(messagingAttributesGetter, messageOperation)); + + builder.addAttributesExtractor(MessagingAttributesExtractor.create(messagingAttributesGetter, messageOperation)) + .addAttributesExtractor(attributesExtractor); + + if (source) { + return new PulsarOpenTelemetryInstrumenter(builder.buildConsumerInstrumenter(PulsarTraceTextMapGetter.INSTANCE)); + } else { + return new PulsarOpenTelemetryInstrumenter(builder.buildProducerInstrumenter(PulsarTraceTextMapSetter.INSTANCE)); + } + } + + public void traceOutgoing(Message message, PulsarTrace trace) { + TracingUtils.traceOutgoing(instrumenter, message, trace); + } + + public void traceIncoming(Message message, PulsarTrace trace) { + TracingUtils.traceIncoming(instrumenter, message, trace); + } +} diff --git a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/IncomingMetadataTest.java b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/IncomingMetadataTest.java index 7938bfcd0c..08196aa8e5 100644 --- a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/IncomingMetadataTest.java +++ b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/IncomingMetadataTest.java @@ -14,6 +14,7 @@ import io.smallrye.mutiny.Multi; import io.smallrye.reactive.messaging.pulsar.ack.PulsarMessageAck; import io.smallrye.reactive.messaging.pulsar.base.PulsarBaseTest; +import io.smallrye.reactive.messaging.pulsar.base.UnsatisfiedInstance; import io.smallrye.reactive.messaging.pulsar.fault.PulsarNack; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; @@ -26,7 +27,7 @@ void testIncomingMetadata() throws PulsarClientException { PulsarConnectorIncomingConfiguration ic = new PulsarConnectorIncomingConfiguration(config()); PulsarIncomingChannel channel = new PulsarIncomingChannel<>(client, vertx, Schema.INT32, new PulsarMessageAck.Factory(), new PulsarNack.Factory(), - ic, configResolver); + ic, configResolver, UnsatisfiedInstance.instance()); Multi.createFrom().publisher(channel.getPublisher()) .subscribe().with(messages::add); diff --git a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/OutgoingMetadataTest.java b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/OutgoingMetadataTest.java index b9f7f79c00..4903a66f59 100644 --- a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/OutgoingMetadataTest.java +++ b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/OutgoingMetadataTest.java @@ -12,10 +12,10 @@ import org.apache.pulsar.client.api.Schema; import org.eclipse.microprofile.reactive.messaging.Message; import org.junit.jupiter.api.Test; -import org.reactivestreams.Subscriber; import io.smallrye.mutiny.Multi; import io.smallrye.reactive.messaging.pulsar.base.PulsarBaseTest; +import io.smallrye.reactive.messaging.pulsar.base.UnsatisfiedInstance; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; public class OutgoingMetadataTest extends PulsarBaseTest { @@ -26,7 +26,8 @@ void testOutgoingMetadata() throws PulsarClientException { PulsarConnectorOutgoingConfiguration oc = new PulsarConnectorOutgoingConfiguration(config() .with("maxPendingMessagesAcrossPartitions", 10)); - PulsarOutgoingChannel channel = new PulsarOutgoingChannel<>(client, Schema.STRING, oc, configResolver); + PulsarOutgoingChannel channel = new PulsarOutgoingChannel<>(client, Schema.STRING, oc, configResolver, + UnsatisfiedInstance.instance()); Flow.Subscriber> subscriber = channel.getSubscriber(); @@ -64,7 +65,8 @@ void testOutgoingMessage() throws PulsarClientException { PulsarConnectorOutgoingConfiguration oc = new PulsarConnectorOutgoingConfiguration(config() .with("maxPendingMessagesAcrossPartitions", 10)); - PulsarOutgoingChannel channel = new PulsarOutgoingChannel<>(client, Schema.STRING, oc, configResolver); + PulsarOutgoingChannel channel = new PulsarOutgoingChannel<>(client, Schema.STRING, oc, configResolver, + UnsatisfiedInstance.instance()); Flow.Subscriber> subscriber = channel.getSubscriber(); diff --git a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingChannelTest.java b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingChannelTest.java index 27e35e160d..0a5a9d9617 100644 --- a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingChannelTest.java +++ b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingChannelTest.java @@ -17,6 +17,7 @@ import io.smallrye.mutiny.Multi; import io.smallrye.reactive.messaging.pulsar.ack.PulsarMessageAck; import io.smallrye.reactive.messaging.pulsar.base.PulsarBaseTest; +import io.smallrye.reactive.messaging.pulsar.base.UnsatisfiedInstance; import io.smallrye.reactive.messaging.pulsar.fault.PulsarNack; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; @@ -30,7 +31,8 @@ void testIncomingChannel() throws PulsarClientException { PulsarConnectorIncomingConfiguration ic = new PulsarConnectorIncomingConfiguration(config()); PulsarIncomingChannel channel = new PulsarIncomingChannel<>(client, vertx, Schema.JSON(Person.class), - new PulsarMessageAck.Factory(), new PulsarNack.Factory(), ic, configResolver); + new PulsarMessageAck.Factory(), new PulsarNack.Factory(), ic, configResolver, + UnsatisfiedInstance.instance()); Multi.createFrom().publisher(channel.getPublisher()) .subscribe().with(messages::add); diff --git a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/PulsarOutgoingChannelTest.java b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/PulsarOutgoingChannelTest.java index c59015a693..a2ba92fc89 100644 --- a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/PulsarOutgoingChannelTest.java +++ b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/PulsarOutgoingChannelTest.java @@ -10,10 +10,10 @@ import org.apache.pulsar.client.api.Schema; import org.eclipse.microprofile.reactive.messaging.Message; import org.junit.jupiter.api.Test; -import org.reactivestreams.Subscriber; import io.smallrye.mutiny.Multi; import io.smallrye.reactive.messaging.pulsar.base.PulsarBaseTest; +import io.smallrye.reactive.messaging.pulsar.base.UnsatisfiedInstance; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; public class PulsarOutgoingChannelTest extends PulsarBaseTest { @@ -26,7 +26,8 @@ void testOutgoingChannel() throws PulsarClientException { PulsarConnectorOutgoingConfiguration oc = new PulsarConnectorOutgoingConfiguration(config() .with("maxPendingMessagesAcrossPartitions", 10)); - PulsarOutgoingChannel channel = new PulsarOutgoingChannel<>(client, Schema.STRING, oc, configResolver); + PulsarOutgoingChannel channel = new PulsarOutgoingChannel<>(client, Schema.STRING, oc, configResolver, + UnsatisfiedInstance.instance()); Flow.Subscriber> subscriber = channel.getSubscriber(); @@ -50,7 +51,7 @@ void testOutgoingChannelJsonSchema() throws PulsarClientException { PulsarConnectorOutgoingConfiguration oc = new PulsarConnectorOutgoingConfiguration(config() .with("maxPendingMessagesAcrossPartitions", 10)); PulsarOutgoingChannel channel = new PulsarOutgoingChannel<>(client, Schema.JSON(Person.class), oc, - configResolver); + configResolver, UnsatisfiedInstance.instance()); Flow.Subscriber> subscriber = channel.getSubscriber(); diff --git a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/SubscriptionTypeTest.java b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/SubscriptionTypeTest.java index a103d89c45..b058689b93 100644 --- a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/SubscriptionTypeTest.java +++ b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/SubscriptionTypeTest.java @@ -26,6 +26,7 @@ import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.pulsar.ack.PulsarMessageAck; import io.smallrye.reactive.messaging.pulsar.base.PulsarBaseTest; +import io.smallrye.reactive.messaging.pulsar.base.UnsatisfiedInstance; import io.smallrye.reactive.messaging.pulsar.fault.PulsarNack; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; @@ -339,7 +340,8 @@ void testKeySharedSubscription() throws PulsarClientException { private PulsarIncomingChannel createChannel(MapBasedConfig config) throws PulsarClientException { PulsarConnectorIncomingConfiguration ic = new PulsarConnectorIncomingConfiguration(config); return new PulsarIncomingChannel<>(client, vertx, Schema.INT32, - new PulsarMessageAck.Factory(), new PulsarNack.Factory(), ic, configResolver); + new PulsarMessageAck.Factory(), new PulsarNack.Factory(), ic, configResolver, + UnsatisfiedInstance.instance()); } MapBasedConfig config() { diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java index cccdb95a22..23a76e9d95 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java @@ -27,6 +27,7 @@ import com.rabbitmq.client.impl.CredentialsProvider; +import io.opentelemetry.api.OpenTelemetry; import io.smallrye.reactive.messaging.annotations.ConnectorAttribute; import io.smallrye.reactive.messaging.connector.InboundConnector; import io.smallrye.reactive.messaging.connector.OutboundConnector; @@ -160,6 +161,9 @@ public class RabbitMQConnector implements InboundConnector, OutboundConnector, H @Any Instance> configMaps; + @Inject + Instance openTelemetryInstance; + RabbitMQConnector() { // used for proxies } @@ -182,7 +186,7 @@ public class RabbitMQConnector implements InboundConnector, OutboundConnector, H @Override public Flow.Publisher> getPublisher(final Config config) { final RabbitMQConnectorIncomingConfiguration ic = new RabbitMQConnectorIncomingConfiguration(config); - IncomingRabbitMQChannel incoming = new IncomingRabbitMQChannel(this, ic); + IncomingRabbitMQChannel incoming = new IncomingRabbitMQChannel(this, ic, openTelemetryInstance); this.incomings.add(incoming); return incoming.getStream(); } @@ -204,7 +208,7 @@ public Flow.Publisher> getPublisher(final Config config) { @Override public Flow.Subscriber> getSubscriber(final Config config) { final RabbitMQConnectorOutgoingConfiguration oc = new RabbitMQConnectorOutgoingConfiguration(config); - OutgoingRabbitMQChannel outgoing = new OutgoingRabbitMQChannel(this, oc); + OutgoingRabbitMQChannel outgoing = new OutgoingRabbitMQChannel(this, oc, openTelemetryInstance); outgoings.add(outgoing); return outgoing.getSubscriber(); } diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java index 6689d4246e..4fd2bd8966 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java @@ -15,6 +15,7 @@ import com.rabbitmq.client.AMQP; +import io.opentelemetry.api.OpenTelemetry; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.tuples.Tuple2; @@ -49,9 +50,10 @@ public class IncomingRabbitMQChannel { private final RabbitMQConnector connector; public IncomingRabbitMQChannel(RabbitMQConnector connector, - RabbitMQConnectorIncomingConfiguration ic) { + RabbitMQConnectorIncomingConfiguration ic, Instance openTelemetryInstance) { if (ic.getTracingEnabled()) { - instrumenter = RabbitMQOpenTelemetryInstrumenter.createForConnector(); + instrumenter = RabbitMQOpenTelemetryInstrumenter + .createForConnector(openTelemetryInstance); } else { instrumenter = null; } diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/OutgoingRabbitMQChannel.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/OutgoingRabbitMQChannel.java index 50fdadbcba..50529ab74a 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/OutgoingRabbitMQChannel.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/OutgoingRabbitMQChannel.java @@ -6,8 +6,11 @@ import java.util.concurrent.Flow; +import jakarta.enterprise.inject.Instance; + import org.eclipse.microprofile.reactive.messaging.Message; +import io.opentelemetry.api.OpenTelemetry; import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.health.HealthReport; import io.smallrye.reactive.messaging.providers.helpers.MultiUtils; @@ -25,7 +28,8 @@ public class OutgoingRabbitMQChannel { private final ClientHolder holder; private volatile RabbitMQPublisher publisher; - public OutgoingRabbitMQChannel(RabbitMQConnector connector, RabbitMQConnectorOutgoingConfiguration oc) { + public OutgoingRabbitMQChannel(RabbitMQConnector connector, RabbitMQConnectorOutgoingConfiguration oc, + Instance openTelemetryInstance) { this.config = oc; // Create a client @@ -50,7 +54,7 @@ public OutgoingRabbitMQChannel(RabbitMQConnector connector, RabbitMQConnectorOut .onFailure().recoverWithNull().memoize().indefinitely(); // Set up a sender based on the publisher we established above - final RabbitMQMessageSender processor = new RabbitMQMessageSender(oc, getSender); + final RabbitMQMessageSender processor = new RabbitMQMessageSender(oc, getSender, openTelemetryInstance); // Return a SubscriberBuilder subscriber = MultiUtils.via(processor, m -> m.onFailure().invoke(t -> log.error(oc.getChannel(), t))); diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQMessageSender.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQMessageSender.java index 68ccd03a05..96fa2b1050 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQMessageSender.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQMessageSender.java @@ -11,8 +11,11 @@ import java.util.concurrent.Flow.Subscription; import java.util.concurrent.atomic.AtomicReference; +import jakarta.enterprise.inject.Instance; + import org.eclipse.microprofile.reactive.messaging.Message; +import io.opentelemetry.api.OpenTelemetry; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.helpers.Subscriptions; import io.smallrye.mutiny.tuples.Tuple2; @@ -51,8 +54,9 @@ public class RabbitMQMessageSender implements Processor, Message>, * @param retrieveSender the underlying Vert.x {@link RabbitMQPublisher} */ public RabbitMQMessageSender( - final RabbitMQConnectorOutgoingConfiguration oc, - final Uni retrieveSender) { + RabbitMQConnectorOutgoingConfiguration oc, + Uni retrieveSender, + Instance openTelemetryInstance) { this.retrieveSender = retrieveSender; this.configuration = oc; this.configuredExchange = getExchangeName(oc); @@ -70,7 +74,7 @@ public RabbitMQMessageSender( } if (oc.getTracingEnabled()) { - instrumenter = RabbitMQOpenTelemetryInstrumenter.createForSender(); + instrumenter = RabbitMQOpenTelemetryInstrumenter.createForSender(openTelemetryInstance); } else { instrumenter = null; } diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/tracing/RabbitMQOpenTelemetryInstrumenter.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/tracing/RabbitMQOpenTelemetryInstrumenter.java index bec2aa3704..a646a7bb6e 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/tracing/RabbitMQOpenTelemetryInstrumenter.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/tracing/RabbitMQOpenTelemetryInstrumenter.java @@ -1,8 +1,10 @@ package io.smallrye.reactive.messaging.rabbitmq.tracing; +import jakarta.enterprise.inject.Instance; + import org.eclipse.microprofile.reactive.messaging.Message; -import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; @@ -18,22 +20,21 @@ protected RabbitMQOpenTelemetryInstrumenter(Instrumenter in this.instrumenter = instrumenter; } - public static RabbitMQOpenTelemetryInstrumenter createForSender() { - return create(true); + public static RabbitMQOpenTelemetryInstrumenter createForSender(Instance openTelemetryInstance) { + return create(TracingUtils.getOpenTelemetry(openTelemetryInstance), true); } - public static RabbitMQOpenTelemetryInstrumenter createForConnector() { - return create(false); + public static RabbitMQOpenTelemetryInstrumenter createForConnector(Instance openTelemetryInstance) { + return create(TracingUtils.getOpenTelemetry(openTelemetryInstance), false); } - private static RabbitMQOpenTelemetryInstrumenter create(boolean sender) { + private static RabbitMQOpenTelemetryInstrumenter create(OpenTelemetry openTelemetry, boolean sender) { MessageOperation messageOperation = sender ? MessageOperation.PUBLISH : MessageOperation.RECEIVE; RabbitMQTraceAttributesExtractor rabbitMQAttributesExtractor = new RabbitMQTraceAttributesExtractor(); MessagingAttributesGetter messagingAttributesGetter = rabbitMQAttributesExtractor .getMessagingAttributesGetter(); - InstrumenterBuilder builder = Instrumenter.builder( - GlobalOpenTelemetry.get(), + InstrumenterBuilder builder = Instrumenter.builder(openTelemetry, "io.smallrye.reactive.messaging", MessagingSpanNameExtractor.create(messagingAttributesGetter, messageOperation));