Skip to content

Commit

Permalink
Use awaitility to read data in KafkaBufferIT to promote stability and…
Browse files Browse the repository at this point in the history
… speed of execution. Resolves opensearch-project#4168

Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable committed Jul 2, 2024
1 parent 5972867 commit aa7910f
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,10 @@
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerProperties;
import org.opensearch.dataprepper.plugins.kafka.util.TestConsumer;
import org.opensearch.dataprepper.plugins.kafka.util.TestProducer;
import org.opensearch.dataprepper.model.codec.JsonDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -64,6 +62,7 @@
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.kafka.buffer.ReadBufferHelper.awaitRead;
import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField;

@ExtendWith(MockitoExtension.class)
Expand Down Expand Up @@ -536,17 +535,6 @@ void read_decrypts_data_from_the_predefined_key() throws IllegalBlockSizeExcepti
}
}

private static Map.Entry<Collection<Record<Event>>, CheckpointState> awaitRead(final KafkaBuffer objectUnderTest) {
final Map.Entry<Collection<Record<Event>>, CheckpointState>[] lastReadResult = new Map.Entry[1];
await()
.atMost(Duration.ofSeconds(30))
.until(() -> {
lastReadResult[0] = objectUnderTest.read(500);
return lastReadResult[0] != null && lastReadResult[0].getKey() != null && lastReadResult[0].getKey().size() >= 1;
});
return lastReadResult[0];
}

private byte[] createRandomBytes() {
final byte[] writtenBytes = new byte[128];
random.nextBytes(writtenBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.kafka.buffer.ReadBufferHelper.awaitRead;

@ExtendWith(MockitoExtension.class)
public class KafkaBuffer_KmsIT {
Expand Down Expand Up @@ -177,7 +178,7 @@ void write_and_read_encrypted() throws TimeoutException {
Record<Event> record = createRecord();
objectUnderTest.write(record, 1_000);

Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = objectUnderTest.read(10_000);
final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = awaitRead(objectUnderTest);

assertThat(readResult, notNullValue());
assertThat(readResult.getKey(), notNullValue());
Expand Down Expand Up @@ -216,7 +217,7 @@ void read_decrypts_data_from_the_predefined_key() throws IllegalBlockSizeExcepti

testProducer.publishRecord(keyData.toByteArray(), bufferedData.toByteArray());

final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = objectUnderTest.read(10_000);
final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = awaitRead(objectUnderTest);

assertThat(readResult, notNullValue());
assertThat(readResult.getKey(), notNullValue());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafka.buffer;

import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;

import java.time.Duration;
import java.util.Collection;
import java.util.Map;

import static org.awaitility.Awaitility.await;

class ReadBufferHelper {
static Map.Entry<Collection<Record<Event>>, CheckpointState> awaitRead(final KafkaBuffer objectUnderTest) {
final Map.Entry<Collection<Record<Event>>, CheckpointState>[] lastReadResult = new Map.Entry[1];
await()
.atMost(Duration.ofSeconds(30))
.until(() -> {
lastReadResult[0] = objectUnderTest.read(500);
return lastReadResult[0] != null && lastReadResult[0].getKey() != null && lastReadResult[0].getKey().size() >= 1;
});
return lastReadResult[0];
}
}

0 comments on commit aa7910f

Please sign in to comment.