wrappedProcessor = processor;
+ for (AbstractProcessorDecorator decorator : processorDecorators) {
+ decorator.setDelegate(wrappedProcessor);
+ wrappedProcessor = decorator;
+ }
+ return wrappedProcessor;
}
private static boolean hasAnnotation(Object bean, Class<? extends Annotation> annotation) {
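For context: the loop above replaces the former CDI `@Decorator` chain with explicit composition, so each decorator receives the previously wrapped processor as its delegate and the last decorator applied ends up outermost. A minimal sketch of that behaviour, assuming (as the calls above and the tests suggest) that `AbstractProcessorDecorator` implements the raw `Processor` interface and exposes `setDelegate(Processor)` / `getDelegate()`, and that the list is already sorted by `@Priority`:

```java
import java.util.List;

import org.apache.kafka.streams.processor.api.Processor;

import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.AbstractProcessorDecorator;

// Sketch only: the same wrapping logic as above, extracted as a plain method.
class DecoratorChainSketch {
    static Processor wrap(Processor processor, List<AbstractProcessorDecorator> decorators) {
        Processor wrapped = processor;
        for (AbstractProcessorDecorator decorator : decorators) {
            decorator.setDelegate(wrapped); // each decorator delegates to what was built so far
            wrapped = decorator;            // ...and becomes the new outermost layer
        }
        return wrapped;                     // the processor actually registered in the topology
    }
}
```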
diff --git a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecorator.java b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecorator.java
index fc92484..7781526 100644
--- a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecorator.java
+++ b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecorator.java
@@ -20,17 +20,16 @@
package io.quarkiverse.kafkastreamsprocessor.impl.decorator.processor;
import jakarta.annotation.Priority;
-import jakarta.decorator.Decorator;
+import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
+import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.AbstractProcessorDecorator;
import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.ProcessorDecoratorPriorities;
import io.quarkus.arc.Arc;
import io.quarkus.arc.ArcContainer;
-import lombok.AccessLevel;
-import lombok.RequiredArgsConstructor;
/**
* This class is responsible to manage the lifecycle of {@link jakarta.enterprise.context.RequestScoped} beans. It
@@ -41,16 +40,11 @@
*
* Warning: "Quarkus Tests" Junit extension is already managing the request scope on its own.
*/
-@Decorator
+//@Decorator
+@Dependent
@Priority(ProcessorDecoratorPriorities.CDI_REQUEST_SCOPE)
-@RequiredArgsConstructor(access = AccessLevel.MODULE)
-public class CdiRequestContextDecorator<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {
- /**
- * Injection point for composition
- */
- @lombok.experimental.Delegate(excludes = Excludes.class)
- private final Processor<KIn, VIn, KOut, VOut> delegate;
-
+//@RequiredArgsConstructor(access = AccessLevel.MODULE)
+public class CdiRequestContextDecorator extends AbstractProcessorDecorator {
/**
* The container object from Arc to inquire on request contextualization availability and activation
*/
@@ -58,13 +52,14 @@
/**
* Constructor for injection of the delegate.
- *
- * @param delegate
- * injection point for composition
*/
@Inject
- public CdiRequestContextDecorator(@jakarta.decorator.Delegate Processor<KIn, VIn, KOut, VOut> delegate) {
- this(delegate, Arc.container());
+ public CdiRequestContextDecorator() {
+ this(Arc.container());
+ }
+
+ public CdiRequestContextDecorator(ArcContainer container) {
+ this.container = container;
}
/**
@@ -74,20 +69,16 @@
- public void process(Record<KIn, VIn> record) {
+ public void process(Record record) {
if (container.requestContext().isActive()) {
- delegate.process(record);
+ getDelegate().process(record);
} else {
container.requestContext().activate();
try {
- delegate.process(record);
+ getDelegate().process(record);
} finally {
container.requestContext().terminate();
}
}
}
-
- private interface Excludes {
- void process(Record record);
- }
}
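CdiRequestContextDecorator above is the first of several decorators migrated to the new base class. The base class itself is not part of this diff; below is a rough sketch of the contract these subclasses appear to rely on — an assumed shape inferred from the `setDelegate()`/`getDelegate()` calls, not the actual api-module source:

```java
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

// Assumed shape of AbstractProcessorDecorator: holds the delegate set by the wrapping loop
// and forwards every Processor callback to it unless a subclass overrides the method.
public abstract class AbstractProcessorDecoratorSketch implements Processor {

    private Processor delegate;

    public void setDelegate(Processor delegate) {
        this.delegate = delegate;
    }

    public Processor getDelegate() {
        return delegate;
    }

    @Override
    public void init(ProcessorContext context) {
        delegate.init(context);
    }

    @Override
    public void process(Record record) {
        delegate.process(record);
    }

    @Override
    public void close() {
        delegate.close();
    }
}
```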
diff --git a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/DlqDecorator.java b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/DlqDecorator.java
index a13b62d..f3a6eaf 100644
--- a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/DlqDecorator.java
+++ b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/DlqDecorator.java
@@ -23,8 +23,8 @@
import java.util.Set;
import jakarta.annotation.Priority;
-import jakarta.decorator.Decorator;
import jakarta.decorator.Delegate;
+import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;
import org.apache.kafka.common.KafkaException;
@@ -37,6 +37,7 @@
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.AbstractProcessorDecorator;
import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.ProcessorDecoratorPriorities;
import io.quarkiverse.kafkastreamsprocessor.impl.TopologyProducer;
import io.quarkiverse.kafkastreamsprocessor.impl.errors.DlqMetadataHandler;
@@ -53,14 +54,10 @@
* Uses a dead-letter sink from the topology, rather than a raw producer, to benefit from the same KStreams guarantees
* (at least once / exactly once).
*/
-@Decorator
+//@Decorator
@Priority(ProcessorDecoratorPriorities.DLQ)
-public class DlqDecorator<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {
- /**
- * Inject point for composition
- */
- @lombok.experimental.Delegate(excludes = Excludes.class)
- private final Processor<KIn, VIn, KOut, VOut> delegate;
+@Dependent
+public class DlqDecorator extends AbstractProcessorDecorator {
/**
* A set of sink names that are involved in the business logic.
@@ -89,11 +86,10 @@
- private ProcessorContext<KOut, VOut> context;
+ private ProcessorContext context;
- DlqDecorator(Processor<KIn, VIn, KOut, VOut> delegate, Set<String> functionalSinks, DlqMetadataHandler dlqMetadataHandler,
+ DlqDecorator(Set<String> functionalSinks, DlqMetadataHandler dlqMetadataHandler,
KafkaStreamsProcessorMetrics metrics, boolean activated) {
- this.delegate = delegate;
this.functionalSinks = functionalSinks;
this.dlqMetadataHandler = dlqMetadataHandler;
this.metrics = metrics;
@@ -103,8 +99,6 @@
- public DlqDecorator(@Delegate Processor<KIn, VIn, KOut, VOut> delegate,
+ public DlqDecorator(
SinkToTopicMappingBuilder sinkToTopicMappingBuilder, DlqMetadataHandler dlqMetadataHandler,
KafkaStreamsProcessorMetrics metrics,
KStreamsProcessorConfig kStreamsProcessorConfig) { // NOSONAR Optional with microprofile-config
- this(delegate, sinkToTopicMappingBuilder.sinkToTopicMapping().keySet(), dlqMetadataHandler, metrics,
+ this(sinkToTopicMappingBuilder.sinkToTopicMapping().keySet(), dlqMetadataHandler, metrics,
ErrorHandlingStrategy.shouldSendToDlq(kStreamsProcessorConfig.errorStrategy(),
kStreamsProcessorConfig.dlq().topic()));
}
@@ -135,12 +129,12 @@
* {@inheritDoc}
*/
@Override
- public void init(final ProcessorContext<KOut, VOut> context) {
+ public void init(final ProcessorContext context) {
if (activated) {
- this.context = new DlqProcessorContextDecorator<>((InternalProcessorContext) context, functionalSinks);
- delegate.init(this.context);
+ this.context = new DlqProcessorContextDecorator<>((InternalProcessorContext) context, functionalSinks);
+ getDelegate().init(this.context);
} else {
- delegate.init(context);
+ getDelegate().init(context);
}
}
@@ -154,10 +148,10 @@ public void init(final ProcessorContext context) {
* {@inheritDoc}
*/
@Override
- public void process(Record<KIn, VIn> record) {
+ public void process(Record record) {
if (activated) {
try {
- delegate.process(record);
+ getDelegate().process(record);
} catch (KafkaException e) {
// Do not forward to DLQ
throw e;
@@ -166,14 +160,14 @@ public void process(Record record) {
if (recordMetadata.isPresent()) {
dlqMetadataHandler.addMetadata(record.headers(), recordMetadata.get().topic(),
recordMetadata.get().partition(), e);
- context.forward((Record) record, TopologyProducer.DLQ_SINK_NAME);
+ context.forward(record, TopologyProducer.DLQ_SINK_NAME);
// Re-throw so the exception gets logged
metrics.microserviceDlqSentCounter().increment();
throw e;
}
}
} else {
- delegate.process(record);
+ getDelegate().process(record);
}
}
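For reference, the decorator above only kicks in when the error strategy is configured for a dead-letter queue. The property names below are inferred from `KStreamsProcessorConfig.errorStrategy()` / `dlq().topic()` and the `kafkastreamsprocessor.*` prefix used elsewhere in this PR, so treat them as an illustration rather than the authoritative reference:

```properties
# Illustrative only: route records whose processing throws (non-Kafka) exceptions to a DLQ topic
kafkastreamsprocessor.error-strategy=dead-letter-queue
kafkastreamsprocessor.dlq.topic=pong-events-dlq
```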
diff --git a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/MetricsDecorator.java b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/MetricsDecorator.java
index 0e73b00..3f44f62 100644
--- a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/MetricsDecorator.java
+++ b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/MetricsDecorator.java
@@ -21,27 +21,23 @@
import jakarta.annotation.Priority;
import jakarta.decorator.Decorator;
-import jakarta.decorator.Delegate;
+import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
+import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.AbstractProcessorDecorator;
import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.ProcessorDecoratorPriorities;
import io.quarkiverse.kafkastreamsprocessor.impl.metrics.KafkaStreamsProcessorMetrics;
/**
* Decorator to enrich Kafka Streams metrics with a counter of exception raised by {@link Processor#process(Record)}.
*/
-@Decorator
+//@Decorator
@Priority(ProcessorDecoratorPriorities.METRICS)
-public class MetricsDecorator<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {
- /**
- * Injection point for composition.
- */
- @lombok.experimental.Delegate(excludes = Excludes.class)
- private final Processor<KIn, VIn, KOut, VOut> delegate;
-
+@Dependent
+public class MetricsDecorator extends AbstractProcessorDecorator {
/**
* Counter of exception raised by {@link Processor#process(Record)}.
*/
@@ -50,15 +46,11 @@
- public MetricsDecorator(@Delegate Processor<KIn, VIn, KOut, VOut> delegate,
- KafkaStreamsProcessorMetrics metrics) {
- this.delegate = delegate;
+ public MetricsDecorator(KafkaStreamsProcessorMetrics metrics) {
this.metrics = metrics;
}
@@ -70,9 +62,9 @@
* {@inheritDoc}
*/
@Override
- public void process(Record<KIn, VIn> record) {
+ public void process(Record record) {
try {
- delegate.process(record);
+ getDelegate().process(record);
} catch (Exception e) { // NOSONAR: Catching any error
metrics.processorErrorCounter().increment();
throw e;
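Without the CDI `@Decorator` machinery, the reworked MetricsDecorator can be exercised as a plain object, which is what MetricsDecoratorTest further down does. A condensed sketch of that usage (mocks elided, the failing processor is hypothetical):

```java
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;

import io.quarkiverse.kafkastreamsprocessor.impl.decorator.processor.MetricsDecorator;
import io.quarkiverse.kafkastreamsprocessor.impl.metrics.KafkaStreamsProcessorMetrics;

// Sketch: plain construction plus setDelegate() replaces the former @Delegate constructor injection.
class MetricsDecoratorUsageSketch {
    void run(KafkaStreamsProcessorMetrics metrics, Processor failingProcessor) {
        MetricsDecorator decorator = new MetricsDecorator(metrics);
        decorator.setDelegate(failingProcessor);
        try {
            decorator.process(new Record<String, String>("key", "value", 0L));
        } catch (RuntimeException expected) {
            // the decorator increments metrics.processorErrorCounter() and rethrows
        }
    }
}
```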
diff --git a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/PunctuatorDecorationProcessorDecorator.java b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/PunctuatorDecorationProcessorDecorator.java
index 5e1c1e0..cc4fa6c 100644
--- a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/PunctuatorDecorationProcessorDecorator.java
+++ b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/PunctuatorDecorationProcessorDecorator.java
@@ -24,6 +24,7 @@
import jakarta.annotation.Priority;
import jakarta.decorator.Decorator;
import jakarta.decorator.Delegate;
+import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
@@ -34,6 +35,7 @@
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.AbstractProcessorDecorator;
import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.ProcessorDecoratorPriorities;
import io.quarkiverse.kafkastreamsprocessor.api.decorator.punctuator.DecoratedPunctuator;
import lombok.RequiredArgsConstructor;
@@ -44,15 +46,10 @@
*
* @see PunctuatorDecorationProcessorContextDecorator
*/
-@Decorator
+//@Decorator
@Priority(ProcessorDecoratorPriorities.PUNCTUATOR_DECORATION)
-public class PunctuatorDecorationProcessorDecorator<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {
- /**
- * Injection point for composition
- */
- @lombok.experimental.Delegate(excludes = Excludes.class)
- private final Processor<KIn, VIn, KOut, VOut> delegate;
-
+@Dependent
+public class PunctuatorDecorationProcessorDecorator extends AbstractProcessorDecorator {
/**
* List of all the {@link Punctuator} decorators defined in this library and potential extensions made with the API.
*/
@@ -61,16 +58,13 @@
/**
* Injection constructor.
*
- * @param delegate
- * injection point for composition
* @param decoratedPunctuators
* the list of all {@link Punctuator} decorators defined in this library and potential extensions made with
* the API.
*/
@Inject
- public PunctuatorDecorationProcessorDecorator(@Delegate Processor<KIn, VIn, KOut, VOut> delegate,
+ public PunctuatorDecorationProcessorDecorator(
Instance decoratedPunctuators) {
- this.delegate = delegate;
this.decoratedPunctuators = decoratedPunctuators;
}
@@ -84,8 +78,8 @@
- public void init(ProcessorContext<KOut, VOut> context) {
- delegate.init(new PunctuatorDecorationProcessorContextDecorator<>((InternalProcessorContext) context,
+ public void init(ProcessorContext context) {
+ getDelegate().init(new PunctuatorDecorationProcessorContextDecorator<>((InternalProcessorContext) context,
decoratedPunctuators));
}
diff --git a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/RetryDecorator.java b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/RetryDecorator.java
index 92d75a7..5b79a55 100644
--- a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/RetryDecorator.java
+++ b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/RetryDecorator.java
@@ -21,13 +21,14 @@
import jakarta.annotation.Priority;
import jakarta.decorator.Decorator;
-import jakarta.decorator.Delegate;
+import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.eclipse.microprofile.faulttolerance.Retry;
+import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.AbstractProcessorDecorator;
import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.ProcessorDecoratorPriorities;
import lombok.extern.slf4j.Slf4j;
@@ -35,15 +36,10 @@
* Decorate a {@link Processor#process} with the {@link Retry} fault tolerance annotation.
*/
@Slf4j
-@Decorator
+//@Decorator
@Priority(ProcessorDecoratorPriorities.RETRY)
-public class RetryDecorator<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {
- /**
- * Injection point for composition
- */
- @lombok.experimental.Delegate(excludes = Excludes.class)
- private final Processor<KIn, VIn, KOut, VOut> delegate;
-
+@Dependent
+public class RetryDecorator extends AbstractProcessorDecorator {
/**
* The delegate object that has the processor method with the {@link Retry} annotation.
*
@@ -58,15 +54,12 @@
- public RetryDecorator(@Delegate Processor<KIn, VIn, KOut, VOut> delegate,
+ public RetryDecorator(
RetryDecoratorDelegate retryDecoratorDelegate) {
- this.delegate = delegate;
this.retryDecoratorDelegate = retryDecoratorDelegate;
}
@@ -78,9 +71,9 @@
* {@inheritDoc}
*/
@Override
- public void process(Record<KIn, VIn> record) {
+ public void process(Record record) {
try {
- retryDecoratorDelegate.retryableProcess(delegate, record);
+ retryDecoratorDelegate.retryableProcess(getDelegate(), record);
} catch (RuntimeException e) {
log.info("An exception that has been raised by the processor will not be retried.\n"
+ "Possible causes:\n"
diff --git a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecorator.java b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecorator.java
index 5c05333..5597a68 100644
--- a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecorator.java
+++ b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecorator.java
@@ -31,7 +31,7 @@
import jakarta.annotation.Priority;
import jakarta.decorator.Decorator;
-import jakarta.decorator.Delegate;
+import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;
import org.apache.kafka.common.KafkaException;
@@ -54,13 +54,12 @@
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapPropagator;
+import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.AbstractProcessorDecorator;
import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.ProcessorDecoratorPriorities;
import io.quarkiverse.kafkastreamsprocessor.impl.configuration.TopologyConfigurationImpl;
import io.quarkiverse.kafkastreamsprocessor.impl.protocol.KafkaStreamsProcessorHeaders;
import io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapGetter;
import io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapSetter;
-import lombok.AccessLevel;
-import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
@@ -72,16 +71,10 @@
* {@link org.apache.kafka.clients.consumer.ConsumerInterceptor}, which executes on the polling thread.
*/
@Slf4j
-@Decorator
+//@Decorator
@Priority(ProcessorDecoratorPriorities.TRACING)
-@RequiredArgsConstructor(access = AccessLevel.MODULE)
-public class TracingDecorator<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {
- /**
- * Injection point for composition
- */
- @lombok.experimental.Delegate(excludes = Excludes.class)
- private final Processor<KIn, VIn, KOut, VOut> delegate;
-
+@Dependent
+public class TracingDecorator extends AbstractProcessorDecorator {
/**
* The {@link OpenTelemetry} configured by Quarkus
*/
@@ -115,13 +108,11 @@
- private ProcessorContext<KOut, VOut> context;
+ private ProcessorContext context;
/**
* Injection constructor.
*
- * @param delegate
- * injection point for composition
* @param openTelemetry
* The {@link OpenTelemetry} configured by Quarkus
* @param textMapGetter
@@ -134,14 +125,24 @@
- public TracingDecorator(@Delegate Processor<KIn, VIn, KOut, VOut> delegate, OpenTelemetry openTelemetry,
+ public TracingDecorator(OpenTelemetry openTelemetry,
KafkaTextMapGetter textMapGetter, KafkaTextMapSetter textMapSetter, Tracer tracer,
TopologyConfigurationImpl configuration) {
- this(delegate, openTelemetry, textMapGetter, textMapSetter, tracer,
+ this(openTelemetry, textMapGetter, textMapSetter, tracer,
configuration.getProcessorPayloadType().getName(),
JsonFormat.printer());
}
+ public TracingDecorator(OpenTelemetry openTelemetry, KafkaTextMapGetter textMapGetter, KafkaTextMapSetter textMapSetter,
+ Tracer tracer, String applicationName, JsonFormat.Printer jsonPrinter) {
+ this.openTelemetry = openTelemetry;
+ this.textMapGetter = textMapGetter;
+ this.textMapSetter = textMapSetter;
+ this.tracer = tracer;
+ this.applicationName = applicationName;
+ this.jsonPrinter = jsonPrinter;
+ }
+
/**
* Init just to capture the reference to {@link ProcessorContext}.
*
@@ -150,8 +151,8 @@
* {@inheritDoc}
*/
@Override
- public void init(final ProcessorContext<KOut, VOut> context) {
- delegate.init(context);
+ public void init(final ProcessorContext context) {
+ getDelegate().init(context);
this.context = context;
}
@@ -164,7 +165,7 @@ public void init(final ProcessorContext context) {
* {@inheritDoc}
*/
@Override
- public void process(Record<KIn, VIn> record) {
+ public void process(Record record) {
SpanBuilder spanBuilder = tracer.spanBuilder(applicationName);
final TextMapPropagator propagator = openTelemetry.getPropagators().getTextMapPropagator();
@@ -194,7 +195,7 @@ public void process(Record record) {
// the headers in the incoming message so when an outgoing message is produced with the copied
// header values it already has the span id from this new child span
propagator.inject(Context.current(), record.headers(), textMapSetter);
- delegate.process(record);
+ getDelegate().process(record);
span.setStatus(StatusCode.OK);
} catch (KafkaException e) {
// we got a Kafka exception, we record the exception in the span, log but rethrow the exception
@@ -216,7 +217,7 @@ public void process(Record record) {
}
}
- void logInputMessageMetadata(Record<KIn, VIn> record) {
+ void logInputMessageMetadata(Record record) {
if (log.isDebugEnabled()) {
Map<String, String> headers = toMap(record.headers());
LoggedRecord.LoggedRecordBuilder builder = LoggedRecord.builder()
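For readers unfamiliar with the tracing flow this decorator keeps intact: the parent span context is extracted from the incoming record headers, a child span is opened, and the current context is re-injected into the headers before the delegate runs. A condensed illustration using only the OpenTelemetry propagation API imported above (assumes KafkaTextMapGetter/Setter operate on the record `Headers`, as the `inject(...)` call in the diff shows for the setter; the span name is illustrative):

```java
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapGetter;
import io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapSetter;

// Illustration of the propagation flow: extract the parent context from the record headers,
// open a child span, re-inject the current context, then run the wrapped processor.
class TracingFlowSketch {
    void traced(OpenTelemetry openTelemetry, Tracer tracer, KafkaTextMapGetter getter, KafkaTextMapSetter setter,
            Processor delegate, Record record) {
        TextMapPropagator propagator = openTelemetry.getPropagators().getTextMapPropagator();
        Context extracted = propagator.extract(Context.current(), record.headers(), getter);
        Span span = tracer.spanBuilder("my-processor").setParent(extracted).startSpan();
        try (Scope ignored = span.makeCurrent()) {
            propagator.inject(Context.current(), record.headers(), setter);
            delegate.process(record);
        } finally {
            span.end();
        }
    }
}
```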
diff --git a/impl/src/main/resources/META-INF/microprofile-config.properties b/impl/src/main/resources/META-INF/microprofile-config.properties
index b3f2dae..1cf155f 100644
--- a/impl/src/main/resources/META-INF/microprofile-config.properties
+++ b/impl/src/main/resources/META-INF/microprofile-config.properties
@@ -19,3 +19,6 @@ kafka-streams.internal.leave.group.on.close=true
# Deactivate exposure of metrics through JMX beans
# It is still adding a mxBean in AppInfoParser though
kafka-streams.auto.include.jmx.reporter=false
+# For compatibility with generic decorators and Quarkus versions prior to 3.11.0
+# TODO: remove in main branch as the problem is not reproducible with Quarkus 3.11.0 and later versions
+quarkus.test.flat-class-path=true
diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecoratorTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecoratorTest.java
index 3264e55..2114411 100644
--- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecoratorTest.java
+++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecoratorTest.java
@@ -41,7 +41,7 @@
@ExtendWith(MockitoExtension.class)
class CdiRequestContextDecoratorTest {
- CdiRequestContextDecorator decorator;
+ CdiRequestContextDecorator decorator;
@Mock
ArcContainer container;
@@ -55,7 +55,8 @@ class CdiRequestContextDecoratorTest {
@BeforeEach
public void setup() {
when(container.requestContext()).thenReturn(requestContext);
- decorator = new CdiRequestContextDecorator<>(processor, container);
+ decorator = new CdiRequestContextDecorator(container);
+ decorator.setDelegate(processor);
}
@Test
diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/DlqDecoratorTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/DlqDecoratorTest.java
index 6c6059a..b0c3074 100644
--- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/DlqDecoratorTest.java
+++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/DlqDecoratorTest.java
@@ -62,7 +62,7 @@ public class DlqDecoratorTest {
private static final String RECORD_KEY = "key";
private static final String RECORD_VALUE = "value";
- DlqDecorator decorator;
+ DlqDecorator decorator;
DlqProcessorContextDecorator contextDecorator;
@@ -86,7 +86,8 @@ public class DlqDecoratorTest {
@BeforeEach
public void setUp() {
- decorator = new DlqDecorator<>(kafkaProcessor, Set.of(FUNCTIONAL_SINK), dlqMetadataHandler, metrics, true);
+ decorator = new DlqDecorator(Set.of(FUNCTIONAL_SINK), dlqMetadataHandler, metrics, true);
+ decorator.setDelegate(kafkaProcessor);
decorator.init(context);
headers = new RecordHeaders();
record = new Record<>(RECORD_KEY, RECORD_VALUE, 0L, headers);
@@ -131,7 +132,8 @@ public void shouldForwardKeyValueToAllSinks() {
@Test
public void shouldDoNothingIfDeactivated() {
- decorator = new DlqDecorator<>(kafkaProcessor, Set.of(FUNCTIONAL_SINK), dlqMetadataHandler, metrics, false);
+ decorator = new DlqDecorator(Set.of(FUNCTIONAL_SINK), dlqMetadataHandler, metrics, false);
+ decorator.setDelegate(kafkaProcessor);
decorator.init(context);
decorator.process(record);
diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/MetricsDecoratorTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/MetricsDecoratorTest.java
index f8ce1a2..bbef50b 100644
--- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/MetricsDecoratorTest.java
+++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/MetricsDecoratorTest.java
@@ -48,14 +48,15 @@ public class MetricsDecoratorTest {
Ping inputMessage = Ping.newBuilder().setMessage("message").build();
- MetricsDecorator processorDecorator;
+ MetricsDecorator processorDecorator;
@Mock
ArcContainer arcContainer;
@BeforeEach
public void setUp() {
- processorDecorator = new MetricsDecorator<>(kafkaProcessor, metrics);
+ processorDecorator = new MetricsDecorator(metrics);
+ processorDecorator.setDelegate(kafkaProcessor);
}
@Test
diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/PunctuatorDecorationProcessorDecoratorTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/PunctuatorDecorationProcessorDecoratorTest.java
index 9a5ffa5..ca7e4aa 100644
--- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/PunctuatorDecorationProcessorDecoratorTest.java
+++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/PunctuatorDecorationProcessorDecoratorTest.java
@@ -68,9 +68,8 @@ public void process(Record record) {
}
};
- PunctuatorDecorationProcessorDecorator decorator = new PunctuatorDecorationProcessorDecorator<>(
- processor,
- decoratedPunctuators);
+ PunctuatorDecorationProcessorDecorator decorator = new PunctuatorDecorationProcessorDecorator(decoratedPunctuators);
+ decorator.setDelegate(processor);
decorator.init(context);
decorator.process(new Record<>("blabla",PingMessage.Ping.newBuilder().setMessage("blabla").build(),0L,null));
decorator.close();
diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecoratorTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecoratorTest.java
index 930b32f..6e137cc 100644
--- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecoratorTest.java
+++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecoratorTest.java
@@ -114,7 +114,7 @@ public class TracingDecoratorTest {
@RegisterExtension
static final OpenTelemetryExtension otel = OpenTelemetryExtension.create();
- TracingDecorator decorator;
+ TracingDecorator decorator;
@Mock
JsonFormat.Printer jsonPrinter;
@@ -140,8 +140,9 @@ public void setUp() {
rootLogger.addHandler(inMemoryLogHandler);
rootLogger.setLevel(Level.DEBUG);
when(topologyConfiguration.getProcessorPayloadType()).thenReturn((Class) MockType.class);
- decorator = new TracingDecorator<>(kafkaProcessor, otel.getOpenTelemetry(), kafkaTextMapGetter, kafkaTextMapSetter,
+ decorator = new TracingDecorator(otel.getOpenTelemetry(), kafkaTextMapGetter, kafkaTextMapSetter,
tracer, topologyConfiguration.getProcessorPayloadType().getName(), jsonPrinter);
+ decorator.setDelegate(kafkaProcessor);
decorator.init(processorContext);
}
@@ -206,8 +207,9 @@ public void shouldCleanMDCAndScopeInCaseOfException() {
.setMessage("blabla")
.build(), 0L, headers);
- decorator = new TracingDecorator<>(new ThrowExceptionProcessor(), otel.getOpenTelemetry(), kafkaTextMapGetter,
+ decorator = new TracingDecorator(otel.getOpenTelemetry(), kafkaTextMapGetter,
kafkaTextMapSetter, tracer, topologyConfiguration.getProcessorPayloadType().getName(), jsonPrinter);
+ decorator.setDelegate(new ThrowExceptionProcessor());
decorator.init(processorContext);
assertDoesNotThrow(() -> decorator.process(record));
@@ -307,9 +309,9 @@ void shouldLogMetadataEvenIfValueMarshallingToJSONFails() throws Throwable {
void shouldLogRawToStringValueIfNotProtobuf() throws Throwable {
Processor kafkaProcessor = mock(Processor.class);
ProcessorContext processorContext = mock(ProcessorContext.class);
- TracingDecorator decorator = new TracingDecorator<>(
- kafkaProcessor, GlobalOpenTelemetry.get(), kafkaTextMapGetter,
+ TracingDecorator decorator = new TracingDecorator(GlobalOpenTelemetry.get(), kafkaTextMapGetter,
kafkaTextMapSetter, tracer, topologyConfiguration.getProcessorPayloadType().getName(), jsonPrinter);
+ decorator.setDelegate(kafkaProcessor);
decorator.init(processorContext);
RuntimeException exception = new TestException();
@@ -334,9 +336,10 @@ void shouldPropagateOpentelemetryW3CBaggage() {
.setPropagators(ContextPropagators.create(TextMapPropagator.composite(W3CTraceContextPropagator.getInstance(),
W3CBaggagePropagator.getInstance())))
.build()) {
- decorator = new TracingDecorator<>(new LogOpentelemetryBaggageProcessor(), openTelemetryWithBaggageSdk,
+ decorator = new TracingDecorator(openTelemetryWithBaggageSdk,
kafkaTextMapGetter, kafkaTextMapSetter, openTelemetryWithBaggageSdk.getTracer("test"), PROCESSOR_NAME,
jsonPrinter);
+ decorator.setDelegate(new LogOpentelemetryBaggageProcessor());
decorator.init(processorContext);
decorator.process(record);
diff --git a/integration-tests/custom-serde/Readme.md b/integration-tests/custom-serde/Readme.md
new file mode 100644
index 0000000..d1c497b
--- /dev/null
+++ b/integration-tests/custom-serde/Readme.md
@@ -0,0 +1,18 @@
+# Sample with a custom serde
+
+EDA to EDA stateless microservice implementation using [KafkaStreams](https://kafka.apache.org/documentation/streams/)
+
+## Introduction
+
+This module showcases the implementation of a
+[KafkaStream processor](https://kafka.apache.org/25/documentation/streams/developer-guide/processor-api.html#overview) that plugs a custom serde in through a [ConfigurationCustomizer](../../api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/configuration/ConfigurationCustomizer.java), together with a custom processor decorator.
+
+## Quarkus Dev mode
+
+The sample fully works with Quarkus Dev mode, which lets you modify
+the code and have it hot-reloaded as soon as a file is saved. It can
+also be used to launch the application.
+
+```
+$> mvn clean install quarkus:dev
+```
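The extension point this sample exercises, in its smallest form (mirrors the CustomTypeConfigCustomizer added below; the priority value and the commented serde calls are illustrative):

```java
import jakarta.annotation.Priority;
import jakarta.enterprise.context.Dependent;

import io.quarkiverse.kafkastreamsprocessor.api.configuration.Configuration;
import io.quarkiverse.kafkastreamsprocessor.api.configuration.ConfigurationCustomizer;

// Sketch of a ConfigurationCustomizer bean: the extension discovers it and lets it adjust the
// topology configuration; @Priority presumably orders customizers when several are present.
@Dependent
@Priority(1)
public class ExampleSerdeCustomizer implements ConfigurationCustomizer {
    @Override
    public void fillConfiguration(Configuration configuration) {
        // e.g. swap in a custom value serde, as CustomTypeConfigCustomizer does below:
        // configuration.setSourceValueSerde(mySerde);
        // configuration.setSinkValueSerializer(mySerializer);
    }
}
```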
diff --git a/integration-tests/custom-serde/pom.xml b/integration-tests/custom-serde/pom.xml
new file mode 100644
index 0000000..cecafd2
--- /dev/null
+++ b/integration-tests/custom-serde/pom.xml
@@ -0,0 +1,184 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
+        <artifactId>quarkus-kafka-streams-processor-integration-tests</artifactId>
+        <version>2.0.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>quarkus-kafka-streams-processor-custom-serde-sample</artifactId>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
+                <artifactId>quarkus-kafka-streams-processor-bom</artifactId>
+                <version>${project.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+            <dependency>
+                <groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
+                <artifactId>quarkus-kafka-streams-processor-test-bom</artifactId>
+                <version>${project.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <dependencies>
+        <dependency>
+            <groupId>jakarta.inject</groupId>
+            <artifactId>jakarta.inject-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>jakarta.enterprise</groupId>
+            <artifactId>jakarta.enterprise.cdi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.microprofile.config</groupId>
+            <artifactId>microprofile-config-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-kafka-streams</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
+            <artifactId>quarkus-kafka-streams-processor</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
+            <artifactId>quarkus-kafka-streams-processor-impl</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-smallrye-health</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-micrometer-registry-prometheus</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-opentelemetry</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-streams</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
+            <artifactId>quarkus-kafka-streams-processor-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
+            <artifactId>quarkus-kafka-streams-processor-protobuf-binding</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>de.sven-jacobs</groupId>
+            <artifactId>loremipsum</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-junit5</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-test-common</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-streams-test-utils</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-junit-jupiter</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.github.daniel-shuy</groupId>
+            <artifactId>kafka-protobuf-serde</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.rest-assured</groupId>
+            <artifactId>rest-assured</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
+            <artifactId>quarkus-kafka-streams-processor-test-framework</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomType.java b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomType.java
new file mode 100644
index 0000000..91f8d93
--- /dev/null
+++ b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomType.java
@@ -0,0 +1,33 @@
+/*-
+ * #%L
+ * Quarkus Kafka Streams Processor
+ * %%
+ * Copyright (C) 2024 Amadeus s.a.s.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+package io.quarkiverse.kafkastreamsprocessor.sample.customserde;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+@AllArgsConstructor
+@NoArgsConstructor
+@Getter
+@Setter
+public class CustomType {
+ private int value;
+}
diff --git a/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeConfigCustomizer.java b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeConfigCustomizer.java
new file mode 100644
index 0000000..1df5867
--- /dev/null
+++ b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeConfigCustomizer.java
@@ -0,0 +1,46 @@
+/*-
+ * #%L
+ * Quarkus Kafka Streams Processor
+ * %%
+ * Copyright (C) 2024 Amadeus s.a.s.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+package io.quarkiverse.kafkastreamsprocessor.sample.customserde;
+
+import jakarta.annotation.Priority;
+import jakarta.enterprise.context.Dependent;
+import jakarta.inject.Inject;
+
+import io.quarkiverse.kafkastreamsprocessor.api.configuration.Configuration;
+import io.quarkiverse.kafkastreamsprocessor.api.configuration.ConfigurationCustomizer;
+
+@Dependent
+@Priority(1)
+public class CustomTypeConfigCustomizer implements ConfigurationCustomizer {
+ private final CustomTypeSerde serde;
+ private final CustomTypeSerializer serializer;
+
+ @Inject
+ public CustomTypeConfigCustomizer(CustomTypeSerde serde, CustomTypeSerializer serializer) {
+ this.serde = serde;
+ this.serializer = serializer;
+ }
+
+ @Override
+ public void fillConfiguration(Configuration configuration) {
+ configuration.setSourceValueSerde(serde);
+ configuration.setSinkValueSerializer(serializer);
+ }
+}
diff --git a/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeDeserializer.java b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeDeserializer.java
new file mode 100644
index 0000000..7ec4543
--- /dev/null
+++ b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeDeserializer.java
@@ -0,0 +1,53 @@
+/*-
+ * #%L
+ * Quarkus Kafka Streams Processor
+ * %%
+ * Copyright (C) 2024 Amadeus s.a.s.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+package io.quarkiverse.kafkastreamsprocessor.sample.customserde;
+
+import java.nio.charset.StandardCharsets;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+
+import org.apache.kafka.common.serialization.Deserializer;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import lombok.extern.slf4j.Slf4j;
+
+@ApplicationScoped
+@Slf4j
+public class CustomTypeDeserializer implements Deserializer<CustomType> {
+ private final ObjectMapper objectMapper;
+
+ @Inject
+ public CustomTypeDeserializer(ObjectMapper objectMapper) {
+ this.objectMapper = objectMapper;
+ }
+
+ @Override
+ public CustomType deserialize(String topic, byte[] data) {
+ try {
+ CustomType readValue = objectMapper.readValue(data, CustomType.class);
+ return new CustomType(readValue.getValue() - CustomTypeSerde.SHIFT);
+ } catch (Exception e) {
+ log.error("Could not deserialize: {}", new String(data, StandardCharsets.UTF_8));
+ throw new RuntimeException("Error deserializing CustomType", e);
+ }
+ }
+}
diff --git a/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeSerde.java b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeSerde.java
new file mode 100644
index 0000000..776252f
--- /dev/null
+++ b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeSerde.java
@@ -0,0 +1,52 @@
+/*-
+ * #%L
+ * Quarkus Kafka Streams Processor
+ * %%
+ * Copyright (C) 2024 Amadeus s.a.s.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+package io.quarkiverse.kafkastreamsprocessor.sample.customserde;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+
+@ApplicationScoped
+public class CustomTypeSerde implements Serde<CustomType> {
+ static final int SHIFT = 10;
+
+ private final CustomTypeSerializer customTypeSerializer;
+
+ private final CustomTypeDeserializer customTypeDeserializer;
+
+ @Inject
+ public CustomTypeSerde(CustomTypeSerializer customTypeSerializer, CustomTypeDeserializer customTypeDeserializer) {
+ this.customTypeSerializer = customTypeSerializer;
+ this.customTypeDeserializer = customTypeDeserializer;
+ }
+
+ @Override
+ public Serializer<CustomType> serializer() {
+ return customTypeSerializer;
+ }
+
+ @Override
+ public Deserializer<CustomType> deserializer() {
+ return customTypeDeserializer;
+ }
+}
diff --git a/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeSerializer.java b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeSerializer.java
new file mode 100644
index 0000000..8a3791a
--- /dev/null
+++ b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeSerializer.java
@@ -0,0 +1,48 @@
+/*-
+ * #%L
+ * Quarkus Kafka Streams Processor
+ * %%
+ * Copyright (C) 2024 Amadeus s.a.s.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+package io.quarkiverse.kafkastreamsprocessor.sample.customserde;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+
+import org.apache.kafka.common.serialization.Serializer;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+@ApplicationScoped
+public class CustomTypeSerializer implements Serializer<CustomType> {
+ private final ObjectMapper objectMapper;
+
+ @Inject
+ public CustomTypeSerializer(ObjectMapper objectMapper) {
+ this.objectMapper = objectMapper;
+ }
+
+ @Override
+ public byte[] serialize(String topic, CustomType data) {
+ CustomType valueToSerialize = new CustomType(data.getValue() + CustomTypeSerde.SHIFT);
+ try {
+ return objectMapper.writeValueAsBytes(valueToSerialize);
+ } catch (Exception e) {
+ throw new RuntimeException("Error serializing CustomType", e);
+ }
+ }
+
+}
diff --git a/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/HeaderDecorator.java b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/HeaderDecorator.java
new file mode 100644
index 0000000..ea5507f
--- /dev/null
+++ b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/HeaderDecorator.java
@@ -0,0 +1,47 @@
+/*-
+ * #%L
+ * Quarkus Kafka Streams Processor
+ * %%
+ * Copyright (C) 2024 Amadeus s.a.s.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+package io.quarkiverse.kafkastreamsprocessor.sample.customserde;
+
+import java.nio.charset.StandardCharsets;
+
+import jakarta.annotation.Priority;
+import jakarta.enterprise.context.Dependent;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.streams.processor.api.Record;
+
+import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.AbstractProcessorDecorator;
+import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.ProcessorDecoratorPriorities;
+
+@Dependent
+@Priority(ProcessorDecoratorPriorities.PUNCTUATOR_DECORATION + 2)
+public class HeaderDecorator extends AbstractProcessorDecorator {
+ @Override
+ public void process(Record record) {
+ Header header = record.headers().lastHeader("custom-header");
+ if (header != null) {
+ String value = new String(header.value(), StandardCharsets.UTF_8);
+ if (value.contains("error")) {
+ throw new IllegalStateException("Error in header");
+ }
+ }
+ getDelegate().process(record);
+ }
+}
diff --git a/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/PingProcessor.java b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/PingProcessor.java
new file mode 100644
index 0000000..bf843ba
--- /dev/null
+++ b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/PingProcessor.java
@@ -0,0 +1,38 @@
+/*-
+ * #%L
+ * Quarkus Kafka Streams Processor
+ * %%
+ * Copyright (C) 2024 Amadeus s.a.s.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+package io.quarkiverse.kafkastreamsprocessor.sample.customserde;
+
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Record;
+
+import io.quarkiverse.kafkastreamsprocessor.api.Processor;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@Processor
+@RequiredArgsConstructor
+public class PingProcessor extends ContextualProcessor<String, CustomType, String, CustomType> {
+ @Override
+ public void process(Record<String, CustomType> ping) {
+ log.info("Process the custom type with value: {}", ping.value().getValue());
+ context().forward(ping);
+ }
+}
diff --git a/integration-tests/custom-serde/src/main/resources/application.properties b/integration-tests/custom-serde/src/main/resources/application.properties
new file mode 100644
index 0000000..c989477
--- /dev/null
+++ b/integration-tests/custom-serde/src/main/resources/application.properties
@@ -0,0 +1,5 @@
+kafkastreamsprocessor.input.topic=ping-events
+kafkastreamsprocessor.output.topic=pong-events
+quarkus.kafka-streams.bootstrap-servers=localhost:9092
+quarkus.kafka-streams.topics=ping-events,pong-events
+kafka-streams.producer.linger.ms=0
diff --git a/integration-tests/custom-serde/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeDeserializerTest.java b/integration-tests/custom-serde/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeDeserializerTest.java
new file mode 100644
index 0000000..d9e223c
--- /dev/null
+++ b/integration-tests/custom-serde/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeDeserializerTest.java
@@ -0,0 +1,42 @@
+/*-
+ * #%L
+ * Quarkus Kafka Streams Processor
+ * %%
+ * Copyright (C) 2024 Amadeus s.a.s.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+package io.quarkiverse.kafkastreamsprocessor.sample.customserde;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
+import org.junit.jupiter.api.Test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+class CustomTypeDeserializerTest {
+
+ CustomTypeDeserializer deserializer = new CustomTypeDeserializer(new ObjectMapper());
+
+ @Test
+ public void testDeserialize() {
+ byte[] data = "{\"value\":11}".getBytes();
+
+ Object customType = deserializer.deserialize("topic", data);
+
+ assertThat(((CustomType) customType).getValue(), equalTo(1));
+ }
+
+}
diff --git a/integration-tests/custom-serde/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeSerializerTest.java b/integration-tests/custom-serde/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeSerializerTest.java
new file mode 100644
index 0000000..ae2a9bc
--- /dev/null
+++ b/integration-tests/custom-serde/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/CustomTypeSerializerTest.java
@@ -0,0 +1,42 @@
+/*-
+ * #%L
+ * Quarkus Kafka Streams Processor
+ * %%
+ * Copyright (C) 2024 Amadeus s.a.s.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+package io.quarkiverse.kafkastreamsprocessor.sample.customserde;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
+import java.nio.charset.StandardCharsets;
+
+import org.junit.jupiter.api.Test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+class CustomTypeSerializerTest {
+ CustomTypeSerializer serializer = new CustomTypeSerializer(new ObjectMapper());
+
+ @Test
+ public void testSerialize() {
+ CustomType customType = new CustomType(1);
+
+ byte[] serialized = serializer.serialize("topic", customType);
+
+ assertThat(new String(serialized, StandardCharsets.UTF_8), equalTo("{\"value\":11}"));
+ }
+}
diff --git a/integration-tests/custom-serde/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/PingProcessorQuarkusTest.java b/integration-tests/custom-serde/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/PingProcessorQuarkusTest.java
new file mode 100644
index 0000000..be7beb6
--- /dev/null
+++ b/integration-tests/custom-serde/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/PingProcessorQuarkusTest.java
@@ -0,0 +1,97 @@
+/*-
+ * #%L
+ * Quarkus Kafka Streams Processor
+ * %%
+ * Copyright (C) 2024 Amadeus s.a.s.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+package io.quarkiverse.kafkastreamsprocessor.sample.customserde;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+
+import jakarta.inject.Inject;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.awaitility.Durations;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.kafka.test.utils.KafkaTestUtils;
+
+import io.quarkus.test.junit.QuarkusTest;
+
+@QuarkusTest
+public class PingProcessorQuarkusTest {
+ @ConfigProperty(name = "kafka.bootstrap.servers")
+ String kafkaBootstrapServers;
+
+ String senderTopic = "ping-events";
+
+ String consumerTopic = "pong-events";
+
+ KafkaProducer<String, CustomType> producer;
+
+ KafkaConsumer<String, CustomType> consumer;
+
+ @Inject
+ CustomTypeSerde customTypeSerde;
+
+ @BeforeEach
+ public void setup() throws Exception {
+ Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(kafkaBootstrapServers, "test", "true");
+ consumer = new KafkaConsumer<>(consumerProps, new StringDeserializer(), customTypeSerde.deserializer());
+ consumer.subscribe(List.of(consumerTopic));
+
+ Map<String, Object> producerProps = KafkaTestUtils.producerProps(kafkaBootstrapServers);
+ producer = new KafkaProducer<>(producerProps, new StringSerializer(), customTypeSerde.serializer());
+ }
+
+ @AfterEach
+ public void tearDown() {
+ producer.close();
+ consumer.close();
+ }
+
+ @Test
+ public void testCount() {
+ producer.send(new ProducerRecord<>(senderTopic, "1", new CustomType(1)));
+ producer.flush();
+ ConsumerRecord<String, CustomType> record = KafkaTestUtils.getSingleRecord(consumer, consumerTopic,
+ Durations.FIVE_SECONDS);
+ assertThat(((CustomType) record.value()).getValue(), equalTo(1));
+ }
+
+ @Test
+ public void testHeaderError() {
+ producer.send(new ProducerRecord<>(senderTopic, 0, "1", new CustomType(1),
+ new RecordHeaders().add("custom-header", "error".getBytes(StandardCharsets.UTF_8))));
+ producer.flush();
+ assertThrows(IllegalStateException.class,
+ () -> KafkaTestUtils.getSingleRecord(consumer, consumerTopic, Durations.FIVE_SECONDS));
+ }
+}
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index f32c922..2b85ec8 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -15,6 +15,7 @@
 <module>multioutput</module>
 <module>simple</module>
 <module>stateful</module>
+ <module>custom-serde</module>
diff --git a/pom.xml b/pom.xml
index a8d01ab..a83a5ac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -33,7 +33,7 @@
17
UTF-8
UTF-8
- 3.8.3
+ 3.8.6
3.24.1
@@ -98,37 +98,37 @@
LF
-
-
-
-
- org.codehaus.mojo
- license-maven-plugin
- 2.4.0
-
-
- add-license-header
-
- check-file-header
-
- process-sources
-
-
-
- false
- apache_v2
- 2024
- Amadeus s.a.s.
- Quarkus Kafka Streams Processor
-
- **/*.java
-
-
- **/*$$*.java
-
-
-
+
+
+
+ org.codehaus.mojo
+ license-maven-plugin
+ 2.4.0
+
+
+ add-license-header
+
+ check-file-header
+
+ process-sources
+
+
+
+ false
+ apache_v2
+ 2024
+ Amadeus s.a.s.
+ Quarkus Kafka Streams Processor
+
+ **/*.java
+
+
+ **/*$$*.java
+
+
+
+