Skip to content

Commit

Permalink
kvutils - Deprecate deduplicate_until [kvl-1174] (#11765)
Browse files Browse the repository at this point in the history
[kvutils] - deduplicate_until is no longer used by backwards looking command deduplication, therefore we deprecate the field in the proto files (keep it there for backwards compatibility) and remove references to it from the committer context

changelog_begin
changelog_end
  • Loading branch information
nicu-da authored Nov 18, 2021
1 parent f5a6302 commit 4ebdaf5
Show file tree
Hide file tree
Showing 10 changed files with 28 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ message DamlLogEntryId {
message DamlOutOfTimeBoundsEntry {
// We don't expect entry.recordTime to be present.
DamlLogEntry entry = 1;
google.protobuf.Timestamp duplicate_until = 2;
// Read-only for backwards compatibility, will not be set for new entries
google.protobuf.Timestamp duplicate_until = 2 [deprecated = true];
google.protobuf.Timestamp too_early_until = 3;
google.protobuf.Timestamp too_late_from = 4;
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ message DamlCommandDedupValue {
oneof time {
// The time until when future commands with the same
// deduplication key will be rejected due to a duplicate submission.
google.protobuf.Timestamp deduplicated_until = 2;
// Read-only for backwards compatibility
google.protobuf.Timestamp deduplicated_until = 2 [deprecated = true];
google.protobuf.Timestamp record_time = 3;
PreExecutionDeduplicationBounds record_time_bounds = 4;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ import com.daml.lf.data.Ref.LedgerString
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.transaction.CommittedTransaction
import com.daml.logging.{ContextualizedLogger, LoggingContext}

import com.google.common.io.BaseEncoding
import com.google.protobuf.ByteString

import scala.annotation.nowarn
import scala.jdk.CollectionConverters._

/** Utilities for producing [[Update]] events from [[DamlLogEntry]]'s committed to a
Expand Down Expand Up @@ -333,12 +333,16 @@ object KeyValueConsumption {
List.empty
}

@nowarn("msg=deprecated")
private[kvutils] case class TimeBounds(
tooEarlyUntil: Option[Timestamp] = None,
tooLateFrom: Option[Timestamp] = None,
@deprecated("Read-only for backwards compatibility.")
deduplicateUntil: Option[Timestamp] = None,
)

// Ignore deprecation warnings caused by deduplicate until, which is read-only for backwards compatibility
@nowarn("msg=deprecated")
private[kvutils] def outOfTimeBoundsEntryToUpdate(
recordTime: Timestamp,
outOfTimeBoundsEntry: DamlOutOfTimeBoundsEntry,
Expand Down Expand Up @@ -426,6 +430,7 @@ object KeyValueConsumption {
Some(correlationId(rejectionEntry.getSubmitterInfo)),
)

@nowarn("msg=deprecated")
private def parseTimeBounds(outOfTimeBoundsEntry: DamlOutOfTimeBoundsEntry): TimeBounds = {
val duplicateUntilMaybe = parseOptionalTimestamp(
outOfTimeBoundsEntry.hasDuplicateUntil,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ private[kvutils] case class CommitContext(

var minimumRecordTime: Option[Timestamp] = None
var maximumRecordTime: Option[Timestamp] = None
var deduplicateUntil: Option[Timestamp] = None

// Rejection log entry used for generating an out-of-time-bounds log entry in case of
// pre-execution.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,6 @@ private[committer] trait Committer[PartialResult] extends SubmissionExecutor {
commitContext.maximumRecordTime.foreach { timestamp =>
builder.setTooLateFrom(buildTimestamp(timestamp))
}
commitContext.deduplicateUntil.foreach { timestamp =>
builder.setDuplicateUntil(buildTimestamp(timestamp))
}
DamlLogEntry.newBuilder
.setOutOfTimeBoundsEntry(builder)
.build
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@ import com.daml.ledger.participant.state.kvutils.store.{
import com.daml.ledger.participant.state.kvutils.{Conversions, Err}
import com.daml.logging.LoggingContext

import scala.annotation.nowarn

private[transaction] object CommandDeduplication {

/** Reject duplicate commands
*/
@nowarn("msg=deprecated")
def deduplicateCommandStep(rejections: Rejections): Step =
new Step {
def apply(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package com.daml.ledger.participant.state.kvutils.committer.transaction

import com.daml.ledger.configuration.{Configuration, LedgerTimeModel}
import com.daml.ledger.participant.state.kvutils.Conversions.{commandDedupKey, parseTimestamp}
import com.daml.ledger.participant.state.kvutils.committer.Committer.getCurrentConfiguration
import com.daml.ledger.participant.state.kvutils.committer.{CommitContext, StepContinue, StepResult}
import com.daml.lf.data.Time.Timestamp
Expand All @@ -20,20 +19,16 @@ object TimeBoundBindingStep {
val timeModel = config.timeModel

if (commitContext.preExecute) {
val maybeDeduplicateUntil =
getLedgerDeduplicateUntil(transactionEntry, commitContext)
val minimumRecordTime = transactionMinRecordTime(
transactionEntry.submissionTime,
transactionEntry.ledgerEffectiveTime,
maybeDeduplicateUntil,
timeModel,
)
val maximumRecordTime = transactionMaxRecordTime(
transactionEntry.submissionTime,
transactionEntry.ledgerEffectiveTime,
timeModel,
)
commitContext.deduplicateUntil = maybeDeduplicateUntil
commitContext.minimumRecordTime = Some(minimumRecordTime)
commitContext.maximumRecordTime = Some(maximumRecordTime)
}
Expand All @@ -45,14 +40,9 @@ object TimeBoundBindingStep {
private def transactionMinRecordTime(
submissionTime: Timestamp,
ledgerTime: Timestamp,
maybeDeduplicateUntil: Option[Timestamp],
timeModel: LedgerTimeModel,
): Timestamp =
List(
maybeDeduplicateUntil
.map(
_.add(Timestamp.Resolution)
), // DeduplicateUntil defines a rejection window, endpoints inclusive
Some(timeModel.minRecordTime(ledgerTime)),
Some(timeModel.minRecordTime(submissionTime)),
).flatten.max
Expand All @@ -64,14 +54,4 @@ object TimeBoundBindingStep {
): Timestamp =
List(timeModel.maxRecordTime(ledgerTime), timeModel.maxRecordTime(submissionTime)).min

private def getLedgerDeduplicateUntil(
transactionEntry: DamlTransactionEntrySummary,
commitContext: CommitContext,
): Option[Timestamp] =
for {
dedupEntry <- commitContext.get(commandDedupKey(transactionEntry.submitterInfo))
dedupTimestamp <- PartialFunction.condOpt(dedupEntry.getCommandDedup.hasDeduplicatedUntil) {
case true => dedupEntry.getCommandDedup.getDeduplicatedUntil
}
} yield parseTimestamp(dedupTimestamp)
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ import org.scalatest.prop.TableDrivenPropertyChecks._
import org.scalatest.prop.Tables.Table
import org.scalatest.prop.{TableFor1, TableFor4, TableFor5}
import org.scalatest.wordspec.AnyWordSpec

import java.time.Instant

import scala.annotation.nowarn

class KeyValueConsumptionSpec extends AnyWordSpec with Matchers {
private val aLogEntryIdString = "test"
private val aLogEntryId =
Expand Down Expand Up @@ -491,6 +492,7 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers {
)
}

@nowarn("msg=deprecated")
private def buildOutOfTimeBoundsEntry(
timeBounds: TimeBounds,
logEntryType: DamlLogEntry.PayloadCase,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import org.scalatest.matchers.should.Matchers
import org.scalatest.prop.TableDrivenPropertyChecks
import org.scalatest.wordspec.AnyWordSpec

import scala.annotation.nowarn

@nowarn("msg=deprecated")
class CommitterSpec
extends AnyWordSpec
with TableDrivenPropertyChecks
Expand All @@ -47,7 +50,6 @@ class CommitterSpec
val expectedMaxRecordTime = Timestamp.assertFromLong(200 * 1000 * 1000)
when(mockContext.minimumRecordTime).thenReturn(Some(expectedMinRecordTime))
when(mockContext.maximumRecordTime).thenReturn(Some(expectedMaxRecordTime))
when(mockContext.deduplicateUntil).thenReturn(None)
when(mockContext.outOfTimeBoundsLogEntry).thenReturn(Some(aRejectionLogEntry))
val expectedReadSet = Set(
DamlStateKey.newBuilder.setContractId("1").build,
Expand All @@ -71,7 +73,6 @@ class CommitterSpec
val mockContext = mock[CommitContext]
when(mockContext.minimumRecordTime).thenReturn(None)
when(mockContext.maximumRecordTime).thenReturn(None)
when(mockContext.deduplicateUntil).thenReturn(None)
when(mockContext.outOfTimeBoundsLogEntry).thenReturn(Some(aRejectionLogEntry))
when(mockContext.getOutputs).thenReturn(Map.empty)
when(mockContext.getAccessedInputKeys).thenReturn(Set.empty[DamlStateKey])
Expand All @@ -90,20 +91,16 @@ class CommitterSpec
when(mockContext.getAccessedInputKeys).thenReturn(Set.empty[DamlStateKey])
val expectedMinRecordTime = Timestamp.assertFromLong(100 * 1000 * 1000)
val expectedMaxRecordTime = Timestamp.assertFromLong(200 * 1000 * 1000)
val expectedDuplicateUntil = Timestamp.assertFromLong(99 * 1000 * 1000)
when(mockContext.minimumRecordTime).thenReturn(Some(expectedMinRecordTime))
when(mockContext.maximumRecordTime).thenReturn(Some(expectedMaxRecordTime))
when(mockContext.deduplicateUntil).thenReturn(Some(expectedDuplicateUntil))
val instance = createCommitter()
val actual = instance.preExecute(aDamlSubmission, mockContext)

actual.outOfTimeBoundsLogEntry.hasOutOfTimeBoundsEntry shouldBe true
val actualOutOfTimeBoundsLogEntry = actual.outOfTimeBoundsLogEntry.getOutOfTimeBoundsEntry
actualOutOfTimeBoundsLogEntry.getTooEarlyUntil shouldBe buildTimestamp(expectedMinRecordTime)
actualOutOfTimeBoundsLogEntry.getTooLateFrom shouldBe buildTimestamp(expectedMaxRecordTime)
actualOutOfTimeBoundsLogEntry.getDuplicateUntil shouldBe buildTimestamp(
expectedDuplicateUntil
)
actualOutOfTimeBoundsLogEntry.hasDuplicateUntil shouldBe false
actualOutOfTimeBoundsLogEntry.hasEntry shouldBe true
actualOutOfTimeBoundsLogEntry.getEntry shouldBe aRejectionLogEntry
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@ import com.daml.ledger.participant.state.kvutils.store.events.{
DamlConfigurationEntry,
DamlTransactionEntry,
}
import com.daml.ledger.participant.state.kvutils.store.{
DamlCommandDedupValue,
DamlStateKey,
DamlStateValue,
}
import com.daml.ledger.participant.state.kvutils.store.{DamlStateKey, DamlStateValue}
import com.daml.ledger.participant.state.kvutils.{Conversions, TestHelpers}
import com.daml.lf.data.Time
import com.daml.logging.LoggingContext
Expand All @@ -46,26 +42,15 @@ class TimeBoundsBindingStepSpec extends AnyWordSpec with Matchers {
private val aDamlTransactionEntrySummaryWithSubmissionAndLedgerEffectiveTimes =
DamlTransactionEntrySummary(aDamlTransactionEntryWithSubmissionAndLedgerEffectiveTimes)

private val aDedupKey: DamlStateKey = Conversions
.commandDedupKey(aDamlTransactionEntry.getSubmitterInfo)
private val aDeduplicateUntil = createProtobufTimestamp(seconds = 3)

private val aDedupValue = DamlStateValue.newBuilder
.setCommandDedup(
DamlCommandDedupValue.newBuilder.setDeduplicatedUntil(toJavaProto(aDeduplicateUntil))
)
.build()

private val emptyConfigurationStateValue: DamlStateValue =
defaultConfigurationStateValueBuilder().build

private val inputWithTimeModelAndCommandDeduplication: Map[DamlStateKey, Some[DamlStateValue]] =
private val inputWithConfiguration: Map[DamlStateKey, Some[DamlStateValue]] =
Map(
Conversions.configurationStateKey -> Some(emptyConfigurationStateValue),
aDedupKey -> Some(aDedupValue),
Conversions.configurationStateKey -> Some(emptyConfigurationStateValue)
)
private val inputWithTimeModelAndEmptyCommandDeduplication =
Map(Conversions.configurationStateKey -> Some(emptyConfigurationStateValue), aDedupKey -> None)
Map(Conversions.configurationStateKey -> Some(emptyConfigurationStateValue))

"time bounds binding step" should {
"continue" in {
Expand All @@ -80,7 +65,7 @@ class TimeBoundsBindingStepSpec extends AnyWordSpec with Matchers {
}
}

"compute and correctly set the min/max ledger time without deduplicateUntil" in {
"compute and correctly set the min/max ledger time" in {
val context = contextWithTimeModelAndEmptyCommandDeduplication()

step.apply(
Expand All @@ -96,36 +81,21 @@ class TimeBoundsBindingStepSpec extends AnyWordSpec with Matchers {
protoTimestampToLf(aSubmissionTime)
.add(theDefaultConfig.timeModel.maxSkew)
)
context.deduplicateUntil shouldBe empty
}

"set deduplicate until when available" in {
val context = contextWithTimeModelAndCommandDeduplication()
step.apply(
context,
aDamlTransactionEntrySummaryWithSubmissionAndLedgerEffectiveTimes,
)
context.deduplicateUntil shouldEqual Some(
protoTimestampToLf(aDeduplicateUntil)
)
}

"mark config and dedup key as accessed in context" in {
"mark config as accessed in context" in {
val commitContext =
createCommitContext(recordTime = None, inputWithTimeModelAndCommandDeduplication)
createCommitContext(recordTime = None, inputWithConfiguration)

step.apply(commitContext, aTransactionEntrySummary)

commitContext.getAccessedInputKeys should contain allOf (configurationStateKey, aDedupKey)
commitContext.getAccessedInputKeys should contain(configurationStateKey)
}
}

private def contextWithTimeModelAndEmptyCommandDeduplication() =
createCommitContext(recordTime = None, inputs = inputWithTimeModelAndEmptyCommandDeduplication)

private def contextWithTimeModelAndCommandDeduplication() =
createCommitContext(recordTime = None, inputs = inputWithTimeModelAndCommandDeduplication)

private def createProtobufTimestamp(seconds: Long): timestamp.Timestamp = {
timestamp.Timestamp(seconds)
}
Expand Down

0 comments on commit 4ebdaf5

Please sign in to comment.