From b3c8fdabdd7ca87310f1e256fa06b3a5fed5eef0 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Mon, 27 Nov 2023 11:29:25 +0100 Subject: [PATCH] Only throw if mapping version hasn't changed Also reset noop mapping update count when doing a retry for another reason --- .../bulk/BulkPrimaryExecutionContext.java | 24 +++++++++++++++---- .../action/bulk/TransportShardBulkAction.java | 16 +------------ .../index/mapper/MapperService.java | 6 +++++ .../bulk/TransportShardBulkActionTests.java | 7 +++++- 4 files changed, 32 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java index 57b24621a3eb1..7320348a5c8f7 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java @@ -61,7 +61,7 @@ enum ItemProcessingState { private DocWriteRequest requestToExecute; private BulkItemResponse executionResult; private int updateRetryCounter; - private boolean noopMappingUpdateRetry; + private long noopMappingUpdateRetryForMappingVersion; BulkPrimaryExecutionContext(BulkShardRequest request, IndexShard primary) { this.request = request; @@ -90,7 +90,7 @@ private void advance() { updateRetryCounter = 0; requestToExecute = null; executionResult = null; - noopMappingUpdateRetry = false; + noopMappingUpdateRetryForMappingVersion = -1; assert assertInvariants(ItemProcessingState.INITIAL); } @@ -193,17 +193,30 @@ public void resetForMappingUpdateRetry() { resetForExecutionRetry(); } - public void resetForNoopMappingUpdateRetry() { + /** + * Don't bother the master node if the mapping update is a noop. + * This may happen if there was a concurrent mapping update that added the same field. + * + * @param mappingVersion the current mapping version. This is used to guard against infinite loops. + * @throws IllegalStateException if retried multiple times with the same mapping version, to guard against infinite loops. + */ + public void resetForNoopMappingUpdateRetry(long mappingVersion) { assert assertInvariants(ItemProcessingState.TRANSLATED); - if (noopMappingUpdateRetry) { + if (noopMappingUpdateRetryForMappingVersion == mappingVersion) { + // seems like we're in a live lock/infinite loop here + // we've already re-tried and are about to retry again + // as no state has changed in the meantime (the mapping version is still the same), + // we can't expect another retry would yield a different result + // a possible cause: // maybe we added more dynamic mappers in DocumentParserContext.addDynamicMapper than possible according to the field limit + // the additional fields are then ignored by the mapping merge and the process repeats throw new IllegalStateException( "On retry, this indexing request resulted in another noop mapping update. " + "Failing the indexing operation to prevent an infinite retry loop." ); } - noopMappingUpdateRetry = true; resetForExecutionRetry(); + noopMappingUpdateRetryForMappingVersion = mappingVersion; } /** resets the current item state, prepare for a new execution */ @@ -211,6 +224,7 @@ private void resetForExecutionRetry() { currentItemState = ItemProcessingState.INITIAL; requestToExecute = null; executionResult = null; + noopMappingUpdateRetryForMappingVersion = -1; assert assertInvariants(ItemProcessingState.INITIAL); } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 83a3d82338afe..d7cb41f0c0c04 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -384,21 +384,7 @@ static boolean executeBulkItemRequest( .map(DocumentMapper::mappingSource); if (mergedSource.equals(previousSource)) { - // don't bother the master node if the mapping update is a noop - // this may happen if there was a concurrent mapping update that added the same field - try { - context.resetForNoopMappingUpdateRetry(); - } catch (IllegalStateException e) { - // TODO remove, this is to find out why the build fails - throw new IllegalStateException( - e.getMessage() - + " mapping update: " - + result.getRequiredMappingUpdate() - + " mapping: " - + mergedSource.orElse(null) - ); - } - + context.resetForNoopMappingUpdateRetry(primary.mapperService().mappingVersion()); return true; } } catch (Exception e) { diff --git a/server/src/main/java/org/elasticsearch/index/mapper/MapperService.java b/server/src/main/java/org/elasticsearch/index/mapper/MapperService.java index 859e29100f4d2..f94ebb50f6fec 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/MapperService.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/MapperService.java @@ -149,6 +149,7 @@ public boolean isAutoUpdate() { private final Supplier mappingParserContextSupplier; private volatile DocumentMapper mapper; + private volatile long mappingVersion; public MapperService( ClusterService clusterService, @@ -317,6 +318,7 @@ public void updateMapping(final IndexMetadata currentIndexMetadata, final IndexM previousMapper = this.mapper; assert assertRefreshIsNotNeeded(previousMapper, type, incomingMapping); this.mapper = newDocumentMapper(incomingMapping, MergeReason.MAPPING_RECOVERY, incomingMappingSource); + this.mappingVersion = newIndexMetadata.getMappingVersion(); } String op = previousMapper != null ? "updated" : "added"; if (logger.isDebugEnabled() && incomingMappingSource.compressed().length < 512) { @@ -622,6 +624,10 @@ public DocumentMapper documentMapper() { return mapper; } + public long mappingVersion() { + return mappingVersion; + } + /** * Returns {@code true} if the given {@code mappingSource} includes a type * as a top-level object. diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java index e86d6739db478..267dfdf141bb2 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java @@ -883,9 +883,14 @@ public void testRetries() throws Exception { }); when(shard.indexSettings()).thenReturn(indexSettings); when(shard.shardId()).thenReturn(shardId); - when(shard.mapperService()).thenReturn(mock(MapperService.class)); + MapperService mapperService = mock(MapperService.class); + when(shard.mapperService()).thenReturn(mapperService); when(shard.getBulkOperationListener()).thenReturn(mock(ShardBulkStats.class)); + DocumentMapper mergedDocMapper = mock(DocumentMapper.class); + when(mergedDocMapper.mappingSource()).thenReturn(CompressedXContent.fromJSON("{}")); + when(mapperService.merge(any(), any(CompressedXContent.class), any())).thenReturn(mergedDocMapper); + UpdateHelper updateHelper = mock(UpdateHelper.class); when(updateHelper.prepare(any(), eq(shard), any())).thenReturn( new UpdateHelper.Result(