Skip to content

Commit

Permalink
Run queued operations post v2 operations completion (opensearch-proje…
Browse files Browse the repository at this point in the history
…ct#16179)

Signed-off-by: Gaurav Bafna <[email protected]>
  • Loading branch information
gbbafna authored and dk2k committed Oct 21, 2024
1 parent 1a9eba8 commit 6642843
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,124 @@ public void testCloneSnapshotV2MasterSwitch() throws Exception {
assertThat(snapInfo, containsInAnyOrder(csr.getSnapshotInfo(), csr2.getSnapshotInfo()));
}

public void testDeleteWhileV2CreateOngoing() throws Exception {
final String clusterManagerName = internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings());
internalCluster().startDataOnlyNode(pinnedTimestampSettings());
internalCluster().startDataOnlyNode(pinnedTimestampSettings());
String indexName1 = "testindex1";
String indexName2 = "testindex2";
String repoName = "test-create-snapshot-repo";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
logger.info("Snapshot Path [{}]", absolutePath1);

Settings.Builder settings = Settings.builder()
.put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1)
.put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean())
.put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
.put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), false);
createRepository(repoName, "mock", settings);

Client client = client();
Settings indexSettings = getIndexSettings(20, 0).build();
createIndex(indexName1, indexSettings);

Settings indexSettings2 = getIndexSettings(15, 0).build();
createIndex(indexName2, indexSettings2);

final int numDocsInIndex1 = 10;
final int numDocsInIndex2 = 20;
indexDocuments(client, indexName1, numDocsInIndex1);
indexDocuments(client, indexName2, numDocsInIndex2);
ensureGreen(indexName1, indexName2);

startFullSnapshot(repoName, "snapshot-v1").actionGet();

// Creating a v2 repo
settings = Settings.builder()
.put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1)
.put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean())
.put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
.put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true);
createRepository(repoName, "mock", settings);

blockClusterManagerOnWriteIndexFile(repoName);

final ActionFuture<CreateSnapshotResponse> snapshotFuture = startFullSnapshot(repoName, "snapshot-v2");
awaitNumberOfSnapshotsInProgress(1);

ActionFuture<AcknowledgedResponse> a = startDeleteSnapshot(repoName, "snapshot-v1");

unblockNode(repoName, clusterManagerName);
CreateSnapshotResponse csr = snapshotFuture.actionGet();
assertTrue(csr.getSnapshotInfo().getPinnedTimestamp() != 0);
assertTrue(a.actionGet().isAcknowledged());
List<SnapshotInfo> snapInfo = client().admin().cluster().prepareGetSnapshots(repoName).get().getSnapshots();
assertEquals(1, snapInfo.size());
assertThat(snapInfo, contains(csr.getSnapshotInfo()));
}

public void testDeleteAndCloneV1WhileV2CreateOngoing() throws Exception {
final String clusterManagerName = internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings());
internalCluster().startDataOnlyNode(pinnedTimestampSettings());
internalCluster().startDataOnlyNode(pinnedTimestampSettings());
String indexName1 = "testindex1";
String indexName2 = "testindex2";
String repoName = "test-create-snapshot-repo";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
logger.info("Snapshot Path [{}]", absolutePath1);

Settings.Builder settings = Settings.builder()
.put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1)
.put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean())
.put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
.put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), false);
createRepository(repoName, "mock", settings);

Client client = client();
Settings indexSettings = getIndexSettings(20, 0).build();
createIndex(indexName1, indexSettings);

Settings indexSettings2 = getIndexSettings(15, 0).build();
createIndex(indexName2, indexSettings2);

final int numDocsInIndex1 = 10;
final int numDocsInIndex2 = 20;
indexDocuments(client, indexName1, numDocsInIndex1);
indexDocuments(client, indexName2, numDocsInIndex2);
ensureGreen(indexName1, indexName2);

startFullSnapshot(repoName, "snapshot-v1").actionGet();
startFullSnapshot(repoName, "snapshot-v1-2").actionGet();

// Creating a v2 repo
settings = Settings.builder()
.put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1)
.put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean())
.put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
.put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true);
createRepository(repoName, "mock", settings);

blockClusterManagerOnWriteIndexFile(repoName);

final ActionFuture<CreateSnapshotResponse> snapshotFuture = startFullSnapshot(repoName, "snapshot-v2");
awaitNumberOfSnapshotsInProgress(1);

ActionFuture<AcknowledgedResponse> startDeleteSnapshot = startDeleteSnapshot(repoName, "snapshot-v1");
ActionFuture<AcknowledgedResponse> startCloneSnapshot = startCloneSnapshot(repoName, "snapshot-v1-2", "snapshot-v1-2-clone");

unblockNode(repoName, clusterManagerName);
CreateSnapshotResponse csr = snapshotFuture.actionGet();
assertTrue(csr.getSnapshotInfo().getPinnedTimestamp() != 0);
assertTrue(startDeleteSnapshot.actionGet().isAcknowledged());
assertTrue(startCloneSnapshot.actionGet().isAcknowledged());
List<SnapshotInfo> snapInfo = client().admin().cluster().prepareGetSnapshots(repoName).get().getSnapshots();
assertEquals(3, snapInfo.size());
}

