Skip to content

Commit

Permalink
Replace map-based source filtering with byte-stream-based filtering
Browse files Browse the repository at this point in the history
It basically replaces the usage of XContentMapValues.filter() with a new
method, XContentHelper.filter(), that works using Jackson's streaming
filter feature.
  • Loading branch information
tlrx committed Jul 3, 2017
1 parent 0e2cfc6 commit 3362a50
Show file tree
Hide file tree
Showing 11 changed files with 100 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,8 @@
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
Expand Down Expand Up @@ -349,15 +347,9 @@ public static GetResult extractGetResult(final UpdateRequest request, String con
BytesReference sourceFilteredAsBytes = sourceAsBytes;
if (request.fetchSource() != null && request.fetchSource().fetchSource()) {
sourceRequested = true;
if (request.fetchSource().includes().length > 0 || request.fetchSource().excludes().length > 0) {
Object value = sourceLookup.filter(request.fetchSource());
if (request.fetchSource().isFiltered()) {
try {
final int initialCapacity = Math.min(1024, sourceAsBytes.length());
BytesStreamOutput streamOutput = new BytesStreamOutput(initialCapacity);
try (XContentBuilder builder = new XContentBuilder(sourceContentType.xContent(), streamOutput)) {
builder.value(value);
sourceFilteredAsBytes = builder.bytes();
}
sourceFilteredAsBytes = sourceLookup.filter(request.fetchSource());
} catch (IOException e) {
throw new ElasticsearchException("Error filtering source", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,22 @@
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.xcontent.ToXContent.Params;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import static java.util.Collections.emptySet;
import static java.util.stream.Collectors.toSet;
import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -470,4 +475,49 @@ public static BytesReference toXContent(ToXContent toXContent, XContentType xCon
return builder.bytes();
}
}

/**
 * Filters the given source, keeping only the fields matching {@code includes} and dropping those
 * matching {@code excludes}, using Jackson's streaming filtering instead of a map round-trip.
 *
 * @param bytes        the source to filter; may be compressed. Returned as-is when {@code null} or empty.
 * @param xContentType the content type of {@code bytes}, or {@code null} to auto-detect it from the stream
 * @param includes     field patterns to keep, or {@code null}/empty for no include filtering
 * @param excludes     field patterns to drop, or {@code null}/empty for no exclude filtering
 * @return a new {@link BytesReference} containing the filtered source
 * @throws IOException if reading or rewriting the source fails
 * @throws IllegalArgumentException if {@code xContentType} is {@code null} and the type cannot be detected
 */
public static BytesReference filter(BytesReference bytes, XContentType xContentType, String[] includes, String[] excludes) throws
    IOException {
    if (bytes == null || bytes.length() == 0) {
        return bytes;
    }

    // Transparently decompress the source if it is stored compressed.
    InputStream inputStream;
    Compressor compressor = CompressorFactory.compressor(bytes);
    if (compressor != null) {
        inputStream = compressor.streamInput(bytes.streamInput());
    } else {
        inputStream = bytes.streamInput();
    }

    final XContent xContent;
    if (xContentType != null) {
        xContent = xContentType.xContent();
    } else {
        // Auto-detection peeks at the stream, so it must support mark/reset.
        if (inputStream.markSupported() == false) {
            inputStream = new BufferedInputStream(inputStream);
        }
        @SuppressWarnings("deprecation") // When filtering the source of a document we don't always know the XContentType
        XContentType bytesContentType = XContentFactory.xContentType(inputStream);
        if (bytesContentType == null) {
            throw new IllegalArgumentException("Unable to detect xcontent type of the reference to bytes");
        }
        xContent = bytesContentType.xContent();
    }

    final Set<String> sourceIncludes = (includes != null) ? Arrays.stream(includes).collect(toSet()) : emptySet();
    final Set<String> sourceExcludes = (excludes != null) ? Arrays.stream(excludes).collect(toSet()) : emptySet();

    try (XContentParser parser = xContent.createParser(NamedXContentRegistry.EMPTY, inputStream)) {
        // Cap the initial buffer at 1kb; the filtered output is never larger than the input.
        final int capacity = Math.min(1024, bytes.length());
        try (XContentBuilder builder = new XContentBuilder(xContent, new BytesStreamOutput(capacity), sourceIncludes, sourceExcludes)) {
            //We can't use builder.copyCurrentStructure(parser) here because it would filter out the root object.
            if (parser.currentToken() == null) {
                parser.nextToken();
            }
            XContentHelper.copyCurrentStructure(builder.generator(), parser);
            return builder.bytes();
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public static Map<String, Object> filter(Map<String, ?> map, String[] includes,
* Returns a function that filters a document map based on the given include and exclude rules.
* @see #filter(Map, String[], String[]) for details
*/
public static Function<Map<String, ?>, Map<String, Object>> filter(String[] includes, String[] excludes) {
private static Function<Map<String, ?>, Map<String, Object>> filter(String[] includes, String[] excludes) {
CharacterRunAutomaton matchAllAutomaton = new CharacterRunAutomaton(Automata.makeAnyString());

CharacterRunAutomaton include;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,12 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
Expand Down Expand Up @@ -218,16 +214,9 @@ private GetResult innerGetLoadFromStoredFields(String type, String id, String[]

if (!fetchSourceContext.fetchSource()) {
source = null;
} else if (fetchSourceContext.includes().length > 0 || fetchSourceContext.excludes().length > 0) {
Map<String, Object> sourceAsMap;
XContentType sourceContentType = null;
// TODO: The source might parsed and available in the sourceLookup but that one uses unordered maps so different. Do we care?
Tuple<XContentType, Map<String, Object>> typeMapTuple = XContentHelper.convertToMap(source, true);
sourceContentType = typeMapTuple.v1();
sourceAsMap = typeMapTuple.v2();
sourceAsMap = XContentMapValues.filter(sourceAsMap, fetchSourceContext.includes(), fetchSourceContext.excludes());
} else if (fetchSourceContext.isFiltered()) {
try {
source = XContentFactory.contentBuilder(sourceContentType).map(sourceAsMap).bytes();
source = XContentHelper.filter(source, null, fetchSourceContext.includes(), fetchSourceContext.excludes());
} catch (IOException e) {
throw new ElasticsearchException("Failed to get type [" + type + "] and id [" + id + "] with includes/excludes set", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,10 @@
import org.elasticsearch.Version;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.QueryShardException;

Expand All @@ -45,14 +40,13 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class SourceFieldMapper extends MetadataFieldMapper {

public static final String NAME = "_source";

public static final String CONTENT_TYPE = "_source";
private final Function<Map<String, ?>, Map<String, Object>> filter;
private final boolean filtered;

public static class Defaults {
public static final String NAME = SourceFieldMapper.NAME;
Expand Down Expand Up @@ -189,7 +183,7 @@ private SourceFieldMapper(boolean enabled, String[] includes, String[] excludes,
this.includes = includes;
this.excludes = excludes;
final boolean filtered = (includes != null && includes.length > 0) || (excludes != null && excludes.length > 0);
this.filter = enabled && filtered && fieldType().stored() ? XContentMapValues.filter(includes, excludes) : null;
this.filtered = enabled && filtered && fieldType().stored();
this.complete = enabled && includes == null && excludes == null;
}

Expand Down Expand Up @@ -239,17 +233,8 @@ protected void parseCreateField(ParseContext context, List<IndexableField> field
return;
}

if (filter != null) {
// we don't update the context source if we filter, we want to keep it as is...
Tuple<XContentType, Map<String, Object>> mapTuple =
XContentHelper.convertToMap(source, true, context.sourceToParse().getXContentType());
Map<String, Object> filteredSource = filter.apply(mapTuple.v2());
BytesStreamOutput bStream = new BytesStreamOutput();
XContentType contentType = mapTuple.v1();
XContentBuilder builder = XContentFactory.contentBuilder(contentType, bStream).map(filteredSource);
builder.close();

source = bStream.bytes();
if (filtered) {
source = XContentHelper.filter(source, context.sourceToParse().getXContentType(), includes, excludes);
}
BytesRef ref = source.toBytesRef();
fields.add(new StoredField(fieldType().name(), ref.bytes, ref.offset, ref.length));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,30 +29,26 @@
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.rest.RestRequest;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
* Context used to fetch the {@code _source}.
*/
public class FetchSourceContext implements Writeable, ToXContent {

public static final ParseField INCLUDES_FIELD = new ParseField("includes", "include");
public static final ParseField EXCLUDES_FIELD = new ParseField("excludes", "exclude");
private static final ParseField INCLUDES_FIELD = new ParseField("includes", "include");
private static final ParseField EXCLUDES_FIELD = new ParseField("excludes", "exclude");

public static final FetchSourceContext FETCH_SOURCE = new FetchSourceContext(true);
public static final FetchSourceContext DO_NOT_FETCH_SOURCE = new FetchSourceContext(false);
private final boolean fetchSource;
private final String[] includes;
private final String[] excludes;
private Function<Map<String, ?>, Map<String, Object>> filter;

public FetchSourceContext(boolean fetchSource, String[] includes, String[] excludes) {
this.fetchSource = fetchSource;
Expand Down Expand Up @@ -89,6 +85,10 @@ public String[] excludes() {
return this.excludes;
}

/**
 * Returns {@code true} when at least one include or exclude pattern is set,
 * i.e. when fetching the source requires filtering it.
 */
public boolean isFiltered() {
    final boolean hasIncludes = includes != null && includes.length > 0;
    final boolean hasExcludes = excludes != null && excludes.length > 0;
    return hasIncludes || hasExcludes;
}

public static FetchSourceContext parseFromRestRequest(RestRequest request) {
Boolean fetchSource = null;
String[] source_excludes = null;
Expand Down Expand Up @@ -224,15 +224,4 @@ public int hashCode() {
result = 31 * result + (excludes != null ? Arrays.hashCode(excludes) : 0);
return result;
}

/**
 * Returns a filter function that takes the source map as input and returns
 * the filtered map. The function is built lazily on first use and cached.
 */
public Function<Map<String, ?>, Map<String, Object>> getFilter() {
    Function<Map<String, ?>, Map<String, Object>> cached = filter;
    if (cached == null) {
        cached = XContentMapValues.filter(includes, excludes);
        filter = cached;
    }
    return cached;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
package org.elasticsearch.search.fetch.subphase;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.fetch.FetchSubPhase;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.lookup.SourceLookup;
Expand All @@ -35,29 +33,23 @@ public void hitExecute(SearchContext context, HitContext hitContext) {
if (context.sourceRequested() == false) {
return;
}
SourceLookup source = context.lookup().source();
SourceLookup sourceLookup = context.lookup().source();
FetchSourceContext fetchSourceContext = context.fetchSourceContext();
assert fetchSourceContext.fetchSource();
if (fetchSourceContext.includes().length == 0 && fetchSourceContext.excludes().length == 0) {
hitContext.hit().sourceRef(source.internalSourceRef());
if (fetchSourceContext.isFiltered() == false) {
hitContext.hit().sourceRef(sourceLookup.internalSourceRef());
return;
}

if (source.internalSourceRef() == null) {
if (sourceLookup.internalSourceRef() == null) {
throw new IllegalArgumentException("unable to fetch fields from _source field: _source is disabled in the mappings " +
"for index [" + context.indexShard().shardId().getIndexName() + "]");
}

final Object value = source.filter(fetchSourceContext);
try {
final int initialCapacity = Math.min(1024, source.internalSourceRef().length());
BytesStreamOutput streamOutput = new BytesStreamOutput(initialCapacity);
XContentBuilder builder = new XContentBuilder(source.sourceContentType().xContent(), streamOutput);
builder.value(value);
hitContext.hit().sourceRef(builder.bytes());
hitContext.hit().sourceRef(sourceLookup.filter(fetchSourceContext));
} catch (IOException e) {
throw new ElasticsearchException("Error filtering source", e);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.index.fieldvisitor.FieldsVisitor;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -128,8 +129,8 @@ public List<Object> extractRawValues(String path) {
return XContentMapValues.extractRawValues(path, loadSourceIfNeeded());
}

public Object filter(FetchSourceContext context) {
return context.getFilter().apply(loadSourceIfNeeded());
public BytesReference filter(FetchSourceContext context) throws IOException {
return XContentHelper.filter(sourceAsBytes, sourceContentType, context.includes(), context.excludes());
}

public Object extractValue(String path) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ public void testFetchSourceParsing() throws Exception {
assertThat(request.fetchSource().includes().length, equalTo(0));
assertThat(request.fetchSource().excludes().length, equalTo(0));
assertThat(request.fetchSource().fetchSource(), equalTo(true));
assertThat(request.fetchSource().isFiltered(), equalTo(false));

request.fromXContent(createParser(XContentFactory.jsonBuilder()
.startObject()
Expand All @@ -310,6 +311,7 @@ public void testFetchSourceParsing() throws Exception {
assertThat(request.fetchSource().includes().length, equalTo(0));
assertThat(request.fetchSource().excludes().length, equalTo(0));
assertThat(request.fetchSource().fetchSource(), equalTo(false));
assertThat(request.fetchSource().isFiltered(), equalTo(false));

request.fromXContent(createParser(XContentFactory.jsonBuilder()
.startObject()
Expand All @@ -320,6 +322,7 @@ public void testFetchSourceParsing() throws Exception {
assertThat(request.fetchSource().includes().length, equalTo(1));
assertThat(request.fetchSource().excludes().length, equalTo(0));
assertThat(request.fetchSource().includes()[0], equalTo("path.inner.*"));
assertThat(request.fetchSource().isFiltered(), equalTo(true));

request.fromXContent(createParser(XContentFactory.jsonBuilder()
.startObject()
Expand All @@ -334,6 +337,7 @@ public void testFetchSourceParsing() throws Exception {
assertThat(request.fetchSource().excludes().length, equalTo(1));
assertThat(request.fetchSource().includes()[0], equalTo("path.inner.*"));
assertThat(request.fetchSource().excludes()[0], equalTo("another.inner.*"));
assertThat(request.fetchSource().isFiltered(), equalTo(true));
}

public void testNowInScript() throws IOException {
Expand Down
Loading

0 comments on commit 3362a50

Please sign in to comment.