Skip to content

Commit

Permalink
limit contract insertion/deletion batching on backpressure (#11589)
Browse files Browse the repository at this point in the history
* move contract insertion/deletion batching to separate function

* limit contract insertion/deletion batching on backpressure

* add changelog

CHANGELOG_BEGIN
- [JSON API] While updating the contract table for a query, if the DB appears to be slow,
  JSON API will slow down its own inserts and deletes at some point rather than construct
  ever-larger INSERT and DELETE batch commands.
  See `issue #11589 <https://github.com/digital-asset/daml/pull/11589>`__.
CHANGELOG_END
  • Loading branch information
S11001001 authored Nov 9, 2021
1 parent 3588284 commit 6372d41
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ private[daml] final case class InsertDeleteStep[+D, +C](
deletes ++ o.deletes,
)

/** NB: This is ''not'' distributive across `append`. */
def size: Int = inserts.length + deletes.size

def nonEmpty: Boolean = inserts.nonEmpty || deletes.nonEmpty

def leftMap[DD](f: D => DD): InsertDeleteStep[DD, C] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.daml.http

import akka.NotUsed
import akka.stream.scaladsl.{Flow, GraphDSL, Keep, RunnableGraph, Sink, Source}
import akka.stream.{ClosedShape, FanOutShape2, Materializer}
import com.daml.http.dbbackend.{ContractDao, SupportedJdbcDriver}
Expand Down Expand Up @@ -327,7 +328,7 @@ private class ContractsFetch(

val transactInsertsDeletes = Flow
.fromFunction(jsonifyInsertDeleteStep)
.conflate(_ append _)
.via(conflation)
.map(insertAndDelete)

idses.map(_.toInsertDelete) ~> transactInsertsDeletes ~> acsSink
Expand Down Expand Up @@ -406,6 +407,16 @@ private[http] object ContractsFetch {
}.void
}

private def conflation[D, C: InsertDeleteStep.Cid]
: Flow[InsertDeleteStep[D, C], InsertDeleteStep[D, C], NotUsed] = {
// when considering this cost, keep in mind that each deleteContracts
// may entail a table scan. Backpressure indicates that DB operations
// are slow, the idea here is to set the DB up for success
val maxCost = 250L
Flow[InsertDeleteStep[D, C]]
.batchWeighted(max = maxCost, costFn = _.size.toLong, identity)(_ append _)
}

private final case class FetchContext(
jwt: Jwt,
ledgerId: LedgerApiDomain.LedgerId,
Expand Down

0 comments on commit 6372d41

Please sign in to comment.