Skip to content

Commit

Permalink
chore: add s3
Browse files Browse the repository at this point in the history
  • Loading branch information
pipinet committed Jan 24, 2024
1 parent b8d6f47 commit c717b46
Show file tree
Hide file tree
Showing 4 changed files with 233 additions and 25 deletions.
182 changes: 177 additions & 5 deletions storage-s3/src/main/java/com/qwlabs/storage/s3/CustomS3Client.java
Original file line number Diff line number Diff line change
@@ -1,24 +1,50 @@
package com.qwlabs.storage.s3;

import com.qwlabs.storage.exceptions.StorageException;
import com.qwlabs.storage.models.StorageObject;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.ListPartsRequest;
import software.amazon.awssdk.services.s3.model.ListPartsResponse;
import software.amazon.awssdk.services.s3.model.NoSuchBucketException;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.Part;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.presigner.S3Presigner;
import software.amazon.awssdk.services.s3.presigner.model.GetObjectPresignRequest;
import software.amazon.awssdk.services.s3.presigner.model.PresignedPutObjectRequest;
import software.amazon.awssdk.services.s3.presigner.model.PutObjectPresignRequest;
import software.amazon.awssdk.services.s3.presigner.model.UploadPartPresignRequest;

import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutionException;

@Slf4j
public class CustomS3Client {
private static final int UPLOAD_EXPIRES_IN_DAYS = 1;
private static final int DOWNLOAD_EXPIRES_IN_DAYS = 1;
private final S3Client syncClient;
private final S3AsyncClient asyncClient;
private final S3Presigner presigner;
private static final int MAX_PART_COUNT = 1000;

public CustomS3Client(S3Client syncClient, S3AsyncClient asyncClient, S3Presigner presigner) {
this.syncClient = syncClient;
Expand All @@ -28,7 +54,7 @@ public CustomS3Client(S3Client syncClient, S3AsyncClient asyncClient, S3Presigne

public boolean exist(String bucket, String objectName) {
var request = HeadObjectRequest.builder()
.bucket(bucket).key(objectName).build();
.bucket(bucket).key(objectName).build();
try {
var response = syncClient.headObject(request);
return response.contentLength() > 0;
Expand All @@ -41,17 +67,163 @@ public boolean exist(String bucket, String objectName) {
}
}

protected String createUploadId(String bucket, String objectName, String contentType) {
try {
CreateMultipartUploadRequest request = CreateMultipartUploadRequest.builder()
.bucket(bucket)
.contentType(contentType)
.key(objectName)
.build();
var response = asyncClient.createMultipartUpload(request);
return response.get().uploadId();
} catch (InterruptedException | ExecutionException e) {
LOGGER.error("Can not create upload id.", e);
throw new StorageException("Can not create upload id.", e);
}
}

protected String createUploadUrl(String bucket, String objectName) {
try {
PutObjectRequest objectRequest = PutObjectRequest.builder()
.bucket(bucket)
.key(objectName)
.build();
PutObjectPresignRequest presignRequest = PutObjectPresignRequest.builder()
.signatureDuration(Duration.ofDays(UPLOAD_EXPIRES_IN_DAYS))
.putObjectRequest(objectRequest)
.build();
PresignedPutObjectRequest presignedRequest = presigner.presignPutObject(presignRequest);
return presignedRequest.url().toString();
} catch (S3Exception e) {
throw new StorageException("Can not create upload url.", e);
}
}

protected String createUploadUrl(String bucket, String objectName, String uploadId, Integer partNumber) {
try {
UploadPartRequest uploadPartRequest = UploadPartRequest.builder()
.bucket(bucket)
.key(objectName)
.uploadId(uploadId)
.partNumber(partNumber)
.build();
UploadPartPresignRequest uploadPartPresignRequest = UploadPartPresignRequest.builder()
.signatureDuration(Duration.ofDays(UPLOAD_EXPIRES_IN_DAYS))
.uploadPartRequest(uploadPartRequest)
.build();
var presignedRequest = presigner.presignUploadPart(uploadPartPresignRequest);
return presignedRequest.url().toString();
} catch (S3Exception e) {
throw new StorageException("Can not create upload url.", e);
}
}

public String createDownloadUrl(String bucket, String objectName) {
var getRequest = GetObjectRequest.builder().bucket(bucket).key(objectName).build();
var request = GetObjectPresignRequest.builder()
.getObjectRequest(getRequest)
.signatureDuration(Duration.ofDays(DOWNLOAD_EXPIRES_IN_DAYS))
.build();
.getObjectRequest(getRequest)
.signatureDuration(Duration.ofDays(DOWNLOAD_EXPIRES_IN_DAYS))
.build();
try {
return this.presigner.presignGetObject(request).url().toString();
} catch (Exception e) {
} catch (S3Exception e) {
LOGGER.error("Can not create upload url.", e);
throw new StorageException("Can not create upload url.", e);
}
}

public boolean bucketExists(String bucket) {
HeadBucketRequest request = HeadBucketRequest.builder().bucket(bucket).build();
try {
syncClient.headBucket(request);
return true;
} catch (S3Exception e) {
LOGGER.error("Can not check object exist.", e);
return false;

}
}

public void makeBucket(String bucket) {
CreateBucketRequest request = CreateBucketRequest.builder()
.bucket(bucket)
.build();
try {
syncClient.createBucket(request);
} catch (S3Exception e) {
LOGGER.error("Can not make bucket.", e);
throw new StorageException("Can not make bucket.", e);
}
}

public ListPartsResponse listParts(String bucket, String objectName, String uploadId) {
try {
ListPartsRequest request = ListPartsRequest.builder()
.bucket(bucket)
.key(objectName)
.uploadId(uploadId)
.maxParts(MAX_PART_COUNT)
.build();
return syncClient.listParts(request);
} catch (S3Exception e) {
LOGGER.error("Can not list multipart uploads.", e);
throw new StorageException("Can not list multipart uploads.", e);
}
}

public CompleteMultipartUploadResponse completeUpload(String bucket,
String objectName,
String uploadId,
List<Part> parts) {
List<CompletedPart> completedParts = parts.stream().map(
part -> CompletedPart.builder()
.eTag(part.eTag())
.partNumber(part.partNumber())
.build()
).toList();
try {
CompletedMultipartUpload upload = CompletedMultipartUpload.builder()
.parts(completedParts)
.build();
CompleteMultipartUploadRequest request = CompleteMultipartUploadRequest.builder()
.bucket(bucket)
.key(objectName)
.multipartUpload(upload)
.uploadId(uploadId)
.build();
return syncClient.completeMultipartUpload(request);
} catch (S3Exception e) {
LOGGER.error("Can not complete multipart upload.", e);
throw new StorageException("Can not complete multipart upload.", e);
}
}

public ResponseInputStream<GetObjectResponse> getObject(String bucket, String objectName) {
try {
GetObjectRequest request = GetObjectRequest.builder()
.bucket(bucket)
.key(objectName)
.build();
return syncClient.getObject(request);
} catch (S3Exception e) {
LOGGER.error("Can not get object", e);
throw new StorageException("Can not get object", e);
}
}

public StorageObject putObject(String bucket, String objectName, InputStream stream) {
try {
PutObjectRequest request = PutObjectRequest.builder()
.bucket(bucket)
.key(objectName)
.build();
RequestBody requestBody = RequestBody.fromInputStream(stream, stream.available());
syncClient.putObject(request, requestBody);
return StorageObject.of(bucket, objectName);
} catch (S3Exception | IOException e) {
LOGGER.error("Can not get object", e);
throw new StorageException("Can not get object", e);
}
}

}
3 changes: 3 additions & 0 deletions storage-s3/src/main/java/com/qwlabs/storage/s3/S3Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ public class S3Config {
private final String secretKey;

public void validate() {
if (S2.isBlank(region)) {
throw StorageMessages.INSTANCE.lostConfig("region");
}
if (S2.isBlank(url)) {
throw StorageMessages.INSTANCE.lostConfig("url");
}
Expand Down
63 changes: 48 additions & 15 deletions storage-s3/src/main/java/com/qwlabs/storage/s3/S3StorageEngine.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.qwlabs.storage.s3;


import com.google.common.collect.Lists;
import com.qwlabs.cdi.dispatch.Dispatchable;
import com.qwlabs.storage.messages.StorageMessages;
import com.qwlabs.storage.models.CompleteUploadCommand;
import com.qwlabs.storage.models.GetDownloadUrlCommand;
import com.qwlabs.storage.models.GetObjectCommand;
Expand All @@ -12,12 +14,13 @@
import com.qwlabs.storage.models.UploadUrl;
import com.qwlabs.storage.models.UploadUrls;
import com.qwlabs.storage.services.StorageEngine;
import jakarta.annotation.Nullable;
import software.amazon.awssdk.services.s3.model.ListPartsResponse;
import software.amazon.awssdk.services.s3.model.Part;

import java.io.InputStream;
import java.util.List;
import java.util.Objects;


public class S3StorageEngine implements StorageEngine, Dispatchable<String> {
private static final String PROVIDER = "s3";
protected final CustomS3Client s3Client;
Expand All @@ -28,41 +31,71 @@ public S3StorageEngine(CustomS3Client s3Client) {

@Override
public UploadUrl createUploadUrl(GetUploadUrlCommand command) {
return StorageEngine.super.createUploadUrl(command);
setupBucket(command.getBucket());
var url = s3Client.createUploadUrl(command.getBucket(), command.getObjectName());
return UploadUrl.builder()
.provider(command.getProvider())
.bucket(command.getBucket())
.objectName(command.getObjectName())
.url(url)
.build();
}

@Override
public UploadUrls createUploadUrls(GetUploadUrlsCommand command) {
return StorageEngine.super.createUploadUrls(command);
setupBucket(command.getBucket());
String uploadId = s3Client.createUploadId(command.getBucket(),
command.getObjectName(), command.getContentType());
List<String> urls = Lists.newArrayList();
for (int partNumber = 1; partNumber <= command.getPartCount(); partNumber++) {
urls.add(s3Client.createUploadUrl(command.getBucket(), command.getObjectName(), uploadId, partNumber));
}
return UploadUrls.builder()
.provider(command.getProvider())
.bucket(command.getBucket())
.objectName(command.getObjectName())
.uploadId(uploadId)
.urls(urls)
.build();
}

@Override
public String completeUpload(CompleteUploadCommand command) {
return StorageEngine.super.completeUpload(command);
ListPartsResponse result = s3Client.listParts(command.getBucket(), command.getObjectName(), command.getUploadId());
if (result.parts().size() != command.getPartCount()) {
throw StorageMessages.INSTANCE.invalidPartCount(command.getPartCount(),
result.parts().size());
}
List<Part> parts = result.parts();
s3Client.completeUpload(command.getBucket(), command.getObjectName(), command.getUploadId(), parts);
return s3Client.createDownloadUrl(command.getBucket(), command.getObjectName());
}


@Override
public InputStream getObject(GetObjectCommand command) {
return StorageEngine.super.getObject(command);
public String getDownloadUrl(GetDownloadUrlCommand command) {
return s3Client.createDownloadUrl(command.getBucket(), command.getObjectName());
}

@Override
public StorageObject putObject(PutObjectCommand command) {
return StorageEngine.super.putObject(command);
public InputStream getObject(GetObjectCommand command) {
return s3Client.getObject(command.getBucket(), command.getObjectName());
}

@Override
public String putObjectForUrl(PutObjectCommand command) {
return StorageEngine.super.putObjectForUrl(command);
public StorageObject putObject(PutObjectCommand command) {
setupBucket(command.getBucket());
return s3Client.putObject(command.getBucket(), command.getObjectName(), command.getInputStream());
}

@Override
public String getDownloadUrl(GetDownloadUrlCommand command) {
return this.s3Client.createDownloadUrl(command.getBucket(), command.getObjectName());
private void setupBucket(String bucket) {
if (!s3Client.bucketExists(bucket)) {
s3Client.makeBucket(bucket);
}
}

@Override
public boolean dispatchable(@Nullable String context) {
public boolean dispatchable(String context) {
return Objects.equals(PROVIDER, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ private S3StorageHelper() {

public static CustomS3Client createS3Client(S3Config config) {
return new CustomS3Client(
createS3SyncClient(config),
createS3AsyncClient(config),
createS3Presigner(config)
createS3SyncClient(config),
createS3AsyncClient(config),
createS3Presigner(config)
);
}

Expand All @@ -47,7 +47,7 @@ private static void configure(S3Presigner.Builder builder,
S3Config config) {
config.validate();
builder.credentialsProvider(StaticCredentialsProvider.create(
AwsBasicCredentials.create(config.getAccessKey(), config.getSecretKey())));
AwsBasicCredentials.create(config.getAccessKey(), config.getSecretKey())));
if (!S2.isBlank(config.getRegion())) {
builder.region(Region.of(config.getRegion()));
}
Expand All @@ -58,7 +58,7 @@ private static void configure(S3BaseClientBuilder builder,
S3Config config) {
config.validate();
builder.credentialsProvider(StaticCredentialsProvider.create(
AwsBasicCredentials.create(config.getAccessKey(), config.getSecretKey())));
AwsBasicCredentials.create(config.getAccessKey(), config.getSecretKey())));
if (!S2.isBlank(config.getRegion())) {
builder.region(Region.of(config.getRegion()));
}
Expand Down

0 comments on commit c717b46

Please sign in to comment.