diff --git a/docs/modules/ROOT/pages/index.adoc b/docs/modules/ROOT/pages/index.adoc index c515a81..8b58292 100644 --- a/docs/modules/ROOT/pages/index.adoc +++ b/docs/modules/ROOT/pages/index.adoc @@ -769,13 +769,26 @@ You can also, as a regular CDI bean, inject any another CDI bean reference to be Such a decorator will automatically been taken into account by CDI through the combination of `Decorator` and `Priority` annotations. The priority will control at which point your decorator will be called among all other decorators. +[WARNING] +==== +For Quarkus version prior to 3.11.0, you might notice a random failure of your QuarkusTest with a `ClassNotFoundException` of your custom decorator class. +It can be mitigated by adding the following configuration: + +[source,properties] +---- +quarkus.test.flat-class-path=true +---- + +Fortunately, the `quarkus-kafka-streams-processor-impl` module already sets this configuration for you. +==== + === Producer interceptor -Kafka Streams already has the notion of a https://kafka.apache.org/36/javadoc/org/apache/kafka/clients/producer/ProducerInterceptor.html[ProducerInterceptor]. +Kafka Streams already has the concept of a https://kafka.apache.org/36/javadoc/org/apache/kafka/clients/producer/ProducerInterceptor.html[ProducerInterceptor]. But as the rest of Kafka Streams SPI, it is based on a class name and a default constructor for instantiation. It does not support CDI resolution. -This is why this extension's API defines a `ProducerOnSendInterceptor` interface that is instrumentated through CDI. +This is why this extension's API defines a `ProducerOnSendInterceptor` interface that is instrumented through CDI. Example of usage: .MyProducerInterceptor.java @@ -883,4 +896,4 @@ include::includes/kafka-streams-processor-configuration-keys.adoc[] == Configuration from other extension -include::includes/quarkus-other-extension-configurations.adoc[] \ No newline at end of file +include::includes/quarkus-other-extension-configurations.adoc[] diff --git a/impl/src/main/resources/META-INF/microprofile-config.properties b/impl/src/main/resources/META-INF/microprofile-config.properties index b3f2dae..1cf155f 100644 --- a/impl/src/main/resources/META-INF/microprofile-config.properties +++ b/impl/src/main/resources/META-INF/microprofile-config.properties @@ -19,3 +19,6 @@ kafka-streams.internal.leave.group.on.close=true # Deactivate exposure of metrics through JMX beans # It is still adding a mxBean in AppInfoParser though kafka-streams.auto.include.jmx.reporter=false +# For compatibility with generic decorators and Quarkus versions prior to 3.11.0 +# TODO: remove in main branch as the problem is not reproducible with Quarkus 3.11.0 and later versions +quarkus.test.flat-class-path=true diff --git a/integration-tests/custom-serde/Readme.md b/integration-tests/custom-serde/Readme.md new file mode 100644 index 0000000..d1c497b --- /dev/null +++ b/integration-tests/custom-serde/Readme.md @@ -0,0 +1,18 @@ +# Sample with multiple TopologyConfigCustomizers + +EDA to EDA stateless microservice implementation using [KafkaStreams](https://kafka.apache.org/documentation/streams/) + +## Introduction + +This module showcases the implementation of a +[KafkaStream processor](https://kafka.apache.org/25/documentation/streams/developer-guide/processor-api.html#overview) with multiple [ConfigurationCustomizer](../../api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/configuration/ConfigurationCustomizer.java) instances. + +## Quarkus Dev mode + +The sample is fully working with the Quarkus Dev mode that allows to +modify the code and have a hot replacement when the file is saved. It +can be used also to launch the application. + +``` +$> mvn clean install quarkus:dev +``` diff --git a/integration-tests/custom-serde/pom.xml b/integration-tests/custom-serde/pom.xml new file mode 100644 index 0000000..cecafd2 --- /dev/null +++ b/integration-tests/custom-serde/pom.xml @@ -0,0 +1,184 @@ + + + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor-integration-tests + 2.0.0-SNAPSHOT + + 4.0.0 + + quarkus-kafka-streams-processor-custom-serde-sample + + + + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor-bom + ${project.version} + pom + import + + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor-test-bom + ${project.version} + pom + import + + + + + + + + jakarta.inject + jakarta.inject-api + + + jakarta.enterprise + jakarta.enterprise.cdi-api + + + org.eclipse.microprofile.config + microprofile-config-api + + + com.fasterxml.jackson.core + jackson-databind + + + com.fasterxml.jackson.core + jackson-core + + + + + io.quarkus + quarkus-kafka-streams + runtime + + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor + runtime + + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor-impl + runtime + + + io.quarkus + quarkus-smallrye-health + runtime + + + io.quarkus + quarkus-micrometer-registry-prometheus + runtime + + + io.quarkus + quarkus-opentelemetry + runtime + + + + org.apache.kafka + kafka-streams + + + org.apache.kafka + kafka-clients + + + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor-api + + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor-protobuf-binding + ${project.version} + + + de.sven-jacobs + loremipsum + + + + org.junit.jupiter + junit-jupiter-engine + test + + + org.junit.jupiter + junit-jupiter-api + test + + + io.quarkus + quarkus-junit5 + test + + + io.quarkus + quarkus-test-common + test + + + org.springframework.kafka + spring-kafka-test + test + + + org.apache.kafka + kafka-streams-test-utils + test + + + org.awaitility + awaitility + test + + + org.mockito + mockito-core + test + + + org.mockito + mockito-junit-jupiter + test + + + com.github.daniel-shuy + kafka-protobuf-serde + test + + + io.rest-assured + rest-assured + test + + + org.projectlombok + lombok + provided + + + org.slf4j + slf4j-api + + + org.hamcrest + hamcrest + test + + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor-test-framework + test + + + diff --git a/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomType.java b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomType.java new file mode 100644 index 0000000..05f0df4 --- /dev/null +++ b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomType.java @@ -0,0 +1,14 @@ +package io.quarkiverse.kafkastreamsprocessor.sample.customserde; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@AllArgsConstructor +@NoArgsConstructor +@Getter +@Setter +public class CustomType { + private int value; +} diff --git a/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeConfigCustomizer.java b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeConfigCustomizer.java new file mode 100644 index 0000000..1abff5e --- /dev/null +++ b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeConfigCustomizer.java @@ -0,0 +1,27 @@ +package io.quarkiverse.kafkastreamsprocessor.sample.customserde; + +import jakarta.annotation.Priority; +import jakarta.enterprise.context.Dependent; +import jakarta.inject.Inject; + +import io.quarkiverse.kafkastreamsprocessor.api.configuration.Configuration; +import io.quarkiverse.kafkastreamsprocessor.api.configuration.ConfigurationCustomizer; + +@Dependent +@Priority(1) +public class CustomTypeConfigCustomizer implements ConfigurationCustomizer { + private final CustomTypeSerde serde; + private final CustomTypeSerializer serializer; + + @Inject + public CustomTypeConfigCustomizer(CustomTypeSerde serde, CustomTypeSerializer serializer) { + this.serde = serde; + this.serializer = serializer; + } + + @Override + public void fillConfiguration(Configuration configuration) { + configuration.setSourceValueSerde(serde); + configuration.setSinkValueSerializer(serializer); + } +} diff --git a/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeDeserializer.java b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeDeserializer.java new file mode 100644 index 0000000..6ffc2d0 --- /dev/null +++ b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeDeserializer.java @@ -0,0 +1,34 @@ +package io.quarkiverse.kafkastreamsprocessor.sample.customserde; + +import java.nio.charset.StandardCharsets; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.apache.kafka.common.serialization.Deserializer; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import lombok.extern.slf4j.Slf4j; + +@ApplicationScoped +@Slf4j +public class CustomTypeDeserializer implements Deserializer { + private final ObjectMapper objectMapper; + + @Inject + public CustomTypeDeserializer(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } + + @Override + public CustomType deserialize(String topic, byte[] data) { + try { + CustomType readValue = objectMapper.readValue(data, CustomType.class); + return new CustomType(readValue.getValue() - CustomTypeSerde.SHIFT); + } catch (Exception e) { + log.error("Could not deserialize: {}", new String(data, StandardCharsets.UTF_8)); + throw new RuntimeException("Error deserializing CustomType", e); + } + } +} diff --git a/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeSerde.java b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeSerde.java new file mode 100644 index 0000000..c177945 --- /dev/null +++ b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeSerde.java @@ -0,0 +1,33 @@ +package io.quarkiverse.kafkastreamsprocessor.sample.customserde; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; + +@ApplicationScoped +public class CustomTypeSerde implements Serde { + static final int SHIFT = 10; + + private final CustomTypeSerializer customTypeSerializer; + + private final CustomTypeDeserializer customTypeDeserializer; + + @Inject + public CustomTypeSerde(CustomTypeSerializer customTypeSerializer, CustomTypeDeserializer customTypeDeserializer) { + this.customTypeSerializer = customTypeSerializer; + this.customTypeDeserializer = customTypeDeserializer; + } + + @Override + public Serializer serializer() { + return customTypeSerializer; + } + + @Override + public Deserializer deserializer() { + return customTypeDeserializer; + } +} diff --git a/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeSerializer.java b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeSerializer.java new file mode 100644 index 0000000..326a685 --- /dev/null +++ b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeSerializer.java @@ -0,0 +1,29 @@ +package io.quarkiverse.kafkastreamsprocessor.sample.customserde; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.apache.kafka.common.serialization.Serializer; + +import com.fasterxml.jackson.databind.ObjectMapper; + +@ApplicationScoped +public class CustomTypeSerializer implements Serializer { + private final ObjectMapper objectMapper; + + @Inject + public CustomTypeSerializer(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } + + @Override + public byte[] serialize(String topic, CustomType data) { + CustomType valueToSerialize = new CustomType(data.getValue() + CustomTypeSerde.SHIFT); + try { + return objectMapper.writeValueAsBytes(valueToSerialize); + } catch (Exception e) { + throw new RuntimeException("Error serializing CustomType", e); + } + } + +} diff --git a/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/HeaderDecorator.java b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/HeaderDecorator.java new file mode 100644 index 0000000..5f2b0a5 --- /dev/null +++ b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/HeaderDecorator.java @@ -0,0 +1,44 @@ +package io.quarkiverse.kafkastreamsprocessor.sample.customserde; + +import java.nio.charset.StandardCharsets; + +import jakarta.annotation.Priority; +import jakarta.decorator.Decorator; +import jakarta.decorator.Delegate; +import jakarta.inject.Inject; + +import org.apache.kafka.common.header.Header; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.Record; + +import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.ProcessorDecoratorPriorities; + +@Decorator +@Priority(ProcessorDecoratorPriorities.PUNCTUATOR_DECORATION + 2) + +public class HeaderDecorator implements Processor { + @lombok.experimental.Delegate(excludes = Excludes.class) + private final Processor delegate; + + @Inject + public HeaderDecorator(@Delegate Processor delegate) { + this.delegate = delegate; + } + + @Override + public void process(Record record) { + Header header = record.headers().lastHeader("custom-header"); + if (header != null) { + String value = new String(header.value(), StandardCharsets.UTF_8); + if (value.contains("error")) { + throw new IllegalStateException("Error in header"); + } + } + delegate.process(record); + } + + private interface Excludes { + void process(Record record); + } + +} diff --git a/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/PingProcessor.java b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/PingProcessor.java new file mode 100644 index 0000000..bf843ba --- /dev/null +++ b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/PingProcessor.java @@ -0,0 +1,38 @@ +/*- + * #%L + * Quarkus Kafka Streams Processor + * %% + * Copyright (C) 2024 Amadeus s.a.s. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package io.quarkiverse.kafkastreamsprocessor.sample.customserde; + +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.Record; + +import io.quarkiverse.kafkastreamsprocessor.api.Processor; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Processor +@RequiredArgsConstructor +public class PingProcessor extends ContextualProcessor { + @Override + public void process(Record ping) { + log.info("Process the custom type with value: {}", ping.value().getValue()); + context().forward(ping); + } +} diff --git a/integration-tests/custom-serde/src/main/resources/application.properties b/integration-tests/custom-serde/src/main/resources/application.properties new file mode 100644 index 0000000..38bd30c --- /dev/null +++ b/integration-tests/custom-serde/src/main/resources/application.properties @@ -0,0 +1,5 @@ +kafkastreamsprocessor.input.topic=ping-events +kafkastreamsprocessor.output.topic=pong-events +quarkus.kafka-streams.bootstrap-servers=localhost:9092 +quarkus.kafka-streams.topics=ping-events,pong-events +kafka-streams.producer.linger.ms=0 \ No newline at end of file diff --git a/integration-tests/custom-serde/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeDeserializerTest.java b/integration-tests/custom-serde/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeDeserializerTest.java new file mode 100644 index 0000000..d508ae1 --- /dev/null +++ b/integration-tests/custom-serde/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeDeserializerTest.java @@ -0,0 +1,23 @@ +package io.quarkiverse.kafkastreamsprocessor.sample.customserde; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +import org.junit.jupiter.api.Test; + +import com.fasterxml.jackson.databind.ObjectMapper; + +class CustomTypeDeserializerTest { + + CustomTypeDeserializer deserializer = new CustomTypeDeserializer(new ObjectMapper()); + + @Test + public void testDeserialize() { + byte[] data = "{\"value\":11}".getBytes(); + + Object customType = deserializer.deserialize("topic", data); + + assertThat(((CustomType) customType).getValue(), equalTo(1)); + } + +} \ No newline at end of file diff --git a/integration-tests/custom-serde/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeSerializerTest.java b/integration-tests/custom-serde/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeSerializerTest.java new file mode 100644 index 0000000..e6a7e64 --- /dev/null +++ b/integration-tests/custom-serde/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeSerializerTest.java @@ -0,0 +1,23 @@ +package io.quarkiverse.kafkastreamsprocessor.sample.customserde; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +import java.nio.charset.StandardCharsets; + +import org.junit.jupiter.api.Test; + +import com.fasterxml.jackson.databind.ObjectMapper; + +class CustomTypeSerializerTest { + CustomTypeSerializer serializer = new CustomTypeSerializer(new ObjectMapper()); + + @Test + public void testSerialize() { + CustomType customType = new CustomType(1); + + byte[] serialized = serializer.serialize("topic", customType); + + assertThat(new String(serialized, StandardCharsets.UTF_8), equalTo("{\"value\":11}")); + } +} \ No newline at end of file diff --git a/integration-tests/custom-serde/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/PingProcessorQuarkusTest.java b/integration-tests/custom-serde/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/PingProcessorQuarkusTest.java new file mode 100644 index 0000000..3415327 --- /dev/null +++ b/integration-tests/custom-serde/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/PingProcessorQuarkusTest.java @@ -0,0 +1,78 @@ +package io.quarkiverse.kafkastreamsprocessor.sample.customserde; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; + +import jakarta.inject.Inject; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.awaitility.Durations; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.kafka.test.utils.KafkaTestUtils; + +import io.quarkus.test.junit.QuarkusTest; + +@QuarkusTest +public class PingProcessorQuarkusTest { + @ConfigProperty(name = "kafka.bootstrap.servers") + String kafkaBootstrapServers; + + String senderTopic = "ping-events"; + + String consumerTopic = "pong-events"; + + KafkaProducer producer; + + KafkaConsumer consumer; + + @Inject + CustomTypeSerde customTypeSerde; + + @BeforeEach + public void setup() throws Exception { + Map consumerProps = KafkaTestUtils.consumerProps(kafkaBootstrapServers, "test", "true"); + consumer = new KafkaConsumer<>(consumerProps, new StringDeserializer(), customTypeSerde.deserializer()); + consumer.subscribe(List.of(consumerTopic)); + + Map producerProps = KafkaTestUtils.producerProps(kafkaBootstrapServers); + producer = new KafkaProducer<>(producerProps, new StringSerializer(), customTypeSerde.serializer()); + } + + @AfterEach + public void tearDown() { + producer.close(); + consumer.close(); + } + + @Test + public void testCount() { + producer.send(new ProducerRecord<>(senderTopic, "1", new CustomType(1))); + producer.flush(); + ConsumerRecord record = KafkaTestUtils.getSingleRecord(consumer, consumerTopic, + Durations.FIVE_SECONDS); + assertThat(((CustomType) record.value()).getValue(), equalTo(1)); + } + + @Test + public void testHeaderError() { + producer.send(new ProducerRecord<>(senderTopic, 0, "1", new CustomType(1), + new RecordHeaders().add("custom-header", "error".getBytes(StandardCharsets.UTF_8)))); + producer.flush(); + assertThrows(IllegalStateException.class, + () -> KafkaTestUtils.getSingleRecord(consumer, consumerTopic, Durations.FIVE_SECONDS)); + } +} diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index f32c922..2b85ec8 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -15,6 +15,7 @@ multioutput simple stateful + custom-serde diff --git a/pom.xml b/pom.xml index a8d01ab..a83a5ac 100644 --- a/pom.xml +++ b/pom.xml @@ -33,7 +33,7 @@ 17 UTF-8 UTF-8 - 3.8.3 + 3.8.6 3.24.1 @@ -98,37 +98,37 @@ LF - - - - - org.codehaus.mojo - license-maven-plugin - 2.4.0 - - - add-license-header - - check-file-header - - process-sources - - - - false - apache_v2 - 2024 - Amadeus s.a.s. - Quarkus Kafka Streams Processor - - **/*.java - - - **/*$$*.java - - - + + + + org.codehaus.mojo + license-maven-plugin + 2.4.0 + + + add-license-header + + check-file-header + + process-sources + + + + false + apache_v2 + 2024 + Amadeus s.a.s. + Quarkus Kafka Streams Processor + + **/*.java + + + **/*$$*.java + + + +