From f20d827aa758d607d19878917579522ea08222a3 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Fri, 6 Dec 2019 12:56:32 -0700 Subject: [PATCH 1/9] Add ILM histore store index This commit adds an ILM history store that tracks the lifecycle execution state as an index progresses through its ILM policy. ILM history documents store output similar to what the ILM explain API returns. An example document with ALL fields (not all documents will have all fields) would look like: ```json { "@timestamp": 1203012389, "policy": "my-ilm-policy", "index": "index-2019.1.1-000023", "index_age":123120, "success": true, "state": { "phase": "warm", "action": "allocate", "step": "ERROR", "failed_step": "update-settings", "is_auto-retryable_error": true, "creation_date": 12389012039, "phase_time": 12908389120, "action_time": 1283901209, "step_time": 123904107140, "phase_definition": "{\"policy\":\"ilm-history-ilm-policy\",\"phase_definition\":{\"min_age\":\"0ms\",\"actions\":{\"rollover\":{\"max_size\":\"50gb\",\"max_age\":\"30d\"}}},\"version\":1,\"modified_date_in_millis\":1576517253463}", "step_info": "{... etc step info here as json ...}" }, "error_details": "java.lang.RuntimeException: etc\n\tcaused by:etc etc etc full stacktrace" } ``` These documents go into the `ilm-history-1-00000N` index to provide an audit trail of the operations ILM has performed. This history storage is enabled by default but can be disabled by setting `index.lifecycle.history_index_enabled` to `false.` Resolves #49180 --- docs/reference/settings/ilm-settings.asciidoc | 5 + .../test/rest/ESRestTestCase.java | 24 +- .../xpack/core/ilm/LifecycleSettings.java | 3 + .../resources/ilm-history-ilm-policy.json | 18 + .../core/src/main/resources/ilm-history.json | 83 ++++ .../ilm/TimeSeriesLifecycleActionsIT.java | 174 +++++++- .../xpack/ilm/ExecuteStepsUpdateTask.java | 52 ++- .../xpack/ilm/IndexLifecycle.java | 14 +- .../xpack/ilm/IndexLifecycleRunner.java | 128 +++++- .../xpack/ilm/IndexLifecycleService.java | 8 +- .../xpack/ilm/MoveToErrorStepUpdateTask.java | 13 +- .../xpack/ilm/history/ILMHistoryItem.java | 114 +++++ .../xpack/ilm/history/ILMHistoryStore.java | 189 +++++++++ .../history/ILMHistoryTemplateRegistry.java | 80 ++++ .../IndexLifecycleInitialisationTests.java | 3 +- .../xpack/ilm/IndexLifecycleRunnerTests.java | 77 +++- .../xpack/ilm/IndexLifecycleServiceTests.java | 2 +- .../ilm/MoveToErrorStepUpdateTaskTests.java | 8 +- .../ilm/history/ILMHistoryItemTests.java | 92 ++++ .../ilm/history/ILMHistoryStoreTests.java | 398 ++++++++++++++++++ 20 files changed, 1412 insertions(+), 73 deletions(-) create mode 100644 x-pack/plugin/core/src/main/resources/ilm-history-ilm-policy.json create mode 100644 x-pack/plugin/core/src/main/resources/ilm-history.json create mode 100644 x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryItem.java create mode 100644 x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStore.java create mode 100644 x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryTemplateRegistry.java create mode 100644 x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/history/ILMHistoryItemTests.java create mode 100644 x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStoreTests.java diff --git a/docs/reference/settings/ilm-settings.asciidoc b/docs/reference/settings/ilm-settings.asciidoc index 8781581a6e57d..946848017bf7b 100644 --- a/docs/reference/settings/ilm-settings.asciidoc +++ b/docs/reference/settings/ilm-settings.asciidoc @@ -14,6 +14,11 @@ ILM REST API endpoints and functionality. Defaults to `true`. (<>) How often {ilm} checks for indices that meet policy criteria. Defaults to `10m`. +`index.lifecycle.history_index_enabled`:: +Whether ILM's history index is enabled. If enabled, ILM will record the +history of actions taken as part of ILM policies to the `ilm-history-*` +indices. Defaults to `true`. + ==== Index level settings These index-level {ilm-init} settings are typically configured through index templates. For more information, see <>. diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index 1eb52ac6de769..73ca1dde99ca8 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -75,6 +75,7 @@ import java.security.cert.CertificateException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -448,6 +449,13 @@ protected boolean preserveILMPoliciesUponCompletion() { return false; } + /** + * A set of ILM policies that should be preserved between runs. + */ + protected Set preserveILMPolicyIds() { + return Collections.singleton("ilm-history-ilm-policy"); + } + /** * Returns whether to preserve auto-follow patterns. Defaults to not * preserving them. Only runs at all if xpack is installed on the cluster @@ -545,7 +553,7 @@ private void wipeCluster() throws Exception { } if (hasXPack && false == preserveILMPoliciesUponCompletion()) { - deleteAllILMPolicies(); + deleteAllILMPolicies(preserveILMPolicyIds()); } if (hasXPack && false == preserveAutoFollowPatternsUponCompletion()) { @@ -680,7 +688,7 @@ private void waitForPendingRollupTasks() throws Exception { waitForPendingTasks(adminClient(), taskName -> taskName.startsWith("xpack/rollup/job") == false); } - private static void deleteAllILMPolicies() throws IOException { + private static void deleteAllILMPolicies(Set exclusions) throws IOException { Map policies; try { @@ -699,9 +707,15 @@ private static void deleteAllILMPolicies() throws IOException { return; } - for (String policyName : policies.keySet()) { - adminClient().performRequest(new Request("DELETE", "/_ilm/policy/" + policyName)); - } + policies.keySet().stream() + .filter(p -> exclusions.contains(p) == false) + .forEach(policyName -> { + try { + adminClient().performRequest(new Request("DELETE", "/_ilm/policy/" + policyName)); + } catch (IOException e) { + throw new RuntimeException("failed to delete policy: " + policyName, e); + } + }); } private static void deleteAllSLMPolicies() throws IOException { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/LifecycleSettings.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/LifecycleSettings.java index de037dc6f034e..b10f8defcdfc7 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/LifecycleSettings.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/LifecycleSettings.java @@ -19,6 +19,7 @@ public class LifecycleSettings { public static final String LIFECYCLE_INDEXING_COMPLETE = "index.lifecycle.indexing_complete"; public static final String LIFECYCLE_ORIGINATION_DATE = "index.lifecycle.origination_date"; public static final String LIFECYCLE_PARSE_ORIGINATION_DATE = "index.lifecycle.parse_origination_date"; + public static final String LIFECYCLE_HISTORY_INDEX_ENABLED = "index.lifecycle.history_index_enabled"; public static final String SLM_HISTORY_INDEX_ENABLED = "slm.history_index_enabled"; public static final String SLM_RETENTION_SCHEDULE = "slm.retention_schedule"; @@ -35,6 +36,8 @@ public class LifecycleSettings { Setting.longSetting(LIFECYCLE_ORIGINATION_DATE, -1, -1, Setting.Property.Dynamic, Setting.Property.IndexScope); public static final Setting LIFECYCLE_PARSE_ORIGINATION_DATE_SETTING = Setting.boolSetting(LIFECYCLE_PARSE_ORIGINATION_DATE, false, Setting.Property.Dynamic, Setting.Property.IndexScope); + public static final Setting LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING = Setting.boolSetting(LIFECYCLE_HISTORY_INDEX_ENABLED, + true, Setting.Property.NodeScope); public static final Setting SLM_HISTORY_INDEX_ENABLED_SETTING = Setting.boolSetting(SLM_HISTORY_INDEX_ENABLED, true, diff --git a/x-pack/plugin/core/src/main/resources/ilm-history-ilm-policy.json b/x-pack/plugin/core/src/main/resources/ilm-history-ilm-policy.json new file mode 100644 index 0000000000000..febae00bc3608 --- /dev/null +++ b/x-pack/plugin/core/src/main/resources/ilm-history-ilm-policy.json @@ -0,0 +1,18 @@ +{ + "phases": { + "hot": { + "actions": { + "rollover": { + "max_size": "50GB", + "max_age": "30d" + } + } + }, + "delete": { + "min_age": "90d", + "actions": { + "delete": {} + } + } + } +} diff --git a/x-pack/plugin/core/src/main/resources/ilm-history.json b/x-pack/plugin/core/src/main/resources/ilm-history.json new file mode 100644 index 0000000000000..ae9c50552b385 --- /dev/null +++ b/x-pack/plugin/core/src/main/resources/ilm-history.json @@ -0,0 +1,83 @@ +{ + "index_patterns": [ + "ilm-history-${xpack.ilm_history.template.version}*" + ], + "order": 2147483647, + "settings": { + "index.number_of_shards": 1, + "index.number_of_replicas": 0, + "index.auto_expand_replicas": "0-1", + "index.lifecycle.name": "ilm-history-ilm-policy", + "index.lifecycle.rollover_alias": "ilm-history-${xpack.ilm_history.template.version}", + "index.format": 1 + }, + "mappings": { + "_doc": { + "dynamic": false, + "properties": { + "@timestamp": { + "type": "date", + "format": "epoch_millis" + }, + "policy": { + "type": "keyword" + }, + "index": { + "type": "keyword" + }, + "index_age":{ + "type": "long" + }, + "success": { + "type": "boolean" + }, + "state": { + "type": "object", + "dynamic": true, + "properties": { + "phase": { + "type": "keyword" + }, + "action": { + "type": "keyword" + }, + "step": { + "type": "keyword" + }, + "failed_step": { + "type": "keyword" + }, + "is_auto-retryable_error": { + "type": "keyword" + }, + "creation_date": { + "type": "date", + "format": "epoch_millis" + }, + "phase_time": { + "type": "date", + "format": "epoch_millis" + }, + "action_time": { + "type": "date", + "format": "epoch_millis" + }, + "step_time": { + "type": "date", + "format": "epoch_millis" + }, + "phase_definition": { + "type": "text" + }, + "step_info": { + "type": "text" + } + } + }, + "error_details": { + "type": "text" + } + } + } + } +} diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java index 4e160c69efd13..706de403c31be 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java @@ -8,13 +8,14 @@ import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -37,8 +38,10 @@ import org.elasticsearch.xpack.core.ilm.SetPriorityAction; import org.elasticsearch.xpack.core.ilm.ShrinkAction; import org.elasticsearch.xpack.core.ilm.ShrinkStep; +import org.elasticsearch.xpack.core.ilm.Step; import org.elasticsearch.xpack.core.ilm.Step.StepKey; import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep; +import org.elasticsearch.xpack.core.ilm.WaitForRolloverReadyStep; import org.hamcrest.Matchers; import org.junit.Before; @@ -1001,6 +1004,173 @@ public void testILMRolloverOnManuallyRolledIndex() throws Exception { assertBusy(() -> assertTrue(indexExists(thirdIndex))); } + public void testHistoryIsWrittenWithSuccess() throws Exception { + String index = "index"; + + createNewSingletonPolicy("hot", new RolloverAction(null, null, 1L)); + Request createIndexTemplate = new Request("PUT", "_template/rolling_indexes"); + createIndexTemplate.setJsonEntity("{" + + "\"index_patterns\": [\""+ index + "-*\"], \n" + + " \"settings\": {\n" + + " \"number_of_shards\": 1,\n" + + " \"number_of_replicas\": 0,\n" + + " \"index.lifecycle.name\": \"" + policy+ "\",\n" + + " \"index.lifecycle.rollover_alias\": \"alias\"\n" + + " }\n" + + "}"); + client().performRequest(createIndexTemplate); + + createIndexWithSettings(index + "-1", + Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0), + true); + + // Index a document + index(client(), index +"-1", "1", "foo", "bar"); + Request refreshIndex = new Request("POST", "/" + index + "-1/_refresh"); + client().performRequest(refreshIndex); + + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "wait-for-indexing-complete")); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "wait-for-follow-shard-tasks")); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "pause-follower-index")); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "close-follower-index")); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "unfollow-follower-index")); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "open-follower-index")); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "wait-for-yellow-step")); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "check-rollover-ready")); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "attempt-rollover")); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "update-rollover-lifecycle-date")); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "set-indexing-complete")); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "completed")); + + assertBusy(() -> assertHistoryIsPresent(policy, index + "-000002", true, "check-rollover-ready")); + } + + public void testHistoryIsWrittenWithFailure() throws Exception { + String index = "index"; + + createNewSingletonPolicy("hot", new RolloverAction(null, null, 1L)); + Request createIndexTemplate = new Request("PUT", "_template/rolling_indexes"); + createIndexTemplate.setJsonEntity("{" + + "\"index_patterns\": [\""+ index + "-*\"], \n" + + " \"settings\": {\n" + + " \"number_of_shards\": 1,\n" + + " \"number_of_replicas\": 0,\n" + + " \"index.lifecycle.name\": \"" + policy+ "\"\n" + + " }\n" + + "}"); + client().performRequest(createIndexTemplate); + + createIndexWithSettings(index + "-1", + Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0), + false); + + // Index a document + index(client(), index +"-1", "1", "foo", "bar"); + Request refreshIndex = new Request("POST", "/" + index + "-1/_refresh"); + client().performRequest(refreshIndex); + + assertBusy(() -> { + assertHistoryIsPresent(policy, index + "-1", false, "ERROR"); + }); + } + + public void testHistoryIsWrittenWithDeletion() throws Exception { + String index = "index"; + + createNewSingletonPolicy("delete", new DeleteAction()); + Request createIndexTemplate = new Request("PUT", "_template/delete_indexes"); + createIndexTemplate.setJsonEntity("{" + + "\"index_patterns\": [\""+ index + "\"], \n" + + " \"settings\": {\n" + + " \"number_of_shards\": 1,\n" + + " \"number_of_replicas\": 0,\n" + + " \"index.lifecycle.name\": \"" + policy+ "\"\n" + + " }\n" + + "}"); + client().performRequest(createIndexTemplate); + + // Index should be created and then deleted by ILM + createIndexWithSettings(index, Settings.builder(), false); + + assertBusy(() -> { + logger.info("--> checking for index deletion..."); + Request existCheck = new Request("HEAD", "/" + index); + Response resp = client().performRequest(existCheck); + assertThat(resp.getStatusLine().getStatusCode(), equalTo(404)); + }); + + assertBusy(() -> { + assertHistoryIsPresent(policy, index, true, "delete", "delete", "wait-for-shard-history-leases"); + assertHistoryIsPresent(policy, index, true, "delete", "delete", "complete"); + }); + } + + // This method should be called inside an assertBusy, it has no retry logic of its own + private void assertHistoryIsPresent(String policyName, String indexName, boolean success, String stepName) throws IOException { + assertHistoryIsPresent(policyName, indexName, success, null, null, stepName); + } + + // This method should be called inside an assertBusy, it has no retry logic of its own + private void assertHistoryIsPresent(String policyName, String indexName, boolean success, + @Nullable String phase, @Nullable String action, String stepName) throws IOException { + logger.info("--> checking for history item [{}], [{}], success: [{}], phase: [{}], action: [{}], step: [{}]", + policyName, indexName, success, phase, action, stepName); + final Request historySearchRequest = new Request("GET", "ilm-history*/_search"); + historySearchRequest.setJsonEntity("{\n" + + " \"query\": {\n" + + " \"bool\": {\n" + + " \"must\": [\n" + + " {\n" + + " \"term\": {\n" + + " \"policy\": \"" + policyName + "\"\n" + + " }\n" + + " },\n" + + " {\n" + + " \"term\": {\n" + + " \"success\": " + success + "\n" + + " }\n" + + " },\n" + + " {\n" + + " \"term\": {\n" + + " \"index\": \"" + indexName + "\"\n" + + " }\n" + + " },\n" + + " {\n" + + " \"term\": {\n" + + " \"state.step\": \"" + stepName + "\"\n" + + " }\n" + + " }\n" + + (phase == null ? "" : ",{\"term\": {\"state.phase\": \"" + phase + "\"}}") + + (action == null ? "" : ",{\"term\": {\"state.action\": \"" + action + "\"}}") + + " ]\n" + + " }\n" + + " }\n" + + "}"); + Response historyResponse; + try { + historyResponse = client().performRequest(historySearchRequest); + Map historyResponseMap; + try (InputStream is = historyResponse.getEntity().getContent()) { + historyResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true); + } + logger.info("--> history response: {}", historyResponseMap); + assertThat((int)((Map) ((Map) historyResponseMap.get("hits")).get("total")).get("value"), + greaterThanOrEqualTo(1)); + } catch (ResponseException e) { + // Throw AssertionError instead of an exception if the search fails so that assertBusy works as expected + logger.error(e); + fail("failed to perform search:" + e.getMessage()); + } + + // Finally, check that the history index is in a good state + Step.StepKey stepKey = getStepKeyForIndex("ilm-history-1-000001"); + assertEquals("hot", stepKey.getPhase()); + assertEquals(RolloverAction.NAME, stepKey.getAction()); + assertEquals(WaitForRolloverReadyStep.NAME, stepKey.getName()); + } + private void createFullPolicy(TimeValue hotTime) throws IOException { Map hotActions = new HashMap<>(); hotActions.put(SetPriorityAction.NAME, new SetPriorityAction(100)); diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/ExecuteStepsUpdateTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/ExecuteStepsUpdateTask.java index 46364f7cb4021..b97944fe67bf7 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/ExecuteStepsUpdateTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/ExecuteStepsUpdateTask.java @@ -12,10 +12,13 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.index.Index; import org.elasticsearch.xpack.core.ilm.ClusterStateActionStep; import org.elasticsearch.xpack.core.ilm.ClusterStateWaitStep; +import org.elasticsearch.xpack.core.ilm.ErrorStep; +import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState; import org.elasticsearch.xpack.core.ilm.Step; import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep; @@ -29,8 +32,9 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask { private final Step startStep; private final PolicyStepsRegistry policyStepsRegistry; private final IndexLifecycleRunner lifecycleRunner; - private LongSupplier nowSupplier; + private final LongSupplier nowSupplier; private Step.StepKey nextStepKey = null; + private Exception failure = null; public ExecuteStepsUpdateTask(String policy, Index index, Step startStep, PolicyStepsRegistry policyStepsRegistry, IndexLifecycleRunner lifecycleRunner, LongSupplier nowSupplier) { @@ -115,7 +119,7 @@ public ClusterState execute(final ClusterState currentState) throws IOException // wait for the next trigger to evaluate the // condition again logger.trace("[{}] waiting for cluster state step condition ({}) [{}], next: [{}]", - index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(), currentStep.getNextStepKey()); + index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(), nextStepKey); ClusterStateWaitStep.Result result; try { result = ((ClusterStateWaitStep) currentStep).isConditionMet(index, state); @@ -124,22 +128,25 @@ public ClusterState execute(final ClusterState currentState) throws IOException } if (result.isComplete()) { logger.trace("[{}] cluster state step condition met successfully ({}) [{}], moving to next step {}", - index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(), currentStep.getNextStepKey()); - if (currentStep.getNextStepKey() == null) { + index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(), nextStepKey); + if (nextStepKey == null) { return state; } else { state = IndexLifecycleTransition.moveClusterStateToStep(index, state, - currentStep.getNextStepKey(), nowSupplier, policyStepsRegistry,false); + nextStepKey, nowSupplier, policyStepsRegistry,false); } } else { - logger.trace("[{}] condition not met ({}) [{}], returning existing state", - index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey()); + final ToXContentObject stepInfo = result.getInfomationContext(); + if (logger.isTraceEnabled()) { + logger.trace("[{}] condition not met ({}) [{}], returning existing state (info: {})", + index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(), + Strings.toString(stepInfo)); + } // We may have executed a step and set "nextStepKey" to // a value, but in this case, since the condition was // not met, we can't advance any way, so don't attempt // to run the current step nextStepKey = null; - ToXContentObject stepInfo = result.getInfomationContext(); if (stepInfo == null) { return state; } else { @@ -169,13 +176,23 @@ public ClusterState execute(final ClusterState currentState) throws IOException public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { if (oldState.equals(newState) == false) { IndexMetaData indexMetaData = newState.metaData().index(index); - if (nextStepKey != null && nextStepKey != TerminalPolicyStep.KEY && indexMetaData != null) { - logger.trace("[{}] step sequence starting with {} has completed, running next step {} if it is an async action", - index.getName(), startStep.getKey(), nextStepKey); - // After the cluster state has been processed and we have moved - // to a new step, we need to conditionally execute the step iff - // it is an `AsyncAction` so that it is executed exactly once. - lifecycleRunner.maybeRunAsyncAction(newState, indexMetaData, policy, nextStepKey); + if (indexMetaData != null) { + + LifecycleExecutionState exState = LifecycleExecutionState.fromIndexMetadata(indexMetaData); + if (ErrorStep.NAME.equals(exState.getStep()) && this.failure != null) { + lifecycleRunner.registerFailedOperation(indexMetaData, failure); + } else { + lifecycleRunner.registerSuccessfulOperation(indexMetaData); + } + + if (nextStepKey != null && nextStepKey != TerminalPolicyStep.KEY) { + logger.trace("[{}] step sequence starting with {} has completed, running next step {} if it is an async action", + index.getName(), startStep.getKey(), nextStepKey); + // After the cluster state has been processed and we have moved + // to a new step, we need to conditionally execute the step iff + // it is an `AsyncAction` so that it is executed exactly once. + lifecycleRunner.maybeRunAsyncAction(newState, indexMetaData, policy, nextStepKey); + } } } } @@ -187,10 +204,9 @@ public void onFailure(String source, Exception e) { } private ClusterState moveToErrorStep(final ClusterState state, Step.StepKey currentStepKey, Exception cause) throws IOException { + this.failure = cause; logger.error("policy [{}] for index [{}] failed on cluster state step [{}]. Moving to ERROR step", policy, index.getName(), currentStepKey); - MoveToErrorStepUpdateTask moveToErrorStepUpdateTask = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, - nowSupplier, policyStepsRegistry::getStep); - return moveToErrorStepUpdateTask.execute(state); + return IndexLifecycleTransition.moveClusterStateToErrorStep(index, state, cause, nowSupplier, policyStepsRegistry::getStep); } } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java index 394174ed673f6..4051b291b9801 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java @@ -94,6 +94,8 @@ import org.elasticsearch.xpack.ilm.action.TransportRetryAction; import org.elasticsearch.xpack.ilm.action.TransportStartILMAction; import org.elasticsearch.xpack.ilm.action.TransportStopILMAction; +import org.elasticsearch.xpack.ilm.history.ILMHistoryStore; +import org.elasticsearch.xpack.ilm.history.ILMHistoryTemplateRegistry; import org.elasticsearch.xpack.slm.SLMInfoTransportAction; import org.elasticsearch.xpack.slm.SLMUsageTransportAction; import org.elasticsearch.xpack.slm.SnapshotLifecycleService; @@ -132,6 +134,7 @@ public class IndexLifecycle extends Plugin implements ActionPlugin { private final SetOnce indexLifecycleInitialisationService = new SetOnce<>(); + private final SetOnce ilmHistoryStore = new SetOnce<>(); private final SetOnce snapshotLifecycleService = new SetOnce<>(); private final SetOnce snapshotRetentionService = new SetOnce<>(); private final SetOnce snapshotHistoryStore = new SetOnce<>(); @@ -158,6 +161,7 @@ public List> getSettings() { LifecycleSettings.LIFECYCLE_ORIGINATION_DATE_SETTING, LifecycleSettings.LIFECYCLE_PARSE_ORIGINATION_DATE_SETTING, LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING, + LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING, RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING, LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING, LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING, @@ -171,8 +175,13 @@ public Collection createComponents(Client client, ClusterService cluster NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) { final List components = new ArrayList<>(); if (ilmEnabled) { + // This registers a cluster state listener, so appears unused but is not. + @SuppressWarnings("unused") + ILMHistoryTemplateRegistry ilmTemplateRegistry = + new ILMHistoryTemplateRegistry(settings, clusterService, threadPool, client, xContentRegistry); + ilmHistoryStore.set(new ILMHistoryStore(settings, new OriginSettingClient(client, INDEX_LIFECYCLE_ORIGIN), clusterService)); indexLifecycleInitialisationService.set(new IndexLifecycleService(settings, client, clusterService, threadPool, - getClock(), System::currentTimeMillis, xContentRegistry)); + getClock(), System::currentTimeMillis, xContentRegistry, ilmHistoryStore.get())); components.add(indexLifecycleInitialisationService.get()); } if (slmEnabled) { @@ -308,7 +317,8 @@ public void onIndexModule(IndexModule indexModule) { @Override public void close() { try { - IOUtils.close(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get(), snapshotRetentionService.get()); + IOUtils.close(indexLifecycleInitialisationService.get(), ilmHistoryStore.get(), + snapshotLifecycleService.get(), snapshotRetentionService.get()); } catch (IOException e) { throw new ElasticsearchException("unable to close index lifecycle services", e); } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java index 5d64c35498a3d..736d5decc1123 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java @@ -13,6 +13,7 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.index.Index; @@ -23,10 +24,13 @@ import org.elasticsearch.xpack.core.ilm.ClusterStateWaitStep; import org.elasticsearch.xpack.core.ilm.ErrorStep; import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState; +import org.elasticsearch.xpack.core.ilm.LifecycleSettings; import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep; import org.elasticsearch.xpack.core.ilm.Step; import org.elasticsearch.xpack.core.ilm.Step.StepKey; import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep; +import org.elasticsearch.xpack.ilm.history.ILMHistoryItem; +import org.elasticsearch.xpack.ilm.history.ILMHistoryStore; import java.util.function.LongSupplier; @@ -35,13 +39,15 @@ class IndexLifecycleRunner { private static final Logger logger = LogManager.getLogger(IndexLifecycleRunner.class); private final ThreadPool threadPool; - private PolicyStepsRegistry stepRegistry; - private ClusterService clusterService; - private LongSupplier nowSupplier; + private final ClusterService clusterService; + private final PolicyStepsRegistry stepRegistry; + private final ILMHistoryStore ilmHistoryStore; + private final LongSupplier nowSupplier; - IndexLifecycleRunner(PolicyStepsRegistry stepRegistry, ClusterService clusterService, + IndexLifecycleRunner(PolicyStepsRegistry stepRegistry, ILMHistoryStore ilmHistoryStore, ClusterService clusterService, ThreadPool threadPool, LongSupplier nowSupplier) { this.stepRegistry = stepRegistry; + this.ilmHistoryStore = ilmHistoryStore; this.clusterService = clusterService; this.nowSupplier = nowSupplier; this.threadPool = threadPool; @@ -62,17 +68,29 @@ static Step getCurrentStep(PolicyStepsRegistry stepRegistry, String policy, Inde } /** - * Return true or false depending on whether the index is ready to be in {@code phase} + * Calculate the index's origination time (in milliseconds) based on its + * metadata. Returns null if there is no lifecycle date and the origination + * date is not set. */ - boolean isReadyToTransitionToThisPhase(final String policy, final IndexMetaData indexMetaData, final String phase) { + @Nullable + private static Long calculateOriginationMillis(final IndexMetaData indexMetaData) { LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(indexMetaData); Long originationDate = indexMetaData.getSettings().getAsLong(LIFECYCLE_ORIGINATION_DATE, -1L); if (lifecycleState.getLifecycleDate() == null && originationDate == -1L) { - logger.trace("no index creation or origination date has been set yet"); + return null; + } + return originationDate == -1L ? lifecycleState.getLifecycleDate() : originationDate; + } + + /** + * Return true or false depending on whether the index is ready to be in {@code phase} + */ + boolean isReadyToTransitionToThisPhase(final String policy, final IndexMetaData indexMetaData, final String phase) { + final Long lifecycleDate = calculateOriginationMillis(indexMetaData); + if (lifecycleDate == null) { + logger.trace("[{}] no index creation or origination date has been set yet", indexMetaData.getIndex().getName()); return true; } - final Long lifecycleDate = originationDate != -1L ? originationDate : lifecycleState.getLifecycleDate(); - assert lifecycleDate != null && lifecycleDate >= 0 : "expected index to have a lifecycle date but it did not"; final TimeValue after = stepRegistry.getIndexAgeForPhase(policy, phase); final long now = nowSupplier.getAsLong(); final TimeValue age = new TimeValue(now - lifecycleDate); @@ -221,19 +239,26 @@ void maybeRunAsyncAction(ClusterState currentState, IndexMetaData indexMetaData, ((AsyncActionStep) currentStep).performAction(indexMetaData, currentState, new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext()), new AsyncActionStep.Listener() { - @Override - public void onResponse(boolean complete) { - logger.trace("cs-change-async-action-callback, [{}], current-step: {}", index, currentStep.getKey()); - if (complete && ((AsyncActionStep) currentStep).indexSurvives()) { - moveToStep(indexMetaData.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey()); + @Override + public void onResponse(boolean complete) { + logger.trace("cs-change-async-action-callback, [{}], current-step: {}", index, currentStep.getKey()); + if (complete) { + if (((AsyncActionStep) currentStep).indexSurvives()) { + moveToStep(indexMetaData.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey()); + } else { + // Delete needs special handling, because after this step we + // will no longer have access to any information about the + // index since it will be... deleted. + registerDeleteOperation(indexMetaData); + } + } } - } - @Override - public void onFailure(Exception e) { - moveToErrorStep(indexMetaData.getIndex(), policy, currentStep.getKey(), e); - } - }); + @Override + public void onFailure(Exception e) { + moveToErrorStep(indexMetaData.getIndex(), policy, currentStep.getKey(), e); + } + }); } else { logger.trace("[{}] ignoring non async action step execution from step transition [{}]", index, currentStep.getKey()); } @@ -298,6 +323,7 @@ private void moveToStep(Index index, String policy, Step.StepKey currentStepKey, new MoveToNextStepUpdateTask(index, policy, currentStepKey, newStepKey, nowSupplier, stepRegistry, clusterState -> { IndexMetaData indexMetaData = clusterState.metaData().index(index); + registerSuccessfulOperation(indexMetaData); if (newStepKey != null && newStepKey != TerminalPolicyStep.KEY && indexMetaData != null) { maybeRunAsyncAction(clusterState, indexMetaData, policy, newStepKey); } @@ -311,7 +337,10 @@ private void moveToErrorStep(Index index, String policy, Step.StepKey currentSte logger.error(new ParameterizedMessage("policy [{}] for index [{}] failed on step [{}]. Moving to ERROR step", policy, index.getName(), currentStepKey), e); clusterService.submitStateUpdateTask("ilm-move-to-error-step", - new MoveToErrorStepUpdateTask(index, policy, currentStepKey, e, nowSupplier, stepRegistry::getStep)); + new MoveToErrorStepUpdateTask(index, policy, currentStepKey, e, nowSupplier, stepRegistry::getStep, clusterState -> { + IndexMetaData indexMetaData = clusterState.metaData().index(index); + registerFailedOperation(indexMetaData, e); + })); } /** @@ -343,4 +372,61 @@ private void markPolicyRetrievalError(String policyName, Index index, LifecycleE setStepInfo(index, policyName, LifecycleExecutionState.getCurrentStepKey(executionState), new SetStepInfoUpdateTask.ExceptionWrapper(e)); } + + /** + * For the given index metadata, register (index a document) that the index has transitioned + * successfully into this new state using the {@link ILMHistoryStore} + */ + void registerSuccessfulOperation(IndexMetaData indexMetaData) { + if (indexMetaData == null) { + // This index may have been deleted and has no metadata, so ignore it + return; + } + Long origination = calculateOriginationMillis(indexMetaData); + ilmHistoryStore.putAsync( + ILMHistoryItem.success(indexMetaData.getIndex().getName(), + LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexMetaData.getSettings()), + nowSupplier.getAsLong(), + origination == null ? null : (nowSupplier.getAsLong() - origination), + LifecycleExecutionState.fromIndexMetadata(indexMetaData))); + } + + /** + * For the given index metadata, register (index a document) that the index + * has been deleted by ILM using the {@link ILMHistoryStore} + */ + void registerDeleteOperation(IndexMetaData metadataBeforeDeletion) { + if (metadataBeforeDeletion == null) { + throw new IllegalStateException("cannot register deletion of an index that did not previously exist"); + } + Long origination = calculateOriginationMillis(metadataBeforeDeletion); + ilmHistoryStore.putAsync( + ILMHistoryItem.success(metadataBeforeDeletion.getIndex().getName(), + LifecycleSettings.LIFECYCLE_NAME_SETTING.get(metadataBeforeDeletion.getSettings()), + nowSupplier.getAsLong(), + origination == null ? null : (nowSupplier.getAsLong() - origination), + LifecycleExecutionState.builder(LifecycleExecutionState.fromIndexMetadata(metadataBeforeDeletion)) + // Register that the delete phase is now "complete" + .setStep(PhaseCompleteStep.NAME) + .build())); + } + + /** + * For the given index metadata, register (index a document) that the index has transitioned + * into the ERROR state using the {@link ILMHistoryStore} + */ + void registerFailedOperation(IndexMetaData indexMetaData, Exception failure) { + if (indexMetaData == null) { + // This index may have been deleted and has no metadata, so ignore it + return; + } + Long origination = calculateOriginationMillis(indexMetaData); + ilmHistoryStore.putAsync( + ILMHistoryItem.failure(indexMetaData.getIndex().getName(), + LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexMetaData.getSettings()), + nowSupplier.getAsLong(), + origination == null ? null : (nowSupplier.getAsLong() - origination), + LifecycleExecutionState.fromIndexMetadata(indexMetaData), + failure)); + } } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java index f116f9de08743..a5ae4d4673a77 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java @@ -35,6 +35,7 @@ import org.elasticsearch.xpack.core.ilm.ShrinkStep; import org.elasticsearch.xpack.core.ilm.Step.StepKey; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; +import org.elasticsearch.xpack.ilm.history.ILMHistoryStore; import java.io.Closeable; import java.time.Clock; @@ -59,21 +60,24 @@ public class IndexLifecycleService private final Clock clock; private final PolicyStepsRegistry policyRegistry; private final IndexLifecycleRunner lifecycleRunner; + private final ILMHistoryStore ilmHistoryStore; private final Settings settings; private ClusterService clusterService; private LongSupplier nowSupplier; private SchedulerEngine.Job scheduledJob; public IndexLifecycleService(Settings settings, Client client, ClusterService clusterService, ThreadPool threadPool, Clock clock, - LongSupplier nowSupplier, NamedXContentRegistry xContentRegistry) { + LongSupplier nowSupplier, NamedXContentRegistry xContentRegistry, + ILMHistoryStore ilmHistoryStore) { super(); this.settings = settings; this.clusterService = clusterService; this.clock = clock; this.nowSupplier = nowSupplier; this.scheduledJob = null; + this.ilmHistoryStore = ilmHistoryStore; this.policyRegistry = new PolicyStepsRegistry(xContentRegistry, client); - this.lifecycleRunner = new IndexLifecycleRunner(policyRegistry, clusterService, threadPool, nowSupplier); + this.lifecycleRunner = new IndexLifecycleRunner(policyRegistry, ilmHistoryStore, clusterService, threadPool, nowSupplier); this.pollInterval = LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING.get(settings); clusterService.addStateApplier(this); clusterService.addListener(this); diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/MoveToErrorStepUpdateTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/MoveToErrorStepUpdateTask.java index 1b80e070c5552..ae05d21021339 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/MoveToErrorStepUpdateTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/MoveToErrorStepUpdateTask.java @@ -17,6 +17,7 @@ import java.io.IOException; import java.util.function.BiFunction; +import java.util.function.Consumer; import java.util.function.LongSupplier; public class MoveToErrorStepUpdateTask extends ClusterStateUpdateTask { @@ -24,17 +25,20 @@ public class MoveToErrorStepUpdateTask extends ClusterStateUpdateTask { private final String policy; private final Step.StepKey currentStepKey; private final BiFunction stepLookupFunction; + private final Consumer stateChangeConsumer; private LongSupplier nowSupplier; private Exception cause; public MoveToErrorStepUpdateTask(Index index, String policy, Step.StepKey currentStepKey, Exception cause, LongSupplier nowSupplier, - BiFunction stepLookupFunction) { + BiFunction stepLookupFunction, + Consumer stateChangeConsumer) { this.index = index; this.policy = policy; this.currentStepKey = currentStepKey; this.cause = cause; this.nowSupplier = nowSupplier; this.stepLookupFunction = stepLookupFunction; + this.stateChangeConsumer = stateChangeConsumer; } Index getIndex() { @@ -73,6 +77,13 @@ public ClusterState execute(ClusterState currentState) throws IOException { } } + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + if (newState.equals(oldState) == false) { + stateChangeConsumer.accept(newState); + } + } + @Override public void onFailure(String source, Exception e) { throw new ElasticsearchException("policy [" + policy + "] for index [" + index.getName() diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryItem.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryItem.java new file mode 100644 index 0000000000000..ba01d672e3f5e --- /dev/null +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryItem.java @@ -0,0 +1,114 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ilm.history; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState; + +import java.io.IOException; +import java.util.Collections; +import java.util.Objects; + +import static org.elasticsearch.ElasticsearchException.REST_EXCEPTION_SKIP_STACK_TRACE; + +/** + * The {@link ILMHistoryItem} class encapsulates the state of an index at a point in time. It should + * be constructed when an index has transitioned into a new step. Construction is done through the + * {@link #success(String, String, long, Long, LifecycleExecutionState)} and + * {@link #failure(String, String, long, Long, LifecycleExecutionState, Exception)} methods. + */ +public class ILMHistoryItem implements ToXContentObject { + private static final ParseField INDEX = new ParseField("index"); + private static final ParseField POLICY = new ParseField("policy"); + private static final ParseField TIMESTAMP = new ParseField("@timestamp"); + private static final ParseField INDEX_AGE = new ParseField("index_age"); + private static final ParseField SUCCESS = new ParseField("success"); + private static final ParseField EXECUTION_STATE = new ParseField("state"); + private static final ParseField ERROR = new ParseField("error_details"); + + private final String index; + private final String policyId; + private final long timestamp; + @Nullable + private final Long indexAge; + private final boolean success; + @Nullable + private final LifecycleExecutionState executionState; + @Nullable + private final String errorDetails; + + private ILMHistoryItem(String index, String policyId, long timestamp, @Nullable Long indexAge, boolean success, + @Nullable LifecycleExecutionState executionState, @Nullable String errorDetails) { + this.index = index; + this.policyId = policyId; + this.timestamp = timestamp; + this.indexAge = indexAge; + this.success = success; + this.executionState = executionState; + this.errorDetails = errorDetails; + } + + public static ILMHistoryItem success(String index, String policyId, long timestamp, @Nullable Long indexAge, + @Nullable LifecycleExecutionState executionState) { + return new ILMHistoryItem(index, policyId, timestamp, indexAge, true, executionState, null); + } + + public static ILMHistoryItem failure(String index, String policyId, long timestamp, @Nullable Long indexAge, + @Nullable LifecycleExecutionState executionState, Exception error) { + Objects.requireNonNull(error, "ILM failures require an attached exception"); + return new ILMHistoryItem(index, policyId, timestamp, indexAge, false, executionState, exceptionToString(error)); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(INDEX.getPreferredName(), index); + builder.field(POLICY.getPreferredName(), policyId); + builder.field(TIMESTAMP.getPreferredName(), timestamp); + if (indexAge != null) { + builder.field(INDEX_AGE.getPreferredName(), indexAge); + } + builder.field(SUCCESS.getPreferredName(), success); + if (executionState != null) { + builder.field(EXECUTION_STATE.getPreferredName(), executionState.asMap()); + } + if (errorDetails != null) { + builder.field(ERROR.getPreferredName(), errorDetails); + } + builder.endObject(); + return builder; + } + + private static String exceptionToString(Exception exception) { + Params stacktraceParams = new MapParams(Collections.singletonMap(REST_EXCEPTION_SKIP_STACK_TRACE, "false")); + String exceptionString; + try (XContentBuilder causeXContentBuilder = JsonXContent.contentBuilder()) { + causeXContentBuilder.startObject(); + ElasticsearchException.generateThrowableXContent(causeXContentBuilder, stacktraceParams, exception); + causeXContentBuilder.endObject(); + exceptionString = BytesReference.bytes(causeXContentBuilder).utf8ToString(); + } catch (IOException e) { + // In the unlikely case that we cannot generate an exception string, + // try the best way can to encapsulate the error(s) with at least + // the message + exceptionString = "unable to generate exception string: " + e.getMessage() + "; original exception: " + exception.getMessage(); + } + return exceptionString; + } + + @Override + public String toString() { + return Strings.toString(this); + } +} diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStore.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStore.java new file mode 100644 index 0000000000000..ab8168de4b28d --- /dev/null +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStore.java @@ -0,0 +1,189 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ilm.history; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.alias.Alias; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.bulk.BackoffPolicy; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasOrIndex; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.elasticsearch.xpack.core.ClientHelper.INDEX_LIFECYCLE_ORIGIN; +import static org.elasticsearch.xpack.core.ilm.LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING; +import static org.elasticsearch.xpack.ilm.history.ILMHistoryTemplateRegistry.INDEX_TEMPLATE_VERSION; + +/** + * The {@link ILMHistoryStore} handles indexing {@link ILMHistoryItem} documents into the + * appropriate index. It sets up a {@link BulkProcessor} for indexing in bulk, and handles creation + * of the index/alias as needed for ILM policies. + */ +public class ILMHistoryStore implements Closeable { + private static final Logger logger = LogManager.getLogger(ILMHistoryStore.class); + + public static final String ILM_HISTORY_INDEX_PREFIX = "ilm-history-" + INDEX_TEMPLATE_VERSION + "-"; + public static final String ILM_HISTORY_ALIAS = "ilm-history-" + INDEX_TEMPLATE_VERSION; + + private final Client client; + private final ClusterService clusterService; + private final boolean ilmHistoryEnabled; + private final BulkProcessor processor; + + public ILMHistoryStore(Settings nodeSettings, Client client, ClusterService clusterService) { + this.client = client; + this.clusterService = clusterService; + ilmHistoryEnabled = LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.get(nodeSettings); + + this.processor = BulkProcessor.builder( + new OriginSettingClient(client, INDEX_LIFECYCLE_ORIGIN)::bulk, + new BulkProcessor.Listener() { + @Override + public void beforeBulk(long executionId, BulkRequest request) { } + + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + long items = request.numberOfActions(); + logger.trace("indexed [{}] items into ILM history index", items); + if (response.hasFailures()) { + Map failures = Arrays.stream(response.getItems()) + .filter(BulkItemResponse::isFailed) + .collect(Collectors.toMap(BulkItemResponse::getId, BulkItemResponse::getFailureMessage)); + logger.error("failures: [{}]", failures); + } + } + + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + long items = request.numberOfActions(); + logger.error(new ParameterizedMessage("failed to index {} items into ILM history index", items), failure); + } + }) + .setBulkActions(100) + .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) + .setFlushInterval(TimeValue.timeValueSeconds(5)) + .setConcurrentRequests(1) + .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(1000), 3)) + .build(); + } + + /** + * Attempts to asynchronously index an ILM history entry + */ + public void putAsync(ILMHistoryItem item) { + if (ilmHistoryEnabled == false) { + logger.trace("not recording ILM history item because [{}] is [false]: [{}]", + LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.getKey(), item); + return; + } + logger.trace("about to index ILM history item in index [{}]: [{}]", ILM_HISTORY_ALIAS, item); + ensureHistoryIndex(client, clusterService.state(), ActionListener.wrap(createdIndex -> { + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + item.toXContent(builder, ToXContent.EMPTY_PARAMS); + IndexRequest request = new IndexRequest(ILM_HISTORY_ALIAS).source(builder); + processor.add(request); + } catch (IOException exception) { + logger.error(new ParameterizedMessage("failed to index ILM history item in index [{}]: [{}]", + ILM_HISTORY_ALIAS, item), exception); + } + }, ex -> logger.error(new ParameterizedMessage("failed to ensure ILM history index exists, not indexing history item [{}]", + item), ex))); + } + + /** + * Checks if the ILM history index exists, and if not, creates it. + * + * @param client The client to use to create the index if needed + * @param state The current cluster state, to determine if the alias exists + * @param listener Called after the index has been created. `onResponse` called with `true` if the index was created, + * `false` if it already existed. + */ + static void ensureHistoryIndex(Client client, ClusterState state, ActionListener listener) { + final String initialHistoryIndexName = ILM_HISTORY_INDEX_PREFIX + "000001"; + final AliasOrIndex ilmHistory = state.metaData().getAliasAndIndexLookup().get(ILM_HISTORY_ALIAS); + final AliasOrIndex initialHistoryIndex = state.metaData().getAliasAndIndexLookup().get(initialHistoryIndexName); + + if (ilmHistory == null && initialHistoryIndex == null) { + // No alias or index exists with the expected names, so create the index with appropriate alias + client.admin().indices().prepareCreate(initialHistoryIndexName) + .setWaitForActiveShards(1) + .addAlias(new Alias(ILM_HISTORY_ALIAS) + .writeIndex(true)) + .execute(new ActionListener() { + @Override + public void onResponse(CreateIndexResponse response) { + listener.onResponse(true); + } + + @Override + public void onFailure(Exception e) { + if (e instanceof ResourceAlreadyExistsException) { + // The index didn't exist before we made the call, there was probably a race - just ignore this + logger.debug("index [{}] was created after checking for its existence, likely due to a concurrent call", + initialHistoryIndexName); + listener.onResponse(false); + } else { + listener.onFailure(e); + } + } + }); + } else if (ilmHistory == null) { + // alias does not exist but initial index does, something is broken + listener.onFailure(new IllegalStateException("ILM history index [" + initialHistoryIndexName + + "] already exists but does not have alias [" + ILM_HISTORY_ALIAS + "]")); + } else if (ilmHistory.isAlias() && ilmHistory instanceof AliasOrIndex.Alias) { + if (((AliasOrIndex.Alias) ilmHistory).getWriteIndex() != null) { + // The alias exists and has a write index, so we're good + listener.onResponse(false); + } else { + // The alias does not have a write index, so we can't index into it + listener.onFailure(new IllegalStateException("ILM history alias [" + ILM_HISTORY_ALIAS + "does not have a write index")); + } + } else if (ilmHistory.isAlias() == false) { + // This is not an alias, error out + listener.onFailure(new IllegalStateException("ILM history alias [" + ILM_HISTORY_ALIAS + + "] already exists as concrete index")); + } else { + logger.error("unexpected IndexOrAlias for [{}]: [{}]", ILM_HISTORY_ALIAS, ilmHistory); + assert false : ILM_HISTORY_ALIAS + " cannot be both an alias and not an alias simultaneously"; + } + } + + @Override + public void close() { + try { + processor.awaitClose(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger.warn("failed to shut down ILM history bulk processor after 10 seconds", e); + } + } +} diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryTemplateRegistry.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryTemplateRegistry.java new file mode 100644 index 0000000000000..21b2d16afdc83 --- /dev/null +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryTemplateRegistry.java @@ -0,0 +1,80 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ilm.history; + +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.ilm.LifecycleSettings; +import org.elasticsearch.xpack.core.template.IndexTemplateConfig; +import org.elasticsearch.xpack.core.template.IndexTemplateRegistry; +import org.elasticsearch.xpack.core.template.LifecyclePolicyConfig; + +import java.util.Collections; +import java.util.List; + +/** + * The {@link ILMHistoryTemplateRegistry} class sets up and configures an ILM policy and index + * template for the ILM history indices (ilm-history-N-00000M). + */ +public class ILMHistoryTemplateRegistry extends IndexTemplateRegistry { + // history (please add a comment why you increased the version here) + // version 1: initial + public static final String INDEX_TEMPLATE_VERSION = "1"; + + public static final String ILM_TEMPLATE_VERSION_VARIABLE = "xpack.ilm_history.template.version"; + public static final String ILM_TEMPLATE_NAME = "ilm-history"; + + public static final String ILM_POLICY_NAME = "ilm-history-ilm-policy"; + + public static final IndexTemplateConfig TEMPLATE_ILM_HISTORY = new IndexTemplateConfig( + ILM_TEMPLATE_NAME, + "/ilm-history.json", + INDEX_TEMPLATE_VERSION, + ILM_TEMPLATE_VERSION_VARIABLE + ); + + public static final LifecyclePolicyConfig ILM_HISTORY_POLICY = new LifecyclePolicyConfig( + ILM_POLICY_NAME, + "/ilm-history-ilm-policy.json" + ); + + private final boolean ilmHistoryEnabled; + + public ILMHistoryTemplateRegistry(Settings nodeSettings, ClusterService clusterService, + ThreadPool threadPool, Client client, + NamedXContentRegistry xContentRegistry) { + super(nodeSettings, clusterService, threadPool, client, xContentRegistry); + this.ilmHistoryEnabled = LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.get(nodeSettings); + } + + @Override + protected List getTemplateConfigs() { + if (this.ilmHistoryEnabled) { + return Collections.singletonList(TEMPLATE_ILM_HISTORY); + } else { + return Collections.emptyList(); + } + } + + @Override + protected List getPolicyConfigs() { + if (this.ilmHistoryEnabled) { + return Collections.singletonList(ILM_HISTORY_POLICY); + } else { + return Collections.emptyList(); + } + } + + @Override + protected String getOrigin() { + return ClientHelper.INDEX_LIFECYCLE_ORIGIN; + } +} diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleInitialisationTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleInitialisationTests.java index b49d0870bdaa3..22f40913997a3 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleInitialisationTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleInitialisationTests.java @@ -106,7 +106,8 @@ protected Settings nodeSettings(int nodeOrdinal) { settings.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false); settings.put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL, "1s"); - // This is necessary to prevent SLM installing a lifecycle policy, these tests assume a blank slate + // This is necessary to prevent ILM and SLM installing a lifecycle policy, these tests assume a blank slate + settings.put(LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED, false); settings.put(LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING.getKey(), false); return settings.build(); } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java index 2580e2970e521..130a77cf853dd 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java @@ -55,6 +55,8 @@ import org.elasticsearch.xpack.core.ilm.Step.StepKey; import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep; import org.elasticsearch.xpack.core.ilm.WaitForRolloverReadyStep; +import org.elasticsearch.xpack.ilm.history.ILMHistoryItem; +import org.elasticsearch.xpack.ilm.history.ILMHistoryStore; import org.junit.After; import org.junit.Before; import org.mockito.ArgumentMatcher; @@ -91,6 +93,8 @@ public class IndexLifecycleRunnerTests extends ESTestCase { private static final NamedXContentRegistry REGISTRY; private ThreadPool threadPool; + private Client noopClient; + private NoOpHistoryStore historyStore; static { try (IndexLifecycle indexLifecycle = new IndexLifecycle(Settings.EMPTY)) { @@ -100,12 +104,16 @@ public class IndexLifecycleRunnerTests extends ESTestCase { } @Before - public void prepareThreadPool() { + public void prepare() { threadPool = new TestThreadPool("test"); + noopClient = new NoOpClient(threadPool); + historyStore = new NoOpHistoryStore(); } @After public void shutdown() { + historyStore.close(); + noopClient.close(); threadPool.shutdownNow(); } @@ -114,7 +122,7 @@ public void testRunPolicyTerminalPolicyStep() { TerminalPolicyStep step = TerminalPolicyStep.INSTANCE; PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); @@ -136,7 +144,7 @@ public void testRunPolicyErrorStep() { PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); LifecycleExecutionState.Builder newState = LifecycleExecutionState.builder(); newState.setFailedStep(stepKey.getName()); newState.setIsAutoRetryableError(false); @@ -176,7 +184,7 @@ public void testRunPolicyErrorStepOnRetryableFailedStep() { PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, waitForRolloverStep); ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); LifecycleExecutionState.Builder newState = LifecycleExecutionState.builder(); newState.setFailedStep(stepKey.getName()); newState.setIsAutoRetryableError(true); @@ -221,7 +229,7 @@ public void testRunStateChangePolicyWithNoNextStep() throws Exception { .localNodeId(node.getId())) .build(); ClusterServiceUtils.setState(clusterService, state); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); ClusterState before = clusterService.state(); CountDownLatch latch = new CountDownLatch(1); @@ -282,7 +290,8 @@ public void testRunStateChangePolicyWithNextStep() throws Exception { .build(); ClusterServiceUtils.setState(clusterService, state); long stepTime = randomLong(); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> stepTime); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, + clusterService, threadPool, () -> stepTime); ClusterState before = clusterService.state(); CountDownLatch latch = new CountDownLatch(1); @@ -303,6 +312,14 @@ public void testRunStateChangePolicyWithNextStep() throws Exception { assertThat(nextStep.getExecuteCount(), equalTo(1L)); clusterService.close(); threadPool.shutdownNow(); + + ILMHistoryItem historyItem = historyStore.getItems().stream() + .findFirst() + .orElseThrow(() -> new AssertionError("failed to register ILM history")); + assertThat(historyItem.toString(), + containsString("{\"index\":\"test\",\"policy\":\"foo\",\"@timestamp\":" + stepTime + + ",\"success\":true,\"state\":{\"phase\":\"phase\",\"action\":\"action\"," + + "\"step\":\"next_cluster_state_action_step\",\"step_time\":\"" + stepTime + "\"}}")); } public void testRunPeriodicPolicyWithFailureToReadPolicy() throws Exception { @@ -357,7 +374,8 @@ public void doTestRunPolicyWithFailureToReadPolicy(boolean asyncAction, boolean .build(); ClusterServiceUtils.setState(clusterService, state); long stepTime = randomLong(); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> stepTime); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, + clusterService, threadPool, () -> stepTime); ClusterState before = clusterService.state(); if (asyncAction) { @@ -409,7 +427,7 @@ public void testRunAsyncActionDoesNotRun() { .localNodeId(node.getId())) .build(); ClusterServiceUtils.setState(clusterService, state); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); ClusterState before = clusterService.state(); // State changes should not run AsyncAction steps @@ -468,7 +486,7 @@ public void testRunStateChangePolicyWithAsyncActionNextStep() throws Exception { .build(); logger.info("--> state: {}", state); ClusterServiceUtils.setState(clusterService, state); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); ClusterState before = clusterService.state(); CountDownLatch latch = new CountDownLatch(1); @@ -488,6 +506,13 @@ public void testRunStateChangePolicyWithAsyncActionNextStep() throws Exception { assertThat(nextStep.getExecuteCount(), equalTo(1L)); clusterService.close(); threadPool.shutdownNow(); + + ILMHistoryItem historyItem = historyStore.getItems().stream() + .findFirst() + .orElseThrow(() -> new AssertionError("failed to register ILM history")); + assertThat(historyItem.toString(), + containsString("{\"index\":\"test\",\"policy\":\"foo\",\"@timestamp\":0,\"success\":true," + + "\"state\":{\"phase\":\"phase\",\"action\":\"action\",\"step\":\"async_action_step\",\"step_time\":\"0\"}}")); } public void testRunPeriodicStep() throws Exception { @@ -535,7 +560,7 @@ public void testRunPeriodicStep() throws Exception { .build(); logger.info("--> state: {}", state); ClusterServiceUtils.setState(clusterService, state); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); ClusterState before = clusterService.state(); CountDownLatch latch = new CountDownLatch(1); @@ -558,7 +583,7 @@ public void testRunPolicyClusterStateActionStep() { MockClusterStateActionStep step = new MockClusterStateActionStep(stepKey, null); PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); @@ -576,7 +601,7 @@ public void testRunPolicyClusterStateWaitStep() { step.setWillComplete(true); PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); @@ -595,7 +620,7 @@ public void testRunPolicyAsyncActionStepClusterStateChangeIgnored() { step.setException(expectedException); PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); @@ -613,7 +638,7 @@ public void testRunPolicyAsyncWaitStepClusterStateChangeIgnored() { step.setException(expectedException); PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); @@ -627,7 +652,7 @@ public void testRunPolicyThatDoesntExist() { String policyName = "cluster_state_action_policy"; ClusterService clusterService = mock(ClusterService.class); IndexLifecycleRunner runner = new IndexLifecycleRunner(new PolicyStepsRegistry(NamedXContentRegistry.EMPTY, null), - clusterService, threadPool, () -> 0L); + historyStore, clusterService, threadPool, () -> 0L); IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); // verify that no exception is thrown @@ -712,7 +737,8 @@ public void testIsReadyToTransition() { stepMap, NamedXContentRegistry.EMPTY, null); ClusterService clusterService = mock(ClusterService.class); final AtomicLong now = new AtomicLong(5); - IndexLifecycleRunner runner = new IndexLifecycleRunner(policyStepsRegistry, clusterService, threadPool, now::get); + IndexLifecycleRunner runner = new IndexLifecycleRunner(policyStepsRegistry, historyStore, + clusterService, threadPool, now::get); IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)) .numberOfReplicas(randomIntBetween(0, 5)) @@ -1086,4 +1112,23 @@ public static MockPolicyStepsRegistry createMultiStepPolicyStepRegistry(String p when(client.settings()).thenReturn(Settings.EMPTY); return new MockPolicyStepsRegistry(lifecyclePolicyMap, firstStepMap, stepMap, REGISTRY, client); } + + private class NoOpHistoryStore extends ILMHistoryStore { + + private final List items = new ArrayList<>(); + + NoOpHistoryStore() { + super(Settings.EMPTY, noopClient, null); + } + + public List getItems() { + return items; + } + + @Override + public void putAsync(ILMHistoryItem item) { + logger.info("--> adding ILM history item: [{}]", item); + items.add(item); + } + } } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java index a7f15419d3718..2bf054d318fb5 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java @@ -108,7 +108,7 @@ public void prepareServices() { threadPool = new TestThreadPool("test"); indexLifecycleService = new IndexLifecycleService(Settings.EMPTY, client, clusterService, threadPool, - clock, () -> now, null); + clock, () -> now, null, null); Mockito.verify(clusterService).addListener(indexLifecycleService); Mockito.verify(clusterService).addStateApplier(indexLifecycleService); } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/MoveToErrorStepUpdateTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/MoveToErrorStepUpdateTaskTests.java index fa2a626b0f9bc..39b3fca46c1f1 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/MoveToErrorStepUpdateTaskTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/MoveToErrorStepUpdateTaskTests.java @@ -75,7 +75,7 @@ public void testExecuteSuccessfullyMoved() throws IOException { setStateToKey(currentStepKey); MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, () -> now, - (idxMeta, stepKey) -> new MockStep(stepKey, nextStepKey)); + (idxMeta, stepKey) -> new MockStep(stepKey, nextStepKey), state -> {}); ClusterState newState = task.execute(clusterState); LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(newState.getMetaData().index(index)); StepKey actualKey = LifecycleExecutionState.getCurrentStepKey(lifecycleState); @@ -101,7 +101,7 @@ public void testExecuteNoopDifferentStep() throws IOException { Exception cause = new ElasticsearchException("THIS IS AN EXPECTED CAUSE"); setStateToKey(notCurrentStepKey); MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, () -> now, - (idxMeta, stepKey) -> new MockStep(stepKey, new StepKey("next-phase", "action", "step"))); + (idxMeta, stepKey) -> new MockStep(stepKey, new StepKey("next-phase", "action", "step")), state -> {}); ClusterState newState = task.execute(clusterState); assertThat(newState, sameInstance(clusterState)); } @@ -113,7 +113,7 @@ public void testExecuteNoopDifferentPolicy() throws IOException { setStateToKey(currentStepKey); setStatePolicy("not-" + policy); MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, () -> now, - (idxMeta, stepKey) -> new MockStep(stepKey, new StepKey("next-phase", "action", "step"))); + (idxMeta, stepKey) -> new MockStep(stepKey, new StepKey("next-phase", "action", "step")), state -> {}); ClusterState newState = task.execute(clusterState); assertThat(newState, sameInstance(clusterState)); } @@ -126,7 +126,7 @@ public void testOnFailure() { setStateToKey(currentStepKey); MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, () -> now, - (idxMeta, stepKey) -> new MockStep(stepKey, new StepKey("next-phase", "action", "step"))); + (idxMeta, stepKey) -> new MockStep(stepKey, new StepKey("next-phase", "action", "step")), state -> {}); Exception expectedException = new RuntimeException(); ElasticsearchException exception = expectThrows(ElasticsearchException.class, () -> task.onFailure(randomAlphaOfLength(10), expectedException)); diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/history/ILMHistoryItemTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/history/ILMHistoryItemTests.java new file mode 100644 index 0000000000000..2e3f76c6370e7 --- /dev/null +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/history/ILMHistoryItemTests.java @@ -0,0 +1,92 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ilm.history; + +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState; + +import java.io.IOException; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.startsWith; + +public class ILMHistoryItemTests extends ESTestCase { + + public void testToXContent() throws IOException { + ILMHistoryItem success = ILMHistoryItem.success("index", "policy", 1234L, 100L, + LifecycleExecutionState.builder() + .setPhase("phase") + .setAction("action") + .setStep("step") + .setPhaseTime(10L) + .setActionTime(20L) + .setStepTime(30L) + .setPhaseDefinition("{}") + .setStepInfo("{\"step_info\": \"foo\"") + .build()); + + ILMHistoryItem failure = ILMHistoryItem.failure("index", "policy", 1234L, 100L, + LifecycleExecutionState.builder() + .setPhase("phase") + .setAction("action") + .setStep("ERROR") + .setFailedStep("step") + .setFailedStepRetryCount(7) + .setIsAutoRetryableError(true) + .setPhaseTime(10L) + .setActionTime(20L) + .setStepTime(30L) + .setPhaseDefinition("{\"phase_json\": \"eggplant\"}") + .setStepInfo("{\"step_info\": \"foo\"") + .build(), + new IllegalArgumentException("failure")); + + try (XContentBuilder builder = jsonBuilder()) { + success.toXContent(builder, ToXContent.EMPTY_PARAMS); + String json = Strings.toString(builder); + assertThat(json, equalTo("{\"index\":\"index\"," + + "\"policy\":\"policy\"," + + "\"@timestamp\":1234," + + "\"index_age\":100," + + "\"success\":true," + + "\"state\":{\"phase\":\"phase\"," + + "\"phase_definition\":\"{}\"," + + "\"action_time\":\"20\"," + + "\"phase_time\":\"10\"," + + "\"step_info\":\"{\\\"step_info\\\": \\\"foo\\\"\",\"action\":\"action\",\"step\":\"step\",\"step_time\":\"30\"}}" + )); + } + + try (XContentBuilder builder = jsonBuilder()) { + failure.toXContent(builder, ToXContent.EMPTY_PARAMS); + String json = Strings.toString(builder); + assertThat(json, startsWith("{\"index\":\"index\"," + + "\"policy\":\"policy\"," + + "\"@timestamp\":1234," + + "\"index_age\":100," + + "\"success\":false," + + "\"state\":{\"phase\":\"phase\"," + + "\"failed_step\":\"step\"," + + "\"phase_definition\":\"{\\\"phase_json\\\": \\\"eggplant\\\"}\"," + + "\"action_time\":\"20\"," + + "\"is_auto_retryable_error\":\"true\"," + + "\"failed_step_retry_count\":\"7\"," + + "\"phase_time\":\"10\"," + + "\"step_info\":\"{\\\"step_info\\\": \\\"foo\\\"\"," + + "\"action\":\"action\"," + + "\"step\":\"ERROR\"," + + "\"step_time\":\"30\"}," + + "\"error_details\":\"{\\\"type\\\":\\\"illegal_argument_exception\\\"," + + "\\\"reason\\\":\\\"failure\\\"," + + "\\\"stack_trace\\\":\\\"java.lang.IllegalArgumentException: failure")); + } + } +} diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStoreTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStoreTests.java new file mode 100644 index 0000000000000..3e6b9a638737c --- /dev/null +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStoreTests.java @@ -0,0 +1,398 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ilm.history; + +import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.LatchedActionListener; +import org.elasticsearch.action.admin.indices.create.CreateIndexAction; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.bulk.BulkAction; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasMetaData; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.TriFunction; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ClusterServiceUtils; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.client.NoOpClient; +import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState; +import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; + +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; + +import static org.elasticsearch.xpack.core.ilm.LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING; +import static org.elasticsearch.xpack.ilm.history.ILMHistoryStore.ILM_HISTORY_ALIAS; +import static org.elasticsearch.xpack.ilm.history.ILMHistoryStore.ILM_HISTORY_INDEX_PREFIX; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; + +public class ILMHistoryStoreTests extends ESTestCase { + + private ThreadPool threadPool; + private VerifyingClient client; + private ClusterService clusterService; + private ILMHistoryStore historyStore; + + @Before + public void setup() { + threadPool = new TestThreadPool(this.getClass().getName()); + client = new VerifyingClient(threadPool); + clusterService = ClusterServiceUtils.createClusterService(threadPool); + historyStore = new ILMHistoryStore(Settings.EMPTY, client, clusterService); + } + + @After + public void setdown() { + historyStore.close(); + clusterService.close(); + client.close(); + threadPool.shutdownNow(); + } + + public void testNoActionIfDisabled() throws Exception { + Settings settings = Settings.builder().put(LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.getKey(), false).build(); + try (ILMHistoryStore disabledHistoryStore = new ILMHistoryStore(settings, client, null)) { + String policyId = randomAlphaOfLength(5); + final long timestamp = randomNonNegativeLong(); + ILMHistoryItem record = ILMHistoryItem.success("index", policyId, timestamp, null, null); + + CountDownLatch latch = new CountDownLatch(1); + client.setVerifier((a, r, l) -> { + fail("the history store is disabled, no action should have been taken"); + latch.countDown(); + return null; + }); + disabledHistoryStore.putAsync(record); + latch.await(10, TimeUnit.SECONDS); + } + } + + @SuppressWarnings("unchecked") + public void testPut() throws Exception { + String policyId = randomAlphaOfLength(5); + final long timestamp = randomNonNegativeLong(); + { + ILMHistoryItem record = ILMHistoryItem.success("index", policyId, timestamp, 10L, + LifecycleExecutionState.builder() + .setPhase("phase") + .build()); + + AtomicInteger calledTimes = new AtomicInteger(0); + client.setVerifier((action, request, listener) -> { + if (action instanceof CreateIndexAction && request instanceof CreateIndexRequest) { + return new CreateIndexResponse(true, true, ((CreateIndexRequest) request).index()); + } + calledTimes.incrementAndGet(); + assertThat(action, instanceOf(BulkAction.class)); + assertThat(request, instanceOf(BulkRequest.class)); + BulkRequest bulkRequest = (BulkRequest) request; + bulkRequest.requests().forEach(dwr -> assertEquals(ILM_HISTORY_ALIAS, dwr.index())); + assertNotNull(listener); + + // The content of this BulkResponse doesn't matter, so just make it have the same number of responses + int responses = bulkRequest.numberOfActions(); + return new BulkResponse(IntStream.range(0, responses) + .mapToObj(i -> new BulkItemResponse(i, DocWriteRequest.OpType.INDEX, + new IndexResponse(new ShardId("index", "uuid", 0), randomAlphaOfLength(10), 1, 1, 1, true))) + .toArray(BulkItemResponse[]::new), + 1000L); + }); + + historyStore.putAsync(record); + assertBusy(() -> assertThat(calledTimes.get(), equalTo(1))); + } + + { + final String cause = randomAlphaOfLength(9); + Exception failureException = new RuntimeException(cause); + ILMHistoryItem record = ILMHistoryItem.failure("index", policyId, timestamp, 10L, + LifecycleExecutionState.builder() + .setPhase("phase") + .build(), failureException); + + AtomicInteger calledTimes = new AtomicInteger(0); + client.setVerifier((action, request, listener) -> { + if (action instanceof CreateIndexAction && request instanceof CreateIndexRequest) { + return new CreateIndexResponse(true, true, ((CreateIndexRequest) request).index()); + } + calledTimes.incrementAndGet(); + assertThat(action, instanceOf(BulkAction.class)); + assertThat(request, instanceOf(BulkRequest.class)); + BulkRequest bulkRequest = (BulkRequest) request; + bulkRequest.requests().forEach(dwr -> { + assertEquals(ILM_HISTORY_ALIAS, dwr.index()); + assertThat(dwr, instanceOf(IndexRequest.class)); + IndexRequest ir = (IndexRequest) dwr; + String indexedDocument = ir.source().utf8ToString(); + assertThat(indexedDocument, Matchers.containsString("runtime_exception")); + assertThat(indexedDocument, Matchers.containsString(cause)); + }); + assertNotNull(listener); + + // The content of this BulkResponse doesn't matter, so just make it have the same number of responses with failures + int responses = bulkRequest.numberOfActions(); + return new BulkResponse(IntStream.range(0, responses) + .mapToObj(i -> new BulkItemResponse(i, DocWriteRequest.OpType.INDEX, + new BulkItemResponse.Failure("index", i + "", failureException))) + .toArray(BulkItemResponse[]::new), + 1000L); + }); + + historyStore.putAsync(record); + assertBusy(() -> assertThat(calledTimes.get(), equalTo(1))); + } + } + + public void testHistoryIndexNeedsCreation() throws InterruptedException { + ClusterState state = ClusterState.builder(new ClusterName(randomAlphaOfLength(5))) + .metaData(MetaData.builder()) + .build(); + + client.setVerifier((a, r, l) -> { + assertThat(a, instanceOf(CreateIndexAction.class)); + assertThat(r, instanceOf(CreateIndexRequest.class)); + CreateIndexRequest request = (CreateIndexRequest) r; + assertThat(request.aliases(), Matchers.hasSize(1)); + request.aliases().forEach(alias -> { + assertThat(alias.name(), equalTo(ILM_HISTORY_ALIAS)); + assertTrue(alias.writeIndex()); + }); + return new CreateIndexResponse(true, true, request.index()); + }); + + CountDownLatch latch = new CountDownLatch(1); + ILMHistoryStore.ensureHistoryIndex(client, state, new LatchedActionListener<>(ActionListener.wrap( + Assert::assertTrue, + ex -> { + logger.error(ex); + fail("should have called onResponse, not onFailure"); + }), latch)); + + ElasticsearchAssertions.awaitLatch(latch, 10, TimeUnit.SECONDS); + } + + public void testHistoryIndexProperlyExistsAlready() throws InterruptedException { + ClusterState state = ClusterState.builder(new ClusterName(randomAlphaOfLength(5))) + .metaData(MetaData.builder() + .put(IndexMetaData.builder(ILM_HISTORY_INDEX_PREFIX + "000001") + .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) + .numberOfShards(randomIntBetween(1,10)) + .numberOfReplicas(randomIntBetween(1,10)) + .putAlias(AliasMetaData.builder(ILM_HISTORY_ALIAS) + .writeIndex(true) + .build()))) + .build(); + + client.setVerifier((a, r, l) -> { + fail("no client calls should have been made"); + return null; + }); + + CountDownLatch latch = new CountDownLatch(1); + ILMHistoryStore.ensureHistoryIndex(client, state, new LatchedActionListener<>(ActionListener.wrap( + Assert::assertFalse, + ex -> { + logger.error(ex); + fail("should have called onResponse, not onFailure"); + }), latch)); + + ElasticsearchAssertions.awaitLatch(latch, 10, TimeUnit.SECONDS); + } + + public void testHistoryIndexHasNoWriteIndex() throws InterruptedException { + ClusterState state = ClusterState.builder(new ClusterName(randomAlphaOfLength(5))) + .metaData(MetaData.builder() + .put(IndexMetaData.builder(ILM_HISTORY_INDEX_PREFIX + "000001") + .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) + .numberOfShards(randomIntBetween(1,10)) + .numberOfReplicas(randomIntBetween(1,10)) + .putAlias(AliasMetaData.builder(ILM_HISTORY_ALIAS) + .build())) + .put(IndexMetaData.builder(randomAlphaOfLength(5)) + .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) + .numberOfShards(randomIntBetween(1,10)) + .numberOfReplicas(randomIntBetween(1,10)) + .putAlias(AliasMetaData.builder(ILM_HISTORY_ALIAS) + .build()))) + .build(); + + client.setVerifier((a, r, l) -> { + fail("no client calls should have been made"); + return null; + }); + + CountDownLatch latch = new CountDownLatch(1); + ILMHistoryStore.ensureHistoryIndex(client, state, new LatchedActionListener<>(ActionListener.wrap( + indexCreated -> fail("should have called onFailure, not onResponse"), + ex -> { + assertThat(ex, instanceOf(IllegalStateException.class)); + assertThat(ex.getMessage(), Matchers.containsString("ILM history alias [" + ILM_HISTORY_ALIAS + + "does not have a write index")); + }), latch)); + + ElasticsearchAssertions.awaitLatch(latch, 10, TimeUnit.SECONDS); + } + + public void testHistoryIndexNotAlias() throws InterruptedException { + ClusterState state = ClusterState.builder(new ClusterName(randomAlphaOfLength(5))) + .metaData(MetaData.builder() + .put(IndexMetaData.builder(ILM_HISTORY_ALIAS) + .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) + .numberOfShards(randomIntBetween(1,10)) + .numberOfReplicas(randomIntBetween(1,10)))) + .build(); + + client.setVerifier((a, r, l) -> { + fail("no client calls should have been made"); + return null; + }); + + CountDownLatch latch = new CountDownLatch(1); + ILMHistoryStore.ensureHistoryIndex(client, state, new LatchedActionListener<>(ActionListener.wrap( + indexCreated -> fail("should have called onFailure, not onResponse"), + ex -> { + assertThat(ex, instanceOf(IllegalStateException.class)); + assertThat(ex.getMessage(), Matchers.containsString("ILM history alias [" + ILM_HISTORY_ALIAS + + "] already exists as concrete index")); + }), latch)); + + ElasticsearchAssertions.awaitLatch(latch, 10, TimeUnit.SECONDS); + } + + public void testHistoryIndexCreatedConcurrently() throws InterruptedException { + ClusterState state = ClusterState.builder(new ClusterName(randomAlphaOfLength(5))) + .metaData(MetaData.builder()) + .build(); + + client.setVerifier((a, r, l) -> { + assertThat(a, instanceOf(CreateIndexAction.class)); + assertThat(r, instanceOf(CreateIndexRequest.class)); + CreateIndexRequest request = (CreateIndexRequest) r; + assertThat(request.aliases(), Matchers.hasSize(1)); + request.aliases().forEach(alias -> { + assertThat(alias.name(), equalTo(ILM_HISTORY_ALIAS)); + assertTrue(alias.writeIndex()); + }); + throw new ResourceAlreadyExistsException("that index already exists"); + }); + + CountDownLatch latch = new CountDownLatch(1); + ILMHistoryStore.ensureHistoryIndex(client, state, new LatchedActionListener<>(ActionListener.wrap( + Assert::assertFalse, + ex -> { + logger.error(ex); + fail("should have called onResponse, not onFailure"); + }), latch)); + + ElasticsearchAssertions.awaitLatch(latch, 10, TimeUnit.SECONDS); + } + + public void testHistoryAliasDoesntExistButIndexDoes() throws InterruptedException { + final String initialIndex = ILM_HISTORY_INDEX_PREFIX + "000001"; + ClusterState state = ClusterState.builder(new ClusterName(randomAlphaOfLength(5))) + .metaData(MetaData.builder() + .put(IndexMetaData.builder(initialIndex) + .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) + .numberOfShards(randomIntBetween(1,10)) + .numberOfReplicas(randomIntBetween(1,10)))) + .build(); + + client.setVerifier((a, r, l) -> { + fail("no client calls should have been made"); + return null; + }); + + CountDownLatch latch = new CountDownLatch(1); + ILMHistoryStore.ensureHistoryIndex(client, state, new LatchedActionListener<>(ActionListener.wrap( + response -> { + logger.error(response); + fail("should have called onFailure, not onResponse"); + }, + ex -> { + assertThat(ex, instanceOf(IllegalStateException.class)); + assertThat(ex.getMessage(), Matchers.containsString("ILM history index [" + initialIndex + + "] already exists but does not have alias [" + ILM_HISTORY_ALIAS + "]")); + }), latch)); + + ElasticsearchAssertions.awaitLatch(latch, 10, TimeUnit.SECONDS); + } + + @SuppressWarnings("unchecked") + private void assertContainsMap(String indexedDocument, Map map) { + map.forEach((k, v) -> { + assertThat(indexedDocument, Matchers.containsString(k)); + if (v instanceof Map) { + assertContainsMap(indexedDocument, (Map) v); + } + if (v instanceof Iterable) { + ((Iterable) v).forEach(elem -> { + assertThat(indexedDocument, Matchers.containsString(elem.toString())); + }); + } else { + assertThat(indexedDocument, Matchers.containsString(v.toString())); + } + }); + } + + /** + * A client that delegates to a verifying function for action/request/listener + */ + public static class VerifyingClient extends NoOpClient { + + private TriFunction, ActionRequest, ActionListener, ActionResponse> verifier = (a, r, l) -> { + fail("verifier not set"); + return null; + }; + + VerifyingClient(ThreadPool threadPool) { + super(threadPool); + } + + @Override + @SuppressWarnings("unchecked") + protected void doExecute(ActionType action, + Request request, + ActionListener listener) { + try { + listener.onResponse((Response) verifier.apply(action, request, listener)); + } catch (Exception e) { + listener.onFailure(e); + } + } + + VerifyingClient setVerifier(TriFunction, ActionRequest, ActionListener, ActionResponse> verifier) { + this.verifier = verifier; + return this; + } + } +} From 3b272dee78220420fd169d956cb38be18d5bf582 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Tue, 17 Dec 2019 12:13:54 -0700 Subject: [PATCH 2/9] Narrow scope of docs test requests to ignore ILM history index --- docs/plugins/analysis-icu.asciidoc | 2 +- docs/reference/indices/rollover-index.asciidoc | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/plugins/analysis-icu.asciidoc b/docs/plugins/analysis-icu.asciidoc index 19a8625f60403..d0b95ae7f497b 100644 --- a/docs/plugins/analysis-icu.asciidoc +++ b/docs/plugins/analysis-icu.asciidoc @@ -368,7 +368,7 @@ PUT my_index } } -GET _search <3> +GET /my_index/_search <3> { "query": { "match": { diff --git a/docs/reference/indices/rollover-index.asciidoc b/docs/reference/indices/rollover-index.asciidoc index c4c7fb980bebb..b75961a52559e 100644 --- a/docs/reference/indices/rollover-index.asciidoc +++ b/docs/reference/indices/rollover-index.asciidoc @@ -428,7 +428,7 @@ PUT logs/_doc/2 <2> ////////////////////////// [source,console] -------------------------------------------------- -GET _alias +GET my_logs_index-000001,my_logs_index-000002/_alias -------------------------------------------------- // TEST[continued] ////////////////////////// From 428332384830e0de935c95e81d25a56bf6a85a17 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Tue, 17 Dec 2019 12:40:25 -0700 Subject: [PATCH 3/9] Disable ILM history index for watcher integration tests --- .../xpack/watcher/test/AbstractWatcherIntegrationTestCase.java | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java index 82a9cffc35a9d..0c1377ac79767 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java @@ -116,6 +116,7 @@ protected Settings nodeSettings(int nodeOrdinal) { // watcher settings that should work despite randomization .put("xpack.watcher.execution.scroll.size", randomIntBetween(1, 100)) .put("xpack.watcher.watch.scroll.size", randomIntBetween(1, 100)) + .put("index.lifecycle.history_index_enabled", false) .build(); } From 48da5a95b757b9a7cfdd44640ac2a5f948e78352 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Tue, 17 Dec 2019 13:06:12 -0700 Subject: [PATCH 4/9] Disable ILM history for monitoring integration tests (these tests check the exact number of indices) --- .../xpack/monitoring/test/MonitoringIntegTestCase.java | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/test/MonitoringIntegTestCase.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/test/MonitoringIntegTestCase.java index 22f3b59220998..8270f97d7c01b 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/test/MonitoringIntegTestCase.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/test/MonitoringIntegTestCase.java @@ -54,6 +54,7 @@ protected Settings nodeSettings(int nodeOrdinal) { // .put(MachineLearningField.AUTODETECT_PROCESS.getKey(), false) // .put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false) // we do this by default in core, but for monitoring this isn't needed and only adds noise. + .put("index.lifecycle.history_index_enabled", false) .put("index.store.mock.check_index_on_close", false); return builder.build(); From 75a12970a871d02a3485e77b2e11ec3b0acabce4 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Tue, 17 Dec 2019 14:09:56 -0700 Subject: [PATCH 5/9] Disable ILM history for docs tests --- docs/build.gradle | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/build.gradle b/docs/build.gradle index a39c7e0d9f3bd..5d9b8ae0a73e8 100644 --- a/docs/build.gradle +++ b/docs/build.gradle @@ -60,6 +60,7 @@ testClusters.integTest { extraConfigFile 'hunspell/en_US/en_US.dic', project(":server").file('src/test/resources/indices/analyze/conf_dir/hunspell/en_US/en_US.dic') // Whitelist reindexing from the local node so we can test it. setting 'reindex.remote.whitelist', '127.0.0.1:*' + setting 'index.lifecycle.history_index_enabled', 'false' // TODO: remove this once cname is prepended to transport.publish_address by default in 8.0 systemProperty 'es.transport.cname_in_publish_address', 'true' From 1d4c6a451f068d3f26c5797b1ff4498627038b1b Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Tue, 17 Dec 2019 16:40:31 -0700 Subject: [PATCH 6/9] Push additional logging --- x-pack/plugin/ilm/qa/multi-node/build.gradle | 2 ++ 1 file changed, 2 insertions(+) diff --git a/x-pack/plugin/ilm/qa/multi-node/build.gradle b/x-pack/plugin/ilm/qa/multi-node/build.gradle index 1fd9ccd079726..b27285e9b70c4 100644 --- a/x-pack/plugin/ilm/qa/multi-node/build.gradle +++ b/x-pack/plugin/ilm/qa/multi-node/build.gradle @@ -25,4 +25,6 @@ testClusters.integTest { setting 'xpack.ml.enabled', 'false' setting 'xpack.license.self_generated.type', 'trial' setting 'indices.lifecycle.poll_interval', '1000ms' + setting 'logger.org.elasticsearch.xpack.core.ilm', 'TRACE' + setting 'logger.org.elasticsearch.xpack.ilm', 'TRACE' } From 788234e36fc8cf6a5d524502f95380ec703762d8 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 18 Dec 2019 12:42:32 -0700 Subject: [PATCH 7/9] Formatting changes --- .../elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java | 4 ++-- .../org/elasticsearch/xpack/ilm/history/ILMHistoryItem.java | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java index 706de403c31be..4709e0c82f034 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java @@ -1026,7 +1026,7 @@ public void testHistoryIsWrittenWithSuccess() throws Exception { true); // Index a document - index(client(), index +"-1", "1", "foo", "bar"); + index(client(), index + "-1", "1", "foo", "bar"); Request refreshIndex = new Request("POST", "/" + index + "-1/_refresh"); client().performRequest(refreshIndex); @@ -1067,7 +1067,7 @@ public void testHistoryIsWrittenWithFailure() throws Exception { false); // Index a document - index(client(), index +"-1", "1", "foo", "bar"); + index(client(), index + "-1", "1", "foo", "bar"); Request refreshIndex = new Request("POST", "/" + index + "-1/_refresh"); client().performRequest(refreshIndex); diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryItem.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryItem.java index ba01d672e3f5e..e972613e3e901 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryItem.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryItem.java @@ -102,7 +102,8 @@ private static String exceptionToString(Exception exception) { // In the unlikely case that we cannot generate an exception string, // try the best way can to encapsulate the error(s) with at least // the message - exceptionString = "unable to generate exception string: " + e.getMessage() + "; original exception: " + exception.getMessage(); + exceptionString = "unable to generate the ILM error details due to: " + e.getMessage() + + "; the ILM error was: " + exception.getMessage(); } return exceptionString; } From 3721c194ba0bb955a4cd844de640a0565e4b967a Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 18 Dec 2019 13:53:36 -0700 Subject: [PATCH 8/9] Disable ILM history for HLRC tests --- client/rest-high-level/build.gradle | 1 + 1 file changed, 1 insertion(+) diff --git a/client/rest-high-level/build.gradle b/client/rest-high-level/build.gradle index d1f63d837fdce..1df697cbc5f6e 100644 --- a/client/rest-high-level/build.gradle +++ b/client/rest-high-level/build.gradle @@ -138,6 +138,7 @@ testClusters.all { setting 'xpack.security.authc.realms.pki.pki1.delegation.enabled', 'true' setting 'indices.lifecycle.poll_interval', '1000ms' + setting 'index.lifecycle.history_index_enabled', 'false' keystore 'xpack.security.transport.ssl.truststore.secure_password', 'testnode' extraConfigFile 'roles.yml', file('roles.yml') user username: System.getProperty('tests.rest.cluster.username', 'test_user'), From 5b8caff38f217798cc6a1553946846233c966af1 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 18 Dec 2019 14:43:55 -0700 Subject: [PATCH 9/9] Increase timeout for slow CI machines --- .../ilm/TimeSeriesLifecycleActionsIT.java | 34 +++++++++---------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java index 4709e0c82f034..300f987734c75 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java @@ -1030,20 +1030,20 @@ public void testHistoryIsWrittenWithSuccess() throws Exception { Request refreshIndex = new Request("POST", "/" + index + "-1/_refresh"); client().performRequest(refreshIndex); - assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "wait-for-indexing-complete")); - assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "wait-for-follow-shard-tasks")); - assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "pause-follower-index")); - assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "close-follower-index")); - assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "unfollow-follower-index")); - assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "open-follower-index")); - assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "wait-for-yellow-step")); - assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "check-rollover-ready")); - assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "attempt-rollover")); - assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "update-rollover-lifecycle-date")); - assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "set-indexing-complete")); - assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "completed")); - - assertBusy(() -> assertHistoryIsPresent(policy, index + "-000002", true, "check-rollover-ready")); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "wait-for-indexing-complete"), 30, TimeUnit.SECONDS); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "wait-for-follow-shard-tasks"), 30, TimeUnit.SECONDS); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "pause-follower-index"), 30, TimeUnit.SECONDS); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "close-follower-index"), 30, TimeUnit.SECONDS); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "unfollow-follower-index"), 30, TimeUnit.SECONDS); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "open-follower-index"), 30, TimeUnit.SECONDS); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "wait-for-yellow-step"), 30, TimeUnit.SECONDS); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "check-rollover-ready"), 30, TimeUnit.SECONDS); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "attempt-rollover"), 30, TimeUnit.SECONDS); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "update-rollover-lifecycle-date"), 30, TimeUnit.SECONDS); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "set-indexing-complete"), 30, TimeUnit.SECONDS); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "completed"), 30, TimeUnit.SECONDS); + + assertBusy(() -> assertHistoryIsPresent(policy, index + "-000002", true, "check-rollover-ready"), 30, TimeUnit.SECONDS); } public void testHistoryIsWrittenWithFailure() throws Exception { @@ -1071,9 +1071,7 @@ public void testHistoryIsWrittenWithFailure() throws Exception { Request refreshIndex = new Request("POST", "/" + index + "-1/_refresh"); client().performRequest(refreshIndex); - assertBusy(() -> { - assertHistoryIsPresent(policy, index + "-1", false, "ERROR"); - }); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", false, "ERROR"), 30, TimeUnit.SECONDS); } public void testHistoryIsWrittenWithDeletion() throws Exception { @@ -1104,7 +1102,7 @@ public void testHistoryIsWrittenWithDeletion() throws Exception { assertBusy(() -> { assertHistoryIsPresent(policy, index, true, "delete", "delete", "wait-for-shard-history-leases"); assertHistoryIsPresent(policy, index, true, "delete", "delete", "complete"); - }); + }, 30, TimeUnit.SECONDS); } // This method should be called inside an assertBusy, it has no retry logic of its own