Skip to content

Commit

Permalink
Extract common code for command dedup conformance tests [KVL-1090] (#…
Browse files Browse the repository at this point in the history
…11092)

* Extract common code for command dedup conformance tests

CHANGELOG_BEGIN

CHANGELOG_END
  • Loading branch information
nicu-da authored Sep 30, 2021
1 parent 7a4963b commit 517e866
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import com.daml.ledger.api.testtool.infrastructure.LedgerTestSuite
import com.daml.ledger.api.testtool.infrastructure.ProtobufConverters._
import com.daml.ledger.api.testtool.infrastructure.deduplication.CommandDeduplicationBase.DeduplicationFeatures
import com.daml.ledger.api.testtool.infrastructure.participant.ParticipantTestContext
import com.daml.ledger.api.v1.command_service.SubmitAndWaitRequest
import com.daml.ledger.api.v1.command_submission_service.SubmitRequest
import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod
import com.daml.ledger.api.v1.completion.Completion
Expand All @@ -19,7 +20,6 @@ import com.daml.ledger.client.binding.Primitive.Party
import com.daml.ledger.test.model.DA.Types.Tuple2
import com.daml.ledger.test.model.Test.{Dummy, DummyWithAnnotation, TextKey, TextKeyOperations}
import com.daml.timer.Delayed
import io.grpc.Status
import io.grpc.Status.Code

