From 25892624d69d9e50408a4d9bf7c14c7aa6b3ab59 Mon Sep 17 00:00:00 2001 From: Pierre Ricadat Date: Thu, 23 Jun 2022 11:14:26 +0900 Subject: [PATCH 1/4] Migrate to ZIO 2.0.0-RC6 (#311) * Migrate to ZIO 2.0.0-RC6 --- build.sbt | 6 +- docker-compose.yml | 10 +- docs/quickstart/index.md | 28 +- minio/{data/bucket-1 => export}/console.log | 0 .../{data/bucket-1 => export}/dir1/hello.txt | 0 minio/{data/bucket-1 => export}/dir1/user.csv | 0 src/main/scala/zio/s3/Live.scala | 31 ++- src/main/scala/zio/s3/S3.scala | 172 ++++++++++++ src/main/scala/zio/s3/Test.scala | 65 ++--- src/main/scala/zio/s3/errors.scala | 5 + src/main/scala/zio/s3/package.scala | 244 +++--------------- src/main/scala/zio/s3/providers.scala | 77 +++--- src/main/scala/zio/s3/s3model.scala | 2 +- src/main/scala/zio/s3/settings.scala | 2 +- src/test/scala/zio/s3/S3LayerTest.scala | 14 +- src/test/scala/zio/s3/S3ProvidersTest.scala | 84 +++--- src/test/scala/zio/s3/S3SettingsTest.scala | 18 +- src/test/scala/zio/s3/S3Test.scala | 239 +++++++++-------- 18 files changed, 488 insertions(+), 509 deletions(-) rename minio/{data/bucket-1 => export}/console.log (100%) rename minio/{data/bucket-1 => export}/dir1/hello.txt (100%) rename minio/{data/bucket-1 => export}/dir1/user.csv (100%) create mode 100644 src/main/scala/zio/s3/S3.scala diff --git a/build.sbt b/build.sbt index d1457a9f..315a79a5 100644 --- a/build.sbt +++ b/build.sbt @@ -22,7 +22,7 @@ inThisBuild( addCommandAlias("fmt", "all scalafmtSbt scalafmt test:scalafmt") addCommandAlias("check", "all scalafmtSbtCheck scalafmtCheck test:scalafmtCheck") -val zioVersion = "1.0.13" +val zioVersion = "2.0.0-RC6" val awsVersion = "2.16.61" lazy val `zio-s3` = project @@ -35,8 +35,8 @@ lazy val `zio-s3` = project libraryDependencies ++= Seq( "dev.zio" %% "zio" % zioVersion, "dev.zio" %% "zio-streams" % zioVersion, - "dev.zio" %% "zio-nio" % "1.0.0-RC12", - "dev.zio" %% "zio-interop-reactivestreams" % "1.3.12", + "dev.zio" %% "zio-nio" % "2.0.0-RC7", + "dev.zio" %% "zio-interop-reactivestreams" % "2.0.0-RC7", "software.amazon.awssdk" % "s3" % awsVersion, "software.amazon.awssdk" % "sts" % awsVersion, "dev.zio" %% "zio-test" % zioVersion % Test, diff --git a/docker-compose.yml b/docker-compose.yml index 1417812e..fedb7d7c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -20,6 +20,8 @@ services: mc: image: minio/mc + volumes: + - ./minio/export:/export depends_on: - minio environment: @@ -28,13 +30,11 @@ services: entrypoint: > /bin/sh -c " echo Waiting for minio service to start...; - apk add --no-cache curl; - while ! curl -I 'http://minio:9000/minio/health/live' 2>&1 | grep -w '200\|301'; - do - sleep 10; - done; + curl --retry 10 --retry-delay 10 -s -o /dev/null http://minio:9000/minio/health/live + echo Minio is started; /usr/bin/mc config host add my-minio http://minio:9000 $${MINIO_ACCESS_KEY} $${MINIO_SECRET_KEY}; /usr/bin/mc mb -p my-minio/bucket-1; + /usr/bin/mc mirror export/ my-minio/bucket-1; " \ No newline at end of file diff --git a/docs/quickstart/index.md b/docs/quickstart/index.md index b451a77f..2ebb2e1e 100644 --- a/docs/quickstart/index.md +++ b/docs/quickstart/index.md @@ -20,7 +20,7 @@ ZIO-S3 is a thin wrapper over the s3 async java client. 
It exposes the main oper ```scala import software.amazon.awssdk.auth.credentials.AwsBasicCredentials -import zio.{Chunk, ZManaged} +import zio.Chunk import zio.s3._ import zio.stream.{ZSink, ZStream} import software.amazon.awssdk.services.s3.model.S3Exception @@ -32,7 +32,7 @@ import software.amazon.awssdk.services.s3.model.S3Exception // list all objects of all buckets val l2: ZStream[S3, S3Exception, String] = (for { - bucket <- ZStream.fromIterableM(listBuckets) + bucket <- ZStream.fromIterableZIO(listBuckets) obj <- listAllObjects(bucket.name) } yield obj.bucketName + "/" + obj.key).provideLayer( live("us-east-1", AwsBasicCredentials.create("accessKeyId", "secretAccessKey")) @@ -51,7 +51,6 @@ If credentials cannot be found in one or multiple providers selected the operati ```scala import software.amazon.awssdk.auth.credentials.AwsBasicCredentials import zio._ -import zio.blocking._ import software.amazon.awssdk.regions.Region import software.amazon.awssdk.services.s3.model.S3Exception import zio.s3._ @@ -63,17 +62,17 @@ val s3: Layer[S3Exception, S3] = // build S3 Layer from System properties or Environment variables val s3: Layer[S3Exception, S3] = - liveM(Region.AF_SOUTH_1, system <> env) + liveZIO(Region.AF_SOUTH_1, system <> env) // build S3 Layer from Instance profile credentials val s3: Layer[S3Exception, S3] = - liveM(Region.AF_SOUTH_1, instanceProfile) + liveZIO(Region.AF_SOUTH_1, instanceProfile) // build S3 Layer from web identity token credentials with STS. awssdk sts module required to be on classpath -val s3: Layer[S3Exception, S3] = liveM(Region.AF_SOUTH_1, webIdentity) +val s3: Layer[S3Exception, S3] = liveZIO(Region.AF_SOUTH_1, webIdentity) // build S3 Layer from default available credentials providers -val s3: Layer[S3Exception, S3] = liveM(Region.AF_SOUTH_1, default) +val s3: Layer[S3Exception, S3] = liveZIO(Region.AF_SOUTH_1, default) // use custom logic to fetch aws credentials val zcredentials: ZIO[R, S3Exception, AwsCredentials] = ??? 
// specific implementation to fetch credentials @@ -90,20 +89,16 @@ a stub implementation of s3 storage is provided for testing purpose and use inte ```scala import zio.nio.core.file.{Path => ZPath} import zio.s3._ -import zio.blocking.Blocking -// required to provide a Blocking context -val stub: ZLayer[Blocking, Any, S3] = stub(ZPath("/tmp/s3-data")) - -// use a Blocking context to build s3 Layer -val stubS3: ZLayer[Any, Any, S3] = Blocking.live >>> stub(ZPath("/tmp/s3-data")) +// build s3 Layer +val stubS3: ZLayer[Any, Nothing, S3] = stub(ZPath("/tmp/s3-data")) // list all buckets available by using S3 Stub Layer // will list all directories of `/tmp/s3-data` listBuckets.provideLayer(stubS3) ``` -More informations here how to use [ZLayer https://zio.dev/docs/howto/howto_use_layers](https://zio.dev/docs/howto/howto_use_layers) +More information here on how to use [ZLayer https://zio.dev/docs/howto/howto_use_layers](https://zio.dev/docs/howto/howto_use_layers) Examples @@ -112,7 +107,6 @@ Examples ```scala import software.amazon.awssdk.services.s3.model.S3Exception import zio._ -import zio.blocking.Blocking import zio.stream.{ ZSink, ZStream } import zio.s3._ @@ -131,7 +125,7 @@ import java.io.FileInputStream import java.nio.file.Paths val is = ZStream.fromInputStream(new FileInputStream(Paths.get("/my/path/to/myfile.zip").toFile)) -val proc2: ZIO[S3 with Blocking, S3Exception, Unit] = +val proc2: ZIO[S3, S3Exception, Unit] = multipartUpload( "bucket-1", "upload/myfile.zip", @@ -143,7 +137,7 @@ val proc2: ZIO[S3 with Blocking, S3Exception, Unit] = import java.io.OutputStream val os: OutputStream = ??? -val proc3: ZIO[Blocking with S3, Exception, Long] = getObject("bucket-1", "upload/myfile.zip").run(ZSink.fromOutputStream(os)) +val proc3: ZIO[S3, Exception, Long] = getObject("bucket-1", "upload/myfile.zip").run(ZSink.fromOutputStream(os)) ``` Support any commands ? 
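The hunk ends at the "Support any commands ?" heading, which introduces the `execute` escape hatch defined in this patch (`def execute[T](f: S3AsyncClient => CompletableFuture[T])` in `package.scala` below). For context, a minimal sketch of using it to call an SDK operation that has no dedicated wrapper — the bucket, key, and ACL request here are illustrative, not part of the patch:

```scala
import software.amazon.awssdk.services.s3.model.{ ObjectCannedACL, PutObjectAclRequest, S3Exception }
import zio.ZIO
import zio.s3._

// `execute` lifts any S3AsyncClient call (a CompletableFuture) into ZIO,
// e.g. setting an object ACL, which zio-s3 does not wrap directly:
val makePublic: ZIO[S3, S3Exception, Unit] =
  execute(
    _.putObjectAcl(
      PutObjectAclRequest
        .builder()
        .bucket("bucket-1")
        .key("dir1/hello.txt")
        .acl(ObjectCannedACL.PUBLIC_READ)
        .build()
    )
  ).unit
```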
diff --git a/minio/data/bucket-1/console.log b/minio/export/console.log similarity index 100% rename from minio/data/bucket-1/console.log rename to minio/export/console.log diff --git a/minio/data/bucket-1/dir1/hello.txt b/minio/export/dir1/hello.txt similarity index 100% rename from minio/data/bucket-1/dir1/hello.txt rename to minio/export/dir1/hello.txt diff --git a/minio/data/bucket-1/dir1/user.csv b/minio/export/dir1/user.csv similarity index 100% rename from minio/data/bucket-1/dir1/user.csv rename to minio/export/dir1/user.csv diff --git a/src/main/scala/zio/s3/Live.scala b/src/main/scala/zio/s3/Live.scala index 70757e43..e8de5322 100644 --- a/src/main/scala/zio/s3/Live.scala +++ b/src/main/scala/zio/s3/Live.scala @@ -37,7 +37,7 @@ import scala.jdk.CollectionConverters._ * * @param unsafeClient: Amazon Async S3 Client */ -final class Live(unsafeClient: S3AsyncClient) extends S3.Service { +final class Live(unsafeClient: S3AsyncClient) extends S3 { override def createBucket(bucketName: String): IO[S3Exception, Unit] = execute(_.createBucket(CreateBucketRequest.builder().bucket(bucketName).build())).unit @@ -47,9 +47,9 @@ final class Live(unsafeClient: S3AsyncClient) extends S3.Service { override def isBucketExists(bucketName: String): IO[S3Exception, Boolean] = execute(_.headBucket(HeadBucketRequest.builder().bucket(bucketName).build())) - .map(_ => true) + .as(true) .catchSome { - case _: NoSuchBucketException => Task.succeed(false) + case _: NoSuchBucketException => ZIO.succeed(false) } override val listBuckets: IO[S3Exception, S3BucketListing] = @@ -58,7 +58,7 @@ final class Live(unsafeClient: S3AsyncClient) extends S3.Service { override def getObject(bucketName: String, key: String): Stream[S3Exception, Byte] = ZStream - .fromEffect( + .fromZIO( execute( _.getObject[StreamResponse]( GetObjectRequest.builder().bucket(bucketName).key(key).build(), @@ -176,9 +176,9 @@ final class Live(unsafeClient: S3AsyncClient) extends S3.Service { ).map(_.uploadId()) parts <- ZStream - .managed( + .scoped[R]( content - .chunkN(options.partSize) + .rechunk(options.partSize) .mapChunks(Chunk.single) .peel(ZSink.head[Chunk[Byte]]) ) @@ -187,7 +187,7 @@ final class Live(unsafeClient: S3AsyncClient) extends S3.Service { case (None, _) => ZStream(Chunk.empty) } .zipWithIndex - .mapMPar(parallelism) { + .mapZIOPar(parallelism) { case (chunk, partNumber) => execute( _.uploadPart( @@ -233,12 +233,12 @@ object Live { def connect[R]( region: S3Region, - provider: RManaged[R, AwsCredentialsProvider], + provider: RIO[R with Scope, AwsCredentialsProvider], uriEndpoint: Option[URI] - ): ZManaged[R, ConnectionError, S3.Service] = + ): ZIO[R with Scope, ConnectionError, S3] = for { credentials <- provider.mapError(e => ConnectionError(e.getMessage, e.getCause)) - builder <- ZManaged.succeed { + builder <- ZIO.succeed { val builder = S3AsyncClient .builder() .credentialsProvider(credentials) @@ -249,11 +249,10 @@ object Live { service <- connect(builder) } yield service - def connect[R](builder: S3AsyncClientBuilder): ZManaged[R, ConnectionError, S3.Service] = - ZManaged - .fromAutoCloseable(Task(builder.build())) - .map(new Live(_)) - .mapError(e => ConnectionError(e.getMessage, e.getCause)) + def connect[R](builder: S3AsyncClientBuilder): ZIO[R with Scope, ConnectionError, S3] = + ZIO + .fromAutoCloseable(ZIO.attempt(builder.build())) + .mapBoth(e => ConnectionError(e.getMessage, e.getCause), new Live(_)) type StreamResponse = ZStream[Any, Throwable, Chunk[Byte]] @@ -264,7 +263,7 @@ object Live { override def 
onResponse(response: GetObjectResponse): Unit = () override def onStream(publisher: SdkPublisher[ByteBuffer]): Unit = { - cf.complete(publisher.toStream().map(Chunk.fromByteBuffer)) + cf.complete(publisher.toZIOStream().map(Chunk.fromByteBuffer)) () } diff --git a/src/main/scala/zio/s3/S3.scala b/src/main/scala/zio/s3/S3.scala new file mode 100644 index 00000000..ce4f3945 --- /dev/null +++ b/src/main/scala/zio/s3/S3.scala @@ -0,0 +1,172 @@ +package zio.s3 + +import software.amazon.awssdk.services.s3.S3AsyncClient +import software.amazon.awssdk.services.s3.model.S3Exception +import zio.{ IO, ZIO } +import zio.s3.S3Bucket.S3BucketListing +import zio.stream.{ Stream, ZPipeline, ZStream } + +import java.nio.charset.CharacterCodingException +import java.util.concurrent.CompletableFuture + +/** + * The `S3` module provides access to a s3 amazon storage. + * All operations are async since we are relying on the amazon async client + */ +trait S3 { self => + + /** + * Create a bucket + * + * @param bucketName name of the bucket + */ + def createBucket(bucketName: String): IO[S3Exception, Unit] + + /** + * Delete bucket, the operation fail if bucket is not present + * + * @param bucketName name of the bucket + */ + def deleteBucket(bucketName: String): IO[S3Exception, Unit] + + /** + * Check if bucket exists + * + * @param bucketName name of the bucket + */ + def isBucketExists(bucketName: String): IO[S3Exception, Boolean] + + /** + * List all available buckets + */ + val listBuckets: IO[S3Exception, S3BucketListing] + + /** + * delete an object from a bucket, if not present it will succeed + * + * @param bucketName name of the bucket + * @param key object identifier to remove + */ + def deleteObject(bucketName: String, key: String): IO[S3Exception, Unit] + + /** + * Read an object from a bucket, the operation fail if object is not present + * + * @param bucketName name of the bucket + * @param key object identifier to read + * @return + */ + def getObject(bucketName: String, key: String): Stream[S3Exception, Byte] + + /** + * Retrieves metadata from an object without returning the object itself. + * This operation is useful if you're only interested in an object's metadata. + * @param bucketName name of the bucket + * @param key object identifier to read + * @return the [[ObjectMetadata]] + */ + def getObjectMetadata(bucketName: String, key: String): IO[S3Exception, ObjectMetadata] + + /** + * list all object for a specific bucket + * + * @param bucketName name of the bucket + */ + def listObjects(bucketName: String): IO[S3Exception, S3ObjectListing] = + listObjects(bucketName, ListObjectOptions.default) + + def listObjects(bucketName: String, options: ListObjectOptions): IO[S3Exception, S3ObjectListing] + + /** + * Fetch the next object listing from a specific object listing. 
+ * + * @param listing listing to use as a start + */ + def getNextObjects(listing: S3ObjectListing): IO[S3Exception, S3ObjectListing] + + /** + * Store data object into a specific bucket + * + * @param bucketName name of the bucket + * @param key unique object identifier + * @param contentLength length of the data in bytes + * @param content object data + * @return + */ + def putObject[R]( + bucketName: String, + key: String, + contentLength: Long, + content: ZStream[R, Throwable, Byte], + options: UploadOptions = UploadOptions.default + ): ZIO[R, S3Exception, Unit] + + /** + * * + * + * Store data object into a specific bucket, minimum size of the data is 5 Mb to use multipart upload (restriction from amazon API) + * + * @param bucketName name of the bucket + * @param key unique object identifier + * @param content object data + * @param options the optional configurations of the multipart upload + * @param parallelism the number of parallel requests to upload chunks + */ + def multipartUpload[R]( + bucketName: String, + key: String, + content: ZStream[R, Throwable, Byte], + options: MultipartUploadOptions = MultipartUploadOptions.default + )(parallelism: Int): ZIO[R, S3Exception, Unit] + + /** + * Read an object by lines + * + * @param bucketName name of the bucket + * @param key: unique key of the object + */ + def streamLines(bucketName: String, key: String): Stream[S3Exception, String] = + (self.getObject(bucketName, key) >>> ZPipeline.utf8Decode >>> ZPipeline.splitLines).refineOrDie { + case ex: S3Exception => ex + case ex: CharacterCodingException => DecodingException(ex) + } + + /** + * List all descendant objects of a bucket + * Fetch all objects recursively of all nested directory by traversing all of them + * + * @param bucketName name of the bucket + * + * MaxKeys have a default value to 1000 elements + */ + def listAllObjects(bucketName: String): Stream[S3Exception, S3ObjectSummary] = + listAllObjects(bucketName, ListObjectOptions.default) + + def listAllObjects(bucketName: String, options: ListObjectOptions): Stream[S3Exception, S3ObjectSummary] = + ZStream + .fromZIO(self.listObjects(bucketName, options)) + .flatMap( + paginate(_).mapConcat(_.objectSummaries) + ) + + /** + * List all objects by traversing all nested directories + * + * @param initialListing object listing to start with + * @return + */ + def paginate(initialListing: S3ObjectListing): Stream[S3Exception, S3ObjectListing] = + ZStream.paginateZIO(initialListing) { + case current @ S3ObjectListing(_, _, _, _, None, _) => ZIO.succeed(current -> None) + case current => self.getNextObjects(current).map(next => current -> Some(next)) + } + + /** + * * + * expose safely amazon s3 async client + * + * @param f call any operations on s3 async client + * @tparam T value type to return + */ + def execute[T](f: S3AsyncClient => CompletableFuture[T]): IO[S3Exception, T] +} diff --git a/src/main/scala/zio/s3/Test.scala b/src/main/scala/zio/s3/Test.scala index 02d98082..cf13ee28 100644 --- a/src/main/scala/zio/s3/Test.scala +++ b/src/main/scala/zio/s3/Test.scala @@ -24,8 +24,7 @@ import java.util.concurrent.CompletableFuture import software.amazon.awssdk.services.s3.S3AsyncClient import software.amazon.awssdk.services.s3.model.S3Exception import zio._ -import zio.blocking.Blocking -import zio.nio.channels.{ AsynchronousFileChannel } +import zio.nio.channels.AsynchronousFileChannel import zio.nio.file.{ Path => ZPath } import zio.nio.file.Files import zio.s3.S3Bucket._ @@ -47,56 +46,53 @@ object Test { .build() 
.asInstanceOf[S3Exception] - def connect(path: ZPath): Blocking => S3.Service = { blocking => + def connect(path: ZPath): S3 = { type ContentType = String type Metadata = Map[String, String] - new S3.Service { + new S3 { private val refDb: Ref[Map[String, (ContentType, Metadata)]] = Ref.unsafeMake(Map.empty[String, (ContentType, Metadata)]) override def createBucket(bucketName: String): IO[S3Exception, Unit] = - Files.createDirectory(path / bucketName).orDie.provide(blocking) + Files.createDirectory(path / bucketName).orDie override def deleteBucket(bucketName: String): IO[S3Exception, Unit] = - Files.delete(path / bucketName).orDie.provide(blocking) + Files.delete(path / bucketName).orDie override def isBucketExists(bucketName: String): IO[S3Exception, Boolean] = - Files.exists(path / bucketName).provide(blocking) + Files.exists(path / bucketName) override val listBuckets: IO[S3Exception, S3BucketListing] = Files .list(path) - .filterM(p => Files.readAttributes[BasicFileAttributes](p).map(_.isDirectory)) - .mapM { p => + .filterZIO(p => Files.readAttributes[BasicFileAttributes](p).map(_.isDirectory)) + .mapZIO { p => Files .readAttributes[BasicFileAttributes](p) .map(attr => S3Bucket(p.filename.toString, attr.creationTime().toInstant)) } .runCollect .orDie - .provide(blocking) override def deleteObject(bucketName: String, key: String): IO[S3Exception, Unit] = - Files.deleteIfExists(path / bucketName / key).orDie.provide(blocking).unit + Files.deleteIfExists(path / bucketName / key).orDie.unit override def getObject(bucketName: String, key: String): Stream[S3Exception, Byte] = ZStream - .managed(ZManaged.fromAutoCloseable(Task(new FileInputStream((path / bucketName / key).toFile)))) + .scoped(ZIO.fromAutoCloseable(ZIO.attempt(new FileInputStream((path / bucketName / key).toFile)))) .flatMap(ZStream.fromInputStream(_, 2048)) .refineOrDie { case e: FileNotFoundException => fileNotFound(e) } - .provide(blocking) override def getObjectMetadata(bucketName: String, key: String): IO[S3Exception, ObjectMetadata] = (for { - (contentType, metadata) <- refDb.get.map(_.getOrElse(bucketName + key, "" -> Map.empty[String, String])) - - file <- Files - .readAttributes[BasicFileAttributes](path / bucketName / key) - .map(p => ObjectMetadata(metadata, contentType, p.size())) - .provide(blocking) + res <- refDb.get.map(_.getOrElse(bucketName + key, "" -> Map.empty[String, String])) + (contentType, metadata) = res + file <- Files + .readAttributes[BasicFileAttributes](path / bucketName / key) + .map(p => ObjectMetadata(metadata, contentType, p.size())) } yield file).orDie override def listObjects( @@ -110,7 +106,7 @@ object Test { case (p, _) => options.prefix.fold(true)(p.filename.toString().startsWith) } - .mapM(p => Files.readAttributes[BasicFileAttributes](p).map(a => a -> p)) + .mapZIO(p => Files.readAttributes[BasicFileAttributes](p).map(a => a -> p)) .filter { case (attr, _) => attr.isRegularFile } .map { case (attr, f) => @@ -150,7 +146,6 @@ object Test { S3ObjectListing(bucketName, options.delimiter, options.starAfter, list, None, None) } .orDie - .provide(blocking) override def getNextObjects(listing: S3ObjectListing): IO[S3Exception, S3ObjectListing] = listing.nextContinuationToken match { @@ -172,21 +167,27 @@ object Test { ) filePath = path / bucketName / key _ <- filePath.parent - .map(parentPath => Files.createDirectories(parentPath).provide(blocking)) + .map(parentPath => Files.createDirectories(parentPath)) .getOrElse(ZIO.unit) - _ <- - AsynchronousFileChannel - .open(filePath, 
StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE) - .use(channel => - content - .mapChunks(Chunk.succeed) - .foldM(0L) { case (pos, c) => channel.writeChunk(c, pos).map(_ => pos + c.length) } - ) + _ <- ZIO.scoped[R]( + AsynchronousFileChannel + .open( + filePath, + StandardOpenOption.WRITE, + StandardOpenOption.TRUNCATE_EXISTING, + StandardOpenOption.CREATE + ) + .flatMap(channel => + content + .mapChunks(Chunk.succeed) + .runFoldZIO(0L) { case (pos, c) => channel.writeChunk(c, pos).as(pos + c.length) } + ) + ) } yield ()).orDie override def execute[T](f: S3AsyncClient => CompletableFuture[T]): IO[S3Exception, T] = - IO.dieMessage("Not implemented error - please don't call execute() S3 Test mode") + ZIO.dieMessage("Not implemented error - please don't call execute() S3 Test mode") override def multipartUpload[R]( bucketName: String, @@ -208,7 +209,7 @@ object Test { bucketName, key, 0, - content.chunkN(options.partSize), + content.rechunk(options.partSize), options.uploadOptions.copy(contentType = _contentType) ) } yield () diff --git a/src/main/scala/zio/s3/errors.scala b/src/main/scala/zio/s3/errors.scala index d1ea8e4f..e183af47 100644 --- a/src/main/scala/zio/s3/errors.scala +++ b/src/main/scala/zio/s3/errors.scala @@ -19,6 +19,8 @@ package zio.s3 import software.amazon.awssdk.core.exception.SdkException import software.amazon.awssdk.services.s3.model.S3Exception +import java.nio.charset.CharacterCodingException + final case class SdkError(error: SdkException) extends S3Exception(S3Exception.builder().message(error.getMessage).cause(error)) @@ -30,3 +32,6 @@ final case class ConnectionError(message: String, cause: Throwable) extends S3Exception(S3Exception.builder().message(message)) final case class InvalidPartSize(message: String, size: Int) extends S3Exception(S3Exception.builder().message(message)) + +final case class DecodingException(cause: CharacterCodingException) + extends S3Exception(S3Exception.builder().cause(cause)) diff --git a/src/main/scala/zio/s3/package.scala b/src/main/scala/zio/s3/package.scala index d397b46b..4606f928 100644 --- a/src/main/scala/zio/s3/package.scala +++ b/src/main/scala/zio/s3/package.scala @@ -17,250 +17,78 @@ package zio import software.amazon.awssdk.auth.credentials.{ AwsCredentials, AwsCredentialsProvider } - -import java.net.URI -import java.util.concurrent.CompletableFuture import software.amazon.awssdk.regions.Region import software.amazon.awssdk.services.s3.S3AsyncClient import software.amazon.awssdk.services.s3.model.S3Exception -import zio.blocking.Blocking import zio.nio.file.{ Path => ZPath } import zio.s3.S3Bucket.S3BucketListing import zio.s3.providers.const -import zio.stream.{ Stream, ZStream, ZTransducer } +import zio.stream.ZStream + +import java.net.URI +import java.util.concurrent.CompletableFuture package object s3 { - type S3 = Has[S3.Service] - type Settings = Has[S3Settings] type S3Stream[A] = ZStream[S3, S3Exception, A] - /** - * The `S3` module provides access to a s3 amazon storage. 
- * All operations are async since we are relying on the amazon async client - */ - object S3 { - - trait Service { self => - - /** - * Create a bucket - * - * @param bucketName name of the bucket - */ - def createBucket(bucketName: String): IO[S3Exception, Unit] - - /** - * Delete bucket, the operation fail if bucket is not present - * - * @param bucketName name of the bucket - */ - def deleteBucket(bucketName: String): IO[S3Exception, Unit] - - /** - * Check if bucket exists - * - * @param bucketName name of the bucket - */ - def isBucketExists(bucketName: String): IO[S3Exception, Boolean] - - /** - * List all available buckets - */ - val listBuckets: IO[S3Exception, S3BucketListing] - - /** - * delete an object from a bucket, if not present it will succeed - * - * @param bucketName name of the bucket - * @param key object identifier to remove - */ - def deleteObject(bucketName: String, key: String): IO[S3Exception, Unit] - - /** - * Read an object from a bucket, the operation fail if object is not present - * - * @param bucketName name of the bucket - * @param key object identifier to read - * @return - */ - def getObject(bucketName: String, key: String): Stream[S3Exception, Byte] - - /** - * Retrieves metadata from an object without returning the object itself. - * This operation is useful if you're only interested in an object's metadata. - * @param bucketName name of the bucket - * @param key object identifier to read - * @return the [[ObjectMetadata]] - */ - def getObjectMetadata(bucketName: String, key: String): IO[S3Exception, ObjectMetadata] - - /** - * list all object for a specific bucket - * - * @param bucketName name of the bucket - * @param prefix filter all object key by the prefix, default value is an empty string - * @param maxKeys max total number of objects, default value is 1000 elements - */ - def listObjects(bucketName: String): IO[S3Exception, S3ObjectListing] = - listObjects(bucketName, ListObjectOptions.default) - - def listObjects(bucketName: String, options: ListObjectOptions): IO[S3Exception, S3ObjectListing] - - /** - * Fetch the next object listing from a specific object listing. 
- * - * @param listing listing to use as a start - */ - def getNextObjects(listing: S3ObjectListing): IO[S3Exception, S3ObjectListing] - - /** - * Store data object into a specific bucket - * - * @param bucketName name of the bucket - * @param key unique object identifier - * @param contentLength length of the data in bytes - * @param content object data - * @return - */ - def putObject[R]( - bucketName: String, - key: String, - contentLength: Long, - content: ZStream[R, Throwable, Byte], - options: UploadOptions = UploadOptions.default - ): ZIO[R, S3Exception, Unit] - - /** - * * - * - * Store data object into a specific bucket, minimun size of the data is 5 Mb to use multipartt upload (restriction from amazon API) - * - * @param bucketName name of the bucket - * @param key unique object identifier - * @param content object data - * @param options the optional configurations of the multipart upload - * @param parallelism the number of parallel requests to upload chunks - */ - def multipartUpload[R]( - bucketName: String, - key: String, - content: ZStream[R, Throwable, Byte], - options: MultipartUploadOptions = MultipartUploadOptions.default - )(parallelism: Int): ZIO[R, S3Exception, Unit] - - /** - * Read an object by lines - * - * @param bucketName name of the bucket - * @param key: unique key of the object - */ - def streamLines(bucketName: String, key: String): Stream[S3Exception, String] = - self - .getObject(bucketName, key) - .transduce(ZTransducer.utf8Decode >>> ZTransducer.splitLines) - - /** - * List all descendant objects of a bucket - * Fetch all objects recursively of all nested directory by traversing all of them - * - * @param bucketName name of the bucket - * @param prefix filter all object identifier which start with this `prefix` - * - * MaxKeys have a default value to 1000 elements - */ - def listAllObjects(bucketName: String): Stream[S3Exception, S3ObjectSummary] = - listAllObjects(bucketName, ListObjectOptions.default) - - def listAllObjects(bucketName: String, options: ListObjectOptions): Stream[S3Exception, S3ObjectSummary] = - ZStream - .fromEffect(self.listObjects(bucketName, options)) - .flatMap( - paginate(_).mapConcat(_.objectSummaries) - ) - - /** - * List all objects by traversing all nested directories - * - * @param initialListing object listing to start with - * @return - */ - def paginate(initialListing: S3ObjectListing): Stream[S3Exception, S3ObjectListing] = - ZStream.paginateM(initialListing) { - case current @ S3ObjectListing(_, _, _, _, None, _) => ZIO.succeed(current -> None) - case current => self.getNextObjects(current).map(next => current -> Some(next)) - } - - /** - * * - * expose safely amazon s3 async client - * - * @param f call any operations on s3 async client - * @tparam T value type to return - */ - def execute[T](f: S3AsyncClient => CompletableFuture[T]): IO[S3Exception, T] - } - } - def live(region: Region, credentials: AwsCredentials, uriEndpoint: Option[URI] = None): Layer[S3Exception, S3] = - liveM(region, const(credentials.accessKeyId, credentials.secretAccessKey), uriEndpoint) + liveZIO(region, const(credentials.accessKeyId, credentials.secretAccessKey), uriEndpoint) - def liveM[R]( + def liveZIO[R]( region: Region, - provider: RManaged[R, AwsCredentialsProvider], + provider: RIO[R with Scope, AwsCredentialsProvider], uriEndpoint: Option[URI] = None ): ZLayer[R, S3Exception, S3] = - ZLayer.fromManaged( - ZManaged - .fromEffect(ZIO.fromEither(S3Region.from(region))) - .flatMap(Live.connect(_, provider, uriEndpoint)) + 
ZLayer.scoped[R]( + ZIO + .fromEither(S3Region.from(region)) + .flatMap(Live.connect[R](_, provider, uriEndpoint)) ) - def settings[R](region: Region, cred: ZIO[R, S3Exception, AwsCredentials]): ZLayer[R, S3Exception, Settings] = - ZLayer.fromEffect(cred.flatMap(S3Settings.from(region, _))) + def settings[R](region: Region, cred: ZIO[R, S3Exception, AwsCredentials]): ZLayer[R, S3Exception, S3Settings] = + ZLayer(cred.flatMap(S3Settings.from(region, _))) - val live: ZLayer[Settings, ConnectionError, S3] = ZLayer.fromFunctionManaged(s => - Live.connect( - s.get.s3Region, - const(s.get.credentials.accessKeyId, s.get.credentials.secretAccessKey), - None + val live: ZLayer[S3Settings, ConnectionError, S3] = ZLayer.scoped( + ZIO.serviceWithZIO[S3Settings](s => + Live.connect(s.s3Region, const(s.credentials.accessKeyId, s.credentials.secretAccessKey), None) ) ) - def stub(path: ZPath): ZLayer[Blocking, Nothing, S3] = - ZLayer.fromFunction(Test.connect(path)) + def stub(path: ZPath): ZLayer[Any, Nothing, S3] = + ZLayer.succeed(Test.connect(path)) def listAllObjects(bucketName: String): S3Stream[S3ObjectSummary] = - ZStream.accessStream[S3](_.get.listAllObjects(bucketName)) + ZStream.serviceWithStream[S3](_.listAllObjects(bucketName)) def listAllObjects(bucketName: String, options: ListObjectOptions): S3Stream[S3ObjectSummary] = - ZStream.accessStream[S3](_.get.listAllObjects(bucketName, options)) + ZStream.serviceWithStream[S3](_.listAllObjects(bucketName, options)) def paginate(initialListing: S3ObjectListing): S3Stream[S3ObjectListing] = - ZStream.accessStream[S3](_.get.paginate(initialListing)) + ZStream.serviceWithStream[S3](_.paginate(initialListing)) def streamLines(bucketName: String, key: String): S3Stream[String] = - ZStream.accessStream[S3](_.get.streamLines(bucketName, key)) + ZStream.serviceWithStream[S3](_.streamLines(bucketName, key)) def createBucket(bucketName: String): ZIO[S3, S3Exception, Unit] = - ZIO.accessM(_.get.createBucket(bucketName)) + ZIO.serviceWithZIO(_.createBucket(bucketName)) def deleteBucket(bucketName: String): ZIO[S3, S3Exception, Unit] = - ZIO.accessM(_.get.deleteBucket(bucketName)) + ZIO.serviceWithZIO(_.deleteBucket(bucketName)) def isBucketExists(bucketName: String): ZIO[S3, S3Exception, Boolean] = - ZIO.accessM(_.get.isBucketExists(bucketName)) + ZIO.serviceWithZIO(_.isBucketExists(bucketName)) val listBuckets: ZIO[S3, S3Exception, S3BucketListing] = - ZIO.accessM(_.get.listBuckets) + ZIO.serviceWithZIO(_.listBuckets) def deleteObject(bucketName: String, key: String): ZIO[S3, S3Exception, Unit] = - ZIO.accessM(_.get.deleteObject(bucketName, key)) + ZIO.serviceWithZIO(_.deleteObject(bucketName, key)) def getObject(bucketName: String, key: String): ZStream[S3, S3Exception, Byte] = - ZStream.accessStream(_.get.getObject(bucketName, key)) + ZStream.serviceWithStream(_.getObject(bucketName, key)) def getObjectMetadata(bucketName: String, key: String): ZIO[S3, S3Exception, ObjectMetadata] = - ZIO.accessM(_.get.getObjectMetadata(bucketName, key)) + ZIO.serviceWithZIO(_.getObjectMetadata(bucketName, key)) /** * Same as listObjects with default values for an empty prefix and sets the maximum number of object max to `1000` @@ -268,13 +96,13 @@ package object s3 { * @param bucketName name of the bucket */ def listObjects(bucketName: String): ZIO[S3, S3Exception, S3ObjectListing] = - ZIO.accessM(_.get.listObjects(bucketName)) + ZIO.serviceWithZIO(_.listObjects(bucketName)) def listObjects(bucketName: String, options: ListObjectOptions): ZIO[S3, S3Exception, 
S3ObjectListing] = - ZIO.accessM(_.get.listObjects(bucketName, options)) + ZIO.serviceWithZIO(_.listObjects(bucketName, options)) def getNextObjects(listing: S3ObjectListing): ZIO[S3, S3Exception, S3ObjectListing] = - ZIO.accessM(_.get.getNextObjects(listing)) + ZIO.serviceWithZIO(_.getNextObjects(listing)) def putObject[R]( bucketName: String, @@ -283,7 +111,7 @@ package object s3 { content: ZStream[R, Throwable, Byte], options: UploadOptions = UploadOptions.default ): ZIO[S3 with R, S3Exception, Unit] = - ZIO.accessM[S3 with R](_.get.putObject(bucketName, key, contentLength, content, options)) + ZIO.serviceWithZIO[S3](_.putObject(bucketName, key, contentLength, content, options)) /** * Same as multipartUpload with default parallelism = 1 @@ -299,10 +127,8 @@ package object s3 { content: ZStream[R, Throwable, Byte], options: MultipartUploadOptions = MultipartUploadOptions.default )(parallelism: Int): ZIO[S3 with R, S3Exception, Unit] = - ZIO.accessM[S3 with R]( - _.get.multipartUpload(bucketName, key, content, options)(parallelism) - ) + ZIO.serviceWithZIO[S3](_.multipartUpload(bucketName, key, content, options)(parallelism)) def execute[T](f: S3AsyncClient => CompletableFuture[T]): ZIO[S3, S3Exception, T] = - ZIO.accessM(_.get.execute(f)) + ZIO.serviceWithZIO(_.execute(f)) } diff --git a/src/main/scala/zio/s3/providers.scala b/src/main/scala/zio/s3/providers.scala index ad1fffe9..0cad41be 100644 --- a/src/main/scala/zio/s3/providers.scala +++ b/src/main/scala/zio/s3/providers.scala @@ -1,94 +1,81 @@ package zio.s3 -import software.amazon.awssdk.auth.credentials.{ - AwsBasicCredentials, - AwsCredentials, - AwsCredentialsProvider, - ContainerCredentialsProvider, - DefaultCredentialsProvider, - EnvironmentVariableCredentialsProvider, - InstanceProfileCredentialsProvider, - ProfileCredentialsProvider, - SystemPropertyCredentialsProvider, - WebIdentityTokenFileCredentialsProvider -} -import zio.blocking.{ Blocking, effectBlocking } -import zio.{ IO, Managed, UManaged, ZIO, ZManaged } +import software.amazon.awssdk.auth.credentials._ +import zio.{ IO, Scope, UIO, ZIO } object providers { - def const(accessKeyId: String, secretAccessKey: String): UManaged[AwsCredentialsProvider] = - ZManaged.succeedNow[AwsCredentialsProvider](new AwsCredentialsProvider { - override def resolveCredentials(): AwsCredentials = AwsBasicCredentials.create(accessKeyId, secretAccessKey) - }) + def const(accessKeyId: String, secretAccessKey: String): UIO[AwsCredentialsProvider] = + ZIO.succeedNow[AwsCredentialsProvider](() => AwsBasicCredentials.create(accessKeyId, secretAccessKey)) - val system: Managed[InvalidCredentials, SystemPropertyCredentialsProvider] = - ZManaged + val system: IO[InvalidCredentials, SystemPropertyCredentialsProvider] = + ZIO .succeed(SystemPropertyCredentialsProvider.create()) - .tapM(c => ZIO(c.resolveCredentials())) + .tap(c => ZIO.attemptBlocking(c.resolveCredentials())) .mapError(err => InvalidCredentials(err.getMessage)) - val env: Managed[InvalidCredentials, EnvironmentVariableCredentialsProvider] = - ZManaged + val env: IO[InvalidCredentials, EnvironmentVariableCredentialsProvider] = + ZIO .succeed(EnvironmentVariableCredentialsProvider.create()) - .tapM(c => + .tap(c => ZIO - .effect(c.resolveCredentials()) + .attemptBlocking(c.resolveCredentials()) .mapError(err => InvalidCredentials(err.getMessage)) ) - val profile: ZManaged[Blocking, InvalidCredentials, ProfileCredentialsProvider] = + val profile: ZIO[Scope, InvalidCredentials, ProfileCredentialsProvider] = profile(None) - def 
profile(name: Option[String]): ZManaged[Blocking, InvalidCredentials, ProfileCredentialsProvider] = - ZManaged - .fromAutoCloseable(IO.succeed(ProfileCredentialsProvider.create(name.orNull))) - .tapM(c => - effectBlocking(c.resolveCredentials()) + def profile(name: Option[String]): ZIO[Scope, InvalidCredentials, ProfileCredentialsProvider] = + ZIO + .fromAutoCloseable(ZIO.succeed(ProfileCredentialsProvider.create(name.orNull))) + .tap(c => + ZIO + .attemptBlocking(c.resolveCredentials()) .mapError(err => InvalidCredentials(err.getMessage)) ) - val container: ZManaged[Blocking, InvalidCredentials, ContainerCredentialsProvider] = - ZManaged + val container: ZIO[Scope, InvalidCredentials, ContainerCredentialsProvider] = + ZIO .fromAutoCloseable( - IO.succeed( + ZIO.succeed( ContainerCredentialsProvider .builder() .build() ) ) - .tapM(c => effectBlocking(c.resolveCredentials())) + .tap(c => ZIO.attemptBlocking(c.resolveCredentials())) .mapError(err => InvalidCredentials(err.getMessage)) - val instanceProfile: ZManaged[Blocking, InvalidCredentials, InstanceProfileCredentialsProvider] = - ZManaged + val instanceProfile: ZIO[Scope, InvalidCredentials, InstanceProfileCredentialsProvider] = + ZIO .fromAutoCloseable( - IO.succeed( + ZIO.succeed( InstanceProfileCredentialsProvider .create() ) ) - .tapM(c => effectBlocking(c.resolveCredentials())) + .tap(c => ZIO.attemptBlocking(c.resolveCredentials())) .mapError(err => InvalidCredentials(err.getMessage)) /** * Use of this layer requires the awssdk sts module to be on the classpath, * by default zio-s3 required this library */ - val webIdentity: ZManaged[Blocking, InvalidCredentials, WebIdentityTokenFileCredentialsProvider] = - ZManaged + val webIdentity: ZIO[Scope, InvalidCredentials, WebIdentityTokenFileCredentialsProvider] = + ZIO .succeed( WebIdentityTokenFileCredentialsProvider .create() ) - .tapM(c => effectBlocking(c.resolveCredentials())) + .tap(c => ZIO.attemptBlocking(c.resolveCredentials())) .mapError(err => InvalidCredentials(err.getMessage)) /** * Use default chaining strategy to fetch credentials */ - val default: ZManaged[Blocking, InvalidCredentials, AwsCredentialsProvider] = - ZManaged.fromAutoCloseable( - IO.succeed(DefaultCredentialsProvider.create()) + val default: ZIO[Scope, InvalidCredentials, AwsCredentialsProvider] = + ZIO.fromAutoCloseable( + ZIO.succeed(DefaultCredentialsProvider.create()) ) } diff --git a/src/main/scala/zio/s3/s3model.scala b/src/main/scala/zio/s3/s3model.scala index 22a89634..643a79f2 100644 --- a/src/main/scala/zio/s3/s3model.scala +++ b/src/main/scala/zio/s3/s3model.scala @@ -46,7 +46,7 @@ final case class S3ObjectListing( object S3ObjectListing { - def from(bucketName: String, nextContinuationToken: Option[String]) = + def from(bucketName: String, nextContinuationToken: Option[String]): S3ObjectListing = S3ObjectListing(bucketName, None, None, Chunk.empty, nextContinuationToken, None) def fromResponse(r: ListObjectsV2Response): S3ObjectListing = diff --git a/src/main/scala/zio/s3/settings.scala b/src/main/scala/zio/s3/settings.scala index 93b9d7be..027f41c5 100644 --- a/src/main/scala/zio/s3/settings.scala +++ b/src/main/scala/zio/s3/settings.scala @@ -33,7 +33,7 @@ object S3Region { self => /** * Only use for supporting other region for different s3 compatible storage provider such as OVH * Your S3 region might be invalid and will result into runtime error. 
- * @param s unsafe region + * @param r unsafe region */ def unsafeFromString(r: String): S3Region = new S3Region(Region.of(r)) {} diff --git a/src/test/scala/zio/s3/S3LayerTest.scala b/src/test/scala/zio/s3/S3LayerTest.scala index 11e43e69..f33adaf4 100644 --- a/src/test/scala/zio/s3/S3LayerTest.scala +++ b/src/test/scala/zio/s3/S3LayerTest.scala @@ -5,16 +5,14 @@ import software.amazon.awssdk.regions.Region import zio.test.Assertion._ import zio.test._ -object S3LayerSpec extends DefaultRunnableSpec { +object S3LayerTest extends ZIOSpecDefault { - override def spec = + override def spec: Spec[Any, Nothing] = suite("S3LayerSpec")( - testM("using ZManaged[R, E, A] in liveM compiles") { - assertM( - typeCheck( - """liveM(Region.CA_CENTRAL_1, providers.default, Some(URI.create("http://localhost:9000")))""" - ) - )(isRight) + test("using ZIO[R with Scope, E, A] in liveZIO compiles") { + typeCheck( + """liveZIO(Region.CA_CENTRAL_1, providers.default, Some(URI.create("http://localhost:9000")))""" + ).map(assert(_)(isRight)) } ) } diff --git a/src/test/scala/zio/s3/S3ProvidersTest.scala b/src/test/scala/zio/s3/S3ProvidersTest.scala index c1e685ba..9109cf4f 100644 --- a/src/test/scala/zio/s3/S3ProvidersTest.scala +++ b/src/test/scala/zio/s3/S3ProvidersTest.scala @@ -6,12 +6,12 @@ import zio.s3.providers._ import zio.test.Assertion._ import zio.test.TestAspect._ import zio.test._ -import zio.{ UIO, ZIO } +import zio.{ Scope, UIO, ZIO } -object S3ProvidersTest extends DefaultRunnableSpec { +object S3ProvidersTest extends ZIOSpecDefault { def setProps(props: (String, String)*): UIO[Unit] = - UIO { + ZIO.succeed { props.foreach { case (k, v) => System.setProperty(k, v) @@ -19,79 +19,83 @@ object S3ProvidersTest extends DefaultRunnableSpec { } def unsetProps(keys: String*): UIO[Unit] = - UIO { + ZIO.succeed { keys.foreach(System.clearProperty) } - def spec = + def spec: Spec[TestEnvironment with Scope, Any] = suite("Providers")( - testM("cred with const") { - assertM(const("k", "v").useNow.map(_.resolveCredentials()))( - equalTo(AwsBasicCredentials.create("k", "v")) - ) + test("cred with const") { + ZIO + .scoped(const("k", "v").map(_.resolveCredentials())) + .map(res => assertTrue(res == AwsBasicCredentials.create("k", "v"))) }, - testM("cred with default fallback const") { - assertM( - (env <> const("k", "v")).useNow.map(_.resolveCredentials()) - )(equalTo(AwsBasicCredentials.create("k", "v"))) + test("cred with default fallback const") { + ZIO + .scoped((env <> const("k", "v")).map(_.resolveCredentials())) + .map(res => assertTrue(res == AwsBasicCredentials.create("k", "v"))) }, - testM("cred in system properties") { + test("cred in system properties") { for { - cred <- system.use(p => ZIO(p.resolveCredentials())) - } yield assert(cred)(equalTo(AwsBasicCredentials.create("k1", "s1"))) - } @@ flaky @@ around_( + cred <- ZIO.scoped(system.flatMap(p => ZIO.attempt(p.resolveCredentials()))) + } yield assertTrue(cred == AwsBasicCredentials.create("k1", "s1")) + } @@ flaky @@ around( setProps(("aws.accessKeyId", "k1"), ("aws.secretAccessKey", "s1")), unsetProps("aws.accessKeyId", "aws.secretAccessKey") ), - testM("no cred in system properties") { + test("no cred in system properties") { for { - failure <- system.useNow.flip.map(_.getMessage) + failure <- ZIO.scoped(system).flip.map(_.getMessage) } yield assert(failure)(isNonEmptyString) - } @@ around_( + } @@ around( unsetProps("aws.accessKeyId", "aws.secretAccessKey"), - UIO.unit + ZIO.unit ), - testM("no cred in environment properties") { + test("no 
cred in environment properties") { for { - failure <- env.useNow.flip.map(_.getMessage) + failure <- ZIO.scoped(env).flip.map(_.getMessage) } yield assert(failure)(isNonEmptyString) }, - testM("no cred in profile") { + test("no cred in profile") { for { - failure <- profile.useNow.flip.map(_.getMessage) + failure <- ZIO.scoped(profile).flip.map(_.getMessage) } yield assert(failure)(isNonEmptyString) }, - testM("no cred in named profile") { + test("no cred in named profile") { for { - failure <- profile(Some("name")).useNow.flip.map(_.getMessage) + failure <- ZIO.scoped(profile(Some("name"))).flip.map(_.getMessage) } yield assert(failure)(isNonEmptyString) }, - testM("no cred in container") { + test("no cred in container") { for { - failure <- container.useNow.flip.map(_.getMessage) + failure <- ZIO.scoped(container).flip.map(_.getMessage) } yield assert(failure)(isNonEmptyString) }, - testM("no cred in instance profile credentials") { + test("no cred in instance profile credentials") { for { - failure <- instanceProfile.useNow.flip.map(_.getMessage) + failure <- ZIO.scoped(instanceProfile).flip.map(_.getMessage) } yield assert(failure)(isNonEmptyString) }, - testM("no cred in webidentity credentials") { + test("no cred in webidentity credentials") { for { - failure <- webIdentity.useNow.flip.map(_.getMessage) + failure <- ZIO.scoped(webIdentity).flip.map(_.getMessage) } yield assert(failure)(isNonEmptyString) }, - testM("settings from invalid creds") { + test("settings from invalid creds") { for { - failure <- settings( - Region.AF_SOUTH_1, - system.useNow.map(_.resolveCredentials()) - ).build.useNow.flip + failure <- ZIO + .scoped( + settings( + Region.AF_SOUTH_1, + ZIO.scoped(system).map(_.resolveCredentials()) + ).build + ) + .flip } yield assert(failure.getMessage)(isNonEmptyString) }, - testM("no cred when chain all providers") { + test("no cred when chain all providers") { for { - failure <- default.use(c => ZIO.effect(c.resolveCredentials())).flip.map(_.getMessage) + failure <- ZIO.scoped(default.flatMap(c => ZIO.attempt(c.resolveCredentials()))).flip.map(_.getMessage) } yield assert(failure)(isNonEmptyString) } ) @@ sequential diff --git a/src/test/scala/zio/s3/S3SettingsTest.scala b/src/test/scala/zio/s3/S3SettingsTest.scala index d6a0a94d..a993f6f3 100644 --- a/src/test/scala/zio/s3/S3SettingsTest.scala +++ b/src/test/scala/zio/s3/S3SettingsTest.scala @@ -2,29 +2,29 @@ package zio.s3 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials import software.amazon.awssdk.regions.Region -import zio.test.Assertion._ import zio.test._ -object S3SettingsTest extends DefaultRunnableSpec { +object S3SettingsTest extends ZIOSpecDefault { - def spec = + def spec: Spec[Any, InvalidSettings] = suite("Settings")( - testM("invalid region") { + test("invalid region") { for { failure <- S3Settings .from(Region.of("invalid"), AwsBasicCredentials.create("key", "secret")) .foldCause(_.failureOption.map(_.message).mkString, _ => "") - } yield assert(failure)(equalTo("Invalid aws region provided : invalid")) + } yield assertTrue(failure == "Invalid aws region provided : invalid") }, - testM("valid region") { + test("valid region") { for { success <- S3Settings.from(Region.US_EAST_2, AwsBasicCredentials.create("key", "secret")) - } yield assert(success.s3Region.region -> success.credentials)( - equalTo(Region.US_EAST_2 -> AwsBasicCredentials.create("key", "secret")) + } yield assertTrue( + success.s3Region.region -> success.credentials == + Region.US_EAST_2 -> 
AwsBasicCredentials.create("key", "secret") ) }, test("unsafe Region") { - assert(S3Region.unsafeFromString("blah").region)(equalTo(Region.of("blah"))) + assertTrue(S3Region.unsafeFromString("blah").region == Region.of("blah")) } ) } diff --git a/src/test/scala/zio/s3/S3Test.scala b/src/test/scala/zio/s3/S3Test.scala index e6efb282..ad2c3ad6 100644 --- a/src/test/scala/zio/s3/S3Test.scala +++ b/src/test/scala/zio/s3/S3Test.scala @@ -6,49 +6,48 @@ import java.net.URI import java.util.UUID import software.amazon.awssdk.regions.Region import software.amazon.awssdk.services.s3.model.{ ObjectCannedACL, S3Exception } -import zio.blocking.Blocking import zio.nio.file.{ Path => ZPath } -import zio.nio.file.{ Files => ZFiles } -import zio.stream.{ ZStream, ZTransducer } +import zio.stream.{ ZPipeline, ZStream } import zio.test.Assertion._ +import zio.test.TestAspect.sequential import zio.test._ -import zio.{ Chunk, ZLayer } +import zio.{ Chunk, Scope, ZLayer } import scala.util.Random -object S3LiveSpec extends DefaultRunnableSpec { - private val root = ZPath("minio/data") +object S3LiveSpec extends ZIOSpecDefault { private val s3 = - live( - Region.CA_CENTRAL_1, - AwsBasicCredentials.create("TESTKEY", "TESTSECRET"), - Some(URI.create("http://localhost:9000")) - ) + zio.s3 + .live( + Region.CA_CENTRAL_1, + AwsBasicCredentials.create("TESTKEY", "TESTSECRET"), + Some(URI.create("http://localhost:9000")) + ) .mapError(TestFailure.die) - override def spec = - S3Suite.spec("S3LiveSpec", root).provideCustomLayerShared(s3) + override def spec: Spec[TestEnvironment with Scope, Any] = + S3Suite.spec("S3LiveSpec").provideLayerShared(s3) } -object S3TestSpec extends DefaultRunnableSpec { +object S3TestSpec extends ZIOSpecDefault { private val root = ZPath("test-data") - private val s3: ZLayer[Blocking, Nothing, S3] = zio.s3.stub(root) + private val s3: ZLayer[Any, Nothing, S3] = zio.s3.stub(root) - override def spec = - S3Suite.spec("S3TestSpec", root).provideCustomLayerShared(Blocking.live >>> s3) + override def spec: Spec[TestEnvironment with Scope, Any] = + S3Suite.spec("S3TestSpec").provideLayerShared(s3) } -object InvalidS3LayerTestSpec extends DefaultRunnableSpec { +object InvalidS3LayerTestSpec extends ZIOSpecDefault { - private val s3: ZLayer[Blocking, S3Exception, S3] = - zio.s3.liveM(Region.EU_CENTRAL_1, providers.default) + private val s3: ZLayer[Any, S3Exception, S3] = + zio.s3.liveZIO[Any](Region.EU_CENTRAL_1, providers.default) - override def spec = + override def spec: Spec[Any, Nothing] = suite("InvalidS3LayerTest") { - testM("listBuckets") { - assertM(listBuckets.provideCustomLayer(s3).either)(isLeft(isSubtype[S3Exception](anything))) + test("listBuckets") { + listBuckets.provideLayer(s3).either.map(assert(_)(isLeft(isSubtype[S3Exception](anything)))) } } @@ -64,34 +63,33 @@ object S3Suite { (size, ZStream.fromChunks(Chunk.fromArray(bytes))) } - def spec(label: String, root: ZPath): Spec[S3 with Blocking, TestFailure[Exception], TestSuccess] = + def spec(label: String): Spec[S3, Exception] = suite(label)( - testM("listAllObjects") { + test("listAllObjects") { for { list <- listAllObjects(bucketName).runCollect } yield assert(list.map(_.key))(hasSameElements(List("console.log", "dir1/hello.txt", "dir1/user.csv"))) }, - testM("list buckets") { + test("list buckets") { for { buckets <- listBuckets - } yield assert(buckets.map(_.name))(equalTo(Chunk.single(bucketName))) + } yield assertTrue(buckets.map(_.name) == Chunk.single(bucketName)) }, - testM("list objects") { + test("list objects") { for 
{ succeed <- listObjects(bucketName) - } yield assert(succeed.bucketName)(equalTo(bucketName)) && assert( - succeed.objectSummaries.map(s => s.bucketName -> s.key) - )( - hasSameElements( - List( - (bucketName, "console.log"), - (bucketName, "dir1/hello.txt"), - (bucketName, "dir1/user.csv") + } yield assertTrue(succeed.bucketName == bucketName) && + assert(succeed.objectSummaries.map(s => s.bucketName -> s.key))( + hasSameElements( + List( + (bucketName, "console.log"), + (bucketName, "dir1/hello.txt"), + (bucketName, "dir1/user.csv") + ) ) ) - ) }, - testM("list objects with prefix") { + test("list objects with prefix") { for { succeed <- listObjects(bucketName, ListObjectOptions.from("console", 10)) } yield assert(succeed)( @@ -103,48 +101,47 @@ object S3Suite { ) ) }, - testM("list objects with not match prefix") { + test("list objects with not match prefix") { for { succeed <- listObjects(bucketName, ListObjectOptions.from("blah", 10)) - } yield assert(succeed.bucketName -> succeed.objectSummaries)( - equalTo(bucketName -> Chunk.empty) - ) + } yield assertTrue(succeed.bucketName -> succeed.objectSummaries == bucketName -> Chunk.empty) }, - testM("list objects with delimiter") { + test("list objects with delimiter") { for { succeed <- listObjects(bucketName, ListObjectOptions(Some("dir1/"), 10, Some("/"), None)) - } yield assert(succeed.bucketName -> succeed.objectSummaries.map(_.key))( - equalTo(bucketName -> Chunk("dir1/hello.txt", "dir1/user.csv")) + } yield assertTrue( + succeed.bucketName -> succeed.objectSummaries + .map(_.key) == bucketName -> Chunk("dir1/hello.txt", "dir1/user.csv") ) }, - testM("list objects with startAfter dir1/hello.txt") { + test("list objects with startAfter dir1/hello.txt") { for { succeed <- listObjects(bucketName, ListObjectOptions.fromStartAfter("dir1/hello.txt")) - } yield assert(succeed.bucketName -> succeed.objectSummaries.map(_.key).sorted)( - equalTo(bucketName -> Chunk("dir1/user.csv")) + } yield assertTrue( + succeed.bucketName -> succeed.objectSummaries.map(_.key).sorted == bucketName -> Chunk("dir1/user.csv") ) }, - testM("create bucket") { + test("create bucket") { val bucketTmp = UUID.randomUUID().toString for { succeed <- createBucket(bucketTmp) - _ <- ZFiles.delete(root / bucketTmp) + _ <- deleteBucket(bucketTmp) } yield assert(succeed)(isUnit) }, - testM("create empty bucket name fail") { + test("create empty bucket name fail") { for { succeed <- createBucket("") .foldCause(_ => false, _ => true) - } yield assert(succeed)(isFalse) + } yield assertTrue(!succeed) }, - testM("create bucket already exist") { + test("create bucket already exist") { for { succeed <- createBucket(bucketName) .foldCause(_ => false, _ => true) - } yield assert(succeed)(isFalse) + } yield assertTrue(!succeed) }, - testM("delete bucket") { + test("delete bucket") { val bucketTmp = UUID.randomUUID().toString for { @@ -152,77 +149,73 @@ object S3Suite { succeed <- deleteBucket(bucketTmp) } yield assert(succeed)(isUnit) }, - testM("delete bucket dont exist") { + test("delete bucket dont exist") { for { succeed <- deleteBucket(UUID.randomUUID().toString).foldCause(_ => false, _ => true) - } yield assert(succeed)(isFalse) + } yield assertTrue(!succeed) }, - testM("exists bucket") { + test("exists bucket") { for { succeed <- isBucketExists(bucketName) - } yield assert(succeed)(isTrue) + } yield assertTrue(succeed) }, - testM("exists bucket - invalid identifier") { + test("exists bucket - invalid identifier") { for { succeed <- isBucketExists(UUID.randomUUID().toString) - 
} yield assert(succeed)(isFalse) - }, - testM("delete object") { - val objectTmp = UUID.randomUUID().toString - - for { - _ <- ZFiles.createFile(root / bucketName / objectTmp) - succeed <- deleteObject(bucketName, objectTmp) - } yield assert(succeed)(isUnit) + } yield assertTrue(!succeed) }, - testM("delete object - invalid identifier") { + test("delete object - invalid identifier") { for { succeed <- deleteObject(bucketName, UUID.randomUUID().toString) } yield assert(succeed)(isUnit) }, - testM("get object") { + test("get object") { for { - content <- getObject(bucketName, "dir1/hello.txt") - .transduce(ZTransducer.utf8Decode) - .runCollect - } yield assert(content.mkString)(equalTo("""|Hello ZIO s3 - |this is a beautiful day""".stripMargin)) + content <- getObject(bucketName, "dir1/hello.txt") + .via(ZPipeline.utf8Decode) + .runCollect + contentString = content.mkString + } yield assertTrue( + contentString == + """|Hello ZIO s3 + |this is a beautiful day""".stripMargin + ) }, - testM("get object - invalid identifier") { + test("get object - invalid identifier") { for { succeed <- getObject(bucketName, UUID.randomUUID().toString) - .transduce(ZTransducer.utf8Decode) + .via(ZPipeline.utf8Decode) .runCollect .fold(_ => false, _ => true) - } yield assert(succeed)(isFalse) + } yield assertTrue(!succeed) }, - testM("get nextObjects") { + test("get nextObjects") { for { token <- listObjects(bucketName, ListObjectOptions.fromMaxKeys(1)).map(_.nextContinuationToken) listing <- getNextObjects(S3ObjectListing.from(bucketName, token)) - } yield assert(listing.objectSummaries)(isNonEmpty) + } yield assertTrue(listing.objectSummaries.nonEmpty) }, - testM("get nextObjects - invalid token") { + test("get nextObjects - invalid token") { for { succeed <- getNextObjects(S3ObjectListing.from(bucketName, Some(""))).foldCause(_ => false, _ => true) - } yield assert(succeed)(isFalse) + } yield assertTrue(!succeed) }, - testM("put object") { + test("put object") { val c = Chunk.fromArray(Random.nextString(65536).getBytes()) val contentLength = c.length.toLong - val data = ZStream.fromChunks(c).chunkN(5) + val data = ZStream.fromChunks(c).rechunk(5) val tmpKey = Random.alphanumeric.take(10).mkString for { _ <- putObject(bucketName, tmpKey, contentLength, data) objectContentLength <- getObjectMetadata(bucketName, tmpKey).map(_.contentLength) <* - ZFiles.delete(root / bucketName / tmpKey) - } yield assert(objectContentLength)(equalTo(contentLength)) + deleteObject(bucketName, tmpKey) + } yield assertTrue(objectContentLength == contentLength) }, - testM("multipart object") { + test("multipart object") { val text = """Lorem ipsum dolor sit amet, consectetur adipiscing elit. |Donec semper eros quis felis scelerisque, quis lobortis felis cursus. 
@@ -245,52 +238,52 @@ object S3Suite { for { _ <- multipartUpload(bucketName, tmpKey, data)(1) contentLength <- getObjectMetadata(bucketName, tmpKey).map(_.contentLength) <* - ZFiles.delete(root / bucketName / tmpKey) - } yield assert(contentLength)(isGreaterThan(0L)) + deleteObject(bucketName, tmpKey) + } yield assertTrue(contentLength > 0L) }, - testM("multipart with parrallelism = 1") { + test("multipart with parrallelism = 1") { val (dataLength, data) = randomNEStream() val tmpKey = Random.alphanumeric.take(10).mkString for { _ <- multipartUpload(bucketName, tmpKey, data)(1) contentLength <- getObjectMetadata(bucketName, tmpKey).map(_.contentLength) <* - ZFiles.delete(root / bucketName / tmpKey) - } yield assert(contentLength)(equalTo(dataLength.toLong)) + deleteObject(bucketName, tmpKey) + } yield assertTrue(contentLength == dataLength.toLong) }, - testM("multipart with invalid parallelism value 0") { + test("multipart with invalid parallelism value 0") { val data = ZStream.empty val tmpKey = Random.alphanumeric.take(10).mkString val io = multipartUpload(bucketName, tmpKey, data)(0) - assertM(io.run)(dies(hasMessage(equalTo("parallelism must be > 0. 0 is invalid")))) + io.exit.map(assert(_)(dies(hasMessage(equalTo("parallelism must be > 0. 0 is invalid"))))) }, - testM("multipart with invalid partSize value 0") { + test("multipart with invalid partSize value 0") { val tmpKey = Random.alphanumeric.take(10).mkString val invalidOption = MultipartUploadOptions.fromPartSize(0) val io = multipartUpload(bucketName, tmpKey, ZStream.empty, invalidOption)(1) - assertM(io.run)(dies(hasMessage(equalTo(s"Invalid part size 0.0 Mb, minimum size is 5 Mb")))) + io.exit.map(assert(_)(dies(hasMessage(equalTo(s"Invalid part size 0.0 Mb, minimum size is 5 Mb"))))) }, - testM("multipart object when the content is empty") { + test("multipart object when the content is empty") { val data = ZStream.empty val tmpKey = Random.alphanumeric.take(10).mkString for { _ <- multipartUpload(bucketName, tmpKey, data)(1) contentLength <- getObjectMetadata(bucketName, tmpKey).map(_.contentLength) <* - ZFiles.delete(root / bucketName / tmpKey) - } yield assert(contentLength)(equalTo(0L)) + deleteObject(bucketName, tmpKey) + } yield assertTrue(contentLength == 0L) }, - testM("multipart object when the content type is not provided") { + test("multipart object when the content type is not provided") { val (_, data) = randomNEStream() val tmpKey = Random.alphanumeric.take(10).mkString for { _ <- multipartUpload(bucketName, tmpKey, data)(4) contentType <- getObjectMetadata(bucketName, tmpKey).map(_.contentType) <* - ZFiles.delete(root / bucketName / tmpKey) - } yield assert(contentType)(equalTo("binary/octet-stream")) + deleteObject(bucketName, tmpKey) + } yield assertTrue(contentType == "binary/octet-stream") }, - testM("multipart object when there is a content type and metadata") { + test("multipart object when there is a content type and metadata") { val metadata = Map("key1" -> "value1") val (_, data) = randomNEStream() val tmpKey = Random.alphanumeric.take(10).mkString @@ -304,43 +297,43 @@ object S3Suite { UploadOptions(metadata, ObjectCannedACL.PRIVATE, Some("application/json")) ) )(4) - objectMetadata <- getObjectMetadata(bucketName, tmpKey) <* ZFiles.delete(root / bucketName / tmpKey) - } yield assert(objectMetadata.contentType)(equalTo("application/json")) && - assert(objectMetadata.metadata.map { case (k, v) => k.toLowerCase -> v })(equalTo(Map("key1" -> "value1"))) + objectMetadata <- getObjectMetadata(bucketName, 
tmpKey) <* deleteObject(bucketName, tmpKey) + } yield assertTrue(objectMetadata.contentType == "application/json") && + assertTrue(objectMetadata.metadata.map { case (k, v) => k.toLowerCase -> v } == Map("key1" -> "value1")) }, - testM("multipart object when the chunk size and parallelism are customized") { + test("multipart object when the chunk size and parallelism are customized") { val (dataSize, data) = randomNEStream() val tmpKey = Random.alphanumeric.take(10).mkString for { _ <- multipartUpload(bucketName, tmpKey, data, MultipartUploadOptions.fromPartSize(10 * PartSize.Mega))(4) contentLength <- getObjectMetadata(bucketName, tmpKey).map(_.contentLength) <* - ZFiles.delete(root / bucketName / tmpKey) - } yield assert(contentLength)(equalTo(dataSize.toLong)) + deleteObject(bucketName, tmpKey) + } yield assertTrue(contentLength == dataSize.toLong) }, - testM("stream lines") { + test("stream lines") { for { list <- streamLines(bucketName, "dir1/user.csv").runCollect - } yield assert(list.headOption)(isSome(equalTo("John,Doe,120 jefferson st.,Riverside, NJ, 08075"))) && - assert(list.lastOption)(isSome(equalTo("Marie,White,20 time square,Bronx, NY,08220"))) + } yield assertTrue(list.headOption.get == "John,Doe,120 jefferson st.,Riverside, NJ, 08075") && + assertTrue(list.lastOption.get == "Marie,White,20 time square,Bronx, NY,08220") }, - testM("stream lines - invalid key") { + test("stream lines - invalid key") { for { succeed <- streamLines(bucketName, "blah").runCollect.fold(_ => false, _ => true) - } yield assert(succeed)(isFalse) + } yield assertTrue(!succeed) }, - testM("put object when the content type is not provided") { + test("put object when the content type is not provided") { val (dataSize, data) = randomNEStream() val tmpKey = Random.alphanumeric.take(10).mkString for { _ <- putObject(bucketName, tmpKey, dataSize.toLong, data) contentLength <- getObjectMetadata(bucketName, tmpKey).map(_.contentLength) <* - ZFiles.delete(root / bucketName / tmpKey) - } yield assert(dataSize.toLong)(equalTo(contentLength)) + deleteObject(bucketName, tmpKey) + } yield assertTrue(dataSize.toLong == contentLength) }, - testM("put object when there is a content type and metadata") { + test("put object when there is a content type and metadata") { val _metadata = Map("key1" -> "value1") val (dataSize, data) = randomNEStream() val tmpKey = Random.alphanumeric.take(10).mkString @@ -353,10 +346,10 @@ object S3Suite { data, UploadOptions.from(_metadata, "application/json") ) - objectMetadata <- getObjectMetadata(bucketName, tmpKey) <* ZFiles.delete(root / bucketName / tmpKey) - } yield assert(objectMetadata.contentType)(equalTo("application/json")) && - assert(objectMetadata.metadata.map { case (k, v) => k.toLowerCase -> v })(equalTo(Map("key1" -> "value1"))) + objectMetadata <- getObjectMetadata(bucketName, tmpKey) <* deleteObject(bucketName, tmpKey) + } yield assertTrue(objectMetadata.contentType == "application/json") && + assertTrue(objectMetadata.metadata.map { case (k, v) => k.toLowerCase -> v } == Map("key1" -> "value1")) } - ) + ) @@ sequential } From 79b75240166c818ad3ac0e4a3b1b8f59aeb3373c Mon Sep 17 00:00:00 2001 From: Regis Leray Date: Sat, 2 Jul 2022 20:34:04 -0400 Subject: [PATCH 2/4] zio 2.0.0 (#312) --- build.sbt | 6 +- src/main/scala/zio/s3/Test.scala | 321 ++++++++++++++-------------- src/main/scala/zio/s3/package.scala | 2 +- 3 files changed, 164 insertions(+), 165 deletions(-) diff --git a/build.sbt b/build.sbt index 315a79a5..27da42d4 100644 --- a/build.sbt +++ b/build.sbt @@ 
-22,7 +22,7 @@ inThisBuild( addCommandAlias("fmt", "all scalafmtSbt scalafmt test:scalafmt") addCommandAlias("check", "all scalafmtSbtCheck scalafmtCheck test:scalafmtCheck") -val zioVersion = "2.0.0-RC6" +val zioVersion = "2.0.0" val awsVersion = "2.16.61" lazy val `zio-s3` = project @@ -35,8 +35,8 @@ lazy val `zio-s3` = project libraryDependencies ++= Seq( "dev.zio" %% "zio" % zioVersion, "dev.zio" %% "zio-streams" % zioVersion, - "dev.zio" %% "zio-nio" % "2.0.0-RC7", - "dev.zio" %% "zio-interop-reactivestreams" % "2.0.0-RC7", + "dev.zio" %% "zio-nio" % "2.0.0", + "dev.zio" %% "zio-interop-reactivestreams" % "2.0.0", "software.amazon.awssdk" % "s3" % awsVersion, "software.amazon.awssdk" % "sts" % awsVersion, "dev.zio" %% "zio-test" % zioVersion % Test, diff --git a/src/main/scala/zio/s3/Test.scala b/src/main/scala/zio/s3/Test.scala index cf13ee28..2b9830aa 100644 --- a/src/main/scala/zio/s3/Test.scala +++ b/src/main/scala/zio/s3/Test.scala @@ -46,173 +46,172 @@ object Test { .build() .asInstanceOf[S3Exception] - def connect(path: ZPath): S3 = { + def connect(path: ZPath): UIO[S3] = { type ContentType = String type Metadata = Map[String, String] - new S3 { - private val refDb: Ref[Map[String, (ContentType, Metadata)]] = - Ref.unsafeMake(Map.empty[String, (ContentType, Metadata)]) - - override def createBucket(bucketName: String): IO[S3Exception, Unit] = - Files.createDirectory(path / bucketName).orDie - - override def deleteBucket(bucketName: String): IO[S3Exception, Unit] = - Files.delete(path / bucketName).orDie - - override def isBucketExists(bucketName: String): IO[S3Exception, Boolean] = - Files.exists(path / bucketName) - - override val listBuckets: IO[S3Exception, S3BucketListing] = - Files - .list(path) - .filterZIO(p => Files.readAttributes[BasicFileAttributes](p).map(_.isDirectory)) - .mapZIO { p => - Files - .readAttributes[BasicFileAttributes](p) - .map(attr => S3Bucket(p.filename.toString, attr.creationTime().toInstant)) - } - .runCollect - .orDie - - override def deleteObject(bucketName: String, key: String): IO[S3Exception, Unit] = - Files.deleteIfExists(path / bucketName / key).orDie.unit - - override def getObject(bucketName: String, key: String): Stream[S3Exception, Byte] = - ZStream - .scoped(ZIO.fromAutoCloseable(ZIO.attempt(new FileInputStream((path / bucketName / key).toFile)))) - .flatMap(ZStream.fromInputStream(_, 2048)) - .refineOrDie { - case e: FileNotFoundException => fileNotFound(e) - } - - override def getObjectMetadata(bucketName: String, key: String): IO[S3Exception, ObjectMetadata] = - (for { - res <- refDb.get.map(_.getOrElse(bucketName + key, "" -> Map.empty[String, String])) - (contentType, metadata) = res - file <- Files - .readAttributes[BasicFileAttributes](path / bucketName / key) - .map(p => ObjectMetadata(metadata, contentType, p.size())) - } yield file).orDie - - override def listObjects( - bucketName: String, - options: ListObjectOptions - ): IO[S3Exception, S3ObjectListing] = - Files - .find(path / bucketName) { - case (p, _) if options.delimiter.nonEmpty => - options.prefix.fold(true)((path / bucketName).relativize(p).toString().startsWith) - case (p, _) => - options.prefix.fold(true)(p.filename.toString().startsWith) - } - .mapZIO(p => Files.readAttributes[BasicFileAttributes](p).map(a => a -> p)) - .filter { case (attr, _) => attr.isRegularFile } - .map { - case (attr, f) => - S3ObjectSummary( - bucketName, - (path / bucketName).relativize(f).toString(), - attr.lastModifiedTime().toInstant, - attr.size() - ) - } - .runCollect - .map( - 
_.sortBy(_.key) - .mapAccum(options.starAfter) { - case (Some(startWith), o) => - if (startWith.startsWith(o.key)) - None -> Chunk.empty - else - Some(startWith) -> Chunk.empty - case (_, o) => - None -> Chunk(o) - } - ._2 - .flatten - ) - .map { - case list if list.size > options.maxKeys => - S3ObjectListing( - bucketName, - options.delimiter, - options.starAfter, - list.take(options.maxKeys.toInt), - Some(UUID.randomUUID().toString), - None - ) - case list => - S3ObjectListing(bucketName, options.delimiter, options.starAfter, list, None, None) + Ref.make(Map.empty[String, (ContentType, Metadata)]).map { refDb => + new S3 { + override def createBucket(bucketName: String): IO[S3Exception, Unit] = + Files.createDirectory(path / bucketName).orDie + + override def deleteBucket(bucketName: String): IO[S3Exception, Unit] = + Files.delete(path / bucketName).orDie + + override def isBucketExists(bucketName: String): IO[S3Exception, Boolean] = + Files.exists(path / bucketName) + + override val listBuckets: IO[S3Exception, S3BucketListing] = + Files + .list(path) + .filterZIO(p => Files.readAttributes[BasicFileAttributes](p).map(_.isDirectory)) + .mapZIO { p => + Files + .readAttributes[BasicFileAttributes](p) + .map(attr => S3Bucket(p.filename.toString, attr.creationTime().toInstant)) + } + .runCollect + .orDie + + override def deleteObject(bucketName: String, key: String): IO[S3Exception, Unit] = + Files.deleteIfExists(path / bucketName / key).orDie.unit + + override def getObject(bucketName: String, key: String): Stream[S3Exception, Byte] = + ZStream + .scoped(ZIO.fromAutoCloseable(ZIO.attempt(new FileInputStream((path / bucketName / key).toFile)))) + .flatMap(ZStream.fromInputStream(_, 2048)) + .refineOrDie { + case e: FileNotFoundException => fileNotFound(e) + } + + override def getObjectMetadata(bucketName: String, key: String): IO[S3Exception, ObjectMetadata] = + (for { + res <- refDb.get.map(_.getOrElse(bucketName + key, "" -> Map.empty[String, String])) + (contentType, metadata) = res + file <- Files + .readAttributes[BasicFileAttributes](path / bucketName / key) + .map(p => ObjectMetadata(metadata, contentType, p.size())) + } yield file).orDie + + override def listObjects( + bucketName: String, + options: ListObjectOptions + ): IO[S3Exception, S3ObjectListing] = + Files + .find(path / bucketName) { + case (p, _) if options.delimiter.nonEmpty => + options.prefix.fold(true)((path / bucketName).relativize(p).toString().startsWith) + case (p, _) => + options.prefix.fold(true)(p.filename.toString().startsWith) + } + .mapZIO(p => Files.readAttributes[BasicFileAttributes](p).map(a => a -> p)) + .filter { case (attr, _) => attr.isRegularFile } + .map { + case (attr, f) => + S3ObjectSummary( + bucketName, + (path / bucketName).relativize(f).toString(), + attr.lastModifiedTime().toInstant, + attr.size() + ) + } + .runCollect + .map( + _.sortBy(_.key) + .mapAccum(options.starAfter) { + case (Some(startWith), o) => + if (startWith.startsWith(o.key)) + None -> Chunk.empty + else + Some(startWith) -> Chunk.empty + case (_, o) => + None -> Chunk(o) + } + ._2 + .flatten + ) + .map { + case list if list.size > options.maxKeys => + S3ObjectListing( + bucketName, + options.delimiter, + options.starAfter, + list.take(options.maxKeys.toInt), + Some(UUID.randomUUID().toString), + None + ) + case list => + S3ObjectListing(bucketName, options.delimiter, options.starAfter, list, None, None) + } + .orDie + + override def getNextObjects(listing: S3ObjectListing): IO[S3Exception, S3ObjectListing] = + 
listing.nextContinuationToken match { + case Some(token) if token.nonEmpty => listObjects(listing.bucketName, ListObjectOptions.fromMaxKeys(100)) + case _ => ZIO.dieMessage("Empty token is invalid") } - .orDie - override def getNextObjects(listing: S3ObjectListing): IO[S3Exception, S3ObjectListing] = - listing.nextContinuationToken match { - case Some(token) if token.nonEmpty => listObjects(listing.bucketName, ListObjectOptions.fromMaxKeys(100)) - case _ => ZIO.dieMessage("Empty token is invalid") + override def putObject[R]( + bucketName: String, + key: String, + contentLength: Long, + content: ZStream[R, Throwable, Byte], + options: UploadOptions + ): ZIO[R, S3Exception, Unit] = + (for { + _ <- refDb.update(db => + db + (bucketName + key -> (options.contentType + .getOrElse("application/octet-stream") -> options.metadata)) + ) + filePath = path / bucketName / key + _ <- filePath.parent + .map(parentPath => Files.createDirectories(parentPath)) + .getOrElse(ZIO.unit) + + _ <- ZIO.scoped[R]( + AsynchronousFileChannel + .open( + filePath, + StandardOpenOption.WRITE, + StandardOpenOption.TRUNCATE_EXISTING, + StandardOpenOption.CREATE + ) + .flatMap(channel => + content + .mapChunks(Chunk.succeed) + .runFoldZIO(0L) { case (pos, c) => channel.writeChunk(c, pos).as(pos + c.length) } + ) + ) + } yield ()).orDie + + override def execute[T](f: S3AsyncClient => CompletableFuture[T]): IO[S3Exception, T] = + ZIO.dieMessage("Not implemented error - please don't call execute() S3 Test mode") + + override def multipartUpload[R]( + bucketName: String, + key: String, + content: ZStream[R, Throwable, Byte], + options: MultipartUploadOptions + )(parallelism: Int): ZIO[R, S3Exception, Unit] = { + val _contentType = options.uploadOptions.contentType.orElse(Some("binary/octet-stream")) + + for { + _ <- ZIO.dieMessage(s"parallelism must be > 0. 
$parallelism is invalid").unless(parallelism > 0) + _ <- + ZIO + .dieMessage( + s"Invalid part size ${Math.floor(options.partSize.toDouble / PartSize.Mega.toDouble * 100d) / 100d} Mb, minimum size is ${PartSize.Min / PartSize.Mega} Mb" + ) + .unless(options.partSize >= PartSize.Min) + _ <- putObject( + bucketName, + key, + 0, + content.rechunk(options.partSize), + options.uploadOptions.copy(contentType = _contentType) + ) + } yield () } - - override def putObject[R]( - bucketName: String, - key: String, - contentLength: Long, - content: ZStream[R, Throwable, Byte], - options: UploadOptions - ): ZIO[R, S3Exception, Unit] = - (for { - _ <- - refDb.update(db => - db + (bucketName + key -> (options.contentType.getOrElse("application/octet-stream") -> options.metadata)) - ) - filePath = path / bucketName / key - _ <- filePath.parent - .map(parentPath => Files.createDirectories(parentPath)) - .getOrElse(ZIO.unit) - - _ <- ZIO.scoped[R]( - AsynchronousFileChannel - .open( - filePath, - StandardOpenOption.WRITE, - StandardOpenOption.TRUNCATE_EXISTING, - StandardOpenOption.CREATE - ) - .flatMap(channel => - content - .mapChunks(Chunk.succeed) - .runFoldZIO(0L) { case (pos, c) => channel.writeChunk(c, pos).as(pos + c.length) } - ) - ) - } yield ()).orDie - - override def execute[T](f: S3AsyncClient => CompletableFuture[T]): IO[S3Exception, T] = - ZIO.dieMessage("Not implemented error - please don't call execute() S3 Test mode") - - override def multipartUpload[R]( - bucketName: String, - key: String, - content: ZStream[R, Throwable, Byte], - options: MultipartUploadOptions - )(parallelism: Int): ZIO[R, S3Exception, Unit] = { - val _contentType = options.uploadOptions.contentType.orElse(Some("binary/octet-stream")) - - for { - _ <- ZIO.dieMessage(s"parallelism must be > 0. 
$parallelism is invalid").unless(parallelism > 0) - _ <- - ZIO - .dieMessage( - s"Invalid part size ${Math.floor(options.partSize.toDouble / PartSize.Mega.toDouble * 100d) / 100d} Mb, minimum size is ${PartSize.Min / PartSize.Mega} Mb" - ) - .unless(options.partSize >= PartSize.Min) - _ <- putObject( - bucketName, - key, - 0, - content.rechunk(options.partSize), - options.uploadOptions.copy(contentType = _contentType) - ) - } yield () } } } diff --git a/src/main/scala/zio/s3/package.scala b/src/main/scala/zio/s3/package.scala index 4606f928..3fe63d52 100644 --- a/src/main/scala/zio/s3/package.scala +++ b/src/main/scala/zio/s3/package.scala @@ -55,7 +55,7 @@ package object s3 { ) def stub(path: ZPath): ZLayer[Any, Nothing, S3] = - ZLayer.succeed(Test.connect(path)) + ZLayer.fromZIO(Test.connect(path)) def listAllObjects(bucketName: String): S3Stream[S3ObjectSummary] = ZStream.serviceWithStream[S3](_.listAllObjects(bucketName)) From c1ba528372eff777bd14b23d53cd04703b61b6fa Mon Sep 17 00:00:00 2001 From: Regis Leray Date: Wed, 13 Jul 2022 22:47:22 -0400 Subject: [PATCH 3/4] fix minio docker-compose (#327) --- docker-compose.yml | 19 +++++++++---------- src/test/scala/zio/s3/S3Test.scala | 2 +- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index fedb7d7c..13cecd32 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,39 +2,38 @@ version: "3.7" services: minio: - image: minio/minio + image: quay.io/minio/minio ports: - "9000:9000" + - "9001:9001" volumes: - ./minio/data:/data environment: - - MINIO_ACCESS_KEY=TESTKEY - - MINIO_SECRET_KEY=TESTSECRET + - MINIO_ROOT_USER=TESTKEY + - MINIO_ROOT_PASSWORD=TESTSECRET healthcheck: test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"] interval: 30s timeout: 20s retries: 3 - - command: server --compat /data + command: server /data --console-address ":9001" mc: - image: minio/mc + image: quay.io/minio/mc volumes: - ./minio/export:/export depends_on: - minio environment: - - MINIO_ACCESS_KEY=TESTKEY - - MINIO_SECRET_KEY=TESTSECRET + - MINIO_ROOT_USER=TESTKEY + - MINIO_ROOT_PASSWORD=TESTSECRET entrypoint: > /bin/sh -c " echo Waiting for minio service to start...; - curl --retry 10 --retry-delay 10 -s -o /dev/null http://minio:9000/minio/health/live echo Minio is started; - /usr/bin/mc config host add my-minio http://minio:9000 $${MINIO_ACCESS_KEY} $${MINIO_SECRET_KEY}; + /usr/bin/mc config host add my-minio http://minio:9000 $${MINIO_ROOT_USER} $${MINIO_ROOT_PASSWORD}; /usr/bin/mc mb -p my-minio/bucket-1; /usr/bin/mc mirror export/ my-minio/bucket-1; " \ No newline at end of file diff --git a/src/test/scala/zio/s3/S3Test.scala b/src/test/scala/zio/s3/S3Test.scala index ad2c3ad6..cc4b1707 100644 --- a/src/test/scala/zio/s3/S3Test.scala +++ b/src/test/scala/zio/s3/S3Test.scala @@ -22,7 +22,7 @@ object S3LiveSpec extends ZIOSpecDefault { .live( Region.CA_CENTRAL_1, AwsBasicCredentials.create("TESTKEY", "TESTSECRET"), - Some(URI.create("http://localhost:9000")) + Some(URI.create("http://127.0.0.1:9000")) ) .mapError(TestFailure.die) From 3ce265875cf304b9640389d55ab94cf16bef4c9a Mon Sep 17 00:00:00 2001 From: Scala Steward Date: Thu, 14 Jul 2022 07:50:37 +0000 Subject: [PATCH 4/4] Update sbt-bloop to 1.5.2 --- project/plugins.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/plugins.sbt b/project/plugins.sbt index 4e901dbf..3f4b2ff0 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,6 +1,6 @@ addSbtPlugin("org.scalameta" % "sbt-mdoc" % 
"2.2.24") addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.4.3") -addSbtPlugin("ch.epfl.scala" % "sbt-bloop" % "1.5.0") +addSbtPlugin("ch.epfl.scala" % "sbt-bloop" % "1.5.2") addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.10.0") addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.11.0") addSbtPlugin("com.geirsson" % "sbt-ci-release" % "1.5.5")