Skip to content

Commit

Permalink
Bugfix for update staying stuck when sent as part of bulk with retry_…
Browse files Browse the repository at this point in the history
…on_conflict specified

Signed-off-by: Raghuvansh Raj <[email protected]>
  • Loading branch information
raghuvanshraj committed Nov 10, 2023
1 parent 0a9dfec commit 1c12252
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 1 deletion.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [AdmissionControl] Added changes for AdmissionControl Interceptor and AdmissionControlService for RateLimiting ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286))
- GHA to verify checklist items completion in PR descriptions ([#10800](https://github.com/opensearch-project/OpenSearch/pull/10800))
- [Remote cluster state] Restore cluster state version during remote state auto restore ([#10853](https://github.com/opensearch-project/OpenSearch/pull/10853))
- Bugfix for update staying stuck when sent as part of bulk with `retry_on_conflict` specified ([#11152](https://github.com/opensearch-project/OpenSearch/issues/11152))

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down Expand Up @@ -147,4 +148,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.12...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.12...2.x
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ public void resetForExecutionForRetry() {
currentItemState = ItemProcessingState.INITIAL;
requestToExecute = null;
executionResult = null;
retryCounter++;
assertInvariants(ItemProcessingState.INITIAL);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.hamcrest.CoreMatchers.equalTo;
Expand Down Expand Up @@ -947,6 +948,68 @@ public void testRetries() throws Exception {
latch.await();
}

public void testUpdateWithRetryOnConflict() throws IOException, InterruptedException {
IndexSettings indexSettings = new IndexSettings(indexMetadata(), Settings.EMPTY);
UpdateRequest writeRequest = new UpdateRequest("index", "id").doc(Requests.INDEX_CONTENT_TYPE, "field", "value");
writeRequest.retryOnConflict(3);
BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest);

IndexRequest updateResponse = new IndexRequest("index").id("id").source(Requests.INDEX_CONTENT_TYPE, "field", "value");

Exception err = new VersionConflictEngineException(shardId, "id", "I'm conflicted <(;_;)>");
Engine.IndexResult conflictedResult = new Engine.IndexResult(err, 0);

IndexShard shard = mock(IndexShard.class);
when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean())).thenAnswer(
ir -> conflictedResult
);
when(shard.indexSettings()).thenReturn(indexSettings);
when(shard.shardId()).thenReturn(shardId);
when(shard.mapperService()).thenReturn(mock(MapperService.class));

UpdateHelper updateHelper = mock(UpdateHelper.class);
when(updateHelper.prepare(any(), eq(shard), any())).thenReturn(
new UpdateHelper.Result(
updateResponse,
randomBoolean() ? DocWriteResponse.Result.CREATED : DocWriteResponse.Result.UPDATED,
Collections.singletonMap("field", "value"),
Requests.INDEX_CONTENT_TYPE
)
);

BulkItemRequest[] items = new BulkItemRequest[] { primaryRequest };
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);

final CountDownLatch latch = new CountDownLatch(1);
Runnable runnable = () -> TransportShardBulkAction.performOnPrimary(
bulkShardRequest,
shard,
updateHelper,
threadPool::absoluteTimeInMillis,
new NoopMappingUpdatePerformer(),
listener -> listener.onResponse(null),
new LatchedActionListener<>(
ActionTestUtils.assertNoFailureListener(
result -> assertEquals(
VersionConflictEngineException.class,
result.replicaRequest().items()[0].getPrimaryResponse().getFailure().getCause().getClass()
)
),
latch
),
threadPool,
Names.WRITE
);

// execute the runnable on a separate thread so that the infinite loop can be detected
Thread thread = new Thread(runnable);
thread.start();

// timeout the request in 10 seconds if there is an infinite loop
assertTrue(latch.await(10, TimeUnit.SECONDS));
assertEquals(items[0].getPrimaryResponse().getFailure().getCause().getClass(), VersionConflictEngineException.class);
}

public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception {
TestThreadPool rejectingThreadPool = new TestThreadPool(
"TransportShardBulkActionTests#testForceExecutionOnRejectionAfterMappingUpdate",
Expand Down

0 comments on commit 1c12252

Please sign in to comment.