From b0b608356da5b6b8df513330358f435bef693c6a Mon Sep 17 00:00:00 2001 From: edeweerd Date: Thu, 3 Oct 2024 14:14:50 +0200 Subject: [PATCH] fix(doc): Transform Processor CDI Decorator into manual decorators We noticed flackiness with QuarkusTest and usage of `Processor` decorators. It is generating randomly ClassNotFoundException. It is happening only with Quarkus 3.8 LTS, recent versions like 3.15 or older like 3.2 do not present the issue. The solution found is a fallback of sorts, by removing the usage of @Decorator in the processor decorators. Instead they are transformed in old-school, composition-design-pattern-inspired beans with a lombok delegate. The generic type signature is removed so they can be transformed in Dependent beans. Why's that? 1. A class with generics cannot be a bean, according to compilation errors, and 2. processors returned by the supplier need to be new instances everytime. Priorities are kept, and used to resolve in order the beans, for a manual encapsulation achieved with a for loop on the list of beans. And with that, the flackiness is gone. Of course, those changes will not be propagated to main and the future 3.15 branch, where this flackiness is not an issue, AND the usage of CDI's Decorator can be kept. The documentation is updated accordingly. Fixes #117 --- .../processor/AbstractProcessorDecorator.java | 55 ++++++ docs/modules/ROOT/pages/index.adoc | 56 +++++- .../impl/KStreamProcessorSupplier.java | 36 +++- .../processor/CdiRequestContextDecorator.java | 39 ++-- .../decorator/processor/DlqDecorator.java | 40 ++-- .../decorator/processor/MetricsDecorator.java | 24 +-- ...unctuatorDecorationProcessorDecorator.java | 22 +-- .../decorator/processor/RetryDecorator.java | 23 +-- .../decorator/processor/TracingDecorator.java | 45 ++--- .../CdiRequestContextDecoratorTest.java | 5 +- .../decorator/processor/DlqDecoratorTest.java | 8 +- .../processor/MetricsDecoratorTest.java | 5 +- ...uatorDecorationProcessorDecoratorTest.java | 5 +- .../processor/TracingDecoratorTest.java | 15 +- integration-tests/custom-serde/Readme.md | 18 ++ integration-tests/custom-serde/pom.xml | 184 ++++++++++++++++++ .../sample/customserde/CustomType.java | 33 ++++ .../CustomTypeConfigCustomizer.java | 46 +++++ .../customserde/CustomTypeDeserializer.java | 53 +++++ .../sample/customserde/CustomTypeSerde.java | 52 +++++ .../customserde/CustomTypeSerializer.java | 48 +++++ .../sample/customserde/HeaderDecorator.java | 47 +++++ .../sample/customserde/PingProcessor.java | 38 ++++ .../src/main/resources/application.properties | 5 + .../CustomTypeDeserializerTest.java | 42 ++++ .../customserde/CustomTypeSerializerTest.java | 42 ++++ .../customserde/PingProcessorQuarkusTest.java | 97 +++++++++ integration-tests/pom.xml | 1 + pom.xml | 62 +++--- 29 files changed, 965 insertions(+), 181 deletions(-) create mode 100644 api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/decorator/processor/AbstractProcessorDecorator.java create mode 100644 integration-tests/custom-serde/Readme.md create mode 100644 integration-tests/custom-serde/pom.xml create mode 100644 integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomType.java create mode 100644 integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeConfigCustomizer.java create mode 100644 integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeDeserializer.java create mode 100644 integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeSerde.java create mode 100644 integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeSerializer.java create mode 100644 integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/HeaderDecorator.java create mode 100644 integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/PingProcessor.java create mode 100644 integration-tests/custom-serde/src/main/resources/application.properties create mode 100644 integration-tests/custom-serde/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeDeserializerTest.java create mode 100644 integration-tests/custom-serde/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeSerializerTest.java create mode 100644 integration-tests/custom-serde/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/PingProcessorQuarkusTest.java diff --git a/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/decorator/processor/AbstractProcessorDecorator.java b/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/decorator/processor/AbstractProcessorDecorator.java new file mode 100644 index 0000000..fdc99b4 --- /dev/null +++ b/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/decorator/processor/AbstractProcessorDecorator.java @@ -0,0 +1,55 @@ +/*- + * #%L + * Quarkus Kafka Streams Processor + * %% + * Copyright (C) 2024 Amadeus s.a.s. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package io.quarkiverse.kafkastreamsprocessor.api.decorator.processor; + +import org.apache.kafka.streams.processor.api.Processor; + +import lombok.Getter; +import lombok.Setter; +import lombok.experimental.Delegate; + +/** + * Base class for all processor decorators. + *

+ * If a decorator does not implement this abstract class, it will not be found by the + * KafkaClientSuppliedDecorator for composition. + *

+ *

+ * We remove the generic declaration from {@link Processor} because ArC complains about generics on class declaration of + * a bean. + *

+ *

+ * Class introduced in 2.0, for compatibility with Quarkus 3.8 random failure to start when using custom processor + * decorators. + *

