From 00d54fabb259a21e98441f58c04b86b7f13a3a71 Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Fri, 30 Jan 2015 12:24:00 +0100 Subject: [PATCH] Search: Remove query-cache serialization optimization. The query-cache has an optimization to not deserialize the bytes at the shard level. However this is a bit fragile since it assumes that serialized streams can be concatenanted (which is not the case with shared strings) and also does not update the QueryResult object that is held by the SearchContext. So you need to make sure to use the right one. With this change, the query cache just deserializes bytes into the QueryResult object from the context. Close #9500 --- .../cache/query/IndicesQueryCache.java | 102 ++---------------- .../elasticsearch/search/SearchService.java | 31 +++--- .../cache/query/IndicesQueryCacheTests.java | 75 +++++++++++++ 3 files changed, 99 insertions(+), 109 deletions(-) create mode 100644 src/test/java/org/elasticsearch/indices/cache/query/IndicesQueryCacheTests.java diff --git a/src/main/java/org/elasticsearch/indices/cache/query/IndicesQueryCache.java b/src/main/java/org/elasticsearch/indices/cache/query/IndicesQueryCache.java index 02564db8ba023..f2c882e1769e8 100644 --- a/src/main/java/org/elasticsearch/indices/cache/query/IndicesQueryCache.java +++ b/src/main/java/org/elasticsearch/indices/cache/query/IndicesQueryCache.java @@ -32,37 +32,26 @@ import org.apache.lucene.util.Accountable; import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.ElasticsearchIllegalArgumentException; -import org.elasticsearch.ElasticsearchIllegalStateException; -import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.bytes.PagedBytesReference; -import org.elasticsearch.common.bytes.ReleasablePagedBytesReference; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.MemorySizeValue; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; -import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.search.query.QueryPhase; import org.elasticsearch.search.query.QuerySearchResult; -import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.threadpool.ThreadPool; -import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.Iterator; @@ -217,11 +206,12 @@ public boolean canCache(ShardSearchRequest request, SearchContext context) { } /** - * Loads the cache result, computing it if needed by executing the query phase. The combination of load + compute allows + * Loads the cache result, computing it if needed by executing the query phase and otherwise deserializing the cached + * value into the {@link SearchContext#queryResult() context's query result}. The combination of load + compute allows * to have a single load operation that will cause other requests with the same key to wait till its loaded an reuse * the same cache. */ - public QuerySearchResultProvider load(final ShardSearchRequest request, final SearchContext context, final QueryPhase queryPhase) throws Exception { + public void loadIntoContext(final ShardSearchRequest request, final SearchContext context, final QueryPhase queryPhase) throws Exception { assert canCache(request, context); Key key = buildKey(request, context); Loader loader = new Loader(queryPhase, context, key); @@ -238,10 +228,11 @@ public QuerySearchResultProvider load(final ShardSearchRequest request, final Se } } else { key.shard.queryCache().onHit(); + // restore the cached query result into the context + final QuerySearchResult result = context.queryResult(); + result.readFromWithId(context.id(), value.reference.streamInput()); + result.shardTarget(context.shardTarget()); } - - // try and be smart, and reuse an already loaded and constructed QueryResult of in VM execution - return new BytesQuerySearchResult(context.id(), context.shardTarget(), value.reference, loader.isLoaded() ? context.queryResult() : null); } private static class Loader implements Callable { @@ -278,7 +269,6 @@ public Value call() throws Exception { // for now, keep the paged data structure, which might have unused bytes to fill a page, but better to keep // the memory properly paged instead of having varied sized bytes final BytesReference reference = out.bytes(); - assert verifyCacheSerializationSameAsQueryResult(reference, context, context.queryResult()); loaded = true; Value value = new Value(reference, out.ramBytesUsed()); key.shard.queryCache().onCached(key, value); @@ -459,14 +449,6 @@ synchronized void reap() { } } - private static boolean verifyCacheSerializationSameAsQueryResult(BytesReference cacheData, SearchContext context, QuerySearchResult result) throws Exception { - BytesStreamOutput out1 = new BytesStreamOutput(); - new BytesQuerySearchResult(context.id(), context.shardTarget(), cacheData).writeTo(out1); - BytesStreamOutput out2 = new BytesStreamOutput(); - result.writeTo(out2); - return out1.bytes().equals(out2.bytes()); - } - private static Key buildKey(ShardSearchRequest request, SearchContext context) throws Exception { // TODO: for now, this will create different keys for different JSON order // TODO: tricky to get around this, need to parse and order all, which can be expensive @@ -474,74 +456,4 @@ private static Key buildKey(ShardSearchRequest request, SearchContext context) t ((DirectoryReader) context.searcher().getIndexReader()).getVersion(), request.cacheKey()); } - - /** - * this class aim is to just provide an on the wire *write* format that is the same as {@link QuerySearchResult} - * and also provide a nice wrapper for in node communication for an already constructed {@link QuerySearchResult}. - */ - private static class BytesQuerySearchResult extends QuerySearchResultProvider { - - private long id; - private SearchShardTarget shardTarget; - private BytesReference data; - - private transient QuerySearchResult result; - - private BytesQuerySearchResult(long id, SearchShardTarget shardTarget, BytesReference data) { - this(id, shardTarget, data, null); - } - - private BytesQuerySearchResult(long id, SearchShardTarget shardTarget, BytesReference data, QuerySearchResult result) { - this.id = id; - this.shardTarget = shardTarget; - this.data = data; - this.result = result; - } - - @Override - public boolean includeFetch() { - return false; - } - - @Override - public QuerySearchResult queryResult() { - if (result == null) { - result = new QuerySearchResult(id, shardTarget); - try { - result.readFromWithId(id, data.streamInput()); - } catch (Exception e) { - throw new ElasticsearchParseException("failed to parse a cached query", e); - } - } - return result; - } - - @Override - public long id() { - return id; - } - - @Override - public SearchShardTarget shardTarget() { - return shardTarget; - } - - @Override - public void shardTarget(SearchShardTarget shardTarget) { - this.shardTarget = shardTarget; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - throw new ElasticsearchIllegalStateException("readFrom should not be called"); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeLong(id); -// shardTarget.writeTo(out); not needed - data.writeTo(out); // we need to write teh bytes as is, to be the same as QuerySearchResult - } - } } diff --git a/src/main/java/org/elasticsearch/search/SearchService.java b/src/main/java/org/elasticsearch/search/SearchService.java index a2de13db23eb1..35c8450301dfc 100644 --- a/src/main/java/org/elasticsearch/search/SearchService.java +++ b/src/main/java/org/elasticsearch/search/SearchService.java @@ -24,6 +24,7 @@ import com.carrotsearch.hppc.cursors.ObjectCursor; import com.google.common.base.Charsets; import com.google.common.collect.ImmutableMap; + import org.apache.lucene.index.IndexOptions; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NumericDocValues; @@ -271,6 +272,19 @@ public ScrollQueryFetchSearchResult executeScan(InternalScrollSearchRequest requ } } + /** + * Try to load the query results from the cache or execute the query phase directly if the cache cannot be used. + */ + private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final SearchContext context, + final QueryPhase queryPhase) throws Exception { + final boolean canCache = indicesQueryCache.canCache(request, context); + if (canCache) { + indicesQueryCache.loadIntoContext(request, context, queryPhase); + } else { + queryPhase.execute(context); + } + } + public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request) throws ElasticsearchException { final SearchContext context = createAndPutContext(request); try { @@ -278,14 +292,7 @@ public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request) t long time = System.nanoTime(); contextProcessing(context); - QuerySearchResultProvider result; - boolean canCache = indicesQueryCache.canCache(request, context); - if (canCache) { - result = indicesQueryCache.load(request, context, queryPhase); - } else { - queryPhase.execute(context); - result = context.queryResult(); - } + loadOrExecuteQueryPhase(request, context, queryPhase); if (context.searchType() == SearchType.COUNT) { freeContext(context.id()); @@ -294,7 +301,7 @@ public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request) t } context.indexShard().searchService().onQueryPhase(context, System.nanoTime() - time); - return result; + return context.queryResult(); } catch (Throwable e) { // execution exception can happen while loading the cache, strip it if (e instanceof ExecutionException) { @@ -989,11 +996,7 @@ public void run() { if (canCache != top) { return; } - if (canCache) { - indicesQueryCache.load(request, context, queryPhase); - } else { - queryPhase.execute(context); - } + loadOrExecuteQueryPhase(request, context, queryPhase); long took = System.nanoTime() - now; if (indexShard.warmerService().logger().isTraceEnabled()) { indexShard.warmerService().logger().trace("warmed [{}], took [{}]", entry.name(), TimeValue.timeValueNanos(took)); diff --git a/src/test/java/org/elasticsearch/indices/cache/query/IndicesQueryCacheTests.java b/src/test/java/org/elasticsearch/indices/cache/query/IndicesQueryCacheTests.java new file mode 100644 index 0000000000000..a25ce91b190ba --- /dev/null +++ b/src/test/java/org/elasticsearch/indices/cache/query/IndicesQueryCacheTests.java @@ -0,0 +1,75 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.cache.query; + +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket; +import org.elasticsearch.test.ElasticsearchIntegrationTest; + +import java.util.List; + +import static org.elasticsearch.search.aggregations.AggregationBuilders.dateHistogram; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; +import static org.hamcrest.Matchers.greaterThan; + +public class IndicesQueryCacheTests extends ElasticsearchIntegrationTest { + + // One of the primary purposes of the query cache is to cache aggs results + public void testCacheAggs() throws Exception { + assertAcked(client().admin().indices().prepareCreate("index") + .addMapping("type", "f", "type=date") + .setSettings(IndicesQueryCache.INDEX_CACHE_QUERY_ENABLED, true).get()); + indexRandom(true, + client().prepareIndex("index", "type").setSource("f", "2014-03-10T00:00:00.000Z"), + client().prepareIndex("index", "type").setSource("f", "2014-05-13T00:00:00.000Z")); + + // This is not a random example: serialization with time zones writes shared strings + // which used to not work well with the query cache because of the handles stream output + // see #9500 + final SearchResponse r1 = client().prepareSearch("index").setSearchType(SearchType.COUNT) + .addAggregation(dateHistogram("histo").field("f").preZone("+01:00").minDocCount(0).interval(DateHistogramInterval.MONTH)).get(); + assertSearchResponse(r1); + + // The cached is actually used + assertThat(client().admin().indices().prepareStats("index").setQueryCache(true).get().getTotal().getQueryCache().getMemorySizeInBytes(), greaterThan(0l)); + + for (int i = 0; i < 10; ++i) { + final SearchResponse r2 = client().prepareSearch("index").setSearchType(SearchType.COUNT) + .addAggregation(dateHistogram("histo").field("f").preZone("+01:00").minDocCount(0).interval(DateHistogramInterval.MONTH)).get(); + assertSearchResponse(r2); + Histogram h1 = r1.getAggregations().get("histo"); + Histogram h2 = r2.getAggregations().get("histo"); + final List buckets1 = h1.getBuckets(); + final List buckets2 = h2.getBuckets(); + assertEquals(buckets1.size(), buckets2.size()); + for (int j = 0; j < buckets1.size(); ++j) { + final Bucket b1 = buckets1.get(j); + final Bucket b2 = buckets2.get(j); + assertEquals(b1.getKey(), b2.getKey()); + assertEquals(b1.getDocCount(), b2.getDocCount()); + } + } + } + +}