Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add test case for parallel command deduplication using mixed clients [KVL-1090] #11093

Merged
merged 3 commits into from
Sep 30, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,41 +19,37 @@ import com.daml.ledger.api.testtool.infrastructure.participant.ParticipantTestCo
import com.daml.ledger.api.v1.command_service.SubmitAndWaitRequest
import com.daml.ledger.api.v1.command_submission_service.SubmitRequest
import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod
import com.daml.ledger.client
import com.daml.ledger.client.binding
import com.daml.ledger.client.binding.Primitive
import com.daml.ledger.client.binding.Primitive.Party
import com.daml.ledger.test.model.Test.{Dummy, DummyWithAnnotation}
import com.daml.ledger.test.model.Test.DummyWithAnnotation
import io.grpc.Status
import io.grpc.Status.Code

import scala.collection.compat._
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import scala.util.{Failure, Random, Success}
import scala.util.control.NonFatal

/** Should be enabled for ledgers that fill the submission ID in the completions,
* as we need to use the submission id to find completions for parallel submissions
*/
class AppendOnlyCommandDeduplicationParallelIT extends LedgerTestSuite {

private val deduplicationDuration = 3.seconds

test(
s"DeduplicateParallelSubmissionsUsingCommandSubmissionService",
"Commands submitted at the same, in parallel, using the CommandSubmissionService, should be deduplicated",
allocate(SingleParty),
)(implicit ec => { case Participants(Participant(ledger, party)) =>
val deduplicationDuration = 3.seconds
lazy val request = ledger
.submitRequest(party, DummyWithAnnotation(party, "Duplicate").create.command)
.update(
_.commands.deduplicationPeriod := DeduplicationPeriod.DeduplicationDuration(
deduplicationDuration.asProtobuf
)
)
lazy val request = submitRequest(ledger, party)
runTestWithSubmission[SubmitRequest](
ledger,
party,
request,
submitRequestAndGetStatusCode(ledger)(_, party),
() => submitRequestAndGetStatusCode(ledger)(request, party),
)
})

Expand All @@ -63,33 +59,44 @@ class AppendOnlyCommandDeduplicationParallelIT extends LedgerTestSuite {
allocate(SingleParty),
runConcurrently = false,
)(implicit ec => { case Participants(Participant(ledger, party)) =>
val deduplicationDuration = 3.seconds
val request = ledger
.submitAndWaitRequest(party, Dummy(party).create.command)
.update(
_.commands.deduplicationDuration := deduplicationDuration.asProtobuf
)
val request = submitAndWaitRequest(ledger, party)
runTestWithSubmission[SubmitAndWaitRequest](
ledger,
party,
() => submitAndWaitRequestAndGetStatusCode(ledger)(request, party),
)
})

test(
s"DeduplicateParallelSubmissionsUsingMixedCommandServiceAndCommandSubmissionService",
"Commands submitted at the same, in parallel, using the CommandService the CommandSubmissionService, should be deduplicated",
allocate(SingleParty),
runConcurrently = false,
)(implicit ec => { case Participants(Participant(ledger, party)) =>
val submitAndWaitRequest = submitAndWaitRequest(ledger, party)
val submitRequest = submitRequest(ledger, party)
runTestWithSubmission[SubmitAndWaitRequest](
ledger,
party,
request,
submitAndWaitRequestAndGetStatusCode(ledger)(_, party),
() =>
if (Random.nextBoolean())
submitAndWaitRequestAndGetStatusCode(ledger)(submitAndWaitRequest, party)
else submitRequestAndGetStatusCode(ledger)(submitRequest, party),
)
})

private def runTestWithSubmission[T](
ledger: ParticipantTestContext,
party: Party,
request: T,
submitRequestAndGetStatus: T => Future[Code],
submitRequestAndGetStatus: () => Future[Code],
)(implicit
ec: ExecutionContext
) = {
val numberOfParallelRequests = 10
nicu-da marked this conversation as resolved.
Show resolved Hide resolved
for {
responses <- Future
.traverse(Seq.fill(numberOfParallelRequests)(request))(request => {
submitRequestAndGetStatus(request)
.traverse(Seq.fill(numberOfParallelRequests)(()))(_ => {
submitRequestAndGetStatus()
})
.map(_.groupBy(identity).view.mapValues(_.size).toMap)
activeContracts <- ledger.activeContracts(party)
Expand All @@ -109,6 +116,36 @@ class AppendOnlyCommandDeduplicationParallelIT extends LedgerTestSuite {
}
}

private def submitRequest(
ledger: ParticipantTestContext,
party: client.binding.Primitive.Party,
) = {
nicu-da marked this conversation as resolved.
Show resolved Hide resolved
ledger
.submitRequest(
party,
DummyWithAnnotation(party, "Duplicate Using CommandSubmissionService").create.command,
)
.update(
_.commands.deduplicationPeriod := DeduplicationPeriod.DeduplicationDuration(
deduplicationDuration.asProtobuf
)
)
}

private def submitAndWaitRequest(
ledger: ParticipantTestContext,
party: binding.Primitive.Party,
) = {
nicu-da marked this conversation as resolved.
Show resolved Hide resolved
ledger
.submitAndWaitRequest(
party,
DummyWithAnnotation(party, "Duplicate using CommandService").create.command,
)
.update(
_.commands.deduplicationDuration := deduplicationDuration.asProtobuf
)
}

private def submitAndWaitRequestAndGetStatusCode(
ledger: ParticipantTestContext
)(request: SubmitAndWaitRequest, parties: Party*)(implicit ec: ExecutionContext) = {
Expand Down