Skip to content

Commit

Permalink
Generalize bucketManagementRequest into s3ManagementRequest
Browse files Browse the repository at this point in the history
  • Loading branch information
mdedetrich committed Sep 18, 2021
1 parent 4501e8b commit ca25522
Showing 1 changed file with 16 additions and 11 deletions.
27 changes: 16 additions & 11 deletions s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -540,32 +540,38 @@ import scala.util.{Failure, Success, Try}
case _ => downloadRequest
}

private def bucketManagementRequest(bucket: String)(method: HttpMethod, conf: S3Settings): HttpRequest =
HttpRequests.bucketManagementRequest(S3Location(bucket, key = ""), method)(conf)

def makeBucketSource(bucket: String, headers: S3Headers): Source[Done, NotUsed] =
bucketManagementRequest[Done](
s3ManagementRequest[Done](
bucket = bucket,
method = HttpMethods.PUT,
httpRequest = bucketManagementRequest(bucket),
headers.headersFor(MakeBucket),
process = processBucketLifecycleResponse
process = processS3LifecycleResponse
)

def makeBucket(bucket: String, headers: S3Headers)(implicit mat: Materializer, attr: Attributes): Future[Done] =
makeBucketSource(bucket, headers).withAttributes(attr).runWith(Sink.ignore)

def deleteBucketSource(bucket: String, headers: S3Headers): Source[Done, NotUsed] =
bucketManagementRequest[Done](
s3ManagementRequest[Done](
bucket = bucket,
method = HttpMethods.DELETE,
httpRequest = bucketManagementRequest(bucket),
headers.headersFor(DeleteBucket),
process = processBucketLifecycleResponse
process = processS3LifecycleResponse
)

def deleteBucket(bucket: String, headers: S3Headers)(implicit mat: Materializer, attr: Attributes): Future[Done] =
deleteBucketSource(bucket, headers).withAttributes(attr).runWith(Sink.ignore)

def checkIfBucketExistsSource(bucketName: String, headers: S3Headers): Source[BucketAccess, NotUsed] =
bucketManagementRequest[BucketAccess](
s3ManagementRequest[BucketAccess](
bucket = bucketName,
method = HttpMethods.HEAD,
httpRequest = bucketManagementRequest(bucketName),
headers.headersFor(CheckBucket),
process = processCheckIfExistsResponse
)
Expand All @@ -574,9 +580,10 @@ import scala.util.{Failure, Success, Try}
attr: Attributes): Future[BucketAccess] =
checkIfBucketExistsSource(bucket, headers).withAttributes(attr).runWith(Sink.head)

private def bucketManagementRequest[T](
private def s3ManagementRequest[T](
bucket: String,
method: HttpMethod,
httpRequest: (HttpMethod, S3Settings) => HttpRequest,
headers: Seq[HttpHeader],
process: (HttpResponse, Materializer) => Future[T]
): Source[T, NotUsed] =
Expand All @@ -585,13 +592,11 @@ import scala.util.{Failure, Success, Try}
implicit val materializer: Materializer = mat
implicit val attributes: Attributes = attr
implicit val sys: ActorSystem = mat.system
implicit val conf: S3Settings = resolveSettings(attr, mat.system)

val location = S3Location(bucket = bucket, key = "")
val conf: S3Settings = resolveSettings(attr, mat.system)

signAndRequest(
requestHeaders(
HttpRequests.bucketManagementRequest(location, method),
httpRequest(method, conf),
None
)
).mapAsync(1) { response =>
Expand All @@ -600,7 +605,7 @@ import scala.util.{Failure, Success, Try}
}
.mapMaterializedValue(_ => NotUsed)

private def processBucketLifecycleResponse(response: HttpResponse, materializer: Materializer): Future[Done] = {
private def processS3LifecycleResponse(response: HttpResponse, materializer: Materializer): Future[Done] = {
implicit val mat: Materializer = materializer

response match {
Expand Down

0 comments on commit ca25522

Please sign in to comment.