Refactor StandaloneApiServer factory (#11842)
To enable building the IndexService directly (and to enable a two-step ApiServer build process).
This will be needed for the KV WriteService proxy: a wrapper is being built to handle deduplication time conversion, which needs access to the completion service.

changelog_begin
changelog_end
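
In rough terms, the refactor splits the build into two steps: first acquire the IndexService via the new StandaloneIndexService factory, then pass it to the StandaloneApiServer factory. A minimal sketch of that wiring (mirroring the Runner change in the diff below; it assumes the usual Runner values such as config, apiServerConfig, metrics, sharedEngine, servicesExecutionContext, lfValueTranslationCache, partyConfig, optWriteService, authService, healthChecks, and the implicit ActorSystem/Materializer/LoggingContext are already in scope):

val owner: ResourceOwner[ApiServer] = for {
  // Step 1: build the IndexService on its own, so it can also be handed to
  // other components (e.g. the planned KV WriteService proxy).
  indexService <- StandaloneIndexService(
    ledgerId = config.ledgerId,
    config = apiServerConfig,
    metrics = metrics,
    engine = sharedEngine,
    servicesExecutionContext = servicesExecutionContext,
    lfValueTranslationCache = lfValueTranslationCache,
  )
  // Step 2: hand the pre-built IndexService to the ApiServer factory;
  // StandaloneApiServer no longer builds the index (or needs the LF value cache) itself.
  apiServer <- StandaloneApiServer(
    indexService = indexService,
    ledgerId = config.ledgerId,
    config = apiServerConfig,
    commandConfig = config.commandConfig,
    partyConfig = partyConfig,
    submissionConfig = config.submissionConfig,
    optWriteService = optWriteService,
    authService = authService,
    healthChecks = healthChecks,
    metrics = metrics,
    engine = sharedEngine,
    servicesExecutionContext = servicesExecutionContext,
  )
} yield apiServer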
nmarton-da authored Nov 25, 2021
1 parent 6356f13 commit beca0ee
Showing 6 changed files with 204 additions and 148 deletions.
@@ -10,106 +10,84 @@ import com.daml.buildinfo.BuildInfo
import com.daml.error.ErrorCodesVersionSwitcher
import com.daml.ledger.api.auth.interceptor.AuthorizationInterceptor
import com.daml.ledger.api.auth.{AuthService, Authorizer}
import com.daml.ledger.api.domain
import com.daml.ledger.api.health.HealthChecks
import com.daml.ledger.configuration.LedgerId
import com.daml.ledger.participant.state.{v2 => state}
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.ledger.resources.ResourceOwner
import com.daml.lf.data.Ref
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.engine.{Engine, ValueEnricher}
import com.daml.lf.engine.Engine
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
import com.daml.platform.configuration.{
CommandConfiguration,
PartyConfiguration,
ServerRole,
SubmissionConfiguration,
}
import com.daml.platform.index.JdbcIndex
import com.daml.platform.packages.InMemoryPackageStore
import com.daml.platform.services.time.TimeProviderType
import com.daml.platform.store.LfValueTranslationCache
import com.daml.ports.{Port, PortFiles}
import io.grpc.{BindableService, ServerInterceptor}
import scalaz.{-\/, \/-}
import java.io.File
import java.time.Clock

import com.daml.ledger.participant.state.index.v2.IndexService
import com.daml.telemetry.TelemetryContext

import scala.collection.immutable
import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success, Try}

final class StandaloneApiServer(
ledgerId: LedgerId,
config: ApiServerConfig,
commandConfig: CommandConfiguration,
partyConfig: PartyConfiguration,
submissionConfig: SubmissionConfiguration,
optWriteService: Option[state.WriteService],
authService: AuthService,
healthChecks: HealthChecks,
metrics: Metrics,
timeServiceBackend: Option[TimeServiceBackend] = None,
otherServices: immutable.Seq[BindableService] = immutable.Seq.empty,
otherInterceptors: List[ServerInterceptor] = List.empty,
engine: Engine,
servicesExecutionContext: ExecutionContextExecutor,
lfValueTranslationCache: LfValueTranslationCache.Cache,
checkOverloaded: TelemetryContext => Option[state.SubmissionResult] =
_ => None, // Used for Canton rate-limiting
)(implicit actorSystem: ActorSystem, materializer: Materializer, loggingContext: LoggingContext)
extends ResourceOwner[ApiServer] {

object StandaloneApiServer {
private val logger = ContextualizedLogger.get(this.getClass)

// Name of this participant,
val participantId: Ref.ParticipantId = config.participantId
def apply(
indexService: IndexService,
ledgerId: LedgerId,
config: ApiServerConfig,
commandConfig: CommandConfiguration,
partyConfig: PartyConfiguration,
submissionConfig: SubmissionConfiguration,
optWriteService: Option[state.WriteService],
authService: AuthService,
healthChecks: HealthChecks,
metrics: Metrics,
timeServiceBackend: Option[TimeServiceBackend] = None,
otherServices: immutable.Seq[BindableService] = immutable.Seq.empty,
otherInterceptors: List[ServerInterceptor] = List.empty,
engine: Engine,
servicesExecutionContext: ExecutionContextExecutor,
checkOverloaded: TelemetryContext => Option[state.SubmissionResult] =
_ => None, // Used for Canton rate-limiting
)(implicit
actorSystem: ActorSystem,
materializer: Materializer,
loggingContext: LoggingContext,
): ResourceOwner[ApiServer] = {
val participantId: Ref.ParticipantId = config.participantId

override def acquire()(implicit context: ResourceContext): Resource[ApiServer] = {
val packageStore = loadDamlPackages()
preloadPackages(packageStore)
def writePortFile(port: Port): Try[Unit] = {
config.portFile match {
case Some(path) =>
PortFiles.write(path, port) match {
case -\/(err) => Failure(new RuntimeException(err.toString))
case \/-(()) => Success(())
}
case None =>
Success(())
}
}

val valueEnricher = new ValueEnricher(engine)
val errorCodesVersionSwitcher = new ErrorCodesVersionSwitcher(
config.enableSelfServiceErrorCodes
)
val authorizer = new Authorizer(
Clock.systemUTC.instant _,
ledgerId,
participantId,
errorCodesVersionSwitcher,
)
val healthChecksWithIndexService = healthChecks + ("index" -> indexService)

val owner = for {
indexService <- JdbcIndex
.owner(
serverRole = ServerRole.ApiServer,
ledgerId = domain.LedgerId(ledgerId),
participantId = participantId,
jdbcUrl = config.jdbcUrl,
databaseConnectionPoolSize = config.databaseConnectionPoolSize,
databaseConnectionTimeout = config.databaseConnectionTimeout,
eventsPageSize = config.eventsPageSize,
eventsProcessingParallelism = config.eventsProcessingParallelism,
acsIdPageSize = config.acsIdPageSize,
acsIdFetchingParallelism = config.acsIdFetchingParallelism,
acsContractFetchingParallelism = config.acsContractFetchingParallelism,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,
enricher = valueEnricher,
maxContractStateCacheSize = config.maxContractStateCacheSize,
maxContractKeyStateCacheSize = config.maxContractKeyStateCacheSize,
enableMutableContractStateCache = config.enableMutableContractStateCache,
maxTransactionsInMemoryFanOutBufferSize = config.maxTransactionsInMemoryFanOutBufferSize,
enableInMemoryFanOutForLedgerApi = config.enableInMemoryFanOutForLedgerApi,
enableSelfServiceErrorCodes = config.enableSelfServiceErrorCodes,
)
.map(index => new SpannedIndexService(new TimedIndexService(index, metrics)))
errorCodesVersionSwitcher = new ErrorCodesVersionSwitcher(
config.enableSelfServiceErrorCodes
)
authorizer = new Authorizer(
Clock.systemUTC.instant _,
ledgerId,
participantId,
errorCodesVersionSwitcher,
)
healthChecksWithIndexService = healthChecks + ("index" -> indexService)
for {
executionSequencerFactory <- new ExecutionSequencerFactoryOwner()
apiServicesOwner = new ApiServices.Owner(
participantId = participantId,
@@ -145,7 +123,7 @@ final class StandaloneApiServer(
config.tlsConfig,
AuthorizationInterceptor(
authService,
executionContext,
servicesExecutionContext,
errorCodesVersionSwitcher,
) :: otherInterceptors,
servicesExecutionContext,
@@ -158,49 +136,5 @@
)
apiServer
}

owner.acquire()
}

private def preloadPackages(packageContainer: InMemoryPackageStore): Unit = {
for {
(pkgId, _) <- packageContainer.listLfPackagesSync()
pkg <- packageContainer.getLfPackageSync(pkgId)
} {
engine
.preloadPackage(pkgId, pkg)
.consume(
{ _ =>
sys.error("Unexpected request of contract")
},
packageContainer.getLfPackageSync,
{ _ =>
sys.error("Unexpected request of contract key")
},
)
()
}
}

private def loadDamlPackages(): InMemoryPackageStore = {
// TODO is it sensible to have all the initial packages to be known since the epoch?
config.archiveFiles
.foldLeft[Either[(String, File), InMemoryPackageStore]](Right(InMemoryPackageStore.empty)) {
case (storeE, f) =>
storeE.flatMap(_.withDarFile(Timestamp.now(), None, f).left.map(_ -> f))
}
.fold({ case (err, file) => sys.error(s"Could not load package $file: $err") }, identity)
}

private def writePortFile(port: Port): Try[Unit] = {
config.portFile match {
case Some(path) =>
PortFiles.write(path, port) match {
case -\/(err) => Failure(new RuntimeException(err.toString))
case \/-(()) => Success(())
}
case None =>
Success(())
}
}
}
@@ -0,0 +1,102 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.platform.apiserver

import java.io.File

import akka.stream.Materializer
import com.daml.ledger.api.domain
import com.daml.ledger.configuration.LedgerId
import com.daml.ledger.participant.state.index.v2.IndexService
import com.daml.ledger.resources.ResourceOwner
import com.daml.lf.data.Ref
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.engine.{Engine, ValueEnricher}
import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics
import com.daml.platform.configuration.ServerRole
import com.daml.platform.index.JdbcIndex
import com.daml.platform.packages.InMemoryPackageStore
import com.daml.platform.store.LfValueTranslationCache

import scala.concurrent.ExecutionContextExecutor

object StandaloneIndexService {
def apply(
ledgerId: LedgerId,
config: ApiServerConfig,
metrics: Metrics,
engine: Engine,
servicesExecutionContext: ExecutionContextExecutor,
lfValueTranslationCache: LfValueTranslationCache.Cache,
)(implicit
materializer: Materializer,
loggingContext: LoggingContext,
): ResourceOwner[IndexService] = {
val participantId: Ref.ParticipantId = config.participantId
val valueEnricher = new ValueEnricher(engine)

def preloadPackages(packageContainer: InMemoryPackageStore): Unit = {
for {
(pkgId, _) <- packageContainer.listLfPackagesSync()
pkg <- packageContainer.getLfPackageSync(pkgId)
} {
engine
.preloadPackage(pkgId, pkg)
.consume(
{ _ =>
sys.error("Unexpected request of contract")
},
packageContainer.getLfPackageSync,
{ _ =>
sys.error("Unexpected request of contract key")
},
)
()
}
}

def loadDamlPackages(): InMemoryPackageStore = {
// TODO is it sensible to have all the initial packages to be known since the epoch?
config.archiveFiles
.foldLeft[Either[(String, File), InMemoryPackageStore]](Right(InMemoryPackageStore.empty)) {
case (storeE, f) =>
storeE.flatMap(_.withDarFile(Timestamp.now(), None, f).left.map(_ -> f))
}
.fold({ case (err, file) => sys.error(s"Could not load package $file: $err") }, identity)
}

for {
_ <- ResourceOwner.forValue(() => {
val packageStore = loadDamlPackages()
preloadPackages(packageStore)
})
indexService <- JdbcIndex
.owner(
serverRole = ServerRole.ApiServer,
ledgerId = domain.LedgerId(ledgerId),
participantId = participantId,
jdbcUrl = config.jdbcUrl,
databaseConnectionPoolSize = config.databaseConnectionPoolSize,
databaseConnectionTimeout = config.databaseConnectionTimeout,
eventsPageSize = config.eventsPageSize,
eventsProcessingParallelism = config.eventsProcessingParallelism,
acsIdPageSize = config.acsIdPageSize,
acsIdFetchingParallelism = config.acsIdFetchingParallelism,
acsContractFetchingParallelism = config.acsContractFetchingParallelism,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,
enricher = valueEnricher,
maxContractStateCacheSize = config.maxContractStateCacheSize,
maxContractKeyStateCacheSize = config.maxContractKeyStateCacheSize,
enableMutableContractStateCache = config.enableMutableContractStateCache,
maxTransactionsInMemoryFanOutBufferSize = config.maxTransactionsInMemoryFanOutBufferSize,
enableInMemoryFanOutForLedgerApi = config.enableInMemoryFanOutForLedgerApi,
enableSelfServiceErrorCodes = config.enableSelfServiceErrorCodes,
)
.map(index => new SpannedIndexService(new TimedIndexService(index, metrics)))
} yield indexService
}
}
1 change: 1 addition & 0 deletions ledger/participant-state/kvutils/app/BUILD.bazel
@@ -46,6 +46,7 @@ da_scala_library(
"//ledger/metrics",
"//ledger/participant-integration-api",
"//ledger/participant-state",
"//ledger/participant-state-index",
"//ledger/participant-state-metrics",
"//ledger/participant-state/kvutils",
"//libs-scala/contextualized-logging",
@@ -21,7 +21,7 @@ import com.daml.lf.engine.{Engine, EngineConfig}
import com.daml.logging.LoggingContext.{newLoggingContext, withEnrichedLoggingContext}
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.JvmMetricSet
import com.daml.platform.apiserver.StandaloneApiServer
import com.daml.platform.apiserver.{StandaloneApiServer, StandaloneIndexService}
import com.daml.platform.indexer.StandaloneIndexerServer
import com.daml.platform.server.api.validation.ErrorFactories
import com.daml.platform.store.{IndexMetadata, LfValueTranslationCache}
@@ -160,11 +160,21 @@ final class Runner[T <: ReadWriteService, Extra](
case ParticipantRunMode.LedgerApiServer =>
Resource.successful(healthChecks)
}
apiServerConfig = factory.apiServerConfig(participantConfig, config)
indexService <- StandaloneIndexService(
ledgerId = config.ledgerId,
config = apiServerConfig,
metrics = metrics,
engine = sharedEngine,
servicesExecutionContext = servicesExecutionContext,
lfValueTranslationCache = lfValueTranslationCache,
).acquire()
_ <- participantConfig.mode match {
case ParticipantRunMode.Combined | ParticipantRunMode.LedgerApiServer =>
new StandaloneApiServer(
StandaloneApiServer(
indexService = indexService,
ledgerId = config.ledgerId,
config = factory.apiServerConfig(participantConfig, config),
config = apiServerConfig,
commandConfig = config.commandConfig,
submissionConfig = config.submissionConfig,
partyConfig = factory.partyConfig(config),
@@ -176,7 +186,6 @@
otherInterceptors = factory.interceptors(config),
engine = sharedEngine,
servicesExecutionContext = servicesExecutionContext,
lfValueTranslationCache = lfValueTranslationCache,
).acquire()
case ParticipantRunMode.Indexer =>
Resource.unit
1 change: 1 addition & 0 deletions ledger/sandbox/BUILD.bazel
@@ -57,6 +57,7 @@ alias(
"//ledger/metrics",
"//ledger/participant-integration-api",
"//ledger/participant-state",
"//ledger/participant-state-index",
"//ledger/participant-state-metrics",
"//ledger/participant-state/kvutils",
"//ledger/sandbox-common:sandbox-common-{}".format(edition),