Skip to content

Commit

Permalink
Support contextual logging in KeyValueConsumption
Browse files Browse the repository at this point in the history
CHANGELOG_BEGIN
CHANGELOG_END
  • Loading branch information
fabiotudone-da committed Oct 19, 2021
1 parent a9f6afb commit 0be8b6b
Show file tree
Hide file tree
Showing 11 changed files with 44 additions and 36 deletions.
1 change: 1 addition & 0 deletions ledger/indexer-benchmark/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ da_scala_library(
"//ledger/participant-state",
"//ledger/participant-state/kvutils",
"//libs-scala/concurrent",
"//libs-scala/contextualized-logging",
"@maven//:io_dropwizard_metrics_metrics_core",
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package com.daml.ledger.indexerbenchmark

import java.nio.file.{Files, Paths}
import java.util.concurrent.atomic.AtomicLong

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.Materializer
Expand All @@ -23,6 +22,7 @@ import com.daml.ledger.participant.state.kvutils.api.{
import com.daml.ledger.participant.state.kvutils.export.ProtobufBasedLedgerDataImporter
import com.daml.ledger.participant.state.kvutils.{KVOffsetBuilder, Raw}
import com.daml.ledger.participant.state.v2.Update
import com.daml.logging.LoggingContext.newLoggingContext
import com.daml.metrics.Metrics

import scala.concurrent.Future
Expand Down Expand Up @@ -72,12 +72,14 @@ object Main {

val metricRegistry = new MetricRegistry
val metrics = new Metrics(metricRegistry)
val keyValueStateReader = KeyValueParticipantStateReader(
keyValueSource,
metrics,
failOnUnexpectedEvent = false,
enableSelfServiceErrorCodes = false,
)
val keyValueStateReader = newLoggingContext { implicit loggingContext =>
KeyValueParticipantStateReader(
keyValueSource,
metrics,
failOnUnexpectedEvent = false,
enableSelfServiceErrorCodes = false,
)
}

// Note: this method is doing quite a lot of work to transform a sequence of write sets
// to a sequence of state updates.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

package com.daml.ledger.participant.state.kvutils

import com.daml.error.ValueSwitch
import com.daml.error.{DamlContextualizedErrorLogger, ValueSwitch}
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.participant.state.kvutils.Conversions._
import com.daml.ledger.participant.state.kvutils.store.events.PackageUpload.DamlPackageUploadRejectionEntry
Expand All @@ -26,18 +26,18 @@ import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.LedgerString
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.transaction.CommittedTransaction
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.google.common.io.BaseEncoding
import com.google.protobuf.ByteString
import com.google.rpc.status.Status
import org.slf4j.LoggerFactory

import scala.jdk.CollectionConverters._

/** Utilities for producing [[Update]] events from [[DamlLogEntry]]'s committed to a
* key-value based ledger.
*/
object KeyValueConsumption {
private val logger = LoggerFactory.getLogger(this.getClass)
private val logger = ContextualizedLogger.get(this.getClass)

def packDamlLogEntry(entry: DamlStateKey): ByteString = entry.toByteString
def unpackDamlLogEntry(bytes: ByteString): DamlLogEntry = DamlLogEntry.parseFrom(bytes)
Expand All @@ -59,6 +59,8 @@ object KeyValueConsumption {
entry: DamlLogEntry,
errorVersionSwitch: ValueSwitch[Status],
recordTimeForUpdate: Option[Timestamp] = None,
)(implicit
loggingContext: LoggingContext
): List[Update] = {
val recordTimeFromLogEntry = PartialFunction.condOpt(entry.hasRecordTime) { case true =>
parseTimestamp(entry.getRecordTime)
Expand Down Expand Up @@ -260,7 +262,7 @@ object KeyValueConsumption {
entryId: DamlLogEntryId,
txEntry: DamlTransactionEntry,
recordTime: Timestamp,
): Update.TransactionAccepted = {
)(implicit loggingContext: LoggingContext): Update.TransactionAccepted = {
val transaction = Conversions.decodeTransaction(txEntry.getTransaction)
val hexTxId = parseLedgerString("TransactionId")(
BaseEncoding.base16.encode(entryId.toByteArray)
Expand Down Expand Up @@ -311,7 +313,7 @@ object KeyValueConsumption {
private def validateDivulgedContracts(
hexTxId: LedgerString,
damlTransactionBlindingInfo: DamlTransactionBlindingInfo,
) =
)(implicit loggingContext: LoggingContext): List[DivulgedContract] =
if (!damlTransactionBlindingInfo.getDivulgencesList.isEmpty) {
Conversions.extractDivulgedContracts(damlTransactionBlindingInfo) match {
case Right(divulgedContractsIndex) =>
Expand Down Expand Up @@ -339,7 +341,7 @@ object KeyValueConsumption {
recordTime: Timestamp,
outOfTimeBoundsEntry: DamlOutOfTimeBoundsEntry,
errorVersionSwitch: ValueSwitch[Status],
): Option[Update] = {
)(implicit loggingContext: LoggingContext): Option[Update] = {
val timeBounds = parseTimeBounds(outOfTimeBoundsEntry)
val deduplicated = timeBounds.deduplicateUntil.exists(recordTime <= _)
val tooEarly = timeBounds.tooEarlyUntil.exists(recordTime < _)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@

package com.daml.ledger.participant.state.kvutils.api

import java.util.concurrent.CompletionStage

import akka.NotUsed
import akka.stream.scaladsl.Source
import com.daml.daml_lf_dev.DamlLf
Expand All @@ -22,9 +20,12 @@ import com.daml.ledger.participant.state.v2.{
}
import com.daml.lf.data.{Ref, Time}
import com.daml.lf.transaction.SubmittedTransaction
import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics
import com.daml.telemetry.TelemetryContext

import java.util.concurrent.CompletionStage

/** Implements read and write operations required for running a participant server.
*
* Adapts [[LedgerReader]] and [[LedgerWriter]] interfaces to [[com.daml.ledger.participant.state.v2.ReadService]] and
Expand All @@ -41,7 +42,8 @@ class KeyValueParticipantState(
writer: LedgerWriter,
metrics: Metrics,
enableSelfServiceErrorCodes: Boolean,
) extends ReadService
)(implicit loggingContext: LoggingContext)
extends ReadService
with WriteService {
private val readerAdapter =
KeyValueParticipantStateReader(reader, metrics, enableSelfServiceErrorCodes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import com.daml.ledger.participant.state.v2._
import com.daml.ledger.validator.preexecution.TimeUpdatesProvider
import com.daml.lf.data.Time
import com.daml.lf.data.Time.Timestamp
import com.daml.logging.LoggingContext
import com.daml.metrics.{Metrics, Timed}
import com.google.rpc.status.Status

Expand Down Expand Up @@ -103,7 +104,7 @@ object KeyValueParticipantStateReader {
enableSelfServiceErrorCodes: Boolean,
timeUpdatesProvider: TimeUpdatesProvider = TimeUpdatesProvider.ReasonableDefault,
failOnUnexpectedEvent: Boolean = true,
): KeyValueParticipantStateReader =
)(implicit loggingContext: LoggingContext): KeyValueParticipantStateReader =
new KeyValueParticipantStateReader(
reader,
metrics,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,17 @@ import com.daml.error.ValueSwitch
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.grpc.GrpcStatuses
import com.daml.ledger.participant.state.kvutils.Conversions.{buildTimestamp, parseInstant}
import com.daml.ledger.participant.state.kvutils.KeyValueConsumption.{
TimeBounds,
logEntryToUpdate,
outOfTimeBoundsEntryToUpdate,
}
import com.daml.ledger.participant.state.kvutils.KeyValueConsumption.{TimeBounds, logEntryToUpdate, outOfTimeBoundsEntryToUpdate}
import com.daml.ledger.participant.state.kvutils.api.LedgerReader
import com.daml.ledger.participant.state.kvutils.store.DamlLogEntry.PayloadCase._
import com.daml.ledger.participant.state.kvutils.store.events.PackageUpload.{
DamlPackageUploadEntry,
DamlPackageUploadRejectionEntry,
}
import com.daml.ledger.participant.state.kvutils.store.events.PackageUpload.{DamlPackageUploadEntry, DamlPackageUploadRejectionEntry}
import com.daml.ledger.participant.state.kvutils.store.events._
import com.daml.ledger.participant.state.kvutils.store.{
DamlLogEntry,
DamlLogEntryId,
DamlOutOfTimeBoundsEntry,
}
import com.daml.ledger.participant.state.kvutils.store.{DamlLogEntry, DamlLogEntryId, DamlOutOfTimeBoundsEntry}
import com.daml.ledger.participant.state.v2.Update
import com.daml.ledger.participant.state.v2.Update.CommandRejected
import com.daml.ledger.participant.state.v2.Update.CommandRejected.FinalReason
import com.daml.lf.data.Time.Timestamp
import com.daml.logging.LoggingContext
import com.google.protobuf.any.{Any => AnyProto}
import com.google.protobuf.{ByteString, Empty}
import com.google.rpc.code.Code
Expand Down Expand Up @@ -63,6 +53,8 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers {
v2ErrorSwitch,
)

private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting

"logEntryToUpdate" should {
"throw in case no record time is available from the log entry or input argument" in {
forAll(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ final class LogAppendingCommitStrategySupport(
}
}

override def newReadServiceFactory(): ReplayingReadServiceFactory =
override def newReadServiceFactory()(implicit
loggingContext: LoggingContext
): ReplayingReadServiceFactory =
new LogAppendingReadServiceFactory(offsetBuilder, metrics)

override val writeSetComparison: WriteSetComparison =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import com.daml.ledger.participant.state.kvutils.api.{
import com.daml.ledger.participant.state.kvutils.export.{SubmissionInfo, WriteSet}
import com.daml.ledger.participant.state.kvutils.{KVOffsetBuilder, Raw}
import com.daml.ledger.participant.state.v2.Update
import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics

import scala.collection.immutable
Expand All @@ -25,7 +26,8 @@ import scala.collection.mutable.ListBuffer
final class LogAppendingReadServiceFactory(
offsetBuilder: KVOffsetBuilder,
metrics: Metrics,
) extends ReplayingReadServiceFactory {
)(implicit val loggingContext: LoggingContext)
extends ReplayingReadServiceFactory {
private val recordedBlocks = ListBuffer.empty[LedgerRecord]

override def appendBlock(submissionInfo: SubmissionInfo, writeSet: WriteSet): Unit =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ final class RawPreExecutingCommitStrategySupport(
.map(_ => access.getWriteSet)
}

override def newReadServiceFactory(): ReplayingReadServiceFactory =
override def newReadServiceFactory()(implicit
loggingContext: LoggingContext
): ReplayingReadServiceFactory =
new LogAppendingReadServiceFactory(offsetBuilder, metrics)

override val writeSetComparison: WriteSetComparison =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package com.daml.ledger.participant.state.kvutils.tools.integritycheck

import java.time.Instant
import java.util.concurrent.TimeUnit

import akka.stream.scaladsl.Sink
import com.codahale.metrics.MetricRegistry
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
Expand All @@ -17,6 +16,7 @@ import com.daml.ledger.participant.state.kvutils.{Envelope, KVOffsetBuilder, Raw
import com.daml.ledger.participant.state.v2.Update
import com.daml.lf.data.Ref
import com.daml.lf.data.Time.Timestamp
import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics
import com.google.protobuf.ByteString
import org.scalatest.matchers.should.Matchers
Expand Down Expand Up @@ -71,6 +71,8 @@ final class LogAppendingReadServiceFactorySpec
}

object LogAppendingReadServiceFactorySpec {
private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting

private def createFactory() = {
val offsetBuilder = new KVOffsetBuilder(0)
val metrics = new Metrics(new MetricRegistry)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ trait CommitStrategySupport[LogResult] {
submissionInfo: SubmissionInfo
)(implicit materializer: Materializer, loggingContext: LoggingContext): Future[WriteSet]

def newReadServiceFactory(): ReplayingReadServiceFactory
def newReadServiceFactory()(implicit loggingContext: LoggingContext): ReplayingReadServiceFactory

def writeSetComparison: WriteSetComparison
}

0 comments on commit 0be8b6b

Please sign in to comment.