wrappedProcessor = processor;
+ for (AbstractProcessorDecorator decorator : processorDecorators) {
+ decorator.setDelegate(wrappedProcessor);
+ wrappedProcessor = decorator;
+ }
+ return wrappedProcessor;
}
private static boolean hasAnnotation(Object bean, Class extends Annotation> annotation) {
diff --git a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecorator.java b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecorator.java
index fc92484..7781526 100644
--- a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecorator.java
+++ b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecorator.java
@@ -20,17 +20,16 @@
package io.quarkiverse.kafkastreamsprocessor.impl.decorator.processor;
import jakarta.annotation.Priority;
-import jakarta.decorator.Decorator;
+import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
+import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.AbstractProcessorDecorator;
import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.ProcessorDecoratorPriorities;
import io.quarkus.arc.Arc;
import io.quarkus.arc.ArcContainer;
-import lombok.AccessLevel;
-import lombok.RequiredArgsConstructor;
/**
* This class is responsible to manage the lifecycle of {@link jakarta.enterprise.context.RequestScoped} beans. It
@@ -41,16 +40,11 @@
*
* Warning: "Quarkus Tests" Junit extension is already managing the request scope on its own.
*/
-@Decorator
+//@Decorator
+@Dependent
@Priority(ProcessorDecoratorPriorities.CDI_REQUEST_SCOPE)
-@RequiredArgsConstructor(access = AccessLevel.MODULE)
-public class CdiRequestContextDecorator implements Processor {
- /**
- * Injection point for composition
- */
- @lombok.experimental.Delegate(excludes = Excludes.class)
- private final Processor delegate;
-
+//@RequiredArgsConstructor(access = AccessLevel.MODULE)
+public class CdiRequestContextDecorator extends AbstractProcessorDecorator {
/**
* The container object from Arc to inquire on request contextualization availability and activation
*/
@@ -58,13 +52,14 @@ public class CdiRequestContextDecorator implements Process
/**
* Constructor for injection of the delegate.
- *
- * @param delegate
- * injection point for composition
*/
@Inject
- public CdiRequestContextDecorator(@jakarta.decorator.Delegate Processor delegate) {
- this(delegate, Arc.container());
+ public CdiRequestContextDecorator() {
+ this(Arc.container());
+ }
+
+ public CdiRequestContextDecorator(ArcContainer container) {
+ this.container = container;
}
/**
@@ -74,20 +69,16 @@ public CdiRequestContextDecorator(@jakarta.decorator.Delegate Processor record) {
+ public void process(Record record) {
if (container.requestContext().isActive()) {
- delegate.process(record);
+ getDelegate().process(record);
} else {
container.requestContext().activate();
try {
- delegate.process(record);
+ getDelegate().process(record);
} finally {
container.requestContext().terminate();
}
}
}
-
- private interface Excludes {
- void process(Record record);
- }
}
diff --git a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/DlqDecorator.java b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/DlqDecorator.java
index a13b62d..f3a6eaf 100644
--- a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/DlqDecorator.java
+++ b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/DlqDecorator.java
@@ -23,8 +23,8 @@
import java.util.Set;
import jakarta.annotation.Priority;
-import jakarta.decorator.Decorator;
import jakarta.decorator.Delegate;
+import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;
import org.apache.kafka.common.KafkaException;
@@ -37,6 +37,7 @@
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.AbstractProcessorDecorator;
import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.ProcessorDecoratorPriorities;
import io.quarkiverse.kafkastreamsprocessor.impl.TopologyProducer;
import io.quarkiverse.kafkastreamsprocessor.impl.errors.DlqMetadataHandler;
@@ -53,14 +54,10 @@
* Uses a dead-letter sink from the topology, rather than a raw producer, to benefit from the same KStreams guarantees
* (at least once / exactly once).
*/
-@Decorator
+//@Decorator
@Priority(ProcessorDecoratorPriorities.DLQ)
-public class DlqDecorator implements Processor {
- /**
- * Inject point for composition
- */
- @lombok.experimental.Delegate(excludes = Excludes.class)
- private final Processor delegate;
+@Dependent
+public class DlqDecorator extends AbstractProcessorDecorator {
/**
* A set of sink names that are involved in the business logic.
@@ -89,11 +86,10 @@ public class DlqDecorator implements Processor context;
+ private ProcessorContext context;
- DlqDecorator(Processor delegate, Set functionalSinks, DlqMetadataHandler dlqMetadataHandler,
+ DlqDecorator(Set functionalSinks, DlqMetadataHandler dlqMetadataHandler,
KafkaStreamsProcessorMetrics metrics, boolean activated) {
- this.delegate = delegate;
this.functionalSinks = functionalSinks;
this.dlqMetadataHandler = dlqMetadataHandler;
this.metrics = metrics;
@@ -103,8 +99,6 @@ public class DlqDecorator implements Processor implements Processor delegate,
+ public DlqDecorator(
SinkToTopicMappingBuilder sinkToTopicMappingBuilder, DlqMetadataHandler dlqMetadataHandler,
KafkaStreamsProcessorMetrics metrics,
KStreamsProcessorConfig kStreamsProcessorConfig) { // NOSONAR Optional with microprofile-config
- this(delegate, sinkToTopicMappingBuilder.sinkToTopicMapping().keySet(), dlqMetadataHandler, metrics,
+ this(sinkToTopicMappingBuilder.sinkToTopicMapping().keySet(), dlqMetadataHandler, metrics,
ErrorHandlingStrategy.shouldSendToDlq(kStreamsProcessorConfig.errorStrategy(),
kStreamsProcessorConfig.dlq().topic()));
}
@@ -135,12 +129,12 @@ public DlqDecorator(@Delegate Processor delegate,
* {@inheritDoc}
*/
@Override
- public void init(final ProcessorContext context) {
+ public void init(final ProcessorContext context) {
if (activated) {
- this.context = new DlqProcessorContextDecorator<>((InternalProcessorContext) context, functionalSinks);
- delegate.init(this.context);
+ this.context = new DlqProcessorContextDecorator<>((InternalProcessorContext) context, functionalSinks);
+ getDelegate().init(this.context);
} else {
- delegate.init(context);
+ getDelegate().init(context);
}
}
@@ -154,10 +148,10 @@ public void init(final ProcessorContext context) {
* {@inheritDoc}
*/
@Override
- public void process(Record record) {
+ public void process(Record record) {
if (activated) {
try {
- delegate.process(record);
+ getDelegate().process(record);
} catch (KafkaException e) {
// Do not forward to DLQ
throw e;
@@ -166,14 +160,14 @@ public void process(Record record) {
if (recordMetadata.isPresent()) {
dlqMetadataHandler.addMetadata(record.headers(), recordMetadata.get().topic(),
recordMetadata.get().partition(), e);
- context.forward((Record) record, TopologyProducer.DLQ_SINK_NAME);
+ context.forward(record, TopologyProducer.DLQ_SINK_NAME);
// Re-throw so the exception gets logged
metrics.microserviceDlqSentCounter().increment();
throw e;
}
}
} else {
- delegate.process(record);
+ getDelegate().process(record);
}
}
diff --git a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/MetricsDecorator.java b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/MetricsDecorator.java
index 0e73b00..3f44f62 100644
--- a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/MetricsDecorator.java
+++ b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/MetricsDecorator.java
@@ -21,27 +21,23 @@
import jakarta.annotation.Priority;
import jakarta.decorator.Decorator;
-import jakarta.decorator.Delegate;
+import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
+import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.AbstractProcessorDecorator;
import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.ProcessorDecoratorPriorities;
import io.quarkiverse.kafkastreamsprocessor.impl.metrics.KafkaStreamsProcessorMetrics;
/**
* Decorator to enrich Kafka Streams metrics with a counter of exception raised by {@link Processor#process(Record)}.
*/
-@Decorator
+//@Decorator
@Priority(ProcessorDecoratorPriorities.METRICS)
-public class MetricsDecorator implements Processor {
- /**
- * Injection point for composition.
- */
- @lombok.experimental.Delegate(excludes = Excludes.class)
- private final Processor delegate;
-
+@Dependent
+public class MetricsDecorator extends AbstractProcessorDecorator {
/**
* Counter of exception raised by {@link Processor#process(Record)}.
*/
@@ -50,15 +46,11 @@ public class MetricsDecorator implements Processor delegate,
- KafkaStreamsProcessorMetrics metrics) {
- this.delegate = delegate;
+ public MetricsDecorator(KafkaStreamsProcessorMetrics metrics) {
this.metrics = metrics;
}
@@ -70,9 +62,9 @@ public MetricsDecorator(@Delegate Processor delegate,
* {@inheritDoc}
*/
@Override
- public void process(Record record) {
+ public void process(Record record) {
try {
- delegate.process(record);
+ getDelegate().process(record);
} catch (Exception e) { // NOSONAR: Catching any error
metrics.processorErrorCounter().increment();
throw e;
diff --git a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/PunctuatorDecorationProcessorDecorator.java b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/PunctuatorDecorationProcessorDecorator.java
index 5e1c1e0..cc4fa6c 100644
--- a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/PunctuatorDecorationProcessorDecorator.java
+++ b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/PunctuatorDecorationProcessorDecorator.java
@@ -24,6 +24,7 @@
import jakarta.annotation.Priority;
import jakarta.decorator.Decorator;
import jakarta.decorator.Delegate;
+import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
@@ -34,6 +35,7 @@
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.AbstractProcessorDecorator;
import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.ProcessorDecoratorPriorities;
import io.quarkiverse.kafkastreamsprocessor.api.decorator.punctuator.DecoratedPunctuator;
import lombok.RequiredArgsConstructor;
@@ -44,15 +46,10 @@
*
* @see PunctuatorDecorationProcessorContextDecorator
*/
-@Decorator
+//@Decorator
@Priority(ProcessorDecoratorPriorities.PUNCTUATOR_DECORATION)
-public class PunctuatorDecorationProcessorDecorator implements Processor {
- /**
- * Injection point for composition
- */
- @lombok.experimental.Delegate(excludes = Excludes.class)
- private final Processor delegate;
-
+@Dependent
+public class PunctuatorDecorationProcessorDecorator extends AbstractProcessorDecorator {
/**
* List of all the {@link Punctuator} decorators defined in this library and potential extensions made with the API.
*/
@@ -61,16 +58,13 @@ public class PunctuatorDecorationProcessorDecorator implem
/**
* Injection constructor.
*
- * @param delegate
- * injection point for composition
* @param decoratedPunctuators
* the list of all {@link Punctuator} decorators defined in this library and potential extensions made with
* the API.
*/
@Inject
- public PunctuatorDecorationProcessorDecorator(@Delegate Processor delegate,
+ public PunctuatorDecorationProcessorDecorator(
Instance decoratedPunctuators) {
- this.delegate = delegate;
this.decoratedPunctuators = decoratedPunctuators;
}
@@ -84,8 +78,8 @@ public PunctuatorDecorationProcessorDecorator(@Delegate Processor context) {
- delegate.init(new PunctuatorDecorationProcessorContextDecorator<>((InternalProcessorContext) context,
+ public void init(ProcessorContext context) {
+ getDelegate().init(new PunctuatorDecorationProcessorContextDecorator<>((InternalProcessorContext) context,
decoratedPunctuators));
}
diff --git a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/RetryDecorator.java b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/RetryDecorator.java
index 92d75a7..5b79a55 100644
--- a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/RetryDecorator.java
+++ b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/RetryDecorator.java
@@ -21,13 +21,14 @@
import jakarta.annotation.Priority;
import jakarta.decorator.Decorator;
-import jakarta.decorator.Delegate;
+import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.eclipse.microprofile.faulttolerance.Retry;
+import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.AbstractProcessorDecorator;
import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.ProcessorDecoratorPriorities;
import lombok.extern.slf4j.Slf4j;
@@ -35,15 +36,10 @@
* Decorate a {@link Processor#process} with the {@link Retry} fault tolerance annotation.
*/
@Slf4j
-@Decorator
+//@Decorator
@Priority(ProcessorDecoratorPriorities.RETRY)
-public class RetryDecorator implements Processor {
- /**
- * Injection point for composition
- */
- @lombok.experimental.Delegate(excludes = Excludes.class)
- private final Processor delegate;
-
+@Dependent
+public class RetryDecorator extends AbstractProcessorDecorator {
/**
* The delegate object that has the processor method with the {@link Retry} annotation.
*
@@ -58,15 +54,12 @@ public class RetryDecorator implements Processor delegate,
+ public RetryDecorator(
RetryDecoratorDelegate retryDecoratorDelegate) {
- this.delegate = delegate;
this.retryDecoratorDelegate = retryDecoratorDelegate;
}
@@ -78,9 +71,9 @@ public RetryDecorator(@Delegate Processor delegate,
* {@inheritDoc}
*/
@Override
- public void process(Record record) {
+ public void process(Record record) {
try {
- retryDecoratorDelegate.retryableProcess(delegate, record);
+ retryDecoratorDelegate.retryableProcess(getDelegate(), record);
} catch (RuntimeException e) {
log.info("An exception that has been raised by the processor will not be retried.\n"
+ "Possible causes:\n"
diff --git a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecorator.java b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecorator.java
index 5c05333..5597a68 100644
--- a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecorator.java
+++ b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecorator.java
@@ -31,7 +31,7 @@
import jakarta.annotation.Priority;
import jakarta.decorator.Decorator;
-import jakarta.decorator.Delegate;
+import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;
import org.apache.kafka.common.KafkaException;
@@ -54,13 +54,12 @@
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapPropagator;
+import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.AbstractProcessorDecorator;
import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.ProcessorDecoratorPriorities;
import io.quarkiverse.kafkastreamsprocessor.impl.configuration.TopologyConfigurationImpl;
import io.quarkiverse.kafkastreamsprocessor.impl.protocol.KafkaStreamsProcessorHeaders;
import io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapGetter;
import io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapSetter;
-import lombok.AccessLevel;
-import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
@@ -72,16 +71,10 @@
* {@link org.apache.kafka.clients.consumer.ConsumerInterceptor}, which executes on the polling thread.
*/
@Slf4j
-@Decorator
+//@Decorator
@Priority(ProcessorDecoratorPriorities.TRACING)
-@RequiredArgsConstructor(access = AccessLevel.MODULE)
-public class TracingDecorator implements Processor {
- /**
- * Injection point for composition
- */
- @lombok.experimental.Delegate(excludes = Excludes.class)
- private final Processor delegate;
-
+@Dependent
+public class TracingDecorator extends AbstractProcessorDecorator {
/**
* The {@link OpenTelemetry} configured by Quarkus
*/
@@ -115,13 +108,11 @@ public class TracingDecorator implements Processor context;
+ private ProcessorContext context;
/**
* Injection constructor.
*
- * @param delegate
- * injection point for composition
* @param openTelemetry
* The {@link OpenTelemetry} configured by Quarkus
* @param textMapGetter
@@ -134,14 +125,24 @@ public class TracingDecorator implements Processor delegate, OpenTelemetry openTelemetry,
+ public TracingDecorator(OpenTelemetry openTelemetry,
KafkaTextMapGetter textMapGetter, KafkaTextMapSetter textMapSetter, Tracer tracer,
TopologyConfigurationImpl configuration) {
- this(delegate, openTelemetry, textMapGetter, textMapSetter, tracer,
+ this(openTelemetry, textMapGetter, textMapSetter, tracer,
configuration.getProcessorPayloadType().getName(),
JsonFormat.printer());
}
+ public TracingDecorator(OpenTelemetry openTelemetry, KafkaTextMapGetter textMapGetter, KafkaTextMapSetter textMapSetter,
+ Tracer tracer, String applicationName, JsonFormat.Printer jsonPrinter) {
+ this.openTelemetry = openTelemetry;
+ this.textMapGetter = textMapGetter;
+ this.textMapSetter = textMapSetter;
+ this.tracer = tracer;
+ this.applicationName = applicationName;
+ this.jsonPrinter = jsonPrinter;
+ }
+
/**
* Init just to capture the reference to {@link ProcessorContext}.
*
@@ -150,8 +151,8 @@ public TracingDecorator(@Delegate Processor delegate, Open
* {@inheritDoc}
*/
@Override
- public void init(final ProcessorContext context) {
- delegate.init(context);
+ public void init(final ProcessorContext context) {
+ getDelegate().init(context);
this.context = context;
}
@@ -164,7 +165,7 @@ public void init(final ProcessorContext context) {
* {@inheritDoc}
*/
@Override
- public void process(Record record) {
+ public void process(Record record) {
SpanBuilder spanBuilder = tracer.spanBuilder(applicationName);
final TextMapPropagator propagator = openTelemetry.getPropagators().getTextMapPropagator();
@@ -194,7 +195,7 @@ public void process(Record record) {
// the headers in the incoming message so when an outgoing message is produced with the copied
// header values it already has the span id from this new child span
propagator.inject(Context.current(), record.headers(), textMapSetter);
- delegate.process(record);
+ getDelegate().process(record);
span.setStatus(StatusCode.OK);
} catch (KafkaException e) {
// we got a Kafka exception, we record the exception in the span, log but rethrow the exception
@@ -216,7 +217,7 @@ public void process(Record record) {
}
}
- void logInputMessageMetadata(Record record) {
+ void logInputMessageMetadata(Record record) {
if (log.isDebugEnabled()) {
Map headers = toMap(record.headers());
LoggedRecord.LoggedRecordBuilder builder = LoggedRecord.builder()
diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecoratorTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecoratorTest.java
index 1f371a8..8c75299 100644
--- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecoratorTest.java
+++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/CdiRequestContextDecoratorTest.java
@@ -41,7 +41,7 @@
@ExtendWith(MockitoExtension.class)
class CdiRequestContextDecoratorTest {
- CdiRequestContextDecorator decorator;
+ CdiRequestContextDecorator decorator;
@Mock
ArcContainer container;
@@ -54,8 +54,8 @@ class CdiRequestContextDecoratorTest {
@BeforeEach
public void setup() {
- when(container.requestContext()).thenReturn(requestContext);
- decorator = new CdiRequestContextDecorator<>(processor, container);
+ decorator = new CdiRequestContextDecorator(container);
+ decorator.setDelegate(processor);
}
@Test
diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/DlqDecoratorTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/DlqDecoratorTest.java
index 6c6059a..b0c3074 100644
--- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/DlqDecoratorTest.java
+++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/DlqDecoratorTest.java
@@ -62,7 +62,7 @@ public class DlqDecoratorTest {
private static final String RECORD_KEY = "key";
private static final String RECORD_VALUE = "value";
- DlqDecorator decorator;
+ DlqDecorator decorator;
DlqProcessorContextDecorator contextDecorator;
@@ -86,7 +86,8 @@ public class DlqDecoratorTest {
@BeforeEach
public void setUp() {
- decorator = new DlqDecorator<>(kafkaProcessor, Set.of(FUNCTIONAL_SINK), dlqMetadataHandler, metrics, true);
+ decorator = new DlqDecorator(Set.of(FUNCTIONAL_SINK), dlqMetadataHandler, metrics, true);
+ decorator.setDelegate(kafkaProcessor);
decorator.init(context);
headers = new RecordHeaders();
record = new Record<>(RECORD_KEY, RECORD_VALUE, 0L, headers);
@@ -131,7 +132,8 @@ public void shouldForwardKeyValueToAllSinks() {
@Test
public void shouldDoNothingIfDeactivated() {
- decorator = new DlqDecorator<>(kafkaProcessor, Set.of(FUNCTIONAL_SINK), dlqMetadataHandler, metrics, false);
+ decorator = new DlqDecorator(Set.of(FUNCTIONAL_SINK), dlqMetadataHandler, metrics, false);
+ decorator.setDelegate(kafkaProcessor);
decorator.init(context);
decorator.process(record);
diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/MetricsDecoratorTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/MetricsDecoratorTest.java
index f8ce1a2..bbef50b 100644
--- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/MetricsDecoratorTest.java
+++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/MetricsDecoratorTest.java
@@ -48,14 +48,15 @@ public class MetricsDecoratorTest {
Ping inputMessage = Ping.newBuilder().setMessage("message").build();
- MetricsDecorator processorDecorator;
+ MetricsDecorator processorDecorator;
@Mock
ArcContainer arcContainer;
@BeforeEach
public void setUp() {
- processorDecorator = new MetricsDecorator<>(kafkaProcessor, metrics);
+ processorDecorator = new MetricsDecorator(metrics);
+ processorDecorator.setDelegate(kafkaProcessor);
}
@Test
diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/PunctuatorDecorationProcessorDecoratorTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/PunctuatorDecorationProcessorDecoratorTest.java
index 5187037..d97b9f2 100644
--- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/PunctuatorDecorationProcessorDecoratorTest.java
+++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/PunctuatorDecorationProcessorDecoratorTest.java
@@ -68,9 +68,8 @@ public void process(Record record) {
}
};
- PunctuatorDecorationProcessorDecorator decorator = new PunctuatorDecorationProcessorDecorator<>(
- processor,
- decoratedPunctuators);
+ PunctuatorDecorationProcessorDecorator decorator = new PunctuatorDecorationProcessorDecorator(decoratedPunctuators);
+ decorator.setDelegate(processor);
decorator.init(context);
decorator.process(new Record<>("blabla", PingMessage.Ping.newBuilder().setMessage("blabla").build(), 0L, null));
decorator.close();
diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecoratorTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecoratorTest.java
index 930b32f..6e137cc 100644
--- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecoratorTest.java
+++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecoratorTest.java
@@ -114,7 +114,7 @@ public class TracingDecoratorTest {
@RegisterExtension
static final OpenTelemetryExtension otel = OpenTelemetryExtension.create();
- TracingDecorator decorator;
+ TracingDecorator decorator;
@Mock
JsonFormat.Printer jsonPrinter;
@@ -140,8 +140,9 @@ public void setUp() {
rootLogger.addHandler(inMemoryLogHandler);
rootLogger.setLevel(Level.DEBUG);
when(topologyConfiguration.getProcessorPayloadType()).thenReturn((Class) MockType.class);
- decorator = new TracingDecorator<>(kafkaProcessor, otel.getOpenTelemetry(), kafkaTextMapGetter, kafkaTextMapSetter,
+ decorator = new TracingDecorator(otel.getOpenTelemetry(), kafkaTextMapGetter, kafkaTextMapSetter,
tracer, topologyConfiguration.getProcessorPayloadType().getName(), jsonPrinter);
+ decorator.setDelegate(kafkaProcessor);
decorator.init(processorContext);
}
@@ -206,8 +207,9 @@ public void shouldCleanMDCAndScopeInCaseOfException() {
.setMessage("blabla")
.build(), 0L, headers);
- decorator = new TracingDecorator<>(new ThrowExceptionProcessor(), otel.getOpenTelemetry(), kafkaTextMapGetter,
+ decorator = new TracingDecorator(otel.getOpenTelemetry(), kafkaTextMapGetter,
kafkaTextMapSetter, tracer, topologyConfiguration.getProcessorPayloadType().getName(), jsonPrinter);
+ decorator.setDelegate(new ThrowExceptionProcessor());
decorator.init(processorContext);
assertDoesNotThrow(() -> decorator.process(record));
@@ -307,9 +309,9 @@ void shouldLogMetadataEvenIfValueMarshallingToJSONFails() throws Throwable {
void shouldLogRawToStringValueIfNotProtobuf() throws Throwable {
Processor kafkaProcessor = mock(Processor.class);
ProcessorContext processorContext = mock(ProcessorContext.class);
- TracingDecorator decorator = new TracingDecorator<>(
- kafkaProcessor, GlobalOpenTelemetry.get(), kafkaTextMapGetter,
+ TracingDecorator decorator = new TracingDecorator(GlobalOpenTelemetry.get(), kafkaTextMapGetter,
kafkaTextMapSetter, tracer, topologyConfiguration.getProcessorPayloadType().getName(), jsonPrinter);
+ decorator.setDelegate(kafkaProcessor);
decorator.init(processorContext);
RuntimeException exception = new TestException();
@@ -334,9 +336,10 @@ void shouldPropagateOpentelemetryW3CBaggage() {
.setPropagators(ContextPropagators.create(TextMapPropagator.composite(W3CTraceContextPropagator.getInstance(),
W3CBaggagePropagator.getInstance())))
.build()) {
- decorator = new TracingDecorator<>(new LogOpentelemetryBaggageProcessor(), openTelemetryWithBaggageSdk,
+ decorator = new TracingDecorator(openTelemetryWithBaggageSdk,
kafkaTextMapGetter, kafkaTextMapSetter, openTelemetryWithBaggageSdk.getTracer("test"), PROCESSOR_NAME,
jsonPrinter);
+ decorator.setDelegate(new LogOpentelemetryBaggageProcessor());
decorator.init(processorContext);
decorator.process(record);
diff --git a/integration-tests/custom-serde/pom.xml b/integration-tests/custom-serde/pom.xml
index 5b09ec0..196fe9b 100644
--- a/integration-tests/custom-serde/pom.xml
+++ b/integration-tests/custom-serde/pom.xml
@@ -3,7 +3,7 @@
io.quarkiverse.kafkastreamsprocessor
quarkus-kafka-streams-processor-integration-tests
- 3.0.0-SNAPSHOT
+ 4.0.0-SNAPSHOT
4.0.0
diff --git a/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/HeaderDecorator.java b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/HeaderDecorator.java
index 09889dc..ea5507f 100644
--- a/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/HeaderDecorator.java
+++ b/integration-tests/custom-serde/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/customserde/HeaderDecorator.java
@@ -22,29 +22,19 @@
import java.nio.charset.StandardCharsets;
import jakarta.annotation.Priority;
-import jakarta.decorator.Decorator;
-import jakarta.decorator.Delegate;
-import jakarta.inject.Inject;
+import jakarta.enterprise.context.Dependent;
import org.apache.kafka.common.header.Header;
-import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
+import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.AbstractProcessorDecorator;
import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.ProcessorDecoratorPriorities;
-@Decorator
+@Dependent
@Priority(ProcessorDecoratorPriorities.PUNCTUATOR_DECORATION + 2)
-public class HeaderDecorator implements Processor {
- @lombok.experimental.Delegate(excludes = Excludes.class)
- private final Processor delegate;
-
- @Inject
- public HeaderDecorator(@Delegate Processor delegate) {
- this.delegate = delegate;
- }
-
+public class HeaderDecorator extends AbstractProcessorDecorator {
@Override
- public void process(Record record) {
+ public void process(Record record) {
Header header = record.headers().lastHeader("custom-header");
if (header != null) {
String value = new String(header.value(), StandardCharsets.UTF_8);
@@ -52,11 +42,6 @@ public void process(Record record) {
throw new IllegalStateException("Error in header");
}
}
- delegate.process(record);
+ getDelegate().process(record);
}
-
- private interface Excludes {
- void process(Record record);
- }
-
}
diff --git a/integration-tests/json-pojo/pom.xml b/integration-tests/json-pojo/pom.xml
index 0f7e5b1..e49c196 100644
--- a/integration-tests/json-pojo/pom.xml
+++ b/integration-tests/json-pojo/pom.xml
@@ -3,7 +3,7 @@
io.quarkiverse.kafkastreamsprocessor
quarkus-kafka-streams-processor-integration-tests
- 3.0.0-SNAPSHOT
+ 4.0.0-SNAPSHOT
4.0.0
diff --git a/integration-tests/kafka-to-rest/pom.xml b/integration-tests/kafka-to-rest/pom.xml
index 74eb191..78d1255 100644
--- a/integration-tests/kafka-to-rest/pom.xml
+++ b/integration-tests/kafka-to-rest/pom.xml
@@ -3,7 +3,7 @@
io.quarkiverse.kafkastreamsprocessor
quarkus-kafka-streams-processor-integration-tests
- 3.0.0-SNAPSHOT
+ 4.0.0-SNAPSHOT
4.0.0
diff --git a/integration-tests/multioutput/pom.xml b/integration-tests/multioutput/pom.xml
index ec88456..f435e26 100644
--- a/integration-tests/multioutput/pom.xml
+++ b/integration-tests/multioutput/pom.xml
@@ -3,7 +3,7 @@
io.quarkiverse.kafkastreamsprocessor
quarkus-kafka-streams-processor-integration-tests
- 3.0.0-SNAPSHOT
+ 4.0.0-SNAPSHOT
4.0.0
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 819c521..f5edae2 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -4,7 +4,7 @@
io.quarkiverse.kafkastreamsprocessor
quarkus-kafka-streams-processor-parent
- 3.0.0-SNAPSHOT
+ 4.0.0-SNAPSHOT
quarkus-kafka-streams-processor-integration-tests
pom
diff --git a/integration-tests/protobuf-binding/pom.xml b/integration-tests/protobuf-binding/pom.xml
index f66dc7c..9f48606 100644
--- a/integration-tests/protobuf-binding/pom.xml
+++ b/integration-tests/protobuf-binding/pom.xml
@@ -3,7 +3,7 @@
io.quarkiverse.kafkastreamsprocessor
quarkus-kafka-streams-processor-integration-tests
- 3.0.0-SNAPSHOT
+ 4.0.0-SNAPSHOT
4.0.0
diff --git a/integration-tests/simple/pom.xml b/integration-tests/simple/pom.xml
index fc5584f..9cded7b 100644
--- a/integration-tests/simple/pom.xml
+++ b/integration-tests/simple/pom.xml
@@ -3,7 +3,7 @@
io.quarkiverse.kafkastreamsprocessor
quarkus-kafka-streams-processor-integration-tests
- 3.0.0-SNAPSHOT
+ 4.0.0-SNAPSHOT
4.0.0
diff --git a/integration-tests/stateful/pom.xml b/integration-tests/stateful/pom.xml
index a3388ed..520baf6 100644
--- a/integration-tests/stateful/pom.xml
+++ b/integration-tests/stateful/pom.xml
@@ -4,7 +4,7 @@
io.quarkiverse.kafkastreamsprocessor
quarkus-kafka-streams-processor-integration-tests
- 3.0.0-SNAPSHOT
+ 4.0.0-SNAPSHOT
4.0.0
diff --git a/pom.xml b/pom.xml
index 37c54d4..8bd6740 100644
--- a/pom.xml
+++ b/pom.xml
@@ -8,7 +8,7 @@
io.quarkiverse.kafkastreamsprocessor
quarkus-kafka-streams-processor-parent
- 3.0.0-SNAPSHOT
+ 4.0.0-SNAPSHOT
pom
bom
@@ -96,37 +96,37 @@
LF
-
-
-
-
- org.codehaus.mojo
- license-maven-plugin
- 2.4.0
-
-
- add-license-header
-
- check-file-header
-
- process-sources
-
-
-
- false
- apache_v2
- 2024
- Amadeus s.a.s.
- Quarkus Kafka Streams Processor
-
- **/*.java
-
-
- **/*$$*.java
-
-
-
+
+
+
+ org.codehaus.mojo
+ license-maven-plugin
+ 2.4.0
+
+
+ add-license-header
+
+ check-file-header
+
+ process-sources
+
+
+
+ false
+ apache_v2
+ 2024
+ Amadeus s.a.s.
+ Quarkus Kafka Streams Processor
+
+ **/*.java
+
+
+ **/*$$*.java
+
+
+
+
diff --git a/runtime/pom.xml b/runtime/pom.xml
index 64ed647..7096e7c 100644
--- a/runtime/pom.xml
+++ b/runtime/pom.xml
@@ -4,7 +4,7 @@
io.quarkiverse.kafkastreamsprocessor
quarkus-kafka-streams-processor-parent
- 3.0.0-SNAPSHOT
+ 4.0.0-SNAPSHOT
quarkus-kafka-streams-processor
diff --git a/spi/pom.xml b/spi/pom.xml
index a4233bb..3a657b3 100644
--- a/spi/pom.xml
+++ b/spi/pom.xml
@@ -4,7 +4,7 @@
io.quarkiverse.kafkastreamsprocessor
quarkus-kafka-streams-processor-parent
- 3.0.0-SNAPSHOT
+ 4.0.0-SNAPSHOT
quarkus-kafka-streams-processor-spi
diff --git a/test-framework/pom.xml b/test-framework/pom.xml
index a0d000b..6abde26 100644
--- a/test-framework/pom.xml
+++ b/test-framework/pom.xml
@@ -4,7 +4,7 @@
io.quarkiverse.kafkastreamsprocessor
quarkus-kafka-streams-processor-parent
- 3.0.0-SNAPSHOT
+ 4.0.0-SNAPSHOT
quarkus-kafka-streams-processor-test-framework
diff --git a/text-map-accessors/pom.xml b/text-map-accessors/pom.xml
index a107006..f8b04ad 100644
--- a/text-map-accessors/pom.xml
+++ b/text-map-accessors/pom.xml
@@ -4,7 +4,7 @@
io.quarkiverse.kafkastreamsprocessor
quarkus-kafka-streams-processor-parent
- 3.0.0-SNAPSHOT
+ 4.0.0-SNAPSHOT
quarkus-kafka-streams-processor-text-map-accessors