Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes and improvements for AbstractSinkTest #5021

Merged
merged 2 commits into from
Oct 11, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,40 +7,55 @@

import io.micrometer.core.instrument.Measurement;
import io.micrometer.core.instrument.Statistic;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.metrics.MetricNames;
import org.opensearch.dataprepper.metrics.MetricsTestUtil;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.StringJoiner;
import java.util.UUID;
import java.util.function.BiConsumer;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.number.OrderingComparison.greaterThan;
import static org.hamcrest.number.OrderingComparison.greaterThanOrEqualTo;
import static org.hamcrest.number.OrderingComparison.lessThan;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class AbstractSinkTest {
private int count;
@Test
void testMetrics() {
final String sinkName = "testSink";
final String pipelineName = "pipelineName";
private String sinkName;
private String pipelineName;
private PluginSetting pluginSetting;

@BeforeEach
void setUp() {
sinkName = "testSink";
pipelineName = "pipelineName";
MetricsTestUtil.initMetrics();
PluginSetting pluginSetting = new PluginSetting(sinkName, Collections.emptyMap());
pluginSetting = new PluginSetting(sinkName, Collections.emptyMap());
pluginSetting.setPipelineName(pipelineName);
}

@Test
void testMetrics() {
AbstractSink<Record<String>> abstractSink = new AbstractSinkImpl(pluginSetting);
abstractSink.initialize();
assertEquals(abstractSink.isReady(), true);
Expand All @@ -63,31 +78,30 @@ void testMetrics() {
assertEquals(5.0, recordsInMeasurements.get(0).getValue(), 0);
assertEquals(3, elapsedTimeMeasurements.size());
assertEquals(1.0, MetricsTestUtil.getMeasurementFromList(elapsedTimeMeasurements, Statistic.COUNT).getValue(), 0);
assertTrue(MetricsTestUtil.isBetween(
MetricsTestUtil.getMeasurementFromList(elapsedTimeMeasurements, Statistic.TOTAL_TIME).getValue(),
0.2,
0.3));
double totalTime = MetricsTestUtil.getMeasurementFromList(elapsedTimeMeasurements, Statistic.TOTAL_TIME).getValue();
assertAll(
() -> assertThat(totalTime, greaterThan(0.1)),
() -> assertThat(totalTime, lessThan(0.2))
);
assertEquals(abstractSink.getRetryThreadState(), null);
abstractSink.shutdown();
}

@Test
void testSinkNotReady() throws InterruptedException {
final String sinkName = "testSink";
final String pipelineName = "pipelineName";
MetricsTestUtil.initMetrics();
PluginSetting pluginSetting = new PluginSetting(sinkName, Collections.emptyMap());
pluginSetting.setPipelineName(pipelineName);
AbstractSinkNotReadyImpl abstractSink = new AbstractSinkNotReadyImpl(pluginSetting);
abstractSink.initialize();
assertEquals(abstractSink.isReady(), false);
assertEquals(abstractSink.getRetryThreadState(), Thread.State.RUNNABLE);
// Do another intialize to make sure the sink is still not ready
abstractSink.initialize();
assertEquals(abstractSink.isReady(), false);
await().atMost(Duration.ofSeconds(5))
.until(abstractSink::isReady);
assertEquals(abstractSink.getRetryThreadState(), Thread.State.TERMINATED);
assertThat(abstractSink.isReady(), equalTo(false));
assertThat(abstractSink.getRetryThreadState(), anyOf(equalTo(Thread.State.RUNNABLE), equalTo(Thread.State.TIMED_WAITING)));

await().atMost(Duration.ofSeconds(1))
.untilAsserted(() -> assertThat(abstractSink.initCount, greaterThanOrEqualTo(1)));

abstractSink.initialized = true;
assertThat(abstractSink.isReady(), equalTo(true));
await().atMost(Duration.ofSeconds(1))
.untilAsserted(() -> assertThat(abstractSink.getRetryThreadState(), equalTo(Thread.State.TERMINATED)));
assertThat(abstractSink.getRetryThreadState(), equalTo(Thread.State.TERMINATED));
int initCountBeforeShutdown = abstractSink.initCount;
abstractSink.shutdown();
Thread.sleep(200);
Expand All @@ -96,33 +110,42 @@ void testSinkNotReady() throws InterruptedException {

@Test
void testSinkWithRegisterEventReleaseHandler() {
final String sinkName = "testSink";
final String pipelineName = "pipelineName";
MetricsTestUtil.initMetrics();
PluginSetting pluginSetting = new PluginSetting(sinkName, Collections.emptyMap());
pluginSetting.setPipelineName(pipelineName);
AbstractSink<Record<Event>> abstractSink = new AbstractEventSinkImpl(pluginSetting);
final AbstractSink<Record<Event>> abstractSink = new AbstractEventSinkImpl(pluginSetting);
abstractSink.initialize();
assertEquals(abstractSink.isReady(), true);
count = 0;
Event event = JacksonEvent.builder()
.withEventType("event")
.build();
Record record = mock(Record.class);
EventHandle eventHandle = mock(EventHandle.class);

final Event event = mock(Event.class);
final EventHandle eventHandle = mock(EventHandle.class);
final int minimumLatencySeconds = 20;
final Instant startTime = Instant.now().minus(Duration.ofSeconds(minimumLatencySeconds));
when(eventHandle.getInternalOriginationTime()).thenReturn(startTime);
when(event.getEventHandle()).thenReturn(eventHandle);
doAnswer(a -> {
final BiConsumer<EventHandle, Boolean> onReleaseHandler = a.getArgument(0, BiConsumer.class);
onReleaseHandler.accept(eventHandle, true);
return a;
}).when(eventHandle).onRelease(any());
final Record record = mock(Record.class);
when(record.getData()).thenReturn(event);

abstractSink.updateLatencyMetrics(Arrays.asList(record));
abstractSink.output(Arrays.asList(record));
await().atMost(Duration.ofSeconds(5))
.until(abstractSink::isReady);
abstractSink.shutdown();

final List<Measurement> latencyMeasurements = MetricsTestUtil.getMeasurementList(
new StringJoiner(MetricNames.DELIMITER).add(pipelineName).add(sinkName).add(SinkLatencyMetrics.INTERNAL_LATENCY).toString());
assertThat(latencyMeasurements.size(), greaterThanOrEqualTo(1));
double totalTime = MetricsTestUtil.getMeasurementFromList(latencyMeasurements, Statistic.TOTAL_TIME).getValue();
assertAll(
() -> assertThat(totalTime, greaterThan((double)minimumLatencySeconds)),
() -> assertThat(totalTime, lessThan( minimumLatencySeconds + 1.0))
);
}

private static class AbstractEventSinkImpl extends AbstractSink<Record<Event>> {

AbstractEventSinkImpl(PluginSetting pluginSetting) {
super(pluginSetting, 10, 1000);
super(pluginSetting);
}

@Override
Expand Down Expand Up @@ -152,13 +175,13 @@ public boolean isReady() {
private static class AbstractSinkImpl extends AbstractSink<Record<String>> {

AbstractSinkImpl(PluginSetting pluginSetting) {
super(pluginSetting, 10, 1000);
super(pluginSetting, 10, 10);
}

@Override
public void doOutput(Collection<Record<String>> records) {
try {
Thread.sleep(200);
Thread.sleep(150);
} catch (InterruptedException e) {

}
Expand All @@ -184,18 +207,13 @@ private static class AbstractSinkNotReadyImpl extends AbstractSink<Record<String
boolean initialized;
int initCount;
AbstractSinkNotReadyImpl(PluginSetting pluginSetting) {
super(pluginSetting);
super(pluginSetting, 100, 20);
initialized = false;
initCount = 0;
}

@Override
public void doOutput(Collection<Record<String>> records) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {

}
}

@Override
Expand Down
Loading