Replace some DocumentMapper usages with MappingLookup (#72400)
We recently replaced some usages of DocumentMapper with MappingLookup in the search layer, as DocumentMapper is mutable, which can cause issues. In order to do that, MappingLookup grew and became quite similar to DocumentMapper in what it does and holds.

In many cases it makes sense to use MappingLookup instead of DocumentMapper, and we may even be able to remove DocumentMapper entirely in favour of MappingLookup in the long run.

This commit replaces some of the straightforward DocumentMapper usages.
javanna authored May 3, 2021
1 parent 9c2759d commit b92b9d1
Showing 19 changed files with 125 additions and 136 deletions.
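Taken as a whole, the diff swaps the mutable DocumentMapper entry points for their MappingLookup equivalents. A minimal before/after sketch of the pattern, assuming a MapperService for an index that has mappings (class and variable names here are illustrative, not taken from the diff):

```java
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.MappingLookup;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SourceToParse;

class MappingLookupMigrationSketch {

    // Before: DocumentMapper is mutable and is null for an index without mappings.
    ParsedDocument parseBefore(MapperService mapperService, SourceToParse source) {
        DocumentMapper documentMapper = mapperService.documentMapper();
        Mapper idMapper = documentMapper.mappers().getMapper("_id"); // per-field lookups went through mappers()
        return documentMapper.parse(source);
    }

    // After: MappingLookup exposes the same operations directly; an index without mappings
    // is represented by an empty lookup (see the hasMappings() check in the IndexShard hunk below).
    ParsedDocument parseAfter(MapperService mapperService, SourceToParse source) {
        MappingLookup mappingLookup = mapperService.mappingLookup();
        Mapper idMapper = mappingLookup.getMapper("_id");
        return mappingLookup.parseDocument(source);
    }
}
```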
[changed file]
@@ -647,7 +647,9 @@ private static Response prepareRamIndex(Request request,
 BytesReference document = request.contextSetup.document;
 XContentType xContentType = request.contextSetup.xContentType;
 SourceToParse sourceToParse = new SourceToParse(index, "_id", document, xContentType);
-ParsedDocument parsedDocument = indexService.mapperService().documentMapper().parse(sourceToParse);
+//TODO this throws NPE when called against an empty index with no provided mappings: DocumentMapper is null
+// and the corresponding empty MappingLookup does not have a DocumentParser set
+ParsedDocument parsedDocument = indexService.mapperService().mappingLookup().parseDocument(sourceToParse);
 indexWriter.addDocuments(parsedDocument.docs());
 try (IndexReader indexReader = DirectoryReader.open(indexWriter)) {
 final IndexSearcher searcher = new IndexSearcher(indexReader);
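The TODO in the hunk above notes that an index with no mappings still trips over a null DocumentMapper / parser-less MappingLookup. One possible guard, sketched under the assumption that the hasMappings() check introduced in IndexShard below is the right signal; this is not the fix the TODO calls for, just an illustration within the method shown above, where indexService, index and sourceToParse are in scope:

```java
MappingLookup mappingLookup = indexService.mapperService().mappingLookup();
if (mappingLookup.hasMappings() == false) {
    // hypothetical error handling; today this path ends in the NPE described by the TODO
    throw new IllegalArgumentException("index [" + index + "] has no mappings to parse the document against");
}
ParsedDocument parsedDocument = mappingLookup.parseDocument(sourceToParse);
```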
[changed file]
@@ -27,7 +27,6 @@
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.IndexService;
-import org.elasticsearch.index.mapper.DocumentMapper;
 import org.elasticsearch.index.mapper.Mapper;
 import org.elasticsearch.index.mapper.MappingLookup;
 import org.elasticsearch.index.shard.ShardId;
@@ -84,8 +83,8 @@ protected GetFieldMappingsResponse shardOperation(final GetFieldMappingsIndexReq
 Predicate<String> metadataFieldPredicate = (f) -> indexService.mapperService().isMetadataField(f);
 Predicate<String> fieldPredicate = metadataFieldPredicate.or(indicesService.getFieldFilter().apply(shardId.getIndexName()));

-DocumentMapper documentMapper = indexService.mapperService().documentMapper();
-Map<String, FieldMappingMetadata> fieldMapping = findFieldMappings(fieldPredicate, documentMapper, request);
+MappingLookup mappingLookup = indexService.mapperService().mappingLookup();
+Map<String, FieldMappingMetadata> fieldMapping = findFieldMappings(fieldPredicate, mappingLookup, request);
 return new GetFieldMappingsResponse(singletonMap(shardId.getIndexName(), fieldMapping));
 }

@@ -137,15 +136,11 @@ public Boolean paramAsBoolean(String key, Boolean defaultValue) {
 };

 private static Map<String, FieldMappingMetadata> findFieldMappings(Predicate<String> fieldPredicate,
-DocumentMapper documentMapper,
-GetFieldMappingsIndexRequest request) {
-if (documentMapper == null) {
-return Collections.emptyMap();
-}
+MappingLookup mappingLookup,
+GetFieldMappingsIndexRequest request) {
 //TODO the logic here needs to be reworked to also include runtime fields. Though matching is against mappers rather
 // than field types, and runtime fields are mixed with ordinary fields in FieldTypeLookup
 Map<String, FieldMappingMetadata> fieldMappings = new HashMap<>();
-final MappingLookup mappingLookup = documentMapper.mappers();
 for (String field : request.fields()) {
 if (Regex.isMatchAllPattern(field)) {
 for (Mapper fieldMapper : mappingLookup.fieldMappers()) {
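For orientation, the rest of findFieldMappings continues roughly along these lines: all-pattern and wildcard requests are expanded against every field mapper in the lookup, while exact names are resolved individually. This is a sketch inferred from the visible hunk, with the FieldMappingMetadata construction elided because its exact shape is not shown here:

```java
for (String field : request.fields()) {
    if (Regex.isMatchAllPattern(field)) {
        for (Mapper fieldMapper : mappingLookup.fieldMappers()) {
            if (fieldPredicate.test(fieldMapper.name())) {
                // record a FieldMappingMetadata entry for fieldMapper.name()
            }
        }
    } else if (Regex.isSimpleMatchPattern(field)) {
        for (Mapper fieldMapper : mappingLookup.fieldMappers()) {
            if (Regex.simpleMatch(field, fieldMapper.name()) && fieldPredicate.test(fieldMapper.name())) {
                // record a FieldMappingMetadata entry for the matching field
            }
        }
    } else {
        Mapper fieldMapper = mappingLookup.getMapper(field);
        if (fieldMapper != null && fieldPredicate.test(field)) {
            // record a FieldMappingMetadata entry for the explicitly requested field
        }
    }
}
```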
[changed file]
@@ -28,6 +28,7 @@
 import org.elasticsearch.common.xcontent.ObjectPath;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.mapper.MappingLookup;
 import org.elasticsearch.index.mapper.MetadataFieldMapper;
 import org.elasticsearch.indices.SystemDataStreamDescriptor;
 import org.elasticsearch.rest.RestStatus;
@@ -237,13 +238,12 @@ public static ComposableIndexTemplate lookupTemplateForDataStream(String dataStr
 return composableIndexTemplate;
 }

-public static void validateTimestampFieldMapping(String timestampFieldName, MapperService mapperService) throws IOException {
-MetadataFieldMapper fieldMapper =
-(MetadataFieldMapper) mapperService.documentMapper().mappers().getMapper("_data_stream_timestamp");
+public static void validateTimestampFieldMapping(MappingLookup mappingLookup) throws IOException {
+MetadataFieldMapper fieldMapper = (MetadataFieldMapper) mappingLookup.getMapper("_data_stream_timestamp");
 assert fieldMapper != null : "[_data_stream_timestamp] meta field mapper must exist";

 Map<String, Object> parsedTemplateMapping =
-MapperService.parseMapping(NamedXContentRegistry.EMPTY, mapperService.documentMapper().mappingSource().string());
+MapperService.parseMapping(NamedXContentRegistry.EMPTY, mappingLookup.getMapping().toCompressedXContent().string());
 Boolean enabled = ObjectPath.eval("_doc._data_stream_timestamp.enabled", parsedTemplateMapping);
 // Sanity check: if this fails then somehow the mapping for _data_stream_timestamp has been overwritten and
 // that would be a bug.
@@ -252,7 +252,7 @@ public static void validateTimestampFieldMapping(String timestampFieldName, Mapp
 }

 // Sanity check (this validation logic should already have been executed when merging mappings):
-fieldMapper.validate(mapperService.documentMapper().mappers());
+fieldMapper.validate(mappingLookup);
 }

 }
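The enabled check in validateTimestampFieldMapping navigates the re-parsed mapping as nested maps. A small standalone illustration of how ObjectPath.eval (already imported above) resolves the dotted path; the map literal is hypothetical:

```java
Map<String, Object> parsedMapping = Map.of(
    "_doc", Map.of(
        "_data_stream_timestamp", Map.of("enabled", true)));

// walks _doc -> _data_stream_timestamp -> enabled and returns Boolean.TRUE
Boolean enabled = ObjectPath.eval("_doc._data_stream_timestamp.enabled", parsedMapping);
```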
[changed file]
@@ -992,7 +992,7 @@ private static void updateIndexMappingsAndBuildSortOrder(IndexService indexServi
 indexService.getIndexSortSupplier().get();
 }
 if (request.dataStreamName() != null) {
-validateTimestampFieldMapping("@timestamp", mapperService);
+validateTimestampFieldMapping(mapperService.mappingLookup());
 }
 }

[changed file]
@@ -1233,8 +1233,7 @@ private static void validateCompositeTemplate(final ClusterState state,
 }

 if (template.getDataStreamTemplate() != null) {
-String tsFieldName = template.getDataStreamTemplate().getTimestampField();
-validateTimestampFieldMapping(tsFieldName, mapperService);
+validateTimestampFieldMapping(mapperService.mappingLookup());
 }
 } catch (Exception e) {
 throw new IllegalArgumentException("invalid composite mappings for [" + templateName + "]", e);
[changed file]
@@ -46,9 +46,9 @@
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.ReleasableLock;
 import org.elasticsearch.index.VersionType;
-import org.elasticsearch.index.mapper.DocumentMapper;
 import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.Mapping;
+import org.elasticsearch.index.mapper.MappingLookup;
 import org.elasticsearch.index.mapper.ParseContext.Document;
 import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.mapper.Uid;
@@ -581,7 +581,7 @@ protected final GetResult getFromSearcher(Get get, Engine.Searcher searcher) thr
 }
 }

-public abstract GetResult get(Get get, DocumentMapper mapper, Function<Engine.Searcher, Engine.Searcher> searcherWrapper);
+public abstract GetResult get(Get get, MappingLookup mappingLookup, Function<Engine.Searcher, Engine.Searcher> searcherWrapper);

 /**
 * Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand.
[changed file]
@@ -61,8 +61,8 @@
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.VersionType;
-import org.elasticsearch.index.mapper.DocumentMapper;
 import org.elasticsearch.index.mapper.IdFieldMapper;
+import org.elasticsearch.index.mapper.MappingLookup;
 import org.elasticsearch.index.mapper.ParseContext;
 import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.mapper.SeqNoFieldMapper;
@@ -606,10 +606,10 @@ private ExternalReaderManager createReaderManager(RefreshWarmerListener external
 }
 }

-private GetResult getFromTranslog(Get get, Translog.Index index, DocumentMapper mapper,
+private GetResult getFromTranslog(Get get, Translog.Index index, MappingLookup mappingLookup,
 Function<Searcher, Searcher> searcherWrapper) throws IOException {
 assert get.isReadFromTranslog();
-final SingleDocDirectoryReader inMemoryReader = new SingleDocDirectoryReader(shardId, index, mapper, config().getAnalyzer());
+final SingleDocDirectoryReader inMemoryReader = new SingleDocDirectoryReader(shardId, index, mappingLookup, config().getAnalyzer());
 final Engine.Searcher searcher = new Engine.Searcher("realtime_get", ElasticsearchDirectoryReader.wrap(inMemoryReader, shardId),
 config().getSimilarity(), config().getQueryCache(), config().getQueryCachingPolicy(), inMemoryReader);
 final Searcher wrappedSearcher = searcherWrapper.apply(searcher);
@@ -628,7 +628,7 @@ private GetResult getFromTranslog(Get get, Translog.Index index, DocumentMapper
 }

 @Override
-public GetResult get(Get get, DocumentMapper mapper, Function<Engine.Searcher, Engine.Searcher> searcherWrapper) {
+public GetResult get(Get get, MappingLookup mappingLookup, Function<Engine.Searcher, Engine.Searcher> searcherWrapper) {
 assert Objects.equals(get.uid().field(), IdFieldMapper.NAME) : get.uid().field();
 try (ReleasableLock ignored = readLock.acquire()) {
 ensureOpen();
@@ -659,7 +659,7 @@ public GetResult get(Get get, DocumentMapper mapper, Function<Engine.Searcher, E
 try {
 final Translog.Operation operation = translog.readOperation(versionValue.getLocation());
 if (operation != null) {
-return getFromTranslog(get, (Translog.Index) operation, mapper, searcherWrapper);
+return getFromTranslog(get, (Translog.Index) operation, mappingLookup, searcherWrapper);
 }
 } catch (IOException e) {
 maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event
[changed file]
@@ -26,7 +26,7 @@
 import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
 import org.elasticsearch.common.util.concurrent.ReleasableLock;
 import org.elasticsearch.core.internal.io.IOUtils;
-import org.elasticsearch.index.mapper.DocumentMapper;
+import org.elasticsearch.index.mapper.MappingLookup;
 import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.ShardLongFieldRange;
@@ -246,7 +246,7 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm
 }

 @Override
-public GetResult get(Get get, DocumentMapper mapper, Function<Searcher, Searcher> searcherWrapper) {
+public GetResult get(Get get, MappingLookup mappingLookup, Function<Searcher, Searcher> searcherWrapper) {
 return getFromSearcher(get, acquireSearcher("get", SearcherScope.EXTERNAL, searcherWrapper));
 }

[changed file]
@@ -30,7 +30,7 @@
 import org.apache.lucene.util.Bits;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.core.internal.io.IOUtils;
-import org.elasticsearch.index.mapper.DocumentMapper;
+import org.elasticsearch.index.mapper.MappingLookup;
 import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.mapper.SourceToParse;
 import org.elasticsearch.index.shard.ShardId;
@@ -47,8 +47,8 @@
 final class SingleDocDirectoryReader extends DirectoryReader {
 private final SingleDocLeafReader leafReader;

-SingleDocDirectoryReader(ShardId shardId, Translog.Index operation, DocumentMapper mapper, Analyzer analyzer) throws IOException {
-this(new SingleDocLeafReader(shardId, operation, mapper, analyzer));
+SingleDocDirectoryReader(ShardId shardId, Translog.Index operation, MappingLookup mappingLookup, Analyzer analyzer) throws IOException {
+this(new SingleDocLeafReader(shardId, operation, mappingLookup, analyzer));
 }

 private SingleDocDirectoryReader(SingleDocLeafReader leafReader) throws IOException {
@@ -109,15 +109,15 @@ private static class SingleDocLeafReader extends LeafReader {

 private final ShardId shardId;
 private final Translog.Index operation;
-private final DocumentMapper mapper;
+private final MappingLookup mappingLookup;
 private final Analyzer analyzer;
 private final Directory directory;
 private final AtomicReference<LeafReader> delegate = new AtomicReference<>();

-SingleDocLeafReader(ShardId shardId, Translog.Index operation, DocumentMapper mapper, Analyzer analyzer) {
+SingleDocLeafReader(ShardId shardId, Translog.Index operation, MappingLookup mappingLookup, Analyzer analyzer) {
 this.shardId = shardId;
 this.operation = operation;
-this.mapper = mapper;
+this.mappingLookup = mappingLookup;
 this.analyzer = analyzer;
 this.directory = new ByteBuffersDirectory();
 }
@@ -140,7 +140,7 @@ private LeafReader getDelegate() {

 private LeafReader createInMemoryLeafReader() {
 assert Thread.holdsLock(this);
-final ParsedDocument parsedDocs = mapper.parse(new SourceToParse(shardId.getIndexName(), operation.id(),
+final ParsedDocument parsedDocs = mappingLookup.parseDocument(new SourceToParse(shardId.getIndexName(), operation.id(),
 operation.source(), XContentHelper.xContentType(operation.source()), operation.routing(), Map.of()));

 parsedDocs.updateSeqID(operation.seqNo(), operation.primaryTerm());
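Read together with the prepareRamIndex hunk at the top of this commit, the in-memory reader above follows a pattern roughly like the sketch below: parse the translog source through the MappingLookup, index the resulting Lucene documents into a transient directory, and open a reader over it. The writer configuration is an assumption; the class's real code is not reproduced here:

```java
ParsedDocument parsedDoc = mappingLookup.parseDocument(new SourceToParse(
    shardId.getIndexName(), operation.id(), operation.source(),
    XContentHelper.xContentType(operation.source()), operation.routing(), Map.of()));
parsedDoc.updateSeqID(operation.seqNo(), operation.primaryTerm());

Directory directory = new ByteBuffersDirectory();
try (IndexWriter writer = new IndexWriter(directory, new IndexWriterConfig(analyzer))) {
    writer.addDocuments(parsedDoc.docs()); // same call used by prepareRamIndex above
    writer.commit();
}
DirectoryReader reader = DirectoryReader.open(directory);
```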
[changed file]
@@ -34,9 +34,9 @@
 import org.elasticsearch.index.engine.TranslogLeafReader;
 import org.elasticsearch.index.fieldvisitor.CustomFieldsVisitor;
 import org.elasticsearch.index.fieldvisitor.FieldsVisitor;
-import org.elasticsearch.index.mapper.DocumentMapper;
 import org.elasticsearch.index.mapper.Mapper;
 import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.mapper.MappingLookup;
 import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.mapper.RoutingFieldMapper;
 import org.elasticsearch.index.mapper.SourceFieldMapper;
@@ -122,7 +122,7 @@ public GetResult get(Engine.GetResult engineGetResult, String id,
 try {
 long now = System.nanoTime();
 fetchSourceContext = normalizeFetchSourceContent(fetchSourceContext, fields);
-GetResult getResult = innerGetLoadFromStoredFields(id, fields, fetchSourceContext, engineGetResult, mapperService);
+GetResult getResult = innerGetLoadFromStoredFields(id, fields, fetchSourceContext, engineGetResult);
 if (getResult.isExists()) {
 existsMetric.inc(System.nanoTime() - now);
 } else {
@@ -169,23 +169,23 @@ private GetResult innerGet(String id, String[] gFields, boolean realtime, long v

 try {
 // break between having loaded it from translog (so we only have _source), and having a document to load
-return innerGetLoadFromStoredFields(id, gFields, fetchSourceContext, get, mapperService);
+return innerGetLoadFromStoredFields(id, gFields, fetchSourceContext, get);
 } finally {
 get.close();
 }
 }

 private GetResult innerGetLoadFromStoredFields(String id, String[] storedFields, FetchSourceContext fetchSourceContext,
-Engine.GetResult get, MapperService mapperService) {
+Engine.GetResult get) {
 assert get.exists() : "method should only be called if document could be retrieved";

 // check first if stored fields to be loaded don't contain an object field
-DocumentMapper docMapper = mapperService.documentMapper();
+MappingLookup mappingLookup = mapperService.mappingLookup();
 if (storedFields != null) {
 for (String field : storedFields) {
-Mapper fieldMapper = docMapper.mappers().getMapper(field);
+Mapper fieldMapper = mappingLookup.getMapper(field);
 if (fieldMapper == null) {
-if (docMapper.mappers().objectMappers().get(field) != null) {
+if (mappingLookup.objectMappers().get(field) != null) {
 // Only fail if we know it is a object field, missing paths / fields shouldn't fail.
 throw new IllegalArgumentException("field [" + field + "] isn't a leaf field");
 }
@@ -225,7 +225,7 @@ private GetResult innerGetLoadFromStoredFields(String id, String[] storedFields,
 assert source != null : "original source in translog must exist";
 SourceToParse sourceToParse = new SourceToParse(shardId.getIndexName(), id, source, XContentHelper.xContentType(source),
 fieldVisitor.routing(), Map.of());
-ParsedDocument doc = indexShard.mapperService().documentMapper().parse(sourceToParse);
+ParsedDocument doc = indexShard.mapperService().mappingLookup().parseDocument(sourceToParse);
 assert doc.dynamicMappingsUpdate() == null : "mapping updates should not be required on already-indexed doc";
 // update special fields
 doc.updateSeqID(docIdAndVersion.seqNo, docIdAndVersion.primaryTerm);
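The stored-fields loop above keeps the previous behaviour: unknown field names are ignored, but a request for an object (non-leaf) field is rejected. Illustrated with hypothetical field names:

```java
// Suppose the mapping defines an object field "user" with a leaf sub-field "user.name".
Mapper leaf = mappingLookup.getMapper("user.name");          // non-null: fine as a stored field
Mapper missing = mappingLookup.getMapper("does_not_exist");  // null and not an object: silently ignored
Mapper asField = mappingLookup.getMapper("user");            // null as a field mapper...
if (asField == null && mappingLookup.objectMappers().get("user") != null) {
    // ...but present as an object mapper, so the request is rejected
    throw new IllegalArgumentException("field [user] isn't a leaf field");
}
```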
[changed file]
@@ -69,7 +69,11 @@ public int compare(Mapper o1, Mapper o2) {

 }

-CompressedXContent toCompressedXContent() {
+/**
+ * Outputs this mapping instance and returns it in {@link CompressedXContent} format
+ * @return the {@link CompressedXContent} representation of this mapping instance
+ */
+public CompressedXContent toCompressedXContent() {
 try {
 return new CompressedXContent(this, XContentType.JSON, ToXContent.EMPTY_PARAMS);
 } catch (Exception e) {
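With toCompressedXContent() now public (and getMapping() made public in the next file), code outside the mapper internals can serialize the current mapping without going through DocumentMapper.mappingSource(), as the data stream timestamp validation above already does. A short usage sketch:

```java
Mapping mapping = mapperService.mappingLookup().getMapping();
CompressedXContent mappingSource = mapping.toCompressedXContent();
String mappingJson = mappingSource.string(); // JSON form, e.g. for MapperService.parseMapping
```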
[changed file]
@@ -374,7 +374,11 @@ public CacheKey cacheKey() {
 return cacheKey;
 }

-Mapping getMapping() {
+/**
+ * Returns the mapping source that this lookup originated from
+ * @return the mapping source
+ */
+public Mapping getMapping() {
 return mapping;
 }

[changed file]
@@ -100,12 +100,12 @@
 import org.elasticsearch.index.get.GetStats;
 import org.elasticsearch.index.get.ShardGetService;
 import org.elasticsearch.index.mapper.DateFieldMapper;
-import org.elasticsearch.index.mapper.DocumentMapper;
 import org.elasticsearch.index.mapper.DocumentMapperForType;
 import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.Mapping;
+import org.elasticsearch.index.mapper.MappingLookup;
 import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.mapper.SourceToParse;
 import org.elasticsearch.index.mapper.Uid;
@@ -973,11 +973,11 @@ private Engine.DeleteResult delete(Engine engine, Engine.Delete delete) throws I

 public Engine.GetResult get(Engine.Get get) {
 readAllowed();
-DocumentMapper mapper = mapperService.documentMapper();
-if (mapper == null) {
+MappingLookup mappingLookup = mapperService.mappingLookup();
+if (mappingLookup.hasMappings() == false) {
 return GetResult.NOT_EXISTS;
 }
-return getEngine().get(get, mapper, this::wrapSearcher);
+return getEngine().get(get, mappingLookup, this::wrapSearcher);
 }

 /**
(remaining changed files not shown)
