Skip to content

Commit

Permalink
Respect operation purpose for s3 stats collection
Browse files Browse the repository at this point in the history
A new no-op OperationPurpose parameter is added in elastic#99615 to all blob
store/container operation method. This PR updates the s3 stats
collection code to actually use this parameter for finer grained stats
collection and reports.

Stats are reported per combination of operation and operation purpose. A
sample output is as the follows:

```
{
  "ListObjects": 2,
  "GetObject": 1,
  "PutObject": 2,
  "PutMultipartObject": 0,
  "AbortMultipartObject": 0,
  "DeleteObjects": 1,
  "GetObject/ClusterState": 1,
  "PutObject/ClusterState": 1,
  "DeleteObjects/Translog": 1,
  "ListObjects/Indices": 1
}
```

The changes are made with BWC in mind, i.e. existing stats reports with
default operation purpose will remain unchanged. For an example, the key
"ListObjects" is equivalent "ListObjects/Snapshot". But we omit the
default purpose in the stats key so that it is backwards compatible.

Relates: elastic#99615
Relates: ES-6800
  • Loading branch information
ywangd committed Oct 3, 2023
1 parent 0a31ce6 commit cba3644
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,14 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.startsWith;
Expand Down Expand Up @@ -207,28 +209,45 @@ public void testAbortRequestStats() throws Exception {
}

public void testRequestStatsWithOperationPurposes() throws IOException {
// The operationPurpose parameter is added but not yet used. This test asserts the new parameter does not change
// the existing stats collection.
final String repoName = createRepository(randomRepositoryName());
final RepositoriesService repositoriesService = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class);
final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repoName);
final BlobStore blobStore = repository.blobStore();

// Initial stats are collected with the default operation purpose
final Map<String, Long> initialStats = blobStore.stats();

// Collect more stats with an operation purpose other than the default
final OperationPurpose purpose = randomValueOtherThan(OperationPurpose.SNAPSHOT, () -> randomFrom(OperationPurpose.values()));
final BlobPath blobPath = repository.basePath().add(randomAlphaOfLength(10));
final BlobContainer blobContainer = blobStore.blobContainer(blobPath);
final OperationPurpose purpose = randomFrom(OperationPurpose.values());
final BytesArray whatToWrite = new BytesArray(randomByteArrayOfLength(randomIntBetween(100, 1000)));
blobContainer.writeBlob(purpose, "test.txt", whatToWrite, true);
try (InputStream is = blobContainer.readBlob(purpose, "test.txt")) {
is.readAllBytes();
}
blobContainer.delete(purpose);

final Map<String, Long> stats = blobStore.stats();
final Map<String, Long> newStats = blobStore.stats();
final Map<String, Long> netNewStats = newStats.entrySet().stream().filter(entry -> {
if (initialStats.containsKey(entry.getKey())) {
assertThat(entry.getValue(), equalTo(initialStats.get(entry.getKey())));
return false;
} else {
return true;
}
}).collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));

assertThat(
stats.keySet(),
containsInAnyOrder("GetObject", "ListObjects", "PutObject", "PutMultipartObject", "DeleteObjects", "AbortMultipartObject")
netNewStats.keySet(),
containsInAnyOrder(
"PutObject/" + purpose.getKey(),
"ListObjects/" + purpose.getKey(),
"GetObject/" + purpose.getKey(),
"DeleteObjects/" + purpose.getKey()
)
);
assertThat(netNewStats.values(), everyItem(greaterThan(0L)));
}

public void testEnforcedCooldownPeriod() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.repositories.blobstore.ChunkedBlobOutputStream;
import org.elasticsearch.repositories.s3.S3BlobStore.Operation;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -204,7 +205,7 @@ protected void onCompletion() throws IOException {
uploadId.get(),
parts
);
complRequest.setRequestMetricCollector(blobStore.multiPartUploadMetricCollector);
complRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose));
SocketAccess.doPrivilegedVoid(() -> clientReference.client().completeMultipartUpload(complRequest));
}
}
Expand Down Expand Up @@ -237,15 +238,15 @@ private UploadPartRequest createPartUploadRequest(
uploadRequest.setUploadId(uploadId);
uploadRequest.setPartNumber(number);
uploadRequest.setInputStream(stream);
uploadRequest.setRequestMetricCollector(blobStore.multiPartUploadMetricCollector);
uploadRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose));
uploadRequest.setPartSize(size);
uploadRequest.setLastPart(lastPart);
return uploadRequest;
}

