Skip to content

Commit

Permalink
Track Repository Gen. in BlobStoreRepository
Browse files Browse the repository at this point in the history
This is intended as a stop-gap solution/improvement to elastic#38941 that
prevents repo modifications without an intervening master failover
from causing inconsistent (outdated due to inconsistent listing of index-N blobs)
`RepositoryData` to be written.

Tracking the latest repository generation will move to the cluster state in a
separate pull request. This is intended to be backported as far as possible and
motivated by the recently increased chance of elastic#38941 causing trouble via SLM.
  • Loading branch information
original-brownbear committed Nov 11, 2019
1 parent ae0d0ca commit 7d13ca4
Showing 1 changed file with 46 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -366,7 +367,7 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolea
} else {
try {
final Map<String, BlobMetaData> rootBlobs = blobContainer().listBlobs();
final RepositoryData repositoryData = getRepositoryData(latestGeneration(rootBlobs.keySet()));
final RepositoryData repositoryData = safeRepositoryData(repositoryStateId, rootBlobs);
// Cache the indices that were found before writing out the new index-N blob so that a stuck master will never
// delete an index that was created by another master node after writing this index-N blob.
final Map<String, BlobContainer> foundIndices = blobStore().blobContainer(indicesPath()).children();
Expand All @@ -377,6 +378,27 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolea
}
}

/**
 * Loads the {@link RepositoryData} while verifying that it is consistent with the given {@code rootBlobs}
 * as well as with the repository generation the caller assumes to be operating on.
 *
 * @param repositoryStateId the repository generation this operation expects to find
 * @param rootBlobs         all blobs found at the repository root, used to determine the on-disk generation
 * @return the repository data at the verified generation
 */
private RepositoryData safeRepositoryData(long repositoryStateId, Map<String, BlobMetaData> rootBlobs) {
    // Generation as determined from the index-N blobs actually present at the repository root
    final long determinedGen = latestGeneration(rootBlobs.keySet());
    // Ratchet the best-effort tracker upwards; if it was already ahead, the listing above was stale
    final long safeGen = latestKnownRepoGen.updateAndGet(tracked -> Math.max(tracked, determinedGen));
    if (safeGen != determinedGen) {
        throw new IllegalStateException("Determined repository's generation from its contents to [" + determinedGen + "] but " +
            "current generation is at least [" + safeGen + "]");
    }
    if (safeGen != repositoryStateId) {
        throw new IllegalStateException("Determined latest repository generation to be [" + safeGen + "] but this operation " +
            "assumes generation [" + repositoryStateId + "]");
    }
    return getRepositoryData(safeGen);
}

/**
* After updating the {@link RepositoryData} each of the shards directories is individually first moved to the next shard generation
* and then has all now unreferenced blobs in it deleted.
Expand Down Expand Up @@ -604,14 +626,8 @@ public void cleanup(long repositoryStateId, boolean writeShardGens, ActionListen
if (isReadOnly()) {
throw new RepositoryException(metadata.name(), "cannot run cleanup on readonly repository");
}
final RepositoryData repositoryData = getRepositoryData();
if (repositoryData.getGenId() != repositoryStateId) {
// Check that we are working on the expected repository version before gathering the data to clean up
throw new RepositoryException(metadata.name(), "concurrent modification of the repository before cleanup started, " +
"expected current generation [" + repositoryStateId + "], actual current generation ["
+ repositoryData.getGenId() + "]");
}
Map<String, BlobMetaData> rootBlobs = blobContainer().listBlobs();
final RepositoryData repositoryData = safeRepositoryData(repositoryStateId, rootBlobs);
final Map<String, BlobContainer> foundIndices = blobStore().blobContainer(indicesPath()).children();
final Set<String> survivingIndexIds =
repositoryData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet());
Expand Down Expand Up @@ -897,13 +913,24 @@ public void endVerification(String seed) {
}
}

// Tracks the latest known repository generation in a best-effort way to detect inconsistent listing of root level index-N blobs
// and concurrent modifications. Starts at RepositoryData.EMPTY_REPO_GEN and is only ever ratcheted upwards
// (every update goes through updateAndGet with Math.max).
private final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.EMPTY_REPO_GEN);

@Override
public RepositoryData getRepositoryData() {
    // Generation as determined from the index-N blobs currently listable at the repository root;
    // may lag behind the true generation if the blob store's listing is stale.
    final long generation;
    try {
        generation = latestIndexBlobId();
    } catch (IOException ioe) {
        throw new RepositoryException(metadata.name(), "Could not determine repository generation from root blobs", ioe);
    }
    // Ratchet the best-effort tracker upwards so we never load a generation older than one we already saw
    final long genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, generation));
    if (genToLoad != generation) {
        // Parameterized logging (consistent with the logger.debug call in writeIndexGen) avoids
        // building the message when WARN is disabled; rendered message is unchanged.
        logger.warn("Determined repository generation [{}] from repository contents but correct generation must be at least [{}]",
            generation, genToLoad);
    }
    return getRepositoryData(genToLoad);
}

private RepositoryData getRepositoryData(long indexGen) {
Expand Down Expand Up @@ -945,11 +972,21 @@ protected void writeIndexGen(final RepositoryData repositoryData, final long exp
"] - possibly due to simultaneous snapshot deletion requests");
}
final long newGen = currentGen + 1;
if (latestKnownRepoGen.get() >= newGen) {
throw new IllegalArgumentException(
"Tried writing generation [" + newGen + "] but repository is at least at generation [" + newGen + "] already");
}
// write the index file
final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
writeAtomic(indexBlob,
BytesReference.bytes(repositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true);
final long newLatestGen = latestKnownRepoGen.updateAndGet(known -> Math.max(known, newGen));
if (newGen != newLatestGen) {
// Don't mess up the index.latest blob
throw new IllegalStateException(
"Wrote generation [" + newGen + "] but latest known repo gen concurrently changed to [" + newLatestGen + "]");
}
// write the current generation to the index-latest file
final BytesReference genBytes;
try (BytesStreamOutput bStream = new BytesStreamOutput()) {
Expand Down

0 comments on commit 7d13ca4

Please sign in to comment.