Skip to content

Commit

Permalink
fix(doc): Warn about flat test classpath and decorators with old vers…
Browse files Browse the repository at this point in the history
…ions of Quarkus

We noticed flackiness with QuarkusTest and usage of `Processor` decorators.
It is generating randomly ClassNotFoundException.
The solution is to force a flat class path for tests.
The configuration property is activated for all users of the library with a definition in the `microprofile-config.properties` file of `quarkus-kafka-streams-processor-impl`.
  • Loading branch information
edeweerd1A committed Oct 3, 2024
1 parent 50cb5f9 commit bd6ebd0
Show file tree
Hide file tree
Showing 17 changed files with 601 additions and 34 deletions.
19 changes: 16 additions & 3 deletions docs/modules/ROOT/pages/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -769,13 +769,26 @@ You can also, as a regular CDI bean, inject any another CDI bean reference to be
Such a decorator will automatically been taken into account by CDI through the combination of `Decorator` and `Priority` annotations.
The priority will control at which point your decorator will be called among all other decorators.

[WARNING]
====
For Quarkus version prior to 3.11.0, you might notice a random failure of your QuarkusTest with a `ClassNotFoundException` of your custom decorator class.
It can be mitigated by adding the following configuration:
[source,properties]
----
quarkus.test.flat-class-path=true
----
Fortunately, the `quarkus-kafka-streams-processor-impl` module already sets this configuration for you.
====

=== Producer interceptor

Kafka Streams already has the notion of a https://kafka.apache.org/36/javadoc/org/apache/kafka/clients/producer/ProducerInterceptor.html[ProducerInterceptor].
Kafka Streams already has the concept of a https://kafka.apache.org/36/javadoc/org/apache/kafka/clients/producer/ProducerInterceptor.html[ProducerInterceptor].
But as the rest of Kafka Streams SPI, it is based on a class name and a default constructor for instantiation.
It does not support CDI resolution.

This is why this extension's API defines a `ProducerOnSendInterceptor` interface that is instrumentated through CDI.
This is why this extension's API defines a `ProducerOnSendInterceptor` interface that is instrumented through CDI.
Example of usage:

.MyProducerInterceptor.java
Expand Down Expand Up @@ -883,4 +896,4 @@ include::includes/kafka-streams-processor-configuration-keys.adoc[]

== Configuration from other extension

include::includes/quarkus-other-extension-configurations.adoc[]
include::includes/quarkus-other-extension-configurations.adoc[]
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,6 @@ kafka-streams.internal.leave.group.on.close=true
# Deactivate exposure of metrics through JMX beans
# It is still adding a mxBean in AppInfoParser though
kafka-streams.auto.include.jmx.reporter=false
# For compatibility with generic decorators and Quarkus versions prior to 3.11.0
# TODO: remove in main branch as the problem is not reproducible with Quarkus 3.11.0 and later versions
quarkus.test.flat-class-path=true
18 changes: 18 additions & 0 deletions integration-tests/custom-serde/Readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Sample with multiple TopologyConfigCustomizers