private void abortMultiPartUpload(OperationPurpose purpose, String uploadId, String blobName) {
final AbortMultipartUploadRequest abortRequest = new AbortMultipartUploadRequest(blobStore.bucket(), blobName, uploadId);
abortRequest.setRequestMetricCollector(blobStore.abortPartUploadMetricCollector);
abortRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.ABORT_MULTIPART_OBJECT, purpose));
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
SocketAccess.doPrivilegedVoid(() -> clientReference.client().abortMultipartUpload(abortRequest));
}
Expand All @@ -255,7 +256,7 @@ private InitiateMultipartUploadRequest initiateMultiPartUpload(OperationPurpose
final InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(blobStore.bucket(), blobName);
initRequest.setStorageClass(blobStore.getStorageClass());
initRequest.setCannedACL(blobStore.getCannedACL());
initRequest.setRequestMetricCollector(blobStore.multiPartUploadMetricCollector);
initRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose));
if (blobStore.serverSideEncryption()) {
final ObjectMetadata md = new ObjectMetadata();
md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
Expand Down Expand Up @@ -285,13 +286,13 @@ public DeleteResult delete(OperationPurpose purpose) throws IOException {
final ObjectListing list;
if (prevListing != null) {
final var listNextBatchOfObjectsRequest = new ListNextBatchOfObjectsRequest(prevListing);
listNextBatchOfObjectsRequest.setRequestMetricCollector(blobStore.listMetricCollector);
listNextBatchOfObjectsRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.LIST_OBJECTS, purpose));
list = SocketAccess.doPrivileged(() -> clientReference.client().listNextBatchOfObjects(listNextBatchOfObjectsRequest));
} else {
final ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
listObjectsRequest.setBucketName(blobStore.bucket());
listObjectsRequest.setPrefix(keyPath);
listObjectsRequest.setRequestMetricCollector(blobStore.listMetricCollector);
listObjectsRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.LIST_OBJECTS, purpose));
list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(listObjectsRequest));
}
final Iterator<String> blobNameIterator = Iterators.map(list.getObjectSummaries().iterator(), summary -> {
Expand Down Expand Up @@ -374,7 +375,7 @@ private List<ObjectListing> executeListing(
ObjectListing list;
if (prevListing != null) {
final var listNextBatchOfObjectsRequest = new ListNextBatchOfObjectsRequest(prevListing);
listNextBatchOfObjectsRequest.setRequestMetricCollector(blobStore.listMetricCollector);
listNextBatchOfObjectsRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.LIST_OBJECTS, purpose));
list = SocketAccess.doPrivileged(() -> clientReference.client().listNextBatchOfObjects(listNextBatchOfObjectsRequest));
} else {
list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(listObjectsRequest));
Expand All @@ -393,7 +394,7 @@ private ListObjectsRequest listObjectsRequest(OperationPurpose purpose, String p
return new ListObjectsRequest().withBucketName(blobStore.bucket())
.withPrefix(pathPrefix)
.withDelimiter("/")
.withRequestMetricCollector(blobStore.listMetricCollector);
.withRequestMetricCollector(blobStore.getMetricCollector(Operation.LIST_OBJECTS, purpose));
}

// exposed for tests
Expand Down Expand Up @@ -428,7 +429,7 @@ void executeSingleUpload(
final PutObjectRequest putRequest = new PutObjectRequest(s3BlobStore.bucket(), blobName, input, md);
putRequest.setStorageClass(s3BlobStore.getStorageClass());
putRequest.setCannedAcl(s3BlobStore.getCannedACL());
putRequest.setRequestMetricCollector(s3BlobStore.putMetricCollector);
putRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.PUT_OBJECT, purpose));

