Skip to content

Commit

Permalink
kvutils - Add Writer which can handle deduplication periods as offset…
Browse files Browse the repository at this point in the history
…s [KVL-1172] (#11900)

changelog_begin
changelog_end
  • Loading branch information
nicu-da authored Nov 29, 2021
1 parent 0b9d57b commit 61334cf
Show file tree
Hide file tree
Showing 23 changed files with 932 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -542,13 +542,18 @@ object LedgerApiErrors extends LedgerApiErrorGroup {
id = "INVALID_DEDUPLICATION_PERIOD",
ErrorCategory.InvalidGivenCurrentSystemStateOther,
) {
case class Reject(_reason: String, _maxDeduplicationDuration: Duration)(implicit
case class Reject(
_reason: String,
_maxDeduplicationDuration: Option[Duration],
)(implicit
loggingContext: ContextualizedErrorLogger
) extends LoggingTransactionErrorImpl(
cause = s"The submitted command had an invalid deduplication period: ${_reason}"
) {
override def context: Map[String, String] =
super.context + ("max_deduplication_duration" -> _maxDeduplicationDuration.toString)
super.context ++ _maxDeduplicationDuration
.map("max_deduplication_duration" -> _.toString)
.toList
}
}

Expand Down
1 change: 1 addition & 0 deletions ledger/ledger-api-common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ da_scala_library(
"//ledger/ledger-api-domain",
"//ledger/ledger-api-health",
"//ledger/ledger-grpc",
"//ledger/ledger-offset",
"//ledger/ledger-resources",
"//ledger/metrics",
"//libs-scala/concurrent",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@

package com.daml.ledger.api.validation

import java.time.{Duration, Instant}

import com.daml.api.util.{DurationConversion, TimestampConversion}
import com.daml.error.{ContextualizedErrorLogger, ErrorCodesVersionSwitcher}
import com.daml.ledger.api.domain
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.v1.commands
import com.daml.ledger.api.v1.commands.Command.Command.{
Create => ProtoCreate,
CreateAndExercise => ProtoCreateAndExercise,
Expand All @@ -16,14 +18,18 @@ import com.daml.ledger.api.v1.commands.Command.Command.{
}
import com.daml.ledger.api.v1.commands.{Command => ProtoCommand, Commands => ProtoCommands}
import com.daml.ledger.api.validation.CommandsValidator.{Submitters, effectiveSubmitters}
import com.daml.ledger.api.{DeduplicationPeriod, domain}
import com.daml.lf.command._
import com.daml.lf.data._
import com.daml.lf.value.{Value => Lf}
import com.daml.platform.server.api.validation.{ErrorFactories, FieldValidations}
import io.grpc.StatusRuntimeException
import com.daml.platform.server.api.validation.{
DeduplicationPeriodValidator,
ErrorFactories,
FieldValidations,
}
import io.grpc.{Status, StatusRuntimeException}
import scalaz.syntax.tag._

import java.time.{Duration, Instant}
import scala.Ordering.Implicits.infixOrderingOps
import scala.collection.immutable

Expand All @@ -35,9 +41,10 @@ final class CommandsValidator(
private val errorFactories = ErrorFactories(errorCodesVersionSwitcher)
private val fieldValidations = FieldValidations(errorFactories)
private val valueValidator = new ValueValidator(errorFactories, fieldValidations)
private val deduplicationPeriodValidator = new DeduplicationPeriodValidator(errorFactories)

import fieldValidations._
import errorFactories._
import fieldValidations._
import valueValidator._

def validateCommands(
Expand Down Expand Up @@ -73,7 +80,6 @@ final class CommandsValidator(
deduplicationPeriod <- validateDeduplicationPeriod(
commands.deduplicationPeriod,
maxDeduplicationTime,
"deduplication_period",
)
} yield domain.Commands(
ledgerId = ledgerId,
Expand Down Expand Up @@ -217,6 +223,33 @@ final class CommandsValidator(
_ <- actAsMustNotBeEmpty(actAs)
} yield Submitters(actAs, readAs)
}

/** We validate only using current time because we set the currentTime as submitTime so no need to check both
*/
def validateDeduplicationPeriod(
deduplicationPeriod: commands.Commands.DeduplicationPeriod,
optMaxDeduplicationDuration: Option[Duration],
)(implicit
contextualizedErrorLogger: ContextualizedErrorLogger
): Either[StatusRuntimeException, DeduplicationPeriod] =
optMaxDeduplicationDuration.fold[Either[StatusRuntimeException, DeduplicationPeriod]](
Left(missingLedgerConfig(Status.Code.UNAVAILABLE)(definiteAnswer = Some(false)))
) { maxDeduplicationDuration =>
val convertedDeduplicationPeriod = deduplicationPeriod match {
case commands.Commands.DeduplicationPeriod.Empty =>
maxDeduplicationDuration
case commands.Commands.DeduplicationPeriod.DeduplicationTime(duration) =>
DurationConversion.fromProto(duration)
case commands.Commands.DeduplicationPeriod.DeduplicationDuration(duration) =>
DurationConversion.fromProto(duration)
}
deduplicationPeriodValidator
.validateDuration(
convertedDeduplicationPeriod,
maxDeduplicationDuration,
)
.map(DeduplicationPeriod.DeduplicationDuration)
}
}

object CommandsValidator {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

package com.daml.platform.server.api.services.grpc

import java.time.{Duration, Instant}

import com.daml.error.{DamlContextualizedErrorLogger, ErrorCodesVersionSwitcher}
import com.daml.ledger.api.SubmissionIdGenerator
import com.daml.ledger.api.domain.LedgerId
Expand All @@ -16,7 +18,6 @@ import com.daml.platform.server.api.{ProxyCloseable, ValidationLogger}
import com.google.protobuf.empty.Empty
import io.grpc.ServerServiceDefinition

import java.time.{Duration, Instant}
import scala.concurrent.{ExecutionContext, Future}

class GrpcCommandService(
Expand All @@ -39,58 +40,30 @@ class GrpcCommandService(
FieldValidations(ErrorFactories(errorCodesVersionSwitcher)),
)

override def submitAndWait(request: SubmitAndWaitRequest): Future[Empty] = {
val requestWithSubmissionId = generateSubmissionIdIfEmpty(request)
validator
.validate(
requestWithSubmissionId,
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
)(contextualizedErrorLogger(requestWithSubmissionId))
.fold(
t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)),
_ => service.submitAndWait(requestWithSubmissionId),
)
}
override def submitAndWait(request: SubmitAndWaitRequest): Future[Empty] =
enrichRequestAndSubmit(request)(service.submitAndWait)

override def submitAndWaitForTransactionId(
request: SubmitAndWaitRequest
): Future[SubmitAndWaitForTransactionIdResponse] = {
val requestWithSubmissionId = generateSubmissionIdIfEmpty(request)
validator
.validate(
requestWithSubmissionId,
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
)(contextualizedErrorLogger(requestWithSubmissionId))
.fold(
t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)),
_ => service.submitAndWaitForTransactionId(requestWithSubmissionId),
)
}
): Future[SubmitAndWaitForTransactionIdResponse] =
enrichRequestAndSubmit(request)(service.submitAndWaitForTransactionId)

override def submitAndWaitForTransaction(
request: SubmitAndWaitRequest
): Future[SubmitAndWaitForTransactionResponse] = {
val requestWithSubmissionId = generateSubmissionIdIfEmpty(request)
validator
.validate(
requestWithSubmissionId,
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
)(contextualizedErrorLogger(requestWithSubmissionId))
.fold(
t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)),
_ => service.submitAndWaitForTransaction(requestWithSubmissionId),
)
}
): Future[SubmitAndWaitForTransactionResponse] =
enrichRequestAndSubmit(request)(service.submitAndWaitForTransaction)

