Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Snapshot v2] Run queued operations post v2 operations completion #16179

Merged
merged 1 commit into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,124 @@ public void testCloneSnapshotV2MasterSwitch() throws Exception {
assertThat(snapInfo, containsInAnyOrder(csr.getSnapshotInfo(), csr2.getSnapshotInfo()));
}

/**
 * Starts a delete of a v1 snapshot while a snapshot-v2 creation is held up on the cluster manager, then
 * checks that both operations finish and that only the v2 snapshot remains in the repository.
 *
 * The repository is first registered with shallow-snapshot-v2 disabled to take {@code snapshot-v1}, and
 * is then re-registered at the same location with shallow-snapshot-v2 enabled.
 *
 * @throws Exception if any cluster or snapshot operation fails
 */
public void testDeleteWhileV2CreateOngoing() throws Exception {
    final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings());
    internalCluster().startDataOnlyNode(pinnedTimestampSettings());
    internalCluster().startDataOnlyNode(pinnedTimestampSettings());

    final String firstIndex = "testindex1";
    final String secondIndex = "testindex2";
    final String repository = "test-create-snapshot-repo";
    final Path repoLocation = randomRepoPath().toAbsolutePath();
    logger.info("Snapshot Path [{}]", repoLocation);

    // Register the repository with shallow-snapshot-v2 disabled so the first snapshot is a v1 snapshot.
    final Settings.Builder v1RepoSettings = Settings.builder()
        .put(FsRepository.LOCATION_SETTING.getKey(), repoLocation)
        .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean())
        .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
        .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
        .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), false);
    createRepository(repository, "mock", v1RepoSettings);

    final Client indexingClient = client();
    createIndex(firstIndex, getIndexSettings(20, 0).build());
    createIndex(secondIndex, getIndexSettings(15, 0).build());

    final int firstIndexDocCount = 10;
    final int secondIndexDocCount = 20;
    indexDocuments(indexingClient, firstIndex, firstIndexDocCount);
    indexDocuments(indexingClient, secondIndex, secondIndexDocCount);
    ensureGreen(firstIndex, secondIndex);

    startFullSnapshot(repository, "snapshot-v1").actionGet();

    // Re-register the same repository location, this time with shallow-snapshot-v2 enabled.
    final Settings.Builder v2RepoSettings = Settings.builder()
        .put(FsRepository.LOCATION_SETTING.getKey(), repoLocation)
        .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean())
        .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
        .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
        .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true);
    createRepository(repository, "mock", v2RepoSettings);

    // Hold the v2 snapshot in progress so that the delete is issued while it is still ongoing.
    blockClusterManagerOnWriteIndexFile(repository);

    final ActionFuture<CreateSnapshotResponse> v2SnapshotFuture = startFullSnapshot(repository, "snapshot-v2");
    awaitNumberOfSnapshotsInProgress(1);

    final ActionFuture<AcknowledgedResponse> deleteV1Future = startDeleteSnapshot(repository, "snapshot-v1");

    unblockNode(repository, clusterManagerNode);

    final CreateSnapshotResponse v2Response = v2SnapshotFuture.actionGet();
    assertTrue(v2Response.getSnapshotInfo().getPinnedTimestamp() != 0);
    assertTrue(deleteV1Future.actionGet().isAcknowledged());

    // After the delete completes, the v2 snapshot must be the only one left.
    final List<SnapshotInfo> remainingSnapshots = client().admin().cluster().prepareGetSnapshots(repository).get().getSnapshots();
    assertEquals(1, remainingSnapshots.size());
    assertThat(remainingSnapshots, contains(v2Response.getSnapshotInfo()));
}

/**
 * Starts both a delete and a clone of v1 snapshots while a snapshot-v2 creation is held up on the cluster
 * manager, then checks that all three operations finish and that the repository holds three snapshots.
 *
 * Two v1 snapshots are taken while shallow-snapshot-v2 is disabled; the repository is then re-registered
 * at the same location with shallow-snapshot-v2 enabled before the v2 snapshot is started.
 *
 * @throws Exception if any cluster or snapshot operation fails
 */
