Skip to content

Commit

Permalink
Support cloning of searchable snapshot indices (#56595)
Browse files Browse the repository at this point in the history
Today you can convert a searchable snapshot index back into a regular index by
restoring the underlying snapshot, but this is somewhat wasteful if the shards
are already in cache since it copies the whole index from the repository again.

Instead, we can make use of the locally-cached data by using the clone API to
copy the contents of the cache into the layout expected by a regular shard.
This commit marks the searchable snapshot's private index settings as
`NotCopyableOnResize` so that they are removed by resize operations such as
cloning.

Cloning a regular index typically hard-links the underlying files rather than
copying them, but this is tricky to support in the case of a searchable
snapshot, so this commit takes the simpler approach of always copying the
underlying files.
  • Loading branch information
DaveCTurner committed May 13, 2020
1 parent ca586f2 commit c10b4ae
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import java.util.function.Supplier;

import static org.apache.lucene.store.BufferedIndexInput.bufferSize;
import static org.elasticsearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING;
Expand All @@ -78,6 +79,7 @@
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SNAPSHOT_DIRECTORY_FACTORY_KEY;

/**
* Implementation of {@link Directory} that exposes files from a snapshot as a Lucene directory. Because snapshots are immutable this
Expand Down Expand Up @@ -438,6 +440,20 @@ public static Directory create(
ThreadPool threadPool
) throws IOException {

if (SNAPSHOT_REPOSITORY_SETTING.exists(indexSettings.getSettings()) == false
|| SNAPSHOT_INDEX_ID_SETTING.exists(indexSettings.getSettings()) == false
|| SNAPSHOT_SNAPSHOT_NAME_SETTING.exists(indexSettings.getSettings()) == false
|| SNAPSHOT_SNAPSHOT_ID_SETTING.exists(indexSettings.getSettings()) == false) {

throw new IllegalArgumentException(
"directly setting ["
+ INDEX_STORE_TYPE_SETTING.getKey()
+ "] to ["
+ SNAPSHOT_DIRECTORY_FACTORY_KEY
+ "] is not permitted; use the mount snapshot API instead"
);
}

final Repository repository = repositories.repository(SNAPSHOT_REPOSITORY_SETTING.get(indexSettings.getSettings()));
if (repository instanceof BlobStoreRepository == false) {
throw new IllegalArgumentException("Repository [" + repository + "] is not searchable");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,46 +82,54 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
public static final Setting<String> SNAPSHOT_REPOSITORY_SETTING = Setting.simpleString(
"index.store.snapshot.repository_name",
Setting.Property.IndexScope,
Setting.Property.PrivateIndex
Setting.Property.PrivateIndex,
Setting.Property.NotCopyableOnResize
);
public static final Setting<String> SNAPSHOT_SNAPSHOT_NAME_SETTING = Setting.simpleString(
"index.store.snapshot.snapshot_name",
Setting.Property.IndexScope,
Setting.Property.PrivateIndex
Setting.Property.PrivateIndex,
Setting.Property.NotCopyableOnResize
);
public static final Setting<String> SNAPSHOT_SNAPSHOT_ID_SETTING = Setting.simpleString(
"index.store.snapshot.snapshot_uuid",
Setting.Property.IndexScope,
Setting.Property.PrivateIndex
Setting.Property.PrivateIndex,
Setting.Property.NotCopyableOnResize
);
public static final Setting<String> SNAPSHOT_INDEX_ID_SETTING = Setting.simpleString(
"index.store.snapshot.index_uuid",
Setting.Property.IndexScope,
Setting.Property.PrivateIndex
Setting.Property.PrivateIndex,
Setting.Property.NotCopyableOnResize
);
public static final Setting<Boolean> SNAPSHOT_CACHE_ENABLED_SETTING = Setting.boolSetting(
"index.store.snapshot.cache.enabled",
true,
Setting.Property.IndexScope
Setting.Property.IndexScope,
Setting.Property.NotCopyableOnResize
);
public static final Setting<Boolean> SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING = Setting.boolSetting(
"index.store.snapshot.cache.prewarm.enabled",
true,
Setting.Property.IndexScope
Setting.Property.IndexScope,
Setting.Property.NotCopyableOnResize
);
// The file extensions that are excluded from the cache
public static final Setting<List<String>> SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING = Setting.listSetting(
"index.store.snapshot.cache.excluded_file_types",
emptyList(),
Function.identity(),
Setting.Property.IndexScope,
Setting.Property.NodeScope
Setting.Property.NodeScope,
Setting.Property.NotCopyableOnResize
);
public static final Setting<ByteSizeValue> SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING = Setting.byteSizeSetting(
"index.store.snapshot.uncached_chunk_size",
new ByteSizeValue(-1, ByteSizeUnit.BYTES),
Setting.Property.IndexScope,
Setting.Property.NodeScope
Setting.Property.NodeScope,
Setting.Property.NotCopyableOnResize
);

private volatile Supplier<RepositoriesService> repositoriesServiceSupplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.elasticsearch.common.lucene.BytesRefs;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
Expand Down Expand Up @@ -99,6 +100,10 @@
import static java.util.Collections.emptyMap;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_REPOSITORY_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand Down Expand Up @@ -683,6 +688,32 @@ public void testClearCache() throws Exception {
}
}

/**
 * Checks that {@link SearchableSnapshotDirectory#create} throws an {@link IllegalArgumentException} whenever any single one of
 * the repository, index id, snapshot name or snapshot id settings is absent from the index settings.
 */
public void testRequiresAdditionalSettings() {
    final List<Setting<String>> mandatorySettings = org.elasticsearch.common.collect.List.of(
        SNAPSHOT_REPOSITORY_SETTING,
        SNAPSHOT_INDEX_ID_SETTING,
        SNAPSHOT_SNAPSHOT_NAME_SETTING,
        SNAPSHOT_SNAPSHOT_ID_SETTING
    );

    // Each pass leaves out exactly one of the settings above and gives every other one a random value.
    for (final Setting<String> omittedSetting : mandatorySettings) {
        final Settings.Builder builder = Settings.builder();
        builder.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1);
        builder.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0);
        builder.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT);

        for (final Setting<String> presentSetting : mandatorySettings) {
            // The list holds four distinct setting instances, so reference equality identifies the omitted one.
            if (presentSetting == omittedSetting) {
                continue;
            }
            builder.put(presentSetting.getKey(), randomAlphaOfLength(10));
        }

        final IndexMetadata indexMetadata = IndexMetadata.builder("test").settings(builder).build();
        final IndexSettings indexSettings = new IndexSettings(indexMetadata, Settings.EMPTY);

        // Every collaborator is null: the call is expected to fail on the settings check alone.
        expectThrows(
            IllegalArgumentException.class,
            () -> SearchableSnapshotDirectory.create(null, null, indexSettings, null, null, null)
        );
    }
}

