Skip to content

Commit

Permalink
Support disabling any form of OpenSearch index management (#1420)
Browse files Browse the repository at this point in the history
Support using Data Prepper without any form of OpenSearch index management through the addition of the management_disabled index_type. Resolves #1051.

Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable authored May 31, 2022
1 parent 7bc7dbf commit 8ced533
Show file tree
Hide file tree
Showing 11 changed files with 410 additions and 31 deletions.
18 changes: 17 additions & 1 deletion data-prepper-plugins/opensearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ Default is null.

- `proxy`(optional): A String of the address of a forward HTTP proxy. The format is like "<host-name-or-ip>:\<port\>". Examples: "example.com:8100", "http://example.com:8100", "112.112.112.112:8100". Note: port number cannot be omitted.

- `index_type` (optional): a String from the list [`custom`, `trace-analytics-raw`, `trace-analytics-service-map`], which represents an index type. Defaults to `custom`. This index_type instructs Sink plugin what type of data it is handling.
- `index_type` (optional): a String from the list [`custom`, `trace-analytics-raw`, `trace-analytics-service-map`, `management_disabled`], which represents an index type. Defaults to `custom`. This index_type instructs Sink plugin what type of data it is handling.

```
APM trace analytics raw span data type example:
Expand Down Expand Up @@ -144,6 +144,22 @@ If a single record turns out to be larger than the set bulk size, it will be sen

- `ism_policy_file` (optional): A String of absolute file path for an ISM (Index State Management) policy JSON file. This policy file is effective only when there is no built-in policy file for the index type. For example, `custom` index type is currently the only one without a built-in policy file, thus it would use the policy file here if it's provided through this parameter. OpenSearch documentation has more about [ISM policies.](https://opensearch.org/docs/latest/im-plugin/ism/policies/)

### Management Disabled Index Type

Normally Data Prepper manages the indices it needs within OpenSearch. When `index_type` is set to
`management_disabled`, Data Prepper will not perform any index management on your behalf. You must
provide your own mechanism for creating the indices with the correct mappings applied. Data Prepper
will not use ISM, create templates, or even validate that the index exists. This setting can be
useful when you want to minimize the OpenSearch permissions which you grant to Data Prepper. But,
you should only use it if you are proficient with OpenSearch index management.

With management disabled, Data Prepper can run when granted only the
`["indices:data/write/index", "indices:data/write/bulk*", "indices:admin/mapping/put"]` permissions on
the desired indices. It is strongly recommended to retain the `"indices:admin/mapping/put"`
permission. If Data Prepper lacks this permission, then it cannot write any documents
that rely on dynamic mapping. You would need to take great care to ensure that every possible field
is explicitly mapped by your index template.

## Metrics

Besides common metrics in [AbstractSink](https://github.com/opensearch-project/data-prepper/blob/main/data-prepper-api/src/main/java/com/amazon/dataprepper/model/sink/AbstractSink.java), OpenSearch sink introduces the following custom metrics.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazon.dataprepper.plugins.sink.opensearch;

import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.RestClient;
import org.opensearch.common.Strings;
import org.opensearch.common.xcontent.XContentFactory;

import javax.ws.rs.HttpMethod;
import java.io.IOException;

/**
 * Helper that creates roles and internal users through the OpenSearch security
 * REST API. It is used by the OpenSearch sink tests to obtain a user with a
 * restricted set of index permissions.
 */
class OpenSearchSecurityAccessor {
    private static final String PLUGINS_SECURITY_API = "_opendistro/_security/api/";
    private final RestClient client;

    /**
     * @param client the low-level REST client used to issue the security API requests
     */
    OpenSearchSecurityAccessor(final RestClient client) {
        this.client = client;
    }

    /**
     * Creates a role which may only index and bulk-write documents on the given index pattern.
     *
     * @param role the name of the role to create
     * @param indexPattern the index pattern the role's permissions apply to
     * @throws IOException if the request to OpenSearch fails
     */
    void createBulkWritingRole(final String role, final String indexPattern) throws IOException {
        createRole(role, indexPattern, "indices:data/write/index", "indices:data/write/bulk*");
    }

    /**
     * Issues a PUT for the role with a single index-permission entry.
     *
     * @param role the name of the role to create
     * @param indexPattern the only index pattern listed in the role
     * @param allowedActions the actions allowed on that index pattern
     * @throws IOException if the request to OpenSearch fails
     */
    private void createRole(final String role, final String indexPattern, final String... allowedActions) throws IOException {
        final Request request = new Request(HttpMethod.PUT, PLUGINS_SECURITY_API + "roles/" + role);

        final String createRoleJson = Strings.toString(
                XContentFactory.jsonBuilder()
                        .startObject()
                        .startArray("index_permissions")
                        .startObject()
                        .array("index_patterns", new String[]{indexPattern})
                        .array("allowed_actions", allowedActions)
                        .endObject()
                        .endArray()
                        .endObject()
        );
        request.setJsonEntity(createRoleJson);
        // The response body is not needed; a failed request surfaces as an exception.
        client.performRequest(request);
    }

    /**
     * Creates an internal user with the given password and security roles.
     *
     * @param username the name of the user to create
     * @param password the password assigned to the user
     * @param roles the security roles assigned to the user
     * @throws IOException if the request to OpenSearch fails
     */
    public void createUser(final String username, final String password, final String... roles) throws IOException {
        final Request request = new Request(HttpMethod.PUT, PLUGINS_SECURITY_API + "internalusers/" + username);

        final String createUserJson = Strings.toString(
                XContentFactory.jsonBuilder()
                        .startObject()
                        .field("password", password)
                        .array("opendistro_security_roles", roles)
                        .endObject()
        );
        request.setJsonEntity(createUserJson);
        // The response body is not needed; a failed request surfaces as an exception.
        client.performRequest(request);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
Expand All @@ -42,6 +43,7 @@
import javax.ws.rs.HttpMethod;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
Expand All @@ -55,6 +57,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.StringJoiner;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -156,7 +159,7 @@ public void testInstantiateSinkRawSpanReservedAliasAlreadyUsedAsIndex() throws I

@ParameterizedTest
@ArgumentsSource(MultipleRecordTypeArgumentProvider.class)
public void testOutputRawSpanDefault(Function<String, Record> stringToRecord) throws IOException, InterruptedException {
public void testOutputRawSpanDefault(final Function<String, Record> stringToRecord) throws IOException, InterruptedException {
final String testDoc1 = readDocFromFile(DEFAULT_RAW_SPAN_FILE_1);
final String testDoc2 = readDocFromFile(DEFAULT_RAW_SPAN_FILE_2);
final ObjectMapper mapper = new ObjectMapper();
Expand Down Expand Up @@ -221,7 +224,7 @@ public void testOutputRawSpanDefault(Function<String, Record> stringToRecord) th

@ParameterizedTest
@ArgumentsSource(MultipleRecordTypeArgumentProvider.class)
public void testOutputRawSpanWithDLQ(Function<String, Record> stringToRecord) throws IOException, InterruptedException {
public void testOutputRawSpanWithDLQ(final Function<String, Record> stringToRecord) throws IOException, InterruptedException {
// TODO: write test case
final String testDoc1 = readDocFromFile("raw-span-error.json");
final String testDoc2 = readDocFromFile(DEFAULT_RAW_SPAN_FILE_1);
Expand Down Expand Up @@ -297,7 +300,7 @@ public void testInstantiateSinkServiceMapDefault() throws IOException {

@ParameterizedTest
@ArgumentsSource(MultipleRecordTypeArgumentProvider.class)
public void testOutputServiceMapDefault(Function<String, Record> stringToRecord) throws IOException, InterruptedException {
public void testOutputServiceMapDefault(final Function<String, Record> stringToRecord) throws IOException, InterruptedException {
final String testDoc = readDocFromFile(DEFAULT_SERVICE_MAP_FILE);
final ObjectMapper mapper = new ObjectMapper();
@SuppressWarnings("unchecked") final Map<String, Object> expData = mapper.readValue(testDoc, Map.class);
Expand Down Expand Up @@ -372,7 +375,7 @@ public void testInstantiateSinkCustomIndex_WithIsmPolicy() throws IOException {
MatcherAssert.assertThat((boolean) mappings.get("date_detection"), equalTo(false));
sink.shutdown();

String expectedIndexPolicyName = indexAlias + "-policy";
final String expectedIndexPolicyName = indexAlias + "-policy";
if (isOSBundle()) {
// Check managed index
await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
Expand Down Expand Up @@ -461,7 +464,7 @@ public void testInstantiateSinkDoesNotOverwriteNewerIndexTemplates() throws IOEx

@ParameterizedTest
@ArgumentsSource(MultipleRecordTypeArgumentProvider.class)
public void testOutputCustomIndex(Function<String, Record> stringToRecord) throws IOException, InterruptedException {
public void testOutputCustomIndex(final Function<String, Record> stringToRecord) throws IOException, InterruptedException {
final String testIndexAlias = "test-alias";
final String testTemplateFile = Objects.requireNonNull(
getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile();
Expand Down Expand Up @@ -511,6 +514,51 @@ public void testEventOutput() throws IOException, InterruptedException {
sink.shutdown();
}

/**
 * Verifies that a sink configured with the management_disabled index type can write a
 * document while connecting as a user that only holds the bulk-writing role. The index
 * template and the index are created by the test itself rather than by the sink.
 */
@ParameterizedTest
@ArgumentsSource(MultipleRecordTypeArgumentProvider.class)
@Timeout(value = 1, unit = TimeUnit.MINUTES)
public void testOutputManagementDisabled(final Function<String, Record> stringToRecord) throws IOException, InterruptedException {
// Random names keep the index, role and user of this run separate from other runs.
final String testIndexAlias = "test-" + UUID.randomUUID();
final String roleName = UUID.randomUUID().toString();
final String username = UUID.randomUUID().toString();
final String password = UUID.randomUUID().toString();
// Create a role limited to writing on indices matching the test alias, and a user holding only that role.
final OpenSearchSecurityAccessor securityAccessor = new OpenSearchSecurityAccessor(client);
securityAccessor.createBulkWritingRole(roleName, testIndexAlias + "*");
securityAccessor.createUser(username, password, roleName);

final String testIdField = "someId";
final String testId = "foo";

final List<Record<Object>> testRecords = Collections.singletonList(stringToRecord.apply(generateCustomRecordJson(testIdField, testId)));

// Configure the sink for management_disabled and make it connect as the restricted user.
final Map<String, Object> metadata = initializeConfigurationMetadata(false, false, testIndexAlias, null);
metadata.put(IndexConfiguration.INDEX_TYPE, IndexType.MANAGEMENT_DISABLED.getValue());
metadata.put(ConnectionConfiguration.USERNAME, username);
metadata.put(ConnectionConfiguration.PASSWORD, password);
metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField);
final PluginSetting pluginSetting = generatePluginSettingByMetadata(metadata);
final OpenSearchSink sink = new OpenSearchSink(pluginSetting);

// The test, not the sink, creates the template and the index, using the test's own client.
// NOTE(review): presumably the template maps every field explicitly, since the role lacks
// "indices:admin/mapping/put" - confirm against management-disabled-index-template.json.
final String testTemplateFile = Objects.requireNonNull(
getClass().getClassLoader().getResource("management-disabled-index-template.json")).getFile();
createIndexTemplate(testIndexAlias, testIndexAlias + "*", testTemplateFile);
createIndex(testIndexAlias);

// Write the single record and check it is searchable under the configured document id.
sink.output(testRecords);
final List<Map<String, Object>> retSources = getSearchResponseDocSources(testIndexAlias);
MatcherAssert.assertThat(retSources.size(), equalTo(1));
MatcherAssert.assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1)));
sink.shutdown();

// verify metrics
final List<Measurement> bulkRequestLatencies = MetricsTestUtil.getMeasurementList(
new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME)
.add(OpenSearchSink.BULKREQUEST_LATENCY).toString());
// NOTE(review): three measurements are expected, the first being the count - confirm
// the measurement order against MetricsTestUtil.
MatcherAssert.assertThat(bulkRequestLatencies.size(), equalTo(3));
// COUNT
Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0);
}

private Map<String, Object> initializeConfigurationMetadata (final boolean isRaw, final boolean isServiceMap, final String indexAlias,
final String templateFilePath) {
final Map<String, Object> metadata = new HashMap<>();
Expand Down Expand Up @@ -640,7 +688,7 @@ private void wipeAllOpenSearchIndices() throws IOException {
.forEach(indexName -> {
try {
client.performRequest(new Request("DELETE", "/" + indexName));
} catch (IOException e) {
} catch (final IOException e) {
throw new RuntimeException(e);
}
});
Expand All @@ -654,18 +702,18 @@ static class MultipleRecordTypeArgumentProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
final ObjectMapper objectMapper = new ObjectMapper();
Function<String, Record> stringModel = jsonString -> {
final Function<String, Record> stringModel = jsonString -> {
try {
// Normalize the JSON string.
return new Record(objectMapper.writeValueAsString(objectMapper.readValue(jsonString, Map.class)));
} catch (JsonProcessingException e) {
} catch (final JsonProcessingException e) {
throw new RuntimeException(e);
}
};
Function<String, Record> eventModel = jsonString -> {
final Function<String, Record> eventModel = jsonString -> {
try {
return new Record(JacksonEvent.builder().withEventType(EventType.TRACE.toString()).withData(objectMapper.readValue(jsonString, Map.class)).build());
} catch (JsonProcessingException e) {
} catch (final JsonProcessingException e) {
throw new RuntimeException(e);
}
};
Expand All @@ -675,4 +723,22 @@ public Stream<? extends Arguments> provideArguments(final ExtensionContext conte
);
}
}

/**
 * Creates an index by issuing a PUT request for the given index name.
 *
 * @param indexName the name of the index to create
 * @throws IOException if the request to OpenSearch fails
 */
private void createIndex(final String indexName) throws IOException {
    // The response body is not needed; a failed request surfaces as an exception.
    client.performRequest(new Request(HttpMethod.PUT, indexName));
}

/**
 * Creates an index template from a JSON file, replacing the file's
 * {@code index_patterns} entry with the supplied pattern.
 *
 * @param templateName the name under which the template is stored
 * @param indexPattern the index pattern written into the template
 * @param fileName path of the JSON file holding the template body
 * @throws IOException if the file cannot be read or the request to OpenSearch fails
 */
private void createIndexTemplate(final String templateName, final String indexPattern, final String fileName) throws IOException {
    final ObjectMapper objectMapper = new ObjectMapper();

    final String createTemplateJson;
    // Close the file explicitly instead of relying on the JSON parser to close its source.
    try (final InputStream templateStream = new FileInputStream(fileName)) {
        @SuppressWarnings("unchecked")
        final Map<String, Object> templateJson = objectMapper.readValue(templateStream, Map.class);
        templateJson.put("index_patterns", indexPattern);
        createTemplateJson = objectMapper.writeValueAsString(templateJson);
    }

    final Request request = new Request(HttpMethod.PUT, "_template/" + templateName);
    request.setJsonEntity(createTemplateJson);
    // The response body is not needed; a failed request surfaces as an exception.
    client.performRequest(request);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import java.nio.file.StandardOpenOption;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

@DataPrepperPlugin(name = "opensearch", pluginType = Sink.class)
Expand Down Expand Up @@ -98,18 +97,12 @@ public void initialize() throws IOException {
LOG.info("Initializing OpenSearch sink");
restHighLevelClient = openSearchSinkConfig.getConnectionConfiguration().createClient();
indexManager = indexManagerFactory.getIndexManager(indexType, restHighLevelClient, openSearchSinkConfig);
final boolean isISMEnabled = indexManager.checkISMEnabled();
final Optional<String> policyIdOptional = isISMEnabled ? indexManager.checkAndCreatePolicy() :
Optional.empty();
if (!openSearchSinkConfig.getIndexConfiguration().getIndexTemplate().isEmpty()) {
indexManager.checkAndCreateIndexTemplate(isISMEnabled, policyIdOptional.orElse(null));
}
final String dlqFile = openSearchSinkConfig.getRetryConfiguration().getDlqFile();
if (dlqFile != null) {
dlqWriter = Files.newBufferedWriter(Paths.get(dlqFile), StandardOpenOption.CREATE, StandardOpenOption.APPEND);
}
indexManager.checkAndCreateIndex();

indexManager.setupIndex();

OpenSearchTransport transport = new RestClientTransport(restHighLevelClient.getLowLevelClient(), new PreSerializedJsonpMapper());
openSearchClient = new OpenSearchClient(transport);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,35 @@ private ZonedDateTime getCurrentUtcTime() {
return LocalDateTime.now().atZone(ZoneId.systemDefault()).withZoneSameInstant(UTC_ZONE_ID);
}

public final boolean checkISMEnabled() throws IOException {
/**
 * Reads the cluster settings, including defaults, and reports whether the
 * ISM-enabled setting has the value {@code "true"}.
 *
 * @return {@code true} only when the setting is present and equals {@code "true"}
 * @throws IOException if the cluster settings request fails
 */
final boolean checkISMEnabled() throws IOException {
    final ClusterGetSettingsRequest settingsRequest = new ClusterGetSettingsRequest();
    settingsRequest.includeDefaults(true);
    final ClusterGetSettingsResponse settingsResponse =
            restHighLevelClient.cluster().getSettings(settingsRequest, RequestOptions.DEFAULT);
    final String ismEnabledValue = settingsResponse.getSetting(IndexConstants.ISM_ENABLED_SETTING);
    // Constant-first comparison also covers a missing (null) setting.
    return "true".equals(ismEnabledValue);
}

public final void checkAndCreateIndexTemplate(final boolean isISMEnabled, final String ismPolicyId) throws IOException {
/**
 * Sets up everything required for the index. This implementation runs
 * {@code checkAndCreateIndexTemplate()} followed by {@code checkAndCreateIndex()}.
 * The method is not final, so a subclass may replace this behavior.
 *
 * @throws IOException if either setup step fails with an I/O error
 */
public void setupIndex() throws IOException {
checkAndCreateIndexTemplate();
checkAndCreateIndex();
}

/**
 * Resolves the ISM state and policy, then creates the index template when one is configured.
 * The policy check runs whenever ISM is enabled, even if no index template is configured.
 *
 * @throws IOException if any of the underlying OpenSearch requests fails
 */
private void checkAndCreateIndexTemplate() throws IOException {
    final boolean ismEnabled = checkISMEnabled();

    final String ismPolicyId;
    if (ismEnabled) {
        ismPolicyId = ismPolicyManagementStrategy.checkAndCreatePolicy().orElse(null);
    } else {
        ismPolicyId = null;
    }

    final boolean templateConfigured =
            !openSearchSinkConfiguration.getIndexConfiguration().getIndexTemplate().isEmpty();
    if (templateConfigured) {
        checkAndCreateIndexTemplate(ismEnabled, ismPolicyId);
    }
}

final void checkAndCreateIndexTemplate(final boolean isISMEnabled, final String ismPolicyId) throws IOException {
//If index prefix has a ending dash, then remove it to avoid two consecutive dashes.
final String indexPrefixWithoutTrailingDash = indexPrefix.replaceAll("-$", "");
final String indexTemplateName = indexPrefixWithoutTrailingDash + "-index-template";
Expand All @@ -173,7 +193,7 @@ public final void checkAndCreateIndexTemplate(final boolean isISMEnabled, final
restHighLevelClient.indices().putTemplate(putIndexTemplateRequest, RequestOptions.DEFAULT);
}

public final Optional<String> checkAndCreatePolicy() throws IOException {
/**
 * Delegates to the configured ISM policy management strategy.
 *
 * @return the Optional returned by the strategy; its value is used as the ISM policy id
 *         when the index template is created
 * @throws IOException if the strategy fails with an I/O error
 */
final Optional<String> checkAndCreatePolicy() throws IOException {
return ismPolicyManagementStrategy.checkAndCreatePolicy();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public final IndexManager getIndexManager(final IndexType indexType,
return new TraceAnalyticsRawIndexManager(restHighLevelClient, openSearchSinkConfiguration);
case TRACE_ANALYTICS_SERVICE_MAP:
return new TraceAnalyticsServiceMapIndexManager(restHighLevelClient, openSearchSinkConfiguration);
case MANAGEMENT_DISABLED:
return new ManagementDisabledIndexManager(restHighLevelClient, openSearchSinkConfiguration);
default:
return new DefaultIndexManager(restHighLevelClient, openSearchSinkConfiguration);
}
Expand Down Expand Up @@ -70,4 +72,15 @@ public TraceAnalyticsServiceMapIndexManager(final RestHighLevelClient restHighLe
}

}

/**
 * Index manager returned for the MANAGEMENT_DISABLED index type. Its
 * {@link #setupIndex()} override does nothing.
 */
private class ManagementDisabledIndexManager extends IndexManager {
protected ManagementDisabledIndexManager(final RestHighLevelClient restHighLevelClient, final OpenSearchSinkConfiguration openSearchSinkConfiguration) {
super(restHighLevelClient, openSearchSinkConfiguration);
}

/**
 * Overrides the index setup with a no-op.
 */
@Override
public void setupIndex() {
// Intentionally empty: no setup is performed for this index type.
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
public enum IndexType {
TRACE_ANALYTICS_RAW("trace-analytics-raw"),
TRACE_ANALYTICS_SERVICE_MAP("trace-analytics-service-map"),
CUSTOM("custom");
CUSTOM("custom"),
MANAGEMENT_DISABLED("management_disabled");

private final String value;

Expand All @@ -32,7 +33,7 @@ public enum IndexType {
this.value = value;
}

String getValue(){
public String getValue(){
return value;
}

Expand Down
Loading

0 comments on commit 8ced533

Please sign in to comment.