Skip to content

Commit

Permalink
[FEATURE] Add contentMD5 for putObject to support object locking
Browse files Browse the repository at this point in the history
### Motivation

We are hitting an error when using this library because our bucket is configured with
[object locking](https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-lock.html):

```
Content-MD5 OR x-amz-checksum- HTTP header is required for Put Object requests with Object Lock parameters
(Service: S3, Status Code: 400, Request ID: XXX, Extended Request ID: XXX)
```

The AWS S3 API requires passing the `Content-MD5` option for `putObject` requests that
target buckets with object locking enabled.

From the [`putObject` API description](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html):

> To ensure that data is not corrupted traversing the network, use the Content-MD5 header.
  When you use this header, Amazon S3 checks the object against the provided MD5 value and,
  if they do not match, returns an error. Additionally, you can calculate the MD5 while
  putting an object to Amazon S3 and compare the returned ETag to the calculated MD5 value.

> The Content-MD5 header is required for any request to upload an object with a retention
  period configured using Amazon S3 Object Lock. For more information about Amazon S3 Object
  Lock, see Amazon S3 Object Lock Overview in the Amazon S3 User Guide.

This MR adds backward-compatible support for passing an `Option[String]` as `contentMD5`, the
base64-encoded MD5 hash of the content. Library users are responsible for providing a correct `contentMD5`.
  • Loading branch information
tPl0ch committed Jan 26, 2023
1 parent 18f6493 commit 248dd25
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 31 deletions.
20 changes: 14 additions & 6 deletions zio-s3/src/main/scala/zio/s3/Live.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@ package zio.s3
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import software.amazon.awssdk.core.async.{ AsyncRequestBody, AsyncResponseTransformer, SdkPublisher }
import software.amazon.awssdk.core.exception.SdkException
import software.amazon.awssdk.services.s3.{ S3AsyncClient, S3AsyncClientBuilder }
import software.amazon.awssdk.services.s3.model._
import software.amazon.awssdk.services.s3.{ S3AsyncClient, S3AsyncClientBuilder }
import zio._
import zio.interop.reactivestreams._
import zio.s3.Live.{ StreamAsyncResponseTransformer, StreamResponse }
import zio.s3.S3Bucket.S3BucketListing
import zio.stream.{ Stream, ZSink, ZStream }
import zio.s3.errors._
import zio.s3.errors.syntax._
import zio.stream.{ Stream, ZSink, ZStream }

import java.net.URI
import java.nio.ByteBuffer
import java.util.concurrent.CompletableFuture
Expand Down Expand Up @@ -114,7 +115,8 @@ final class Live(unsafeClient: S3AsyncClient) extends S3 {
key: String,
contentLength: Long,
content: ZStream[R, Throwable, Byte],
options: UploadOptions
options: UploadOptions,
contentMD5: Option[String] = None
): ZIO[R, S3Exception, Unit] =
content
.mapErrorCause(_.flatMap(_.asS3Exception()))
Expand All @@ -131,9 +133,15 @@ final class Live(unsafeClient: S3AsyncClient) extends S3 {
.key(key)
.metadata(options.metadata.asJava)
.acl(options.cannedAcl)
options.contentType
.fold(builder)(builder.contentType)
.build()

List(
(b: PutObjectRequest.Builder) =>
options.contentType
.fold(b)(b.contentType),
(b: PutObjectRequest.Builder) =>
contentMD5
.fold(b)(b.contentMD5)
).foldLeft(builder) { case (b, f) => f(b) }.build()
},
AsyncRequestBody.fromPublisher(publisher)
)
Expand Down
21 changes: 18 additions & 3 deletions zio-s3/src/main/scala/zio/s3/S3.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package zio.s3

import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.services.s3.model.S3Exception
import zio.{ IO, ZIO }
import zio.s3.errors.DecodingException
import zio.s3.S3Bucket.S3BucketListing
import zio.s3.errors.DecodingException
import zio.stream.{ Stream, ZPipeline, ZStream }
import zio.{ IO, ZIO }

