Skip to content

Commit

Permalink
Add cause exception type to DLQ headers
Browse files Browse the repository at this point in the history
Closes #1913
  • Loading branch information
ozangunalp committed Oct 5, 2022
1 parent 1eb5a6d commit 9d45041
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
@SuppressWarnings({ "rawtypes", "unchecked" })
public class KafkaDeadLetterQueue implements KafkaFailureHandler {

public static final String DEAD_LETTER_EXCEPTION_CLASS_NAME = "dead-letter-exception-class-name";
public static final String DEAD_LETTER_CAUSE_CLASS_NAME = "dead-letter-cause-class-name";

public static final String DEAD_LETTER_REASON = "dead-letter-reason";
public static final String DEAD_LETTER_CAUSE = "dead-letter-cause";
public static final String DEAD_LETTER_TOPIC = "dead-letter-topic";
Expand Down Expand Up @@ -140,8 +143,10 @@ public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> record, Throwable reaso

ProducerRecord<K, V> dead = new ProducerRecord<>(topic, partition, key, record.getPayload());

addHeader(dead, DEAD_LETTER_EXCEPTION_CLASS_NAME, reason.getClass().getName());
addHeader(dead, DEAD_LETTER_REASON, getThrowableMessage(reason));
if (reason.getCause() != null) {
addHeader(dead, DEAD_LETTER_CAUSE_CLASS_NAME, reason.getCause().getClass().getName());
addHeader(dead, DEAD_LETTER_CAUSE, getThrowableMessage(reason.getCause()));
}
addHeader(dead, DEAD_LETTER_TOPIC, record.getTopic());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.smallrye.reactive.messaging.kafka.fault;

import static io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue.DEAD_LETTER_CAUSE;
import static io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue.DEAD_LETTER_CAUSE_CLASS_NAME;
import static io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue.DEAD_LETTER_EXCEPTION_CLASS_NAME;
import static io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue.DEAD_LETTER_OFFSET;
import static io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue.DEAD_LETTER_PARTITION;
import static io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue.DEAD_LETTER_REASON;
Expand Down Expand Up @@ -131,8 +133,11 @@ public void testDeadLetterQueueStrategyWithBatchRecords() {
assertThat(records.getRecords()).allSatisfy(r -> {
assertThat(r.topic()).isEqualTo(dqTopic);
assertThat(r.value()).isIn(bean.nacked());
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_EXCEPTION_CLASS_NAME).value()))
.isEqualTo(IllegalArgumentException.class.getName());
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_REASON).value())).startsWith("nack all -");
assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE)).isNull();
assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE_CLASS_NAME)).isNull();
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_PARTITION).value())).isEqualTo("0");
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_TOPIC).value())).isEqualTo(topic);
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_OFFSET).value())).isNotNull().isIn(dlqOffsets);
Expand Down Expand Up @@ -166,8 +171,11 @@ public void testDeadLetterQueueStrategyWithCustomTopicAndMethodUsingPayload() {
assertThat(records.getRecords()).allSatisfy(r -> {
assertThat(r.topic()).isEqualTo(dqTopic);
assertThat(r.value()).isIn(bean.nacked());
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_EXCEPTION_CLASS_NAME).value()))
.isEqualTo(IllegalArgumentException.class.getName());
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_REASON).value())).startsWith("nack all -");
assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE)).isNull();
assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE_CLASS_NAME)).isNull();
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_PARTITION).value())).isEqualTo("0");
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_TOPIC).value())).isEqualTo(topic);
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_OFFSET).value())).isNotNull().isIn(dlqOffsets);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,11 @@ public void testDeadLetterQueueStrategyWithDefaultTopic() {
assertThat(records.getRecords()).allSatisfy(r -> {
assertThat(r.topic()).isEqualTo("dead-letter-topic-kafka");
assertThat(r.value()).isIn(3, 6, 9);
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_EXCEPTION_CLASS_NAME).value()))
.isEqualTo(IllegalArgumentException.class.getName());
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_REASON).value())).startsWith("nack 3 -");
assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE)).isNull();
assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE_CLASS_NAME)).isNull();
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_PARTITION).value())).isEqualTo("0");
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_TOPIC).value())).isEqualTo("dead-letter-default");
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_OFFSET).value())).isNotNull().isIn("3", "6", "9");
Expand All @@ -140,7 +143,7 @@ public void testDeadLetterQueueStrategyWithMessageLessThrowable() {
MyReceiverBean bean = runApplication(
getDeadLetterQueueWithCustomConfig(topic, dqTopic),
MyReceiverBean.class);
bean.setToThrowable(p -> new IllegalArgumentException());
bean.setToThrowable(p -> new IllegalStateException(new NullPointerException("msg")));
await().until(this::isReady);

companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, i), 10);
Expand All @@ -152,9 +155,14 @@ public void testDeadLetterQueueStrategyWithMessageLessThrowable() {
assertThat(records.getRecords()).allSatisfy(r -> {
assertThat(r.topic()).isEqualTo(dqTopic);
assertThat(r.value()).isIn(3, 6, 9);
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_EXCEPTION_CLASS_NAME).value()))
.isEqualTo(IllegalStateException.class.getName());
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_REASON).value()))
.isEqualTo(new IllegalArgumentException().toString());
assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE)).isNull();
.isEqualTo(new NullPointerException() + ": msg");
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_CAUSE).value()))
.isEqualTo("msg");
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_CAUSE_CLASS_NAME).value()))
.isEqualTo(NullPointerException.class.getName());
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_PARTITION).value())).isEqualTo("0");
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_TOPIC).value())).isEqualTo(topic);
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_OFFSET).value())).isNotNull().isIn("3", "6", "9");
Expand Down Expand Up @@ -186,8 +194,11 @@ public void testDeadLetterQueueStrategyWithCustomTopicAndMethodUsingPayload() {
assertThat(records.getRecords()).allSatisfy(r -> {
assertThat(r.topic()).isEqualTo(dqTopic);
assertThat(r.value()).isIn(3, 6, 9);
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_EXCEPTION_CLASS_NAME).value()))
.isEqualTo(IllegalArgumentException.class.getName());
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_REASON).value())).startsWith("nack 3 -");
assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE)).isNull();
assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE_CLASS_NAME)).isNull();
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_PARTITION).value())).isEqualTo("0");
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_TOPIC).value())).isEqualTo(topic);
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_OFFSET).value())).isNotNull().isIn("3", "6", "9");
Expand Down Expand Up @@ -220,8 +231,11 @@ public void testDeadLetterQueueStrategyWithInterceptor() {
assertThat(records.getRecords()).allSatisfy(r -> {
assertThat(r.topic()).isEqualTo(dqTopic);
assertThat(r.value()).isIn(3, 6, 9);
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_EXCEPTION_CLASS_NAME).value()))
.isEqualTo(IllegalArgumentException.class.getName());
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_REASON).value())).startsWith("nack 3 -");
assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE)).isNull();
assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE_CLASS_NAME)).isNull();
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_PARTITION).value())).isEqualTo("0");
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_TOPIC).value())).isEqualTo(topic);
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_OFFSET).value())).isNotNull().isIn("3", "6", "9");
Expand Down

0 comments on commit 9d45041

Please sign in to comment.