From 15b88174ae39d96466fe6e14b3a08f6bc0b42ff4 Mon Sep 17 00:00:00 2001 From: Fabio Tudone Date: Wed, 13 Oct 2021 07:33:47 +0200 Subject: [PATCH] Propagate error version switch to KeyValueConsumption CHANGELOG_BEGIN CHANGELOG_END --- .../scala/ledger/indexerbenchmark/Main.scala | 1 + .../on/memory/InMemoryLedgerFactory.scala | 1 + ...oryLedgerReaderWriterIntegrationSpec.scala | 7 ++++- .../daml/ledger/on/sql/SqlLedgerFactory.scala | 9 +++++- ...edgerReaderWriterIntegrationSpecBase.scala | 9 +++++- ledger/participant-state/kvutils/BUILD.bazel | 3 ++ .../state/kvutils/app/LedgerFactory.scala | 1 + .../state/kvutils/KeyValueConsumption.scala | 11 ++++---- .../api/KeyValueParticipantState.scala | 3 +- .../api/KeyValueParticipantStateReader.scala | 11 ++++++-- .../participant/state/kvutils/KVTest.scala | 8 ++++-- .../kvutils/KVUtilsTransactionSpec.scala | 25 ++++++++--------- .../kvutils/KeyValueConsumptionSpec.scala | 28 ++++++++++++++++--- .../KeyValueParticipantStateReaderSpec.scala | 23 ++++++++------- .../LogAppendingReadServiceFactory.scala | 1 + .../RecoveringIndexerIntegrationSpec.scala | 2 +- .../scala/platform/sandboxnext/Runner.scala | 6 +++- 17 files changed, 103 insertions(+), 46 deletions(-) diff --git a/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala b/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala index 169573353399..593cac55451e 100644 --- a/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala +++ b/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala @@ -75,6 +75,7 @@ object Main { keyValueSource, metrics, failOnUnexpectedEvent = false, + enableSelfServiceErrorCodes = false, ) // Note: this method is doing quite a lot of work to transform a sequence of write sets diff --git a/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/InMemoryLedgerFactory.scala b/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/InMemoryLedgerFactory.scala index 35f5e2df0dd1..0f69e51a1188 100644 --- a/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/InMemoryLedgerFactory.scala +++ b/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/InMemoryLedgerFactory.scala @@ -46,6 +46,7 @@ private[memory] class InMemoryLedgerFactory(dispatcher: Dispatcher[Index], state readerWriter, readerWriter, createMetrics(participantConfig, config), + config.enableSelfServiceErrorCodes ) } diff --git a/ledger/ledger-on-memory/src/test/suite/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriterIntegrationSpec.scala b/ledger/ledger-on-memory/src/test/suite/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriterIntegrationSpec.scala index 8be303743b4e..59913bcc0129 100644 --- a/ledger/ledger-on-memory/src/test/suite/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriterIntegrationSpec.scala +++ b/ledger/ledger-on-memory/src/test/suite/scala/com/daml/ledger/on/memory/InMemoryLedgerReaderWriterIntegrationSpec.scala @@ -44,6 +44,11 @@ class InMemoryLedgerReaderWriterIntegrationSpec engine = Engine.DevEngine(), committerExecutionContext = committerExecutionContext, ) - } yield new KeyValueParticipantState(readerWriter, readerWriter, metrics) + } yield new KeyValueParticipantState( + readerWriter, + readerWriter, + metrics, + enableSelfServiceErrorCodes = false, + ) } diff --git a/ledger/ledger-on-sql/src/app/scala/com/daml/ledger/on/sql/SqlLedgerFactory.scala b/ledger/ledger-on-sql/src/app/scala/com/daml/ledger/on/sql/SqlLedgerFactory.scala index 872f03c4ec9f..e6e460256d2f 100644 --- a/ledger/ledger-on-sql/src/app/scala/com/daml/ledger/on/sql/SqlLedgerFactory.scala +++ b/ledger/ledger-on-sql/src/app/scala/com/daml/ledger/on/sql/SqlLedgerFactory.scala @@ -77,7 +77,14 @@ object SqlLedgerFactory extends LedgerFactory[ReadWriteService, ExtraConfig] { metrics = metrics.daml.kvutils.submission.validator.stateValueCache, ), ).acquire() - .map(readerWriter => new KeyValueParticipantState(readerWriter, readerWriter, metrics)) + .map(readerWriter => + new KeyValueParticipantState( + readerWriter, + readerWriter, + metrics, + enableSelfServiceErrorCodes = config.enableSelfServiceErrorCodes, + ) + ) } } } diff --git a/ledger/ledger-on-sql/src/test/lib/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriterIntegrationSpecBase.scala b/ledger/ledger-on-sql/src/test/lib/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriterIntegrationSpecBase.scala index 73eabe3a7e9d..339a0691ff95 100644 --- a/ledger/ledger-on-sql/src/test/lib/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriterIntegrationSpecBase.scala +++ b/ledger/ledger-on-sql/src/test/lib/scala/com/daml/ledger/on/sql/SqlLedgerReaderWriterIntegrationSpecBase.scala @@ -33,5 +33,12 @@ abstract class SqlLedgerReaderWriterIntegrationSpecBase(implementationName: Stri jdbcUrl = jdbcUrl(testId), resetOnStartup = false, logEntryIdAllocator = RandomLogEntryIdAllocator, - ).map(readerWriter => new KeyValueParticipantState(readerWriter, readerWriter, metrics)) + ).map(readerWriter => + new KeyValueParticipantState( + readerWriter, + readerWriter, + metrics, + enableSelfServiceErrorCodes = false, + ) + ) } diff --git a/ledger/participant-state/kvutils/BUILD.bazel b/ledger/participant-state/kvutils/BUILD.bazel index 7417b41e9750..2228eb8908d3 100644 --- a/ledger/participant-state/kvutils/BUILD.bazel +++ b/ledger/participant-state/kvutils/BUILD.bazel @@ -52,6 +52,7 @@ da_scala_library( "//daml-lf/transaction:value_proto_java", "//ledger-api/grpc-definitions:ledger_api_proto_scala", "//ledger/caching", + "//ledger/error", "//ledger/ledger-api-domain", "//ledger/ledger-api-health", "//ledger/ledger-configuration", @@ -117,6 +118,7 @@ da_scala_library( "//ledger-api/grpc-definitions:ledger_api_proto_scala", "//ledger-api/rs-grpc-bridge", "//ledger-api/testing-utils", + "//ledger/error", "//ledger/ledger-api-common", "//ledger/ledger-api-domain", "//ledger/ledger-api-health", @@ -186,6 +188,7 @@ da_scala_test_suite( "//ledger-api/rs-grpc-bridge", "//ledger-api/testing-utils", "//ledger/caching", + "//ledger/error", "//ledger/ledger-api-common", "//ledger/ledger-api-domain", "//ledger/ledger-api-health", diff --git a/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/LedgerFactory.scala b/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/LedgerFactory.scala index 1843405b8cd9..8c51dbb68ae0 100644 --- a/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/LedgerFactory.scala +++ b/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/LedgerFactory.scala @@ -187,6 +187,7 @@ object LedgerFactory { readerWriter, readerWriter, createMetrics(participantConfig, config), + config.enableSelfServiceErrorCodes, ) def owner( diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala index aa34c26bc28c..ef20d05ceb2a 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala @@ -3,6 +3,7 @@ package com.daml.ledger.participant.state.kvutils +import com.daml.error.ValueSwitch import com.daml.ledger.configuration.Configuration import com.daml.ledger.grpc.GrpcStatuses import com.daml.ledger.participant.state.kvutils.Conversions._ @@ -13,12 +14,7 @@ import com.daml.ledger.participant.state.kvutils.store.events.{ DamlTransactionEntry, DamlTransactionRejectionEntry, } -import com.daml.ledger.participant.state.kvutils.store.{ - DamlLogEntry, - DamlLogEntryId, - DamlOutOfTimeBoundsEntry, - DamlStateKey, -} +import com.daml.ledger.participant.state.kvutils.store.{DamlLogEntry, DamlLogEntryId, DamlOutOfTimeBoundsEntry, DamlStateKey} import com.daml.ledger.participant.state.v2.Update.CommandRejected.FinalReason import com.daml.ledger.participant.state.v2.{DivulgedContract, TransactionMeta, Update} import com.daml.lf.data.Ref @@ -33,6 +29,7 @@ import com.google.rpc.error_details.ErrorInfo import com.google.rpc.status.Status import org.slf4j.LoggerFactory +import scala.annotation.nowarn import scala.jdk.CollectionConverters._ /** Utilities for producing [[Update]] events from [[DamlLogEntry]]'s committed to a @@ -51,6 +48,7 @@ object KeyValueConsumption { * * @param entryId: The log entry identifier. * @param entry: The log entry. + * @param errorVersionSwitch: Decides between v1 and v2 (self-service) errors. * @return [[Update]]s constructed from log entry. */ // TODO(BH): add participantId to ensure participant id matches in DamlLogEntry @@ -58,6 +56,7 @@ object KeyValueConsumption { def logEntryToUpdate( entryId: DamlLogEntryId, entry: DamlLogEntry, + @nowarn("msg=parameter value errorVersionSwitch.* is never used") errorVersionSwitch: ValueSwitch[Status], recordTimeForUpdate: Option[Timestamp] = None, ): List[Update] = { val recordTimeFromLogEntry = PartialFunction.condOpt(entry.hasRecordTime) { case true => diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantState.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantState.scala index c76d2cd3c584..b26c596a12f0 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantState.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantState.scala @@ -40,10 +40,11 @@ class KeyValueParticipantState( reader: LedgerReader, writer: LedgerWriter, metrics: Metrics, + enableSelfServiceErrorCodes: Boolean, ) extends ReadService with WriteService { private val readerAdapter = - KeyValueParticipantStateReader(reader, metrics) + KeyValueParticipantStateReader(reader, metrics, enableSelfServiceErrorCodes) private val writerAdapter = new KeyValueParticipantStateWriter( new TimedLedgerWriter(writer, metrics), diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReader.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReader.scala index c589de415089..1796943556ae 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReader.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReader.scala @@ -5,6 +5,7 @@ package com.daml.ledger.participant.state.kvutils.api import akka.NotUsed import akka.stream.scaladsl.Source +import com.daml.error.ValueSwitch import com.daml.ledger.api.health.HealthStatus import com.daml.ledger.configuration.LedgerInitialConditions import com.daml.ledger.offset.Offset @@ -15,6 +16,7 @@ import com.daml.ledger.validator.preexecution.TimeUpdatesProvider import com.daml.lf.data.Time import com.daml.lf.data.Time.Timestamp import com.daml.metrics.{Metrics, Timed} +import com.google.rpc.status.Status /** Adapts a [[LedgerReader]] instance to [[ReadService]]. * Performs translation between the offsets required by the underlying reader and [[ReadService]]: @@ -29,12 +31,15 @@ import com.daml.metrics.{Metrics, Timed} class KeyValueParticipantStateReader private[api] ( reader: LedgerReader, metrics: Metrics, - logEntryToUpdate: (DamlLogEntryId, DamlLogEntry, Option[Timestamp]) => List[Update], + enableSelfServiceErrorCodes: Boolean, + logEntryToUpdate: (DamlLogEntryId, DamlLogEntry, ValueSwitch[Status], Option[Timestamp]) => List[Update], timeUpdatesProvider: TimeUpdatesProvider, failOnUnexpectedEvent: Boolean, ) extends ReadService { import KeyValueParticipantStateReader._ + private val errorVersionSwitch = new ValueSwitch[Status](enableSelfServiceErrorCodes) + override def ledgerInitialConditions(): Source[LedgerInitialConditions, NotUsed] = Source.single(createLedgerInitialConditions()) @@ -51,7 +56,7 @@ class KeyValueParticipantStateReader private[api] ( metrics.daml.kvutils.reader.parseUpdates, { val logEntryId = DamlLogEntryId.parseFrom(entryId.bytes) val updates = - logEntryToUpdate(logEntryId, logEntry, timeUpdatesProvider()) + logEntryToUpdate(logEntryId, logEntry, errorVersionSwitch, timeUpdatesProvider()) val updatesWithOffsets = Source(updates).zipWithIndex.map { case (update, index) => offsetForUpdate(offset, index.toInt, updates.size) -> update @@ -86,12 +91,14 @@ object KeyValueParticipantStateReader { def apply( reader: LedgerReader, metrics: Metrics, + enableSelfServiceErrorCodes: Boolean, timeUpdatesProvider: TimeUpdatesProvider = TimeUpdatesProvider.ReasonableDefault, failOnUnexpectedEvent: Boolean = true, ): KeyValueParticipantStateReader = new KeyValueParticipantStateReader( reader, metrics, + enableSelfServiceErrorCodes, KeyValueConsumption.logEntryToUpdate, timeUpdatesProvider, failOnUnexpectedEvent, diff --git a/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/KVTest.scala b/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/KVTest.scala index a573e90c38a1..294c3b455452 100644 --- a/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/KVTest.scala +++ b/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/KVTest.scala @@ -4,9 +4,9 @@ package com.daml.ledger.participant.state.kvutils import java.time.Duration - import com.codahale.metrics.MetricRegistry import com.daml.daml_lf_dev.DamlLf +import com.daml.error.ValueSwitch import com.daml.ledger.api.DeduplicationPeriod import com.daml.ledger.configuration.{Configuration, LedgerTimeModel} import com.daml.ledger.participant.state.kvutils.KeyValueCommitting.PreExecutionResult @@ -63,6 +63,8 @@ object KVTest { ) private[kvutils] val metrics = new Metrics(new MetricRegistry) + private[kvutils] val errorVersionSwitch = + new ValueSwitch[com.google.rpc.status.Status](enableSelfServiceErrorCodes = false) private def initialTestState: KVTestState = { val engine = Engine.DevEngine() @@ -383,7 +385,7 @@ object KVTest { newState.keySet subsetOf KeyValueCommitting.submissionOutputs(submission) ) // Verify that we can always process the log entry. - val _ = KeyValueConsumption.logEntryToUpdate(entryId, logEntry) + val _ = KeyValueConsumption.logEntryToUpdate(entryId, logEntry, errorVersionSwitch) entryId -> logEntry } @@ -413,11 +415,13 @@ object KVTest { KeyValueConsumption.logEntryToUpdate( entryId, successfulLogEntry, + errorVersionSwitch, recordTimeFromTimeUpdateLogEntry, ) KeyValueConsumption.logEntryToUpdate( entryId, outOfTimeBoundsLogEntry, + errorVersionSwitch, recordTimeFromTimeUpdateLogEntry, ) diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KVUtilsTransactionSpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KVUtilsTransactionSpec.scala index 7937ee6e2028..0741c5c0d40f 100644 --- a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KVUtilsTransactionSpec.scala +++ b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KVUtilsTransactionSpec.scala @@ -3,8 +3,9 @@ package com.daml.ledger.participant.state.kvutils -import java.time.Duration +import com.daml.error.ValueSwitch +import java.time.Duration import com.daml.ledger.participant.state.kvutils.TestHelpers._ import com.daml.ledger.participant.state.kvutils.store.events.DamlTransactionRejectionEntry import com.daml.ledger.participant.state.kvutils.store.{DamlLogEntry, DamlStateValue} @@ -17,16 +18,7 @@ import com.daml.lf.crypto.Hash import com.daml.lf.data.{FrontStack, Ref, SortedLookupList} import com.daml.lf.transaction.Node.NodeCreate import com.daml.lf.value.Value -import com.daml.lf.value.Value.{ - ContractId, - ValueList, - ValueOptional, - ValueParty, - ValueRecord, - ValueTextMap, - ValueUnit, - ValueVariant, -} +import com.daml.lf.value.Value.{ContractId, ValueList, ValueOptional, ValueParty, ValueRecord, ValueTextMap, ValueUnit, ValueVariant} import com.daml.logging.LoggingContext import org.scalatest.Inside import org.scalatest.matchers.should.Matchers @@ -41,6 +33,9 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside { private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting + private val errorVersionSwitch = + new ValueSwitch[com.google.rpc.status.Status](enableSelfServiceErrorCodes = false) + private val alice = party("Alice") private val bob = party("Bob") private val eve = party("Eve") @@ -95,7 +90,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside { ) (entryId, logEntry) = result contractId = contractIdOfCreateTransaction( - KeyValueConsumption.logEntryToUpdate(entryId, logEntry) + KeyValueConsumption.logEntryToUpdate(entryId, logEntry, errorVersionSwitch) ) transaction2 <- runSimpleCommand( @@ -219,6 +214,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside { KeyValueConsumption.logEntryToUpdate( entryId, preExecutionResult.successfulLogEntry, + errorVersionSwitch, Some(recordTime), ) ) @@ -303,7 +299,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside { ) (entryId, logEntry) = result contractId = contractIdOfCreateTransaction( - KeyValueConsumption.logEntryToUpdate(entryId, logEntry) + KeyValueConsumption.logEntryToUpdate(entryId, logEntry, errorVersionSwitch) ) transaction2 <- runSimpleCommand( @@ -519,7 +515,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside { strippedEntry.getTransactionEntryBuilder.clearSubmitterInfo // Process into updates and verify - val updates = KeyValueConsumption.logEntryToUpdate(entryId, strippedEntry.build) + val updates = KeyValueConsumption.logEntryToUpdate(entryId, strippedEntry.build, errorVersionSwitch) inside(updates) { case Seq(txAccepted: Update.TransactionAccepted) => txAccepted.optCompletionInfo should be(None) } @@ -573,6 +569,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside { KeyValueConsumption.logEntryToUpdate( entryId, preExecutionResult.successfulLogEntry, + errorVersionSwitch, Some(recordTime), ) ) diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumptionSpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumptionSpec.scala index 4e44bfaf080f..96b115e92cb9 100644 --- a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumptionSpec.scala +++ b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumptionSpec.scala @@ -3,8 +3,9 @@ package com.daml.ledger.participant.state.kvutils -import java.time.Instant +import com.daml.error.ValueSwitch +import java.time.Instant import com.daml.ledger.configuration.Configuration import com.daml.ledger.grpc.GrpcStatuses import com.daml.ledger.participant.state.kvutils.Conversions.{buildTimestamp, parseInstant} @@ -39,6 +40,7 @@ import com.google.protobuf.any.{Any => AnyProto} import com.google.protobuf.{ByteString, Empty} import com.google.rpc.code.Code import com.google.rpc.error_details.ErrorInfo +import com.google.rpc.status.Status import org.scalatest.Inside.inside import org.scalatest.matchers.should.Matchers import org.scalatest.prop.TableDrivenPropertyChecks._ @@ -47,6 +49,8 @@ import org.scalatest.prop.Tables.Table import org.scalatest.wordspec.AnyWordSpec class KeyValueConsumptionSpec extends AnyWordSpec with Matchers { + private val errorVersionSwitch = new ValueSwitch[Status](enableSelfServiceErrorCodes = false) + private val aLogEntryIdString = "test" private val aLogEntryId = DamlLogEntryId.newBuilder().setEntryId(ByteString.copyFromUtf8(aLogEntryIdString)).build() @@ -64,7 +68,12 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers { "logEntryToUpdate" should { "throw in case no record time is available from the log entry or input argument" in { assertThrows[Err]( - logEntryToUpdate(aLogEntryId, aLogEntryWithoutRecordTime, recordTimeForUpdate = None) + logEntryToUpdate( + aLogEntryId, + aLogEntryWithoutRecordTime, + errorVersionSwitch, + recordTimeForUpdate = None, + ) ) } @@ -72,6 +81,7 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers { val actual :: Nil = logEntryToUpdate( aLogEntryId, aLogEntryWithRecordTime, + errorVersionSwitch, recordTimeForUpdate = Some(aRecordTime), ) @@ -80,7 +90,12 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers { "use record time from log entry if not provided as input" in { val actual :: Nil = - logEntryToUpdate(aLogEntryId, aLogEntryWithRecordTime, recordTimeForUpdate = None) + logEntryToUpdate( + aLogEntryId, + aLogEntryWithRecordTime, + errorVersionSwitch, + recordTimeForUpdate = None, + ) actual.recordTime shouldBe Timestamp.assertFromInstant(Instant.ofEpochSecond(100)) } @@ -90,7 +105,12 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers { .setRecordTime(Conversions.buildTimestamp(aRecordTime)) .setTimeUpdateEntry(Empty.getDefaultInstance) .build - logEntryToUpdate(aLogEntryId, timeUpdateEntry, recordTimeForUpdate = None) shouldBe Nil + logEntryToUpdate( + aLogEntryId, + timeUpdateEntry, + errorVersionSwitch, + recordTimeForUpdate = None, + ) shouldBe Nil } } diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReaderSpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReaderSpec.scala index f2d58a1c8455..a74b016207d1 100644 --- a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReaderSpec.scala +++ b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReaderSpec.scala @@ -6,23 +6,20 @@ package com.daml.ledger.participant.state.kvutils.api import akka.NotUsed import akka.stream.scaladsl.{Sink, Source} import com.codahale.metrics.MetricRegistry +import com.daml.error.ValueSwitch import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll import com.daml.ledger.offset.Offset import com.daml.ledger.participant.state.kvutils.DamlKvutils._ import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantStateReader.offsetForUpdate import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantStateReaderSpec._ -import com.daml.ledger.participant.state.kvutils.store.{ - DamlLogEntry, - DamlLogEntryId, - DamlPartyAllocation, - DamlStateValue, -} +import com.daml.ledger.participant.state.kvutils.store.{DamlLogEntry, DamlLogEntryId, DamlPartyAllocation, DamlStateValue} import com.daml.ledger.participant.state.kvutils.{Envelope, OffsetBuilder, Raw} import com.daml.ledger.participant.state.v2.Update import com.daml.lf.data.Ref import com.daml.lf.data.Time.Timestamp import com.daml.metrics.Metrics import com.google.protobuf.ByteString +import com.google.rpc.status.Status import org.mockito.Mockito.when import org.mockito.MockitoSugar._ import org.scalatest.matchers.should.Matchers @@ -245,10 +242,10 @@ object KeyValueParticipantStateReaderSpec { private val aWrappedLogEntry = Envelope.enclose(aLogEntry) private val zeroUpdateGenerator - : (DamlLogEntryId, DamlLogEntry, Option[Timestamp]) => List[Update] = (_, _, _) => List.empty + : (DamlLogEntryId, DamlLogEntry, ValueSwitch[Status], Option[Timestamp]) => List[Update] = (_, _, _, _) => List.empty private val singleUpdateGenerator - : (DamlLogEntryId, DamlLogEntry, Option[Timestamp]) => List[Update] = (_, _, _) => + : (DamlLogEntryId, DamlLogEntry, ValueSwitch[Status], Option[Timestamp]) => List[Update] = (_, _, _, _) => List( Update.PartyAddedToParticipant( Ref.Party.assertFromString("aParty"), @@ -260,11 +257,12 @@ object KeyValueParticipantStateReaderSpec { ) private val twoUpdatesGenerator - : (DamlLogEntryId, DamlLogEntry, Option[Timestamp]) => List[Update] = - (entryId, entry, recordTime) => - singleUpdateGenerator(entryId, entry, recordTime) ::: singleUpdateGenerator( + : (DamlLogEntryId, DamlLogEntry, ValueSwitch[Status], Option[Timestamp]) => List[Update] = + (entryId, entry, errorVersionSwitch, recordTime) => + singleUpdateGenerator(entryId, entry, errorVersionSwitch, recordTime) ::: singleUpdateGenerator( entryId, entry, + errorVersionSwitch, recordTime, ) @@ -284,13 +282,14 @@ object KeyValueParticipantStateReaderSpec { private def createInstance( reader: LedgerReader, - logEntryToUpdate: (DamlLogEntryId, DamlLogEntry, Option[Timestamp]) => List[Update] = + logEntryToUpdate: (DamlLogEntryId, DamlLogEntry, ValueSwitch[Status], Option[Timestamp]) => List[Update] = singleUpdateGenerator, failOnUnexpectedEvent: Boolean = true, ): KeyValueParticipantStateReader = new KeyValueParticipantStateReader( reader, new Metrics(new MetricRegistry), + enableSelfServiceErrorCodes = false, logEntryToUpdate, () => None, failOnUnexpectedEvent, diff --git a/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactory.scala b/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactory.scala index 14c5d2f76a28..f1acd9cf8902 100644 --- a/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactory.scala +++ b/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactory.scala @@ -64,6 +64,7 @@ final class LogAppendingReadServiceFactory( keyValueSource, metrics, failOnUnexpectedEvent = false, + enableSelfServiceErrorCodes = false, ) new ReplayingReadService { override def updateCount(): Long = recordedBlocksSnapshot.length.toLong diff --git a/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala b/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala index b3fb43314adf..76029b03f69d 100644 --- a/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala +++ b/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala @@ -276,7 +276,7 @@ object RecoveringIndexerIntegrationSpec { engine = Engine.DevEngine(), committerExecutionContext = committerExecutionContext, ) - } yield new KeyValueParticipantState(readerWriter, readerWriter, metrics) + } yield new KeyValueParticipantState(readerWriter, readerWriter, metrics, enableSelfServiceErrorCodes = false) } } diff --git a/ledger/sandbox/src/main/scala/platform/sandboxnext/Runner.scala b/ledger/sandbox/src/main/scala/platform/sandboxnext/Runner.scala index f4b54c77a5f0..4ff37739ae28 100644 --- a/ledger/sandbox/src/main/scala/platform/sandboxnext/Runner.scala +++ b/ledger/sandbox/src/main/scala/platform/sandboxnext/Runner.scala @@ -169,7 +169,11 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] { timeProvider = timeServiceBackend.getOrElse(TimeProvider.UTC), ) readService = new TimedReadService( - KeyValueParticipantStateReader(readerWriter, metrics), + KeyValueParticipantStateReader( + readerWriter, + metrics, + enableSelfServiceErrorCodes = config.enableSelfServiceErrorCodes, + ), metrics, ) writeService = new TimedWriteService(