From c60c94b13f446a44315a62dc41eb15dd083c6d1a Mon Sep 17 00:00:00 2001 From: pbatko-da Date: Fri, 22 Oct 2021 12:28:15 +0200 Subject: [PATCH] [DPP-645][Self-service error codes] Adapt ApiConfigManagementService (#11312) CHANGELOG_BEGIN CHANGELOG_END --- .../error/ErrorCodesVersionSwitcher.scala | 2 +- .../error/definitions/LedgerApiErrors.scala | 16 ++ .../api/validation/ErrorFactories.scala | 11 ++ .../api/validation/ErrorFactoriesSpec.scala | 14 ++ .../platform/apiserver/ApiServices.scala | 1 + .../admin/ApiConfigManagementService.scala | 28 +++- .../ApiConfigManagementServiceSpec.scala | 152 +++++++++++------- 7 files changed, 158 insertions(+), 66 deletions(-) diff --git a/ledger/error/src/main/scala/com/daml/error/ErrorCodesVersionSwitcher.scala b/ledger/error/src/main/scala/com/daml/error/ErrorCodesVersionSwitcher.scala index 0172b67e9be1..e3dbc139cc5f 100644 --- a/ledger/error/src/main/scala/com/daml/error/ErrorCodesVersionSwitcher.scala +++ b/ledger/error/src/main/scala/com/daml/error/ErrorCodesVersionSwitcher.scala @@ -6,7 +6,7 @@ import io.grpc.StatusRuntimeException import scala.concurrent.Future -final class ErrorCodesVersionSwitcher(enableSelfServiceErrorCodes: Boolean) +class ErrorCodesVersionSwitcher(enableSelfServiceErrorCodes: Boolean) extends ValueSwitch[StatusRuntimeException](enableSelfServiceErrorCodes) { def chooseAsFailedFuture[T]( diff --git a/ledger/error/src/main/scala/com/daml/error/definitions/LedgerApiErrors.scala b/ledger/error/src/main/scala/com/daml/error/definitions/LedgerApiErrors.scala index 010ee1565bb1..fc098fe3e1c7 100644 --- a/ledger/error/src/main/scala/com/daml/error/definitions/LedgerApiErrors.scala +++ b/ledger/error/src/main/scala/com/daml/error/definitions/LedgerApiErrors.scala @@ -32,6 +32,22 @@ object LedgerApiErrors extends LedgerApiErrorGroup { ) } + object WriteErrors extends ErrorGroup() { + @Explanation("This rejection is given when a configuration entry write was rejected.") + @Resolution("Fetch newest configuration and/or retry.") + object ConfigurationEntryRejected + extends ErrorCode( + id = "CONFIGURATION_ENTRY_REJECTED", + ErrorCategory.InvalidGivenCurrentSystemStateOther, + ) { + case class Reject(message: String)(implicit + loggingContext: ContextualizedErrorLogger + ) extends LoggingTransactionErrorImpl( + cause = message + ) + } + } + object ReadErrors extends ErrorGroup() { @Explanation("This rejection is given when a package id is malformed.") diff --git a/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/ErrorFactories.scala b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/ErrorFactories.scala index 6d3db01a5b5c..83da071bb183 100644 --- a/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/ErrorFactories.scala +++ b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/ErrorFactories.scala @@ -243,6 +243,15 @@ class ErrorFactories private (errorCodesVersionSwitcher: ErrorCodesVersionSwitch grpcError(statusBuilder.build()) } + def configurationEntryRejected(message: String, definiteAnswer: Option[Boolean])(implicit + contextualizedErrorLogger: ContextualizedErrorLogger + ): StatusRuntimeException = { + errorCodesVersionSwitcher.choose( + v1 = aborted(message, definiteAnswer), + v2 = LedgerApiErrors.WriteErrors.ConfigurationEntryRejected.Reject(message).asGrpcError, + ) + } + // permission denied is intentionally without description to ensure we don't leak security relevant information by accident def permissionDenied(cause: String)(implicit contextualizedErrorLogger: ContextualizedErrorLogger @@ -302,6 +311,8 @@ class ErrorFactories private (errorCodesVersionSwitcher: ErrorCodesVersionSwitch addDefiniteAnswerDetails(definiteAnswer, statusBuilder) grpcError(statusBuilder.build()) }, + // TODO error codes: This error group is confusing for this generic error as it can be dispatched + // from call-sites that do not involve Daml interpreter. v2 = LedgerApiErrors.InterpreterErrors.LookupErrors.LedgerConfigurationNotFound .Reject() .asGrpcError, diff --git a/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/validation/ErrorFactoriesSpec.scala b/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/validation/ErrorFactoriesSpec.scala index 98e0fe26312c..a23384f8ab5e 100644 --- a/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/validation/ErrorFactoriesSpec.scala +++ b/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/validation/ErrorFactoriesSpec.scala @@ -86,6 +86,20 @@ class ErrorFactoriesSpec extends AnyWordSpec with Matchers with TableDrivenPrope ) } + "return the configurationEntryRejected" in { + assertVersionedError(_.configurationEntryRejected("message123", None))( + v1_code = Code.ABORTED, + v1_message = "message123", + v1_details = Seq.empty, + v2_code = Code.FAILED_PRECONDITION, + v2_message = s"CONFIGURATION_ENTRY_REJECTED(9,$correlationId): message123", + v2_details = Seq[ErrorDetails.ErrorDetail]( + ErrorDetails.ErrorInfoDetail("CONFIGURATION_ENTRY_REJECTED"), + DefaultTraceIdRequestInfo, + ), + ) + } + "return a transactionNotFound error" in { assertVersionedError(_.transactionNotFound(Ref.TransactionId.assertFromString("tId")))( v1_code = Code.NOT_FOUND, diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/ApiServices.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/ApiServices.scala index 27658880007d..7e53dbbe16d8 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/ApiServices.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/ApiServices.scala @@ -289,6 +289,7 @@ private[daml] object ApiServices { configManagementService, writeService, timeProvider, + errorsVersionsSwitcher, ) val apiParticipantPruningService = diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/admin/ApiConfigManagementService.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/admin/ApiConfigManagementService.scala index 647295c8d36b..964248b894f8 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/admin/ApiConfigManagementService.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/admin/ApiConfigManagementService.scala @@ -7,7 +7,11 @@ import java.time.{Duration => JDuration} import akka.stream.Materializer import akka.stream.scaladsl.Source import com.daml.api.util.{DurationConversion, TimeProvider, TimestampConversion} -import com.daml.error.{DamlContextualizedErrorLogger, ContextualizedErrorLogger} +import com.daml.error.{ + ContextualizedErrorLogger, + DamlContextualizedErrorLogger, + ErrorCodesVersionSwitcher, +} import com.daml.ledger.api.domain import com.daml.ledger.api.domain.{ConfigurationEntry, LedgerOffset} import com.daml.ledger.api.v1.admin.config_management_service.ConfigManagementServiceGrpc.ConfigManagementService @@ -36,6 +40,7 @@ private[apiserver] final class ApiConfigManagementService private ( writeService: state.WriteConfigService, timeProvider: TimeProvider, submissionIdGenerator: String => Ref.SubmissionId, + errorCodesVersionSwitcher: ErrorCodesVersionSwitcher, )(implicit materializer: Materializer, executionContext: ExecutionContext, @@ -46,6 +51,9 @@ private[apiserver] final class ApiConfigManagementService private ( private implicit val contextualizedErrorLogger: ContextualizedErrorLogger = new DamlContextualizedErrorLogger(logger, loggingContext, None) + private val errorFactories = + com.daml.platform.server.api.validation.ErrorFactories(errorCodesVersionSwitcher) + override def close(): Unit = () override def bindService(): ServerServiceDefinition = @@ -60,7 +68,7 @@ private[apiserver] final class ApiConfigManagementService private ( Future.successful(configurationToResponse(configuration)) case None => // TODO error codes: Duplicate of missingLedgerConfig - Future.failed(ErrorFactories.missingLedgerConfigUponRequest) + Future.failed(errorFactories.missingLedgerConfigUponRequest) } .andThen(logger.logErrorsOnCall[GetTimeModelResponse]) } @@ -106,7 +114,7 @@ private[apiserver] final class ApiConfigManagementService private ( logger.warn( "Could not get the current time model. The index does not yet have any ledger configuration." ) - Future.failed(ErrorFactories.missingLedgerConfig(None)) + Future.failed(errorFactories.missingLedgerConfig(None)) } (ledgerEndBeforeRequest, currentConfig) = configuration @@ -117,7 +125,7 @@ private[apiserver] final class ApiConfigManagementService private ( Future.failed( ValidationLogger.logFailureWithContext( request, - ErrorFactories.invalidArgument(None)( + errorFactories.invalidArgument(None)( s"Mismatching configuration generation, expected $expectedGeneration, received ${request.configurationGeneration}" ), ) @@ -139,6 +147,7 @@ private[apiserver] final class ApiConfigManagementService private ( writeService, index, ledgerEndBeforeRequest, + errorFactories, ), timeToLive = JDuration.ofMillis(params.timeToLive.toMillis), ) @@ -176,7 +185,7 @@ private[apiserver] final class ApiConfigManagementService private ( minSkew = DurationConversion.fromProto(pMinSkew), maxSkew = DurationConversion.fromProto(pMaxSkew), ) match { - case Failure(err) => Left(ErrorFactories.invalidArgument(None)(err.toString)) + case Failure(err) => Left(errorFactories.invalidArgument(None)(err.toString)) case Success(ok) => Right(ok) } // TODO(JM): The maximum record time should be constrained, probably by the current active time model? @@ -189,7 +198,7 @@ private[apiserver] final class ApiConfigManagementService private ( } maximumRecordTime <- Time.Timestamp .fromInstant(mrtInstant) - .fold(err => Left(ErrorFactories.invalidArgument(None)(err)), Right(_)) + .fold(err => Left(errorFactories.invalidArgument(None)(err)), Right(_)) } yield SetTimeModelParameters(newTimeModel, maximumRecordTime, timeToLive) } @@ -201,6 +210,7 @@ private[apiserver] object ApiConfigManagementService { readBackend: IndexConfigManagementService, writeBackend: state.WriteConfigService, timeProvider: TimeProvider, + errorCodesVersionSwitcher: ErrorCodesVersionSwitcher, submissionIdGenerator: String => Ref.SubmissionId = augmentSubmissionId, )(implicit materializer: Materializer, @@ -212,13 +222,15 @@ private[apiserver] object ApiConfigManagementService { writeBackend, timeProvider, submissionIdGenerator, + errorCodesVersionSwitcher, ) private final class SynchronousResponseStrategy( writeConfigService: state.WriteConfigService, configManagementService: IndexConfigManagementService, ledgerEnd: LedgerOffset.Absolute, - )(implicit loggingContext: LoggingContext) + errorFactories: ErrorFactories, + )(implicit loggingContext: LoggingContext, contextualizedErrorLogger: ContextualizedErrorLogger) extends SynchronousResponse.Strategy[ (Time.Timestamp, Configuration), ConfigurationEntry, @@ -252,7 +264,7 @@ private[apiserver] object ApiConfigManagementService { submissionId: Ref.SubmissionId ): PartialFunction[ConfigurationEntry, StatusRuntimeException] = { case domain.ConfigurationEntry.Rejected(`submissionId`, reason, _) => - ErrorFactories.aborted(reason, None) + errorFactories.configurationEntryRejected(reason, None) } } diff --git a/ledger/participant-integration-api/src/test/suite/scala/db/migration/translation/apiserver/services/admin/ApiConfigManagementServiceSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/db/migration/translation/apiserver/services/admin/ApiConfigManagementServiceSpec.scala index 8d2f7aaee8be..7e0eb500c0ce 100644 --- a/ledger/participant-integration-api/src/test/suite/scala/db/migration/translation/apiserver/services/admin/ApiConfigManagementServiceSpec.scala +++ b/ledger/participant-integration-api/src/test/suite/scala/db/migration/translation/apiserver/services/admin/ApiConfigManagementServiceSpec.scala @@ -3,15 +3,11 @@ package com.daml.platform.apiserver.services.admin -import java.time.Duration -import java.util.concurrent.CompletableFuture.completedFuture -import java.util.concurrent.CompletionStage -import java.util.concurrent.atomic.{AtomicLong, AtomicReference} - import akka.NotUsed import akka.stream.Materializer import akka.stream.scaladsl.Source import com.daml.api.util.TimeProvider +import com.daml.error.ErrorCodesVersionSwitcher import com.daml.grpc.{GrpcException, GrpcStatus} import com.daml.ledger.api.domain.{ConfigurationEntry, LedgerOffset} import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll @@ -22,7 +18,7 @@ import com.daml.ledger.api.v1.admin.config_management_service.{ } import com.daml.ledger.configuration.{Configuration, LedgerTimeModel} import com.daml.ledger.participant.state.index.v2.IndexConfigManagementService -import com.daml.ledger.participant.state.v2.{SubmissionResult, WriteConfigService} +import com.daml.ledger.participant.state.v2.{SubmissionResult, WriteConfigService, WriteService} import com.daml.ledger.participant.state.{v2 => state} import com.daml.lf.data.Ref.SubmissionId import com.daml.lf.data.{Ref, Time} @@ -37,9 +33,13 @@ import org.scalatest.Inside import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AsyncWordSpec +import java.time.Duration +import java.util.concurrent.CompletableFuture.completedFuture +import java.util.concurrent.CompletionStage +import java.util.concurrent.atomic.{AtomicLong, AtomicReference} import scala.collection.immutable -import scala.concurrent.{Await, Future, Promise} import scala.concurrent.duration.{Duration => ScalaDuration} +import scala.concurrent.{Await, Future, Promise} import scala.util.{Failure, Success} class ApiConfigManagementServiceSpec @@ -53,6 +53,8 @@ class ApiConfigManagementServiceSpec private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting + private val useSelfServiceErrorCodes = mock[ErrorCodesVersionSwitcher] + "ApiConfigManagementService" should { "get the time model" in { val indexedTimeModel = LedgerTimeModel( @@ -74,31 +76,27 @@ class ApiConfigManagementServiceSpec ), writeService, TimeProvider.UTC, - ) - - apiConfigManagementService.getTimeModel(GetTimeModelRequest.defaultInstance).map { response => - response.timeModel should be(Some(expectedTimeModel)) - verifyZeroInteractions(writeService) - succeed - } - } - - "return a `NOT_FOUND` error if a time model is not found" in { - val writeService = mock[state.WriteConfigService] - val apiConfigManagementService = ApiConfigManagementService.createApiService( - EmptyIndexConfigManagementService, - writeService, - TimeProvider.UTC, + useSelfServiceErrorCodes, ) apiConfigManagementService .getTimeModel(GetTimeModelRequest.defaultInstance) - .transform(Success.apply) .map { response => - response should matchPattern { case Failure(GrpcException(GrpcStatus.NOT_FOUND(), _)) => } + response.timeModel should be(Some(expectedTimeModel)) + verifyZeroInteractions(writeService) + verifyZeroInteractions(useSelfServiceErrorCodes) + succeed } } + "return a `NOT_FOUND` error if a time model is not found (V1 error codes)" in { + testReturnANotFoundErrorIfTimeModelNotFound(false) + } + + "return a `NOT_FOUND` error if a time model is not found (V2 self-service error codes)" in { + testReturnANotFoundErrorIfTimeModelNotFound(true) + } + "set a new time model" in { val maximumDeduplicationTime = Duration.ofHours(6) val initialGeneration = 2L @@ -135,6 +133,7 @@ class ApiConfigManagementServiceSpec indexService, writeService, timeProvider, + useSelfServiceErrorCodes, ) apiConfigManagementService @@ -155,43 +154,17 @@ class ApiConfigManagementServiceSpec .map { response => response.configurationGeneration should be(expectedGeneration) currentConfiguration() should be(Some(expectedConfiguration)) + verifyZeroInteractions(useSelfServiceErrorCodes) + succeed } } - "refuse to set a new time model if none is indexed yet" in { - val initialGeneration = 0L - - val timeProvider = TimeProvider.UTC - val maximumRecordTime = timeProvider.getCurrentTime.plusSeconds(60) - - val writeService = mock[state.WriteService] - val apiConfigManagementService = ApiConfigManagementService.createApiService( - EmptyIndexConfigManagementService, - writeService, - timeProvider, - ) + "refuse to set a new time model if none is indexed (V1 error codes)" in { + testRefuseToSetANewTimeModelIfNoneIsIndexedYet(false) + } - apiConfigManagementService - .setTimeModel( - SetTimeModelRequest.of( - "a submission ID", - maximumRecordTime = Some(Timestamp.of(maximumRecordTime.getEpochSecond, 0)), - configurationGeneration = initialGeneration, - newTimeModel = Some( - TimeModel( - avgTransactionLatency = Some(DurationProto.of(10, 0)), - minSkew = Some(DurationProto.of(20, 0)), - maxSkew = Some(DurationProto.of(40, 0)), - ) - ), - ) - ) - .transform(Success.apply) - .map { response => - verifyZeroInteractions(writeService) - response should matchPattern { case Failure(GrpcException(GrpcStatus.UNAVAILABLE(), _)) => - } - } + "refuse to set a new time model if none is indexed (V2 self-service error codes)" in { + testRefuseToSetANewTimeModelIfNoneIsIndexedYet(true) } "propagate trace context" in { @@ -199,6 +172,7 @@ class ApiConfigManagementServiceSpec new FakeStreamingIndexConfigManagementService(someConfigurationEntries), TestWriteConfigService, TimeProvider.UTC, + useSelfServiceErrorCodes, _ => Ref.SubmissionId.assertFromString("aSubmission"), ) @@ -212,9 +186,73 @@ class ApiConfigManagementServiceSpec } .map { _ => spanExporter.finishedSpanAttributes should contain(anApplicationIdSpanAttribute) + verifyZeroInteractions(useSelfServiceErrorCodes) + succeed } } } + + private def testReturnANotFoundErrorIfTimeModelNotFound(useSelfServiceErrorCodes: Boolean) = { + val errorCodesVersionSwitcher = new ErrorCodesVersionSwitcher(useSelfServiceErrorCodes) + val writeService = mock[WriteConfigService] + val apiConfigManagementService = ApiConfigManagementService.createApiService( + EmptyIndexConfigManagementService, + writeService, + TimeProvider.UTC, + errorCodesVersionSwitcher, + ) + + apiConfigManagementService + .getTimeModel(GetTimeModelRequest.defaultInstance) + .transform(Success.apply) + .map { response => + response should matchPattern { case Failure(GrpcException(GrpcStatus.NOT_FOUND(), _)) => + } + } + } + + private def testRefuseToSetANewTimeModelIfNoneIsIndexedYet(useSelfServiceErrorCodes: Boolean) = { + val errorCodesVersionSwitcher = new ErrorCodesVersionSwitcher(useSelfServiceErrorCodes) + val initialGeneration = 0L + + val timeProvider = TimeProvider.UTC + val maximumRecordTime = timeProvider.getCurrentTime.plusSeconds(60) + + val writeService = mock[WriteService] + val apiConfigManagementService = ApiConfigManagementService.createApiService( + EmptyIndexConfigManagementService, + writeService, + timeProvider, + errorCodesVersionSwitcher, + ) + + apiConfigManagementService + .setTimeModel( + SetTimeModelRequest.of( + "a submission ID", + maximumRecordTime = Some(Timestamp.of(maximumRecordTime.getEpochSecond, 0)), + configurationGeneration = initialGeneration, + newTimeModel = Some( + TimeModel( + avgTransactionLatency = Some(DurationProto.of(10, 0)), + minSkew = Some(DurationProto.of(20, 0)), + maxSkew = Some(DurationProto.of(40, 0)), + ) + ), + ) + ) + .transform(Success.apply) + .map { response => + verifyZeroInteractions(writeService) + if (useSelfServiceErrorCodes) { + response should matchPattern { case Failure(GrpcException(GrpcStatus.NOT_FOUND(), _)) => + } + } else { + response should matchPattern { case Failure(GrpcException(GrpcStatus.UNAVAILABLE(), _)) => + } + } + } + } } object ApiConfigManagementServiceSpec {