+ * + * @deprecated It will be removed in 3.0, with the integration of Quarkus 3.15 where we will be able to go back to pure + * CDI decorators. + */ +@Deprecated(forRemoval = true, since = "2.0") +public abstract class AbstractProcessorDecorator implements Processor { + /** + * The decorated processor, holding either the next decorator layer or the final processor. + */ + @Delegate + @Getter + @Setter + private Processor delegate; +} diff --git a/docs/modules/ROOT/pages/index.adoc b/docs/modules/ROOT/pages/index.adoc index c515a81..3a1971e 100644 --- a/docs/modules/ROOT/pages/index.adoc +++ b/docs/modules/ROOT/pages/index.adoc @@ -309,6 +309,7 @@ public class PojoProcessor extends ContextualProcessor Your Processor is declared with the annotation as for a regular processor. <2> The handled value type, in this example, is a simple POJO, nothing fancy. <3> Same POJO value in the _process()_ method. @@ -423,6 +424,7 @@ public class PingerService { } } ---- + <1> Define the method to retry with `org.eclipse.microprofile.faulttolerance.Retry` annotation .application.properties @@ -484,7 +486,6 @@ This list includes the additional metrics, on top of the Kafka Streams and the g | The number of times a Punctuator's execution failed with an exception since the start of the microservice. |=== - .Dead Letter Queue Metrics [options="header",cols="30%,20%,40%"] |=== @@ -501,7 +502,6 @@ This list includes the additional metrics, on top of the Kafka Streams and the g | The number of messages sent to global DLQ. |=== - == A comparison between Reactive Messaging Kafka and Kafka Streams These two technologies can be used to create streaming microservices to be used in Event-Driven architecture applications. @@ -536,6 +536,7 @@ The purpose of increasing concurrency is to be able to cope with streaming micro return api.remoteCall(); } ---- + <1> `@Incoming` is declaring this method as a subscriber for the channel named `ping-events` <2> `@Outgoing` is declaring this method as a producer for the channel named `pong-events` <3> `@io.smallrye.reactive.messaging.annotations.Blocking` Indicates that this method is running out of the processing thread, inside a worker thread and the order of the messages is not important. @@ -640,6 +641,7 @@ public class PingProcessor extends ContextualProcessor Your Processor is declared with the annotation as for a regular processor. <2> The definition and initialization of your state store. @@ -688,9 +690,9 @@ The extension proposes some capabilities to customize more finely the behaviour === Processor decorator The following decoration layer is already extensively used in this extension's source code and allows to use composition around the main processor class you have to define. -Example of a new decorator: +Depending on the version of Quarkus you are using, the pattern differs: -.ProcessorDecorator.java +.ProcessorDecorator.java with Quarkus 3.2 or 3.11.0+ [source,java] ---- @Decorator // <1> @@ -719,6 +721,7 @@ public class ProcessorDecorator implements Processor Decorator annotation to profit from the {cdi-spec}/cdi-spec.html#decorators[decorator] feature of CDI <2> Force the instantiation of the decorator with the Priority annotation. Indeed, otherwise the decorator is not taken into account by Quarkus. @@ -759,23 +762,54 @@ The priority is to be set based on the priorities of the existing decorators whi ---- <3> The decorator should have the same generics declaration `` as the `Processor` interface that it implements <4> Delegate reference to use when decorating methods. -It is annotated with lombok's https://projectlombok.org/features/experimental/Delegate[Delegate] annotation to generate -passthrough decorated methods that this Decorator class won't decorate. +It is annotated with lombok's https://projectlombok.org/features/experimental/Delegate[Delegate] annotation to generate passthrough decorated methods that this Decorator class won't decorate. The selection is done through a blacklist of method signatures gathered in a private `Excludes` interface declared at the end of the class. <5> Injection constructor which must have a delegate argument annotated with the `Delegate` annotation from CDI. You can also, as a regular CDI bean, inject any another CDI bean reference to be used in this decorator. <6> Example of decorated method, here the main `process` method of `Processor` API of Kafka Streams. -Such a decorator will automatically been taken into account by CDI through the combination of `Decorator` and `Priority` annotations. +.ProcessorDecorator.java for Quarkus 3.8 -> 3.10 +[source,java] +---- +@Dependent // <1> +@Priority(150) // <2> +public class ProcessorDecorator extends AbstractProcessorDecorator { // <3> + @Override + public void process(Record record) { // <4> + // use bean before + getDelegate().process(record); + // use bean after + } +} +---- + +<1> We have to mark the bean `Dependent` so it is instantiated at every use. +Indeed, `KStreamProcessorSupplier` needs to return a new `Processor` instance everytime it is called, by Kafka Streams' specification. +<2> We add a `Priority`, with same pattern as a CDI decorator. +<3> We remove the generic types from the class signature, because CDI does not like generics in beans. +<4> Example of override of process method and call to underlying decorator. + +Such a decorator will automatically been taken into account by CDI. The priority will control at which point your decorator will be called among all other decorators. +[CAUTION] +==== +We noticed with a new integration-test that is using a custom serde, that usage of custom CDI `Decorator` is causing microservices to randomly crash at startup. +This happens for specific versions of Quarkus. +Known impacted versions are 3.8.x, 3.9.x and 3.10.x. +The 3.2 LTS and upcoming 3.15 LTS versions do not suffer from this symptom. +The **only** solution found was to remove usage of `@Decorator` for `Processor` decorators for microservices based on Quarkus 3.8 LTS. +This change will be reverted in quarkus-kafka-streams-processor 3.0. +This is probably the https://github.com/quarkusio/quarkus/pull/41258[PR] on Quarkus side that has fixed the issue in Quarkus 3.11. +==== + === Producer interceptor -Kafka Streams already has the notion of a https://kafka.apache.org/36/javadoc/org/apache/kafka/clients/producer/ProducerInterceptor.html[ProducerInterceptor]. +Kafka Streams already has the concept of a https://kafka.apache.org/36/javadoc/org/apache/kafka/clients/producer/ProducerInterceptor.html[ProducerInterceptor]. But as the rest of Kafka Streams SPI, it is based on a class name and a default constructor for instantiation. It does not support CDI resolution. -This is why this extension's API defines a `ProducerOnSendInterceptor` interface that is instrumentated through CDI. +This is why this extension's API defines a `ProducerOnSendInterceptor` interface that is instrumented through CDI. Example of usage: .MyProducerInterceptor.java @@ -796,6 +830,7 @@ public class HeaderAddingProducerInterceptor implements ProducerOnSendIntercepto } } ---- + <1> Producer interceptors are discovered by CDI by the `ApplicationScoped` annotation <2> The interceptor class should extend `ProducerOnSendInterceptor`. `ProducerOnSendInterceptor` extends `ProducerInterceptor` and overrides some of its methods with default implementations to exempt their forced implementations further down the line. @@ -847,6 +882,7 @@ public class CdiRequestContextPunctuatorDecorator implements DecoratedPunctuator } } ---- + <1> Decorator annotation to profit from the {cdi-spec}/cdi-spec.html#decorators[decorator] feature of CDI <2> Force the instantiation of the decorator with the Priority annotation. Indeed, otherwise the decorator is not taken into account by Quarkus. @@ -883,4 +919,4 @@ include::includes/kafka-streams-processor-configuration-keys.adoc[] == Configuration from other extension -include::includes/quarkus-other-extension-configurations.adoc[] \ No newline at end of file +include::includes/quarkus-other-extension-configurations.adoc[] diff --git a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/KStreamProcessorSupplier.java b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/KStreamProcessorSupplier.java index ec09ab4..f3af0b7 100644 --- a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/KStreamProcessorSupplier.java +++ b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/KStreamProcessorSupplier.java @@ -20,15 +20,16 @@ package io.quarkiverse.kafkastreamsprocessor.impl; import java.lang.annotation.Annotation; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Optional; -import java.util.Set; import java.util.stream.Collectors; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.context.Dependent; import jakarta.enterprise.context.RequestScoped; import jakarta.enterprise.inject.Instance; -import jakarta.enterprise.inject.spi.Bean; import jakarta.enterprise.inject.spi.BeanManager; import jakarta.inject.Inject; import jakarta.inject.Singleton; @@ -36,6 +37,7 @@ import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.AbstractProcessorDecorator; import lombok.extern.slf4j.Slf4j; /** @@ -60,6 +62,8 @@ public class KStreamProcessorSupplier implements Processor */ private final Instance> adapterInstances; + private final Instance processorDecorators; + /** * Injection constructor. * @@ -76,17 +80,20 @@ public class KStreamProcessorSupplier implements Processor @Inject public KStreamProcessorSupplier(Instance> kafka3BeanInstances, Instance> beanInstances, - Instance> adapterInstances, BeanManager beanManager) { + Instance> adapterInstances, BeanManager beanManager, + Instance processorDecorators) { this.kafka3BeanInstances = kafka3BeanInstances; this.beanInstances = beanInstances; this.adapterInstances = adapterInstances; + this.processorDecorators = processorDecorators; + + List processorDecoratorNames = new ArrayList<>(processorDecorators.stream() + .map(Object::getClass) + .map(Class::getName) + .collect(Collectors.toUnmodifiableList())); + Collections.reverse(processorDecoratorNames); + log.info("Configured Processor decorators are in order: {}", String.join(", ", processorDecoratorNames)); - log.info("Configured Processor decorators are in order: {}", - beanManager.resolveDecorators(Set.of(Processor.class)) - .stream() - .map(Bean::getBeanClass) - .map(Class::getName) - .collect(Collectors.joining(", "))); } /** @@ -131,7 +138,16 @@ public Processor get() { "Processors cannot have a scope other than @Dependant, since KafkaStreams implementation classes are not thread-safe"); } - return (Processor) processor; + return wrapProcessor((Processor) processor); + } + + private Processor wrapProcessor(Processor processor) { + Processor wrappedProcessor = processor; + for (AbstractProcessorDecorator decorator : processorDecorators) { + decorator.setDelegate(wrappedProcessor); + wrappedProcessor = decorator; + } + return wrappedProcessor; } private static boolean hasAnnotation(Object bean, Class annotation) { diff --git a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecorator.java b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecorator.java index fc92484..7781526 100644 --- a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecorator.java +++ b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecorator.java @@ -20,17 +20,16 @@ package io.quarkiverse.kafkastreamsprocessor.impl.decorator.processor; import jakarta.annotation.Priority; -import jakarta.decorator.Decorator; +import jakarta.enterprise.context.Dependent; import jakarta.inject.Inject; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Record; +import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.AbstractProcessorDecorator; import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.ProcessorDecoratorPriorities; import io.quarkus.arc.Arc; import io.quarkus.arc.ArcContainer; -import lombok.AccessLevel; -import lombok.RequiredArgsConstructor; /** * This class is responsible to manage the lifecycle of {@link jakarta.enterprise.context.RequestScoped} beans. It @@ -41,16 +40,11 @@ *

