diff --git a/ledger/indexer-benchmark/BUILD.bazel b/ledger/indexer-benchmark/BUILD.bazel index fffc238840fd..2bf889648f40 100644 --- a/ledger/indexer-benchmark/BUILD.bazel +++ b/ledger/indexer-benchmark/BUILD.bazel @@ -66,6 +66,7 @@ da_scala_library( "//ledger/participant-state", "//ledger/participant-state/kvutils", "//libs-scala/concurrent", + "//libs-scala/contextualized-logging", "@maven//:io_dropwizard_metrics_metrics_core", ], ) diff --git a/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala b/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala index 1f531b9be008..338e07c0c3cf 100644 --- a/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala +++ b/ledger/indexer-benchmark/src/app/scala/ledger/indexerbenchmark/Main.scala @@ -23,6 +23,7 @@ import com.daml.ledger.participant.state.kvutils.api.{ import com.daml.ledger.participant.state.kvutils.export.ProtobufBasedLedgerDataImporter import com.daml.ledger.participant.state.kvutils.{KVOffsetBuilder, Raw} import com.daml.ledger.participant.state.v2.Update +import com.daml.logging.LoggingContext.newLoggingContext import com.daml.metrics.Metrics import scala.concurrent.Future @@ -86,16 +87,18 @@ object Main { // the benchmark. val system = ActorSystem("IndexerBenchmarkUpdateReader") implicit val materializer: Materializer = Materializer(system) - keyValueStateReader - .stateUpdates(None) - .take(config.updateCount.getOrElse(Long.MaxValue)) - .zipWithIndex - .map { case (data, index) => - if (index % 1000 == 0) println(s"Generated update $index") - data - } - .runWith(Sink.seq[(Offset, Update)]) - .map(seq => seq.iterator)(DirectExecutionContext) - .andThen { case _ => system.terminate() }(DirectExecutionContext) + newLoggingContext { implicit loggingContext => + keyValueStateReader + .stateUpdates(None) + .take(config.updateCount.getOrElse(Long.MaxValue)) + .zipWithIndex + .map { case (data, index) => + if (index % 1000 == 0) println(s"Generated update $index") + data + } + .runWith(Sink.seq[(Offset, Update)]) + .map(seq => seq.iterator)(DirectExecutionContext) + .andThen { case _ => system.terminate() }(DirectExecutionContext) + } } } diff --git a/ledger/indexer-benchmark/src/main/scala/ledger/indexerbenchmark/IndexerBenchmark.scala b/ledger/indexer-benchmark/src/main/scala/ledger/indexerbenchmark/IndexerBenchmark.scala index 86be20c3db23..cd3dcfe34572 100644 --- a/ledger/indexer-benchmark/src/main/scala/ledger/indexerbenchmark/IndexerBenchmark.scala +++ b/ledger/indexer-benchmark/src/main/scala/ledger/indexerbenchmark/IndexerBenchmark.scala @@ -17,6 +17,7 @@ import com.daml.ledger.offset.Offset import com.daml.ledger.participant.state.v2.{ReadService, Update} import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner} import com.daml.lf.data.Time +import com.daml.logging.LoggingContext import com.daml.logging.LoggingContext.newLoggingContext import com.daml.metrics.{JvmMetricSet, Metrics} import com.daml.platform.configuration.ServerRole @@ -198,15 +199,17 @@ class IndexerBenchmark() { ) new ReadService { - override def ledgerInitialConditions(): Source[LedgerInitialConditions, NotUsed] = { Source.single(initialConditions) } - override def stateUpdates(beginAfter: Option[Offset]): Source[(Offset, Update), NotUsed] = { + override def stateUpdates( + beginAfter: Option[Offset] + )(implicit loggingContext: LoggingContext): Source[(Offset, Update), NotUsed] = { assert(beginAfter.isEmpty, s"beginAfter is $beginAfter") Source.fromIterator(() => updates) } + override def currentHealth(): HealthStatus = Healthy } } diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/indexer/ha/EndlessReadService.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/indexer/ha/EndlessReadService.scala index 829f9772480c..cb1ed76a1d80 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/indexer/ha/EndlessReadService.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/indexer/ha/EndlessReadService.scala @@ -61,7 +61,9 @@ case class EndlessReadService( * * The last two items above repeat indefinitely */ - override def stateUpdates(beginAfter: Option[Offset]): Source[(Offset, Update), NotUsed] = + override def stateUpdates( + beginAfter: Option[Offset] + )(implicit loggingContext: LoggingContext): Source[(Offset, Update), NotUsed] = synchronized { logger.info(s"EndlessReadService.stateUpdates($beginAfter) called") stateUpdatesCalls.incrementAndGet() diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/JdbcIndexerSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/JdbcIndexerSpec.scala index 80b8ee6ac68b..fa4a80504108 100644 --- a/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/JdbcIndexerSpec.scala +++ b/ledger/participant-integration-api/src/test/suite/scala/platform/indexer/JdbcIndexerSpec.scala @@ -64,7 +64,7 @@ final class JdbcIndexerSpec ) ) - when(readService.stateUpdates(any[Option[Offset]])) + when(readService.stateUpdates(any[Option[Offset]])(any[LoggingContext])) .thenReturn(Source.fromIterator(() => updates.iterator)) readService diff --git a/ledger/participant-state-metrics/BUILD.bazel b/ledger/participant-state-metrics/BUILD.bazel index 3219e64bb84a..c28953b408b9 100644 --- a/ledger/participant-state-metrics/BUILD.bazel +++ b/ledger/participant-state-metrics/BUILD.bazel @@ -28,6 +28,7 @@ da_scala_library( "//ledger/ledger-offset", "//ledger/metrics", "//ledger/participant-state", + "//libs-scala/contextualized-logging", "@maven//:com_google_protobuf_protobuf_java", "@maven//:io_dropwizard_metrics_metrics_core", ], diff --git a/ledger/participant-state-metrics/src/main/scala/com/daml/ledger/participant/state/v2/metrics/TimedReadService.scala b/ledger/participant-state-metrics/src/main/scala/com/daml/ledger/participant/state/v2/metrics/TimedReadService.scala index f0704cc4f760..663a4f4b92dc 100644 --- a/ledger/participant-state-metrics/src/main/scala/com/daml/ledger/participant/state/v2/metrics/TimedReadService.scala +++ b/ledger/participant-state-metrics/src/main/scala/com/daml/ledger/participant/state/v2/metrics/TimedReadService.scala @@ -9,6 +9,7 @@ import com.daml.ledger.api.health.HealthStatus import com.daml.ledger.configuration.LedgerInitialConditions import com.daml.ledger.offset.Offset import com.daml.ledger.participant.state.v2.{ReadService, Update} +import com.daml.logging.LoggingContext import com.daml.metrics.{Metrics, Timed} final class TimedReadService(delegate: ReadService, metrics: Metrics) extends ReadService { @@ -19,7 +20,9 @@ final class TimedReadService(delegate: ReadService, metrics: Metrics) extends Re delegate.ledgerInitialConditions(), ) - override def stateUpdates(beginAfter: Option[Offset]): Source[(Offset, Update), NotUsed] = + override def stateUpdates( + beginAfter: Option[Offset] + )(implicit loggingContext: LoggingContext): Source[(Offset, Update), NotUsed] = Timed.source(metrics.daml.services.read.stateUpdates, delegate.stateUpdates(beginAfter)) override def currentHealth(): HealthStatus = diff --git a/ledger/participant-state/BUILD.bazel b/ledger/participant-state/BUILD.bazel index b637ec04acb2..0034df0a5ad1 100644 --- a/ledger/participant-state/BUILD.bazel +++ b/ledger/participant-state/BUILD.bazel @@ -36,6 +36,7 @@ da_scala_library( "//ledger/ledger-grpc", "//ledger/ledger-offset", "//ledger/metrics", + "//libs-scala/contextualized-logging", "//libs-scala/grpc-utils", "//libs-scala/logging-entries", "@maven//:com_google_api_grpc_proto_google_common_protos", diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala index e9bdbd3850df..26749f04d1ba 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumption.scala @@ -26,10 +26,10 @@ import com.daml.lf.data.Ref import com.daml.lf.data.Ref.LedgerString import com.daml.lf.data.Time.Timestamp import com.daml.lf.transaction.CommittedTransaction +import com.daml.logging.{ContextualizedLogger, LoggingContext} import com.google.common.io.BaseEncoding import com.google.protobuf.ByteString import com.google.rpc.status.Status -import org.slf4j.LoggerFactory import scala.jdk.CollectionConverters._ @@ -37,7 +37,7 @@ import scala.jdk.CollectionConverters._ * key-value based ledger. */ object KeyValueConsumption { - private val logger = LoggerFactory.getLogger(this.getClass) + private val logger = ContextualizedLogger.get(this.getClass) def packDamlLogEntry(entry: DamlStateKey): ByteString = entry.toByteString def unpackDamlLogEntry(bytes: ByteString): DamlLogEntry = DamlLogEntry.parseFrom(bytes) @@ -59,7 +59,9 @@ object KeyValueConsumption { entry: DamlLogEntry, errorVersionSwitch: ValueSwitch[Status], recordTimeForUpdate: Option[Timestamp] = None, - ): List[Update] = { + )(loggingContext: LoggingContext): List[Update] = { + implicit val logContext: LoggingContext = loggingContext + val recordTimeFromLogEntry = PartialFunction.condOpt(entry.hasRecordTime) { case true => parseTimestamp(entry.getRecordTime) } @@ -260,7 +262,7 @@ object KeyValueConsumption { entryId: DamlLogEntryId, txEntry: DamlTransactionEntry, recordTime: Timestamp, - ): Update.TransactionAccepted = { + )(implicit loggingContext: LoggingContext): Update.TransactionAccepted = { val transaction = Conversions.decodeTransaction(txEntry.getTransaction) val hexTxId = parseLedgerString("TransactionId")( BaseEncoding.base16.encode(entryId.toByteArray) @@ -311,7 +313,7 @@ object KeyValueConsumption { private def validateDivulgedContracts( hexTxId: LedgerString, damlTransactionBlindingInfo: DamlTransactionBlindingInfo, - ) = + )(implicit loggingContext: LoggingContext): List[DivulgedContract] = if (!damlTransactionBlindingInfo.getDivulgencesList.isEmpty) { Conversions.extractDivulgedContracts(damlTransactionBlindingInfo) match { case Right(divulgedContractsIndex) => @@ -339,7 +341,7 @@ object KeyValueConsumption { recordTime: Timestamp, outOfTimeBoundsEntry: DamlOutOfTimeBoundsEntry, errorVersionSwitch: ValueSwitch[Status], - ): Option[Update] = { + )(implicit loggingContext: LoggingContext): Option[Update] = { val timeBounds = parseTimeBounds(outOfTimeBoundsEntry) val deduplicated = timeBounds.deduplicateUntil.exists(recordTime <= _) val tooEarly = timeBounds.tooEarlyUntil.exists(recordTime < _) diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantState.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantState.scala index b26c596a12f0..487311547652 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantState.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantState.scala @@ -22,6 +22,7 @@ import com.daml.ledger.participant.state.v2.{ } import com.daml.lf.data.{Ref, Time} import com.daml.lf.transaction.SubmittedTransaction +import com.daml.logging.LoggingContext import com.daml.metrics.Metrics import com.daml.telemetry.TelemetryContext @@ -56,7 +57,9 @@ class KeyValueParticipantState( override def ledgerInitialConditions(): Source[LedgerInitialConditions, NotUsed] = readerAdapter.ledgerInitialConditions() - override def stateUpdates(beginAfter: Option[Offset]): Source[(Offset, Update), NotUsed] = + override def stateUpdates( + beginAfter: Option[Offset] + )(implicit loggingContext: LoggingContext): Source[(Offset, Update), NotUsed] = readerAdapter.stateUpdates(beginAfter) override def submitTransaction( diff --git a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReader.scala b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReader.scala index f659a765173b..485925fe4644 100644 --- a/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReader.scala +++ b/ledger/participant-state/kvutils/src/main/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReader.scala @@ -15,6 +15,7 @@ import com.daml.ledger.participant.state.v2._ import com.daml.ledger.validator.preexecution.TimeUpdatesProvider import com.daml.lf.data.Time import com.daml.lf.data.Time.Timestamp +import com.daml.logging.LoggingContext import com.daml.metrics.{Metrics, Timed} import com.google.rpc.status.Status @@ -35,7 +36,7 @@ class KeyValueParticipantStateReader private[api] ( DamlLogEntry, ValueSwitch[Status], Option[Timestamp], - ) => List[Update], + ) => LoggingContext => List[Update], timeUpdatesProvider: TimeUpdatesProvider, failOnUnexpectedEvent: Boolean, ) extends ReadService { @@ -47,7 +48,9 @@ class KeyValueParticipantStateReader private[api] ( override def ledgerInitialConditions(): Source[LedgerInitialConditions, NotUsed] = Source.single(createLedgerInitialConditions()) - override def stateUpdates(beginAfter: Option[Offset]): Source[(Offset, Update), NotUsed] = { + override def stateUpdates( + beginAfter: Option[Offset] + )(implicit loggingContext: LoggingContext): Source[(Offset, Update), NotUsed] = { Source .single(beginAfter.map(offset => KVOffset(offset).zeroLowest.offset)) .flatMapConcat(reader.events) @@ -65,7 +68,7 @@ class KeyValueParticipantStateReader private[api] ( logEntry, errorVersionSwitch, timeUpdatesProvider(), - ) + )(loggingContext) val updatesWithOffsets = Source(updates).zipWithIndex.map { case (update, index) => offsetForUpdate(offset, index.toInt, updates.size) -> update diff --git a/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/KVTest.scala b/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/KVTest.scala index 294c3b455452..b6c384206fe1 100644 --- a/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/KVTest.scala +++ b/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/KVTest.scala @@ -385,7 +385,11 @@ object KVTest { newState.keySet subsetOf KeyValueCommitting.submissionOutputs(submission) ) // Verify that we can always process the log entry. - val _ = KeyValueConsumption.logEntryToUpdate(entryId, logEntry, errorVersionSwitch) + val _ = KeyValueConsumption.logEntryToUpdate( + entryId, + logEntry, + errorVersionSwitch, + )(loggingContext) entryId -> logEntry } @@ -417,13 +421,13 @@ object KVTest { successfulLogEntry, errorVersionSwitch, recordTimeFromTimeUpdateLogEntry, - ) + )(loggingContext) KeyValueConsumption.logEntryToUpdate( entryId, outOfTimeBoundsLogEntry, errorVersionSwitch, recordTimeFromTimeUpdateLogEntry, - ) + )(loggingContext) entryId -> preExecutionResult } diff --git a/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/ParticipantStateIntegrationSpecBase.scala b/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/ParticipantStateIntegrationSpecBase.scala index 29b64b9ff0d7..0741fdfc5ab1 100644 --- a/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/ParticipantStateIntegrationSpecBase.scala +++ b/ledger/participant-state/kvutils/src/test/lib/scala/com/daml/ledger/participant/state/kvutils/ParticipantStateIntegrationSpecBase.scala @@ -55,6 +55,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(i private implicit val resourceContext: ResourceContext = ResourceContext(testExecutionContext) private implicit val telemetryContext: TelemetryContext = NoOpTelemetryContext + private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting // Can be used by [[participantStateFactory]] to get a stable ID throughout the test. // For example, for initializing a database. diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KVUtilsTransactionSpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KVUtilsTransactionSpec.scala index 78d88dd3ff16..5cf8273da6cf 100644 --- a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KVUtilsTransactionSpec.scala +++ b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KVUtilsTransactionSpec.scala @@ -99,7 +99,11 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside { ) (entryId, logEntry) = result contractId = contractIdOfCreateTransaction( - KeyValueConsumption.logEntryToUpdate(entryId, logEntry, errorVersionSwitch) + KeyValueConsumption.logEntryToUpdate( + entryId, + logEntry, + errorVersionSwitch, + )(loggingContext) ) transaction2 <- runSimpleCommand( @@ -225,7 +229,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside { preExecutionResult.successfulLogEntry, errorVersionSwitch, Some(recordTime), - ) + )(loggingContext) ) } @@ -308,7 +312,11 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside { ) (entryId, logEntry) = result contractId = contractIdOfCreateTransaction( - KeyValueConsumption.logEntryToUpdate(entryId, logEntry, errorVersionSwitch) + KeyValueConsumption.logEntryToUpdate( + entryId, + logEntry, + errorVersionSwitch, + )(loggingContext) ) transaction2 <- runSimpleCommand( @@ -525,7 +533,11 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside { // Process into updates and verify val updates = - KeyValueConsumption.logEntryToUpdate(entryId, strippedEntry.build, errorVersionSwitch) + KeyValueConsumption.logEntryToUpdate( + entryId, + strippedEntry.build, + errorVersionSwitch, + )(loggingContext) inside(updates) { case Seq(txAccepted: Update.TransactionAccepted) => txAccepted.optCompletionInfo should be(None) } @@ -581,7 +593,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside { preExecutionResult.successfulLogEntry, errorVersionSwitch, Some(recordTime), - ) + )(loggingContext) ) private def prepareExerciseReplaceByKey( diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumptionSpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumptionSpec.scala index 12fb5fb7aecf..5dc8eeb9d0e8 100644 --- a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumptionSpec.scala +++ b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/KeyValueConsumptionSpec.scala @@ -28,6 +28,7 @@ import com.daml.ledger.participant.state.v2.Update import com.daml.ledger.participant.state.v2.Update.CommandRejected import com.daml.ledger.participant.state.v2.Update.CommandRejected.FinalReason import com.daml.lf.data.Time.Timestamp +import com.daml.logging.LoggingContext import com.google.protobuf.any.{Any => AnyProto} import com.google.protobuf.{ByteString, Empty} import com.google.rpc.code.Code @@ -63,6 +64,8 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers { v2ErrorSwitch, ) + private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting + "logEntryToUpdate" should { "throw in case no record time is available from the log entry or input argument" in { forAll( @@ -74,7 +77,7 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers { aLogEntryWithoutRecordTime, errorSwitch, recordTimeForUpdate = None, - ) + )(loggingContext) ) } } @@ -88,7 +91,7 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers { aLogEntryWithRecordTime, errorSwitch, recordTimeForUpdate = Some(aRecordTime), - ) + )(loggingContext) actual.recordTime shouldBe aRecordTimeFromLogEntry } @@ -104,7 +107,7 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers { aLogEntryWithRecordTime, errorSwitch, recordTimeForUpdate = None, - ) + )(loggingContext) actual.recordTime shouldBe Timestamp.assertFromInstant(Instant.ofEpochSecond(100)) } @@ -123,7 +126,7 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers { timeUpdateEntry, errorSwitch, recordTimeForUpdate = None, - ) shouldBe Nil + )(loggingContext) shouldBe Nil } } } diff --git a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReaderSpec.scala b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReaderSpec.scala index ce025ba34894..2c6aa3eee4f4 100644 --- a/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReaderSpec.scala +++ b/ledger/participant-state/kvutils/src/test/suite/scala/com/daml/ledger/participant/state/kvutils/api/KeyValueParticipantStateReaderSpec.scala @@ -22,6 +22,7 @@ import com.daml.ledger.participant.state.kvutils.{Envelope, KVOffsetBuilder, Raw import com.daml.ledger.participant.state.v2.Update import com.daml.lf.data.Ref import com.daml.lf.data.Time.Timestamp +import com.daml.logging.LoggingContext import com.daml.metrics.Metrics import com.google.protobuf.ByteString import com.google.rpc.status.Status @@ -37,6 +38,7 @@ class KeyValueParticipantStateReaderSpec with Matchers with AkkaBeforeAndAfterAll { + private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting private val offsetBuilder = new KVOffsetBuilder(0) "participant state reader" should { @@ -243,37 +245,51 @@ object KeyValueParticipantStateReaderSpec { private val aWrappedLogEntry = Envelope.enclose(aLogEntry) - private val zeroUpdateGenerator - : (DamlLogEntryId, DamlLogEntry, ValueSwitch[Status], Option[Timestamp]) => List[Update] = - (_, _, _, _) => List.empty - - private val singleUpdateGenerator - : (DamlLogEntryId, DamlLogEntry, ValueSwitch[Status], Option[Timestamp]) => List[Update] = + private val zeroUpdateGenerator: ( + DamlLogEntryId, + DamlLogEntry, + ValueSwitch[Status], + Option[Timestamp], + ) => LoggingContext => List[Update] = + (_, _, _, _) => _ => List.empty + + private val singleUpdateGenerator: ( + DamlLogEntryId, + DamlLogEntry, + ValueSwitch[Status], + Option[Timestamp], + ) => LoggingContext => List[Update] = (_, _, _, _) => - List( - Update.PartyAddedToParticipant( - Ref.Party.assertFromString("aParty"), - "a party", - Ref.ParticipantId.assertFromString("aParticipant"), - Timestamp.now(), - submissionId = None, + _ => + List( + Update.PartyAddedToParticipant( + Ref.Party.assertFromString("aParty"), + "a party", + Ref.ParticipantId.assertFromString("aParticipant"), + Timestamp.now(), + submissionId = None, + ) ) - ) - private val twoUpdatesGenerator - : (DamlLogEntryId, DamlLogEntry, ValueSwitch[Status], Option[Timestamp]) => List[Update] = + private val twoUpdatesGenerator: ( + DamlLogEntryId, + DamlLogEntry, + ValueSwitch[Status], + Option[Timestamp], + ) => LoggingContext => List[Update] = (entryId, entry, errorVersionSwitch, recordTime) => - singleUpdateGenerator( - entryId, - entry, - errorVersionSwitch, - recordTime, - ) ::: singleUpdateGenerator( - entryId, - entry, - errorVersionSwitch, - recordTime, - ) + loggingContext => + singleUpdateGenerator( + entryId, + entry, + errorVersionSwitch, + recordTime, + )(loggingContext) ::: singleUpdateGenerator( + entryId, + entry, + errorVersionSwitch, + recordTime, + )(loggingContext) private def aLogEntryId(index: Int): Raw.LogEntryId = Raw.LogEntryId( @@ -296,7 +312,7 @@ object KeyValueParticipantStateReaderSpec { DamlLogEntry, ValueSwitch[Status], Option[Timestamp], - ) => List[Update] = singleUpdateGenerator, + ) => LoggingContext => List[Update] = singleUpdateGenerator, failOnUnexpectedEvent: Boolean = true, ): KeyValueParticipantStateReader = new KeyValueParticipantStateReader( diff --git a/ledger/participant-state/kvutils/tools/BUILD.bazel b/ledger/participant-state/kvutils/tools/BUILD.bazel index fa0fde648629..62c7d9b77cc7 100644 --- a/ledger/participant-state/kvutils/tools/BUILD.bazel +++ b/ledger/participant-state/kvutils/tools/BUILD.bazel @@ -81,6 +81,7 @@ da_scala_test( "//ledger/participant-state", "//ledger/participant-state/kvutils", "//ledger/participant-state/kvutils/tools", + "//libs-scala/contextualized-logging", "@maven//:org_mockito_mockito_core", ] + (["@maven//:com_google_protobuf_protobuf_java"] if scala_major_version == "2.13" else []), ) diff --git a/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactory.scala b/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactory.scala index 2fafb89967dd..09b5a241b78a 100644 --- a/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactory.scala +++ b/ledger/participant-state/kvutils/tools/integrity-check/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactory.scala @@ -17,6 +17,7 @@ import com.daml.ledger.participant.state.kvutils.api.{ import com.daml.ledger.participant.state.kvutils.export.{SubmissionInfo, WriteSet} import com.daml.ledger.participant.state.kvutils.{KVOffsetBuilder, Raw} import com.daml.ledger.participant.state.v2.Update +import com.daml.logging.LoggingContext import com.daml.metrics.Metrics import scala.collection.immutable @@ -73,7 +74,9 @@ final class LogAppendingReadServiceFactory( override def ledgerInitialConditions(): Source[LedgerInitialConditions, NotUsed] = implementation.ledgerInitialConditions() - override def stateUpdates(beginAfter: Option[Offset]): Source[(Offset, Update), NotUsed] = + override def stateUpdates( + beginAfter: Option[Offset] + )(implicit loggingContext: LoggingContext): Source[(Offset, Update), NotUsed] = implementation.stateUpdates(beginAfter) override def currentHealth(): HealthStatus = implementation.currentHealth() diff --git a/ledger/participant-state/kvutils/tools/integrity-check/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactorySpec.scala b/ledger/participant-state/kvutils/tools/integrity-check/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactorySpec.scala index 81b338263576..a47e2a4787b6 100644 --- a/ledger/participant-state/kvutils/tools/integrity-check/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactorySpec.scala +++ b/ledger/participant-state/kvutils/tools/integrity-check/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/LogAppendingReadServiceFactorySpec.scala @@ -5,7 +5,6 @@ package com.daml.ledger.participant.state.kvutils.tools.integritycheck import java.time.Instant import java.util.concurrent.TimeUnit - import akka.stream.scaladsl.Sink import com.codahale.metrics.MetricRegistry import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll @@ -17,6 +16,7 @@ import com.daml.ledger.participant.state.kvutils.{Envelope, KVOffsetBuilder, Raw import com.daml.ledger.participant.state.v2.Update import com.daml.lf.data.Ref import com.daml.lf.data.Time.Timestamp +import com.daml.logging.LoggingContext import com.daml.metrics.Metrics import com.google.protobuf.ByteString import org.scalatest.matchers.should.Matchers @@ -29,6 +29,8 @@ final class LogAppendingReadServiceFactorySpec with Matchers with AkkaBeforeAndAfterAll { + private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting + "LogAppendingReadServiceFactory" should { "handle empty blocks" in { val factory = createFactory() @@ -71,6 +73,7 @@ final class LogAppendingReadServiceFactorySpec } object LogAppendingReadServiceFactorySpec { + private def createFactory() = { val offsetBuilder = new KVOffsetBuilder(0) val metrics = new Metrics(new MetricRegistry) diff --git a/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/IntegrityChecker.scala b/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/IntegrityChecker.scala index 780bbec8f210..00a486e3e914 100644 --- a/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/IntegrityChecker.scala +++ b/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/IntegrityChecker.scala @@ -127,7 +127,7 @@ class IntegrityChecker[LogResult]( private[integritycheck] def compareStateUpdates( config: Config, stateUpdates: StateUpdateComparison, - ): Future[Unit] = + )(implicit loggingContext: LoggingContext): Future[Unit] = if (!config.indexOnly) stateUpdates.compare() else diff --git a/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/StateUpdateComparison.scala b/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/StateUpdateComparison.scala index 2cea06946837..116d12e309eb 100644 --- a/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/StateUpdateComparison.scala +++ b/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/StateUpdateComparison.scala @@ -9,11 +9,12 @@ import com.daml.ledger.offset.Offset import com.daml.ledger.participant.state.kvutils.tools.integritycheck.IntegrityChecker.ComparisonFailureException import com.daml.ledger.participant.state.kvutils.tools.integritycheck.UpdateNormalizer.normalize import com.daml.ledger.participant.state.v2.Update +import com.daml.logging.LoggingContext import scala.concurrent.{ExecutionContext, Future} trait StateUpdateComparison { - def compare(): Future[Unit] + def compare()(implicit loggingContext: LoggingContext): Future[Unit] } final class ReadServiceStateUpdateComparison( @@ -29,7 +30,7 @@ final class ReadServiceStateUpdateComparison( import ReadServiceStateUpdateComparison._ - def compare(): Future[Unit] = { + def compare()(implicit loggingContext: LoggingContext): Future[Unit] = { println("Comparing expected and actual state updates.".white) if (expectedReadService.updateCount() != actualReadService.updateCount()) { Future.failed( diff --git a/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/StateUpdateExporter.scala b/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/StateUpdateExporter.scala index a6bff5a82bec..f67ba04d96c7 100644 --- a/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/StateUpdateExporter.scala +++ b/ledger/participant-state/kvutils/tools/src/main/scala/ledger/participant/state/kvutils/tools/integritycheck/StateUpdateExporter.scala @@ -9,6 +9,7 @@ import java.nio.file.Path import akka.stream.Materializer import com.daml.ledger.participant.state.kvutils.tools.integritycheck.UpdateNormalizer.normalize import com.daml.ledger.participant.state.v2.ReadService +import com.daml.logging.LoggingContext import scala.concurrent.{ExecutionContext, Future} @@ -19,7 +20,11 @@ object StateUpdateExporter { actualReadService: ReplayingReadService, outputWriterFactory: Path => PrintWriter, config: Config, - )(implicit executionContext: ExecutionContext, materializer: Materializer): Future[Unit] = { + )(implicit + executionContext: ExecutionContext, + materializer: Materializer, + loggingContext: LoggingContext, + ): Future[Unit] = { for { _ <- config.expectedUpdatesPath.fold(Future.unit)(path => StateUpdateExporter.write( @@ -45,6 +50,7 @@ object StateUpdateExporter { )(implicit materializer: Materializer, executionContext: ExecutionContext, + loggingContext: LoggingContext, ): Future[Unit] = { readService .stateUpdates(None) diff --git a/ledger/participant-state/kvutils/tools/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/IntegrityCheckerSpec.scala b/ledger/participant-state/kvutils/tools/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/IntegrityCheckerSpec.scala index 936582b1743e..83358f261ec5 100644 --- a/ledger/participant-state/kvutils/tools/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/IntegrityCheckerSpec.scala +++ b/ledger/participant-state/kvutils/tools/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/IntegrityCheckerSpec.scala @@ -11,12 +11,16 @@ import org.scalatest.wordspec.AsyncWordSpec import scala.concurrent.Future +import com.daml.logging.LoggingContext + final class IntegrityCheckerSpec extends AsyncWordSpec with Matchers with MockitoSugar with ArgumentMatchersSugar { + private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting + "compareStateUpdates" should { "call compare if not in index-only mode" in { val mockStateUpdates = mock[StateUpdateComparison] diff --git a/ledger/participant-state/kvutils/tools/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/StateUpdateExporterSpec.scala b/ledger/participant-state/kvutils/tools/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/StateUpdateExporterSpec.scala index 9669a0952d69..4e1eb8d924fd 100644 --- a/ledger/participant-state/kvutils/tools/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/StateUpdateExporterSpec.scala +++ b/ledger/participant-state/kvutils/tools/src/test/scala/ledger/participant/state/kvutils/tools/integritycheck/StateUpdateExporterSpec.scala @@ -16,6 +16,7 @@ import com.daml.ledger.configuration.LedgerInitialConditions import com.daml.ledger.offset.Offset import com.daml.ledger.participant.state.v2.Update import com.daml.lf.data.Time +import com.daml.logging.LoggingContext import org.mockito.ArgumentMatchers.anyString import org.mockito.{ArgumentMatchersSugar, MockitoSugar} import org.scalatest.BeforeAndAfterAll @@ -33,8 +34,9 @@ class StateUpdateExporterSpec with BeforeAndAfterAll { import StateUpdateExporterSpec._ - private[this] implicit val materializer: Materializer = Materializer(system) - private[this] implicit val ec: ExecutionContext = materializer.executionContext + private implicit val materializer: Materializer = Materializer(system) + private implicit val ec: ExecutionContext = materializer.executionContext + private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting override def afterAll(): Unit = { TestKit.shutdownActorSystem(system) @@ -88,7 +90,9 @@ object StateUpdateExporterSpec extends MockitoSugar { override def ledgerInitialConditions(): Source[LedgerInitialConditions, NotUsed] = Source.empty - override def stateUpdates(beginAfter: Option[Offset]): Source[(Offset, Update), NotUsed] = + override def stateUpdates( + beginAfter: Option[Offset] + )(implicit loggingContext: LoggingContext): Source[(Offset, Update), NotUsed] = Source.single( ( Offset.beforeBegin, diff --git a/ledger/participant-state/src/main/scala/com/daml/ledger/participant/state/v2/AdaptedV1ReadService.scala b/ledger/participant-state/src/main/scala/com/daml/ledger/participant/state/v2/AdaptedV1ReadService.scala index d9648d6422c5..950b45308243 100644 --- a/ledger/participant-state/src/main/scala/com/daml/ledger/participant/state/v2/AdaptedV1ReadService.scala +++ b/ledger/participant-state/src/main/scala/com/daml/ledger/participant/state/v2/AdaptedV1ReadService.scala @@ -15,6 +15,7 @@ import com.daml.ledger.participant.state.v2.AdaptedV1ReadService._ import com.daml.ledger.participant.state.v2.Update.CommandRejected import com.daml.ledger.participant.state.v2.Update.CommandRejected.RejectionReasonTemplate import com.daml.lf.data.Ref +import com.daml.logging.LoggingContext /** Adapts a [[com.daml.ledger.participant.state.v1.ReadService]] implementation to the * [[com.daml.ledger.participant.state.v2.ReadService]] API. @@ -25,7 +26,9 @@ class AdaptedV1ReadService(delegate: v1.ReadService) extends ReadService { override def ledgerInitialConditions(): Source[LedgerInitialConditions, NotUsed] = delegate.getLedgerInitialConditions() - override def stateUpdates(beginAfter: Option[Offset]): Source[(Offset, Update), NotUsed] = + override def stateUpdates( + beginAfter: Option[Offset] + )(implicit loggingContext: LoggingContext): Source[(Offset, Update), NotUsed] = delegate .stateUpdates(beginAfter) .map { case (offset, update) => Offset(offset.bytes) -> adaptUpdate(update) } diff --git a/ledger/participant-state/src/main/scala/com/daml/ledger/participant/state/v2/ReadService.scala b/ledger/participant-state/src/main/scala/com/daml/ledger/participant/state/v2/ReadService.scala index d9afdd8f99e1..8b7cec7511e9 100644 --- a/ledger/participant-state/src/main/scala/com/daml/ledger/participant/state/v2/ReadService.scala +++ b/ledger/participant-state/src/main/scala/com/daml/ledger/participant/state/v2/ReadService.scala @@ -8,6 +8,7 @@ import akka.stream.scaladsl.Source import com.daml.ledger.api.health.ReportsHealth import com.daml.ledger.configuration.LedgerInitialConditions import com.daml.ledger.offset.Offset +import com.daml.logging.LoggingContext /** An interface for reading the state of a ledger participant. * '''Please note that this interface is unstable and may significantly change.''' @@ -135,5 +136,7 @@ trait ReadService extends ReportsHealth { * * Note further that the offsets of the transactions might not agree, as these offsets are participant-local. */ - def stateUpdates(beginAfter: Option[Offset]): Source[(Offset, Update), NotUsed] + def stateUpdates( + beginAfter: Option[Offset] + )(implicit loggingContext: LoggingContext): Source[(Offset, Update), NotUsed] } diff --git a/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala b/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala index 51eb91a48ac9..648a3dd57f62 100644 --- a/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala +++ b/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala @@ -306,7 +306,10 @@ object RecoveringIndexerIntegrationSpec { Source.single(value) } } - }).when(failingParticipantState).stateUpdates(ArgumentMatchers.any[Option[Offset]]()) + }).when(failingParticipantState) + .stateUpdates( + ArgumentMatchers.any[Option[Offset]]() + )(ArgumentMatchers.any[LoggingContext]) failingParticipantState } diff --git a/ledger/sandbox-on-x/src/main/scala/com/daml/ledger/sandbox/ReadWriteServiceBridge.scala b/ledger/sandbox-on-x/src/main/scala/com/daml/ledger/sandbox/ReadWriteServiceBridge.scala index c74e57a43f65..d4bf4db3b0ab 100644 --- a/ledger/sandbox-on-x/src/main/scala/com/daml/ledger/sandbox/ReadWriteServiceBridge.scala +++ b/ledger/sandbox-on-x/src/main/scala/com/daml/ledger/sandbox/ReadWriteServiceBridge.scala @@ -130,7 +130,9 @@ case class ReadWriteServiceBridge( ) var stateUpdatesWasCalledAlready = false - override def stateUpdates(beginAfter: Option[Offset]): Source[(Offset, Update), NotUsed] = { + override def stateUpdates( + beginAfter: Option[Offset] + )(implicit loggingContext: LoggingContext): Source[(Offset, Update), NotUsed] = { // TODO for PoC purposes: // This method may only be called once, either with `beginAfter` set or unset. // A second call will result in an error unless the server is restarted.