try (AmazonS3Reference clientReference = s3BlobStore.clientReference()) {
SocketAccess.doPrivilegedVoid(() -> { clientReference.client().putObject(putRequest); });
Expand Down Expand Up @@ -506,7 +507,7 @@ void executeMultipartUpload(
uploadId.get(),
parts
);
complRequest.setRequestMetricCollector(s3BlobStore.multiPartUploadMetricCollector);
complRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose));
SocketAccess.doPrivilegedVoid(() -> clientReference.client().completeMultipartUpload(complRequest));
success = true;

Expand Down Expand Up @@ -582,7 +583,7 @@ private class CompareAndExchangeOperation {
private List<MultipartUpload> listMultipartUploads() {
final var listRequest = new ListMultipartUploadsRequest(bucket);
listRequest.setPrefix(blobKey);
listRequest.setRequestMetricCollector(blobStore.listMetricCollector);
listRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.LIST_OBJECTS, purpose));
try {
return SocketAccess.doPrivileged(() -> client.listMultipartUploads(listRequest)).getMultipartUploads();
} catch (AmazonS3Exception e) {
Expand Down Expand Up @@ -670,7 +671,7 @@ void run(BytesReference expected, BytesReference updated, ActionListener<Optiona
}

final var initiateRequest = new InitiateMultipartUploadRequest(bucket, blobKey);
initiateRequest.setRequestMetricCollector(blobStore.multiPartUploadMetricCollector);
initiateRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose));
final var uploadId = SocketAccess.doPrivileged(() -> client.initiateMultipartUpload(initiateRequest)).getUploadId();

final var uploadPartRequest = new UploadPartRequest();
Expand All @@ -681,7 +682,7 @@ void run(BytesReference expected, BytesReference updated, ActionListener<Optiona
uploadPartRequest.setLastPart(true);
uploadPartRequest.setInputStream(updated.streamInput());
uploadPartRequest.setPartSize(updated.length());
uploadPartRequest.setRequestMetricCollector(blobStore.multiPartUploadMetricCollector);
uploadPartRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose));
final var partETag = SocketAccess.doPrivileged(() -> client.uploadPart(uploadPartRequest)).getPartETag();

final var currentUploads = listMultipartUploads();
Expand Down Expand Up @@ -715,7 +716,9 @@ void run(BytesReference expected, BytesReference updated, ActionListener<Optiona
uploadId,
List.of(partETag)
);
completeMultipartUploadRequest.setRequestMetricCollector(blobStore.multiPartUploadMetricCollector);
completeMultipartUploadRequest.setRequestMetricCollector(
blobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose)
);
SocketAccess.doPrivilegedVoid(() -> client.completeMultipartUpload(completeMultipartUploadRequest));
isComplete.set(true);
}
Expand Down Expand Up @@ -774,7 +777,7 @@ private void safeAbortMultipartUpload(String uploadId) {
private void abortMultipartUploadIfExists(String uploadId) {
try {
final var request = new AbortMultipartUploadRequest(bucket, blobKey, uploadId);
request.setRequestMetricCollector(blobStore.abortPartUploadMetricCollector);
request.setRequestMetricCollector(blobStore.getMetricCollector(Operation.ABORT_MULTIPART_OBJECT, purpose));
SocketAccess.doPrivilegedVoid(() -> client.abortMultipartUpload(request));
} catch (AmazonS3Exception e) {
if (e.getStatusCode() != 404) {
Expand Down Expand Up @@ -815,7 +818,7 @@ public void compareAndExchangeRegister(
public void getRegister(OperationPurpose purpose, String key, ActionListener<OptionalBytesReference> listener) {
ActionListener.completeWith(listener, () -> {
final var getObjectRequest = new GetObjectRequest(blobStore.bucket(), buildKey(key));
getObjectRequest.setRequestMetricCollector(blobStore.getMetricCollector);
getObjectRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.GET_OBJECT, purpose));
try (
var clientReference = blobStore.clientReference();
var s3Object = SocketAccess.doPrivileged(() -> clientReference.client().getObject(getObjectRequest));
Expand Down
Loading

0 comments on commit cba3644

Please sign in to comment.