From 23317510160d733c62f82f63063b0825a1dab1ec Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 13 Feb 2019 15:25:09 -0700 Subject: [PATCH] Remove immediate operation retry after mapping update Prior to this commit, when an indexing operation resulted in an `Engine.Result.Type.MAPPING_UPDATE_REQUIRED`, TransportShardBulkAction immediately retries the indexing operation to see if it succeeds. In the event that it succeeds the context does not wait until the mapping update has propagated through the cluster state before finishing the indexing. In some of our tests we rely on mappings being available as soon as they've been introduced in a document that indexed correctly. By removing the immediate retry we always wait for this to be the case. Resolves #38428 Supercedes #38579 Relates to #38711 --- .../action/bulk/TransportShardBulkAction.java | 14 ++------------ .../action/bulk/TransportShardBulkActionTests.java | 8 ++++---- 2 files changed, 6 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 4df8efa6b2743..3f4a0b6042a4c 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -483,26 +483,16 @@ private static void executeOnPrimaryWhileHandlingMappi throws IOException { T result = toExecute.get(); if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { - // try to update the mappings and try again. + // try to update the mappings and mark the context as needing to try again. try { mappingUpdater.accept(result.getRequiredMappingUpdate()); + context.markAsRequiringMappingUpdate(); } catch (Exception e) { // failure to update the mapping should translate to a failure of specific requests. Other requests // still need to be executed and replicated. onComplete.accept(exceptionToResult.apply(e)); return; } - - // TODO - we can fall back to a wait for cluster state update but I'm keeping the logic the same for now - result = toExecute.get(); - - if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { - // double mapping update. We assume that the successful mapping update wasn't yet processed on the node - // and retry the entire request again. - context.markAsRequiringMappingUpdate(); - } else { - onComplete.accept(result); - } } else { onComplete.accept(result); } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java index 37e82884c5133..aa4a4b070e40c 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java @@ -285,8 +285,8 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { assertThat("mappings were \"updated\" once", updateCalled.get(), equalTo(1)); - // Verify that the shard "executed" the operation twice - verify(shard, times(2)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()); + // Verify that the shard "executed" the operation once + verify(shard, times(1)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()); when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean())) .thenReturn(success); @@ -295,9 +295,9 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { (update, shardId, type) -> fail("should not have had to update the mappings"), () -> {}); - // Verify that the shard "executed" the operation only once (2 for previous invocations plus + // Verify that the shard "executed" the operation only once (1 for previous invocations plus // 1 for this execution) - verify(shard, times(3)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()); + verify(shard, times(2)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()); BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse();