Skip to content

Commit

Permalink
Search: Remove query-cache serialization optimization.
Browse files Browse the repository at this point in the history
The query-cache has an optimization to not deserialize the bytes at the shard
level. However this is a bit fragile since it assumes that serialized streams
can be concatenated (which is not the case with shared strings) and also does
not update the QueryResult object that is held by the SearchContext. So you
need to make sure to use the right one.

With this change, the query cache just deserializes bytes into the QueryResult
object from the context.

Close #9500
  • Loading branch information
jpountz committed Jan 30, 2015
1 parent fb377d4 commit 00d54fa
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,37 +32,26 @@
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.PagedBytesReference;
import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.MemorySizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.query.QueryPhase;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
Expand Down Expand Up @@ -217,11 +206,12 @@ public boolean canCache(ShardSearchRequest request, SearchContext context) {
}

/**
* Loads the cache result, computing it if needed by executing the query phase. The combination of load + compute allows
* Loads the cache result, computing it if needed by executing the query phase and otherwise deserializing the cached
* value into the {@link SearchContext#queryResult() context's query result}. The combination of load + compute allows
* to have a single load operation that will cause other requests with the same key to wait until it is loaded and reuse
* the same cache.
*/
public QuerySearchResultProvider load(final ShardSearchRequest request, final SearchContext context, final QueryPhase queryPhase) throws Exception {
public void loadIntoContext(final ShardSearchRequest request, final SearchContext context, final QueryPhase queryPhase) throws Exception {
assert canCache(request, context);
Key key = buildKey(request, context);
Loader loader = new Loader(queryPhase, context, key);
Expand All @@ -238,10 +228,11 @@ public QuerySearchResultProvider load(final ShardSearchRequest request, final Se
}
} else {
key.shard.queryCache().onHit();
// restore the cached query result into the context
final QuerySearchResult result = context.queryResult();
result.readFromWithId(context.id(), value.reference.streamInput());
result.shardTarget(context.shardTarget());
}

// try and be smart, and reuse an already loaded and constructed QueryResult of in VM execution
return new BytesQuerySearchResult(context.id(), context.shardTarget(), value.reference, loader.isLoaded() ? context.queryResult() : null);
}

private static class Loader implements Callable<Value> {
Expand Down Expand Up @@ -278,7 +269,6 @@ public Value call() throws Exception {
// for now, keep the paged data structure, which might have unused bytes to fill a page, but better to keep
// the memory properly paged instead of having varied sized bytes
final BytesReference reference = out.bytes();
assert verifyCacheSerializationSameAsQueryResult(reference, context, context.queryResult());
loaded = true;
Value value = new Value(reference, out.ramBytesUsed());
key.shard.queryCache().onCached(key, value);
Expand Down Expand Up @@ -459,89 +449,11 @@ synchronized void reap() {
}
}

/**
 * Checks that writing the cached bytes through a {@link BytesQuerySearchResult} produces exactly the same
 * bytes as writing the given {@link QuerySearchResult} directly.
 *
 * @param cacheData the serialized bytes held by the cache
 * @param context   the search context providing the id and shard target used to wrap the cached bytes
 * @param result    the query result to compare the cached bytes against
 * @return {@code true} if both serialized forms are equal
 */
private static boolean verifyCacheSerializationSameAsQueryResult(BytesReference cacheData, SearchContext context, QuerySearchResult result) throws Exception {
    // serialize the cached bytes the way they would be sent over the wire
    final BytesStreamOutput fromCache = new BytesStreamOutput();
    final BytesQuerySearchResult cached = new BytesQuerySearchResult(context.id(), context.shardTarget(), cacheData);
    cached.writeTo(fromCache);

    // serialize the regular query result for comparison
    final BytesStreamOutput fromResult = new BytesStreamOutput();
    result.writeTo(fromResult);

    final BytesReference cachedBytes = fromCache.bytes();
    return cachedBytes.equals(fromResult.bytes());
}

/**
 * Builds the cache key for a request: the shard of the context, the version of the context's
 * {@link DirectoryReader} and the cache key of the request.
 *
 * TODO: for now, requests that only differ by their JSON order get different keys; getting around this
 * is tricky since it requires parsing and ordering everything, which can be expensive.
 */
private static Key buildKey(ShardSearchRequest request, SearchContext context) throws Exception {
    final IndexShard shard = context.indexShard();
    final DirectoryReader reader = (DirectoryReader) context.searcher().getIndexReader();
    final long readerVersion = reader.getVersion();
    return new Key(shard, readerVersion, request.cacheKey());
}

/**
 * A {@link QuerySearchResultProvider} backed by the raw bytes of a cached query result. Its on-the-wire
 * *write* format is the same as the one of {@link QuerySearchResult}, and it can also wrap an already
 * constructed {@link QuerySearchResult} for in-node communication.
 */
private static class BytesQuerySearchResult extends QuerySearchResultProvider {