protected ActionFuture<AcknowledgedResponse> startCloneSnapshot(String repoName, String sourceSnapshotName, String snapshotName) {
logger.info("--> creating full snapshot [{}] to repo [{}]", snapshotName, repoName);
return clusterAdmin().prepareCloneSnapshot(repoName, sourceSnapshotName, snapshotName).setIndices("*").execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ public ClusterState execute(ClusterState currentState) {
);
if (request.partial() == false) {
Set<String> missing = new HashSet<>();
for (final Map.Entry<ShardId, SnapshotsInProgress.ShardSnapshotStatus> entry : shards.entrySet()) {
for (final Map.Entry<ShardId, ShardSnapshotStatus> entry : shards.entrySet()) {
if (entry.getValue().state() == ShardState.MISSING) {
missing.add(entry.getKey().getIndex().getName());
}
Expand Down Expand Up @@ -606,8 +606,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl
new ActionListener<RepositoryData>() {
@Override
public void onResponse(RepositoryData repositoryData) {
leaveRepoLoop(repositoryName);
if (clusterService.state().nodes().isLocalNodeElectedClusterManager() == false) {
leaveRepoLoop(repositoryName);
failSnapshotCompletionListeners(
snapshot,
new SnapshotException(snapshot, "Aborting snapshot-v2, no longer cluster manager")
Expand All @@ -622,6 +622,9 @@ public void onResponse(RepositoryData repositoryData) {
return;
}
listener.onResponse(snapshotInfo);
// For snapshot-v2, we don't allow concurrent snapshots . But meanwhile non-v2 snapshot operations
// can get queued . This is triggering them.
runNextQueuedOperation(repositoryData, repositoryName, true);
cleanOrphanTimestamp(repositoryName, repositoryData);
}

Expand Down Expand Up @@ -1010,8 +1013,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl
new ActionListener<RepositoryData>() {
@Override
public void onResponse(RepositoryData repositoryData) {
leaveRepoLoop(repositoryName);
if (!clusterService.state().nodes().isLocalNodeElectedClusterManager()) {
leaveRepoLoop(repositoryName);
failSnapshotCompletionListeners(
snapshot,
new SnapshotException(snapshot, "Aborting Snapshot-v2 clone, no longer cluster manager")
Expand All @@ -1027,6 +1030,9 @@ public void onResponse(RepositoryData repositoryData) {
}
logger.info("snapshot-v2 clone [{}] completed successfully", snapshot);
listener.onResponse(null);
// For snapshot-v2, we don't allow concurrent snapshots . But meanwhile non-v2 snapshot operations
// can get queued . This is triggering them.
runNextQueuedOperation(repositoryData, repositoryName, true);
}

@Override
Expand Down Expand Up @@ -1740,7 +1746,7 @@ private static boolean assertNoDanglingSnapshots(ClusterState state) {
/**
* Updates the state of in-progress snapshots in reaction to a change in the configuration of the cluster nodes (cluster-manager fail-over or
* disconnect of a data node that was executing a snapshot) or a routing change that started shards whose snapshot state is
* {@link SnapshotsInProgress.ShardState#WAITING}.
* {@link ShardState#WAITING}.
*
* @param changedNodes true iff either a cluster-manager fail-over occurred or a data node that was doing snapshot work got removed from the
* cluster
Expand Down Expand Up @@ -2645,7 +2651,7 @@ private static List<SnapshotId> matchingSnapshotIds(
}
}
}
return Collections.unmodifiableList(new ArrayList<>(foundSnapshots));
return unmodifiableList(new ArrayList<>(foundSnapshots));
}

// Return in-progress snapshot entries by name and repository in the given cluster state or null if none is found
Expand Down Expand Up @@ -2795,7 +2801,7 @@ public ClusterState execute(ClusterState currentState) {
reusedExistingDelete = true;
return currentState;
}
final List<SnapshotId> toDelete = Collections.unmodifiableList(new ArrayList<>(snapshotIdsRequiringCleanup));
final List<SnapshotId> toDelete = unmodifiableList(new ArrayList<>(snapshotIdsRequiringCleanup));
ensureBelowConcurrencyLimit(repoName, toDelete.get(0).getName(), snapshots, deletionsInProgress);
newDelete = new SnapshotDeletionsInProgress.Entry(
toDelete,
Expand Down Expand Up @@ -3353,7 +3359,7 @@ private static <T> void completeListenersIgnoringException(@Nullable List<Action
* @param indices Indices to snapshot
* @return list of shard to be included into current snapshot
*/
private static Map<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards(
private static Map<ShardId, ShardSnapshotStatus> shards(
SnapshotsInProgress snapshotsInProgress,
@Nullable SnapshotDeletionsInProgress deletionsInProgress,
Metadata metadata,
Expand All @@ -3362,7 +3368,7 @@ private static Map<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards(
RepositoryData repositoryData,
String repoName
) {
final Map<ShardId, SnapshotsInProgress.ShardSnapshotStatus> builder = new HashMap<>();
final Map<ShardId, ShardSnapshotStatus> builder = new HashMap<>();
final ShardGenerations shardGenerations = repositoryData.shardGenerations();
final InFlightShardSnapshotStates inFlightShardStates = InFlightShardSnapshotStates.forRepo(
repoName,
Expand Down Expand Up @@ -3396,7 +3402,7 @@ private static Map<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards(
}
final ShardSnapshotStatus shardSnapshotStatus;
if (indexRoutingTable == null) {
shardSnapshotStatus = new SnapshotsInProgress.ShardSnapshotStatus(
shardSnapshotStatus = new ShardSnapshotStatus(
null,
ShardState.MISSING,
"missing routing table",
Expand Down

0 comments on commit 6642843

Please sign in to comment.