Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

S3: Add MultiPartUpload and ListParts APIs #2730

Merged
merged 6 commits into from
Oct 1, 2021

Conversation

mdedetrich
Copy link
Contributor

@mdedetrich mdedetrich commented Sep 15, 2021

This PR adds https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListMultipartUploads.html and https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListParts.html to the S3 API with the ultimate aim of being able to automatically resume a previously aborted S3 multipart upload with the same given bucket/key in the same way that GCStorage.resumableUpload works

The implementation of S3.resumableUpload still needs to be done using the rough logic outlined in https://stackoverflow.com/questions/53764876/resume-s3-multipart-upload-partetag. The basic implementation of S3.resumableUpload is as follows

  1. Firstly check if there have been any previously aborted/cancelled multipart uploads for the same given key/bucket by using S3.listMultipartUpload. If not then just call already existing S3.multipartUpload otherwise
  2. With the results of S3.listMultipartUpload call S3.listPart in order to retrieve the latest eTag/partNumber ?
  3. Call S3.multipartUpload with that given eTag/partNumber

As an additional note I have generalized the ListBucketState to work with any arbitrary type rather than just String by renaming it to S3PaginationState and allowing it to accept a type parameter, this change was put into its own commit so that its clear. This is a necessary because the added API calls either have a different type for a continuationToken (i.e. Int instead of String) or the continuationToken requires multiples tokens rather than just a single one.

@ennru I am creating this PR prematurely as a draft since it ended up being quite big and I want someone else to have a look at it to see if I am on the right track. I have commented on specific parts of the PR just to clarify things

. Here is a checklist of the things that need to be done

  • Add list multipart uploads to the S3 API
  • Add list parts to the S3 API
  • Add a test for multipart uploads
  • Add a test for the list part uploads
  • Handle issue where AWSIntegration tests work locally against a real S3 but don't work on Alpakka's Travis's CI
  • Add AbortMultipartUpload (i.e. https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html) so that I can cleanup after test runs
  • [ ] Add a S3.resumableUpload function to the S3 API
  • [ ] Add a test for the S3.resumableUpload
  • Make S3Stream.completeMultipartUpload public with a proper Scala/Java API to allow users to manually complete a multipart upload on their will
  • Add S3.resumeMultipartUpload so that it accepts an optional sequence of partNumber/etag along with an uploadId parameter that lets you resume an upload from a given arbitrary part. This will allow you to manually resume a multipart upload, its up to you retrieve these partNumbers/etags.
  • Test the adjustment to S3.multipartUpload

@@ -114,6 +115,244 @@ object MultipartUploadResult {
)
}

final class AWSIdentity private (val id: String, val displayName: String) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if you have better suggestions for AWSIdentity, basically this same datastructure is used in many places where you want to identify who or what initiated the request and owns the entity in question

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Works for me.

@mdedetrich mdedetrich marked this pull request as draft September 15, 2021 11:31

case object Starting extends ListBucketState
final case class Starting[T]() extends S3PaginationState[T]
Copy link
Contributor Author

@mdedetrich mdedetrich Sep 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally both Starting and Finished should be case objects however I didn't manage to get type inference to work nicely by doing case object Starting extends S3PaginationState[Nothing] so I ended up converting them to a case class that has a zero parameters since thats the only way to propagate the type T across.

Maybe there is a nicer solution for this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not fight for it. It is internal API anyway.

@mdedetrich mdedetrich changed the title Add listuploadmultipart Add MultiPartUplaod and ListParts APIs Sep 15, 2021
@mdedetrich mdedetrich force-pushed the add-listuploadmultipart branch 12 times, most recently from 8afc2b5 to b11e714 Compare September 17, 2021 13:25
@mdedetrich mdedetrich changed the title Add MultiPartUplaod and ListParts APIs Add MultiPartUpload and ListParts APIs Sep 17, 2021
@mdedetrich
Copy link
Contributor Author

So one thing that I have noticed is that locally the tests for multipart upload that I have added work when I run them against a real S3 instance however the TravisCI in the gihub repo is failing these tests.

I think this may be due to the fact that TravisCI is doing tests against Minio and I suppose that Minio doesn't have the same behavior as the real S3 in this regard?

@ennru
Copy link
Member

ennru commented Sep 17, 2021

I think this may be due to the fact that TravisCI is doing tests against Minio and I suppose that Minio doesn't have the same behavior as the real S3 in this regard?