private static <T> void assertThat(
String reason,
IndexInput actual,
Expand Down Expand Up @@ -727,4 +758,5 @@ private static IndexSettings newIndexSettings() {
.build()
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.admin.indices.shrink.ResizeType;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -69,6 +70,7 @@ public class SearchableSnapshotsIntegTests extends BaseSearchableSnapshotsIntegT
public void testCreateAndRestoreSearchableSnapshot() throws Exception {
final String fsRepoName = randomAlphaOfLength(10);
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
final String aliasName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
final String restoredIndexName = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
final String snapshotName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);

Expand All @@ -84,6 +86,8 @@ public void testCreateAndRestoreSearchableSnapshot() throws Exception {
// Peer recovery always copies .liv files but we do not permit writing to searchable snapshot directories so this doesn't work, but
// we can bypass this by forcing soft deletes to be used. TODO this restriction can be lifted when #55142 is resolved.
assertAcked(prepareCreate(indexName, Settings.builder().put(INDEX_SOFT_DELETES_SETTING.getKey(), true)));
assertAcked(client().admin().indices().prepareAliases().addAlias(indexName, aliasName));

final List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
for (int i = between(10, 10_000); i >= 0; i--) {
indexRequestBuilders.add(client().prepareIndex(indexName, "_doc").setSource("foo", randomBoolean() ? "bar" : "baz"));
Expand Down Expand Up @@ -178,8 +182,14 @@ public void testCreateAndRestoreSearchableSnapshot() throws Exception {
assertRecovered(restoredIndexName, originalAllHits, originalBarHits);
assertSearchableSnapshotStats(restoredIndexName, cacheEnabled, nonCachedExtensions);

assertThat(client().admin().indices().prepareGetAliases(aliasName).get().getAliases().size(), equalTo(0));
assertAcked(client().admin().indices().prepareAliases().addAlias(restoredIndexName, aliasName));
assertThat(client().admin().indices().prepareGetAliases(aliasName).get().getAliases().size(), equalTo(1));
assertRecovered(aliasName, originalAllHits, originalBarHits, false);

internalCluster().fullRestart();
assertRecovered(restoredIndexName, originalAllHits, originalBarHits);
assertRecovered(aliasName, originalAllHits, originalBarHits, false);
assertSearchableSnapshotStats(restoredIndexName, cacheEnabled, nonCachedExtensions);

internalCluster().ensureAtLeastNumDataNodes(2);
Expand Down Expand Up @@ -217,6 +227,46 @@ public void testCreateAndRestoreSearchableSnapshot() throws Exception {

assertRecovered(restoredIndexName, originalAllHits, originalBarHits);
assertSearchableSnapshotStats(restoredIndexName, cacheEnabled, nonCachedExtensions);

assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(restoredIndexName)
.setSettings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.putNull(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getConcreteSettingForNamespace("_name").getKey())
)
);

final String clonedIndexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
assertAcked(
client().admin()
.indices()
.prepareResizeIndex(restoredIndexName, clonedIndexName)
.setResizeType(ResizeType.CLONE)
.setSettings(Settings.builder().putNull(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()).build())
);
ensureGreen(clonedIndexName);
assertRecovered(clonedIndexName, originalAllHits, originalBarHits, false);

final Settings clonedIndexSettings = client().admin()
.indices()
.prepareGetSettings(clonedIndexName)
.get()
.getIndexToSettings()
.get(clonedIndexName);
assertFalse(clonedIndexSettings.hasValue(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()));
assertFalse(clonedIndexSettings.hasValue(SearchableSnapshots.SNAPSHOT_REPOSITORY_SETTING.getKey()));
assertFalse(clonedIndexSettings.hasValue(SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING.getKey()));
assertFalse(clonedIndexSettings.hasValue(SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING.getKey()));
assertFalse(clonedIndexSettings.hasValue(SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING.getKey()));

assertAcked(client().admin().indices().prepareDelete(restoredIndexName));
assertThat(client().admin().indices().prepareGetAliases(aliasName).get().getAliases().size(), equalTo(0));
assertAcked(client().admin().indices().prepareAliases().addAlias(clonedIndexName, aliasName));
assertRecovered(aliasName, originalAllHits, originalBarHits, false);

}

public void testCanMountSnapshotTakenWhileConcurrentlyIndexing() throws Exception {
Expand Down Expand Up @@ -376,6 +426,12 @@ public void testMaxRestoreBytesPerSecIsUsed() throws Exception {
}

/**
 * Convenience overload: delegates to the four-argument {@code assertRecovered} with the recovery-stats check enabled.
 */
private void assertRecovered(String indexName, TotalHits originalAllHits, TotalHits originalBarHits) throws Exception {
    final boolean checkRecoveryStats = true;
    assertRecovered(indexName, originalAllHits, originalBarHits, checkRecoveryStats);
}

private void assertRecovered(String indexName, TotalHits originalAllHits, TotalHits originalBarHits, boolean checkRecoveryStats)
throws Exception {

final Thread[] threads = new Thread[between(1, 5)];
final AtomicArray<TotalHits> allHits = new AtomicArray<>(threads.length);
final AtomicArray<TotalHits> barHits = new AtomicArray<>(threads.length);
Expand Down Expand Up @@ -406,12 +462,17 @@ private void assertRecovered(String indexName, TotalHits originalAllHits, TotalH
ensureGreen(indexName);
latch.countDown();

final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries(indexName).get();
for (List<RecoveryState> recoveryStates : recoveryResponse.shardRecoveryStates().values()) {
for (RecoveryState recoveryState : recoveryStates) {
logger.info("Checking {}[{}]", recoveryState.getShardId(), recoveryState.getPrimary() ? "p" : "r");
assertThat(recoveryState.getIndex().recoveredFileCount(), lessThanOrEqualTo(1)); // we make a new commit so we write a new
// `segments_n` file
if (checkRecoveryStats) {
final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries(indexName).get();
for (List<RecoveryState> recoveryStates : recoveryResponse.shardRecoveryStates().values()) {
for (RecoveryState recoveryState : recoveryStates) {
logger.info("Checking {}[{}]", recoveryState.getShardId(), recoveryState.getPrimary() ? "p" : "r");
assertThat(
Strings.toString(recoveryState), // we make a new commit so we write a new `segments_n` file
recoveryState.getIndex().recoveredFileCount(),
lessThanOrEqualTo(1)
);
}
}
}

Expand Down

0 comments on commit c10b4ae

Please sign in to comment.