Skip to content

Commit

Permalink
fix(doc): Transform Processor CDI Decorator into manual decorators
Browse files Browse the repository at this point in the history
We noticed flackiness with QuarkusTest and usage of `Processor` decorators.
It is generating randomly ClassNotFoundException.
It is happening only with Quarkus 3.8 LTS, recent versions like 3.15 or older like 3.2 do not present the issue.

The solution found is a fallback of sorts, by removing the usage of @decorator in the processor decorators.
Instead they are transformed in old-school, composition-design-pattern-inspired beans with a lombok delegate.
The generic type signature is removed so they can be transformed in Dependent beans.
Why's that? 1. A class with generics cannot be a bean, according to compilation errors, and 2. processors returned by the supplier need to be new instances everytime.
Priorities are kept, and used to resolve in order the beans, for a manual encapsulation achieved with a for loop on the list of beans.

And with that, the flackiness is gone.

Of course, those changes will not be propagated to main and the future 3.15 branch, where this flackiness is not an issue, AND the usage of CDI's Decorator can be kept.

The documentation is updated accordingly.

Fixes #117
  • Loading branch information
edeweerd1A committed Oct 8, 2024
1 parent 50cb5f9 commit ef494e9
Show file tree
Hide file tree
Showing 30 changed files with 769 additions and 181 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.quarkiverse.kafkastreamsprocessor.api.decorator.processor;

import org.apache.kafka.streams.processor.api.Processor;

import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Delegate;

public class AbstractProcessorDecorator implements Processor {
@Delegate
@Getter
@Setter
private Processor delegate;
}
54 changes: 44 additions & 10 deletions docs/modules/ROOT/pages/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ public class PojoProcessor extends ContextualProcessor<String, SamplePojo, Strin
}
}
----

<1> Your Processor is declared with the annotation as for a regular processor.
<2> The handled value type, in this example, is a simple POJO, nothing fancy.
<3> Same POJO value in the _process()_ method.
Expand Down Expand Up @@ -423,6 +424,7 @@ public class PingerService {
}
}
----

<1> Define the method to retry with `org.eclipse.microprofile.faulttolerance.Retry` annotation

.application.properties
Expand Down Expand Up @@ -484,7 +486,6 @@ This list includes the additional metrics, on top of the Kafka Streams and the g
| The number of times a Punctuator's execution failed with an exception since the start of the microservice.
|===


.Dead Letter Queue Metrics
[options="header",cols="30%,20%,40%"]
|===
Expand All @@ -501,7 +502,6 @@ This list includes the additional metrics, on top of the Kafka Streams and the g
| The number of messages sent to global DLQ.
|===


== A comparison between Reactive Messaging Kafka and Kafka Streams

These two technologies can be used to create streaming microservices to be used in Event-Driven architecture applications.
Expand Down Expand Up @@ -536,6 +536,7 @@ The purpose of increasing concurrency is to be able to cope with streaming micro
return api.remoteCall();
}
----

<1> `@Incoming` is declaring this method as a subscriber for the channel named `ping-events`
<2> `@Outgoing` is declaring this method as a producer for the channel named `pong-events`
<3> `@io.smallrye.reactive.messaging.annotations.Blocking` Indicates that this method is running out of the processing thread, inside a worker thread and the order of the messages is not important.
Expand Down Expand Up @@ -640,6 +641,7 @@ public class PingProcessor extends ContextualProcessor<String, Ping, String, Pin
}
}
----

<1> Your Processor is declared with the annotation as for a regular processor.
<2> The definition and initialization of your state store.

Expand Down Expand Up @@ -688,9 +690,9 @@ The extension proposes some capabilities to customize more finely the behaviour
=== Processor decorator

The following decoration layer is already extensively used in this extension's source code and allows to use composition around the main processor class you have to define.
Example of a new decorator:
Depending on the version of Quarkus you are using, the pattern differs:

.ProcessorDecorator.java
.ProcessorDecorator.java with Quarkus 3.2 or 3.11.0+
[source,java]
----
@Decorator // <1>
Expand Down Expand Up @@ -719,6 +721,7 @@ public class ProcessorDecorator<KIn, VIn, KOut, VOut> implements Processor<KIn,
}
}
----