    private long id;
    private SearchShardTarget shardTarget;
    private BytesReference data;

    // either supplied at construction time or parsed from the bytes on the first call to queryResult()
    private transient QuerySearchResult result;

    /** Creates a provider whose query result is parsed from {@code data} on demand. */
    private BytesQuerySearchResult(long id, SearchShardTarget shardTarget, BytesReference data) {
        this(id, shardTarget, data, null);
    }

    /**
     * Creates a provider over {@code data}; when {@code result} is not {@code null} it is returned by
     * {@link #queryResult()} as is instead of parsing the bytes.
     */
    private BytesQuerySearchResult(long id, SearchShardTarget shardTarget, BytesReference data, QuerySearchResult result) {
        this.id = id;
        this.shardTarget = shardTarget;
        this.data = data;
        this.result = result;
    }

    @Override
    public long id() {
        return id;
    }

    @Override
    public SearchShardTarget shardTarget() {
        return shardTarget;
    }

    @Override
    public void shardTarget(SearchShardTarget shardTarget) {
        this.shardTarget = shardTarget;
    }

    @Override
    public boolean includeFetch() {
        return false;
    }

    /**
     * Returns the wrapped query result, reading it from the cached bytes the first time it is needed.
     *
     * @throws ElasticsearchParseException if the cached bytes cannot be read
     */
    @Override
    public QuerySearchResult queryResult() {
        if (result != null) {
            return result;
        }
        // the field is assigned before reading, exactly like the bytes are consumed into it
        result = new QuerySearchResult(id, shardTarget);
        try {
            result.readFromWithId(id, data.streamInput());
        } catch (Exception cause) {
            throw new ElasticsearchParseException("failed to parse a cached query", cause);
        }
        return result;
    }

    /** Not supported: this class only provides a write format. */
    @Override
    public void readFrom(StreamInput in) throws IOException {
        throw new ElasticsearchIllegalStateException("readFrom should not be called");
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeLong(id);
        // the shard target is not written, it is not needed
        // the bytes are written as is, so that the output is the same as QuerySearchResult
        data.writeTo(out);
    }
}
}
31 changes: 17 additions & 14 deletions src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;

import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
Expand Down Expand Up @@ -271,21 +272,27 @@ public ScrollQueryFetchSearchResult executeScan(InternalScrollSearchRequest requ
}
}

/**
 * Runs the query phase for the given context, going through the query cache whenever the cache
 * accepts the request and executing the query phase directly otherwise.
 *
 * @param request    the shard-level search request
 * @param context    the search context to execute against
 * @param queryPhase the query phase to run when the cache cannot be used (it is also handed to the cache)
 */
private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final SearchContext context,
        final QueryPhase queryPhase) throws Exception {
    if (!indicesQueryCache.canCache(request, context)) {
        // not cacheable: execute the query phase directly
        queryPhase.execute(context);
        return;
    }
    indicesQueryCache.loadIntoContext(request, context, queryPhase);
}

