Skip to content

Commit

Permalink
Port LedgerApiServer rate limiter interface from canton (#11577)
Browse files Browse the repository at this point in the history
* Port LedgerApiServer/ApiSubmissionService rate limiter interface from canton

Also includes ability to specify `AsyncCommitMode` indexer config (to get around
`DbType` being platform-package private).

changelog_begin
changelog_end

* Review feedback

* Review feedback - rely on DbType becoming public in #11572

* Review feedback including unit test
  • Loading branch information
oliverse-da authored Nov 8, 2021
1 parent 04cc5d7 commit 41fb289
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ import com.daml.platform.services.time.TimeProviderType
import io.grpc.BindableService
import io.grpc.protobuf.services.ProtoReflectionService
import scalaz.syntax.tag._

import java.time.Duration

import com.daml.telemetry.TelemetryContext

import scala.collection.immutable
import scala.concurrent.duration.{Duration => ScalaDuration}
import scala.concurrent.{ExecutionContext, Future}
Expand Down Expand Up @@ -95,6 +97,7 @@ private[daml] object ApiServices {
seedService: SeedService,
managementServiceTimeout: Duration,
enableSelfServiceErrorCodes: Boolean,
checkOverloaded: TelemetryContext => Option[state.SubmissionResult],
)(implicit
materializer: Materializer,
esf: ExecutionSequencerFactory,
Expand Down Expand Up @@ -131,7 +134,11 @@ private[daml] object ApiServices {
configurationLoadTimeout = ScalaDuration.fromNanos(configurationLoadTimeout.toNanos),
)
services <- Resource(
Future(createServices(ledgerId, currentLedgerConfiguration)(servicesExecutionContext))
Future(
createServices(ledgerId, currentLedgerConfiguration, checkOverloaded)(
servicesExecutionContext
)
)
)(services =>
Future {
services.foreach {
Expand All @@ -146,6 +153,7 @@ private[daml] object ApiServices {
private def createServices(
ledgerId: LedgerId,
ledgerConfigurationSubscription: LedgerConfigurationSubscription,
checkOverloaded: TelemetryContext => Option[state.SubmissionResult],
)(implicit executionContext: ExecutionContext): List[BindableService] = {
val apiTransactionService =
ApiTransactionService.create(ledgerId, transactionsService, metrics, errorsVersionsSwitcher)
Expand Down Expand Up @@ -191,6 +199,7 @@ private[daml] object ApiServices {
ledgerConfigurationSubscription,
apiCompletionService,
apiTransactionService,
checkOverloaded,
)

val apiReflectionService = ProtoReflectionService.newInstance()
Expand All @@ -217,6 +226,7 @@ private[daml] object ApiServices {
ledgerConfigurationSubscription: LedgerConfigurationSubscription,
apiCompletionService: GrpcCommandCompletionService,
apiTransactionService: GrpcTransactionService,
checkOverloaded: TelemetryContext => Option[state.SubmissionResult],
)(implicit executionContext: ExecutionContext): List[BindableService] = {
optWriteService.toList.flatMap { writeService =>
val commandExecutor = new TimedCommandExecutor(
Expand Down Expand Up @@ -245,6 +255,7 @@ private[daml] object ApiServices {
ledgerConfigurationSubscription,
seedService,
commandExecutor,
checkOverloaded,
ApiSubmissionService.Configuration(
partyConfig.implicitPartyAllocation,
submissionConfig.enableDeduplication,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ import com.daml.platform.store.LfValueTranslationCache
import com.daml.ports.{Port, PortFiles}
import io.grpc.{BindableService, ServerInterceptor}
import scalaz.{-\/, \/-}

import java.io.File
import java.time.Clock

import com.daml.telemetry.TelemetryContext

import scala.collection.immutable
import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success, Try}
Expand All @@ -56,6 +58,8 @@ final class StandaloneApiServer(
engine: Engine,
servicesExecutionContext: ExecutionContextExecutor,
lfValueTranslationCache: LfValueTranslationCache.Cache,
checkOverloaded: TelemetryContext => Option[state.SubmissionResult] =
_ => None, // Used for Canton rate-limiting
)(implicit actorSystem: ActorSystem, materializer: Materializer, loggingContext: LoggingContext)
extends ResourceOwner[ApiServer] {

Expand Down Expand Up @@ -127,6 +131,7 @@ final class StandaloneApiServer(
seedService = SeedService(config.seeding),
managementServiceTimeout = config.managementServiceTimeout,
enableSelfServiceErrorCodes = config.enableSelfServiceErrorCodes,
checkOverloaded = checkOverloaded,
)(materializer, executionSequencerFactory, loggingContext)
.map(_.withServices(otherServices))
apiServer <- new LedgerApiServer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ private[apiserver] object ApiSubmissionService {
ledgerConfigurationSubscription: LedgerConfigurationSubscription,
seedService: SeedService,
commandExecutor: CommandExecutor,
checkOverloaded: TelemetryContext => Option[state.SubmissionResult],
configuration: ApiSubmissionService.Configuration,
metrics: Metrics,
errorCodesVersionSwitcher: ErrorCodesVersionSwitcher,
Expand All @@ -73,6 +74,7 @@ private[apiserver] object ApiSubmissionService {
ledgerConfigurationSubscription,
seedService,
commandExecutor,
checkOverloaded,
configuration,
metrics,
errorCodesVersionSwitcher,
Expand Down Expand Up @@ -102,6 +104,7 @@ private[apiserver] final class ApiSubmissionService private[services] (
ledgerConfigurationSubscription: LedgerConfigurationSubscription,
seedService: SeedService,
commandExecutor: CommandExecutor,
checkOverloaded: TelemetryContext => Option[state.SubmissionResult],
configuration: ApiSubmissionService.Configuration,
metrics: Metrics,
val errorCodesVersionSwitcher: ErrorCodesVersionSwitcher,
Expand Down Expand Up @@ -221,12 +224,20 @@ private[apiserver] final class ApiSubmissionService private[services] (
telemetryContext: TelemetryContext,
contextualizedErrorLogger: ContextualizedErrorLogger,
): Future[state.SubmissionResult] =
for {
result <- commandExecutor.execute(commands, submissionSeed, ledgerConfig)
transactionInfo <- handleCommandExecutionResult(result)
partyAllocationResults <- allocateMissingInformees(transactionInfo.transaction)
submissionResult <- submitTransaction(transactionInfo, partyAllocationResults, ledgerConfig)
} yield submissionResult
checkOverloaded(telemetryContext) match {
case Some(submissionResult) => Future.successful(submissionResult)
case None =>
for {
result <- commandExecutor.execute(commands, submissionSeed, ledgerConfig)
transactionInfo <- handleCommandExecutionResult(result)
partyAllocationResults <- allocateMissingInformees(transactionInfo.transaction)
submissionResult <- submitTransaction(
transactionInfo,
partyAllocationResults,
ledgerConfig,
)
} yield submissionResult
}

// Takes the whole transaction to ensure to traverse it only if necessary
private[services] def allocateMissingInformees(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import com.daml.ledger.participant.state.index.v2.{
IndexPartyManagementService,
IndexSubmissionService,
}
import com.daml.ledger.participant.state.v2.WriteService
import com.daml.ledger.participant.state.v2.{SubmissionResult, WriteService}
import com.daml.ledger.participant.state.{v2 => state}
import com.daml.lf
import com.daml.lf.command.{Commands => LfCommands}
Expand All @@ -35,15 +35,15 @@ import com.daml.platform.apiserver.services.ApiSubmissionServiceSpec._
import com.daml.platform.apiserver.SeedService
import com.daml.telemetry.{NoOpTelemetryContext, TelemetryContext}
import com.google.rpc.status.{Status => RpcStatus}
import io.grpc.Status
import io.grpc.{Status, StatusRuntimeException}
import org.mockito.{ArgumentMatchersSugar, MockitoSugar}
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.{Assertion, Inside}

import java.time.Duration
import java.util.concurrent.CompletableFuture.completedFuture
import java.util.concurrent.atomic.AtomicInteger

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

Expand Down Expand Up @@ -413,6 +413,32 @@ class ApiSubmissionServiceSpec
Success(succeed)
})
}

it should "rate-limit when configured to do so" in {
val grpcError = RpcStatus.of(Status.Code.ABORTED.value(), s"Quota Exceeded", Seq.empty)

val service =
newSubmissionService(
mock[state.WriteService],
mock[IndexPartyManagementService],
implicitPartyAllocation = true,
deduplicationEnabled = false,
mockIndexSubmissionService = mock[IndexSubmissionService],
commandExecutor = mock[CommandExecutor],
checkOverloaded = _ => Some(SubmissionResult.SynchronousError(grpcError)),
)

val submitRequest = newSubmitRequest()
service
.submit(submitRequest)
.transform {
case Failure(e: StatusRuntimeException)
if e.getStatus.getCode.value == grpcError.code && e.getStatus.getDescription == grpcError.message =>
Success(succeed)
case result =>
Try(fail(s"Expected submission to be aborted, but got ${result}"))
}
}
}

object ApiSubmissionServiceSpec {
Expand Down Expand Up @@ -449,6 +475,7 @@ object ApiSubmissionServiceSpec {
deduplicationEnabled: Boolean = true,
mockIndexSubmissionService: IndexSubmissionService = mock[IndexSubmissionService],
useSelfServiceErrorCodes: Boolean = false,
checkOverloaded: TelemetryContext => Option[SubmissionResult] = _ => None,
)(implicit
executionContext: ExecutionContext,
loggingContext: LoggingContext,
Expand Down Expand Up @@ -489,6 +516,7 @@ object ApiSubmissionServiceSpec {
errorCodesVersionSwitcher = new ErrorCodesVersionSwitcher(
enableSelfServiceErrorCodes = useSelfServiceErrorCodes
),
checkOverloaded = checkOverloaded,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ final class SandboxServer(
seedService = seedingService,
managementServiceTimeout = config.managementServiceTimeout,
enableSelfServiceErrorCodes = config.enableSelfServiceErrorCodes,
checkOverloaded = _ => None,
)(materializer, executionSequencerFactory, loggingContext)
.map(_.withServices(List(resetService)))
apiServer <- new LedgerApiServer(
Expand Down

0 comments on commit 41fb289

Please sign in to comment.