Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restart the submission interpretation in case of a race [DPP-737] #11552

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,9 @@ private[daml] final class SpannedIndexService(delegate: IndexService) extends In

override def lookupMaximumLedgerTime(
ids: Set[Value.ContractId]
)(implicit loggingContext: LoggingContext): Future[Option[Timestamp]] =
)(implicit
loggingContext: LoggingContext
): Future[Either[Set[Value.ContractId], Option[Timestamp]]] =
delegate.lookupMaximumLedgerTime(ids)

override def getLedgerId()(implicit loggingContext: LoggingContext): Future[LedgerId] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ private[daml] final class TimedIndexService(delegate: IndexService, metrics: Met

override def lookupMaximumLedgerTime(
ids: Set[Value.ContractId]
)(implicit loggingContext: LoggingContext): Future[Option[Timestamp]] =
)(implicit
loggingContext: LoggingContext
): Future[Either[Set[Value.ContractId], Option[Timestamp]]] =
Timed.future(
metrics.daml.services.index.lookupMaximumLedgerTime,
delegate.lookupMaximumLedgerTime(ids),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,19 @@ private[apiserver] final class LedgerTimeAwareCommandExecutor(
)(implicit
executionContext: ExecutionContext,
loggingContext: LoggingContext,
): Future[Either[ErrorCause, CommandExecutionResult]] =
): Future[Either[ErrorCause, CommandExecutionResult]] = {
def retryOrError(newCommands: Commands) = {
if (retriesLeft > 0) {
mziolekda marked this conversation as resolved.
Show resolved Hide resolved
metrics.daml.execution.retry.mark()
logger.info(s"Restarting the computation")
loop(newCommands, submissionSeed, ledgerConfiguration, retriesLeft - 1)
} else {
logger.info(
s"Did not find a suitable ledger time after ${maxRetries - retriesLeft} retries. Giving up."
)
Future.successful(Left(ErrorCause.LedgerTime(maxRetries)))
}
}
delegate
.execute(commands, submissionSeed, ledgerConfiguration)
.flatMap {
Expand All @@ -66,39 +78,42 @@ private[apiserver] final class LedgerTimeAwareCommandExecutor(
else
contractStore
.lookupMaximumLedgerTime(usedContractIds)
.flatMap { maxUsedTime =>
if (maxUsedTime.forall(_ <= commands.commands.ledgerEffectiveTime)) {
Future.successful(Right(cer))
} else if (!cer.dependsOnLedgerTime) {
logger.debug(
s"Advancing ledger effective time for the output from ${commands.commands.ledgerEffectiveTime} to $maxUsedTime"
)
Future.successful(Right(advanceOutputTime(cer, maxUsedTime)))
} else if (retriesLeft > 0) {
metrics.daml.execution.retry.mark()
logger.debug(
s"Restarting the computation with new ledger effective time $maxUsedTime"
.flatMap {
case Left(missingContracts) =>
logger.info(
s"Did not find the ledger time for some contracts. This can happen if there is contention on contracts used by the transaction. Missing contracts: ${missingContracts
.mkString(", ")}."
)
val advancedCommands = advanceInputTime(commands, maxUsedTime)
loop(advancedCommands, submissionSeed, ledgerConfiguration, retriesLeft - 1)
} else {
Future.successful(Left(ErrorCause.LedgerTime(maxRetries)))
}
retryOrError(commands)
tudor-da marked this conversation as resolved.
Show resolved Hide resolved

case Right(maxUsedTime) =>
if (maxUsedTime.forall(_ <= commands.commands.ledgerEffectiveTime)) {
Future.successful(Right(cer))
} else if (!cer.dependsOnLedgerTime) {
logger.info(
s"Advancing ledger effective time for the output from ${commands.commands.ledgerEffectiveTime} to $maxUsedTime"
)
Future.successful(Right(advanceOutputTime(cer, maxUsedTime)))
} else {
logger.info(
s"Ledger time was used in the Daml code, but a different ledger effective time $maxUsedTime was determined afterwards."
)
val advancedCommands = advanceInputTime(commands, maxUsedTime)
retryOrError(advancedCommands)
}
}
.recover {
// An error while looking up the maximum ledger time for the used contracts
// most likely means that one of the contracts is already not active anymore,
// which can happen under contention.
// A retry would only be successful in case the archived contracts were referenced by key.
// Direct references to archived contracts will result in the same error.
// An error while looking up the maximum ledger time for the used contracts. The nature of this error is not known.
// Not retrying automatically. All other automatically retryable cases are covered by the logic above.
case error =>
logger.info(
s"Lookup of maximum ledger time failed. This can happen if there is contention on contracts used by the transaction. Used contracts: ${usedContractIds
s"Lookup of maximum ledger time failed after ${maxRetries - retriesLeft}. Used contracts: ${usedContractIds
.mkString(", ")}. Details: $error"
)
Left(ErrorCause.LedgerTime(maxRetries - retriesLeft))
}
}
}

// Does nothing if `newTime` is empty. This happens if the transaction only regarded divulged contracts.
private[this] def advanceOutputTime(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ private[platform] final class LedgerBackedIndexService(

override def lookupMaximumLedgerTime(ids: Set[ContractId])(implicit
loggingContext: LoggingContext
): Future[Option[Timestamp]] =
): Future[Either[Set[ContractId], Option[Timestamp]]] =
ledger.lookupMaximumLedgerTime(ids)

override def lookupContractKey(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ private[platform] class MeteredReadOnlyLedger(ledger: ReadOnlyLedger, metrics: M

override def lookupMaximumLedgerTime(
contractIds: Set[ContractId]
)(implicit loggingContext: LoggingContext): Future[Option[Timestamp]] =
)(implicit loggingContext: LoggingContext): Future[Either[Set[ContractId], Option[Timestamp]]] =
Timed.future(
metrics.daml.index.lookupMaximumLedgerTime,
ledger.lookupMaximumLedgerTime(contractIds),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ private[platform] abstract class BaseLedger(

override def lookupMaximumLedgerTime(
contractIds: Set[ContractId]
)(implicit loggingContext: LoggingContext): Future[Option[Timestamp]] =
)(implicit loggingContext: LoggingContext): Future[Either[Set[ContractId], Option[Timestamp]]] =
contractStore.lookupMaximumLedgerTime(contractIds)

override def getParties(parties: Seq[Ref.Party])(implicit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ private[platform] trait ReadOnlyLedger extends ReportsHealth with AutoCloseable

def lookupMaximumLedgerTime(
contractIds: Set[ContractId]
)(implicit loggingContext: LoggingContext): Future[Option[Timestamp]]
)(implicit loggingContext: LoggingContext): Future[Either[Set[ContractId], Option[Timestamp]]]

def lookupKey(key: GlobalKey, forParties: Set[Ref.Party])(implicit
loggingContext: LoggingContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.daml.platform.store.appendonlydao.events

import java.io.ByteArrayInputStream

import com.codahale.metrics.Timer
import com.daml.lf.data.Time.Timestamp
import com.daml.logging.LoggingContext
Expand All @@ -26,14 +27,13 @@ private[appendonlydao] sealed class ContractsReader(

override def lookupMaximumLedgerTime(ids: Set[ContractId])(implicit
loggingContext: LoggingContext
): Future[Option[Timestamp]] =
): Future[Either[Set[ContractId], Option[Timestamp]]] =
Timed.future(
metrics.daml.index.db.lookupMaximumLedgerTime,
dispatcher
.executeSql(metrics.daml.index.db.lookupMaximumLedgerTimeDbMetrics)(
storageBackend.maximumLedgerTime(ids)
)
.map(_.get),
),
)

/** Lookup a contract key state at a specific ledger offset.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import com.daml.scalautil.NeverEqualsOverride

import javax.sql.DataSource
import scala.annotation.unused
import scala.util.Try

/** Encapsulates the interface which hides database technology specific implementations.
* Naming convention for the interface methods that require a Connection:
Expand Down Expand Up @@ -202,7 +201,9 @@ trait CompletionStorageBackend {

trait ContractStorageBackend {
def contractKeyGlobally(key: Key)(connection: Connection): Option[ContractId]
def maximumLedgerTime(ids: Set[ContractId])(connection: Connection): Try[Option[Timestamp]]
def maximumLedgerTime(ids: Set[ContractId])(
connection: Connection
): Either[Set[ContractId], Option[Timestamp]]
def keyState(key: Key, validAt: Long)(connection: Connection): KeyState
def contractState(contractId: ContractId, before: Long)(
connection: Connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,10 @@ import com.daml.platform.store.interfaces.LedgerDaoContractsReader.{
KeyUnassigned,
}

import scala.util.{Failure, Success, Try}

class ContractStorageBackendTemplate(
queryStrategy: QueryStrategy,
ledgerEndCache: LedgerEndCache,
) extends ContractStorageBackend {

override def contractKeyGlobally(key: Key)(connection: Connection): Option[ContractId] =
contractKey(
resultColumns = List("contract_id"),
Expand All @@ -44,16 +41,6 @@ class ContractStorageBackendTemplate(
validAt = ledgerEndCache()._2,
)(connection)

/** Builds the error used when the maximum ledger time is requested for an empty set of
  * contract identifiers (wrapped in a `Failure` by `maximumLedgerTime` when `ids.isEmpty`).
  */
private def emptyContractIds: Throwable =
new IllegalArgumentException(
"Cannot lookup the maximum ledger time for an empty set of contract identifiers"
)

/** Builds the error used when some of the queried contracts could not be found.
  *
  * @param missingContractIds the contract identifiers for which no row was returned;
  *                           their `coid` values are listed, comma-separated, in the message
  */
private def notFound(missingContractIds: Set[ContractId]): Throwable =
new IllegalArgumentException(
s"The following contracts have not been found: ${missingContractIds.map(_.coid).mkString(", ")}"
)

protected def maximumLedgerTimeSqlLiteral(
id: ContractId,
lastEventSequentialId: Long,
Expand Down Expand Up @@ -102,31 +89,28 @@ class ContractStorageBackendTemplate(
// TODO append-only: consider pulling up traversal logic to upper layer
override def maximumLedgerTime(
ids: Set[ContractId]
)(connection: Connection): Try[Option[Timestamp]] = {
)(connection: Connection): Either[Set[ContractId], Option[Timestamp]] = {
val lastEventSequentialId = ledgerEndCache()._2
if (ids.isEmpty) {
Failure(emptyContractIds)
mziolekda marked this conversation as resolved.
Show resolved Hide resolved
} else {
def lookup(id: ContractId): Option[Option[Timestamp]] =
maximumLedgerTimeSqlLiteral(id, lastEventSequentialId).as(
timestampFromMicros("ledger_effective_time").?.singleOpt
)(
connection
)
def lookup(id: ContractId): Option[Option[Timestamp]] =
maximumLedgerTimeSqlLiteral(id, lastEventSequentialId).as(
timestampFromMicros("ledger_effective_time").?.singleOpt
)(
connection
)

val queriedIds: List[(ContractId, Option[Option[Timestamp]])] = ids.toList
.map(id => id -> lookup(id))
val foundLedgerEffectiveTimes: List[Option[Timestamp]] = queriedIds
.collect { case (_, Some(found)) =>
found
}
if (foundLedgerEffectiveTimes.size != ids.size) {
val missingIds = queriedIds.collect { case (missingId, None) =>
missingId
}
Left(missingIds.toSet)
} else Right(foundLedgerEffectiveTimes.max)

val queriedIds: List[(ContractId, Option[Option[Timestamp]])] = ids.toList
.map(id => id -> lookup(id))
val foundLedgerEffectiveTimes: List[Option[Timestamp]] = queriedIds
.collect { case (_, Some(found)) =>
found
}
if (foundLedgerEffectiveTimes.size != ids.size) {
val missingIds = queriedIds.collect { case (missingId, None) =>
missingId
}
Failure(notFound(missingIds.toSet))
} else Success(foundLedgerEffectiveTimes.max)
}
}

override def keyState(key: Key, validAt: Long)(connection: Connection): KeyState =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

package com.daml.platform.store.cache

import java.util.concurrent.atomic.AtomicReference

import akka.stream.scaladsl.{Keep, RestartSource, Sink, Source}
import akka.stream.{KillSwitches, Materializer, RestartSettings, UniqueKillSwitch}
import akka.{Done, NotUsed}
Expand All @@ -26,11 +28,10 @@ import com.daml.platform.store.interfaces.LedgerDaoContractsReader.{
KeyState,
}

import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Success
import scala.util.control.NoStackTrace
import scala.util.{Failure, Success, Try}

private[platform] class MutableCacheBackedContractStore(
metrics: Metrics,
Expand Down Expand Up @@ -72,36 +73,38 @@ private[platform] class MutableCacheBackedContractStore(

override def lookupMaximumLedgerTime(ids: Set[ContractId])(implicit
loggingContext: LoggingContext
): Future[Option[Timestamp]] =
if (ids.isEmpty)
Future.failed(EmptyContractIds())
else {
Future
.fromTry(partitionCached(ids))
.flatMap {
case (cached, toBeFetched) if toBeFetched.isEmpty =>
Future.successful(Some(cached.max))
case (cached, toBeFetched) =>
contractsReader
.lookupMaximumLedgerTime(toBeFetched)
.map(_.map(m => (cached + m).max))
}
}
): Future[Either[Set[ContractId], Option[Timestamp]]] =
Future
.successful(partitionCached(ids))
.flatMap {
case Right((cached, toBeFetched)) if toBeFetched.isEmpty =>
Future.successful(Right(Some(cached.max)))
case Right((cached, toBeFetched)) =>
contractsReader
.lookupMaximumLedgerTime(toBeFetched)
.map(_.map(_.map(m => (cached + m).max)))
case Left(missingContracts) => Future.successful(Left(missingContracts))
}

private def partitionCached(
ids: Set[ContractId]
)(implicit loggingContext: LoggingContext) = {
)(implicit
loggingContext: LoggingContext
): Either[Set[ContractId], (Set[Timestamp], Set[ContractId])] = {
val cacheQueried = ids.map(id => id -> contractsCache.get(id))

val cached = cacheQueried.view
.map(_._2)
.foldLeft(Try(Set.empty[Timestamp])) {
case (Success(timestamps), Some(active: Active)) =>
Success(timestamps + active.createLedgerEffectiveTime)
case (Success(_), Some(Archived(_))) => Failure(ContractNotFound(ids))
case (Success(_), Some(NotFound)) => Failure(ContractNotFound(ids))
case (Success(timestamps), None) => Success(timestamps)
case (failure, _) => failure
.foldLeft[Either[Set[ContractId], Set[Timestamp]]](Right(Set.empty[Timestamp])) {
// successful lookups
case (Right(timestamps), (_, Some(active: Active))) =>
Right(timestamps + active.createLedgerEffectiveTime)
case (Right(timestamps), (_, None)) => Right(timestamps)

// failure cases
case (acc, (cid, Some(Archived(_) | NotFound))) =>
val missingContracts = acc.left.getOrElse(Set.empty) + cid
Left(missingContracts)
case (acc @ Left(_), _) => acc
}

cached.map { cached =>
Expand Down Expand Up @@ -384,11 +387,6 @@ private[platform] object MutableCacheBackedContractStore {
s"One or more of the following contract identifiers has not been found: ${contractIds.map(_.coid).mkString(", ")}"
)

/** Error used to fail `lookupMaximumLedgerTime` when it is called with an empty set of
  * contract identifiers.
  */
final case class EmptyContractIds()
extends IllegalArgumentException(
"Cannot lookup the maximum ledger time for an empty set of contract identifiers"
)

final case class ContractReadThroughNotFound(contractId: ContractId) extends NoStackTrace {
override def getMessage: String =
s"Contract not found for contract id ${contractId.coid}. Hint: this could be due racing with a concurrent archival."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class TranslationCacheBackedContractStore(
*/
override def lookupMaximumLedgerTime(ids: Set[ContractId])(implicit
loggingContext: LoggingContext
): Future[Option[Timestamp]] =
): Future[Either[Set[ContractId], Option[Timestamp]]] =
contractsReader.lookupMaximumLedgerTime(ids)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ private[platform] trait LedgerDaoContractsReader {
*/
def lookupMaximumLedgerTime(ids: Set[ContractId])(implicit
loggingContext: LoggingContext
): Future[Option[Timestamp]]
): Future[Either[Set[ContractId], Option[Timestamp]]]

/** Looks up an active or divulged contract if it is visible for the given party.
*
Expand Down
Loading