From 50deee3ef193a7970d2aade6bbb56cab08e4b439 Mon Sep 17 00:00:00 2001 From: nicu-da Date: Tue, 11 Jan 2022 01:53:55 -0800 Subject: [PATCH] [ledger-api-test-tool] - Merge KVCommandDeduplicationIT and CommandDeduplicationIT (#12257) changelog_begin [ledger-api-test-tool] - Merge command deduplication ledger API tests (`KVCommandDeduplicationIT`, `CommandDeduplicationIT`) into a single suite(`CommandDeduplicationIT`) which uses feature descriptors to handle different participant behaviors changelog_end --- compatibility/bazel_tools/testing.bzl | 22 +- ledger/daml-on-sql/BUILD.bazel | 4 - .../BUILD.bazel | 8 +- ledger/ledger-api-test-tool/BUILD.bazel | 9 +- ledger/ledger-api-test-tool/README.md | 15 - .../api/testtool/LedgerApiTestTool.scala | 7 +- .../infrastructure/FutureAssertions.scala | 10 +- .../CommandDeduplicationBase.scala | 914 ---------------- .../participant/ParticipantTestContext.scala | 17 +- .../suites/CommandDeduplicationIT.scala | 990 +++++++++++++++++- .../CommandDeduplicationParallelIT.scala | 10 +- .../CompletionDeduplicationInfoIT.scala | 2 +- .../suites/KVCommandDeduplicationIT.scala | 140 --- .../ledger/api/testtool/tests/Tests.scala | 14 +- ledger/ledger-on-memory/BUILD.bazel | 8 - ledger/ledger-on-sql/BUILD.bazel | 10 - ledger/participant-state/kvutils/BUILD.bazel | 1 - ledger/sandbox-classic/BUILD.bazel | 29 +- ledger/sandbox-on-x/BUILD.bazel | 9 - ledger/sandbox/BUILD.bazel | 4 - .../NonRepudiationProxyConformance.scala | 17 +- 21 files changed, 1028 insertions(+), 1212 deletions(-) delete mode 100644 ledger/ledger-api-test-tool/README.md delete mode 100644 ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/CommandDeduplicationBase.scala delete mode 100644 ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/KVCommandDeduplicationIT.scala diff --git a/compatibility/bazel_tools/testing.bzl b/compatibility/bazel_tools/testing.bzl index 04562f2fc186..e073ccef9370 100644 --- a/compatibility/bazel_tools/testing.bzl +++ b/compatibility/bazel_tools/testing.bzl @@ -533,13 +533,24 @@ excluded_test_tool_tests = [ }, { "start": "1.18.0", + "platform_ranges": [ + { + "end": "1.18.0", + "exclusions": [ + "CommandDeduplicationIT", # Latest version of the test is dependent on having the submission id populated + ], + }, + ], + }, + { + "start": "1.18.0", + "end": "2.0.0-snapshot.20220105.8777.1", # was removed in 2.0 "platform_ranges": [ { "end": "1.18.0", "exclusions": [ # Exclude dedup tests due to large number of changes (removed participant deduplication, switch to append-only schema, changes in deduplication duration) "KVCommandDeduplicationIT", - "CommandDeduplicationIT", # Latest version of the test is dependent on having the submission id populated ], }, ], @@ -575,14 +586,13 @@ excluded_test_tool_tests = [ "end": "1.18.0", "exclusions": [ # Unexpected failure (StatusRuntimeException) ALREADY_EXISTS: DUPLICATE_COMMAND(10,KVComman): - "KVCommandDeduplicationIT:KVCommandDeduplicationSimpleDeduplicationMixedClients", + "CommandDeduplicationIT:DeduplicationMixedClients", # Assertion failed: Expecting completion with status code OK but completion has status Some(Status(6,DUPLICATE_COMMAND(10,972fae4b) - "KVCommandDeduplicationIT:KVCommandDeduplicationSimpleDeduplicationBasic", + "CommandDeduplicationIT:SimpleDeduplicationBasic", # Unexpected failure (StatusRuntimeException) ALREADY_EXISTS: DUPLICATE_COMMAND(10,KVComman): - "KVCommandDeduplicationIT:KVCommandDeduplicationSimpleDeduplicationCommandClient", + "CommandDeduplicationIT:SimpleDeduplicationCommandClient", # Offsets are not supported for versions < 2.0.0 - "KVCommandDeduplicationIT:KVCommandDeduplicationDeduplicateUsingOffsets", - "CommandDeduplicationIT:ParticipantCommandDeduplicationDeduplicateUsingOffsets", + "CommandDeduplicationIT:DeduplicateUsingOffsets", # Actual error id (INCONSISTENT) does not match expected error id (DUPLICATE_CONTRACT_KEY} "ExceptionsIT:ExRollbackDuplicateKeyCreated", "ExceptionsIT:ExRollbackDuplicateKeyArchived", diff --git a/ledger/daml-on-sql/BUILD.bazel b/ledger/daml-on-sql/BUILD.bazel index d5269caa83c2..0ba79f2560f2 100644 --- a/ledger/daml-on-sql/BUILD.bazel +++ b/ledger/daml-on-sql/BUILD.bazel @@ -119,10 +119,6 @@ conformance_test( "--additional=CompletionDeduplicationInfoITCommandService", "--additional=CompletionDeduplicationInfoITCommandSubmissionService", "--additional=ContractIdIT:Accept", - # Exclude offset command deduplication tests - # daml-on-sql uses the the sandbox server which has support only for participant side deduplication, - # which in turn only has support for durations as deduplication periods - "--exclude=CommandDeduplicationIT:ParticipantCommandDeduplicationDeduplicateUsingOffsets", # not enforced "--exclude=CommandDeduplicationPeriodValidationIT:DeduplicationDurationExceedsMaxDeduplicationDuration", ], diff --git a/ledger/ledger-api-test-tool-on-canton/BUILD.bazel b/ledger/ledger-api-test-tool-on-canton/BUILD.bazel index 00c9b2fd1ba4..dbbdad7e93ad 100644 --- a/ledger/ledger-api-test-tool-on-canton/BUILD.bazel +++ b/ledger/ledger-api-test-tool-on-canton/BUILD.bazel @@ -88,16 +88,14 @@ conformance_test( ",ContractKeysIT:CKFetchOrLookup,ContractKeysIT:CKNoFetchUndisclosed,ContractKeysIT:CKMaintainerScoped" + ",ExceptionsIT,ExceptionRaceConditionIT" + # need UCK mode - added below ",DeeplyNestedValueIT" + # FIXME: Too deeply nested values flake with a time out (half of the time) - ",CommandDeduplicationIT:ParticipantCommandDeduplicationSimpleDeduplicationBasic" + # sync vs async error (part of canton #6301) - ",CommandDeduplicationIT:ParticipantCommandDeduplicationDeduplicateSubmitterBasic" + - ",CommandDeduplicationIT:ParticipantCommandDeduplicationSimpleDeduplicationMixedClients" + - ",CommandDeduplicationIT:ParticipantCommandDeduplicationDeduplicateUsingOffsets" + # temporary disabled until we adapt it for canton + ",CommandDeduplicationIT:SimpleDeduplicationBasic" + # sync vs async error (part of canton #6301) + ",CommandDeduplicationIT:DeduplicateSubmitterBasic" + + ",CommandDeduplicationIT:DeduplicateUsingOffsets" + # temporary disabled until we adapt it for canton # Also exclude "optional tests" - which are run separately below ",CompletionDeduplicationInfoITCommandService" + ",CompletionDeduplicationInfoITCommandSubmissionService" + ",CommandDeduplicationParallelIT" + ",ContractIdIT" + - ",KVCommandDeduplicationIT" + # only for KV-utils ",MultiPartySubmissionIT" + ",ParticipantPruningIT" + ",TLSOnePointThreeIT" + diff --git a/ledger/ledger-api-test-tool/BUILD.bazel b/ledger/ledger-api-test-tool/BUILD.bazel index 7dd14d0a6395..64a320738421 100644 --- a/ledger/ledger-api-test-tool/BUILD.bazel +++ b/ledger/ledger-api-test-tool/BUILD.bazel @@ -129,17 +129,19 @@ da_scala_binary( "//ledger/error", "//ledger/ledger-api-common", "//ledger/ledger-resources", - "//libs-scala/resources", - "//libs-scala/resources-akka", - "//libs-scala/resources-grpc", "//ledger/test-common:test-common-%s" % lf_version, "//ledger/test-common:package_management-tests-%s.scala" % lf_version, "//ledger/test-common:model-tests-%s.scala" % lf_version, "//ledger/test-common:performance-tests-%s.scala" % lf_version, "//ledger/test-common:semantic-tests-%s.scala" % lf_version, "//ledger/test-common:dar-files-%s-lib" % lf_version, + "//libs-scala/contextualized-logging", "//libs-scala/grpc-utils", + "//libs-scala/resources", + "//libs-scala/resources-akka", + "//libs-scala/resources-grpc", "//libs-scala/timer-utils", + "@maven//:com_google_api_grpc_proto_google_common_protos", "@maven//:io_grpc_grpc_api", "@maven//:io_grpc_grpc_context", "@maven//:io_grpc_grpc_netty", @@ -256,7 +258,6 @@ conformance_test( "--additional=LotsOfPartiesIT", "--additional=TransactionScaleIT", "--additional=TLSOnePointThreeIT", - "--exclude=CommandDeduplicationIT", # Makes sure that deprecated CLI options can still be used to make sure existing CI pipelines are not broken. # This test should fail if any deprecated CLI option has any effect whatsoever -- they are preserved # exclusively for backwards-compatibility. diff --git a/ledger/ledger-api-test-tool/README.md b/ledger/ledger-api-test-tool/README.md deleted file mode 100644 index a9fe3ca9a3e6..000000000000 --- a/ledger/ledger-api-test-tool/README.md +++ /dev/null @@ -1,15 +0,0 @@ -### Command deduplication test suites - -As different ledgers have different support for command deduplication, we provide multiple test suites which try to be as comprehensive as possible for all the various ledgers - -* default - is it a default test suite which does not need to be included explicitly -* append-only - Requires the schema to be append-only because we use the submission id set in the completion, which is - present only for append-only schemas -* configuration-required - If it requires specific settings to be set for the ledger configuration - -| Name | Default | Append only | Configuration required | Details | -| --- | --- | --- | --- | --- | -|CommandDeduplicationIT|Yes|No| No |Tests participant deduplication| -|KVCommandDeduplicationIT| No | No | minSkew set to 1 second. maxDeduplicationDuration has to be < 5s | Extends the test cases from `CommandDeduplicationIT` with committer side test cases. Requires the time model update because KV committer deduplication is based on maxDeduplicationDuration + minSkew| -|AppendOnlyKVCommandDeduplicationIT|No|Yes|Same as KVCommandDeduplicationIT | Same as `KVCommandDeduplicationIT` but it requires an append-only schema so that we have access to the submission id| -|AppendOnlyCommandDeduplicationParallelIT | No |Yes| No | Requires append only schema so that we have access to the submission id. | \ No newline at end of file diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/LedgerApiTestTool.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/LedgerApiTestTool.scala index 5f6ba043e2d3..0a739eab0967 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/LedgerApiTestTool.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/LedgerApiTestTool.scala @@ -92,14 +92,9 @@ object LedgerApiTestTool { val defaultTests: Vector[LedgerTestSuite] = Tests.default( timeoutScaleFactor = config.timeoutScaleFactor, - ledgerClockGranularity = config.ledgerClockGranularity, - staticTime = config.staticTime, - ) - val optionalTests: Vector[LedgerTestSuite] = Tests.optional( - timeoutScaleFactor = config.timeoutScaleFactor, - ledgerClockGranularity = config.ledgerClockGranularity, staticTime = config.staticTime, ) + val optionalTests: Vector[LedgerTestSuite] = Tests.optional() val visibleTests: Vector[LedgerTestSuite] = defaultTests ++ optionalTests val allTests: Vector[LedgerTestSuite] = visibleTests ++ Tests.retired val allTestCaseNames: Set[String] = allTests.flatMap(_.tests).map(_.name).toSet diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/FutureAssertions.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/FutureAssertions.scala index 1d4bc06a46a9..6f4bf86de6f2 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/FutureAssertions.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/FutureAssertions.scala @@ -123,7 +123,9 @@ object FutureAssertions { def forAllParallel[T]( data: Seq[T] - )(testCase: T => Future[Unit])(implicit ec: ExecutionContext): Future[Seq[Unit]] = Future + )( + testCase: T => Future[Unit] + )(implicit ec: ExecutionContext, loggingContext: LoggingContext): Future[Seq[Unit]] = Future .traverse(data)(input => testCase(input).map(Right(_)).recover { case NonFatal(ex) => Left(input -> ex) @@ -131,14 +133,16 @@ object FutureAssertions { ) .map { results => val (failures, successes) = results.partitionMap(identity) - if (failures.nonEmpty) + if (failures.nonEmpty) { + failures + .foreach(res => logger.error(s"Failed parallel test case for input ${res._1}", res._2)) throw ParallelTestFailureException( s"Failed parallel test case. Failures: ${failures.length}. Success: ${successes.length}\nFailed inputs: ${failures .map(_._1) .mkString("[", ",", "]")}", failures.last._2, ) - else successes + } else successes } def optionalAssertion(runs: Boolean, description: String)( diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/CommandDeduplicationBase.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/CommandDeduplicationBase.scala deleted file mode 100644 index daa083258828..000000000000 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/CommandDeduplicationBase.scala +++ /dev/null @@ -1,914 +0,0 @@ -// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.daml.ledger.api.testtool.infrastructure.deduplication - -import java.util - -import com.daml.error.ErrorCode -import com.daml.error.definitions.LedgerApiErrors -import com.daml.grpc.GrpcStatus -import com.daml.ledger.api.SubmissionIdGenerator -import com.daml.ledger.api.testtool.infrastructure.Allocation._ -import com.daml.ledger.api.testtool.infrastructure.Assertions._ -import com.daml.ledger.api.testtool.infrastructure.LedgerTestSuite -import com.daml.ledger.api.testtool.infrastructure.ProtobufConverters._ -import com.daml.ledger.api.testtool.infrastructure.deduplication.CommandDeduplicationBase._ -import com.daml.ledger.api.testtool.infrastructure.participant.{Features, ParticipantTestContext} -import com.daml.ledger.api.v1.command_service.SubmitAndWaitRequest -import com.daml.ledger.api.v1.command_submission_service.SubmitRequest -import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod -import com.daml.ledger.api.v1.completion.Completion -import com.daml.ledger.api.v1.ledger_offset.LedgerOffset -import com.daml.ledger.api.v1.version_service.DeduplicationPeriodSupport.OffsetSupport -import com.daml.ledger.client.binding.Primitive.Party -import com.daml.ledger.test.model.DA.Types.Tuple2 -import com.daml.ledger.test.model.Test.{Dummy, DummyWithAnnotation, TextKey, TextKeyOperations} -import com.daml.lf.data.Ref -import com.daml.lf.data.Ref.SubmissionId -import com.daml.timer.Delayed -import io.grpc.Status.Code - -import scala.annotation.nowarn -import scala.concurrent.duration._ -import scala.concurrent.{ExecutionContext, Future} -import scala.util.{Failure, Success} - -@nowarn("msg=deprecated") -private[testtool] abstract class CommandDeduplicationBase( - timeoutScaleFactor: Double, - ledgerTimeInterval: FiniteDuration, - staticTime: Boolean, -) extends LedgerTestSuite { - - val deduplicationDuration: FiniteDuration = scaledDuration(3.seconds) - - val ledgerWaitInterval: FiniteDuration = ledgerTimeInterval * 2 - val defaultDeduplicationWindowWait: FiniteDuration = deduplicationDuration + ledgerWaitInterval - - protected def runWithDeduplicationDelay( - participants: Seq[ParticipantTestContext] - )( - testWithDelayMechanism: DelayMechanism => Future[Unit] - )(implicit ec: ExecutionContext): Future[Unit] - - protected def testNamingPrefix: String - - testGivenAllParticipants( - s"${testNamingPrefix}SimpleDeduplicationBasic", - "Deduplicate commands within the deduplication duration window", - allocate(SingleParty), - runConcurrently = false, - )(implicit ec => - configuredParticipants => { case Participants(Participant(ledger, party)) => - val request = ledger - .submitRequest(party, DummyWithAnnotation(party, "Duplicate command").create.command) - .update( - _.commands.deduplicationPeriod := - DeduplicationPeriod.DeduplicationTime(deduplicationDuration.asProtobuf) - ) - runWithDeduplicationDelay(configuredParticipants) { delay => - val acceptedSubmissionId1 = newSubmissionId() - val acceptedSubmissionId2 = newSubmissionId() - - for { - // Submit command (first deduplication window) - // Note: the second submit() in this block is deduplicated and thus rejected by the ledger API server, - // only one submission is therefore sent to the ledger. - (offset1, completion1) <- submitRequestAndAssertCompletionAccepted( - ledger, - updateSubmissionId(request, acceptedSubmissionId1), - party, - ) - _ <- submitRequestAndAssertDeduplication( - ledger, - updateWithFreshSubmissionId(request), - acceptedSubmissionId1, - offset1, - party, - ) - // Wait until the end of first deduplication window - _ <- delay.delayForEntireDeduplicationPeriod() - - // Submit command (second deduplication window) - // Note: the deduplication window is guaranteed to have passed on both - // the ledger API server and the ledger itself, since the test waited more than - // `deduplicationSeconds` after receiving the first command *completion*. - // The first submit() in this block should therefore lead to an accepted transaction. - (offset2, completion2) <- submitRequestAndAssertCompletionAccepted( - ledger, - updateSubmissionId(request, acceptedSubmissionId2), - party, - ) - _ <- submitRequestAndAssertDeduplication( - ledger, - updateWithFreshSubmissionId(request), - acceptedSubmissionId2, - offset2, - party, - ) - // Inspect created contracts - _ <- assertPartyHasActiveContracts( - ledger, - party, - noOfActiveContracts = 2, - ) - } yield { - assert( - completion1.commandId == request.commands.get.commandId, - "The command ID of the first completion does not match the command ID of the submission", - ) - assert( - completion2.commandId == request.commands.get.commandId, - "The command ID of the second completion does not match the command ID of the submission", - ) - } - } - } - ) - - test( - s"${testNamingPrefix}StopOnSubmissionFailure", - "Stop deduplicating commands on submission failure", - allocate(TwoParties), - )(implicit ec => { case Participants(Participant(ledger, alice, bob)) => - // Do not set the deduplication timeout. - // The server will default to the maximum possible deduplication timeout. - val requestA = ledger.submitRequest(alice, Dummy(bob).create.command) - - for { - // Submit an invalid command (should fail with INVALID_ARGUMENT) - _ <- submitRequestAndAssertSyncFailure( - ledger, - requestA, - Code.INVALID_ARGUMENT, - LedgerApiErrors.CommandExecution.Interpreter.AuthorizationError, - ) - - // Re-submit the invalid command (should again fail with INVALID_ARGUMENT and not with ALREADY_EXISTS) - _ <- submitRequestAndAssertSyncFailure( - ledger, - updateWithFreshSubmissionId(requestA), - Code.INVALID_ARGUMENT, - LedgerApiErrors.CommandExecution.Interpreter.AuthorizationError, - ) - } yield {} - }) - - test( - s"${testNamingPrefix}StopOnCompletionFailure", - "Stop deduplicating commands on completion failure", - allocate(SingleParty), - )(implicit ec => { case Participants(Participant(ledger, party)) => - val key = ledger.nextKeyId() - - for { - // Create a helper and a text key - ko <- ledger.create(party, TextKeyOperations(party)) - _ <- ledger.create(party, TextKey(party, key, List())) - - // Create two competing requests - requestA = ledger.submitAndWaitRequest( - party, - ko.exerciseTKOFetchAndRecreate(party, Tuple2(party, key)).command, - ) - requestB = ledger.submitAndWaitRequest( - party, - ko.exerciseTKOFetchAndRecreate(party, Tuple2(party, key)).command, - ) - - // Submit both requests in parallel. - // Either both succeed (if one transaction is recorded faster than the other submission starts command interpretation, unlikely) - // Or one submission is rejected (if one transaction is recorded during the call of lookupMaximumLedgerTime() in [[LedgerTimeHelper]], unlikely) - // Or one transaction is rejected (this is what we want to test) - submissionResults <- Future.traverse(List(requestA, requestB))(request => - ledger.submitAndWait(request).transform(result => Success(request -> result)) - ) - - // Resubmit a failed command. - // No matter what the rejection reason was (hopefully it was a rejected transaction), - // a resubmission of exactly the same command should succeed. - _ <- submissionResults - .collectFirst { case (request, Failure(_)) => request } - .fold(Future.unit)(request => ledger.submitAndWait(updateWithFreshSubmissionId(request))) - } yield { - () - } - }) - - testGivenAllParticipants( - s"${testNamingPrefix}SimpleDeduplicationCommandClient", - "Deduplicate commands within the deduplication time window using the command client", - allocate(SingleParty), - runConcurrently = false, - )(implicit ec => - configuredParticipants => { case Participants(Participant(ledger, party)) => - val request = ledger - .submitAndWaitRequest(party, Dummy(party).create.command) - .update( - _.commands.deduplicationTime := deduplicationDuration.asProtobuf - ) - val acceptedSubmissionId1 = newSubmissionId() - val acceptedSubmissionId2 = newSubmissionId() - runWithDeduplicationDelay(configuredParticipants) { delay => - for { - // Submit command (first deduplication window) - (completionOffset1, _) <- submitAndWaitRequestAndAssertCompletionAccepted( - ledger, - updateSubmissionId(request, acceptedSubmissionId1), - party, - ) - _ <- submitAndWaitRequestAndAssertDeduplication( - ledger, - updateWithFreshSubmissionId(request), - acceptedSubmissionId1, - completionOffset1, - ) - - // Wait until the end of first deduplication window - _ <- delay.delayForEntireDeduplicationPeriod() - - // Submit command (second deduplication window) - (completionOffset2, _) <- submitAndWaitRequestAndAssertCompletionAccepted( - ledger, - updateSubmissionId(request, acceptedSubmissionId2), - party, - ) - _ <- submitAndWaitRequestAndAssertDeduplication( - ledger, - updateWithFreshSubmissionId(request), - acceptedSubmissionId2, - completionOffset2, - ) - - // Inspect created contracts - _ <- assertPartyHasActiveContracts( - ledger, - party, - noOfActiveContracts = 2, - ) - } yield {} - } - } - ) - - // staticTime - we run calls in parallel and with static time we would need to advance the time, - // therefore this cannot be run in static time - testGivenAllParticipants( - s"${testNamingPrefix}SimpleDeduplicationMixedClients", - "Deduplicate commands within the deduplication time window using the command client and the command submission client", - allocate(SingleParty), - enabled = _ => !staticTime, - disabledReason = "Cannot work in static time as we run multiple test cases in parallel", - )(implicit ec => - configuredParticipants => { case Participants(Participant(ledger, party)) => - def generateVariations(elements: List[List[Boolean]]): List[List[Boolean]] = - elements match { - case Nil => List(Nil) - case currentElement :: tail => - currentElement.flatMap(value => generateVariations(tail).map(value :: _)) - } - - runWithDeduplicationDelay(configuredParticipants) { delay => - { - val numberOfCalls = 4 - Future // cover all the different generated variations of submit and submitAndWait - .traverse(generateVariations(List.fill(numberOfCalls)(List(true, false)))) { - case firstCall :: secondCall :: thirdCall :: fourthCall :: Nil => - val submitAndWaitRequest = ledger - .submitAndWaitRequest(party, Dummy(party).create.command) - .update( - _.commands.deduplicationTime := deduplicationDuration.asProtobuf - ) - val submitRequest = ledger - .submitRequest(party, Dummy(party).create.command) - .update( - _.commands.commandId := submitAndWaitRequest.getCommands.commandId, - _.commands.deduplicationTime := deduplicationDuration.asProtobuf, - ) - - def submitAndAssertAccepted( - submitAndWait: Boolean, - acceptedSubmissionId: SubmissionId, - ): Future[LedgerOffset] = { - if (submitAndWait) - submitAndWaitRequestAndAssertCompletionAccepted( - ledger, - updateSubmissionId(submitAndWaitRequest, acceptedSubmissionId), - party, - ) - .map(_._1) - else - submitRequestAndAssertCompletionAccepted( - ledger, - updateSubmissionId(submitRequest, acceptedSubmissionId), - party, - ) - .map(_._1) - } - - def submitAndAssertDeduplicated( - submitAndWait: Boolean, - acceptedSubmissionId: SubmissionId, - acceptedLedgerOffset: LedgerOffset, - ): Future[Unit] = - if (submitAndWait) - submitAndWaitRequestAndAssertDeduplication( - ledger, - updateWithFreshSubmissionId(submitAndWaitRequest), - acceptedSubmissionId, - acceptedLedgerOffset, - ) - else - submitRequestAndAssertDeduplication( - ledger, - updateWithFreshSubmissionId(submitRequest), - acceptedSubmissionId, - acceptedLedgerOffset, - party, - ) - - val acceptedSubmissionId1 = newSubmissionId() - val acceptedSubmissionId2 = newSubmissionId() - for { - // Submit command (first deduplication window) - ledgerOffset1 <- submitAndAssertAccepted(firstCall, acceptedSubmissionId1) - _ <- submitAndAssertDeduplicated( - secondCall, - acceptedSubmissionId1, - ledgerOffset1, - ) - - // Wait until the end of first deduplication window - _ <- delay.delayForEntireDeduplicationPeriod() - - // Submit command (second deduplication window) - ledgerOffset2 <- submitAndAssertAccepted(thirdCall, acceptedSubmissionId2) - _ <- submitAndAssertDeduplicated( - fourthCall, - acceptedSubmissionId2, - ledgerOffset2, - ) - } yield {} - case _ => throw new IllegalArgumentException("Wrong call list constructed") - } - .flatMap { _ => - assertPartyHasActiveContracts( - ledger, - party = party, - noOfActiveContracts = 32, // 16 test cases, with 2 contracts per test case - ) - } - } - } - } - ) - - test( - s"${testNamingPrefix}DeduplicateSubmitterBasic", - "Commands with identical submitter and command identifier should be deduplicated by the submission client", - allocate(TwoParties), - )(implicit ec => { case Participants(Participant(ledger, alice, bob)) => - val aliceRequest = ledger.submitRequest(alice, Dummy(alice).create.command) - val bobRequest = ledger - .submitRequest(bob, Dummy(bob).create.command) - .update(_.commands.commandId := aliceRequest.getCommands.commandId) - - val aliceAcceptedSubmissionId = newSubmissionId() - val bobAcceptedSubmissionId = newSubmissionId() - - for { - // Submit a command as alice - (aliceCompletionOffset, _) <- submitRequestAndAssertCompletionAccepted( - ledger, - updateSubmissionId(aliceRequest, aliceAcceptedSubmissionId), - alice, - ) - _ = println(s"Alice completion offset: $aliceCompletionOffset") - _ <- submitRequestAndAssertDeduplication( - ledger, - updateWithFreshSubmissionId(aliceRequest), - aliceAcceptedSubmissionId, - aliceCompletionOffset, - alice, - ) - - // Submit another command that uses same commandId, but is submitted by Bob - (bobCompletionOffset, _) <- submitRequestAndAssertCompletionAccepted( - ledger, - updateSubmissionId(bobRequest, bobAcceptedSubmissionId), - bob, - ) - _ = println(s"Bob completion offset: $aliceCompletionOffset") - _ <- submitRequestAndAssertDeduplication( - ledger, - updateWithFreshSubmissionId(bobRequest), - bobAcceptedSubmissionId, - bobCompletionOffset, - bob, - ) - _ <- assertPartyHasActiveContracts( - ledger, - party = alice, - noOfActiveContracts = 1, - ) - _ <- assertPartyHasActiveContracts( - ledger, - party = bob, - noOfActiveContracts = 1, - ) - } yield {} - }) - - test( - s"${testNamingPrefix}DeduplicateSubmitterCommandClient", - "Commands with identical submitter and command identifier should be deduplicated by the command client", - allocate(TwoParties), - )(implicit ec => { case Participants(Participant(ledger, alice, bob)) => - val aliceRequest = ledger.submitAndWaitRequest(alice, Dummy(alice).create.command) - val bobRequest = ledger - .submitAndWaitRequest(bob, Dummy(bob).create.command) - .update(_.commands.commandId := aliceRequest.getCommands.commandId) - - val aliceAcceptedSubmissionId = newSubmissionId() - val bobAcceptedSubmissionId = newSubmissionId() - for { - // Submit a command as alice - (aliceCompletionOffset, _) <- submitAndWaitRequestAndAssertCompletionAccepted( - ledger, - updateSubmissionId(aliceRequest, aliceAcceptedSubmissionId), - alice, - ) - _ <- submitAndWaitRequestAndAssertDeduplication( - ledger, - updateWithFreshSubmissionId(aliceRequest), - aliceAcceptedSubmissionId, - aliceCompletionOffset, - ) - - // Submit another command that uses same commandId, but is submitted by Bob - (bobCompletionOffset, _) <- submitAndWaitRequestAndAssertCompletionAccepted( - ledger, - updateSubmissionId(bobRequest, bobAcceptedSubmissionId), - bob, - ) - _ <- submitAndWaitRequestAndAssertDeduplication( - ledger, - updateWithFreshSubmissionId(bobRequest), - bobAcceptedSubmissionId, - bobCompletionOffset, - ) - // Inspect the ledger state - _ <- assertPartyHasActiveContracts( - ledger, - party = alice, - noOfActiveContracts = 1, - ) - _ <- assertPartyHasActiveContracts( - ledger, - party = bob, - noOfActiveContracts = 1, - ) - } yield {} - }) - - testGivenAllParticipants( - s"${testNamingPrefix}DeduplicateUsingOffsets", - "Deduplicate commands within the deduplication period defined by the offset", - allocate(SingleParty), - runConcurrently = false, // updates the time model - )(implicit ec => - configuredParticipants => { case Participants(Participant(ledger, party)) => - val request = ledger - .submitRequest(party, DummyWithAnnotation(party, "Duplicate command").create.command) - val acceptedSubmissionId = newSubmissionId() - runWithDeduplicationDelay(configuredParticipants) { delay => - for { - (offset1, _) <- submitRequestAndAssertCompletionAccepted( - ledger, - updateSubmissionId(request, acceptedSubmissionId), - party, - ) - // Wait for any ledgers that might adjust based on time skews - // This is done so that we can validate that the third command is accepted - _ <- delayForOffsetIfRequired(ledger, delay, ledger.features) - // Submit command again using the first offset as the deduplication offset - (_, _) <- submitRequestAndAssertAsyncDeduplication( - ledger, - updateWithFreshSubmissionId( - request.update( - _.commands.deduplicationPeriod := DeduplicationPeriod.DeduplicationOffset( - Ref.HexString.assertFromString(offset1.getAbsolute) - ) - ) - ), - acceptedSubmissionId, - offset1, - party, - ) - (offset3, _) <- submitRequestAndAssertCompletionAccepted( - ledger, - ledger.submitRequest(party, Dummy(party).create.command), - party, - ) - // Submit command again using the rejection offset as a deduplication period - _ <- submitRequestAndAssertCompletionAccepted( - ledger, - updateWithFreshSubmissionId( - request.update( - _.commands.deduplicationPeriod := DeduplicationPeriod.DeduplicationOffset( - Ref.HexString.assertFromString(offset3.getAbsolute) - ) - ) - ), - party, - ) - } yield {} - } - } - ) - - private def delayForOffsetIfRequired( - participantTestContext: ParticipantTestContext, - delayMechanism: DelayMechanism, - features: Features, - )(implicit ec: ExecutionContext): Future[Unit] = - features.commandDeduplicationFeatures.getDeduplicationPeriodSupport.offsetSupport match { - case OffsetSupport.OFFSET_NATIVE_SUPPORT => - Future.unit - case OffsetSupport.OFFSET_CONVERT_TO_DURATION => - // the converted duration is calculated as the interval between submission time - // and offset record time + minSkew (used to determine maxRecordTime) - // - // the duration is extended with up to minSkew + maxSkew when using pre-execution, - // as we use maxRecordTime and minRecordTime to calculate the interval between the two commands - participantTestContext - .getTimeModel() - .flatMap(response => { - delayMechanism.delayBy( - response.getTimeModel.getMaxSkew.asScala + - 2 * response.getTimeModel.getMinSkew.asScala - ) - }) - case OffsetSupport.Unrecognized(_) | OffsetSupport.OFFSET_NOT_SUPPORTED => - Future.unit - } - - protected def assertPartyHasActiveContracts( - ledger: ParticipantTestContext, - party: Party, - noOfActiveContracts: Int, - )(implicit ec: ExecutionContext): Future[Unit] = { - ledger - .activeContracts(party) - .map(contracts => - assert( - contracts.length == noOfActiveContracts, - s"Expected $noOfActiveContracts active contracts for $party but found ${contracts.length} active contracts", - ) - ) - } - - protected def submitRequestAndAssertCompletionAccepted( - ledger: ParticipantTestContext, - request: SubmitRequest, - parties: Party* - )(implicit - ec: ExecutionContext - ): Future[OffsetWithCompletion] = - submitRequestAndAssertCompletion(ledger, request, parties: _*) { completion => - assertCompletionStatus(request.toString, completion, Code.OK) - } - - protected def submitAndWaitRequestAndAssertCompletionAccepted( - ledger: ParticipantTestContext, - request: SubmitAndWaitRequest, - parties: Party* - )(implicit - ec: ExecutionContext - ): Future[OffsetWithCompletion] = - submitAndWaitRequestAndAssertCompletion(ledger, request, parties: _*) { completion => - assertCompletionStatus(request.toString, completion, Code.OK) - } - - protected def submitRequestAndAssertDeduplication( - ledger: ParticipantTestContext, - request: SubmitRequest, - acceptedSubmissionId: SubmissionId, - acceptedOffset: LedgerOffset, - parties: Party* - )(implicit - ec: ExecutionContext - ): Future[Unit] = - if ( - ledger.features.commandDeduplicationFeatures.participantDeduplicationSupport.isParticipantDeduplicationSupported - ) - submitRequestAndAssertSyncDeduplication(ledger, request, acceptedSubmissionId, acceptedOffset) - else - submitRequestAndAssertAsyncDeduplication( - ledger, - request, - acceptedSubmissionId, - acceptedOffset, - parties: _* - ).map(_ => ()) - - protected def submitRequestAndAssertSyncDeduplication( - ledger: ParticipantTestContext, - request: SubmitRequest, - acceptedSubmissionId: SubmissionId, - acceptedOffset: LedgerOffset, - )(implicit ec: ExecutionContext): Future[Unit] = - submitRequestAndAssertSyncFailure( - ledger, - request, - Code.ALREADY_EXISTS, - LedgerApiErrors.ConsistencyErrors.DuplicateCommand, - assertDeduplicatedSubmissionIdAndOffsetOnError( - acceptedSubmissionId, - acceptedOffset, - _, - ), - ) - - private def submitRequestAndAssertSyncFailure( - ledger: ParticipantTestContext, - request: SubmitRequest, - code: Code, - selfServiceErrorCode: ErrorCode, - additionalErrorAssertions: Throwable => Unit = _ => (), - )(implicit ec: ExecutionContext): Future[Unit] = - ledger - .submit(request) - .mustFail(s"Request expected to fail with code $code") - .map( - assertGrpcError( - ledger, - _, - code, - selfServiceErrorCode, - exceptionMessageSubstring = None, - checkDefiniteAnswerMetadata = true, - additionalErrorAssertions, - ) - ) - - protected def submitAndWaitRequestAndAssertDeduplication( - ledger: ParticipantTestContext, - request: SubmitAndWaitRequest, - acceptedSubmissionId: SubmissionId, - acceptedOffset: LedgerOffset, - )(implicit ec: ExecutionContext): Future[Unit] = - ledger - .submitAndWait(request) - .mustFail("Request was accepted but we were expecting it to fail with a duplicate error") - .map( - assertGrpcError( - ledger, - _, - expectedCode = Code.ALREADY_EXISTS, - selfServiceErrorCode = LedgerApiErrors.ConsistencyErrors.DuplicateCommand, - exceptionMessageSubstring = None, - checkDefiniteAnswerMetadata = true, - assertDeduplicatedSubmissionIdAndOffsetOnError( - acceptedSubmissionId, - acceptedOffset, - _, - ), - ) - ) - - protected def submitRequestAndAssertAsyncDeduplication( - ledger: ParticipantTestContext, - request: SubmitRequest, - acceptedSubmissionId: SubmissionId, - acceptedOffset: LedgerOffset, - parties: Party* - )(implicit ec: ExecutionContext): Future[OffsetWithCompletion] = - submitRequestAndAssertCompletion( - ledger, - request, - parties: _* - ) { completion => - assertCompletionStatus(request.toString, completion, Code.ALREADY_EXISTS) - assertDeduplicatedSubmissionIdAndOffsetOnCompletion( - acceptedSubmissionId, - acceptedOffset, - completion, - ) - } - - private def assertCompletionStatus( - requestString: String, - completion: Completion, - statusCode: Code, - ): Unit = - assert( - completion.getStatus.code == statusCode.value(), - s"""Expecting completion with status code $statusCode but completion has status ${completion.status}. - |Request: $requestString - |Completion: $completion - |Metadata: ${extractErrorInfoMetadata( - GrpcStatus.toJavaProto(completion.getStatus) - )}""".stripMargin, - ) - - private def assertDeduplicatedSubmissionIdAndOffsetOnError( - acceptedSubmissionId: SubmissionId, - acceptedCompletionOffset: LedgerOffset, - t: Throwable, - ): Unit = t match { - case exception: Exception => - val metadata = extractErrorInfoMetadata(exception) - assertExistingSubmissionIdOnMetadata(metadata, acceptedSubmissionId) - assertExistingCompletionOffsetOnMetadata(metadata, acceptedCompletionOffset) - case _ => () - } - - private def assertDeduplicatedSubmissionIdAndOffsetOnCompletion( - acceptedSubmissionId: SubmissionId, - acceptedCompletionOffset: LedgerOffset, - completion: Completion, - ): Unit = { - val metadata = extractErrorInfoMetadata( - GrpcStatus.toJavaProto(completion.getStatus) - ) - assertExistingSubmissionIdOnMetadata(metadata, acceptedSubmissionId) - assertExistingCompletionOffsetOnMetadata(metadata, acceptedCompletionOffset) - } - - private def assertExistingSubmissionIdOnMetadata( - metadata: util.Map[String, String], - acceptedSubmissionId: SubmissionId, - ): Unit = - Option(metadata.get("existing_submission_id")).foreach { metadataExistingSubmissionId => - assertEquals( - "submission ID mismatch", - metadataExistingSubmissionId, - acceptedSubmissionId, - ) - } - - private def assertExistingCompletionOffsetOnMetadata( - metadata: util.Map[String, String], - acceptedCompletionOffset: LedgerOffset, - ): Unit = - Option(metadata.get("completion_offset")).foreach { offset => - assertEquals( - "completion offset mismatch", - absoluteLedgerOffset(offset), - acceptedCompletionOffset, - ) - } - - private def submitRequestAndAssertCompletion( - ledger: ParticipantTestContext, - request: SubmitRequest, - parties: Party* - )( - additionalCompletionAssertion: Completion => Unit - )(implicit - ec: ExecutionContext - ): Future[OffsetWithCompletion] = - submitRequestAndFindCompletion(ledger, request, parties: _*).map { case (offset, completion) => - additionalCompletionAssertion(completion) - offset -> completion - } - - private def submitAndWaitRequestAndAssertCompletion( - ledger: ParticipantTestContext, - request: SubmitAndWaitRequest, - parties: Party* - )( - additionalCompletionAssertion: Completion => Unit - )(implicit - ec: ExecutionContext - ): Future[OffsetWithCompletion] = - submitRequestAndFindCompletion(ledger, request, parties: _*).map { case (offset, completion) => - additionalCompletionAssertion(completion) - offset -> completion - } - - protected def submitRequestAndFindCompletion( - ledger: ParticipantTestContext, - request: SubmitRequest, - parties: Party* - )(implicit - ec: ExecutionContext - ): Future[OffsetWithCompletion] = - submitRequest(ledger)(request) - .flatMap(ledgerEnd => { - ledger - .findCompletion(ledger.completionStreamRequest(ledgerEnd)(parties: _*)) { completion => - request.commands.map(_.submissionId).contains(completion.submissionId) - } - .map(_.toList) - }) - .map { completions => - assertSingleton("Expected only one completion", completions) - } - - protected def submitRequestAndFindCompletion( - ledger: ParticipantTestContext, - request: SubmitAndWaitRequest, - parties: Party* - )(implicit - ec: ExecutionContext - ): Future[OffsetWithCompletion] = - ledger - .submitAndWait(request) - .flatMap { _ => - ledger - .findCompletion(ledger.completionStreamRequest()(parties: _*)) { completion => - request.commands.map(_.submissionId).contains(completion.submissionId) - } - .map(_.toList) - } - .map { completions => - assert(completions.head._1.getAbsolute.nonEmpty, "Expected a populated completion offset") - assertSingleton("Expected only one completion", completions) - } - - protected def submitRequest( - ledger: ParticipantTestContext - )( - request: SubmitRequest - )(implicit ec: ExecutionContext): Future[LedgerOffset] = for { - ledgerEnd <- ledger.currentEnd() - _ <- ledger.submit(request) - } yield { - ledgerEnd - } - - protected def scaledDuration(duration: FiniteDuration): FiniteDuration = asFiniteDuration( - duration * timeoutScaleFactor - ) - - protected def asFiniteDuration(duration: Duration): FiniteDuration = duration match { - case duration: FiniteDuration => duration - case _ => - throw new IllegalArgumentException(s"Invalid timeout scale factor: $timeoutScaleFactor") - } - - private def absoluteLedgerOffset(value: String) = - LedgerOffset(LedgerOffset.Value.Absolute(value)) - - private def updateSubmissionId( - request: SubmitRequest, - submissionId: SubmissionId, - ): SubmitRequest = - request.update(_.commands.submissionId := submissionId) - - private def updateSubmissionId( - request: SubmitAndWaitRequest, - acceptedSubmissionId1: SubmissionId, - ): SubmitAndWaitRequest = - request.update(_.commands.submissionId := acceptedSubmissionId1) - - private def updateWithFreshSubmissionId(request: SubmitRequest): SubmitRequest = - request.update(_.commands.submissionId := newSubmissionId()) - - private def updateWithFreshSubmissionId(request: SubmitAndWaitRequest): SubmitAndWaitRequest = - request.update(_.commands.submissionId := newSubmissionId()) - - private def newSubmissionId(): SubmissionId = SubmissionIdGenerator.Random.generate() -} - -object CommandDeduplicationBase { - type OffsetWithCompletion = (LedgerOffset, Completion) - - trait DelayMechanism { - val deduplicationDuration: Duration - val extraWait: Duration - - /** Delay by the guaranteed full deduplication period. After calling this method any duplicate calls should succeed - */ - def delayForEntireDeduplicationPeriod(): Future[Unit] = - delayBy(deduplicationDuration + extraWait) - - def delayBy(duration: Duration): Future[Unit] - } - - class TimeDelayMechanism( - val deduplicationDuration: Duration, - val extraWait: Duration, - )(implicit ec: ExecutionContext) - extends DelayMechanism { - override def delayBy(duration: Duration): Future[Unit] = Delayed.by(duration)(()) - } - - class StaticTimeDelayMechanism( - ledger: ParticipantTestContext, - val deduplicationDuration: Duration, - val extraWait: Duration, - )(implicit ec: ExecutionContext) - extends DelayMechanism { - override def delayBy(duration: Duration): Future[Unit] = - ledger - .time() - .flatMap { currentTime => - ledger.setTime(currentTime, currentTime.plusMillis(duration.toMillis)) - } - } - -} diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/participant/ParticipantTestContext.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/participant/ParticipantTestContext.scala index 8b73db1176e8..2fa27b914e3b 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/participant/ParticipantTestContext.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/participant/ParticipantTestContext.scala @@ -679,19 +679,20 @@ private[testtool] final class ParticipantTestContext private[participant] ( def findCompletion( request: CompletionStreamRequest - )(p: Completion => Boolean): Future[Option[(LedgerOffset, Completion)]] = + )(p: Completion => Boolean): Future[Option[CompletionResponse]] = new StreamConsumer[CompletionStreamResponse]( services.commandCompletion.completionStream(request, _) ).find(_.completions.exists(p)) - .map(response => - response.checkpoint - .flatMap(_.offset) - .flatMap(offset => response.completions.find(p).map(offset -> _)) - ) + .map(response => { + val checkpoint = response.getCheckpoint + response.completions + .find(p) + .map(CompletionResponse(_, checkpoint.getOffset, checkpoint.getRecordTime.asJava)) + }) def findCompletion(parties: Party*)( p: Completion => Boolean - ): Future[Option[(LedgerOffset, Completion)]] = + ): Future[Option[CompletionResponse]] = findCompletion(completionStreamRequest()(parties: _*))(p) def checkpoints(n: Int, request: CompletionStreamRequest): Future[Vector[Checkpoint]] = @@ -798,3 +799,5 @@ private[testtool] final class ParticipantTestContext private[participant] ( private def reservePartyNames(n: Int): Future[Vector[Party]] = Future.successful(Vector.fill(n)(Party(nextPartyHintId()))) } + +case class CompletionResponse(completion: Completion, offset: LedgerOffset, recordTime: Instant) diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationIT.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationIT.scala index c7a22391a918..c43315152065 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationIT.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationIT.scala @@ -3,35 +3,987 @@ package com.daml.ledger.api.testtool.suites -import com.daml.ledger.api.testtool.infrastructure.deduplication.CommandDeduplicationBase -import com.daml.ledger.api.testtool.infrastructure.deduplication.CommandDeduplicationBase.{ +import java.{time, util} + +import com.daml.error.ErrorCode +import com.daml.error.definitions.LedgerApiErrors +import com.daml.grpc.GrpcStatus +import com.daml.ledger.api.SubmissionIdGenerator +import com.daml.ledger.api.testtool.infrastructure.Allocation._ +import com.daml.ledger.api.testtool.infrastructure.Assertions._ +import com.daml.ledger.api.testtool.infrastructure.FutureAssertions._ +import com.daml.ledger.api.testtool.infrastructure.LedgerTestSuite +import com.daml.ledger.api.testtool.infrastructure.ProtobufConverters._ +import com.daml.ledger.api.testtool.infrastructure.participant.{ + CompletionResponse, + Features, + ParticipantTestContext, +} +import com.daml.ledger.api.testtool.suites.CommandDeduplicationIT.{ DelayMechanism, + StaticTimeDelayMechanism, TimeDelayMechanism, } -import com.daml.ledger.api.testtool.infrastructure.participant.ParticipantTestContext +import com.daml.ledger.api.v1.admin.config_management_service.TimeModel +import com.daml.ledger.api.v1.command_service.SubmitAndWaitRequest +import com.daml.ledger.api.v1.command_submission_service.SubmitRequest +import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod +import com.daml.ledger.api.v1.completion.Completion +import com.daml.ledger.api.v1.completion.Completion.{ + DeduplicationPeriod => CompletionDeduplicationPeriod +} +import com.daml.ledger.api.v1.ledger_offset.LedgerOffset +import com.daml.ledger.api.v1.version_service.DeduplicationPeriodSupport.OffsetSupport +import com.daml.ledger.client.binding.Primitive.Party +import com.daml.ledger.test.model.DA.Types.Tuple2 +import com.daml.ledger.test.model.Test.{Dummy, DummyWithAnnotation, TextKey, TextKeyOperations} +import com.daml.lf.data.Ref +import com.daml.lf.data.Ref.{LedgerString, SubmissionId} +import com.daml.logging.LoggingContext +import com.daml.timer.Delayed +import io.grpc.Status.Code +import org.slf4j.{Logger, LoggerFactory} -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} +import scala.util.control.NonFatal +import scala.util.{Failure, Success} -/** Command deduplication tests for participant side deduplication - * Should be disabled for ledgers that have committer side deduplication enabled (KV) - */ final class CommandDeduplicationIT( timeoutScaleFactor: Double, - ledgerTimeInterval: FiniteDuration, staticTime: Boolean, -) extends CommandDeduplicationBase( - timeoutScaleFactor, - ledgerTimeInterval, - staticTime, - ) { - - override def runWithDeduplicationDelay( - participants: Seq[ParticipantTestContext] +) extends LedgerTestSuite { + + private[this] val logger: Logger = LoggerFactory.getLogger(getClass.getName) + private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting + private val deduplicationDuration: FiniteDuration = scaledDuration(2.seconds) + + test( + s"SimpleDeduplicationBasic", + "Deduplicate commands within the deduplication duration window", + allocate(SingleParty), + )(implicit ec => { case Participants(Participant(ledger, party)) => + val request = ledger + .submitRequest(party, DummyWithAnnotation(party, "Duplicate command").create.command) + .update( + _.commands.deduplicationPeriod := + DeduplicationPeriod.DeduplicationDuration(deduplicationDuration.asProtobuf) + ) + val acceptedSubmissionId1 = newSubmissionId() + for { + // Submit command (first deduplication window) + // Note: the second submit() in this block is deduplicated and thus rejected by the ledger API server, + // only one submission is therefore sent to the ledger. + response <- submitRequestAndAssertCompletionAccepted( + ledger, + updateSubmissionId(request, acceptedSubmissionId1), + party, + ) + _ <- submitRequestAndAssertDeduplication( + ledger, + updateWithFreshSubmissionId(request), + acceptedSubmissionId1, + response.offset, + party, + ) + // Inspect created contracts + _ <- assertPartyHasActiveContracts( + ledger, + party, + noOfActiveContracts = 1, + ) + } yield { + assert( + response.completion.commandId == request.commands.get.commandId, + "The command ID of the first completion does not match the command ID of the submission", + ) + } + }) + + test( + s"StopOnSubmissionFailure", + "Stop deduplicating commands on submission failure", + allocate(TwoParties), + )(implicit ec => { case Participants(Participant(ledger, alice, bob)) => + // Do not set the deduplication timeout. + // The server will default to the maximum possible deduplication timeout. + val requestA = ledger.submitRequest(alice, Dummy(bob).create.command) + + for { + // Submit an invalid command (should fail with INVALID_ARGUMENT) + _ <- submitRequestAndAssertSyncFailure( + ledger, + requestA, + Code.INVALID_ARGUMENT, + LedgerApiErrors.CommandExecution.Interpreter.AuthorizationError, + ) + + // Re-submit the invalid command (should again fail with INVALID_ARGUMENT and not with ALREADY_EXISTS) + _ <- submitRequestAndAssertSyncFailure( + ledger, + updateWithFreshSubmissionId(requestA), + Code.INVALID_ARGUMENT, + LedgerApiErrors.CommandExecution.Interpreter.AuthorizationError, + ) + } yield {} + }) + + test( + s"StopOnCompletionFailure", + "Stop deduplicating commands on completion failure", + allocate(SingleParty), + )(implicit ec => { case Participants(Participant(ledger, party)) => + val key = ledger.nextKeyId() + + for { + // Create a helper and a text key + ko <- ledger.create(party, TextKeyOperations(party)) + _ <- ledger.create(party, TextKey(party, key, List())) + + // Create two competing requests + requestA = ledger.submitAndWaitRequest( + party, + ko.exerciseTKOFetchAndRecreate(party, Tuple2(party, key)).command, + ) + requestB = ledger.submitAndWaitRequest( + party, + ko.exerciseTKOFetchAndRecreate(party, Tuple2(party, key)).command, + ) + + // Submit both requests in parallel. + // Either both succeed (if one transaction is recorded faster than the other submission starts command interpretation, unlikely) + // Or one submission is rejected (if one transaction is recorded during the call of lookupMaximumLedgerTime() in [[LedgerTimeHelper]], unlikely) + // Or one transaction is rejected (this is what we want to test) + submissionResults <- Future.traverse(List(requestA, requestB))(request => + ledger.submitAndWait(request).transform(result => Success(request -> result)) + ) + + // Resubmit a failed command. + // No matter what the rejection reason was (hopefully it was a rejected transaction), + // a resubmission of exactly the same command should succeed. + _ <- submissionResults + .collectFirst { case (request, Failure(_)) => request } + .fold(Future.unit)(request => ledger.submitAndWait(updateWithFreshSubmissionId(request))) + } yield { + () + } + }) + + test( + s"SimpleDeduplicationCommandClient", + "Deduplicate commands within the deduplication time window using the command client", + allocate(SingleParty), + )(implicit ec => { case Participants(Participant(ledger, party)) => + val request = ledger + .submitAndWaitRequest(party, Dummy(party).create.command) + .update( + _.commands.deduplicationTime := deduplicationDuration.asProtobuf + ) + val acceptedSubmissionId1 = newSubmissionId() + for { + // Submit command (first deduplication window) + response <- submitAndWaitRequestAndAssertCompletionAccepted( + ledger, + updateSubmissionId(request, acceptedSubmissionId1), + party, + ) + _ <- submitAndWaitRequestAndAssertDeduplication( + ledger, + updateWithFreshSubmissionId(request), + acceptedSubmissionId1, + response.offset, + ) + + // Inspect created contract + _ <- assertPartyHasActiveContracts( + ledger, + party, + noOfActiveContracts = 1, + ) + } yield {} + }) + + // staticTime - we run calls in parallel and with static time we would need to advance the time, + // therefore this cannot be run in static time + testGivenAllParticipants( + "DeduplicationMixedClients", + "Deduplicate commands within the deduplication time window using the command client and the command submission client", + allocate(Parties(16)), + enabled = _ => !staticTime, + disabledReason = "Cannot work in static time as we run multiple test cases in parallel", + runConcurrently = false, // updates the time model + timeoutScale = 3, + )(implicit ec => + configuredParticipants => { case Participants(Participant(ledger, parties @ _*)) => + def generateVariations(elements: List[List[Boolean]]): List[List[Boolean]] = + elements match { + case Nil => List(Nil) + case currentElement :: tail => + currentElement.flatMap(value => generateVariations(tail).map(value :: _)) + } + + runWithTimeModel(configuredParticipants) { delay => + val numberOfCalls = 4 + // cover all the different generated variations of submit and submitAndWait + val allGeneratedVariations = + generateVariations(List.fill(numberOfCalls)(List(true, false))).zip(parties) + forAllParallel(allGeneratedVariations) { + case (firstCall :: secondCall :: thirdCall :: fourthCall :: Nil, party) => + mixedClientsCommandDeduplicationTestCase(ledger, party, delay)( + firstCall, + secondCall, + thirdCall, + fourthCall, + ) + case _ => throw new IllegalArgumentException("Wrong call list constructed") + } + .map(_ => ()) + } + } + ) + + private def mixedClientsCommandDeduplicationTestCase( + ledger: ParticipantTestContext, + party: Party, + delay: DelayMechanism, + )(firstCall: Boolean, secondCall: Boolean, thirdCall: Boolean, fourthCall: Boolean)(implicit + ec: ExecutionContext + ) = { + val submitAndWaitRequest = ledger + .submitAndWaitRequest(party, Dummy(party).create.command) + .update( + _.commands.deduplicationTime := deduplicationDuration.asProtobuf + ) + val submitRequest = ledger + .submitRequest(party, Dummy(party).create.command) + .update( + _.commands.commandId := submitAndWaitRequest.getCommands.commandId, + _.commands.deduplicationTime := deduplicationDuration.asProtobuf, + ) + + def submitAndAssertAccepted( + submitAndWait: Boolean + ): Future[CompletionResponse] = { + val acceptedSubmissionId: SubmissionId = newSubmissionId() + if (submitAndWait) + submitAndWaitRequestAndAssertCompletionAccepted( + ledger, + updateSubmissionId(submitAndWaitRequest, acceptedSubmissionId), + party, + ) + else + submitRequestAndAssertCompletionAccepted( + ledger, + updateSubmissionId(submitRequest, acceptedSubmissionId), + party, + ) + } + + def submitAndAssertDeduplicated( + submitAndWait: Boolean, + acceptedSubmissionId: SubmissionId, + acceptedLedgerOffset: LedgerOffset, + ): Future[Option[Completion]] = + if (submitAndWait) + submitAndWaitRequestAndAssertDeduplication( + ledger, + updateWithFreshSubmissionId(submitAndWaitRequest), + acceptedSubmissionId, + acceptedLedgerOffset, + ).map(_ => None) + else + submitRequestAndAssertDeduplication( + ledger, + updateWithFreshSubmissionId(submitRequest), + acceptedSubmissionId, + acceptedLedgerOffset, + party, + ) + + for { + // Submit command (first deduplication window) + firstAcceptedCommand <- submitAndAssertAccepted(firstCall) + duplicateResponse <- submitAndAssertDeduplicated( + secondCall, + LedgerString.assertFromString(firstAcceptedCommand.completion.submissionId), + firstAcceptedCommand.offset, + ) + deduplicationDurationFromPeriod = duplicateResponse + .map(_.deduplicationPeriod) + .map { + case CompletionDeduplicationPeriod.Empty => + throw new IllegalStateException("received empty completion") + case CompletionDeduplicationPeriod.DeduplicationOffset(_) => + deduplicationDuration + case CompletionDeduplicationPeriod.DeduplicationDuration(value) => + value.asScala + } + .getOrElse(deduplicationDuration + delay.skews) + .asInstanceOf[FiniteDuration] + eventuallyAccepted <- succeedsEventually( + maxRetryDuration = deduplicationDurationFromPeriod + delay.skews + 10.seconds, + description = + s"Deduplication period expires and request is accepted for command ${submitRequest.getCommands}.", + ) { + submitAndAssertAccepted(thirdCall) + } + _ = if ( // participant deduplication is based on submittedAt, and thus the delta between record times can actually be smaller than the deduplication duration + !ledger.features.commandDeduplicationFeatures.participantDeduplicationSupport.isParticipantDeduplicationSupported + ) + assert( + time.Duration + .between(firstAcceptedCommand.recordTime, eventuallyAccepted.recordTime) + .toMillis > deduplicationDuration.toMillis, + s"Interval between accepted commands is smaller than the deduplication duration. First accepted command record time: ${firstAcceptedCommand.recordTime}. Second accepted command record time: ${eventuallyAccepted.recordTime}", + ) + _ <- submitAndAssertDeduplicated( + fourthCall, + LedgerString.assertFromString(eventuallyAccepted.completion.submissionId), + eventuallyAccepted.offset, + ) + _ <- assertPartyHasActiveContracts( + ledger, + party = party, + noOfActiveContracts = 2, + ) + } yield {} + } + + test( + "DeduplicateSubmitterBasic", + "Commands with identical submitter and command identifier should be deduplicated by the submission client", + allocate(TwoParties), + )(implicit ec => { case Participants(Participant(ledger, alice, bob)) => + val aliceRequest = ledger.submitRequest(alice, Dummy(alice).create.command) + val bobRequest = ledger + .submitRequest(bob, Dummy(bob).create.command) + .update(_.commands.commandId := aliceRequest.getCommands.commandId) + + val aliceAcceptedSubmissionId = newSubmissionId() + val bobAcceptedSubmissionId = newSubmissionId() + + for { + // Submit a command as alice + aliceResponse <- submitRequestAndAssertCompletionAccepted( + ledger, + updateSubmissionId(aliceRequest, aliceAcceptedSubmissionId), + alice, + ) + _ <- submitRequestAndAssertDeduplication( + ledger, + updateWithFreshSubmissionId(aliceRequest), + aliceAcceptedSubmissionId, + aliceResponse.offset, + alice, + ) + + // Submit another command that uses same commandId, but is submitted by Bob + bobResponse <- submitRequestAndAssertCompletionAccepted( + ledger, + updateSubmissionId(bobRequest, bobAcceptedSubmissionId), + bob, + ) + _ <- submitRequestAndAssertDeduplication( + ledger, + updateWithFreshSubmissionId(bobRequest), + bobAcceptedSubmissionId, + bobResponse.offset, + bob, + ) + _ <- assertPartyHasActiveContracts( + ledger, + party = alice, + noOfActiveContracts = 1, + ) + _ <- assertPartyHasActiveContracts( + ledger, + party = bob, + noOfActiveContracts = 1, + ) + } yield {} + }) + + test( + "DeduplicateSubmitterCommandClient", + "Commands with identical submitter and command identifier should be deduplicated by the command client", + allocate(TwoParties), + )(implicit ec => { case Participants(Participant(ledger, alice, bob)) => + val aliceRequest = ledger.submitAndWaitRequest(alice, Dummy(alice).create.command) + val bobRequest = ledger + .submitAndWaitRequest(bob, Dummy(bob).create.command) + .update(_.commands.commandId := aliceRequest.getCommands.commandId) + + val aliceAcceptedSubmissionId = newSubmissionId() + val bobAcceptedSubmissionId = newSubmissionId() + for { + // Submit a command as alice + aliceResponse <- submitAndWaitRequestAndAssertCompletionAccepted( + ledger, + updateSubmissionId(aliceRequest, aliceAcceptedSubmissionId), + alice, + ) + _ <- submitAndWaitRequestAndAssertDeduplication( + ledger, + updateWithFreshSubmissionId(aliceRequest), + aliceAcceptedSubmissionId, + aliceResponse.offset, + ) + + // Submit another command that uses same commandId, but is submitted by Bob + bobReponse <- submitAndWaitRequestAndAssertCompletionAccepted( + ledger, + updateSubmissionId(bobRequest, bobAcceptedSubmissionId), + bob, + ) + _ <- submitAndWaitRequestAndAssertDeduplication( + ledger, + updateWithFreshSubmissionId(bobRequest), + bobAcceptedSubmissionId, + bobReponse.offset, + ) + // Inspect the ledger state + _ <- assertPartyHasActiveContracts( + ledger, + party = alice, + noOfActiveContracts = 1, + ) + _ <- assertPartyHasActiveContracts( + ledger, + party = bob, + noOfActiveContracts = 1, + ) + } yield {} + }) + + testGivenAllParticipants( + "DeduplicateUsingOffsets", + "Deduplicate commands within the deduplication period defined by the offset", + allocate(SingleParty), + enabled = + !_.commandDeduplicationFeatures.getDeduplicationPeriodSupport.offsetSupport.isOffsetNotSupported, + disabledReason = "Deduplication periods represented by offsets are not supported", + runConcurrently = false, // updates the time model + )(implicit ec => + configuredParticipants => { case Participants(Participant(ledger, party)) => + val request = ledger + .submitRequest(party, DummyWithAnnotation(party, "Duplicate command").create.command) + val acceptedSubmissionId = newSubmissionId() + runWithTimeModel(configuredParticipants) { delay => + for { + response <- submitRequestAndAssertCompletionAccepted( + ledger, + updateSubmissionId(request, acceptedSubmissionId), + party, + ) + // Wait for any ledgers that might adjust based on time skews + // This is done so that we can validate that the third command is accepted + _ <- delayForOffsetIfRequired(ledger, delay, ledger.features) + // Submit command again using the first offset as the deduplication offset + _ <- submitRequestAndAssertAsyncDeduplication( + ledger, + updateWithFreshSubmissionId( + request.update( + _.commands.deduplicationPeriod := DeduplicationPeriod.DeduplicationOffset( + Ref.HexString.assertFromString(response.offset.getAbsolute) + ) + ) + ), + acceptedSubmissionId, + response.offset, + party, + ) + response3 <- submitRequestAndAssertCompletionAccepted( + ledger, + ledger.submitRequest(party, Dummy(party).create.command), + party, + ) + // Submit command again using the rejection offset as a deduplication period + _ <- submitRequestAndAssertCompletionAccepted( + ledger, + updateWithFreshSubmissionId( + request.update( + _.commands.deduplicationPeriod := DeduplicationPeriod.DeduplicationOffset( + Ref.HexString.assertFromString(response3.offset.getAbsolute) + ) + ) + ), + party, + ) + } yield {} + } + } + ) + + private def delayForOffsetIfRequired( + participantTestContext: ParticipantTestContext, + delayMechanism: DelayMechanism, + features: Features, + )(implicit ec: ExecutionContext): Future[Unit] = + features.commandDeduplicationFeatures.getDeduplicationPeriodSupport.offsetSupport match { + case OffsetSupport.OFFSET_NATIVE_SUPPORT => + Future.unit + case OffsetSupport.OFFSET_CONVERT_TO_DURATION => + // the converted duration is calculated as the interval between submission time + // and offset record time + minSkew (used to determine maxRecordTime) + // + // the duration is extended with up to minSkew + maxSkew when using pre-execution, + // as we use maxRecordTime and minRecordTime to calculate the interval between the two commands + participantTestContext + .getTimeModel() + .flatMap(response => { + delayMechanism.delayBy( + response.getTimeModel.getMaxSkew.asScala + + 2 * response.getTimeModel.getMinSkew.asScala + ) + }) + case OffsetSupport.Unrecognized(_) | OffsetSupport.OFFSET_NOT_SUPPORTED => + Future.unit + } + + protected def assertPartyHasActiveContracts( + ledger: ParticipantTestContext, + party: Party, + noOfActiveContracts: Int, + )(implicit ec: ExecutionContext): Future[Unit] = { + ledger + .activeContracts(party) + .map(contracts => + assert( + contracts.length == noOfActiveContracts, + s"Expected $noOfActiveContracts active contracts for $party but found ${contracts.length} active contracts", + ) + ) + } + + protected def submitRequestAndAssertCompletionAccepted( + ledger: ParticipantTestContext, + request: SubmitRequest, + parties: Party* + )(implicit + ec: ExecutionContext + ): Future[CompletionResponse] = + submitRequestAndAssertCompletion(ledger, request, parties: _*) { completion => + assertCompletionStatus(request.toString, completion, Code.OK) + } + + protected def submitAndWaitRequestAndAssertCompletionAccepted( + ledger: ParticipantTestContext, + request: SubmitAndWaitRequest, + parties: Party* + )(implicit + ec: ExecutionContext + ): Future[CompletionResponse] = + submitAndWaitRequestAndAssertCompletion(ledger, request, parties: _*) { completion => + assertCompletionStatus(request.toString, completion, Code.OK) + } + + protected def submitRequestAndAssertDeduplication( + ledger: ParticipantTestContext, + request: SubmitRequest, + acceptedSubmissionId: SubmissionId, + acceptedOffset: LedgerOffset, + parties: Party* + )(implicit + ec: ExecutionContext + ): Future[Option[Completion]] = + if ( + ledger.features.commandDeduplicationFeatures.participantDeduplicationSupport.isParticipantDeduplicationSupported + ) + submitRequestAndAssertSyncDeduplication(ledger, request, acceptedSubmissionId, acceptedOffset) + .map(_ => None) + else + submitRequestAndAssertAsyncDeduplication( + ledger, + request, + acceptedSubmissionId, + acceptedOffset, + parties: _* + ).map(response => Some(response.completion)) + + protected def submitRequestAndAssertSyncDeduplication( + ledger: ParticipantTestContext, + request: SubmitRequest, + acceptedSubmissionId: SubmissionId, + acceptedOffset: LedgerOffset, + )(implicit ec: ExecutionContext): Future[Unit] = + submitRequestAndAssertSyncFailure( + ledger, + request, + Code.ALREADY_EXISTS, + LedgerApiErrors.ConsistencyErrors.DuplicateCommand, + assertDeduplicatedSubmissionIdAndOffsetOnError( + acceptedSubmissionId, + acceptedOffset, + _, + ), + ) + + private def submitRequestAndAssertSyncFailure( + ledger: ParticipantTestContext, + request: SubmitRequest, + code: Code, + selfServiceErrorCode: ErrorCode, + additionalErrorAssertions: Throwable => Unit = _ => (), + )(implicit ec: ExecutionContext): Future[Unit] = + ledger + .submit(request) + .mustFail(s"Request expected to fail with code $code") + .map( + assertGrpcError( + ledger, + _, + code, + selfServiceErrorCode, + exceptionMessageSubstring = None, + checkDefiniteAnswerMetadata = true, + additionalErrorAssertions, + ) + ) + + protected def submitAndWaitRequestAndAssertDeduplication( + ledger: ParticipantTestContext, + request: SubmitAndWaitRequest, + acceptedSubmissionId: SubmissionId, + acceptedOffset: LedgerOffset, + )(implicit ec: ExecutionContext): Future[Unit] = + ledger + .submitAndWaitForTransaction(request) + .mustFail("Request was accepted but we were expecting it to fail with a duplicate error") + .map( + assertGrpcError( + ledger, + _, + expectedCode = Code.ALREADY_EXISTS, + selfServiceErrorCode = LedgerApiErrors.ConsistencyErrors.DuplicateCommand, + exceptionMessageSubstring = None, + checkDefiniteAnswerMetadata = true, + assertDeduplicatedSubmissionIdAndOffsetOnError( + acceptedSubmissionId, + acceptedOffset, + _, + ), + ) + ) + + protected def submitRequestAndAssertAsyncDeduplication( + ledger: ParticipantTestContext, + request: SubmitRequest, + acceptedSubmissionId: SubmissionId, + acceptedOffset: LedgerOffset, + parties: Party* + )(implicit ec: ExecutionContext): Future[CompletionResponse] = + submitRequestAndAssertCompletion( + ledger, + request, + parties: _* + ) { completion => + assertCompletionStatus(request.toString, completion, Code.ALREADY_EXISTS) + assertDeduplicatedSubmissionIdAndOffsetOnCompletion( + acceptedSubmissionId, + acceptedOffset, + completion, + ) + } + + private def assertCompletionStatus( + requestString: String, + response: CompletionResponse, + statusCode: Code, + ): Unit = + assert( + response.completion.getStatus.code == statusCode.value(), + s"""Expecting completion with status code $statusCode but completion has status ${response.completion.status}. + |Request: $requestString + |Response: $response + |Metadata: ${extractErrorInfoMetadata( + GrpcStatus.toJavaProto(response.completion.getStatus) + )}""".stripMargin, + ) + + private def assertDeduplicatedSubmissionIdAndOffsetOnError( + acceptedSubmissionId: SubmissionId, + acceptedCompletionOffset: LedgerOffset, + t: Throwable, + ): Unit = t match { + case exception: Exception => + val metadata = extractErrorInfoMetadata(exception) + assertExistingSubmissionIdOnMetadata(metadata, acceptedSubmissionId) + assertExistingCompletionOffsetOnMetadata(metadata, acceptedCompletionOffset) + case _ => () + } + + private def assertDeduplicatedSubmissionIdAndOffsetOnCompletion( + acceptedSubmissionId: SubmissionId, + acceptedCompletionOffset: LedgerOffset, + response: CompletionResponse, + ): Unit = { + val metadata = extractErrorInfoMetadata( + GrpcStatus.toJavaProto(response.completion.getStatus) + ) + assertExistingSubmissionIdOnMetadata(metadata, acceptedSubmissionId) + assertExistingCompletionOffsetOnMetadata(metadata, acceptedCompletionOffset) + } + + private def assertExistingSubmissionIdOnMetadata( + metadata: util.Map[String, String], + acceptedSubmissionId: SubmissionId, + ): Unit = + Option(metadata.get("existing_submission_id")).foreach { metadataExistingSubmissionId => + assertEquals( + "submission ID mismatch", + metadataExistingSubmissionId, + acceptedSubmissionId, + ) + } + + private def assertExistingCompletionOffsetOnMetadata( + metadata: util.Map[String, String], + acceptedCompletionOffset: LedgerOffset, + ): Unit = + Option(metadata.get("completion_offset")).foreach { offset => + assertEquals( + "completion offset mismatch", + absoluteLedgerOffset(offset), + acceptedCompletionOffset, + ) + } + + private def submitRequestAndAssertCompletion( + ledger: ParticipantTestContext, + request: SubmitRequest, + parties: Party* )( + additionalCompletionAssertion: CompletionResponse => Unit + )(implicit + ec: ExecutionContext + ): Future[CompletionResponse] = + submitRequestAndFindCompletion(ledger, request, parties: _*).map { response => + additionalCompletionAssertion(response) + response + } + + private def submitAndWaitRequestAndAssertCompletion( + ledger: ParticipantTestContext, + request: SubmitAndWaitRequest, + parties: Party* + )( + additionalCompletionAssertion: CompletionResponse => Unit + )(implicit + ec: ExecutionContext + ): Future[CompletionResponse] = + submitRequestAndFindCompletion(ledger, request, parties: _*).map { response => + additionalCompletionAssertion(response) + response + } + + protected def submitRequestAndFindCompletion( + ledger: ParticipantTestContext, + request: SubmitRequest, + parties: Party* + )(implicit + ec: ExecutionContext + ): Future[CompletionResponse] = + submitRequest(ledger)(request) + .flatMap(ledgerEnd => { + ledger + .findCompletion(ledger.completionStreamRequest(ledgerEnd)(parties: _*)) { completion => + request.commands.map(_.submissionId).contains(completion.submissionId) + } + .map(_.toList) + }) + .map { completions => + assertSingleton("Expected only one completion", completions) + } + + protected def submitRequestAndFindCompletion( + ledger: ParticipantTestContext, + request: SubmitAndWaitRequest, + parties: Party* + )(implicit + ec: ExecutionContext + ): Future[CompletionResponse] = + ledger + .submitAndWait(request) + .flatMap { _ => + ledger + .findCompletion(ledger.completionStreamRequest()(parties: _*)) { completion => + request.commands.map(_.submissionId).contains(completion.submissionId) + } + .map(_.toList) + } + .map { completions => + assert( + completions.head.offset.getAbsolute.nonEmpty, + "Expected a populated completion offset", + ) + assertSingleton("Expected only one completion", completions) + } + + protected def submitRequest( + ledger: ParticipantTestContext + )( + request: SubmitRequest + )(implicit ec: ExecutionContext): Future[LedgerOffset] = for { + ledgerEnd <- ledger.currentEnd() + _ <- ledger.submit(request) + } yield { + ledgerEnd + } + + protected def scaledDuration(duration: FiniteDuration): FiniteDuration = asFiniteDuration( + duration * timeoutScaleFactor + ) + + protected def asFiniteDuration(duration: Duration): FiniteDuration = duration match { + case duration: FiniteDuration => duration + case _ => + throw new IllegalArgumentException(s"Invalid timeout scale factor: $timeoutScaleFactor") + } + + private def absoluteLedgerOffset(value: String) = + LedgerOffset(LedgerOffset.Value.Absolute(value)) + + private def updateSubmissionId( + request: SubmitRequest, + submissionId: SubmissionId, + ): SubmitRequest = + request.update(_.commands.submissionId := submissionId) + + private def updateSubmissionId( + request: SubmitAndWaitRequest, + acceptedSubmissionId1: SubmissionId, + ): SubmitAndWaitRequest = + request.update(_.commands.submissionId := acceptedSubmissionId1) + + private def updateWithFreshSubmissionId(request: SubmitRequest): SubmitRequest = + request.update(_.commands.submissionId := newSubmissionId()) + + private def updateWithFreshSubmissionId(request: SubmitAndWaitRequest): SubmitAndWaitRequest = + request.update(_.commands.submissionId := newSubmissionId()) + + private def newSubmissionId(): SubmissionId = SubmissionIdGenerator.Random.generate() + + private def runWithTimeModel(participants: Seq[ParticipantTestContext])( testWithDelayMechanism: DelayMechanism => Future[Unit] - )(implicit ec: ExecutionContext): Future[Unit] = - testWithDelayMechanism(new TimeDelayMechanism(deduplicationDuration, ledgerWaitInterval)) + )(implicit ec: ExecutionContext): Future[Unit] = { + // deduplication duration is adjusted by min skew and max skew when running using pre-execution + // to account for this we adjust the time model + val skew = scaledDuration(3.second).asProtobuf + runWithUpdatedTimeModel( + participants, + _.update(_.minSkew := skew, _.maxSkew := skew), + ) { timeModel => + val anyParticipant = participants.head + val skews = asFiniteDuration(timeModel.getMinSkew.asScala + timeModel.getMaxSkew.asScala) + testWithDelayMechanism(delayMechanism(anyParticipant, skews)) + } + } + + private def delayMechanism(ledger: ParticipantTestContext, skews: FiniteDuration)(implicit + ec: ExecutionContext + ) = { + if (staticTime) { + new StaticTimeDelayMechanism(ledger, skews) + } else { + new TimeDelayMechanism(skews) + } + } + + private def runWithUpdatedTimeModel( + participants: Seq[ParticipantTestContext], + timeModelUpdate: TimeModel => TimeModel, + )(test: TimeModel => Future[Unit])(implicit ec: ExecutionContext): Future[Unit] = { + val anyParticipant = participants.head + anyParticipant + .getTimeModel() + .flatMap(timeModel => { + def restoreTimeModel(participant: ParticipantTestContext) = { + val ledgerTimeModelRestoreResult = for { + time <- participant.time() + // retrieve current configuration generation, which can be updated by the test + currentConfigurationGeneration <- participant + .getTimeModel() + .map(_.configurationGeneration) + _ <- participant + .setTimeModel( + time.plusSeconds(1), + currentConfigurationGeneration, + timeModel.getTimeModel, + ) + } yield {} + ledgerTimeModelRestoreResult.recover { case NonFatal(exception) => + logger.warn("Failed to restore time model for ledger", exception) + () + } + } + + for { + time <- anyParticipant.time() + updatedModel = timeModelUpdate(timeModel.getTimeModel) + (timeModelForTest, participantThatDidTheUpdate) <- tryTimeModelUpdateOnAllParticipants( + participants, + _.setTimeModel( + time.plusSeconds(1), + timeModel.configurationGeneration, + updatedModel, + ).map(_ => updatedModel), + ).recover { case NonFatal(exception) => + logger.warn( + "Failed to update time model for test. WIll run test with already configured time model", + exception, + ) + timeModel.getTimeModel -> anyParticipant + } + _ <- test(timeModelForTest) + .transformWith(testResult => + restoreTimeModel(participantThatDidTheUpdate).transform(_ => testResult) + ) + } yield {} + }) + } + + /** Try to run the update sequentially on all the participants. + * The function returns the first success or the last failure of the update operation. + * Useful for updating the configuration when we don't know which participant can update the config, + * as only the first one that submitted the initial configuration has the permissions to do so. + */ + private def tryTimeModelUpdateOnAllParticipants( + participants: Seq[ParticipantTestContext], + timeModelUpdate: ParticipantTestContext => Future[TimeModel], + )(implicit ec: ExecutionContext): Future[(TimeModel, ParticipantTestContext)] = { + participants.foldLeft( + Future.failed[(TimeModel, ParticipantTestContext)]( + new IllegalStateException("No participant") + ) + ) { (result, participant) => + result.recoverWith { case NonFatal(_) => + timeModelUpdate(participant).map(_ -> participant) + } + } + } +} + +object CommandDeduplicationIT { + + trait DelayMechanism { + val skews: FiniteDuration + def delayBy(duration: Duration): Future[Unit] + + } + + class TimeDelayMechanism(val skews: FiniteDuration)(implicit ec: ExecutionContext) + extends DelayMechanism { + override def delayBy(duration: Duration): Future[Unit] = Delayed.by(duration)(()) + } - override def testNamingPrefix: String = "ParticipantCommandDeduplication" + class StaticTimeDelayMechanism(ledger: ParticipantTestContext, val skews: FiniteDuration)(implicit + ec: ExecutionContext + ) extends DelayMechanism { + override def delayBy(duration: Duration): Future[Unit] = + ledger + .time() + .flatMap { currentTime => + ledger.setTime(currentTime, currentTime.plusMillis(duration.toMillis)) + } + } } diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationParallelIT.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationParallelIT.scala index eb2acc6c445a..29df8cc9615c 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationParallelIT.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationParallelIT.scala @@ -39,7 +39,7 @@ class CommandDeduplicationParallelIT extends LedgerTestSuite { private val numberOfParallelRequests = 10 test( - s"DeduplicateParallelSubmissionsUsingCommandSubmissionService", + "DeduplicateParallelSubmissionsUsingCommandSubmissionService", "Commands submitted at the same, in parallel, using the CommandSubmissionService, should be deduplicated", allocate(SingleParty), )(implicit ec => { case Participants(Participant(ledger, party)) => @@ -52,7 +52,7 @@ class CommandDeduplicationParallelIT extends LedgerTestSuite { }) test( - s"DeduplicateParallelSubmissionsUsingCommandService", + "DeduplicateParallelSubmissionsUsingCommandService", "Commands submitted at the same, in parallel, using the CommandService, should be deduplicated", allocate(SingleParty), runConcurrently = false, @@ -66,7 +66,7 @@ class CommandDeduplicationParallelIT extends LedgerTestSuite { }) test( - s"DeduplicateParallelSubmissionsUsingMixedCommandServiceAndCommandSubmissionService", + "DeduplicateParallelSubmissionsUsingMixedCommandServiceAndCommandSubmissionService", "Commands submitted at the same, in parallel, using the CommandService and the CommandSubmissionService, should be deduplicated", allocate(SingleParty), runConcurrently = false, @@ -182,8 +182,8 @@ class CommandDeduplicationParallelIT extends LedgerTestSuite { completion.submissionId == submissionId }) .map { - case Some((_, completion)) => - Status.fromCodeValue(completion.getStatus.code).getCode + case Some(response) => + Status.fromCodeValue(response.completion.getStatus.code).getCode case None => fail(s"Did not find completion for request with submission id $submissionId") } diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CompletionDeduplicationInfoIT.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CompletionDeduplicationInfoIT.scala index 0fb78073cca7..34cbade59060 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CompletionDeduplicationInfoIT.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CompletionDeduplicationInfoIT.scala @@ -148,7 +148,7 @@ private[testtool] object CompletionDeduplicationInfoIT { WithTimeout(5.seconds)( ledger .findCompletion(ledger.completionStreamRequest(offset)(party))(_ => true) - .map(_.map(_._2)) + .map(_.map(_.completion)) ) private def assertSubmissionIdIsPreserved( diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/KVCommandDeduplicationIT.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/KVCommandDeduplicationIT.scala deleted file mode 100644 index f3b02f46d7d8..000000000000 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/KVCommandDeduplicationIT.scala +++ /dev/null @@ -1,140 +0,0 @@ -// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.daml.ledger.api.testtool.suites - -import com.daml.ledger.api.testtool.infrastructure.ProtobufConverters._ -import com.daml.ledger.api.testtool.infrastructure.deduplication.CommandDeduplicationBase -import com.daml.ledger.api.testtool.infrastructure.deduplication.CommandDeduplicationBase.{ - DelayMechanism, - StaticTimeDelayMechanism, - TimeDelayMechanism, -} -import com.daml.ledger.api.testtool.infrastructure.participant.ParticipantTestContext -import com.daml.ledger.api.v1.admin.config_management_service.TimeModel -import org.slf4j.LoggerFactory - -import scala.concurrent.duration._ -import scala.concurrent.{ExecutionContext, Future} -import scala.util.control.NonFatal - -/** Command deduplication tests for KV ledgers - * KV ledgers have committer side deduplication. - * The committer side deduplication period adds `minSkew` to the deduplication period, so we have to account for that as well. - * If updating the time model fails then the tests will assume a `minSkew` of 1 second. - */ -class KVCommandDeduplicationIT( - timeoutScaleFactor: Double, - ledgerTimeInterval: FiniteDuration, - staticTime: Boolean, -) extends CommandDeduplicationBase( - timeoutScaleFactor, - ledgerTimeInterval, - staticTime, - ) { - - private[this] val logger = LoggerFactory.getLogger(getClass.getName) - - override def testNamingPrefix: String = "KVCommandDeduplication" - - protected override def runWithDeduplicationDelay( - participants: Seq[ParticipantTestContext] - )( - testWithDelayMechanism: DelayMechanism => Future[Unit] - )(implicit ec: ExecutionContext): Future[Unit] = { - runWithConfig(participants) { extraWait => - val anyParticipant = participants.head - testWithDelayMechanism(delayMechanism(anyParticipant, extraWait)) - } - } - - private def delayMechanism( - ledger: ParticipantTestContext, - extraWait: Duration, - )(implicit ec: ExecutionContext) = { - if (staticTime) { - new StaticTimeDelayMechanism(ledger, deduplicationDuration, extraWait) - } else { - new TimeDelayMechanism(deduplicationDuration, extraWait) - } - } - - private def runWithConfig( - participants: Seq[ParticipantTestContext] - )(test: FiniteDuration => Future[Unit])(implicit ec: ExecutionContext): Future[Unit] = { - // deduplication duration is adjusted by min skew and max skew when running using pre-execution - // to account for this we adjust the time model - val skew = scaledDuration(3.second).asProtobuf - runWithUpdatedTimeModel( - participants, - _.update(_.minSkew := skew, _.maxSkew := skew), - )(timeModel => - test( - asFiniteDuration(timeModel.getMinSkew.asScala + timeModel.getMaxSkew.asScala) - ) - ) - } - - private def runWithUpdatedTimeModel( - participants: Seq[ParticipantTestContext], - timeModelUpdate: TimeModel => TimeModel, - )(test: TimeModel => Future[Unit])(implicit ec: ExecutionContext): Future[Unit] = { - val anyParticipant = participants.head - anyParticipant - .getTimeModel() - .flatMap(timeModel => { - def restoreTimeModel(participant: ParticipantTestContext) = { - val ledgerTimeModelRestoreResult = for { - time <- participant.time() - _ <- participant - .setTimeModel( - time.plusSeconds(1), - timeModel.configurationGeneration + 1, - timeModel.getTimeModel, - ) - } yield {} - ledgerTimeModelRestoreResult.recover { case NonFatal(exception) => - logger.warn("Failed to restore time model for ledger", exception) - () - } - } - - for { - time <- anyParticipant.time() - updatedModel = timeModelUpdate(timeModel.getTimeModel) - (timeModelForTest, participantThatDidTheUpdate) <- tryTimeModelUpdateOnAllParticipants( - participants, - _.setTimeModel( - time.plusSeconds(1), - timeModel.configurationGeneration, - updatedModel, - ).map(_ => updatedModel), - ) - _ <- test(timeModelForTest) - .transformWith(testResult => - restoreTimeModel(participantThatDidTheUpdate).transform(_ => testResult) - ) - } yield {} - }) - } - - /** Try to run the update sequentially on all the participants. - * The function returns the first success or the last failure of the update operation. - * Useful for updating the configuration when we don't know which participant can update the config, - * as only the first one that submitted the initial configuration has the permissions to do so. - */ - private def tryTimeModelUpdateOnAllParticipants( - participants: Seq[ParticipantTestContext], - timeModelUpdate: ParticipantTestContext => Future[TimeModel], - )(implicit ec: ExecutionContext): Future[(TimeModel, ParticipantTestContext)] = { - participants.foldLeft( - Future.failed[(TimeModel, ParticipantTestContext)]( - new IllegalStateException("No participant") - ) - ) { (result, participant) => - result.recoverWith { case NonFatal(_) => - timeModelUpdate(participant).map(_ -> participant) - } - } - } -} diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/tests/Tests.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/tests/Tests.scala index 4266a9e5d8c0..09946c06759d 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/tests/Tests.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/tests/Tests.scala @@ -3,6 +3,8 @@ package com.daml.ledger.api.testtool.tests +import java.nio.file.Path + import com.daml.ledger.api.testtool.infrastructure.{BenchmarkReporter, Envelope, LedgerTestSuite} import com.daml.ledger.api.testtool.suites.CompletionDeduplicationInfoIT.{ CommandService, @@ -12,9 +14,7 @@ import com.daml.ledger.api.testtool.suites._ import com.daml.ledger.test.TestDar import com.daml.lf.language.LanguageVersion -import java.nio.file.Path import scala.collection.SortedSet -import scala.concurrent.duration.FiniteDuration object Tests { @@ -25,13 +25,12 @@ object Tests { def default( timeoutScaleFactor: Double = Defaults.TimeoutScaleFactor, - ledgerClockGranularity: FiniteDuration = Defaults.LedgerClockGranularity, staticTime: Boolean = Defaults.StaticTime, ): Vector[LedgerTestSuite] = Vector( new ActiveContractsServiceIT, new ClosedWorldIT, - new CommandDeduplicationIT(timeoutScaleFactor, ledgerClockGranularity, staticTime), + new CommandDeduplicationIT(timeoutScaleFactor, staticTime), new CommandServiceIT, new CommandSubmissionCompletionIT, new ConfigManagementServiceIT, @@ -63,18 +62,13 @@ object Tests { ) ++ (if (supportsExceptions) Vector(new ExceptionsIT, new ExceptionRaceConditionIT) else Vector.empty) - def optional( - timeoutScaleFactor: Double = Defaults.TimeoutScaleFactor, - ledgerClockGranularity: FiniteDuration = Defaults.LedgerClockGranularity, - staticTime: Boolean = Defaults.StaticTime, - ): Vector[LedgerTestSuite] = + def optional(): Vector[LedgerTestSuite] = Vector( new CompletionDeduplicationInfoIT(CommandService), new CompletionDeduplicationInfoIT(CommandSubmissionService), new CommandDeduplicationParallelIT, new CommandDeduplicationPeriodValidationIT, new ContractIdIT, - new KVCommandDeduplicationIT(timeoutScaleFactor, ledgerClockGranularity, staticTime), new MultiPartySubmissionIT, new ParticipantPruningIT, new MonotonicRecordTimeIT, diff --git a/ledger/ledger-on-memory/BUILD.bazel b/ledger/ledger-on-memory/BUILD.bazel index 59ab7246df40..bb69ec79b1cd 100644 --- a/ledger/ledger-on-memory/BUILD.bazel +++ b/ledger/ledger-on-memory/BUILD.bazel @@ -156,8 +156,6 @@ conformance_test( "--additional=CommandDeduplicationPeriodValidationIT", "--additional=CompletionDeduplicationInfoITCommandService", "--additional=CompletionDeduplicationInfoITCommandSubmissionService", - "--additional=KVCommandDeduplicationIT", - "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant # Disable tests targeting only multi-participant setups "--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence", ], @@ -184,8 +182,6 @@ conformance_test( "--additional=CommandDeduplicationPeriodValidationIT", "--additional=CompletionDeduplicationInfoITCommandService", "--additional=CompletionDeduplicationInfoITCommandSubmissionService", - "--additional=KVCommandDeduplicationIT", - "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant # Disable tests targeting only multi-participant setups "--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence", ], @@ -209,13 +205,11 @@ conformance_test( "--verbose", # Notoriously times-out #"--additional=ParticipantPruningIT", - "--additional=KVCommandDeduplicationIT", # The following two tests don't actually care about multi-participant but they do need append-only. "--additional=CommandDeduplicationParallelIT", "--additional=CommandDeduplicationPeriodValidationIT", "--additional=CompletionDeduplicationInfoITCommandService", "--additional=CompletionDeduplicationInfoITCommandSubmissionService", - "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant "--exclude=ConfigManagementServiceIT", ], ) @@ -241,8 +235,6 @@ conformance_test( "--additional=CommandDeduplicationPeriodValidationIT", "--additional=CompletionDeduplicationInfoITCommandService", "--additional=CompletionDeduplicationInfoITCommandSubmissionService", - "--additional=KVCommandDeduplicationIT", - "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant "--exclude=ConfigManagementServiceIT", # Disable tests targeting only multi-participant setups "--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence", diff --git a/ledger/ledger-on-sql/BUILD.bazel b/ledger/ledger-on-sql/BUILD.bazel index 3cf18b45834e..e43049248eb7 100644 --- a/ledger/ledger-on-sql/BUILD.bazel +++ b/ledger/ledger-on-sql/BUILD.bazel @@ -252,8 +252,6 @@ da_scala_test_suite( "--verbose", "--additional=ParticipantPruningIT", "--additional=MultiPartySubmissionIT", - "--additional=KVCommandDeduplicationIT", - "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant # Disable tests targeting only append-only schema functionality "--exclude=ParticipantPruningIT:PRLocalAndNonLocalRetroactiveDivulgences,ParticipantPruningIT:PRRetroactiveDivulgences,ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence", ], @@ -310,10 +308,8 @@ conformance_test( "--additional=CommandDeduplicationPeriodValidationIT", "--additional=CompletionDeduplicationInfoITCommandService", "--additional=CompletionDeduplicationInfoITCommandSubmissionService", - "--additional=KVCommandDeduplicationIT", "--additional=ParticipantPruningIT", "--additional=MultiPartySubmissionIT", - "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant # Disable tests targeting only multi-participant setups "--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence", ], @@ -336,10 +332,8 @@ conformance_test( "--additional=CommandDeduplicationPeriodValidationIT", "--additional=CompletionDeduplicationInfoITCommandService", "--additional=CompletionDeduplicationInfoITCommandSubmissionService", - "--additional=KVCommandDeduplicationIT", "--additional=ParticipantPruningIT", "--additional=MultiPartySubmissionIT", - "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant # Disable tests targeting only multi-participant setups "--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence", ], @@ -363,10 +357,8 @@ conformance_test( "--additional=CommandDeduplicationPeriodValidationIT", "--additional=CompletionDeduplicationInfoITCommandService", "--additional=CompletionDeduplicationInfoITCommandSubmissionService", - "--additional=KVCommandDeduplicationIT", "--additional=ParticipantPruningIT", "--additional=MultiPartySubmissionIT", - "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant # Disable tests targeting only multi-participant setups "--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence", ], @@ -390,10 +382,8 @@ conformance_test( "--additional=CommandDeduplicationParallelIT", "--additional=CommandDeduplicationPeriodValidationIT", "--additional=CompletionDeduplicationInfoITCommandSubmissionService", - "--additional=KVCommandDeduplicationIT", "--additional=ParticipantPruningIT", "--additional=MultiPartySubmissionIT", - "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant # Disable tests targeting only multi-participant setups "--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence", ], diff --git a/ledger/participant-state/kvutils/BUILD.bazel b/ledger/participant-state/kvutils/BUILD.bazel index 1d88e530ec53..549c53d8efb5 100644 --- a/ledger/participant-state/kvutils/BUILD.bazel +++ b/ledger/participant-state/kvutils/BUILD.bazel @@ -266,7 +266,6 @@ client_server_build( client_args = [ "--timeout-scale-factor=20", "localhost:%PORT%", - "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant "--exclude=HealthServiceIT", # Does not make sense to run it for the export ], output_env = "KVUTILS_LEDGER_EXPORT", diff --git a/ledger/sandbox-classic/BUILD.bazel b/ledger/sandbox-classic/BUILD.bazel index 8cf59f8dccbf..30a12ad11772 100644 --- a/ledger/sandbox-classic/BUILD.bazel +++ b/ledger/sandbox-classic/BUILD.bazel @@ -363,10 +363,6 @@ server_conformance_test( # Excluding tests that require using pruneAllDivulgedContracts option that is not supported by sandbox-classic "--exclude=ParticipantPruningIT:PRLocalAndNonLocalRetroactiveDivulgences,ParticipantPruningIT:PRRetroactiveDivulgences,ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence", "--exclude=ClosedWorldIT", - # Exclude offset command deduplication tests - # sandbox classic has support only for participant side deduplication, - # which in turn only has support for durations as deduplication periods - "--exclude=CommandDeduplicationIT:ParticipantCommandDeduplicationDeduplicateUsingOffsets", # not enforced "--exclude=CommandDeduplicationPeriodValidationIT:DeduplicationDurationExceedsMaxDeduplicationDuration", ], @@ -397,10 +393,6 @@ server_conformance_test( # Excluding tests that require using pruneAllDivulgedContracts option that is not supported by sandbox-classic "--exclude=ParticipantPruningIT:PRLocalAndNonLocalRetroactiveDivulgences,ParticipantPruningIT:PRRetroactiveDivulgences,ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence", "--exclude=ClosedWorldIT", - # Exclude offset command deduplication tests - # sandbox classic has support only for participant side deduplication, - # which in turn only has support for durations as deduplication periods - "--exclude=CommandDeduplicationIT:ParticipantCommandDeduplicationDeduplicateUsingOffsets", # not enforced "--exclude=CommandDeduplicationPeriodValidationIT:DeduplicationDurationExceedsMaxDeduplicationDuration", ], @@ -423,10 +415,6 @@ server_conformance_test( "--additional=CompletionDeduplicationInfoITCommandService", "--additional=CompletionDeduplicationInfoITCommandSubmissionService", "--exclude=ClosedWorldIT", - # Exclude offset command deduplication tests - # sandbox classic has support only for participant side deduplication, - # which in turn only has support for durations as deduplication periods - "--exclude=CommandDeduplicationIT:ParticipantCommandDeduplicationDeduplicateUsingOffsets", # not enforced "--exclude=CommandDeduplicationPeriodValidationIT:DeduplicationDurationExceedsMaxDeduplicationDuration", ], @@ -444,15 +432,12 @@ server_conformance_test( "--concurrent-test-runs=1", # sandbox classic doesn't scale well with concurrent tests (almost no effect on overall run time) "--timeout-scale-factor=2", # sandbox classic is slow in general "--open-world", + "--static-time", "--additional=CommandDeduplicationParallelIT", "--additional=CommandDeduplicationPeriodValidationIT", "--additional=CompletionDeduplicationInfoITCommandService", "--additional=CompletionDeduplicationInfoITCommandSubmissionService", "--exclude=ClosedWorldIT", - # Exclude offset command deduplication tests - # sandbox classic has support only for participant side deduplication, - # which in turn only has support for durations as deduplication periods - "--exclude=CommandDeduplicationIT:ParticipantCommandDeduplicationDeduplicateUsingOffsets", # not enforced "--exclude=CommandDeduplicationPeriodValidationIT:DeduplicationDurationExceedsMaxDeduplicationDuration", ], @@ -471,10 +456,6 @@ server_conformance_test( "--timeout-scale-factor=2", # sandbox classic is slow in general "--open-world", "--exclude=ClosedWorldIT", - # Exclude offset command deduplication tests - # sandbox classic has support only for participant side deduplication, - # which in turn only has support for durations as deduplication periods - "--exclude=CommandDeduplicationIT:ParticipantCommandDeduplicationDeduplicateUsingOffsets", ], ) @@ -501,10 +482,6 @@ server_conformance_test( "--exclude=ContractKeysIT:CKMaintainerScoped", "--exclude=ExceptionsIT:ExRollbackDuplicateKeyCreated", "--exclude=ExceptionsIT:ExRollbackDuplicateKeyArchived", - # Exclude offset command deduplication tests - # sandbox classic has support only for participant side deduplication, - # which in turn only has support for durations as deduplication periods - "--exclude=CommandDeduplicationIT:ParticipantCommandDeduplicationDeduplicateUsingOffsets", # Below test suites are duplicates of the above ExceptionsIT and ContractKeysIT # but with more generic assertions for the in-memory ledger # TODO sandbox-classic removal: Delete these tests @@ -529,10 +506,6 @@ server_conformance_test( "--exclude=ContractKeysIT:CKMaintainerScoped", "--exclude=ExceptionsIT:ExRollbackDuplicateKeyCreated", "--exclude=ExceptionsIT:ExRollbackDuplicateKeyArchived", - # Exclude offset command deduplication tests - # sandbox classic has support only for participant side deduplication, - # which in turn only has support for durations as deduplication periods - "--exclude=CommandDeduplicationIT:ParticipantCommandDeduplicationDeduplicateUsingOffsets", # Below test suites are duplicates of the above ExceptionsIT and ContractKeysIT # but with more generic assertions for the in-memory ledger # TODO sandbox-classic removal: Delete these tests diff --git a/ledger/sandbox-on-x/BUILD.bazel b/ledger/sandbox-on-x/BUILD.bazel index d9d4d5b5e1c7..bc7df064c03b 100644 --- a/ledger/sandbox-on-x/BUILD.bazel +++ b/ledger/sandbox-on-x/BUILD.bazel @@ -132,7 +132,6 @@ conformance_test( "--verbose", "--additional=ParticipantPruningIT", "--additional=MultiPartySubmissionIT", - "--exclude=CommandDeduplicationIT", "--exclude=ClosedWorldIT", "--exclude=ContractKeysIT", "--exclude=SemanticTests", @@ -165,10 +164,6 @@ conformance_test( "--additional=ContractIdIT:Accept", "--additional=ParticipantPruningIT", "--additional=MultiPartySubmissionIT", - # Exclude offset command deduplication tests as - # Sandbox-on-X has support only for participant side deduplication, - # which in turn only has support for durations as deduplication periods. - "--exclude=CommandDeduplicationIT:ParticipantCommandDeduplicationDeduplicateUsingOffsets", # not enforced "--exclude=CommandDeduplicationPeriodValidationIT:DeduplicationDurationExceedsMaxDeduplicationDuration", # Disable tests targeting only multi-participant setups @@ -192,10 +187,6 @@ conformance_test( "--static-time", "--additional=ParticipantPruningIT", "--additional=MultiPartySubmissionIT", - # Exclude offset command deduplication tests as - # Sandbox-on-X has support only for participant side deduplication, - # which in turn only has support for durations as deduplication periods. - "--exclude=CommandDeduplicationIT:ParticipantCommandDeduplicationDeduplicateUsingOffsets", # not enforced "--exclude=CommandDeduplicationPeriodValidationIT:DeduplicationDurationExceedsMaxDeduplicationDuration", # Disable tests targeting only multi-participant setups diff --git a/ledger/sandbox/BUILD.bazel b/ledger/sandbox/BUILD.bazel index 1b893b687e1e..48576903dc2a 100644 --- a/ledger/sandbox/BUILD.bazel +++ b/ledger/sandbox/BUILD.bazel @@ -277,8 +277,6 @@ conformance_test( "--additional=CommandDeduplicationPeriodValidationIT", "--additional=CompletionDeduplicationInfoITCommandService", "--additional=CompletionDeduplicationInfoITCommandSubmissionService", - "--additional=KVCommandDeduplicationIT", - "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant "--additional=ContractIdIT:RejectV0,ContractIdIT:AcceptSuffixedV1,ContractIdIT:AcceptNonSuffixedV1", "--exclude=ClosedWorldIT", ], @@ -315,8 +313,6 @@ conformance_test( "--additional=CommandDeduplicationPeriodValidationIT", "--additional=CompletionDeduplicationInfoITCommandService", "--additional=CompletionDeduplicationInfoITCommandSubmissionService", - "--additional=KVCommandDeduplicationIT", - "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant "--exclude=ClosedWorldIT", ], ) diff --git a/runtime-components/non-repudiation-conformance/src/test/scala/com/daml/nonrepudiation/postgresql/NonRepudiationProxyConformance.scala b/runtime-components/non-repudiation-conformance/src/test/scala/com/daml/nonrepudiation/postgresql/NonRepudiationProxyConformance.scala index b09f6c281f81..63e96cd1f912 100644 --- a/runtime-components/non-repudiation-conformance/src/test/scala/com/daml/nonrepudiation/postgresql/NonRepudiationProxyConformance.scala +++ b/runtime-components/non-repudiation-conformance/src/test/scala/com/daml/nonrepudiation/postgresql/NonRepudiationProxyConformance.scala @@ -3,6 +3,8 @@ package com.daml.nonrepudiation.postgresql +import java.time.{Clock, Duration} + import com.daml.doobie.logging.Slf4jLogHandler import com.daml.ledger.api.testtool.infrastructure import com.daml.ledger.api.testtool.infrastructure.{ @@ -11,11 +13,7 @@ import com.daml.ledger.api.testtool.infrastructure.{ LedgerTestSummary, Result, } -import com.daml.ledger.api.testtool.suites.{ - ClosedWorldIT, - CommandDeduplicationIT, - KVCommandDeduplicationIT, -} +import com.daml.ledger.api.testtool.suites.ClosedWorldIT import com.daml.ledger.api.testtool.tests.Tests import com.daml.ledger.api.v1.command_service.CommandServiceGrpc.CommandService import com.daml.ledger.api.v1.command_submission_service.CommandSubmissionServiceGrpc.CommandSubmissionService @@ -33,7 +31,6 @@ import org.scalatest.flatspec.AsyncFlatSpec import org.scalatest.matchers.should.Matchers import org.scalatest.{Inside, OptionValues} -import java.time.{Clock, Duration} import scala.concurrent.duration.DurationInt final class NonRepudiationProxyConformance @@ -49,17 +46,11 @@ final class NonRepudiationProxyConformance .default() .filter { case _: ClosedWorldIT => false - case _: CommandDeduplicationIT => false case _ => true } - private val optionalTestsToRun = Tests.optional().filter { - case _: KVCommandDeduplicationIT => true - case _ => false - } - private val conformanceTestCases: Vector[LedgerTestCase] = - (defaultTestsToRun ++ optionalTestsToRun) + defaultTestsToRun .flatMap(_.tests) it should "pass all conformance tests" in {