Skip to content

Commit

Permalink
update ZioJdbcContext.transaction to keep existing errors and add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
endertunc committed Apr 2, 2023
1 parent e6c55fa commit f34ee1f
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 43 deletions.
47 changes: 24 additions & 23 deletions quill-jdbc-zio/src/main/scala/io/getquill/context/ZioJdbc.scala
Original file line number Diff line number Diff line change
Expand Up @@ -74,58 +74,59 @@ object ZioJdbc {
}
}

implicit class QuillZioDataSourceExt[T](qzio: ZIO[DataSource, Throwable, T]) {
implicit class QuillZioDataSourceExt[T, E](qzio: ZIO[DataSource, E, T]) {

import io.getquill.context.qzio.ImplicitSyntax._
import io.getquill.context.qzio.ImplicitSyntax.*

def implicitDS(implicit implicitEnv: Implicit[DataSource]): ZIO[Any, SQLException, T] =
(for {
def implicitDS(implicit implicitEnv: Implicit[DataSource]): ZIO[Any, E, T] =
for {
q <- qzio.provideEnvironment(ZEnvironment(implicitEnv.env))
} yield q).refineToOrDie[SQLException]
} yield q
}

implicit class QuillZioSomeDataSourceExt[T, R](qzio: ZIO[DataSource with R, Throwable, T])(implicit tag: Tag[R]) {
implicit class QuillZioSomeDataSourceExt[T, E, R](qzio: ZIO[DataSource with R, E, T])(implicit tag: Tag[R]) {

import io.getquill.context.qzio.ImplicitSyntax._
import io.getquill.context.qzio.ImplicitSyntax.*

def implicitSomeDS(implicit implicitEnv: Implicit[DataSource]): ZIO[R, SQLException, T] =
(for {
def implicitSomeDS(implicit implicitEnv: Implicit[DataSource]): ZIO[R, E, T] =
for {
r <- ZIO.environment[R]
q <- qzio
.provideSomeLayer[DataSource](ZLayer.succeedEnvironment(r))
.provideEnvironment(ZEnvironment(implicitEnv.env))
} yield q).refineToOrDie[SQLException]
} yield q
}

implicit class QuillZioExtPlain[E, T](qzio: ZIO[Connection, E, T]) {

import io.getquill.context.qzio.ImplicitSyntax._
import io.getquill.context.qzio.ImplicitSyntax.*

def onDataSource: ZIO[DataSource, E | SQLException, T] = qzio.provideSomeLayer(Quill.Connection.acquireScoped)

def implicitDS(implicit implicitEnv: Implicit[DataSource]): ZIO[Any, E | SQLException, T] = {
val zioWithDataSource: ZIO[DataSource, E | SQLException, T] = qzio.provideSomeLayer(Quill.Connection.acquireScoped)
zioWithDataSource.provideEnvironment(ZEnvironment(implicitEnv.env))
}
def implicitDS(implicit implicitEnv: Implicit[DataSource]): ZIO[Any, E | SQLException, T] =
onDataSource.provideEnvironment(ZEnvironment(implicitEnv.env))
}

implicit class QuillZioExt[T, R](qzio: ZIO[Connection with R, Throwable, T])(implicit tag: Tag[R]) {
implicit class QuillZioExt[T, E, R](qzio: ZIO[Connection with R, E, T])(implicit tag: Tag[R]) {
/**
* Change `Connection` of a QIO to `DataSource with Closeable` by providing a `DataSourceLayer.live` instance
* which will grab a connection from the data-source, perform the QIO operation, and the immediately release the connection.
* This is used for data-sources that have pooled connections e.g. Hikari.
* {{{
* def ds: DataSource with Closeable = ...
* run(query[Person]).onDataSource.provide(Has(ds))
* run(query[Person]).onDataSource.provide(ds)
* }}}
*/
def onSomeDataSource: ZIO[DataSource with R, SQLException, T] =
(for {
def onSomeDataSource: ZIO[DataSource with R, E | SQLException, T] =
for {
r <- ZIO.environment[R]
q <- qzio
.provideSomeLayer[Connection](ZLayer.succeedEnvironment(r))
.provideSomeLayer(Quill.Connection.acquireScoped)
} yield q).refineToOrDie[SQLException]
// This needs to be typed explicitly
z: ZIO[DataSource, E | SQLException, T] =
qzio
.provideSomeLayer[Connection](ZLayer.succeedEnvironment(r))
.provideSomeLayer(Quill.Connection.acquireScoped)
q <- z
} yield q
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import io.getquill.context.sql.idiom.SqlIdiom
import io.getquill.context.{ContextVerbStream, ExecutionInfo, ProtoContextSecundus}
import zio.Exit.{Failure, Success}
import zio.stream.ZStream
import zio.{FiberRef, Runtime, UIO, Unsafe, ZEnvironment, ZIO}
import zio.{FiberRef, Runtime, Scope, UIO, Unsafe, ZEnvironment, ZIO}

import java.sql.{Array as _, *}
import javax.sql.DataSource
Expand All @@ -23,11 +23,11 @@ import zio.ZIO.blocking
* as a resource dependency which can be provided later (see `ZioJdbc` for helper methods
* that assist in doing this).
*
* The resource dependency itself is just a `Has[Connection]`. Since this is frequently used
* The type `QIO[T]` i.e. Quill-IO has been defined as an alias for `ZIO[Has[Connection], SQLException, T]`.
* The resource dependency itself is just a `Connection`. Since this is frequently used
* The type `QIO[T]` i.e. Quill-IO has been defined as an alias for `ZIO[Connection, SQLException, T]`.
*
* Since in most JDBC use-cases, a connection-pool datasource i.e. Hikari is used it would actually
* be much more useful to interact with `ZIO[Has[DataSource], SQLException, T]`.
* be much more useful to interact with `ZIO[DataSource, SQLException, T]`.
* The extension method `.onDataSource` in `io.getquill.context.ZioJdbc.QuillZioExt` will perform this conversion
* (for even more brevity use `onDS` which is an alias for this method).
* {{
Expand All @@ -41,7 +41,7 @@ import zio.ZIO.blocking
* Runtime.default.unsafeRun(MyZioContext.run(query[Person]).ContextTranslateProtoprovideLayer(zioDS))
* }}
*
* Note however that the one exception to these cases are the `prepare` methods where a `ZIO[Has[Connection], SQLException, PreparedStatement]`
* Note however that the one exception to these cases are the `prepare` methods where a `ZIO[Connection, SQLException, PreparedStatement]`
* is being returned. In those situations the acquire-action-release pattern does not make any sense because the `PrepareStatement`
* is only held open while it's host-connection exists.
*/
Expand Down Expand Up @@ -162,12 +162,12 @@ abstract class ZioJdbcContext[+Dialect <: SqlIdiom, +Naming <: NamingStrategy] e
* Execute instructions in a transaction. For example, to add a Person row to the database and return
* the contents of the Person table immediately after that:
* {{{
* val a = run(query[Person].insert(Person(...)): ZIO[Has[DataSource], SQLException, Long]
* val b = run(query[Person]): ZIO[Has[DataSource], SQLException, Person]
* transaction(a *> b): ZIO[Has[DataSource], SQLException, Person]
* val a = run(query[Person].insert(Person(...)): ZIO[DataSource, SQLException, Long]
* val b = run(query[Person]): ZIO[DataSource, SQLException, Person]
* transaction(a *> b): ZIO[DataSource, SQLException, Person]
* }}}
*
* The order of operations run in the case that a new connection needs to be aquired are as follows:
* The order of operations run in the case that a new connection needs to be acquired are as follows:
* <pre>
* getDS from env,
* acquire-connection,
Expand All @@ -179,14 +179,14 @@ abstract class ZioJdbcContext[+Dialect <: SqlIdiom, +Naming <: NamingStrategy] e
* release-conn
* </pre>
*/
def transaction[R <: DataSource, A](op: ZIO[R, Throwable, A]): ZIO[R, Throwable, A] = {
def transaction[R <: DataSource, E, A](op: ZIO[R, E, A]): ZIO[R, E | SQLException, A] = {
blocking(currentConnection.get.flatMap {
// We can just return the op in the case that there is already a connection set on the fiber ref
// because the op is execute___ which will lookup the connection from the fiber ref via onConnection/onConnectionStream
// This will typically happen for nested transactions e.g. transaction(transaction(a *> b) *> c)
case Some(connection) => op
case Some(_) => op
case None =>
val connection = for {
val connection: ZIO[Scope with DataSource, SQLException, Unit] = (for {
env <- ZIO.service[DataSource]
connection <- scopedBestEffort(attemptBlocking(env.getConnection))
// Get the current value of auto-commit
Expand All @@ -197,7 +197,7 @@ abstract class ZioJdbcContext[+Dialect <: SqlIdiom, +Naming <: NamingStrategy] e
attemptBlocking(connection.setAutoCommit(prevAutoCommit)).orDie
}
_ <- ZIO.acquireRelease(currentConnection.set(Some(connection))) { _ =>
// Note. We are failing the fiber if auto-commit reset fails. For some circumstances this may be too aggresive.
// Note. We are failing the fiber if auto-commit reset fails. For some circumstances this may be too aggressive.
// If the connection pool e.g. Hikari resets this property for a recycled connection anyway doing it here
// might not be necessary
currentConnection.set(None)
Expand All @@ -207,7 +207,7 @@ abstract class ZioJdbcContext[+Dialect <: SqlIdiom, +Naming <: NamingStrategy] e
case Success(_) => blocking(ZIO.succeed(connection.commit()))
case Failure(cause) => blocking(ZIO.succeed(connection.rollback()))
}
} yield ()
} yield ()).refineToOrDie[SQLException]

ZIO.scoped(connection *> op)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,17 +111,17 @@ trait QuillBaseContext[+Dialect <: SqlIdiom, +Naming <: NamingStrategy] extends
* Execute instructions in a transaction. For example, to add a Person row to the database and return
* the contents of the Person table immediately after that:
* {{{
* val a = run(query[Person].insert(Person(...)): ZIO[Has[DataSource], SQLException, Long]
* val b = run(query[Person]): ZIO[Has[DataSource], SQLException, Person]
* transaction(a *> b): ZIO[Has[DataSource], SQLException, Person]
* val a = run(query[Person].insert(Person(...)): ZIO[DataSource, SQLException, Long]
* val b = run(query[Person]): ZIO[DataSource, SQLException, Person]
* transaction(a *> b): ZIO[DataSource, SQLException, Person]
* }}}
*/
def transaction[R, A](op: ZIO[R, Throwable, A]): ZIO[R, Throwable, A] =
def transaction[R, E ,A](op: ZIO[R, E, A]): ZIO[R, E | SQLException, A] =
dsDelegate.transaction(op).provideSomeEnvironment[R]((env: ZEnvironment[R]) => env.add[DataSource](ds: DataSource))

private def onDS[T](qio: ZIO[DataSource, SQLException, T]): ZIO[Any, SQLException, T] =
private def onDS[T, E](qio: ZIO[DataSource, E, T]): ZIO[Any, E, T] =
qio.provideEnvironment(ZEnvironment(ds))

private def onDSStream[T](qstream: ZStream[DataSource, SQLException, T]): ZStream[Any, SQLException, T] =
private def onDSStream[T, E](qstream: ZStream[DataSource, E, T]): ZStream[Any, E, T] =
qstream.provideEnvironment(ZEnvironment(ds))
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import zio.{ ZIO, ZLayer }
import io.getquill.context.ZioJdbc._
import io.getquill._

import java.sql.Connection
import javax.sql.DataSource

class OnDataSourceSpec extends PeopleZioProxySpec {
Expand Down Expand Up @@ -56,6 +57,15 @@ class OnDataSourceSpec extends PeopleZioProxySpec {

people mustEqual peopleEntries.filter(p => p.name == "Alex")
}
"should keep existing errors" in {
// This is how you import the encoders/decoders of `underlying` context without importing things that will conflict
// i.e. the quote and run methods
import testContext.underlying.{prepare => _, run => _, _}
import java.sql.SQLException

val zioThatCanFail: ZIO[Connection, String, Nothing] = ZIO.service[Connection] *> ZIO.fail("Custom Error")
"val result: ZIO[DataSource, String | SQLException, List[Person]] = zioThatCanFail.onDataSource" should compile
}
}

"implicitDS on underlying context" - {
Expand Down Expand Up @@ -99,5 +109,19 @@ class OnDataSourceSpec extends PeopleZioProxySpec {
svc <- ZIO.attempt(Service(ds))
} yield (svc.people)).runSyncUnsafe() mustEqual peopleEntries.filter(p => p.name == "Alex")
}
"should keep existing errors" in {
// This is how you import the encoders/decoders of `underlying` context without importing things that will conflict
// i.e. the quote and run methods
import testContext.underlying.{prepare => _, run => _, _}
import java.sql.SQLException

(for {
ds <- ZIO.service[DataSource]
given Implicit[DataSource] = Implicit(ds)
} yield {
val zioThatCanFail: ZIO[DataSource, String, Nothing] = ZIO.service[DataSource] *> ZIO.fail("Custom Error")
"val result: ZIO[DataSource, String | SQLException, List[Person]] = zioThatCanFail.implicitDS" should compile
}).runSyncUnsafe()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ class ZioJdbcUnderlyingContextSpec extends ZioProxySpec {
r <- testContext.underlying.run(qr1)
} yield r).onDataSource.runSyncUnsafe().map(_.i) mustEqual List(33)
}
"keep existing errors" in {
import java.sql.{Connection, SQLException}

val zioThatCanFail: ZIO[Any, String, Nothing] = ZIO.fail("Custom Error")
"val result: ZIO[Connection, String | SQLException, List[TestEntity]] = testContext.underlying.transaction(zioThatCanFail *> testContext.underlying.run(qr1))" must compile
}
// "prepare" in {
// testContext.underlying.prepareParams(
// "select * from Person where name=? and age > ?", (ps, session) => (List("Sarah", 127), ps)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,9 @@ class ZioJdbcContextSpec extends ZioSpec {
"select * from Person where name=? and age > ?", (ps, session) => (List("Sarah", 127), ps)
).runSyncUnsafe() mustEqual List("127", "'Sarah'")
}
"keep existing errors" in {
val zioThatCanFail: ZIO[Any, String, Nothing] = ZIO.fail("Custom Error")
"val result: ZIO[Any, String | SQLException, List[TestEntity]] = testContext.transaction(zioThatCanFail *> testContext.run(qr1))" must compile
}
}
}

0 comments on commit f34ee1f

Please sign in to comment.