Skip to content

Commit

Permalink
Enirched SequenceSpec test
Browse files Browse the repository at this point in the history
  • Loading branch information
tudor-da committed Jan 26, 2022
1 parent 2c87b7c commit bec2d72
Show file tree
Hide file tree
Showing 8 changed files with 328 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ import akka.NotUsed
import akka.stream.scaladsl.Sink
import akka.stream.{BoundedSourceQueue, Materializer, QueueOfferResult}
import com.daml.daml_lf_dev.DamlLf.Archive
import com.daml.error.{ContextualizedErrorLogger, DamlContextualizedErrorLogger}
import com.daml.ledger.api.DeduplicationPeriod
import com.daml.ledger.api.health.{HealthStatus, Healthy}
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.v2._
import com.daml.ledger.sandbox.bridge.{BridgeMetrics, LedgerBridge}
import com.daml.ledger.sandbox.domain.Submission
import com.daml.ledger.sandbox.domain.{Rejection, Submission}
import com.daml.lf.data.{Ref, Time}
import com.daml.lf.transaction.SubmittedTransaction
import com.daml.logging.{ContextualizedLogger, LoggingContext}
Expand All @@ -21,6 +23,7 @@ import com.daml.telemetry.TelemetryContext
import com.google.rpc.code.Code
import com.google.rpc.status.Status

import java.time.Duration
import java.util.concurrent.{CompletableFuture, CompletionStage}

