From 28de499190226a72c37afbc6037f126c8306c96c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastien=20Bigu=C3=A9?= Date: Tue, 12 Nov 2024 15:32:37 +0100 Subject: [PATCH 1/5] Add applicationId to KafkaError avro schema --- kstreamplify-core/src/main/avro/kafka-error.avsc | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/kstreamplify-core/src/main/avro/kafka-error.avsc b/kstreamplify-core/src/main/avro/kafka-error.avsc index 81471ce1..e0be431b 100644 --- a/kstreamplify-core/src/main/avro/kafka-error.avsc +++ b/kstreamplify-core/src/main/avro/kafka-error.avsc @@ -48,6 +48,12 @@ "name": "topic", "type": "string", "doc": "Source topic of erroneous message" + }, + { + "name": "applicationId", + "type": ["null", "string"], + "doc": "Application id of the application that produced the erroneous message", + "default": null } ] } From 8a7463462d6abf722d6491236e8d23a66b6406d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastien=20Bigu=C3=A9?= Date: Tue, 12 Nov 2024 22:02:06 +0100 Subject: [PATCH 2/5] Fill applicationId when building KafkaError in ExceptionHandler and GenericErrorProcessor --- .../error/DlqDeserializationExceptionHandler.java | 3 ++- .../kstreamplify/error/DlqProductionExceptionHandler.java | 5 ++++- .../michelin/kstreamplify/error/GenericErrorProcessor.java | 1 + 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java index be9a2f67..cee94c5c 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java @@ -58,7 +58,8 @@ public DeserializationHandlerResponse handle(ProcessorContext processorContext, "An exception occurred during the stream internal deserialization") .setOffset(consumerRecord.offset()) .setPartition(consumerRecord.partition()) - .setTopic(consumerRecord.topic()); + .setTopic(consumerRecord.topic()) + .setApplicationId(processorContext.applicationId()); boolean isCausedByKafka = consumptionException.getCause() instanceof KafkaException; // If the cause of this exception is a KafkaException and if getCause == sourceException diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandler.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandler.java index 931fd7f6..39932cdb 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandler.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandler.java @@ -9,6 +9,7 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.ProductionExceptionHandler; /** @@ -58,7 +59,9 @@ public ProductionExceptionHandlerResponse handle(ProducerRecord .setOffset(-1) .setPartition( producerRecord.partition() == null ? -1 : producerRecord.partition()) - .setTopic(producerRecord.topic()); + .setTopic(producerRecord.topic()) + .setApplicationId( + KafkaStreamsExecutionContext.getProperties().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)); producer.send(new ProducerRecord<>(KafkaStreamsExecutionContext.getDlqTopicName(), producerRecord.key(), builder.build())).get(); diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/GenericErrorProcessor.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/GenericErrorProcessor.java index fcce0bf0..76b01bf7 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/GenericErrorProcessor.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/GenericErrorProcessor.java @@ -36,6 +36,7 @@ public void process(FixedKeyRecord> fixedKeyRecord) { recordMetadata != null && recordMetadata.topic() != null ? recordMetadata.topic() : "Outside topic context") .setValue(fixedKeyRecord.value().getKafkaRecord()) + .setApplicationId(context().applicationId()) .build(); context().forward(fixedKeyRecord.withValue(error)); From cc217b476c2a66ab3cdab4c14648538890844e4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastien=20Bigu=C3=A9?= Date: Tue, 12 Nov 2024 22:32:49 +0100 Subject: [PATCH 3/5] In ProcessingErrorTest, add applicationId in KafkaError message --- .../com/michelin/kstreamplify/error/ProcessingErrorTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/ProcessingErrorTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/ProcessingErrorTest.java index 4fb66504..25e3cf7e 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/ProcessingErrorTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/ProcessingErrorTest.java @@ -45,6 +45,7 @@ void shouldCreateProcessingErrorFromAvroRecord() { .setPartition(1) .setTopic("Topic") .setValue("Value") + .setApplicationId("ApplicationId") .build(); ProcessingError processingError = new ProcessingError<>(exception, contextMessage, kafkaRecord); @@ -57,6 +58,7 @@ void shouldCreateProcessingErrorFromAvroRecord() { "offset": 1, "cause": "Cause", "topic": "Topic", + "applicationId": "ApplicationId", "value": "Value" }""", processingError.getKafkaRecord()); } From b09cb923def5f74a3d272cae7d6514e10059be9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastien=20Bigu=C3=A9?= Date: Wed, 13 Nov 2024 11:14:53 +0100 Subject: [PATCH 4/5] In TopologyErrorHandlerTest, add assertions to check for KafkaError fields --- .../michelin/kstreamplify/TopologyErrorHandlerTest.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java b/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java index e677ac15..3d5e8e68 100644 --- a/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java +++ b/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java @@ -103,9 +103,13 @@ void shouldSendExceptionToDlqWhenProcessingValueIsInvalid() { stringInputTopic.pipeInput("key", "error"); var resultDlq = dlqTopic.readValuesToList(); - var resultOutput = stringOutputTopic.readValuesToList(); - assertEquals(1, resultDlq.size()); + var record = resultDlq.getFirst(); + assertEquals(record.getApplicationId(), "test"); + assertEquals(record.getTopic(), "stringTopic"); + assertEquals(record.getValue(), "error"); + + var resultOutput = stringOutputTopic.readValuesToList(); assertEquals(0, resultOutput.size()); } From d0010b70be39dcdc68eba64728c147c6cae51869 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastien=20Bigu=C3=A9?= Date: Thu, 14 Nov 2024 11:35:46 +0100 Subject: [PATCH 5/5] Remove usage of getFirst that is only available for Java21 --- .../com/michelin/kstreamplify/TopologyErrorHandlerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java b/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java index 3d5e8e68..0b966193 100644 --- a/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java +++ b/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java @@ -104,7 +104,7 @@ void shouldSendExceptionToDlqWhenProcessingValueIsInvalid() { var resultDlq = dlqTopic.readValuesToList(); assertEquals(1, resultDlq.size()); - var record = resultDlq.getFirst(); + var record = resultDlq.get(0); assertEquals(record.getApplicationId(), "test"); assertEquals(record.getTopic(), "stringTopic"); assertEquals(record.getValue(), "error");