Skip to content

Commit

Permalink
Added multiple items in UT with different retry counts and item level…
Browse files Browse the repository at this point in the history
… retry assertion

Signed-off-by: Raghuvansh Raj <[email protected]>
  • Loading branch information
raghuvanshraj committed Nov 22, 2023
1 parent eb5def5 commit 49b60f7
Showing 1 changed file with 34 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,15 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
Expand Down Expand Up @@ -950,9 +953,16 @@ public void testRetries() throws Exception {

public void testUpdateWithRetryOnConflict() throws IOException, InterruptedException {
IndexSettings indexSettings = new IndexSettings(indexMetadata(), Settings.EMPTY);
UpdateRequest writeRequest = new UpdateRequest("index", "id").doc(Requests.INDEX_CONTENT_TYPE, "field", "value");
writeRequest.retryOnConflict(3);
BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest);

int nItems = randomIntBetween(2, 5);
List<BulkItemRequest> items = new ArrayList<>(nItems);
for (int i = 0; i < nItems; i++) {
int retryOnConflictCount = randomIntBetween(0, 3);
logger.debug("Setting retryCount for item {}: {}", i, retryOnConflictCount);
UpdateRequest updateRequest = new UpdateRequest("index", "id").doc(Requests.INDEX_CONTENT_TYPE, "field", "value")
.retryOnConflict(retryOnConflictCount);
items.add(new BulkItemRequest(i, updateRequest));
}

IndexRequest updateResponse = new IndexRequest("index").id("id").source(Requests.INDEX_CONTENT_TYPE, "field", "value");

Expand All @@ -977,8 +987,7 @@ public void testUpdateWithRetryOnConflict() throws IOException, InterruptedExcep
)
);

BulkItemRequest[] items = new BulkItemRequest[] { primaryRequest };
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items.toArray(BulkItemRequest[]::new));

final CountDownLatch latch = new CountDownLatch(1);
Runnable runnable = () -> TransportShardBulkAction.performOnPrimary(
Expand All @@ -988,26 +997,34 @@ public void testUpdateWithRetryOnConflict() throws IOException, InterruptedExcep
threadPool::absoluteTimeInMillis,
new NoopMappingUpdatePerformer(),
listener -> listener.onResponse(null),
new LatchedActionListener<>(
ActionTestUtils.assertNoFailureListener(
result -> assertEquals(
VersionConflictEngineException.class,
result.replicaRequest().items()[0].getPrimaryResponse().getFailure().getCause().getClass()
)
),
latch
),
new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result -> {
assertEquals(nItems, result.replicaRequest().items().length);
for (BulkItemRequest item : result.replicaRequest().items()) {
assertEquals(VersionConflictEngineException.class, item.getPrimaryResponse().getFailure().getCause().getClass());
}
}), latch),
threadPool,
Names.WRITE
);

// execute the runnable on a separate thread so that the infinite loop can be detected
Thread thread = new Thread(runnable);
thread.start();
new Thread(runnable).start();

// timeout the request in 10 seconds if there is an infinite loop
assertTrue(latch.await(10, TimeUnit.SECONDS));
assertEquals(items[0].getPrimaryResponse().getFailure().getCause().getClass(), VersionConflictEngineException.class);

items.forEach(item -> {
assertEquals(item.getPrimaryResponse().getFailure().getCause().getClass(), VersionConflictEngineException.class);

// this assertion is based on the assumption that all bulk item requests are updates and are hence calling
// UpdateRequest::prepareRequest
UpdateRequest updateRequest = (UpdateRequest) item.request();
verify(updateHelper, times(updateRequest.retryOnConflict() + 1)).prepare(
eq(updateRequest),
any(IndexShard.class),
any(LongSupplier.class)
);
});
}

public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception {
Expand Down

0 comments on commit 49b60f7

Please sign in to comment.