class BridgeWriteService(
Expand All @@ -37,6 +40,11 @@ class BridgeWriteService(

override def isApiDeduplicationEnabled: Boolean = false

override def close(): Unit = {
logger.info("Shutting down BridgeWriteService.")
queue.complete()
}

override def submitTransaction(
submitterInfo: SubmitterInfo,
transactionMeta: TransactionMeta,
Expand All @@ -45,15 +53,33 @@ class BridgeWriteService(
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
): CompletionStage[SubmissionResult] =
submit(
Submission.Transaction(
submitterInfo = submitterInfo,
transactionMeta = transactionMeta,
transaction = transaction,
estimatedInterpretationCost = estimatedInterpretationCost,
)
)
): CompletionStage[SubmissionResult] = {
implicit val errorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(logger, loggingContext, submitterInfo.submissionId)
submitterInfo.deduplicationPeriod match {
case DeduplicationPeriod.DeduplicationDuration(deduplicationDuration) =>
validateDeduplicationDurationAndSubmit(
submitterInfo,
transactionMeta,
transaction,
estimatedInterpretationCost,
deduplicationDuration,
)
case DeduplicationPeriod.DeduplicationOffset(_) =>
CompletableFuture.completedFuture(
SubmissionResult.SynchronousError(
Rejection
.LedgerBridgeInternalError(
new RuntimeException(
"Deduplication offset periods are not supported in Sandbox-on-X ledger bridge"
),
submitterInfo.toCompletionInfo(),
)
.toStatus
)
)
}
}

override def submitConfiguration(
maxRecordTime: Time.Timestamp,
Expand Down Expand Up @@ -136,9 +162,35 @@ class BridgeWriteService(
private def submit(submission: Submission): CompletionStage[SubmissionResult] =
toSubmissionResult(queue.offer(submission))

override def close(): Unit = {
logger.info("Shutting down BridgeLedgerFactory.")
queue.complete()
private def validateDeduplicationDurationAndSubmit(
submitterInfo: SubmitterInfo,
transactionMeta: TransactionMeta,
transaction: SubmittedTransaction,
estimatedInterpretationCost: Long,
deduplicationDuration: Duration,
)(implicit errorLogger: ContextualizedErrorLogger): CompletionStage[SubmissionResult] = {
val maxDeduplicationDuration = submitterInfo.ledgerConfiguration.maxDeduplicationTime
if (deduplicationDuration.compareTo(maxDeduplicationDuration) > 0)
CompletableFuture.completedFuture(
SubmissionResult.SynchronousError(
Rejection
.MaxDeduplicationDurationExceeded(
deduplicationDuration,
maxDeduplicationDuration,
submitterInfo.toCompletionInfo(),
)
.toStatus
)
)
else
submit(
Submission.Transaction(
submitterInfo = submitterInfo,
transactionMeta = transactionMeta,
transaction = transaction,
estimatedInterpretationCost = estimatedInterpretationCost,
)
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ object SandboxOnXRunner {
)
),
deduplicationType = CommandDeduplicationType.ASYNC_ONLY,
maxDeduplicationDurationEnforced = false,
maxDeduplicationDurationEnforced = true,
),
contractIdFeatures = ExperimentalContractIds.of(
v1 = ExperimentalContractIds.ContractIdV1Support.NON_SUFFIXED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ object LedgerBridge {
maxDeduplicationDuration =
// TODO SoX: Enforce cap on this config
config.maxDeduplicationDuration
.getOrElse(Duration.ofMinutes(1L)),
.getOrElse(Duration.ofDays(1L)),
)

private[bridge] def packageUploadSuccess(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,35 @@ import scala.collection.immutable.VectorMap

case class DeduplicationState private (
private[validate] val deduplicationQueue: DeduplicationQueue,
deduplicationDuration: Duration,
currentTime: () => Time.Timestamp,
private val maxDeduplicationDuration: Duration,
private val currentTime: () => Time.Timestamp,
) {

def newTransactionAccepted(changeId: ChangeId): DeduplicationState = {
val now = currentTime()
val expiredTimestamp = expiredThreshold(deduplicationDuration, now)

val updatedQueue =
deduplicationQueue
.updated(changeId, now)
.dropWhile(_._2 <= expiredTimestamp)

DeduplicationState(
deduplicationQueue = updatedQueue,
deduplicationDuration = deduplicationDuration,
currentTime = currentTime,
)
def deduplicate(
changeId: ChangeId,
commandDeduplicationDuration: Duration,
): (DeduplicationState, Boolean) = {
if (commandDeduplicationDuration.compareTo(maxDeduplicationDuration) > 0)
throw new RuntimeException(
s"Cannot deduplicate for a period ($commandDeduplicationDuration) longer than the max deduplication duration ($maxDeduplicationDuration)."
)
else {
val now = currentTime()
val expiredTimestamp = expiredThreshold(maxDeduplicationDuration, now)

val queueAfterEvictions = deduplicationQueue.dropWhile(_._2 <= expiredTimestamp)

val isDuplicateChangeId = queueAfterEvictions
.get(changeId)
.exists(_ > expiredThreshold(commandDeduplicationDuration, now))

if (isDuplicateChangeId)
copy(deduplicationQueue = queueAfterEvictions) -> true
else
copy(deduplicationQueue = queueAfterEvictions.updated(changeId, now)) -> false
}
}

def isDuplicate(changeId: ChangeId, commandDeduplicationDuration: Duration): Boolean =
deduplicationQueue
.get(changeId)
.exists(_ > expiredThreshold(commandDeduplicationDuration, currentTime()))

private def expiredThreshold(
deduplicationDuration: Duration,
now: Time.Timestamp,
Expand All @@ -53,7 +57,7 @@ object DeduplicationState {
): DeduplicationState =
DeduplicationState(
deduplicationQueue = VectorMap.empty,
deduplicationDuration = deduplicationDuration,
maxDeduplicationDuration = deduplicationDuration,
currentTime = currentTime,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import com.daml.ledger.sandbox.bridge.validate.SequencerState.LastUpdatedAt
import com.daml.ledger.sandbox.domain.Rejection._
import com.daml.ledger.sandbox.domain.Submission.{AllocateParty, Config, Transaction}
import com.daml.ledger.sandbox.domain._
import com.daml.lf.data.Ref
import com.daml.lf.data.{Ref, Time}
import com.daml.lf.data.Ref.SubmissionId
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.transaction.{Transaction => LfTransaction}
Expand All @@ -45,6 +45,7 @@ private[validate] class SequenceImpl(
bridgeMetrics: BridgeMetrics,
errorFactories: ErrorFactories,
maxDeduplicationDuration: Duration,
wallClockTime: () => Time.Timestamp = () => Timestamp.now(),
) extends Sequence {
private[this] implicit val logger: ContextualizedLogger = ContextualizedLogger.get(getClass)

Expand All @@ -53,7 +54,7 @@ private[validate] class SequenceImpl(
@volatile private[validate] var allocatedParties = initialAllocatedParties
@volatile private[validate] var ledgerConfiguration = initialLedgerConfiguration
@volatile private[validate] var deduplicationState =
DeduplicationState.empty(maxDeduplicationDuration, () => timeProvider.getCurrentTimestamp)
DeduplicationState.empty(maxDeduplicationDuration, wallClockTime)

override def apply(): Validation[(Offset, PreparedSubmission)] => Iterable[(Offset, Update)] =
in => {
Expand Down Expand Up @@ -155,15 +156,6 @@ private[validate] class SequenceImpl(

withErrorLogger(submitterInfo.submissionId) { implicit errorLogger =>
for {
_ <- deduplicate(
changeId = ChangeId(
submitterInfo.applicationId,
submitterInfo.commandId,
submitterInfo.actAs.toSet,
),
deduplicationPeriod = txSubmission.submission.submitterInfo.deduplicationPeriod,
completionInfo = completionInfo,
)
_ <- checkTimeModel(txSubmission.submission, recordTime, ledgerConfiguration)
_ <- validateParties(
allocatedParties,
Expand All @@ -177,6 +169,15 @@ private[validate] class SequenceImpl(
inputContracts = txSubmission.inputContracts,
completionInfo = completionInfo,
)
_ <- deduplicate(
changeId = ChangeId(
submitterInfo.applicationId,
submitterInfo.commandId,
submitterInfo.actAs.toSet,
),
deduplicationPeriod = txSubmission.submission.submitterInfo.deduplicationPeriod,
completionInfo = completionInfo,
)
} yield ()
}(txSubmission.submission.loggingContext, logger)
.fold(
Expand All @@ -191,14 +192,6 @@ private[validate] class SequenceImpl(
txSubmission.consumedContracts,
)

deduplicationState = deduplicationState.newTransactionAccepted(
ChangeId(
submitterInfo.applicationId,
submitterInfo.commandId,
submitterInfo.actAs.toSet,
)
)

transactionAccepted(
txSubmission.submission,
offsetIdx,
Expand Down Expand Up @@ -253,11 +246,25 @@ private[validate] class SequenceImpl(
): Validation[Unit] =
deduplicationPeriod match {
case DeduplicationPeriod.DeduplicationDuration(commandDeduplicationDuration) =>
val isDuplicate = deduplicationState.isDuplicate(changeId, commandDeduplicationDuration)
Either.cond(!isDuplicate, (), DuplicateCommand(changeId, completionInfo))
val (newDeduplicationState, isDuplicate) =
deduplicationState.deduplicate(changeId, commandDeduplicationDuration)

deduplicationState = newDeduplicationState
Either.cond(
!isDuplicate,
(),
DuplicateCommand(changeId, completionInfo),
)
case _: DeduplicationPeriod.DeduplicationOffset =>
// TODO SoX: Handle not supported offset deduplication
throw new RuntimeException("Not supported")
Left(
Rejection
.LedgerBridgeInternalError(
new RuntimeException(
"Deduplication offset periods are not supported in Sandbox-on-X ledger bridge"
),
completionInfo,
)
)
}

private def validateParties(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import com.google.protobuf.any.Any
import com.google.rpc.error_details.ErrorInfo
import com.google.rpc.status.Status

import java.time.Duration

private[sandbox] sealed trait Rejection extends Product with Serializable {
def toStatus: Status
def completionInfo: CompletionInfo
Expand Down Expand Up @@ -59,7 +61,7 @@ private[sandbox] object Rejection {
) extends Rejection {
override def toStatus: Status = LedgerApiErrors.InternalError
.UnexpectedOrUnknownException(_err)
.rpcStatus(None)
.rpcStatus(completionInfo.submissionId)
}

final case class TransactionInternallyInconsistentKey(
Expand Down Expand Up @@ -130,6 +132,22 @@ private[sandbox] object Rejection {
.rpcStatus(completionInfo.submissionId)
}

final case class MaxDeduplicationDurationExceeded(
duration: Duration,
maxDeduplicationDuration: Duration,
completionInfo: CompletionInfo,
)(implicit
contextualizedErrorLogger: ContextualizedErrorLogger
) extends Rejection {
override def toStatus: Status =
LedgerApiErrors.RequestValidation.InvalidDeduplicationPeriodField
.Reject(
s"The given deduplication duration of $duration exceeds the maximum deduplication time of $maxDeduplicationDuration",
Some(maxDeduplicationDuration),
)
.rpcStatus(None)
}

final case class NoLedgerConfiguration(
completionInfo: CompletionInfo,
errorFactories: ErrorFactories,
Expand Down
Loading

0 comments on commit bec2d72

Please sign in to comment.