Skip to content

Commit

Permalink
Extract command deduplication steps from the TransactionCommitter
Browse files Browse the repository at this point in the history
CHANGELOG_BEGIN

CHANGELOG_END
  • Loading branch information
nicu-da committed Nov 4, 2021
1 parent efaf20b commit 87c2d35
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 289 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.ledger.participant.state.kvutils.committer.transaction

import java.time.Instant

import com.daml.ledger.configuration.Configuration
import com.daml.ledger.participant.state.kvutils.{Conversions, Err}
import com.daml.ledger.participant.state.kvutils.Conversions.{
buildDuration,
commandDedupKey,
parseDuration,
parseTimestamp,
}
import com.daml.ledger.participant.state.kvutils.committer.Committer.getCurrentConfiguration
import com.daml.ledger.participant.state.kvutils.committer.{CommitContext, StepContinue, StepResult}
import com.daml.ledger.participant.state.kvutils.store.{DamlCommandDedupValue, DamlStateValue}
import com.daml.ledger.participant.state.kvutils.store.events.{
DamlTransactionRejectionEntry,
Duplicate,
}
import com.daml.logging.LoggingContext

private[transaction] object CommandDeduplication {

def overwriteDeduplicationPeriodWithMaxDurationStep(
defaultConfig: Configuration
): Step = new Step {
override def apply(context: CommitContext, input: DamlTransactionEntrySummary)(implicit
loggingContext: LoggingContext
): StepResult[DamlTransactionEntrySummary] = {
val (_, currentConfig) = getCurrentConfiguration(defaultConfig, context)
val submission = input.submission.toBuilder
submission.getSubmitterInfoBuilder.setDeduplicationDuration(
buildDuration(currentConfig.maxDeduplicationTime)
)
StepContinue(input.copyPreservingDecodedTransaction(submission.build()))
}
}

/** Reject duplicate commands
*/
def deduplicateCommandStep(rejections: Rejections): Step = new Step {
def apply(
commitContext: CommitContext,
transactionEntry: DamlTransactionEntrySummary,
)(implicit loggingContext: LoggingContext): StepResult[DamlTransactionEntrySummary] = {
commitContext.recordTime
.map { recordTime =>
val dedupKey = commandDedupKey(transactionEntry.submitterInfo)
val dedupEntry = commitContext.get(dedupKey)
if (dedupEntry.forall(isAfterDeduplicationTime(recordTime.toInstant, _))) {
StepContinue(transactionEntry)
} else {
rejections.reject(
DamlTransactionRejectionEntry.newBuilder
.setSubmitterInfo(transactionEntry.submitterInfo)
// No duplicate rejection is a definite answer as the deduplication entry will eventually expire.
.setDefiniteAnswer(false)
.setDuplicateCommand(Duplicate.newBuilder.setDetails("")),
"the command is a duplicate",
commitContext.recordTime,
)
}
}
.getOrElse(StepContinue(transactionEntry))
}
}

def setDeduplicationEntryStep(defaultConfig: Configuration): Step =
new Step {
def apply(commitContext: CommitContext, transactionEntry: DamlTransactionEntrySummary)(
implicit loggingContext: LoggingContext
): StepResult[DamlTransactionEntrySummary] = {
val (_, config) = getCurrentConfiguration(defaultConfig, commitContext)
// Deduplication duration must be explicitly overwritten in a previous step
// (see [[TransactionCommitter.overwriteDeduplicationPeriodWithMaxDuration]]) and set to ``config.maxDeduplicationTime``.
if (!transactionEntry.submitterInfo.hasDeduplicationDuration) {
throw Err.InvalidSubmission("Deduplication duration is not set.")
}
val deduplicateUntil = Conversions.buildTimestamp(
transactionEntry.submissionTime
.add(parseDuration(transactionEntry.submitterInfo.getDeduplicationDuration))
.add(config.timeModel.minSkew)
)
val commandDedupBuilder = DamlCommandDedupValue.newBuilder
.setDeduplicatedUntil(deduplicateUntil)
.setSubmissionTime(Conversions.buildTimestamp(transactionEntry.submissionTime))
// Set a deduplication entry.
commitContext.set(
commandDedupKey(transactionEntry.submitterInfo),
DamlStateValue.newBuilder
.setCommandDedup(
commandDedupBuilder.build
)
.build,
)
StepContinue(transactionEntry)
}
}
// Checks that the submission time of the command is after the
// deduplicationTime represented by stateValue
private def isAfterDeduplicationTime(
submissionTime: Instant,
stateValue: DamlStateValue,
): Boolean = {
val cmdDedup = stateValue.getCommandDedup
if (stateValue.hasCommandDedup && cmdDedup.hasDeduplicatedUntil) {
val dedupTime = parseTimestamp(cmdDedup.getDeduplicatedUntil).toInstant
dedupTime.isBefore(submissionTime)
} else {
false
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@

package com.daml.ledger.participant.state.kvutils.committer.transaction

import java.time.Instant

import com.daml.ledger.configuration.Configuration
import com.daml.ledger.participant.state.kvutils.Conversions._
import com.daml.ledger.participant.state.kvutils.committer.Committer._
Expand All @@ -17,12 +15,8 @@ import com.daml.ledger.participant.state.kvutils.committer.transaction.validatio
ModelConformanceValidator,
TransactionConsistencyValidator,
}
import com.daml.ledger.participant.state.kvutils.store.events.{
DamlTransactionRejectionEntry,
Duplicate,
}
import com.daml.ledger.participant.state.kvutils.store.events.DamlTransactionRejectionEntry
import com.daml.ledger.participant.state.kvutils.store.{
DamlCommandDedupValue,
DamlContractKeyState,
DamlContractState,
DamlLogEntry,
Expand Down Expand Up @@ -75,74 +69,19 @@ private[kvutils] class TransactionCommitter(
override protected val steps: Steps[DamlTransactionEntrySummary] = Iterable(
"authorize_submitter" -> authorizeSubmitters,
"check_informee_parties_allocation" -> checkInformeePartiesAllocation,
"overwrite_deduplication_period" -> overwriteDeduplicationPeriodWithMaxDuration,
"deduplicate" -> deduplicateCommand,
"overwrite_deduplication_period" -> CommandDeduplication
.overwriteDeduplicationPeriodWithMaxDurationStep(defaultConfig),
"deduplicate" -> CommandDeduplication.deduplicateCommandStep(rejections),
"set_time_bounds" -> TimeBoundBindingStep.setTimeBoundsInContextStep(defaultConfig),
"validate_ledger_time" -> ledgerTimeValidator.createValidationStep(rejections),
"validate_model_conformance" -> modelConformanceValidator.createValidationStep(rejections),
"validate_consistency" -> TransactionConsistencyValidator.createValidationStep(rejections),
"set_deduplication_entry" -> CommandDeduplication.setDeduplicationEntryStep(defaultConfig),
"blind" -> blind,
"trim_unnecessary_nodes" -> trimUnnecessaryNodes,
"build_final_log_entry" -> buildFinalLogEntry,
)

private[transaction] def overwriteDeduplicationPeriodWithMaxDuration: Step = new Step {
override def apply(context: CommitContext, input: DamlTransactionEntrySummary)(implicit
loggingContext: LoggingContext
): StepResult[DamlTransactionEntrySummary] = {
val (_, currentConfig) = getCurrentConfiguration(defaultConfig, context)
val submission = input.submission.toBuilder
submission.getSubmitterInfoBuilder.setDeduplicationDuration(
buildDuration(currentConfig.maxDeduplicationTime)
)
StepContinue(input.copyPreservingDecodedTransaction(submission.build()))
}
}

/** Reject duplicate commands
*/
private[transaction] def deduplicateCommand: Step = new Step {
def apply(
commitContext: CommitContext,
transactionEntry: DamlTransactionEntrySummary,
)(implicit loggingContext: LoggingContext): StepResult[DamlTransactionEntrySummary] = {
commitContext.recordTime
.map { recordTime =>
val dedupKey = commandDedupKey(transactionEntry.submitterInfo)
val dedupEntry = commitContext.get(dedupKey)
if (dedupEntry.forall(isAfterDeduplicationTime(recordTime.toInstant, _))) {
StepContinue(transactionEntry)
} else {
rejections.reject(
DamlTransactionRejectionEntry.newBuilder
.setSubmitterInfo(transactionEntry.submitterInfo)
// No duplicate rejection is a definite answer as the deduplication entry will eventually expire.
.setDefiniteAnswer(false)
.setDuplicateCommand(Duplicate.newBuilder.setDetails("")),
"the command is a duplicate",
commitContext.recordTime,
)
}
}
.getOrElse(StepContinue(transactionEntry))
}
}

// Checks that the submission time of the command is after the
// deduplicationTime represented by stateValue
private def isAfterDeduplicationTime(
submissionTime: Instant,
stateValue: DamlStateValue,
): Boolean = {
val cmdDedup = stateValue.getCommandDedup
if (stateValue.hasCommandDedup && cmdDedup.hasDeduplicatedUntil) {
val dedupTime = parseTimestamp(cmdDedup.getDeduplicatedUntil).toInstant
dedupTime.isBefore(submissionTime)
} else {
false
}
}

/** Authorize the submission by looking up the party allocation and verifying
* that all of the submitting parties are indeed hosted by the submitting participant.
*/
Expand Down Expand Up @@ -197,8 +136,6 @@ private[kvutils] class TransactionCommitter(
commitContext: CommitContext,
transactionEntry: DamlTransactionEntrySummary,
)(implicit loggingContext: LoggingContext): StepResult[DamlTransactionEntrySummary] = {
setDedupEntry(commitContext, transactionEntry)

val blindingInfo = Blinding.blind(transactionEntry.transaction)

val divulgedContracts =
Expand Down Expand Up @@ -319,35 +256,6 @@ private[kvutils] class TransactionCommitter(
}
}

private[transaction] def setDedupEntry(
commitContext: CommitContext,
transactionEntry: DamlTransactionEntrySummary,
)(implicit loggingContext: LoggingContext): Unit = {
val (_, config) = getCurrentConfiguration(defaultConfig, commitContext)
// Deduplication duration must be explicitly overwritten in a previous step
// (see [[TransactionCommitter.overwriteDeduplicationPeriodWithMaxDuration]]) and set to ``config.maxDeduplicationTime``.
if (!transactionEntry.submitterInfo.hasDeduplicationDuration) {
throw Err.InvalidSubmission("Deduplication duration is not set.")
}
val deduplicateUntil = Conversions.buildTimestamp(
transactionEntry.submissionTime
.add(parseDuration(transactionEntry.submitterInfo.getDeduplicationDuration))
.add(config.timeModel.minSkew)
)
val commandDedupBuilder = DamlCommandDedupValue.newBuilder
.setDeduplicatedUntil(deduplicateUntil)
.setSubmissionTime(Conversions.buildTimestamp(transactionEntry.submissionTime))
// Set a deduplication entry.
commitContext.set(
commandDedupKey(transactionEntry.submitterInfo),
DamlStateValue.newBuilder
.setCommandDedup(
commandDedupBuilder.build
)
.build,
)
}

private def updateContractStateAndFetchDivulgedContracts(
transactionEntry: DamlTransactionEntrySummary,
blindingInfo: BlindingInfo,
Expand Down
Loading

0 comments on commit 87c2d35

Please sign in to comment.