public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request) throws ElasticsearchException {
final SearchContext context = createAndPutContext(request);
try {
context.indexShard().searchService().onPreQueryPhase(context);
long time = System.nanoTime();
contextProcessing(context);

QuerySearchResultProvider result;
boolean canCache = indicesQueryCache.canCache(request, context);
if (canCache) {
result = indicesQueryCache.load(request, context, queryPhase);
} else {
queryPhase.execute(context);
result = context.queryResult();
}
loadOrExecuteQueryPhase(request, context, queryPhase);

if (context.searchType() == SearchType.COUNT) {
freeContext(context.id());
Expand All @@ -294,7 +301,7 @@ public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request) t
}
context.indexShard().searchService().onQueryPhase(context, System.nanoTime() - time);

return result;
return context.queryResult();
} catch (Throwable e) {
// execution exception can happen while loading the cache, strip it
if (e instanceof ExecutionException) {
Expand Down Expand Up @@ -989,11 +996,7 @@ public void run() {
if (canCache != top) {
return;
}
if (canCache) {
indicesQueryCache.load(request, context, queryPhase);
} else {
queryPhase.execute(context);
}
loadOrExecuteQueryPhase(request, context, queryPhase);
long took = System.nanoTime() - now;
if (indexShard.warmerService().logger().isTraceEnabled()) {
indexShard.warmerService().logger().trace("warmed [{}], took [{}]", entry.name(), TimeValue.timeValueNanos(took));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.indices.cache.query;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
import org.elasticsearch.test.ElasticsearchIntegrationTest;

import java.util.List;

import static org.elasticsearch.search.aggregations.AggregationBuilders.dateHistogram;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.greaterThan;

/**
 * Integration test running the same aggregation request several times against an index that has the
 * query cache enabled, and checking that every response has the same buckets as the first one.
 */
public class IndicesQueryCacheTests extends ElasticsearchIntegrationTest {

    // One of the primary purposes of the query cache is to cache aggs results
    public void testCacheAggs() throws Exception {
        assertAcked(client().admin().indices().prepareCreate("index")
                .addMapping("type", "f", "type=date")
                .setSettings(IndicesQueryCache.INDEX_CACHE_QUERY_ENABLED, true).get());
        indexRandom(true,
                client().prepareIndex("index", "type").setSource("f", "2014-03-10T00:00:00.000Z"),
                client().prepareIndex("index", "type").setSource("f", "2014-05-13T00:00:00.000Z"));

        // This is not a random example: serialization with time zones writes shared strings
        // which used to not work well with the query cache because of the handles stream output
        // see #9500
        final SearchResponse r1 = searchMonthlyHistogram();
        assertSearchResponse(r1);

        // The cache is actually used
        final long cacheSizeInBytes = client().admin().indices().prepareStats("index").setQueryCache(true).get()
                .getTotal().getQueryCache().getMemorySizeInBytes();
        assertThat(cacheSizeInBytes, greaterThan(0L));

        for (int i = 0; i < 10; ++i) {
            final SearchResponse r2 = searchMonthlyHistogram();
            assertSearchResponse(r2);
            final Histogram h1 = r1.getAggregations().get("histo");
            final Histogram h2 = r2.getAggregations().get("histo");
            assertSameBuckets(h1, h2);
        }
    }

    /** Runs the count search with the monthly date histogram on field "f" and a "+01:00" pre-zone. */
    private SearchResponse searchMonthlyHistogram() {
        return client().prepareSearch("index").setSearchType(SearchType.COUNT)
                .addAggregation(dateHistogram("histo").field("f").preZone("+01:00").minDocCount(0).interval(DateHistogramInterval.MONTH)).get();
    }

    /** Asserts that both histograms have the same number of buckets, with equal keys and doc counts. */
    private void assertSameBuckets(Histogram expected, Histogram actual) {
        final List<? extends Bucket> expectedBuckets = expected.getBuckets();
        final List<? extends Bucket> actualBuckets = actual.getBuckets();
        assertEquals(expectedBuckets.size(), actualBuckets.size());
        for (int j = 0; j < expectedBuckets.size(); ++j) {
            final Bucket expectedBucket = expectedBuckets.get(j);
            final Bucket actualBucket = actualBuckets.get(j);
            assertEquals(expectedBucket.getKey(), actualBucket.getKey());
            assertEquals(expectedBucket.getDocCount(), actualBucket.getDocCount());
        }
    }

}

0 comments on commit 00d54fa

Please sign in to comment.