From 41fb2890d6a7effa532950f201baa6e226860ac6 Mon Sep 17 00:00:00 2001 From: Oliver Seeliger <46452754+oliverse-da@users.noreply.github.com> Date: Mon, 8 Nov 2021 20:55:57 +0100 Subject: [PATCH] Port LedgerApiServer rate limiter interface from canton (#11577) * Port LedgerApiServer/ApiSubmissionService rate limiter interface from canton Also includes ability to specify `AsyncCommitMode` indexer config (to get around `DbType` being platform-package private). changelog_begin changelog_end * Review feedback * Review feedback - rely on DbType becoming public in #11572 * Review feedback including unit test --- .../platform/apiserver/ApiServices.scala | 15 ++++++-- .../apiserver/StandaloneApiServer.scala | 7 +++- .../services/ApiSubmissionService.scala | 23 +++++++++---- .../services/ApiSubmissionServiceSpec.scala | 34 +++++++++++++++++-- .../platform/sandbox/SandboxServer.scala | 1 + 5 files changed, 68 insertions(+), 12 deletions(-) diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/ApiServices.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/ApiServices.scala index e7f750ea2ba3..ddde8bd255b7 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/ApiServices.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/ApiServices.scala @@ -52,8 +52,10 @@ import com.daml.platform.services.time.TimeProviderType import io.grpc.BindableService import io.grpc.protobuf.services.ProtoReflectionService import scalaz.syntax.tag._ - import java.time.Duration + +import com.daml.telemetry.TelemetryContext + import scala.collection.immutable import scala.concurrent.duration.{Duration => ScalaDuration} import scala.concurrent.{ExecutionContext, Future} @@ -95,6 +97,7 @@ private[daml] object ApiServices { seedService: SeedService, managementServiceTimeout: Duration, enableSelfServiceErrorCodes: Boolean, + checkOverloaded: TelemetryContext => Option[state.SubmissionResult], )(implicit materializer: Materializer, esf: ExecutionSequencerFactory, @@ -131,7 +134,11 @@ private[daml] object ApiServices { configurationLoadTimeout = ScalaDuration.fromNanos(configurationLoadTimeout.toNanos), ) services <- Resource( - Future(createServices(ledgerId, currentLedgerConfiguration)(servicesExecutionContext)) + Future( + createServices(ledgerId, currentLedgerConfiguration, checkOverloaded)( + servicesExecutionContext + ) + ) )(services => Future { services.foreach { @@ -146,6 +153,7 @@ private[daml] object ApiServices { private def createServices( ledgerId: LedgerId, ledgerConfigurationSubscription: LedgerConfigurationSubscription, + checkOverloaded: TelemetryContext => Option[state.SubmissionResult], )(implicit executionContext: ExecutionContext): List[BindableService] = { val apiTransactionService = ApiTransactionService.create(ledgerId, transactionsService, metrics, errorsVersionsSwitcher) @@ -191,6 +199,7 @@ private[daml] object ApiServices { ledgerConfigurationSubscription, apiCompletionService, apiTransactionService, + checkOverloaded, ) val apiReflectionService = ProtoReflectionService.newInstance() @@ -217,6 +226,7 @@ private[daml] object ApiServices { ledgerConfigurationSubscription: LedgerConfigurationSubscription, apiCompletionService: GrpcCommandCompletionService, apiTransactionService: GrpcTransactionService, + checkOverloaded: TelemetryContext => Option[state.SubmissionResult], )(implicit executionContext: ExecutionContext): List[BindableService] = { optWriteService.toList.flatMap { writeService => val commandExecutor = new TimedCommandExecutor( @@ -245,6 +255,7 @@ private[daml] object ApiServices { ledgerConfigurationSubscription, seedService, commandExecutor, + checkOverloaded, ApiSubmissionService.Configuration( partyConfig.implicitPartyAllocation, submissionConfig.enableDeduplication, diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/StandaloneApiServer.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/StandaloneApiServer.scala index 481b2c344eac..901d618752e9 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/StandaloneApiServer.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/StandaloneApiServer.scala @@ -33,9 +33,11 @@ import com.daml.platform.store.LfValueTranslationCache import com.daml.ports.{Port, PortFiles} import io.grpc.{BindableService, ServerInterceptor} import scalaz.{-\/, \/-} - import java.io.File import java.time.Clock + +import com.daml.telemetry.TelemetryContext + import scala.collection.immutable import scala.concurrent.ExecutionContextExecutor import scala.util.{Failure, Success, Try} @@ -56,6 +58,8 @@ final class StandaloneApiServer( engine: Engine, servicesExecutionContext: ExecutionContextExecutor, lfValueTranslationCache: LfValueTranslationCache.Cache, + checkOverloaded: TelemetryContext => Option[state.SubmissionResult] = + _ => None, // Used for Canton rate-limiting )(implicit actorSystem: ActorSystem, materializer: Materializer, loggingContext: LoggingContext) extends ResourceOwner[ApiServer] { @@ -127,6 +131,7 @@ final class StandaloneApiServer( seedService = SeedService(config.seeding), managementServiceTimeout = config.managementServiceTimeout, enableSelfServiceErrorCodes = config.enableSelfServiceErrorCodes, + checkOverloaded = checkOverloaded, )(materializer, executionSequencerFactory, loggingContext) .map(_.withServices(otherServices)) apiServer <- new LedgerApiServer( diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiSubmissionService.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiSubmissionService.scala index 8b775dba1250..9387fe1e3894 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiSubmissionService.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiSubmissionService.scala @@ -56,6 +56,7 @@ private[apiserver] object ApiSubmissionService { ledgerConfigurationSubscription: LedgerConfigurationSubscription, seedService: SeedService, commandExecutor: CommandExecutor, + checkOverloaded: TelemetryContext => Option[state.SubmissionResult], configuration: ApiSubmissionService.Configuration, metrics: Metrics, errorCodesVersionSwitcher: ErrorCodesVersionSwitcher, @@ -73,6 +74,7 @@ private[apiserver] object ApiSubmissionService { ledgerConfigurationSubscription, seedService, commandExecutor, + checkOverloaded, configuration, metrics, errorCodesVersionSwitcher, @@ -102,6 +104,7 @@ private[apiserver] final class ApiSubmissionService private[services] ( ledgerConfigurationSubscription: LedgerConfigurationSubscription, seedService: SeedService, commandExecutor: CommandExecutor, + checkOverloaded: TelemetryContext => Option[state.SubmissionResult], configuration: ApiSubmissionService.Configuration, metrics: Metrics, val errorCodesVersionSwitcher: ErrorCodesVersionSwitcher, @@ -221,12 +224,20 @@ private[apiserver] final class ApiSubmissionService private[services] ( telemetryContext: TelemetryContext, contextualizedErrorLogger: ContextualizedErrorLogger, ): Future[state.SubmissionResult] = - for { - result <- commandExecutor.execute(commands, submissionSeed, ledgerConfig) - transactionInfo <- handleCommandExecutionResult(result) - partyAllocationResults <- allocateMissingInformees(transactionInfo.transaction) - submissionResult <- submitTransaction(transactionInfo, partyAllocationResults, ledgerConfig) - } yield submissionResult + checkOverloaded(telemetryContext) match { + case Some(submissionResult) => Future.successful(submissionResult) + case None => + for { + result <- commandExecutor.execute(commands, submissionSeed, ledgerConfig) + transactionInfo <- handleCommandExecutionResult(result) + partyAllocationResults <- allocateMissingInformees(transactionInfo.transaction) + submissionResult <- submitTransaction( + transactionInfo, + partyAllocationResults, + ledgerConfig, + ) + } yield submissionResult + } // Takes the whole transaction to ensure to traverse it only if necessary private[services] def allocateMissingInformees( diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/ApiSubmissionServiceSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/ApiSubmissionServiceSpec.scala index 9b3bafc6e6dd..7c9bcc6be3c3 100644 --- a/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/ApiSubmissionServiceSpec.scala +++ b/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/ApiSubmissionServiceSpec.scala @@ -14,7 +14,7 @@ import com.daml.ledger.participant.state.index.v2.{ IndexPartyManagementService, IndexSubmissionService, } -import com.daml.ledger.participant.state.v2.WriteService +import com.daml.ledger.participant.state.v2.{SubmissionResult, WriteService} import com.daml.ledger.participant.state.{v2 => state} import com.daml.lf import com.daml.lf.command.{Commands => LfCommands} @@ -35,15 +35,15 @@ import com.daml.platform.apiserver.services.ApiSubmissionServiceSpec._ import com.daml.platform.apiserver.SeedService import com.daml.telemetry.{NoOpTelemetryContext, TelemetryContext} import com.google.rpc.status.{Status => RpcStatus} -import io.grpc.Status +import io.grpc.{Status, StatusRuntimeException} import org.mockito.{ArgumentMatchersSugar, MockitoSugar} import org.scalatest.flatspec.AsyncFlatSpec import org.scalatest.matchers.should.Matchers import org.scalatest.{Assertion, Inside} - import java.time.Duration import java.util.concurrent.CompletableFuture.completedFuture import java.util.concurrent.atomic.AtomicInteger + import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success, Try} @@ -413,6 +413,32 @@ class ApiSubmissionServiceSpec Success(succeed) }) } + + it should "rate-limit when configured to do so" in { + val grpcError = RpcStatus.of(Status.Code.ABORTED.value(), s"Quota Exceeded", Seq.empty) + + val service = + newSubmissionService( + mock[state.WriteService], + mock[IndexPartyManagementService], + implicitPartyAllocation = true, + deduplicationEnabled = false, + mockIndexSubmissionService = mock[IndexSubmissionService], + commandExecutor = mock[CommandExecutor], + checkOverloaded = _ => Some(SubmissionResult.SynchronousError(grpcError)), + ) + + val submitRequest = newSubmitRequest() + service + .submit(submitRequest) + .transform { + case Failure(e: StatusRuntimeException) + if e.getStatus.getCode.value == grpcError.code && e.getStatus.getDescription == grpcError.message => + Success(succeed) + case result => + Try(fail(s"Expected submission to be aborted, but got ${result}")) + } + } } object ApiSubmissionServiceSpec { @@ -449,6 +475,7 @@ object ApiSubmissionServiceSpec { deduplicationEnabled: Boolean = true, mockIndexSubmissionService: IndexSubmissionService = mock[IndexSubmissionService], useSelfServiceErrorCodes: Boolean = false, + checkOverloaded: TelemetryContext => Option[SubmissionResult] = _ => None, )(implicit executionContext: ExecutionContext, loggingContext: LoggingContext, @@ -489,6 +516,7 @@ object ApiSubmissionServiceSpec { errorCodesVersionSwitcher = new ErrorCodesVersionSwitcher( enableSelfServiceErrorCodes = useSelfServiceErrorCodes ), + checkOverloaded = checkOverloaded, ) } } diff --git a/ledger/sandbox-classic/src/main/scala/platform/sandbox/SandboxServer.scala b/ledger/sandbox-classic/src/main/scala/platform/sandbox/SandboxServer.scala index c4ff66609471..61632259aa62 100644 --- a/ledger/sandbox-classic/src/main/scala/platform/sandbox/SandboxServer.scala +++ b/ledger/sandbox-classic/src/main/scala/platform/sandbox/SandboxServer.scala @@ -397,6 +397,7 @@ final class SandboxServer( seedService = seedingService, managementServiceTimeout = config.managementServiceTimeout, enableSelfServiceErrorCodes = config.enableSelfServiceErrorCodes, + checkOverloaded = _ => None, )(materializer, executionSequencerFactory, loggingContext) .map(_.withServices(List(resetService))) apiServer <- new LedgerApiServer(