Skip to content

Commit

Permalink
sandbox-classic: Only allow --max-parallel-submissions here. (#10941)
Browse files Browse the repository at this point in the history
This option is only used by Sandbox Classic and Daml Driver for SQL.
There is no reason for it to be part of the command service
configuration.

CHANGELOG_BEGIN
CHANGELOG_END
  • Loading branch information
SamirTalwar authored Sep 20, 2021
1 parent ac02dbd commit 88ef05e
Show file tree
Hide file tree
Showing 10 changed files with 28 additions and 36 deletions.
8 changes: 8 additions & 0 deletions ledger/daml-on-sql/src/main/scala/on/sql/Cli.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ private[sql] final class Cli(
)
.action((mode, config) => config.copy(sqlStartMode = PostgresStartupMode.fromString(mode)))

parser
.opt[Int]("max-parallel-submissions")
.optional()
.action((value, config) => config.copy(maxParallelSubmissions = value))
.text(
s"Maximum number of successfully interpreted commands waiting to be sequenced. The threshold is shared across all parties. Overflowing it will cause back-pressure, signaled by a `RESOURCE_EXHAUSTED` error code. Default is ${defaultConfig.maxParallelSubmissions}."
)

// Ideally we would set the relevant options to `required()`, but it doesn't seem to work.
// Even when the value is provided, it still reports that it's missing. Instead, we check the
// configuration afterwards.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,8 @@ import java.time.Duration
/** Reaching either [[inputBufferSize]] or [[maxCommandsInFlight]] will trigger
* back-pressure by [[com.daml.ledger.client.services.commands.CommandClient]].
*
* Reaching [[maxParallelSubmissions]] will trigger back-pressure
* by [[com.daml.platform.sandbox.stores.ledger.sql.SqlLedger]].
*
* @param inputBufferSize
* Maximum number of commands waiting to be submitted for each party.
* @param maxParallelSubmissions
* Maximum number of commands waiting to be sequenced after being evaluated by the engine.
* This does _not_ apply to on-X ledgers, where sequencing happens after the evaluated
* transaction has been shipped via the WriteService.
* @param maxCommandsInFlight
* Maximum number of submitted commands waiting to be completed for each party.
* @param trackerRetentionPeriod
Expand All @@ -26,7 +19,6 @@ import java.time.Duration
*/
final case class CommandConfiguration(
inputBufferSize: Int,
maxParallelSubmissions: Int,
maxCommandsInFlight: Int,
trackerRetentionPeriod: Duration,
)
Expand All @@ -37,7 +29,6 @@ object CommandConfiguration {
lazy val default: CommandConfiguration =
CommandConfiguration(
inputBufferSize = 512,
maxParallelSubmissions = 512,
maxCommandsInFlight = 256,
trackerRetentionPeriod = DefaultTrackerRetentionPeriod,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,15 +397,6 @@ object Config {
"Maximum number of submitted commands waiting for completion for each party (only applied when using the CommandService). Overflowing this threshold will cause back-pressure, signaled by a RESOURCE_EXHAUSTED error code. Default is 256."
)

opt[Int]("max-parallel-submissions")
.optional()
.action((value, config) =>
config.copy(commandConfig = config.commandConfig.copy(maxParallelSubmissions = value))
)
.text(
"Maximum number of successfully interpreted commands waiting to be sequenced (applied only when running sandbox-classic). The threshold is shared across all parties. Overflowing it will cause back-pressure, signaled by a RESOURCE_EXHAUSTED error code. Default is 512."
)

opt[Int]("input-buffer-size")
.optional()
.action((value, config) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ private[sandbox] object Cli extends SandboxCli {
)
.action((url, config) => config.copy(jdbcUrl = Some(url)))
parser
.opt[Int]("max-parallel-submissions")
.optional()
.action((value, config) => config.copy(maxParallelSubmissions = value))
.text(
s"Maximum number of successfully interpreted commands waiting to be sequenced. The threshold is shared across all parties. Overflowing it will cause back-pressure, signaled by a `RESOURCE_EXHAUSTED` error code. Default is ${defaultConfig.maxParallelSubmissions}."
)
parser
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ final class SandboxServer(
timeProvider = timeProvider,
ledgerEntries = ledgerEntries,
startMode = startMode,
queueDepth = config.commandConfig.maxParallelSubmissions,
queueDepth = config.maxParallelSubmissions,
transactionCommitter = transactionCommitter,
templateStore = packageStore,
eventsPageSize = config.eventsPageSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ sealed trait CommandServiceBackPressureITBase
super.config.copy(
commandConfig = super.config.commandConfig.copy(
inputBufferSize = 1,
maxParallelSubmissions = 2,
maxCommandsInFlight = 2,
)
),
maxParallelSubmissions = 2,
)

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ sealed trait CompletionServiceITBase
super.config.copy(
commandConfig = super.config.commandConfig.copy(
inputBufferSize = 1,
maxParallelSubmissions = 2,
maxCommandsInFlight = 2,
)
),
maxParallelSubmissions = 2,
)

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,14 @@ class TransactionStreamTerminationIT
jdbcUrl = Some("jdbc:h2:mem:static_time;db_close_delay=-1"),
timeProviderType = Some(TimeProviderType.Static),
)
def commandClientConfig =

private def commandClientConfig =
CommandClientConfiguration(
maxCommandsInFlight = config.commandConfig.maxCommandsInFlight,
maxParallelSubmissions = config.commandConfig.maxParallelSubmissions,
maxParallelSubmissions = config.maxParallelSubmissions,
defaultDeduplicationTime = JDuration.ofSeconds(30),
)

private val applicationId = "transaction-stream-termination-test"

private def newTransactionClient(ledgerId: domain.LedgerId) =
Expand Down Expand Up @@ -82,7 +84,7 @@ class TransactionStreamTerminationIT
_.commands.ledgerId := actualLedgerId.unwrap,
_.commands.applicationId := applicationId,
_.commands.commands := List(
Command(
Command.of(
Command.Command.Create(
CreateCommand(
Some(templateIds.dummy),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,15 +273,6 @@ class CommonCliBase(name: LedgerName) {
"Maximum number of submitted commands waiting for completion for each party (only applied when using the CommandService). Overflowing this threshold will cause back-pressure, signaled by a RESOURCE_EXHAUSTED error code. Default is 256."
)

opt[Int]("max-parallel-submissions")
.optional()
.action((value, config) =>
config.copy(commandConfig = config.commandConfig.copy(maxParallelSubmissions = value))
)
.text(
"Maximum number of successfully interpreted commands waiting to be sequenced (applied only when running sandbox-classic). The threshold is shared across all parties. Overflowing it will cause back-pressure, signaled by a RESOURCE_EXHAUSTED error code. Default is 512."
)

opt[Int]("input-buffer-size")
.optional()
.action((value, config) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ final case class SandboxConfig(
seeding: Option[Seeding],
metricsReporter: Option[MetricsReporter],
metricsReportingInterval: FiniteDuration,
maxParallelSubmissions: Int, // only used by Sandbox Classic
eventsPageSize: Int,
eventsProcessingParallelism: Int,
lfValueTranslationEventCacheConfiguration: SizedCache.Configuration,
Expand Down Expand Up @@ -115,6 +116,7 @@ object SandboxConfig {
damlPackages = Nil,
timeProviderType = None,
configurationLoadTimeout = Duration.ofSeconds(10),
maxDeduplicationDuration = None,
delayBeforeSubmittingLedgerConfiguration = Duration.ofSeconds(1),
timeModel = LedgerTimeModel.reasonableDefault,
commandConfig = CommandConfiguration.default,
Expand All @@ -132,6 +134,7 @@ object SandboxConfig {
seeding = Some(Seeding.Strong),
metricsReporter = None,
metricsReportingInterval = 10.seconds,
maxParallelSubmissions = 512,
eventsPageSize = DefaultEventsPageSize,
eventsProcessingParallelism = DefaultEventsProcessingParallelism,
lfValueTranslationEventCacheConfiguration = DefaultLfValueTranslationCacheConfiguration,
Expand All @@ -143,7 +146,6 @@ object SandboxConfig {
sqlStartMode = Some(DefaultSqlStartupMode),
enableAppendOnlySchema = false,
enableCompression = false,
maxDeduplicationDuration = None,
)

sealed abstract class EngineMode extends Product with Serializable
Expand Down

0 comments on commit 88ef05e

Please sign in to comment.