From 0bea5e3ef7d9b9fb3f593d86d42b5b599dc3c4f2 Mon Sep 17 00:00:00 2001 From: tudor-da Date: Fri, 9 Jul 2021 09:30:05 +0200 Subject: [PATCH] Allow retriable lookupMaximumLedgerTime on contention (#10211) CHANGELOG_BEGIN CHANGELOG_END --- .../MutableCacheBackedContractStore.scala | 39 +++++++++----- .../MutableCacheBackedContractStoreSpec.scala | 54 ++++++++++++------- 2 files changed, 62 insertions(+), 31 deletions(-) diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/cache/MutableCacheBackedContractStore.scala b/ledger/participant-integration-api/src/main/scala/platform/store/cache/MutableCacheBackedContractStore.scala index 475a9556cbbd..4cbb09aac00c 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/cache/MutableCacheBackedContractStore.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/cache/MutableCacheBackedContractStore.scala @@ -30,6 +30,7 @@ import com.daml.platform.store.interfaces.LedgerDaoContractsReader.{ import scala.concurrent.duration.{DurationInt, FiniteDuration} import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success, Try} private[platform] class MutableCacheBackedContractStore( metrics: Metrics, @@ -74,24 +75,38 @@ private[platform] class MutableCacheBackedContractStore( if (ids.isEmpty) Future.failed(EmptyContractIds()) else { - val (cached, toBeFetched) = partitionCached(ids) - if (toBeFetched.isEmpty) Future.successful(Some(cached.max)) - else - contractsReader - .lookupMaximumLedgerTime(toBeFetched) - .map(_.map(m => (cached + m).max)) + Future + .fromTry(partitionCached(ids)) + .flatMap { + case (cached, toBeFetched) if toBeFetched.isEmpty => + Future.successful(Some(cached.max)) + case (cached, toBeFetched) => + contractsReader + .lookupMaximumLedgerTime(toBeFetched) + .map(_.map(m => (cached + m).max)) + } } private def partitionCached( ids: Set[ContractId] )(implicit loggingContext: LoggingContext) = { val cacheQueried = ids.map(id => id -> contractsCache.get(id)) - val cached = cacheQueried.collect { - case (_, Some(Active(_, _, createLedgerEffectiveTime))) => createLedgerEffectiveTime - case (_, Some(_)) => throw ContractNotFound(ids) + + val cached = cacheQueried.view + .map(_._2) + .foldLeft(Try(Set.empty[Instant])) { + case (Success(timestamps), Some(active: Active)) => + Success(timestamps + active.createLedgerEffectiveTime) + case (Success(_), Some(Archived(_))) => Failure(ContractNotFound(ids)) + case (Success(_), Some(NotFound)) => Failure(ContractNotFound(ids)) + case (Success(timestamps), None) => Success(timestamps) + case (failure, _) => failure + } + + cached.map { cached => + val missing = cacheQueried.collect { case (id, None) => id } + (cached, missing) } - val missing = cacheQueried.collect { case (id, None) => id } - (cached, missing) } private def readThroughContractsCache(contractId: ContractId)(implicit @@ -350,7 +365,7 @@ private[platform] object MutableCacheBackedContractStore { final case class ContractNotFound(contractIds: Set[ContractId]) extends IllegalArgumentException( - s"One or more of the following contract identifiers has been found: ${contractIds.map(_.coid).mkString(", ")}" + s"One or more of the following contract identifiers has not been found: ${contractIds.map(_.coid).mkString(", ")}" ) final case class EmptyContractIds() diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/store/cache/MutableCacheBackedContractStoreSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/store/cache/MutableCacheBackedContractStoreSpec.scala index ed264bf127a8..c3d200418b98 100644 --- a/ledger/participant-integration-api/src/test/suite/scala/platform/store/cache/MutableCacheBackedContractStoreSpec.scala +++ b/ledger/participant-integration-api/src/test/suite/scala/platform/store/cache/MutableCacheBackedContractStoreSpec.scala @@ -297,23 +297,33 @@ class MutableCacheBackedContractStoreSpec } } - "fail if one of the contract ids doesn't have an associated active contract" in { - recoverToSucceededIf[ContractNotFound] { - for { - store <- contractStore(cachesSize = 0L).asFuture - _ = store.cacheIndex.set(unusedOffset, 2L) - _ <- store.lookupMaximumLedgerTime(Set(cId_1, cId_2)) - } yield succeed - } + "fail if one of the cached contract ids doesn't have an associated active contract" in { + for { + store <- contractStore(cachesSize = 1L).asFuture + _ = store.cacheIndex.set(unusedOffset, 2L) + // populate the cache + _ <- store.lookupActiveContract(Set(bob), cId_5) + assertion <- recoverToSucceededIf[ContractNotFound]( + store.lookupMaximumLedgerTime(Set(cId_1, cId_5)) + ) + } yield assertion + } + + "fail if one of the fetched contract ids doesn't have an associated active contract" in { + for { + store <- contractStore(cachesSize = 0L).asFuture + _ = store.cacheIndex.set(unusedOffset, 2L) + assertion <- recoverToSucceededIf[IllegalArgumentException]( + store.lookupMaximumLedgerTime(Set(cId_1, cId_5)) + ) + } yield assertion } "fail if the requested contract id set is empty" in { - recoverToSucceededIf[EmptyContractIds] { - for { - store <- contractStore(cachesSize = 0L).asFuture - _ <- store.lookupMaximumLedgerTime(Set.empty) - } yield succeed - } + for { + store <- contractStore(cachesSize = 0L).asFuture + _ <- recoverToSucceededIf[EmptyContractIds](store.lookupMaximumLedgerTime(Set.empty)) + } yield succeed } } @@ -373,11 +383,11 @@ object MutableCacheBackedContractStoreSpec { private val unusedOffset = Offset.beforeBegin private val Seq(alice, bob, charlie) = Seq("alice", "bob", "charlie").map(party) private val ( - Seq(cId_1, cId_2, cId_3, cId_4), - Seq(contract1, contract2, contract3, contract4), - Seq(t1, t2, t3, t4), + Seq(cId_1, cId_2, cId_3, cId_4, cId_5), + Seq(contract1, contract2, contract3, contract4, _), + Seq(t1, t2, t3, t4, _), ) = - (1 to 4).map { id => + (1 to 5).map { id => (contractId(id), contract(s"id$id"), Instant.ofEpochSecond(id.toLong)) }.unzip3 @@ -419,6 +429,7 @@ object MutableCacheBackedContractStoreSpec { case (`cId_2`, validAt) if validAt >= 1L => activeContract(contract2, Set(bob), t2) case (`cId_3`, _) => activeContract(contract3, Set(bob), t3) case (`cId_4`, _) => activeContract(contract4, Set(bob), t4) + case (`cId_5`, _) => archivedContract(Set(bob)) case _ => Future.successful(Option.empty) } @@ -429,7 +440,12 @@ object MutableCacheBackedContractStoreSpec { Future.successful(Some(t4)) case set if set.isEmpty => Future.failed(EmptyContractIds()) - case _ => Future.failed(ContractNotFound(ids)) + case _ => + Future.failed( + new IllegalArgumentException( + s"The following contracts have not been found: ${ids.map(_.coid).mkString(", ")}" + ) + ) } override def lookupActiveContractAndLoadArgument(