diff --git a/ledger-service/db-backend/src/main/scala/com/digitalasset/http/dbbackend/Queries.scala b/ledger-service/db-backend/src/main/scala/com/digitalasset/http/dbbackend/Queries.scala index 648345408d4b..258748778599 100644 --- a/ledger-service/db-backend/src/main/scala/com/digitalasset/http/dbbackend/Queries.scala +++ b/ledger-service/db-backend/src/main/scala/com/digitalasset/http/dbbackend/Queries.scala @@ -32,7 +32,7 @@ sealed abstract class Queries { type SqlInterpol - protected[this] def dropTableIfExists(table: String): Fragment + protected[this] def dropIfExists(drop: Droppable): Fragment /** for use when generating predicates */ protected[this] val contractColumnName: Fragment = sql"payload" @@ -101,7 +101,7 @@ sealed abstract class Queries { private[http] def dropAllTablesIfExist(implicit log: LogHandler): ConnectionIO[Unit] = { import cats.instances.vector._, cats.syntax.foldable.{toFoldableOps => ToFoldableOps} initDatabaseDdls.reverse - .collect { case CreateTable(name, _) => dropTableIfExists(name) } + .collect { case d: Droppable => dropIfExists(d) } .traverse_(_.update.run) } @@ -329,8 +329,15 @@ object Queries { } private[dbbackend] object InitDdl { - final case class CreateTable(name: String, create: Fragment) extends InitDdl + // something that we must drop to re-create the DB + sealed abstract class Droppable(val what: String) extends InitDdl { + val name: String + } + final case class CreateTable(name: String, create: Fragment) extends Droppable("TABLE") + final case class CreateMaterializedView(name: String, create: Fragment) + extends Droppable("MATERIALIZED VIEW") final case class CreateIndex(create: Fragment) extends InitDdl + final case class DoMagicSetup(create: Fragment) extends InitDdl } /** Whether selectContractsMultiTemplate computes a matchedQueries marker, @@ -449,13 +456,13 @@ object Queries { } private object PostgresQueries extends Queries { - import Queries._, Queries.InitDdl.CreateIndex + import Queries._, Queries.InitDdl.{Droppable, CreateIndex} import Implicits._ type SqlInterpol = Queries.SqlInterpolation.StringArray - protected[this] override def dropTableIfExists(table: String) = - Fragment.const(s"DROP TABLE IF EXISTS ${table}") + protected[this] override def dropIfExists(d: Droppable) = + Fragment.const(s"DROP ${d.what} IF EXISTS ${d.name}") protected[this] override def bigIntType = sql"BIGINT" protected[this] override def bigSerialType = sql"BIGSERIAL" @@ -581,26 +588,32 @@ private object PostgresQueries extends Queries { } private object OracleQueries extends Queries { - import Queries._ + import Queries._, InitDdl._ import Implicits._ type SqlInterpol = Queries.SqlInterpolation.Unused - protected[this] override def dropTableIfExists(table: String) = sql"""BEGIN - EXECUTE IMMEDIATE 'DROP TABLE ' || $table; + protected[this] override def dropIfExists(d: Droppable) = { + val sqlCode = d match { + case _: CreateTable => -942 + case _: CreateMaterializedView => -12003 + } + sql"""BEGIN + EXECUTE IMMEDIATE ${s"DROP ${d.what} ${d.name}"}; EXCEPTION WHEN OTHERS THEN - IF SQLCODE != -942 THEN + IF SQLCODE != $sqlCode THEN RAISE; END IF; END;""" + } protected[this] override def bigIntType = sql"NUMBER(19,0)" protected[this] override def bigSerialType = bigIntType ++ sql" GENERATED ALWAYS AS IDENTITY" protected[this] override def textType = sql"NVARCHAR2(100)" protected[this] override def packageIdType = sql"NVARCHAR2(64)" - protected[this] override def partyOffsetContractIdType = sql"NVARCHAR2(255)" + protected[this] override def partyOffsetContractIdType = sql"VARCHAR2(255)" // if >=1578: ORA-01450: maximum key length (6398) exceeded protected[this] override def nameType = sql"NVARCHAR2(1562)" protected[this] override def agreementTextType = sql"NCLOB" @@ -610,10 +623,30 @@ private object OracleQueries extends Queries { protected[this] override def contractsTableSignatoriesObservers = sql""" - ,""" ++ jsonColumn(sql"signatories") ++ sql""" - ,""" ++ jsonColumn(sql"observers") ++ sql""" + ,${jsonColumn(sql"signatories")} + ,${jsonColumn(sql"observers")} """ + private[this] def stakeholdersPrep = DoMagicSetup( + sql"""CREATE MATERIALIZED VIEW LOG ON contract""" + ) + + private[this] def stakeholdersView = CreateMaterializedView( + "contract_stakeholders", + sql"""CREATE MATERIALIZED VIEW contract_stakeholders + BUILD IMMEDIATE REFRESH FAST ON COMMIT AS + SELECT contract_id, stakeholder FROM contract, + json_table(json_array(signatories, observers), '$$[*][*]' + columns (stakeholder $partyType path '$$'))""", + ) + + private[this] def stakeholdersIndex = CreateIndex( + sql"""CREATE INDEX stakeholder_idx ON contract_stakeholders (stakeholder)""" + ) + + protected[this] override def initDatabaseDdls = + super.initDatabaseDdls ++ Seq(stakeholdersPrep, stakeholdersView, stakeholdersIndex) + protected[this] type DBContractKey = JsValue protected[this] override def toDBContractKey[CK: JsonWriter](x: CK): DBContractKey = @@ -660,16 +693,19 @@ private object OracleQueries extends Queries { fr" OR ", ) } - val quotedParties = parties.toVector.map(p => s""""$p"""").mkString(", ") - val partiesQuery = oracleShortPathEscape( - '$' -: ("[*]?(@ in (": Cord) :+ quotedParties :+ "))" - ) - val q = - sql"""SELECT c.contract_id contract_id, $tpid template_id, key, payload, signatories, observers, agreement_text + import Queries.CompatImplicits.catsReducibleFromFoldable1 + val outerSelectList = + sql"""contract_id, template_id, key, payload, + signatories, observers, agreement_text""" + val dupQ = + sql"""SELECT c.contract_id contract_id, $tpid template_id, key, payload, + signatories, observers, agreement_text, + row_number() over (PARTITION BY c.contract_id ORDER BY c.contract_id) AS rownumber FROM contract c - WHERE (JSON_EXISTS(signatories, $partiesQuery) - OR JSON_EXISTS(observers, $partiesQuery)) - AND $queriesCondition""" + LEFT JOIN contract_stakeholders cst ON (c.contract_id = cst.contract_id) + WHERE (${Fragments.in(fr"cst.stakeholder", parties)}) + AND ($queriesCondition)""" + val q = sql"SELECT $outerSelectList FROM ($dupQ) WHERE rownumber = 1" q.query[ (String, Mark0, JsValue, JsValue, JsValue, JsValue, Option[String]) ].map { case (cid, tpid, key, payload, signatories, observers, agreement) => diff --git a/ledger-service/http-json/src/main/scala/com/digitalasset/http/ContractsFetch.scala b/ledger-service/http-json/src/main/scala/com/digitalasset/http/ContractsFetch.scala index ca72ca9aa6a2..8517a7e55a4c 100644 --- a/ledger-service/http-json/src/main/scala/com/digitalasset/http/ContractsFetch.scala +++ b/ledger-service/http-json/src/main/scala/com/digitalasset/http/ContractsFetch.scala @@ -139,11 +139,11 @@ private class ContractsFetch( lc: LoggingContextOf[InstanceUUID], ): ConnectionIO[BeginBookmark[domain.Offset]] = { - import doobie.implicits._ + import doobie.implicits._, cats.syntax.apply._ def loop(maxAttempts: Int): ConnectionIO[BeginBookmark[domain.Offset]] = { logger.debug(s"contractsIo, maxAttempts: $maxAttempts") - contractsIo_(jwt, parties, disableAcs, absEnd, templateId).exceptSql { + (contractsIo_(jwt, parties, disableAcs, absEnd, templateId) <* fconn.commit).exceptSql { case e if maxAttempts > 0 && retrySqlStates(e.getSQLState) => logger.debug(s"contractsIo, exception: ${e.description}, state: ${e.getSQLState}") fconn.rollback flatMap (_ => loop(maxAttempts - 1)) diff --git a/libs-scala/oracle-testing/src/main/scala/testing/oracle/OracleAround.scala b/libs-scala/oracle-testing/src/main/scala/testing/oracle/OracleAround.scala index 9dcb23d68db8..1c3c5bebb716 100644 --- a/libs-scala/oracle-testing/src/main/scala/testing/oracle/OracleAround.scala +++ b/libs-scala/oracle-testing/src/main/scala/testing/oracle/OracleAround.scala @@ -48,7 +48,7 @@ trait OracleAround { stmt.execute(s"""create user $name identified by $pwd""") stmt.execute(s"""grant connect, resource to $name""") stmt.execute( - s"""grant create table, create view, create procedure, create sequence, create type to $name""" + s"""grant create table, create materialized view, create view, create procedure, create sequence, create type to $name""" ) stmt.execute(s"""alter user $name quota unlimited on users""") }.get