Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ledger-api-test-tool] - Future assertions improvements [KVL-1218] #12294

Merged
merged 8 commits into from
Jan 7, 2022
Merged
1 change: 1 addition & 0 deletions ledger/ledger-api-test-tool/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ da_scala_binary(
"//ledger/error",
"//ledger/ledger-api-common",
"//libs-scala/build-info",
"//libs-scala/contextualized-logging",
"//libs-scala/grpc-utils",
"//libs-scala/resources",
"//libs-scala/resources-akka",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@

package com.daml.ledger.api.testtool

import java.io.File
import java.nio.file.{Files, Paths, StandardCopyOption}
import java.util.concurrent.Executors

import com.daml.ledger.api.testtool.infrastructure.Reporter.ColorizedPrintStreamReporter
import com.daml.ledger.api.testtool.infrastructure.Result.Excluded
import com.daml.ledger.api.testtool.infrastructure._
Expand All @@ -13,9 +17,6 @@ import io.grpc.netty.{NegotiationType, NettyChannelBuilder}
import org.slf4j.LoggerFactory

import scala.collection.compat._
import java.io.File
import java.nio.file.{Files, Paths, StandardCopyOption}
import java.util.concurrent.Executors
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
Expand Down Expand Up @@ -179,7 +180,7 @@ object LedgerApiTestTool {
testsToRun,
)

runner.flatMap(_.runTests).onComplete {
runner.flatMap(_.runTests(ExecutionContext.global)).onComplete {
case Success(summaries) =>
val excludedTestSummaries =
excludedTests.map { ledgerTestCase =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.daml.ledger.api.testtool.infrastructure

import java.util
import java.util.regex.Pattern

import com.daml.error.ErrorCode
Expand Down Expand Up @@ -144,9 +145,7 @@ object Assertions {
errorInfo.getMetadataMap
}
.getOrElse {
fail(
s"The error did not contain a definite answer. Details were: ${details.mkString("[", ", ", "]")}"
)
new util.HashMap()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,15 @@

package com.daml.ledger.api.testtool.infrastructure

import java.time.Instant

import com.daml.ledger.api.testtool.infrastructure.FutureAssertions.ExpectedFailureException
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.timer.Delayed

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal
import scala.util.{Failure, Success}

final class FutureAssertions[T](future: Future[T]) {
Expand Down Expand Up @@ -34,13 +40,121 @@ final class FutureAssertions[T](future: Future[T]) {
case Success(value) => Failure(new ExpectedFailureException(context, value))
case Failure(other) => Failure(other)
}

}

object FutureAssertions {

private val logger = ContextualizedLogger.get(getClass)

/** Runs the test case after the specified delay
*/
def assertAfter[V](
delay: FiniteDuration
)(test: => Future[V]): Future[V] =
Delayed.Future.by(delay)(test)

/** Run the test every [[retryDelay]] up to [[maxRetryDuration]].
* The assertion will succeed if any of the test case runs are successful.
* The assertion will fail if no test case runs are successful and the [[maxRetryDuration]] is exceeded.
* The test case will run up to [[ceil(maxRetryDuration / retryDelay)]] times
nicu-da marked this conversation as resolved.
Show resolved Hide resolved
*/
def succeedsEventually[V](
fabiotudone-da marked this conversation as resolved.
Show resolved Hide resolved
retryDelay: FiniteDuration = 100.millis,
maxRetryDuration: FiniteDuration,
description: String,
)(
test: => Future[V]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this actually called twice or is it cached after the first call?

I did a bit of local testing and it seems fine, but I just want to make sure.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be in principle, but the code paths are mutually exclusive, so it's not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be called up to maxInterval / delay, as we need to retry until it either succeeds or the interval expires

)(implicit ec: ExecutionContext, loggingContext: LoggingContext): Future[V] = {
def internalSucceedsEventually(remainingDuration: FiniteDuration): Future[V] = {
val nextRetryRemainingDuration = remainingDuration - retryDelay
if (nextRetryRemainingDuration < Duration.Zero) test.andThen { case Failure(exception) =>
logger.error(
s"Assertion never succeeded after $maxRetryDuration with a delay of $retryDelay. Description: $description",
exception,
)
}
else
assertAfter(retryDelay)(test).recoverWith { case NonFatal(ex) =>
logger.debug(
s"Failed assertion: $description. Running again with new max duration $nextRetryRemainingDuration",
ex,
)
internalSucceedsEventually(nextRetryRemainingDuration)
}
}

internalSucceedsEventually(maxRetryDuration)
}

/** Run the test every [[rerunDelay]] for a duration of [[succeedDuration]] or until the current time exceeds [[succeedDeadline]].
* The assertion will succeed if all of the test case runs are successful and [[succeedDuration]] is exceeded or [[succeedDeadline]] is exceeded.
* The assertion will fail if any test case runs fail.
*/
def succeedsUntil[V](
rerunDelay: FiniteDuration = 100.millis,
succeedDuration: FiniteDuration,
succeedDeadline: Option[Instant] = None,
Comment on lines +96 to +97
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
succeedDuration: FiniteDuration,
succeedDeadline: Option[Instant] = None,
duration: FiniteDuration,
until: Option[Instant] = None,

Also, if you don't need to pass both, consider an Either.

)(
test: => Future[V]
)(implicit ec: ExecutionContext, loggingContext: LoggingContext): Future[V] = {
def internalSucceedsUntil(remainingDuration: FiniteDuration): Future[V] = {
val nextRerunRemainingDuration = remainingDuration - rerunDelay
if (
succeedDeadline.exists(
_.isBefore(Instant.now().plusSeconds(rerunDelay.toSeconds))
) || nextRerunRemainingDuration < Duration.Zero
) test
else
assertAfter(rerunDelay)(test)
.flatMap { _ =>
internalSucceedsUntil(nextRerunRemainingDuration)
}
}

internalSucceedsUntil(succeedDuration)
.andThen { case Failure(exception) =>
logger.error(
s"Repeated assertion failed with a succeed duration of $succeedDuration.",
exception,
)
}
}

def forAllParallel[T](
data: Seq[T]
)(testCase: T => Future[Unit])(implicit ec: ExecutionContext): Future[Seq[Unit]] = Future
.traverse(data)(input =>
testCase(input).map(Right(_)).recover { case NonFatal(ex) =>
Left(input -> ex)
}
)
.map { results =>
val (failures, successes) = results.partitionMap(identity)
if (failures.nonEmpty)
throw ParallelTestFailureException(
s"Failed parallel test case. Failures: ${failures.length}. Success: ${successes.length}\nFailed inputs: ${failures
.map(_._1)
.mkString("[", ",", "]")}",
failures.last._2,
)
else successes
}

def optionalAssertion(runs: Boolean, description: String)(
assertions: => Future[_]
)(implicit loggingContext: LoggingContext): Future[_] = if (runs) assertions
else {
logger.warn(s"Not running optional assertions: $description")
Future.unit
}
fabiotudone-da marked this conversation as resolved.
Show resolved Hide resolved

final class ExpectedFailureException[T](context: String, value: T)
extends NoSuchElementException(
s"Expected a failure when $context, but got a successful result of: $value"
)

}

final case class ParallelTestFailureException(message: String, failure: Throwable)
fabiotudone-da marked this conversation as resolved.
Show resolved Hide resolved
extends RuntimeException(message, failure)
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,12 @@ final class LedgerTestCasesRunner(
ledgerSession,
concurrentTestCases,
concurrentTestRuns,
)(materializer, materializer.executionContext)
)(materializer, executionContext)
sequentialTestResults <- runTestCases(
ledgerSession,
sequentialTestCases,
concurrency = 1,
)(materializer, materializer.executionContext)
)(materializer, executionContext)
} yield concurrentTestResults ++ sequentialTestResults ++ excludedTestResults

testResults.recover {
Expand All @@ -259,7 +259,7 @@ final class LedgerTestCasesRunner(
val results =
for {
materializer <- materializerResources.asFuture
results <- run(participants)(materializer, materializer.executionContext)
results <- run(participants)(materializer, executionContext)
} yield results

results.onComplete(_ => materializerResources.release())
Expand Down