Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

S3: Add MultiPartUpload and ListParts APIs #2730

Merged
merged 6 commits into from
Oct 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions s3/src/main/scala/akka/stream/alpakka/s3/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,15 @@ private[s3] object Utils {
val tail = if (trimmed.startsWith("\"")) trimmed.drop(1) else trimmed
if (tail.endsWith("\"")) tail.dropRight(1) else tail
}

/**
* This method returns `None` if given an empty `String`. This is typically used when parsing
* XML since its common to have XML elements with an empty text value inside.
*/
def emptyStringToOption(value: String): Option[String] =
if (value == "")
None
else
Some(value)

}
68 changes: 68 additions & 0 deletions s3/src/main/scala/akka/stream/alpakka/s3/impl/HttpRequests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,57 @@ import scala.concurrent.{ExecutionContext, Future}
.withUri(requestUri(bucket, None).withQuery(query))
}

def listMultipartUploads(
bucket: String,
prefix: Option[String] = None,
continuationToken: Option[ListMultipartUploadContinuationToken] = None,
delimiter: Option[String] = None,
headers: Seq[HttpHeader] = Nil
)(implicit conf: S3Settings): HttpRequest = {

val baseQuery = Seq(
"prefix" -> prefix,
"delimiter" -> delimiter,
"key-marker" -> continuationToken.map(_.nextKeyMarker),
"upload-id-marker" -> continuationToken.map(_.nextUploadIdMarker)
).collect { case (k, Some(v)) => k -> v }.toMap

// We need to manually construct a query here because the Uri for getting a list of multipart uploads requires a
// query param `uploads` which has no value and the current Query dsl doesn't support mixing query params that have
// values with query params that do not have values
val query =
if (baseQuery.isEmpty)
Query("uploads")
else {
val rest = baseQuery.map { case (k, v) => s"$k=$v" }.mkString("&")
Query(s"uploads&$rest")
}

HttpRequest(HttpMethods.GET)
.withHeaders(Host(requestAuthority(bucket, conf.s3RegionProvider.getRegion)) +: headers)
.withUri(requestUri(bucket, None).withQuery(query))
}

def listParts(
bucket: String,
key: String,
uploadId: String,
continuationToken: Option[Int],
headers: Seq[HttpHeader] = Nil
)(implicit conf: S3Settings): HttpRequest = {

val query = Query(
Seq(
"part-number-marker" -> continuationToken.map(_.toString),
"uploadId" -> Some(uploadId)
).collect { case (k, Some(v)) => k -> v }.toMap
)

HttpRequest(HttpMethods.GET)
.withHeaders(Host(requestAuthority(bucket, conf.s3RegionProvider.getRegion)) +: headers)
.withUri(requestUri(bucket, Some(key)).withQuery(query))
}

def getDownloadRequest(s3Location: S3Location,
method: HttpMethod = HttpMethods.GET,
s3Headers: Seq[HttpHeader] = Seq.empty,
Expand All @@ -75,6 +126,23 @@ import scala.concurrent.{ExecutionContext, Future}
s3Request(s3Location = s3Location, method = method)
.withDefaultHeaders(headers)

def uploadManagementRequest(
s3Location: S3Location,
uploadId: String,
method: HttpMethod,
headers: Seq[HttpHeader] = Seq.empty[HttpHeader]
)(implicit conf: S3Settings): HttpRequest =
s3Request(s3Location,
method,
_.withQuery(
Query(
Map(
"uploadId" -> uploadId
)
)
))
.withDefaultHeaders(headers)

def uploadRequest(s3Location: S3Location,
payload: Source[ByteString, _],
contentLength: Long,
Expand Down
107 changes: 106 additions & 1 deletion s3/src/main/scala/akka/stream/alpakka/s3/impl/Marshalling.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import akka.annotation.InternalApi
import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport
import akka.http.scaladsl.model.{ContentTypes, HttpCharsets, MediaTypes, Uri}
import akka.http.scaladsl.unmarshalling.{FromEntityUnmarshaller, Unmarshaller}
import akka.stream.alpakka.s3.{ListBucketResultCommonPrefixes, ListBucketResultContents, Utils}
import akka.stream.alpakka.s3._

import scala.util.Try
import scala.xml.NodeSeq
Expand Down Expand Up @@ -90,4 +90,109 @@ import scala.xml.NodeSeq
CopyPartResult(lastModified, Utils.removeQuotes(eTag))
}
}

