diff --git a/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala b/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala index e0153cb2cb05..e1828addde32 100644 --- a/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala +++ b/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala @@ -84,6 +84,14 @@ final class Metrics(val registry: MetricRegistry) { val dispatcherLag: Timer = registry.timer(Prefix :+ "dispatcher_lag") + val resolveDivulgenceLookup: Counter = + registry.counter(Prefix :+ "resolve_divulgence_lookup") + + val resolveFullLookup: Counter = + registry.counter(Prefix :+ "resolve_full_lookup") + + val readThroughNotFound: Counter = registry.counter(Prefix :+ "read_through_not_found") + val indexSequentialId = new VarGauge[Long](0L) registry.register( Prefix :+ "index_sequential_id", diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/cache/MutableCacheBackedContractStore.scala b/ledger/participant-integration-api/src/main/scala/platform/store/cache/MutableCacheBackedContractStore.scala index b75e8ec7e57c..b67fd7ffe200 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/cache/MutableCacheBackedContractStore.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/cache/MutableCacheBackedContractStore.scala @@ -30,6 +30,7 @@ import com.daml.platform.store.interfaces.LedgerDaoContractsReader.{ import scala.concurrent.duration.{DurationInt, FiniteDuration} import scala.concurrent.{ExecutionContext, Future} +import scala.util.control.NoStackTrace import scala.util.{Failure, Success, Try} private[platform] class MutableCacheBackedContractStore( @@ -124,7 +125,15 @@ private[platform] class MutableCacheBackedContractStore( _ <- contractsCache.putAsync( key = contractId, validAt = currentCacheSequentialId, - eventualValue = eventualValue, + eventualValue = eventualValue.transformWith { + case Success(NotFound) => + metrics.daml.execution.cache.readThroughNotFound.inc() + // We must not cache negative lookups by contract-id, as they can be invalidated by later divulgence events. + // This is OK from a performance perspective, as we do not expect uses-cases that require + // caching of contract absence or the results of looking up divulged contracts. + Future.failed(ContractReadThroughNotFound(contractId)) + case result => Future.fromTry(result) + }, ) value <- eventualValue } yield value @@ -149,31 +158,34 @@ private[platform] class MutableCacheBackedContractStore( Future.successful(Some(contract)) case Archived(stakeholders) if nonEmptyIntersection(stakeholders, readers) => Future.successful(Option.empty) - case ContractStateValue.NotFound => - logger.warn(s"Contract not found for $contractId") - Future.successful(Option.empty) - case existingContractValue: ExistingContractValue => + case contractStateValue => + // This flow is exercised when the readers are not stakeholders of the contract + // (the contract might have been divulged to the readers) + // OR the contract was not found in the index + // logger.debug(s"Checking divulgence for contractId=$contractId and readers=$readers") - resolveDivulgenceLookup(existingContractValue, contractId, readers) + resolveDivulgenceLookup(contractStateValue, contractId, readers) } private def resolveDivulgenceLookup( - existingContractValue: ExistingContractValue, + contractStateValue: ContractStateValue, contractId: ContractId, forParties: Set[Party], )(implicit loggingContext: LoggingContext ): Future[Option[Contract]] = - existingContractValue match { + contractStateValue match { case Active(contract, _, _) => + metrics.daml.execution.cache.resolveDivulgenceLookup.inc() contractsReader.lookupActiveContractWithCachedArgument( forParties, contractId, contract.arg, ) - case _: Archived => - // We need to fetch the contract here since the archival + case _: Archived | NotFound => + // We need to fetch the contract here since the contract creation or archival // may have not been divulged to the readers + metrics.daml.execution.cache.resolveFullLookup.inc() contractsReader.lookupActiveContractAndLoadArgument( forParties, contractId, @@ -373,6 +385,11 @@ private[platform] object MutableCacheBackedContractStore { "Cannot lookup the maximum ledger time for an empty set of contract identifiers" ) + final case class ContractReadThroughNotFound(contractId: ContractId) extends NoStackTrace { + override def getMessage: String = + s"Contract not found for contract id $contractId. Hint: this could be due racing with a concurrent archival." + } + private[cache] class CacheIndex { private val offsetRef: AtomicReference[Option[(Offset, EventSequentialId)]] = new AtomicReference(Option.empty) diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/cache/StateCache.scala b/ledger/participant-integration-api/src/main/scala/platform/store/cache/StateCache.scala index 2b30ff986c66..f2f8fa19a635 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/cache/StateCache.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/cache/StateCache.scala @@ -95,8 +95,8 @@ private[platform] case class StateCache[K, V](cache: Cache[K, V], registerUpdate eventualUpdate: Future[V], validAt: Long, )(implicit loggingContext: LoggingContext): Future[Unit] = - eventualUpdate.transform[Unit]( - (value: V) => + eventualUpdate + .map { (value: V) => pendingUpdates.synchronized { pendingUpdates .get(key) @@ -107,13 +107,14 @@ private[platform] case class StateCache[K, V](cache: Cache[K, V], registerUpdate removeFromPending(key) } .getOrElse(logger.error(s"Pending updates tracker for $key not registered ")) - }, - (err: Throwable) => { - removeFromPending(key) + } + } + .recover { case err => + pendingUpdates.synchronized { + removeFromPending(key) + } logger.warn(s"Failure in pending cache update for key $key", err) - err - }, - ) + } private def removeFromPending(key: K)(implicit loggingContext: LoggingContext): Unit = discard( diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/store/cache/MutableCacheBackedContractStoreSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/store/cache/MutableCacheBackedContractStoreSpec.scala index ff56eec4521c..ce72d64cd176 100644 --- a/ledger/participant-integration-api/src/test/suite/scala/platform/store/cache/MutableCacheBackedContractStoreSpec.scala +++ b/ledger/participant-integration-api/src/test/suite/scala/platform/store/cache/MutableCacheBackedContractStoreSpec.scala @@ -34,7 +34,11 @@ import com.daml.platform.store.cache.MutableCacheBackedContractStoreSpec.{ } import com.daml.platform.store.dao.events.ContractStateEvent import com.daml.platform.store.interfaces.LedgerDaoContractsReader -import com.daml.platform.store.interfaces.LedgerDaoContractsReader.{KeyAssigned, KeyUnassigned} +import com.daml.platform.store.interfaces.LedgerDaoContractsReader.{ + ContractState, + KeyAssigned, + KeyUnassigned, +} import org.mockito.MockitoSugar import org.scalatest.concurrent.Eventually import org.scalatest.matchers.should.Matchers @@ -204,6 +208,37 @@ class MutableCacheBackedContractStoreSpec } } + "read-through the cache without storing negative lookups" in { + val spyContractsReader = spy(ContractsReaderFixture()) + for { + store <- contractStore(cachesSize = 1L, spyContractsReader).asFuture + _ = store.cacheIndex.set(unusedOffset, 1L) + negativeLookup_cId6 <- store.lookupActiveContract(Set(alice), cId_6) + positiveLookup_cId6 <- store.lookupActiveContract(Set(alice), cId_6) + } yield { + negativeLookup_cId6 shouldBe Option.empty + positiveLookup_cId6 shouldBe Some(contract6) + + verify(spyContractsReader, times(wantedNumberOfInvocations = 2)) + .lookupContractState(cId_6, 1L) + succeed + } + } + + "resort to resolveDivulgenceLookup on not found" in { + val spyContractsReader = spy(ContractsReaderFixture()) + for { + store <- contractStore(cachesSize = 1L, spyContractsReader).asFuture + _ = store.cacheIndex.set(unusedOffset, 1L) + resolvedLookup_cId7 <- store.lookupActiveContract(Set(bob), cId_7) + } yield { + resolvedLookup_cId7 shouldBe Some(contract7) + + verify(spyContractsReader).lookupActiveContractAndLoadArgument(Set(bob), cId_7) + succeed + } + } + "present the contract state if visible at specific cache offsets (with no cache)" in { for { store <- contractStore(cachesSize = 0L).asFuture @@ -383,11 +418,11 @@ object MutableCacheBackedContractStoreSpec { private val unusedOffset = Offset.beforeBegin private val Seq(alice, bob, charlie) = Seq("alice", "bob", "charlie").map(party) private val ( - Seq(cId_1, cId_2, cId_3, cId_4, cId_5), - Seq(contract1, contract2, contract3, contract4, _), - Seq(t1, t2, t3, t4, _), + Seq(cId_1, cId_2, cId_3, cId_4, cId_5, cId_6, cId_7), + Seq(contract1, contract2, contract3, contract4, _, contract6, contract7), + Seq(t1, t2, t3, t4, _, t6, _), ) = - (1 to 5).map { id => + (1 to 7).map { id => (contractId(id), contract(s"id$id"), Instant.ofEpochSecond(id.toLong)) }.unzip3 @@ -413,6 +448,8 @@ object MutableCacheBackedContractStoreSpec { .acquire()(ResourceContext(scala.concurrent.ExecutionContext.global)) case class ContractsReaderFixture() extends LedgerDaoContractsReader { + @volatile private var initialResultForCid6 = Future.successful(Option.empty[ContractState]) + override def lookupKeyState(key: Key, validAt: Long)(implicit loggingContext: LoggingContext ): Future[LedgerDaoContractsReader.KeyState] = (key, validAt) match { @@ -423,14 +460,21 @@ object MutableCacheBackedContractStoreSpec { override def lookupContractState(contractId: ContractId, validAt: Long)(implicit loggingContext: LoggingContext - ): Future[Option[LedgerDaoContractsReader.ContractState]] = (contractId, validAt) match { - case (`cId_1`, 0L) => activeContract(contract1, Set(alice), t1) - case (`cId_1`, validAt) if validAt > 0L => archivedContract(Set(alice)) - case (`cId_2`, validAt) if validAt >= 1L => activeContract(contract2, Set(bob), t2) - case (`cId_3`, _) => activeContract(contract3, Set(bob), t3) - case (`cId_4`, _) => activeContract(contract4, Set(bob), t4) - case (`cId_5`, _) => archivedContract(Set(bob)) - case _ => Future.successful(Option.empty) + ): Future[Option[LedgerDaoContractsReader.ContractState]] = { + (contractId, validAt) match { + case (`cId_1`, 0L) => activeContract(contract1, Set(alice), t1) + case (`cId_1`, validAt) if validAt > 0L => archivedContract(Set(alice)) + case (`cId_2`, validAt) if validAt >= 1L => activeContract(contract2, Set(bob), t2) + case (`cId_3`, _) => activeContract(contract3, Set(bob), t3) + case (`cId_4`, _) => activeContract(contract4, Set(bob), t4) + case (`cId_5`, _) => archivedContract(Set(bob)) + case (`cId_6`, _) => + // Simulate store being populated from one query to another + val result = initialResultForCid6 + initialResultForCid6 = activeContract(contract6, Set(alice), t6) + result + case _ => Future.successful(Option.empty) + } } override def lookupMaximumLedgerTime(ids: Set[ContractId])(implicit @@ -459,6 +503,7 @@ object MutableCacheBackedContractStoreSpec { Future.successful(Some(contract3)) case (`cId_1`, parties) if parties.contains(charlie) => Future.successful(Some(contract1)) + case (`cId_7`, parties) if parties == Set(bob) => Future.successful(Some(contract7)) case _ => Future.successful(Option.empty) }