From 4e49cf68141930ea3f7ea873a1e6ebd95fad9c67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Bo=C5=BCek?= <72492440+kamil-da@users.noreply.github.com> Date: Wed, 9 Jun 2021 09:55:07 +0200 Subject: [PATCH] `ledger-api-bench-tool` - Active contracts stream, completions stream [DPP-398, DPP-399] (#9857) * ledger-api-bench-tool: Active contracts streams CHANGELOG_BEGIN - [Integration Kit] - ledger-api-bench-tool - reading active contract streams CHANGELOG_END * ledger-api-bench-tool: Completions stream (#9872) CHANGELOG_BEGIN - [Integration Kit] - ledger-api-bench-tool - Reading completions stream CHANGELOG_END --- .../com/daml/ledger/api/benchtool/Cli.scala | 100 ++++++++++++-- .../daml/ledger/api/benchtool/Config.scala | 48 +++++-- .../api/benchtool/LedgerApiBenchTool.scala | 108 +++++++-------- .../metrics/MeteredStreamObserver.scala | 21 +-- .../benchtool/metrics/MetricsCollector.scala | 94 +++++++++++++ .../benchtool/metrics/MetricsManager.scala | 125 ++++++------------ ...nsactionMetrics.scala => MetricsSet.scala} | 86 +++++------- .../api/benchtool/metrics/StreamMetrics.scala | 27 ++++ .../services/ActiveContractsService.scala | 58 ++++++++ .../services/CommandCompletionService.scala | 51 +++++++ .../services/TransactionService.scala | 31 +++-- .../util/TypedActorSystemResourceOwner.scala | 20 ++- ...rSpec.scala => MetricsCollectorSpec.scala} | 50 +++---- 13 files changed, 551 insertions(+), 268 deletions(-) create mode 100644 ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/MetricsCollector.scala rename ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/{TransactionMetrics.scala => MetricsSet.scala} (54%) create mode 100644 ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/StreamMetrics.scala create mode 100644 ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/services/ActiveContractsService.scala create mode 100644 ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/services/CommandCompletionService.scala rename ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/metrics/{MetricsManagerSpec.scala => MetricsCollectorSpec.scala} (72%) diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/Cli.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/Cli.scala index f45c87927ded..76be2d8a7c5f 100644 --- a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/Cli.scala +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/Cli.scala @@ -3,10 +3,11 @@ package com.daml.ledger.api.benchtool +import com.daml.ledger.api.benchtool.Config.StreamConfig import com.daml.ledger.api.tls.TlsConfigurationCli import com.daml.ledger.api.v1.ledger_offset.LedgerOffset import com.daml.ledger.api.v1.value.Identifier -import scopt.{OptionParser, Read} +import scopt.{OptionDef, OptionParser, Read} import scala.concurrent.duration.FiniteDuration import scala.util.{Failure, Success, Try} @@ -35,7 +36,7 @@ object Cli { s"Stream configuration." ) .valueName( - "stream-type=,name=,party=[,begin-offset=][,end-offset=][,template-ids=|][,max-delay=]" + "=,=,..." ) .action { case (streamConfig, config) => config.copy(streams = config.streams :+ streamConfig) @@ -66,6 +67,34 @@ object Cli { help("help").text("Prints this information") + private def note(level: Int, param: String, desc: String = ""): OptionDef[Unit, Config] = { + val paddedParam = s"${" " * level * 2}$param" + val internalPadding = math.max(1, 40 - paddedParam.length) + note(s"$paddedParam${" " * internalPadding}$desc") + } + + note(0, "") + note(0, "Stream configuration parameters:") + note(1, "Transactions/transaction trees:") + note(2, "stream-type=", "(required)") + note(2, "name=", "Stream name used to identify results (required)") + note(2, "party=", "(required)") + note(2, "begin-offset=") + note(2, "end-offset=") + note(2, "template-ids=|") + note(2, "max-delay=", "Max record time delay objective") + note(2, "min-consumption-speed=", "Min consumption speed objective") + note(1, "Active contract sets:") + note(2, "stream-type=active-contracts", "(required)") + note(2, "name=", "Stream name used to identify results (required)") + note(2, "party=", "(required)") + note(2, "template-ids=|") + note(1, "Command completions:") + note(2, "stream-type=completions", "(required)") + note(2, "name=", "Stream name used to identify results (required)") + note(2, "party=", "(required)") + note(2, "begin-offset=") + note(2, "template-ids=|") } def config(args: Array[String]): Option[Config] = @@ -99,14 +128,9 @@ object Cli { def offset(stringValue: String): LedgerOffset = LedgerOffset.defaultInstance.withAbsolute(stringValue) - val config = for { + def transactionsConfig: Either[String, StreamConfig.TransactionsStreamConfig] = for { name <- stringField("name") party <- stringField("party") - streamType <- stringField("stream-type").flatMap[String, Config.StreamConfig.StreamType] { - case "transactions" => Right(Config.StreamConfig.StreamType.Transactions) - case "transaction-trees" => Right(Config.StreamConfig.StreamType.TransactionTrees) - case invalid => Left(s"Invalid stream type: $invalid") - } templateIds <- optionalStringField("template-ids").flatMap { case Some(ids) => listOfTemplateIds(ids).map(Some(_)) case None => Right(None) @@ -115,9 +139,8 @@ object Cli { endOffset <- optionalStringField("end-offset").map(_.map(offset)) maxDelaySeconds <- optionalLongField("max-delay") minConsumptionSpeed <- optionalDoubleField("min-consumption-speed") - } yield Config.StreamConfig( + } yield Config.StreamConfig.TransactionsStreamConfig( name = name, - streamType = streamType, party = party, templateIds = templateIds, beginOffset = beginOffset, @@ -128,6 +151,63 @@ object Cli { ), ) + def transactionTreesConfig: Either[String, StreamConfig.TransactionTreesStreamConfig] = + for { + name <- stringField("name") + party <- stringField("party") + templateIds <- optionalStringField("template-ids").flatMap { + case Some(ids) => listOfTemplateIds(ids).map(Some(_)) + case None => Right(None) + } + beginOffset <- optionalStringField("begin-offset").map(_.map(offset)) + endOffset <- optionalStringField("end-offset").map(_.map(offset)) + maxDelaySeconds <- optionalLongField("max-delay") + minConsumptionSpeed <- optionalDoubleField("min-consumption-speed") + } yield Config.StreamConfig.TransactionTreesStreamConfig( + name = name, + party = party, + templateIds = templateIds, + beginOffset = beginOffset, + endOffset = endOffset, + objectives = Config.StreamConfig.Objectives( + maxDelaySeconds = maxDelaySeconds, + minConsumptionSpeed = minConsumptionSpeed, + ), + ) + + def activeContractsConfig: Either[String, StreamConfig.ActiveContractsStreamConfig] = for { + name <- stringField("name") + party <- stringField("party") + templateIds <- optionalStringField("template-ids").flatMap { + case Some(ids) => listOfTemplateIds(ids).map(Some(_)) + case None => Right(None) + } + } yield Config.StreamConfig.ActiveContractsStreamConfig( + name = name, + party = party, + templateIds = templateIds, + ) + + def completionsConfig: Either[String, StreamConfig.CompletionsStreamConfig] = for { + name <- stringField("name") + party <- stringField("party") + applicationId <- stringField("application-id") + beginOffset <- optionalStringField("begin-offset").map(_.map(offset)) + } yield Config.StreamConfig.CompletionsStreamConfig( + name = name, + party = party, + applicationId = applicationId, + beginOffset = beginOffset, + ) + + val config = stringField("stream-type").flatMap[String, Config.StreamConfig] { + case "transactions" => transactionsConfig + case "transaction-trees" => transactionTreesConfig + case "active-contracts" => activeContractsConfig + case "completions" => completionsConfig + case invalid => Left(s"Invalid stream type: $invalid") + } + config.fold(error => throw new IllegalArgumentException(error), identity) } diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/Config.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/Config.scala index 7b8ce820f304..c0748fedde9b 100644 --- a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/Config.scala +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/Config.scala @@ -18,22 +18,42 @@ case class Config( ) object Config { - case class StreamConfig( - name: String, - streamType: Config.StreamConfig.StreamType, - party: String, - templateIds: Option[List[Identifier]], - beginOffset: Option[LedgerOffset], - endOffset: Option[LedgerOffset], - objectives: StreamConfig.Objectives, - ) + trait StreamConfig { + def name: String + def party: String + } object StreamConfig { - sealed trait StreamType - object StreamType { - case object Transactions extends StreamType - case object TransactionTrees extends StreamType - } + case class TransactionsStreamConfig( + name: String, + party: String, + templateIds: Option[List[Identifier]], + beginOffset: Option[LedgerOffset], + endOffset: Option[LedgerOffset], + objectives: StreamConfig.Objectives, + ) extends StreamConfig + + case class TransactionTreesStreamConfig( + name: String, + party: String, + templateIds: Option[List[Identifier]], + beginOffset: Option[LedgerOffset], + endOffset: Option[LedgerOffset], + objectives: StreamConfig.Objectives, + ) extends StreamConfig + + case class ActiveContractsStreamConfig( + name: String, + party: String, + templateIds: Option[List[Identifier]], + ) extends StreamConfig + + case class CompletionsStreamConfig( + name: String, + party: String, + applicationId: String, + beginOffset: Option[LedgerOffset], + ) extends StreamConfig case class Objectives( maxDelaySeconds: Option[Long], diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/LedgerApiBenchTool.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/LedgerApiBenchTool.scala index 27a1d2f6b2a9..b6788ffa67c3 100644 --- a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/LedgerApiBenchTool.scala +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/LedgerApiBenchTool.scala @@ -3,20 +3,15 @@ package com.daml.ledger.api.benchtool -import akka.actor.typed.{ActorSystem, SpawnProtocol} -import com.daml.ledger.api.benchtool.metrics.{ - Creator, - MeteredStreamObserver, - MetricsManager, - TransactionMetrics, +import com.daml.ledger.api.benchtool.metrics.{MetricsCollector, MetricsSet, StreamMetrics} +import com.daml.ledger.api.benchtool.services.{ + ActiveContractsService, + CommandCompletionService, + LedgerIdentityService, + TransactionService, } -import com.daml.ledger.api.benchtool.services.{LedgerIdentityService, TransactionService} import com.daml.ledger.api.benchtool.util.TypedActorSystemResourceOwner import com.daml.ledger.api.tls.TlsConfiguration -import com.daml.ledger.api.v1.transaction_service.{ - GetTransactionTreesResponse, - GetTransactionsResponse, -} import com.daml.ledger.resources.{ResourceContext, ResourceOwner} import io.grpc.Channel import io.grpc.netty.{NegotiationType, NettyChannelBuilder} @@ -58,53 +53,65 @@ object LedgerApiBenchTool { val resources = for { executorService <- threadPoolExecutorOwner(config.concurrency) channel <- channelOwner(config.ledger, config.tls, executorService) - system <- actorSystemResourceOwner() + system <- TypedActorSystemResourceOwner.owner() } yield (channel, system) resources.use { case (channel, system) => val ledgerIdentityService: LedgerIdentityService = new LedgerIdentityService(channel) val ledgerId: String = ledgerIdentityService.fetchLedgerId() val transactionService = new TransactionService(channel, ledgerId) + val activeContractsService = new ActiveContractsService(channel, ledgerId) + val commandCompletionService = new CommandCompletionService(channel, ledgerId) Future - .traverse(config.streams) { streamConfig => - streamConfig.streamType match { - case Config.StreamConfig.StreamType.Transactions => - TransactionMetrics - .transactionsMetricsManager( - streamConfig.name, - config.reportingPeriod, - streamConfig.objectives, - )(system) - .flatMap { manager => - val observer: MeteredStreamObserver[GetTransactionsResponse] = - new MeteredStreamObserver[GetTransactionsResponse]( - streamConfig.name, - logger, - manager, - )(system) - transactionService.transactions(streamConfig, observer) - } - case Config.StreamConfig.StreamType.TransactionTrees => - TransactionMetrics - .transactionTreesMetricsManager( - streamConfig.name, - config.reportingPeriod, - streamConfig.objectives, - )(system) - .flatMap { manager => - val observer = - new MeteredStreamObserver[GetTransactionTreesResponse]( - streamConfig.name, - logger, - manager, - )(system) - transactionService.transactionTrees(streamConfig, observer) - } - } + .traverse(config.streams) { + case streamConfig: Config.StreamConfig.TransactionsStreamConfig => + StreamMetrics + .observer( + streamName = streamConfig.name, + logInterval = config.reportingPeriod, + metrics = MetricsSet.transactionMetrics(streamConfig.objectives), + logger = logger, + )(system, ec) + .flatMap { observer => + transactionService.transactions(streamConfig, observer) + } + case streamConfig: Config.StreamConfig.TransactionTreesStreamConfig => + StreamMetrics + .observer( + streamName = streamConfig.name, + logInterval = config.reportingPeriod, + metrics = MetricsSet.transactionTreesMetrics(streamConfig.objectives), + logger = logger, + )(system, ec) + .flatMap { observer => + transactionService.transactionTrees(streamConfig, observer) + } + case streamConfig: Config.StreamConfig.ActiveContractsStreamConfig => + StreamMetrics + .observer( + streamName = streamConfig.name, + logInterval = config.reportingPeriod, + metrics = MetricsSet.activeContractsMetrics, + logger = logger, + )(system, ec) + .flatMap { observer => + activeContractsService.getActiveContracts(streamConfig, observer) + } + case streamConfig: Config.StreamConfig.CompletionsStreamConfig => + StreamMetrics + .observer( + streamName = streamConfig.name, + logInterval = config.reportingPeriod, + metrics = MetricsSet.completionsMetrics, + logger = logger, + )(system, ec) + .flatMap { observer => + commandCompletionService.completions(streamConfig, observer) + } } .transform { case Success(results) => - if (results.contains(MetricsManager.Message.MetricsResult.ObjectivesViolated)) + if (results.contains(MetricsCollector.Message.MetricsResult.ObjectivesViolated)) Failure(new RuntimeException("Metrics objectives not met.")) else Success(()) case Failure(ex) => @@ -143,11 +150,6 @@ object LedgerApiBenchTool { ResourceOwner.forChannel(channelBuilder, ShutdownTimeout) } - private def actorSystemResourceOwner(): ResourceOwner[ActorSystem[SpawnProtocol.Command]] = - new TypedActorSystemResourceOwner[SpawnProtocol.Command](() => - ActorSystem(Creator(), "Creator") - ) - private def threadPoolExecutorOwner( config: Config.Concurrency ): ResourceOwner[ThreadPoolExecutor] = diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/MeteredStreamObserver.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/MeteredStreamObserver.scala index f1463bd840b4..298bcdb679d8 100644 --- a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/MeteredStreamObserver.scala +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/MeteredStreamObserver.scala @@ -3,7 +3,6 @@ package com.daml.ledger.api.benchtool.metrics -import akka.actor.typed.{ActorRef, ActorSystem} import com.daml.ledger.api.benchtool.util.ObserverWithResult import org.slf4j.Logger @@ -12,25 +11,17 @@ import scala.concurrent.Future class MeteredStreamObserver[T]( val streamName: String, logger: Logger, - metricsManager: ActorRef[MetricsManager.Message], -)(implicit system: ActorSystem[_]) - extends ObserverWithResult[T, MetricsManager.Message.MetricsResult](logger) { - import MetricsManager.Message._ + manager: MetricsManager[T], +) extends ObserverWithResult[T, MetricsCollector.Message.MetricsResult](logger) { override def onNext(value: T): Unit = { - metricsManager ! NewValue(value) + manager.sendNewValue(value) super.onNext(value) } - override def completeWith(): Future[MetricsResult] = { - // TODO: abstract over the ask pattern (possibly a container with the actor and a method for asking) - import akka.actor.typed.scaladsl.AskPattern._ - import akka.util.Timeout - - import scala.concurrent.duration._ - logger.debug(withStreamName(s"Sending $StreamCompleted notification.")) - implicit val timeout: Timeout = 3.seconds - metricsManager.ask(StreamCompleted(_)) + override def completeWith(): Future[MetricsCollector.Message.MetricsResult] = { + logger.debug(withStreamName(s"Asking for stream result...")) + manager.result() } } diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/MetricsCollector.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/MetricsCollector.scala new file mode 100644 index 000000000000..d8022caf2be7 --- /dev/null +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/MetricsCollector.scala @@ -0,0 +1,94 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.api.benchtool.metrics + +import akka.actor.typed.scaladsl.{Behaviors, TimerScheduler} +import akka.actor.typed.{ActorRef, Behavior} +import com.daml.ledger.api.benchtool.util.{MetricReporter, TimeUtil} + +import java.time.Instant +import scala.concurrent.duration._ + +object MetricsCollector { + + sealed trait Message + object Message { + sealed trait MetricsResult + object MetricsResult { + final case object Ok extends MetricsResult + final case object ObjectivesViolated extends MetricsResult + } + final case class NewValue[T](value: T) extends Message + final case class PeriodicUpdateCommand() extends Message + final case class StreamCompleted(replyTo: ActorRef[MetricsResult]) extends Message + } + + def apply[T]( + streamName: String, + metrics: List[Metric[T]], + logInterval: FiniteDuration, + reporter: MetricReporter, + ): Behavior[Message] = + Behaviors.withTimers { timers => + val startTime: Instant = Instant.now() + new MetricsCollector[T](timers, streamName, logInterval, reporter, startTime) + .handlingMessages(metrics, startTime) + } + +} + +class MetricsCollector[T]( + timers: TimerScheduler[MetricsCollector.Message], + streamName: String, + logInterval: FiniteDuration, + reporter: MetricReporter, + startTime: Instant, +) { + import MetricsCollector._ + import MetricsCollector.Message._ + + timers.startTimerWithFixedDelay(PeriodicUpdateCommand(), logInterval) + + def handlingMessages(metrics: List[Metric[T]], lastPeriodicCheck: Instant): Behavior[Message] = { + Behaviors.receive { case (context, message) => + message match { + case newValue: NewValue[T] => + handlingMessages(metrics.map(_.onNext(newValue.value)), lastPeriodicCheck) + + case _: PeriodicUpdateCommand => + val currentTime = Instant.now() + val (newMetrics, values) = metrics + .map(_.periodicValue(TimeUtil.durationBetween(lastPeriodicCheck, currentTime))) + .unzip + context.log.info(namedMessage(reporter.formattedValues(values))) + handlingMessages(newMetrics, currentTime) + + case message: StreamCompleted => + context.log.info( + namedMessage( + reporter.finalReport( + streamName = streamName, + metrics = metrics, + duration = totalDuration, + ) + ) + ) + message.replyTo ! result(metrics) + Behaviors.stopped + } + } + } + + private def result(metrics: List[Metric[T]]): MetricsResult = { + val atLeastOneObjectiveViolated = metrics.exists(_.violatedObjective.nonEmpty) + + if (atLeastOneObjectiveViolated) MetricsResult.ObjectivesViolated + else MetricsResult.Ok + } + + private def namedMessage(message: String) = s"[$streamName] $message" + + private def totalDuration: java.time.Duration = + TimeUtil.durationBetween(startTime, Instant.now()) +} diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/MetricsManager.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/MetricsManager.scala index 406a165af629..e0585b0e932c 100644 --- a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/MetricsManager.scala +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/MetricsManager.scala @@ -3,100 +3,51 @@ package com.daml.ledger.api.benchtool.metrics -import akka.actor.typed.scaladsl.{Behaviors, TimerScheduler} -import akka.actor.typed.{ActorRef, Behavior, SpawnProtocol} -import com.daml.ledger.api.benchtool.util.{MetricReporter, TimeUtil} +import akka.actor.typed.scaladsl.AskPattern._ +import akka.actor.typed.{ActorRef, ActorSystem, Props, SpawnProtocol} +import akka.util.Timeout +import com.daml.ledger.api.benchtool.util.MetricReporter -import java.time.Instant import scala.concurrent.duration._ +import scala.concurrent.{ExecutionContext, Future} -object MetricsManager { +case class MetricsManager[T](collector: ActorRef[MetricsCollector.Message])(implicit + system: ActorSystem[SpawnProtocol.Command] +) { + def sendNewValue(value: T): Unit = + collector ! MetricsCollector.Message.NewValue(value) - sealed trait Message - object Message { - sealed trait MetricsResult - object MetricsResult { - final case object Ok extends MetricsResult - final case object ObjectivesViolated extends MetricsResult - } - final case class NewValue[T](value: T) extends Message - final case class PeriodicUpdateCommand() extends Message - final case class StreamCompleted(replyTo: ActorRef[MetricsResult]) extends Message + def result[Result](): Future[MetricsCollector.Message.MetricsResult] = { + implicit val timeout: Timeout = Timeout(3.seconds) + collector.ask(MetricsCollector.Message.StreamCompleted) } +} - def apply[T]( +object MetricsManager { + def apply[StreamElem]( streamName: String, - metrics: List[Metric[T]], logInterval: FiniteDuration, - reporter: MetricReporter, - ): Behavior[Message] = - Behaviors.withTimers { timers => - val startTime: Instant = Instant.now() - new MetricsManager[T](timers, streamName, logInterval, reporter, startTime) - .handlingMessages(metrics, startTime) - } - -} - -class MetricsManager[T]( - timers: TimerScheduler[MetricsManager.Message], - streamName: String, - logInterval: FiniteDuration, - reporter: MetricReporter, - startTime: Instant, -) { - import MetricsManager._ - import MetricsManager.Message._ - - timers.startTimerWithFixedDelay(PeriodicUpdateCommand(), logInterval) - - def handlingMessages(metrics: List[Metric[T]], lastPeriodicCheck: Instant): Behavior[Message] = { - Behaviors.receive { case (context, message) => - message match { - case newValue: NewValue[T] => - handlingMessages(metrics.map(_.onNext(newValue.value)), lastPeriodicCheck) - - case _: PeriodicUpdateCommand => - val currentTime = Instant.now() - val (newMetrics, values) = metrics - .map(_.periodicValue(TimeUtil.durationBetween(lastPeriodicCheck, currentTime))) - .unzip - context.log.info(namedMessage(reporter.formattedValues(values))) - handlingMessages(newMetrics, currentTime) - - case message: StreamCompleted => - context.log.info( - namedMessage( - reporter.finalReport( - streamName = streamName, - metrics = metrics, - duration = totalDuration, - ) - ) - ) - message.replyTo ! result(metrics) - Behaviors.stopped - } - } - } - - private def result(metrics: List[Metric[T]]): MetricsResult = { - val atLeastOneObjectiveViolated = metrics.exists(_.violatedObjective.nonEmpty) - - if (atLeastOneObjectiveViolated) MetricsResult.ObjectivesViolated - else MetricsResult.Ok + metrics: List[Metric[StreamElem]], + )(implicit + system: ActorSystem[SpawnProtocol.Command], + ec: ExecutionContext, + ): Future[MetricsManager[StreamElem]] = { + implicit val timeout: Timeout = Timeout(3.seconds) + + val collectorActor: Future[ActorRef[MetricsCollector.Message]] = system.ask( + SpawnProtocol.Spawn( + behavior = MetricsCollector( + streamName = streamName, + metrics = metrics, + logInterval = logInterval, + reporter = MetricReporter.Default, + ), + name = s"${streamName}-collector", + props = Props.empty, + _, + ) + ) + + collectorActor.map(MetricsManager[StreamElem](_)) } - - private def namedMessage(message: String) = s"[$streamName] $message" - - private def totalDuration: java.time.Duration = - TimeUtil.durationBetween(startTime, Instant.now()) -} - -object Creator { - def apply(): Behavior[SpawnProtocol.Command] = - Behaviors.setup { context => - context.log.debug(s"Starting Creator actor") - SpawnProtocol() - } } diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/TransactionMetrics.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/MetricsSet.scala similarity index 54% rename from ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/TransactionMetrics.scala rename to ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/MetricsSet.scala index 59ce582b184e..6c5cd19c2963 100644 --- a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/TransactionMetrics.scala +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/MetricsSet.scala @@ -3,11 +3,10 @@ package com.daml.ledger.api.benchtool.metrics -import akka.actor.typed.{ActorRef, ActorSystem, Props, SpawnProtocol} -import akka.util.Timeout import com.daml.ledger.api.benchtool.Config.StreamConfig.Objectives import com.daml.ledger.api.benchtool.metrics.objectives.{MaxDelay, MinConsumptionSpeed} -import com.daml.ledger.api.benchtool.util.MetricReporter +import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse +import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse import com.daml.ledger.api.v1.transaction_service.{ GetTransactionTreesResponse, GetTransactionsResponse, @@ -15,58 +14,9 @@ import com.daml.ledger.api.v1.transaction_service.{ import com.google.protobuf.timestamp.Timestamp import java.time.Clock -import scala.concurrent.Future -import scala.concurrent.duration._ -object TransactionMetrics { - implicit val timeout: Timeout = Timeout(3.seconds) - import akka.actor.typed.scaladsl.AskPattern._ - - def transactionsMetricsManager( - streamName: String, - logInterval: FiniteDuration, - objectives: Objectives, - )(implicit - system: ActorSystem[SpawnProtocol.Command] - ): Future[ActorRef[MetricsManager.Message]] = { - system.ask( - SpawnProtocol.Spawn( - behavior = MetricsManager( - streamName = streamName, - metrics = transactionMetrics(objectives), - logInterval = logInterval, - reporter = MetricReporter.Default, - ), - name = s"${streamName}-manager", - props = Props.empty, - _, - ) - ) - } - - def transactionTreesMetricsManager( - streamName: String, - logInterval: FiniteDuration, - objectives: Objectives, - )(implicit - system: ActorSystem[SpawnProtocol.Command] - ): Future[ActorRef[MetricsManager.Message]] = { - system.ask( - SpawnProtocol.Spawn( - behavior = MetricsManager( - streamName = streamName, - metrics = transactionTreesMetrics(objectives), - logInterval = logInterval, - reporter = MetricReporter.Default, - ), - name = s"${streamName}-manager", - props = Props.empty, - _, - ) - ) - } - - private def transactionMetrics(objectives: Objectives): List[Metric[GetTransactionsResponse]] = +object MetricsSet { + def transactionMetrics(objectives: Objectives): List[Metric[GetTransactionsResponse]] = all[GetTransactionsResponse]( countingFunction = _.transactions.length, sizingFunction = _.serializedSize.toLong, @@ -76,7 +26,7 @@ object TransactionMetrics { objectives = objectives, ) - private def transactionTreesMetrics( + def transactionTreesMetrics( objectives: Objectives ): List[Metric[GetTransactionTreesResponse]] = all[GetTransactionTreesResponse]( @@ -88,6 +38,32 @@ object TransactionMetrics { objectives = objectives, ) + def activeContractsMetrics: List[Metric[GetActiveContractsResponse]] = + List[Metric[GetActiveContractsResponse]]( + CountRateMetric.empty[GetActiveContractsResponse]( + countingFunction = _.activeContracts.length + ), + TotalCountMetric.empty[GetActiveContractsResponse]( + countingFunction = _.activeContracts.length + ), + SizeMetric.empty[GetActiveContractsResponse]( + sizingFunction = _.serializedSize.toLong + ), + ) + + def completionsMetrics: List[Metric[CompletionStreamResponse]] = + List[Metric[CompletionStreamResponse]]( + CountRateMetric.empty( + countingFunction = _.completions.length + ), + TotalCountMetric.empty( + countingFunction = _.completions.length + ), + SizeMetric.empty( + sizingFunction = _.serializedSize.toLong + ), + ) + private def all[T]( countingFunction: T => Int, sizingFunction: T => Long, diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/StreamMetrics.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/StreamMetrics.scala new file mode 100644 index 000000000000..29594e7aef99 --- /dev/null +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/metrics/StreamMetrics.scala @@ -0,0 +1,27 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.api.benchtool.metrics + +import akka.actor.typed.{ActorSystem, SpawnProtocol} +import org.slf4j.Logger + +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.{ExecutionContext, Future} + +object StreamMetrics { + + def observer[StreamElem]( + streamName: String, + logInterval: FiniteDuration, + metrics: List[Metric[StreamElem]], + logger: Logger, + )(implicit + system: ActorSystem[SpawnProtocol.Command], + ec: ExecutionContext, + ): Future[MeteredStreamObserver[StreamElem]] = + MetricsManager(streamName, logInterval, metrics).map { manager => + new MeteredStreamObserver[StreamElem](streamName, logger, manager) + } + +} diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/services/ActiveContractsService.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/services/ActiveContractsService.scala new file mode 100644 index 000000000000..709fff9ed49b --- /dev/null +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/services/ActiveContractsService.scala @@ -0,0 +1,58 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.api.benchtool.services + +import com.daml.ledger.api.benchtool.Config +import com.daml.ledger.api.benchtool.util.ObserverWithResult +import com.daml.ledger.api.v1.active_contracts_service._ +import com.daml.ledger.api.v1.transaction_filter.{Filters, InclusiveFilters, TransactionFilter} +import io.grpc.Channel +import org.slf4j.LoggerFactory + +import scala.concurrent.Future + +final class ActiveContractsService( + channel: Channel, + ledgerId: String, +) { + + private val logger = LoggerFactory.getLogger(getClass) + private val service: ActiveContractsServiceGrpc.ActiveContractsServiceStub = + ActiveContractsServiceGrpc.stub(channel) + + def getActiveContracts[Result]( + config: Config.StreamConfig.ActiveContractsStreamConfig, + observer: ObserverWithResult[GetActiveContractsResponse, Result], + ): Future[Result] = { + service.getActiveContracts(getActiveContractsRequest(ledgerId, config), observer) + logger.info("Started fetching active contracts") + observer.result + } + + private def getActiveContractsRequest( + ledgerId: String, + config: Config.StreamConfig.ActiveContractsStreamConfig, + ) = { + val templatesFilter = config.templateIds match { + case Some(ids) => + Filters.defaultInstance.withInclusive( + InclusiveFilters.defaultInstance.addAllTemplateIds(ids) + ) + case None => + Filters.defaultInstance + } + + GetActiveContractsRequest.defaultInstance + .withLedgerId(ledgerId) + .withFilter( + TransactionFilter.defaultInstance + .withFiltersByParty( + Map( + config.party -> templatesFilter + ) + ) + ) + } + +} diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/services/CommandCompletionService.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/services/CommandCompletionService.scala new file mode 100644 index 000000000000..49cc867965c3 --- /dev/null +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/services/CommandCompletionService.scala @@ -0,0 +1,51 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.api.benchtool.services + +import com.daml.ledger.api.benchtool.Config +import com.daml.ledger.api.benchtool.util.ObserverWithResult +import com.daml.ledger.api.v1.command_completion_service.{ + CommandCompletionServiceGrpc, + CompletionStreamRequest, + CompletionStreamResponse, +} +import io.grpc.Channel +import org.slf4j.LoggerFactory + +import scala.concurrent.Future + +class CommandCompletionService( + channel: Channel, + ledgerId: String, +) { + private val logger = LoggerFactory.getLogger(getClass) + private val service: CommandCompletionServiceGrpc.CommandCompletionServiceStub = + CommandCompletionServiceGrpc.stub(channel) + + def completions[Result]( + config: Config.StreamConfig.CompletionsStreamConfig, + observer: ObserverWithResult[CompletionStreamResponse, Result], + ): Future[Result] = { + val request = completionsRequest(ledgerId, config) + service.completionStream(request, observer) + logger.info(s"Started fetching completions") + observer.result + } + + private def completionsRequest( + ledgerId: String, + config: Config.StreamConfig.CompletionsStreamConfig, + ): CompletionStreamRequest = { + val request = CompletionStreamRequest.defaultInstance + .withLedgerId(ledgerId) + .withParties(List(config.party)) + .withApplicationId(config.applicationId) + + config.beginOffset match { + case Some(offset) => request.withOffset(offset) + case None => request + } + } + +} diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/services/TransactionService.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/services/TransactionService.scala index f57850a1af21..92fbfdb39e2c 100644 --- a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/services/TransactionService.scala +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/services/TransactionService.scala @@ -28,23 +28,35 @@ final class TransactionService( TransactionServiceGrpc.stub(channel) def transactions[Result]( - config: Config.StreamConfig, + config: Config.StreamConfig.TransactionsStreamConfig, observer: ObserverWithResult[GetTransactionsResponse, Result], ): Future[Result] = { - val request = getTransactionsRequest(ledgerId, config) + val request = getTransactionsRequest( + ledgerId = ledgerId, + party = config.party, + templateIds = config.templateIds, + beginOffset = config.beginOffset, + endOffset = config.endOffset, + ) service.getTransactions(request, observer) logger.info("Started fetching transactions") observer.result } def transactionTrees[Result]( - config: Config.StreamConfig, + config: Config.StreamConfig.TransactionTreesStreamConfig, observer: ObserverWithResult[ GetTransactionTreesResponse, Result, ], ): Future[Result] = { - val request = getTransactionsRequest(ledgerId, config) + val request = getTransactionsRequest( + ledgerId = ledgerId, + party = config.party, + templateIds = config.templateIds, + beginOffset = config.beginOffset, + endOffset = config.endOffset, + ) service.getTransactionTrees(request, observer) logger.info("Started fetching transaction trees") observer.result @@ -52,13 +64,16 @@ final class TransactionService( private def getTransactionsRequest( ledgerId: String, - config: Config.StreamConfig, + party: String, + templateIds: Option[List[Identifier]], + beginOffset: Option[LedgerOffset], + endOffset: Option[LedgerOffset], ): GetTransactionsRequest = { GetTransactionsRequest.defaultInstance .withLedgerId(ledgerId) - .withBegin(config.beginOffset.getOrElse(ledgerBeginOffset)) - .withEnd(config.endOffset.getOrElse(ledgerEndOffset)) - .withFilter(partyFilter(config.party, config.templateIds)) + .withBegin(beginOffset.getOrElse(ledgerBeginOffset)) + .withEnd(endOffset.getOrElse(ledgerEndOffset)) + .withFilter(partyFilter(party, templateIds)) } private def partyFilter( diff --git a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/util/TypedActorSystemResourceOwner.scala b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/util/TypedActorSystemResourceOwner.scala index bd5b34571ea2..470f423bfec7 100644 --- a/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/util/TypedActorSystemResourceOwner.scala +++ b/ledger/ledger-api-bench-tool/src/main/scala/com/daml/ledger/api/benchtool/util/TypedActorSystemResourceOwner.scala @@ -3,8 +3,9 @@ package com.daml.ledger.api.benchtool.util -import akka.actor.typed.ActorSystem -import com.daml.ledger.resources.ResourceContext +import akka.actor.typed.{ActorSystem, Behavior, SpawnProtocol} +import akka.actor.typed.scaladsl.Behaviors +import com.daml.ledger.resources.{ResourceContext, ResourceOwner} import com.daml.resources.{AbstractResourceOwner, ReleasableResource, Resource} import scala.concurrent.Future @@ -17,3 +18,18 @@ class TypedActorSystemResourceOwner[BehaviorType]( ): Resource[ResourceContext, ActorSystem[BehaviorType]] = ReleasableResource(Future(acquireActorSystem()))(system => Future(system.terminate())) } + +object TypedActorSystemResourceOwner { + def owner(): ResourceOwner[ActorSystem[SpawnProtocol.Command]] = + new TypedActorSystemResourceOwner[SpawnProtocol.Command](() => + ActorSystem(Creator(), "Creator") + ) + + object Creator { + def apply(): Behavior[SpawnProtocol.Command] = + Behaviors.setup { context => + context.log.debug(s"Starting Creator actor") + SpawnProtocol() + } + } +} diff --git a/ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/metrics/MetricsManagerSpec.scala b/ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/metrics/MetricsCollectorSpec.scala similarity index 72% rename from ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/metrics/MetricsManagerSpec.scala rename to ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/metrics/MetricsCollectorSpec.scala index 9f7d7c9fd032..80531c59d5f7 100644 --- a/ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/metrics/MetricsManagerSpec.scala +++ b/ledger/ledger-api-bench-tool/src/test/suite/scala/com/daml/ledger/api/benchtool/metrics/MetricsCollectorSpec.scala @@ -11,9 +11,9 @@ import akka.actor.testkit.typed.scaladsl.{ TestProbe, } import akka.actor.typed.{ActorRef, Behavior} -import com.daml.ledger.api.benchtool.metrics.MetricsManager.Message +import com.daml.ledger.api.benchtool.metrics.MetricsCollector.Message import com.daml.ledger.api.benchtool.metrics.objectives.ServiceLevelObjective -import com.daml.ledger.api.benchtool.metrics.{Metric, MetricValue, MetricsManager} +import com.daml.ledger.api.benchtool.metrics.{Metric, MetricValue, MetricsCollector} import com.daml.ledger.api.benchtool.util.MetricReporter import org.scalatest.wordspec.AnyWordSpecLike @@ -21,21 +21,23 @@ import java.time.Duration import scala.concurrent.duration._ import scala.util.Random -class MetricsManagerSpec extends ScalaTestWithActorTestKit(ManualTime.config) with AnyWordSpecLike { +class MetricsCollectorSpec + extends ScalaTestWithActorTestKit(ManualTime.config) + with AnyWordSpecLike { - "The MetricsManager" should { + "The MetricsCollector" should { val manualTime: ManualTime = ManualTime() "log periodic report" in { val logInterval = 100.millis - val manager = spawnManager(logInterval) + val collector = spawn(logInterval) val first = "first" val second = "second" - manager ! MetricsManager.Message.NewValue(first) - manager ! MetricsManager.Message.NewValue(second) + collector ! MetricsCollector.Message.NewValue(first) + collector ! MetricsCollector.Message.NewValue(second) manualTime.timePasses(logInterval - 1.milli) @@ -48,51 +50,51 @@ class MetricsManagerSpec extends ScalaTestWithActorTestKit(ManualTime.config) wi } "respond with metrics result on StreamCompleted message" in { - val manager = spawnManager() + val collector = spawn() val probe = aTestProbe() - manager ! MetricsManager.Message.StreamCompleted(probe.ref) - probe.expectMessage(MetricsManager.Message.MetricsResult.Ok) + collector ! MetricsCollector.Message.StreamCompleted(probe.ref) + probe.expectMessage(MetricsCollector.Message.MetricsResult.Ok) } "respond with information about violated objectives" in { - val manager = spawnManager() + val collector = spawn() val probe = aTestProbe() - manager ! MetricsManager.Message.NewValue("a value") - manager ! MetricsManager.Message.NewValue(TestObjective.TestViolatingValue) - manager ! MetricsManager.Message.StreamCompleted(probe.ref) + collector ! MetricsCollector.Message.NewValue("a value") + collector ! MetricsCollector.Message.NewValue(TestObjective.TestViolatingValue) + collector ! MetricsCollector.Message.StreamCompleted(probe.ref) - probe.expectMessage(MetricsManager.Message.MetricsResult.ObjectivesViolated) + probe.expectMessage(MetricsCollector.Message.MetricsResult.ObjectivesViolated) } "stop when the stream completes" in { val probe = aTestProbe() - val behaviorTestKit = BehaviorTestKit(managerBehavior()) + val behaviorTestKit = BehaviorTestKit(behavior()) behaviorTestKit.isAlive shouldBe true - behaviorTestKit.run(MetricsManager.Message.StreamCompleted(probe.ref)) + behaviorTestKit.run(MetricsCollector.Message.StreamCompleted(probe.ref)) behaviorTestKit.isAlive shouldBe false } } private def aTestProbe(): TestProbe[Message.MetricsResult] = - testKit.createTestProbe[MetricsManager.Message.MetricsResult]() + testKit.createTestProbe[MetricsCollector.Message.MetricsResult]() - private def spawnManager( + private def spawn( logInterval: FiniteDuration = 100.millis - ): ActorRef[MetricsManager.Message] = + ): ActorRef[MetricsCollector.Message] = testKit.spawn( - behavior = managerBehavior(logInterval), + behavior = behavior(logInterval), name = Random.alphanumeric.take(10).mkString, ) - private def managerBehavior( + private def behavior( logInterval: FiniteDuration = 100.millis - ): Behavior[MetricsManager.Message] = - MetricsManager[String]( + ): Behavior[MetricsCollector.Message] = + MetricsCollector[String]( streamName = "testStream", metrics = List(TestMetric()), logInterval = logInterval,