Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

For custom index type, create ISM policy on OpenSearch if ism_policy_file parameter is set. #433

Merged
merged 3 commits into from
Oct 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
Expand All @@ -37,13 +38,15 @@ public class IndexConfiguration {
public static final String NUM_REPLICAS = "number_of_replicas";
public static final String BULK_SIZE = "bulk_size";
public static final String DOCUMENT_ID_FIELD = "document_id_field";
public static final String ISM_POLICY_FILE = "ism_policy_file";
public static final long DEFAULT_BULK_SIZE = 5L;

private final IndexType indexType;
private final String indexAlias;
private final Map<String, Object> indexTemplate;
private final String documentIdField;
private final long bulkSize;
private final Optional<String> ismPolicyFile;

@SuppressWarnings("unchecked")
private IndexConfiguration(final Builder builder) {
Expand Down Expand Up @@ -87,6 +90,7 @@ private IndexConfiguration(final Builder builder) {
documentIdField = "hashId";
}
this.documentIdField = documentIdField;
this.ismPolicyFile = builder.ismPolicyFile;
}

public static IndexConfiguration readIndexConfig(final PluginSetting pluginSetting) {
Expand All @@ -109,6 +113,10 @@ public static IndexConfiguration readIndexConfig(final PluginSetting pluginSetti
if (documentId != null) {
builder = builder.withDocumentIdField(documentId);
}

final String ismPolicyFile = pluginSetting.getStringOrDefault(ISM_POLICY_FILE, null);
builder = builder.withIsmPolicyFile(ismPolicyFile);

return builder.build();
}

Expand All @@ -132,6 +140,10 @@ public long getBulkSize() {
return bulkSize;
}

public Optional<String> getIsmPolicyFile() {
return ismPolicyFile;
}

/**
* This method is used in the creation of IndexConfiguration object. It takes in the template file path
* or index type and returns the index template read from the file or specific to index type or returns an
Expand Down Expand Up @@ -173,6 +185,7 @@ public static class Builder {
private int numReplicas;
private String documentIdField;
private long bulkSize = DEFAULT_BULK_SIZE;
private Optional<String> ismPolicyFile;

public Builder setIsRaw(final Boolean isRaw) {
checkNotNull(isRaw, "trace_analytics_raw cannot be null.");
Expand Down Expand Up @@ -220,6 +233,11 @@ public Builder withNumReplicas(final int numReplicas) {
return this;
}

public Builder withIsmPolicyFile(final String ismPolicyFile) {
this.ismPolicyFile = Optional.ofNullable(ismPolicyFile);
return this;
}

public IndexConfiguration build() {
return new IndexConfiguration(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import com.amazon.dataprepper.plugins.sink.opensearch.OpenSearchSinkConfiguration;
import org.opensearch.client.RestHighLevelClient;

import java.util.Optional;

public class IndexManagerFactory {

public final IndexManager getIndexManager(final IndexType indexType,
Expand All @@ -20,10 +22,20 @@ public final IndexManager getIndexManager(final IndexType indexType,

private static class DefaultIndexManager extends IndexManager {

public static final String POLICY_NAME_SUFFIX = "-policy";

public DefaultIndexManager(final RestHighLevelClient restHighLevelClient,
final OpenSearchSinkConfiguration openSearchSinkConfiguration) {
super(restHighLevelClient, openSearchSinkConfiguration);
this.ismPolicyManagementStrategy = new NoIsmPolicyManagement(restHighLevelClient);
final Optional<String> ismPolicyFile = openSearchSinkConfiguration.getIndexConfiguration().getIsmPolicyFile();
if (ismPolicyFile.isPresent()) {
final String indexAlias = openSearchSinkConfiguration.getIndexConfiguration().getIndexAlias();
final String indexPolicyName = indexAlias + POLICY_NAME_SUFFIX;
this.ismPolicyManagementStrategy = new IsmPolicyManagement(restHighLevelClient, indexPolicyName, ismPolicyFile.get());
} else {
//Policy file doesn't exist
this.ismPolicyManagementStrategy = new NoIsmPolicyManagement(restHighLevelClient);
}
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package com.amazon.dataprepper.plugins.sink.opensearch.index;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.micrometer.core.instrument.util.StringUtils;
import org.opensearch.action.admin.indices.alias.Alias;
import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest;
Expand All @@ -11,9 +15,11 @@

import javax.ws.rs.HttpMethod;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
Expand All @@ -27,36 +33,60 @@ class IsmPolicyManagement implements IsmPolicyManagementStrategy {
// TODO: replace with new _opensearch API
private static final String POLICY_MANAGEMENT_ENDPOINT = "/_opendistro/_ism/policies/";
public static final String DEFAULT_INDEX_SUFFIX = "-000001";
private static final String POLICY_FILE_ROOT_KEY = "policy";
private static final String POLICY_FILE_ISM_TEMPLATE_KEY = "ism_template";

private final RestHighLevelClient restHighLevelClient;
private final String policyName;
private final String policyFileWithIsmTemplate;
private final String policyFile;
private final String policyFileWithoutIsmTemplate;

public IsmPolicyManagement(final RestHighLevelClient restHighLevelClient,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: if this constructor is only invoked by IndexManagerFactory for trace-analytics, we should change the modifier to protected

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

George, Thanks a lot for the review! Good point, we should be able to change it from public to package private. Will make this change in an upcoming pull request.

final String policyName,
final String policyFileWithIsmTemplate,
final String policyFile,
final String policyFileWithoutIsmTemplate) {
checkNotNull(restHighLevelClient);
checkArgument(StringUtils.isNotEmpty(policyName));
checkArgument(StringUtils.isNotEmpty(policyFileWithIsmTemplate));
checkArgument(StringUtils.isNotEmpty(policyFile));
checkArgument(StringUtils.isNotEmpty(policyFileWithoutIsmTemplate));
this.restHighLevelClient = restHighLevelClient;
this.policyName = policyName;
this.policyFileWithIsmTemplate = policyFileWithIsmTemplate;
this.policyFile = policyFile;
this.policyFileWithoutIsmTemplate = policyFileWithoutIsmTemplate;
}

public IsmPolicyManagement(final RestHighLevelClient restHighLevelClient,
final String policyName,
final String policyFile) {
checkNotNull(restHighLevelClient);
checkArgument(StringUtils.isNotEmpty(policyName));
checkArgument(StringUtils.isNotEmpty(policyFile));
this.restHighLevelClient = restHighLevelClient;
this.policyName = policyName;
this.policyFile = policyFile;
this.policyFileWithoutIsmTemplate = null;
}

@Override
public Optional<String> checkAndCreatePolicy() throws IOException {
final String policyManagementEndpoint = POLICY_MANAGEMENT_ENDPOINT + policyName;
Request request = createPolicyRequestFromFile(policyManagementEndpoint, policyFileWithIsmTemplate);

String policyJsonString = retrievePolicyJsonString(policyFile);
Request request = createPolicyRequestFromFile(policyManagementEndpoint, policyJsonString);

try {
restHighLevelClient.getLowLevelClient().performRequest(request);
} catch (ResponseException e1) {
final String msg = e1.getMessage();
if (msg.contains("Invalid field: [ism_template]")) {
request = createPolicyRequestFromFile(policyManagementEndpoint, policyFileWithoutIsmTemplate);

if(StringUtils.isEmpty(policyFileWithoutIsmTemplate)) {
policyJsonString = dropIsmTemplateFromPolicy(policyJsonString);
} else {
policyJsonString = retrievePolicyJsonString(policyFileWithoutIsmTemplate);
}

request = createPolicyRequestFromFile(policyManagementEndpoint, policyJsonString);
try {
restHighLevelClient.getLowLevelClient().performRequest(request);
} catch (ResponseException e2) {
Expand Down Expand Up @@ -106,16 +136,33 @@ public CreateIndexRequest getCreateIndexRequest(final String indexAlias) {
return createIndexRequest;
}

private Request createPolicyRequestFromFile(final String endPoint, final String fileName) throws IOException {
private String retrievePolicyJsonString(final String fileName) throws IOException {
final File file = new File(fileName);
final URL policyFileUrl;
if (file.isAbsolute()) {
policyFileUrl = file.toURI().toURL();
} else {
policyFileUrl = getClass().getClassLoader().getResource(fileName);
}
final StringBuilder policyJsonBuffer = new StringBuilder();
try (final InputStream inputStream = getClass().getClassLoader().getResourceAsStream(fileName);
try (final InputStream inputStream = policyFileUrl.openStream();
final BufferedReader reader = new BufferedReader(new InputStreamReader(Objects.requireNonNull(inputStream)))) {
reader.lines().forEach(line -> policyJsonBuffer.append(line).append("\n"));
}
return policyJsonBuffer.toString();
}

private Request createPolicyRequestFromFile(final String endPoint, final String policyJsonString) throws IOException {
final Request request = new Request(HttpMethod.PUT, endPoint);
request.setJsonEntity(policyJsonBuffer.toString());
request.setJsonEntity(policyJsonString);
return request;
}

private String dropIsmTemplateFromPolicy(final String policyJsonString) throws JsonProcessingException {
final ObjectMapper mapper = new ObjectMapper();
final JsonNode jsonNode = mapper.readTree(policyJsonString);
((ObjectNode)jsonNode.get(POLICY_FILE_ROOT_KEY)).remove(POLICY_FILE_ISM_TEMPLATE_KEY);
return jsonNode.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@
public class OpenSearchSinkIT extends OpenSearchRestTestCase {
private static final String PLUGIN_NAME = "opensearch";
private static final String PIPELINE_NAME = "integTestPipeline";
public List<String> HOSTS = Arrays.stream(System.getProperty("tests.rest.cluster").split(","))
private static final String TEST_CUSTOM_INDEX_POLICY_FILE = "test-custom-index-policy-file.json";
private List<String> HOSTS = Arrays.stream(System.getProperty("tests.rest.cluster").split(","))
.map(ip -> String.format("%s://%s", getProtocol(), ip)).collect(Collectors.toList());
private static final String TEST_TEMPLATE_V1_FILE = "test-index-template.json";
private static final String TEST_TEMPLATE_V2_FILE = "test-index-template-v2.json";
Expand Down Expand Up @@ -279,7 +280,7 @@ public void testOutputServiceMapDefault() throws IOException, InterruptedExcepti
sink.shutdown();
}

public void testInstantiateSinkCustomIndex() throws IOException {
public void testInstantiateSinkCustomIndex_NoRollOver() throws IOException {
final String testIndexAlias = "test-alias";
final String testTemplateFile = Objects.requireNonNull(
getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile();
Expand All @@ -295,6 +296,52 @@ public void testInstantiateSinkCustomIndex() throws IOException {
sink.shutdown();
}

public void testInstantiateSinkCustomIndex_WithIsmPolicy() throws IOException {
final String indexAlias = "sink-custom-index-ism-test-alias";
final String testTemplateFile = Objects.requireNonNull(
getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile();
final Map<String, Object> metadata = initializeConfigurationMetadata(false, false, indexAlias, testTemplateFile);
metadata.put(IndexConfiguration.ISM_POLICY_FILE, TEST_CUSTOM_INDEX_POLICY_FILE);
final PluginSetting pluginSetting = generatePluginSettingByMetadata(metadata);
OpenSearchSink sink = new OpenSearchSink(pluginSetting);
Request request = new Request(HttpMethod.HEAD, indexAlias);
Response response = client().performRequest(request);
assertEquals(SC_OK, response.getStatusLine().getStatusCode());
final String index = String.format("%s-000001", indexAlias);
final Map<String, Object> mappings = getIndexMappings(index);
assertNotNull(mappings);
assertFalse((boolean) mappings.get("date_detection"));
sink.shutdown();

String expectedIndexPolicyName = indexAlias + "-policy";
if (isOSBundle()) {
// Check managed index
await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
assertEquals(expectedIndexPolicyName, getIndexPolicyId(index)); }
);
}

// roll over initial index
request = new Request(HttpMethod.POST, String.format("%s/_rollover", indexAlias));
request.setJsonEntity("{ \"conditions\" : { } }\n");
response = client().performRequest(request);
assertEquals(SC_OK, response.getStatusLine().getStatusCode());

// Instantiate sink again
sink = new OpenSearchSink(pluginSetting);
// Make sure no new write index *-000001 is created under alias
final String rolloverIndexName = String.format("%s-000002", indexAlias);
request = new Request(HttpMethod.GET, rolloverIndexName + "/_alias");
response = client().performRequest(request);
assertEquals(true, checkIsWriteIndex(EntityUtils.toString(response.getEntity()), indexAlias, rolloverIndexName));
sink.shutdown();

if (isOSBundle()) {
// Check managed index
assertEquals(expectedIndexPolicyName, getIndexPolicyId(rolloverIndexName));
}
}

public void testInstantiateSinkDoesNotOverwriteNewerIndexTemplates() throws IOException {
final String testIndexAlias = "test-alias";
final String expectedIndexTemplateName = testIndexAlias + "-index-template";
Expand Down Expand Up @@ -377,8 +424,8 @@ public void testOutputCustomIndex() throws IOException, InterruptedException {
Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0);
}

private PluginSetting generatePluginSetting(final boolean isRaw, final boolean isServiceMap, final String indexAlias,
final String templateFilePath) {
private Map<String, Object> initializeConfigurationMetadata (final boolean isRaw, final boolean isServiceMap, final String indexAlias,
final String templateFilePath) {
final Map<String, Object> metadata = new HashMap<>();
metadata.put(IndexConfiguration.TRACE_ANALYTICS_RAW_FLAG, isRaw);
metadata.put(IndexConfiguration.TRACE_ANALYTICS_SERVICE_MAP_FLAG, isServiceMap);
Expand All @@ -391,8 +438,17 @@ private PluginSetting generatePluginSetting(final boolean isRaw, final boolean i
metadata.put(ConnectionConfiguration.USERNAME, user);
metadata.put(ConnectionConfiguration.PASSWORD, password);
}
return metadata;
}

private PluginSetting generatePluginSetting(final boolean isRaw, final boolean isServiceMap, final String indexAlias,
final String templateFilePath) {
final Map<String, Object> metadata = initializeConfigurationMetadata(isRaw, isServiceMap, indexAlias, templateFilePath);
return generatePluginSettingByMetadata(metadata);
}

final PluginSetting pluginSetting = new PluginSetting(PLUGIN_NAME, metadata);
private PluginSetting generatePluginSettingByMetadata(final Map<String, Object> configurationMetadata) {
final PluginSetting pluginSetting = new PluginSetting(PLUGIN_NAME, configurationMetadata);
pluginSetting.setPipelineName(PIPELINE_NAME);
return pluginSetting;
}
Expand Down
Loading