Skip to content

Commit

Permalink
ledger-api-bench-tool - Active contracts stream, completions stream…
Browse files Browse the repository at this point in the history
… [DPP-398, DPP-399] (#9857)

* ledger-api-bench-tool: Active contracts streams

CHANGELOG_BEGIN
- [Integration Kit] - ledger-api-bench-tool - reading active contract streams
CHANGELOG_END

* ledger-api-bench-tool: Completions stream (#9872)

CHANGELOG_BEGIN
- [Integration Kit] - ledger-api-bench-tool - Reading completions stream
CHANGELOG_END
  • Loading branch information
kamil-da authored Jun 9, 2021
1 parent fce8e54 commit 4e49cf6
Show file tree
Hide file tree
Showing 13 changed files with 551 additions and 268 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@

package com.daml.ledger.api.benchtool

import com.daml.ledger.api.benchtool.Config.StreamConfig
import com.daml.ledger.api.tls.TlsConfigurationCli
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
import com.daml.ledger.api.v1.value.Identifier
import scopt.{OptionParser, Read}
import scopt.{OptionDef, OptionParser, Read}

import scala.concurrent.duration.FiniteDuration
import scala.util.{Failure, Success, Try}
Expand Down Expand Up @@ -35,7 +36,7 @@ object Cli {
s"Stream configuration."
)
.valueName(
"stream-type=<transactions|transaction-trees>,name=<streamName>,party=<party>[,begin-offset=<offset>][,end-offset=<offset>][,template-ids=<id1>|<id2>][,max-delay=<seconds>]"
"<param1>=<value1>,<param2>=<value2>,..."
)
.action { case (streamConfig, config) =>
config.copy(streams = config.streams :+ streamConfig)
Expand Down Expand Up @@ -66,6 +67,34 @@ object Cli {

help("help").text("Prints this information")

private def note(level: Int, param: String, desc: String = ""): OptionDef[Unit, Config] = {
val paddedParam = s"${" " * level * 2}$param"
val internalPadding = math.max(1, 40 - paddedParam.length)
note(s"$paddedParam${" " * internalPadding}$desc")
}

note(0, "")
note(0, "Stream configuration parameters:")
note(1, "Transactions/transaction trees:")
note(2, "stream-type=<transactions|transaction-trees>", "(required)")
note(2, "name=<stream-name>", "Stream name used to identify results (required)")
note(2, "party=<party>", "(required)")
note(2, "begin-offset=<offset>")
note(2, "end-offset=<offset>")
note(2, "template-ids=<id1>|<id2>")
note(2, "max-delay=<seconds>", "Max record time delay objective")
note(2, "min-consumption-speed=<speed>", "Min consumption speed objective")
note(1, "Active contract sets:")
note(2, "stream-type=active-contracts", "(required)")
note(2, "name=<stream-name>", "Stream name used to identify results (required)")
note(2, "party=<party>", "(required)")
note(2, "template-ids=<id1>|<id2>")
note(1, "Command completions:")
note(2, "stream-type=completions", "(required)")
note(2, "name=<stream-name>", "Stream name used to identify results (required)")
note(2, "party=<party>", "(required)")
note(2, "begin-offset=<offset>")
note(2, "template-ids=<id1>|<id2>")
}

def config(args: Array[String]): Option[Config] =
Expand Down Expand Up @@ -99,14 +128,9 @@ object Cli {
def offset(stringValue: String): LedgerOffset =
LedgerOffset.defaultInstance.withAbsolute(stringValue)

val config = for {
def transactionsConfig: Either[String, StreamConfig.TransactionsStreamConfig] = for {
name <- stringField("name")
party <- stringField("party")
streamType <- stringField("stream-type").flatMap[String, Config.StreamConfig.StreamType] {
case "transactions" => Right(Config.StreamConfig.StreamType.Transactions)
case "transaction-trees" => Right(Config.StreamConfig.StreamType.TransactionTrees)
case invalid => Left(s"Invalid stream type: $invalid")
}
templateIds <- optionalStringField("template-ids").flatMap {
case Some(ids) => listOfTemplateIds(ids).map(Some(_))
case None => Right(None)
Expand All @@ -115,9 +139,8 @@ object Cli {
endOffset <- optionalStringField("end-offset").map(_.map(offset))
maxDelaySeconds <- optionalLongField("max-delay")
minConsumptionSpeed <- optionalDoubleField("min-consumption-speed")
} yield Config.StreamConfig(
} yield Config.StreamConfig.TransactionsStreamConfig(
name = name,
streamType = streamType,
party = party,
templateIds = templateIds,
beginOffset = beginOffset,
Expand All @@ -128,6 +151,63 @@ object Cli {
),
)

def transactionTreesConfig: Either[String, StreamConfig.TransactionTreesStreamConfig] =
for {
name <- stringField("name")
party <- stringField("party")
templateIds <- optionalStringField("template-ids").flatMap {
case Some(ids) => listOfTemplateIds(ids).map(Some(_))
case None => Right(None)
}
beginOffset <- optionalStringField("begin-offset").map(_.map(offset))
endOffset <- optionalStringField("end-offset").map(_.map(offset))
maxDelaySeconds <- optionalLongField("max-delay")
minConsumptionSpeed <- optionalDoubleField("min-consumption-speed")
} yield Config.StreamConfig.TransactionTreesStreamConfig(
name = name,
party = party,
templateIds = templateIds,
beginOffset = beginOffset,
endOffset = endOffset,
objectives = Config.StreamConfig.Objectives(
maxDelaySeconds = maxDelaySeconds,
minConsumptionSpeed = minConsumptionSpeed,
),
)

def activeContractsConfig: Either[String, StreamConfig.ActiveContractsStreamConfig] = for {
name <- stringField("name")
party <- stringField("party")
templateIds <- optionalStringField("template-ids").flatMap {
case Some(ids) => listOfTemplateIds(ids).map(Some(_))
case None => Right(None)
}
} yield Config.StreamConfig.ActiveContractsStreamConfig(
name = name,
party = party,
templateIds = templateIds,
)

def completionsConfig: Either[String, StreamConfig.CompletionsStreamConfig] = for {
name <- stringField("name")
party <- stringField("party")
applicationId <- stringField("application-id")
beginOffset <- optionalStringField("begin-offset").map(_.map(offset))
} yield Config.StreamConfig.CompletionsStreamConfig(
name = name,
party = party,
applicationId = applicationId,
beginOffset = beginOffset,
)

val config = stringField("stream-type").flatMap[String, Config.StreamConfig] {
case "transactions" => transactionsConfig
case "transaction-trees" => transactionTreesConfig
case "active-contracts" => activeContractsConfig
case "completions" => completionsConfig
case invalid => Left(s"Invalid stream type: $invalid")
}

config.fold(error => throw new IllegalArgumentException(error), identity)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,42 @@ case class Config(
)

object Config {
case class StreamConfig(
name: String,
streamType: Config.StreamConfig.StreamType,
party: String,
templateIds: Option[List[Identifier]],
beginOffset: Option[LedgerOffset],
endOffset: Option[LedgerOffset],
objectives: StreamConfig.Objectives,
)
trait StreamConfig {
def name: String
def party: String
}

object StreamConfig {
sealed trait StreamType
object StreamType {
case object Transactions extends StreamType
case object TransactionTrees extends StreamType
}
case class TransactionsStreamConfig(
name: String,
party: String,
templateIds: Option[List[Identifier]],
beginOffset: Option[LedgerOffset],
endOffset: Option[LedgerOffset],
objectives: StreamConfig.Objectives,
) extends StreamConfig

case class TransactionTreesStreamConfig(
name: String,
party: String,
templateIds: Option[List[Identifier]],
beginOffset: Option[LedgerOffset],
endOffset: Option[LedgerOffset],
objectives: StreamConfig.Objectives,
) extends StreamConfig

case class ActiveContractsStreamConfig(
name: String,
party: String,
templateIds: Option[List[Identifier]],
) extends StreamConfig

case class CompletionsStreamConfig(
name: String,
party: String,
applicationId: String,
beginOffset: Option[LedgerOffset],
) extends StreamConfig

case class Objectives(
maxDelaySeconds: Option[Long],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,15 @@

package com.daml.ledger.api.benchtool

import akka.actor.typed.{ActorSystem, SpawnProtocol}
import com.daml.ledger.api.benchtool.metrics.{
Creator,
MeteredStreamObserver,
MetricsManager,
TransactionMetrics,
import com.daml.ledger.api.benchtool.metrics.{MetricsCollector, MetricsSet, StreamMetrics}
import com.daml.ledger.api.benchtool.services.{
ActiveContractsService,
CommandCompletionService,
LedgerIdentityService,
TransactionService,
}
import com.daml.ledger.api.benchtool.services.{LedgerIdentityService, TransactionService}
import com.daml.ledger.api.benchtool.util.TypedActorSystemResourceOwner
import com.daml.ledger.api.tls.TlsConfiguration
import com.daml.ledger.api.v1.transaction_service.{
GetTransactionTreesResponse,
GetTransactionsResponse,
}
import com.daml.ledger.resources.{ResourceContext, ResourceOwner}
import io.grpc.Channel
import io.grpc.netty.{NegotiationType, NettyChannelBuilder}
Expand Down Expand Up @@ -58,53 +53,65 @@ object LedgerApiBenchTool {
val resources = for {
executorService <- threadPoolExecutorOwner(config.concurrency)
channel <- channelOwner(config.ledger, config.tls, executorService)
system <- actorSystemResourceOwner()
system <- TypedActorSystemResourceOwner.owner()
} yield (channel, system)

resources.use { case (channel, system) =>
val ledgerIdentityService: LedgerIdentityService = new LedgerIdentityService(channel)
val ledgerId: String = ledgerIdentityService.fetchLedgerId()
val transactionService = new TransactionService(channel, ledgerId)
val activeContractsService = new ActiveContractsService(channel, ledgerId)
val commandCompletionService = new CommandCompletionService(channel, ledgerId)
Future
.traverse(config.streams) { streamConfig =>
streamConfig.streamType match {
case Config.StreamConfig.StreamType.Transactions =>
TransactionMetrics
.transactionsMetricsManager(
streamConfig.name,
config.reportingPeriod,
streamConfig.objectives,
)(system)
.flatMap { manager =>
val observer: MeteredStreamObserver[GetTransactionsResponse] =
new MeteredStreamObserver[GetTransactionsResponse](
streamConfig.name,
logger,
manager,
)(system)
transactionService.transactions(streamConfig, observer)
}
case Config.StreamConfig.StreamType.TransactionTrees =>
TransactionMetrics
.transactionTreesMetricsManager(
streamConfig.name,
config.reportingPeriod,
streamConfig.objectives,
)(system)
.flatMap { manager =>
val observer =
new MeteredStreamObserver[GetTransactionTreesResponse](
streamConfig.name,
logger,
manager,
)(system)
transactionService.transactionTrees(streamConfig, observer)
}
}
.traverse(config.streams) {
case streamConfig: Config.StreamConfig.TransactionsStreamConfig =>
StreamMetrics
.observer(
streamName = streamConfig.name,
logInterval = config.reportingPeriod,
metrics = MetricsSet.transactionMetrics(streamConfig.objectives),
logger = logger,
)(system, ec)
.flatMap { observer =>
transactionService.transactions(streamConfig, observer)
}
case streamConfig: Config.StreamConfig.TransactionTreesStreamConfig =>
StreamMetrics
.observer(
streamName = streamConfig.name,
logInterval = config.reportingPeriod,
metrics = MetricsSet.transactionTreesMetrics(streamConfig.objectives),
logger = logger,
)(system, ec)
.flatMap { observer =>
transactionService.transactionTrees(streamConfig, observer)
}
case streamConfig: Config.StreamConfig.ActiveContractsStreamConfig =>
StreamMetrics
.observer(
streamName = streamConfig.name,
logInterval = config.reportingPeriod,
metrics = MetricsSet.activeContractsMetrics,
logger = logger,
)(system, ec)
.flatMap { observer =>
activeContractsService.getActiveContracts(streamConfig, observer)
}
case streamConfig: Config.StreamConfig.CompletionsStreamConfig =>
StreamMetrics
.observer(
streamName = streamConfig.name,
logInterval = config.reportingPeriod,
metrics = MetricsSet.completionsMetrics,
logger = logger,
)(system, ec)
.flatMap { observer =>
commandCompletionService.completions(streamConfig, observer)
}
}
.transform {
case Success(results) =>
if (results.contains(MetricsManager.Message.MetricsResult.ObjectivesViolated))
if (results.contains(MetricsCollector.Message.MetricsResult.ObjectivesViolated))
Failure(new RuntimeException("Metrics objectives not met."))
else Success(())
case Failure(ex) =>
Expand Down Expand Up @@ -143,11 +150,6 @@ object LedgerApiBenchTool {
ResourceOwner.forChannel(channelBuilder, ShutdownTimeout)
}

private def actorSystemResourceOwner(): ResourceOwner[ActorSystem[SpawnProtocol.Command]] =
new TypedActorSystemResourceOwner[SpawnProtocol.Command](() =>
ActorSystem(Creator(), "Creator")
)

private def threadPoolExecutorOwner(
config: Config.Concurrency
): ResourceOwner[ThreadPoolExecutor] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

package com.daml.ledger.api.benchtool.metrics

import akka.actor.typed.{ActorRef, ActorSystem}
import com.daml.ledger.api.benchtool.util.ObserverWithResult
import org.slf4j.Logger

Expand All @@ -12,25 +11,17 @@ import scala.concurrent.Future
class MeteredStreamObserver[T](
val streamName: String,
logger: Logger,
metricsManager: ActorRef[MetricsManager.Message],
)(implicit system: ActorSystem[_])
extends ObserverWithResult[T, MetricsManager.Message.MetricsResult](logger) {
import MetricsManager.Message._
manager: MetricsManager[T],
) extends ObserverWithResult[T, MetricsCollector.Message.MetricsResult](logger) {

override def onNext(value: T): Unit = {
metricsManager ! NewValue(value)
manager.sendNewValue(value)
super.onNext(value)
}

override def completeWith(): Future[MetricsResult] = {
// TODO: abstract over the ask pattern (possibly a container with the actor and a method for asking)
import akka.actor.typed.scaladsl.AskPattern._
import akka.util.Timeout

import scala.concurrent.duration._
logger.debug(withStreamName(s"Sending $StreamCompleted notification."))
implicit val timeout: Timeout = 3.seconds
metricsManager.ask(StreamCompleted(_))
override def completeWith(): Future[MetricsCollector.Message.MetricsResult] = {
logger.debug(withStreamName(s"Asking for stream result..."))
manager.result()
}

}
Loading

0 comments on commit 4e49cf6

Please sign in to comment.