Improve OpenSearch sink performance within the opensearch-java client #1391

Merged
@@ -654,7 +654,14 @@ static class MultipleRecordTypeArgumentProvider implements ArgumentsProvider {
        @Override
        public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
            final ObjectMapper objectMapper = new ObjectMapper();
-           Function<String, Record> stringModel = jsonString -> new Record(jsonString);
+           Function<String, Record> stringModel = jsonString -> {
+               try {
+                   // Normalize the JSON string.
Review thread:

Collaborator: Why do we need the normalization?

Member Author: This is because of the assertions in the tests that assert the number of bytes written. The JSON is pretty-printed, so for Record<String> these are larger strings (and thus more bytes) than for Record<Event>. To make the later assertions simpler, I make all of the JSON non-pretty so it is the same byte size.
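For illustration, a minimal sketch of that round trip (the literal and byte counts below are example values, not from the PR):

```java
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Map;

class NormalizeExample {
    public static void main(String[] args) throws Exception {
        final ObjectMapper objectMapper = new ObjectMapper();
        // Pretty-printed input: 23 bytes, counting the newlines and indentation.
        final String pretty = "{\n  \"traceId\" : \"abc\"\n}";
        // Reading into a Map and writing back out drops the formatting.
        final String compact = objectMapper.writeValueAsString(objectMapper.readValue(pretty, Map.class));
        System.out.println(compact);          // {"traceId":"abc"}
        System.out.println(compact.length()); // 17 -- the same size the Event-based records produce
    }
}
```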

+                   return new Record(objectMapper.writeValueAsString(objectMapper.readValue(jsonString, Map.class)));
+               } catch (JsonProcessingException e) {
+                   throw new RuntimeException(e);
+               }
+           };
            Function<String, Record> eventModel = jsonString -> {
                try {
                    return new Record(JacksonEvent.builder().withEventType(EventType.TRACE.toString()).withData(objectMapper.readValue(jsonString, Map.class)).build());

==== OpenSearchSink.java ====

@@ -14,17 +14,16 @@
import com.amazon.dataprepper.plugins.sink.opensearch.bulk.AccumulatingBulkRequest;
import com.amazon.dataprepper.plugins.sink.opensearch.bulk.BulkOperationWriter;
import com.amazon.dataprepper.plugins.sink.opensearch.bulk.JavaClientAccumulatingBulkRequest;
-import com.amazon.dataprepper.plugins.sink.opensearch.bulk.SizedJsonData;
+import com.amazon.dataprepper.plugins.sink.opensearch.bulk.PreSerializedJsonpMapper;
+import com.amazon.dataprepper.plugins.sink.opensearch.bulk.SerializedJson;
import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexManager;
import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexManagerFactory;
import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexType;
+import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Timer;
-import jakarta.json.JsonObject;
import org.opensearch.client.RestHighLevelClient;
-import org.opensearch.client.json.JsonData;
-import org.opensearch.client.json.jackson.JacksonJsonpMapper;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch.core.BulkRequest;
import org.opensearch.client.opensearch.core.bulk.BulkOperation;

@@ -73,6 +72,7 @@ public class OpenSearchSink extends AbstractSink<Record<Object>> {
    private final Counter bulkRequestErrorsCounter;
    private final DistributionSummary bulkRequestSizeBytesSummary;
    private OpenSearchClient openSearchClient;
+   private ObjectMapper objectMapper;

    public OpenSearchSink(final PluginSetting pluginSetting) {
        super(pluginSetting);
Expand Down Expand Up @@ -110,7 +110,8 @@ public void initialize() throws IOException {
        }
        indexManager.checkAndCreateIndex();

-       OpenSearchTransport transport = new RestClientTransport(restHighLevelClient.getLowLevelClient(), new JacksonJsonpMapper());
+
+       OpenSearchTransport transport = new RestClientTransport(restHighLevelClient.getLowLevelClient(), new PreSerializedJsonpMapper());
        openSearchClient = new OpenSearchClient(transport);
        bulkRequestSupplier = () -> new JavaClientAccumulatingBulkRequest(new BulkRequest.Builder().index(indexManager.getIndexAlias()));
        bulkRetryStrategy = new BulkRetryStrategy(

@@ -119,6 +120,8 @@ public void initialize() throws IOException {
                pluginMetrics,
                bulkRequestSupplier);
        LOG.info("Initialized OpenSearch sink");
+
+       objectMapper = new ObjectMapper();
    }

@Override

@@ -127,17 +130,25 @@ public void doOutput(final Collection<Record<Object>> records) {
            return;
        }

+
+
        AccumulatingBulkRequest<BulkOperation, BulkRequest> bulkRequest = bulkRequestSupplier.get();
        for (final Record<Object> record : records) {
-           final JsonData document = getDocument(record.getData());
+           final SerializedJson document = getDocument(record.getData());

            final IndexOperation.Builder<Object> indexOperationBuilder = new IndexOperation.Builder<>()
                    .index(indexManager.getIndexAlias())
                    .document(document);

-           final JsonObject jsonObject = document.toJson().asJsonObject();
-           if(jsonObject != null) {
-               final String docId = (String) jsonObject.getString(documentIdField, null);
+
+           final Map documentAsMap;
+           try {
+               documentAsMap = objectMapper.readValue(document.getSerializedJson(), Map.class);
+           } catch (IOException e) {
+               throw new RuntimeException(e);
+           }
+           if(documentAsMap != null) {
+               final String docId = (String) documentAsMap.get(documentIdField);
                if (docId != null) {
                    indexOperationBuilder.id(docId);
                }

@@ -163,7 +174,7 @@ public void doOutput(final Collection<Record<Object>> records) {

    // Temporary function to support both trace and log ingestion pipelines.
    // TODO: This function should be removed with the completion of: https://github.com/opensearch-project/data-prepper/issues/546
-   private JsonData getDocument(final Object object) {
+   private SerializedJson getDocument(final Object object) {
        final String jsonString;
        if (object instanceof String) {
            jsonString = (String) object;

@@ -174,7 +185,7 @@ private JsonData getDocument(final Object object) {
throw new RuntimeException("Invalid record type. OpenSearch sink only supports String and Events");
}

return SizedJsonData.fromString(jsonString, openSearchClient._transport().jsonpMapper());
return SerializedJson.fromString(jsonString);
}

private void flushBatch(AccumulatingBulkRequest accumulatingBulkRequest) {

==== BulkOperationWriter.java ====

@@ -32,7 +32,6 @@

package com.amazon.dataprepper.plugins.sink.opensearch.bulk;

-import org.opensearch.client.json.JsonData;
import org.opensearch.client.opensearch.core.bulk.BulkOperation;
import org.opensearch.common.unit.ByteSizeValue;


@@ -64,8 +63,8 @@ public static String bulkOperationToString(BulkOperation bulkOperation) {
    }

    private static String extractDocumentSource(BulkOperation bulkOperation) {
-       final JsonData document = (JsonData) bulkOperation.index().document();
+       final SerializedJson document = (SerializedJson) bulkOperation.index().document();

-       return document.toJson().toString();
+       return new String(document.getSerializedJson());
    }
}

==== JavaClientAccumulatingBulkRequest.java ====

@@ -81,11 +81,11 @@ private long estimateBulkOperationSize(BulkOperation bulkOperation) {
        if (anyDocument == null)
            return OPERATION_OVERHEAD;

-       if (!(anyDocument instanceof SizedJsonData)) {
-           throw new IllegalArgumentException("Only SizedJsonData is permitted for accumulating bulk requests. " + bulkOperation);
+       if (!(anyDocument instanceof SizedDocument)) {
+           throw new IllegalArgumentException("Only SizedDocument is permitted for accumulating bulk requests. " + bulkOperation);
        }

-       SizedJsonData sizedDocument = (SizedJsonData) anyDocument;
+       SizedDocument sizedDocument = (SizedDocument) anyDocument;

        final long documentLength = sizedDocument.getDocumentSize();
        return documentLength + OPERATION_OVERHEAD;

==== PreSerializedJsonpMapper.java (new file) ====

@@ -0,0 +1,84 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package com.amazon.dataprepper.plugins.sink.opensearch.bulk;

import jakarta.json.spi.JsonProvider;
import jakarta.json.stream.JsonGenerator;
import jakarta.json.stream.JsonParser;
import org.opensearch.client.json.JsonpMapper;
import org.opensearch.client.json.jackson.JacksonJsonProvider;
import org.opensearch.client.json.jackson.JacksonJsonpGenerator;
import org.opensearch.client.json.jackson.JacksonJsonpMapper;

import java.io.IOException;
import java.io.OutputStream;

/**
 * An implementation of the opensearch-java client's {@link JsonpMapper}. It can avoid duplicate
 * serialization by use of the {@link SerializedJson} interface. For values that implement
 * {@link SerializedJson}, it will not re-serialize them. For other values, it uses Jackson
 * serialization via the {@link JacksonJsonpMapper}.
 */
public class PreSerializedJsonpMapper implements JsonpMapper {

    private final JsonpMapper innerMapper;
    private final PreSerializedJsonProvider jsonProvider;

    public PreSerializedJsonpMapper() {
        innerMapper = new JacksonJsonpMapper();
        jsonProvider = new PreSerializedJsonProvider();
    }

    @Override
    public JsonProvider jsonProvider() {
        return jsonProvider;
    }

    @Override
    public <T> T deserialize(JsonParser parser, Class<T> clazz) {
        return innerMapper.deserialize(parser, clazz);
    }

    @Override
    public <T> void serialize(T value, JsonGenerator generator) {
        if (value instanceof SerializedJson) {
            if (!(generator instanceof PreSerializedJsonGenerator))
                throw new IllegalArgumentException("Unsupported JsonGenerator");

            final OutputStream outputStream = ((PreSerializedJsonGenerator) generator).outputStream;

            try {
                // Write the pre-serialized bytes directly, skipping Jackson serialization.
                outputStream.write(((SerializedJson) value).getSerializedJson());
            } catch (IOException e) {
                throw new RuntimeException(e);
            }

        } else {
            innerMapper.serialize(value, generator);
        }
    }

    private static class PreSerializedJsonGenerator extends JacksonJsonpGenerator {
        private OutputStream outputStream;

        public PreSerializedJsonGenerator(com.fasterxml.jackson.core.JsonGenerator generator, OutputStream outputStream) {
            super(generator);
            this.outputStream = outputStream;
        }
    }

    private static class PreSerializedJsonProvider extends JacksonJsonProvider {
        @Override
        public JsonGenerator createGenerator(OutputStream out) {
            try {
                // Capture the raw output stream so serialize() can write bytes straight to it.
                return new PreSerializedJsonGenerator(jacksonJsonFactory().createGenerator(out), out);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}
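For orientation, a minimal sketch of wiring this mapper into a client, mirroring the initialize() change above (not part of the PR; the host, port, and index name are hypothetical):

```java
package com.amazon.dataprepper.plugins.sink.opensearch.bulk;

import org.apache.http.HttpHost;
import org.opensearch.client.RestClient;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch.core.IndexRequest;
import org.opensearch.client.transport.OpenSearchTransport;
import org.opensearch.client.transport.rest_client.RestClientTransport;

class PreSerializedMapperSketch {
    public static void main(String[] args) throws Exception {
        final RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200)).build();
        final OpenSearchTransport transport = new RestClientTransport(restClient, new PreSerializedJsonpMapper());
        final OpenSearchClient client = new OpenSearchClient(transport);

        // Because the document implements SerializedJson, serialize() copies these
        // exact bytes into the request body; no Jackson re-serialization occurs.
        final SerializedJson document = SerializedJson.fromString("{\"status\":\"ok\"}");
        client.index(new IndexRequest.Builder<SerializedJson>()
                .index("my-index")
                .document(document)
                .build());

        restClient.close();
    }
}
```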

==== SerializedJson.java (new file) ====

@@ -0,0 +1,28 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package com.amazon.dataprepper.plugins.sink.opensearch.bulk;

import java.nio.charset.StandardCharsets;
import java.util.Objects;

/**
 * Represents JSON which is already serialized for use in the {@link PreSerializedJsonpMapper}.
 */
public interface SerializedJson extends SizedDocument {

    byte[] getSerializedJson();

    /**
     * Creates a new {@link SerializedJson} from a JSON string.
     *
     * @param jsonString The serialized JSON string which forms this JSON data.
     * @return A new {@link SerializedJson}.
     */
    static SerializedJson fromString(String jsonString) {
        Objects.requireNonNull(jsonString);
        return new SerializedJsonImpl(jsonString.getBytes(StandardCharsets.UTF_8));
    }
}
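A small usage sketch (not from the PR) showing that the reported document size is the UTF-8 byte length rather than the character count:

```java
package com.amazon.dataprepper.plugins.sink.opensearch.bulk;

import java.nio.charset.StandardCharsets;

class SerializedJsonSketch {
    public static void main(String[] args) {
        final SerializedJson json = SerializedJson.fromString("{\"name\":\"café\"}");
        // The string is 15 characters, but the é encodes to two bytes in UTF-8.
        System.out.println(json.getDocumentSize()); // prints 16
        System.out.println(new String(json.getSerializedJson(), StandardCharsets.UTF_8).length()); // prints 15
    }
}
```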

==== SerializedJsonImpl.java (new file) ====

@@ -0,0 +1,24 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package com.amazon.dataprepper.plugins.sink.opensearch.bulk;

class SerializedJsonImpl implements SerializedJson {
    private byte[] document;

    public SerializedJsonImpl(final byte[] document) {
        this.document = document;
    }

    @Override
    public long getDocumentSize() {
        return document.length;
    }

    @Override
    public byte[] getSerializedJson() {
        return document;
    }
}

==== SizedDocument.java (new file) ====

@@ -0,0 +1,18 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package com.amazon.dataprepper.plugins.sink.opensearch.bulk;

/**
 * Represents an OpenSearch document with a serialized document size.
 */
public interface SizedDocument {
    /**
     * The size of this document.
     *
     * @return The document size in bytes
     */
    long getDocumentSize();
}
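A sketch of how the size estimation shown earlier consumes this interface (not from the PR; the real OPERATION_OVERHEAD constant is defined in the sink, so the value here is a placeholder):

```java
package com.amazon.dataprepper.plugins.sink.opensearch.bulk;

class SizeEstimationSketch {
    // Placeholder value; the sink defines the actual per-operation overhead.
    private static final long OPERATION_OVERHEAD = 50;

    // Mirrors the accumulating bulk request logic above: document bytes plus a
    // fixed overhead for the bulk action metadata.
    static long estimateBulkOperationSize(final SizedDocument document) {
        if (document == null) {
            return OPERATION_OVERHEAD;
        }
        return document.getDocumentSize() + OPERATION_OVERHEAD;
    }

    public static void main(String[] args) {
        final SerializedJson doc = SerializedJson.fromString("{\"a\":1}");
        System.out.println(estimateBulkOperationSize(doc)); // 7 + 50 = 57
    }
}
```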

==== Deleted file ====

This file was deleted.