Skip to content

Commit

Permalink
ledger-api-bench-tool - flexible stream filters [DPP-667] (#11458)
Browse files Browse the repository at this point in the history
* ACS filters

* Moved filters to a separate object

* Filters for transactions and transaction trees streams

* Updated help

CHANGELOG_BEGIN
- [Integration Kit] - ledger-api-bench-tool - added flexible party and template filters for transactions, transaction trees and active contract streams
CHANGELOG_END

* Changed filter separators to + and @
  • Loading branch information
kamil-da authored Nov 4, 2021
1 parent 214c7ab commit d86fe92
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,17 +101,19 @@ object Cli {
note(1, "Transactions/transaction trees:")
note(2, "stream-type=<transactions|transaction-trees>", "(required)")
note(2, "name=<stream-name>", "Stream name used to identify results (required)")
note(2, "party=<party>", "(required)")
note(2, "filters=party1|template1|template2&party2", "(required)")
note(2, "begin-offset=<offset>")
note(2, "end-offset=<offset>")
note(2, "template-ids=<id1>|<id2>")
note(2, "max-delay=<seconds>", "Max record time delay objective")
note(2, "min-consumption-speed=<speed>", "Min consumption speed objective")
note(1, "Active contract sets:")
note(2, "stream-type=active-contracts", "(required)")
note(2, "name=<stream-name>", "Stream name used to identify results (required)")
note(2, "party=<party>", "(required)")
note(2, "template-ids=<id1>|<id2>")
note(
2,
"filters=party1@template1@template2+party2",
"List of per-party filters separated by the plus symbol (required)",
)
note(1, "Command completions:")
note(2, "stream-type=completions", "(required)")
note(2, "name=<stream-name>", "Stream name used to identify results (required)")
Expand Down Expand Up @@ -153,19 +155,14 @@ object Cli {

def transactionsConfig: Either[String, StreamConfig.TransactionsStreamConfig] = for {
name <- stringField("name")
party <- stringField("party")
templateIds <- optionalStringField("template-ids").flatMap {
case Some(ids) => listOfTemplateIds(ids).map(Some(_))
case None => Right(None)
}
filters <- stringField("filters").flatMap(filters)
beginOffset <- optionalStringField("begin-offset").map(_.map(offset))
endOffset <- optionalStringField("end-offset").map(_.map(offset))
maxDelaySeconds <- optionalLongField("max-delay")
minConsumptionSpeed <- optionalDoubleField("min-consumption-speed")
} yield Config.StreamConfig.TransactionsStreamConfig(
name = name,
party = party,
templateIds = templateIds,
filters = filters,
beginOffset = beginOffset,
endOffset = endOffset,
objectives = Config.StreamConfig.Objectives(
Expand All @@ -177,19 +174,14 @@ object Cli {
def transactionTreesConfig: Either[String, StreamConfig.TransactionTreesStreamConfig] =
for {
name <- stringField("name")
party <- stringField("party")
templateIds <- optionalStringField("template-ids").flatMap {
case Some(ids) => listOfTemplateIds(ids).map(Some(_))
case None => Right(None)
}
filters <- stringField("filters").flatMap(filters)
beginOffset <- optionalStringField("begin-offset").map(_.map(offset))
endOffset <- optionalStringField("end-offset").map(_.map(offset))
maxDelaySeconds <- optionalLongField("max-delay")
minConsumptionSpeed <- optionalDoubleField("min-consumption-speed")
} yield Config.StreamConfig.TransactionTreesStreamConfig(
name = name,
party = party,
templateIds = templateIds,
filters = filters,
beginOffset = beginOffset,
endOffset = endOffset,
objectives = Config.StreamConfig.Objectives(
Expand All @@ -200,15 +192,10 @@ object Cli {

def activeContractsConfig: Either[String, StreamConfig.ActiveContractsStreamConfig] = for {
name <- stringField("name")
party <- stringField("party")
templateIds <- optionalStringField("template-ids").flatMap {
case Some(ids) => listOfTemplateIds(ids).map(Some(_))
case None => Right(None)
}
filters <- stringField("filters").flatMap(filters)
} yield Config.StreamConfig.ActiveContractsStreamConfig(
name = name,
party = party,
templateIds = templateIds,
filters = filters,
)

def completionsConfig: Either[String, StreamConfig.CompletionsStreamConfig] = for {
Expand All @@ -234,19 +221,42 @@ object Cli {
config.fold(error => throw new IllegalArgumentException(error), identity)
}

private def listOfTemplateIds(listOfIds: String): Either[String, List[Identifier]] =
private def filters(listOfIds: String): Either[String, Map[String, Option[List[Identifier]]]] =
listOfIds
.split('|')
.split('+')
.toList
.map(templateIdFromString)
.foldLeft[Either[String, List[Identifier]]](Right(List.empty[Identifier])) {
.map(filter)
.foldLeft[Either[String, Map[String, List[Identifier]]]](Right(Map.empty)) {
case (acc, next) =>
for {
ids <- acc
id <- next
} yield id :: ids
filters <- acc
filter <- next
} yield filters + filter
}
.map { filters =>
filters.map { case (party, templateIds) =>
party -> Some(templateIds).filter(_.nonEmpty)
}
}

private def filter(filterString: String): Either[String, (String, List[Identifier])] =
filterString
.split('@')
.toList match {
case party :: templates =>
templates
.map(templateIdFromString)
.foldLeft[Either[String, List[Identifier]]](Right(List.empty[Identifier])) {
case (acc, next) =>
for {
ids <- acc
id <- next
} yield id :: ids
}
.map(party -> _)
case _ => Left("Filter cannot be empty")
}

private def templateIdFromString(fullyQualifiedTemplateId: String): Either[String, Identifier] =
fullyQualifiedTemplateId
.split(':')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,28 @@ case class Config(
object Config {
trait StreamConfig {
def name: String
def party: String
}

object StreamConfig {
case class TransactionsStreamConfig(
name: String,
party: String,
templateIds: Option[List[Identifier]],
filters: Map[String, Option[List[Identifier]]],
beginOffset: Option[LedgerOffset],
endOffset: Option[LedgerOffset],
objectives: StreamConfig.Objectives,
) extends StreamConfig

case class TransactionTreesStreamConfig(
name: String,
party: String,
templateIds: Option[List[Identifier]],
filters: Map[String, Option[List[Identifier]]],
beginOffset: Option[LedgerOffset],
endOffset: Option[LedgerOffset],
objectives: StreamConfig.Objectives,
) extends StreamConfig

case class ActiveContractsStreamConfig(
name: String,
party: String,
templateIds: Option[List[Identifier]],
filters: Map[String, Option[List[Identifier]]],
) extends StreamConfig

case class CompletionsStreamConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package com.daml.ledger.api.benchtool.services
import com.daml.ledger.api.benchtool.Config
import com.daml.ledger.api.benchtool.util.ObserverWithResult
import com.daml.ledger.api.v1.active_contracts_service._
import com.daml.ledger.api.v1.transaction_filter.{Filters, InclusiveFilters, TransactionFilter}
import io.grpc.Channel
import org.slf4j.LoggerFactory

Expand All @@ -33,26 +32,9 @@ final class ActiveContractsService(
private def getActiveContractsRequest(
ledgerId: String,
config: Config.StreamConfig.ActiveContractsStreamConfig,
) = {
val templatesFilter = config.templateIds match {
case Some(ids) =>
Filters.defaultInstance.withInclusive(
InclusiveFilters.defaultInstance.addAllTemplateIds(ids)
)
case None =>
Filters.defaultInstance
}

): GetActiveContractsRequest =
GetActiveContractsRequest.defaultInstance
.withLedgerId(ledgerId)
.withFilter(
TransactionFilter.defaultInstance
.withFiltersByParty(
Map(
config.party -> templatesFilter
)
)
)
}
.withFilter(StreamFilters.transactionFilters(config.filters))

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.ledger.api.benchtool.services

import com.daml.ledger.api.v1.transaction_filter.{Filters, InclusiveFilters, TransactionFilter}
import com.daml.ledger.api.v1.value.Identifier

object StreamFilters {

def transactionFilters(
filters: Map[String, Option[List[Identifier]]]
): TransactionFilter = {
val byParty: Map[String, Filters] = filters.map {
case (party, Some(templateIds)) =>
party -> Filters.defaultInstance.withInclusive(
InclusiveFilters.defaultInstance.addAllTemplateIds(templateIds)
)
case (party, None) =>
party -> Filters.defaultInstance
}

TransactionFilter.defaultInstance.withFiltersByParty(byParty)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package com.daml.ledger.api.benchtool.services
import com.daml.ledger.api.benchtool.Config
import com.daml.ledger.api.benchtool.util.ObserverWithResult
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
import com.daml.ledger.api.v1.transaction_filter.{Filters, InclusiveFilters, TransactionFilter}
import com.daml.ledger.api.v1.transaction_service.{
GetTransactionTreesResponse,
GetTransactionsRequest,
Expand All @@ -33,8 +32,7 @@ final class TransactionService(
): Future[Result] = {
val request = getTransactionsRequest(
ledgerId = ledgerId,
party = config.party,
templateIds = config.templateIds,
filters = config.filters,
beginOffset = config.beginOffset,
endOffset = config.endOffset,
)
Expand All @@ -52,8 +50,7 @@ final class TransactionService(
): Future[Result] = {
val request = getTransactionsRequest(
ledgerId = ledgerId,
party = config.party,
templateIds = config.templateIds,
filters = config.filters,
beginOffset = config.beginOffset,
endOffset = config.endOffset,
)
Expand All @@ -64,38 +61,21 @@ final class TransactionService(

private def getTransactionsRequest(
ledgerId: String,
party: String,
templateIds: Option[List[Identifier]],
filters: Map[String, Option[List[Identifier]]],
beginOffset: Option[LedgerOffset],
endOffset: Option[LedgerOffset],
): GetTransactionsRequest = {
val getTransactionsRequest = GetTransactionsRequest.defaultInstance
.withLedgerId(ledgerId)
.withBegin(beginOffset.getOrElse(ledgerBeginOffset))
.withFilter(partyFilter(party, templateIds))
.withFilter(StreamFilters.transactionFilters(filters))

endOffset match {
case Some(end) => getTransactionsRequest.withEnd(end)
case None => getTransactionsRequest
}
}

private def partyFilter(
party: String,
templateIds: Option[List[Identifier]],
): TransactionFilter = {
val templatesFilter = templateIds match {
case Some(ids) =>
Filters.defaultInstance.withInclusive(
InclusiveFilters.defaultInstance.addAllTemplateIds(ids)
)
case None =>
Filters.defaultInstance
}
TransactionFilter()
.withFiltersByParty(Map(party -> templatesFilter))
}

private def ledgerBeginOffset: LedgerOffset =
LedgerOffset().withBoundary(LedgerOffset.LedgerBoundary.LEDGER_BEGIN)

Expand Down

0 comments on commit d86fe92

Please sign in to comment.