public void testDeleteAndCloneV1WhileV2CreateOngoing() throws Exception {
    final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings());
    internalCluster().startDataOnlyNode(pinnedTimestampSettings());
    internalCluster().startDataOnlyNode(pinnedTimestampSettings());

    final String firstIndex = "testindex1";
    final String secondIndex = "testindex2";
    final String repository = "test-create-snapshot-repo";
    final Path repoLocation = randomRepoPath().toAbsolutePath();
    logger.info("Snapshot Path [{}]", repoLocation);

    // Register the repository with shallow-snapshot-v2 disabled so the initial snapshots are v1 snapshots.
    final Settings.Builder v1RepoSettings = Settings.builder()
        .put(FsRepository.LOCATION_SETTING.getKey(), repoLocation)
        .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean())
        .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
        .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
        .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), false);
    createRepository(repository, "mock", v1RepoSettings);

    final Client indexingClient = client();
    createIndex(firstIndex, getIndexSettings(20, 0).build());
    createIndex(secondIndex, getIndexSettings(15, 0).build());

    final int firstIndexDocCount = 10;
    final int secondIndexDocCount = 20;
    indexDocuments(indexingClient, firstIndex, firstIndexDocCount);
    indexDocuments(indexingClient, secondIndex, secondIndexDocCount);
    ensureGreen(firstIndex, secondIndex);

    // One v1 snapshot to delete and one to use as the clone source.
    startFullSnapshot(repository, "snapshot-v1").actionGet();
    startFullSnapshot(repository, "snapshot-v1-2").actionGet();

    // Re-register the same repository location, this time with shallow-snapshot-v2 enabled.
    final Settings.Builder v2RepoSettings = Settings.builder()
        .put(FsRepository.LOCATION_SETTING.getKey(), repoLocation)
        .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean())
        .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
        .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
        .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true);
    createRepository(repository, "mock", v2RepoSettings);

    // Hold the v2 snapshot in progress so that the delete and clone are issued while it is still ongoing.
    blockClusterManagerOnWriteIndexFile(repository);

    final ActionFuture<CreateSnapshotResponse> v2SnapshotFuture = startFullSnapshot(repository, "snapshot-v2");
    awaitNumberOfSnapshotsInProgress(1);

    final ActionFuture<AcknowledgedResponse> deleteV1Future = startDeleteSnapshot(repository, "snapshot-v1");
    final ActionFuture<AcknowledgedResponse> cloneV1Future = startCloneSnapshot(repository, "snapshot-v1-2", "snapshot-v1-2-clone");

    unblockNode(repository, clusterManagerNode);

    final CreateSnapshotResponse v2Response = v2SnapshotFuture.actionGet();
    assertTrue(v2Response.getSnapshotInfo().getPinnedTimestamp() != 0);
    assertTrue(deleteV1Future.actionGet().isAcknowledged());
    assertTrue(cloneV1Future.actionGet().isAcknowledged());

    // Expected survivors: snapshot-v1-2, its clone, and snapshot-v2 (snapshot-v1 was deleted).
    final List<SnapshotInfo> remainingSnapshots = client().admin().cluster().prepareGetSnapshots(repository).get().getSnapshots();
    assertEquals(3, remainingSnapshots.size());
}

