Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consolidate document parsing logic #10802

Merged
merged 1 commit on Apr 28, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 9 additions & 140 deletions src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java
Original file line number | Diff line number | Diff line change
Expand Up @@ -21,15 +21,10 @@

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Filter;
import org.apache.lucene.util.BitDocIdSet;
import org.apache.lucene.util.CloseableThreadLocal;
import org.elasticsearch.ElasticsearchGenerationException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.Nullable;
Expand All @@ -46,8 +41,6 @@
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
import org.elasticsearch.index.mapper.Mapping.SourceTransform;
Expand Down Expand Up @@ -75,12 +68,10 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;

/**
Expand Down Expand Up @@ -172,27 +163,15 @@ public DocumentMapper build(DocumentMapperParser docMapperParser) {
}
}


private CloseableThreadLocal<ParseContext.InternalParseContext> cache = new CloseableThreadLocal<ParseContext.InternalParseContext>() {
@Override
protected ParseContext.InternalParseContext initialValue() {
return new ParseContext.InternalParseContext(index, indexSettings, docMapperParser, DocumentMapper.this, new ContentPath(0));
}
};

private final String index;

private final Settings indexSettings;

private final String type;
private final StringAndBytesText typeText;

private final DocumentMapperParser docMapperParser;

private volatile CompressedString mappingSource;

private final Mapping mapping;

private final DocumentParser documentParser;

private volatile DocumentFieldMappers fieldMappers;

private volatile ImmutableMap<String, ObjectMapper> objectMappers = ImmutableMap.of();
Expand All @@ -211,16 +190,14 @@ public DocumentMapper(String index, @Nullable Settings indexSettings, DocumentMa
RootObjectMapper rootObjectMapper,
ImmutableMap<String, Object> meta,
Map<Class<? extends RootMapper>, RootMapper> rootMappers, List<SourceTransform> sourceTransforms) {
this.index = index;
this.indexSettings = indexSettings;
this.type = rootObjectMapper.name();
this.typeText = new StringAndBytesText(this.type);
this.docMapperParser = docMapperParser;
this.mapping = new Mapping(
rootObjectMapper,
rootMappers.values().toArray(new RootMapper[rootMappers.values().size()]),
sourceTransforms.toArray(new SourceTransform[sourceTransforms.size()]),
meta);
this.documentParser = new DocumentParser(index, indexSettings, docMapperParser, this);

this.typeFilter = typeMapper().termFilter(type, null);

Expand Down Expand Up @@ -364,109 +341,13 @@ public ParsedDocument parse(String type, String id, BytesReference source) throw
}

public ParsedDocument parse(SourceToParse source) throws MapperParsingException {
return parse(source, null);
return documentParser.parseDocument(source, null);
}

// NOTE: do not use this method, it will be removed in the future once
// https://github.com/elastic/elasticsearch/issues/10736 is done (MLT api is the only user of this listener)
public ParsedDocument parse(SourceToParse source, @Nullable ParseListener listener) throws MapperParsingException {
ParseContext.InternalParseContext context = cache.get();

if (source.type() != null && !source.type().equals(this.type)) {
throw new MapperParsingException("Type mismatch, provide type [" + source.type() + "] but mapper is of type [" + this.type + "]");
}
source.type(this.type);

XContentParser parser = source.parser();
try {
if (parser == null) {
parser = XContentHelper.createParser(source.source());
}
if (mapping.sourceTransforms.length > 0) {
parser = transform(parser);
}
context.reset(parser, new ParseContext.Document(), source, listener);

// will result in START_OBJECT
int countDownTokens = 0;
XContentParser.Token token = parser.nextToken();
if (token != XContentParser.Token.START_OBJECT) {
throw new MapperParsingException("Malformed content, must start with an object");
}
boolean emptyDoc = false;
token = parser.nextToken();
if (token == XContentParser.Token.END_OBJECT) {
// empty doc, we can handle it...
emptyDoc = true;
} else if (token != XContentParser.Token.FIELD_NAME) {
throw new MapperParsingException("Malformed content, after first object, either the type field or the actual properties should exist");
}

for (RootMapper rootMapper : mapping.rootMappers) {
rootMapper.preParse(context);
}

if (!emptyDoc) {
Mapper update = mapping.root.parse(context);
if (update != null) {
context.addDynamicMappingsUpdate((RootObjectMapper) update);
}
}

for (int i = 0; i < countDownTokens; i++) {
parser.nextToken();
}

for (RootMapper rootMapper : mapping.rootMappers) {
rootMapper.postParse(context);
}
} catch (Throwable e) {
// if its already a mapper parsing exception, no need to wrap it...
if (e instanceof MapperParsingException) {
throw (MapperParsingException) e;
}

// Throw a more meaningful message if the document is empty.
if (source.source() != null && source.source().length() == 0) {
throw new MapperParsingException("failed to parse, document is empty");
}

throw new MapperParsingException("failed to parse", e);
} finally {
// only close the parser when its not provided externally
if (source.parser() == null && parser != null) {
parser.close();
}
}
// reverse the order of docs for nested docs support, parent should be last
if (context.docs().size() > 1) {
Collections.reverse(context.docs());
}
// apply doc boost
if (context.docBoost() != 1.0f) {
Set<String> encounteredFields = Sets.newHashSet();
for (ParseContext.Document doc : context.docs()) {
encounteredFields.clear();
for (IndexableField field : doc) {
if (field.fieldType().indexOptions() != IndexOptions.NONE && !field.fieldType().omitNorms()) {
if (!encounteredFields.contains(field.name())) {
((Field) field).setBoost(context.docBoost() * field.boost());
encounteredFields.add(field.name());
}
}
}
}
}

Mapper rootDynamicUpdate = context.dynamicMappingsUpdate();
Mapping update = null;
if (rootDynamicUpdate != null) {
update = mapping.mappingUpdate(rootDynamicUpdate);
}

ParsedDocument doc = new ParsedDocument(context.uid(), context.version(), context.id(), context.type(), source.routing(), source.timestamp(), source.ttl(), context.docs(),
context.source(), update).parent(source.parent());
// reset the context to free up memory
context.reset(null, null, null, null);
return doc;
return documentParser.parseDocument(source, listener);
}

/**
Expand Down Expand Up @@ -514,19 +395,7 @@ public ObjectMapper findParentObjectMapper(ObjectMapper objectMapper) {
* @return transformed version of transformMe. This may actually be the same object as sourceAsMap
*/
public Map<String, Object> transformSourceAsMap(Map<String, Object> sourceAsMap) {
if (mapping.sourceTransforms.length == 0) {
return sourceAsMap;
}
for (SourceTransform transform : mapping.sourceTransforms) {
sourceAsMap = transform.transformSourceAsMap(sourceAsMap);
}
return sourceAsMap;
}

private XContentParser transform(XContentParser parser) throws IOException {
Map<String, Object> transformed = transformSourceAsMap(parser.mapOrderedAndClose());
XContentBuilder builder = XContentFactory.contentBuilder(parser.contentType()).value(transformed);
return parser.contentType().xContent().createParser(builder.bytes());
return DocumentParser.transformSourceAsMap(mapping, sourceAsMap);
}

public void addFieldMappers(Collection<FieldMapper<?>> fieldMappers) {
Expand Down Expand Up @@ -638,7 +507,7 @@ public CompressedString refreshSource() throws ElasticsearchGenerationException
}

public void close() {
cache.close();
documentParser.close();
mapping.root.close();
for (RootMapper rootMapper : mapping.rootMappers) {
rootMapper.close();
Expand Down
Loading