override def submitAndWaitForTransactionTree(
request: SubmitAndWaitRequest
): Future[SubmitAndWaitForTransactionTreeResponse] = {
): Future[SubmitAndWaitForTransactionTreeResponse] =
enrichRequestAndSubmit(request)(service.submitAndWaitForTransactionTree)

override def bindService(): ServerServiceDefinition =
CommandServiceGrpc.bindService(this, executionContext)

private def enrichRequestAndSubmit[T](
request: SubmitAndWaitRequest
)(submit: SubmitAndWaitRequest => Future[T]): Future[T] = {
val requestWithSubmissionId = generateSubmissionIdIfEmpty(request)
validator
.validate(
Expand All @@ -101,13 +74,10 @@ class GrpcCommandService(
)(contextualizedErrorLogger(requestWithSubmissionId))
.fold(
t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)),
_ => service.submitAndWaitForTransactionTree(requestWithSubmissionId),
_ => submit(requestWithSubmissionId),
)
}

override def bindService(): ServerServiceDefinition =
CommandServiceGrpc.bindService(this, executionContext)

private def generateSubmissionIdIfEmpty(request: SubmitAndWaitRequest): SubmitAndWaitRequest =
if (request.commands.exists(_.submissionId.isEmpty)) {
val commandsWithSubmissionId =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.platform.server.api.validation

import java.time.Duration

import com.daml.error.ContextualizedErrorLogger
import com.daml.ledger.api.DeduplicationPeriod
import io.grpc.StatusRuntimeException

class DeduplicationPeriodValidator(
errorFactories: ErrorFactories
) {
private val fieldName = "deduplication_period"

def validate(
deduplicationPeriod: DeduplicationPeriod,
maxDeduplicationDuration: Duration,
)(implicit
contextualizedErrorLogger: ContextualizedErrorLogger
): Either[StatusRuntimeException, DeduplicationPeriod] = {
deduplicationPeriod match {
case DeduplicationPeriod.DeduplicationDuration(duration) =>
validateDuration(duration, maxDeduplicationDuration).map(_ => deduplicationPeriod)
case DeduplicationPeriod.DeduplicationOffset(_) => Right(deduplicationPeriod)
}
}

def validateDuration(duration: Duration, maxDeduplicationDuration: Duration)(implicit
contextualizedErrorLogger: ContextualizedErrorLogger
): Either[StatusRuntimeException, Duration] = if (duration.isNegative)
Left(
errorFactories
.invalidField(fieldName, "Duration must be positive", definiteAnswer = Some(false))
)
else if (duration.compareTo(maxDeduplicationDuration) > 0)
Left(
errorFactories.invalidDeduplicationDuration(
fieldName,
s"The given deduplication duration of $duration exceeds the maximum deduplication time of $maxDeduplicationDuration",
definiteAnswer = Some(false),
Some(maxDeduplicationDuration),
)
)
else Right(duration)
}
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ class ErrorFactories private (errorCodesVersionSwitcher: ErrorCodesVersionSwitch
fieldName: String,
message: String,
definiteAnswer: Option[Boolean],
maxDeduplicationDuration: Duration,
maxDeduplicationDuration: Option[Duration],
)(implicit contextualizedErrorLogger: ContextualizedErrorLogger): StatusRuntimeException =
errorCodesVersionSwitcher.choose(
legacyInvalidField(fieldName, message, definiteAnswer),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,14 @@

package com.daml.platform.server.api.validation

import java.time.Duration
import com.daml.api.util.DurationConversion
import com.daml.error.ContextualizedErrorLogger
import com.daml.ledger.api.domain
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.v1.commands.Commands.{DeduplicationPeriod => DeduplicationPeriodProto}
import com.daml.ledger.api.v1.value.Identifier
import com.daml.ledger.api.{DeduplicationPeriod, domain}
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.Party
import com.daml.lf.value.Value.ContractId
import com.google.protobuf.duration.{Duration => DurationProto}
import io.grpc.{Status, StatusRuntimeException}
import io.grpc.StatusRuntimeException

// TODO error codes: Remove default usage of ErrorFactories
class FieldValidations private (errorFactories: ErrorFactories) {
Expand Down Expand Up @@ -139,53 +135,6 @@ class FieldValidations private (errorFactories: ErrorFactories) {
Left(missingField(fieldName, definiteAnswer = Some(false)))
)(Right(_))

/** We validate only using current time because we set the currentTime as submitTime so no need to check both
*/
def validateDeduplicationPeriod(
deduplicationPeriod: DeduplicationPeriodProto,
optMaxDeduplicationDuration: Option[Duration],
fieldName: String,
)(implicit
contextualizedErrorLogger: ContextualizedErrorLogger
): Either[StatusRuntimeException, DeduplicationPeriod] = {

optMaxDeduplicationDuration.fold[Either[StatusRuntimeException, DeduplicationPeriod]](
Left(missingLedgerConfig(Status.Code.UNAVAILABLE)(definiteAnswer = Some(false)))
)(maxDeduplicationDuration => {
def validateDuration(duration: Duration, exceedsMaxDurationMessage: String) = {
if (duration.isNegative)
Left(invalidField(fieldName, "Duration must be positive", definiteAnswer = Some(false)))
else if (duration.compareTo(maxDeduplicationDuration) > 0)
Left(
invalidDeduplicationDuration(
fieldName,
exceedsMaxDurationMessage,
definiteAnswer = Some(false),
maxDeduplicationDuration,
)
)
else Right(duration)
}

def protoDurationToDurationPeriod(duration: DurationProto) = {
val result = DurationConversion.fromProto(duration)
validateDuration(
result,
s"The given deduplication duration of $result exceeds the maximum deduplication time of $maxDeduplicationDuration",
).map(DeduplicationPeriod.DeduplicationDuration)
}

deduplicationPeriod match {
case DeduplicationPeriodProto.Empty =>
Right(DeduplicationPeriod.DeduplicationDuration(maxDeduplicationDuration))
case DeduplicationPeriodProto.DeduplicationTime(duration) =>
protoDurationToDurationPeriod(duration)
case DeduplicationPeriodProto.DeduplicationDuration(duration) =>
protoDurationToDurationPeriod(duration)
}
})
}

def validateIdentifier(identifier: Identifier)(implicit
contextualizedErrorLogger: ContextualizedErrorLogger
): Either[StatusRuntimeException, Ref.Identifier] =
Expand Down
Loading

0 comments on commit 61334cf

Please sign in to comment.