Skip to content

Commit

Permalink
S3: Allow bucket creation besides us-east-1 for AWS (#2742)
Browse files Browse the repository at this point in the history
  • Loading branch information
mdedetrich authored Oct 6, 2021
1 parent 5f924a3 commit 27fdecf
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 15 deletions.
11 changes: 11 additions & 0 deletions s3/src/main/scala/akka/stream/alpakka/s3/impl/HttpRequests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,17 @@ import scala.concurrent.{ExecutionContext, Future}
}
}

def createBucketRegionPayload(region: Region)(implicit ec: ExecutionContext): Future[RequestEntity] = {
//Do not let the start LocationConstraint be on different lines
// They tend to get split when this file is formatted by IntelliJ unless http://stackoverflow.com/a/19492318/1216965
// @formatter:off
val payload = <CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<LocationConstraint>{region.id()}</LocationConstraint>
</CreateBucketConfiguration>
// @formatter:on
Marshal(payload).to[RequestEntity]
}

def uploadCopyPartRequest(multipartCopy: MultipartCopy,
sourceVersionId: Option[String] = None,
s3Headers: Seq[HttpHeader] = Seq.empty)(implicit conf: S3Settings): HttpRequest = {
Expand Down
64 changes: 49 additions & 15 deletions s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import akka.stream.scaladsl.{Flow, Keep, RetryFlow, RunnableGraph, Sink, Source,
import akka.stream.{Attributes, Materializer}
import akka.util.ByteString
import akka.{Done, NotUsed}
import software.amazon.awssdk.regions.Region

import scala.collection.immutable
import scala.collection.immutable.Seq
Expand Down Expand Up @@ -543,14 +544,32 @@ import scala.util.{Failure, Success, Try}
private def bucketManagementRequest(bucket: String)(method: HttpMethod, conf: S3Settings): HttpRequest =
HttpRequests.bucketManagementRequest(S3Location(bucket, key = ""), method)(conf)

def makeBucketSource(bucket: String, headers: S3Headers): Source[Done, NotUsed] =
s3ManagementRequest[Done](
bucket = bucket,
method = HttpMethods.PUT,
httpRequest = bucketManagementRequest(bucket),
headers.headersFor(MakeBucket),
process = processS3LifecycleResponse
)
def makeBucketSource(bucket: String, headers: S3Headers): Source[Done, NotUsed] = {
Source
.fromMaterializer { (mat, attr) =>
implicit val conf: S3Settings = resolveSettings(attr, mat.system)

val region = conf.getS3RegionProvider.getRegion

// If region is US_EAST_1 then the location constraint is not required
// see https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateBucket.html
val maybeRegionPayload = region match {
case Region.US_EAST_1 => None
case region =>
Some(HttpRequests.createBucketRegionPayload(region)(ExecutionContexts.parasitic))
}

s3ManagementRequest[Done](
bucket = bucket,
method = HttpMethods.PUT,
httpRequest = bucketManagementRequest(bucket),
headers.headersFor(MakeBucket),
process = processS3LifecycleResponse,
httpEntity = maybeRegionPayload
)
}
.mapMaterializedValue(_ => NotUsed)
}

def makeBucket(bucket: String, headers: S3Headers)(implicit mat: Materializer, attr: Attributes): Future[Done] =
makeBucketSource(bucket, headers).withAttributes(attr).runWith(Sink.ignore)
Expand Down Expand Up @@ -602,7 +621,8 @@ import scala.util.{Failure, Success, Try}
method: HttpMethod,
httpRequest: (HttpMethod, S3Settings) => HttpRequest,
headers: Seq[HttpHeader],
process: (HttpResponse, Materializer) => Future[T]
process: (HttpResponse, Materializer) => Future[T],
httpEntity: Option[Future[RequestEntity]] = None
): Source[T, NotUsed] =
Source
.fromMaterializer { (mat, attr) =>
Expand All @@ -611,12 +631,26 @@ import scala.util.{Failure, Success, Try}
implicit val sys: ActorSystem = mat.system
val conf: S3Settings = resolveSettings(attr, mat.system)

signAndRequest(
requestHeaders(
httpRequest(method, conf),
None
)
).mapAsync(1) { response =>
val baseSource = httpEntity match {
case Some(requestEntity) =>
Source.future(requestEntity).flatMapConcat { requestEntity =>
signAndRequest(
requestHeaders(
httpRequest(method, conf).withEntity(requestEntity),
None
)
)
}
case None =>
signAndRequest(
requestHeaders(
httpRequest(method, conf),
None
)
)
}

baseSource.mapAsync(1) { response =>
process(response, mat)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,27 @@ trait S3IntegrationSpec
request.futureValue should equal((Done, Done))
}

it should "create a bucket in the non default us-east-1 region" in {
val bucketName = "samplebucketotherregion"

val request = for {
_ <- S3
.makeBucketSource(bucketName)
.withAttributes(S3Attributes.settings(otherRegionSettingsPathStyleAccess))
.runWith(Sink.head)
result <- S3
.checkIfBucketExistsSource(bucketName)
.withAttributes(S3Attributes.settings(otherRegionSettingsPathStyleAccess))
.runWith(Sink.head)
_ <- S3
.deleteBucketSource(bucketName)
.withAttributes(S3Attributes.settings(otherRegionSettingsPathStyleAccess))
.runWith(Sink.head)
} yield result

request.futureValue shouldEqual AccessGranted
}

it should "throw an exception while deleting bucket that doesn't exist" in {
implicit val attr: Attributes = attributes
S3.deleteBucket(nonExistingBucket).failed.futureValue shouldBe an[S3Exception]
Expand Down

0 comments on commit 27fdecf

Please sign in to comment.