-
Notifications
You must be signed in to change notification settings - Fork 31
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: implement a presignGetObject
method
#500
base: series/2.x
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,7 +22,7 @@ jobs: | |
with: | ||
fetch-depth: 0 | ||
- name: Setup Scala and Java | ||
uses: olafurpg/setup-scala@v13 | ||
uses: coursier/setup-action@v1 | ||
- name: Cache scala dependencies | ||
uses: coursier/cache-action@v6 | ||
- name: Lint code | ||
|
@@ -35,7 +35,7 @@ jobs: | |
- name: Checkout current branch | ||
uses: actions/[email protected] | ||
- name: Setup Scala and Java | ||
uses: olafurpg/setup-scala@v13 | ||
uses: coursier/setup-action@v1 | ||
- name: Cache scala dependencies | ||
uses: coursier/cache-action@v6 | ||
- name: Check Document Generation | ||
|
@@ -55,7 +55,7 @@ jobs: | |
with: | ||
fetch-depth: 0 | ||
- name: Setup Scala and Java | ||
uses: olafurpg/setup-scala@v13 | ||
uses: coursier/setup-action@v1 | ||
with: | ||
java-version: ${{ matrix.java }} | ||
- name: Cache scala dependencies | ||
|
@@ -72,16 +72,16 @@ jobs: | |
strategy: | ||
fail-fast: false | ||
matrix: | ||
java: ['17', '21'] | ||
java: ['corretto:17.0.11.9.1', 'corretto:21.0.3.9.1'] | ||
steps: | ||
- name: Checkout current branch | ||
uses: actions/[email protected] | ||
with: | ||
fetch-depth: 0 | ||
- name: Setup Scala and Java | ||
uses: olafurpg/setup-scala@v13 | ||
uses: coursier/setup-action@v1 | ||
with: | ||
java-version: ${{ matrix.java }} | ||
jvm: ${{ matrix.java }} | ||
- name: Cache scala dependencies | ||
uses: coursier/cache-action@v6 | ||
- name: Start containers | ||
|
@@ -108,7 +108,7 @@ jobs: | |
with: | ||
fetch-depth: 0 | ||
- name: Setup Scala and Java | ||
uses: olafurpg/setup-scala@v13 | ||
uses: coursier/setup-action@v1 | ||
- name: Cache scala dependencies | ||
uses: coursier/cache-action@v6 | ||
- name: Release artifacts | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,9 @@ import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider | |
import software.amazon.awssdk.core.async.{ AsyncRequestBody, AsyncResponseTransformer, SdkPublisher } | ||
import software.amazon.awssdk.core.exception.SdkException | ||
import software.amazon.awssdk.services.s3.model._ | ||
import software.amazon.awssdk.services.s3.{ S3AsyncClient, S3AsyncClientBuilder } | ||
import software.amazon.awssdk.services.s3.presigner.S3Presigner | ||
import software.amazon.awssdk.services.s3.presigner.model.{ GetObjectPresignRequest, PresignedGetObjectRequest } | ||
import software.amazon.awssdk.services.s3.{ S3AsyncClient, S3AsyncClientBuilder, S3Configuration } | ||
import zio._ | ||
import zio.interop.reactivestreams._ | ||
import zio.s3.Live.{ StreamAsyncResponseTransformer, StreamResponse } | ||
|
@@ -39,7 +41,7 @@ import scala.jdk.CollectionConverters._ | |
* | ||
* @param unsafeClient: Amazon Async S3 Client | ||
*/ | ||
final class Live(unsafeClient: S3AsyncClient) extends S3 { | ||
final class Live(unsafeClient: S3AsyncClient, s3Presigner: S3Presigner) extends S3 { | ||
|
||
override def createBucket(bucketName: String): IO[S3Exception, Unit] = | ||
execute(_.createBucket(CreateBucketRequest.builder().bucket(bucketName).build())).unit | ||
|
@@ -149,6 +151,29 @@ final class Live(unsafeClient: S3AsyncClient) extends S3 { | |
} | ||
.unit | ||
|
||
/** | ||
* This method generates a `PresignedGetObjectRequest` containing an URL that allows the object to be downloaded | ||
* without any credentials. Signature happens locally, it does not issue any network call. It's not a pure | ||
* computation neither as it performs an IO by looking at the current time. | ||
* | ||
* See https://web.archive.org/web/20240621234636/https://docs.aws.amazon.com/AmazonS3/latest/userguide/ShareObjectPreSignedURL.html | ||
* for further details regarding signature with S3. | ||
*/ | ||
override def presignGetObject( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i dont think this code should be in the client. since there is nothing related to this. you could move this into the extra utils function
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, the reason why we chose to put it in the client is that the configuration required is very similiar to the S3 client one. |
||
bucketName: String, | ||
key: String, | ||
signatureDuration: Duration | ||
): IO[S3Exception, PresignedGetObjectRequest] = | ||
ZIO.succeed { | ||
s3Presigner.presignGetObject( | ||
GetObjectPresignRequest | ||
.builder() | ||
.signatureDuration(signatureDuration) | ||
.getObjectRequest(GetObjectRequest.builder().bucket(bucketName).key(key).build()) | ||
.build() | ||
) | ||
} | ||
|
||
def multipartUpload[R]( | ||
bucketName: String, | ||
key: String, | ||
|
@@ -227,6 +252,7 @@ final class Live(unsafeClient: S3AsyncClient) extends S3 { | |
case s3: S3Exception => s3 | ||
case sdk: SdkException => SdkError(sdk) | ||
} | ||
|
||
} | ||
|
||
object Live { | ||
|
@@ -238,23 +264,39 @@ object Live { | |
forcePathStyle: Option[Boolean] = None | ||
): ZIO[R with Scope, ConnectionError, S3] = | ||
for { | ||
credentials <- provider.mapError(e => ConnectionError(e.getMessage, e.getCause)) | ||
builder <- ZIO.succeed { | ||
val builder = S3AsyncClient | ||
.builder() | ||
.credentialsProvider(credentials) | ||
.region(region.region) | ||
uriEndpoint.foreach(builder.endpointOverride) | ||
forcePathStyle.foreach(builder.forcePathStyle(_)) | ||
builder | ||
} | ||
service <- connect(builder) | ||
credentials <- provider.mapError(e => ConnectionError(e.getMessage, e.getCause)) | ||
builder <- ZIO.succeed { | ||
val builder = S3AsyncClient | ||
.builder() | ||
.credentialsProvider(credentials) | ||
.region(region.region) | ||
uriEndpoint.foreach(builder.endpointOverride) | ||
forcePathStyle.foreach(builder.forcePathStyle(_)) | ||
builder | ||
} | ||
s3PresignerBuilder <- ZIO.succeed { | ||
val builder = S3Presigner | ||
.builder() | ||
.credentialsProvider(credentials) | ||
.region(region.region) | ||
.serviceConfiguration(S3Configuration.builder().pathStyleAccessEnabled(true).build()) | ||
uriEndpoint.foreach(builder.endpointOverride) | ||
builder | ||
} | ||
service <- connect(builder, s3PresignerBuilder) | ||
} yield service | ||
|
||
def connect[R](builder: S3AsyncClientBuilder): ZIO[R with Scope, ConnectionError, S3] = | ||
ZIO | ||
.fromAutoCloseable(ZIO.attempt(builder.build())) | ||
.mapBoth(e => ConnectionError(e.getMessage, e.getCause), new Live(_)) | ||
def connect[R]( | ||
builder: S3AsyncClientBuilder, | ||
s3PresignerBuilder: S3Presigner.Builder | ||
): ZIO[R with Scope, ConnectionError, S3] = | ||
for { | ||
s3Client <- ZIO | ||
.fromAutoCloseable(ZIO.attempt(builder.build())) | ||
.mapError(e => ConnectionError(e.getMessage, e.getCause)) | ||
s3Presigner <- ZIO.fromAutoCloseable(ZIO.succeed(s3PresignerBuilder.build())) | ||
|
||
} yield new Live(s3Client, s3Presigner) | ||
|
||
type StreamResponse = ZStream[Any, Throwable, Chunk[Byte]] | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
this is a log file | ||
on multi line |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
package zio.s3 | ||
|
||
import com.dimafeng.testcontainers.MinIOContainer | ||
import org.testcontainers.utility.DockerImageName | ||
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials | ||
import software.amazon.awssdk.regions.Region | ||
import software.amazon.awssdk.services.s3.model.S3Exception | ||
import sttp.client3 | ||
import sttp.client3.httpclient.zio.HttpClientZioBackend | ||
import sttp.client3.{ SttpBackend, basicRequest } | ||
import sttp.model.Uri | ||
import zio.stream.ZStream | ||
import zio.test.Assertion._ | ||
import zio.test._ | ||
import zio.{ Chunk, Scope, Task, ZIO, ZLayer, durationInt, s3 } | ||
|
||
import java.net.{ URI, URL } | ||
|
||
object S3PresignedTest extends ZIOSpecDefault { | ||
|
||
val bucketName = "initial-bucket" | ||
val objectKey = "path/to/object" | ||
|
||
val spec = (suite("S3Presigned")( | ||
test("presignGetObject should return an url") { | ||
for { | ||
s3 <- ZIO.service[S3] | ||
url <- s3.presignGetObject(bucketName, objectKey, 1.minute) | ||
} yield assert(url.url.toExternalForm)( | ||
containsString(bucketName) && | ||
containsString(objectKey) && | ||
containsString("X-Amz-Expires=60") && | ||
containsString("X-Amz-Signature") | ||
) | ||
}, | ||
test("presignGetObject url should be downloadable") { | ||
|
||
for { | ||
s3 <- ZIO.service[S3] | ||
fileContent <- putObject(bucketName, "my-new-key") | ||
url <- s3.presignGetObject(bucketName, "my-new-key", 1.minute) | ||
actual <- getUrlContent(url.url) | ||
expected <- fileContent.runCollect | ||
} yield assert(Chunk.fromArray(actual))(equalTo(expected)) | ||
} | ||
) @@ TestAspect.before(s3.createBucket(bucketName) *> putObject(bucketName, objectKey))) | ||
.provideSome[Scope](minio, zioS3Layer, HttpClientZioBackend.layer()) | ||
|
||
def getUrlContent(url: URL): ZIO[SttpBackend[Task, Any], Throwable, Array[Byte]] = | ||
for { | ||
url <- ZIO.fromEither(Uri.parse(url.toExternalForm)).orDieWith(e => new Throwable(e)) | ||
request = basicRequest.response(client3.asByteArrayAlways).get(url) | ||
result <- ZIO.serviceWithZIO[SttpBackend[Task, Any]](_.send(request)).map(_.body) | ||
} yield result | ||
|
||
def putObject(bucketName: String, key: String): ZIO[S3, Throwable, ZStream[Any, Throwable, Byte]] = { | ||
val filePath = ClassLoader.getSystemResource("console.log").getFile | ||
val fileContent = ZStream.fromFileName(filePath) | ||
for { | ||
fileLength <- fileContent.runCount | ||
_ <- s3.putObject(bucketName = bucketName, key = key, contentLength = fileLength, content = fileContent) | ||
} yield fileContent | ||
} | ||
|
||
def zioS3Layer: ZLayer[MinIOContainer, S3Exception, S3] = | ||
ZLayer { | ||
for { | ||
minio <- ZIO.service[MinIOContainer] | ||
} yield s3.live( | ||
region = Region.US_WEST_1, | ||
credentials = AwsBasicCredentials.create(minio.userName, minio.password), | ||
uriEndpoint = Some(URI.create(minio.s3URL)), | ||
forcePathStyle = Some(true) | ||
) | ||
}.flatten | ||
|
||
def minio: ZLayer[Scope, Nothing, MinIOContainer] = | ||
ZLayer { | ||
ZIO | ||
.attemptBlocking( | ||
new MinIOContainer(dockerImageName = DockerImageName.parse("minio/minio:RELEASE.2024-06-22T05-26-45Z")) | ||
) | ||
.tap(minio => ZIO.attemptBlockingIO(minio.start())) | ||
.withFinalizerAuto | ||
}.orDie | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please remove test container, we already have a minio in place with docker compose
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi Regis,
Actually we find that handy to not have a launch containers by hand before running tests.
Could you elaborate on why you don't like it, please?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For multiple reason: slow down the execution, more code to add in unit-test, more librairies dependencies etc...
Prefer to stick to docker-compose, since it works pretty well.
thank you