Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Sandbox-on-X] Ledger-side in-memory command deduplication [DPP-872] #12596

Merged
merged 9 commits into from
Jan 28, 2022
13 changes: 13 additions & 0 deletions compatibility/bazel_tools/testing.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,19 @@ excluded_test_tool_tests = [
},
],
},
{
# Sandbox-on-X doesn't use participant-side command deduplication starting with next release,
# hence older tests will fail to assert it.
"end": "2.0.0-snapshot.20220126.9029.1",
"platform_ranges": [
{
"start": "2.0.0-snapshot.20220126.9029.1",
"exclusions": [
"CommandDeduplicationIT:ParticipantCommandDeduplication",
],
},
],
},
]

def in_range(version, range):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package com.daml.error.definitions

import com.daml.error._
import com.daml.error.definitions.ErrorGroups.ParticipantErrorGroup.LedgerApiErrorGroup
import com.daml.ledger.participant.state.v2.ChangeId
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.PackageId
import com.daml.lf.engine.Error.Validation.ReplayMismatch
Expand Down Expand Up @@ -736,16 +737,20 @@ object LedgerApiErrors extends LedgerApiErrorGroup {
case class Reject(
_definiteAnswer: Boolean = false,
_existingCommandSubmissionId: Option[String],
_changeId: Option[ChangeId] = None,
)(implicit
loggingContext: ContextualizedErrorLogger
) extends LoggingTransactionErrorImpl(
cause = "A command with the given command id has already been successfully processed",
definiteAnswer = _definiteAnswer,
) {
override def context: Map[String, String] =
super.context ++ _existingCommandSubmissionId.map("existing_submission_id" -> _).toList
super.context ++ _existingCommandSubmissionId
.map("existing_submission_id" -> _)
.toList ++ _changeId
.map(changeId => Seq("changeId" -> changeId.toString))
.getOrElse(Seq.empty)
}

}

@Explanation("An input contract has been archived by a concurrent transaction submission.")
Expand Down
2 changes: 2 additions & 0 deletions ledger/sandbox-classic/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,10 @@ test_deps = [
"//ledger/metrics",
"//ledger/participant-state",
"//ledger/participant-state-metrics",
"//ledger/participant-state/kvutils/app",
"//ledger/sandbox-common",
"//ledger/sandbox-common:sandbox-common-scala-tests-lib",
"//ledger/sandbox-on-x:sandbox-on-x",
"//ledger/test-common",
"//ledger/test-common:dar-files-default-lib",
"//libs-scala/contextualized-logging",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ object ConfigConverter {
// TODO SoX-to-sandbox-classic: Dedicated submissionBufferSize CLI param for sanbox-classic
submissionBufferSize = sandboxConfig.maxParallelSubmissions,
// TODO SoX-to-sandbox-classic: Dedicated submissionBufferSize CLI param for sanbox-classic
maxDedupSeconds = BridgeConfigProvider.defaultExtraConfig.maxDedupSeconds,
profileDir = sandboxConfig.profileDir,
stackTraces = sandboxConfig.stackTraces,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import com.daml.ledger.api.v1.ledger_configuration_service.{
LedgerConfiguration,
LedgerConfigurationServiceGrpc,
}
import com.daml.ledger.sandbox.BridgeConfigProvider
import com.daml.platform.sandbox.SandboxBackend
import com.daml.platform.sandbox.services.SandboxFixture
import com.google.protobuf.duration.Duration
Expand All @@ -30,7 +31,7 @@ sealed trait LedgerConfigurationServiceITBase extends AnyWordSpec with Matchers
.getLedgerConfiguration

maxDeduplicationTime shouldEqual toProto(
config.initialLedgerConfiguration.configuration.maxDeduplicationTime
BridgeConfigProvider.DefaultMaximumDeduplicationTime
)
}
}
Expand Down
2 changes: 2 additions & 0 deletions ledger/sandbox-on-x/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ conformance_test(
"--exclude=ExceptionsIT:ExRollbackDuplicateKeyCreated",
"--exclude=ExceptionsIT:ExRollbackDuplicateKeyArchived",
"--exclude=ConfigManagementServiceIT:CMConcurrentSetConflicting",
"--exclude=CommandDeduplication",
"--exclude=CommandServiceIT:CSduplicate",
],
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@ import com.daml.jwt.JwtVerifierConfigurationCli
import com.daml.ledger.api.auth.{AuthService, AuthServiceJWT, AuthServiceWildcard}
import com.daml.ledger.participant.state.kvutils.app.{Config, ConfigProvider}
import com.daml.platform.apiserver.TimeServiceBackend
import com.daml.platform.configuration.InitialLedgerConfiguration
import com.daml.platform.services.time.TimeProviderType
import scopt.OptionParser