<1> Decorator annotation to profit from the {cdi-spec}/cdi-spec.html#decorators[decorator] feature of CDI
<2> Force the instantiation of the decorator with the Priority annotation.
Indeed, otherwise the decorator is not taken into account by Quarkus.
Expand Down Expand Up @@ -759,23 +762,52 @@ The priority is to be set based on the priorities of the existing decorators whi
----
<3> The decorator should have the same generics declaration `<KIn, VIn, KOut, VOut>` as the `Processor<KIn, VIn, KOut, VOut>` interface that it implements
<4> Delegate reference to use when decorating methods.
It is annotated with lombok's https://projectlombok.org/features/experimental/Delegate[Delegate] annotation to generate
passthrough decorated methods that this Decorator class won't decorate.
It is annotated with lombok's https://projectlombok.org/features/experimental/Delegate[Delegate] annotation to generate passthrough decorated methods that this Decorator class won't decorate.
The selection is done through a blacklist of method signatures gathered in a private `Excludes` interface declared at the end of the class.
<5> Injection constructor which must have a delegate argument annotated with the `Delegate` annotation from CDI.
You can also, as a regular CDI bean, inject any another CDI bean reference to be used in this decorator.
<6> Example of decorated method, here the main `process` method of `Processor` API of Kafka Streams.

Such a decorator will automatically been taken into account by CDI through the combination of `Decorator` and `Priority` annotations.
.ProcessorDecorator.java for Quarkus 3.8 -> 3.10
[source,java]
----
@Dependent // <1>
@Priority(150) // <2>
public class ProcessorDecorator extends AbstractProcessorDecorator { // <3>
@Override
public void process(Record record) { // <4>
// use bean before
getDelegate().process(record);
// use bean after
}
}
----

<1> We have to mark the bean `Dependent` so it is instantiated at every use.
Indeed, `KStreamProcessorSupplier` needs to return a new `Processor` instance everytime it is called, by Kafka Streams' specification.
<2> We add a `Priority`, with same pattern as a CDI decorator.
<3> We remove the generic types from the class signature, because CDI does not like generics in beans.
<4> Example of override of process method and call to underlying decorator.

Such a decorator will automatically been taken into account by CDI.
The priority will control at which point your decorator will be called among all other decorators.

[CAUTION]
====
We noticed with a new integration-test that is using a custom serde, that usage of custom CDI `Decorator` is causing microservices to randomly crash at startup.
This happens for specific versions of Quarkus.
Known impacted versions are 3.8.x, 3.9.x and 3.10.x.
The 3.2 LTS and upcoming 3.15 LTS versions do not suffer from this symptom.
The **only** solution found was to remove usage of `@Decorator` for `Processor` decorators for microservices based on Quarkus 3.8 LTS.
====

=== Producer interceptor

Kafka Streams already has the notion of a https://kafka.apache.org/36/javadoc/org/apache/kafka/clients/producer/ProducerInterceptor.html[ProducerInterceptor].
Kafka Streams already has the concept of a https://kafka.apache.org/36/javadoc/org/apache/kafka/clients/producer/ProducerInterceptor.html[ProducerInterceptor].
But as the rest of Kafka Streams SPI, it is based on a class name and a default constructor for instantiation.
It does not support CDI resolution.

This is why this extension's API defines a `ProducerOnSendInterceptor` interface that is instrumentated through CDI.
This is why this extension's API defines a `ProducerOnSendInterceptor` interface that is instrumented through CDI.
Example of usage:

.MyProducerInterceptor.java
Expand All @@ -796,6 +828,7 @@ public class HeaderAddingProducerInterceptor implements ProducerOnSendIntercepto
}
}
----

<1> Producer interceptors are discovered by CDI by the `ApplicationScoped` annotation
<2> The interceptor class should extend `ProducerOnSendInterceptor`.
`ProducerOnSendInterceptor` extends `ProducerInterceptor<byte[], byte[]>` and overrides some of its methods with default implementations to exempt their forced implementations further down the line.
Expand Down Expand Up @@ -847,6 +880,7 @@ public class CdiRequestContextPunctuatorDecorator implements DecoratedPunctuator
}
}
----

<1> Decorator annotation to profit from the {cdi-spec}/cdi-spec.html#decorators[decorator] feature of CDI
<2> Force the instantiation of the decorator with the Priority annotation.
Indeed, otherwise the decorator is not taken into account by Quarkus.
Expand Down Expand Up @@ -883,4 +917,4 @@ include::includes/kafka-streams-processor-configuration-keys.adoc[]

== Configuration from other extension

include::includes/quarkus-other-extension-configurations.adoc[]
include::includes/quarkus-other-extension-configurations.adoc[]
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,23 @@
package io.quarkiverse.kafkastreamsprocessor.impl;

import java.lang.annotation.Annotation;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.context.RequestScoped;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.spi.Bean;
import jakarta.enterprise.inject.spi.BeanManager;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;

import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.AbstractProcessorDecorator;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -60,6 +61,8 @@ public class KStreamProcessorSupplier<KIn, VIn, KOut, VOut> implements Processor
*/
private final Instance<Kafka2ProcessorAdapter<?, ?>> adapterInstances;

private final Instance<AbstractProcessorDecorator> processorDecorators;

