Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ACS testing - payload support [DPP-661] #11308

Merged
merged 3 commits into from
Oct 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ledger/ledger-api-bench-tool/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,12 @@ da_scala_test_suite(
"@maven//:com_typesafe_akka_akka_actor",
"@maven//:com_typesafe_akka_akka_actor_typed",
"@maven//:com_typesafe_akka_akka_actor_testkit_typed",
"@maven//:org_scalacheck_scalacheck",
"@maven//:org_scalatest_scalatest_core",
"@maven//:org_scalatest_scalatest_matchers_core",
"@maven//:org_scalatest_scalatest_shouldmatchers",
"@maven//:org_scalatest_scalatest_wordspec",
"@maven//:org_scalatestplus_scalacheck_1_15",
],
runtime_deps = [
"@maven//:ch_qos_logback_logback_classic",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

package com.daml.ledger.api.benchtool

import com.daml.ledger.api.benchtool.generating.ContractProducer
import com.daml.ledger.api.benchtool.submission.CommandSubmitter
import com.daml.ledger.api.benchtool.services._
import com.daml.ledger.api.tls.TlsConfiguration
import com.daml.ledger.resources.{ResourceContext, ResourceOwner}
Expand Down Expand Up @@ -49,7 +49,7 @@ object LedgerApiBenchTool {
Future.successful(
logger.info("No contract set descriptor file provided. Skipping contracts generation.")
)
case Some(descriptorFile) => ContractProducer(apiServices).create(descriptorFile)
case Some(descriptorFile) => CommandSubmitter(apiServices).submitCommands(descriptorFile)
}

def benchmarkStep(): Future[Unit] = if (config.streams.isEmpty) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ final class LedgerIdentityService(channel: Channel) {
case Failure(exception) =>
Future.failed {
logger.error(
s"Error during fetching the ledger id. Details: ${exception.getLocalizedMessage}",
s"Error during fetching of the ledger id. Details: ${exception.getLocalizedMessage}",
exception,
)
exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,4 @@ class PartyManagementService(channel: Channel) {
exception
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.ledger.api.benchtool.submission

import com.daml.ledger.api.v1.commands.Command
import com.daml.ledger.client.binding.Primitive.Party
import com.daml.ledger.test.model.Foo._

import java.nio.charset.StandardCharsets

final class CommandGenerator(
randomnessProvider: RandomnessProvider,
descriptor: ContractSetDescriptor,
party: Party,
) {
private val distribution = new Distribution(descriptor.instanceDistribution.map(_.weight))
private lazy val descriptionMapping: Map[Int, ContractSetDescriptor.ContractDescription] =
descriptor.instanceDistribution.zipWithIndex
.map(_.swap)
.toMap

def next(): Either[String, Party => Command] = {
val description = descriptionMapping(distribution.index(randomnessProvider.randomDouble()))
createContractCommand(
template = description.template,
observers = List(party),
payload = randomPayload(description.payloadSizeBytes),
)
}

private def createContractCommand(
template: String,
observers: List[Party],
payload: String,
): Either[String, Party => Command] = {
template match {
case "Foo1" => Right(Foo1(_, observers, payload).create.command)
case "Foo2" => Right(Foo2(_, observers, payload).create.command)
case "Foo3" => Right(Foo3(_, observers, payload).create.command)
case invalid => Left(s"Invalid template: $invalid")
}
}

private def randomPayload(sizeBytes: Int): String =
new String(randomnessProvider.randomBytes(sizeBytes), StandardCharsets.UTF_8)
}
Original file line number Diff line number Diff line change
@@ -1,97 +1,92 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.ledger.api.benchtool.generating
package com.daml.ledger.api.benchtool.submission

import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import com.daml.ledger.api.benchtool.infrastructure.TestDars
import com.daml.ledger.api.v1.commands.Commands
import com.daml.ledger.client.binding.Primitive.Party
import com.daml.ledger.api.benchtool.services.LedgerApiServices
import com.daml.ledger.api.benchtool.util.SimpleFileReader
import com.daml.ledger.api.v1.commands.{Command, Commands}
import com.daml.ledger.client.binding.Primitive
import com.daml.ledger.client.binding.Primitive.Party
import com.daml.ledger.resources.{ResourceContext, ResourceOwner}
import org.slf4j.LoggerFactory
import scalaz.syntax.tag._

import java.io.File
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}
import scalaz.syntax.tag._
import com.daml.ledger.test.model.Foo.Foo1
import scala.concurrent.duration._

case class ContractProducer(services: LedgerApiServices) {
case class CommandSubmitter(services: LedgerApiServices) {
private val logger = LoggerFactory.getLogger(getClass)

private val identifierSuffix = f"${System.nanoTime}%x"
private val applicationId = "benchtool"
private val workflowId = s"$applicationId-$identifierSuffix"
private def commandId(index: Int) = s"command-$index-$identifierSuffix"

def create(
def submitCommands(
descriptorFile: File
)(implicit ec: ExecutionContext): Future[Unit] = {
logger.info("Generating contracts...")
for {
)(implicit ec: ExecutionContext): Future[Unit] =
(for {
_ <- Future.successful(logger.info("Generating contracts..."))
descriptor <- Future.fromTry(parseDescriptor(descriptorFile))
party <- allocateParty()
_ <- uploadTestDars()
_ <- createContracts(descriptor = descriptor, party = party)
} yield {
logger.info("Contracts produced successfully.")
} yield logger.info("Contracts produced successfully.")).recoverWith { case NonFatal(ex) =>
logger.error(s"Command submission failed. Details: ${ex.getLocalizedMessage}", ex)
Future.failed(CommandSubmitter.CommandSubmitterError(ex.getLocalizedMessage))
}

}

private def parseDescriptor(descriptorFile: File): Try[ContractSetDescriptor] = {
private def parseDescriptor(descriptorFile: File): Try[ContractSetDescriptor] =
SimpleFileReader.readFile(descriptorFile)(DescriptorParser.parse).flatMap {
case Left(err: DescriptorParser.DescriptorParserError) =>
val message = s"Descriptor parsing error. Details: ${err.details}"
logger.error(message)
Failure(new RuntimeException(message))
Failure(CommandSubmitter.CommandSubmitterError(message))
case Right(descriptor) =>
logger.info(s"Descriptor parsed: $descriptor")
Success(descriptor)
}
}

private def allocateParty()(implicit ec: ExecutionContext): Future[Primitive.Party] = {
val party0Hint = s"party-0-$identifierSuffix"
services.partyManagementService.allocateParty(party0Hint)
}
private def allocateParty()(implicit ec: ExecutionContext): Future[Primitive.Party] =
services.partyManagementService.allocateParty(
hint = s"party-0-$identifierSuffix"
)

private def uploadTestDars()(implicit ec: ExecutionContext): Future[Unit] = {
def uploadDar(dar: TestDars.DarFile, submissionId: String): Future[Unit] = {
logger.info(s"Uploading dar: ${dar.name}")
services.packageManagementService.uploadDar(
bytes = dar.bytes,
submissionId = submissionId,
)
}
private def uploadDar(dar: TestDars.DarFile, submissionId: String)(implicit
ec: ExecutionContext
): Future[Unit] =
services.packageManagementService.uploadDar(
bytes = dar.bytes,
submissionId = submissionId,
)

private def uploadTestDars()(implicit ec: ExecutionContext): Future[Unit] =
for {
dars <- Future.fromTry(TestDars.readAll())
_ <- Future.sequence(dars.zipWithIndex.map { case (dar, index) =>
uploadDar(dar, s"submission-dars-$index-$identifierSuffix")
})
} yield ()
}

private def createContract(index: Int, party: Party)(implicit
private def createContract(id: String, party: Party, createCommand: Command)(implicit
ec: ExecutionContext
): Future[Int] = {
val createCommand = Foo1(signatory = party, observers = List(party)).create.command
): Future[Unit] = {
val commands = new Commands(
ledgerId = services.ledgerId,
applicationId = applicationId,
commandId = commandId(index),
commandId = id,
party = party.unwrap,
commands = List(createCommand),
workflowId = workflowId,
)
services.commandService.submitAndWait(commands).map(_ => index)
services.commandService.submitAndWait(commands).map(_ => ())
}

private def createContracts(descriptor: ContractSetDescriptor, party: Party)(implicit
Expand All @@ -104,16 +99,28 @@ case class ContractProducer(services: LedgerApiServices) {
)
}

val generator = new CommandGenerator(RandomnessProvider.Default, descriptor, party)

implicit val resourceContext: ResourceContext = ResourceContext(ec)
materializerOwner()
.use { implicit materializer =>
Source
.fromIterator(() => (1 to descriptor.numberOfInstances).iterator)
.throttle(
elements = 100,
per = 1.second,
)
.mapAsync(8)(index => createContract(index, party))
.mapAsync(100) { index =>
generator.next() match {
case Right(command) =>
createContract(
id = commandId(index),
party = party,
createCommand = command(party),
).map(_ => index)
case Left(error) =>
Future.failed {
logger.error(s"Command generation failed. Details: $error")
CommandSubmitter.CommandSubmitterError(error)
}
}
}
.runForeach(logProgress)
}
.map(_ => ())
Expand All @@ -126,3 +133,7 @@ case class ContractProducer(services: LedgerApiServices) {
} yield materializer
}
}

object CommandSubmitter {
case class CommandSubmitterError(msg: String) extends RuntimeException(msg)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.ledger.api.benchtool.submission

final case class ContractSetDescriptor(
numberOfInstances: Int,
instanceDistribution: List[ContractSetDescriptor.ContractDescription],
)

object ContractSetDescriptor {
final case class ContractDescription(
template: String,
weight: Int,
payloadSizeBytes: Int,
)
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.ledger.api.benchtool.generating
package com.daml.ledger.api.benchtool.submission

import io.circe._
import io.circe.yaml.parser

import java.io.Reader

object DescriptorParser {
import Decoders._

def parse(reader: Reader): Either[DescriptorParserError, ContractSetDescriptor] =
parser
Expand All @@ -17,9 +18,21 @@ object DescriptorParser {
.left
.map(error => DescriptorParserError(error.getLocalizedMessage))

implicit val decoder: Decoder[ContractSetDescriptor] =
Decoder.forProduct1("num_instances")(ContractSetDescriptor.apply)

case class DescriptorParserError(details: String)

object Decoders {
implicit val contractDescriptionDecoder: Decoder[ContractSetDescriptor.ContractDescription] =
Decoder.forProduct3(
"template",
"weight",
"payload_size_bytes",
)(ContractSetDescriptor.ContractDescription.apply)

implicit val descriptorDecoder: Decoder[ContractSetDescriptor] =
Decoder.forProduct2(
"num_instances",
"instance_distribution",
)(ContractSetDescriptor.apply)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.ledger.api.benchtool.submission

class Distribution(weights: List[Int]) {
assert(weights.nonEmpty, "Weights list must not be empty.")
assert(!weights.exists(_ < 1), "Weights must be strictly positive.")

def index(randomDouble: Double): Int = {
assert(randomDouble <= 1.0, "Given random double must be <= 1.0.")
// Consider changing implementation to use binary search when using on large lists.
distribution.indexWhere(_ >= randomDouble)
}

private lazy val totalWeight: Long = weights.map(_.toLong).sum
private lazy val distribution: List[Double] =
weights.scanLeft(0)((sum, weight) => sum + weight).map(_.toDouble / totalWeight).tail

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.ledger.api.benchtool.submission

trait RandomnessProvider {
def randomDouble(): Double // 0.0 <= x <= 1.0
def randomBytes(n: Int): Array[Byte]
}

object RandomnessProvider {
object Default extends RandomnessProvider {
private val r = new scala.util.Random(System.currentTimeMillis())
override def randomDouble(): Double = r.nextDouble()
override def randomBytes(n: Int): Array[Byte] = {
val arr = Array.ofDim[Byte](n)
r.nextBytes(arr)
arr
}
}
}
Loading