Skip to content

Commit

Permalink
Allow retriable lookupMaximumLedgerTime on contention (#10211)
Browse files Browse the repository at this point in the history
CHANGELOG_BEGIN
CHANGELOG_END
  • Loading branch information
tudor-da authored Jul 9, 2021
1 parent ca2a9e9 commit 0bea5e3
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import com.daml.platform.store.interfaces.LedgerDaoContractsReader.{

import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

private[platform] class MutableCacheBackedContractStore(
metrics: Metrics,
Expand Down Expand Up @@ -74,24 +75,38 @@ private[platform] class MutableCacheBackedContractStore(
if (ids.isEmpty)
Future.failed(EmptyContractIds())
else {
val (cached, toBeFetched) = partitionCached(ids)
if (toBeFetched.isEmpty) Future.successful(Some(cached.max))
else
contractsReader
.lookupMaximumLedgerTime(toBeFetched)
.map(_.map(m => (cached + m).max))
Future
.fromTry(partitionCached(ids))
.flatMap {
case (cached, toBeFetched) if toBeFetched.isEmpty =>
Future.successful(Some(cached.max))
case (cached, toBeFetched) =>
contractsReader
.lookupMaximumLedgerTime(toBeFetched)
.map(_.map(m => (cached + m).max))
}
}

private def partitionCached(
ids: Set[ContractId]
)(implicit loggingContext: LoggingContext) = {
val cacheQueried = ids.map(id => id -> contractsCache.get(id))
val cached = cacheQueried.collect {
case (_, Some(Active(_, _, createLedgerEffectiveTime))) => createLedgerEffectiveTime
case (_, Some(_)) => throw ContractNotFound(ids)

val cached = cacheQueried.view
.map(_._2)
.foldLeft(Try(Set.empty[Instant])) {
case (Success(timestamps), Some(active: Active)) =>
Success(timestamps + active.createLedgerEffectiveTime)
case (Success(_), Some(Archived(_))) => Failure(ContractNotFound(ids))
case (Success(_), Some(NotFound)) => Failure(ContractNotFound(ids))
case (Success(timestamps), None) => Success(timestamps)
case (failure, _) => failure
}

cached.map { cached =>
val missing = cacheQueried.collect { case (id, None) => id }
(cached, missing)
}
val missing = cacheQueried.collect { case (id, None) => id }
(cached, missing)
}

private def readThroughContractsCache(contractId: ContractId)(implicit
Expand Down Expand Up @@ -350,7 +365,7 @@ private[platform] object MutableCacheBackedContractStore {

final case class ContractNotFound(contractIds: Set[ContractId])
extends IllegalArgumentException(
s"One or more of the following contract identifiers has been found: ${contractIds.map(_.coid).mkString(", ")}"
s"One or more of the following contract identifiers has not been found: ${contractIds.map(_.coid).mkString(", ")}"
)

final case class EmptyContractIds()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,23 +297,33 @@ class MutableCacheBackedContractStoreSpec
}
}

"fail if one of the contract ids doesn't have an associated active contract" in {
recoverToSucceededIf[ContractNotFound] {
for {
store <- contractStore(cachesSize = 0L).asFuture
_ = store.cacheIndex.set(unusedOffset, 2L)
_ <- store.lookupMaximumLedgerTime(Set(cId_1, cId_2))
} yield succeed
}
"fail if one of the cached contract ids doesn't have an associated active contract" in {
for {
store <- contractStore(cachesSize = 1L).asFuture
_ = store.cacheIndex.set(unusedOffset, 2L)
// populate the cache
_ <- store.lookupActiveContract(Set(bob), cId_5)
assertion <- recoverToSucceededIf[ContractNotFound](
store.lookupMaximumLedgerTime(Set(cId_1, cId_5))
)
} yield assertion
}

"fail if one of the fetched contract ids doesn't have an associated active contract" in {
for {
store <- contractStore(cachesSize = 0L).asFuture
_ = store.cacheIndex.set(unusedOffset, 2L)
assertion <- recoverToSucceededIf[IllegalArgumentException](
store.lookupMaximumLedgerTime(Set(cId_1, cId_5))
)
} yield assertion
}

"fail if the requested contract id set is empty" in {
recoverToSucceededIf[EmptyContractIds] {
for {
store <- contractStore(cachesSize = 0L).asFuture
_ <- store.lookupMaximumLedgerTime(Set.empty)
} yield succeed
}
for {
store <- contractStore(cachesSize = 0L).asFuture
_ <- recoverToSucceededIf[EmptyContractIds](store.lookupMaximumLedgerTime(Set.empty))
} yield succeed
}
}

Expand Down Expand Up @@ -373,11 +383,11 @@ object MutableCacheBackedContractStoreSpec {
private val unusedOffset = Offset.beforeBegin
private val Seq(alice, bob, charlie) = Seq("alice", "bob", "charlie").map(party)
private val (
Seq(cId_1, cId_2, cId_3, cId_4),
Seq(contract1, contract2, contract3, contract4),
Seq(t1, t2, t3, t4),
Seq(cId_1, cId_2, cId_3, cId_4, cId_5),
Seq(contract1, contract2, contract3, contract4, _),
Seq(t1, t2, t3, t4, _),
) =
(1 to 4).map { id =>
(1 to 5).map { id =>
(contractId(id), contract(s"id$id"), Instant.ofEpochSecond(id.toLong))
}.unzip3

Expand Down Expand Up @@ -419,6 +429,7 @@ object MutableCacheBackedContractStoreSpec {
case (`cId_2`, validAt) if validAt >= 1L => activeContract(contract2, Set(bob), t2)
case (`cId_3`, _) => activeContract(contract3, Set(bob), t3)
case (`cId_4`, _) => activeContract(contract4, Set(bob), t4)
case (`cId_5`, _) => archivedContract(Set(bob))
case _ => Future.successful(Option.empty)
}

Expand All @@ -429,7 +440,12 @@ object MutableCacheBackedContractStoreSpec {
Future.successful(Some(t4))
case set if set.isEmpty =>
Future.failed(EmptyContractIds())
case _ => Future.failed(ContractNotFound(ids))
case _ =>
Future.failed(
new IllegalArgumentException(
s"The following contracts have not been found: ${ids.map(_.coid).mkString(", ")}"
)
)
}

override def lookupActiveContractAndLoadArgument(
Expand Down

0 comments on commit 0bea5e3

Please sign in to comment.