* Warning: "Quarkus Tests" Junit extension is already managing the request scope on its own. */ -@Decorator +//@Decorator +@Dependent @Priority(ProcessorDecoratorPriorities.CDI_REQUEST_SCOPE) -@RequiredArgsConstructor(access = AccessLevel.MODULE) -public class CdiRequestContextDecorator implements Processor { - /** - * Injection point for composition - */ - @lombok.experimental.Delegate(excludes = Excludes.class) - private final Processor delegate; - +//@RequiredArgsConstructor(access = AccessLevel.MODULE) +public class CdiRequestContextDecorator extends AbstractProcessorDecorator { /** * The container object from Arc to inquire on request contextualization availability and activation */ @@ -58,13 +52,14 @@ public class CdiRequestContextDecorator implements Process /** * Constructor for injection of the delegate. - * - * @param delegate - * injection point for composition */ @Inject - public CdiRequestContextDecorator(@jakarta.decorator.Delegate Processor delegate) { - this(delegate, Arc.container()); + public CdiRequestContextDecorator() { + this(Arc.container()); + } + + public CdiRequestContextDecorator(ArcContainer container) { + this.container = container; } /** @@ -74,20 +69,16 @@ public CdiRequestContextDecorator(@jakarta.decorator.Delegate Processor record) { + public void process(Record record) { if (container.requestContext().isActive()) { - delegate.process(record); + getDelegate().process(record); } else { container.requestContext().activate(); try { - delegate.process(record); + getDelegate().process(record); } finally { container.requestContext().terminate(); } } } - - private interface Excludes { - void process(Record record); - } } diff --git a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/DlqDecorator.java b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/DlqDecorator.java index a13b62d..f3a6eaf 100644 --- a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/DlqDecorator.java +++ b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/DlqDecorator.java @@ -23,8 +23,8 @@ import java.util.Set; import jakarta.annotation.Priority; -import jakarta.decorator.Decorator; import jakarta.decorator.Delegate; +import jakarta.enterprise.context.Dependent; import jakarta.inject.Inject; import org.apache.kafka.common.KafkaException; @@ -37,6 +37,7 @@ import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.AbstractProcessorDecorator; import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.ProcessorDecoratorPriorities; import io.quarkiverse.kafkastreamsprocessor.impl.TopologyProducer; import io.quarkiverse.kafkastreamsprocessor.impl.errors.DlqMetadataHandler; @@ -53,14 +54,10 @@ * Uses a dead-letter sink from the topology, rather than a raw producer, to benefit from the same KStreams guarantees * (at least once / exactly once). */ -@Decorator +//@Decorator @Priority(ProcessorDecoratorPriorities.DLQ) -public class DlqDecorator implements Processor { - /** - * Inject point for composition - */ - @lombok.experimental.Delegate(excludes = Excludes.class) - private final Processor delegate; +@Dependent +public class DlqDecorator extends AbstractProcessorDecorator { /** * A set of sink names that are involved in the business logic. @@ -89,11 +86,10 @@ public class DlqDecorator implements Processor context; + private ProcessorContext context; - DlqDecorator(Processor delegate, Set functionalSinks, DlqMetadataHandler dlqMetadataHandler, + DlqDecorator(Set functionalSinks, DlqMetadataHandler dlqMetadataHandler, KafkaStreamsProcessorMetrics metrics, boolean activated) { - this.delegate = delegate; this.functionalSinks = functionalSinks; this.dlqMetadataHandler = dlqMetadataHandler; this.metrics = metrics; @@ -103,8 +99,6 @@ public class DlqDecorator implements Processor implements Processor delegate, + public DlqDecorator( SinkToTopicMappingBuilder sinkToTopicMappingBuilder, DlqMetadataHandler dlqMetadataHandler, KafkaStreamsProcessorMetrics metrics, KStreamsProcessorConfig kStreamsProcessorConfig) { // NOSONAR Optional with microprofile-config - this(delegate, sinkToTopicMappingBuilder.sinkToTopicMapping().keySet(), dlqMetadataHandler, metrics, + this(sinkToTopicMappingBuilder.sinkToTopicMapping().keySet(), dlqMetadataHandler, metrics, ErrorHandlingStrategy.shouldSendToDlq(kStreamsProcessorConfig.errorStrategy(), kStreamsProcessorConfig.dlq().topic())); } @@ -135,12 +129,12 @@ public DlqDecorator(@Delegate Processor delegate, * {@inheritDoc} */ @Override - public void init(final ProcessorContext context) { + public void init(final ProcessorContext context) { if (activated) { - this.context = new DlqProcessorContextDecorator<>((InternalProcessorContext) context, functionalSinks); - delegate.init(this.context); + this.context = new DlqProcessorContextDecorator<>((InternalProcessorContext) context, functionalSinks); + getDelegate().init(this.context); } else { - delegate.init(context); + getDelegate().init(context); } } @@ -154,10 +148,10 @@ public void init(final ProcessorContext context) { * {@inheritDoc} */ @Override - public void process(Record record) { + public void process(Record record) { if (activated) { try { - delegate.process(record); + getDelegate().process(record); } catch (KafkaException e) { // Do not forward to DLQ throw e; @@ -166,14 +160,14 @@ public void process(Record record) { if (recordMetadata.isPresent()) { dlqMetadataHandler.addMetadata(record.headers(), recordMetadata.get().topic(), recordMetadata.get().partition(), e); - context.forward((Record) record, TopologyProducer.DLQ_SINK_NAME); + context.forward(record, TopologyProducer.DLQ_SINK_NAME); // Re-throw so the exception gets logged metrics.microserviceDlqSentCounter().increment(); throw e; } } } else { - delegate.process(record); + getDelegate().process(record); } } diff --git a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/MetricsDecorator.java b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/MetricsDecorator.java index 0e73b00..3f44f62 100644 --- a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/MetricsDecorator.java +++ b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/MetricsDecorator.java @@ -21,27 +21,23 @@ import jakarta.annotation.Priority; import jakarta.decorator.Decorator; -import jakarta.decorator.Delegate; +import jakarta.enterprise.context.Dependent; import jakarta.inject.Inject; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Record; +import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.AbstractProcessorDecorator; import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.ProcessorDecoratorPriorities; import io.quarkiverse.kafkastreamsprocessor.impl.metrics.KafkaStreamsProcessorMetrics; /** * Decorator to enrich Kafka Streams metrics with a counter of exception raised by {@link Processor#process(Record)}. */ -@Decorator +//@Decorator @Priority(ProcessorDecoratorPriorities.METRICS) -public class MetricsDecorator implements Processor { - /** - * Injection point for composition. - */ - @lombok.experimental.Delegate(excludes = Excludes.class) - private final Processor delegate; - +@Dependent +public class MetricsDecorator extends AbstractProcessorDecorator { /** * Counter of exception raised by {@link Processor#process(Record)}. */ @@ -50,15 +46,11 @@ public class MetricsDecorator implements Processor delegate, - KafkaStreamsProcessorMetrics metrics) { - this.delegate = delegate; + public MetricsDecorator(KafkaStreamsProcessorMetrics metrics) { this.metrics = metrics; } @@ -70,9 +62,9 @@ public MetricsDecorator(@Delegate Processor delegate, * {@inheritDoc} */ @Override - public void process(Record record) { + public void process(Record record) { try { - delegate.process(record); + getDelegate().process(record); } catch (Exception e) { // NOSONAR: Catching any error metrics.processorErrorCounter().increment(); throw e; diff --git a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/PunctuatorDecorationProcessorDecorator.java b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/PunctuatorDecorationProcessorDecorator.java index 5e1c1e0..cc4fa6c 100644 --- a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/PunctuatorDecorationProcessorDecorator.java +++ b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/PunctuatorDecorationProcessorDecorator.java @@ -24,6 +24,7 @@ import jakarta.annotation.Priority; import jakarta.decorator.Decorator; import jakarta.decorator.Delegate; +import jakarta.enterprise.context.Dependent; import jakarta.enterprise.inject.Instance; import jakarta.inject.Inject; @@ -34,6 +35,7 @@ import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.AbstractProcessorDecorator; import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.ProcessorDecoratorPriorities; import io.quarkiverse.kafkastreamsprocessor.api.decorator.punctuator.DecoratedPunctuator; import lombok.RequiredArgsConstructor; @@ -44,15 +46,10 @@ * * @see PunctuatorDecorationProcessorContextDecorator */ -@Decorator +//@Decorator @Priority(ProcessorDecoratorPriorities.PUNCTUATOR_DECORATION) -public class PunctuatorDecorationProcessorDecorator implements Processor { - /** - * Injection point for composition - */ - @lombok.experimental.Delegate(excludes = Excludes.class) - private final Processor delegate; - +@Dependent +public class PunctuatorDecorationProcessorDecorator extends AbstractProcessorDecorator { /** * List of all the {@link Punctuator} decorators defined in this library and potential extensions made with the API. */ @@ -61,16 +58,13 @@ public class PunctuatorDecorationProcessorDecorator implem /** * Injection constructor. * - * @param delegate - * injection point for composition * @param decoratedPunctuators * the list of all {@link Punctuator} decorators defined in this library and potential extensions made with * the API. */ @Inject - public PunctuatorDecorationProcessorDecorator(@Delegate Processor delegate, + public PunctuatorDecorationProcessorDecorator( Instance decoratedPunctuators) { - this.delegate = delegate; this.decoratedPunctuators = decoratedPunctuators; } @@ -84,8 +78,8 @@ public PunctuatorDecorationProcessorDecorator(@Delegate Processor context) { - delegate.init(new PunctuatorDecorationProcessorContextDecorator<>((InternalProcessorContext) context, + public void init(ProcessorContext context) { + getDelegate().init(new PunctuatorDecorationProcessorContextDecorator<>((InternalProcessorContext) context, decoratedPunctuators)); } diff --git a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/RetryDecorator.java b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/RetryDecorator.java index 92d75a7..5b79a55 100644 --- a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/RetryDecorator.java +++ b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/RetryDecorator.java @@ -21,13 +21,14 @@ import jakarta.annotation.Priority; import jakarta.decorator.Decorator; -import jakarta.decorator.Delegate; +import jakarta.enterprise.context.Dependent; import jakarta.inject.Inject; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Record; import org.eclipse.microprofile.faulttolerance.Retry; +import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.AbstractProcessorDecorator; import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.ProcessorDecoratorPriorities; import lombok.extern.slf4j.Slf4j; @@ -35,15 +36,10 @@ * Decorate a {@link Processor#process} with the {@link Retry} fault tolerance annotation. */ @Slf4j -@Decorator +//@Decorator @Priority(ProcessorDecoratorPriorities.RETRY) -public class RetryDecorator implements Processor { - /** - * Injection point for composition - */ - @lombok.experimental.Delegate(excludes = Excludes.class) - private final Processor delegate; - +@Dependent +public class RetryDecorator extends AbstractProcessorDecorator { /** * The delegate object that has the processor method with the {@link Retry} annotation. *

