Commit 3158b45

Improve OpenSearch sink performance by creating a custom JsonpMapper which avoids re-serializing bulk documents. (#1391)

Signed-off-by: David Venable <[email protected]>
dlvenable authored May 13, 2022
1 parent ebc9c67 commit 3158b45

Showing 16 changed files with 395 additions and 284 deletions.
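The gist of the change, as a minimal sketch assembled from the diff below (the low-level REST client is assumed to be built elsewhere, as it is in the sink):

    OpenSearchTransport transport = new RestClientTransport(
            restHighLevelClient.getLowLevelClient(), new PreSerializedJsonpMapper());
    OpenSearchClient openSearchClient = new OpenSearchClient(transport);

    // A document entering the sink as a JSON string is wrapped once; the
    // custom mapper later writes these bytes verbatim into the bulk request
    // body instead of parsing and re-serializing the document.
    SerializedJson document = SerializedJson.fromString("{\"message\":\"hello\"}");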
==============================
(test class; file name not shown in this view)
@@ -654,7 +654,14 @@ static class MultipleRecordTypeArgumentProvider implements ArgumentsProvider {
         @Override
         public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
             final ObjectMapper objectMapper = new ObjectMapper();
-            Function<String, Record> stringModel = jsonString -> new Record(jsonString);
+            Function<String, Record> stringModel = jsonString -> {
+                try {
+                    // Normalize the JSON string.
+                    return new Record(objectMapper.writeValueAsString(objectMapper.readValue(jsonString, Map.class)));
+                } catch (JsonProcessingException e) {
+                    throw new RuntimeException(e);
+                }
+            };
             Function<String, Record> eventModel = jsonString -> {
                 try {
                     return new Record(JacksonEvent.builder().withEventType(EventType.TRACE.toString()).withData(objectMapper.readValue(jsonString, Map.class)).build());
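The string-model change above round-trips each JSON string through Jackson so the test works with a canonical rendering (insignificant whitespace dropped). A standalone illustration, assuming only Jackson 2.x on the classpath:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.Map;

    public class NormalizeExample {
        public static void main(String[] args) throws Exception {
            final ObjectMapper objectMapper = new ObjectMapper();
            final String input = "{ \"status\": 200,   \"message\": \"ok\" }";
            // Parse to a Map, then re-serialize: the result is Jackson's
            // compact form of the same document.
            final String normalized =
                    objectMapper.writeValueAsString(objectMapper.readValue(input, Map.class));
            System.out.println(normalized); // {"status":200,"message":"ok"}
        }
    }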
==============================
OpenSearchSink.java
@@ -14,17 +14,16 @@
 import com.amazon.dataprepper.plugins.sink.opensearch.bulk.AccumulatingBulkRequest;
 import com.amazon.dataprepper.plugins.sink.opensearch.bulk.BulkOperationWriter;
 import com.amazon.dataprepper.plugins.sink.opensearch.bulk.JavaClientAccumulatingBulkRequest;
-import com.amazon.dataprepper.plugins.sink.opensearch.bulk.SizedJsonData;
+import com.amazon.dataprepper.plugins.sink.opensearch.bulk.PreSerializedJsonpMapper;
+import com.amazon.dataprepper.plugins.sink.opensearch.bulk.SerializedJson;
 import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexManager;
 import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexManagerFactory;
 import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexType;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import io.micrometer.core.instrument.Counter;
 import io.micrometer.core.instrument.DistributionSummary;
 import io.micrometer.core.instrument.Timer;
-import jakarta.json.JsonObject;
 import org.opensearch.client.RestHighLevelClient;
-import org.opensearch.client.json.JsonData;
-import org.opensearch.client.json.jackson.JacksonJsonpMapper;
 import org.opensearch.client.opensearch.OpenSearchClient;
 import org.opensearch.client.opensearch.core.BulkRequest;
 import org.opensearch.client.opensearch.core.bulk.BulkOperation;
@@ -73,6 +72,7 @@ public class OpenSearchSink extends AbstractSink<Record<Object>> {
     private final Counter bulkRequestErrorsCounter;
     private final DistributionSummary bulkRequestSizeBytesSummary;
     private OpenSearchClient openSearchClient;
+    private ObjectMapper objectMapper;

     public OpenSearchSink(final PluginSetting pluginSetting) {
         super(pluginSetting);
Expand Down Expand Up @@ -110,7 +110,8 @@ public void initialize() throws IOException {
}
indexManager.checkAndCreateIndex();

OpenSearchTransport transport = new RestClientTransport(restHighLevelClient.getLowLevelClient(), new JacksonJsonpMapper());

OpenSearchTransport transport = new RestClientTransport(restHighLevelClient.getLowLevelClient(), new PreSerializedJsonpMapper());
openSearchClient = new OpenSearchClient(transport);
bulkRequestSupplier = () -> new JavaClientAccumulatingBulkRequest(new BulkRequest.Builder().index(indexManager.getIndexAlias()));
bulkRetryStrategy = new BulkRetryStrategy(
Expand All @@ -119,6 +120,8 @@ public void initialize() throws IOException {
pluginMetrics,
bulkRequestSupplier);
LOG.info("Initialized OpenSearch sink");

objectMapper = new ObjectMapper();
}

@Override
@@ -127,17 +130,25 @@ public void doOutput(final Collection<Record<Object>> records) {
             return;
         }

+
+
         AccumulatingBulkRequest<BulkOperation, BulkRequest> bulkRequest = bulkRequestSupplier.get();
         for (final Record<Object> record : records) {
-            final JsonData document = getDocument(record.getData());
+            final SerializedJson document = getDocument(record.getData());

             final IndexOperation.Builder<Object> indexOperationBuilder = new IndexOperation.Builder<>()
                     .index(indexManager.getIndexAlias())
                     .document(document);

-            final JsonObject jsonObject = document.toJson().asJsonObject();
-            if(jsonObject != null) {
-                final String docId = (String) jsonObject.getString(documentIdField, null);
+
+            final Map documentAsMap;
+            try {
+                documentAsMap = objectMapper.readValue(document.getSerializedJson(), Map.class);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+            if(documentAsMap != null) {
+                final String docId = (String) documentAsMap.get(documentIdField);
                 if (docId != null) {
                     indexOperationBuilder.id(docId);
                 }
@@ -163,7 +174,7 @@ public void doOutput(final Collection<Record<Object>> records) {

     // Temporary function to support both trace and log ingestion pipelines.
     // TODO: This function should be removed with the completion of: https://github.com/opensearch-project/data-prepper/issues/546
-    private JsonData getDocument(final Object object) {
+    private SerializedJson getDocument(final Object object) {
         final String jsonString;
         if (object instanceof String) {
             jsonString = (String) object;
@@ -174,7 +185,7 @@ private JsonData getDocument(final Object object) {
             throw new RuntimeException("Invalid record type. OpenSearch sink only supports String and Events");
         }

-        return SizedJsonData.fromString(jsonString, openSearchClient._transport().jsonpMapper());
+        return SerializedJson.fromString(jsonString);
     }

     private void flushBatch(AccumulatingBulkRequest accumulatingBulkRequest) {
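Note the trade-off in doOutput above: each document is still deserialized once (bytes to Map) to look up the optional document ID, but the client no longer re-serializes a parsed JSON tree when writing the bulk body. A standalone sketch of the lookup, with a hypothetical documentIdField value:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.nio.charset.StandardCharsets;
    import java.util.Map;

    public class DocIdLookupExample {
        public static void main(String[] args) throws Exception {
            final ObjectMapper objectMapper = new ObjectMapper();
            final String documentIdField = "spanId"; // hypothetical; configured on the sink
            final byte[] serializedJson = "{\"spanId\":\"abc123\",\"durationInNanos\":100}"
                    .getBytes(StandardCharsets.UTF_8);
            // Same approach as the sink: deserialize once, read the field.
            final Map documentAsMap = objectMapper.readValue(serializedJson, Map.class);
            final String docId = (String) documentAsMap.get(documentIdField);
            System.out.println(docId); // abc123
        }
    }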
==============================
BulkOperationWriter.java
@@ -32,7 +32,6 @@

 package com.amazon.dataprepper.plugins.sink.opensearch.bulk;

-import org.opensearch.client.json.JsonData;
 import org.opensearch.client.opensearch.core.bulk.BulkOperation;
 import org.opensearch.common.unit.ByteSizeValue;

@@ -64,8 +63,8 @@ public static String bulkOperationToString(BulkOperation bulkOperation) {
     }

     private static String extractDocumentSource(BulkOperation bulkOperation) {
-        final JsonData document = (JsonData) bulkOperation.index().document();
+        final SerializedJson document = (SerializedJson) bulkOperation.index().document();

-        return document.toJson().toString();
+        return new String(document.getSerializedJson());
     }
 }
==============================
JavaClientAccumulatingBulkRequest.java
@@ -81,11 +81,11 @@ private long estimateBulkOperationSize(BulkOperation bulkOperation) {
         if (anyDocument == null)
             return OPERATION_OVERHEAD;

-        if (!(anyDocument instanceof SizedJsonData)) {
-            throw new IllegalArgumentException("Only SizedJsonData is permitted for accumulating bulk requests. " + bulkOperation);
+        if (!(anyDocument instanceof SizedDocument)) {
+            throw new IllegalArgumentException("Only SizedDocument is permitted for accumulating bulk requests. " + bulkOperation);
         }

-        SizedJsonData sizedDocument = (SizedJsonData) anyDocument;
+        SizedDocument sizedDocument = (SizedDocument) anyDocument;

         final long documentLength = sizedDocument.getDocumentSize();
         return documentLength + OPERATION_OVERHEAD;
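For illustration, the estimate above is just the stored byte length plus a fixed per-operation overhead. A sketch, assuming a placeholder value for OPERATION_OVERHEAD (the real constant is defined elsewhere in this class and is not shown in this diff):

    public class SizeEstimateExample {
        private static final long OPERATION_OVERHEAD = 50; // placeholder, not the real value

        public static void main(String[] args) {
            final SerializedJson document = SerializedJson.fromString("{\"message\":\"hello\"}");
            // getDocumentSize() reports the UTF-8 byte length of the JSON.
            final long estimate = document.getDocumentSize() + OPERATION_OVERHEAD;
            System.out.println(estimate); // 19 + 50 = 69
        }
    }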
==============================
PreSerializedJsonpMapper.java (new file)
@@ -0,0 +1,84 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package com.amazon.dataprepper.plugins.sink.opensearch.bulk;

import jakarta.json.spi.JsonProvider;
import jakarta.json.stream.JsonGenerator;
import jakarta.json.stream.JsonParser;
import org.opensearch.client.json.JsonpMapper;
import org.opensearch.client.json.jackson.JacksonJsonProvider;
import org.opensearch.client.json.jackson.JacksonJsonpGenerator;
import org.opensearch.client.json.jackson.JacksonJsonpMapper;

import java.io.IOException;
import java.io.OutputStream;

/**
 * An implementation of the opensearch-java client's {@link JsonpMapper}. It can avoid duplicate
 * serialization by use of the {@link SerializedJson} interface. For values that inherit from
 * {@link SerializedJson}, it will not re-serialize these. For other values, it uses Jackson
 * serialization via the {@link JacksonJsonpMapper}.
 */
public class PreSerializedJsonpMapper implements JsonpMapper {

    private final JsonpMapper innerMapper;
    private final PreSerializedJsonProvider jsonProvider;

    public PreSerializedJsonpMapper() {
        innerMapper = new JacksonJsonpMapper();
        jsonProvider = new PreSerializedJsonProvider();
    }

    @Override
    public JsonProvider jsonProvider() {
        return jsonProvider;
    }

    @Override
    public <T> T deserialize(JsonParser parser, Class<T> clazz) {
        return innerMapper.deserialize(parser, clazz);
    }

    @Override
    public <T> void serialize(T value, JsonGenerator generator) {
        if(value instanceof SerializedJson) {
            if (! (generator instanceof PreSerializedJsonGenerator))
                throw new IllegalArgumentException("Unsupported JsonGenerator");

            final OutputStream outputStream = ((PreSerializedJsonGenerator) generator).outputStream;

            try {
                outputStream.write(((SerializedJson) value).getSerializedJson());
            } catch (IOException e) {
                throw new RuntimeException(e);
            }

        } else {
            innerMapper.serialize(value, generator);
        }
    }

    private static class PreSerializedJsonGenerator extends JacksonJsonpGenerator {
        private OutputStream outputStream;

        public PreSerializedJsonGenerator(com.fasterxml.jackson.core.JsonGenerator generator, OutputStream outputStream) {
            super(generator);
            this.outputStream = outputStream;
        }
    }


    private static class PreSerializedJsonProvider extends JacksonJsonProvider {
        @Override
        public JsonGenerator createGenerator(OutputStream out) {
            try {
                return new PreSerializedJsonGenerator(jacksonJsonFactory().createGenerator(out), out);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}
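A small sketch of the mapper's behavior, using only the classes added in this commit: a SerializedJson value bypasses Jackson entirely, so the bytes that come out are exactly the bytes that went in.

    import jakarta.json.stream.JsonGenerator;
    import java.io.ByteArrayOutputStream;
    import java.nio.charset.StandardCharsets;

    public class MapperExample {
        public static void main(String[] args) {
            final PreSerializedJsonpMapper mapper = new PreSerializedJsonpMapper();
            final ByteArrayOutputStream out = new ByteArrayOutputStream();
            // The provider returns a PreSerializedJsonGenerator that remembers `out`.
            final JsonGenerator generator = mapper.jsonProvider().createGenerator(out);

            final SerializedJson document = SerializedJson.fromString("{\"message\":\"hello\"}");
            mapper.serialize(document, generator); // writes the stored bytes as-is
            System.out.println(new String(out.toByteArray(), StandardCharsets.UTF_8));
            // prints: {"message":"hello"}
        }
    }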
==============================
SerializedJson.java (new file)
@@ -0,0 +1,28 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package com.amazon.dataprepper.plugins.sink.opensearch.bulk;

import java.nio.charset.StandardCharsets;
import java.util.Objects;

/**
 * Represents JSON which is already serialized for use in the {@link PreSerializedJsonpMapper}.
 */
public interface SerializedJson extends SizedDocument {

    byte[] getSerializedJson();

    /**
     * Creates a new {@link SerializedJson} from a JSON string.
     *
     * @param jsonString The serialized JSON string which forms this JSON data.
     * @return A new {@link SerializedJson}.
     */
    static SerializedJson fromString(String jsonString) {
        Objects.requireNonNull(jsonString);
        return new SerializedJsonImpl(jsonString.getBytes(StandardCharsets.UTF_8));
    }
}
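Usage is a one-liner; note that getDocumentSize() is the UTF-8 byte length of the original string (for pure-ASCII JSON the byte and character counts coincide):

    import java.nio.charset.StandardCharsets;

    public class SerializedJsonExample {
        public static void main(String[] args) {
            final SerializedJson document = SerializedJson.fromString("{\"status\":\"green\"}");
            System.out.println(document.getDocumentSize()); // 18
            System.out.println(new String(document.getSerializedJson(), StandardCharsets.UTF_8));
            // prints: {"status":"green"}
        }
    }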
==============================
SerializedJsonImpl.java (new file)
@@ -0,0 +1,24 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package com.amazon.dataprepper.plugins.sink.opensearch.bulk;

class SerializedJsonImpl implements SerializedJson {
    private byte[] document;

    public SerializedJsonImpl(final byte[] document) {
        this.document = document;
    }

    @Override
    public long getDocumentSize() {
        return document.length;
    }

    @Override
    public byte[] getSerializedJson() {
        return document;
    }
}
==============================
SizedDocument.java (new file)
@@ -0,0 +1,18 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package com.amazon.dataprepper.plugins.sink.opensearch.bulk;

/**
 * Represents an OpenSearch document with a serialized document size.
 */
public interface SizedDocument {
    /**
     * The size of this document.
     *
     * @return The document size in bytes
     */
    long getDocumentSize();
}

==============================
SizedJsonData.java
This file was deleted.
