diff --git a/ledger/ledger-api-bench-tool/BUILD.bazel b/ledger/ledger-api-bench-tool/BUILD.bazel index 995f05d09493..75f631f16c89 100644 --- a/ledger/ledger-api-bench-tool/BUILD.bazel +++ b/ledger/ledger-api-bench-tool/BUILD.bazel @@ -40,6 +40,10 @@ da_scala_library( "@maven//:com_lihaoyi_pprint", "@maven//:com_typesafe_akka_akka_actor", "@maven//:com_typesafe_akka_akka_actor_typed", + "@maven//:com_typesafe_akka_akka_stream", + "@maven//:io_circe_circe_core", + "@maven//:io_circe_circe_yaml", + "@maven//:org_typelevel_cats_core", ], visibility = ["//visibility:public"], deps = [ @@ -51,6 +55,8 @@ da_scala_library( "//libs-scala/resources", "//libs-scala/resources-akka", "//libs-scala/resources-grpc", + "//ledger/test-common:model-tests-%s.scala" % "1.14", # TODO: make the LF version configurable + "//ledger/test-common:dar-files-%s-lib" % "1.14", # TODO: make the LF version configurable "@maven//:io_dropwizard_metrics_metrics_core", "@maven//:io_grpc_grpc_api", "@maven//:io_grpc_grpc_core", diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/Benchmark.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/Benchmark.scala new file mode 100644 index 000000000000..1b7412b82b35 --- /dev/null +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/Benchmark.scala @@ -0,0 +1,116 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.api.benchtool + +import com.daml.ledger.api.benchtool.metrics.{ + MetricRegistryOwner, + MetricsCollector, + MetricsSet, + StreamMetrics, +} +import com.daml.ledger.api.benchtool.services.LedgerApiServices +import com.daml.ledger.api.benchtool.util.TypedActorSystemResourceOwner +import com.daml.ledger.resources.ResourceContext +import org.slf4j.LoggerFactory + +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success} + +object Benchmark { + private val logger = LoggerFactory.getLogger(getClass) + + def run( + config: Config, + apiServices: LedgerApiServices, + )(implicit ec: ExecutionContext, resourceContext: ResourceContext): Future[Unit] = { + val resources = for { + system <- TypedActorSystemResourceOwner.owner() + registry <- new MetricRegistryOwner( + reporter = config.metricsReporter, + reportingInterval = config.reportingPeriod, + logger = logger, + ) + } yield (system, registry) + + resources.use { case (system, registry) => + Future + .traverse(config.streams) { + case streamConfig: Config.StreamConfig.TransactionsStreamConfig => + StreamMetrics + .observer( + streamName = streamConfig.name, + logInterval = config.reportingPeriod, + metrics = MetricsSet.transactionMetrics(streamConfig.objectives), + logger = logger, + exposedMetrics = Some( + MetricsSet + .transactionExposedMetrics(streamConfig.name, registry, config.reportingPeriod) + ), + )(system, ec) + .flatMap { observer => + apiServices.transactionService.transactions(streamConfig, observer) + } + case streamConfig: Config.StreamConfig.TransactionTreesStreamConfig => + StreamMetrics + .observer( + streamName = streamConfig.name, + logInterval = config.reportingPeriod, + metrics = MetricsSet.transactionTreesMetrics(streamConfig.objectives), + logger = logger, + exposedMetrics = Some( + MetricsSet.transactionTreesExposedMetrics( + streamConfig.name, + registry, + config.reportingPeriod, + ) + ), + )(system, ec) + .flatMap { observer => + apiServices.transactionService.transactionTrees(streamConfig, observer) + } + case streamConfig: Config.StreamConfig.ActiveContractsStreamConfig => + StreamMetrics + .observer( + streamName = streamConfig.name, + logInterval = config.reportingPeriod, + metrics = MetricsSet.activeContractsMetrics, + logger = logger, + exposedMetrics = Some( + MetricsSet.activeContractsExposedMetrics( + streamConfig.name, + registry, + config.reportingPeriod, + ) + ), + )(system, ec) + .flatMap { observer => + apiServices.activeContractsService.getActiveContracts(streamConfig, observer) + } + case streamConfig: Config.StreamConfig.CompletionsStreamConfig => + StreamMetrics + .observer( + streamName = streamConfig.name, + logInterval = config.reportingPeriod, + metrics = MetricsSet.completionsMetrics, + logger = logger, + exposedMetrics = Some( + MetricsSet + .completionsExposedMetrics(streamConfig.name, registry, config.reportingPeriod) + ), + )(system, ec) + .flatMap { observer => + apiServices.commandCompletionService.completions(streamConfig, observer) + } + } + .transform { + case Success(results) => + if (results.contains(MetricsCollector.Message.MetricsResult.ObjectivesViolated)) + Failure(new RuntimeException("Metrics objectives not met.")) + else Success(()) + case Failure(ex) => + Failure(ex) + } + } + } +} diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/Cli.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/Cli.scala index 603f2801dfd3..8ee714570fab 100644 --- a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/Cli.scala +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/Cli.scala @@ -10,6 +10,7 @@ import com.daml.ledger.api.v1.value.Identifier import com.daml.metrics.MetricsReporter import scopt.{OptionDef, OptionParser, Read} +import java.io.File import scala.concurrent.duration.FiniteDuration import scala.util.{Failure, Success, Try} @@ -32,7 +33,6 @@ object Cli { opt[Config.StreamConfig]("consume-stream") .abbr("s") .unbounded() - .minOccurs(1) .text( s"Stream configuration." ) @@ -43,6 +43,15 @@ object Cli { config.copy(streams = config.streams :+ streamConfig) } + opt[File]("contract-set-descriptor") + .hidden() // TODO: uncomment when production-ready + .abbr("d") + .optional() + .text("A contract set descriptor file.") + .action { case (descriptorFile, config) => + config.copy(contractSetDescriptorFile = Some(descriptorFile)) + } + opt[FiniteDuration]("log-interval") .abbr("r") .text("Stream metrics log interval.") diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/Config.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/Config.scala index af793f4d9204..cc251f5209e0 100644 --- a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/Config.scala +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/Config.scala @@ -8,6 +8,7 @@ import com.daml.ledger.api.v1.ledger_offset.LedgerOffset import com.daml.ledger.api.v1.value.Identifier import com.daml.metrics.MetricsReporter +import java.io.File import scala.concurrent.duration._ case class Config( @@ -16,6 +17,7 @@ case class Config( tls: TlsConfiguration, streams: List[Config.StreamConfig], reportingPeriod: FiniteDuration, + contractSetDescriptorFile: Option[File], metricsReporter: MetricsReporter, ) @@ -90,6 +92,7 @@ object Config { tls = TlsConfiguration.Empty.copy(enabled = false), streams = List.empty[Config.StreamConfig], reportingPeriod = 5.seconds, + contractSetDescriptorFile = None, metricsReporter = MetricsReporter.Console, ) } diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/LedgerApiBenchTool.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/LedgerApiBenchTool.scala index 9d1f6fde47d0..0f14d2cb5d13 100644 --- a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/LedgerApiBenchTool.scala +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/LedgerApiBenchTool.scala @@ -3,19 +3,8 @@ package com.daml.ledger.api.benchtool -import com.daml.ledger.api.benchtool.metrics.{ - MetricRegistryOwner, - MetricsCollector, - MetricsSet, - StreamMetrics, -} -import com.daml.ledger.api.benchtool.services.{ - ActiveContractsService, - CommandCompletionService, - LedgerIdentityService, - TransactionService, -} -import com.daml.ledger.api.benchtool.util.TypedActorSystemResourceOwner +import com.daml.ledger.api.benchtool.generating.ContractProducer +import com.daml.ledger.api.benchtool.services._ import com.daml.ledger.api.tls.TlsConfiguration import com.daml.ledger.resources.{ResourceContext, ResourceOwner} import io.grpc.Channel @@ -31,127 +20,60 @@ import java.util.concurrent.{ } import scala.concurrent.duration._ import scala.concurrent.{Await, ExecutionContext, Future} -import scala.util.{Failure, Success} object LedgerApiBenchTool { def main(args: Array[String]): Unit = { Cli.config(args) match { case Some(config) => - val benchmark = runBenchmark(config)(ExecutionContext.Implicits.global) + val result = run(config)(ExecutionContext.Implicits.global) .recover { case ex => println(s"Error: ${ex.getMessage}") sys.exit(1) }(scala.concurrent.ExecutionContext.Implicits.global) - Await.result(benchmark, atMost = Duration.Inf) + Await.result(result, atMost = Duration.Inf) () case _ => logger.error("Invalid configuration arguments.") } } - private def runBenchmark(config: Config)(implicit ec: ExecutionContext): Future[Unit] = { + private def run(config: Config)(implicit ec: ExecutionContext): Future[Unit] = { val printer = pprint.PPrinter(200, 1000) logger.info(s"Starting benchmark with configuration:\n${printer(config).toString()}") implicit val resourceContext: ResourceContext = ResourceContext(ec) - val resources = for { - executorService <- threadPoolExecutorOwner(config.concurrency) - channel <- channelOwner(config.ledger, config.tls, executorService) - system <- TypedActorSystemResourceOwner.owner() - registry <- new MetricRegistryOwner( - reporter = config.metricsReporter, - reportingInterval = config.reportingPeriod, - logger = logger, - ) - } yield (channel, system, registry) - - resources.use { case (channel, system, registry) => - val ledgerIdentityService: LedgerIdentityService = new LedgerIdentityService(channel) - val ledgerId: String = ledgerIdentityService.fetchLedgerId() - val transactionService = new TransactionService(channel, ledgerId) - val activeContractsService = new ActiveContractsService(channel, ledgerId) - val commandCompletionService = new CommandCompletionService(channel, ledgerId) - Future - .traverse(config.streams) { - case streamConfig: Config.StreamConfig.TransactionsStreamConfig => - StreamMetrics - .observer( - streamName = streamConfig.name, - logInterval = config.reportingPeriod, - metrics = MetricsSet.transactionMetrics(streamConfig.objectives), - logger = logger, - exposedMetrics = Some( - MetricsSet - .transactionExposedMetrics(streamConfig.name, registry, config.reportingPeriod) - ), - )(system, ec) - .flatMap { observer => - transactionService.transactions(streamConfig, observer) - } - case streamConfig: Config.StreamConfig.TransactionTreesStreamConfig => - StreamMetrics - .observer( - streamName = streamConfig.name, - logInterval = config.reportingPeriod, - metrics = MetricsSet.transactionTreesMetrics(streamConfig.objectives), - logger = logger, - exposedMetrics = Some( - MetricsSet.transactionTreesExposedMetrics( - streamConfig.name, - registry, - config.reportingPeriod, - ) - ), - )(system, ec) - .flatMap { observer => - transactionService.transactionTrees(streamConfig, observer) - } - case streamConfig: Config.StreamConfig.ActiveContractsStreamConfig => - StreamMetrics - .observer( - streamName = streamConfig.name, - logInterval = config.reportingPeriod, - metrics = MetricsSet.activeContractsMetrics, - logger = logger, - exposedMetrics = Some( - MetricsSet.activeContractsExposedMetrics( - streamConfig.name, - registry, - config.reportingPeriod, - ) - ), - )(system, ec) - .flatMap { observer => - activeContractsService.getActiveContracts(streamConfig, observer) - } - case streamConfig: Config.StreamConfig.CompletionsStreamConfig => - StreamMetrics - .observer( - streamName = streamConfig.name, - logInterval = config.reportingPeriod, - metrics = MetricsSet.completionsMetrics, - logger = logger, - exposedMetrics = Some( - MetricsSet - .completionsExposedMetrics(streamConfig.name, registry, config.reportingPeriod) - ), - )(system, ec) - .flatMap { observer => - commandCompletionService.completions(streamConfig, observer) - } - } - .transform { - case Success(results) => - if (results.contains(MetricsCollector.Message.MetricsResult.ObjectivesViolated)) - Failure(new RuntimeException("Metrics objectives not met.")) - else Success(()) - case Failure(ex) => - Failure(ex) - } + apiServicesOwner(config).use { apiServices => + def testContractsGenerationStep(): Future[Unit] = config.contractSetDescriptorFile match { + case None => + Future.successful( + logger.info("No contract set descriptor file provided. Skipping contracts generation.") + ) + case Some(descriptorFile) => ContractProducer(apiServices).create(descriptorFile) + } + + def benchmarkStep(): Future[Unit] = if (config.streams.isEmpty) { + Future.successful(logger.info(s"No streams defined. Skipping the benchmark step.")) + } else { + Benchmark.run(config, apiServices) + } + + for { + _ <- testContractsGenerationStep() + _ <- benchmarkStep() + } yield () } } + private def apiServicesOwner( + config: Config + )(implicit ec: ExecutionContext): ResourceOwner[LedgerApiServices] = + for { + executorService <- threadPoolExecutorOwner(config.concurrency) + channel <- channelOwner(config.ledger, config.tls, executorService) + services <- ResourceOwner.forFuture(() => LedgerApiServices.forChannel(channel)) + } yield services + private def channelOwner( ledger: Config.Ledger, tls: TlsConfiguration, diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/generating/ContractProducer.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/generating/ContractProducer.scala new file mode 100644 index 000000000000..a06e78f3d5c2 --- /dev/null +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/generating/ContractProducer.scala @@ -0,0 +1,128 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.api.benchtool.generating + +import akka.actor.ActorSystem +import akka.stream.Materializer +import akka.stream.scaladsl.Source +import com.daml.ledger.api.benchtool.infrastructure.TestDars +import com.daml.ledger.api.v1.commands.Commands +import com.daml.ledger.client.binding.Primitive.Party +import com.daml.ledger.api.benchtool.services.LedgerApiServices +import com.daml.ledger.api.benchtool.util.SimpleFileReader +import com.daml.ledger.client.binding.Primitive +import com.daml.ledger.resources.{ResourceContext, ResourceOwner} +import org.slf4j.LoggerFactory + +import java.io.File +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success, Try} +import scalaz.syntax.tag._ +import com.daml.ledger.test.model.Foo.Foo1 +import scala.concurrent.duration._ + +case class ContractProducer(services: LedgerApiServices) { + private val logger = LoggerFactory.getLogger(getClass) + + private val identifierSuffix = f"${System.nanoTime}%x" + private val applicationId = "benchtool" + private val workflowId = s"$applicationId-$identifierSuffix" + private def commandId(index: Int) = s"command-$index-$identifierSuffix" + + def create( + descriptorFile: File + )(implicit ec: ExecutionContext): Future[Unit] = { + logger.info("Generating contracts...") + for { + descriptor <- Future.fromTry(parseDescriptor(descriptorFile)) + party <- allocateParty() + _ <- uploadTestDars() + _ <- createContracts(descriptor = descriptor, party = party) + } yield { + logger.info("Contracts produced successfully.") + } + + } + + private def parseDescriptor(descriptorFile: File): Try[ContractSetDescriptor] = { + SimpleFileReader.readFile(descriptorFile)(DescriptorParser.parse).flatMap { + case Left(err: DescriptorParser.DescriptorParserError) => + val message = s"Descriptor parsing error. Details: ${err.details}" + logger.error(message) + Failure(new RuntimeException(message)) + case Right(descriptor) => + logger.info(s"Descriptor parsed: $descriptor") + Success(descriptor) + } + } + + private def allocateParty()(implicit ec: ExecutionContext): Future[Primitive.Party] = { + val party0Hint = s"party-0-$identifierSuffix" + services.partyManagementService.allocateParty(party0Hint) + } + + private def uploadTestDars()(implicit ec: ExecutionContext): Future[Unit] = { + def uploadDar(dar: TestDars.DarFile, submissionId: String): Future[Unit] = { + logger.info(s"Uploading dar: ${dar.name}") + services.packageManagementService.uploadDar( + bytes = dar.bytes, + submissionId = submissionId, + ) + } + + for { + dars <- Future.fromTry(TestDars.readAll()) + _ <- Future.sequence(dars.zipWithIndex.map { case (dar, index) => + uploadDar(dar, s"submission-dars-$index-$identifierSuffix") + }) + } yield () + } + + private def createContract(index: Int, party: Party)(implicit + ec: ExecutionContext + ): Future[Int] = { + val createCommand = Foo1(signatory = party, observers = List(party)).create.command + val commands = new Commands( + ledgerId = services.ledgerId, + applicationId = applicationId, + commandId = commandId(index), + party = party.unwrap, + commands = List(createCommand), + workflowId = workflowId, + ) + services.commandService.submitAndWait(commands).map(_ => index) + } + + private def createContracts(descriptor: ContractSetDescriptor, party: Party)(implicit + ec: ExecutionContext + ): Future[Unit] = { + def logProgress(index: Int): Unit = + if (index % 100 == 0) { + logger.info( + s"Created contracts: $index out of ${descriptor.numberOfInstances} (${(index.toDouble / descriptor.numberOfInstances) * 100}%)" + ) + } + + implicit val resourceContext: ResourceContext = ResourceContext(ec) + materializerOwner() + .use { implicit materializer => + Source + .fromIterator(() => (1 to descriptor.numberOfInstances).iterator) + .throttle( + elements = 100, + per = 1.second, + ) + .mapAsync(8)(index => createContract(index, party)) + .runForeach(logProgress) + } + .map(_ => ()) + } + + private def materializerOwner(): ResourceOwner[Materializer] = { + for { + actorSystem <- ResourceOwner.forActorSystem(() => ActorSystem("CommandSubmissionSystem")) + materializer <- ResourceOwner.forMaterializer(() => Materializer(actorSystem)) + } yield materializer + } +} diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/generating/ContractSetDescriptor.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/generating/ContractSetDescriptor.scala new file mode 100644 index 000000000000..6f3ff45d8168 --- /dev/null +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/generating/ContractSetDescriptor.scala @@ -0,0 +1,8 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.api.benchtool.generating + +final case class ContractSetDescriptor( + numberOfInstances: Int +) diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/generating/DescriptorParser.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/generating/DescriptorParser.scala new file mode 100644 index 000000000000..bf89dc2a4091 --- /dev/null +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/generating/DescriptorParser.scala @@ -0,0 +1,25 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.api.benchtool.generating + +import io.circe._ +import io.circe.yaml.parser + +import java.io.Reader + +object DescriptorParser { + + def parse(reader: Reader): Either[DescriptorParserError, ContractSetDescriptor] = + parser + .parse(reader) + .flatMap(_.as[ContractSetDescriptor]) + .left + .map(error => DescriptorParserError(error.getLocalizedMessage)) + + implicit val decoder: Decoder[ContractSetDescriptor] = + Decoder.forProduct1("num_instances")(ContractSetDescriptor.apply) + + case class DescriptorParserError(details: String) + +} diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/infrastructure/TestDars.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/infrastructure/TestDars.scala new file mode 100644 index 000000000000..3836913bdd20 --- /dev/null +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/infrastructure/TestDars.scala @@ -0,0 +1,33 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.api.benchtool.infrastructure + +import com.daml.ledger.api.benchtool.util.SimpleFileReader +import com.daml.ledger.test.TestDar +import com.google.protobuf.ByteString + +import scala.util.{Failure, Success, Try} +import scala.util.control.NonFatal + +object TestDars { + private val TestDarInfix = "model-tests" + private lazy val resources: List[String] = TestDar.paths.filter(_.contains(TestDarInfix)) + + def readAll(): Try[List[DarFile]] = { + (TestDars.resources + .foldLeft[Try[List[DarFile]]](Success(List.empty)) { (acc, resourceName) => + for { + dars <- acc + bytes <- SimpleFileReader.readResource(resourceName) + } yield DarFile(resourceName, bytes) :: dars + }) + .recoverWith { case NonFatal(ex) => + Failure(TestDarsError(s"Reading test dars failed. Details: ${ex.getLocalizedMessage}", ex)) + } + } + + case class TestDarsError(message: String, cause: Throwable) extends Exception(message, cause) + + case class DarFile(name: String, bytes: ByteString) +} diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/services/CommandService.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/services/CommandService.scala new file mode 100644 index 000000000000..7801e36ffa10 --- /dev/null +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/services/CommandService.scala @@ -0,0 +1,53 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.api.benchtool.services + +import com.daml.ledger.api.v1.command_service._ +import com.daml.ledger.api.v1.commands.{Command, Commands} +import com.daml.ledger.client.binding.Primitive.Party +import com.google.protobuf.empty.Empty +import io.grpc.Channel +import org.slf4j.LoggerFactory +import scalaz.syntax.tag._ + +import scala.concurrent.{ExecutionContext, Future} +import scala.util.control.NonFatal + +class CommandService(channel: Channel) { + private val logger = LoggerFactory.getLogger(getClass) + private val service: CommandServiceGrpc.CommandServiceStub = CommandServiceGrpc.stub(channel) + + def submitAndWait(commands: Commands)(implicit ec: ExecutionContext): Future[Empty] = + service + .submitAndWait(new SubmitAndWaitRequest(Some(commands))) + .recoverWith { case NonFatal(ex) => + Future.failed { + logger.error(s"Command submission error. Details: ${ex.getLocalizedMessage}", ex) + ex + } + } +} + +object CommandService { + def submitAndWaitRequest( + ledgerId: String, + applicationId: String, + commandId: String, + workflowId: String, + party: Party, + commands: List[Command], + ): SubmitAndWaitRequest = + new SubmitAndWaitRequest( + Some( + new Commands( + ledgerId = ledgerId, + applicationId = applicationId, + commandId = commandId, + party = party.unwrap, + commands = commands, + workflowId = workflowId, + ) + ) + ) +} diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/services/LedgerApiServices.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/services/LedgerApiServices.scala new file mode 100644 index 000000000000..d9fd80790f66 --- /dev/null +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/services/LedgerApiServices.scala @@ -0,0 +1,26 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.api.benchtool.services + +import io.grpc.Channel + +import scala.concurrent.{ExecutionContext, Future} + +class LedgerApiServices(channel: Channel, val ledgerId: String) { + + val activeContractsService = new ActiveContractsService(channel, ledgerId) + val commandService = new CommandService(channel) + val commandCompletionService = new CommandCompletionService(channel, ledgerId) + val packageManagementService = new PackageManagementService(channel) + val partyManagementService = new PartyManagementService(channel) + val transactionService = new TransactionService(channel, ledgerId) + +} + +object LedgerApiServices { + def forChannel(channel: Channel)(implicit ec: ExecutionContext): Future[LedgerApiServices] = { + val ledgerIdentityService: LedgerIdentityService = new LedgerIdentityService(channel) + ledgerIdentityService.fetchLedgerId().map(ledgerId => new LedgerApiServices(channel, ledgerId)) + } +} diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/services/LedgerIdentityService.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/services/LedgerIdentityService.scala index a0bdef5e3b8a..8101f4110e83 100644 --- a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/services/LedgerIdentityService.scala +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/services/LedgerIdentityService.scala @@ -10,16 +10,32 @@ import com.daml.ledger.api.v1.ledger_identity_service.{ import io.grpc.Channel import org.slf4j.LoggerFactory +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success} + final class LedgerIdentityService(channel: Channel) { private val logger = LoggerFactory.getLogger(getClass) - private val service = LedgerIdentityServiceGrpc.blockingStub(channel) + private val service = LedgerIdentityServiceGrpc.stub(channel) - def fetchLedgerId(): String = { - val response = service.getLedgerIdentity( - new GetLedgerIdentityRequest() - ) - logger.info(s"Fetched ledger ID: ${response.ledgerId}") - response.ledgerId - } + def fetchLedgerId()(implicit ec: ExecutionContext): Future[String] = + service + .getLedgerIdentity( + new GetLedgerIdentityRequest() + ) + .transformWith { + case Success(response) => + Future.successful { + logger.info(s"Fetched ledger ID: ${response.ledgerId}") + response.ledgerId + } + case Failure(exception) => + Future.failed { + logger.error( + s"Error during fetching the ledger id. Details: ${exception.getLocalizedMessage}", + exception, + ) + exception + } + } } diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/services/PackageManagementService.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/services/PackageManagementService.scala new file mode 100644 index 000000000000..58d9dc4d6636 --- /dev/null +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/services/PackageManagementService.scala @@ -0,0 +1,25 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.api.benchtool.services + +import com.daml.ledger.api.v1.admin.package_management_service.{ + PackageManagementServiceGrpc, + UploadDarFileRequest, +} +import com.google.protobuf.ByteString +import io.grpc.Channel + +import scala.concurrent.{ExecutionContext, Future} + +class PackageManagementService(channel: Channel) { + private val service = PackageManagementServiceGrpc.stub(channel) + + def uploadDar(bytes: ByteString, submissionId: String)(implicit + ec: ExecutionContext + ): Future[Unit] = + service + .uploadDarFile(new UploadDarFileRequest(bytes, submissionId)) + .map(_ => ()) + +} diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/services/PartyManagementService.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/services/PartyManagementService.scala new file mode 100644 index 000000000000..3af3ed96eda1 --- /dev/null +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/services/PartyManagementService.scala @@ -0,0 +1,42 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.api.benchtool.services + +import com.daml.ledger.api.v1.admin.party_management_service.{ + AllocatePartyRequest, + PartyManagementServiceGrpc, +} +import com.daml.ledger.client.binding.Primitive.Party +import io.grpc.Channel +import org.slf4j.{Logger, LoggerFactory} + +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success} + +class PartyManagementService(channel: Channel) { + private val logger: Logger = LoggerFactory.getLogger(getClass) + private val service: PartyManagementServiceGrpc.PartyManagementServiceStub = + PartyManagementServiceGrpc.stub(channel) + + def allocateParty(hint: String)(implicit ec: ExecutionContext): Future[Party] = + service + .allocateParty(new AllocatePartyRequest(partyIdHint = hint)) + .transformWith { + case Success(response) => + Future.successful { + val party = Party(response.partyDetails.get.party) + logger.info(s"Allocated party: $party") + party + } + case Failure(exception) => + Future.failed { + logger.error( + s"Error during party allocation. Details: ${exception.getLocalizedMessage}", + exception, + ) + exception + } + } + +} diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/util/SimpleFileReader.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/util/SimpleFileReader.scala new file mode 100644 index 000000000000..8421de51f79c --- /dev/null +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/util/SimpleFileReader.scala @@ -0,0 +1,19 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.api.benchtool.util + +import com.google.protobuf.ByteString + +import java.io.{BufferedReader, File, FileReader, Reader} +import scala.util.{Try, Using} + +object SimpleFileReader { + + def readResource[Result](name: String): Try[ByteString] = + Using(getClass.getClassLoader.getResourceAsStream(name))(ByteString.readFrom) + + def readFile[Result](file: File)(f: Reader => Result): Try[Result] = + Using(new BufferedReader(new FileReader(file)))(f) + +} diff --git a/ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/generating/DescriptorParserSpec.scala b/ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/generating/DescriptorParserSpec.scala new file mode 100644 index 000000000000..9e3d42b71f69 --- /dev/null +++ b/ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/generating/DescriptorParserSpec.scala @@ -0,0 +1,29 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.api.benchtool.generating + +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +import java.io.StringReader + +class DescriptorParserSpec extends AnyWordSpec with Matchers { + "DescriptorParser" should { + "return error when empty yaml" in { + parseYaml("") shouldBe a[Left[_, _]] + } + "parse number of instances" in { + parseYaml("""num_instances: 123""") shouldBe Right( + ContractSetDescriptor( + numberOfInstances = 123 + ) + ) + } + } + + def parseYaml( + yaml: String + ): Either[DescriptorParser.DescriptorParserError, ContractSetDescriptor] = + DescriptorParser.parse(new StringReader(yaml)) +} diff --git a/ledger/test-common/src/main/daml/model/Foo.daml b/ledger/test-common/src/main/daml/model/Foo.daml new file mode 100644 index 000000000000..ec9a65d2c58c --- /dev/null +++ b/ledger/test-common/src/main/daml/model/Foo.daml @@ -0,0 +1,13 @@ +-- Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +-- SPDX-License-Identifier: Apache-2.0 + +module Foo where + +template Foo1 + with + signatory : Party + observers : [Party] + where + signatory signatory + observer observers +