EDA to EDA stateless microservice implementation using [KafkaStreams](https://kafka.apache.org/documentation/streams/)

## Introduction

This module showcases the implementation of a
[KafkaStream processor](https://kafka.apache.org/25/documentation/streams/developer-guide/processor-api.html#overview) with multiple [ConfigurationCustomizer](../../api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/configuration/ConfigurationCustomizer.java) instances.

## Quarkus Dev mode

The sample is fully working with the Quarkus Dev mode that allows to
modify the code and have a hot replacement when the file is saved. It
can be used also to launch the application.

```
$> mvn clean install quarkus:dev
```
184 changes: 184 additions & 0 deletions integration-tests/custom-serde/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-integration-tests</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>quarkus-kafka-streams-processor-custom-serde-sample</artifactId>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-bom</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-test-bom</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<!-- Microprofile APIs -->
<dependency>
<groupId>jakarta.inject</groupId>
<artifactId>jakarta.inject-api</artifactId>
</dependency>
<dependency>
<groupId>jakarta.enterprise</groupId>
<artifactId>jakarta.enterprise.cdi-api</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.microprofile.config</groupId>
<artifactId>microprofile-config-api</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>

<!-- Quarkus extensions -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kafka-streams</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-impl</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-health</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-micrometer-registry-prometheus</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-opentelemetry</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>

<dependency>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-api</artifactId>
</dependency>
<dependency>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-protobuf-binding</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>de.sven-jacobs</groupId>
<artifactId>loremipsum</artifactId>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-test-common</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-test-utils</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.daniel-shuy</groupId>
<artifactId>kafka-protobuf-serde</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-test-framework</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.quarkiverse.kafkastreamsprocessor.sample.customserde;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
public class CustomType {
private int value;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.quarkiverse.kafkastreamsprocessor.sample.customserde;

import jakarta.annotation.Priority;
import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;

import io.quarkiverse.kafkastreamsprocessor.api.configuration.Configuration;
import io.quarkiverse.kafkastreamsprocessor.api.configuration.ConfigurationCustomizer;

@Dependent
@Priority(1)
public class CustomTypeConfigCustomizer implements ConfigurationCustomizer {
private final CustomTypeSerde serde;
private final CustomTypeSerializer serializer;

@Inject
public CustomTypeConfigCustomizer(CustomTypeSerde serde, CustomTypeSerializer serializer) {
this.serde = serde;
this.serializer = serializer;
}

@Override
public void fillConfiguration(Configuration configuration) {
configuration.setSourceValueSerde(serde);
configuration.setSinkValueSerializer(serializer);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.quarkiverse.kafkastreamsprocessor.sample.customserde;

import java.nio.charset.StandardCharsets;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.apache.kafka.common.serialization.Deserializer;

import com.fasterxml.jackson.databind.ObjectMapper;

import lombok.extern.slf4j.Slf4j;

@ApplicationScoped
@Slf4j
public class CustomTypeDeserializer implements Deserializer<CustomType> {
private final ObjectMapper objectMapper;

@Inject
public CustomTypeDeserializer(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}

@Override
public CustomType deserialize(String topic, byte[] data) {
try {
CustomType readValue = objectMapper.readValue(data, CustomType.class);
return new CustomType(readValue.getValue() - CustomTypeSerde.SHIFT);
} catch (Exception e) {
log.error("Could not deserialize: {}", new String(data, StandardCharsets.UTF_8));
throw new RuntimeException("Error deserializing CustomType", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.quarkiverse.kafkastreamsprocessor.sample.customserde;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

@ApplicationScoped
public class CustomTypeSerde implements Serde<CustomType> {
static final int SHIFT = 10;

private final CustomTypeSerializer customTypeSerializer;

private final CustomTypeDeserializer customTypeDeserializer;

@Inject
public CustomTypeSerde(CustomTypeSerializer customTypeSerializer, CustomTypeDeserializer customTypeDeserializer) {
this.customTypeSerializer = customTypeSerializer;
this.customTypeDeserializer = customTypeDeserializer;
}

@Override
public Serializer<CustomType> serializer() {
return customTypeSerializer;
}

@Override
public Deserializer<CustomType> deserializer() {
return customTypeDeserializer;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.quarkiverse.kafkastreamsprocessor.sample.customserde;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.apache.kafka.common.serialization.Serializer;

import com.fasterxml.jackson.databind.ObjectMapper;

@ApplicationScoped
public class CustomTypeSerializer implements Serializer<CustomType> {
private final ObjectMapper objectMapper;

@Inject
public CustomTypeSerializer(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}

@Override
public byte[] serialize(String topic, CustomType data) {
CustomType valueToSerialize = new CustomType(data.getValue() + CustomTypeSerde.SHIFT);
try {
return objectMapper.writeValueAsBytes(valueToSerialize);
} catch (Exception e) {
throw new RuntimeException("Error serializing CustomType", e);
}
}

}
Loading

0 comments on commit bd6ebd0

Please sign in to comment.