diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
index 4df8efa6b2743..3f4a0b6042a4c 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
@@ -483,26 +483,16 @@ private static void executeOnPrimaryWhileHandlingMappi
         throws IOException {
         T result = toExecute.get();
         if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
-            // try to update the mappings and try again.
+            // try to update the mappings and mark the context as needing to try again.
             try {
                 mappingUpdater.accept(result.getRequiredMappingUpdate());
+                context.markAsRequiringMappingUpdate();
             } catch (Exception e) {
                 // failure to update the mapping should translate to a failure of specific requests. Other requests
                 // still need to be executed and replicated.
                 onComplete.accept(exceptionToResult.apply(e));
                 return;
             }
-
-            // TODO - we can fall back to a wait for cluster state update but I'm keeping the logic the same for now
-            result = toExecute.get();
-
-            if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
-                // double mapping update. We assume that the successful mapping update wasn't yet processed on the node
-                // and retry the entire request again.
-                context.markAsRequiringMappingUpdate();
-            } else {
-                onComplete.accept(result);
-            }
         } else {
             onComplete.accept(result);
         }
diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java
index 37e82884c5133..aa4a4b070e40c 100644
--- a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java
+++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java
@@ -285,8 +285,8 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception {
 
         assertThat("mappings were \"updated\" once", updateCalled.get(), equalTo(1));
 
-        // Verify that the shard "executed" the operation twice
-        verify(shard, times(2)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean());
+        // Verify that the shard "executed" the operation once
+        verify(shard, times(1)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean());
 
         when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()))
             .thenReturn(success);
@@ -295,9 +295,9 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception {
             (update, shardId, type) -> fail("should not have had to update the mappings"),
             () -> {});
 
-        // Verify that the shard "executed" the operation only once (2 for previous invocations plus
+        // Verify that the shard "executed" the operation only once (1 for previous invocations plus
         // 1 for this execution)
-        verify(shard, times(3)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean());
+        verify(shard, times(2)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean());
 
         BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse();
 
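Not part of the patch: below is a minimal, self-contained sketch of the control flow the first hunk leaves in place, for readers who want to run it in isolation. `Result`, `Context`, `executeOnce`, and the functional-interface shapes are simplified stand-ins rather than the real Elasticsearch types (the production method is generic over engine results and works with Elasticsearch's own bulk execution context and supplier/consumer types); only the `markAsRequiringMappingUpdate()` call mirrors the diff. The point it illustrates is that after this change a mapping-update result no longer triggers an immediate second attempt on the same thread; the context is flagged and the caller retries the whole item later.

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class MappingUpdateFlowSketch {

    enum ResultType { SUCCESS, FAILURE, MAPPING_UPDATE_REQUIRED }

    // Hypothetical stand-in for an engine result: just the type plus the required mapping update.
    record Result(ResultType type, String requiredMappingUpdate) {}

    // Hypothetical stand-in for the bulk execution context: only the retry flag matters here.
    static class Context {
        boolean requiresMappingUpdate;

        void markAsRequiringMappingUpdate() {
            requiresMappingUpdate = true;
        }
    }

    /**
     * Single engine attempt. If it reports MAPPING_UPDATE_REQUIRED, push the mapping update and
     * mark the context so the caller retries the whole item later, instead of immediately
     * re-executing the operation as the removed code did.
     */
    static void executeOnce(Context context,
                            Supplier<Result> toExecute,
                            Function<Exception, Result> exceptionToResult,
                            Consumer<String> mappingUpdater,
                            Consumer<Result> onComplete) {
        Result result = toExecute.get();
        if (result.type() == ResultType.MAPPING_UPDATE_REQUIRED) {
            try {
                mappingUpdater.accept(result.requiredMappingUpdate());
                context.markAsRequiringMappingUpdate(); // defer the retry to the caller
            } catch (Exception e) {
                // a failed mapping update fails only this item; other items still execute and replicate
                onComplete.accept(exceptionToResult.apply(e));
            }
        } else {
            onComplete.accept(result);
        }
    }

    public static void main(String[] args) {
        Context context = new Context();
        executeOnce(context,
            () -> new Result(ResultType.MAPPING_UPDATE_REQUIRED, "{\"properties\":{}}"),
            e -> new Result(ResultType.FAILURE, null),
            update -> System.out.println("sending mapping update: " + update),
            r -> System.out.println("completed with " + r.type()));
        // Prints true: the operation was attempted exactly once and flagged for a later retry,
        // which is also what the adjusted test now verifies with times(1) / times(2).
        System.out.println("needs retry after mapping update: " + context.requiresMappingUpdate);
    }
}
```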