From 74574786c6ba55b746f54f505c37cec29a8a61fa Mon Sep 17 00:00:00 2001
From: Thibault Vallin
Date: Mon, 5 Feb 2024 15:16:37 +0100
Subject: [PATCH] [4.x] Fix KafkaSeTest on Windows (#8322)

Signed-off-by: tvallin
---
 .../connectors/kafka/KafkaSeTest.java | 204 +++++++++---------
 1 file changed, 99 insertions(+), 105 deletions(-)

diff --git a/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/KafkaSeTest.java b/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/KafkaSeTest.java
index a2354a2f2e6..12bcde2a5df 100644
--- a/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/KafkaSeTest.java
+++ b/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/KafkaSeTest.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2020, 2023 Oracle and/or its affiliates.
+ * Copyright (c) 2020, 2024 Oracle and/or its affiliates.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -47,6 +47,7 @@ import io.helidon.messaging.connectors.kafka.AbstractSampleBean.Channel6;
 import io.helidon.messaging.connectors.kafka.AbstractSampleBean.Channel8;
 
+import org.apache.kafka.clients.admin.TopicListing;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
@@ -74,6 +75,7 @@ public class KafkaSeTest extends AbstractKafkaTest {
     private static final Duration TIMEOUT = Duration.of(45, ChronoUnit.SECONDS);
     private static final String TEST_DQL_TOPIC = "test-dlq-topic";
+    private static final String TEST_DQL_TOPIC_1 = "test-dlq-topic-1";
     private static final String TEST_SE_TOPIC_1 = "special-se-topic-1";
     private static final String TEST_SE_TOPIC_2 = "special-se-topic-2";
     private static final String TEST_SE_TOPIC_3 = "special-se-topic-3";
@@ -128,6 +130,10 @@ static void prepareTopics() {
 
     @AfterAll
     static void afterAll() {
+        List<String> topics = kafkaResource.getKafkaTestUtils().getTopics().stream()
+                .map(TopicListing::name)
+                .collect(Collectors.toList());
+        ADMIN.get().deleteTopics(topics);
         nackHandlerLogLogger.removeHandler(testHandler);
         kafkaResource.stopKafka();
     }
@@ -553,125 +559,113 @@ void someEventsNoAckWithDifferentPartitions() {
     @Test
     void consumeKafkaDLQNackExplicitConf() {
         kafkaResource.getKafkaTestUtils().createTopic(TEST_DQL_TOPIC, 2, (short) 1);
-        try {
-            List<String> result = consumerWithNack(KafkaConnector.configBuilder()
-                            .property("nack-dlq.topic", TEST_DQL_TOPIC)
-                            .bootstrapServers(KAFKA_SERVER)
-                            .groupId("test-group")
-                            .topic(TEST_SE_TOPIC_8)
-                            .autoOffsetReset(KafkaConfigBuilder.AutoOffsetReset.EARLIEST)
-                            .enableAutoCommit(false)
-                            .keyDeserializer(StringDeserializer.class)
-                            .valueDeserializer(StringDeserializer.class)
-                            .build(),
-                    TEST_SE_TOPIC_8,
-                    "10"
-            );
-
-            assertThat(result, containsInAnyOrder("0", "1", "2", "4", "5", "6", "7", "8", "9", "10"));
-
-            List<ConsumerRecord<String, String>> dlqRecords = kafkaResource.consume(TEST_DQL_TOPIC);
-
-            assertThat(dlqRecords.size(), is(1));
-            ConsumerRecord<String, String> consumerRecord = dlqRecords.get(0);
-            Map<String, String> headersMap = Arrays.stream(consumerRecord.headers().toArray())
-                    .collect(Collectors.toMap(Header::key, h -> new String(h.value())));
-            assertThat(consumerRecord.key(), is("3"));
-            assertThat(consumerRecord.value(), is("3"));
-            assertThat(headersMap.get("dlq-error"), is("java.lang.Exception"));
-            assertThat(headersMap.get("dlq-error-msg"), is("BOOM!"));
-            assertThat(headersMap.get("dlq-orig-topic"), is(TEST_SE_TOPIC_8));
assertThat(headersMap.get("dlq-orig-offset"), is("3")); - assertThat(headersMap.get("dlq-orig-partition"), is("0")); - } finally { - ADMIN.get().deleteTopics(List.of(TEST_DQL_TOPIC)); - } + + List result = consumerWithNack(KafkaConnector.configBuilder() + .property("nack-dlq.topic", TEST_DQL_TOPIC) + .bootstrapServers(KAFKA_SERVER) + .groupId("test-group") + .topic(TEST_SE_TOPIC_8) + .autoOffsetReset(KafkaConfigBuilder.AutoOffsetReset.EARLIEST) + .enableAutoCommit(false) + .keyDeserializer(StringDeserializer.class) + .valueDeserializer(StringDeserializer.class) + .build(), + TEST_SE_TOPIC_8, + "10" + ); + + assertThat(result, containsInAnyOrder("0", "1", "2", "4", "5", "6", "7", "8", "9", "10")); + + List> dlqRecords = kafkaResource.consume(TEST_DQL_TOPIC); + + assertThat(dlqRecords.size(), is(1)); + ConsumerRecord consumerRecord = dlqRecords.get(0); + Map headersMap = Arrays.stream(consumerRecord.headers().toArray()) + .collect(Collectors.toMap(Header::key, h -> new String(h.value()))); + assertThat(consumerRecord.key(), is("3")); + assertThat(consumerRecord.value(), is("3")); + assertThat(headersMap.get("dlq-error"), is("java.lang.Exception")); + assertThat(headersMap.get("dlq-error-msg"), is("BOOM!")); + assertThat(headersMap.get("dlq-orig-topic"), is(TEST_SE_TOPIC_8)); + assertThat(headersMap.get("dlq-orig-offset"), is("3")); + assertThat(headersMap.get("dlq-orig-partition"), is("0")); } @Test void consumeKafkaDLQNackDerivedConf() { - kafkaResource.getKafkaTestUtils().createTopic(TEST_DQL_TOPIC, 2, (short) 1); - try { - List result = consumerWithNack(KafkaConnector.configBuilder() - .property("nack-dlq", TEST_DQL_TOPIC) - .bootstrapServers(KAFKA_SERVER) - .groupId("test-group") - .topic(TEST_SE_TOPIC_9) - .autoOffsetReset(KafkaConfigBuilder.AutoOffsetReset.EARLIEST) - .enableAutoCommit(false) - .keyDeserializer(StringDeserializer.class) - .valueDeserializer(StringDeserializer.class) - .build(), - TEST_SE_TOPIC_9, - "10" - ); - - assertThat(result, containsInAnyOrder("0", "1", "2", "4", "5", "6", "7", "8", "9", "10")); - - List> dlqRecords = kafkaResource.consume(TEST_DQL_TOPIC); - - assertThat(dlqRecords.size(), is(1)); - ConsumerRecord consumerRecord = dlqRecords.get(0); - Map headersMap = Arrays.stream(consumerRecord.headers().toArray()) - .collect(Collectors.toMap(Header::key, h -> new String(h.value()))); - assertThat(consumerRecord.key(), is("3")); - assertThat(consumerRecord.value(), is("3")); - assertThat(headersMap.get("dlq-error"), is("java.lang.Exception")); - assertThat(headersMap.get("dlq-error-msg"), is("BOOM!")); - assertThat(headersMap.get("dlq-orig-topic"), is(TEST_SE_TOPIC_9)); - assertThat(headersMap.get("dlq-orig-offset"), is("3")); - assertThat(headersMap.get("dlq-orig-partition"), is("0")); - } finally { - ADMIN.get().deleteTopics(List.of(TEST_DQL_TOPIC)); - } + kafkaResource.getKafkaTestUtils().createTopic(TEST_DQL_TOPIC_1, 2, (short) 1); + + List result = consumerWithNack(KafkaConnector.configBuilder() + .property("nack-dlq", TEST_DQL_TOPIC_1) + .bootstrapServers(KAFKA_SERVER) + .groupId("test-group") + .topic(TEST_SE_TOPIC_9) + .autoOffsetReset(KafkaConfigBuilder.AutoOffsetReset.EARLIEST) + .enableAutoCommit(false) + .keyDeserializer(StringDeserializer.class) + .valueDeserializer(StringDeserializer.class) + .build(), + TEST_SE_TOPIC_9, + "10" + ); + + assertThat(result, containsInAnyOrder("0", "1", "2", "4", "5", "6", "7", "8", "9", "10")); + + List> dlqRecords = kafkaResource.consume(TEST_DQL_TOPIC_1); + + assertThat(dlqRecords.size(), is(1)); + ConsumerRecord 
+        ConsumerRecord<String, String> consumerRecord = dlqRecords.get(0);
+        Map<String, String> headersMap = Arrays.stream(consumerRecord.headers().toArray())
+                .collect(Collectors.toMap(Header::key, h -> new String(h.value())));
+        assertThat(consumerRecord.key(), is("3"));
+        assertThat(consumerRecord.value(), is("3"));
+        assertThat(headersMap.get("dlq-error"), is("java.lang.Exception"));
+        assertThat(headersMap.get("dlq-error-msg"), is("BOOM!"));
+        assertThat(headersMap.get("dlq-orig-topic"), is(TEST_SE_TOPIC_9));
+        assertThat(headersMap.get("dlq-orig-offset"), is("3"));
+        assertThat(headersMap.get("dlq-orig-partition"), is("0"));
     }
 
     @Test
     void consumeKafkaKillChannelNack() {
         var testTopic = "test-topic";
         kafkaResource.getKafkaTestUtils().createTopic(testTopic, 2, (short) 1);
-        try {
-            List<String> result = consumerWithNack(KafkaConnector.configBuilder()
-                            .bootstrapServers(KAFKA_SERVER)
-                            .groupId("test-group")
-                            .topic(testTopic)
-                            .autoOffsetReset(KafkaConfigBuilder.AutoOffsetReset.EARLIEST)
-                            .enableAutoCommit(false)
-                            .keyDeserializer(StringDeserializer.class)
-                            .valueDeserializer(StringDeserializer.class)
-                            .build(),
-                    testTopic,
-                    null // wait for channel being killed
-            );
-            assertThat(result, contains("0", "1", "2"));
-        } finally {
-            ADMIN.get().deleteTopics(List.of(testTopic));
-        }
+
+        List<String> result = consumerWithNack(KafkaConnector.configBuilder()
+                        .bootstrapServers(KAFKA_SERVER)
+                        .groupId("test-group")
+                        .topic(testTopic)
+                        .autoOffsetReset(KafkaConfigBuilder.AutoOffsetReset.EARLIEST)
+                        .enableAutoCommit(false)
+                        .keyDeserializer(StringDeserializer.class)
+                        .valueDeserializer(StringDeserializer.class)
+                        .build(),
+                testTopic,
+                null // wait for channel being killed
+        );
+        assertThat(result, contains("0", "1", "2"));
     }
 
     @Test
     void consumeKafkaLogOnlyNack() {
-        var testTopic = "test-topic";
+        var testTopic = "test-topic-1";
         kafkaResource.getKafkaTestUtils().createTopic(testTopic, 2, (short) 1);
-        try {
-            List<String> result = consumerWithNack(KafkaConnector.configBuilder()
-                            .bootstrapServers(KAFKA_SERVER)
-                            .property("nack-log-only", "true")
-                            .groupId("test-group")
-                            .topic(testTopic)
-                            .autoOffsetReset(KafkaConfigBuilder.AutoOffsetReset.EARLIEST)
-                            .enableAutoCommit(false)
-                            .keyDeserializer(StringDeserializer.class)
-                            .valueDeserializer(StringDeserializer.class)
-                            .build(),
-                    testTopic,
-                    "10"
-            );
-            assertThat(result, containsInAnyOrder("0", "1", "2", "4", "5", "6", "7", "8", "9", "10"));
-            assertThat(logNackHandlerWarnings, hasItem("NACKED Message - ignored key: 3 topic: test-topic offset: 3 partition: 0"));
-        } finally {
-            ADMIN.get().deleteTopics(List.of(testTopic));
-        }
+
+        List<String> result = consumerWithNack(KafkaConnector.configBuilder()
+                        .bootstrapServers(KAFKA_SERVER)
+                        .property("nack-log-only", "true")
+                        .groupId("test-group")
+                        .topic(testTopic)
+                        .autoOffsetReset(KafkaConfigBuilder.AutoOffsetReset.EARLIEST)
+                        .enableAutoCommit(false)
+                        .keyDeserializer(StringDeserializer.class)
+                        .valueDeserializer(StringDeserializer.class)
+                        .build(),
+                testTopic,
+                "10"
+        );
+        assertThat(result, containsInAnyOrder("0", "1", "2", "4", "5", "6", "7", "8", "9", "10"));
+        assertThat(logNackHandlerWarnings, hasItem("NACKED Message - ignored key: 3 topic: test-topic-1 offset: 3 partition: 0"));
     }
 
     List<String> consumerWithNack(Config c, String topic, String lastExpectedValue) {
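
Reviewer note: the core of this change is moving topic deletion out of per-test finally blocks into one bulk pass in afterAll(), which sidesteps the long-standing flakiness of per-topic deletes against a broker running on Windows (open log-segment file handles tend to block deletion there). Below is a minimal, self-contained sketch of that cleanup pattern using the plain Kafka AdminClient; the bootstrap address and class name are illustrative assumptions, since the test itself goes through the project's ADMIN supplier and kafka-junit's KafkaTestUtils rather than this direct client:

    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.TopicListing;

    public final class TopicCleanupSketch {
        public static void main(String[] args) throws Exception {
            // Illustrative broker address; the test suite takes its address
            // from the embedded Kafka resource instead.
            Map<String, Object> conf =
                    Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (AdminClient admin = AdminClient.create(conf)) {
                // List every topic name, then delete them all in one call,
                // mirroring what the new afterAll() does before stopping Kafka.
                List<String> topics = admin.listTopics().listings().get().stream()
                        .map(TopicListing::name)
                        .collect(Collectors.toList());
                admin.deleteTopics(topics).all().get();
            }
        }
    }

A single delete request for all topics also means any deletion latency is paid once per suite rather than once per test.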
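The other half of the fix is giving each test its own topic name (TEST_DQL_TOPIC_1, "test-topic-1") so tests no longer share and re-delete the same topic. A hypothetical JUnit 5 sketch of that pattern, under the same deferred-cleanup assumption; the class, method, and helper names here are made up for illustration (the patch itself uses fixed distinct constants, while a random suffix is one way to generalize the idea):

    import java.util.UUID;

    import org.junit.jupiter.api.AfterAll;
    import org.junit.jupiter.api.Test;

    class UniqueTopicPerTestSketch {

        // Unique names keep tests independent of one another,
        // so no test needs to delete its topic in a finally block.
        private static String uniqueTopic(String prefix) {
            return prefix + "-" + UUID.randomUUID();
        }

        @Test
        void scenarioOne() {
            String topic = uniqueTopic("scenario-one");
            // create the topic, produce, consume, assert; no cleanup here
        }

        @AfterAll
        static void cleanupOnce() {
            // delete every topic in one pass, as in the AdminClient sketch above
        }
    }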