Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Opensearch Sink: refactor all index related operations into IndexManager classes for easier future extension #361

Merged
merged 2 commits into from
Oct 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,18 @@
import com.amazon.dataprepper.model.configuration.PluginSetting;
import com.amazon.dataprepper.model.record.Record;
import com.amazon.dataprepper.model.sink.AbstractSink;
import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexManager;
import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexManagerFactory;
import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexType;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import org.opensearch.OpenSearchException;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.admin.indices.alias.Alias;
import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.indices.GetIndexTemplatesRequest;
import org.opensearch.client.indices.GetIndexTemplatesResponse;
import org.opensearch.client.indices.IndexTemplateMetadata;
import org.opensearch.client.indices.IndexTemplatesExistRequest;
import org.opensearch.client.indices.PutIndexTemplateRequest;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.common.xcontent.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -48,7 +37,6 @@
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
Expand All @@ -61,29 +49,31 @@ public class OpenSearchSink extends AbstractSink<Record<String>> {
private static final Logger LOG = LoggerFactory.getLogger(OpenSearchSink.class);
// Pulled from BulkRequest to make estimation of bytes consistent
private static final int REQUEST_OVERHEAD = 50;
protected static final String INDEX_ALIAS_USED_AS_INDEX_ERROR = "Invalid alias name [%s], an index exists with the same name as the alias";

private BufferedWriter dlqWriter;
private final OpenSearchSinkConfiguration esSinkConfig;
private final OpenSearchSinkConfiguration openSearchSinkConfig;
private final IndexManagerFactory indexManagerFactory;
private RestHighLevelClient restHighLevelClient;
private IndexManager indexManager;
private Supplier<BulkRequest> bulkRequestSupplier;
private BulkRetryStrategy bulkRetryStrategy;
private final long bulkSize;
private final String indexType;
private final IndexType indexType;
private final String documentIdField;

private final Timer bulkRequestTimer;
private final Counter bulkRequestErrorsCounter;

public OpenSearchSink(final PluginSetting pluginSetting) {
super(pluginSetting);
bulkRequestTimer = pluginMetrics.timer(BULKREQUEST_LATENCY);
bulkRequestErrorsCounter = pluginMetrics.counter(BULKREQUEST_ERRORS);

this.esSinkConfig = OpenSearchSinkConfiguration.readESConfig(pluginSetting);
this.bulkSize = ByteSizeUnit.MB.toBytes(esSinkConfig.getIndexConfiguration().getBulkSize());
this.indexType = esSinkConfig.getIndexConfiguration().getIndexType();
this.documentIdField = esSinkConfig.getIndexConfiguration().getDocumentIdField();
this.openSearchSinkConfig = OpenSearchSinkConfiguration.readESConfig(pluginSetting);
this.bulkSize = ByteSizeUnit.MB.toBytes(openSearchSinkConfig.getIndexConfiguration().getBulkSize());
this.indexType = openSearchSinkConfig.getIndexConfiguration().getIndexType();
this.documentIdField = openSearchSinkConfig.getIndexConfiguration().getDocumentIdField();
this.indexManagerFactory = new IndexManagerFactory();

try {
initialize();
} catch (final IOException e) {
Expand All @@ -94,19 +84,20 @@ public OpenSearchSink(final PluginSetting pluginSetting) {

public void initialize() throws IOException {
LOG.info("Initializing OpenSearch sink");
restHighLevelClient = esSinkConfig.getConnectionConfiguration().createClient();
final boolean isISMEnabled = IndexStateManagement.checkISMEnabled(restHighLevelClient);
final Optional<String> policyIdOptional = isISMEnabled ? IndexStateManagement.checkAndCreatePolicy(restHighLevelClient, indexType) :
restHighLevelClient = openSearchSinkConfig.getConnectionConfiguration().createClient();
indexManager = indexManagerFactory.getIndexManager(indexType, restHighLevelClient, openSearchSinkConfig);
final boolean isISMEnabled = indexManager.checkISMEnabled();
final Optional<String> policyIdOptional = isISMEnabled ? indexManager.checkAndCreatePolicy() :
Optional.empty();
if (!esSinkConfig.getIndexConfiguration().getIndexTemplate().isEmpty()) {
checkAndCreateIndexTemplate(isISMEnabled, policyIdOptional.orElse(null));
if (!openSearchSinkConfig.getIndexConfiguration().getIndexTemplate().isEmpty()) {
indexManager.checkAndCreateIndexTemplate(isISMEnabled, policyIdOptional.orElse(null));
}
final String dlqFile = esSinkConfig.getRetryConfiguration().getDlqFile();
final String dlqFile = openSearchSinkConfig.getRetryConfiguration().getDlqFile();
if (dlqFile != null) {
dlqWriter = Files.newBufferedWriter(Paths.get(dlqFile), StandardOpenOption.CREATE, StandardOpenOption.APPEND);
}
checkAndCreateIndex();
bulkRequestSupplier = () -> new BulkRequest(esSinkConfig.getIndexConfiguration().getIndexAlias());
indexManager.checkAndCreateIndex();
bulkRequestSupplier = () -> new BulkRequest(openSearchSinkConfig.getIndexConfiguration().getIndexAlias());
bulkRetryStrategy = new BulkRetryStrategy(
bulkRequest -> restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT),
this::logFailure,
Expand Down Expand Up @@ -164,115 +155,6 @@ private void flushBatch(final BulkRequest bulkRequest) {
});
}

private void checkAndCreateIndexTemplate(final boolean isISMEnabled, final String ismPolicyId) throws IOException {
final String indexAlias = esSinkConfig.getIndexConfiguration().getIndexAlias();
final String indexTemplateName = indexAlias + "-index-template";

// Check existing index template version - only overwrite if version is less than or does not exist
if (!shouldCreateIndexTemplate(indexTemplateName)) {
return;
}

final PutIndexTemplateRequest putIndexTemplateRequest = new PutIndexTemplateRequest(indexTemplateName);
final boolean isRaw = indexType.equals(IndexConstants.RAW);
if (isRaw) {
putIndexTemplateRequest.patterns(Collections.singletonList(indexAlias + "-*"));
} else {
putIndexTemplateRequest.patterns(Collections.singletonList(indexAlias));
}
if (isISMEnabled) {
IndexStateManagement.attachPolicy(esSinkConfig.getIndexConfiguration(), ismPolicyId, indexAlias);
}

putIndexTemplateRequest.source(esSinkConfig.getIndexConfiguration().getIndexTemplate());
restHighLevelClient.indices().putTemplate(putIndexTemplateRequest, RequestOptions.DEFAULT);
}

// TODO: Unit tests for this (and for the rest of the class)
private boolean shouldCreateIndexTemplate(final String indexTemplateName) throws IOException {
final Optional<IndexTemplateMetadata> indexTemplateMetadataOptional = getIndexTemplateMetadata(indexTemplateName);
if (indexTemplateMetadataOptional.isPresent()) {
final Integer existingTemplateVersion = indexTemplateMetadataOptional.get().version();
LOG.info("Found version {} for existing index template {}", existingTemplateVersion, indexTemplateName);

final int newTemplateVersion = (int) esSinkConfig.getIndexConfiguration().getIndexTemplate().getOrDefault("version", 0);

if (existingTemplateVersion != null && existingTemplateVersion >= newTemplateVersion) {
LOG.info("Index template {} should not be updated, current version {} >= existing version {}",
indexTemplateName,
existingTemplateVersion,
newTemplateVersion);
return false;

} else {
LOG.info("Index template {} should be updated from version {} to version {}",
indexTemplateName,
existingTemplateVersion,
newTemplateVersion);
return true;
}
} else {
LOG.info("Index template {} does not exist and should be created", indexTemplateName);
return true;
}
}

private Optional<IndexTemplateMetadata> getIndexTemplateMetadata(final String indexTemplateName) throws IOException {
final IndexTemplatesExistRequest existsRequest = new IndexTemplatesExistRequest(indexTemplateName);
final boolean exists = restHighLevelClient.indices().existsTemplate(existsRequest, RequestOptions.DEFAULT);
if (!exists) {
return Optional.empty();
}

final GetIndexTemplatesRequest request = new GetIndexTemplatesRequest(indexTemplateName);
final GetIndexTemplatesResponse response = restHighLevelClient.indices().getIndexTemplate(request, RequestOptions.DEFAULT);

if (response.getIndexTemplates().size() == 1) {
return Optional.of(response.getIndexTemplates().get(0));
} else {
throw new RuntimeException(String.format("Found multiple index templates (%s) result when querying for %s",
response.getIndexTemplates().size(),
indexTemplateName));
}
}

private void checkAndCreateIndex() throws IOException {
// Check alias exists
final String indexAlias = esSinkConfig.getIndexConfiguration().getIndexAlias();
final boolean isRaw = indexType.equals(IndexConstants.RAW);
final boolean exists = isRaw ?
restHighLevelClient.indices().existsAlias(new GetAliasesRequest().aliases(indexAlias), RequestOptions.DEFAULT) :
restHighLevelClient.indices().exists(new GetIndexRequest(indexAlias), RequestOptions.DEFAULT);
if (!exists) {
// TODO: use date as suffix?
final String initialIndexName;
final CreateIndexRequest createIndexRequest;
if (isRaw) {
initialIndexName = indexAlias + "-000001";
createIndexRequest = new CreateIndexRequest(initialIndexName);
createIndexRequest.alias(new Alias(indexAlias).writeIndex(true));
} else {
initialIndexName = indexAlias;
createIndexRequest = new CreateIndexRequest(initialIndexName);
}
try {
restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
} catch (OpenSearchException e) {
if (e.getMessage().contains("resource_already_exists_exception")) {
// Do nothing - likely caused by a race condition where the resource was created
// by another host before this host's restClient made its request
} else if (e.getMessage().contains(String.format(INDEX_ALIAS_USED_AS_INDEX_ERROR, indexAlias))) {
// TODO: replace IOException with custom data-prepper exception
throw new IOException(
String.format("An index exists with the same name as the reserved index alias name [%s], please delete or migrate the existing index",
indexAlias));
} else {
throw new IOException(e);
}
}
}
}

private Map<String, Object> getMapFromJson(final String documentJson) throws IOException {
final XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, documentJson);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
package com.amazon.dataprepper.plugins.sink.opensearch;

import com.amazon.dataprepper.model.configuration.PluginSetting;
import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexConfiguration;

import static com.google.common.base.Preconditions.checkNotNull;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.amazon.dataprepper.plugins.sink.opensearch.index;

import com.amazon.dataprepper.plugins.sink.opensearch.OpenSearchSinkConfiguration;
import com.amazon.dataprepper.plugins.sink.opensearch.index.ismpolicy.NoIsmPolicyManagement;
import org.opensearch.client.RestHighLevelClient;

public class DefaultIndexManager extends IndexManager {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need this level of abstraction? Is the lack of dependecy injection in this project impacting eliminating this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, if there is a DI framework like Guice, it may help eliminate children classes. There are other reasons I like have this inheritance:

  1. It allows us to clearly separately instantiate an IndexManager class of specific purpose.
  2. Because we have no.1, we can separate the unit test for different index types into different unit test classes.
  3. It would be clearer and time-saving for users of index manager to figure out which setup to use giving an index type. A programatic user of index manage (in my implementation, it is the index manager factory) also can have less to worry about and have a simpler implementation.
  4. It is easier for runtime debugging. It's easier to figure out which index type the index manager instance is for

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think keeping these implementations of IndexManager is still important. The OpenSearch plugin defines what each of these types is via an enum which is configured in the pipeline. This strong correspondence between the enum and a known setup for the index lends itself well to these classes (DefaultIndexManager, TraceAnalyticsManager).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's revist this when we have injection. I believe all of this could be achieved with injection via a map of ENUMs to different implementations of an IndeManger. I believe this is a simplier design and reduces our reliance on inheritence. This does not limit our ability to test the different configuration options nor should it decrease runtime debugging or increase time for users.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the root problem is that these inherited classes are being exposed. From what I can tell of the code, we should be able to make them private static inner classes of the IndexManagerFactory class. At that point, it matters little whether it has its own inherited classes to expose them, or builds them all manually. It is just an implementation detail of IndexManagerFactory. I think a quick turnaround PR with this change would be appropriate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. Good suggestion, David. I am making the IndexManger
sub-classes private in the factory class. See: #414

I didn't make them static. I have not found being static is necessary.
Please let me know if you have a strong reason to make them static.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They should be static nested classes because they are not tied to the IndexManagerFactory class instance itself.

From the documentation:

A static nested class interacts with the instance members of its outer class (and other classes) just like any other top-level class. In effect, a static nested class is behaviorally a top-level class that has been nested in another top-level class for packaging convenience. Inner Class and Nested Static Class Example also demonstrates this.

https://docs.oracle.com/javase/tutorial/java/javaOO/nested.html


public DefaultIndexManager(final RestHighLevelClient restHighLevelClient,
final OpenSearchSinkConfiguration openSearchSinkConfiguration) {
super(restHighLevelClient, openSearchSinkConfiguration);
this.ismPolicyManagementStrategy = new NoIsmPolicyManagement(restHighLevelClient);
}

}
Loading