From 9d450413e892bee102e5021a32191fa062844277 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Mon, 3 Oct 2022 15:43:31 +0100 Subject: [PATCH] Add cause exception type to DLQ headers Closes #1913 --- .../kafka/fault/KafkaDeadLetterQueue.java | 5 +++++ .../kafka/fault/BatchFailureHandlerTest.java | 8 ++++++++ .../kafka/fault/KafkaFailureHandlerTest.java | 20 ++++++++++++++++--- 3 files changed, 30 insertions(+), 3 deletions(-) diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterQueue.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterQueue.java index 3db075700b..f2f056282a 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterQueue.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterQueue.java @@ -35,6 +35,9 @@ @SuppressWarnings({ "rawtypes", "unchecked" }) public class KafkaDeadLetterQueue implements KafkaFailureHandler { + public static final String DEAD_LETTER_EXCEPTION_CLASS_NAME = "dead-letter-exception-class-name"; + public static final String DEAD_LETTER_CAUSE_CLASS_NAME = "dead-letter-cause-class-name"; + public static final String DEAD_LETTER_REASON = "dead-letter-reason"; public static final String DEAD_LETTER_CAUSE = "dead-letter-cause"; public static final String DEAD_LETTER_TOPIC = "dead-letter-topic"; @@ -140,8 +143,10 @@ public Uni handle(IncomingKafkaRecord record, Throwable reaso ProducerRecord dead = new ProducerRecord<>(topic, partition, key, record.getPayload()); + addHeader(dead, DEAD_LETTER_EXCEPTION_CLASS_NAME, reason.getClass().getName()); addHeader(dead, DEAD_LETTER_REASON, getThrowableMessage(reason)); if (reason.getCause() != null) { + addHeader(dead, DEAD_LETTER_CAUSE_CLASS_NAME, reason.getCause().getClass().getName()); addHeader(dead, DEAD_LETTER_CAUSE, getThrowableMessage(reason.getCause())); } addHeader(dead, DEAD_LETTER_TOPIC, record.getTopic()); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/fault/BatchFailureHandlerTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/fault/BatchFailureHandlerTest.java index 6e1028f63c..68fa9f29f1 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/fault/BatchFailureHandlerTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/fault/BatchFailureHandlerTest.java @@ -1,6 +1,8 @@ package io.smallrye.reactive.messaging.kafka.fault; import static io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue.DEAD_LETTER_CAUSE; +import static io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue.DEAD_LETTER_CAUSE_CLASS_NAME; +import static io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue.DEAD_LETTER_EXCEPTION_CLASS_NAME; import static io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue.DEAD_LETTER_OFFSET; import static io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue.DEAD_LETTER_PARTITION; import static io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue.DEAD_LETTER_REASON; @@ -131,8 +133,11 @@ public void testDeadLetterQueueStrategyWithBatchRecords() { assertThat(records.getRecords()).allSatisfy(r -> { assertThat(r.topic()).isEqualTo(dqTopic); assertThat(r.value()).isIn(bean.nacked()); + assertThat(new String(r.headers().lastHeader(DEAD_LETTER_EXCEPTION_CLASS_NAME).value())) + .isEqualTo(IllegalArgumentException.class.getName()); assertThat(new String(r.headers().lastHeader(DEAD_LETTER_REASON).value())).startsWith("nack all -"); assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE)).isNull(); + assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE_CLASS_NAME)).isNull(); assertThat(new String(r.headers().lastHeader(DEAD_LETTER_PARTITION).value())).isEqualTo("0"); assertThat(new String(r.headers().lastHeader(DEAD_LETTER_TOPIC).value())).isEqualTo(topic); assertThat(new String(r.headers().lastHeader(DEAD_LETTER_OFFSET).value())).isNotNull().isIn(dlqOffsets); @@ -166,8 +171,11 @@ public void testDeadLetterQueueStrategyWithCustomTopicAndMethodUsingPayload() { assertThat(records.getRecords()).allSatisfy(r -> { assertThat(r.topic()).isEqualTo(dqTopic); assertThat(r.value()).isIn(bean.nacked()); + assertThat(new String(r.headers().lastHeader(DEAD_LETTER_EXCEPTION_CLASS_NAME).value())) + .isEqualTo(IllegalArgumentException.class.getName()); assertThat(new String(r.headers().lastHeader(DEAD_LETTER_REASON).value())).startsWith("nack all -"); assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE)).isNull(); + assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE_CLASS_NAME)).isNull(); assertThat(new String(r.headers().lastHeader(DEAD_LETTER_PARTITION).value())).isEqualTo("0"); assertThat(new String(r.headers().lastHeader(DEAD_LETTER_TOPIC).value())).isEqualTo(topic); assertThat(new String(r.headers().lastHeader(DEAD_LETTER_OFFSET).value())).isNotNull().isIn(dlqOffsets); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/fault/KafkaFailureHandlerTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/fault/KafkaFailureHandlerTest.java index 19208a8fec..b1eacb7136 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/fault/KafkaFailureHandlerTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/fault/KafkaFailureHandlerTest.java @@ -118,8 +118,11 @@ public void testDeadLetterQueueStrategyWithDefaultTopic() { assertThat(records.getRecords()).allSatisfy(r -> { assertThat(r.topic()).isEqualTo("dead-letter-topic-kafka"); assertThat(r.value()).isIn(3, 6, 9); + assertThat(new String(r.headers().lastHeader(DEAD_LETTER_EXCEPTION_CLASS_NAME).value())) + .isEqualTo(IllegalArgumentException.class.getName()); assertThat(new String(r.headers().lastHeader(DEAD_LETTER_REASON).value())).startsWith("nack 3 -"); assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE)).isNull(); + assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE_CLASS_NAME)).isNull(); assertThat(new String(r.headers().lastHeader(DEAD_LETTER_PARTITION).value())).isEqualTo("0"); assertThat(new String(r.headers().lastHeader(DEAD_LETTER_TOPIC).value())).isEqualTo("dead-letter-default"); assertThat(new String(r.headers().lastHeader(DEAD_LETTER_OFFSET).value())).isNotNull().isIn("3", "6", "9"); @@ -140,7 +143,7 @@ public void testDeadLetterQueueStrategyWithMessageLessThrowable() { MyReceiverBean bean = runApplication( getDeadLetterQueueWithCustomConfig(topic, dqTopic), MyReceiverBean.class); - bean.setToThrowable(p -> new IllegalArgumentException()); + bean.setToThrowable(p -> new IllegalStateException(new NullPointerException("msg"))); await().until(this::isReady); companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, i), 10); @@ -152,9 +155,14 @@ public void testDeadLetterQueueStrategyWithMessageLessThrowable() { assertThat(records.getRecords()).allSatisfy(r -> { assertThat(r.topic()).isEqualTo(dqTopic); assertThat(r.value()).isIn(3, 6, 9); + assertThat(new String(r.headers().lastHeader(DEAD_LETTER_EXCEPTION_CLASS_NAME).value())) + .isEqualTo(IllegalStateException.class.getName()); assertThat(new String(r.headers().lastHeader(DEAD_LETTER_REASON).value())) - .isEqualTo(new IllegalArgumentException().toString()); - assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE)).isNull(); + .isEqualTo(new NullPointerException() + ": msg"); + assertThat(new String(r.headers().lastHeader(DEAD_LETTER_CAUSE).value())) + .isEqualTo("msg"); + assertThat(new String(r.headers().lastHeader(DEAD_LETTER_CAUSE_CLASS_NAME).value())) + .isEqualTo(NullPointerException.class.getName()); assertThat(new String(r.headers().lastHeader(DEAD_LETTER_PARTITION).value())).isEqualTo("0"); assertThat(new String(r.headers().lastHeader(DEAD_LETTER_TOPIC).value())).isEqualTo(topic); assertThat(new String(r.headers().lastHeader(DEAD_LETTER_OFFSET).value())).isNotNull().isIn("3", "6", "9"); @@ -186,8 +194,11 @@ public void testDeadLetterQueueStrategyWithCustomTopicAndMethodUsingPayload() { assertThat(records.getRecords()).allSatisfy(r -> { assertThat(r.topic()).isEqualTo(dqTopic); assertThat(r.value()).isIn(3, 6, 9); + assertThat(new String(r.headers().lastHeader(DEAD_LETTER_EXCEPTION_CLASS_NAME).value())) + .isEqualTo(IllegalArgumentException.class.getName()); assertThat(new String(r.headers().lastHeader(DEAD_LETTER_REASON).value())).startsWith("nack 3 -"); assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE)).isNull(); + assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE_CLASS_NAME)).isNull(); assertThat(new String(r.headers().lastHeader(DEAD_LETTER_PARTITION).value())).isEqualTo("0"); assertThat(new String(r.headers().lastHeader(DEAD_LETTER_TOPIC).value())).isEqualTo(topic); assertThat(new String(r.headers().lastHeader(DEAD_LETTER_OFFSET).value())).isNotNull().isIn("3", "6", "9"); @@ -220,8 +231,11 @@ public void testDeadLetterQueueStrategyWithInterceptor() { assertThat(records.getRecords()).allSatisfy(r -> { assertThat(r.topic()).isEqualTo(dqTopic); assertThat(r.value()).isIn(3, 6, 9); + assertThat(new String(r.headers().lastHeader(DEAD_LETTER_EXCEPTION_CLASS_NAME).value())) + .isEqualTo(IllegalArgumentException.class.getName()); assertThat(new String(r.headers().lastHeader(DEAD_LETTER_REASON).value())).startsWith("nack 3 -"); assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE)).isNull(); + assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE_CLASS_NAME)).isNull(); assertThat(new String(r.headers().lastHeader(DEAD_LETTER_PARTITION).value())).isEqualTo("0"); assertThat(new String(r.headers().lastHeader(DEAD_LETTER_TOPIC).value())).isEqualTo(topic); assertThat(new String(r.headers().lastHeader(DEAD_LETTER_OFFSET).value())).isNotNull().isIn("3", "6", "9");