From 954bc5e0d414d46194a1868fcfec719d29424125 Mon Sep 17 00:00:00 2001 From: mziolekda Date: Wed, 19 Jan 2022 09:18:14 +0100 Subject: [PATCH] Remove the reset service from sandbox-classic and sandbox [DPP-804] (#12448) * Remove the reset service from sandbox-classic and sandbox CHANGELOG_BEGIN Reset service has been removed from the sandbox-classic, sandbox and daml-on-sql CHANGELOG_END * format it --- .../error/definitions/LedgerApiErrors.scala | 9 - .../digitalasset/ledger/api/auth/Claims.scala | 1 - .../api/validation/ErrorFactories.scala | 14 - .../api/validation/ErrorFactoriesSpec.scala | 28 -- .../participant-integration-api/BUILD.bazel | 1 - ledger/sandbox-classic/BUILD.bazel | 34 +- .../platform/sandbox/SandboxServer.scala | 61 +-- .../reset/ResetServiceDatabaseIT.scala | 131 ------ .../sandbox/services/LegacyServiceIT.scala | 11 +- .../services/reflection/ReflectionIT.scala | 2 +- .../reset/ResetServiceH2DatabaseIT.scala | 12 - .../services/reset/ResetServiceIT.scala | 27 -- .../reset/ResetServicePostgresqlIT.scala | 12 - ledger/sandbox-common/BUILD.bazel | 4 - .../services/SandboxResetService.scala | 84 ---- .../services/reset/ResetServiceITBase.scala | 303 ------------- .../platform/sandbox/perf/LedgerContext.scala | 7 - ledger/sandbox/BUILD.bazel | 32 +- .../scala/platform/sandboxnext/Runner.scala | 398 ++++++++---------- .../reset/ResetServiceInMemoryIT.scala | 15 - .../reset/ResetServiceOnPostgresqlIT.scala | 15 - 21 files changed, 197 insertions(+), 1004 deletions(-) delete mode 100644 ledger/sandbox-classic/src/test/lib/scala/platform/sandbox/services/reset/ResetServiceDatabaseIT.scala delete mode 100644 ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/services/reset/ResetServiceH2DatabaseIT.scala delete mode 100644 ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/services/reset/ResetServiceIT.scala delete mode 100644 ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/services/reset/ResetServicePostgresqlIT.scala delete mode 100644 ledger/sandbox-common/src/main/scala/platform/sandbox/services/SandboxResetService.scala delete mode 100644 ledger/sandbox-common/src/test/lib/scala/platform/sandbox/services/reset/ResetServiceITBase.scala delete mode 100644 ledger/sandbox/src/test/suite/scala/platform/sandboxnext/services/reset/ResetServiceInMemoryIT.scala delete mode 100644 ledger/sandbox/src/test/suite/scala/platform/sandboxnext/services/reset/ResetServiceOnPostgresqlIT.scala diff --git a/ledger/error/src/main/scala/com/daml/error/definitions/LedgerApiErrors.scala b/ledger/error/src/main/scala/com/daml/error/definitions/LedgerApiErrors.scala index 27b577e7eb46..30918c946fe8 100644 --- a/ledger/error/src/main/scala/com/daml/error/definitions/LedgerApiErrors.scala +++ b/ledger/error/src/main/scala/com/daml/error/definitions/LedgerApiErrors.scala @@ -284,15 +284,6 @@ object LedgerApiErrors extends LedgerApiErrorGroup { override def context: Map[String, String] = super.context ++ Map("service_name" -> _serviceName) } - - case class ServiceReset(_serviceName: String)(implicit - loggingContext: ContextualizedErrorLogger - ) extends LoggingTransactionErrorImpl( - cause = s"${_serviceName} is currently being reset." - ) { - override def context: Map[String, String] = - super.context ++ Map("service_name" -> _serviceName) - } } @Explanation("Authentication errors.") diff --git a/ledger/ledger-api-auth/src/main/scala/com/digitalasset/ledger/api/auth/Claims.scala b/ledger/ledger-api-auth/src/main/scala/com/digitalasset/ledger/api/auth/Claims.scala index f1057af6dec8..a53c22c6ba47 100644 --- a/ledger/ledger-api-auth/src/main/scala/com/digitalasset/ledger/api/auth/Claims.scala +++ b/ledger/ledger-api-auth/src/main/scala/com/digitalasset/ledger/api/auth/Claims.scala @@ -73,7 +73,6 @@ object ClaimSet { * | PackageService | * | isPublic | * | PackageManagementService | * | isAdmin | * | PartyManagementService | * | isAdmin | - * | ResetService | * | isAdmin | * | ServerReflection | * | N/A (authentication not required) | * | TimeService | GetTime | isPublic | * | TimeService | SetTime | isAdmin | diff --git a/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/ErrorFactories.scala b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/ErrorFactories.scala index 714f56ef5031..625b3b277d05 100644 --- a/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/ErrorFactories.scala +++ b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/ErrorFactories.scala @@ -524,20 +524,6 @@ class ErrorFactories private (errorCodesVersionSwitcher: ErrorCodesVersionSwitch v2 = LedgerApiErrors.ServiceNotRunning.Reject(serviceName).asGrpcError, ) - def serviceIsBeingReset(legacyStatusCode: Int)(serviceName: String)(implicit - contextualizedErrorLogger: ContextualizedErrorLogger - ): StatusRuntimeException = - errorCodesVersionSwitcher.choose( - v1 = { - val statusBuilder = Status - .newBuilder() - .setCode(legacyStatusCode) - .setMessage(s"$serviceName is currently being reset.") - grpcError(statusBuilder.build()) - }, - v2 = LedgerApiErrors.ServiceNotRunning.ServiceReset(serviceName).asGrpcError, - ) - def trackerFailure(msg: String)(implicit contextualizedErrorLogger: ContextualizedErrorLogger ): StatusRuntimeException = diff --git a/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/validation/ErrorFactoriesSpec.scala b/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/validation/ErrorFactoriesSpec.scala index c27d5d3c6231..2e45d20437f4 100644 --- a/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/validation/ErrorFactoriesSpec.scala +++ b/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/validation/ErrorFactoriesSpec.scala @@ -753,34 +753,6 @@ class ErrorFactoriesSpec } } - "return a serviceIsBeingReset error" in { - val serviceName = "Some API Service" - val someLegacyStatusCode = Code.CANCELLED - - val msg = - s"SERVICE_NOT_RUNNING(1,$truncatedCorrelationId): $serviceName is currently being reset." - assertVersionedError(_.serviceIsBeingReset(someLegacyStatusCode.value())(serviceName))( - v1_code = someLegacyStatusCode, - v1_message = s"$serviceName is currently being reset.", - v1_details = Seq.empty, - v2_code = Code.UNAVAILABLE, - v2_message = msg, - v2_details = Seq[ErrorDetails.ErrorDetail]( - ErrorDetails.ErrorInfoDetail( - "SERVICE_NOT_RUNNING", - Map("category" -> "1", "definite_answer" -> "false", "service_name" -> serviceName), - ), - expectedCorrelationIdRequestInfo, - ErrorDetails.RetryInfoDetail(1), - ), - v2_logEntry = ExpectedLogEntry( - Level.INFO, - msg, - expectedMarkerRegex("service_name=Some API Service"), - ), - ) - } - "return a missingField error" in { val fieldName = "my field" diff --git a/ledger/participant-integration-api/BUILD.bazel b/ledger/participant-integration-api/BUILD.bazel index 3df5ea82cb72..a06b3b7dce98 100644 --- a/ledger/participant-integration-api/BUILD.bazel +++ b/ledger/participant-integration-api/BUILD.bazel @@ -229,7 +229,6 @@ da_scala_test_suite( srcs = glob( ["src/test/suite/**/*.scala"], exclude = [ - "src/test/suite/**/ResetService*IT.scala", "**/*Oracle*", ], ), diff --git a/ledger/sandbox-classic/BUILD.bazel b/ledger/sandbox-classic/BUILD.bazel index 0968ef476244..855ffd90aca5 100644 --- a/ledger/sandbox-classic/BUILD.bazel +++ b/ledger/sandbox-classic/BUILD.bazel @@ -126,8 +126,6 @@ da_scala_library( scala_deps = [ "@maven//:com_typesafe_akka_akka_actor", "@maven//:com_typesafe_akka_akka_stream", - "@maven//:org_playframework_anorm_anorm", - "@maven//:org_playframework_anorm_anorm_tokenizer", "@maven//:org_scalactic_scalactic", "@maven//:org_scalatest_scalatest_core", "@maven//:org_scalatest_scalatest_flatspec", @@ -162,7 +160,6 @@ da_scala_library( "//ledger/ledger-api-health", "//ledger/ledger-configuration", "//ledger/ledger-resources", - "//ledger/ledger-resources:ledger-resources-test-lib", "//ledger/metrics", "//ledger/participant-integration-api", "//ledger/participant-integration-api:participant-integration-api-tests-lib", @@ -264,43 +261,24 @@ scala_test_deps = [ "@maven//:org_scalaz_scalaz_core", ] -# The reset service is cursed so we mark all tests involving it as flaky. -reset_service_pattern = "src/test/suite/**/*ResetService*.scala" - [ da_scala_test_suite( - name = "sandbox-classic-tests{}".format(suffix), - size = size, + name = "sandbox-classic-tests", + size = "medium", srcs = glob( - [pattern], - exclude = exclusions, + ["src/test/suite/**/*.scala"], + exclude = [], ), data = [ "//daml-lf/encoder:testing-dars", "//ledger/test-common:model-tests-default.dar", "//ledger/test-common/test-certificates", ], - flaky = flaky, + flaky = False, resources = glob(["src/test/resources/**/*"]) + ["//ledger/sandbox-common:src/main/resources/logback.xml"], scala_deps = scala_test_deps, deps = test_deps, - ) - for (suffix, pattern, exclusions, flaky, size) in [ - ( - "", - "src/test/suite/**/*.scala", - [reset_service_pattern], - False, - "medium", # Default timeout of 5min - ), - ( - "-resetservice", - reset_service_pattern, - [], - True, - "large", # Reset service tests have a large variance in their run time, use a timeout of 15min - ), - ] + ), ] SERVERS = { diff --git a/ledger/sandbox-classic/src/main/scala/platform/sandbox/SandboxServer.scala b/ledger/sandbox-classic/src/main/scala/platform/sandbox/SandboxServer.scala index bbe10ed6e20e..b4150e5bdccb 100644 --- a/ledger/sandbox-classic/src/main/scala/platform/sandbox/SandboxServer.scala +++ b/ledger/sandbox-classic/src/main/scala/platform/sandbox/SandboxServer.scala @@ -46,12 +46,10 @@ import com.daml.platform.sandbox.SandboxServer._ import com.daml.platform.sandbox.banner.Banner import com.daml.platform.sandbox.config.SandboxConfig.EngineMode import com.daml.platform.sandbox.config.{LedgerName, SandboxConfig} -import com.daml.platform.sandbox.services.SandboxResetService import com.daml.platform.sandbox.stores.ledger.ScenarioLoader.LedgerEntryOrBump import com.daml.platform.sandbox.stores.ledger._ import com.daml.platform.sandbox.stores.ledger.sql.SqlStartMode import com.daml.platform.sandbox.stores.{InMemoryActiveLedgerState, SandboxIndexAndWriteService} -import com.daml.platform.server.api.validation.ErrorFactories import com.daml.platform.services.time.TimeProviderType import com.daml.platform.store.{DbSupport, DbType, FlywayMigrations, LfValueTranslationCache} import com.daml.platform.usermanagement.PersistentUserManagementStore @@ -122,11 +120,8 @@ object SandboxServer { } final class SandboxState( - materializer: Materializer, - metrics: Metrics, - packageStore: InMemoryPackageStore, // nested resource so we can release it independently when restarting - apiServerResource: Resource[ApiServer], + apiServerResource: Resource[ApiServer] ) { def port(implicit executionContext: ExecutionContext): Future[Port] = apiServer.map(_.port) @@ -134,21 +129,6 @@ object SandboxServer { private[SandboxServer] def apiServer: Future[ApiServer] = apiServerResource.asFuture - private[SandboxServer] def reset( - newApiServer: ( - Materializer, - Metrics, - InMemoryPackageStore, - Port, - ) => Resource[ApiServer] - )(implicit executionContext: ExecutionContext): Future[SandboxState] = - for { - currentPort <- port - _ <- release() - replacementApiServer = newApiServer(materializer, metrics, packageStore, currentPort) - _ <- replacementApiServer.asFuture - } yield new SandboxState(materializer, metrics, packageStore, replacementApiServer) - def release(): Future[Unit] = apiServerResource.release() } @@ -211,30 +191,6 @@ final class SandboxServer( def portF(implicit executionContext: ExecutionContext): Future[Port] = apiServer.map(_.port) - def resetAndRestartServer()(implicit - executionContext: ExecutionContext, - loggingContext: LoggingContext, - ): Future[Unit] = { - val apiServicesClosed = apiServer.flatMap(_.servicesClosed()) - - // TODO: eliminate the state mutation somehow - sandboxState = sandboxState.flatMap( - _.reset((materializer, metrics, packageStore, port) => - buildAndStartApiServer( - materializer = materializer, - metrics = metrics, - packageStore = packageStore, - startMode = SqlStartMode.ResetAndStart, - currentPort = Some(port), - ) - ) - ) - - // Wait for the services to be closed, so we can guarantee that future API calls after finishing - // the reset will never be handled by the old one. - apiServicesClosed - } - // if requested, initialize the ledger state with the given scenario private def createInitialState( config: SandboxConfig, @@ -403,15 +359,6 @@ final class SandboxServer( "index" -> indexAndWriteService.indexService, "write" -> indexAndWriteService.writeService, ) - // the reset service is special, since it triggers a server shutdown - resetService = new SandboxResetService( - ledgerId, - () => resetAndRestartServer(), - authorizer, - errorFactories = ErrorFactories( - new ErrorCodesVersionSwitcher(config.enableSelfServiceErrorCodes) - ), - ) executionSequencerFactory <- new ExecutionSequencerFactoryOwner().acquire() apiServicesOwner = new ApiServices.Owner( participantId = config.participantId, @@ -449,7 +396,6 @@ final class SandboxServer( maxDeduplicationDurationEnforced = false, ), )(materializer, executionSequencerFactory, loggingContext) - .map(_.withServices(List(resetService))) apiServer <- new LedgerApiServer( apiServicesOwner, // NOTE: Re-use the same port after reset. @@ -463,8 +409,7 @@ final class SandboxServer( userManagementStore, servicesExecutionContext, errorCodesVersionSwitcher, - ), - resetService, + ) ), servicesExecutionContext, metrics, @@ -524,7 +469,7 @@ final class SandboxServer( .getOrElse(SqlStartMode.MigrateAndStart), currentPort = None, ) - Future.successful(new SandboxState(materializer, metrics, packageStore, apiServerResource)) + Future.successful(new SandboxState(apiServerResource)) } } diff --git a/ledger/sandbox-classic/src/test/lib/scala/platform/sandbox/services/reset/ResetServiceDatabaseIT.scala b/ledger/sandbox-classic/src/test/lib/scala/platform/sandbox/services/reset/ResetServiceDatabaseIT.scala deleted file mode 100644 index c1e9af1ebd76..000000000000 --- a/ledger/sandbox-classic/src/test/lib/scala/platform/sandbox/services/reset/ResetServiceDatabaseIT.scala +++ /dev/null @@ -1,131 +0,0 @@ -// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.daml.platform.sandbox.services.reset - -import java.sql.{Connection, DriverManager} - -import anorm.SqlParser._ -import anorm.{SQL, SqlStringInterpolation} -import com.daml.ledger.api.testing.utils.MockMessages -import com.daml.ledger.resources.{ResourceContext, ResourceOwner} -import com.daml.platform.sandbox.services.reset.ResetServiceDatabaseIT.countRowsOfAllTables -import com.daml.platform.sandbox.services.{DbInfo, SandboxFixture} -import com.daml.platform.store.DbType - -import scala.concurrent.Future -import scala.util.Try - -abstract class ResetServiceDatabaseIT extends ResetServiceITBase with SandboxFixture { - - // Database-backed reset service is allowed a bit more slack - override def spanScaleFactor: Double = 2.0 - - "ResetService" when { - - "run against a database backend" should { - - "leave the tables in the expected state" in { - - val ignored = Set( - "flyway_schema_history", // this is not touched by resets, it's used for migrations - "packages", // preserved by the reset to match the compiled packages still loaded in the engine - ) - - for { - ledgerId <- fetchLedgerId() - party <- allocateParty(MockMessages.party) - _ <- submitAndExpectCompletions(ledgerId, 10, party) - _ <- reset(ledgerId) - counts <- countRowsOfAllTables(ignored, database.get) - } yield { - - val expectedToHaveOneItem = Set( - "parameters" // a new set of parameters is stored at startup - ) - - for ((table, count) <- counts if expectedToHaveOneItem(table)) { - withClue(s"$table has $count item(s): ") { - count shouldBe 1 - } - } - - // FIXME this appears to be racy, forcing us to make a loose check - val expectedToHaveOneItemOrLess = Set( - "configuration_entries" - ) - - for ((table, count) <- counts if expectedToHaveOneItemOrLess(table)) { - withClue(s"$table has $count item(s): ") { - count should be <= 1 - } - } - - // Everything else should be empty - val exceptions = ignored union expectedToHaveOneItem union expectedToHaveOneItemOrLess - val expectedToBeEmpty = counts.keySet.diff(exceptions) - - for ((table, count) <- counts if expectedToBeEmpty(table)) { - withClue(s"$table has $count item(s): ") { - count shouldBe 0 - } - } - - succeed - } - - } - - } - - } - -} - -object ResetServiceDatabaseIT { - - def countRowsOfAllTables( - ignored: Set[String], - dbInfoOwner: ResourceOwner[DbInfo], - )(implicit resourceContext: ResourceContext): Future[Map[String, Int]] = - runQuery(dbInfoOwner)(countRowsOfAllTables(ignored)) - - // Very naive helper, supposed to be used exclusively for testing - private def runQuery[A](dbInfoOwner: ResourceOwner[DbInfo])( - sql: DbType => Connection => A - )(implicit resourceContext: ResourceContext): Future[A] = { - val dbTypeAndConnection = - for { - dbInfo <- dbInfoOwner - _ <- ResourceOwner.forTry[Class[_]](() => Try(Class.forName(dbInfo.dbType.driver))) - connection <- ResourceOwner.forCloseable(() => DriverManager.getConnection(dbInfo.jdbcUrl)) - } yield (dbInfo.dbType, connection) - dbTypeAndConnection.use { case (dbType, connection) => - Future.fromTry(Try(sql(dbType)(connection))) - } - } - - private def listTables(dbType: DbType)(connection: Connection): List[String] = - dbType match { - case DbType.Postgres => - SQL"select tablename from pg_catalog.pg_tables where schemaname != 'pg_catalog' and schemaname != 'information_schema'" - .as(str("tablename").*)(connection) - case DbType.H2Database => - SQL"select table_name from information_schema.tables where table_schema <> 'INFORMATION_SCHEMA'" - .as(str("table_name").*)(connection) - case DbType.Oracle => - SQL"select * from USER_TABLES" - .as(str("table_name").*)(connection) - } - - private def countRows(tableName: String)(connection: Connection): Int = - SQL(s"select count(*) as no_rows from $tableName").as(int("no_rows").single)(connection) - - private def countRowsOfAllTables( - ignored: Set[String] - )(dbType: DbType)(connection: Connection): Map[String, Int] = - listTables(dbType)(connection).collect { - case table if !ignored(table) => table.toLowerCase -> countRows(table)(connection) - }.toMap - -} diff --git a/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/services/LegacyServiceIT.scala b/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/services/LegacyServiceIT.scala index ccbbf81e9bd5..334d002584a5 100644 --- a/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/services/LegacyServiceIT.scala +++ b/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/services/LegacyServiceIT.scala @@ -85,6 +85,15 @@ class LegacyServiceIT } } + private def expectUnimplemented[A](block: => A): Assertion = { + inside(Try(block)) { + case Success(_) => fail() + case Failure(exc: StatusRuntimeException) => + exc.getStatus.getCode shouldBe Status.Code.UNIMPLEMENTED + case Failure(otherwise) => fail(otherwise) + } + } + "Ledger API Server" should { "offer com.digitalasset.ledger.api.v1.ActiveContractsService" in { expectNotUnimplemented { @@ -151,7 +160,7 @@ class LegacyServiceIT } "offer com.digitalasset.ledger.api.v1.testing.ResetService" in { - expectNotUnimplemented { + expectUnimplemented { val testingReset = ResetServiceGrpc.blockingStub(channel).withInterceptors(legacyCallInterceptor) testingReset.reset(ResetRequest(randomLedgerId)) diff --git a/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/services/reflection/ReflectionIT.scala b/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/services/reflection/ReflectionIT.scala index 25da8b004624..ac6a343a2c6f 100644 --- a/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/services/reflection/ReflectionIT.scala +++ b/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/services/reflection/ReflectionIT.scala @@ -32,7 +32,7 @@ final class ReflectionIT "accessed" should { "provide a list of exposed services" in { - val expectedServiceCount: Int = 18 + val expectedServiceCount: Int = 17 for { response <- execRequest(listServices) } yield { diff --git a/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/services/reset/ResetServiceH2DatabaseIT.scala b/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/services/reset/ResetServiceH2DatabaseIT.scala deleted file mode 100644 index 77d93a1f1208..000000000000 --- a/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/services/reset/ResetServiceH2DatabaseIT.scala +++ /dev/null @@ -1,12 +0,0 @@ -// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.daml.platform.sandbox.services.reset - -import com.daml.platform.sandbox.SandboxBackend -import com.daml.platform.sandbox.services.SandboxFixture - -final class ResetServiceH2DatabaseIT - extends ResetServiceDatabaseIT - with SandboxFixture - with SandboxBackend.H2Database diff --git a/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/services/reset/ResetServiceIT.scala b/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/services/reset/ResetServiceIT.scala deleted file mode 100644 index 8640f6e9d25d..000000000000 --- a/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/services/reset/ResetServiceIT.scala +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.daml.platform.sandbox.services.reset - -import com.daml.platform.sandbox.services.SandboxFixture - -import scala.concurrent.Await -import scala.concurrent.duration.DurationInt -import scala.ref.WeakReference - -final class ResetServiceIT extends ResetServiceITBase with SandboxFixture { - "ResetService" when { - "state is reset" should { - "clear out all garbage" in { - val state = new WeakReference(Await.result(server.sandboxState, 5.seconds)) - for { - lid <- fetchLedgerId() - _ <- reset(lid) - } yield { - System.gc() - state.get should be(None) - } - } - } - } -} diff --git a/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/services/reset/ResetServicePostgresqlIT.scala b/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/services/reset/ResetServicePostgresqlIT.scala deleted file mode 100644 index b918da30c0f9..000000000000 --- a/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/services/reset/ResetServicePostgresqlIT.scala +++ /dev/null @@ -1,12 +0,0 @@ -// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.daml.platform.sandbox.services.reset - -import com.daml.platform.sandbox.SandboxBackend -import com.daml.platform.sandbox.services.SandboxFixture - -final class ResetServicePostgresqlIT - extends ResetServiceDatabaseIT - with SandboxFixture - with SandboxBackend.Postgresql diff --git a/ledger/sandbox-common/BUILD.bazel b/ledger/sandbox-common/BUILD.bazel index 0d3fac6ea77d..f36a5bebe030 100644 --- a/ledger/sandbox-common/BUILD.bazel +++ b/ledger/sandbox-common/BUILD.bazel @@ -34,7 +34,6 @@ da_scala_library( "//ledger-service/jwt", "//ledger/caching", "//ledger/cli-opts", - "//ledger/error", "//ledger/ledger-api-auth", "//ledger/ledger-api-common", "//ledger/ledger-api-domain", @@ -132,18 +131,15 @@ da_scala_library( "//ledger/ledger-api-domain", "//ledger/ledger-configuration", "//ledger/ledger-resources", - "//ledger/ledger-resources:ledger-resources-test-lib", "//ledger/metrics", "//ledger/participant-integration-api", "//ledger/test-common", "//ledger/test-common:dar-files-default-lib", - "//libs-scala/contextualized-logging", "//libs-scala/ports", "//libs-scala/postgresql-testing", "//libs-scala/resources", "//libs-scala/resources-akka", "//libs-scala/resources-grpc", - "//libs-scala/timer-utils", "@maven//:ch_qos_logback_logback_classic", "@maven//:com_auth0_java_jwt", "@maven//:io_netty_netty_handler", diff --git a/ledger/sandbox-common/src/main/scala/platform/sandbox/services/SandboxResetService.scala b/ledger/sandbox-common/src/main/scala/platform/sandbox/services/SandboxResetService.scala deleted file mode 100644 index a4f850843179..000000000000 --- a/ledger/sandbox-common/src/main/scala/platform/sandbox/services/SandboxResetService.scala +++ /dev/null @@ -1,84 +0,0 @@ -// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.daml.platform.sandbox.services - -import java.util.concurrent.atomic.AtomicBoolean - -import com.daml.error.DamlContextualizedErrorLogger -import com.daml.ledger.api.auth.Authorizer -import com.daml.ledger.api.domain.LedgerId -import com.daml.ledger.api.v1.testing.reset_service.{ResetRequest, ResetServiceGrpc} -import com.daml.logging.{ContextualizedLogger, LoggingContext} -import com.daml.platform.server.api.validation.ErrorFactories -import com.google.protobuf.empty.Empty -import io.grpc.ServerCall.Listener -import io.grpc._ - -import scala.concurrent.{ExecutionContext, Future} - -class SandboxResetService( - ledgerId: LedgerId, - resetAndRestartServer: () => Future[Unit], - authorizer: Authorizer, - errorFactories: ErrorFactories, -)(implicit loggingContext: LoggingContext) - extends ResetServiceGrpc.ResetService - with BindableService - with ServerInterceptor { - - private val logger = ContextualizedLogger.get(this.getClass) - private implicit val contextualizedErrorLogger: DamlContextualizedErrorLogger = - new DamlContextualizedErrorLogger(logger, loggingContext, None) - - private val resetInitialized = new AtomicBoolean(false) - - override def bindService(): ServerServiceDefinition = - ResetServiceGrpc.bindService(this, ExecutionContext.parasitic) - - override def reset(request: ResetRequest): Future[Empty] = - authorizer.requireAdminClaims(doReset)(request) - - override def interceptCall[ReqT, RespT]( - serverCall: ServerCall[ReqT, RespT], - metadata: Metadata, - serverCallHandler: ServerCallHandler[ReqT, RespT], - ): Listener[ReqT] = { - if (resetInitialized.get) { - throw errorFactories - .serviceIsBeingReset(Status.Code.UNAVAILABLE.value())("Sandbox server") - } - - serverCallHandler.startCall(serverCall, metadata) - } - - // to reset: - // * initiate a graceful shutdown -- note that this won't kill the in-flight requests, including - // the reset request itself we're serving; - // * serve the response to the reset request; - // * then, close all the services so hopefully the graceful shutdown will terminate quickly... - // * ...but not before serving the request to the reset request itself, which we've already done. - private def doReset(request: ResetRequest): Future[Empty] = - Either - .cond( - ledgerId == LedgerId(request.ledgerId), - request.ledgerId, - errorFactories.ledgerIdMismatch(ledgerId, LedgerId(request.ledgerId), None), - ) - .fold( - Future.failed[Empty], - _ => actuallyReset().map(_ => Empty())(ExecutionContext.parasitic), - ) - - private def actuallyReset() = { - logger.info("Initiating server reset.") - - if (!resetInitialized.compareAndSet(false, true)) { - throw errorFactories - .serviceIsBeingReset(Status.Code.FAILED_PRECONDITION.value())("Sandbox server") - } - - logger.info(s"Stopping and starting the server.") - resetAndRestartServer() - } -} diff --git a/ledger/sandbox-common/src/test/lib/scala/platform/sandbox/services/reset/ResetServiceITBase.scala b/ledger/sandbox-common/src/test/lib/scala/platform/sandbox/services/reset/ResetServiceITBase.scala deleted file mode 100644 index d805be946d82..000000000000 --- a/ledger/sandbox-common/src/test/lib/scala/platform/sandbox/services/reset/ResetServiceITBase.scala +++ /dev/null @@ -1,303 +0,0 @@ -// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.daml.platform.sandbox.services.reset - -import java.io.File -import java.time.Instant -import java.util.UUID -import com.daml.api.util.TimestampConversion -import com.daml.bazeltools.BazelRunfiles.rlocation -import com.daml.ledger.api.domain.LedgerId -import com.daml.ledger.api.testing.utils.{ - IsStatusException, - SuiteResourceManagementAroundAll, - MockMessages => M, -} -import com.daml.ledger.api.v1.active_contracts_service.{ - ActiveContractsServiceGrpc, - GetActiveContractsRequest, - GetActiveContractsResponse, -} -import com.daml.ledger.api.v1.admin.party_management_service.{ - AllocatePartyRequest, - PartyManagementServiceGrpc, -} -import com.daml.ledger.api.v1.command_completion_service.{ - CommandCompletionServiceGrpc, - CompletionStreamRequest, -} -import com.daml.ledger.api.v1.command_service.{CommandServiceGrpc, SubmitAndWaitRequest} -import com.daml.ledger.api.v1.command_submission_service.CommandSubmissionServiceGrpc -import com.daml.ledger.api.v1.event.CreatedEvent -import com.daml.ledger.api.v1.ledger_identity_service.{ - GetLedgerIdentityRequest, - LedgerIdentityServiceGrpc, -} -import com.daml.ledger.api.v1.package_service.{ListPackagesRequest, PackageServiceGrpc} -import com.daml.ledger.api.v1.testing.reset_service.{ResetRequest, ResetServiceGrpc} -import com.daml.ledger.api.v1.testing.time_service.{ - GetTimeRequest, - GetTimeResponse, - SetTimeRequest, - TimeServiceGrpc, -} -import com.daml.ledger.api.v1.transaction_filter.TransactionFilter -import com.daml.ledger.resources.TestResourceContext -import com.daml.ledger.test.ModelTestDar -import com.daml.logging.{ContextualizedLogger, LoggingContext} -import com.daml.platform.common.LedgerIdMode -import com.daml.platform.sandbox.AbstractSandboxFixture -import com.daml.platform.sandbox.config.SandboxConfig -import com.daml.platform.sandbox.services.TestCommands -import com.daml.platform.services.time.TimeProviderType -import com.daml.platform.testing.{StreamConsumer, WaitForCompletionsObserver} -import com.daml.timer.RetryStrategy -import com.google.protobuf.empty.Empty -import io.grpc.Status -import org.scalatest.concurrent.{AsyncTimeLimitedTests, ScalaFutures} -import org.scalatest.matchers.should.Matchers -import org.scalatest.time.Span -import org.scalatest.wordspec.AsyncWordSpec -import scalaz.syntax.tag._ - -import scala.concurrent.duration.{DurationInt, DurationLong, FiniteDuration, MILLISECONDS} -import scala.concurrent.{ExecutionContext, Future} -import scala.util.{Failure, Success} - -abstract class ResetServiceITBase - extends AsyncWordSpec - with AsyncTimeLimitedTests - with Matchers - with ScalaFutures - with TestResourceContext - with AbstractSandboxFixture - with SuiteResourceManagementAroundAll - with TestCommands { - - protected val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass) - protected implicit val loggingContext: LoggingContext = LoggingContext.ForTesting - - override def timeLimit: Span = scaled(30.seconds) - - override protected def config: SandboxConfig = - super.config.copy(ledgerIdMode = LedgerIdMode.Dynamic) - - protected implicit val ec: ExecutionContext = ExecutionContext.global - - override protected def darFile: File = new File(rlocation(ModelTestDar.path)) - - // Must try for at least (5 seconds * scale factor), as that's what the tests require. - private val eventually: RetryStrategy = RetryStrategy.constant(50, scaled(100.milliseconds)) - - private val timeIsStatic: Boolean = { - val timeProviderType = config.timeProviderType.getOrElse(SandboxConfig.DefaultTimeProviderType) - timeProviderType == TimeProviderType.Static - } - - protected def fetchLedgerId(): Future[LedgerId] = - LedgerIdentityServiceGrpc - .stub(channel) - .getLedgerIdentity(GetLedgerIdentityRequest()) - .map(response => LedgerId(response.ledgerId)) - - protected def waitForLedgerToRestart(oldLedgerId: LedgerId): Future[LedgerId] = - eventually { (_, _) => - fetchLedgerId().filter(_ != oldLedgerId) - } - - // Resets and waits for a new ledger identity to be available - protected def reset(ledgerId: LedgerId): Future[LedgerId] = - timedReset(ledgerId).map(_._1) - - protected def timedReset(ledgerId: LedgerId): Future[(LedgerId, FiniteDuration)] = { - logger.info(s"Calling reset on $ledgerId") - val start = System.nanoTime() - ResetServiceGrpc - .stub(channel) - .reset(ResetRequest(ledgerId.unwrap)) - .flatMap(_ => waitForLedgerToRestart(ledgerId)) - .map(_ -> (System.nanoTime() - start).nanos) - .andThen { - case Success((newLedgerId, d)) => - info(s"Ledger $ledgerId reset") - logger.info( - s"Reset finished on $ledgerId after ${FiniteDuration(d.toMillis, MILLISECONDS)}, new ledgerId is $newLedgerId" - ) - case Failure(e) => - logger.warn(s"Reset failed on $ledgerId because of $e") - } - } - - protected def allocateParty(hint: String): Future[String] = - PartyManagementServiceGrpc - .stub(channel) - .allocateParty(AllocatePartyRequest(hint)) - .map(_.partyDetails.get.party) - - protected def submitAndWait(req: SubmitAndWaitRequest): Future[Empty] = - CommandServiceGrpc.stub(channel).submitAndWait(req) - - protected def activeContracts( - ledgerId: LedgerId, - f: TransactionFilter, - ): Future[Set[CreatedEvent]] = - new StreamConsumer[GetActiveContractsResponse]( - ActiveContractsServiceGrpc - .stub(channel) - .getActiveContracts(GetActiveContractsRequest(ledgerId.unwrap, Some(f)), _) - ) - .all() - .map(_.view.flatMap(_.activeContracts).toSet) - - protected def listPackages(ledgerId: LedgerId): Future[Seq[String]] = - PackageServiceGrpc - .stub(channel) - .listPackages(ListPackagesRequest(ledgerId.unwrap)) - .map(_.packageIds) - - protected def submitAndExpectCompletions( - ledgerId: LedgerId, - commands: Int, - party: String, - ): Future[Unit] = - for { - _ <- Future.sequence( - Vector.fill(commands)( - CommandSubmissionServiceGrpc - .stub(channel) - .submit(dummyCommands(ledgerId, UUID.randomUUID.toString, party)) - ) - ) - unit <- WaitForCompletionsObserver(commands)( - CommandCompletionServiceGrpc - .stub(channel) - .completionStream( - CompletionStreamRequest( - ledgerId = ledgerId.unwrap, - applicationId = M.applicationId, - parties = Seq(party), - offset = Some(M.ledgerBegin), - ), - _, - ) - ) - } yield unit - - protected def getTime(ledgerId: LedgerId): Future[Instant] = - new StreamConsumer[GetTimeResponse]( - TimeServiceGrpc.stub(channel).getTime(GetTimeRequest(ledgerId.unwrap), _) - ) - .first() - .map(_.flatMap(_.currentTime).map(TimestampConversion.toInstant).get) - - protected def setTime(ledgerId: LedgerId, currentTime: Instant, newTime: Instant): Future[Unit] = - TimeServiceGrpc - .stub(channel) - .setTime( - SetTimeRequest( - ledgerId.unwrap, - Some(TimestampConversion.fromInstant(currentTime)), - Some(TimestampConversion.fromInstant(newTime)), - ) - ) - .map(_ => ()) - - "ResetService" when { - "state is reset" should { - "return a new ledger ID" in { - for { - lid1 <- fetchLedgerId() - lid2 <- reset(lid1) - throwable <- reset(lid1).failed - } yield { - lid1 should not equal lid2 - IsStatusException(Status.Code.NOT_FOUND)(throwable) - } - } - - "return new ledger ID - 10 resets" in { - Future - .sequence(Iterator.iterate(fetchLedgerId())(_.flatMap(reset)).take(10).toVector) - .map(ids => ids.distinct should have size 10L) - } - - // 4 attempts with 5 transactions each seem to strike the right balance to complete before the - // 30 seconds test timeout in normal conditions while still causing the test to fail if - // something goes wrong. - // - // The 10 seconds timeout built into the context's ledger reset will be hit if something goes - // horribly wrong, causing an exception to report "waitForNewLedger: out of retries". - val expectedResetCompletionTime: FiniteDuration = scaled(5.seconds) - s"consistently complete within $expectedResetCompletionTime" in { - val numberOfCommands = 5 - val numberOfAttempts = 4 - Future - .sequence( - Iterator - .iterate(fetchLedgerId()) { ledgerIdF => - for { - ledgerId <- ledgerIdF - party <- allocateParty(M.party) - _ <- submitAndExpectCompletions(ledgerId, numberOfCommands, party) - (newLedgerId, completionTime) <- timedReset(ledgerId) - _ = completionTime should be <= expectedResetCompletionTime - } yield newLedgerId - } - .take(numberOfAttempts) - ) - .map(_ => succeed) - } - - "remove contracts from ACS after reset" in { - for { - ledgerId <- fetchLedgerId() - party <- allocateParty(M.party) - request = dummyCommands(ledgerId, "commandId1", party) - _ <- submitAndWait(SubmitAndWaitRequest(commands = request.commands)) - events <- activeContracts(ledgerId, M.transactionFilter) - _ = events should have size 3 - newLid <- reset(ledgerId) - newEvents <- activeContracts(newLid, M.transactionFilter) - } yield { - newEvents should have size 0 - } - } - - "retain previously uploaded packages" in { - for { - ledgerId <- fetchLedgerId() - packagesBeforeReset <- eventually { (_, _) => - listPackages(ledgerId).map { packages => - packages.size should be > 0 - packages - } - } - newLid <- reset(ledgerId) - packagesAfterReset <- listPackages(newLid) - } yield { - packagesBeforeReset should contain theSameElementsAs packagesAfterReset - } - } - - if (timeIsStatic) { - "reset the time to the epoch" in { - for { - ledgerId <- fetchLedgerId() - epoch <- getTime(ledgerId) - - now = Instant.now() - _ <- setTime(ledgerId, epoch, now) - newTime <- getTime(ledgerId) - _ = newTime should not be epoch - - newLedgerId <- reset(ledgerId) - resetTime <- getTime(newLedgerId) - } yield { - resetTime should be(epoch) - } - } - } - } - } -} diff --git a/ledger/sandbox-perf/src/perf/lib/scala/com/digitalasset/platform/sandbox/perf/LedgerContext.scala b/ledger/sandbox-perf/src/perf/lib/scala/com/digitalasset/platform/sandbox/perf/LedgerContext.scala index 0fb2ec100faf..32750c4047b9 100644 --- a/ledger/sandbox-perf/src/perf/lib/scala/com/digitalasset/platform/sandbox/perf/LedgerContext.scala +++ b/ledger/sandbox-perf/src/perf/lib/scala/com/digitalasset/platform/sandbox/perf/LedgerContext.scala @@ -16,11 +16,8 @@ import com.daml.ledger.api.v1.ledger_identity_service.{ GetLedgerIdentityRequest, LedgerIdentityServiceGrpc, } -import com.daml.ledger.api.v1.testing.reset_service.ResetServiceGrpc.ResetService -import com.daml.ledger.api.v1.testing.reset_service.{ResetRequest, ResetServiceGrpc} import io.grpc.{Channel, StatusRuntimeException} import org.slf4j.LoggerFactory -import scalaz.syntax.tag._ import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} @@ -63,7 +60,6 @@ final class LedgerContext(channel: Channel, packageIds: Iterable[PackageId])(imp } } for { - _ <- resetService.reset(ResetRequest(ledgerId.unwrap)) _ <- waitForNewLedger(10) } yield new LedgerContext(channel, packageIds) } @@ -77,7 +73,4 @@ final class LedgerContext(channel: Channel, packageIds: Iterable[PackageId])(imp def acsService: ActiveContractsServiceStub = ActiveContractsServiceGrpc.stub(channel) - def resetService: ResetService = - ResetServiceGrpc.stub(channel) - } diff --git a/ledger/sandbox/BUILD.bazel b/ledger/sandbox/BUILD.bazel index 48576903dc2a..002fd3215792 100644 --- a/ledger/sandbox/BUILD.bazel +++ b/ledger/sandbox/BUILD.bazel @@ -46,7 +46,6 @@ alias( "//daml-lf/transaction", "//language-support/scala/bindings", "//ledger/caching", - "//ledger/error", "//ledger/ledger-api-auth", "//ledger/ledger-api-common", "//ledger/ledger-api-domain", @@ -146,23 +145,20 @@ da_scala_library( openssl_executable = "@openssl_dev_env//:bin/openssl" if not is_windows else "@openssl_dev_env//:usr/bin/openssl.exe" -# The reset service is cursed so we mark all tests involving it as flaky. -reset_service_pattern = "src/test/suite/**/*ResetService*.scala" - [ da_scala_test_suite( - name = "sandbox-tests{}".format(suffix), - size = size, + name = "sandbox-tests", + size = "medium", srcs = glob( - [pattern], - exclude = exclusions, + ["src/test/suite/**/*.scala"], + exclude = [], ), data = [ "//ledger/test-common:dar-files", "//ledger/test-common/test-certificates", openssl_executable, ], - flaky = flaky, + flaky = False, jvm_flags = [ "-Djava.security.debug=\"certpath ocsp\"", # This facilitates debugging of the OCSP checks mechanism ], @@ -239,23 +235,7 @@ reset_service_pattern = "src/test/suite/**/*ResetService*.scala" "@maven//:org_scalatest_scalatest_compatible", "@maven//:org_slf4j_slf4j_api", ], - ) - for (suffix, pattern, exclusions, flaky, size) in [ - ( - "", - "src/test/suite/**/*.scala", - [reset_service_pattern], - False, - "medium", # Default timeout of 5min - ), - ( - "-resetservice", - reset_service_pattern, - [], - True, - "large", # Reset service tests have a large variance in their run time, use a timeout of 15min - ), - ] + ), ] # Full conformance test diff --git a/ledger/sandbox/src/main/scala/platform/sandboxnext/Runner.scala b/ledger/sandbox/src/main/scala/platform/sandboxnext/Runner.scala index f35d0c460fe1..61305e16d5a9 100644 --- a/ledger/sandbox/src/main/scala/platform/sandboxnext/Runner.scala +++ b/ledger/sandbox/src/main/scala/platform/sandboxnext/Runner.scala @@ -4,7 +4,7 @@ package com.daml.platform.sandboxnext import java.io.File -import java.time.{Clock, Instant} +import java.time.Instant import java.util.UUID import java.util.concurrent.Executors @@ -13,9 +13,7 @@ import akka.stream.Materializer import com.daml.api.util.TimeProvider import com.daml.buildinfo.BuildInfo import com.daml.caching -import com.daml.error.ErrorCodesVersionSwitcher -import com.daml.ledger.api.auth.{AuthServiceWildcard, Authorizer} -import com.daml.ledger.api.domain +import com.daml.ledger.api.auth.AuthServiceWildcard import com.daml.ledger.api.health.HealthChecks import com.daml.ledger.api.v1.experimental_features.{ CommandDeduplicationFeatures, @@ -49,14 +47,11 @@ import com.daml.platform.indexer.{IndexerConfig, IndexerStartupMode, StandaloneI import com.daml.platform.sandbox.banner.Banner import com.daml.platform.sandbox.config.SandboxConfig import com.daml.platform.sandbox.config.SandboxConfig.EngineMode -import com.daml.platform.sandbox.services.SandboxResetService import com.daml.platform.sandboxnext.Runner._ -import com.daml.platform.server.api.validation.ErrorFactories import com.daml.platform.services.time.TimeProviderType import com.daml.platform.store.{DbSupport, LfValueTranslationCache} import com.daml.platform.usermanagement.PersistentUserManagementStore import com.daml.ports.Port -import com.daml.resources.ResettableResourceOwner import com.daml.telemetry.{DefaultTelemetry, SpanKind, SpanName} import scalaz.syntax.tag._ @@ -93,27 +88,26 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] { new Engine(engineConfig) } - private val (ledgerType, ledgerJdbcUrl, indexJdbcUrl, startupMode): ( + private val (ledgerType, ledgerJdbcUrl, indexJdbcUrl): ( String, String, String, - StartupMode, ) = config.jdbcUrl match { case Some(url) if url.startsWith("jdbc:postgresql:") => - ("PostgreSQL", url, url, StartupMode.MigrateAndStart) + ("PostgreSQL", url, url) case Some(url) if url.startsWith("jdbc:h2:mem:") => - ("in-memory", InMemoryLedgerJdbcUrl, url, StartupMode.MigrateAndStart) + ("in-memory", InMemoryLedgerJdbcUrl, url) case Some(url) if url.startsWith("jdbc:h2:") => throw new InvalidDatabaseException( "This version of Sandbox does not support file-based H2 databases. Please use SQLite instead." ) case Some(url) if url.startsWith("jdbc:sqlite:") => - ("SQLite", url, InMemoryIndexJdbcUrl, StartupMode.MigrateAndStart) + ("SQLite", url, InMemoryIndexJdbcUrl) case Some(_) => throw new InvalidDatabaseException(s"Unknown database") case None => - ("in-memory", InMemoryLedgerJdbcUrl, InMemoryIndexJdbcUrl, StartupMode.MigrateAndStart) + ("in-memory", InMemoryLedgerJdbcUrl, InMemoryIndexJdbcUrl) } private val timeProviderType = @@ -145,223 +139,173 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] { servicesExecutionContext <- ResourceOwner.forExecutorService(() => ExecutionContext.fromExecutorService(Executors.newWorkStealingPool()) ) - apiServer <- ResettableResourceOwner[ - ResourceContext, - ApiServer, - (Option[Port], StartupMode), - ]( - initialValue = (None, startupMode), - owner = reset => { case (currentPort, startupMode) => - val isReset = startupMode == StartupMode.ResetAndStart - val ledgerId = specifiedLedgerId.getOrElse(UUID.randomUUID().toString) - val timeServiceBackend = timeProviderType match { - case TimeProviderType.Static => - Some(TimeServiceBackend.simple(Instant.EPOCH)) - case TimeProviderType.WallClock => - None - } - for { - readerWriter <- new SqlLedgerReaderWriter.Owner( - ledgerId = ledgerId, - participantId = config.participantId, - metrics = metrics, - engine = engine, - jdbcUrl = ledgerJdbcUrl, - resetOnStartup = isReset, - offsetVersion = 0, - logEntryIdAllocator = - new SeedServiceLogEntryIdAllocator(SeedService(config.seeding.get)), - stateValueCache = caching.WeightedCache.from( - caching.WeightedCache.Configuration( - maximumWeight = MaximumStateValueCacheSize - ) - ), - timeProvider = timeServiceBackend.getOrElse(TimeProvider.UTC), - ) - readService = new TimedReadService( - KeyValueParticipantStateReader( - readerWriter, - metrics, - enableSelfServiceErrorCodes = config.enableSelfServiceErrorCodes, - ), - metrics, - ) - writeService = new TimedWriteService( - new KeyValueParticipantStateWriter( - new TimedLedgerWriter(readerWriter, metrics), - metrics, - ), - metrics, - ) - _ <- - if (isReset) { - ResourceOwner.unit - } else { - ResourceOwner - .forFuture(() => - Future.sequence(config.damlPackages.map(uploadDar(_, writeService))) - ) - .map(_ => ()) - } - indexer <- new StandaloneIndexerServer( - readService = readService, - config = IndexerConfig( - participantId = config.participantId, - jdbcUrl = indexJdbcUrl, - startupMode = - if (isReset) IndexerStartupMode.ResetAndStart - else IndexerStartupMode.MigrateAndStart, - eventsPageSize = config.eventsPageSize, - allowExistingSchema = true, - enableCompression = config.enableCompression, - ), - servicesExecutionContext = servicesExecutionContext, - metrics = metrics, - lfValueTranslationCache = lfValueTranslationCache, - ) - healthChecks = new HealthChecks( - "read" -> readService, - "write" -> writeService, - "indexer" -> indexer, - ) - // Required to tie the loop between the API server and the reset service. - apiServerServicesClosed = Promise[Unit]() - resetService = { - val clock = Clock.systemUTC() - val authorizer = - new Authorizer( - () => clock.instant(), - ledgerId, - config.participantId, - new ErrorCodesVersionSwitcher(config.enableSelfServiceErrorCodes), - ) - new SandboxResetService( - domain.LedgerId(ledgerId), - () => { - // Don't block the reset request; just wait until the services are closed. - // Otherwise we end up in deadlock, because the server won't shut down until - // all requests are completed. - reset() - apiServerServicesClosed.future - }, - authorizer, - errorFactories = ErrorFactories( - new ErrorCodesVersionSwitcher(config.enableSelfServiceErrorCodes) - ), - ) - } - apiServerConfig = ApiServerConfig( - participantId = config.participantId, - archiveFiles = if (isReset) List.empty else config.damlPackages, - // Re-use the same port when resetting the server. - port = currentPort.getOrElse(config.port), - address = config.address, - jdbcUrl = indexJdbcUrl, - databaseConnectionPoolSize = config.databaseConnectionPoolSize, - databaseConnectionTimeout = config.databaseConnectionTimeout, - tlsConfig = config.tlsConfig, - maxInboundMessageSize = config.maxInboundMessageSize, - initialLedgerConfiguration = Some(config.initialLedgerConfiguration), - configurationLoadTimeout = config.configurationLoadTimeout, - eventsPageSize = config.eventsPageSize, - portFile = config.portFile, - // TODO append-only: augment the following defaults for enabling the features for sandbox next - seeding = config.seeding.get, - managementServiceTimeout = config.managementServiceTimeout, - maxContractStateCacheSize = 0L, - maxContractKeyStateCacheSize = 0L, - enableMutableContractStateCache = false, - maxTransactionsInMemoryFanOutBufferSize = 0L, - enableInMemoryFanOutForLedgerApi = false, - enableSelfServiceErrorCodes = config.enableSelfServiceErrorCodes, - ) - dbSupport <- DbSupport.owner( - jdbcUrl = apiServerConfig.jdbcUrl, - serverRole = ServerRole.ApiServer, - connectionPoolSize = apiServerConfig.databaseConnectionPoolSize, - connectionTimeout = apiServerConfig.databaseConnectionTimeout, - metrics = metrics, - ) - userManagementStore = PersistentUserManagementStore.cached( - dbDispatcher = dbSupport.dbDispatcher, - metrics = metrics, - cacheExpiryAfterWriteInSeconds = - config.userManagementConfig.cacheExpiryAfterWriteInSeconds, - maximumCacheSize = config.userManagementConfig.maximumCacheSize, - )(servicesExecutionContext) - indexService <- StandaloneIndexService( - dbSupport = dbSupport, - ledgerId = ledgerId, - config = apiServerConfig, - metrics = metrics, - engine = engine, - servicesExecutionContext = servicesExecutionContext, - lfValueTranslationCache = lfValueTranslationCache, - ) - writeServiceWithDeduplicationSupport = WriteServiceWithDeduplicationSupport( - writeService, - indexService, - config.enableSelfServiceErrorCodes, - ) - apiServer <- StandaloneApiServer( - indexService = indexService, - userManagementStore = userManagementStore, - ledgerId = ledgerId, - config = apiServerConfig, - engine = engine, - commandConfig = config.commandConfig, - partyConfig = PartyConfiguration.default.copy( - implicitPartyAllocation = config.implicitPartyAllocation - ), - submissionConfig = SubmissionConfiguration.default, - optWriteService = Some(writeServiceWithDeduplicationSupport), - authService = authService, - healthChecks = healthChecks, - metrics = metrics, - timeServiceBackend = timeServiceBackend, - otherServices = List(resetService), - otherInterceptors = List(resetService), - servicesExecutionContext = servicesExecutionContext, - commandDeduplicationFeatures = CommandDeduplicationFeatures.of( - deduplicationPeriodSupport = Some( - CommandDeduplicationPeriodSupport.of( - offsetSupport = - CommandDeduplicationPeriodSupport.OffsetSupport.OFFSET_CONVERT_TO_DURATION, - durationSupport = - CommandDeduplicationPeriodSupport.DurationSupport.DURATION_NATIVE_SUPPORT, - ) - ), - deduplicationType = CommandDeduplicationType.ASYNC_ONLY, - maxDeduplicationDurationEnforced = true, - ), - ) - _ = apiServerServicesClosed.completeWith(apiServer.servicesClosed()) - } yield { - Banner.show(Console.out) - logger.withoutContext.info( - "Initialized sandbox version {} with ledger-id = {}, port = {}, dar file = {}, time mode = {}, ledger = {}, auth-service = {}, contract ids seeding = {}{}{}", - BuildInfo.Version, - ledgerId, - apiServer.port.toString, - config.damlPackages, - timeProviderType.description, - ledgerType, - authService.getClass.getSimpleName, - config.seeding.get.name, - if (config.stackTraces) "" else ", stack traces = no", - config.profileDir match { - case None => "" - case Some(profileDir) => s", profile directory = $profileDir" - }, + ledgerId = specifiedLedgerId.getOrElse(UUID.randomUUID().toString) + timeServiceBackend = timeProviderType match { + case TimeProviderType.Static => + Some(TimeServiceBackend.simple(Instant.EPOCH)) + case TimeProviderType.WallClock => + None + } + readerWriter <- new SqlLedgerReaderWriter.Owner( + ledgerId = ledgerId, + participantId = config.participantId, + metrics = metrics, + engine = engine, + jdbcUrl = ledgerJdbcUrl, + resetOnStartup = false, + offsetVersion = 0, + logEntryIdAllocator = new SeedServiceLogEntryIdAllocator(SeedService(config.seeding.get)), + stateValueCache = caching.WeightedCache.from( + caching.WeightedCache.Configuration( + maximumWeight = MaximumStateValueCacheSize + ) + ), + timeProvider = timeServiceBackend.getOrElse(TimeProvider.UTC), + ) + readService = new TimedReadService( + KeyValueParticipantStateReader( + readerWriter, + metrics, + enableSelfServiceErrorCodes = config.enableSelfServiceErrorCodes, + ), + metrics, + ) + writeService = new TimedWriteService( + new KeyValueParticipantStateWriter( + new TimedLedgerWriter(readerWriter, metrics), + metrics, + ), + metrics, + ) + _ <- ResourceOwner + .forFuture(() => Future.sequence(config.damlPackages.map(uploadDar(_, writeService)))) + .map(_ => ()) + + indexer <- new StandaloneIndexerServer( + readService = readService, + config = IndexerConfig( + participantId = config.participantId, + jdbcUrl = indexJdbcUrl, + startupMode = IndexerStartupMode.MigrateAndStart, + eventsPageSize = config.eventsPageSize, + allowExistingSchema = true, + enableCompression = config.enableCompression, + ), + servicesExecutionContext = servicesExecutionContext, + metrics = metrics, + lfValueTranslationCache = lfValueTranslationCache, + ) + healthChecks = new HealthChecks( + "read" -> readService, + "write" -> writeService, + "indexer" -> indexer, + ) + // Required to tie the loop between the API server and the reset service. + apiServerServicesClosed = Promise[Unit]() + apiServerConfig = ApiServerConfig( + participantId = config.participantId, + archiveFiles = config.damlPackages, + // Re-use the same port when resetting the server. + port = config.port, + address = config.address, + jdbcUrl = indexJdbcUrl, + databaseConnectionPoolSize = config.databaseConnectionPoolSize, + databaseConnectionTimeout = config.databaseConnectionTimeout, + tlsConfig = config.tlsConfig, + maxInboundMessageSize = config.maxInboundMessageSize, + initialLedgerConfiguration = Some(config.initialLedgerConfiguration), + configurationLoadTimeout = config.configurationLoadTimeout, + eventsPageSize = config.eventsPageSize, + portFile = config.portFile, + // TODO append-only: augment the following defaults for enabling the features for sandbox next + seeding = config.seeding.get, + managementServiceTimeout = config.managementServiceTimeout, + maxContractStateCacheSize = 0L, + maxContractKeyStateCacheSize = 0L, + enableMutableContractStateCache = false, + maxTransactionsInMemoryFanOutBufferSize = 0L, + enableInMemoryFanOutForLedgerApi = false, + enableSelfServiceErrorCodes = config.enableSelfServiceErrorCodes, + ) + dbSupport <- DbSupport.owner( + jdbcUrl = apiServerConfig.jdbcUrl, + serverRole = ServerRole.ApiServer, + connectionPoolSize = apiServerConfig.databaseConnectionPoolSize, + connectionTimeout = apiServerConfig.databaseConnectionTimeout, + metrics = metrics, + ) + userManagementStore = PersistentUserManagementStore.cached( + dbDispatcher = dbSupport.dbDispatcher, + metrics = metrics, + cacheExpiryAfterWriteInSeconds = + config.userManagementConfig.cacheExpiryAfterWriteInSeconds, + maximumCacheSize = config.userManagementConfig.maximumCacheSize, + )(servicesExecutionContext) + indexService <- StandaloneIndexService( + dbSupport = dbSupport, + ledgerId = ledgerId, + config = apiServerConfig, + metrics = metrics, + engine = engine, + servicesExecutionContext = servicesExecutionContext, + lfValueTranslationCache = lfValueTranslationCache, + ) + writeServiceWithDeduplicationSupport = WriteServiceWithDeduplicationSupport( + writeService, + indexService, + config.enableSelfServiceErrorCodes, + ) + apiServer <- StandaloneApiServer( + indexService = indexService, + userManagementStore = userManagementStore, + ledgerId = ledgerId, + config = apiServerConfig, + engine = engine, + commandConfig = config.commandConfig, + partyConfig = PartyConfiguration.default.copy( + implicitPartyAllocation = config.implicitPartyAllocation + ), + submissionConfig = SubmissionConfiguration.default, + optWriteService = Some(writeServiceWithDeduplicationSupport), + authService = authService, + healthChecks = healthChecks, + metrics = metrics, + timeServiceBackend = timeServiceBackend, + servicesExecutionContext = servicesExecutionContext, + commandDeduplicationFeatures = CommandDeduplicationFeatures.of( + deduplicationPeriodSupport = Some( + CommandDeduplicationPeriodSupport.of( + offsetSupport = + CommandDeduplicationPeriodSupport.OffsetSupport.OFFSET_CONVERT_TO_DURATION, + durationSupport = + CommandDeduplicationPeriodSupport.DurationSupport.DURATION_NATIVE_SUPPORT, ) - apiServer - } + ), + deduplicationType = CommandDeduplicationType.ASYNC_ONLY, + maxDeduplicationDurationEnforced = true, + ), + ) + _ = apiServerServicesClosed.completeWith(apiServer.servicesClosed()) + } yield { + Banner.show(Console.out) + logger.withoutContext.info( + "Initialized sandbox version {} with ledger-id = {}, port = {}, dar file = {}, time mode = {}, ledger = {}, auth-service = {}, contract ids seeding = {}{}{}", + BuildInfo.Version, + ledgerId, + apiServer.port.toString, + config.damlPackages, + timeProviderType.description, + ledgerType, + authService.getClass.getSimpleName, + config.seeding.get.name, + if (config.stackTraces) "" else ", stack traces = no", + config.profileDir match { + case None => "" + case Some(profileDir) => s", profile directory = $profileDir" }, - resetOperation = - apiServer => Future.successful((Some(apiServer.port), StartupMode.ResetAndStart)), ) - } yield apiServer.port - + apiServer.port + } owner.acquire() } diff --git a/ledger/sandbox/src/test/suite/scala/platform/sandboxnext/services/reset/ResetServiceInMemoryIT.scala b/ledger/sandbox/src/test/suite/scala/platform/sandboxnext/services/reset/ResetServiceInMemoryIT.scala deleted file mode 100644 index d8609250019c..000000000000 --- a/ledger/sandbox/src/test/suite/scala/platform/sandboxnext/services/reset/ResetServiceInMemoryIT.scala +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.daml.platform.sandboxnext.services.reset - -import com.daml.platform.sandbox.SandboxBackend -import com.daml.platform.sandbox.services.reset.ResetServiceITBase -import com.daml.platform.sandboxnext.SandboxNextFixture - -final class ResetServiceInMemoryIT - extends ResetServiceITBase - with SandboxNextFixture - with SandboxBackend.H2Database { - override def spanScaleFactor: Double = super.spanScaleFactor * 2 -} diff --git a/ledger/sandbox/src/test/suite/scala/platform/sandboxnext/services/reset/ResetServiceOnPostgresqlIT.scala b/ledger/sandbox/src/test/suite/scala/platform/sandboxnext/services/reset/ResetServiceOnPostgresqlIT.scala deleted file mode 100644 index 68dea6cdc6a4..000000000000 --- a/ledger/sandbox/src/test/suite/scala/platform/sandboxnext/services/reset/ResetServiceOnPostgresqlIT.scala +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.daml.platform.sandboxnext.services.reset - -import com.daml.platform.sandbox.SandboxBackend -import com.daml.platform.sandbox.services.reset.ResetServiceITBase -import com.daml.platform.sandboxnext.SandboxNextFixture - -final class ResetServiceOnPostgresqlIT - extends ResetServiceITBase - with SandboxNextFixture - with SandboxBackend.Postgresql { - override def spanScaleFactor: Double = super.spanScaleFactor * 8 -}