Fix enrich coordinator to reject documents instead of deadlocking #56247

Merged 6 commits on May 26, 2020
Changes from 1 commit
@@ -19,6 +19,7 @@
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@@ -66,6 +67,7 @@ protected void doExecute(Task task, SearchRequest request, ActionListener<SearchResponse> listener) {
// Write tp is expected when executing enrich processor from index / bulk api
// Management tp is expected when executing enrich processor from ingest simulate api
assert Thread.currentThread().getName().contains(ThreadPool.Names.WRITE)
|| Thread.currentThread().getName().contains(ThreadPool.Names.SEARCH)
|| Thread.currentThread().getName().contains(ThreadPool.Names.MANAGEMENT);
coordinator.schedule(request, listener);
}
@@ -76,6 +78,7 @@ public static class Coordinator {
final BiConsumer<MultiSearchRequest, BiConsumer<MultiSearchResponse, Exception>> lookupFunction;
final int maxLookupsPerRequest;
final int maxNumberOfConcurrentRequests;
final int queueCapacity;
final BlockingQueue<Slot> queue;
final AtomicInteger remoteRequestsCurrent = new AtomicInteger(0);
volatile long remoteRequestsTotal = 0;
@@ -99,21 +102,24 @@ public Coordinator(Client client, Settings settings) {
this.lookupFunction = lookupFunction;
this.maxLookupsPerRequest = maxLookupsPerRequest;
this.maxNumberOfConcurrentRequests = maxNumberOfConcurrentRequests;
this.queueCapacity = queueCapacity;
this.queue = new ArrayBlockingQueue<>(queueCapacity);
}

void schedule(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
// Use put(...), because if queue is full then this method will wait until a free slot becomes available
// The calling thread here is a write thread (write tp is used by ingest) and
// this will create natural back pressure from the enrich processor.
// If there are no write threads available then write requests with ingestion will fail with 429 error code.
try {
queue.put(new Slot(searchRequest, listener));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("unable to add item to queue", e);
}
// Use offer(...) instead of put(...). We are on a write thread and blocking here can be dangerous,
// especially since the logic to kick off draining the queue is located right after this section. If we
// cannot insert a request to the queue, we should reject the document with a 429 error code.
boolean accepted = queue.offer(new Slot(searchRequest, listener));
Member: 👍
int queueSize = queue.size();

// coordinate lookups no matter what, even if queues were full
Member: Can you describe why it is important to coordinate lookups even when the queue is full?

Member Author: I left a short comment on the code but wanted to mirror some thoughts here:

One of the issues with the current code is that once the queue is full, only a search thread can drain it. The search thread does so only after it finishes processing the results of the multi-search, during which it may end up in this part of the code again. If the queue is full here, and the code does not coordinate lookups on the queued data no matter what, then the search thread will eventually fail all the records it is processing with 429 errors, because they cannot enter the queue for the next enrich processor in the pipeline, essentially halting ingestion until the queue can accept writes again. All the while, the bulk threads are also rejecting documents until a search thread can drain the queue a bit. If the queue fills up again while a search is running, then when that search comes back it too will reject all the documents it is processing at the time.

Now that I'm thinking about this more, scheduling lookups no matter what may solve the rejection problem at this layer, but it puts more strain on the search thread pool. I still think it is better though to rely on the thread pool task queues to regulate back pressure rather than this coordination queue, which to me seems more like a mechanism to facilitate combining multiple requests together.

Member: Thanks for sharing your thoughts here.

> I still think it is better though to rely on the thread pool task queues to regulate back pressure rather than this coordination queue, which to me seems more like a mechanism to facilitate combining multiple requests together.

Yes, this is the purpose of the coordination queue.
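
To make that concrete, here is a minimal sketch of what such a drain-and-batch step could look like inside the Coordinator. This is illustrative only: the actual coordinateLookups() body is not part of this diff, and the method shape and the slot.searchRequest accessor are assumptions; only the fields (queue, maxLookupsPerRequest, maxNumberOfConcurrentRequests, remoteRequestsCurrent, lookupFunction) and the Slot type come from the class shown above. Imports for List, ArrayList, MultiSearchRequest, and MultiSearchResponse are assumed.

// Illustrative sketch only; the real implementation is not shown in this diff.
// Drain queued slots in batches of at most maxLookupsPerRequest and issue one
// multi-search per batch, keeping at most maxNumberOfConcurrentRequests in flight.
void coordinateLookupsSketch() {
    while (remoteRequestsCurrent.get() < maxNumberOfConcurrentRequests) {
        // note: the real code must make this check-and-increment safe under concurrency; elided here
        final List<Slot> batch = new ArrayList<>();
        queue.drainTo(batch, maxLookupsPerRequest); // take up to one batch off the queue
        if (batch.isEmpty()) {
            return; // nothing left to coordinate
        }
        remoteRequestsCurrent.incrementAndGet();
        MultiSearchRequest multiSearch = new MultiSearchRequest();
        batch.forEach(slot -> multiSearch.add(slot.searchRequest)); // slot.searchRequest is an assumed accessor
        lookupFunction.accept(multiSearch, (responses, e) -> {
            remoteRequestsCurrent.decrementAndGet();
            // fan the item responses (or the failure) back out to each slot's listener ...
        });
    }
}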

coordinateLookups();

if (!accepted) {
listener.onFailure(new EsRejectedExecutionException("Could not perform enrichment, " +
"enrich coordination queue at capacity [" + queueSize + "/" + queueCapacity + "]"));
}
}

CoordinatorStats getStats(String nodeId) {
New file: EnrichResiliencyTests.java
@@ -0,0 +1,249 @@
package org.elasticsearch.xpack.enrich;

import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.PutPipelineAction;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.ingest.common.IngestCommonPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.*;

public class EnrichResiliencyTests extends ESSingleNodeTestCase {

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return List.of(ReindexPlugin.class, IngestCommonPlugin.class, LocalStateEnrich.class);
}

@Override
protected Settings nodeSettings() {
// Severely throttle the processing throughput to reach max capacity easier
return Settings.builder()
.put(XPackSettings.ENRICH_ENABLED_SETTING.getKey(), true)
.put(EnrichPlugin.COORDINATOR_PROXY_MAX_CONCURRENT_REQUESTS.getKey(), 1)
.put(EnrichPlugin.COORDINATOR_PROXY_MAX_LOOKUPS_PER_REQUEST.getKey(), 1)
.put(EnrichPlugin.COORDINATOR_PROXY_QUEUE_CAPACITY.getKey(), 10)
.build();
}

public void testWriteThreadLivenessBackToBack() throws Exception {
ensureGreen();

long testSuffix = System.currentTimeMillis();
String enrichIndexName = "enrich_lookup_" + testSuffix;
String enrichPolicyName = "enrich_policy_" + testSuffix;
String enrichPipelineName = "enrich_pipeline_" + testSuffix;
String enrichedIndexName = "enrich_results_" + testSuffix;

client().index(new IndexRequest(enrichIndexName).source(
JsonXContent.contentBuilder().startObject().field("my_key", "key").field("my_value", "data").endObject()))
.actionGet();

client().admin().indices().refresh(new RefreshRequest(enrichIndexName)).actionGet();

client().execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request(
enrichPolicyName,
new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(enrichIndexName), "my_key", List.of("my_value"))
)).actionGet();

client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(enrichPolicyName)
.setWaitForCompletion(true)).actionGet();

XContentBuilder pipe1 = JsonXContent.contentBuilder();
pipe1.startObject();
{
pipe1.startArray("processors");
{
pipe1.startObject();
{
pipe1.startObject("enrich");
{
pipe1.field("policy_name", enrichPolicyName);
pipe1.field("field", "custom_id");
pipe1.field("target_field", "enrich_value_1");
}
pipe1.endObject();
}
pipe1.endObject();
pipe1.startObject();
{
pipe1.startObject("enrich");
{
pipe1.field("policy_name", enrichPolicyName);
pipe1.field("field", "custom_id");
pipe1.field("target_field", "enrich_value_2");
}
pipe1.endObject();
}
pipe1.endObject();
}
pipe1.endArray();
}
pipe1.endObject();

client().execute(PutPipelineAction.INSTANCE, new PutPipelineRequest(
enrichPipelineName,
BytesReference.bytes(pipe1),
XContentType.JSON
)).actionGet();

client().admin().indices().create(new CreateIndexRequest(enrichedIndexName)).actionGet();

XContentBuilder doc = JsonXContent.contentBuilder().startObject().field("custom_id", "key").endObject();

BulkRequest bulk = new BulkRequest(enrichedIndexName);
bulk.timeout(new TimeValue(10, TimeUnit.SECONDS));
for (int idx = 0; idx < 50; idx++) {
bulk.add(new IndexRequest().source(doc).setPipeline(enrichPipelineName));
}

BulkResponse bulkItemResponses = client().bulk(bulk).actionGet(new TimeValue(30, TimeUnit.SECONDS));

assertTrue(bulkItemResponses.hasFailures());
for (BulkItemResponse item : bulkItemResponses.getItems()) {
if (item.isFailed()) {
assertThat(item.getFailure().getStatus().getStatus(), is(equalTo(429)));
assertThat(item.getFailureMessage(), containsString("Could not perform enrichment, enrich coordination queue at capacity"));
break;
}
}

client().admin().indices().refresh(new RefreshRequest(enrichedIndexName)).actionGet();
logger.info(client().search(new SearchRequest(enrichedIndexName)).actionGet().toString());
}

public void testWriteThreadLivenessWithPipeline() throws Exception {
ensureGreen();

long testSuffix = System.currentTimeMillis();
String enrichIndexName = "enrich_lookup_" + testSuffix;
String enrichPolicyName = "enrich_policy_" + testSuffix;
String enrichPipelineName = "enrich_pipeline_" + testSuffix;
String enrichedIndexName = "enrich_results_" + testSuffix;
String enrichPipelineName1 = enrichPipelineName + "_1";
String enrichPipelineName2 = enrichPipelineName + "_2";

client().index(new IndexRequest(enrichIndexName).source(
JsonXContent.contentBuilder().startObject().field("my_key", "key").field("my_value", "data").endObject()))
.actionGet();

client().admin().indices().refresh(new RefreshRequest(enrichIndexName)).actionGet();

client().execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request(
enrichPolicyName,
new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(enrichIndexName), "my_key", List.of("my_value"))
)).actionGet();

client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(enrichPolicyName)
.setWaitForCompletion(true)).actionGet();

XContentBuilder pipe1 = JsonXContent.contentBuilder();
pipe1.startObject();
{
pipe1.startArray("processors");
{
pipe1.startObject();
{
pipe1.startObject("enrich");
{
pipe1.field("policy_name", enrichPolicyName);
pipe1.field("field", "custom_id");
pipe1.field("target_field", "enrich_value_1");
}
pipe1.endObject();
}
pipe1.endObject();
pipe1.startObject();
{
pipe1.startObject("pipeline");
{
pipe1.field("name", enrichPipelineName2);
}
pipe1.endObject();
}
pipe1.endObject();
}
pipe1.endArray();
}
pipe1.endObject();

XContentBuilder pipe2 = JsonXContent.contentBuilder();
pipe2.startObject();
{
pipe2.startArray("processors");
{
pipe2.startObject();
{
pipe2.startObject("enrich");
{
pipe2.field("policy_name", enrichPolicyName);
pipe2.field("field", "custom_id");
pipe2.field("target_field", "enrich_value_2");
}
pipe2.endObject();
}
pipe2.endObject();
}
pipe2.endArray();
}
pipe2.endObject();

client().execute(PutPipelineAction.INSTANCE, new PutPipelineRequest(
enrichPipelineName1,
BytesReference.bytes(pipe1),
XContentType.JSON
)).actionGet();

client().execute(PutPipelineAction.INSTANCE, new PutPipelineRequest(
enrichPipelineName2,
BytesReference.bytes(pipe2),
XContentType.JSON
)).actionGet();

client().admin().indices().create(new CreateIndexRequest(enrichedIndexName)).actionGet();

XContentBuilder doc = JsonXContent.contentBuilder().startObject().field("custom_id", "key").endObject();

BulkRequest bulk = new BulkRequest(enrichedIndexName);
bulk.timeout(new TimeValue(10, TimeUnit.SECONDS));
for (int idx = 0; idx < 50; idx++) {
bulk.add(new IndexRequest().source(doc).setPipeline(enrichPipelineName1));
}

BulkResponse bulkItemResponses = client().bulk(bulk).actionGet(new TimeValue(30, TimeUnit.SECONDS));

assertTrue(bulkItemResponses.hasFailures());
for (BulkItemResponse item : bulkItemResponses.getItems()) {
if (item.isFailed()) {
assertThat(item.getFailure().getStatus().getStatus(), is(equalTo(429)));
assertThat(item.getFailureMessage(), containsString("Could not perform enrichment, enrich coordination queue at capacity"));
break;
}
}

client().admin().indices().refresh(new RefreshRequest(enrichedIndexName)).actionGet();
logger.info(client().search(new SearchRequest(enrichedIndexName)).actionGet().toString());
}
}