Skip to content

Commit

Permalink
Propagate the contextual logging per-readUpdates call
Browse files Browse the repository at this point in the history
  • Loading branch information
fabiotudone-da committed Oct 19, 2021
1 parent cfa6a7e commit 43cf4db
Show file tree
Hide file tree
Showing 25 changed files with 158 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,12 @@ object Main {

val metricRegistry = new MetricRegistry
val metrics = new Metrics(metricRegistry)
val keyValueStateReader = newLoggingContext { implicit loggingContext =>
KeyValueParticipantStateReader(
keyValueSource,
metrics,
failOnUnexpectedEvent = false,
enableSelfServiceErrorCodes = false,
)
}
val keyValueStateReader = KeyValueParticipantStateReader(
keyValueSource,
metrics,
failOnUnexpectedEvent = false,
enableSelfServiceErrorCodes = false,
)

// Note: this method is doing quite a lot of work to transform a sequence of write sets
// to a sequence of state updates.
Expand All @@ -89,16 +87,18 @@ object Main {
// the benchmark.
val system = ActorSystem("IndexerBenchmarkUpdateReader")
implicit val materializer: Materializer = Materializer(system)
keyValueStateReader
.stateUpdates(None)
.take(config.updateCount.getOrElse(Long.MaxValue))
.zipWithIndex
.map { case (data, index) =>
if (index % 1000 == 0) println(s"Generated update $index")
data
}
.runWith(Sink.seq[(Offset, Update)])
.map(seq => seq.iterator)(DirectExecutionContext)
.andThen { case _ => system.terminate() }(DirectExecutionContext)
newLoggingContext { implicit loggingContext =>
keyValueStateReader
.stateUpdates(None)
.take(config.updateCount.getOrElse(Long.MaxValue))
.zipWithIndex
.map { case (data, index) =>
if (index % 1000 == 0) println(s"Generated update $index")
data
}
.runWith(Sink.seq[(Offset, Update)])
.map(seq => seq.iterator)(DirectExecutionContext)
.andThen { case _ => system.terminate() }(DirectExecutionContext)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.v2.{ReadService, Update}
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.lf.data.Time
import com.daml.logging.LoggingContext
import com.daml.logging.LoggingContext.newLoggingContext
import com.daml.metrics.{JvmMetricSet, Metrics}
import com.daml.platform.configuration.ServerRole
Expand Down Expand Up @@ -198,15 +199,17 @@ class IndexerBenchmark() {
)

new ReadService {

override def ledgerInitialConditions(): Source[LedgerInitialConditions, NotUsed] = {
Source.single(initialConditions)
}

override def stateUpdates(beginAfter: Option[Offset]): Source[(Offset, Update), NotUsed] = {
override def stateUpdates(
beginAfter: Option[Offset]
)(implicit loggingContext: LoggingContext): Source[(Offset, Update), NotUsed] = {
assert(beginAfter.isEmpty, s"beginAfter is $beginAfter")
Source.fromIterator(() => updates)
}

override def currentHealth(): HealthStatus = Healthy
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ case class EndlessReadService(
*
* The last two items above repeat indefinitely
*/
override def stateUpdates(beginAfter: Option[Offset]): Source[(Offset, Update), NotUsed] =
override def stateUpdates(
beginAfter: Option[Offset]
)(implicit loggingContext: LoggingContext): Source[(Offset, Update), NotUsed] =
synchronized {
logger.info(s"EndlessReadService.stateUpdates($beginAfter) called")
stateUpdatesCalls.incrementAndGet()
Expand Down
1 change: 1 addition & 0 deletions ledger/participant-state-metrics/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ da_scala_library(
"//ledger/ledger-offset",
"//ledger/metrics",
"//ledger/participant-state",
"//libs-scala/contextualized-logging",
"@maven//:com_google_protobuf_protobuf_java",
"@maven//:io_dropwizard_metrics_metrics_core",
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.configuration.LedgerInitialConditions
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.v2.{ReadService, Update}
import com.daml.logging.LoggingContext
import com.daml.metrics.{Metrics, Timed}

final class TimedReadService(delegate: ReadService, metrics: Metrics) extends ReadService {
Expand All @@ -19,7 +20,9 @@ final class TimedReadService(delegate: ReadService, metrics: Metrics) extends Re
delegate.ledgerInitialConditions(),
)

override def stateUpdates(beginAfter: Option[Offset]): Source[(Offset, Update), NotUsed] =
override def stateUpdates(
beginAfter: Option[Offset]
)(implicit loggingContext: LoggingContext): Source[(Offset, Update), NotUsed] =
Timed.source(metrics.daml.services.read.stateUpdates, delegate.stateUpdates(beginAfter))

override def currentHealth(): HealthStatus =
Expand Down
1 change: 1 addition & 0 deletions ledger/participant-state/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ da_scala_library(
"//ledger/ledger-grpc",
"//ledger/ledger-offset",
"//ledger/metrics",
"//libs-scala/contextualized-logging",
"//libs-scala/grpc-utils",
"//libs-scala/logging-entries",
"@maven//:com_google_api_grpc_proto_google_common_protos",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ object KeyValueConsumption {
entry: DamlLogEntry,
errorVersionSwitch: ValueSwitch[Status],
recordTimeForUpdate: Option[Timestamp] = None,
)(implicit loggingContext: LoggingContext): List[Update] = {
)(loggingContext: LoggingContext): List[Update] = {
implicit val logContext: LoggingContext = loggingContext

val recordTimeFromLogEntry = PartialFunction.condOpt(entry.hasRecordTime) { case true =>
parseTimestamp(entry.getRecordTime)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ class KeyValueParticipantState(
writer: LedgerWriter,
metrics: Metrics,
enableSelfServiceErrorCodes: Boolean,
)(implicit loggingContext: LoggingContext)
extends ReadService
) extends ReadService
with WriteService {
private val readerAdapter =
KeyValueParticipantStateReader(reader, metrics, enableSelfServiceErrorCodes)
Expand All @@ -58,7 +57,9 @@ class KeyValueParticipantState(
override def ledgerInitialConditions(): Source[LedgerInitialConditions, NotUsed] =
readerAdapter.ledgerInitialConditions()

override def stateUpdates(beginAfter: Option[Offset]): Source[(Offset, Update), NotUsed] =
override def stateUpdates(
beginAfter: Option[Offset]
)(implicit loggingContext: LoggingContext): Source[(Offset, Update), NotUsed] =
readerAdapter.stateUpdates(beginAfter)

override def submitTransaction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class KeyValueParticipantStateReader private[api] (
DamlLogEntry,
ValueSwitch[Status],
Option[Timestamp],
) => List[Update],
) => LoggingContext => List[Update],
timeUpdatesProvider: TimeUpdatesProvider,
failOnUnexpectedEvent: Boolean,
) extends ReadService {
Expand All @@ -48,7 +48,9 @@ class KeyValueParticipantStateReader private[api] (
override def ledgerInitialConditions(): Source[LedgerInitialConditions, NotUsed] =
Source.single(createLedgerInitialConditions())

override def stateUpdates(beginAfter: Option[Offset]): Source[(Offset, Update), NotUsed] = {
override def stateUpdates(
beginAfter: Option[Offset]
)(implicit loggingContext: LoggingContext): Source[(Offset, Update), NotUsed] = {
Source
.single(beginAfter.map(offset => KVOffset(offset).zeroLowest.offset))
.flatMapConcat(reader.events)
Expand All @@ -66,7 +68,7 @@ class KeyValueParticipantStateReader private[api] (
logEntry,
errorVersionSwitch,
timeUpdatesProvider(),
)
)(loggingContext)
val updatesWithOffsets =
Source(updates).zipWithIndex.map { case (update, index) =>
offsetForUpdate(offset, index.toInt, updates.size) -> update
Expand Down Expand Up @@ -104,7 +106,7 @@ object KeyValueParticipantStateReader {
enableSelfServiceErrorCodes: Boolean,
timeUpdatesProvider: TimeUpdatesProvider = TimeUpdatesProvider.ReasonableDefault,
failOnUnexpectedEvent: Boolean = true,
)(implicit loggingContext: LoggingContext): KeyValueParticipantStateReader =
): KeyValueParticipantStateReader =
new KeyValueParticipantStateReader(
reader,
metrics,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,11 @@ object KVTest {
newState.keySet subsetOf KeyValueCommitting.submissionOutputs(submission)
)
// Verify that we can always process the log entry.
val _ = KeyValueConsumption.logEntryToUpdate(entryId, logEntry, errorVersionSwitch)
val _ = KeyValueConsumption.logEntryToUpdate(
entryId,
logEntry,
errorVersionSwitch,
)(loggingContext)

entryId -> logEntry
}
Expand Down Expand Up @@ -417,13 +421,13 @@ object KVTest {
successfulLogEntry,
errorVersionSwitch,
recordTimeFromTimeUpdateLogEntry,
)
)(loggingContext)
KeyValueConsumption.logEntryToUpdate(
entryId,
outOfTimeBoundsLogEntry,
errorVersionSwitch,
recordTimeFromTimeUpdateLogEntry,
)
)(loggingContext)

entryId -> preExecutionResult
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(i

private implicit val resourceContext: ResourceContext = ResourceContext(testExecutionContext)
private implicit val telemetryContext: TelemetryContext = NoOpTelemetryContext
private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting

// Can be used by [[participantStateFactory]] to get a stable ID throughout the test.
// For example, for initializing a database.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,11 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
)
(entryId, logEntry) = result
contractId = contractIdOfCreateTransaction(
KeyValueConsumption.logEntryToUpdate(entryId, logEntry, errorVersionSwitch)
KeyValueConsumption.logEntryToUpdate(
entryId,
logEntry,
errorVersionSwitch,
)(loggingContext)
)

transaction2 <- runSimpleCommand(
Expand Down Expand Up @@ -225,7 +229,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
preExecutionResult.successfulLogEntry,
errorVersionSwitch,
Some(recordTime),
)
)(loggingContext)
)
}

Expand Down Expand Up @@ -308,7 +312,11 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
)
(entryId, logEntry) = result
contractId = contractIdOfCreateTransaction(
KeyValueConsumption.logEntryToUpdate(entryId, logEntry, errorVersionSwitch)
KeyValueConsumption.logEntryToUpdate(
entryId,
logEntry,
errorVersionSwitch,
)(loggingContext)
)

transaction2 <- runSimpleCommand(
Expand Down Expand Up @@ -525,7 +533,11 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {

// Process into updates and verify
val updates =
KeyValueConsumption.logEntryToUpdate(entryId, strippedEntry.build, errorVersionSwitch)
KeyValueConsumption.logEntryToUpdate(
entryId,
strippedEntry.build,
errorVersionSwitch,
)(loggingContext)
inside(updates) { case Seq(txAccepted: Update.TransactionAccepted) =>
txAccepted.optCompletionInfo should be(None)
}
Expand Down Expand Up @@ -581,7 +593,7 @@ class KVUtilsTransactionSpec extends AnyWordSpec with Matchers with Inside {
preExecutionResult.successfulLogEntry,
errorVersionSwitch,
Some(recordTime),
)
)(loggingContext)
)

private def prepareExerciseReplaceByKey(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers {
aLogEntryWithoutRecordTime,
errorSwitch,
recordTimeForUpdate = None,
)
)(loggingContext)
)
}
}
Expand All @@ -91,7 +91,7 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers {
aLogEntryWithRecordTime,
errorSwitch,
recordTimeForUpdate = Some(aRecordTime),
)
)(loggingContext)

actual.recordTime shouldBe aRecordTimeFromLogEntry
}
Expand All @@ -107,7 +107,7 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers {
aLogEntryWithRecordTime,
errorSwitch,
recordTimeForUpdate = None,
)
)(loggingContext)

actual.recordTime shouldBe Timestamp.assertFromInstant(Instant.ofEpochSecond(100))
}
Expand All @@ -126,7 +126,7 @@ class KeyValueConsumptionSpec extends AnyWordSpec with Matchers {
timeUpdateEntry,
errorSwitch,
recordTimeForUpdate = None,
) shouldBe Nil
)(loggingContext) shouldBe Nil
}
}
}
Expand Down
Loading

0 comments on commit 43cf4db

Please sign in to comment.