Small cleanup in BlobStoreRepository#finalizeSnapshot (#99635)
Reorders the operations into their logical order and adds a few TODOs
for gaps that this cleanup exposes.
DaveCTurner authored Sep 19, 2023
1 parent 9ce92a9 commit 92458fc
Showing 2 changed files with 138 additions and 82 deletions.
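The heart of the change is in BlobStoreRepository#finalizeSnapshot, where nested ActionListener.wrap callbacks, each threading the same onUpdateFailure handler through, are replaced by a SubscribableListener step chain. As a minimal sketch of that chaining pattern, with hypothetical step names rather than the actual repository code, and assuming the Elasticsearch ActionListener/SubscribableListener classes are on the classpath:

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.SubscribableListener;

class StepChainSketch {

    record FirstResult(String value) {}

    record SecondResult(String value) {}

    void run(ActionListener<SecondResult> finalListener) {
        SubscribableListener

            // Step 1: fork the first asynchronous operation onto the chain.
            .<FirstResult>newForked(this::firstStep)

            // Step 2: runs only once step 1 has succeeded; an exception thrown
            // here propagates to the listener subscribed at the end.
            .<SecondResult>andThen((l, first) -> secondStep(first, l))

            // Subscribe the caller's listener, wrapping failures exactly once.
            .addListener(
                finalListener.delegateResponse(
                    (l, e) -> l.onFailure(new RuntimeException("chain failed", e))
                )
            );
    }

    void firstStep(ActionListener<FirstResult> listener) {
        listener.onResponse(new FirstResult("first"));
    }

    void secondStep(FirstResult first, ActionListener<SecondResult> listener) {
        listener.onResponse(new SecondResult(first.value() + "-second"));
    }
}

Laid out this way, the operations read in their logical order, and intermediate results travel between steps in small records (MetadataWriteResult, RootBlobUpdateResult in the diff) instead of being captured by nested closures. One visible behavioural consequence, covered by the first file below: the repository-limit failure now surfaces as a SnapshotException whose cause is the RepositoryException, because the chain's single failure wrapper applies to every step.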
BlobStoreSizeLimitIT.java
@@ -14,11 +14,14 @@
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
import org.elasticsearch.snapshots.SnapshotException;
import org.hamcrest.Matchers;

import java.util.List;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;

public class BlobStoreSizeLimitIT extends AbstractSnapshotIntegTestCase {

@@ -32,7 +35,11 @@ public void testBlobStoreSizeIsLimited() throws Exception {
);
final List<String> snapshotNames = createNSnapshots(repoName, maxSnapshots);
final ActionFuture<CreateSnapshotResponse> failingSnapshotFuture = startFullSnapshot(repoName, "failing-snapshot");
final RepositoryException repositoryException = expectThrows(RepositoryException.class, failingSnapshotFuture::actionGet);
final SnapshotException snapshotException = expectThrows(SnapshotException.class, failingSnapshotFuture::actionGet);
assertThat(snapshotException.getRepositoryName(), equalTo(repoName));
assertThat(snapshotException.getSnapshotName(), equalTo("failing-snapshot"));
assertThat(snapshotException.getCause(), instanceOf(RepositoryException.class));
final RepositoryException repositoryException = (RepositoryException) snapshotException.getCause();
assertThat(
repositoryException.getMessage(),
Matchers.endsWith(
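Turning to the BlobStoreRepository changes below: the parallel metadata writes fan out through a RefCountingListener, which completes its delegate only once every acquired child listener has completed. A minimal sketch of that fan-out pattern, with hypothetical names and again assuming the Elasticsearch action-support classes:

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.RefCountingListener;

import java.util.List;
import java.util.concurrent.Executor;

class FanOutSketch {
    void writeAll(Executor executor, List<Runnable> writes, ActionListener<Void> done) {
        // try-with-resources: the reference held by the RefCountingListener
        // itself is released on close, so `done` completes only after all
        // acquired children have completed.
        try (var refs = new RefCountingListener(done)) {
            for (Runnable write : writes) {
                executor.execute(ActionRunnable.run(refs.acquire(), write::run));
            }
        }
    }
}

In the diff, the delegate is l.map(ignored -> metadataWriteResult), so the next step of the chain starts only after the global metadata, the per-index metadata, and the SnapshotInfo blob have all been written.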
BlobStoreRepository.java
@@ -30,6 +30,7 @@
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.RefCountingListener;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
@@ -1326,44 +1327,107 @@ public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotContext
// when writing the index-${N} to each shard directory.
final IndexVersion repositoryMetaVersion = finalizeSnapshotContext.repositoryMetaVersion();
final boolean writeShardGens = SnapshotsService.useShardGenerations(repositoryMetaVersion);
final Consumer<Exception> onUpdateFailure = e -> finalizeSnapshotContext.onFailure(
new SnapshotException(metadata.name(), snapshotId, "failed to update snapshot in repository", e)
);

final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);

final boolean writeIndexGens = SnapshotsService.useIndexGenerations(repositoryMetaVersion);

final ListenableFuture<RepositoryData> repoDataListener = new ListenableFuture<>();
getRepositoryData(repoDataListener);
repoDataListener.addListener(ActionListener.wrap(existingRepositoryData -> {
final int existingSnapshotCount = existingRepositoryData.getSnapshotIds().size();
if (existingSnapshotCount >= maxSnapshotCount) {
finalizeSnapshotContext.onFailure(
new RepositoryException(
record MetadataWriteResult(
RepositoryData existingRepositoryData,
Map<IndexId, String> indexMetas,
Map<String, String> indexMetaIdentifiers
) {}

record RootBlobUpdateResult(RepositoryData oldRepositoryData, RepositoryData newRepositoryData) {}

SubscribableListener

// Get the current RepositoryData
.<RepositoryData>newForked(this::getRepositoryData)

// Identify and write the missing metadata
.<MetadataWriteResult>andThen((l, existingRepositoryData) -> {
final int existingSnapshotCount = existingRepositoryData.getSnapshotIds().size();
if (existingSnapshotCount >= maxSnapshotCount) {
throw new RepositoryException(
metadata.name(),
"Cannot add another snapshot to this repository as it "
+ "already contains ["
"Cannot add another snapshot to this repository as it already contains ["
+ existingSnapshotCount
+ "] snapshots and is configured to hold up to ["
+ maxSnapshotCount
+ "] snapshots only."
)
);
return;
}
);
}

final Map<IndexId, String> indexMetas;
final Map<String, String> indexMetaIdentifiers;
if (writeIndexGens) {
indexMetaIdentifiers = ConcurrentCollections.newConcurrentMap();
indexMetas = ConcurrentCollections.newConcurrentMap();
} else {
indexMetas = null;
indexMetaIdentifiers = null;
}
final MetadataWriteResult metadataWriteResult;
if (writeIndexGens) {
metadataWriteResult = new MetadataWriteResult(
existingRepositoryData,
ConcurrentCollections.newConcurrentMap(),
ConcurrentCollections.newConcurrentMap()
);
} else {
metadataWriteResult = new MetadataWriteResult(existingRepositoryData, null, null);
}

try (var allMetaListeners = new RefCountingListener(l.map(ignored -> metadataWriteResult))) {
// We ignore all FileAlreadyExistsException when writing metadata since otherwise a master failover while in this method
// will mean that no snap-${uuid}.dat blob is ever written for this snapshot. This is safe because any updated version
// of the index or global metadata will be compatible with the segments written in this snapshot as well.
// Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a
// way that decrements the generation it points at

// Write global metadata
final Metadata clusterMetadata = finalizeSnapshotContext.clusterMetadata();
executor.execute(
ActionRunnable.run(
allMetaListeners.acquire(),
() -> GLOBAL_METADATA_FORMAT.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), compress)
)
);

// Write the index metadata for each index in the snapshot
for (IndexId index : indices) {
executor.execute(ActionRunnable.run(allMetaListeners.acquire(), () -> {
final IndexMetadata indexMetaData = clusterMetadata.index(index.getName());
if (writeIndexGens) {
final String identifiers = IndexMetaDataGenerations.buildUniqueIdentifier(indexMetaData);
String metaUUID = existingRepositoryData.indexMetaDataGenerations().getIndexMetaBlobId(identifiers);
if (metaUUID == null) {
// We don't yet have this version of the metadata so we write it
metaUUID = UUIDs.base64UUID();
INDEX_METADATA_FORMAT.write(indexMetaData, indexContainer(index), metaUUID, compress);
metadataWriteResult.indexMetaIdentifiers().put(identifiers, metaUUID);
} // else this task was largely a no-op - TODO no need to fork in that case
metadataWriteResult.indexMetas().put(index, identifiers);
} else {
INDEX_METADATA_FORMAT.write(
clusterMetadata.index(index.getName()),
indexContainer(index),
snapshotId.getUUID(),
compress
);
}
}));
}

// Write the SnapshotInfo blob
executor.execute(
ActionRunnable.run(
allMetaListeners.acquire(),
() -> SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compress)
)
);

// TODO fail fast if any metadata write fails
// TODO clean up successful metadata writes on failure (needs care, we must not clobber another node concurrently
// finalizing the same snapshot: we can only clean up after removing the failed snapshot from the cluster state)
}
})

try (var allMetaListeners = new RefCountingListener(ActionListener.wrap(v -> {
// Update the root blob
.<RootBlobUpdateResult>andThen((l, metadataWriteResult) -> {
// unlikely, but in theory we could still be on the thread which called finalizeSnapshot - TODO must fork to SNAPSHOT here
final String slmPolicy = slmPolicy(snapshotInfo);
final SnapshotDetails snapshotDetails = new SnapshotDetails(
snapshotInfo.state(),
@@ -1372,64 +1436,41 @@ public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotContext
snapshotInfo.endTime(),
slmPolicy
);
final var existingRepositoryData = metadataWriteResult.existingRepositoryData();
writeIndexGen(
existingRepositoryData.addSnapshot(snapshotId, snapshotDetails, shardGenerations, indexMetas, indexMetaIdentifiers),
existingRepositoryData.addSnapshot(
snapshotId,
snapshotDetails,
shardGenerations,
metadataWriteResult.indexMetas(),
metadataWriteResult.indexMetaIdentifiers()
),
repositoryStateId,
repositoryMetaVersion,
finalizeSnapshotContext::updatedClusterState,
ActionListener.wrap(newRepoData -> {
finalizeSnapshotContext.onResponse(newRepoData);
cleanupOldMetadata(existingRepositoryData, newRepoData, finalizeSnapshotContext, snapshotInfo, writeShardGens);
}, onUpdateFailure)
l.map(newRepositoryData -> new RootBlobUpdateResult(existingRepositoryData, newRepositoryData))
);
}, onUpdateFailure))) {

// We ignore all FileAlreadyExistsException when writing metadata since otherwise a master failover while in this method
// will mean that no snap-${uuid}.dat blob is ever written for this snapshot. This is safe because any updated version of
// the index or global metadata will be compatible with the segments written in this snapshot as well.
// Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a way
// that decrements the generation it points at
final Metadata clusterMetadata = finalizeSnapshotContext.clusterMetadata();
// Write Global MetaData
executor.execute(
ActionRunnable.run(
allMetaListeners.acquire(),
() -> GLOBAL_METADATA_FORMAT.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), compress)
)
// NB failure of writeIndexGen doesn't guarantee the update failed, so we cannot safely clean anything up on failure
})

// Report success, then clean up.
.<RepositoryData>andThen((l, rootBlobUpdateResult) -> {
l.onResponse(rootBlobUpdateResult.newRepositoryData());
cleanupOldMetadata(
rootBlobUpdateResult.oldRepositoryData(),
rootBlobUpdateResult.newRepositoryData(),
finalizeSnapshotContext,
snapshotInfo,
writeShardGens
);
})

// write the index metadata for each index in the snapshot
for (IndexId index : indices) {
executor.execute(ActionRunnable.run(allMetaListeners.acquire(), () -> {
final IndexMetadata indexMetaData = clusterMetadata.index(index.getName());
if (writeIndexGens) {
final String identifiers = IndexMetaDataGenerations.buildUniqueIdentifier(indexMetaData);
String metaUUID = existingRepositoryData.indexMetaDataGenerations().getIndexMetaBlobId(identifiers);
if (metaUUID == null) {
// We don't yet have this version of the metadata so we write it
metaUUID = UUIDs.base64UUID();
INDEX_METADATA_FORMAT.write(indexMetaData, indexContainer(index), metaUUID, compress);
indexMetaIdentifiers.put(identifiers, metaUUID);
}
indexMetas.put(index, identifiers);
} else {
INDEX_METADATA_FORMAT.write(
clusterMetadata.index(index.getName()),
indexContainer(index),
snapshotId.getUUID(),
compress
);
}
}));
}
executor.execute(
ActionRunnable.run(
allMetaListeners.acquire(),
() -> SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compress)
)
);
}
}, onUpdateFailure));
// Finally subscribe the context as the listener, wrapping exceptions if needed
.addListener(
finalizeSnapshotContext.delegateResponse(
(l, e) -> l.onFailure(new SnapshotException(metadata.name(), snapshotId, "failed to update snapshot in repository", e))
)
);
}

// Delete all old shard gen and root level index blobs that aren't referenced any longer as a result from moving to updated
@@ -1471,13 +1512,21 @@ private void cleanupOldMetadata(
}
}
}

if (toDelete.isEmpty() == false) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
try {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
deleteFromContainer(blobContainer(), toDelete.iterator());
} catch (Exception e) {
}

@Override
public void onFailure(Exception e) {
logger.warn("Failed to clean up old metadata blobs", e);
} finally {
}

@Override
public void onAfter() {
finalizeSnapshotContext.onDone(snapshotInfo);
}
});
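The last hunk replaces an inline try/catch/finally with an AbstractRunnable, which gives the failure and completion paths named overrides. A minimal sketch of the shape this takes (comments are editorial; assumes org.elasticsearch.common.util.concurrent.AbstractRunnable):

import org.elasticsearch.common.util.concurrent.AbstractRunnable;

final class CleanupRunnableSketch extends AbstractRunnable {

    @Override
    protected void doRun() throws Exception {
        // the work itself; anything thrown here is routed to onFailure()
    }

    @Override
    public void onFailure(Exception e) {
        // handle the failure; stands in for the old catch block
    }

    @Override
    public void onAfter() {
        // runs on success or failure; stands in for the old finally block
    }
}

AbstractRunnable also exposes an onRejection hook, which by default delegates to onFailure, so a task rejected by the executor is reported through the same path.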
