Skip to content

Commit

Permalink
Assign id to searcher using ids of segments (#66668)
Browse files Browse the repository at this point in the history
The commit id introduced in #63963 does not work well with searchable 
snapshots as we create a new index commit when restoring from snapshots.

This change revises the approach: the searcher id is now generated from the ids of the
segments of an index commit rather than from the id of the commit itself.

Relates #63963
  • Loading branch information
dnhatn authored Dec 21, 2020
1 parent e7c9be1 commit fdec6c1
Show file tree
Hide file tree
Showing 9 changed files with 208 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
Expand All @@ -51,6 +51,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -474,10 +475,11 @@ public void testResyncPropagatePrimaryTerm() throws Exception {
}
}

public void testCommitIdInSearcher() throws Exception {
public void testSearcherId() throws Exception {
final String indexName = "test_commit_id";
final int numberOfShards = randomIntBetween(1, 5);
createIndex(indexName, Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build());
indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(0, 50))
Expand All @@ -486,19 +488,42 @@ public void testCommitIdInSearcher() throws Exception {
assertAcked(client().admin().indices().prepareClose(indexName));
assertIndexIsClosed(indexName);
ensureGreen(indexName);
final String nodeWithPrimary = Iterables.get(internalCluster().nodesInclude(indexName), 0);
IndexShard shard = internalCluster().getInstance(IndicesService.class, nodeWithPrimary)
.indexService(resolveIndex(indexName)).getShard(0);
final String commitId;
try (Engine.SearcherSupplier searcherSupplier = shard.acquireSearcherSupplier(randomFrom(Engine.SearcherScope.values()))) {
assertNotNull(searcherSupplier.getCommitId());
commitId = searcherSupplier.getCommitId();
if (randomBoolean()) {
assertAcked(client().admin().indices().prepareUpdateSettings(indexName)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)));
internalCluster().ensureAtLeastNumDataNodes(2);
ensureGreen(indexName);
}
String[] searcherIds = new String[numberOfShards];
Set<String> allocatedNodes = internalCluster().nodesInclude(indexName);
for (String node : allocatedNodes) {
IndexService indexService = internalCluster().getInstance(IndicesService.class, node).indexServiceSafe(resolveIndex(indexName));
for (IndexShard shard : indexService) {
try (Engine.SearcherSupplier searcher = shard.acquireSearcherSupplier()) {
assertNotNull(searcher.getSearcherId());
if (searcherIds[shard.shardId().id()] != null) {
assertThat(searcher.getSearcherId(), equalTo(searcherIds[shard.shardId().id()]));
} else {
searcherIds[shard.shardId().id()] = searcher.getSearcherId();
}
}
}
}
for (String node : allocatedNodes) {
if (randomBoolean()) {
internalCluster().restartNode(node);
}
}
internalCluster().restartNode(nodeWithPrimary);
ensureGreen(indexName);
shard = internalCluster().getInstance(IndicesService.class, nodeWithPrimary).indexService(resolveIndex(indexName)).getShard(0);
try (Engine.SearcherSupplier searcherSupplier = shard.acquireSearcherSupplier(randomFrom(Engine.SearcherScope.values()))) {
assertThat(searcherSupplier.getCommitId(), equalTo(commitId));
allocatedNodes = internalCluster().nodesInclude(indexName);
for (String node : allocatedNodes) {
IndexService indexService = internalCluster().getInstance(IndicesService.class, node).indexServiceSafe(resolveIndex(indexName));
for (IndexShard shard : indexService) {
try (Engine.SearcherSupplier searcher = shard.acquireSearcherSupplier()) {
assertNotNull(searcher.getSearcherId());
assertThat(searcher.getSearcherId(), equalTo(searcherIds[shard.shardId().id()]));
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -816,13 +815,6 @@ public void delete() {
}
}

/**
 * Encodes the commit id of the given {@link SegmentInfos} as a base64 string.
 *
 * @param segmentInfos the segment infos whose id is encoded
 * @return the base64 representation of {@code segmentInfos.getId()}
 */
public static String getCommitId(SegmentInfos segmentInfos) {
    final byte[] rawCommitId = segmentInfos.getId();
    final Base64.Encoder encoder = Base64.getEncoder();
    return encoder.encodeToString(rawCommitId);
}

/**
* Return a {@link Bits} view of the provided scorer.
* <b>NOTE</b>: that the returned {@link Bits} instance MUST be consumed in order.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Map;

import static java.util.Map.entry;
Expand All @@ -46,7 +47,7 @@ public CommitStats(SegmentInfos segmentInfos) {
userData = Map.copyOf(segmentInfos.getUserData());
// lucene calls the current generation, last generation.
generation = segmentInfos.getLastGeneration();
id = Lucene.getCommitId(segmentInfos);
id = Base64.getEncoder().encodeToString(segmentInfos.getId());
numDocs = Lucene.getNumDocs(segmentInfos);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1215,11 +1215,11 @@ public final void close() {
protected abstract Searcher acquireSearcherInternal(String source);

/**
* Returns a commit id associated with this searcher if it's opened from an index commit; otherwise, return null. Two searchers
* with the same commit id must have identical Lucene level indices (i.e., identical segments with same docs using same doc-ids).
* Returns an id associated with this searcher if exists. Two searchers with the same searcher id must have
* identical Lucene level indices (i.e., identical segments with same docs using same doc-ids).
*/
@Nullable
public String getCommitId() {
public String getSearcherId() {
return null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.Lock;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.common.hash.MessageDigests;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
Expand All @@ -50,6 +52,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.security.MessageDigest;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -112,7 +115,7 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
// yet this makes sure nobody else does. including some testing tools that try to be messy
indexWriterLock = obtainLock ? directory.obtainLock(IndexWriter.WRITE_LOCK_NAME) : null;
this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory);
this.commitId = Lucene.getCommitId(lastCommittedSegmentInfos);
this.commitId = generateSearcherId(lastCommittedSegmentInfos);
if (seqNoStats == null) {
seqNoStats = buildSeqNoStats(config, lastCommittedSegmentInfos);
ensureMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats);
Expand Down Expand Up @@ -140,6 +143,25 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
}
}

/**
 * Builds a searcher id by hashing (SHA-256) the ids of all segments of an index commit, in iteration order.
 * The commit id itself can't serve as the searcher id because it changes whenever IndexWriter#commit is called,
 * even though the segment files stay unchanged. Any recovery except the local recovery performs
 * IndexWriter#commit to generate a new translog uuid or history_uuid.
 *
 * @param sis the segment infos of the index commit
 * @return the hex string of the digest over all segment ids, or {@code null} if any segment has no id
 */
static String generateSearcherId(SegmentInfos sis) {
    final MessageDigest digest = MessageDigests.sha256();
    for (SegmentCommitInfo segment : sis) {
        final byte[] id = segment.getId();
        if (id == null) {
            // old segments do not have segment ids
            return null;
        }
        digest.update(id);
    }
    final byte[] hashed = digest.digest();
    return MessageDigests.toHexString(hashed);
}

protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(final SeqNoStats seqNoStats) {
if (requireCompleteHistory == false) {
return;
Expand Down Expand Up @@ -552,7 +574,7 @@ protected Searcher acquireSearcherInternal(String source) {
}

@Override
public String getCommitId() {
public String getSearcherId() {
return commitId;
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.common.UUIDs;
Expand All @@ -41,6 +42,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;

public class ReadOnlyEngineTests extends EngineTestCase {

Expand Down Expand Up @@ -294,4 +296,40 @@ public void testTranslogStats() throws IOException {
}
}
}

// Checks that the id produced by ReadOnlyEngine.generateSearcherId stays the same while the last commit's
// segments are unchanged, differs after new operations are flushed, and is the id reported by a
// searcher supplier acquired from a ReadOnlyEngine opened on the same store.
public void testSearcherId() throws Exception {
// Close the engine and store that are in scope before this test creates its own
// (presumably fixtures set up by EngineTestCase - confirm).
IOUtils.close(engine, store);
AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (Store store = createStore()) {
// The engine is configured with NoMergePolicy
// (presumably so that segments only change through the operations indexed below - confirm).
final EngineConfig config =
config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get);
String lastSearcherId;
try (InternalEngine engine = createEngine(config)) {
lastSearcherId = ReadOnlyEngine.generateSearcherId(engine.getLastCommittedSegmentInfos());
assertNotNull(lastSearcherId);
int iterations = randomIntBetween(0, 10);
for (int i = 0; i < iterations; i++) {
// Nothing has been flushed since the last id was taken, so the id must be unchanged.
assertThat(ReadOnlyEngine.generateSearcherId(engine.getLastCommittedSegmentInfos()), equalTo(lastSearcherId));
// Generate between 1 and 100 operations, starting right after the processed local checkpoint.
final List<Engine.Operation> operations = generateHistoryOnReplica(between(1, 100),
engine.getProcessedLocalCheckpoint() + 1L, false, randomBoolean(), randomBoolean());
applyOperations(engine, operations);
engine.flush(randomBoolean(), true);
// Despite its name, newCommitId holds a searcher id (the result of generateSearcherId).
final String newCommitId = ReadOnlyEngine.generateSearcherId(engine.getLastCommittedSegmentInfos());
assertThat(newCommitId, not(equalTo(lastSearcherId)));
if (randomBoolean()) {
// Flushing again without applying any operation in between must not change the id.
engine.flush(true, true);
assertThat(ReadOnlyEngine.generateSearcherId(engine.getLastCommittedSegmentInfos()), equalTo(newCommitId));
}
lastSearcherId = newCommitId;
}
// Advance the global checkpoint to the processed local checkpoint before the engine is closed
// (presumably required for the ReadOnlyEngine opened below to accept the commit - confirm).
globalCheckpoint.set(engine.getProcessedLocalCheckpoint());
}
// A read-only engine opened with the same config must report the last id computed above.
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null, null, true, Function.identity(), true)) {
try (Engine.SearcherSupplier searcher =
readOnlyEngine.acquireSearcherSupplier(Function.identity(), randomFrom(Engine.SearcherScope.values()))) {
assertThat(searcher.getSearcherId(), equalTo(lastSearcherId));
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,12 @@ public static List<Engine.Operation> generateSingleDocHistory(boolean forReplica

public List<Engine.Operation> generateHistoryOnReplica(int numOps, boolean allowGapInSeqNo, boolean allowDuplicate,
boolean includeNestedDocs) throws Exception {
long seqNo = 0;
return generateHistoryOnReplica(numOps, 0L, allowGapInSeqNo, allowDuplicate, includeNestedDocs);
}

public List<Engine.Operation> generateHistoryOnReplica(int numOps, long startingSeqNo, boolean allowGapInSeqNo, boolean allowDuplicate,
boolean includeNestedDocs) throws Exception {
long seqNo = startingSeqNo;
final int maxIdValue = randomInt(numOps * 2);
final List<Engine.Operation> operations = new ArrayList<>(numOps);
CheckedBiFunction<String, Integer, ParsedDocument, IOException> nestedParsedDocFactory = nestedParsedDocFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ protected void doClose() {
}

@Override
public String getCommitId() {
public String getSearcherId() {
return commitId;
}
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.searchablesnapshots;

import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.snapshots.SnapshotId;

import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Set;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

/**
 * Integration test for the searcher id of a mounted snapshot index: every copy of a shard must report
 * the same non-null searcher id, and that id must be unchanged after nodes are restarted.
 */
public class RetrySearchIntegTests extends BaseSearchableSnapshotsIntegTestCase {

// Mounts a snapshot of a randomly sized index, records the searcher id of each shard, restarts a random
// subset of the nodes holding the index, and checks that each shard still reports the recorded id.
public void testSearcherId() throws Exception {
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
final int numberOfShards = between(1, 5);
// Create the source index with a single date field mapping.
assertAcked(
client().admin()
.indices()
.prepareCreate(indexName)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards).build())
.setMapping("{\"properties\":{\"created_date\":{\"type\": \"date\", \"format\": \"yyyy-MM-dd\"}}}")
);
// Index between 0 and 100 documents; the index may therefore be empty.
final List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
final int docCount = between(0, 100);
for (int i = 0; i < docCount; i++) {
indexRequestBuilders.add(client().prepareIndex(indexName).setSource("created_date", "2011-02-02"));
}
indexRandom(true, false, indexRequestBuilders);
// Force-merge with only-expunge-deletes and flush enabled; no shard may fail.
assertThat(
client().admin().indices().prepareForceMerge(indexName).setOnlyExpungeDeletes(true).setFlush(true).get().getFailedShards(),
equalTo(0)
);
refresh(indexName);
forceMerge();

final String repositoryName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createRepository(repositoryName, "fs");

// Snapshot the index, then delete it so the snapshot can be mounted under the same name.
final SnapshotId snapshotOne = createSnapshot(repositoryName, "snapshot-1", List.of(indexName)).snapshotId();
assertAcked(client().admin().indices().prepareDelete(indexName));

// Mount with 0 to 2 replicas, making sure there are at least numberOfReplicas + 1 data nodes first.
final int numberOfReplicas = between(0, 2);
final Settings indexSettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas).build();
internalCluster().ensureAtLeastNumDataNodes(numberOfReplicas + 1);
mountSnapshot(repositoryName, snapshotOne.getName(), indexName, indexName, indexSettings);
ensureGreen(indexName);

// First pass: record the searcher id per shard number. The first copy seen for a shard sets the
// expected id; any further copy of that shard on another node must report the same id.
final String[] searcherIds = new String[numberOfShards];
Set<String> allocatedNodes = internalCluster().nodesInclude(indexName);
for (String node : allocatedNodes) {
IndexService indexService = internalCluster().getInstance(IndicesService.class, node).indexServiceSafe(resolveIndex(indexName));
for (IndexShard indexShard : indexService) {
try (Engine.SearcherSupplier searcher = indexShard.acquireSearcherSupplier()) {
assertNotNull(searcher.getSearcherId());
if (searcherIds[indexShard.shardId().id()] != null) {
assertThat(searcher.getSearcherId(), equalTo(searcherIds[indexShard.shardId().id()]));
} else {
searcherIds[indexShard.shardId().id()] = searcher.getSearcherId();
}
}
}
}

// Restart a random subset (possibly none, possibly all) of the nodes that held the index.
for (String allocatedNode : allocatedNodes) {
if (randomBoolean()) {
internalCluster().restartNode(allocatedNode);
}
}
ensureGreen(indexName);
// Second pass: re-resolve the nodes holding the index and check every shard copy against the recorded id.
allocatedNodes = internalCluster().nodesInclude(indexName);
for (String node : allocatedNodes) {
IndexService indexService = internalCluster().getInstance(IndicesService.class, node).indexServiceSafe(resolveIndex(indexName));
for (IndexShard indexShard : indexService) {
try (Engine.SearcherSupplier searcher = indexShard.acquireSearcherSupplier()) {
assertNotNull(searcher.getSearcherId());
assertThat(searcher.getSearcherId(), equalTo(searcherIds[indexShard.shardId().id()]));
}
}
}
}
}

0 comments on commit fdec6c1

Please sign in to comment.