From 69e5452d9f03df6c55b2b5e1f9d8195a134ec14b Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 4 Oct 2021 09:07:47 +0200 Subject: [PATCH] Short-Circuit NOOP Mapping Updates Earlier (#77574) (#78593) No need to actually run noop mapping updates or create a new document mapper if nothing has changed. --- .../PutMappingClusterStateUpdateRequest.java | 15 ++++++-- .../put/TransportPutMappingAction.java | 21 ++++++---- .../metadata/MetadataMappingService.java | 38 ++++++++++++++++--- 3 files changed, 57 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingClusterStateUpdateRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingClusterStateUpdateRequest.java index 61a7091518327..d39cee84d33b7 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingClusterStateUpdateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingClusterStateUpdateRequest.java @@ -9,6 +9,9 @@ package org.elasticsearch.action.admin.indices.mapping.put; import org.elasticsearch.cluster.ack.IndicesClusterStateUpdateRequest; +import org.elasticsearch.common.compress.CompressedXContent; + +import java.io.IOException; /** * Cluster state update request that allows to put a mapping @@ -17,7 +20,7 @@ public class PutMappingClusterStateUpdateRequest extends IndicesClusterStateUpda private String type; - private String source; + private CompressedXContent source; public PutMappingClusterStateUpdateRequest() { @@ -32,12 +35,16 @@ public PutMappingClusterStateUpdateRequest type(String type) { return this; } - public String source() { + public PutMappingClusterStateUpdateRequest(String source) throws IOException { + this.source = new CompressedXContent(source); + } + + public CompressedXContent source() { return source; } - public PutMappingClusterStateUpdateRequest source(String source) { - this.source = source; + public PutMappingClusterStateUpdateRequest source(String source) throws IOException { + this.source = new CompressedXContent(source); return this; } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java index f70e4b46bdeb4..13213a2eb1598 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java @@ -33,6 +33,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -125,16 +126,22 @@ static void performMappingUpdate(Index[] concreteIndices, PutMappingRequest request, ActionListener listener, MetadataMappingService metadataMappingService) { - PutMappingClusterStateUpdateRequest updateRequest = new PutMappingClusterStateUpdateRequest() - .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()) - .indices(concreteIndices).type(request.type()) - .source(request.source()); - - metadataMappingService.putMapping(updateRequest, listener.delegateResponse((l, e) -> { + final ActionListener wrappedListener = listener.delegateResponse((l, e) -> { logger.debug(() -> new ParameterizedMessage("failed to put mappings on indices [{}]", Arrays.asList(concreteIndices)), e); l.onFailure(e); - })); + }); + final PutMappingClusterStateUpdateRequest updateRequest; + try { + updateRequest = new PutMappingClusterStateUpdateRequest(request.source()) + .indices(concreteIndices).type(request.type()) + .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()); + } catch (IOException e) { + wrappedListener.onFailure(e); + return; + } + + metadataMappingService.putMapping(updateRequest, wrappedListener); } static String checkForSystemIndexViolations(SystemIndices systemIndices, Index[] concreteIndices, PutMappingRequest request) { diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMappingService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMappingService.java index 215b8ced51b9e..f27e9a945b71f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMappingService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMappingService.java @@ -34,7 +34,6 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.InvalidTypeNameException; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -92,9 +91,9 @@ class PutMappingExecutor implements ClusterStateTaskExecutor indexMapperServices) throws IOException { + Map indexMapperServices) { + final CompressedXContent mappingUpdateSource = request.source(); String mappingType = request.type(); - CompressedXContent mappingUpdateSource = new CompressedXContent(request.source()); final Metadata metadata = currentState.metadata(); final List updateList = new ArrayList<>(); for (Index index : request.indices()) { @@ -102,12 +101,13 @@ private ClusterState applyRequest(ClusterState currentState, PutMappingClusterSt // IMPORTANT: always get the metadata from the state since it get's batched // and if we pull it from the indexService we might miss an update etc. final IndexMetadata indexMetadata = currentState.getMetadata().getIndexSafe(index); - + DocumentMapper existingMapper = mapperService.documentMapper(); + if (existingMapper != null && existingMapper.mappingSource().equals(mappingUpdateSource)) { + continue; + } // this is paranoia... just to be sure we use the exact same metadata tuple on the update that // we used for the validation, it makes this mechanism little less scary (a little) updateList.add(indexMetadata); - // try and parse it (no need to add it here) so we can bail early in case of parsing exception - DocumentMapper existingMapper = mapperService.documentMapper(); String typeForUpdate = mapperService.getTypeForUpdate(mappingType, mappingUpdateSource); if (existingMapper != null && existingMapper.type().equals(typeForUpdate) == false) { @@ -213,6 +213,32 @@ public String describeTasks(List tasks) { } public void putMapping(final PutMappingClusterStateUpdateRequest request, final ActionListener listener) { + final Metadata metadata = clusterService.state().metadata(); + boolean noop = true; + for (Index index : request.indices()) { + final IndexMetadata indexMetadata = metadata.index(index); + if (indexMetadata == null) { + // local store recovery sends a mapping update request during application of a cluster state on t he data node which + // might we receive here before the CS update that created the index has been applied on all nodes and thus the index + // isn't found in the state yet but will be visible to the CS update below + noop = false; + break; + } + final MappingMetadata mappingMetadata = indexMetadata.mapping(); + if (mappingMetadata == null) { + noop = false; + break; + } + if (request.source().equals(mappingMetadata.source()) == false) { + noop = false; + break; + } + } + if (noop) { + listener.onResponse(AcknowledgedResponse.TRUE); + return; + } + clusterService.submitStateUpdateTask("put-mapping " + Strings.arrayToCommaDelimitedString(request.indices()), request, ClusterStateTaskConfig.build(Priority.HIGH, request.masterNodeTimeout()),