Skip to content

Commit

Permalink
Adds a configuration to the random string source to configure the wai…
Browse files Browse the repository at this point in the history
…t delay between writes to the buffer. Resolves #3595. Also uses a single thread for this source to avoid an unnecessary thread pool and increases the code coverage. (#3602)

Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable authored Nov 14, 2023
1 parent 4b29bfe commit 88e0bb0
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,69 +6,68 @@
package org.opensearch.dataprepper.plugins.source;

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* Generates a random string every 500 milliseconds. Intended to be used for testing setups
*/
@DataPrepperPlugin(name = "random", pluginType = Source.class)
@DataPrepperPlugin(name = "random", pluginType = Source.class, pluginConfigurationType = RandomStringSourceConfig.class)
public class RandomStringSource implements Source<Record<Event>> {

private static final Logger LOG = LoggerFactory.getLogger(RandomStringSource.class);
private static final int BUFFER_WAIT = 500;
private final long waitTimeInMillis;

private ExecutorService executorService;
private volatile boolean stop = false;
private Thread thread;

private void setExecutorService() {
if(executorService == null || executorService.isShutdown()) {
executorService = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setDaemon(false).setNameFormat("random-source-pool-%d").build()
);
}
@DataPrepperPluginConstructor
public RandomStringSource(final RandomStringSourceConfig config) {
waitTimeInMillis = config.getWaitDelay().toMillis();
}

@Override
public void start(final Buffer<Record<Event>> buffer) {
setExecutorService();
executorService.execute(() -> {
if(thread != null) {
throw new IllegalStateException("This source has already started.");
}

thread = new Thread(() -> {
while (!stop) {
try {
LOG.debug("Writing to buffer");
final Record<Event> record = generateRandomStringEventRecord();
buffer.write(record, 500);
Thread.sleep(500);
buffer.write(record, BUFFER_WAIT);
Thread.sleep(waitTimeInMillis);
} catch (final InterruptedException e) {
break;
} catch (final TimeoutException e) {
// Do nothing
}
}
});
},
"random-source");

thread.setDaemon(false);
thread.start();
}

@Override
public void stop() {
stop = true;
executorService.shutdown();
try {
if (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
executorService.shutdownNow();
}
} catch (final InterruptedException ex) {
executorService.shutdownNow();
thread.join(waitTimeInMillis + BUFFER_WAIT + 100);
} catch (final InterruptedException e) {
thread.interrupt();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.time.Duration;

public class RandomStringSourceConfig {
@JsonProperty("wait_delay")
private Duration waitDelay = Duration.ofMillis(500);

public Duration getWaitDelay() {
return waitDelay;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,34 @@

package org.opensearch.dataprepper.plugins.source;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.buffer.TestBuffer;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

@ExtendWith(MockitoExtension.class)
class RandomStringSourceTests {

private TestBuffer buffer;

@BeforeEach
void setUp() {
final Queue<Record<Event>> bufferQueue = new ConcurrentLinkedQueue<>();
buffer = new TestBuffer(bufferQueue, 1);
}
@Mock
private Buffer<Record<Event>> buffer;

private RandomStringSource createObjectUnderTest() {
return new RandomStringSource();
return new RandomStringSource(new RandomStringSourceConfig());
}

@Test
Expand All @@ -41,25 +42,44 @@ void testPutRecord() {
randomStringSource.start(buffer);
await().atMost(3, TimeUnit.SECONDS)
.pollDelay(200, TimeUnit.MILLISECONDS)
.until(() -> buffer.size() > 0);
assertThat(buffer.size(), greaterThan(0));
.untilAsserted(() -> verify(buffer).write(any(), anyInt()));
}

@Test
void source_continues_to_write_if_a_write_to_buffer_fails() throws TimeoutException {
final RandomStringSource randomStringSource = createObjectUnderTest();

doThrow(TimeoutException.class).when(buffer).write(any(), anyInt());

randomStringSource.start(buffer);
await().atMost(3, TimeUnit.SECONDS)
.pollDelay(200, TimeUnit.MILLISECONDS)
.untilAsserted(() -> verify(buffer, atLeast(2)).write(any(), anyInt()));
}

@Test
void testStop() throws InterruptedException {
void testStop() throws InterruptedException, TimeoutException {
final RandomStringSource randomStringSource = createObjectUnderTest();
//Start source, and sleep for 1000 millis
randomStringSource.start(buffer);
await().atMost(3, TimeUnit.SECONDS)
.pollDelay(200, TimeUnit.MILLISECONDS)
.until(() -> buffer.size() > 0);
.untilAsserted(() -> verify(buffer).write(any(), anyInt()));
//Stop the source, and wait long enough that another message would be sent
//if the source was running
randomStringSource.stop();
Thread.sleep(200); // Ensure the other thread has time to finish writing.
final int sizeAfterCompletion = buffer.size();
verify(buffer, atLeastOnce()).write(any(), anyInt());
Thread.sleep(1000);
assertThat(buffer.size(), equalTo(sizeAfterCompletion));
verifyNoMoreInteractions(buffer);
}

@Test
void multiple_calls_to_start_throws() {
final RandomStringSource objectUnderTest = createObjectUnderTest();

objectUnderTest.start(buffer);

assertThrows(IllegalStateException.class, () -> objectUnderTest.start(buffer));
}
}

0 comments on commit 88e0bb0

Please sign in to comment.