Skip to content

Commit

Permalink
[4.x] Fix KafkaSeTest on Windows (helidon-io#8322)
Browse files Browse the repository at this point in the history
Signed-off-by: tvallin <[email protected]>
  • Loading branch information
tvallin authored and hrstoyanov committed Feb 23, 2024
1 parent 1411736 commit 7457478
Showing 1 changed file with 99 additions and 105 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2023 Oracle and/or its affiliates.
* Copyright (c) 2020, 2024 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -47,6 +47,7 @@
import io.helidon.messaging.connectors.kafka.AbstractSampleBean.Channel6;
import io.helidon.messaging.connectors.kafka.AbstractSampleBean.Channel8;

import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.IntegerDeserializer;
Expand Down Expand Up @@ -74,6 +75,7 @@ public class KafkaSeTest extends AbstractKafkaTest {

private static final Duration TIMEOUT = Duration.of(45, ChronoUnit.SECONDS);
private static final String TEST_DQL_TOPIC = "test-dlq-topic";
private static final String TEST_DQL_TOPIC_1 = "test-dlq-topic-1";
private static final String TEST_SE_TOPIC_1 = "special-se-topic-1";
private static final String TEST_SE_TOPIC_2 = "special-se-topic-2";
private static final String TEST_SE_TOPIC_3 = "special-se-topic-3";
Expand Down Expand Up @@ -128,6 +130,10 @@ static void prepareTopics() {

@AfterAll
static void afterAll() {
List<String> topics = kafkaResource.getKafkaTestUtils().getTopics().stream()
.map(TopicListing::name)
.collect(Collectors.toList());
ADMIN.get().deleteTopics(topics);
nackHandlerLogLogger.removeHandler(testHandler);
kafkaResource.stopKafka();
}
Expand Down Expand Up @@ -553,125 +559,113 @@ void someEventsNoAckWithDifferentPartitions() {
@Test
void consumeKafkaDLQNackExplicitConf() {
kafkaResource.getKafkaTestUtils().createTopic(TEST_DQL_TOPIC, 2, (short) 1);
try {
List<String> result = consumerWithNack(KafkaConnector.configBuilder()
.property("nack-dlq.topic", TEST_DQL_TOPIC)
.bootstrapServers(KAFKA_SERVER)
.groupId("test-group")
.topic(TEST_SE_TOPIC_8)
.autoOffsetReset(KafkaConfigBuilder.AutoOffsetReset.EARLIEST)
.enableAutoCommit(false)
.keyDeserializer(StringDeserializer.class)
.valueDeserializer(StringDeserializer.class)
.build(),
TEST_SE_TOPIC_8,
"10"
);

assertThat(result, containsInAnyOrder("0", "1", "2", "4", "5", "6", "7", "8", "9", "10"));

List<ConsumerRecord<String, String>> dlqRecords = kafkaResource.consume(TEST_DQL_TOPIC);

assertThat(dlqRecords.size(), is(1));
ConsumerRecord<String, String> consumerRecord = dlqRecords.get(0);
Map<String, String> headersMap = Arrays.stream(consumerRecord.headers().toArray())
.collect(Collectors.toMap(Header::key, h -> new String(h.value())));
assertThat(consumerRecord.key(), is("3"));
assertThat(consumerRecord.value(), is("3"));
assertThat(headersMap.get("dlq-error"), is("java.lang.Exception"));
assertThat(headersMap.get("dlq-error-msg"), is("BOOM!"));
assertThat(headersMap.get("dlq-orig-topic"), is(TEST_SE_TOPIC_8));
assertThat(headersMap.get("dlq-orig-offset"), is("3"));
assertThat(headersMap.get("dlq-orig-partition"), is("0"));
} finally {
ADMIN.get().deleteTopics(List.of(TEST_DQL_TOPIC));
}

List<String> result = consumerWithNack(KafkaConnector.configBuilder()
.property("nack-dlq.topic", TEST_DQL_TOPIC)
.bootstrapServers(KAFKA_SERVER)
.groupId("test-group")
.topic(TEST_SE_TOPIC_8)
.autoOffsetReset(KafkaConfigBuilder.AutoOffsetReset.EARLIEST)
.enableAutoCommit(false)
.keyDeserializer(StringDeserializer.class)
.valueDeserializer(StringDeserializer.class)
.build(),
TEST_SE_TOPIC_8,
"10"
);

assertThat(result, containsInAnyOrder("0", "1", "2", "4", "5", "6", "7", "8", "9", "10"));

List<ConsumerRecord<String, String>> dlqRecords = kafkaResource.consume(TEST_DQL_TOPIC);

assertThat(dlqRecords.size(), is(1));
ConsumerRecord<String, String> consumerRecord = dlqRecords.get(0);
Map<String, String> headersMap = Arrays.stream(consumerRecord.headers().toArray())
.collect(Collectors.toMap(Header::key, h -> new String(h.value())));
assertThat(consumerRecord.key(), is("3"));
assertThat(consumerRecord.value(), is("3"));
assertThat(headersMap.get("dlq-error"), is("java.lang.Exception"));
assertThat(headersMap.get("dlq-error-msg"), is("BOOM!"));
assertThat(headersMap.get("dlq-orig-topic"), is(TEST_SE_TOPIC_8));
assertThat(headersMap.get("dlq-orig-offset"), is("3"));
assertThat(headersMap.get("dlq-orig-partition"), is("0"));
}

@Test
void consumeKafkaDLQNackDerivedConf() {
kafkaResource.getKafkaTestUtils().createTopic(TEST_DQL_TOPIC, 2, (short) 1);
try {
List<String> result = consumerWithNack(KafkaConnector.configBuilder()
.property("nack-dlq", TEST_DQL_TOPIC)
.bootstrapServers(KAFKA_SERVER)
.groupId("test-group")
.topic(TEST_SE_TOPIC_9)
.autoOffsetReset(KafkaConfigBuilder.AutoOffsetReset.EARLIEST)
.enableAutoCommit(false)
.keyDeserializer(StringDeserializer.class)
.valueDeserializer(StringDeserializer.class)
.build(),
TEST_SE_TOPIC_9,
"10"
);

assertThat(result, containsInAnyOrder("0", "1", "2", "4", "5", "6", "7", "8", "9", "10"));

List<ConsumerRecord<String, String>> dlqRecords = kafkaResource.consume(TEST_DQL_TOPIC);

assertThat(dlqRecords.size(), is(1));
ConsumerRecord<String, String> consumerRecord = dlqRecords.get(0);
Map<String, String> headersMap = Arrays.stream(consumerRecord.headers().toArray())
.collect(Collectors.toMap(Header::key, h -> new String(h.value())));
assertThat(consumerRecord.key(), is("3"));
assertThat(consumerRecord.value(), is("3"));
assertThat(headersMap.get("dlq-error"), is("java.lang.Exception"));
assertThat(headersMap.get("dlq-error-msg"), is("BOOM!"));
assertThat(headersMap.get("dlq-orig-topic"), is(TEST_SE_TOPIC_9));
assertThat(headersMap.get("dlq-orig-offset"), is("3"));
assertThat(headersMap.get("dlq-orig-partition"), is("0"));
} finally {
ADMIN.get().deleteTopics(List.of(TEST_DQL_TOPIC));
}
kafkaResource.getKafkaTestUtils().createTopic(TEST_DQL_TOPIC_1, 2, (short) 1);

List<String> result = consumerWithNack(KafkaConnector.configBuilder()
.property("nack-dlq", TEST_DQL_TOPIC_1)
.bootstrapServers(KAFKA_SERVER)
.groupId("test-group")
.topic(TEST_SE_TOPIC_9)
.autoOffsetReset(KafkaConfigBuilder.AutoOffsetReset.EARLIEST)
.enableAutoCommit(false)
.keyDeserializer(StringDeserializer.class)
.valueDeserializer(StringDeserializer.class)
.build(),
TEST_SE_TOPIC_9,
"10"
);

assertThat(result, containsInAnyOrder("0", "1", "2", "4", "5", "6", "7", "8", "9", "10"));

List<ConsumerRecord<String, String>> dlqRecords = kafkaResource.consume(TEST_DQL_TOPIC_1);

assertThat(dlqRecords.size(), is(1));
ConsumerRecord<String, String> consumerRecord = dlqRecords.get(0);
Map<String, String> headersMap = Arrays.stream(consumerRecord.headers().toArray())
.collect(Collectors.toMap(Header::key, h -> new String(h.value())));
assertThat(consumerRecord.key(), is("3"));
assertThat(consumerRecord.value(), is("3"));
assertThat(headersMap.get("dlq-error"), is("java.lang.Exception"));
assertThat(headersMap.get("dlq-error-msg"), is("BOOM!"));
assertThat(headersMap.get("dlq-orig-topic"), is(TEST_SE_TOPIC_9));
assertThat(headersMap.get("dlq-orig-offset"), is("3"));
assertThat(headersMap.get("dlq-orig-partition"), is("0"));
}

@Test
void consumeKafkaKillChannelNack() {
var testTopic = "test-topic";
kafkaResource.getKafkaTestUtils().createTopic(testTopic, 2, (short) 1);
try {
List<String> result = consumerWithNack(KafkaConnector.configBuilder()
.bootstrapServers(KAFKA_SERVER)
.groupId("test-group")
.topic(testTopic)
.autoOffsetReset(KafkaConfigBuilder.AutoOffsetReset.EARLIEST)
.enableAutoCommit(false)
.keyDeserializer(StringDeserializer.class)
.valueDeserializer(StringDeserializer.class)
.build(),
testTopic,
null // wait for channel being killed
);
assertThat(result, contains("0", "1", "2"));
} finally {
ADMIN.get().deleteTopics(List.of(testTopic));
}

List<String> result = consumerWithNack(KafkaConnector.configBuilder()
.bootstrapServers(KAFKA_SERVER)
.groupId("test-group")
.topic(testTopic)
.autoOffsetReset(KafkaConfigBuilder.AutoOffsetReset.EARLIEST)
.enableAutoCommit(false)
.keyDeserializer(StringDeserializer.class)
.valueDeserializer(StringDeserializer.class)
.build(),
testTopic,
null // wait for channel being killed
);
assertThat(result, contains("0", "1", "2"));
}

@Test
void consumeKafkaLogOnlyNack() {
var testTopic = "test-topic";
var testTopic = "test-topic-1";
kafkaResource.getKafkaTestUtils().createTopic(testTopic, 2, (short) 1);
try {
List<String> result = consumerWithNack(KafkaConnector.configBuilder()
.bootstrapServers(KAFKA_SERVER)
.property("nack-log-only", "true")
.groupId("test-group")
.topic(testTopic)
.autoOffsetReset(KafkaConfigBuilder.AutoOffsetReset.EARLIEST)
.enableAutoCommit(false)
.keyDeserializer(StringDeserializer.class)
.valueDeserializer(StringDeserializer.class)
.build(),
testTopic,
"10"
);
assertThat(result, containsInAnyOrder("0", "1", "2", "4", "5", "6", "7", "8", "9", "10"));
assertThat(logNackHandlerWarnings, hasItem("NACKED Message - ignored key: 3 topic: test-topic offset: 3 partition: 0"));
} finally {
ADMIN.get().deleteTopics(List.of(testTopic));
}

List<String> result = consumerWithNack(KafkaConnector.configBuilder()
.bootstrapServers(KAFKA_SERVER)
.property("nack-log-only", "true")
.groupId("test-group")
.topic(testTopic)
.autoOffsetReset(KafkaConfigBuilder.AutoOffsetReset.EARLIEST)
.enableAutoCommit(false)
.keyDeserializer(StringDeserializer.class)
.valueDeserializer(StringDeserializer.class)
.build(),
testTopic,
"10"
);
assertThat(result, containsInAnyOrder("0", "1", "2", "4", "5", "6", "7", "8", "9", "10"));
assertThat(logNackHandlerWarnings, hasItem("NACKED Message - ignored key: 3 topic: test-topic-1 offset: 3 partition: 0"));
}

List<String> consumerWithNack(Config c, String topic, String lastExpectedValue) {
Expand Down

0 comments on commit 7457478

Please sign in to comment.