Skip to content

Commit

Permalink
Make reader context per-user
Browse files Browse the repository at this point in the history
  • Loading branch information
jimczi committed Apr 24, 2020
1 parent 058d663 commit f81d651
Show file tree
Hide file tree
Showing 20 changed files with 231 additions and 262 deletions.
16 changes: 8 additions & 8 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -592,14 +592,14 @@ protected final GetResult getFromSearcher(Get get, BiFunction<String, SearcherSc
/**
* Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand.
*/
public final Reader acquireReader(Function<Searcher, Searcher> wrapper) throws EngineException {
return acquireReader(wrapper, SearcherScope.EXTERNAL);
public final SearcherSupplier acquireSearcherSupplier(Function<Searcher, Searcher> wrapper) throws EngineException {
return acquireSearcherSupplier(wrapper, SearcherScope.EXTERNAL);
}

/**
* Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand.
*/
public Reader acquireReader(Function<Searcher, Searcher> wrapper, SearcherScope scope) throws EngineException {
public SearcherSupplier acquireSearcherSupplier(Function<Searcher, Searcher> wrapper, SearcherScope scope) throws EngineException {
/* Acquire order here is store -> manager since we need
* to make sure that the store is not closed before
* the searcher is acquired. */
Expand All @@ -611,7 +611,7 @@ public Reader acquireReader(Function<Searcher, Searcher> wrapper, SearcherScope
ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope);
ElasticsearchDirectoryReader acquire = referenceManager.acquire();
AtomicBoolean released = new AtomicBoolean(false);
Reader reader = new Reader(wrapper) {
SearcherSupplier reader = new SearcherSupplier(wrapper) {
@Override
public Searcher acquireSearcherInternal(String source) {
assert assertSearcherIsWarmedUp(source, scope);
Expand Down Expand Up @@ -659,9 +659,9 @@ public final Searcher acquireSearcher(String source) throws EngineException {
}

public Searcher acquireSearcher(String source, SearcherScope scope) throws EngineException {
Reader releasable = null;
SearcherSupplier releasable = null;
try {
Reader reader = releasable = acquireReader(Function.identity(), scope);
SearcherSupplier reader = releasable = acquireSearcherSupplier(Function.identity(), scope);
Searcher searcher = reader.acquireSearcher(source);
releasable = null;
return new Searcher(source, searcher.getDirectoryReader(), searcher.getSimilarity(),
Expand Down Expand Up @@ -1174,10 +1174,10 @@ default void onFailedEngine(String reason, @Nullable Exception e) {
}
}

public abstract static class Reader implements Releasable {
public abstract static class SearcherSupplier implements Releasable {
private final Function<Searcher, Searcher> wrapper;

public Reader(Function<Searcher, Searcher> wrapper) {
public SearcherSupplier(Function<Searcher, Searcher> wrapper) {
this.wrapper = wrapper;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.search.internal.ReaderContext;
import org.elasticsearch.search.internal.ScrollContext;
import org.elasticsearch.search.internal.SearchContext;

import java.util.HashMap;
Expand Down Expand Up @@ -158,15 +157,15 @@ public void onFreeReaderContext(ReaderContext readerContext) {
}

@Override
public void onNewScrollContext(ScrollContext scrollContext) {
public void onNewScrollContext(ReaderContext readerContext) {
totalStats.scrollCurrent.inc();
}

@Override
public void onFreeScrollContext(ScrollContext scrollContext) {
public void onFreeScrollContext(ReaderContext readerContext) {
totalStats.scrollCurrent.dec();
assert totalStats.scrollCurrent.count() >= 0;
totalStats.scrollMetric.inc(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - scrollContext.getStartTimeInNano()));
totalStats.scrollMetric.inc(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - readerContext.getStartTimeInNano()));
}

static final class StatsHolder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1199,18 +1199,18 @@ public void failShard(String reason, @Nullable Exception e) {
/**
* Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand.
*/
public Engine.Reader acquireReader() {
return acquireReader(Engine.SearcherScope.EXTERNAL);
public Engine.SearcherSupplier acquireSearcherSupplier() {
return acquireSearcherSupplier(Engine.SearcherScope.EXTERNAL);
}

/**
* Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand.
*/
public Engine.Reader acquireReader(Engine.SearcherScope scope) {
public Engine.SearcherSupplier acquireSearcherSupplier(Engine.SearcherScope scope) {
readAllowed();
markSearcherAccessed();
final Engine engine = getEngine();
return engine.acquireReader(this::wrapSearcher, scope);
return engine.acquireSearcherSupplier(this::wrapSearcher, scope);
}

public Engine.Searcher acquireSearcher(String source) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.search.internal.ReaderContext;
import org.elasticsearch.search.internal.ScrollContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.transport.TransportRequest;

Expand Down Expand Up @@ -93,28 +92,29 @@ default void onNewReaderContext(ReaderContext readerContext) {}
default void onFreeReaderContext(ReaderContext readerContext) {}

/**
* Executed when a new scroll search {@link SearchContext} was created
* @param scrollContext the created search context
* Executed when a new scroll search {@link ReaderContext} was created
* @param readerContext the created reader context
*/
default void onNewScrollContext(ScrollContext scrollContext) {}
default void onNewScrollContext(ReaderContext readerContext) {}

/**
* Executed when a scroll search {@link SearchContext} is freed.
* This happens either when the scroll search execution finishes, if the
* execution failed or if the search context as idle for and needs to be
* cleaned up.
* @param scrollContext the freed search context
* @param readerContext the freed search context
*/
default void onFreeScrollContext(ScrollContext scrollContext) {}
default void onFreeScrollContext(ReaderContext readerContext) {}

/**
* Executed prior to using a {@link SearchContext} that has been retrieved
* from the active contexts. If the context is deemed invalid a runtime
* exception can be thrown, which will prevent the context from being used.
* @param context the context retrieved from the active contexts
* @param readerContext The reader context used by this request.
* @param searchContext The newly created {@link SearchContext}.
* @param transportRequest the request that is going to use the search context
*/
default void validateSearchContext(SearchContext context, TransportRequest transportRequest) {}
default void validateSearchContext(ReaderContext readerContext, SearchContext searchContext, TransportRequest transportRequest) {}

/**
* Executed when a search context was freed. The implementor can implement
Expand Down Expand Up @@ -225,33 +225,33 @@ public void onFreeReaderContext(ReaderContext readerContext) {
}

@Override
public void onNewScrollContext(ScrollContext scrollContext) {
public void onNewScrollContext(ReaderContext readerContext) {
for (SearchOperationListener listener : listeners) {
try {
listener.onNewScrollContext(scrollContext);
listener.onNewScrollContext(readerContext);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onNewScrollContext listener [{}] failed", listener), e);
}
}
}

@Override
public void onFreeScrollContext(ScrollContext scrollContext) {
public void onFreeScrollContext(ReaderContext readerContext) {
for (SearchOperationListener listener : listeners) {
try {
listener.onFreeScrollContext(scrollContext);
listener.onFreeScrollContext(readerContext);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onFreeScrollContext listener [{}] failed", listener), e);
}
}
}

@Override
public void validateSearchContext(SearchContext context, TransportRequest request) {
public void validateSearchContext(ReaderContext readerContext, SearchContext searchContext, TransportRequest request) {
Exception exception = null;
for (SearchOperationListener listener : listeners) {
try {
listener.validateSearchContext(context, request);
listener.validateSearchContext(readerContext, searchContext, request);
} catch (Exception e) {
exception = ExceptionsHelper.useOrSuppress(exception, e);
}
Expand Down
Loading

0 comments on commit f81d651

Please sign in to comment.