Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

limit contract insertion/deletion batching on backpressure #11589

Merged
merged 4 commits into from
Nov 9, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ private[daml] final case class InsertDeleteStep[+D, +C](
deletes ++ o.deletes,
)

/** NB: This is ''not'' distributive across `append`. */
def size: Int = inserts.length + deletes.size

def nonEmpty: Boolean = inserts.nonEmpty || deletes.nonEmpty

def leftMap[DD](f: D => DD): InsertDeleteStep[DD, C] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.daml.http

import akka.NotUsed
import akka.stream.scaladsl.{Flow, GraphDSL, Keep, RunnableGraph, Sink, Source}
import akka.stream.{ClosedShape, FanOutShape2, Materializer}
import com.daml.http.dbbackend.{ContractDao, SupportedJdbcDriver}
Expand Down Expand Up @@ -327,7 +328,7 @@ private class ContractsFetch(

val transactInsertsDeletes = Flow
.fromFunction(jsonifyInsertDeleteStep)
.conflate(_ append _)
.via(conflation)
.map(insertAndDelete)

idses.map(_.toInsertDelete) ~> transactInsertsDeletes ~> acsSink
Expand Down Expand Up @@ -406,6 +407,16 @@ private[http] object ContractsFetch {
}.void
}

private def conflation[D, C: InsertDeleteStep.Cid]
: Flow[InsertDeleteStep[D, C], InsertDeleteStep[D, C], NotUsed] = {
// when considering this cost, keep in mind that each deleteContracts
// may entail a table scan. Backpressure indicates that DB operations
// are slow, the idea here is to set the DB up for success
val maxCost = 250L
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be configurable, rather than hard-coded?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I don't think so. The benefit comes from setting a [reasonably large] limit, more than what that limit is.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@S11001001 how did we come up with this maxCost value , also can you remind me of the semantics of delete and why it may entail a table scan ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We came up with it by me taking the number 200 from the client output and adding 50 to it. It's a buffer size; it doesn’t really matter what the number is so much as that there is a number and it is reasonably "large" so that the buffer doesn't grow without bound but still gives you good-sized "chunks".

As for the deletes, it depends on how the DB implements delete from contracts where contract_id in (id1, id2, ...).

Flow[InsertDeleteStep[D, C]]
.batchWeighted(max = maxCost, costFn = _.size.toLong, identity)(_ append _)
}

private final case class FetchContext(
jwt: Jwt,
ledgerId: LedgerApiDomain.LedgerId,
Expand Down