From 02cc01ddad82c6de2add6b94651ae5b01c2fd538 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 4 Mar 2021 16:38:08 +0000 Subject: [PATCH] Avoid atomic write of large blobs in repo analyzer (#69960) Today we randomly perform an atomic write even if there are no early reads, but we should only do so if the blob is small enough to write atomically. --- .../rest-api-spec/test/10_analyze.yml | 32 +++++++++++++++++++ .../blobstore/testkit/BlobAnalyzeAction.java | 22 ++++++++++--- .../testkit/RepositoryAnalyzeAction.java | 3 +- 3 files changed, 51 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/snapshot-repo-test-kit/qa/rest/src/test/resources/rest-api-spec/test/10_analyze.yml b/x-pack/plugin/snapshot-repo-test-kit/qa/rest/src/test/resources/rest-api-spec/test/10_analyze.yml index cc7d9e836fe38..8701ac20d816b 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/qa/rest/src/test/resources/rest-api-spec/test/10_analyze.yml +++ b/x-pack/plugin/snapshot-repo-test-kit/qa/rest/src/test/resources/rest-api-spec/test/10_analyze.yml @@ -18,6 +18,15 @@ setup: readonly: true location: "test_repo_loc" + - do: + snapshot.create_repository: + repository: test_repo_slow + body: + type: fs + settings: + max_snapshot_bytes_per_sec: "1b" + location: "test_repo_loc" + --- "Analysis fails on readonly repositories": - skip: @@ -145,3 +154,26 @@ setup: - is_false: summary.read.max_wait - is_false: summary.read.total_throttled - is_false: summary.read.total_elapsed + +--- +"Timeout with large blobs": + - skip: + version: "- 7.11.99" + reason: "introduced in 7.12" + + - do: + catch: request + snapshot.repository_analyze: + repository: test_repo_slow + blob_count: 1 + concurrency: 1 + max_blob_size: 2gb + max_total_data_size: 2gb + detailed: false + human: false + timeout: 1s + + - match: { status: 500 } + - match: { error.type: repository_verification_exception } + - match: { error.reason: "/.*test_repo_slow..analysis.failed.*/" } + - match: { error.root_cause.0.type: receive_timeout_transport_exception } diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/BlobAnalyzeAction.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/BlobAnalyzeAction.java index 6ff29a61f61bc..ace60962d2606 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/BlobAnalyzeAction.java +++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/BlobAnalyzeAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.InputStreamStreamInput; import org.elasticsearch.common.io.stream.StreamInput; @@ -188,6 +189,12 @@ protected void doExecute(Task task, Request request, ActionListener li } } + /** + * The atomic write API is based around a {@link BytesReference} which uses {@code int} for lengths and offsets and so on, so we can + * only use it to write a blob with size at most {@link Integer#MAX_VALUE}: + */ + static final long MAX_ATOMIC_WRITE_SIZE = Integer.MAX_VALUE; + /** * Analysis on a single blob, performing the write(s) and orchestrating the read(s). */ @@ -265,10 +272,15 @@ static class BlobAnalysis { } void run() { - writeRandomBlob(request.readEarly || random.nextBoolean(), true, this::doReadBeforeWriteComplete, write1Step); + writeRandomBlob( + request.readEarly || (request.targetLength <= MAX_ATOMIC_WRITE_SIZE && random.nextBoolean()), + true, + this::doReadBeforeWriteComplete, + write1Step + ); if (request.writeAndOverwrite) { - assert request.targetLength <= Integer.MAX_VALUE : "oversized atomic write"; + assert request.targetLength <= MAX_ATOMIC_WRITE_SIZE : "oversized atomic write"; write1Step.whenComplete(ignored -> writeRandomBlob(true, false, this::doReadAfterWrite, write2Step), ignored -> {}); } else { write2Step.onResponse(null); @@ -277,7 +289,7 @@ void run() { } private void writeRandomBlob(boolean atomic, boolean failIfExists, Runnable onLastRead, StepListener stepListener) { - assert atomic == false || request.targetLength <= Integer.MAX_VALUE : "oversized atomic write"; + assert atomic == false || request.targetLength <= MAX_ATOMIC_WRITE_SIZE : "oversized atomic write"; final RandomBlobContent content = new RandomBlobContent( request.getRepositoryName(), random.nextLong(), @@ -296,7 +308,7 @@ private void writeRandomBlob(boolean atomic, boolean failIfExists, Runnable onLa // E.g. for S3 blob containers we would like to choose somewhat more randomly between single-part and multi-part uploads, // rather than relying on the usual distinction based on the size of the blob. - if (atomic || (request.targetLength <= Integer.MAX_VALUE && random.nextBoolean())) { + if (atomic || (request.targetLength <= MAX_ATOMIC_WRITE_SIZE && random.nextBoolean())) { final RandomBlobContentBytesReference bytesReference = new RandomBlobContentBytesReference( content, Math.toIntExact(request.getTargetLength()) @@ -613,7 +625,7 @@ public static class Request extends ActionRequest implements TaskAwareRequest { boolean writeAndOverwrite ) { assert 0 < targetLength; - assert targetLength <= Integer.MAX_VALUE || (readEarly == false && writeAndOverwrite == false) : "oversized atomic write"; + assert targetLength <= MAX_ATOMIC_WRITE_SIZE || (readEarly == false && writeAndOverwrite == false) : "oversized atomic write"; this.repositoryName = repositoryName; this.blobPath = blobPath; this.blobName = blobName; diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalyzeAction.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalyzeAction.java index 2fbc691a7b9cb..0e03e2c9f3aa0 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalyzeAction.java +++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalyzeAction.java @@ -69,6 +69,7 @@ import java.util.function.LongSupplier; import java.util.stream.IntStream; +import static org.elasticsearch.repositories.blobstore.testkit.BlobAnalyzeAction.MAX_ATOMIC_WRITE_SIZE; import static org.elasticsearch.repositories.blobstore.testkit.SnapshotRepositoryTestKit.humanReadableNanos; /** @@ -451,7 +452,7 @@ public void run() { for (int i = 0; i < request.getBlobCount(); i++) { final long targetLength = blobSizes.get(i); - final boolean smallBlob = targetLength <= Integer.MAX_VALUE; // avoid the atomic API for larger blobs + final boolean smallBlob = targetLength <= MAX_ATOMIC_WRITE_SIZE; // avoid the atomic API for larger blobs final VerifyBlobTask verifyBlobTask = new VerifyBlobTask( nodes.get(random.nextInt(nodes.size())), new BlobAnalyzeAction.Request(