Skip to content

Commit

Permalink
Use ILM for Watcher history deletion
Browse files Browse the repository at this point in the history
This commit adds an index lifecycle policy for the `.watch-history-*` indices.
This policy is automatically used for all new watch history indices.

This does not yet remove the automatic cleanup that the monitoring plugin does
for the .watch-history indices, and it does not touch the
`xpack.watcher.history.cleaner_service.enabled` setting.

Relates to elastic#32041
  • Loading branch information
dakrone committed Jan 14, 2019
1 parent 15aa376 commit f37b3c9
Show file tree
Hide file tree
Showing 11 changed files with 265 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.indexlifecycle;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.NotXContentException;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

/**
* A utility class used for loading index lifecycle policies from the resource classpath
*/
public class LifecyclePolicyUtils {

private LifecyclePolicyUtils() {};

/**
* Loads a built-in index lifecycle policy and returns its source.
*/
public static LifecyclePolicy loadPolicy(String name, String resource, NamedXContentRegistry xContentRegistry) {
try {
BytesReference source = load(resource);
validate(source);

try (XContentParser parser = XContentType.JSON.xContent()
.createParser(xContentRegistry, LoggingDeprecationHandler.THROW_UNSUPPORTED_OPERATION, source.utf8ToString())) {
return LifecyclePolicy.parse(parser, name);
}
} catch (Exception e) {
throw new IllegalArgumentException("unable to load policy [" + name + "] from [" + resource + "]", e);
}
}

/**
* Loads a resource from the classpath and returns it as a {@link BytesReference}
*/
private static BytesReference load(String name) throws IOException {
try (InputStream is = LifecyclePolicyUtils.class.getResourceAsStream(name)) {
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
Streams.copy(is, out);
return new BytesArray(out.toByteArray());
}
}
}