implicit val listMultipartUploadsResultUnmarshaller: FromEntityUnmarshaller[ListMultipartUploadsResult] = {
nodeSeqUnmarshaller(MediaTypes.`application/xml` withCharset HttpCharsets.`UTF-8`).map {
case NodeSeq.Empty => throw Unmarshaller.NoContentException
case x =>
val bucket = (x \ "Bucket").text
val keyMarker = (x \ "KeyMarker").headOption.flatMap(x => Utils.emptyStringToOption(x.text))
val uploadIdMarker = (x \ "UploadIdMarker").headOption.flatMap(x => Utils.emptyStringToOption(x.text))
val nextKeyMarker = (x \ "NextKeyMarker").headOption.flatMap(x => Utils.emptyStringToOption(x.text))
val nextUploadIdMarker = (x \ "NextUploadIdMarker").headOption.flatMap(x => Utils.emptyStringToOption(x.text))
val delimiter = (x \ "Delimiter").headOption.flatMap(x => Utils.emptyStringToOption(x.text))
val maxUploads = (x \ "MaxUploads").text.toInt
val truncated = (x \ isTruncated).text == "true"
val uploads = (x \\ "Upload").map { u =>
val key = (u \ "Key").text
val uploadId = (u \ "UploadId").text

val initiator = (u \\ "Initiator").map { i =>
val id = (i \ "ID").text
val displayName = (i \ "DisplayName").text
AWSIdentity(id, displayName)
}.head

val owner = (u \\ "Owner").map { o =>
val id = (o \ "ID").text
val displayName = (o \ "DisplayName").text
AWSIdentity(id, displayName)
}.head

val storageClass = (u \ "StorageClass").text
val initiated = Instant.parse((u \ "Initiated").text)

ListMultipartUploadResultUploads(key, uploadId, initiator, owner, storageClass, initiated)
}

val commonPrefixes = (x \ "CommonPrefixes").map { cp =>
ListMultipartUploadResultCommonPrefixes((cp \ "Prefix").text)
}

ListMultipartUploadsResult(bucket,
keyMarker,
uploadIdMarker,
nextKeyMarker,
nextUploadIdMarker,
delimiter,
maxUploads,
truncated,
uploads,
commonPrefixes)
}
}

implicit val listPartsResultUnmarshaller: FromEntityUnmarshaller[ListPartsResult] = {
nodeSeqUnmarshaller(MediaTypes.`application/xml` withCharset HttpCharsets.`UTF-8`).map {
case NodeSeq.Empty => throw Unmarshaller.NoContentException
case x =>
val bucket = (x \ "Bucket").text
val key = (x \ "Key").text
val uploadId = (x \ "UploadId").text
val partNumberMarker =
(x \ "PartNumberMarker").headOption.flatMap(x => Utils.emptyStringToOption(x.text)).map(_.toInt)
val nextPartNumberMarker =
(x \ "NextPartNumberMarker").headOption.flatMap(x => Utils.emptyStringToOption(x.text)).map(_.toInt)

val maxParts = (x \ "MaxParts").text.toInt
val truncated = (x \ isTruncated).text == "true"

val parts = (x \\ "Part").map { u =>
val lastModified = Instant.parse((u \ "LastModified").text)
val eTag = (u \ "ETag").text
val partNumber = (u \\ "PartNumber").text.toInt
val size = (u \\ "Size").text.toLong

ListPartsResultParts(lastModified, eTag, partNumber, size)
}

val initiator = (x \\ "Initiator").map { i =>
val id = (i \ "ID").text
val displayName = (i \ "DisplayName").text
AWSIdentity(id, displayName)
}.head

val owner = (x \\ "Owner").map { o =>
val id = (o \ "ID").text
val displayName = (o \ "DisplayName").text
AWSIdentity(id, displayName)
}.head

val storageClass = (x \ "StorageClass").text

ListPartsResult(
bucket,
key,
uploadId,
partNumberMarker,
nextPartNumberMarker,
maxParts,
truncated,
parts,
initiator,
owner,
storageClass
)
}
}
}
Loading