Skip to content

Commit

Permalink
DPP-390 configurable event decoding parallelism (#9931)
Browse files Browse the repository at this point in the history
* Make events decoding parallelism configurable

changelog_begin
changelog_end

* Add CLI parameters

* Rename parameter

* Change default value

* Reuse parameter

* Rename cli flag
  • Loading branch information
rautenrieth-da authored Jun 10, 2021
1 parent cc7af93 commit e1db529
Show file tree
Hide file tree
Showing 23 changed files with 82 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ case class ApiServerConfig(
tlsConfig: Option[TlsConfiguration],
maxInboundMessageSize: Int,
eventsPageSize: Int = IndexConfiguration.DefaultEventsPageSize,
eventsProcessingParallelism: Int = IndexConfiguration.DefaultEventsProcessingParallelism,
portFile: Option[Path],
seeding: Seeding,
managementServiceTimeout: Duration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ final class StandaloneApiServer(
databaseConnectionPoolSize = config.databaseConnectionPoolSize,
databaseConnectionTimeout = config.databaseConnectionTimeout,
eventsPageSize = config.eventsPageSize,
eventsProcessingParallelism = config.eventsProcessingParallelism,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ package com.daml.platform.configuration
object IndexConfiguration {

val DefaultEventsPageSize: Int = 1000
val DefaultEventsProcessingParallelism: Int = 8

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ private[platform] object JdbcIndex {
databaseConnectionPoolSize: Int,
databaseConnectionTimeout: FiniteDuration,
eventsPageSize: Int,
eventsProcessingParallelism: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
Expand All @@ -44,6 +45,7 @@ private[platform] object JdbcIndex {
databaseConnectionTimeout = databaseConnectionTimeout,
initialLedgerId = ledgerId,
eventsPageSize = eventsPageSize,
eventsProcessingParallelism = eventsProcessingParallelism,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ private[platform] object ReadOnlySqlLedger {
databaseConnectionPoolSize: Int,
databaseConnectionTimeout: FiniteDuration,
eventsPageSize: Int,
eventsProcessingParallelism: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
Expand Down Expand Up @@ -132,6 +133,7 @@ private[platform] object ReadOnlySqlLedger {
databaseConnectionPoolSize,
databaseConnectionTimeout,
eventsPageSize,
eventsProcessingParallelism,
servicesExecutionContext,
metrics,
lfValueTranslationCache,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ case class IndexerConfig(
databaseConnectionTimeout: FiniteDuration = DefaultDatabaseConnectionTimeout,
restartDelay: FiniteDuration = DefaultRestartDelay,
eventsPageSize: Int = IndexConfiguration.DefaultEventsPageSize,
eventsProcessingParallelism: Int = IndexConfiguration.DefaultEventsProcessingParallelism,
updatePreparationParallelism: Int = DefaultUpdatePreparationParallelism,
allowExistingSchema: Boolean = false,
// TODO append-only: remove after removing support for the current (mutating) schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ object JdbcIndexer {
config.databaseConnectionPoolSize,
config.databaseConnectionTimeout,
config.eventsPageSize,
config.eventsProcessingParallelism,
servicesExecutionContext,
metrics,
lfValueTranslationCache,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ object IndexMetadata {
connectionPoolSize = 1,
connectionTimeout = 250.millis,
eventsPageSize = 1000,
eventsProcessingParallelism = 8,
servicesExecutionContext = executionContext,
metrics = new Metrics(new MetricRegistry),
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ private class JdbcLedgerDao(
dbType: DbType,
servicesExecutionContext: ExecutionContext,
eventsPageSize: Int,
eventsProcessingParallelism: Int,
performPostCommitValidation: Boolean,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
Expand Down Expand Up @@ -914,7 +915,14 @@ private class JdbcLedgerDao(
)

override val transactionsReader: TransactionsReader =
new TransactionsReader(dbDispatcher, dbType, eventsPageSize, metrics, translation)(
new TransactionsReader(
dispatcher = dbDispatcher,
dbType = dbType,
pageSize = eventsPageSize,
eventProcessingParallelism = eventsProcessingParallelism,
metrics = metrics,
lfValueTranslation = translation,
)(
servicesExecutionContext
)

Expand Down Expand Up @@ -1021,6 +1029,7 @@ private[platform] object JdbcLedgerDao {
connectionPoolSize: Int,
connectionTimeout: FiniteDuration,
eventsPageSize: Int,
eventsProcessingParallelism: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
Expand All @@ -1033,6 +1042,7 @@ private[platform] object JdbcLedgerDao {
connectionPoolSize,
connectionTimeout,
eventsPageSize,
eventsProcessingParallelism,
validate = false,
servicesExecutionContext,
metrics,
Expand All @@ -1049,6 +1059,7 @@ private[platform] object JdbcLedgerDao {
connectionPoolSize: Int,
connectionTimeout: FiniteDuration,
eventsPageSize: Int,
eventsProcessingParallelism: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
Expand All @@ -1063,6 +1074,7 @@ private[platform] object JdbcLedgerDao {
dbType.maxSupportedWriteConnections(connectionPoolSize),
connectionTimeout,
eventsPageSize,
eventsProcessingParallelism,
validate = false,
servicesExecutionContext,
metrics,
Expand All @@ -1081,6 +1093,7 @@ private[platform] object JdbcLedgerDao {
connectionPoolSize: Int,
connectionTimeout: FiniteDuration,
eventsPageSize: Int,
eventsProcessingParallelism: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
Expand All @@ -1096,6 +1109,7 @@ private[platform] object JdbcLedgerDao {
dbType.maxSupportedWriteConnections(connectionPoolSize),
connectionTimeout,
eventsPageSize,
eventsProcessingParallelism,
validate = true,
servicesExecutionContext,
metrics,
Expand Down Expand Up @@ -1158,6 +1172,7 @@ private[platform] object JdbcLedgerDao {
connectionPoolSize: Int,
connectionTimeout: FiniteDuration,
eventsPageSize: Int,
eventsProcessingParallelism: Int,
validate: Boolean,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
Expand All @@ -1183,6 +1198,7 @@ private[platform] object JdbcLedgerDao {
dbType,
servicesExecutionContext,
eventsPageSize,
eventsProcessingParallelism,
validate,
metrics,
lfValueTranslationCache,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@ import scala.util.{Failure, Success}
/** @param dispatcher Executes the queries prepared by this object
* @param executionContext Runs transformations on data fetched from the database, including Daml-LF value deserialization
* @param pageSize The number of events to fetch at a time the database when serving streaming calls
* @param eventProcessingParallelism The parallelism for loading and decoding state events
* @param lfValueTranslation The delegate in charge of translating serialized Daml-LF values
* @see [[PaginatingAsyncStream]]
*/
private[appendonlydao] final class TransactionsReader(
dispatcher: DbDispatcher,
dbType: DbType,
pageSize: Int,
eventProcessingParallelism: Int,
metrics: Metrics,
lfValueTranslation: LfValueTranslation,
)(implicit executionContext: ExecutionContext)
Expand All @@ -62,11 +64,6 @@ private[appendonlydao] final class TransactionsReader(
// This significantly improves the performance of the transaction service.
private val outputStreamBufferSize = 128

// TODO: make this parameter configurable
private val ContractStateEventsStreamParallelismLevel = 4

private val TransactionEventsFetchParallelism = 8

private val MinParallelFetchChunkSize = 10

private def offsetFor(response: GetTransactionsResponse): Offset =
Expand Down Expand Up @@ -270,13 +267,13 @@ private[appendonlydao] final class TransactionsReader(
.splitRange(
startExclusive._2,
endInclusive._2,
TransactionEventsFetchParallelism,
eventProcessingParallelism,
MinParallelFetchChunkSize,
)
.iterator
)
// Dispatch database fetches in parallel
.mapAsync(TransactionEventsFetchParallelism) { range =>
.mapAsync(eventProcessingParallelism) { range =>
dispatcher.executeSql(dbMetrics.getTransactionLogUpdates) { implicit conn =>
QueryNonPruned.executeSqlOrThrow(
query = TransactionLogUpdatesReader.readRawEvents(range),
Expand All @@ -288,7 +285,7 @@ private[appendonlydao] final class TransactionsReader(
}
.flatMapConcat(v => Source.fromIterator(() => v.iterator))
// Decode transaction log updates in parallel
.mapAsync(TransactionEventsFetchParallelism) { raw =>
.mapAsync(eventProcessingParallelism) { raw =>
Timed.future(
metrics.daml.index.decodeTransactionLogUpdate,
Future(TransactionLogUpdatesReader.toTransactionEvent(raw)),
Expand Down Expand Up @@ -401,7 +398,7 @@ private[appendonlydao] final class TransactionsReader(
query,
nextPageRangeContracts(endInclusive),
)(EventsRange(startExclusive, endInclusive)).async
.mapAsync(ContractStateEventsStreamParallelismLevel) { raw =>
.mapAsync(eventProcessingParallelism) { raw =>
Timed.future(
metrics.daml.index.decodeStateEvent,
Future(ContractStateEventsReader.toContractStateEvent(raw, lfValueTranslation)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ private[dao] trait JdbcLedgerDaoBackend extends AkkaBeforeAndAfterAll {

protected def enableAppendOnlySchema: Boolean

protected def daoOwner(eventsPageSize: Int)(implicit
protected def daoOwner(
eventsPageSize: Int,
eventsProcessingParallelism: Int,
)(implicit
loggingContext: LoggingContext
): ResourceOwner[LedgerDao] =
if (!enableAppendOnlySchema) {
Expand All @@ -69,6 +72,7 @@ private[dao] trait JdbcLedgerDaoBackend extends AkkaBeforeAndAfterAll {
connectionPoolSize = 16,
connectionTimeout = 250.millis,
eventsPageSize = eventsPageSize,
eventsProcessingParallelism = eventsProcessingParallelism,
servicesExecutionContext = executionContext,
metrics = new Metrics(new MetricRegistry),
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
Expand All @@ -92,7 +96,7 @@ private[dao] trait JdbcLedgerDaoBackend extends AkkaBeforeAndAfterAll {
_ <- Resource.fromFuture(
new FlywayMigrations(jdbcUrl).migrate(enableAppendOnlySchema = enableAppendOnlySchema)
)
dao <- daoOwner(100).acquire()
dao <- daoOwner(100, 4).acquire()
_ <- Resource.fromFuture(dao.initializeLedger(TestLedgerId))
_ <- Resource.fromFuture(dao.initializeParticipantId(TestParticipantId))
} yield dao
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ import scala.concurrent.duration.DurationInt
private[dao] trait JdbcLedgerDaoPostCommitValidationSpec extends LoneElement {
this: AsyncFlatSpec with Matchers with JdbcLedgerDaoSuite =>

override protected def daoOwner(eventsPageSize: Int)(implicit
override protected def daoOwner(eventsPageSize: Int, eventsProcessingParallelism: Int)(implicit
loggingContext: LoggingContext
): ResourceOwner[LedgerDao] =
): ResourceOwner[LedgerDao] = {
val _ = eventsProcessingParallelism // Silence unused param warning
JdbcLedgerDao
.validatingWriteOwner(
serverRole = ServerRole.Testing(getClass),
Expand All @@ -35,6 +36,7 @@ private[dao] trait JdbcLedgerDaoPostCommitValidationSpec extends LoneElement {
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
enricher = None,
)
}

private val ok = io.grpc.Status.Code.OK.value()
private val aborted = io.grpc.Status.Code.ABORTED.value()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ private[dao] trait JdbcLedgerDaoTransactionsSpec extends OptionValues with Insid

// `pageSize = 2` and the offset gaps in the `commandWithOffsetGaps` above are to make sure
// that streaming works with event pages separated by offsets that don't have events in the store
ledgerDao <- createLedgerDao(pageSize = 2)
ledgerDao <- createLedgerDao(pageSize = 2, eventsProcessingParallelism = 8)

response <- ledgerDao.transactionsReader
.getFlatTransactions(
Expand Down Expand Up @@ -624,9 +624,12 @@ private[dao] trait JdbcLedgerDaoTransactionsSpec extends OptionValues with Insid
): Vector[Transaction] =
responses.foldLeft(Vector.empty[Transaction])((b, a) => b ++ a._2.transactions.toVector)

private def createLedgerDao(pageSize: Int) =
private def createLedgerDao(pageSize: Int, eventsProcessingParallelism: Int) =
LoggingContext.newLoggingContext { implicit loggingContext =>
daoOwner(eventsPageSize = pageSize).acquire()(ResourceContext(executionContext))
daoOwner(
eventsPageSize = pageSize,
eventsProcessingParallelism = eventsProcessingParallelism,
).acquire()(ResourceContext(executionContext))
}.asFuture

// XXX SC much of this is repeated because we're more concerned here
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ final case class Config[Extra](
participants: Seq[ParticipantConfig],
maxInboundMessageSize: Int,
eventsPageSize: Int,
eventsProcessingParallelism: Int,
stateValueCache: caching.WeightedCache.Configuration,
lfValueTranslationEventCache: caching.SizedCache.Configuration,
lfValueTranslationContractCache: caching.SizedCache.Configuration,
Expand Down Expand Up @@ -63,6 +64,7 @@ object Config {
participants = Vector.empty,
maxInboundMessageSize = DefaultMaxInboundMessageSize,
eventsPageSize = IndexConfiguration.DefaultEventsPageSize,
eventsProcessingParallelism = IndexConfiguration.DefaultEventsProcessingParallelism,
stateValueCache = caching.WeightedCache.Configuration.none,
lfValueTranslationEventCache = caching.SizedCache.Configuration.none,
lfValueTranslationContractCache = caching.SizedCache.Configuration.none,
Expand Down Expand Up @@ -390,6 +392,15 @@ object Config {
)
.action((eventsPageSize, config) => config.copy(eventsPageSize = eventsPageSize))

opt[Int]("buffers-prefetching-parallelism")
.optional()
.text(
s"Number of events fetched/decoded in parallel for populating the Ledger API internal buffers. Default is ${IndexConfiguration.DefaultEventsProcessingParallelism}."
)
.action((eventsProcessingParallelism, config) =>
config.copy(eventsProcessingParallelism = eventsProcessingParallelism)
)

opt[Long]("max-state-value-cache-size")
.optional()
.text(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ trait ConfigProvider[ExtraConfig] {
databaseConnectionPoolSize = participantConfig.indexerConfig.databaseConnectionPoolSize,
startupMode = IndexerStartupMode.MigrateAndStart,
eventsPageSize = config.eventsPageSize,
eventsProcessingParallelism = config.eventsProcessingParallelism,
allowExistingSchema = participantConfig.indexerConfig.allowExistingSchema,
enableAppendOnlySchema = config.enableAppendOnlySchema,
maxInputBufferSize = participantConfig.indexerConfig.maxInputBufferSize,
Expand Down Expand Up @@ -70,6 +71,7 @@ trait ConfigProvider[ExtraConfig] {
tlsConfig = config.tlsConfig,
maxInboundMessageSize = config.maxInboundMessageSize,
eventsPageSize = config.eventsPageSize,
eventsProcessingParallelism = config.eventsProcessingParallelism,
portFile = participantConfig.portFile,
seeding = config.seeding,
managementServiceTimeout = participantConfig.managementServiceTimeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ final class SandboxServer(
transactionCommitter = transactionCommitter,
templateStore = packageStore,
eventsPageSize = config.eventsPageSize,
eventsProcessingParallelism = config.eventsProcessingParallelism,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ private[sandbox] object SandboxIndexAndWriteService {
transactionCommitter: TransactionCommitter,
templateStore: InMemoryPackageStore,
eventsPageSize: Int,
eventsProcessingParallelism: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
Expand All @@ -83,6 +84,7 @@ private[sandbox] object SandboxIndexAndWriteService {
transactionCommitter = transactionCommitter,
startMode = startMode,
eventsPageSize = eventsPageSize,
eventsProcessingParallelism = eventsProcessingParallelism,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ private[sandbox] object SqlLedger {
transactionCommitter: TransactionCommitter,
startMode: SqlStartMode,
eventsPageSize: Int,
eventsProcessingParallelism: Int,
servicesExecutionContext: ExecutionContext,
metrics: Metrics,
lfValueTranslationCache: LfValueTranslationCache.Cache,
Expand Down Expand Up @@ -246,6 +247,7 @@ private[sandbox] object SqlLedger {
connectionPoolSize = databaseConnectionPoolSize,
connectionTimeout = databaseConnectionTimeout,
eventsPageSize = eventsPageSize,
eventsProcessingParallelism = eventsProcessingParallelism,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ private[sandbox] object LedgerResource {
transactionCommitter = StandardTransactionCommitter,
startMode = SqlStartMode.ResetAndStart,
eventsPageSize = 100,
eventsProcessingParallelism = 8,
servicesExecutionContext = servicesExecutionContext,
metrics = new Metrics(metrics),
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
Expand Down
Loading

0 comments on commit e1db529

Please sign in to comment.