From 248dd2551f40cbf4a0d13f7b7a77740b18ae0662 Mon Sep 17 00:00:00 2001 From: Thomas Ploch Date: Thu, 26 Jan 2023 10:59:00 +0000 Subject: [PATCH] [FEATURE] Add contentMD5 for `putObject` to support object locking ### Motivation We are hitting an error when using this library because our bucket is configured with [object locking](https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-lock.html): ``` Content-MD5 OR x-amz-checksum- HTTP header is required for Put Object requests with Object Lock parameters (Service: S3, Status Code: 400, Request ID: XXX, Extended Request ID: XXX) ``` The AWS S3 API requires passing the `Content-MD5` option for `putObject` requests that target buckets with object locking enabled. From the [`putObject` API description](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html): > To ensure that data is not corrupted traversing the network, use the Content-MD5 header. When you use this header, Amazon S3 checks the object against the provided MD5 value and, if they do not match, returns an error. Additionally, you can calculate the MD5 while putting an object to Amazon S3 and compare the returned ETag to the calculated MD5 value. > The Content-MD5 header is required for any request to upload an object with a retention period configured using Amazon S3 Object Lock. For more information about Amazon S3 Object Lock, see Amazon S3 Object Lock Overview in the Amazon S3 User Guide. This MR adds backward-compatible support for passing an `Option[String]` as contentMD5, which is a base64-encoded MD5 hash. Library users will be responsible for providing a correct contentMD5. 
--- zio-s3/src/main/scala/zio/s3/Live.scala | 20 +++++++--- zio-s3/src/main/scala/zio/s3/S3.scala | 21 ++++++++-- zio-s3/src/main/scala/zio/s3/Test.scala | 31 ++++++++++----- zio-s3/src/main/scala/zio/s3/package.scala | 7 ++-- zio-s3/src/main/scala/zio/s3/s3model.scala | 19 +++++++-- zio-s3/src/test/scala/zio/s3/S3Test.scala | 46 +++++++++++++++++++--- 6 files changed, 113 insertions(+), 31 deletions(-) diff --git a/zio-s3/src/main/scala/zio/s3/Live.scala b/zio-s3/src/main/scala/zio/s3/Live.scala index 14ffd095..2e12a99d 100644 --- a/zio-s3/src/main/scala/zio/s3/Live.scala +++ b/zio-s3/src/main/scala/zio/s3/Live.scala @@ -19,15 +19,16 @@ package zio.s3 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider import software.amazon.awssdk.core.async.{ AsyncRequestBody, AsyncResponseTransformer, SdkPublisher } import software.amazon.awssdk.core.exception.SdkException -import software.amazon.awssdk.services.s3.{ S3AsyncClient, S3AsyncClientBuilder } import software.amazon.awssdk.services.s3.model._ +import software.amazon.awssdk.services.s3.{ S3AsyncClient, S3AsyncClientBuilder } import zio._ import zio.interop.reactivestreams._ import zio.s3.Live.{ StreamAsyncResponseTransformer, StreamResponse } import zio.s3.S3Bucket.S3BucketListing -import zio.stream.{ Stream, ZSink, ZStream } import zio.s3.errors._ import zio.s3.errors.syntax._ +import zio.stream.{ Stream, ZSink, ZStream } + import java.net.URI import java.nio.ByteBuffer import java.util.concurrent.CompletableFuture @@ -114,7 +115,8 @@ final class Live(unsafeClient: S3AsyncClient) extends S3 { key: String, contentLength: Long, content: ZStream[R, Throwable, Byte], - options: UploadOptions + options: UploadOptions, + contentMD5: Option[String] = None ): ZIO[R, S3Exception, Unit] = content .mapErrorCause(_.flatMap(_.asS3Exception())) @@ -131,9 +133,15 @@ final class Live(unsafeClient: S3AsyncClient) extends S3 { .key(key) .metadata(options.metadata.asJava) .acl(options.cannedAcl) - options.contentType 
- .fold(builder)(builder.contentType) - .build() + + List( + (b: PutObjectRequest.Builder) => + options.contentType + .fold(b)(b.contentType), + (b: PutObjectRequest.Builder) => + contentMD5 + .fold(b)(b.contentMD5) + ).foldLeft(builder) { case (b, f) => f(b) }.build() }, AsyncRequestBody.fromPublisher(publisher) ) diff --git a/zio-s3/src/main/scala/zio/s3/S3.scala b/zio-s3/src/main/scala/zio/s3/S3.scala index ea50ae7d..9ff2f3e7 100644 --- a/zio-s3/src/main/scala/zio/s3/S3.scala +++ b/zio-s3/src/main/scala/zio/s3/S3.scala @@ -2,10 +2,10 @@ package zio.s3 import software.amazon.awssdk.services.s3.S3AsyncClient import software.amazon.awssdk.services.s3.model.S3Exception -import zio.{ IO, ZIO } -import zio.s3.errors.DecodingException import zio.s3.S3Bucket.S3BucketListing +import zio.s3.errors.DecodingException import zio.stream.{ Stream, ZPipeline, ZStream } +import zio.{ IO, ZIO } import java.nio.charset.CharacterCodingException import java.util.concurrent.CompletableFuture @@ -88,10 +88,24 @@ trait S3 { self => /** * Store data object into a specific bucket * + * ==Example of creating a contentMD5 option== + * + * The md5 option is required when the target bucket is configured with object locking, otherwise + * the AWS S3 API will not accept the [[putObject]] request. 
+ * + * {{{ + * import software.amazon.awssdk.utils.Md5Utils + * import scala.util.Random + * + * val bytes = Random.nextString(65536).getBytes() + * val contentMD5 = Some(Md5Utils.md5AsBase64(bytes)) + * }}} + * * @param bucketName name of the bucket * @param key unique object identifier * @param contentLength length of the data in bytes * @param content object data + * @param contentMD5 [[Option]] of [[String]] containing the MD5 hash of the content encoded as base64 * @return */ def putObject[R]( @@ -99,7 +113,8 @@ trait S3 { self => key: String, contentLength: Long, content: ZStream[R, Throwable, Byte], - options: UploadOptions = UploadOptions.default + options: UploadOptions = UploadOptions.default, + contentMD5: Option[String] = None ): ZIO[R, S3Exception, Unit] /** diff --git a/zio-s3/src/main/scala/zio/s3/Test.scala b/zio-s3/src/main/scala/zio/s3/Test.scala index 2b9830aa..4fc57b95 100644 --- a/zio-s3/src/main/scala/zio/s3/Test.scala +++ b/zio-s3/src/main/scala/zio/s3/Test.scala @@ -16,21 +16,21 @@ package zio.s3 -import java.io.FileInputStream -import java.nio.file.StandardOpenOption -import java.nio.file.attribute.BasicFileAttributes -import java.util.UUID -import java.util.concurrent.CompletableFuture import software.amazon.awssdk.services.s3.S3AsyncClient import software.amazon.awssdk.services.s3.model.S3Exception +import software.amazon.awssdk.utils.{ BinaryUtils, Md5Utils } import zio._ import zio.nio.channels.AsynchronousFileChannel -import zio.nio.file.{ Path => ZPath } -import zio.nio.file.Files +import zio.nio.file.{ Files, Path => ZPath } import zio.s3.S3Bucket._ import zio.stream.{ Stream, ZStream } -import java.io.FileNotFoundException +import java.io.{ FileInputStream, FileNotFoundException } +import java.nio.charset.StandardCharsets +import java.nio.file.StandardOpenOption +import java.nio.file.attribute.BasicFileAttributes +import java.util.UUID +import java.util.concurrent.CompletableFuture /** * Stub Service which is back by a filesystem 
storage @@ -88,9 +88,19 @@ object Test { (for { res <- refDb.get.map(_.getOrElse(bucketName + key, "" -> Map.empty[String, String])) (contentType, metadata) = res + contents <- Files + .readAllBytes(path / bucketName / key) + .catchAll(_ => ZIO.succeed(Chunk.fromArray("".getBytes))) file <- Files .readAttributes[BasicFileAttributes](path / bucketName / key) - .map(p => ObjectMetadata(metadata, contentType, p.size())) + .map(p => + ObjectMetadata( + metadata, + contentType, + p.size(), + BinaryUtils.toHex(Md5Utils.computeMD5Hash(contents.asString(StandardCharsets.UTF_8).getBytes)) + ) + ) } yield file).orDie override def listObjects( @@ -156,7 +166,8 @@ object Test { key: String, contentLength: Long, content: ZStream[R, Throwable, Byte], - options: UploadOptions + options: UploadOptions, + contentMD5: Option[String] = None ): ZIO[R, S3Exception, Unit] = (for { _ <- refDb.update(db => diff --git a/zio-s3/src/main/scala/zio/s3/package.scala b/zio-s3/src/main/scala/zio/s3/package.scala index 27b6c9ff..abe14f5c 100644 --- a/zio-s3/src/main/scala/zio/s3/package.scala +++ b/zio-s3/src/main/scala/zio/s3/package.scala @@ -21,8 +21,8 @@ import software.amazon.awssdk.regions.Region import software.amazon.awssdk.services.s3.S3AsyncClient import software.amazon.awssdk.services.s3.model.S3Exception import zio.nio.file.{ Path => ZPath } -import zio.s3.errors._ import zio.s3.S3Bucket.S3BucketListing +import zio.s3.errors._ import zio.s3.providers.const import zio.stream.ZStream @@ -108,9 +108,10 @@ package object s3 { key: String, contentLength: Long, content: ZStream[R, Throwable, Byte], - options: UploadOptions = UploadOptions.default + options: UploadOptions = UploadOptions.default, + contentMD5: Option[String] = None ): ZIO[S3 with R, S3Exception, Unit] = - ZIO.serviceWithZIO[S3](_.putObject(bucketName, key, contentLength, content, options)) + ZIO.serviceWithZIO[S3](_.putObject(bucketName, key, contentLength, content, options, contentMD5)) /** * Same as multipartUpload with 
default parallelism = 1 diff --git a/zio-s3/src/main/scala/zio/s3/s3model.scala b/zio-s3/src/main/scala/zio/s3/s3model.scala index 643a79f2..fe86a531 100644 --- a/zio-s3/src/main/scala/zio/s3/s3model.scala +++ b/zio-s3/src/main/scala/zio/s3/s3model.scala @@ -16,11 +16,10 @@ package zio.s3 -import java.time.Instant - import software.amazon.awssdk.services.s3.model.{ Bucket, HeadObjectResponse, ListObjectsV2Response } import zio.Chunk +import java.time.Instant import scala.jdk.CollectionConverters._ final case class S3Bucket(name: String, creationDate: Instant) @@ -69,11 +68,23 @@ final case class S3ObjectSummary(bucketName: String, key: String, lastModified: * @param metadata the user-defined metadata without the "x-amz-meta-" prefix * @param contentType the content type of the object (application/json, application/zip, text/plain, ...) * @param contentLength the size of the object in bytes + * @param eTag the etag for the response as hex string */ -final case class ObjectMetadata(metadata: Map[String, String], contentType: String, contentLength: Long) +final case class ObjectMetadata( + metadata: Map[String, String], + contentType: String, + contentLength: Long, + eTag: String +) object ObjectMetadata { def fromResponse(r: HeadObjectResponse): ObjectMetadata = - ObjectMetadata(r.metadata().asScala.toMap, r.contentType(), r.contentLength()) + ObjectMetadata( + r.metadata().asScala.toMap, + r.contentType(), + r.contentLength(), + // ETag is including quotes + r.eTag().replace("\"", "") + ) } diff --git a/zio-s3/src/test/scala/zio/s3/S3Test.scala b/zio-s3/src/test/scala/zio/s3/S3Test.scala index d08f6357..fa33466b 100644 --- a/zio-s3/src/test/scala/zio/s3/S3Test.scala +++ b/zio-s3/src/test/scala/zio/s3/S3Test.scala @@ -1,11 +1,9 @@ package zio.s3 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials - -import java.net.URI -import java.util.UUID import software.amazon.awssdk.regions.Region import software.amazon.awssdk.services.s3.model.{ ObjectCannedACL, 
S3Exception } +import software.amazon.awssdk.utils.{ BinaryUtils, Md5Utils } import zio.nio.file.{ Path => ZPath } import zio.stream.{ ZPipeline, ZStream } import zio.test.Assertion._ @@ -13,6 +11,8 @@ import zio.test.TestAspect.sequential import zio.test._ import zio.{ Chunk, Scope, ZLayer } +import java.net.URI +import java.util.UUID import scala.util.Random object S3LiveSpec extends ZIOSpecDefault { @@ -204,13 +204,20 @@ object S3Suite { }, test("put object") { - val c = Chunk.fromArray(Random.nextString(65536).getBytes()) + val bytes = Random.nextString(65536).getBytes() + val c = Chunk.fromArray(bytes) val contentLength = c.length.toLong val data = ZStream.fromChunks(c).rechunk(5) val tmpKey = Random.alphanumeric.take(10).mkString for { - _ <- putObject(bucketName, tmpKey, contentLength, data) + _ <- putObject( + bucketName, + tmpKey, + contentLength, + data, + UploadOptions.default + ) objectContentLength <- getObjectMetadata(bucketName, tmpKey).map(_.contentLength) <* deleteObject(bucketName, tmpKey) } yield assertTrue(objectContentLength == contentLength) @@ -350,6 +357,35 @@ object S3Suite { objectMetadata <- getObjectMetadata(bucketName, tmpKey) <* deleteObject(bucketName, tmpKey) } yield assertTrue(objectMetadata.contentType == "application/json") && assertTrue(objectMetadata.metadata.map { case (k, v) => k.toLowerCase -> v } == Map("key1" -> "value1")) + }, + test("put object when there is a contentMD5 option, content type and metadata") { + val bytes = Random.nextString(65536).getBytes() + val _metadata = Map("key1" -> "value1") + val md5Base64 = Md5Utils.md5AsBase64(bytes) + val c = Chunk.fromArray(bytes) + val contentLength = c.length.toLong + val data = ZStream.fromChunks(c).rechunk(5) + val tmpKey = Random.alphanumeric.take(10).mkString + + for { + _ <- putObject( + bucketName, + tmpKey, + contentLength, + data, + UploadOptions.from(_metadata, "application/json"), + Some(md5Base64) + ) + metadata <- getObjectMetadata(bucketName, tmpKey) <* + 
deleteObject(bucketName, tmpKey) + actualMD5 = BinaryUtils.toBase64(BinaryUtils.fromHex(metadata.eTag)) + } yield assertTrue( + metadata.contentLength == contentLength, + actualMD5 == md5Base64, + metadata.contentType == "application/json", + metadata.metadata.map { case (k, v) => k.toLowerCase -> v } == Map("key1" -> "value1") + ) + } ) @@ sequential