Fix enrich coordinator to reject documents instead of deadlocking #56247
Pinging @elastic/es-core-features (:Core/Features/Ingest)
LGTM, I left some minor comments.
...enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java
boolean accepted = queue.offer(new Slot(searchRequest, listener));
int queueSize = queue.size();

// coordinate lookups no matter what, even if queues were full
Can you describe why it is important to coordinate lookups even when the queue is full?
I left a short comment on the code but wanted to mirror some thoughts here:
One issue with the current code is that once the queue is full, only a search thread can drain it. The search thread does so only after it finishes processing the results of the multi-search, and while processing them it may reach this part of the code again. If the queue is full at that point and the code does not coordinate lookups on the queued data no matter what, the search thread will eventually fail every record it is processing with a 429 error, because those records cannot enter the queue for the next enrich processor in the pipeline. That effectively halts ingestion until the queue can accept writes again. Meanwhile, the bulk threads are also rejecting documents until a search thread drains the queue a bit. If the queue fills up again while a search is running, then when that search returns it will also reject every document it is processing.
Now that I'm thinking about this more, scheduling lookups no matter what may solve the rejection problem at this layer, but it puts more strain on the search thread pool. I still think it is better though to rely on the thread pool task queues to regulate back pressure rather than this coordination queue, which to me seems more like a mechanism to facilitate combining multiple requests together.
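Since the coordination queue is meant mainly to combine lookups into fewer requests, a minimal sketch of that batching may help. It assumes a hypothetical `LookupBatcher` class, a hypothetical `maxLookupsPerRequest` batch limit, and plain `String` values standing in for search requests; it is not the actual `EnrichCoordinatorProxyAction` code. It drains the queue with `BlockingQueue.drainTo(collection, maxElements)` and records each batch instead of sending a real multi-search:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of a batching coordinator, not the Elasticsearch implementation.
class LookupBatcher {
    private final BlockingQueue<String> queue;
    private final int maxLookupsPerRequest;
    // Each entry is one combined "multi-search", kept so the batching is observable.
    final List<List<String>> dispatchedBatches = new ArrayList<>();

    LookupBatcher(int queueCapacity, int maxLookupsPerRequest) {
        this.queue = new ArrayBlockingQueue<>(queueCapacity);
        this.maxLookupsPerRequest = maxLookupsPerRequest;
    }

    // Non-blocking enqueue: returns false immediately when the queue is full.
    boolean offer(String lookup) {
        return queue.offer(lookup);
    }

    // Drains the queue in FIFO order into batches of at most
    // maxLookupsPerRequest lookups and dispatches one request per batch.
    void coordinateLookups() {
        while (true) {
            List<String> batch = new ArrayList<>(maxLookupsPerRequest);
            queue.drainTo(batch, maxLookupsPerRequest);
            if (batch.isEmpty()) {
                return;
            }
            dispatch(batch);
        }
    }

    // Stand-in for sending a multi-search request; it only records the batch.
    private void dispatch(List<String> batch) {
        dispatchedBatches.add(batch);
    }
}
```

With a capacity of 10 and `maxLookupsPerRequest` of 2, offering five lookups and then calling `coordinateLookups()` yields three batches of sizes 2, 2 and 1. Calling `coordinateLookups()` after every `offer(...)`, whether or not it succeeded, corresponds to the "coordinate lookups no matter what" approach discussed above: the thread that finds the queue full still helps drain it.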
Thanks for sharing your thoughts here.
> I still think it is better though to rely on the thread pool task queues to regulate back pressure rather than this coordination queue, which to me seems more like a mechanism to facilitate combining multiple requests together.
Yes, this is the purpose of the coordination queue.
// Use offer(...) instead of put(...). We are on a write thread and blocking here can be dangerous,
// especially since the logic to kick off draining the queue is located right after this section. If we
// cannot insert a request to the queue, we should reject the document with a 429 error code.
boolean accepted = queue.offer(new Slot(searchRequest, listener));
👍
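The `offer(...)`-based enqueue from the snippet above can be modelled roughly as follows. This is a simplified sketch, not the enrich code: the `Listener` interface and the `RejectedLookupException` (carrying a 429 status) are hypothetical stand-ins for the `listener` stored in each `Slot` and for whatever rejection exception the coordinator actually raises.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical callback type, standing in for the listener stored in each Slot.
interface Listener {
    void onResponse(String response);
    void onFailure(Exception e);
}

// Hypothetical exception modelling a 429 "too many requests" rejection.
class RejectedLookupException extends RuntimeException {
    final int status = 429;

    RejectedLookupException(String message) {
        super(message);
    }
}

class NonBlockingEnqueue {
    // Pairs a request with the listener to notify once it is answered or rejected.
    static final class Slot {
        final String request;
        final Listener listener;

        Slot(String request, Listener listener) {
            this.request = request;
            this.listener = listener;
        }
    }

    final BlockingQueue<Slot> queue;

    NonBlockingEnqueue(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Called on a write thread and never blocks: if the queue is full, the
    // document is failed right away with a 429-style rejection.
    void schedule(String request, Listener listener) {
        boolean accepted = queue.offer(new Slot(request, listener));
        if (accepted == false) {
            listener.onFailure(new RejectedLookupException(
                "coordinator queue is full [" + queue.size() + "]"));
        }
        // A real coordinator would now try to drain the queue (omitted here).
    }
}
```

The design point is that `schedule(...)` returns promptly in both branches, so a write thread is never parked waiting for space that only another thread could free.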
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichResiliencyTests.java
...enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java
@elasticmachine run elasticsearch-ci/bwc
@elasticmachine run elasticsearch-ci/default-distro
…astic#56247) This PR removes the blocking call that inserts ingest documents into a queue in the coordinator and replaces it with an offer call; if the queue is full, the document is rejected with a rejection exception. This prevents deadlocks of the write threads when the queue fills to capacity and there is more than one enrich processor in a pipeline.
…6247) (#57179) This PR removes the blocking call that inserts ingest documents into a queue in the coordinator and replaces it with an offer call; if the queue is full, the document is rejected with a rejection exception. This prevents deadlocks of the write threads when the queue fills to capacity and there is more than one enrich processor in a pipeline.
…6247) (#57188) This PR removes the blocking call that inserts ingest documents into a queue in the coordinator and replaces it with an offer call; if the queue is full, the document is rejected with a rejection exception. This prevents deadlocks of the write threads when the queue fills to capacity and there is more than one enrich processor in a pipeline.
…6247) (#57189) This PR removes the blocking call that inserts ingest documents into a queue in the coordinator and replaces it with an offer call; if the queue is full, the document is rejected with a rejection exception. This prevents deadlocks of the write threads when the queue fills to capacity and there is more than one enrich processor in a pipeline.
This PR removes the blocking call that inserts ingest documents into a queue in the coordinator and replaces it with an offer call. If the queue is full, the offer fails and the document is rejected with a rejection exception (a 429 error) instead of blocking the write thread. This prevents deadlocks of the write threads when the queue fills to capacity and there is more than one enrich processor in a pipeline.
Relates #55634
This does not fully solve #55634: we still need a way to process the search results off the search threads, and to do so without flooding the write thread pool queue with small tasks. We are weighing options and will fix that problem soon.
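The deadlock described in the PR summary comes from a blocking `put(...)` on a full queue while the threads that could drain it are themselves stuck. The toy model below reduces this to its simplest form and assumes nothing about the real enrich classes (the `PutVersusOffer` class is hypothetical): the thread doing the enqueue is the only one that would ever drain the queue. Because a real `put(...)` would hang forever, it uses a timed `offer(e, timeout, unit)` as a stand-in so the example terminates.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class PutVersusOffer {
    // Returns true if the second enqueue eventually succeeded. The calling
    // thread is the only consumer of the queue, so nothing can free a slot
    // while it waits: a plain put(...) here would block forever.
    static boolean blockingStyleEnqueue(long waitMillis) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("lookup for processor 1"); // fills the only slot
        try {
            // Timed offer models put(...): it waits up to waitMillis, then gives up.
            return queue.offer("lookup for processor 2", waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Non-blocking variant: offer(...) returns false at once, so the caller
    // can reject the document (for example with a 429) and keep the thread free.
    static boolean nonBlockingEnqueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("lookup for processor 1");
        return queue.offer("lookup for processor 2");
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        boolean waited = blockingStyleEnqueue(200);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("blocking-style enqueue succeeded: " + waited + " after ~" + elapsedMs + " ms");
        System.out.println("non-blocking enqueue succeeded: " + nonBlockingEnqueue());
    }
}
```

Running `main` should show the blocking-style enqueue failing only after the full wait, and the non-blocking enqueue failing immediately; in the non-blocking case the caller can reject the document and move on rather than parking the thread.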