diff --git a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index d7294cab93844..88927df0563ec 100644 --- a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -64,12 +64,14 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import java.util.stream.StreamSupport; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.startsWith; @@ -207,16 +209,18 @@ public void testAbortRequestStats() throws Exception { } public void testRequestStatsWithOperationPurposes() throws IOException { - // The operationPurpose parameter is added but not yet used. This test asserts the new parameter does not change - // the existing stats collection. 
final String repoName = createRepository(randomRepositoryName()); final RepositoriesService repositoriesService = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class); final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repoName); final BlobStore blobStore = repository.blobStore(); + // Initial stats are collected with the default operation purpose + final Map initialStats = blobStore.stats(); + + // Collect more stats with an operation purpose other than the default + final OperationPurpose purpose = randomValueOtherThan(OperationPurpose.SNAPSHOT, () -> randomFrom(OperationPurpose.values())); final BlobPath blobPath = repository.basePath().add(randomAlphaOfLength(10)); final BlobContainer blobContainer = blobStore.blobContainer(blobPath); - final OperationPurpose purpose = randomFrom(OperationPurpose.values()); final BytesArray whatToWrite = new BytesArray(randomByteArrayOfLength(randomIntBetween(100, 1000))); blobContainer.writeBlob(purpose, "test.txt", whatToWrite, true); try (InputStream is = blobContainer.readBlob(purpose, "test.txt")) { @@ -224,11 +228,26 @@ public void testRequestStatsWithOperationPurposes() throws IOException { } blobContainer.delete(purpose); - final Map stats = blobStore.stats(); + final Map newStats = blobStore.stats(); + final Map netNewStats = newStats.entrySet().stream().filter(entry -> { + if (initialStats.containsKey(entry.getKey())) { + assertThat(entry.getValue(), equalTo(initialStats.get(entry.getKey()))); + return false; + } else { + return true; + } + }).collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue)); + assertThat( - stats.keySet(), - containsInAnyOrder("GetObject", "ListObjects", "PutObject", "PutMultipartObject", "DeleteObjects", "AbortMultipartObject") + netNewStats.keySet(), + containsInAnyOrder( + "PutObject/" + purpose.getKey(), + "ListObjects/" + purpose.getKey(), + "GetObject/" + purpose.getKey(), + "DeleteObjects/" + 
purpose.getKey() + ) ); + assertThat(netNewStats.values(), everyItem(greaterThan(0L))); } public void testEnforcedCooldownPeriod() throws IOException { diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index c7dee4f1599c5..5dfddfbfd6dad 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -52,6 +52,7 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.repositories.blobstore.ChunkedBlobOutputStream; +import org.elasticsearch.repositories.s3.S3BlobStore.Operation; import org.elasticsearch.threadpool.ThreadPool; import java.io.ByteArrayInputStream; @@ -204,7 +205,7 @@ protected void onCompletion() throws IOException { uploadId.get(), parts ); - complRequest.setRequestMetricCollector(blobStore.multiPartUploadMetricCollector); + complRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose)); SocketAccess.doPrivilegedVoid(() -> clientReference.client().completeMultipartUpload(complRequest)); } } @@ -237,7 +238,7 @@ private UploadPartRequest createPartUploadRequest( uploadRequest.setUploadId(uploadId); uploadRequest.setPartNumber(number); uploadRequest.setInputStream(stream); - uploadRequest.setRequestMetricCollector(blobStore.multiPartUploadMetricCollector); + uploadRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose)); uploadRequest.setPartSize(size); uploadRequest.setLastPart(lastPart); return uploadRequest; @@ -245,7 +246,7 @@ private UploadPartRequest createPartUploadRequest( private void abortMultiPartUpload(OperationPurpose purpose, String uploadId, String blobName) { final AbortMultipartUploadRequest abortRequest = new 
AbortMultipartUploadRequest(blobStore.bucket(), blobName, uploadId); - abortRequest.setRequestMetricCollector(blobStore.abortPartUploadMetricCollector); + abortRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.ABORT_MULTIPART_OBJECT, purpose)); try (AmazonS3Reference clientReference = blobStore.clientReference()) { SocketAccess.doPrivilegedVoid(() -> clientReference.client().abortMultipartUpload(abortRequest)); } @@ -255,7 +256,7 @@ private InitiateMultipartUploadRequest initiateMultiPartUpload(OperationPurpose final InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(blobStore.bucket(), blobName); initRequest.setStorageClass(blobStore.getStorageClass()); initRequest.setCannedACL(blobStore.getCannedACL()); - initRequest.setRequestMetricCollector(blobStore.multiPartUploadMetricCollector); + initRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose)); if (blobStore.serverSideEncryption()) { final ObjectMetadata md = new ObjectMetadata(); md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION); @@ -285,13 +286,13 @@ public DeleteResult delete(OperationPurpose purpose) throws IOException { final ObjectListing list; if (prevListing != null) { final var listNextBatchOfObjectsRequest = new ListNextBatchOfObjectsRequest(prevListing); - listNextBatchOfObjectsRequest.setRequestMetricCollector(blobStore.listMetricCollector); + listNextBatchOfObjectsRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.LIST_OBJECTS, purpose)); list = SocketAccess.doPrivileged(() -> clientReference.client().listNextBatchOfObjects(listNextBatchOfObjectsRequest)); } else { final ListObjectsRequest listObjectsRequest = new ListObjectsRequest(); listObjectsRequest.setBucketName(blobStore.bucket()); listObjectsRequest.setPrefix(keyPath); - listObjectsRequest.setRequestMetricCollector(blobStore.listMetricCollector); + 
listObjectsRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.LIST_OBJECTS, purpose)); list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(listObjectsRequest)); } final Iterator blobNameIterator = Iterators.map(list.getObjectSummaries().iterator(), summary -> { @@ -374,7 +375,7 @@ private List executeListing( ObjectListing list; if (prevListing != null) { final var listNextBatchOfObjectsRequest = new ListNextBatchOfObjectsRequest(prevListing); - listNextBatchOfObjectsRequest.setRequestMetricCollector(blobStore.listMetricCollector); + listNextBatchOfObjectsRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.LIST_OBJECTS, purpose)); list = SocketAccess.doPrivileged(() -> clientReference.client().listNextBatchOfObjects(listNextBatchOfObjectsRequest)); } else { list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(listObjectsRequest)); @@ -393,7 +394,7 @@ private ListObjectsRequest listObjectsRequest(OperationPurpose purpose, String p return new ListObjectsRequest().withBucketName(blobStore.bucket()) .withPrefix(pathPrefix) .withDelimiter("/") - .withRequestMetricCollector(blobStore.listMetricCollector); + .withRequestMetricCollector(blobStore.getMetricCollector(Operation.LIST_OBJECTS, purpose)); } // exposed for tests @@ -428,7 +429,7 @@ void executeSingleUpload( final PutObjectRequest putRequest = new PutObjectRequest(s3BlobStore.bucket(), blobName, input, md); putRequest.setStorageClass(s3BlobStore.getStorageClass()); putRequest.setCannedAcl(s3BlobStore.getCannedACL()); - putRequest.setRequestMetricCollector(s3BlobStore.putMetricCollector); + putRequest.setRequestMetricCollector(s3BlobStore.getMetricCollector(Operation.PUT_OBJECT, purpose)); try (AmazonS3Reference clientReference = s3BlobStore.clientReference()) { SocketAccess.doPrivilegedVoid(() -> { clientReference.client().putObject(putRequest); }); @@ -506,7 +507,7 @@ void executeMultipartUpload( uploadId.get(), parts ); - 
complRequest.setRequestMetricCollector(s3BlobStore.multiPartUploadMetricCollector); + complRequest.setRequestMetricCollector(s3BlobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose)); SocketAccess.doPrivilegedVoid(() -> clientReference.client().completeMultipartUpload(complRequest)); success = true; @@ -582,7 +583,7 @@ private class CompareAndExchangeOperation { private List listMultipartUploads() { final var listRequest = new ListMultipartUploadsRequest(bucket); listRequest.setPrefix(blobKey); - listRequest.setRequestMetricCollector(blobStore.listMetricCollector); + listRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.LIST_OBJECTS, purpose)); try { return SocketAccess.doPrivileged(() -> client.listMultipartUploads(listRequest)).getMultipartUploads(); } catch (AmazonS3Exception e) { @@ -670,7 +671,7 @@ void run(BytesReference expected, BytesReference updated, ActionListener client.initiateMultipartUpload(initiateRequest)).getUploadId(); final var uploadPartRequest = new UploadPartRequest(); @@ -681,7 +682,7 @@ void run(BytesReference expected, BytesReference updated, ActionListener client.uploadPart(uploadPartRequest)).getPartETag(); final var currentUploads = listMultipartUploads(); @@ -715,7 +716,9 @@ void run(BytesReference expected, BytesReference updated, ActionListener client.completeMultipartUpload(completeMultipartUploadRequest)); isComplete.set(true); } @@ -774,7 +777,7 @@ private void safeAbortMultipartUpload(String uploadId) { private void abortMultipartUploadIfExists(String uploadId) { try { final var request = new AbortMultipartUploadRequest(bucket, blobKey, uploadId); - request.setRequestMetricCollector(blobStore.abortPartUploadMetricCollector); + request.setRequestMetricCollector(blobStore.getMetricCollector(Operation.ABORT_MULTIPART_OBJECT, purpose)); SocketAccess.doPrivilegedVoid(() -> client.abortMultipartUpload(request)); } catch (AmazonS3Exception e) { if (e.getStatusCode() != 404) { @@ -815,7 +818,7 @@ 
public void compareAndExchangeRegister( public void getRegister(OperationPurpose purpose, String key, ActionListener listener) { ActionListener.completeWith(listener, () -> { final var getObjectRequest = new GetObjectRequest(blobStore.bucket(), buildKey(key)); - getObjectRequest.setRequestMetricCollector(blobStore.getMetricCollector); + getObjectRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.GET_OBJECT, purpose)); try ( var clientReference = blobStore.clientReference(); var s3Object = SocketAccess.doPrivileged(() -> clientReference.client().getObject(getObjectRequest)); diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java index fca005e8de32c..e03f6eff59944 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java @@ -30,19 +30,22 @@ import org.elasticsearch.common.blobstore.OperationPurpose; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.Maps; import org.elasticsearch.core.TimeValue; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.elasticsearch.core.Strings.format; @@ -75,14 +78,7 @@ class S3BlobStore implements BlobStore { private final ThreadPool threadPool; private final Executor snapshotExecutor; - private final Stats stats = new 
Stats(); - - final RequestMetricCollector getMetricCollector; - final RequestMetricCollector listMetricCollector; - final RequestMetricCollector putMetricCollector; - final RequestMetricCollector multiPartUploadMetricCollector; - final RequestMetricCollector deleteMetricCollector; - final RequestMetricCollector abortPartUploadMetricCollector; + private final StatsCollectors statsCollectors = new StatsCollectors(); S3BlobStore( S3Service service, @@ -105,48 +101,10 @@ class S3BlobStore implements BlobStore { this.repositoryMetadata = repositoryMetadata; this.threadPool = threadPool; this.snapshotExecutor = threadPool.executor(ThreadPool.Names.SNAPSHOT); - this.getMetricCollector = new IgnoreNoResponseMetricsCollector() { - @Override - public void collectMetrics(Request request) { - assert request.getHttpMethod().name().equals("GET"); - stats.getCount.addAndGet(getRequestCount(request)); - } - }; - this.listMetricCollector = new IgnoreNoResponseMetricsCollector() { - @Override - public void collectMetrics(Request request) { - assert request.getHttpMethod().name().equals("GET"); - stats.listCount.addAndGet(getRequestCount(request)); - } - }; - this.putMetricCollector = new IgnoreNoResponseMetricsCollector() { - @Override - public void collectMetrics(Request request) { - assert request.getHttpMethod().name().equals("PUT"); - stats.putCount.addAndGet(getRequestCount(request)); - } - }; - this.multiPartUploadMetricCollector = new IgnoreNoResponseMetricsCollector() { - @Override - public void collectMetrics(Request request) { - assert request.getHttpMethod().name().equals("PUT") || request.getHttpMethod().name().equals("POST"); - stats.postCount.addAndGet(getRequestCount(request)); - } - }; - this.deleteMetricCollector = new IgnoreNoResponseMetricsCollector() { - @Override - public void collectMetrics(Request request) { - assert request.getHttpMethod().name().equals("POST"); - stats.deleteCount.addAndGet(getRequestCount(request)); - } - }; - 
this.abortPartUploadMetricCollector = new IgnoreNoResponseMetricsCollector() { - @Override - public void collectMetrics(Request request) { - assert request.getHttpMethod().name().equals("DELETE"); - stats.abortCount.addAndGet(getRequestCount(request)); - } - }; + } + + RequestMetricCollector getMetricCollector(Operation operation, OperationPurpose purpose) { + return statsCollectors.getMetricCollector(operation, purpose); } public Executor getSnapshotExecutor() { @@ -161,6 +119,8 @@ public TimeValue getCompareAndExchangeTimeToLive() { // issue private abstract static class IgnoreNoResponseMetricsCollector extends RequestMetricCollector { + protected final AtomicLong counter = new AtomicLong(); + @Override public final void collectMetrics(Request request, Response response) { if (response != null) { @@ -273,7 +233,7 @@ private void deletePartition( private static DeleteObjectsRequest bulkDelete(OperationPurpose purpose, S3BlobStore blobStore, List blobs) { return new DeleteObjectsRequest(blobStore.bucket()).withKeys(blobs.toArray(Strings.EMPTY_ARRAY)) .withQuiet(true) - .withRequestMetricCollector(blobStore.deleteMetricCollector); + .withRequestMetricCollector(blobStore.getMetricCollector(Operation.DELETE_OBJECTS, purpose)); } @Override @@ -283,7 +243,7 @@ public void close() throws IOException { @Override public Map stats() { - return stats.toMap(); + return statsCollectors.statsMap(); } public CannedAccessControlList getCannedACL() { @@ -332,29 +292,83 @@ ThreadPool getThreadPool() { return threadPool; } - static class Stats { + enum Operation { + GET_OBJECT("GetObject"), + LIST_OBJECTS("ListObjects"), + PUT_OBJECT("PutObject"), + PUT_MULTIPART_OBJECT("PutMultipartObject"), + DELETE_OBJECTS("DeleteObjects"), + ABORT_MULTIPART_OBJECT("AbortMultipartObject"); - final AtomicLong listCount = new AtomicLong(); + private final String key; - final AtomicLong getCount = new AtomicLong(); + Operation(String key) { + this.key = key; + } + } - final AtomicLong putCount = new 
AtomicLong(); + class StatsCollectors { + final Map collectors; + + StatsCollectors() { + this.collectors = new ConcurrentHashMap<>( + Stream.of( + Operation.GET_OBJECT, + Operation.LIST_OBJECTS, + Operation.PUT_OBJECT, + Operation.PUT_MULTIPART_OBJECT, + Operation.DELETE_OBJECTS, + Operation.ABORT_MULTIPART_OBJECT + ).collect(Collectors.toUnmodifiableMap(ops -> ops.key, this::buildMetricCollector)) + ); + } - final AtomicLong postCount = new AtomicLong(); + RequestMetricCollector getMetricCollector(Operation operation, OperationPurpose purpose) { + final String statsKey; + if (purpose == OperationPurpose.SNAPSHOT) { + statsKey = operation.key; + } else { + statsKey = operation.key + "/" + purpose.getKey(); + if (false == collectors.containsKey(statsKey)) { + collectors.putIfAbsent(statsKey, buildMetricCollector(operation)); + } + } + return collectors.get(statsKey); + } - final AtomicLong deleteCount = new AtomicLong(); + Map statsMap() { + return Maps.transformValues(collectors, v -> v.counter.get()); + } - final AtomicLong abortCount = new AtomicLong(); + IgnoreNoResponseMetricsCollector buildMetricCollector(Operation operation) { + return new IgnoreNoResponseMetricsCollector() { + @Override + public void collectMetrics(Request request) { + assert assertConsistencyBetweenHttpRequestAndOperation(request, operation); + counter.addAndGet(getRequestCount(request)); + } + }; + } - Map toMap() { - final Map results = new HashMap<>(); - results.put("GetObject", getCount.get()); - results.put("ListObjects", listCount.get()); - results.put("PutObject", putCount.get()); - results.put("PutMultipartObject", postCount.get()); - results.put("DeleteObjects", deleteCount.get()); - results.put("AbortMultipartObject", abortCount.get()); - return results; + private boolean assertConsistencyBetweenHttpRequestAndOperation(Request request, Operation operation) { + switch (operation) { + case GET_OBJECT, LIST_OBJECTS -> { + return request.getHttpMethod().name().equals("GET"); + } + 
case PUT_OBJECT -> { + return request.getHttpMethod().name().equals("PUT"); + } + case PUT_MULTIPART_OBJECT -> { + return request.getHttpMethod().name().equals("PUT") || request.getHttpMethod().name().equals("POST"); + } + case DELETE_OBJECTS -> { + return request.getHttpMethod().name().equals("POST"); + } + case ABORT_MULTIPART_OBJECT -> { + return request.getHttpMethod().name().equals("DELETE"); + } + default -> throw new AssertionError("unknown operation [" + operation + "]"); + } } } } diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java index d796eb49e7bcb..6cad60f32de47 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RetryingInputStream.java @@ -20,6 +20,7 @@ import org.elasticsearch.Version; import org.elasticsearch.common.blobstore.OperationPurpose; import org.elasticsearch.core.IOUtils; +import org.elasticsearch.repositories.s3.S3BlobStore.Operation; import java.io.IOException; import java.io.InputStream; @@ -82,7 +83,7 @@ class S3RetryingInputStream extends InputStream { private void openStream() throws IOException { try (AmazonS3Reference clientReference = blobStore.clientReference()) { final GetObjectRequest getObjectRequest = new GetObjectRequest(blobStore.bucket(), blobKey); - getObjectRequest.setRequestMetricCollector(blobStore.getMetricCollector); + getObjectRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.GET_OBJECT, purpose)); if (currentOffset > 0 || start > 0 || end < Long.MAX_VALUE - 1) { assert start + currentOffset <= end : "requesting beyond end, start = " + start + " offset=" + currentOffset + " end=" + end;