diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java
index e285c8fef233e..043a415e4af36 100644
--- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java
+++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java
@@ -19,6 +19,7 @@
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
@@ -65,7 +66,10 @@ public TransportAction(TransportService transportService, ActionFilters actionFi
         protected void doExecute(Task task, SearchRequest request, ActionListener<SearchResponse> listener) {
             // Write tp is expected when executing enrich processor from index / bulk api
             // Management tp is expected when executing enrich processor from ingest simulate api
+            // Search tp is allowed for now - after enriching, the remaining parts of the pipeline are processed on the
+            // search thread, which could end up here again if there is more than one enrich processor in a pipeline.
             assert Thread.currentThread().getName().contains(ThreadPool.Names.WRITE)
+                || Thread.currentThread().getName().contains(ThreadPool.Names.SEARCH)
                 || Thread.currentThread().getName().contains(ThreadPool.Names.MANAGEMENT);
             coordinator.schedule(request, listener);
         }
@@ -76,6 +80,7 @@ public static class Coordinator {
         final BiConsumer<MultiSearchRequest, BiConsumer<MultiSearchResponse, Exception>> lookupFunction;
         final int maxLookupsPerRequest;
         final int maxNumberOfConcurrentRequests;
+        final int queueCapacity;
         final BlockingQueue<Slot> queue;
         final AtomicInteger remoteRequestsCurrent = new AtomicInteger(0);
         volatile long remoteRequestsTotal = 0;
@@ -99,21 +104,30 @@ public Coordinator(Client client, Settings settings) {
             this.lookupFunction = lookupFunction;
             this.maxLookupsPerRequest = maxLookupsPerRequest;
             this.maxNumberOfConcurrentRequests = maxNumberOfConcurrentRequests;
+            this.queueCapacity = queueCapacity;
             this.queue = new ArrayBlockingQueue<>(queueCapacity);
         }
 
         void schedule(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
-            // Use put(...), because if queue is full then this method will wait until a free slot becomes available
-            // The calling thread here is a write thread (write tp is used by ingest) and
-            // this will create natural back pressure from the enrich processor.
-            // If there are no write threads available then write requests with ingestion will fail with 429 error code.
-            try {
-                queue.put(new Slot(searchRequest, listener));
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new RuntimeException("unable to add item to queue", e);
-            }
+            // Use offer(...) instead of put(...). We are on a write thread and blocking here can be dangerous,
+            // especially since the logic to kick off draining the queue is located right after this section. If we
+            // cannot insert a request into the queue, we should reject the document with a 429 error code.
+            boolean accepted = queue.offer(new Slot(searchRequest, listener));
+            int queueSize = queue.size();
+
+            // Coordinate lookups no matter what, even if the queue was full. Search threads should be draining the queue,
+            // but they may be busy with processing the remaining work for enrich results. If there is more than one
+            // enrich processor in a pipeline, those search threads may find themselves here again before they can
+            // coordinate the next set of lookups.
             coordinateLookups();
+
+            if (accepted == false) {
+                listener.onFailure(
+                    new EsRejectedExecutionException(
+                        "Could not perform enrichment, enrich coordination queue at capacity [" + queueSize + "/" + queueCapacity + "]"
+                    )
+                );
+            }
         }
 
         CoordinatorStats getStats(String nodeId) {
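For callers, the practical effect of this change is that a saturated enrich coordinator now surfaces as per-item 429 failures on the bulk response instead of a blocked write thread. As a rough sketch of how a client might react (not part of this change; `rejectedItemPositions` is a hypothetical helper, built only from `BulkItemResponse` accessors):

```java
import java.util.ArrayList;
import java.util.List;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkResponse;

public final class EnrichRejectionHandling {

    // Collect the positions of bulk items that failed with 429 (TOO_MANY_REQUESTS),
    // the status the coordinator now returns when its queue is at capacity, so the
    // caller can re-send just those documents after a backoff instead of the whole bulk.
    public static List<Integer> rejectedItemPositions(BulkResponse response) {
        List<Integer> rejected = new ArrayList<>();
        for (BulkItemResponse item : response.getItems()) {
            if (item.isFailed() && item.getFailure().getStatus().getStatus() == 429) {
                rejected.add(item.getItemId());
            }
        }
        return rejected;
    }
}
```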
diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichResiliencyTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichResiliencyTests.java
new file mode 100644
index 0000000000000..0f4bdd14fffb2
--- /dev/null
+++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichResiliencyTests.java
@@ -0,0 +1,275 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.enrich;
+
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.ingest.PutPipelineAction;
+import org.elasticsearch.action.ingest.PutPipelineRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.common.xcontent.json.JsonXContent;
+import org.elasticsearch.index.reindex.ReindexPlugin;
+import org.elasticsearch.ingest.common.IngestCommonPlugin;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.test.ESSingleNodeTestCase;
+import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
+import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
+import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+public class EnrichResiliencyTests extends ESSingleNodeTestCase {
+
+    @Override
+    protected Collection<Class<? extends Plugin>> getPlugins() {
+        return List.of(ReindexPlugin.class, IngestCommonPlugin.class, LocalStateEnrich.class);
+    }
+
+    @Override
+    protected Settings nodeSettings() {
+        // Severely throttle the processing throughput to reach max capacity more easily
+        return Settings.builder()
+            .put(EnrichPlugin.COORDINATOR_PROXY_MAX_CONCURRENT_REQUESTS.getKey(), 1)
+            .put(EnrichPlugin.COORDINATOR_PROXY_MAX_LOOKUPS_PER_REQUEST.getKey(), 1)
+            .put(EnrichPlugin.COORDINATOR_PROXY_QUEUE_CAPACITY.getKey(), 10)
+            .build();
+    }
+
+    public void testWriteThreadLivenessBackToBack() throws Exception {
+        ensureGreen();
+
+        long testSuffix = System.currentTimeMillis();
"enrich_lookup_" + testSuffix; + String enrichPolicyName = "enrich_policy_" + testSuffix; + String enrichPipelineName = "enrich_pipeline_" + testSuffix; + String enrichedIndexName = "enrich_results_" + testSuffix; + + client().index( + new IndexRequest(enrichIndexName).source( + JsonXContent.contentBuilder().startObject().field("my_key", "key").field("my_value", "data").endObject() + ) + ).actionGet(); + + client().admin().indices().refresh(new RefreshRequest(enrichIndexName)).actionGet(); + + client().execute( + PutEnrichPolicyAction.INSTANCE, + new PutEnrichPolicyAction.Request( + enrichPolicyName, + new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(enrichIndexName), "my_key", List.of("my_value")) + ) + ).actionGet(); + + client().execute( + ExecuteEnrichPolicyAction.INSTANCE, + new ExecuteEnrichPolicyAction.Request(enrichPolicyName).setWaitForCompletion(true) + ).actionGet(); + + XContentBuilder pipe1 = JsonXContent.contentBuilder(); + pipe1.startObject(); + { + pipe1.startArray("processors"); + { + pipe1.startObject(); + { + pipe1.startObject("enrich"); + { + pipe1.field("policy_name", enrichPolicyName); + pipe1.field("field", "custom_id"); + pipe1.field("target_field", "enrich_value_1"); + } + pipe1.endObject(); + } + pipe1.endObject(); + pipe1.startObject(); + { + pipe1.startObject("enrich"); + { + pipe1.field("policy_name", enrichPolicyName); + pipe1.field("field", "custom_id"); + pipe1.field("target_field", "enrich_value_2"); + } + pipe1.endObject(); + } + pipe1.endObject(); + } + pipe1.endArray(); + } + pipe1.endObject(); + + client().execute( + PutPipelineAction.INSTANCE, + new PutPipelineRequest(enrichPipelineName, BytesReference.bytes(pipe1), XContentType.JSON) + ).actionGet(); + + client().admin().indices().create(new CreateIndexRequest(enrichedIndexName)).actionGet(); + + XContentBuilder doc = JsonXContent.contentBuilder().startObject().field("custom_id", "key").endObject(); + + BulkRequest bulk = new BulkRequest(enrichedIndexName); + bulk.timeout(new TimeValue(10, TimeUnit.SECONDS)); + for (int idx = 0; idx < 50; idx++) { + bulk.add(new IndexRequest().source(doc).setPipeline(enrichPipelineName)); + } + + BulkResponse bulkItemResponses = client().bulk(bulk).actionGet(new TimeValue(30, TimeUnit.SECONDS)); + + assertTrue(bulkItemResponses.hasFailures()); + BulkItemResponse.Failure firstFailure = null; + int successfulItems = 0; + for (BulkItemResponse item : bulkItemResponses.getItems()) { + if (item.isFailed() && firstFailure == null) { + firstFailure = item.getFailure(); + } else if (item.isFailed() == false) { + successfulItems++; + } + } + assertNotNull(firstFailure); + assertThat(firstFailure.getStatus().getStatus(), is(equalTo(429))); + assertThat(firstFailure.getMessage(), containsString("Could not perform enrichment, enrich coordination queue at capacity")); + + client().admin().indices().refresh(new RefreshRequest(enrichedIndexName)).actionGet(); + assertEquals(successfulItems, client().search(new SearchRequest(enrichedIndexName)).actionGet().getHits().getTotalHits().value); + } + + public void testWriteThreadLivenessWithPipeline() throws Exception { + ensureGreen(); + + long testSuffix = System.currentTimeMillis(); + String enrichIndexName = "enrich_lookup_" + testSuffix; + String enrichPolicyName = "enrich_policy_" + testSuffix; + String enrichPipelineName = "enrich_pipeline_" + testSuffix; + String enrichedIndexName = "enrich_results_" + testSuffix; + String enrichPipelineName1 = enrichPipelineName + "_1"; + String enrichPipelineName2 = enrichPipelineName + 
"_2"; + + client().index( + new IndexRequest(enrichIndexName).source( + JsonXContent.contentBuilder().startObject().field("my_key", "key").field("my_value", "data").endObject() + ) + ).actionGet(); + + client().admin().indices().refresh(new RefreshRequest(enrichIndexName)).actionGet(); + + client().execute( + PutEnrichPolicyAction.INSTANCE, + new PutEnrichPolicyAction.Request( + enrichPolicyName, + new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(enrichIndexName), "my_key", List.of("my_value")) + ) + ).actionGet(); + + client().execute( + ExecuteEnrichPolicyAction.INSTANCE, + new ExecuteEnrichPolicyAction.Request(enrichPolicyName).setWaitForCompletion(true) + ).actionGet(); + + XContentBuilder pipe1 = JsonXContent.contentBuilder(); + pipe1.startObject(); + { + pipe1.startArray("processors"); + { + pipe1.startObject(); + { + pipe1.startObject("enrich"); + { + pipe1.field("policy_name", enrichPolicyName); + pipe1.field("field", "custom_id"); + pipe1.field("target_field", "enrich_value_1"); + } + pipe1.endObject(); + } + pipe1.endObject(); + pipe1.startObject(); + { + pipe1.startObject("pipeline"); + { + pipe1.field("name", enrichPipelineName2); + } + pipe1.endObject(); + } + pipe1.endObject(); + } + pipe1.endArray(); + } + pipe1.endObject(); + + XContentBuilder pipe2 = JsonXContent.contentBuilder(); + pipe2.startObject(); + { + pipe2.startArray("processors"); + { + pipe2.startObject(); + { + pipe2.startObject("enrich"); + { + pipe2.field("policy_name", enrichPolicyName); + pipe2.field("field", "custom_id"); + pipe2.field("target_field", "enrich_value_2"); + } + pipe2.endObject(); + } + pipe2.endObject(); + } + pipe2.endArray(); + } + pipe2.endObject(); + + client().execute( + PutPipelineAction.INSTANCE, + new PutPipelineRequest(enrichPipelineName1, BytesReference.bytes(pipe1), XContentType.JSON) + ).actionGet(); + + client().execute( + PutPipelineAction.INSTANCE, + new PutPipelineRequest(enrichPipelineName2, BytesReference.bytes(pipe2), XContentType.JSON) + ).actionGet(); + + client().admin().indices().create(new CreateIndexRequest(enrichedIndexName)).actionGet(); + + XContentBuilder doc = JsonXContent.contentBuilder().startObject().field("custom_id", "key").endObject(); + + BulkRequest bulk = new BulkRequest(enrichedIndexName); + bulk.timeout(new TimeValue(10, TimeUnit.SECONDS)); + for (int idx = 0; idx < 50; idx++) { + bulk.add(new IndexRequest().source(doc).setPipeline(enrichPipelineName1)); + } + + BulkResponse bulkItemResponses = client().bulk(bulk).actionGet(new TimeValue(30, TimeUnit.SECONDS)); + + assertTrue(bulkItemResponses.hasFailures()); + BulkItemResponse.Failure firstFailure = null; + int successfulItems = 0; + for (BulkItemResponse item : bulkItemResponses.getItems()) { + if (item.isFailed() && firstFailure == null) { + firstFailure = item.getFailure(); + } else if (item.isFailed() == false) { + successfulItems++; + } + } + assertNotNull(firstFailure); + assertThat(firstFailure.getStatus().getStatus(), is(equalTo(429))); + assertThat(firstFailure.getMessage(), containsString("Could not perform enrichment, enrich coordination queue at capacity")); + + client().admin().indices().refresh(new RefreshRequest(enrichedIndexName)).actionGet(); + assertEquals(successfulItems, client().search(new SearchRequest(enrichedIndexName)).actionGet().getHits().getTotalHits().value); + } +} diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/CoordinatorTests.java 
diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/CoordinatorTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/CoordinatorTests.java
index 926fdd2c502f5..2086ea07a03b0 100644
--- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/CoordinatorTests.java
+++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/CoordinatorTests.java
@@ -34,11 +34,14 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction.Coordinator;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.sameInstance;
 
 public class CoordinatorTests extends ESTestCase {
@@ -189,31 +192,54 @@ public void testCoordinateLookupsMultiSearchItemError() {
         }
     }
 
-    public void testQueueing() throws Exception {
+    public void testNoBlockingWhenQueueing() throws Exception {
         MockLookupFunction lookupFunction = new MockLookupFunction();
+        // Only one request allowed in flight. Queue size maxed at 1.
         Coordinator coordinator = new Coordinator(lookupFunction, 1, 1, 1);
+
+        // Pre-load the queue to be at capacity and spoof the coordinator state to look like the max number of
+        // requests is in flight.
         coordinator.queue.add(new Coordinator.Slot(new SearchRequest(), ActionListener.wrap(() -> {})));
+        coordinator.remoteRequestsCurrent.incrementAndGet();
 
-        AtomicBoolean completed = new AtomicBoolean(false);
+        // Try to schedule an item into the coordinator; it should emit an exception
         SearchRequest searchRequest = new SearchRequest();
-        Thread t = new Thread(() -> {
-            coordinator.schedule(searchRequest, ActionListener.wrap(() -> {}));
-            completed.set(true);
-        });
-        t.start();
-        assertBusy(() -> {
-            assertThat(t.getState(), equalTo(Thread.State.WAITING));
-            assertThat(completed.get(), is(false));
-        });
-
-        coordinator.coordinateLookups();
-        assertBusy(() -> { assertThat(completed.get(), is(true)); });
-
+        final AtomicReference<Exception> capturedException = new AtomicReference<>();
+        coordinator.schedule(searchRequest, ActionListener.wrap(response -> {}, capturedException::set));
+
+        // Ensure rejection since the queue is full
+        Exception rejectionException = capturedException.get();
+        assertThat(rejectionException.getMessage(), containsString("Could not perform enrichment, enrich coordination queue at capacity"));
+
+        // Ensure that nothing was scheduled because the max number of requests is already in flight
+        assertThat(lookupFunction.capturedConsumers, is(empty()));
+
+        // Try to schedule again now that the max number of in-flight requests is no longer reached. Ensure that
+        // despite the rejection, the queued request is sent.
+        coordinator.remoteRequestsCurrent.decrementAndGet();
+        capturedException.set(null);
+        coordinator.schedule(searchRequest, ActionListener.wrap(response -> {}, capturedException::set));
+        rejectionException = capturedException.get();
+        assertThat(rejectionException.getMessage(), containsString("Could not perform enrichment, enrich coordination queue at capacity"));
+        assertThat(lookupFunction.capturedRequests.size(), is(1));
+        assertThat(lookupFunction.capturedConsumers.size(), is(1));
+
+        // Schedule once more; now the queue should be able to accept the item, but it will not be sent yet
+        capturedException.set(null);
+        coordinator.schedule(searchRequest, ActionListener.wrap(response -> {}, capturedException::set));
+        rejectionException = capturedException.get();
+        assertThat(rejectionException, is(nullValue()));
+        assertThat(coordinator.queue.size(), is(1));
+        assertThat(coordinator.remoteRequestsCurrent.get(), is(1));
+        assertThat(lookupFunction.capturedRequests.size(), is(1));
+        assertThat(lookupFunction.capturedConsumers.size(), is(1));
+
+        // Fulfill the captured consumer, which will schedule the next item in the queue.
         lookupFunction.capturedConsumers.get(0)
             .accept(
                 new MultiSearchResponse(new MultiSearchResponse.Item[] { new MultiSearchResponse.Item(emptySearchResponse(), null) }, 1L),
                 null
             );
+
+        // Ensure the queue was drained and that the item in it was scheduled
         assertThat(coordinator.queue.size(), equalTo(0));
         assertThat(lookupFunction.capturedRequests.size(), equalTo(2));
         assertThat(lookupFunction.capturedRequests.get(1).requests().get(0), sameInstance(searchRequest));
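A note on the test technique: `MockLookupFunction` captures the coordinator's outbound lookups instead of executing them, so the test can complete them one at a time and assert on intermediate state. A minimal sketch of that capture pattern under assumed names (`String` stands in for the real request/response types):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Stand-in for MockLookupFunction: records each request and its completion
// callback rather than performing a real multi-search.
final class CapturingLookup implements BiConsumer<String, BiConsumer<String, Exception>> {
    final List<String> capturedRequests = new ArrayList<>();
    final List<BiConsumer<String, Exception>> capturedConsumers = new ArrayList<>();

    @Override
    public void accept(String request, BiConsumer<String, Exception> consumer) {
        capturedRequests.add(request);   // remember what was asked
        capturedConsumers.add(consumer); // fulfilled later, under test control
    }
}
// In a test: schedule work, assert nothing has completed yet, then call
// capturedConsumers.get(0).accept("response", null) to release the next queued item.
```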