From cdde8df9fd32bd575689b83b4ada5ddf8a1d7891 Mon Sep 17 00:00:00 2001 From: Hubert Slojewski Date: Fri, 21 Jan 2022 13:44:24 +0100 Subject: [PATCH] ledger-api-test-tool: Add assertions and a test case for successful completions when converting deduplication durations to offsets [KVL-1220] (#12462) CHANGELOG_BEGIN CHANGELOG_END --- ledger/ledger-api-common/BUILD.bazel | 2 + .../participant/util}/HexOffset.scala | 2 +- .../participant/util}/HexOffsetSpec.scala | 2 +- .../BUILD.bazel | 3 +- .../infrastructure/FutureAssertions.scala | 48 +----- .../CommandDeduplicationAssertions.scala | 95 ++++++++--- .../participant/ParticipantTestContext.scala | 16 ++ .../infrastructure/time/DelayMechanism.scala | 31 ++++ .../suites/CommandDeduplicationIT.scala | 152 ++++++++++++------ ...ionBasedDeduplicationPeriodConverter.scala | 1 + 10 files changed, 237 insertions(+), 115 deletions(-) rename ledger/{participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/deduplication => ledger-api-common/src/main/scala/com/digitalasset/platform/participant/util}/HexOffset.scala (94%) rename ledger/{participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/deduplication => ledger-api-common/src/test/suite/scala/com/digitalasset/platform/participant/util}/HexOffsetSpec.scala (96%) create mode 100644 ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/time/DelayMechanism.scala diff --git a/ledger/ledger-api-common/BUILD.bazel b/ledger/ledger-api-common/BUILD.bazel index f5b799299ae2..b87b18d0296d 100644 --- a/ledger/ledger-api-common/BUILD.bazel +++ b/ledger/ledger-api-common/BUILD.bazel @@ -103,11 +103,13 @@ da_scala_test_suite( "@maven//:com_typesafe_akka_akka_testkit", "@maven//:io_spray_spray_json", "@maven//:org_mockito_mockito_scala", + "@maven//:org_scalacheck_scalacheck", "@maven//:org_scalactic_scalactic", "@maven//:org_scalatest_scalatest_core", "@maven//:org_scalatest_scalatest_matchers_core", "@maven//:org_scalatest_scalatest_shouldmatchers", "@maven//:org_scalatest_scalatest_wordspec", + "@maven//:org_scalatestplus_scalacheck_1_15", "@maven//:org_scalaz_scalaz_core", "@maven//:org_scala_lang_modules_scala_collection_compat", "@maven//:org_scala_lang_modules_scala_parallel_collections", diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/deduplication/HexOffset.scala b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/participant/util/HexOffset.scala similarity index 94% rename from ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/deduplication/HexOffset.scala rename to ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/participant/util/HexOffset.scala index 777cc86d7ac3..0118f37ab9b2 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/deduplication/HexOffset.scala +++ b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/participant/util/HexOffset.scala @@ -1,7 +1,7 @@ // Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. // SPDX-License-Identifier: Apache-2.0 -package com.daml.ledger.participant.state.kvutils.deduplication +package com.daml.platform.participant.util import java.math.BigInteger diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/deduplication/HexOffsetSpec.scala b/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/participant/util/HexOffsetSpec.scala similarity index 96% rename from ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/deduplication/HexOffsetSpec.scala rename to ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/participant/util/HexOffsetSpec.scala index 98a5c3bc13c2..5e1d9c11fcd4 100644 --- a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/deduplication/HexOffsetSpec.scala +++ b/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/participant/util/HexOffsetSpec.scala @@ -1,7 +1,7 @@ // Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. // SPDX-License-Identifier: Apache-2.0 -package com.daml.ledger.participant.state.kvutils.deduplication +package com.daml.platform.participant.util import java.math.BigInteger diff --git a/ledger/ledger-api-test-tool-on-canton/BUILD.bazel b/ledger/ledger-api-test-tool-on-canton/BUILD.bazel index f0ff758bb077..3c8fc5944398 100644 --- a/ledger/ledger-api-test-tool-on-canton/BUILD.bazel +++ b/ledger/ledger-api-test-tool-on-canton/BUILD.bazel @@ -84,7 +84,8 @@ conformance_test( "DeeplyNestedValueIT", # FIXME: Too deeply nested values flake with a time out (half of the time) "CommandDeduplicationIT:SimpleDeduplicationBasic", # sync vs async error (part of canton #6301) "CommandDeduplicationIT:DeduplicateSubmitterBasic", - "CommandDeduplicationIT:DeduplicateUsingOffsets", # temporary disabled until we adapt it for canton + "CommandDeduplicationIT:DeduplicateUsingDurations", # disabled while Canton does not support the feature descriptors + "CommandDeduplicationIT:DeduplicateUsingOffsets", # disabled while Canton does not support the feature descriptors "RaceConditionIT:RWArchiveVsFailedLookupByKey", # finding a lookup failure after contract creation # dynamic config management not supported by Canton "ConfigManagementServiceIT", diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/FutureAssertions.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/FutureAssertions.scala index 21aba3c3c581..e54853f53649 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/FutureAssertions.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/FutureAssertions.scala @@ -3,11 +3,9 @@ package com.daml.ledger.api.testtool.infrastructure -import java.time.Instant - import com.daml.ledger.api.testtool.infrastructure.FutureAssertions.ExpectedFailureException +import com.daml.ledger.api.testtool.infrastructure.time.DelayMechanism import com.daml.logging.{ContextualizedLogger, LoggingContext} -import com.daml.timer.Delayed import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} @@ -50,9 +48,10 @@ object FutureAssertions { /** Runs the test case after the specified delay */ def assertAfter[V]( - delay: FiniteDuration - )(test: => Future[V]): Future[V] = - Delayed.Future.by(delay)(test) + delay: FiniteDuration, + delayMechanism: DelayMechanism, + )(test: => Future[V])(implicit executionContext: ExecutionContext): Future[V] = + delayMechanism.delayBy(delay).flatMap(_ => test) /** Run the test every [[retryDelay]] up to [[maxRetryDuration]]. * The test case will run up to [[ceil(maxRetryDuration / retryDelay)]] times. @@ -62,6 +61,7 @@ object FutureAssertions { def succeedsEventually[V]( retryDelay: FiniteDuration = 100.millis, maxRetryDuration: FiniteDuration, + delayMechanism: DelayMechanism, description: String, )( test: => Future[V] @@ -75,7 +75,7 @@ object FutureAssertions { ) } else - assertAfter(retryDelay)(test).recoverWith { case NonFatal(ex) => + assertAfter(retryDelay, delayMechanism)(test).recoverWith { case NonFatal(ex) => logger.debug( s"Failed assertion: $description. Running again with new max duration $nextRetryRemainingDuration", ex, @@ -87,40 +87,6 @@ object FutureAssertions { internalSucceedsEventually(maxRetryDuration) } - /** Run the test every [[rerunDelay]] for a duration of [[succeedDuration]] or until the current time exceeds [[succeedDeadline]]. - * The assertion will succeed if all of the test case runs are successful and [[succeedDuration]] is exceeded or [[succeedDeadline]] is exceeded. - * The assertion will fail if any test case runs fail. - */ - def succeedsUntil[V]( - rerunDelay: FiniteDuration = 100.millis, - succeedDuration: FiniteDuration, - succeedDeadline: Option[Instant] = None, - )( - test: => Future[V] - )(implicit ec: ExecutionContext, loggingContext: LoggingContext): Future[V] = { - def internalSucceedsUntil(remainingDuration: FiniteDuration): Future[V] = { - val nextRerunRemainingDuration = remainingDuration - rerunDelay - if ( - succeedDeadline.exists( - _.isBefore(Instant.now().plusSeconds(rerunDelay.toSeconds)) - ) || nextRerunRemainingDuration < Duration.Zero - ) test - else - assertAfter(rerunDelay)(test) - .flatMap { _ => - internalSucceedsUntil(nextRerunRemainingDuration) - } - } - - internalSucceedsUntil(succeedDuration) - .andThen { case Failure(exception) => - logger.error( - s"Repeated assertion failed with a succeed duration of $succeedDuration.", - exception, - ) - } - } - def forAllParallel[T]( data: Seq[T] )( diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/assertions/CommandDeduplicationAssertions.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/assertions/CommandDeduplicationAssertions.scala index eb5889dd6b15..465518782d4d 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/assertions/CommandDeduplicationAssertions.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/assertions/CommandDeduplicationAssertions.scala @@ -15,7 +15,9 @@ import com.daml.ledger.api.v1.experimental_features.CommandDeduplicationPeriodSu DurationSupport, OffsetSupport, } +import com.daml.ledger.api.v1.ledger_offset.LedgerOffset import com.daml.ledger.client.binding.Primitive.Party +import com.daml.lf.data.Ref import com.daml.platform.testing.WithTimeout import com.google.protobuf.duration.{Duration => DurationProto} import io.grpc.Status.Code @@ -51,37 +53,84 @@ object CommandDeduplicationAssertions { completionResponse.completion.deduplicationPeriod.deduplicationOffset, "No deduplication offset has been reported", ) + val reportedHexOffset = Ref.HexString.assertFromString(reportedOffset) if (completionResponse.completion.getStatus.code == Code.ALREADY_EXISTS.value) { - // Search for the first accepting completion with the same command ID since the beginning of the test case. - // Multiple consecutive accepting completions are not supported. - val completionStreamRequest = ledger.completionStreamRequest()(submittingParty) - WithTimeout(5.seconds)( - ledger - .findCompletion(completionStreamRequest)(c => - c.commandId == c.commandId && c.getStatus.code == Code.OK.value - ) - ).map { optAcceptedCompletionResponse => - val acceptedCompletionResponse = assertDefined( - optAcceptedCompletionResponse, - s"No accepted completion with the command ID '${completionResponse.completion.commandId}' has been found", - ) - assert( - acceptedCompletionResponse.offset.getAbsolute > reportedOffset, - s"No accepted completion with the command ID '${completionResponse.completion.commandId}' after the reported offset $reportedOffset has been found", - ) - assert( - acceptedCompletionResponse.offset.getAbsolute < completionResponse.offset.getAbsolute, - s"No accepted completion with the command ID '${completionResponse.completion.commandId}' before the completion's offset ${completionResponse.offset} has been found", - ) - } + assertReportedOffsetForDuplicateSubmission( + reportedHexOffset, + completionResponse, + submittingParty, + ledger, + ) } else { - fail("This case is not yet supported by this assertion") + assertReportedOffsetForAcceptedSubmission( + reportedHexOffset, + requestedDuration, + completionResponse, + submittingParty, + ledger, + ) } case DurationSupport.Unrecognized(_) => fail("Unrecognized deduplication duration support") } } + private def assertReportedOffsetForDuplicateSubmission( + reportedOffset: Ref.HexString, + completionResponse: CompletionResponse, + submittingParty: Party, + ledger: ParticipantTestContext, + )(implicit executionContext: ExecutionContext) = + WithTimeout(5.seconds)( + ledger.findCompletionAtOffset( + reportedOffset, + c => c.commandId == completionResponse.completion.commandId && c.getStatus.code == Code.OK.value, + )(submittingParty) + ).map { optAcceptedCompletionResponse => + val acceptedCompletionResponse = assertDefined( + optAcceptedCompletionResponse, + s"No accepted completion with the command ID '${completionResponse.completion.commandId}' since the reported offset $reportedOffset has been found", + ) + assert( + acceptedCompletionResponse.offset.getAbsolute < completionResponse.offset.getAbsolute, + s"An accepted completion with the command ID '${completionResponse.completion.commandId}' at the offset ${acceptedCompletionResponse.offset} that is not before the completion's offset ${completionResponse.offset} has been found", + ) + } + + private def assertReportedOffsetForAcceptedSubmission( + reportedOffset: Ref.HexString, + requestedDuration: Duration, + completionResponse: CompletionResponse, + submittingParty: Party, + ledger: ParticipantTestContext, + )(implicit executionContext: ExecutionContext) = + WithTimeout(5.seconds)( + ledger.findCompletionAtOffset( + reportedOffset, + _.commandId == completionResponse.completion.commandId, + )(submittingParty) + ).map { optReportedOffsetCompletionResponse => + val reportedOffsetCompletionResponse = assertDefined( + optReportedOffsetCompletionResponse, + s"No completion with the command ID '${completionResponse.completion.commandId}' since the reported offset $reportedOffset has been found", + ) + assert( + reportedOffsetCompletionResponse.offset == LedgerOffset.of( + LedgerOffset.Value.Absolute(reportedOffset) + ), + s"No completion with the reported offset $reportedOffset has been found, the ${reportedOffsetCompletionResponse.offset} offset has been found instead", + ) + val durationBetweenReportedDeduplicationOffsetAndCompletionRecordTimes = Duration + .between( + reportedOffsetCompletionResponse.recordTime, + completionResponse.recordTime, + ) + assert( + durationBetweenReportedDeduplicationOffsetAndCompletionRecordTimes >= requestedDuration, + s"The requested deduplication duration $requestedDuration was greater than the duration between the reported deduplication offset and completion record times ($durationBetweenReportedDeduplicationOffsetAndCompletionRecordTimes).", + ) + } + def assertDeduplicationOffset( requestedDeduplicationOffsetCompletionResponse: CompletionResponse, completionResponse: CompletionResponse, diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/participant/ParticipantTestContext.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/participant/ParticipantTestContext.scala index 20964a5df7de..97903d51a614 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/participant/ParticipantTestContext.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/participant/ParticipantTestContext.scala @@ -78,7 +78,9 @@ import com.daml.ledger.api.v1.transaction_service.{ import com.daml.ledger.api.v1.value.{Identifier, Value} import com.daml.ledger.client.binding.Primitive.Party import com.daml.ledger.client.binding.{Primitive, Template} +import com.daml.lf.data.Ref import com.daml.logging.{ContextualizedLogger, LoggingContext} +import com.daml.platform.participant.util.HexOffset import com.daml.platform.testing.StreamConsumer import com.google.protobuf.ByteString import io.grpc.health.v1.health.{HealthCheckRequest, HealthCheckResponse} @@ -685,6 +687,20 @@ private[testtool] final class ParticipantTestContext private[participant] ( def firstCompletions(parties: Party*): Future[Vector[Completion]] = firstCompletions(completionStreamRequest()(parties: _*)) + def findCompletionAtOffset( + offset: Ref.HexString, + p: Completion => Boolean, + )(parties: Party*): Future[Option[CompletionResponse]] = { + // We have to request an offset before the reported offset, as offsets are exclusive in the completion service. + val offsetPreviousToReportedOffset = HexOffset + .previous(offset) + .map(offset => LedgerOffset.of(LedgerOffset.Value.Absolute(offset))) + .getOrElse(referenceOffset) + val reportedOffsetCompletionStreamRequest = + completionStreamRequest(offsetPreviousToReportedOffset)(parties: _*) + findCompletion(reportedOffsetCompletionStreamRequest)(p) + } + def findCompletion( request: CompletionStreamRequest )(p: Completion => Boolean): Future[Option[CompletionResponse]] = diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/time/DelayMechanism.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/time/DelayMechanism.scala new file mode 100644 index 000000000000..8ab8d89b2a19 --- /dev/null +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/time/DelayMechanism.scala @@ -0,0 +1,31 @@ +// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.api.testtool.infrastructure.time + +import com.daml.ledger.api.testtool.infrastructure.participant.ParticipantTestContext +import com.daml.timer.Delayed + +import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration.{Duration, FiniteDuration} + +trait DelayMechanism { + val skews: FiniteDuration + def delayBy(duration: Duration): Future[Unit] +} + +class TimeDelayMechanism(val skews: FiniteDuration)(implicit ec: ExecutionContext) + extends DelayMechanism { + override def delayBy(duration: Duration): Future[Unit] = Delayed.by(duration)(()) +} + +class StaticTimeDelayMechanism(ledger: ParticipantTestContext, val skews: FiniteDuration)(implicit + ec: ExecutionContext +) extends DelayMechanism { + override def delayBy(duration: Duration): Future[Unit] = + ledger + .time() + .flatMap { currentTime => + ledger.setTime(currentTime, currentTime.plusMillis(duration.toMillis)) + } +} diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationIT.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationIT.scala index 8fe320d61a0f..aae18c346df8 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationIT.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationIT.scala @@ -23,7 +23,7 @@ import com.daml.ledger.api.testtool.infrastructure.participant.{ Features, ParticipantTestContext, } -import com.daml.ledger.api.testtool.suites.CommandDeduplicationIT.{ +import com.daml.ledger.api.testtool.infrastructure.time.{ DelayMechanism, StaticTimeDelayMechanism, TimeDelayMechanism, @@ -43,7 +43,6 @@ import com.daml.ledger.test.model.Test.{Dummy, DummyWithAnnotation, TextKey, Tex import com.daml.lf.data.Ref import com.daml.lf.data.Ref.{LedgerString, SubmissionId} import com.daml.logging.LoggingContext -import com.daml.timer.Delayed import io.grpc.Status.Code import org.slf4j.{Logger, LoggerFactory} @@ -82,7 +81,7 @@ final class CommandDeduplicationIT( updateSubmissionId(request, firstAcceptedSubmissionId), party, ) - optCompletionResponse <- submitRequestAndAssertDeduplication( + _ <- submitRequestAndAssertDeduplication( ledger, updateWithFreshSubmissionId(request), firstAcceptedSubmissionId, @@ -95,16 +94,6 @@ final class CommandDeduplicationIT( party, noOfActiveContracts = 1, ) - _ <- - if (!ledger.features.commandDeduplicationFeatures.deduplicationType.isSyncOnly) { - val completion = assertDefined(optCompletionResponse, "No completion has been produced") - assertDeduplicationDuration( - deduplicationDuration.asProtobuf, - completion, - party, - ledger, - ) - } else Future.unit } yield { assert( response.completion.commandId == request.commands.get.commandId, @@ -321,22 +310,16 @@ final class CommandDeduplicationIT( LedgerString.assertFromString(firstAcceptedCommand.completion.submissionId), firstAcceptedCommand.offset, ) - deduplicationDurationFromPeriod = duplicateResponse - .map(_.completion.deduplicationPeriod) - .map { - case CompletionDeduplicationPeriod.Empty => - throw new IllegalStateException("received empty completion") - case CompletionDeduplicationPeriod.DeduplicationOffset(_) => - deduplicationDuration - case CompletionDeduplicationPeriod.DeduplicationDuration(value) => - value.asScala - } - .getOrElse(deduplicationDuration + delay.skews) - .asInstanceOf[FiniteDuration] + deduplicationDurationFromPeriod = extractDurationFromDeduplicationPeriod( + deduplicationCompletionResponse = duplicateResponse, + defaultDuration = deduplicationDuration, + delayMechanism = delay, + ) eventuallyAccepted <- succeedsEventually( maxRetryDuration = deduplicationDurationFromPeriod + delay.skews + 10.seconds, description = s"Deduplication period expires and request is accepted for command ${submitRequest.getCommands}.", + delayMechanism = delay, ) { submitAndAssertAccepted(thirdCall) } @@ -468,6 +451,85 @@ final class CommandDeduplicationIT( } yield {} }) + testGivenAllParticipants( + shortIdentifier = s"DeduplicateUsingDurations", + description = "Deduplicate commands within the deduplication period defined by a duration", + participants = allocate(SingleParty), + runConcurrently = false, // updates the time model + enabled = !_.commandDeduplicationFeatures.deduplicationType.isSyncOnly, + disabledReason = + "Most of the assertions run on async responses. Also, ledgers with the sync-only deduplication support use the wall clock for deduplication.", + )(implicit ec => + configuredParticipants => { case Participants(Participant(ledger, party)) => + val request = ledger + .submitRequest(party, DummyWithAnnotation(party, "Duplicate command").create.command) + .update( + _.commands.deduplicationPeriod := + DeduplicationPeriod.DeduplicationDuration(deduplicationDuration.asProtobuf) + ) + val firstAcceptedSubmissionId = newSubmissionId() + runWithTimeModel(configuredParticipants) { delay => + for { + completionResponse <- submitRequestAndAssertCompletionAccepted( + ledger, + updateSubmissionId(request, firstAcceptedSubmissionId), + party, + ) + optDeduplicationCompletionResponse <- submitRequestAndAssertDeduplication( + ledger, + updateWithFreshSubmissionId(request), + firstAcceptedSubmissionId, + completionResponse.offset, + party, + ) + deduplicationDurationFromPeriod = extractDurationFromDeduplicationPeriod( + deduplicationCompletionResponse = optDeduplicationCompletionResponse, + defaultDuration = deduplicationDuration, + delayMechanism = delay, + ) + eventuallyAcceptedCompletionResponse <- succeedsEventually( + maxRetryDuration = deduplicationDurationFromPeriod + delay.skews + 10.seconds, + description = + s"The deduplication period expires and the request is accepted for the commands ${request.getCommands}.", + delayMechanism = delay, + ) { + submitRequestAndAssertCompletionAccepted( + ledger, + updateSubmissionId(request, firstAcceptedSubmissionId), + party, + ) + } + _ <- assertPartyHasActiveContracts( + ledger, + party, + noOfActiveContracts = 2, + ) + deduplicationCompletionResponse = assertDefined( + optDeduplicationCompletionResponse, + "No completion has been produced", + ) + _ <- assertDeduplicationDuration( + deduplicationDuration.asProtobuf, + deduplicationCompletionResponse, + party, + ledger, + ) + _ <- assertDeduplicationDuration( + deduplicationDuration.asProtobuf, + eventuallyAcceptedCompletionResponse, + party, + ledger, + ) + } yield { + assert( + completionResponse.completion.commandId == request.commands.get.commandId, + "The command ID of the first completion does not match the command ID of the submission", + ) + } + } + } + ) + testGivenAllParticipants( "DeduplicateUsingOffsets", "Deduplicate commands within the deduplication period defined by the offset", @@ -996,29 +1058,23 @@ final class CommandDeduplicationIT( } } } -} - -object CommandDeduplicationIT { - - trait DelayMechanism { - val skews: FiniteDuration - def delayBy(duration: Duration): Future[Unit] - - } - class TimeDelayMechanism(val skews: FiniteDuration)(implicit ec: ExecutionContext) - extends DelayMechanism { - override def delayBy(duration: Duration): Future[Unit] = Delayed.by(duration)(()) - } + private def extractDurationFromDeduplicationPeriod( + deduplicationCompletionResponse: Option[CompletionResponse], + defaultDuration: FiniteDuration, + delayMechanism: DelayMechanism, + ): FiniteDuration = + deduplicationCompletionResponse + .map(_.completion.deduplicationPeriod) + .map { + case CompletionDeduplicationPeriod.Empty => + throw new IllegalStateException("received empty completion") + case CompletionDeduplicationPeriod.DeduplicationOffset(_) => + defaultDuration + case CompletionDeduplicationPeriod.DeduplicationDuration(value) => + value.asScala + } + .getOrElse(defaultDuration + delayMechanism.skews) + .asInstanceOf[FiniteDuration] - class StaticTimeDelayMechanism(ledger: ParticipantTestContext, val skews: FiniteDuration)(implicit - ec: ExecutionContext - ) extends DelayMechanism { - override def delayBy(duration: Duration): Future[Unit] = - ledger - .time() - .flatMap { currentTime => - ledger.setTime(currentTime, currentTime.plusMillis(duration.toMillis)) - } - } } diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/deduplication/CompletionBasedDeduplicationPeriodConverter.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/deduplication/CompletionBasedDeduplicationPeriodConverter.scala index 80c49f482fbb..7957ebb20a57 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/deduplication/CompletionBasedDeduplicationPeriodConverter.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/deduplication/CompletionBasedDeduplicationPeriodConverter.scala @@ -12,6 +12,7 @@ import com.daml.ledger.api.v1.command_completion_service.CompletionStreamRespons import com.daml.ledger.participant.state.index.v2.IndexCompletionsService import com.daml.lf.data.Ref import com.daml.logging.{ContextualizedLogger, LoggingContext} +import com.daml.platform.participant.util.HexOffset import scala.concurrent.{ExecutionContext, Future} import scala.math.Ordering.Implicits._