Skip to content

Commit

Permalink
Only throw if mapping version hasn't changed
Browse files Browse the repository at this point in the history
Also reset noop mapping update count when doing a retry for another reason
  • Loading branch information
felixbarny committed Nov 27, 2023
1 parent a0a0006 commit b3c8fda
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ enum ItemProcessingState {
private DocWriteRequest<?> requestToExecute;
private BulkItemResponse executionResult;
private int updateRetryCounter;
private boolean noopMappingUpdateRetry;
private long noopMappingUpdateRetryForMappingVersion;

BulkPrimaryExecutionContext(BulkShardRequest request, IndexShard primary) {
this.request = request;
Expand Down Expand Up @@ -90,7 +90,7 @@ private void advance() {
updateRetryCounter = 0;
requestToExecute = null;
executionResult = null;
noopMappingUpdateRetry = false;
noopMappingUpdateRetryForMappingVersion = -1;
assert assertInvariants(ItemProcessingState.INITIAL);
}

Expand Down Expand Up @@ -193,24 +193,38 @@ public void resetForMappingUpdateRetry() {
resetForExecutionRetry();
}

public void resetForNoopMappingUpdateRetry() {
/**
* Don't bother the master node if the mapping update is a noop.
* This may happen if there was a concurrent mapping update that added the same field.
*
* @param mappingVersion the current mapping version. This is used to guard against infinite loops.
* @throws IllegalStateException if retried multiple times with the same mapping version, to guard against infinite loops.
*/
public void resetForNoopMappingUpdateRetry(long mappingVersion) {
assert assertInvariants(ItemProcessingState.TRANSLATED);
if (noopMappingUpdateRetry) {
if (noopMappingUpdateRetryForMappingVersion == mappingVersion) {
// seems like we're in a live lock/infinite loop here
// we've already re-tried and are about to retry again
// as no state has changed in the meantime (the mapping version is still the same),
// we can't expect another retry would yield a different result
// a possible cause:
// maybe we added more dynamic mappers in DocumentParserContext.addDynamicMapper than possible according to the field limit
// the additional fields are then ignored by the mapping merge and the process repeats
throw new IllegalStateException(
"On retry, this indexing request resulted in another noop mapping update. "
+ "Failing the indexing operation to prevent an infinite retry loop."
);
}
noopMappingUpdateRetry = true;
resetForExecutionRetry();
noopMappingUpdateRetryForMappingVersion = mappingVersion;
}

/** resets the current item state, prepare for a new execution */
private void resetForExecutionRetry() {
currentItemState = ItemProcessingState.INITIAL;
requestToExecute = null;
executionResult = null;
noopMappingUpdateRetryForMappingVersion = -1;
assert assertInvariants(ItemProcessingState.INITIAL);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,21 +384,7 @@ static boolean executeBulkItemRequest(
.map(DocumentMapper::mappingSource);

if (mergedSource.equals(previousSource)) {
// don't bother the master node if the mapping update is a noop
// this may happen if there was a concurrent mapping update that added the same field
try {
context.resetForNoopMappingUpdateRetry();
} catch (IllegalStateException e) {
// TODO remove, this is to find out why the build fails
throw new IllegalStateException(
e.getMessage()
+ " mapping update: "
+ result.getRequiredMappingUpdate()
+ " mapping: "
+ mergedSource.orElse(null)
);
}

context.resetForNoopMappingUpdateRetry(primary.mapperService().mappingVersion());
return true;
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ public boolean isAutoUpdate() {
private final Supplier<MappingParserContext> mappingParserContextSupplier;

private volatile DocumentMapper mapper;
private volatile long mappingVersion;

public MapperService(
ClusterService clusterService,
Expand Down Expand Up @@ -317,6 +318,7 @@ public void updateMapping(final IndexMetadata currentIndexMetadata, final IndexM
previousMapper = this.mapper;
assert assertRefreshIsNotNeeded(previousMapper, type, incomingMapping);
this.mapper = newDocumentMapper(incomingMapping, MergeReason.MAPPING_RECOVERY, incomingMappingSource);
this.mappingVersion = newIndexMetadata.getMappingVersion();
}
String op = previousMapper != null ? "updated" : "added";
if (logger.isDebugEnabled() && incomingMappingSource.compressed().length < 512) {
Expand Down Expand Up @@ -622,6 +624,10 @@ public DocumentMapper documentMapper() {
return mapper;
}

public long mappingVersion() {
return mappingVersion;
}

/**
* Returns {@code true} if the given {@code mappingSource} includes a type
* as a top-level object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -883,9 +883,14 @@ public void testRetries() throws Exception {
});
when(shard.indexSettings()).thenReturn(indexSettings);
when(shard.shardId()).thenReturn(shardId);
when(shard.mapperService()).thenReturn(mock(MapperService.class));
MapperService mapperService = mock(MapperService.class);
when(shard.mapperService()).thenReturn(mapperService);
when(shard.getBulkOperationListener()).thenReturn(mock(ShardBulkStats.class));

DocumentMapper mergedDocMapper = mock(DocumentMapper.class);
when(mergedDocMapper.mappingSource()).thenReturn(CompressedXContent.fromJSON("{}"));
when(mapperService.merge(any(), any(CompressedXContent.class), any())).thenReturn(mergedDocMapper);

UpdateHelper updateHelper = mock(UpdateHelper.class);
when(updateHelper.prepare(any(), eq(shard), any())).thenReturn(
new UpdateHelper.Result(
Expand Down

0 comments on commit b3c8fda

Please sign in to comment.