From 0d19bd252ff65cbfba94024b138b64557bd7a900 Mon Sep 17 00:00:00 2001 From: Samir Talwar Date: Wed, 12 Jan 2022 11:05:13 +0100 Subject: [PATCH 1/4] kvutils/app: Factor out a `runParticipant` method for the Runner. --- .../state/kvutils/app/Runner.scala | 267 +++++++++--------- 1 file changed, 141 insertions(+), 126 deletions(-) diff --git a/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Runner.scala b/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Runner.scala index 480b3d65357b..d02678a927a0 100644 --- a/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Runner.scala +++ b/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Runner.scala @@ -19,6 +19,7 @@ import com.daml.ledger.participant.state.index.impl.inmemory.InMemoryUserManagem import com.daml.ledger.participant.state.v2.metrics.{TimedReadService, TimedWriteService} import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner} import com.daml.lf.engine.{Engine, EngineConfig} +import com.daml.logging.LoggingContext import com.daml.logging.LoggingContext.{newLoggingContext, withEnrichedLoggingContext} import com.daml.metrics.JvmMetricSet import com.daml.platform.apiserver.{StandaloneApiServer, StandaloneIndexService} @@ -34,6 +35,8 @@ final class Runner[T <: ReadWriteService, Extra]( factory: LedgerFactory[Extra], configProvider: ConfigProvider[Extra], ) { + private val cleanedName = "[^A-Za-z0-9_\\-]".r.replaceAllIn(name.toLowerCase, "-") + def owner(args: collection.Seq[String]): ResourceOwner[Unit] = Config .owner(name, configProvider.extraConfigParser, configProvider.defaultExtraConfig, args) @@ -59,18 +62,9 @@ final class Runner[T <: ReadWriteService, Extra]( private def run( config: Config[Extra] )(implicit resourceContext: ResourceContext): Resource[Unit] = { - implicit val actorSystem: ActorSystem = ActorSystem( - "[^A-Za-z0-9_\\-]".r.replaceAllIn(name.toLowerCase, "-") - ) + implicit val actorSystem: ActorSystem = ActorSystem(cleanedName) implicit val materializer: Materializer = Materializer(actorSystem) - val sharedEngine = new Engine( - EngineConfig( - config.allowedLanguageVersions, - forbidV0ContractId = true, - ) - ) - newLoggingContext { implicit loggingContext => for { // Take ownership of the actor system and materializer so they're cleaned up properly. @@ -78,129 +72,150 @@ final class Runner[T <: ReadWriteService, Extra]( _ <- ResourceOwner.forActorSystem(() => actorSystem).acquire() _ <- ResourceOwner.forMaterializer(() => materializer).acquire() + sharedEngine = new Engine( + EngineConfig( + allowedLanguageVersions = config.allowedLanguageVersions, + forbidV0ContractId = true, + ) + ) + // initialize all configured participants - _ <- Resource.sequence(config.participants.map { participantConfig => - withEnrichedLoggingContext("participantId" -> participantConfig.participantId) { - implicit loggingContext => - val metrics = configProvider.createMetrics(participantConfig, config) - metrics.registry.registerAll(new JvmMetricSet) - val lfValueTranslationCache = LfValueTranslationCache.Cache.newInstrumentedInstance( - eventConfiguration = config.lfValueTranslationEventCache, - contractConfiguration = config.lfValueTranslationContractCache, - metrics = metrics, + _ <- Resource.sequence( + config.participants.map(participantConfig => + runParticipant(config, participantConfig, sharedEngine) + ) + ) + } yield () + } + } + + private def runParticipant( + config: Config[Extra], + participantConfig: ParticipantConfig, + sharedEngine: Engine, + )(implicit + resourceContext: ResourceContext, + loggingContext: LoggingContext, + actorSystem: ActorSystem, + materializer: Materializer, + ): Resource[Unit] = + withEnrichedLoggingContext("participantId" -> participantConfig.participantId) { + implicit loggingContext => + val metrics = configProvider.createMetrics(participantConfig, config) + metrics.registry.registerAll(new JvmMetricSet) + val lfValueTranslationCache = LfValueTranslationCache.Cache.newInstrumentedInstance( + eventConfiguration = config.lfValueTranslationEventCache, + contractConfiguration = config.lfValueTranslationContractCache, + metrics = metrics, + ) + for { + _ <- config.metricsReporter.fold(Resource.unit)(reporter => + ResourceOwner + .forCloseable(() => reporter.register(metrics.registry)) + .map(_.start(config.metricsReportingInterval.getSeconds, TimeUnit.SECONDS)) + .acquire() + ) + servicesExecutionContext <- ResourceOwner + .forExecutorService(() => + new InstrumentedExecutorService( + Executors.newWorkStealingPool(), + metrics.registry, + metrics.daml.lapi.threadpool.apiServices.toString, ) + ) + .map(ExecutionContext.fromExecutorService) + .acquire() + ledgerFactory <- factory + .readWriteServiceFactoryOwner( + config, + participantConfig, + sharedEngine, + metrics, + )(materializer, servicesExecutionContext, loggingContext) + .acquire() + healthChecksWithIndexer <- participantConfig.mode match { + case ParticipantRunMode.Combined | ParticipantRunMode.Indexer => + val readService = new TimedReadService(ledgerFactory.readService(), metrics) for { - _ <- config.metricsReporter.fold(Resource.unit)(reporter => - ResourceOwner - .forCloseable(() => reporter.register(metrics.registry)) - .map(_.start(config.metricsReportingInterval.getSeconds, TimeUnit.SECONDS)) - .acquire() + indexerHealth <- new StandaloneIndexerServer( + readService = readService, + config = configProvider.indexerConfig(participantConfig, config), + servicesExecutionContext = servicesExecutionContext, + metrics = metrics, + lfValueTranslationCache = lfValueTranslationCache, + ).acquire() + } yield { + new HealthChecks( + "read" -> readService, + "indexer" -> indexerHealth, ) - servicesExecutionContext <- ResourceOwner - .forExecutorService(() => - new InstrumentedExecutorService( - Executors.newWorkStealingPool(), - metrics.registry, - metrics.daml.lapi.threadpool.apiServices.toString, - ) + } + case ParticipantRunMode.LedgerApiServer => + Resource.successful(new HealthChecks()) + } + apiServerConfig = configProvider.apiServerConfig(participantConfig, config) + _ <- participantConfig.mode match { + case ParticipantRunMode.Combined | ParticipantRunMode.LedgerApiServer => + for { + dbSupport <- DbSupport + .owner( + jdbcUrl = apiServerConfig.jdbcUrl, + serverRole = ServerRole.ApiServer, + connectionPoolSize = apiServerConfig.databaseConnectionPoolSize, + connectionTimeout = apiServerConfig.databaseConnectionTimeout, + metrics = metrics, ) - .map(ExecutionContext.fromExecutorService) .acquire() - ledgerFactory <- factory - .readWriteServiceFactoryOwner( - config, - participantConfig, - sharedEngine, - metrics, - )(materializer, servicesExecutionContext, loggingContext) - .acquire() - healthChecksWithIndexer <- participantConfig.mode match { - case ParticipantRunMode.Combined | ParticipantRunMode.Indexer => - val readService = new TimedReadService(ledgerFactory.readService(), metrics) - for { - indexerHealth <- new StandaloneIndexerServer( - readService = readService, - config = configProvider.indexerConfig(participantConfig, config), - servicesExecutionContext = servicesExecutionContext, - metrics = metrics, - lfValueTranslationCache = lfValueTranslationCache, - ).acquire() - } yield { - new HealthChecks( - "read" -> readService, - "indexer" -> indexerHealth, + userManagementStore = + new InMemoryUserManagementStore // TODO persistence wiring comes here + indexService <- StandaloneIndexService( + dbSupport = dbSupport, + ledgerId = config.ledgerId, + config = apiServerConfig, + metrics = metrics, + engine = sharedEngine, + servicesExecutionContext = servicesExecutionContext, + lfValueTranslationCache = lfValueTranslationCache, + ).acquire() + factory = new KeyValueDeduplicationSupportFactory( + ledgerFactory, + config, + indexService, + )(implicitly, servicesExecutionContext) + writeService = new TimedWriteService(factory.writeService(), metrics) + _ <- StandaloneApiServer( + indexService = indexService, + userManagementStore = userManagementStore, + ledgerId = config.ledgerId, + config = apiServerConfig, + commandConfig = config.commandConfig, + submissionConfig = config.submissionConfig, + partyConfig = configProvider.partyConfig(config), + optWriteService = Some(writeService), + authService = configProvider.authService(config), + healthChecks = healthChecksWithIndexer + ("write" -> writeService), + metrics = metrics, + timeServiceBackend = configProvider.timeServiceBackend(config), + otherInterceptors = configProvider.interceptors(config), + engine = sharedEngine, + servicesExecutionContext = servicesExecutionContext, + commandDeduplicationFeatures = CommandDeduplicationFeatures.of( + deduplicationPeriodSupport = Some( + CommandDeduplicationPeriodSupport.of( + offsetSupport = + CommandDeduplicationPeriodSupport.OffsetSupport.OFFSET_CONVERT_TO_DURATION, + durationSupport = + CommandDeduplicationPeriodSupport.DurationSupport.DURATION_NATIVE_SUPPORT, ) - } - case ParticipantRunMode.LedgerApiServer => - Resource.successful(new HealthChecks()) - } - apiServerConfig = configProvider.apiServerConfig(participantConfig, config) - _ <- participantConfig.mode match { - case ParticipantRunMode.Combined | ParticipantRunMode.LedgerApiServer => - for { - dbSupport <- DbSupport - .owner( - jdbcUrl = apiServerConfig.jdbcUrl, - serverRole = ServerRole.ApiServer, - connectionPoolSize = apiServerConfig.databaseConnectionPoolSize, - connectionTimeout = apiServerConfig.databaseConnectionTimeout, - metrics = metrics, - ) - .acquire() - userManagementStore = - new InMemoryUserManagementStore // TODO persistence wiring comes here - indexService <- StandaloneIndexService( - dbSupport = dbSupport, - ledgerId = config.ledgerId, - config = apiServerConfig, - metrics = metrics, - engine = sharedEngine, - servicesExecutionContext = servicesExecutionContext, - lfValueTranslationCache = lfValueTranslationCache, - ).acquire() - factory = new KeyValueDeduplicationSupportFactory( - ledgerFactory, - config, - indexService, - )(implicitly, servicesExecutionContext) - writeService = new TimedWriteService(factory.writeService(), metrics) - _ <- StandaloneApiServer( - indexService = indexService, - userManagementStore = userManagementStore, - ledgerId = config.ledgerId, - config = apiServerConfig, - commandConfig = config.commandConfig, - submissionConfig = config.submissionConfig, - partyConfig = configProvider.partyConfig(config), - optWriteService = Some(writeService), - authService = configProvider.authService(config), - healthChecks = healthChecksWithIndexer + ("write" -> writeService), - metrics = metrics, - timeServiceBackend = configProvider.timeServiceBackend(config), - otherInterceptors = configProvider.interceptors(config), - engine = sharedEngine, - servicesExecutionContext = servicesExecutionContext, - commandDeduplicationFeatures = CommandDeduplicationFeatures.of( - deduplicationPeriodSupport = Some( - CommandDeduplicationPeriodSupport.of( - offsetSupport = - CommandDeduplicationPeriodSupport.OffsetSupport.OFFSET_CONVERT_TO_DURATION, - durationSupport = - CommandDeduplicationPeriodSupport.DurationSupport.DURATION_NATIVE_SUPPORT, - ) - ), - deduplicationType = CommandDeduplicationType.ASYNC_ONLY, - maxDeduplicationDurationEnforced = true, - ), - ).acquire() - } yield {} - case ParticipantRunMode.Indexer => - Resource.unit - } - } yield () + ), + deduplicationType = CommandDeduplicationType.ASYNC_ONLY, + maxDeduplicationDurationEnforced = true, + ), + ).acquire() + } yield {} + case ParticipantRunMode.Indexer => + Resource.unit } - }) - } yield () + } yield () } - } } From d7a4b62c52a26230041fe45dec6d1828383f889a Mon Sep 17 00:00:00 2001 From: Samir Talwar Date: Wed, 12 Jan 2022 13:13:39 +0100 Subject: [PATCH 2/4] kvutils/app: Create a test case for the Runner. This is quite sophisticated because it needs to provision a configuration, and so needs a basic implementation of the write->read flow, at least for configuration updates. The basic ledger used for the initial configuration update is implemented using an `ArrayBuffer`, a `BoundedSourceQueue`, and a `Dispatcher`, within the test file. CHANGELOG_BEGIN CHANGELOG_END --- ledger/ledger-on-memory/BUILD.bazel | 2 - .../on/memory/InMemoryConfigProvider.scala | 13 - .../com/daml/ledger/on/memory/Owner.scala | 4 +- .../participant-state/kvutils/app/BUILD.bazel | 24 +- .../state/kvutils/app/ConfigProvider.scala | 8 + .../state/kvutils/app/Runner.scala | 26 +- .../app/src/test/resources/logback-test.xml | 9 + .../state/kvutils/app/RunnerSpec.scala | 268 ++++++++++++++++++ 8 files changed, 319 insertions(+), 35 deletions(-) delete mode 100644 ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/InMemoryConfigProvider.scala create mode 100644 ledger/participant-state/kvutils/app/src/test/resources/logback-test.xml create mode 100644 ledger/participant-state/kvutils/app/src/test/scala/com/daml/ledger/participant/state/kvutils/app/RunnerSpec.scala diff --git a/ledger/ledger-on-memory/BUILD.bazel b/ledger/ledger-on-memory/BUILD.bazel index bb69ec79b1cd..244d2ecb0302 100644 --- a/ledger/ledger-on-memory/BUILD.bazel +++ b/ledger/ledger-on-memory/BUILD.bazel @@ -96,7 +96,6 @@ da_scala_library( srcs = glob(["src/app/scala/**/*.scala"]), resources = glob(["src/app/resources/**/*"]), scala_deps = [ - "@maven//:com_github_scopt_scopt", "@maven//:com_typesafe_akka_akka_actor", "@maven//:com_typesafe_akka_akka_stream", ], @@ -108,7 +107,6 @@ da_scala_library( "//daml-lf/engine", "//language-support/scala/bindings", "//ledger/caching", - "//ledger/ledger-api-auth", "//ledger/ledger-api-common", "//ledger/ledger-api-health", "//ledger/ledger-configuration", diff --git a/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/InMemoryConfigProvider.scala b/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/InMemoryConfigProvider.scala deleted file mode 100644 index 059822f92c73..000000000000 --- a/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/InMemoryConfigProvider.scala +++ /dev/null @@ -1,13 +0,0 @@ -// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.daml.ledger.on.memory - -import com.daml.ledger.participant.state.kvutils.app.{Config, ConfigProvider} -import scopt.OptionParser - -private[memory] object InMemoryConfigProvider extends ConfigProvider[Unit] { - override val defaultExtraConfig: Unit = () - - override def extraConfigParser(parser: OptionParser[Config[Unit]]): Unit = () -} diff --git a/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/Owner.scala b/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/Owner.scala index f4aafbea2c14..3c5ab79f22f0 100644 --- a/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/Owner.scala +++ b/ledger/ledger-on-memory/src/app/scala/com/daml/ledger/on/memory/Owner.scala @@ -3,7 +3,7 @@ package com.daml.ledger.on.memory -import com.daml.ledger.participant.state.kvutils.app.{Config, Runner} +import com.daml.ledger.participant.state.kvutils.app.{Config, ConfigProvider, Runner} import com.daml.ledger.resources.ResourceOwner object Owner { @@ -13,6 +13,6 @@ object Owner { dispatcher <- dispatcherOwner sharedState = InMemoryState.empty factory = new InMemoryLedgerFactory(dispatcher, sharedState) - runner <- new Runner(RunnerName, factory, InMemoryConfigProvider).owner(config) + runner <- new Runner(RunnerName, factory, ConfigProvider.ForUnit).owner(config) } yield runner } diff --git a/ledger/participant-state/kvutils/app/BUILD.bazel b/ledger/participant-state/kvutils/app/BUILD.bazel index 534137fd5b06..ae7c291ba6d0 100644 --- a/ledger/participant-state/kvutils/app/BUILD.bazel +++ b/ledger/participant-state/kvutils/app/BUILD.bazel @@ -16,7 +16,6 @@ da_scala_library( "@maven//:com_github_scopt_scopt", "@maven//:com_typesafe_akka_akka_actor", "@maven//:com_typesafe_akka_akka_stream", - "@maven//:org_scala_lang_modules_scala_collection_compat", ], tags = ["maven_coordinates=com.daml:participant-state-kvutils-app:__VERSION__"], visibility = [ @@ -60,10 +59,12 @@ da_scala_library( da_scala_test_suite( name = "app-tests", size = "small", - srcs = glob(["src/test/**/*.scala"]), + srcs = glob(["src/test/scala/**/*.scala"]), + resources = glob(["src/test/resources/**/*"]), scala_deps = [ "@maven//:com_github_scopt_scopt", - "@maven//:org_mockito_mockito_scala", + "@maven//:com_typesafe_akka_akka_actor", + "@maven//:com_typesafe_akka_akka_stream", "@maven//:org_scalatest_scalatest_core", "@maven//:org_scalatest_scalatest_matchers_core", "@maven//:org_scalatest_scalatest_shouldmatchers", @@ -71,16 +72,29 @@ da_scala_test_suite( ], deps = [ ":app", + "//daml-lf/archive:daml_lf_1.dev_archive_proto_java", "//daml-lf/data", + "//daml-lf/engine", + "//daml-lf/transaction", + "//ledger-api/grpc-definitions:ledger_api_proto_scala", + "//ledger/caching", "//ledger/ledger-api-common", "//ledger/ledger-api-health", + "//ledger/ledger-configuration", + "//ledger/ledger-offset", + "//ledger/ledger-resources", + "//ledger/metrics", "//ledger/participant-integration-api", "//ledger/participant-state", "//ledger/participant-state/kvutils", - "//libs-scala/postgresql-testing", + "//libs-scala/contextualized-logging", + "//libs-scala/ports", "//libs-scala/resources", + "//libs-scala/resources-akka", + "//libs-scala/resources-grpc", + "@maven//:com_google_protobuf_protobuf_java", + "@maven//:io_grpc_grpc_api", "@maven//:io_netty_netty_handler", - "@maven//:org_mockito_mockito_core", "@maven//:org_scalatest_scalatest_compatible", ], ) diff --git a/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/ConfigProvider.scala b/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/ConfigProvider.scala index ad65c219a155..2906d24307ff 100644 --- a/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/ConfigProvider.scala +++ b/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/ConfigProvider.scala @@ -122,3 +122,11 @@ trait ConfigProvider[ExtraConfig] { new Metrics(SharedMetricRegistries.getOrCreate(registryName)) } } + +object ConfigProvider { + object ForUnit extends ConfigProvider[Unit] { + override val defaultExtraConfig: Unit = () + + override def extraConfigParser(parser: OptionParser[Config[Unit]]): Unit = () + } +} diff --git a/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Runner.scala b/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Runner.scala index d02678a927a0..1382d281a4f1 100644 --- a/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Runner.scala +++ b/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Runner.scala @@ -27,6 +27,7 @@ import com.daml.platform.configuration.ServerRole import com.daml.platform.indexer.StandaloneIndexerServer import com.daml.platform.server.api.validation.ErrorFactories import com.daml.platform.store.{DbSupport, LfValueTranslationCache} +import com.daml.ports.Port import scala.concurrent.ExecutionContext @@ -45,12 +46,11 @@ final class Runner[T <: ReadWriteService, Extra]( def owner(originalConfig: Config[Extra]): ResourceOwner[Unit] = new ResourceOwner[Unit] { override def acquire()(implicit context: ResourceContext): Resource[Unit] = { val config = configProvider.manipulateConfig(originalConfig) - val errorFactories = ErrorFactories( - new ErrorCodesVersionSwitcher(originalConfig.enableSelfServiceErrorCodes) - ) - config.mode match { case Mode.DumpIndexMetadata(jdbcUrls) => + val errorFactories = ErrorFactories( + new ErrorCodesVersionSwitcher(originalConfig.enableSelfServiceErrorCodes) + ) DumpIndexMetadata(jdbcUrls, errorFactories, name) sys.exit(0) case Mode.Run => @@ -59,7 +59,7 @@ final class Runner[T <: ReadWriteService, Extra]( } } - private def run( + private[app] def run( config: Config[Extra] )(implicit resourceContext: ResourceContext): Resource[Unit] = { implicit val actorSystem: ActorSystem = ActorSystem(cleanedName) @@ -80,7 +80,7 @@ final class Runner[T <: ReadWriteService, Extra]( ) // initialize all configured participants - _ <- Resource.sequence( + _ <- Resource.sequenceIgnoringValues( config.participants.map(participantConfig => runParticipant(config, participantConfig, sharedEngine) ) @@ -89,7 +89,7 @@ final class Runner[T <: ReadWriteService, Extra]( } } - private def runParticipant( + private[app] def runParticipant( config: Config[Extra], participantConfig: ParticipantConfig, sharedEngine: Engine, @@ -98,7 +98,7 @@ final class Runner[T <: ReadWriteService, Extra]( loggingContext: LoggingContext, actorSystem: ActorSystem, materializer: Materializer, - ): Resource[Unit] = + ): Resource[Option[Port]] = withEnrichedLoggingContext("participantId" -> participantConfig.participantId) { implicit loggingContext => val metrics = configProvider.createMetrics(participantConfig, config) @@ -154,7 +154,7 @@ final class Runner[T <: ReadWriteService, Extra]( Resource.successful(new HealthChecks()) } apiServerConfig = configProvider.apiServerConfig(participantConfig, config) - _ <- participantConfig.mode match { + port <- participantConfig.mode match { case ParticipantRunMode.Combined | ParticipantRunMode.LedgerApiServer => for { dbSupport <- DbSupport @@ -183,7 +183,7 @@ final class Runner[T <: ReadWriteService, Extra]( indexService, )(implicitly, servicesExecutionContext) writeService = new TimedWriteService(factory.writeService(), metrics) - _ <- StandaloneApiServer( + apiServer <- StandaloneApiServer( indexService = indexService, userManagementStore = userManagementStore, ledgerId = config.ledgerId, @@ -212,10 +212,10 @@ final class Runner[T <: ReadWriteService, Extra]( maxDeduplicationDurationEnforced = true, ), ).acquire() - } yield {} + } yield Some(apiServer.port) case ParticipantRunMode.Indexer => - Resource.unit + Resource.successful(None) } - } yield () + } yield port } } diff --git a/ledger/participant-state/kvutils/app/src/test/resources/logback-test.xml b/ledger/participant-state/kvutils/app/src/test/resources/logback-test.xml new file mode 100644 index 000000000000..4f492b2a0353 --- /dev/null +++ b/ledger/participant-state/kvutils/app/src/test/resources/logback-test.xml @@ -0,0 +1,9 @@ + + + + + + diff --git a/ledger/participant-state/kvutils/app/src/test/scala/com/daml/ledger/participant/state/kvutils/app/RunnerSpec.scala b/ledger/participant-state/kvutils/app/src/test/scala/com/daml/ledger/participant/state/kvutils/app/RunnerSpec.scala new file mode 100644 index 000000000000..bfa91333f077 --- /dev/null +++ b/ledger/participant-state/kvutils/app/src/test/scala/com/daml/ledger/participant/state/kvutils/app/RunnerSpec.scala @@ -0,0 +1,268 @@ +// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.ledger.participant.state.kvutils.app + +import java.net.InetAddress +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.{CompletableFuture, CompletionStage} + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.scaladsl.{Keep, Sink, Source} +import akka.stream.{Materializer, QueueOfferResult} +import com.daml.daml_lf_dev.DamlLf +import com.daml.ledger.api.health.{HealthStatus, Healthy} +import com.daml.ledger.api.v1.ledger_identity_service.{ + GetLedgerIdentityRequest, + LedgerIdentityServiceGrpc, +} +import com.daml.ledger.configuration.{Configuration, LedgerInitialConditions} +import com.daml.ledger.offset.Offset +import com.daml.ledger.participant.state.kvutils.KVOffsetBuilder +import com.daml.ledger.participant.state.kvutils.app.RunnerSpec._ +import com.daml.ledger.participant.state.v2.{ + PruningResult, + ReadService, + SubmissionResult, + SubmitterInfo, + TransactionMeta, + Update, + WriteService, +} +import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner} +import com.daml.lf.data.Ref +import com.daml.lf.data.Time.Timestamp +import com.daml.lf.engine.Engine +import com.daml.lf.transaction.SubmittedTransaction +import com.daml.logging.{ContextualizedLogger, LoggingContext} +import com.daml.metrics.Metrics +import com.daml.platform.akkastreams.dispatcher.{Dispatcher, SubSource} +import com.daml.ports.Port +import com.daml.telemetry.TelemetryContext +import com.google.rpc.status.{Status => StatusProto} +import io.grpc.Status.Code +import io.grpc.{Channel, ManagedChannelBuilder, Status} +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AsyncWordSpec + +import scala.collection.mutable +import scala.concurrent.duration.DurationInt +import scala.concurrent.{ExecutionContext, Future} +import scala.language.existentials + +class RunnerSpec extends AsyncWordSpec with Matchers { + private implicit val resourceContext: ResourceContext = ResourceContext(ExecutionContext.global) + private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting + + "the runner" should { + "start a participant" in { + val config = Config.createDefault(()).copy(ledgerId = LedgerId) + val participantConfig = newTestPartcipantConfig() + + val runner = new Runner(Name, TestLedgerFactory, ConfigProvider.ForUnit) + val app = startWithRunner(config, participantConfig, runner) + + app.asFuture + .flatMap { channel => + LedgerIdentityServiceGrpc + .stub(channel) + .getLedgerIdentity(GetLedgerIdentityRequest.of()) + .map { response => + response.ledgerId should be(LedgerId) + } + } + .transformWith(result => app.release().flatMap(_ => Future.fromTry(result))) + } + } +} + +object RunnerSpec { + private val Name = classOf[RunnerSpec].getSimpleName + private val LedgerId = s"$Name-Ledger" + + private val logger = ContextualizedLogger.get(getClass) + + private val engine = Engine.StableEngine() + + private def newTestPartcipantConfig() = { + val participantId = Ref.ParticipantId.assertFromString("participant") + ParticipantConfig( + mode = ParticipantRunMode.Combined, + participantId = participantId, + shardName = None, + address = None, + port = Port.Dynamic, + portFile = None, + serverJdbcUrl = ParticipantConfig.defaultIndexJdbcUrl(participantId), + indexerConfig = ParticipantIndexerConfig(allowExistingSchema = false), + ) + } + + private def startWithRunner( + config: Config[Unit], + participantConfig: ParticipantConfig, + runner: Runner[Nothing, Unit], + )(implicit + resourceContext: ResourceContext, + loggingContext: LoggingContext, + ): Resource[Channel] = { + for { + actorSystem <- ResourceOwner.forActorSystem(() => ActorSystem(Name)).acquire() + materializer <- ResourceOwner.forMaterializer(() => Materializer(actorSystem)).acquire() + port <- runner + .runParticipant(config, participantConfig, engine)( + resourceContext, + loggingContext, + actorSystem, + materializer, + ) + .map(_.get) + channel <- ResourceOwner + .forChannel( + { + val builder = ManagedChannelBuilder + .forAddress(InetAddress.getLoopbackAddress.getHostName, port.value) + builder.usePlaintext() + builder + }, + shutdownTimeout = 1.second, + ) + .acquire() + } yield channel + } + + object TestLedgerFactory extends LedgerFactory[Unit] { + private val offsetBuilder = new KVOffsetBuilder(0) + + private val initialConditions = LedgerInitialConditions( + LedgerId, + Configuration.reasonableInitialConfiguration, + Timestamp.Epoch, + ) + + override def readWriteServiceFactoryOwner( + config: Config[Unit], + participantConfig: ParticipantConfig, + engine: Engine, + metrics: Metrics, + )(implicit + materializer: Materializer, + executionContext: ExecutionContext, + loggingContext: LoggingContext, + ): ResourceOwner[ReadWriteServiceFactory] = + for { + updates <- ResourceOwner.forValue(() => mutable.ArrayBuffer.empty[Update]) + head = new AtomicInteger(0) + dispatcher <- Dispatcher.owner(Name, 0, head.get()) + updateQueue <- ResourceOwner + .forBoundedSourceQueue( + Source + .queue[(Int, Update)](bufferSize = 100) + .toMat(Sink.foreach { case (head, update) => + updates += update + dispatcher.signalNewHead(head) + })(Keep.both) + ) + .map(_._1) + factory <- ResourceOwner.successful(new ReadWriteServiceFactory { + override def readService(): ReadService = new ReadService { + override def currentHealth(): HealthStatus = Healthy + + override def ledgerInitialConditions(): Source[LedgerInitialConditions, NotUsed] = + Source.single(initialConditions).concat(Source.never) + + override def stateUpdates( + beginAfter: Option[Offset] + )(implicit loggingContext: LoggingContext): Source[(Offset, Update), NotUsed] = + dispatcher + .startingAt( + beginAfter.fold(0)(offsetBuilder.highestIndex(_).toInt), + SubSource.OneAfterAnother( + _ + 1, + index => Future.successful(updates(index - 1)), + ), + ) + .map { case (index, update) => + offsetBuilder.of(index.toLong) -> update + } + } + + override def writeService(): WriteService = new WriteService { + override def currentHealth(): HealthStatus = Healthy + + override def submitConfiguration( + maxRecordTime: Timestamp, + submissionId: Ref.SubmissionId, + config: Configuration, + )(implicit + loggingContext: LoggingContext, + telemetryContext: TelemetryContext, + ): CompletionStage[SubmissionResult] = { + val configurationUpdate = Update.ConfigurationChanged( + recordTime = Timestamp.now(), + submissionId = submissionId, + participantId = participantConfig.participantId, + newConfiguration = config, + ) + updateQueue.offer(head.incrementAndGet() -> configurationUpdate) match { + case QueueOfferResult.Enqueued => + acknowledged + case QueueOfferResult.Dropped => + failure(Code.RESOURCE_EXHAUSTED, "submitConfiguration") + case QueueOfferResult.QueueClosed => + failure(Code.ABORTED, "submitConfiguration") + case QueueOfferResult.Failure(cause) => + logger.error("submitConfiguration", cause) + failure(Code.INTERNAL, "submitConfiguration") + } + } + + override def allocateParty( + hint: Option[Ref.Party], + displayName: Option[String], + submissionId: Ref.SubmissionId, + )(implicit + loggingContext: LoggingContext, + telemetryContext: TelemetryContext, + ): CompletionStage[SubmissionResult] = failure(Code.UNIMPLEMENTED, "allocateParty") + + override def uploadPackages( + submissionId: Ref.SubmissionId, + archives: List[DamlLf.Archive], + sourceDescription: Option[String], + )(implicit + loggingContext: LoggingContext, + telemetryContext: TelemetryContext, + ): CompletionStage[SubmissionResult] = failure(Code.UNIMPLEMENTED, "uploadPackages") + + override def submitTransaction( + submitterInfo: SubmitterInfo, + transactionMeta: TransactionMeta, + transaction: SubmittedTransaction, + estimatedInterpretationCost: Long, + )(implicit + loggingContext: LoggingContext, + telemetryContext: TelemetryContext, + ): CompletionStage[SubmissionResult] = failure(Code.UNIMPLEMENTED, "submitTransaction") + + override def prune( + pruneUpToInclusive: Offset, + submissionId: Ref.SubmissionId, + pruneAllDivulgedContracts: Boolean, + ): CompletionStage[PruningResult] = CompletableFuture.completedFuture( + PruningResult.NotPruned(Status.UNIMPLEMENTED.withDescription("prune")) + ) + } + }) + } yield factory + + private def acknowledged: CompletionStage[SubmissionResult] = + CompletableFuture.completedFuture(SubmissionResult.Acknowledged) + + private def failure(code: Code, message: String): CompletionStage[SubmissionResult] = + CompletableFuture.completedFuture( + SubmissionResult.SynchronousError(StatusProto.of(code.value(), message, Seq.empty)) + ) + } +} From 8a8136c4ecdf2013e7d71162ead45a65001a002c Mon Sep 17 00:00:00 2001 From: Samir Talwar Date: Wed, 12 Jan 2022 13:33:30 +0100 Subject: [PATCH 3/4] kvutils/app: Use `ResourceOwner` so we can use `#use` in tests. --- .../state/kvutils/app/RunnerSpec.scala | 54 +++++++++---------- 1 file changed, 25 insertions(+), 29 deletions(-) diff --git a/ledger/participant-state/kvutils/app/src/test/scala/com/daml/ledger/participant/state/kvutils/app/RunnerSpec.scala b/ledger/participant-state/kvutils/app/src/test/scala/com/daml/ledger/participant/state/kvutils/app/RunnerSpec.scala index bfa91333f077..f840bf5caf39 100644 --- a/ledger/participant-state/kvutils/app/src/test/scala/com/daml/ledger/participant/state/kvutils/app/RunnerSpec.scala +++ b/ledger/participant-state/kvutils/app/src/test/scala/com/daml/ledger/participant/state/kvutils/app/RunnerSpec.scala @@ -59,20 +59,16 @@ class RunnerSpec extends AsyncWordSpec with Matchers { "start a participant" in { val config = Config.createDefault(()).copy(ledgerId = LedgerId) val participantConfig = newTestPartcipantConfig() - val runner = new Runner(Name, TestLedgerFactory, ConfigProvider.ForUnit) - val app = startWithRunner(config, participantConfig, runner) - app.asFuture - .flatMap { channel => - LedgerIdentityServiceGrpc - .stub(channel) - .getLedgerIdentity(GetLedgerIdentityRequest.of()) - .map { response => - response.ledgerId should be(LedgerId) - } - } - .transformWith(result => app.release().flatMap(_ => Future.fromTry(result))) + newApp(config, participantConfig, runner).use { channel => + LedgerIdentityServiceGrpc + .stub(channel) + .getLedgerIdentity(GetLedgerIdentityRequest.of()) + .map { response => + response.ledgerId should be(LedgerId) + } + } } } } @@ -99,25 +95,27 @@ object RunnerSpec { ) } - private def startWithRunner( + private def newApp[T <: ReadWriteService]( config: Config[Unit], participantConfig: ParticipantConfig, - runner: Runner[Nothing, Unit], + runner: Runner[T, Unit], )(implicit - resourceContext: ResourceContext, - loggingContext: LoggingContext, - ): Resource[Channel] = { + loggingContext: LoggingContext + ): ResourceOwner[Channel] = for { - actorSystem <- ResourceOwner.forActorSystem(() => ActorSystem(Name)).acquire() - materializer <- ResourceOwner.forMaterializer(() => Materializer(actorSystem)).acquire() - port <- runner - .runParticipant(config, participantConfig, engine)( - resourceContext, - loggingContext, - actorSystem, - materializer, - ) - .map(_.get) + actorSystem <- ResourceOwner.forActorSystem(() => ActorSystem(Name)) + materializer <- ResourceOwner.forMaterializer(() => Materializer(actorSystem)) + port <- new ResourceOwner[Port] { + override def acquire()(implicit context: ResourceContext): Resource[Port] = + runner + .runParticipant(config, participantConfig, engine)( + context, + loggingContext, + actorSystem, + materializer, + ) + .map(_.get) + } channel <- ResourceOwner .forChannel( { @@ -128,9 +126,7 @@ object RunnerSpec { }, shutdownTimeout = 1.second, ) - .acquire() } yield channel - } object TestLedgerFactory extends LedgerFactory[Unit] { private val offsetBuilder = new KVOffsetBuilder(0) From 085a4c0143ae603c26888556cb35fd7de102bcf4 Mon Sep 17 00:00:00 2001 From: Samir Talwar Date: Wed, 12 Jan 2022 14:31:58 +0100 Subject: [PATCH 4/4] kvutils/app: Use `AkkaBeforeAndAfterAll` to reduce boilerplate. --- ledger/participant-state/kvutils/app/BUILD.bazel | 2 ++ .../state/kvutils/app/RunnerSpec.scala | 16 ++++++---------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/ledger/participant-state/kvutils/app/BUILD.bazel b/ledger/participant-state/kvutils/app/BUILD.bazel index ae7c291ba6d0..66e9368631fc 100644 --- a/ledger/participant-state/kvutils/app/BUILD.bazel +++ b/ledger/participant-state/kvutils/app/BUILD.bazel @@ -77,6 +77,8 @@ da_scala_test_suite( "//daml-lf/engine", "//daml-lf/transaction", "//ledger-api/grpc-definitions:ledger_api_proto_scala", + "//ledger-api/rs-grpc-bridge", + "//ledger-api/testing-utils", "//ledger/caching", "//ledger/ledger-api-common", "//ledger/ledger-api-health", diff --git a/ledger/participant-state/kvutils/app/src/test/scala/com/daml/ledger/participant/state/kvutils/app/RunnerSpec.scala b/ledger/participant-state/kvutils/app/src/test/scala/com/daml/ledger/participant/state/kvutils/app/RunnerSpec.scala index f840bf5caf39..a9656566150c 100644 --- a/ledger/participant-state/kvutils/app/src/test/scala/com/daml/ledger/participant/state/kvutils/app/RunnerSpec.scala +++ b/ledger/participant-state/kvutils/app/src/test/scala/com/daml/ledger/participant/state/kvutils/app/RunnerSpec.scala @@ -13,6 +13,7 @@ import akka.stream.scaladsl.{Keep, Sink, Source} import akka.stream.{Materializer, QueueOfferResult} import com.daml.daml_lf_dev.DamlLf import com.daml.ledger.api.health.{HealthStatus, Healthy} +import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll import com.daml.ledger.api.v1.ledger_identity_service.{ GetLedgerIdentityRequest, LedgerIdentityServiceGrpc, @@ -51,7 +52,7 @@ import scala.concurrent.duration.DurationInt import scala.concurrent.{ExecutionContext, Future} import scala.language.existentials -class RunnerSpec extends AsyncWordSpec with Matchers { +class RunnerSpec extends AsyncWordSpec with Matchers with AkkaBeforeAndAfterAll { private implicit val resourceContext: ResourceContext = ResourceContext(ExecutionContext.global) private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting @@ -100,20 +101,15 @@ object RunnerSpec { participantConfig: ParticipantConfig, runner: Runner[T, Unit], )(implicit - loggingContext: LoggingContext + loggingContext: LoggingContext, + actorSystem: ActorSystem, + materializer: Materializer, ): ResourceOwner[Channel] = for { - actorSystem <- ResourceOwner.forActorSystem(() => ActorSystem(Name)) - materializer <- ResourceOwner.forMaterializer(() => Materializer(actorSystem)) port <- new ResourceOwner[Port] { override def acquire()(implicit context: ResourceContext): Resource[Port] = runner - .runParticipant(config, participantConfig, engine)( - context, - loggingContext, - actorSystem, - materializer, - ) + .runParticipant(config, participantConfig, engine) .map(_.get) } channel <- ResourceOwner