Skip to content

Commit

Permalink
Ensure UUID is fresh when loading a new RepositoryData
Browse files Browse the repository at this point in the history
  • Loading branch information
DaveCTurner committed Jan 21, 2021
1 parent 0b8b62d commit b680eb3
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;

public class MultiClusterRepoAccessIT extends AbstractSnapshotIntegTestCase {

Expand Down Expand Up @@ -107,4 +109,37 @@ public void testConcurrentDeleteFromOtherCluster() throws InterruptedException {
createRepository(repoNameOnFirstCluster, "fs", repoPath);
createFullSnapshot(repoNameOnFirstCluster, "snap-5");
}

public void testConcurrentWipeAndRecreateFromOtherCluster() throws InterruptedException, IOException {
internalCluster().startMasterOnlyNode();
internalCluster().startDataOnlyNode();
final String repoName = "test-repo";
createRepository(repoName, "fs", repoPath);

createIndexWithRandomDocs("test-idx-1", randomIntBetween(1, 100));
createFullSnapshot(repoName, "snap-1");
final String repoUuid = client().admin().cluster().prepareGetRepositories(repoName).get().repositories()
.stream().filter(r -> r.name().equals(repoName)).findFirst().orElseThrow().uuid();

secondCluster.startMasterOnlyNode();
secondCluster.startDataOnlyNode();
assertAcked(secondCluster.client().admin().cluster().preparePutRepository(repoName)
.setType("fs")
.setSettings(Settings.builder().put("location", repoPath).put("read_only", true)));
assertThat(secondCluster.client().admin().cluster().prepareGetRepositories(repoName).get().repositories()
.stream().filter(r -> r.name().equals(repoName)).findFirst().orElseThrow().uuid(), equalTo(repoUuid));

assertAcked(client().admin().cluster().prepareDeleteRepository(repoName));
IOUtils.rm(internalCluster().getCurrentMasterNodeInstance(Environment.class).resolveRepoFile(repoPath.toString()));
createRepository(repoName, "fs", repoPath);
createFullSnapshot(repoName, "snap-1");

final String newRepoUuid = client().admin().cluster().prepareGetRepositories(repoName).get().repositories()
.stream().filter(r -> r.name().equals(repoName)).findFirst().orElseThrow().uuid();
assertThat(newRepoUuid, not(equalTo((repoUuid))));

secondCluster.client().admin().cluster().prepareGetSnapshots(repoName).get(); // force another read of the repo data
assertThat(secondCluster.client().admin().cluster().prepareGetRepositories(repoName).get().repositories()
.stream().filter(r -> r.name().equals(repoName)).findFirst().orElseThrow().uuid(), equalTo(newRepoUuid));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,15 @@ public void registerRepository(final PutRepositoryRequest request, final ActionL
ActionRunnable.wrap(getRepositoryDataStep, l -> repository(request.name()).getRepositoryData(l))), listener::onFailure);

// When the repository metadata is ready, update the repository UUID stored in the cluster state, if available
final StepListener<Void> updateRepositoryUuidStep = new StepListener<>();
getRepositoryDataStep.whenComplete(repositoryData ->
updateRepositoryUuidInMetadata(request.name(), repositoryData, updateRepositoryUuidStep), listener::onFailure);
final StepListener<Void> updateRepoUuidStep = new StepListener<>();
getRepositoryDataStep.whenComplete(
repositoryData -> updateRepositoryUuidInMetadata(clusterService, request.name(), repositoryData, updateRepoUuidStep),
listener::onFailure);

// Finally respond to the outer listener with the response from the original cluster state update
updateRepositoryUuidStep.whenComplete(ignored ->
acknowledgementStep.whenComplete(listener::onResponse, listener::onFailure), listener::onFailure);
updateRepoUuidStep.whenComplete(
ignored -> acknowledgementStep.whenComplete(listener::onResponse, listener::onFailure),
listener::onFailure);

} else {
acknowledgementStep.whenComplete(listener::onResponse, listener::onFailure);
Expand Down Expand Up @@ -226,7 +228,11 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
});
}

public void updateRepositoryUuidInMetadata(final String repositoryName, RepositoryData repositoryData, ActionListener<Void> listener) {
public static void updateRepositoryUuidInMetadata(
ClusterService clusterService,
final String repositoryName,
RepositoryData repositoryData,
ActionListener<Void> listener) {

final String repositoryUuid = repositoryData.getUuid();
if (repositoryUuid.equals(RepositoryData.MISSING_UUID)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.IndexMetaDataGenerations;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryCleanupResult;
import org.elasticsearch.repositories.RepositoryData;
Expand Down Expand Up @@ -1488,21 +1489,26 @@ private void doGetRepositoryData(ActionListener<RepositoryData> listener) {
}
try {
final CachedRepositoryData cached = latestKnownRepositoryData.get();
final RepositoryData loaded;
// Caching is not used with #bestEffortConsistency see docs on #cacheRepositoryData for details
if (bestEffortConsistency == false && cached.generation() == genToLoad && cached.hasData()) {
loaded = cached.repositoryData();
listener.onResponse(cached.repositoryData());
} else {
loaded = getRepositoryData(genToLoad);
final RepositoryData loaded = getRepositoryData(genToLoad);
if (cached == null || cached.generation() < genToLoad) {
// We can cache serialized in the most recent version here without regard to the actual repository metadata version
// since we're only caching the information that we just wrote and thus won't accidentally cache any information
// that isn't safe
cacheRepositoryData(compressRepoDataForCache(BytesReference.bytes(
loaded.snapshotsToXContent(XContentFactory.jsonBuilder(), Version.CURRENT, true))), genToLoad);
}
if (loaded.getUuid().equals(metadata.uuid())) {
listener.onResponse(loaded);
} else {
// someone switched the repo contents out from under us
RepositoriesService.updateRepositoryUuidInMetadata(
clusterService, metadata.name(), loaded, listener.map(v -> loaded));
}
}
listener.onResponse(loaded);
return;
} catch (RepositoryException e) {
// If the generation to load changed concurrently and we didn't just try loading the same generation before we retry
Expand Down

0 comments on commit b680eb3

Please sign in to comment.