Skip to content

Commit

Permalink
Add AbortMultipartUpload API to S3 API
Browse files Browse the repository at this point in the history
  • Loading branch information
mdedetrich committed Sep 18, 2021
1 parent ca25522 commit 1866209
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 1 deletion.
17 changes: 17 additions & 0 deletions s3/src/main/scala/akka/stream/alpakka/s3/impl/HttpRequests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,23 @@ import scala.concurrent.{ExecutionContext, Future}
s3Request(s3Location = s3Location, method = method)
.withDefaultHeaders(headers)

def uploadManagementRequest(
s3Location: S3Location,
uploadId: String,
method: HttpMethod,
headers: Seq[HttpHeader] = Seq.empty[HttpHeader]
)(implicit conf: S3Settings): HttpRequest =
s3Request(s3Location,
method,
_.withQuery(
Query(
Map(
"uploadId" -> uploadId
)
)
))
.withDefaultHeaders(headers)

def uploadRequest(s3Location: S3Location,
payload: Source[ByteString, _],
contentLength: Long,
Expand Down
17 changes: 17 additions & 0 deletions s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,23 @@ import scala.util.{Failure, Success, Try}
attr: Attributes): Future[BucketAccess] =
checkIfBucketExistsSource(bucket, headers).withAttributes(attr).runWith(Sink.head)

private def uploadManagementRequest(bucket: String, key: String, uploadId: String)(method: HttpMethod,
conf: S3Settings): HttpRequest =
HttpRequests.uploadManagementRequest(S3Location(bucket, key), uploadId, method)(conf)

def deleteUploadSource(bucket: String, key: String, uploadId: String, headers: S3Headers): Source[Done, NotUsed] =
s3ManagementRequest[Done](
bucket = bucket,
method = HttpMethods.DELETE,
httpRequest = uploadManagementRequest(bucket, key, uploadId),
headers.headersFor(DeleteBucket),
process = processS3LifecycleResponse
)

def deleteUpload(bucket: String, key: String, uploadId: String, headers: S3Headers)(implicit mat: Materializer,
attr: Attributes): Future[Done] =
deleteUploadSource(bucket, key, uploadId, headers).withAttributes(attr).runWith(Sink.ignore)

