[Mutable cache] Resolve with full lookup on negative cache read-through lookups [DPP-501] (#10262)

* [Mutable cache] Resolve with full lookup on missed cache read-throughs
* Do not store negative lookups
* Metrics for counting divulgence and full lookups
* Metrics for counting read-through not found

CHANGELOG_BEGIN
CHANGELOG_END

* Added comment detailing secondary lookups for divulgence

* Apply suggestions from code review

Co-authored-by: Simon Meier <[email protected]>

tudor-da and meiersi-da authored Jul 20, 2021
1 parent f59951b commit 93d0ed6
Showing 4 changed files with 102 additions and 31 deletions.
8 changes: 8 additions & 0 deletions ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala
@@ -84,6 +84,14 @@ final class Metrics(val registry: MetricRegistry) {

val dispatcherLag: Timer = registry.timer(Prefix :+ "dispatcher_lag")

val resolveDivulgenceLookup: Counter =
registry.counter(Prefix :+ "resolve_divulgence_lookup")

val resolveFullLookup: Counter =
registry.counter(Prefix :+ "resolve_full_lookup")

val readThroughNotFound: Counter = registry.counter(Prefix :+ "read_through_not_found")

val indexSequentialId = new VarGauge[Long](0L)
registry.register(
Prefix :+ "index_sequential_id",
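
A rough standalone illustration, not part of this commit: the three new metrics are plain Dropwizard counters, so they can be exercised against a bare MetricRegistry. The flat string names below stand in for the MetricName built from Prefix above and are only illustrative.

import com.codahale.metrics.MetricRegistry

object CacheCounterSketch extends App {
  val registry = new MetricRegistry()

  // Illustrative flat names; the real code derives them from Prefix :+ "...".
  val resolveDivulgenceLookup = registry.counter("daml.execution.cache.resolve_divulgence_lookup")
  val resolveFullLookup = registry.counter("daml.execution.cache.resolve_full_lookup")
  val readThroughNotFound = registry.counter("daml.execution.cache.read_through_not_found")

  // Each secondary-lookup path bumps its counter; any registered reporter
  // (JMX, Graphite, ...) can then expose the relative rates of the three paths.
  readThroughNotFound.inc()
  resolveFullLookup.inc()
  println(registry.getCounters.keySet)
}
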
@@ -30,6 +30,7 @@ import com.daml.platform.store.interfaces.LedgerDaoContractsReader.{

import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NoStackTrace
import scala.util.{Failure, Success, Try}

private[platform] class MutableCacheBackedContractStore(
@@ -124,7 +125,15 @@ private[platform] class MutableCacheBackedContractStore(
_ <- contractsCache.putAsync(
key = contractId,
validAt = currentCacheSequentialId,
eventualValue = eventualValue,
eventualValue = eventualValue.transformWith {
case Success(NotFound) =>
metrics.daml.execution.cache.readThroughNotFound.inc()
// We must not cache negative lookups by contract-id, as they can be invalidated by later divulgence events.
// This is OK from a performance perspective, as we do not expect use-cases that require
// caching of contract absence or the results of looking up divulged contracts.
Future.failed(ContractReadThroughNotFound(contractId))
case result => Future.fromTry(result)
},
)
value <- eventualValue
} yield value
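
A self-contained sketch of the read-through handling above, using simplified placeholder types rather than the real ContractStateValue hierarchy: a lookup that resolves to NotFound is converted into a failed Future before it reaches the cache's putAsync, so the absence is never stored and cannot later mask a divulgence event, while positive results pass through unchanged.

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Success
import scala.util.control.NoStackTrace

object ReadThroughSketch {
  sealed trait State
  case object NotFound extends State
  final case class Active(arg: String) extends State

  final case class ReadThroughNotFound(contractId: String) extends NoStackTrace

  // Wraps a lookup so that "not found" fails the future instead of producing a
  // cacheable value; an async cache that drops failed futures will then never
  // store the negative result, while positive results are still cached.
  def nonNegativeCacheable(contractId: String, lookup: Future[State])(implicit
      ec: ExecutionContext
  ): Future[State] =
    lookup.transformWith {
      case Success(NotFound) => Future.failed(ReadThroughNotFound(contractId))
      case other => Future.fromTry(other)
    }
}
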
@@ -149,31 +158,34 @@
Future.successful(Some(contract))
case Archived(stakeholders) if nonEmptyIntersection(stakeholders, readers) =>
Future.successful(Option.empty)
case ContractStateValue.NotFound =>
logger.warn(s"Contract not found for $contractId")
Future.successful(Option.empty)
case existingContractValue: ExistingContractValue =>
case contractStateValue =>
// This flow is exercised when the readers are not stakeholders of the contract
// (the contract might have been divulged to the readers)
// OR the contract was not found in the index
//
logger.debug(s"Checking divulgence for contractId=$contractId and readers=$readers")
resolveDivulgenceLookup(existingContractValue, contractId, readers)
resolveDivulgenceLookup(contractStateValue, contractId, readers)
}

private def resolveDivulgenceLookup(
existingContractValue: ExistingContractValue,
contractStateValue: ContractStateValue,
contractId: ContractId,
forParties: Set[Party],
)(implicit
loggingContext: LoggingContext
): Future[Option[Contract]] =
existingContractValue match {
contractStateValue match {
case Active(contract, _, _) =>
metrics.daml.execution.cache.resolveDivulgenceLookup.inc()
contractsReader.lookupActiveContractWithCachedArgument(
forParties,
contractId,
contract.arg,
)
case _: Archived =>
// We need to fetch the contract here since the archival
case _: Archived | NotFound =>
// We need to fetch the contract here since the contract creation or archival
// may not have been divulged to the readers
metrics.daml.execution.cache.resolveFullLookup.inc()
contractsReader.lookupActiveContractAndLoadArgument(
forParties,
contractId,
@@ -373,6 +385,11 @@ private[platform] object MutableCacheBackedContractStore {
"Cannot lookup the maximum ledger time for an empty set of contract identifiers"
)

final case class ContractReadThroughNotFound(contractId: ContractId) extends NoStackTrace {
override def getMessage: String =
s"Contract not found for contract id $contractId. Hint: this could be due racing with a concurrent archival."
}

private[cache] class CacheIndex {
private val offsetRef: AtomicReference[Option[(Offset, EventSequentialId)]] =
new AtomicReference(Option.empty)
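
A condensed sketch of the secondary-lookup decision above, with placeholder types for readers, contracts, and the reader interface (only the two lookup method names match the real LedgerDaoContractsReader): an Active state already carries the cached argument, so only the cheaper divulgence check is needed, whereas Archived and NotFound fall back to a full lookup because the creation or archival itself may not have been divulged to the requesting readers.

import scala.concurrent.Future

object DivulgenceResolutionSketch {
  sealed trait State
  final case class Active(cachedArg: String) extends State
  case object Archived extends State
  case object NotFound extends State

  trait Reader {
    // Cheaper path: reuses the contract argument already held in the cache.
    def lookupActiveContractWithCachedArgument(
        readers: Set[String],
        contractId: String,
        arg: String,
    ): Future[Option[String]]

    // Full path: also loads the contract argument from the index.
    def lookupActiveContractAndLoadArgument(
        readers: Set[String],
        contractId: String,
    ): Future[Option[String]]
  }

  def resolveDivulgenceLookup(
      state: State,
      contractId: String,
      readers: Set[String],
      reader: Reader,
  ): Future[Option[String]] =
    state match {
      case Active(arg) => reader.lookupActiveContractWithCachedArgument(readers, contractId, arg)
      case Archived | NotFound => reader.lookupActiveContractAndLoadArgument(readers, contractId)
    }
}
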
@@ -95,8 +95,8 @@ private[platform] case class StateCache[K, V](cache: Cache[K, V], registerUpdate
eventualUpdate: Future[V],
validAt: Long,
)(implicit loggingContext: LoggingContext): Future[Unit] =
eventualUpdate.transform[Unit](
(value: V) =>
eventualUpdate
.map { (value: V) =>
pendingUpdates.synchronized {
pendingUpdates
.get(key)
@@ -107,13 +107,14 @@
removeFromPending(key)
}
.getOrElse(logger.error(s"Pending updates tracker for $key not registered "))
},
(err: Throwable) => {
removeFromPending(key)
}
}
.recover { case err =>
pendingUpdates.synchronized {
removeFromPending(key)
}
logger.warn(s"Failure in pending cache update for key $key", err)
err
},
)
}

private def removeFromPending(key: K)(implicit loggingContext: LoggingContext): Unit =
discard(
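
A simplified, standalone sketch of the reworked putAsync above; the map-based bookkeeping and names are hypothetical, not the real StateCache internals. The success path stores the value only if it is still the latest pending update for the key, while the failure path clears the pending marker under the same lock and logs, so a failed read-through (such as a negative lookup) leaves the cache untouched.

import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}

final class PendingCacheSketch[K, V](implicit ec: ExecutionContext) {
  private val cache = mutable.Map.empty[K, V]
  // key -> validAt of the most recently registered pending update
  private val pendingUpdates = mutable.Map.empty[K, Long]

  def putAsync(key: K, validAt: Long, eventualValue: Future[V]): Future[Unit] = {
    pendingUpdates.synchronized(pendingUpdates.update(key, validAt))
    eventualValue
      .map { value =>
        pendingUpdates.synchronized {
          // Store only if no newer update was registered for this key in the meantime.
          if (pendingUpdates.get(key).contains(validAt)) cache.update(key, value)
          pendingUpdates.remove(key)
          ()
        }
      }
      .recover { case err =>
        pendingUpdates.synchronized {
          pendingUpdates.remove(key)
        }
        // Nothing is cached on failure; the failed lookup is simply dropped and reported.
        println(s"Failure in pending cache update for key $key: $err")
      }
  }
}
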
@@ -34,7 +34,11 @@ import com.daml.platform.store.cache.MutableCacheBackedContractStoreSpec.{
}
import com.daml.platform.store.dao.events.ContractStateEvent
import com.daml.platform.store.interfaces.LedgerDaoContractsReader
import com.daml.platform.store.interfaces.LedgerDaoContractsReader.{KeyAssigned, KeyUnassigned}
import com.daml.platform.store.interfaces.LedgerDaoContractsReader.{
ContractState,
KeyAssigned,
KeyUnassigned,
}
import org.mockito.MockitoSugar
import org.scalatest.concurrent.Eventually
import org.scalatest.matchers.should.Matchers
@@ -204,6 +208,37 @@ class MutableCacheBackedContractStoreSpec
}
}

"read-through the cache without storing negative lookups" in {
val spyContractsReader = spy(ContractsReaderFixture())
for {
store <- contractStore(cachesSize = 1L, spyContractsReader).asFuture
_ = store.cacheIndex.set(unusedOffset, 1L)
negativeLookup_cId6 <- store.lookupActiveContract(Set(alice), cId_6)
positiveLookup_cId6 <- store.lookupActiveContract(Set(alice), cId_6)
} yield {
negativeLookup_cId6 shouldBe Option.empty
positiveLookup_cId6 shouldBe Some(contract6)

verify(spyContractsReader, times(wantedNumberOfInvocations = 2))
.lookupContractState(cId_6, 1L)
succeed
}
}

"resort to resolveDivulgenceLookup on not found" in {
val spyContractsReader = spy(ContractsReaderFixture())
for {
store <- contractStore(cachesSize = 1L, spyContractsReader).asFuture
_ = store.cacheIndex.set(unusedOffset, 1L)
resolvedLookup_cId7 <- store.lookupActiveContract(Set(bob), cId_7)
} yield {
resolvedLookup_cId7 shouldBe Some(contract7)

verify(spyContractsReader).lookupActiveContractAndLoadArgument(Set(bob), cId_7)
succeed
}
}

"present the contract state if visible at specific cache offsets (with no cache)" in {
for {
store <- contractStore(cachesSize = 0L).asFuture
@@ -383,11 +418,11 @@ object MutableCacheBackedContractStoreSpec {
private val unusedOffset = Offset.beforeBegin
private val Seq(alice, bob, charlie) = Seq("alice", "bob", "charlie").map(party)
private val (
Seq(cId_1, cId_2, cId_3, cId_4, cId_5),
Seq(contract1, contract2, contract3, contract4, _),
Seq(t1, t2, t3, t4, _),
Seq(cId_1, cId_2, cId_3, cId_4, cId_5, cId_6, cId_7),
Seq(contract1, contract2, contract3, contract4, _, contract6, contract7),
Seq(t1, t2, t3, t4, _, t6, _),
) =
(1 to 5).map { id =>
(1 to 7).map { id =>
(contractId(id), contract(s"id$id"), Instant.ofEpochSecond(id.toLong))
}.unzip3

@@ -413,6 +448,8 @@
.acquire()(ResourceContext(scala.concurrent.ExecutionContext.global))

case class ContractsReaderFixture() extends LedgerDaoContractsReader {
@volatile private var initialResultForCid6 = Future.successful(Option.empty[ContractState])

override def lookupKeyState(key: Key, validAt: Long)(implicit
loggingContext: LoggingContext
): Future[LedgerDaoContractsReader.KeyState] = (key, validAt) match {
@@ -423,14 +460,21 @@

override def lookupContractState(contractId: ContractId, validAt: Long)(implicit
loggingContext: LoggingContext
): Future[Option[LedgerDaoContractsReader.ContractState]] = (contractId, validAt) match {
case (`cId_1`, 0L) => activeContract(contract1, Set(alice), t1)
case (`cId_1`, validAt) if validAt > 0L => archivedContract(Set(alice))
case (`cId_2`, validAt) if validAt >= 1L => activeContract(contract2, Set(bob), t2)
case (`cId_3`, _) => activeContract(contract3, Set(bob), t3)
case (`cId_4`, _) => activeContract(contract4, Set(bob), t4)
case (`cId_5`, _) => archivedContract(Set(bob))
case _ => Future.successful(Option.empty)
): Future[Option[LedgerDaoContractsReader.ContractState]] = {
(contractId, validAt) match {
case (`cId_1`, 0L) => activeContract(contract1, Set(alice), t1)
case (`cId_1`, validAt) if validAt > 0L => archivedContract(Set(alice))
case (`cId_2`, validAt) if validAt >= 1L => activeContract(contract2, Set(bob), t2)
case (`cId_3`, _) => activeContract(contract3, Set(bob), t3)
case (`cId_4`, _) => activeContract(contract4, Set(bob), t4)
case (`cId_5`, _) => archivedContract(Set(bob))
case (`cId_6`, _) =>
// Simulate the store being populated between the first and the second lookup
val result = initialResultForCid6
initialResultForCid6 = activeContract(contract6, Set(alice), t6)
result
case _ => Future.successful(Option.empty)
}
}

override def lookupMaximumLedgerTime(ids: Set[ContractId])(implicit
@@ -459,6 +503,7 @@
Future.successful(Some(contract3))
case (`cId_1`, parties) if parties.contains(charlie) =>
Future.successful(Some(contract1))
case (`cId_7`, parties) if parties == Set(bob) => Future.successful(Some(contract7))
case _ => Future.successful(Option.empty)
}

