Avoiding BulkProcessor deadlock in ILMHistoryStore #91238

Merged Jan 9, 2023 (64 commits; changes shown from 59 commits)
dc63507 initial work -- queue of BulkRequests (masseyke, Oct 31, 2022)
d8085c3 cleanup (masseyke, Oct 31, 2022)
e08ea9d Adding a separate queue for retries and adding a test (masseyke, Nov 1, 2022)
9aab67b cleanup (masseyke, Nov 1, 2022)
945e7e1 Adding tests (masseyke, Nov 1, 2022)
2e08203 removing dead code (masseyke, Nov 1, 2022)
4093b50 cleanup (masseyke, Nov 1, 2022)
ac75c31 Adding awaitClose implementation (masseyke, Nov 1, 2022)
0a12752 spotlessApply (masseyke, Nov 1, 2022)
94b2589 Merge branch 'main' into BulkProcessor-deadlock (elasticmachine, Nov 2, 2022)
4b8aa92 adding trace logging (masseyke, Nov 2, 2022)
eb2c151 Merge branch 'BulkProcessor-deadlock' of github.com:masseyke/elastics… (masseyke, Nov 2, 2022)
56b3d61 spotlessApply (masseyke, Nov 2, 2022)
4b5ab6c Added loggging, fixed a unit test (masseyke, Nov 2, 2022)
48acd5c improving awaitClose (masseyke, Nov 2, 2022)
29f3080 improved logging, smaller queues (masseyke, Nov 2, 2022)
4830c82 Allowing a different queue size per bulk processor (masseyke, Nov 3, 2022)
0217435 making queue size a setting (masseyke, Nov 3, 2022)
0afa2f6 adding a test for rejections (masseyke, Nov 3, 2022)
01a837f Merge branch 'main' into BulkProcessor-deadlock (elasticmachine, Nov 3, 2022)
b84f33b cleanup / comments (masseyke, Nov 3, 2022)
c024937 Merge branch 'BulkProcessor-deadlock' of github.com:masseyke/elastics… (masseyke, Nov 3, 2022)
3fb5d08 making poll interval configurable (masseyke, Nov 4, 2022)
088414b cleanup (masseyke, Nov 4, 2022)
abf7771 adding integration tests (masseyke, Nov 4, 2022)
3717594 Update docs/changelog/91238.yaml (masseyke, Nov 4, 2022)
07fab67 wait up to 1s to close (masseyke, Nov 4, 2022)
87c87c3 Merge branch 'BulkProcessor-deadlock' of github.com:masseyke/elastics… (masseyke, Nov 4, 2022)
0b867d7 Removing support for BackoffPoliy (masseyke, Nov 7, 2022)
9750954 cleanup (masseyke, Nov 7, 2022)
20beaa3 Update docs/changelog/91238.yaml (masseyke, Nov 9, 2022)
6d63b0b Removing queues altogether (masseyke, Nov 18, 2022)
022cf8b notifying listener instead of throwing (masseyke, Nov 18, 2022)
d20137e Merge branch 'main' into BulkProcessor-deadlock (elasticmachine, Nov 18, 2022)
93b290a cleanup (masseyke, Nov 18, 2022)
f56e6f0 Merge branch 'BulkProcessor-deadlock' of github.com:masseyke/elastics… (masseyke, Nov 18, 2022)
f64caf6 cleanup (masseyke, Nov 18, 2022)
8691294 Adding a test for awaitClose (masseyke, Nov 18, 2022)
d3d295d Merge branch 'main' into BulkProcessor-deadlock (masseyke, Nov 21, 2022)
6ff5edf cleanup (masseyke, Nov 21, 2022)
0b1abfa Removed BulkRequestHandler2 (masseyke, Nov 21, 2022)
fbf9ee2 fixed a compilation error (masseyke, Nov 21, 2022)
c8a27d1 fixed a compilation error (masseyke, Nov 21, 2022)
b67a357 cleanup (masseyke, Nov 21, 2022)
f57ac96 correcting totalBytesInFlight math (masseyke, Nov 22, 2022)
c1bc7b2 Only run the periodic flush if there are requests (masseyke, Nov 22, 2022)
c012a82 correcting flush task code (masseyke, Nov 22, 2022)
80add3d Including the BulkProcessor2 in-work BulkRequest in totalBytesInFlight (masseyke, Nov 22, 2022)
ef921e0 Merge branch 'main' into BulkProcessor-deadlock (elasticmachine, Nov 22, 2022)
55c54a2 Merge branch 'main' into BulkProcessor-deadlock (masseyke, Dec 2, 2022)
864787f Only tracking bytes at a bulk level (instead of individual requests) (masseyke, Dec 5, 2022)
3249374 Flushing request if totalyBytesInFlight is exceeded (masseyke, Dec 5, 2022)
8df1d7f Merge branch 'main' into BulkProcessor-deadlock (masseyke, Dec 19, 2022)
29512af fixing comments (masseyke, Dec 19, 2022)
e248d4c fixing unit test (masseyke, Dec 20, 2022)
96d2f1a Merge branch 'main' into BulkProcessor-deadlock (masseyke, Jan 3, 2023)
07f5959 updating comments (masseyke, Jan 3, 2023)
d343c22 renaming a variable (masseyke, Jan 3, 2023)
3017701 speeding up test (masseyke, Jan 3, 2023)
846cc6e code review feedback (masseyke, Jan 4, 2023)
2fbee72 code review feedback (masseyke, Jan 5, 2023)
c9cde2d Merge branch 'main' into BulkProcessor-deadlock (elasticmachine, Jan 5, 2023)
72baa84 fixing compilation errors after merge (masseyke, Jan 6, 2023)
e8c6275 fixing forbidden api use (masseyke, Jan 9, 2023)
7 changes: 7 additions & 0 deletions docs/changelog/91238.yaml
pr: 91238
summary: Avoiding `BulkProcessor` deadlock in ILMHistoryStore
area: ILM+SLM
type: bug
issues:
- 68468
- 50440
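
The commit messages above outline the shape of the fix: the queues were removed, bytes are tracked at the bulk level (`totalBytesInFlight`), and an over-limit add notifies a listener instead of throwing or blocking the caller. The following is a minimal, hypothetical sketch of that pattern; it is not the actual `BulkProcessor2` API, and all names here (`BoundedBulkSketch`, `rejectionListener`) are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative sketch only: a buffer that tracks total bytes in flight and,
// when the limit would be exceeded, reports the rejection to a listener
// rather than blocking or throwing -- so the calling thread can never
// deadlock waiting for capacity.
public class BoundedBulkSketch {
    private final long maxBytesInFlight;
    private final Consumer<String> rejectionListener;
    private final List<String> buffer = new ArrayList<>();
    private long totalBytesInFlight = 0;

    public BoundedBulkSketch(long maxBytesInFlight, Consumer<String> rejectionListener) {
        this.maxBytesInFlight = maxBytesInFlight;
        this.rejectionListener = rejectionListener;
    }

    // Adds a document without ever blocking the caller.
    public boolean add(String doc) {
        long size = doc.getBytes().length;
        if (totalBytesInFlight + size > maxBytesInFlight) {
            rejectionListener.accept(doc); // notify instead of throwing
            return false;
        }
        totalBytesInFlight += size;
        buffer.add(doc);
        return true;
    }

    // Simulates completion of the in-flight bulk, releasing its bytes.
    public void flush() {
        for (String doc : buffer) {
            totalBytesInFlight -= doc.getBytes().length;
        }
        buffer.clear();
    }

    public long getTotalBytesInFlight() {
        return totalBytesInFlight;
    }

    public static void main(String[] args) {
        List<String> rejected = new ArrayList<>();
        BoundedBulkSketch p = new BoundedBulkSketch(10, rejected::add);
        p.add("12345");  // 5 bytes accepted
        p.add("123456"); // 5 + 6 > 10: rejected, listener notified
        System.out.println(p.getTotalBytesInFlight()); // 5
        System.out.println(rejected.size());           // 1
        p.flush();
        System.out.println(p.getTotalBytesInFlight()); // 0
    }
}
```

The key property, mirrored in the tests below by the `getTotalBytesInFlight()` assertions, is that accounting returns to zero once all in-flight work completes.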
New file (297 lines added):
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the Server Side Public License, v 1; you may not use this file except
 * in compliance with, at your election, the Elastic License 2.0 or the Server
 * Side Public License, v 1.
 */

package org.elasticsearch.action.bulk;

import com.carrotsearch.randomizedtesting.generators.RandomPicks;

import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.Requests;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.test.ESIntegTestCase;

import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

public class BulkProcessor2IT extends ESIntegTestCase {

    public void testThatBulkProcessor2CountIsCorrect() throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);
        BulkProcessor2TestListener listener = new BulkProcessor2TestListener(latch);

        int numDocs = randomIntBetween(10, 100);
        try (
            BulkProcessor2 processor = BulkProcessor2.builder(client()::bulk, listener, client().threadPool())
                // let's make sure that the bulk action limit trips, one single execution will index all the documents
                .setBulkActions(numDocs)
                .setFlushInterval(TimeValue.timeValueHours(24))
                .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
                .build()
        ) {

            MultiGetRequestBuilder multiGetRequestBuilder = indexDocs(client(), processor, numDocs);

            latch.await();

            assertThat(listener.beforeCounts.get(), equalTo(1));
            assertThat(listener.afterCounts.get(), equalTo(1));
            assertThat(listener.bulkFailures.size(), equalTo(0));
            assertResponseItems(listener.bulkItems, numDocs);
            assertMultiGetResponse(multiGetRequestBuilder.get(), numDocs);
            assertThat(processor.getTotalBytesInFlight(), equalTo(0L));
        }
    }

    public void testBulkProcessor2ConcurrentRequests() throws Exception {
        int bulkActions = randomIntBetween(10, 100);
        int numDocs = randomIntBetween(bulkActions, bulkActions + 100);

        int expectedBulkActions = numDocs / bulkActions;

        final CountDownLatch latch = new CountDownLatch(expectedBulkActions);
        int totalExpectedBulkActions = numDocs % bulkActions == 0 ? expectedBulkActions : expectedBulkActions + 1;
        final CountDownLatch closeLatch = new CountDownLatch(totalExpectedBulkActions);

        BulkProcessor2TestListener listener = new BulkProcessor2TestListener(latch, closeLatch);

        MultiGetRequestBuilder multiGetRequestBuilder;
        BulkProcessor2 processor = BulkProcessor2.builder(client()::bulk, listener, client().threadPool())
            .setBulkActions(bulkActions)
            // set interval and size to high values
            .setFlushInterval(TimeValue.timeValueHours(24))
            .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
            .build();
        try (processor) {

            multiGetRequestBuilder = indexDocs(client(), processor, numDocs);

            latch.await();

            assertThat(listener.beforeCounts.get(), equalTo(expectedBulkActions));
            assertThat(listener.afterCounts.get(), equalTo(expectedBulkActions));
            assertThat(listener.bulkFailures.size(), equalTo(0));
            assertThat(listener.bulkItems.size(), equalTo(numDocs - numDocs % bulkActions));
        }

        closeLatch.await();

        assertThat(listener.beforeCounts.get(), equalTo(totalExpectedBulkActions));
        assertThat(listener.afterCounts.get(), equalTo(totalExpectedBulkActions));
        assertThat(listener.bulkFailures.size(), equalTo(0));
        assertThat(listener.bulkItems.size(), equalTo(numDocs));

        Set<String> ids = new HashSet<>();
        for (BulkItemResponse bulkItemResponse : listener.bulkItems) {
            assertThat(bulkItemResponse.getFailureMessage(), bulkItemResponse.isFailed(), equalTo(false));
            assertThat(bulkItemResponse.getIndex(), equalTo("test"));
            // with concurrent requests > 1 we can't rely on the order of the bulk requests
            assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(numDocs)));
            // we do want to check that we don't get duplicate ids back
            assertThat(ids.add(bulkItemResponse.getId()), equalTo(true));
        }

        assertMultiGetResponse(multiGetRequestBuilder.get(), numDocs);
        assertThat(processor.getTotalBytesInFlight(), equalTo(0L));
    }

    public void testBulkProcessor2WaitOnClose() throws Exception {
        BulkProcessor2TestListener listener = new BulkProcessor2TestListener();

        int numDocs = randomIntBetween(10, 100);
        BulkProcessor2 processor = BulkProcessor2.builder(client()::bulk, listener, client().threadPool())
            // let's make sure that the bulk action limit trips, one single execution will index all the documents
            .setBulkActions(numDocs)
            .setFlushInterval(TimeValue.timeValueHours(24))
            .setBulkSize(new ByteSizeValue(randomIntBetween(1, 10), RandomPicks.randomFrom(random(), ByteSizeUnit.values())))
            .build();

        MultiGetRequestBuilder multiGetRequestBuilder = indexDocs(client(), processor, numDocs);
        processor.awaitClose(1, TimeUnit.MINUTES);
        assertThat(listener.beforeCounts.get(), greaterThanOrEqualTo(1));
        assertThat(listener.afterCounts.get(), greaterThanOrEqualTo(1));
        assertThat(listener.bulkFailures.size(), equalTo(0));
        assertResponseItems(listener.bulkItems, numDocs);
        assertMultiGetResponse(multiGetRequestBuilder.get(), numDocs);
    }

    public void testBulkProcessor2ConcurrentRequestsReadOnlyIndex() throws Exception {
        createIndex("test-ro");
        assertAcked(
            client().admin()
                .indices()
                .prepareUpdateSettings("test-ro")
                .setSettings(Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true))
        );
        ensureGreen();

        int bulkActions = randomIntBetween(10, 100);
        int numDocs = randomIntBetween(bulkActions, bulkActions + 100);

        int expectedBulkActions = numDocs / bulkActions;

        final CountDownLatch latch = new CountDownLatch(expectedBulkActions);
        int totalExpectedBulkActions = numDocs % bulkActions == 0 ? expectedBulkActions : expectedBulkActions + 1;
        final CountDownLatch closeLatch = new CountDownLatch(totalExpectedBulkActions);

        int testDocs = 0;
        int testReadOnlyDocs = 0;
        MultiGetRequestBuilder multiGetRequestBuilder = client().prepareMultiGet();
        BulkProcessor2TestListener listener = new BulkProcessor2TestListener(latch, closeLatch);

        BulkProcessor2 processor = BulkProcessor2.builder(client()::bulk, listener, client().threadPool())
            .setBulkActions(bulkActions)
            // set interval and size to high values
            .setFlushInterval(TimeValue.timeValueHours(24))
            .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
            .build();
        try (processor) {

            for (int i = 1; i <= numDocs; i++) {
                if (randomBoolean()) {
                    testDocs++;
                    processor.add(
                        new IndexRequest("test").id(Integer.toString(testDocs)).source(Requests.INDEX_CONTENT_TYPE, "field", "value")
                    );
                    multiGetRequestBuilder.add("test", Integer.toString(testDocs));
                } else {
                    testReadOnlyDocs++;
                    processor.add(
                        new IndexRequest("test-ro").id(Integer.toString(testReadOnlyDocs))
                            .source(Requests.INDEX_CONTENT_TYPE, "field", "value")
                    );
                }
            }
        }

        closeLatch.await();

        assertThat(listener.beforeCounts.get(), equalTo(totalExpectedBulkActions));
        assertThat(listener.afterCounts.get(), equalTo(totalExpectedBulkActions));
        assertThat(listener.bulkFailures.size(), equalTo(0));
        assertThat(listener.bulkItems.size(), equalTo(testDocs + testReadOnlyDocs));
        assertThat(processor.getTotalBytesInFlight(), equalTo(0L));
        Set<String> ids = new HashSet<>();
        Set<String> readOnlyIds = new HashSet<>();
        for (BulkItemResponse bulkItemResponse : listener.bulkItems) {
            assertThat(bulkItemResponse.getIndex(), either(equalTo("test")).or(equalTo("test-ro")));
            if (bulkItemResponse.getIndex().equals("test")) {
                assertThat(bulkItemResponse.isFailed(), equalTo(false));
                // with concurrent requests > 1 we can't rely on the order of the bulk requests
                assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(testDocs)));
                // we do want to check that we don't get duplicate ids back
                assertThat(ids.add(bulkItemResponse.getId()), equalTo(true));
            } else {
                assertThat(bulkItemResponse.isFailed(), equalTo(true));
                // with concurrent requests > 1 we can't rely on the order of the bulk requests
                assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(testReadOnlyDocs)));
                // we do want to check that we don't get duplicate ids back
                assertThat(readOnlyIds.add(bulkItemResponse.getId()), equalTo(true));
            }
        }

        assertMultiGetResponse(multiGetRequestBuilder.get(), testDocs);
    }

    private static MultiGetRequestBuilder indexDocs(Client client, BulkProcessor2 processor, int numDocs) throws Exception {
        MultiGetRequestBuilder multiGetRequestBuilder = client.prepareMultiGet();
        for (int i = 1; i <= numDocs; i++) {
            processor.add(
                new IndexRequest("test").id(Integer.toString(i))
                    .source(Requests.INDEX_CONTENT_TYPE, "field", randomRealisticUnicodeOfLengthBetween(1, 30))
            );
            multiGetRequestBuilder.add("test", Integer.toString(i));
        }
        return multiGetRequestBuilder;
    }

    private static void assertResponseItems(List<BulkItemResponse> bulkItemResponses, int numDocs) {
        assertThat(bulkItemResponses.size(), is(numDocs));
        int i = 1;
        List<BulkItemResponse> sortedResponses = bulkItemResponses.stream()
            .sorted(Comparator.comparing(o -> Integer.valueOf(o.getId())))
            .toList();
        for (BulkItemResponse bulkItemResponse : sortedResponses) {
            assertThat(bulkItemResponse.getIndex(), equalTo("test"));
            assertThat(bulkItemResponse.getId(), equalTo(Integer.toString(i++)));
            assertThat(
                "item " + i + " failed with cause: " + bulkItemResponse.getFailureMessage(),
                bulkItemResponse.isFailed(),
                equalTo(false)
            );
        }
    }

    private static void assertMultiGetResponse(MultiGetResponse multiGetResponse, int numDocs) {
        assertThat(multiGetResponse.getResponses().length, equalTo(numDocs));
        int i = 1;
        for (MultiGetItemResponse multiGetItemResponse : multiGetResponse) {
            assertThat(multiGetItemResponse.getIndex(), equalTo("test"));
            assertThat(multiGetItemResponse.getId(), equalTo(Integer.toString(i++)));
        }
    }

    private static class BulkProcessor2TestListener implements BulkProcessor2.Listener {

        private final CountDownLatch[] latches;
        private final AtomicInteger beforeCounts = new AtomicInteger();
        private final AtomicInteger afterCounts = new AtomicInteger();
        private final List<BulkItemResponse> bulkItems = new CopyOnWriteArrayList<>();
        private final List<Throwable> bulkFailures = new CopyOnWriteArrayList<>();

        private BulkProcessor2TestListener(CountDownLatch... latches) {
            this.latches = latches;
        }

        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            beforeCounts.incrementAndGet();
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            bulkItems.addAll(Arrays.asList(response.getItems()));
            afterCounts.incrementAndGet();
            for (CountDownLatch latch : latches) {
                latch.countDown();
            }
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, Exception failure) {
            bulkFailures.add(failure);
            afterCounts.incrementAndGet();
            for (CountDownLatch latch : latches) {
                latch.countDown();
            }
        }
    }
}