import scala.annotation.nowarn
Expand Down Expand Up @@ -55,55 +55,44 @@ private[testtool] abstract class CommandDeduplicationBase(
runConcurrently = false,
)(implicit ec =>
configuredParticipants => { case Participants(Participant(ledger, party)) =>
lazy val requestA1 = ledger
.submitRequest(party, DummyWithAnnotation(party, "First submission").create.command)
val request = ledger
.submitRequest(party, DummyWithAnnotation(party, "Duplicate command").create.command)
.update(
_.commands.deduplicationPeriod := DeduplicationPeriod.DeduplicationTime(
deduplicationDuration.asProtobuf
)
)
lazy val requestA2 = ledger
.submitRequest(party, DummyWithAnnotation(party, "Second submission").create.command)
.update(
_.commands.deduplicationPeriod := DeduplicationPeriod
.DeduplicationDuration(
deduplicationDuration.asProtobuf
), //same semantics as `DeduplicationTime`
_.commands.commandId := requestA1.commands.get.commandId,
)
runGivenDeduplicationWait(configuredParticipants) { deduplicationWait =>
for {
// Submit command A (first deduplication window)
// Submit command (first deduplication window)
// Note: the second submit() in this block is deduplicated and thus rejected by the ledger API server,
// only one submission is therefore sent to the ledger.
completion1 <- submitRequestAndAssertCompletionAccepted(ledger)(requestA1, party)
_ <- submitRequestAndAssertDeduplication(ledger)(requestA1, party)
completion1 <- submitRequestAndAssertCompletionAccepted(ledger)(request, party)
_ <- submitRequestAndAssertDeduplication(ledger)(request, party)
// Wait until the end of first deduplication window
_ <- Delayed.by(deduplicationWait)(())

// Submit command A (second deduplication window)
// Submit command (second deduplication window)
// Note: the deduplication window is guaranteed to have passed on both
// the ledger API server and the ledger itself, since the test waited more than
// `deduplicationSeconds` after receiving the first command *completion*.
// The first submit() in this block should therefore lead to an accepted transaction.
completion2 <- submitRequestAndAssertCompletionAccepted(ledger)(requestA2, party)
_ <- submitRequestAndAssertDeduplication(ledger)(requestA2, party)
completion2 <- submitRequestAndAssertCompletionAccepted(ledger)(request, party)
_ <- submitRequestAndAssertDeduplication(ledger)(request, party)
// Inspect created contracts
activeContracts <- ledger.activeContracts(party)
_ <- assertPartyHasActiveContracts(ledger)(
party = party,
noOfActiveContracts = 2,
)
} yield {
assert(
completion1.commandId == requestA1.commands.get.commandId,
completion1.commandId == request.commands.get.commandId,
"The command ID of the first completion does not match the command ID of the submission",
)
assert(
completion2.commandId == requestA2.commands.get.commandId,
completion2.commandId == request.commands.get.commandId,
"The command ID of the second completion does not match the command ID of the submission",
)

assert(
activeContracts.size == 2,
s"There should be 2 active contracts, but received $activeContracts",
)
}
}
}
Expand Down Expand Up @@ -175,51 +164,30 @@ private[testtool] abstract class CommandDeduplicationBase(
runConcurrently = false,
)(implicit ec =>
configuredParticipants => { case Participants(Participant(ledger, party)) =>
val requestA = ledger
val request = ledger
.submitAndWaitRequest(party, Dummy(party).create.command)
.update(
_.commands.deduplicationTime := deduplicationDuration.asProtobuf
)
runGivenDeduplicationWait(configuredParticipants) { deduplicationWait =>
for {
// Submit command A (first deduplication window)
_ <- ledger.submitAndWait(requestA)
failure1 <- ledger
.submitAndWait(requestA)
.mustFail("submitting a request for the second time, in the first deduplication window")
// Submit command (first deduplication window)
_ <- ledger.submitAndWait(request)
_ <- submitAndWaitRequestAndAssertDeduplication(ledger)(request)

// Wait until the end of first deduplication window
_ <- Delayed.by(deduplicationWait)(())

// Submit command A (second deduplication window)
_ <- ledger.submitAndWait(requestA)
failure2 <- ledger
.submitAndWait(requestA)
.mustFail(
"submitting a request for the second time, in the second deduplication window"
)
// Submit command (second deduplication window)
_ <- ledger.submitAndWait(request)
_ <- submitAndWaitRequestAndAssertDeduplication(ledger)(request)

// Inspect created contracts
activeContracts <- ledger.activeContracts(party)
} yield {
assertGrpcError(
failure1,
Status.Code.ALREADY_EXISTS,
exceptionMessageSubstring = None,
checkDefiniteAnswerMetadata = true,
_ <- assertPartyHasActiveContracts(ledger)(
party = party,
noOfActiveContracts = 2,
)
assertGrpcError(
failure2,
Status.Code.ALREADY_EXISTS,
exceptionMessageSubstring = None,
checkDefiniteAnswerMetadata = true,
)

assert(
activeContracts.size == 2,
s"There should be 2 active contracts, but received $activeContracts",
)
}
} yield {}
}
}
)
Expand All @@ -236,46 +204,21 @@ private[testtool] abstract class CommandDeduplicationBase(

for {
// Submit a command as alice
_ <- ledger.submit(aliceRequest)
failure1 <- ledger
.submit(aliceRequest)
.mustFail("submitting a request as Alice for the second time")
_ <- submitRequestAndAssertCompletionAccepted(ledger)(aliceRequest, alice)
_ <- submitRequestAndAssertDeduplication(ledger)(aliceRequest)

// Submit another command that uses same commandId, but is submitted by Bob
_ <- ledger.submit(bobRequest)
failure2 <- ledger
.submit(bobRequest)
.mustFail("submitting the same request as Bob, for the second time")

// Wait for command completions and inspect the ledger state
_ <- ledger.firstCompletions(alice)
_ <- ledger.firstCompletions(bob)
aliceContracts <- ledger.activeContracts(alice)
bobContracts <- ledger.activeContracts(bob)
} yield {
assertGrpcError(
failure1,
Status.Code.ALREADY_EXISTS,
exceptionMessageSubstring = None,
checkDefiniteAnswerMetadata = true,
_ <- submitRequestAndAssertCompletionAccepted(ledger)(bobRequest, bob)
_ <- submitRequestAndAssertDeduplication(ledger)(bobRequest)
_ <- assertPartyHasActiveContracts(ledger)(
party = alice,
noOfActiveContracts = 1,
)
assertGrpcError(
failure2,
Status.Code.ALREADY_EXISTS,
exceptionMessageSubstring = None,
checkDefiniteAnswerMetadata = true,
_ <- assertPartyHasActiveContracts(ledger)(
party = bob,
noOfActiveContracts = 1,
)

assert(
aliceContracts.length == 1,
s"Only one contract was expected to be seen by $alice but ${aliceContracts.length} appeared",
)

assert(
bobContracts.length == 1,
s"Only one contract was expected to be seen by $bob but ${bobContracts.length} appeared",
)
}
} yield {}
})

test(
Expand All @@ -291,46 +234,36 @@ private[testtool] abstract class CommandDeduplicationBase(
for {
// Submit a command as alice
_ <- ledger.submitAndWait(aliceRequest)
failure1 <- ledger
.submitAndWait(aliceRequest)
.mustFail("submitting a request as Alice for the second time")
_ <- submitAndWaitRequestAndAssertDeduplication(ledger)(aliceRequest)

// Submit another command that uses same commandId, but is submitted by Bob
_ <- ledger.submitAndWait(bobRequest)
failure2 <- ledger
.submitAndWait(bobRequest)
.mustFail("submitting the same request as Bob, for the second time")

_ <- submitAndWaitRequestAndAssertDeduplication(ledger)(bobRequest)
// Inspect the ledger state
aliceContracts <- ledger.activeContracts(alice)
bobContracts <- ledger.activeContracts(bob)
} yield {
assertGrpcError(
failure1,
Status.Code.ALREADY_EXISTS,
exceptionMessageSubstring = None,
checkDefiniteAnswerMetadata = true,
_ <- assertPartyHasActiveContracts(ledger)(
party = alice,
noOfActiveContracts = 1,
)
assertGrpcError(
failure2,
Status.Code.ALREADY_EXISTS,
exceptionMessageSubstring = None,
checkDefiniteAnswerMetadata = true,
)

assert(
aliceContracts.length == 1,
s"Only one contract was expected to be seen by $alice but ${aliceContracts.length} appeared",
)

assert(
bobContracts.length == 1,
s"Only one contract was expected to be seen by $bob but ${bobContracts.length} appeared",
_ <- assertPartyHasActiveContracts(ledger)(
party = bob,
noOfActiveContracts = 1,
)
}
} yield {}
})

def submitRequestAndAssertCompletionAccepted(
protected def assertPartyHasActiveContracts(
ledger: ParticipantTestContext
)(party: Party, noOfActiveContracts: Int)(implicit ec: ExecutionContext): Future[Unit] = {
ledger
.activeContracts(party)
.map(contracts =>
assert(
contracts.length == noOfActiveContracts,
s"Expected $noOfActiveContracts active contracts for $party but found ${contracts.length} active contracts",
)
)
}
protected def submitRequestAndAssertCompletionAccepted(
ledger: ParticipantTestContext
)(request: SubmitRequest, parties: Party*)(implicit ec: ExecutionContext): Future[Completion] = {
submitRequestAndAssertCompletionStatus(ledger)(request, Code.OK, parties: _*)
Expand Down Expand Up @@ -369,6 +302,22 @@ private[testtool] abstract class CommandDeduplicationBase(
)
)

protected def submitAndWaitRequestAndAssertDeduplication(ledger: ParticipantTestContext)(
request: SubmitAndWaitRequest
)(implicit ec: ExecutionContext): Future[Unit] = {
ledger
.submitAndWait(request)
.mustFail("Request was accepted but we were expecting it to fail with a duplicate error")
.map(
assertGrpcError(
_,
expectedCode = Code.ALREADY_EXISTS,
exceptionMessageSubstring = None,
checkDefiniteAnswerMetadata = true,
)
)
}

protected def submitRequestAndAssertAsyncDeduplication(ledger: ParticipantTestContext)(
request: SubmitRequest,
parties: Party*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ abstract class KVCommandDeduplicationBase(
// Deduplication has finished
completion2 <- submitRequestAndAssertCompletionAccepted(ledger)(request, party)
// Inspect created contracts
activeContracts <- ledger.activeContracts(party)
_ <- assertPartyHasActiveContracts(ledger)(
party = party,
noOfActiveContracts = 2,
)
} yield {
assert(
completion1.commandId == request.commands.get.commandId,
Expand Down Expand Up @@ -95,10 +98,6 @@ abstract class KVCommandDeduplicationBase(
s"Second completion deduplication period [${completion1.deduplicationPeriod}] is not the max deduplication",
)
}
assert(
activeContracts.size == 2,
s"There should be 2 active contracts, but received $activeContracts",
)
}
}
}
Expand Down

0 comments on commit 517e866

Please sign in to comment.