From 4987d1229d05203cbb2aefabf77927de0ae1a2fe Mon Sep 17 00:00:00 2001 From: Fabio Tudone Date: Tue, 19 Oct 2021 15:26:20 +0200 Subject: [PATCH 1/7] Support contextual logging in KeyValueConsumption CHANGELOG_BEGIN CHANGELOG_END --- ledger/indexer-benchmark/BUILD.bazel | 1 + .../app/scala/ledger/indexerbenchmark/Main.scala | 16 +++++++++------- .../state/kvutils/KeyValueConsumption.scala | 12 +++++++----- .../kvutils/api/KeyValueParticipantState.scala | 8 +++++--- .../api/KeyValueParticipantStateReader.scala | 3 ++- .../state/kvutils/KeyValueConsumptionSpec.scala | 3 +++ .../LogAppendingCommitStrategySupport.scala | 4 +++- .../LogAppendingReadServiceFactory.scala | 4 +++- .../RawPreExecutingCommitStrategySupport.scala | 4 +++- .../LogAppendingReadServiceFactorySpec.scala | 4 +++- .../integritycheck/CommitStrategySupport.scala | 2 +- 11 files changed, 40 insertions(+), 21 deletions(-) diff --git a/ledger/indexer-benchmark/BUILD.bazel b/ledger/indexer-benchmark/BUILD.bazel index fffc238840fd..2bf889648f40 100644 --- a/ledger/indexer-benchmark/BUILD.bazel +++ b/ledger/indexer-benchmark/BUILD.bazel @@ -66,6 +66,7 @@ da_scala_library( "//ledger/participant-state", "//ledger/participant-state/kvutils", "//libs-scala/concurrent", + "//libs-scala/contextualized-logging", "@maven//:io_dropwizard_metrics_metrics_core", ], ) diff --git a/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala b/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala index 1f531b9be008..f4488ab4c655 100644 --- a/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala +++ b/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala @@ -5,7 +5,6 @@ package com.daml.ledger.indexerbenchmark import java.nio.file.{Files, Paths} import java.util.concurrent.atomic.AtomicLong - import akka.NotUsed import akka.actor.ActorSystem import akka.stream.Materializer @@ -23,6 +22,7 @@ import com.daml.ledger.participant.state.kvutils.api.{ import com.daml.ledger.participant.state.kvutils.export.ProtobufBasedLedgerDataImporter import com.daml.ledger.participant.state.kvutils.{KVOffsetBuilder, Raw} import com.daml.ledger.participant.state.v2.Update +import com.daml.logging.LoggingContext.newLoggingContext import com.daml.metrics.Metrics import scala.concurrent.Future @@ -72,12 +72,14 @@ object Main { val metricRegistry = new MetricRegistry val metrics = new Metrics(metricRegistry) - val keyValueStateReader = KeyValueParticipantStateReader( - keyValueSource, - metrics, - failOnUnexpectedEvent = false, - enableSelfServiceErrorCodes = false, - ) + val keyValueStateReader = newLoggingContext { implicit loggingContext => + KeyValueParticipantStateReader( + keyValueSource, + metrics, + failOnUnexpectedEvent = false, + enableSelfServiceErrorCodes = false, + ) + } // Note: this method is doing quite a lot of work to transform a sequence of write sets // to a sequence of state updates. diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala index e9bdbd3850df..f7d06d65de16 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala @@ -26,10 +26,10 @@ import com.daml.lf.data.Ref import com.daml.lf.data.Ref.LedgerString import com.daml.lf.data.Time.Timestamp import com.daml.lf.transaction.CommittedTransaction +import com.daml.logging.{ContextualizedLogger, LoggingContext} import com.google.common.io.BaseEncoding import com.google.protobuf.ByteString import com.google.rpc.status.Status -import org.slf4j.LoggerFactory import scala.jdk.CollectionConverters._ @@ -37,7 +37,7 @@ import scala.jdk.CollectionConverters._ * key-value based ledger. */ object KeyValueConsumption { - private val logger = LoggerFactory.getLogger(this.getClass) + private val logger = ContextualizedLogger.get(this.getClass) def packDamlLogEntry(entry: DamlStateKey): ByteString = entry.toByteString def unpackDamlLogEntry(bytes: ByteString): DamlLogEntry = DamlLogEntry.parseFrom(bytes) @@ -59,6 +59,8 @@ object KeyValueConsumption { entry: DamlLogEntry, errorVersionSwitch: ValueSwitch[Status], recordTimeForUpdate: Option[Timestamp] = None, + )(implicit + loggingContext: LoggingContext ): List[Update] = { val recordTimeFromLogEntry = PartialFunction.condOpt(entry.hasRecordTime) { case true => parseTimestamp(entry.getRecordTime) @@ -260,7 +262,7 @@ object KeyValueConsumption { entryId: DamlLogEntryId, txEntry: DamlTransactionEntry, recordTime: Timestamp, - ): Update.TransactionAccepted = { + )(implicit loggingContext: LoggingContext): Update.TransactionAccepted = { val transaction = Conversions.decodeTransaction(txEntry.getTransaction) val hexTxId = parseLedgerString("TransactionId")( BaseEncoding.base16.encode(entryId.toByteArray) @@ -311,7 +313,7 @@ object KeyValueConsumption { private def validateDivulgedContracts( hexTxId: LedgerString, damlTransactionBlindingInfo: DamlTransactionBlindingInfo, - ) = + )(implicit loggingContext: LoggingContext): List[DivulgedContract] = if (!damlTransactionBlindingInfo.getDivulgencesList.isEmpty) { Conversions.extractDivulgedContracts(damlTransactionBlindingInfo) match { case Right(divulgedContractsIndex) => @@ -339,7 +341,7 @@ object KeyValueConsumption { recordTime: Timestamp, outOfTimeBoundsEntry: DamlOutOfTimeBoundsEntry, errorVersionSwitch: ValueSwitch[Status], - ): Option[Update] = { + )(implicit loggingContext: LoggingContext): Option[Update] = { val timeBounds = parseTimeBounds(outOfTimeBoundsEntry) val deduplicated = timeBounds.deduplicateUntil.exists(recordTime <= _) val tooEarly = timeBounds.tooEarlyUntil.exists(recordTime < _) diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantState.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantState.scala index b26c596a12f0..359cf12f4b0f 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantState.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantState.scala @@ -3,8 +3,6 @@ package com.daml.ledger.participant.state.kvutils.api -import java.util.concurrent.CompletionStage - import akka.NotUsed import akka.stream.scaladsl.Source import com.daml.daml_lf_dev.DamlLf @@ -22,9 +20,12 @@ import com.daml.ledger.participant.state.v2.{ } import com.daml.lf.data.{Ref, Time} import com.daml.lf.transaction.SubmittedTransaction +import com.daml.logging.LoggingContext import com.daml.metrics.Metrics import com.daml.telemetry.TelemetryContext +import java.util.concurrent.CompletionStage + /** Implements read and write operations required for running a participant server. * * Adapts [[LedgerReader]] and [[LedgerWriter]] interfaces to [[com.daml.ledger.participant.state.v2.ReadService]] and @@ -41,7 +42,8 @@ class KeyValueParticipantState( writer: LedgerWriter, metrics: Metrics, enableSelfServiceErrorCodes: Boolean, -) extends ReadService +)(implicit loggingContext: LoggingContext) + extends ReadService with WriteService { private val readerAdapter = KeyValueParticipantStateReader(reader, metrics, enableSelfServiceErrorCodes) diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReader.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReader.scala index f659a765173b..bb5646750a67 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReader.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReader.scala @@ -15,6 +15,7 @@ import com.daml.ledger.participant.state.v2._ import com.daml.ledger.validator.preexecution.TimeUpdatesProvider import com.daml.lf.data.Time import com.daml.lf.data.Time.Timestamp +import com.daml.logging.LoggingContext import com.daml.metrics.{Metrics, Timed} import com.google.rpc.status.Status @@ -103,7 +104,7 @@ object KeyValueParticipantStateReader { enableSelfServiceErrorCodes: Boolean, timeUpdatesProvider: TimeUpdatesProvider = TimeUpdatesProvider.ReasonableDefault, failOnUnexpectedEvent: Boolean = true, - ): KeyValueParticipantStateReader = + )(implicit loggingContext: LoggingContext): KeyValueParticipantStateReader = new KeyValueParticipantStateReader( reader, metrics, diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumptionSpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumptionSpec.scala index 12fb5fb7aecf..7092b0823ea1 100644 --- a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumptionSpec.scala +++ b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumptionSpec.scala @@ -28,6 +28,7 @@ import com.daml.ledger.participant.state.v2.Update import com.daml.ledger.participant.state.v2.Update.CommandRejected import com.daml.ledger.participant.state.v2.Update.CommandRejected.FinalReason import com.daml.lf.data.Time.Timestamp +import com.daml.logging.LoggingContext import com.google.protobuf.any.{Any => AnyProto} import com.google.protobuf.{ByteString, Empty} import com.google.rpc.code.Code @@ -63,6 +64,8 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers { v2ErrorSwitch, ) + private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting + "logEntryToUpdate" should { "throw in case no record time is available from the log entry or input argument" in { forAll( diff --git a/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingCommitStrategySupport.scala b/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingCommitStrategySupport.scala index e6ce9b834a4e..88d32145ac47 100644 --- a/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingCommitStrategySupport.scala +++ b/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingCommitStrategySupport.scala @@ -71,7 +71,9 @@ final class LogAppendingCommitStrategySupport( } } - override def newReadServiceFactory(): ReplayingReadServiceFactory = + override def newReadServiceFactory()(implicit + loggingContext: LoggingContext + ): ReplayingReadServiceFactory = new LogAppendingReadServiceFactory(offsetBuilder, metrics) override val writeSetComparison: WriteSetComparison = diff --git a/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactory.scala b/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactory.scala index 2fafb89967dd..df4d02f4f988 100644 --- a/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactory.scala +++ b/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactory.scala @@ -17,6 +17,7 @@ import com.daml.ledger.participant.state.kvutils.api.{ import com.daml.ledger.participant.state.kvutils.export.{SubmissionInfo, WriteSet} import com.daml.ledger.participant.state.kvutils.{KVOffsetBuilder, Raw} import com.daml.ledger.participant.state.v2.Update +import com.daml.logging.LoggingContext import com.daml.metrics.Metrics import scala.collection.immutable @@ -25,7 +26,8 @@ import scala.collection.mutable.ListBuffer final class LogAppendingReadServiceFactory( offsetBuilder: KVOffsetBuilder, metrics: Metrics, -) extends ReplayingReadServiceFactory { +)(implicit val loggingContext: LoggingContext) + extends ReplayingReadServiceFactory { private val recordedBlocks = ListBuffer.empty[LedgerRecord] override def appendBlock(submissionInfo: SubmissionInfo, writeSet: WriteSet): Unit = diff --git a/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/RawPreExecutingCommitStrategySupport.scala b/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/RawPreExecutingCommitStrategySupport.scala index 5a2d7c83baa2..0426c65ccee0 100644 --- a/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/RawPreExecutingCommitStrategySupport.scala +++ b/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/RawPreExecutingCommitStrategySupport.scala @@ -86,7 +86,9 @@ final class RawPreExecutingCommitStrategySupport( .map(_ => access.getWriteSet) } - override def newReadServiceFactory(): ReplayingReadServiceFactory = + override def newReadServiceFactory()(implicit + loggingContext: LoggingContext + ): ReplayingReadServiceFactory = new LogAppendingReadServiceFactory(offsetBuilder, metrics) override val writeSetComparison: WriteSetComparison = diff --git a/ledger/participant-state/kvutils/tools/integrity-check/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactorySpec.scala b/ledger/participant-state/kvutils/tools/integrity-check/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactorySpec.scala index 81b338263576..635012b82a29 100644 --- a/ledger/participant-state/kvutils/tools/integrity-check/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactorySpec.scala +++ b/ledger/participant-state/kvutils/tools/integrity-check/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactorySpec.scala @@ -5,7 +5,6 @@ package com.daml.ledger.participant.state.kvutils.tools.integritycheck import java.time.Instant import java.util.concurrent.TimeUnit - import akka.stream.scaladsl.Sink import com.codahale.metrics.MetricRegistry import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll @@ -17,6 +16,7 @@ import com.daml.ledger.participant.state.kvutils.{Envelope, KVOffsetBuilder, Raw import com.daml.ledger.participant.state.v2.Update import com.daml.lf.data.Ref import com.daml.lf.data.Time.Timestamp +import com.daml.logging.LoggingContext import com.daml.metrics.Metrics import com.google.protobuf.ByteString import org.scalatest.matchers.should.Matchers @@ -71,6 +71,8 @@ final class LogAppendingReadServiceFactorySpec } object LogAppendingReadServiceFactorySpec { + private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting + private def createFactory() = { val offsetBuilder = new KVOffsetBuilder(0) val metrics = new Metrics(new MetricRegistry) diff --git a/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/CommitStrategySupport.scala b/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/CommitStrategySupport.scala index a01d85356053..b15d04380fa3 100644 --- a/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/CommitStrategySupport.scala +++ b/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/CommitStrategySupport.scala @@ -17,7 +17,7 @@ trait CommitStrategySupport[LogResult] { submissionInfo: SubmissionInfo )(implicit materializer: Materializer, loggingContext: LoggingContext): Future[WriteSet] - def newReadServiceFactory(): ReplayingReadServiceFactory + def newReadServiceFactory()(implicit loggingContext: LoggingContext): ReplayingReadServiceFactory def writeSetComparison: WriteSetComparison } From cfa6a7e7b3e43bc6810b4d12993c25258cd49f7b Mon Sep 17 00:00:00 2001 From: Fabio Tudone Date: Tue, 19 Oct 2021 16:43:00 +0200 Subject: [PATCH 2/7] Fix formatting and imports --- .../src/app/scala/ledger/indexerbenchmark/Main.scala | 1 + .../participant/state/kvutils/KeyValueConsumption.scala | 4 +--- .../state/kvutils/api/KeyValueParticipantState.scala | 4 ++-- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala b/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala index f4488ab4c655..1356543829be 100644 --- a/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala +++ b/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala @@ -5,6 +5,7 @@ package com.daml.ledger.indexerbenchmark import java.nio.file.{Files, Paths} import java.util.concurrent.atomic.AtomicLong + import akka.NotUsed import akka.actor.ActorSystem import akka.stream.Materializer diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala index f7d06d65de16..aa2e6fcb376a 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala @@ -59,9 +59,7 @@ object KeyValueConsumption { entry: DamlLogEntry, errorVersionSwitch: ValueSwitch[Status], recordTimeForUpdate: Option[Timestamp] = None, - )(implicit - loggingContext: LoggingContext - ): List[Update] = { + )(implicit loggingContext: LoggingContext): List[Update] = { val recordTimeFromLogEntry = PartialFunction.condOpt(entry.hasRecordTime) { case true => parseTimestamp(entry.getRecordTime) } diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantState.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantState.scala index 359cf12f4b0f..f86832e6d412 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantState.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantState.scala @@ -3,6 +3,8 @@ package com.daml.ledger.participant.state.kvutils.api +import java.util.concurrent.CompletionStage + import akka.NotUsed import akka.stream.scaladsl.Source import com.daml.daml_lf_dev.DamlLf @@ -24,8 +26,6 @@ import com.daml.logging.LoggingContext import com.daml.metrics.Metrics import com.daml.telemetry.TelemetryContext -import java.util.concurrent.CompletionStage - /** Implements read and write operations required for running a participant server. * * Adapts [[LedgerReader]] and [[LedgerWriter]] interfaces to [[com.daml.ledger.participant.state.v2.ReadService]] and From 43cf4db9e9d8685e1b157dfe087bb729bd249769 Mon Sep 17 00:00:00 2001 From: Fabio Tudone Date: Tue, 19 Oct 2021 17:19:09 +0200 Subject: [PATCH 3/7] Propagate the contextual logging per-readUpdates call --- .../scala/ledger/indexerbenchmark/Main.scala | 38 +++++----- .../indexerbenchmark/IndexerBenchmark.scala | 7 +- .../indexer/ha/EndlessReadService.scala | 4 +- ledger/participant-state-metrics/BUILD.bazel | 1 + .../state/v2/metrics/TimedReadService.scala | 5 +- ledger/participant-state/BUILD.bazel | 1 + .../state/kvutils/KeyValueConsumption.scala | 4 +- .../api/KeyValueParticipantState.scala | 7 +- .../api/KeyValueParticipantStateReader.scala | 10 +-- .../participant/state/kvutils/KVTest.scala | 10 ++- .../ParticipantStateIntegrationSpecBase.scala | 1 + .../kvutils/KVUtilsTransactionSpec.scala | 22 ++++-- .../kvutils/KeyValueConsumptionSpec.scala | 8 +-- .../KeyValueParticipantStateReaderSpec.scala | 72 +++++++++++-------- .../kvutils/tools/BUILD.bazel | 1 + .../LogAppendingReadServiceFactory.scala | 4 +- .../LogAppendingReadServiceFactorySpec.scala | 2 + .../integritycheck/IntegrityChecker.scala | 2 +- .../StateUpdateComparison.scala | 5 +- .../integritycheck/StateUpdateExporter.scala | 8 ++- .../integritycheck/IntegrityCheckerSpec.scala | 4 ++ .../StateUpdateExporterSpec.scala | 10 ++- .../state/v2/AdaptedV1ReadService.scala | 5 +- .../participant/state/v2/ReadService.scala | 5 +- .../sandbox/ReadWriteServiceBridge.scala | 4 +- 25 files changed, 158 insertions(+), 82 deletions(-) diff --git a/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala b/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala index 1356543829be..338e07c0c3cf 100644 --- a/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala +++ b/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala @@ -73,14 +73,12 @@ object Main { val metricRegistry = new MetricRegistry val metrics = new Metrics(metricRegistry) - val keyValueStateReader = newLoggingContext { implicit loggingContext => - KeyValueParticipantStateReader( - keyValueSource, - metrics, - failOnUnexpectedEvent = false, - enableSelfServiceErrorCodes = false, - ) - } + val keyValueStateReader = KeyValueParticipantStateReader( + keyValueSource, + metrics, + failOnUnexpectedEvent = false, + enableSelfServiceErrorCodes = false, + ) // Note: this method is doing quite a lot of work to transform a sequence of write sets // to a sequence of state updates. @@ -89,16 +87,18 @@ object Main { // the benchmark. val system = ActorSystem("IndexerBenchmarkUpdateReader") implicit val materializer: Materializer = Materializer(system) - keyValueStateReader - .stateUpdates(None) - .take(config.updateCount.getOrElse(Long.MaxValue)) - .zipWithIndex - .map { case (data, index) => - if (index % 1000 == 0) println(s"Generated update $index") - data - } - .runWith(Sink.seq[(Offset, Update)]) - .map(seq => seq.iterator)(DirectExecutionContext) - .andThen { case _ => system.terminate() }(DirectExecutionContext) + newLoggingContext { implicit loggingContext => + keyValueStateReader + .stateUpdates(None) + .take(config.updateCount.getOrElse(Long.MaxValue)) + .zipWithIndex + .map { case (data, index) => + if (index % 1000 == 0) println(s"Generated update $index") + data + } + .runWith(Sink.seq[(Offset, Update)]) + .map(seq => seq.iterator)(DirectExecutionContext) + .andThen { case _ => system.terminate() }(DirectExecutionContext) + } } } diff --git a/ledger/indexer-benchmark/src/main/scala/ledger/indexerbenchmark/IndexerBenchmark.scala b/ledger/indexer-benchmark/src/main/scala/ledger/indexerbenchmark/IndexerBenchmark.scala index 86be20c3db23..cd3dcfe34572 100644 --- a/ledger/indexer-benchmark/src/main/scala/ledger/indexerbenchmark/IndexerBenchmark.scala +++ b/ledger/indexer-benchmark/src/main/scala/ledger/indexerbenchmark/IndexerBenchmark.scala @@ -17,6 +17,7 @@ import com.daml.ledger.offset.Offset import com.daml.ledger.participant.state.v2.{ReadService, Update} import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner} import com.daml.lf.data.Time +import com.daml.logging.LoggingContext import com.daml.logging.LoggingContext.newLoggingContext import com.daml.metrics.{JvmMetricSet, Metrics} import com.daml.platform.configuration.ServerRole @@ -198,15 +199,17 @@ class IndexerBenchmark() { ) new ReadService { - override def ledgerInitialConditions(): Source[LedgerInitialConditions, NotUsed] = { Source.single(initialConditions) } - override def stateUpdates(beginAfter: Option[Offset]): Source[(Offset, Update), NotUsed] = { + override def stateUpdates( + beginAfter: Option[Offset] + )(implicit loggingContext: LoggingContext): Source[(Offset, Update), NotUsed] = { assert(beginAfter.isEmpty, s"beginAfter is $beginAfter") Source.fromIterator(() => updates) } + override def currentHealth(): HealthStatus = Healthy } } diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/indexer/ha/EndlessReadService.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/indexer/ha/EndlessReadService.scala index 829f9772480c..cb1ed76a1d80 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/indexer/ha/EndlessReadService.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/indexer/ha/EndlessReadService.scala @@ -61,7 +61,9 @@ case class EndlessReadService( * * The last two items above repeat indefinitely */ - override def stateUpdates(beginAfter: Option[Offset]): Source[(Offset, Update), NotUsed] = + override def stateUpdates( + beginAfter: Option[Offset] + )(implicit loggingContext: LoggingContext): Source[(Offset, Update), NotUsed] = synchronized { logger.info(s"EndlessReadService.stateUpdates($beginAfter) called") stateUpdatesCalls.incrementAndGet() diff --git a/ledger/participant-state-metrics/BUILD.bazel b/ledger/participant-state-metrics/BUILD.bazel index 3219e64bb84a..c28953b408b9 100644 --- a/ledger/participant-state-metrics/BUILD.bazel +++ b/ledger/participant-state-metrics/BUILD.bazel @@ -28,6 +28,7 @@ da_scala_library( "//ledger/ledger-offset", "//ledger/metrics", "//ledger/participant-state", + "//libs-scala/contextualized-logging", "@maven//:com_google_protobuf_protobuf_java", "@maven//:io_dropwizard_metrics_metrics_core", ], diff --git a/ledger/participant-state-metrics/src/main/scala/com/daml/ledger/participant/state/v2/metrics/TimedReadService.scala b/ledger/participant-state-metrics/src/main/scala/com/daml/ledger/participant/state/v2/metrics/TimedReadService.scala index f0704cc4f760..663a4f4b92dc 100644 --- a/ledger/participant-state-metrics/src/main/scala/com/daml/ledger/participant/state/v2/metrics/TimedReadService.scala +++ b/ledger/participant-state-metrics/src/main/scala/com/daml/ledger/participant/state/v2/metrics/TimedReadService.scala @@ -9,6 +9,7 @@ import com.daml.ledger.api.health.HealthStatus import com.daml.ledger.configuration.LedgerInitialConditions import com.daml.ledger.offset.Offset import com.daml.ledger.participant.state.v2.{ReadService, Update} +import com.daml.logging.LoggingContext import com.daml.metrics.{Metrics, Timed} final class TimedReadService(delegate: ReadService, metrics: Metrics) extends ReadService { @@ -19,7 +20,9 @@ final class TimedReadService(delegate: ReadService, metrics: Metrics) extends Re delegate.ledgerInitialConditions(), ) - override def stateUpdates(beginAfter: Option[Offset]): Source[(Offset, Update), NotUsed] = + override def stateUpdates( + beginAfter: Option[Offset] + )(implicit loggingContext: LoggingContext): Source[(Offset, Update), NotUsed] = Timed.source(metrics.daml.services.read.stateUpdates, delegate.stateUpdates(beginAfter)) override def currentHealth(): HealthStatus = diff --git a/ledger/participant-state/BUILD.bazel b/ledger/participant-state/BUILD.bazel index b637ec04acb2..0034df0a5ad1 100644 --- a/ledger/participant-state/BUILD.bazel +++ b/ledger/participant-state/BUILD.bazel @@ -36,6 +36,7 @@ da_scala_library( "//ledger/ledger-grpc", "//ledger/ledger-offset", "//ledger/metrics", + "//libs-scala/contextualized-logging", "//libs-scala/grpc-utils", "//libs-scala/logging-entries", "@maven//:com_google_api_grpc_proto_google_common_protos", diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala index aa2e6fcb376a..26749f04d1ba 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala @@ -59,7 +59,9 @@ object KeyValueConsumption { entry: DamlLogEntry, errorVersionSwitch: ValueSwitch[Status], recordTimeForUpdate: Option[Timestamp] = None, - )(implicit loggingContext: LoggingContext): List[Update] = { + )(loggingContext: LoggingContext): List[Update] = { + implicit val logContext: LoggingContext = loggingContext + val recordTimeFromLogEntry = PartialFunction.condOpt(entry.hasRecordTime) { case true => parseTimestamp(entry.getRecordTime) } diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantState.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantState.scala index f86832e6d412..487311547652 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantState.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantState.scala @@ -42,8 +42,7 @@ class KeyValueParticipantState( writer: LedgerWriter, metrics: Metrics, enableSelfServiceErrorCodes: Boolean, -)(implicit loggingContext: LoggingContext) - extends ReadService +) extends ReadService with WriteService { private val readerAdapter = KeyValueParticipantStateReader(reader, metrics, enableSelfServiceErrorCodes) @@ -58,7 +57,9 @@ class KeyValueParticipantState( override def ledgerInitialConditions(): Source[LedgerInitialConditions, NotUsed] = readerAdapter.ledgerInitialConditions() - override def stateUpdates(beginAfter: Option[Offset]): Source[(Offset, Update), NotUsed] = + override def stateUpdates( + beginAfter: Option[Offset] + )(implicit loggingContext: LoggingContext): Source[(Offset, Update), NotUsed] = readerAdapter.stateUpdates(beginAfter) override def submitTransaction( diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReader.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReader.scala index bb5646750a67..485925fe4644 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReader.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReader.scala @@ -36,7 +36,7 @@ class KeyValueParticipantStateReader private[api] ( DamlLogEntry, ValueSwitch[Status], Option[Timestamp], - ) => List[Update], + ) => LoggingContext => List[Update], timeUpdatesProvider: TimeUpdatesProvider, failOnUnexpectedEvent: Boolean, ) extends ReadService { @@ -48,7 +48,9 @@ class KeyValueParticipantStateReader private[api] ( override def ledgerInitialConditions(): Source[LedgerInitialConditions, NotUsed] = Source.single(createLedgerInitialConditions()) - override def stateUpdates(beginAfter: Option[Offset]): Source[(Offset, Update), NotUsed] = { + override def stateUpdates( + beginAfter: Option[Offset] + )(implicit loggingContext: LoggingContext): Source[(Offset, Update), NotUsed] = { Source .single(beginAfter.map(offset => KVOffset(offset).zeroLowest.offset)) .flatMapConcat(reader.events) @@ -66,7 +68,7 @@ class KeyValueParticipantStateReader private[api] ( logEntry, errorVersionSwitch, timeUpdatesProvider(), - ) + )(loggingContext) val updatesWithOffsets = Source(updates).zipWithIndex.map { case (update, index) => offsetForUpdate(offset, index.toInt, updates.size) -> update @@ -104,7 +106,7 @@ object KeyValueParticipantStateReader { enableSelfServiceErrorCodes: Boolean, timeUpdatesProvider: TimeUpdatesProvider = TimeUpdatesProvider.ReasonableDefault, failOnUnexpectedEvent: Boolean = true, - )(implicit loggingContext: LoggingContext): KeyValueParticipantStateReader = + ): KeyValueParticipantStateReader = new KeyValueParticipantStateReader( reader, metrics, diff --git a/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/KVTest.scala b/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/KVTest.scala index 294c3b455452..b6c384206fe1 100644 --- a/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/KVTest.scala +++ b/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/KVTest.scala @@ -385,7 +385,11 @@ object KVTest { newState.keySet subsetOf KeyValueCommitting.submissionOutputs(submission) ) // Verify that we can always process the log entry. - val _ = KeyValueConsumption.logEntryToUpdate(entryId, logEntry, errorVersionSwitch) + val _ = KeyValueConsumption.logEntryToUpdate( + entryId, + logEntry, + errorVersionSwitch, + )(loggingContext) entryId -> logEntry } @@ -417,13 +421,13 @@ object KVTest { successfulLogEntry, errorVersionSwitch, recordTimeFromTimeUpdateLogEntry, - ) + )(loggingContext) KeyValueConsumption.logEntryToUpdate( entryId, outOfTimeBoundsLogEntry, errorVersionSwitch, recordTimeFromTimeUpdateLogEntry, - ) + )(loggingContext) entryId -> preExecutionResult } diff --git a/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/ParticipantStateIntegrationSpecBase.scala b/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/ParticipantStateIntegrationSpecBase.scala index 29b64b9ff0d7..0741fdfc5ab1 100644 --- a/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/ParticipantStateIntegrationSpecBase.scala +++ b/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/ParticipantStateIntegrationSpecBase.scala @@ -55,6 +55,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(i private implicit val resourceContext: ResourceContext = ResourceContext(testExecutionContext) private implicit val telemetryContext: TelemetryContext = NoOpTelemetryContext + private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting // Can be used by [[participantStateFactory]] to get a stable ID throughout the test. // For example, for initializing a database. diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KVUtilsTransactionSpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KVUtilsTransactionSpec.scala index 78d88dd3ff16..5cf8273da6cf 100644 --- a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KVUtilsTransactionSpec.scala +++ b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KVUtilsTransactionSpec.scala @@ -99,7 +99,11 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside { ) (entryId, logEntry) = result contractId = contractIdOfCreateTransaction( - KeyValueConsumption.logEntryToUpdate(entryId, logEntry, errorVersionSwitch) + KeyValueConsumption.logEntryToUpdate( + entryId, + logEntry, + errorVersionSwitch, + )(loggingContext) ) transaction2 <- runSimpleCommand( @@ -225,7 +229,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside { preExecutionResult.successfulLogEntry, errorVersionSwitch, Some(recordTime), - ) + )(loggingContext) ) } @@ -308,7 +312,11 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside { ) (entryId, logEntry) = result contractId = contractIdOfCreateTransaction( - KeyValueConsumption.logEntryToUpdate(entryId, logEntry, errorVersionSwitch) + KeyValueConsumption.logEntryToUpdate( + entryId, + logEntry, + errorVersionSwitch, + )(loggingContext) ) transaction2 <- runSimpleCommand( @@ -525,7 +533,11 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside { // Process into updates and verify val updates = - KeyValueConsumption.logEntryToUpdate(entryId, strippedEntry.build, errorVersionSwitch) + KeyValueConsumption.logEntryToUpdate( + entryId, + strippedEntry.build, + errorVersionSwitch, + )(loggingContext) inside(updates) { case Seq(txAccepted: Update.TransactionAccepted) => txAccepted.optCompletionInfo should be(None) } @@ -581,7 +593,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside { preExecutionResult.successfulLogEntry, errorVersionSwitch, Some(recordTime), - ) + )(loggingContext) ) private def prepareExerciseReplaceByKey( diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumptionSpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumptionSpec.scala index 7092b0823ea1..5dc8eeb9d0e8 100644 --- a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumptionSpec.scala +++ b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumptionSpec.scala @@ -77,7 +77,7 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers { aLogEntryWithoutRecordTime, errorSwitch, recordTimeForUpdate = None, - ) + )(loggingContext) ) } } @@ -91,7 +91,7 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers { aLogEntryWithRecordTime, errorSwitch, recordTimeForUpdate = Some(aRecordTime), - ) + )(loggingContext) actual.recordTime shouldBe aRecordTimeFromLogEntry } @@ -107,7 +107,7 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers { aLogEntryWithRecordTime, errorSwitch, recordTimeForUpdate = None, - ) + )(loggingContext) actual.recordTime shouldBe Timestamp.assertFromInstant(Instant.ofEpochSecond(100)) } @@ -126,7 +126,7 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers { timeUpdateEntry, errorSwitch, recordTimeForUpdate = None, - ) shouldBe Nil + )(loggingContext) shouldBe Nil } } } diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReaderSpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReaderSpec.scala index ce025ba34894..2c6aa3eee4f4 100644 --- a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReaderSpec.scala +++ b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReaderSpec.scala @@ -22,6 +22,7 @@ import com.daml.ledger.participant.state.kvutils.{Envelope, KVOffsetBuilder, Raw import com.daml.ledger.participant.state.v2.Update import com.daml.lf.data.Ref import com.daml.lf.data.Time.Timestamp +import com.daml.logging.LoggingContext import com.daml.metrics.Metrics import com.google.protobuf.ByteString import com.google.rpc.status.Status @@ -37,6 +38,7 @@ class KeyValueParticipantStateReaderSpec with Matchers with AkkaBeforeAndAfterAll { + private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting private val offsetBuilder = new KVOffsetBuilder(0) "participant state reader" should { @@ -243,37 +245,51 @@ object KeyValueParticipantStateReaderSpec { private val aWrappedLogEntry = Envelope.enclose(aLogEntry) - private val zeroUpdateGenerator - : (DamlLogEntryId, DamlLogEntry, ValueSwitch[Status], Option[Timestamp]) => List[Update] = - (_, _, _, _) => List.empty - - private val singleUpdateGenerator - : (DamlLogEntryId, DamlLogEntry, ValueSwitch[Status], Option[Timestamp]) => List[Update] = + private val zeroUpdateGenerator: ( + DamlLogEntryId, + DamlLogEntry, + ValueSwitch[Status], + Option[Timestamp], + ) => LoggingContext => List[Update] = + (_, _, _, _) => _ => List.empty + + private val singleUpdateGenerator: ( + DamlLogEntryId, + DamlLogEntry, + ValueSwitch[Status], + Option[Timestamp], + ) => LoggingContext => List[Update] = (_, _, _, _) => - List( - Update.PartyAddedToParticipant( - Ref.Party.assertFromString("aParty"), - "a party", - Ref.ParticipantId.assertFromString("aParticipant"), - Timestamp.now(), - submissionId = None, + _ => + List( + Update.PartyAddedToParticipant( + Ref.Party.assertFromString("aParty"), + "a party", + Ref.ParticipantId.assertFromString("aParticipant"), + Timestamp.now(), + submissionId = None, + ) ) - ) - private val twoUpdatesGenerator - : (DamlLogEntryId, DamlLogEntry, ValueSwitch[Status], Option[Timestamp]) => List[Update] = + private val twoUpdatesGenerator: ( + DamlLogEntryId, + DamlLogEntry, + ValueSwitch[Status], + Option[Timestamp], + ) => LoggingContext => List[Update] = (entryId, entry, errorVersionSwitch, recordTime) => - singleUpdateGenerator( - entryId, - entry, - errorVersionSwitch, - recordTime, - ) ::: singleUpdateGenerator( - entryId, - entry, - errorVersionSwitch, - recordTime, - ) + loggingContext => + singleUpdateGenerator( + entryId, + entry, + errorVersionSwitch, + recordTime, + )(loggingContext) ::: singleUpdateGenerator( + entryId, + entry, + errorVersionSwitch, + recordTime, + )(loggingContext) private def aLogEntryId(index: Int): Raw.LogEntryId = Raw.LogEntryId( @@ -296,7 +312,7 @@ object KeyValueParticipantStateReaderSpec { DamlLogEntry, ValueSwitch[Status], Option[Timestamp], - ) => List[Update] = singleUpdateGenerator, + ) => LoggingContext => List[Update] = singleUpdateGenerator, failOnUnexpectedEvent: Boolean = true, ): KeyValueParticipantStateReader = new KeyValueParticipantStateReader( diff --git a/ledger/participant-state/kvutils/tools/BUILD.bazel b/ledger/participant-state/kvutils/tools/BUILD.bazel index fa0fde648629..62c7d9b77cc7 100644 --- a/ledger/participant-state/kvutils/tools/BUILD.bazel +++ b/ledger/participant-state/kvutils/tools/BUILD.bazel @@ -81,6 +81,7 @@ da_scala_test( "//ledger/participant-state", "//ledger/participant-state/kvutils", "//ledger/participant-state/kvutils/tools", + "//libs-scala/contextualized-logging", "@maven//:org_mockito_mockito_core", ] + (["@maven//:com_google_protobuf_protobuf_java"] if scala_major_version == "2.13" else []), ) diff --git a/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactory.scala b/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactory.scala index df4d02f4f988..a6776b818e09 100644 --- a/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactory.scala +++ b/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactory.scala @@ -75,7 +75,9 @@ final class LogAppendingReadServiceFactory( override def ledgerInitialConditions(): Source[LedgerInitialConditions, NotUsed] = implementation.ledgerInitialConditions() - override def stateUpdates(beginAfter: Option[Offset]): Source[(Offset, Update), NotUsed] = + override def stateUpdates( + beginAfter: Option[Offset] + )(implicit loggingContext: LoggingContext): Source[(Offset, Update), NotUsed] = implementation.stateUpdates(beginAfter) override def currentHealth(): HealthStatus = implementation.currentHealth() diff --git a/ledger/participant-state/kvutils/tools/integrity-check/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactorySpec.scala b/ledger/participant-state/kvutils/tools/integrity-check/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactorySpec.scala index 635012b82a29..88306e7e6bfa 100644 --- a/ledger/participant-state/kvutils/tools/integrity-check/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactorySpec.scala +++ b/ledger/participant-state/kvutils/tools/integrity-check/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactorySpec.scala @@ -29,6 +29,8 @@ final class LogAppendingReadServiceFactorySpec with Matchers with AkkaBeforeAndAfterAll { + private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting + "LogAppendingReadServiceFactory" should { "handle empty blocks" in { val factory = createFactory() diff --git a/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/IntegrityChecker.scala b/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/IntegrityChecker.scala index 780bbec8f210..00a486e3e914 100644 --- a/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/IntegrityChecker.scala +++ b/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/IntegrityChecker.scala @@ -127,7 +127,7 @@ class IntegrityChecker[LogResult]( private[integritycheck] def compareStateUpdates( config: Config, stateUpdates: StateUpdateComparison, - ): Future[Unit] = + )(implicit loggingContext: LoggingContext): Future[Unit] = if (!config.indexOnly) stateUpdates.compare() else diff --git a/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/StateUpdateComparison.scala b/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/StateUpdateComparison.scala index 2cea06946837..116d12e309eb 100644 --- a/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/StateUpdateComparison.scala +++ b/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/StateUpdateComparison.scala @@ -9,11 +9,12 @@ import com.daml.ledger.offset.Offset import com.daml.ledger.participant.state.kvutils.tools.integritycheck.IntegrityChecker.ComparisonFailureException import com.daml.ledger.participant.state.kvutils.tools.integritycheck.UpdateNormalizer.normalize import com.daml.ledger.participant.state.v2.Update +import com.daml.logging.LoggingContext import scala.concurrent.{ExecutionContext, Future} trait StateUpdateComparison { - def compare(): Future[Unit] + def compare()(implicit loggingContext: LoggingContext): Future[Unit] } final class ReadServiceStateUpdateComparison( @@ -29,7 +30,7 @@ final class ReadServiceStateUpdateComparison( import ReadServiceStateUpdateComparison._ - def compare(): Future[Unit] = { + def compare()(implicit loggingContext: LoggingContext): Future[Unit] = { println("Comparing expected and actual state updates.".white) if (expectedReadService.updateCount() != actualReadService.updateCount()) { Future.failed( diff --git a/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/StateUpdateExporter.scala b/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/StateUpdateExporter.scala index a6bff5a82bec..f67ba04d96c7 100644 --- a/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/StateUpdateExporter.scala +++ b/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/StateUpdateExporter.scala @@ -9,6 +9,7 @@ import java.nio.file.Path import akka.stream.Materializer import com.daml.ledger.participant.state.kvutils.tools.integritycheck.UpdateNormalizer.normalize import com.daml.ledger.participant.state.v2.ReadService +import com.daml.logging.LoggingContext import scala.concurrent.{ExecutionContext, Future} @@ -19,7 +20,11 @@ object StateUpdateExporter { actualReadService: ReplayingReadService, outputWriterFactory: Path => PrintWriter, config: Config, - )(implicit executionContext: ExecutionContext, materializer: Materializer): Future[Unit] = { + )(implicit + executionContext: ExecutionContext, + materializer: Materializer, + loggingContext: LoggingContext, + ): Future[Unit] = { for { _ <- config.expectedUpdatesPath.fold(Future.unit)(path => StateUpdateExporter.write( @@ -45,6 +50,7 @@ object StateUpdateExporter { )(implicit materializer: Materializer, executionContext: ExecutionContext, + loggingContext: LoggingContext, ): Future[Unit] = { readService .stateUpdates(None) diff --git a/ledger/participant-state/kvutils/tools/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/IntegrityCheckerSpec.scala b/ledger/participant-state/kvutils/tools/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/IntegrityCheckerSpec.scala index 936582b1743e..83358f261ec5 100644 --- a/ledger/participant-state/kvutils/tools/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/IntegrityCheckerSpec.scala +++ b/ledger/participant-state/kvutils/tools/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/IntegrityCheckerSpec.scala @@ -11,12 +11,16 @@ import org.scalatest.wordspec.AsyncWordSpec import scala.concurrent.Future +import com.daml.logging.LoggingContext + final class IntegrityCheckerSpec extends AsyncWordSpec with Matchers with MockitoSugar with ArgumentMatchersSugar { + private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting + "compareStateUpdates" should { "call compare if not in index-only mode" in { val mockStateUpdates = mock[StateUpdateComparison] diff --git a/ledger/participant-state/kvutils/tools/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/StateUpdateExporterSpec.scala b/ledger/participant-state/kvutils/tools/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/StateUpdateExporterSpec.scala index 9669a0952d69..4e1eb8d924fd 100644 --- a/ledger/participant-state/kvutils/tools/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/StateUpdateExporterSpec.scala +++ b/ledger/participant-state/kvutils/tools/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/StateUpdateExporterSpec.scala @@ -16,6 +16,7 @@ import com.daml.ledger.configuration.LedgerInitialConditions import com.daml.ledger.offset.Offset import com.daml.ledger.participant.state.v2.Update import com.daml.lf.data.Time +import com.daml.logging.LoggingContext import org.mockito.ArgumentMatchers.anyString import org.mockito.{ArgumentMatchersSugar, MockitoSugar} import org.scalatest.BeforeAndAfterAll @@ -33,8 +34,9 @@ class StateUpdateExporterSpec with BeforeAndAfterAll { import StateUpdateExporterSpec._ - private[this] implicit val materializer: Materializer = Materializer(system) - private[this] implicit val ec: ExecutionContext = materializer.executionContext + private implicit val materializer: Materializer = Materializer(system) + private implicit val ec: ExecutionContext = materializer.executionContext + private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting override def afterAll(): Unit = { TestKit.shutdownActorSystem(system) @@ -88,7 +90,9 @@ object StateUpdateExporterSpec extends MockitoSugar { override def ledgerInitialConditions(): Source[LedgerInitialConditions, NotUsed] = Source.empty - override def stateUpdates(beginAfter: Option[Offset]): Source[(Offset, Update), NotUsed] = + override def stateUpdates( + beginAfter: Option[Offset] + )(implicit loggingContext: LoggingContext): Source[(Offset, Update), NotUsed] = Source.single( ( Offset.beforeBegin, diff --git a/ledger/participant-state/src/main/scala/com/daml/ledger/participant/state/v2/AdaptedV1ReadService.scala b/ledger/participant-state/src/main/scala/com/daml/ledger/participant/state/v2/AdaptedV1ReadService.scala index d9648d6422c5..950b45308243 100644 --- a/ledger/participant-state/src/main/scala/com/daml/ledger/participant/state/v2/AdaptedV1ReadService.scala +++ b/ledger/participant-state/src/main/scala/com/daml/ledger/participant/state/v2/AdaptedV1ReadService.scala @@ -15,6 +15,7 @@ import com.daml.ledger.participant.state.v2.AdaptedV1ReadService._ import com.daml.ledger.participant.state.v2.Update.CommandRejected import com.daml.ledger.participant.state.v2.Update.CommandRejected.RejectionReasonTemplate import com.daml.lf.data.Ref +import com.daml.logging.LoggingContext /** Adapts a [[com.daml.ledger.participant.state.v1.ReadService]] implementation to the * [[com.daml.ledger.participant.state.v2.ReadService]] API. @@ -25,7 +26,9 @@ class AdaptedV1ReadService(delegate: v1.ReadService) extends ReadService { override def ledgerInitialConditions(): Source[LedgerInitialConditions, NotUsed] = delegate.getLedgerInitialConditions() - override def stateUpdates(beginAfter: Option[Offset]): Source[(Offset, Update), NotUsed] = + override def stateUpdates( + beginAfter: Option[Offset] + )(implicit loggingContext: LoggingContext): Source[(Offset, Update), NotUsed] = delegate .stateUpdates(beginAfter) .map { case (offset, update) => Offset(offset.bytes) -> adaptUpdate(update) } diff --git a/ledger/participant-state/src/main/scala/com/daml/ledger/participant/state/v2/ReadService.scala b/ledger/participant-state/src/main/scala/com/daml/ledger/participant/state/v2/ReadService.scala index d9afdd8f99e1..8b7cec7511e9 100644 --- a/ledger/participant-state/src/main/scala/com/daml/ledger/participant/state/v2/ReadService.scala +++ b/ledger/participant-state/src/main/scala/com/daml/ledger/participant/state/v2/ReadService.scala @@ -8,6 +8,7 @@ import akka.stream.scaladsl.Source import com.daml.ledger.api.health.ReportsHealth import com.daml.ledger.configuration.LedgerInitialConditions import com.daml.ledger.offset.Offset +import com.daml.logging.LoggingContext /** An interface for reading the state of a ledger participant. * '''Please note that this interface is unstable and may significantly change.''' @@ -135,5 +136,7 @@ trait ReadService extends ReportsHealth { * * Note further that the offsets of the transactions might not agree, as these offsets are participant-local. */ - def stateUpdates(beginAfter: Option[Offset]): Source[(Offset, Update), NotUsed] + def stateUpdates( + beginAfter: Option[Offset] + )(implicit loggingContext: LoggingContext): Source[(Offset, Update), NotUsed] } diff --git a/ledger/sandbox-on-x/src/main/scala/com/daml/ledger/sandbox/ReadWriteServiceBridge.scala b/ledger/sandbox-on-x/src/main/scala/com/daml/ledger/sandbox/ReadWriteServiceBridge.scala index c74e57a43f65..d4bf4db3b0ab 100644 --- a/ledger/sandbox-on-x/src/main/scala/com/daml/ledger/sandbox/ReadWriteServiceBridge.scala +++ b/ledger/sandbox-on-x/src/main/scala/com/daml/ledger/sandbox/ReadWriteServiceBridge.scala @@ -130,7 +130,9 @@ case class ReadWriteServiceBridge( ) var stateUpdatesWasCalledAlready = false - override def stateUpdates(beginAfter: Option[Offset]): Source[(Offset, Update), NotUsed] = { + override def stateUpdates( + beginAfter: Option[Offset] + )(implicit loggingContext: LoggingContext): Source[(Offset, Update), NotUsed] = { // TODO for PoC purposes: // This method may only be called once, either with `beginAfter` set or unset. // A second call will result in an error unless the server is restarted. From 4b04ad490f1bd53edc316a032df868209df96e81 Mon Sep 17 00:00:00 2001 From: Fabio Tudone Date: Tue, 19 Oct 2021 17:27:06 +0200 Subject: [PATCH 4/7] No need for at-construction logging context in LogAppendingReadServiceFactory --- .../tools/integritycheck/LogAppendingReadServiceFactory.scala | 3 +-- .../integritycheck/LogAppendingReadServiceFactorySpec.scala | 1 - 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactory.scala b/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactory.scala index a6776b818e09..09b5a241b78a 100644 --- a/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactory.scala +++ b/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactory.scala @@ -26,8 +26,7 @@ import scala.collection.mutable.ListBuffer final class LogAppendingReadServiceFactory( offsetBuilder: KVOffsetBuilder, metrics: Metrics, -)(implicit val loggingContext: LoggingContext) - extends ReplayingReadServiceFactory { +) extends ReplayingReadServiceFactory { private val recordedBlocks = ListBuffer.empty[LedgerRecord] override def appendBlock(submissionInfo: SubmissionInfo, writeSet: WriteSet): Unit = diff --git a/ledger/participant-state/kvutils/tools/integrity-check/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactorySpec.scala b/ledger/participant-state/kvutils/tools/integrity-check/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactorySpec.scala index 88306e7e6bfa..a47e2a4787b6 100644 --- a/ledger/participant-state/kvutils/tools/integrity-check/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactorySpec.scala +++ b/ledger/participant-state/kvutils/tools/integrity-check/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactorySpec.scala @@ -73,7 +73,6 @@ final class LogAppendingReadServiceFactorySpec } object LogAppendingReadServiceFactorySpec { - private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting private def createFactory() = { val offsetBuilder = new KVOffsetBuilder(0) From 6cac08e0a13bb1ad39acbc64ffd57a263bb896e5 Mon Sep 17 00:00:00 2001 From: Fabio Tudone Date: Tue, 19 Oct 2021 17:29:10 +0200 Subject: [PATCH 5/7] No need for logging context in CommitStrategySupport.newReadServiceFactory --- .../integritycheck/LogAppendingCommitStrategySupport.scala | 4 +--- .../integritycheck/RawPreExecutingCommitStrategySupport.scala | 4 +--- .../kvutils/tools/integritycheck/CommitStrategySupport.scala | 2 +- 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingCommitStrategySupport.scala b/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingCommitStrategySupport.scala index 88d32145ac47..e6ce9b834a4e 100644 --- a/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingCommitStrategySupport.scala +++ b/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingCommitStrategySupport.scala @@ -71,9 +71,7 @@ final class LogAppendingCommitStrategySupport( } } - override def newReadServiceFactory()(implicit - loggingContext: LoggingContext - ): ReplayingReadServiceFactory = + override def newReadServiceFactory(): ReplayingReadServiceFactory = new LogAppendingReadServiceFactory(offsetBuilder, metrics) override val writeSetComparison: WriteSetComparison = diff --git a/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/RawPreExecutingCommitStrategySupport.scala b/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/RawPreExecutingCommitStrategySupport.scala index 0426c65ccee0..5a2d7c83baa2 100644 --- a/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/RawPreExecutingCommitStrategySupport.scala +++ b/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/RawPreExecutingCommitStrategySupport.scala @@ -86,9 +86,7 @@ final class RawPreExecutingCommitStrategySupport( .map(_ => access.getWriteSet) } - override def newReadServiceFactory()(implicit - loggingContext: LoggingContext - ): ReplayingReadServiceFactory = + override def newReadServiceFactory(): ReplayingReadServiceFactory = new LogAppendingReadServiceFactory(offsetBuilder, metrics) override val writeSetComparison: WriteSetComparison = diff --git a/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/CommitStrategySupport.scala b/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/CommitStrategySupport.scala index b15d04380fa3..a01d85356053 100644 --- a/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/CommitStrategySupport.scala +++ b/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/CommitStrategySupport.scala @@ -17,7 +17,7 @@ trait CommitStrategySupport[LogResult] { submissionInfo: SubmissionInfo )(implicit materializer: Materializer, loggingContext: LoggingContext): Future[WriteSet] - def newReadServiceFactory()(implicit loggingContext: LoggingContext): ReplayingReadServiceFactory + def newReadServiceFactory(): ReplayingReadServiceFactory def writeSetComparison: WriteSetComparison } From 0887be6db44cc32f14df5d45fa811cdef236752f Mon Sep 17 00:00:00 2001 From: Fabio Tudone Date: Tue, 19 Oct 2021 18:31:07 +0200 Subject: [PATCH 6/7] Fix JdbcIndexerSpec --- .../src/test/suite/scala/platform/indexer/JdbcIndexerSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/JdbcIndexerSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/JdbcIndexerSpec.scala index 80b8ee6ac68b..fa4a80504108 100644 --- a/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/JdbcIndexerSpec.scala +++ b/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/JdbcIndexerSpec.scala @@ -64,7 +64,7 @@ final class JdbcIndexerSpec ) ) - when(readService.stateUpdates(any[Option[Offset]])) + when(readService.stateUpdates(any[Option[Offset]])(any[LoggingContext])) .thenReturn(Source.fromIterator(() => updates.iterator)) readService From c3fbfef83e1fa4424d3b5713d292713c61350381 Mon Sep 17 00:00:00 2001 From: Fabio Tudone Date: Tue, 19 Oct 2021 19:05:52 +0200 Subject: [PATCH 7/7] Fix RecoveringIndexerIntegrationSpec --- .../platform/indexer/RecoveringIndexerIntegrationSpec.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala b/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala index 51eb91a48ac9..648a3dd57f62 100644 --- a/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala +++ b/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala @@ -306,7 +306,10 @@ object RecoveringIndexerIntegrationSpec { Source.single(value) } } - }).when(failingParticipantState).stateUpdates(ArgumentMatchers.any[Option[Offset]]()) + }).when(failingParticipantState) + .stateUpdates( + ArgumentMatchers.any[Option[Offset]]() + )(ArgumentMatchers.any[LoggingContext]) failingParticipantState }