From 6cdda6fbf77bbf34189ea254da2aca6b70c22e97 Mon Sep 17 00:00:00 2001 From: Simon Maxen <56595114+simonmaxen-da@users.noreply.github.com> Date: Tue, 1 Feb 2022 10:38:04 +0000 Subject: [PATCH] Metering report [DPP-817] (#12604) With experimental support for non-aggregated metering reporting changelog_begin with experimental support for non-aggregated metering reporting changelog_end --- .../main/scala/com/daml/metrics/Metrics.scala | 2 + .../platform/apiserver/ApiServices.scala | 4 +- .../apiserver/SpannedIndexService.scala | 11 +- .../apiserver/TimedIndexService.scala | 15 +- .../services/ApiMeteringReportService.scala | 47 ------ .../admin/ApiMeteringReportService.scala | 124 +++++++++++++++ .../index/LedgerBackedIndexService.scala | 15 +- .../index/MeteredReadOnlyLedger.scala | 18 ++- .../scala/platform/store/BaseLedger.scala | 9 ++ .../scala/platform/store/ReadOnlyLedger.scala | 8 + .../store/appendonlydao/JdbcLedgerDao.scala | 13 ++ .../store/appendonlydao/LedgerDao.scala | 9 ++ .../appendonlydao/MeteredLedgerDao.scala | 19 ++- .../store/backend/StorageBackend.scala | 19 +-- .../store/backend/StorageBackendFactory.scala | 2 + .../MeteringStorageBackendTemplate.scala | 27 ++-- .../backend/StorageBackendTestValues.scala | 3 +- ...orageBackendTestsInitializeIngestion.scala | 8 +- .../backend/StorageBackendTestsMetering.scala | 103 +++++++++--- .../admin/ApiMeteringReportServiceSpec.scala | 148 ++++++++++++++++++ .../state/index/v2/IndexService.scala | 1 + .../state/index/v2/MeteringStore.scala | 30 ++++ .../sandbox/auth/MeteringReportAuthIT.scala | 5 +- 23 files changed, 543 insertions(+), 97 deletions(-) delete mode 100644 ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiMeteringReportService.scala create mode 100644 ledger/participant-integration-api/src/main/scala/platform/apiserver/services/admin/ApiMeteringReportService.scala create mode 100644 ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/admin/ApiMeteringReportServiceSpec.scala create mode 100644 ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/v2/MeteringStore.scala diff --git a/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala b/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala index cbbae4c83d4a..6bed79aece55 100644 --- a/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala +++ b/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala @@ -373,6 +373,7 @@ final class Metrics(val registry: MetricRegistry) { val stopDeduplicatingCommand: Timer = registry.timer(Prefix :+ "stop_deduplicating_command") val prune: Timer = registry.timer(Prefix :+ "prune") + val getTransactionMetering: Timer = registry.timer(Prefix :+ "get_transaction_metering") val publishTransaction: Timer = registry.timer(Prefix :+ "publish_transaction") val publishPartyAllocation: Timer = registry.timer(Prefix :+ "publish_party_allocation") @@ -681,6 +682,7 @@ final class Metrics(val registry: MetricRegistry) { val deduplicateCommand: Timer = registry.timer(Prefix :+ "deduplicate_command") val stopDeduplicateCommand: Timer = registry.timer(Prefix :+ "stop_deduplicating_command") val prune: Timer = registry.timer(Prefix :+ "prune") + val getTransactionMetering: Timer = registry.timer(Prefix :+ "get_transaction_metering") object streamsBuffer { private val Prefix: MetricName = index.Prefix :+ "streams_buffer" diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/ApiServices.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/ApiServices.scala index bb45efbf747d..8bce30de9f17 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/ApiServices.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/ApiServices.scala @@ -105,6 +105,7 @@ private[daml] object ApiServices { private val partyManagementService: IndexPartyManagementService = indexService private val configManagementService: IndexConfigManagementService = indexService private val submissionService: IndexSubmissionService = indexService + private val meteringStore: MeteringStore = indexService private val configurationInitializer = new LedgerConfigurationInitializer( indexService = indexService, @@ -213,7 +214,8 @@ private[daml] object ApiServices { None } - val apiMeteringReportService = new ApiMeteringReportService() + val apiMeteringReportService = + new ApiMeteringReportService(participantId, meteringStore, errorsVersionsSwitcher) apiTimeServiceOpt.toList ::: writeServiceBackedApiServices ::: diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/SpannedIndexService.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/SpannedIndexService.scala index de4de0de295f..fa37133d2078 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/SpannedIndexService.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/SpannedIndexService.scala @@ -26,8 +26,9 @@ import com.daml.ledger.api.{TraceIdentifiers, domain} import com.daml.ledger.configuration.Configuration import com.daml.ledger.offset.Offset import com.daml.ledger.participant.state.index.v2 -import com.daml.ledger.participant.state.index.v2.IndexService +import com.daml.ledger.participant.state.index.v2.{IndexService, MeteringStore} import com.daml.lf.data.Ref +import com.daml.lf.data.Ref.ApplicationId import com.daml.lf.data.Time.Timestamp import com.daml.lf.language.Ast import com.daml.lf.transaction.GlobalKey @@ -209,4 +210,12 @@ private[daml] final class SpannedIndexService(delegate: IndexService) extends In override def currentHealth(): HealthStatus = delegate.currentHealth() + + override def getTransactionMetering( + from: Timestamp, + to: Option[Timestamp], + applicationId: Option[ApplicationId], + )(implicit loggingContext: LoggingContext): Future[Vector[MeteringStore.TransactionMetering]] = { + delegate.getTransactionMetering(from, to, applicationId) + } } diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/TimedIndexService.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/TimedIndexService.scala index 31e5d93fbe68..b6a84add4dec 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/TimedIndexService.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/TimedIndexService.scala @@ -26,9 +26,9 @@ import com.daml.ledger.api.v1.transaction_service.{ import com.daml.ledger.configuration.Configuration import com.daml.ledger.offset.Offset import com.daml.ledger.participant.state.index.v2 -import com.daml.ledger.participant.state.index.v2.IndexService +import com.daml.ledger.participant.state.index.v2.{IndexService, MeteringStore} import com.daml.lf.data.Ref -import com.daml.lf.data.Ref.Party +import com.daml.lf.data.Ref.{ApplicationId, Party} import com.daml.lf.data.Time.Timestamp import com.daml.lf.language.Ast import com.daml.lf.transaction.GlobalKey @@ -244,4 +244,15 @@ private[daml] final class TimedIndexService(delegate: IndexService, metrics: Met override def currentHealth(): HealthStatus = delegate.currentHealth() + + override def getTransactionMetering( + from: Timestamp, + to: Option[Timestamp], + applicationId: Option[ApplicationId], + )(implicit loggingContext: LoggingContext): Future[Vector[MeteringStore.TransactionMetering]] = { + Timed.future( + metrics.daml.services.index.getTransactionMetering, + delegate.getTransactionMetering(from, to, applicationId), + ) + } } diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiMeteringReportService.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiMeteringReportService.scala deleted file mode 100644 index c97655db2654..000000000000 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiMeteringReportService.scala +++ /dev/null @@ -1,47 +0,0 @@ -// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.daml.platform.apiserver.services - -import com.daml.ledger.api.v1.admin.metering_report_service.MeteringReportServiceGrpc.MeteringReportService -import com.daml.ledger.api.v1.admin.metering_report_service._ -import com.daml.logging.{ContextualizedLogger, LoggingContext} -import com.daml.platform.api.grpc.GrpcApiService -import io.grpc.ServerServiceDefinition - -import scala.concurrent.{ExecutionContext, Future} - -private[apiserver] final class ApiMeteringReportService()(implicit - executionContext: ExecutionContext, - loggingContext: LoggingContext, -) extends MeteringReportService - with GrpcApiService { - - private implicit val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass) - - override def bindService(): ServerServiceDefinition = - MeteringReportServiceGrpc.bindService(this, executionContext) - - override def close(): Unit = () - - override def getMeteringReport( - request: GetMeteringReportRequest - ): Future[GetMeteringReportResponse] = { - logger.info(s"Received metering report request: $request") - val now = java.time.Instant.now() - val generationTime = - com.google.protobuf.timestamp.Timestamp.of(now.getEpochSecond, now.getNano) - val participantReport = ParticipantMeteringReport( - participantId = "participant1", - request.to.orElse(Some(generationTime)), - Seq( - ApplicationMeteringReport("app1", 100), - ApplicationMeteringReport("app2", 200), - ), - ) - val response = - GetMeteringReportResponse(Some(request), Some(participantReport), Some(generationTime)) - Future.successful(response).andThen(logger.logErrorsOnCall[GetMeteringReportResponse]) - } - -} diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/admin/ApiMeteringReportService.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/admin/ApiMeteringReportService.scala new file mode 100644 index 000000000000..8b2a497fd0b8 --- /dev/null +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/admin/ApiMeteringReportService.scala @@ -0,0 +1,124 @@ +// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.platform.apiserver.services.admin + +import com.daml.error.{ + ContextualizedErrorLogger, + DamlContextualizedErrorLogger, + ErrorCodesVersionSwitcher, +} +import com.daml.ledger.api.v1.admin.metering_report_service.MeteringReportServiceGrpc.MeteringReportService +import com.daml.ledger.api.v1.admin.metering_report_service._ +import com.daml.ledger.participant.state.index.v2.MeteringStore +import com.daml.ledger.participant.state.index.v2.MeteringStore.TransactionMetering +import com.daml.lf.data.Ref +import com.daml.lf.data.Time.Timestamp +import com.daml.logging.{ContextualizedLogger, LoggingContext} +import com.daml.platform.api.grpc.GrpcApiService +import com.daml.platform.apiserver.services.admin.ApiMeteringReportService._ +import com.daml.platform.server.api.ValidationLogger +import com.daml.platform.server.api.validation.ErrorFactories +import com.google.protobuf.timestamp.{Timestamp => ProtoTimestamp} +import io.grpc.ServerServiceDefinition + +import java.time.Instant +import scala.concurrent.{ExecutionContext, Future} +import scala.util.chaining.scalaUtilChainingOps + +private[apiserver] final class ApiMeteringReportService( + participantId: Ref.ParticipantId, + store: MeteringStore, + errorCodesVersionSwitcher: ErrorCodesVersionSwitcher, + clock: () => ProtoTimestamp = () => toProtoTimestamp(Timestamp.now()), +)(implicit + executionContext: ExecutionContext, + loggingContext: LoggingContext, +) extends MeteringReportService + with GrpcApiService { + + private implicit val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass) + private implicit val contextualizedErrorLogger: ContextualizedErrorLogger = + new DamlContextualizedErrorLogger(logger, loggingContext, None) + + private val generator = new MeteringReportGenerator(participantId) + private val errorFactories = ErrorFactories(errorCodesVersionSwitcher) + + override def bindService(): ServerServiceDefinition = + MeteringReportServiceGrpc.bindService(this, executionContext) + + override def close(): Unit = () + + override def getMeteringReport( + request: GetMeteringReportRequest + ): Future[GetMeteringReportResponse] = { + logger.info(s"Received metering report request: $request") + + (for { + protoFrom <- request.from.toRight("from date must be specified") + from <- toTimestamp(protoFrom) + to <- request.to.fold[Either[String, Option[Timestamp]]](Right(None))(t => + toTimestamp(t).map(Some.apply) + ) + applicationId <- toOption(request.applicationId) + .fold[Either[String, Option[Ref.ApplicationId]]](Right(None))(t => + Ref.ApplicationId.fromString(t).map(Some.apply) + ) + } yield { + val reportTime = clock() + store.getTransactionMetering(from, to, applicationId).map { metering => + generator.generate(request, metering, reportTime) + } + }) match { + case Right(f) => f + case Left(error) => + Future.failed( + ValidationLogger.logFailure(request, errorFactories.invalidArgument(None)(error)) + ) + } + } +} + +private[apiserver] object ApiMeteringReportService { + + def toOption(protoString: String): Option[String] = { + if (protoString.nonEmpty) Some(protoString) else None + } + + def toProtoTimestamp(ts: Timestamp): ProtoTimestamp = { + ts.toInstant.pipe { i => ProtoTimestamp.of(i.getEpochSecond, i.getNano) } + } + + def toTimestamp(ts: ProtoTimestamp): Either[String, Timestamp] = { + Timestamp.fromInstant(Instant.ofEpochSecond(ts.seconds, ts.nanos.toLong)) + } + + class MeteringReportGenerator(participantId: Ref.ParticipantId) { + def generate( + request: GetMeteringReportRequest, + metering: Vector[TransactionMetering], + generationTime: ProtoTimestamp, + ): GetMeteringReportResponse = { + + val applicationReports = metering + .groupMapReduce(_.applicationId)(_.actionCount.toLong)(_ + _) + .toList + .sortBy(_._1) + .map((ApplicationMeteringReport.apply _).tupled) + + val report = ParticipantMeteringReport( + participantId, + toActual = Some(generationTime), + applicationReports, + ) + + GetMeteringReportResponse( + request = Some(request), + participantReport = Some(report), + reportGenerationTime = Some(generationTime), + ) + + } + } + +} diff --git a/ledger/participant-integration-api/src/main/scala/platform/index/LedgerBackedIndexService.scala b/ledger/participant-integration-api/src/main/scala/platform/index/LedgerBackedIndexService.scala index 5c0548fcb7ce..aa76c65e2515 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/index/LedgerBackedIndexService.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/index/LedgerBackedIndexService.scala @@ -30,9 +30,10 @@ import com.daml.ledger.api.v1.transaction_service.{ } import com.daml.ledger.configuration.Configuration import com.daml.ledger.offset.Offset +import com.daml.ledger.participant.state.index.v2.MeteringStore.TransactionMetering import com.daml.ledger.participant.state.index.v2._ import com.daml.lf.data.Ref -import com.daml.lf.data.Ref.{Identifier, PackageId, Party} +import com.daml.lf.data.Ref.{ApplicationId, Identifier, PackageId, Party} import com.daml.lf.data.Time.Timestamp import com.daml.lf.language.Ast import com.daml.lf.transaction.GlobalKey @@ -346,4 +347,16 @@ private[platform] final class LedgerBackedIndexService( .map(off => Future.fromTry(ApiOffset.fromString(off.value))) .getOrElse(Future.successful(Offset.beforeBegin)) + override def getTransactionMetering( + from: Timestamp, + to: Option[Timestamp], + applicationId: Option[ApplicationId], + )(implicit loggingContext: LoggingContext): Future[Vector[TransactionMetering]] = { + ledger.getTransactionMetering( + from: Timestamp, + to: Option[Timestamp], + applicationId: Option[ApplicationId], + ) + } + } diff --git a/ledger/participant-integration-api/src/main/scala/platform/index/MeteredReadOnlyLedger.scala b/ledger/participant-integration-api/src/main/scala/platform/index/MeteredReadOnlyLedger.scala index 13b00b843267..c1210a4a7760 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/index/MeteredReadOnlyLedger.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/index/MeteredReadOnlyLedger.scala @@ -18,8 +18,13 @@ import com.daml.ledger.api.v1.transaction_service.{ } import com.daml.ledger.configuration.Configuration import com.daml.ledger.offset.Offset -import com.daml.ledger.participant.state.index.v2.{CommandDeduplicationResult, PackageDetails} +import com.daml.ledger.participant.state.index.v2.{ + CommandDeduplicationResult, + MeteringStore, + PackageDetails, +} import com.daml.lf.data.Ref +import com.daml.lf.data.Ref.ApplicationId import com.daml.lf.data.Time.Timestamp import com.daml.lf.language.Ast import com.daml.lf.transaction.GlobalKey @@ -198,6 +203,17 @@ private[platform] class MeteredReadOnlyLedger(ledger: ReadOnlyLedger, metrics: M metrics.daml.index.prune, ledger.prune(pruneUpToInclusive, pruneAllDivulgedContracts), ) + + override def getTransactionMetering( + from: Timestamp, + to: Option[Timestamp], + applicationId: Option[ApplicationId], + )(implicit loggingContext: LoggingContext): Future[Vector[MeteringStore.TransactionMetering]] = { + Timed.future( + metrics.daml.index.getTransactionMetering, + ledger.getTransactionMetering(from, to, applicationId), + ) + } } private[platform] object MeteredReadOnlyLedger { diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/BaseLedger.scala b/ledger/participant-integration-api/src/main/scala/platform/store/BaseLedger.scala index 27ceb475acd2..fa2b2ad5a1a6 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/BaseLedger.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/BaseLedger.scala @@ -32,6 +32,7 @@ import com.daml.platform.PruneBuffers import com.daml.platform.akkastreams.dispatcher.Dispatcher import com.daml.platform.akkastreams.dispatcher.SubSource.RangeSource import com.daml.platform.store.appendonlydao.{LedgerDaoTransactionsReader, LedgerReadDao} +import com.daml.ledger.participant.state.index.v2.MeteringStore.TransactionMetering import com.daml.platform.store.entries.{ConfigurationEntry, PackageLedgerEntry, PartyLedgerEntry} import scala.concurrent.{ExecutionContext, Future} @@ -204,5 +205,13 @@ private[platform] abstract class BaseLedger( ledgerDao.prune(pruneUpToInclusive, pruneAllDivulgedContracts) } + override def getTransactionMetering( + from: Timestamp, + to: Option[Timestamp], + applicationId: Option[Ref.ApplicationId], + )(implicit loggingContext: LoggingContext): Future[Vector[TransactionMetering]] = { + ledgerDao.getTransactionMetering(from, to, applicationId) + } + override def close(): Unit = () } diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/ReadOnlyLedger.scala b/ledger/participant-integration-api/src/main/scala/platform/store/ReadOnlyLedger.scala index 11fb373f3686..8cb55d44fb21 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/ReadOnlyLedger.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/ReadOnlyLedger.scala @@ -18,6 +18,7 @@ import com.daml.ledger.api.v1.transaction_service.{ } import com.daml.ledger.configuration.Configuration import com.daml.ledger.offset.Offset +import com.daml.ledger.participant.state.index.v2.MeteringStore.TransactionMetering import com.daml.ledger.participant.state.index.v2.{CommandDeduplicationResult, PackageDetails} import com.daml.lf.data.Ref import com.daml.lf.data.Time.Timestamp @@ -169,4 +170,11 @@ private[platform] trait ReadOnlyLedger extends ReportsHealth with AutoCloseable def prune(pruneUpToInclusive: Offset, pruneAllDivulgedContracts: Boolean)(implicit loggingContext: LoggingContext ): Future[Unit] + + def getTransactionMetering( + from: Timestamp, + to: Option[Timestamp], + applicationId: Option[Ref.ApplicationId], + )(implicit loggingContext: LoggingContext): Future[Vector[TransactionMetering]] + } diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/JdbcLedgerDao.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/JdbcLedgerDao.scala index 99f22e7d82ef..e2e97b6ab11c 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/JdbcLedgerDao.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/JdbcLedgerDao.scala @@ -12,6 +12,7 @@ import com.daml.ledger.api.domain.{LedgerId, ParticipantId, PartyDetails} import com.daml.ledger.api.health.HealthStatus import com.daml.ledger.configuration.Configuration import com.daml.ledger.offset.Offset +import com.daml.ledger.participant.state.index.v2.MeteringStore.TransactionMetering import com.daml.ledger.participant.state.index.v2.{ CommandDeduplicationDuplicate, CommandDeduplicationNew, @@ -21,6 +22,7 @@ import com.daml.ledger.participant.state.index.v2.{ import com.daml.ledger.participant.state.{v2 => state} import com.daml.lf.archive.ArchiveParser import com.daml.lf.data.Ref +import com.daml.lf.data.Ref.ApplicationId import com.daml.lf.data.Time.Timestamp import com.daml.lf.engine.ValueEnricher import com.daml.lf.transaction.{BlindingInfo, CommittedTransaction} @@ -590,6 +592,17 @@ private class JdbcLedgerDao( PersistenceResponse.Ok } } + + /** Returns all TransactionMetering records matching given criteria */ + override def getTransactionMetering( + from: Timestamp, + to: Option[Timestamp], + applicationId: Option[ApplicationId], + )(implicit loggingContext: LoggingContext): Future[Vector[TransactionMetering]] = { + dbDispatcher.executeSql(metrics.daml.index.db.lookupConfiguration)( + readStorageBackend.meteringStorageBackend.transactionMetering(from, to, applicationId) + ) + } } private[platform] object JdbcLedgerDao { diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/LedgerDao.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/LedgerDao.scala index c809f26718df..afeedac63714 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/LedgerDao.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/LedgerDao.scala @@ -18,6 +18,7 @@ import com.daml.ledger.api.v1.transaction_service.{ } import com.daml.ledger.configuration.Configuration import com.daml.ledger.offset.Offset +import com.daml.ledger.participant.state.index.v2.MeteringStore.TransactionMetering import com.daml.ledger.participant.state.index.v2.{CommandDeduplicationResult, PackageDetails} import com.daml.ledger.participant.state.{v2 => state} import com.daml.lf.data.Ref @@ -215,6 +216,14 @@ private[platform] trait LedgerReadDao extends ReportsHealth { def prune(pruneUpToInclusive: Offset, pruneAllDivulgedContracts: Boolean)(implicit loggingContext: LoggingContext ): Future[Unit] + + /** Returns all TransactionMetering records matching given criteria */ + def getTransactionMetering( + from: Timestamp, + to: Option[Timestamp], + applicationId: Option[Ref.ApplicationId], + )(implicit loggingContext: LoggingContext): Future[Vector[TransactionMetering]] + } // TODO sandbox-classic clean-up: This interface and its implementation is only used in the JdbcLedgerDao suite diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/MeteredLedgerDao.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/MeteredLedgerDao.scala index 77d9a08bfd4d..3b7b06e7240e 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/MeteredLedgerDao.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/MeteredLedgerDao.scala @@ -10,9 +10,14 @@ import com.daml.ledger.api.domain.{CommandId, LedgerId, ParticipantId, PartyDeta import com.daml.ledger.api.health.HealthStatus import com.daml.ledger.configuration.Configuration import com.daml.ledger.offset.Offset -import com.daml.ledger.participant.state.index.v2.{CommandDeduplicationResult, PackageDetails} +import com.daml.ledger.participant.state.index.v2.{ + CommandDeduplicationResult, + MeteringStore, + PackageDetails, +} import com.daml.ledger.participant.state.{v2 => state} import com.daml.lf.data.Ref +import com.daml.lf.data.Ref.ApplicationId import com.daml.lf.data.Time.Timestamp import com.daml.lf.transaction.{BlindingInfo, CommittedTransaction} import com.daml.logging.LoggingContext @@ -127,6 +132,18 @@ private[platform] class MeteredLedgerReadDao(ledgerDao: LedgerReadDao, metrics: metrics.daml.index.db.prune, ledgerDao.prune(pruneUpToInclusive, pruneAllDivulgedContracts), ) + + /** Returns all TransactionMetering records matching given criteria */ + override def getTransactionMetering( + from: Timestamp, + to: Option[Timestamp], + applicationId: Option[ApplicationId], + )(implicit loggingContext: LoggingContext): Future[Vector[MeteringStore.TransactionMetering]] = { + Timed.future( + metrics.daml.index.db.prune, + ledgerDao.getTransactionMetering(from, to, applicationId), + ) + } } private[platform] class MeteredLedgerDao(ledgerDao: LedgerDao, metrics: Metrics) diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala index f44c9ba3e1b4..2a87d8caa553 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala @@ -7,9 +7,10 @@ import com.daml.ledger.api.domain.{LedgerId, ParticipantId, PartyDetails, User, import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse import com.daml.ledger.configuration.Configuration import com.daml.ledger.offset.Offset +import com.daml.ledger.participant.state.index.v2.MeteringStore.TransactionMetering import com.daml.ledger.participant.state.index.v2.PackageDetails import com.daml.lf.data.Ref -import com.daml.lf.data.Ref.UserId +import com.daml.lf.data.Ref.{ApplicationId, UserId} import com.daml.lf.data.Time.Timestamp import com.daml.lf.ledger.EventId import com.daml.logging.LoggingContext @@ -431,15 +432,11 @@ object UserManagementStorageBackend { case class DbUser(internalId: Int, domainUser: User) } -object MeteringStorageBackend { - case class TransactionMetering( - applicationId: Ref.ApplicationId, - actionCount: Int, - meteringTimestamp: Timestamp, - ledgerOffset: Offset, - ) -} - trait MeteringStorageBackend { - def entries(connection: Connection): Vector[MeteringStorageBackend.TransactionMetering] + + def transactionMetering( + from: Timestamp, + to: Option[Timestamp], + applicationId: Option[ApplicationId], + )(connection: Connection): Vector[TransactionMetering] } diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackendFactory.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackendFactory.scala index fbeb1210a2cf..974dd4557e55 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackendFactory.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackendFactory.scala @@ -45,6 +45,7 @@ trait StorageBackendFactory { completionStorageBackend = createCompletionStorageBackend(stringInterning), contractStorageBackend = createContractStorageBackend(ledgerEndCache, stringInterning), eventStorageBackend = createEventStorageBackend(ledgerEndCache, stringInterning), + meteringStorageBackend = createMeteringStorageBackend(ledgerEndCache), ) } @@ -64,4 +65,5 @@ case class ReadStorageBackend( completionStorageBackend: CompletionStorageBackend, contractStorageBackend: ContractStorageBackend, eventStorageBackend: EventStorageBackend, + meteringStorageBackend: MeteringStorageBackend, ) diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/MeteringStorageBackendTemplate.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/MeteringStorageBackendTemplate.scala index 9afbc49a287e..b12e31cb98ce 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/MeteringStorageBackendTemplate.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/MeteringStorageBackendTemplate.scala @@ -5,10 +5,12 @@ package com.daml.platform.store.backend.common import anorm.SqlParser.int import anorm.{RowParser, ~} +import com.daml.lf.data.Ref.ApplicationId +import com.daml.lf.data.Time import com.daml.platform.store.Conversions.{applicationId, offset, timestampFromMicros} import com.daml.platform.store.SimpleSqlAsVectorOf.SimpleSqlAsVectorOf import com.daml.platform.store.backend.MeteringStorageBackend -import com.daml.platform.store.backend.MeteringStorageBackend.TransactionMetering +import com.daml.ledger.participant.state.index.v2.MeteringStore.TransactionMetering import com.daml.platform.store.backend.common.ComposableQuery.SqlStringInterpolation import com.daml.platform.store.cache.LedgerEndCache @@ -38,22 +40,29 @@ private[backend] class MeteringStorageBackendTemplate(ledgerEndCache: LedgerEndC } - override def entries( - connection: Connection - ): Vector[MeteringStorageBackend.TransactionMetering] = { + override def transactionMetering( + from: Time.Timestamp, + to: Option[Time.Timestamp], + applicationId: Option[ApplicationId], + )(connection: Connection): Vector[TransactionMetering] = { SQL""" select application_id, action_count, metering_timestamp, ledger_offset - from - transaction_metering - where - ledger_offset <= ${ledgerEndCache()._1.toHexString.toString} - order by ledger_offset, application_id asc + from transaction_metering + where ledger_offset <= ${ledgerEndCache()._1.toHexString.toString} + and metering_timestamp >= ${from.micros} + and (${isSet(to)} = 0 or metering_timestamp < ${to.map(_.micros)}) + and (${isSet(applicationId)} = 0 or application_id = ${applicationId.map(_.toString)}) """ .asVectorOf(transactionMeteringParser)(connection) } + /** Oracle does not understand true/false so compare against number + * @return 0 if the option is unset and non-zero otherwise + */ + private def isSet(o: Option[_]): Int = o.fold(0)(_ => 1) + } diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestValues.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestValues.scala index a223cae1b89d..97ff08284e1e 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestValues.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestValues.scala @@ -7,6 +7,7 @@ import com.daml.daml_lf_dev.DamlLf import com.daml.ledger.api.domain.{LedgerId, ParticipantId} import com.daml.ledger.configuration.{Configuration, LedgerTimeModel} import com.daml.ledger.offset.Offset +import com.daml.ledger.participant.state.index.v2.MeteringStore.TransactionMetering import com.daml.lf.crypto.Hash import com.daml.lf.data.Ref import com.daml.lf.data.Time.Timestamp @@ -243,7 +244,7 @@ private[backend] object StorageBackendTestValues { ) def dtoTransactionMetering( - metering: MeteringStorageBackend.TransactionMetering + metering: TransactionMetering ): DbDto.TransactionMetering = { import metering._ DbDto.TransactionMetering( diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsInitializeIngestion.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsInitializeIngestion.scala index b5efd6e485ba..768d74db65bc 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsInitializeIngestion.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsInitializeIngestion.scala @@ -3,7 +3,9 @@ package com.daml.platform.store.backend +import com.daml.ledger.participant.state.index.v2.MeteringStore.TransactionMetering import com.daml.lf.data.Ref +import com.daml.lf.data.Time.Timestamp import org.scalatest.Inside import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers @@ -19,7 +21,7 @@ private[backend] trait StorageBackendTestsInitializeIngestion import StorageBackendTestValues._ val metering = - MeteringStorageBackend.TransactionMetering( + TransactionMetering( someApplicationId, 1, someTime, @@ -117,7 +119,7 @@ private[backend] trait StorageBackendTestsInitializeIngestion ) ) - val metering1 = executeSql(backend.metering.entries) + val metering1 = executeSql(backend.metering.transactionMetering(Timestamp.Epoch, None, None)) // Restart the indexer - should delete data from the partial insert above val end2 = executeSql(backend.parameter.ledgerEnd) @@ -152,7 +154,7 @@ private[backend] trait StorageBackendTestsInitializeIngestion ) ) - val metering2 = executeSql(backend.metering.entries) + val metering2 = executeSql(backend.metering.transactionMetering(Timestamp.Epoch, None, None)) parties1 should have length 1 packages1 should have size 1 diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsMetering.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsMetering.scala index f55d37b3cc2b..a3b7ef2d621c 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsMetering.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsMetering.scala @@ -3,10 +3,13 @@ package com.daml.platform.store.backend -import com.daml.platform.store.backend.MeteringStorageBackend.TransactionMetering -import org.scalatest.Inside +import com.daml.ledger.participant.state.index.v2.MeteringStore.TransactionMetering +import com.daml.lf.data.Ref +import com.daml.lf.data.Ref.ApplicationId +import com.daml.lf.data.Time.Timestamp import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers +import org.scalatest.{Assertion, Inside} private[backend] trait StorageBackendTestsMetering extends Matchers @@ -14,29 +17,93 @@ private[backend] trait StorageBackendTestsMetering with StorageBackendSpec { this: AnyFlatSpec => - behavior of "StorageBackend (metering)" - import StorageBackendTestValues._ - it should "persist transaction metering" in { + { + behavior of "StorageBackend (metering)" + + it should "persist transaction metering" in { + + val toOffset = offset(5) + + val metering = TransactionMetering( + someApplicationId, + actionCount = 1, + meteringTimestamp = someTime.addMicros(2), + ledgerOffset = offset(4), + ) + + val expected = metering + executeSql(backend.parameter.initializeParameters(someIdentityParams)) + executeSql(ingest(Vector(dtoTransactionMetering(metering)), _)) + executeSql(updateLedgerEnd(toOffset, 5L)) + val Vector(actual) = + executeSql(backend.metering.transactionMetering(Timestamp.Epoch, None, None)) + actual shouldBe expected + + } - val now = someTime - val toOffset = offset(5) + val appIdA: Ref.ApplicationId = Ref.ApplicationId.assertFromString("appA") + val appIdB: Ref.ApplicationId = Ref.ApplicationId.assertFromString("appB") - val metering = TransactionMetering( - someApplicationId, + def build(index: Long, appId: Ref.ApplicationId) = TransactionMetering( + appId, actionCount = 1, - meteringTimestamp = now.addMicros(2), - ledgerOffset = offset(4), + meteringTimestamp = someTime.addMicros(index), + ledgerOffset = offset(index), ) - val expected = metering - executeSql(backend.parameter.initializeParameters(someIdentityParams)) - executeSql(ingest(Vector(dtoTransactionMetering(metering)), _)) - executeSql(updateLedgerEnd(toOffset, 5L)) - val Vector(actual) = executeSql(backend.metering.entries) - actual shouldBe expected + val ledgerEnd = 4L - } + val metering = Vector( + build(1, appIdA), + build(2, appIdA), + build(3, appIdB), + build(4, appIdA), + build(5, appIdA), + ) + + def populate(): Unit = { + executeSql(backend.parameter.initializeParameters(someIdentityParams)) + executeSql(ingest(metering.map(dtoTransactionMetering), _)) + executeSql(updateLedgerEnd(offset(ledgerEnd), ledgerEnd)) + } + def execute( + from: Timestamp, + to: Option[Timestamp], + applicationId: Option[ApplicationId], + ): Set[TransactionMetering] = { + metering + .filter(_.ledgerOffset <= offset(ledgerEnd)) + .filter(_.meteringTimestamp >= from) + .filter(m => to.fold(true)(m.meteringTimestamp < _)) + .filter(m => applicationId.fold(true)(e => m.applicationId == e)) + .toSet + } + + def check( + fromIdx: Long, + toIdx: Option[Long], + applicationId: Option[ApplicationId], + ): Assertion = { + populate() + val from = someTime.addMicros(fromIdx) + val to = toIdx.map(someTime.addMicros) + val actual = executeSql(backend.metering.transactionMetering(from, to, applicationId)).toSet + val expected = execute(from, to, applicationId) + actual.map(_.ledgerOffset) shouldBe expected.map(_.ledgerOffset) + actual shouldBe expected + } + + it should "only include after from date that have been ingested" in { + check(2, None, None) + } + it should "only include rows that existed before any to timestamp" in { + check(2, toIdx = Some(4), None) + } + it should "only include rows for any given application" in { + check(2, None, Some(appIdA)) + } + } } diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/admin/ApiMeteringReportServiceSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/admin/ApiMeteringReportServiceSpec.scala new file mode 100644 index 000000000000..4f9a8c2341f3 --- /dev/null +++ b/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/admin/ApiMeteringReportServiceSpec.scala @@ -0,0 +1,148 @@ +// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.platform.apiserver.services.admin + +import com.daml.error.ErrorCodesVersionSwitcher +import com.daml.ledger.api.v1.admin.metering_report_service.{ + ApplicationMeteringReport, + GetMeteringReportRequest, + GetMeteringReportResponse, + ParticipantMeteringReport, +} +import com.daml.ledger.offset.Offset +import com.daml.ledger.participant.state.index.v2.MeteringStore +import com.daml.ledger.participant.state.index.v2.MeteringStore.TransactionMetering +import com.daml.lf.data.Ref +import com.daml.lf.data.Time.Timestamp +import com.daml.logging.LoggingContext +import com.daml.platform.apiserver.services.admin.ApiMeteringReportService.{ + MeteringReportGenerator, + toProtoTimestamp, +} +import org.mockito.MockitoSugar +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AsyncWordSpec + +import scala.concurrent.Future + +class ApiMeteringReportServiceSpec extends AsyncWordSpec with Matchers with MockitoSugar { + + private val someParticipantId = Ref.ParticipantId.assertFromString("test-participant") + private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting + + private val switcher = new ErrorCodesVersionSwitcher(true) + + private val appIdA = Ref.ApplicationId.assertFromString("AppA") + private val appIdB = Ref.ApplicationId.assertFromString("AppB") + + private def build(actionCount: Int, appId: Ref.ApplicationId) = TransactionMetering( + applicationId = appId, + actionCount = actionCount, + meteringTimestamp = Timestamp.Epoch, + ledgerOffset = Offset.beforeBegin, + ) + + private val metering = Vector( + build(1, appIdA), + build(2, appIdB), + build(3, appIdA), + ) + + "the metering report generator" should { + + "generate report" in { + + val underTest = new MeteringReportGenerator(someParticipantId) + + val request = GetMeteringReportRequest.defaultInstance + + val generationTime = toProtoTimestamp(Timestamp.now()) + + val actual = underTest.generate(request, metering, generationTime) + + val expectedReport = ParticipantMeteringReport( + participantId = someParticipantId, + toActual = Some(generationTime), + applicationReports = Seq( + ApplicationMeteringReport(appIdA, 4), + ApplicationMeteringReport(appIdB, 2), + ), + ) + + val expected = GetMeteringReportResponse( + request = Some(request), + participantReport = Some(expectedReport), + reportGenerationTime = Some(generationTime), + ) + + actual shouldBe expected + + } + + } + + "the metering report service" should { + + "generate report with optional parameters unset" in { + + val store = mock[MeteringStore] + + val expectedGenTime = toProtoTimestamp(Timestamp.now().addMicros(-1000)) + + val underTest = + new ApiMeteringReportService(someParticipantId, store, switcher, () => expectedGenTime) + + val from = Timestamp(10000) + + val request = GetMeteringReportRequest.defaultInstance.withFrom(toProtoTimestamp(from)) + + val expected = + new MeteringReportGenerator(someParticipantId).generate(request, metering, expectedGenTime) + + when(store.getTransactionMetering(from, None, None)).thenReturn(Future.successful(metering)) + + underTest.getMeteringReport(request).map { actual => + actual shouldBe expected + } + + } + + "generate report with with optional parameters set" in { + + val store = mock[MeteringStore] + + val expectedGenTime = toProtoTimestamp(Timestamp.now().addMicros(-1000)) + + val underTest = + new ApiMeteringReportService(someParticipantId, store, switcher, () => expectedGenTime) + + val from = Timestamp(10000) + val to = Timestamp(20000) + val appId = Ref.ApplicationId.assertFromString("AppT") + + val request = GetMeteringReportRequest.defaultInstance + .withFrom(toProtoTimestamp(from)) + .withTo(toProtoTimestamp(to)) + .withApplicationId(appId) + + val expected = + new MeteringReportGenerator(someParticipantId).generate(request, metering, expectedGenTime) + + when(store.getTransactionMetering(from, Some(to), Some(appId))) + .thenReturn(Future.successful(metering)) + + underTest.getMeteringReport(request).map { actual => + actual shouldBe expected + } + } + + "fail if the from timestamp is unset" in { + val underTest = new ApiMeteringReportService(someParticipantId, mock[MeteringStore], switcher) + val request = GetMeteringReportRequest.defaultInstance + underTest.getMeteringReport(request).failed.map { _ => succeed } + } + + } + +} diff --git a/ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/v2/IndexService.scala b/ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/v2/IndexService.scala index 9e5774c2e94b..e1af45b5a50a 100644 --- a/ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/v2/IndexService.scala +++ b/ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/v2/IndexService.scala @@ -17,5 +17,6 @@ trait IndexService with IndexConfigManagementService with IndexParticipantPruningService with IndexSubmissionService + with MeteringStore // with IndexTimeService //TODO: this needs some further discussion as the TimeService is actually optional with ReportsHealth diff --git a/ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/v2/MeteringStore.scala b/ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/v2/MeteringStore.scala new file mode 100644 index 000000000000..c7b248b558b6 --- /dev/null +++ b/ledger/participant-state-index/src/main/scala/com/daml/ledger/participant/state/index/v2/MeteringStore.scala @@ -0,0 +1,30 @@ +// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.participant.state.index.v2 + +import com.daml.ledger.offset.Offset +import com.daml.lf.data.Ref +import com.daml.lf.data.Time.Timestamp +import com.daml.logging.LoggingContext +import scala.concurrent.Future +import MeteringStore._ + +trait MeteringStore { + + def getTransactionMetering( + from: Timestamp, + to: Option[Timestamp], + applicationId: Option[Ref.ApplicationId], + )(implicit loggingContext: LoggingContext): Future[Vector[TransactionMetering]] + +} + +object MeteringStore { + case class TransactionMetering( + applicationId: Ref.ApplicationId, + actionCount: Int, + meteringTimestamp: Timestamp, + ledgerOffset: Offset, + ) +} diff --git a/ledger/sandbox-on-x/src/test/it/scala/com/daml/ledger/platform/sandbox/auth/MeteringReportAuthIT.scala b/ledger/sandbox-on-x/src/test/it/scala/com/daml/ledger/platform/sandbox/auth/MeteringReportAuthIT.scala index 0572a729659a..16ef4cb44830 100644 --- a/ledger/sandbox-on-x/src/test/it/scala/com/daml/ledger/platform/sandbox/auth/MeteringReportAuthIT.scala +++ b/ledger/sandbox-on-x/src/test/it/scala/com/daml/ledger/platform/sandbox/auth/MeteringReportAuthIT.scala @@ -8,6 +8,7 @@ import com.daml.ledger.api.v1.admin.metering_report_service.{ GetMeteringReportRequest, MeteringReportServiceGrpc, } +import com.google.protobuf.timestamp.Timestamp import scala.concurrent.Future @@ -17,6 +18,8 @@ final class MeteringReportAuthIT extends AdminServiceCallAuthTests { override def serviceCallWithToken(token: Option[String]): Future[Any] = stub(MeteringReportServiceGrpc.stub(channel), token) - .getMeteringReport(GetMeteringReportRequest(None, None)) + .getMeteringReport( + GetMeteringReportRequest.defaultInstance.withFrom(Timestamp.defaultInstance) + ) }