Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add conformance test for command deduplication using the CommandService [KVL-1099] #10883

Merged
merged 18 commits into from
Sep 16, 2021
Merged
Changes from 16 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ import com.daml.ledger.api.testtool.infrastructure.Assertions.fail
import com.daml.ledger.api.testtool.infrastructure.LedgerTestSuite
import com.daml.ledger.api.testtool.infrastructure.ProtobufConverters._
import com.daml.ledger.api.testtool.infrastructure.participant.ParticipantTestContext
import com.daml.ledger.api.v1.command_service.SubmitAndWaitRequest
import com.daml.ledger.api.v1.command_submission_service.SubmitRequest
import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod
import com.daml.ledger.client.binding.Primitive
import com.daml.ledger.client.binding.Primitive.Party
import com.daml.ledger.test.model.Test.DummyWithAnnotation
import com.daml.ledger.test.model.Test.{Dummy, DummyWithAnnotation}
import io.grpc.Status
import io.grpc.Status.Code

Expand All @@ -33,22 +35,49 @@ import scala.concurrent.{ExecutionContext, Future}
class AppendOnlyCommandDeduplicationParallelIT extends LedgerTestSuite {

test(
s"DeduplicateParallelSubmissions",
"Commands submitted at the same, in parallel, should be deduplicated",
s"DeduplicateParallelSubmissionsUsingCommandSubmissionService",
"Commands submitted at the same, in parallel, using the CommandSubmissionService, should be deduplicated",
allocate(SingleParty),
)(implicit ec => { case Participants(Participant(ledger, party)) =>
val deduplicationDuration = 3.seconds
val numberOfParallelRequests = 10
lazy val request = ledger
.submitRequest(party, DummyWithAnnotation(party, "Duplicate").create.command)
.update(
_.commands.deduplicationPeriod := DeduplicationPeriod.DeduplicationDuration(
deduplicationDuration.asProtobuf
)
)
runTestWithSubmission[SubmitRequest](
request,
submitRequestAndGetStatusCode(ledger)(_, party),
)
})

test(
s"DeduplicateParallelSubmissionsUsingCommandService",
"Commands submitted at the same, in parallel, using the CommandService, should be deduplicated",
allocate(SingleParty),
runConcurrently = false,
)(implicit ec => { case Participants(Participant(ledger, party)) =>
val deduplicationDuration = 3.seconds
val request = ledger
.submitAndWaitRequest(party, Dummy(party).create.command)
.update(
_.commands.deduplicationDuration := deduplicationDuration.asProtobuf
)
runTestWithSubmission[SubmitAndWaitRequest](
request,
submitAndWaitRequestAndGetStatusCode(ledger)(_, party),
)
})

private def runTestWithSubmission[T](request: T, submitRequestAndGetStatus: T => Future[Code])(
implicit ec: ExecutionContext
) = {
val numberOfParallelRequests = 10
Future
.traverse(Seq.fill(numberOfParallelRequests)(request))(request => {
submitRequestAndGetStatusCode(ledger)(request, party)
submitRequestAndGetStatus(request)
})
.map(_.groupBy(identity).view.mapValues(_.size).toMap)
.map(responses => {
Expand All @@ -66,15 +95,32 @@ class AppendOnlyCommandDeduplicationParallelIT extends LedgerTestSuite {
s"Expected $expectedDuplicateResponses duplicate responses and one accepted, got $responses",
)
})
})
}

private def submitAndWaitRequestAndGetStatusCode(
ledger: ParticipantTestContext
)(request: SubmitAndWaitRequest, parties: Party*)(implicit ec: ExecutionContext) = {
val submissionId = UUID.randomUUID().toString
val requestWithSubmissionId = request.update(_.commands.submissionId := submissionId)
val submitResult = ledger.submitAndWait(requestWithSubmissionId)
submissionResultToFinalStatusCode(ledger)(submitResult, submissionId, parties: _*)
}
protected def submitRequestAndGetStatusCode(
ledger: ParticipantTestContext
)(request: SubmitRequest, parties: Party*)(implicit ec: ExecutionContext): Future[Code] = {
val submissionId = UUID.randomUUID().toString
val requestWithSubmissionId = request.update(_.commands.submissionId := submissionId)
ledger
val submitResult = ledger
.submit(requestWithSubmissionId)
submissionResultToFinalStatusCode(ledger)(submitResult, submissionId, parties: _*)
}

private def submissionResultToFinalStatusCode(
ledger: ParticipantTestContext
)(submitResult: Future[Unit], submissionId: String, parties: Primitive.Party*)(implicit
ec: ExecutionContext
) = {
submitResult
.flatMap(_ => ledger.findCompletion(parties: _*)(_.submissionId == submissionId))
.map {
case Some(completion) =>
Expand Down