
Commit

Fix enrich coordinator to reject documents instead of deadlocking (elastic#56247)
This PR removes the blocking call that inserted ingest documents into the coordinator's
queue. It replaces it with an offer call that fails the request with a rejection exception
when the queue is full. This prevents the write threads from deadlocking when the queue
fills to capacity and there is more than one enrich processor in a pipeline.
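
For background, the behavioral difference between the two queue operations is the heart of the fix. Below is a minimal, self-contained sketch (plain java.util.concurrent; the class name and values are illustrative, not from this commit) contrasting the blocking put(...) with the non-blocking offer(...):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class OfferVsPutSketch {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.put("first"); // succeeds; the queue is now at capacity

        // queue.put("second") would block this thread until a consumer frees a
        // slot; if draining depends on the very threads doing the putting, none
        // can make progress: a deadlock. offer(...) returns false immediately
        // instead, letting the caller reject the work (a 429 in this PR).
        boolean accepted = queue.offer("second");
        System.out.println("accepted = " + accepted); // prints: accepted = false
    }
}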
jbaiera committed May 26, 2020
1 parent 92e127e commit abb5181
Showing 3 changed files with 354 additions and 26 deletions.
@@ -19,6 +19,7 @@
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@@ -65,7 +66,10 @@ public TransportAction(TransportService transportService, ActionFilters actionFi
protected void doExecute(Task task, SearchRequest request, ActionListener<SearchResponse> listener) {
    // Write tp is expected when executing enrich processor from index / bulk api
    // Management tp is expected when executing enrich processor from ingest simulate api
+   // Search tp is allowed for now - After enriching, the remaining parts of the pipeline are processed on the
+   // search thread, which could end up here again if there is more than one enrich processor in a pipeline.
    assert Thread.currentThread().getName().contains(ThreadPool.Names.WRITE)
+       || Thread.currentThread().getName().contains(ThreadPool.Names.SEARCH)
        || Thread.currentThread().getName().contains(ThreadPool.Names.MANAGEMENT);
    coordinator.schedule(request, listener);
}
@@ -76,6 +80,7 @@ public static class Coordinator {
    final BiConsumer<MultiSearchRequest, BiConsumer<MultiSearchResponse, Exception>> lookupFunction;
    final int maxLookupsPerRequest;
    final int maxNumberOfConcurrentRequests;
+   final int queueCapacity;
    final BlockingQueue<Slot> queue;
    final AtomicInteger remoteRequestsCurrent = new AtomicInteger(0);
    volatile long remoteRequestsTotal = 0;
@@ -99,21 +104,30 @@ public Coordinator(Client client, Settings settings) {
    this.lookupFunction = lookupFunction;
    this.maxLookupsPerRequest = maxLookupsPerRequest;
    this.maxNumberOfConcurrentRequests = maxNumberOfConcurrentRequests;
+   this.queueCapacity = queueCapacity;
    this.queue = new ArrayBlockingQueue<>(queueCapacity);
}

void schedule(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
-   // Use put(...), because if queue is full then this method will wait until a free slot becomes available
-   // The calling thread here is a write thread (write tp is used by ingest) and
-   // this will create natural back pressure from the enrich processor.
-   // If there are no write threads available then write requests with ingestion will fail with 429 error code.
-   try {
-       queue.put(new Slot(searchRequest, listener));
-   } catch (InterruptedException e) {
-       Thread.currentThread().interrupt();
-       throw new RuntimeException("unable to add item to queue", e);
-   }
+   // Use offer(...) instead of put(...). We are on a write thread and blocking here can be dangerous,
+   // especially since the logic to kick off draining the queue is located right after this section. If we
+   // cannot insert a request to the queue, we should reject the document with a 429 error code.
+   boolean accepted = queue.offer(new Slot(searchRequest, listener));
+   int queueSize = queue.size();
+
+   // Coordinate lookups no matter what, even if queues were full. Search threads should be draining the queue,
+   // but they may be busy with processing the remaining work for enrich results. If there is more than one
+   // enrich processor in a pipeline, those search threads may find themselves here again before they can
+   // coordinate the next set of lookups.
+   coordinateLookups();
+
+   if (accepted == false) {
+       listener.onFailure(
+           new EsRejectedExecutionException(
+               "Could not perform enrichment, enrich coordination queue at capacity [" + queueSize + "/" + queueCapacity + "]"
+           )
+       );
+   }
}

CoordinatorStats getStats(String nodeId) {
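
On the consuming side, the rejection is delivered through the listener's failure path rather than thrown. A rough sketch of what a caller of Coordinator.schedule(...) observes when the queue is full (the handler methods are hypothetical, for illustration only, not code from this commit):

coordinator.schedule(searchRequest, ActionListener.wrap(searchResponse -> {
    // Normal case: the enrich lookup completed; the pipeline continues.
    handleEnrichResponse(searchResponse); // hypothetical handler
}, e -> {
    // Queue-full case: e is the EsRejectedExecutionException built above.
    // Its status maps to HTTP 429 (TOO_MANY_REQUESTS), which is how the bulk
    // items in the tests below end up with that response code.
    handleEnrichFailure(e); // hypothetical handler
}));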
@@ -0,0 +1,288 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.enrich;

import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.PutPipelineAction;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.ingest.common.IngestCommonPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

public class EnrichResiliencyTests extends ESSingleNodeTestCase {

    @Override
    protected Collection<Class<? extends Plugin>> getPlugins() {
        return Arrays.asList(ReindexPlugin.class, IngestCommonPlugin.class, LocalStateEnrich.class);
    }

    @Override
    protected Settings nodeSettings() {
        // Severely throttle the processing throughput to reach max capacity more easily
        return Settings.builder()
            .put(EnrichPlugin.COORDINATOR_PROXY_MAX_CONCURRENT_REQUESTS.getKey(), 1)
            .put(EnrichPlugin.COORDINATOR_PROXY_MAX_LOOKUPS_PER_REQUEST.getKey(), 1)
            .put(EnrichPlugin.COORDINATOR_PROXY_QUEUE_CAPACITY.getKey(), 10)
            .build();
    }
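
    // With these settings the coordinator admits at most ~11 lookups at a time:
    // one in-flight request carrying a single lookup, plus 10 queued slots.
    // Anything beyond that is rejected by Coordinator.schedule(...).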

    public void testWriteThreadLivenessBackToBack() throws Exception {
        ensureGreen();

        long testSuffix = System.currentTimeMillis();
        String enrichIndexName = "enrich_lookup_" + testSuffix;
        String enrichPolicyName = "enrich_policy_" + testSuffix;
        String enrichPipelineName = "enrich_pipeline_" + testSuffix;
        String enrichedIndexName = "enrich_results_" + testSuffix;

        client().index(
            new IndexRequest(enrichIndexName).source(
                JsonXContent.contentBuilder().startObject().field("my_key", "key").field("my_value", "data").endObject()
            )
        ).actionGet();

        client().admin().indices().refresh(new RefreshRequest(enrichIndexName)).actionGet();

        client().execute(
            PutEnrichPolicyAction.INSTANCE,
            new PutEnrichPolicyAction.Request(
                enrichPolicyName,
                new EnrichPolicy(
                    EnrichPolicy.MATCH_TYPE,
                    null,
                    Collections.singletonList(enrichIndexName),
                    "my_key",
                    Collections.singletonList("my_value")
                )
            )
        ).actionGet();

        client().execute(
            ExecuteEnrichPolicyAction.INSTANCE,
            new ExecuteEnrichPolicyAction.Request(enrichPolicyName).setWaitForCompletion(true)
        ).actionGet();

        XContentBuilder pipe1 = JsonXContent.contentBuilder();
        pipe1.startObject();
        {
            pipe1.startArray("processors");
            {
                pipe1.startObject();
                {
                    pipe1.startObject("enrich");
                    {
                        pipe1.field("policy_name", enrichPolicyName);
                        pipe1.field("field", "custom_id");
                        pipe1.field("target_field", "enrich_value_1");
                    }
                    pipe1.endObject();
                }
                pipe1.endObject();
                pipe1.startObject();
                {
                    pipe1.startObject("enrich");
                    {
                        pipe1.field("policy_name", enrichPolicyName);
                        pipe1.field("field", "custom_id");
                        pipe1.field("target_field", "enrich_value_2");
                    }
                    pipe1.endObject();
                }
                pipe1.endObject();
            }
            pipe1.endArray();
        }
        pipe1.endObject();

        client().execute(
            PutPipelineAction.INSTANCE,
            new PutPipelineRequest(enrichPipelineName, BytesReference.bytes(pipe1), XContentType.JSON)
        ).actionGet();

        client().admin().indices().create(new CreateIndexRequest(enrichedIndexName)).actionGet();

        XContentBuilder doc = JsonXContent.contentBuilder().startObject().field("custom_id", "key").endObject();

        BulkRequest bulk = new BulkRequest(enrichedIndexName);
        bulk.timeout(new TimeValue(10, TimeUnit.SECONDS));
        for (int idx = 0; idx < 50; idx++) {
            bulk.add(new IndexRequest().source(doc).setPipeline(enrichPipelineName));
        }
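
        // The pipeline above contains two enrich processors, so these 50 documents
        // generate roughly 100 enrich lookups against a queue of capacity 10 that
        // drains one lookup at a time - capacity rejections are all but guaranteed.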

        BulkResponse bulkItemResponses = client().bulk(bulk).actionGet(new TimeValue(30, TimeUnit.SECONDS));

        assertTrue(bulkItemResponses.hasFailures());
        BulkItemResponse.Failure firstFailure = null;
        int successfulItems = 0;
        for (BulkItemResponse item : bulkItemResponses.getItems()) {
            if (item.isFailed() && firstFailure == null) {
                firstFailure = item.getFailure();
            } else if (item.isFailed() == false) {
                successfulItems++;
            }
        }
        assertNotNull(firstFailure);
        assertThat(firstFailure.getStatus().getStatus(), is(equalTo(429)));
        assertThat(firstFailure.getMessage(), containsString("Could not perform enrichment, enrich coordination queue at capacity"));

        client().admin().indices().refresh(new RefreshRequest(enrichedIndexName)).actionGet();
        assertEquals(successfulItems, client().search(new SearchRequest(enrichedIndexName)).actionGet().getHits().getTotalHits().value);
    }

    public void testWriteThreadLivenessWithPipeline() throws Exception {
        ensureGreen();

        long testSuffix = System.currentTimeMillis();
        String enrichIndexName = "enrich_lookup_" + testSuffix;
        String enrichPolicyName = "enrich_policy_" + testSuffix;
        String enrichPipelineName = "enrich_pipeline_" + testSuffix;
        String enrichedIndexName = "enrich_results_" + testSuffix;
        String enrichPipelineName1 = enrichPipelineName + "_1";
        String enrichPipelineName2 = enrichPipelineName + "_2";

        client().index(
            new IndexRequest(enrichIndexName).source(
                JsonXContent.contentBuilder().startObject().field("my_key", "key").field("my_value", "data").endObject()
            )
        ).actionGet();

        client().admin().indices().refresh(new RefreshRequest(enrichIndexName)).actionGet();

        client().execute(
            PutEnrichPolicyAction.INSTANCE,
            new PutEnrichPolicyAction.Request(
                enrichPolicyName,
                new EnrichPolicy(
                    EnrichPolicy.MATCH_TYPE,
                    null,
                    Collections.singletonList(enrichIndexName),
                    "my_key",
                    Collections.singletonList("my_value")
                )
            )
        ).actionGet();

        client().execute(
            ExecuteEnrichPolicyAction.INSTANCE,
            new ExecuteEnrichPolicyAction.Request(enrichPolicyName).setWaitForCompletion(true)
        ).actionGet();

        XContentBuilder pipe1 = JsonXContent.contentBuilder();
        pipe1.startObject();
        {
            pipe1.startArray("processors");
            {
                pipe1.startObject();
                {
                    pipe1.startObject("enrich");
                    {
                        pipe1.field("policy_name", enrichPolicyName);
                        pipe1.field("field", "custom_id");
                        pipe1.field("target_field", "enrich_value_1");
                    }
                    pipe1.endObject();
                }
                pipe1.endObject();
                pipe1.startObject();
                {
                    pipe1.startObject("pipeline");
                    {
                        pipe1.field("name", enrichPipelineName2);
                    }
                    pipe1.endObject();
                }
                pipe1.endObject();
            }
            pipe1.endArray();
        }
        pipe1.endObject();

        XContentBuilder pipe2 = JsonXContent.contentBuilder();
        pipe2.startObject();
        {
            pipe2.startArray("processors");
            {
                pipe2.startObject();
                {
                    pipe2.startObject("enrich");
                    {
                        pipe2.field("policy_name", enrichPolicyName);
                        pipe2.field("field", "custom_id");
                        pipe2.field("target_field", "enrich_value_2");
                    }
                    pipe2.endObject();
                }
                pipe2.endObject();
            }
            pipe2.endArray();
        }
        pipe2.endObject();

        client().execute(
            PutPipelineAction.INSTANCE,
            new PutPipelineRequest(enrichPipelineName1, BytesReference.bytes(pipe1), XContentType.JSON)
        ).actionGet();

        client().execute(
            PutPipelineAction.INSTANCE,
            new PutPipelineRequest(enrichPipelineName2, BytesReference.bytes(pipe2), XContentType.JSON)
        ).actionGet();

        client().admin().indices().create(new CreateIndexRequest(enrichedIndexName)).actionGet();

        XContentBuilder doc = JsonXContent.contentBuilder().startObject().field("custom_id", "key").endObject();

        BulkRequest bulk = new BulkRequest(enrichedIndexName);
        bulk.timeout(new TimeValue(10, TimeUnit.SECONDS));
        for (int idx = 0; idx < 50; idx++) {
            bulk.add(new IndexRequest().source(doc).setPipeline(enrichPipelineName1));
        }

        BulkResponse bulkItemResponses = client().bulk(bulk).actionGet(new TimeValue(30, TimeUnit.SECONDS));

        assertTrue(bulkItemResponses.hasFailures());
        BulkItemResponse.Failure firstFailure = null;
        int successfulItems = 0;
        for (BulkItemResponse item : bulkItemResponses.getItems()) {
            if (item.isFailed() && firstFailure == null) {
                firstFailure = item.getFailure();
            } else if (item.isFailed() == false) {
                successfulItems++;
            }
        }
        assertNotNull(firstFailure);
        assertThat(firstFailure.getStatus().getStatus(), is(equalTo(429)));
        assertThat(firstFailure.getMessage(), containsString("Could not perform enrichment, enrich coordination queue at capacity"));

        client().admin().indices().refresh(new RefreshRequest(enrichedIndexName)).actionGet();
        assertEquals(successfulItems, client().search(new SearchRequest(enrichedIndexName)).actionGet().getHits().getTotalHits().value);
    }
}