Skip to content

Commit

Permalink
Record operation purpose for s3 stats collection (elastic#100236)
Browse files Browse the repository at this point in the history
A new no-op OperationPurpose parameter is added in elastic#99615 to all blob
store/container operation method. This PR updates the s3 stats
collection code to actually use this parameter for finer grained stats
collection and reports. This differentiation between purposes are kept 
internally for now. The stats are currently aggregated over operations for 
existing stats reporting. This means responses from both 
GetRepositoriesMetering API and GetBlobStoreStats API will not be changed. 
We will have follow-ups to expose the finer stats separately.

Relates: elastic#99615
Relates: ES-6800
  • Loading branch information
ywangd authored Oct 9, 2023
1 parent dfaec0d commit a4db40d
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 99 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/100236.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 100236
summary: Record operation purpose for s3 stats collection
area: Distributed
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,23 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;

@SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
Expand Down Expand Up @@ -207,28 +212,58 @@ public void testAbortRequestStats() throws Exception {
}

public void testRequestStatsWithOperationPurposes() throws IOException {
// The operationPurpose parameter is added but not yet used. This test asserts the new parameter does not change
// the existing stats collection.
final String repoName = createRepository(randomRepositoryName());
final RepositoriesService repositoriesService = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class);
final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repoName);
final BlobStore blobStore = repository.blobStore();

assertThat(blobStore, instanceOf(BlobStoreWrapper.class));
final BlobStore delegateBlobStore = ((BlobStoreWrapper) blobStore).delegate();
assertThat(delegateBlobStore, instanceOf(S3BlobStore.class));
final S3BlobStore.StatsCollectors statsCollectors = ((S3BlobStore) delegateBlobStore).getStatsCollectors();

// Initial stats are collected with the default operation purpose
final Set<String> allOperations = EnumSet.allOf(S3BlobStore.Operation.class)
.stream()
.map(S3BlobStore.Operation::getKey)
.collect(Collectors.toUnmodifiableSet());
statsCollectors.collectors.keySet().forEach(statsKey -> assertThat(statsKey.purpose(), is(OperationPurpose.SNAPSHOT)));
final Map<String, Long> initialStats = blobStore.stats();
assertThat(initialStats.keySet(), equalTo(allOperations));

// Collect more stats with an operation purpose other than the default
final OperationPurpose purpose = randomValueOtherThan(OperationPurpose.SNAPSHOT, () -> randomFrom(OperationPurpose.values()));
final BlobPath blobPath = repository.basePath().add(randomAlphaOfLength(10));
final BlobContainer blobContainer = blobStore.blobContainer(blobPath);
final OperationPurpose purpose = randomFrom(OperationPurpose.values());
final BytesArray whatToWrite = new BytesArray(randomByteArrayOfLength(randomIntBetween(100, 1000)));
blobContainer.writeBlob(purpose, "test.txt", whatToWrite, true);
try (InputStream is = blobContainer.readBlob(purpose, "test.txt")) {
is.readAllBytes();
}
blobContainer.delete(purpose);

final Map<String, Long> stats = blobStore.stats();
// Internal stats collection is fine-grained and records different purposes
assertThat(
stats.keySet(),
containsInAnyOrder("GetObject", "ListObjects", "PutObject", "PutMultipartObject", "DeleteObjects", "AbortMultipartObject")
statsCollectors.collectors.keySet().stream().map(S3BlobStore.StatsKey::purpose).collect(Collectors.toUnmodifiableSet()),
equalTo(Set.of(OperationPurpose.SNAPSHOT, purpose))
);
// The stats report aggregates over different purposes
final Map<String, Long> newStats = blobStore.stats();
assertThat(newStats.keySet(), equalTo(allOperations));
assertThat(newStats, not(equalTo(initialStats)));

final Set<String> operationsSeenForTheNewPurpose = statsCollectors.collectors.keySet()
.stream()
.filter(sk -> sk.purpose() != OperationPurpose.SNAPSHOT)
.map(sk -> sk.operation().getKey())
.collect(Collectors.toUnmodifiableSet());

newStats.forEach((k, v) -> {
if (operationsSeenForTheNewPurpose.contains(k)) {
assertThat(newStats.get(k), greaterThan(initialStats.get(k)));
} else {
assertThat(newStats.get(k), equalTo(initialStats.get(k)));
}
});
}

