Skip to content

Commit

Permalink
chore(release): Release 4.0.1
Browse files Browse the repository at this point in the history
Changing version to 4.0.0-SNAPSHOT to reflect the update to Quarkus 3.15 and the creation of 3.8 with the fix of Decorators.
Here the Decorator fix is restored, as the flakiness can still be seen on [master](https://github.com/quarkiverse/quarkus-kafka-streams-processor/actions/runs/11296240001/job/31420666727).
The original PR was [here](#118).

The documentation is updated to only talk about ProcessorDecorator using the AbstractProcessorDecorator abstract class.
  • Loading branch information
edeweerd1A committed Oct 14, 2024
1 parent c03349c commit e1a5aa0
Show file tree
Hide file tree
Showing 37 changed files with 251 additions and 278 deletions.
4 changes: 2 additions & 2 deletions .github/project.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
release:
current-version: "3.0.1"
next-version: "3.0.0-SNAPSHOT"
current-version: "4.0.1"
next-version: "4.0.0-SNAPSHOT"
2 changes: 1 addition & 1 deletion api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-parent</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>4.0.0-SNAPSHOT</version>
</parent>
<artifactId>quarkus-kafka-streams-processor-api</artifactId>
<dependencyManagement>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*-
* #%L
* Quarkus Kafka Streams Processor
* %%
* Copyright (C) 2024 Amadeus s.a.s.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package io.quarkiverse.kafkastreamsprocessor.api.decorator.processor;

import org.apache.kafka.streams.processor.api.Processor;

import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Delegate;

/**
* Base class for all processor decorators.
* <p>
* If a decorator does not implement this abstract class, it will not be found by the
* <code>KafkaClientSuppliedDecorator</code> for composition.
* </p>
* <p>
* We remove the generic declaration from {@link Processor} because ArC complains about generics on class declaration of
* a bean.
* </p>
* <p>
* Class introduced in 2.0, for compatibility with Quarkus 3.8 random failure to start when using custom processor
* decorators.
* </p>
*
* @deprecated It will be removed in 3.0, with the integration of Quarkus 3.15 where we will be able to go back to pure
* CDI decorators.
*/
@Deprecated(forRemoval = true, since = "2.0")
public abstract class AbstractProcessorDecorator implements Processor {
/**
* The decorated processor, holding either the next decorator layer or the final processor.
*/
@Delegate
@Getter
@Setter
private Processor delegate;
}
4 changes: 2 additions & 2 deletions bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
<parent>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-bom-parent</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>4.0.0-SNAPSHOT</version>
</parent>
<artifactId>quarkus-kafka-streams-processor-bom</artifactId>
<packaging>pom</packaging>
<properties>
<quarkus-kafka-streams-processor.version>3.0.0-SNAPSHOT</quarkus-kafka-streams-processor.version>
<quarkus-kafka-streams-processor.version>4.0.0-SNAPSHOT</quarkus-kafka-streams-processor.version>
</properties>
<dependencyManagement>
<dependencies>
Expand Down
2 changes: 1 addition & 1 deletion bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-parent</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>4.0.0-SNAPSHOT</version>
</parent>
<artifactId>quarkus-kafka-streams-processor-bom-parent</artifactId>
<packaging>pom</packaging>
Expand Down
2 changes: 1 addition & 1 deletion bom/test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-bom-parent</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>4.0.0-SNAPSHOT</version>
</parent>
<artifactId>quarkus-kafka-streams-processor-test-bom</artifactId>
<packaging>pom</packaging>
Expand Down
2 changes: 1 addition & 1 deletion deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-parent</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>4.0.0-SNAPSHOT</version>
</parent>
<artifactId>quarkus-kafka-streams-processor-deployment</artifactId>
<dependencyManagement>
Expand Down
2 changes: 1 addition & 1 deletion docs/modules/ROOT/pages/includes/attributes.adoc
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
:project-version: 2.0.1
:project-version: 3.0.1

:examples-dir: ./../examples/
71 changes: 9 additions & 62 deletions docs/modules/ROOT/pages/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -690,41 +690,25 @@ The extension proposes some capabilities to customize more finely the behaviour
=== Processor decorator

The following decoration layer is already extensively used in this extension's source code and allows to use composition around the main processor class you have to define.
Depending on the version of Quarkus you are using, the pattern differs:
The pattern to implement is:

.ProcessorDecorator.java with Quarkus 3.2 or 3.11.0+
[source,java]
----
@Decorator // <1>
@Dependent // <1>
@Priority(150) // <2>
public class ProcessorDecorator<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> { // <3>
@lombok.experimental.Delegate(excludes = Excludes.class)
private final Processor<KIn, VIn, KOut, VOut> delegate; // <4>
private final MyBean bean;
@Inject
public TracingDecorator(@Delegate Processor<KIn, VIn, KOut, VOut> delegate, MyBean bean) { // <5>
this.delegate = delegate;
this.bean = bean;
}
public class ProcessorDecorator extends AbstractProcessorDecorator { // <3>
@Override
public void process(Record<KIn, VIn> record) { // <6>
public void process(Record record) { // <4>
// use bean before
delegate.process(record);
getDelegate().process(record);
// use bean after
}
private interface Excludes {
<KIn, VIn> void process(Record<KIn, VIn> record);
}
}
----

<1> Decorator annotation to profit from the {cdi-spec}/cdi-spec.html#decorators[decorator] feature of CDI
<2> Force the instantiation of the decorator with the Priority annotation.
Indeed, otherwise the decorator is not taken into account by Quarkus.
<1> We have to mark the bean `Dependent` so it is instantiated at every use.
Indeed, `KStreamProcessorSupplier` needs to return a new `Processor` instance everytime it is called, by Kafka Streams' specification.
<2> We add a `Priority`, to control exactly the order of decorators.
The priority is to be set based on the priorities of the existing decorators which are:
+
.ProcessorDecoratorPriorities.java
Expand Down Expand Up @@ -760,49 +744,12 @@ The priority is to be set based on the priorities of the existing decorators whi
*/
public static final int RETRY = 600;
----
<3> The decorator should have the same generics declaration `<KIn, VIn, KOut, VOut>` as the `Processor<KIn, VIn, KOut, VOut>` interface that it implements
<4> Delegate reference to use when decorating methods.
It is annotated with lombok's https://projectlombok.org/features/experimental/Delegate[Delegate] annotation to generate passthrough decorated methods that this Decorator class won't decorate.
The selection is done through a blacklist of method signatures gathered in a private `Excludes` interface declared at the end of the class.
<5> Injection constructor which must have a delegate argument annotated with the `Delegate` annotation from CDI.
You can also, as a regular CDI bean, inject any another CDI bean reference to be used in this decorator.
<6> Example of decorated method, here the main `process` method of `Processor` API of Kafka Streams.

.ProcessorDecorator.java for Quarkus 3.8 -> 3.10
[source,java]
----
@Dependent // <1>
@Priority(150) // <2>
public class ProcessorDecorator extends AbstractProcessorDecorator { // <3>
@Override
public void process(Record record) { // <4>
// use bean before
getDelegate().process(record);
// use bean after
}
}
----

<1> We have to mark the bean `Dependent` so it is instantiated at every use.
Indeed, `KStreamProcessorSupplier` needs to return a new `Processor` instance everytime it is called, by Kafka Streams' specification.
<2> We add a `Priority`, with same pattern as a CDI decorator.
<3> We remove the generic types from the class signature, because CDI does not like generics in beans.
<4> Example of override of process method and call to underlying decorator.

Such a decorator will automatically been taken into account by CDI.
The priority will control at which point your decorator will be called among all other decorators.

[CAUTION]
====
We noticed with a new integration-test that is using a custom serde, that usage of custom CDI `Decorator` is causing microservices to randomly crash at startup.
This happens for specific versions of Quarkus.
Known impacted versions are 3.8.x, 3.9.x and 3.10.x.
The 3.2 LTS and upcoming 3.15 LTS versions do not suffer from this symptom.
The **only** solution found was to remove usage of `@Decorator` for `Processor` decorators for microservices based on Quarkus 3.8 LTS.
This change will be reverted in quarkus-kafka-streams-processor 3.0.
This is probably the https://github.com/quarkusio/quarkus/pull/41258[PR] on Quarkus side that has fixed the issue in Quarkus 3.11.
====

=== Producer interceptor

Kafka Streams already has the concept of a https://kafka.apache.org/36/javadoc/org/apache/kafka/clients/producer/ProducerInterceptor.html[ProducerInterceptor].
Expand Down Expand Up @@ -919,4 +866,4 @@ include::includes/kafka-streams-processor-configuration-keys.adoc[]

== Configuration from other extension

include::includes/quarkus-other-extension-configurations.adoc[]
include::includes/quarkus-other-extension-configurations.adoc[]
2 changes: 1 addition & 1 deletion docs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-parent</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>4.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion impl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-parent</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>4.0.0-SNAPSHOT</version>
</parent>
<artifactId>quarkus-kafka-streams-processor-impl</artifactId>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,24 @@
package io.quarkiverse.kafkastreamsprocessor.impl;

import java.lang.annotation.Annotation;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.context.RequestScoped;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.spi.Bean;
import jakarta.enterprise.inject.spi.BeanManager;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;

import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.AbstractProcessorDecorator;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -60,6 +62,8 @@ public class KStreamProcessorSupplier<KIn, VIn, KOut, VOut> implements Processor
*/
private final Instance<Kafka2ProcessorAdapter<?, ?>> adapterInstances;

private final Instance<AbstractProcessorDecorator> processorDecorators;

/**
* Injection constructor.
*
Expand All @@ -76,17 +80,20 @@ public class KStreamProcessorSupplier<KIn, VIn, KOut, VOut> implements Processor
@Inject
public KStreamProcessorSupplier(Instance<Processor<?, ?, ?, ?>> kafka3BeanInstances,
Instance<org.apache.kafka.streams.processor.Processor<?, ?>> beanInstances,
Instance<Kafka2ProcessorAdapter<?, ?>> adapterInstances, BeanManager beanManager) {
Instance<Kafka2ProcessorAdapter<?, ?>> adapterInstances, BeanManager beanManager,
Instance<AbstractProcessorDecorator> processorDecorators) {
this.kafka3BeanInstances = kafka3BeanInstances;
this.beanInstances = beanInstances;
this.adapterInstances = adapterInstances;
this.processorDecorators = processorDecorators;

List<String> processorDecoratorNames = new ArrayList<>(processorDecorators.stream()
.map(Object::getClass)
.map(Class::getName)
.collect(Collectors.toUnmodifiableList()));
Collections.reverse(processorDecoratorNames);
log.info("Configured Processor decorators are in order: {}", String.join(", ", processorDecoratorNames));

log.info("Configured Processor decorators are in order: {}",
beanManager.resolveDecorators(Set.of(Processor.class))
.stream()
.map(Bean::getBeanClass)
.map(Class::getName)
.collect(Collectors.joining(", ")));
}

/**
Expand Down Expand Up @@ -131,7 +138,16 @@ public Processor<KIn, VIn, KOut, VOut> get() {
"Processors cannot have a scope other than @Dependant, since KafkaStreams implementation classes are not thread-safe");
}

return (Processor<KIn, VIn, KOut, VOut>) processor;
return wrapProcessor((Processor<KIn, VIn, KOut, VOut>) processor);
}

private Processor<KIn, VIn, KOut, VOut> wrapProcessor(Processor<KIn, VIn, KOut, VOut> processor) {
Processor<KIn, VIn, KOut, VOut> wrappedProcessor = processor;
for (AbstractProcessorDecorator decorator : processorDecorators) {
decorator.setDelegate(wrappedProcessor);
wrappedProcessor = decorator;
}
return wrappedProcessor;
}

private static boolean hasAnnotation(Object bean, Class<? extends Annotation> annotation) {
Expand Down
Loading

0 comments on commit e1a5aa0

Please sign in to comment.