Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add conformance test for command deduplication using the CommandService [KVL-1099] #10883

Merged
merged 18 commits into from
Sep 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ import com.daml.ledger.api.testtool.infrastructure.Assertions.fail
import com.daml.ledger.api.testtool.infrastructure.LedgerTestSuite
import com.daml.ledger.api.testtool.infrastructure.ProtobufConverters._
import com.daml.ledger.api.testtool.infrastructure.participant.ParticipantTestContext
import com.daml.ledger.api.v1.command_service.SubmitAndWaitRequest
import com.daml.ledger.api.v1.command_submission_service.SubmitRequest
import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod
import com.daml.ledger.client.binding.Primitive
import com.daml.ledger.client.binding.Primitive.Party
import com.daml.ledger.test.model.Test.DummyWithAnnotation
import com.daml.ledger.test.model.Test.{Dummy, DummyWithAnnotation}
import io.grpc.Status
import io.grpc.Status.Code

Expand All @@ -33,48 +35,105 @@ import scala.concurrent.{ExecutionContext, Future}
class AppendOnlyCommandDeduplicationParallelIT extends LedgerTestSuite {

test(
s"DeduplicateParallelSubmissions",
"Commands submitted at the same, in parallel, should be deduplicated",
s"DeduplicateParallelSubmissionsUsingCommandSubmissionService",
"Commands submitted at the same, in parallel, using the CommandSubmissionService, should be deduplicated",
allocate(SingleParty),
)(implicit ec => { case Participants(Participant(ledger, party)) =>
val deduplicationDuration = 3.seconds
val numberOfParallelRequests = 10
lazy val request = ledger
.submitRequest(party, DummyWithAnnotation(party, "Duplicate").create.command)
.update(
_.commands.deduplicationPeriod := DeduplicationPeriod.DeduplicationDuration(
deduplicationDuration.asProtobuf
)
)
Future
.traverse(Seq.fill(numberOfParallelRequests)(request))(request => {
submitRequestAndGetStatusCode(ledger)(request, party)
})
.map(_.groupBy(identity).view.mapValues(_.size).toMap)
.map(responses => {
val expectedDuplicateResponses = numberOfParallelRequests - 1
val okResponses = responses.getOrElse(Code.OK, 0)
val alreadyExistsResponses = responses.getOrElse(Code.ALREADY_EXISTS, 0)
// Participant-based command de-duplication can currently also reject duplicates via a SQL exception
val internalResponses = responses.getOrElse(Code.INTERNAL, 0)
// Canton can return ABORTED for duplicate submissions
val abortedResponses = responses.getOrElse(Code.ABORTED, 0)
val duplicateResponses =
alreadyExistsResponses + internalResponses + abortedResponses
assert(
okResponses == 1 && duplicateResponses == numberOfParallelRequests - 1,
s"Expected $expectedDuplicateResponses duplicate responses and one accepted, got $responses",
)
})
runTestWithSubmission[SubmitRequest](
ledger,
party,
request,
submitRequestAndGetStatusCode(ledger)(_, party),
)
})

test(
s"DeduplicateParallelSubmissionsUsingCommandService",
"Commands submitted at the same, in parallel, using the CommandService, should be deduplicated",
allocate(SingleParty),
runConcurrently = false,
)(implicit ec => { case Participants(Participant(ledger, party)) =>
val deduplicationDuration = 3.seconds
val request = ledger
.submitAndWaitRequest(party, Dummy(party).create.command)
.update(
_.commands.deduplicationDuration := deduplicationDuration.asProtobuf
)
runTestWithSubmission[SubmitAndWaitRequest](
ledger,
party,
request,
submitAndWaitRequestAndGetStatusCode(ledger)(_, party),
)
})

private def runTestWithSubmission[T](
ledger: ParticipantTestContext,
party: Party,
request: T,
submitRequestAndGetStatus: T => Future[Code],
)(implicit
ec: ExecutionContext
) = {
val numberOfParallelRequests = 10
for {
responses <- Future
.traverse(Seq.fill(numberOfParallelRequests)(request))(request => {
submitRequestAndGetStatus(request)
})
.map(_.groupBy(identity).view.mapValues(_.size).toMap)
activeContracts <- ledger.activeContracts(party)
} yield {
val expectedDuplicateResponses = numberOfParallelRequests - 1
val okResponses = responses.getOrElse(Code.OK, 0)
val alreadyExistsResponses = responses.getOrElse(Code.ALREADY_EXISTS, 0)
// Participant-based command de-duplication can currently also reject duplicates via a SQL exception when using the CommandSubmissionService
val internalResponses = responses.getOrElse(Code.INTERNAL, 0)
// Canton can return ABORTED for duplicate submissions
// Participant based command de-duplication can currently also return ABORTED when using the CommandService
val abortedResponses = responses.getOrElse(Code.ABORTED, 0)
val duplicateResponses =
alreadyExistsResponses + internalResponses + abortedResponses
assert(
okResponses == 1 && duplicateResponses == numberOfParallelRequests - 1,
s"Expected $expectedDuplicateResponses duplicate responses and one accepted, got $responses",
)
assert(activeContracts.size == 1)
}
}

private def submitAndWaitRequestAndGetStatusCode(
ledger: ParticipantTestContext
)(request: SubmitAndWaitRequest, parties: Party*)(implicit ec: ExecutionContext) = {
val submissionId = UUID.randomUUID().toString
val requestWithSubmissionId = request.update(_.commands.submissionId := submissionId)
val submitResult = ledger.submitAndWait(requestWithSubmissionId)
submissionResultToFinalStatusCode(ledger)(submitResult, submissionId, parties: _*)
}
protected def submitRequestAndGetStatusCode(
ledger: ParticipantTestContext
)(request: SubmitRequest, parties: Party*)(implicit ec: ExecutionContext): Future[Code] = {
val submissionId = UUID.randomUUID().toString
val requestWithSubmissionId = request.update(_.commands.submissionId := submissionId)
ledger
val submitResult = ledger
.submit(requestWithSubmissionId)
submissionResultToFinalStatusCode(ledger)(submitResult, submissionId, parties: _*)
}

private def submissionResultToFinalStatusCode(
ledger: ParticipantTestContext
)(submitResult: Future[Unit], submissionId: String, parties: Primitive.Party*)(implicit
ec: ExecutionContext
) = {
submitResult
.flatMap(_ => ledger.findCompletion(parties: _*)(_.submissionId == submissionId))
.map {
case Some(completion) =>
Expand Down
1 change: 1 addition & 0 deletions ledger/ledger-on-memory/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ conformance_test(
"--participant=participant-id=example1,port=6865",
"--participant=participant-id=example2,port=6866",
"--max-deduplication-duration=PT5S",
"--tracker-retention-period=PT5S", # lower the command timeout duration, this is a workaround DPP-609
],
test_tool_args = [
"--verbose",
Expand Down
23 changes: 13 additions & 10 deletions ledger/ledger-on-sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -317,12 +317,12 @@ conformance_test(
tags = [],
test_tool_args = [
"--verbose",
"--additional=AppendOnlyCommandDeduplicationParallelIT",
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandService",
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService",
"--additional=AppendOnlyKVCommandDeduplicationIT",
"--additional=ParticipantPruningIT",
"--additional=MultiPartySubmissionIT",
"--additional=AppendOnlyCommandDeduplicationParallelIT",
"--additional=AppendOnlyKVCommandDeduplicationIT",
"--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant
# Disable tests targeting only multi-participant setups
"--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence",
Expand All @@ -340,16 +340,17 @@ conformance_test(
"--mutable-contract-state-cache",
"--jdbc-url=jdbc:h2:mem:daml-on-sql-conformance-test",
"--max-deduplication-duration=PT5S",
"--tracker-retention-period=PT5S", # lower the command timeout duration, this is a workaround DPP-609
],
tags = [],
test_tool_args = [
"--verbose",
"--additional=AppendOnlyCommandDeduplicationParallelIT",
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandService",
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService",
"--additional=AppendOnlyKVCommandDeduplicationIT",
"--additional=ParticipantPruningIT",
"--additional=MultiPartySubmissionIT",
"--additional=AppendOnlyCommandDeduplicationParallelIT",
"--additional=AppendOnlyKVCommandDeduplicationIT",
"--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant
# Disable tests targeting only multi-participant setups
"--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence",
Expand All @@ -366,16 +367,17 @@ conformance_test(
"--index-append-only-schema",
"--mutable-contract-state-cache",
"--max-deduplication-duration=PT5S",
"--tracker-retention-period=PT5S", # lower the command timeout duration, this is a workaround DPP-609
],
tags = [] if oracle_testing else ["manual"],
test_tool_args = [
"--verbose",
"--additional=AppendOnlyCommandDeduplicationParallelIT",
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandService",
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService",
"--additional=AppendOnlyKVCommandDeduplicationIT",
"--additional=ParticipantPruningIT",
"--additional=MultiPartySubmissionIT",
"--additional=AppendOnlyCommandDeduplicationParallelIT",
"--additional=AppendOnlyKVCommandDeduplicationIT",
"--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant
# Disable tests targeting only multi-participant setups
"--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence",
Expand Down Expand Up @@ -418,12 +420,12 @@ conformance_test(
tags = [],
test_tool_args = [
"--verbose",
"--additional=AppendOnlyCommandDeduplicationParallelIT",
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandService",
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService",
"--additional=AppendOnlyKVCommandDeduplicationIT",
"--additional=ParticipantPruningIT",
"--additional=MultiPartySubmissionIT",
"--additional=AppendOnlyCommandDeduplicationParallelIT",
"--additional=AppendOnlyKVCommandDeduplicationIT",
"--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant
# Disable tests targeting only multi-participant setups
"--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence",
Expand All @@ -441,16 +443,17 @@ conformance_test(
"--mutable-contract-state-cache",
"--buffered-ledger-api-streams-unsafe",
"--max-deduplication-duration=PT5S",
"--tracker-retention-period=PT5S", # lower the command timeout duration, this is a workaround DPP-609
],
tags = [] if oracle_testing else ["manual"],
test_tool_args = [
"--verbose",
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandService",
"--additional=AppendOnlyCommandDeduplicationParallelIT",
"--additional=AppendOnlyCompletionDeduplicationInfoITCommandSubmissionService",
"--additional=AppendOnlyKVCommandDeduplicationIT",
"--additional=ParticipantPruningIT",
"--additional=MultiPartySubmissionIT",
"--additional=AppendOnlyCommandDeduplicationParallelIT",
"--additional=AppendOnlyKVCommandDeduplicationIT",
"--exclude=CommandDeduplicationIT", # It's a KV ledger so it needs the KV variant
# Disable tests targeting only multi-participant setups
"--exclude=ParticipantPruningIT:PRImmediateAndRetroactiveDivulgence",
Expand Down