From 2b894a1751242f049a7420a371b708ab620342d0 Mon Sep 17 00:00:00 2001 From: David Venable Date: Thu, 12 May 2022 20:22:56 -0500 Subject: [PATCH] Improve OpenSearch sink performance by creating a customer JsonpMapper which avoids re-serializing bulk documents. Signed-off-by: David Venable --- .../sink/opensearch/OpenSearchSinkIT.java | 9 +- .../sink/opensearch/OpenSearchSink.java | 33 +++-- .../opensearch/bulk/BulkOperationWriter.java | 5 +- .../JavaClientAccumulatingBulkRequest.java | 6 +- .../bulk/PreSerializedJsonpMapper.java | 84 +++++++++++ .../sink/opensearch/bulk/SerializedJson.java | 28 ++++ .../opensearch/bulk/SerializedJsonImpl.java | 24 ++++ .../sink/opensearch/bulk/SizedDocument.java | 18 +++ .../sink/opensearch/bulk/SizedJsonData.java | 41 ------ .../opensearch/bulk/SizedJsonDataImpl.java | 62 -------- .../opensearch/BulkRetryStrategyTests.java | 40 +++--- ...JavaClientAccumulatingBulkRequestTest.java | 15 +- .../bulk/PreSerializedJsonpMapperTest.java | 114 +++++++++++++++ .../bulk/SerializedJsonImplTest.java | 42 ++++++ .../opensearch/bulk/SerializedJsonTest.java | 24 ++++ .../bulk/SizedJsonDataImplTest.java | 134 ------------------ 16 files changed, 395 insertions(+), 284 deletions(-) create mode 100644 data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/PreSerializedJsonpMapper.java create mode 100644 data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SerializedJson.java create mode 100644 data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonImpl.java create mode 100644 data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SizedDocument.java delete mode 100644 data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SizedJsonData.java delete mode 100644 data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SizedJsonDataImpl.java create mode 100644 data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/PreSerializedJsonpMapperTest.java create mode 100644 data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonImplTest.java create mode 100644 data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonTest.java delete mode 100644 data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SizedJsonDataImplTest.java diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/com/amazon/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/com/amazon/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java index 16569f4cd2..a2384c450e 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/com/amazon/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/com/amazon/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java @@ -654,7 +654,14 @@ static class MultipleRecordTypeArgumentProvider implements ArgumentsProvider { @Override public Stream provideArguments(final ExtensionContext context) { final ObjectMapper objectMapper = new ObjectMapper(); - Function stringModel = jsonString -> new Record(jsonString); + Function stringModel = jsonString -> { + try { + // Normalize the JSON string. + return new Record(objectMapper.writeValueAsString(objectMapper.readValue(jsonString, Map.class))); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + }; Function eventModel = jsonString -> { try { return new Record(JacksonEvent.builder().withEventType(EventType.TRACE.toString()).withData(objectMapper.readValue(jsonString, Map.class)).build()); diff --git a/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index 5b8c0f5f8c..2d5edb132b 100644 --- a/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -14,17 +14,16 @@ import com.amazon.dataprepper.plugins.sink.opensearch.bulk.AccumulatingBulkRequest; import com.amazon.dataprepper.plugins.sink.opensearch.bulk.BulkOperationWriter; import com.amazon.dataprepper.plugins.sink.opensearch.bulk.JavaClientAccumulatingBulkRequest; -import com.amazon.dataprepper.plugins.sink.opensearch.bulk.SizedJsonData; +import com.amazon.dataprepper.plugins.sink.opensearch.bulk.PreSerializedJsonpMapper; +import com.amazon.dataprepper.plugins.sink.opensearch.bulk.SerializedJson; import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexManager; import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexManagerFactory; import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexType; +import com.fasterxml.jackson.databind.ObjectMapper; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Timer; -import jakarta.json.JsonObject; import org.opensearch.client.RestHighLevelClient; -import org.opensearch.client.json.JsonData; -import org.opensearch.client.json.jackson.JacksonJsonpMapper; import org.opensearch.client.opensearch.OpenSearchClient; import org.opensearch.client.opensearch.core.BulkRequest; import org.opensearch.client.opensearch.core.bulk.BulkOperation; @@ -73,6 +72,7 @@ public class OpenSearchSink extends AbstractSink> { private final Counter bulkRequestErrorsCounter; private final DistributionSummary bulkRequestSizeBytesSummary; private OpenSearchClient openSearchClient; + private ObjectMapper objectMapper; public OpenSearchSink(final PluginSetting pluginSetting) { super(pluginSetting); @@ -110,7 +110,8 @@ public void initialize() throws IOException { } indexManager.checkAndCreateIndex(); - OpenSearchTransport transport = new RestClientTransport(restHighLevelClient.getLowLevelClient(), new JacksonJsonpMapper()); + + OpenSearchTransport transport = new RestClientTransport(restHighLevelClient.getLowLevelClient(), new PreSerializedJsonpMapper()); openSearchClient = new OpenSearchClient(transport); bulkRequestSupplier = () -> new JavaClientAccumulatingBulkRequest(new BulkRequest.Builder().index(indexManager.getIndexAlias())); bulkRetryStrategy = new BulkRetryStrategy( @@ -119,6 +120,8 @@ public void initialize() throws IOException { pluginMetrics, bulkRequestSupplier); LOG.info("Initialized OpenSearch sink"); + + objectMapper = new ObjectMapper(); } @Override @@ -127,17 +130,25 @@ public void doOutput(final Collection> records) { return; } + + AccumulatingBulkRequest bulkRequest = bulkRequestSupplier.get(); for (final Record record : records) { - final JsonData document = getDocument(record.getData()); + final SerializedJson document = getDocument(record.getData()); final IndexOperation.Builder indexOperationBuilder = new IndexOperation.Builder<>() .index(indexManager.getIndexAlias()) .document(document); - final JsonObject jsonObject = document.toJson().asJsonObject(); - if(jsonObject != null) { - final String docId = (String) jsonObject.getString(documentIdField, null); + + final Map documentAsMap; + try { + documentAsMap = objectMapper.readValue(document.getSerializedJson(), Map.class); + } catch (IOException e) { + throw new RuntimeException(e); + } + if(documentAsMap != null) { + final String docId = (String) documentAsMap.get(documentIdField); if (docId != null) { indexOperationBuilder.id(docId); } @@ -163,7 +174,7 @@ public void doOutput(final Collection> records) { // Temporary function to support both trace and log ingestion pipelines. // TODO: This function should be removed with the completion of: https://github.com/opensearch-project/data-prepper/issues/546 - private JsonData getDocument(final Object object) { + private SerializedJson getDocument(final Object object) { final String jsonString; if (object instanceof String) { jsonString = (String) object; @@ -174,7 +185,7 @@ private JsonData getDocument(final Object object) { throw new RuntimeException("Invalid record type. OpenSearch sink only supports String and Events"); } - return SizedJsonData.fromString(jsonString, openSearchClient._transport().jsonpMapper()); + return SerializedJson.fromString(jsonString); } private void flushBatch(AccumulatingBulkRequest accumulatingBulkRequest) { diff --git a/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/BulkOperationWriter.java b/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/BulkOperationWriter.java index 354b51ae9e..66f26b6ac9 100644 --- a/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/BulkOperationWriter.java +++ b/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/BulkOperationWriter.java @@ -32,7 +32,6 @@ package com.amazon.dataprepper.plugins.sink.opensearch.bulk; -import org.opensearch.client.json.JsonData; import org.opensearch.client.opensearch.core.bulk.BulkOperation; import org.opensearch.common.unit.ByteSizeValue; @@ -64,8 +63,8 @@ public static String bulkOperationToString(BulkOperation bulkOperation) { } private static String extractDocumentSource(BulkOperation bulkOperation) { - final JsonData document = (JsonData) bulkOperation.index().document(); + final SerializedJson document = (SerializedJson) bulkOperation.index().document(); - return document.toJson().toString(); + return new String(document.getSerializedJson()); } } diff --git a/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingBulkRequest.java b/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingBulkRequest.java index f0e3ff8097..881860fe9d 100644 --- a/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingBulkRequest.java +++ b/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingBulkRequest.java @@ -81,11 +81,11 @@ private long estimateBulkOperationSize(BulkOperation bulkOperation) { if (anyDocument == null) return OPERATION_OVERHEAD; - if (!(anyDocument instanceof SizedJsonData)) { - throw new IllegalArgumentException("Only SizedJsonData is permitted for accumulating bulk requests. " + bulkOperation); + if (!(anyDocument instanceof SizedDocument)) { + throw new IllegalArgumentException("Only SizedDocument is permitted for accumulating bulk requests. " + bulkOperation); } - SizedJsonData sizedDocument = (SizedJsonData) anyDocument; + SizedDocument sizedDocument = (SizedDocument) anyDocument; final long documentLength = sizedDocument.getDocumentSize(); return documentLength + OPERATION_OVERHEAD; diff --git a/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/PreSerializedJsonpMapper.java b/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/PreSerializedJsonpMapper.java new file mode 100644 index 0000000000..11ed9f3bef --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/PreSerializedJsonpMapper.java @@ -0,0 +1,84 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.sink.opensearch.bulk; + +import jakarta.json.spi.JsonProvider; +import jakarta.json.stream.JsonGenerator; +import jakarta.json.stream.JsonParser; +import org.opensearch.client.json.JsonpMapper; +import org.opensearch.client.json.jackson.JacksonJsonProvider; +import org.opensearch.client.json.jackson.JacksonJsonpGenerator; +import org.opensearch.client.json.jackson.JacksonJsonpMapper; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * An implementation of the opensearch-java client's {@link JsonpMapper}. It can avoid duplicate + * serialization by use of the {@link SerializedJson} interface. For values that inherit from + * {@link SerializedJson}, it will not re-serialize these. For other values, it uses Jackson + * serialization via the {@link JacksonJsonpMapper}. + */ +public class PreSerializedJsonpMapper implements JsonpMapper { + + private final JsonpMapper innerMapper; + private final PreSerializedJsonProvider jsonProvider; + + public PreSerializedJsonpMapper() { + innerMapper = new JacksonJsonpMapper(); + jsonProvider = new PreSerializedJsonProvider(); + } + + @Override + public JsonProvider jsonProvider() { + return jsonProvider; + } + + @Override + public T deserialize(JsonParser parser, Class clazz) { + return innerMapper.deserialize(parser, clazz); + } + + @Override + public void serialize(T value, JsonGenerator generator) { + if(value instanceof SerializedJson) { + if (! (generator instanceof PreSerializedJsonGenerator)) + throw new IllegalArgumentException("Unsupported JsonGenerator"); + + final OutputStream outputStream = ((PreSerializedJsonGenerator) generator).outputStream; + + try { + outputStream.write(((SerializedJson) value).getSerializedJson()); + } catch (IOException e) { + throw new RuntimeException(e); + } + + } else { + innerMapper.serialize(value, generator); + } + } + + private static class PreSerializedJsonGenerator extends JacksonJsonpGenerator { + private OutputStream outputStream; + + public PreSerializedJsonGenerator(com.fasterxml.jackson.core.JsonGenerator generator, OutputStream outputStream) { + super(generator); + this.outputStream = outputStream; + } + } + + + private static class PreSerializedJsonProvider extends JacksonJsonProvider { + @Override + public JsonGenerator createGenerator(OutputStream out) { + try { + return new PreSerializedJsonGenerator(jacksonJsonFactory().createGenerator(out), out); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } +} diff --git a/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SerializedJson.java b/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SerializedJson.java new file mode 100644 index 0000000000..669f31e382 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SerializedJson.java @@ -0,0 +1,28 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.sink.opensearch.bulk; + +import java.nio.charset.StandardCharsets; +import java.util.Objects; + +/** + * Represents JSON which is already serialized for use in the {@link PreSerializedJsonpMapper}. + */ +public interface SerializedJson extends SizedDocument { + + byte[] getSerializedJson(); + + /** + * Creates a new {@link SerializedJson} from a JSON string. + * + * @param jsonString The serialized JSON string which forms this JSON data. + * @return A new {@link SerializedJson}. + */ + static SerializedJson fromString(String jsonString) { + Objects.requireNonNull(jsonString); + return new SerializedJsonImpl(jsonString.getBytes(StandardCharsets.UTF_8)); + } +} diff --git a/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonImpl.java b/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonImpl.java new file mode 100644 index 0000000000..6de815fb88 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonImpl.java @@ -0,0 +1,24 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.sink.opensearch.bulk; + +class SerializedJsonImpl implements SerializedJson { + private byte[] document; + + public SerializedJsonImpl(final byte[] document) { + this.document = document; + } + + @Override + public long getDocumentSize() { + return document.length; + } + + @Override + public byte[] getSerializedJson() { + return document; + } +} diff --git a/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SizedDocument.java b/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SizedDocument.java new file mode 100644 index 0000000000..c011bee310 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SizedDocument.java @@ -0,0 +1,18 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.sink.opensearch.bulk; + +/** + * Represents an OpenSearch document with a serialized document size. + */ +public interface SizedDocument { + /** + * The size of this document. + * + * @return The document size in bytes + */ + long getDocumentSize(); +} diff --git a/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SizedJsonData.java b/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SizedJsonData.java deleted file mode 100644 index dcedb39112..0000000000 --- a/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SizedJsonData.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package com.amazon.dataprepper.plugins.sink.opensearch.bulk; - -import jakarta.json.spi.JsonProvider; -import org.opensearch.client.json.JsonData; -import org.opensearch.client.json.JsonpMapper; - -import java.io.StringReader; - -/** - * Extends the {@link JsonData} interface from the opensearch-java client with - * the addition of having a document size. - */ -public interface SizedJsonData extends JsonData { - /** - * The size of the document represented by this {@link JsonData}. - * - * @return The document size in bytes - */ - long getDocumentSize(); - - /** - * Creates a new {@link SizedJsonData} from a JSON string. - * - * @param jsonString The serialized JSON string which forms this JSON data. - * @param jsonpMapper The {@link JsonpMapper} to use for mapping. - * @return A new {@link SizedJsonData}. - */ - static SizedJsonData fromString(String jsonString, JsonpMapper jsonpMapper) { - JsonProvider jsonProvider = jsonpMapper.jsonProvider(); - final JsonData jsonData = JsonData.from(jsonProvider.createParser(new StringReader(jsonString)), jsonpMapper); - - final String serializedJsonLength = jsonData.toJson().toString(); - - return new SizedJsonDataImpl(jsonData, serializedJsonLength.length()); - } -} diff --git a/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SizedJsonDataImpl.java b/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SizedJsonDataImpl.java deleted file mode 100644 index 210971e155..0000000000 --- a/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SizedJsonDataImpl.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package com.amazon.dataprepper.plugins.sink.opensearch.bulk; - -import jakarta.json.JsonValue; -import jakarta.json.stream.JsonGenerator; -import org.opensearch.client.json.JsonData; -import org.opensearch.client.json.JsonpDeserializer; -import org.opensearch.client.json.JsonpMapper; - -class SizedJsonDataImpl implements SizedJsonData { - private final JsonData innerJsonData; - private final long documentSize; - - public SizedJsonDataImpl(final JsonData innerJsonData, final long documentSize) { - this.innerJsonData = innerJsonData; - this.documentSize = documentSize; - } - - @Override - public long getDocumentSize() { - return documentSize; - } - - @Override - public JsonValue toJson() { - return innerJsonData.toJson(); - } - - @Override - public JsonValue toJson(JsonpMapper mapper) { - return innerJsonData.toJson(mapper); - } - - @Override - public T to(Class clazz) { - return innerJsonData.to(clazz); - } - - @Override - public T to(Class clazz, JsonpMapper mapper) { - return innerJsonData.to(clazz, mapper); - } - - @Override - public T deserialize(JsonpDeserializer deserializer) { - return innerJsonData.deserialize(deserializer); - } - - @Override - public T deserialize(JsonpDeserializer deserializer, JsonpMapper mapper) { - return innerJsonData.deserialize(deserializer, mapper); - } - - @Override - public void serialize(JsonGenerator generator, JsonpMapper mapper) { - innerJsonData.serialize(generator, mapper); - } -} diff --git a/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java b/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java index 65c8519346..60d8500950 100644 --- a/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java @@ -11,15 +11,13 @@ import com.amazon.dataprepper.model.configuration.PluginSetting; import com.amazon.dataprepper.plugins.sink.opensearch.bulk.AccumulatingBulkRequest; import com.amazon.dataprepper.plugins.sink.opensearch.bulk.JavaClientAccumulatingBulkRequest; -import com.amazon.dataprepper.plugins.sink.opensearch.bulk.SizedJsonData; +import com.amazon.dataprepper.plugins.sink.opensearch.bulk.SerializedJson; import io.micrometer.core.instrument.Measurement; import org.hamcrest.MatcherAssert; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.opensearch.OpenSearchException; -import org.opensearch.client.json.JsonData; -import org.opensearch.client.json.jackson.JacksonJsonpMapper; import org.opensearch.client.opensearch._types.ErrorCause; import org.opensearch.client.opensearch.core.BulkRequest; import org.opensearch.client.opensearch.core.BulkResponse; @@ -95,10 +93,10 @@ public void testExecuteSuccessOnFirstAttempt() throws Exception { final BulkRetryStrategy bulkRetryStrategy = new BulkRetryStrategy( client::bulk, logFailureConsumer, PLUGIN_METRICS, () -> new JavaClientAccumulatingBulkRequest(new BulkRequest.Builder())); - final IndexOperation indexOperation1 = new IndexOperation.Builder().index(testIndex).id("1").document(arbitraryDocument()).build(); - final IndexOperation indexOperation2 = new IndexOperation.Builder().index(testIndex).id("2").document(arbitraryDocument()).build(); - final IndexOperation indexOperation3 = new IndexOperation.Builder().index(testIndex).id("3").document(arbitraryDocument()).build(); - final IndexOperation indexOperation4 = new IndexOperation.Builder().index(testIndex).id("4").document(arbitraryDocument()).build(); + final IndexOperation indexOperation1 = new IndexOperation.Builder().index(testIndex).id("1").document(arbitraryDocument()).build(); + final IndexOperation indexOperation2 = new IndexOperation.Builder().index(testIndex).id("2").document(arbitraryDocument()).build(); + final IndexOperation indexOperation3 = new IndexOperation.Builder().index(testIndex).id("3").document(arbitraryDocument()).build(); + final IndexOperation indexOperation4 = new IndexOperation.Builder().index(testIndex).id("4").document(arbitraryDocument()).build(); final AccumulatingBulkRequest accumulatingBulkRequest = new JavaClientAccumulatingBulkRequest(new BulkRequest.Builder()); accumulatingBulkRequest.addOperation(new BulkOperation.Builder().index(indexOperation1).build()); accumulatingBulkRequest.addOperation(new BulkOperation.Builder().index(indexOperation2).build()); @@ -129,10 +127,10 @@ public void testExecuteRetryable() throws Exception { final BulkRetryStrategy bulkRetryStrategy = new BulkRetryStrategy( client::bulk, logFailureConsumer, PLUGIN_METRICS, () -> new JavaClientAccumulatingBulkRequest(new BulkRequest.Builder())); - final IndexOperation indexOperation1 = new IndexOperation.Builder().index(testIndex).id("1").document(arbitraryDocument()).build(); - final IndexOperation indexOperation2 = new IndexOperation.Builder().index(testIndex).id("2").document(arbitraryDocument()).build(); - final IndexOperation indexOperation3 = new IndexOperation.Builder().index(testIndex).id("3").document(arbitraryDocument()).build(); - final IndexOperation indexOperation4 = new IndexOperation.Builder().index(testIndex).id("4").document(arbitraryDocument()).build(); + final IndexOperation indexOperation1 = new IndexOperation.Builder().index(testIndex).id("1").document(arbitraryDocument()).build(); + final IndexOperation indexOperation2 = new IndexOperation.Builder().index(testIndex).id("2").document(arbitraryDocument()).build(); + final IndexOperation indexOperation3 = new IndexOperation.Builder().index(testIndex).id("3").document(arbitraryDocument()).build(); + final IndexOperation indexOperation4 = new IndexOperation.Builder().index(testIndex).id("4").document(arbitraryDocument()).build(); final AccumulatingBulkRequest accumulatingBulkRequest = new JavaClientAccumulatingBulkRequest(new BulkRequest.Builder()); accumulatingBulkRequest.addOperation(new BulkOperation.Builder().index(indexOperation1).build()); accumulatingBulkRequest.addOperation(new BulkOperation.Builder().index(indexOperation2).build()); @@ -181,10 +179,10 @@ public void testExecuteNonRetryableException() throws Exception { final BulkRetryStrategy bulkRetryStrategy = new BulkRetryStrategy( client::bulk, logFailureConsumer, PLUGIN_METRICS, () -> new JavaClientAccumulatingBulkRequest(new BulkRequest.Builder())); - final IndexOperation indexOperation1 = new IndexOperation.Builder().index(testIndex).id("1").document(arbitraryDocument()).build(); - final IndexOperation indexOperation2 = new IndexOperation.Builder().index(testIndex).id("2").document(arbitraryDocument()).build(); - final IndexOperation indexOperation3 = new IndexOperation.Builder().index(testIndex).id("3").document(arbitraryDocument()).build(); - final IndexOperation indexOperation4 = new IndexOperation.Builder().index(testIndex).id("4").document(arbitraryDocument()).build(); + final IndexOperation indexOperation1 = new IndexOperation.Builder().index(testIndex).id("1").document(arbitraryDocument()).build(); + final IndexOperation indexOperation2 = new IndexOperation.Builder().index(testIndex).id("2").document(arbitraryDocument()).build(); + final IndexOperation indexOperation3 = new IndexOperation.Builder().index(testIndex).id("3").document(arbitraryDocument()).build(); + final IndexOperation indexOperation4 = new IndexOperation.Builder().index(testIndex).id("4").document(arbitraryDocument()).build(); final AccumulatingBulkRequest accumulatingBulkRequest = new JavaClientAccumulatingBulkRequest(new BulkRequest.Builder()); accumulatingBulkRequest.addOperation(new BulkOperation.Builder().index(indexOperation1).build()); accumulatingBulkRequest.addOperation(new BulkOperation.Builder().index(indexOperation2).build()); @@ -229,10 +227,10 @@ public void testExecuteNonRetryableResponse() throws Exception { final BulkRetryStrategy bulkRetryStrategy = new BulkRetryStrategy( client::bulk, logFailureConsumer, PLUGIN_METRICS, () -> new JavaClientAccumulatingBulkRequest(new BulkRequest.Builder())); - final IndexOperation indexOperation1 = new IndexOperation.Builder().index(testIndex).id("1").document(arbitraryDocument()).build(); - final IndexOperation indexOperation2 = new IndexOperation.Builder().index(testIndex).id("2").document(arbitraryDocument()).build(); - final IndexOperation indexOperation3 = new IndexOperation.Builder().index(testIndex).id("3").document(arbitraryDocument()).build(); - final IndexOperation indexOperation4 = new IndexOperation.Builder().index(testIndex).id("4").document(arbitraryDocument()).build(); + final IndexOperation indexOperation1 = new IndexOperation.Builder().index(testIndex).id("1").document(arbitraryDocument()).build(); + final IndexOperation indexOperation2 = new IndexOperation.Builder().index(testIndex).id("2").document(arbitraryDocument()).build(); + final IndexOperation indexOperation3 = new IndexOperation.Builder().index(testIndex).id("3").document(arbitraryDocument()).build(); + final IndexOperation indexOperation4 = new IndexOperation.Builder().index(testIndex).id("4").document(arbitraryDocument()).build(); final AccumulatingBulkRequest accumulatingBulkRequest = new JavaClientAccumulatingBulkRequest(new BulkRequest.Builder()); accumulatingBulkRequest.addOperation(new BulkOperation.Builder().index(indexOperation1).build()); accumulatingBulkRequest.addOperation(new BulkOperation.Builder().index(indexOperation2).build()); @@ -293,8 +291,8 @@ private static BulkResponseItem customBulkFailureResponse(final String index, fi return badResponse; } - private JsonData arbitraryDocument() { - return SizedJsonData.fromString("{}", new JacksonJsonpMapper()); + private SerializedJson arbitraryDocument() { + return SerializedJson.fromString("{}"); } private static class FakeClient { diff --git a/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingBulkRequestTest.java b/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingBulkRequestTest.java index 8dbfbc065a..a46ac2ebda 100644 --- a/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingBulkRequestTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingBulkRequestTest.java @@ -9,7 +9,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import org.opensearch.client.json.JsonData; import org.opensearch.client.opensearch.core.BulkRequest; import org.opensearch.client.opensearch.core.bulk.BulkOperation; import org.opensearch.client.opensearch.core.bulk.IndexOperation; @@ -123,7 +122,7 @@ void getOperationAt_returns_the_correct_index() { @ParameterizedTest @ValueSource(longs = {0, 1, 2, 10, 50, 100}) void estimateSizeInBytesWithDocument_on_new_object_returns_estimated_document_size_plus_operation_overhead(long inputDocumentSize) { - final JsonData document = generateDocumentWithLength(inputDocumentSize); + final SizedDocument document = generateDocumentWithLength(inputDocumentSize); final BulkOperation bulkOperation = createBulkOperation(document); assertThat(createObjectUnderTest().estimateSizeInBytesWithDocument(bulkOperation), @@ -133,7 +132,7 @@ void estimateSizeInBytesWithDocument_on_new_object_returns_estimated_document_si @ParameterizedTest @ValueSource(longs = {0, 1, 2, 10, 50, 100}) void estimateSizeInBytesWithDocument_on_request_with_operations_returns_estimated_document_size_plus_operation_overhead(long inputDocumentSize) { - final JsonData document = generateDocumentWithLength(inputDocumentSize); + final SizedDocument document = generateDocumentWithLength(inputDocumentSize); final BulkOperation bulkOperation = createBulkOperation(document); final JavaClientAccumulatingBulkRequest objectUnderTest = createObjectUnderTest(); @@ -210,13 +209,13 @@ private BulkOperation createBulkOperation(Object document) { return bulkOperation; } - private JsonData generateDocument() { + private SizedDocument generateDocument() { return generateDocumentWithLength(10L); } - private JsonData generateDocumentWithLength(long documentLength) { - final SizedJsonData sizedJsonData = mock(SizedJsonData.class); - when(sizedJsonData.getDocumentSize()).thenReturn(documentLength); - return sizedJsonData; + private SizedDocument generateDocumentWithLength(long documentLength) { + final SizedDocument sizedDocument = mock(SizedDocument.class); + when(sizedDocument.getDocumentSize()).thenReturn(documentLength); + return sizedDocument; } } \ No newline at end of file diff --git a/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/PreSerializedJsonpMapperTest.java b/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/PreSerializedJsonpMapperTest.java new file mode 100644 index 0000000000..aafe37e9a6 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/PreSerializedJsonpMapperTest.java @@ -0,0 +1,114 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.sink.opensearch.bulk; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.ObjectMapper; +import jakarta.json.stream.JsonGenerator; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.opensearch.client.json.jackson.JacksonJsonProvider; +import org.opensearch.client.json.jackson.JacksonJsonpParser; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.Map; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasKey; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class PreSerializedJsonpMapperTest { + private PreSerializedJsonpMapper createObjectUnderTest() { + return new PreSerializedJsonpMapper(); + } + + @Test + void jsonProvider_returns_a_non_null_JsonProvider_which_is_also_a_JacksonJsonProvider() { + final PreSerializedJsonpMapper objectUnderTest = createObjectUnderTest(); + + assertThat(objectUnderTest.jsonProvider(), notNullValue()); + assertThat(objectUnderTest.jsonProvider(), instanceOf(JacksonJsonProvider.class)); + } + + @Test + void deserialize_is_able_to_create_objects() throws IOException { + final JsonParser jsonParser = JsonFactory.builder().build().createParser("{\"a\":\"b\"}"); + final JacksonJsonpParser jacksonJsonpParser = new JacksonJsonpParser(jsonParser); + final Map deserializedMap = createObjectUnderTest().deserialize(jacksonJsonpParser, Map.class); + + assertThat(deserializedMap, notNullValue()); + assertThat(deserializedMap, hasKey("a")); + assertThat(deserializedMap.get("a"), equalTo("b")); + } + + @Nested + class WithSerializedJson { + + private byte[] documentBytes; + private SerializedJson serializedJson; + + @BeforeEach + void setUp() { + final String notActuallyJsonString = UUID.randomUUID().toString(); + documentBytes = notActuallyJsonString.getBytes(StandardCharsets.UTF_8); + serializedJson = mock(SerializedJson.class); + when(serializedJson.getSerializedJson()).thenReturn(documentBytes); + } + + @Test + void serialize_on_SerializedJson_writes_directly_to_that_outputStream() throws IOException { + final OutputStream outputStream = mock(OutputStream.class); + + final PreSerializedJsonpMapper objectUnderTest = createObjectUnderTest(); + final JsonGenerator generator = objectUnderTest.jsonProvider().createGenerator(outputStream); + + objectUnderTest.serialize(serializedJson, generator); + + verify(outputStream).write(documentBytes); + } + + @Test + void serialize_on_SerializedJson_with_an_external_JsonGenerator_throws_exception() throws IOException { + final OutputStream outputStream = mock(OutputStream.class); + + final JsonGenerator generator = mock(JsonGenerator.class); + + final PreSerializedJsonpMapper objectUnderTest = createObjectUnderTest(); + + assertThrows(IllegalArgumentException.class, () -> objectUnderTest.serialize(serializedJson, generator)); + } + } + + @Test + void serialize_on_Map_uses_Jackson_serializer() throws IOException { + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + + final PreSerializedJsonpMapper objectUnderTest = createObjectUnderTest(); + final JsonGenerator generator = objectUnderTest.jsonProvider().createGenerator(outputStream); + + final Map document = Collections.singletonMap(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + + objectUnderTest.serialize(document, generator); + + final ObjectMapper objectMapper = new ObjectMapper(); + final String expectedSerializedJson = objectMapper.writeValueAsString(document); + + assertThat(new String(outputStream.toByteArray()), equalTo(expectedSerializedJson)); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonImplTest.java b/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonImplTest.java new file mode 100644 index 0000000000..cbbfe10aee --- /dev/null +++ b/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonImplTest.java @@ -0,0 +1,42 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.sink.opensearch.bulk; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Random; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.MatcherAssert.assertThat; + +class SerializedJsonImplTest { + private int documentSize; + private byte[] documentBytes; + + @BeforeEach + void setUp() { + Random random = new Random(); + documentSize = random.nextInt(1_000) + 100; + + documentBytes = new byte[documentSize]; + } + + private SerializedJsonImpl createObjectUnderTest() { + return new SerializedJsonImpl(documentBytes); + } + + @Test + void getDocumentSize_returns_size_of_the_document_byte_array() { + assertThat(createObjectUnderTest().getDocumentSize(), equalTo((long) documentSize)); + } + + @Test + void getSerializedJson_returns_the_document_byte_array() { + assertThat(createObjectUnderTest().getSerializedJson(), sameInstance(documentBytes)); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonTest.java b/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonTest.java new file mode 100644 index 0000000000..fd84581e64 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonTest.java @@ -0,0 +1,24 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.sink.opensearch.bulk; + +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class SerializedJsonTest { + @Test + void fromString_returns_SerializedJsonImpl() { + assertThat(SerializedJson.fromString("{}"), instanceOf(SerializedJsonImpl.class)); + } + + @Test + void fromString_throws_if_the_jsonString_is_null() { + assertThrows(NullPointerException.class, () -> SerializedJson.fromString(null)); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SizedJsonDataImplTest.java b/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SizedJsonDataImplTest.java deleted file mode 100644 index 96df204ff9..0000000000 --- a/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/bulk/SizedJsonDataImplTest.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package com.amazon.dataprepper.plugins.sink.opensearch.bulk; - -import jakarta.json.JsonValue; -import jakarta.json.stream.JsonGenerator; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Nested; -import org.junit.jupiter.api.Test; -import org.opensearch.client.json.JsonData; -import org.opensearch.client.json.JsonpDeserializer; -import org.opensearch.client.json.JsonpMapper; - -import java.util.Random; - -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -class SizedJsonDataImplTest { - private JsonData innerJsonData; - private long documentSize; - - @BeforeEach - void setUp() { - Random random = new Random(); - innerJsonData = mock(JsonData.class); - documentSize = random.nextInt(10_000) + 100; - } - - private SizedJsonDataImpl createObjectUnderTest() { - return new SizedJsonDataImpl(innerJsonData, documentSize); - } - - @Test - void getDocumentSize_returns_the_documentSize() { - assertThat(createObjectUnderTest().getDocumentSize(), equalTo(documentSize)); - } - - @Nested - class ToJson { - private JsonValue jsonValue; - - @BeforeEach - void setUp() { - jsonValue = mock(JsonValue.class); - } - - @Test - void toJson_returns_inner_JsonData_toJson() { - when(innerJsonData.toJson()).thenReturn(jsonValue); - - assertThat(createObjectUnderTest().toJson(), equalTo(jsonValue)); - } - - @Test - void toJson_with_mapper_returns_inner_JsonData_toJson() { - JsonpMapper jsonpMapper = mock(JsonpMapper.class); - when(innerJsonData.toJson(jsonpMapper)).thenReturn(jsonValue); - - assertThat(createObjectUnderTest().toJson(jsonpMapper), equalTo(jsonValue)); - } - } - - @Nested - class ToClass { - private Class toClass; - private Object expectedToObject; - - @BeforeEach - void setUp() { - toClass = String.class; - - expectedToObject = mock(Object.class); - } - - @Test - void to_returns_inner_JsonData_to() { - when(innerJsonData.to(toClass)).thenReturn(expectedToObject); - - assertThat(createObjectUnderTest().to(toClass), equalTo(expectedToObject)); - } - - @Test - void to_with_mapper_returns_inner_JsonData_to() { - JsonpMapper jsonpMapper = mock(JsonpMapper.class); - when(innerJsonData.to(toClass, jsonpMapper)).thenReturn(expectedToObject); - - assertThat(createObjectUnderTest().to(toClass, jsonpMapper), equalTo(expectedToObject)); - } - } - - @Nested - class Deserialize { - private JsonpDeserializer jsonpDeserializer; - private Object expectedDeserializedObject; - - @BeforeEach - void setUp() { - jsonpDeserializer = mock(JsonpDeserializer.class); - expectedDeserializedObject = mock(Object.class); - } - - @Test - void deserialize_returns_inner_JsonData_deserialize() { - when(innerJsonData.deserialize(jsonpDeserializer)).thenReturn(expectedDeserializedObject); - - assertThat(createObjectUnderTest().deserialize(jsonpDeserializer), equalTo(expectedDeserializedObject)); - } - - @Test - void deserialize_with_mapper_returns_inner_JsonData_deserialize() { - JsonpMapper jsonpMapper = mock(JsonpMapper.class); - when(innerJsonData.deserialize(jsonpDeserializer, jsonpMapper)).thenReturn(expectedDeserializedObject); - - assertThat(createObjectUnderTest().deserialize(jsonpDeserializer, jsonpMapper), equalTo(expectedDeserializedObject)); - } - } - - @Test - void serialize_calls_inner_JsonData_serialize() { - JsonGenerator generator = mock(JsonGenerator.class); - JsonpMapper mapper = mock(JsonpMapper.class); - - createObjectUnderTest().serialize(generator, mapper); - - verify(innerJsonData).serialize(generator, mapper); - } -} \ No newline at end of file