From b65772db262f33375201905ba33948819627a05e Mon Sep 17 00:00:00 2001 From: Nicu Reut Date: Mon, 13 Sep 2021 15:52:06 +0200 Subject: [PATCH 01/14] Extract deduplication "features" into a configuration to be used around the tests. Better naming for assertions that support sync and async deduplication CHANGELOG_BEGIN CHANGELOG_END --- .../CommandDeduplicationBase.scala | 93 ++++++++++++------- .../KVCommandDeduplicationBase.scala | 15 +-- .../AppendOnlyKVCommandDeduplicationIT.scala | 13 ++- .../suites/CommandDeduplicationIT.scala | 11 ++- .../suites/KVCommandDeduplicationIT.scala | 12 ++- 5 files changed, 94 insertions(+), 50 deletions(-) diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/CommandDeduplicationBase.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/CommandDeduplicationBase.scala index cb120e722419..fbec413a51da 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/CommandDeduplicationBase.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/CommandDeduplicationBase.scala @@ -9,6 +9,7 @@ import com.daml.ledger.api.testtool.infrastructure.Allocation._ import com.daml.ledger.api.testtool.infrastructure.Assertions.{assertGrpcError, assertSingleton, _} import com.daml.ledger.api.testtool.infrastructure.LedgerTestSuite import com.daml.ledger.api.testtool.infrastructure.ProtobufConverters._ +import com.daml.ledger.api.testtool.infrastructure.deduplication.CommandDeduplicationBase.DeduplicationFeatures import com.daml.ledger.api.testtool.infrastructure.participant.ParticipantTestContext import com.daml.ledger.api.v1.command_submission_service.SubmitRequest import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod @@ -37,10 +38,7 @@ private[testtool] abstract class CommandDeduplicationBase( val ledgerWaitInterval: FiniteDuration = ledgerTimeInterval * 2 val defaultDeduplicationWindowWait: FiniteDuration = deduplicationDuration + ledgerWaitInterval - /** For [[Completion]], the submission id and deduplication period are filled only for append only schemas - * Therefore, we need to assert on those fields only if it's an append only schema - */ - protected def isAppendOnly: Boolean + def deduplicationFeatures: DeduplicationFeatures protected def runGivenDeduplicationWait( participants: Seq[ParticipantTestContext] @@ -78,8 +76,8 @@ private[testtool] abstract class CommandDeduplicationBase( // Submit command A (first deduplication window) // Note: the second submit() in this block is deduplicated and thus rejected by the ledger API server, // only one submission is therefore sent to the ledger. - completion1 <- requestHasOkCompletion(ledger)(requestA1, party) - _ <- submitRequestAndAssertFailure(ledger)(requestA1, Code.ALREADY_EXISTS) + completion1 <- submitRequestAndAssertAccepted(ledger)(requestA1, party) + _ <- submitRequestAndAssertDeduplication(ledger)(requestA1) // Wait until the end of first deduplication window _ <- Delayed.by(deduplicationWait)(()) @@ -88,8 +86,8 @@ private[testtool] abstract class CommandDeduplicationBase( // the ledger API server and the ledger itself, since the test waited more than // `deduplicationSeconds` after receiving the first command *completion*. // The first submit() in this block should therefore lead to an accepted transaction. - completion2 <- requestHasOkCompletion(ledger)(requestA2, party) - _ <- submitRequestAndAssertFailure(ledger)(requestA2, Code.ALREADY_EXISTS) + completion2 <- submitRequestAndAssertAccepted(ledger)(requestA2, party) + _ <- submitRequestAndAssertDeduplication(ledger)(requestA2, party) // Inspect created contracts activeContracts <- ledger.activeContracts(party) } yield { @@ -122,10 +120,10 @@ private[testtool] abstract class CommandDeduplicationBase( for { // Submit an invalid command (should fail with INVALID_ARGUMENT) - _ <- submitRequestAndAssertFailure(ledger)(requestA, Code.INVALID_ARGUMENT) + _ <- submitRequestAndAssertCompletionStatus(ledger)(requestA, Code.INVALID_ARGUMENT) // Re-submit the invalid command (should again fail with INVALID_ARGUMENT and not with ALREADY_EXISTS) - _ <- submitRequestAndAssertFailure(ledger)(requestA, Code.INVALID_ARGUMENT) + _ <- submitRequestAndAssertCompletionStatus(ledger)(requestA, Code.INVALID_ARGUMENT) } yield {} }) @@ -302,47 +300,66 @@ private[testtool] abstract class CommandDeduplicationBase( } }) - def requestHasOkCompletion( + def submitRequestAndAssertAccepted( + ledger: ParticipantTestContext + )(request: SubmitRequest, parties: Party*)(implicit ec: ExecutionContext): Future[Completion] = { + submitRequestAndAssertCompletionStatus(ledger)(request, Code.OK, parties: _*) + } + + protected def submitRequestAndAssertDeduplication( ledger: ParticipantTestContext - )(request: SubmitRequest, party: Party)(implicit ec: ExecutionContext): Future[Completion] = { - requestHasCompletionWithStatusCode(ledger)(request, party, Code.OK) + )(request: SubmitRequest, parties: Party*)(implicit + ec: ExecutionContext + ): Future[Unit] = { + if (deduplicationFeatures.participantDeduplication) { + submitRequestAndAssertSyncDeduplication(ledger, request) + } else { + submitRequestAndAssertAsyncDeduplication(ledger, request, parties: _*) + .map(_ => ()) + } } - def requestHasCompletionWithStatusCode( + protected def submitRequestAndAssertSyncDeduplication( + ledger: ParticipantTestContext, + request: SubmitRequest, + )(implicit ec: ExecutionContext): Future[Unit] = ledger + .submit(request) + .mustFail(s"Request expected to fail with code ${Code.ALREADY_EXISTS}") + .map(assertGrpcError(_, Code.ALREADY_EXISTS, None)) + + protected def submitRequestAndAssertAsyncDeduplication( + ledger: ParticipantTestContext, + request: SubmitRequest, + parties: Party* + )(implicit ec: ExecutionContext): Future[Completion] = submitRequestAndAssertCompletionStatus( + ledger + )(request, Code.ALREADY_EXISTS, parties: _*) + + private def submitRequestAndAssertCompletionStatus( ledger: ParticipantTestContext - )(request: SubmitRequest, party: Party, code: Code)(implicit + )(request: SubmitRequest, statusCode: Code, parties: Party*)(implicit ec: ExecutionContext - ): Future[Completion] = { - submitRequestAndFindCompletion(ledger)(request, party).map(completion => { + ): Future[Completion] = + submitRequestAndFindCompletion(ledger)(request, parties: _*).map(completion => { assert( - completion.getStatus.code == code.value(), - s"Expecting completion with status code $code but completion has status ${completion.status}", + completion.getStatus.code == statusCode.value(), + s"Expecting completion with status code $statusCode but completion has status ${completion.status}", ) completion }) - } - - protected def submitRequestAndAssertFailure( - ledger: ParticipantTestContext - )(request: SubmitRequest, statusCode: Code)(implicit ec: ExecutionContext): Future[Unit] = { - ledger - .submit(request) - .mustFail(s"Request expected to fail with status $statusCode") - .map(assertGrpcError(_, statusCode, None)) - } protected def submitRequestAndFindCompletion( ledger: ParticipantTestContext - )(request: SubmitRequest, party: Party)(implicit ec: ExecutionContext): Future[Completion] = { + )(request: SubmitRequest, parties: Party*)(implicit ec: ExecutionContext): Future[Completion] = { val submissionId = UUID.randomUUID().toString submitRequest(ledger)(request.update(_.commands.submissionId := submissionId)) .flatMap(ledgerEnd => { - ledger.firstCompletions(ledger.completionStreamRequest(ledgerEnd)(party)) + ledger.firstCompletions(ledger.completionStreamRequest(ledgerEnd)(parties: _*)) }) .map { completions => val completion = assertSingleton("Expected only one completion", completions) // The [[Completion.submissionId]] is set only for append-only ledgers - if (isAppendOnly) + if (deduplicationFeatures.appendOnlySchema) assert( completion.submissionId == submissionId, s"Submission id is different for completion. Completion has submission id [${completion.submissionId}], request has submission id [$submissionId]", @@ -370,3 +387,15 @@ private[testtool] abstract class CommandDeduplicationBase( throw new IllegalArgumentException(s"Invalid timeout scale factor: $timeoutScaleFactor") } } + +object CommandDeduplicationBase { + + /** @param participantDeduplication If participant deduplication is enabled then we will receive synchronous rejections + * @param appendOnlySchema For [[Completion]], the submission id and deduplication period are filled only for append only schemas + * Therefore, we need to assert on those fields only if it's an append only schema + */ + case class DeduplicationFeatures( + participantDeduplication: Boolean, + appendOnlySchema: Boolean, + ) +} diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/KVCommandDeduplicationBase.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/KVCommandDeduplicationBase.scala index 92df616f5f57..cd259d635016 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/KVCommandDeduplicationBase.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/KVCommandDeduplicationBase.scala @@ -16,7 +16,6 @@ import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod import com.daml.ledger.api.v1.completion.Completion import com.daml.ledger.test.model.Test.DummyWithAnnotation import com.daml.timer.Delayed -import io.grpc.Status.Code import org.slf4j.LoggerFactory import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration} @@ -50,22 +49,18 @@ abstract class KVCommandDeduplicationBase( ) runWithConfig(configuredParticipants) { (maxDeduplicationDuration, minSkew) => for { - completion1 <- requestHasOkCompletion(ledger)(request, party) + completion1 <- submitRequestAndAssertAccepted(ledger)(request, party) // participant side deduplication, sync result - _ <- submitRequestAndAssertFailure(ledger)(request, Code.ALREADY_EXISTS) + _ <- submitRequestAndAssertSyncDeduplication(ledger, request) // Wait for the end of participant deduplication // We also add min skew to also validate the committer deduplication duration _ <- Delayed.by(deduplicationDuration.plus(minSkew))(()) // Validate committer deduplication - duplicateCompletion <- requestHasCompletionWithStatusCode(ledger)( - request, - party, - Code.ALREADY_EXISTS, - ) + duplicateCompletion <- submitRequestAndAssertAsyncDeduplication(ledger, request, party) // Wait for the end of committer deduplication, we already waited for minSkew _ <- Delayed.by(maxDeduplicationDuration.minus(deduplicationDuration))(()) // Deduplication has finished - completion2 <- requestHasOkCompletion(ledger)(request, party) + completion2 <- submitRequestAndAssertAccepted(ledger)(request, party) // Inspect created contracts activeContracts <- ledger.activeContracts(party) } yield { @@ -82,7 +77,7 @@ abstract class KVCommandDeduplicationBase( "The command ID of the duplicate completion does not match the command ID of the submission", ) // The [[Completion.deduplicationPeriod]] is set only for append-only ledgers - if (isAppendOnly) { + if (deduplicationFeatures.appendOnlySchema) { val expectedCompletionDeduplicationPeriod = Completion.DeduplicationPeriod.DeduplicationTime( maxDeduplicationDuration.asProtobuf diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/AppendOnlyKVCommandDeduplicationIT.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/AppendOnlyKVCommandDeduplicationIT.scala index 5392c609d99c..9b4fb93d1e0c 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/AppendOnlyKVCommandDeduplicationIT.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/AppendOnlyKVCommandDeduplicationIT.scala @@ -3,7 +3,11 @@ package com.daml.ledger.api.testtool.suites -import com.daml.ledger.api.testtool.infrastructure.deduplication.KVCommandDeduplicationBase +import com.daml.ledger.api.testtool.infrastructure.deduplication.CommandDeduplicationBase.DeduplicationFeatures +import com.daml.ledger.api.testtool.infrastructure.deduplication.{ + CommandDeduplicationBase, + KVCommandDeduplicationBase, +} import scala.concurrent.duration.FiniteDuration @@ -14,7 +18,12 @@ class AppendOnlyKVCommandDeduplicationIT( timeoutScaleFactor: Double, ledgerTimeInterval: FiniteDuration, ) extends KVCommandDeduplicationBase(timeoutScaleFactor, ledgerTimeInterval) { - override protected def isAppendOnly: Boolean = true override protected def testNamingPrefix: String = "AppendOnlyKVCommandDeduplication" + + override def deduplicationFeatures: CommandDeduplicationBase.DeduplicationFeatures = + DeduplicationFeatures( + participantDeduplication = true, + appendOnlySchema = true, + ) } diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationIT.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationIT.scala index 124665f57fe0..314f4443ac9e 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationIT.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationIT.scala @@ -4,10 +4,11 @@ package com.daml.ledger.api.testtool.suites import com.daml.ledger.api.testtool.infrastructure.deduplication.CommandDeduplicationBase +import com.daml.ledger.api.testtool.infrastructure.deduplication.CommandDeduplicationBase.DeduplicationFeatures import com.daml.ledger.api.testtool.infrastructure.participant.ParticipantTestContext -import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration.{Duration, FiniteDuration} +import scala.concurrent.{ExecutionContext, Future} /** Command deduplication tests for participant side deduplication * Should be disabled for ledgers that have committer side deduplication enabled (KV) @@ -23,7 +24,9 @@ final class CommandDeduplicationIT(timeoutScaleFactor: Double, ledgerTimeInterva override def testNamingPrefix: String = "ParticipantCommandDeduplication" - /** Assertions for append-only schema are important for KV ledgers - */ - override protected def isAppendOnly: Boolean = false + override def deduplicationFeatures: CommandDeduplicationBase.DeduplicationFeatures = + DeduplicationFeatures( + participantDeduplication = true, + appendOnlySchema = false, + ) } diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/KVCommandDeduplicationIT.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/KVCommandDeduplicationIT.scala index 520687e51577..30e4b83123bb 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/KVCommandDeduplicationIT.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/KVCommandDeduplicationIT.scala @@ -3,7 +3,11 @@ package com.daml.ledger.api.testtool.suites -import com.daml.ledger.api.testtool.infrastructure.deduplication.KVCommandDeduplicationBase +import com.daml.ledger.api.testtool.infrastructure.deduplication.CommandDeduplicationBase.DeduplicationFeatures +import com.daml.ledger.api.testtool.infrastructure.deduplication.{ + CommandDeduplicationBase, + KVCommandDeduplicationBase, +} import scala.concurrent.duration.FiniteDuration @@ -17,5 +21,9 @@ class KVCommandDeduplicationIT(timeoutScaleFactor: Double, ledgerTimeInterval: F override def testNamingPrefix: String = "KVCommandDeduplication" - override protected def isAppendOnly: Boolean = false + override def deduplicationFeatures: CommandDeduplicationBase.DeduplicationFeatures = + DeduplicationFeatures( + participantDeduplication = true, + appendOnlySchema = false, + ) } From a47250cbc79a9dda5c6537b3264c13206fa064ec Mon Sep 17 00:00:00 2001 From: Nicu Reut Date: Mon, 13 Sep 2021 17:11:46 +0200 Subject: [PATCH 02/14] Fix broken test and use consistency for tests --- .../CommandDeduplicationBase.scala | 27 +++++++++++-------- .../KVCommandDeduplicationBase.scala | 6 ++--- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/CommandDeduplicationBase.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/CommandDeduplicationBase.scala index fbec413a51da..1990d21be5fa 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/CommandDeduplicationBase.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/CommandDeduplicationBase.scala @@ -76,7 +76,7 @@ private[testtool] abstract class CommandDeduplicationBase( // Submit command A (first deduplication window) // Note: the second submit() in this block is deduplicated and thus rejected by the ledger API server, // only one submission is therefore sent to the ledger. - completion1 <- submitRequestAndAssertAccepted(ledger)(requestA1, party) + completion1 <- submitRequestAndAssertCompletionAccepted(ledger)(requestA1, party) _ <- submitRequestAndAssertDeduplication(ledger)(requestA1) // Wait until the end of first deduplication window _ <- Delayed.by(deduplicationWait)(()) @@ -86,7 +86,7 @@ private[testtool] abstract class CommandDeduplicationBase( // the ledger API server and the ledger itself, since the test waited more than // `deduplicationSeconds` after receiving the first command *completion*. // The first submit() in this block should therefore lead to an accepted transaction. - completion2 <- submitRequestAndAssertAccepted(ledger)(requestA2, party) + completion2 <- submitRequestAndAssertCompletionAccepted(ledger)(requestA2, party) _ <- submitRequestAndAssertDeduplication(ledger)(requestA2, party) // Inspect created contracts activeContracts <- ledger.activeContracts(party) @@ -120,10 +120,10 @@ private[testtool] abstract class CommandDeduplicationBase( for { // Submit an invalid command (should fail with INVALID_ARGUMENT) - _ <- submitRequestAndAssertCompletionStatus(ledger)(requestA, Code.INVALID_ARGUMENT) + _ <- submitRequestAndAssertSyncFailure(ledger)(requestA, Code.INVALID_ARGUMENT) // Re-submit the invalid command (should again fail with INVALID_ARGUMENT and not with ALREADY_EXISTS) - _ <- submitRequestAndAssertCompletionStatus(ledger)(requestA, Code.INVALID_ARGUMENT) + _ <- submitRequestAndAssertSyncFailure(ledger)(requestA, Code.INVALID_ARGUMENT) } yield {} }) @@ -300,7 +300,7 @@ private[testtool] abstract class CommandDeduplicationBase( } }) - def submitRequestAndAssertAccepted( + def submitRequestAndAssertCompletionAccepted( ledger: ParticipantTestContext )(request: SubmitRequest, parties: Party*)(implicit ec: ExecutionContext): Future[Completion] = { submitRequestAndAssertCompletionStatus(ledger)(request, Code.OK, parties: _*) @@ -314,7 +314,7 @@ private[testtool] abstract class CommandDeduplicationBase( if (deduplicationFeatures.participantDeduplication) { submitRequestAndAssertSyncDeduplication(ledger, request) } else { - submitRequestAndAssertAsyncDeduplication(ledger, request, parties: _*) + submitRequestAndAssertAsyncDeduplication(ledger)(request, parties: _*) .map(_ => ()) } } @@ -322,13 +322,18 @@ private[testtool] abstract class CommandDeduplicationBase( protected def submitRequestAndAssertSyncDeduplication( ledger: ParticipantTestContext, request: SubmitRequest, - )(implicit ec: ExecutionContext): Future[Unit] = ledger + )(implicit ec: ExecutionContext): Future[Unit] = + submitRequestAndAssertSyncFailure(ledger)(request, Code.ALREADY_EXISTS) + + private def submitRequestAndAssertSyncFailure(ledger: ParticipantTestContext)( + request: SubmitRequest, + code: Code, + )(implicit ec: ExecutionContext) = ledger .submit(request) - .mustFail(s"Request expected to fail with code ${Code.ALREADY_EXISTS}") - .map(assertGrpcError(_, Code.ALREADY_EXISTS, None)) + .mustFail(s"Request expected to fail with code $code") + .map(assertGrpcError(_, code, None)) - protected def submitRequestAndAssertAsyncDeduplication( - ledger: ParticipantTestContext, + protected def submitRequestAndAssertAsyncDeduplication(ledger: ParticipantTestContext)( request: SubmitRequest, parties: Party* )(implicit ec: ExecutionContext): Future[Completion] = submitRequestAndAssertCompletionStatus( diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/KVCommandDeduplicationBase.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/KVCommandDeduplicationBase.scala index cd259d635016..24bbcad0149c 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/KVCommandDeduplicationBase.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/KVCommandDeduplicationBase.scala @@ -49,18 +49,18 @@ abstract class KVCommandDeduplicationBase( ) runWithConfig(configuredParticipants) { (maxDeduplicationDuration, minSkew) => for { - completion1 <- submitRequestAndAssertAccepted(ledger)(request, party) + completion1 <- submitRequestAndAssertCompletionAccepted(ledger)(request, party) // participant side deduplication, sync result _ <- submitRequestAndAssertSyncDeduplication(ledger, request) // Wait for the end of participant deduplication // We also add min skew to also validate the committer deduplication duration _ <- Delayed.by(deduplicationDuration.plus(minSkew))(()) // Validate committer deduplication - duplicateCompletion <- submitRequestAndAssertAsyncDeduplication(ledger, request, party) + duplicateCompletion <- submitRequestAndAssertAsyncDeduplication(ledger)(request, party) // Wait for the end of committer deduplication, we already waited for minSkew _ <- Delayed.by(maxDeduplicationDuration.minus(deduplicationDuration))(()) // Deduplication has finished - completion2 <- submitRequestAndAssertAccepted(ledger)(request, party) + completion2 <- submitRequestAndAssertCompletionAccepted(ledger)(request, party) // Inspect created contracts activeContracts <- ledger.activeContracts(party) } yield { From 149ce3531ad89f984be009262fe524178764c6a3 Mon Sep 17 00:00:00 2001 From: Nicu Reut Date: Mon, 13 Sep 2021 18:12:09 +0200 Subject: [PATCH 03/14] ledger-api-test-tool - Add conformance test for parallel command deduplication CHANGELOG_BEGIN CHANGELOG_END --- .../CommandDeduplicationBase.scala | 48 +++++++++++++++++-- 1 file changed, 45 insertions(+), 3 deletions(-) diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/CommandDeduplicationBase.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/CommandDeduplicationBase.scala index 1990d21be5fa..04f888da2213 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/CommandDeduplicationBase.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/CommandDeduplicationBase.scala @@ -5,6 +5,7 @@ package com.daml.ledger.api.testtool.infrastructure.deduplication import java.util.UUID +import com.daml.grpc.GrpcException import com.daml.ledger.api.testtool.infrastructure.Allocation._ import com.daml.ledger.api.testtool.infrastructure.Assertions.{assertGrpcError, assertSingleton, _} import com.daml.ledger.api.testtool.infrastructure.LedgerTestSuite @@ -300,6 +301,33 @@ private[testtool] abstract class CommandDeduplicationBase( } }) + test( + s"${testNamingPrefix}DeduplicateParallelSubmissions", + "Commands submitted at the same, in parallel, should be deduplicated", + allocate(SingleParty), + )(implicit ec => { case Participants(Participant(ledger, party)) => + lazy val request = ledger + .submitRequest(party, DummyWithAnnotation(party, "Duplicate").create.command) + .update( + _.commands.deduplicationPeriod := DeduplicationPeriod.DeduplicationTime( + deduplicationDuration.asProtobuf + ) + ) + val numberOfParallelRequests = 10 + Future + .traverse(Seq.fill(numberOfParallelRequests)(request))(request => { + submitRequestAndGetStatusCode(ledger)(request, party) + }) + .map(_.groupMapReduce(identity)(_ => 1)(_ + _)) + .map(responses => { + val expectedDuplicateResponses = numberOfParallelRequests - 1 + assert( + responses == Map(Code.ALREADY_EXISTS -> expectedDuplicateResponses, Code.OK -> 1), + s"Expected $expectedDuplicateResponses duplicate responses and one accepted, got $responses", + ) + }) + }) + def submitRequestAndAssertCompletionAccepted( ledger: ParticipantTestContext )(request: SubmitRequest, parties: Party*)(implicit ec: ExecutionContext): Future[Completion] = { @@ -311,12 +339,26 @@ private[testtool] abstract class CommandDeduplicationBase( )(request: SubmitRequest, parties: Party*)(implicit ec: ExecutionContext ): Future[Unit] = { - if (deduplicationFeatures.participantDeduplication) { + if (deduplicationFeatures.participantDeduplication) submitRequestAndAssertSyncDeduplication(ledger, request) - } else { + else submitRequestAndAssertAsyncDeduplication(ledger)(request, parties: _*) .map(_ => ()) - } + } + + def submitRequestAndGetStatusCode( + ledger: ParticipantTestContext + )(request: SubmitRequest, parties: Party*)(implicit ec: ExecutionContext): Future[Code] = { + if (deduplicationFeatures.participantDeduplication) + ledger.submit(request).map(_ => Code.OK).recover { + case GrpcException(status, _) => + Status.fromCodeValue(status.getCode.value()).getCode + case otherException => fail("Not a GRPC exception", otherException) + } + else + submitRequestAndFindCompletion(ledger)(request, parties: _*).map(completion => + Status.fromCodeValue(completion.getStatus.code).getCode + ) } protected def submitRequestAndAssertSyncDeduplication( From 0a9eb8d013bb6215025baa0778c38438777c9e7b Mon Sep 17 00:00:00 2001 From: Nicu Reut Date: Mon, 13 Sep 2021 21:23:05 +0200 Subject: [PATCH 04/14] Add import for 2.12 compat --- .../infrastructure/deduplication/CommandDeduplicationBase.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/CommandDeduplicationBase.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/CommandDeduplicationBase.scala index 1777e2c04eba..eee1058f7b79 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/CommandDeduplicationBase.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/CommandDeduplicationBase.scala @@ -24,6 +24,7 @@ import io.grpc.Status import io.grpc.Status.Code import scala.annotation.nowarn +import scala.collection.compat._ import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} From 4e04f058fc91565b0245c3aaf90e29fc335a8b52 Mon Sep 17 00:00:00 2001 From: Nicu Reut Date: Tue, 14 Sep 2021 08:37:42 +0200 Subject: [PATCH 05/14] Add silencer plugin --- ledger/ledger-api-test-tool/BUILD.bazel | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ledger/ledger-api-test-tool/BUILD.bazel b/ledger/ledger-api-test-tool/BUILD.bazel index 709770324e5a..f7a6c24d45c4 100644 --- a/ledger/ledger-api-test-tool/BUILD.bazel +++ b/ledger/ledger-api-test-tool/BUILD.bazel @@ -73,6 +73,9 @@ da_scala_binary( da_scala_library( name = "ledger-api-test-tool-%s-lib" % lf_version, srcs = glob(["src/main/scala/com/daml/ledger/api/testtool/infrastructure/**/*.scala"]), + plugins = [ + silencer_plugin, + ], scala_deps = [ "@maven//:com_typesafe_akka_akka_actor", "@maven//:com_typesafe_akka_akka_stream", From 0849a1d457b14f0f347a2c710bc539ded8322c60 Mon Sep 17 00:00:00 2001 From: Nicu Reut Date: Tue, 14 Sep 2021 11:36:36 +0200 Subject: [PATCH 06/14] Split parallel command deduplication scenario into it's own test suite --- .../CommandDeduplicationBase.scala | 44 --------- .../CommandDeduplicationParallelIT.scala | 91 +++++++++++++++++++ 2 files changed, 91 insertions(+), 44 deletions(-) create mode 100644 ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationParallelIT.scala diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/CommandDeduplicationBase.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/CommandDeduplicationBase.scala index eee1058f7b79..857a50683257 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/CommandDeduplicationBase.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/deduplication/CommandDeduplicationBase.scala @@ -5,7 +5,6 @@ package com.daml.ledger.api.testtool.infrastructure.deduplication import java.util.UUID -import com.daml.grpc.GrpcException import com.daml.ledger.api.testtool.infrastructure.Allocation._ import com.daml.ledger.api.testtool.infrastructure.Assertions.{assertGrpcError, assertSingleton, _} import com.daml.ledger.api.testtool.infrastructure.LedgerTestSuite @@ -24,7 +23,6 @@ import io.grpc.Status import io.grpc.Status.Code import scala.annotation.nowarn -import scala.collection.compat._ import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} @@ -332,33 +330,6 @@ private[testtool] abstract class CommandDeduplicationBase( } }) - test( - s"${testNamingPrefix}DeduplicateParallelSubmissions", - "Commands submitted at the same, in parallel, should be deduplicated", - allocate(SingleParty), - )(implicit ec => { case Participants(Participant(ledger, party)) => - lazy val request = ledger - .submitRequest(party, DummyWithAnnotation(party, "Duplicate").create.command) - .update( - _.commands.deduplicationPeriod := DeduplicationPeriod.DeduplicationTime( - deduplicationDuration.asProtobuf - ) - ) - val numberOfParallelRequests = 10 - Future - .traverse(Seq.fill(numberOfParallelRequests)(request))(request => { - submitRequestAndGetStatusCode(ledger)(request, party) - }) - .map(_.groupMapReduce(identity)(_ => 1)(_ + _)) - .map(responses => { - val expectedDuplicateResponses = numberOfParallelRequests - 1 - assert( - responses == Map(Code.ALREADY_EXISTS -> expectedDuplicateResponses, Code.OK -> 1), - s"Expected $expectedDuplicateResponses duplicate responses and one accepted, got $responses", - ) - }) - }) - def submitRequestAndAssertCompletionAccepted( ledger: ParticipantTestContext )(request: SubmitRequest, parties: Party*)(implicit ec: ExecutionContext): Future[Completion] = { @@ -377,21 +348,6 @@ private[testtool] abstract class CommandDeduplicationBase( .map(_ => ()) } - def submitRequestAndGetStatusCode( - ledger: ParticipantTestContext - )(request: SubmitRequest, parties: Party*)(implicit ec: ExecutionContext): Future[Code] = { - if (deduplicationFeatures.participantDeduplication) - ledger.submit(request).map(_ => Code.OK).recover { - case GrpcException(status, _) => - Status.fromCodeValue(status.getCode.value()).getCode - case otherException => fail("Not a GRPC exception", otherException) - } - else - submitRequestAndFindCompletion(ledger)(request, parties: _*).map(completion => - Status.fromCodeValue(completion.getStatus.code).getCode - ) - } - protected def submitRequestAndAssertSyncDeduplication( ledger: ParticipantTestContext, request: SubmitRequest, diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationParallelIT.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationParallelIT.scala new file mode 100644 index 000000000000..5c897d0bc8b6 --- /dev/null +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationParallelIT.scala @@ -0,0 +1,91 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.api.testtool.suites + +import java.util.UUID + +import com.daml.grpc.GrpcException +import com.daml.ledger.api.testtool.infrastructure.Allocation.{ + Participant, + Participants, + SingleParty, + allocate, +} +import com.daml.ledger.api.testtool.infrastructure.Assertions.fail +import com.daml.ledger.api.testtool.infrastructure.LedgerTestSuite +import com.daml.ledger.api.testtool.infrastructure.ProtobufConverters._ +import com.daml.ledger.api.testtool.infrastructure.participant.ParticipantTestContext +import com.daml.ledger.api.v1.command_submission_service.SubmitRequest +import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod +import com.daml.ledger.client.binding.Primitive.Party +import com.daml.ledger.test.model.Test.DummyWithAnnotation +import io.grpc.Status +import io.grpc.Status.Code + +import scala.collection.compat._ +import scala.concurrent.duration._ +import scala.concurrent.{ExecutionContext, Future} + +/** Should be enabled for ledgers that fill the submission id in the completions, + * as we need to use the submission id to find completions for parallel submissions + */ +class CommandDeduplicationParallelIT extends LedgerTestSuite { + + test( + s"DeduplicateParallelSubmissions", + "Commands submitted at the same, in parallel, should be deduplicated", + allocate(SingleParty), + )(implicit ec => { case Participants(Participant(ledger, party)) => + val deduplicationDuration = 3.seconds + lazy val request = ledger + .submitRequest(party, DummyWithAnnotation(party, "Duplicate").create.command) + .update( + _.commands.deduplicationPeriod := DeduplicationPeriod.DeduplicationDuration( + deduplicationDuration.asProtobuf + ) + ) + val numberOfParallelRequests = 10 + Future + .traverse(Seq.fill(numberOfParallelRequests)(request))(request => { + submitRequestAndGetStatusCode(ledger)(request, party) + }) + .map(_.groupMapReduce(identity)(_ => 1)(_ + _)) + .map(responses => { + val expectedDuplicateResponses = numberOfParallelRequests - 1 + val okResponses = responses.getOrElse(Code.OK, 0) + val alreadyExistsResponses = responses.getOrElse(Code.ALREADY_EXISTS, 0) + // Because of the way participant deduplication currently works, requests can fail with duplicate keys for command deduplication + val internalResponses = responses.getOrElse(Code.INTERNAL, 0) + // Canton can return ABORTED for duplicate submissions + val abortedResponses = responses.getOrElse(Code.ABORTED, 0) + val duplicateResponses = + alreadyExistsResponses + internalResponses + abortedResponses + assert( + okResponses == 1 && duplicateResponses == numberOfParallelRequests - 1, + s"Expected $expectedDuplicateResponses duplicate responses and one accepted, got $responses", + ) + }) + }) + + protected def submitRequestAndGetStatusCode( + ledger: ParticipantTestContext + )(request: SubmitRequest, parties: Party*)(implicit ec: ExecutionContext): Future[Code] = { + val submissionId = UUID.randomUUID().toString + val requestWithSubmissionId = request.update(_.commands.submissionId := submissionId) + ledger + .submit(requestWithSubmissionId) + .flatMap(_ => ledger.findCompletion(parties: _*)(_.submissionId == submissionId)) + .map { + case Some(completion) => + completion.getStatus.code + case None => fail(s"Did not find completion for request with submission id $submissionId") + } + .recover { + case GrpcException(status, _) => + status.getCode.value() + case otherException => fail("Not a GRPC exception", otherException) + } + .map(codeValue => Status.fromCodeValue(codeValue).getCode) + } +} From 52fa712c2ee2b7c29c4c337dcfa70d3baafa0b35 Mon Sep 17 00:00:00 2001 From: Nicu Reut Date: Tue, 14 Sep 2021 12:33:08 +0200 Subject: [PATCH 07/14] Add the parallel command deduplication test to append only ledgers --- .../testtool/suites/CommandDeduplicationParallelIT.scala | 3 +++ .../scala/com/daml/ledger/api/testtool/tests/Tests.scala | 3 ++- ledger/ledger-on-sql/BUILD.bazel | 6 +++--- ledger/sandbox/BUILD.bazel | 2 +- 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationParallelIT.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationParallelIT.scala index 5c897d0bc8b6..8ffc9ac3b5fb 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationParallelIT.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationParallelIT.scala @@ -36,6 +36,7 @@ class CommandDeduplicationParallelIT extends LedgerTestSuite { s"DeduplicateParallelSubmissions", "Commands submitted at the same, in parallel, should be deduplicated", allocate(SingleParty), + runConcurrently = false, )(implicit ec => { case Participants(Participant(ledger, party)) => val deduplicationDuration = 3.seconds lazy val request = ledger @@ -78,11 +79,13 @@ class CommandDeduplicationParallelIT extends LedgerTestSuite { .flatMap(_ => ledger.findCompletion(parties: _*)(_.submissionId == submissionId)) .map { case Some(completion) => + println(s"Found completions $completion") completion.getStatus.code case None => fail(s"Did not find completion for request with submission id $submissionId") } .recover { case GrpcException(status, _) => + println(s"Exception status $status") status.getCode.value() case otherException => fail("Not a GRPC exception", otherException) } diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/tests/Tests.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/tests/Tests.scala index bd6a6b8fb9f9..2a61f8bf7031 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/tests/Tests.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/tests/Tests.scala @@ -69,8 +69,9 @@ object Tests { new AppendOnlyCompletionDeduplicationInfoIT(CommandService), new AppendOnlyCompletionDeduplicationInfoIT(CommandSubmissionService), new AppendOnlyKVCommandDeduplicationIT(timeoutScaleFactor, ledgerClockGranularity), - new KVCommandDeduplicationIT(timeoutScaleFactor, ledgerClockGranularity), new ContractIdIT, + new CommandDeduplicationParallelIT, + new KVCommandDeduplicationIT(timeoutScaleFactor, ledgerClockGranularity), new MultiPartySubmissionIT, new ParticipantPruningIT, new MonotonicRecordTimeIT, diff --git a/ledger/ledger-on-sql/BUILD.bazel b/ledger/ledger-on-sql/BUILD.bazel index c7c4ae190f8e..f3f7ac9efce6 100644 --- a/ledger/ledger-on-sql/BUILD.bazel +++ b/ledger/ledger-on-sql/BUILD.bazel @@ -414,7 +414,7 @@ conformance_test( "--include=" + "AppendOnlyCompletionDeduplicationInfoITCommandService" + ",AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService" + - ",AppendOnlyKVCommandDeduplicationIT", + ",AppendOnlyKVCommandDeduplicationIT,CommandDeduplicationParallelIT", ], ) @@ -436,7 +436,7 @@ conformance_test( "--include=" + "AppendOnlyCompletionDeduplicationInfoITCommandService" + ",AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService" + - ",AppendOnlyKVCommandDeduplicationIT", + ",AppendOnlyKVCommandDeduplicationIT,CommandDeduplicationParallelIT", ], ) @@ -457,7 +457,7 @@ conformance_test( "--include=" + "AppendOnlyCompletionDeduplicationInfoITCommandService" + ",AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService" + - ",AppendOnlyKVCommandDeduplicationIT", + ",AppendOnlyKVCommandDeduplicationIT,CommandDeduplicationParallelIT", ], ) diff --git a/ledger/sandbox/BUILD.bazel b/ledger/sandbox/BUILD.bazel index c5dd59debc65..a9667a4ea4ce 100644 --- a/ledger/sandbox/BUILD.bazel +++ b/ledger/sandbox/BUILD.bazel @@ -351,7 +351,7 @@ server_conformance_test( "--include=" + "AppendOnlyCompletionDeduplicationInfoITCommandService" + ",AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService" + - ",AppendOnlyKVCommandDeduplicationIT", + ",AppendOnlyKVCommandDeduplicationIT,CommandDeduplicationParallelIT", ], ) From 40a72be3006c5bd4e86d566c55a4c00416bbea4e Mon Sep 17 00:00:00 2001 From: Nicu Reut Date: Tue, 14 Sep 2021 15:20:51 +0200 Subject: [PATCH 08/14] Run parallel command deduplication tests for append only ledgers --- ledger/daml-on-sql/BUILD.bazel | 1 + ledger/ledger-on-memory/BUILD.bazel | 1 + ledger/ledger-on-sql/BUILD.bazel | 5 +++++ ledger/sandbox-classic/BUILD.bazel | 2 ++ ledger/sandbox/BUILD.bazel | 1 + 5 files changed, 10 insertions(+) diff --git a/ledger/daml-on-sql/BUILD.bazel b/ledger/daml-on-sql/BUILD.bazel index 362189237288..64d4e590b838 100644 --- a/ledger/daml-on-sql/BUILD.bazel +++ b/ledger/daml-on-sql/BUILD.bazel @@ -131,6 +131,7 @@ conformance_test( "--additional=MultiPartySubmissionIT", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandService", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService", + "--additional=CommandDeduplicationParallelIT", ], ) diff --git a/ledger/ledger-on-memory/BUILD.bazel b/ledger/ledger-on-memory/BUILD.bazel index 3f8e97a7e6ec..e11ff2edd263 100644 --- a/ledger/ledger-on-memory/BUILD.bazel +++ b/ledger/ledger-on-memory/BUILD.bazel @@ -221,6 +221,7 @@ conformance_test( # The following two tests don't actually care about multi-participant but they do need append-only. "--additional=AppendOnlyCompletionDeduplicationInfoITCommandService", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService", + "--additional=CommandDeduplicationParallelIT", "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant "--exclude=ConfigManagementServiceIT", ], diff --git a/ledger/ledger-on-sql/BUILD.bazel b/ledger/ledger-on-sql/BUILD.bazel index f59d147fbbb8..f24289fe93cc 100644 --- a/ledger/ledger-on-sql/BUILD.bazel +++ b/ledger/ledger-on-sql/BUILD.bazel @@ -322,6 +322,7 @@ conformance_test( "--additional=ParticipantPruningIT", "--additional=MultiPartySubmissionIT", "--additional=AppendOnlyKVCommandDeduplicationIT", + "--additional=CommandDeduplicationParallelIT", "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant # Disable tests targeting only multi-participant setups "--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence", @@ -348,6 +349,7 @@ conformance_test( "--additional=ParticipantPruningIT", "--additional=MultiPartySubmissionIT", "--additional=AppendOnlyKVCommandDeduplicationIT", + "--additional=CommandDeduplicationParallelIT", "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant # Disable tests targeting only multi-participant setups "--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence", @@ -373,6 +375,7 @@ conformance_test( "--additional=ParticipantPruningIT", "--additional=MultiPartySubmissionIT", "--additional=AppendOnlyKVCommandDeduplicationIT", + "--additional=CommandDeduplicationParallelIT", "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant # Disable tests targeting only multi-participant setups "--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence", @@ -420,6 +423,7 @@ conformance_test( "--additional=ParticipantPruningIT", "--additional=MultiPartySubmissionIT", "--additional=AppendOnlyKVCommandDeduplicationIT", + "--additional=CommandDeduplicationParallelIT", "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant # Disable tests targeting only multi-participant setups "--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence", @@ -446,6 +450,7 @@ conformance_test( "--additional=ParticipantPruningIT", "--additional=MultiPartySubmissionIT", "--additional=AppendOnlyKVCommandDeduplicationIT", + "--additional=CommandDeduplicationParallelIT", "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant # Disable tests targeting only multi-participant setups "--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence", diff --git a/ledger/sandbox-classic/BUILD.bazel b/ledger/sandbox-classic/BUILD.bazel index 4c2f0f40fe22..b099afdb4c17 100644 --- a/ledger/sandbox-classic/BUILD.bazel +++ b/ledger/sandbox-classic/BUILD.bazel @@ -366,6 +366,7 @@ server_conformance_test( "--open-world", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandService", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService", + "--additional=CommandDeduplicationParallelIT", "--exclude=ClosedWorldIT", ], ) @@ -396,6 +397,7 @@ server_conformance_test( "--open-world", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandService", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService", + "--additional=CommandDeduplicationParallelIT", "--additional=ParticipantPruningIT", "--exclude=ClosedWorldIT", # Excluding tests that require using pruneAllDivulgedContracts option that is not supported by sandbox-classic diff --git a/ledger/sandbox/BUILD.bazel b/ledger/sandbox/BUILD.bazel index 50142c617abe..938a5f59025e 100644 --- a/ledger/sandbox/BUILD.bazel +++ b/ledger/sandbox/BUILD.bazel @@ -340,6 +340,7 @@ server_conformance_test( "--additional=AppendOnlyCompletionDeduplicationInfoITCommandService", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService", "--additional=AppendOnlyKVCommandDeduplicationIT", + "--additional=CommandDeduplicationParallelIT", "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant "--exclude=ClosedWorldIT", ], From 6941c0ef70abd3ff2cf84d6efeef8207e477ffdd Mon Sep 17 00:00:00 2001 From: nicu-da Date: Tue, 14 Sep 2021 07:51:14 -0700 Subject: [PATCH 09/14] Apply suggestions from code review Co-authored-by: fabiotudone-da --- .../api/testtool/suites/CommandDeduplicationParallelIT.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationParallelIT.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationParallelIT.scala index 8ffc9ac3b5fb..d7b6991413b4 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationParallelIT.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationParallelIT.scala @@ -27,7 +27,7 @@ import scala.collection.compat._ import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} -/** Should be enabled for ledgers that fill the submission id in the completions, +/** Should be enabled for ledgers that fill the submission ID in the completions, * as we need to use the submission id to find completions for parallel submissions */ class CommandDeduplicationParallelIT extends LedgerTestSuite { @@ -56,7 +56,7 @@ class CommandDeduplicationParallelIT extends LedgerTestSuite { val expectedDuplicateResponses = numberOfParallelRequests - 1 val okResponses = responses.getOrElse(Code.OK, 0) val alreadyExistsResponses = responses.getOrElse(Code.ALREADY_EXISTS, 0) - // Because of the way participant deduplication currently works, requests can fail with duplicate keys for command deduplication + // Participant-based command de-duplication can currently also reject duplicates via a SQL exception val internalResponses = responses.getOrElse(Code.INTERNAL, 0) // Canton can return ABORTED for duplicate submissions val abortedResponses = responses.getOrElse(Code.ABORTED, 0) From 64ab5e444e7cded69f5c1e4b4c6fe98a433ee0a1 Mon Sep 17 00:00:00 2001 From: Nicu Reut Date: Tue, 14 Sep 2021 16:59:37 +0200 Subject: [PATCH 10/14] Code review renames --- ledger/daml-on-sql/BUILD.bazel | 2 +- ... => AppendOnlyCommandDeduplicationParallelIT.scala} | 9 +++------ .../com/daml/ledger/api/testtool/tests/Tests.scala | 2 +- ledger/ledger-on-memory/BUILD.bazel | 2 +- ledger/ledger-on-sql/BUILD.bazel | 10 +++++----- ledger/sandbox-classic/BUILD.bazel | 4 ++-- ledger/sandbox/BUILD.bazel | 2 +- 7 files changed, 14 insertions(+), 17 deletions(-) rename ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/{CommandDeduplicationParallelIT.scala => AppendOnlyCommandDeduplicationParallelIT.scala} (93%) diff --git a/ledger/daml-on-sql/BUILD.bazel b/ledger/daml-on-sql/BUILD.bazel index 64d4e590b838..afd9f612dfb3 100644 --- a/ledger/daml-on-sql/BUILD.bazel +++ b/ledger/daml-on-sql/BUILD.bazel @@ -129,9 +129,9 @@ conformance_test( test_tool_args = [ "--verbose", "--additional=MultiPartySubmissionIT", + "--additional=AppendOnlyCommandDeduplicationParallelIT", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandService", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService", - "--additional=CommandDeduplicationParallelIT", ], ) diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationParallelIT.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/AppendOnlyCommandDeduplicationParallelIT.scala similarity index 93% rename from ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationParallelIT.scala rename to ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/AppendOnlyCommandDeduplicationParallelIT.scala index d7b6991413b4..76bdd302f291 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationParallelIT.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/AppendOnlyCommandDeduplicationParallelIT.scala @@ -23,14 +23,13 @@ import com.daml.ledger.test.model.Test.DummyWithAnnotation import io.grpc.Status import io.grpc.Status.Code -import scala.collection.compat._ import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} /** Should be enabled for ledgers that fill the submission ID in the completions, * as we need to use the submission id to find completions for parallel submissions */ -class CommandDeduplicationParallelIT extends LedgerTestSuite { +class AppendOnlyCommandDeduplicationParallelIT extends LedgerTestSuite { test( s"DeduplicateParallelSubmissions", @@ -39,6 +38,7 @@ class CommandDeduplicationParallelIT extends LedgerTestSuite { runConcurrently = false, )(implicit ec => { case Participants(Participant(ledger, party)) => val deduplicationDuration = 3.seconds + val numberOfParallelRequests = 10 lazy val request = ledger .submitRequest(party, DummyWithAnnotation(party, "Duplicate").create.command) .update( @@ -46,12 +46,11 @@ class CommandDeduplicationParallelIT extends LedgerTestSuite { deduplicationDuration.asProtobuf ) ) - val numberOfParallelRequests = 10 Future .traverse(Seq.fill(numberOfParallelRequests)(request))(request => { submitRequestAndGetStatusCode(ledger)(request, party) }) - .map(_.groupMapReduce(identity)(_ => 1)(_ + _)) + .map(_.groupBy(identity).view.mapValues(_.size).toMap) .map(responses => { val expectedDuplicateResponses = numberOfParallelRequests - 1 val okResponses = responses.getOrElse(Code.OK, 0) @@ -79,13 +78,11 @@ class CommandDeduplicationParallelIT extends LedgerTestSuite { .flatMap(_ => ledger.findCompletion(parties: _*)(_.submissionId == submissionId)) .map { case Some(completion) => - println(s"Found completions $completion") completion.getStatus.code case None => fail(s"Did not find completion for request with submission id $submissionId") } .recover { case GrpcException(status, _) => - println(s"Exception status $status") status.getCode.value() case otherException => fail("Not a GRPC exception", otherException) } diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/tests/Tests.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/tests/Tests.scala index 2a61f8bf7031..c46a4b79ac5b 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/tests/Tests.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/tests/Tests.scala @@ -69,8 +69,8 @@ object Tests { new AppendOnlyCompletionDeduplicationInfoIT(CommandService), new AppendOnlyCompletionDeduplicationInfoIT(CommandSubmissionService), new AppendOnlyKVCommandDeduplicationIT(timeoutScaleFactor, ledgerClockGranularity), + new AppendOnlyCommandDeduplicationParallelIT, new ContractIdIT, - new CommandDeduplicationParallelIT, new KVCommandDeduplicationIT(timeoutScaleFactor, ledgerClockGranularity), new MultiPartySubmissionIT, new ParticipantPruningIT, diff --git a/ledger/ledger-on-memory/BUILD.bazel b/ledger/ledger-on-memory/BUILD.bazel index e11ff2edd263..75b47d3c5389 100644 --- a/ledger/ledger-on-memory/BUILD.bazel +++ b/ledger/ledger-on-memory/BUILD.bazel @@ -219,9 +219,9 @@ conformance_test( "--additional=ParticipantPruningIT", "--additional=AppendOnlyKVCommandDeduplicationIT", # The following two tests don't actually care about multi-participant but they do need append-only. + "--additional=AppendOnlyCommandDeduplicationParallelIT", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandService", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService", - "--additional=CommandDeduplicationParallelIT", "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant "--exclude=ConfigManagementServiceIT", ], diff --git a/ledger/ledger-on-sql/BUILD.bazel b/ledger/ledger-on-sql/BUILD.bazel index f24289fe93cc..bdb6c1fb957a 100644 --- a/ledger/ledger-on-sql/BUILD.bazel +++ b/ledger/ledger-on-sql/BUILD.bazel @@ -321,8 +321,8 @@ conformance_test( "--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService", "--additional=ParticipantPruningIT", "--additional=MultiPartySubmissionIT", + "--additional=AppendOnlyCommandDeduplicationParallelIT", "--additional=AppendOnlyKVCommandDeduplicationIT", - "--additional=CommandDeduplicationParallelIT", "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant # Disable tests targeting only multi-participant setups "--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence", @@ -348,8 +348,8 @@ conformance_test( "--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService", "--additional=ParticipantPruningIT", "--additional=MultiPartySubmissionIT", + "--additional=AppendOnlyCommandDeduplicationParallelIT", "--additional=AppendOnlyKVCommandDeduplicationIT", - "--additional=CommandDeduplicationParallelIT", "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant # Disable tests targeting only multi-participant setups "--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence", @@ -374,8 +374,8 @@ conformance_test( "--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService", "--additional=ParticipantPruningIT", "--additional=MultiPartySubmissionIT", + "--additional=AppendOnlyCommandDeduplicationParallelIT", "--additional=AppendOnlyKVCommandDeduplicationIT", - "--additional=CommandDeduplicationParallelIT", "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant # Disable tests targeting only multi-participant setups "--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence", @@ -422,8 +422,8 @@ conformance_test( "--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService", "--additional=ParticipantPruningIT", "--additional=MultiPartySubmissionIT", + "--additional=AppendOnlyCommandDeduplicationParallelIT", "--additional=AppendOnlyKVCommandDeduplicationIT", - "--additional=CommandDeduplicationParallelIT", "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant # Disable tests targeting only multi-participant setups "--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence", @@ -449,8 +449,8 @@ conformance_test( "--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService", "--additional=ParticipantPruningIT", "--additional=MultiPartySubmissionIT", + "--additional=AppendOnlyCommandDeduplicationParallelIT", "--additional=AppendOnlyKVCommandDeduplicationIT", - "--additional=CommandDeduplicationParallelIT", "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant # Disable tests targeting only multi-participant setups "--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence", diff --git a/ledger/sandbox-classic/BUILD.bazel b/ledger/sandbox-classic/BUILD.bazel index b099afdb4c17..19f95e478049 100644 --- a/ledger/sandbox-classic/BUILD.bazel +++ b/ledger/sandbox-classic/BUILD.bazel @@ -364,9 +364,9 @@ server_conformance_test( servers = ONLY_POSTGRES_SERVER, test_tool_args = [ "--open-world", + "--additional=AppendOnlyCommandDeduplicationParallelIT", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandService", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService", - "--additional=CommandDeduplicationParallelIT", "--exclude=ClosedWorldIT", ], ) @@ -395,9 +395,9 @@ server_conformance_test( servers = ONLY_POSTGRES_SERVER, test_tool_args = [ "--open-world", + "--additional=AppendOnlyCommandDeduplicationParallelIT", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandService", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService", - "--additional=CommandDeduplicationParallelIT", "--additional=ParticipantPruningIT", "--exclude=ClosedWorldIT", # Excluding tests that require using pruneAllDivulgedContracts option that is not supported by sandbox-classic diff --git a/ledger/sandbox/BUILD.bazel b/ledger/sandbox/BUILD.bazel index 938a5f59025e..b7ea56da4a7b 100644 --- a/ledger/sandbox/BUILD.bazel +++ b/ledger/sandbox/BUILD.bazel @@ -337,10 +337,10 @@ server_conformance_test( servers = {"postgresql": NEXT_SERVERS["postgresql"]}, test_tool_args = [ "--open-world", + "--additional=AppendOnlyCommandDeduplicationParallelIT", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandService", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService", "--additional=AppendOnlyKVCommandDeduplicationIT", - "--additional=CommandDeduplicationParallelIT", "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant "--exclude=ClosedWorldIT", ], From 7b3fd9fa07e24220ce7f48c3c756ff1c90bc7263 Mon Sep 17 00:00:00 2001 From: Nicu Reut Date: Tue, 14 Sep 2021 17:04:56 +0200 Subject: [PATCH 11/14] Add conformance test for command deduplication using the CommandService CHANGELOG_BEGIN CHANGELOG_END --- ...ndOnlyCommandDeduplicationParallelIT.scala | 60 ++++++++++++++++--- 1 file changed, 53 insertions(+), 7 deletions(-) diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/AppendOnlyCommandDeduplicationParallelIT.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/AppendOnlyCommandDeduplicationParallelIT.scala index 76bdd302f291..8aa1f6e89a52 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/AppendOnlyCommandDeduplicationParallelIT.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/AppendOnlyCommandDeduplicationParallelIT.scala @@ -16,10 +16,12 @@ import com.daml.ledger.api.testtool.infrastructure.Assertions.fail import com.daml.ledger.api.testtool.infrastructure.LedgerTestSuite import com.daml.ledger.api.testtool.infrastructure.ProtobufConverters._ import com.daml.ledger.api.testtool.infrastructure.participant.ParticipantTestContext +import com.daml.ledger.api.v1.command_service.SubmitAndWaitRequest import com.daml.ledger.api.v1.command_submission_service.SubmitRequest import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod +import com.daml.ledger.client.binding.Primitive import com.daml.ledger.client.binding.Primitive.Party -import com.daml.ledger.test.model.Test.DummyWithAnnotation +import com.daml.ledger.test.model.Test.{Dummy, DummyWithAnnotation} import io.grpc.Status import io.grpc.Status.Code @@ -32,13 +34,12 @@ import scala.concurrent.{ExecutionContext, Future} class AppendOnlyCommandDeduplicationParallelIT extends LedgerTestSuite { test( - s"DeduplicateParallelSubmissions", - "Commands submitted at the same, in parallel, should be deduplicated", + s"DeduplicateParallelSubmissionsUsingCommandSubmissionService", + "Commands submitted at the same, in parallel, should be deduplicated", allocate(SingleParty), runConcurrently = false, )(implicit ec => { case Participants(Participant(ledger, party)) => val deduplicationDuration = 3.seconds - val numberOfParallelRequests = 10 lazy val request = ledger .submitRequest(party, DummyWithAnnotation(party, "Duplicate").create.command) .update( @@ -46,9 +47,37 @@ class AppendOnlyCommandDeduplicationParallelIT extends LedgerTestSuite { deduplicationDuration.asProtobuf ) ) + runTestWithSubmission( + request, + submitRequestAndGetStatusCode(ledger)(_, party), + ) + }) + + test( + s"DeduplicateParallelSubmissionsUsingCommandService", + "Commands submitted at the same, in parallel, should be deduplicated", + allocate(SingleParty), + runConcurrently = false, + )(implicit ec => { case Participants(Participant(ledger, party)) => + val deduplicationDuration = 3.seconds + val request = ledger + .submitAndWaitRequest(party, Dummy(party).create.command) + .update( + _.commands.deduplicationDuration := deduplicationDuration.asProtobuf + ) + runTestWithSubmission( + request, + submitAndWaitRequestAndGetStatusCode(ledger)(_, party), + ) + }) + + private def runTestWithSubmission[T](request: T, submitRequestAndGetStatus: T => Future[Code])( + implicit ec: ExecutionContext + ) = { + val numberOfParallelRequests = 10 Future .traverse(Seq.fill(numberOfParallelRequests)(request))(request => { - submitRequestAndGetStatusCode(ledger)(request, party) + submitRequestAndGetStatus(request) }) .map(_.groupBy(identity).view.mapValues(_.size).toMap) .map(responses => { @@ -66,15 +95,32 @@ class AppendOnlyCommandDeduplicationParallelIT extends LedgerTestSuite { s"Expected $expectedDuplicateResponses duplicate responses and one accepted, got $responses", ) }) - }) + } + private def submitAndWaitRequestAndGetStatusCode( + ledger: ParticipantTestContext + )(request: SubmitAndWaitRequest, parties: Party*)(implicit ec: ExecutionContext) = { + val submissionId = UUID.randomUUID().toString + val requestWithSubmissionId = request.update(_.commands.submissionId := submissionId) + val submitResult = ledger.submitAndWait(requestWithSubmissionId) + submissionResultToFinalStatusCode(ledger)(submitResult, submissionId, parties: _*) + } protected def submitRequestAndGetStatusCode( ledger: ParticipantTestContext )(request: SubmitRequest, parties: Party*)(implicit ec: ExecutionContext): Future[Code] = { val submissionId = UUID.randomUUID().toString val requestWithSubmissionId = request.update(_.commands.submissionId := submissionId) - ledger + val submitResult = ledger .submit(requestWithSubmissionId) + submissionResultToFinalStatusCode(ledger)(submitResult, submissionId, parties: _*) + } + + private def submissionResultToFinalStatusCode( + ledger: ParticipantTestContext + )(submitResult: Future[Unit], submissionId: String, parties: Primitive.Party*)(implicit + ec: ExecutionContext + ) = { + submitResult .flatMap(_ => ledger.findCompletion(parties: _*)(_.submissionId == submissionId)) .map { case Some(completion) => From b70190cfff27bc09e7270dd2571fc365f4fadf71 Mon Sep 17 00:00:00 2001 From: Nicu Reut Date: Wed, 15 Sep 2021 14:39:16 +0200 Subject: [PATCH 12/14] Add type annotation --- .../suites/AppendOnlyCommandDeduplicationParallelIT.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/AppendOnlyCommandDeduplicationParallelIT.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/AppendOnlyCommandDeduplicationParallelIT.scala index 70284f9b0915..bd54173dafc0 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/AppendOnlyCommandDeduplicationParallelIT.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/AppendOnlyCommandDeduplicationParallelIT.scala @@ -47,7 +47,7 @@ class AppendOnlyCommandDeduplicationParallelIT extends LedgerTestSuite { deduplicationDuration.asProtobuf ) ) - runTestWithSubmission( + runTestWithSubmission[SubmitRequest]( request, submitRequestAndGetStatusCode(ledger)(_, party), ) @@ -65,7 +65,7 @@ class AppendOnlyCommandDeduplicationParallelIT extends LedgerTestSuite { .update( _.commands.deduplicationDuration := deduplicationDuration.asProtobuf ) - runTestWithSubmission( + runTestWithSubmission[SubmitAndWaitRequest]( request, submitAndWaitRequestAndGetStatusCode(ledger)(_, party), ) From 0e1eb3a67f5c48482e27d075ba3ca4bb6e70507a Mon Sep 17 00:00:00 2001 From: Nicu Reut Date: Thu, 16 Sep 2021 11:37:23 +0200 Subject: [PATCH 13/14] Add workaround for not receiving a response from the CommandService when using h2/oracle --- ...ndOnlyCommandDeduplicationParallelIT.scala | 57 ++++++++++++------- ledger/ledger-on-memory/BUILD.bazel | 1 + ledger/ledger-on-sql/BUILD.bazel | 23 ++++---- 3 files changed, 49 insertions(+), 32 deletions(-) diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/AppendOnlyCommandDeduplicationParallelIT.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/AppendOnlyCommandDeduplicationParallelIT.scala index bd54173dafc0..00ad8dd3c967 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/AppendOnlyCommandDeduplicationParallelIT.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/AppendOnlyCommandDeduplicationParallelIT.scala @@ -48,6 +48,8 @@ class AppendOnlyCommandDeduplicationParallelIT extends LedgerTestSuite { ) ) runTestWithSubmission[SubmitRequest]( + ledger, + party, request, submitRequestAndGetStatusCode(ledger)(_, party), ) @@ -66,35 +68,46 @@ class AppendOnlyCommandDeduplicationParallelIT extends LedgerTestSuite { _.commands.deduplicationDuration := deduplicationDuration.asProtobuf ) runTestWithSubmission[SubmitAndWaitRequest]( + ledger, + party, request, submitAndWaitRequestAndGetStatusCode(ledger)(_, party), ) }) - private def runTestWithSubmission[T](request: T, submitRequestAndGetStatus: T => Future[Code])( - implicit ec: ExecutionContext + private def runTestWithSubmission[T]( + ledger: ParticipantTestContext, + party: Party, + request: T, + submitRequestAndGetStatus: T => Future[Code], + )(implicit + ec: ExecutionContext ) = { val numberOfParallelRequests = 10 - Future - .traverse(Seq.fill(numberOfParallelRequests)(request))(request => { - submitRequestAndGetStatus(request) - }) - .map(_.groupBy(identity).view.mapValues(_.size).toMap) - .map(responses => { - val expectedDuplicateResponses = numberOfParallelRequests - 1 - val okResponses = responses.getOrElse(Code.OK, 0) - val alreadyExistsResponses = responses.getOrElse(Code.ALREADY_EXISTS, 0) - // Participant-based command de-duplication can currently also reject duplicates via a SQL exception - val internalResponses = responses.getOrElse(Code.INTERNAL, 0) - // Canton can return ABORTED for duplicate submissions - val abortedResponses = responses.getOrElse(Code.ABORTED, 0) - val duplicateResponses = - alreadyExistsResponses + internalResponses + abortedResponses - assert( - okResponses == 1 && duplicateResponses == numberOfParallelRequests - 1, - s"Expected $expectedDuplicateResponses duplicate responses and one accepted, got $responses", - ) - }) + for { + responses <- Future + .traverse(Seq.fill(numberOfParallelRequests)(request))(request => { + submitRequestAndGetStatus(request) + }) + .map(_.groupBy(identity).view.mapValues(_.size).toMap) + activeContracts <- ledger.activeContracts(party) + } yield { + val expectedDuplicateResponses = numberOfParallelRequests - 1 + val okResponses = responses.getOrElse(Code.OK, 0) + val alreadyExistsResponses = responses.getOrElse(Code.ALREADY_EXISTS, 0) + // Participant-based command de-duplication can currently also reject duplicates via a SQL exception when using the CommandSubmissionService + val internalResponses = responses.getOrElse(Code.INTERNAL, 0) + // Canton can return ABORTED for duplicate submissions + // Participant based command de-duplication can currently also return ABORTED when using the CommandService + val abortedResponses = responses.getOrElse(Code.ABORTED, 0) + val duplicateResponses = + alreadyExistsResponses + internalResponses + abortedResponses + assert( + okResponses == 1 && duplicateResponses == numberOfParallelRequests - 1, + s"Expected $expectedDuplicateResponses duplicate responses and one accepted, got $responses", + ) + assert(activeContracts.size == 1) + } } private def submitAndWaitRequestAndGetStatusCode( diff --git a/ledger/ledger-on-memory/BUILD.bazel b/ledger/ledger-on-memory/BUILD.bazel index 75b47d3c5389..7aa4ac080273 100644 --- a/ledger/ledger-on-memory/BUILD.bazel +++ b/ledger/ledger-on-memory/BUILD.bazel @@ -213,6 +213,7 @@ conformance_test( "--participant=participant-id=example1,port=6865", "--participant=participant-id=example2,port=6866", "--max-deduplication-duration=PT5S", + "--tracker-retention-period=PT5S", # lower the command timeout duration, this is a workaround DPP-609 ], test_tool_args = [ "--verbose", diff --git a/ledger/ledger-on-sql/BUILD.bazel b/ledger/ledger-on-sql/BUILD.bazel index bdb6c1fb957a..c3c2b79fffc5 100644 --- a/ledger/ledger-on-sql/BUILD.bazel +++ b/ledger/ledger-on-sql/BUILD.bazel @@ -317,12 +317,12 @@ conformance_test( tags = [], test_tool_args = [ "--verbose", + "--additional=AppendOnlyCommandDeduplicationParallelIT", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandService", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService", + "--additional=AppendOnlyKVCommandDeduplicationIT", "--additional=ParticipantPruningIT", "--additional=MultiPartySubmissionIT", - "--additional=AppendOnlyCommandDeduplicationParallelIT", - "--additional=AppendOnlyKVCommandDeduplicationIT", "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant # Disable tests targeting only multi-participant setups "--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence", @@ -340,16 +340,17 @@ conformance_test( "--mutable-contract-state-cache", "--jdbc-url=jdbc:h2:mem:daml-on-sql-conformance-test", "--max-deduplication-duration=PT5S", + "--tracker-retention-period=PT5S", # lower the command timeout duration, this is a workaround DPP-609 ], tags = [], test_tool_args = [ "--verbose", + "--additional=AppendOnlyCommandDeduplicationParallelIT", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandService", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService", + "--additional=AppendOnlyKVCommandDeduplicationIT", "--additional=ParticipantPruningIT", "--additional=MultiPartySubmissionIT", - "--additional=AppendOnlyCommandDeduplicationParallelIT", - "--additional=AppendOnlyKVCommandDeduplicationIT", "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant # Disable tests targeting only multi-participant setups "--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence", @@ -366,16 +367,17 @@ conformance_test( "--index-append-only-schema", "--mutable-contract-state-cache", "--max-deduplication-duration=PT5S", + "--tracker-retention-period=PT5S", # lower the command timeout duration, this is a workaround DPP-609 ], tags = [] if oracle_testing else ["manual"], test_tool_args = [ "--verbose", + "--additional=AppendOnlyCommandDeduplicationParallelIT", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandService", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService", + "--additional=AppendOnlyKVCommandDeduplicationIT", "--additional=ParticipantPruningIT", "--additional=MultiPartySubmissionIT", - "--additional=AppendOnlyCommandDeduplicationParallelIT", - "--additional=AppendOnlyKVCommandDeduplicationIT", "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant # Disable tests targeting only multi-participant setups "--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence", @@ -418,12 +420,12 @@ conformance_test( tags = [], test_tool_args = [ "--verbose", + "--additional=AppendOnlyCommandDeduplicationParallelIT", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandService", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService", + "--additional=AppendOnlyKVCommandDeduplicationIT", "--additional=ParticipantPruningIT", "--additional=MultiPartySubmissionIT", - "--additional=AppendOnlyCommandDeduplicationParallelIT", - "--additional=AppendOnlyKVCommandDeduplicationIT", "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant # Disable tests targeting only multi-participant setups "--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence", @@ -441,16 +443,17 @@ conformance_test( "--mutable-contract-state-cache", "--buffered-ledger-api-streams-unsafe", "--max-deduplication-duration=PT5S", + "--tracker-retention-period=PT5S", # lower the command timeout duration, this is a workaround DPP-609 ], tags = [] if oracle_testing else ["manual"], test_tool_args = [ "--verbose", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandService", + "--additional=AppendOnlyCommandDeduplicationParallelIT", "--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService", + "--additional=AppendOnlyKVCommandDeduplicationIT", "--additional=ParticipantPruningIT", "--additional=MultiPartySubmissionIT", - "--additional=AppendOnlyCommandDeduplicationParallelIT", - "--additional=AppendOnlyKVCommandDeduplicationIT", "--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant # Disable tests targeting only multi-participant setups "--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence", From d9178ad94d28887dab0856d3f0d6bfe664bff267 Mon Sep 17 00:00:00 2001 From: Nicu Reut Date: Thu, 16 Sep 2021 11:49:37 +0200 Subject: [PATCH 14/14] Format build file --- ledger/ledger-on-memory/BUILD.bazel | 2 +- ledger/ledger-on-sql/BUILD.bazel | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/ledger/ledger-on-memory/BUILD.bazel b/ledger/ledger-on-memory/BUILD.bazel index 7aa4ac080273..9f19e65dedab 100644 --- a/ledger/ledger-on-memory/BUILD.bazel +++ b/ledger/ledger-on-memory/BUILD.bazel @@ -213,7 +213,7 @@ conformance_test( "--participant=participant-id=example1,port=6865", "--participant=participant-id=example2,port=6866", "--max-deduplication-duration=PT5S", - "--tracker-retention-period=PT5S", # lower the command timeout duration, this is a workaround DPP-609 + "--tracker-retention-period=PT5S", # lower the command timeout duration, this is a workaround DPP-609 ], test_tool_args = [ "--verbose", diff --git a/ledger/ledger-on-sql/BUILD.bazel b/ledger/ledger-on-sql/BUILD.bazel index c3c2b79fffc5..9867ee8c13fc 100644 --- a/ledger/ledger-on-sql/BUILD.bazel +++ b/ledger/ledger-on-sql/BUILD.bazel @@ -340,7 +340,7 @@ conformance_test( "--mutable-contract-state-cache", "--jdbc-url=jdbc:h2:mem:daml-on-sql-conformance-test", "--max-deduplication-duration=PT5S", - "--tracker-retention-period=PT5S", # lower the command timeout duration, this is a workaround DPP-609 + "--tracker-retention-period=PT5S", # lower the command timeout duration, this is a workaround DPP-609 ], tags = [], test_tool_args = [ @@ -367,7 +367,7 @@ conformance_test( "--index-append-only-schema", "--mutable-contract-state-cache", "--max-deduplication-duration=PT5S", - "--tracker-retention-period=PT5S", # lower the command timeout duration, this is a workaround DPP-609 + "--tracker-retention-period=PT5S", # lower the command timeout duration, this is a workaround DPP-609 ], tags = [] if oracle_testing else ["manual"], test_tool_args = [ @@ -443,7 +443,7 @@ conformance_test( "--mutable-contract-state-cache", "--buffered-ledger-api-streams-unsafe", "--max-deduplication-duration=PT5S", - "--tracker-retention-period=PT5S", # lower the command timeout duration, this is a workaround DPP-609 + "--tracker-retention-period=PT5S", # lower the command timeout duration, this is a workaround DPP-609 ], tags = [] if oracle_testing else ["manual"], test_tool_args = [