diff --git a/project/BuildHelper.scala b/project/BuildHelper.scala index 904f3fb5..fa578c52 100644 --- a/project/BuildHelper.scala +++ b/project/BuildHelper.scala @@ -108,21 +108,6 @@ object BuildHelper { "-Xmax-classfile-name", "242" ) ++ std2xOptions ++ optimizerOptions(optimize) - case Some((2, 11)) => - Seq( - "-Ypartial-unification", - "-Yno-adapted-args", - "-Ywarn-inaccessible", - "-Ywarn-infer-any", - "-Ywarn-nullary-override", - "-Ywarn-nullary-unit", - "-Xexperimental", - "-Ywarn-unused-import", - "-Xfuture", - "-Xsource:2.13", - "-Xmax-classfile-name", - "242" - ) ++ std2xOptions case _ => Seq.empty } diff --git a/src/main/scala/zio/s3/Live.scala b/src/main/scala/zio/s3/Live.scala index dd065fca..2c9601ae 100644 --- a/src/main/scala/zio/s3/Live.scala +++ b/src/main/scala/zio/s3/Live.scala @@ -119,12 +119,13 @@ final class Live(unsafeClient: S3AsyncClient) extends S3.Service { options: UploadOptions ): ZIO[R, S3Exception, Unit] = content - .mapChunks(Chunk.single) .mapError(e => S3Exception.builder().message(e.getMessage).cause(e).build()) .refineOrDie { case e: S3Exception => e } - .mapM { chunk => + .mapChunks(c => Chunk(ByteBuffer.wrap(c.toArray))) + .toPublisher + .flatMap { publisher => execute( _.putObject( { @@ -139,11 +140,10 @@ final class Live(unsafeClient: S3AsyncClient) extends S3.Service { .fold(builder)(builder.contentType) .build() }, - AsyncRequestBody.fromBytes(chunk.toArray) + AsyncRequestBody.fromPublisher(publisher) ) ) } - .runCollect .unit def multipartUpload[R]( diff --git a/src/test/scala/zio/s3/S3Test.scala b/src/test/scala/zio/s3/S3Test.scala index f8578c7a..433a5918 100644 --- a/src/test/scala/zio/s3/S3Test.scala +++ b/src/test/scala/zio/s3/S3Test.scala @@ -57,6 +57,13 @@ object InvalidS3LayerTestSpec extends DefaultRunnableSpec { object S3Suite { val bucketName = "bucket-1" + private[this] def randomNEStream(): (Int, ZStream[Any, Nothing, Byte]) = { + val size = PartSize.Min + Random.nextInt(100) + val bytes = new Array[Byte](size) + Random.nextBytes(bytes) + (size, ZStream.fromChunks(Chunk.fromArray(bytes))) + } + def spec(label: String, root: ZPath): Spec[S3 with Blocking, TestFailure[Exception], TestSuccess] = suite(label)( testM("listAllObjects") { @@ -203,9 +210,9 @@ object S3Suite { }, testM("put object") { - val c = Chunk.fromArray("Hello F World".getBytes) + val c = Chunk.fromArray(Random.nextString(65536).getBytes()) val contentLength = c.length.toLong - val data = ZStream.fromChunks(c) + val data = ZStream.fromChunks(c).chunkN(5) val tmpKey = Random.alphanumeric.take(10).mkString for { @@ -242,7 +249,7 @@ object S3Suite { } yield assert(contentLength)(isGreaterThan(0L)) }, testM("multipart with parrallelism = 1") { - val (dataLength, data) = randomNEStream + val (dataLength, data) = randomNEStream() val tmpKey = Random.alphanumeric.take(10).mkString for { @@ -274,7 +281,7 @@ object S3Suite { } yield assert(contentLength)(equalTo(0L)) }, testM("multipart object when the content type is not provided") { - val (_, data) = randomNEStream + val (_, data) = randomNEStream() val tmpKey = Random.alphanumeric.take(10).mkString for { @@ -285,7 +292,7 @@ object S3Suite { }, testM("multipart object when there is a content type and metadata") { val metadata = Map("key1" -> "value1") - val (_, data) = randomNEStream + val (_, data) = randomNEStream() val tmpKey = Random.alphanumeric.take(10).mkString for { @@ -302,7 +309,7 @@ object S3Suite { assert(objectMetadata.metadata.map { case (k, v) => k.toLowerCase -> v })(equalTo(Map("key1" -> "value1"))) }, testM("multipart object when the chunk size and parallelism are customized") { - val (dataSize, data) = randomNEStream + val (dataSize, data) = randomNEStream() val tmpKey = Random.alphanumeric.take(10).mkString for { @@ -324,7 +331,7 @@ object S3Suite { } yield assert(succeed)(isFalse) }, testM("put object when the content type is not provided") { - val (dataSize, data) = randomNEStream + val (dataSize, data) = randomNEStream() val tmpKey = Random.alphanumeric.take(10).mkString for { @@ -335,7 +342,7 @@ object S3Suite { }, testM("put object when there is a content type and metadata") { val _metadata = Map("key1" -> "value1") - val (dataSize, data) = randomNEStream + val (dataSize, data) = randomNEStream() val tmpKey = Random.alphanumeric.take(10).mkString for { @@ -352,11 +359,4 @@ object S3Suite { } ) - //TODO remove and use generator - private[this] def randomNEStream = { - val size = PartSize.Min + Random.nextInt(100) - val bytes = new Array[Byte](size) - Random.nextBytes(bytes) - (size, ZStream.fromChunks(Chunk.fromArray(bytes))) - } }