From 47028f2e08b1a7997956dc3000e16b0bd5fae08f Mon Sep 17 00:00:00 2001 From: David Venable Date: Tue, 2 Jul 2024 10:18:42 -0500 Subject: [PATCH 1/2] Use awaitility to read data in KafkaBufferIT to promote stability and speed of execution. Resolves #4168 Signed-off-by: David Venable --- .../kafka-plugin-integration-tests.yml | 2 +- .../plugins/kafka/buffer/KafkaBufferIT.java | 23 ++++++--------- .../kafka/buffer/KafkaBuffer_KmsIT.java | 5 ++-- .../kafka/buffer/ReadBufferHelper.java | 29 +++++++++++++++++++ 4 files changed, 42 insertions(+), 17 deletions(-) create mode 100644 data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/ReadBufferHelper.java diff --git a/.github/workflows/kafka-plugin-integration-tests.yml b/.github/workflows/kafka-plugin-integration-tests.yml index 507ca1d435..72d6645d53 100644 --- a/.github/workflows/kafka-plugin-integration-tests.yml +++ b/.github/workflows/kafka-plugin-integration-tests.yml @@ -41,7 +41,7 @@ jobs: run: | echo 'KAFKA_VERSION=${{ matrix.kafka }}' > data-prepper-plugins/kafka-plugins/src/integrationTest/resources/kafka/.env docker compose --project-directory data-prepper-plugins/kafka-plugins/src/integrationTest/resources/kafka/zookeeper --env-file data-prepper-plugins/kafka-plugins/src/integrationTest/resources/kafka/.env up -d - sleep 10 + sleep 2 - name: Wait for Kafka run: | ./gradlew data-prepper-plugins:kafka-plugins:integrationTest -Dtests.kafka.bootstrap_servers=localhost:9092 -Dtests.kafka.authconfig.username=admin -Dtests.kafka.authconfig.password=admin --tests KafkaStartIT diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java index 261008e050..b44355810b 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java @@ -25,12 +25,10 @@ import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; -import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerProperties; import org.opensearch.dataprepper.plugins.kafka.util.TestConsumer; import org.opensearch.dataprepper.plugins.kafka.util.TestProducer; -import org.opensearch.dataprepper.model.codec.JsonDecoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,6 +62,7 @@ import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.kafka.buffer.ReadBufferHelper.awaitRead; import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField; @ExtendWith(MockitoExtension.class) @@ -74,8 +73,6 @@ public class KafkaBufferIT { private KafkaBufferConfig kafkaBufferConfig; @Mock - private PluginFactory pluginFactory; - @Mock private AcknowledgementSetManager acknowledgementSetManager; @Mock private AcknowledgementSet acknowledgementSet; @@ -95,9 +92,7 @@ void setUp() { random = new Random(); acknowledgementSetManager = mock(AcknowledgementSetManager.class); acknowledgementSet = mock(AcknowledgementSet.class); - lenient().doAnswer((a) -> { - return null; - }).when(acknowledgementSet).complete(); + lenient().doAnswer((a) -> null).when(acknowledgementSet).complete(); lenient().when(acknowledgementSetManager.create(any(), any(Duration.class))).thenReturn(acknowledgementSet); objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); @@ -142,7 +137,7 @@ void write_and_read() throws TimeoutException { Record record = createRecord(); objectUnderTest.write(record, 1_000); - Map.Entry>, CheckpointState> readResult = objectUnderTest.read(10_000); + final Map.Entry>, CheckpointState> readResult = awaitRead(objectUnderTest); assertThat(readResult, notNullValue()); assertThat(readResult.getKey(), notNullValue()); @@ -183,7 +178,7 @@ void write_and_read_max_request_test() throws TimeoutException, NoSuchFieldExcep Record record = createLargeRecord(); objectUnderTest.write(record, 1_000); - Map.Entry>, CheckpointState> readResult = objectUnderTest.read(10_000); + Map.Entry>, CheckpointState> readResult = awaitRead(objectUnderTest); assertThat(readResult, notNullValue()); assertThat(readResult.getKey(), notNullValue()); @@ -213,7 +208,7 @@ void writeBigJson_and_read() throws Exception { inputJson += "]"; objectUnderTest.writeBytes(inputJson.getBytes(), null, 1_000); - Map.Entry>, CheckpointState> readResult = objectUnderTest.read(10_000); + final Map.Entry>, CheckpointState> readResult = awaitRead(objectUnderTest); assertThat(readResult, notNullValue()); assertThat(readResult.getKey(), notNullValue()); @@ -243,7 +238,7 @@ void writeMultipleSmallJson_and_read() throws Exception { objectUnderTest.writeBytes(inputJson.getBytes(), null, 1_000); } - Map.Entry>, CheckpointState> readResult = objectUnderTest.read(10_000); + final Map.Entry>, CheckpointState> readResult = awaitRead(objectUnderTest); assertThat(readResult, notNullValue()); assertThat(readResult.getKey(), notNullValue()); @@ -273,7 +268,7 @@ void writeBytes_and_read() throws Exception { final String key = UUID.randomUUID().toString(); objectUnderTest.writeBytes(bytes, key, 1_000); - Map.Entry>, CheckpointState> readResult = objectUnderTest.read(10_000); + final Map.Entry>, CheckpointState> readResult = awaitRead(objectUnderTest); assertThat(readResult, notNullValue()); assertThat(readResult.getKey(), notNullValue()); @@ -398,7 +393,7 @@ void write_and_read_encrypted() throws TimeoutException { Record record = createRecord(); objectUnderTest.write(record, 1_000); - Map.Entry>, CheckpointState> readResult = objectUnderTest.read(10_000); + final Map.Entry>, CheckpointState> readResult = awaitRead(objectUnderTest); assertThat(readResult, notNullValue()); assertThat(readResult.getKey(), notNullValue()); @@ -526,7 +521,7 @@ void read_decrypts_data_from_the_predefined_key() throws IllegalBlockSizeExcepti testProducer.publishRecord(keyData.toByteArray(), bufferedData.toByteArray()); - final Map.Entry>, CheckpointState> readResult = objectUnderTest.read(10_000); + final Map.Entry>, CheckpointState> readResult = awaitRead(objectUnderTest); assertThat(readResult, notNullValue()); assertThat(readResult.getKey(), notNullValue()); diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer_KmsIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer_KmsIT.java index 464d7966e3..8a57765efb 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer_KmsIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer_KmsIT.java @@ -59,6 +59,7 @@ import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.kafka.buffer.ReadBufferHelper.awaitRead; @ExtendWith(MockitoExtension.class) public class KafkaBuffer_KmsIT { @@ -177,7 +178,7 @@ void write_and_read_encrypted() throws TimeoutException { Record record = createRecord(); objectUnderTest.write(record, 1_000); - Map.Entry>, CheckpointState> readResult = objectUnderTest.read(10_000); + final Map.Entry>, CheckpointState> readResult = awaitRead(objectUnderTest); assertThat(readResult, notNullValue()); assertThat(readResult.getKey(), notNullValue()); @@ -216,7 +217,7 @@ void read_decrypts_data_from_the_predefined_key() throws IllegalBlockSizeExcepti testProducer.publishRecord(keyData.toByteArray(), bufferedData.toByteArray()); - final Map.Entry>, CheckpointState> readResult = objectUnderTest.read(10_000); + final Map.Entry>, CheckpointState> readResult = awaitRead(objectUnderTest); assertThat(readResult, notNullValue()); assertThat(readResult.getKey(), notNullValue()); diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/ReadBufferHelper.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/ReadBufferHelper.java new file mode 100644 index 0000000000..7c325a18b6 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/ReadBufferHelper.java @@ -0,0 +1,29 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.kafka.buffer; + +import org.opensearch.dataprepper.model.CheckpointState; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; + +import java.time.Duration; +import java.util.Collection; +import java.util.Map; + +import static org.awaitility.Awaitility.await; + +class ReadBufferHelper { + static Map.Entry>, CheckpointState> awaitRead(final KafkaBuffer objectUnderTest) { + final Map.Entry>, CheckpointState>[] lastReadResult = new Map.Entry[1]; + await() + .atMost(Duration.ofSeconds(30)) + .until(() -> { + lastReadResult[0] = objectUnderTest.read(500); + return lastReadResult[0] != null && lastReadResult[0].getKey() != null && lastReadResult[0].getKey().size() >= 1; + }); + return lastReadResult[0]; + } +} From 7ee3d34fd6411103741026b173d8a1dd693eb812 Mon Sep 17 00:00:00 2001 From: David Venable Date: Tue, 2 Jul 2024 16:57:12 -0500 Subject: [PATCH 2/2] Slightly tighten the assertion for the KafkaStartIT. Signed-off-by: David Venable --- .../org/opensearch/dataprepper/plugins/kafka/KafkaStartIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/KafkaStartIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/KafkaStartIT.java index 1963f8910f..e8d6ce7000 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/KafkaStartIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/KafkaStartIT.java @@ -29,7 +29,7 @@ void waitForKafka() { try (AdminClient adminClient = AdminClient.create(props)) { await().atMost(Duration.ofMinutes(3)) .pollDelay(Duration.ofSeconds(2)) - .untilAsserted(() -> adminClient.listTopics().names().get()); + .until(() -> adminClient.listTopics().names().get() != null); } } }