Skip to content

Commit

Permalink
Reroute when new repository is registered (#73761)
Browse files Browse the repository at this point in the history
Today we fail to allocate searchable snapshot shards if the repository
containing their underlying data is not registered with the cluster.
This failure is somewhat messy, we allocate them and let the recovery
fail, and furthermore we don't automatically retry the allocation if the
repository is subsequently registered.

This commit introduces an allocation decider to prevent the allocation
of such shards, and explain more clearly why, and also a cluster state
listener that performs a reroute when a new repository is registered.

Relates #73669
Relates #73714
  • Loading branch information
DaveCTurner committed Jun 7, 2021
1 parent 772fb0e commit eba76db
Show file tree
Hide file tree
Showing 6 changed files with 390 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplanation;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStats;
Expand All @@ -21,11 +22,16 @@
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.elasticsearch.cluster.routing.allocation.AllocationDecision;
import org.elasticsearch.cluster.routing.allocation.NodeAllocationResult;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -975,6 +981,128 @@ public void testSnapshotOfSearchableSnapshotIncludesNoDataButCanBeRestored() thr
);
}

public void testSnapshotOfSearchableSnapshotCanBeRestoredBeforeRepositoryRegistered() throws Exception {
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createAndPopulateIndex(
indexName,
Settings.builder().put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1).put(INDEX_SOFT_DELETES_SETTING.getKey(), true)
);

final TotalHits originalAllHits = internalCluster().client()
.prepareSearch(indexName)
.setTrackTotalHits(true)
.get()
.getHits()
.getTotalHits();
final TotalHits originalBarHits = internalCluster().client()
.prepareSearch(indexName)
.setTrackTotalHits(true)
.setQuery(matchQuery("foo", "bar"))
.get()
.getHits()
.getTotalHits();
logger.info("--> [{}] in total, of which [{}] match the query", originalAllHits, originalBarHits);

// Take snapshot containing the actual data to one repository
final String dataRepoName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createRepository(dataRepoName, "fs");

final SnapshotId dataSnapshot = createSnapshot(dataRepoName, "data-snapshot", org.elasticsearch.common.collect.List.of(indexName))
.snapshotId();
assertAcked(client().admin().indices().prepareDelete(indexName));

final String restoredIndexName = randomValueOtherThan(indexName, () -> randomAlphaOfLength(10).toLowerCase(Locale.ROOT));
mountSnapshot(dataRepoName, dataSnapshot.getName(), indexName, restoredIndexName, Settings.EMPTY);
ensureGreen(restoredIndexName);

if (randomBoolean()) {
logger.info("--> closing index before snapshot");
assertAcked(client().admin().indices().prepareClose(restoredIndexName));
}

// Back up the cluster to a different repo
final String backupRepoName = randomValueOtherThan(dataRepoName, () -> randomAlphaOfLength(10).toLowerCase(Locale.ROOT));
createRepository(backupRepoName, "fs");
final SnapshotId backupSnapshot = createSnapshot(
backupRepoName,
"backup-snapshot",
org.elasticsearch.common.collect.List.of(restoredIndexName)
).snapshotId();

// Clear out data & the repo that contains it
final RepositoryMetadata dataRepoMetadata = client().admin()
.cluster()
.prepareGetRepositories(dataRepoName)
.get()
.repositories()
.get(0);
assertAcked(client().admin().indices().prepareDelete(restoredIndexName));
assertAcked(client().admin().cluster().prepareDeleteRepository(dataRepoName));

// Restore the backup snapshot
assertThat(
client().admin()
.cluster()
.prepareRestoreSnapshot(backupRepoName, backupSnapshot.getName())
.setIndices(restoredIndexName)
.get()
.status(),
equalTo(RestStatus.ACCEPTED)
);

assertBusy(() -> {
final ClusterAllocationExplanation clusterAllocationExplanation = client().admin()
.cluster()
.prepareAllocationExplain()
.setIndex(restoredIndexName)
.setShard(0)
.setPrimary(true)
.get()
.getExplanation();

final String description = Strings.toString(clusterAllocationExplanation);
final AllocateUnassignedDecision allocateDecision = clusterAllocationExplanation.getShardAllocationDecision()
.getAllocateDecision();
assertTrue(description, allocateDecision.isDecisionTaken());
assertThat(description, allocateDecision.getAllocationDecision(), equalTo(AllocationDecision.NO));
for (NodeAllocationResult nodeAllocationResult : allocateDecision.getNodeDecisions()) {
for (Decision decision : nodeAllocationResult.getCanAllocateDecision().getDecisions()) {
final String explanation = decision.getExplanation();
if (explanation.contains("this index is backed by a searchable snapshot")
&& explanation.contains("no such repository is registered")
&& explanation.contains("the required repository was originally named [" + dataRepoName + "]")) {
return;
}
}
}

fail(description);
});