import java.io.File
import java.nio.file.Path
import java.time.Instant
import java.time.{Duration, Instant}

// TODO SoX: Keep only ledger-bridge-related configurations in this class
// and extract the participant-specific configs in the main config file.
case class BridgeConfig(
conflictCheckingEnabled: Boolean,
maxDedupSeconds: Int,
submissionBufferSize: Int,
implicitPartyAllocation: Boolean,
timeProviderType: TimeProviderType,
Expand All @@ -29,11 +29,6 @@ case class BridgeConfig(

object BridgeConfigProvider extends ConfigProvider[BridgeConfig] {
override def extraConfigParser(parser: OptionParser[Config[BridgeConfig]]): Unit = {
parser
.opt[Int]("bridge-max-dedup-seconds")
.text("Maximum deduplication time in seconds. Defaults to 30.")
.action((p, c) => c.copy(extra = c.extra.copy(maxDedupSeconds = p)))

parser
.opt[Int]("bridge-submission-buffer-size")
.text("Submission buffer size. Defaults to 500.")
Expand Down Expand Up @@ -80,6 +75,14 @@ object BridgeConfigProvider extends ConfigProvider[BridgeConfig] {
JwtVerifierConfigurationCli.parse(parser)((v, c) =>
c.copy(extra = c.extra.copy(authService = AuthServiceJWT(v)))
)

parser.checkConfig(c =>
Either.cond(
c.maxDeduplicationDuration.forall(_.compareTo(Duration.ofHours(1L)) <= 0),
(),
"Maximum supported deduplication duration is one hour",
)
)
()
}

Expand All @@ -89,15 +92,23 @@ object BridgeConfigProvider extends ConfigProvider[BridgeConfig] {
case TimeProviderType.WallClock => None
}

override def initialLedgerConfig(config: Config[BridgeConfig]): InitialLedgerConfiguration = {
val superConfig = super.initialLedgerConfig(config)
superConfig.copy(configuration =
superConfig.configuration.copy(maxDeduplicationTime = DefaultMaximumDeduplicationTime)
)
}

override val defaultExtraConfig: BridgeConfig = BridgeConfig(
// TODO SoX: Enabled by default
conflictCheckingEnabled = false,
maxDedupSeconds = 30,
submissionBufferSize = 500,
implicitPartyAllocation = false,
timeProviderType = TimeProviderType.WallClock,
authService = AuthServiceWildcard,
profileDir = None,
stackTraces = false,
)

val DefaultMaximumDeduplicationTime: Duration = Duration.ofMinutes(5L)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ import com.daml.ledger.participant.state.v2.{ReadService, Update}
import com.daml.lf.data.Time.Timestamp
import com.daml.logging.{ContextualizedLogger, LoggingContext}

import java.time.Duration

class BridgeReadService(
ledgerId: LedgerId,
maxDedupSeconds: Int,
maximumDeduplicationDuration: Duration,
stateUpdatesSource: Source[(Offset, Update), NotUsed],
)(implicit
loggingContext: LoggingContext
Expand All @@ -36,7 +38,7 @@ class BridgeReadService(
config = Configuration(
generation = 1L,
timeModel = LedgerTimeModel.reasonableDefault,
maxDeduplicationTime = java.time.Duration.ofSeconds(maxDedupSeconds.toLong),
maxDeduplicationTime = maximumDeduplicationDuration,
),
initialRecordTime = Timestamp.now(),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,20 @@ import akka.stream.{BoundedSourceQueue, Materializer, QueueOfferResult}
import com.daml.daml_lf_dev.DamlLf.Archive
import com.daml.error.definitions.LedgerApiErrors
import com.daml.error.{ContextualizedErrorLogger, DamlContextualizedErrorLogger}
import com.daml.ledger.api.DeduplicationPeriod
import com.daml.ledger.api.health.{HealthStatus, Healthy}
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.v2._
import com.daml.ledger.sandbox.bridge.{BridgeMetrics, LedgerBridge}
import com.daml.ledger.sandbox.domain.Submission
import com.daml.ledger.sandbox.domain.{Rejection, Submission}
import com.daml.lf.data.{Ref, Time}
import com.daml.lf.transaction.SubmittedTransaction
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.InstrumentedSource
import com.daml.telemetry.TelemetryContext

import java.time.Duration
import java.util.concurrent.{CompletableFuture, CompletionStage}

class BridgeWriteService(
Expand All @@ -35,7 +37,12 @@ class BridgeWriteService(

private[this] val logger = ContextualizedLogger.get(getClass)

override def isApiDeduplicationEnabled: Boolean = true
override def isApiDeduplicationEnabled: Boolean = false

override def close(): Unit = {
logger.info("Shutting down BridgeWriteService.")
queue.complete()
}

override def submitTransaction(
submitterInfo: SubmitterInfo,
Expand All @@ -45,15 +52,33 @@ class BridgeWriteService(
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
): CompletionStage[SubmissionResult] =
submit(
Submission.Transaction(
submitterInfo = submitterInfo,
transactionMeta = transactionMeta,
transaction = transaction,
estimatedInterpretationCost = estimatedInterpretationCost,
)
)
): CompletionStage[SubmissionResult] = {
implicit val errorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(logger, loggingContext, submitterInfo.submissionId)
submitterInfo.deduplicationPeriod match {
case DeduplicationPeriod.DeduplicationDuration(deduplicationDuration) =>
validateDeduplicationDurationAndSubmit(
submitterInfo,
transactionMeta,
transaction,
estimatedInterpretationCost,
deduplicationDuration,
)
case DeduplicationPeriod.DeduplicationOffset(_) =>
CompletableFuture.completedFuture(
SubmissionResult.SynchronousError(
Rejection
.LedgerBridgeInternalError(
new RuntimeException(
"Deduplication offset periods are not supported in Sandbox-on-X ledger bridge"
),
submitterInfo.toCompletionInfo(),
)
.toStatus
)
)
}
}

override def submitConfiguration(
maxRecordTime: Time.Timestamp,
Expand Down Expand Up @@ -136,9 +161,35 @@ class BridgeWriteService(
private def submit(submission: Submission): CompletionStage[SubmissionResult] =
toSubmissionResult(submission.submissionId, queue.offer(submission))

override def close(): Unit = {
logger.info("Shutting down BridgeLedgerFactory.")
queue.complete()
private def validateDeduplicationDurationAndSubmit(
submitterInfo: SubmitterInfo,
transactionMeta: TransactionMeta,
transaction: SubmittedTransaction,
estimatedInterpretationCost: Long,
deduplicationDuration: Duration,
)(implicit errorLogger: ContextualizedErrorLogger): CompletionStage[SubmissionResult] = {
val maxDeduplicationDuration = submitterInfo.ledgerConfiguration.maxDeduplicationTime
if (deduplicationDuration.compareTo(maxDeduplicationDuration) > 0)
CompletableFuture.completedFuture(
SubmissionResult.SynchronousError(
Rejection
.MaxDeduplicationDurationExceeded(
deduplicationDuration,
maxDeduplicationDuration,
submitterInfo.toCompletionInfo(),
)
.toStatus
)
)
else
submit(
Submission.Transaction(
submitterInfo = submitterInfo,
transactionMeta = transactionMeta,
transaction = transaction,
estimatedInterpretationCost = estimatedInterpretationCost,
)
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,9 @@ object SandboxOnXRunner {

readServiceWithSubscriber = new BridgeReadService(
ledgerId = config.ledgerId,
maxDedupSeconds = config.extra.maxDedupSeconds,
maximumDeduplicationDuration = config.maxDeduplicationDuration.getOrElse(
BridgeConfigProvider.DefaultMaximumDeduplicationTime
),
stateUpdatesSource,
)

Expand Down Expand Up @@ -254,8 +256,8 @@ object SandboxOnXRunner {
CommandDeduplicationPeriodSupport.DurationSupport.DURATION_NATIVE_SUPPORT,
)
),
deduplicationType = CommandDeduplicationType.SYNC_ONLY,
maxDeduplicationDurationEnforced = false,
deduplicationType = CommandDeduplicationType.ASYNC_ONLY,
maxDeduplicationDurationEnforced = true,
),
contractIdFeatures = ExperimentalContractIds.of(
v1 = ExperimentalContractIds.ContractIdV1Support.NON_SUFFIXED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class BridgeMetrics(metrics: Metrics) {
val keyStateSize: Histogram = registry.histogram(Prefix :+ "keys")
val consumedContractsStateSize: Histogram = registry.histogram(Prefix :+ "consumed_contracts")
val sequencerQueueLength: Histogram = registry.histogram(Prefix :+ "queue")
val deduplicationQueueLength: Histogram = registry.histogram(Prefix :+ "deduplication_queue")
}

object InputQueue {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import com.daml.ledger.participant.state.index.v2.IndexService
import com.daml.ledger.participant.state.kvutils.app.{Config, ParticipantConfig}
import com.daml.ledger.participant.state.v2.Update
import com.daml.ledger.resources.ResourceOwner
import com.daml.ledger.sandbox.BridgeConfig
import com.daml.ledger.sandbox.{BridgeConfig, BridgeConfigProvider}
import com.daml.ledger.sandbox.bridge.validate.ConflictCheckingLedgerBridge
import com.daml.ledger.sandbox.domain.Submission
import com.daml.lf.data.Ref.ParticipantId
Expand Down Expand Up @@ -90,6 +90,11 @@ object LedgerBridge {
),
validatePartyAllocation = !config.extra.implicitPartyAllocation,
servicesThreadPoolSize = servicesThreadPoolSize,
maxDeduplicationDuration = initialLedgerConfiguration
.map(_.maxDeduplicationTime)
.getOrElse(
BridgeConfigProvider.initialLedgerConfig(config).configuration.maxDeduplicationTime
),
)

private[bridge] def packageUploadSuccess(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.server.api.validation.ErrorFactories
import com.daml.platform.store.appendonlydao.events._

import java.time.Duration
import scala.concurrent.{ExecutionContext, Future}

private[validate] class ConflictCheckingLedgerBridge(
Expand Down Expand Up @@ -63,6 +64,7 @@ private[bridge] object ConflictCheckingLedgerBridge {
errorFactories: ErrorFactories,
validatePartyAllocation: Boolean,
servicesThreadPoolSize: Int,
maxDeduplicationDuration: Duration,
)(implicit
servicesExecutionContext: ExecutionContext
): ConflictCheckingLedgerBridge =
Expand All @@ -80,6 +82,7 @@ private[bridge] object ConflictCheckingLedgerBridge {
validatePartyAllocation = validatePartyAllocation,
bridgeMetrics = bridgeMetrics,
errorFactories = errorFactories,
maxDeduplicationDuration = maxDeduplicationDuration,
),
servicesThreadPoolSize = servicesThreadPoolSize,
)
Expand Down
Loading