diff --git a/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java b/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java index e677ac15..0b966193 100644 --- a/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java +++ b/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java @@ -103,9 +103,13 @@ void shouldSendExceptionToDlqWhenProcessingValueIsInvalid() { stringInputTopic.pipeInput("key", "error"); var resultDlq = dlqTopic.readValuesToList(); - var resultOutput = stringOutputTopic.readValuesToList(); - assertEquals(1, resultDlq.size()); + var record = resultDlq.get(0); + assertEquals(record.getApplicationId(), "test"); + assertEquals(record.getTopic(), "stringTopic"); + assertEquals(record.getValue(), "error"); + + var resultOutput = stringOutputTopic.readValuesToList(); assertEquals(0, resultOutput.size()); } diff --git a/kstreamplify-core/src/main/avro/kafka-error.avsc b/kstreamplify-core/src/main/avro/kafka-error.avsc index 81471ce1..e0be431b 100644 --- a/kstreamplify-core/src/main/avro/kafka-error.avsc +++ b/kstreamplify-core/src/main/avro/kafka-error.avsc @@ -48,6 +48,12 @@ "name": "topic", "type": "string", "doc": "Source topic of erroneous message" + }, + { + "name": "applicationId", + "type": ["null", "string"], + "doc": "Application id of the application that produced the erroneous message", + "default": null } ] } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java index be9a2f67..cee94c5c 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java @@ -58,7 +58,8 @@ public DeserializationHandlerResponse handle(ProcessorContext processorContext, "An exception occurred during the stream internal deserialization") .setOffset(consumerRecord.offset()) .setPartition(consumerRecord.partition()) - .setTopic(consumerRecord.topic()); + .setTopic(consumerRecord.topic()) + .setApplicationId(processorContext.applicationId()); boolean isCausedByKafka = consumptionException.getCause() instanceof KafkaException; // If the cause of this exception is a KafkaException and if getCause == sourceException diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandler.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandler.java index 931fd7f6..39932cdb 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandler.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandler.java @@ -9,6 +9,7 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.ProductionExceptionHandler; /** @@ -58,7 +59,9 @@ public ProductionExceptionHandlerResponse handle(ProducerRecord .setOffset(-1) .setPartition( producerRecord.partition() == null ? -1 : producerRecord.partition()) - .setTopic(producerRecord.topic()); + .setTopic(producerRecord.topic()) + .setApplicationId( + KafkaStreamsExecutionContext.getProperties().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)); producer.send(new ProducerRecord<>(KafkaStreamsExecutionContext.getDlqTopicName(), producerRecord.key(), builder.build())).get(); diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/GenericErrorProcessor.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/GenericErrorProcessor.java index fcce0bf0..76b01bf7 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/GenericErrorProcessor.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/GenericErrorProcessor.java @@ -36,6 +36,7 @@ public void process(FixedKeyRecord> fixedKeyRecord) { recordMetadata != null && recordMetadata.topic() != null ? recordMetadata.topic() : "Outside topic context") .setValue(fixedKeyRecord.value().getKafkaRecord()) + .setApplicationId(context().applicationId()) .build(); context().forward(fixedKeyRecord.withValue(error)); diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/ProcessingErrorTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/ProcessingErrorTest.java index 4fb66504..25e3cf7e 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/ProcessingErrorTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/ProcessingErrorTest.java @@ -45,6 +45,7 @@ void shouldCreateProcessingErrorFromAvroRecord() { .setPartition(1) .setTopic("Topic") .setValue("Value") + .setApplicationId("ApplicationId") .build(); ProcessingError processingError = new ProcessingError<>(exception, contextMessage, kafkaRecord); @@ -57,6 +58,7 @@ void shouldCreateProcessingErrorFromAvroRecord() { "offset": 1, "cause": "Cause", "topic": "Topic", + "applicationId": "ApplicationId", "value": "Value" }""", processingError.getKafkaRecord()); }