diff --git a/zio-s3/src/main/scala/zio/s3/Live.scala b/zio-s3/src/main/scala/zio/s3/Live.scala index 14ffd095..2e12a99d 100644 --- a/zio-s3/src/main/scala/zio/s3/Live.scala +++ b/zio-s3/src/main/scala/zio/s3/Live.scala @@ -19,15 +19,16 @@ package zio.s3 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider import software.amazon.awssdk.core.async.{ AsyncRequestBody, AsyncResponseTransformer, SdkPublisher } import software.amazon.awssdk.core.exception.SdkException -import software.amazon.awssdk.services.s3.{ S3AsyncClient, S3AsyncClientBuilder } import software.amazon.awssdk.services.s3.model._ +import software.amazon.awssdk.services.s3.{ S3AsyncClient, S3AsyncClientBuilder } import zio._ import zio.interop.reactivestreams._ import zio.s3.Live.{ StreamAsyncResponseTransformer, StreamResponse } import zio.s3.S3Bucket.S3BucketListing -import zio.stream.{ Stream, ZSink, ZStream } import zio.s3.errors._ import zio.s3.errors.syntax._ +import zio.stream.{ Stream, ZSink, ZStream } + import java.net.URI import java.nio.ByteBuffer import java.util.concurrent.CompletableFuture @@ -114,7 +115,8 @@ final class Live(unsafeClient: S3AsyncClient) extends S3 { key: String, contentLength: Long, content: ZStream[R, Throwable, Byte], - options: UploadOptions + options: UploadOptions, + contentMD5: Option[String] = None ): ZIO[R, S3Exception, Unit] = content .mapErrorCause(_.flatMap(_.asS3Exception())) @@ -131,9 +133,15 @@ final class Live(unsafeClient: S3AsyncClient) extends S3 { .key(key) .metadata(options.metadata.asJava) .acl(options.cannedAcl) - options.contentType - .fold(builder)(builder.contentType) - .build() + + List( + (b: PutObjectRequest.Builder) => + options.contentType + .fold(b)(b.contentType), + (b: PutObjectRequest.Builder) => + contentMD5 + .fold(b)(b.contentMD5) + ).foldLeft(builder) { case (b, f) => f(b) }.build() }, AsyncRequestBody.fromPublisher(publisher) ) diff --git a/zio-s3/src/main/scala/zio/s3/S3.scala b/zio-s3/src/main/scala/zio/s3/S3.scala index ea50ae7d..f5a5cd94 100644 --- a/zio-s3/src/main/scala/zio/s3/S3.scala +++ b/zio-s3/src/main/scala/zio/s3/S3.scala @@ -2,10 +2,10 @@ package zio.s3 import software.amazon.awssdk.services.s3.S3AsyncClient import software.amazon.awssdk.services.s3.model.S3Exception -import zio.{ IO, ZIO } -import zio.s3.errors.DecodingException import zio.s3.S3Bucket.S3BucketListing +import zio.s3.errors.DecodingException import zio.stream.{ Stream, ZPipeline, ZStream } +import zio.{ IO, ZIO } import java.nio.charset.CharacterCodingException import java.util.concurrent.CompletableFuture @@ -88,10 +88,24 @@ trait S3 { self => /** * Store data object into a specific bucket * + * ==Example of creating a contentMD5 option== + * + * The md5 option is required when the target bucket is configured with object locking, otherwise + * the AWS S3 API will not accept the [[putObject]] request. + * + * {{{ + * import software.amazon.awssdk.utils.Md5Utils + * import scala.util.Random + * + * val bytes = Random.nextString(65536).getBytes() + * val contentMD5 = Some(Md5Utils.md5AsBase64(bytes)) + * }}} + * * @param bucketName name of the bucket * @param key unique object identifier * @param contentLength length of the data in bytes * @param content object data + * @param contentMD5 a String Option containing the MD5 hash of the content encoded as base64 * @return */ def putObject[R]( @@ -99,7 +113,8 @@ trait S3 { self => key: String, contentLength: Long, content: ZStream[R, Throwable, Byte], - options: UploadOptions = UploadOptions.default + options: UploadOptions = UploadOptions.default, + contentMD5: Option[String] = None ): ZIO[R, S3Exception, Unit] /** diff --git a/zio-s3/src/main/scala/zio/s3/Test.scala b/zio-s3/src/main/scala/zio/s3/Test.scala index 088d5d72..f2b5b3c2 100644 --- a/zio-s3/src/main/scala/zio/s3/Test.scala +++ b/zio-s3/src/main/scala/zio/s3/Test.scala @@ -16,20 +16,22 @@ package zio.s3 -import java.io.{ FileInputStream, FileNotFoundException } -import java.nio.file.{ NoSuchFileException, StandardOpenOption } -import java.nio.file.attribute.BasicFileAttributes -import java.util.UUID -import java.util.concurrent.CompletableFuture import software.amazon.awssdk.services.s3.S3AsyncClient import software.amazon.awssdk.services.s3.model.S3Exception +import software.amazon.awssdk.utils.{ BinaryUtils, Md5Utils } import zio._ import zio.nio.channels.AsynchronousFileChannel -import zio.nio.file.{ Path => ZPath } -import zio.nio.file.Files +import zio.nio.file.{ Files, Path => ZPath } import zio.s3.S3Bucket._ import zio.stream.{ Stream, ZStream } +import java.io.{ FileInputStream, FileNotFoundException } +import java.nio.charset.StandardCharsets +import java.nio.file.{ NoSuchFileException, StandardOpenOption } +import java.nio.file.attribute.BasicFileAttributes +import java.util.UUID +import java.util.concurrent.CompletableFuture + /** * Stub Service which is back by a filesystem storage */ @@ -86,9 +88,19 @@ object Test { (for { res <- refDb.get.map(_.getOrElse(bucketName + key, "" -> Map.empty[String, String])) (contentType, metadata) = res + contents <- Files + .readAllBytes(path / bucketName / key) + .catchAll(_ => ZIO.succeed(Chunk.fromArray("".getBytes))) file <- Files .readAttributes[BasicFileAttributes](path / bucketName / key) - .map(p => ObjectMetadata(metadata, contentType, p.size())) + .map(p => + ObjectMetadata( + metadata, + contentType, + p.size(), + BinaryUtils.toHex(Md5Utils.computeMD5Hash(contents.asString(StandardCharsets.UTF_8).getBytes)) + ) + ) } yield file) .refineOrDie { case e: NoSuchFileException => fileNotFound(e) @@ -157,7 +169,8 @@ object Test { key: String, contentLength: Long, content: ZStream[R, Throwable, Byte], - options: UploadOptions + options: UploadOptions, + contentMD5: Option[String] = None ): ZIO[R, S3Exception, Unit] = (for { _ <- refDb.update(db => diff --git a/zio-s3/src/main/scala/zio/s3/package.scala b/zio-s3/src/main/scala/zio/s3/package.scala index 27b6c9ff..abe14f5c 100644 --- a/zio-s3/src/main/scala/zio/s3/package.scala +++ b/zio-s3/src/main/scala/zio/s3/package.scala @@ -21,8 +21,8 @@ import software.amazon.awssdk.regions.Region import software.amazon.awssdk.services.s3.S3AsyncClient import software.amazon.awssdk.services.s3.model.S3Exception import zio.nio.file.{ Path => ZPath } -import zio.s3.errors._ import zio.s3.S3Bucket.S3BucketListing +import zio.s3.errors._ import zio.s3.providers.const import zio.stream.ZStream @@ -108,9 +108,10 @@ package object s3 { key: String, contentLength: Long, content: ZStream[R, Throwable, Byte], - options: UploadOptions = UploadOptions.default + options: UploadOptions = UploadOptions.default, + contentMD5: Option[String] = None ): ZIO[S3 with R, S3Exception, Unit] = - ZIO.serviceWithZIO[S3](_.putObject(bucketName, key, contentLength, content, options)) + ZIO.serviceWithZIO[S3](_.putObject(bucketName, key, contentLength, content, options, contentMD5)) /** * Same as multipartUpload with default parallelism = 1 diff --git a/zio-s3/src/main/scala/zio/s3/s3model.scala b/zio-s3/src/main/scala/zio/s3/s3model.scala index 643a79f2..fe86a531 100644 --- a/zio-s3/src/main/scala/zio/s3/s3model.scala +++ b/zio-s3/src/main/scala/zio/s3/s3model.scala @@ -16,11 +16,10 @@ package zio.s3 -import java.time.Instant - import software.amazon.awssdk.services.s3.model.{ Bucket, HeadObjectResponse, ListObjectsV2Response } import zio.Chunk +import java.time.Instant import scala.jdk.CollectionConverters._ final case class S3Bucket(name: String, creationDate: Instant) @@ -69,11 +68,23 @@ final case class S3ObjectSummary(bucketName: String, key: String, lastModified: * @param metadata the user-defined metadata without the "x-amz-meta-" prefix * @param contentType the content type of the object (application/json, application/zip, text/plain, ...) * @param contentLength the size of the object in bytes + * @param eTag the etag for the response as hex string */ -final case class ObjectMetadata(metadata: Map[String, String], contentType: String, contentLength: Long) +final case class ObjectMetadata( + metadata: Map[String, String], + contentType: String, + contentLength: Long, + eTag: String +) object ObjectMetadata { def fromResponse(r: HeadObjectResponse): ObjectMetadata = - ObjectMetadata(r.metadata().asScala.toMap, r.contentType(), r.contentLength()) + ObjectMetadata( + r.metadata().asScala.toMap, + r.contentType(), + r.contentLength(), + // ETag is including quotes + r.eTag().replace("\"", "") + ) } diff --git a/zio-s3/src/test/scala/zio/s3/S3Test.scala b/zio-s3/src/test/scala/zio/s3/S3Test.scala index a385e8df..07805ee5 100644 --- a/zio-s3/src/test/scala/zio/s3/S3Test.scala +++ b/zio-s3/src/test/scala/zio/s3/S3Test.scala @@ -1,11 +1,9 @@ package zio.s3 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials - -import java.net.URI -import java.util.UUID import software.amazon.awssdk.regions.Region import software.amazon.awssdk.services.s3.model.{ ObjectCannedACL, S3Exception } +import software.amazon.awssdk.utils.{ BinaryUtils, Md5Utils } import zio.nio.file.{ Path => ZPath } import zio.stream.{ ZPipeline, ZStream } import zio.test.Assertion._ @@ -13,6 +11,8 @@ import zio.test.TestAspect.sequential import zio.test._ import zio.{ Chunk, Scope, ZLayer } +import java.net.URI +import java.util.UUID import scala.util.Random object S3LiveSpec extends ZIOSpecDefault { @@ -211,13 +211,20 @@ object S3Suite { }, test("put object") { - val c = Chunk.fromArray(Random.nextString(65536).getBytes()) + val bytes = Random.nextString(65536).getBytes() + val c = Chunk.fromArray(bytes) val contentLength = c.length.toLong val data = ZStream.fromChunks(c).rechunk(5) val tmpKey = Random.alphanumeric.take(10).mkString for { - _ <- putObject(bucketName, tmpKey, contentLength, data) + _ <- putObject( + bucketName, + tmpKey, + contentLength, + data, + UploadOptions.default + ) objectContentLength <- getObjectMetadata(bucketName, tmpKey).map(_.contentLength) <* deleteObject(bucketName, tmpKey) } yield assertTrue(objectContentLength == contentLength) @@ -357,6 +364,35 @@ object S3Suite { objectMetadata <- getObjectMetadata(bucketName, tmpKey) <* deleteObject(bucketName, tmpKey) } yield assertTrue(objectMetadata.contentType == "application/json") && assertTrue(objectMetadata.metadata.map { case (k, v) => k.toLowerCase -> v } == Map("key1" -> "value1")) + }, + test("put object when there is a contentMD5 option, content type and metadata") { + val bytes = Random.nextString(65536).getBytes() + val _metadata = Map("key1" -> "value1") + val md5Base64 = Md5Utils.md5AsBase64(bytes) + val c = Chunk.fromArray(bytes) + val contentLength = c.length.toLong + val data = ZStream.fromChunks(c).rechunk(5) + val tmpKey = Random.alphanumeric.take(10).mkString + + for { + _ <- putObject( + bucketName, + tmpKey, + contentLength, + data, + UploadOptions.from(_metadata, "application/json"), + Some(md5Base64) + ) + metadata <- getObjectMetadata(bucketName, tmpKey) <* + deleteObject(bucketName, tmpKey) + actualMD5 = BinaryUtils.toBase64(BinaryUtils.fromHex(metadata.eTag)) + } yield assertTrue( + metadata.contentLength == contentLength, + actualMD5 == md5Base64, + metadata.contentType == "application/json", + metadata.metadata.map { case (k, v) => k.toLowerCase -> v } == Map("key1" -> "value1") + ) + } ) @@ sequential