From 3362a50bc8c0280a39607d661a23d4b2cdaeb5f0 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 30 Jun 2017 15:42:29 +0200 Subject: [PATCH] Replace map-based source filtering with bytes streamed based filtering It basically replace the usage of XContentMapValues.filter() by a new method XContentHelper.filter() that works using Jackson's streaming filter feature. --- .../action/update/UpdateHelper.java | 12 +---- .../common/xcontent/XContentHelper.java | 50 +++++++++++++++++++ .../xcontent/support/XContentMapValues.java | 2 +- .../index/get/ShardGetService.java | 15 +----- .../index/mapper/SourceFieldMapper.java | 23 ++------- .../fetch/subphase/FetchSourceContext.java | 23 +++------ .../fetch/subphase/FetchSourceSubPhase.java | 18 ++----- .../search/lookup/SourceLookup.java | 5 +- .../action/update/UpdateRequestTests.java | 4 ++ .../xcontent/support/XContentHelperTests.java | 23 ++++++++- .../builder/SearchSourceBuilderTests.java | 2 + 11 files changed, 100 insertions(+), 77 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java b/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java index fbf005415d96d..917844935f780 100644 --- a/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java +++ b/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java @@ -30,10 +30,8 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.document.DocumentField; -import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; @@ -349,15 +347,9 @@ public static GetResult extractGetResult(final UpdateRequest request, String con BytesReference sourceFilteredAsBytes = sourceAsBytes; if (request.fetchSource() != null && request.fetchSource().fetchSource()) { sourceRequested = true; - if (request.fetchSource().includes().length > 0 || request.fetchSource().excludes().length > 0) { - Object value = sourceLookup.filter(request.fetchSource()); + if (request.fetchSource().isFiltered()) { try { - final int initialCapacity = Math.min(1024, sourceAsBytes.length()); - BytesStreamOutput streamOutput = new BytesStreamOutput(initialCapacity); - try (XContentBuilder builder = new XContentBuilder(sourceContentType.xContent(), streamOutput)) { - builder.value(value); - sourceFilteredAsBytes = builder.bytes(); - } + sourceFilteredAsBytes = sourceLookup.filter(request.fetchSource()); } catch (IOException e) { throw new ElasticsearchException("Error filtering source", e); } diff --git a/core/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java b/core/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java index 3d69dd38b3ce4..df0a2994306d5 100644 --- a/core/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java +++ b/core/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java @@ -25,17 +25,22 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.compress.Compressor; import org.elasticsearch.common.compress.CompressorFactory; +import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.xcontent.ToXContent.Params; import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; +import static java.util.Collections.emptySet; +import static java.util.stream.Collectors.toSet; import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS; @SuppressWarnings("unchecked") @@ -470,4 +475,49 @@ public static BytesReference toXContent(ToXContent toXContent, XContentType xCon return builder.bytes(); } } + + public static BytesReference filter(BytesReference bytes, XContentType xContentType, String[] includes, String[] excludes) throws + IOException { + if (bytes == null || bytes.length() == 0) { + return bytes; + } + + InputStream inputStream; + Compressor compressor = CompressorFactory.compressor(bytes); + if (compressor != null) { + inputStream = compressor.streamInput(bytes.streamInput()); + } else { + inputStream = bytes.streamInput(); + } + + final XContent xContent; + if (xContentType != null) { + xContent = xContentType.xContent(); + } else { + if (inputStream.markSupported() == false) { + inputStream = new BufferedInputStream(inputStream); + } + @SuppressWarnings("deprecated") // When filtering the source of a document we don't always know the XContentType + XContentType bytesContentType = XContentFactory.xContentType(inputStream); + if (bytesContentType == null) { + throw new IllegalArgumentException("Unable to detect xcontent type of the reference to bytes"); + } + xContent = bytesContentType.xContent(); + } + + final Set sourceIncludes = (includes != null) ? Arrays.stream(includes).collect(toSet()) : emptySet(); + final Set sourceExcludes = (excludes != null) ? Arrays.stream(excludes).collect(toSet()) : emptySet(); + + try (XContentParser parser = xContent.createParser(NamedXContentRegistry.EMPTY, inputStream)) { + final int capacity = Math.min(1024, bytes.length()); + try (XContentBuilder builder = new XContentBuilder(xContent, new BytesStreamOutput(capacity), sourceIncludes, sourceExcludes)) { + //We can't use builder.copyCurrentStructure(parser) here because it would filter out the root object. + if (parser.currentToken() == null) { + parser.nextToken(); + } + XContentHelper.copyCurrentStructure(builder.generator(), parser); + return builder.bytes(); + } + } + } } diff --git a/core/src/main/java/org/elasticsearch/common/xcontent/support/XContentMapValues.java b/core/src/main/java/org/elasticsearch/common/xcontent/support/XContentMapValues.java index 6e9b53a73615c..f28c33ed87790 100644 --- a/core/src/main/java/org/elasticsearch/common/xcontent/support/XContentMapValues.java +++ b/core/src/main/java/org/elasticsearch/common/xcontent/support/XContentMapValues.java @@ -162,7 +162,7 @@ public static Map filter(Map map, String[] includes, * Returns a function that filters a document map based on the given include and exclude rules. * @see #filter(Map, String[], String[]) for details */ - public static Function, Map> filter(String[] includes, String[] excludes) { + private static Function, Map> filter(String[] includes, String[] excludes) { CharacterRunAutomaton matchAllAutomaton = new CharacterRunAutomaton(Automata.makeAnyString()); CharacterRunAutomaton include; diff --git a/core/src/main/java/org/elasticsearch/index/get/ShardGetService.java b/core/src/main/java/org/elasticsearch/index/get/ShardGetService.java index 0aeb4f3f19d58..2784afea38936 100644 --- a/core/src/main/java/org/elasticsearch/index/get/ShardGetService.java +++ b/core/src/main/java/org/elasticsearch/index/get/ShardGetService.java @@ -23,16 +23,12 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.document.DocumentField; import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.util.set.Sets; -import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; @@ -218,16 +214,9 @@ private GetResult innerGetLoadFromStoredFields(String type, String id, String[] if (!fetchSourceContext.fetchSource()) { source = null; - } else if (fetchSourceContext.includes().length > 0 || fetchSourceContext.excludes().length > 0) { - Map sourceAsMap; - XContentType sourceContentType = null; - // TODO: The source might parsed and available in the sourceLookup but that one uses unordered maps so different. Do we care? - Tuple> typeMapTuple = XContentHelper.convertToMap(source, true); - sourceContentType = typeMapTuple.v1(); - sourceAsMap = typeMapTuple.v2(); - sourceAsMap = XContentMapValues.filter(sourceAsMap, fetchSourceContext.includes(), fetchSourceContext.excludes()); + } else if (fetchSourceContext.isFiltered()) { try { - source = XContentFactory.contentBuilder(sourceContentType).map(sourceAsMap).bytes(); + source = XContentHelper.filter(source, null, fetchSourceContext.includes(), fetchSourceContext.excludes()); } catch (IOException e) { throw new ElasticsearchException("Failed to get type [" + type + "] and id [" + id + "] with includes/excludes set", e); } diff --git a/core/src/main/java/org/elasticsearch/index/mapper/SourceFieldMapper.java b/core/src/main/java/org/elasticsearch/index/mapper/SourceFieldMapper.java index 5e1b6843940f0..efb84870efce2 100644 --- a/core/src/main/java/org/elasticsearch/index/mapper/SourceFieldMapper.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/SourceFieldMapper.java @@ -27,15 +27,10 @@ import org.elasticsearch.Version; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.index.query.QueryShardException; @@ -45,14 +40,13 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.function.Function; public class SourceFieldMapper extends MetadataFieldMapper { public static final String NAME = "_source"; public static final String CONTENT_TYPE = "_source"; - private final Function, Map> filter; + private final boolean filtered; public static class Defaults { public static final String NAME = SourceFieldMapper.NAME; @@ -189,7 +183,7 @@ private SourceFieldMapper(boolean enabled, String[] includes, String[] excludes, this.includes = includes; this.excludes = excludes; final boolean filtered = (includes != null && includes.length > 0) || (excludes != null && excludes.length > 0); - this.filter = enabled && filtered && fieldType().stored() ? XContentMapValues.filter(includes, excludes) : null; + this.filtered = enabled && filtered && fieldType().stored(); this.complete = enabled && includes == null && excludes == null; } @@ -239,17 +233,8 @@ protected void parseCreateField(ParseContext context, List field return; } - if (filter != null) { - // we don't update the context source if we filter, we want to keep it as is... - Tuple> mapTuple = - XContentHelper.convertToMap(source, true, context.sourceToParse().getXContentType()); - Map filteredSource = filter.apply(mapTuple.v2()); - BytesStreamOutput bStream = new BytesStreamOutput(); - XContentType contentType = mapTuple.v1(); - XContentBuilder builder = XContentFactory.contentBuilder(contentType, bStream).map(filteredSource); - builder.close(); - - source = bStream.bytes(); + if (filtered) { + source = XContentHelper.filter(source, context.sourceToParse().getXContentType(), includes, excludes); } BytesRef ref = source.toBytesRef(); fields.add(new StoredField(fieldType().name(), ref.bytes, ref.offset, ref.length)); diff --git a/core/src/main/java/org/elasticsearch/search/fetch/subphase/FetchSourceContext.java b/core/src/main/java/org/elasticsearch/search/fetch/subphase/FetchSourceContext.java index c86342f690f13..1765a1587eafd 100644 --- a/core/src/main/java/org/elasticsearch/search/fetch/subphase/FetchSourceContext.java +++ b/core/src/main/java/org/elasticsearch/search/fetch/subphase/FetchSourceContext.java @@ -29,30 +29,26 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.rest.RestRequest; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Map; -import java.util.function.Function; /** * Context used to fetch the {@code _source}. */ public class FetchSourceContext implements Writeable, ToXContent { - public static final ParseField INCLUDES_FIELD = new ParseField("includes", "include"); - public static final ParseField EXCLUDES_FIELD = new ParseField("excludes", "exclude"); + private static final ParseField INCLUDES_FIELD = new ParseField("includes", "include"); + private static final ParseField EXCLUDES_FIELD = new ParseField("excludes", "exclude"); public static final FetchSourceContext FETCH_SOURCE = new FetchSourceContext(true); public static final FetchSourceContext DO_NOT_FETCH_SOURCE = new FetchSourceContext(false); private final boolean fetchSource; private final String[] includes; private final String[] excludes; - private Function, Map> filter; public FetchSourceContext(boolean fetchSource, String[] includes, String[] excludes) { this.fetchSource = fetchSource; @@ -89,6 +85,10 @@ public String[] excludes() { return this.excludes; } + public boolean isFiltered() { + return (includes != null && includes.length >0) || (excludes != null && excludes.length >0); + } + public static FetchSourceContext parseFromRestRequest(RestRequest request) { Boolean fetchSource = null; String[] source_excludes = null; @@ -224,15 +224,4 @@ public int hashCode() { result = 31 * result + (excludes != null ? Arrays.hashCode(excludes) : 0); return result; } - - /** - * Returns a filter function that expects the source map as an input and returns - * the filtered map. - */ - public Function, Map> getFilter() { - if (filter == null) { - filter = XContentMapValues.filter(includes, excludes); - } - return filter; - } } diff --git a/core/src/main/java/org/elasticsearch/search/fetch/subphase/FetchSourceSubPhase.java b/core/src/main/java/org/elasticsearch/search/fetch/subphase/FetchSourceSubPhase.java index 3171ca4b00834..f2837642ff866 100644 --- a/core/src/main/java/org/elasticsearch/search/fetch/subphase/FetchSourceSubPhase.java +++ b/core/src/main/java/org/elasticsearch/search/fetch/subphase/FetchSourceSubPhase.java @@ -20,8 +20,6 @@ package org.elasticsearch.search.fetch.subphase; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.fetch.FetchSubPhase; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.lookup.SourceLookup; @@ -35,29 +33,23 @@ public void hitExecute(SearchContext context, HitContext hitContext) { if (context.sourceRequested() == false) { return; } - SourceLookup source = context.lookup().source(); + SourceLookup sourceLookup = context.lookup().source(); FetchSourceContext fetchSourceContext = context.fetchSourceContext(); assert fetchSourceContext.fetchSource(); - if (fetchSourceContext.includes().length == 0 && fetchSourceContext.excludes().length == 0) { - hitContext.hit().sourceRef(source.internalSourceRef()); + if (fetchSourceContext.isFiltered() == false) { + hitContext.hit().sourceRef(sourceLookup.internalSourceRef()); return; } - if (source.internalSourceRef() == null) { + if (sourceLookup.internalSourceRef() == null) { throw new IllegalArgumentException("unable to fetch fields from _source field: _source is disabled in the mappings " + "for index [" + context.indexShard().shardId().getIndexName() + "]"); } - final Object value = source.filter(fetchSourceContext); try { - final int initialCapacity = Math.min(1024, source.internalSourceRef().length()); - BytesStreamOutput streamOutput = new BytesStreamOutput(initialCapacity); - XContentBuilder builder = new XContentBuilder(source.sourceContentType().xContent(), streamOutput); - builder.value(value); - hitContext.hit().sourceRef(builder.bytes()); + hitContext.hit().sourceRef(sourceLookup.filter(fetchSourceContext)); } catch (IOException e) { throw new ElasticsearchException("Error filtering source", e); } - } } diff --git a/core/src/main/java/org/elasticsearch/search/lookup/SourceLookup.java b/core/src/main/java/org/elasticsearch/search/lookup/SourceLookup.java index 4cc44747d0f00..3ce6e47c579ec 100644 --- a/core/src/main/java/org/elasticsearch/search/lookup/SourceLookup.java +++ b/core/src/main/java/org/elasticsearch/search/lookup/SourceLookup.java @@ -29,6 +29,7 @@ import org.elasticsearch.index.fieldvisitor.FieldsVisitor; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; +import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.Map; @@ -128,8 +129,8 @@ public List extractRawValues(String path) { return XContentMapValues.extractRawValues(path, loadSourceIfNeeded()); } - public Object filter(FetchSourceContext context) { - return context.getFilter().apply(loadSourceIfNeeded()); + public BytesReference filter(FetchSourceContext context) throws IOException { + return XContentHelper.filter(sourceAsBytes, sourceContentType, context.includes(), context.excludes()); } public Object extractValue(String path) { diff --git a/core/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java b/core/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java index 7049d0fa9e98e..9bad39b71bfb9 100644 --- a/core/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java +++ b/core/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java @@ -301,6 +301,7 @@ public void testFetchSourceParsing() throws Exception { assertThat(request.fetchSource().includes().length, equalTo(0)); assertThat(request.fetchSource().excludes().length, equalTo(0)); assertThat(request.fetchSource().fetchSource(), equalTo(true)); + assertThat(request.fetchSource().isFiltered(), equalTo(false)); request.fromXContent(createParser(XContentFactory.jsonBuilder() .startObject() @@ -310,6 +311,7 @@ public void testFetchSourceParsing() throws Exception { assertThat(request.fetchSource().includes().length, equalTo(0)); assertThat(request.fetchSource().excludes().length, equalTo(0)); assertThat(request.fetchSource().fetchSource(), equalTo(false)); + assertThat(request.fetchSource().isFiltered(), equalTo(false)); request.fromXContent(createParser(XContentFactory.jsonBuilder() .startObject() @@ -320,6 +322,7 @@ public void testFetchSourceParsing() throws Exception { assertThat(request.fetchSource().includes().length, equalTo(1)); assertThat(request.fetchSource().excludes().length, equalTo(0)); assertThat(request.fetchSource().includes()[0], equalTo("path.inner.*")); + assertThat(request.fetchSource().isFiltered(), equalTo(true)); request.fromXContent(createParser(XContentFactory.jsonBuilder() .startObject() @@ -334,6 +337,7 @@ public void testFetchSourceParsing() throws Exception { assertThat(request.fetchSource().excludes().length, equalTo(1)); assertThat(request.fetchSource().includes()[0], equalTo("path.inner.*")); assertThat(request.fetchSource().excludes()[0], equalTo("another.inner.*")); + assertThat(request.fetchSource().isFiltered(), equalTo(true)); } public void testNowInScript() throws IOException { diff --git a/core/src/test/java/org/elasticsearch/common/xcontent/support/XContentHelperTests.java b/core/src/test/java/org/elasticsearch/common/xcontent/support/XContentHelperTests.java index 219bd12daeaea..94f4630484397 100644 --- a/core/src/test/java/org/elasticsearch/common/xcontent/support/XContentHelperTests.java +++ b/core/src/test/java/org/elasticsearch/common/xcontent/support/XContentHelperTests.java @@ -23,10 +23,10 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.test.ESTestCase; import org.hamcrest.Matchers; import java.io.IOException; @@ -34,8 +34,27 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; -public class XContentHelperTests extends ESTestCase { +import static org.elasticsearch.common.xcontent.XContentHelper.toXContent; + +public class XContentHelperTests extends AbstractFilteringTestCase { + + @Override + protected void testFilter(Builder expected, Builder actual, Set includes, Set excludes) throws IOException { + final XContentType xContentType = randomFrom(XContentType.values()); + final boolean humanReadable = randomBoolean(); + + ToXContentObject expectedToXContent = (builder, params) -> expected.apply(builder); + BytesReference expectedBytes = toXContent(expectedToXContent, xContentType, humanReadable); + + BytesReference actualBytes; + try (XContentBuilder builder = XContentBuilder.builder(xContentType.xContent(), includes, excludes)) { + actualBytes = actual.apply(builder).bytes(); + } + + assertArrayEquals(expectedBytes.toBytesRef().bytes, actualBytes.toBytesRef().bytes); + } Map getMap(Object... keyValues) { Map map = new HashMap<>(); diff --git a/core/src/test/java/org/elasticsearch/search/builder/SearchSourceBuilderTests.java b/core/src/test/java/org/elasticsearch/search/builder/SearchSourceBuilderTests.java index f8bd7b80d0e9f..5635d0b0cff95 100644 --- a/core/src/test/java/org/elasticsearch/search/builder/SearchSourceBuilderTests.java +++ b/core/src/test/java/org/elasticsearch/search/builder/SearchSourceBuilderTests.java @@ -109,6 +109,7 @@ public void testParseIncludeExclude() throws IOException { SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.fromXContent(createParseContext(parser)); assertArrayEquals(new String[]{"*.field2"}, searchSourceBuilder.fetchSource().excludes()); assertArrayEquals(new String[]{"include"}, searchSourceBuilder.fetchSource().includes()); + assertTrue(searchSourceBuilder.fetchSource().isFiltered()); } } { @@ -118,6 +119,7 @@ public void testParseIncludeExclude() throws IOException { assertArrayEquals(new String[]{}, searchSourceBuilder.fetchSource().excludes()); assertArrayEquals(new String[]{}, searchSourceBuilder.fetchSource().includes()); assertFalse(searchSourceBuilder.fetchSource().fetchSource()); + assertFalse(searchSourceBuilder.fetchSource().isFiltered()); } } }