From c3c9ccf61f8617c9b29e271385cabc74a6868ddf Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Fri, 20 Dec 2019 12:33:36 -0700 Subject: [PATCH] [7.x] Add ILM history store index (#50287) (#50345) * Add ILM history store index (#50287) * Add ILM history store index This commit adds an ILM history store that tracks the lifecycle execution state as an index progresses through its ILM policy. ILM history documents store output similar to what the ILM explain API returns. An example document with ALL fields (not all documents will have all fields) would look like: ```json { "@timestamp": 1203012389, "policy": "my-ilm-policy", "index": "index-2019.1.1-000023", "index_age":123120, "success": true, "state": { "phase": "warm", "action": "allocate", "step": "ERROR", "failed_step": "update-settings", "is_auto-retryable_error": true, "creation_date": 12389012039, "phase_time": 12908389120, "action_time": 1283901209, "step_time": 123904107140, "phase_definition": "{\"policy\":\"ilm-history-ilm-policy\",\"phase_definition\":{\"min_age\":\"0ms\",\"actions\":{\"rollover\":{\"max_size\":\"50gb\",\"max_age\":\"30d\"}}},\"version\":1,\"modified_date_in_millis\":1576517253463}", "step_info": "{... etc step info here as json ...}" }, "error_details": "java.lang.RuntimeException: etc\n\tcaused by:etc etc etc full stacktrace" } ``` These documents go into the `ilm-history-1-00000N` index to provide an audit trail of the operations ILM has performed. This history storage is enabled by default but can be disabled by setting `index.lifecycle.history_index_enabled` to `false`. Resolves #49180 * Make ILMHistoryStore.putAsync truly async (#50403) This moves the `putAsync` method in `ILMHistoryStore` to never block. Previously due to the way that the `BulkProcessor` works, it was possible for `BulkProcessor#add` to block executing a bulk request. This was bad as we may be adding things to the history store in cluster state update threads. 
This also moves the index creation to be done prior to the bulk request execution, rather than being checked every time an operation was added to the queue. This lessens the chance of the index being created, then deleted (by some external force), and then recreated via a bulk indexing request. Resolves #50353 --- client/rest-high-level/build.gradle | 1 + docs/build.gradle | 1 + docs/plugins/analysis-icu.asciidoc | 2 +- .../reference/indices/rollover-index.asciidoc | 2 +- docs/reference/settings/ilm-settings.asciidoc | 5 + .../test/rest/ESRestTestCase.java | 24 +- .../xpack/core/ilm/LifecycleSettings.java | 3 + .../resources/ilm-history-ilm-policy.json | 18 + .../core/src/main/resources/ilm-history.json | 83 ++++ x-pack/plugin/ilm/qa/multi-node/build.gradle | 2 + .../ilm/TimeSeriesLifecycleActionsIT.java | 172 +++++++- .../xpack/ilm/ExecuteStepsUpdateTask.java | 52 ++- .../xpack/ilm/IndexLifecycle.java | 15 +- .../xpack/ilm/IndexLifecycleRunner.java | 128 +++++- .../xpack/ilm/IndexLifecycleService.java | 8 +- .../xpack/ilm/MoveToErrorStepUpdateTask.java | 13 +- .../xpack/ilm/history/ILMHistoryItem.java | 115 +++++ .../xpack/ilm/history/ILMHistoryStore.java | 226 ++++++++++ .../history/ILMHistoryTemplateRegistry.java | 80 ++++ .../IndexLifecycleInitialisationTests.java | 3 +- .../xpack/ilm/IndexLifecycleRunnerTests.java | 77 +++- .../xpack/ilm/IndexLifecycleServiceTests.java | 2 +- .../ilm/MoveToErrorStepUpdateTaskTests.java | 8 +- .../ilm/history/ILMHistoryItemTests.java | 92 ++++ .../ilm/history/ILMHistoryStoreTests.java | 398 ++++++++++++++++++ .../slm/SLMSnapshotBlockingIntegTests.java | 2 + .../test/MonitoringIntegTestCase.java | 1 + .../AbstractWatcherIntegrationTestCase.java | 1 + 28 files changed, 1459 insertions(+), 75 deletions(-) create mode 100644 x-pack/plugin/core/src/main/resources/ilm-history-ilm-policy.json create mode 100644 x-pack/plugin/core/src/main/resources/ilm-history.json create mode 100644 
x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryItem.java create mode 100644 x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStore.java create mode 100644 x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryTemplateRegistry.java create mode 100644 x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/history/ILMHistoryItemTests.java create mode 100644 x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStoreTests.java diff --git a/client/rest-high-level/build.gradle b/client/rest-high-level/build.gradle index 82c15c0c54414..55f1f9e56c98e 100644 --- a/client/rest-high-level/build.gradle +++ b/client/rest-high-level/build.gradle @@ -135,6 +135,7 @@ testClusters.all { setting 'xpack.security.authc.realms.pki.pki1.delegation.enabled', 'true' setting 'indices.lifecycle.poll_interval', '1000ms' + setting 'index.lifecycle.history_index_enabled', 'false' keystore 'xpack.security.transport.ssl.truststore.secure_password', 'testnode' extraConfigFile 'roles.yml', file('roles.yml') user username: System.getProperty('tests.rest.cluster.username', 'test_user'), diff --git a/docs/build.gradle b/docs/build.gradle index f88d68d65f10e..fece645fa274c 100644 --- a/docs/build.gradle +++ b/docs/build.gradle @@ -59,6 +59,7 @@ testClusters.integTest { extraConfigFile 'hunspell/en_US/en_US.dic', project(":server").file('src/test/resources/indices/analyze/conf_dir/hunspell/en_US/en_US.dic') // Whitelist reindexing from the local node so we can test it. 
setting 'reindex.remote.whitelist', '127.0.0.1:*' + setting 'index.lifecycle.history_index_enabled', 'false' // TODO: remove this once cname is prepended to transport.publish_address by default in 8.0 systemProperty 'es.transport.cname_in_publish_address', 'true' diff --git a/docs/plugins/analysis-icu.asciidoc b/docs/plugins/analysis-icu.asciidoc index 19a8625f60403..d0b95ae7f497b 100644 --- a/docs/plugins/analysis-icu.asciidoc +++ b/docs/plugins/analysis-icu.asciidoc @@ -368,7 +368,7 @@ PUT my_index } } -GET _search <3> +GET /my_index/_search <3> { "query": { "match": { diff --git a/docs/reference/indices/rollover-index.asciidoc b/docs/reference/indices/rollover-index.asciidoc index cbc0dfc081ee7..46a1ae10f59f3 100644 --- a/docs/reference/indices/rollover-index.asciidoc +++ b/docs/reference/indices/rollover-index.asciidoc @@ -431,7 +431,7 @@ PUT logs/_doc/2 <2> ////////////////////////// [source,console] -------------------------------------------------- -GET _alias +GET my_logs_index-000001,my_logs_index-000002/_alias -------------------------------------------------- // TEST[continued] ////////////////////////// diff --git a/docs/reference/settings/ilm-settings.asciidoc b/docs/reference/settings/ilm-settings.asciidoc index 8781581a6e57d..946848017bf7b 100644 --- a/docs/reference/settings/ilm-settings.asciidoc +++ b/docs/reference/settings/ilm-settings.asciidoc @@ -14,6 +14,11 @@ ILM REST API endpoints and functionality. Defaults to `true`. (<>) How often {ilm} checks for indices that meet policy criteria. Defaults to `10m`. +`index.lifecycle.history_index_enabled`:: +Whether ILM's history index is enabled. If enabled, ILM will record the +history of actions taken as part of ILM policies to the `ilm-history-*` +indices. Defaults to `true`. + ==== Index level settings These index-level {ilm-init} settings are typically configured through index templates. For more information, see <>. 
diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index 677d7c22cea11..5468451066d7f 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -75,6 +75,7 @@ import java.security.cert.CertificateException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -472,6 +473,13 @@ protected boolean preserveILMPoliciesUponCompletion() { return false; } + /** + * A set of ILM policies that should be preserved between runs. + */ + protected Set preserveILMPolicyIds() { + return Collections.singleton("ilm-history-ilm-policy"); + } + /** * Returns whether to preserve auto-follow patterns. Defaults to not * preserving them. Only runs at all if xpack is installed on the cluster @@ -560,7 +568,7 @@ private void wipeCluster() throws Exception { } if (hasXPack && false == preserveILMPoliciesUponCompletion()) { - deleteAllILMPolicies(); + deleteAllILMPolicies(preserveILMPolicyIds()); } if (hasXPack && false == preserveAutoFollowPatternsUponCompletion()) { @@ -686,7 +694,7 @@ private void waitForPendingRollupTasks() throws Exception { waitForPendingTasks(adminClient(), taskName -> taskName.startsWith("xpack/rollup/job") == false); } - private static void deleteAllILMPolicies() throws IOException { + private static void deleteAllILMPolicies(Set exclusions) throws IOException { Map policies; try { @@ -704,9 +712,15 @@ private static void deleteAllILMPolicies() throws IOException { return; } - for (String policyName : policies.keySet()) { - adminClient().performRequest(new Request("DELETE", "/_ilm/policy/" + policyName)); - } + policies.keySet().stream() + .filter(p -> exclusions.contains(p) == false) + .forEach(policyName -> { + try { + 
adminClient().performRequest(new Request("DELETE", "/_ilm/policy/" + policyName)); + } catch (IOException e) { + throw new RuntimeException("failed to delete policy: " + policyName, e); + } + }); } private static void deleteAllSLMPolicies() throws IOException { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/LifecycleSettings.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/LifecycleSettings.java index 62797eb34a07d..3d186892ec983 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/LifecycleSettings.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/LifecycleSettings.java @@ -19,6 +19,7 @@ public class LifecycleSettings { public static final String LIFECYCLE_INDEXING_COMPLETE = "index.lifecycle.indexing_complete"; public static final String LIFECYCLE_ORIGINATION_DATE = "index.lifecycle.origination_date"; public static final String LIFECYCLE_PARSE_ORIGINATION_DATE = "index.lifecycle.parse_origination_date"; + public static final String LIFECYCLE_HISTORY_INDEX_ENABLED = "index.lifecycle.history_index_enabled"; public static final String SLM_HISTORY_INDEX_ENABLED = "slm.history_index_enabled"; public static final String SLM_RETENTION_SCHEDULE = "slm.retention_schedule"; @@ -35,6 +36,8 @@ public class LifecycleSettings { Setting.longSetting(LIFECYCLE_ORIGINATION_DATE, -1, -1, Setting.Property.Dynamic, Setting.Property.IndexScope); public static final Setting LIFECYCLE_PARSE_ORIGINATION_DATE_SETTING = Setting.boolSetting(LIFECYCLE_PARSE_ORIGINATION_DATE, false, Setting.Property.Dynamic, Setting.Property.IndexScope); + public static final Setting LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING = Setting.boolSetting(LIFECYCLE_HISTORY_INDEX_ENABLED, + true, Setting.Property.NodeScope); public static final Setting SLM_HISTORY_INDEX_ENABLED_SETTING = Setting.boolSetting(SLM_HISTORY_INDEX_ENABLED, true, diff --git 
a/x-pack/plugin/core/src/main/resources/ilm-history-ilm-policy.json b/x-pack/plugin/core/src/main/resources/ilm-history-ilm-policy.json new file mode 100644 index 0000000000000..febae00bc3608 --- /dev/null +++ b/x-pack/plugin/core/src/main/resources/ilm-history-ilm-policy.json @@ -0,0 +1,18 @@ +{ + "phases": { + "hot": { + "actions": { + "rollover": { + "max_size": "50GB", + "max_age": "30d" + } + } + }, + "delete": { + "min_age": "90d", + "actions": { + "delete": {} + } + } + } +} diff --git a/x-pack/plugin/core/src/main/resources/ilm-history.json b/x-pack/plugin/core/src/main/resources/ilm-history.json new file mode 100644 index 0000000000000..ae9c50552b385 --- /dev/null +++ b/x-pack/plugin/core/src/main/resources/ilm-history.json @@ -0,0 +1,83 @@ +{ + "index_patterns": [ + "ilm-history-${xpack.ilm_history.template.version}*" + ], + "order": 2147483647, + "settings": { + "index.number_of_shards": 1, + "index.number_of_replicas": 0, + "index.auto_expand_replicas": "0-1", + "index.lifecycle.name": "ilm-history-ilm-policy", + "index.lifecycle.rollover_alias": "ilm-history-${xpack.ilm_history.template.version}", + "index.format": 1 + }, + "mappings": { + "_doc": { + "dynamic": false, + "properties": { + "@timestamp": { + "type": "date", + "format": "epoch_millis" + }, + "policy": { + "type": "keyword" + }, + "index": { + "type": "keyword" + }, + "index_age":{ + "type": "long" + }, + "success": { + "type": "boolean" + }, + "state": { + "type": "object", + "dynamic": true, + "properties": { + "phase": { + "type": "keyword" + }, + "action": { + "type": "keyword" + }, + "step": { + "type": "keyword" + }, + "failed_step": { + "type": "keyword" + }, + "is_auto-retryable_error": { + "type": "keyword" + }, + "creation_date": { + "type": "date", + "format": "epoch_millis" + }, + "phase_time": { + "type": "date", + "format": "epoch_millis" + }, + "action_time": { + "type": "date", + "format": "epoch_millis" + }, + "step_time": { + "type": "date", + "format": "epoch_millis" + 
}, + "phase_definition": { + "type": "text" + }, + "step_info": { + "type": "text" + } + } + }, + "error_details": { + "type": "text" + } + } + } + } +} diff --git a/x-pack/plugin/ilm/qa/multi-node/build.gradle b/x-pack/plugin/ilm/qa/multi-node/build.gradle index 6703f281b93eb..bb0c247b5c85e 100644 --- a/x-pack/plugin/ilm/qa/multi-node/build.gradle +++ b/x-pack/plugin/ilm/qa/multi-node/build.gradle @@ -27,4 +27,6 @@ testClusters.integTest { setting 'xpack.ml.enabled', 'false' setting 'xpack.license.self_generated.type', 'trial' setting 'indices.lifecycle.poll_interval', '1000ms' + setting 'logger.org.elasticsearch.xpack.core.ilm', 'TRACE' + setting 'logger.org.elasticsearch.xpack.ilm', 'TRACE' } diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java index 169b613f22a2b..682e13b6d37d2 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java @@ -8,13 +8,14 @@ import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -37,8 +38,10 @@ import org.elasticsearch.xpack.core.ilm.SetPriorityAction; import 
org.elasticsearch.xpack.core.ilm.ShrinkAction; import org.elasticsearch.xpack.core.ilm.ShrinkStep; +import org.elasticsearch.xpack.core.ilm.Step; import org.elasticsearch.xpack.core.ilm.Step.StepKey; import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep; +import org.elasticsearch.xpack.core.ilm.WaitForRolloverReadyStep; import org.hamcrest.Matchers; import org.junit.Before; @@ -1010,6 +1013,171 @@ public void testILMRolloverOnManuallyRolledIndex() throws Exception { assertBusy(() -> assertTrue(indexExists(thirdIndex))); } + public void testHistoryIsWrittenWithSuccess() throws Exception { + String index = "index"; + + createNewSingletonPolicy("hot", new RolloverAction(null, null, 1L)); + Request createIndexTemplate = new Request("PUT", "_template/rolling_indexes"); + createIndexTemplate.setJsonEntity("{" + + "\"index_patterns\": [\""+ index + "-*\"], \n" + + " \"settings\": {\n" + + " \"number_of_shards\": 1,\n" + + " \"number_of_replicas\": 0,\n" + + " \"index.lifecycle.name\": \"" + policy+ "\",\n" + + " \"index.lifecycle.rollover_alias\": \"alias\"\n" + + " }\n" + + "}"); + client().performRequest(createIndexTemplate); + + createIndexWithSettings(index + "-1", + Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0), + true); + + // Index a document + index(client(), index + "-1", "1", "foo", "bar"); + Request refreshIndex = new Request("POST", "/" + index + "-1/_refresh"); + client().performRequest(refreshIndex); + + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "wait-for-indexing-complete"), 30, TimeUnit.SECONDS); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "wait-for-follow-shard-tasks"), 30, TimeUnit.SECONDS); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "pause-follower-index"), 30, TimeUnit.SECONDS); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "close-follower-index"), 30, TimeUnit.SECONDS); + 
assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "unfollow-follower-index"), 30, TimeUnit.SECONDS); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "open-follower-index"), 30, TimeUnit.SECONDS); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "wait-for-yellow-step"), 30, TimeUnit.SECONDS); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "check-rollover-ready"), 30, TimeUnit.SECONDS); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "attempt-rollover"), 30, TimeUnit.SECONDS); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "update-rollover-lifecycle-date"), 30, TimeUnit.SECONDS); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "set-indexing-complete"), 30, TimeUnit.SECONDS); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "completed"), 30, TimeUnit.SECONDS); + + assertBusy(() -> assertHistoryIsPresent(policy, index + "-000002", true, "check-rollover-ready"), 30, TimeUnit.SECONDS); + } + + public void testHistoryIsWrittenWithFailure() throws Exception { + String index = "index"; + + createNewSingletonPolicy("hot", new RolloverAction(null, null, 1L)); + Request createIndexTemplate = new Request("PUT", "_template/rolling_indexes"); + createIndexTemplate.setJsonEntity("{" + + "\"index_patterns\": [\""+ index + "-*\"], \n" + + " \"settings\": {\n" + + " \"number_of_shards\": 1,\n" + + " \"number_of_replicas\": 0,\n" + + " \"index.lifecycle.name\": \"" + policy+ "\"\n" + + " }\n" + + "}"); + client().performRequest(createIndexTemplate); + + createIndexWithSettings(index + "-1", + Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0), + false); + + // Index a document + index(client(), index + "-1", "1", "foo", "bar"); + Request refreshIndex = new Request("POST", "/" + index + "-1/_refresh"); + client().performRequest(refreshIndex); + + 
assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", false, "ERROR"), 30, TimeUnit.SECONDS); + } + + public void testHistoryIsWrittenWithDeletion() throws Exception { + String index = "index"; + + createNewSingletonPolicy("delete", new DeleteAction()); + Request createIndexTemplate = new Request("PUT", "_template/delete_indexes"); + createIndexTemplate.setJsonEntity("{" + + "\"index_patterns\": [\""+ index + "\"], \n" + + " \"settings\": {\n" + + " \"number_of_shards\": 1,\n" + + " \"number_of_replicas\": 0,\n" + + " \"index.lifecycle.name\": \"" + policy+ "\"\n" + + " }\n" + + "}"); + client().performRequest(createIndexTemplate); + + // Index should be created and then deleted by ILM + createIndexWithSettings(index, Settings.builder(), false); + + assertBusy(() -> { + logger.info("--> checking for index deletion..."); + Request existCheck = new Request("HEAD", "/" + index); + Response resp = client().performRequest(existCheck); + assertThat(resp.getStatusLine().getStatusCode(), equalTo(404)); + }); + + assertBusy(() -> { + assertHistoryIsPresent(policy, index, true, "delete", "delete", "wait-for-shard-history-leases"); + assertHistoryIsPresent(policy, index, true, "delete", "delete", "complete"); + }, 30, TimeUnit.SECONDS); + } + + // This method should be called inside an assertBusy, it has no retry logic of its own + private void assertHistoryIsPresent(String policyName, String indexName, boolean success, String stepName) throws IOException { + assertHistoryIsPresent(policyName, indexName, success, null, null, stepName); + } + + // This method should be called inside an assertBusy, it has no retry logic of its own + private void assertHistoryIsPresent(String policyName, String indexName, boolean success, + @Nullable String phase, @Nullable String action, String stepName) throws IOException { + logger.info("--> checking for history item [{}], [{}], success: [{}], phase: [{}], action: [{}], step: [{}]", + policyName, indexName, success, phase, action, 
stepName); + final Request historySearchRequest = new Request("GET", "ilm-history*/_search"); + historySearchRequest.setJsonEntity("{\n" + + " \"query\": {\n" + + " \"bool\": {\n" + + " \"must\": [\n" + + " {\n" + + " \"term\": {\n" + + " \"policy\": \"" + policyName + "\"\n" + + " }\n" + + " },\n" + + " {\n" + + " \"term\": {\n" + + " \"success\": " + success + "\n" + + " }\n" + + " },\n" + + " {\n" + + " \"term\": {\n" + + " \"index\": \"" + indexName + "\"\n" + + " }\n" + + " },\n" + + " {\n" + + " \"term\": {\n" + + " \"state.step\": \"" + stepName + "\"\n" + + " }\n" + + " }\n" + + (phase == null ? "" : ",{\"term\": {\"state.phase\": \"" + phase + "\"}}") + + (action == null ? "" : ",{\"term\": {\"state.action\": \"" + action + "\"}}") + + " ]\n" + + " }\n" + + " }\n" + + "}"); + Response historyResponse; + try { + historyResponse = client().performRequest(historySearchRequest); + Map historyResponseMap; + try (InputStream is = historyResponse.getEntity().getContent()) { + historyResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true); + } + logger.info("--> history response: {}", historyResponseMap); + assertThat((int)((Map) ((Map) historyResponseMap.get("hits")).get("total")).get("value"), + greaterThanOrEqualTo(1)); + } catch (ResponseException e) { + // Throw AssertionError instead of an exception if the search fails so that assertBusy works as expected + logger.error(e); + fail("failed to perform search:" + e.getMessage()); + } + + // Finally, check that the history index is in a good state + Step.StepKey stepKey = getStepKeyForIndex("ilm-history-1-000001"); + assertEquals("hot", stepKey.getPhase()); + assertEquals(RolloverAction.NAME, stepKey.getAction()); + assertEquals(WaitForRolloverReadyStep.NAME, stepKey.getName()); + } + private void createFullPolicy(TimeValue hotTime) throws IOException { Map hotActions = new HashMap<>(); hotActions.put(SetPriorityAction.NAME, new SetPriorityAction(100)); diff --git 
a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/ExecuteStepsUpdateTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/ExecuteStepsUpdateTask.java index 46364f7cb4021..b97944fe67bf7 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/ExecuteStepsUpdateTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/ExecuteStepsUpdateTask.java @@ -12,10 +12,13 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.index.Index; import org.elasticsearch.xpack.core.ilm.ClusterStateActionStep; import org.elasticsearch.xpack.core.ilm.ClusterStateWaitStep; +import org.elasticsearch.xpack.core.ilm.ErrorStep; +import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState; import org.elasticsearch.xpack.core.ilm.Step; import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep; @@ -29,8 +32,9 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask { private final Step startStep; private final PolicyStepsRegistry policyStepsRegistry; private final IndexLifecycleRunner lifecycleRunner; - private LongSupplier nowSupplier; + private final LongSupplier nowSupplier; private Step.StepKey nextStepKey = null; + private Exception failure = null; public ExecuteStepsUpdateTask(String policy, Index index, Step startStep, PolicyStepsRegistry policyStepsRegistry, IndexLifecycleRunner lifecycleRunner, LongSupplier nowSupplier) { @@ -115,7 +119,7 @@ public ClusterState execute(final ClusterState currentState) throws IOException // wait for the next trigger to evaluate the // condition again logger.trace("[{}] waiting for cluster state step condition ({}) [{}], next: [{}]", - index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(), currentStep.getNextStepKey()); 
+ index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(), nextStepKey); ClusterStateWaitStep.Result result; try { result = ((ClusterStateWaitStep) currentStep).isConditionMet(index, state); @@ -124,22 +128,25 @@ public ClusterState execute(final ClusterState currentState) throws IOException } if (result.isComplete()) { logger.trace("[{}] cluster state step condition met successfully ({}) [{}], moving to next step {}", - index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(), currentStep.getNextStepKey()); - if (currentStep.getNextStepKey() == null) { + index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(), nextStepKey); + if (nextStepKey == null) { return state; } else { state = IndexLifecycleTransition.moveClusterStateToStep(index, state, - currentStep.getNextStepKey(), nowSupplier, policyStepsRegistry,false); + nextStepKey, nowSupplier, policyStepsRegistry,false); } } else { - logger.trace("[{}] condition not met ({}) [{}], returning existing state", - index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey()); + final ToXContentObject stepInfo = result.getInfomationContext(); + if (logger.isTraceEnabled()) { + logger.trace("[{}] condition not met ({}) [{}], returning existing state (info: {})", + index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(), + Strings.toString(stepInfo)); + } // We may have executed a step and set "nextStepKey" to // a value, but in this case, since the condition was // not met, we can't advance any way, so don't attempt // to run the current step nextStepKey = null; - ToXContentObject stepInfo = result.getInfomationContext(); if (stepInfo == null) { return state; } else { @@ -169,13 +176,23 @@ public ClusterState execute(final ClusterState currentState) throws IOException public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { if (oldState.equals(newState) == false) { IndexMetaData 
indexMetaData = newState.metaData().index(index); - if (nextStepKey != null && nextStepKey != TerminalPolicyStep.KEY && indexMetaData != null) { - logger.trace("[{}] step sequence starting with {} has completed, running next step {} if it is an async action", - index.getName(), startStep.getKey(), nextStepKey); - // After the cluster state has been processed and we have moved - // to a new step, we need to conditionally execute the step iff - // it is an `AsyncAction` so that it is executed exactly once. - lifecycleRunner.maybeRunAsyncAction(newState, indexMetaData, policy, nextStepKey); + if (indexMetaData != null) { + + LifecycleExecutionState exState = LifecycleExecutionState.fromIndexMetadata(indexMetaData); + if (ErrorStep.NAME.equals(exState.getStep()) && this.failure != null) { + lifecycleRunner.registerFailedOperation(indexMetaData, failure); + } else { + lifecycleRunner.registerSuccessfulOperation(indexMetaData); + } + + if (nextStepKey != null && nextStepKey != TerminalPolicyStep.KEY) { + logger.trace("[{}] step sequence starting with {} has completed, running next step {} if it is an async action", + index.getName(), startStep.getKey(), nextStepKey); + // After the cluster state has been processed and we have moved + // to a new step, we need to conditionally execute the step iff + // it is an `AsyncAction` so that it is executed exactly once. + lifecycleRunner.maybeRunAsyncAction(newState, indexMetaData, policy, nextStepKey); + } } } } @@ -187,10 +204,9 @@ public void onFailure(String source, Exception e) { } private ClusterState moveToErrorStep(final ClusterState state, Step.StepKey currentStepKey, Exception cause) throws IOException { + this.failure = cause; logger.error("policy [{}] for index [{}] failed on cluster state step [{}]. 
Moving to ERROR step", policy, index.getName(), currentStepKey); - MoveToErrorStepUpdateTask moveToErrorStepUpdateTask = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, - nowSupplier, policyStepsRegistry::getStep); - return moveToErrorStepUpdateTask.execute(state); + return IndexLifecycleTransition.moveClusterStateToErrorStep(index, state, cause, nowSupplier, policyStepsRegistry::getStep); } } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java index 8143f05d77034..947d1752dad65 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java @@ -94,6 +94,8 @@ import org.elasticsearch.xpack.ilm.action.TransportRetryAction; import org.elasticsearch.xpack.ilm.action.TransportStartILMAction; import org.elasticsearch.xpack.ilm.action.TransportStopILMAction; +import org.elasticsearch.xpack.ilm.history.ILMHistoryStore; +import org.elasticsearch.xpack.ilm.history.ILMHistoryTemplateRegistry; import org.elasticsearch.xpack.slm.SLMFeatureSet; import org.elasticsearch.xpack.slm.SnapshotLifecycleService; import org.elasticsearch.xpack.slm.SnapshotLifecycleTask; @@ -131,6 +133,7 @@ public class IndexLifecycle extends Plugin implements ActionPlugin { private final SetOnce indexLifecycleInitialisationService = new SetOnce<>(); + private final SetOnce ilmHistoryStore = new SetOnce<>(); private final SetOnce snapshotLifecycleService = new SetOnce<>(); private final SetOnce snapshotRetentionService = new SetOnce<>(); private final SetOnce snapshotHistoryStore = new SetOnce<>(); @@ -172,6 +175,7 @@ public List> getSettings() { LifecycleSettings.LIFECYCLE_ORIGINATION_DATE_SETTING, LifecycleSettings.LIFECYCLE_PARSE_ORIGINATION_DATE_SETTING, LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING, + 
LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING, RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING, LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING, LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING, @@ -188,8 +192,14 @@ public Collection createComponents(Client client, ClusterService cluster } final List components = new ArrayList<>(); if (ilmEnabled) { + // This registers a cluster state listener, so appears unused but is not. + @SuppressWarnings("unused") + ILMHistoryTemplateRegistry ilmTemplateRegistry = + new ILMHistoryTemplateRegistry(settings, clusterService, threadPool, client, xContentRegistry); + ilmHistoryStore.set(new ILMHistoryStore(settings, new OriginSettingClient(client, INDEX_LIFECYCLE_ORIGIN), + clusterService, threadPool)); indexLifecycleInitialisationService.set(new IndexLifecycleService(settings, client, clusterService, threadPool, - getClock(), System::currentTimeMillis, xContentRegistry)); + getClock(), System::currentTimeMillis, xContentRegistry, ilmHistoryStore.get())); components.add(indexLifecycleInitialisationService.get()); } if (slmEnabled) { @@ -317,7 +327,8 @@ public void onIndexModule(IndexModule indexModule) { @Override public void close() { try { - IOUtils.close(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get(), snapshotRetentionService.get()); + IOUtils.close(indexLifecycleInitialisationService.get(), ilmHistoryStore.get(), + snapshotLifecycleService.get(), snapshotRetentionService.get()); } catch (IOException e) { throw new ElasticsearchException("unable to close index lifecycle services", e); } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java index d8eaf2c53c5c3..15bde507c3a69 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java +++ 
b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java @@ -13,6 +13,7 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.index.Index; @@ -23,10 +24,13 @@ import org.elasticsearch.xpack.core.ilm.ClusterStateWaitStep; import org.elasticsearch.xpack.core.ilm.ErrorStep; import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState; +import org.elasticsearch.xpack.core.ilm.LifecycleSettings; import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep; import org.elasticsearch.xpack.core.ilm.Step; import org.elasticsearch.xpack.core.ilm.Step.StepKey; import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep; +import org.elasticsearch.xpack.ilm.history.ILMHistoryItem; +import org.elasticsearch.xpack.ilm.history.ILMHistoryStore; import java.util.function.LongSupplier; @@ -35,13 +39,15 @@ class IndexLifecycleRunner { private static final Logger logger = LogManager.getLogger(IndexLifecycleRunner.class); private final ThreadPool threadPool; - private PolicyStepsRegistry stepRegistry; - private ClusterService clusterService; - private LongSupplier nowSupplier; + private final ClusterService clusterService; + private final PolicyStepsRegistry stepRegistry; + private final ILMHistoryStore ilmHistoryStore; + private final LongSupplier nowSupplier; - IndexLifecycleRunner(PolicyStepsRegistry stepRegistry, ClusterService clusterService, + IndexLifecycleRunner(PolicyStepsRegistry stepRegistry, ILMHistoryStore ilmHistoryStore, ClusterService clusterService, ThreadPool threadPool, LongSupplier nowSupplier) { this.stepRegistry = stepRegistry; + this.ilmHistoryStore = ilmHistoryStore; this.clusterService = clusterService; this.nowSupplier = nowSupplier; 
this.threadPool = threadPool; @@ -62,17 +68,29 @@ static Step getCurrentStep(PolicyStepsRegistry stepRegistry, String policy, Inde } /** - * Return true or false depending on whether the index is ready to be in {@code phase} + * Calculate the index's origination time (in milliseconds) based on its + * metadata. Returns null if there is no lifecycle date and the origination + * date is not set. */ - boolean isReadyToTransitionToThisPhase(final String policy, final IndexMetaData indexMetaData, final String phase) { + @Nullable + private static Long calculateOriginationMillis(final IndexMetaData indexMetaData) { LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(indexMetaData); Long originationDate = indexMetaData.getSettings().getAsLong(LIFECYCLE_ORIGINATION_DATE, -1L); if (lifecycleState.getLifecycleDate() == null && originationDate == -1L) { - logger.trace("no index creation or origination date has been set yet"); + return null; + } + return originationDate == -1L ? lifecycleState.getLifecycleDate() : originationDate; + } + + /** + * Return true or false depending on whether the index is ready to be in {@code phase} + */ + boolean isReadyToTransitionToThisPhase(final String policy, final IndexMetaData indexMetaData, final String phase) { + final Long lifecycleDate = calculateOriginationMillis(indexMetaData); + if (lifecycleDate == null) { + logger.trace("[{}] no index creation or origination date has been set yet", indexMetaData.getIndex().getName()); return true; } - final Long lifecycleDate = originationDate != -1L ? 
originationDate : lifecycleState.getLifecycleDate(); - assert lifecycleDate != null && lifecycleDate >= 0 : "expected index to have a lifecycle date but it did not"; final TimeValue after = stepRegistry.getIndexAgeForPhase(policy, phase); final long now = nowSupplier.getAsLong(); final TimeValue age = new TimeValue(now - lifecycleDate); @@ -221,19 +239,26 @@ void maybeRunAsyncAction(ClusterState currentState, IndexMetaData indexMetaData, ((AsyncActionStep) currentStep).performAction(indexMetaData, currentState, new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext()), new AsyncActionStep.Listener() { - @Override - public void onResponse(boolean complete) { - logger.trace("cs-change-async-action-callback, [{}], current-step: {}", index, currentStep.getKey()); - if (complete && ((AsyncActionStep) currentStep).indexSurvives()) { - moveToStep(indexMetaData.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey()); + @Override + public void onResponse(boolean complete) { + logger.trace("cs-change-async-action-callback, [{}], current-step: {}", index, currentStep.getKey()); + if (complete) { + if (((AsyncActionStep) currentStep).indexSurvives()) { + moveToStep(indexMetaData.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey()); + } else { + // Delete needs special handling, because after this step we + // will no longer have access to any information about the + // index since it will be... deleted. 
+ registerDeleteOperation(indexMetaData); + } + } } - } - @Override - public void onFailure(Exception e) { - moveToErrorStep(indexMetaData.getIndex(), policy, currentStep.getKey(), e); - } - }); + @Override + public void onFailure(Exception e) { + moveToErrorStep(indexMetaData.getIndex(), policy, currentStep.getKey(), e); + } + }); } else { logger.trace("[{}] ignoring non async action step execution from step transition [{}]", index, currentStep.getKey()); } @@ -298,6 +323,7 @@ private void moveToStep(Index index, String policy, Step.StepKey currentStepKey, new MoveToNextStepUpdateTask(index, policy, currentStepKey, newStepKey, nowSupplier, stepRegistry, clusterState -> { IndexMetaData indexMetaData = clusterState.metaData().index(index); + registerSuccessfulOperation(indexMetaData); if (newStepKey != null && newStepKey != TerminalPolicyStep.KEY && indexMetaData != null) { maybeRunAsyncAction(clusterState, indexMetaData, policy, newStepKey); } @@ -311,7 +337,10 @@ private void moveToErrorStep(Index index, String policy, Step.StepKey currentSte logger.error(new ParameterizedMessage("policy [{}] for index [{}] failed on step [{}]. 
Moving to ERROR step", policy, index.getName(), currentStepKey), e); clusterService.submitStateUpdateTask("ilm-move-to-error-step", - new MoveToErrorStepUpdateTask(index, policy, currentStepKey, e, nowSupplier, stepRegistry::getStep)); + new MoveToErrorStepUpdateTask(index, policy, currentStepKey, e, nowSupplier, stepRegistry::getStep, clusterState -> { + IndexMetaData indexMetaData = clusterState.metaData().index(index); + registerFailedOperation(indexMetaData, e); + })); } /** @@ -343,4 +372,61 @@ private void markPolicyRetrievalError(String policyName, Index index, LifecycleE setStepInfo(index, policyName, LifecycleExecutionState.getCurrentStepKey(executionState), new SetStepInfoUpdateTask.ExceptionWrapper(e)); } + + /** + * For the given index metadata, register (index a document) that the index has transitioned + * successfully into this new state using the {@link ILMHistoryStore} + */ + void registerSuccessfulOperation(IndexMetaData indexMetaData) { + if (indexMetaData == null) { + // This index may have been deleted and has no metadata, so ignore it + return; + } + Long origination = calculateOriginationMillis(indexMetaData); + ilmHistoryStore.putAsync( + ILMHistoryItem.success(indexMetaData.getIndex().getName(), + LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexMetaData.getSettings()), + nowSupplier.getAsLong(), + origination == null ? 
null : (nowSupplier.getAsLong() - origination), + LifecycleExecutionState.fromIndexMetadata(indexMetaData))); + } + + /** + * For the given index metadata, register (index a document) that the index + * has been deleted by ILM using the {@link ILMHistoryStore} + */ + void registerDeleteOperation(IndexMetaData metadataBeforeDeletion) { + if (metadataBeforeDeletion == null) { + throw new IllegalStateException("cannot register deletion of an index that did not previously exist"); + } + Long origination = calculateOriginationMillis(metadataBeforeDeletion); + ilmHistoryStore.putAsync( + ILMHistoryItem.success(metadataBeforeDeletion.getIndex().getName(), + LifecycleSettings.LIFECYCLE_NAME_SETTING.get(metadataBeforeDeletion.getSettings()), + nowSupplier.getAsLong(), + origination == null ? null : (nowSupplier.getAsLong() - origination), + LifecycleExecutionState.builder(LifecycleExecutionState.fromIndexMetadata(metadataBeforeDeletion)) + // Register that the delete phase is now "complete" + .setStep(PhaseCompleteStep.NAME) + .build())); + } + + /** + * For the given index metadata, register (index a document) that the index has transitioned + * into the ERROR state using the {@link ILMHistoryStore} + */ + void registerFailedOperation(IndexMetaData indexMetaData, Exception failure) { + if (indexMetaData == null) { + // This index may have been deleted and has no metadata, so ignore it + return; + } + Long origination = calculateOriginationMillis(indexMetaData); + ilmHistoryStore.putAsync( + ILMHistoryItem.failure(indexMetaData.getIndex().getName(), + LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexMetaData.getSettings()), + nowSupplier.getAsLong(), + origination == null ? 
null : (nowSupplier.getAsLong() - origination), + LifecycleExecutionState.fromIndexMetadata(indexMetaData), + failure)); + } } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java index f116f9de08743..a5ae4d4673a77 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java @@ -35,6 +35,7 @@ import org.elasticsearch.xpack.core.ilm.ShrinkStep; import org.elasticsearch.xpack.core.ilm.Step.StepKey; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; +import org.elasticsearch.xpack.ilm.history.ILMHistoryStore; import java.io.Closeable; import java.time.Clock; @@ -59,21 +60,24 @@ public class IndexLifecycleService private final Clock clock; private final PolicyStepsRegistry policyRegistry; private final IndexLifecycleRunner lifecycleRunner; + private final ILMHistoryStore ilmHistoryStore; private final Settings settings; private ClusterService clusterService; private LongSupplier nowSupplier; private SchedulerEngine.Job scheduledJob; public IndexLifecycleService(Settings settings, Client client, ClusterService clusterService, ThreadPool threadPool, Clock clock, - LongSupplier nowSupplier, NamedXContentRegistry xContentRegistry) { + LongSupplier nowSupplier, NamedXContentRegistry xContentRegistry, + ILMHistoryStore ilmHistoryStore) { super(); this.settings = settings; this.clusterService = clusterService; this.clock = clock; this.nowSupplier = nowSupplier; this.scheduledJob = null; + this.ilmHistoryStore = ilmHistoryStore; this.policyRegistry = new PolicyStepsRegistry(xContentRegistry, client); - this.lifecycleRunner = new IndexLifecycleRunner(policyRegistry, clusterService, threadPool, nowSupplier); + this.lifecycleRunner = new IndexLifecycleRunner(policyRegistry, ilmHistoryStore, 
clusterService, threadPool, nowSupplier); this.pollInterval = LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING.get(settings); clusterService.addStateApplier(this); clusterService.addListener(this); diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/MoveToErrorStepUpdateTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/MoveToErrorStepUpdateTask.java index 1b80e070c5552..ae05d21021339 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/MoveToErrorStepUpdateTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/MoveToErrorStepUpdateTask.java @@ -17,6 +17,7 @@ import java.io.IOException; import java.util.function.BiFunction; +import java.util.function.Consumer; import java.util.function.LongSupplier; public class MoveToErrorStepUpdateTask extends ClusterStateUpdateTask { @@ -24,17 +25,20 @@ public class MoveToErrorStepUpdateTask extends ClusterStateUpdateTask { private final String policy; private final Step.StepKey currentStepKey; private final BiFunction stepLookupFunction; + private final Consumer stateChangeConsumer; private LongSupplier nowSupplier; private Exception cause; public MoveToErrorStepUpdateTask(Index index, String policy, Step.StepKey currentStepKey, Exception cause, LongSupplier nowSupplier, - BiFunction stepLookupFunction) { + BiFunction stepLookupFunction, + Consumer stateChangeConsumer) { this.index = index; this.policy = policy; this.currentStepKey = currentStepKey; this.cause = cause; this.nowSupplier = nowSupplier; this.stepLookupFunction = stepLookupFunction; + this.stateChangeConsumer = stateChangeConsumer; } Index getIndex() { @@ -73,6 +77,13 @@ public ClusterState execute(ClusterState currentState) throws IOException { } } + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + if (newState.equals(oldState) == false) { + stateChangeConsumer.accept(newState); + } + } + @Override public void 
onFailure(String source, Exception e) { throw new ElasticsearchException("policy [" + policy + "] for index [" + index.getName() diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryItem.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryItem.java new file mode 100644 index 0000000000000..e972613e3e901 --- /dev/null +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryItem.java @@ -0,0 +1,115 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ilm.history; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState; + +import java.io.IOException; +import java.util.Collections; +import java.util.Objects; + +import static org.elasticsearch.ElasticsearchException.REST_EXCEPTION_SKIP_STACK_TRACE; + +/** + * The {@link ILMHistoryItem} class encapsulates the state of an index at a point in time. It should + * be constructed when an index has transitioned into a new step. Construction is done through the + * {@link #success(String, String, long, Long, LifecycleExecutionState)} and + * {@link #failure(String, String, long, Long, LifecycleExecutionState, Exception)} methods. 
+ */ +public class ILMHistoryItem implements ToXContentObject { + private static final ParseField INDEX = new ParseField("index"); + private static final ParseField POLICY = new ParseField("policy"); + private static final ParseField TIMESTAMP = new ParseField("@timestamp"); + private static final ParseField INDEX_AGE = new ParseField("index_age"); + private static final ParseField SUCCESS = new ParseField("success"); + private static final ParseField EXECUTION_STATE = new ParseField("state"); + private static final ParseField ERROR = new ParseField("error_details"); + + private final String index; + private final String policyId; + private final long timestamp; + @Nullable + private final Long indexAge; + private final boolean success; + @Nullable + private final LifecycleExecutionState executionState; + @Nullable + private final String errorDetails; + + private ILMHistoryItem(String index, String policyId, long timestamp, @Nullable Long indexAge, boolean success, + @Nullable LifecycleExecutionState executionState, @Nullable String errorDetails) { + this.index = index; + this.policyId = policyId; + this.timestamp = timestamp; + this.indexAge = indexAge; + this.success = success; + this.executionState = executionState; + this.errorDetails = errorDetails; + } + + public static ILMHistoryItem success(String index, String policyId, long timestamp, @Nullable Long indexAge, + @Nullable LifecycleExecutionState executionState) { + return new ILMHistoryItem(index, policyId, timestamp, indexAge, true, executionState, null); + } + + public static ILMHistoryItem failure(String index, String policyId, long timestamp, @Nullable Long indexAge, + @Nullable LifecycleExecutionState executionState, Exception error) { + Objects.requireNonNull(error, "ILM failures require an attached exception"); + return new ILMHistoryItem(index, policyId, timestamp, indexAge, false, executionState, exceptionToString(error)); + } + + @Override + public XContentBuilder toXContent(XContentBuilder 
builder, Params params) throws IOException { + builder.startObject(); + builder.field(INDEX.getPreferredName(), index); + builder.field(POLICY.getPreferredName(), policyId); + builder.field(TIMESTAMP.getPreferredName(), timestamp); + if (indexAge != null) { + builder.field(INDEX_AGE.getPreferredName(), indexAge); + } + builder.field(SUCCESS.getPreferredName(), success); + if (executionState != null) { + builder.field(EXECUTION_STATE.getPreferredName(), executionState.asMap()); + } + if (errorDetails != null) { + builder.field(ERROR.getPreferredName(), errorDetails); + } + builder.endObject(); + return builder; + } + + private static String exceptionToString(Exception exception) { + Params stacktraceParams = new MapParams(Collections.singletonMap(REST_EXCEPTION_SKIP_STACK_TRACE, "false")); + String exceptionString; + try (XContentBuilder causeXContentBuilder = JsonXContent.contentBuilder()) { + causeXContentBuilder.startObject(); + ElasticsearchException.generateThrowableXContent(causeXContentBuilder, stacktraceParams, exception); + causeXContentBuilder.endObject(); + exceptionString = BytesReference.bytes(causeXContentBuilder).utf8ToString(); + } catch (IOException e) { + // In the unlikely case that we cannot generate an exception string, + // try the best way can to encapsulate the error(s) with at least + // the message + exceptionString = "unable to generate the ILM error details due to: " + e.getMessage() + + "; the ILM error was: " + exception.getMessage(); + } + return exceptionString; + } + + @Override + public String toString() { + return Strings.toString(this); + } +} diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStore.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStore.java new file mode 100644 index 0000000000000..96c54e5adfca3 --- /dev/null +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStore.java @@ -0,0 +1,226 @@ +/* + * Copyright 
Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ilm.history; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.alias.Alias; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.bulk.BackoffPolicy; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasOrIndex; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static 
org.elasticsearch.xpack.core.ClientHelper.INDEX_LIFECYCLE_ORIGIN; +import static org.elasticsearch.xpack.core.ilm.LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING; +import static org.elasticsearch.xpack.ilm.history.ILMHistoryTemplateRegistry.INDEX_TEMPLATE_VERSION; + +/** + * The {@link ILMHistoryStore} handles indexing {@link ILMHistoryItem} documents into the + * appropriate index. It sets up a {@link BulkProcessor} for indexing in bulk, and handles creation + * of the index/alias as needed for ILM policies. + */ +public class ILMHistoryStore implements Closeable { + private static final Logger logger = LogManager.getLogger(ILMHistoryStore.class); + + public static final String ILM_HISTORY_INDEX_PREFIX = "ilm-history-" + INDEX_TEMPLATE_VERSION + "-"; + public static final String ILM_HISTORY_ALIAS = "ilm-history-" + INDEX_TEMPLATE_VERSION; + + private final boolean ilmHistoryEnabled; + private final BulkProcessor processor; + private final ThreadPool threadPool; + + public ILMHistoryStore(Settings nodeSettings, Client client, ClusterService clusterService, ThreadPool threadPool) { + this.ilmHistoryEnabled = LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.get(nodeSettings); + this.threadPool = threadPool; + + this.processor = BulkProcessor.builder( + new OriginSettingClient(client, INDEX_LIFECYCLE_ORIGIN)::bulk, + new BulkProcessor.Listener() { + @Override + public void beforeBulk(long executionId, BulkRequest request) { + // Prior to actually performing the bulk, we should ensure the index exists, and + // if we were unable to create it or it was in a bad state, we should not + // attempt to index documents. 
+ try { + final CompletableFuture indexCreated = new CompletableFuture<>(); + ensureHistoryIndex(client, clusterService.state(), ActionListener.wrap(indexCreated::complete, + ex -> { + logger.warn("failed to create ILM history store index prior to issuing bulk request", ex); + indexCreated.completeExceptionally(ex); + })); + indexCreated.get(2, TimeUnit.MINUTES); + } catch (Exception e) { + logger.warn(new ParameterizedMessage("unable to index the following ILM history items:\n{}", + request.requests().stream() + .filter(dwr -> (dwr instanceof IndexRequest)) + .map(dwr -> ((IndexRequest) dwr)) + .map(IndexRequest::sourceAsMap) + .map(Object::toString) + .collect(Collectors.joining("\n"))), e); + throw new ElasticsearchException(e); + } + } + + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + long items = request.numberOfActions(); + if (logger.isTraceEnabled()) { + logger.trace("indexed [{}] items into ILM history index [{}]", items, + Arrays.stream(response.getItems()) + .map(BulkItemResponse::getIndex) + .distinct() + .collect(Collectors.joining(","))); + } + if (response.hasFailures()) { + Map failures = Arrays.stream(response.getItems()) + .filter(BulkItemResponse::isFailed) + .collect(Collectors.toMap(BulkItemResponse::getId, BulkItemResponse::getFailureMessage)); + logger.error("failures: [{}]", failures); + } + } + + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + long items = request.numberOfActions(); + logger.error(new ParameterizedMessage("failed to index {} items into ILM history index", items), failure); + } + }) + .setBulkActions(100) + .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) + .setFlushInterval(TimeValue.timeValueSeconds(5)) + .setConcurrentRequests(1) + .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(1000), 3)) + .build(); + } + + /** + * Attempts to asynchronously index an ILM history entry + */ + public void 
putAsync(ILMHistoryItem item) { + if (ilmHistoryEnabled == false) { + logger.trace("not recording ILM history item because [{}] is [false]: [{}]", + LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.getKey(), item); + return; + } + logger.trace("queueing ILM history item for indexing [{}]: [{}]", ILM_HISTORY_ALIAS, item); + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + item.toXContent(builder, ToXContent.EMPTY_PARAMS); + IndexRequest request = new IndexRequest(ILM_HISTORY_ALIAS).source(builder); + // TODO: remove the threadpool wrapping when the .add call is non-blocking + // (it can currently execute the bulk request occasionally) + // see: https://github.com/elastic/elasticsearch/issues/50440 + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { + try { + processor.add(request); + } catch (Exception e) { + logger.error(new ParameterizedMessage("failed to add ILM history item to queue for index [{}]: [{}]", + ILM_HISTORY_ALIAS, item), e); + } + }); + } catch (IOException exception) { + logger.error(new ParameterizedMessage("failed to queue ILM history item in index [{}]: [{}]", + ILM_HISTORY_ALIAS, item), exception); + } + } + + /** + * Checks if the ILM history index exists, and if not, creates it. + * + * @param client The client to use to create the index if needed + * @param state The current cluster state, to determine if the alias exists + * @param listener Called after the index has been created. `onResponse` called with `true` if the index was created, + * `false` if it already existed. 
+ */ + static void ensureHistoryIndex(Client client, ClusterState state, ActionListener listener) { + final String initialHistoryIndexName = ILM_HISTORY_INDEX_PREFIX + "000001"; + final AliasOrIndex ilmHistory = state.metaData().getAliasAndIndexLookup().get(ILM_HISTORY_ALIAS); + final AliasOrIndex initialHistoryIndex = state.metaData().getAliasAndIndexLookup().get(initialHistoryIndexName); + + if (ilmHistory == null && initialHistoryIndex == null) { + // No alias or index exists with the expected names, so create the index with appropriate alias + logger.debug("creating ILM history index [{}]", initialHistoryIndexName); + client.admin().indices().prepareCreate(initialHistoryIndexName) + .setWaitForActiveShards(1) + .addAlias(new Alias(ILM_HISTORY_ALIAS) + .writeIndex(true)) + .execute(new ActionListener() { + @Override + public void onResponse(CreateIndexResponse response) { + listener.onResponse(true); + } + + @Override + public void onFailure(Exception e) { + if (e instanceof ResourceAlreadyExistsException) { + // The index didn't exist before we made the call, there was probably a race - just ignore this + logger.debug("index [{}] was created after checking for its existence, likely due to a concurrent call", + initialHistoryIndexName); + listener.onResponse(false); + } else { + listener.onFailure(e); + } + } + }); + } else if (ilmHistory == null) { + // alias does not exist but initial index does, something is broken + listener.onFailure(new IllegalStateException("ILM history index [" + initialHistoryIndexName + + "] already exists but does not have alias [" + ILM_HISTORY_ALIAS + "]")); + } else if (ilmHistory.isAlias() && ilmHistory instanceof AliasOrIndex.Alias) { + if (((AliasOrIndex.Alias) ilmHistory).getWriteIndex() != null) { + // The alias exists and has a write index, so we're good + listener.onResponse(false); + } else { + // The alias does not have a write index, so we can't index into it + listener.onFailure(new IllegalStateException("ILM history 
alias [" + ILM_HISTORY_ALIAS + "] does not have a write index")); + } else if (ilmHistory.isAlias() == false) { + // This is not an alias, error out + listener.onFailure(new IllegalStateException("ILM history alias [" + ILM_HISTORY_ALIAS + + "] already exists as concrete index")); + } else { + logger.error("unexpected IndexOrAlias for [{}]: [{}]", ILM_HISTORY_ALIAS, ilmHistory); + assert false : ILM_HISTORY_ALIAS + " cannot be both an alias and not an alias simultaneously"; + } + } + + @Override + public void close() { + try { + processor.awaitClose(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger.warn("failed to shut down ILM history bulk processor after 10 seconds", e); + } + } +} diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryTemplateRegistry.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryTemplateRegistry.java new file mode 100644 index 0000000000000..21b2d16afdc83 --- /dev/null +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryTemplateRegistry.java @@ -0,0 +1,80 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +package org.elasticsearch.xpack.ilm.history; + +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.ilm.LifecycleSettings; +import org.elasticsearch.xpack.core.template.IndexTemplateConfig; +import org.elasticsearch.xpack.core.template.IndexTemplateRegistry; +import org.elasticsearch.xpack.core.template.LifecyclePolicyConfig; + +import java.util.Collections; +import java.util.List; + +/** + * The {@link ILMHistoryTemplateRegistry} class sets up and configures an ILM policy and index + * template for the ILM history indices (ilm-history-N-00000M). + */ +public class ILMHistoryTemplateRegistry extends IndexTemplateRegistry { + // history (please add a comment why you increased the version here) + // version 1: initial + public static final String INDEX_TEMPLATE_VERSION = "1"; + + public static final String ILM_TEMPLATE_VERSION_VARIABLE = "xpack.ilm_history.template.version"; + public static final String ILM_TEMPLATE_NAME = "ilm-history"; + + public static final String ILM_POLICY_NAME = "ilm-history-ilm-policy"; + + public static final IndexTemplateConfig TEMPLATE_ILM_HISTORY = new IndexTemplateConfig( + ILM_TEMPLATE_NAME, + "/ilm-history.json", + INDEX_TEMPLATE_VERSION, + ILM_TEMPLATE_VERSION_VARIABLE + ); + + public static final LifecyclePolicyConfig ILM_HISTORY_POLICY = new LifecyclePolicyConfig( + ILM_POLICY_NAME, + "/ilm-history-ilm-policy.json" + ); + + private final boolean ilmHistoryEnabled; + + public ILMHistoryTemplateRegistry(Settings nodeSettings, ClusterService clusterService, + ThreadPool threadPool, Client client, + NamedXContentRegistry xContentRegistry) { + super(nodeSettings, clusterService, threadPool, client, xContentRegistry); + 
this.ilmHistoryEnabled = LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.get(nodeSettings); + } + + @Override + protected List getTemplateConfigs() { + if (this.ilmHistoryEnabled) { + return Collections.singletonList(TEMPLATE_ILM_HISTORY); + } else { + return Collections.emptyList(); + } + } + + @Override + protected List getPolicyConfigs() { + if (this.ilmHistoryEnabled) { + return Collections.singletonList(ILM_HISTORY_POLICY); + } else { + return Collections.emptyList(); + } + } + + @Override + protected String getOrigin() { + return ClientHelper.INDEX_LIFECYCLE_ORIGIN; + } +} diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleInitialisationTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleInitialisationTests.java index d768714ba65fe..2179898b5a4a7 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleInitialisationTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleInitialisationTests.java @@ -107,7 +107,8 @@ protected Settings nodeSettings(int nodeOrdinal) { settings.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false); settings.put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL, "1s"); - // This is necessary to prevent SLM installing a lifecycle policy, these tests assume a blank slate + // This is necessary to prevent ILM and SLM installing a lifecycle policy, these tests assume a blank slate + settings.put(LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED, false); settings.put(LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING.getKey(), false); return settings.build(); } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java index 2580e2970e521..faa1270f479aa 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java +++ 
b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java @@ -55,6 +55,8 @@ import org.elasticsearch.xpack.core.ilm.Step.StepKey; import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep; import org.elasticsearch.xpack.core.ilm.WaitForRolloverReadyStep; +import org.elasticsearch.xpack.ilm.history.ILMHistoryItem; +import org.elasticsearch.xpack.ilm.history.ILMHistoryStore; import org.junit.After; import org.junit.Before; import org.mockito.ArgumentMatcher; @@ -91,6 +93,8 @@ public class IndexLifecycleRunnerTests extends ESTestCase { private static final NamedXContentRegistry REGISTRY; private ThreadPool threadPool; + private Client noopClient; + private NoOpHistoryStore historyStore; static { try (IndexLifecycle indexLifecycle = new IndexLifecycle(Settings.EMPTY)) { @@ -100,12 +104,16 @@ public class IndexLifecycleRunnerTests extends ESTestCase { } @Before - public void prepareThreadPool() { + public void prepare() { threadPool = new TestThreadPool("test"); + noopClient = new NoOpClient(threadPool); + historyStore = new NoOpHistoryStore(); } @After public void shutdown() { + historyStore.close(); + noopClient.close(); threadPool.shutdownNow(); } @@ -114,7 +122,7 @@ public void testRunPolicyTerminalPolicyStep() { TerminalPolicyStep step = TerminalPolicyStep.INSTANCE; PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); @@ -136,7 +144,7 @@ public void testRunPolicyErrorStep() { PolicyStepsRegistry stepRegistry = 
createOneStepPolicyStepRegistry(policyName, step); ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); LifecycleExecutionState.Builder newState = LifecycleExecutionState.builder(); newState.setFailedStep(stepKey.getName()); newState.setIsAutoRetryableError(false); @@ -176,7 +184,7 @@ public void testRunPolicyErrorStepOnRetryableFailedStep() { PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, waitForRolloverStep); ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); LifecycleExecutionState.Builder newState = LifecycleExecutionState.builder(); newState.setFailedStep(stepKey.getName()); newState.setIsAutoRetryableError(true); @@ -221,7 +229,7 @@ public void testRunStateChangePolicyWithNoNextStep() throws Exception { .localNodeId(node.getId())) .build(); ClusterServiceUtils.setState(clusterService, state); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); ClusterState before = clusterService.state(); CountDownLatch latch = new CountDownLatch(1); @@ -282,7 +290,8 @@ public void testRunStateChangePolicyWithNextStep() throws Exception { .build(); ClusterServiceUtils.setState(clusterService, state); long stepTime = randomLong(); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> stepTime); + IndexLifecycleRunner runner = new 
IndexLifecycleRunner(stepRegistry, historyStore, + clusterService, threadPool, () -> stepTime); ClusterState before = clusterService.state(); CountDownLatch latch = new CountDownLatch(1); @@ -303,6 +312,14 @@ public void testRunStateChangePolicyWithNextStep() throws Exception { assertThat(nextStep.getExecuteCount(), equalTo(1L)); clusterService.close(); threadPool.shutdownNow(); + + ILMHistoryItem historyItem = historyStore.getItems().stream() + .findFirst() + .orElseThrow(() -> new AssertionError("failed to register ILM history")); + assertThat(historyItem.toString(), + containsString("{\"index\":\"test\",\"policy\":\"foo\",\"@timestamp\":" + stepTime + + ",\"success\":true,\"state\":{\"phase\":\"phase\",\"action\":\"action\"," + + "\"step\":\"next_cluster_state_action_step\",\"step_time\":\"" + stepTime + "\"}}")); } public void testRunPeriodicPolicyWithFailureToReadPolicy() throws Exception { @@ -357,7 +374,8 @@ public void doTestRunPolicyWithFailureToReadPolicy(boolean asyncAction, boolean .build(); ClusterServiceUtils.setState(clusterService, state); long stepTime = randomLong(); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> stepTime); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, + clusterService, threadPool, () -> stepTime); ClusterState before = clusterService.state(); if (asyncAction) { @@ -409,7 +427,7 @@ public void testRunAsyncActionDoesNotRun() { .localNodeId(node.getId())) .build(); ClusterServiceUtils.setState(clusterService, state); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); ClusterState before = clusterService.state(); // State changes should not run AsyncAction steps @@ -468,7 +486,7 @@ public void testRunStateChangePolicyWithAsyncActionNextStep() throws Exception 
{ .build(); logger.info("--> state: {}", state); ClusterServiceUtils.setState(clusterService, state); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); ClusterState before = clusterService.state(); CountDownLatch latch = new CountDownLatch(1); @@ -488,6 +506,13 @@ public void testRunStateChangePolicyWithAsyncActionNextStep() throws Exception { assertThat(nextStep.getExecuteCount(), equalTo(1L)); clusterService.close(); threadPool.shutdownNow(); + + ILMHistoryItem historyItem = historyStore.getItems().stream() + .findFirst() + .orElseThrow(() -> new AssertionError("failed to register ILM history")); + assertThat(historyItem.toString(), + containsString("{\"index\":\"test\",\"policy\":\"foo\",\"@timestamp\":0,\"success\":true," + + "\"state\":{\"phase\":\"phase\",\"action\":\"action\",\"step\":\"async_action_step\",\"step_time\":\"0\"}}")); } public void testRunPeriodicStep() throws Exception { @@ -535,7 +560,7 @@ public void testRunPeriodicStep() throws Exception { .build(); logger.info("--> state: {}", state); ClusterServiceUtils.setState(clusterService, state); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); ClusterState before = clusterService.state(); CountDownLatch latch = new CountDownLatch(1); @@ -558,7 +583,7 @@ public void testRunPolicyClusterStateActionStep() { MockClusterStateActionStep step = new MockClusterStateActionStep(stepKey, null); PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); + 
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); @@ -576,7 +601,7 @@ public void testRunPolicyClusterStateWaitStep() { step.setWillComplete(true); PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); @@ -595,7 +620,7 @@ public void testRunPolicyAsyncActionStepClusterStateChangeIgnored() { step.setException(expectedException); PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); @@ -613,7 +638,7 @@ public void testRunPolicyAsyncWaitStepClusterStateChangeIgnored() { step.setException(expectedException); PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new 
IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); @@ -627,7 +652,7 @@ public void testRunPolicyThatDoesntExist() { String policyName = "cluster_state_action_policy"; ClusterService clusterService = mock(ClusterService.class); IndexLifecycleRunner runner = new IndexLifecycleRunner(new PolicyStepsRegistry(NamedXContentRegistry.EMPTY, null), - clusterService, threadPool, () -> 0L); + historyStore, clusterService, threadPool, () -> 0L); IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); // verify that no exception is thrown @@ -712,7 +737,8 @@ public void testIsReadyToTransition() { stepMap, NamedXContentRegistry.EMPTY, null); ClusterService clusterService = mock(ClusterService.class); final AtomicLong now = new AtomicLong(5); - IndexLifecycleRunner runner = new IndexLifecycleRunner(policyStepsRegistry, clusterService, threadPool, now::get); + IndexLifecycleRunner runner = new IndexLifecycleRunner(policyStepsRegistry, historyStore, + clusterService, threadPool, now::get); IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)) .numberOfReplicas(randomIntBetween(0, 5)) @@ -1086,4 +1112,23 @@ public static MockPolicyStepsRegistry createMultiStepPolicyStepRegistry(String p when(client.settings()).thenReturn(Settings.EMPTY); return new MockPolicyStepsRegistry(lifecyclePolicyMap, firstStepMap, stepMap, REGISTRY, client); } + + private class NoOpHistoryStore extends ILMHistoryStore { + + private final List items = new 
ArrayList<>(); + + NoOpHistoryStore() { + super(Settings.EMPTY, noopClient, null, null); + } + + public List getItems() { + return items; + } + + @Override + public void putAsync(ILMHistoryItem item) { + logger.info("--> adding ILM history item: [{}]", item); + items.add(item); + } + } } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java index a7f15419d3718..2bf054d318fb5 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java @@ -108,7 +108,7 @@ public void prepareServices() { threadPool = new TestThreadPool("test"); indexLifecycleService = new IndexLifecycleService(Settings.EMPTY, client, clusterService, threadPool, - clock, () -> now, null); + clock, () -> now, null, null); Mockito.verify(clusterService).addListener(indexLifecycleService); Mockito.verify(clusterService).addStateApplier(indexLifecycleService); } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/MoveToErrorStepUpdateTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/MoveToErrorStepUpdateTaskTests.java index fa2a626b0f9bc..39b3fca46c1f1 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/MoveToErrorStepUpdateTaskTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/MoveToErrorStepUpdateTaskTests.java @@ -75,7 +75,7 @@ public void testExecuteSuccessfullyMoved() throws IOException { setStateToKey(currentStepKey); MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, () -> now, - (idxMeta, stepKey) -> new MockStep(stepKey, nextStepKey)); + (idxMeta, stepKey) -> new MockStep(stepKey, nextStepKey), state -> {}); ClusterState newState = task.execute(clusterState); 
LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(newState.getMetaData().index(index)); StepKey actualKey = LifecycleExecutionState.getCurrentStepKey(lifecycleState); @@ -101,7 +101,7 @@ public void testExecuteNoopDifferentStep() throws IOException { Exception cause = new ElasticsearchException("THIS IS AN EXPECTED CAUSE"); setStateToKey(notCurrentStepKey); MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, () -> now, - (idxMeta, stepKey) -> new MockStep(stepKey, new StepKey("next-phase", "action", "step"))); + (idxMeta, stepKey) -> new MockStep(stepKey, new StepKey("next-phase", "action", "step")), state -> {}); ClusterState newState = task.execute(clusterState); assertThat(newState, sameInstance(clusterState)); } @@ -113,7 +113,7 @@ public void testExecuteNoopDifferentPolicy() throws IOException { setStateToKey(currentStepKey); setStatePolicy("not-" + policy); MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, () -> now, - (idxMeta, stepKey) -> new MockStep(stepKey, new StepKey("next-phase", "action", "step"))); + (idxMeta, stepKey) -> new MockStep(stepKey, new StepKey("next-phase", "action", "step")), state -> {}); ClusterState newState = task.execute(clusterState); assertThat(newState, sameInstance(clusterState)); } @@ -126,7 +126,7 @@ public void testOnFailure() { setStateToKey(currentStepKey); MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, () -> now, - (idxMeta, stepKey) -> new MockStep(stepKey, new StepKey("next-phase", "action", "step"))); + (idxMeta, stepKey) -> new MockStep(stepKey, new StepKey("next-phase", "action", "step")), state -> {}); Exception expectedException = new RuntimeException(); ElasticsearchException exception = expectThrows(ElasticsearchException.class, () -> task.onFailure(randomAlphaOfLength(10), expectedException)); diff --git 
a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/history/ILMHistoryItemTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/history/ILMHistoryItemTests.java new file mode 100644 index 0000000000000..2e3f76c6370e7 --- /dev/null +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/history/ILMHistoryItemTests.java @@ -0,0 +1,92 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ilm.history; + +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState; + +import java.io.IOException; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.startsWith; + +public class ILMHistoryItemTests extends ESTestCase { + + public void testToXContent() throws IOException { + ILMHistoryItem success = ILMHistoryItem.success("index", "policy", 1234L, 100L, + LifecycleExecutionState.builder() + .setPhase("phase") + .setAction("action") + .setStep("step") + .setPhaseTime(10L) + .setActionTime(20L) + .setStepTime(30L) + .setPhaseDefinition("{}") + .setStepInfo("{\"step_info\": \"foo\"") + .build()); + + ILMHistoryItem failure = ILMHistoryItem.failure("index", "policy", 1234L, 100L, + LifecycleExecutionState.builder() + .setPhase("phase") + .setAction("action") + .setStep("ERROR") + .setFailedStep("step") + .setFailedStepRetryCount(7) + .setIsAutoRetryableError(true) + .setPhaseTime(10L) + .setActionTime(20L) + .setStepTime(30L) + .setPhaseDefinition("{\"phase_json\": \"eggplant\"}") + 
.setStepInfo("{\"step_info\": \"foo\"") + .build(), + new IllegalArgumentException("failure")); + + try (XContentBuilder builder = jsonBuilder()) { + success.toXContent(builder, ToXContent.EMPTY_PARAMS); + String json = Strings.toString(builder); + assertThat(json, equalTo("{\"index\":\"index\"," + + "\"policy\":\"policy\"," + + "\"@timestamp\":1234," + + "\"index_age\":100," + + "\"success\":true," + + "\"state\":{\"phase\":\"phase\"," + + "\"phase_definition\":\"{}\"," + + "\"action_time\":\"20\"," + + "\"phase_time\":\"10\"," + + "\"step_info\":\"{\\\"step_info\\\": \\\"foo\\\"\",\"action\":\"action\",\"step\":\"step\",\"step_time\":\"30\"}}" + )); + } + + try (XContentBuilder builder = jsonBuilder()) { + failure.toXContent(builder, ToXContent.EMPTY_PARAMS); + String json = Strings.toString(builder); + assertThat(json, startsWith("{\"index\":\"index\"," + + "\"policy\":\"policy\"," + + "\"@timestamp\":1234," + + "\"index_age\":100," + + "\"success\":false," + + "\"state\":{\"phase\":\"phase\"," + + "\"failed_step\":\"step\"," + + "\"phase_definition\":\"{\\\"phase_json\\\": \\\"eggplant\\\"}\"," + + "\"action_time\":\"20\"," + + "\"is_auto_retryable_error\":\"true\"," + + "\"failed_step_retry_count\":\"7\"," + + "\"phase_time\":\"10\"," + + "\"step_info\":\"{\\\"step_info\\\": \\\"foo\\\"\"," + + "\"action\":\"action\"," + + "\"step\":\"ERROR\"," + + "\"step_time\":\"30\"}," + + "\"error_details\":\"{\\\"type\\\":\\\"illegal_argument_exception\\\"," + + "\\\"reason\\\":\\\"failure\\\"," + + "\\\"stack_trace\\\":\\\"java.lang.IllegalArgumentException: failure")); + } + } +} diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStoreTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStoreTests.java new file mode 100644 index 0000000000000..948c017d9d0f7 --- /dev/null +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStoreTests.java @@ -0,0 +1,398 @@ +/* + 
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ilm.history; + +import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.LatchedActionListener; +import org.elasticsearch.action.admin.indices.create.CreateIndexAction; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.bulk.BulkAction; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasMetaData; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.TriFunction; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ClusterServiceUtils; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.client.NoOpClient; +import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; 
+import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState; +import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; + +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; + +import static org.elasticsearch.xpack.core.ilm.LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING; +import static org.elasticsearch.xpack.ilm.history.ILMHistoryStore.ILM_HISTORY_ALIAS; +import static org.elasticsearch.xpack.ilm.history.ILMHistoryStore.ILM_HISTORY_INDEX_PREFIX; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; + +public class ILMHistoryStoreTests extends ESTestCase { + + private ThreadPool threadPool; + private VerifyingClient client; + private ClusterService clusterService; + private ILMHistoryStore historyStore; + + @Before + public void setup() { + threadPool = new TestThreadPool(this.getClass().getName()); + client = new VerifyingClient(threadPool); + clusterService = ClusterServiceUtils.createClusterService(threadPool); + historyStore = new ILMHistoryStore(Settings.EMPTY, client, clusterService, threadPool); + } + + @After + public void setdown() { + historyStore.close(); + clusterService.close(); + client.close(); + threadPool.shutdownNow(); + } + + public void testNoActionIfDisabled() throws Exception { + Settings settings = Settings.builder().put(LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.getKey(), false).build(); + try (ILMHistoryStore disabledHistoryStore = new ILMHistoryStore(settings, client, null, threadPool)) { + String policyId = randomAlphaOfLength(5); + final long timestamp = randomNonNegativeLong(); + ILMHistoryItem record = ILMHistoryItem.success("index", policyId, timestamp, null, null); + + CountDownLatch latch = new CountDownLatch(1); + client.setVerifier((a, r, l) -> { + fail("the history store is disabled, no 
action should have been taken"); + latch.countDown(); + return null; + }); + disabledHistoryStore.putAsync(record); + latch.await(10, TimeUnit.SECONDS); + } + } + + @SuppressWarnings("unchecked") + public void testPut() throws Exception { + String policyId = randomAlphaOfLength(5); + final long timestamp = randomNonNegativeLong(); + { + ILMHistoryItem record = ILMHistoryItem.success("index", policyId, timestamp, 10L, + LifecycleExecutionState.builder() + .setPhase("phase") + .build()); + + AtomicInteger calledTimes = new AtomicInteger(0); + client.setVerifier((action, request, listener) -> { + if (action instanceof CreateIndexAction && request instanceof CreateIndexRequest) { + return new CreateIndexResponse(true, true, ((CreateIndexRequest) request).index()); + } + calledTimes.incrementAndGet(); + assertThat(action, instanceOf(BulkAction.class)); + assertThat(request, instanceOf(BulkRequest.class)); + BulkRequest bulkRequest = (BulkRequest) request; + bulkRequest.requests().forEach(dwr -> assertEquals(ILM_HISTORY_ALIAS, dwr.index())); + assertNotNull(listener); + + // The content of this BulkResponse doesn't matter, so just make it have the same number of responses + int responses = bulkRequest.numberOfActions(); + return new BulkResponse(IntStream.range(0, responses) + .mapToObj(i -> new BulkItemResponse(i, DocWriteRequest.OpType.INDEX, + new IndexResponse(new ShardId("index", "uuid", 0), "_doc", randomAlphaOfLength(10), 1, 1, 1, true))) + .toArray(BulkItemResponse[]::new), + 1000L); + }); + + historyStore.putAsync(record); + assertBusy(() -> assertThat(calledTimes.get(), equalTo(1))); + } + + { + final String cause = randomAlphaOfLength(9); + Exception failureException = new RuntimeException(cause); + ILMHistoryItem record = ILMHistoryItem.failure("index", policyId, timestamp, 10L, + LifecycleExecutionState.builder() + .setPhase("phase") + .build(), failureException); + + AtomicInteger calledTimes = new AtomicInteger(0); + client.setVerifier((action, request, 
listener) -> { + if (action instanceof CreateIndexAction && request instanceof CreateIndexRequest) { + return new CreateIndexResponse(true, true, ((CreateIndexRequest) request).index()); + } + calledTimes.incrementAndGet(); + assertThat(action, instanceOf(BulkAction.class)); + assertThat(request, instanceOf(BulkRequest.class)); + BulkRequest bulkRequest = (BulkRequest) request; + bulkRequest.requests().forEach(dwr -> { + assertEquals(ILM_HISTORY_ALIAS, dwr.index()); + assertThat(dwr, instanceOf(IndexRequest.class)); + IndexRequest ir = (IndexRequest) dwr; + String indexedDocument = ir.source().utf8ToString(); + assertThat(indexedDocument, Matchers.containsString("runtime_exception")); + assertThat(indexedDocument, Matchers.containsString(cause)); + }); + assertNotNull(listener); + + // The content of this BulkResponse doesn't matter, so just make it have the same number of responses with failures + int responses = bulkRequest.numberOfActions(); + return new BulkResponse(IntStream.range(0, responses) + .mapToObj(i -> new BulkItemResponse(i, DocWriteRequest.OpType.INDEX, + new BulkItemResponse.Failure("index", "_doc", i + "", failureException))) + .toArray(BulkItemResponse[]::new), + 1000L); + }); + + historyStore.putAsync(record); + assertBusy(() -> assertThat(calledTimes.get(), equalTo(1))); + } + } + + public void testHistoryIndexNeedsCreation() throws InterruptedException { + ClusterState state = ClusterState.builder(new ClusterName(randomAlphaOfLength(5))) + .metaData(MetaData.builder()) + .build(); + + client.setVerifier((a, r, l) -> { + assertThat(a, instanceOf(CreateIndexAction.class)); + assertThat(r, instanceOf(CreateIndexRequest.class)); + CreateIndexRequest request = (CreateIndexRequest) r; + assertThat(request.aliases(), Matchers.hasSize(1)); + request.aliases().forEach(alias -> { + assertThat(alias.name(), equalTo(ILM_HISTORY_ALIAS)); + assertTrue(alias.writeIndex()); + }); + return new CreateIndexResponse(true, true, request.index()); + }); + + 
CountDownLatch latch = new CountDownLatch(1); + ILMHistoryStore.ensureHistoryIndex(client, state, new LatchedActionListener<>(ActionListener.wrap( + Assert::assertTrue, + ex -> { + logger.error(ex); + fail("should have called onResponse, not onFailure"); + }), latch)); + + ElasticsearchAssertions.awaitLatch(latch, 10, TimeUnit.SECONDS); + } + + public void testHistoryIndexProperlyExistsAlready() throws InterruptedException { + ClusterState state = ClusterState.builder(new ClusterName(randomAlphaOfLength(5))) + .metaData(MetaData.builder() + .put(IndexMetaData.builder(ILM_HISTORY_INDEX_PREFIX + "000001") + .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) + .numberOfShards(randomIntBetween(1,10)) + .numberOfReplicas(randomIntBetween(1,10)) + .putAlias(AliasMetaData.builder(ILM_HISTORY_ALIAS) + .writeIndex(true) + .build()))) + .build(); + + client.setVerifier((a, r, l) -> { + fail("no client calls should have been made"); + return null; + }); + + CountDownLatch latch = new CountDownLatch(1); + ILMHistoryStore.ensureHistoryIndex(client, state, new LatchedActionListener<>(ActionListener.wrap( + Assert::assertFalse, + ex -> { + logger.error(ex); + fail("should have called onResponse, not onFailure"); + }), latch)); + + ElasticsearchAssertions.awaitLatch(latch, 10, TimeUnit.SECONDS); + } + + public void testHistoryIndexHasNoWriteIndex() throws InterruptedException { + ClusterState state = ClusterState.builder(new ClusterName(randomAlphaOfLength(5))) + .metaData(MetaData.builder() + .put(IndexMetaData.builder(ILM_HISTORY_INDEX_PREFIX + "000001") + .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) + .numberOfShards(randomIntBetween(1,10)) + .numberOfReplicas(randomIntBetween(1,10)) + .putAlias(AliasMetaData.builder(ILM_HISTORY_ALIAS) + .build())) + .put(IndexMetaData.builder(randomAlphaOfLength(5)) + .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) + 
.numberOfShards(randomIntBetween(1,10)) + .numberOfReplicas(randomIntBetween(1,10)) + .putAlias(AliasMetaData.builder(ILM_HISTORY_ALIAS) + .build()))) + .build(); + + client.setVerifier((a, r, l) -> { + fail("no client calls should have been made"); + return null; + }); + + CountDownLatch latch = new CountDownLatch(1); + ILMHistoryStore.ensureHistoryIndex(client, state, new LatchedActionListener<>(ActionListener.wrap( + indexCreated -> fail("should have called onFailure, not onResponse"), + ex -> { + assertThat(ex, instanceOf(IllegalStateException.class)); + assertThat(ex.getMessage(), Matchers.containsString("ILM history alias [" + ILM_HISTORY_ALIAS + + "does not have a write index")); + }), latch)); + + ElasticsearchAssertions.awaitLatch(latch, 10, TimeUnit.SECONDS); + } + + public void testHistoryIndexNotAlias() throws InterruptedException { + ClusterState state = ClusterState.builder(new ClusterName(randomAlphaOfLength(5))) + .metaData(MetaData.builder() + .put(IndexMetaData.builder(ILM_HISTORY_ALIAS) + .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) + .numberOfShards(randomIntBetween(1,10)) + .numberOfReplicas(randomIntBetween(1,10)))) + .build(); + + client.setVerifier((a, r, l) -> { + fail("no client calls should have been made"); + return null; + }); + + CountDownLatch latch = new CountDownLatch(1); + ILMHistoryStore.ensureHistoryIndex(client, state, new LatchedActionListener<>(ActionListener.wrap( + indexCreated -> fail("should have called onFailure, not onResponse"), + ex -> { + assertThat(ex, instanceOf(IllegalStateException.class)); + assertThat(ex.getMessage(), Matchers.containsString("ILM history alias [" + ILM_HISTORY_ALIAS + + "] already exists as concrete index")); + }), latch)); + + ElasticsearchAssertions.awaitLatch(latch, 10, TimeUnit.SECONDS); + } + + public void testHistoryIndexCreatedConcurrently() throws InterruptedException { + ClusterState state = ClusterState.builder(new 
ClusterName(randomAlphaOfLength(5))) + .metaData(MetaData.builder()) + .build(); + + client.setVerifier((a, r, l) -> { + assertThat(a, instanceOf(CreateIndexAction.class)); + assertThat(r, instanceOf(CreateIndexRequest.class)); + CreateIndexRequest request = (CreateIndexRequest) r; + assertThat(request.aliases(), Matchers.hasSize(1)); + request.aliases().forEach(alias -> { + assertThat(alias.name(), equalTo(ILM_HISTORY_ALIAS)); + assertTrue(alias.writeIndex()); + }); + throw new ResourceAlreadyExistsException("that index already exists"); + }); + + CountDownLatch latch = new CountDownLatch(1); + ILMHistoryStore.ensureHistoryIndex(client, state, new LatchedActionListener<>(ActionListener.wrap( + Assert::assertFalse, + ex -> { + logger.error(ex); + fail("should have called onResponse, not onFailure"); + }), latch)); + + ElasticsearchAssertions.awaitLatch(latch, 10, TimeUnit.SECONDS); + } + + public void testHistoryAliasDoesntExistButIndexDoes() throws InterruptedException { + final String initialIndex = ILM_HISTORY_INDEX_PREFIX + "000001"; + ClusterState state = ClusterState.builder(new ClusterName(randomAlphaOfLength(5))) + .metaData(MetaData.builder() + .put(IndexMetaData.builder(initialIndex) + .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) + .numberOfShards(randomIntBetween(1,10)) + .numberOfReplicas(randomIntBetween(1,10)))) + .build(); + + client.setVerifier((a, r, l) -> { + fail("no client calls should have been made"); + return null; + }); + + CountDownLatch latch = new CountDownLatch(1); + ILMHistoryStore.ensureHistoryIndex(client, state, new LatchedActionListener<>(ActionListener.wrap( + response -> { + logger.error(response); + fail("should have called onFailure, not onResponse"); + }, + ex -> { + assertThat(ex, instanceOf(IllegalStateException.class)); + assertThat(ex.getMessage(), Matchers.containsString("ILM history index [" + initialIndex + + "] already exists but does not have alias [" + ILM_HISTORY_ALIAS + 
"]")); + }), latch)); + + ElasticsearchAssertions.awaitLatch(latch, 10, TimeUnit.SECONDS); + } + + @SuppressWarnings("unchecked") + private void assertContainsMap(String indexedDocument, Map map) { + map.forEach((k, v) -> { + assertThat(indexedDocument, Matchers.containsString(k)); + if (v instanceof Map) { + assertContainsMap(indexedDocument, (Map) v); + } + if (v instanceof Iterable) { + ((Iterable) v).forEach(elem -> { + assertThat(indexedDocument, Matchers.containsString(elem.toString())); + }); + } else { + assertThat(indexedDocument, Matchers.containsString(v.toString())); + } + }); + } + + /** + * A client that delegates to a verifying function for action/request/listener + */ + public static class VerifyingClient extends NoOpClient { + + private TriFunction, ActionRequest, ActionListener, ActionResponse> verifier = (a, r, l) -> { + fail("verifier not set"); + return null; + }; + + VerifyingClient(ThreadPool threadPool) { + super(threadPool); + } + + @Override + @SuppressWarnings("unchecked") + protected void doExecute(ActionType action, + Request request, + ActionListener listener) { + try { + listener.onResponse((Response) verifier.apply(action, request, listener)); + } catch (Exception e) { + listener.onFailure(e); + } + } + + VerifyingClient setVerifier(TriFunction, ActionRequest, ActionListener, ActionResponse> verifier) { + this.verifier = verifier; + return this; + } + } +} diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SLMSnapshotBlockingIntegTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SLMSnapshotBlockingIntegTests.java index 453aa2555c134..34d208337f1ff 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SLMSnapshotBlockingIntegTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SLMSnapshotBlockingIntegTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; import 
org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.ilm.LifecycleSettings; import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicy; import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicyItem; import org.elasticsearch.xpack.core.slm.SnapshotRetentionConfiguration; @@ -89,6 +90,7 @@ protected Settings nodeSettings(int nodeOrdinal) { settings.put(XPackSettings.MONITORING_ENABLED.getKey(), false); settings.put(XPackSettings.GRAPH_ENABLED.getKey(), false); settings.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false); + settings.put(LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED, false); return settings.build(); } diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/test/MonitoringIntegTestCase.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/test/MonitoringIntegTestCase.java index d716e2e479f4a..f8aad5d1d1d6c 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/test/MonitoringIntegTestCase.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/test/MonitoringIntegTestCase.java @@ -58,6 +58,7 @@ protected Settings nodeSettings(int nodeOrdinal) { // .put(MachineLearningField.AUTODETECT_PROCESS.getKey(), false) // .put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false) // we do this by default in core, but for monitoring this isn't needed and only adds noise. 
+ .put("index.lifecycle.history_index_enabled", false) .put("index.store.mock.check_index_on_close", false); return builder.build(); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java index 7060e9f4555ee..771cff9f5d034 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java @@ -118,6 +118,7 @@ protected Settings nodeSettings(int nodeOrdinal) { // watcher settings that should work despite randomization .put("xpack.watcher.execution.scroll.size", randomIntBetween(1, 100)) .put("xpack.watcher.watch.scroll.size", randomIntBetween(1, 100)) + .put("index.lifecycle.history_index_enabled", false) // SLM can cause timing issues during testsuite teardown: https://github.com/elastic/elasticsearch/issues/50302 // SLM is not required for tests extending from this base class and only add noise. .put("xpack.slm.enabled", false)