protected ActionFuture<AcknowledgedResponse> startCloneSnapshot(String repoName, String sourceSnapshotName, String snapshotName) {
logger.info("--> creating full snapshot [{}] to repo [{}]", snapshotName, repoName);
return clusterAdmin().prepareCloneSnapshot(repoName, sourceSnapshotName, snapshotName).setIndices("*").execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@
);
if (request.partial() == false) {
Set<String> missing = new HashSet<>();
for (final Map.Entry<ShardId, SnapshotsInProgress.ShardSnapshotStatus> entry : shards.entrySet()) {
for (final Map.Entry<ShardId, ShardSnapshotStatus> entry : shards.entrySet()) {
if (entry.getValue().state() == ShardState.MISSING) {
missing.add(entry.getKey().getIndex().getName());
}
Expand Down Expand Up @@ -606,8 +606,8 @@
new ActionListener<RepositoryData>() {
@Override
public void onResponse(RepositoryData repositoryData) {
leaveRepoLoop(repositoryName);
if (clusterService.state().nodes().isLocalNodeElectedClusterManager() == false) {
leaveRepoLoop(repositoryName);

Check warning on line 610 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L610

Added line #L610 was not covered by tests
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why have we moved this inside?

failSnapshotCompletionListeners(
snapshot,
new SnapshotException(snapshot, "Aborting snapshot-v2, no longer cluster manager")
Expand All @@ -622,6 +622,9 @@
return;
}
listener.onResponse(snapshotInfo);
// For snapshot-v2, we don't allow concurrent snapshots. But meanwhile, non-v2 snapshot operations
// can get queued. This call triggers them.
runNextQueuedOperation(repositoryData, repositoryName, true);
cleanOrphanTimestamp(repositoryName, repositoryData);
}

Expand Down Expand Up @@ -1010,8 +1013,8 @@
new ActionListener<RepositoryData>() {
@Override
public void onResponse(RepositoryData repositoryData) {
leaveRepoLoop(repositoryName);
if (!clusterService.state().nodes().isLocalNodeElectedClusterManager()) {
leaveRepoLoop(repositoryName);

Check warning on line 1017 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L1017

Added line #L1017 was not covered by tests
failSnapshotCompletionListeners(
snapshot,
new SnapshotException(snapshot, "Aborting Snapshot-v2 clone, no longer cluster manager")
Expand All @@ -1027,6 +1030,9 @@
}
logger.info("snapshot-v2 clone [{}] completed successfully", snapshot);
listener.onResponse(null);
// For snapshot-v2, we don't allow concurrent snapshots. But meanwhile, non-v2 snapshot operations
// can get queued. This call triggers them.
runNextQueuedOperation(repositoryData, repositoryName, true);

Check warning on line 1035 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L1035

Added line #L1035 was not covered by tests
}

@Override
Expand Down Expand Up @@ -1740,7 +1746,7 @@
/**
* Updates the state of in-progress snapshots in reaction to a change in the configuration of the cluster nodes (cluster-manager fail-over or
* disconnect of a data node that was executing a snapshot) or a routing change that started shards whose snapshot state is
* {@link SnapshotsInProgress.ShardState#WAITING}.
* {@link ShardState#WAITING}.
*
* @param changedNodes true iff either a cluster-manager fail-over occurred or a data node that was doing snapshot work got removed from the
* cluster
Expand Down Expand Up @@ -2645,7 +2651,7 @@
}
}
}
return Collections.unmodifiableList(new ArrayList<>(foundSnapshots));
return unmodifiableList(new ArrayList<>(foundSnapshots));
}

// Return in-progress snapshot entries by name and repository in the given cluster state or null if none is found
Expand Down Expand Up @@ -2795,7 +2801,7 @@
reusedExistingDelete = true;
return currentState;
}
final List<SnapshotId> toDelete = Collections.unmodifiableList(new ArrayList<>(snapshotIdsRequiringCleanup));
final List<SnapshotId> toDelete = unmodifiableList(new ArrayList<>(snapshotIdsRequiringCleanup));
ensureBelowConcurrencyLimit(repoName, toDelete.get(0).getName(), snapshots, deletionsInProgress);
newDelete = new SnapshotDeletionsInProgress.Entry(
toDelete,
Expand Down Expand Up @@ -3353,7 +3359,7 @@
* @param indices Indices to snapshot
* @return list of shard to be included into current snapshot
*/
private static Map<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards(
private static Map<ShardId, ShardSnapshotStatus> shards(
SnapshotsInProgress snapshotsInProgress,
@Nullable SnapshotDeletionsInProgress deletionsInProgress,
Metadata metadata,
Expand All @@ -3362,7 +3368,7 @@
RepositoryData repositoryData,
String repoName
) {
final Map<ShardId, SnapshotsInProgress.ShardSnapshotStatus> builder = new HashMap<>();
final Map<ShardId, ShardSnapshotStatus> builder = new HashMap<>();
final ShardGenerations shardGenerations = repositoryData.shardGenerations();
final InFlightShardSnapshotStates inFlightShardStates = InFlightShardSnapshotStates.forRepo(
repoName,
Expand Down Expand Up @@ -3396,7 +3402,7 @@
}
final ShardSnapshotStatus shardSnapshotStatus;
if (indexRoutingTable == null) {
shardSnapshotStatus = new SnapshotsInProgress.ShardSnapshotStatus(
shardSnapshotStatus = new ShardSnapshotStatus(

Check warning on line 3405 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L3405

Added line #L3405 was not covered by tests
null,
ShardState.MISSING,
"missing routing table",
Expand Down
Loading