diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/StandaloneApiServer.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/StandaloneApiServer.scala index a062ff05d8dd..b62591c8cb34 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/StandaloneApiServer.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/StandaloneApiServer.scala @@ -10,106 +10,84 @@ import com.daml.buildinfo.BuildInfo import com.daml.error.ErrorCodesVersionSwitcher import com.daml.ledger.api.auth.interceptor.AuthorizationInterceptor import com.daml.ledger.api.auth.{AuthService, Authorizer} -import com.daml.ledger.api.domain import com.daml.ledger.api.health.HealthChecks import com.daml.ledger.configuration.LedgerId import com.daml.ledger.participant.state.{v2 => state} -import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner} +import com.daml.ledger.resources.ResourceOwner import com.daml.lf.data.Ref -import com.daml.lf.data.Time.Timestamp -import com.daml.lf.engine.{Engine, ValueEnricher} +import com.daml.lf.engine.Engine import com.daml.logging.{ContextualizedLogger, LoggingContext} import com.daml.metrics.Metrics import com.daml.platform.configuration.{ CommandConfiguration, PartyConfiguration, - ServerRole, SubmissionConfiguration, } -import com.daml.platform.index.JdbcIndex -import com.daml.platform.packages.InMemoryPackageStore import com.daml.platform.services.time.TimeProviderType -import com.daml.platform.store.LfValueTranslationCache import com.daml.ports.{Port, PortFiles} import io.grpc.{BindableService, ServerInterceptor} import scalaz.{-\/, \/-} -import java.io.File import java.time.Clock +import com.daml.ledger.participant.state.index.v2.IndexService import com.daml.telemetry.TelemetryContext import scala.collection.immutable import scala.concurrent.ExecutionContextExecutor import scala.util.{Failure, Success, Try} -final class StandaloneApiServer( - ledgerId: LedgerId, - config: ApiServerConfig, - commandConfig: CommandConfiguration, - partyConfig: PartyConfiguration, - submissionConfig: SubmissionConfiguration, - optWriteService: Option[state.WriteService], - authService: AuthService, - healthChecks: HealthChecks, - metrics: Metrics, - timeServiceBackend: Option[TimeServiceBackend] = None, - otherServices: immutable.Seq[BindableService] = immutable.Seq.empty, - otherInterceptors: List[ServerInterceptor] = List.empty, - engine: Engine, - servicesExecutionContext: ExecutionContextExecutor, - lfValueTranslationCache: LfValueTranslationCache.Cache, - checkOverloaded: TelemetryContext => Option[state.SubmissionResult] = - _ => None, // Used for Canton rate-limiting -)(implicit actorSystem: ActorSystem, materializer: Materializer, loggingContext: LoggingContext) - extends ResourceOwner[ApiServer] { - +object StandaloneApiServer { private val logger = ContextualizedLogger.get(this.getClass) - // Name of this participant, - val participantId: Ref.ParticipantId = config.participantId + def apply( + indexService: IndexService, + ledgerId: LedgerId, + config: ApiServerConfig, + commandConfig: CommandConfiguration, + partyConfig: PartyConfiguration, + submissionConfig: SubmissionConfiguration, + optWriteService: Option[state.WriteService], + authService: AuthService, + healthChecks: HealthChecks, + metrics: Metrics, + timeServiceBackend: Option[TimeServiceBackend] = None, + otherServices: immutable.Seq[BindableService] = immutable.Seq.empty, + otherInterceptors: List[ServerInterceptor] = List.empty, + engine: Engine, + servicesExecutionContext: ExecutionContextExecutor, + checkOverloaded: TelemetryContext => Option[state.SubmissionResult] = + _ => None, // Used for Canton rate-limiting + )(implicit + actorSystem: ActorSystem, + materializer: Materializer, + loggingContext: LoggingContext, + ): ResourceOwner[ApiServer] = { + val participantId: Ref.ParticipantId = config.participantId - override def acquire()(implicit context: ResourceContext): Resource[ApiServer] = { - val packageStore = loadDamlPackages() - preloadPackages(packageStore) + def writePortFile(port: Port): Try[Unit] = { + config.portFile match { + case Some(path) => + PortFiles.write(path, port) match { + case -\/(err) => Failure(new RuntimeException(err.toString)) + case \/-(()) => Success(()) + } + case None => + Success(()) + } + } - val valueEnricher = new ValueEnricher(engine) + val errorCodesVersionSwitcher = new ErrorCodesVersionSwitcher( + config.enableSelfServiceErrorCodes + ) + val authorizer = new Authorizer( + Clock.systemUTC.instant _, + ledgerId, + participantId, + errorCodesVersionSwitcher, + ) + val healthChecksWithIndexService = healthChecks + ("index" -> indexService) - val owner = for { - indexService <- JdbcIndex - .owner( - serverRole = ServerRole.ApiServer, - ledgerId = domain.LedgerId(ledgerId), - participantId = participantId, - jdbcUrl = config.jdbcUrl, - databaseConnectionPoolSize = config.databaseConnectionPoolSize, - databaseConnectionTimeout = config.databaseConnectionTimeout, - eventsPageSize = config.eventsPageSize, - eventsProcessingParallelism = config.eventsProcessingParallelism, - acsIdPageSize = config.acsIdPageSize, - acsIdFetchingParallelism = config.acsIdFetchingParallelism, - acsContractFetchingParallelism = config.acsContractFetchingParallelism, - servicesExecutionContext = servicesExecutionContext, - metrics = metrics, - lfValueTranslationCache = lfValueTranslationCache, - enricher = valueEnricher, - maxContractStateCacheSize = config.maxContractStateCacheSize, - maxContractKeyStateCacheSize = config.maxContractKeyStateCacheSize, - enableMutableContractStateCache = config.enableMutableContractStateCache, - maxTransactionsInMemoryFanOutBufferSize = config.maxTransactionsInMemoryFanOutBufferSize, - enableInMemoryFanOutForLedgerApi = config.enableInMemoryFanOutForLedgerApi, - enableSelfServiceErrorCodes = config.enableSelfServiceErrorCodes, - ) - .map(index => new SpannedIndexService(new TimedIndexService(index, metrics))) - errorCodesVersionSwitcher = new ErrorCodesVersionSwitcher( - config.enableSelfServiceErrorCodes - ) - authorizer = new Authorizer( - Clock.systemUTC.instant _, - ledgerId, - participantId, - errorCodesVersionSwitcher, - ) - healthChecksWithIndexService = healthChecks + ("index" -> indexService) + for { executionSequencerFactory <- new ExecutionSequencerFactoryOwner() apiServicesOwner = new ApiServices.Owner( participantId = participantId, @@ -145,7 +123,7 @@ final class StandaloneApiServer( config.tlsConfig, AuthorizationInterceptor( authService, - executionContext, + servicesExecutionContext, errorCodesVersionSwitcher, ) :: otherInterceptors, servicesExecutionContext, @@ -158,49 +136,5 @@ final class StandaloneApiServer( ) apiServer } - - owner.acquire() - } - - private def preloadPackages(packageContainer: InMemoryPackageStore): Unit = { - for { - (pkgId, _) <- packageContainer.listLfPackagesSync() - pkg <- packageContainer.getLfPackageSync(pkgId) - } { - engine - .preloadPackage(pkgId, pkg) - .consume( - { _ => - sys.error("Unexpected request of contract") - }, - packageContainer.getLfPackageSync, - { _ => - sys.error("Unexpected request of contract key") - }, - ) - () - } - } - - private def loadDamlPackages(): InMemoryPackageStore = { - // TODO is it sensible to have all the initial packages to be known since the epoch? - config.archiveFiles - .foldLeft[Either[(String, File), InMemoryPackageStore]](Right(InMemoryPackageStore.empty)) { - case (storeE, f) => - storeE.flatMap(_.withDarFile(Timestamp.now(), None, f).left.map(_ -> f)) - } - .fold({ case (err, file) => sys.error(s"Could not load package $file: $err") }, identity) - } - - private def writePortFile(port: Port): Try[Unit] = { - config.portFile match { - case Some(path) => - PortFiles.write(path, port) match { - case -\/(err) => Failure(new RuntimeException(err.toString)) - case \/-(()) => Success(()) - } - case None => - Success(()) - } } } diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/StandaloneIndexService.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/StandaloneIndexService.scala new file mode 100644 index 000000000000..175836f9cd0a --- /dev/null +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/StandaloneIndexService.scala @@ -0,0 +1,102 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.platform.apiserver + +import java.io.File + +import akka.stream.Materializer +import com.daml.ledger.api.domain +import com.daml.ledger.configuration.LedgerId +import com.daml.ledger.participant.state.index.v2.IndexService +import com.daml.ledger.resources.ResourceOwner +import com.daml.lf.data.Ref +import com.daml.lf.data.Time.Timestamp +import com.daml.lf.engine.{Engine, ValueEnricher} +import com.daml.logging.LoggingContext +import com.daml.metrics.Metrics +import com.daml.platform.configuration.ServerRole +import com.daml.platform.index.JdbcIndex +import com.daml.platform.packages.InMemoryPackageStore +import com.daml.platform.store.LfValueTranslationCache + +import scala.concurrent.ExecutionContextExecutor + +object StandaloneIndexService { + def apply( + ledgerId: LedgerId, + config: ApiServerConfig, + metrics: Metrics, + engine: Engine, + servicesExecutionContext: ExecutionContextExecutor, + lfValueTranslationCache: LfValueTranslationCache.Cache, + )(implicit + materializer: Materializer, + loggingContext: LoggingContext, + ): ResourceOwner[IndexService] = { + val participantId: Ref.ParticipantId = config.participantId + val valueEnricher = new ValueEnricher(engine) + + def preloadPackages(packageContainer: InMemoryPackageStore): Unit = { + for { + (pkgId, _) <- packageContainer.listLfPackagesSync() + pkg <- packageContainer.getLfPackageSync(pkgId) + } { + engine + .preloadPackage(pkgId, pkg) + .consume( + { _ => + sys.error("Unexpected request of contract") + }, + packageContainer.getLfPackageSync, + { _ => + sys.error("Unexpected request of contract key") + }, + ) + () + } + } + + def loadDamlPackages(): InMemoryPackageStore = { + // TODO is it sensible to have all the initial packages to be known since the epoch? + config.archiveFiles + .foldLeft[Either[(String, File), InMemoryPackageStore]](Right(InMemoryPackageStore.empty)) { + case (storeE, f) => + storeE.flatMap(_.withDarFile(Timestamp.now(), None, f).left.map(_ -> f)) + } + .fold({ case (err, file) => sys.error(s"Could not load package $file: $err") }, identity) + } + + for { + _ <- ResourceOwner.forValue(() => { + val packageStore = loadDamlPackages() + preloadPackages(packageStore) + }) + indexService <- JdbcIndex + .owner( + serverRole = ServerRole.ApiServer, + ledgerId = domain.LedgerId(ledgerId), + participantId = participantId, + jdbcUrl = config.jdbcUrl, + databaseConnectionPoolSize = config.databaseConnectionPoolSize, + databaseConnectionTimeout = config.databaseConnectionTimeout, + eventsPageSize = config.eventsPageSize, + eventsProcessingParallelism = config.eventsProcessingParallelism, + acsIdPageSize = config.acsIdPageSize, + acsIdFetchingParallelism = config.acsIdFetchingParallelism, + acsContractFetchingParallelism = config.acsContractFetchingParallelism, + servicesExecutionContext = servicesExecutionContext, + metrics = metrics, + lfValueTranslationCache = lfValueTranslationCache, + enricher = valueEnricher, + maxContractStateCacheSize = config.maxContractStateCacheSize, + maxContractKeyStateCacheSize = config.maxContractKeyStateCacheSize, + enableMutableContractStateCache = config.enableMutableContractStateCache, + maxTransactionsInMemoryFanOutBufferSize = config.maxTransactionsInMemoryFanOutBufferSize, + enableInMemoryFanOutForLedgerApi = config.enableInMemoryFanOutForLedgerApi, + enableSelfServiceErrorCodes = config.enableSelfServiceErrorCodes, + ) + .map(index => new SpannedIndexService(new TimedIndexService(index, metrics))) + } yield indexService + } +} diff --git a/ledger/participant-state/kvutils/app/BUILD.bazel b/ledger/participant-state/kvutils/app/BUILD.bazel index 5940050e966f..f3ccd72df2e6 100644 --- a/ledger/participant-state/kvutils/app/BUILD.bazel +++ b/ledger/participant-state/kvutils/app/BUILD.bazel @@ -46,6 +46,7 @@ da_scala_library( "//ledger/metrics", "//ledger/participant-integration-api", "//ledger/participant-state", + "//ledger/participant-state-index", "//ledger/participant-state-metrics", "//ledger/participant-state/kvutils", "//libs-scala/contextualized-logging", diff --git a/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Runner.scala b/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Runner.scala index 8c765507f917..e7d3d7fcb000 100644 --- a/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Runner.scala +++ b/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Runner.scala @@ -21,7 +21,7 @@ import com.daml.lf.engine.{Engine, EngineConfig} import com.daml.logging.LoggingContext.{newLoggingContext, withEnrichedLoggingContext} import com.daml.logging.{ContextualizedLogger, LoggingContext} import com.daml.metrics.JvmMetricSet -import com.daml.platform.apiserver.StandaloneApiServer +import com.daml.platform.apiserver.{StandaloneApiServer, StandaloneIndexService} import com.daml.platform.indexer.StandaloneIndexerServer import com.daml.platform.server.api.validation.ErrorFactories import com.daml.platform.store.{IndexMetadata, LfValueTranslationCache} @@ -160,11 +160,21 @@ final class Runner[T <: ReadWriteService, Extra]( case ParticipantRunMode.LedgerApiServer => Resource.successful(healthChecks) } + apiServerConfig = factory.apiServerConfig(participantConfig, config) + indexService <- StandaloneIndexService( + ledgerId = config.ledgerId, + config = apiServerConfig, + metrics = metrics, + engine = sharedEngine, + servicesExecutionContext = servicesExecutionContext, + lfValueTranslationCache = lfValueTranslationCache, + ).acquire() _ <- participantConfig.mode match { case ParticipantRunMode.Combined | ParticipantRunMode.LedgerApiServer => - new StandaloneApiServer( + StandaloneApiServer( + indexService = indexService, ledgerId = config.ledgerId, - config = factory.apiServerConfig(participantConfig, config), + config = apiServerConfig, commandConfig = config.commandConfig, submissionConfig = config.submissionConfig, partyConfig = factory.partyConfig(config), @@ -176,7 +186,6 @@ final class Runner[T <: ReadWriteService, Extra]( otherInterceptors = factory.interceptors(config), engine = sharedEngine, servicesExecutionContext = servicesExecutionContext, - lfValueTranslationCache = lfValueTranslationCache, ).acquire() case ParticipantRunMode.Indexer => Resource.unit diff --git a/ledger/sandbox/BUILD.bazel b/ledger/sandbox/BUILD.bazel index 4b8736f51d35..9ed8e53d7352 100644 --- a/ledger/sandbox/BUILD.bazel +++ b/ledger/sandbox/BUILD.bazel @@ -57,6 +57,7 @@ alias( "//ledger/metrics", "//ledger/participant-integration-api", "//ledger/participant-state", + "//ledger/participant-state-index", "//ledger/participant-state-metrics", "//ledger/participant-state/kvutils", "//ledger/sandbox-common:sandbox-common-{}".format(edition), diff --git a/ledger/sandbox/src/main/scala/platform/sandboxnext/Runner.scala b/ledger/sandbox/src/main/scala/platform/sandboxnext/Runner.scala index 3ee54497270d..18890a14d1ca 100644 --- a/ledger/sandbox/src/main/scala/platform/sandboxnext/Runner.scala +++ b/ledger/sandbox/src/main/scala/platform/sandboxnext/Runner.scala @@ -242,33 +242,43 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] { ), ) } - apiServer <- new StandaloneApiServer( + apiServerConfig = ApiServerConfig( + participantId = config.participantId, + archiveFiles = if (isReset) List.empty else config.damlPackages, + // Re-use the same port when resetting the server. + port = currentPort.getOrElse(config.port), + address = config.address, + jdbcUrl = indexJdbcUrl, + databaseConnectionPoolSize = config.databaseConnectionPoolSize, + databaseConnectionTimeout = config.databaseConnectionTimeout, + tlsConfig = config.tlsConfig, + maxInboundMessageSize = config.maxInboundMessageSize, + initialLedgerConfiguration = Some(config.initialLedgerConfiguration), + configurationLoadTimeout = config.configurationLoadTimeout, + eventsPageSize = config.eventsPageSize, + portFile = config.portFile, + // TODO append-only: augment the following defaults for enabling the features for sandbox next + seeding = config.seeding.get, + managementServiceTimeout = config.managementServiceTimeout, + maxContractStateCacheSize = 0L, + maxContractKeyStateCacheSize = 0L, + enableMutableContractStateCache = false, + maxTransactionsInMemoryFanOutBufferSize = 0L, + enableInMemoryFanOutForLedgerApi = false, + enableSelfServiceErrorCodes = config.enableSelfServiceErrorCodes, + ) + indexService <- StandaloneIndexService( ledgerId = ledgerId, - config = ApiServerConfig( - participantId = config.participantId, - archiveFiles = if (isReset) List.empty else config.damlPackages, - // Re-use the same port when resetting the server. - port = currentPort.getOrElse(config.port), - address = config.address, - jdbcUrl = indexJdbcUrl, - databaseConnectionPoolSize = config.databaseConnectionPoolSize, - databaseConnectionTimeout = config.databaseConnectionTimeout, - tlsConfig = config.tlsConfig, - maxInboundMessageSize = config.maxInboundMessageSize, - initialLedgerConfiguration = Some(config.initialLedgerConfiguration), - configurationLoadTimeout = config.configurationLoadTimeout, - eventsPageSize = config.eventsPageSize, - portFile = config.portFile, - // TODO append-only: augment the following defaults for enabling the features for sandbox next - seeding = config.seeding.get, - managementServiceTimeout = config.managementServiceTimeout, - maxContractStateCacheSize = 0L, - maxContractKeyStateCacheSize = 0L, - enableMutableContractStateCache = false, - maxTransactionsInMemoryFanOutBufferSize = 0L, - enableInMemoryFanOutForLedgerApi = false, - enableSelfServiceErrorCodes = config.enableSelfServiceErrorCodes, - ), + config = apiServerConfig, + metrics = metrics, + engine = engine, + servicesExecutionContext = servicesExecutionContext, + lfValueTranslationCache = lfValueTranslationCache, + ) + apiServer <- StandaloneApiServer( + indexService = indexService, + ledgerId = ledgerId, + config = apiServerConfig, engine = engine, commandConfig = config.commandConfig, partyConfig = PartyConfiguration.default.copy( @@ -283,7 +293,6 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] { otherServices = List(resetService), otherInterceptors = List(resetService), servicesExecutionContext = servicesExecutionContext, - lfValueTranslationCache = lfValueTranslationCache, ) _ = apiServerServicesClosed.completeWith(apiServer.servicesClosed()) } yield {