Skip to content

Commit

Permalink
Propagate error version switch to KeyValueConsumption
Browse files Browse the repository at this point in the history
CHANGELOG_BEGIN
CHANGELOG_END
  • Loading branch information
fabiotudone-da committed Oct 13, 2021
1 parent 74ef17c commit 792afb3
Showing 17 changed files with 138 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -75,6 +75,7 @@ object Main {
keyValueSource,
metrics,
failOnUnexpectedEvent = false,
enableSelfServiceErrorCodes = false,
)

// Note: this method is doing quite a lot of work to transform a sequence of write sets
Original file line number Diff line number Diff line change
@@ -46,6 +46,7 @@ private[memory] class InMemoryLedgerFactory(dispatcher: Dispatcher[Index], state
readerWriter,
readerWriter,
createMetrics(participantConfig, config),
config.enableSelfServiceErrorCodes,
)
}

Original file line number Diff line number Diff line change
@@ -44,6 +44,11 @@ class InMemoryLedgerReaderWriterIntegrationSpec
engine = Engine.DevEngine(),
committerExecutionContext = committerExecutionContext,
)
} yield new KeyValueParticipantState(readerWriter, readerWriter, metrics)
} yield new KeyValueParticipantState(
readerWriter,
readerWriter,
metrics,
enableSelfServiceErrorCodes = false,
)

}
Original file line number Diff line number Diff line change
@@ -77,7 +77,14 @@ object SqlLedgerFactory extends LedgerFactory[ReadWriteService, ExtraConfig] {
metrics = metrics.daml.kvutils.submission.validator.stateValueCache,
),
).acquire()
.map(readerWriter => new KeyValueParticipantState(readerWriter, readerWriter, metrics))
.map(readerWriter =>
new KeyValueParticipantState(
readerWriter,
readerWriter,
metrics,
enableSelfServiceErrorCodes = config.enableSelfServiceErrorCodes,
)
)
}
}
}
Original file line number Diff line number Diff line change
@@ -33,5 +33,12 @@ abstract class SqlLedgerReaderWriterIntegrationSpecBase(implementationName: Stri
jdbcUrl = jdbcUrl(testId),
resetOnStartup = false,
logEntryIdAllocator = RandomLogEntryIdAllocator,
).map(readerWriter => new KeyValueParticipantState(readerWriter, readerWriter, metrics))
).map(readerWriter =>
new KeyValueParticipantState(
readerWriter,
readerWriter,
metrics,
enableSelfServiceErrorCodes = false,
)
)
}
3 changes: 3 additions & 0 deletions ledger/participant-state/kvutils/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -52,6 +52,7 @@ da_scala_library(
"//daml-lf/transaction:value_proto_java",
"//ledger-api/grpc-definitions:ledger_api_proto_scala",
"//ledger/caching",
"//ledger/error",
"//ledger/ledger-api-domain",
"//ledger/ledger-api-health",
"//ledger/ledger-configuration",
@@ -117,6 +118,7 @@ da_scala_library(
"//ledger-api/grpc-definitions:ledger_api_proto_scala",
"//ledger-api/rs-grpc-bridge",
"//ledger-api/testing-utils",
"//ledger/error",
"//ledger/ledger-api-common",
"//ledger/ledger-api-domain",
"//ledger/ledger-api-health",
@@ -186,6 +188,7 @@ da_scala_test_suite(
"//ledger-api/rs-grpc-bridge",
"//ledger-api/testing-utils",
"//ledger/caching",
"//ledger/error",
"//ledger/ledger-api-common",
"//ledger/ledger-api-domain",
"//ledger/ledger-api-health",
Original file line number Diff line number Diff line change
@@ -187,6 +187,7 @@ object LedgerFactory {
readerWriter,
readerWriter,
createMetrics(participantConfig, config),
config.enableSelfServiceErrorCodes,
)

def owner(
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@

package com.daml.ledger.participant.state.kvutils

import com.daml.error.ValueSwitch
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.grpc.GrpcStatuses
import com.daml.ledger.participant.state.kvutils.Conversions._
@@ -33,6 +34,7 @@ import com.google.rpc.error_details.ErrorInfo
import com.google.rpc.status.Status
import org.slf4j.LoggerFactory

import scala.annotation.nowarn
import scala.jdk.CollectionConverters._

/** Utilities for producing [[Update]] events from [[DamlLogEntry]]'s committed to a
@@ -51,13 +53,17 @@ object KeyValueConsumption {
*
* @param entryId: The log entry identifier.
* @param entry: The log entry.
* @param errorVersionSwitch: Decides between v1 and v2 (self-service) errors.
* @return [[Update]]s constructed from log entry.
*/
// TODO(BH): add participantId to ensure participant id matches in DamlLogEntry
@throws(classOf[Err])
def logEntryToUpdate(
entryId: DamlLogEntryId,
entry: DamlLogEntry,
@nowarn(
"msg=parameter value errorVersionSwitch.* is never used"
) errorVersionSwitch: ValueSwitch[Status],
recordTimeForUpdate: Option[Timestamp] = None,
): List[Update] = {
val recordTimeFromLogEntry = PartialFunction.condOpt(entry.hasRecordTime) { case true =>
Original file line number Diff line number Diff line change
@@ -40,10 +40,11 @@ class KeyValueParticipantState(
reader: LedgerReader,
writer: LedgerWriter,
metrics: Metrics,
enableSelfServiceErrorCodes: Boolean,
) extends ReadService
with WriteService {
private val readerAdapter =
KeyValueParticipantStateReader(reader, metrics)
KeyValueParticipantStateReader(reader, metrics, enableSelfServiceErrorCodes)
private val writerAdapter =
new KeyValueParticipantStateWriter(
new TimedLedgerWriter(writer, metrics),
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ package com.daml.ledger.participant.state.kvutils.api

import akka.NotUsed
import akka.stream.scaladsl.Source
import com.daml.error.ValueSwitch
import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.configuration.LedgerInitialConditions
import com.daml.ledger.offset.Offset
@@ -15,6 +16,7 @@ import com.daml.ledger.validator.preexecution.TimeUpdatesProvider
import com.daml.lf.data.Time
import com.daml.lf.data.Time.Timestamp
import com.daml.metrics.{Metrics, Timed}
import com.google.rpc.status.Status

/** Adapts a [[LedgerReader]] instance to [[ReadService]].
* Performs translation between the offsets required by the underlying reader and [[ReadService]]:
@@ -29,12 +31,20 @@ import com.daml.metrics.{Metrics, Timed}
class KeyValueParticipantStateReader private[api] (
reader: LedgerReader,
metrics: Metrics,
logEntryToUpdate: (DamlLogEntryId, DamlLogEntry, Option[Timestamp]) => List[Update],
enableSelfServiceErrorCodes: Boolean,
logEntryToUpdate: (
DamlLogEntryId,
DamlLogEntry,
ValueSwitch[Status],
Option[Timestamp],
) => List[Update],
timeUpdatesProvider: TimeUpdatesProvider,
failOnUnexpectedEvent: Boolean,
) extends ReadService {
import KeyValueParticipantStateReader._

private val errorVersionSwitch = new ValueSwitch[Status](enableSelfServiceErrorCodes)

override def ledgerInitialConditions(): Source[LedgerInitialConditions, NotUsed] =
Source.single(createLedgerInitialConditions())

@@ -51,7 +61,12 @@ class KeyValueParticipantStateReader private[api] (
metrics.daml.kvutils.reader.parseUpdates, {
val logEntryId = DamlLogEntryId.parseFrom(entryId.bytes)
val updates =
logEntryToUpdate(logEntryId, logEntry, timeUpdatesProvider())
logEntryToUpdate(
logEntryId,
logEntry,
errorVersionSwitch,
timeUpdatesProvider(),
)
val updatesWithOffsets =
Source(updates).zipWithIndex.map { case (update, index) =>
offsetForUpdate(offset, index.toInt, updates.size) -> update
@@ -86,12 +101,14 @@ object KeyValueParticipantStateReader {
def apply(
reader: LedgerReader,
metrics: Metrics,
enableSelfServiceErrorCodes: Boolean,
timeUpdatesProvider: TimeUpdatesProvider = TimeUpdatesProvider.ReasonableDefault,
failOnUnexpectedEvent: Boolean = true,
): KeyValueParticipantStateReader =
new KeyValueParticipantStateReader(
reader,
metrics,
enableSelfServiceErrorCodes,
KeyValueConsumption.logEntryToUpdate,
timeUpdatesProvider,
failOnUnexpectedEvent,
Original file line number Diff line number Diff line change
@@ -4,9 +4,9 @@
package com.daml.ledger.participant.state.kvutils

import java.time.Duration

import com.codahale.metrics.MetricRegistry
import com.daml.daml_lf_dev.DamlLf
import com.daml.error.ValueSwitch
import com.daml.ledger.api.DeduplicationPeriod
import com.daml.ledger.configuration.{Configuration, LedgerTimeModel}
import com.daml.ledger.participant.state.kvutils.KeyValueCommitting.PreExecutionResult
@@ -63,6 +63,8 @@ object KVTest {
)

private[kvutils] val metrics = new Metrics(new MetricRegistry)
private[kvutils] val errorVersionSwitch =
new ValueSwitch[com.google.rpc.status.Status](enableSelfServiceErrorCodes = false)

private def initialTestState: KVTestState = {
val engine = Engine.DevEngine()
@@ -383,7 +385,7 @@ object KVTest {
newState.keySet subsetOf KeyValueCommitting.submissionOutputs(submission)
)
// Verify that we can always process the log entry.
val _ = KeyValueConsumption.logEntryToUpdate(entryId, logEntry)
val _ = KeyValueConsumption.logEntryToUpdate(entryId, logEntry, errorVersionSwitch)

entryId -> logEntry
}
@@ -413,11 +415,13 @@ object KVTest {
KeyValueConsumption.logEntryToUpdate(
entryId,
successfulLogEntry,
errorVersionSwitch,
recordTimeFromTimeUpdateLogEntry,
)
KeyValueConsumption.logEntryToUpdate(
entryId,
outOfTimeBoundsLogEntry,
errorVersionSwitch,
recordTimeFromTimeUpdateLogEntry,
)

Original file line number Diff line number Diff line change
@@ -3,8 +3,9 @@

package com.daml.ledger.participant.state.kvutils

import java.time.Duration
import com.daml.error.ValueSwitch

import java.time.Duration
import com.daml.ledger.participant.state.kvutils.TestHelpers._
import com.daml.ledger.participant.state.kvutils.store.events.DamlTransactionRejectionEntry
import com.daml.ledger.participant.state.kvutils.store.{DamlLogEntry, DamlStateValue}
@@ -41,6 +42,9 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {

private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting

private val errorVersionSwitch =
new ValueSwitch[com.google.rpc.status.Status](enableSelfServiceErrorCodes = false)

private val alice = party("Alice")
private val bob = party("Bob")
private val eve = party("Eve")
@@ -95,7 +99,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
)
(entryId, logEntry) = result
contractId = contractIdOfCreateTransaction(
KeyValueConsumption.logEntryToUpdate(entryId, logEntry)
KeyValueConsumption.logEntryToUpdate(entryId, logEntry, errorVersionSwitch)
)

transaction2 <- runSimpleCommand(
@@ -219,6 +223,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
KeyValueConsumption.logEntryToUpdate(
entryId,
preExecutionResult.successfulLogEntry,
errorVersionSwitch,
Some(recordTime),
)
)
@@ -303,7 +308,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
)
(entryId, logEntry) = result
contractId = contractIdOfCreateTransaction(
KeyValueConsumption.logEntryToUpdate(entryId, logEntry)
KeyValueConsumption.logEntryToUpdate(entryId, logEntry, errorVersionSwitch)
)

transaction2 <- runSimpleCommand(
@@ -519,7 +524,8 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
strippedEntry.getTransactionEntryBuilder.clearSubmitterInfo

// Process into updates and verify
val updates = KeyValueConsumption.logEntryToUpdate(entryId, strippedEntry.build)
val updates =
KeyValueConsumption.logEntryToUpdate(entryId, strippedEntry.build, errorVersionSwitch)
inside(updates) { case Seq(txAccepted: Update.TransactionAccepted) =>
txAccepted.optCompletionInfo should be(None)
}
@@ -573,6 +579,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
KeyValueConsumption.logEntryToUpdate(
entryId,
preExecutionResult.successfulLogEntry,
errorVersionSwitch,
Some(recordTime),
)
)
Original file line number Diff line number Diff line change
@@ -3,8 +3,9 @@

package com.daml.ledger.participant.state.kvutils

import java.time.Instant
import com.daml.error.ValueSwitch

import java.time.Instant
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.grpc.GrpcStatuses
import com.daml.ledger.participant.state.kvutils.Conversions.{buildTimestamp, parseInstant}
@@ -39,6 +40,7 @@ import com.google.protobuf.any.{Any => AnyProto}
import com.google.protobuf.{ByteString, Empty}
import com.google.rpc.code.Code
import com.google.rpc.error_details.ErrorInfo
import com.google.rpc.status.Status
import org.scalatest.Inside.inside
import org.scalatest.matchers.should.Matchers
import org.scalatest.prop.TableDrivenPropertyChecks._
@@ -47,6 +49,8 @@ import org.scalatest.prop.Tables.Table
import org.scalatest.wordspec.AnyWordSpec

class KeyValueConsumptionSpec extends AnyWordSpec with Matchers {
private val errorVersionSwitch = new ValueSwitch[Status](enableSelfServiceErrorCodes = false)

private val aLogEntryIdString = "test"
private val aLogEntryId =
DamlLogEntryId.newBuilder().setEntryId(ByteString.copyFromUtf8(aLogEntryIdString)).build()
@@ -64,14 +68,20 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers {
"logEntryToUpdate" should {
"throw in case no record time is available from the log entry or input argument" in {
assertThrows[Err](
logEntryToUpdate(aLogEntryId, aLogEntryWithoutRecordTime, recordTimeForUpdate = None)
logEntryToUpdate(
aLogEntryId,
aLogEntryWithoutRecordTime,
errorVersionSwitch,
recordTimeForUpdate = None,
)
)
}

"use log entry's record time instead of one provided as input" in {
val actual :: Nil = logEntryToUpdate(
aLogEntryId,
aLogEntryWithRecordTime,
errorVersionSwitch,
recordTimeForUpdate = Some(aRecordTime),
)

@@ -80,7 +90,12 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers {

"use record time from log entry if not provided as input" in {
val actual :: Nil =
logEntryToUpdate(aLogEntryId, aLogEntryWithRecordTime, recordTimeForUpdate = None)
logEntryToUpdate(
aLogEntryId,
aLogEntryWithRecordTime,
errorVersionSwitch,
recordTimeForUpdate = None,
)

actual.recordTime shouldBe Timestamp.assertFromInstant(Instant.ofEpochSecond(100))
}
@@ -90,7 +105,12 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers {
.setRecordTime(Conversions.buildTimestamp(aRecordTime))
.setTimeUpdateEntry(Empty.getDefaultInstance)
.build
logEntryToUpdate(aLogEntryId, timeUpdateEntry, recordTimeForUpdate = None) shouldBe Nil
logEntryToUpdate(
aLogEntryId,
timeUpdateEntry,
errorVersionSwitch,
recordTimeForUpdate = None,
) shouldBe Nil
}
}

Loading

0 comments on commit 792afb3

Please sign in to comment.