From 0be8b6b0393ed2f8c8704e0f04ddecfea0111f84 Mon Sep 17 00:00:00 2001 From: Fabio Tudone Date: Tue, 19 Oct 2021 15:26:20 +0200 Subject: [PATCH] Support contextual logging in KeyValueConsumption CHANGELOG_BEGIN CHANGELOG_END --- ledger/indexer-benchmark/BUILD.bazel | 1 + .../scala/ledger/indexerbenchmark/Main.scala | 16 ++++++++------- .../state/kvutils/KeyValueConsumption.scala | 14 +++++++------ .../api/KeyValueParticipantState.scala | 8 +++++--- .../api/KeyValueParticipantStateReader.scala | 3 ++- .../kvutils/KeyValueConsumptionSpec.scala | 20 ++++++------------- .../LogAppendingCommitStrategySupport.scala | 4 +++- .../LogAppendingReadServiceFactory.scala | 4 +++- ...RawPreExecutingCommitStrategySupport.scala | 4 +++- .../LogAppendingReadServiceFactorySpec.scala | 4 +++- .../CommitStrategySupport.scala | 2 +- 11 files changed, 44 insertions(+), 36 deletions(-) diff --git a/ledger/indexer-benchmark/BUILD.bazel b/ledger/indexer-benchmark/BUILD.bazel index fffc238840fd..2bf889648f40 100644 --- a/ledger/indexer-benchmark/BUILD.bazel +++ b/ledger/indexer-benchmark/BUILD.bazel @@ -66,6 +66,7 @@ da_scala_library( "//ledger/participant-state", "//ledger/participant-state/kvutils", "//libs-scala/concurrent", + "//libs-scala/contextualized-logging", "@maven//:io_dropwizard_metrics_metrics_core", ], ) diff --git a/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala b/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala index 1f531b9be008..f4488ab4c655 100644 --- a/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala +++ b/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala @@ -5,7 +5,6 @@ package com.daml.ledger.indexerbenchmark import java.nio.file.{Files, Paths} import java.util.concurrent.atomic.AtomicLong - import akka.NotUsed import akka.actor.ActorSystem import akka.stream.Materializer @@ -23,6 +22,7 @@ import com.daml.ledger.participant.state.kvutils.api.{ import com.daml.ledger.participant.state.kvutils.export.ProtobufBasedLedgerDataImporter import com.daml.ledger.participant.state.kvutils.{KVOffsetBuilder, Raw} import com.daml.ledger.participant.state.v2.Update +import com.daml.logging.LoggingContext.newLoggingContext import com.daml.metrics.Metrics import scala.concurrent.Future @@ -72,12 +72,14 @@ object Main { val metricRegistry = new MetricRegistry val metrics = new Metrics(metricRegistry) - val keyValueStateReader = KeyValueParticipantStateReader( - keyValueSource, - metrics, - failOnUnexpectedEvent = false, - enableSelfServiceErrorCodes = false, - ) + val keyValueStateReader = newLoggingContext { implicit loggingContext => + KeyValueParticipantStateReader( + keyValueSource, + metrics, + failOnUnexpectedEvent = false, + enableSelfServiceErrorCodes = false, + ) + } // Note: this method is doing quite a lot of work to transform a sequence of write sets // to a sequence of state updates. diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala index e9bdbd3850df..e8d5907ba892 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala @@ -3,7 +3,7 @@ package com.daml.ledger.participant.state.kvutils -import com.daml.error.ValueSwitch +import com.daml.error.{DamlContextualizedErrorLogger, ValueSwitch} import com.daml.ledger.configuration.Configuration import com.daml.ledger.participant.state.kvutils.Conversions._ import com.daml.ledger.participant.state.kvutils.store.events.PackageUpload.DamlPackageUploadRejectionEntry @@ -26,10 +26,10 @@ import com.daml.lf.data.Ref import com.daml.lf.data.Ref.LedgerString import com.daml.lf.data.Time.Timestamp import com.daml.lf.transaction.CommittedTransaction +import com.daml.logging.{ContextualizedLogger, LoggingContext} import com.google.common.io.BaseEncoding import com.google.protobuf.ByteString import com.google.rpc.status.Status -import org.slf4j.LoggerFactory import scala.jdk.CollectionConverters._ @@ -37,7 +37,7 @@ import scala.jdk.CollectionConverters._ * key-value based ledger. */ object KeyValueConsumption { - private val logger = LoggerFactory.getLogger(this.getClass) + private val logger = ContextualizedLogger.get(this.getClass) def packDamlLogEntry(entry: DamlStateKey): ByteString = entry.toByteString def unpackDamlLogEntry(bytes: ByteString): DamlLogEntry = DamlLogEntry.parseFrom(bytes) @@ -59,6 +59,8 @@ object KeyValueConsumption { entry: DamlLogEntry, errorVersionSwitch: ValueSwitch[Status], recordTimeForUpdate: Option[Timestamp] = None, + )(implicit + loggingContext: LoggingContext ): List[Update] = { val recordTimeFromLogEntry = PartialFunction.condOpt(entry.hasRecordTime) { case true => parseTimestamp(entry.getRecordTime) @@ -260,7 +262,7 @@ object KeyValueConsumption { entryId: DamlLogEntryId, txEntry: DamlTransactionEntry, recordTime: Timestamp, - ): Update.TransactionAccepted = { + )(implicit loggingContext: LoggingContext): Update.TransactionAccepted = { val transaction = Conversions.decodeTransaction(txEntry.getTransaction) val hexTxId = parseLedgerString("TransactionId")( BaseEncoding.base16.encode(entryId.toByteArray) @@ -311,7 +313,7 @@ object KeyValueConsumption { private def validateDivulgedContracts( hexTxId: LedgerString, damlTransactionBlindingInfo: DamlTransactionBlindingInfo, - ) = + )(implicit loggingContext: LoggingContext): List[DivulgedContract] = if (!damlTransactionBlindingInfo.getDivulgencesList.isEmpty) { Conversions.extractDivulgedContracts(damlTransactionBlindingInfo) match { case Right(divulgedContractsIndex) => @@ -339,7 +341,7 @@ object KeyValueConsumption { recordTime: Timestamp, outOfTimeBoundsEntry: DamlOutOfTimeBoundsEntry, errorVersionSwitch: ValueSwitch[Status], - ): Option[Update] = { + )(implicit loggingContext: LoggingContext): Option[Update] = { val timeBounds = parseTimeBounds(outOfTimeBoundsEntry) val deduplicated = timeBounds.deduplicateUntil.exists(recordTime <= _) val tooEarly = timeBounds.tooEarlyUntil.exists(recordTime < _) diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantState.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantState.scala index b26c596a12f0..359cf12f4b0f 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantState.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantState.scala @@ -3,8 +3,6 @@ package com.daml.ledger.participant.state.kvutils.api -import java.util.concurrent.CompletionStage - import akka.NotUsed import akka.stream.scaladsl.Source import com.daml.daml_lf_dev.DamlLf @@ -22,9 +20,12 @@ import com.daml.ledger.participant.state.v2.{ } import com.daml.lf.data.{Ref, Time} import com.daml.lf.transaction.SubmittedTransaction +import com.daml.logging.LoggingContext import com.daml.metrics.Metrics import com.daml.telemetry.TelemetryContext +import java.util.concurrent.CompletionStage + /** Implements read and write operations required for running a participant server. * * Adapts [[LedgerReader]] and [[LedgerWriter]] interfaces to [[com.daml.ledger.participant.state.v2.ReadService]] and @@ -41,7 +42,8 @@ class KeyValueParticipantState( writer: LedgerWriter, metrics: Metrics, enableSelfServiceErrorCodes: Boolean, -) extends ReadService +)(implicit loggingContext: LoggingContext) + extends ReadService with WriteService { private val readerAdapter = KeyValueParticipantStateReader(reader, metrics, enableSelfServiceErrorCodes) diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReader.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReader.scala index f659a765173b..bb5646750a67 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReader.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReader.scala @@ -15,6 +15,7 @@ import com.daml.ledger.participant.state.v2._ import com.daml.ledger.validator.preexecution.TimeUpdatesProvider import com.daml.lf.data.Time import com.daml.lf.data.Time.Timestamp +import com.daml.logging.LoggingContext import com.daml.metrics.{Metrics, Timed} import com.google.rpc.status.Status @@ -103,7 +104,7 @@ object KeyValueParticipantStateReader { enableSelfServiceErrorCodes: Boolean, timeUpdatesProvider: TimeUpdatesProvider = TimeUpdatesProvider.ReasonableDefault, failOnUnexpectedEvent: Boolean = true, - ): KeyValueParticipantStateReader = + )(implicit loggingContext: LoggingContext): KeyValueParticipantStateReader = new KeyValueParticipantStateReader( reader, metrics, diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumptionSpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumptionSpec.scala index 12fb5fb7aecf..573a40e0a5ce 100644 --- a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumptionSpec.scala +++ b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumptionSpec.scala @@ -7,27 +7,17 @@ import com.daml.error.ValueSwitch import com.daml.ledger.configuration.Configuration import com.daml.ledger.grpc.GrpcStatuses import com.daml.ledger.participant.state.kvutils.Conversions.{buildTimestamp, parseInstant} -import com.daml.ledger.participant.state.kvutils.KeyValueConsumption.{ - TimeBounds, - logEntryToUpdate, - outOfTimeBoundsEntryToUpdate, -} +import com.daml.ledger.participant.state.kvutils.KeyValueConsumption.{TimeBounds, logEntryToUpdate, outOfTimeBoundsEntryToUpdate} import com.daml.ledger.participant.state.kvutils.api.LedgerReader import com.daml.ledger.participant.state.kvutils.store.DamlLogEntry.PayloadCase._ -import com.daml.ledger.participant.state.kvutils.store.events.PackageUpload.{ - DamlPackageUploadEntry, - DamlPackageUploadRejectionEntry, -} +import com.daml.ledger.participant.state.kvutils.store.events.PackageUpload.{DamlPackageUploadEntry, DamlPackageUploadRejectionEntry} import com.daml.ledger.participant.state.kvutils.store.events._ -import com.daml.ledger.participant.state.kvutils.store.{ - DamlLogEntry, - DamlLogEntryId, - DamlOutOfTimeBoundsEntry, -} +import com.daml.ledger.participant.state.kvutils.store.{DamlLogEntry, DamlLogEntryId, DamlOutOfTimeBoundsEntry} import com.daml.ledger.participant.state.v2.Update import com.daml.ledger.participant.state.v2.Update.CommandRejected import com.daml.ledger.participant.state.v2.Update.CommandRejected.FinalReason import com.daml.lf.data.Time.Timestamp +import com.daml.logging.LoggingContext import com.google.protobuf.any.{Any => AnyProto} import com.google.protobuf.{ByteString, Empty} import com.google.rpc.code.Code @@ -63,6 +53,8 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers { v2ErrorSwitch, ) + private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting + "logEntryToUpdate" should { "throw in case no record time is available from the log entry or input argument" in { forAll( diff --git a/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingCommitStrategySupport.scala b/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingCommitStrategySupport.scala index e6ce9b834a4e..88d32145ac47 100644 --- a/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingCommitStrategySupport.scala +++ b/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingCommitStrategySupport.scala @@ -71,7 +71,9 @@ final class LogAppendingCommitStrategySupport( } } - override def newReadServiceFactory(): ReplayingReadServiceFactory = + override def newReadServiceFactory()(implicit + loggingContext: LoggingContext + ): ReplayingReadServiceFactory = new LogAppendingReadServiceFactory(offsetBuilder, metrics) override val writeSetComparison: WriteSetComparison = diff --git a/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactory.scala b/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactory.scala index 2fafb89967dd..df4d02f4f988 100644 --- a/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactory.scala +++ b/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactory.scala @@ -17,6 +17,7 @@ import com.daml.ledger.participant.state.kvutils.api.{ import com.daml.ledger.participant.state.kvutils.export.{SubmissionInfo, WriteSet} import com.daml.ledger.participant.state.kvutils.{KVOffsetBuilder, Raw} import com.daml.ledger.participant.state.v2.Update +import com.daml.logging.LoggingContext import com.daml.metrics.Metrics import scala.collection.immutable @@ -25,7 +26,8 @@ import scala.collection.mutable.ListBuffer final class LogAppendingReadServiceFactory( offsetBuilder: KVOffsetBuilder, metrics: Metrics, -) extends ReplayingReadServiceFactory { +)(implicit val loggingContext: LoggingContext) + extends ReplayingReadServiceFactory { private val recordedBlocks = ListBuffer.empty[LedgerRecord] override def appendBlock(submissionInfo: SubmissionInfo, writeSet: WriteSet): Unit = diff --git a/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/RawPreExecutingCommitStrategySupport.scala b/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/RawPreExecutingCommitStrategySupport.scala index 5a2d7c83baa2..0426c65ccee0 100644 --- a/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/RawPreExecutingCommitStrategySupport.scala +++ b/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/RawPreExecutingCommitStrategySupport.scala @@ -86,7 +86,9 @@ final class RawPreExecutingCommitStrategySupport( .map(_ => access.getWriteSet) } - override def newReadServiceFactory(): ReplayingReadServiceFactory = + override def newReadServiceFactory()(implicit + loggingContext: LoggingContext + ): ReplayingReadServiceFactory = new LogAppendingReadServiceFactory(offsetBuilder, metrics) override val writeSetComparison: WriteSetComparison = diff --git a/ledger/participant-state/kvutils/tools/integrity-check/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactorySpec.scala b/ledger/participant-state/kvutils/tools/integrity-check/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactorySpec.scala index 81b338263576..635012b82a29 100644 --- a/ledger/participant-state/kvutils/tools/integrity-check/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactorySpec.scala +++ b/ledger/participant-state/kvutils/tools/integrity-check/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactorySpec.scala @@ -5,7 +5,6 @@ package com.daml.ledger.participant.state.kvutils.tools.integritycheck import java.time.Instant import java.util.concurrent.TimeUnit - import akka.stream.scaladsl.Sink import com.codahale.metrics.MetricRegistry import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll @@ -17,6 +16,7 @@ import com.daml.ledger.participant.state.kvutils.{Envelope, KVOffsetBuilder, Raw import com.daml.ledger.participant.state.v2.Update import com.daml.lf.data.Ref import com.daml.lf.data.Time.Timestamp +import com.daml.logging.LoggingContext import com.daml.metrics.Metrics import com.google.protobuf.ByteString import org.scalatest.matchers.should.Matchers @@ -71,6 +71,8 @@ final class LogAppendingReadServiceFactorySpec } object LogAppendingReadServiceFactorySpec { + private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting + private def createFactory() = { val offsetBuilder = new KVOffsetBuilder(0) val metrics = new Metrics(new MetricRegistry) diff --git a/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/CommitStrategySupport.scala b/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/CommitStrategySupport.scala index a01d85356053..b15d04380fa3 100644 --- a/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/CommitStrategySupport.scala +++ b/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/CommitStrategySupport.scala @@ -17,7 +17,7 @@ trait CommitStrategySupport[LogResult] { submissionInfo: SubmissionInfo )(implicit materializer: Materializer, loggingContext: LoggingContext): Future[WriteSet] - def newReadServiceFactory(): ReplayingReadServiceFactory + def newReadServiceFactory()(implicit loggingContext: LoggingContext): ReplayingReadServiceFactory def writeSetComparison: WriteSetComparison }