Skip to content

Commit

Permalink
Opensearch Sink: refactor all index related operations into IndexMana…
Browse files Browse the repository at this point in the history
…ger classes for easier future extension

Signed-off-by: Han Jiang <[email protected]>
  • Loading branch information
jianghancn committed Oct 5, 2021
1 parent 3a669bf commit c4207e3
Show file tree
Hide file tree
Showing 20 changed files with 1,092 additions and 300 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,18 @@
import com.amazon.dataprepper.model.configuration.PluginSetting;
import com.amazon.dataprepper.model.record.Record;
import com.amazon.dataprepper.model.sink.AbstractSink;
import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexManager;
import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexManagerFactory;
import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexType;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import org.opensearch.OpenSearchException;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.admin.indices.alias.Alias;
import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.indices.GetIndexTemplatesRequest;
import org.opensearch.client.indices.GetIndexTemplatesResponse;
import org.opensearch.client.indices.IndexTemplateMetadata;
import org.opensearch.client.indices.IndexTemplatesExistRequest;
import org.opensearch.client.indices.PutIndexTemplateRequest;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.common.xcontent.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -48,7 +37,6 @@
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
Expand All @@ -61,29 +49,29 @@ public class OpenSearchSink extends AbstractSink<Record<String>> {
private static final Logger LOG = LoggerFactory.getLogger(OpenSearchSink.class);
// Pulled from BulkRequest to make estimation of bytes consistent
private static final int REQUEST_OVERHEAD = 50;
protected static final String INDEX_ALIAS_USED_AS_INDEX_ERROR = "Invalid alias name [%s], an index exists with the same name as the alias";

private BufferedWriter dlqWriter;
private final OpenSearchSinkConfiguration esSinkConfig;
private final OpenSearchSinkConfiguration openSearchSinkConfig;
private RestHighLevelClient restHighLevelClient;
private IndexManagerFactory indexManagerFactory;
private IndexManager indexManager;
private Supplier<BulkRequest> bulkRequestSupplier;
private BulkRetryStrategy bulkRetryStrategy;
private final long bulkSize;
private final String indexType;
private final IndexType indexType;
private final String documentIdField;

private final Timer bulkRequestTimer;
private final Counter bulkRequestErrorsCounter;

public OpenSearchSink(final PluginSetting pluginSetting) {
super(pluginSetting);
bulkRequestTimer = pluginMetrics.timer(BULKREQUEST_LATENCY);
bulkRequestErrorsCounter = pluginMetrics.counter(BULKREQUEST_ERRORS);

this.esSinkConfig = OpenSearchSinkConfiguration.readESConfig(pluginSetting);
this.bulkSize = ByteSizeUnit.MB.toBytes(esSinkConfig.getIndexConfiguration().getBulkSize());
this.indexType = esSinkConfig.getIndexConfiguration().getIndexType();
this.documentIdField = esSinkConfig.getIndexConfiguration().getDocumentIdField();
this.openSearchSinkConfig = OpenSearchSinkConfiguration.readESConfig(pluginSetting);
this.bulkSize = ByteSizeUnit.MB.toBytes(openSearchSinkConfig.getIndexConfiguration().getBulkSize());
this.indexType = openSearchSinkConfig.getIndexConfiguration().getIndexType();
this.documentIdField = openSearchSinkConfig.getIndexConfiguration().getDocumentIdField();
try {
initialize();
} catch (final IOException e) {
Expand All @@ -94,19 +82,21 @@ public OpenSearchSink(final PluginSetting pluginSetting) {

public void initialize() throws IOException {
LOG.info("Initializing OpenSearch sink");
restHighLevelClient = esSinkConfig.getConnectionConfiguration().createClient();
final boolean isISMEnabled = IndexStateManagement.checkISMEnabled(restHighLevelClient);
final Optional<String> policyIdOptional = isISMEnabled ? IndexStateManagement.checkAndCreatePolicy(restHighLevelClient, indexType) :
restHighLevelClient = openSearchSinkConfig.getConnectionConfiguration().createClient();
indexManagerFactory = new IndexManagerFactory();
indexManager = indexManagerFactory.getIndexManager(indexType, restHighLevelClient, openSearchSinkConfig);
final boolean isISMEnabled = indexManager.checkISMEnabled();
final Optional<String> policyIdOptional = isISMEnabled ? indexManager.checkAndCreatePolicy() :
Optional.empty();
if (!esSinkConfig.getIndexConfiguration().getIndexTemplate().isEmpty()) {
checkAndCreateIndexTemplate(isISMEnabled, policyIdOptional.orElse(null));
if (!openSearchSinkConfig.getIndexConfiguration().getIndexTemplate().isEmpty()) {
indexManager.checkAndCreateIndexTemplate(isISMEnabled, policyIdOptional.orElse(null));
}
final String dlqFile = esSinkConfig.getRetryConfiguration().getDlqFile();
final String dlqFile = openSearchSinkConfig.getRetryConfiguration().getDlqFile();
if (dlqFile != null) {
dlqWriter = Files.newBufferedWriter(Paths.get(dlqFile), StandardOpenOption.CREATE, StandardOpenOption.APPEND);
}
checkAndCreateIndex();
bulkRequestSupplier = () -> new BulkRequest(esSinkConfig.getIndexConfiguration().getIndexAlias());
indexManager.checkAndCreateIndex();
bulkRequestSupplier = () -> new BulkRequest(openSearchSinkConfig.getIndexConfiguration().getIndexAlias());
bulkRetryStrategy = new BulkRetryStrategy(
bulkRequest -> restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT),
this::logFailure,
Expand Down Expand Up @@ -164,115 +154,6 @@ private void flushBatch(final BulkRequest bulkRequest) {
});
}

private void checkAndCreateIndexTemplate(final boolean isISMEnabled, final String ismPolicyId) throws IOException {
final String indexAlias = esSinkConfig.getIndexConfiguration().getIndexAlias();
final String indexTemplateName = indexAlias + "-index-template";

// Check existing index template version - only overwrite if version is less than or does not exist
if (!shouldCreateIndexTemplate(indexTemplateName)) {
return;
}

final PutIndexTemplateRequest putIndexTemplateRequest = new PutIndexTemplateRequest(indexTemplateName);
final boolean isRaw = indexType.equals(IndexConstants.RAW);
if (isRaw) {
putIndexTemplateRequest.patterns(Collections.singletonList(indexAlias + "-*"));
} else {
putIndexTemplateRequest.patterns(Collections.singletonList(indexAlias));
}
if (isISMEnabled) {
IndexStateManagement.attachPolicy(esSinkConfig.getIndexConfiguration(), ismPolicyId, indexAlias);
}

putIndexTemplateRequest.source(esSinkConfig.getIndexConfiguration().getIndexTemplate());
restHighLevelClient.indices().putTemplate(putIndexTemplateRequest, RequestOptions.DEFAULT);
}

// TODO: Unit tests for this (and for the rest of the class)
private boolean shouldCreateIndexTemplate(final String indexTemplateName) throws IOException {
final Optional<IndexTemplateMetadata> indexTemplateMetadataOptional = getIndexTemplateMetadata(indexTemplateName);
if (indexTemplateMetadataOptional.isPresent()) {
final Integer existingTemplateVersion = indexTemplateMetadataOptional.get().version();
LOG.info("Found version {} for existing index template {}", existingTemplateVersion, indexTemplateName);

final int newTemplateVersion = (int) esSinkConfig.getIndexConfiguration().getIndexTemplate().getOrDefault("version", 0);

if (existingTemplateVersion != null && existingTemplateVersion >= newTemplateVersion) {
LOG.info("Index template {} should not be updated, current version {} >= existing version {}",
indexTemplateName,
existingTemplateVersion,
newTemplateVersion);
return false;

} else {
LOG.info("Index template {} should be updated from version {} to version {}",
indexTemplateName,
existingTemplateVersion,
newTemplateVersion);
return true;
}
} else {
LOG.info("Index template {} does not exist and should be created", indexTemplateName);
return true;
}
}

private Optional<IndexTemplateMetadata> getIndexTemplateMetadata(final String indexTemplateName) throws IOException {
final IndexTemplatesExistRequest existsRequest = new IndexTemplatesExistRequest(indexTemplateName);
final boolean exists = restHighLevelClient.indices().existsTemplate(existsRequest, RequestOptions.DEFAULT);
if (!exists) {
return Optional.empty();
}

final GetIndexTemplatesRequest request = new GetIndexTemplatesRequest(indexTemplateName);
final GetIndexTemplatesResponse response = restHighLevelClient.indices().getIndexTemplate(request, RequestOptions.DEFAULT);

if (response.getIndexTemplates().size() == 1) {
return Optional.of(response.getIndexTemplates().get(0));
} else {
throw new RuntimeException(String.format("Found multiple index templates (%s) result when querying for %s",
response.getIndexTemplates().size(),
indexTemplateName));
}
}

private void checkAndCreateIndex() throws IOException {
// Check alias exists
final String indexAlias = esSinkConfig.getIndexConfiguration().getIndexAlias();
final boolean isRaw = indexType.equals(IndexConstants.RAW);
final boolean exists = isRaw ?
restHighLevelClient.indices().existsAlias(new GetAliasesRequest().aliases(indexAlias), RequestOptions.DEFAULT) :
restHighLevelClient.indices().exists(new GetIndexRequest(indexAlias), RequestOptions.DEFAULT);
if (!exists) {
// TODO: use date as suffix?
final String initialIndexName;
final CreateIndexRequest createIndexRequest;
if (isRaw) {
initialIndexName = indexAlias + "-000001";
createIndexRequest = new CreateIndexRequest(initialIndexName);
createIndexRequest.alias(new Alias(indexAlias).writeIndex(true));
} else {
initialIndexName = indexAlias;
createIndexRequest = new CreateIndexRequest(initialIndexName);
}
try {
restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
} catch (OpenSearchException e) {
if (e.getMessage().contains("resource_already_exists_exception")) {
// Do nothing - likely caused by a race condition where the resource was created
// by another host before this host's restClient made its request
} else if (e.getMessage().contains(String.format(INDEX_ALIAS_USED_AS_INDEX_ERROR, indexAlias))) {
// TODO: replace IOException with custom data-prepper exception
throw new IOException(
String.format("An index exists with the same name as the reserved index alias name [%s], please delete or migrate the existing index",
indexAlias));
} else {
throw new IOException(e);
}
}
}
}

private Map<String, Object> getMapFromJson(final String documentJson) throws IOException {
final XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, documentJson);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
package com.amazon.dataprepper.plugins.sink.opensearch;

import com.amazon.dataprepper.model.configuration.PluginSetting;
import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexConfiguration;

import static com.google.common.base.Preconditions.checkNotNull;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.amazon.dataprepper.plugins.sink.opensearch.index;

import com.amazon.dataprepper.plugins.sink.opensearch.OpenSearchSinkConfiguration;
import org.opensearch.client.RestHighLevelClient;

import java.io.IOException;
import java.util.Optional;

public class DefaultIndexManager extends IndexManager {

public DefaultIndexManager(final RestHighLevelClient restHighLevelClient,
final OpenSearchSinkConfiguration openSearchSinkConfiguration) {
super(restHighLevelClient, openSearchSinkConfiguration);
}

@Override
public Optional<String> checkAndCreatePolicy() throws IOException {
return Optional.empty();
}

}
Loading

0 comments on commit c4207e3

Please sign in to comment.