From 802ee008eb5c2ae7730c1274b28c9512db25c93d Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 11 Sep 2024 14:17:59 +0100 Subject: [PATCH] Introduce repository integrity verification API (#112348) Adds an API which scans all the metadata (and optionally the raw data) in a snapshot repository to look for corruptions or other inconsistencies. Closes https://github.com/elastic/elasticsearch/issues/52622 Closes ES-8560 --- docs/changelog/112348.yaml | 6 + .../apis/snapshot-restore-apis.asciidoc | 1 + .../apis/verify-repo-integrity-api.asciidoc | 232 +++++ .../register-repository.asciidoc | 4 +- .../snapshot.repository_verify_integrity.json | 65 ++ .../repositories/RepositoryData.java | 7 + .../qa/native-multi-node-tests/build.gradle | 1 + .../xpack/security/operator/Constants.java | 1 + .../test/20_verify_integrity.yml | 39 + .../RepositoryVerifyIntegrityIT.java | 806 +++++++++++++++ .../src/main/java/module-info.java | 24 + .../testkit/SnapshotRepositoryTestKit.java | 25 +- .../SnapshotRepositoryTestKitFeatures.java | 22 + .../ActiveRepositoryVerifyIntegrityTasks.java | 56 ++ .../testkit/integrity/IndexDescription.java | 56 ++ .../RepositoryIntegrityVerifier.java | 949 ++++++++++++++++++ .../RepositoryVerifyIntegrityParams.java | 135 +++ .../RepositoryVerifyIntegrityResponse.java | 47 + ...epositoryVerifyIntegrityResponseChunk.java | 355 +++++++ ...positoryVerifyIntegrityResponseStream.java | 151 +++ .../RepositoryVerifyIntegrityTask.java | 133 +++ .../RestRepositoryVerifyIntegrityAction.java | 49 + ...nsportRepositoryVerifyIntegrityAction.java | 158 +++ ...toryVerifyIntegrityCoordinationAction.java | 186 ++++ ...oryVerifyIntegrityResponseChunkAction.java | 93 ++ ...lasticsearch.features.FeatureSpecification | 8 + 26 files changed, 3606 insertions(+), 3 deletions(-) create mode 100644 docs/changelog/112348.yaml create mode 100644 docs/reference/snapshot-restore/apis/verify-repo-integrity-api.asciidoc create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/snapshot.repository_verify_integrity.json create mode 100644 x-pack/plugin/snapshot-repo-test-kit/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/20_verify_integrity.yml create mode 100644 x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityIT.java create mode 100644 x-pack/plugin/snapshot-repo-test-kit/src/main/java/module-info.java create mode 100644 x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/SnapshotRepositoryTestKitFeatures.java create mode 100644 x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/ActiveRepositoryVerifyIntegrityTasks.java create mode 100644 x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/IndexDescription.java create mode 100644 x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryIntegrityVerifier.java create mode 100644 x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityParams.java create mode 100644 x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityResponse.java create mode 100644 
x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityResponseChunk.java create mode 100644 x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityResponseStream.java create mode 100644 x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityTask.java create mode 100644 x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RestRepositoryVerifyIntegrityAction.java create mode 100644 x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/TransportRepositoryVerifyIntegrityAction.java create mode 100644 x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/TransportRepositoryVerifyIntegrityCoordinationAction.java create mode 100644 x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/TransportRepositoryVerifyIntegrityResponseChunkAction.java create mode 100644 x-pack/plugin/snapshot-repo-test-kit/src/main/resources/META-INF/services/org.elasticsearch.features.FeatureSpecification diff --git a/docs/changelog/112348.yaml b/docs/changelog/112348.yaml new file mode 100644 index 0000000000000..84110a7cd4f1b --- /dev/null +++ b/docs/changelog/112348.yaml @@ -0,0 +1,6 @@ +pr: 112348 +summary: Introduce repository integrity verification API +area: Snapshot/Restore +type: enhancement +issues: + - 52622 diff --git a/docs/reference/snapshot-restore/apis/snapshot-restore-apis.asciidoc b/docs/reference/snapshot-restore/apis/snapshot-restore-apis.asciidoc index 6cdf65ba54e7e..b8bb6a2cd7d13 100644 --- a/docs/reference/snapshot-restore/apis/snapshot-restore-apis.asciidoc +++ b/docs/reference/snapshot-restore/apis/snapshot-restore-apis.asciidoc @@ -28,6 +28,7 @@ For more information, see <>. include::put-repo-api.asciidoc[] include::verify-repo-api.asciidoc[] include::repo-analysis-api.asciidoc[] +include::verify-repo-integrity-api.asciidoc[] include::get-repo-api.asciidoc[] include::delete-repo-api.asciidoc[] include::clean-up-repo-api.asciidoc[] diff --git a/docs/reference/snapshot-restore/apis/verify-repo-integrity-api.asciidoc b/docs/reference/snapshot-restore/apis/verify-repo-integrity-api.asciidoc new file mode 100644 index 0000000000000..99ae126b401f5 --- /dev/null +++ b/docs/reference/snapshot-restore/apis/verify-repo-integrity-api.asciidoc @@ -0,0 +1,232 @@ +[role="xpack"] +[[verify-repo-integrity-api]] +=== Verify repository integrity API +++++ +Verify repository integrity +++++ + +Verifies the integrity of the contents of a snapshot repository. + +//// +[source,console] +---- +PUT /_snapshot/my_repository +{ + "type": "fs", + "settings": { + "location": "my_backup_location" + } +} +---- +// TESTSETUP +//// + +[source,console] +---- +POST /_snapshot/my_repository/_verify_integrity +---- + +[[verify-repo-integrity-api-request]] +==== {api-request-title} + +`POST /_snapshot//_verify_integrity` + +[[verify-repo-integrity-api-prereqs]] +==== {api-prereq-title} + +* If the {es} {security-features} are enabled, you must have the `manage` +<> to use this API. For more +information, see <>. 
+ +[[verify-repo-integrity-api-desc]] +==== {api-description-title} + +This API allows you to perform a comprehensive check of the contents of a +repository, looking for any anomalies in its data or metadata which might +prevent you from restoring snapshots from the repository or which might cause +future snapshot create or delete operations to fail. + +If you suspect the integrity of the contents of one of your snapshot +repositories, cease all write activity to this repository immediately, set its +`read_only` option to `true`, and use this API to verify its integrity. Until +you do so: + +* It may not be possible to <> from this repository. + +* <> may report errors when searched, or may have + unassigned shards. + +* <> into this repository may fail, + or may appear to succeed having created a snapshot which cannot be restored. + +* <> from this repository may fail, or + may appear to succeed while leaving the underlying data on disk. + +* Continuing to write to the repository while it is in an invalid state may + cause additional damage to its contents. + +If the <> API finds any problems with the integrity +of the contents of your repository, {es} will not be able to repair the damage. +The only way to bring the repository back into a fully working state after its +contents have been damaged is by restoring its contents from a +<> which was taken before the +damage occurred. You must also identify what caused the damage and take action +to prevent it from happening again. + +If you cannot restore a repository backup, +<> and use this for +all future snapshot operations. In some cases it may be possible to recover +some of the contents of a damaged repository, either by +<> as many of its snapshots as needed and +<> of the restored data, or by +using the <> API to copy data from any <> +mounted from the damaged repository. + +Avoid all operations which write to the repository while the +<> API is running. If something changes the +repository contents while an integrity verification is running then {es} may +incorrectly report having detected some anomalies in its contents due to the +concurrent writes. It may also incorrectly fail to report some anomalies that +the concurrent writes prevented it from detecting. + +NOTE: This API is intended for exploratory use by humans. You should expect the +request parameters and the response format to vary in future versions. + +NOTE: This API may not work correctly in a mixed-version cluster. + +[[verify-repo-integrity-api-path-params]] +==== {api-path-parms-title} + +``:: +(Required, string) +Name of the snapshot repository whose integrity to verify. + +[[verify-repo-integrity-api-query-params]] +==== {api-query-parms-title} + +The default values for the parameters of this API are designed to limit the +impact of the integrity verification on other activities in your cluster. For +instance, by default it will only use at most half of the `snapshot_meta` +threads to verify the integrity of each snapshot, allowing other snapshot +operations to use the other half of this thread pool. + +If you modify these parameters to speed up the verification process, you risk +disrupting other snapshot-related operations in your cluster. For large +repositories, consider setting up a separate single-node {es} cluster just for +running the integrity verification API.
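+ +For instance, if you have accepted these risks on an otherwise quiet cluster, a request like the following enables verification of blob contents at double the default rate limit. The parameter values shown here are illustrative only, not recommendations: + +[source,console] +---- +POST /_snapshot/my_repository/_verify_integrity?verify_blob_contents=true&max_bytes_per_sec=20mb +----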
+ +`snapshot_verification_concurrency`:: +(Optional, integer) Specifies the number of snapshots to verify concurrently. +Defaults to `0`, which means to use at most half of the `snapshot_meta` thread +pool at once. + +`index_verification_concurrency`:: +(Optional, integer) Specifies the number of indices to verify concurrently. +Defaults to `0`, which means to use the entire `snapshot_meta` thread pool. + +`meta_thread_pool_concurrency`:: +(Optional, integer) Specifies the maximum number of snapshot metadata +operations to execute concurrently. Defaults to `0`, which means to use at most +half of the `snapshot_meta` thread pool at once. + +`index_snapshot_verification_concurrency`:: +(Optional, integer) Specifies the maximum number of index snapshots to verify +concurrently within each index verification. Defaults to `1`. + +`max_failed_shard_snapshots`:: +(Optional, integer) Limits the number of shard snapshot failures to track +during integrity verification, in order to avoid excessive resource usage. If +your repository contains more than this number of shard snapshot failures then +the verification will fail. Defaults to `10000`. + +`verify_blob_contents`:: +(Optional, boolean) Specifies whether to verify the checksum of every data blob +in the repository. Defaults to `false`. If this feature is enabled, {es} will +read the entire repository contents, which may be extremely slow and expensive. + +`blob_thread_pool_concurrency`:: +(Optional, integer) If `?verify_blob_contents` is `true`, this parameter +specifies how many blobs to verify at once. Defaults to `1`. + +`max_bytes_per_sec`:: +(Optional, <>) +If `?verify_blob_contents` is `true`, this parameter specifies the maximum +amount of data that {es} will read from the repository every second. Defaults +to `10mb`. + +[role="child_attributes"] +[[verify-repo-integrity-api-response-body]] +==== {api-response-body-title} + +The response exposes implementation details of the analysis which may change +from version to version. The response body format is therefore not considered +stable and may be different in newer versions. + +`log`:: +(array) A sequence of objects that report the progress of the analysis. ++ +.Properties of `log` +[%collapsible%open] +==== +`timestamp_in_millis`:: +(integer) The timestamp of this log entry, represented as the number of +milliseconds since the {wikipedia}/Unix_time[Unix epoch]. + +`timestamp`:: +(string) The timestamp of this log entry, represented as a string formatted +according to {wikipedia}/ISO_8601[ISO 8601]. Only included if the +<> flag is set. + +`snapshot`:: +(object) If the log entry pertains to a particular snapshot then the snapshot +will be described in this object. + +`index`:: +(object) If the log entry pertains to a particular index then the index will be +described in this object. + +`snapshot_restorability`:: +(object) If the log entry pertains to the restorability of an index then the +details will be described in this object. + +`anomaly`:: +(string) If the log entry pertains to an anomaly in the repository contents then +this string will describe the anomaly. + +`exception`:: +(object) If the log entry pertains to an exception that {es} encountered during +the verification then the details will be included in this object. + +==== + +`results`:: +(object) An object which describes the final results of the analysis. ++ +.Properties of `results` +[%collapsible%open] +==== +`status`:: +(object) The final status of the analysis task. + +`final_repository_generation`:: +(integer) The repository generation at the end of the analysis.
If there were +any writes to the repository during the analysis then this value will be +different from the `generation` reported in the task status, and the analysis +may have detected spurious anomalies due to the concurrent writes, or may even +have failed to detect some anomalies in the repository contents. + +`total_anomalies`:: +(integer) The total number of anomalies detected during the analysis. + +`result`:: +(string) The final result of the analysis. If the repository contents appear to +be intact then this will be the string `pass`. If this field is missing, or +contains some other value, then the repository contents were not fully +verified. + +==== + +`exception`:: +(object) If the analysis encountered an exception which prevented it from +completing successfully then this exception will be reported here. diff --git a/docs/reference/snapshot-restore/register-repository.asciidoc b/docs/reference/snapshot-restore/register-repository.asciidoc index 28b0640a8fae5..2147ad3c684f3 100644 --- a/docs/reference/snapshot-restore/register-repository.asciidoc +++ b/docs/reference/snapshot-restore/register-repository.asciidoc @@ -272,7 +272,9 @@ filesystem snapshot of this repository. When restoring a repository from a backup, you must not register the repository with {es} until the repository contents are fully restored. If you alter the contents of a repository while it is registered with {es} then the repository -may become unreadable or may silently lose some of its contents. +may become unreadable or may silently lose some of its contents. After +restoring a repository from a backup, use the <> API +to verify its integrity before you start to use the repository. include::repository-azure.asciidoc[] include::repository-gcs.asciidoc[] diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/snapshot.repository_verify_integrity.json b/rest-api-spec/src/main/resources/rest-api-spec/api/snapshot.repository_verify_integrity.json new file mode 100644 index 0000000000000..bab8101b74552 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/snapshot.repository_verify_integrity.json @@ -0,0 +1,65 @@ +{ + "snapshot.repository_verify_integrity":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-snapshots.html", + "description":"Verifies the integrity of the contents of a snapshot repository" + }, + "stability":"experimental", + "visibility":"private", + "headers": { + "accept": [ + "application/json" + ] + }, + "url":{ + "paths":[ + { + "path":"/_snapshot/{repository}/_verify_integrity", + "methods":[ + "POST" + ], + "parts":{ + "repository":{ + "type":"string", + "description":"A repository name" + } + } + } + ] + }, + "params":{ + "meta_thread_pool_concurrency":{ + "type":"number", + "description":"Number of threads to use for reading metadata" + }, + "blob_thread_pool_concurrency":{ + "type":"number", + "description":"Number of threads to use for reading blob contents" + }, + "snapshot_verification_concurrency":{ + "type":"number", + "description":"Number of snapshots to verify concurrently" + }, + "index_verification_concurrency":{ + "type":"number", + "description":"Number of indices to verify concurrently" + }, + "index_snapshot_verification_concurrency":{ + "type":"number", + "description":"Number of snapshots to verify concurrently within each index" + }, + "max_failed_shard_snapshots":{ + "type":"number", + "description":"Maximum permitted number of failed shard snapshots" + }, + "verify_blob_contents":{ + 
"type":"boolean", + "description":"Whether to verify the contents of individual blobs" + }, + "max_bytes_per_sec":{ + "type":"string", + "description":"Rate limit for individual blob verification" + } + } + } +} diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java index c6494eca9823b..72376d5b20fdb 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java @@ -281,6 +281,13 @@ public Collection getSnapshotIds() { return snapshotIds.values(); } + /** + * @return the number of index snapshots (i.e. the sum of the index count of each snapshot) + */ + public long getIndexSnapshotCount() { + return indexSnapshots.values().stream().mapToLong(List::size).sum(); + } + /** * @return whether some of the {@link SnapshotDetails} of the given snapshot are missing, due to BwC, so that they must be loaded from * the {@link SnapshotInfo} blob instead. diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle b/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle index c9e860f27a5d4..19f2e984f6493 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle @@ -17,6 +17,7 @@ dependencies { javaRestTestImplementation project(path: xpackModule('rank-rrf')) javaRestTestImplementation project(path: xpackModule('esql-core')) javaRestTestImplementation project(path: xpackModule('esql')) + javaRestTestImplementation project(path: xpackModule('snapshot-repo-test-kit')) } // location for keys and certificates diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index c5304d8313df2..853d0fd9318ae 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -98,6 +98,7 @@ public class Constants { "cluster:admin/snapshot/restore", "cluster:admin/snapshot/status", "cluster:admin/snapshot/status[nodes]", + "cluster:admin/repository/verify_integrity", "cluster:admin/features/get", "cluster:admin/features/reset", "cluster:admin/tasks/cancel", diff --git a/x-pack/plugin/snapshot-repo-test-kit/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/20_verify_integrity.yml b/x-pack/plugin/snapshot-repo-test-kit/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/20_verify_integrity.yml new file mode 100644 index 0000000000000..be6929a15ff44 --- /dev/null +++ b/x-pack/plugin/snapshot-repo-test-kit/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/20_verify_integrity.yml @@ -0,0 +1,39 @@ +--- +setup: + - requires: + cluster_features: "snapshot.repository_verify_integrity" + reason: "required feature" + + - do: + snapshot.create_repository: + repository: test_repo + body: + type: fs + settings: + location: "test_repo_loc" + + - do: + bulk: + index: test + refresh: true + body: + - '{"index":{}}' + - '{}' + + - do: + snapshot.create: + repository: test_repo + snapshot: snap + wait_for_completion: true + +--- +"Integrity verification": + - do: + snapshot.repository_verify_integrity: + 
repository: test_repo + + - match: {results.result: pass} + - match: {results.status.snapshots.total: 1} + - match: {results.status.snapshots.verified: 1} + - match: {results.status.indices.total: 1} + - match: {results.status.indices.verified: 1} diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityIT.java b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityIT.java new file mode 100644 index 0000000000000..4b0e0fdbb0955 --- /dev/null +++ b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityIT.java @@ -0,0 +1,806 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.repositories.blobstore.testkit.integrity; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; +import org.elasticsearch.action.support.ActionTestUtils; +import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.SubscribableListener; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.IOUtils; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshotsIntegritySuppressor; +import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles; +import org.elasticsearch.index.store.StoreFileMetadata; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.repositories.ShardGenerations; +import org.elasticsearch.repositories.blobstore.BlobStoreCorruptionUtils; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.repositories.blobstore.RepositoryFileType; +import org.elasticsearch.repositories.blobstore.testkit.SnapshotRepositoryTestKit; +import org.elasticsearch.repositories.fs.FsRepository; +import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.snapshots.SnapshotState; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.rest.ObjectPath; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.test.transport.StubbableTransport; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestHandler; +import 
org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.xcontent.XContentFactory; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.OptionalLong; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.function.UnaryOperator; +import java.util.stream.Collectors; +import java.util.stream.LongStream; +import java.util.stream.StreamSupport; + +import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_SETTING; +import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.INDEX_SHARD_SNAPSHOTS_FORMAT; +import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.SNAPSHOT_FORMAT; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; + +public class RepositoryVerifyIntegrityIT extends AbstractSnapshotIntegTestCase { + + @Override + protected boolean addMockHttpTransport() { + return false; // enable HTTP + } + + @SuppressWarnings("unchecked") + @Override + protected Collection> nodePlugins() { + return CollectionUtils.appendToCopyNoNullElements( + super.nodePlugins(), + SnapshotRepositoryTestKit.class, + MockTransportService.TestPlugin.class + ); + } + + private static long getCurrentTime(Function summarizer) { + return summarizer.apply( + StreamSupport.stream(internalCluster().getInstances(ThreadPool.class).spliterator(), false) + .mapToLong(ThreadPool::absoluteTimeInMillis) + ).orElseThrow(AssertionError::new); + } + + public void testSuccess() throws IOException { + final var minStartTimeMillis = getCurrentTime(LongStream::min); + final var testContext = createTestContext(); + final var request = testContext.getVerifyIntegrityRequest(); + if (randomBoolean()) { + request.addParameter("verify_blob_contents", null); + } + final var response = getRestClient().performRequest(request); + final var maxEndTimeMillis = getCurrentTime(LongStream::max); + assertEquals(200, response.getStatusLine().getStatusCode()); + final var responseObjectPath = ObjectPath.createFromResponse(response); + final var logEntryCount = responseObjectPath.evaluateArraySize("log"); + final var seenSnapshotNames = new HashSet(); + final var seenIndexNames = new HashSet(); + for (int i = 0; i < logEntryCount; i++) { + assertThat( + responseObjectPath.evaluate("log." + i + ".timestamp_in_millis"), + allOf(greaterThanOrEqualTo(minStartTimeMillis), lessThanOrEqualTo(maxEndTimeMillis)) + ); + assertThat( + responseObjectPath.evaluate("log." + i + ".timestamp"), + request.getParameters().containsKey("human") ? instanceOf(String.class) : nullValue() + ); + final String maybeSnapshotName = responseObjectPath.evaluate("log." 
+ i + ".snapshot.snapshot"); + if (maybeSnapshotName != null) { + assertTrue(seenSnapshotNames.add(maybeSnapshotName)); + } else { + final String indexName = responseObjectPath.evaluate("log." + i + ".index.name"); + assertNotNull(indexName); + assertTrue(seenIndexNames.add(indexName)); + assertEquals( + testContext.snapshotNames().size(), + (int) responseObjectPath.evaluate("log." + i + ".snapshot_restorability.total_snapshot_count") + ); + assertEquals( + testContext.snapshotNames().size(), + (int) responseObjectPath.evaluate("log." + i + ".snapshot_restorability.restorable_snapshot_count") + ); + } + } + assertEquals(Set.copyOf(testContext.snapshotNames()), seenSnapshotNames); + assertEquals(Set.copyOf(testContext.indexNames()), seenIndexNames); + + assertEquals(0, (int) responseObjectPath.evaluate("results.total_anomalies")); + assertEquals("pass", responseObjectPath.evaluate("results.result")); + } + + public void testTaskStatus() throws IOException { + final var testContext = createTestContext(); + + // use non-master node to coordinate the request so that we can capture chunks being sent back + final var coordNodeName = getCoordinatingNodeName(); + final var coordNodeTransportService = MockTransportService.getInstance(coordNodeName); + final var masterTaskManager = MockTransportService.getInstance(internalCluster().getMasterName()).getTaskManager(); + + final SubscribableListener snapshotsCompleteStatusListener = new SubscribableListener<>(); + final AtomicInteger chunksSeenCounter = new AtomicInteger(); + + coordNodeTransportService.addRequestHandlingBehavior( + TransportRepositoryVerifyIntegrityResponseChunkAction.ACTION_NAME, + (handler, request, channel, task) -> { + final SubscribableListener unblockChunkHandlingListener = switch (request.chunkContents().type()) { + case START_RESPONSE -> { + final var status = asInstanceOf( + RepositoryVerifyIntegrityTask.Status.class, + randomBoolean() + ? 
masterTaskManager.getTask(task.getParentTaskId().getId()).getStatus() + : client().admin() + .cluster() + .prepareGetTask(task.getParentTaskId()) + .get(SAFE_AWAIT_TIMEOUT) + .getTask() + .getTask() + .status() + ); + assertEquals(testContext.repositoryName(), status.repositoryName()); + assertEquals(testContext.snapshotNames().size(), status.snapshotCount()); + assertEquals(0L, status.snapshotsVerified()); + assertEquals(testContext.indexNames().size(), status.indexCount()); + assertEquals(0L, status.indicesVerified()); + assertEquals(testContext.indexNames().size() * testContext.snapshotNames().size(), status.indexSnapshotCount()); + assertEquals(0L, status.indexSnapshotsVerified()); + assertEquals(0L, status.blobsVerified()); + assertEquals(0L, status.blobBytesVerified()); + yield SubscribableListener.newSucceeded(null); + } + case INDEX_RESTORABILITY -> { + // several of these chunks might arrive concurrently; we want to verify the task status before processing any of + // them, so use SubscribableListener to pick out the first status + snapshotsCompleteStatusListener.onResponse( + asInstanceOf( + RepositoryVerifyIntegrityTask.Status.class, + masterTaskManager.getTask(task.getParentTaskId().getId()).getStatus() + ) + ); + yield snapshotsCompleteStatusListener.andThenAccept(status -> { + assertEquals(testContext.repositoryName(), status.repositoryName()); + assertEquals(testContext.snapshotNames().size(), status.snapshotCount()); + assertEquals(testContext.snapshotNames().size(), status.snapshotsVerified()); + assertEquals(testContext.indexNames().size(), status.indexCount()); + assertEquals(0L, status.indicesVerified()); + }); + } + case SNAPSHOT_INFO -> SubscribableListener.newSucceeded(null); + case ANOMALY -> fail(null, "should not see anomalies"); + }; + + unblockChunkHandlingListener.addListener(ActionTestUtils.assertNoFailureListener(ignored -> { + chunksSeenCounter.incrementAndGet(); + handler.messageReceived(request, channel, task); + })); + } + ); + + try (var client = createRestClient(coordNodeName)) { + final var response = client.performRequest(testContext.getVerifyIntegrityRequest()); + assertEquals(1 + testContext.indexNames().size() + testContext.snapshotNames().size(), chunksSeenCounter.get()); + assertEquals(200, response.getStatusLine().getStatusCode()); + final var responseObjectPath = ObjectPath.createFromResponse(response); + assertEquals(0, (int) responseObjectPath.evaluate("results.total_anomalies")); + assertEquals("pass", responseObjectPath.evaluate("results.result")); + } finally { + coordNodeTransportService.clearAllRules(); + } + } + + public void testShardSnapshotFailed() throws IOException { + final var testContext = createTestContext(); + + final var newIndex = randomIdentifier(); + assertAcked( + client().admin() + .indices() + .prepareCreate(newIndex) + .setWaitForActiveShards(ActiveShardCount.NONE) + .setSettings(indexSettings(1, 0).put(INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_id", "not-a-node-id")) + ); + + final var createSnapshotResponse = client().admin() + .cluster() + .prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, testContext.repositoryName(), randomIdentifier()) + .setWaitForCompletion(true) + .setPartial(true) + .get(); + + assertEquals(SnapshotState.PARTIAL, createSnapshotResponse.getSnapshotInfo().state()); + + final var takeGoodSnapshot = randomBoolean(); + if (takeGoodSnapshot) { + updateIndexSettings(Settings.builder().putNull(INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_id"), newIndex); + ensureGreen(newIndex); + 
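// the index is healthy again at this point, so this snapshot of it should be fully restorable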
createSnapshot(testContext.repositoryName(), randomIdentifier(), List.of(newIndex)); + } + + final Response response; + try (var ignored = new BlobStoreIndexShardSnapshotsIntegritySuppressor()) { + response = getRestClient().performRequest(testContext.getVerifyIntegrityRequest()); + } finally { + assertAcked( + client().admin().cluster().prepareDeleteRepository(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, testContext.repositoryName()) + ); + } + assertEquals(200, response.getStatusLine().getStatusCode()); + final var responseObjectPath = ObjectPath.createFromResponse(response); + assertThat(getAnomalies(responseObjectPath), equalTo(Set.of())); + assertEquals(0, (int) responseObjectPath.evaluate("results.total_anomalies")); + assertEquals("pass", responseObjectPath.evaluate("results.result")); + + final var logEntryCount = responseObjectPath.evaluateArraySize("log"); + for (int i = 0; i < logEntryCount; i++) { + if (newIndex.equals(responseObjectPath.evaluate("log." + i + ".index.name"))) { + assertEquals( + takeGoodSnapshot ? 2 : 1, + (int) responseObjectPath.evaluate("log." + i + ".snapshot_restorability.total_snapshot_count") + ); + assertEquals( + takeGoodSnapshot ? 1 : 0, + (int) responseObjectPath.evaluate("log." + i + ".snapshot_restorability.restorable_snapshot_count") + ); + } + } + } + + public void testCorruption() throws IOException { + final var testContext = createTestContext(); + + final Response response; + final Path corruptedFile; + final RepositoryFileType corruptedFileType; + try (var ignored = new BlobStoreIndexShardSnapshotsIntegritySuppressor()) { + corruptedFile = BlobStoreCorruptionUtils.corruptRandomFile(testContext.repositoryRootPath()); + corruptedFileType = RepositoryFileType.getRepositoryFileType(testContext.repositoryRootPath(), corruptedFile); + logger.info("--> corrupted file: {}", corruptedFile); + logger.info("--> corrupted file type: {}", corruptedFileType); + + final var request = testContext.getVerifyIntegrityRequest(); + if (corruptedFileType == RepositoryFileType.SHARD_DATA || randomBoolean()) { + request.addParameter("verify_blob_contents", null); + } + response = getRestClient().performRequest(request); + } finally { + assertAcked( + client().admin().cluster().prepareDeleteRepository(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, testContext.repositoryName()) + ); + } + assertEquals(200, response.getStatusLine().getStatusCode()); + final var responseObjectPath = ObjectPath.createFromResponse(response); + final var logEntryCount = responseObjectPath.evaluateArraySize("log"); + final var anomalies = new HashSet(); + final var seenIndexNames = new HashSet(); + int fullyRestorableIndices = 0; + for (int i = 0; i < logEntryCount; i++) { + final String maybeAnomaly = responseObjectPath.evaluate("log." + i + ".anomaly"); + if (maybeAnomaly != null) { + anomalies.add(maybeAnomaly); + } else { + final String indexName = responseObjectPath.evaluate("log." + i + ".index.name"); + if (indexName != null) { + assertTrue(seenIndexNames.add(indexName)); + assertThat(testContext.indexNames(), hasItem(indexName)); + final int totalSnapshots = responseObjectPath.evaluate("log." + i + ".snapshot_restorability.total_snapshot_count"); + final int restorableSnapshots = responseObjectPath.evaluate( + "log." 
+ i + ".snapshot_restorability.restorable_snapshot_count" + ); + if (totalSnapshots == restorableSnapshots) { + fullyRestorableIndices += 1; + } + } + } + } + + assertThat( + fullyRestorableIndices, + corruptedFileType == RepositoryFileType.SHARD_GENERATION || corruptedFileType.equals(RepositoryFileType.GLOBAL_METADATA) + ? equalTo(testContext.indexNames().size()) + : lessThan(testContext.indexNames().size()) + ); + assertThat(anomalies, not(empty())); + assertThat(responseObjectPath.evaluate("results.total_anomalies"), greaterThanOrEqualTo(anomalies.size())); + assertEquals("fail", responseObjectPath.evaluate("results.result")); + + // remove permitted/expected anomalies to verify that no unexpected ones were seen + switch (corruptedFileType) { + case SNAPSHOT_INFO -> anomalies.remove("failed to load snapshot info"); + case GLOBAL_METADATA -> anomalies.remove("failed to load global metadata"); + case INDEX_METADATA -> anomalies.remove("failed to load index metadata"); + case SHARD_GENERATION -> anomalies.remove("failed to load shard generation"); + case SHARD_SNAPSHOT_INFO -> anomalies.remove("failed to load shard snapshot"); + case SHARD_DATA -> { + anomalies.remove("missing blob"); + anomalies.remove("mismatched blob length"); + anomalies.remove("corrupt data blob"); + } + } + assertThat(anomalies, empty()); + } + + public void testTransportException() throws IOException { + final var testContext = createTestContext(); + + // use non-master node to coordinate the request so that we can capture chunks being sent back + final var coordNodeName = getCoordinatingNodeName(); + final var coordNodeTransportService = MockTransportService.getInstance(coordNodeName); + final var masterTransportService = MockTransportService.getInstance(internalCluster().getMasterName()); + + final var messageCount = 2 // request & response + * (1 // forward to master + + 1 // start response + + testContext.indexNames().size() + testContext.snapshotNames().size()); + final var failureStep = between(1, messageCount); + + final var failTransportMessageBehaviour = new StubbableTransport.RequestHandlingBehavior<>() { + final AtomicInteger currentStep = new AtomicInteger(); + + @Override + public void messageReceived( + TransportRequestHandler handler, + TransportRequest request, + TransportChannel channel, + Task task + ) throws Exception { + if (currentStep.incrementAndGet() == failureStep) { + throw new ElasticsearchException("simulated"); + } else { + handler.messageReceived(request, new TransportChannel() { + @Override + public String getProfileName() { + return "test"; + } + + @Override + public void sendResponse(TransportResponse response) { + if (currentStep.incrementAndGet() == failureStep) { + channel.sendResponse(new ElasticsearchException("simulated")); + } else { + channel.sendResponse(response); + } + } + + @Override + public void sendResponse(Exception exception) { + if (currentStep.incrementAndGet() == failureStep) { + throw new AssertionError("shouldn't have failed yet"); + } else { + channel.sendResponse(exception); + } + } + }, task); + } + } + }; + + masterTransportService.addRequestHandlingBehavior( + TransportRepositoryVerifyIntegrityAction.ACTION_NAME, + failTransportMessageBehaviour + ); + + coordNodeTransportService.addRequestHandlingBehavior( + TransportRepositoryVerifyIntegrityResponseChunkAction.ACTION_NAME, + failTransportMessageBehaviour + ); + + final var request = testContext.getVerifyIntegrityRequest(); + if (failureStep <= 2) { + request.addParameter("ignore", "500"); + } + final 
Response response; + try (var restClient = createRestClient(coordNodeName)) { + response = restClient.performRequest(request); + } + final var responseObjectPath = ObjectPath.createFromResponse(response); + if (failureStep <= 2) { + assertEquals(500, response.getStatusLine().getStatusCode()); + assertNotNull(responseObjectPath.evaluate("error")); + assertEquals(500, (int) responseObjectPath.evaluate("status")); + } else { + assertEquals(200, response.getStatusLine().getStatusCode()); + assertNotNull(responseObjectPath.evaluate("log")); + assertNotNull(responseObjectPath.evaluate("exception")); + } + + assertNull(responseObjectPath.evaluate("results")); + } + + public void testBadSnapshotInfo() throws IOException { + final var testContext = createTestContext(); + + final var snapshotInfoBlob = BlobStoreCorruptionUtils.getRandomFileToCorrupt( + testContext.repositoryRootPath(), + RepositoryFileType.SNAPSHOT_INFO + ); + + final SnapshotInfo snapshotInfo; + try (var inputStream = Files.newInputStream(snapshotInfoBlob)) { + snapshotInfo = SNAPSHOT_FORMAT.deserialize(testContext.repositoryName(), xContentRegistry(), inputStream); + } + + final var newIndices = new ArrayList<>(snapshotInfo.indices()); + newIndices.remove(between(0, newIndices.size() - 1)); + + final Response response; + try (var ignored = new BlobStoreIndexShardSnapshotsIntegritySuppressor()) { + try (var outputStream = Files.newOutputStream(snapshotInfoBlob)) { + SNAPSHOT_FORMAT.serialize( + new SnapshotInfo( + snapshotInfo.snapshot(), + newIndices, + snapshotInfo.dataStreams(), + snapshotInfo.featureStates(), + snapshotInfo.reason(), + snapshotInfo.version(), + snapshotInfo.startTime(), + snapshotInfo.endTime(), + snapshotInfo.totalShards(), + snapshotInfo.successfulShards(), + snapshotInfo.shardFailures(), + snapshotInfo.includeGlobalState(), + snapshotInfo.userMetadata(), + snapshotInfo.state(), + snapshotInfo.indexSnapshotDetails() + ), + snapshotInfoBlob.toString(), + randomBoolean(), + outputStream + ); + } + + response = getRestClient().performRequest(testContext.getVerifyIntegrityRequest()); + } finally { + assertAcked( + client().admin().cluster().prepareDeleteRepository(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, testContext.repositoryName()) + ); + } + assertEquals(200, response.getStatusLine().getStatusCode()); + assertThat(getAnomalies(ObjectPath.createFromResponse(response)), equalTo(Set.of("snapshot contents mismatch"))); + } + + public void testShardPathEmpty() throws IOException { + final var testContext = createTestContext(); + + final var shardPath = BlobStoreCorruptionUtils.getRandomFileToCorrupt( + testContext.repositoryRootPath(), + RepositoryFileType.SHARD_GENERATION + ).getParent(); + + final Response response; + try (var ignored = new BlobStoreIndexShardSnapshotsIntegritySuppressor()) { + IOUtils.rm(shardPath); + response = getRestClient().performRequest(testContext.getVerifyIntegrityRequest()); + } finally { + assertAcked( + client().admin().cluster().prepareDeleteRepository(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, testContext.repositoryName()) + ); + } + assertEquals(200, response.getStatusLine().getStatusCode()); + assertThat(getAnomalies(ObjectPath.createFromResponse(response)), equalTo(Set.of("failed to load shard snapshot"))); + } + + public void testShardPathUnreadable() throws IOException { + final var testContext = createTestContext(); + + final var shardPath = BlobStoreCorruptionUtils.getRandomFileToCorrupt( + testContext.repositoryRootPath(), + RepositoryFileType.SHARD_GENERATION + 
).getParent(); + + final Response response; + try (var ignored = new BlobStoreIndexShardSnapshotsIntegritySuppressor()) { + IOUtils.rm(shardPath); + Files.write(shardPath, new byte[0], StandardOpenOption.CREATE_NEW); + response = getRestClient().performRequest(testContext.getVerifyIntegrityRequest()); + } finally { + assertAcked( + client().admin().cluster().prepareDeleteRepository(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, testContext.repositoryName()) + ); + } + assertEquals(200, response.getStatusLine().getStatusCode()); + assertThat(getAnomalies(ObjectPath.createFromResponse(response)), equalTo(Set.of("failed to list shard container contents"))); + } + + public void testShardGenerationMissing() throws IOException { + final var testContext = createTestContext(); + + final var repository = asInstanceOf( + BlobStoreRepository.class, + internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(testContext.repositoryName()) + ); + final var repoSettings = repository.getMetadata().settings(); + + final RepositoryData repositoryData = safeAwait(l -> repository.getRepositoryData(EsExecutors.DIRECT_EXECUTOR_SERVICE, l)); + + assertAcked( + client().admin().cluster().prepareDeleteRepository(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, testContext.repositoryName()) + ); + + final var rootBlob = BlobStoreCorruptionUtils.getRandomFileToCorrupt( + testContext.repositoryRootPath(), + RepositoryFileType.ROOT_INDEX_N + ); + + final var indexToBreak = randomFrom(testContext.indexNames()); + final var newShardGenerations = ShardGenerations.builder(); + for (final var index : repositoryData.shardGenerations().indices()) { + final var indexShardGenerations = repositoryData.shardGenerations().getGens(index); + for (int i = 0; i < indexShardGenerations.size(); i++) { + if (i > 0 || index.getName().equals(indexToBreak) == false) { + newShardGenerations.put(index, i, indexShardGenerations.get(i)); + } + } + } + + final var brokenRepositoryData = new RepositoryData( + repositoryData.getUuid(), + repositoryData.getGenId(), + repositoryData.getSnapshotIds().stream().collect(Collectors.toMap(SnapshotId::getUUID, Function.identity())), + repositoryData.getSnapshotIds().stream().collect(Collectors.toMap(SnapshotId::getUUID, repositoryData::getSnapshotDetails)), + repositoryData.getIndices().values().stream().collect(Collectors.toMap(Function.identity(), repositoryData::getSnapshots)), + newShardGenerations.build(), + repositoryData.indexMetaDataGenerations(), + repositoryData.getClusterUUID() + ); + + final Response response; + try (var ignored = new BlobStoreIndexShardSnapshotsIntegritySuppressor()) { + + Files.write( + rootBlob, + BytesReference.toBytes( + BytesReference.bytes(brokenRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), IndexVersion.current())) + ), + StandardOpenOption.TRUNCATE_EXISTING + ); + + assertAcked( + client().admin() + .cluster() + .preparePutRepository(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, testContext.repositoryName()) + .setType(FsRepository.TYPE) + .setSettings(repoSettings) + ); + + response = getRestClient().performRequest(testContext.getVerifyIntegrityRequest()); + } finally { + assertAcked( + client().admin().cluster().prepareDeleteRepository(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, testContext.repositoryName()) + ); + } + assertEquals(200, response.getStatusLine().getStatusCode()); + assertThat(getAnomalies(ObjectPath.createFromResponse(response)), equalTo(Set.of("shard generation not defined"))); + } + + public void 
testSnapshotNotInShardGeneration() throws IOException { + final var testContext = createTestContext(); + runInconsistentShardGenerationBlobTest( + testContext, + blobStoreIndexShardSnapshots -> blobStoreIndexShardSnapshots.withRetainedSnapshots( + testContext.snapshotNames().stream().skip(1).map(n -> new SnapshotId(n, "_na_")).collect(Collectors.toSet()) + ), + "snapshot not in shard generation" + ); + } + + public void testBlobInShardGenerationButNotSnapshot() throws IOException { + final var testContext = createTestContext(); + final var snapshotToUpdate = randomFrom(testContext.snapshotNames()); + runInconsistentShardGenerationBlobTest(testContext, blobStoreIndexShardSnapshots -> { + BlobStoreIndexShardSnapshots result = BlobStoreIndexShardSnapshots.EMPTY; + for (final var snapshotFiles : blobStoreIndexShardSnapshots.snapshots()) { + if (snapshotFiles.snapshot().equals(snapshotToUpdate)) { + result = result.withAddedSnapshot( + new SnapshotFiles( + snapshotToUpdate, + CollectionUtils.appendToCopy( + snapshotFiles.indexFiles(), + new BlobStoreIndexShardSnapshot.FileInfo( + "extra", + new StoreFileMetadata("extra", 1L, "checksum", Version.CURRENT.toString()), + ByteSizeValue.ONE + ) + ), + snapshotFiles.shardStateIdentifier() + ) + ); + } else { + result = result.withAddedSnapshot(snapshotFiles); + } + } + return result; + }, "blob in shard generation but not snapshot"); + } + + public void testSnapshotShardGenerationMismatch() throws IOException { + final var testContext = createTestContext(); + runInconsistentShardGenerationBlobTest(testContext, blobStoreIndexShardSnapshots -> { + final var fileToUpdate = randomFrom(blobStoreIndexShardSnapshots.iterator().next().indexFiles()); + final var updatedFile = new BlobStoreIndexShardSnapshot.FileInfo( + fileToUpdate.name(), + fileToUpdate.metadata(), + ByteSizeValue.ONE + ); + assertFalse(fileToUpdate.isSame(updatedFile)); + + BlobStoreIndexShardSnapshots result = BlobStoreIndexShardSnapshots.EMPTY; + for (final var snapshotFiles : blobStoreIndexShardSnapshots.snapshots()) { + result = result.withAddedSnapshot( + new SnapshotFiles( + snapshotFiles.snapshot(), + snapshotFiles.indexFiles() + .stream() + .map(fileInfo -> fileInfo.name().equals(fileToUpdate.name()) ? 
updatedFile : fileInfo) + .toList(), + snapshotFiles.shardStateIdentifier() + ) + ); + } + return result; + }, "snapshot shard generation mismatch"); + } + + public void testBlobInSnapshotNotShardGeneration() throws IOException { + final var testContext = createTestContext(); + final var snapshotToUpdate = randomFrom(testContext.snapshotNames()); + runInconsistentShardGenerationBlobTest(testContext, blobStoreIndexShardSnapshots -> { + BlobStoreIndexShardSnapshots result = BlobStoreIndexShardSnapshots.EMPTY; + for (final var snapshotFiles : blobStoreIndexShardSnapshots.snapshots()) { + if (snapshotFiles.snapshot().equals(snapshotToUpdate)) { + final var indexFilesCopy = new ArrayList<>(snapshotFiles.indexFiles()); + indexFilesCopy.remove(between(0, indexFilesCopy.size() - 1)); + result = result.withAddedSnapshot( + new SnapshotFiles(snapshotToUpdate, indexFilesCopy, snapshotFiles.shardStateIdentifier()) + ); + } else { + result = result.withAddedSnapshot(snapshotFiles); + } + } + return result; + }, "blob in snapshot but not shard generation"); + } + + private void runInconsistentShardGenerationBlobTest( + TestContext testContext, + UnaryOperator shardGenerationUpdater, + String expectedAnomaly + ) throws IOException { + + final var shardGenerationBlob = BlobStoreCorruptionUtils.getRandomFileToCorrupt( + testContext.repositoryRootPath(), + RepositoryFileType.SHARD_GENERATION + ); + + final BlobStoreIndexShardSnapshots blobStoreIndexShardSnapshots; + try (var inputStream = Files.newInputStream(shardGenerationBlob)) { + blobStoreIndexShardSnapshots = INDEX_SHARD_SNAPSHOTS_FORMAT.deserialize( + testContext.repositoryName(), + xContentRegistry(), + inputStream + ); + } + + final Response response; + try (var ignored = new BlobStoreIndexShardSnapshotsIntegritySuppressor()) { + try (var outputStream = Files.newOutputStream(shardGenerationBlob)) { + INDEX_SHARD_SNAPSHOTS_FORMAT.serialize( + shardGenerationUpdater.apply(blobStoreIndexShardSnapshots), + shardGenerationBlob.toString(), + randomBoolean(), + outputStream + ); + } + response = getRestClient().performRequest(testContext.getVerifyIntegrityRequest()); + } finally { + assertAcked( + client().admin().cluster().prepareDeleteRepository(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, testContext.repositoryName()) + ); + } + assertEquals(200, response.getStatusLine().getStatusCode()); + assertThat(getAnomalies(ObjectPath.createFromResponse(response)), equalTo(Set.of(expectedAnomaly))); + } + + private Set getAnomalies(ObjectPath responseObjectPath) throws IOException { + final var logEntryCount = responseObjectPath.evaluateArraySize("log"); + final var anomalies = new HashSet(); + for (int i = 0; i < logEntryCount; i++) { + final String maybeAnomaly = responseObjectPath.evaluate("log." 
+ i + ".anomaly"); + if (maybeAnomaly != null) { + anomalies.add(maybeAnomaly); + } + } + + assertThat(responseObjectPath.evaluate("results.total_anomalies"), greaterThanOrEqualTo(anomalies.size())); + if (anomalies.size() > 0) { + assertEquals("fail", responseObjectPath.evaluate("results.result")); + } + + return anomalies; + } + + private record TestContext(String repositoryName, Path repositoryRootPath, List indexNames, List snapshotNames) { + Request getVerifyIntegrityRequest() { + final var request = new Request("POST", "/_snapshot/" + repositoryName + "/_verify_integrity"); + if (randomBoolean()) { + request.addParameter("human", null); + } + if (randomBoolean()) { + request.addParameter("pretty", null); + } + return request; + } + } + + private TestContext createTestContext() { + final var repositoryName = randomIdentifier(); + final var repositoryRootPath = randomRepoPath(); + + createRepository(repositoryName, FsRepository.TYPE, repositoryRootPath); + + final var indexNames = randomList(1, 3, ESTestCase::randomIdentifier); + for (var indexName : indexNames) { + createIndexWithRandomDocs(indexName, between(1, 100)); + flushAndRefresh(indexName); + } + + final var snapshotNames = randomList(1, 3, ESTestCase::randomIdentifier); + for (var snapshotName : snapshotNames) { + createSnapshot(repositoryName, snapshotName, indexNames); + } + + return new TestContext(repositoryName, repositoryRootPath, indexNames, snapshotNames); + } + + private static String getCoordinatingNodeName() { + if (internalCluster().size() == 1) { + internalCluster().startNode(); + } + return randomValueOtherThan(internalCluster().getMasterName(), () -> internalCluster().getRandomNodeName()); + } +} diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/module-info.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/module-info.java new file mode 100644 index 0000000000000..70385cdc4cf04 --- /dev/null +++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/module-info.java @@ -0,0 +1,24 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +module org.elasticsearch.repositories.blobstore.testkit { + requires org.elasticsearch.base; + requires org.elasticsearch.server; + requires org.elasticsearch.xcontent; + requires org.elasticsearch.xcore; + + requires org.apache.logging.log4j; + requires org.apache.lucene.core; + requires org.elasticsearch.logging; + + exports org.elasticsearch.repositories.blobstore.testkit.analyze; + exports org.elasticsearch.repositories.blobstore.testkit.integrity; + + provides org.elasticsearch.features.FeatureSpecification + with + org.elasticsearch.repositories.blobstore.testkit.SnapshotRepositoryTestKitFeatures; +} diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/SnapshotRepositoryTestKit.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/SnapshotRepositoryTestKit.java index 04d59906e6db3..b0ae1b0752b71 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/SnapshotRepositoryTestKit.java +++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/SnapshotRepositoryTestKit.java @@ -22,8 +22,12 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.repositories.blobstore.testkit.analyze.RepositoryAnalyzeAction; import org.elasticsearch.repositories.blobstore.testkit.analyze.RestRepositoryAnalyzeAction; +import org.elasticsearch.repositories.blobstore.testkit.integrity.RepositoryVerifyIntegrityTask; +import org.elasticsearch.repositories.blobstore.testkit.integrity.RestRepositoryVerifyIntegrityAction; +import org.elasticsearch.repositories.blobstore.testkit.integrity.TransportRepositoryVerifyIntegrityCoordinationAction; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; +import org.elasticsearch.tasks.Task; import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; @@ -35,7 +39,13 @@ public class SnapshotRepositoryTestKit extends Plugin implements ActionPlugin { @Override public List> getActions() { - return List.of(new ActionHandler<>(RepositoryAnalyzeAction.INSTANCE, RepositoryAnalyzeAction.class)); + return List.of( + new ActionHandler<>(RepositoryAnalyzeAction.INSTANCE, RepositoryAnalyzeAction.class), + new ActionHandler<>( + TransportRepositoryVerifyIntegrityCoordinationAction.INSTANCE, + TransportRepositoryVerifyIntegrityCoordinationAction.class + ) + ); } @Override @@ -50,7 +60,7 @@ public List getRestHandlers( Supplier nodesInCluster, Predicate clusterSupportsFeature ) { - return List.of(new RestRepositoryAnalyzeAction()); + return List.of(new RestRepositoryAnalyzeAction(), new RestRepositoryVerifyIntegrityAction()); } public static void humanReadableNanos(XContentBuilder builder, String rawFieldName, String readableFieldName, long nanos) @@ -63,4 +73,15 @@ public static void humanReadableNanos(XContentBuilder builder, String rawFieldNa builder.field(rawFieldName, nanos); } + + @Override + public List getNamedWriteables() { + return List.of( + new NamedWriteableRegistry.Entry( + Task.Status.class, + RepositoryVerifyIntegrityTask.Status.NAME, + RepositoryVerifyIntegrityTask.Status::new + ) + ); + } } diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/SnapshotRepositoryTestKitFeatures.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/SnapshotRepositoryTestKitFeatures.java new file 
mode 100644 index 0000000000000..cc513a948519b --- /dev/null +++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/SnapshotRepositoryTestKitFeatures.java @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.repositories.blobstore.testkit; + +import org.elasticsearch.features.FeatureSpecification; +import org.elasticsearch.features.NodeFeature; + +import java.util.Set; + +import static org.elasticsearch.repositories.blobstore.testkit.integrity.RestRepositoryVerifyIntegrityAction.REPOSITORY_VERIFY_INTEGRITY_FEATURE; + +public class SnapshotRepositoryTestKitFeatures implements FeatureSpecification { + @Override + public Set getFeatures() { + return Set.of(REPOSITORY_VERIFY_INTEGRITY_FEATURE); + } +} diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/ActiveRepositoryVerifyIntegrityTasks.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/ActiveRepositoryVerifyIntegrityTasks.java new file mode 100644 index 0000000000000..ac410465c3deb --- /dev/null +++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/ActiveRepositoryVerifyIntegrityTasks.java @@ -0,0 +1,56 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.repositories.blobstore.testkit.integrity; + +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; + +import java.util.Map; + +/** + * The repository-verify-integrity tasks that this node is currently coordinating. + */ +class ActiveRepositoryVerifyIntegrityTasks { + + private final Map responseStreamsByCoordinatingTaskId = ConcurrentCollections + .newConcurrentMap(); + + Releasable registerResponseBuilder(long coordinatingTaskId, RepositoryVerifyIntegrityResponseStream responseStream) { + assert responseStream.hasReferences(); // ref held until the REST-layer listener is completed + + final var previous = responseStreamsByCoordinatingTaskId.putIfAbsent(coordinatingTaskId, responseStream); + if (previous != null) { + final var exception = new IllegalStateException("already executing verify task [" + coordinatingTaskId + "]"); + assert false : exception; + throw exception; + } + + return Releasables.assertOnce(() -> { + final var removed = responseStreamsByCoordinatingTaskId.remove(coordinatingTaskId, responseStream); + if (removed == false) { + final var exception = new IllegalStateException("already completed verify task [" + coordinatingTaskId + "]"); + assert false : exception; + throw exception; + } + }); + } + + /** + * Obtain the response stream for the given coordinating-node task ID, and increment its refcount. 
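+     * <p>
+     * A sketch of the intended lifecycle, with illustrative names (not part of this API):
+     * <pre>{@code
+     * try (var registration = tasks.registerResponseBuilder(taskId, responseStream)) {
+     *     var stream = tasks.acquireResponseStream(taskId); // increments the stream's refcount
+     *     try {
+     *         // write a response chunk to the acquired stream
+     *     } finally {
+     *         stream.decRef();
+     *     }
+     * } // releasing the registration de-registers the task exactly once
+     * }</pre>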
+ * @throws ResourceNotFoundException if the task is not running or its refcount already reached zero (likely because it completed) + */ + RepositoryVerifyIntegrityResponseStream acquireResponseStream(long taskId) { + final var outerRequest = responseStreamsByCoordinatingTaskId.get(taskId); + if (outerRequest == null || outerRequest.tryIncRef() == false) { + throw new ResourceNotFoundException("verify task [" + taskId + "] not found"); + } + return outerRequest; + } +} diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/IndexDescription.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/IndexDescription.java new file mode 100644 index 0000000000000..e13d970346868 --- /dev/null +++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/IndexDescription.java @@ -0,0 +1,56 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.repositories.blobstore.testkit.integrity; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; + +/** + * Details of an index in a specific snapshot, identifying its corresponding {@link org.elasticsearch.cluster.metadata.IndexMetadata} blob + * and the number of shards. + */ +public record IndexDescription(IndexId indexId, @Nullable String indexMetadataBlob, int shardCount) implements Writeable, ToXContentObject { + + public IndexDescription { + if (indexId == null || shardCount < 0) { + throw new IllegalArgumentException("invalid IndexDescription"); + } + } + + public IndexDescription(StreamInput in) throws IOException { + this(new IndexId(in), in.readOptionalString(), in.readVInt()); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + indexId.writeTo(out); + out.writeOptionalString(indexMetadataBlob); + out.writeVInt(shardCount); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("name", indexId.getName()); + builder.field("uuid", indexId.getId()); + if (indexMetadataBlob != null) { + builder.field("metadata_blob", indexMetadataBlob); + } + if (shardCount > 0) { + builder.field("shards", shardCount); + } + return builder.endObject(); + } +} diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryIntegrityVerifier.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryIntegrityVerifier.java new file mode 100644 index 0000000000000..a5c81d18071fc --- /dev/null +++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryIntegrityVerifier.java @@ -0,0 +1,949 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. 
under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.repositories.blobstore.testkit.integrity; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.RateLimiter; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.support.RefCountingListener; +import org.elasticsearch.action.support.SubscribableListener; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.common.blobstore.OperationPurpose; +import org.elasticsearch.common.blobstore.support.BlobMetadata; +import org.elasticsearch.common.collect.Iterators; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.CancellableThreads; +import org.elasticsearch.common.util.Maps; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.ThrottledIterator; +import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Strings; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots; +import org.elasticsearch.index.snapshots.blobstore.RateLimitingInputStream; +import org.elasticsearch.index.snapshots.blobstore.SlicedInputStream; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.repositories.RepositoryVerificationException; +import org.elasticsearch.repositories.ShardGeneration; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.tasks.TaskCancelledException; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; +import java.util.function.BooleanSupplier; +import java.util.function.LongSupplier; + +import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; + +class RepositoryIntegrityVerifier { + private static final Logger logger = LogManager.getLogger(RepositoryIntegrityVerifier.class); + + private final LongSupplier currentTimeMillisSupplier; + private final BlobStoreRepository blobStoreRepository; + private final RepositoryVerifyIntegrityResponseChunk.Writer responseChunkWriter; + private final String repositoryName; + private final RepositoryVerifyIntegrityParams requestParams; + private final RepositoryData repositoryData; + private final BooleanSupplier isCancelledSupplier; + private final CancellableRunner metadataTaskRunner; + private final CancellableRunner snapshotTaskRunner; + private final RateLimiter rateLimiter; + + private final Set 
unreadableSnapshotInfoUuids = ConcurrentCollections.newConcurrentSet(); + private final long snapshotCount; + private final AtomicLong snapshotProgress = new AtomicLong(); + private final long indexCount; + private final AtomicLong indexProgress = new AtomicLong(); + private final long indexSnapshotCount; + private final AtomicLong indexSnapshotProgress = new AtomicLong(); + private final AtomicLong blobsVerified = new AtomicLong(); + private final AtomicLong blobBytesVerified = new AtomicLong(); + private final AtomicLong throttledNanos; + private final AtomicLong failedShardSnapshotsCount = new AtomicLong(); + private final Set failedShardSnapshotDescriptions = ConcurrentCollections.newConcurrentSet(); + + RepositoryIntegrityVerifier( + LongSupplier currentTimeMillisSupplier, + BlobStoreRepository blobStoreRepository, + RepositoryVerifyIntegrityResponseChunk.Writer responseChunkWriter, + RepositoryVerifyIntegrityParams requestParams, + RepositoryData repositoryData, + CancellableThreads cancellableThreads + ) { + this.currentTimeMillisSupplier = currentTimeMillisSupplier; + this.blobStoreRepository = blobStoreRepository; + this.repositoryName = blobStoreRepository.getMetadata().name(); + this.responseChunkWriter = responseChunkWriter; + this.requestParams = requestParams; + this.repositoryData = repositoryData; + this.isCancelledSupplier = cancellableThreads::isCancelled; + this.snapshotTaskRunner = new CancellableRunner( + new ThrottledTaskRunner( + "verify-blob", + requestParams.blobThreadPoolConcurrency(), + blobStoreRepository.threadPool().executor(ThreadPool.Names.SNAPSHOT) + ), + cancellableThreads + ); + this.metadataTaskRunner = new CancellableRunner( + new ThrottledTaskRunner( + "verify-metadata", + requestParams.metaThreadPoolConcurrency(), + blobStoreRepository.threadPool().executor(ThreadPool.Names.SNAPSHOT_META) + ), + cancellableThreads + ); + + this.snapshotCount = repositoryData.getSnapshotIds().size(); + this.indexCount = repositoryData.getIndices().size(); + this.indexSnapshotCount = repositoryData.getIndexSnapshotCount(); + this.rateLimiter = new RateLimiter.SimpleRateLimiter(requestParams.maxBytesPerSec().getMbFrac()); + + this.throttledNanos = new AtomicLong(requestParams.verifyBlobContents() ? 
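+        /*
+         * Both runners above wrap ThrottledTaskRunner, which bounds how much of this work runs concurrently
+         * on the shared SNAPSHOT and SNAPSHOT_META pools. A minimal usage sketch, with illustrative names only:
+         *
+         *     var runner = new ThrottledTaskRunner("sketch", 2, executor);
+         *     runner.enqueueTask(ActionListener.wrap(releasable -> {
+         *         try (releasable) {
+         *             // bounded work here; closing the Releasable frees a slot for the next queued task
+         *         }
+         *     }, e -> {}));
+         */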
1 : 0); // nonzero if verifying so status reported + } + + RepositoryVerifyIntegrityTask.Status getStatus() { + return new RepositoryVerifyIntegrityTask.Status( + repositoryName, + repositoryData.getGenId(), + repositoryData.getUuid(), + snapshotCount, + snapshotProgress.get(), + indexCount, + indexProgress.get(), + indexSnapshotCount, + indexSnapshotProgress.get(), + blobsVerified.get(), + blobBytesVerified.get(), + throttledNanos.get() + ); + } + + void start(ActionListener listener) { + logger.info( + """ + [{}] verifying metadata integrity for index generation [{}]: \ + repo UUID [{}], cluster UUID [{}], snapshots [{}], indices [{}], index snapshots [{}]""", + repositoryName, + repositoryData.getGenId(), + repositoryData.getUuid(), + repositoryData.getClusterUUID(), + getSnapshotCount(), + getIndexCount(), + getIndexSnapshotCount() + ); + + SubscribableListener + // first verify the top-level properties of the snapshots + .newForked(this::verifySnapshots) + .andThen(this::checkFailedShardSnapshotCount) + // then verify the restorability of each index + .andThen(this::verifyIndices) + .andThenAccept(v -> this.ensureNotCancelled()) + // see if the repository data has changed + .andThen( + l -> blobStoreRepository.getRepositoryData(blobStoreRepository.threadPool().executor(ThreadPool.Names.MANAGEMENT), l) + ) + // log the completion and return the result + .addListener(new ActionListener<>() { + @Override + public void onResponse(RepositoryData finalRepositoryData) { + logger.info( + "[{}] completed verifying metadata integrity for index generation [{}]: repo UUID [{}], cluster UUID [{}]", + repositoryName, + repositoryData.getGenId(), + repositoryData.getUuid(), + repositoryData.getClusterUUID() + ); + listener.onResponse(new RepositoryVerifyIntegrityResponse(getStatus(), finalRepositoryData.getGenId())); + } + + @Override + public void onFailure(Exception e) { + logger.warn( + () -> Strings.format( + "[%s] failed verifying metadata integrity for index generation [%d]: repo UUID [%s], cluster UUID [%s]", + repositoryName, + repositoryData.getGenId(), + repositoryData.getUuid(), + repositoryData.getClusterUUID() + ), + e + ); + listener.onFailure(e); + } + }); + } + + private void ensureNotCancelled() { + if (isCancelledSupplier.getAsBoolean()) { + throw new TaskCancelledException("task cancelled"); + } + } + + private void verifySnapshots(ActionListener listener) { + new SnapshotsVerifier().run(listener); + } + + /** + * Verifies the top-level snapshot metadata in the repo, including {@link SnapshotInfo} and optional {@link Metadata} blobs. 
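+     * <p>
+     * Snapshots are processed with bounded concurrency and a fail-fast check on cancellation or failure,
+     * roughly along these lines (an illustrative sketch of the {@code runThrottled} helper below):
+     * <pre>{@code
+     * ThrottledIterator.run(
+     *     Iterators.failFast(snapshotIds.iterator(), () -> cancelled || failing),
+     *     (releasable, snapshotId) -> verify(snapshotId, ActionListener.releaseAfter(l, releasable)),
+     *     maxConcurrency,
+     *     progress::incrementAndGet,
+     *     onCompletion);
+     * }</pre>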
+ */ + private class SnapshotsVerifier { + final Map> indexNamesBySnapshotName; + + SnapshotsVerifier() { + indexNamesBySnapshotName = Maps.newHashMapWithExpectedSize(repositoryData.getIndices().size()); + for (final var indexId : repositoryData.getIndices().values()) { + for (final var snapshotId : repositoryData.getSnapshots(indexId)) { + indexNamesBySnapshotName.computeIfAbsent(snapshotId.getName(), ignored -> new HashSet<>()).add(indexId.getName()); + } + } + } + + void run(ActionListener listener) { + var listeners = new RefCountingListener(listener); + runThrottled( + Iterators.failFast( + repositoryData.getSnapshotIds().iterator(), + () -> isCancelledSupplier.getAsBoolean() || listeners.isFailing() + ), + (releasable, snapshotId) -> new SnapshotVerifier(snapshotId).run( + ActionListener.assertOnce(ActionListener.releaseAfter(listeners.acquire(), releasable)) + ), + requestParams.snapshotVerificationConcurrency(), + snapshotProgress, + listeners + ); + } + + /** + * Verifies a single snapshot's metadata, including its {@link SnapshotInfo} and optional {@link Metadata} blobs. + */ + private class SnapshotVerifier { + private final SnapshotId snapshotId; + + SnapshotVerifier(SnapshotId snapshotId) { + this.snapshotId = snapshotId; + } + + void run(ActionListener listener) { + if (isCancelledSupplier.getAsBoolean()) { + // getSnapshotInfo does its own forking, so we must check for cancellation here + listener.onResponse(null); + return; + } + + blobStoreRepository.getSnapshotInfo(snapshotId, new ActionListener<>() { + @Override + public void onResponse(SnapshotInfo snapshotInfo) { + verifySnapshotInfo(snapshotInfo, listener); + } + + @Override + public void onFailure(Exception e) { + unreadableSnapshotInfoUuids.add(snapshotId.getUUID()); + anomaly("failed to load snapshot info").snapshotId(snapshotId).exception(e).write(listener); + } + }); + } + + void verifySnapshotInfo(SnapshotInfo snapshotInfo, ActionListener listener) { + final var chunkBuilder = new RepositoryVerifyIntegrityResponseChunk.Builder( + responseChunkWriter, + RepositoryVerifyIntegrityResponseChunk.Type.SNAPSHOT_INFO, + currentTimeMillisSupplier.getAsLong() + ).snapshotInfo(snapshotInfo); + + // record the SnapshotInfo in the response + final var chunkWrittenStep = SubscribableListener.newForked(chunkBuilder::write); + + if (failedShardSnapshotsCount.get() < requestParams.maxFailedShardSnapshots()) { + for (final var shardFailure : snapshotInfo.shardFailures()) { + if (failedShardSnapshotsCount.getAndIncrement() < requestParams.maxFailedShardSnapshots()) { + failedShardSnapshotDescriptions.add( + getShardSnapshotDescription(snapshotId, shardFailure.index(), shardFailure.shardId()) + ); + } + } + } else { + failedShardSnapshotsCount.addAndGet(snapshotInfo.shardFailures().size()); + } + + // check the indices in the SnapshotInfo match those in RepositoryData + final var snapshotContentsOkStep = chunkWrittenStep.andThen(l -> { + if (Set.copyOf(snapshotInfo.indices()).equals(indexNamesBySnapshotName.get(snapshotId.getName()))) { + l.onResponse(null); + } else { + anomaly("snapshot contents mismatch").snapshotId(snapshotId).write(l); + } + }); + + // check the global metadata is readable if present + final var globalMetadataOkStep = Boolean.TRUE.equals(snapshotInfo.includeGlobalState()) + ? 
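+                /*
+                 * The steps above are chained with SubscribableListener: newForked starts the chain, each
+                 * andThen runs only once the previous step has succeeded, and any failure short-circuits
+                 * to the final listener. Illustrative shape:
+                 *
+                 *     SubscribableListener
+                 *         .<Void>newForked(l -> stepOne(l))
+                 *         .<Void>andThen((l, ignored) -> stepTwo(l))
+                 *         .addListener(finalListener);
+                 */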
snapshotContentsOkStep.andThen(this::verifySnapshotGlobalMetadata) + : snapshotContentsOkStep; + + globalMetadataOkStep.addListener(listener); + } + + private void verifySnapshotGlobalMetadata(ActionListener listener) { + metadataTaskRunner.run(ActionRunnable.wrap(listener, l -> { + try { + blobStoreRepository.getSnapshotGlobalMetadata(snapshotId); + // no checks here, loading it is enough + l.onResponse(null); + } catch (Exception e) { + anomaly("failed to load global metadata").snapshotId(snapshotId).exception(e).write(l); + } + })); + } + } + } + + private void checkFailedShardSnapshotCount(ActionListener listener) { + if (failedShardSnapshotDescriptions.size() < failedShardSnapshotsCount.get()) { + listener.onFailure( + new RepositoryVerificationException( + repositoryName, + Strings.format( + """ + Cannot verify the integrity of all index snapshots because this repository contains too many shard snapshot \ + failures: there are [%d] shard snapshot failures but [?%s] is set to [%d]. \ + Please increase this limit if it is safe to do so.""", + failedShardSnapshotsCount.get(), + RepositoryVerifyIntegrityParams.MAX_FAILED_SHARD_SNAPSHOTS, + requestParams.maxFailedShardSnapshots() + ) + ) + ); + } else { + listener.onResponse(null); + } + } + + private void verifyIndices(ActionListener listener) { + var listeners = new RefCountingListener(listener); + runThrottled( + Iterators.failFast( + repositoryData.getIndices().values().iterator(), + () -> isCancelledSupplier.getAsBoolean() || listeners.isFailing() + ), + (releasable, indexId) -> new IndexVerifier(indexId).run(ActionListener.releaseAfter(listeners.acquire(), releasable)), + requestParams.indexVerificationConcurrency(), + indexProgress, + listeners + ); + } + + /** + * Verifies the integrity of the snapshots of a specific index + */ + private class IndexVerifier { + private final IndexId indexId; + private final ShardContainerContentsDeduplicator shardContainerContentsDeduplicator = new ShardContainerContentsDeduplicator(); + private final IndexDescriptionsDeduplicator indexDescriptionsDeduplicator = new IndexDescriptionsDeduplicator(); + private final AtomicInteger totalSnapshotCounter = new AtomicInteger(); + private final AtomicInteger restorableSnapshotCounter = new AtomicInteger(); + + IndexVerifier(IndexId indexId) { + this.indexId = indexId; + } + + void run(ActionListener listener) { + SubscribableListener + + .newForked(l -> { + var listeners = new RefCountingListener(1, l); + runThrottled( + Iterators.failFast( + repositoryData.getSnapshots(indexId).iterator(), + () -> isCancelledSupplier.getAsBoolean() || listeners.isFailing() + ), + (releasable, snapshotId) -> verifyIndexSnapshot( + snapshotId, + ActionListener.releaseAfter(listeners.acquire(), releasable) + ), + requestParams.indexSnapshotVerificationConcurrency(), + indexSnapshotProgress, + listeners + ); + }) + .andThen(l -> { + ensureNotCancelled(); + new RepositoryVerifyIntegrityResponseChunk.Builder( + responseChunkWriter, + RepositoryVerifyIntegrityResponseChunk.Type.INDEX_RESTORABILITY, + currentTimeMillisSupplier.getAsLong() + ).indexRestorability(indexId, totalSnapshotCounter.get(), restorableSnapshotCounter.get()).write(l); + }) + .addListener(listener); + } + + private void verifyIndexSnapshot(SnapshotId snapshotId, ActionListener listener) { + totalSnapshotCounter.incrementAndGet(); + indexDescriptionsDeduplicator.get(snapshotId).andThen((l, indexDescription) -> { + if (indexDescription == null) { + // index metadata was unreadable; anomaly already reported, 
skip further verification of this index snapshot + l.onResponse(null); + } else { + new ShardSnapshotsVerifier(snapshotId, indexDescription).run(l); + } + }).addListener(listener); + } + + /** + * Information about the contents of the {@code ${REPO}/indices/${INDEX}/${SHARD}/} container, shared across the verifications of + * each snapshot of this shard. + * + * @param shardId the numeric shard ID. + * @param blobsByName the {@link BlobMetadata} for every blob in the container, keyed by blob name. + * @param shardGeneration the current {@link ShardGeneration} for this shard, identifying the current {@code index-${UUID}} blob. + * @param filesByPhysicalNameBySnapshotName a {@link BlobStoreIndexShardSnapshot.FileInfo} for every tracked file, keyed by snapshot + * name and then by the file's physical name. + * @param blobContentsListeners a threadsafe mutable map, keyed by file name, for every tracked file that the verification process + * encounters. Used to avoid double-counting the size of any files, and also to deduplicate work to + * verify their contents if {@code ?verify_blob_contents} is set. + */ + private record ShardContainerContents( + int shardId, + Map blobsByName, + @Nullable /* if shard gen is not defined */ + ShardGeneration shardGeneration, + @Nullable /* if shard gen blob could not be read */ + Map> filesByPhysicalNameBySnapshotName, + Map> blobContentsListeners + ) {} + + /** + * Verifies the integrity of the shard snapshots of a specific index snapshot + */ + private class ShardSnapshotsVerifier { + private final SnapshotId snapshotId; + private final IndexDescription indexDescription; + private final AtomicInteger restorableShardCount = new AtomicInteger(); + + ShardSnapshotsVerifier(SnapshotId snapshotId, IndexDescription indexDescription) { + this.snapshotId = snapshotId; + this.indexDescription = indexDescription; + } + + void run(ActionListener listener) { + try (var listeners = new RefCountingListener(1, listener.map(v -> { + if (unreadableSnapshotInfoUuids.contains(snapshotId.getUUID()) == false + && indexDescription.shardCount() == restorableShardCount.get()) { + restorableSnapshotCounter.incrementAndGet(); + } + return v; + }))) { + for (int shardId = 0; shardId < indexDescription.shardCount(); shardId++) { + if (failedShardSnapshotDescriptions.contains(getShardSnapshotDescription(snapshotId, indexId.getName(), shardId))) { + continue; + } + + shardContainerContentsDeduplicator.get(shardId) + // deduplicating reads of shard container contents + .andThen((l, shardContainerContents) -> { + if (shardContainerContents == null) { + // shard container contents was unreadable; anomaly already reported, skip further verification + l.onResponse(null); + } else { + new ShardSnapshotVerifier(shardContainerContents).run(l); + } + }) + .addListener(listeners.acquire()); + } + } + } + + /** + * Verifies the integrity of a specific shard snapshot + */ + private class ShardSnapshotVerifier { + private final ShardContainerContents shardContainerContents; + private volatile boolean isRestorable = true; + + ShardSnapshotVerifier(ShardContainerContents shardContainerContents) { + this.shardContainerContents = shardContainerContents; + } + + void run(ActionListener listener) { + metadataTaskRunner.run(ActionRunnable.wrap(listener, this::verifyShardSnapshot)); + } + + private void verifyShardSnapshot(ActionListener listener) { + final var shardId = shardContainerContents.shardId(); + final BlobStoreIndexShardSnapshot blobStoreIndexShardSnapshot; + try { + blobStoreIndexShardSnapshot = 
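+                    /*
+                     * Failures here are not fatal to the verification as a whole; they are reported as
+                     * ANOMALY response chunks through a fluent builder, as in the catch block just below:
+                     *
+                     *     anomaly("failed to load shard snapshot")
+                     *         .snapshotId(snapshotId)
+                     *         .shardDescription(indexDescription, shardId)
+                     *         .exception(e)
+                     *         .write(listener); // completes the listener once the chunk is written
+                     */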
blobStoreRepository.loadShardSnapshot( + blobStoreRepository.shardContainer(indexId, shardId), + snapshotId + ); + } catch (Exception e) { + anomaly("failed to load shard snapshot").snapshotId(snapshotId) + .shardDescription(indexDescription, shardId) + .exception(e) + .write(listener); + return; + } + + final var listeners = new RefCountingListener(1, listener.map(v -> { + if (isRestorable) { + restorableShardCount.incrementAndGet(); + } + return v; + })); + final var shardGenerationConsistencyListener = listeners.acquire(); + + runThrottled( + Iterators.failFast( + blobStoreIndexShardSnapshot.indexFiles().iterator(), + () -> isCancelledSupplier.getAsBoolean() || listeners.isFailing() + ), + (releasable, fileInfo) -> verifyFileInfo(fileInfo, ActionListener.releaseAfter(listeners.acquire(), releasable)), + 1, + blobsVerified, + listeners + ); + + // NB this next step doesn't matter for restorability, it is just verifying that the shard gen blob matches the shard + // snapshot blob + verifyShardGenerationConsistency(blobStoreIndexShardSnapshot, shardGenerationConsistencyListener); + } + + /** + * Checks that the given {@link org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo} matches + * the actual blob in the repository. + */ + private void verifyFileInfo(BlobStoreIndexShardSnapshot.FileInfo fileInfo, ActionListener listener) { + if (fileInfo.metadata().hashEqualsContents()) { + listener.onResponse(null); + return; + } + + for (int partIndex = 0; partIndex < fileInfo.numberOfParts(); partIndex++) { + final var blobName = fileInfo.partName(partIndex); + final var blobInfo = shardContainerContents.blobsByName().get(blobName); + if (blobInfo == null) { + isRestorable = false; + String physicalFileName = fileInfo.physicalName(); + anomaly("missing blob").snapshotId(snapshotId) + .shardDescription(indexDescription, shardContainerContents.shardId()) + .blobName(blobName, physicalFileName) + .part(partIndex, fileInfo.numberOfParts()) + .fileLength(ByteSizeValue.ofBytes(fileInfo.length())) + .partLength(ByteSizeValue.ofBytes(fileInfo.partBytes(partIndex))) + .write(listener); + return; + } else if (blobInfo.length() != fileInfo.partBytes(partIndex)) { + isRestorable = false; + String physicalFileName = fileInfo.physicalName(); + ByteSizeValue blobLength = ByteSizeValue.ofBytes(blobInfo.length()); + anomaly("mismatched blob length").snapshotId(snapshotId) + .shardDescription(indexDescription, shardContainerContents.shardId()) + .blobName(blobName, physicalFileName) + .part(partIndex, fileInfo.numberOfParts()) + .fileLength(ByteSizeValue.ofBytes(fileInfo.length())) + .partLength(ByteSizeValue.ofBytes(fileInfo.partBytes(partIndex))) + .blobLength(blobLength) + .write(listener); + return; + } + } + + // NB adding a listener whether ?verify_blob_contents is set or not - we want to track the blob sizes either way + blobContentsListeners(indexDescription, shardContainerContents, fileInfo).addListener( + listener.delegateResponse((l, e) -> { + isRestorable = false; + String physicalFileName = fileInfo.physicalName(); + anomaly("corrupt data blob").snapshotId(snapshotId) + .shardDescription(indexDescription, shardContainerContents.shardId()) + .blobName(fileInfo.name(), physicalFileName) + .part(-1, fileInfo.numberOfParts()) + .fileLength(ByteSizeValue.ofBytes(fileInfo.length())) + .exception(e) + .write(l); + }) + ); + } + + /** + * Checks that the shard generation blob has the right content for this shard snapshot. 
+ */ + private void verifyShardGenerationConsistency( + BlobStoreIndexShardSnapshot blobStoreIndexShardSnapshot, + ActionListener listener + ) { + final var summaryFilesByPhysicalNameBySnapshotName = shardContainerContents.filesByPhysicalNameBySnapshotName(); + if (summaryFilesByPhysicalNameBySnapshotName == null) { + // couldn't read shard gen blob at all - already reported, nothing more to do here + listener.onResponse(null); + return; + } + + final var shardId = shardContainerContents.shardId(); + + final var summaryFilesByPhysicalName = summaryFilesByPhysicalNameBySnapshotName.get(snapshotId.getName()); + if (summaryFilesByPhysicalName == null) { + anomaly("snapshot not in shard generation").snapshotId(snapshotId) + .shardDescription(indexDescription, shardId) + .shardGeneration(shardContainerContents.shardGeneration()) + .write(listener); + return; + } + + final var snapshotFiles = getFilesByPhysicalName(blobStoreIndexShardSnapshot.indexFiles()); + + for (final var summaryFile : summaryFilesByPhysicalName.values()) { + final var snapshotFile = snapshotFiles.get(summaryFile.physicalName()); + if (snapshotFile == null) { + anomaly("blob in shard generation but not snapshot").snapshotId(snapshotId) + .shardDescription(indexDescription, shardId) + .shardGeneration(shardContainerContents.shardGeneration()) + .physicalFileName(summaryFile.physicalName()) + .write(listener); + return; + } else if (summaryFile.isSame(snapshotFile) == false) { + anomaly("snapshot shard generation mismatch").snapshotId(snapshotId) + .shardDescription(indexDescription, shardId) + .shardGeneration(shardContainerContents.shardGeneration()) + .physicalFileName(summaryFile.physicalName()) + .write(listener); + return; + } + } + + for (final var snapshotFile : blobStoreIndexShardSnapshot.indexFiles()) { + if (summaryFilesByPhysicalName.get(snapshotFile.physicalName()) == null) { + anomaly("blob in snapshot but not shard generation").snapshotId(snapshotId) + .shardDescription(indexDescription, shardId) + .shardGeneration(shardContainerContents.shardGeneration()) + .physicalFileName(snapshotFile.physicalName()) + .write(listener); + return; + } + } + + listener.onResponse(null); + } + } + } + + /** + * Exposes {@link IndexDescription} per index-metadata-blob (particularly the shard count), caching the value on first read + * to avoid duplicate work. + */ + private class IndexDescriptionsDeduplicator { + private final Map> listenersByBlobId = newConcurrentMap(); + + SubscribableListener get(SnapshotId snapshotId) { + final var indexMetaBlobId = repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, indexId); + return listenersByBlobId.computeIfAbsent( + indexMetaBlobId, + ignored -> SubscribableListener.newForked( + indexDescriptionListener -> metadataTaskRunner.run( + ActionRunnable.wrap(indexDescriptionListener, l -> load(snapshotId, indexMetaBlobId, l)) + ) + ) + ); + } + + private void load(SnapshotId snapshotId, String indexMetaBlobId, ActionListener listener) { + try { + listener.onResponse( + new IndexDescription( + indexId, + indexMetaBlobId, + blobStoreRepository.getSnapshotIndexMetaData(repositoryData, snapshotId, indexId).getNumberOfShards() + ) + ); + } catch (Exception e) { + anomaly("failed to load index metadata").indexDescription(new IndexDescription(indexId, indexMetaBlobId, 0)) + .exception(e) + .write(listener.map(v -> null)); + } + } + } + + /** + * Exposes {@link ShardContainerContents} per shard, caching the value on the first read to avoid duplicate work. 
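+         * <p>
+         * The deduplication relies on computeIfAbsent over a map of SubscribableListeners: the first caller
+         * forks the load and every later caller subscribes to the same in-flight result, roughly:
+         * <pre>{@code
+         * listenersByShardId.computeIfAbsent(
+         *     shardId,
+         *     ignored -> SubscribableListener.newForked(listener -> loadAsync(shardId, listener)));
+         * }</pre>
+         * ({@code loadAsync} is an illustrative stand-in for the throttled load below.)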
+ */ + private class ShardContainerContentsDeduplicator { + private final Map> listenersByShardId = newConcurrentMap(); + + SubscribableListener get(int shardId) { + return listenersByShardId.computeIfAbsent( + shardId, + ignored -> SubscribableListener.newForked( + shardContainerContentsListener -> metadataTaskRunner.run( + ActionRunnable.wrap(shardContainerContentsListener, l -> load(shardId, l)) + ) + ) + ); + } + + private void load(int shardId, ActionListener listener) { + final var indexDescription = new IndexDescription(indexId, null, 0); + + final Map blobsByName; + try { + blobsByName = blobStoreRepository.shardContainer(indexId, shardId).listBlobs(OperationPurpose.REPOSITORY_ANALYSIS); + } catch (Exception e) { + anomaly("failed to list shard container contents").shardDescription(new IndexDescription(indexId, null, 0), shardId) + .exception(e) + .write(listener.map(v -> null)); + return; + } + + final var shardGen = repositoryData.shardGenerations().getShardGen(indexId, shardId); + if (shardGen == null) { + anomaly("shard generation not defined").shardDescription(indexDescription, shardId) + .write( + listener.map( + // NB we don't need the shard gen to do most of the rest of the verification, so we set it to null and + // carry on: + v -> new ShardContainerContents(shardId, blobsByName, null, null, ConcurrentCollections.newConcurrentMap()) + ) + ); + return; + } + + SubscribableListener + // try and load the shard gen blob + .newForked(l -> { + try { + l.onResponse(blobStoreRepository.getBlobStoreIndexShardSnapshots(indexId, shardId, shardGen)); + } catch (Exception e) { + // failing here is not fatal to snapshot restores, only to creating/deleting snapshots, so we can return null + // and carry on with the analysis + anomaly("failed to load shard generation").shardDescription(indexDescription, shardId) + .shardGeneration(shardGen) + .exception(e) + .write(l.map(v -> null)); + } + }) + .andThenApply( + blobStoreIndexShardSnapshots -> new ShardContainerContents( + shardId, + blobsByName, + shardGen, + getFilesByPhysicalNameBySnapshotName(blobStoreIndexShardSnapshots), + ConcurrentCollections.newConcurrentMap() + ) + ) + .addListener(listener); + } + + private static Map> getFilesByPhysicalNameBySnapshotName( + BlobStoreIndexShardSnapshots blobStoreIndexShardSnapshots + ) { + if (blobStoreIndexShardSnapshots == null) { + return null; + } + + final Map> filesByPhysicalNameBySnapshotName = Maps + .newHashMapWithExpectedSize(blobStoreIndexShardSnapshots.snapshots().size()); + for (final var snapshotFiles : blobStoreIndexShardSnapshots.snapshots()) { + filesByPhysicalNameBySnapshotName.put(snapshotFiles.snapshot(), getFilesByPhysicalName(snapshotFiles.indexFiles())); + } + return filesByPhysicalNameBySnapshotName; + } + } + + private SubscribableListener blobContentsListeners( + IndexDescription indexDescription, + ShardContainerContents shardContainerContents, + BlobStoreIndexShardSnapshot.FileInfo fileInfo + ) { + return shardContainerContents.blobContentsListeners().computeIfAbsent(fileInfo.name(), ignored -> { + if (requestParams.verifyBlobContents()) { + return SubscribableListener.newForked(listener -> snapshotTaskRunner.run(ActionRunnable.run(listener, () -> { + try (var slicedStream = new SlicedInputStream(fileInfo.numberOfParts()) { + @Override + protected InputStream openSlice(int slice) throws IOException { + return blobStoreRepository.shardContainer(indexDescription.indexId(), shardContainerContents.shardId()) + .readBlob(OperationPurpose.REPOSITORY_ANALYSIS, 
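+                        /*
+                         * The part blobs are stitched into one logical stream and wrapped in an IndexInput
+                         * so that Lucene can verify the file's trailing checksum, as done a few lines below:
+                         *
+                         *     CodecUtil.checksumEntireFile(indexInput); // reads to EOF, throws on mismatch
+                         */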
fileInfo.partName(slice)); + } + }; + var rateLimitedStream = new RateLimitingInputStream(slicedStream, () -> rateLimiter, throttledNanos::addAndGet); + var indexInput = new IndexInputWrapper(rateLimitedStream, fileInfo.length()) + ) { + CodecUtil.checksumEntireFile(indexInput); + } + }))); + } else { + blobBytesVerified.addAndGet(fileInfo.length()); + return SubscribableListener.newSucceeded(null); + } + }); + } + } + + private static String getShardSnapshotDescription(SnapshotId snapshotId, String index, int shardId) { + return snapshotId.getUUID() + "/" + index + "/" + shardId; + } + + private static Map getFilesByPhysicalName( + List fileInfos + ) { + final Map filesByPhysicalName = Maps.newHashMapWithExpectedSize(fileInfos.size()); + for (final var fileInfo : fileInfos) { + filesByPhysicalName.put(fileInfo.physicalName(), fileInfo); + } + return filesByPhysicalName; + } + + private static void runThrottled( + Iterator iterator, + BiConsumer itemConsumer, + int maxConcurrency, + AtomicLong progressCounter, + Releasable onCompletion + ) { + ThrottledIterator.run(iterator, itemConsumer, maxConcurrency, progressCounter::incrementAndGet, onCompletion::close); + } + + private RepositoryVerifyIntegrityResponseChunk.Builder anomaly(String anomaly) { + return new RepositoryVerifyIntegrityResponseChunk.Builder( + responseChunkWriter, + RepositoryVerifyIntegrityResponseChunk.Type.ANOMALY, + currentTimeMillisSupplier.getAsLong() + ).anomaly(anomaly); + } + + public long getSnapshotCount() { + return snapshotCount; + } + + public long getIndexCount() { + return indexCount; + } + + public long getIndexSnapshotCount() { + return indexSnapshotCount; + } + + private class IndexInputWrapper extends IndexInput { + private final InputStream inputStream; + private final long length; + long filePointer = 0L; + + IndexInputWrapper(InputStream inputStream, long length) { + super(""); + this.inputStream = inputStream; + this.length = length; + } + + @Override + public byte readByte() throws IOException { + if (isCancelledSupplier.getAsBoolean()) { + throw new TaskCancelledException("task cancelled"); + } + final var read = inputStream.read(); + if (read == -1) { + throw new EOFException(); + } + filePointer += 1; + blobBytesVerified.incrementAndGet(); + return (byte) read; + } + + @Override + public void readBytes(byte[] b, int offset, int len) throws IOException { + while (len > 0) { + if (isCancelledSupplier.getAsBoolean()) { + throw new TaskCancelledException("task cancelled"); + } + final var read = inputStream.read(b, offset, len); + if (read == -1) { + throw new EOFException(); + } + filePointer += read; + blobBytesVerified.addAndGet(read); + len -= read; + offset += read; + } + } + + @Override + public void close() {} + + @Override + public long getFilePointer() { + return filePointer; + } + + @Override + public void seek(long pos) { + if (filePointer != pos) { + assert false : "cannot seek"; + throw new UnsupportedOperationException("seek"); + } + } + + @Override + public long length() { + return length; + } + + @Override + public IndexInput slice(String sliceDescription, long offset, long length) { + assert false; + throw new UnsupportedOperationException("slice"); + } + } + + private static class CancellableRunner { + private final ThrottledTaskRunner delegate; + private final CancellableThreads cancellableThreads; + + CancellableRunner(ThrottledTaskRunner delegate, CancellableThreads cancellableThreads) { + this.delegate = delegate; + this.cancellableThreads = cancellableThreads; + } + + void 
run(AbstractRunnable runnable) {
+            delegate.enqueueTask(new ActionListener<>() {
+                @Override
+                public void onResponse(Releasable releasable) {
+                    try (releasable) {
+                        if (cancellableThreads.isCancelled()) {
+                            runnable.onFailure(new TaskCancelledException("task cancelled"));
+                        } else {
+                            cancellableThreads.execute(runnable::run);
+                        }
+                    }
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    runnable.onFailure(e);
+                }
+            });
+        }
+    }
+}
diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityParams.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityParams.java
new file mode 100644
index 0000000000000..61a58c0da8df0
--- /dev/null
+++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityParams.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.repositories.blobstore.testkit.integrity;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Parameters of a repository-verify-integrity request.
+ *
+ * @param repository the name of the repository whose integrity to verify.
+ * @param metaThreadPoolConcurrency the number of concurrent tasks to execute on the {@link ThreadPool.Names#SNAPSHOT_META} pool, or
+ *                                  {@code 0} to use a sensible default.
+ * @param blobThreadPoolConcurrency the number of concurrent tasks to execute on the {@link ThreadPool.Names#SNAPSHOT} pool, or {@code 0}
+ *                                  to use a sensible default.
+ * @param snapshotVerificationConcurrency the number of snapshots to verify concurrently, or {@code 0} to use a sensible default.
+ * @param indexVerificationConcurrency the number of indices to verify concurrently, or {@code 0} to use a sensible default.
+ * @param indexSnapshotVerificationConcurrency the number of snapshots to verify concurrently for each index, or {@code 0} to use a sensible
+ *                                             default.
+ * @param maxFailedShardSnapshots the maximum number of shard snapshot failures to track - we must build a list of all of them in memory
+ *                                to avoid reporting spurious anomalies, and this can be overwhelming in a very broken repository.
+ * @param verifyBlobContents whether to verify the contents of each data blob (which is very expensive).
+ * @param maxBytesPerSec rate limit to use for blob contents verification.
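+ * <p>
+ * For illustration, a request that overrides some of these defaults might look like:
+ * <pre>{@code
+ * POST /_snapshot/my_repository/_verify_integrity?snapshot_verification_concurrency=5&max_bytes_per_sec=10mb
+ * }</pre>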
+ */ +public record RepositoryVerifyIntegrityParams( + String repository, + int metaThreadPoolConcurrency, + int blobThreadPoolConcurrency, + int snapshotVerificationConcurrency, + int indexVerificationConcurrency, + int indexSnapshotVerificationConcurrency, + int maxFailedShardSnapshots, + boolean verifyBlobContents, + ByteSizeValue maxBytesPerSec +) implements Writeable { + + public static final String MAX_FAILED_SHARD_SNAPSHOTS = "max_failed_shard_snapshots"; + + public RepositoryVerifyIntegrityParams { + Objects.requireNonNull(repository, "repository"); + requireNonNegative("meta_thread_pool_concurrency", metaThreadPoolConcurrency); + requireNonNegative("blob_thread_pool_concurrency", blobThreadPoolConcurrency); + requireNonNegative("snapshot_verification_concurrency", snapshotVerificationConcurrency); + requireNonNegative("index_verification_concurrency", indexVerificationConcurrency); + requireNonNegative("index_snapshot_verification_concurrency", indexSnapshotVerificationConcurrency); + requireNonNegative(MAX_FAILED_SHARD_SNAPSHOTS, maxFailedShardSnapshots); + if (maxBytesPerSec.getBytes() < 1) { + throw new IllegalArgumentException("invalid rate limit"); + } + } + + private static void requireNonNegative(String name, int value) { + if (value < 0) { + throw new IllegalArgumentException("argument [" + name + "] must be at least [0]"); + } + } + + RepositoryVerifyIntegrityParams(RestRequest restRequest) { + this( + restRequest.param("repository"), + restRequest.paramAsInt("meta_thread_pool_concurrency", 0), + restRequest.paramAsInt("blob_thread_pool_concurrency", 0), + restRequest.paramAsInt("snapshot_verification_concurrency", 0), + restRequest.paramAsInt("index_verification_concurrency", 0), + restRequest.paramAsInt("index_snapshot_verification_concurrency", 0), + restRequest.paramAsInt(MAX_FAILED_SHARD_SNAPSHOTS, 0), + restRequest.paramAsBoolean("verify_blob_contents", false), + restRequest.paramAsSize("max_bytes_per_sec", ByteSizeValue.ofMb(40)) + ); + } + + RepositoryVerifyIntegrityParams(StreamInput in) throws IOException { + this( + in.readString(), + in.readVInt(), + in.readVInt(), + in.readVInt(), + in.readVInt(), + in.readVInt(), + in.readVInt(), + in.readBoolean(), + ByteSizeValue.readFrom(in) + ); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(repository); + out.writeVInt(metaThreadPoolConcurrency); + out.writeVInt(blobThreadPoolConcurrency); + out.writeVInt(snapshotVerificationConcurrency); + out.writeVInt(indexVerificationConcurrency); + out.writeVInt(indexSnapshotVerificationConcurrency); + out.writeVInt(maxFailedShardSnapshots); + out.writeBoolean(verifyBlobContents); + maxBytesPerSec.writeTo(out); + } + + public RepositoryVerifyIntegrityParams withResolvedDefaults(ThreadPool.Info metadataThreadPoolInfo) { + if (metaThreadPoolConcurrency > 0 + && blobThreadPoolConcurrency > 0 + && snapshotVerificationConcurrency > 0 + && indexVerificationConcurrency > 0 + && indexSnapshotVerificationConcurrency > 0 + && maxFailedShardSnapshots > 0) { + return this; + } + + final var maxThreads = Math.max(1, metadataThreadPoolInfo.getMax()); + final var halfMaxThreads = Math.max(1, maxThreads / 2); + return new RepositoryVerifyIntegrityParams( + repository, + metaThreadPoolConcurrency > 0 ? metaThreadPoolConcurrency : halfMaxThreads, + blobThreadPoolConcurrency > 0 ? blobThreadPoolConcurrency : 1, + snapshotVerificationConcurrency > 0 ? snapshotVerificationConcurrency : halfMaxThreads, + indexVerificationConcurrency > 0 ? 
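+            /*
+             * Worked example: with a SNAPSHOT_META pool of 10 threads and every parameter left at 0, this
+             * resolves to meta_thread_pool_concurrency=5, blob_thread_pool_concurrency=1,
+             * snapshot_verification_concurrency=5, index_verification_concurrency=10,
+             * index_snapshot_verification_concurrency=1 and max_failed_shard_snapshots=10000.
+             */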
indexVerificationConcurrency : maxThreads, + indexSnapshotVerificationConcurrency > 0 ? indexSnapshotVerificationConcurrency : 1, + maxFailedShardSnapshots > 0 ? maxFailedShardSnapshots : 10000, + verifyBlobContents, + maxBytesPerSec + ); + } +} diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityResponse.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityResponse.java new file mode 100644 index 0000000000000..eff6ed7eb465d --- /dev/null +++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityResponse.java @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.repositories.blobstore.testkit.integrity; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +public class RepositoryVerifyIntegrityResponse extends ActionResponse { + private final RepositoryVerifyIntegrityTask.Status finalTaskStatus; + private final long finalRepositoryGeneration; + + RepositoryVerifyIntegrityResponse(RepositoryVerifyIntegrityTask.Status finalTaskStatus, long finalRepositoryGeneration) { + this.finalTaskStatus = finalTaskStatus; + this.finalRepositoryGeneration = finalRepositoryGeneration; + } + + RepositoryVerifyIntegrityResponse(StreamInput in) throws IOException { + finalRepositoryGeneration = in.readLong(); + finalTaskStatus = new RepositoryVerifyIntegrityTask.Status(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(finalRepositoryGeneration); + finalTaskStatus.writeTo(out); + } + + public long finalRepositoryGeneration() { + return finalRepositoryGeneration; + } + + public RepositoryVerifyIntegrityTask.Status finalTaskStatus() { + return finalTaskStatus; + } + + public long originalRepositoryGeneration() { + return finalTaskStatus.repositoryGeneration(); + } +} diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityResponseChunk.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityResponseChunk.java new file mode 100644 index 0000000000000..90130811c1218 --- /dev/null +++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityResponseChunk.java @@ -0,0 +1,355 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.repositories.blobstore.testkit.integrity; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.ShardGeneration; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.xcontent.ToXContentFragment; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +/** + * A chunk of response to be streamed to the waiting client. + * + * @param type indicates the type of this chunk. + * @param anomaly a textual description of the anomaly found, or {@code null} if this chunk does not describe an anomaly. + * @param snapshotId the ID of the snapshot to which this chunk pertains, or {@code null} if this chunk does not pertain to a particular + * snapshot. + * @param snapshotInfo the raw {@link SnapshotInfo} for the snapshot, or {@code null}. + * @param indexDescription information about the index to which this chunk pertains, or {@code null} if this chunk does not pertain to + * a particular index. + * @param shardId the ID of the shard to which this chunk pertains, or {@code -1} if this chunk does not pertain to a particular shard. + * @param shardGeneration the {@link ShardGeneration} for the given shard, or {@code null} if not relevant. + * @param blobName the name of the blob to which this chunk pertains, or {@code null} if this chunk does not pertain to a particular blob. + * @param physicalFileName the name of the Lucene file to which this chunk pertains, or {@code null} if this chunk does not pertain to a + * particular Lucene file. + * @param partIndex the index of the part of the file represented by the blob to which this chunk pertains, or {@code -1} if this chunk does + * not pertain to a particular part. + * @param partCount the number of parts into which the file to which this chunk pertains is divided, or {@code -1} if not applicable. + * @param fileLength the length of the Lucene file to which this chunk pertains, or {@link ByteSizeValue#MINUS_ONE} if not applicable. + * @param partLength the length of the file part to which this chunk pertains, or {@link ByteSizeValue#MINUS_ONE} if not applicable. + * @param blobLength the length of the blob to which this chunk pertains, or {@link ByteSizeValue#MINUS_ONE} if not applicable. + * @param totalSnapshotCount the total number of snapshots which involve the index to which this chunk pertains, or {@code -1} if not + * applicable. + * @param restorableSnapshotCount the number of restorable snapshots which involve the index to which this chunk pertains, or {@code -1} if + * not applicable. + * @param exception an exception which relates to the failure described by this chunk, or {@code null} if not applicable. 
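+ * <p>
+ * Rendered as XContent, an anomaly chunk looks roughly like this (values illustrative):
+ * <pre>{@code
+ * {
+ *   "timestamp_in_millis": 1700000000000,
+ *   "anomaly": "missing blob",
+ *   "snapshot": { "snapshot": "snap-1", "uuid": "..." },
+ *   "index": { "name": "my-index", "uuid": "..." },
+ *   "shard_id": 0,
+ *   "blob_name": "__some-blob",
+ *   "physical_file_name": "_0.cfs",
+ *   "part_index": 0,
+ *   "part_count": 1,
+ *   "file_length_in_bytes": 4096,
+ *   "part_length_in_bytes": 4096
+ * }
+ * }</pre>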
+ */ +public record RepositoryVerifyIntegrityResponseChunk( + long timestampMillis, + Type type, + @Nullable String anomaly, + @Nullable SnapshotId snapshotId, + @Nullable SnapshotInfo snapshotInfo, + @Nullable IndexDescription indexDescription, + int shardId, + @Nullable ShardGeneration shardGeneration, + @Nullable String blobName, + @Nullable String physicalFileName, + int partIndex, + int partCount, + ByteSizeValue fileLength, + ByteSizeValue partLength, + ByteSizeValue blobLength, + int totalSnapshotCount, + int restorableSnapshotCount, + @Nullable Exception exception +) implements Writeable, ToXContentFragment { + + public enum Type { + /** + * The first chunk sent. Used to indicate that the verification has successfully started, and therefore we should start to send a + * 200 OK response to the client. + */ + START_RESPONSE, + + /** + * This chunk contains the raw {@link SnapshotInfo} for a snapshot. + */ + SNAPSHOT_INFO, + + /** + * This chunk contains information about the restorability of an index. + */ + INDEX_RESTORABILITY, + + /** + * This chunk describes an anomaly found during verification. + */ + ANOMALY, + } + + public RepositoryVerifyIntegrityResponseChunk { + if (fileLength == null + || partLength == null + || blobLength == null + || shardId < -1 + || partIndex < -1 + || partCount < -1 + || totalSnapshotCount < -1 + || restorableSnapshotCount < -1 + || (totalSnapshotCount >= 0 != restorableSnapshotCount >= 0)) { + throw new IllegalArgumentException("invalid: " + this); + } + } + + public RepositoryVerifyIntegrityResponseChunk(StreamInput in) throws IOException { + this( + in.readVLong(), + // TODO enum serialization tests + in.readEnum(Type.class), + in.readOptionalString(), + in.readOptionalWriteable(SnapshotId::new), + in.readOptionalWriteable(SnapshotInfo::readFrom), + in.readOptionalWriteable(IndexDescription::new), + in.readInt(), + in.readOptionalWriteable(ShardGeneration::new), + in.readOptionalString(), + in.readOptionalString(), + in.readInt(), + in.readInt(), + ByteSizeValue.readFrom(in), + ByteSizeValue.readFrom(in), + ByteSizeValue.readFrom(in), + in.readInt(), + in.readInt(), + in.readOptional(StreamInput::readException) + ); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(timestampMillis); + out.writeEnum(type); + out.writeOptionalString(anomaly); + out.writeOptionalWriteable(snapshotId); + out.writeOptionalWriteable(snapshotInfo); + out.writeOptionalWriteable(indexDescription); + out.writeInt(shardId); + out.writeOptionalWriteable(shardGeneration); + out.writeOptionalString(blobName); + out.writeOptionalString(physicalFileName); + out.writeInt(partIndex); + out.writeInt(partCount); + fileLength.writeTo(out); + partLength.writeTo(out); + blobLength.writeTo(out); + out.writeInt(totalSnapshotCount); + out.writeInt(restorableSnapshotCount); + out.writeOptional(StreamOutput::writeException, exception); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.timeField("timestamp_in_millis", "timestamp", timestampMillis); + + if (anomaly() != null) { + builder.field("anomaly", anomaly()); + } + + if (snapshotInfo() != null) { + builder.field("snapshot"); + snapshotInfo().toXContentExternal(builder, params); + } else if (snapshotId() != null) { + builder.startObject("snapshot"); + builder.field("snapshot", snapshotId().getName()); + builder.field("uuid", snapshotId().getUUID()); + builder.endObject(); + } + + if (indexDescription() != null) { + 
builder.field("index", indexDescription(), params); + } + if (shardId() >= 0) { + builder.field("shard_id", shardId()); + } + if (shardGeneration() != null) { + builder.field("shard_generation", shardGeneration(), params); + } + if (blobName() != null) { + builder.field("blob_name", blobName()); + } + if (physicalFileName() != null) { + builder.field("physical_file_name", physicalFileName()); + } + if (partIndex() >= 0) { + builder.field("part_index", partIndex()); + } + if (partCount() >= 0) { + builder.field("part_count", partCount()); + } + if (fileLength() != ByteSizeValue.MINUS_ONE) { + builder.humanReadableField("file_length_in_bytes", "file_length", fileLength()); + } + if (partLength() != ByteSizeValue.MINUS_ONE) { + builder.humanReadableField("part_length_in_bytes", "part_length", partLength()); + } + if (blobLength() != ByteSizeValue.MINUS_ONE) { + builder.humanReadableField("blob_length_in_bytes", "blob_length", blobLength()); + } + if (totalSnapshotCount() >= 0 && restorableSnapshotCount() >= 0) { + builder.startObject("snapshot_restorability"); + builder.field("total_snapshot_count", totalSnapshotCount()); + builder.field("restorable_snapshot_count", restorableSnapshotCount()); + builder.endObject(); + } + if (exception() != null) { + builder.startObject("exception") + .value((bb, pp) -> ElasticsearchException.generateFailureXContent(bb, pp, exception(), true)) + .field("status", ExceptionsHelper.status(exception())) + .endObject(); + } + return builder; + } + + static class Builder { + private final Writer responseWriter; + private final Type type; + private final long timestampMillis; + + private String anomaly; + private SnapshotId snapshotId; + private SnapshotInfo snapshotInfo; + private IndexDescription indexDescription; + private int shardId = -1; + private ShardGeneration shardGeneration; + private String blobName; + private String physicalFileName; + private int partIndex = -1; + private int partCount = -1; + private ByteSizeValue fileLength = ByteSizeValue.MINUS_ONE; + private ByteSizeValue partLength = ByteSizeValue.MINUS_ONE; + private ByteSizeValue blobLength = ByteSizeValue.MINUS_ONE; + private int totalSnapshotCount = -1; + private int restorableSnapshotCount = -1; + private Exception exception; + + Builder(Writer responseWriter, Type type, long timestampMillis) { + this.responseWriter = responseWriter; + this.type = type; + this.timestampMillis = timestampMillis; + } + + Builder anomaly(String anomaly) { + this.anomaly = anomaly; + return this; + } + + Builder snapshotId(SnapshotId snapshotId) { + this.snapshotId = snapshotId; + return this; + } + + Builder snapshotInfo(SnapshotInfo snapshotInfo) { + this.snapshotInfo = snapshotInfo; + return this; + } + + Builder indexDescription(IndexDescription indexDescription) { + this.indexDescription = indexDescription; + return this; + } + + Builder shardDescription(IndexDescription indexDescription, int shardId) { + this.indexDescription = indexDescription; + this.shardId = shardId; + return this; + } + + Builder shardGeneration(ShardGeneration shardGeneration) { + this.shardGeneration = shardGeneration; + return this; + } + + Builder blobName(String blobName, String physicalFileName) { + this.blobName = blobName; + this.physicalFileName = physicalFileName; + return this; + } + + Builder physicalFileName(String physicalFileName) { + this.physicalFileName = physicalFileName; + return this; + } + + Builder part(int partIndex, int partCount) { + this.partIndex = partIndex; + this.partCount = partCount; + return this; + } 
+ + Builder fileLength(ByteSizeValue fileLength) { + this.fileLength = Objects.requireNonNull(fileLength); + return this; + } + + Builder partLength(ByteSizeValue partLength) { + this.partLength = Objects.requireNonNull(partLength); + return this; + } + + Builder blobLength(ByteSizeValue blobLength) { + this.blobLength = Objects.requireNonNull(blobLength); + return this; + } + + Builder indexRestorability(IndexId indexId, int totalSnapshotCount, int restorableSnapshotCount) { + this.indexDescription = new IndexDescription(indexId, null, 0); + this.totalSnapshotCount = totalSnapshotCount; + this.restorableSnapshotCount = restorableSnapshotCount; + return this; + } + + Builder exception(Exception exception) { + this.exception = exception; + return this; + } + + void write(ActionListener<Void> listener) { + responseWriter.writeResponseChunk( + new RepositoryVerifyIntegrityResponseChunk( + timestampMillis, + type, + anomaly, + snapshotId, + snapshotInfo, + indexDescription, + shardId, + shardGeneration, + blobName, + physicalFileName, + partIndex, + partCount, + fileLength, + partLength, + blobLength, + totalSnapshotCount, + restorableSnapshotCount, + exception + ), + ActionListener.assertOnce(listener) + ); + } + } + + interface Writer { + void writeResponseChunk(RepositoryVerifyIntegrityResponseChunk responseChunk, ActionListener<Void> listener); + } +} diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityResponseStream.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityResponseStream.java new file mode 100644 index 0000000000000..7ea9bfe6f2b23 --- /dev/null +++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityResponseStream.java @@ -0,0 +1,151 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.repositories.blobstore.testkit.integrity; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.SubscribableListener; +import org.elasticsearch.common.xcontent.ChunkedToXContentHelper; +import org.elasticsearch.core.AbstractRefCounted; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.StreamingXContentResponse; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Represents a (possibly-streaming) response to the repository-verify-integrity API.
+ */ +class RepositoryVerifyIntegrityResponseStream extends AbstractRefCounted { + // ref-counting discipline: + // - one ref added at creation in the REST layer and released there by the listener returned from getCompletionListener() + // - one ref held for every response chunk while it is being added to the fragment queue + // thus when all refs are released the transport-layer coordinating action is complete and no more trailing fragments can be added, + // so we can send the last response fragment. + + private static final Logger logger = LogManager.getLogger(RepositoryVerifyIntegrityResponseStream.class); + + private final RestChannel restChannel; + + private final SubscribableListener<RepositoryVerifyIntegrityResponse> finalResultListener = new SubscribableListener<>(); + + // the listener exposed to the transport response handler + private final ActionListener<RepositoryVerifyIntegrityResponse> completionListener = ActionListener.assertOnce( + ActionListener.releaseAfter(finalResultListener, this::decRef) + ); + + // set in startResponse(), which completes before any calls to writeChunk() or closeInternal(), so it need not be volatile + @Nullable // if not yet started + private StreamingXContentResponse streamingXContentResponse; + + private final AtomicLong anomalyCount = new AtomicLong(); + + RepositoryVerifyIntegrityResponseStream(RestChannel restChannel) { + this.restChannel = restChannel; + } + + void startResponse(Releasable releasable) throws IOException { + assert hasReferences(); + assert streamingXContentResponse == null; + streamingXContentResponse = new StreamingXContentResponse(restChannel, restChannel.request(), () -> {}); + streamingXContentResponse.writeFragment( + p0 -> ChunkedToXContentHelper.singleChunk((b, p) -> b.startObject().startArray("log")), + releasable + ); + } + + void writeChunk(RepositoryVerifyIntegrityResponseChunk chunk, Releasable releasable) { + assert hasReferences(); + assert streamingXContentResponse != null; + + if (chunk.type() == RepositoryVerifyIntegrityResponseChunk.Type.ANOMALY) { + anomalyCount.incrementAndGet(); + } + streamingXContentResponse.writeFragment( + p0 -> ChunkedToXContentHelper.singleChunk((b, p) -> b.startObject().value(chunk, p).endObject()), + releasable + ); + } + + @Override + protected void closeInternal() { + try { + assert finalResultListener.isDone(); + finalResultListener.addListener(new ActionListener<>() { + @Override + public void onResponse(RepositoryVerifyIntegrityResponse repositoryVerifyIntegrityResponse) { + // success - finish the response with the final results + assert streamingXContentResponse != null; + streamingXContentResponse.writeFragment( + p0 -> ChunkedToXContentHelper.singleChunk( + (b, p) -> b.endArray() + .startObject("results") + .field("status", repositoryVerifyIntegrityResponse.finalTaskStatus()) + .field("final_repository_generation", repositoryVerifyIntegrityResponse.finalRepositoryGeneration()) + .field("total_anomalies", anomalyCount.get()) + .field( + "result", + anomalyCount.get() == 0 + ? repositoryVerifyIntegrityResponse + .originalRepositoryGeneration() == repositoryVerifyIntegrityResponse.finalRepositoryGeneration() + ? "pass" + : "inconclusive due to concurrent writes" + : "fail" + ) + .endObject() + .endObject() + ), + () -> {} + ); + } + + @Override + public void onFailure(Exception e) { + if (streamingXContentResponse != null) { + // failure after starting the response - finish the response with a rendering of the final exception + streamingXContentResponse.writeFragment( + p0 -> ChunkedToXContentHelper.singleChunk( + (b, p) -> b.endArray() + .startObject("exception") + .value((bb, pp) -> ElasticsearchException.generateFailureXContent(bb, pp, e, true)) + .field("status", ExceptionsHelper.status(e)) + .endObject() + .endObject() + ), + () -> {} + ); + } else { + // we didn't even get as far as starting to stream the response, so we must have hit an early exception + // (e.g. repo not found) and can return that exception directly. + try { + restChannel.sendResponse(new RestResponse(restChannel, e)); + } catch (IOException e2) { + e.addSuppressed(e2); + logger.error("error building error response", e); + assert false : e; // shouldn't actually throw anything here + restChannel.request().getHttpChannel().close(); + } + } + } + }); + } finally { + Releasables.closeExpectNoException(streamingXContentResponse); + } + } + + public ActionListener<RepositoryVerifyIntegrityResponse> getCompletionListener() { + return completionListener; + } +} diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityTask.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityTask.java new file mode 100644 index 0000000000000..eaae913fe9c6f --- /dev/null +++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityTask.java @@ -0,0 +1,133 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0.
+ */ + +package org.elasticsearch.repositories.blobstore.testkit.integrity; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Map; +import java.util.Optional; +import java.util.function.Supplier; + +public class RepositoryVerifyIntegrityTask extends CancellableTask { + + private volatile Supplier<Status> statusSupplier; + + public RepositoryVerifyIntegrityTask( + long id, + String type, + String action, + String description, + TaskId parentTaskId, + Map<String, String> headers + ) { + super(id, type, action, description, parentTaskId, headers); + } + + public void setStatusSupplier(Supplier<Status> statusSupplier) { + this.statusSupplier = statusSupplier; + } + + @Override + public Status getStatus() { + return Optional.ofNullable(statusSupplier).map(Supplier::get).orElse(null); + } + + public record Status( + String repositoryName, + long repositoryGeneration, + String repositoryUUID, + long snapshotCount, + long snapshotsVerified, + long indexCount, + long indicesVerified, + long indexSnapshotCount, + long indexSnapshotsVerified, + long blobsVerified, + long blobBytesVerified, + long throttledNanos + ) implements org.elasticsearch.tasks.Task.Status { + + public static final String NAME = "verify_repository_integrity_status"; + + public Status(StreamInput in) throws IOException { + this( + in.readString(), + in.readVLong(), + in.readString(), + in.readVLong(), + in.readVLong(), + in.readVLong(), + in.readVLong(), + in.readVLong(), + in.readVLong(), + in.readVLong(), + in.readVLong(), + in.readVLong() + ); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(repositoryName); + out.writeVLong(repositoryGeneration); + out.writeString(repositoryUUID); + out.writeVLong(snapshotCount); + out.writeVLong(snapshotsVerified); + out.writeVLong(indexCount); + out.writeVLong(indicesVerified); + out.writeVLong(indexSnapshotCount); + out.writeVLong(indexSnapshotsVerified); + out.writeVLong(blobsVerified); + out.writeVLong(blobBytesVerified); + out.writeVLong(throttledNanos); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.startObject("repository"); + builder.field("name", repositoryName); + builder.field("uuid", repositoryUUID); + builder.field("generation", repositoryGeneration); + builder.endObject(); + builder.startObject("snapshots"); + builder.field("verified", snapshotsVerified); + builder.field("total", snapshotCount); + builder.endObject(); + builder.startObject("indices"); + builder.field("verified", indicesVerified); + builder.field("total", indexCount); + builder.endObject(); + builder.startObject("index_snapshots"); + builder.field("verified", indexSnapshotsVerified); + builder.field("total", indexSnapshotCount); + builder.endObject(); + builder.startObject("blobs"); + builder.field("verified", blobsVerified); + if (throttledNanos > 0) { + builder.humanReadableField("verified_size_in_bytes", "verified_size", ByteSizeValue.ofBytes(blobBytesVerified)); + builder.humanReadableField("throttled_time_in_millis", "throttled_time", TimeValue.timeValueNanos(throttledNanos)); + } + builder.endObject(); + builder.endObject(); + return builder; + } +
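+ + // Illustration (editor's sketch, field values invented): rendered by the toXContent method above, + // a status might serialize as JSON along these lines: + // + // { + // "repository": { "name": "my-repo", "uuid": "...", "generation": 42 }, + // "snapshots": { "verified": 3, "total": 12 }, + // "indices": { "verified": 2, "total": 7 }, + // "index_snapshots": { "verified": 9, "total": 84 }, + // "blobs": { "verified": 1024 } + // } + // + // with the human-readable "verified_size" and "throttled_time" fields appearing in the "blobs" + // object only once verification has been throttled (throttledNanos > 0).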
@Override + public String getWriteableName() { + return NAME; + } + } +} diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RestRepositoryVerifyIntegrityAction.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RestRepositoryVerifyIntegrityAction.java new file mode 100644 index 0000000000000..16cdb9140411c --- /dev/null +++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RestRepositoryVerifyIntegrityAction.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.repositories.blobstore.testkit.integrity; + +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.features.NodeFeature; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.Scope; +import org.elasticsearch.rest.ServerlessScope; +import org.elasticsearch.rest.action.RestCancellableNodeClient; + +import java.util.List; + +import static org.elasticsearch.rest.RestRequest.Method.POST; + +@ServerlessScope(Scope.INTERNAL) +public class RestRepositoryVerifyIntegrityAction extends BaseRestHandler { + + public static final NodeFeature REPOSITORY_VERIFY_INTEGRITY_FEATURE = new NodeFeature("snapshot.repository_verify_integrity"); + + @Override + public List<Route> routes() { + return List.of(new Route(POST, "/_snapshot/{repository}/_verify_integrity")); + } + + @Override + public String getName() { + return "repository_verify_integrity"; + } + + @Override + public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) { + final var requestParams = new RepositoryVerifyIntegrityParams(request); + return channel -> { + final var responseStream = new RepositoryVerifyIntegrityResponseStream(channel); + new RestCancellableNodeClient(client, request.getHttpChannel()).execute( + TransportRepositoryVerifyIntegrityCoordinationAction.INSTANCE, + new TransportRepositoryVerifyIntegrityCoordinationAction.Request(requestParams, responseStream), + responseStream.getCompletionListener() + ); + }; + } +} diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/TransportRepositoryVerifyIntegrityAction.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/TransportRepositoryVerifyIntegrityAction.java new file mode 100644 index 0000000000000..aa29f83341317 --- /dev/null +++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/TransportRepositoryVerifyIntegrityAction.java @@ -0,0 +1,158 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0.
+ */ + +package org.elasticsearch.repositories.blobstore.testkit.integrity; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.support.SubscribableListener; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.util.CancellableThreads; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.Executor; +import java.util.function.LongSupplier; + +/** + * Transport action that actually runs the {@link RepositoryIntegrityVerifier} and sends response chunks back to the coordinating node. + */ +class TransportRepositoryVerifyIntegrityAction extends HandledTransportAction< + TransportRepositoryVerifyIntegrityAction.Request, + RepositoryVerifyIntegrityResponse> { + + // NB runs on the master because that's the expected place to read metadata blobs from the repository, but not an actual + // TransportMasterNodeAction since we don't want to retry on a master failover + + static final String ACTION_NAME = TransportRepositoryVerifyIntegrityCoordinationAction.INSTANCE.name() + "[m]"; + private final RepositoriesService repositoriesService; + private final TransportService transportService; + private final Executor executor; + + TransportRepositoryVerifyIntegrityAction( + TransportService transportService, + RepositoriesService repositoriesService, + ActionFilters actionFilters, + Executor executor + ) { + super(ACTION_NAME, transportService, actionFilters, TransportRepositoryVerifyIntegrityAction.Request::new, executor); + this.repositoriesService = repositoriesService; + this.transportService = transportService; + this.executor = executor; + } + + static class Request extends ActionRequest { + private final DiscoveryNode coordinatingNode; + private final long coordinatingTaskId; + private final RepositoryVerifyIntegrityParams requestParams; + + Request(DiscoveryNode coordinatingNode, long coordinatingTaskId, RepositoryVerifyIntegrityParams requestParams) { + this.coordinatingNode = coordinatingNode; + this.coordinatingTaskId = coordinatingTaskId; + this.requestParams = Objects.requireNonNull(requestParams); + } + + Request(StreamInput in) throws IOException { + super(in); + coordinatingNode = new DiscoveryNode(in); + coordinatingTaskId = in.readVLong(); + requestParams = new RepositoryVerifyIntegrityParams(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + coordinatingNode.writeTo(out); + out.writeVLong(coordinatingTaskId); + requestParams.writeTo(out); + } 
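+ + // Round-trip sketch (editor's illustration, not part of the original change): the wire format + // mirrors the constructor order above, so a request survives serialization unchanged, e.g. in a test: + // + // try (var out = new BytesStreamOutput()) { + // original.writeTo(out); + // var copy = new Request(out.bytes().streamInput()); + // // copy carries the same coordinatingNode, coordinatingTaskId and requestParams + // }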
+ + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) { + return new RepositoryVerifyIntegrityTask(id, type, action, getDescription(), parentTaskId, headers); + } + } + + @Override + protected void doExecute(Task rawTask, Request request, ActionListener<RepositoryVerifyIntegrityResponse> listener) { + final var responseWriter = new RepositoryVerifyIntegrityResponseChunk.Writer() { + + // no need to obtain a fresh connection each time - this connection shouldn't close, so if it does we can fail the verification + final Transport.Connection responseConnection = transportService.getConnection(request.coordinatingNode); + + @Override + public void writeResponseChunk(RepositoryVerifyIntegrityResponseChunk responseChunk, ActionListener<Void> listener) { + transportService.sendChildRequest( + responseConnection, + TransportRepositoryVerifyIntegrityResponseChunkAction.ACTION_NAME, + new TransportRepositoryVerifyIntegrityResponseChunkAction.Request(request.coordinatingTaskId, responseChunk), + rawTask, + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>( + listener.map(ignored -> null), + in -> ActionResponse.Empty.INSTANCE, + executor + ) + ); + } + }; + + final LongSupplier currentTimeMillisSupplier = transportService.getThreadPool()::absoluteTimeInMillis; + final var repository = (BlobStoreRepository) repositoriesService.repository(request.requestParams.repository()); + final var task = (RepositoryVerifyIntegrityTask) rawTask; + + SubscribableListener + + .newForked(l -> repository.getRepositoryData(executor, l)) + .andThenApply(repositoryData -> { + final var cancellableThreads = new CancellableThreads(); + task.addListener(() -> cancellableThreads.cancel("task cancelled")); + final var verifier = new RepositoryIntegrityVerifier( + currentTimeMillisSupplier, + repository, + responseWriter, + request.requestParams.withResolvedDefaults(repository.threadPool().info(ThreadPool.Names.SNAPSHOT_META)), + repositoryData, + cancellableThreads + ); + task.setStatusSupplier(verifier::getStatus); + return verifier; + }) + .andThen( + (l, repositoryIntegrityVerifier) -> new RepositoryVerifyIntegrityResponseChunk.Builder( + responseWriter, + RepositoryVerifyIntegrityResponseChunk.Type.START_RESPONSE, + currentTimeMillisSupplier.getAsLong() + ).write(l.map(ignored -> repositoryIntegrityVerifier)) + ) + .andThen((l, repositoryIntegrityVerifier) -> repositoryIntegrityVerifier.start(l)) + .addListener(listener); + } +} diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/TransportRepositoryVerifyIntegrityCoordinationAction.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/TransportRepositoryVerifyIntegrityCoordinationAction.java new file mode 100644 index 0000000000000..d5a5749997d8d --- /dev/null +++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/TransportRepositoryVerifyIntegrityCoordinationAction.java @@ -0,0 +1,186 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0.
+ */ + +package org.elasticsearch.repositories.blobstore.testkit.integrity; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.discovery.MasterNotDiscoveredException; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportService; + +import java.util.Map; +import java.util.concurrent.Executor; + +/** + * Transport action that coordinates the integrity verification, dispatching a request to run the verification on the master and setting up + * the machinery needed to send the response chunks back to the client. + */ +public class TransportRepositoryVerifyIntegrityCoordinationAction extends TransportAction< + TransportRepositoryVerifyIntegrityCoordinationAction.Request, + RepositoryVerifyIntegrityResponse> { + + /* + * Message flow: the coordinating node (the one running this action) forwards the request on to a master node which actually runs the + * verification. The master node in turn sends requests back to this node containing chunks of response, either information about the + * snapshots processed, or about the restorability of the indices in the repository, or details of any verification anomalies found. + * When the process is complete the master responds to the original transport request with the final results: + * + * +---------+ +-------------+ +--------+ + * | Client | | Coordinator | | Master | + * +---------+ +-------------+ +--------+ + * | | | + * |-[REST request]--------------------->| | + * | |---[master node request]----------------->| ----------------------\ + * | | |-| Initialize verifier | + * | | | |---------------------| + * | |<--[START_RESPONSE chunk request]---------| + * |<---[headers & initial JSON body]----| | + * | |---[START_RESPONSE chunk response]------->| ------------------\ + * | | |-| Verify snapshot | + * | | | |-----------------| + * | |<--[SNAPSHOT_INFO chunk request]----------| + * |<---[more JSON body]-----------------| | + * | |---[SNAPSHOT_INFO chunk response]-------->| ------------------\ + * | | |-| Verify snapshot | + * | | | |-----------------| + * | |<--[SNAPSHOT_INFO chunk request]----------| + * |<---[more JSON body]-----------------| | + * | |---[SNAPSHOT_INFO chunk response]-------->| ... + * . . . + * . . . + * | | | -----------------------------\ + * | | |-| Verify index restorability | + * | | | |----------------------------| + * | |<--[INDEX_RESTORABILITY chunk request]----| + * |<---[more JSON body]-----------------| | + * | |---[INDEX_RESTORABILITY chunk response]-->| -----------------------------\ + * | | |-| Verify index restorability | + * | | | |----------------------------| + * | |<--[INDEX_RESTORABILITY chunk request]----| + * |<---[more JSON body]-----------------| | + * | |---[INDEX_RESTORABILITY chunk response]-->| ... + * . . . + * . . . 
+ * | |<--[response to master node request]------| + * |<--[final JSON to complete body]-----| | + * + * This message flow ties the lifecycle of the verification process to that of the transport request sent from coordinator to master, + * which means it integrates well with the tasks framework and handles network issues properly. An alternative would be for the + * coordinator to repeatedly request chunks from the master, but that would mean that there's no one task representing the whole + * process, and it'd be a little tricky for the master node to know if the coordinator has failed and the verification should be + * cancelled. + */ + + public static final ActionType<RepositoryVerifyIntegrityResponse> INSTANCE = new ActionType<>( + "cluster:admin/repository/verify_integrity" + ); + + private final ActiveRepositoryVerifyIntegrityTasks activeRepositoryVerifyIntegrityTasks = new ActiveRepositoryVerifyIntegrityTasks(); + + private final TransportService transportService; + private final ClusterService clusterService; + private final Executor managementExecutor; + + public static class Request extends ActionRequest { + private final RepositoryVerifyIntegrityParams requestParams; + private final RepositoryVerifyIntegrityResponseStream responseStream; + + public Request(RepositoryVerifyIntegrityParams requestParams, RepositoryVerifyIntegrityResponseStream responseStream) { + this.requestParams = requestParams; + this.responseStream = responseStream; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public RepositoryVerifyIntegrityParams requestParams() { + return requestParams; + } + + public RepositoryVerifyIntegrityResponseStream responseStream() { + return responseStream; + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) { + return new CancellableTask(id, type, action, getDescription(), parentTaskId, headers); + } + } + + @Inject + public TransportRepositoryVerifyIntegrityCoordinationAction( + TransportService transportService, + ClusterService clusterService, + RepositoriesService repositoriesService, + ActionFilters actionFilters + ) { + super( + INSTANCE.name(), + actionFilters, + transportService.getTaskManager(), + transportService.getThreadPool().executor(ThreadPool.Names.MANAGEMENT) + ); + + this.transportService = transportService; + this.clusterService = clusterService; + this.managementExecutor = transportService.getThreadPool().executor(ThreadPool.Names.MANAGEMENT); + + // register subsidiary actions + new TransportRepositoryVerifyIntegrityAction(transportService, repositoriesService, actionFilters, managementExecutor); + + new TransportRepositoryVerifyIntegrityResponseChunkAction( + transportService, + actionFilters, + managementExecutor, + activeRepositoryVerifyIntegrityTasks + ); + } + + @Override + protected void doExecute(Task task, Request request, ActionListener<RepositoryVerifyIntegrityResponse> listener) { + ActionListener.run( + ActionListener.releaseAfter( + listener, + activeRepositoryVerifyIntegrityTasks.registerResponseBuilder(task.getId(), request.responseStream()) + ), + l -> { + final var master = clusterService.state().nodes().getMasterNode(); + if (master == null) { + // no waiting around or retries here, we just fail immediately + throw new MasterNotDiscoveredException(); + } + transportService.sendChildRequest( + master, + TransportRepositoryVerifyIntegrityAction.ACTION_NAME, + new TransportRepositoryVerifyIntegrityAction.Request( + transportService.getLocalNode(), + task.getId(), + request.requestParams() + ), + task,
+ TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>(l, RepositoryVerifyIntegrityResponse::new, managementExecutor) + ); + } + ); + } +} diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/TransportRepositoryVerifyIntegrityResponseChunkAction.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/TransportRepositoryVerifyIntegrityResponseChunkAction.java new file mode 100644 index 0000000000000..9015866fb3ec2 --- /dev/null +++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/TransportRepositoryVerifyIntegrityResponseChunkAction.java @@ -0,0 +1,93 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.repositories.blobstore.testkit.integrity; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.Objects; +import java.util.concurrent.Executor; + +/** + * Transport action that handles a response chunk on the coordinating node, sending it out to the REST client. 
+ */ +class TransportRepositoryVerifyIntegrityResponseChunkAction extends HandledTransportAction< + TransportRepositoryVerifyIntegrityResponseChunkAction.Request, + ActionResponse.Empty> { + + static final String ACTION_NAME = TransportRepositoryVerifyIntegrityCoordinationAction.INSTANCE.name() + "[response_chunk]"; + + private final ActiveRepositoryVerifyIntegrityTasks activeRepositoryVerifyIntegrityTasks; + + TransportRepositoryVerifyIntegrityResponseChunkAction( + TransportService transportService, + ActionFilters actionFilters, + Executor executor, + ActiveRepositoryVerifyIntegrityTasks activeRepositoryVerifyIntegrityTasks + ) { + super(ACTION_NAME, transportService, actionFilters, Request::new, executor); + this.activeRepositoryVerifyIntegrityTasks = activeRepositoryVerifyIntegrityTasks; + } + + static class Request extends ActionRequest { + private final long coordinatingTaskId; + private final RepositoryVerifyIntegrityResponseChunk chunkContents; + + Request(long coordinatingTaskId, RepositoryVerifyIntegrityResponseChunk chunkContents) { + this.coordinatingTaskId = coordinatingTaskId; + this.chunkContents = Objects.requireNonNull(chunkContents); + } + + Request(StreamInput in) throws IOException { + super(in); + coordinatingTaskId = in.readVLong(); + chunkContents = new RepositoryVerifyIntegrityResponseChunk(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVLong(coordinatingTaskId); + chunkContents.writeTo(out); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public RepositoryVerifyIntegrityResponseChunk chunkContents() { + return chunkContents; + } + } + + @Override + protected void doExecute(Task task, Request request, ActionListener<ActionResponse.Empty> listener) { + ActionListener.run(listener, l -> { + final var responseStream = activeRepositoryVerifyIntegrityTasks.acquireResponseStream(request.coordinatingTaskId); + try { + if (request.chunkContents().type() == RepositoryVerifyIntegrityResponseChunk.Type.START_RESPONSE) { + responseStream.startResponse(() -> l.onResponse(ActionResponse.Empty.INSTANCE)); + } else { + responseStream.writeChunk(request.chunkContents(), () -> l.onResponse(ActionResponse.Empty.INSTANCE)); + } + } finally { + responseStream.decRef(); + } + }); + } +} diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/resources/META-INF/services/org.elasticsearch.features.FeatureSpecification b/x-pack/plugin/snapshot-repo-test-kit/src/main/resources/META-INF/services/org.elasticsearch.features.FeatureSpecification new file mode 100644 index 0000000000000..ae11c3bb39d0b --- /dev/null +++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/resources/META-INF/services/org.elasticsearch.features.FeatureSpecification @@ -0,0 +1,8 @@ +# +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the Elastic License +# 2.0; you may not use this file except in compliance with the Elastic License +# 2.0. +# + +org.elasticsearch.repositories.blobstore.testkit.SnapshotRepositoryTestKitFeatures
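Editor's note: the SnapshotRepositoryTestKitFeatures implementation registered by this service file is not shown in this excerpt. A minimal sketch of such a FeatureSpecification, assuming it publishes only the REPOSITORY_VERIFY_INTEGRITY_FEATURE declared in RestRepositoryVerifyIntegrityAction, would be:

    package org.elasticsearch.repositories.blobstore.testkit;

    import org.elasticsearch.features.FeatureSpecification;
    import org.elasticsearch.features.NodeFeature;
    import org.elasticsearch.repositories.blobstore.testkit.integrity.RestRepositoryVerifyIntegrityAction;

    import java.util.Set;

    // Sketch only: advertises the node feature so that clients and tests can detect
    // whether every node in the cluster supports the verify-integrity API.
    public class SnapshotRepositoryTestKitFeatures implements FeatureSpecification {
        @Override
        public Set<NodeFeature> getFeatures() {
            return Set.of(RestRepositoryVerifyIntegrityAction.REPOSITORY_VERIFY_INTEGRITY_FEATURE);
        }
    }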