Skip to content

Commit

Permalink
Short-Circuit NOOP Mapping Updates Earlier (elastic#77574) (elastic#7…
Browse files Browse the repository at this point in the history
…8593)

No need to actually run noop mapping updates or create a new document mapper
if nothing has changed.
  • Loading branch information
original-brownbear authored Oct 4, 2021
1 parent 31aa229 commit 69e5452
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
package org.elasticsearch.action.admin.indices.mapping.put;

import org.elasticsearch.cluster.ack.IndicesClusterStateUpdateRequest;
import org.elasticsearch.common.compress.CompressedXContent;

import java.io.IOException;

/**
* Cluster state update request that allows to put a mapping
Expand All @@ -17,7 +20,7 @@ public class PutMappingClusterStateUpdateRequest extends IndicesClusterStateUpda

private String type;

private String source;
private CompressedXContent source;

public PutMappingClusterStateUpdateRequest() {

Expand All @@ -32,12 +35,16 @@ public PutMappingClusterStateUpdateRequest type(String type) {
return this;
}

public String source() {
public PutMappingClusterStateUpdateRequest(String source) throws IOException {
this.source = new CompressedXContent(source);
}

public CompressedXContent source() {
return source;
}

public PutMappingClusterStateUpdateRequest source(String source) {
this.source = source;
public PutMappingClusterStateUpdateRequest source(String source) throws IOException {
this.source = new CompressedXContent(source);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -125,16 +126,22 @@ static void performMappingUpdate(Index[] concreteIndices,
PutMappingRequest request,
ActionListener<AcknowledgedResponse> listener,
MetadataMappingService metadataMappingService) {
PutMappingClusterStateUpdateRequest updateRequest = new PutMappingClusterStateUpdateRequest()
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
.indices(concreteIndices).type(request.type())
.source(request.source());

metadataMappingService.putMapping(updateRequest, listener.delegateResponse((l, e) -> {
final ActionListener<AcknowledgedResponse> wrappedListener = listener.delegateResponse((l, e) -> {
logger.debug(() -> new ParameterizedMessage("failed to put mappings on indices [{}]",
Arrays.asList(concreteIndices)), e);
l.onFailure(e);
}));
});
final PutMappingClusterStateUpdateRequest updateRequest;
try {
updateRequest = new PutMappingClusterStateUpdateRequest(request.source())
.indices(concreteIndices).type(request.type())
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout());
} catch (IOException e) {
wrappedListener.onFailure(e);
return;
}

metadataMappingService.putMapping(updateRequest, wrappedListener);
}

static String checkForSystemIndexViolations(SystemIndices systemIndices, Index[] concreteIndices, PutMappingRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InvalidTypeNameException;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand Down Expand Up @@ -92,22 +91,23 @@ class PutMappingExecutor implements ClusterStateTaskExecutor<PutMappingClusterSt
}

private ClusterState applyRequest(ClusterState currentState, PutMappingClusterStateUpdateRequest request,
Map<Index, MapperService> indexMapperServices) throws IOException {
Map<Index, MapperService> indexMapperServices) {
final CompressedXContent mappingUpdateSource = request.source();
String mappingType = request.type();
CompressedXContent mappingUpdateSource = new CompressedXContent(request.source());
final Metadata metadata = currentState.metadata();
final List<IndexMetadata> updateList = new ArrayList<>();
for (Index index : request.indices()) {
MapperService mapperService = indexMapperServices.get(index);
// IMPORTANT: always get the metadata from the state since it get's batched
// and if we pull it from the indexService we might miss an update etc.
final IndexMetadata indexMetadata = currentState.getMetadata().getIndexSafe(index);

DocumentMapper existingMapper = mapperService.documentMapper();
if (existingMapper != null && existingMapper.mappingSource().equals(mappingUpdateSource)) {
continue;
}
// this is paranoia... just to be sure we use the exact same metadata tuple on the update that
// we used for the validation, it makes this mechanism little less scary (a little)
updateList.add(indexMetadata);
// try and parse it (no need to add it here) so we can bail early in case of parsing exception
DocumentMapper existingMapper = mapperService.documentMapper();

String typeForUpdate = mapperService.getTypeForUpdate(mappingType, mappingUpdateSource);
if (existingMapper != null && existingMapper.type().equals(typeForUpdate) == false) {
Expand Down Expand Up @@ -213,6 +213,32 @@ public String describeTasks(List<PutMappingClusterStateUpdateRequest> tasks) {
}

public void putMapping(final PutMappingClusterStateUpdateRequest request, final ActionListener<AcknowledgedResponse> listener) {
final Metadata metadata = clusterService.state().metadata();
boolean noop = true;
for (Index index : request.indices()) {
final IndexMetadata indexMetadata = metadata.index(index);
if (indexMetadata == null) {
// local store recovery sends a mapping update request during application of a cluster state on t he data node which
// might we receive here before the CS update that created the index has been applied on all nodes and thus the index
// isn't found in the state yet but will be visible to the CS update below
noop = false;
break;
}
final MappingMetadata mappingMetadata = indexMetadata.mapping();
if (mappingMetadata == null) {
noop = false;
break;
}
if (request.source().equals(mappingMetadata.source()) == false) {
noop = false;
break;
}
}
if (noop) {
listener.onResponse(AcknowledgedResponse.TRUE);
return;
}

clusterService.submitStateUpdateTask("put-mapping " + Strings.arrayToCommaDelimitedString(request.indices()),
request,
ClusterStateTaskConfig.build(Priority.HIGH, request.masterNodeTimeout()),
Expand Down

0 comments on commit 69e5452

Please sign in to comment.