Skip to content

Commit

Permalink
Add cause exception type to DLQ headers
Browse files Browse the repository at this point in the history
Closes #1913
  • Loading branch information
ozangunalp committed Oct 3, 2022
1 parent 1eb5a6d commit 655295a
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
@SuppressWarnings({ "rawtypes", "unchecked" })
public class KafkaDeadLetterQueue implements KafkaFailureHandler {

public static final String DEAD_LETTER_EXCEPTION = "dead-letter-exception";
public static final String DEAD_LETTER_REASON = "dead-letter-reason";
public static final String DEAD_LETTER_CAUSE = "dead-letter-cause";
public static final String DEAD_LETTER_TOPIC = "dead-letter-topic";
Expand Down Expand Up @@ -140,6 +141,7 @@ public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> record, Throwable reaso

ProducerRecord<K, V> dead = new ProducerRecord<>(topic, partition, key, record.getPayload());

addHeader(dead, DEAD_LETTER_EXCEPTION, reason.getClass().getName());
addHeader(dead, DEAD_LETTER_REASON, getThrowableMessage(reason));
if (reason.getCause() != null) {
addHeader(dead, DEAD_LETTER_CAUSE, getThrowableMessage(reason.getCause()));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.smallrye.reactive.messaging.kafka.fault;

import static io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue.DEAD_LETTER_CAUSE;
import static io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue.DEAD_LETTER_EXCEPTION;
import static io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue.DEAD_LETTER_OFFSET;
import static io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue.DEAD_LETTER_PARTITION;
import static io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue.DEAD_LETTER_REASON;
Expand Down Expand Up @@ -131,6 +132,8 @@ public void testDeadLetterQueueStrategyWithBatchRecords() {
assertThat(records.getRecords()).allSatisfy(r -> {
assertThat(r.topic()).isEqualTo(dqTopic);
assertThat(r.value()).isIn(bean.nacked());
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_EXCEPTION).value()))
.isEqualTo(IllegalArgumentException.class.getName());
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_REASON).value())).startsWith("nack all -");
assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE)).isNull();
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_PARTITION).value())).isEqualTo("0");
Expand Down Expand Up @@ -166,6 +169,8 @@ public void testDeadLetterQueueStrategyWithCustomTopicAndMethodUsingPayload() {
assertThat(records.getRecords()).allSatisfy(r -> {
assertThat(r.topic()).isEqualTo(dqTopic);
assertThat(r.value()).isIn(bean.nacked());
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_EXCEPTION).value()))
.isEqualTo(IllegalArgumentException.class.getName());
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_REASON).value())).startsWith("nack all -");
assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE)).isNull();
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_PARTITION).value())).isEqualTo("0");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ public void testDeadLetterQueueStrategyWithDefaultTopic() {
assertThat(records.getRecords()).allSatisfy(r -> {
assertThat(r.topic()).isEqualTo("dead-letter-topic-kafka");
assertThat(r.value()).isIn(3, 6, 9);
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_EXCEPTION).value()))
.isEqualTo(IllegalArgumentException.class.getName());
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_REASON).value())).startsWith("nack 3 -");
assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE)).isNull();
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_PARTITION).value())).isEqualTo("0");
Expand All @@ -140,7 +142,7 @@ public void testDeadLetterQueueStrategyWithMessageLessThrowable() {
MyReceiverBean bean = runApplication(
getDeadLetterQueueWithCustomConfig(topic, dqTopic),
MyReceiverBean.class);
bean.setToThrowable(p -> new IllegalArgumentException());
bean.setToThrowable(p -> new IllegalStateException());
await().until(this::isReady);

companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, i), 10);
Expand All @@ -152,8 +154,10 @@ public void testDeadLetterQueueStrategyWithMessageLessThrowable() {
assertThat(records.getRecords()).allSatisfy(r -> {
assertThat(r.topic()).isEqualTo(dqTopic);
assertThat(r.value()).isIn(3, 6, 9);
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_EXCEPTION).value()))
.isEqualTo(IllegalStateException.class.getName());
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_REASON).value()))
.isEqualTo(new IllegalArgumentException().toString());
.isEqualTo(new IllegalStateException().toString());
assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE)).isNull();
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_PARTITION).value())).isEqualTo("0");
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_TOPIC).value())).isEqualTo(topic);
Expand Down Expand Up @@ -186,6 +190,8 @@ public void testDeadLetterQueueStrategyWithCustomTopicAndMethodUsingPayload() {
assertThat(records.getRecords()).allSatisfy(r -> {
assertThat(r.topic()).isEqualTo(dqTopic);
assertThat(r.value()).isIn(3, 6, 9);
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_EXCEPTION).value()))
.isEqualTo(IllegalArgumentException.class.getName());
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_REASON).value())).startsWith("nack 3 -");
assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE)).isNull();
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_PARTITION).value())).isEqualTo("0");
Expand Down Expand Up @@ -220,6 +226,8 @@ public void testDeadLetterQueueStrategyWithInterceptor() {
assertThat(records.getRecords()).allSatisfy(r -> {
assertThat(r.topic()).isEqualTo(dqTopic);
assertThat(r.value()).isIn(3, 6, 9);
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_EXCEPTION).value()))
.isEqualTo(IllegalArgumentException.class.getName());
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_REASON).value())).startsWith("nack 3 -");
assertThat(r.headers().lastHeader(DEAD_LETTER_CAUSE)).isNull();
assertThat(new String(r.headers().lastHeader(DEAD_LETTER_PARTITION).value())).isEqualTo("0");
Expand Down

0 comments on commit 655295a

Please sign in to comment.