-
Notifications
You must be signed in to change notification settings - Fork 7
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix(doc): Warn about flat test classpath and decorators with old vers…
…ions of Quarkus We noticed flackiness with QuarkusTest and usage of `Processor` decorators. It is generating randomly ClassNotFoundException. The solution is to force a flat class path for tests. The configuration property is activated for all users of the library with a definition in the `microprofile-config.properties` file of `quarkus-kafka-streams-processor-impl`.
- Loading branch information
1 parent
50cb5f9
commit 72f4d77
Showing
17 changed files
with
597 additions
and
31 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
# Sample with multiple TopologyConfigCustomizers | ||
|
||
EDA to EDA stateless microservice implementation using [KafkaStreams](https://kafka.apache.org/documentation/streams/) | ||
|
||
## Introduction | ||
|
||
This module showcases the implementation of a | ||
[KafkaStream processor](https://kafka.apache.org/25/documentation/streams/developer-guide/processor-api.html#overview) with multiple [ConfigurationCustomizer](../../api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/configuration/ConfigurationCustomizer.java) instances. | ||
|
||
## Quarkus Dev mode | ||
|
||
The sample is fully working with the Quarkus Dev mode that allows to | ||
modify the code and have a hot replacement when the file is saved. It | ||
can be used also to launch the application. | ||
|
||
``` | ||
$> mvn clean install quarkus:dev | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,184 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<parent> | ||
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId> | ||
<artifactId>quarkus-kafka-streams-processor-integration-tests</artifactId> | ||
<version>2.0.0-SNAPSHOT</version> | ||
</parent> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<artifactId>quarkus-kafka-streams-processor-custom-serde-sample</artifactId> | ||
|
||
<dependencyManagement> | ||
<dependencies> | ||
<dependency> | ||
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId> | ||
<artifactId>quarkus-kafka-streams-processor-bom</artifactId> | ||
<version>${project.version}</version> | ||
<type>pom</type> | ||
<scope>import</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId> | ||
<artifactId>quarkus-kafka-streams-processor-test-bom</artifactId> | ||
<version>${project.version}</version> | ||
<type>pom</type> | ||
<scope>import</scope> | ||
</dependency> | ||
</dependencies> | ||
</dependencyManagement> | ||
|
||
<dependencies> | ||
<!-- Microprofile APIs --> | ||
<dependency> | ||
<groupId>jakarta.inject</groupId> | ||
<artifactId>jakarta.inject-api</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>jakarta.enterprise</groupId> | ||
<artifactId>jakarta.enterprise.cdi-api</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.eclipse.microprofile.config</groupId> | ||
<artifactId>microprofile-config-api</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>com.fasterxml.jackson.core</groupId> | ||
<artifactId>jackson-databind</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>com.fasterxml.jackson.core</groupId> | ||
<artifactId>jackson-core</artifactId> | ||
</dependency> | ||
|
||
<!-- Quarkus extensions --> | ||
<dependency> | ||
<groupId>io.quarkus</groupId> | ||
<artifactId>quarkus-kafka-streams</artifactId> | ||
<scope>runtime</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId> | ||
<artifactId>quarkus-kafka-streams-processor</artifactId> | ||
<scope>runtime</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId> | ||
<artifactId>quarkus-kafka-streams-processor-impl</artifactId> | ||
<scope>runtime</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>io.quarkus</groupId> | ||
<artifactId>quarkus-smallrye-health</artifactId> | ||
<scope>runtime</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>io.quarkus</groupId> | ||
<artifactId>quarkus-micrometer-registry-prometheus</artifactId> | ||
<scope>runtime</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>io.quarkus</groupId> | ||
<artifactId>quarkus-opentelemetry</artifactId> | ||
<scope>runtime</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.apache.kafka</groupId> | ||
<artifactId>kafka-streams</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.kafka</groupId> | ||
<artifactId>kafka-clients</artifactId> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId> | ||
<artifactId>quarkus-kafka-streams-processor-api</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId> | ||
<artifactId>quarkus-kafka-streams-processor-protobuf-binding</artifactId> | ||
<version>${project.version}</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>de.sven-jacobs</groupId> | ||
<artifactId>loremipsum</artifactId> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.junit.jupiter</groupId> | ||
<artifactId>junit-jupiter-engine</artifactId> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.junit.jupiter</groupId> | ||
<artifactId>junit-jupiter-api</artifactId> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>io.quarkus</groupId> | ||
<artifactId>quarkus-junit5</artifactId> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>io.quarkus</groupId> | ||
<artifactId>quarkus-test-common</artifactId> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.springframework.kafka</groupId> | ||
<artifactId>spring-kafka-test</artifactId> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.kafka</groupId> | ||
<artifactId>kafka-streams-test-utils</artifactId> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.awaitility</groupId> | ||
<artifactId>awaitility</artifactId> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.mockito</groupId> | ||
<artifactId>mockito-core</artifactId> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.mockito</groupId> | ||
<artifactId>mockito-junit-jupiter</artifactId> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>com.github.daniel-shuy</groupId> | ||
<artifactId>kafka-protobuf-serde</artifactId> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>io.rest-assured</groupId> | ||
<artifactId>rest-assured</artifactId> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.projectlombok</groupId> | ||
<artifactId>lombok</artifactId> | ||
<scope>provided</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.slf4j</groupId> | ||
<artifactId>slf4j-api</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.hamcrest</groupId> | ||
<artifactId>hamcrest</artifactId> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId> | ||
<artifactId>quarkus-kafka-streams-processor-test-framework</artifactId> | ||
<scope>test</scope> | ||
</dependency> | ||
</dependencies> | ||
</project> |
14 changes: 14 additions & 0 deletions
14
...rde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomType.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
package io.quarkiverse.kafkastreamsprocessor.sample.customserde; | ||
|
||
import lombok.AllArgsConstructor; | ||
import lombok.Getter; | ||
import lombok.NoArgsConstructor; | ||
import lombok.Setter; | ||
|
||
@AllArgsConstructor | ||
@NoArgsConstructor | ||
@Getter | ||
@Setter | ||
public class CustomType { | ||
private int value; | ||
} |
27 changes: 27 additions & 0 deletions
27
...a/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeConfigCustomizer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
package io.quarkiverse.kafkastreamsprocessor.sample.customserde; | ||
|
||
import jakarta.annotation.Priority; | ||
import jakarta.enterprise.context.Dependent; | ||
import jakarta.inject.Inject; | ||
|
||
import io.quarkiverse.kafkastreamsprocessor.api.configuration.Configuration; | ||
import io.quarkiverse.kafkastreamsprocessor.api.configuration.ConfigurationCustomizer; | ||
|
||
@Dependent | ||
@Priority(1) | ||
public class CustomTypeConfigCustomizer implements ConfigurationCustomizer { | ||
private final CustomTypeSerde serde; | ||
private final CustomTypeSerializer serializer; | ||
|
||
@Inject | ||
public CustomTypeConfigCustomizer(CustomTypeSerde serde, CustomTypeSerializer serializer) { | ||
this.serde = serde; | ||
this.serializer = serializer; | ||
} | ||
|
||
@Override | ||
public void fillConfiguration(Configuration configuration) { | ||
configuration.setSourceValueSerde(serde); | ||
configuration.setSinkValueSerializer(serializer); | ||
} | ||
} |
34 changes: 34 additions & 0 deletions
34
.../java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeDeserializer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
package io.quarkiverse.kafkastreamsprocessor.sample.customserde; | ||
|
||
import java.nio.charset.StandardCharsets; | ||
|
||
import jakarta.enterprise.context.ApplicationScoped; | ||
import jakarta.inject.Inject; | ||
|
||
import org.apache.kafka.common.serialization.Deserializer; | ||
|
||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
|
||
import lombok.extern.slf4j.Slf4j; | ||
|
||
@ApplicationScoped | ||
@Slf4j | ||
public class CustomTypeDeserializer implements Deserializer<CustomType> { | ||
private final ObjectMapper objectMapper; | ||
|
||
@Inject | ||
public CustomTypeDeserializer(ObjectMapper objectMapper) { | ||
this.objectMapper = objectMapper; | ||
} | ||
|
||
@Override | ||
public CustomType deserialize(String topic, byte[] data) { | ||
try { | ||
CustomType readValue = objectMapper.readValue(data, CustomType.class); | ||
return new CustomType(readValue.getValue() - CustomTypeSerde.SHIFT); | ||
} catch (Exception e) { | ||
log.error("Could not deserialize: {}", new String(data, StandardCharsets.UTF_8)); | ||
throw new RuntimeException("Error deserializing CustomType", e); | ||
} | ||
} | ||
} |
33 changes: 33 additions & 0 deletions
33
...rc/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeSerde.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
package io.quarkiverse.kafkastreamsprocessor.sample.customserde; | ||
|
||
import jakarta.enterprise.context.ApplicationScoped; | ||
import jakarta.inject.Inject; | ||
|
||
import org.apache.kafka.common.serialization.Deserializer; | ||
import org.apache.kafka.common.serialization.Serde; | ||
import org.apache.kafka.common.serialization.Serializer; | ||
|
||
@ApplicationScoped | ||
public class CustomTypeSerde implements Serde<CustomType> { | ||
static final int SHIFT = 10; | ||
|
||
private final CustomTypeSerializer customTypeSerializer; | ||
|
||
private final CustomTypeDeserializer customTypeDeserializer; | ||
|
||
@Inject | ||
public CustomTypeSerde(CustomTypeSerializer customTypeSerializer, CustomTypeDeserializer customTypeDeserializer) { | ||
this.customTypeSerializer = customTypeSerializer; | ||
this.customTypeDeserializer = customTypeDeserializer; | ||
} | ||
|
||
@Override | ||
public Serializer<CustomType> serializer() { | ||
return customTypeSerializer; | ||
} | ||
|
||
@Override | ||
public Deserializer<CustomType> deserializer() { | ||
return customTypeDeserializer; | ||
} | ||
} |
29 changes: 29 additions & 0 deletions
29
...in/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeSerializer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
package io.quarkiverse.kafkastreamsprocessor.sample.customserde; | ||
|
||
import jakarta.enterprise.context.ApplicationScoped; | ||
import jakarta.inject.Inject; | ||
|
||
import org.apache.kafka.common.serialization.Serializer; | ||
|
||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
|
||
@ApplicationScoped | ||
public class CustomTypeSerializer implements Serializer<CustomType> { | ||
private final ObjectMapper objectMapper; | ||
|
||
@Inject | ||
public CustomTypeSerializer(ObjectMapper objectMapper) { | ||
this.objectMapper = objectMapper; | ||
} | ||
|
||
@Override | ||
public byte[] serialize(String topic, CustomType data) { | ||
CustomType valueToSerialize = new CustomType(data.getValue() + CustomTypeSerde.SHIFT); | ||
try { | ||
return objectMapper.writeValueAsBytes(valueToSerialize); | ||
} catch (Exception e) { | ||
throw new RuntimeException("Error serializing CustomType", e); | ||
} | ||
} | ||
|
||
} |
Oops, something went wrong.