From f4adee91ca182c9bac1847585491adbac7ef82dd Mon Sep 17 00:00:00 2001 From: nicu-da Date: Thu, 16 Sep 2021 04:16:39 -0700 Subject: [PATCH] Add conformance test for command deduplication using the CommandService [KVL-1099] (#10883) * Add conformance test for command deduplication using the CommandService CHANGELOG_BEGIN CHANGELOG_END --- ...ndOnlyCommandDeduplicationParallelIT.scala | 109 ++++++++++++++---- ledger/ledger-on-memory/BUILD.bazel | 1 + ledger/ledger-on-sql/BUILD.bazel | 23 ++-- 3 files changed, 98 insertions(+), 35 deletions(-) diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/AppendOnlyCommandDeduplicationParallelIT.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/AppendOnlyCommandDeduplicationParallelIT.scala index 87b055890b3a..00ad8dd3c967 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/AppendOnlyCommandDeduplicationParallelIT.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/AppendOnlyCommandDeduplicationParallelIT.scala @@ -16,10 +16,12 @@ import com.daml.ledger.api.testtool.infrastructure.Assertions.fail import com.daml.ledger.api.testtool.infrastructure.LedgerTestSuite import com.daml.ledger.api.testtool.infrastructure.ProtobufConverters._ import com.daml.ledger.api.testtool.infrastructure.participant.ParticipantTestContext +import com.daml.ledger.api.v1.command_service.SubmitAndWaitRequest import com.daml.ledger.api.v1.command_submission_service.SubmitRequest import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod +import com.daml.ledger.client.binding.Primitive import com.daml.ledger.client.binding.Primitive.Party -import com.daml.ledger.test.model.Test.DummyWithAnnotation +import com.daml.ledger.test.model.Test.{Dummy, DummyWithAnnotation} import io.grpc.Status import io.grpc.Status.Code @@ -33,12 +35,11 @@ import scala.concurrent.{ExecutionContext, Future} class AppendOnlyCommandDeduplicationParallelIT extends LedgerTestSuite { test( - s"DeduplicateParallelSubmissions", - "Commands submitted at the same, in parallel, should be deduplicated", + s"DeduplicateParallelSubmissionsUsingCommandSubmissionService", + "Commands submitted at the same, in parallel, using the CommandSubmissionService, should be deduplicated", allocate(SingleParty), )(implicit ec => { case Participants(Participant(ledger, party)) => val deduplicationDuration = 3.seconds - val numberOfParallelRequests = 10 lazy val request = ledger .submitRequest(party, DummyWithAnnotation(party, "Duplicate").create.command) .update( @@ -46,35 +47,93 @@ class AppendOnlyCommandDeduplicationParallelIT extends LedgerTestSuite { deduplicationDuration.asProtobuf ) ) - Future - .traverse(Seq.fill(numberOfParallelRequests)(request))(request => { - submitRequestAndGetStatusCode(ledger)(request, party) - }) - .map(_.groupBy(identity).view.mapValues(_.size).toMap) - .map(responses => { - val expectedDuplicateResponses = numberOfParallelRequests - 1 - val okResponses = responses.getOrElse(Code.OK, 0) - val alreadyExistsResponses = responses.getOrElse(Code.ALREADY_EXISTS, 0) - // Participant-based command de-duplication can currently also reject duplicates via a SQL exception - val internalResponses = responses.getOrElse(Code.INTERNAL, 0) - // Canton can return ABORTED for duplicate submissions - val abortedResponses = responses.getOrElse(Code.ABORTED, 0) - val duplicateResponses = - alreadyExistsResponses + internalResponses + abortedResponses - assert( - okResponses == 1 && duplicateResponses == numberOfParallelRequests - 1, - s"Expected $expectedDuplicateResponses duplicate responses and one accepted, got $responses", - ) - }) + runTestWithSubmission[SubmitRequest]( + ledger, + party, + request, + submitRequestAndGetStatusCode(ledger)(_, party), + ) }) + test( + s"DeduplicateParallelSubmissionsUsingCommandService", + "Commands submitted at the same, in parallel, using the CommandService, should be deduplicated", + allocate(SingleParty), + runConcurrently = false, + )(implicit ec => { case Participants(Participant(ledger, party)) => + val deduplicationDuration = 3.seconds + val request = ledger + .submitAndWaitRequest(party, Dummy(party).create.command) + .update( + _.commands.deduplicationDuration := deduplicationDuration.asProtobuf + ) + runTestWithSubmission[SubmitAndWaitRequest]( + ledger, + party, + request, + submitAndWaitRequestAndGetStatusCode(ledger)(_, party), + ) + }) + + private def runTestWithSubmission[T]( + ledger: ParticipantTestContext, + party: Party, + request: T, + submitRequestAndGetStatus: T => Future[Code], + )(implicit + ec: ExecutionContext + ) = { + val numberOfParallelRequests = 10 + for { + responses <- Future + .traverse(Seq.fill(numberOfParallelRequests)(request))(request => { + submitRequestAndGetStatus(request) + }) + .map(_.groupBy(identity).view.mapValues(_.size).toMap) + activeContracts <- ledger.activeContracts(party) + } yield { + val expectedDuplicateResponses = numberOfParallelRequests - 1 + val okResponses = responses.getOrElse(Code.OK, 0) + val alreadyExistsResponses = responses.getOrElse(Code.ALREADY_EXISTS, 0) + // Participant-based command de-duplication can currently also reject duplicates via a SQL exception when using the CommandSubmissionService + val internalResponses = responses.getOrElse(Code.INTERNAL, 0) + // Canton can return ABORTED for duplicate submissions + // Participant based command de-duplication can currently also return ABORTED when using the CommandService + val abortedResponses = responses.getOrElse(Code.ABORTED, 0) + val duplicateResponses = + alreadyExistsResponses + internalResponses + abortedResponses + assert( + okResponses == 1 && duplicateResponses == numberOfParallelRequests - 1, + s"Expected $expectedDuplicateResponses duplicate responses and one accepted, got $responses", + ) + assert(activeContracts.size == 1) + } + } + + private def submitAndWaitRequestAndGetStatusCode( + ledger: ParticipantTestContext + )(request: SubmitAndWaitRequest, parties: Party*)(implicit ec: ExecutionContext) = { + val submissionId = UUID.randomUUID().toString + val requestWithSubmissionId = request.update(_.commands.submissionId := submissionId) + val submitResult = ledger.submitAndWait(requestWithSubmissionId) + submissionResultToFinalStatusCode(ledger)(submitResult, submissionId, parties: _*) + } protected def submitRequestAndGetStatusCode( ledger: ParticipantTestContext )(request: SubmitRequest, parties: Party*)(implicit ec: ExecutionContext): Future[Code] = { val submissionId = UUID.randomUUID().toString val requestWithSubmissionId = request.update(_.commands.submissionId := submissionId) - ledger + val submitResult = ledger .submit(requestWithSubmissionId) + submissionResultToFinalStatusCode(ledger)(submitResult, submissionId, parties: _*) + } + + private def submissionResultToFinalStatusCode( + ledger: ParticipantTestContext + )(submitResult: Future[Unit], submissionId: String, parties: Primitive.Party*)(implicit + ec: ExecutionContext + ) = { + submitResult .flatMap(_ => ledger.findCompletion(parties: _*)(_.submissionId == submissionId)) .map { case Some(completion) => diff --git a/ledger/ledger-on-memory/BUILD.bazel b/ledger/ledger-on-memory/BUILD.bazel index 75b47d3c5389..9f19e65dedab 100644 --- a/ledger/ledger-on-memory/BUILD.bazel +++ b/ledger/ledger-on-memory/BUILD.bazel @@ -213,6 +213,7 @@ conformance_test( "--participant=participant-id=example1,port=6865", "--participant=participant-id=example2,port=6866", "--max-deduplication-duration=PT5S", + "--tracker-retention-period=PT5S", # lower the command timeout duration, this is a workaround DPP-609 ], test_tool_args = [ "--verbose", diff --git a/ledger/ledger-on-sql/BUILD.bazel b/ledger/ledger-on-sql/BUILD.bazel index bdb6c1fb957a..9867ee8c13fc 100644 --- a/ledger/ledger-on-sql/BUILD.bazel +++ b/ledger/ledger-on-sql/BUILD.bazel @@ -317,12 +317,12 @@ conformance_test( tags = [], test_tool_args = [ "--verbose", + "--additional=AppendOnlyCommandDeduplicationParallelIT", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandService", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService", + "--additional=AppendOnlyKVCommandDeduplicationIT", "--additional=ParticipantPruningIT", "--additional=MultiPartySubmissionIT", - "--additional=AppendOnlyCommandDeduplicationParallelIT", - "--additional=AppendOnlyKVCommandDeduplicationIT", "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant # Disable tests targeting only multi-participant setups "--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence", @@ -340,16 +340,17 @@ conformance_test( "--mutable-contract-state-cache", "--jdbc-url=jdbc:h2:mem:daml-on-sql-conformance-test", "--max-deduplication-duration=PT5S", + "--tracker-retention-period=PT5S", # lower the command timeout duration, this is a workaround DPP-609 ], tags = [], test_tool_args = [ "--verbose", + "--additional=AppendOnlyCommandDeduplicationParallelIT", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandService", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService", + "--additional=AppendOnlyKVCommandDeduplicationIT", "--additional=ParticipantPruningIT", "--additional=MultiPartySubmissionIT", - "--additional=AppendOnlyCommandDeduplicationParallelIT", - "--additional=AppendOnlyKVCommandDeduplicationIT", "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant # Disable tests targeting only multi-participant setups "--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence", @@ -366,16 +367,17 @@ conformance_test( "--index-append-only-schema", "--mutable-contract-state-cache", "--max-deduplication-duration=PT5S", + "--tracker-retention-period=PT5S", # lower the command timeout duration, this is a workaround DPP-609 ], tags = [] if oracle_testing else ["manual"], test_tool_args = [ "--verbose", + "--additional=AppendOnlyCommandDeduplicationParallelIT", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandService", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService", + "--additional=AppendOnlyKVCommandDeduplicationIT", "--additional=ParticipantPruningIT", "--additional=MultiPartySubmissionIT", - "--additional=AppendOnlyCommandDeduplicationParallelIT", - "--additional=AppendOnlyKVCommandDeduplicationIT", "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant # Disable tests targeting only multi-participant setups "--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence", @@ -418,12 +420,12 @@ conformance_test( tags = [], test_tool_args = [ "--verbose", + "--additional=AppendOnlyCommandDeduplicationParallelIT", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandService", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService", + "--additional=AppendOnlyKVCommandDeduplicationIT", "--additional=ParticipantPruningIT", "--additional=MultiPartySubmissionIT", - "--additional=AppendOnlyCommandDeduplicationParallelIT", - "--additional=AppendOnlyKVCommandDeduplicationIT", "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant # Disable tests targeting only multi-participant setups "--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence", @@ -441,16 +443,17 @@ conformance_test( "--mutable-contract-state-cache", "--buffered-ledger-api-streams-unsafe", "--max-deduplication-duration=PT5S", + "--tracker-retention-period=PT5S", # lower the command timeout duration, this is a workaround DPP-609 ], tags = [] if oracle_testing else ["manual"], test_tool_args = [ "--verbose", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandService", + "--additional=AppendOnlyCommandDeduplicationParallelIT", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService", + "--additional=AppendOnlyKVCommandDeduplicationIT", "--additional=ParticipantPruningIT", "--additional=MultiPartySubmissionIT", - "--additional=AppendOnlyCommandDeduplicationParallelIT", - "--additional=AppendOnlyKVCommandDeduplicationIT", "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant # Disable tests targeting only multi-participant setups "--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence",