Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEATURE] Add contentMD5 for putObject to support object locking #394

Merged
merged 2 commits into from
Jan 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions zio-s3/src/main/scala/zio/s3/Live.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@ package zio.s3
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import software.amazon.awssdk.core.async.{ AsyncRequestBody, AsyncResponseTransformer, SdkPublisher }
import software.amazon.awssdk.core.exception.SdkException
import software.amazon.awssdk.services.s3.{ S3AsyncClient, S3AsyncClientBuilder }
import software.amazon.awssdk.services.s3.model._
import software.amazon.awssdk.services.s3.{ S3AsyncClient, S3AsyncClientBuilder }
import zio._
import zio.interop.reactivestreams._
import zio.s3.Live.{ StreamAsyncResponseTransformer, StreamResponse }
import zio.s3.S3Bucket.S3BucketListing
import zio.stream.{ Stream, ZSink, ZStream }
import zio.s3.errors._
import zio.s3.errors.syntax._
import zio.stream.{ Stream, ZSink, ZStream }

import java.net.URI
import java.nio.ByteBuffer
import java.util.concurrent.CompletableFuture
Expand Down Expand Up @@ -114,7 +115,8 @@ final class Live(unsafeClient: S3AsyncClient) extends S3 {
key: String,
contentLength: Long,
content: ZStream[R, Throwable, Byte],
options: UploadOptions
options: UploadOptions,
contentMD5: Option[String] = None
): ZIO[R, S3Exception, Unit] =
content
.mapErrorCause(_.flatMap(_.asS3Exception()))
Expand All @@ -131,9 +133,15 @@ final class Live(unsafeClient: S3AsyncClient) extends S3 {
.key(key)
.metadata(options.metadata.asJava)
.acl(options.cannedAcl)
options.contentType
.fold(builder)(builder.contentType)
.build()

List(
(b: PutObjectRequest.Builder) =>
options.contentType
.fold(b)(b.contentType),
(b: PutObjectRequest.Builder) =>
contentMD5
.fold(b)(b.contentMD5)
).foldLeft(builder) { case (b, f) => f(b) }.build()
},
AsyncRequestBody.fromPublisher(publisher)
)
Expand Down
21 changes: 18 additions & 3 deletions zio-s3/src/main/scala/zio/s3/S3.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package zio.s3

import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.services.s3.model.S3Exception
import zio.{ IO, ZIO }
import zio.s3.errors.DecodingException
import zio.s3.S3Bucket.S3BucketListing
import zio.s3.errors.DecodingException
import zio.stream.{ Stream, ZPipeline, ZStream }
import zio.{ IO, ZIO }