public void testEnforcedCooldownPeriod() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.repositories.blobstore.ChunkedBlobOutputStream;
import org.elasticsearch.repositories.s3.S3BlobStore.Operation;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -204,7 +205,7 @@ protected void onCompletion() throws IOException {
uploadId.get(),
parts
);
complRequest.setRequestMetricCollector(blobStore.multiPartUploadMetricCollector);
complRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose));
SocketAccess.doPrivilegedVoid(() -> clientReference.client().completeMultipartUpload(complRequest));
}
}
Expand Down Expand Up @@ -237,15 +238,15 @@ private UploadPartRequest createPartUploadRequest(
uploadRequest.setUploadId(uploadId);
uploadRequest.setPartNumber(number);
uploadRequest.setInputStream(stream);
uploadRequest.setRequestMetricCollector(blobStore.multiPartUploadMetricCollector);
uploadRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose));
uploadRequest.setPartSize(size);
uploadRequest.setLastPart(lastPart);
return uploadRequest;
}

private void abortMultiPartUpload(OperationPurpose purpose, String uploadId, String blobName) {
final AbortMultipartUploadRequest abortRequest = new AbortMultipartUploadRequest(blobStore.bucket(), blobName, uploadId);
abortRequest.setRequestMetricCollector(blobStore.abortPartUploadMetricCollector);
abortRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.ABORT_MULTIPART_OBJECT, purpose));
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
SocketAccess.doPrivilegedVoid(() -> clientReference.client().abortMultipartUpload(abortRequest));
}
Expand All @@ -255,7 +256,7 @@ private InitiateMultipartUploadRequest initiateMultiPartUpload(OperationPurpose
final InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(blobStore.bucket(), blobName);
initRequest.setStorageClass(blobStore.getStorageClass());
initRequest.setCannedACL(blobStore.getCannedACL());
initRequest.setRequestMetricCollector(blobStore.multiPartUploadMetricCollector);
initRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose));
if (blobStore.serverSideEncryption()) {
final ObjectMetadata md = new ObjectMetadata();
md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
Expand Down Expand Up @@ -285,13 +286,13 @@ public DeleteResult delete(OperationPurpose purpose) throws IOException {
final ObjectListing list;
if (prevListing != null) {
final var listNextBatchOfObjectsRequest = new ListNextBatchOfObjectsRequest(prevListing);
listNextBatchOfObjectsRequest.setRequestMetricCollector(blobStore.listMetricCollector);
listNextBatchOfObjectsRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.LIST_OBJECTS, purpose));
list = SocketAccess.doPrivileged(() -> clientReference.client().listNextBatchOfObjects(listNextBatchOfObjectsRequest));
} else {
final ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
listObjectsRequest.setBucketName(blobStore.bucket());
listObjectsRequest.setPrefix(keyPath);
listObjectsRequest.setRequestMetricCollector(blobStore.listMetricCollector);
listObjectsRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.LIST_OBJECTS, purpose));
list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(listObjectsRequest));
}
final Iterator<String> blobNameIterator = Iterators.map(list.getObjectSummaries().iterator(), summary -> {
Expand Down Expand Up @@ -374,7 +375,7 @@ private List<ObjectListing> executeListing(
ObjectListing list;
if (prevListing != null) {
final var listNextBatchOfObjectsRequest = new ListNextBatchOfObjectsRequest(prevListing);
listNextBatchOfObjectsRequest.setRequestMetricCollector(blobStore.listMetricCollector);
listNextBatchOfObjectsRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.LIST_OBJECTS, purpose));
list = SocketAccess.doPrivileged(() -> clientReference.client().listNextBatchOfObjects(listNextBatchOfObjectsRequest));
} else {
list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(listObjectsRequest));
Expand All @@ -393,7 +394,7 @@ private ListObjectsRequest listObjectsRequest(OperationPurpose purpose, String p
return new ListObjectsRequest().withBucketName(blobStore.bucket())
.withPrefix(pathPrefix)
.withDelimiter("/")
.withRequestMetricCollector(blobStore.listMetricCollector);
.withRequestMetricCollector(blobStore.getMetricCollector(Operation.LIST_OBJECTS, purpose));
}

// exposed for tests
Expand Down Expand Up @@ -428,7 +429,7 @@ void executeSingleUpload(
final PutObjectRequest putRequest = new PutObjectRequest(s3BlobStore.bucket(), blobName, input, md);
putRequest.setStorageClass(s3BlobStore.getStorageClass());
putRequest.setCannedAcl(s3BlobStore.getCannedACL());
putRequest.setRequestMetricCollector(s3BlobStore.putMetricCollector);
putRequest.setRequestMetricCollector(s3BlobStore.getMetricCollector(Operation.PUT_OBJECT, purpose));

try (AmazonS3Reference clientReference = s3BlobStore.clientReference()) {
SocketAccess.doPrivilegedVoid(() -> { clientReference.client().putObject(putRequest); });
Expand Down Expand Up @@ -506,7 +507,7 @@ void executeMultipartUpload(
uploadId.get(),
parts
);
complRequest.setRequestMetricCollector(s3BlobStore.multiPartUploadMetricCollector);
complRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose));
SocketAccess.doPrivilegedVoid(() -> clientReference.client().completeMultipartUpload(complRequest));
success = true;

Expand Down Expand Up @@ -582,7 +583,7 @@ private class CompareAndExchangeOperation {
private List<MultipartUpload> listMultipartUploads() {
final var listRequest = new ListMultipartUploadsRequest(bucket);
listRequest.setPrefix(blobKey);
listRequest.setRequestMetricCollector(blobStore.listMetricCollector);
listRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.LIST_OBJECTS, purpose));
try {
return SocketAccess.doPrivileged(() -> client.listMultipartUploads(listRequest)).getMultipartUploads();
} catch (AmazonS3Exception e) {
Expand Down Expand Up @@ -670,7 +671,7 @@ void run(BytesReference expected, BytesReference updated, ActionListener<Optiona
}

final var initiateRequest = new InitiateMultipartUploadRequest(bucket, blobKey);
initiateRequest.setRequestMetricCollector(blobStore.multiPartUploadMetricCollector);
initiateRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose));
final var uploadId = SocketAccess.doPrivileged(() -> client.initiateMultipartUpload(initiateRequest)).getUploadId();

