From ceaa570dd7192c800dce10ee70e1352566f92a8d Mon Sep 17 00:00:00 2001
From: George Leung
Date: Thu, 19 Aug 2021 06:59:13 -0400
Subject: [PATCH] AWS S3: use strict http entity for in-memory chunks (#2703)

---
 .../scala/akka/stream/alpakka/s3/impl/Chunk.scala | 15 ++++++++++++++-
 .../akka/stream/alpakka/s3/impl/DiskBuffer.scala  |  2 +-
 .../stream/alpakka/s3/impl/HttpRequests.scala     |  7 +++----
 .../stream/alpakka/s3/impl/MemoryBuffer.scala     |  3 +--
 .../akka/stream/alpakka/s3/impl/S3Stream.scala    |  4 ++--
 .../stream/alpakka/s3/impl/DiskBufferSpec.scala   | 12 +++++++++---
 .../stream/alpakka/s3/impl/HttpRequestsSpec.scala |  6 +++---
 .../stream/alpakka/s3/impl/MemoryBufferSpec.scala |  4 +++-
 8 files changed, 36 insertions(+), 17 deletions(-)

diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/impl/Chunk.scala b/s3/src/main/scala/akka/stream/alpakka/s3/impl/Chunk.scala
index e70df1d1fa..736cc7b794 100644
--- a/s3/src/main/scala/akka/stream/alpakka/s3/impl/Chunk.scala
+++ b/s3/src/main/scala/akka/stream/alpakka/s3/impl/Chunk.scala
@@ -7,9 +7,22 @@ package akka.stream.alpakka.s3.impl
 import akka.stream.scaladsl.Source
 import akka.NotUsed
 import akka.annotation.InternalApi
+import akka.http.scaladsl.model.{ContentTypes, HttpEntity, RequestEntity}
 import akka.util.ByteString
 
 /**
  * Internal Api
  */
-@InternalApi private[impl] final case class Chunk(data: Source[ByteString, NotUsed], size: Int)
+@InternalApi private[impl] sealed trait Chunk {
+  def asEntity(): RequestEntity
+  def size: Int
+}
+
+@InternalApi private[impl] final case class DiskChunk(data: Source[ByteString, NotUsed], size: Int) extends Chunk {
+  def asEntity(): RequestEntity = HttpEntity(ContentTypes.`application/octet-stream`, size, data)
+}
+
+@InternalApi private[impl] final case class MemoryChunk(data: ByteString) extends Chunk {
+  def asEntity(): RequestEntity = HttpEntity.Strict(ContentTypes.`application/octet-stream`, data)
+  def size: Int = data.size
+}
diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/impl/DiskBuffer.scala b/s3/src/main/scala/akka/stream/alpakka/s3/impl/DiskBuffer.scala
index 8b6f35398b..9484fa9b8b 100644
--- a/s3/src/main/scala/akka/stream/alpakka/s3/impl/DiskBuffer.scala
+++ b/s3/src/main/scala/akka/stream/alpakka/s3/impl/DiskBuffer.scala
@@ -95,7 +95,7 @@ import akka.annotation.InternalApi
         }(ExecutionContexts.parasitic)
         NotUsed
       }
-      emit(out, Chunk(src, length), () => completeStage())
+      emit(out, DiskChunk(src, length), () => completeStage())
     }
     setHandlers(in, out, this)
   }
diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/impl/HttpRequests.scala b/s3/src/main/scala/akka/stream/alpakka/s3/impl/HttpRequests.scala
index c7f5f603d9..7d139a2f23 100644
--- a/s3/src/main/scala/akka/stream/alpakka/s3/impl/HttpRequests.scala
+++ b/s3/src/main/scala/akka/stream/alpakka/s3/impl/HttpRequests.scala
@@ -12,7 +12,7 @@ import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport._
 import akka.http.scaladsl.marshalling.Marshal
 import akka.http.scaladsl.model.Uri.{Authority, Query}
 import akka.http.scaladsl.model.headers.{`Raw-Request-URI`, Host, RawHeader}
-import akka.http.scaladsl.model.{ContentTypes, RequestEntity, _}
+import akka.http.scaladsl.model.{RequestEntity, _}
 import akka.stream.alpakka.s3.AccessStyle.{PathAccessStyle, VirtualHostAccessStyle}
 import akka.stream.alpakka.s3.{ApiVersion, S3Settings}
 import akka.stream.scaladsl.Source
@@ -97,15 +97,14 @@ import scala.concurrent.{ExecutionContext, Future}
 
   def uploadPartRequest(upload: MultipartUpload,
                         partNumber: Int,
-                        payload: Source[ByteString, _],
-                        payloadSize: Int,
+                        payload: Chunk,
                         s3Headers: Seq[HttpHeader] = Seq.empty)(implicit conf: S3Settings): HttpRequest =
     s3Request(
       upload.s3Location,
       HttpMethods.PUT,
      _.withQuery(Query("partNumber" -> partNumber.toString, "uploadId" -> upload.uploadId))
    ).withDefaultHeaders(s3Headers)
-      .withEntity(HttpEntity(ContentTypes.`application/octet-stream`, payloadSize, payload))
+      .withEntity(payload.asEntity())
 
   def completeMultipartUploadRequest(upload: MultipartUpload, parts: Seq[(Int, String)], headers: Seq[HttpHeader])(
       implicit ec: ExecutionContext,
diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/impl/MemoryBuffer.scala b/s3/src/main/scala/akka/stream/alpakka/s3/impl/MemoryBuffer.scala
index a13b7f5039..bdbcb4b063 100644
--- a/s3/src/main/scala/akka/stream/alpakka/s3/impl/MemoryBuffer.scala
+++ b/s3/src/main/scala/akka/stream/alpakka/s3/impl/MemoryBuffer.scala
@@ -6,7 +6,6 @@ package akka.stream.alpakka.s3.impl
 
 import akka.annotation.InternalApi
 import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
-import akka.stream.scaladsl.Source
 import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
 import akka.util.ByteString
 
@@ -46,7 +45,7 @@ import akka.util.ByteString
       completeStage()
     }
 
-    private def emit(): Unit = emit(out, Chunk(Source.single(buffer), buffer.size), () => completeStage())
+    private def emit(): Unit = emit(out, MemoryChunk(buffer), () => completeStage())
 
     setHandlers(in, out, this)
   }
diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala b/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala
index 411d81059a..7879f9464e 100644
--- a/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala
+++ b/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala
@@ -645,7 +645,7 @@ import scala.util.{Failure, Success, Try}
       if (prefix.nonEmpty) {
         Source(prefix).concat(tail)
       } else {
-        Source.single(Chunk(Source.empty, 0))
+        Source.single(MemoryChunk(ByteString.empty))
       }
     }
 
@@ -655,7 +655,7 @@ import scala.util.{Failure, Success, Try}
         case (chunkedPayload, (uploadInfo, chunkIndex)) =>
           //each of the payload requests are created
           val partRequest =
-            uploadPartRequest(uploadInfo, chunkIndex, chunkedPayload.data, chunkedPayload.size, headers)
+            uploadPartRequest(uploadInfo, chunkIndex, chunkedPayload, headers)
           (partRequest, (uploadInfo, chunkIndex))
       }
       .flatMapConcat { case (req, info) => Signer.signedRequest(req, signingKey).zip(Source.single(info)) }
diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/impl/DiskBufferSpec.scala b/s3/src/test/scala/akka/stream/alpakka/s3/impl/DiskBufferSpec.scala
index 8e6cf30117..9efb130dab 100644
--- a/s3/src/test/scala/akka/stream/alpakka/s3/impl/DiskBufferSpec.scala
+++ b/s3/src/test/scala/akka/stream/alpakka/s3/impl/DiskBufferSpec.scala
@@ -43,8 +43,12 @@ class DiskBufferSpec(_system: ActorSystem)
 
     result should have size (1)
     val chunk = result.head
+    chunk shouldBe a[DiskChunk]
+    val diskChunk = chunk.asInstanceOf[DiskChunk]
     chunk.size should be(14)
-    chunk.data.runWith(Sink.seq).futureValue should be(Seq(ByteString(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14)))
+    diskChunk.data.runWith(Sink.seq).futureValue should be(
+      Seq(ByteString(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
+    )
   }
 
   it should "fail if more than maxSize bytes are fed into it" in {
@@ -63,12 +67,14 @@ class DiskBufferSpec(_system: ActorSystem)
 
   it should "delete its temp file after N materializations" in {
     val tmpDir = Files.createTempDirectory("DiskBufferSpec").toFile()
    val before = tmpDir.list().size
-    val source = Source(Vector(ByteString(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14)))
+    val chunk = Source(Vector(ByteString(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14)))
       .via(new DiskBuffer(2, 200, Some(tmpDir.toPath)))
       .runWith(Sink.seq)
       .futureValue
       .head
-      .data
+
+    chunk shouldBe a[DiskChunk]
+    val source = chunk.asInstanceOf[DiskChunk].data
 
     tmpDir.list().size should be(before + 1)
diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/impl/HttpRequestsSpec.scala b/s3/src/test/scala/akka/stream/alpakka/s3/impl/HttpRequestsSpec.scala
index dab45f65ab..f4e0e3b950 100644
--- a/s3/src/test/scala/akka/stream/alpakka/s3/impl/HttpRequestsSpec.scala
+++ b/s3/src/test/scala/akka/stream/alpakka/s3/impl/HttpRequestsSpec.scala
@@ -14,8 +14,8 @@ import akka.http.scaladsl.model.headers.{`Raw-Request-URI`, ByteRange, RawHeader}
 import akka.stream.alpakka.s3.headers.{CannedAcl, ServerSideEncryption, StorageClass}
 import akka.stream.alpakka.s3._
 import akka.stream.alpakka.testkit.scaladsl.LogCapturing
-import akka.stream.scaladsl.Source
 import akka.testkit.{SocketUtil, TestKit, TestProbe}
+import akka.util.ByteString
 import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures}
 import org.scalatest.flatspec.AnyFlatSpec
 import org.scalatest.matchers.should.Matchers
@@ -272,7 +272,7 @@ class HttpRequestsSpec extends AnyFlatSpec with Matchers with ScalaFutures with
     implicit val settings = getSettings(s3Region = Region.EU_WEST_1).withEndpointUrl("http://localhost:8080")
 
     val req =
-      HttpRequests.uploadPartRequest(multipartUpload, 1, Source.empty, 1)
+      HttpRequests.uploadPartRequest(multipartUpload, 1, MemoryChunk(ByteString.empty))
 
     req.uri.scheme shouldEqual "http"
     req.uri.authority.host.address shouldEqual "localhost"
@@ -284,7 +284,7 @@ class HttpRequestsSpec extends AnyFlatSpec with Matchers with ScalaFutures with
     val myKey = "my-key"
     val md5Key = "md5-key"
     val s3Headers = ServerSideEncryption.customerKeys(myKey).withMd5(md5Key).headersFor(UploadPart)
-    val req = HttpRequests.uploadPartRequest(multipartUpload, 1, Source.empty, 1, s3Headers)
+    val req = HttpRequests.uploadPartRequest(multipartUpload, 1, MemoryChunk(ByteString.empty), s3Headers)
 
     req.headers should contain(RawHeader("x-amz-server-side-encryption-customer-algorithm", "AES256"))
     req.headers should contain(RawHeader("x-amz-server-side-encryption-customer-key", myKey))
diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/impl/MemoryBufferSpec.scala b/s3/src/test/scala/akka/stream/alpakka/s3/impl/MemoryBufferSpec.scala
index c3ba9b3185..849560aeb1 100644
--- a/s3/src/test/scala/akka/stream/alpakka/s3/impl/MemoryBufferSpec.scala
+++ b/s3/src/test/scala/akka/stream/alpakka/s3/impl/MemoryBufferSpec.scala
@@ -39,7 +39,9 @@ class MemoryBufferSpec(_system: ActorSystem)
     result should have size (1)
     val chunk = result.head
     chunk.size should be(14)
-    chunk.data.runWith(Sink.seq).futureValue should be(Seq(ByteString(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14)))
+
+    chunk shouldBe a[MemoryChunk]
+    chunk.asInstanceOf[MemoryChunk].data should be(ByteString(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
   }
 
   it should "fail if more than maxSize bytes are fed into it" in {
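
Note on the change: uploadPartRequest previously wrapped every part payload in a
streamed HttpEntity, even when the bytes were already buffered in memory. The new
Chunk.asEntity() lets each chunk type choose its entity: DiskChunk keeps the
streamed form, while MemoryChunk now produces an HttpEntity.Strict. Below is a
minimal sketch (not part of the patch; the ChunkEntitySketch object and its
values are illustrative) contrasting the two constructions in the akka-http
model API:

    import akka.http.scaladsl.model.{ContentTypes, HttpEntity, RequestEntity}
    import akka.stream.scaladsl.Source
    import akka.util.ByteString

    object ChunkEntitySketch {
      val bytes: ByteString = ByteString("part payload")

      // Streamed (what DiskChunk still produces): the bytes are delivered by
      // materializing a Source, so the entity is not assumed to be readable
      // more than once.
      val streamed: RequestEntity =
        HttpEntity(ContentTypes.`application/octet-stream`, bytes.length, Source.single(bytes))

      // Strict (what MemoryChunk now produces): the bytes are held directly in
      // the entity, giving it a known length without running a stream.
      val strict: RequestEntity =
        HttpEntity.Strict(ContentTypes.`application/octet-stream`, bytes)
    }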