KV: support contextual logging in KeyValueConsumption [KVL-1143] (#11288)

* Support contextual logging in KeyValueConsumption

CHANGELOG_BEGIN
CHANGELOG_END

* Fix formatting and imports

* Propagate the contextual logging per-readUpdates call

* No need for at-construction logging context in LogAppendingReadServiceFactory

* No need for logging context in CommitStrategySupport.newReadServiceFactory

* Fix JdbcIndexerSpec
fabiotudone-da authored Oct 19, 2021
1 parent f4df1cc commit 2fd200f
Showing 28 changed files with 170 additions and 79 deletions.
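
A minimal sketch of the new call shape, not part of the commit itself. The signatures are taken from the diff below; ContextualLoggingSketch, readAll, and the comment's placeholder arguments are hypothetical names used only for illustration.

// Sketch: how callers interact with the contextual-logging signatures introduced here.
// ReadService.stateUpdates now takes an implicit LoggingContext, and
// KeyValueConsumption.logEntryToUpdate takes a LoggingContext in a second parameter list.
import akka.NotUsed
import akka.stream.scaladsl.Source
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.v2.{ReadService, Update}
import com.daml.logging.LoggingContext
import com.daml.logging.LoggingContext.newLoggingContext

object ContextualLoggingSketch {

  // Production-style caller: open a fresh logging context and pass it implicitly,
  // as the indexer-benchmark Main does in the diff below.
  def readAll(readService: ReadService): Source[(Offset, Update), NotUsed] =
    newLoggingContext { implicit loggingContext: LoggingContext =>
      readService.stateUpdates(None)
    }

  // Test-style caller: pass an existing context explicitly, as the updated specs do
  // with LoggingContext.ForTesting, e.g.
  //   KeyValueConsumption.logEntryToUpdate(entryId, logEntry, errorVersionSwitch)(loggingContext)
}

Threading the context per stateUpdates call, rather than capturing one at construction time, is what the "per-readUpdates call" bullet above refers to: each subscription carries its own context, and KeyValueParticipantStateReader forwards it to logEntryToUpdate.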
1 change: 1 addition & 0 deletions ledger/indexer-benchmark/BUILD.bazel
@@ -66,6 +66,7 @@ da_scala_library(
"//ledger/participant-state",
"//ledger/participant-state/kvutils",
"//libs-scala/concurrent",
"//libs-scala/contextualized-logging",
"@maven//:io_dropwizard_metrics_metrics_core",
],
)
@@ -23,6 +23,7 @@ import com.daml.ledger.participant.state.kvutils.api.{
import com.daml.ledger.participant.state.kvutils.export.ProtobufBasedLedgerDataImporter
import com.daml.ledger.participant.state.kvutils.{KVOffsetBuilder, Raw}
import com.daml.ledger.participant.state.v2.Update
import com.daml.logging.LoggingContext.newLoggingContext
import com.daml.metrics.Metrics

import scala.concurrent.Future
@@ -86,16 +87,18 @@ object Main {
// the benchmark.
val system = ActorSystem("IndexerBenchmarkUpdateReader")
implicit val materializer: Materializer = Materializer(system)
keyValueStateReader
.stateUpdates(None)
.take(config.updateCount.getOrElse(Long.MaxValue))
.zipWithIndex
.map { case (data, index) =>
if (index % 1000 == 0) println(s"Generated update $index")
data
}
.runWith(Sink.seq[(Offset, Update)])
.map(seq => seq.iterator)(DirectExecutionContext)
.andThen { case _ => system.terminate() }(DirectExecutionContext)
newLoggingContext { implicit loggingContext =>
keyValueStateReader
.stateUpdates(None)
.take(config.updateCount.getOrElse(Long.MaxValue))
.zipWithIndex
.map { case (data, index) =>
if (index % 1000 == 0) println(s"Generated update $index")
data
}
.runWith(Sink.seq[(Offset, Update)])
.map(seq => seq.iterator)(DirectExecutionContext)
.andThen { case _ => system.terminate() }(DirectExecutionContext)
}
}
}
@@ -17,6 +17,7 @@ import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.v2.{ReadService, Update}
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.lf.data.Time
import com.daml.logging.LoggingContext
import com.daml.logging.LoggingContext.newLoggingContext
import com.daml.metrics.{JvmMetricSet, Metrics}
import com.daml.platform.configuration.ServerRole
@@ -198,15 +199,17 @@ class IndexerBenchmark() {
)

new ReadService {

override def ledgerInitialConditions(): Source[LedgerInitialConditions, NotUsed] = {
Source.single(initialConditions)
}

override def stateUpdates(beginAfter: Option[Offset]): Source[(Offset, Update), NotUsed] = {
override def stateUpdates(
beginAfter: Option[Offset]
)(implicit loggingContext: LoggingContext): Source[(Offset, Update), NotUsed] = {
assert(beginAfter.isEmpty, s"beginAfter is $beginAfter")
Source.fromIterator(() => updates)
}

override def currentHealth(): HealthStatus = Healthy
}
}
@@ -61,7 +61,9 @@ case class EndlessReadService(
*
* The last two items above repeat indefinitely
*/
override def stateUpdates(beginAfter: Option[Offset]): Source[(Offset, Update), NotUsed] =
override def stateUpdates(
beginAfter: Option[Offset]
)(implicit loggingContext: LoggingContext): Source[(Offset, Update), NotUsed] =
synchronized {
logger.info(s"EndlessReadService.stateUpdates($beginAfter) called")
stateUpdatesCalls.incrementAndGet()
@@ -64,7 +64,7 @@ final class JdbcIndexerSpec
)
)

when(readService.stateUpdates(any[Option[Offset]]))
when(readService.stateUpdates(any[Option[Offset]])(any[LoggingContext]))
.thenReturn(Source.fromIterator(() => updates.iterator))

readService
1 change: 1 addition & 0 deletions ledger/participant-state-metrics/BUILD.bazel
@@ -28,6 +28,7 @@ da_scala_library(
"//ledger/ledger-offset",
"//ledger/metrics",
"//ledger/participant-state",
"//libs-scala/contextualized-logging",
"@maven//:com_google_protobuf_protobuf_java",
"@maven//:io_dropwizard_metrics_metrics_core",
],
@@ -9,6 +9,7 @@ import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.configuration.LedgerInitialConditions
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.v2.{ReadService, Update}
import com.daml.logging.LoggingContext
import com.daml.metrics.{Metrics, Timed}

final class TimedReadService(delegate: ReadService, metrics: Metrics) extends ReadService {
@@ -19,7 +20,9 @@ final class TimedReadService(delegate: ReadService, metrics: Metrics) extends ReadService {
delegate.ledgerInitialConditions(),
)

override def stateUpdates(beginAfter: Option[Offset]): Source[(Offset, Update), NotUsed] =
override def stateUpdates(
beginAfter: Option[Offset]
)(implicit loggingContext: LoggingContext): Source[(Offset, Update), NotUsed] =
Timed.source(metrics.daml.services.read.stateUpdates, delegate.stateUpdates(beginAfter))

override def currentHealth(): HealthStatus =
1 change: 1 addition & 0 deletions ledger/participant-state/BUILD.bazel
@@ -36,6 +36,7 @@ da_scala_library(
"//ledger/ledger-grpc",
"//ledger/ledger-offset",
"//ledger/metrics",
"//libs-scala/contextualized-logging",
"//libs-scala/grpc-utils",
"//libs-scala/logging-entries",
"@maven//:com_google_api_grpc_proto_google_common_protos",
@@ -26,18 +26,18 @@ import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.LedgerString
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.transaction.CommittedTransaction
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.google.common.io.BaseEncoding
import com.google.protobuf.ByteString
import com.google.rpc.status.Status
import org.slf4j.LoggerFactory

import scala.jdk.CollectionConverters._

/** Utilities for producing [[Update]] events from [[DamlLogEntry]]'s committed to a
* key-value based ledger.
*/
object KeyValueConsumption {
private val logger = LoggerFactory.getLogger(this.getClass)
private val logger = ContextualizedLogger.get(this.getClass)

def packDamlLogEntry(entry: DamlStateKey): ByteString = entry.toByteString
def unpackDamlLogEntry(bytes: ByteString): DamlLogEntry = DamlLogEntry.parseFrom(bytes)
@@ -59,7 +59,9 @@ object KeyValueConsumption {
entry: DamlLogEntry,
errorVersionSwitch: ValueSwitch[Status],
recordTimeForUpdate: Option[Timestamp] = None,
): List[Update] = {
)(loggingContext: LoggingContext): List[Update] = {
implicit val logContext: LoggingContext = loggingContext

val recordTimeFromLogEntry = PartialFunction.condOpt(entry.hasRecordTime) { case true =>
parseTimestamp(entry.getRecordTime)
}
@@ -260,7 +262,7 @@ object KeyValueConsumption {
entryId: DamlLogEntryId,
txEntry: DamlTransactionEntry,
recordTime: Timestamp,
): Update.TransactionAccepted = {
)(implicit loggingContext: LoggingContext): Update.TransactionAccepted = {
val transaction = Conversions.decodeTransaction(txEntry.getTransaction)
val hexTxId = parseLedgerString("TransactionId")(
BaseEncoding.base16.encode(entryId.toByteArray)
@@ -311,7 +313,7 @@ object KeyValueConsumption {
private def validateDivulgedContracts(
hexTxId: LedgerString,
damlTransactionBlindingInfo: DamlTransactionBlindingInfo,
) =
)(implicit loggingContext: LoggingContext): List[DivulgedContract] =
if (!damlTransactionBlindingInfo.getDivulgencesList.isEmpty) {
Conversions.extractDivulgedContracts(damlTransactionBlindingInfo) match {
case Right(divulgedContractsIndex) =>
@@ -339,7 +341,7 @@ object KeyValueConsumption {
recordTime: Timestamp,
outOfTimeBoundsEntry: DamlOutOfTimeBoundsEntry,
errorVersionSwitch: ValueSwitch[Status],
): Option[Update] = {
)(implicit loggingContext: LoggingContext): Option[Update] = {
val timeBounds = parseTimeBounds(outOfTimeBoundsEntry)
val deduplicated = timeBounds.deduplicateUntil.exists(recordTime <= _)
val tooEarly = timeBounds.tooEarlyUntil.exists(recordTime < _)
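
For reference, the logger switch above (from org.slf4j.LoggerFactory to ContextualizedLogger) is what requires the extra parameter list. A minimal sketch of the pattern follows, assuming ContextualizedLogger's log methods resolve the LoggingContext implicitly; LoggingPatternSketch and process are hypothetical names, not part of the commit.

// Sketch: why logEntryToUpdate brings the passed context into implicit scope.
import com.daml.logging.{ContextualizedLogger, LoggingContext}

object LoggingPatternSketch {
  private val logger = ContextualizedLogger.get(this.getClass)

  // Mirrors logEntryToUpdate above: the explicit LoggingContext parameter is made
  // implicit so that every log call inside the method picks it up automatically.
  def process(description: String)(loggingContext: LoggingContext): Unit = {
    implicit val logContext: LoggingContext = loggingContext
    logger.info(s"Processing $description")
  }
}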
@@ -22,6 +22,7 @@ import com.daml.ledger.participant.state.v2.{
}
import com.daml.lf.data.{Ref, Time}
import com.daml.lf.transaction.SubmittedTransaction
import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics
import com.daml.telemetry.TelemetryContext

@@ -56,7 +57,9 @@ class KeyValueParticipantState(
override def ledgerInitialConditions(): Source[LedgerInitialConditions, NotUsed] =
readerAdapter.ledgerInitialConditions()

override def stateUpdates(beginAfter: Option[Offset]): Source[(Offset, Update), NotUsed] =
override def stateUpdates(
beginAfter: Option[Offset]
)(implicit loggingContext: LoggingContext): Source[(Offset, Update), NotUsed] =
readerAdapter.stateUpdates(beginAfter)

override def submitTransaction(
@@ -15,6 +15,7 @@ import com.daml.ledger.participant.state.v2._
import com.daml.ledger.validator.preexecution.TimeUpdatesProvider
import com.daml.lf.data.Time
import com.daml.lf.data.Time.Timestamp
import com.daml.logging.LoggingContext
import com.daml.metrics.{Metrics, Timed}
import com.google.rpc.status.Status

@@ -35,7 +36,7 @@ class KeyValueParticipantStateReader private[api] (
DamlLogEntry,
ValueSwitch[Status],
Option[Timestamp],
) => List[Update],
) => LoggingContext => List[Update],
timeUpdatesProvider: TimeUpdatesProvider,
failOnUnexpectedEvent: Boolean,
) extends ReadService {
@@ -47,7 +48,9 @@
override def ledgerInitialConditions(): Source[LedgerInitialConditions, NotUsed] =
Source.single(createLedgerInitialConditions())

override def stateUpdates(beginAfter: Option[Offset]): Source[(Offset, Update), NotUsed] = {
override def stateUpdates(
beginAfter: Option[Offset]
)(implicit loggingContext: LoggingContext): Source[(Offset, Update), NotUsed] = {
Source
.single(beginAfter.map(offset => KVOffset(offset).zeroLowest.offset))
.flatMapConcat(reader.events)
@@ -65,7 +68,7 @@
logEntry,
errorVersionSwitch,
timeUpdatesProvider(),
)
)(loggingContext)
val updatesWithOffsets =
Source(updates).zipWithIndex.map { case (update, index) =>
offsetForUpdate(offset, index.toInt, updates.size) -> update
@@ -385,7 +385,11 @@ object KVTest {
newState.keySet subsetOf KeyValueCommitting.submissionOutputs(submission)
)
// Verify that we can always process the log entry.
val _ = KeyValueConsumption.logEntryToUpdate(entryId, logEntry, errorVersionSwitch)
val _ = KeyValueConsumption.logEntryToUpdate(
entryId,
logEntry,
errorVersionSwitch,
)(loggingContext)

entryId -> logEntry
}
@@ -417,13 +421,13 @@
successfulLogEntry,
errorVersionSwitch,
recordTimeFromTimeUpdateLogEntry,
)
)(loggingContext)
KeyValueConsumption.logEntryToUpdate(
entryId,
outOfTimeBoundsLogEntry,
errorVersionSwitch,
recordTimeFromTimeUpdateLogEntry,
)
)(loggingContext)

entryId -> preExecutionResult
}
@@ -55,6 +55,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(i

private implicit val resourceContext: ResourceContext = ResourceContext(testExecutionContext)
private implicit val telemetryContext: TelemetryContext = NoOpTelemetryContext
private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting

// Can be used by [[participantStateFactory]] to get a stable ID throughout the test.
// For example, for initializing a database.
@@ -99,7 +99,11 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
)
(entryId, logEntry) = result
contractId = contractIdOfCreateTransaction(
KeyValueConsumption.logEntryToUpdate(entryId, logEntry, errorVersionSwitch)
KeyValueConsumption.logEntryToUpdate(
entryId,
logEntry,
errorVersionSwitch,
)(loggingContext)
)

transaction2 <- runSimpleCommand(
@@ -225,7 +229,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
preExecutionResult.successfulLogEntry,
errorVersionSwitch,
Some(recordTime),
)
)(loggingContext)
)
}

@@ -308,7 +312,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
)
(entryId, logEntry) = result
contractId = contractIdOfCreateTransaction(
KeyValueConsumption.logEntryToUpdate(entryId, logEntry, errorVersionSwitch)
KeyValueConsumption.logEntryToUpdate(
entryId,
logEntry,
errorVersionSwitch,
)(loggingContext)
)

transaction2 <- runSimpleCommand(
@@ -525,7 +533,11 @@

// Process into updates and verify
val updates =
KeyValueConsumption.logEntryToUpdate(entryId, strippedEntry.build, errorVersionSwitch)
KeyValueConsumption.logEntryToUpdate(
entryId,
strippedEntry.build,
errorVersionSwitch,
)(loggingContext)
inside(updates) { case Seq(txAccepted: Update.TransactionAccepted) =>
txAccepted.optCompletionInfo should be(None)
}
@@ -581,7 +593,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
preExecutionResult.successfulLogEntry,
errorVersionSwitch,
Some(recordTime),
)
)(loggingContext)
)

private def prepareExerciseReplaceByKey(
@@ -28,6 +28,7 @@ import com.daml.ledger.participant.state.v2.Update
import com.daml.ledger.participant.state.v2.Update.CommandRejected
import com.daml.ledger.participant.state.v2.Update.CommandRejected.FinalReason
import com.daml.lf.data.Time.Timestamp
import com.daml.logging.LoggingContext
import com.google.protobuf.any.{Any => AnyProto}
import com.google.protobuf.{ByteString, Empty}
import com.google.rpc.code.Code
@@ -63,6 +64,8 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers {
v2ErrorSwitch,
)

private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting

"logEntryToUpdate" should {
"throw in case no record time is available from the log entry or input argument" in {
forAll(
@@ -74,7 +77,7 @@
aLogEntryWithoutRecordTime,
errorSwitch,
recordTimeForUpdate = None,
)
)(loggingContext)
)
}
}
@@ -88,7 +91,7 @@
aLogEntryWithRecordTime,
errorSwitch,
recordTimeForUpdate = Some(aRecordTime),
)
)(loggingContext)

actual.recordTime shouldBe aRecordTimeFromLogEntry
}
@@ -104,7 +107,7 @@
aLogEntryWithRecordTime,
errorSwitch,
recordTimeForUpdate = None,
)
)(loggingContext)

actual.recordTime shouldBe Timestamp.assertFromInstant(Instant.ofEpochSecond(100))
}
@@ -123,7 +126,7 @@
timeUpdateEntry,
errorSwitch,
recordTimeForUpdate = None,
) shouldBe Nil
)(loggingContext) shouldBe Nil
}
}
}