Skip to content

Commit

Permalink
Migrate to ZIO 2.0.0-RC6 (#311)
Browse files Browse the repository at this point in the history
* Migrate to ZIO 2.0.0-RC6
  • Loading branch information
ghostdogpr authored Jun 23, 2022
1 parent f89ee07 commit 2589262
Show file tree
Hide file tree
Showing 18 changed files with 488 additions and 509 deletions.
6 changes: 3 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ inThisBuild(
addCommandAlias("fmt", "all scalafmtSbt scalafmt test:scalafmt")
addCommandAlias("check", "all scalafmtSbtCheck scalafmtCheck test:scalafmtCheck")

val zioVersion = "1.0.13"
val zioVersion = "2.0.0-RC6"
val awsVersion = "2.16.61"

lazy val `zio-s3` = project
Expand All @@ -35,8 +35,8 @@ lazy val `zio-s3` = project
libraryDependencies ++= Seq(
"dev.zio" %% "zio" % zioVersion,
"dev.zio" %% "zio-streams" % zioVersion,
"dev.zio" %% "zio-nio" % "1.0.0-RC12",
"dev.zio" %% "zio-interop-reactivestreams" % "1.3.12",
"dev.zio" %% "zio-nio" % "2.0.0-RC7",
"dev.zio" %% "zio-interop-reactivestreams" % "2.0.0-RC7",
"software.amazon.awssdk" % "s3" % awsVersion,
"software.amazon.awssdk" % "sts" % awsVersion,
"dev.zio" %% "zio-test" % zioVersion % Test,
Expand Down
10 changes: 5 additions & 5 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ services:

mc:
image: minio/mc
volumes:
- ./minio/export:/export
depends_on:
- minio
environment:
Expand All @@ -28,13 +30,11 @@ services:
entrypoint: >
/bin/sh -c "
echo Waiting for minio service to start...;
apk add --no-cache curl;
while ! curl -I 'http://minio:9000/minio/health/live' 2>&1 | grep -w '200\|301';
do
sleep 10;
done;
curl --retry 10 --retry-delay 10 -s -o /dev/null http://minio:9000/minio/health/live
echo Minio is started;
/usr/bin/mc config host add my-minio http://minio:9000 $${MINIO_ACCESS_KEY} $${MINIO_SECRET_KEY};
/usr/bin/mc mb -p my-minio/bucket-1;
/usr/bin/mc mirror export/ my-minio/bucket-1;
"
28 changes: 11 additions & 17 deletions docs/quickstart/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ ZIO-S3 is a thin wrapper over the s3 async java client. It exposes the main oper

```scala
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import zio.{Chunk, ZManaged}
import zio.Chunk
import zio.s3._
import zio.stream.{ZSink, ZStream}
import software.amazon.awssdk.services.s3.model.S3Exception
Expand All @@ -32,7 +32,7 @@ import software.amazon.awssdk.services.s3.model.S3Exception

// list all objects of all buckets
val l2: ZStream[S3, S3Exception, String] = (for {
bucket <- ZStream.fromIterableM(listBuckets)
bucket <- ZStream.fromIterableZIO(listBuckets)
obj <- listAllObjects(bucket.name)
} yield obj.bucketName + "/" + obj.key).provideLayer(
live("us-east-1", AwsBasicCredentials.create("accessKeyId", "secretAccessKey"))
Expand All @@ -51,7 +51,6 @@ If credentials cannot be found in one or multiple providers selected the operati
```scala
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import zio._
import zio.blocking._
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.model.S3Exception
import zio.s3._
Expand All @@ -63,17 +62,17 @@ val s3: Layer[S3Exception, S3] =

// build S3 Layer from System properties or Environment variables
val s3: Layer[S3Exception, S3] =
liveM(Region.AF_SOUTH_1, system <> env)
liveZIO(Region.AF_SOUTH_1, system <> env)

// build S3 Layer from Instance profile credentials
val s3: Layer[S3Exception, S3] =
liveM(Region.AF_SOUTH_1, instanceProfile)
liveZIO(Region.AF_SOUTH_1, instanceProfile)

// build S3 Layer from web identity token credentials with STS. awssdk sts module required to be on classpath
val s3: Layer[S3Exception, S3] = liveM(Region.AF_SOUTH_1, webIdentity)
val s3: Layer[S3Exception, S3] = liveZIO(Region.AF_SOUTH_1, webIdentity)

// build S3 Layer from default available credentials providers
val s3: Layer[S3Exception, S3] = liveM(Region.AF_SOUTH_1, default)
val s3: Layer[S3Exception, S3] = liveZIO(Region.AF_SOUTH_1, default)

// use custom logic to fetch aws credentials
val zcredentials: ZIO[R, S3Exception, AwsCredentials] = ??? // specific implementation to fetch credentials
Expand All @@ -90,20 +89,16 @@ a stub implementation of s3 storage is provided for testing purpose and use inte
```scala
import zio.nio.core.file.{Path => ZPath}
import zio.s3._
import zio.blocking.Blocking

// required to provide a Blocking context
val stub: ZLayer[Blocking, Any, S3] = stub(ZPath("/tmp/s3-data"))

// use a Blocking context to build s3 Layer
val stubS3: ZLayer[Any, Any, S3] = Blocking.live >>> stub(ZPath("/tmp/s3-data"))
// build s3 Layer
val stubS3: ZLayer[Any, Nothing, S3] = stub(ZPath("/tmp/s3-data"))

// list all buckets available by using S3 Stub Layer
// will list all directories of `/tmp/s3-data`
listBuckets.provideLayer(stubS3)
```

More informations here how to use [ZLayer https://zio.dev/docs/howto/howto_use_layers](https://zio.dev/docs/howto/howto_use_layers)
More information here on how to use [ZLayer https://zio.dev/docs/howto/howto_use_layers](https://zio.dev/docs/howto/howto_use_layers)


Examples
Expand All @@ -112,7 +107,6 @@ Examples
```scala
import software.amazon.awssdk.services.s3.model.S3Exception
import zio._
import zio.blocking.Blocking
import zio.stream.{ ZSink, ZStream }
import zio.s3._

Expand All @@ -131,7 +125,7 @@ import java.io.FileInputStream
import java.nio.file.Paths

val is = ZStream.fromInputStream(new FileInputStream(Paths.get("/my/path/to/myfile.zip").toFile))
val proc2: ZIO[S3 with Blocking, S3Exception, Unit] =
val proc2: ZIO[S3, S3Exception, Unit] =
multipartUpload(
"bucket-1",
"upload/myfile.zip",
Expand All @@ -143,7 +137,7 @@ val proc2: ZIO[S3 with Blocking, S3Exception, Unit] =
import java.io.OutputStream

val os: OutputStream = ???
val proc3: ZIO[Blocking with S3, Exception, Long] = getObject("bucket-1", "upload/myfile.zip").run(ZSink.fromOutputStream(os))
val proc3: ZIO[S3, Exception, Long] = getObject("bucket-1", "upload/myfile.zip").run(ZSink.fromOutputStream(os))
```

Support any commands ?
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
31 changes: 15 additions & 16 deletions src/main/scala/zio/s3/Live.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import scala.jdk.CollectionConverters._
*
* @param unsafeClient: Amazon Async S3 Client
*/
final class Live(unsafeClient: S3AsyncClient) extends S3.Service {
final class Live(unsafeClient: S3AsyncClient) extends S3 {

override def createBucket(bucketName: String): IO[S3Exception, Unit] =
execute(_.createBucket(CreateBucketRequest.builder().bucket(bucketName).build())).unit
Expand All @@ -47,9 +47,9 @@ final class Live(unsafeClient: S3AsyncClient) extends S3.Service {

override def isBucketExists(bucketName: String): IO[S3Exception, Boolean] =
execute(_.headBucket(HeadBucketRequest.builder().bucket(bucketName).build()))
.map(_ => true)
.as(true)
.catchSome {
case _: NoSuchBucketException => Task.succeed(false)
case _: NoSuchBucketException => ZIO.succeed(false)
}

override val listBuckets: IO[S3Exception, S3BucketListing] =
Expand All @@ -58,7 +58,7 @@ final class Live(unsafeClient: S3AsyncClient) extends S3.Service {

override def getObject(bucketName: String, key: String): Stream[S3Exception, Byte] =
ZStream
.fromEffect(
.fromZIO(
execute(
_.getObject[StreamResponse](
GetObjectRequest.builder().bucket(bucketName).key(key).build(),
Expand Down Expand Up @@ -176,9 +176,9 @@ final class Live(unsafeClient: S3AsyncClient) extends S3.Service {
).map(_.uploadId())

parts <- ZStream
.managed(
.scoped[R](
content
.chunkN(options.partSize)
.rechunk(options.partSize)
.mapChunks(Chunk.single)
.peel(ZSink.head[Chunk[Byte]])
)
Expand All @@ -187,7 +187,7 @@ final class Live(unsafeClient: S3AsyncClient) extends S3.Service {
case (None, _) => ZStream(Chunk.empty)
}
.zipWithIndex
.mapMPar(parallelism) {
.mapZIOPar(parallelism) {
case (chunk, partNumber) =>
execute(
_.uploadPart(
Expand Down Expand Up @@ -233,12 +233,12 @@ object Live {

def connect[R](
region: S3Region,
provider: RManaged[R, AwsCredentialsProvider],
provider: RIO[R with Scope, AwsCredentialsProvider],
uriEndpoint: Option[URI]
): ZManaged[R, ConnectionError, S3.Service] =
): ZIO[R with Scope, ConnectionError, S3] =
for {
credentials <- provider.mapError(e => ConnectionError(e.getMessage, e.getCause))
builder <- ZManaged.succeed {
builder <- ZIO.succeed {
val builder = S3AsyncClient
.builder()
.credentialsProvider(credentials)
Expand All @@ -249,11 +249,10 @@ object Live {
service <- connect(builder)
} yield service

def connect[R](builder: S3AsyncClientBuilder): ZManaged[R, ConnectionError, S3.Service] =
ZManaged
.fromAutoCloseable(Task(builder.build()))
.map(new Live(_))
.mapError(e => ConnectionError(e.getMessage, e.getCause))
def connect[R](builder: S3AsyncClientBuilder): ZIO[R with Scope, ConnectionError, S3] =
ZIO
.fromAutoCloseable(ZIO.attempt(builder.build()))
.mapBoth(e => ConnectionError(e.getMessage, e.getCause), new Live(_))

type StreamResponse = ZStream[Any, Throwable, Chunk[Byte]]

Expand All @@ -264,7 +263,7 @@ object Live {
override def onResponse(response: GetObjectResponse): Unit = ()

override def onStream(publisher: SdkPublisher[ByteBuffer]): Unit = {
cf.complete(publisher.toStream().map(Chunk.fromByteBuffer))
cf.complete(publisher.toZIOStream().map(Chunk.fromByteBuffer))
()
}

Expand Down
Loading

0 comments on commit 2589262

Please sign in to comment.