Skip to content

Commit

Permalink
Metering report [DPP-817] (#12604)
Browse files Browse the repository at this point in the history
With experimental support for non-aggregated metering reporting

changelog_begin
with experimental support for non-aggregated metering reporting
changelog_end
  • Loading branch information
simonmaxen-da authored Feb 1, 2022
1 parent 716cc22 commit 6cdda6f
Show file tree
Hide file tree
Showing 23 changed files with 543 additions and 97 deletions.
2 changes: 2 additions & 0 deletions ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ final class Metrics(val registry: MetricRegistry) {
val stopDeduplicatingCommand: Timer =
registry.timer(Prefix :+ "stop_deduplicating_command")
val prune: Timer = registry.timer(Prefix :+ "prune")
val getTransactionMetering: Timer = registry.timer(Prefix :+ "get_transaction_metering")

val publishTransaction: Timer = registry.timer(Prefix :+ "publish_transaction")
val publishPartyAllocation: Timer = registry.timer(Prefix :+ "publish_party_allocation")
Expand Down Expand Up @@ -681,6 +682,7 @@ final class Metrics(val registry: MetricRegistry) {
val deduplicateCommand: Timer = registry.timer(Prefix :+ "deduplicate_command")
val stopDeduplicateCommand: Timer = registry.timer(Prefix :+ "stop_deduplicating_command")
val prune: Timer = registry.timer(Prefix :+ "prune")
val getTransactionMetering: Timer = registry.timer(Prefix :+ "get_transaction_metering")

object streamsBuffer {
private val Prefix: MetricName = index.Prefix :+ "streams_buffer"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ private[daml] object ApiServices {
private val partyManagementService: IndexPartyManagementService = indexService
private val configManagementService: IndexConfigManagementService = indexService
private val submissionService: IndexSubmissionService = indexService
private val meteringStore: MeteringStore = indexService

private val configurationInitializer = new LedgerConfigurationInitializer(
indexService = indexService,
Expand Down Expand Up @@ -213,7 +214,8 @@ private[daml] object ApiServices {
None
}

val apiMeteringReportService = new ApiMeteringReportService()
val apiMeteringReportService =
new ApiMeteringReportService(participantId, meteringStore, errorsVersionsSwitcher)

apiTimeServiceOpt.toList :::
writeServiceBackedApiServices :::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ import com.daml.ledger.api.{TraceIdentifiers, domain}
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.index.v2
import com.daml.ledger.participant.state.index.v2.IndexService
import com.daml.ledger.participant.state.index.v2.{IndexService, MeteringStore}
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.ApplicationId
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.language.Ast
import com.daml.lf.transaction.GlobalKey
Expand Down Expand Up @@ -209,4 +210,12 @@ private[daml] final class SpannedIndexService(delegate: IndexService) extends In

override def currentHealth(): HealthStatus =
delegate.currentHealth()

override def getTransactionMetering(
from: Timestamp,
to: Option[Timestamp],
applicationId: Option[ApplicationId],
)(implicit loggingContext: LoggingContext): Future[Vector[MeteringStore.TransactionMetering]] = {
delegate.getTransactionMetering(from, to, applicationId)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import com.daml.ledger.api.v1.transaction_service.{
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.index.v2
import com.daml.ledger.participant.state.index.v2.IndexService
import com.daml.ledger.participant.state.index.v2.{IndexService, MeteringStore}
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.Party
import com.daml.lf.data.Ref.{ApplicationId, Party}
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.language.Ast
import com.daml.lf.transaction.GlobalKey
Expand Down Expand Up @@ -244,4 +244,15 @@ private[daml] final class TimedIndexService(delegate: IndexService, metrics: Met

override def currentHealth(): HealthStatus =
delegate.currentHealth()

override def getTransactionMetering(
from: Timestamp,
to: Option[Timestamp],
applicationId: Option[ApplicationId],
)(implicit loggingContext: LoggingContext): Future[Vector[MeteringStore.TransactionMetering]] = {
Timed.future(
metrics.daml.services.index.getTransactionMetering,
delegate.getTransactionMetering(from, to, applicationId),
)
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.platform.apiserver.services.admin

import com.daml.error.{
ContextualizedErrorLogger,
DamlContextualizedErrorLogger,
ErrorCodesVersionSwitcher,
}
import com.daml.ledger.api.v1.admin.metering_report_service.MeteringReportServiceGrpc.MeteringReportService
import com.daml.ledger.api.v1.admin.metering_report_service._
import com.daml.ledger.participant.state.index.v2.MeteringStore
import com.daml.ledger.participant.state.index.v2.MeteringStore.TransactionMetering
import com.daml.lf.data.Ref
import com.daml.lf.data.Time.Timestamp
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.api.grpc.GrpcApiService
import com.daml.platform.apiserver.services.admin.ApiMeteringReportService._
import com.daml.platform.server.api.ValidationLogger
import com.daml.platform.server.api.validation.ErrorFactories
import com.google.protobuf.timestamp.{Timestamp => ProtoTimestamp}
import io.grpc.ServerServiceDefinition

import java.time.Instant
import scala.concurrent.{ExecutionContext, Future}
import scala.util.chaining.scalaUtilChainingOps

private[apiserver] final class ApiMeteringReportService(
participantId: Ref.ParticipantId,
store: MeteringStore,
errorCodesVersionSwitcher: ErrorCodesVersionSwitcher,
clock: () => ProtoTimestamp = () => toProtoTimestamp(Timestamp.now()),
)(implicit
executionContext: ExecutionContext,
loggingContext: LoggingContext,
) extends MeteringReportService
with GrpcApiService {

private implicit val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass)
private implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(logger, loggingContext, None)

private val generator = new MeteringReportGenerator(participantId)
private val errorFactories = ErrorFactories(errorCodesVersionSwitcher)

override def bindService(): ServerServiceDefinition =
MeteringReportServiceGrpc.bindService(this, executionContext)

override def close(): Unit = ()

override def getMeteringReport(
request: GetMeteringReportRequest
): Future[GetMeteringReportResponse] = {
logger.info(s"Received metering report request: $request")

(for {
protoFrom <- request.from.toRight("from date must be specified")
from <- toTimestamp(protoFrom)
to <- request.to.fold[Either[String, Option[Timestamp]]](Right(None))(t =>
toTimestamp(t).map(Some.apply)
)
applicationId <- toOption(request.applicationId)
.fold[Either[String, Option[Ref.ApplicationId]]](Right(None))(t =>
Ref.ApplicationId.fromString(t).map(Some.apply)
)
} yield {
val reportTime = clock()
store.getTransactionMetering(from, to, applicationId).map { metering =>
generator.generate(request, metering, reportTime)
}
}) match {
case Right(f) => f
case Left(error) =>
Future.failed(
ValidationLogger.logFailure(request, errorFactories.invalidArgument(None)(error))
)
}
}
}

private[apiserver] object ApiMeteringReportService {

def toOption(protoString: String): Option[String] = {
if (protoString.nonEmpty) Some(protoString) else None
}

def toProtoTimestamp(ts: Timestamp): ProtoTimestamp = {
ts.toInstant.pipe { i => ProtoTimestamp.of(i.getEpochSecond, i.getNano) }
}

def toTimestamp(ts: ProtoTimestamp): Either[String, Timestamp] = {
Timestamp.fromInstant(Instant.ofEpochSecond(ts.seconds, ts.nanos.toLong))
}

class MeteringReportGenerator(participantId: Ref.ParticipantId) {
def generate(
request: GetMeteringReportRequest,
metering: Vector[TransactionMetering],
generationTime: ProtoTimestamp,
): GetMeteringReportResponse = {

val applicationReports = metering
.groupMapReduce(_.applicationId)(_.actionCount.toLong)(_ + _)
.toList
.sortBy(_._1)
.map((ApplicationMeteringReport.apply _).tupled)

val report = ParticipantMeteringReport(
participantId,
toActual = Some(generationTime),
applicationReports,
)

GetMeteringReportResponse(
request = Some(request),
participantReport = Some(report),
reportGenerationTime = Some(generationTime),
)

}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ import com.daml.ledger.api.v1.transaction_service.{
}
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.index.v2.MeteringStore.TransactionMetering
import com.daml.ledger.participant.state.index.v2._
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.{Identifier, PackageId, Party}
import com.daml.lf.data.Ref.{ApplicationId, Identifier, PackageId, Party}
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.language.Ast
import com.daml.lf.transaction.GlobalKey
Expand Down Expand Up @@ -346,4 +347,16 @@ private[platform] final class LedgerBackedIndexService(
.map(off => Future.fromTry(ApiOffset.fromString(off.value)))
.getOrElse(Future.successful(Offset.beforeBegin))

override def getTransactionMetering(
from: Timestamp,
to: Option[Timestamp],
applicationId: Option[ApplicationId],
)(implicit loggingContext: LoggingContext): Future[Vector[TransactionMetering]] = {
ledger.getTransactionMetering(
from: Timestamp,
to: Option[Timestamp],
applicationId: Option[ApplicationId],
)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@ import com.daml.ledger.api.v1.transaction_service.{
}
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.index.v2.{CommandDeduplicationResult, PackageDetails}
import com.daml.ledger.participant.state.index.v2.{
CommandDeduplicationResult,
MeteringStore,
PackageDetails,
}
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.ApplicationId
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.language.Ast
import com.daml.lf.transaction.GlobalKey
Expand Down Expand Up @@ -198,6 +203,17 @@ private[platform] class MeteredReadOnlyLedger(ledger: ReadOnlyLedger, metrics: M
metrics.daml.index.prune,
ledger.prune(pruneUpToInclusive, pruneAllDivulgedContracts),
)

override def getTransactionMetering(
from: Timestamp,
to: Option[Timestamp],
applicationId: Option[ApplicationId],
)(implicit loggingContext: LoggingContext): Future[Vector[MeteringStore.TransactionMetering]] = {
Timed.future(
metrics.daml.index.getTransactionMetering,
ledger.getTransactionMetering(from, to, applicationId),
)
}
}

private[platform] object MeteredReadOnlyLedger {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import com.daml.platform.PruneBuffers
import com.daml.platform.akkastreams.dispatcher.Dispatcher
import com.daml.platform.akkastreams.dispatcher.SubSource.RangeSource
import com.daml.platform.store.appendonlydao.{LedgerDaoTransactionsReader, LedgerReadDao}
import com.daml.ledger.participant.state.index.v2.MeteringStore.TransactionMetering
import com.daml.platform.store.entries.{ConfigurationEntry, PackageLedgerEntry, PartyLedgerEntry}

import scala.concurrent.{ExecutionContext, Future}
Expand Down Expand Up @@ -204,5 +205,13 @@ private[platform] abstract class BaseLedger(
ledgerDao.prune(pruneUpToInclusive, pruneAllDivulgedContracts)
}

override def getTransactionMetering(
from: Timestamp,
to: Option[Timestamp],
applicationId: Option[Ref.ApplicationId],
)(implicit loggingContext: LoggingContext): Future[Vector[TransactionMetering]] = {
ledgerDao.getTransactionMetering(from, to, applicationId)
}

override def close(): Unit = ()
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import com.daml.ledger.api.v1.transaction_service.{
}
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.index.v2.MeteringStore.TransactionMetering
import com.daml.ledger.participant.state.index.v2.{CommandDeduplicationResult, PackageDetails}
import com.daml.lf.data.Ref
import com.daml.lf.data.Time.Timestamp
Expand Down Expand Up @@ -169,4 +170,11 @@ private[platform] trait ReadOnlyLedger extends ReportsHealth with AutoCloseable
def prune(pruneUpToInclusive: Offset, pruneAllDivulgedContracts: Boolean)(implicit
loggingContext: LoggingContext
): Future[Unit]

def getTransactionMetering(
from: Timestamp,
to: Option[Timestamp],
applicationId: Option[Ref.ApplicationId],
)(implicit loggingContext: LoggingContext): Future[Vector[TransactionMetering]]

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import com.daml.ledger.api.domain.{LedgerId, ParticipantId, PartyDetails}
import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.index.v2.MeteringStore.TransactionMetering
import com.daml.ledger.participant.state.index.v2.{
CommandDeduplicationDuplicate,
CommandDeduplicationNew,
Expand All @@ -21,6 +22,7 @@ import com.daml.ledger.participant.state.index.v2.{
import com.daml.ledger.participant.state.{v2 => state}
import com.daml.lf.archive.ArchiveParser
import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.ApplicationId
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.engine.ValueEnricher
import com.daml.lf.transaction.{BlindingInfo, CommittedTransaction}
Expand Down Expand Up @@ -590,6 +592,17 @@ private class JdbcLedgerDao(
PersistenceResponse.Ok
}
}

/** Returns all TransactionMetering records matching given criteria */
override def getTransactionMetering(
from: Timestamp,
to: Option[Timestamp],
applicationId: Option[ApplicationId],
)(implicit loggingContext: LoggingContext): Future[Vector[TransactionMetering]] = {
dbDispatcher.executeSql(metrics.daml.index.db.lookupConfiguration)(
readStorageBackend.meteringStorageBackend.transactionMetering(from, to, applicationId)
)
}
}

private[platform] object JdbcLedgerDao {
Expand Down
Loading

0 comments on commit 6cdda6f

Please sign in to comment.