Skip to content

Commit

Permalink
Propagate error version switch to KeyValueConsumption
Browse files Browse the repository at this point in the history
CHANGELOG_BEGIN
CHANGELOG_END
  • Loading branch information
fabiotudone-da committed Oct 13, 2021
1 parent 74ef17c commit 15b8817
Show file tree
Hide file tree
Showing 17 changed files with 103 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ object Main {
keyValueSource,
metrics,
failOnUnexpectedEvent = false,
enableSelfServiceErrorCodes = false,
)

// Note: this method is doing quite a lot of work to transform a sequence of write sets
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ private[memory] class InMemoryLedgerFactory(dispatcher: Dispatcher[Index], state
readerWriter,
readerWriter,
createMetrics(participantConfig, config),
config.enableSelfServiceErrorCodes
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ class InMemoryLedgerReaderWriterIntegrationSpec
engine = Engine.DevEngine(),
committerExecutionContext = committerExecutionContext,
)
} yield new KeyValueParticipantState(readerWriter, readerWriter, metrics)
} yield new KeyValueParticipantState(
readerWriter,
readerWriter,
metrics,
enableSelfServiceErrorCodes = false,
)

}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,14 @@ object SqlLedgerFactory extends LedgerFactory[ReadWriteService, ExtraConfig] {
metrics = metrics.daml.kvutils.submission.validator.stateValueCache,
),
).acquire()
.map(readerWriter => new KeyValueParticipantState(readerWriter, readerWriter, metrics))
.map(readerWriter =>
new KeyValueParticipantState(
readerWriter,
readerWriter,
metrics,
enableSelfServiceErrorCodes = config.enableSelfServiceErrorCodes,
)
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,12 @@ abstract class SqlLedgerReaderWriterIntegrationSpecBase(implementationName: Stri
jdbcUrl = jdbcUrl(testId),
resetOnStartup = false,
logEntryIdAllocator = RandomLogEntryIdAllocator,
).map(readerWriter => new KeyValueParticipantState(readerWriter, readerWriter, metrics))
).map(readerWriter =>
new KeyValueParticipantState(
readerWriter,
readerWriter,
metrics,
enableSelfServiceErrorCodes = false,
)
)
}
3 changes: 3 additions & 0 deletions ledger/participant-state/kvutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ da_scala_library(
"//daml-lf/transaction:value_proto_java",
"//ledger-api/grpc-definitions:ledger_api_proto_scala",
"//ledger/caching",
"//ledger/error",
"//ledger/ledger-api-domain",
"//ledger/ledger-api-health",
"//ledger/ledger-configuration",
Expand Down Expand Up @@ -117,6 +118,7 @@ da_scala_library(
"//ledger-api/grpc-definitions:ledger_api_proto_scala",
"//ledger-api/rs-grpc-bridge",
"//ledger-api/testing-utils",
"//ledger/error",
"//ledger/ledger-api-common",
"//ledger/ledger-api-domain",
"//ledger/ledger-api-health",
Expand Down Expand Up @@ -186,6 +188,7 @@ da_scala_test_suite(
"//ledger-api/rs-grpc-bridge",
"//ledger-api/testing-utils",
"//ledger/caching",
"//ledger/error",
"//ledger/ledger-api-common",
"//ledger/ledger-api-domain",
"//ledger/ledger-api-health",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ object LedgerFactory {
readerWriter,
readerWriter,
createMetrics(participantConfig, config),
config.enableSelfServiceErrorCodes,
)

def owner(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.daml.ledger.participant.state.kvutils

import com.daml.error.ValueSwitch
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.grpc.GrpcStatuses
import com.daml.ledger.participant.state.kvutils.Conversions._
Expand All @@ -13,12 +14,7 @@ import com.daml.ledger.participant.state.kvutils.store.events.{
DamlTransactionEntry,
DamlTransactionRejectionEntry,
}
import com.daml.ledger.participant.state.kvutils.store.{
DamlLogEntry,
DamlLogEntryId,
DamlOutOfTimeBoundsEntry,
DamlStateKey,
}
import com.daml.ledger.participant.state.kvutils.store.{DamlLogEntry, DamlLogEntryId, DamlOutOfTimeBoundsEntry, DamlStateKey}
import com.daml.ledger.participant.state.v2.Update.CommandRejected.FinalReason
import com.daml.ledger.participant.state.v2.{DivulgedContract, TransactionMeta, Update}
import com.daml.lf.data.Ref
Expand All @@ -33,6 +29,7 @@ import com.google.rpc.error_details.ErrorInfo
import com.google.rpc.status.Status
import org.slf4j.LoggerFactory

import scala.annotation.nowarn
import scala.jdk.CollectionConverters._

/** Utilities for producing [[Update]] events from [[DamlLogEntry]]'s committed to a
Expand All @@ -51,13 +48,15 @@ object KeyValueConsumption {
*
* @param entryId: The log entry identifier.
* @param entry: The log entry.
* @param errorVersionSwitch: Decides between v1 and v2 (self-service) errors.
* @return [[Update]]s constructed from log entry.
*/
// TODO(BH): add participantId to ensure participant id matches in DamlLogEntry
@throws(classOf[Err])
def logEntryToUpdate(
entryId: DamlLogEntryId,
entry: DamlLogEntry,
@nowarn("msg=parameter value errorVersionSwitch.* is never used") errorVersionSwitch: ValueSwitch[Status],
recordTimeForUpdate: Option[Timestamp] = None,
): List[Update] = {
val recordTimeFromLogEntry = PartialFunction.condOpt(entry.hasRecordTime) { case true =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ class KeyValueParticipantState(
reader: LedgerReader,
writer: LedgerWriter,
metrics: Metrics,
enableSelfServiceErrorCodes: Boolean,
) extends ReadService
with WriteService {
private val readerAdapter =
KeyValueParticipantStateReader(reader, metrics)
KeyValueParticipantStateReader(reader, metrics, enableSelfServiceErrorCodes)
private val writerAdapter =
new KeyValueParticipantStateWriter(
new TimedLedgerWriter(writer, metrics),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package com.daml.ledger.participant.state.kvutils.api

import akka.NotUsed
import akka.stream.scaladsl.Source
import com.daml.error.ValueSwitch
import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.configuration.LedgerInitialConditions
import com.daml.ledger.offset.Offset
Expand All @@ -15,6 +16,7 @@ import com.daml.ledger.validator.preexecution.TimeUpdatesProvider
import com.daml.lf.data.Time
import com.daml.lf.data.Time.Timestamp
import com.daml.metrics.{Metrics, Timed}
import com.google.rpc.status.Status

/** Adapts a [[LedgerReader]] instance to [[ReadService]].
* Performs translation between the offsets required by the underlying reader and [[ReadService]]:
Expand All @@ -29,12 +31,15 @@ import com.daml.metrics.{Metrics, Timed}
class KeyValueParticipantStateReader private[api] (
reader: LedgerReader,
metrics: Metrics,
logEntryToUpdate: (DamlLogEntryId, DamlLogEntry, Option[Timestamp]) => List[Update],
enableSelfServiceErrorCodes: Boolean,
logEntryToUpdate: (DamlLogEntryId, DamlLogEntry, ValueSwitch[Status], Option[Timestamp]) => List[Update],
timeUpdatesProvider: TimeUpdatesProvider,
failOnUnexpectedEvent: Boolean,
) extends ReadService {
import KeyValueParticipantStateReader._

private val errorVersionSwitch = new ValueSwitch[Status](enableSelfServiceErrorCodes)

override def ledgerInitialConditions(): Source[LedgerInitialConditions, NotUsed] =
Source.single(createLedgerInitialConditions())

Expand All @@ -51,7 +56,7 @@ class KeyValueParticipantStateReader private[api] (
metrics.daml.kvutils.reader.parseUpdates, {
val logEntryId = DamlLogEntryId.parseFrom(entryId.bytes)
val updates =
logEntryToUpdate(logEntryId, logEntry, timeUpdatesProvider())
logEntryToUpdate(logEntryId, logEntry, errorVersionSwitch, timeUpdatesProvider())
val updatesWithOffsets =
Source(updates).zipWithIndex.map { case (update, index) =>
offsetForUpdate(offset, index.toInt, updates.size) -> update
Expand Down Expand Up @@ -86,12 +91,14 @@ object KeyValueParticipantStateReader {
def apply(
reader: LedgerReader,
metrics: Metrics,
enableSelfServiceErrorCodes: Boolean,
timeUpdatesProvider: TimeUpdatesProvider = TimeUpdatesProvider.ReasonableDefault,
failOnUnexpectedEvent: Boolean = true,
): KeyValueParticipantStateReader =
new KeyValueParticipantStateReader(
reader,
metrics,
enableSelfServiceErrorCodes,
KeyValueConsumption.logEntryToUpdate,
timeUpdatesProvider,
failOnUnexpectedEvent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
package com.daml.ledger.participant.state.kvutils

import java.time.Duration

import com.codahale.metrics.MetricRegistry
import com.daml.daml_lf_dev.DamlLf
import com.daml.error.ValueSwitch
import com.daml.ledger.api.DeduplicationPeriod
import com.daml.ledger.configuration.{Configuration, LedgerTimeModel}
import com.daml.ledger.participant.state.kvutils.KeyValueCommitting.PreExecutionResult
Expand Down Expand Up @@ -63,6 +63,8 @@ object KVTest {
)

private[kvutils] val metrics = new Metrics(new MetricRegistry)
private[kvutils] val errorVersionSwitch =
new ValueSwitch[com.google.rpc.status.Status](enableSelfServiceErrorCodes = false)

private def initialTestState: KVTestState = {
val engine = Engine.DevEngine()
Expand Down Expand Up @@ -383,7 +385,7 @@ object KVTest {
newState.keySet subsetOf KeyValueCommitting.submissionOutputs(submission)
)
// Verify that we can always process the log entry.
val _ = KeyValueConsumption.logEntryToUpdate(entryId, logEntry)
val _ = KeyValueConsumption.logEntryToUpdate(entryId, logEntry, errorVersionSwitch)

entryId -> logEntry
}
Expand Down Expand Up @@ -413,11 +415,13 @@ object KVTest {
KeyValueConsumption.logEntryToUpdate(
entryId,
successfulLogEntry,
errorVersionSwitch,
recordTimeFromTimeUpdateLogEntry,
)
KeyValueConsumption.logEntryToUpdate(
entryId,
outOfTimeBoundsLogEntry,
errorVersionSwitch,
recordTimeFromTimeUpdateLogEntry,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@

package com.daml.ledger.participant.state.kvutils

import java.time.Duration
import com.daml.error.ValueSwitch

import java.time.Duration
import com.daml.ledger.participant.state.kvutils.TestHelpers._
import com.daml.ledger.participant.state.kvutils.store.events.DamlTransactionRejectionEntry
import com.daml.ledger.participant.state.kvutils.store.{DamlLogEntry, DamlStateValue}
Expand All @@ -17,16 +18,7 @@ import com.daml.lf.crypto.Hash
import com.daml.lf.data.{FrontStack, Ref, SortedLookupList}
import com.daml.lf.transaction.Node.NodeCreate
import com.daml.lf.value.Value
import com.daml.lf.value.Value.{
ContractId,
ValueList,
ValueOptional,
ValueParty,
ValueRecord,
ValueTextMap,
ValueUnit,
ValueVariant,
}
import com.daml.lf.value.Value.{ContractId, ValueList, ValueOptional, ValueParty, ValueRecord, ValueTextMap, ValueUnit, ValueVariant}
import com.daml.logging.LoggingContext
import org.scalatest.Inside
import org.scalatest.matchers.should.Matchers
Expand All @@ -41,6 +33,9 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {

private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting

private val errorVersionSwitch =
new ValueSwitch[com.google.rpc.status.Status](enableSelfServiceErrorCodes = false)

private val alice = party("Alice")
private val bob = party("Bob")
private val eve = party("Eve")
Expand Down Expand Up @@ -95,7 +90,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
)
(entryId, logEntry) = result
contractId = contractIdOfCreateTransaction(
KeyValueConsumption.logEntryToUpdate(entryId, logEntry)
KeyValueConsumption.logEntryToUpdate(entryId, logEntry, errorVersionSwitch)
)

transaction2 <- runSimpleCommand(
Expand Down Expand Up @@ -219,6 +214,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
KeyValueConsumption.logEntryToUpdate(
entryId,
preExecutionResult.successfulLogEntry,
errorVersionSwitch,
Some(recordTime),
)
)
Expand Down Expand Up @@ -303,7 +299,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
)
(entryId, logEntry) = result
contractId = contractIdOfCreateTransaction(
KeyValueConsumption.logEntryToUpdate(entryId, logEntry)
KeyValueConsumption.logEntryToUpdate(entryId, logEntry, errorVersionSwitch)
)

transaction2 <- runSimpleCommand(
Expand Down Expand Up @@ -519,7 +515,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
strippedEntry.getTransactionEntryBuilder.clearSubmitterInfo

// Process into updates and verify
val updates = KeyValueConsumption.logEntryToUpdate(entryId, strippedEntry.build)
val updates = KeyValueConsumption.logEntryToUpdate(entryId, strippedEntry.build, errorVersionSwitch)
inside(updates) { case Seq(txAccepted: Update.TransactionAccepted) =>
txAccepted.optCompletionInfo should be(None)
}
Expand Down Expand Up @@ -573,6 +569,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
KeyValueConsumption.logEntryToUpdate(
entryId,
preExecutionResult.successfulLogEntry,
errorVersionSwitch,
Some(recordTime),
)
)
Expand Down
Loading

0 comments on commit 15b8817

Please sign in to comment.