Skip to content

Commit

Permalink
Atomic mapping updates across types (elastic#22220)
Browse files Browse the repository at this point in the history
This commit makes mapping updates atomic when multiple types in an index are updated. Mappings for an index are now applied in a single atomic operation, which also allows to optimize some of the cross-type updates and checks.
  • Loading branch information
ywelsch authored Dec 19, 2016
1 parent 1cabf66 commit 63af03a
Show file tree
Hide file tree
Showing 17 changed files with 251 additions and 219 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.MapperService.MergeReason;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.indices.IndexCreationException;
import org.elasticsearch.indices.IndicesService;
Expand Down Expand Up @@ -356,10 +357,10 @@ public ClusterState execute(ClusterState currentState) throws Exception {
// now add the mappings
MapperService mapperService = indexService.mapperService();
try {
mapperService.merge(mappings, request.updateAllTypes());
} catch (MapperParsingException mpe) {
mapperService.merge(mappings, MergeReason.MAPPING_UPDATE, request.updateAllTypes());
} catch (Exception e) {
removalExtraInfo = "failed on parsing default mapping/mappings on index creation";
throw mpe;
throw e;
}

// the context is only used for validation so it's fine to pass fake values for the shard id and the current
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,15 +141,11 @@ ClusterState innerExecute(ClusterState currentState, Iterable<AliasAction> actio
// temporarily create the index and add mappings so we can parse the filter
try {
indexService = indicesService.createIndex(index, emptyList(), shardId -> {});
indicesToClose.add(index.getIndex());
} catch (IOException e) {
throw new ElasticsearchException("Failed to create temporary index for parsing the alias", e);
}
for (ObjectCursor<MappingMetaData> cursor : index.getMappings().values()) {
MappingMetaData mappingMetaData = cursor.value;
indexService.mapperService().merge(mappingMetaData.type(), mappingMetaData.source(),
MapperService.MergeReason.MAPPING_RECOVERY, false);
}
indicesToClose.add(index.getIndex());
indexService.mapperService().merge(index, MapperService.MergeReason.MAPPING_RECOVERY, false);
}
indices.put(action.getIndex(), indexService);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.MapperService.MergeReason;
import org.elasticsearch.indices.IndexTemplateMissingException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InvalidIndexTemplateException;
Expand Down Expand Up @@ -222,7 +223,7 @@ private static void validateAndAddTemplate(final PutRequest request, IndexTempla
mappingsForValidation.put(entry.getKey(), MapperService.parseMapping(entry.getValue()));
}

dummyIndexService.mapperService().merge(mappingsForValidation, false);
dummyIndexService.mapperService().merge(mappingsForValidation, MergeReason.MAPPING_UPDATE, false);

} finally {
if (createdIndex != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,7 @@ public Set<Entry<String, NamedAnalyzer>> entrySet() {
};
try (IndexAnalyzers fakeIndexAnalzyers = new IndexAnalyzers(indexSettings, fakeDefault, fakeDefault, fakeDefault, analyzerMap)) {
MapperService mapperService = new MapperService(indexSettings, fakeIndexAnalzyers, similarityService, mapperRegistry, () -> null);
for (ObjectCursor<MappingMetaData> cursor : indexMetaData.getMappings().values()) {
MappingMetaData mappingMetaData = cursor.value;
mapperService.merge(mappingMetaData.type(), mappingMetaData.source(), MapperService.MergeReason.MAPPING_RECOVERY, false);
}
mapperService.merge(indexMetaData, MapperService.MergeReason.MAPPING_RECOVERY, false);
}
} catch (Exception ex) {
// Wrap the inner exception so we have the index name in the exception message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.MapperService.MergeReason;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InvalidTypeNameException;

Expand Down Expand Up @@ -146,10 +147,7 @@ ClusterState executeRefresh(final ClusterState currentState, final List<RefreshT
// we need to create the index here, and add the current mapping to it, so we can merge
indexService = indicesService.createIndex(indexMetaData, Collections.emptyList(), shardId -> {});
removeIndex = true;
for (ObjectCursor<MappingMetaData> metaData : indexMetaData.getMappings().values()) {
// don't apply the default mapping, it has been applied when the mapping was created
indexService.mapperService().merge(metaData.value.type(), metaData.value.source(), MapperService.MergeReason.MAPPING_RECOVERY, true);
}
indexService.mapperService().merge(indexMetaData, MergeReason.MAPPING_RECOVERY, true);
}

IndexMetaData.Builder builder = IndexMetaData.builder(indexMetaData);
Expand Down Expand Up @@ -226,10 +224,7 @@ public BatchResult<PutMappingClusterStateUpdateRequest> execute(ClusterState cur
MapperService mapperService = indicesService.createIndexMapperService(indexMetaData);
indexMapperServices.put(index, mapperService);
// add mappings for all types, we need them for cross-type validation
for (ObjectCursor<MappingMetaData> mapping : indexMetaData.getMappings().values()) {
mapperService.merge(mapping.value.type(), mapping.value.source(),
MapperService.MergeReason.MAPPING_RECOVERY, request.updateAllTypes());
}
mapperService.merge(indexMetaData, MergeReason.MAPPING_RECOVERY, request.updateAllTypes());
}
}
currentState = applyRequest(currentState, request, indexMapperServices);
Expand Down Expand Up @@ -313,7 +308,7 @@ private ClusterState applyRequest(ClusterState currentState, PutMappingClusterSt
if (existingMapper != null) {
existingSource = existingMapper.mappingSource();
}
DocumentMapper mergedMapper = mapperService.merge(mappingType, mappingUpdateSource, MapperService.MergeReason.MAPPING_UPDATE, request.updateAllTypes());
DocumentMapper mergedMapper = mapperService.merge(mappingType, mappingUpdateSource, MergeReason.MAPPING_UPDATE, request.updateAllTypes());
CompressedXContent updatedSource = mergedMapper.mappingSource();

if (existingSource != null) {
Expand Down
Loading

0 comments on commit 63af03a

Please sign in to comment.