Skip to content

Commit

Permalink
Avoid wrapping searchers multiple times in mget
Browse files Browse the repository at this point in the history
  • Loading branch information
dnhatn committed Jan 11, 2024
1 parent 52bd9e8 commit 74131f6
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardGetService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.ExecutorSelector;
import org.elasticsearch.indices.IndicesService;
Expand Down Expand Up @@ -139,8 +140,10 @@ protected void asyncShardOperation(MultiGetShardRequest request, ShardId shardId
@Override
protected MultiGetShardResponse shardOperation(MultiGetShardRequest request, ShardId shardId) {
MultiGetShardResponse response = new MultiGetShardResponse();
for (int i = 0; i < request.locations.size(); i++) {
getAndAddToResponse(shardId, i, request, response);
try (var mgetSession = new ShardGetService.MGetSession(getIndexShard(shardId).getService())) {
for (int i = 0; i < request.locations.size(); i++) {
getAndAddToResponse(mgetSession, i, request, response);
}
}
return response;
}
Expand Down Expand Up @@ -226,38 +229,43 @@ private void handleMultiGetOnUnpromotableShard(

private MultiGetShardResponse handleLocalGets(MultiGetShardRequest request, MultiGetShardResponse response, ShardId shardId) {
logger.trace("handling local gets for missing locations");
for (int i = 0; i < response.locations.size(); i++) {
if (response.responses.get(i) == null && response.failures.get(i) == null) {
getAndAddToResponse(shardId, i, request, response);
try (var mgetSession = new ShardGetService.MGetSession(getIndexShard(shardId).getService())) {
for (int i = 0; i < response.locations.size(); i++) {
if (response.responses.get(i) == null && response.failures.get(i) == null) {
getAndAddToResponse(mgetSession, i, request, response);
}
}
}
return response;
}

private void getAndAddToResponse(ShardId shardId, int location, MultiGetShardRequest request, MultiGetShardResponse response) {
var indexShard = getIndexShard(shardId);
private void getAndAddToResponse(
ShardGetService.MGetSession mgetSession,
int location,
MultiGetShardRequest request,
MultiGetShardResponse response
) {
MultiGetRequest.Item item = request.items.get(location);
try {
GetResult getResult = indexShard.getService()
.get(
item.id(),
item.storedFields(),
request.realtime(),
item.version(),
item.versionType(),
item.fetchSourceContext(),
request.isForceSyntheticSource()
);
GetResult getResult = mgetSession.get(
item.id(),
item.storedFields(),
request.realtime(),
item.version(),
item.versionType(),
item.fetchSourceContext(),
request.isForceSyntheticSource()
);
response.add(request.locations.get(location), new GetResponse(getResult));
} catch (RuntimeException e) {
if (TransportActions.isShardNotAvailableException(e)) {
throw e;
} else {
logger.debug(() -> format("%s failed to execute multi_get for [%s]", shardId, item.id()), e);
logger.debug(() -> format("%s failed to execute multi_get for [%s]", mgetSession.shardId(), item.id()), e);
response.add(request.locations.get(location), new MultiGetResponse.Failure(request.index(), item.id(), e));
}
} catch (IOException e) {
logger.debug(() -> format("%s failed to execute multi_get for [%s]", shardId, item.id()), e);
logger.debug(() -> format("%s failed to execute multi_get for [%s]", mgetSession.shardId(), item.id()), e);
response.add(request.locations.get(location), new MultiGetResponse.Failure(request.index(), item.id(), e));
}
}
Expand Down
20 changes: 14 additions & 6 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@
import org.elasticsearch.index.fielddata.ShardFieldData;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats;
import org.elasticsearch.index.get.ShardGetService;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.IdFieldMapper;
Expand Down Expand Up @@ -1212,15 +1211,24 @@ public static Engine.Delete prepareDelete(
}

public Engine.GetResult get(Engine.Get get) {
return innerGet(get, false);
return innerGet(get, false, this::wrapSearcher);
}

final MultiEngineGet newMultiEngineGet() {
return new MultiEngineGet(this::wrapSearcher) {
@Override
GetResult engineGet(Engine.Get get) {
return innerGet(get, false, this::wrapSearchWithCache);
}
};
}

public Engine.GetResult getFromTranslog(Engine.Get get) {
assert get.realtime();
return innerGet(get, true);
return innerGet(get, true, this::wrapSearcher);
}

private Engine.GetResult innerGet(Engine.Get get, boolean translogOnly) {
private Engine.GetResult innerGet(Engine.Get get, boolean translogOnly, Function<Engine.Searcher, Engine.Searcher> searcherWrapper) {
readAllowed();
MappingLookup mappingLookup = mapperService.mappingLookup();
if (mappingLookup.hasMappings() == false) {
Expand All @@ -1230,9 +1238,9 @@ private Engine.GetResult innerGet(Engine.Get get, boolean translogOnly) {
throw new IllegalStateException("get operations not allowed on a legacy index");
}
if (translogOnly) {
return getEngine().getFromTranslog(get, mappingLookup, mapperService.documentParser(), this::wrapSearcher);
return getEngine().getFromTranslog(get, mappingLookup, mapperService.documentParser(), searcherWrapper);
}
return getEngine().get(get, mappingLookup, mapperService.documentParser(), this::wrapSearcher);
return getEngine().get(get, mappingLookup, mapperService.documentParser(), searcherWrapper);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.index.shard;

import org.apache.lucene.index.IndexReader;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.engine.Engine;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/**
* A session that can perform multiple gets without wrapping searchers multiple times.
* This must be created and used by a single thread.
*/
abstract class MultiEngineGet implements Releasable {
private final Map<IndexReader.CacheKey, Engine.Searcher> caches = new HashMap<>();
private final Thread creationThread;
private final Function<Engine.Searcher, Engine.Searcher> wrapper;

MultiEngineGet(Function<Engine.Searcher, Engine.Searcher> wrapper) {
this.creationThread = Thread.currentThread();
this.wrapper = wrapper;
}

private boolean assertAccessingThread() {
assert creationThread == Thread.currentThread()
: "created by [" + creationThread + "] != current thread [" + Thread.currentThread() + "]";
return true;
}

abstract Engine.GetResult engineGet(Engine.Get get);

Engine.Searcher wrapSearchWithCache(Engine.Searcher searcher) {
assert assertAccessingThread();
final IndexReader.CacheHelper cacheHelper = searcher.getIndexReader().getReaderCacheHelper();
final IndexReader.CacheKey cacheKey = cacheHelper != null ? cacheHelper.getKey() : null;
if (cacheKey == null) {
return wrapper.apply(searcher);
}
final Engine.Searcher wrapped = caches.computeIfAbsent(cacheKey, k -> wrapper.apply(searcher));
return new Engine.Searcher(
wrapped.source(),
wrapped.getIndexReader(),
wrapped.getSimilarity(),
wrapped.getQueryCache(),
wrapped.getQueryCachingPolicy(),
() -> {}
);
}

@Override
public void close() {
Releasables.close(caches.values());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* Side Public License, v 1.
*/

package org.elasticsearch.index.get;
package org.elasticsearch.index.shard;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.bytes.BytesReference;
Expand All @@ -16,20 +16,21 @@
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.fieldvisitor.LeafStoredFieldLoader;
import org.elasticsearch.index.fieldvisitor.StoredFieldLoader;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.get.GetStats;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.MappingLookup;
import org.elasticsearch.index.mapper.RoutingFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.SourceLoader;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.lookup.Source;

Expand All @@ -41,6 +42,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
Expand Down Expand Up @@ -79,6 +81,7 @@ public GetResult get(
) throws IOException {
return get(
id,
indexShard::get,
gFields,
realtime,
version,
Expand All @@ -93,6 +96,7 @@ public GetResult get(

private GetResult get(
String id,
Function<Engine.Get, Engine.GetResult> getFromEngine,
String[] gFields,
boolean realtime,
long version,
Expand All @@ -108,6 +112,7 @@ private GetResult get(
long now = System.nanoTime();
GetResult getResult = innerGet(
id,
getFromEngine,
gFields,
realtime,
version,
Expand Down Expand Up @@ -141,6 +146,7 @@ public GetResult getFromTranslog(
) throws IOException {
return get(
id,
unused -> { throw new IllegalStateException("getFromTranslog should not call getFromEngine"); },
gFields,
realtime,
version,
Expand All @@ -156,6 +162,7 @@ public GetResult getFromTranslog(
public GetResult getForUpdate(String id, long ifSeqNo, long ifPrimaryTerm) throws IOException {
return get(
id,
indexShard::get,
new String[] { RoutingFieldMapper.NAME },
true,
Versions.MATCH_ANY,
Expand Down Expand Up @@ -217,6 +224,7 @@ private static FetchSourceContext normalizeFetchSourceContent(@Nullable FetchSou

private GetResult innerGet(
String id,
Function<Engine.Get, Engine.GetResult> getFromEngine,
String[] gFields,
boolean realtime,
long version,
Expand All @@ -232,7 +240,7 @@ private GetResult innerGet(
.versionType(versionType)
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm);
try (Engine.GetResult get = translogOnly ? indexShard.getFromTranslog(engineGet) : indexShard.get(engineGet)) {
try (Engine.GetResult get = translogOnly ? indexShard.getFromTranslog(engineGet) : getFromEngine.apply(engineGet)) {
if (get == null) {
return null;
}
Expand Down Expand Up @@ -345,4 +353,51 @@ private static StoredFieldLoader buildStoredFieldLoader(String[] fields, FetchSo
}
return StoredFieldLoader.create(fetchSourceContext.fetchSource(), fieldsToLoad);
}

/**
* A multi-get session where we can enable optimizations for multiple get requests.
* This session must be created and used by a single thread.
*/
public static final class MGetSession implements Releasable {
private final ShardGetService shardGetService;
private final MultiEngineGet multiEngineGet;

public MGetSession(ShardGetService shardGetService) {
this.shardGetService = shardGetService;
this.multiEngineGet = shardGetService.indexShard.newMultiEngineGet();
}

@Override
public void close() {
multiEngineGet.close();
}

public ShardId shardId() {
return shardGetService.shardId();
}

public GetResult get(
String id,
String[] gFields,
boolean realtime,
long version,
VersionType versionType,
FetchSourceContext fetchSourceContext,
boolean forceSyntheticSource
) throws IOException {
return shardGetService.get(
id,
multiEngineGet::engineGet,
gFields,
realtime,
version,
versionType,
UNASSIGNED_SEQ_NO,
UNASSIGNED_PRIMARY_TERM,
fetchSourceContext,
forceSyntheticSource,
false
);
}
}
}

0 comments on commit 74131f6

Please sign in to comment.