/**
* Parses and validates that the source is not empty.
*/
private static void validate(BytesReference source) {
if (source == null) {
throw new ElasticsearchParseException("policy must not be null");
}

try {
XContentHelper.convertToMap(source, false, XContentType.JSON).v2();
} catch (NotXContentException e) {
throw new ElasticsearchParseException("policy must not be empty");
} catch (Exception e) {
throw new ElasticsearchParseException("invalid policy", e);
}
}
}
10 changes: 10 additions & 0 deletions x-pack/plugin/core/src/main/resources/watch-history-policy.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"phases": {
"delete": {
"min_age": "30d",
"actions": {
"delete": {}
}
}
}
}
1 change: 1 addition & 0 deletions x-pack/plugin/core/src/main/resources/watch-history.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"index.number_of_shards": 1,
"index.number_of_replicas": 0,
"index.auto_expand_replicas": "0-1",
"index.lifecycle.name": "watch-history-policy",
"index.format": 6
},
"mappings": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ public String getName() {
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
String lifecycleName = restRequest.param("name");
XContentParser parser = restRequest.contentParser();
PutLifecycleAction.Request putLifecycleRequest = PutLifecycleAction.Request.parseRequest(lifecycleName, parser);
putLifecycleRequest.timeout(restRequest.paramAsTime("timeout", putLifecycleRequest.timeout()));
putLifecycleRequest.masterNodeTimeout(restRequest.paramAsTime("master_timeout", putLifecycleRequest.masterNodeTimeout()));
try (XContentParser parser = restRequest.contentParser()) {
PutLifecycleAction.Request putLifecycleRequest = PutLifecycleAction.Request.parseRequest(lifecycleName, parser);
putLifecycleRequest.timeout(restRequest.paramAsTime("timeout", putLifecycleRequest.timeout()));
putLifecycleRequest.masterNodeTimeout(restRequest.paramAsTime("master_timeout", putLifecycleRequest.masterNodeTimeout()));

return channel -> client.execute(PutLifecycleAction.INSTANCE, putLifecycleRequest, new RestToXContentListener<>(channel));
return channel -> client.execute(PutLifecycleAction.INSTANCE, putLifecycleRequest, new RestToXContentListener<>(channel));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,12 @@ public ClusterState execute(ClusterState currentState) throws Exception {
SortedMap<String, LifecyclePolicyMetadata> newPolicies = new TreeMap<>(currentMetadata.getPolicyMetadatas());
LifecyclePolicyMetadata lifecyclePolicyMetadata = new LifecyclePolicyMetadata(request.getPolicy(), filteredHeaders,
nextVersion, Instant.now().toEpochMilli());
newPolicies.put(lifecyclePolicyMetadata.getName(), lifecyclePolicyMetadata);
LifecyclePolicyMetadata oldPolicy = newPolicies.put(lifecyclePolicyMetadata.getName(), lifecyclePolicyMetadata);
if (oldPolicy == null) {
logger.info("adding index lifecycle policy [{}]", request.getPolicy().getName());
} else {
logger.info("updating index lifecycle policy [{}]", request.getPolicy().getName());
}
IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, OperationMode.RUNNING);
newState.metaData(MetaData.builder(currentState.getMetaData())
.putCustom(IndexLifecycleMetadata.TYPE, newMetadata).build());
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ integTestCluster {
dependsOn copyKeyCerts
setting 'xpack.security.enabled', 'true'
setting 'xpack.ml.enabled', 'true'
setting 'xpack.watcher.enabled', 'false'
setting 'logger.org.elasticsearch.xpack.ml.datafeed', 'TRACE'
setting 'xpack.monitoring.enabled', 'false'
setting 'xpack.security.authc.token.enabled', 'true'
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugin/watcher/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies {
compileOnly project(path: ':plugins:transport-nio', configuration: 'runtime')

testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')
testCompile "org.elasticsearch.plugin:x-pack-ilm:${version}"

// watcher deps
compile 'com.googlecode.owasp-java-html-sanitizer:owasp-java-html-sanitizer:r239'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
throw new UncheckedIOException(e);
}

new WatcherIndexTemplateRegistry(clusterService, threadPool, client);
new WatcherIndexTemplateRegistry(clusterService, threadPool, client, xContentRegistry);

// http client
httpClient = new HttpClient(settings, getSslService(), cryptoService, clusterService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,20 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.XPackClient;
import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicyUtils;
import org.elasticsearch.xpack.core.indexlifecycle.action.PutLifecycleAction;
import org.elasticsearch.xpack.core.template.TemplateUtils;
import org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField;

import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
Expand All @@ -46,17 +53,23 @@ public class WatcherIndexTemplateRegistry implements ClusterStateListener {
TEMPLATE_CONFIG_TRIGGERED_WATCHES, TEMPLATE_CONFIG_WATCH_HISTORY, TEMPLATE_CONFIG_WATCHES
};

public static final PolicyConfig POLICY_WATCH_HISTORY = new PolicyConfig("watch-history-policy", "/watch-history-policy.json");

private static final Logger logger = LogManager.getLogger(WatcherIndexTemplateRegistry.class);

private final Client client;
private final ThreadPool threadPool;
private final TemplateConfig[] indexTemplates;
private final NamedXContentRegistry xContentRegistry;
private final ConcurrentMap<String, AtomicBoolean> templateCreationsInProgress = new ConcurrentHashMap<>();
private final AtomicBoolean historyPolicyCreationInProgress = new AtomicBoolean();

public WatcherIndexTemplateRegistry(ClusterService clusterService, ThreadPool threadPool, Client client) {
public WatcherIndexTemplateRegistry(ClusterService clusterService, ThreadPool threadPool, Client client,
NamedXContentRegistry xContentRegistry) {
this.client = client;
this.threadPool = threadPool;
this.indexTemplates = TEMPLATE_CONFIGS;
this.xContentRegistry = xContentRegistry;
clusterService.addListener(this);
}

Expand All @@ -82,6 +95,7 @@ public void clusterChanged(ClusterChangedEvent event) {

if (event.localNodeMaster() || localNodeVersionAfterMaster) {
addTemplatesIfMissing(state);
addIndexLifecyclePolicyIfMissing(state);
}
}

Expand Down Expand Up @@ -127,6 +141,54 @@ public void onFailure(Exception e) {
});
}

// Package visible for testing
LifecyclePolicy loadWatcherHistoryPolicy() {
return LifecyclePolicyUtils.loadPolicy(POLICY_WATCH_HISTORY.policyName, POLICY_WATCH_HISTORY.fileName, xContentRegistry);
}

private void addIndexLifecyclePolicyIfMissing(ClusterState state) {
if (historyPolicyCreationInProgress.compareAndSet(false, true)) {
final LifecyclePolicy policyOnDisk = loadWatcherHistoryPolicy();

Optional<IndexLifecycleMetadata> maybeMeta = Optional.ofNullable(state.metaData().custom(IndexLifecycleMetadata.TYPE));
final boolean needsUpdating = maybeMeta
.flatMap(ilmMeta -> Optional.ofNullable(ilmMeta.getPolicies().get(policyOnDisk.getName())))
// TODO: do we want to leave an existing policy as-is instead of replacing it with the one on disk?
.map(current -> current.equals(policyOnDisk) == false)
.orElse(true); // If there is no policy then one needs to be put;

if (needsUpdating) {
putPolicy(policyOnDisk, historyPolicyCreationInProgress);
}
}
}

private void putPolicy(final LifecyclePolicy policy, final AtomicBoolean creationCheck) {
final Executor executor = threadPool.generic();
executor.execute(() -> {
PutLifecycleAction.Request request = new PutLifecycleAction.Request(policy);
request.masterNodeTimeout(TimeValue.timeValueMinutes(1));
executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, request,
new ActionListener<PutLifecycleAction.Response>() {
@Override
public void onResponse(PutLifecycleAction.Response response) {
creationCheck.set(false);
if (response.isAcknowledged() == false) {
logger.error("error adding watcher index lifecycle policy [{}], request was not acknowledged",
policy.getName());
}
}

@Override
public void onFailure(Exception e) {
creationCheck.set(false);
logger.error(new ParameterizedMessage("error adding watcher index lifecycle policy [{}]",
policy.getName()), e);
}
}, (req, listener) -> new XPackClient(client).ilmClient().putLifecyclePolicy(req, listener));
});
}

public static boolean validate(ClusterState state) {
return state.getMetaData().getTemplates().containsKey(WatcherIndexTemplateRegistryField.HISTORY_TEMPLATE_NAME) &&
state.getMetaData().getTemplates().containsKey(WatcherIndexTemplateRegistryField.TRIGGERED_TEMPLATE_NAME) &&
Expand All @@ -153,9 +215,19 @@ public String getTemplateName() {

public byte[] load() {
String template = TemplateUtils.loadTemplate("/" + fileName + ".json", WatcherIndexTemplateRegistryField.INDEX_TEMPLATE_VERSION,
Pattern.quote("${xpack.watcher.template.version}"));
Pattern.quote("${xpack.watcher.template.version}"));
assert template != null && template.length() > 0;
return template.getBytes(StandardCharsets.UTF_8);
}
}
public static class PolicyConfig {

private final String policyName;
private String fileName;

PolicyConfig(String templateName, String fileName) {
this.policyName = templateName;
this.fileName = fileName;
}
}
}
Loading

0 comments on commit f37b3c9

Please sign in to comment.