diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala b/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala
index ae1f3291ad..1cb25f904e 100644
--- a/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala
+++ b/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala
@@ -670,6 +670,22 @@ import scala.util.{Failure, Success, Try}
     chunkAndRequest(s3Location, contentType, s3Headers, chunkSize)(chunkingParallelism)
       .toMat(completionSink(s3Location, s3Headers.serverSideEncryption))(Keep.right)
 
+  def resumeMultipartUpload(s3Location: S3Location,
+                            uploadId: String,
+                            previousParts: immutable.Iterable[Part],
+                            contentType: ContentType = ContentTypes.`application/octet-stream`,
+                            s3Headers: S3Headers,
+                            chunkSize: Int = MinChunkSize,
+                            chunkingParallelism: Int = 4): Sink[ByteString, Future[MultipartUploadResult]] = {
+    val initialUpload = Some((uploadId, previousParts.size + 1))
+    val successfulParts = previousParts.map { part =>
+      SuccessfulUploadPart(MultipartUpload(s3Location, uploadId), part.partNumber, part.eTag)
+    }
+    chunkAndRequest(s3Location, contentType, s3Headers, chunkSize, initialUpload)(chunkingParallelism)
+      .prepend(Source(successfulParts))
+      .toMat(completionSink(s3Location, s3Headers.serverSideEncryption))(Keep.right)
+  }
+
   private def initiateMultipartUpload(s3Location: S3Location,
                                       contentType: ContentType,
                                       s3Headers: Seq[HttpHeader]): Source[MultipartUpload, NotUsed] =
@@ -821,7 +837,8 @@ import scala.util.{Failure, Success, Try}
       s3Location: S3Location,
       contentType: ContentType,
       s3Headers: S3Headers,
-      chunkSize: Int
+      chunkSize: Int,
+      initialUploadState: Option[(String, Int)] = None
   )(parallelism: Int): Flow[ByteString, UploadPartResponse, NotUsed] = {
 
     def getChunkBuffer(chunkSize: Int, bufferSize: Int, maxRetriesPerChunk: Int)(implicit settings: S3Settings) =
@@ -846,10 +863,20 @@ import scala.util.{Failure, Success, Try}
 
     val chunkBufferSize = chunkSize * 2
 
-    // First step of the multi part upload process is made.
-    // The response is then used to construct the subsequent individual upload part requests
-    val requestInfo: Source[(MultipartUpload, Int), NotUsed] =
-      initiateUpload(s3Location, contentType, s3Headers.headersFor(InitiateMultipartUpload))
+    val requestInfoOrInitialUploadState = initialUploadState match {
+      case Some((uploadId, initialIndex)) =>
+        // We are resuming from a previously aborted Multipart upload so rather than creating a new MultipartUpload
+        // resource we just need to set up the initial state
+        Source
+          .single(s3Location)
+          .flatMapConcat(_ => Source.single(MultipartUpload(s3Location, uploadId)))
+          .mapConcat(r => Stream.continually(r))
+          .zip(Source.fromIterator(() => Iterator.from(initialIndex)))
+      case None =>
+        // First step of the multi part upload process is made.
+        // The response is then used to construct the subsequent individual upload part requests
+        initiateUpload(s3Location, contentType, s3Headers.headersFor(InitiateMultipartUpload))
+    }
 
     val headers = s3Headers.serverSideEncryption.toIndexedSeq.flatMap(_.headersFor(UploadPart))
 
@@ -891,7 +918,7 @@ import scala.util.{Failure, Success, Try}
         .mergeSubstreamsWithParallelism(parallelism)
         .filter(_.size > 0)
         .via(atLeastOne)
-        .zip(requestInfo)
+        .zip(requestInfoOrInitialUploadState)
         .groupBy(parallelism, { case (_, (_, chunkIndex)) => chunkIndex % parallelism })
         // Allow requests that fail with transient errors to be retried, using the already buffered chunk.
         .via(RetryFlow.withBackoff(minBackoff, maxBackoff, randomFactor, maxRetries, retriableFlow) {
diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3.scala b/s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3.scala
index 9e9853b540..d033722da8 100644
--- a/s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3.scala
+++ b/s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3.scala
@@ -641,6 +641,72 @@ object S3 {
   def multipartUpload(bucket: String, key: String): Sink[ByteString, CompletionStage[MultipartUploadResult]] =
     multipartUpload(bucket, key, ContentTypes.APPLICATION_OCTET_STREAM)
 
+  /**
+   * Resumes from a previously aborted multipart upload by providing the uploadId and previous upload part identifiers
+   *
+   * @param bucket the s3 bucket name
+   * @param key the s3 object key
+   * @param uploadId the upload that you want to resume
+   * @param previousParts the previously uploaded parts, ending just before the point at which this upload will resume
+   * @param contentType the [[akka.http.javadsl.model.ContentType ContentType]] of the object
+   * @param s3Headers any headers you want to add
+   * @return a [[akka.stream.javadsl.Sink Sink]] that accepts [[akka.util.ByteString ByteString]]'s and materializes to a [[java.util.concurrent.CompletionStage CompletionStage]] of [[MultipartUploadResult]]
+   */
+  def resumeMultipartUpload(bucket: String,
+                            key: String,
+                            uploadId: String,
+                            previousParts: java.lang.Iterable[Part],
+                            contentType: ContentType,
+                            s3Headers: S3Headers): Sink[ByteString, CompletionStage[MultipartUploadResult]] = {
+    S3Stream
+      .resumeMultipartUpload(S3Location(bucket, key),
+                             uploadId,
+                             previousParts.asScala.toList,
+                             contentType.asInstanceOf[ScalaContentType],
+                             s3Headers)
+      .mapMaterializedValue(_.toJava)
+      .asJava
+  }
+
+  /**
+   * Resumes from a previously aborted multipart upload by providing the uploadId and previous upload part identifiers
+   *
+   * @param bucket the s3 bucket name
+   * @param key the s3 object key
+   * @param uploadId the upload that you want to resume
+   * @param previousParts the previously uploaded parts, ending just before the point at which this upload will resume
+   * @param contentType the [[akka.http.javadsl.model.ContentType ContentType]] of the object
+   * @return a [[akka.stream.javadsl.Sink Sink]] that accepts [[akka.util.ByteString ByteString]]'s and materializes to a [[java.util.concurrent.CompletionStage CompletionStage]] of [[MultipartUploadResult]]
+   */
+  def resumeMultipartUpload(bucket: String,
+                            key: String,
+                            uploadId: String,
+                            previousParts: java.lang.Iterable[Part],
+                            contentType: ContentType): Sink[ByteString, CompletionStage[MultipartUploadResult]] =
+    resumeMultipartUpload(bucket,
+                          key,
+                          uploadId,
+                          previousParts,
+                          contentType,
+                          S3Headers.empty.withCannedAcl(CannedAcl.Private))
+
+  /**
+   * Resumes from a previously aborted multipart upload by providing the uploadId and previous upload part identifiers
+   *
+   * @param bucket the s3 bucket name
+   * @param key the s3 object key
+   * @param uploadId the upload that you want to resume
+   * @param previousParts the previously uploaded parts, ending just before the point at which this upload will resume
+   * @return a [[akka.stream.javadsl.Sink Sink]] that accepts [[akka.util.ByteString ByteString]]'s and materializes to a [[java.util.concurrent.CompletionStage CompletionStage]] of [[MultipartUploadResult]]
+   */
+  def resumeMultipartUpload(
+      bucket: String,
+      key: String,
+      uploadId: String,
+      previousParts: java.lang.Iterable[Part]
+  ): Sink[ByteString, CompletionStage[MultipartUploadResult]] =
+    resumeMultipartUpload(bucket, key, uploadId, previousParts, ContentTypes.APPLICATION_OCTET_STREAM)
+
   /**
    * Copy a S3 Object by making multiple requests.
    *
diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/model.scala b/s3/src/main/scala/akka/stream/alpakka/s3/model.scala
index 5a78d2dcdf..e005351131 100644
--- a/s3/src/main/scala/akka/stream/alpakka/s3/model.scala
+++ b/s3/src/main/scala/akka/stream/alpakka/s3/model.scala
@@ -343,6 +343,8 @@ final class ListPartsResultParts(val lastModified: Instant, val eTag: String, va
 
   override def hashCode(): Int =
     Objects.hash(lastModified, eTag, Int.box(partNumber), Long.box(size))
+
+  def toPart: Part = Part(eTag, partNumber)
 }
 
 object ListPartsResultParts {
@@ -356,6 +358,48 @@ object ListPartsResultParts {
     apply(lastModified, eTag, partNumber, size)
 }
 
+final class Part(val eTag: String, val partNumber: Int) {
+
+  /** Java API */
+  def getETag: String = eTag
+
+  /** Java API */
+  def getPartNumber: Int = partNumber
+
+  def withETag(value: String): Part = copy(eTag = value)
+
+  def withPartNumber(value: Int): Part = copy(partNumber = value)
+
+  private def copy(eTag: String = eTag, partNumber: Int = partNumber): Part = new Part(eTag, partNumber)
+
+  override def toString: String =
+    "Part(" +
+    s"eTag=$eTag," +
+    s"partNumber=$partNumber" +
+    ")"
+
+  override def equals(other: Any): Boolean =
+    other match {
+      case that: Part =>
+        Objects.equals(this.eTag, that.eTag) &&
+        Objects.equals(this.partNumber, that.partNumber)
+      case _ => false
+    }
+
+  override def hashCode(): Int =
+    Objects.hash(this.eTag, Int.box(this.partNumber))
+
+}
+
+object Part {
+
+  /** Scala API */
+  def apply(eTag: String, partNumber: Int): Part = new Part(eTag, partNumber)
+
+  /** Java API */
+  def create(eTag: String, partNumber: Int): Part = new Part(eTag, partNumber)
+}
+
 /**
  * Thrown when multipart upload or multipart copy fails because of a server failure.
  */
diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/scaladsl/S3.scala b/s3/src/main/scala/akka/stream/alpakka/s3/scaladsl/S3.scala
index 5fd2c9d7b2..e74d878513 100644
--- a/s3/src/main/scala/akka/stream/alpakka/s3/scaladsl/S3.scala
+++ b/s3/src/main/scala/akka/stream/alpakka/s3/scaladsl/S3.scala
@@ -14,6 +14,7 @@ import akka.stream.scaladsl.{RunnableGraph, Sink, Source}
 import akka.util.ByteString
 import akka.{Done, NotUsed}
 
+import scala.collection.immutable
 import scala.concurrent.Future
 
 /**
@@ -388,6 +389,79 @@ object S3 {
       chunkingParallelism
     )
 
+  /**
+   * Resumes from a previously aborted multipart upload by providing the uploadId and previous upload part identifiers
+   *
+   * @param bucket the s3 bucket name
+   * @param key the s3 object key
+   * @param uploadId the upload that you want to resume
+   * @param previousParts the previously uploaded parts, ending just before the point at which this upload will resume
+   * @param contentType an optional [[akka.http.scaladsl.model.ContentType ContentType]]
+   * @param metaHeaders any meta-headers you want to add
+   * @param cannedAcl a [[CannedAcl]], defaults to [[CannedAcl.Private]]
+   * @param chunkSize the size of the requests sent to S3, minimum [[MinChunkSize]]
+   * @param chunkingParallelism the number of parallel requests used for the upload, defaults to 4
+   * @param sse the optional server side encryption to apply
+   * @return a [[akka.stream.scaladsl.Sink Sink]] that accepts [[ByteString]]'s and materializes to a [[scala.concurrent.Future Future]] of [[MultipartUploadResult]]
+   */
+  def resumeMultipartUpload(
+      bucket: String,
+      key: String,
+      uploadId: String,
+      previousParts: immutable.Iterable[Part],
+      contentType: ContentType = ContentTypes.`application/octet-stream`,
+      metaHeaders: MetaHeaders = MetaHeaders(Map()),
+      cannedAcl: CannedAcl = CannedAcl.Private,
+      chunkSize: Int = MinChunkSize,
+      chunkingParallelism: Int = 4,
+      sse: Option[ServerSideEncryption] = None
+  ): Sink[ByteString, Future[MultipartUploadResult]] = {
+    val headers =
+      S3Headers.empty.withCannedAcl(cannedAcl).withMetaHeaders(metaHeaders).withOptionalServerSideEncryption(sse)
+    resumeMultipartUploadWithHeaders(bucket,
+                                     key,
+                                     uploadId,
+                                     previousParts,
+                                     contentType,
+                                     chunkSize,
+                                     chunkingParallelism,
+                                     headers)
+  }
+
+  /**
+   * Resumes from a previously aborted multipart upload by providing the uploadId and previous upload part identifiers
+   *
+   * @param bucket the s3 bucket name
+   * @param key the s3 object key
+   * @param uploadId the upload that you want to resume
+   * @param previousParts the previously uploaded parts, ending just before the point at which this upload will resume
+   * @param contentType an optional [[akka.http.scaladsl.model.ContentType ContentType]]
+   * @param chunkSize the size of the requests sent to S3, minimum [[MinChunkSize]]
+   * @param chunkingParallelism the number of parallel requests used for the upload, defaults to 4
+   * @param s3Headers any headers you want to add
+   * @return a [[akka.stream.scaladsl.Sink Sink]] that accepts [[akka.util.ByteString ByteString]]'s and materializes to a [[scala.concurrent.Future Future]] of [[MultipartUploadResult]]
+   */
+  def resumeMultipartUploadWithHeaders(
+      bucket: String,
+      key: String,
+      uploadId: String,
+      previousParts: immutable.Iterable[Part],
+      contentType: ContentType = ContentTypes.`application/octet-stream`,
+      chunkSize: Int = MinChunkSize,
+      chunkingParallelism: Int = 4,
+      s3Headers: S3Headers = S3Headers.empty
+  ): Sink[ByteString, Future[MultipartUploadResult]] =
+    S3Stream
+      .resumeMultipartUpload(
+        S3Location(bucket, key),
+        uploadId,
+        previousParts,
+        contentType,
+        s3Headers,
+        chunkSize,
+        chunkingParallelism
+      )
+
   /**
    * Copy an S3 object from source bucket to target bucket using multi part copy upload.
    *
diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3IntegrationSpec.scala b/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3IntegrationSpec.scala
index 79956457cf..de461fc495 100644
--- a/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3IntegrationSpec.scala
+++ b/s3/src/test/scala/akka/stream/alpakka/s3/scaladsl/S3IntegrationSpec.scala
@@ -494,6 +494,71 @@ trait S3IntegrationSpec
     }
   }
 
+  it should "upload 1 file slowly, cancel it and then resume it to complete the upload" in {
+    // This test doesn't work against Minio since Minio doesn't properly implement this API, see
+    // https://github.com/minio/minio/issues/13246
+    assume(this.isInstanceOf[AWSS3IntegrationSpec])
+    val sourceKey = "original/file-slow-2.txt"
+    val sharedKillSwitch = KillSwitches.shared("abort-multipart-upload-2")
+
+    val inputData = createStringCollectionWithMinChunkSize(6)
+
+    val slowSource = createSlowSource(inputData, Some(sharedKillSwitch))
+
+    val multiPartUpload =
+      slowSource
+        .toMat(S3.multipartUpload(defaultBucket, sourceKey).withAttributes(attributes))(
+          Keep.right
+        )
+        .run
+
+    val results = for {
+      _ <- akka.pattern.after(25.seconds)(Future {
+        sharedKillSwitch.abort(AbortException)
+      })
+      _ <- multiPartUpload.recover {
+        case AbortException => ()
+      }
+      incomplete <- S3.listMultipartUpload(defaultBucket, None).withAttributes(attributes).runWith(Sink.seq)
+
+      uploadId = incomplete.collectFirst {
+        case uploadPart if uploadPart.key == sourceKey => uploadPart.uploadId
+      }.get
+
+      parts <- S3.listParts(defaultBucket, sourceKey, uploadId).runWith(Sink.seq)
+
+      remainingData = inputData.slice(3, 6)
+      _ <- Source(remainingData)
+        .toMat(
+          S3.resumeMultipartUpload(defaultBucket, sourceKey, uploadId, parts.map(_.toPart))
+            .withAttributes(attributes)
+        )(
+          Keep.right
+        )
+        .run
+
+      // This delay is here because there is sometimes a lag between completing a large multipart upload
+      // and the object actually being downloadable
+      downloaded <- akka.pattern.after(5.seconds)(
+        S3.download(defaultBucket, sourceKey).withAttributes(attributes).runWith(Sink.head).flatMap {
+          case Some((downloadSource, _)) =>
+            downloadSource
+              .runWith(Sink.seq)
+          case None => throw new Exception(s"Expected object in bucket $defaultBucket with key $sourceKey")
+        }
+      )
+
+      _ <- S3.deleteObject(defaultBucket, sourceKey).withAttributes(attributes).runWith(Sink.head)
+    } yield downloaded
+
+    whenReady(results) { downloads =>
+      val fullDownloadedFile = downloads.fold(ByteString.empty)(_ ++ _)
+      val fullInputData = inputData.fold(ByteString.empty)(_ ++ _)
+
+      fullInputData.utf8String shouldEqual fullDownloadedFile.utf8String
+    }
+  }
+
   it should "make a bucket with given name" in {
     implicit val attr: Attributes = attributes
     val bucketName = "samplebucket1"
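
A minimal usage sketch of the new scaladsl resumeMultipartUpload API (not part of the diff above), assuming a hypothetical bucket/key and a remaining-bytes source, and assuming the remaining data continues exactly from the byte offset after the last completed part (S3 requires every part except the last to be at least 5 MiB):

import akka.actor.ActorSystem
import akka.stream.alpakka.s3.MultipartUploadResult
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import scala.concurrent.Future

object ResumeUploadSketch extends App {
  implicit val system: ActorSystem = ActorSystem("resume-upload-sketch")
  import system.dispatcher

  // Hypothetical values: the object being uploaded and the bytes that were not yet sent.
  // The remaining data must start right after the last completed part of the aborted upload.
  val bucket = "my-bucket"
  val key = "backups/large-object.bin"
  val remainingBytes: Source[ByteString, _] = Source.single(ByteString("rest of the payload"))

  val resumed: Future[MultipartUploadResult] =
    for {
      // Find the interrupted multipart upload for this key.
      incomplete <- S3.listMultipartUpload(bucket, None).runWith(Sink.seq)
      uploadId = incomplete
        .collectFirst { case upload if upload.key == key => upload.uploadId }
        .getOrElse(sys.error(s"no incomplete multipart upload found for $key"))
      // Ask S3 which parts were already uploaded under that uploadId.
      parts <- S3.listParts(bucket, key, uploadId).runWith(Sink.seq)
      // Stream only the remaining bytes; the already uploaded parts are handed over via toPart.
      result <- remainingBytes.runWith(
        S3.resumeMultipartUpload(bucket, key, uploadId, parts.map(_.toPart).toList)
      )
    } yield result

  resumed.onComplete { outcome =>
    println(outcome)
    system.terminate()
  }
}

Passing the previously completed parts in via toPart lets the sink register them as already successful, so the final CompleteMultipartUpload request references every part of the object, not just the ones uploaded after the resume.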