diff --git a/ledger-service/http-json/src/itlib/scala/http/AbstractDatabaseIntegrationTest.scala b/ledger-service/http-json/src/itlib/scala/http/AbstractDatabaseIntegrationTest.scala index baf445ef9c38..483ea9484178 100644 --- a/ledger-service/http-json/src/itlib/scala/http/AbstractDatabaseIntegrationTest.scala +++ b/ledger-service/http-json/src/itlib/scala/http/AbstractDatabaseIntegrationTest.scala @@ -249,4 +249,59 @@ abstract class AbstractDatabaseIntegrationTest extends AsyncFreeSpecLike with Be } } + "retrySqlStates" - { + import dao.logHandler, /*dao.jdbcDriver.retrySqlStates, */ dao.jdbcDriver.q.queries + import com.daml.fetchcontracts.util.AkkaStreamsDoobie.connectionIOFuture + import java.util.{concurrent => juc}, juc.TimeUnit.SECONDS + "detects deadlocks and retries" in { + import dbbackend.Queries.DBContract, spray.json.{JsObject, JsNull, JsValue}, + spray.json.DefaultJsonProtocol._ + val tpId = TemplateId("pkg", "mod", "DRetry") + val preInsert = new juc.CountDownLatch(2) + val beforeCommit = new juc.CountDownLatch(2) + def waitFor(c: juc.CountDownLatch) = connectionIOFuture(Future { + c.countDown() + c.await(5, SECONDS) + }) + + def dbupdates(left: Boolean) = instanceUUIDLogCtx { implicit lc => + for { + _ <- if (left) fconn.pure(()) else waitFor(preInsert) + stid <- queries.surrogateTemplateId(tpId.packageId, tpId.moduleName, tpId.entityName) + _ <- if (left) waitFor(preInsert) else fconn.pure(()) + contract = DBContract( + contractId = "foo", + templateId = stid, + key = JsNull: JsValue, + keyHash = None, + payload = JsObject(): JsValue, + signatories = Seq("Alice"), + observers = Seq.empty, + agreementText = "", + ) + _ <- if (false) queries.insertContracts(List(contract)) else fconn.pure(()) + _ <- if (left) fconn.pure(()) else waitFor(beforeCommit) + _ <- queries.updateOffset(List("Alice"), stid, "fortytwo", Map.empty) + _ <- if (left) waitFor(beforeCommit) else fconn.pure(()) + _ <- fconn.commit + } yield () + } + + def runAsyncProbably[A](ca: fconn.ConnectionIO[A]) = + dao + .transact(fconn.evalOn(implicitly[scala.concurrent.ExecutionContext])(ca)) + .unsafeToFuture() + + dao + .transact(for { + _ <- queries.dropAllTablesIfExist + _ <- queries.initDatabase + _ <- fconn.commit + } yield ()) + .unsafeToFuture() flatMap (_ => + runAsyncProbably(dbupdates(true)) zip runAsyncProbably(dbupdates(false)) + map (_ should ===(((), ()))) + ) + } + } }