diff --git a/build-resources.gradle b/build-resources.gradle index 63d9842560..628de1815d 100644 --- a/build-resources.gradle +++ b/build-resources.gradle @@ -8,7 +8,7 @@ ext.versionMap = [ junitJupiter : '5.8.2', mockito : '3.11.2', opentelemetryProto : '1.7.1-alpha', - opensearchVersion : '1.1.0' + opensearchVersion : '1.3.1' ] ext.coreProjects = [ diff --git a/data-prepper-plugins/opensearch/build.gradle b/data-prepper-plugins/opensearch/build.gradle index 0636420493..95f5d14043 100644 --- a/data-prepper-plugins/opensearch/build.gradle +++ b/data-prepper-plugins/opensearch/build.gradle @@ -38,6 +38,8 @@ dependencies { testImplementation project(':data-prepper-api').sourceSets.test.output implementation project(':data-prepper-plugins:common') implementation "org.opensearch.client:opensearch-rest-high-level-client:${opensearchVersion}" + implementation "org.opensearch.client:opensearch-rest-client:${opensearchVersion}" + implementation 'org.opensearch.client:opensearch-java:1.0.0' implementation 'com.fasterxml.jackson.core:jackson-databind' implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' implementation 'javax.ws.rs:javax.ws.rs-api:2.1.1' @@ -52,10 +54,10 @@ dependencies { implementation 'software.amazon.awssdk:arns:2.17.15' implementation 'io.micrometer:micrometer-core:1.8.5' testImplementation 'org.awaitility:awaitility:4.1.1' - testImplementation "org.opensearch.test:framework:${opensearchVersion}" testImplementation 'commons-io:commons-io:2.11.0' testImplementation 'net.bytebuddy:byte-buddy:1.12.8' testImplementation 'net.bytebuddy:byte-buddy-agent:1.11.20' + testImplementation 'org.slf4j:slf4j-simple:1.7.36' } sourceSets { diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/com/amazon/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/com/amazon/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java index 6067ac88f1..16569f4cd2 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/com/amazon/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/com/amazon/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java @@ -9,21 +9,29 @@ import com.amazon.dataprepper.metrics.MetricsTestUtil; import com.amazon.dataprepper.model.configuration.PluginSetting; import com.amazon.dataprepper.model.event.Event; +import com.amazon.dataprepper.model.event.EventType; import com.amazon.dataprepper.model.event.JacksonEvent; import com.amazon.dataprepper.model.record.Record; import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexConfiguration; import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexConstants; import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexManager; import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexType; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import io.micrometer.core.instrument.Measurement; import org.apache.commons.io.FileUtils; import org.apache.http.util.EntityUtils; import org.hamcrest.MatcherAssert; -import org.junit.After; import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; import org.opensearch.client.Request; import org.opensearch.client.Response; import org.opensearch.client.RestClient; @@ -48,7 +56,9 @@ import java.util.Set; import java.util.StringJoiner; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import static com.amazon.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.createContentParser; import static com.amazon.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.createOpenSearchClient; @@ -58,7 +68,9 @@ import static com.amazon.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.wipeAllTemplates; import static org.apache.http.HttpStatus.SC_OK; import static org.awaitility.Awaitility.await; +import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.hasItems; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.Matchers.closeTo; @@ -75,14 +87,14 @@ public class OpenSearchSinkIT { private RestClient client; - @Before + @BeforeEach public void metricsInit() throws IOException { MetricsTestUtil.initMetrics(); client = createOpenSearchClient(); } - @After + @AfterEach public void cleanOpenSearch() throws Exception { wipeAllOpenSearchIndices(); wipeAllTemplates(); @@ -142,15 +154,16 @@ public void testInstantiateSinkRawSpanReservedAliasAlreadyUsedAsIndex() throws I RuntimeException.class, () -> new OpenSearchSink(pluginSetting)); } - @Test - public void testOutputRawSpanDefault() throws IOException, InterruptedException { + @ParameterizedTest + @ArgumentsSource(MultipleRecordTypeArgumentProvider.class) + public void testOutputRawSpanDefault(Function stringToRecord) throws IOException, InterruptedException { final String testDoc1 = readDocFromFile(DEFAULT_RAW_SPAN_FILE_1); final String testDoc2 = readDocFromFile(DEFAULT_RAW_SPAN_FILE_2); final ObjectMapper mapper = new ObjectMapper(); @SuppressWarnings("unchecked") final Map expData1 = mapper.readValue(testDoc1, Map.class); @SuppressWarnings("unchecked") final Map expData2 = mapper.readValue(testDoc2, Map.class); - final List> testRecords = Arrays.asList(new Record<>(testDoc1), new Record<>(testDoc2)); + final List> testRecords = Arrays.asList(stringToRecord.apply(testDoc1), stringToRecord.apply(testDoc2)); final PluginSetting pluginSetting = generatePluginSetting(true, false, null, null); final OpenSearchSink sink = new OpenSearchSink(pluginSetting); sink.output(testRecords); @@ -158,7 +171,7 @@ public void testOutputRawSpanDefault() throws IOException, InterruptedException final String expIndexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW); final List> retSources = getSearchResponseDocSources(expIndexAlias); MatcherAssert.assertThat(retSources.size(), equalTo(2)); - MatcherAssert.assertThat(retSources.containsAll(Arrays.asList(expData1, expData2)), equalTo(true)); + MatcherAssert.assertThat(retSources, hasItems(expData1, expData2)); MatcherAssert.assertThat(getDocumentCount(expIndexAlias, "_id", (String) expData1.get("spanId")), equalTo(Integer.valueOf(1))); sink.shutdown(); @@ -202,18 +215,19 @@ public void testOutputRawSpanDefault() throws IOException, InterruptedException .add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString()); MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3)); MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0)); - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(2188.0, 0)); - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(2188.0, 0)); + MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(2058.0, 0)); + MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(2058.0, 0)); } - @Test - public void testOutputRawSpanWithDLQ() throws IOException, InterruptedException { + @ParameterizedTest + @ArgumentsSource(MultipleRecordTypeArgumentProvider.class) + public void testOutputRawSpanWithDLQ(Function stringToRecord) throws IOException, InterruptedException { // TODO: write test case final String testDoc1 = readDocFromFile("raw-span-error.json"); final String testDoc2 = readDocFromFile(DEFAULT_RAW_SPAN_FILE_1); final ObjectMapper mapper = new ObjectMapper(); @SuppressWarnings("unchecked") final Map expData = mapper.readValue(testDoc2, Map.class); - final List> testRecords = Arrays.asList(new Record<>(testDoc1), new Record<>(testDoc2)); + final List> testRecords = Arrays.asList(stringToRecord.apply(testDoc1), stringToRecord.apply(testDoc2)); final PluginSetting pluginSetting = generatePluginSetting(true, false, null, null); // generate temporary directory for dlq file final File tempDirectory = Files.createTempDirectory("").toFile(); @@ -225,9 +239,10 @@ public void testOutputRawSpanWithDLQ() throws IOException, InterruptedException sink.output(testRecords); sink.shutdown(); - final StringBuilder content = new StringBuilder(); - Files.lines(Paths.get(expDLQFile)).forEach(content::append); - MatcherAssert.assertThat(content.toString().contains(testDoc1), equalTo(true)); + final StringBuilder dlqContent = new StringBuilder(); + Files.lines(Paths.get(expDLQFile)).forEach(dlqContent::append); + final String nonPrettyJsonString = mapper.writeValueAsString(mapper.readValue(testDoc1, JsonNode.class)); + MatcherAssert.assertThat(dlqContent.toString(), containsString(nonPrettyJsonString)); final String expIndexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW); final List> retSources = getSearchResponseDocSources(expIndexAlias); MatcherAssert.assertThat(retSources.size(), equalTo(1)); @@ -256,8 +271,8 @@ public void testOutputRawSpanWithDLQ() throws IOException, InterruptedException .add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString()); MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3)); MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0)); - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(2181.0, 0)); - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(2181.0, 0)); + MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(2072.0, 0)); + MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(2072.0, 0)); } @@ -280,13 +295,14 @@ public void testInstantiateSinkServiceMapDefault() throws IOException { } } - @Test - public void testOutputServiceMapDefault() throws IOException, InterruptedException { + @ParameterizedTest + @ArgumentsSource(MultipleRecordTypeArgumentProvider.class) + public void testOutputServiceMapDefault(Function stringToRecord) throws IOException, InterruptedException { final String testDoc = readDocFromFile(DEFAULT_SERVICE_MAP_FILE); final ObjectMapper mapper = new ObjectMapper(); @SuppressWarnings("unchecked") final Map expData = mapper.readValue(testDoc, Map.class); - final List> testRecords = Collections.singletonList(new Record<>(testDoc)); + final List> testRecords = Collections.singletonList(stringToRecord.apply(testDoc)); final PluginSetting pluginSetting = generatePluginSetting(false, true, null, null); OpenSearchSink sink = new OpenSearchSink(pluginSetting); sink.output(testRecords); @@ -313,8 +329,8 @@ public void testOutputServiceMapDefault() throws IOException, InterruptedExcepti .add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString()); MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3)); MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0)); - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(309.0, 0)); - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(309.0, 0)); + MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(265.0, 0)); + MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(265.0, 0)); // Check restart for index already exists sink = new OpenSearchSink(pluginSetting); @@ -443,14 +459,15 @@ public void testInstantiateSinkDoesNotOverwriteNewerIndexTemplates() throws IOEx } - @Test - public void testOutputCustomIndex() throws IOException, InterruptedException { + @ParameterizedTest + @ArgumentsSource(MultipleRecordTypeArgumentProvider.class) + public void testOutputCustomIndex(Function stringToRecord) throws IOException, InterruptedException { final String testIndexAlias = "test-alias"; final String testTemplateFile = Objects.requireNonNull( getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile(); final String testIdField = "someId"; final String testId = "foo"; - final List> testRecords = Collections.singletonList(generateCustomRecord(testIdField, testId)); + final List> testRecords = Collections.singletonList(stringToRecord.apply(generateCustomRecordJson(testIdField, testId))); final PluginSetting pluginSetting = generatePluginSetting(false, false, testIndexAlias, testTemplateFile); pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField); final OpenSearchSink sink = new OpenSearchSink(pluginSetting); @@ -523,14 +540,12 @@ private PluginSetting generatePluginSettingByMetadata(final Map return pluginSetting; } - private Record generateCustomRecord(final String idField, final String documentId) throws IOException { - return new Record<>( - Strings.toString( - XContentFactory.jsonBuilder() - .startObject() - .field(idField, documentId) - .endObject() - ) + private String generateCustomRecordJson(final String idField, final String documentId) throws IOException { + return Strings.toString( + XContentFactory.jsonBuilder() + .startObject() + .field(idField, documentId) + .endObject() ); } @@ -630,4 +645,27 @@ private void wipeAllOpenSearchIndices() throws IOException { } }); } + + /** + * Provides a function for mapping a String to a Record to allow the tests to run + * against both String and Event models. + */ + static class MultipleRecordTypeArgumentProvider implements ArgumentsProvider { + @Override + public Stream provideArguments(final ExtensionContext context) { + final ObjectMapper objectMapper = new ObjectMapper(); + Function stringModel = jsonString -> new Record(jsonString); + Function eventModel = jsonString -> { + try { + return new Record(JacksonEvent.builder().withEventType(EventType.TRACE.toString()).withData(objectMapper.readValue(jsonString, Map.class)).build()); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + }; + return Stream.of( + Arguments.of(stringModel), + Arguments.of(eventModel) + ); + } + } } diff --git a/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java b/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java index b07392e0fd..fa87db7a0a 100644 --- a/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java +++ b/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java @@ -6,13 +6,15 @@ package com.amazon.dataprepper.plugins.sink.opensearch; import com.amazon.dataprepper.metrics.PluginMetrics; +import com.amazon.dataprepper.plugins.sink.opensearch.bulk.AccumulatingBulkRequest; import io.micrometer.core.instrument.Counter; import org.opensearch.OpenSearchException; -import org.opensearch.action.DocWriteRequest; import org.opensearch.action.bulk.BackoffPolicy; -import org.opensearch.action.bulk.BulkItemResponse; -import org.opensearch.action.bulk.BulkRequest; -import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.client.opensearch._types.ErrorCause; +import org.opensearch.client.opensearch.core.BulkRequest; +import org.opensearch.client.opensearch.core.BulkResponse; +import org.opensearch.client.opensearch.core.bulk.BulkOperation; +import org.opensearch.client.opensearch.core.bulk.BulkResponseItem; import org.opensearch.common.unit.TimeValue; import org.opensearch.rest.RestStatus; @@ -24,6 +26,8 @@ import java.util.function.BiConsumer; import java.util.function.Supplier; +import static com.amazon.dataprepper.plugins.sink.opensearch.ErrorCauseStringCreator.toSingleLineDisplayString; + public final class BulkRetryStrategy { public static final String DOCUMENTS_SUCCESS = "documentsSuccess"; public static final String DOCUMENTS_SUCCESS_FIRST_ATTEMPT = "documentsSuccessFirstAttempt"; @@ -36,19 +40,19 @@ public final class BulkRetryStrategy { RestStatus.CONFLICT.getStatus() )); - private final RequestFunction requestFunction; - private final BiConsumer, Throwable> logFailure; + private final RequestFunction, BulkResponse> requestFunction; + private final BiConsumer logFailure; private final PluginMetrics pluginMetrics; - private final Supplier bulkRequestSupplier; + private final Supplier bulkRequestSupplier; private final Counter sentDocumentsCounter; private final Counter sentDocumentsOnFirstAttemptCounter; private final Counter documentErrorsCounter; - public BulkRetryStrategy(final RequestFunction requestFunction, - final BiConsumer, Throwable> logFailure, + public BulkRetryStrategy(final RequestFunction, BulkResponse> requestFunction, + final BiConsumer logFailure, final PluginMetrics pluginMetrics, - final Supplier bulkRequestSupplier) { + final Supplier bulkRequestSupplier) { this.requestFunction = requestFunction; this.logFailure = logFailure; this.pluginMetrics = pluginMetrics; @@ -59,7 +63,7 @@ public BulkRetryStrategy(final RequestFunction reques documentErrorsCounter = pluginMetrics.counter(DOCUMENT_ERRORS); } - public void execute(final BulkRequest bulkRequest) throws InterruptedException { + public void execute(final AccumulatingBulkRequest bulkRequest) throws InterruptedException { // Exponential backoff run forever // TODO: replace with custom backoff policy setting including maximum interval between retries final BackOffUtils backOffUtils = new BackOffUtils( @@ -68,8 +72,8 @@ public void execute(final BulkRequest bulkRequest) throws InterruptedException { } public boolean canRetry(final BulkResponse response) { - for (final BulkItemResponse bulkItemResponse : response) { - if (bulkItemResponse.isFailed() && !NON_RETRY_STATUS.contains(bulkItemResponse.status().getStatus())) { + for (final BulkResponseItem bulkItemResponse : response.items()) { + if (bulkItemResponse.error() != null && !NON_RETRY_STATUS.contains(bulkItemResponse.status())) { return true; } } @@ -82,9 +86,9 @@ public boolean canRetry(final Exception e) { !NON_RETRY_STATUS.contains(((OpenSearchException) e).status().getStatus()))); } - private void handleRetry(final BulkRequest request, final BulkResponse response, + private void handleRetry(final AccumulatingBulkRequest request, final BulkResponse response, final BackOffUtils backOffUtils, final boolean firstAttempt) throws InterruptedException { - final BulkRequest bulkRequestForRetry = createBulkRequestForRetry(request, response); + final AccumulatingBulkRequest bulkRequestForRetry = createBulkRequestForRetry(request, response); if (backOffUtils.hasNext()) { // Wait for backOff duration backOffUtils.next(); @@ -95,49 +99,49 @@ private void handleRetry(final BulkRequest request, final BulkResponse response, if (canRetry(e)) { handleRetry(bulkRequestForRetry, null, backOffUtils, false); } else { - handleFailures(bulkRequestForRetry.requests(), e); + handleFailures(bulkRequestForRetry, e); } return; } - if (bulkResponse.hasFailures()) { + if (bulkResponse.errors()) { if (canRetry(bulkResponse)) { if (firstAttempt) { - for (final BulkItemResponse bulkItemResponse : bulkResponse.getItems()) { - if (!bulkItemResponse.isFailed()) { + for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) { + if (bulkItemResponse.error() == null) { sentDocumentsOnFirstAttemptCounter.increment(); } } } handleRetry(bulkRequestForRetry, bulkResponse, backOffUtils, false); } else { - handleFailures(bulkRequestForRetry.requests(), bulkResponse.getItems()); + handleFailures(bulkRequestForRetry, bulkResponse.items()); } } else { - final int numberOfDocs = bulkRequestForRetry.numberOfActions(); + final int numberOfDocs = bulkRequestForRetry.getOperationsCount(); if (firstAttempt) { sentDocumentsOnFirstAttemptCounter.increment(numberOfDocs); } - sentDocumentsCounter.increment(bulkRequestForRetry.numberOfActions()); + sentDocumentsCounter.increment(bulkRequestForRetry.getOperationsCount()); } } } - private BulkRequest createBulkRequestForRetry( - final BulkRequest request, final BulkResponse response) { + private AccumulatingBulkRequest createBulkRequestForRetry( + final AccumulatingBulkRequest request, final BulkResponse response) { if (response == null) { // first attempt or retry due to Exception return request; } else { - final BulkRequest requestToReissue = bulkRequestSupplier.get(); + final AccumulatingBulkRequest requestToReissue = bulkRequestSupplier.get(); int index = 0; - for (final BulkItemResponse bulkItemResponse : response.getItems()) { - if (bulkItemResponse.isFailed()) { - if (!NON_RETRY_STATUS.contains(bulkItemResponse.status().getStatus())) { - requestToReissue.add(request.requests().get(index)); + for (final BulkResponseItem bulkItemResponse : response.items()) { + if (bulkItemResponse.error() != null) { + if (!NON_RETRY_STATUS.contains(bulkItemResponse.status())) { + requestToReissue.addOperation(request.getOperationAt(index)); } else { // log non-retryable failed request - logFailure.accept(request.requests().get(index), bulkItemResponse.getFailure().getCause()); + logFailure.accept(request.getOperationAt(index), new RuntimeException(toSingleLineDisplayString(bulkItemResponse.error()))); documentErrorsCounter.increment(); } } else { @@ -149,14 +153,14 @@ private BulkRequest createBulkRequestForRetry( } } - private void handleFailures(final List> docWriteRequests, final BulkItemResponse[] itemResponses) { - assert docWriteRequests.size() == itemResponses.length; - for (int i = 0; i < itemResponses.length; i++) { - final BulkItemResponse bulkItemResponse = itemResponses[i]; - final DocWriteRequest docWriteRequest = docWriteRequests.get(i); - if (bulkItemResponse.isFailed()) { - final BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); - logFailure.accept(docWriteRequest, failure.getCause()); + private void handleFailures(final AccumulatingBulkRequest accumulatingBulkRequest, final List itemResponses) { + assert accumulatingBulkRequest.getOperationsCount() == itemResponses.size(); + for (int i = 0; i < itemResponses.size(); i++) { + final BulkResponseItem bulkItemResponse = itemResponses.get(i); + final BulkOperation bulkOperation = accumulatingBulkRequest.getOperationAt(i); + if (bulkItemResponse.error() != null) { + final ErrorCause error = bulkItemResponse.error(); + logFailure.accept(bulkOperation, new RuntimeException(toSingleLineDisplayString(error))); documentErrorsCounter.increment(); } else { sentDocumentsCounter.increment(); @@ -164,10 +168,10 @@ private void handleFailures(final List> docWriteRequests, fin } } - private void handleFailures(final List> docWriteRequests, final Throwable failure) { - documentErrorsCounter.increment(docWriteRequests.size()); - for (final DocWriteRequest docWriteRequest: docWriteRequests) { - logFailure.accept(docWriteRequest, failure); + private void handleFailures(final AccumulatingBulkRequest accumulatingBulkRequest, final Throwable failure) { + documentErrorsCounter.increment(accumulatingBulkRequest.getOperationsCount()); + for (final BulkOperation bulkOperation: accumulatingBulkRequest.getOperations()) { + logFailure.accept(bulkOperation, failure); } } } diff --git a/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/ErrorCauseStringCreator.java b/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/ErrorCauseStringCreator.java new file mode 100644 index 0000000000..fd5bf7f429 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/ErrorCauseStringCreator.java @@ -0,0 +1,26 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.sink.opensearch; + +import org.opensearch.client.opensearch._types.ErrorCause; + +class ErrorCauseStringCreator { + static String toSingleLineDisplayString(ErrorCause errorCause) { + ErrorCause currentErrorCause = errorCause; + + StringBuilder errorString = new StringBuilder(); + while (currentErrorCause != null) { + final String reasonLine = currentErrorCause.reason() != null ? currentErrorCause.reason() : "unknown"; + if (currentErrorCause != errorCause) { + errorString.append(" caused by "); + } + errorString.append(reasonLine); + currentErrorCause = currentErrorCause.causedBy(); + } + + return errorString.toString(); + } +} diff --git a/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index 856bbb12b8..5b8c0f5f8c 100644 --- a/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -11,17 +11,26 @@ import com.amazon.dataprepper.model.record.Record; import com.amazon.dataprepper.model.sink.AbstractSink; import com.amazon.dataprepper.model.sink.Sink; +import com.amazon.dataprepper.plugins.sink.opensearch.bulk.AccumulatingBulkRequest; +import com.amazon.dataprepper.plugins.sink.opensearch.bulk.BulkOperationWriter; +import com.amazon.dataprepper.plugins.sink.opensearch.bulk.JavaClientAccumulatingBulkRequest; +import com.amazon.dataprepper.plugins.sink.opensearch.bulk.SizedJsonData; import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexManager; import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexManagerFactory; import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexType; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Timer; -import org.opensearch.action.DocWriteRequest; -import org.opensearch.action.bulk.BulkRequest; -import org.opensearch.action.index.IndexRequest; -import org.opensearch.client.RequestOptions; +import jakarta.json.JsonObject; import org.opensearch.client.RestHighLevelClient; +import org.opensearch.client.json.JsonData; +import org.opensearch.client.json.jackson.JacksonJsonpMapper; +import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch.core.BulkRequest; +import org.opensearch.client.opensearch.core.bulk.BulkOperation; +import org.opensearch.client.opensearch.core.bulk.IndexOperation; +import org.opensearch.client.transport.OpenSearchTransport; +import org.opensearch.client.transport.rest_client.RestClientTransport; import org.opensearch.common.unit.ByteSizeUnit; import org.opensearch.common.xcontent.LoggingDeprecationHandler; import org.opensearch.common.xcontent.NamedXContentRegistry; @@ -48,15 +57,13 @@ public class OpenSearchSink extends AbstractSink> { public static final String BULKREQUEST_SIZE_BYTES = "bulkRequestSizeBytes"; private static final Logger LOG = LoggerFactory.getLogger(OpenSearchSink.class); - // Pulled from BulkRequest to make estimation of bytes consistent - private static final int REQUEST_OVERHEAD = 50; private BufferedWriter dlqWriter; private final OpenSearchSinkConfiguration openSearchSinkConfig; private final IndexManagerFactory indexManagerFactory; private RestHighLevelClient restHighLevelClient; private IndexManager indexManager; - private Supplier bulkRequestSupplier; + private Supplier bulkRequestSupplier; private BulkRetryStrategy bulkRetryStrategy; private final long bulkSize; private final IndexType indexType; @@ -65,6 +72,7 @@ public class OpenSearchSink extends AbstractSink> { private final Timer bulkRequestTimer; private final Counter bulkRequestErrorsCounter; private final DistributionSummary bulkRequestSizeBytesSummary; + private OpenSearchClient openSearchClient; public OpenSearchSink(final PluginSetting pluginSetting) { super(pluginSetting); @@ -101,9 +109,12 @@ public void initialize() throws IOException { dlqWriter = Files.newBufferedWriter(Paths.get(dlqFile), StandardOpenOption.CREATE, StandardOpenOption.APPEND); } indexManager.checkAndCreateIndex(); - bulkRequestSupplier = () -> new BulkRequest(indexManager.getIndexAlias()); + + OpenSearchTransport transport = new RestClientTransport(restHighLevelClient.getLowLevelClient(), new JacksonJsonpMapper()); + openSearchClient = new OpenSearchClient(transport); + bulkRequestSupplier = () -> new JavaClientAccumulatingBulkRequest(new BulkRequest.Builder().index(indexManager.getIndexAlias())); bulkRetryStrategy = new BulkRetryStrategy( - bulkRequest -> restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT), + bulkRequest -> openSearchClient.bulk(bulkRequest.getRequest()), this::logFailure, pluginMetrics, bulkRequestSupplier); @@ -115,29 +126,36 @@ public void doOutput(final Collection> records) { if (records.isEmpty()) { return; } - BulkRequest bulkRequest = bulkRequestSupplier.get(); + + AccumulatingBulkRequest bulkRequest = bulkRequestSupplier.get(); for (final Record record : records) { - final String document = getDocument(record.getData()); - final IndexRequest indexRequest = new IndexRequest().source(document, XContentType.JSON); - try { - final Map source = getMapFromJson(document); - final String docId = (String) source.get(documentIdField); + final JsonData document = getDocument(record.getData()); + + final IndexOperation.Builder indexOperationBuilder = new IndexOperation.Builder<>() + .index(indexManager.getIndexAlias()) + .document(document); + + final JsonObject jsonObject = document.toJson().asJsonObject(); + if(jsonObject != null) { + final String docId = (String) jsonObject.getString(documentIdField, null); if (docId != null) { - indexRequest.id(docId); - } - final long estimatedBytesBeforeAdd = bulkRequest.estimatedSizeInBytes() + calcEstimatedSizeInBytes(indexRequest); - if (bulkSize >= 0 && estimatedBytesBeforeAdd >= bulkSize && bulkRequest.numberOfActions() > 0) { - flushBatch(bulkRequest); - bulkRequest = bulkRequestSupplier.get(); + indexOperationBuilder.id(docId); } - bulkRequest.add(indexRequest); - } catch (final IOException e) { - throw new RuntimeException(e.getMessage(), e); } + final BulkOperation indexBulkOperation = new BulkOperation.Builder() + .index(indexOperationBuilder.build()) + .build(); + + final long estimatedBytesBeforeAdd = bulkRequest.estimateSizeInBytesWithDocument(indexBulkOperation); + if (bulkSize >= 0 && estimatedBytesBeforeAdd >= bulkSize && bulkRequest.getOperationsCount() > 0) { + flushBatch(bulkRequest); + bulkRequest = bulkRequestSupplier.get(); + } + bulkRequest.addOperation(indexBulkOperation); } // Flush the remaining requests - if (bulkRequest.numberOfActions() > 0) { + if (bulkRequest.getOperationsCount() > 0) { flushBatch(bulkRequest); } } @@ -145,26 +163,26 @@ public void doOutput(final Collection> records) { // Temporary function to support both trace and log ingestion pipelines. // TODO: This function should be removed with the completion of: https://github.com/opensearch-project/data-prepper/issues/546 - private String getDocument(final Object object) { + private JsonData getDocument(final Object object) { + final String jsonString; if (object instanceof String) { - return (String) object; + jsonString = (String) object; } else if (object instanceof Event) { - return ((Event) object).toJsonString(); + jsonString = ((Event) object).toJsonString(); + } else { throw new RuntimeException("Invalid record type. OpenSearch sink only supports String and Events"); } - } - private long calcEstimatedSizeInBytes(final IndexRequest indexRequest) { - // From BulkRequest#internalAdd(IndexRequest request) - return (indexRequest.source() != null ? indexRequest.source().length() : 0) + REQUEST_OVERHEAD; + return SizedJsonData.fromString(jsonString, openSearchClient._transport().jsonpMapper()); } - private void flushBatch(final BulkRequest bulkRequest) { + private void flushBatch(AccumulatingBulkRequest accumulatingBulkRequest) { bulkRequestTimer.record(() -> { try { - bulkRetryStrategy.execute(bulkRequest); - bulkRequestSizeBytesSummary.record(bulkRequest.estimatedSizeInBytes()); + LOG.info("Sending data to OpenSearch"); + bulkRetryStrategy.execute(accumulatingBulkRequest); + bulkRequestSizeBytesSummary.record(accumulatingBulkRequest.getEstimatedSizeInBytes()); } catch (final InterruptedException e) { LOG.error("Unexpected Interrupt:", e); bulkRequestErrorsCounter.increment(); @@ -179,22 +197,22 @@ private Map getMapFromJson(final String documentJson) throws IOE return parser.map(); } - private void logFailure(final DocWriteRequest docWriteRequest, final Throwable failure) { + private void logFailure(final BulkOperation bulkOperation, final Throwable failure) { if (dlqWriter != null) { try { dlqWriter.write(String.format("{\"Document\": [%s], \"failure\": %s}\n", - docWriteRequest.toString(), failure.getMessage())); + BulkOperationWriter.bulkOperationToString(bulkOperation), failure.getMessage())); } catch (final IOException e) { - LOG.error("DLQ failed for Document [{}]", docWriteRequest.toString()); + LOG.error("DLQ failed for Document [{}]", bulkOperation.toString()); } } else { - LOG.warn("Document [{}] has failure: {}", docWriteRequest.toString(), failure); + LOG.warn("Document [{}] has failure: {}", bulkOperation.toString(), failure); } } @Override public void shutdown() { - // Close the client + // Close the client. This closes the low-level client which will close it for both high-level clients. if (restHighLevelClient != null) { try { restHighLevelClient.close(); diff --git a/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/AccumulatingBulkRequest.java b/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/AccumulatingBulkRequest.java new file mode 100644 index 0000000000..815e576b54 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/AccumulatingBulkRequest.java @@ -0,0 +1,30 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.sink.opensearch.bulk; + +import java.util.List; + +/** + * Accumulates Bulk Requests. + * + * @param + * @param + */ +public interface AccumulatingBulkRequest { + long estimateSizeInBytesWithDocument(O documentOrOperation); + + void addOperation(O documentOrOperation); + + O getOperationAt(int index); + + long getEstimatedSizeInBytes(); + + int getOperationsCount(); + + List getOperations(); + + R getRequest(); +} diff --git a/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/BulkOperationWriter.java b/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/BulkOperationWriter.java new file mode 100644 index 0000000000..354b51ae9e --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/BulkOperationWriter.java @@ -0,0 +1,71 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package com.amazon.dataprepper.plugins.sink.opensearch.bulk; + +import org.opensearch.client.json.JsonData; +import org.opensearch.client.opensearch.core.bulk.BulkOperation; +import org.opensearch.common.unit.ByteSizeValue; + +/** + * Based on low-level REST client's org.opensearch.action.index.IndexRequest::toString method. + */ +public class BulkOperationWriter { + private static final int MAX_SOURCE_LENGTH_IN_TOSTRING = 2048; + + public static String bulkOperationToString(BulkOperation bulkOperation) { + String index = bulkOperation.index().index(); + String source = extractDocumentSource(bulkOperation); + String id = bulkOperation.index().id(); + + String sSource = "_na_"; + try { + if (source.length() > MAX_SOURCE_LENGTH_IN_TOSTRING) { + sSource = "n/a, actual length: [" + + new ByteSizeValue(source.length()).toString() + + "], max length: " + + new ByteSizeValue(MAX_SOURCE_LENGTH_IN_TOSTRING).toString(); + } else { + sSource = source; + } + } catch (Exception e) { + // ignore + } + return "index {[" + index + "][" + id + "], source[" + sSource + "]}"; + } + + private static String extractDocumentSource(BulkOperation bulkOperation) { + final JsonData document = (JsonData) bulkOperation.index().document(); + + return document.toJson().toString(); + } +} diff --git a/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingBulkRequest.java b/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingBulkRequest.java new file mode 100644 index 0000000000..f0e3ff8097 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingBulkRequest.java @@ -0,0 +1,93 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.sink.opensearch.bulk; + +import org.opensearch.client.opensearch.core.BulkRequest; +import org.opensearch.client.opensearch.core.bulk.BulkOperation; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class JavaClientAccumulatingBulkRequest implements AccumulatingBulkRequest { + static final int OPERATION_OVERHEAD = 50; + + private final List bulkOperations; + private BulkRequest.Builder bulkRequestBuilder; + private long currentBulkSize = 0L; + private int operationCount = 0; + private BulkRequest builtRequest; + + public JavaClientAccumulatingBulkRequest(BulkRequest.Builder bulkRequestBuilder) { + this.bulkRequestBuilder = bulkRequestBuilder; + bulkOperations = new ArrayList<>(); + } + + @Override + public long estimateSizeInBytesWithDocument(BulkOperation documentOrOperation) { + return currentBulkSize + estimateBulkOperationSize(documentOrOperation); + } + + @Override + public void addOperation(BulkOperation bulkOperation) { + final Long documentLength = estimateBulkOperationSize(bulkOperation); + + currentBulkSize += documentLength; + + bulkRequestBuilder = bulkRequestBuilder.operations(bulkOperation); + + operationCount++; + bulkOperations.add(bulkOperation); + } + + @Override + public BulkOperation getOperationAt(int index) { + return bulkOperations.get(index); + } + + @Override + public long getEstimatedSizeInBytes() { + return currentBulkSize; + } + + @Override + public int getOperationsCount() { + return operationCount; + } + + @Override + public List getOperations() { + return Collections.unmodifiableList(bulkOperations); + } + + @Override + public BulkRequest getRequest() { + if(builtRequest == null) + builtRequest = bulkRequestBuilder.build(); + return builtRequest; + } + + private long estimateBulkOperationSize(BulkOperation bulkOperation) { + + if (!bulkOperation.isIndex()) { + throw new UnsupportedOperationException("Only index operations are supported currently. " + bulkOperation); + } + + Object anyDocument = bulkOperation.index().document(); + + if (anyDocument == null) + return OPERATION_OVERHEAD; + + if (!(anyDocument instanceof SizedJsonData)) { + throw new IllegalArgumentException("Only SizedJsonData is permitted for accumulating bulk requests. " + bulkOperation); + } + + SizedJsonData sizedDocument = (SizedJsonData) anyDocument; + + final long documentLength = sizedDocument.getDocumentSize(); + return documentLength + OPERATION_OVERHEAD; + } +} diff --git a/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SizedJsonData.java b/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SizedJsonData.java new file mode 100644 index 0000000000..dcedb39112 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SizedJsonData.java @@ -0,0 +1,41 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.sink.opensearch.bulk; + +import jakarta.json.spi.JsonProvider; +import org.opensearch.client.json.JsonData; +import org.opensearch.client.json.JsonpMapper; + +import java.io.StringReader; + +/** + * Extends the {@link JsonData} interface from the opensearch-java client with + * the addition of having a document size. + */ +public interface SizedJsonData extends JsonData { + /** + * The size of the document represented by this {@link JsonData}. + * + * @return The document size in bytes + */ + long getDocumentSize(); + + /** + * Creates a new {@link SizedJsonData} from a JSON string. + * + * @param jsonString The serialized JSON string which forms this JSON data. + * @param jsonpMapper The {@link JsonpMapper} to use for mapping. + * @return A new {@link SizedJsonData}. + */ + static SizedJsonData fromString(String jsonString, JsonpMapper jsonpMapper) { + JsonProvider jsonProvider = jsonpMapper.jsonProvider(); + final JsonData jsonData = JsonData.from(jsonProvider.createParser(new StringReader(jsonString)), jsonpMapper); + + final String serializedJsonLength = jsonData.toJson().toString(); + + return new SizedJsonDataImpl(jsonData, serializedJsonLength.length()); + } +} diff --git a/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SizedJsonDataImpl.java b/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SizedJsonDataImpl.java new file mode 100644 index 0000000000..210971e155 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SizedJsonDataImpl.java @@ -0,0 +1,62 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.sink.opensearch.bulk; + +import jakarta.json.JsonValue; +import jakarta.json.stream.JsonGenerator; +import org.opensearch.client.json.JsonData; +import org.opensearch.client.json.JsonpDeserializer; +import org.opensearch.client.json.JsonpMapper; + +class SizedJsonDataImpl implements SizedJsonData { + private final JsonData innerJsonData; + private final long documentSize; + + public SizedJsonDataImpl(final JsonData innerJsonData, final long documentSize) { + this.innerJsonData = innerJsonData; + this.documentSize = documentSize; + } + + @Override + public long getDocumentSize() { + return documentSize; + } + + @Override + public JsonValue toJson() { + return innerJsonData.toJson(); + } + + @Override + public JsonValue toJson(JsonpMapper mapper) { + return innerJsonData.toJson(mapper); + } + + @Override + public T to(Class clazz) { + return innerJsonData.to(clazz); + } + + @Override + public T to(Class clazz, JsonpMapper mapper) { + return innerJsonData.to(clazz, mapper); + } + + @Override + public T deserialize(JsonpDeserializer deserializer) { + return innerJsonData.deserialize(deserializer); + } + + @Override + public T deserialize(JsonpDeserializer deserializer, JsonpMapper mapper) { + return innerJsonData.deserialize(deserializer, mapper); + } + + @Override + public void serialize(JsonGenerator generator, JsonpMapper mapper) { + innerJsonData.serialize(generator, mapper); + } +} diff --git a/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java b/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java index 271868fbff..65c8519346 100644 --- a/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java @@ -9,21 +9,28 @@ import com.amazon.dataprepper.metrics.MetricsTestUtil; import com.amazon.dataprepper.metrics.PluginMetrics; import com.amazon.dataprepper.model.configuration.PluginSetting; +import com.amazon.dataprepper.plugins.sink.opensearch.bulk.AccumulatingBulkRequest; +import com.amazon.dataprepper.plugins.sink.opensearch.bulk.JavaClientAccumulatingBulkRequest; +import com.amazon.dataprepper.plugins.sink.opensearch.bulk.SizedJsonData; import io.micrometer.core.instrument.Measurement; import org.hamcrest.MatcherAssert; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.opensearch.OpenSearchException; -import org.opensearch.action.DocWriteRequest; -import org.opensearch.action.bulk.BulkItemResponse; -import org.opensearch.action.bulk.BulkRequest; -import org.opensearch.action.bulk.BulkResponse; -import org.opensearch.action.index.IndexRequest; +import org.opensearch.client.json.JsonData; +import org.opensearch.client.json.jackson.JacksonJsonpMapper; +import org.opensearch.client.opensearch._types.ErrorCause; +import org.opensearch.client.opensearch.core.BulkRequest; +import org.opensearch.client.opensearch.core.BulkResponse; +import org.opensearch.client.opensearch.core.bulk.BulkOperation; +import org.opensearch.client.opensearch.core.bulk.BulkResponseItem; +import org.opensearch.client.opensearch.core.bulk.IndexOperation; import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException; import org.opensearch.rest.RestStatus; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.StringJoiner; @@ -47,7 +54,7 @@ public class BulkRetryStrategyTests { setPipelineName(PIPELINE_NAME); }}; private static final PluginMetrics PLUGIN_METRICS = PluginMetrics.fromPluginSetting(PLUGIN_SETTING); - private BiConsumer, Throwable> logFailureConsumer; + private BiConsumer logFailureConsumer; @BeforeEach public void setUp() { @@ -57,25 +64,26 @@ public void setUp() { @Test public void testCanRetry() { + AccumulatingBulkRequest accumulatingBulkRequest = mock(AccumulatingBulkRequest.class); final BulkRetryStrategy bulkRetryStrategy = new BulkRetryStrategy( - bulkRequest -> new BulkResponse(new BulkItemResponse[bulkRequest.requests().size()], 10), - (docWriteRequest, throwable) -> {}, PLUGIN_METRICS, BulkRequest::new); + bulkRequest -> mock(BulkResponse.class), + (docWriteRequest, throwable) -> {}, PLUGIN_METRICS, () -> mock(AccumulatingBulkRequest.class)); final String testIndex = "foo"; - final BulkItemResponse bulkItemResponse1 = successItemResponse(testIndex); - final BulkItemResponse bulkItemResponse2 = badRequestItemResponse(testIndex); - BulkResponse response = new BulkResponse( - new BulkItemResponse[]{bulkItemResponse1, bulkItemResponse2}, 10); - assertFalse(bulkRetryStrategy.canRetry(response)); - - final BulkItemResponse bulkItemResponse3 = tooManyRequestItemResponse(testIndex); - response = new BulkResponse( - new BulkItemResponse[]{bulkItemResponse1, bulkItemResponse3}, 10); - assertTrue(bulkRetryStrategy.canRetry(response)); - - final BulkItemResponse bulkItemResponse4 = internalServerErrorItemResponse(testIndex); - response = new BulkResponse( - new BulkItemResponse[]{bulkItemResponse2, bulkItemResponse4}, 10); - assertTrue(bulkRetryStrategy.canRetry(response)); + final BulkResponseItem bulkItemResponse1 = successItemResponse(testIndex); + final BulkResponseItem bulkItemResponse2 = badRequestItemResponse(testIndex); + BulkResponse bulkResponse = mock(BulkResponse.class); + when(bulkResponse.items()).thenReturn(Arrays.asList(bulkItemResponse1, bulkItemResponse2)); + assertFalse(bulkRetryStrategy.canRetry(bulkResponse)); + + final BulkResponseItem bulkItemResponse3 = tooManyRequestItemResponse(testIndex); + bulkResponse = mock(BulkResponse.class); + when(bulkResponse.items()).thenReturn(Arrays.asList(bulkItemResponse1, bulkItemResponse3)); + assertTrue(bulkRetryStrategy.canRetry(bulkResponse)); + + final BulkResponseItem bulkItemResponse4 = internalServerErrorItemResponse(testIndex); + bulkResponse = mock(BulkResponse.class); + when(bulkResponse.items()).thenReturn(Arrays.asList(bulkItemResponse2, bulkItemResponse4)); + assertTrue(bulkRetryStrategy.canRetry(bulkResponse)); } @Test @@ -85,14 +93,19 @@ public void testExecuteSuccessOnFirstAttempt() throws Exception { client.successOnFirstAttempt = true; final BulkRetryStrategy bulkRetryStrategy = new BulkRetryStrategy( - client::bulk, logFailureConsumer, PLUGIN_METRICS, BulkRequest::new); - final BulkRequest testBulkRequest = new BulkRequest(); - testBulkRequest.add(new IndexRequest(testIndex).id("1")); - testBulkRequest.add(new IndexRequest(testIndex).id("2")); - testBulkRequest.add(new IndexRequest(testIndex).id("3")); - testBulkRequest.add(new IndexRequest(testIndex).id("4")); + client::bulk, logFailureConsumer, PLUGIN_METRICS, () -> new JavaClientAccumulatingBulkRequest(new BulkRequest.Builder())); - bulkRetryStrategy.execute(testBulkRequest); + final IndexOperation indexOperation1 = new IndexOperation.Builder().index(testIndex).id("1").document(arbitraryDocument()).build(); + final IndexOperation indexOperation2 = new IndexOperation.Builder().index(testIndex).id("2").document(arbitraryDocument()).build(); + final IndexOperation indexOperation3 = new IndexOperation.Builder().index(testIndex).id("3").document(arbitraryDocument()).build(); + final IndexOperation indexOperation4 = new IndexOperation.Builder().index(testIndex).id("4").document(arbitraryDocument()).build(); + final AccumulatingBulkRequest accumulatingBulkRequest = new JavaClientAccumulatingBulkRequest(new BulkRequest.Builder()); + accumulatingBulkRequest.addOperation(new BulkOperation.Builder().index(indexOperation1).build()); + accumulatingBulkRequest.addOperation(new BulkOperation.Builder().index(indexOperation2).build()); + accumulatingBulkRequest.addOperation(new BulkOperation.Builder().index(indexOperation3).build()); + accumulatingBulkRequest.addOperation(new BulkOperation.Builder().index(indexOperation4).build()); + + bulkRetryStrategy.execute(accumulatingBulkRequest); assertEquals(1, client.attempt); @@ -115,27 +128,31 @@ public void testExecuteRetryable() throws Exception { final FakeClient client = new FakeClient(testIndex); final BulkRetryStrategy bulkRetryStrategy = new BulkRetryStrategy( - client::bulk, logFailureConsumer, PLUGIN_METRICS, BulkRequest::new); - final BulkRequest testBulkRequest = new BulkRequest(); - testBulkRequest.add(new IndexRequest(testIndex).id("1")); - testBulkRequest.add(new IndexRequest(testIndex).id("2")); - testBulkRequest.add(new IndexRequest(testIndex).id("3")); - testBulkRequest.add(new IndexRequest(testIndex).id("4")); - - bulkRetryStrategy.execute(testBulkRequest); + client::bulk, logFailureConsumer, PLUGIN_METRICS, () -> new JavaClientAccumulatingBulkRequest(new BulkRequest.Builder())); + final IndexOperation indexOperation1 = new IndexOperation.Builder().index(testIndex).id("1").document(arbitraryDocument()).build(); + final IndexOperation indexOperation2 = new IndexOperation.Builder().index(testIndex).id("2").document(arbitraryDocument()).build(); + final IndexOperation indexOperation3 = new IndexOperation.Builder().index(testIndex).id("3").document(arbitraryDocument()).build(); + final IndexOperation indexOperation4 = new IndexOperation.Builder().index(testIndex).id("4").document(arbitraryDocument()).build(); + final AccumulatingBulkRequest accumulatingBulkRequest = new JavaClientAccumulatingBulkRequest(new BulkRequest.Builder()); + accumulatingBulkRequest.addOperation(new BulkOperation.Builder().index(indexOperation1).build()); + accumulatingBulkRequest.addOperation(new BulkOperation.Builder().index(indexOperation2).build()); + accumulatingBulkRequest.addOperation(new BulkOperation.Builder().index(indexOperation3).build()); + accumulatingBulkRequest.addOperation(new BulkOperation.Builder().index(indexOperation4).build()); + + bulkRetryStrategy.execute(accumulatingBulkRequest); assertEquals(3, client.attempt); - assertEquals(2, client.finalResponse.getItems().length); - assertFalse(client.finalResponse.hasFailures()); - assertEquals("3", client.finalRequest.requests().get(0).id()); - assertEquals("4", client.finalRequest.requests().get(1).id()); + assertEquals(2, client.finalResponse.items().size()); + assertFalse(client.finalResponse.errors()); + assertEquals("3", client.finalRequest.operations().get(0).index().id()); + assertEquals("4", client.finalRequest.operations().get(1).index().id()); - ArgumentCaptor loggerWriteRequestArgCaptor = ArgumentCaptor.forClass(DocWriteRequest.class); + ArgumentCaptor loggerWriteRequestArgCaptor = ArgumentCaptor.forClass(BulkOperation.class); ArgumentCaptor loggerThrowableArgCaptor = ArgumentCaptor.forClass(Throwable.class); verify(logFailureConsumer).accept(loggerWriteRequestArgCaptor.capture(), loggerThrowableArgCaptor.capture()); MatcherAssert.assertThat(loggerWriteRequestArgCaptor.getValue(), notNullValue()); - MatcherAssert.assertThat(loggerWriteRequestArgCaptor.getValue().index(), equalTo(testIndex)); - MatcherAssert.assertThat(loggerWriteRequestArgCaptor.getValue().id(), equalTo("2")); + MatcherAssert.assertThat(loggerWriteRequestArgCaptor.getValue().index().index(), equalTo(testIndex)); + MatcherAssert.assertThat(loggerWriteRequestArgCaptor.getValue().index().id(), equalTo("2")); MatcherAssert.assertThat(loggerThrowableArgCaptor.getValue(), notNullValue()); // verify metrics @@ -163,27 +180,31 @@ public void testExecuteNonRetryableException() throws Exception { client.retryable = false; final BulkRetryStrategy bulkRetryStrategy = new BulkRetryStrategy( - client::bulk, logFailureConsumer, PLUGIN_METRICS, BulkRequest::new); - final BulkRequest testBulkRequest = new BulkRequest(); - testBulkRequest.add(new IndexRequest(testIndex).id("1")); - testBulkRequest.add(new IndexRequest(testIndex).id("2")); - testBulkRequest.add(new IndexRequest(testIndex).id("3")); - testBulkRequest.add(new IndexRequest(testIndex).id("4")); - - bulkRetryStrategy.execute(testBulkRequest); + client::bulk, logFailureConsumer, PLUGIN_METRICS, () -> new JavaClientAccumulatingBulkRequest(new BulkRequest.Builder())); + final IndexOperation indexOperation1 = new IndexOperation.Builder().index(testIndex).id("1").document(arbitraryDocument()).build(); + final IndexOperation indexOperation2 = new IndexOperation.Builder().index(testIndex).id("2").document(arbitraryDocument()).build(); + final IndexOperation indexOperation3 = new IndexOperation.Builder().index(testIndex).id("3").document(arbitraryDocument()).build(); + final IndexOperation indexOperation4 = new IndexOperation.Builder().index(testIndex).id("4").document(arbitraryDocument()).build(); + final AccumulatingBulkRequest accumulatingBulkRequest = new JavaClientAccumulatingBulkRequest(new BulkRequest.Builder()); + accumulatingBulkRequest.addOperation(new BulkOperation.Builder().index(indexOperation1).build()); + accumulatingBulkRequest.addOperation(new BulkOperation.Builder().index(indexOperation2).build()); + accumulatingBulkRequest.addOperation(new BulkOperation.Builder().index(indexOperation3).build()); + accumulatingBulkRequest.addOperation(new BulkOperation.Builder().index(indexOperation4).build()); + + bulkRetryStrategy.execute(accumulatingBulkRequest); assertEquals(1, client.attempt); - ArgumentCaptor loggerWriteRequestArgCaptor = ArgumentCaptor.forClass(DocWriteRequest.class); + ArgumentCaptor loggerWriteRequestArgCaptor = ArgumentCaptor.forClass(BulkOperation.class); ArgumentCaptor loggerExceptionArgCaptor = ArgumentCaptor.forClass(Throwable.class); verify(logFailureConsumer, times(4)) .accept(loggerWriteRequestArgCaptor.capture(), isA(IllegalArgumentException.class)); - final List allLoggerWriteRequests = loggerWriteRequestArgCaptor.getAllValues(); + final List allLoggerWriteRequests = loggerWriteRequestArgCaptor.getAllValues(); for (int i = 0; i < allLoggerWriteRequests.size(); i++) { - final DocWriteRequest actualFailedWrite = allLoggerWriteRequests.get(i); - MatcherAssert.assertThat(actualFailedWrite.index(), equalTo(testIndex)); + final BulkOperation actualFailedWrite = allLoggerWriteRequests.get(i); + MatcherAssert.assertThat(actualFailedWrite.index().index(), equalTo(testIndex)); String expectedIndexName = Integer.toString(i+1); - MatcherAssert.assertThat(actualFailedWrite.id(), equalTo(expectedIndexName)); + MatcherAssert.assertThat(actualFailedWrite.index().id(), equalTo(expectedIndexName)); } // verify metrics @@ -207,27 +228,31 @@ public void testExecuteNonRetryableResponse() throws Exception { client.nonRetryableException = false; final BulkRetryStrategy bulkRetryStrategy = new BulkRetryStrategy( - client::bulk, logFailureConsumer, PLUGIN_METRICS, BulkRequest::new); - final BulkRequest testBulkRequest = new BulkRequest(); - testBulkRequest.add(new IndexRequest(testIndex).id("1")); - testBulkRequest.add(new IndexRequest(testIndex).id("2")); - testBulkRequest.add(new IndexRequest(testIndex).id("3")); - testBulkRequest.add(new IndexRequest(testIndex).id("4")); - - bulkRetryStrategy.execute(testBulkRequest); + client::bulk, logFailureConsumer, PLUGIN_METRICS, () -> new JavaClientAccumulatingBulkRequest(new BulkRequest.Builder())); + final IndexOperation indexOperation1 = new IndexOperation.Builder().index(testIndex).id("1").document(arbitraryDocument()).build(); + final IndexOperation indexOperation2 = new IndexOperation.Builder().index(testIndex).id("2").document(arbitraryDocument()).build(); + final IndexOperation indexOperation3 = new IndexOperation.Builder().index(testIndex).id("3").document(arbitraryDocument()).build(); + final IndexOperation indexOperation4 = new IndexOperation.Builder().index(testIndex).id("4").document(arbitraryDocument()).build(); + final AccumulatingBulkRequest accumulatingBulkRequest = new JavaClientAccumulatingBulkRequest(new BulkRequest.Builder()); + accumulatingBulkRequest.addOperation(new BulkOperation.Builder().index(indexOperation1).build()); + accumulatingBulkRequest.addOperation(new BulkOperation.Builder().index(indexOperation2).build()); + accumulatingBulkRequest.addOperation(new BulkOperation.Builder().index(indexOperation3).build()); + accumulatingBulkRequest.addOperation(new BulkOperation.Builder().index(indexOperation4).build()); + + bulkRetryStrategy.execute(accumulatingBulkRequest); assertEquals(1, client.attempt); - ArgumentCaptor loggerWriteRequestArgCaptor = ArgumentCaptor.forClass(DocWriteRequest.class); + ArgumentCaptor loggerWriteRequestArgCaptor = ArgumentCaptor.forClass(BulkOperation.class); ArgumentCaptor loggerExceptionArgCaptor = ArgumentCaptor.forClass(Throwable.class); verify(logFailureConsumer, times(3)) - .accept(loggerWriteRequestArgCaptor.capture(), isA(IllegalArgumentException.class)); - final List allLoggerWriteRequests = loggerWriteRequestArgCaptor.getAllValues(); + .accept(loggerWriteRequestArgCaptor.capture(), isA(RuntimeException.class)); + final List allLoggerWriteRequests = loggerWriteRequestArgCaptor.getAllValues(); for (int i = 0; i < allLoggerWriteRequests.size(); i++) { - final DocWriteRequest actualFailedWrite = allLoggerWriteRequests.get(i); - MatcherAssert.assertThat(actualFailedWrite.index(), equalTo(testIndex)); + final BulkOperation actualFailedWrite = allLoggerWriteRequests.get(i); + MatcherAssert.assertThat(actualFailedWrite.index().index(), equalTo(testIndex)); String expectedIndexName = Integer.toString(i+2); - MatcherAssert.assertThat(actualFailedWrite.id(), equalTo(expectedIndexName)); + MatcherAssert.assertThat(actualFailedWrite.index().id(), equalTo(expectedIndexName)); } // verify metrics @@ -243,33 +268,35 @@ public void testExecuteNonRetryableResponse() throws Exception { assertEquals(3.0, documentErrorsMeasurements.get(0).getValue(), 0); } - private static BulkItemResponse successItemResponse(final String index) { - return mock(BulkItemResponse.class); + + private static BulkResponseItem successItemResponse(final String index) { + return mock(BulkResponseItem.class); } - private static BulkItemResponse badRequestItemResponse(final String index) { - return customBulkFailureResponse(index, RestStatus.BAD_REQUEST, new IllegalArgumentException()); + private static BulkResponseItem badRequestItemResponse(final String index) { + return customBulkFailureResponse(index, RestStatus.BAD_REQUEST); } - private static BulkItemResponse tooManyRequestItemResponse(final String index) { - return customBulkFailureResponse(index, RestStatus.TOO_MANY_REQUESTS, new OpenSearchRejectedExecutionException()); + private static BulkResponseItem tooManyRequestItemResponse(final String index) { + return customBulkFailureResponse(index, RestStatus.TOO_MANY_REQUESTS); } - private static BulkItemResponse internalServerErrorItemResponse(final String index) { - return customBulkFailureResponse(index, RestStatus.INTERNAL_SERVER_ERROR, new IllegalAccessException()); + private static BulkResponseItem internalServerErrorItemResponse(final String index) { + return customBulkFailureResponse(index, RestStatus.INTERNAL_SERVER_ERROR); } - private static BulkItemResponse customBulkFailureResponse(final String index, final RestStatus restStatus, final Exception cause) { - final BulkItemResponse.Failure failure = mock(BulkItemResponse.Failure.class); - when(failure.getStatus()).thenReturn(restStatus); - when(failure.getCause()).thenReturn(cause); - final BulkItemResponse badResponse = mock(BulkItemResponse.class); - when(badResponse.isFailed()).thenReturn(true); - when(badResponse.status()).thenReturn(restStatus); - when(badResponse.getFailure()).thenReturn(failure); + private static BulkResponseItem customBulkFailureResponse(final String index, final RestStatus restStatus) { + final ErrorCause errorCause = mock(ErrorCause.class); + final BulkResponseItem badResponse = mock(BulkResponseItem.class); + when(badResponse.status()).thenReturn(restStatus.getStatus()); + when(badResponse.error()).thenReturn(errorCause); return badResponse; } + private JsonData arbitraryDocument() { + return SizedJsonData.fromString("{}", new JacksonJsonpMapper()); + } + private static class FakeClient { boolean successOnFirstAttempt = false; @@ -284,7 +311,8 @@ public FakeClient(final String index) { this.index = index; } - public BulkResponse bulk(final BulkRequest bulkRequest) throws IOException { + public BulkResponse bulk(final AccumulatingBulkRequest accumulatingBulkRequest) throws IOException { + final BulkRequest bulkRequest = accumulatingBulkRequest.getRequest(); if (successOnFirstAttempt) { attempt++; return bulkSuccessResponse(bulkRequest); @@ -298,7 +326,7 @@ public BulkResponse bulk(final BulkRequest bulkRequest) throws IOException { } } finalRequest = bulkRequest; - final int requestSize = bulkRequest.requests().size(); + final int requestSize = bulkRequest.operations().size(); if (attempt == 0) { assert requestSize == 4; attempt++; @@ -318,38 +346,38 @@ public BulkResponse bulk(final BulkRequest bulkRequest) throws IOException { } private BulkResponse bulkFirstResponse(final BulkRequest bulkRequest) { - final int requestSize = bulkRequest.requests().size(); + final int requestSize = bulkRequest.operations().size(); assert requestSize == 4; - final BulkItemResponse[] bulkItemResponses = new BulkItemResponse[]{ + final List bulkItemResponses = Arrays.asList( successItemResponse(index), badRequestItemResponse(index), internalServerErrorItemResponse(index), - tooManyRequestItemResponse(index)}; - return new BulkResponse(bulkItemResponses, 10); + tooManyRequestItemResponse(index)); + return new BulkResponse.Builder().items(bulkItemResponses).errors(true).took(10).build(); } private BulkResponse bulkSecondResponse(final BulkRequest bulkRequest) { - final int requestSize = bulkRequest.requests().size(); + final int requestSize = bulkRequest.operations().size(); assert requestSize == 2; - final BulkItemResponse[] bulkItemResponses = new BulkItemResponse[]{ - successItemResponse(index), successItemResponse(index)}; - return new BulkResponse(bulkItemResponses, 10); + final List bulkItemResponses = Arrays.asList( + successItemResponse(index), successItemResponse(index)); + return new BulkResponse.Builder().items(bulkItemResponses).errors(false).took(10).build(); } private BulkResponse bulkNonRetryableResponse(final BulkRequest bulkRequest) { - final int requestSize = bulkRequest.requests().size(); + final int requestSize = bulkRequest.operations().size(); assert requestSize == 4; - final BulkItemResponse[] bulkItemResponses = new BulkItemResponse[]{ + final List bulkItemResponses = Arrays.asList( successItemResponse(index), badRequestItemResponse(index), badRequestItemResponse(index), - badRequestItemResponse(index)}; - return new BulkResponse(bulkItemResponses, 10); + badRequestItemResponse(index)); + return new BulkResponse.Builder().items(bulkItemResponses).errors(true).took(10).build(); } private BulkResponse bulkSuccessResponse(final BulkRequest bulkRequest) { - final int requestSize = bulkRequest.requests().size(); + final int requestSize = bulkRequest.operations().size(); assert requestSize == 4; - final BulkItemResponse[] bulkItemResponses = new BulkItemResponse[]{ + final List bulkItemResponses = Arrays.asList( successItemResponse(index), successItemResponse(index), successItemResponse(index), - successItemResponse(index)}; - return new BulkResponse(bulkItemResponses, 10); + successItemResponse(index)); + return new BulkResponse.Builder().items(bulkItemResponses).took(10).errors(false).build(); } } } diff --git a/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/ErrorCauseStringCreatorTest.java b/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/ErrorCauseStringCreatorTest.java new file mode 100644 index 0000000000..7f6ac82072 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/ErrorCauseStringCreatorTest.java @@ -0,0 +1,77 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.sink.opensearch; + +import org.junit.jupiter.api.Test; +import org.opensearch.client.opensearch._types.ErrorCause; + +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class ErrorCauseStringCreatorTest { + @Test + void toSingleLineDisplayString_returns_empty_string_with_null_ErrorCause() { + assertThat(ErrorCauseStringCreator.toSingleLineDisplayString(null), + equalTo("")); + } + + @Test + void toSingleLineDisplayString_returns_empty_when_reason_is_null() { + ErrorCause errorCause = mock(ErrorCause.class); + + assertThat(ErrorCauseStringCreator.toSingleLineDisplayString(errorCause), + equalTo("unknown")); + } + + @Test + void toSingleLineDisplayString_returns_string_with_reason_when_no_nested_cause() { + String reason = UUID.randomUUID().toString(); + ErrorCause errorCause = mock(ErrorCause.class); + when(errorCause.reason()).thenReturn(reason); + + assertThat(ErrorCauseStringCreator.toSingleLineDisplayString(errorCause), + equalTo(reason)); + } + + @Test + void toSingleLineDisplayString_returns_string_of_reasons_with_a_single_nested_cause() { + String innerReason = UUID.randomUUID().toString(); + ErrorCause innerErrorCause = mock(ErrorCause.class); + when(innerErrorCause.reason()).thenReturn(innerReason); + + String outerReason = UUID.randomUUID().toString(); + ErrorCause outerErrorCause = mock(ErrorCause.class); + when(outerErrorCause.reason()).thenReturn(outerReason); + when(outerErrorCause.causedBy()).thenReturn(innerErrorCause); + + assertThat(ErrorCauseStringCreator.toSingleLineDisplayString(outerErrorCause), + equalTo(outerReason + " caused by " + innerReason)); + } + + @Test + void toSingleLineDisplayString_returns_string_of_reasons_with_multiple_nested_causes() { + String innerReason = UUID.randomUUID().toString(); + ErrorCause innerErrorCause = mock(ErrorCause.class); + when(innerErrorCause.reason()).thenReturn(innerReason); + + String middleReason = UUID.randomUUID().toString(); + ErrorCause middleErrorCause = mock(ErrorCause.class); + when(middleErrorCause.reason()).thenReturn(middleReason); + when(middleErrorCause.causedBy()).thenReturn(innerErrorCause); + + String outerReason = UUID.randomUUID().toString(); + ErrorCause outerErrorCause = mock(ErrorCause.class); + when(outerErrorCause.reason()).thenReturn(outerReason); + when(outerErrorCause.causedBy()).thenReturn(middleErrorCause); + + assertThat(ErrorCauseStringCreator.toSingleLineDisplayString(outerErrorCause), + equalTo(outerReason + " caused by " + middleReason + " caused by " + innerReason)); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingBulkRequestTest.java b/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingBulkRequestTest.java new file mode 100644 index 0000000000..8dbfbc065a --- /dev/null +++ b/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingBulkRequestTest.java @@ -0,0 +1,222 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.sink.opensearch.bulk; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.opensearch.client.json.JsonData; +import org.opensearch.client.opensearch.core.BulkRequest; +import org.opensearch.client.opensearch.core.bulk.BulkOperation; +import org.opensearch.client.opensearch.core.bulk.IndexOperation; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class JavaClientAccumulatingBulkRequestTest { + + private BulkRequest.Builder bulkRequestBuilder; + + @BeforeEach + void setUp() { + bulkRequestBuilder = mock(BulkRequest.Builder.class); + + when(bulkRequestBuilder.operations(any(BulkOperation.class))) + .thenReturn(bulkRequestBuilder); + + } + + private JavaClientAccumulatingBulkRequest createObjectUnderTest() { + return new JavaClientAccumulatingBulkRequest(bulkRequestBuilder); + } + + @Test + void getOperationCount_returns_0_if_no_interactions() { + assertThat(createObjectUnderTest().getOperationsCount(), equalTo(0)); + } + + @Test + void getOperations_returns_empty_list_if_no_interactions() { + assertThat(createObjectUnderTest().getOperations(), + equalTo(Collections.emptyList())); + } + + @Test + void getOperations_returns_unmodifiable_list() { + final List operations = createObjectUnderTest().getOperations(); + + final BulkOperation bulkOperation = createBulkOperation(generateDocument()); + assertThrows(UnsupportedOperationException.class, () -> operations.add(bulkOperation)); + } + + @ParameterizedTest + @ValueSource(ints = {1, 2, 3, 10}) + void getOperationsCount_returns_the_correct_operation_count(final int operationCount) { + final JavaClientAccumulatingBulkRequest objectUnderTest = createObjectUnderTest(); + for (int i = 0; i < operationCount; i++) { + BulkOperation bulkOperation = createBulkOperation(generateDocument()); + objectUnderTest.addOperation(bulkOperation); + } + + assertThat(objectUnderTest.getOperationsCount(), equalTo(operationCount)); + } + + @ParameterizedTest + @ValueSource(ints = {1, 2, 3, 10}) + void getEstimatedSizeInBytes_returns_the_current_size(final int operationCount) { + final JavaClientAccumulatingBulkRequest objectUnderTest = createObjectUnderTest(); + final long arbitraryDocumentSize = 175; + for (int i = 0; i < operationCount; i++) { + BulkOperation bulkOperation = createBulkOperation(generateDocumentWithLength(arbitraryDocumentSize)); + objectUnderTest.addOperation(bulkOperation); + } + + final long expectedSize = operationCount * (arbitraryDocumentSize + JavaClientAccumulatingBulkRequest.OPERATION_OVERHEAD); + assertThat(objectUnderTest.getEstimatedSizeInBytes(), equalTo(expectedSize)); + } + + @ParameterizedTest + @ValueSource(ints = {1, 2, 3, 10}) + void getEstimatedSizeInBytes_returns_the_operation_overhead_if_requests_have_no_documents(final int operationCount) { + final JavaClientAccumulatingBulkRequest objectUnderTest = createObjectUnderTest(); + for (int i = 0; i < operationCount; i++) { + objectUnderTest.addOperation(createBulkOperation(null)); + } + + final long expectedSize = operationCount * JavaClientAccumulatingBulkRequest.OPERATION_OVERHEAD; + assertThat(objectUnderTest.getEstimatedSizeInBytes(), equalTo(expectedSize)); + } + + @Test + void getOperationAt_returns_the_correct_index() { + final JavaClientAccumulatingBulkRequest objectUnderTest = createObjectUnderTest(); + + List knownOperations = new ArrayList<>(); + + for (int i = 0; i < 7; i++) { + BulkOperation bulkOperation = createBulkOperation(generateDocument()); + objectUnderTest.addOperation(bulkOperation); + knownOperations.add(bulkOperation); + } + + for (int i = 0; i < 7; i++) { + assertThat(objectUnderTest.getOperationAt(i), equalTo(knownOperations.get(i))); + } + } + + @ParameterizedTest + @ValueSource(longs = {0, 1, 2, 10, 50, 100}) + void estimateSizeInBytesWithDocument_on_new_object_returns_estimated_document_size_plus_operation_overhead(long inputDocumentSize) { + final JsonData document = generateDocumentWithLength(inputDocumentSize); + final BulkOperation bulkOperation = createBulkOperation(document); + + assertThat(createObjectUnderTest().estimateSizeInBytesWithDocument(bulkOperation), + equalTo(inputDocumentSize + JavaClientAccumulatingBulkRequest.OPERATION_OVERHEAD)); + } + + @ParameterizedTest + @ValueSource(longs = {0, 1, 2, 10, 50, 100}) + void estimateSizeInBytesWithDocument_on_request_with_operations_returns_estimated_document_size_plus_operation_overhead(long inputDocumentSize) { + final JsonData document = generateDocumentWithLength(inputDocumentSize); + final BulkOperation bulkOperation = createBulkOperation(document); + + final JavaClientAccumulatingBulkRequest objectUnderTest = createObjectUnderTest(); + objectUnderTest.addOperation(createBulkOperation(generateDocumentWithLength(inputDocumentSize))); + + final long expectedSize = 2 * (inputDocumentSize + JavaClientAccumulatingBulkRequest.OPERATION_OVERHEAD); + assertThat(objectUnderTest.estimateSizeInBytesWithDocument(bulkOperation), + equalTo(expectedSize)); + } + + @Test + void estimateSizeInBytesWithDocument_on_new_object_returns_operation_overhead_if_no_document() { + BulkOperation bulkOperation = createBulkOperation(null); + + assertThat(createObjectUnderTest().estimateSizeInBytesWithDocument(bulkOperation), + equalTo((long) JavaClientAccumulatingBulkRequest.OPERATION_OVERHEAD)); + } + + @Test + void addOperation_adds_operation_to_the_BulkRequestBuilder() { + final BulkOperation bulkOperation = createBulkOperation(generateDocument()); + + createObjectUnderTest().addOperation(bulkOperation); + + verify(bulkRequestBuilder).operations(bulkOperation); + } + + @Test + void addOperation_throws_when_BulkOperation_is_not_an_index_request() { + final BulkOperation bulkOperation = mock(BulkOperation.class); + + final JavaClientAccumulatingBulkRequest objectUnderTest = createObjectUnderTest(); + + assertThrows(UnsupportedOperationException.class, () -> objectUnderTest.addOperation(bulkOperation)); + } + + @Test + void addOperation_throws_when_document_is_not_JsonSize() { + final BulkOperation bulkOperation = createBulkOperation(UUID.randomUUID().toString()); + + final JavaClientAccumulatingBulkRequest objectUnderTest = createObjectUnderTest(); + + assertThrows(IllegalArgumentException.class, () -> objectUnderTest.addOperation(bulkOperation)); + } + + @Test + void getRequest_returns_BulkRequestBuilder_build() { + BulkRequest expectedBulkRequest = mock(BulkRequest.class); + when(bulkRequestBuilder.build()).thenReturn(expectedBulkRequest); + + assertThat(createObjectUnderTest().getRequest(), equalTo(expectedBulkRequest)); + } + + @Test + void getRequest_called_multiple_times_only_builds_once_and_reuses_the_built_request() { + BulkRequest expectedBulkRequest = mock(BulkRequest.class); + when(bulkRequestBuilder.build()).thenReturn(expectedBulkRequest); + + final JavaClientAccumulatingBulkRequest objectUnderTest = createObjectUnderTest(); + + assertThat(objectUnderTest.getRequest(), equalTo(expectedBulkRequest)); + assertThat(objectUnderTest.getRequest(), sameInstance(objectUnderTest.getRequest())); + + verify(bulkRequestBuilder, times(1)).build(); + } + + private BulkOperation createBulkOperation(Object document) { + final IndexOperation indexOperation = mock(IndexOperation.class); + when(indexOperation.document()).thenReturn(document); + final BulkOperation bulkOperation = mock(BulkOperation.class); + when(bulkOperation.isIndex()).thenReturn(true); + when(bulkOperation.index()).thenReturn(indexOperation); + + return bulkOperation; + } + + private JsonData generateDocument() { + return generateDocumentWithLength(10L); + } + + private JsonData generateDocumentWithLength(long documentLength) { + final SizedJsonData sizedJsonData = mock(SizedJsonData.class); + when(sizedJsonData.getDocumentSize()).thenReturn(documentLength); + return sizedJsonData; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SizedJsonDataImplTest.java b/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SizedJsonDataImplTest.java new file mode 100644 index 0000000000..96df204ff9 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SizedJsonDataImplTest.java @@ -0,0 +1,134 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.sink.opensearch.bulk; + +import jakarta.json.JsonValue; +import jakarta.json.stream.JsonGenerator; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.opensearch.client.json.JsonData; +import org.opensearch.client.json.JsonpDeserializer; +import org.opensearch.client.json.JsonpMapper; + +import java.util.Random; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class SizedJsonDataImplTest { + private JsonData innerJsonData; + private long documentSize; + + @BeforeEach + void setUp() { + Random random = new Random(); + innerJsonData = mock(JsonData.class); + documentSize = random.nextInt(10_000) + 100; + } + + private SizedJsonDataImpl createObjectUnderTest() { + return new SizedJsonDataImpl(innerJsonData, documentSize); + } + + @Test + void getDocumentSize_returns_the_documentSize() { + assertThat(createObjectUnderTest().getDocumentSize(), equalTo(documentSize)); + } + + @Nested + class ToJson { + private JsonValue jsonValue; + + @BeforeEach + void setUp() { + jsonValue = mock(JsonValue.class); + } + + @Test + void toJson_returns_inner_JsonData_toJson() { + when(innerJsonData.toJson()).thenReturn(jsonValue); + + assertThat(createObjectUnderTest().toJson(), equalTo(jsonValue)); + } + + @Test + void toJson_with_mapper_returns_inner_JsonData_toJson() { + JsonpMapper jsonpMapper = mock(JsonpMapper.class); + when(innerJsonData.toJson(jsonpMapper)).thenReturn(jsonValue); + + assertThat(createObjectUnderTest().toJson(jsonpMapper), equalTo(jsonValue)); + } + } + + @Nested + class ToClass { + private Class toClass; + private Object expectedToObject; + + @BeforeEach + void setUp() { + toClass = String.class; + + expectedToObject = mock(Object.class); + } + + @Test + void to_returns_inner_JsonData_to() { + when(innerJsonData.to(toClass)).thenReturn(expectedToObject); + + assertThat(createObjectUnderTest().to(toClass), equalTo(expectedToObject)); + } + + @Test + void to_with_mapper_returns_inner_JsonData_to() { + JsonpMapper jsonpMapper = mock(JsonpMapper.class); + when(innerJsonData.to(toClass, jsonpMapper)).thenReturn(expectedToObject); + + assertThat(createObjectUnderTest().to(toClass, jsonpMapper), equalTo(expectedToObject)); + } + } + + @Nested + class Deserialize { + private JsonpDeserializer jsonpDeserializer; + private Object expectedDeserializedObject; + + @BeforeEach + void setUp() { + jsonpDeserializer = mock(JsonpDeserializer.class); + expectedDeserializedObject = mock(Object.class); + } + + @Test + void deserialize_returns_inner_JsonData_deserialize() { + when(innerJsonData.deserialize(jsonpDeserializer)).thenReturn(expectedDeserializedObject); + + assertThat(createObjectUnderTest().deserialize(jsonpDeserializer), equalTo(expectedDeserializedObject)); + } + + @Test + void deserialize_with_mapper_returns_inner_JsonData_deserialize() { + JsonpMapper jsonpMapper = mock(JsonpMapper.class); + when(innerJsonData.deserialize(jsonpDeserializer, jsonpMapper)).thenReturn(expectedDeserializedObject); + + assertThat(createObjectUnderTest().deserialize(jsonpDeserializer, jsonpMapper), equalTo(expectedDeserializedObject)); + } + } + + @Test + void serialize_calls_inner_JsonData_serialize() { + JsonGenerator generator = mock(JsonGenerator.class); + JsonpMapper mapper = mock(JsonpMapper.class); + + createObjectUnderTest().serialize(generator, mapper); + + verify(innerJsonData).serialize(generator, mapper); + } +} \ No newline at end of file