Skip to content

Commit

Permalink
Remove immediate operation retry after mapping update (#38873)
Browse files Browse the repository at this point in the history
Prior to this commit, when an indexing operation resulted in an
`Engine.Result.Type.MAPPING_UPDATE_REQUIRED`, TransportShardBulkAction
immediately retries the indexing operation to see if it succeeds. In the event
that it succeeds the context does not wait until the mapping update has
propagated through the cluster state before finishing the indexing.

In some of our tests we rely on mappings being available as soon as they've been
introduced in a document that indexed correctly. By removing the immediate retry
we always wait for this to be the case.

Resolves #38428
Supercedes #38579
Relates to #38711
  • Loading branch information
dakrone committed Feb 14, 2019
1 parent d9c255d commit 0c733c0
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -483,26 +483,16 @@ private static <T extends Engine.Result> void executeOnPrimaryWhileHandlingMappi
throws IOException {
T result = toExecute.get();
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
// try to update the mappings and try again.
// try to update the mappings and mark the context as needing to try again.
try {
mappingUpdater.accept(result.getRequiredMappingUpdate());
context.markAsRequiringMappingUpdate();
} catch (Exception e) {
// failure to update the mapping should translate to a failure of specific requests. Other requests
// still need to be executed and replicated.
onComplete.accept(exceptionToResult.apply(e));
return;
}

// TODO - we can fall back to a wait for cluster state update but I'm keeping the logic the same for now
result = toExecute.get();

if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
// double mapping update. We assume that the successful mapping update wasn't yet processed on the node
// and retry the entire request again.
context.markAsRequiringMappingUpdate();
} else {
onComplete.accept(result);
}
} else {
onComplete.accept(result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,8 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception {

assertThat("mappings were \"updated\" once", updateCalled.get(), equalTo(1));

// Verify that the shard "executed" the operation twice
verify(shard, times(2)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean());
// Verify that the shard "executed" the operation once
verify(shard, times(1)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean());

when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()))
.thenReturn(success);
Expand All @@ -295,9 +295,9 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception {
(update, shardId, type) -> fail("should not have had to update the mappings"), () -> {});


// Verify that the shard "executed" the operation only once (2 for previous invocations plus
// Verify that the shard "executed" the operation only once (1 for previous invocations plus
// 1 for this execution)
verify(shard, times(3)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean());
verify(shard, times(2)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean());


BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse();
Expand Down

0 comments on commit 0c733c0

Please sign in to comment.