Skip to content

Commit

Permalink
Add index commit id to searcher (#63963)
Browse files Browse the repository at this point in the history
This change assigns the id of an index commit to a searcher, so we can
retry search requests on another shard copy if they have the same index
commit.
  • Loading branch information
dnhatn authored Dec 12, 2020
1 parent defaa93 commit 0e8e02f
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.indices.IndicesService;
Expand Down Expand Up @@ -472,6 +474,34 @@ public void testResyncPropagatePrimaryTerm() throws Exception {
}
}

public void testCommitIdInSearcher() throws Exception {
final String indexName = "test_commit_id";
createIndex(indexName, Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build());
indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(0, 50))
.mapToObj(n -> client().prepareIndex(indexName).setSource("num", n)).collect(toList()));
ensureGreen(indexName);
assertAcked(client().admin().indices().prepareClose(indexName));
assertIndexIsClosed(indexName);
ensureGreen(indexName);
final String nodeWithPrimary = Iterables.get(internalCluster().nodesInclude(indexName), 0);
IndexShard shard = internalCluster().getInstance(IndicesService.class, nodeWithPrimary)
.indexService(resolveIndex(indexName)).getShard(0);
final String commitId;
try (Engine.SearcherSupplier searcherSupplier = shard.acquireSearcherSupplier(randomFrom(Engine.SearcherScope.values()))) {
assertNotNull(searcherSupplier.getCommitId());
commitId = searcherSupplier.getCommitId();
}
internalCluster().restartNode(nodeWithPrimary);
ensureGreen(indexName);
shard = internalCluster().getInstance(IndicesService.class, nodeWithPrimary).indexService(resolveIndex(indexName)).getShard(0);
try (Engine.SearcherSupplier searcherSupplier = shard.acquireSearcherSupplier(randomFrom(Engine.SearcherScope.values()))) {
assertThat(searcherSupplier.getCommitId(), equalTo(commitId));
}
}

private static void closeIndices(final String... indices) {
closeIndices(client().admin().indices().prepareClose(indices));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -808,6 +809,13 @@ public void delete() {
}
}

/**
* Returns a base64 encoded string of the commit id of the given {@link SegmentInfos}
*/
public static String getCommitId(SegmentInfos segmentInfos) {
return Base64.getEncoder().encodeToString(segmentInfos.getId());
}

/**
* Return a {@link Bits} view of the provided scorer.
* <b>NOTE</b>: that the returned {@link Bits} instance MUST be consumed in order.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Map;

import static java.util.Map.entry;
Expand All @@ -47,7 +46,7 @@ public CommitStats(SegmentInfos segmentInfos) {
userData = Map.copyOf(segmentInfos.getUserData());
// lucene calls the current generation, last generation.
generation = segmentInfos.getLastGeneration();
id = Base64.getEncoder().encodeToString(segmentInfos.getId());
id = Lucene.getCommitId(segmentInfos);
numDocs = Lucene.getNumDocs(segmentInfos);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1206,6 +1206,15 @@ public final void close() {
protected abstract void doClose();

protected abstract Searcher acquireSearcherInternal(String source);

/**
* Returns a commit id associated with this searcher if it's opened from an index commit; otherwise, return null. Two searchers
* with the same commit id must have identical Lucene level indices (i.e., identical segments with same docs using same doc-ids).
*/
@Nullable
public String getCommitId() {
return null;
}
}

public static final class Searcher extends IndexSearcher implements Releasable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public class ReadOnlyEngine extends Engine {
private final boolean requireCompleteHistory;

protected volatile TranslogStats translogStats;
protected final String commitId;

/**
* Creates a new ReadOnlyEngine. This ctor can also be used to open a read-only engine on top of an already opened
Expand Down Expand Up @@ -110,6 +111,7 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
// yet this makes sure nobody else does. including some testing tools that try to be messy
indexWriterLock = obtainLock ? directory.obtainLock(IndexWriter.WRITE_LOCK_NAME) : null;
this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory);
this.commitId = Lucene.getCommitId(lastCommittedSegmentInfos);
if (seqNoStats == null) {
seqNoStats = buildSeqNoStats(config, lastCommittedSegmentInfos);
ensureMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats);
Expand Down Expand Up @@ -523,4 +525,25 @@ public ShardLongFieldRange getRawFieldRange(String field) throws IOException {
}
}


@Override
public SearcherSupplier acquireSearcherSupplier(Function<Searcher, Searcher> wrapper, SearcherScope scope) throws EngineException {
final SearcherSupplier delegate = super.acquireSearcherSupplier(wrapper, scope);
return new SearcherSupplier(Function.identity()) {
@Override
protected void doClose() {
delegate.close();
}

@Override
protected Searcher acquireSearcherInternal(String source) {
return delegate.acquireSearcherInternal(source);
}

@Override
public String getCommitId() {
return commitId;
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,11 @@ public Searcher acquireSearcherInternal(String source) {
protected void doClose() {
store.decRef();
}

@Override
public String getCommitId() {
return commitId;
}
};
}

Expand Down

0 comments on commit 0e8e02f

Please sign in to comment.