Add resumeMultipartUpload to S3 API
mdedetrich committed Sep 22, 2021
1 parent 3ba53a1 commit e7cccd7
Showing 5 changed files with 280 additions and 6 deletions.
39 changes: 33 additions & 6 deletions s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala
@@ -670,6 +670,22 @@ import scala.util.{Failure, Success, Try}
chunkAndRequest(s3Location, contentType, s3Headers, chunkSize)(chunkingParallelism)
.toMat(completionSink(s3Location, s3Headers.serverSideEncryption))(Keep.right)

def resumeMultipartUpload(s3Location: S3Location,
uploadId: String,
previousParts: immutable.Iterable[Part],
contentType: ContentType = ContentTypes.`application/octet-stream`,
s3Headers: S3Headers,
chunkSize: Int = MinChunkSize,
chunkingParallelism: Int = 4): Sink[ByteString, Future[MultipartUploadResult]] = {
val initialUpload = Some((uploadId, previousParts.size + 1))
val successfulParts = previousParts.map { part =>
SuccessfulUploadPart(MultipartUpload(s3Location, uploadId), part.partNumber, part.eTag)
}
chunkAndRequest(s3Location, contentType, s3Headers, chunkSize, initialUpload)(chunkingParallelism)
.prepend(Source(successfulParts))
.toMat(completionSink(s3Location, s3Headers.serverSideEncryption))(Keep.right)
}
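
For illustration, a minimal sketch of how the resumed upload continues the part numbering (all values below are hypothetical and not part of this change): if three parts were uploaded before the abort, the already-uploaded parts are prepended to the stream and the next chunk written to the sink becomes part 4.

// Hypothetical values, only to illustrate the numbering
val uploadId = "example-upload-id"
val previousParts = List(Part("etag-1", 1), Part("etag-2", 2), Part("etag-3", 3))
val initialUpload = Some((uploadId, previousParts.size + 1)) // Some(("example-upload-id", 4))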

private def initiateMultipartUpload(s3Location: S3Location,
contentType: ContentType,
s3Headers: Seq[HttpHeader]): Source[MultipartUpload, NotUsed] =
@@ -821,7 +837,8 @@ import scala.util.{Failure, Success, Try}
s3Location: S3Location,
contentType: ContentType,
s3Headers: S3Headers,
chunkSize: Int
chunkSize: Int,
initialUploadState: Option[(String, Int)] = None
)(parallelism: Int): Flow[ByteString, UploadPartResponse, NotUsed] = {

def getChunkBuffer(chunkSize: Int, bufferSize: Int, maxRetriesPerChunk: Int)(implicit settings: S3Settings) =
@@ -846,10 +863,20 @@ import scala.util.{Failure, Success, Try}

val chunkBufferSize = chunkSize * 2

// First step of the multi part upload process is made.
// The response is then used to construct the subsequent individual upload part requests
val requestInfo: Source[(MultipartUpload, Int), NotUsed] =
initiateUpload(s3Location, contentType, s3Headers.headersFor(InitiateMultipartUpload))
val requestInfoOrInitialUploadState = initialUploadState match {
case Some((uploadId, initialIndex)) =>
// We are resuming from a previously aborted multipart upload, so rather than creating a new MultipartUpload
// resource we just need to set up the initial state
Source
.single(s3Location)
.flatMapConcat(_ => Source.single(MultipartUpload(s3Location, uploadId)))
.mapConcat(r => Stream.continually(r))
.zip(Source.fromIterator(() => Iterator.from(initialIndex)))
case None =>
// The first step of the multipart upload process is performed.
// The response is then used to construct the subsequent individual upload part requests
initiateUpload(s3Location, contentType, s3Headers.headersFor(InitiateMultipartUpload))
}

val headers = s3Headers.serverSideEncryption.toIndexedSeq.flatMap(_.headersFor(UploadPart))

@@ -891,7 +918,7 @@ import scala.util.{Failure, Success, Try}
.mergeSubstreamsWithParallelism(parallelism)
.filter(_.size > 0)
.via(atLeastOne)
.zip(requestInfo)
.zip(requestInfoOrInitialUploadState)
.groupBy(parallelism, { case (_, (_, chunkIndex)) => chunkIndex % parallelism })
// Allow requests that fail with transient errors to be retried, using the already buffered chunk.
.via(RetryFlow.withBackoff(minBackoff, maxBackoff, randomFactor, maxRetries, retriableFlow) {
66 changes: 66 additions & 0 deletions s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3.scala
@@ -641,6 +641,72 @@ object S3 {
def multipartUpload(bucket: String, key: String): Sink[ByteString, CompletionStage[MultipartUploadResult]] =
multipartUpload(bucket, key, ContentTypes.APPLICATION_OCTET_STREAM)

/**
* Resumes from a previously aborted multipart upload by providing the uploadId and previous upload part identifiers
*
* @param bucket the s3 bucket name
* @param key the s3 object key
* @param uploadId the upload that you want to resume
* @param previousParts the previously uploaded parts; the resumed upload will continue from the part immediately after them
* @param contentType an optional [[akka.http.javadsl.model.ContentType ContentType]]
* @param s3Headers any headers you want to add
* @return a [[akka.stream.javadsl.Sink Sink]] that accepts [[akka.util.ByteString ByteString]]'s and materializes to a [[java.util.concurrent.CompletionStage CompletionStage]] of [[MultipartUploadResult]]
*/
def resumeMultipartUpload(bucket: String,
key: String,
uploadId: String,
previousParts: java.lang.Iterable[Part],
contentType: ContentType,
s3Headers: S3Headers): Sink[ByteString, CompletionStage[MultipartUploadResult]] = {
S3Stream
.resumeMultipartUpload(S3Location(bucket, key),
uploadId,
previousParts.asScala.toList,
contentType.asInstanceOf[ScalaContentType],
s3Headers)
.mapMaterializedValue(_.toJava)
.asJava
}

/**
* Resumes from a previously aborted multipart upload by providing the uploadId and previous upload part identifiers
*
* @param bucket the s3 bucket name
* @param key the s3 object key
* @param uploadId the upload that you want to resume
* @param previousParts the previously uploaded parts; the resumed upload will continue from the part immediately after them
* @param contentType an optional [[akka.http.javadsl.model.ContentType ContentType]]
* @return a [[akka.stream.javadsl.Sink Sink]] that accepts [[akka.util.ByteString ByteString]]'s and materializes to a [[java.util.concurrent.CompletionStage CompletionStage]] of [[MultipartUploadResult]]
*/
def resumeMultipartUpload(bucket: String,
key: String,
uploadId: String,
previousParts: java.lang.Iterable[Part],
contentType: ContentType): Sink[ByteString, CompletionStage[MultipartUploadResult]] =
resumeMultipartUpload(bucket,
key,
uploadId,
previousParts,
contentType,
S3Headers.empty.withCannedAcl(CannedAcl.Private))

/**
* Resumes from a previously aborted multipart upload by providing the uploadId and previous upload part identifiers
*
* @param bucket the s3 bucket name
* @param key the s3 object key
* @param uploadId the upload that you want to resume
* @param previousParts the previously uploaded parts; the resumed upload will continue from the part immediately after them
* @return a [[akka.stream.javadsl.Sink Sink]] that accepts [[akka.util.ByteString ByteString]]'s and materializes to a [[java.util.concurrent.CompletionStage CompletionStage]] of [[MultipartUploadResult]]
*/
def resumeMultipartUpload(
bucket: String,
key: String,
uploadId: String,
previousParts: java.lang.Iterable[Part]
): Sink[ByteString, CompletionStage[MultipartUploadResult]] =
resumeMultipartUpload(bucket, key, uploadId, previousParts, ContentTypes.APPLICATION_OCTET_STREAM)
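
For reference, a minimal sketch of wiring the Java API overload above (written in Scala like the rest of this diff, and assuming Scala 2.13 for scala.jdk.CollectionConverters); the bucket, key, uploadId and part values are placeholders, and in practice the previous parts would come from a listParts call:

import java.util.concurrent.CompletionStage
import akka.stream.alpakka.s3.javadsl.S3
import akka.stream.alpakka.s3.{MultipartUploadResult, Part}
import akka.stream.javadsl.Sink
import akka.util.ByteString
import scala.jdk.CollectionConverters._

// Placeholder parts; normally obtained from listParts and converted with toPart
val previousParts: java.lang.Iterable[Part] =
  List(Part.create("etag-1", 1), Part.create("etag-2", 2)).asJava

val resumeSink: Sink[ByteString, CompletionStage[MultipartUploadResult]] =
  S3.resumeMultipartUpload("my-bucket", "my-key", "example-upload-id", previousParts)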

/**
* Copy a S3 Object by making multiple requests.
*
43 changes: 43 additions & 0 deletions s3/src/main/scala/akka/stream/alpakka/s3/model.scala
@@ -343,6 +343,8 @@ final class ListPartsResultParts(val lastModified: Instant, val eTag: String, val partNumber: Int, val size: Long) {

override def hashCode(): Int =
Objects.hash(lastModified, eTag, Int.box(partNumber), Long.box(size))

def toPart: Part = Part(eTag, partNumber)
}

object ListPartsResultParts {
@@ -356,6 +358,47 @@ object ListPartsResultParts {
apply(lastModified, eTag, partNumber, size)
}

final class Part(val eTag: String, val partNumber: Int) {

/** Java API */
def getETag: String = eTag

/** Java API */
def getPartNumber: Int = partNumber

def withETag(value: String): Part = copy(eTag = value)

def withPartNumber(value: Int): Part = copy(partNumber = value)

private def copy(eTag: String = eTag, partNumber: Int = partNumber): Part = new Part(eTag, partNumber)

override def toString: String =
"Part(" +
s"eTag=$eTag," +
s"partNumber=$partNumber" +
")"

override def equals(other: Any): Boolean =
other match {
case that: Part =>
Objects.equals(this.eTag, that.eTag) &&
Objects.equals(this.partNumber, that.partNumber)
case _ => false
}

override def hashCode(): Int =
Objects.hash(this.eTag, Int.box(this.partNumber))

}

object Part {

/** Scala API */
def apply(eTag: String, partNumber: Int): Part = new Part(eTag, partNumber)

/** Java API */
def create(eTag: String, partNumber: Int): Part = new Part(eTag, partNumber)
}
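
A brief sketch of the two construction styles (hypothetical values; in practice Part instances usually come from ListPartsResultParts.toPart rather than being built by hand):

val part: Part = Part("etag-1", 1)             // Scala API
val samePart: Part = Part.create("etag-1", 1)  // Java API
val renumbered: Part = part.withPartNumber(2)  // copy with partNumber = 2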

/**
* Thrown when multipart upload or multipart copy fails because of a server failure.
*/
73 changes: 73 additions & 0 deletions s3/src/main/scala/akka/stream/alpakka/s3/scaladsl/S3.scala
@@ -14,6 +14,7 @@ import akka.stream.scaladsl.{RunnableGraph, Sink, Source}
import akka.util.ByteString
import akka.{Done, NotUsed}

import scala.collection.immutable
import scala.concurrent.Future

/**
@@ -388,6 +389,78 @@ object S3 {
chunkingParallelism
)

/**
* Resumes from a previously aborted multipart upload by providing the uploadId and previous upload part identifiers
*
* @param bucket the s3 bucket name
* @param key the s3 object key
* @param uploadId the upload that you want to resume
* @param previousParts the previously uploaded parts; the resumed upload will continue from the part immediately after them
* @param contentType an optional [[akka.http.scaladsl.model.ContentType ContentType]]
* @param metaHeaders any meta-headers you want to add
* @param cannedAcl a [[CannedAcl]], defaults to [[CannedAcl.Private]]
* @param chunkSize the size of the requests sent to S3, minimum [[MinChunkSize]]
* @param chunkingParallelism the number of parallel requests used for the upload, defaults to 4
* @return a [[akka.stream.scaladsl.Sink Sink]] that accepts [[ByteString]]'s and materializes to a [[scala.concurrent.Future Future]] of [[MultipartUploadResult]]
*/
def resumeMultipartUpload(
bucket: String,
key: String,
uploadId: String,
previousParts: immutable.Iterable[Part],
contentType: ContentType = ContentTypes.`application/octet-stream`,
metaHeaders: MetaHeaders = MetaHeaders(Map()),
cannedAcl: CannedAcl = CannedAcl.Private,
chunkSize: Int = MinChunkSize,
chunkingParallelism: Int = 4,
sse: Option[ServerSideEncryption] = None
): Sink[ByteString, Future[MultipartUploadResult]] = {
val headers =
S3Headers.empty.withCannedAcl(cannedAcl).withMetaHeaders(metaHeaders).withOptionalServerSideEncryption(sse)
resumeMultipartUploadWithHeaders(bucket,
key,
uploadId,
previousParts,
contentType,
chunkSize,
chunkingParallelism,
headers)
}
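
To show how this fits together, a minimal sketch of resuming an aborted upload, mirroring the integration test further down in this commit; it assumes an implicit ActorSystem is in scope (Akka 2.6 materializes streams from it) and uses placeholder bucket/key names and a placeholder remainingBytes source:

import akka.actor.ActorSystem
import akka.stream.alpakka.s3.MultipartUploadResult
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem()
import system.dispatcher

// Placeholders for this sketch
val bucket = "my-bucket"
val key = "my-key"
val remainingBytes: Source[ByteString, Any] = Source.single(ByteString("rest of the payload"))

val resumed: Future[MultipartUploadResult] =
  for {
    // Find the uploadId of the incomplete multipart upload for this key
    incomplete <- S3.listMultipartUpload(bucket, None).runWith(Sink.seq)
    uploadId = incomplete.collectFirst { case u if u.key == key => u.uploadId }.get
    // Collect the parts that were uploaded before the abort
    parts <- S3.listParts(bucket, key, uploadId).runWith(Sink.seq)
    // Stream the remaining data into the resumed upload
    result <- remainingBytes.runWith(
      S3.resumeMultipartUpload(bucket, key, uploadId, parts.map(_.toPart))
    )
  } yield result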

/**
* Resumes from a previously aborted multipart upload by providing the uploadId and previous upload part identifiers
*
* @param bucket the s3 bucket name
* @param key the s3 object key
* @param uploadId the upload that you want to resume
* @param previousParts the previously uploaded parts; the resumed upload will continue from the part immediately after them
* @param contentType an optional [[akka.http.scaladsl.model.ContentType ContentType]]
* @param chunkSize the size of the requests sent to S3, minimum [[MinChunkSize]]
* @param chunkingParallelism the number of parallel requests used for the upload, defaults to 4
* @param s3Headers any headers you want to add
* @return a [[akka.stream.scaladsl.Sink Sink]] that accepts [[akka.util.ByteString ByteString]]'s and materializes to a [[scala.concurrent.Future Future]] of [[MultipartUploadResult]]
*/
def resumeMultipartUploadWithHeaders(
bucket: String,
key: String,
uploadId: String,
previousParts: immutable.Iterable[Part],
contentType: ContentType = ContentTypes.`application/octet-stream`,
chunkSize: Int = MinChunkSize,
chunkingParallelism: Int = 4,
s3Headers: S3Headers = S3Headers.empty
): Sink[ByteString, Future[MultipartUploadResult]] =
S3Stream
.resumeMultipartUpload(
S3Location(bucket, key),
uploadId,
previousParts,
contentType,
s3Headers,
chunkSize,
chunkingParallelism
)
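
Similarly, a short sketch of the headers variant, supplying explicit S3Headers (placeholder values again; the import paths for the header model classes are assumed):

import akka.stream.alpakka.s3.headers.CannedAcl
import akka.stream.alpakka.s3.{Part, S3Headers}

val headers = S3Headers.empty.withCannedAcl(CannedAcl.Private)

val sinkWithHeaders =
  S3.resumeMultipartUploadWithHeaders(
    "my-bucket",
    "my-key",
    "example-upload-id",
    List(Part("etag-1", 1)),
    s3Headers = headers
  )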

/**
* Copy an S3 object from source bucket to target bucket using multi part copy upload.
*
@@ -494,6 +494,71 @@ trait S3IntegrationSpec
}
}

it should "upload 1 file slowly, cancel it and then resume it to complete the upload" in {
// This test doesn't work on Minio since Minio doesn't properly implement this API, see
// https://github.com/minio/minio/issues/13246
assume(this.isInstanceOf[AWSS3IntegrationSpec])
val sourceKey = "original/file-slow-2.txt"
val sharedKillSwitch = KillSwitches.shared("abort-multipart-upload-2")

val inputData = createStringCollectionWithMinChunkSize(6)

val slowSource = createSlowSource(inputData, Some(sharedKillSwitch))

val multiPartUpload =
slowSource
.toMat(S3.multipartUpload(defaultBucket, sourceKey).withAttributes(attributes))(
Keep.right
)
.run

val results = for {
_ <- akka.pattern.after(25.seconds)(Future {
sharedKillSwitch.abort(AbortException)
})
_ <- multiPartUpload.recover {
case AbortException => ()
}
incomplete <- S3.listMultipartUpload(defaultBucket, None).withAttributes(attributes).runWith(Sink.seq)

uploadId = incomplete.collectFirst {
case uploadPart if uploadPart.key == sourceKey => uploadPart.uploadId
}.get

parts <- S3.listParts(defaultBucket, sourceKey, uploadId).runWith(Sink.seq)

remainingData = inputData.slice(3, 6)
_ <- Source(remainingData)
.toMat(
S3.resumeMultipartUpload(defaultBucket, sourceKey, uploadId, parts.map(_.toPart))
.withAttributes(attributes)
)(
Keep.right
)
.run

// This delay is here because sometimes there is a lag between completing a large multipart upload
// and the object actually being downloadable
downloaded <- akka.pattern.after(5.seconds)(
S3.download(defaultBucket, sourceKey).withAttributes(attributes).runWith(Sink.head).flatMap {
case Some((downloadSource, _)) =>
downloadSource
.runWith(Sink.seq)
case None => throw new Exception(s"Expected object in bucket $defaultBucket with key $sourceKey")
}
)

_ <- S3.deleteObject(defaultBucket, sourceKey).withAttributes(attributes).runWith(Sink.head)
} yield downloaded

whenReady(results) { downloads =>
val fullDownloadedFile = downloads.fold(ByteString.empty)(_ ++ _)
val fullInputData = inputData.fold(ByteString.empty)(_ ++ _)

fullInputData.utf8String shouldEqual fullDownloadedFile.utf8String
}
}

it should "make a bucket with given name" in {
implicit val attr: Attributes = attributes
val bucketName = "samplebucket1"