private def s3ManagementRequest[T](
bucket: String,
method: HttpMethod,
Expand Down
64 changes: 64 additions & 0 deletions s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1083,6 +1083,70 @@ object S3 {
def checkIfBucketExistsSource(bucketName: String, s3Headers: S3Headers): Source[BucketAccess, NotUsed] =
S3Stream.checkIfBucketExistsSource(bucketName, s3Headers).asJava

/**
* Delete all existing parts for a specific upload id
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html
* @param bucketName Which bucket the upload is inside
* @param key The key for the upload
* @param uploadId Unique identifier of the upload
* @return [[java.util.concurrent.CompletionStage CompletionStage]] of type [[Done]] as API doesn't return any additional information
*/
def deleteUpload(bucketName: String, key: String, uploadId: String)(
implicit system: ClassicActorSystemProvider,
attributes: Attributes = Attributes()
): CompletionStage[Done] =
deleteUpload(bucketName, key, uploadId, S3Headers.empty)

/**
* Delete all existing parts for a specific upload
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html
* @param bucketName Which bucket the upload is inside
* @param key The key for the upload
* @param uploadId Unique identifier of the upload
* @param s3Headers any headers you want to add
* @return [[java.util.concurrent.CompletionStage CompletionStage]] of type [[Done]] as API doesn't return any additional information
*/
def deleteUpload(
bucketName: String,
key: String,
uploadId: String,
s3Headers: S3Headers
)(implicit system: ClassicActorSystemProvider, attributes: Attributes): CompletionStage[Done] =
S3Stream
.deleteUpload(bucketName, key, uploadId, s3Headers)(SystemMaterializer(system).materializer, attributes)
.toJava

/**
* Delete all existing parts for a specific upload
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html
* @param bucketName Which bucket the upload is inside
* @param key The key for the upload
* @param uploadId Unique identifier of the upload
* @return [[akka.stream.javadsl.Source Source]] of type [[Done]] as API doesn't return any additional information
*/
def deleteUploadSource(bucketName: String, key: String, uploadId: String): Source[Done, NotUsed] =
deleteUploadSource(bucketName, key, uploadId, S3Headers.empty)

/**
* Delete all existing parts for a specific upload
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html
*
* @param bucketName Which bucket the upload is inside
* @param key The key for the upload
* @param uploadId Unique identifier of the upload
* @param s3Headers any headers you want to add
* @return [[akka.stream.javadsl.Source Source]] of type [[Done]] as API doesn't return any additional information
*/
def deleteUploadSource(bucketName: String,
key: String,
uploadId: String,
s3Headers: S3Headers): Source[Done, NotUsed] =
S3Stream.deleteUploadSource(bucketName, key, uploadId, s3Headers).asJava

private def func[T, R](f: T => R) = new akka.japi.function.Function[T, R] {
override def apply(param: T): R = f(param)
}
Expand Down
63 changes: 63 additions & 0 deletions s3/src/main/scala/akka/stream/alpakka/s3/scaladsl/S3.scala
Original file line number Diff line number Diff line change
Expand Up @@ -571,4 +571,67 @@ object S3 {
*/
def checkIfBucketExistsSource(bucketName: String, s3Headers: S3Headers): Source[BucketAccess, NotUsed] =
S3Stream.checkIfBucketExistsSource(bucketName, s3Headers)

/**
* Delete all existing parts for a specific upload id
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html
*
* @param bucketName Which bucket the upload is inside
* @param key The key for the upload
* @param uploadId Unique identifier of the upload
* @return [[scala.concurrent.Future Future]] of type [[Done]] as API doesn't return any additional information
*/
def deleteUpload(bucketName: String, key: String, uploadId: String)(
implicit system: ClassicActorSystemProvider,
attributes: Attributes = Attributes()
): Future[Done] =
deleteUpload(bucketName, key, uploadId, S3Headers.empty)

/**
* Delete all existing parts for a specific upload
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html
* @param bucketName Which bucket the upload is inside
* @param key The key for the upload
* @param uploadId Unique identifier of the upload
* @param s3Headers any headers you want to add
* @return [[scala.concurrent.Future Future]] of type [[Done]] as API doesn't return any additional information
*/
def deleteUpload(
bucketName: String,
key: String,
uploadId: String,
s3Headers: S3Headers
)(implicit system: ClassicActorSystemProvider, attributes: Attributes): Future[Done] =
S3Stream.deleteUpload(bucketName, key, uploadId, s3Headers)

/**
* Delete all existing parts for a specific upload
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html
* @param bucketName Which bucket the upload is inside
* @param key The key for the upload
* @param uploadId Unique identifier of the upload
* @return [[akka.stream.scaladsl.Source Source]] of type [[Done]] as API doesn't return any additional information
*/
def deleteUploadSource(bucketName: String, key: String, uploadId: String): Source[Done, NotUsed] =
deleteUploadSource(bucketName, key, uploadId, S3Headers.empty)

/**
* Delete all existing parts for a specific upload
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html
*
* @param bucketName Which bucket the upload is inside
* @param key The key for the upload
* @param uploadId Unique identifier of the upload
* @param s3Headers any headers you want to add
* @return [[akka.stream.scaladsl.Source Source]] of type [[Done]] as API doesn't return any additional information
*/
def deleteUploadSource(bucketName: String,
key: String,
uploadId: String,
s3Headers: S3Headers): Source[Done, NotUsed] =
S3Stream.deleteUploadSource(bucketName, key, uploadId, s3Headers)
}
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ trait S3IntegrationSpec
slowSource.toMat(S3.multipartUpload(defaultBucket, sourceKey).withAttributes(attributes))(Keep.right).run

val results = for {
_ <- akka.pattern.after(20.seconds)(Future {
_ <- akka.pattern.after(25.seconds)(Future {
sharedKillSwitch.abort(AbortException)
})
_ <- multiPartUpload.recover {
Expand All @@ -473,6 +473,10 @@ trait S3IntegrationSpec
parts <- Future.sequence(uploadIds.map { uploadId =>
S3.listParts(defaultBucket, sourceKey, uploadId).runWith(Sink.seq)
})
// Cleanup the uploads after
_ <- Future.sequence(uploadIds.map { uploadId =>
S3.deleteUpload(defaultBucket, sourceKey, uploadId)
})
} yield (uploadIds, incomplete, parts.flatten)

whenReady(results) {
Expand Down

0 comments on commit 1866209

Please sign in to comment.