diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java index 7626e3dba6424..212f797180077 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java @@ -10,8 +10,11 @@ import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreResponse; +import org.opensearch.action.admin.indices.get.GetIndexRequest; +import org.opensearch.action.admin.indices.get.GetIndexResponse; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.health.ClusterHealthStatus; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; @@ -19,10 +22,12 @@ import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; +import org.opensearch.test.CorruptionUtils; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Path; import java.util.HashMap; import java.util.Locale; @@ -30,13 +35,14 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; import static org.hamcrest.Matchers.greaterThan; -@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 0) +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class RemoteStoreRestoreIT extends BaseRemoteStoreRestoreIT { /** @@ -461,5 +467,30 @@ public void testRateLimitedRemoteDownloads() throws Exception { } } + public void testRestoreCorruptSegmentShouldFail() throws IOException, ExecutionException, InterruptedException { + prepareCluster(1, 3, INDEX_NAME, 0, 1); + indexData(randomIntBetween(3, 4), true, INDEX_NAME); + + GetIndexResponse getIndexResponse = client().admin().indices().getIndex(new GetIndexRequest()).get(); + String indexUUID = getIndexResponse.getSettings().get(INDEX_NAME).get(IndexMetadata.SETTING_INDEX_UUID); + + logger.info("--> Corrupting segment files in remote segment store"); + Path path = segmentRepoPath.resolve(indexUUID).resolve("0").resolve("segments").resolve("data"); + try (Stream dataPath = Files.list(path)) { + CorruptionUtils.corruptFile(random(), dataPath.toArray(Path[]::new)); + } + + logger.info("--> Stop primary"); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME))); + + logger.info("--> Close and restore the index"); + client().admin() + .cluster() + .restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME).waitForCompletion(true), PlainActionFuture.newFuture()); + + logger.info("--> Check for index status, should be red due to corruption"); + ensureRed(INDEX_NAME); + } + // TODO: Restore flow - index aliases } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index be1f2341236ab..6b43fed3d8930 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -290,6 +290,10 @@ public void setWrittenByMajor(int writtenByMajor) { ); } } + + public int getWrittenByMajor() { + return writtenByMajor; + } } /** diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index b822742de6e97..d0cd2635ba672 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -105,6 +105,7 @@ import java.io.UncheckedIOException; import java.nio.file.NoSuchFileException; import java.nio.file.Path; +import java.text.ParseException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -120,6 +121,7 @@ import java.util.zip.CRC32; import java.util.zip.Checksum; +import static java.lang.Character.MAX_RADIX; import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY; @@ -975,7 +977,11 @@ public void copyFrom(Directory from, String src, String dest, IOContext context) boolean success = false; long startTime = System.currentTimeMillis(); try { - super.copyFrom(from, src, dest, context); + if (from instanceof RemoteSegmentStoreDirectory) { + copyFileAndValidateChecksum(from, src, dest, context, fileSize); + } else { + super.copyFrom(from, src, dest, context); + } success = true; afterDownload(fileSize, startTime); } finally { @@ -985,6 +991,43 @@ public void copyFrom(Directory from, String src, String dest, IOContext context) } } + private void copyFileAndValidateChecksum(Directory from, String src, String dest, IOContext context, long fileSize) + throws IOException { + RemoteSegmentStoreDirectory.UploadedSegmentMetadata metadata = ((RemoteSegmentStoreDirectory) from) + .getSegmentsUploadedToRemoteStore() + .get(dest); + boolean success = false; + try (IndexInput is = from.openInput(src, context); IndexOutput os = createOutput(dest, context)) { + // Here, we don't need the exact version as LuceneVerifyingIndexOutput does not verify version + // It is just used to emit logs when the entire metadata object is provided as parameter. Also, + // we can't provide null version as StoreFileMetadata has non-null check on writtenBy field. + Version luceneMajorVersion = Version.parse(metadata.getWrittenByMajor() + ".0.0"); + long checksum = Long.parseLong(metadata.getChecksum()); + StoreFileMetadata storeFileMetadata = new StoreFileMetadata( + dest, + fileSize, + Long.toString(checksum, MAX_RADIX), + luceneMajorVersion + ); + VerifyingIndexOutput verifyingIndexOutput = new LuceneVerifyingIndexOutput(storeFileMetadata, os); + verifyingIndexOutput.copyBytes(is, is.length()); + verifyingIndexOutput.verify(); + success = true; + } catch (ParseException e) { + throw new IOException("Exception while reading version info for segment file from remote store: " + dest, e); + } finally { + if (success == false) { + // If the exception is thrown after file is created, we clean up the file. + // We ignore the exception as the deletion is best-effort basis and can fail if file does not exist. + try { + deleteFile("Quietly deleting", dest); + } catch (Exception e) { + // Ignore + } + } + } + } + /** * Updates the amount of bytes attempted for download */ @@ -1476,7 +1519,7 @@ public static boolean isAutogenerated(String name) { * Produces a string representation of the given digest value. */ public static String digestToString(long digest) { - return Long.toString(digest, Character.MAX_RADIX); + return Long.toString(digest, MAX_RADIX); } /**