That sounds likely. Did you try to use MinIO locally?

@mdedetrich mdedetrich force-pushed the add-listuploadmultipart branch 2 times, most recently from 78366e0 to df122c6 Compare September 17, 2021 13:49
@mdedetrich
Copy link
Contributor Author

mdedetrich commented Sep 17, 2021

That sounds likely. Did you try to use MinIO locally?

That is next on the list, I have just finished updating the initial test (good news, I can validate that aborting multipart upload and the result of listing current aborted multipart uploads is behaving 100% as I expect for actual S3) so I am going to look into why Minio isn't working.

Apart from this I need to implement the transparent resuming capability and it should be good to go.

@mdedetrich mdedetrich force-pushed the add-listuploadmultipart branch 3 times, most recently from 9bf205c to 05e0026 Compare September 17, 2021 14:53
@mdedetrich
Copy link
Contributor Author

@ennru I just ran minio tests locally and can confirm they also fail in the exact same way Travis CI does, so it appears that minio doesn't have the same behavior for list aborted multipart uploads.

Will have a look if there is an upstream issue on this, also need to figure out how to disable this test only when MinioS3IntegrationSpec is run.

@mdedetrich mdedetrich force-pushed the add-listuploadmultipart branch from 05e0026 to 1866209 Compare September 18, 2021 13:09
@mdedetrich
Copy link
Contributor Author

mdedetrich commented Sep 18, 2021

So I just checked upstream on minio about this and it seems this change is intentional however I don't agree with the reasoning (see minio/minio#5613 (comment)). I have created a new issue at minio/minio#13246 but for now I have disabled this test for minio specifically

@mdedetrich
Copy link
Contributor Author

mdedetrich commented Sep 20, 2021

Okay so an update coming from minio/minio#13246 (comment), basically its not intended to use the state on S3 server when completing previously aborted multipart uploads, instead the state is designed to be maintained locally on the client (which is how typical S3 clients including alpakka works).

I didn't actually get a completely clear answer about what actual issues are with using the list multipart uploads route, from what I gathered there are concurrency issues (which I assume occur when you are doing concurrent uploads to a single uploadId and/or doing concurrent list multipart upload requests while a multipart upload is still progressing). For my personal use case neither of these situations occur but nevertheless it is documented on https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html

An in-progress multipart upload is an upload that you have initiated, but have not yet completed or stopped. Each request returns at most 1,000 multipart uploads. If there are more than 1,000 multipart uploads in progress, you need to send additional requests to retrieve the remaining multipart uploads. Only use the returned listing for verification. You should not use the result of this listing when sending a complete multipart upload request. Instead, maintain your own list of the part numbers you specified when uploading parts and the corresponding ETag values that Amazon S3 returns

That we shouldn't be using this method of resuming a multipart upload.

Given this, creating a public S3.resumableUpload as originally planned isn't really appropriate because S3 themselves advertise that it shouldn't be used that way (even though from my testing with the limitations I said previously its completely reliable). Going forward I believe the most appropriate path is that instead of implementing S3.resumableUpload I will make

