diff --git a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/RandomStringSourceTests.java b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/RandomStringSourceTests.java index c12c52a9bf..f21a1f2d52 100644 --- a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/RandomStringSourceTests.java +++ b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/RandomStringSourceTests.java @@ -5,33 +5,61 @@ package org.opensearch.dataprepper.plugins.source; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.buffer.TestBuffer; -import org.junit.Test; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; +import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; -public class RandomStringSourceTests { +class RandomStringSourceTests { - @Test - public void testPutRecord() throws InterruptedException { - final RandomStringSource randomStringSource = - new RandomStringSource(); + private TestBuffer buffer; + + @BeforeEach + void setUp() { final Queue> bufferQueue = new ConcurrentLinkedQueue<>(); - final TestBuffer buffer = new TestBuffer(bufferQueue, 1); + buffer = new TestBuffer(bufferQueue, 1); + } + + private RandomStringSource createObjectUnderTest() { + return new RandomStringSource(); + } + + @Test + void testPutRecord() { + final RandomStringSource randomStringSource = createObjectUnderTest(); + + randomStringSource.start(buffer); + await().atMost(3, TimeUnit.SECONDS) + .pollDelay(200, TimeUnit.MILLISECONDS) + .until(() -> buffer.size() > 0); + assertThat(buffer.size(), greaterThan(0)); + } + + @Test + void testStop() throws InterruptedException { + final RandomStringSource randomStringSource = createObjectUnderTest(); //Start source, and sleep for 1000 millis randomStringSource.start(buffer); - Thread.sleep(1000); + await().atMost(3, TimeUnit.SECONDS) + .pollDelay(200, TimeUnit.MILLISECONDS) + .until(() -> buffer.size() > 0); //Stop the source, and wait long enough that another message would be sent //if the source was running - assertThat(buffer.size(), greaterThan(0)); - Thread.sleep(1000); randomStringSource.stop(); - assertThat(buffer.size(), greaterThan(0)); + Thread.sleep(200); // Ensure the other thread has time to finish writing. + final int sizeAfterCompletion = buffer.size(); + Thread.sleep(1000); + assertThat(buffer.size(), equalTo(sizeAfterCompletion)); } + }