Skip to content

Commit

Permalink
Backport: Support deletion of a large number of contracts (#11646)
Browse files Browse the repository at this point in the history
* Support deletion of a large number of contracts (#10353)

Backport of #10353 / 9c9b91e.

* Support deletion of a large number of contracts

fixes #10339

There are two orthogonal issues here:

1. scalaz’s toVector from the Foldable[Set] instance
   stackoverflows. I’ve just avoided using that altogether.
2. Oracle doesn’t like more than 1k items in the IN clause. I chunked
   the queries into chunks of size 1k to fix this.

changelog_begin

- [JSON API] Fix an error where transactions that delete a large
  number of contracts resulted in stackoverflows with the PostgreSQL
  backend and database errors with Oracle.

changelog_end

* fix benchmark

changelog_begin
changelog_end

* Update ledger-service/db-backend/src/main/scala/com/digitalasset/http/dbbackend/Queries.scala

Co-authored-by: Stephen Compall <[email protected]>

* Update ledger-service/db-backend/src/main/scala/com/digitalasset/http/dbbackend/Queries.scala

Co-authored-by: Stephen Compall <[email protected]>

* that's not how you foldA

changelog_begin
changelog_end

Co-authored-by: Stephen Compall <[email protected]>

* backport past some moves and type parameter adds

* backport encodeCreateAndExerciseCommand from #9837 / 25b7e54

Co-authored-by: Moritz Kiefer <[email protected]>
  • Loading branch information
S11001001 and cocreature authored Nov 12, 2021
1 parent 6bf19fc commit 56531e8
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import nonempty.NonEmptyReturningOps._
import doobie._
import doobie.implicits._
import scala.collection.immutable.{Iterable, Seq => ISeq}
import scalaz.{@@, Foldable, Functor, OneAnd, Tag}
import scalaz.{@@, Functor, OneAnd, Tag}
import scalaz.Id.Id
import scalaz.syntax.foldable._
import scalaz.syntax.functor._
Expand Down Expand Up @@ -72,6 +72,9 @@ sealed abstract class Queries {
protected[this] def textType: Fragment
protected[this] def agreementTextType: Fragment

// The max list size that can be used in `IN` clauses
protected[this] def maxListSize: Option[Int]

protected[this] def jsonColumn(name: Fragment): Fragment

private[this] val createTemplateIdsTable = CreateTable(
Expand Down Expand Up @@ -214,15 +217,26 @@ sealed abstract class Queries {
dbcs: F[DBContract[SurrogateTpId, DBContractKey, JsValue, Array[String]]]
)(implicit log: LogHandler, pas: Put[Array[String]]): ConnectionIO[Int]

final def deleteContracts[F[_]: Foldable](
cids: F[String]
final def deleteContracts(
cids: Set[String]
)(implicit log: LogHandler): ConnectionIO[Int] = {
cids.toVector match {
case Vector(hd, tl @ _*) =>
(sql"DELETE FROM contract WHERE contract_id IN ("
++ concatFragment(OneAnd(sql"$hd", tl.toIndexedSeq map (cid => sql", $cid")))
++ sql")").update.run
case _ => free.connection.pure(0)
import cats.data.NonEmptyVector
import cats.instances.vector._
import cats.instances.int._
import cats.syntax.foldable._
NonEmptyVector.fromVector(cids.toVector) match {
case None =>
free.connection.pure(0)
case Some(cids) =>
val chunks = maxListSize.fold(Vector(cids)) { size =>
require(size >= 1, s"size=$size but must be positive")
cids.toVector.grouped(size).map(NonEmptyVector.fromVectorUnsafe).toVector
}
chunks
.map(chunk =>
(fr"DELETE FROM contract WHERE " ++ Fragments.in(fr"contract_id", chunk)).update.run
)
.foldA
}
}

Expand Down Expand Up @@ -394,6 +408,8 @@ private object PostgresQueries extends Queries {

protected[this] override def jsonColumn(name: Fragment) = name ++ sql" JSONB NOT NULL"

protected[this] override val maxListSize = None

private[this] val indexContractsKeys = CreateIndex(sql"""
CREATE INDEX contract_tpid_key_idx ON contract USING BTREE (tpid, key)
""")
Expand Down Expand Up @@ -496,6 +512,9 @@ private object OracleQueries extends Queries {
protected[this] override def jsonColumn(name: Fragment) =
name ++ sql" CLOB NOT NULL CONSTRAINT ensure_json_" ++ name ++ sql" CHECK (" ++ name ++ sql" IS JSON)"

// See http://www.dba-oracle.com/t_ora_01795_maximum_number_of_expressions_in_a_list_is_1000.htm
protected[this] override def maxListSize = Some(1000)

protected[this] override def contractsTableSignatoriesObservers = sql""

private val createSignatoriesTable = CreateTable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ object HttpServiceTestFixture extends LazyLogging with Assertions with Inside {
jdbcConfig: Option[JdbcConfig],
staticContentConfig: Option[StaticContentConfig],
leakPasswords: LeakPasswords = LeakPasswords.FiresheepStyle,
maxInboundMessageSize: Int = HttpService.DefaultMaxInboundMessageSize,
useTls: UseTls = UseTls.NoTls,
wsConfig: Option[WebsocketConfig] = None,
)(testFn: (Uri, DomainJsonEncoder, DomainJsonDecoder, LedgerClient) => Future[A])(implicit
Expand All @@ -89,6 +90,7 @@ object HttpServiceTestFixture extends LazyLogging with Assertions with Inside {
tlsConfig = if (useTls) clientTlsConfig else noTlsConfig,
wsConfig = wsConfig,
accessTokenFile = None,
maxInboundMessageSize = maxInboundMessageSize,
allowNonHttps = leakPasswords,
staticContentConfig = staticContentConfig,
packageReloadInterval = doNotReloadPackages,
Expand Down
20 changes: 20 additions & 0 deletions ledger-service/http-json/src/it/daml/Account.daml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

module Account where

import DA.Foldable

data AccountStatus = Enabled Time | Disabled Time
deriving (Eq, Show)

Expand Down Expand Up @@ -33,3 +35,21 @@ template KeyedByVariantAndRecord with
signatory party
key (party, fooVariant, bazRecord): (Party, Foo, BazRecord)
maintainer key._1

template Helper
with
owner : Party
where
signatory owner
choice CreateN : [ContractId Account]
with
n : Int
controller owner
do t <- getTime
mapA (\i -> create (Account owner (show i) (Enabled t))) [1 .. n]

choice ArchiveAll : ()
with
cids : [ContractId Account]
controller owner
do mapA_ archive cids
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,22 @@ trait AbstractHttpServiceIntegrationTestFuns extends StrictLogging {
)(testFn)
}

protected def withHttpServiceAndClient[A](maxInboundMessageSize: Int)(
testFn: (Uri, DomainJsonEncoder, DomainJsonDecoder, LedgerClient) => Future[A]
): Future[A] =
HttpServiceTestFixture.withLedger[A](List(dar1, dar2), testId, None, useTls) {
case (ledgerPort, _) =>
HttpServiceTestFixture.withHttpService[A](
testId,
ledgerPort,
jdbcConfig,
staticContentConfig,
useTls = useTls,
wsConfig = wsConfig,
maxInboundMessageSize = maxInboundMessageSize,
)(testFn)
}

protected def withHttpService[A](
f: (Uri, DomainJsonEncoder, DomainJsonDecoder) => Future[A]
): Future[A] =
Expand Down Expand Up @@ -1512,4 +1528,83 @@ abstract class AbstractHttpServiceIntegrationTest
}
}: Future[Assertion]
}

"archiving a large number of contracts should succeed" in withHttpServiceAndClient(
HttpService.DefaultMaxInboundMessageSize * 10
) { (uri, encoder, _, _) =>
val numContracts: Long = 10000
val helperId = domain.TemplateId(None, "Account", "Helper")
val payload = v.Record(
fields = List(v.RecordField("owner", Some(v.Value(v.Value.Sum.Party("Alice")))))
)
val createCmd: domain.CreateAndExerciseCommand[v.Record, v.Value] =
domain.CreateAndExerciseCommand(
templateId = helperId,
payload = payload,
choice = lar.Choice("CreateN"),
argument = boxedRecord(
v.Record(fields =
List(v.RecordField("n", Some(v.Value(v.Value.Sum.Int64(numContracts)))))
)
),
meta = None,
)
def encode(cmd: domain.CreateAndExerciseCommand[v.Record, v.Value]): JsValue =
encoder.encodeCreateAndExerciseCommand(cmd).valueOr(e => fail(e.shows))
def archiveCmd(cids: List[String]) =
domain.CreateAndExerciseCommand(
templateId = helperId,
payload = payload,
choice = lar.Choice("ArchiveAll"),
argument = boxedRecord(
v.Record(fields =
List(
v.RecordField(
"cids",
Some(
v.Value(
v.Value.Sum.List(v.List(cids.map(cid => v.Value(v.Value.Sum.ContractId(cid)))))
)
),
)
)
)
),
meta = None,
)
def queryN(n: Long): Future[Assertion] = postJsonRequest(
uri.withPath(Uri.Path("/v1/query")),
jsObject("""{"templateIds": ["Account:Account"]}"""),
).flatMap { case (status, output) =>
status shouldBe StatusCodes.OK
assertStatus(output, StatusCodes.OK)
inside(getResult(output)) { case JsArray(result) =>
result should have length n
}
}

for {
resp <- postJsonRequest(uri.withPath(Uri.Path("/v1/create-and-exercise")), encode(createCmd))
(status, output) = resp
_ = {
status shouldBe StatusCodes.OK
assertStatus(output, StatusCodes.OK)
}
created = getChild(getResult(output), "exerciseResult").convertTo[List[String]]
_ = created should have length numContracts

_ <- queryN(numContracts)

status <- postJsonRequest(
uri.withPath(Uri.Path("/v1/create-and-exercise")),
encode(archiveCmd(created)),
).map(_._1)
_ = {
status shouldBe StatusCodes.OK
assertStatus(output, StatusCodes.OK)
}

_ <- queryN(0)
} yield succeed
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ private[http] object ContractsFetch {
)(implicit log: doobie.LogHandler, sjd: SupportedJdbcDriver): ConnectionIO[Unit] = {
import doobie.implicits._, cats.syntax.functor._
surrogateTemplateIds(step.inserts.iterator.map(_.templateId).toSet).flatMap { stidMap =>
import cats.syntax.apply._, cats.instances.vector._, scalaz.std.set._
import cats.syntax.apply._, cats.instances.vector._
import json.JsonProtocol._
import sjd._
(queries.deleteContracts(step.deletes.keySet) *>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,23 @@ class DomainJsonEncoder(

} yield y

def encodeCreateAndExerciseCommand(
cmd: domain.CreateAndExerciseCommand[
lav1.value.Record,
lav1.value.Value,
]
)(implicit
ev: JsonWriter[
domain.CreateAndExerciseCommand[JsValue, JsValue]
]
): JsonError \/ JsValue =
for {
payload <- apiRecordToJsObject(cmd.payload): JsonError \/ JsValue
argument <- apiValueToJsValue(cmd.argument)
y <- SprayJson.encode(cmd.copy(payload = payload, argument = argument)).liftErr(JsonError)

} yield y

object implicits {
implicit val ApiValueJsonWriter: JsonWriter[lav1.value.Value] = (obj: lav1.value.Value) =>
apiValueToJsValue(obj).valueOr(e => spray.json.serializationError(e.shows))
Expand Down

0 comments on commit 56531e8

Please sign in to comment.