Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ledger-api-test-tool: Add assertions for deduplication period, use them in existing tests [KVL-1220] #12250

Merged
merged 10 commits into from
Jan 17, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,11 @@ object Assertions {
)
}

def assertDefined[T](option: Option[T], errorMessage: String): T = {
assert(option.isDefined, errorMessage)
option.get
}

/** Allows for assertions with more information in the error messages. */
implicit def futureAssertions[T](future: Future[T]): FutureAssertions[T] =
new FutureAssertions[T](future)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.ledger.api.testtool.infrastructure.assertions

import java.time.Duration

import com.daml.api.util.DurationConversion
import com.daml.ledger.api.testtool.infrastructure.Assertions.{assertDefined, fail}
import com.daml.ledger.api.testtool.infrastructure.participant.{
CompletionResponse,
ParticipantTestContext,
}
import com.daml.ledger.api.v1.experimental_features.CommandDeduplicationPeriodSupport.{
DurationSupport,
OffsetSupport,
}
import com.daml.ledger.client.binding.Primitive.Party
import com.daml.platform.testing.WithTimeout
import com.google.protobuf.duration.{Duration => DurationProto}
import io.grpc.Status.Code

import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, Future}
import scala.math.Ordering.Implicits._

object CommandDeduplicationAssertions {

def assertDeduplicationDuration(
requestedDeduplicationDuration: DurationProto,
completionResponse: CompletionResponse,
submittingParty: Party,
ledger: ParticipantTestContext,
)(implicit executionContext: ExecutionContext): Future[Unit] = {
val requestedDuration = DurationConversion.fromProto(requestedDeduplicationDuration)
ledger.features.commandDeduplicationFeatures.getDeduplicationPeriodSupport.durationSupport match {
case DurationSupport.DURATION_NATIVE_SUPPORT =>
val reportedDurationProto = assertDefined(
completionResponse.completion.deduplicationPeriod.deduplicationDuration,
"No deduplication duration has been reported",
)
val reportedDuration = DurationConversion.fromProto(reportedDurationProto)
assert(
reportedDuration >= requestedDuration,
s"The reported deduplication duration $reportedDuration was smaller than the requested deduplication duration $requestedDuration.",
)
Future.unit
hubert-da marked this conversation as resolved.
Show resolved Hide resolved
case DurationSupport.DURATION_CONVERT_TO_OFFSET =>
val reportedOffset = assertDefined(
completionResponse.completion.deduplicationPeriod.deduplicationOffset,
"No deduplication offset has been reported",
)
if (completionResponse.completion.getStatus.code == Code.ALREADY_EXISTS.value) {
val completionStreamRequest = ledger.completionStreamRequest()(submittingParty)
hubert-da marked this conversation as resolved.
Show resolved Hide resolved
WithTimeout(5.seconds)(
ledger
.findCompletion(completionStreamRequest)(c =>
c.commandId == c.commandId && c.getStatus.code == Code.OK.value
)
).map { optAcceptedCompletionResponse =>
andreaslochbihler-da marked this conversation as resolved.
Show resolved Hide resolved
val acceptedCompletionResponse = assertDefined(
optAcceptedCompletionResponse,
s"No accepted completion with the command ID '${completionResponse.completion.commandId}' has been found",
)
assert(
acceptedCompletionResponse.offset.getAbsolute >= reportedOffset,
s"No accepted completion with the command ID '${completionResponse.completion.commandId}' after the reported offset $reportedOffset has been found",
)
assert(
acceptedCompletionResponse.offset.getAbsolute < completionResponse.offset.getAbsolute,
s"No accepted completion with the command ID '${completionResponse.completion.commandId}' before the completion's offset ${completionResponse.offset} has been found",
)
}
} else {
fail("This case is not yet supported by this assertion")
}
case DurationSupport.Unrecognized(_) =>
fail("Unrecognized deduplication duration support")
}
}

def assertDeduplicationOffset(
requestedDeduplicationOffsetCompletionResponse: CompletionResponse,
completionResponse: CompletionResponse,
offsetSupport: OffsetSupport,
): Unit =
offsetSupport match {
case OffsetSupport.OFFSET_NATIVE_SUPPORT =>
val reportedOffset = assertDefined(
completionResponse.completion.deduplicationPeriod.deduplicationOffset,
"No deduplication offset has been reported",
)
val requestedDeduplicationOffset =
requestedDeduplicationOffsetCompletionResponse.offset.getAbsolute
assert(
reportedOffset <= requestedDeduplicationOffset,
s"The reported deduplication offset $reportedOffset was more recent than the requested deduplication offset $requestedDeduplicationOffset.",
)
case OffsetSupport.OFFSET_CONVERT_TO_DURATION =>
val reportedDurationProto = assertDefined(
completionResponse.completion.deduplicationPeriod.deduplicationDuration,
"No deduplication duration has been reported",
)
val reportedDuration = DurationConversion.fromProto(reportedDurationProto)
val durationBetweenDeduplicationOffsetAndCompletionRecordTimes = Duration
.between(
requestedDeduplicationOffsetCompletionResponse.recordTime,
completionResponse.recordTime,
)
assert(
reportedDuration >= durationBetweenDeduplicationOffsetAndCompletionRecordTimes,
s"The reported duration $reportedDuration was smaller than the duration between deduplication offset and completion record times ($durationBetweenDeduplicationOffsetAndCompletionRecordTimes).",
)
case OffsetSupport.Unrecognized(_) | OffsetSupport.OFFSET_NOT_SUPPORTED =>
fail("Deduplication offsets are not supported")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ import com.daml.ledger.api.testtool.infrastructure.Assertions._
import com.daml.ledger.api.testtool.infrastructure.FutureAssertions._
import com.daml.ledger.api.testtool.infrastructure.LedgerTestSuite
import com.daml.ledger.api.testtool.infrastructure.ProtobufConverters._
import com.daml.ledger.api.testtool.infrastructure.assertions.CommandDeduplicationAssertions.{
assertDeduplicationDuration,
assertDeduplicationOffset,
}
import com.daml.ledger.api.testtool.infrastructure.participant.{
CompletionResponse,
Features,
Expand All @@ -28,7 +32,6 @@ import com.daml.ledger.api.v1.admin.config_management_service.TimeModel
import com.daml.ledger.api.v1.command_service.SubmitAndWaitRequest
import com.daml.ledger.api.v1.command_submission_service.SubmitRequest
import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod
import com.daml.ledger.api.v1.completion.Completion
import com.daml.ledger.api.v1.completion.Completion.{
DeduplicationPeriod => CompletionDeduplicationPeriod
}
Expand Down Expand Up @@ -69,20 +72,20 @@ final class CommandDeduplicationIT(
_.commands.deduplicationPeriod :=
DeduplicationPeriod.DeduplicationDuration(deduplicationDuration.asProtobuf)
)
val acceptedSubmissionId1 = newSubmissionId()
val firstAcceptedSubmissionId = newSubmissionId()
for {
// Submit command (first deduplication window)
// Note: the second submit() in this block is deduplicated and thus rejected by the ledger API server,
// only one submission is therefore sent to the ledger.
response <- submitRequestAndAssertCompletionAccepted(
ledger,
updateSubmissionId(request, acceptedSubmissionId1),
updateSubmissionId(request, firstAcceptedSubmissionId),
party,
)
_ <- submitRequestAndAssertDeduplication(
optCompletionResponse <- submitRequestAndAssertDeduplication(
ledger,
updateWithFreshSubmissionId(request),
acceptedSubmissionId1,
firstAcceptedSubmissionId,
response.offset,
party,
)
Expand All @@ -92,6 +95,16 @@ final class CommandDeduplicationIT(
party,
noOfActiveContracts = 1,
)
_ <-
if (!ledger.features.commandDeduplicationFeatures.deduplicationType.isSyncOnly) {
val completion = assertDefined(optCompletionResponse, "No completion has been produced")
assertDeduplicationDuration(
deduplicationDuration.asProtobuf,
completion,
party,
ledger,
)
} else Future.unit
} yield {
assert(
response.completion.commandId == request.commands.get.commandId,
Expand Down Expand Up @@ -283,7 +296,7 @@ final class CommandDeduplicationIT(
submitAndWait: Boolean,
acceptedSubmissionId: SubmissionId,
acceptedLedgerOffset: LedgerOffset,
): Future[Option[Completion]] =
): Future[Option[CompletionResponse]] =
if (submitAndWait)
submitAndWaitRequestAndAssertDeduplication(
ledger,
Expand All @@ -309,7 +322,7 @@ final class CommandDeduplicationIT(
firstAcceptedCommand.offset,
)
deduplicationDurationFromPeriod = duplicateResponse
.map(_.deduplicationPeriod)
.map(_.completion.deduplicationPeriod)
.map {
case CompletionDeduplicationPeriod.Empty =>
throw new IllegalStateException("received empty completion")
Expand Down Expand Up @@ -479,7 +492,7 @@ final class CommandDeduplicationIT(
// This is done so that we can validate that the third command is accepted
_ <- delayForOffsetIfRequired(ledger, delay, ledger.features)
// Submit command again using the first offset as the deduplication offset
_ <- submitRequestAndAssertAsyncDeduplication(
response2 <- submitRequestAndAssertAsyncDeduplication(
ledger,
updateWithFreshSubmissionId(
request.update(
Expand All @@ -498,7 +511,7 @@ final class CommandDeduplicationIT(
party,
)
// Submit command again using the rejection offset as a deduplication period
_ <- submitRequestAndAssertCompletionAccepted(
response4 <- submitRequestAndAssertCompletionAccepted(
ledger,
updateWithFreshSubmissionId(
request.update(
Expand All @@ -509,7 +522,18 @@ final class CommandDeduplicationIT(
),
party,
)
} yield {}
} yield {
assertDeduplicationOffset(
response,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to self: This test assumes inclusive deduplication offsets. Will need to change when we switch to exclusive dedup offsets.

response2,
ledger.features.commandDeduplicationFeatures.getDeduplicationPeriodSupport.offsetSupport,
)
assertDeduplicationOffset(
response3,
response4,
ledger.features.commandDeduplicationFeatures.getDeduplicationPeriodSupport.offsetSupport,
)
}
}
}
)
Expand Down Expand Up @@ -585,7 +609,7 @@ final class CommandDeduplicationIT(
parties: Party*
)(implicit
ec: ExecutionContext
): Future[Option[Completion]] =
): Future[Option[CompletionResponse]] =
if (ledger.features.commandDeduplicationFeatures.deduplicationType.isSyncOnly)
submitRequestAndAssertSyncDeduplication(ledger, request, acceptedSubmissionId, acceptedOffset)
.map(_ => None)
Expand All @@ -596,7 +620,7 @@ final class CommandDeduplicationIT(
acceptedSubmissionId,
acceptedOffset,
parties: _*
).map(response => Some(response.completion))
).map(response => Some(response))

protected def submitRequestAndAssertSyncDeduplication(
ledger: ParticipantTestContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package com.daml.ledger.api.testtool.suites

import com.daml.ledger.api.SubmissionIdGenerator
import com.daml.ledger.api.testtool.infrastructure.Allocation._
import com.daml.ledger.api.testtool.infrastructure.Assertions.assertDefined
import com.daml.ledger.api.testtool.infrastructure.LedgerTestSuite
import com.daml.ledger.api.testtool.infrastructure.participant.ParticipantTestContext
import com.daml.ledger.api.testtool.suites.CompletionDeduplicationInfoIT._
Expand Down Expand Up @@ -100,7 +101,7 @@ private[testtool] object CompletionDeduplicationInfoIT {
} yield completion

override def assertCompletion(optCompletion: Option[Completion]): Unit = {
val completion = assertDefined(optCompletion)
val completion = assertDefined(optCompletion, "No completion has been produced")
assert(completion.status.forall(_.code == Status.Code.OK.value()))
assert(
Ref.SubmissionId.fromString(completion.submissionId).isRight,
Expand Down Expand Up @@ -135,7 +136,7 @@ private[testtool] object CompletionDeduplicationInfoIT {
} yield completion

override def assertCompletion(optCompletion: Option[Completion]): Unit = {
val completion = assertDefined(optCompletion)
val completion = assertDefined(optCompletion, "No completion has been produced")
assert(completion.status.forall(_.code == Status.Code.OK.value()))
}
}
Expand All @@ -155,7 +156,7 @@ private[testtool] object CompletionDeduplicationInfoIT {
optCompletion: Option[Completion],
requestedSubmissionId: Ref.SubmissionId,
): Unit = {
val submissionIdCompletion = assertDefined(optCompletion)
val submissionIdCompletion = assertDefined(optCompletion, "No completion has been produced")
val actualSubmissionId = submissionIdCompletion.submissionId
assert(submissionIdCompletion.status.forall(_.code == Status.Code.OK.value()))
assert(
Expand All @@ -168,7 +169,7 @@ private[testtool] object CompletionDeduplicationInfoIT {
private def assertDeduplicationPeriodIsReported(
optCompletion: Option[Completion]
): Unit = {
val completion = assertDefined(optCompletion)
val completion = assertDefined(optCompletion, "No completion has been produced")
assert(completion.status.forall(_.code == Status.Code.OK.value()))
assert(completion.deduplicationPeriod.isDefined, "The deduplication period was not reported")
}
Expand All @@ -178,7 +179,7 @@ private[testtool] object CompletionDeduplicationInfoIT {
optCompletion: Option[Completion],
): Unit = {
val expectedApplicationId = requestedApplicationId
assertDefined(optCompletion)
assertDefined(optCompletion, "No completion has been produced")
val applicationIdCompletion = optCompletion.get
assert(applicationIdCompletion.status.forall(_.code == Status.Code.OK.value()))
val actualApplicationId = applicationIdCompletion.applicationId
Expand All @@ -189,11 +190,6 @@ private[testtool] object CompletionDeduplicationInfoIT {
)
}

private def assertDefined(optCompletion: Option[Completion]): Completion = {
assert(optCompletion.isDefined, "No completion has been produced")
optCompletion.get
}

private def simpleCreate(party: Primitive.Party): Command = Dummy(party).create.command

private val RandomSubmissionId =
Expand Down