Skip to content

Commit

Permalink
Allow defining ledger-begin and ledger-end offset conditions in ledge…
Browse files Browse the repository at this point in the history
…r-api-bench-tool [DPP-836] (#12521)

* Allow defining ledger-begin and ledger-end offset conditions for transactions and transaction-trees streams in ledger-api-bench-tool

CHANGELOG_BEGIN
- [Integration Kit] - ledger-api-bench-tool allow to define ledger-begin and ledger-end offset boundaries for streams.
CHANGELOG_END

* Print ledger-api-bench-tool config in black&white to avoid colouring special characters printed

* Reduce duplication

* Add more unit tests for offset
  • Loading branch information
kamil-da authored Jan 31, 2022
1 parent f1560ce commit 49f37b8
Show file tree
Hide file tree
Showing 9 changed files with 461 additions and 95 deletions.
2 changes: 1 addition & 1 deletion bazel-java-deps.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def install_java_deps():
"com.google.code.gson:gson:2.8.2",
"com.google.guava:guava:29.0-jre",
"com.h2database:h2:2.1.210",
"com.lihaoyi:pprint_{}:0.6.0".format(scala_major_version),
"com.lihaoyi:pprint_{}:0.7.1".format(scala_major_version),
"com.lihaoyi:sjsonnet_{}:0.3.0".format(scala_major_version),
"commons-io:commons-io:2.5",
"com.oracle.database.jdbc:ojdbc8:19.8.0.0",
Expand Down
1 change: 1 addition & 0 deletions ledger/ledger-api-bench-tool/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ da_scala_test_suite(
deps = [
":ledger-api-bench-tool-lib",
"//language-support/scala/bindings",
"//ledger/ledger-api-common",
"//ledger/metrics",
"@maven//:com_typesafe_config",
"@maven//:io_dropwizard_metrics_metrics_core",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import com.daml.ledger.resources.{ResourceContext, ResourceOwner}
import io.grpc.Channel
import io.grpc.netty.{NegotiationType, NettyChannelBuilder}
import org.slf4j.{Logger, LoggerFactory}
import pprint.PPrinter

import java.util.concurrent.{
ArrayBlockingQueue,
Expand Down Expand Up @@ -146,6 +145,6 @@ object LedgerApiBenchTool {
)

private val logger: Logger = LoggerFactory.getLogger(getClass)
private val printer: PPrinter = pprint.PPrinter(200, 1000)
private val printer = pprint.PPrinter.BlackWhite
private def prettyPrint(x: Any): String = printer(x).toString()
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,33 @@ object Cli {
}

def offset(stringValue: String): LedgerOffset =
LedgerOffset.defaultInstance.withAbsolute(stringValue)
stringValue match {
case "ledger-begin" =>
LedgerOffset.defaultInstance.withBoundary(LedgerOffset.LedgerBoundary.LEDGER_BEGIN)
case "ledger-end" =>
LedgerOffset.defaultInstance.withBoundary(LedgerOffset.LedgerBoundary.LEDGER_END)
case _ =>
LedgerOffset.defaultInstance.withAbsolute(stringValue)
}

def transactionObjectives(
maxDelaySeconds: Option[Long],
minConsumptionSpeed: Option[Double],
minItemRate: Option[Double],
maxItemRate: Option[Double],
): Option[WorkflowConfig.StreamConfig.TransactionObjectives] =
(maxDelaySeconds, minConsumptionSpeed, minItemRate, maxItemRate) match {
case (None, None, None, None) => None
case _ =>
Some(
WorkflowConfig.StreamConfig.TransactionObjectives(
maxDelaySeconds = maxDelaySeconds,
minConsumptionSpeed = minConsumptionSpeed,
minItemRate = minItemRate,
maxItemRate = maxItemRate,
)
)
}

def transactionsConfig
: Either[String, WorkflowConfig.StreamConfig.TransactionsStreamConfig] = for {
Expand All @@ -187,14 +213,8 @@ object Cli {
filters = filters,
beginOffset = beginOffset,
endOffset = endOffset,
objectives = Some(
WorkflowConfig.StreamConfig.TransactionObjectives(
maxDelaySeconds = maxDelaySeconds,
minConsumptionSpeed = minConsumptionSpeed,
minItemRate = minItemRate,
maxItemRate = maxItemRate,
)
),
objectives =
transactionObjectives(maxDelaySeconds, minConsumptionSpeed, minItemRate, maxItemRate),
)

def transactionTreesConfig
Expand All @@ -213,16 +233,25 @@ object Cli {
filters = filters,
beginOffset = beginOffset,
endOffset = endOffset,
objectives = Some(
WorkflowConfig.StreamConfig.TransactionObjectives(
maxDelaySeconds = maxDelaySeconds,
minConsumptionSpeed = minConsumptionSpeed,
minItemRate = minItemRate,
maxItemRate = maxItemRate,
)
),
objectives =
transactionObjectives(maxDelaySeconds, minConsumptionSpeed, minItemRate, maxItemRate),
)

def rateObjectives(
minItemRate: Option[Double],
maxItemRate: Option[Double],
): Option[WorkflowConfig.StreamConfig.RateObjectives] =
(minItemRate, maxItemRate) match {
case (None, None) => None
case _ =>
Some(
WorkflowConfig.StreamConfig.RateObjectives(
minItemRate = minItemRate,
maxItemRate = maxItemRate,
)
)
}

def activeContractsConfig
: Either[String, WorkflowConfig.StreamConfig.ActiveContractsStreamConfig] = for {
name <- stringField("name")
Expand All @@ -232,12 +261,7 @@ object Cli {
} yield WorkflowConfig.StreamConfig.ActiveContractsStreamConfig(
name = name,
filters = filters,
objectives = Some(
WorkflowConfig.StreamConfig.RateObjectives(
minItemRate = minItemRate,
maxItemRate = maxItemRate,
)
),
objectives = rateObjectives(minItemRate, maxItemRate),
)

def completionsConfig: Either[String, WorkflowConfig.StreamConfig.CompletionsStreamConfig] =
Expand All @@ -253,12 +277,7 @@ object Cli {
party = party,
applicationId = applicationId,
beginOffset = beginOffset,
objectives = Some(
WorkflowConfig.StreamConfig.RateObjectives(
minItemRate = minItemRate,
maxItemRate = maxItemRate,
)
),
objectives = rateObjectives(minItemRate, maxItemRate),
)

val config = stringField("stream-type").flatMap[String, WorkflowConfig.StreamConfig] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ object WorkflowConfig {
)
}

sealed trait StreamConfig {
sealed trait StreamConfig extends Product with Serializable {
def name: String
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,13 @@ object WorkflowConfigParser {
"max_item_rate",
)(StreamConfig.RateObjectives.apply)

implicit val offsetDecoder: Decoder[LedgerOffset] =
Decoder.decodeString.map(LedgerOffset.defaultInstance.withAbsolute)
implicit val offsetDecoder: Decoder[LedgerOffset] = {
Decoder.decodeString.map {
case "ledger-begin" => LedgerOffset().withBoundary(LedgerOffset.LedgerBoundary.LEDGER_BEGIN)
case "ledger-end" => LedgerOffset().withBoundary(LedgerOffset.LedgerBoundary.LEDGER_END)
case absolute => LedgerOffset.defaultInstance.withAbsolute(absolute)
}
}

implicit val partyFilterDecoder: Decoder[StreamConfig.PartyFilter] =
Decoder.forProduct2(
Expand Down
Loading

0 comments on commit 49f37b8

Please sign in to comment.