assertBusy(() -> {
final RestoreInProgress restoreInProgress = client().admin()
.cluster()
.prepareState()
.clear()
.setCustoms(true)
.get()
.getState()
.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY);
assertTrue(Strings.toString(restoreInProgress, true, true), restoreInProgress.isEmpty());
});

// Re-register the repository containing the actual data & verify that the shards are now allocated
final String newRepositoryName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
final Settings.Builder settings = Settings.builder().put(dataRepoMetadata.settings());
if (randomBoolean()) {
settings.put(READONLY_SETTING_KEY, "true");
}
assertAcked(clusterAdmin().preparePutRepository(newRepositoryName).setType("fs").setSettings(settings));

ensureGreen(restoredIndexName);
assertTotalHits(restoredIndexName, originalAllHits, originalBarHits);
}

private void assertSearchableSnapshotStats(String indexName, boolean cacheEnabled, List<String> nonCachedExtensions) {
final SearchableSnapshotsStatsResponse statsResponse = client().execute(
SearchableSnapshotsStatsAction.INSTANCE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,20 @@
import org.apache.lucene.store.BufferedIndexInput;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.xpack.searchablesnapshots.action.cache.TransportSearchableSnapshotsNodeCachesStatsAction;
import org.elasticsearch.xpack.searchablesnapshots.allocation.decider.DedicatedFrozenNodeAllocationDecider;
import org.elasticsearch.xpack.searchablesnapshots.allocation.decider.SearchableSnapshotRepositoryExistsAllocationDecider;
import org.elasticsearch.xpack.searchablesnapshots.cache.blob.BlobStoreCacheService;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetadata;
Expand Down Expand Up @@ -111,12 +120,15 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static java.util.Collections.emptyList;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
Expand Down Expand Up @@ -351,7 +363,10 @@ public Collection<Object> createComponents(
this.allocator.set(new SearchableSnapshotAllocator(client, clusterService.getRerouteService(), frozenCacheInfoService));
components.add(new FrozenCacheServiceSupplier(frozenCacheService.get()));
components.add(new CacheServiceSupplier(cacheService.get()));
new SearchableSnapshotIndexMetadataUpgrader(clusterService, threadPool).initialize();
if (DiscoveryNode.isMasterNode(settings)) {
new SearchableSnapshotIndexMetadataUpgrader(clusterService, threadPool).initialize();
clusterService.addListener(new RepositoryUuidWatcher(clusterService.getRerouteService()));
}
return Collections.unmodifiableList(components);
}

Expand Down Expand Up @@ -537,6 +552,7 @@ protected XPackLicenseState getLicenseState() {
public Collection<AllocationDecider> createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) {
return org.elasticsearch.common.collect.List.of(
new SearchableSnapshotAllocationDecider(() -> getLicenseState().isAllowed(XPackLicenseState.Feature.SEARCHABLE_SNAPSHOTS)),
new SearchableSnapshotRepositoryExistsAllocationDecider(),
new SearchableSnapshotEnableAllocationDecider(settings, clusterSettings),
new HasFrozenCacheAllocationDecider(frozenCacheInfoService),
new DedicatedFrozenNodeAllocationDecider()
Expand Down Expand Up @@ -715,4 +731,34 @@ public FrozenCacheService get() {
return frozenCacheService;
}
}

private static final class RepositoryUuidWatcher implements ClusterStateListener {

private final RerouteService rerouteService;
private final HashSet<String> knownUuids = new HashSet<>();

RepositoryUuidWatcher(RerouteService rerouteService) {
this.rerouteService = rerouteService;
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
final RepositoriesMetadata repositoriesMetadata = event.state().metadata().custom(RepositoriesMetadata.TYPE);
if (repositoriesMetadata == null) {
knownUuids.clear();
return;
}

final Set<String> newUuids = repositoriesMetadata.repositories()
.stream()
.map(RepositoryMetadata::uuid)
.filter(s -> s.equals(RepositoryData.MISSING_UUID) == false)
.collect(Collectors.toSet());
if (knownUuids.addAll(newUuids)) {
rerouteService.reroute("repository UUIDs changed", Priority.NORMAL, ActionListener.wrap((() -> {})));
}
knownUuids.retainAll(newUuids);
assert knownUuids.equals(newUuids) : knownUuids + " vs " + newUuids;
}
}
}
Loading

0 comments on commit eba76db

Please sign in to comment.