Skip to content

Commit

Permalink
ledger-api-test-tool: Add assertions and a test case for successful c…
Browse files Browse the repository at this point in the history
…ompletions when converting deduplication durations to offsets [KVL-1220] (#12462)

CHANGELOG_BEGIN
CHANGELOG_END
  • Loading branch information
hubert-da authored Jan 21, 2022
1 parent 2267d1a commit cdde8df
Show file tree
Hide file tree
Showing 10 changed files with 237 additions and 115 deletions.
2 changes: 2 additions & 0 deletions ledger/ledger-api-common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,13 @@ da_scala_test_suite(
"@maven//:com_typesafe_akka_akka_testkit",
"@maven//:io_spray_spray_json",
"@maven//:org_mockito_mockito_scala",
"@maven//:org_scalacheck_scalacheck",
"@maven//:org_scalactic_scalactic",
"@maven//:org_scalatest_scalatest_core",
"@maven//:org_scalatest_scalatest_matchers_core",
"@maven//:org_scalatest_scalatest_shouldmatchers",
"@maven//:org_scalatest_scalatest_wordspec",
"@maven//:org_scalatestplus_scalacheck_1_15",
"@maven//:org_scalaz_scalaz_core",
"@maven//:org_scala_lang_modules_scala_collection_compat",
"@maven//:org_scala_lang_modules_scala_parallel_collections",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.ledger.participant.state.kvutils.deduplication
package com.daml.platform.participant.util

import java.math.BigInteger

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.ledger.participant.state.kvutils.deduplication
package com.daml.platform.participant.util

import java.math.BigInteger

Expand Down
3 changes: 2 additions & 1 deletion ledger/ledger-api-test-tool-on-canton/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ conformance_test(
"DeeplyNestedValueIT", # FIXME: Too deeply nested values flake with a time out (half of the time)
"CommandDeduplicationIT:SimpleDeduplicationBasic", # sync vs async error (part of canton #6301)
"CommandDeduplicationIT:DeduplicateSubmitterBasic",
"CommandDeduplicationIT:DeduplicateUsingOffsets", # temporary disabled until we adapt it for canton
"CommandDeduplicationIT:DeduplicateUsingDurations", # disabled while Canton does not support the feature descriptors
"CommandDeduplicationIT:DeduplicateUsingOffsets", # disabled while Canton does not support the feature descriptors
"RaceConditionIT:RWArchiveVsFailedLookupByKey", # finding a lookup failure after contract creation
# dynamic config management not supported by Canton
"ConfigManagementServiceIT",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@

package com.daml.ledger.api.testtool.infrastructure

import java.time.Instant

import com.daml.ledger.api.testtool.infrastructure.FutureAssertions.ExpectedFailureException
import com.daml.ledger.api.testtool.infrastructure.time.DelayMechanism
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.timer.Delayed

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
Expand Down Expand Up @@ -50,9 +48,10 @@ object FutureAssertions {
/** Runs the test case after the specified delay
*/
def assertAfter[V](
delay: FiniteDuration
)(test: => Future[V]): Future[V] =
Delayed.Future.by(delay)(test)
delay: FiniteDuration,
delayMechanism: DelayMechanism,
)(test: => Future[V])(implicit executionContext: ExecutionContext): Future[V] =
delayMechanism.delayBy(delay).flatMap(_ => test)

/** Run the test every [[retryDelay]] up to [[maxRetryDuration]].
* The test case will run up to [[ceil(maxRetryDuration / retryDelay)]] times.
Expand All @@ -62,6 +61,7 @@ object FutureAssertions {
def succeedsEventually[V](
retryDelay: FiniteDuration = 100.millis,
maxRetryDuration: FiniteDuration,
delayMechanism: DelayMechanism,
description: String,
)(
test: => Future[V]
Expand All @@ -75,7 +75,7 @@ object FutureAssertions {
)
}
else
assertAfter(retryDelay)(test).recoverWith { case NonFatal(ex) =>
assertAfter(retryDelay, delayMechanism)(test).recoverWith { case NonFatal(ex) =>
logger.debug(
s"Failed assertion: $description. Running again with new max duration $nextRetryRemainingDuration",
ex,
Expand All @@ -87,40 +87,6 @@ object FutureAssertions {
internalSucceedsEventually(maxRetryDuration)
}

/** Run the test every [[rerunDelay]] for a duration of [[succeedDuration]] or until the current time exceeds [[succeedDeadline]].
* The assertion will succeed if all of the test case runs are successful and [[succeedDuration]] is exceeded or [[succeedDeadline]] is exceeded.
* The assertion will fail if any test case runs fail.
*/
def succeedsUntil[V](
rerunDelay: FiniteDuration = 100.millis,
succeedDuration: FiniteDuration,
succeedDeadline: Option[Instant] = None,
)(
test: => Future[V]
)(implicit ec: ExecutionContext, loggingContext: LoggingContext): Future[V] = {
def internalSucceedsUntil(remainingDuration: FiniteDuration): Future[V] = {
val nextRerunRemainingDuration = remainingDuration - rerunDelay
if (
succeedDeadline.exists(
_.isBefore(Instant.now().plusSeconds(rerunDelay.toSeconds))
) || nextRerunRemainingDuration < Duration.Zero
) test
else
assertAfter(rerunDelay)(test)
.flatMap { _ =>
internalSucceedsUntil(nextRerunRemainingDuration)
}
}

internalSucceedsUntil(succeedDuration)
.andThen { case Failure(exception) =>
logger.error(
s"Repeated assertion failed with a succeed duration of $succeedDuration.",
exception,
)
}
}

def forAllParallel[T](
data: Seq[T]
)(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import com.daml.ledger.api.v1.experimental_features.CommandDeduplicationPeriodSu
DurationSupport,
OffsetSupport,
}
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
import com.daml.ledger.client.binding.Primitive.Party
import com.daml.lf.data.Ref
import com.daml.platform.testing.WithTimeout
import com.google.protobuf.duration.{Duration => DurationProto}
import io.grpc.Status.Code
Expand Down Expand Up @@ -51,37 +53,84 @@ object CommandDeduplicationAssertions {
completionResponse.completion.deduplicationPeriod.deduplicationOffset,
"No deduplication offset has been reported",
)
val reportedHexOffset = Ref.HexString.assertFromString(reportedOffset)
if (completionResponse.completion.getStatus.code == Code.ALREADY_EXISTS.value) {
// Search for the first accepting completion with the same command ID since the beginning of the test case.
// Multiple consecutive accepting completions are not supported.
val completionStreamRequest = ledger.completionStreamRequest()(submittingParty)
WithTimeout(5.seconds)(
ledger
.findCompletion(completionStreamRequest)(c =>
c.commandId == c.commandId && c.getStatus.code == Code.OK.value
)
).map { optAcceptedCompletionResponse =>
val acceptedCompletionResponse = assertDefined(
optAcceptedCompletionResponse,
s"No accepted completion with the command ID '${completionResponse.completion.commandId}' has been found",
)
assert(
acceptedCompletionResponse.offset.getAbsolute > reportedOffset,
s"No accepted completion with the command ID '${completionResponse.completion.commandId}' after the reported offset $reportedOffset has been found",
)
assert(
acceptedCompletionResponse.offset.getAbsolute < completionResponse.offset.getAbsolute,
s"No accepted completion with the command ID '${completionResponse.completion.commandId}' before the completion's offset ${completionResponse.offset} has been found",
)
}
assertReportedOffsetForDuplicateSubmission(
reportedHexOffset,
completionResponse,
submittingParty,
ledger,
)
} else {
fail("This case is not yet supported by this assertion")
assertReportedOffsetForAcceptedSubmission(
reportedHexOffset,
requestedDuration,
completionResponse,
submittingParty,
ledger,
)
}
case DurationSupport.Unrecognized(_) =>
fail("Unrecognized deduplication duration support")
}
}

private def assertReportedOffsetForDuplicateSubmission(
reportedOffset: Ref.HexString,
completionResponse: CompletionResponse,
submittingParty: Party,
ledger: ParticipantTestContext,
)(implicit executionContext: ExecutionContext) =
WithTimeout(5.seconds)(
ledger.findCompletionAtOffset(
reportedOffset,
c => c.commandId == completionResponse.completion.commandId && c.getStatus.code == Code.OK.value,
)(submittingParty)
).map { optAcceptedCompletionResponse =>
val acceptedCompletionResponse = assertDefined(
optAcceptedCompletionResponse,
s"No accepted completion with the command ID '${completionResponse.completion.commandId}' since the reported offset $reportedOffset has been found",
)
assert(
acceptedCompletionResponse.offset.getAbsolute < completionResponse.offset.getAbsolute,
s"An accepted completion with the command ID '${completionResponse.completion.commandId}' at the offset ${acceptedCompletionResponse.offset} that is not before the completion's offset ${completionResponse.offset} has been found",
)
}

private def assertReportedOffsetForAcceptedSubmission(
reportedOffset: Ref.HexString,
requestedDuration: Duration,
completionResponse: CompletionResponse,
submittingParty: Party,
ledger: ParticipantTestContext,
)(implicit executionContext: ExecutionContext) =
WithTimeout(5.seconds)(
ledger.findCompletionAtOffset(
reportedOffset,
_.commandId == completionResponse.completion.commandId,
)(submittingParty)
).map { optReportedOffsetCompletionResponse =>
val reportedOffsetCompletionResponse = assertDefined(
optReportedOffsetCompletionResponse,
s"No completion with the command ID '${completionResponse.completion.commandId}' since the reported offset $reportedOffset has been found",
)
assert(
reportedOffsetCompletionResponse.offset == LedgerOffset.of(
LedgerOffset.Value.Absolute(reportedOffset)
),
s"No completion with the reported offset $reportedOffset has been found, the ${reportedOffsetCompletionResponse.offset} offset has been found instead",
)
val durationBetweenReportedDeduplicationOffsetAndCompletionRecordTimes = Duration
.between(
reportedOffsetCompletionResponse.recordTime,
completionResponse.recordTime,
)
assert(
durationBetweenReportedDeduplicationOffsetAndCompletionRecordTimes >= requestedDuration,
s"The requested deduplication duration $requestedDuration was greater than the duration between the reported deduplication offset and completion record times ($durationBetweenReportedDeduplicationOffsetAndCompletionRecordTimes).",
)
}

def assertDeduplicationOffset(
requestedDeduplicationOffsetCompletionResponse: CompletionResponse,
completionResponse: CompletionResponse,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ import com.daml.ledger.api.v1.transaction_service.{
import com.daml.ledger.api.v1.value.{Identifier, Value}
import com.daml.ledger.client.binding.Primitive.Party
import com.daml.ledger.client.binding.{Primitive, Template}
import com.daml.lf.data.Ref
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.participant.util.HexOffset
import com.daml.platform.testing.StreamConsumer
import com.google.protobuf.ByteString
import io.grpc.health.v1.health.{HealthCheckRequest, HealthCheckResponse}
Expand Down Expand Up @@ -685,6 +687,20 @@ private[testtool] final class ParticipantTestContext private[participant] (
def firstCompletions(parties: Party*): Future[Vector[Completion]] =
firstCompletions(completionStreamRequest()(parties: _*))

def findCompletionAtOffset(
offset: Ref.HexString,
p: Completion => Boolean,
)(parties: Party*): Future[Option[CompletionResponse]] = {
// We have to request an offset before the reported offset, as offsets are exclusive in the completion service.
val offsetPreviousToReportedOffset = HexOffset
.previous(offset)
.map(offset => LedgerOffset.of(LedgerOffset.Value.Absolute(offset)))
.getOrElse(referenceOffset)
val reportedOffsetCompletionStreamRequest =
completionStreamRequest(offsetPreviousToReportedOffset)(parties: _*)
findCompletion(reportedOffsetCompletionStreamRequest)(p)
}

def findCompletion(
request: CompletionStreamRequest
)(p: Completion => Boolean): Future[Option[CompletionResponse]] =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.ledger.api.testtool.infrastructure.time

import com.daml.ledger.api.testtool.infrastructure.participant.ParticipantTestContext
import com.daml.timer.Delayed

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.{Duration, FiniteDuration}

trait DelayMechanism {
val skews: FiniteDuration
def delayBy(duration: Duration): Future[Unit]
}

class TimeDelayMechanism(val skews: FiniteDuration)(implicit ec: ExecutionContext)
extends DelayMechanism {
override def delayBy(duration: Duration): Future[Unit] = Delayed.by(duration)(())
}

class StaticTimeDelayMechanism(ledger: ParticipantTestContext, val skews: FiniteDuration)(implicit
ec: ExecutionContext
) extends DelayMechanism {
override def delayBy(duration: Duration): Future[Unit] =
ledger
.time()
.flatMap { currentTime =>
ledger.setTime(currentTime, currentTime.plusMillis(duration.toMillis))
}
}
Loading

0 comments on commit cdde8df

Please sign in to comment.