Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrated Data Prepper to use the opensearch-java client for bulk requests #1381

Merged
merged 1 commit into from
May 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build-resources.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ ext.versionMap = [
junitJupiter : '5.8.2',
mockito : '3.11.2',
opentelemetryProto : '1.7.1-alpha',
opensearchVersion : '1.1.0'
opensearchVersion : '1.3.1'
]

ext.coreProjects = [
Expand Down
4 changes: 3 additions & 1 deletion data-prepper-plugins/opensearch/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ dependencies {
testImplementation project(':data-prepper-api').sourceSets.test.output
implementation project(':data-prepper-plugins:common')
implementation "org.opensearch.client:opensearch-rest-high-level-client:${opensearchVersion}"
implementation "org.opensearch.client:opensearch-rest-client:${opensearchVersion}"
implementation 'org.opensearch.client:opensearch-java:1.0.0'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation 'javax.ws.rs:javax.ws.rs-api:2.1.1'
Expand All @@ -52,10 +54,10 @@ dependencies {
implementation 'software.amazon.awssdk:arns:2.17.15'
implementation 'io.micrometer:micrometer-core:1.8.5'
testImplementation 'org.awaitility:awaitility:4.1.1'
testImplementation "org.opensearch.test:framework:${opensearchVersion}"
testImplementation 'commons-io:commons-io:2.11.0'
testImplementation 'net.bytebuddy:byte-buddy:1.12.8'
testImplementation 'net.bytebuddy:byte-buddy-agent:1.11.20'
testImplementation 'org.slf4j:slf4j-simple:1.7.36'
}

sourceSets {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,29 @@
import com.amazon.dataprepper.metrics.MetricsTestUtil;
import com.amazon.dataprepper.model.configuration.PluginSetting;
import com.amazon.dataprepper.model.event.Event;
import com.amazon.dataprepper.model.event.EventType;
import com.amazon.dataprepper.model.event.JacksonEvent;
import com.amazon.dataprepper.model.record.Record;
import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexConfiguration;
import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexConstants;
import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexManager;
import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexType;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.core.instrument.Measurement;
import org.apache.commons.io.FileUtils;
import org.apache.http.util.EntityUtils;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that we are using JUnit 5, this could be replaced. That could be done in a follow-up PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree. This change is already significant, so I wanted to minimize the number of unnecessary changes. I noticed this while making the change, but didn't want to add even more changes.

import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.RestClient;
Expand All @@ -48,7 +56,9 @@
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.amazon.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.createContentParser;
import static com.amazon.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.createOpenSearchClient;
Expand All @@ -58,7 +68,9 @@
import static com.amazon.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.wipeAllTemplates;
import static org.apache.http.HttpStatus.SC_OK;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.Matchers.closeTo;
Expand All @@ -75,14 +87,14 @@ public class OpenSearchSinkIT {

private RestClient client;

/**
 * Per-test setup: initializes the test metrics through
 * {@code MetricsTestUtil.initMetrics()} and assigns the {@code client} field
 * from {@code createOpenSearchClient()}.
 *
 * @throws IOException declared by this method; NOTE(review): presumably raised
 *                     while creating the client — confirm against the helper
 */
@Before
@BeforeEach
public void metricsInit() throws IOException {
MetricsTestUtil.initMetrics();

// Assign the RestClient field of this test class.
client = createOpenSearchClient();
}

@After
@AfterEach
public void cleanOpenSearch() throws Exception {
wipeAllOpenSearchIndices();
wipeAllTemplates();
Expand Down Expand Up @@ -142,23 +154,24 @@ public void testInstantiateSinkRawSpanReservedAliasAlreadyUsedAsIndex() throws I
RuntimeException.class, () -> new OpenSearchSink(pluginSetting));
}

@Test
public void testOutputRawSpanDefault() throws IOException, InterruptedException {
@ParameterizedTest
@ArgumentsSource(MultipleRecordTypeArgumentProvider.class)
public void testOutputRawSpanDefault(Function<String, Record> stringToRecord) throws IOException, InterruptedException {
final String testDoc1 = readDocFromFile(DEFAULT_RAW_SPAN_FILE_1);
final String testDoc2 = readDocFromFile(DEFAULT_RAW_SPAN_FILE_2);
final ObjectMapper mapper = new ObjectMapper();
@SuppressWarnings("unchecked") final Map<String, Object> expData1 = mapper.readValue(testDoc1, Map.class);
@SuppressWarnings("unchecked") final Map<String, Object> expData2 = mapper.readValue(testDoc2, Map.class);

final List<Record<Object>> testRecords = Arrays.asList(new Record<>(testDoc1), new Record<>(testDoc2));
final List<Record<Object>> testRecords = Arrays.asList(stringToRecord.apply(testDoc1), stringToRecord.apply(testDoc2));
final PluginSetting pluginSetting = generatePluginSetting(true, false, null, null);
final OpenSearchSink sink = new OpenSearchSink(pluginSetting);
sink.output(testRecords);

final String expIndexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW);
final List<Map<String, Object>> retSources = getSearchResponseDocSources(expIndexAlias);
MatcherAssert.assertThat(retSources.size(), equalTo(2));
MatcherAssert.assertThat(retSources.containsAll(Arrays.asList(expData1, expData2)), equalTo(true));
MatcherAssert.assertThat(retSources, hasItems(expData1, expData2));
MatcherAssert.assertThat(getDocumentCount(expIndexAlias, "_id", (String) expData1.get("spanId")), equalTo(Integer.valueOf(1)));
sink.shutdown();

Expand Down Expand Up @@ -202,18 +215,19 @@ public void testOutputRawSpanDefault() throws IOException, InterruptedException
.add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString());
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(2188.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(2188.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(2058.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(2058.0, 0));
}

@Test
public void testOutputRawSpanWithDLQ() throws IOException, InterruptedException {
@ParameterizedTest
@ArgumentsSource(MultipleRecordTypeArgumentProvider.class)
public void testOutputRawSpanWithDLQ(Function<String, Record> stringToRecord) throws IOException, InterruptedException {
// TODO: write test case
final String testDoc1 = readDocFromFile("raw-span-error.json");
final String testDoc2 = readDocFromFile(DEFAULT_RAW_SPAN_FILE_1);
final ObjectMapper mapper = new ObjectMapper();
@SuppressWarnings("unchecked") final Map<String, Object> expData = mapper.readValue(testDoc2, Map.class);
final List<Record<Object>> testRecords = Arrays.asList(new Record<>(testDoc1), new Record<>(testDoc2));
final List<Record<Object>> testRecords = Arrays.asList(stringToRecord.apply(testDoc1), stringToRecord.apply(testDoc2));
final PluginSetting pluginSetting = generatePluginSetting(true, false, null, null);
// generate temporary directory for dlq file
final File tempDirectory = Files.createTempDirectory("").toFile();
Expand All @@ -225,9 +239,10 @@ public void testOutputRawSpanWithDLQ() throws IOException, InterruptedException
sink.output(testRecords);
sink.shutdown();

final StringBuilder content = new StringBuilder();
Files.lines(Paths.get(expDLQFile)).forEach(content::append);
MatcherAssert.assertThat(content.toString().contains(testDoc1), equalTo(true));
final StringBuilder dlqContent = new StringBuilder();
Files.lines(Paths.get(expDLQFile)).forEach(dlqContent::append);
final String nonPrettyJsonString = mapper.writeValueAsString(mapper.readValue(testDoc1, JsonNode.class));
MatcherAssert.assertThat(dlqContent.toString(), containsString(nonPrettyJsonString));
final String expIndexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW);
final List<Map<String, Object>> retSources = getSearchResponseDocSources(expIndexAlias);
MatcherAssert.assertThat(retSources.size(), equalTo(1));
Expand Down Expand Up @@ -256,8 +271,8 @@ public void testOutputRawSpanWithDLQ() throws IOException, InterruptedException
.add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString());
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(2181.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(2181.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(2072.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(2072.0, 0));

}

Expand All @@ -280,13 +295,14 @@ public void testInstantiateSinkServiceMapDefault() throws IOException {
}
}

@Test
public void testOutputServiceMapDefault() throws IOException, InterruptedException {
@ParameterizedTest
@ArgumentsSource(MultipleRecordTypeArgumentProvider.class)
public void testOutputServiceMapDefault(Function<String, Record> stringToRecord) throws IOException, InterruptedException {
final String testDoc = readDocFromFile(DEFAULT_SERVICE_MAP_FILE);
final ObjectMapper mapper = new ObjectMapper();
@SuppressWarnings("unchecked") final Map<String, Object> expData = mapper.readValue(testDoc, Map.class);

final List<Record<Object>> testRecords = Collections.singletonList(new Record<>(testDoc));
final List<Record<Object>> testRecords = Collections.singletonList(stringToRecord.apply(testDoc));
final PluginSetting pluginSetting = generatePluginSetting(false, true, null, null);
OpenSearchSink sink = new OpenSearchSink(pluginSetting);
sink.output(testRecords);
Expand All @@ -313,8 +329,8 @@ public void testOutputServiceMapDefault() throws IOException, InterruptedExcepti
.add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString());
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(309.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(309.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(265.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(265.0, 0));

// Check restart for index already exists
sink = new OpenSearchSink(pluginSetting);
Expand Down Expand Up @@ -443,14 +459,15 @@ public void testInstantiateSinkDoesNotOverwriteNewerIndexTemplates() throws IOEx

}

@Test
public void testOutputCustomIndex() throws IOException, InterruptedException {
@ParameterizedTest
@ArgumentsSource(MultipleRecordTypeArgumentProvider.class)
public void testOutputCustomIndex(Function<String, Record> stringToRecord) throws IOException, InterruptedException {
final String testIndexAlias = "test-alias";
final String testTemplateFile = Objects.requireNonNull(
getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile();
final String testIdField = "someId";
final String testId = "foo";
final List<Record<Object>> testRecords = Collections.singletonList(generateCustomRecord(testIdField, testId));
final List<Record<Object>> testRecords = Collections.singletonList(stringToRecord.apply(generateCustomRecordJson(testIdField, testId)));
final PluginSetting pluginSetting = generatePluginSetting(false, false, testIndexAlias, testTemplateFile);
pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField);
final OpenSearchSink sink = new OpenSearchSink(pluginSetting);
Expand Down Expand Up @@ -523,14 +540,12 @@ private PluginSetting generatePluginSettingByMetadata(final Map<String, Object>
return pluginSetting;
}

private Record<Object> generateCustomRecord(final String idField, final String documentId) throws IOException {
return new Record<>(
Strings.toString(
XContentFactory.jsonBuilder()
.startObject()
.field(idField, documentId)
.endObject()
)
private String generateCustomRecordJson(final String idField, final String documentId) throws IOException {
return Strings.toString(
XContentFactory.jsonBuilder()
.startObject()
.field(idField, documentId)
.endObject()
);
}

Expand Down Expand Up @@ -630,4 +645,27 @@ private void wipeAllOpenSearchIndices() throws IOException {
}
});
}

/**
* Provides a function for mapping a String to a Record to allow the tests to run
* against both String and Event models.
*/
static class MultipleRecordTypeArgumentProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
final ObjectMapper objectMapper = new ObjectMapper();
Function<String, Record> stringModel = jsonString -> new Record(jsonString);
Function<String, Record> eventModel = jsonString -> {
try {
return new Record(JacksonEvent.builder().withEventType(EventType.TRACE.toString()).withData(objectMapper.readValue(jsonString, Map.class)).build());
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
};
return Stream.of(
Arguments.of(stringModel),
Arguments.of(eventModel)
);
}
}
}
Loading