Skip to content

Commit

Permalink
Realtime get from in-memory segment when possible (#64504)
Browse files Browse the repository at this point in the history
If the reader wrapper is specified, then we can't perform a realtime get 
using operations from translog. With this change, we will create an
in-memory Lucene segment from that indexing operation and perform a
realtime get from that segment to avoid refresh storms.
  • Loading branch information
dnhatn authored Nov 10, 2020
1 parent 23232c1 commit 33b408f
Show file tree
Hide file tree
Showing 13 changed files with 555 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.engine.EngineTestCase;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
Expand All @@ -47,6 +49,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static java.util.Collections.singleton;
Expand All @@ -65,7 +68,17 @@ public class GetActionIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singleton(InternalSettingsPlugin.class);
return List.of(InternalSettingsPlugin.class, SearcherWrapperPlugin.class);
}

public static class SearcherWrapperPlugin extends Plugin {
@Override
public void onIndexModule(IndexModule indexModule) {
super.onIndexModule(indexModule);
if (randomBoolean()) {
indexModule.setReaderWrapper(indexService -> EngineTestCase.randomReaderWrapper());
}
}
}

public void testSimpleGet() {
Expand Down
13 changes: 8 additions & 5 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.Mapping;
Expand Down Expand Up @@ -96,7 +97,6 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Stream;

Expand Down Expand Up @@ -559,9 +559,7 @@ public static class NoOpResult extends Result {

}

protected final GetResult getFromSearcher(Get get, BiFunction<String, SearcherScope, Engine.Searcher> searcherFactory,
SearcherScope scope) throws EngineException {
final Engine.Searcher searcher = searcherFactory.apply("get", scope);
protected final GetResult getFromSearcher(Get get, Engine.Searcher searcher) throws EngineException {
final DocIdAndVersion docIdAndVersion;
try {
docIdAndVersion = VersionsAndSeqNoResolver.loadDocIdAndVersion(searcher.getIndexReader(), get.uid(), true);
Expand Down Expand Up @@ -596,7 +594,7 @@ protected final GetResult getFromSearcher(Get get, BiFunction<String, SearcherSc
}
}

public abstract GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException;
public abstract GetResult get(Get get, DocumentMapper mapper, Function<Engine.Searcher, Engine.Searcher> searcherWrapper);

/**
* Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand.
Expand Down Expand Up @@ -1616,6 +1614,7 @@ private GetResult(boolean exists, long version, DocIdAndVersion docIdAndVersion,
this.docIdAndVersion = docIdAndVersion;
this.searcher = searcher;
this.fromTranslog = fromTranslog;
assert fromTranslog == false || searcher.getIndexReader() instanceof TranslogLeafReader;
}

public GetResult(Engine.Searcher searcher, DocIdAndVersion docIdAndVersion, boolean fromTranslog) {
Expand All @@ -1630,6 +1629,10 @@ public long version() {
return this.version;
}

/**
* Returns {@code true} iff the get was performed from a translog operation. Notes that this returns {@code false}
* if the get was performed on an in-memory Lucene segment created from the corresponding translog operation.
*/
public boolean isFromTranslog() {
return fromTranslog;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.fieldvisitor.IdOnlyFieldVisitor;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.ParseContext;
Expand Down Expand Up @@ -618,14 +619,34 @@ private ExternalReaderManager createReaderManager(RefreshWarmerListener external
}
}

private GetResult getFromTranslog(Get get, Translog.Index index, DocumentMapper mapper,
Function<Searcher, Searcher> searcherWrapper) throws IOException {
assert get.isReadFromTranslog();
final SingleDocDirectoryReader inMemoryReader = new SingleDocDirectoryReader(shardId, index, mapper, config().getAnalyzer());
final Engine.Searcher searcher = new Engine.Searcher("realtime_get", ElasticsearchDirectoryReader.wrap(inMemoryReader, shardId),
config().getSimilarity(), config().getQueryCache(), config().getQueryCachingPolicy(), inMemoryReader);
final Searcher wrappedSearcher = searcherWrapper.apply(searcher);
if (wrappedSearcher == searcher) {
searcher.close();
assert inMemoryReader.assertMemorySegmentStatus(false);
final TranslogLeafReader translogLeafReader = new TranslogLeafReader(index);
return new GetResult(new Engine.Searcher("realtime_get", translogLeafReader,
IndexSearcher.getDefaultSimilarity(), null, IndexSearcher.getDefaultQueryCachingPolicy(), translogLeafReader),
new VersionsAndSeqNoResolver.DocIdAndVersion(
0, index.version(), index.seqNo(), index.primaryTerm(), translogLeafReader, 0), true);
} else {
assert inMemoryReader.assertMemorySegmentStatus(true);
return getFromSearcher(get, wrappedSearcher);
}
}

@Override
public GetResult get(Get get, BiFunction<String, SearcherScope, Engine.Searcher> searcherFactory) throws EngineException {
public GetResult get(Get get, DocumentMapper mapper, Function<Engine.Searcher, Engine.Searcher> searcherWrapper) {
assert Objects.equals(get.uid().field(), IdFieldMapper.NAME) : get.uid().field();
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
SearcherScope scope;
if (get.realtime()) {
VersionValue versionValue = null;
final VersionValue versionValue;
try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {
// we need to lock here to access the version map to do this truly in RT
versionValue = getVersionFromMap(get.uid().bytes());
Expand All @@ -649,15 +670,9 @@ public GetResult get(Get get, BiFunction<String, SearcherScope, Engine.Searcher>
// the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0
if (versionValue.getLocation() != null) {
try {
Translog.Operation operation = translog.readOperation(versionValue.getLocation());
final Translog.Operation operation = translog.readOperation(versionValue.getLocation());
if (operation != null) {
// in the case of a already pruned translog generation we might get null here - yet very unlikely
final Translog.Index index = (Translog.Index) operation;
TranslogLeafReader reader = new TranslogLeafReader(index);
return new GetResult(new Engine.Searcher("realtime_get", reader,
IndexSearcher.getDefaultSimilarity(), null, IndexSearcher.getDefaultQueryCachingPolicy(), reader),
new VersionsAndSeqNoResolver.DocIdAndVersion(0, index.version(), index.seqNo(), index.primaryTerm(),
reader, 0), true);
return getFromTranslog(get, (Translog.Index) operation, mapper, searcherWrapper);
}
} catch (IOException e) {
maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event
Expand All @@ -670,14 +685,11 @@ public GetResult get(Get get, BiFunction<String, SearcherScope, Engine.Searcher>
assert versionValue.seqNo >= 0 : versionValue;
refreshIfNeeded("realtime_get", versionValue.seqNo);
}
scope = SearcherScope.INTERNAL;
return getFromSearcher(get, acquireSearcher("realtime_get", SearcherScope.INTERNAL, searcherWrapper));
} else {
// we expose what has been externally expose in a point in time snapshot via an explicit refresh
scope = SearcherScope.EXTERNAL;
return getFromSearcher(get, acquireSearcher("get", SearcherScope.EXTERNAL, searcherWrapper));
}

// no version, get the version from the index, we know that we refresh on flush
return getFromSearcher(get, searcherFactory, scope);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
Expand All @@ -48,7 +49,6 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Stream;

Expand Down Expand Up @@ -219,8 +219,8 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm
}

@Override
public GetResult get(Get get, BiFunction<String, SearcherScope, Engine.Searcher> searcherFactory) throws EngineException {
return getFromSearcher(get, searcherFactory, SearcherScope.EXTERNAL);
public GetResult get(Get get, DocumentMapper mapper, Function<Searcher, Searcher> searcherWrapper) {
return getFromSearcher(get, acquireSearcher("get", SearcherScope.EXTERNAL, searcherWrapper));
}

@Override
Expand Down
Loading

0 comments on commit 33b408f

Please sign in to comment.