@@ -58,15 +54,12 @@ public class RetryDecorator implements Processor delegate, + public RetryDecorator( RetryDecoratorDelegate retryDecoratorDelegate) { - this.delegate = delegate; this.retryDecoratorDelegate = retryDecoratorDelegate; } @@ -78,9 +71,9 @@ public RetryDecorator(@Delegate Processor delegate, * {@inheritDoc} */ @Override - public void process(Record record) { + public void process(Record record) { try { - retryDecoratorDelegate.retryableProcess(delegate, record); + retryDecoratorDelegate.retryableProcess(getDelegate(), record); } catch (RuntimeException e) { log.info("An exception that has been raised by the processor will not be retried.\n" + "Possible causes:\n" diff --git a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecorator.java b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecorator.java index 5c05333..5597a68 100644 --- a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecorator.java +++ b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecorator.java @@ -31,7 +31,7 @@ import jakarta.annotation.Priority; import jakarta.decorator.Decorator; -import jakarta.decorator.Delegate; +import jakarta.enterprise.context.Dependent; import jakarta.inject.Inject; import org.apache.kafka.common.KafkaException; @@ -54,13 +54,12 @@ import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; import io.opentelemetry.context.propagation.TextMapPropagator; +import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.AbstractProcessorDecorator; import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.ProcessorDecoratorPriorities; import io.quarkiverse.kafkastreamsprocessor.impl.configuration.TopologyConfigurationImpl; import io.quarkiverse.kafkastreamsprocessor.impl.protocol.KafkaStreamsProcessorHeaders; import io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapGetter; import io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapSetter; -import lombok.AccessLevel; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; /** @@ -72,16 +71,10 @@ * {@link org.apache.kafka.clients.consumer.ConsumerInterceptor}, which executes on the polling thread. */ @Slf4j -@Decorator +//@Decorator @Priority(ProcessorDecoratorPriorities.TRACING) -@RequiredArgsConstructor(access = AccessLevel.MODULE) -public class TracingDecorator implements Processor { - /** - * Injection point for composition - */ - @lombok.experimental.Delegate(excludes = Excludes.class) - private final Processor delegate; - +@Dependent +public class TracingDecorator extends AbstractProcessorDecorator { /** * The {@link OpenTelemetry} configured by Quarkus */ @@ -115,13 +108,11 @@ public class TracingDecorator implements Processor context; + private ProcessorContext context; /** * Injection constructor. * - * @param delegate - * injection point for composition * @param openTelemetry * The {@link OpenTelemetry} configured by Quarkus * @param textMapGetter @@ -134,14 +125,24 @@ public class TracingDecorator implements Processor delegate, OpenTelemetry openTelemetry, + public TracingDecorator(OpenTelemetry openTelemetry, KafkaTextMapGetter textMapGetter, KafkaTextMapSetter textMapSetter, Tracer tracer, TopologyConfigurationImpl configuration) { - this(delegate, openTelemetry, textMapGetter, textMapSetter, tracer, + this(openTelemetry, textMapGetter, textMapSetter, tracer, configuration.getProcessorPayloadType().getName(), JsonFormat.printer()); } + public TracingDecorator(OpenTelemetry openTelemetry, KafkaTextMapGetter textMapGetter, KafkaTextMapSetter textMapSetter, + Tracer tracer, String applicationName, JsonFormat.Printer jsonPrinter) { + this.openTelemetry = openTelemetry; + this.textMapGetter = textMapGetter; + this.textMapSetter = textMapSetter; + this.tracer = tracer; + this.applicationName = applicationName; + this.jsonPrinter = jsonPrinter; + } + /** * Init just to capture the reference to {@link ProcessorContext}. *

@@ -150,8 +151,8 @@ public TracingDecorator(@Delegate Processor delegate, Open * {@inheritDoc} */ @Override - public void init(final ProcessorContext context) { - delegate.init(context); + public void init(final ProcessorContext context) { + getDelegate().init(context); this.context = context; } @@ -164,7 +165,7 @@ public void init(final ProcessorContext context) { * {@inheritDoc} */ @Override - public void process(Record record) { + public void process(Record record) { SpanBuilder spanBuilder = tracer.spanBuilder(applicationName); final TextMapPropagator propagator = openTelemetry.getPropagators().getTextMapPropagator(); @@ -194,7 +195,7 @@ public void process(Record record) { // the headers in the incoming message so when an outgoing message is produced with the copied // header values it already has the span id from this new child span propagator.inject(Context.current(), record.headers(), textMapSetter); - delegate.process(record); + getDelegate().process(record); span.setStatus(StatusCode.OK); } catch (KafkaException e) { // we got a Kafka exception, we record the exception in the span, log but rethrow the exception @@ -216,7 +217,7 @@ public void process(Record record) { } } - void logInputMessageMetadata(Record record) { + void logInputMessageMetadata(Record record) { if (log.isDebugEnabled()) { Map headers = toMap(record.headers()); LoggedRecord.LoggedRecordBuilder builder = LoggedRecord.builder() diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecoratorTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecoratorTest.java index 3264e55..2114411 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecoratorTest.java +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecoratorTest.java @@ -41,7 +41,7 @@ @ExtendWith(MockitoExtension.class) class CdiRequestContextDecoratorTest { - CdiRequestContextDecorator decorator; + CdiRequestContextDecorator decorator; @Mock ArcContainer container; @@ -55,7 +55,8 @@ class CdiRequestContextDecoratorTest { @BeforeEach public void setup() { when(container.requestContext()).thenReturn(requestContext); - decorator = new CdiRequestContextDecorator<>(processor, container); + decorator = new CdiRequestContextDecorator(container); + decorator.setDelegate(processor); } @Test diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/DlqDecoratorTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/DlqDecoratorTest.java index 6c6059a..b0c3074 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/DlqDecoratorTest.java +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/DlqDecoratorTest.java @@ -62,7 +62,7 @@ public class DlqDecoratorTest { private static final String RECORD_KEY = "key"; private static final String RECORD_VALUE = "value"; - DlqDecorator decorator; + DlqDecorator decorator; DlqProcessorContextDecorator contextDecorator; @@ -86,7 +86,8 @@ public class DlqDecoratorTest { @BeforeEach public void setUp() { - decorator = new DlqDecorator<>(kafkaProcessor, Set.of(FUNCTIONAL_SINK), dlqMetadataHandler, metrics, true); + decorator = new DlqDecorator(Set.of(FUNCTIONAL_SINK), dlqMetadataHandler, metrics, true); + decorator.setDelegate(kafkaProcessor); decorator.init(context); headers = new RecordHeaders(); record = new Record<>(RECORD_KEY, RECORD_VALUE, 0L, headers); @@ -131,7 +132,8 @@ public void shouldForwardKeyValueToAllSinks() { @Test public void shouldDoNothingIfDeactivated() { - decorator = new DlqDecorator<>(kafkaProcessor, Set.of(FUNCTIONAL_SINK), dlqMetadataHandler, metrics, false); + decorator = new DlqDecorator(Set.of(FUNCTIONAL_SINK), dlqMetadataHandler, metrics, false); + decorator.setDelegate(kafkaProcessor); decorator.init(context); decorator.process(record); diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/MetricsDecoratorTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/MetricsDecoratorTest.java index f8ce1a2..bbef50b 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/MetricsDecoratorTest.java +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/MetricsDecoratorTest.java @@ -48,14 +48,15 @@ public class MetricsDecoratorTest { Ping inputMessage = Ping.newBuilder().setMessage("message").build(); - MetricsDecorator processorDecorator; + MetricsDecorator processorDecorator; @Mock ArcContainer arcContainer; @BeforeEach public void setUp() { - processorDecorator = new MetricsDecorator<>(kafkaProcessor, metrics); + processorDecorator = new MetricsDecorator(metrics); + processorDecorator.setDelegate(kafkaProcessor); } @Test diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/PunctuatorDecorationProcessorDecoratorTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/PunctuatorDecorationProcessorDecoratorTest.java index 9a5ffa5..ca7e4aa 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/PunctuatorDecorationProcessorDecoratorTest.java +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/PunctuatorDecorationProcessorDecoratorTest.java @@ -68,9 +68,8 @@ public void process(Record record) { } }; - PunctuatorDecorationProcessorDecorator decorator = new PunctuatorDecorationProcessorDecorator<>( - processor, - decoratedPunctuators); + PunctuatorDecorationProcessorDecorator decorator = new PunctuatorDecorationProcessorDecorator(decoratedPunctuators); + decorator.setDelegate(processor); decorator.init(context); decorator.process(new Record<>("blabla",PingMessage.Ping.newBuilder().setMessage("blabla").build(),0L,null)); decorator.close(); diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecoratorTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecoratorTest.java index 930b32f..6e137cc 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecoratorTest.java +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecoratorTest.java @@ -114,7 +114,7 @@ public class TracingDecoratorTest { @RegisterExtension static final OpenTelemetryExtension otel = OpenTelemetryExtension.create(); - TracingDecorator decorator; + TracingDecorator decorator; @Mock JsonFormat.Printer jsonPrinter; @@ -140,8 +140,9 @@ public void setUp() { rootLogger.addHandler(inMemoryLogHandler); rootLogger.setLevel(Level.DEBUG); when(topologyConfiguration.getProcessorPayloadType()).thenReturn((Class) MockType.class); - decorator = new TracingDecorator<>(kafkaProcessor, otel.getOpenTelemetry(), kafkaTextMapGetter, kafkaTextMapSetter, + decorator = new TracingDecorator(otel.getOpenTelemetry(), kafkaTextMapGetter, kafkaTextMapSetter, tracer, topologyConfiguration.getProcessorPayloadType().getName(), jsonPrinter); + decorator.setDelegate(kafkaProcessor); decorator.init(processorContext); } @@ -206,8 +207,9 @@ public void shouldCleanMDCAndScopeInCaseOfException() { .setMessage("blabla") .build(), 0L, headers); - decorator = new TracingDecorator<>(new ThrowExceptionProcessor(), otel.getOpenTelemetry(), kafkaTextMapGetter, + decorator = new TracingDecorator(otel.getOpenTelemetry(), kafkaTextMapGetter, kafkaTextMapSetter, tracer, topologyConfiguration.getProcessorPayloadType().getName(), jsonPrinter); + decorator.setDelegate(new ThrowExceptionProcessor()); decorator.init(processorContext); assertDoesNotThrow(() -> decorator.process(record)); @@ -307,9 +309,9 @@ void shouldLogMetadataEvenIfValueMarshallingToJSONFails() throws Throwable { void shouldLogRawToStringValueIfNotProtobuf() throws Throwable { Processor kafkaProcessor = mock(Processor.class); ProcessorContext processorContext = mock(ProcessorContext.class); - TracingDecorator decorator = new TracingDecorator<>( - kafkaProcessor, GlobalOpenTelemetry.get(), kafkaTextMapGetter, + TracingDecorator decorator = new TracingDecorator(GlobalOpenTelemetry.get(), kafkaTextMapGetter, kafkaTextMapSetter, tracer, topologyConfiguration.getProcessorPayloadType().getName(), jsonPrinter); + decorator.setDelegate(kafkaProcessor); decorator.init(processorContext); RuntimeException exception = new TestException(); @@ -334,9 +336,10 @@ void shouldPropagateOpentelemetryW3CBaggage() { .setPropagators(ContextPropagators.create(TextMapPropagator.composite(W3CTraceContextPropagator.getInstance(), W3CBaggagePropagator.getInstance()))) .build()) { - decorator = new TracingDecorator<>(new LogOpentelemetryBaggageProcessor(), openTelemetryWithBaggageSdk, + decorator = new TracingDecorator(openTelemetryWithBaggageSdk, kafkaTextMapGetter, kafkaTextMapSetter, openTelemetryWithBaggageSdk.getTracer("test"), PROCESSOR_NAME, jsonPrinter); + decorator.setDelegate(new LogOpentelemetryBaggageProcessor()); decorator.init(processorContext); decorator.process(record); diff --git a/integration-tests/custom-serde/Readme.md b/integration-tests/custom-serde/Readme.md new file mode 100644 index 0000000..d1c497b --- /dev/null +++ b/integration-tests/custom-serde/Readme.md @@ -0,0 +1,18 @@ +# Sample with multiple TopologyConfigCustomizers + +EDA to EDA stateless microservice implementation using [KafkaStreams](https://kafka.apache.org/documentation/streams/) + +## Introduction + +This module showcases the implementation of a +[KafkaStream processor](https://kafka.apache.org/25/documentation/streams/developer-guide/processor-api.html#overview) with multiple [ConfigurationCustomizer](../../api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/configuration/ConfigurationCustomizer.java) instances. + +## Quarkus Dev mode + +The sample is fully working with the Quarkus Dev mode that allows to +modify the code and have a hot replacement when the file is saved. It +can be used also to launch the application. + +``` +$> mvn clean install quarkus:dev +``` diff --git a/integration-tests/custom-serde/pom.xml b/integration-tests/custom-serde/pom.xml new file mode 100644 index 0000000..cecafd2 --- /dev/null +++ b/integration-tests/custom-serde/pom.xml @@ -0,0 +1,184 @@ + + + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor-integration-tests + 2.0.0-SNAPSHOT + + 4.0.0 + + quarkus-kafka-streams-processor-custom-serde-sample + + + + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor-bom + ${project.version} + pom + import + + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor-test-bom + ${project.version} + pom + import + + + + + + + + jakarta.inject + jakarta.inject-api + + + jakarta.enterprise + jakarta.enterprise.cdi-api + + + org.eclipse.microprofile.config + microprofile-config-api + + + com.fasterxml.jackson.core + jackson-databind + + + com.fasterxml.jackson.core + jackson-core + + + + + io.quarkus + quarkus-kafka-streams + runtime + + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor + runtime + + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor-impl + runtime + + + io.quarkus + quarkus-smallrye-health + runtime + + + io.quarkus + quarkus-micrometer-registry-prometheus + runtime + + + io.quarkus + quarkus-opentelemetry + runtime + + + + org.apache.kafka + kafka-streams + + + org.apache.kafka + kafka-clients + + + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor-api + + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor-protobuf-binding + ${project.version} + + + de.sven-jacobs + loremipsum + + + + org.junit.jupiter + junit-jupiter-engine + test + + + org.junit.jupiter + junit-jupiter-api + test + + + io.quarkus + quarkus-junit5 + test + + + io.quarkus + quarkus-test-common + test + + + org.springframework.kafka + spring-kafka-test + test + + + org.apache.kafka + kafka-streams-test-utils + test + + + org.awaitility + awaitility + test + + + org.mockito + mockito-core + test + + + org.mockito + mockito-junit-jupiter + test + + + com.github.daniel-shuy + kafka-protobuf-serde + test + + + io.rest-assured + rest-assured + test + + + org.projectlombok + lombok + provided + + + org.slf4j + slf4j-api + + + org.hamcrest + hamcrest + test + + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor-test-framework + test + + + diff --git a/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomType.java b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomType.java new file mode 100644 index 0000000..91f8d93 --- /dev/null +++ b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomType.java @@ -0,0 +1,33 @@ +/*- + * #%L + * Quarkus Kafka Streams Processor + * %% + * Copyright (C) 2024 Amadeus s.a.s. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package io.quarkiverse.kafkastreamsprocessor.sample.customserde; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@AllArgsConstructor +@NoArgsConstructor +@Getter +@Setter +public class CustomType { + private int value; +} diff --git a/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeConfigCustomizer.java b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeConfigCustomizer.java new file mode 100644 index 0000000..1df5867 --- /dev/null +++ b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeConfigCustomizer.java @@ -0,0 +1,46 @@ +/*- + * #%L + * Quarkus Kafka Streams Processor + * %% + * Copyright (C) 2024 Amadeus s.a.s. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package io.quarkiverse.kafkastreamsprocessor.sample.customserde; + +import jakarta.annotation.Priority; +import jakarta.enterprise.context.Dependent; +import jakarta.inject.Inject; + +import io.quarkiverse.kafkastreamsprocessor.api.configuration.Configuration; +import io.quarkiverse.kafkastreamsprocessor.api.configuration.ConfigurationCustomizer; + +@Dependent +@Priority(1) +public class CustomTypeConfigCustomizer implements ConfigurationCustomizer { + private final CustomTypeSerde serde; + private final CustomTypeSerializer serializer; + + @Inject + public CustomTypeConfigCustomizer(CustomTypeSerde serde, CustomTypeSerializer serializer) { + this.serde = serde; + this.serializer = serializer; + } + + @Override + public void fillConfiguration(Configuration configuration) { + configuration.setSourceValueSerde(serde); + configuration.setSinkValueSerializer(serializer); + } +} diff --git a/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeDeserializer.java b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeDeserializer.java new file mode 100644 index 0000000..7ec4543 --- /dev/null +++ b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeDeserializer.java @@ -0,0 +1,53 @@ +/*- + * #%L + * Quarkus Kafka Streams Processor + * %% + * Copyright (C) 2024 Amadeus s.a.s. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package io.quarkiverse.kafkastreamsprocessor.sample.customserde; + +import java.nio.charset.StandardCharsets; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.apache.kafka.common.serialization.Deserializer; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import lombok.extern.slf4j.Slf4j; + +@ApplicationScoped +@Slf4j +public class CustomTypeDeserializer implements Deserializer { + private final ObjectMapper objectMapper; + + @Inject + public CustomTypeDeserializer(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } + + @Override + public CustomType deserialize(String topic, byte[] data) { + try { + CustomType readValue = objectMapper.readValue(data, CustomType.class); + return new CustomType(readValue.getValue() - CustomTypeSerde.SHIFT); + } catch (Exception e) { + log.error("Could not deserialize: {}", new String(data, StandardCharsets.UTF_8)); + throw new RuntimeException("Error deserializing CustomType", e); + } + } +} diff --git a/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeSerde.java b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeSerde.java new file mode 100644 index 0000000..776252f --- /dev/null +++ b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeSerde.java @@ -0,0 +1,52 @@ +/*- + * #%L + * Quarkus Kafka Streams Processor + * %% + * Copyright (C) 2024 Amadeus s.a.s. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package io.quarkiverse.kafkastreamsprocessor.sample.customserde; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; + +@ApplicationScoped +public class CustomTypeSerde implements Serde { + static final int SHIFT = 10; + + private final CustomTypeSerializer customTypeSerializer; + + private final CustomTypeDeserializer customTypeDeserializer; + + @Inject + public CustomTypeSerde(CustomTypeSerializer customTypeSerializer, CustomTypeDeserializer customTypeDeserializer) { + this.customTypeSerializer = customTypeSerializer; + this.customTypeDeserializer = customTypeDeserializer; + } + + @Override + public Serializer serializer() { + return customTypeSerializer; + } + + @Override + public Deserializer deserializer() { + return customTypeDeserializer; + } +} diff --git a/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeSerializer.java b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeSerializer.java new file mode 100644 index 0000000..8a3791a --- /dev/null +++ b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeSerializer.java @@ -0,0 +1,48 @@ +/*- + * #%L + * Quarkus Kafka Streams Processor + * %% + * Copyright (C) 2024 Amadeus s.a.s. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package io.quarkiverse.kafkastreamsprocessor.sample.customserde; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.apache.kafka.common.serialization.Serializer; + +import com.fasterxml.jackson.databind.ObjectMapper; + +@ApplicationScoped +public class CustomTypeSerializer implements Serializer { + private final ObjectMapper objectMapper; + + @Inject + public CustomTypeSerializer(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } + + @Override + public byte[] serialize(String topic, CustomType data) { + CustomType valueToSerialize = new CustomType(data.getValue() + CustomTypeSerde.SHIFT); + try { + return objectMapper.writeValueAsBytes(valueToSerialize); + } catch (Exception e) { + throw new RuntimeException("Error serializing CustomType", e); + } + } + +} diff --git a/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/HeaderDecorator.java b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/HeaderDecorator.java new file mode 100644 index 0000000..ea5507f --- /dev/null +++ b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/HeaderDecorator.java @@ -0,0 +1,47 @@ +/*- + * #%L + * Quarkus Kafka Streams Processor + * %% + * Copyright (C) 2024 Amadeus s.a.s. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package io.quarkiverse.kafkastreamsprocessor.sample.customserde; + +import java.nio.charset.StandardCharsets; + +import jakarta.annotation.Priority; +import jakarta.enterprise.context.Dependent; + +import org.apache.kafka.common.header.Header; +import org.apache.kafka.streams.processor.api.Record; + +import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.AbstractProcessorDecorator; +import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.ProcessorDecoratorPriorities; + +@Dependent +@Priority(ProcessorDecoratorPriorities.PUNCTUATOR_DECORATION + 2) +public class HeaderDecorator extends AbstractProcessorDecorator { + @Override + public void process(Record record) { + Header header = record.headers().lastHeader("custom-header"); + if (header != null) { + String value = new String(header.value(), StandardCharsets.UTF_8); + if (value.contains("error")) { + throw new IllegalStateException("Error in header"); + } + } + getDelegate().process(record); + } +} diff --git a/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/PingProcessor.java b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/PingProcessor.java new file mode 100644 index 0000000..bf843ba --- /dev/null +++ b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/PingProcessor.java @@ -0,0 +1,38 @@ +/*- + * #%L + * Quarkus Kafka Streams Processor + * %% + * Copyright (C) 2024 Amadeus s.a.s. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package io.quarkiverse.kafkastreamsprocessor.sample.customserde; + +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.Record; + +import io.quarkiverse.kafkastreamsprocessor.api.Processor; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Processor +@RequiredArgsConstructor +public class PingProcessor extends ContextualProcessor { + @Override + public void process(Record ping) { + log.info("Process the custom type with value: {}", ping.value().getValue()); + context().forward(ping); + } +} diff --git a/integration-tests/custom-serde/src/main/resources/application.properties b/integration-tests/custom-serde/src/main/resources/application.properties new file mode 100644 index 0000000..c989477 --- /dev/null +++ b/integration-tests/custom-serde/src/main/resources/application.properties @@ -0,0 +1,5 @@ +kafkastreamsprocessor.input.topic=ping-events +kafkastreamsprocessor.output.topic=pong-events +quarkus.kafka-streams.bootstrap-servers=localhost:9092 +quarkus.kafka-streams.topics=ping-events,pong-events +kafka-streams.producer.linger.ms=0 diff --git a/integration-tests/custom-serde/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeDeserializerTest.java b/integration-tests/custom-serde/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeDeserializerTest.java new file mode 100644 index 0000000..d9e223c --- /dev/null +++ b/integration-tests/custom-serde/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeDeserializerTest.java @@ -0,0 +1,42 @@ +/*- + * #%L + * Quarkus Kafka Streams Processor + * %% + * Copyright (C) 2024 Amadeus s.a.s. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package io.quarkiverse.kafkastreamsprocessor.sample.customserde; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +import org.junit.jupiter.api.Test; + +import com.fasterxml.jackson.databind.ObjectMapper; + +class CustomTypeDeserializerTest { + + CustomTypeDeserializer deserializer = new CustomTypeDeserializer(new ObjectMapper()); + + @Test + public void testDeserialize() { + byte[] data = "{\"value\":11}".getBytes(); + + Object customType = deserializer.deserialize("topic", data); + + assertThat(((CustomType) customType).getValue(), equalTo(1)); + } + +} diff --git a/integration-tests/custom-serde/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeSerializerTest.java b/integration-tests/custom-serde/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeSerializerTest.java new file mode 100644 index 0000000..ae2a9bc --- /dev/null +++ b/integration-tests/custom-serde/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeSerializerTest.java @@ -0,0 +1,42 @@ +/*- + * #%L + * Quarkus Kafka Streams Processor + * %% + * Copyright (C) 2024 Amadeus s.a.s. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package io.quarkiverse.kafkastreamsprocessor.sample.customserde; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +import java.nio.charset.StandardCharsets; + +import org.junit.jupiter.api.Test; + +import com.fasterxml.jackson.databind.ObjectMapper; + +class CustomTypeSerializerTest { + CustomTypeSerializer serializer = new CustomTypeSerializer(new ObjectMapper()); + + @Test + public void testSerialize() { + CustomType customType = new CustomType(1); + + byte[] serialized = serializer.serialize("topic", customType); + + assertThat(new String(serialized, StandardCharsets.UTF_8), equalTo("{\"value\":11}")); + } +} diff --git a/integration-tests/custom-serde/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/PingProcessorQuarkusTest.java b/integration-tests/custom-serde/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/PingProcessorQuarkusTest.java new file mode 100644 index 0000000..be7beb6 --- /dev/null +++ b/integration-tests/custom-serde/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/PingProcessorQuarkusTest.java @@ -0,0 +1,97 @@ +/*- + * #%L + * Quarkus Kafka Streams Processor + * %% + * Copyright (C) 2024 Amadeus s.a.s. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package io.quarkiverse.kafkastreamsprocessor.sample.customserde; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; + +import jakarta.inject.Inject; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.awaitility.Durations; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.kafka.test.utils.KafkaTestUtils; + +import io.quarkus.test.junit.QuarkusTest; + +@QuarkusTest +public class PingProcessorQuarkusTest { + @ConfigProperty(name = "kafka.bootstrap.servers") + String kafkaBootstrapServers; + + String senderTopic = "ping-events"; + + String consumerTopic = "pong-events"; + + KafkaProducer producer; + + KafkaConsumer consumer; + + @Inject + CustomTypeSerde customTypeSerde; + + @BeforeEach + public void setup() throws Exception { + Map consumerProps = KafkaTestUtils.consumerProps(kafkaBootstrapServers, "test", "true"); + consumer = new KafkaConsumer<>(consumerProps, new StringDeserializer(), customTypeSerde.deserializer()); + consumer.subscribe(List.of(consumerTopic)); + + Map producerProps = KafkaTestUtils.producerProps(kafkaBootstrapServers); + producer = new KafkaProducer<>(producerProps, new StringSerializer(), customTypeSerde.serializer()); + } + + @AfterEach + public void tearDown() { + producer.close(); + consumer.close(); + } + + @Test + public void testCount() { + producer.send(new ProducerRecord<>(senderTopic, "1", new CustomType(1))); + producer.flush(); + ConsumerRecord record = KafkaTestUtils.getSingleRecord(consumer, consumerTopic, + Durations.FIVE_SECONDS); + assertThat(((CustomType) record.value()).getValue(), equalTo(1)); + } + + @Test + public void testHeaderError() { + producer.send(new ProducerRecord<>(senderTopic, 0, "1", new CustomType(1), + new RecordHeaders().add("custom-header", "error".getBytes(StandardCharsets.UTF_8)))); + producer.flush(); + assertThrows(IllegalStateException.class, + () -> KafkaTestUtils.getSingleRecord(consumer, consumerTopic, Durations.FIVE_SECONDS)); + } +} diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index f32c922..2b85ec8 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -15,6 +15,7 @@ multioutput simple stateful + custom-serde diff --git a/pom.xml b/pom.xml index a8d01ab..a83a5ac 100644 --- a/pom.xml +++ b/pom.xml @@ -33,7 +33,7 @@ 17 UTF-8 UTF-8 - 3.8.3 + 3.8.6 3.24.1 @@ -98,37 +98,37 @@ LF - - - - - org.codehaus.mojo - license-maven-plugin - 2.4.0 - - - add-license-header - - check-file-header - - process-sources - - - - false - apache_v2 - 2024 - Amadeus s.a.s. - Quarkus Kafka Streams Processor - - **/*.java - - - **/*$$*.java - - - + + + + org.codehaus.mojo + license-maven-plugin + 2.4.0 + + + add-license-header + + check-file-header + + process-sources + + + + false + apache_v2 + 2024 + Amadeus s.a.s. + Quarkus Kafka Streams Processor + + **/*.java + + + **/*$$*.java + + + +