final var uploadPartRequest = new UploadPartRequest();
Expand All @@ -681,7 +682,7 @@ void run(BytesReference expected, BytesReference updated, ActionListener<Optiona
uploadPartRequest.setLastPart(true);
uploadPartRequest.setInputStream(updated.streamInput());
uploadPartRequest.setPartSize(updated.length());
uploadPartRequest.setRequestMetricCollector(blobStore.multiPartUploadMetricCollector);
uploadPartRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose));
final var partETag = SocketAccess.doPrivileged(() -> client.uploadPart(uploadPartRequest)).getPartETag();

final var currentUploads = listMultipartUploads();
Expand Down Expand Up @@ -715,7 +716,9 @@ void run(BytesReference expected, BytesReference updated, ActionListener<Optiona
uploadId,
List.of(partETag)
);
completeMultipartUploadRequest.setRequestMetricCollector(blobStore.multiPartUploadMetricCollector);
completeMultipartUploadRequest.setRequestMetricCollector(
blobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose)
);
SocketAccess.doPrivilegedVoid(() -> client.completeMultipartUpload(completeMultipartUploadRequest));
isComplete.set(true);
}
Expand Down Expand Up @@ -774,7 +777,7 @@ private void safeAbortMultipartUpload(String uploadId) {
private void abortMultipartUploadIfExists(String uploadId) {
try {
final var request = new AbortMultipartUploadRequest(bucket, blobKey, uploadId);
request.setRequestMetricCollector(blobStore.abortPartUploadMetricCollector);
request.setRequestMetricCollector(blobStore.getMetricCollector(Operation.ABORT_MULTIPART_OBJECT, purpose));
SocketAccess.doPrivilegedVoid(() -> client.abortMultipartUpload(request));
} catch (AmazonS3Exception e) {
if (e.getStatusCode() != 404) {
Expand Down Expand Up @@ -815,7 +818,7 @@ public void compareAndExchangeRegister(
public void getRegister(OperationPurpose purpose, String key, ActionListener<OptionalBytesReference> listener) {
ActionListener.completeWith(listener, () -> {
final var getObjectRequest = new GetObjectRequest(blobStore.bucket(), buildKey(key));
getObjectRequest.setRequestMetricCollector(blobStore.getMetricCollector);
getObjectRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.GET_OBJECT, purpose));
try (
var clientReference = blobStore.clientReference();
var s3Object = SocketAccess.doPrivileged(() -> clientReference.client().getObject(getObjectRequest));
Expand Down
Loading

0 comments on commit a4db40d

Please sign in to comment.