Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add command submission to the ledger-api-bench-tool (basic support) [DPP-658] #11296

Merged
merged 1 commit into from
Oct 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ledger/ledger-api-bench-tool/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ da_scala_library(
"@maven//:com_lihaoyi_pprint",
"@maven//:com_typesafe_akka_akka_actor",
"@maven//:com_typesafe_akka_akka_actor_typed",
"@maven//:com_typesafe_akka_akka_stream",
"@maven//:io_circe_circe_core",
"@maven//:io_circe_circe_yaml",
"@maven//:org_typelevel_cats_core",
],
visibility = ["//visibility:public"],
deps = [
Expand All @@ -51,6 +55,8 @@ da_scala_library(
"//libs-scala/resources",
"//libs-scala/resources-akka",
"//libs-scala/resources-grpc",
"//ledger/test-common:model-tests-%s.scala" % "1.14", # TODO: make the LF version configurable
"//ledger/test-common:dar-files-%s-lib" % "1.14", # TODO: make the LF version configurable
"@maven//:io_dropwizard_metrics_metrics_core",
"@maven//:io_grpc_grpc_api",
"@maven//:io_grpc_grpc_core",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.ledger.api.benchtool

import com.daml.ledger.api.benchtool.metrics.{
MetricRegistryOwner,
MetricsCollector,
MetricsSet,
StreamMetrics,
}
import com.daml.ledger.api.benchtool.services.LedgerApiServices
import com.daml.ledger.api.benchtool.util.TypedActorSystemResourceOwner
import com.daml.ledger.resources.ResourceContext
import org.slf4j.LoggerFactory

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object Benchmark {
private val logger = LoggerFactory.getLogger(getClass)

def run(
config: Config,
apiServices: LedgerApiServices,
)(implicit ec: ExecutionContext, resourceContext: ResourceContext): Future[Unit] = {
Comment on lines +23 to +26
Copy link
Contributor Author

@kamil-da kamil-da Oct 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a refactor. The logic in this class was taken directly from

private def runBenchmark(config: Config)(implicit ec: ExecutionContext): Future[Unit] = {

val resources = for {
system <- TypedActorSystemResourceOwner.owner()
registry <- new MetricRegistryOwner(
reporter = config.metricsReporter,
reportingInterval = config.reportingPeriod,
logger = logger,
)
} yield (system, registry)

resources.use { case (system, registry) =>
Future
.traverse(config.streams) {
case streamConfig: Config.StreamConfig.TransactionsStreamConfig =>
StreamMetrics
.observer(
streamName = streamConfig.name,
logInterval = config.reportingPeriod,
metrics = MetricsSet.transactionMetrics(streamConfig.objectives),
logger = logger,
exposedMetrics = Some(
MetricsSet
.transactionExposedMetrics(streamConfig.name, registry, config.reportingPeriod)
),
)(system, ec)
.flatMap { observer =>
apiServices.transactionService.transactions(streamConfig, observer)
}
case streamConfig: Config.StreamConfig.TransactionTreesStreamConfig =>
StreamMetrics
.observer(
streamName = streamConfig.name,
logInterval = config.reportingPeriod,
metrics = MetricsSet.transactionTreesMetrics(streamConfig.objectives),
logger = logger,
exposedMetrics = Some(
MetricsSet.transactionTreesExposedMetrics(
streamConfig.name,
registry,
config.reportingPeriod,
)
),
)(system, ec)
.flatMap { observer =>
apiServices.transactionService.transactionTrees(streamConfig, observer)
}
case streamConfig: Config.StreamConfig.ActiveContractsStreamConfig =>
StreamMetrics
.observer(
streamName = streamConfig.name,
logInterval = config.reportingPeriod,
metrics = MetricsSet.activeContractsMetrics,
logger = logger,
exposedMetrics = Some(
MetricsSet.activeContractsExposedMetrics(
streamConfig.name,
registry,
config.reportingPeriod,
)
),
)(system, ec)
.flatMap { observer =>
apiServices.activeContractsService.getActiveContracts(streamConfig, observer)
}
case streamConfig: Config.StreamConfig.CompletionsStreamConfig =>
StreamMetrics
.observer(
streamName = streamConfig.name,
logInterval = config.reportingPeriod,
metrics = MetricsSet.completionsMetrics,
logger = logger,
exposedMetrics = Some(
MetricsSet
.completionsExposedMetrics(streamConfig.name, registry, config.reportingPeriod)
),
)(system, ec)
.flatMap { observer =>
apiServices.commandCompletionService.completions(streamConfig, observer)
}
}
.transform {
case Success(results) =>
if (results.contains(MetricsCollector.Message.MetricsResult.ObjectivesViolated))
Failure(new RuntimeException("Metrics objectives not met."))
else Success(())
case Failure(ex) =>
Failure(ex)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import com.daml.ledger.api.v1.value.Identifier
import com.daml.metrics.MetricsReporter
import scopt.{OptionDef, OptionParser, Read}

import java.io.File
import scala.concurrent.duration.FiniteDuration
import scala.util.{Failure, Success, Try}

Expand All @@ -32,7 +33,6 @@ object Cli {
opt[Config.StreamConfig]("consume-stream")
.abbr("s")
.unbounded()
.minOccurs(1)
.text(
s"Stream configuration."
)
Expand All @@ -43,6 +43,15 @@ object Cli {
config.copy(streams = config.streams :+ streamConfig)
}

opt[File]("contract-set-descriptor")
.hidden() // TODO: uncomment when production-ready
.abbr("d")
.optional()
.text("A contract set descriptor file.")
.action { case (descriptorFile, config) =>
config.copy(contractSetDescriptorFile = Some(descriptorFile))
}

opt[FiniteDuration]("log-interval")
.abbr("r")
.text("Stream metrics log interval.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
import com.daml.ledger.api.v1.value.Identifier
import com.daml.metrics.MetricsReporter

import java.io.File
import scala.concurrent.duration._

case class Config(
Expand All @@ -16,6 +17,7 @@ case class Config(
tls: TlsConfiguration,
streams: List[Config.StreamConfig],
reportingPeriod: FiniteDuration,
contractSetDescriptorFile: Option[File],
metricsReporter: MetricsReporter,
)

Expand Down Expand Up @@ -90,6 +92,7 @@ object Config {
tls = TlsConfiguration.Empty.copy(enabled = false),
streams = List.empty[Config.StreamConfig],
reportingPeriod = 5.seconds,
contractSetDescriptorFile = None,
metricsReporter = MetricsReporter.Console,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,8 @@

package com.daml.ledger.api.benchtool

import com.daml.ledger.api.benchtool.metrics.{
MetricRegistryOwner,
MetricsCollector,
MetricsSet,
StreamMetrics,
}
import com.daml.ledger.api.benchtool.services.{
ActiveContractsService,
CommandCompletionService,
LedgerIdentityService,
TransactionService,
}
import com.daml.ledger.api.benchtool.util.TypedActorSystemResourceOwner
import com.daml.ledger.api.benchtool.generating.ContractProducer
import com.daml.ledger.api.benchtool.services._
import com.daml.ledger.api.tls.TlsConfiguration
import com.daml.ledger.resources.{ResourceContext, ResourceOwner}
import io.grpc.Channel
Expand All @@ -31,127 +20,60 @@ import java.util.concurrent.{
}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.{Failure, Success}

object LedgerApiBenchTool {
def main(args: Array[String]): Unit = {
Cli.config(args) match {
case Some(config) =>
val benchmark = runBenchmark(config)(ExecutionContext.Implicits.global)
val result = run(config)(ExecutionContext.Implicits.global)
.recover { case ex =>
println(s"Error: ${ex.getMessage}")
sys.exit(1)
}(scala.concurrent.ExecutionContext.Implicits.global)
Await.result(benchmark, atMost = Duration.Inf)
Await.result(result, atMost = Duration.Inf)
()
case _ =>
logger.error("Invalid configuration arguments.")
}
}

private def runBenchmark(config: Config)(implicit ec: ExecutionContext): Future[Unit] = {
private def run(config: Config)(implicit ec: ExecutionContext): Future[Unit] = {
val printer = pprint.PPrinter(200, 1000)
logger.info(s"Starting benchmark with configuration:\n${printer(config).toString()}")

implicit val resourceContext: ResourceContext = ResourceContext(ec)

val resources = for {
executorService <- threadPoolExecutorOwner(config.concurrency)
channel <- channelOwner(config.ledger, config.tls, executorService)
system <- TypedActorSystemResourceOwner.owner()
registry <- new MetricRegistryOwner(
reporter = config.metricsReporter,
reportingInterval = config.reportingPeriod,
logger = logger,
)
} yield (channel, system, registry)

resources.use { case (channel, system, registry) =>
val ledgerIdentityService: LedgerIdentityService = new LedgerIdentityService(channel)
val ledgerId: String = ledgerIdentityService.fetchLedgerId()
val transactionService = new TransactionService(channel, ledgerId)
val activeContractsService = new ActiveContractsService(channel, ledgerId)
val commandCompletionService = new CommandCompletionService(channel, ledgerId)
Future
.traverse(config.streams) {
case streamConfig: Config.StreamConfig.TransactionsStreamConfig =>
StreamMetrics
.observer(
streamName = streamConfig.name,
logInterval = config.reportingPeriod,
metrics = MetricsSet.transactionMetrics(streamConfig.objectives),
logger = logger,
exposedMetrics = Some(
MetricsSet
.transactionExposedMetrics(streamConfig.name, registry, config.reportingPeriod)
),
)(system, ec)
.flatMap { observer =>
transactionService.transactions(streamConfig, observer)
}
case streamConfig: Config.StreamConfig.TransactionTreesStreamConfig =>
StreamMetrics
.observer(
streamName = streamConfig.name,
logInterval = config.reportingPeriod,
metrics = MetricsSet.transactionTreesMetrics(streamConfig.objectives),
logger = logger,
exposedMetrics = Some(
MetricsSet.transactionTreesExposedMetrics(
streamConfig.name,
registry,
config.reportingPeriod,
)
),
)(system, ec)
.flatMap { observer =>
transactionService.transactionTrees(streamConfig, observer)
}
case streamConfig: Config.StreamConfig.ActiveContractsStreamConfig =>
StreamMetrics
.observer(
streamName = streamConfig.name,
logInterval = config.reportingPeriod,
metrics = MetricsSet.activeContractsMetrics,
logger = logger,
exposedMetrics = Some(
MetricsSet.activeContractsExposedMetrics(
streamConfig.name,
registry,
config.reportingPeriod,
)
),
)(system, ec)
.flatMap { observer =>
activeContractsService.getActiveContracts(streamConfig, observer)
}
case streamConfig: Config.StreamConfig.CompletionsStreamConfig =>
StreamMetrics
.observer(
streamName = streamConfig.name,
logInterval = config.reportingPeriod,
metrics = MetricsSet.completionsMetrics,
logger = logger,
exposedMetrics = Some(
MetricsSet
.completionsExposedMetrics(streamConfig.name, registry, config.reportingPeriod)
),
)(system, ec)
.flatMap { observer =>
commandCompletionService.completions(streamConfig, observer)
}
}
.transform {
case Success(results) =>
if (results.contains(MetricsCollector.Message.MetricsResult.ObjectivesViolated))
Failure(new RuntimeException("Metrics objectives not met."))
else Success(())
case Failure(ex) =>
Failure(ex)
}
apiServicesOwner(config).use { apiServices =>
def testContractsGenerationStep(): Future[Unit] = config.contractSetDescriptorFile match {
case None =>
Future.successful(
logger.info("No contract set descriptor file provided. Skipping contracts generation.")
)
case Some(descriptorFile) => ContractProducer(apiServices).create(descriptorFile)
}

def benchmarkStep(): Future[Unit] = if (config.streams.isEmpty) {
Future.successful(logger.info(s"No streams defined. Skipping the benchmark step."))
} else {
Benchmark.run(config, apiServices)
}

for {
_ <- testContractsGenerationStep()
_ <- benchmarkStep()
} yield ()
}
}

private def apiServicesOwner(
config: Config
)(implicit ec: ExecutionContext): ResourceOwner[LedgerApiServices] =
for {
executorService <- threadPoolExecutorOwner(config.concurrency)
channel <- channelOwner(config.ledger, config.tls, executorService)
services <- ResourceOwner.forFuture(() => LedgerApiServices.forChannel(channel))
} yield services

private def channelOwner(
ledger: Config.Ledger,
tls: TlsConfiguration,
Expand Down
Loading