/**
* Injection constructor.
*
Expand All @@ -76,17 +79,20 @@ public class KStreamProcessorSupplier<KIn, VIn, KOut, VOut> implements Processor
@Inject
public KStreamProcessorSupplier(Instance<Processor<?, ?, ?, ?>> kafka3BeanInstances,
Instance<org.apache.kafka.streams.processor.Processor<?, ?>> beanInstances,
Instance<Kafka2ProcessorAdapter<?, ?>> adapterInstances, BeanManager beanManager) {
Instance<Kafka2ProcessorAdapter<?, ?>> adapterInstances, BeanManager beanManager,
Instance<AbstractProcessorDecorator> processorDecorators) {
this.kafka3BeanInstances = kafka3BeanInstances;
this.beanInstances = beanInstances;
this.adapterInstances = adapterInstances;
this.processorDecorators = processorDecorators;

List<String> processorDecoratorNames = new ArrayList<>(processorDecorators.stream()
.map(Object::getClass)
.map(Class::getName)
.collect(Collectors.toUnmodifiableList()));
processorDecoratorNames = processorDecoratorNames.reversed();
log.info("Configured Processor decorators are in order: {}", String.join(", ", processorDecoratorNames));

log.info("Configured Processor decorators are in order: {}",
beanManager.resolveDecorators(Set.of(Processor.class))
.stream()
.map(Bean::getBeanClass)
.map(Class::getName)
.collect(Collectors.joining(", ")));
}

/**
Expand Down Expand Up @@ -131,7 +137,16 @@ public Processor<KIn, VIn, KOut, VOut> get() {
"Processors cannot have a scope other than @Dependant, since KafkaStreams implementation classes are not thread-safe");
}

return (Processor<KIn, VIn, KOut, VOut>) processor;
return wrapProcessor((Processor<KIn, VIn, KOut, VOut>) processor);
}

private Processor<KIn, VIn, KOut, VOut> wrapProcessor(Processor<KIn, VIn, KOut, VOut> processor) {
Processor<KIn, VIn, KOut, VOut> wrappedProcessor = processor;
for (AbstractProcessorDecorator decorator : processorDecorators) {
decorator.setDelegate(wrappedProcessor);
wrappedProcessor = decorator;
}
return wrappedProcessor;
}

private static boolean hasAnnotation(Object bean, Class<? extends Annotation> annotation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,16 @@
package io.quarkiverse.kafkastreamsprocessor.impl.decorator.processor;

import jakarta.annotation.Priority;
import jakarta.decorator.Decorator;
import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;

import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.AbstractProcessorDecorator;
import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.ProcessorDecoratorPriorities;
import io.quarkus.arc.Arc;
import io.quarkus.arc.ArcContainer;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;

/**
* This class is responsible to manage the lifecycle of {@link jakarta.enterprise.context.RequestScoped} beans. It
Expand All @@ -41,30 +40,26 @@
* <p>
* <strong>Warning:</strong> "Quarkus Tests" Junit extension is already managing the request scope on its own.
*/
@Decorator
//@Decorator
@Dependent
@Priority(ProcessorDecoratorPriorities.CDI_REQUEST_SCOPE)
@RequiredArgsConstructor(access = AccessLevel.MODULE)
public class CdiRequestContextDecorator<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {
/**
* Injection point for composition
*/
@lombok.experimental.Delegate(excludes = Excludes.class)
private final Processor<KIn, VIn, KOut, VOut> delegate;

//@RequiredArgsConstructor(access = AccessLevel.MODULE)
public class CdiRequestContextDecorator extends AbstractProcessorDecorator {
/**
* The container object from Arc to inquire on request contextualization availability and activation
*/
private final ArcContainer container;

/**
* Constructor for injection of the delegate.
*
* @param delegate
* injection point for composition
*/
@Inject
public CdiRequestContextDecorator(@jakarta.decorator.Delegate Processor<KIn, VIn, KOut, VOut> delegate) {
this(delegate, Arc.container());
public CdiRequestContextDecorator() {
this(Arc.container());
}

public CdiRequestContextDecorator(ArcContainer container) {
this.container = container;
}

/**
Expand All @@ -74,20 +69,16 @@ public CdiRequestContextDecorator(@jakarta.decorator.Delegate Processor<KIn, VIn
* {@inheritDoc}
*/
@Override
public void process(Record<KIn, VIn> record) {
public void process(Record record) {
if (container.requestContext().isActive()) {
delegate.process(record);
getDelegate().process(record);
} else {
container.requestContext().activate();
try {
delegate.process(record);
getDelegate().process(record);
} finally {
container.requestContext().terminate();
}
}
}

private interface Excludes {
<KIn, VIn> void process(Record<KIn, VIn> record);
}
}
Loading

0 comments on commit ef494e9

Please sign in to comment.