Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KV: introduce v2 error codes behind a CLI switch [KVL-1140] #11224

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ object Main {
keyValueSource,
metrics,
failOnUnexpectedEvent = false,
enableSelfServiceErrorCodes = false,
)

// Note: this method is doing quite a lot of work to transform a sequence of write sets
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ private[memory] class InMemoryLedgerFactory(dispatcher: Dispatcher[Index], state
readerWriter,
readerWriter,
createMetrics(participantConfig, config),
config.enableSelfServiceErrorCodes,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ class InMemoryLedgerReaderWriterIntegrationSpec
engine = Engine.DevEngine(),
committerExecutionContext = committerExecutionContext,
)
} yield new KeyValueParticipantState(readerWriter, readerWriter, metrics)
} yield new KeyValueParticipantState(
readerWriter,
readerWriter,
metrics,
enableSelfServiceErrorCodes = false,
)

}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,14 @@ object SqlLedgerFactory extends LedgerFactory[ReadWriteService, ExtraConfig] {
metrics = metrics.daml.kvutils.submission.validator.stateValueCache,
),
).acquire()
.map(readerWriter => new KeyValueParticipantState(readerWriter, readerWriter, metrics))
.map(readerWriter =>
new KeyValueParticipantState(
readerWriter,
readerWriter,
metrics,
enableSelfServiceErrorCodes = config.enableSelfServiceErrorCodes,
)
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,12 @@ abstract class SqlLedgerReaderWriterIntegrationSpecBase(implementationName: Stri
jdbcUrl = jdbcUrl(testId),
resetOnStartup = false,
logEntryIdAllocator = RandomLogEntryIdAllocator,
).map(readerWriter => new KeyValueParticipantState(readerWriter, readerWriter, metrics))
).map(readerWriter =>
new KeyValueParticipantState(
readerWriter,
readerWriter,
metrics,
enableSelfServiceErrorCodes = false,
)
)
}
3 changes: 3 additions & 0 deletions ledger/participant-state/kvutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ da_scala_library(
"//daml-lf/transaction:value_proto_java",
"//ledger-api/grpc-definitions:ledger_api_proto_scala",
"//ledger/caching",
"//ledger/error",
"//ledger/ledger-api-domain",
"//ledger/ledger-api-health",
"//ledger/ledger-configuration",
Expand Down Expand Up @@ -117,6 +118,7 @@ da_scala_library(
"//ledger-api/grpc-definitions:ledger_api_proto_scala",
"//ledger-api/rs-grpc-bridge",
"//ledger-api/testing-utils",
"//ledger/error",
"//ledger/ledger-api-common",
"//ledger/ledger-api-domain",
"//ledger/ledger-api-health",
Expand Down Expand Up @@ -186,6 +188,7 @@ da_scala_test_suite(
"//ledger-api/rs-grpc-bridge",
"//ledger-api/testing-utils",
"//ledger/caching",
"//ledger/error",
"//ledger/ledger-api-common",
"//ledger/ledger-api-domain",
"//ledger/ledger-api-health",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ object LedgerFactory {
readerWriter,
readerWriter,
createMetrics(participantConfig, config),
config.enableSelfServiceErrorCodes,
)

def owner(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.daml.ledger.participant.state.kvutils

import com.daml.error.ValueSwitch
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.grpc.GrpcStatuses
import com.daml.ledger.participant.state.kvutils.Conversions._
Expand Down Expand Up @@ -33,6 +34,7 @@ import com.google.rpc.error_details.ErrorInfo
import com.google.rpc.status.Status
import org.slf4j.LoggerFactory

import scala.annotation.nowarn
import scala.jdk.CollectionConverters._

/** Utilities for producing [[Update]] events from [[DamlLogEntry]]'s committed to a
Expand All @@ -51,13 +53,17 @@ object KeyValueConsumption {
*
* @param entryId: The log entry identifier.
* @param entry: The log entry.
* @param errorVersionSwitch: Decides between v1 and v2 (self-service) errors.
* @return [[Update]]s constructed from log entry.
*/
// TODO(BH): add participantId to ensure participant id matches in DamlLogEntry
@throws(classOf[Err])
def logEntryToUpdate(
entryId: DamlLogEntryId,
entry: DamlLogEntry,
@nowarn(
"msg=parameter value errorVersionSwitch.* is never used"
) errorVersionSwitch: ValueSwitch[Status],
recordTimeForUpdate: Option[Timestamp] = None,
): List[Update] = {
val recordTimeFromLogEntry = PartialFunction.condOpt(entry.hasRecordTime) { case true =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ class KeyValueParticipantState(
reader: LedgerReader,
writer: LedgerWriter,
metrics: Metrics,
enableSelfServiceErrorCodes: Boolean,
) extends ReadService
with WriteService {
private val readerAdapter =
KeyValueParticipantStateReader(reader, metrics)
KeyValueParticipantStateReader(reader, metrics, enableSelfServiceErrorCodes)
private val writerAdapter =
new KeyValueParticipantStateWriter(
new TimedLedgerWriter(writer, metrics),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package com.daml.ledger.participant.state.kvutils.api

import akka.NotUsed
import akka.stream.scaladsl.Source
import com.daml.error.ValueSwitch
import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.configuration.LedgerInitialConditions
import com.daml.ledger.offset.Offset
Expand All @@ -15,6 +16,7 @@ import com.daml.ledger.validator.preexecution.TimeUpdatesProvider
import com.daml.lf.data.Time
import com.daml.lf.data.Time.Timestamp
import com.daml.metrics.{Metrics, Timed}
import com.google.rpc.status.Status

/** Adapts a [[LedgerReader]] instance to [[ReadService]].
* Performs translation between the offsets required by the underlying reader and [[ReadService]]:
Expand All @@ -29,12 +31,20 @@ import com.daml.metrics.{Metrics, Timed}
class KeyValueParticipantStateReader private[api] (
reader: LedgerReader,
metrics: Metrics,
logEntryToUpdate: (DamlLogEntryId, DamlLogEntry, Option[Timestamp]) => List[Update],
enableSelfServiceErrorCodes: Boolean,
logEntryToUpdate: (
DamlLogEntryId,
DamlLogEntry,
ValueSwitch[Status],
Option[Timestamp],
) => List[Update],
Comment on lines +33 to +38
Copy link
Contributor Author

@fabiotudone-da fabiotudone-da Oct 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not pretty but, considering the switch should be short-lived, I wouldn't re-architect (e.g. making it stateful) in this PR.

timeUpdatesProvider: TimeUpdatesProvider,
failOnUnexpectedEvent: Boolean,
) extends ReadService {
import KeyValueParticipantStateReader._

private val errorVersionSwitch = new ValueSwitch[Status](enableSelfServiceErrorCodes)

override def ledgerInitialConditions(): Source[LedgerInitialConditions, NotUsed] =
Source.single(createLedgerInitialConditions())

Expand All @@ -51,7 +61,12 @@ class KeyValueParticipantStateReader private[api] (
metrics.daml.kvutils.reader.parseUpdates, {
val logEntryId = DamlLogEntryId.parseFrom(entryId.bytes)
val updates =
logEntryToUpdate(logEntryId, logEntry, timeUpdatesProvider())
logEntryToUpdate(
logEntryId,
logEntry,
errorVersionSwitch,
timeUpdatesProvider(),
)
val updatesWithOffsets =
Source(updates).zipWithIndex.map { case (update, index) =>
offsetForUpdate(offset, index.toInt, updates.size) -> update
Expand Down Expand Up @@ -86,12 +101,14 @@ object KeyValueParticipantStateReader {
def apply(
reader: LedgerReader,
metrics: Metrics,
enableSelfServiceErrorCodes: Boolean,
timeUpdatesProvider: TimeUpdatesProvider = TimeUpdatesProvider.ReasonableDefault,
failOnUnexpectedEvent: Boolean = true,
): KeyValueParticipantStateReader =
new KeyValueParticipantStateReader(
reader,
metrics,
enableSelfServiceErrorCodes,
KeyValueConsumption.logEntryToUpdate,
timeUpdatesProvider,
failOnUnexpectedEvent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
package com.daml.ledger.participant.state.kvutils

import java.time.Duration

import com.codahale.metrics.MetricRegistry
import com.daml.daml_lf_dev.DamlLf
import com.daml.error.ValueSwitch
import com.daml.ledger.api.DeduplicationPeriod
import com.daml.ledger.configuration.{Configuration, LedgerTimeModel}
import com.daml.ledger.participant.state.kvutils.KeyValueCommitting.PreExecutionResult
Expand Down Expand Up @@ -63,6 +63,8 @@ object KVTest {
)

private[kvutils] val metrics = new Metrics(new MetricRegistry)
private[kvutils] val errorVersionSwitch =
new ValueSwitch[com.google.rpc.status.Status](enableSelfServiceErrorCodes = false)

private def initialTestState: KVTestState = {
val engine = Engine.DevEngine()
Expand Down Expand Up @@ -383,7 +385,7 @@ object KVTest {
newState.keySet subsetOf KeyValueCommitting.submissionOutputs(submission)
)
// Verify that we can always process the log entry.
val _ = KeyValueConsumption.logEntryToUpdate(entryId, logEntry)
val _ = KeyValueConsumption.logEntryToUpdate(entryId, logEntry, errorVersionSwitch)

entryId -> logEntry
}
Expand Down Expand Up @@ -413,11 +415,13 @@ object KVTest {
KeyValueConsumption.logEntryToUpdate(
entryId,
successfulLogEntry,
errorVersionSwitch,
recordTimeFromTimeUpdateLogEntry,
)
KeyValueConsumption.logEntryToUpdate(
entryId,
outOfTimeBoundsLogEntry,
errorVersionSwitch,
recordTimeFromTimeUpdateLogEntry,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@

package com.daml.ledger.participant.state.kvutils

import java.time.Duration
import com.daml.error.ValueSwitch

import java.time.Duration
import com.daml.ledger.participant.state.kvutils.TestHelpers._
import com.daml.ledger.participant.state.kvutils.store.events.DamlTransactionRejectionEntry
import com.daml.ledger.participant.state.kvutils.store.{DamlLogEntry, DamlStateValue}
Expand Down Expand Up @@ -41,6 +42,9 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {

private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting

private val errorVersionSwitch =
new ValueSwitch[com.google.rpc.status.Status](enableSelfServiceErrorCodes = false)

private val alice = party("Alice")
private val bob = party("Bob")
private val eve = party("Eve")
Expand Down Expand Up @@ -95,7 +99,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
)
(entryId, logEntry) = result
contractId = contractIdOfCreateTransaction(
KeyValueConsumption.logEntryToUpdate(entryId, logEntry)
KeyValueConsumption.logEntryToUpdate(entryId, logEntry, errorVersionSwitch)
)

transaction2 <- runSimpleCommand(
Expand Down Expand Up @@ -219,6 +223,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
KeyValueConsumption.logEntryToUpdate(
entryId,
preExecutionResult.successfulLogEntry,
errorVersionSwitch,
Some(recordTime),
)
)
Expand Down Expand Up @@ -303,7 +308,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
)
(entryId, logEntry) = result
contractId = contractIdOfCreateTransaction(
KeyValueConsumption.logEntryToUpdate(entryId, logEntry)
KeyValueConsumption.logEntryToUpdate(entryId, logEntry, errorVersionSwitch)
)

transaction2 <- runSimpleCommand(
Expand Down Expand Up @@ -519,7 +524,8 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
strippedEntry.getTransactionEntryBuilder.clearSubmitterInfo

// Process into updates and verify
val updates = KeyValueConsumption.logEntryToUpdate(entryId, strippedEntry.build)
val updates =
KeyValueConsumption.logEntryToUpdate(entryId, strippedEntry.build, errorVersionSwitch)
inside(updates) { case Seq(txAccepted: Update.TransactionAccepted) =>
txAccepted.optCompletionInfo should be(None)
}
Expand Down Expand Up @@ -573,6 +579,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
KeyValueConsumption.logEntryToUpdate(
entryId,
preExecutionResult.successfulLogEntry,
errorVersionSwitch,
Some(recordTime),
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@

package com.daml.ledger.participant.state.kvutils

import java.time.Instant
import com.daml.error.ValueSwitch

import java.time.Instant
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.grpc.GrpcStatuses
import com.daml.ledger.participant.state.kvutils.Conversions.{buildTimestamp, parseInstant}
Expand Down Expand Up @@ -39,6 +40,7 @@ import com.google.protobuf.any.{Any => AnyProto}
import com.google.protobuf.{ByteString, Empty}
import com.google.rpc.code.Code
import com.google.rpc.error_details.ErrorInfo
import com.google.rpc.status.Status
import org.scalatest.Inside.inside
import org.scalatest.matchers.should.Matchers
import org.scalatest.prop.TableDrivenPropertyChecks._
Expand All @@ -47,6 +49,8 @@ import org.scalatest.prop.Tables.Table
import org.scalatest.wordspec.AnyWordSpec

class KeyValueConsumptionSpec extends AnyWordSpec with Matchers {
private val errorVersionSwitch = new ValueSwitch[Status](enableSelfServiceErrorCodes = false)

private val aLogEntryIdString = "test"
private val aLogEntryId =
DamlLogEntryId.newBuilder().setEntryId(ByteString.copyFromUtf8(aLogEntryIdString)).build()
Expand All @@ -64,14 +68,20 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers {
"logEntryToUpdate" should {
"throw in case no record time is available from the log entry or input argument" in {
assertThrows[Err](
logEntryToUpdate(aLogEntryId, aLogEntryWithoutRecordTime, recordTimeForUpdate = None)
logEntryToUpdate(
aLogEntryId,
aLogEntryWithoutRecordTime,
errorVersionSwitch,
recordTimeForUpdate = None,
)
)
}

"use log entry's record time instead of one provided as input" in {
val actual :: Nil = logEntryToUpdate(
aLogEntryId,
aLogEntryWithRecordTime,
errorVersionSwitch,
recordTimeForUpdate = Some(aRecordTime),
)

Expand All @@ -80,7 +90,12 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers {

"use record time from log entry if not provided as input" in {
val actual :: Nil =
logEntryToUpdate(aLogEntryId, aLogEntryWithRecordTime, recordTimeForUpdate = None)
logEntryToUpdate(
aLogEntryId,
aLogEntryWithRecordTime,
errorVersionSwitch,
recordTimeForUpdate = None,
)

actual.recordTime shouldBe Timestamp.assertFromInstant(Instant.ofEpochSecond(100))
}
Expand All @@ -90,7 +105,12 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers {
.setRecordTime(Conversions.buildTimestamp(aRecordTime))
.setTimeUpdateEntry(Empty.getDefaultInstance)
.build
logEntryToUpdate(aLogEntryId, timeUpdateEntry, recordTimeForUpdate = None) shouldBe Nil
logEntryToUpdate(
aLogEntryId,
timeUpdateEntry,
errorVersionSwitch,
recordTimeForUpdate = None,
) shouldBe Nil
}
}

Expand Down
Loading