import java.nio.charset.CharacterCodingException
import java.util.concurrent.CompletableFuture
Expand Down Expand Up @@ -88,18 +88,33 @@ trait S3 { self =>
/**
* Store data object into a specific bucket
*
* ==Example of creating a contentMD5 option==
*
* The md5 option is required when the target bucket is configured with object locking, otherwise
* the AWS S3 API will not accept the [[putObject]] request.
*
* {{{
* import software.amazon.awssdk.utils.Md5Utils
* import scala.util.Random
*
* val bytes = Random.nextString(65536).getBytes()
* val contentMD5 = Some(Md5Utils.md5AsBase64(bytes))
* }}}
*
* @param bucketName name of the bucket
* @param key unique object identifier
* @param contentLength length of the data in bytes
* @param content object data
* @param contentMD5 a String Option containing the MD5 hash of the content encoded as base64
* @return
*/
def putObject[R](
bucketName: String,
key: String,
contentLength: Long,
content: ZStream[R, Throwable, Byte],
options: UploadOptions = UploadOptions.default
options: UploadOptions = UploadOptions.default,
contentMD5: Option[String] = None
): ZIO[R, S3Exception, Unit]

/**
Expand Down
31 changes: 22 additions & 9 deletions zio-s3/src/main/scala/zio/s3/Test.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,22 @@

package zio.s3

import java.io.{ FileInputStream, FileNotFoundException }
import java.nio.file.{ NoSuchFileException, StandardOpenOption }
import java.nio.file.attribute.BasicFileAttributes
import java.util.UUID
import java.util.concurrent.CompletableFuture
import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.services.s3.model.S3Exception
import software.amazon.awssdk.utils.{ BinaryUtils, Md5Utils }
import zio._
import zio.nio.channels.AsynchronousFileChannel
import zio.nio.file.{ Path => ZPath }
import zio.nio.file.Files
import zio.nio.file.{ Files, Path => ZPath }
import zio.s3.S3Bucket._
import zio.stream.{ Stream, ZStream }

import java.io.{ FileInputStream, FileNotFoundException }
import java.nio.charset.StandardCharsets
import java.nio.file.{ NoSuchFileException, StandardOpenOption }
import java.nio.file.attribute.BasicFileAttributes
import java.util.UUID
import java.util.concurrent.CompletableFuture

/**
* Stub Service which is back by a filesystem storage
*/
Expand Down Expand Up @@ -86,9 +88,19 @@ object Test {
(for {
res <- refDb.get.map(_.getOrElse(bucketName + key, "" -> Map.empty[String, String]))
(contentType, metadata) = res
contents <- Files
.readAllBytes(path / bucketName / key)
.catchAll(_ => ZIO.succeed(Chunk.fromArray("".getBytes)))
file <- Files
.readAttributes[BasicFileAttributes](path / bucketName / key)
.map(p => ObjectMetadata(metadata, contentType, p.size()))
.map(p =>
ObjectMetadata(
metadata,
contentType,
p.size(),
BinaryUtils.toHex(Md5Utils.computeMD5Hash(contents.asString(StandardCharsets.UTF_8).getBytes))
)
)
} yield file)
.refineOrDie {
case e: NoSuchFileException => fileNotFound(e)
Expand Down Expand Up @@ -157,7 +169,8 @@ object Test {
key: String,
contentLength: Long,
content: ZStream[R, Throwable, Byte],
options: UploadOptions
options: UploadOptions,
contentMD5: Option[String] = None
): ZIO[R, S3Exception, Unit] =
(for {
_ <- refDb.update(db =>
Expand Down
7 changes: 4 additions & 3 deletions zio-s3/src/main/scala/zio/s3/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.services.s3.model.S3Exception
import zio.nio.file.{ Path => ZPath }
import zio.s3.errors._
import zio.s3.S3Bucket.S3BucketListing
import zio.s3.errors._
import zio.s3.providers.const
import zio.stream.ZStream

Expand Down Expand Up @@ -108,9 +108,10 @@ package object s3 {
key: String,
contentLength: Long,
content: ZStream[R, Throwable, Byte],
options: UploadOptions = UploadOptions.default
options: UploadOptions = UploadOptions.default,
contentMD5: Option[String] = None
): ZIO[S3 with R, S3Exception, Unit] =
ZIO.serviceWithZIO[S3](_.putObject(bucketName, key, contentLength, content, options))
ZIO.serviceWithZIO[S3](_.putObject(bucketName, key, contentLength, content, options, contentMD5))

/**
* Same as multipartUpload with default parallelism = 1
Expand Down
19 changes: 15 additions & 4 deletions zio-s3/src/main/scala/zio/s3/s3model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@

package zio.s3

import java.time.Instant

import software.amazon.awssdk.services.s3.model.{ Bucket, HeadObjectResponse, ListObjectsV2Response }
import zio.Chunk

import java.time.Instant
import scala.jdk.CollectionConverters._

final case class S3Bucket(name: String, creationDate: Instant)
Expand Down Expand Up @@ -69,11 +68,23 @@ final case class S3ObjectSummary(bucketName: String, key: String, lastModified:
* @param metadata the user-defined metadata without the "x-amz-meta-" prefix
* @param contentType the content type of the object (application/json, application/zip, text/plain, ...)
* @param contentLength the size of the object in bytes
* @param eTag the etag for the response as hex string
*/
final case class ObjectMetadata(metadata: Map[String, String], contentType: String, contentLength: Long)
final case class ObjectMetadata(
metadata: Map[String, String],
contentType: String,
contentLength: Long,
eTag: String
)

object ObjectMetadata {

def fromResponse(r: HeadObjectResponse): ObjectMetadata =
ObjectMetadata(r.metadata().asScala.toMap, r.contentType(), r.contentLength())
ObjectMetadata(
r.metadata().asScala.toMap,
r.contentType(),
r.contentLength(),
// ETag is including quotes
r.eTag().replace("\"", "")
)
}
46 changes: 41 additions & 5 deletions zio-s3/src/test/scala/zio/s3/S3Test.scala
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package zio.s3

import software.amazon.awssdk.auth.credentials.AwsBasicCredentials

import java.net.URI
import java.util.UUID
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.model.{ ObjectCannedACL, S3Exception }
import software.amazon.awssdk.utils.{ BinaryUtils, Md5Utils }
import zio.nio.file.{ Path => ZPath }
import zio.stream.{ ZPipeline, ZStream }
import zio.test.Assertion._
import zio.test.TestAspect.sequential
import zio.test._
import zio.{ Chunk, Scope, ZLayer }

import java.net.URI
import java.util.UUID
import scala.util.Random

object S3LiveSpec extends ZIOSpecDefault {
Expand Down Expand Up @@ -211,13 +211,20 @@ object S3Suite {

},
test("put object") {
val c = Chunk.fromArray(Random.nextString(65536).getBytes())
val bytes = Random.nextString(65536).getBytes()
val c = Chunk.fromArray(bytes)
val contentLength = c.length.toLong
val data = ZStream.fromChunks(c).rechunk(5)
val tmpKey = Random.alphanumeric.take(10).mkString

for {
_ <- putObject(bucketName, tmpKey, contentLength, data)
_ <- putObject(
bucketName,
tmpKey,
contentLength,
data,
UploadOptions.default
)
objectContentLength <- getObjectMetadata(bucketName, tmpKey).map(_.contentLength) <*
deleteObject(bucketName, tmpKey)
} yield assertTrue(objectContentLength == contentLength)
Expand Down Expand Up @@ -357,6 +364,35 @@ object S3Suite {
objectMetadata <- getObjectMetadata(bucketName, tmpKey) <* deleteObject(bucketName, tmpKey)
} yield assertTrue(objectMetadata.contentType == "application/json") &&
assertTrue(objectMetadata.metadata.map { case (k, v) => k.toLowerCase -> v } == Map("key1" -> "value1"))
},
test("put object when there is a contentMD5 option, content type and metadata") {
val bytes = Random.nextString(65536).getBytes()
val _metadata = Map("key1" -> "value1")
val md5Base64 = Md5Utils.md5AsBase64(bytes)
val c = Chunk.fromArray(bytes)
val contentLength = c.length.toLong
val data = ZStream.fromChunks(c).rechunk(5)
val tmpKey = Random.alphanumeric.take(10).mkString

for {
_ <- putObject(
bucketName,
tmpKey,
contentLength,
data,
UploadOptions.from(_metadata, "application/json"),
Some(md5Base64)
)
metadata <- getObjectMetadata(bucketName, tmpKey) <*
deleteObject(bucketName, tmpKey)
actualMD5 = BinaryUtils.toBase64(BinaryUtils.fromHex(metadata.eTag))
} yield assertTrue(
metadata.contentLength == contentLength,
actualMD5 == md5Base64,
metadata.contentType == "application/json",
metadata.metadata.map { case (k, v) => k.toLowerCase -> v } == Map("key1" -> "value1")
)

}
) @@ sequential

Expand Down