From 98cf8d86b30eb09d50645f0b49fc0c1bd02d2502 Mon Sep 17 00:00:00 2001 From: fabiotudone-da Date: Tue, 19 Oct 2021 10:32:57 +0200 Subject: [PATCH] KV: introduce v2 error codes behind a CLI switch [KVL-1140] (#11224) * Propagate error version switch to KeyValueConsumption CHANGELOG_BEGIN CHANGELOG_END * Introduce v2 (self-service) KV error codes behind the CLI switch * Test v2 codes and fix them * Keep newline separating methods in KeyValueConsumption * TransactionRejections: don't wrap updates in `Some` * Factor errorVersionsTable * Reorder imports * Split "convert rejection to proto models and back to expected grpc code" * Remove unneeded Option returned by decodeTransactionRejectionEntry * Formatting fix * Fix 7537e93d --- .../scala/ledger/indexerbenchmark/Main.scala | 1 + .../on/memory/InMemoryLedgerFactory.scala | 1 + ...oryLedgerReaderWriterIntegrationSpec.scala | 7 +- .../daml/ledger/on/sql/SqlLedgerFactory.scala | 9 +- ...edgerReaderWriterIntegrationSpecBase.scala | 9 +- ledger/participant-state/kvutils/BUILD.bazel | 3 + .../state/kvutils/app/LedgerFactory.scala | 1 + .../state/kvutils/Conversions.scala | 215 ++------- .../state/kvutils/KeyValueConsumption.scala | 94 ++-- .../api/KeyValueParticipantState.scala | 3 +- .../api/KeyValueParticipantStateReader.scala | 21 +- .../updates/TransactionRejections.scala | 444 ++++++++++++++++++ .../participant/state/kvutils/KVTest.scala | 8 +- .../state/kvutils/ConversionsSpec.scala | 180 ++++++- .../kvutils/KVUtilsTransactionSpec.scala | 15 +- .../kvutils/KeyValueConsumptionSpec.scala | 278 ++++++++--- .../KeyValueParticipantStateReaderSpec.scala | 45 +- .../LogAppendingReadServiceFactory.scala | 1 + .../RecoveringIndexerIntegrationSpec.scala | 7 +- .../scala/platform/sandboxnext/Runner.scala | 6 +- 20 files changed, 983 insertions(+), 365 deletions(-) create mode 100644 ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/updates/TransactionRejections.scala diff --git a/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala b/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala index a579d91aedea..66eb904ef913 100644 --- a/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala +++ b/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala @@ -76,6 +76,7 @@ object Main { keyValueSource, metrics, failOnUnexpectedEvent = false, + enableSelfServiceErrorCodes = false, ) // Note: this method is doing quite a lot of work to transform a sequence of write sets diff --git a/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/InMemoryLedgerFactory.scala b/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/InMemoryLedgerFactory.scala index 3ab11615b74a..4baf6f78eda3 100644 --- a/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/InMemoryLedgerFactory.scala +++ b/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/InMemoryLedgerFactory.scala @@ -47,6 +47,7 @@ private[memory] class InMemoryLedgerFactory(dispatcher: Dispatcher[Index], state readerWriter, readerWriter, createMetrics(participantConfig, config), + config.enableSelfServiceErrorCodes, ) } diff --git a/ledger/ledger-on-memory/src/test/suite/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriterIntegrationSpec.scala b/ledger/ledger-on-memory/src/test/suite/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriterIntegrationSpec.scala index 6d99341eb62c..aae2aee285a2 100644 --- a/ledger/ledger-on-memory/src/test/suite/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriterIntegrationSpec.scala +++ b/ledger/ledger-on-memory/src/test/suite/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriterIntegrationSpec.scala @@ -56,5 +56,10 @@ class InMemoryLedgerReaderWriterIntegrationSpec engine = Engine.DevEngine(), committerExecutionContext = committerExecutionContext, ) - } yield new KeyValueParticipantState(readerWriter, readerWriter, metrics) + } yield new KeyValueParticipantState( + readerWriter, + readerWriter, + metrics, + enableSelfServiceErrorCodes = false, + ) } diff --git a/ledger/ledger-on-sql/src/app/scala/com/daml/ledger/on/sql/SqlLedgerFactory.scala b/ledger/ledger-on-sql/src/app/scala/com/daml/ledger/on/sql/SqlLedgerFactory.scala index 982c81af5a30..b01584e70dca 100644 --- a/ledger/ledger-on-sql/src/app/scala/com/daml/ledger/on/sql/SqlLedgerFactory.scala +++ b/ledger/ledger-on-sql/src/app/scala/com/daml/ledger/on/sql/SqlLedgerFactory.scala @@ -78,7 +78,14 @@ object SqlLedgerFactory extends LedgerFactory[ReadWriteService, ExtraConfig] { metrics = metrics.daml.kvutils.submission.validator.stateValueCache, ), ).acquire() - .map(readerWriter => new KeyValueParticipantState(readerWriter, readerWriter, metrics)) + .map(readerWriter => + new KeyValueParticipantState( + readerWriter, + readerWriter, + metrics, + enableSelfServiceErrorCodes = config.enableSelfServiceErrorCodes, + ) + ) } } } diff --git a/ledger/ledger-on-sql/src/test/lib/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriterIntegrationSpecBase.scala b/ledger/ledger-on-sql/src/test/lib/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriterIntegrationSpecBase.scala index 0182eb4247d6..898d57fb1009 100644 --- a/ledger/ledger-on-sql/src/test/lib/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriterIntegrationSpecBase.scala +++ b/ledger/ledger-on-sql/src/test/lib/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriterIntegrationSpecBase.scala @@ -35,5 +35,12 @@ abstract class SqlLedgerReaderWriterIntegrationSpecBase(implementationName: Stri resetOnStartup = false, offsetVersion = offsetVersion, logEntryIdAllocator = RandomLogEntryIdAllocator, - ).map(readerWriter => new KeyValueParticipantState(readerWriter, readerWriter, metrics)) + ).map(readerWriter => + new KeyValueParticipantState( + readerWriter, + readerWriter, + metrics, + enableSelfServiceErrorCodes = false, + ) + ) } diff --git a/ledger/participant-state/kvutils/BUILD.bazel b/ledger/participant-state/kvutils/BUILD.bazel index b4bff4e264fd..600ff770d9d1 100644 --- a/ledger/participant-state/kvutils/BUILD.bazel +++ b/ledger/participant-state/kvutils/BUILD.bazel @@ -52,6 +52,7 @@ da_scala_library( "//daml-lf/transaction:value_proto_java", "//ledger-api/grpc-definitions:ledger_api_proto_scala", "//ledger/caching", + "//ledger/error", "//ledger/ledger-api-domain", "//ledger/ledger-api-health", "//ledger/ledger-configuration", @@ -118,6 +119,7 @@ da_scala_library( "//ledger-api/grpc-definitions:ledger_api_proto_scala", "//ledger-api/rs-grpc-bridge", "//ledger-api/testing-utils", + "//ledger/error", "//ledger/ledger-api-common", "//ledger/ledger-api-domain", "//ledger/ledger-api-health", @@ -187,6 +189,7 @@ da_scala_test_suite( "//ledger-api/rs-grpc-bridge", "//ledger-api/testing-utils", "//ledger/caching", + "//ledger/error", "//ledger/ledger-api-domain", "//ledger/ledger-api-health", "//ledger/ledger-configuration", diff --git a/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/LedgerFactory.scala b/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/LedgerFactory.scala index 1843405b8cd9..8c51dbb68ae0 100644 --- a/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/LedgerFactory.scala +++ b/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/LedgerFactory.scala @@ -187,6 +187,7 @@ object LedgerFactory { readerWriter, readerWriter, createMetrics(participantConfig, config), + config.enableSelfServiceErrorCodes, ) def owner( diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/Conversions.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/Conversions.scala index f1a0a09fedfe..7b68504dc15a 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/Conversions.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/Conversions.scala @@ -3,11 +3,8 @@ package com.daml.ledger.participant.state.kvutils -import java.io.StringWriter -import java.time.{Duration, Instant} - +import com.daml.error.ValueSwitch import com.daml.ledger.api.DeduplicationPeriod -import com.daml.ledger.grpc.GrpcStatuses import com.daml.ledger.offset.Offset import com.daml.ledger.participant.state.kvutils.committer.transaction.Rejection import com.daml.ledger.participant.state.kvutils.committer.transaction.Rejection.{ @@ -42,6 +39,7 @@ import com.daml.ledger.participant.state.kvutils.store.{ DamlStateKey, DamlSubmissionDedupKey, } +import com.daml.ledger.participant.state.kvutils.updates.TransactionRejections._ import com.daml.ledger.participant.state.v2.Update.CommandRejected.FinalReason import com.daml.ledger.participant.state.v2.{CompletionInfo, SubmitterInfo} import com.daml.lf.data.Relation.Relation @@ -50,13 +48,10 @@ import com.daml.lf.transaction._ import com.daml.lf.value.Value.{ContractId, VersionedValue} import com.daml.lf.value.{Value, ValueCoder, ValueOuterClass} import com.daml.lf.{crypto, data} -import com.fasterxml.jackson.databind.ObjectMapper import com.google.protobuf.Empty -import com.google.protobuf.any.{Any => AnyProto} -import com.google.rpc.code.Code -import com.google.rpc.error_details.ErrorInfo import com.google.rpc.status.Status +import java.time.{Duration, Instant} import scala.annotation.nowarn import scala.collection.mutable import scala.jdk.CollectionConverters._ @@ -219,8 +214,8 @@ private[state] object Conversions { Ref.SubmissionId.assertFromString ), ) - } + def buildTimestamp(ts: Time.Timestamp): com.google.protobuf.Timestamp = buildTimestamp(ts.toInstant) @@ -472,215 +467,63 @@ private[state] object Conversions { @nowarn("msg=deprecated") def decodeTransactionRejectionEntry( - entry: DamlTransactionRejectionEntry - ): Option[FinalReason] = { - def buildStatus( - code: Code, - message: String, - additionalMetadata: Map[String, String] = Map.empty, - ) = Status.of( - code.value, - message, - Seq( - AnyProto.pack[ErrorInfo]( - ErrorInfo(metadata = - additionalMetadata + (GrpcStatuses.DefiniteAnswerKey -> entry.getDefiniteAnswer.toString) - ) - ) - ), - ) - - val status = entry.getReasonCase match { + entry: DamlTransactionRejectionEntry, + errorVersionSwitch: ValueSwitch[Status], + ): FinalReason = + FinalReason(entry.getReasonCase match { case DamlTransactionRejectionEntry.ReasonCase.INVALID_LEDGER_TIME => val rejection = entry.getInvalidLedgerTime - Some( - buildStatus( - Code.ABORTED, - s"Invalid ledger time: ${rejection.getDetails}", - Map( - "ledger_time" -> rejection.getLedgerTime.toString, - "lower_bound" -> rejection.getLowerBound.toString, - "upper_bound" -> rejection.getUpperBound.toString, - ), - ) - ) + invalidLedgerTimeStatus(entry, rejection, errorVersionSwitch) case DamlTransactionRejectionEntry.ReasonCase.DISPUTED => val rejection = entry.getDisputed - Some( - buildStatus( - Code.INVALID_ARGUMENT, - s"Disputed: ${rejection.getDetails}", - ) - ) + disputedStatus(entry, rejection, errorVersionSwitch) case DamlTransactionRejectionEntry.ReasonCase.SUBMITTER_CANNOT_ACT_VIA_PARTICIPANT => val rejection = entry.getSubmitterCannotActViaParticipant - Some( - buildStatus( - Code.PERMISSION_DENIED, - s"Submitter cannot act via participant: ${rejection.getDetails}", - Map( - "submitter_party" -> rejection.getSubmitterParty, - "participant_id" -> rejection.getParticipantId, - ), - ) - ) + submitterCannotActViaParticipantStatus(entry, rejection) case DamlTransactionRejectionEntry.ReasonCase.INCONSISTENT => val rejection = entry.getInconsistent - Some( - buildStatus( - Code.ABORTED, - s"Inconsistent: ${rejection.getDetails}", - ) - ) + inconsistentStatus(entry, rejection, errorVersionSwitch) case DamlTransactionRejectionEntry.ReasonCase.RESOURCES_EXHAUSTED => val rejection = entry.getResourcesExhausted - Some( - buildStatus( - Code.ABORTED, - s"Resources exhausted: ${rejection.getDetails}", - ) - ) + resourceExhaustedStatus(entry, rejection) case DamlTransactionRejectionEntry.ReasonCase.DUPLICATE_COMMAND => - Some( - buildStatus( - Code.ALREADY_EXISTS, - "Duplicate commands", - ) - ) + duplicateCommandStatus(entry) case DamlTransactionRejectionEntry.ReasonCase.PARTY_NOT_KNOWN_ON_LEDGER => val rejection = entry.getPartyNotKnownOnLedger - Some( - buildStatus( - Code.INVALID_ARGUMENT, - s"Party not known on ledger: ${rejection.getDetails}", - ) - ) + partyNotKnownOnLedgerStatus(entry, rejection, errorVersionSwitch) case DamlTransactionRejectionEntry.ReasonCase.VALIDATION_FAILURE => val rejection = entry.getValidationFailure - Some( - buildStatus( - Code.INVALID_ARGUMENT, - s"Disputed: ${rejection.getDetails}", - ) - ) + validationFailureStatus(entry, rejection, errorVersionSwitch) case DamlTransactionRejectionEntry.ReasonCase.INTERNALLY_DUPLICATE_KEYS => - Some( - buildStatus( - Code.INVALID_ARGUMENT, - s"Disputed: ${InternallyInconsistentTransaction.DuplicateKeys.description}", - ) - ) + internallyDuplicateKeysStatus(entry, errorVersionSwitch) case DamlTransactionRejectionEntry.ReasonCase.INTERNALLY_INCONSISTENT_KEYS => - Some( - buildStatus( - Code.INVALID_ARGUMENT, - s"Disputed: ${InternallyInconsistentTransaction.InconsistentKeys.description}", - ) - ) + internallyInconsistentKeysStatus(entry, errorVersionSwitch) case DamlTransactionRejectionEntry.ReasonCase.EXTERNALLY_INCONSISTENT_CONTRACTS => - Some( - buildStatus( - Code.ABORTED, - s"Inconsistent: ${ExternallyInconsistentTransaction.InconsistentContracts.description}", - ) - ) + externallyInconsistentContractsStatus(entry, errorVersionSwitch) case DamlTransactionRejectionEntry.ReasonCase.EXTERNALLY_DUPLICATE_KEYS => - Some( - buildStatus( - Code.ABORTED, - s"Inconsistent: ${ExternallyInconsistentTransaction.DuplicateKeys.description}", - ) - ) + externallyDuplicateKeysStatus(entry, errorVersionSwitch) case DamlTransactionRejectionEntry.ReasonCase.EXTERNALLY_INCONSISTENT_KEYS => - Some( - buildStatus( - Code.ABORTED, - s"Inconsistent: ${ExternallyInconsistentTransaction.InconsistentKeys.description}", - ) - ) + externallyInconsistentKeysStatus(entry, errorVersionSwitch) case DamlTransactionRejectionEntry.ReasonCase.MISSING_INPUT_STATE => val rejection = entry.getMissingInputState - Some( - buildStatus( - Code.ABORTED, - s"Inconsistent: Missing input state for key ${rejection.getKey.toString}", - Map("key" -> rejection.getKey.toString), - ) - ) + missingInputStateStatus(entry, rejection, errorVersionSwitch) case DamlTransactionRejectionEntry.ReasonCase.RECORD_TIME_OUT_OF_RANGE => val rejection = entry.getRecordTimeOutOfRange - Some( - buildStatus( - Code.ABORTED, - s"Invalid ledger time: Record time is outside of valid range [${rejection.getMinimumRecordTime}, ${rejection.getMaximumRecordTime}]", - Map( - "minimum_record_time" -> Instant - .ofEpochSecond( - rejection.getMinimumRecordTime.getSeconds, - rejection.getMinimumRecordTime.getNanos.toLong, - ) - .toString, - "maximum_record_time" -> Instant - .ofEpochSecond( - rejection.getMaximumRecordTime.getSeconds, - rejection.getMaximumRecordTime.getNanos.toLong, - ) - .toString, - ), - ) - ) + recordTimeOutOfRangeStatus(entry, rejection, errorVersionSwitch) case DamlTransactionRejectionEntry.ReasonCase.CAUSAL_MONOTONICITY_VIOLATED => - Some( - buildStatus( - Code.ABORTED, - "Invalid ledger time: Causal monotonicity violated", - ) - ) + causalMonotonicityViolatedStatus(entry, errorVersionSwitch) case DamlTransactionRejectionEntry.ReasonCase.SUBMITTING_PARTY_NOT_KNOWN_ON_LEDGER => val rejection = entry.getSubmittingPartyNotKnownOnLedger - Some( - buildStatus( - Code.INVALID_ARGUMENT, - s"Party not known on ledger: Submitting party '${rejection.getSubmitterParty}' not known", - Map("submitter_party" -> rejection.getSubmitterParty), - ) - ) + submittingPartyNotKnownOnLedgerStatus(entry, rejection, errorVersionSwitch) case DamlTransactionRejectionEntry.ReasonCase.PARTIES_NOT_KNOWN_ON_LEDGER => val rejection = entry.getPartiesNotKnownOnLedger - val parties = rejection.getPartiesList - Some( - buildStatus( - Code.INVALID_ARGUMENT, - s"Party not known on ledger: Parties not known on ledger ${parties.asScala.mkString("[", ",", "]")}", - Map("parties" -> objectToJsonString(parties)), - ) - ) + partiesNotKnownOnLedgerStatus(entry, rejection, errorVersionSwitch) case DamlTransactionRejectionEntry.ReasonCase.INVALID_PARTICIPANT_STATE => val rejection = entry.getInvalidParticipantState - Some( - buildStatus( - Code.INVALID_ARGUMENT, - s"Disputed: ${rejection.getDetails}", - rejection.getMetadataMap.asScala.toMap, - ) - ) + invalidParticipantStateStatus(entry, rejection, errorVersionSwitch) case DamlTransactionRejectionEntry.ReasonCase.REASON_NOT_SET => - Some( - buildStatus( - Code.UNKNOWN, - "No reason set for rejection", - ) - ) - } - status.map(FinalReason) - } - - private def objectToJsonString(obj: Object): String = { - val stringWriter = new StringWriter - val objectMapper = new ObjectMapper - objectMapper.writeValue(stringWriter, obj) - stringWriter.toString - } + reasonNotSetStatus(entry, errorVersionSwitch) + }) private def encodeParties(parties: Set[Ref.Party]): List[String] = (parties.toList: List[String]).sorted diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala index 1ac37f62f854..e9bdbd3850df 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala @@ -3,8 +3,8 @@ package com.daml.ledger.participant.state.kvutils +import com.daml.error.ValueSwitch import com.daml.ledger.configuration.Configuration -import com.daml.ledger.grpc.GrpcStatuses import com.daml.ledger.participant.state.kvutils.Conversions._ import com.daml.ledger.participant.state.kvutils.store.events.PackageUpload.DamlPackageUploadRejectionEntry import com.daml.ledger.participant.state.kvutils.store.events.{ @@ -20,7 +20,7 @@ import com.daml.ledger.participant.state.kvutils.store.{ DamlOutOfTimeBoundsEntry, DamlStateKey, } -import com.daml.ledger.participant.state.v2.Update.CommandRejected.FinalReason +import com.daml.ledger.participant.state.kvutils.updates.TransactionRejections._ import com.daml.ledger.participant.state.v2.{DivulgedContract, TransactionMeta, Update} import com.daml.lf.data.Ref import com.daml.lf.data.Ref.LedgerString @@ -28,9 +28,6 @@ import com.daml.lf.data.Time.Timestamp import com.daml.lf.transaction.CommittedTransaction import com.google.common.io.BaseEncoding import com.google.protobuf.ByteString -import com.google.protobuf.any.{Any => AnyProto} -import com.google.rpc.code.Code -import com.google.rpc.error_details.ErrorInfo import com.google.rpc.status.Status import org.slf4j.LoggerFactory @@ -52,6 +49,7 @@ object KeyValueConsumption { * * @param entryId: The log entry identifier. * @param entry: The log entry. + * @param errorVersionSwitch: Decides between v1 and v2 (self-service) errors. * @return [[Update]]s constructed from log entry. */ // TODO(BH): add participantId to ensure participant id matches in DamlLogEntry @@ -59,6 +57,7 @@ object KeyValueConsumption { def logEntryToUpdate( entryId: DamlLogEntryId, entry: DamlLogEntry, + errorVersionSwitch: ValueSwitch[Status], recordTimeForUpdate: Option[Timestamp] = None, ): List[Update] = { val recordTimeFromLogEntry = PartialFunction.condOpt(entry.hasRecordTime) { case true => @@ -209,13 +208,20 @@ object KeyValueConsumption { } case DamlLogEntry.PayloadCase.TRANSACTION_REJECTION_ENTRY => - transactionRejectionEntryToUpdate( - recordTime, - entry.getTransactionRejectionEntry, - ).toList + List( + transactionRejectionEntryToUpdate( + recordTime, + entry.getTransactionRejectionEntry, + errorVersionSwitch, + ) + ) case DamlLogEntry.PayloadCase.OUT_OF_TIME_BOUNDS_ENTRY => - outOfTimeBoundsEntryToUpdate(recordTime, entry.getOutOfTimeBoundsEntry).toList + outOfTimeBoundsEntryToUpdate( + recordTime, + entry.getOutOfTimeBoundsEntry, + errorVersionSwitch, + ).toList case DamlLogEntry.PayloadCase.TIME_UPDATE_ENTRY => List.empty @@ -239,15 +245,15 @@ object KeyValueConsumption { private def transactionRejectionEntryToUpdate( recordTime: Timestamp, rejEntry: DamlTransactionRejectionEntry, - ): Option[Update] = Conversions - .decodeTransactionRejectionEntry(rejEntry) - .map { reason => - Update.CommandRejected( - recordTime = recordTime, - completionInfo = parseCompletionInfo(parseInstant(recordTime), rejEntry.getSubmitterInfo), - reasonTemplate = reason, - ) - } + errorVersionSwitch: ValueSwitch[Status], + ): Update = { + val reason = Conversions.decodeTransactionRejectionEntry(rejEntry, errorVersionSwitch) + Update.CommandRejected( + recordTime = recordTime, + completionInfo = parseCompletionInfo(parseInstant(recordTime), rejEntry.getSubmitterInfo), + reasonTemplate = reason, + ) + } /** Transform the transaction entry into the [[Update.TransactionAccepted]] event. */ private def transactionEntryToUpdate( @@ -332,6 +338,7 @@ object KeyValueConsumption { private[kvutils] def outOfTimeBoundsEntryToUpdate( recordTime: Timestamp, outOfTimeBoundsEntry: DamlOutOfTimeBoundsEntry, + errorVersionSwitch: ValueSwitch[Status], ): Option[Update] = { val timeBounds = parseTimeBounds(outOfTimeBoundsEntry) val deduplicated = timeBounds.deduplicateUntil.exists(recordTime <= _) @@ -343,31 +350,7 @@ object KeyValueConsumption { wrappedLogEntry.getPayloadCase match { case DamlLogEntry.PayloadCase.TRANSACTION_REJECTION_ENTRY if deduplicated => val rejectionEntry = wrappedLogEntry.getTransactionRejectionEntry - Some( - Update.CommandRejected( - recordTime = recordTime, - completionInfo = parseCompletionInfo( - Conversions.parseInstant(recordTime), - rejectionEntry.getSubmitterInfo, - ), - reasonTemplate = FinalReason( - Status.of( - Code.ALREADY_EXISTS.value, - "Duplicate commands", - Seq( - AnyProto.pack[ErrorInfo]( - // the definite answer is false, as the rank-based deduplication is not yet implemented - ErrorInfo(metadata = - Map( - GrpcStatuses.DefiniteAnswerKey -> rejectionEntry.getDefiniteAnswer.toString - ) - ) - ) - ), - ) - ), - ) - ) + Some(duplicateCommandsRejectionUpdate(recordTime, rejectionEntry)) case _ if deduplicated => // We only emit updates for duplicate transaction submissions. @@ -386,28 +369,7 @@ object KeyValueConsumption { "Record time outside of valid range" } Some( - Update.CommandRejected( - recordTime = recordTime, - completionInfo = parseCompletionInfo( - Conversions.parseInstant(recordTime), - rejectionEntry.getSubmitterInfo, - ), - reasonTemplate = FinalReason( - Status.of( - Code.ABORTED.value, - reason, - Seq( - AnyProto.pack[ErrorInfo]( - ErrorInfo(metadata = - Map( - GrpcStatuses.DefiniteAnswerKey -> rejectionEntry.getDefiniteAnswer.toString - ) - ) - ) - ), - ) - ), - ) + invalidRecordTimeRejectionUpdate(recordTime, rejectionEntry, reason, errorVersionSwitch) ) case DamlLogEntry.PayloadCase.CONFIGURATION_REJECTION_ENTRY if invalidRecordTime => diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantState.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantState.scala index c76d2cd3c584..b26c596a12f0 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantState.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantState.scala @@ -40,10 +40,11 @@ class KeyValueParticipantState( reader: LedgerReader, writer: LedgerWriter, metrics: Metrics, + enableSelfServiceErrorCodes: Boolean, ) extends ReadService with WriteService { private val readerAdapter = - KeyValueParticipantStateReader(reader, metrics) + KeyValueParticipantStateReader(reader, metrics, enableSelfServiceErrorCodes) private val writerAdapter = new KeyValueParticipantStateWriter( new TimedLedgerWriter(writer, metrics), diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReader.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReader.scala index 0dfb9933bdff..322a0dfd8ff7 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReader.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReader.scala @@ -5,6 +5,7 @@ package com.daml.ledger.participant.state.kvutils.api import akka.NotUsed import akka.stream.scaladsl.Source +import com.daml.error.ValueSwitch import com.daml.ledger.api.health.HealthStatus import com.daml.ledger.configuration.LedgerInitialConditions import com.daml.ledger.offset.Offset @@ -15,6 +16,7 @@ import com.daml.ledger.validator.preexecution.TimeUpdatesProvider import com.daml.lf.data.Time import com.daml.lf.data.Time.Timestamp import com.daml.metrics.{Metrics, Timed} +import com.google.rpc.status.Status /** Adapts a [[LedgerReader]] instance to [[ReadService]]. * Performs translation between the offsets required by the underlying reader and [[ReadService]]: @@ -27,13 +29,21 @@ import com.daml.metrics.{Metrics, Timed} class KeyValueParticipantStateReader private[api] ( reader: LedgerReader, metrics: Metrics, - logEntryToUpdate: (DamlLogEntryId, DamlLogEntry, Option[Timestamp]) => List[Update], + enableSelfServiceErrorCodes: Boolean, + logEntryToUpdate: ( + DamlLogEntryId, + DamlLogEntry, + ValueSwitch[Status], + Option[Timestamp], + ) => List[Update], timeUpdatesProvider: TimeUpdatesProvider, failOnUnexpectedEvent: Boolean, ) extends ReadService { import KeyValueParticipantStateReader._ + private val errorVersionSwitch = new ValueSwitch[Status](enableSelfServiceErrorCodes) + override def ledgerInitialConditions(): Source[LedgerInitialConditions, NotUsed] = Source.single(createLedgerInitialConditions()) @@ -50,7 +60,12 @@ class KeyValueParticipantStateReader private[api] ( metrics.daml.kvutils.reader.parseUpdates, { val logEntryId = DamlLogEntryId.parseFrom(entryId.bytes) val updates = - logEntryToUpdate(logEntryId, logEntry, timeUpdatesProvider()) + logEntryToUpdate( + logEntryId, + logEntry, + errorVersionSwitch, + timeUpdatesProvider(), + ) val updatesWithOffsets = Source(updates).zipWithIndex.map { case (update, index) => offsetForUpdate(offset, index.toInt, updates.size) -> update @@ -85,12 +100,14 @@ object KeyValueParticipantStateReader { def apply( reader: LedgerReader, metrics: Metrics, + enableSelfServiceErrorCodes: Boolean, timeUpdatesProvider: TimeUpdatesProvider = TimeUpdatesProvider.ReasonableDefault, failOnUnexpectedEvent: Boolean = true, ): KeyValueParticipantStateReader = new KeyValueParticipantStateReader( reader, metrics, + enableSelfServiceErrorCodes, KeyValueConsumption.logEntryToUpdate, timeUpdatesProvider, failOnUnexpectedEvent, diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/updates/TransactionRejections.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/updates/TransactionRejections.scala new file mode 100644 index 000000000000..e0a8746b5d64 --- /dev/null +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/updates/TransactionRejections.scala @@ -0,0 +1,444 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.participant.state.kvutils.updates + +import com.daml.error.ValueSwitch +import com.daml.ledger.grpc.GrpcStatuses +import com.daml.ledger.participant.state.kvutils.Conversions.parseCompletionInfo +import com.daml.ledger.participant.state.kvutils.committer.transaction.Rejection.{ + ExternallyInconsistentTransaction, + InternallyInconsistentTransaction, +} +import com.daml.ledger.participant.state.kvutils.store.events._ +import com.daml.ledger.participant.state.kvutils.{Conversions, CorrelationId} +import com.daml.ledger.participant.state.v2.Update +import com.daml.ledger.participant.state.v2.Update.CommandRejected.FinalReason +import com.daml.lf.data.Time.Timestamp +import com.fasterxml.jackson.databind.ObjectMapper +import com.google.protobuf.any.{Any => AnyProto} +import com.google.rpc.code.Code +import com.google.rpc.error_details.ErrorInfo +import com.google.rpc.status.Status + +import java.io.StringWriter +import java.time.Instant +import scala.jdk.CollectionConverters._ + +/** Utilities for converting between rejection log entries and updates and/or gRPC statuses. + */ +private[kvutils] object TransactionRejections { + + def invalidRecordTimeRejectionUpdate( + recordTime: Timestamp, + rejectionEntry: DamlTransactionRejectionEntry, + reason: CorrelationId, + errorVersionSwitch: ValueSwitch[Status], + ): Update.CommandRejected = { + val statusBuilder = invalidRecordTimeRejectionStatus(rejectionEntry, reason, _) + Update.CommandRejected( + recordTime = recordTime, + completionInfo = parseCompletionInfo( + Conversions.parseInstant(recordTime), + rejectionEntry.getSubmitterInfo, + ), + reasonTemplate = FinalReason( + errorVersionSwitch.choose( + statusBuilder(Code.ABORTED), + statusBuilder(Code.FAILED_PRECONDITION), + ) + ), + ) + } + + def duplicateCommandsRejectionUpdate( + recordTime: Timestamp, + rejectionEntry: DamlTransactionRejectionEntry, + ): Update.CommandRejected = + Update.CommandRejected( + recordTime = recordTime, + completionInfo = parseCompletionInfo( + Conversions.parseInstant(recordTime), + rejectionEntry.getSubmitterInfo, + ), + reasonTemplate = FinalReason( + duplicateCommandsRejectionStatus(rejectionEntry, Code.ALREADY_EXISTS) + ), + ) + + def reasonNotSetStatus( + entry: DamlTransactionRejectionEntry, + errorVersionSwitch: ValueSwitch[Status], + ): Status = { + def statusBuilder: Code => Status = buildStatus(entry, _, "No reason set for rejection") + errorVersionSwitch.choose( + statusBuilder(Code.UNKNOWN), + statusBuilder(Code.INTERNAL), // We should always set a reason + ) + } + + def invalidParticipantStateStatus( + entry: DamlTransactionRejectionEntry, + rejection: InvalidParticipantState, + errorVersionSwitch: ValueSwitch[Status], + ): Status = { + def statusBuilder: Code => Status = buildStatus( + entry, + _, + s"Disputed: ${rejection.getDetails}", + rejection.getMetadataMap.asScala.toMap, + ) + errorVersionSwitch.choose( + statusBuilder(Code.INVALID_ARGUMENT), + statusBuilder(Code.INTERNAL), + ) + } + + def partiesNotKnownOnLedgerStatus( + entry: DamlTransactionRejectionEntry, + rejection: PartiesNotKnownOnLedger, + errorVersionSwitch: ValueSwitch[Status], + ): Status = { + val parties = rejection.getPartiesList + def statusBuilder: Code => Status = buildStatus( + entry, + _, + s"Party not known on ledger: Parties not known on ledger ${parties.asScala.mkString("[", ",", "]")}", + Map("parties" -> objectToJsonString(parties)), + ) + errorVersionSwitch.choose( + statusBuilder(Code.INVALID_ARGUMENT), + statusBuilder(Code.FAILED_PRECONDITION), // The party may become known at a later time + ) + } + + def submittingPartyNotKnownOnLedgerStatus( + entry: DamlTransactionRejectionEntry, + rejection: SubmittingPartyNotKnownOnLedger, + errorVersionSwitch: ValueSwitch[Status], + ): Status = { + def statusBuilder: Code => Status = buildStatus( + entry, + _, + s"Party not known on ledger: Submitting party '${rejection.getSubmitterParty}' not known", + Map("submitter_party" -> rejection.getSubmitterParty), + ) + errorVersionSwitch.choose( + statusBuilder(Code.INVALID_ARGUMENT), + statusBuilder(Code.FAILED_PRECONDITION), // The party may become known at a later time + ) + } + + def partyNotKnownOnLedgerStatus( + entry: DamlTransactionRejectionEntry, + rejection: PartyNotKnownOnLedger, + errorVersionSwitch: ValueSwitch[Status], + ): Status = { + def statusBuilder: Code => Status = buildStatus( + entry, + _, + s"Party not known on ledger: ${rejection.getDetails}", + ) + errorVersionSwitch.choose( + statusBuilder(Code.INVALID_ARGUMENT), + statusBuilder(Code.FAILED_PRECONDITION), // The party may become known at a later time + ) + } + + def causalMonotonicityViolatedStatus( + entry: DamlTransactionRejectionEntry, + errorVersionSwitch: ValueSwitch[Status], + ): Status = { + def statusBuilder: Code => Status = buildStatus( + entry, + _, + "Invalid ledger time: Causal monotonicity violated", + ) + errorVersionSwitch.choose( + statusBuilder(Code.ABORTED), + statusBuilder(Code.FAILED_PRECONDITION), // May succeed at a later time + ) + } + + def recordTimeOutOfRangeStatus( + entry: DamlTransactionRejectionEntry, + rejection: RecordTimeOutOfRange, + errorVersionSwitch: ValueSwitch[Status], + ): Status = { + def statusBuilder: Code => Status = buildStatus( + entry, + _, + s"Invalid ledger time: Record time is outside of valid range [${rejection.getMinimumRecordTime}, ${rejection.getMaximumRecordTime}]", + Map( + "minimum_record_time" -> Instant + .ofEpochSecond( + rejection.getMinimumRecordTime.getSeconds, + rejection.getMinimumRecordTime.getNanos.toLong, + ) + .toString, + "maximum_record_time" -> Instant + .ofEpochSecond( + rejection.getMaximumRecordTime.getSeconds, + rejection.getMaximumRecordTime.getNanos.toLong, + ) + .toString, + ), + ) + errorVersionSwitch.choose( + statusBuilder(Code.ABORTED), + statusBuilder(Code.FAILED_PRECONDITION), // May succeed at a later time + ) + } + + def missingInputStateStatus( + entry: DamlTransactionRejectionEntry, + rejection: MissingInputState, + errorVersionSwitch: ValueSwitch[Status], + ): Status = { + def statusBuilder: Code => Status = buildStatus( + entry, + _, + s"Inconsistent: Missing input state for key ${rejection.getKey.toString}", + Map("key" -> rejection.getKey.toString), + ) + errorVersionSwitch.choose( + statusBuilder(Code.ABORTED), + statusBuilder(Code.INTERNAL), // The inputs should have been provided by the participant + ) + } + + def externallyInconsistentKeysStatus( + entry: DamlTransactionRejectionEntry, + errorVersionSwitch: ValueSwitch[Status], + ): Status = { + def statusBuilder: Code => Status = buildStatus( + entry, + _, + s"Inconsistent: ${ExternallyInconsistentTransaction.InconsistentKeys.description}", + ) + errorVersionSwitch.choose( + statusBuilder(Code.ABORTED), + statusBuilder(Code.FAILED_PRECONDITION), // May succeed at a later time + ) + } + + def externallyDuplicateKeysStatus( + entry: DamlTransactionRejectionEntry, + errorVersionSwitch: ValueSwitch[Status], + ): Status = { + def statusBuilder: Code => Status = buildStatus( + entry, + _, + s"Inconsistent: ${ExternallyInconsistentTransaction.DuplicateKeys.description}", + ) + errorVersionSwitch.choose( + statusBuilder(Code.ABORTED), + statusBuilder(Code.FAILED_PRECONDITION), // May succeed at a later time + ) + } + + def externallyInconsistentContractsStatus( + entry: DamlTransactionRejectionEntry, + errorVersionSwitch: ValueSwitch[Status], + ): Status = { + def statusBuilder: Code => Status = buildStatus( + entry, + _, + s"Inconsistent: ${ExternallyInconsistentTransaction.InconsistentContracts.description}", + ) + errorVersionSwitch.choose( + statusBuilder(Code.ABORTED), + statusBuilder(Code.FAILED_PRECONDITION), // May succeed at a later time + ) + } + + def internallyInconsistentKeysStatus( + entry: DamlTransactionRejectionEntry, + errorVersionSwitch: ValueSwitch[Status], + ): Status = { + def statusBuilder: Code => Status = buildStatus( + entry, + _, + s"Disputed: ${InternallyInconsistentTransaction.InconsistentKeys.description}", + ) + errorVersionSwitch.choose( + statusBuilder(Code.INVALID_ARGUMENT), + statusBuilder(Code.INTERNAL), // Should have been caught by the participant + ) + } + + def internallyDuplicateKeysStatus( + entry: DamlTransactionRejectionEntry, + errorVersionSwitch: ValueSwitch[Status], + ): Status = { + def statusBuilder: Code => Status = buildStatus( + entry, + _, + s"Disputed: ${InternallyInconsistentTransaction.DuplicateKeys.description}", + ) + errorVersionSwitch.choose( + statusBuilder(Code.INVALID_ARGUMENT), + statusBuilder(Code.INTERNAL), // Should have been caught by the participant + ) + } + + def validationFailureStatus( + entry: DamlTransactionRejectionEntry, + rejection: ValidationFailure, + errorVersionSwitch: ValueSwitch[Status], + ): Status = { + val statusBuilder = disputedStatusBuilder(entry, rejection.getDetails) + errorVersionSwitch.choose( + statusBuilder(Code.INVALID_ARGUMENT), + statusBuilder(Code.INTERNAL), // Should have been caught by the participant + ) + } + + def duplicateCommandStatus( + entry: DamlTransactionRejectionEntry + ): Status = buildStatus( + entry, + Code.ALREADY_EXISTS, + "Duplicate commands", + ) + + def resourceExhaustedStatus( + entry: DamlTransactionRejectionEntry, + rejection: ResourcesExhausted, + ): Status = buildStatus( + entry, + Code.ABORTED, + s"Resources exhausted: ${rejection.getDetails}", + ) + + def inconsistentStatus( + entry: DamlTransactionRejectionEntry, + rejection: Inconsistent, + errorVersionSwitch: ValueSwitch[Status], + ): Status = { + def statusBuilder: Code => Status = buildStatus( + entry, + _, + s"Inconsistent: ${rejection.getDetails}", + ) + errorVersionSwitch.choose( + statusBuilder(Code.ABORTED), + statusBuilder(Code.FAILED_PRECONDITION), // May succeed at a later time + ) + } + + def submitterCannotActViaParticipantStatus( + entry: DamlTransactionRejectionEntry, + rejection: SubmitterCannotActViaParticipant, + ): Status = buildStatus( + entry, + Code.PERMISSION_DENIED, + s"Submitter cannot act via participant: ${rejection.getDetails}", + Map( + "submitter_party" -> rejection.getSubmitterParty, + "participant_id" -> rejection.getParticipantId, + ), + ) + + def disputedStatus( + entry: DamlTransactionRejectionEntry, + rejection: Disputed, + errorVersionSwitch: ValueSwitch[Status], + ): Status = { + val statusBuilder = disputedStatusBuilder(entry, rejection.getDetails) + errorVersionSwitch.choose( + statusBuilder(Code.INVALID_ARGUMENT), + statusBuilder(Code.INTERNAL), // Should have been caught by the participant + ) + } + + def invalidLedgerTimeStatus( + entry: DamlTransactionRejectionEntry, + rejection: InvalidLedgerTime, + errorVersionSwitch: ValueSwitch[Status], + ): Status = { + val statusBuilder = buildStatus( + entry, + _, + s"Invalid ledger time: ${rejection.getDetails}", + Map( + "ledger_time" -> rejection.getLedgerTime.toString, + "lower_bound" -> rejection.getLowerBound.toString, + "upper_bound" -> rejection.getUpperBound.toString, + ), + ) + errorVersionSwitch.choose( + statusBuilder(Code.ABORTED), + statusBuilder(Code.FAILED_PRECONDITION), // May succeed at a later time + ) + } + + private def buildStatus( + entry: DamlTransactionRejectionEntry, + code: Code, + message: String, + additionalMetadata: Map[String, String] = Map.empty, + ) = Status.of( + code.value, + message, + Seq( + AnyProto.pack[ErrorInfo]( + ErrorInfo(metadata = + additionalMetadata + (GrpcStatuses.DefiniteAnswerKey -> entry.getDefiniteAnswer.toString) + ) + ) + ), + ) + + private def invalidRecordTimeRejectionStatus( + rejectionEntry: DamlTransactionRejectionEntry, + reason: CorrelationId, + errorCode: Code, + ) = Status.of( + errorCode.value, + reason, + Seq( + AnyProto.pack[ErrorInfo]( + ErrorInfo(metadata = + Map( + GrpcStatuses.DefiniteAnswerKey -> rejectionEntry.getDefiniteAnswer.toString + ) + ) + ) + ), + ) + + private def duplicateCommandsRejectionStatus( + rejectionEntry: DamlTransactionRejectionEntry, + errorCode: Code, + ) = Status.of( + errorCode.value, + "Duplicate commands", + Seq( + AnyProto.pack[ErrorInfo]( + // the definite answer is false, as the rank-based deduplication is not yet implemented + ErrorInfo(metadata = + Map( + GrpcStatuses.DefiniteAnswerKey -> rejectionEntry.getDefiniteAnswer.toString + ) + ) + ) + ), + ) + + private def objectToJsonString(obj: Object): String = { + val stringWriter = new StringWriter + val objectMapper = new ObjectMapper + objectMapper.writeValue(stringWriter, obj) + stringWriter.toString + } + + private def disputedStatusBuilder( + entry: DamlTransactionRejectionEntry, + rejectionString: String, + ): Code => Status = buildStatus( + entry, + _, + s"Disputed: $rejectionString", + ) +} diff --git a/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/KVTest.scala b/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/KVTest.scala index a573e90c38a1..294c3b455452 100644 --- a/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/KVTest.scala +++ b/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/KVTest.scala @@ -4,9 +4,9 @@ package com.daml.ledger.participant.state.kvutils import java.time.Duration - import com.codahale.metrics.MetricRegistry import com.daml.daml_lf_dev.DamlLf +import com.daml.error.ValueSwitch import com.daml.ledger.api.DeduplicationPeriod import com.daml.ledger.configuration.{Configuration, LedgerTimeModel} import com.daml.ledger.participant.state.kvutils.KeyValueCommitting.PreExecutionResult @@ -63,6 +63,8 @@ object KVTest { ) private[kvutils] val metrics = new Metrics(new MetricRegistry) + private[kvutils] val errorVersionSwitch = + new ValueSwitch[com.google.rpc.status.Status](enableSelfServiceErrorCodes = false) private def initialTestState: KVTestState = { val engine = Engine.DevEngine() @@ -383,7 +385,7 @@ object KVTest { newState.keySet subsetOf KeyValueCommitting.submissionOutputs(submission) ) // Verify that we can always process the log entry. - val _ = KeyValueConsumption.logEntryToUpdate(entryId, logEntry) + val _ = KeyValueConsumption.logEntryToUpdate(entryId, logEntry, errorVersionSwitch) entryId -> logEntry } @@ -413,11 +415,13 @@ object KVTest { KeyValueConsumption.logEntryToUpdate( entryId, successfulLogEntry, + errorVersionSwitch, recordTimeFromTimeUpdateLogEntry, ) KeyValueConsumption.logEntryToUpdate( entryId, outOfTimeBoundsLogEntry, + errorVersionSwitch, recordTimeFromTimeUpdateLogEntry, ) diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/ConversionsSpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/ConversionsSpec.scala index 15dd650e2e79..53360b1f5d50 100644 --- a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/ConversionsSpec.scala +++ b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/ConversionsSpec.scala @@ -3,8 +3,7 @@ package com.daml.ledger.participant.state.kvutils -import java.time.{Duration, Instant} - +import com.daml.error.ValueSwitch import com.daml.ledger.api.DeduplicationPeriod import com.daml.ledger.configuration.LedgerTimeModel import com.daml.ledger.participant.state.kvutils.Conversions._ @@ -40,12 +39,14 @@ import com.daml.lf.value.ValueOuterClass import com.fasterxml.jackson.databind.ObjectMapper import com.google.protobuf.{TextFormat, Timestamp} import com.google.rpc.error_details.ErrorInfo +import com.google.rpc.status.Status import io.grpc.Status.Code import org.scalatest.OptionValues import org.scalatest.matchers.should.Matchers import org.scalatest.prop.TableDrivenPropertyChecks.{Table, forAll} import org.scalatest.wordspec.AnyWordSpec +import java.time.{Duration, Instant} import scala.annotation.nowarn import scala.collection.immutable.{ListMap, ListSet} import scala.collection.mutable @@ -119,10 +120,10 @@ class ConversionsSpec extends AnyWordSpec with Matchers with OptionValues { val submitterInfo = DamlSubmitterInfo.newBuilder().build() val now = Instant.now - "convert rejection to proto models and back to expected grpc code" in { + "convert rejection to proto models and back to expected grpc v1 code" in { forAll( Table[Rejection, Code, Map[String, String]]( - ("rejection", "expected code", "expected additional details"), + ("Rejection", "Expected Code", "Expected Additional Details"), ( Rejection.ValidationFailure(Error.Package(Error.Package.Internal("ERROR", "ERROR"))), Code.INVALID_ARGUMENT, @@ -225,19 +226,133 @@ class ConversionsSpec extends AnyWordSpec with Matchers with OptionValues { ), ) ) { (rejection, expectedCode, expectedAdditionalDetails) => - val encodedEntry = Conversions - .encodeTransactionRejectionEntry( - submitterInfo, - rejection, - ) - .build() - val finalReason = Conversions - .decodeTransactionRejectionEntry(encodedEntry) - .value - finalReason.code shouldBe expectedCode.value() - finalReason.definiteAnswer shouldBe false - val actualDetails = finalReasonToDetails(finalReason) - actualDetails should contain allElementsOf (expectedAdditionalDetails) + checkErrors( + v1ErrorSwitch, + submitterInfo, + rejection, + expectedCode, + expectedAdditionalDetails, + ) + } + } + + "convert rejection to proto models and back to expected grpc v2 code" in { + forAll( + Table[Rejection, Code, Map[String, String]]( + ( + "Rejection", + "Expected Code", + "Expected Additional Details", + ), + ( + Rejection.ValidationFailure(Error.Package(Error.Package.Internal("ERROR", "ERROR"))), + Code.INTERNAL, + Map.empty, + ), + ( + Rejection.InternallyInconsistentTransaction.InconsistentKeys, + Code.INTERNAL, + Map.empty, + ), + ( + Rejection.InternallyInconsistentTransaction.DuplicateKeys, + Code.INTERNAL, + Map.empty, + ), + ( + Rejection.ExternallyInconsistentTransaction.InconsistentContracts, + Code.FAILED_PRECONDITION, + Map.empty, + ), + ( + Rejection.ExternallyInconsistentTransaction.InconsistentKeys, + Code.FAILED_PRECONDITION, + Map.empty, + ), + ( + Rejection.ExternallyInconsistentTransaction.DuplicateKeys, + Code.FAILED_PRECONDITION, + Map.empty, + ), + ( + Rejection.MissingInputState(DamlStateKey.getDefaultInstance), + Code.INTERNAL, + Map.empty, + ), + ( + Rejection.InvalidParticipantState(Err.InternalError("error")), + Code.INTERNAL, + Map.empty, + ), + ( + Rejection.InvalidParticipantState( + Err.ArchiveDecodingFailed(Ref.PackageId.assertFromString("id"), "reason") + ), + Code.INTERNAL, + Map("package_id" -> "id"), + ), + ( + Rejection.InvalidParticipantState(Err.MissingDivulgedContractInstance("id")), + Code.INTERNAL, + Map("contract_id" -> "id"), + ), + ( + Rejection.RecordTimeOutOfRange(now, now), + Code.FAILED_PRECONDITION, + Map.empty, + ), + ( + Rejection.LedgerTimeOutOfRange(LedgerTimeModel.OutOfRange(now, now, now)), + Code.FAILED_PRECONDITION, + Map.empty, + ), + ( + Rejection.CausalMonotonicityViolated, + Code.FAILED_PRECONDITION, + Map.empty, + ), + ( + Rejection.SubmittingPartyNotKnownOnLedger(Ref.Party.assertFromString("party")), + Code.FAILED_PRECONDITION, + Map.empty, + ), + ( + Rejection.PartiesNotKnownOnLedger(Seq.empty), + Code.FAILED_PRECONDITION, + Map.empty, + ), + ( + Rejection.MissingInputState(partyStateKey("party")), + Code.INTERNAL, + Map("key" -> "party: \"party\"\n"), + ), + ( + Rejection.RecordTimeOutOfRange(Instant.EPOCH, Instant.EPOCH), + Code.FAILED_PRECONDITION, + Map( + "minimum_record_time" -> Instant.EPOCH.toString, + "maximum_record_time" -> Instant.EPOCH.toString, + ), + ), + ( + Rejection.SubmittingPartyNotKnownOnLedger(party0), + Code.FAILED_PRECONDITION, + Map("submitter_party" -> party0), + ), + ( + Rejection.PartiesNotKnownOnLedger(Iterable(party0, party1)), + Code.FAILED_PRECONDITION, + Map("parties" -> s"""[\"$party0\",\"$party1\"]"""), + ), + ) + ) { (rejection, expectedCode, expectedAdditionalDetails) => + checkErrors( + v2ErrorSwitch, + submitterInfo, + rejection, + expectedCode, + expectedAdditionalDetails, + ) } } @@ -281,8 +396,7 @@ class ConversionsSpec extends AnyWordSpec with Matchers with OptionValues { ) .build() val finalReason = Conversions - .decodeTransactionRejectionEntry(encodedEntry) - .value + .decodeTransactionRejectionEntry(encodedEntry, v1ErrorSwitch) finalReason.definiteAnswer shouldBe false val actualDetails = finalReasonToDetails(finalReason).toMap metadataParser(actualDetails(metadataKey)) shouldBe expectedParsedMetadata @@ -358,9 +472,9 @@ class ConversionsSpec extends AnyWordSpec with Matchers with OptionValues { val finalReason = Conversions .decodeTransactionRejectionEntry( rejectionBuilder(DamlTransactionRejectionEntry.newBuilder()) - .build() + .build(), + v1ErrorSwitch, ) - .value finalReason.code shouldBe code.value() finalReason.definiteAnswer shouldBe false val actualDetails = finalReasonToDetails(finalReason) @@ -414,6 +528,27 @@ class ConversionsSpec extends AnyWordSpec with Matchers with OptionValues { } } + private def checkErrors( + errorVersionSwitch: ValueSwitch[Status], + submitterInfo: DamlSubmitterInfo, + rejection: Rejection, + expectedCode: Code, + expectedAdditionalDetails: Map[String, String], + ) = { + val encodedEntry = Conversions + .encodeTransactionRejectionEntry( + submitterInfo, + rejection, + ) + .build() + val finalReason = Conversions + .decodeTransactionRejectionEntry(encodedEntry, errorVersionSwitch) + finalReason.code shouldBe expectedCode.value() + finalReason.definiteAnswer shouldBe false + val actualDetails = finalReasonToDetails(finalReason) + actualDetails should contain allElementsOf (expectedAdditionalDetails) + } + private def newDisclosureEntry(node: NodeId, parties: List[String]) = DisclosureEntry.newBuilder .setNodeId(node.index.toString) @@ -484,6 +619,9 @@ class ConversionsSpec extends AnyWordSpec with Matchers with OptionValues { ) .build + private lazy val v1ErrorSwitch = new ValueSwitch[Status](enableSelfServiceErrorCodes = false) + private lazy val v2ErrorSwitch = new ValueSwitch[Status](enableSelfServiceErrorCodes = true) + private[this] val txVersion = TransactionVersion.StableVersions.max private def deduplicationKeyBytesFor(parties: List[String]): Array[Byte] = { diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KVUtilsTransactionSpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KVUtilsTransactionSpec.scala index 7937ee6e2028..78d88dd3ff16 100644 --- a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KVUtilsTransactionSpec.scala +++ b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KVUtilsTransactionSpec.scala @@ -3,8 +3,9 @@ package com.daml.ledger.participant.state.kvutils -import java.time.Duration +import com.daml.error.ValueSwitch +import java.time.Duration import com.daml.ledger.participant.state.kvutils.TestHelpers._ import com.daml.ledger.participant.state.kvutils.store.events.DamlTransactionRejectionEntry import com.daml.ledger.participant.state.kvutils.store.{DamlLogEntry, DamlStateValue} @@ -41,6 +42,9 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside { private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting + private val errorVersionSwitch = + new ValueSwitch[com.google.rpc.status.Status](enableSelfServiceErrorCodes = false) + private val alice = party("Alice") private val bob = party("Bob") private val eve = party("Eve") @@ -95,7 +99,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside { ) (entryId, logEntry) = result contractId = contractIdOfCreateTransaction( - KeyValueConsumption.logEntryToUpdate(entryId, logEntry) + KeyValueConsumption.logEntryToUpdate(entryId, logEntry, errorVersionSwitch) ) transaction2 <- runSimpleCommand( @@ -219,6 +223,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside { KeyValueConsumption.logEntryToUpdate( entryId, preExecutionResult.successfulLogEntry, + errorVersionSwitch, Some(recordTime), ) ) @@ -303,7 +308,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside { ) (entryId, logEntry) = result contractId = contractIdOfCreateTransaction( - KeyValueConsumption.logEntryToUpdate(entryId, logEntry) + KeyValueConsumption.logEntryToUpdate(entryId, logEntry, errorVersionSwitch) ) transaction2 <- runSimpleCommand( @@ -519,7 +524,8 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside { strippedEntry.getTransactionEntryBuilder.clearSubmitterInfo // Process into updates and verify - val updates = KeyValueConsumption.logEntryToUpdate(entryId, strippedEntry.build) + val updates = + KeyValueConsumption.logEntryToUpdate(entryId, strippedEntry.build, errorVersionSwitch) inside(updates) { case Seq(txAccepted: Update.TransactionAccepted) => txAccepted.optCompletionInfo should be(None) } @@ -573,6 +579,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside { KeyValueConsumption.logEntryToUpdate( entryId, preExecutionResult.successfulLogEntry, + errorVersionSwitch, Some(recordTime), ) ) diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumptionSpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumptionSpec.scala index 3e27fd84b5df..12fb5fb7aecf 100644 --- a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumptionSpec.scala +++ b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumptionSpec.scala @@ -3,8 +3,7 @@ package com.daml.ledger.participant.state.kvutils -import java.time.Instant - +import com.daml.error.ValueSwitch import com.daml.ledger.configuration.Configuration import com.daml.ledger.grpc.GrpcStatuses import com.daml.ledger.participant.state.kvutils.Conversions.{buildTimestamp, parseInstant} @@ -19,15 +18,7 @@ import com.daml.ledger.participant.state.kvutils.store.events.PackageUpload.{ DamlPackageUploadEntry, DamlPackageUploadRejectionEntry, } -import com.daml.ledger.participant.state.kvutils.store.events.{ - DamlConfigurationEntry, - DamlConfigurationRejectionEntry, - DamlPartyAllocationEntry, - DamlPartyAllocationRejectionEntry, - DamlSubmitterInfo, - DamlTransactionEntry, - DamlTransactionRejectionEntry, -} +import com.daml.ledger.participant.state.kvutils.store.events._ import com.daml.ledger.participant.state.kvutils.store.{ DamlLogEntry, DamlLogEntryId, @@ -41,13 +32,16 @@ import com.google.protobuf.any.{Any => AnyProto} import com.google.protobuf.{ByteString, Empty} import com.google.rpc.code.Code import com.google.rpc.error_details.ErrorInfo +import com.google.rpc.status.Status import org.scalatest.Inside.inside import org.scalatest.matchers.should.Matchers import org.scalatest.prop.TableDrivenPropertyChecks._ -import org.scalatest.prop.TableFor4 import org.scalatest.prop.Tables.Table +import org.scalatest.prop.{TableFor1, TableFor4, TableFor5} import org.scalatest.wordspec.AnyWordSpec +import java.time.Instant + class KeyValueConsumptionSpec extends AnyWordSpec with Matchers { private val aLogEntryIdString = "test" private val aLogEntryId = @@ -63,36 +57,74 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers { .setPackageUploadEntry(DamlPackageUploadEntry.getDefaultInstance) .build + private val errorVersionsTable: TableFor1[ValueSwitch[Status]] = Table[ValueSwitch[Status]]( + "Error Version", + v1ErrorSwitch, + v2ErrorSwitch, + ) + "logEntryToUpdate" should { "throw in case no record time is available from the log entry or input argument" in { - assertThrows[Err]( - logEntryToUpdate(aLogEntryId, aLogEntryWithoutRecordTime, recordTimeForUpdate = None) - ) + forAll( + errorVersionsTable + ) { errorSwitch => + assertThrows[Err]( + logEntryToUpdate( + aLogEntryId, + aLogEntryWithoutRecordTime, + errorSwitch, + recordTimeForUpdate = None, + ) + ) + } } "use log entry's record time instead of one provided as input" in { - val actual :: Nil = logEntryToUpdate( - aLogEntryId, - aLogEntryWithRecordTime, - recordTimeForUpdate = Some(aRecordTime), - ) + forAll( + errorVersionsTable + ) { errorSwitch => + val actual :: Nil = logEntryToUpdate( + aLogEntryId, + aLogEntryWithRecordTime, + errorSwitch, + recordTimeForUpdate = Some(aRecordTime), + ) - actual.recordTime shouldBe aRecordTimeFromLogEntry + actual.recordTime shouldBe aRecordTimeFromLogEntry + } } "use record time from log entry if not provided as input" in { - val actual :: Nil = - logEntryToUpdate(aLogEntryId, aLogEntryWithRecordTime, recordTimeForUpdate = None) + forAll( + errorVersionsTable + ) { errorSwitch => + val actual :: Nil = + logEntryToUpdate( + aLogEntryId, + aLogEntryWithRecordTime, + errorSwitch, + recordTimeForUpdate = None, + ) - actual.recordTime shouldBe Timestamp.assertFromInstant(Instant.ofEpochSecond(100)) + actual.recordTime shouldBe Timestamp.assertFromInstant(Instant.ofEpochSecond(100)) + } } "not generate an update from a time update entry" in { - val timeUpdateEntry = DamlLogEntry.newBuilder - .setRecordTime(Conversions.buildTimestamp(aRecordTime)) - .setTimeUpdateEntry(Empty.getDefaultInstance) - .build - logEntryToUpdate(aLogEntryId, timeUpdateEntry, recordTimeForUpdate = None) shouldBe Nil + forAll( + errorVersionsTable + ) { errorSwitch => + val timeUpdateEntry = DamlLogEntry.newBuilder + .setRecordTime(Conversions.buildTimestamp(aRecordTime)) + .setTimeUpdateEntry(Empty.getDefaultInstance) + .build + logEntryToUpdate( + aLogEntryId, + timeUpdateEntry, + errorSwitch, + recordTimeForUpdate = None, + ) shouldBe Nil + } } } @@ -132,44 +164,48 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers { ), aRecordTime, TRANSACTION_REJECTION_ENTRY, - Assertions(), + Assertions(verify = verifyNoUpdateIsGenerated), ), ( TimeBounds(deduplicateUntil = Some(aRecordTime)), aRecordTime, PACKAGE_UPLOAD_REJECTION_ENTRY, - Assertions(), + Assertions(verify = verifyNoUpdateIsGenerated), ), ( TimeBounds(deduplicateUntil = Some(aRecordTime)), aRecordTime, CONFIGURATION_REJECTION_ENTRY, - Assertions(), + Assertions(verify = verifyNoUpdateIsGenerated), ), ( TimeBounds(deduplicateUntil = Some(aRecordTime)), aRecordTime, PARTY_ALLOCATION_REJECTION_ENTRY, - Assertions(), + Assertions(verify = verifyNoUpdateIsGenerated), ), ) runAll(testCases) } "generate update for deduplicated transaction with definite answer set to true" in { - val inputEntry = buildOutOfTimeBoundsEntry( - TimeBounds(deduplicateUntil = Some(aRecordTime)), - TRANSACTION_REJECTION_ENTRY, - definiteAnswer = Some(true), - ) - val actual = outOfTimeBoundsEntryToUpdate(aRecordTime, inputEntry) - inside(actual) { case Some(CommandRejected(_, _, FinalReason(status))) => - status.code shouldBe Code.ALREADY_EXISTS.value - status.details shouldBe Seq( - AnyProto.pack[ErrorInfo]( - ErrorInfo(metadata = Map(GrpcStatuses.DefiniteAnswerKey -> "true")) - ) + forAll( + errorVersionsTable + ) { errorSwitch => + val inputEntry = buildOutOfTimeBoundsEntry( + TimeBounds(deduplicateUntil = Some(aRecordTime)), + TRANSACTION_REJECTION_ENTRY, + definiteAnswer = Some(true), ) + val actual = outOfTimeBoundsEntryToUpdate(aRecordTime, inputEntry, errorSwitch) + inside(actual) { case Some(CommandRejected(_, _, FinalReason(status))) => + status.code shouldBe Code.ALREADY_EXISTS.value + status.details shouldBe Seq( + AnyProto.pack[ErrorInfo]( + ErrorInfo(metadata = Map(GrpcStatuses.DefiniteAnswerKey -> "true")) + ) + ) + } } } @@ -182,7 +218,6 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers { someSubmitterInfo, ) completionInfo.submissionId shouldBe Some(someSubmitterInfo.getSubmissionId) - status.code shouldBe Code.ABORTED.value status.details shouldBe Seq( AnyProto.pack[ErrorInfo]( ErrorInfo(metadata = Map(GrpcStatuses.DefiniteAnswerKey -> "false")) @@ -191,45 +226,110 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers { () case _ => fail() } + def verifyStatusCode(actual: Option[Update], code: Code): Unit = actual match { + case Some(Update.CommandRejected(_, _, FinalReason(status))) => + status.code shouldBe code.value + () + case _ => fail() + } val testCases = Table( - ("Time Bounds", "Record Time", "Log Entry Type", "Assertions"), + ("Error Version", "Time Bounds", "Record Time", "Log Entry Type", "Assertions"), ( + v1ErrorSwitch, TimeBounds( tooLateFrom = Some(Timestamp.assertFromInstant(aRecordTimeInstant.minusMillis(1))) ), aRecordTime, TRANSACTION_REJECTION_ENTRY, - Assertions(verify = verifyCommandRejection), + Assertions(verify = { update => + verifyCommandRejection(update) + verifyStatusCode(update, Code.ABORTED) + }), ), ( + v1ErrorSwitch, TimeBounds( tooEarlyUntil = Some(Timestamp.assertFromInstant(aRecordTimeInstant.plusMillis(1))) ), aRecordTime, TRANSACTION_REJECTION_ENTRY, - Assertions(verify = verifyCommandRejection), + Assertions(verify = { update => + verifyCommandRejection(update) + verifyStatusCode(update, Code.ABORTED) + }), ), ( + v1ErrorSwitch, TimeBounds(tooLateFrom = Some(aRecordTime)), aRecordTime, TRANSACTION_REJECTION_ENTRY, - Assertions(), + Assertions(verify = verifyNoUpdateIsGenerated), ), ( + v1ErrorSwitch, TimeBounds(tooEarlyUntil = Some(aRecordTime)), aRecordTime, TRANSACTION_REJECTION_ENTRY, - Assertions(), + Assertions(verify = verifyNoUpdateIsGenerated), + ), + ( + v1ErrorSwitch, + TimeBounds( + tooEarlyUntil = Some(Timestamp.assertFromInstant(aRecordTimeInstant.minusMillis(1))), + tooLateFrom = Some(Timestamp.assertFromInstant(aRecordTimeInstant.plusMillis(1))), + ), + aRecordTime, + TRANSACTION_REJECTION_ENTRY, + Assertions(verify = verifyNoUpdateIsGenerated), + ), + ( + v2ErrorSwitch, + TimeBounds( + tooLateFrom = Some(Timestamp.assertFromInstant(aRecordTimeInstant.minusMillis(1))) + ), + aRecordTime, + TRANSACTION_REJECTION_ENTRY, + Assertions(verify = { update => + verifyCommandRejection(update) + verifyStatusCode(update, Code.FAILED_PRECONDITION) + }), + ), + ( + v2ErrorSwitch, + TimeBounds( + tooEarlyUntil = Some(Timestamp.assertFromInstant(aRecordTimeInstant.plusMillis(1))) + ), + aRecordTime, + TRANSACTION_REJECTION_ENTRY, + Assertions(verify = { update => + verifyCommandRejection(update) + verifyStatusCode(update, Code.FAILED_PRECONDITION) + }), + ), + ( + v2ErrorSwitch, + TimeBounds(tooLateFrom = Some(aRecordTime)), + aRecordTime, + TRANSACTION_REJECTION_ENTRY, + Assertions(verify = verifyNoUpdateIsGenerated), + ), + ( + v2ErrorSwitch, + TimeBounds(tooEarlyUntil = Some(aRecordTime)), + aRecordTime, + TRANSACTION_REJECTION_ENTRY, + Assertions(verify = verifyNoUpdateIsGenerated), ), // Record time within time bounds. ( + v2ErrorSwitch, TimeBounds( tooEarlyUntil = Some(Timestamp.assertFromInstant(aRecordTimeInstant.minusMillis(1))), tooLateFrom = Some(Timestamp.assertFromInstant(aRecordTimeInstant.plusMillis(1))), ), aRecordTime, TRANSACTION_REJECTION_ENTRY, - Assertions(), + Assertions(verify = verifyNoUpdateIsGenerated), ), ) runAll(testCases) @@ -278,13 +378,13 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers { TimeBounds(tooLateFrom = Some(aRecordTime)), aRecordTime, CONFIGURATION_REJECTION_ENTRY, - Assertions(), + Assertions(verify = verifyNoUpdateIsGenerated), ), ( TimeBounds(tooEarlyUntil = Some(aRecordTime)), aRecordTime, CONFIGURATION_REJECTION_ENTRY, - Assertions(), + Assertions(verify = verifyNoUpdateIsGenerated), ), // Record time within time bounds. ( @@ -294,7 +394,7 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers { ), aRecordTime, CONFIGURATION_REJECTION_ENTRY, - Assertions(), + Assertions(verify = verifyNoUpdateIsGenerated), ), ) runAll(testCases) @@ -303,10 +403,30 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers { "not generate an update for rejected entries" in { val testCases = Table( ("Time Bounds", "Record Time", "Log Entry Type", "Assertions"), - (TimeBounds(), aRecordTime, TRANSACTION_REJECTION_ENTRY, Assertions()), - (TimeBounds(), aRecordTime, PACKAGE_UPLOAD_REJECTION_ENTRY, Assertions()), - (TimeBounds(), aRecordTime, CONFIGURATION_REJECTION_ENTRY, Assertions()), - (TimeBounds(), aRecordTime, PARTY_ALLOCATION_REJECTION_ENTRY, Assertions()), + ( + TimeBounds(), + aRecordTime, + TRANSACTION_REJECTION_ENTRY, + Assertions(verify = verifyNoUpdateIsGenerated), + ), + ( + TimeBounds(), + aRecordTime, + PACKAGE_UPLOAD_REJECTION_ENTRY, + Assertions(verify = verifyNoUpdateIsGenerated), + ), + ( + TimeBounds(), + aRecordTime, + CONFIGURATION_REJECTION_ENTRY, + Assertions(verify = verifyNoUpdateIsGenerated), + ), + ( + TimeBounds(), + aRecordTime, + PARTY_ALLOCATION_REJECTION_ENTRY, + Assertions(verify = verifyNoUpdateIsGenerated), + ), ) runAll(testCases) } @@ -330,10 +450,13 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers { } private def runAll( - table: TableFor4[TimeBounds, Timestamp, DamlLogEntry.PayloadCase, Assertions] - ): Unit = { + table: TableFor5[ValueSwitch[ + Status + ], TimeBounds, Timestamp, DamlLogEntry.PayloadCase, Assertions] + ): Unit = forAll(table) { ( + errorVersionSwitch: ValueSwitch[Status], timeBounds: TimeBounds, recordTime: Timestamp, logEntryType: DamlLogEntry.PayloadCase, @@ -342,14 +465,36 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers { val inputEntry = buildOutOfTimeBoundsEntry(timeBounds, logEntryType) if (assertions.throwsInternalError) { assertThrows[Err.InternalError]( - outOfTimeBoundsEntryToUpdate(recordTime, inputEntry) + outOfTimeBoundsEntryToUpdate(recordTime, inputEntry, errorVersionSwitch) ) } else { - val actual = outOfTimeBoundsEntryToUpdate(recordTime, inputEntry) + val actual = outOfTimeBoundsEntryToUpdate(recordTime, inputEntry, errorVersionSwitch) assertions.verify(actual) () } } + + private def runAll( + table: TableFor4[TimeBounds, Timestamp, DamlLogEntry.PayloadCase, Assertions] + ): Unit = { + val (head1, head2, head3, head4) = table.heading + runAll( + Table( + heading = ("Error Version", head1, head2, head3, head4), + rows = table.flatMap { + case ( + timeBounds: TimeBounds, + recordTime: Timestamp, + logEntryType: DamlLogEntry.PayloadCase, + assertions: Assertions, + ) => + Seq( + (v1ErrorSwitch, timeBounds, recordTime, logEntryType, assertions), + (v2ErrorSwitch, timeBounds, recordTime, logEntryType, assertions), + ) + }: _*, + ) + ) } private def buildOutOfTimeBoundsEntry( @@ -421,4 +566,11 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers { } builder.build } + + private lazy val v1ErrorSwitch = new ValueSwitch[Status](enableSelfServiceErrorCodes = false) { + override def toString: String = "1" + } + private lazy val v2ErrorSwitch = new ValueSwitch[Status](enableSelfServiceErrorCodes = true) { + override def toString: String = "2" + } } diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReaderSpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReaderSpec.scala index 8eb22aaa0ee2..91cea6e1c488 100644 --- a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReaderSpec.scala +++ b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReaderSpec.scala @@ -6,6 +6,7 @@ package com.daml.ledger.participant.state.kvutils.api import akka.NotUsed import akka.stream.scaladsl.{Sink, Source} import com.codahale.metrics.MetricRegistry +import com.daml.error.ValueSwitch import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll import com.daml.ledger.offset.Offset import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantStateReader.offsetForUpdate @@ -23,6 +24,7 @@ import com.daml.lf.data.Ref import com.daml.lf.data.Time.Timestamp import com.daml.metrics.Metrics import com.google.protobuf.ByteString +import com.google.rpc.status.Status import org.mockito.Mockito.when import org.mockito.MockitoSugar._ import org.scalatest.matchers.should.Matchers @@ -242,26 +244,34 @@ object KeyValueParticipantStateReaderSpec { private val aWrappedLogEntry = Envelope.enclose(aLogEntry) private val zeroUpdateGenerator - : (DamlLogEntryId, DamlLogEntry, Option[Timestamp]) => List[Update] = (_, _, _) => List.empty + : (DamlLogEntryId, DamlLogEntry, ValueSwitch[Status], Option[Timestamp]) => List[Update] = + (_, _, _, _) => List.empty private val singleUpdateGenerator - : (DamlLogEntryId, DamlLogEntry, Option[Timestamp]) => List[Update] = (_, _, _) => - List( - Update.PartyAddedToParticipant( - Ref.Party.assertFromString("aParty"), - "a party", - Ref.ParticipantId.assertFromString("aParticipant"), - Timestamp.now(), - submissionId = None, + : (DamlLogEntryId, DamlLogEntry, ValueSwitch[Status], Option[Timestamp]) => List[Update] = + (_, _, _, _) => + List( + Update.PartyAddedToParticipant( + Ref.Party.assertFromString("aParty"), + "a party", + Ref.ParticipantId.assertFromString("aParticipant"), + Timestamp.now(), + submissionId = None, + ) ) - ) private val twoUpdatesGenerator - : (DamlLogEntryId, DamlLogEntry, Option[Timestamp]) => List[Update] = - (entryId, entry, recordTime) => - singleUpdateGenerator(entryId, entry, recordTime) ::: singleUpdateGenerator( + : (DamlLogEntryId, DamlLogEntry, ValueSwitch[Status], Option[Timestamp]) => List[Update] = + (entryId, entry, errorVersionSwitch, recordTime) => + singleUpdateGenerator( + entryId, + entry, + errorVersionSwitch, + recordTime, + ) ::: singleUpdateGenerator( entryId, entry, + errorVersionSwitch, recordTime, ) @@ -281,13 +291,18 @@ object KeyValueParticipantStateReaderSpec { private def createInstance( reader: LedgerReader, - logEntryToUpdate: (DamlLogEntryId, DamlLogEntry, Option[Timestamp]) => List[Update] = - singleUpdateGenerator, + logEntryToUpdate: ( + DamlLogEntryId, + DamlLogEntry, + ValueSwitch[Status], + Option[Timestamp], + ) => List[Update] = singleUpdateGenerator, failOnUnexpectedEvent: Boolean = true, ): KeyValueParticipantStateReader = new KeyValueParticipantStateReader( reader, new Metrics(new MetricRegistry), + enableSelfServiceErrorCodes = false, logEntryToUpdate, () => None, failOnUnexpectedEvent, diff --git a/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactory.scala b/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactory.scala index 87f32c800fdf..9d01bd6382cb 100644 --- a/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactory.scala +++ b/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactory.scala @@ -65,6 +65,7 @@ final class LogAppendingReadServiceFactory( keyValueSource, metrics, failOnUnexpectedEvent = false, + enableSelfServiceErrorCodes = false, ) new ReplayingReadService { override def updateCount(): Long = recordedBlocksSnapshot.length.toLong diff --git a/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala b/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala index b31e89e4d589..51eb91a48ac9 100644 --- a/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala +++ b/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala @@ -277,7 +277,12 @@ object RecoveringIndexerIntegrationSpec { engine = Engine.DevEngine(), committerExecutionContext = committerExecutionContext, ) - } yield new KeyValueParticipantState(readerWriter, readerWriter, metrics) + } yield new KeyValueParticipantState( + readerWriter, + readerWriter, + metrics, + enableSelfServiceErrorCodes = false, + ) } } diff --git a/ledger/sandbox/src/main/scala/platform/sandboxnext/Runner.scala b/ledger/sandbox/src/main/scala/platform/sandboxnext/Runner.scala index 7724a82a78f1..5977cc7bcc45 100644 --- a/ledger/sandbox/src/main/scala/platform/sandboxnext/Runner.scala +++ b/ledger/sandbox/src/main/scala/platform/sandboxnext/Runner.scala @@ -170,7 +170,11 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] { timeProvider = timeServiceBackend.getOrElse(TimeProvider.UTC), ) readService = new TimedReadService( - KeyValueParticipantStateReader(readerWriter, metrics), + KeyValueParticipantStateReader( + readerWriter, + metrics, + enableSelfServiceErrorCodes = config.enableSelfServiceErrorCodes, + ), metrics, ) writeService = new TimedWriteService(