Skip to content

Commit

Permalink
255 - put back publisher with ByteBuffer for put content (#256)
Browse files Browse the repository at this point in the history
  • Loading branch information
regis-leray authored Sep 13, 2021
1 parent fc68d50 commit 26c357d
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 34 deletions.
15 changes: 0 additions & 15 deletions project/BuildHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -108,21 +108,6 @@ object BuildHelper {
"-Xmax-classfile-name",
"242"
) ++ std2xOptions ++ optimizerOptions(optimize)
case Some((2, 11)) =>
Seq(
"-Ypartial-unification",
"-Yno-adapted-args",
"-Ywarn-inaccessible",
"-Ywarn-infer-any",
"-Ywarn-nullary-override",
"-Ywarn-nullary-unit",
"-Xexperimental",
"-Ywarn-unused-import",
"-Xfuture",
"-Xsource:2.13",
"-Xmax-classfile-name",
"242"
) ++ std2xOptions
case _ => Seq.empty
}

Expand Down
8 changes: 4 additions & 4 deletions src/main/scala/zio/s3/Live.scala
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,13 @@ final class Live(unsafeClient: S3AsyncClient) extends S3.Service {
options: UploadOptions
): ZIO[R, S3Exception, Unit] =
content
.mapChunks(Chunk.single)
.mapError(e => S3Exception.builder().message(e.getMessage).cause(e).build())
.refineOrDie {
case e: S3Exception => e
}
.mapM { chunk =>
.mapChunks(c => Chunk(ByteBuffer.wrap(c.toArray)))
.toPublisher
.flatMap { publisher =>
execute(
_.putObject(
{
Expand All @@ -139,11 +140,10 @@ final class Live(unsafeClient: S3AsyncClient) extends S3.Service {
.fold(builder)(builder.contentType)
.build()
},
AsyncRequestBody.fromBytes(chunk.toArray)
AsyncRequestBody.fromPublisher(publisher)
)
)
}
.runCollect
.unit

def multipartUpload[R](
Expand Down
30 changes: 15 additions & 15 deletions src/test/scala/zio/s3/S3Test.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ object InvalidS3LayerTestSpec extends DefaultRunnableSpec {
object S3Suite {
val bucketName = "bucket-1"

private[this] def randomNEStream(): (Int, ZStream[Any, Nothing, Byte]) = {
val size = PartSize.Min + Random.nextInt(100)
val bytes = new Array[Byte](size)
Random.nextBytes(bytes)
(size, ZStream.fromChunks(Chunk.fromArray(bytes)))
}

def spec(label: String, root: ZPath): Spec[S3 with Blocking, TestFailure[Exception], TestSuccess] =
suite(label)(
testM("listAllObjects") {
Expand Down Expand Up @@ -203,9 +210,9 @@ object S3Suite {

},
testM("put object") {
val c = Chunk.fromArray("Hello F World".getBytes)
val c = Chunk.fromArray(Random.nextString(65536).getBytes())
val contentLength = c.length.toLong
val data = ZStream.fromChunks(c)
val data = ZStream.fromChunks(c).chunkN(5)
val tmpKey = Random.alphanumeric.take(10).mkString

for {
Expand Down Expand Up @@ -242,7 +249,7 @@ object S3Suite {
} yield assert(contentLength)(isGreaterThan(0L))
},
testM("multipart with parrallelism = 1") {
val (dataLength, data) = randomNEStream
val (dataLength, data) = randomNEStream()
val tmpKey = Random.alphanumeric.take(10).mkString

for {
Expand Down Expand Up @@ -274,7 +281,7 @@ object S3Suite {
} yield assert(contentLength)(equalTo(0L))
},
testM("multipart object when the content type is not provided") {
val (_, data) = randomNEStream
val (_, data) = randomNEStream()
val tmpKey = Random.alphanumeric.take(10).mkString

for {
Expand All @@ -285,7 +292,7 @@ object S3Suite {
},
testM("multipart object when there is a content type and metadata") {
val metadata = Map("key1" -> "value1")
val (_, data) = randomNEStream
val (_, data) = randomNEStream()
val tmpKey = Random.alphanumeric.take(10).mkString

for {
Expand All @@ -302,7 +309,7 @@ object S3Suite {
assert(objectMetadata.metadata.map { case (k, v) => k.toLowerCase -> v })(equalTo(Map("key1" -> "value1")))
},
testM("multipart object when the chunk size and parallelism are customized") {
val (dataSize, data) = randomNEStream
val (dataSize, data) = randomNEStream()
val tmpKey = Random.alphanumeric.take(10).mkString

for {
Expand All @@ -324,7 +331,7 @@ object S3Suite {
} yield assert(succeed)(isFalse)
},
testM("put object when the content type is not provided") {
val (dataSize, data) = randomNEStream
val (dataSize, data) = randomNEStream()
val tmpKey = Random.alphanumeric.take(10).mkString

for {
Expand All @@ -335,7 +342,7 @@ object S3Suite {
},
testM("put object when there is a content type and metadata") {
val _metadata = Map("key1" -> "value1")
val (dataSize, data) = randomNEStream
val (dataSize, data) = randomNEStream()
val tmpKey = Random.alphanumeric.take(10).mkString

for {
Expand All @@ -352,11 +359,4 @@ object S3Suite {
}
)

//TODO remove and use generator
private[this] def randomNEStream = {
val size = PartSize.Min + Random.nextInt(100)
val bytes = new Array[Byte](size)
Random.nextBytes(bytes)
(size, ZStream.fromChunks(Chunk.fromArray(bytes)))
}
}

0 comments on commit 26c357d

Please sign in to comment.