Skip to content

Commit

Permalink
Add conformance test for command deduplication using the CommandServi…
Browse files Browse the repository at this point in the history
…ce [KVL-1099] (#10883)

* Add conformance test for command deduplication using the CommandService

CHANGELOG_BEGIN

CHANGELOG_END
  • Loading branch information
nicu-da authored Sep 16, 2021
1 parent 8a39118 commit f4adee9
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ import com.daml.ledger.api.testtool.infrastructure.Assertions.fail
import com.daml.ledger.api.testtool.infrastructure.LedgerTestSuite
import com.daml.ledger.api.testtool.infrastructure.ProtobufConverters._
import com.daml.ledger.api.testtool.infrastructure.participant.ParticipantTestContext
import com.daml.ledger.api.v1.command_service.SubmitAndWaitRequest
import com.daml.ledger.api.v1.command_submission_service.SubmitRequest
import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod
import com.daml.ledger.client.binding.Primitive
import com.daml.ledger.client.binding.Primitive.Party
import com.daml.ledger.test.model.Test.DummyWithAnnotation
import com.daml.ledger.test.model.Test.{Dummy, DummyWithAnnotation}
import io.grpc.Status
import io.grpc.Status.Code

Expand All @@ -33,48 +35,105 @@ import scala.concurrent.{ExecutionContext, Future}
class AppendOnlyCommandDeduplicationParallelIT extends LedgerTestSuite {

test(
s"DeduplicateParallelSubmissions",
"Commands submitted at the same, in parallel, should be deduplicated",
s"DeduplicateParallelSubmissionsUsingCommandSubmissionService",
"Commands submitted at the same, in parallel, using the CommandSubmissionService, should be deduplicated",
allocate(SingleParty),
)(implicit ec => { case Participants(Participant(ledger, party)) =>
val deduplicationDuration = 3.seconds
val numberOfParallelRequests = 10
lazy val request = ledger
.submitRequest(party, DummyWithAnnotation(party, "Duplicate").create.command)
.update(
_.commands.deduplicationPeriod := DeduplicationPeriod.DeduplicationDuration(
deduplicationDuration.asProtobuf
)
)
Future
.traverse(Seq.fill(numberOfParallelRequests)(request))(request => {
submitRequestAndGetStatusCode(ledger)(request, party)
})
.map(_.groupBy(identity).view.mapValues(_.size).toMap)
.map(responses => {
val expectedDuplicateResponses = numberOfParallelRequests - 1
val okResponses = responses.getOrElse(Code.OK, 0)
val alreadyExistsResponses = responses.getOrElse(Code.ALREADY_EXISTS, 0)
// Participant-based command de-duplication can currently also reject duplicates via a SQL exception
val internalResponses = responses.getOrElse(Code.INTERNAL, 0)
// Canton can return ABORTED for duplicate submissions
val abortedResponses = responses.getOrElse(Code.ABORTED, 0)
val duplicateResponses =
alreadyExistsResponses + internalResponses + abortedResponses
assert(
okResponses == 1 && duplicateResponses == numberOfParallelRequests - 1,
s"Expected $expectedDuplicateResponses duplicate responses and one accepted, got $responses",
)
})
runTestWithSubmission[SubmitRequest](
ledger,
party,
request,
submitRequestAndGetStatusCode(ledger)(_, party),
)
})

test(
s"DeduplicateParallelSubmissionsUsingCommandService",
"Commands submitted at the same, in parallel, using the CommandService, should be deduplicated",
allocate(SingleParty),
runConcurrently = false,
)(implicit ec => { case Participants(Participant(ledger, party)) =>
val deduplicationDuration = 3.seconds
val request = ledger
.submitAndWaitRequest(party, Dummy(party).create.command)
.update(
_.commands.deduplicationDuration := deduplicationDuration.asProtobuf
)
runTestWithSubmission[SubmitAndWaitRequest](
ledger,
party,
request,
submitAndWaitRequestAndGetStatusCode(ledger)(_, party),
)
})

private def runTestWithSubmission[T](
ledger: ParticipantTestContext,
party: Party,
request: T,
submitRequestAndGetStatus: T => Future[Code],
)(implicit
ec: ExecutionContext
) = {
val numberOfParallelRequests = 10
for {
responses <- Future
.traverse(Seq.fill(numberOfParallelRequests)(request))(request => {
submitRequestAndGetStatus(request)
})
.map(_.groupBy(identity).view.mapValues(_.size).toMap)
activeContracts <- ledger.activeContracts(party)
} yield {
val expectedDuplicateResponses = numberOfParallelRequests - 1
val okResponses = responses.getOrElse(Code.OK, 0)
val alreadyExistsResponses = responses.getOrElse(Code.ALREADY_EXISTS, 0)
// Participant-based command de-duplication can currently also reject duplicates via a SQL exception when using the CommandSubmissionService
val internalResponses = responses.getOrElse(Code.INTERNAL, 0)
// Canton can return ABORTED for duplicate submissions
// Participant based command de-duplication can currently also return ABORTED when using the CommandService
val abortedResponses = responses.getOrElse(Code.ABORTED, 0)
val duplicateResponses =
alreadyExistsResponses + internalResponses + abortedResponses
assert(
okResponses == 1 && duplicateResponses == numberOfParallelRequests - 1,
s"Expected $expectedDuplicateResponses duplicate responses and one accepted, got $responses",
)
assert(activeContracts.size == 1)
}
}

private def submitAndWaitRequestAndGetStatusCode(
ledger: ParticipantTestContext
)(request: SubmitAndWaitRequest, parties: Party*)(implicit ec: ExecutionContext) = {
val submissionId = UUID.randomUUID().toString
val requestWithSubmissionId = request.update(_.commands.submissionId := submissionId)
val submitResult = ledger.submitAndWait(requestWithSubmissionId)
submissionResultToFinalStatusCode(ledger)(submitResult, submissionId, parties: _*)
}
protected def submitRequestAndGetStatusCode(
ledger: ParticipantTestContext
)(request: SubmitRequest, parties: Party*)(implicit ec: ExecutionContext): Future[Code] = {
val submissionId = UUID.randomUUID().toString
val requestWithSubmissionId = request.update(_.commands.submissionId := submissionId)
ledger
val submitResult = ledger
.submit(requestWithSubmissionId)
submissionResultToFinalStatusCode(ledger)(submitResult, submissionId, parties: _*)
}

private def submissionResultToFinalStatusCode(
ledger: ParticipantTestContext
)(submitResult: Future[Unit], submissionId: String, parties: Primitive.Party*)(implicit
ec: ExecutionContext
) = {
submitResult
.flatMap(_ => ledger.findCompletion(parties: _*)(_.submissionId == submissionId))
.map {
case Some(completion) =>
Expand Down
1 change: 1 addition & 0 deletions ledger/ledger-on-memory/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ conformance_test(
"--participant=participant-id=example1,port=6865",
"--participant=participant-id=example2,port=6866",
"--max-deduplication-duration=PT5S",
"--tracker-retention-period=PT5S", # lower the command timeout duration, this is a workaround DPP-609
],
test_tool_args = [
"--verbose",
Expand Down
23 changes: 13 additions & 10 deletions ledger/ledger-on-sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -317,12 +317,12 @@ conformance_test(
tags = [],
test_tool_args = [
"--verbose",
"--additional=AppendOnlyCommandDeduplicationParallelIT",
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandService",
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService",
"--additional=AppendOnlyKVCommandDeduplicationIT",
"--additional=ParticipantPruningIT",
"--additional=MultiPartySubmissionIT",
"--additional=AppendOnlyCommandDeduplicationParallelIT",
"--additional=AppendOnlyKVCommandDeduplicationIT",
"--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant
# Disable tests targeting only multi-participant setups
"--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence",
Expand All @@ -340,16 +340,17 @@ conformance_test(
"--mutable-contract-state-cache",
"--jdbc-url=jdbc:h2:mem:daml-on-sql-conformance-test",
"--max-deduplication-duration=PT5S",
"--tracker-retention-period=PT5S", # lower the command timeout duration, this is a workaround DPP-609
],
tags = [],
test_tool_args = [
"--verbose",
"--additional=AppendOnlyCommandDeduplicationParallelIT",
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandService",
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService",
"--additional=AppendOnlyKVCommandDeduplicationIT",
"--additional=ParticipantPruningIT",
"--additional=MultiPartySubmissionIT",
"--additional=AppendOnlyCommandDeduplicationParallelIT",
"--additional=AppendOnlyKVCommandDeduplicationIT",
"--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant
# Disable tests targeting only multi-participant setups
"--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence",
Expand All @@ -366,16 +367,17 @@ conformance_test(
"--index-append-only-schema",
"--mutable-contract-state-cache",
"--max-deduplication-duration=PT5S",
"--tracker-retention-period=PT5S", # lower the command timeout duration, this is a workaround DPP-609
],
tags = [] if oracle_testing else ["manual"],
test_tool_args = [
"--verbose",
"--additional=AppendOnlyCommandDeduplicationParallelIT",
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandService",
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService",
"--additional=AppendOnlyKVCommandDeduplicationIT",
"--additional=ParticipantPruningIT",
"--additional=MultiPartySubmissionIT",
"--additional=AppendOnlyCommandDeduplicationParallelIT",
"--additional=AppendOnlyKVCommandDeduplicationIT",
"--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant
# Disable tests targeting only multi-participant setups
"--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence",
Expand Down Expand Up @@ -418,12 +420,12 @@ conformance_test(
tags = [],
test_tool_args = [
"--verbose",
"--additional=AppendOnlyCommandDeduplicationParallelIT",
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandService",
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService",
"--additional=AppendOnlyKVCommandDeduplicationIT",
"--additional=ParticipantPruningIT",
"--additional=MultiPartySubmissionIT",
"--additional=AppendOnlyCommandDeduplicationParallelIT",
"--additional=AppendOnlyKVCommandDeduplicationIT",
"--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant
# Disable tests targeting only multi-participant setups
"--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence",
Expand All @@ -441,16 +443,17 @@ conformance_test(
"--mutable-contract-state-cache",
"--buffered-ledger-api-streams-unsafe",
"--max-deduplication-duration=PT5S",
"--tracker-retention-period=PT5S", # lower the command timeout duration, this is a workaround DPP-609
],
tags = [] if oracle_testing else ["manual"],
test_tool_args = [
"--verbose",
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandService",
"--additional=AppendOnlyCommandDeduplicationParallelIT",
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService",
"--additional=AppendOnlyKVCommandDeduplicationIT",
"--additional=ParticipantPruningIT",
"--additional=MultiPartySubmissionIT",
"--additional=AppendOnlyCommandDeduplicationParallelIT",
"--additional=AppendOnlyKVCommandDeduplicationIT",
"--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant
# Disable tests targeting only multi-participant setups
"--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence",
Expand Down

0 comments on commit f4adee9

Please sign in to comment.