From 2b4e6f87605e2927e6448b16d1dfbbda26e0f3ad Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Fri, 20 Dec 2019 11:41:18 -0700 Subject: [PATCH] Make ILMHistoryStore.putAsync truly async (#50403) * Make ILMHistoryStore.putAsync truly async This changes the `putAsync` method in `ILMHistoryStore` so that it never blocks. Previously, due to the way `BulkProcessor` works, it was possible for `BulkProcessor#add` to block while executing a bulk request. This was bad as we may be adding things to the history store in cluster state update threads. This also moves the index creation to be done prior to the bulk request execution, rather than being checked every time an operation was added to the queue. This lessens the chance of the index being created, then deleted (by some external force), and then recreated via a bulk indexing request. Resolves #50353 --- .../xpack/ilm/IndexLifecycle.java | 3 +- .../xpack/ilm/history/ILMHistoryStore.java | 77 ++++++++++++++----- .../xpack/ilm/IndexLifecycleRunnerTests.java | 2 +- .../ilm/history/ILMHistoryStoreTests.java | 4 +- .../slm/SLMSnapshotBlockingIntegTests.java | 2 + 5 files changed, 64 insertions(+), 24 deletions(-) diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java index 3d20496d38987..947d1752dad65 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java @@ -196,7 +196,8 @@ public Collection createComponents(Client client, ClusterService cluster @SuppressWarnings("unused") ILMHistoryTemplateRegistry ilmTemplateRegistry = new ILMHistoryTemplateRegistry(settings, clusterService, threadPool, client, xContentRegistry); - ilmHistoryStore.set(new ILMHistoryStore(settings, new OriginSettingClient(client, INDEX_LIFECYCLE_ORIGIN), clusterService)); + ilmHistoryStore.set(new ILMHistoryStore(settings, new 
OriginSettingClient(client, INDEX_LIFECYCLE_ORIGIN), + clusterService, threadPool)); indexLifecycleInitialisationService.set(new IndexLifecycleService(settings, client, clusterService, threadPool, getClock(), System::currentTimeMillis, xContentRegistry, ilmHistoryStore.get())); components.add(indexLifecycleInitialisationService.get()); diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStore.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStore.java index ab8168de4b28d..96c54e5adfca3 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStore.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStore.java @@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.alias.Alias; @@ -31,11 +32,13 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.threadpool.ThreadPool; import java.io.Closeable; import java.io.IOException; import java.util.Arrays; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -54,26 +57,52 @@ public class ILMHistoryStore implements Closeable { public static final String ILM_HISTORY_INDEX_PREFIX = "ilm-history-" + INDEX_TEMPLATE_VERSION + "-"; public static final String ILM_HISTORY_ALIAS = "ilm-history-" + INDEX_TEMPLATE_VERSION; - private final Client client; - private final ClusterService clusterService; private final boolean ilmHistoryEnabled; private final 
BulkProcessor processor; + private final ThreadPool threadPool; - public ILMHistoryStore(Settings nodeSettings, Client client, ClusterService clusterService) { - this.client = client; - this.clusterService = clusterService; - ilmHistoryEnabled = LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.get(nodeSettings); + public ILMHistoryStore(Settings nodeSettings, Client client, ClusterService clusterService, ThreadPool threadPool) { + this.ilmHistoryEnabled = LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.get(nodeSettings); + this.threadPool = threadPool; this.processor = BulkProcessor.builder( new OriginSettingClient(client, INDEX_LIFECYCLE_ORIGIN)::bulk, new BulkProcessor.Listener() { @Override - public void beforeBulk(long executionId, BulkRequest request) { } + public void beforeBulk(long executionId, BulkRequest request) { + // Prior to actually performing the bulk, we should ensure the index exists, and + // if we were unable to create it or it was in a bad state, we should not + // attempt to index documents. 
+ try { + final CompletableFuture indexCreated = new CompletableFuture<>(); + ensureHistoryIndex(client, clusterService.state(), ActionListener.wrap(indexCreated::complete, + ex -> { + logger.warn("failed to create ILM history store index prior to issuing bulk request", ex); + indexCreated.completeExceptionally(ex); + })); + indexCreated.get(2, TimeUnit.MINUTES); + } catch (Exception e) { + logger.warn(new ParameterizedMessage("unable to index the following ILM history items:\n{}", + request.requests().stream() + .filter(dwr -> (dwr instanceof IndexRequest)) + .map(dwr -> ((IndexRequest) dwr)) + .map(IndexRequest::sourceAsMap) + .map(Object::toString) + .collect(Collectors.joining("\n"))), e); + throw new ElasticsearchException(e); + } + } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { long items = request.numberOfActions(); - logger.trace("indexed [{}] items into ILM history index", items); + if (logger.isTraceEnabled()) { + logger.trace("indexed [{}] items into ILM history index [{}]", items, + Arrays.stream(response.getItems()) + .map(BulkItemResponse::getIndex) + .distinct() + .collect(Collectors.joining(","))); + } if (response.hasFailures()) { Map failures = Arrays.stream(response.getItems()) .filter(BulkItemResponse::isFailed) @@ -105,18 +134,25 @@ public void putAsync(ILMHistoryItem item) { LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.getKey(), item); return; } - logger.trace("about to index ILM history item in index [{}]: [{}]", ILM_HISTORY_ALIAS, item); - ensureHistoryIndex(client, clusterService.state(), ActionListener.wrap(createdIndex -> { - try (XContentBuilder builder = XContentFactory.jsonBuilder()) { - item.toXContent(builder, ToXContent.EMPTY_PARAMS); - IndexRequest request = new IndexRequest(ILM_HISTORY_ALIAS).source(builder); - processor.add(request); - } catch (IOException exception) { - logger.error(new ParameterizedMessage("failed to index ILM history item in index [{}]: [{}]", - ILM_HISTORY_ALIAS, 
item), exception); - } - }, ex -> logger.error(new ParameterizedMessage("failed to ensure ILM history index exists, not indexing history item [{}]", - item), ex))); + logger.trace("queueing ILM history item for indexing [{}]: [{}]", ILM_HISTORY_ALIAS, item); + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + item.toXContent(builder, ToXContent.EMPTY_PARAMS); + IndexRequest request = new IndexRequest(ILM_HISTORY_ALIAS).source(builder); + // TODO: remove the threadpool wrapping when the .add call is non-blocking + // (it can currently execute the bulk request occasionally) + // see: https://github.com/elastic/elasticsearch/issues/50440 + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { + try { + processor.add(request); + } catch (Exception e) { + logger.error(new ParameterizedMessage("failed add ILM history item to queue for index [{}]: [{}]", + ILM_HISTORY_ALIAS, item), e); + } + }); + } catch (IOException exception) { + logger.error(new ParameterizedMessage("failed to queue ILM history item in index [{}]: [{}]", + ILM_HISTORY_ALIAS, item), exception); + } } /** @@ -134,6 +170,7 @@ static void ensureHistoryIndex(Client client, ClusterState state, ActionListener if (ilmHistory == null && initialHistoryIndex == null) { // No alias or index exists with the expected names, so create the index with appropriate alias + logger.debug("creating ILM history index [{}]", initialHistoryIndexName); client.admin().indices().prepareCreate(initialHistoryIndexName) .setWaitForActiveShards(1) .addAlias(new Alias(ILM_HISTORY_ALIAS) diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java index 130a77cf853dd..faa1270f479aa 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java +++ 
b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java @@ -1118,7 +1118,7 @@ private class NoOpHistoryStore extends ILMHistoryStore { private final List items = new ArrayList<>(); NoOpHistoryStore() { - super(Settings.EMPTY, noopClient, null); + super(Settings.EMPTY, noopClient, null, null); } public List getItems() { diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStoreTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStoreTests.java index 9d6d9964f2d8f..948c017d9d0f7 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStoreTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStoreTests.java @@ -68,7 +68,7 @@ public void setup() { threadPool = new TestThreadPool(this.getClass().getName()); client = new VerifyingClient(threadPool); clusterService = ClusterServiceUtils.createClusterService(threadPool); - historyStore = new ILMHistoryStore(Settings.EMPTY, client, clusterService); + historyStore = new ILMHistoryStore(Settings.EMPTY, client, clusterService, threadPool); } @After @@ -81,7 +81,7 @@ public void setdown() { public void testNoActionIfDisabled() throws Exception { Settings settings = Settings.builder().put(LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.getKey(), false).build(); - try (ILMHistoryStore disabledHistoryStore = new ILMHistoryStore(settings, client, null)) { + try (ILMHistoryStore disabledHistoryStore = new ILMHistoryStore(settings, client, null, threadPool)) { String policyId = randomAlphaOfLength(5); final long timestamp = randomNonNegativeLong(); ILMHistoryItem record = ILMHistoryItem.success("index", policyId, timestamp, null, null); diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SLMSnapshotBlockingIntegTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SLMSnapshotBlockingIntegTests.java index 
453aa2555c134..34d208337f1ff 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SLMSnapshotBlockingIntegTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SLMSnapshotBlockingIntegTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.ilm.LifecycleSettings; import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicy; import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicyItem; import org.elasticsearch.xpack.core.slm.SnapshotRetentionConfiguration; @@ -89,6 +90,7 @@ protected Settings nodeSettings(int nodeOrdinal) { settings.put(XPackSettings.MONITORING_ENABLED.getKey(), false); settings.put(XPackSettings.GRAPH_ENABLED.getKey(), false); settings.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false); + settings.put(LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED, false); return settings.build(); }