Skip to content

Commit

Permalink
Migrated Data Prepper to use the opensearch-java client for bulk requ…
Browse files Browse the repository at this point in the history
…ests rather than the REST High Level Client. opensearch-project#1347

Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable committed May 12, 2022
1 parent d0ccedd commit d57315b
Show file tree
Hide file tree
Showing 15 changed files with 1,071 additions and 225 deletions.
2 changes: 1 addition & 1 deletion build-resources.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ ext.versionMap = [
junitJupiter : '5.8.2',
mockito : '3.11.2',
opentelemetryProto : '1.7.1-alpha',
opensearchVersion : '1.1.0'
opensearchVersion : '1.3.1'
]

ext.coreProjects = [
Expand Down
4 changes: 3 additions & 1 deletion data-prepper-plugins/opensearch/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ dependencies {
testImplementation project(':data-prepper-api').sourceSets.test.output
implementation project(':data-prepper-plugins:common')
implementation "org.opensearch.client:opensearch-rest-high-level-client:${opensearchVersion}"
implementation "org.opensearch.client:opensearch-rest-client:${opensearchVersion}"
implementation 'org.opensearch.client:opensearch-java:1.0.0'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation 'javax.ws.rs:javax.ws.rs-api:2.1.1'
Expand All @@ -52,10 +54,10 @@ dependencies {
implementation 'software.amazon.awssdk:arns:2.17.15'
implementation 'io.micrometer:micrometer-core:1.8.5'
testImplementation 'org.awaitility:awaitility:4.1.1'
testImplementation "org.opensearch.test:framework:${opensearchVersion}"
testImplementation 'commons-io:commons-io:2.11.0'
testImplementation 'net.bytebuddy:byte-buddy:1.12.8'
testImplementation 'net.bytebuddy:byte-buddy-agent:1.11.20'
testImplementation 'org.slf4j:slf4j-simple:1.7.36'
}

sourceSets {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,29 @@
import com.amazon.dataprepper.metrics.MetricsTestUtil;
import com.amazon.dataprepper.model.configuration.PluginSetting;
import com.amazon.dataprepper.model.event.Event;
import com.amazon.dataprepper.model.event.EventType;
import com.amazon.dataprepper.model.event.JacksonEvent;
import com.amazon.dataprepper.model.record.Record;
import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexConfiguration;
import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexConstants;
import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexManager;
import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexType;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.core.instrument.Measurement;
import org.apache.commons.io.FileUtils;
import org.apache.http.util.EntityUtils;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.RestClient;
Expand All @@ -48,7 +56,9 @@
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.amazon.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.createContentParser;
import static com.amazon.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.createOpenSearchClient;
Expand All @@ -58,7 +68,9 @@
import static com.amazon.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.wipeAllTemplates;
import static org.apache.http.HttpStatus.SC_OK;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.Matchers.closeTo;
Expand All @@ -75,14 +87,14 @@ public class OpenSearchSinkIT {

private RestClient client;

@Before
@BeforeEach
public void metricsInit() throws IOException {
MetricsTestUtil.initMetrics();

client = createOpenSearchClient();
}

@After
@AfterEach
public void cleanOpenSearch() throws Exception {
wipeAllOpenSearchIndices();
wipeAllTemplates();
Expand Down Expand Up @@ -142,23 +154,24 @@ public void testInstantiateSinkRawSpanReservedAliasAlreadyUsedAsIndex() throws I
RuntimeException.class, () -> new OpenSearchSink(pluginSetting));
}

@Test
public void testOutputRawSpanDefault() throws IOException, InterruptedException {
@ParameterizedTest
@ArgumentsSource(MultipleRecordTypeArgumentProvider.class)
public void testOutputRawSpanDefault(Function<String, Record> stringToRecord) throws IOException, InterruptedException {
final String testDoc1 = readDocFromFile(DEFAULT_RAW_SPAN_FILE_1);
final String testDoc2 = readDocFromFile(DEFAULT_RAW_SPAN_FILE_2);
final ObjectMapper mapper = new ObjectMapper();
@SuppressWarnings("unchecked") final Map<String, Object> expData1 = mapper.readValue(testDoc1, Map.class);
@SuppressWarnings("unchecked") final Map<String, Object> expData2 = mapper.readValue(testDoc2, Map.class);

final List<Record<Object>> testRecords = Arrays.asList(new Record<>(testDoc1), new Record<>(testDoc2));
final List<Record<Object>> testRecords = Arrays.asList(stringToRecord.apply(testDoc1), stringToRecord.apply(testDoc2));
final PluginSetting pluginSetting = generatePluginSetting(true, false, null, null);
final OpenSearchSink sink = new OpenSearchSink(pluginSetting);
sink.output(testRecords);

final String expIndexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW);
final List<Map<String, Object>> retSources = getSearchResponseDocSources(expIndexAlias);
MatcherAssert.assertThat(retSources.size(), equalTo(2));
MatcherAssert.assertThat(retSources.containsAll(Arrays.asList(expData1, expData2)), equalTo(true));
MatcherAssert.assertThat(retSources, hasItems(expData1, expData2));
MatcherAssert.assertThat(getDocumentCount(expIndexAlias, "_id", (String) expData1.get("spanId")), equalTo(Integer.valueOf(1)));
sink.shutdown();

Expand Down Expand Up @@ -202,18 +215,19 @@ public void testOutputRawSpanDefault() throws IOException, InterruptedException
.add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString());
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(2188.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(2188.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(2058.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(2058.0, 0));
}

@Test
public void testOutputRawSpanWithDLQ() throws IOException, InterruptedException {
@ParameterizedTest
@ArgumentsSource(MultipleRecordTypeArgumentProvider.class)
public void testOutputRawSpanWithDLQ(Function<String, Record> stringToRecord) throws IOException, InterruptedException {
// TODO: write test case
final String testDoc1 = readDocFromFile("raw-span-error.json");
final String testDoc2 = readDocFromFile(DEFAULT_RAW_SPAN_FILE_1);
final ObjectMapper mapper = new ObjectMapper();
@SuppressWarnings("unchecked") final Map<String, Object> expData = mapper.readValue(testDoc2, Map.class);
final List<Record<Object>> testRecords = Arrays.asList(new Record<>(testDoc1), new Record<>(testDoc2));
final List<Record<Object>> testRecords = Arrays.asList(stringToRecord.apply(testDoc1), stringToRecord.apply(testDoc2));
final PluginSetting pluginSetting = generatePluginSetting(true, false, null, null);
// generate temporary directory for dlq file
final File tempDirectory = Files.createTempDirectory("").toFile();
Expand All @@ -225,9 +239,10 @@ public void testOutputRawSpanWithDLQ() throws IOException, InterruptedException
sink.output(testRecords);
sink.shutdown();

final StringBuilder content = new StringBuilder();
Files.lines(Paths.get(expDLQFile)).forEach(content::append);
MatcherAssert.assertThat(content.toString().contains(testDoc1), equalTo(true));
final StringBuilder dlqContent = new StringBuilder();
Files.lines(Paths.get(expDLQFile)).forEach(dlqContent::append);
final String nonPrettyJsonString = mapper.writeValueAsString(mapper.readValue(testDoc1, JsonNode.class));
MatcherAssert.assertThat(dlqContent.toString(), containsString(nonPrettyJsonString));
final String expIndexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW);
final List<Map<String, Object>> retSources = getSearchResponseDocSources(expIndexAlias);
MatcherAssert.assertThat(retSources.size(), equalTo(1));
Expand Down Expand Up @@ -256,8 +271,8 @@ public void testOutputRawSpanWithDLQ() throws IOException, InterruptedException
.add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString());
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(2181.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(2181.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(2072.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(2072.0, 0));

}

Expand All @@ -280,13 +295,14 @@ public void testInstantiateSinkServiceMapDefault() throws IOException {
}
}

@Test
public void testOutputServiceMapDefault() throws IOException, InterruptedException {
@ParameterizedTest
@ArgumentsSource(MultipleRecordTypeArgumentProvider.class)
public void testOutputServiceMapDefault(Function<String, Record> stringToRecord) throws IOException, InterruptedException {
final String testDoc = readDocFromFile(DEFAULT_SERVICE_MAP_FILE);
final ObjectMapper mapper = new ObjectMapper();
@SuppressWarnings("unchecked") final Map<String, Object> expData = mapper.readValue(testDoc, Map.class);

final List<Record<Object>> testRecords = Collections.singletonList(new Record<>(testDoc));
final List<Record<Object>> testRecords = Collections.singletonList(stringToRecord.apply(testDoc));
final PluginSetting pluginSetting = generatePluginSetting(false, true, null, null);
OpenSearchSink sink = new OpenSearchSink(pluginSetting);
sink.output(testRecords);
Expand All @@ -313,8 +329,8 @@ public void testOutputServiceMapDefault() throws IOException, InterruptedExcepti
.add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString());
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(309.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(309.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(265.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(265.0, 0));

// Check restart for index already exists
sink = new OpenSearchSink(pluginSetting);
Expand Down Expand Up @@ -443,14 +459,15 @@ public void testInstantiateSinkDoesNotOverwriteNewerIndexTemplates() throws IOEx

}

@Test
public void testOutputCustomIndex() throws IOException, InterruptedException {
@ParameterizedTest
@ArgumentsSource(MultipleRecordTypeArgumentProvider.class)
public void testOutputCustomIndex(Function<String, Record> stringToRecord) throws IOException, InterruptedException {
final String testIndexAlias = "test-alias";
final String testTemplateFile = Objects.requireNonNull(
getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile();
final String testIdField = "someId";
final String testId = "foo";
final List<Record<Object>> testRecords = Collections.singletonList(generateCustomRecord(testIdField, testId));
final List<Record<Object>> testRecords = Collections.singletonList(stringToRecord.apply(generateCustomRecordJson(testIdField, testId)));
final PluginSetting pluginSetting = generatePluginSetting(false, false, testIndexAlias, testTemplateFile);
pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField);
final OpenSearchSink sink = new OpenSearchSink(pluginSetting);
Expand Down Expand Up @@ -523,14 +540,12 @@ private PluginSetting generatePluginSettingByMetadata(final Map<String, Object>
return pluginSetting;
}

private Record<Object> generateCustomRecord(final String idField, final String documentId) throws IOException {
return new Record<>(
Strings.toString(
XContentFactory.jsonBuilder()
.startObject()
.field(idField, documentId)
.endObject()
)
private String generateCustomRecordJson(final String idField, final String documentId) throws IOException {
return Strings.toString(
XContentFactory.jsonBuilder()
.startObject()
.field(idField, documentId)
.endObject()
);
}

Expand Down Expand Up @@ -630,4 +645,27 @@ private void wipeAllOpenSearchIndices() throws IOException {
}
});
}

/**
 * Argument source for the parameterized sink tests. Each invocation receives one
 * String-to-Record conversion function, so the same test body runs once with the raw
 * JSON String as the record payload and once with the JSON wrapped in an Event.
 */
static class MultipleRecordTypeArgumentProvider implements ArgumentsProvider {
    @Override
    public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
        final ObjectMapper jsonMapper = new ObjectMapper();

        // String model: the JSON text itself becomes the record data.
        final Function<String, Record> asStringRecord = json -> new Record(json);
        // Event model: the JSON is parsed into a Map and wrapped in a TRACE-typed JacksonEvent.
        final Function<String, Record> asEventRecord = json -> toEventRecord(jsonMapper, json);

        return Stream.of(
                Arguments.of(asStringRecord),
                Arguments.of(asEventRecord)
        );
    }

    /**
     * Parses the given JSON into a Map and wraps it in a Record holding a JacksonEvent.
     *
     * @param jsonMapper mapper used to parse the JSON text
     * @param json the JSON document to convert
     * @return a Record whose data is an Event built from the parsed JSON
     * @throws RuntimeException wrapping the JsonProcessingException if the JSON cannot be processed
     */
    private static Record toEventRecord(final ObjectMapper jsonMapper, final String json) {
        try {
            final Map parsedData = jsonMapper.readValue(json, Map.class);
            return new Record(
                    JacksonEvent.builder()
                            .withEventType(EventType.TRACE.toString())
                            .withData(parsedData)
                            .build());
        } catch (JsonProcessingException e) {
            // Tests cannot proceed with unparseable fixtures; surface the failure unchecked.
            throw new RuntimeException(e);
        }
    }
}
}
Loading

0 comments on commit d57315b

Please sign in to comment.