Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More determinism in delete order #16401

Merged
merged 8 commits
Feb 28, 2023
2 changes: 2 additions & 0 deletions ledger-service/db-backend/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ da_scala_library(
"//ledger/metrics",
"//libs-scala/contextualized-logging",
"//libs-scala/nonempty",
"//libs-scala/nonempty-cats",
"//libs-scala/scala-utils",
"@maven//:io_dropwizard_metrics_metrics_core",
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import nonempty.NonEmptyReturningOps._
import doobie._
import doobie.implicits._
import scala.annotation.nowarn
import scala.collection.immutable.{Seq => ISeq, SortedMap}
import scala.collection.immutable.{Seq => ISeq, SortedMap, SortedSet}
import scalaz.{@@, Cord, Functor, OneAnd, Tag, \/, -\/, \/-}
import scalaz.Digit._0
import scalaz.syntax.foldable._
Expand Down Expand Up @@ -258,7 +258,7 @@ sealed abstract class Queries(tablePrefix: String, tpIdCacheMaxEntries: Long)(im
)(implicit log: LogHandler): ConnectionIO[Int] = {
val (existingParties, newParties) = {
import cats.syntax.foldable._
parties.toList.partition(p => lastOffsets.contains(p))
parties.toList.sorted.partition(p => lastOffsets.contains(p))
}
// If a concurrent transaction inserted an offset for a new party, the insert will fail.
val insert =
Expand Down Expand Up @@ -294,6 +294,9 @@ sealed abstract class Queries(tablePrefix: String, tpIdCacheMaxEntries: Long)(im
protected[this] type DBContractKey
protected[this] def toDBContractKey[CK: JsonWriter](ck: CK): DBContractKey

/** Whether this backend may combine operations spanning multiple Daml
  * transactions into single batched SQL statements (e.g. one multi-`IN`
  * DELETE in `deleteContracts`, and stream conflation in the
  * contracts-fetch process).  When `false`, deletes are applied row-by-row
  * in sorted-contract-id order, giving a strictly deterministic order of
  * effects at the cost of throughput.
  */
def allowDamlTransactionBatching: Boolean

final def insertContracts[F[_]: cats.Foldable: Functor, CK: JsonWriter, PL: JsonWriter](
dbcs: F[DBContract[SurrogateTpId, CK, PL, Seq[String]]]
)(implicit log: LogHandler): ConnectionIO[Int] =
Expand All @@ -306,21 +309,30 @@ sealed abstract class Queries(tablePrefix: String, tpIdCacheMaxEntries: Long)(im
// Deletes the given contract ids from the contract table, returning the
// number of rows removed; empty input short-circuits to a pure 0 with no
// database round-trip.
//
// NOTE(review): this span looks like a unified-diff rendering that
// interleaves the OLD implementation (NonEmptyVector match / `case Some`)
// with the NEW one (SortedSet + NonEmpty + allowDamlTransactionBatching).
// As written, the `case _` arm before `case Some` would make the latter
// unreachable — reconcile against the merged file before relying on this.
final def deleteContracts(
cids: Set[String]
)(implicit log: LogHandler): ConnectionIO[Int] = {
import cats.data.NonEmptyVector
import cats.instances.vector._
import cats.instances.int._
import cats.syntax.foldable._
NonEmptyVector.fromVector(cids.toVector) match {
case None =>
import nonempty.catsinstances._
// NEW side: sort ids first so delete order is deterministic.
cids to SortedSet match {
case NonEmpty(cids) =>
if (allowDamlTransactionBatching) {
// Batched path: one DELETE with IN-lists, chunked to respect the
// backend's maximum list size, chunks joined with OR.
val del = fr"DELETE FROM $contractTableName WHERE " ++ {
val chunks =
maxListSize.fold(NonEmpty(Vector, cids)) { size =>
val NonEmpty(groups) =
cids.grouped(size).collect { case NonEmpty(group) => group }.toVector
groups
}
joinFragment(
chunks.map(cids => Fragments.in(fr"contract_id", cids.toVector.toNEF)),
fr" OR ",
)
}
del.update.run
} else {
// Deterministic path: one parameterized DELETE per id, executed
// via updateMany in sorted order.
Update[String](s"DELETE FROM $contractTableNameRaw WHERE contract_id = ?")
.updateMany(cids.toNEF)
}
case _ =>
// No ids to delete; avoid issuing any SQL.
free.connection.pure(0)
case Some(cids) =>
// OLD side: chunked DELETE ... IN statements, results summed via foldA.
val chunks = maxListSize.fold(Vector(cids))(size => cids.grouped(size).toVector)
chunks
.map(chunk =>
(fr"DELETE FROM $contractTableName WHERE " ++ Fragments
.in(fr"contract_id", chunk)).update.run
)
.foldA
}
}

Expand Down Expand Up @@ -508,6 +520,10 @@ object Queries {
case object GTEQ extends OrderOperator
}

// XXX SC: NonEmpty could most likely be threaded all the way down instead of
// converting to OneAnd here.
private[http] def joinFragment(xs: NonEmpty[Vector[Fragment]], sep: Fragment): Fragment =
  joinFragment(xs.toOneAnd, sep)

/** Interleave `sep` between the given fragments and concatenate the result. */
private[http] def joinFragment(xs: OneAnd[Vector, Fragment], sep: Fragment): Fragment = {
  val separated = intersperse(xs, sep)
  concatFragment(separated)
}

Expand Down Expand Up @@ -715,6 +731,8 @@ private final class PostgresQueries(tablePrefix: String, tpIdCacheMaxEntries: Lo

protected[this] override def toDBContractKey[CK: JsonWriter](x: CK) = {
  // Postgres stores the contract key as its JSON encoding, unwrapped.
  x.toJson
}

// PostgreSQL handles large batched multi-row statements well.
override def allowDamlTransactionBatching: Boolean = true

protected[this] override def primInsertContracts[F[_]: cats.Foldable: Functor](
dbcs: F[DBContract[SurrogateTpId, DBContractKey, JsValue, Array[String]]]
)(implicit log: LogHandler): ConnectionIO[Int] = {
Expand Down Expand Up @@ -882,6 +900,8 @@ private final class OracleQueries(
// Oracle wraps the key in a {"key": ...} JSON object rather than storing it bare.
protected[this] override def toDBContractKey[CK: JsonWriter](x: CK): DBContractKey =
  JsObject("key" -> x.toJson)

// Oracle likewise permits batched multi-row statements.
override def allowDamlTransactionBatching: Boolean = true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


protected[this] override def primInsertContracts[F[_]: cats.Foldable: Functor](
dbcs: F[DBContract[SurrogateTpId, DBContractKey, JsValue, Array[String]]]
)(implicit log: LogHandler): ConnectionIO[Int] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,68 +291,74 @@ private class ContractsFetch(
import domain.Offset._, fetchContext.{jwt, ledgerId, parties}
val startOffset = offsets.values.toList.minimum.cata(AbsoluteBookmark(_), LedgerBegin)

val graph = RunnableGraph.fromGraph(
GraphDSL.createGraph(
Sink.queue[ConnectionIO[Unit]](),
Sink.last[BeginBookmark[domain.Offset]],
)(Keep.both) { implicit builder => (acsSink, offsetSink) =>
import GraphDSL.Implicits._

val txnK = getCreatesAndArchivesSince(
jwt,
ledgerId,
transactionFilter(parties, List(templateId)),
_: lav1.ledger_offset.LedgerOffset,
absEnd,
)(lc)

// include ACS iff starting at LedgerBegin
val (idses, lastOff) = (startOffset, disableAcs) match {
case (LedgerBegin, false) =>
val stepsAndOffset = builder add acsFollowingAndBoundary(txnK)
stepsAndOffset.in <~ getActiveContracts(
jwt,
ledgerId,
transactionFilter(parties, List(templateId)),
true,
)(lc)
(stepsAndOffset.out0, stepsAndOffset.out1)

case (AbsoluteBookmark(_), _) | (LedgerBegin, true) =>
val stepsAndOffset = builder add transactionsFollowingBoundary(txnK)
stepsAndOffset.in <~ Source.single(startOffset)
(
(stepsAndOffset: FanOutShape2[_, ContractStreamStep.LAV1, _]).out0,
stepsAndOffset.out1,
)
}
// skip if *we don't use the acs* (which can read past absEnd) and current
// DB is already caught up to absEnd
if (startOffset == AbsoluteBookmark(absEnd.toDomain))
fconn.pure(startOffset)
else {
val graph = RunnableGraph.fromGraph(
GraphDSL.createGraph(
Sink.queue[ConnectionIO[Unit]](),
Sink.last[BeginBookmark[domain.Offset]],
)(Keep.both) { implicit builder => (acsSink, offsetSink) =>
import GraphDSL.Implicits._

val txnK = getCreatesAndArchivesSince(
jwt,
ledgerId,
transactionFilter(parties, List(templateId)),
_: lav1.ledger_offset.LedgerOffset,
absEnd,
)(lc)

// include ACS iff starting at LedgerBegin
val (idses, lastOff) = (startOffset, disableAcs) match {
case (LedgerBegin, false) =>
val stepsAndOffset = builder add acsFollowingAndBoundary(txnK)
stepsAndOffset.in <~ getActiveContracts(
jwt,
ledgerId,
transactionFilter(parties, List(templateId)),
true,
)(lc)
(stepsAndOffset.out0, stepsAndOffset.out1)

case (AbsoluteBookmark(_), _) | (LedgerBegin, true) =>
val stepsAndOffset = builder add transactionsFollowingBoundary(txnK)
stepsAndOffset.in <~ Source.single(startOffset)
(
(stepsAndOffset: FanOutShape2[_, ContractStreamStep.LAV1, _]).out0,
stepsAndOffset.out1,
)
}

val transactInsertsDeletes = Flow
.fromFunction(jsonifyInsertDeleteStep)
.via(conflation)
.map(insertAndDelete)
val transactInsertsDeletes = Flow
.fromFunction(jsonifyInsertDeleteStep)
.via(if (sjd.q.queries.allowDamlTransactionBatching) conflation else Flow.apply)
.map(insertAndDelete)

idses.map(_.toInsertDelete) ~> transactInsertsDeletes ~> acsSink
lastOff ~> offsetSink
idses.map(_.toInsertDelete) ~> transactInsertsDeletes ~> acsSink
lastOff ~> offsetSink

ClosedShape
}
)
ClosedShape
}
)

val (acsQueue, lastOffsetFuture) = graph.run()
val (acsQueue, lastOffsetFuture) = graph.run()

for {
_ <- sinkCioSequence_(acsQueue)
offset0 <- connectionIOFuture(lastOffsetFuture)
offsetOrError <- offset0 max AbsoluteBookmark(absEnd.toDomain) match {
case ab @ AbsoluteBookmark(newOffset) =>
ContractDao
.updateOffset(parties, templateId, newOffset, offsets)
.map(_ => ab)
case LedgerBegin =>
fconn.pure(LedgerBegin)
}
} yield offsetOrError
for {
_ <- sinkCioSequence_(acsQueue)
offset0 <- connectionIOFuture(lastOffsetFuture)
offsetOrError <- offset0 max AbsoluteBookmark(absEnd.toDomain) match {
case ab @ AbsoluteBookmark(newOffset) =>
ContractDao
.updateOffset(parties, templateId, newOffset, offsets)
.map(_ => ab)
case LedgerBegin =>
fconn.pure(LedgerBegin)
}
} yield offsetOrError
}
}
}

Expand Down