diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThrottledIterator.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThrottledIterator.java
new file mode 100644
index 0000000000000..ea5c4e9753c1b
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThrottledIterator.java
@@ -0,0 +1,155 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.common.util.concurrent;
+
+import org.elasticsearch.core.AbstractRefCounted;
+import org.elasticsearch.core.RefCounted;
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
+import org.elasticsearch.core.Strings;
+import org.elasticsearch.logging.LogManager;
+import org.elasticsearch.logging.Logger;
+
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.concurrent.Semaphore;
+import java.util.function.BiConsumer;
+
+public class ThrottledIterator<T> implements Releasable {
+
+    private static final Logger logger = LogManager.getLogger(ThrottledIterator.class);
+
+    /**
+     * Iterate through the given collection, performing an operation on each item which may fork background tasks, but with a limit on the
+     * number of such background tasks running concurrently to avoid overwhelming the rest of the system (e.g. starving other work of
+     * access to an executor).
+     *
+     * @param iterator The items to iterate. May be accessed by multiple threads, but accesses are all protected by synchronizing on
+     *                 itself.
+     * @param itemConsumer The operation to perform on each item. Each operation receives a {@link RefCounted} which can be used to track
+     *                     the execution of any background tasks spawned for this item. This operation may run on the thread which
+     *                     originally called {@link #run}, if this method has not yet returned. Otherwise it will run on a thread on which
+     *                     a background task previously called {@link RefCounted#decRef()} on its ref count. This operation should not
+     *                     throw any exceptions.
+     * @param maxConcurrency The maximum number of ongoing operations at any time.
+     * @param onItemCompletion Executed when each item is completed, which can be used for instance to report on progress. Must not throw
+     *                         exceptions.
+     * @param onCompletion Executed when all items are completed.
+     */
+    public static <T> void run(
+        Iterator<T> iterator,
+        BiConsumer<Releasable, T> itemConsumer,
+        int maxConcurrency,
+        Runnable onItemCompletion,
+        Runnable onCompletion
+    ) {
+        try (var throttledIterator = new ThrottledIterator<>(iterator, itemConsumer, maxConcurrency, onItemCompletion, onCompletion)) {
+            throttledIterator.run();
+        }
+    }
+
+    private final RefCounted throttleRefs;
+    private final Iterator<T> iterator;
+    private final BiConsumer<Releasable, T> itemConsumer;
+    private final Semaphore permits;
+    private final Runnable onItemCompletion;
+
+    private ThrottledIterator(
+        Iterator<T> iterator,
+        BiConsumer<Releasable, T> itemConsumer,
+        int maxConcurrency,
+        Runnable onItemCompletion,
+        Runnable onCompletion
+    ) {
+        this.iterator = Objects.requireNonNull(iterator);
+        this.itemConsumer = Objects.requireNonNull(itemConsumer);
+        if (maxConcurrency <= 0) {
+            throw new IllegalArgumentException("maxConcurrency must be positive");
+        }
+        this.permits = new Semaphore(maxConcurrency);
+        this.onItemCompletion = Objects.requireNonNull(onItemCompletion);
+        this.throttleRefs = AbstractRefCounted.of(onCompletion);
+    }
+
+    private void run() {
+        while (permits.tryAcquire()) {
+            final T item;
+            synchronized (iterator) {
+                if (iterator.hasNext()) {
+                    item = iterator.next();
+                } else {
+                    permits.release();
+                    return;
+                }
+            }
+            try (var itemRefs = new ItemRefCounted()) {
+                // TODO simplify, there's always exactly two refs?
+                itemRefs.incRef();
+                itemConsumer.accept(Releasables.releaseOnce(itemRefs::decRef), item);
+            } catch (Exception e) {
+                logger.error(Strings.format("exception when processing [%s] with [%s]", item, itemConsumer), e);
+                assert false : e;
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        throttleRefs.decRef();
+    }
+
+    // A RefCounted for a single item, including protection against calling back into run() if it's created and closed within a single
+    // invocation of run().
+    private class ItemRefCounted extends AbstractRefCounted implements Releasable {
+        private boolean isRecursive = true;
+
+        ItemRefCounted() {
+            throttleRefs.incRef();
+        }
+
+        @Override
+        protected void closeInternal() {
+            try {
+                onItemCompletion.run();
+            } catch (Exception e) {
+                logger.error("exception in onItemCompletion", e);
+                assert false : e;
+            } finally {
+                permits.release();
+                try {
+                    // Someone must now pick up the next item. Here we might be called from the run() invocation which started processing
+                    // the just-completed item (via close() -> decRef()) if that item's processing didn't fork or all its forked tasks
+                    // finished first. If so, there's no need to call run() here, we can just return and the next iteration of the run()
+                    // loop will continue the processing; moreover calling run() in this situation could lead to a stack overflow. However
+                    // if we're not within that run() invocation then ...
+                    if (isRecursive() == false) {
+                        // ... we're not within any other run() invocation either, so it's safe (and necessary) to call run() here.
+                        run();
+                    }
+                } finally {
+                    throttleRefs.decRef();
+                }
+            }
+        }
+
+        // Note on blocking: we call both of these synchronized methods exactly once (and must enter close() before calling
+        // isRecursive()). If close() releases the last ref and calls closeInternal(), and hence isRecursive(), then there's no other
+        // threads involved and hence no blocking. In contrast if close() doesn't release the last ref then it exits immediately, so the
+        // call to isRecursive() will proceed without delay in this case too.
+
+        private synchronized boolean isRecursive() {
+            return isRecursive;
+        }
+
+        @Override
+        public synchronized void close() {
+            decRef();
+            isRecursive = false;
+        }
+    }
+}
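
For orientation, here is a minimal usage sketch of the new utility, not part of the change itself: the `executor`, `process` method and logging are hypothetical stand-ins. Each item may fork background work; closing the per-item `Releasable` marks the item complete and admits the next one.

```java
// Hypothetical caller: walk 1000 item IDs with at most 5 in flight at once.
ThrottledIterator.run(
    IntStream.range(0, 1000).boxed().iterator(),
    (releasable, item) -> executor.execute(() -> {   // fork background work for this item
        try {
            process(item);                           // hypothetical per-item work, possibly slow
        } finally {
            releasable.close();                      // completes this item, admitting the next one
        }
    }),
    5,                                               // maxConcurrency
    () -> logger.debug("one more item done"),        // onItemCompletion
    () -> logger.info("all items done")              // onCompletion
);
```

If the consumer never forks (or the forked work finishes before the consumer returns), the `isRecursive` machinery above prevents the completion callback from re-entering `run()` and overflowing the stack.
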
diff --git a/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java b/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java
index baf0c9ad08059..7f5c50ceaf421 100644
--- a/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java
+++ b/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java
@@ -17,6 +17,7 @@
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.xcontent.XContentParserUtils;
 import org.elasticsearch.core.Nullable;
+import org.elasticsearch.gateway.CorruptStateException;
 import org.elasticsearch.index.store.StoreFileMetadata;
 import org.elasticsearch.xcontent.ParseField;
 import org.elasticsearch.xcontent.ToXContentFragment;
@@ -318,7 +319,10 @@ public static FileInfo fromXContent(XContentParser parser) throws IOException {
                     }
                     case WRITER_UUID -> {
                         writerUuid = new BytesRef(parser.binaryValue());
-                        assert writerUuid.length > 0;
+                        assert BlobStoreIndexShardSnapshots.INTEGRITY_ASSERTIONS_ENABLED == false || writerUuid.length > 0;
+                        if (writerUuid.length == 0) {
+                            throw new ElasticsearchParseException("invalid (empty) writer uuid");
+                        }
                     }
                     default -> XContentParserUtils.throwUnknownField(currentFieldName, parser);
                 }
@@ -336,6 +340,11 @@ public static FileInfo fromXContent(XContentParser parser) throws IOException {
         } else if (checksum == null) {
             throw new ElasticsearchParseException("missing checksum for name [" + name + "]");
         }
+        try {
+            org.apache.lucene.util.Version.parse(writtenBy);
+        } catch (Exception e) {
+            throw new ElasticsearchParseException("invalid written_by [" + writtenBy + "]");
+        }
         return new FileInfo(name, new StoreFileMetadata(physicalName, length, checksum, writtenBy, metaHash, writerUuid), partSize);
     }
@@ -571,6 +580,13 @@ public static BlobStoreIndexShardSnapshot fromXContent(XContentParser parser) throws IOException {
             }
         }
 
+        if (snapshot == null) {
+            throw new CorruptStateException("snapshot missing");
+        }
+        if (indexVersion < 0) {
+            throw new CorruptStateException("index version missing or corrupt");
+        }
+
         return new BlobStoreIndexShardSnapshot(
             snapshot,
             indexVersion,
diff --git a/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshots.java b/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshots.java
index 113d3c8f28a19..2f022e1c34394 100644
--- a/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshots.java
+++ b/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshots.java
@@ -256,6 +256,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         return builder;
     }
 
+    static volatile boolean INTEGRITY_ASSERTIONS_ENABLED = true;
+
     public static BlobStoreIndexShardSnapshots fromXContent(XContentParser parser) throws IOException {
         XContentParser.Token token = parser.currentToken();
         if (token == null) { // New parser
@@ -309,7 +311,11 @@ public static BlobStoreIndexShardSnapshots fromXContent(XContentParser parser) throws IOException {
             List<FileInfo> fileInfosBuilder = new ArrayList<>();
             for (String file : entry.v2()) {
                 FileInfo fileInfo = files.get(file);
-                assert fileInfo != null;
+                if (fileInfo == null) {
+                    final var exception = new IllegalStateException("shard index inconsistent at file [" + file + "]");
+                    assert INTEGRITY_ASSERTIONS_ENABLED == false : exception;
+                    throw exception;
+                }
                 fileInfosBuilder.add(fileInfo);
             }
             snapshots.add(new SnapshotFiles(entry.v1(), Collections.unmodifiableList(fileInfosBuilder), historyUUIDs.get(entry.v1())));
diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java
index 089b5a6e639ba..93cc618236c6d 100644
--- a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java
+++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java
@@ -275,6 +275,10 @@ public Collection<SnapshotId> getSnapshotIds() {
         return snapshotIds.values();
     }
 
+    public long getIndexSnapshotCount() {
+        return indexSnapshots.values().stream().mapToLong(List::size).sum();
+    }
+
     /**
      * @return whether some of the {@link SnapshotDetails} of the given snapshot are missing, due to BwC, so that they must be loaded from
      * the {@link SnapshotInfo} blob instead.
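
The two parser changes above share one pattern: a check that used to be a bare `assert` now throws a real exception in production while still tripping assertions in `-ea` test runs, unless a test deliberately clears the `INTEGRITY_ASSERTIONS_ENABLED` flag. Distilled into a standalone sketch (the null check and names are hypothetical, only the idiom is from the change):

```java
// Sketch of the assert-then-throw idiom introduced above (hypothetical check).
static volatile boolean INTEGRITY_ASSERTIONS_ENABLED = true;

static void checkInvariant(Object value) {
    if (value == null) {
        final var exception = new IllegalStateException("integrity violation");
        // In tests (where assertions are enabled) this fails loudly, unless a
        // suppressor has cleared the flag because the test breaks the repo on purpose.
        assert INTEGRITY_ASSERTIONS_ENABLED == false : exception;
        // In production assertions are disabled, so surface a real exception rather
        // than silently continuing with corrupt state.
        throw exception;
    }
}
```
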
diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/ThrottledIteratorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/ThrottledIteratorTests.java
new file mode 100644
index 0000000000000..9521677e2db5f
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/ThrottledIteratorTests.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.common.util.concurrent;
+
+import org.apache.lucene.tests.util.LuceneTestCase;
+import org.elasticsearch.action.support.RefCountingRunnable;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.FixedExecutorBuilder;
+import org.elasticsearch.threadpool.ScalingExecutorBuilder;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BooleanSupplier;
+import java.util.stream.IntStream;
+
+public class ThrottledIteratorTests extends ESTestCase {
+    private static final String CONSTRAINED = "constrained";
+    private static final String RELAXED = "relaxed";
+
+    public void testConcurrency() throws InterruptedException {
+        final var maxConstrainedThreads = between(1, 3);
+        final var maxRelaxedThreads = between(1, 100);
+        final var constrainedQueue = between(3, 6);
+        final var threadPool = new TestThreadPool(
+            "test",
+            new FixedExecutorBuilder(Settings.EMPTY, CONSTRAINED, maxConstrainedThreads, constrainedQueue, CONSTRAINED, false),
+            new ScalingExecutorBuilder(RELAXED, 1, maxRelaxedThreads, TimeValue.timeValueSeconds(30), true)
+        );
+        try {
+            final var items = between(1, 10000); // large enough that inadvertent recursion will trigger a StackOverflowError
+            final var itemStartLatch = new CountDownLatch(items);
+            final var completedItems = new AtomicInteger();
+            final var maxConcurrency = between(1, (constrainedQueue + maxConstrainedThreads) * 2);
+            final var itemPermits = new Semaphore(maxConcurrency);
+            final var completionLatch = new CountDownLatch(1);
+            final BooleanSupplier forkSupplier = randomFrom(
+                () -> false,
+                ESTestCase::randomBoolean,
+                LuceneTestCase::rarely,
+                LuceneTestCase::usually,
+                () -> true
+            );
+            final var blockPermits = new Semaphore(between(0, Math.min(maxRelaxedThreads, maxConcurrency) - 1));
+
+            ThrottledIterator.run(IntStream.range(0, items).boxed().iterator(), (releasable, item) -> {
+                try (var refs = new RefCountingRunnable(releasable::close)) {
+                    assertTrue(itemPermits.tryAcquire());
+                    if (forkSupplier.getAsBoolean()) {
+                        var ref = refs.acquire();
+                        final var executor = randomFrom(CONSTRAINED, RELAXED);
+                        threadPool.executor(executor).execute(new AbstractRunnable() {
+
+                            @Override
+                            public void onRejection(Exception e) {
+                                assertEquals(CONSTRAINED, executor);
+                                itemStartLatch.countDown();
+                            }
+
+                            @Override
+                            protected void doRun() {
+                                itemStartLatch.countDown();
+                                if (RELAXED.equals(executor) && randomBoolean() && blockPermits.tryAcquire()) {
+                                    // simulate at most (maxConcurrency-1) long-running operations, to demonstrate that they don't
+                                    // hold up the processing of the other operations
+                                    try {
+                                        assertTrue(itemStartLatch.await(30, TimeUnit.SECONDS));
+                                    } catch (InterruptedException e) {
+                                        throw new AssertionError("unexpected", e);
+                                    } finally {
+                                        blockPermits.release();
+                                    }
+                                }
+                            }
+
+                            @Override
+                            public void onAfter() {
+                                itemPermits.release();
+                                ref.close();
+                            }
+
+                            @Override
+                            public void onFailure(Exception e) {
+                                throw new AssertionError("unexpected", e);
+                            }
+                        });
+                    } else {
+                        itemStartLatch.countDown();
+                        itemPermits.release();
+                    }
+                }
+            }, maxConcurrency, completedItems::incrementAndGet, completionLatch::countDown);
+
+            assertTrue(completionLatch.await(30, TimeUnit.SECONDS));
+            assertEquals(items, completedItems.get());
+            assertTrue(itemPermits.tryAcquire(maxConcurrency));
+            assertTrue(itemStartLatch.await(0, TimeUnit.SECONDS));
+        } finally {
+            ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
+        }
+    }
+}
diff --git a/test/framework/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshotsIntegritySuppressor.java b/test/framework/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshotsIntegritySuppressor.java
new file mode 100644
index 0000000000000..d22db399a2f34
--- /dev/null
+++ b/test/framework/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshotsIntegritySuppressor.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.index.snapshots.blobstore;
+
+import org.elasticsearch.core.Releasable;
+
+public class BlobStoreIndexShardSnapshotsIntegritySuppressor implements Releasable {
+
+    public BlobStoreIndexShardSnapshotsIntegritySuppressor() {
+        BlobStoreIndexShardSnapshots.INTEGRITY_ASSERTIONS_ENABLED = false;
+    }
+
+    @Override
+    public void close() {
+        BlobStoreIndexShardSnapshots.INTEGRITY_ASSERTIONS_ENABLED = true;
+    }
+}
diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java
index c0db57f84b96a..ee79b15dfa370 100644
--- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java
+++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java
@@ -53,6 +53,7 @@ public class Constants {
         "cluster:admin/repository/get",
         "cluster:admin/repository/put",
         "cluster:admin/repository/verify",
+        "cluster:admin/repository/verify_integrity",
         "cluster:admin/reroute",
         "cluster:admin/script/delete",
         "cluster:admin/script/get",
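
The suppressor is a `Releasable`, so tests that intentionally damage repository contents can scope the suppression precisely; the integration test below does this in its `@Before`/`@After` methods. In try-with-resources form the same thing looks roughly like this (the test body is a hypothetical placeholder):

```java
// Scoped suppression: integrity assertions are re-enabled by close().
try (var ignored = new BlobStoreIndexShardSnapshotsIntegritySuppressor()) {
    corruptRepositoryAndRunChecks(); // hypothetical test logic that breaks the repo on purpose
}
```
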
diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/integrity/BlobStoreMetadataIntegrityIT.java b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/integrity/BlobStoreMetadataIntegrityIT.java
new file mode 100644
index 0000000000000..92f0143ae7f8e
--- /dev/null
+++ b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/integrity/BlobStoreMetadataIntegrityIT.java
@@ -0,0 +1,427 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.repositories.blobstore.integrity;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.util.CollectionUtils;
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.query.ExistsQueryBuilder;
+import org.elasticsearch.index.query.TermQueryBuilder;
+import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshotsIntegritySuppressor;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.repositories.RepositoriesService;
+import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
+import org.elasticsearch.repositories.blobstore.testkit.SnapshotRepositoryTestKit;
+import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
+import org.elasticsearch.snapshots.SnapshotState;
+import org.elasticsearch.snapshots.mockstore.MockRepository;
+import org.elasticsearch.tasks.TaskInfo;
+import org.elasticsearch.test.CorruptionUtils;
+import org.elasticsearch.test.junit.annotations.TestLogging;
+import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
+import org.junit.After;
+import org.junit.Before;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.not;
+
+public class BlobStoreMetadataIntegrityIT extends AbstractSnapshotIntegTestCase {
+
+    private static final String REPOSITORY_NAME = "test-repo";
+
+    private Releasable integrityCheckSuppressor;
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return CollectionUtils.appendToCopy(
+            CollectionUtils.appendToCopy(super.nodePlugins(), LocalStateCompositeXPackPlugin.class),
+            SnapshotRepositoryTestKit.class
+        );
+    }
+
+    @Before
+    public void suppressIntegrityChecks() {
+        disableRepoConsistencyCheck("testing integrity checks involves breaking the repo");
+        assertNull(integrityCheckSuppressor);
+        integrityCheckSuppressor = new BlobStoreIndexShardSnapshotsIntegritySuppressor();
+    }
+
+    @After
+    public void enableIntegrityChecks() {
+        Releasables.closeExpectNoException(integrityCheckSuppressor);
+        integrityCheckSuppressor = null;
+    }
+
+    @TestLogging(reason = "testing", value = "org.elasticsearch.repositories.blobstore.integrity.MetadataVerifier:DEBUG")
+    public void testIntegrityCheck() throws Exception {
+        final var repoPath = randomRepoPath();
+        createRepository(
+            REPOSITORY_NAME,
+            "mock",
+            Settings.builder().put(BlobStoreRepository.SUPPORT_URL_REPO.getKey(), false).put("location", repoPath)
+        );
+        final MockRepository repository = (MockRepository) internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class)
+            .repository(REPOSITORY_NAME);
+
+        final var indexCount = between(1, 3);
+        for (int i = 0; i < indexCount; i++) {
+            createIndexWithRandomDocs("test-index-" + i, between(1, 1000));
+        }
+
+        final var snapshotCount = between(2, 4);
+        for (int snapshotIndex = 0; snapshotIndex < snapshotCount; snapshotIndex++) {
+            final var indexRequests = new ArrayList<IndexRequestBuilder>();
+            for (int i = 0; i < indexCount; i++) {
+                if (randomBoolean()) {
+                    final var indexName = "test-index-" + i;
+                    if (randomBoolean()) {
+                        assertAcked(client().admin().indices().prepareDelete(indexName));
+                        createIndexWithRandomDocs(indexName, between(1, 1000));
+                    }
+                    final var numDocs = between(1, 1000);
+                    for (int doc = 0; doc < numDocs; doc++) {
+                        indexRequests.add(client().prepareIndex(indexName).setSource("field1", "bar " + doc));
+                    }
+                }
+            }
+            indexRandom(true, indexRequests);
+            assertEquals(0, client().admin().indices().prepareFlush().get().getFailedShards());
+            final var snapshotInfo = clusterAdmin().prepareCreateSnapshot(REPOSITORY_NAME, "test-snapshot-" + snapshotIndex)
+                .setIncludeGlobalState(randomBoolean())
+                .setWaitForCompletion(true)
+                .get()
+                .getSnapshotInfo();
+            assertThat(snapshotInfo.successfulShards(), is(snapshotInfo.totalShards()));
+            assertThat(snapshotInfo.state(), is(SnapshotState.SUCCESS));
+        }
+
+        repository.setBlockOnReadIndexMeta();
+
+        final var tasksFuture = new PlainActionFuture<List<TaskInfo>>();
+        repository.threadPool().generic().execute(() -> {
+            try {
+                assertBusy(() -> assertTrue(repository.blocked()));
+            } catch (Exception e) {
+                throw new AssertionError(e);
+            }
+
+            ActionListener.completeWith(
+                tasksFuture,
+                () -> client().admin()
+                    .cluster()
+                    .prepareListTasks()
+                    .setDetailed(true)
+                    .get()
+                    .getTasks()
+                    .stream()
+                    .filter(t -> t.action().equals(VerifyRepositoryIntegrityAction.NAME) && t.status() != null)
+                    .toList()
+            );
+
+            repository.unblock();
+        });
+
+        verifyAndAssertSuccessful(indexCount);
+
+        final var tasks = tasksFuture.actionGet(30, TimeUnit.SECONDS);
+        assertThat(tasks, not(empty()));
+        for (TaskInfo task : tasks) {
+            if (task.status() instanceof VerifyRepositoryIntegrityAction.Status status) {
+                assertEquals(REPOSITORY_NAME, status.repositoryName());
+                assertThat(status.repositoryGeneration(), greaterThan(0L));
+                assertEquals(snapshotCount, status.snapshotCount());
+                assertEquals(snapshotCount, status.snapshotsVerified());
+                assertEquals(indexCount, status.indexCount());
+                assertEquals(0, status.indicesVerified());
+                assertThat(status.indexSnapshotCount(), greaterThanOrEqualTo((long) indexCount));
+                assertEquals(0, status.indexSnapshotsVerified());
+                assertEquals(0, status.anomalyCount());
+                assertEquals(0, status.blobsVerified());
+                assertEquals(0, status.blobBytesVerified());
+            } else {
+                assert false : Strings.toString(task);
+            }
+        }
+
+        final var tempDir = createTempDir();
+
+        final var repositoryData = PlainActionFuture.get(repository::getRepositoryData, 10, TimeUnit.SECONDS);
+        final var repositoryDataBlob = repoPath.resolve("index-" + repositoryData.getGenId());
+
+        final List<Path> blobs;
+        try (var paths = Files.walk(repoPath)) {
+            blobs = paths.filter(path -> Files.isRegularFile(path) && path.equals(repositoryDataBlob) == false).sorted().toList();
+        }
+
+        for (int i = 0; i < 20; i++) {
+            final var blobToDamage = randomFrom(blobs);
+            final var isDataBlob = blobToDamage.getFileName().toString().startsWith(BlobStoreRepository.UPLOADED_DATA_BLOB_PREFIX);
+            final var truncate = randomBoolean();
+            final var corrupt = randomBoolean();
+            if (truncate) {
+                logger.info("--> truncating {}", blobToDamage);
+                Files.copy(blobToDamage, tempDir.resolve("tmp"));
+                Files.write(blobToDamage, new byte[0]);
+            } else if (corrupt) {
+                logger.info("--> corrupting {}", blobToDamage);
+                Files.copy(blobToDamage, tempDir.resolve("tmp"));
+                CorruptionUtils.corruptFile(random(), blobToDamage);
+            } else {
+                logger.info("--> deleting {}", blobToDamage);
+                Files.move(blobToDamage, tempDir.resolve("tmp"));
+            }
+            try {
+                // TODO include some cancellation tests
+
+                verifyAndGetAnomalies(indexCount, repoPath.relativize(blobToDamage), truncate, corrupt);
+
+                //
+                // final var isCancelled = new AtomicBoolean();
+                //
+                // final var verificationResponse = PlainActionFuture.get(
+                //     (PlainActionFuture listener) -> repository.verifyMetadataIntegrity(
+                //         client(),
+                //         () -> new RecyclerBytesStreamOutput(NON_RECYCLING_INSTANCE),
+                //         request,
+                //         listener,
+                //         () -> {
+                //             if (rarely() && rarely()) {
+                //                 isCancelled.set(true);
+                //                 return true;
+                //             }
+                //             return isCancelled.get();
+                //         }
+                //     ),
+                //     30,
+                //     TimeUnit.SECONDS
+                // );
+                // for (SearchHit hit : client().prepareSearch("metadata_verification_results").setSize(10000).get().getHits().getHits()) {
+                //     logger.info("--> {}", Strings.toString(hit));
+                // }
+                // assertThat(verificationResponse, not(nullValue()));
+                // final var responseString = verificationResponse.stream().map(Throwable::getMessage).collect(Collectors.joining("\n"));
+                // if (isCancelled.get()) {
+                //     assertThat(responseString, containsString("verification task cancelled before completion"));
+                // }
+                // if (isDataBlob && isCancelled.get() == false) {
+                //     assertThat(
+                //         responseString,
+                //         allOf(containsString(blobToDamage.getFileName().toString()), containsString("missing blob"))
+                //     );
+                // }
+            } finally {
+                Files.deleteIfExists(blobToDamage);
+                Files.move(tempDir.resolve("tmp"), blobToDamage);
+            }
+
+            verifyAndAssertSuccessful(indexCount);
+        }
+    }
+
+    private void verifyAndAssertSuccessful(int indexCount) {
+        final var waitForCompletion = randomBoolean();
+
+        final var response = PlainActionFuture.get(
+            listener -> client().execute(
+                VerifyRepositoryIntegrityAction.INSTANCE,
+                new VerifyRepositoryIntegrityAction.Request(
+                    REPOSITORY_NAME,
+                    "",
+                    0,
+                    0,
+                    0,
+                    0,
+                    0,
+                    randomBoolean(),
+                    ByteSizeValue.ofMb(10),
+                    waitForCompletion
+                ),
+                listener
+            ),
+            30,
+            TimeUnit.SECONDS
+        );
+        logger.info("--> waitForCompletion[{}]: response {}", waitForCompletion, response);
+        if (waitForCompletion == false) {
+            client().admin()
+                .cluster()
+                .prepareListTasks()
+                .setTargetTaskId(response.getTaskId())
+                .setWaitForCompletion(true)
+                .get(TimeValue.timeValueSeconds(30));
+
+            assertThat(
+                client().admin()
+                    .cluster()
+                    .prepareListTasks()
+                    .setTargetTaskId(response.getTaskId())
+                    .get(TimeValue.timeValueSeconds(10))
+                    .getTasks(),
+                empty()
+            );
+        }
+        assertEquals(
+            0,
+            client().prepareSearch(response.getResultsIndex())
+                .setSize(0)
+                .setQuery(new ExistsQueryBuilder("anomaly"))
+                .get()
+                .getHits()
+                .getTotalHits().value
+        );
+        assertEquals(
+            indexCount,
+            client().prepareSearch(response.getResultsIndex())
+                .setSize(0)
+                .setQuery(new ExistsQueryBuilder("restorability"))
+                .setTrackTotalHits(true)
+                .get()
+                .getHits()
+                .getTotalHits().value
+        );
+        assertEquals(
+            indexCount,
+            client().prepareSearch(response.getResultsIndex())
+                .setSize(0)
+                .setQuery(new TermQueryBuilder("restorability", "full"))
+                .setTrackTotalHits(true)
+                .get()
+                .getHits()
+                .getTotalHits().value
+        );
+        assertEquals(
+            0,
+            client().prepareSearch(response.getResultsIndex())
+                .setSize(1)
+                .setQuery(new TermQueryBuilder("completed", true))
+                .get()
+                .getHits()
+                .getHits()[0].getSourceAsMap().get("total_anomalies")
+        );
+        assertAcked(client().admin().indices().prepareDelete(response.getResultsIndex()));
+    }
+
+    private void verifyAndGetAnomalies(long indexCount, Path damagedBlob, boolean truncate, boolean corrupt) {
+        final var damagedFileName = damagedBlob.getFileName().toString();
+        final var isDataBlob = damagedFileName.startsWith(BlobStoreRepository.UPLOADED_DATA_BLOB_PREFIX);
+        final var response = PlainActionFuture.get(
+            listener -> client().execute(
+                VerifyRepositoryIntegrityAction.INSTANCE,
+                new VerifyRepositoryIntegrityAction.Request(
+                    REPOSITORY_NAME,
+                    "",
+                    0,
+                    0,
+                    0,
+                    0,
+                    0,
+                    (isDataBlob && truncate == false && corrupt) || randomBoolean(),
+                    ByteSizeValue.ofMb(10),
+                    true
+                ),
+                listener
+            ),
+            30,
+            TimeUnit.SECONDS
+        );
+        assertThat(
+            client().prepareSearch(response.getResultsIndex())
+                .setSize(10000)
+                .setQuery(new ExistsQueryBuilder("anomaly"))
+                .get()
+                .getHits()
+                .getTotalHits().value,
+            greaterThan(0L)
+        );
+        assertEquals(
+            indexCount,
+            client().prepareSearch(response.getResultsIndex())
+                .setSize(0)
+                .setQuery(new ExistsQueryBuilder("restorability"))
+                .setTrackTotalHits(true)
+                .get()
+                .getHits()
+                .getTotalHits().value
+        );
+        assertThat(
+            client().prepareSearch(response.getResultsIndex())
+                .setSize(0)
+                .setQuery(new TermQueryBuilder("restorability", "full"))
+                .setTrackTotalHits(true)
+                .get()
+                .getHits()
+                .getTotalHits().value,
+            damagedFileName.startsWith(BlobStoreRepository.SNAPSHOT_PREFIX)
+                || isDataBlob
+                || (damagedFileName.startsWith(BlobStoreRepository.METADATA_PREFIX) && damagedBlob.startsWith("indices"))
+                    ? lessThan(indexCount)
+                    : equalTo(indexCount)
+        );
+        assertThat(
+            (int) client().prepareSearch(response.getResultsIndex())
+                .setSize(1)
+                .setQuery(new TermQueryBuilder("completed", true))
+                .get()
+                .getHits()
+                .getHits()[0].getSourceAsMap().get("total_anomalies"),
+            greaterThan(0)
+        );
+        if (damagedBlob.toString().startsWith(BlobStoreRepository.SNAPSHOT_PREFIX)) {
+            assertAnomaly(response.getResultsIndex(), MetadataVerifier.Anomaly.FAILED_TO_LOAD_SNAPSHOT_INFO);
+        } else if (damagedFileName.startsWith(BlobStoreRepository.SNAPSHOT_PREFIX)) {
+            assertAnomaly(response.getResultsIndex(), MetadataVerifier.Anomaly.FAILED_TO_LOAD_SHARD_SNAPSHOT);
+        } else if (damagedBlob.toString().startsWith(BlobStoreRepository.METADATA_PREFIX)) {
+            assertAnomaly(response.getResultsIndex(), MetadataVerifier.Anomaly.FAILED_TO_LOAD_GLOBAL_METADATA);
+        } else if (damagedFileName.startsWith(BlobStoreRepository.METADATA_PREFIX)) {
+            assertAnomaly(response.getResultsIndex(), MetadataVerifier.Anomaly.FAILED_TO_LOAD_INDEX_METADATA);
+        } else if (damagedFileName.startsWith(BlobStoreRepository.INDEX_FILE_PREFIX)) {
+            assertAnomaly(response.getResultsIndex(), MetadataVerifier.Anomaly.FAILED_TO_LOAD_SHARD_GENERATION);
+        } else if (isDataBlob) {
+            assertAnomaly(
+                response.getResultsIndex(),
+                truncate ? MetadataVerifier.Anomaly.MISMATCHED_BLOB_LENGTH
+                    : corrupt ? MetadataVerifier.Anomaly.CORRUPT_DATA_BLOB
+                    : MetadataVerifier.Anomaly.MISSING_BLOB
+            );
+        }
+        assertAcked(client().admin().indices().prepareDelete(response.getResultsIndex()));
+    }
+
+    private void assertAnomaly(String resultsIndex, MetadataVerifier.Anomaly anomaly) {
+        assertThat(
+            client().prepareSearch(resultsIndex)
+                .setSize(0)
+                .setQuery(new TermQueryBuilder("anomaly", anomaly.toString()))
+                .setTrackTotalHits(true)
+                .get()
+                .getHits()
+                .getTotalHits().value,
+            greaterThan(0L)
+        );
+    }
+}
diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/integrity/MetadataVerifier.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/integrity/MetadataVerifier.java
new file mode 100644
index 0000000000000..8d81f9242ab01
--- /dev/null
+++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/integrity/MetadataVerifier.java
@@ -0,0 +1,1138 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.repositories.blobstore.integrity;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.RateLimiter;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ResourceAlreadyExistsException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRunnable;
+import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.index.IndexAction;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.support.ListenableActionFuture;
+import org.elasticsearch.action.support.RefCountingRunnable;
+import org.elasticsearch.client.internal.node.NodeClient;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.common.blobstore.support.BlobMetadata;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.time.DateFormatter;
+import org.elasticsearch.common.time.FormatNames;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.util.CancellableThreads;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
+import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.common.util.concurrent.ThrottledIterator;
+import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
+import org.elasticsearch.core.CheckedConsumer;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Strings;
+import org.elasticsearch.core.Tuple;
+import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
+import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots;
+import org.elasticsearch.index.snapshots.blobstore.RateLimitingInputStream;
+import org.elasticsearch.index.snapshots.blobstore.SlicedInputStream;
+import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles;
+import org.elasticsearch.repositories.IndexId;
+import org.elasticsearch.repositories.RepositoryData;
+import org.elasticsearch.repositories.ShardGeneration;
+import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
+import org.elasticsearch.snapshots.SnapshotId;
+import org.elasticsearch.snapshots.SnapshotInfo;
+import org.elasticsearch.tasks.TaskCancelledException;
+import org.elasticsearch.tasks.TaskId;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xcontent.ToXContent;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.XContentFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+import java.util.function.BooleanSupplier;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
+
+public class MetadataVerifier implements Releasable {
+    private static final Logger logger = LogManager.getLogger(MetadataVerifier.class);
+
+    public enum Anomaly {
+        FAILED_TO_LOAD_SNAPSHOT_INFO,
+        FAILED_TO_LOAD_GLOBAL_METADATA,
+        FAILED_TO_LOAD_SHARD_SNAPSHOT,
+        FAILED_TO_LOAD_INDEX_METADATA,
+        FAILED_TO_LIST_SHARD_CONTAINER,
+        FAILED_TO_LOAD_SHARD_GENERATION,
+        MISSING_BLOB,
+        MISMATCHED_BLOB_LENGTH,
+        CORRUPT_DATA_BLOB,
+        UNDEFINED_SHARD_GENERATION,
+        UNEXPECTED_EXCEPTION,
+        FILE_IN_SHARD_GENERATION_NOT_SNAPSHOT,
+        SNAPSHOT_SHARD_GENERATION_MISMATCH,
+        FILE_IN_SNAPSHOT_NOT_SHARD_GENERATION,
+        UNKNOWN_SNAPSHOT_FOR_INDEX,
+        SNAPSHOT_NOT_IN_SHARD_GENERATION,
+    }
+
+    private static void mappedField(XContentBuilder builder, String fieldName, String type) throws IOException {
+        builder.startObject(fieldName).field("type", type).endObject();
+    }
+
+    public static void run(
+        BlobStoreRepository blobStoreRepository,
+        NodeClient client,
+        VerifyRepositoryIntegrityAction.Request verifyRequest,
+        CancellableThreads cancellableThreads,
+        VerifyRepositoryIntegrityAction.Task backgroundTask,
+        ActionListener<VerifyRepositoryIntegrityAction.Response> foregroundTaskListener,
+        ActionListener<Void> backgroundTaskListener
+    ) {
+        logger.info(
+            "[{}] verifying metadata integrity and writing results to [{}]",
+            verifyRequest.getRepository(),
+            verifyRequest.getResultsIndex()
+        );
+
+        final var repositoryDataFuture = new ListenableActionFuture<RepositoryData>();
+        blobStoreRepository.getRepositoryData(repositoryDataFuture);
+
+        final var createIndex = client.admin()
+            .indices()
+            .prepareCreate(verifyRequest.getResultsIndex())
+            .setSettings(Settings.builder().put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-1"));
+
+        try (var builder = XContentFactory.jsonBuilder()) {
+            builder.startObject().startObject("_doc").field("dynamic", "strict").startObject("properties");
+
+            mappedField(builder, "@timestamp", "date");
+            mappedField(builder, "task", "keyword");
+            mappedField(builder, "repository", "keyword");
+            mappedField(builder, "uuid", "keyword");
+            mappedField(builder, "repository_generation", "long");
+            mappedField(builder, "final_repository_generation", "long");
+            mappedField(builder, "completed", "boolean");
+            mappedField(builder, "cancelled", "boolean");
+            mappedField(builder, "total_anomalies", "long");
+
+            builder.startObject("error").field("type", "object").field("dynamic", "false").endObject();
+
+            builder.startObject("snapshot").startObject("properties");
+            mappedField(builder, "name", "keyword");
+            mappedField(builder, "id", "keyword");
+            mappedField(builder, "start_time", "date");
+            mappedField(builder, "end_time", "date");
+            builder.endObject().endObject();
+
+            builder.startObject("index").startObject("properties");
+            mappedField(builder, "name", "keyword");
+            mappedField(builder, "id", "keyword");
+            mappedField(builder, "metadata_blob", "keyword");
+            mappedField(builder, "shards", "long");
+            builder.endObject().endObject();
+            mappedField(builder, "shard", "long");
+
+            mappedField(builder, "anomaly", "keyword");
+            mappedField(builder, "blob_name", "keyword");
+            mappedField(builder, "part", "long");
+            mappedField(builder, "number_of_parts", "long");
+            mappedField(builder, "file_name", "keyword");
+            mappedField(builder, "file_length_in_bytes", "long");
+            mappedField(builder, "part_length_in_bytes", "long");
+            mappedField(builder, "actual_length_in_bytes", "long");
+            mappedField(builder, "expected_length_in_bytes", "long");
+
+            mappedField(builder, "restorability", "keyword");
+            mappedField(builder, "total_snapshots", "long");
+            mappedField(builder, "restorable_snapshots", "long");
+            mappedField(builder, "unrestorable_snapshots", "long");
+
+            builder.endObject();
+            builder.endObject();
+            builder.endObject();
+            createIndex.setMapping(builder);
+        } catch (Exception e) {
+            logger.error("error generating index mapping", e);
+            foregroundTaskListener.onFailure(e);
+            return;
+        }
+
+        createIndex.execute(new ActionListener<>() {
+            @Override
+            public void onResponse(CreateIndexResponse createIndexResponse) {
+                onSuccess();
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                if (e instanceof ResourceAlreadyExistsException) {
+                    onSuccess();
+                } else {
+                    foregroundTaskListener.onFailure(e);
+                }
+            }
+
+            private void onSuccess() {
+                repositoryDataFuture.addListener(foregroundTaskListener.map(repositoryData -> {
+                    try (
+                        var metadataVerifier = new MetadataVerifier(
+                            blobStoreRepository,
+                            client,
+                            verifyRequest,
+                            repositoryData,
+                            cancellableThreads,
+                            backgroundTask,
+                            createLoggingListener(backgroundTaskListener, repositoryData)
+                        )
+                    ) {
+                        logger.info(
+                            "[{}] verifying metadata integrity for index generation [{}]: "
+                                + "repo UUID [{}], cluster UUID [{}], snapshots [{}], indices [{}], index snapshots [{}]",
+                            verifyRequest.getRepository(),
+                            repositoryData.getGenId(),
+                            repositoryData.getUuid(),
+                            repositoryData.getClusterUUID(),
+                            metadataVerifier.getSnapshotCount(),
+                            metadataVerifier.getIndexCount(),
+                            metadataVerifier.getIndexSnapshotCount()
+                        );
+                        return metadataVerifier.start();
+                    }
+                }));
+            }
+
+            private ActionListener<Long> createLoggingListener(ActionListener<Void> l, RepositoryData repositoryData) {
+                return new ActionListener<>() {
+                    @Override
+                    public void onResponse(Long anomalyCount) {
+                        logger.info(
+                            "[{}] completed verifying metadata integrity for index generation [{}]: "
+                                + "repo UUID [{}], cluster UUID [{}], anomalies [{}]",
+                            verifyRequest.getRepository(),
+                            repositoryData.getGenId(),
+                            repositoryData.getUuid(),
+                            repositoryData.getClusterUUID(),
+                            anomalyCount
+                        );
+                        l.onResponse(null);
+                    }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        logger.warn(
+                            () -> Strings.format(
+                                "[%s] failed verifying metadata integrity for index generation [%d]: repo UUID [%s], cluster UUID [%s]",
+                                verifyRequest.getRepository(),
+                                repositoryData.getGenId(),
+                                repositoryData.getUuid(),
+                                repositoryData.getClusterUUID()
+                            )
+                        );
+                        l.onFailure(e);
+                    }
+                };
+            }
+        });
+    }
+
+    private final BlobStoreRepository blobStoreRepository;
+    private final NodeClient client;
+    private final ActionListener<Long> finalListener;
+    private final RefCountingRunnable finalRefs = new RefCountingRunnable(this::onCompletion);
+    private final String repositoryName;
+    private final VerifyRepositoryIntegrityAction.Request verifyRequest;
+    private final RepositoryData repositoryData;
+    private final BooleanSupplier isCancelledSupplier;
+    private final VerifyRepositoryIntegrityAction.Task task;
+    private final TaskId taskId;
+    private final AtomicLong anomalyCount = new AtomicLong();
+    private final Map<String, SnapshotDescription> snapshotDescriptionsById = ConcurrentCollections.newConcurrentMap();
+    private final CancellableRunner metadataTaskRunner;
+    private final CancellableRunner snapshotTaskRunner;
+    private final String resultsIndex;
+    private final RateLimiter rateLimiter;
+
+    private final long snapshotCount;
+    private final AtomicLong snapshotProgress = new AtomicLong();
+    private final long indexCount;
+    private final AtomicLong indexProgress = new AtomicLong();
+    private final long indexSnapshotCount;
+    private final AtomicLong indexSnapshotProgress = new AtomicLong();
+    private final AtomicLong blobsVerified = new AtomicLong();
+    private final AtomicLong blobBytesVerified = new AtomicLong();
+    private final AtomicLong throttledNanos;
+
+    MetadataVerifier(
+        BlobStoreRepository blobStoreRepository,
+        NodeClient client,
+        VerifyRepositoryIntegrityAction.Request verifyRequest,
+        RepositoryData repositoryData,
+        CancellableThreads cancellableThreads,
+        VerifyRepositoryIntegrityAction.Task task,
+        ActionListener<Long> finalListener
+    ) {
+        this.blobStoreRepository = blobStoreRepository;
+        this.repositoryName = blobStoreRepository.getMetadata().name();
+        this.client = client;
+        this.verifyRequest = verifyRequest;
+        this.repositoryData = repositoryData;
+        this.isCancelledSupplier = cancellableThreads::isCancelled;
+        this.task = task;
+        this.taskId = new TaskId(client.getLocalNodeId(), task.getId());
+        this.finalListener = finalListener;
+        this.snapshotTaskRunner = new CancellableRunner(
+            new ThrottledTaskRunner(
+                "verify-blob",
+                verifyRequest.getBlobThreadPoolConcurrency(),
+                blobStoreRepository.threadPool().executor(ThreadPool.Names.SNAPSHOT)
+            ),
+            cancellableThreads
+        );
+        this.metadataTaskRunner = new CancellableRunner(
+            new ThrottledTaskRunner(
+                "verify-metadata",
+                verifyRequest.getMetaThreadPoolConcurrency(),
+                blobStoreRepository.threadPool().executor(ThreadPool.Names.SNAPSHOT_META)
+            ),
+            cancellableThreads
+        );
+        this.resultsIndex = verifyRequest.getResultsIndex();
+
+        this.snapshotCount = repositoryData.getSnapshotIds().size();
+        this.indexCount = repositoryData.getIndices().size();
+        this.indexSnapshotCount = repositoryData.getIndexSnapshotCount();
+        this.rateLimiter = new RateLimiter.SimpleRateLimiter(verifyRequest.getMaxBytesPerSec().getMbFrac());
+
+        this.throttledNanos = new AtomicLong(verifyRequest.getVerifyBlobContents() ? 1 : 0); // nonzero if verifying so status reported
+    }
+
+    @Override
+    public void close() {
+        finalRefs.close();
+    }
+
+    private VerifyRepositoryIntegrityAction.Status getStatus() {
+        return new VerifyRepositoryIntegrityAction.Status(
+            repositoryName,
+            repositoryData.getGenId(),
+            repositoryData.getUuid(),
+            snapshotCount,
+            snapshotProgress.get(),
+            indexCount,
+            indexProgress.get(),
+            indexSnapshotCount,
+            indexSnapshotProgress.get(),
+            blobsVerified.get(),
+            blobBytesVerified.get(),
+            throttledNanos.get(),
+            anomalyCount.get(),
+            resultsIndex
+        );
+    }
+
+    private VerifyRepositoryIntegrityAction.Response start() {
+        task.setStatusSupplier(this::getStatus);
+        verifySnapshots(this::verifyIndices);
+        return new VerifyRepositoryIntegrityAction.Response(
+            taskId,
+            repositoryName,
+            repositoryData.getGenId(),
+            repositoryData.getUuid(),
+            snapshotCount,
+            indexCount,
+            indexSnapshotCount,
+            resultsIndex
+        );
+    }
+
+    private void verifySnapshots(Runnable onCompletion) {
+        runThrottled(
+            repositoryData.getSnapshotIds().iterator(),
+            this::verifySnapshot,
+            verifyRequest.getSnapshotVerificationConcurrency(),
+            snapshotProgress,
+            wrapRunnable(finalRefs.acquire(), onCompletion)
+        );
+    }
+
+    private record SnapshotDescription(SnapshotId snapshotId, long startTimeMillis, long endTimeMillis) {
+        void writeXContent(XContentBuilder builder) throws IOException {
+            builder.startObject("snapshot");
+            builder.field("id", snapshotId.getUUID());
+            builder.field("name", snapshotId.getName());
+            if (startTimeMillis != 0) {
+                builder.field("start_time", dateFormatter.format(Instant.ofEpochMilli(startTimeMillis)));
+            }
+            if (endTimeMillis != 0) {
+                builder.field("end_time", dateFormatter.format(Instant.ofEpochMilli(endTimeMillis)));
+            }
+            builder.endObject();
+        }
+    }
+
+    private void verifySnapshot(Releasable releasable, SnapshotId snapshotId) {
+        try (var snapshotRefs = new RefCountingRunnable(releasable::close)) {
+            if (isCancelledSupplier.getAsBoolean()) {
+                // getSnapshotInfo does its own forking so we must check for cancellation here
+                return;
+            }
+
+            blobStoreRepository.getSnapshotInfo(snapshotId, ActionListener.releaseAfter(new ActionListener<>() {
+                @Override
+                public void onResponse(SnapshotInfo snapshotInfo) {
+                    final var snapshotDescription = new SnapshotDescription(snapshotId, snapshotInfo.startTime(), snapshotInfo.endTime());
+                    snapshotDescriptionsById.put(snapshotId.getUUID(), snapshotDescription);
+                    metadataTaskRunner.run(ActionRunnable.run(snapshotRefs.acquireListener(), () -> {
+                        try {
+                            blobStoreRepository.getSnapshotGlobalMetadata(snapshotDescription.snapshotId());
+                            // no checks here, loading it is enough
+                        } catch (Exception e) {
+                            addAnomaly(Anomaly.FAILED_TO_LOAD_GLOBAL_METADATA, snapshotRefs.acquire(), (builder, params) -> {
+                                snapshotDescription.writeXContent(builder);
+                                ElasticsearchException.generateFailureXContent(builder, params, e, true);
+                                return builder;
+                            });
+                        }
+                    }));
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    addAnomaly(Anomaly.FAILED_TO_LOAD_SNAPSHOT_INFO, snapshotRefs.acquire(), (builder, params) -> {
+                        new SnapshotDescription(snapshotId, 0, 0).writeXContent(builder);
+                        ElasticsearchException.generateFailureXContent(builder, params, e, true);
+                        return builder;
+                    });
+                }
+            }, snapshotRefs.acquire()));
+        }
+    }
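
An aside on the idiom in `verifySnapshot` above, which recurs throughout this class: `RefCountingRunnable` ties a single completion action to an arbitrary fan-out of asynchronous sub-tasks. A standalone sketch, with hypothetical `executor` and `check` work:

```java
// The completion runnable fires exactly once, when the initial ref (released by
// try-with-resources) and every acquired ref have all been closed.
try (var refs = new RefCountingRunnable(() -> logger.info("all branches done"))) {
    for (int i = 0; i < 10; i++) {
        final var ref = refs.acquire(); // one ref per async branch
        executor.execute(() -> {
            try {
                check(); // hypothetical async work
            } finally {
                ref.close();
            }
        });
    }
} // closing the try block releases the initial ref
```

Because the completion action only runs after the last ref is released, no extra synchronization is needed to know when all branches have finished.
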
wrapRunnable(finalRefs.acquire(), () -> {}) + ); + } + + private record IndexDescription(IndexId indexId, String indexMetadataBlob, int shardCount) { + void writeXContent(XContentBuilder builder) throws IOException { + writeIndexId(indexId, builder, b -> b.field("metadata_blob", indexMetadataBlob).field("shards", shardCount)); + } + } + + private record ShardContainerContents(int shardId, Map blobsByName, @Nullable // if it could not be read + BlobStoreIndexShardSnapshots blobStoreIndexShardSnapshots, Map> blobContentsListeners) {} + + private class IndexVerifier { + private final RefCountingRunnable indexRefs; + private final IndexId indexId; + private final Map> shardContainerContentsListener = newConcurrentMap(); + private final Map> indexDescriptionListenersByBlobId = newConcurrentMap(); + private final AtomicInteger totalSnapshotCounter = new AtomicInteger(); + private final AtomicInteger restorableSnapshotCounter = new AtomicInteger(); + + IndexVerifier(Releasable releasable, IndexId indexId) { + this.indexRefs = new RefCountingRunnable(releasable::close); + this.indexId = indexId; + } + + void run() { + runThrottled( + repositoryData.getSnapshots(indexId).iterator(), + this::verifyIndexSnapshot, + verifyRequest.getIndexSnapshotVerificationConcurrency(), + indexSnapshotProgress, + wrapRunnable(indexRefs, () -> recordRestorability(totalSnapshotCounter.get(), restorableSnapshotCounter.get())) + ); + } + + private void recordRestorability(int totalSnapshotCount, int restorableSnapshotCount) { + if (isCancelledSupplier.getAsBoolean() == false) { + addResult(indexRefs.acquire(), (builder, params) -> { + writeIndexId(indexId, builder, b -> {}); + builder.field( + "restorability", + totalSnapshotCount == restorableSnapshotCount ? "full" : 0 < restorableSnapshotCount ? 
"partial" : "none" + ); + builder.field("total_snapshots", totalSnapshotCount); + builder.field("restorable_snapshots", restorableSnapshotCount); + builder.field("unrestorable_snapshots", totalSnapshotCount - restorableSnapshotCount); + return builder; + }); + } + } + + private void verifyIndexSnapshot(Releasable releasable, SnapshotId snapshotId) { + try (var indexSnapshotRefs = new RefCountingRunnable(releasable::close)) { + totalSnapshotCounter.incrementAndGet(); + + final var snapshotDescription = snapshotDescriptionsById.get(snapshotId.getUUID()); + if (snapshotDescription == null) { + addAnomaly(Anomaly.UNKNOWN_SNAPSHOT_FOR_INDEX, indexSnapshotRefs.acquire(), (builder, params) -> { + writeIndexId(indexId, builder, b -> {}); + new SnapshotDescription(snapshotId, 0, 0).writeXContent(builder); + return builder; + }); + return; + } + + final var indexMetaBlobId = repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, indexId); + indexDescriptionListeners(snapshotId, indexMetaBlobId).addListener( + makeListener( + indexSnapshotRefs.acquire(), + indexDescription -> verifyShardSnapshots(snapshotDescription, indexDescription, indexSnapshotRefs.acquire()) + ) + ); + } + } + + private void verifyShardSnapshots( + SnapshotDescription snapshotDescription, + IndexDescription indexDescription, + Releasable releasable + ) { + final var restorableShardCount = new AtomicInteger(); + try (var shardSnapshotsRefs = new RefCountingRunnable(wrapRunnable(releasable, () -> { + if (indexDescription.shardCount() == restorableShardCount.get()) { + restorableSnapshotCounter.incrementAndGet(); + } + }))) { + for (int shardId = 0; shardId < indexDescription.shardCount(); shardId++) { + shardContainerContentsListeners(indexDescription, shardId).addListener( + makeListener( + shardSnapshotsRefs.acquire(), + shardContainerContents -> metadataTaskRunner.run( + ActionRunnable.run( + shardSnapshotsRefs.acquireListener(), + () -> verifyShardSnapshot( + snapshotDescription, + indexDescription, + shardContainerContents, + restorableShardCount, + shardSnapshotsRefs + ) + ) + ) + ) + ); + } + } + } + + private void verifyShardSnapshot( + SnapshotDescription snapshotDescription, + IndexDescription indexDescription, + ShardContainerContents shardContainerContents, + AtomicInteger restorableShardCount, + RefCountingRunnable shardSnapshotsRefs + ) throws AnomalyException { + final var shardId = shardContainerContents.shardId(); + final BlobStoreIndexShardSnapshot blobStoreIndexShardSnapshot; + try { + blobStoreIndexShardSnapshot = blobStoreRepository.loadShardSnapshot( + blobStoreRepository.shardContainer(indexId, shardId), + snapshotDescription.snapshotId() + ); + } catch (Exception e) { + addAnomaly(Anomaly.FAILED_TO_LOAD_SHARD_SNAPSHOT, shardSnapshotsRefs.acquire(), (builder, params) -> { + snapshotDescription.writeXContent(builder); + indexDescription.writeXContent(builder); + builder.field("shard", shardId); + ElasticsearchException.generateFailureXContent(builder, params, e, true); + return builder; + }); + throw new AnomalyException(e); + } + + final var restorable = new AtomicBoolean(true); + runThrottled( + blobStoreIndexShardSnapshot.indexFiles().iterator(), + (releasable, fileInfo) -> verifyFileInfo( + releasable, + snapshotDescription, + indexDescription, + shardContainerContents, + restorable, + fileInfo + ), + 1, + blobsVerified, + wrapRunnable(shardSnapshotsRefs.acquire(), () -> { + if (restorable.get()) { + restorableShardCount.incrementAndGet(); + } + }) + ); + + verifyConsistentShardFiles( + 
snapshotDescription, + indexDescription, + shardContainerContents, + blobStoreIndexShardSnapshot, + shardSnapshotsRefs + ); + } + + private void verifyConsistentShardFiles( + SnapshotDescription snapshotDescription, + IndexDescription indexDescription, + ShardContainerContents shardContainerContents, + BlobStoreIndexShardSnapshot blobStoreIndexShardSnapshot, + RefCountingRunnable shardSnapshotsRefs + ) { + final var blobStoreIndexShardSnapshots = shardContainerContents.blobStoreIndexShardSnapshots(); + if (blobStoreIndexShardSnapshots == null) { + // already reported + return; + } + + final var shardId = shardContainerContents.shardId(); + for (SnapshotFiles summary : blobStoreIndexShardSnapshots.snapshots()) { + if (summary.snapshot().equals(snapshotDescription.snapshotId().getName()) == false) { + continue; + } + + final var snapshotFiles = blobStoreIndexShardSnapshot.indexFiles() + .stream() + .collect(Collectors.toMap(BlobStoreIndexShardSnapshot.FileInfo::physicalName, Function.identity())); + + for (final var summaryFile : summary.indexFiles()) { + final var snapshotFile = snapshotFiles.get(summaryFile.physicalName()); + if (snapshotFile == null) { + addAnomaly(Anomaly.FILE_IN_SHARD_GENERATION_NOT_SNAPSHOT, shardSnapshotsRefs.acquire(), (builder, params) -> { + snapshotDescription.writeXContent(builder); + indexDescription.writeXContent(builder); + builder.field("shard", shardId); + builder.field("file_name", summaryFile.physicalName()); + return builder; + }); + } else if (summaryFile.isSame(snapshotFile) == false) { + addAnomaly(Anomaly.SNAPSHOT_SHARD_GENERATION_MISMATCH, shardSnapshotsRefs.acquire(), (builder, params) -> { + snapshotDescription.writeXContent(builder); + indexDescription.writeXContent(builder); + builder.field("shard", shardId); + builder.field("file_name", summaryFile.physicalName()); + return builder; + }); + } + } + + final var summaryFiles = summary.indexFiles() + .stream() + .collect(Collectors.toMap(BlobStoreIndexShardSnapshot.FileInfo::physicalName, Function.identity())); + for (final var snapshotFile : blobStoreIndexShardSnapshot.indexFiles()) { + if (summaryFiles.get(snapshotFile.physicalName()) == null) { + addAnomaly(Anomaly.FILE_IN_SNAPSHOT_NOT_SHARD_GENERATION, shardSnapshotsRefs.acquire(), (builder, params) -> { + snapshotDescription.writeXContent(builder); + indexDescription.writeXContent(builder); + builder.field("shard", shardId); + builder.field("file_name", snapshotFile.physicalName()); + return builder; + }); + } + } + + return; + } + + addAnomaly(Anomaly.SNAPSHOT_NOT_IN_SHARD_GENERATION, shardSnapshotsRefs.acquire(), (builder, params) -> { + snapshotDescription.writeXContent(builder); + indexDescription.writeXContent(builder); + builder.field("shard", shardId); + return builder; + }); + } + + private void verifyFileInfo( + Releasable releasable, + SnapshotDescription snapshotDescription, + IndexDescription indexDescription, + ShardContainerContents shardContainerContents, + AtomicBoolean restorable, + BlobStoreIndexShardSnapshot.FileInfo fileInfo + ) { + try (var fileRefs = new RefCountingRunnable(releasable::close)) { + if (fileInfo.metadata().hashEqualsContents()) { + return; + } + + final var shardId = shardContainerContents.shardId(); + final var shardBlobs = shardContainerContents.blobsByName(); + final var fileLength = ByteSizeValue.ofBytes(fileInfo.length()); + for (int part = 0; part < fileInfo.numberOfParts(); part++) { + final var finalPart = part; + final var blobName = fileInfo.partName(part); + final var blobInfo = 
+                    shardBlobs.get(blobName);
+                final var partLength = ByteSizeValue.ofBytes(fileInfo.partBytes(part));
+                if (blobInfo == null) {
+                    restorable.set(false);
+                    addAnomaly(Anomaly.MISSING_BLOB, fileRefs.acquire(), ((builder, params) -> {
+                        snapshotDescription.writeXContent(builder);
+                        indexDescription.writeXContent(builder);
+                        builder.field("shard", shardId);
+                        builder.field("blob_name", blobName);
+                        builder.field("file_name", fileInfo.physicalName());
+                        builder.field("part", finalPart);
+                        builder.field("number_of_parts", fileInfo.numberOfParts());
+                        builder.humanReadableField("file_length_in_bytes", "file_length", fileLength);
+                        builder.humanReadableField("part_length_in_bytes", "part_length", partLength);
+                        return builder;
+                    }));
+                    return;
+                } else if (blobInfo.length() != partLength.getBytes()) {
+                    restorable.set(false);
+                    addAnomaly(Anomaly.MISMATCHED_BLOB_LENGTH, fileRefs.acquire(), ((builder, params) -> {
+                        snapshotDescription.writeXContent(builder);
+                        indexDescription.writeXContent(builder);
+                        builder.field("shard", shardId);
+                        builder.field("blob_name", blobName);
+                        builder.field("file_name", fileInfo.physicalName());
+                        builder.field("part", finalPart);
+                        builder.field("number_of_parts", fileInfo.numberOfParts());
+                        builder.humanReadableField("file_length_in_bytes", "file_length", fileLength);
+                        builder.humanReadableField("part_length_in_bytes", "part_length", partLength);
+                        builder.humanReadableField("actual_length_in_bytes", "actual_length", ByteSizeValue.ofBytes(blobInfo.length()));
+                        return builder;
+                    }));
+                    return;
+                }
+            }
+
+            blobContentsListeners(indexDescription, shardContainerContents, fileInfo).addListener(
+                makeListener(fileRefs.acquire(), (Void ignored) -> {}).delegateResponse((l, e) -> {
+                    restorable.set(false);
+                    addAnomaly(Anomaly.CORRUPT_DATA_BLOB, fileRefs.acquire(), ((builder, params) -> {
+                        snapshotDescription.writeXContent(builder);
+                        indexDescription.writeXContent(builder);
+                        builder.field("shard", shardId);
+                        builder.field("blob_name", fileInfo.name());
+                        builder.field("file_name", fileInfo.physicalName());
+                        builder.field("number_of_parts", fileInfo.numberOfParts());
+                        builder.humanReadableField("file_length_in_bytes", "file_length", fileLength);
+                        ElasticsearchException.generateFailureXContent(builder, params, e, true);
+                        return builder;
+                    }));
+                    l.onResponse(null);
+                })
+            );
+        }
+    }
+
+    private ListenableActionFuture<IndexDescription> indexDescriptionListeners(SnapshotId snapshotId, String indexMetaBlobId) {
+        return indexDescriptionListenersByBlobId.computeIfAbsent(indexMetaBlobId, ignored -> {
+            final var indexDescriptionListener = new ListenableActionFuture<IndexDescription>();
+            metadataTaskRunner.run(ActionRunnable.supply(indexDescriptionListener, () -> {
+                try {
+                    return new IndexDescription(
+                        indexId,
+                        indexMetaBlobId,
+                        blobStoreRepository.getSnapshotIndexMetaData(repositoryData, snapshotId, indexId).getNumberOfShards()
+                    );
+                } catch (Exception e) {
+                    addAnomaly(Anomaly.FAILED_TO_LOAD_INDEX_METADATA, indexRefs.acquire(), (builder, params) -> {
+                        writeIndexId(indexId, builder, b -> b.field("metadata_blob", indexMetaBlobId));
+                        ElasticsearchException.generateFailureXContent(builder, params, e, true);
+                        return builder;
+                    });
+                    throw new AnomalyException(e);
+                }
+            }));
+            return indexDescriptionListener;
+        });
+    }
+
+    private ListenableActionFuture<ShardContainerContents> shardContainerContentsListeners(
+        IndexDescription indexDescription,
+        int shardId
+    ) {
+        return shardContainerContentsListener.computeIfAbsent(shardId, ignored -> {
+            final var shardContainerContentsFuture = new ListenableActionFuture<ShardContainerContents>();
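+            // List the shard container and load its shard-level generation at most once per shard, on the throttled metadata task
+            // runner; every index-snapshot verification of this shard then shares the resulting future.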
+            metadataTaskRunner.run(ActionRunnable.supply(shardContainerContentsFuture, () -> {
+                final Map<String, BlobMetadata> blobsByName;
+                try {
+                    blobsByName = blobStoreRepository.shardContainer(indexId, shardId).listBlobs();
+                } catch (Exception e) {
+                    addAnomaly(Anomaly.FAILED_TO_LIST_SHARD_CONTAINER, indexRefs.acquire(), (builder, params) -> {
+                        indexDescription.writeXContent(builder);
+                        builder.field("shard", shardId);
+                        ElasticsearchException.generateFailureXContent(builder, params, e, true);
+                        return builder;
+                    });
+                    throw new AnomalyException(e);
+                }
+
+                final var shardGen = repositoryData.shardGenerations().getShardGen(indexId, shardId);
+                if (shardGen == null) {
+                    addAnomaly(Anomaly.UNDEFINED_SHARD_GENERATION, indexRefs.acquire(), (builder, params) -> {
+                        indexDescription.writeXContent(builder);
+                        return builder.field("shard", shardId);
+                    });
+                    throw new AnomalyException(
+                        new ElasticsearchException("undefined shard generation for " + indexId + "[" + shardId + "]")
+                    );
+                }
+
+                return new ShardContainerContents(
+                    shardId,
+                    blobsByName,
+                    loadShardGeneration(indexDescription, shardId, shardGen),
+                    ConcurrentCollections.newConcurrentMap()
+                );
+            }));
+            return shardContainerContentsFuture;
+        });
+    }
+
+    private BlobStoreIndexShardSnapshots loadShardGeneration(IndexDescription indexDescription, int shardId, ShardGeneration shardGen) {
+        try {
+            return blobStoreRepository.getBlobStoreIndexShardSnapshots(indexId, shardId, shardGen);
+        } catch (Exception e) {
+            addAnomaly(Anomaly.FAILED_TO_LOAD_SHARD_GENERATION, indexRefs.acquire(), (builder, params) -> {
+                indexDescription.writeXContent(builder);
+                builder.field("shard", shardId);
+                ElasticsearchException.generateFailureXContent(builder, params, e, true);
+                return builder;
+            });
+            // failing here is not fatal to snapshot restores, only to creating/deleting snapshots, so we can carry on with the analysis
+            return null;
+        }
+    }
+
+    private ListenableActionFuture<Void> blobContentsListeners(
+        IndexDescription indexDescription,
+        ShardContainerContents shardContainerContents,
+        BlobStoreIndexShardSnapshot.FileInfo fileInfo
+    ) {
+        return shardContainerContents.blobContentsListeners().computeIfAbsent(fileInfo.name(), ignored -> {
+            var listener = new ListenableActionFuture<Void>();
+
+            if (verifyRequest.getVerifyBlobContents()) {
+                // TODO do this on a remote node?
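+                // Stream every part of the file through a rate-limited Lucene checksum verification; the footer checksum covers
+                // the whole file, so a clean CodecUtil.checksumEntireFile() shows that each byte of each part was read back intact.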
+                snapshotTaskRunner.run(ActionRunnable.run(listener, () -> {
+                    try (
+                        var slicedStream = new SlicedInputStream(fileInfo.numberOfParts()) {
+                            @Override
+                            protected InputStream openSlice(int slice) throws IOException {
+                                return blobStoreRepository.shardContainer(indexDescription.indexId(), shardContainerContents.shardId())
+                                    .readBlob(fileInfo.partName(slice));
+                            }
+                        };
+                        var rateLimitedStream = new RateLimitingInputStream(slicedStream, () -> rateLimiter, throttledNanos::addAndGet);
+                        var indexInput = new IndexInputWrapper(rateLimitedStream, fileInfo.length())
+                    ) {
+                        CodecUtil.checksumEntireFile(indexInput);
+                    }
+                }));
+            } else {
+                blobBytesVerified.addAndGet(fileInfo.length());
+                listener.onResponse(null);
+            }
+
+            return listener;
+        });
+    }
+    }
+
+    private <T> ActionListener<T> makeListener(Releasable releasable, CheckedConsumer<T, Exception> consumer) {
+        try (var refs = new RefCountingRunnable(releasable::close)) {
+            return ActionListener.releaseAfter(ActionListener.wrap(consumer, exception -> {
+                if (isCancelledSupplier.getAsBoolean() && exception instanceof TaskCancelledException) {
+                    return;
+                }
+                if (exception instanceof AnomalyException) {
+                    // already reported
+                    return;
+                }
+                addAnomaly(Anomaly.UNEXPECTED_EXCEPTION, refs.acquire(), (builder, params) -> {
+                    ElasticsearchException.generateFailureXContent(builder, params, exception, true);
+                    return builder;
+                });
+            }), refs.acquire());
+        }
+    }
+
+    private Runnable wrapRunnable(Releasable releasable, Runnable runnable) {
+        return () -> {
+            try (releasable) {
+                runnable.run();
+            }
+        };
+    }
+
+    private void onCompletion() {
+        try (
+            var completionRefs = new RefCountingRunnable(
+                () -> client.admin()
+                    .indices()
+                    .prepareFlush(resultsIndex)
+                    .execute(
+                        finalListener.delegateFailure(
+                            (l1, ignored1) -> client.admin()
+                                .indices()
+                                .prepareRefresh(resultsIndex)
+                                .execute(l1.delegateFailure((l2, ignored2) -> l2.onResponse(anomalyCount.get())))
+                        )
+                    )
+            )
+        ) {
+            blobStoreRepository.getRepositoryData(makeListener(completionRefs.acquire(), finalRepositoryData -> {
+                final var finalRepositoryGeneration = finalRepositoryData.getGenId();
+                addResult(completionRefs.acquire(), (builder, params) -> {
+                    builder.field("completed", true);
+                    builder.field("cancelled", isCancelledSupplier.getAsBoolean());
+                    builder.field("final_repository_generation", finalRepositoryGeneration);
+                    builder.field("total_anomalies", anomalyCount.get());
+                    return builder;
+                });
+            }));
+        }
+    }
+
+    private static <T> void runThrottled(
+        Iterator<T> iterator,
+        BiConsumer<Releasable, T> itemConsumer,
+        int maxConcurrency,
+        AtomicLong progressCounter,
+        Runnable onCompletion
+    ) {
+        ThrottledIterator.run(iterator, itemConsumer, maxConcurrency, progressCounter::incrementAndGet, onCompletion);
+    }
+
+    private final Queue<Tuple<IndexRequest, Runnable>> pendingResults = ConcurrentCollections.newQueue();
+    private final Semaphore resultsIndexingSemaphore = new Semaphore(1);
+
+    private void processPendingResults() {
+        while (resultsIndexingSemaphore.tryAcquire()) {
+            final var bulkRequest = new BulkRequest();
+            final var completionActions = new ArrayList<Runnable>();
+
+            Tuple<IndexRequest, Runnable> nextItem;
+            while ((nextItem = pendingResults.poll()) != null) {
+                bulkRequest.add(nextItem.v1());
+                completionActions.add(nextItem.v2());
+            }
+
+            if (completionActions.isEmpty()) {
+                resultsIndexingSemaphore.release();
+                return;
+            }
+
+            final var isRecursing = new AtomicBoolean(true);
+            client.bulk(bulkRequest, ActionListener.runAfter(ActionListener.wrap(bulkResponse -> {
+                for (BulkItemResponse bulkItemResponse : bulkResponse) {
+                    if (bulkItemResponse.isFailed()) {
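+                        // A failure to index one result document is logged but does not fail the verification as a whole.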
logger.error("error indexing result", bulkItemResponse.getFailure().getCause()); + } + } + }, e -> logger.error("error indexing results", e)), () -> { + resultsIndexingSemaphore.release(); + for (final var completionAction : completionActions) { + completionAction.run(); + } + if (isRecursing.get() == false) { + processPendingResults(); + } + })); + isRecursing.set(false); + } + } + + private static final DateFormatter dateFormatter = DateFormatter.forPattern(FormatNames.ISO8601.getName()).withLocale(Locale.ROOT); + + private IndexRequest buildResultDoc(ToXContent toXContent) { + try (var builder = XContentFactory.jsonBuilder()) { + builder.startObject(); + builder.field( + "@timestamp", + dateFormatter.format(Instant.ofEpochMilli(blobStoreRepository.threadPool().absoluteTimeInMillis())) + ); + builder.field("repository", repositoryName); + builder.field("uuid", repositoryData.getUuid()); + builder.field("repository_generation", repositoryData.getGenId()); + builder.field("task", taskId.toString()); + toXContent.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + return new IndexRequestBuilder(client, IndexAction.INSTANCE, resultsIndex).setSource(builder).request(); + } catch (Exception e) { + logger.error("error generating failure output", e); + return null; + } + } + + private void addAnomaly(Anomaly anomaly, Releasable releasable, ToXContent toXContent) { + if (isCancelledSupplier.getAsBoolean()) { + releasable.close(); + } else { + anomalyCount.incrementAndGet(); + addResult(releasable, (builder, params) -> toXContent.toXContent(builder.field("anomaly", anomaly.toString()), params)); + } + } + + private void addResult(Releasable releasable, ToXContent toXContent) { + IndexRequest indexRequest = buildResultDoc(toXContent); + if (indexRequest == null) { + releasable.close(); + } else { + logger.debug(() -> Strings.format("recording result document: %s", indexRequest.source().utf8ToString())); + pendingResults.add(Tuple.tuple(indexRequest, releasable::close)); + processPendingResults(); + } + } + + public long getSnapshotCount() { + return snapshotCount; + } + + public long getIndexCount() { + return indexCount; + } + + public long getIndexSnapshotCount() { + return indexSnapshotCount; + } + + private static void writeIndexId(IndexId indexId, XContentBuilder builder, CheckedConsumer extra) + throws IOException { + builder.startObject("index"); + builder.field("id", indexId.getId()); + builder.field("name", indexId.getName()); + extra.accept(builder); + builder.endObject(); + } + + private class IndexInputWrapper extends IndexInput { + private final InputStream inputStream; + private final long length; + long filePointer = 0L; + + IndexInputWrapper(InputStream inputStream, long length) { + super(""); + this.inputStream = inputStream; + this.length = length; + } + + @Override + public byte readByte() throws IOException { + if (isCancelledSupplier.getAsBoolean()) { + throw new TaskCancelledException("task cancelled"); + } + final var read = inputStream.read(); + if (read == -1) { + throw new EOFException(); + } + filePointer += 1; + blobBytesVerified.incrementAndGet(); + return (byte) read; + } + + @Override + public void readBytes(byte[] b, int offset, int len) throws IOException { + while (len > 0) { + if (isCancelledSupplier.getAsBoolean()) { + throw new TaskCancelledException("task cancelled"); + } + final var read = inputStream.read(b, offset, len); + if (read == -1) { + throw new EOFException(); + } + filePointer += read; + blobBytesVerified.addAndGet(read); + len -= 
read; + offset += read; + } + } + + @Override + public void close() {} + + @Override + public long getFilePointer() { + return filePointer; + } + + @Override + public void seek(long pos) { + if (filePointer != pos) { + assert false : "cannot seek"; + throw new UnsupportedOperationException("seek"); + } + } + + @Override + public long length() { + return length; + } + + @Override + public IndexInput slice(String sliceDescription, long offset, long length) { + assert false; + throw new UnsupportedOperationException("slice"); + } + } + + private static class CancellableRunner { + private final ThrottledTaskRunner delegate; + private final CancellableThreads cancellableThreads; + + CancellableRunner(ThrottledTaskRunner delegate, CancellableThreads cancellableThreads) { + this.delegate = delegate; + this.cancellableThreads = cancellableThreads; + } + + void run(AbstractRunnable runnable) { + delegate.enqueueTask(new ActionListener<>() { + @Override + public void onResponse(Releasable releasable) { + try (releasable) { + if (cancellableThreads.isCancelled()) { + runnable.onFailure(new TaskCancelledException("task cancelled")); + } else { + cancellableThreads.execute(runnable::run); + } + } + } + + @Override + public void onFailure(Exception e) { + runnable.onFailure(e); + } + }); + } + } + + private static class AnomalyException extends Exception { + AnomalyException(Exception cause) { + super(cause); + } + } + +} diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/integrity/RestVerifyRepositoryIntegrityAction.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/integrity/RestVerifyRepositoryIntegrityAction.java new file mode 100644 index 0000000000000..eb4b3d9902755 --- /dev/null +++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/integrity/RestVerifyRepositoryIntegrityAction.java @@ -0,0 +1,54 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */
+
+package org.elasticsearch.repositories.blobstore.integrity;
+
+import org.elasticsearch.client.internal.node.NodeClient;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.action.RestCancellableNodeClient;
+import org.elasticsearch.rest.action.RestToXContentListener;
+
+import java.util.List;
+
+import static org.elasticsearch.rest.RestRequest.Method.POST;
+
+public class RestVerifyRepositoryIntegrityAction extends BaseRestHandler {
+
+    @Override
+    public List<Route> routes() {
+        return List.of(new Route(POST, "/_snapshot/{repository}/_verify_integrity"));
+    }
+
+    @Override
+    public String getName() {
+        return "verify_repository_integrity_action";
+    }
+
+    @Override
+    public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) {
+        final var verifyRequest = new VerifyRepositoryIntegrityAction.Request(
+            request.param("repository"),
+            request.param("results_index", ""),
+            request.paramAsInt("metadata_threads", 0),
+            request.paramAsInt("data_threads", 0),
+            request.paramAsInt("snapshot_verification_concurrency", 0),
+            request.paramAsInt("index_verification_concurrency", 0),
+            request.paramAsInt("index_snapshot_verification_concurrency", 0),
+            request.paramAsBoolean("verify_blob_contents", false),
+            request.paramAsSize("max_verify_bytes_per_sec", ByteSizeValue.ofMb(10)),
+            request.paramAsBoolean("wait_for_completion", false)
+        );
+        verifyRequest.masterNodeTimeout(request.paramAsTime("master_timeout", verifyRequest.masterNodeTimeout()));
+        return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute(
+            VerifyRepositoryIntegrityAction.INSTANCE,
+            verifyRequest,
+            new RestToXContentListener<>(channel)
+        );
+    }
+}
diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/integrity/VerifyRepositoryIntegrityAction.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/integrity/VerifyRepositoryIntegrityAction.java
new file mode 100644
index 0000000000000..913131460f4bb
--- /dev/null
+++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/integrity/VerifyRepositoryIntegrityAction.java
@@ -0,0 +1,561 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.repositories.blobstore.integrity;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.ListenableActionFuture;
+import org.elasticsearch.action.support.master.MasterNodeReadRequest;
+import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
+import org.elasticsearch.client.internal.node.NodeClient;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateListener;
+import org.elasticsearch.cluster.block.ClusterBlockException;
+import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.util.CancellableThreads;
+import org.elasticsearch.core.Releasables;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.repositories.RepositoriesService;
+import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
+import org.elasticsearch.tasks.CancellableTask;
+import org.elasticsearch.tasks.TaskId;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xcontent.ToXContentObject;
+import org.elasticsearch.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+import static org.elasticsearch.common.Strings.isNullOrBlank;
+import static org.elasticsearch.core.Strings.format;
+
+public class VerifyRepositoryIntegrityAction extends ActionType<VerifyRepositoryIntegrityAction.Response> {
+
+    public static final VerifyRepositoryIntegrityAction INSTANCE = new VerifyRepositoryIntegrityAction();
+    public static final String NAME = "cluster:admin/repository/verify_integrity";
+
+    private VerifyRepositoryIntegrityAction() {
+        super(NAME, VerifyRepositoryIntegrityAction.Response::new);
+    }
+
+    public static class Request extends MasterNodeReadRequest<Request> {
+
+        private final String repository;
+        private final String resultsIndex;
+        private final int metaThreadPoolConcurrency;
+        private final int blobThreadPoolConcurrency;
+        private final int snapshotVerificationConcurrency;
+        private final int indexVerificationConcurrency;
+        private final int indexSnapshotVerificationConcurrency;
+        private final boolean verifyBlobContents;
+        private final ByteSizeValue maxBytesPerSec;
+        private final boolean waitForCompletion;
+
+        public Request(
+            String repository,
+            String resultsIndex,
+            int metaThreadPoolConcurrency,
+            int blobThreadPoolConcurrency,
+            int snapshotVerificationConcurrency,
+            int indexVerificationConcurrency,
+            int indexSnapshotVerificationConcurrency,
+            boolean verifyBlobContents,
+            ByteSizeValue maxBytesPerSec,
+            boolean waitForCompletion
+        ) {
+            this.repository = Objects.requireNonNull(repository, "repository");
+            this.resultsIndex = Objects.requireNonNull(resultsIndex, "resultsIndex");
+            this.metaThreadPoolConcurrency = requireNonNegative("metaThreadPoolConcurrency",
metaThreadPoolConcurrency); + this.blobThreadPoolConcurrency = requireNonNegative("blobThreadPoolConcurrency", blobThreadPoolConcurrency); + this.snapshotVerificationConcurrency = requireNonNegative("snapshotVerificationConcurrency", snapshotVerificationConcurrency); + this.indexVerificationConcurrency = requireNonNegative("indexVerificationConcurrency", indexVerificationConcurrency); + this.indexSnapshotVerificationConcurrency = requireNonNegative( + "indexSnapshotVerificationConcurrency", + indexSnapshotVerificationConcurrency + ); + this.verifyBlobContents = verifyBlobContents; + if (maxBytesPerSec.getBytes() < 1) { + throw new IllegalArgumentException("invalid rate limit"); + } + this.maxBytesPerSec = maxBytesPerSec; + this.waitForCompletion = waitForCompletion; + } + + private static int requireNonNegative(String name, int value) { + if (value < 0) { + throw new IllegalArgumentException("argument [" + name + "] must be at least [0]"); + } + return value; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.repository = in.readString(); + this.resultsIndex = in.readString(); + this.metaThreadPoolConcurrency = in.readVInt(); + this.blobThreadPoolConcurrency = in.readVInt(); + this.snapshotVerificationConcurrency = in.readVInt(); + this.indexVerificationConcurrency = in.readVInt(); + this.indexSnapshotVerificationConcurrency = in.readVInt(); + this.verifyBlobContents = in.readBoolean(); + this.maxBytesPerSec = ByteSizeValue.readFrom(in); + this.waitForCompletion = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(repository); + out.writeString(resultsIndex); + out.writeVInt(metaThreadPoolConcurrency); + out.writeVInt(blobThreadPoolConcurrency); + out.writeVInt(snapshotVerificationConcurrency); + out.writeVInt(indexVerificationConcurrency); + out.writeVInt(indexSnapshotVerificationConcurrency); + out.writeBoolean(verifyBlobContents); + maxBytesPerSec.writeTo(out); + out.writeBoolean(waitForCompletion); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new VerifyRepositoryIntegrityAction.Task(id, type, action, getDescription(), parentTaskId, headers); + } + + @Override + public String getDescription() { + return "verify repository integrity {" + + "repository='" + + repository + + '\'' + + ", resultsIndex='" + + resultsIndex + + '\'' + + ", metaThreadPoolConcurrency=" + + metaThreadPoolConcurrency + + ", blobThreadPoolConcurrency=" + + blobThreadPoolConcurrency + + ", snapshotVerificationConcurrency=" + + snapshotVerificationConcurrency + + ", indexVerificationConcurrency=" + + indexVerificationConcurrency + + ", indexSnapshotVerificationConcurrency=" + + indexSnapshotVerificationConcurrency + + ", verifyBlobContents=" + + verifyBlobContents + + ", maxBytesPerSec=" + + maxBytesPerSec + + ", waitForCompletion=" + + waitForCompletion + + '}'; + } + + public String getRepository() { + return repository; + } + + public String getResultsIndex() { + return resultsIndex; + } + + public int getMetaThreadPoolConcurrency() { + return metaThreadPoolConcurrency; + } + + public int getBlobThreadPoolConcurrency() { + return blobThreadPoolConcurrency; + } + + public int getSnapshotVerificationConcurrency() { + return snapshotVerificationConcurrency; + } + + public int getIndexVerificationConcurrency() { + return 
indexVerificationConcurrency; + } + + public int getIndexSnapshotVerificationConcurrency() { + return indexSnapshotVerificationConcurrency; + } + + public boolean getVerifyBlobContents() { + return verifyBlobContents; + } + + public ByteSizeValue getMaxBytesPerSec() { + return maxBytesPerSec; + } + + public boolean getWaitForCompletion() { + return waitForCompletion; + } + + public Request withResolvedDefaults(long currentTimeMillis, ThreadPool.Info metadataThreadPoolInfo) { + if (isNullOrBlank(resultsIndex) == false + && metaThreadPoolConcurrency > 0 + && blobThreadPoolConcurrency > 0 + && snapshotVerificationConcurrency > 0 + && indexVerificationConcurrency > 0 + && indexSnapshotVerificationConcurrency > 0) { + return this; + } + + final var maxThreads = Math.max(1, metadataThreadPoolInfo.getMax()); + final var halfMaxThreads = Math.max(1, maxThreads / 2); + final var request = new Request( + repository, + isNullOrBlank(resultsIndex) ? ("repository-metadata-verification-" + repository + "-" + currentTimeMillis) : resultsIndex, + metaThreadPoolConcurrency > 0 ? metaThreadPoolConcurrency : halfMaxThreads, + blobThreadPoolConcurrency > 0 ? blobThreadPoolConcurrency : 1, + snapshotVerificationConcurrency > 0 ? snapshotVerificationConcurrency : halfMaxThreads, + indexVerificationConcurrency > 0 ? indexVerificationConcurrency : maxThreads, + indexSnapshotVerificationConcurrency > 0 ? indexSnapshotVerificationConcurrency : 1, + verifyBlobContents, + maxBytesPerSec, + waitForCompletion + ); + request.masterNodeTimeout(masterNodeTimeout()); + return request; + } + + @Override + public String toString() { + return getDescription(); + } + } + + public static class Task extends CancellableTask { + + private volatile Supplier statusSupplier; + + public Task(long id, String type, String action, String description, TaskId parentTaskId, Map headers) { + super(id, type, action, description, parentTaskId, headers); + } + + public void setStatusSupplier(Supplier statusSupplier) { + this.statusSupplier = statusSupplier; + } + + @Override + public Status getStatus() { + return Optional.ofNullable(statusSupplier).map(Supplier::get).orElse(null); + } + } + + public record Status( + String repositoryName, + long repositoryGeneration, + String repositoryUUID, + long snapshotCount, + long snapshotsVerified, + long indexCount, + long indicesVerified, + long indexSnapshotCount, + long indexSnapshotsVerified, + long blobsVerified, + long blobBytesVerified, + long throttledNanos, + long anomalyCount, + String resultsIndex + ) implements org.elasticsearch.tasks.Task.Status { + + public static String NAME = "verify_repository_status"; + + public Status(StreamInput in) throws IOException { + this( + in.readString(), + in.readVLong(), + in.readString(), + in.readVLong(), + in.readVLong(), + in.readVLong(), + in.readVLong(), + in.readVLong(), + in.readVLong(), + in.readVLong(), + in.readVLong(), + in.readVLong(), + in.readVLong(), + in.readString() + ); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(repositoryName); + out.writeVLong(repositoryGeneration); + out.writeString(repositoryUUID); + out.writeVLong(snapshotCount); + out.writeVLong(snapshotsVerified); + out.writeVLong(indexCount); + out.writeVLong(indicesVerified); + out.writeVLong(indexSnapshotCount); + out.writeVLong(indexSnapshotsVerified); + out.writeVLong(blobsVerified); + out.writeVLong(blobBytesVerified); + out.writeVLong(throttledNanos); + out.writeVLong(anomalyCount); + out.writeString(resultsIndex); + } + 
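+        // Renders the progress counters grouped by object; the exact human-readable output is pinned down by
+        // VerifyRepositoryIntegrityActionTests#testStatusToXContent.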
+ @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.startObject("repository"); + builder.field("name", repositoryName); + builder.field("uuid", repositoryUUID); + builder.field("generation", repositoryGeneration); + builder.endObject(); + builder.startObject("snapshots"); + builder.field("verified", snapshotsVerified); + builder.field("total", snapshotCount); + builder.endObject(); + builder.startObject("indices"); + builder.field("verified", indicesVerified); + builder.field("total", indexCount); + builder.endObject(); + builder.startObject("index_snapshots"); + builder.field("verified", indexSnapshotsVerified); + builder.field("total", indexSnapshotCount); + builder.endObject(); + builder.startObject("blobs"); + builder.field("verified", blobsVerified); + builder.humanReadableField("verified_size_in_bytes", "verified_size", ByteSizeValue.ofBytes(blobBytesVerified)); + if (throttledNanos > 0) { + builder.humanReadableField("throttled_time_in_millis", "throttled_time", TimeValue.timeValueNanos(throttledNanos)); + } + builder.endObject(); + builder.field("anomalies", anomalyCount); + builder.field("results_index", resultsIndex); + builder.endObject(); + return builder; + } + + @Override + public String getWriteableName() { + return NAME; + } + } + + public static class Response extends ActionResponse implements ToXContentObject { + + private final TaskId taskId; + private final String repositoryName; + private final long repositoryGeneration; + private final String repositoryUUID; + private final long snapshotCount; + private final long indexCount; + private final long indexSnapshotCount; + private final String resultsIndex; + + public Response( + TaskId taskId, + String repositoryName, + long repositoryGeneration, + String repositoryUUID, + long snapshotCount, + long indexCount, + long indexSnapshotCount, + String resultsIndex + ) { + this.taskId = taskId; + this.repositoryName = repositoryName; + this.repositoryGeneration = repositoryGeneration; + this.repositoryUUID = repositoryUUID; + this.snapshotCount = snapshotCount; + this.indexCount = indexCount; + this.indexSnapshotCount = indexSnapshotCount; + this.resultsIndex = resultsIndex; + } + + public Response(StreamInput in) throws IOException { + this( + TaskId.readFromStream(in), + in.readString(), + in.readVLong(), + in.readString(), + in.readVLong(), + in.readVLong(), + in.readVLong(), + in.readString() + ); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + taskId.writeTo(out); + out.writeString(repositoryName); + out.writeVLong(repositoryGeneration); + out.writeString(repositoryUUID); + out.writeVLong(snapshotCount); + out.writeVLong(indexCount); + out.writeVLong(indexSnapshotCount); + out.writeString(resultsIndex); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("task", taskId.toString()); + builder.startObject("repository"); + builder.field("name", repositoryName); + builder.field("uuid", repositoryUUID); + builder.field("generation", repositoryGeneration); + builder.endObject(); + builder.startObject("snapshots"); + builder.field("total", snapshotCount); + builder.endObject(); + builder.startObject("indices"); + builder.field("total", indexCount); + builder.endObject(); + builder.startObject("index_snapshots"); + builder.field("total", indexSnapshotCount); + builder.endObject(); + 
builder.field("results_index", resultsIndex); + builder.endObject(); + return builder; + } + + @Override + public String toString() { + return Strings.toString(this, false, false); + } + + public TaskId getTaskId() { + return taskId; + } + + public String getResultsIndex() { + return resultsIndex; + } + } + + public static class TransportAction extends TransportMasterNodeReadAction { + + private final RepositoriesService repositoriesService; + private final NodeClient client; + + @Inject + public TransportAction( + TransportService transportService, + ClusterService clusterService, + RepositoriesService repositoriesService, + ThreadPool threadPool, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + NodeClient client + ) { + super( + NAME, + true, + transportService, + clusterService, + threadPool, + actionFilters, + Request::new, + indexNameExpressionResolver, + Response::new, + ThreadPool.Names.SNAPSHOT_META + ); + this.repositoriesService = repositoriesService; + this.client = client; + } + + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + } + + @Override + protected void masterOperation( + org.elasticsearch.tasks.Task task, + Request request, + ClusterState state, + ActionListener listener + ) { + final BlobStoreRepository repository; + if (repositoriesService.repository(request.getRepository())instanceof BlobStoreRepository blobStoreRepository) { + repository = blobStoreRepository; + } else { + throw new UnsupportedOperationException( + format("repository [%s] does not support metadata verification", request.getRepository()) + ); + } + + // TODO add docs about blob deletions while this is running + final var foregroundTask = (Task) task; + final var backgroundTask = (Task) taskManager.register("background", task.getAction(), request); + final var cancellableThreads = new CancellableThreads(); + + foregroundTask.addListener(() -> cancellableThreads.cancel("foreground task cancelled")); + backgroundTask.addListener(() -> cancellableThreads.cancel("task cancelled")); + + final ClusterStateListener noLongerMasterListener = event -> { + if (event.localNodeMaster() == false) { + cancellableThreads.cancel("no longer master"); + } + }; + clusterService.addListener(noLongerMasterListener); + + final Runnable cleanup = () -> Releasables.closeExpectNoException( + () -> cancellableThreads.cancel("end of task"), + () -> clusterService.removeListener(noLongerMasterListener), + () -> taskManager.unregister(backgroundTask) + ); + + final ActionListener backgroundTaskListener; + final ActionListener foregroundTaskListener; + if (request.getWaitForCompletion()) { + final var responseListener = new ListenableActionFuture(); + final var finalListener = ActionListener.notifyOnce(listener); + foregroundTaskListener = finalListener.delegateFailure((l, r) -> responseListener.onResponse(r)); + backgroundTaskListener = ActionListener.runAfter( + finalListener.delegateFailure((l, v) -> responseListener.addListener(l)), + cleanup + ); + } else { + foregroundTaskListener = listener; + backgroundTaskListener = ActionListener.wrap(cleanup); + } + + MetadataVerifier.run( + repository, + client, + request.withResolvedDefaults( + clusterService.threadPool().absoluteTimeInMillis(), + clusterService.threadPool().info(ThreadPool.Names.SNAPSHOT_META) + ), + cancellableThreads, + backgroundTask, + foregroundTaskListener.delegateResponse( + (l, e) -> 
Releasables.closeExpectNoException(() -> backgroundTaskListener.onFailure(e), () -> l.onFailure(e)) + ), + backgroundTaskListener + ); + } + } +} diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/SnapshotRepositoryTestKit.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/SnapshotRepositoryTestKit.java index 02eb4805421cc..ddb8c29e5106d 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/SnapshotRepositoryTestKit.java +++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/SnapshotRepositoryTestKit.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; @@ -18,8 +19,11 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.repositories.blobstore.integrity.RestVerifyRepositoryIntegrityAction; +import org.elasticsearch.repositories.blobstore.integrity.VerifyRepositoryIntegrityAction; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; +import org.elasticsearch.tasks.Task; import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; @@ -33,7 +37,8 @@ public class SnapshotRepositoryTestKit extends Plugin implements ActionPlugin { return List.of( new ActionHandler<>(RepositoryAnalyzeAction.INSTANCE, RepositoryAnalyzeAction.TransportAction.class), new ActionHandler<>(BlobAnalyzeAction.INSTANCE, BlobAnalyzeAction.TransportAction.class), - new ActionHandler<>(GetBlobChecksumAction.INSTANCE, GetBlobChecksumAction.TransportAction.class) + new ActionHandler<>(GetBlobChecksumAction.INSTANCE, GetBlobChecksumAction.TransportAction.class), + new ActionHandler<>(VerifyRepositoryIntegrityAction.INSTANCE, VerifyRepositoryIntegrityAction.TransportAction.class) ); } @@ -47,7 +52,7 @@ public List getRestHandlers( IndexNameExpressionResolver indexNameExpressionResolver, Supplier nodesInCluster ) { - return List.of(new RestRepositoryAnalyzeAction()); + return List.of(new RestRepositoryAnalyzeAction(), new RestVerifyRepositoryIntegrityAction()); } static void humanReadableNanos(XContentBuilder builder, String rawFieldName, String readableFieldName, long nanos) throws IOException { @@ -59,4 +64,15 @@ static void humanReadableNanos(XContentBuilder builder, String rawFieldName, Str builder.field(rawFieldName, nanos); } + + @Override + public List getNamedWriteables() { + return List.of( + new NamedWriteableRegistry.Entry( + Task.Status.class, + VerifyRepositoryIntegrityAction.Status.NAME, + VerifyRepositoryIntegrityAction.Status::new + ) + ); + } } diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/test/java/org/elasticsearch/repositories/blobstore/integrity/VerifyRepositoryIntegrityActionTests.java b/x-pack/plugin/snapshot-repo-test-kit/src/test/java/org/elasticsearch/repositories/blobstore/integrity/VerifyRepositoryIntegrityActionTests.java new file mode 100644 index 0000000000000..c4384dfa6ebb6 --- /dev/null +++ 
b/x-pack/plugin/snapshot-repo-test-kit/src/test/java/org/elasticsearch/repositories/blobstore/integrity/VerifyRepositoryIntegrityActionTests.java @@ -0,0 +1,76 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.repositories.blobstore.integrity; + +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentFactory; + +import java.io.IOException; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + +public class VerifyRepositoryIntegrityActionTests extends ESTestCase { + + public void testStatusToXContent() throws IOException { + try (var builder = XContentFactory.jsonBuilder()) { + builder.humanReadable(true); + new VerifyRepositoryIntegrityAction.Status( + "repo", + 1, + "uuid", + 2, + 3, + 4, + 5, + 6, + 7, + 8, + ByteSizeValue.ofKb(4000).getBytes(), + TimeValue.timeValueSeconds(350).nanos(), + 9, + "results" + ).toXContent(builder, new ToXContent.MapParams(Map.of("human", "true"))); + assertThat(BytesReference.bytes(builder).utf8ToString(), equalTo(""" + {"repository":{"name":"repo","uuid":"uuid","generation":1},"snapshots":{"verified":3,"total":2},\ + "indices":{"verified":5,"total":4},"index_snapshots":{"verified":7,"total":6},\ + "blobs":{"verified":8,"verified_size":"3.9mb","verified_size_in_bytes":4096000,\ + "throttled_time":"5.8m","throttled_time_in_millis":350000},"anomalies":9,"results_index":"results"}""")); + } + + try (var builder = XContentFactory.jsonBuilder()) { + builder.humanReadable(true); + new VerifyRepositoryIntegrityAction.Status( + "repo", + 1, + "uuid", + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 0, + TimeValue.timeValueSeconds(350).nanos(), + 9, + "results" + ).toXContent(builder, new ToXContent.MapParams(Map.of("human", "true"))); + assertThat(BytesReference.bytes(builder).utf8ToString(), equalTo(""" + {"repository":{"name":"repo","uuid":"uuid","generation":1},"snapshots":{"verified":3,"total":2},\ + "indices":{"verified":5,"total":4},"index_snapshots":{"verified":7,"total":6},\ + "blobs":{"verified":8},"anomalies":9,"results_index":"results"}""")); + } + } + + // TODO wire serialization tests +}
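The test class above ends with a TODO for wire serialization tests. As a starting point, a round-trip test for Status might look like the following sketch, assuming the usual ESTestCase random-value helpers and org.elasticsearch.common.io.stream.BytesStreamOutput; this test is illustrative only and not part of the change:

    public void testStatusSerializationRoundTrip() throws IOException {
        // Arguments follow the Status record's component order; writeTo uses writeVLong, so all counters must be non-negative.
        final var status = new VerifyRepositoryIntegrityAction.Status(
            randomAlphaOfLength(10),   // repositoryName
            randomNonNegativeLong(),   // repositoryGeneration
            randomAlphaOfLength(10),   // repositoryUUID
            randomNonNegativeLong(),   // snapshotCount
            randomNonNegativeLong(),   // snapshotsVerified
            randomNonNegativeLong(),   // indexCount
            randomNonNegativeLong(),   // indicesVerified
            randomNonNegativeLong(),   // indexSnapshotCount
            randomNonNegativeLong(),   // indexSnapshotsVerified
            randomNonNegativeLong(),   // blobsVerified
            randomNonNegativeLong(),   // blobBytesVerified
            randomNonNegativeLong(),   // throttledNanos
            randomNonNegativeLong(),   // anomalyCount
            randomAlphaOfLength(10)    // resultsIndex
        );
        try (var out = new BytesStreamOutput()) {
            status.writeTo(out);
            try (var in = out.bytes().streamInput()) {
                // Status is a record, so equals() compares every component.
                assertThat(new VerifyRepositoryIntegrityAction.Status(in), equalTo(status));
            }
        }
    }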