import java.nio.charset.CharacterCodingException
import java.util.concurrent.CompletableFuture
Expand Down Expand Up @@ -88,18 +88,33 @@ trait S3 { self =>
/**
* Store data object into a specific bucket
*
* ==Example of creating a contentMD5 option==
*
* The `contentMD5` option is required when the target bucket is configured with object locking; otherwise,
* the AWS S3 API will not accept the [[putObject]] request.
*
* {{{
* import software.amazon.awssdk.utils.Md5Utils
* import scala.util.Random
*
* val bytes = Random.nextString(65536).getBytes()
* val contentMD5 = Some(Md5Utils.md5AsBase64(bytes))
* }}}
*
* @param bucketName name of the bucket
* @param key unique object identifier
* @param contentLength length of the data in bytes
* @param content object data
* @param contentMD5 [[Option]] of [[String]] containing the MD5 hash of the content encoded as base64
* @return
*/
def putObject[R](
bucketName: String,
key: String,
contentLength: Long,
content: ZStream[R, Throwable, Byte],
options: UploadOptions = UploadOptions.default
options: UploadOptions = UploadOptions.default,
contentMD5: Option[String] = None
): ZIO[R, S3Exception, Unit]

/**
Expand Down
31 changes: 21 additions & 10 deletions zio-s3/src/main/scala/zio/s3/Test.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,21 @@

package zio.s3

import java.io.FileInputStream
import java.nio.file.StandardOpenOption
import java.nio.file.attribute.BasicFileAttributes
import java.util.UUID
import java.util.concurrent.CompletableFuture
import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.services.s3.model.S3Exception
import software.amazon.awssdk.utils.{ BinaryUtils, Md5Utils }
import zio._
import zio.nio.channels.AsynchronousFileChannel
import zio.nio.file.{ Path => ZPath }
import zio.nio.file.Files
import zio.nio.file.{ Files, Path => ZPath }
import zio.s3.S3Bucket._
import zio.stream.{ Stream, ZStream }

import java.io.FileNotFoundException
import java.io.{ FileInputStream, FileNotFoundException }
import java.nio.charset.StandardCharsets
import java.nio.file.StandardOpenOption
import java.nio.file.attribute.BasicFileAttributes
import java.util.UUID
import java.util.concurrent.CompletableFuture

/**
* Stub Service which is backed by a filesystem storage
Expand Down Expand Up @@ -88,9 +88,19 @@ object Test {
(for {
res <- refDb.get.map(_.getOrElse(bucketName + key, "" -> Map.empty[String, String]))
(contentType, metadata) = res
contents <- Files
.readAllBytes(path / bucketName / key)
.catchAll(_ => ZIO.succeed(Chunk.fromArray("".getBytes)))
file <- Files
.readAttributes[BasicFileAttributes](path / bucketName / key)
.map(p => ObjectMetadata(metadata, contentType, p.size()))
.map(p =>
ObjectMetadata(
metadata,
contentType,
p.size(),
BinaryUtils.toHex(Md5Utils.computeMD5Hash(contents.asString(StandardCharsets.UTF_8).getBytes))
)
)
} yield file).orDie

override def listObjects(
Expand Down Expand Up @@ -156,7 +166,8 @@ object Test {
key: String,
contentLength: Long,
content: ZStream[R, Throwable, Byte],
options: UploadOptions
options: UploadOptions,
contentMD5: Option[String] = None
): ZIO[R, S3Exception, Unit] =
(for {
_ <- refDb.update(db =>
Expand Down
7 changes: 4 additions & 3 deletions zio-s3/src/main/scala/zio/s3/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.services.s3.model.S3Exception
import zio.nio.file.{ Path => ZPath }
import zio.s3.errors._
import zio.s3.S3Bucket.S3BucketListing
import zio.s3.errors._
import zio.s3.providers.const
import zio.stream.ZStream

Expand Down Expand Up @@ -108,9 +108,10 @@ package object s3 {
key: String,
contentLength: Long,
content: ZStream[R, Throwable, Byte],
options: UploadOptions = UploadOptions.default
options: UploadOptions = UploadOptions.default,
contentMD5: Option[String] = None
): ZIO[S3 with R, S3Exception, Unit] =
ZIO.serviceWithZIO[S3](_.putObject(bucketName, key, contentLength, content, options))
ZIO.serviceWithZIO[S3](_.putObject(bucketName, key, contentLength, content, options, contentMD5))

/**
* Same as multipartUpload with default parallelism = 1
Expand Down
19 changes: 15 additions & 4 deletions zio-s3/src/main/scala/zio/s3/s3model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@

package zio.s3

import java.time.Instant

import software.amazon.awssdk.services.s3.model.{ Bucket, HeadObjectResponse, ListObjectsV2Response }
import zio.Chunk

import java.time.Instant
import scala.jdk.CollectionConverters._

final case class S3Bucket(name: String, creationDate: Instant)
Expand Down Expand Up @@ -69,11 +68,23 @@ final case class S3ObjectSummary(bucketName: String, key: String, lastModified:
* @param metadata the user-defined metadata without the "x-amz-meta-" prefix
* @param contentType the content type of the object (application/json, application/zip, text/plain, ...)
* @param contentLength the size of the object in bytes
* @param eTag the etag for the response as hex string
*/
final case class ObjectMetadata(metadata: Map[String, String], contentType: String, contentLength: Long)
final case class ObjectMetadata(
metadata: Map[String, String],
contentType: String,
contentLength: Long,
eTag: String
)

object ObjectMetadata {

def fromResponse(r: HeadObjectResponse): ObjectMetadata =
ObjectMetadata(r.metadata().asScala.toMap, r.contentType(), r.contentLength())
ObjectMetadata(
r.metadata().asScala.toMap,
r.contentType(),
r.contentLength(),
// The ETag value includes surrounding double quotes; strip them
r.eTag().replace("\"", "")
)
}
46 changes: 41 additions & 5 deletions zio-s3/src/test/scala/zio/s3/S3Test.scala
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package zio.s3

import software.amazon.awssdk.auth.credentials.AwsBasicCredentials

import java.net.URI
import java.util.UUID
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.model.{ ObjectCannedACL, S3Exception }
import software.amazon.awssdk.utils.{ BinaryUtils, Md5Utils }
import zio.nio.file.{ Path => ZPath }
import zio.stream.{ ZPipeline, ZStream }
import zio.test.Assertion._
import zio.test.TestAspect.sequential
import zio.test._
import zio.{ Chunk, Scope, ZLayer }

import java.net.URI
import java.util.UUID
import scala.util.Random

object S3LiveSpec extends ZIOSpecDefault {
Expand Down Expand Up @@ -204,13 +204,20 @@ object S3Suite {

},
test("put object") {
val c = Chunk.fromArray(Random.nextString(65536).getBytes())
val bytes = Random.nextString(65536).getBytes()
val c = Chunk.fromArray(bytes)
val contentLength = c.length.toLong
val data = ZStream.fromChunks(c).rechunk(5)
val tmpKey = Random.alphanumeric.take(10).mkString

for {
_ <- putObject(bucketName, tmpKey, contentLength, data)
_ <- putObject(
bucketName,
tmpKey,
contentLength,
data,
UploadOptions.default
)
objectContentLength <- getObjectMetadata(bucketName, tmpKey).map(_.contentLength) <*
deleteObject(bucketName, tmpKey)
} yield assertTrue(objectContentLength == contentLength)
Expand Down Expand Up @@ -350,6 +357,35 @@ object S3Suite {
objectMetadata <- getObjectMetadata(bucketName, tmpKey) <* deleteObject(bucketName, tmpKey)
} yield assertTrue(objectMetadata.contentType == "application/json") &&
assertTrue(objectMetadata.metadata.map { case (k, v) => k.toLowerCase -> v } == Map("key1" -> "value1"))
},
test("put object when there is a contentMD5 option, content type and metadata") {
val bytes = Random.nextString(65536).getBytes()
val _metadata = Map("key1" -> "value1")
val md5Base64 = Md5Utils.md5AsBase64(bytes)
val c = Chunk.fromArray(bytes)
val contentLength = c.length.toLong
val data = ZStream.fromChunks(c).rechunk(5)
val tmpKey = Random.alphanumeric.take(10).mkString

for {
_ <- putObject(
bucketName,
tmpKey,
contentLength,
data,
UploadOptions.from(_metadata, "application/json"),
Some(md5Base64)
)
metadata <- getObjectMetadata(bucketName, tmpKey) <*
deleteObject(bucketName, tmpKey)
actualMD5 = BinaryUtils.toBase64(BinaryUtils.fromHex(metadata.eTag))
} yield assertTrue(
metadata.contentLength == contentLength,
actualMD5 == md5Base64,
metadata.contentType == "application/json",
metadata.metadata.map { case (k, v) => k.toLowerCase -> v } == Map("key1" -> "value1")
)

}
) @@ sequential

Expand Down

0 comments on commit 248dd25

Please sign in to comment.