
Avoiding BulkProcessor deadlock in ILMHistoryStore #91238

Merged: 64 commits merged into elastic:main on Jan 9, 2023

Conversation

masseyke (Member) commented Nov 1, 2022

We have been seeing deadlocks in ILMHistoryStore in production (#68468). The deadlock appears to be caused by the fact that BulkProcessor uses two locks (BulkProcessor.lock and BulkRequestHandler.semaphore) and holds onto the latter lock for an indefinite amount of time.

This PR avoids the deadlock by introducing a new BulkProcessor2 that never requires both locks to be held at once, and drastically shortens the time for which either lock is held. It does this by adding a new queue (in the Retry2 class).
Note that we have left the original BulkProcessor in place and cloned/modified it into BulkProcessor2. For now BulkProcessor2 is used only by ILMHistoryStore, but it will likely replace BulkProcessor altogether in the near future.

The flow in the original BulkProcessor is like this:

  • ILMHistoryStore adds IndexRequests to BulkProcessor asynchronously.
  • BulkProcessor acquires its lock to build up a BulkRequest from these IndexRequests.
  • If a call from ILMHistoryStore adds an IndexRequest that pushes the BulkRequest over its configured threshold (size or count), BulkProcessor calls BulkRequestHandler to send that BulkRequest to the server.
  • BulkRequestHandler must acquire a permit from its semaphore to do this.
  • It calls Client::bulk from the current thread.
  • If the request fails, it keeps the semaphore permit while it retries (possibly multiple times).
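The blocking in the last two steps can be sketched as follows. This is a hypothetical, simplified model (the class and method names are illustrative, not the real BulkRequestHandler): the permit acquired before the bulk call is held across every retry, so other flushes queue up behind it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;

// Hypothetical, simplified model of the old flow: the semaphore permit is held
// for the whole synchronous retry loop, so concurrent flushes block behind it.
public class OldFlowSketch {

    // Stand-in for BulkRequestHandler.semaphore (one concurrent request allowed).
    static final Semaphore concurrentRequests = new Semaphore(1);

    // Stand-in for BulkRequestHandler: hold the permit across the call and all retries.
    static void executeWithRetries(Runnable bulkCall, int retries) throws InterruptedException {
        concurrentRequests.acquire();
        try {
            for (int attempt = 0; attempt <= retries; attempt++) {
                bulkCall.run(); // "Client::bulk" from the current thread
            }
        } finally {
            concurrentRequests.release();
        }
    }

    // Returns whether a second caller could get a permit while retries are in progress.
    static boolean permitAvailableDuringRetries() {
        CountDownLatch retrying = new CountDownLatch(1);
        CountDownLatch finishRetries = new CountDownLatch(1);
        Thread flusher = new Thread(() -> {
            try {
                executeWithRetries(() -> {
                    retrying.countDown();
                    try {
                        finishRetries.await(); // simulate a slow, failing bulk call
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }, 2);
            } catch (InterruptedException ignored) {
            }
        });
        flusher.start();
        try {
            retrying.await();
            boolean available = concurrentRequests.tryAcquire();
            if (available) {
                concurrentRequests.release();
            }
            finishRetries.countDown();
            flusher.join();
            return available;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("permit available during retries: " + permitAvailableDuringRetries());
    }
}
```

Any thread that tries to flush while the retry loop is running blocks on the semaphore, which is one half of the two-lock deadlock described above.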

The flow in the new BulkProcessor2:

  • ILMHistoryStore adds IndexRequests to BulkProcessor2 synchronously (since this part is now very fast).
  • BulkProcessor2 acquires its lock to build up a BulkRequest from these IndexRequests.
  • If a call from ILMHistoryStore adds an IndexRequest that pushes the BulkRequest over its configured threshold (size or count), BulkProcessor2 calls Retry2::withBackoff, which attempts to send the BulkRequest, retrying up to a fixed number of times.
  • If the number of bytes already in flight to Elasticsearch is higher than a configured limit, or if Elasticsearch is too busy, the listener is notified with an EsRejectedExecutionException.
  • Either way, control returns immediately and the BulkProcessor2 lock is released.
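The non-blocking handoff in the steps above can be sketched like this (hypothetical names throughout; a single-threaded executor stands in for Retry2's queue, and completing a future stands in for the real client call):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the new flow: crossing the threshold enqueues a flush
// and returns immediately; the lock is never held while a request is in flight.
public class NewFlowSketch {
    private final ExecutorService retryQueue = Executors.newSingleThreadExecutor(); // stand-in for Retry2's queue
    private final Object lock = new Object(); // stand-in for BulkProcessor2's lock
    private final AtomicInteger pendingDocs = new AtomicInteger();
    private final int flushThresholdDocs = 3;

    // Add a document; if the threshold is crossed, hand the batch to the queue and return at once.
    void add(String doc, CompletableFuture<Integer> flushListener) {
        synchronized (lock) { // held only for bookkeeping, never during I/O
            if (pendingDocs.incrementAndGet() >= flushThresholdDocs) {
                int docsToFlush = pendingDocs.getAndSet(0);
                retryQueue.submit(() -> flushListener.complete(docsToFlush)); // "Client::bulk" off-thread
            }
        }
    }

    static int demo() {
        NewFlowSketch processor = new NewFlowSketch();
        CompletableFuture<Integer> flushed = new CompletableFuture<>();
        processor.add("doc-1", flushed);
        processor.add("doc-2", flushed);
        processor.add("doc-3", flushed); // crosses the threshold; flush happens asynchronously
        try {
            return flushed.get(5, TimeUnit.SECONDS); // the three adds above all returned immediately
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            processor.retryQueue.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("docs flushed asynchronously: " + demo());
    }
}
```

Because the lock protects only the bookkeeping and the send happens off-thread, no caller can end up waiting on a lock that is held across network I/O or retries.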

We no longer use a semaphore to throttle how many concurrent requests can be sent to Elasticsearch at once, and there is no longer any blocking. Instead we throttle the (approximate) total number of bytes in flight to Elasticsearch, and allow Elasticsearch to throw an EsRejectedExecutionException if it thinks there are too many concurrent requests.
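A minimal sketch of such a bytes-in-flight throttle (the class and method names here are hypothetical; a false return stands in for notifying the listener with an EsRejectedExecutionException):

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: throttle by approximate bytes in flight rather than a
// semaphore. Over-limit reservations are rejected immediately, never blocked.
public class BytesInFlightThrottle {
    private final AtomicLong bytesInFlight = new AtomicLong();
    private final long maxBytesInFlight;

    public BytesInFlightThrottle(long maxBytesInFlight) {
        this.maxBytesInFlight = maxBytesInFlight;
    }

    // Try to reserve capacity for a request. A false return stands in for
    // completing the caller's listener with EsRejectedExecutionException.
    public boolean tryReserve(long requestBytes) {
        while (true) {
            long current = bytesInFlight.get();
            if (current + requestBytes > maxBytesInFlight) {
                return false; // too many bytes already in flight
            }
            if (bytesInFlight.compareAndSet(current, current + requestBytes)) {
                return true;
            }
        }
    }

    // Called when a bulk response (or failure) arrives for a reserved request.
    public void release(long requestBytes) {
        bytesInFlight.addAndGet(-requestBytes);
    }

    public static void main(String[] args) {
        BytesInFlightThrottle throttle = new BytesInFlightThrottle(100);
        System.out.println(throttle.tryReserve(60)); // true
        System.out.println(throttle.tryReserve(60)); // false: would exceed 100 bytes in flight
        throttle.release(60);
        System.out.println(throttle.tryReserve(60)); // true again once the first bulk completes
    }
}
```

The compare-and-set loop keeps the accounting lock-free, which is why the count is only approximate: a request's bytes are reserved before the send and released when its response arrives.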

Closes #50440
Closes #68468

@masseyke masseyke added the :Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP label Nov 1, 2022
masseyke (Member, Author) commented Nov 2, 2022

@elasticmachine update branch

masseyke (Member, Author) commented Nov 3, 2022

@elasticmachine update branch

@masseyke masseyke changed the title Bulk processor deadlock Avoiding BulkProcessor deadlock in ILMHistoryStore Nov 3, 2022
@masseyke masseyke removed the :Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP label Nov 4, 2022
@masseyke masseyke requested a review from joegallo January 4, 2023 15:15
@masseyke masseyke added the auto-merge-without-approval Automatically merge pull request when CI checks pass (NB doesn't wait for reviews!) label Jan 5, 2023
masseyke (Member, Author) commented Jan 5, 2023

@elasticmachine update branch

@elasticsearchmachine elasticsearchmachine merged commit 0e5844f into elastic:main Jan 9, 2023
@masseyke masseyke deleted the BulkProcessor-deadlock branch January 9, 2023 15:58
masseyke added a commit that referenced this pull request Mar 2, 2023
In #91238 we rewrote BulkProcessor to avoid a deadlock that had been seen in the ILMHistoryStore.
This commit ports watcher over to the new BulkProcessor2 implementation. The only real change
is that watcher history documents are now indexed asynchronously instead of in a blocking way,
so tests had to change to account for this.
elasticsearchmachine pushed a commit that referenced this pull request Mar 2, 2023
* backporting 91238 and 86184

* increasing test timeouts (#92771)

BulkProcessor2IT can occasionally fail with timeouts like this:

```
java.util.concurrent.TimeoutException: (No message provided)

  at __randomizedtesting.SeedInfo.seed([164F04355E8E8724:9D44A00946BBB3F3]:0)
  at java.util.concurrent.Phaser.awaitAdvanceInterruptibly(Phaser.java:795)
  at org.elasticsearch.action.bulk.Retry2.awaitClose(Retry2.java:129)
  at org.elasticsearch.action.bulk.BulkProcessor2.awaitClose(BulkProcessor2.java:254)
  at org.elasticsearch.action.bulk.BulkProcessor2IT.testBulkProcessor2ConcurrentRequestsReadOnlyIndex(BulkProcessor2IT.java:197)
  at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-2)
  at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
  at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:568)
  at com.carrotsearch.randomizedtesting.RandomizedRunner.invoke(RandomizedRunner.java:1758)
  at com.carrotsearch.randomizedtesting.RandomizedRunner$8.evaluate(RandomizedRunner.java:946)
  at com.carrotsearch.randomizedtesting.RandomizedRunner$9.evaluate(RandomizedRunner.java:982)
  at com.carrotsearch.randomizedtesting.RandomizedRunner$10.evaluate(RandomizedRunner.java:996)
  at com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
  at org.apache.lucene.tests.util.TestRuleSetupTeardownChained$1.evaluate(TestRuleSetupTeardownChained.java:44)
  at org.apache.lucene.tests.util.AbstractBeforeAfterRule$1.evaluate(AbstractBeforeAfterRule.java:43)
  at org.apache.lucene.tests.util.TestRuleThreadAndTestName$1.evaluate(TestRuleThreadAndTestName.java:45)
  at org.apache.lucene.tests.util.TestRuleIgnoreAfterMaxFailures$1.evaluate(TestRuleIgnoreAfterMaxFailures.java:60)
  at org.apache.lucene.tests.util.TestRuleMarkFailure$1.evaluate(TestRuleMarkFailure.java:44)
  at com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
  at com.carrotsearch.randomizedtesting.ThreadLeakControl$StatementRunner.run(ThreadLeakControl.java:390)
  at com.carrotsearch.randomizedtesting.ThreadLeakControl.forkTimeoutingTask(ThreadLeakControl.java:843)
  at com.carrotsearch.randomizedtesting.ThreadLeakControl$3.evaluate(ThreadLeakControl.java:490)
  at com.carrotsearch.randomizedtesting.RandomizedRunner.runSingleTest(RandomizedRunner.java:955)
  at com.carrotsearch.randomizedtesting.RandomizedRunner$5.evaluate(RandomizedRunner.java:840)
  at com.carrotsearch.randomizedtesting.RandomizedRunner$6.evaluate(RandomizedRunner.java:891)
  at com.carrotsearch.randomizedtesting.RandomizedRunner$7.evaluate(RandomizedRunner.java:902)
  at org.apache.lucene.tests.util.AbstractBeforeAfterRule$1.evaluate(AbstractBeforeAfterRule.java:43)
  at com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
  at org.apache.lucene.tests.util.TestRuleStoreClassName$1.evaluate(TestRuleStoreClassName.java:38)
  at com.carrotsearch.randomizedtesting.rules.NoShadowingOrOverridesOnMethodsRule$1.evaluate(NoShadowingOrOverridesOnMethodsRule.java:40)
  at com.carrotsearch.randomizedtesting.rules.NoShadowingOrOverridesOnMethodsRule$1.evaluate(NoShadowingOrOverridesOnMethodsRule.java:40)
  at com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
  at com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
  at org.apache.lucene.tests.util.TestRuleAssertionsRequired$1.evaluate(TestRuleAssertionsRequired.java:53)
  at org.apache.lucene.tests.util.AbstractBeforeAfterRule$1.evaluate(AbstractBeforeAfterRule.java:43)
  at org.apache.lucene.tests.util.TestRuleMarkFailure$1.evaluate(TestRuleMarkFailure.java:44)
  at org.apache.lucene.tests.util.TestRuleIgnoreAfterMaxFailures$1.evaluate(TestRuleIgnoreAfterMaxFailures.java:60)
  at org.apache.lucene.tests.util.TestRuleIgnoreTestSuites$1.evaluate(TestRuleIgnoreTestSuites.java:47)
  at com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
  at com.carrotsearch.randomizedtesting.ThreadLeakControl$StatementRunner.run(ThreadLeakControl.java:390)
  at com.carrotsearch.randomizedtesting.ThreadLeakControl.lambda$forkTimeoutingTask$0(ThreadLeakControl.java:850)
  at java.lang.Thread.run(Thread.java:833)
```

It looks like we're just cutting it a little too closely using a
1-second timeout to wait for all requests to complete. This PR bumps
that timeout to 5 seconds. In the previous version of this test
(BulkProcessorIT) the code did not actually wait for all requests to
complete, which explains why this behavior is new. Closes #92770

* fixing build problems

* reverting accidental change

* fixing build problems

* fixing a unit test

* fixing tests

* fixing tests

* Not propagating TimeoutException from Retry2::awaitClose (#92773)

Logging a message rather than propagating a TimeoutException from Retry2::awaitClose
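The stack trace above points at Phaser.awaitAdvanceInterruptibly inside Retry2.awaitClose. The following is an illustrative sketch, not the real Retry2 code, of how such a bounded wait for in-flight requests can work, returning false and logging on timeout in the spirit of this change:

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustrative sketch of awaitClose-style waiting, inferred from the Phaser frame
// in the stack trace; names and structure are assumptions, not the real Retry2.
public class AwaitCloseSketch {
    // One party for the closer itself; each in-flight request registers another.
    private final Phaser inFlightRequests = new Phaser(1);

    void requestStarted() {
        inFlightRequests.register();
    }

    void requestFinished() {
        inFlightRequests.arriveAndDeregister();
    }

    // Wait up to the timeout for all in-flight requests to finish; on timeout,
    // report the problem instead of propagating the TimeoutException.
    boolean awaitClose(long timeout, TimeUnit unit) {
        try {
            int phase = inFlightRequests.arriveAndDeregister(); // the closer bows out
            inFlightRequests.awaitAdvanceInterruptibly(phase, timeout, unit);
            return true;
        } catch (TimeoutException e) {
            System.out.println("Timed out waiting for in-flight bulk requests to complete");
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    static boolean closeWithNoRequests() {
        return new AwaitCloseSketch().awaitClose(100, TimeUnit.MILLISECONDS);
    }

    static boolean closeWithStuckRequest() {
        AwaitCloseSketch sketch = new AwaitCloseSketch();
        sketch.requestStarted(); // this request never finishes
        return sketch.awaitClose(100, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        System.out.println("close with no requests:   " + closeWithNoRequests());
        System.out.println("close with stuck request: " + closeWithStuckRequest());
    }
}
```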

---------

Co-authored-by: Joe Gallo <[email protected]>
elasticsearchmachine pushed a commit that referenced this pull request Mar 3, 2023
…BulkProcessor (#94172)

In #91238 we rewrote
BulkProcessor to avoid deadlock that had been seen in the
IlmHistoryStore. At some point we will remove BulkProcessor altogether.
This PR ports a couple of integration tests that were using BulkProcessor
over to BulkProcessor2.
masseyke added a commit that referenced this pull request Mar 6, 2023
In #91238 we rewrote BulkProcessor to avoid deadlock that had been seen in the IlmHistoryStore.
This commit ports deprecation logging over to the new BulkProcessor2 implementation.
masseyke added a commit that referenced this pull request Mar 22, 2023
In #91238 we rewrote BulkProcessor to avoid deadlock that had been seen in the
IlmHistoryStore. This PR ports TSDB downsampling over to the new
BulkProcessor2 implementation.
Labels
  • auto-merge-without-approval: Automatically merge pull request when CI checks pass (NB doesn't wait for reviews!)
  • >bug
  • :Data Management/ILM+SLM: Index and Snapshot lifecycle management
  • Team:Data Management: Meta label for data/management team
  • v8.7.0