From 56531e86105bea38fbe9633b24227a37983abafc Mon Sep 17 00:00:00 2001 From: Stephen Compall Date: Fri, 12 Nov 2021 12:05:58 -0500 Subject: [PATCH] Backport: Support deletion of a large number of contracts (#11646) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Support deletion of a large number of contracts (#10353) Backport of #10353 / 9c9b91eca7d5e8e9d391dec8000868ca4300f736. * Support deletion of a large number of contracts fixes #10339 There are two orthogonal issues here: 1. scalaz’s toVector from the Foldable[Set] instance stackoverflows. I’ve just avoided using that altogether. 2. Oracle doesn’t like more than 1k items in the IN clause. I chunked the queries into chunks of size 1k to fix this. changelog_begin - [JSON API] Fix an error where transactions that delete a large number of contracts resulted in stackoverflows with the PostgreSQL backend and database errors with Oracle. changelog_end * fix benchmark changelog_begin changelog_end * Update ledger-service/db-backend/src/main/scala/com/digitalasset/http/dbbackend/Queries.scala Co-authored-by: Stephen Compall * Update ledger-service/db-backend/src/main/scala/com/digitalasset/http/dbbackend/Queries.scala Co-authored-by: Stephen Compall * that's not how you foldA changelog_begin changelog_end Co-authored-by: Stephen Compall * backport past some moves and type parameter adds * backport encodeCreateAndExerciseCommand from #9837 / 25b7e544ceba Co-authored-by: Moritz Kiefer --- .../digitalasset/http/dbbackend/Queries.scala | 37 ++++++-- .../daml/http/HttpServiceTestFixture.scala | 2 + .../http-json/src/it/daml/Account.daml | 20 ++++ .../AbstractHttpServiceIntegrationTest.scala | 95 +++++++++++++++++++ .../digitalasset/http/ContractsFetch.scala | 2 +- .../http/json/DomainJsonEncoder.scala | 17 ++++ 6 files changed, 163 insertions(+), 10 deletions(-) diff --git a/ledger-service/db-backend/src/main/scala/com/digitalasset/http/dbbackend/Queries.scala b/ledger-service/db-backend/src/main/scala/com/digitalasset/http/dbbackend/Queries.scala index e4d40b63ac1b..33e1f045a126 100644 --- a/ledger-service/db-backend/src/main/scala/com/digitalasset/http/dbbackend/Queries.scala +++ b/ledger-service/db-backend/src/main/scala/com/digitalasset/http/dbbackend/Queries.scala @@ -10,7 +10,7 @@ import nonempty.NonEmptyReturningOps._ import doobie._ import doobie.implicits._ import scala.collection.immutable.{Iterable, Seq => ISeq} -import scalaz.{@@, Foldable, Functor, OneAnd, Tag} +import scalaz.{@@, Functor, OneAnd, Tag} import scalaz.Id.Id import scalaz.syntax.foldable._ import scalaz.syntax.functor._ @@ -72,6 +72,9 @@ sealed abstract class Queries { protected[this] def textType: Fragment protected[this] def agreementTextType: Fragment + // The max list size that can be used in `IN` clauses + protected[this] def maxListSize: Option[Int] + protected[this] def jsonColumn(name: Fragment): Fragment private[this] val createTemplateIdsTable = CreateTable( @@ -214,15 +217,26 @@ sealed abstract class Queries { dbcs: F[DBContract[SurrogateTpId, DBContractKey, JsValue, Array[String]]] )(implicit log: LogHandler, pas: Put[Array[String]]): ConnectionIO[Int] - final def deleteContracts[F[_]: Foldable]( - cids: F[String] + final def deleteContracts( + cids: Set[String] )(implicit log: LogHandler): ConnectionIO[Int] = { - cids.toVector match { - case Vector(hd, tl @ _*) => - (sql"DELETE FROM contract WHERE contract_id IN (" - ++ concatFragment(OneAnd(sql"$hd", tl.toIndexedSeq map (cid => sql", $cid"))) - ++ sql")").update.run - case _ => free.connection.pure(0) + import cats.data.NonEmptyVector + import cats.instances.vector._ + import cats.instances.int._ + import cats.syntax.foldable._ + NonEmptyVector.fromVector(cids.toVector) match { + case None => + free.connection.pure(0) + case Some(cids) => + val chunks = maxListSize.fold(Vector(cids)) { size => + require(size >= 1, s"size=$size but must be positive") + cids.toVector.grouped(size).map(NonEmptyVector.fromVectorUnsafe).toVector + } + chunks + .map(chunk => + (fr"DELETE FROM contract WHERE " ++ Fragments.in(fr"contract_id", chunk)).update.run + ) + .foldA } } @@ -394,6 +408,8 @@ private object PostgresQueries extends Queries { protected[this] override def jsonColumn(name: Fragment) = name ++ sql" JSONB NOT NULL" + protected[this] override val maxListSize = None + private[this] val indexContractsKeys = CreateIndex(sql""" CREATE INDEX contract_tpid_key_idx ON contract USING BTREE (tpid, key) """) @@ -496,6 +512,9 @@ private object OracleQueries extends Queries { protected[this] override def jsonColumn(name: Fragment) = name ++ sql" CLOB NOT NULL CONSTRAINT ensure_json_" ++ name ++ sql" CHECK (" ++ name ++ sql" IS JSON)" + // See http://www.dba-oracle.com/t_ora_01795_maximum_number_of_expressions_in_a_list_is_1000.htm + protected[this] override def maxListSize = Some(1000) + protected[this] override def contractsTableSignatoriesObservers = sql"" private val createSignatoriesTable = CreateTable( diff --git a/ledger-service/http-json-testing/src/main/scala/com/daml/http/HttpServiceTestFixture.scala b/ledger-service/http-json-testing/src/main/scala/com/daml/http/HttpServiceTestFixture.scala index fa6676807e8f..46dedc805fb0 100644 --- a/ledger-service/http-json-testing/src/main/scala/com/daml/http/HttpServiceTestFixture.scala +++ b/ledger-service/http-json-testing/src/main/scala/com/daml/http/HttpServiceTestFixture.scala @@ -65,6 +65,7 @@ object HttpServiceTestFixture extends LazyLogging with Assertions with Inside { jdbcConfig: Option[JdbcConfig], staticContentConfig: Option[StaticContentConfig], leakPasswords: LeakPasswords = LeakPasswords.FiresheepStyle, + maxInboundMessageSize: Int = HttpService.DefaultMaxInboundMessageSize, useTls: UseTls = UseTls.NoTls, wsConfig: Option[WebsocketConfig] = None, )(testFn: (Uri, DomainJsonEncoder, DomainJsonDecoder, LedgerClient) => Future[A])(implicit @@ -89,6 +90,7 @@ object HttpServiceTestFixture extends LazyLogging with Assertions with Inside { tlsConfig = if (useTls) clientTlsConfig else noTlsConfig, wsConfig = wsConfig, accessTokenFile = None, + maxInboundMessageSize = maxInboundMessageSize, allowNonHttps = leakPasswords, staticContentConfig = staticContentConfig, packageReloadInterval = doNotReloadPackages, diff --git a/ledger-service/http-json/src/it/daml/Account.daml b/ledger-service/http-json/src/it/daml/Account.daml index f48d1d456e2e..b2b710035e36 100644 --- a/ledger-service/http-json/src/it/daml/Account.daml +++ b/ledger-service/http-json/src/it/daml/Account.daml @@ -4,6 +4,8 @@ module Account where +import DA.Foldable + data AccountStatus = Enabled Time | Disabled Time deriving (Eq, Show) @@ -33,3 +35,21 @@ template KeyedByVariantAndRecord with signatory party key (party, fooVariant, bazRecord): (Party, Foo, BazRecord) maintainer key._1 + +template Helper + with + owner : Party + where + signatory owner + choice CreateN : [ContractId Account] + with + n : Int + controller owner + do t <- getTime + mapA (\i -> create (Account owner (show i) (Enabled t))) [1 .. n] + + choice ArchiveAll : () + with + cids : [ContractId Account] + controller owner + do mapA_ archive cids diff --git a/ledger-service/http-json/src/itlib/scala/http/AbstractHttpServiceIntegrationTest.scala b/ledger-service/http-json/src/itlib/scala/http/AbstractHttpServiceIntegrationTest.scala index 90c9325c9c38..85666f562c96 100644 --- a/ledger-service/http-json/src/itlib/scala/http/AbstractHttpServiceIntegrationTest.scala +++ b/ledger-service/http-json/src/itlib/scala/http/AbstractHttpServiceIntegrationTest.scala @@ -125,6 +125,22 @@ trait AbstractHttpServiceIntegrationTestFuns extends StrictLogging { )(testFn) } + protected def withHttpServiceAndClient[A](maxInboundMessageSize: Int)( + testFn: (Uri, DomainJsonEncoder, DomainJsonDecoder, LedgerClient) => Future[A] + ): Future[A] = + HttpServiceTestFixture.withLedger[A](List(dar1, dar2), testId, None, useTls) { + case (ledgerPort, _) => + HttpServiceTestFixture.withHttpService[A]( + testId, + ledgerPort, + jdbcConfig, + staticContentConfig, + useTls = useTls, + wsConfig = wsConfig, + maxInboundMessageSize = maxInboundMessageSize, + )(testFn) + } + protected def withHttpService[A]( f: (Uri, DomainJsonEncoder, DomainJsonDecoder) => Future[A] ): Future[A] = @@ -1512,4 +1528,83 @@ abstract class AbstractHttpServiceIntegrationTest } }: Future[Assertion] } + + "archiving a large number of contracts should succeed" in withHttpServiceAndClient( + HttpService.DefaultMaxInboundMessageSize * 10 + ) { (uri, encoder, _, _) => + val numContracts: Long = 10000 + val helperId = domain.TemplateId(None, "Account", "Helper") + val payload = v.Record( + fields = List(v.RecordField("owner", Some(v.Value(v.Value.Sum.Party("Alice"))))) + ) + val createCmd: domain.CreateAndExerciseCommand[v.Record, v.Value] = + domain.CreateAndExerciseCommand( + templateId = helperId, + payload = payload, + choice = lar.Choice("CreateN"), + argument = boxedRecord( + v.Record(fields = + List(v.RecordField("n", Some(v.Value(v.Value.Sum.Int64(numContracts))))) + ) + ), + meta = None, + ) + def encode(cmd: domain.CreateAndExerciseCommand[v.Record, v.Value]): JsValue = + encoder.encodeCreateAndExerciseCommand(cmd).valueOr(e => fail(e.shows)) + def archiveCmd(cids: List[String]) = + domain.CreateAndExerciseCommand( + templateId = helperId, + payload = payload, + choice = lar.Choice("ArchiveAll"), + argument = boxedRecord( + v.Record(fields = + List( + v.RecordField( + "cids", + Some( + v.Value( + v.Value.Sum.List(v.List(cids.map(cid => v.Value(v.Value.Sum.ContractId(cid))))) + ) + ), + ) + ) + ) + ), + meta = None, + ) + def queryN(n: Long): Future[Assertion] = postJsonRequest( + uri.withPath(Uri.Path("/v1/query")), + jsObject("""{"templateIds": ["Account:Account"]}"""), + ).flatMap { case (status, output) => + status shouldBe StatusCodes.OK + assertStatus(output, StatusCodes.OK) + inside(getResult(output)) { case JsArray(result) => + result should have length n + } + } + + for { + resp <- postJsonRequest(uri.withPath(Uri.Path("/v1/create-and-exercise")), encode(createCmd)) + (status, output) = resp + _ = { + status shouldBe StatusCodes.OK + assertStatus(output, StatusCodes.OK) + } + created = getChild(getResult(output), "exerciseResult").convertTo[List[String]] + _ = created should have length numContracts + + _ <- queryN(numContracts) + + status <- postJsonRequest( + uri.withPath(Uri.Path("/v1/create-and-exercise")), + encode(archiveCmd(created)), + ).map(_._1) + _ = { + status shouldBe StatusCodes.OK + assertStatus(output, StatusCodes.OK) + } + + _ <- queryN(0) + } yield succeed + } } diff --git a/ledger-service/http-json/src/main/scala/com/digitalasset/http/ContractsFetch.scala b/ledger-service/http-json/src/main/scala/com/digitalasset/http/ContractsFetch.scala index 2f503fa676da..533ea90ea97c 100644 --- a/ledger-service/http-json/src/main/scala/com/digitalasset/http/ContractsFetch.scala +++ b/ledger-service/http-json/src/main/scala/com/digitalasset/http/ContractsFetch.scala @@ -486,7 +486,7 @@ private[http] object ContractsFetch { )(implicit log: doobie.LogHandler, sjd: SupportedJdbcDriver): ConnectionIO[Unit] = { import doobie.implicits._, cats.syntax.functor._ surrogateTemplateIds(step.inserts.iterator.map(_.templateId).toSet).flatMap { stidMap => - import cats.syntax.apply._, cats.instances.vector._, scalaz.std.set._ + import cats.syntax.apply._, cats.instances.vector._ import json.JsonProtocol._ import sjd._ (queries.deleteContracts(step.deletes.keySet) *> diff --git a/ledger-service/http-json/src/main/scala/com/digitalasset/http/json/DomainJsonEncoder.scala b/ledger-service/http-json/src/main/scala/com/digitalasset/http/json/DomainJsonEncoder.scala index 9b99e37b2d3b..957dc7e8af4d 100644 --- a/ledger-service/http-json/src/main/scala/com/digitalasset/http/json/DomainJsonEncoder.scala +++ b/ledger-service/http-json/src/main/scala/com/digitalasset/http/json/DomainJsonEncoder.scala @@ -33,6 +33,23 @@ class DomainJsonEncoder( } yield y + def encodeCreateAndExerciseCommand( + cmd: domain.CreateAndExerciseCommand[ + lav1.value.Record, + lav1.value.Value, + ] + )(implicit + ev: JsonWriter[ + domain.CreateAndExerciseCommand[JsValue, JsValue] + ] + ): JsonError \/ JsValue = + for { + payload <- apiRecordToJsObject(cmd.payload): JsonError \/ JsValue + argument <- apiValueToJsValue(cmd.argument) + y <- SprayJson.encode(cmd.copy(payload = payload, argument = argument)).liftErr(JsonError) + + } yield y + object implicits { implicit val ApiValueJsonWriter: JsonWriter[lav1.value.Value] = (obj: lav1.value.Value) => apiValueToJsValue(obj).valueOr(e => spray.json.serializationError(e.shows))