
High volume of ingest traffic can cause Enrich to deadlock #55634

Closed
jbaiera opened this issue Apr 22, 2020 · 8 comments
Labels
>bug :Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP Team:Data Management Meta label for data/management team v7.7.1 v7.8.1 v7.9.0 v8.0.0-alpha1

Comments

@jbaiera
Member

jbaiera commented Apr 22, 2020

Enrich processors all route their search requests through the EnrichCoordinatorProxyAction, which collects enrichment search requests together in order to collapse them and submit them as a single multi-search request. The coordinator maintains an internal queue of search requests for this purpose. Each thread entering the coordinator adds to this queue, then atomically drains its contents into a multi-search request that is executed asynchronously on a search thread. A maximum number of in-flight multi-search requests is allowed (default 8). If that limit is reached, the coordinator simply queues up search requests until a new multi-search request can be executed. When the enrich coordinator queue reaches maximum capacity (1024 requests by default) it blocks the write thread, under the assumption that a search request will eventually complete and begin draining the queue. This is meant to apply back pressure to the rest of the ingestion framework.

The discovered bug pertains to what happens when the search thread completes the enrich lookup. When the multi-search completes, the search thread calls the response handler for the search. This handler simply returns to the ingestion framework and begins processing the next set of processors in the pipeline, potentially even the next document in the bulk request. Since the pipeline contains an enrich processor, the search thread will attempt to add a search request to the coordinator queue when it reaches it, just as a write thread would. If the queue is full, the search thread is captured waiting for it to drain, just as the write threads are. At that point no thread can pass this critical section to drain the queue and schedule the next search, so a deadlock arises, consuming the write threads and a portion of the search threads on a node.

Normally, even though search threads are erroneously captured to perform ingestion work, they are eventually released back to the search pool once the bulk request they are stuck in completes processing. This may be why the bug flew under the radar: it only manifests when high load is placed on the enrich system for an extended period of time. If the write threads create search requests in the coordination queue faster than the search threads can keep up with, the system degrades until it exceeds the queue capacity and locks in place.
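
A minimal, self-contained sketch of the pattern described above (illustrative only; not Elasticsearch code, and the class name, queue capacity, and pool size are made up). A bounded queue provides back pressure via a blocking put(), but the response handler re-enters that same put() on the only threads able to drain the queue, so under sustained load every thread ends up parked in put():

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical simulation of the deadlock; NOT the actual Coordinator code.
public class CoordinatorDeadlockSketch {
    // Bounded queue of pending enrich lookups; a full queue blocks producers (back pressure).
    static final BlockingQueue<Runnable> pending = new ArrayBlockingQueue<>(4);
    // Stand-in for the search pool that runs the collapsed multi-search and its response handler.
    static final ExecutorService searchPool = Executors.newFixedThreadPool(1);

    // Analogue of Coordinator.schedule(): enqueue the lookup, then kick off a "multi-search".
    static void schedule(Runnable responseHandler) throws InterruptedException {
        pending.put(responseHandler);              // blocks when the queue is at capacity
        searchPool.execute(() -> {
            Runnable handler = pending.poll();     // drain one request ("multi-search" completes)
            if (handler != null) {
                handler.run();                     // response handler runs on the search thread
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        // The "write" thread submits documents whose pipeline contains two enrich processors:
        // the response handler of the first lookup immediately schedules the second one.
        // Under load the queue stays full, so eventually the search thread blocks inside put()
        // and can no longer drain the queue, the write thread blocks in put() as well, and the
        // program hangs.
        for (int doc = 0; doc < 1_000; doc++) {
            schedule(() -> {
                try {
                    schedule(() -> { /* second enrich lookup's handler */ });
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
    }
}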

@jbaiera jbaiera added >bug :Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP labels Apr 22, 2020
@elasticmachine
Collaborator

Pinging @elastic/es-core-features (:Core/Features/Ingest)

@jakelandis
Contributor

jakelandis commented Apr 23, 2020

Nice find @jbaiera !

I think this deadlock can only happen if a pipeline processor is defined after an enrich processor and the subsequent pipeline also has an enrich processor, and the system is under heavy load.

I believe the root cause of the deadlock is as Jimmy describes: the search thread ends up processing (parts of) the ingest pipeline(s) and gets hung up on the same queue.put call that the write threads are stuck on. The search threads need to complete in order for the write threads to complete, but they cannot, because they are blocked on the same resource as the write threads.

I did some debugging with the following additional logging: https://gist.github.com/jakelandis/61d18359baa325c6c12b40b8d015e798 (log in comments of gist)

Using the following repro case, I was able to see a search thread processing the ingest document. I believe the fix here is to ensure that the CompoundProcessor does not continue pipeline execution on the search thread (more on this below).

POST _bulk
{ "index" : { "_index" : "mysource"} }
{ "my_number" : 1, "my_value" : "a" }

PUT _enrich/policy/myenrich_policy
{
   "match": {
        "indices": "mysource",
        "match_field": "my_number",
        "enrich_fields": ["my_value"]
    }
}

POST _enrich/policy/myenrich_policy/_execute

PUT /_ingest/pipeline/myset_pipeline
{
  "processors": [
    {
      "set": {
        "field": "my_set_field",
        "value": "foobar"
      }
    }
  ]
}

PUT /_ingest/pipeline/myenrich_pipeline
{
  "processors": [
    {
      "enrich": {
        "policy_name": "myenrich_policy",
        "field": "custom_id",
        "target_field": "enrich_value"
      }
    },
    {
      "pipeline": {
        "name": "myset_pipeline"
      }
    }
  ]
}


POST myindex/_doc/1?pipeline=myenrich_pipeline
{
  "custom_id" : 1
}

The relevant (custom) logs are:

[2020-04-22T21:44:53,702][WARN ][*********************    ] [runTask-0] [elasticsearch[runTask-0][write][T#6]] Draining queue
[2020-04-22T21:44:53,717][WARN ][*********************    ] [runTask-0] [elasticsearch[runTask-0][search][T#16]] Calling slot.actionListener
[2020-04-22T21:44:53,718][WARN ][*********************    ] [runTask-0] [elasticsearch[runTask-0][search][T#16]] innerExecute -> result != null
[2020-04-22T21:44:53,718][WARN ][*********************    ] [runTask-0] [elasticsearch[runTask-0][search][T#16]] Processing ingest document
[2020-04-22T21:44:53,719][WARN ][*********************    ] [runTask-0] [elasticsearch[runTask-0][search][T#16]]java.base/java.lang.Thread.getStackTrace(Thread.java:1598)
org.elasticsearch.ingest.Pipeline.execute(Pipeline.java:106)
org.elasticsearch.ingest.IngestDocument.executePipeline(IngestDocument.java:651)
org.elasticsearch.ingest.PipelineProcessor.execute(PipelineProcessor.java:45)
org.elasticsearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:141)
org.elasticsearch.ingest.CompoundProcessor.lambda$innerExecute$1(CompoundProcessor.java:161)
org.elasticsearch.xpack.enrich.AbstractEnrichProcessor.lambda$execute$0(AbstractEnrichProcessor.java:130)
org.elasticsearch.xpack.enrich.AbstractEnrichProcessor.lambda$createSearchRunner$1(AbstractEnrichProcessor.java:182)
org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:63)
org.elasticsearch.client.node.NodeClient.lambda$executeLocally$0(NodeClient.java:91)
org.elasticsearch.tasks.TaskManager$1.onResponse(TaskManager.java:158)
org.elasticsearch.tasks.TaskManager$1.onResponse(TaskManager.java:151)
org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction$Coordinator.handleResponse(EnrichCoordinatorProxyAction.java:161)
org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction$Coordinator.lambda$coordinateLookups$1(EnrichCoordinatorProxyAction.java:143)
org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction$Coordinator.lambda$lookupFunction$4(EnrichCoordinatorProxyAction.java:206)
org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:63)
org.elasticsearch.client.node.NodeClient.lambda$executeLocally$0(NodeClient.java:91)
org.elasticsearch.tasks.TaskManager$1.onResponse(TaskManager.java:158)
org.elasticsearch.tasks.TaskManager$1.onResponse(TaskManager.java:151)
org.elasticsearch.action.support.single.shard.TransportSingleShardAction$AsyncSingleAction$2.handleResponse(TransportSingleShardAction.java:261)
org.elasticsearch.action.support.single.shard.TransportSingleShardAction$AsyncSingleAction$2.handleResponse(TransportSingleShardAction.java:247)
org.elasticsearch.transport.TransportService$ContextRestoreResponseHandler.handleResponse(TransportService.java:1101)
org.elasticsearch.transport.TransportService$DirectResponseChannel.processResponse(TransportService.java:1179)
org.elasticsearch.transport.TransportService$DirectResponseChannel.sendResponse(TransportService.java:1159)
org.elasticsearch.transport.TaskTransportChannel.sendResponse(TaskTransportChannel.java:54)
org.elasticsearch.action.support.ChannelActionListener.onResponse(ChannelActionListener.java:47)
org.elasticsearch.action.support.ChannelActionListener.onResponse(ChannelActionListener.java:30)
org.elasticsearch.action.ActionRunnable.lambda$supply$0(ActionRunnable.java:58)
org.elasticsearch.action.ActionRunnable$2.doRun(ActionRunnable.java:73)
org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
org.elasticsearch.common.util.concurrent.TimedRunnable.doRun(TimedRunnable.java:44)
org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:691)
org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
java.base/java.lang.Thread.run(Thread.java:832)

Basically, the pipeline execution via CompoundProcessor.innerExecute is happening on the search thread. It needs to fork back to the write thread pool (or some other thread pool) to prevent the deadlock.
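
A minimal sketch of that forking idea (illustrative only; not the actual Elasticsearch fix, and the class name and choice of executor are assumptions): wrap the enrich lookup's response listener so that its continuation is handed off to another thread pool instead of running on the search thread that delivered the response.

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.threadpool.ThreadPool;

// Hypothetical wrapper: forks the listener callbacks onto a different executor.
final class ForkingListener implements ActionListener<SearchResponse> {
    private final ThreadPool threadPool;
    private final String executorName;                      // e.g. ThreadPool.Names.WRITE or GENERIC
    private final ActionListener<SearchResponse> delegate;  // continues the ingest pipeline

    ForkingListener(ThreadPool threadPool, String executorName, ActionListener<SearchResponse> delegate) {
        this.threadPool = threadPool;
        this.executorName = executorName;
        this.delegate = delegate;
    }

    @Override
    public void onResponse(SearchResponse response) {
        // The rest of the pipeline (including any further enrich processors) runs on the
        // chosen pool, so a full coordinator queue can no longer block a search thread here.
        threadPool.executor(executorName).execute(() -> delegate.onResponse(response));
    }

    @Override
    public void onFailure(Exception e) {
        threadPool.executor(executorName).execute(() -> delegate.onFailure(e));
    }
}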

@jakelandis
Contributor

And the following assertion trips when running from source with assertions enabled (code in EnrichCoordinatorProxyAction):

assert Thread.currentThread().getName().contains(ThreadPool.Names.WRITE)
                || Thread.currentThread().getName().contains(ThreadPool.Names.MANAGEMENT);

With the following repro:

POST _bulk
{ "index" : { "_index" : "mysource"} }
{ "my_number" : 1, "my_value" : "a" }
PUT _enrich/policy/myenrich_policy
{
   "match": {
        "indices": "mysource",
        "match_field": "my_number",
        "enrich_fields": ["my_value"]
    }
}


POST _enrich/policy/myenrich_policy/_execute
PUT /_ingest/pipeline/myenrich_pipeline
{
  "processors": [
    {
      "enrich": {
        "policy_name": "myenrich_policy",
        "field": "custom_id",
        "target_field": "enrich_value"
      }
    },
    {
      "pipeline": {
        "name": "myenrich_pipeline2"
      }
    }
  ]
}

PUT /_ingest/pipeline/myenrich_pipeline2
{
  "processors": [
    {
      "enrich": {
        "policy_name": "myenrich_policy",
        "field": "custom_id",
        "target_field": "enrich_value2"
      }
    }
    ]
}

POST myindex/_doc/1?pipeline=myenrich_pipeline
{
  "custom_id" : 1
}

@martijnvg
Member

Good catch @jbaiera and thanks @jakelandis for the additional explanation and easy reproduction!

@jbaiera
Member Author

jbaiera commented Apr 27, 2020

I was able to get a solid reproduction by disabling the assertions (as a production runtime would) and dramatically throttling the maximum allowed throughput (only 1 concurrent search at a time, queue size set to 10).

Importantly, the pipeline must indeed contain two enrich processors, but they do not need to be separated by a pipeline processor. The deadlock still occurs when running a single pipeline with two enrich processors back to back.
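
For example, a single pipeline along these lines (a variation of the repro above, reusing the same policy and fields; the pipeline name is illustrative) should be enough to trigger the deadlock under sustained load:

PUT /_ingest/pipeline/myenrich_pipeline_backtoback
{
  "processors": [
    {
      "enrich": {
        "policy_name": "myenrich_policy",
        "field": "custom_id",
        "target_field": "enrich_value"
      }
    },
    {
      "enrich": {
        "policy_name": "myenrich_policy",
        "field": "custom_id",
        "target_field": "enrich_value2"
      }
    }
  ]
}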

For reference, here are the stack traces that came out of the pipeline processor scenario:

"elasticsearch[node_s_0][write][T#1]" #32 daemon prio=5 os_prio=31 cpu=101.01ms elapsed=18.47s tid=0x00007f9b9b6c7800 nid=0xad03 waiting on condition  [0x0000700010ef0000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base/Native Method)
	- parking to wait for  <0x00000007ed2cc738> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park(java.base/LockSupport.java:194)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(java.base/AbstractQueuedSynchronizer.java:2081)
	at java.util.concurrent.ArrayBlockingQueue.put(java.base/ArrayBlockingQueue.java:367)
	at org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction$Coordinator.schedule(EnrichCoordinatorProxyAction.java:118)
	at org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction$TransportAction.doExecute(EnrichCoordinatorProxyAction.java:74)
	at org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction$TransportAction.doExecute(EnrichCoordinatorProxyAction.java:58)
	at org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:88)
	at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:64)
	at org.elasticsearch.tasks.TaskManager.registerAndExecute(TaskManager.java:151)
	at org.elasticsearch.client.node.NodeClient.executeLocally(NodeClient.java:90)
	at org.elasticsearch.client.node.NodeClient.doExecute(NodeClient.java:79)
	at org.elasticsearch.client.support.AbstractClient.execute(AbstractClient.java:380)
	at org.elasticsearch.xpack.enrich.AbstractEnrichProcessor.lambda$createSearchRunner$3(AbstractEnrichProcessor.java:179)
	at org.elasticsearch.xpack.enrich.AbstractEnrichProcessor$$Lambda$2924/0x0000000800a89040.accept(Unknown Source)
	at org.elasticsearch.xpack.enrich.AbstractEnrichProcessor.execute(AbstractEnrichProcessor.java:101)
	at org.elasticsearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:138)
	at org.elasticsearch.ingest.CompoundProcessor.execute(CompoundProcessor.java:124)
	at org.elasticsearch.ingest.Pipeline.execute(Pipeline.java:100)
	at org.elasticsearch.ingest.IngestDocument.executePipeline(IngestDocument.java:650)
	at org.elasticsearch.ingest.IngestService.innerExecute(IngestService.java:505)
	at org.elasticsearch.ingest.IngestService.executePipelines(IngestService.java:411)
	at org.elasticsearch.ingest.IngestService$3.doRun(IngestService.java:385)
	at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:691)
	at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base/ThreadPoolExecutor.java:1128)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base/ThreadPoolExecutor.java:628)
	at java.lang.Thread.run(java.base/Thread.java:834)

"elasticsearch[node_s_0][search][T#2]" #35 daemon prio=5 os_prio=31 cpu=14.63ms elapsed=18.03s tid=0x00007f9b9a963000 nid=0x14e03 waiting on condition  [0x00007000111f8000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base/Native Method)
	- parking to wait for  <0x00000007ed2cc738> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park(java.base/LockSupport.java:194)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(java.base/AbstractQueuedSynchronizer.java:2081)
	at java.util.concurrent.ArrayBlockingQueue.put(java.base/ArrayBlockingQueue.java:367)
	at org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction$Coordinator.schedule(EnrichCoordinatorProxyAction.java:118)
	at org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction$TransportAction.doExecute(EnrichCoordinatorProxyAction.java:74)
	at org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction$TransportAction.doExecute(EnrichCoordinatorProxyAction.java:58)
	at org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:88)
	at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:64)
	at org.elasticsearch.tasks.TaskManager.registerAndExecute(TaskManager.java:151)
	at org.elasticsearch.client.node.NodeClient.executeLocally(NodeClient.java:90)
	at org.elasticsearch.client.node.NodeClient.doExecute(NodeClient.java:79)
	at org.elasticsearch.client.support.AbstractClient.execute(AbstractClient.java:380)
	at org.elasticsearch.xpack.enrich.AbstractEnrichProcessor.lambda$createSearchRunner$3(AbstractEnrichProcessor.java:179)
	at org.elasticsearch.xpack.enrich.AbstractEnrichProcessor$$Lambda$2924/0x0000000800a89040.accept(Unknown Source)
	at org.elasticsearch.xpack.enrich.AbstractEnrichProcessor.execute(AbstractEnrichProcessor.java:101)
	at org.elasticsearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:138)
	at org.elasticsearch.ingest.CompoundProcessor.execute(CompoundProcessor.java:124)
	at org.elasticsearch.ingest.Pipeline.execute(Pipeline.java:100)
	at org.elasticsearch.ingest.IngestDocument.executePipeline(IngestDocument.java:650)
	at org.elasticsearch.ingest.PipelineProcessor.execute(PipelineProcessor.java:45)
	at org.elasticsearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:138)
	at org.elasticsearch.ingest.CompoundProcessor.lambda$innerExecute$1(CompoundProcessor.java:157)
	at org.elasticsearch.ingest.CompoundProcessor$$Lambda$2836/0x0000000800a5e840.accept(Unknown Source)
	at org.elasticsearch.xpack.enrich.AbstractEnrichProcessor.lambda$execute$0(AbstractEnrichProcessor.java:130)
	at org.elasticsearch.xpack.enrich.AbstractEnrichProcessor$$Lambda$2927/0x0000000800a89c40.accept(Unknown Source)
	at org.elasticsearch.xpack.enrich.AbstractEnrichProcessor.lambda$createSearchRunner$1(AbstractEnrichProcessor.java:182)
	at org.elasticsearch.xpack.enrich.AbstractEnrichProcessor$$Lambda$2928/0x0000000800a8a040.accept(Unknown Source)
	at org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:63)
	at org.elasticsearch.client.node.NodeClient.lambda$executeLocally$0(NodeClient.java:91)
	at org.elasticsearch.client.node.NodeClient$$Lambda$2313/0x00000008008de040.accept(Unknown Source)
	at org.elasticsearch.tasks.TaskManager$1.onResponse(TaskManager.java:158)
	at org.elasticsearch.tasks.TaskManager$1.onResponse(TaskManager.java:151)
	at org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction$Coordinator.handleResponse(EnrichCoordinatorProxyAction.java:164)
	at org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction$Coordinator.lambda$coordinateLookups$1(EnrichCoordinatorProxyAction.java:146)
	at org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction$Coordinator$$Lambda$2931/0x0000000800a8ac40.accept(Unknown Source)
	at org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction$Coordinator.lambda$lookupFunction$4(EnrichCoordinatorProxyAction.java:209)
	at org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction$Coordinator$$Lambda$2933/0x0000000800a8b440.accept(Unknown Source)
	at org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:63)
	at org.elasticsearch.client.node.NodeClient.lambda$executeLocally$0(NodeClient.java:91)
	at org.elasticsearch.client.node.NodeClient$$Lambda$2313/0x00000008008de040.accept(Unknown Source)
	at org.elasticsearch.tasks.TaskManager$1.onResponse(TaskManager.java:158)
	at org.elasticsearch.tasks.TaskManager$1.onResponse(TaskManager.java:151)
	at org.elasticsearch.action.support.single.shard.TransportSingleShardAction$AsyncSingleAction$2.handleResponse(TransportSingleShardAction.java:261)
	at org.elasticsearch.action.support.single.shard.TransportSingleShardAction$AsyncSingleAction$2.handleResponse(TransportSingleShardAction.java:247)
	at org.elasticsearch.transport.TransportService$ContextRestoreResponseHandler.handleResponse(TransportService.java:1101)
	at org.elasticsearch.transport.TransportService$DirectResponseChannel.processResponse(TransportService.java:1179)
	at org.elasticsearch.transport.TransportService$DirectResponseChannel.sendResponse(TransportService.java:1159)
	at org.elasticsearch.transport.TaskTransportChannel.sendResponse(TaskTransportChannel.java:54)
	at org.elasticsearch.action.support.ChannelActionListener.onResponse(ChannelActionListener.java:47)
	at org.elasticsearch.action.support.ChannelActionListener.onResponse(ChannelActionListener.java:30)
	at org.elasticsearch.action.ActionRunnable.lambda$supply$0(ActionRunnable.java:58)
	at org.elasticsearch.action.ActionRunnable$$Lambda$2847/0x0000000800a62040.accept(Unknown Source)
	at org.elasticsearch.action.ActionRunnable$2.doRun(ActionRunnable.java:73)
	at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
	at org.elasticsearch.common.util.concurrent.TimedRunnable.doRun(TimedRunnable.java:44)
	at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:691)
	at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base/ThreadPoolExecutor.java:1128)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base/ThreadPoolExecutor.java:628)
	at java.lang.Thread.run(java.base/Thread.java:834)

@jbaiera
Member Author

jbaiera commented Jul 6, 2020

Closing this since the deadlock is mitigated with #56247. Further discussion about the temporary capture of search threads will take place in #56450.

@jmp601

jmp601 commented Dec 21, 2022

How do we increase the 1024 max capacity of the enrich coordinator? We are just barely tripping that breaker and would like to adjust that up a bit.

@jbaiera
Member Author

jbaiera commented Jan 9, 2023

@jmp601 By default, the queue size is set to the number of concurrent enrich search operations allowed at a time (default 8) multiplied by the number of enrich lookups per search operation (default 128), i.e. 8 × 128 = 1024.

You can increase the queue capacity for a node by giving a concrete value to enrich.coordinator_proxy.queue_capacity. That said, if the enrich queue is filling up, it means you are not clearing enrich search requests fast enough for your ingest traffic, and increasing the queue size may not actually solve the root problem.

If you have enough extra headroom on your deployment to run more enrich operations at a time, you could instead look at increasing either enrich.coordinator_proxy.max_lookups_per_request (default 128) or enrich.coordinator_proxy.max_concurrent_requests (default 8). The queue's default size will automatically grow in proportion to the new setting. This has the added benefit of being less likely to fill up the queue, since more search capacity is being used by enrich operations. The potential downside to adjusting these settings is that you may cause more resource contention for search operations on nodes that are doing heavy ingestion work. Be sure to test and balance your changes!
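
For example, the node settings could be raised in elasticsearch.yml (the values below are purely illustrative; tune and test for your own workload):

# Allow more concurrent enrich multi-search requests (default 8).
enrich.coordinator_proxy.max_concurrent_requests: 16
# Lookups collapsed into each multi-search request (default 128).
enrich.coordinator_proxy.max_lookups_per_request: 128
# Optional explicit override of the derived default
# (max_concurrent_requests * max_lookups_per_request).
enrich.coordinator_proxy.queue_capacity: 2048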