private def completeMultipartUpload(s3Location: S3Location,
parts: Seq[SuccessfulUploadPart],
sse: Option[ServerSideEncryption])(
implicit mat: Materializer,
attr: Attributes
): Future[CompleteMultipartUploadResult] = {
public with a nice API and adjust
/**
* Uploads a S3 Object by making multiple requests
*
* @param bucket the s3 bucket name
* @param key the s3 object key
* @param contentType an optional [[akka.http.scaladsl.model.ContentType ContentType]]
* @param metaHeaders any meta-headers you want to add
* @param cannedAcl a [[CannedAcl]], defaults to [[CannedAcl.Private]]
* @param chunkSize the size of the requests sent to S3, minimum [[MinChunkSize]]
* @param chunkingParallelism the number of parallel requests used for the upload, defaults to 4
* @return a [[akka.stream.scaladsl.Sink Sink]] that accepts [[ByteString]]'s and materializes to a [[scala.concurrent.Future Future]] of [[MultipartUploadResult]]
*/
def multipartUpload(
bucket: String,
key: String,
contentType: ContentType = ContentTypes.`application/octet-stream`,
metaHeaders: MetaHeaders = MetaHeaders(Map()),
cannedAcl: CannedAcl = CannedAcl.Private,
chunkSize: Int = MinChunkSize,
chunkingParallelism: Int = 4,
sse: Option[ServerSideEncryption] = None
): Sink[ByteString, Future[MultipartUploadResult]] = {
val headers =
S3Headers.empty.withCannedAcl(cannedAcl).withMetaHeaders(metaHeaders).withOptionalServerSideEncryption(sse)
multipartUploadWithHeaders(bucket, key, contentType, chunkSize, chunkingParallelism, headers)
}
so that it can accept an initial sequence of partNumber/etag along with an uploadId that will allow you to resume an already existing upload that has been somehow paused.

This means that its up to the S3 user how they handle uploads, we give them the tools and can even document that its not recommend to use parts retrieved from the list multipart uploads API

@ennru Do you agree with this?

@ennru
Copy link
Member

ennru commented Sep 20, 2021

Thank you for checking the underlying recommendations. Sounds good to me.

Instead of making akka.stream.alpakka.s3.impl.S3Stream.completeMultipartUpload part of the public API directly, you should offer a delegating method in the javadsl/scaladsl.S3 classes to support this use case (but I assume that is what you meant).

@mdedetrich
Copy link
Contributor Author

(but I assume that is what you meant)

Indeed that is what I meant, thanks for confirming I will proceed with the changes.

@mdedetrich mdedetrich force-pushed the add-listuploadmultipart branch 3 times, most recently from 88e4095 to e7cccd7 Compare September 22, 2021 14:19
@mdedetrich mdedetrich marked this pull request as ready for review September 22, 2021 14:20
@mdedetrich
Copy link
Contributor Author

mdedetrich commented Sep 22, 2021

@ennru The PR is now ready, some additional notes

  • Originally I wanted to adjust the multipartUpload to allow resuming from a previously aborted multipart upload however this wasn't so nice from an API perspective so instead I created a new S3.resumeMultipartUpload method which I find more clear and nicer
  • I also wanted to make S3Stream.completeMultipartUpload public however I realize that I didn't actually need this and so in the interests of not dragging out the PR even longer I decided to leave it for now. If I do end up needing it I will just create a future PR
  • For the Scala API we are using immutable.Iterable to pass in the Part's where as for the Java API we just use the Java Iterable. immutable.Iterable is used in the Scala API because thats what Source.apply requires where as Java I just used what was the closest comparable collection (java.lang.Iterable).
  • The createSlowSource function that is used when created a throttled source exists creates a pacing of 10 seconds per element. This may seem slow but each part is a 5 meg file so we need enough time to upload those 5 megs to S3 for a internet connection that is reasonable.

I added a test which covers the whole scenario, i.e. created a multipart upload, aborting it with an exception (via a killswitch), using S3.listMultipartUpload/S3.listParts to get the previously uploaded parts and then feeding that into the new S3.resumeMultipartUpload to complete the upload. The test finally compares the contents of the downloaded file to make sure its exactly the same.

I have also disabled the tests when they are run via MinioS3IntegrationSpec since Minio doesn't support S3.listMultipartUpload/S3.listParts (and they are not willing to support it in the future). Due to Alpakka CI not running the tests against an actual S3 test account this means you need to run the tests locally against your own S3 account. I am willing to spend the time to integrate S3 tests into Alpakka CI if you are open to do so but Lightbend would need an S3 account to do this and then put the credentials into Travis (or I guess Github Actions if you are planning to migrate at some point).

Let me know if anything else is needed!

@mdedetrich mdedetrich force-pushed the add-listuploadmultipart branch from e7cccd7 to e5acfa3 Compare September 23, 2021 13:50
@mdedetrich
Copy link
Contributor Author

mdedetrich commented Sep 23, 2021

I actually ended up adding the S3.completeMultipartUpload (as an additional commit) since it was really trivial and I realized that I require it

@mdedetrich mdedetrich force-pushed the add-listuploadmultipart branch from e5acfa3 to 88f3fdf Compare September 29, 2021 09:05
Copy link
Member

@ennru ennru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quite a chunk of great work @mdedetrich!
LGTM.


case object Starting extends ListBucketState
final case class Starting[T]() extends S3PaginationState[T]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not fight for it. It is internal API anyway.

@@ -114,6 +115,244 @@ object MultipartUploadResult {
)
}

final class AWSIdentity private (val id: String, val displayName: String) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Works for me.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants