Skip to content

Commit

Permalink
Fix some tests and MutableCacheBackedContractStore
Browse files Browse the repository at this point in the history
I thought we had types that the compiler checks at compile time :(

CHANGELOG_BEGIN
CHANGELOG_END
  • Loading branch information
gerolf-da committed Nov 4, 2021
1 parent e7b417b commit 6c81d8b
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

package com.daml.platform.store.cache

import java.util.concurrent.atomic.AtomicReference

import akka.stream.scaladsl.{Keep, RestartSource, Sink, Source}
import akka.stream.{KillSwitches, Materializer, RestartSettings, UniqueKillSwitch}
import akka.{Done, NotUsed}
Expand All @@ -26,11 +28,10 @@ import com.daml.platform.store.interfaces.LedgerDaoContractsReader.{
KeyState,
}

import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Success
import scala.util.control.NoStackTrace
import scala.util.{Failure, Success, Try}

private[platform] class MutableCacheBackedContractStore(
metrics: Metrics,
Expand Down Expand Up @@ -74,30 +75,35 @@ private[platform] class MutableCacheBackedContractStore(
loggingContext: LoggingContext
): Future[Either[Set[ContractId], Option[Timestamp]]] =
Future
.fromTry(partitionCached(ids))
.successful(partitionCached(ids))
.flatMap {
case (cached, toBeFetched) if toBeFetched.isEmpty =>
case Right((cached, toBeFetched)) if toBeFetched.isEmpty =>
Future.successful(Right(Some(cached.max)))
case (cached, toBeFetched) =>
case Right((cached, toBeFetched)) =>
contractsReader
.lookupMaximumLedgerTime(toBeFetched)
.map(_.map(_.map(m => (cached + m).max)))
case Left(missingContracts) => Future.successful(Left(missingContracts))
}

private def partitionCached(
ids: Set[ContractId]
)(implicit loggingContext: LoggingContext) = {
)(implicit
loggingContext: LoggingContext
): Either[Set[ContractId], (Set[Timestamp], Set[ContractId])] = {
val cacheQueried = ids.map(id => id -> contractsCache.get(id))

val cached = cacheQueried.view
.map(_._2)
.foldLeft(Try(Set.empty[Timestamp])) {
case (Success(timestamps), Some(active: Active)) =>
Success(timestamps + active.createLedgerEffectiveTime)
case (Success(_), Some(Archived(_))) => Failure(ContractNotFound(ids))
case (Success(_), Some(NotFound)) => Failure(ContractNotFound(ids))
case (Success(timestamps), None) => Success(timestamps)
case (failure, _) => failure
.foldLeft[Either[Set[ContractId], Set[Timestamp]]](Right(Set.empty[Timestamp])) {
// successful lookups
case (Right(timestamps), (_, Some(active: Active))) =>
Right(timestamps + active.createLedgerEffectiveTime)
case (Right(timestamps), (_, None)) => Right(timestamps)

// failure cases
case (acc, (cid, Some(Archived(_) | NotFound))) =>
acc.left.map(_ + cid).orElse(Left(Set(cid)))
case (acc @ Left(_), _) => acc
}

cached.map { cached =>
Expand Down Expand Up @@ -380,11 +386,6 @@ private[platform] object MutableCacheBackedContractStore {
s"One or more of the following contract identifiers has not been found: ${contractIds.map(_.coid).mkString(", ")}"
)

final case class EmptyContractIds()
extends IllegalArgumentException(
"Cannot lookup the maximum ledger time for an empty set of contract identifiers"
)

final case class ContractReadThroughNotFound(contractId: ContractId) extends NoStackTrace {
override def getMessage: String =
s"Contract not found for contract id ${contractId.coid}. Hint: this could be due racing with a concurrent archival."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import java.util.TimeZone
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should.Matchers

import scala.util.Success

private[backend] trait StorageBackendTestsTimestamps extends Matchers with StorageBackendSpec {
this: AsyncFlatSpec =>

Expand Down Expand Up @@ -50,9 +48,9 @@ private[backend] trait StorageBackendTestsTimestamps extends Matchers with Stora
withDefaultTimeZone("GMT+1")(contractStorageBackend.maximumLedgerTime(Set(cid)))
)
} yield {
withClue("UTC") { let1 shouldBe Success(Some(let)) }
withClue("GMT-1") { let2 shouldBe Success(Some(let)) }
withClue("GMT+1") { let3 shouldBe Success(Some(let)) }
withClue("UTC") { let1 shouldBe Right(Some(let)) }
withClue("GMT-1") { let2 shouldBe Right(Some(let)) }
withClue("GMT+1") { let3 shouldBe Right(Some(let)) }
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import com.daml.platform.store.appendonlydao.events.ContractStateEvent
import com.daml.platform.store.cache.ContractKeyStateValue.{Assigned, Unassigned}
import com.daml.platform.store.cache.ContractStateValue.{Active, Archived}
import com.daml.platform.store.cache.MutableCacheBackedContractStore.{
ContractNotFound,
EmptyContractIds,
EventSequentialId,
SignalNewLedgerHead,
SubscribeToContractStateEvents,
Expand Down Expand Up @@ -340,7 +338,7 @@ class MutableCacheBackedContractStoreSpec
_ <- store.lookupActiveContract(Set(bob), cId_3)
maxLedgerTime <- store.lookupMaximumLedgerTime(Set(cId_2, cId_3, cId_4))
} yield {
maxLedgerTime shouldBe Some(t4)
maxLedgerTime shouldBe Right(Some(t4))
}
}

Expand All @@ -350,27 +348,16 @@ class MutableCacheBackedContractStoreSpec
_ = store.cacheIndex.set(unusedOffset, 2L)
// populate the cache
_ <- store.lookupActiveContract(Set(bob), cId_5)
assertion <- recoverToSucceededIf[ContractNotFound](
store.lookupMaximumLedgerTime(Set(cId_1, cId_5))
)
} yield assertion
letLookup <- store.lookupMaximumLedgerTime(Set(cId_1, cId_5))
} yield letLookup shouldBe Left(Set(cId_1))
}

"fail if one of the fetched contract ids doesn't have an associated active contract" in {
for {
store <- contractStore(cachesSize = 0L).asFuture
_ = store.cacheIndex.set(unusedOffset, 2L)
assertion <- recoverToSucceededIf[IllegalArgumentException](
store.lookupMaximumLedgerTime(Set(cId_1, cId_5))
)
} yield assertion
}

"fail if the requested contract id set is empty" in {
for {
store <- contractStore(cachesSize = 0L).asFuture
_ <- recoverToSucceededIf[EmptyContractIds](store.lookupMaximumLedgerTime(Set.empty))
} yield succeed
letLookup <- store.lookupMaximumLedgerTime(Set(cId_1, cId_5))
} yield letLookup shouldBe Left(Set(cId_1, cId_5))
}
}

Expand Down

0 comments on commit 6c81d8b

Please sign in to comment.