Commit 0761875

Replace some DocumentMapper usages with MappingLookup (#72400)
We recently replaced some usages of DocumentMapper with MappingLookup in the search layer, as DocumentMapper is mutable, which can cause issues. In order to do that, MappingLookup grew and became quite similar to DocumentMapper in what it does and holds.

In many cases it makes sense to use MappingLookup instead of DocumentMapper, and we may even be able to remove DocumentMapper entirely in favour of MappingLookup in the long run.

This commit replaces some of DocumentMapper's straightforward usages.
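
The core pattern of the change, as a minimal sketch (the MappingLookupSketch class and parseWithLookup helper are hypothetical; mappingLookup(), getType() and parseDocument(SourceToParse) are the methods this commit switches to):

import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.mapper.MappingLookup;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SourceToParse;

class MappingLookupSketch {
    // Before: indexService.mapperService().documentMapper().parse(source),
    // which goes through the mutable DocumentMapper.
    // After: ask the MapperService for its MappingLookup, a snapshot of the
    // current mappings, and parse through that instead.
    static ParsedDocument parseWithLookup(IndexService indexService, SourceToParse source) {
        MappingLookup mappingLookup = indexService.mapperService().mappingLookup();
        return mappingLookup.parseDocument(source);
    }
}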
javanna authored May 3, 2021
1 parent ae04192 commit 0761875
Showing 20 changed files with 150 additions and 151 deletions.
--- changed file (path not shown) ---
@@ -59,6 +59,7 @@
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.index.mapper.MappingLookup;
 import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.mapper.SourceToParse;
 import org.elasticsearch.index.query.AbstractQueryBuilder;
@@ -657,12 +658,14 @@ private static Response prepareRamIndex(Request request,

     try (Directory directory = new ByteBuffersDirectory()) {
         try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig(defaultAnalyzer))) {
+            MappingLookup mappingLookup = indexService.mapperService().mappingLookup();
             String index = indexService.index().getName();
-            String type = indexService.mapperService().documentMapper().type();
             BytesReference document = request.contextSetup.document;
             XContentType xContentType = request.contextSetup.xContentType;
-            SourceToParse sourceToParse = new SourceToParse(index, type, "_id", document, xContentType);
-            ParsedDocument parsedDocument = indexService.mapperService().documentMapper().parse(sourceToParse);
+            SourceToParse sourceToParse = new SourceToParse(index, mappingLookup.getType(), "_id", document, xContentType);
+            //TODO this throws NPE when called against an empty index with no provided mappings: DocumentMapper is null
+            // and the corresponding empty MappingLookup does not have a DocumentParser set
+            ParsedDocument parsedDocument = mappingLookup.parseDocument(sourceToParse);
             indexWriter.addDocuments(parsedDocument.docs());
             try (IndexReader indexReader = DirectoryReader.open(indexWriter)) {
                 final IndexSearcher searcher = new IndexSearcher(indexReader);
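The TODO in the hunk above records a known gap: on an index that has no mappings yet, DocumentMapper is null and the empty MappingLookup has no DocumentParser, so parseDocument throws an NPE. A sketch of the guard a caller could add, reusing the hunk's local variables; hasMappings() appears elsewhere in this commit, while the exception choice here is an assumption:

MappingLookup mappingLookup = indexService.mapperService().mappingLookup();
if (mappingLookup.hasMappings() == false) {
    // Assumption: fail with a clear message instead of the NPE the TODO describes.
    throw new IllegalArgumentException("index [" + index + "] has no mappings; cannot parse the provided document");
}
ParsedDocument parsedDocument = mappingLookup.parseDocument(sourceToParse);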
--- changed file (path not shown) ---
@@ -26,7 +26,6 @@
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.IndexService;
-import org.elasticsearch.index.mapper.DocumentMapper;
 import org.elasticsearch.index.mapper.Mapper;
 import org.elasticsearch.index.mapper.MappingLookup;
 import org.elasticsearch.index.shard.ShardId;
@@ -84,15 +83,15 @@ protected GetFieldMappingsResponse shardOperation(final GetFieldMappingsIndexReq
         Predicate<String> metadataFieldPredicate = (f) -> indexService.mapperService().isMetadataField(f);
         Predicate<String> fieldPredicate = metadataFieldPredicate.or(indicesService.getFieldFilter().apply(shardId.getIndexName()));
 
-        DocumentMapper mapper = indexService.mapperService().documentMapper();
+        MappingLookup mappingLookup = indexService.mapperService().mappingLookup();
         Collection<String> typeIntersection;
         if (request.types().length == 0) {
-            typeIntersection = mapper == null
-                ? Collections.emptySet()
-                : Collections.singleton(mapper.type());
+            typeIntersection = mappingLookup.hasMappings()
+                ? Collections.singleton(mappingLookup.getType())
+                : Collections.emptySet();
         } else {
-            typeIntersection = mapper != null && Regex.simpleMatch(request.types(), mapper.type())
-                ? Collections.singleton(mapper.type())
+            typeIntersection = mappingLookup.hasMappings() && Regex.simpleMatch(request.types(), mappingLookup.getType())
+                ? Collections.singleton(mappingLookup.getType())
                 : Collections.emptySet();
             if (typeIntersection.isEmpty()) {
                 throw new TypeMissingException(shardId.getIndex(), request.types());
@@ -101,8 +100,8 @@ protected GetFieldMappingsResponse shardOperation(final GetFieldMappingsIndexReq

         Map<String, Map<String, FieldMappingMetadata>> typeMappings = new HashMap<>();
         for (String type : typeIntersection) {
-            DocumentMapper documentMapper = indexService.mapperService().documentMapper(type);
-            Map<String, FieldMappingMetadata> fieldMapping = findFieldMappingsByType(fieldPredicate, documentMapper, request);
+            MappingLookup mappers = indexService.mapperService().documentMapper(type).mappers();
+            Map<String, FieldMappingMetadata> fieldMapping = findFieldMappingsByType(fieldPredicate, mappers, request);
             if (fieldMapping.isEmpty() == false) {
                 typeMappings.put(type, fieldMapping);
             }
@@ -158,12 +157,11 @@ public Boolean paramAsBoolean(String key, Boolean defaultValue) {
     };
 
     private static Map<String, FieldMappingMetadata> findFieldMappingsByType(Predicate<String> fieldPredicate,
-                                                                             DocumentMapper documentMapper,
-                                                                             GetFieldMappingsIndexRequest request) {
+                                                                             MappingLookup mappingLookup,
+                                                                             GetFieldMappingsIndexRequest request) {
+        //TODO the logic here needs to be reworked to also include runtime fields. Though matching is against mappers rather
+        // than field types, and runtime fields are mixed with ordinary fields in FieldTypeLookup
         Map<String, FieldMappingMetadata> fieldMappings = new HashMap<>();
-        final MappingLookup mappingLookup = documentMapper.mappers();
         for (String field : request.fields()) {
             if (Regex.isMatchAllPattern(field)) {
                 for (Mapper fieldMapper : mappingLookup.fieldMappers()) {
--- changed file (path not shown) ---
@@ -29,6 +29,7 @@
 import org.elasticsearch.common.xcontent.ObjectPath;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.mapper.MappingLookup;
 import org.elasticsearch.index.mapper.MetadataFieldMapper;
 import org.elasticsearch.indices.SystemDataStreamDescriptor;
 import org.elasticsearch.rest.RestStatus;
@@ -244,13 +245,12 @@ public static ComposableIndexTemplate lookupTemplateForDataStream(String dataStr
         return composableIndexTemplate;
     }
 
-    public static void validateTimestampFieldMapping(String timestampFieldName, MapperService mapperService) throws IOException {
-        MetadataFieldMapper fieldMapper =
-            (MetadataFieldMapper) mapperService.documentMapper().mappers().getMapper("_data_stream_timestamp");
+    public static void validateTimestampFieldMapping(MappingLookup mappingLookup) throws IOException {
+        MetadataFieldMapper fieldMapper = (MetadataFieldMapper) mappingLookup.getMapper("_data_stream_timestamp");
         assert fieldMapper != null : "[_data_stream_timestamp] meta field mapper must exist";
 
         Map<String, Object> parsedTemplateMapping =
-            MapperService.parseMapping(NamedXContentRegistry.EMPTY, mapperService.documentMapper().mappingSource().string());
+            MapperService.parseMapping(NamedXContentRegistry.EMPTY, mappingLookup.getMapping().toCompressedXContent().string());
         Boolean enabled = ObjectPath.eval("_doc._data_stream_timestamp.enabled", parsedTemplateMapping);
         // Sanity check: if this fails then somehow the mapping for _data_stream_timestamp has been overwritten and
         // that would be a bug.
@@ -259,7 +259,7 @@ public static void validateTimestampFieldMapping(String timestampFieldName, Mapp
         }
 
         // Sanity check (this validation logic should already have been executed when merging mappings):
-        fieldMapper.validate(mapperService.documentMapper().mappers());
+        fieldMapper.validate(mappingLookup);
     }
 
 }
--- changed file (path not shown) ---
@@ -1055,7 +1055,7 @@ private static void updateIndexMappingsAndBuildSortOrder(IndexService indexServi
             indexService.getIndexSortSupplier().get();
         }
         if (request.dataStreamName() != null) {
-            validateTimestampFieldMapping("@timestamp", mapperService);
+            validateTimestampFieldMapping(mapperService.mappingLookup());
         }
     }

--- changed file (path not shown) ---
@@ -1226,8 +1226,7 @@ private static void validateCompositeTemplate(final ClusterState state,
             }
 
             if (template.getDataStreamTemplate() != null) {
-                String tsFieldName = template.getDataStreamTemplate().getTimestampField();
-                validateTimestampFieldMapping(tsFieldName, mapperService);
+                validateTimestampFieldMapping(mapperService.mappingLookup());
             }
         } catch (Exception e) {
             throw new IllegalArgumentException("invalid composite mappings for [" + templateName + "]", e);
--- changed file (path not shown) ---
@@ -51,8 +51,8 @@
 import org.elasticsearch.common.util.concurrent.ReleasableLock;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.mapper.MapperService;
-import org.elasticsearch.index.mapper.DocumentMapper;
 import org.elasticsearch.index.mapper.Mapping;
+import org.elasticsearch.index.mapper.MappingLookup;
 import org.elasticsearch.index.mapper.ParseContext.Document;
 import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.merge.MergeStats;
@@ -601,7 +601,7 @@ protected final GetResult getFromSearcher(Get get, Engine.Searcher searcher) thr
         }
     }
 
-    public abstract GetResult get(Get get, DocumentMapper mapper, Function<Engine.Searcher, Engine.Searcher> searcherWrapper);
+    public abstract GetResult get(Get get, MappingLookup mappingLookup, Function<Engine.Searcher, Engine.Searcher> searcherWrapper);
 
     /**
      * Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand.
--- changed file (path not shown) ---
@@ -63,9 +63,9 @@
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.VersionType;
-import org.elasticsearch.index.mapper.DocumentMapper;
 import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.mapper.MappingLookup;
 import org.elasticsearch.index.mapper.ParseContext;
 import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.mapper.SeqNoFieldMapper;
@@ -653,10 +653,10 @@ private ExternalReaderManager createReaderManager(RefreshWarmerListener external
         }
     }
 
-    private GetResult getFromTranslog(Get get, Translog.Index index, DocumentMapper mapper,
+    private GetResult getFromTranslog(Get get, Translog.Index index, MappingLookup mappingLookup,
                                       Function<Searcher, Searcher> searcherWrapper) throws IOException {
         assert get.isReadFromTranslog();
-        final SingleDocDirectoryReader inMemoryReader = new SingleDocDirectoryReader(shardId, index, mapper, config().getAnalyzer());
+        final SingleDocDirectoryReader inMemoryReader = new SingleDocDirectoryReader(shardId, index, mappingLookup, config().getAnalyzer());
         final Engine.Searcher searcher = new Engine.Searcher("realtime_get", ElasticsearchDirectoryReader.wrap(inMemoryReader, shardId),
             config().getSimilarity(), config().getQueryCache(), config().getQueryCachingPolicy(), inMemoryReader);
         final Searcher wrappedSearcher = searcherWrapper.apply(searcher);
@@ -675,7 +675,7 @@ private GetResult getFromTranslog(Get get, Translog.Index index, DocumentMapper
     }
 
     @Override
-    public GetResult get(Get get, DocumentMapper mapper, Function<Engine.Searcher, Engine.Searcher> searcherWrapper) {
+    public GetResult get(Get get, MappingLookup mappingLookup, Function<Engine.Searcher, Engine.Searcher> searcherWrapper) {
         assert Objects.equals(get.uid().field(), IdFieldMapper.NAME) : get.uid().field();
         try (ReleasableLock ignored = readLock.acquire()) {
             ensureOpen();
@@ -706,7 +706,7 @@ public GetResult get(Get get, DocumentMapper mapper, Function<Engine.Searcher, E
                 try {
                     final Translog.Operation operation = translog.readOperation(versionValue.getLocation());
                     if (operation != null) {
-                        return getFromTranslog(get, (Translog.Index) operation, mapper, searcherWrapper);
+                        return getFromTranslog(get, (Translog.Index) operation, mappingLookup, searcherWrapper);
                     }
                 } catch (IOException e) {
                     maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event
--- changed file (path not shown) ---
@@ -31,6 +31,7 @@
 import org.elasticsearch.index.fieldvisitor.FieldsVisitor;
 import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.mapper.MappingLookup;
 import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 import org.elasticsearch.index.mapper.SourceFieldMapper;
 import org.elasticsearch.index.mapper.Uid;
@@ -287,7 +288,8 @@ private Translog.Operation readDocAsOp(int docIndex) throws IOException {
         } else {
             leaf.reader().document(segmentDocID, fields);
         }
-        fields.postProcess(mapperService::fieldType, mapperService.documentMapper() == null ? null : mapperService.documentMapper().type());
+        MappingLookup mappingLookup = mapperService.mappingLookup();
+        fields.postProcess(mapperService::fieldType, mappingLookup.hasMappings() ? mappingLookup.getType() : null);
 
         final Translog.Operation op;
         final boolean isTombstone = parallelArray.isTombStone[docIndex];
--- changed file (path not shown) ---
@@ -26,8 +26,8 @@
 import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
 import org.elasticsearch.common.util.concurrent.ReleasableLock;
 import org.elasticsearch.core.internal.io.IOUtils;
-import org.elasticsearch.index.mapper.DocumentMapper;
 import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.mapper.MappingLookup;
 import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.ShardLongFieldRange;
@@ -255,7 +255,7 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm
     }
 
     @Override
-    public GetResult get(Get get, DocumentMapper mapper, Function<Searcher, Searcher> searcherWrapper) {
+    public GetResult get(Get get, MappingLookup mappingLookup, Function<Searcher, Searcher> searcherWrapper) {
         return getFromSearcher(get, acquireSearcher("get", SearcherScope.EXTERNAL, searcherWrapper));
     }

--- changed file (path not shown) ---
@@ -30,7 +30,7 @@
 import org.apache.lucene.util.Bits;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.core.internal.io.IOUtils;
-import org.elasticsearch.index.mapper.DocumentMapper;
+import org.elasticsearch.index.mapper.MappingLookup;
 import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.mapper.SourceToParse;
 import org.elasticsearch.index.shard.ShardId;
@@ -47,8 +47,8 @@
 final class SingleDocDirectoryReader extends DirectoryReader {
     private final SingleDocLeafReader leafReader;
 
-    SingleDocDirectoryReader(ShardId shardId, Translog.Index operation, DocumentMapper mapper, Analyzer analyzer) throws IOException {
-        this(new SingleDocLeafReader(shardId, operation, mapper, analyzer));
+    SingleDocDirectoryReader(ShardId shardId, Translog.Index operation, MappingLookup mappingLookup, Analyzer analyzer) throws IOException {
+        this(new SingleDocLeafReader(shardId, operation, mappingLookup, analyzer));
     }
 
     private SingleDocDirectoryReader(SingleDocLeafReader leafReader) throws IOException {
@@ -109,15 +109,15 @@ private static class SingleDocLeafReader extends LeafReader {

     private final ShardId shardId;
     private final Translog.Index operation;
-    private final DocumentMapper mapper;
+    private final MappingLookup mappingLookup;
     private final Analyzer analyzer;
     private final Directory directory;
     private final AtomicReference<LeafReader> delegate = new AtomicReference<>();
 
-    SingleDocLeafReader(ShardId shardId, Translog.Index operation, DocumentMapper mapper, Analyzer analyzer) {
+    SingleDocLeafReader(ShardId shardId, Translog.Index operation, MappingLookup mappingLookup, Analyzer analyzer) {
         this.shardId = shardId;
         this.operation = operation;
-        this.mapper = mapper;
+        this.mappingLookup = mappingLookup;
         this.analyzer = analyzer;
         this.directory = new ByteBuffersDirectory();
     }
@@ -140,7 +140,7 @@ private LeafReader getDelegate() {

     private LeafReader createInMemoryLeafReader() {
         assert Thread.holdsLock(this);
-        final ParsedDocument parsedDocs = mapper.parse(new SourceToParse(shardId.getIndexName(), operation.type(),
+        final ParsedDocument parsedDocs = mappingLookup.parseDocument(new SourceToParse(shardId.getIndexName(), operation.type(),
             operation.id(), operation.source(), XContentHelper.xContentType(operation.source()), operation.routing(),
             Collections.emptyMap()));
         parsedDocs.updateSeqID(operation.seqNo(), operation.primaryTerm());
(diff for the remaining changed files not shown)
