Skip to content

Commit

Permalink
Support deletion of a large number of contracts (#10353)
Browse files Browse the repository at this point in the history
* Support deletion of a large number of contracts

fixes #10339

There are two orthogonal issues here:

1. scalaz’s toVector from the Foldable[Set] instance
   stackoverflows. I’ve just avoided using that altogether.
2. Oracle doesn’t like more than 1k items in the IN clause. I chunked
   the queries into chunks of size 1k to fix this.

changelog_begin

- [JSON API] Fix an error where transactions that delete a large
  number of contracts resulted in stackoverflows with the PostgreSQL
  backend and database errors with Oracle.

changelog_end

* fix benchmark

changelog_begin
changelog_end

* Update ledger-service/db-backend/src/main/scala/com/digitalasset/http/dbbackend/Queries.scala

Co-authored-by: Stephen Compall <[email protected]>

* Update ledger-service/db-backend/src/main/scala/com/digitalasset/http/dbbackend/Queries.scala

Co-authored-by: Stephen Compall <[email protected]>

* that's not how you foldA

changelog_begin
changelog_end

Co-authored-by: Stephen Compall <[email protected]>
  • Loading branch information
cocreature and S11001001 authored Jul 22, 2021
1 parent 1b5f99e commit 9c9b91e
Show file tree
Hide file tree
Showing 8 changed files with 157 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import doobie._
import doobie.implicits._
import scala.annotation.nowarn
import scala.collection.immutable.{Iterable, Seq => ISeq}
import scalaz.{@@, Cord, Foldable, Functor, OneAnd, Tag, \/, -\/, \/-}
import scalaz.{@@, Cord, Functor, OneAnd, Tag, \/, -\/, \/-}
import scalaz.Digit._0
import scalaz.Id.Id
import scalaz.syntax.foldable._
Expand Down Expand Up @@ -77,6 +77,9 @@ sealed abstract class Queries {
protected[this] def nameType: Fragment // Name in daml-lf-1.rst
protected[this] def agreementTextType: Fragment

// The max list size that can be used in `IN` clauses
protected[this] def maxListSize: Option[Int]

protected[this] def jsonColumn(name: Fragment): Fragment

private[this] val createTemplateIdsTable = CreateTable(
Expand Down Expand Up @@ -207,15 +210,23 @@ sealed abstract class Queries {
dbcs: F[DBContract[SurrogateTpId, DBContractKey, JsValue, Array[String]]]
)(implicit log: LogHandler, ipol: SqlInterpol): ConnectionIO[Int]

final def deleteContracts[F[_]: Foldable](
cids: F[String]
final def deleteContracts(
cids: Set[String]
)(implicit log: LogHandler): ConnectionIO[Int] = {
cids.toVector match {
case Vector(hd, tl @ _*) =>
(sql"DELETE FROM contract WHERE contract_id IN ("
++ concatFragment(OneAnd(sql"$hd", tl.toIndexedSeq map (cid => sql", $cid")))
++ sql")").update.run
case _ => free.connection.pure(0)
import cats.data.NonEmptyVector
import cats.instances.vector._
import cats.instances.int._
import cats.syntax.foldable._
NonEmptyVector.fromVector(cids.toVector) match {
case None =>
free.connection.pure(0)
case Some(cids) =>
val chunks = maxListSize.fold(Vector(cids))(size => cids.grouped(size).toVector)
chunks
.map(chunk =>
(fr"DELETE FROM contract WHERE " ++ Fragments.in(fr"contract_id", chunk)).update.run
)
.foldA
}
}

Expand Down Expand Up @@ -466,6 +477,8 @@ private object PostgresQueries extends Queries {

protected[this] override def jsonColumn(name: Fragment) = name ++ sql" JSONB NOT NULL"

protected[this] override val maxListSize = None

private[this] val indexContractsKeys = CreateIndex(sql"""
CREATE INDEX contract_tpid_key_idx ON contract USING BTREE (tpid, key)
""")
Expand Down Expand Up @@ -616,6 +629,9 @@ private object OracleQueries extends Queries {
protected[this] override def jsonColumn(name: Fragment) =
name ++ sql" CLOB NOT NULL CONSTRAINT ensure_json_" ++ name ++ sql" CHECK (" ++ name ++ sql" IS JSON)"

// See http://www.dba-oracle.com/t_ora_01795_maximum_number_of_expressions_in_a_list_is_1000.htm
protected[this] override def maxListSize = Some(1000)

protected[this] override def contractsTableSignatoriesObservers =
sql"""
,${jsonColumn(sql"signatories")}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ object HttpServiceTestFixture extends LazyLogging with Assertions with Inside {
jdbcConfig: Option[JdbcConfig],
staticContentConfig: Option[StaticContentConfig],
leakPasswords: LeakPasswords = LeakPasswords.FiresheepStyle,
maxInboundMessageSize: Int = StartSettings.DefaultMaxInboundMessageSize,
useTls: UseTls = UseTls.NoTls,
wsConfig: Option[WebsocketConfig] = None,
nonRepudiation: nonrepudiation.Configuration.Cli = nonrepudiation.Configuration.Cli.Empty,
Expand All @@ -96,6 +97,7 @@ object HttpServiceTestFixture extends LazyLogging with Assertions with Inside {
tlsConfig = if (useTls) clientTlsConfig else noTlsConfig,
wsConfig = wsConfig,
accessTokenFile = None,
maxInboundMessageSize = maxInboundMessageSize,
allowNonHttps = leakPasswords,
staticContentConfig = staticContentConfig,
packageReloadInterval = doNotReloadPackages,
Expand Down
8 changes: 4 additions & 4 deletions ledger-service/http-json/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,9 @@ alias(
]
]

alias(
test_suite(
name = "integration-tests",
actual = "integration-tests-ce",
tests = ["integration-tests-ce"],
)

[
Expand Down Expand Up @@ -422,9 +422,9 @@ alias(
]
]

alias(
test_suite(
name = "failure-tests",
actual = "failure-tests-ce",
tests = ["failure-tests-ce"],
)

exports_files(["src/main/resources/logback.xml"])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class InsertBenchmark extends ContractDaoBenchmark {

private var contracts: List[DBContract[SurrogateTpId, JsValue, JsValue, Seq[String]]] = _

private var contractCids: List[String] = _
private var contractCids: Set[String] = _

private var tpid: SurrogateTpId = _

Expand All @@ -33,7 +33,7 @@ class InsertBenchmark extends ContractDaoBenchmark {
contract(-i, "Alice", tpid)
}.toList

contractCids = contracts.map(_.contractId)
contractCids = contracts.view.map(_.contractId).toSet

(0 until batches).foreach { batch =>
insertBatch("Alice", tpid, batch * batchSize)
Expand Down
20 changes: 20 additions & 0 deletions ledger-service/http-json/src/it/daml/Account.daml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

module Account where

import DA.Foldable

data AccountStatus = Enabled Time | Disabled Time
deriving (Eq, Show)

Expand Down Expand Up @@ -43,3 +45,21 @@ template KeyedByVariantAndRecord with
signatory party
key (party, fooVariant, bazRecord): (Party, Foo, BazRecord)
maintainer key._1

template Helper
with
owner : Party
where
signatory owner
choice CreateN : [ContractId Account]
with
n : Int
controller owner
do t <- getTime
mapA (\i -> create (Account owner (show i) (Enabled t))) [1 .. n]

choice ArchiveAll : ()
with
cids : [ContractId Account]
controller owner
do mapA_ archive cids
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,22 @@ trait AbstractHttpServiceIntegrationTestFuns extends StrictLogging {
)(testFn)
}

protected def withHttpServiceAndClient[A](maxInboundMessageSize: Int)(
testFn: (Uri, DomainJsonEncoder, DomainJsonDecoder, DamlLedgerClient) => Future[A]
): Future[A] =
HttpServiceTestFixture.withLedger[A](List(dar1, dar2), testId, None, useTls) {
case (ledgerPort, _) =>
HttpServiceTestFixture.withHttpService[A](
testId,
ledgerPort,
jdbcConfig,
staticContentConfig,
useTls = useTls,
wsConfig = wsConfig,
maxInboundMessageSize = maxInboundMessageSize,
)(testFn)
}

protected def withHttpService[A](
f: (Uri, DomainJsonEncoder, DomainJsonDecoder) => Future[A]
): Future[A] =
Expand Down Expand Up @@ -1545,4 +1561,83 @@ abstract class AbstractHttpServiceIntegrationTest
}
}: Future[Assertion]
}

"archiving a large number of contracts should succeed" in withHttpServiceAndClient(
StartSettings.DefaultMaxInboundMessageSize * 10
) { (uri, encoder, _, _) =>
val numContracts: Long = 10000
val helperId = domain.TemplateId(None, "Account", "Helper")
val payload = v.Record(
fields = List(v.RecordField("owner", Some(v.Value(v.Value.Sum.Party("Alice")))))
)
val createCmd: domain.CreateAndExerciseCommand[v.Record, v.Value, OptionalPkg] =
domain.CreateAndExerciseCommand(
templateId = helperId,
payload = payload,
choice = lar.Choice("CreateN"),
argument = boxedRecord(
v.Record(fields =
List(v.RecordField("n", Some(v.Value(v.Value.Sum.Int64(numContracts)))))
)
),
meta = None,
)
def encode(cmd: domain.CreateAndExerciseCommand[v.Record, v.Value, OptionalPkg]): JsValue =
encoder.encodeCreateAndExerciseCommand(cmd).valueOr(e => fail(e.shows))
def archiveCmd(cids: List[String]) =
domain.CreateAndExerciseCommand(
templateId = helperId,
payload = payload,
choice = lar.Choice("ArchiveAll"),
argument = boxedRecord(
v.Record(fields =
List(
v.RecordField(
"cids",
Some(
v.Value(
v.Value.Sum.List(v.List(cids.map(cid => v.Value(v.Value.Sum.ContractId(cid)))))
)
),
)
)
)
),
meta = None,
)
def queryN(n: Long): Future[Assertion] = postJsonRequest(
uri.withPath(Uri.Path("/v1/query")),
jsObject("""{"templateIds": ["Account:Account"]}"""),
).flatMap { case (status, output) =>
status shouldBe StatusCodes.OK
assertStatus(output, StatusCodes.OK)
inside(getResult(output)) { case JsArray(result) =>
result should have length n
}
}

for {
resp <- postJsonRequest(uri.withPath(Uri.Path("/v1/create-and-exercise")), encode(createCmd))
(status, output) = resp
_ = {
status shouldBe StatusCodes.OK
assertStatus(output, StatusCodes.OK)
}
created = getChild(getResult(output), "exerciseResult").convertTo[List[String]]
_ = created should have length numContracts

_ <- queryN(numContracts)

status <- postJsonRequest(
uri.withPath(Uri.Path("/v1/create-and-exercise")),
encode(archiveCmd(created)),
).map(_._1)
_ = {
status shouldBe StatusCodes.OK
assertStatus(output, StatusCodes.OK)
}

_ <- queryN(0)
} yield succeed
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,14 @@ abstract class AbstractNonRepudiationTest

private def withJsonApi[A](participantPort: Port) =
HttpServiceTestFixture.withHttpService[A](
testId,
participantPort,
jdbcConfig,
staticContentConfig,
LeakPasswords.No,
useTls,
wsConfig,
nonRepudiation,
testName = testId,
ledgerPort = participantPort,
jdbcConfig = jdbcConfig,
staticContentConfig = staticContentConfig,
leakPasswords = LeakPasswords.No,
useTls = useTls,
wsConfig = wsConfig,
nonRepudiation = nonRepudiation,
) _

protected def withSetup[A](test: (Tables, Uri, DomainJsonEncoder) => Future[Assertion]) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ private[http] object ContractsFetch {
)(implicit log: doobie.LogHandler, sjd: SupportedJdbcDriver): ConnectionIO[Unit] = {
import doobie.implicits._, cats.syntax.functor._
surrogateTemplateIds(step.inserts.iterator.map(_.templateId).toSet).flatMap { stidMap =>
import cats.syntax.apply._, cats.instances.vector._, scalaz.std.set._
import cats.syntax.apply._, cats.instances.vector._
import json.JsonProtocol._
import sjd._
(queries.deleteContracts(step.deletes.keySet) *>
Expand Down

0 comments on commit 9c9b91e

Please sign in to comment.