Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add snapshot_path prefix to snapshot shards path file on remote store #16267

Merged
merged 1 commit into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@
import static org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm.FNV_1A_COMPOSITE_1;
import static org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName;
import static org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat.SNAPSHOT_ONLY_FORMAT_PARAMS;
import static org.opensearch.snapshots.SnapshotShardPaths.getIndexId;

/**
* BlobStore - based implementation of Snapshot Repository
Expand Down Expand Up @@ -2293,7 +2294,10 @@ private void remoteTranslogCleanupAsync(
* @return List of matching shard paths
*/
private List<String> findMatchingShardPaths(String indexId, Map<String, BlobMetadata> snapshotShardPaths) {
return snapshotShardPaths.keySet().stream().filter(s -> s.startsWith(indexId)).collect(Collectors.toList());
return snapshotShardPaths.keySet()
.stream()
.filter(s -> (s.startsWith(indexId) || s.startsWith(SnapshotShardPaths.FILE_PREFIX + indexId)))
.collect(Collectors.toList());
}

/**
Expand Down Expand Up @@ -2546,11 +2550,11 @@ public void finalizeSnapshot(
*/
private void cleanupRedundantSnapshotShardPaths(Set<String> updatedShardPathsIndexIds) {
Set<String> updatedIndexIds = updatedShardPathsIndexIds.stream()
.map(s -> s.split("\\" + SnapshotShardPaths.DELIMITER)[0])
.map(s -> getIndexId(s.split("\\" + SnapshotShardPaths.DELIMITER)[0]))
.collect(Collectors.toSet());
Set<String> indexIdShardPaths = getSnapshotShardPaths().keySet();
List<String> staleShardPaths = indexIdShardPaths.stream().filter(s -> updatedShardPathsIndexIds.contains(s) == false).filter(s -> {
String indexId = s.split("\\" + SnapshotShardPaths.DELIMITER)[0];
String indexId = getIndexId(s.split("\\" + SnapshotShardPaths.DELIMITER)[0]);
return updatedIndexIds.contains(indexId);
}).collect(Collectors.toList());
try {
Expand Down Expand Up @@ -2595,7 +2599,7 @@ String writeIndexShardPaths(IndexId indexId, SnapshotId snapshotId, int shardCou
List<String> paths = getShardPaths(indexId, shardCount);
int pathType = indexId.getShardPathType();
int pathHashAlgorithm = FNV_1A_COMPOSITE_1.getCode();
String blobName = String.join(
String name = String.join(
SnapshotShardPaths.DELIMITER,
indexId.getId(),
indexId.getName(),
Expand All @@ -2611,9 +2615,9 @@ String writeIndexShardPaths(IndexId indexId, SnapshotId snapshotId, int shardCou
PathType.fromCode(pathType),
PathHashAlgorithm.fromCode(pathHashAlgorithm)
);
SNAPSHOT_SHARD_PATHS_FORMAT.write(shardPaths, snapshotShardPathBlobContainer(), blobName);
SNAPSHOT_SHARD_PATHS_FORMAT.write(shardPaths, snapshotShardPathBlobContainer(), name);
logShardPathsOperationSuccess(indexId, snapshotId);
return blobName;
return SnapshotShardPaths.FILE_PREFIX + name;
} catch (IOException e) {
logShardPathsOperationWarning(indexId, snapshotId, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ public class SnapshotShardPaths implements ToXContent {

public static final String DELIMITER = ".";

public static final String FILE_NAME_FORMAT = "%s";
public static final String FILE_PREFIX = "snapshot_path_";
public static final String FILE_NAME_FORMAT = FILE_PREFIX + "%s";

private static final String PATHS_FIELD = "paths";
private static final String INDEX_ID_FIELD = "indexId";
Expand Down Expand Up @@ -101,14 +102,21 @@ public static ShardInfo parseShardPath(String shardPath) {
throw new IllegalArgumentException("Invalid shard path format: " + shardPath);
}
try {
IndexId indexId = new IndexId(parts[1], parts[0], Integer.parseInt(parts[3]));
IndexId indexId = new IndexId(parts[1], getIndexId(parts[0]), Integer.parseInt(parts[3]));
int shardCount = Integer.parseInt(parts[2]);
return new ShardInfo(indexId, shardCount);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid shard path format: " + shardPath, e);
}
}

public static String getIndexId(String indexIdField) {
if (indexIdField.startsWith(FILE_PREFIX)) {
return indexIdField.substring(FILE_PREFIX.length());
}
return indexIdField;
}

/**
* Represents parsed information from a shard path.
* This class encapsulates the index ID and shard count extracted from a shard path string.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ public void testParseShardPath() {
IndexId indexId = repoData.getIndices().values().iterator().next();
int shardCount = repoData.shardGenerations().getGens(indexId).size();

// Version 2.17 has file name starting with indexId
String shardPath = String.join(
SnapshotShardPaths.DELIMITER,
indexId.getId(),
Expand All @@ -391,7 +392,19 @@ public void testParseShardPath() {
"1"
);
ShardInfo shardInfo = SnapshotShardPaths.parseShardPath(shardPath);
assertEquals(shardInfo.getIndexId(), indexId);
assertEquals(shardInfo.getShardCount(), shardCount);

// Version 2.17 has file name starting with snapshot_path_
shardPath = String.join(
SnapshotShardPaths.DELIMITER,
SnapshotShardPaths.FILE_PREFIX + indexId.getId(),
indexId.getName(),
String.valueOf(shardCount),
String.valueOf(indexId.getShardPathType()),
"1"
);
shardInfo = SnapshotShardPaths.parseShardPath(shardPath);
assertEquals(shardInfo.getIndexId(), indexId);
assertEquals(shardInfo.getShardCount(), shardCount);
}
Expand Down
Loading