Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use awaitility to read data in Kafka buffer tests to fix flakiness #4703

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/kafka-plugin-integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ jobs:
run: |
echo 'KAFKA_VERSION=${{ matrix.kafka }}' > data-prepper-plugins/kafka-plugins/src/integrationTest/resources/kafka/.env
docker compose --project-directory data-prepper-plugins/kafka-plugins/src/integrationTest/resources/kafka/zookeeper --env-file data-prepper-plugins/kafka-plugins/src/integrationTest/resources/kafka/.env up -d
sleep 10
sleep 2
- name: Wait for Kafka
run: |
./gradlew data-prepper-plugins:kafka-plugins:integrationTest -Dtests.kafka.bootstrap_servers=localhost:9092 -Dtests.kafka.authconfig.username=admin -Dtests.kafka.authconfig.password=admin --tests KafkaStartIT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ void waitForKafka() {
try (AdminClient adminClient = AdminClient.create(props)) {
await().atMost(Duration.ofMinutes(3))
.pollDelay(Duration.ofSeconds(2))
.untilAsserted(() -> adminClient.listTopics().names().get());
.until(() -> adminClient.listTopics().names().get() != null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,10 @@
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerProperties;
import org.opensearch.dataprepper.plugins.kafka.util.TestConsumer;
import org.opensearch.dataprepper.plugins.kafka.util.TestProducer;
import org.opensearch.dataprepper.model.codec.JsonDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -64,6 +62,7 @@
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.kafka.buffer.ReadBufferHelper.awaitRead;
import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField;

@ExtendWith(MockitoExtension.class)
Expand All @@ -74,8 +73,6 @@ public class KafkaBufferIT {

private KafkaBufferConfig kafkaBufferConfig;
@Mock
private PluginFactory pluginFactory;
@Mock
private AcknowledgementSetManager acknowledgementSetManager;
@Mock
private AcknowledgementSet acknowledgementSet;
Expand All @@ -95,9 +92,7 @@ void setUp() {
random = new Random();
acknowledgementSetManager = mock(AcknowledgementSetManager.class);
acknowledgementSet = mock(AcknowledgementSet.class);
lenient().doAnswer((a) -> {
return null;
}).when(acknowledgementSet).complete();
lenient().doAnswer((a) -> null).when(acknowledgementSet).complete();
lenient().when(acknowledgementSetManager.create(any(), any(Duration.class))).thenReturn(acknowledgementSet);
objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());

Expand Down Expand Up @@ -142,7 +137,7 @@ void write_and_read() throws TimeoutException {
Record<Event> record = createRecord();
objectUnderTest.write(record, 1_000);

Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = objectUnderTest.read(10_000);
final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = awaitRead(objectUnderTest);

assertThat(readResult, notNullValue());
assertThat(readResult.getKey(), notNullValue());
Expand Down Expand Up @@ -183,7 +178,7 @@ void write_and_read_max_request_test() throws TimeoutException, NoSuchFieldExcep
Record<Event> record = createLargeRecord();
objectUnderTest.write(record, 1_000);

Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = objectUnderTest.read(10_000);
Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = awaitRead(objectUnderTest);

assertThat(readResult, notNullValue());
assertThat(readResult.getKey(), notNullValue());
Expand Down Expand Up @@ -213,7 +208,7 @@ void writeBigJson_and_read() throws Exception {
inputJson += "]";
objectUnderTest.writeBytes(inputJson.getBytes(), null, 1_000);

Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = objectUnderTest.read(10_000);
final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = awaitRead(objectUnderTest);

assertThat(readResult, notNullValue());
assertThat(readResult.getKey(), notNullValue());
Expand Down Expand Up @@ -243,7 +238,7 @@ void writeMultipleSmallJson_and_read() throws Exception {
objectUnderTest.writeBytes(inputJson.getBytes(), null, 1_000);
}

Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = objectUnderTest.read(10_000);
final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = awaitRead(objectUnderTest);

assertThat(readResult, notNullValue());
assertThat(readResult.getKey(), notNullValue());
Expand Down Expand Up @@ -273,7 +268,7 @@ void writeBytes_and_read() throws Exception {
final String key = UUID.randomUUID().toString();
objectUnderTest.writeBytes(bytes, key, 1_000);

Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = objectUnderTest.read(10_000);
final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = awaitRead(objectUnderTest);

assertThat(readResult, notNullValue());
assertThat(readResult.getKey(), notNullValue());
Expand Down Expand Up @@ -398,7 +393,7 @@ void write_and_read_encrypted() throws TimeoutException {
Record<Event> record = createRecord();
objectUnderTest.write(record, 1_000);

Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = objectUnderTest.read(10_000);
final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = awaitRead(objectUnderTest);

assertThat(readResult, notNullValue());
assertThat(readResult.getKey(), notNullValue());
Expand Down Expand Up @@ -526,7 +521,7 @@ void read_decrypts_data_from_the_predefined_key() throws IllegalBlockSizeExcepti

testProducer.publishRecord(keyData.toByteArray(), bufferedData.toByteArray());

final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = objectUnderTest.read(10_000);
final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = awaitRead(objectUnderTest);

assertThat(readResult, notNullValue());
assertThat(readResult.getKey(), notNullValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.kafka.buffer.ReadBufferHelper.awaitRead;

@ExtendWith(MockitoExtension.class)
public class KafkaBuffer_KmsIT {
Expand Down Expand Up @@ -177,7 +178,7 @@ void write_and_read_encrypted() throws TimeoutException {
Record<Event> record = createRecord();
objectUnderTest.write(record, 1_000);

Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = objectUnderTest.read(10_000);
final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = awaitRead(objectUnderTest);

assertThat(readResult, notNullValue());
assertThat(readResult.getKey(), notNullValue());
Expand Down Expand Up @@ -216,7 +217,7 @@ void read_decrypts_data_from_the_predefined_key() throws IllegalBlockSizeExcepti

testProducer.publishRecord(keyData.toByteArray(), bufferedData.toByteArray());

final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = objectUnderTest.read(10_000);
final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = awaitRead(objectUnderTest);

assertThat(readResult, notNullValue());
assertThat(readResult.getKey(), notNullValue());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafka.buffer;

import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;

import java.time.Duration;
import java.util.Collection;
import java.util.Map;

import static org.awaitility.Awaitility.await;

class ReadBufferHelper {
static Map.Entry<Collection<Record<Event>>, CheckpointState> awaitRead(final KafkaBuffer objectUnderTest) {
final Map.Entry<Collection<Record<Event>>, CheckpointState>[] lastReadResult = new Map.Entry[1];
await()
.atMost(Duration.ofSeconds(30))
.until(() -> {
lastReadResult[0] = objectUnderTest.read(500);
return lastReadResult[0] != null && lastReadResult[0].getKey() != null && lastReadResult[0].getKey().size() >= 1;
});
return lastReadResult[0];
}
}
Loading