Skip to content

Commit

Permalink
Workflow for the ledger-api-bench-tool defined in YAML file [DPP-669] (
Browse files Browse the repository at this point in the history
…#11682)

* Moved StreamConfig to a separate class WorkflowConfig

* Change streams from Option[List[]] to List[]

* Simplifying

* Dedicated PartyFilter case class

* Use String instead of Identifier in filter config to allow using short names

* Submission config

* Switched to unified WorkflowConfig

* Minor change

* Minor change

* Workflow config parser tests

* Removed leftovers

* Added more tests

* Removed leftovers

* Updated CLI

* Complete stream configuration support in the YAML file in the ledger-api-bench-tool

CHANGELOG_BEGIN
- [Integration Kit] - ledger-api-bench-tool - All stream configuration parameters can be defined through the YAML configuration file.
CHANGELOG_END

* Addressed review comments (renaming).
  • Loading branch information
kamil-da authored Nov 15, 2021
1 parent 1791a52 commit 90ad968
Show file tree
Hide file tree
Showing 21 changed files with 805 additions and 574 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.daml.ledger.api.benchtool

import com.daml.ledger.api.benchtool.config.WorkflowConfig.StreamConfig
import com.daml.ledger.api.benchtool.metrics.{
MetricRegistryOwner,
MetricsCollector,
Expand All @@ -23,7 +24,7 @@ object Benchmark {
private val logger = LoggerFactory.getLogger(getClass)

def run(
streams: List[Config.StreamConfig],
streams: List[StreamConfig],
reportingPeriod: FiniteDuration,
apiServices: LedgerApiServices,
metricsReporter: MetricsReporter,
Expand All @@ -40,7 +41,7 @@ object Benchmark {
resources.use { case (system, registry) =>
Future
.traverse(streams) {
case streamConfig: Config.StreamConfig.TransactionsStreamConfig =>
case streamConfig: StreamConfig.TransactionsStreamConfig =>
StreamMetrics
.observer(
streamName = streamConfig.name,
Expand All @@ -55,7 +56,7 @@ object Benchmark {
.flatMap { observer =>
apiServices.transactionService.transactions(streamConfig, observer)
}
case streamConfig: Config.StreamConfig.TransactionTreesStreamConfig =>
case streamConfig: StreamConfig.TransactionTreesStreamConfig =>
StreamMetrics
.observer(
streamName = streamConfig.name,
Expand All @@ -73,7 +74,7 @@ object Benchmark {
.flatMap { observer =>
apiServices.transactionService.transactionTrees(streamConfig, observer)
}
case streamConfig: Config.StreamConfig.ActiveContractsStreamConfig =>
case streamConfig: StreamConfig.ActiveContractsStreamConfig =>
StreamMetrics
.observer(
streamName = streamConfig.name,
Expand All @@ -91,7 +92,7 @@ object Benchmark {
.flatMap { observer =>
apiServices.activeContractsService.getActiveContracts(streamConfig, observer)
}
case streamConfig: Config.StreamConfig.CompletionsStreamConfig =>
case streamConfig: StreamConfig.CompletionsStreamConfig =>
StreamMetrics
.observer(
streamName = streamConfig.name,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.ledger.api.benchtool

import com.daml.ledger.api.benchtool.config.WorkflowConfig.StreamConfig
import com.daml.ledger.api.benchtool.submission.CommandSubmitter
import com.daml.ledger.api.v1.value.Identifier
import com.daml.ledger.test.model.Foo.{Foo1, Foo2, Foo3}
import scalaz.syntax.tag._

object ConfigEnricher {

def enrichStreamConfig(
streamConfig: StreamConfig,
submissionSummary: Option[CommandSubmitter.SubmissionSummary],
): StreamConfig = {
streamConfig match {
case config: StreamConfig.TransactionsStreamConfig =>
config.copy(filters = enrichFilters(config.filters, submissionSummary))
case config: StreamConfig.TransactionTreesStreamConfig =>
config.copy(filters = enrichFilters(config.filters, submissionSummary))
case config: StreamConfig.ActiveContractsStreamConfig =>
config.copy(filters = enrichFilters(config.filters, submissionSummary))
case config: StreamConfig.CompletionsStreamConfig =>
config.copy(party = convertParty(config.party, submissionSummary))
}
}

private def convertParty(
party: String,
submissionSummary: Option[CommandSubmitter.SubmissionSummary],
): String =
submissionSummary match {
case None => party
case Some(summary) =>
summary.observers
.map(_.unwrap)
.find(_.contains(party))
.getOrElse(throw new RuntimeException(s"Observer not found: $party"))
}

private def enrichFilters(
filters: List[StreamConfig.PartyFilter],
submissionSummary: Option[CommandSubmitter.SubmissionSummary],
): List[StreamConfig.PartyFilter] = {
def identifierToFullyQualifiedString(id: Identifier) =
s"${id.packageId}:${id.moduleName}:${id.entityName}"
def fullyQualifiedTemplateId(template: String): String =
template match {
case "Foo1" => identifierToFullyQualifiedString(Foo1.id.unwrap)
case "Foo2" => identifierToFullyQualifiedString(Foo2.id.unwrap)
case "Foo3" => identifierToFullyQualifiedString(Foo3.id.unwrap)
case other => other
}

filters.map { filter =>
StreamConfig.PartyFilter(
party = convertParty(filter.party, submissionSummary),
templates = filter.templates.map(fullyQualifiedTemplateId),
)
}
}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@

package com.daml.ledger.api.benchtool

import com.daml.ledger.api.benchtool.config.{Config, ConfigMaker, WorkflowConfig}
import com.daml.ledger.api.benchtool.submission.CommandSubmitter
import com.daml.ledger.api.benchtool.services._
import com.daml.ledger.api.benchtool.util.SimpleFileReader
import com.daml.ledger.api.tls.TlsConfiguration
import com.daml.ledger.resources.{ResourceContext, ResourceOwner}
import io.grpc.Channel
import io.grpc.netty.{NegotiationType, NettyChannelBuilder}
import org.slf4j.{Logger, LoggerFactory}
import pprint.PPrinter

import java.io.File
import java.util.concurrent.{
ArrayBlockingQueue,
Executor,
Expand All @@ -23,31 +22,28 @@ import java.util.concurrent.{
}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

object LedgerApiBenchTool {
def main(args: Array[String]): Unit = {
Cli.config(args) match {
case Some(config) =>
ConfigMaker.make(args) match {
case Left(error) =>
logger.error(s"Configuration error: ${error.details}")
case Right(config) =>
logger.info(s"Starting benchmark with configuration:\n${prettyPrint(config)}")
val result = run(config)(ExecutionContext.Implicits.global)
.recover { case ex =>
println(s"Error: ${ex.getMessage}")
sys.exit(1)
}(scala.concurrent.ExecutionContext.Implicits.global)
Await.result(result, atMost = Duration.Inf)
()
case _ =>
logger.error("Invalid configuration arguments.")
}
}

private def run(config: Config)(implicit ec: ExecutionContext): Future[Unit] = {
logger.info(s"Starting benchmark with configuration:\n${prettyPrint(config)}")

implicit val resourceContext: ResourceContext = ResourceContext(ec)

apiServicesOwner(config).use { apiServices =>
def benchmarkStep(streams: List[Config.StreamConfig]): Future[Unit] =
def benchmarkStep(streams: List[WorkflowConfig.StreamConfig]): Future[Unit] =
if (streams.isEmpty) {
Future.successful(logger.info(s"No streams defined. Skipping the benchmark step."))
} else {
Expand All @@ -60,48 +56,35 @@ object LedgerApiBenchTool {
}

def submissionStep(
submissionDescriptor: Option[SubmissionDescriptor]
submissionConfig: Option[WorkflowConfig.SubmissionConfig]
): Future[Option[CommandSubmitter.SubmissionSummary]] =
submissionDescriptor match {
submissionConfig match {
case None =>
logger.info(s"No submission defined. Skipping.")
Future.successful(None)
case Some(descriptor) =>
case Some(submissionConfig) =>
CommandSubmitter(apiServices)
.submit(
descriptor = descriptor,
config = submissionConfig,
maxInFlightCommands = config.maxInFlightCommands,
submissionBatchSize = config.submissionBatchSize,
)
.map(Some(_))
}

config.contractSetDescriptorFile match {
case None =>
benchmarkStep(config.streams)
case Some(descriptorFile) =>
for {
descriptor <- Future.fromTry(parseDescriptor(descriptorFile))
_ = logger.info(prettyPrint(descriptor))
summary <- submissionStep(descriptor.submission)
streams = descriptor.streams.map(
DescriptorConverter.streamDescriptorToConfig(_, summary)
)
_ = logger.info(s"Converted stream configs: ${prettyPrint(streams)}")
_ <- benchmarkStep(streams)
} yield ()
}
for {
summary <- submissionStep(config.workflow.submission)
streams = config.workflow.streams.map(
ConfigEnricher.enrichStreamConfig(_, summary)
)
_ = logger.info(
s"Stream configs adapted after the submission step: ${prettyPrint(streams)}"
)
_ <- benchmarkStep(streams)
} yield ()
}
}

private def parseDescriptor(descriptorFile: File): Try[WorkflowDescriptor] =
SimpleFileReader.readFile(descriptorFile)(WorkflowParser.parse).flatMap {
case Left(err: WorkflowParser.ParserError) =>
Failure(new RuntimeException(s"Workflow parsing error. Details: ${err.details}"))
case Right(descriptor) =>
Success(descriptor)
}

private def apiServicesOwner(
config: Config
)(implicit ec: ExecutionContext): ResourceOwner[LedgerApiServices] =
Expand Down
Loading

0 comments on commit 90ad968

Please sign in to comment.