diff --git a/docs/source/error-codes/self-service/index.rst b/docs/source/error-codes/self-service/index.rst index 6d8e78f76453..cbfdf609f3be 100644 --- a/docs/source/error-codes/self-service/index.rst +++ b/docs/source/error-codes/self-service/index.rst @@ -496,6 +496,8 @@ The following gRPC status codes have changed for submission rejections in Sandbo +-----------------------------------+---------------------------------------+--------------------------------------------------------------------------------------------+----------------------------------------+ |ABORTED |NOT_FOUND |NOT_FOUND is now returned on transaction rejections on not found contract. |CONTRACT_NOT_FOUND | +-----------------------------------+---------------------------------------+--------------------------------------------------------------------------------------------+----------------------------------------+ +|ABORTED |NOT_FOUND |NOT_FOUND is now returned on rejections occurring due to missing ledger configuration. |LEDGER_CONFIGURATION_NOT_FOUND | ++-----------------------------------+---------------------------------------+--------------------------------------------------------------------------------------------+----------------------------------------+ |INVALID_ARGUMENT |INTERNAL |INTERNAL is now returned on transaction rejections on system faults. |DISPUTED | +-----------------------------------+---------------------------------------+--------------------------------------------------------------------------------------------+----------------------------------------+ |INVALID_ARGUMENT |NOT_FOUND |PARTY_NOT_KNOWN_ON_LEDGER is now returned on transaction rejections on unallocated parties. |PARTY_NOT_KNOWN_ON_LEDGER | diff --git a/ledger/error/src/main/scala/com/daml/error/definitions/IndexErrors.scala b/ledger/error/src/main/scala/com/daml/error/definitions/IndexErrors.scala index 1ff35055bb12..2d28f00db7aa 100644 --- a/ledger/error/src/main/scala/com/daml/error/definitions/IndexErrors.scala +++ b/ledger/error/src/main/scala/com/daml/error/definitions/IndexErrors.scala @@ -47,6 +47,23 @@ object IndexErrors extends IndexErrorGroup { throwableO = Some(throwable), ) } + + @Explanation( + "This error occurs if the result set returned by a query against the Index database is invalid." + ) + @Resolution("Contact support.") + object ResultSetError + extends ErrorCode( + id = "INDEX_DB_INVALID_RESULT_SET", + ErrorCategory.SystemInternalAssumptionViolated, + ) + with HasUnapply { + case class Reject(message: String)(implicit + val loggingContext: ContextualizedErrorLogger + ) extends LoggingTransactionErrorImpl( + cause = message + ) + } } trait HasUnapply { diff --git a/ledger/error/src/main/scala/com/daml/error/definitions/LedgerApiErrors.scala b/ledger/error/src/main/scala/com/daml/error/definitions/LedgerApiErrors.scala index f75c674fd522..3c77636475bb 100644 --- a/ledger/error/src/main/scala/com/daml/error/definitions/LedgerApiErrors.scala +++ b/ledger/error/src/main/scala/com/daml/error/definitions/LedgerApiErrors.scala @@ -393,6 +393,12 @@ object LedgerApiErrors extends LedgerApiErrorGroup { ) extends LoggingTransactionErrorImpl( cause = "The ledger configuration could not be retrieved." ) + + case class RejectWithMessage(message: String)(implicit + loggingContext: ContextualizedErrorLogger + ) extends LoggingTransactionErrorImpl( + cause = s"The ledger configuration could not be retrieved: $message." + ) } } @@ -594,6 +600,10 @@ object LedgerApiErrors extends LedgerApiErrorGroup { case class VersionService(message: String)(implicit loggingContext: ContextualizedErrorLogger ) extends LoggingTransactionErrorImpl(cause = message) + + case class Buffer(message: String, override val throwableO: Option[Throwable])(implicit + loggingContext: ContextualizedErrorLogger + ) extends LoggingTransactionErrorImpl(cause = message, throwableO = throwableO) } object AdminServices { @@ -770,12 +780,14 @@ object LedgerApiErrors extends LedgerApiErrorGroup { ErrorCategory.InvalidGivenCurrentSystemStateOther, // It may succeed at a later time ) { case class RejectEnriched( - details: String, + message: String, ledger_time: Instant, ledger_time_lower_bound: Instant, ledger_time_upper_bound: Instant, )(implicit loggingContext: ContextualizedErrorLogger) - extends LoggingTransactionErrorImpl(cause = s"Invalid ledger time: $details") + extends LoggingTransactionErrorImpl( + cause = s"Invalid ledger time: $message" + ) case class RejectSimple( details: String diff --git a/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/tracker/CompletionResponse.scala b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/tracker/CompletionResponse.scala index 948abed87d00..00b1abc55833 100644 --- a/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/tracker/CompletionResponse.scala +++ b/ledger/ledger-api-client/src/main/scala/com/digitalasset/ledger/client/services/commands/tracker/CompletionResponse.scala @@ -111,9 +111,9 @@ object CompletionResponse { val statusBuilder = GrpcStatus.toJavaBuilder(notOkResponse.grpcStatus) GrpcStatus.buildStatus(metadata, statusBuilder) case CompletionResponse.TimeoutResponse(_) => - errorFactories.TrackerErrors.timedOutOnAwaitingForCommandCompletion() + errorFactories.SubmissionQueueErrors.timedOutOnAwaitingForCommandCompletion() case CompletionResponse.NoStatusInResponse(_) => - errorFactories.TrackerErrors.noStatusInCompletionResponse() + errorFactories.SubmissionQueueErrors.noStatusInCompletionResponse() } } diff --git a/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/ErrorFactories.scala b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/ErrorFactories.scala index 13d04cb4bb0d..fae51cacfd47 100644 --- a/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/ErrorFactories.scala +++ b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/ErrorFactories.scala @@ -26,11 +26,11 @@ import io.grpc.{Metadata, StatusRuntimeException} import scalaz.syntax.tag._ import java.sql.{SQLNonTransientException, SQLTransientException} -import java.time.Duration +import java.time.{Duration, Instant} import scala.annotation.nowarn class ErrorFactories private (errorCodesVersionSwitcher: ErrorCodesVersionSwitcher) { - object TrackerErrors { + object SubmissionQueueErrors { def failedToEnqueueCommandSubmission(message: String)(t: Throwable)(implicit contextualizedErrorLogger: ContextualizedErrorLogger ): Status = @@ -50,22 +50,7 @@ class ErrorFactories private (errorCodesVersionSwitcher: ErrorCodesVersionSwitch .asGrpcStatusFromContext, ) - def commandServiceIngressBufferFull()(implicit - contextualizedErrorLogger: ContextualizedErrorLogger - ): Status = - errorCodesVersionSwitcher.choose( - v1 = { - val status = io.grpc.Status.RESOURCE_EXHAUSTED - .withDescription("Ingress buffer is full") - val statusBuilder = GrpcStatus.toJavaBuilder(status) - GrpcStatus.buildStatus(Map.empty, statusBuilder) - }, - v2 = LedgerApiErrors.ParticipantBackpressure - .Rejection("Command service ingress buffer is full") - .asGrpcStatusFromContext, - ) - - def commandSubmissionQueueClosed()(implicit + def queueClosed(queueName: String)(implicit contextualizedErrorLogger: ContextualizedErrorLogger ): Status = errorCodesVersionSwitcher.choose( @@ -75,7 +60,7 @@ class ErrorFactories private (errorCodesVersionSwitcher: ErrorCodesVersionSwitch GrpcStatus.buildStatus(Map.empty, statusBuilder) }, v2 = LedgerApiErrors.ServiceNotRunning - .Reject("Command service submission queue") + .Reject(queueName) .asGrpcStatusFromContext, ) @@ -117,6 +102,21 @@ class ErrorFactories private (errorCodesVersionSwitcher: ErrorCodesVersionSwitch } } + def bufferFull(message: String)(implicit + contextualizedErrorLogger: ContextualizedErrorLogger + ): Status = + errorCodesVersionSwitcher.choose( + v1 = { + val status = io.grpc.Status.RESOURCE_EXHAUSTED + .withDescription("Ingress buffer is full") + val statusBuilder = GrpcStatus.toJavaBuilder(status) + GrpcStatus.buildStatus(Map.empty, statusBuilder) + }, + v2 = LedgerApiErrors.ParticipantBackpressure + .Rejection(message) + .asGrpcStatusFromContext, + ) + def sqlTransientException(exception: SQLTransientException)(implicit contextualizedErrorLogger: ContextualizedErrorLogger ): StatusRuntimeException = @@ -477,6 +477,21 @@ class ErrorFactories private (errorCodesVersionSwitcher: ErrorCodesVersionSwitch .asGrpcError, ) + def missingLedgerConfig( + v1Status: RpcStatus, + message: String, + )(implicit + contextualizedErrorLogger: ContextualizedErrorLogger + ): com.google.rpc.status.Status = + errorCodesVersionSwitcher.choose( + v1 = v1Status, + v2 = GrpcStatus.toProto( + LedgerApiErrors.RequestValidation.NotFound.LedgerConfiguration + .RejectWithMessage(message) + .asGrpcStatusFromContext + ), + ) + def participantPrunedDataAccessed(message: String)(implicit contextualizedErrorLogger: ContextualizedErrorLogger ): StatusRuntimeException = @@ -686,6 +701,31 @@ class ErrorFactories private (errorCodesVersionSwitcher: ErrorCodesVersionSwitch ), ) + def invalidLedgerTime( + v1Status: RpcStatus, + ledgerTime: Instant, + ledgerTimeLowerBound: Instant, + ledgerTimeUpperBound: Instant, + )(implicit + contextualizedErrorLogger: ContextualizedErrorLogger + ): com.google.rpc.status.Status = { + val details = + s"Ledger time $ledgerTime outside of range [$ledgerTimeLowerBound, $ledgerTimeUpperBound]" + errorCodesVersionSwitcher.choose( + v1 = v1Status, + v2 = GrpcStatus.toProto( + LedgerApiErrors.ConsistencyErrors.InvalidLedgerTime + .RejectEnriched( + details, + ledgerTime, + ledgerTimeLowerBound, + ledgerTimeUpperBound, + ) + .asGrpcStatusFromContext + ), + ) + } + def inconsistent(reason: String)(implicit contextualizedErrorLogger: ContextualizedErrorLogger ): com.google.rpc.status.Status = diff --git a/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/validation/ErrorFactoriesSpec.scala b/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/validation/ErrorFactoriesSpec.scala index 58fcfb35a0d5..9c7a566aa998 100644 --- a/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/validation/ErrorFactoriesSpec.scala +++ b/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/validation/ErrorFactoriesSpec.scala @@ -90,7 +90,7 @@ class ErrorFactoriesSpec "return failedToEnqueueCommandSubmission" in { val t = new Exception("message123") assertVersionedStatus( - _.TrackerErrors.failedToEnqueueCommandSubmission("some message")(t)( + _.SubmissionQueueErrors.failedToEnqueueCommandSubmission("some message")(t)( contextualizedErrorLogger ) )( @@ -107,18 +107,16 @@ class ErrorFactoriesSpec ) } - "return ingressBufferFull" in { + "return bufferFul" in { assertVersionedStatus( - _.TrackerErrors.commandServiceIngressBufferFull()( - contextualizedErrorLogger = contextualizedErrorLogger - ) + _.bufferFull("Some buffer is full")(contextualizedErrorLogger) )( v1_code = Code.RESOURCE_EXHAUSTED, v1_message = "Ingress buffer is full", v1_details = Seq(errorDetails), v2_code = Code.ABORTED, v2_message = - s"PARTICIPANT_BACKPRESSURE(2,$truncatedCorrelationId): The participant is overloaded: Command service ingress buffer is full", + s"PARTICIPANT_BACKPRESSURE(2,$truncatedCorrelationId): The participant is overloaded: Some buffer is full", v2_details = Seq[ErrorDetails.ErrorDetail]( ErrorDetails.ErrorInfoDetail("PARTICIPANT_BACKPRESSURE"), expectedCorrelationIdRequestInfo, @@ -129,7 +127,7 @@ class ErrorFactoriesSpec "return queueClosed" in { assertVersionedStatus( - _.TrackerErrors.commandSubmissionQueueClosed()( + _.SubmissionQueueErrors.queueClosed("Some service")( contextualizedErrorLogger = contextualizedErrorLogger ) )( @@ -138,7 +136,7 @@ class ErrorFactoriesSpec v1_details = Seq(errorDetails), v2_code = Code.UNAVAILABLE, v2_message = - s"SERVICE_NOT_RUNNING(1,$truncatedCorrelationId): Command service submission queue has been shut down.", + s"SERVICE_NOT_RUNNING(1,$truncatedCorrelationId): Some service has been shut down.", v2_details = Seq[ErrorDetails.ErrorDetail]( ErrorDetails.ErrorInfoDetail("SERVICE_NOT_RUNNING"), expectedCorrelationIdRequestInfo, @@ -149,7 +147,7 @@ class ErrorFactoriesSpec "return timeout" in { assertVersionedStatus( - _.TrackerErrors.timedOutOnAwaitingForCommandCompletion()( + _.SubmissionQueueErrors.timedOutOnAwaitingForCommandCompletion()( contextualizedErrorLogger = contextualizedErrorLogger ) )( @@ -168,7 +166,7 @@ class ErrorFactoriesSpec } "return noStatusInResponse" in { assertVersionedStatus( - _.TrackerErrors.noStatusInCompletionResponse()( + _.SubmissionQueueErrors.noStatusInCompletionResponse()( contextualizedErrorLogger = contextualizedErrorLogger ) )( diff --git a/ledger/ledger-api-domain/src/main/scala/com/digitalasset/ledger/api/domain.scala b/ledger/ledger-api-domain/src/main/scala/com/digitalasset/ledger/api/domain.scala index 9b6db794ead7..2928a7f373b4 100644 --- a/ledger/ledger-api-domain/src/main/scala/com/digitalasset/ledger/api/domain.scala +++ b/ledger/ledger-api-domain/src/main/scala/com/digitalasset/ledger/api/domain.scala @@ -210,6 +210,8 @@ object domain { /** The ledger time of the submission violated some constraint on the ledger time. */ final case class InvalidLedgerTime(description: String) extends RejectionReason + final case class LedgerConfigNotFound(description: String) extends RejectionReason + /** The transaction relied on contracts being active that were no longer * active at the point where it was sequenced. */ diff --git a/ledger/participant-integration-api/BUILD.bazel b/ledger/participant-integration-api/BUILD.bazel index c4bf36e7e38a..ea5d26fd9939 100644 --- a/ledger/participant-integration-api/BUILD.bazel +++ b/ledger/participant-integration-api/BUILD.bazel @@ -59,6 +59,7 @@ compile_deps = [ "//libs-scala/build-info", "//libs-scala/contextualized-logging", "//libs-scala/concurrent", + "//libs-scala/grpc-utils", "//libs-scala/logging-entries", "//libs-scala/ports", "//libs-scala/resources", diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/execution/LedgerTimeAwareCommandExecutor.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/execution/LedgerTimeAwareCommandExecutor.scala index 83de9ea86426..a0a33b110cd0 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/execution/LedgerTimeAwareCommandExecutor.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/execution/LedgerTimeAwareCommandExecutor.scala @@ -86,7 +86,6 @@ private[apiserver] final class LedgerTimeAwareCommandExecutor( } } .recoverWith { - case MissingContracts(contracts) => if (retriesLeft > 0) { metrics.daml.execution.retry.mark() diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/admin/ApiPartyManagementService.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/admin/ApiPartyManagementService.scala index a7fff73e87a8..12e2d75c1b08 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/admin/ApiPartyManagementService.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/admin/ApiPartyManagementService.scala @@ -200,8 +200,6 @@ private[apiserver] object ApiPartyManagementService { PartyEntry.AllocationAccepted, ] { private val logger = ContextualizedLogger.get(getClass) - private implicit val contextualizedErrorLogger: ContextualizedErrorLogger = - new DamlContextualizedErrorLogger(logger, loggingContext, None) override def currentLedgerEnd(): Future[Option[LedgerOffset.Absolute]] = ledgerEndService.currentLedgerEnd().map(Some(_)) @@ -227,7 +225,9 @@ private[apiserver] object ApiPartyManagementService { submissionId: Ref.SubmissionId ): PartialFunction[PartyEntry, StatusRuntimeException] = { case PartyEntry.AllocationRejected(`submissionId`, reason) => - errorFactories.invalidArgument(None)(reason) + errorFactories.invalidArgument(None)(reason)( + new DamlContextualizedErrorLogger(logger, loggingContext, Some(submissionId)) + ) } } diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/QueueBackedTracker.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/QueueBackedTracker.scala index 4cec54c81756..656e88915a49 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/QueueBackedTracker.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/QueueBackedTracker.scala @@ -56,20 +56,23 @@ private[services] final class QueueBackedTracker( ) case QueueOfferResult.Failure(t) => toQueueSubmitFailure( - errorFactories.TrackerErrors.failedToEnqueueCommandSubmission("Failed to enqueue")(t) + errorFactories.SubmissionQueueErrors + .failedToEnqueueCommandSubmission("Failed to enqueue")(t) ) case QueueOfferResult.Dropped => - toQueueSubmitFailure(errorFactories.TrackerErrors.commandServiceIngressBufferFull()) + toQueueSubmitFailure(errorFactories.bufferFull("The submission ingress buffer is full")) case QueueOfferResult.QueueClosed => - toQueueSubmitFailure(errorFactories.TrackerErrors.commandSubmissionQueueClosed()) + toQueueSubmitFailure( + errorFactories.SubmissionQueueErrors.queueClosed("Command service queue") + ) } .recoverWith { case i: IllegalStateException if i.getMessage == "You have to wait for previous offer to be resolved to send another request" => - toQueueSubmitFailure(errorFactories.TrackerErrors.commandServiceIngressBufferFull()) + toQueueSubmitFailure(errorFactories.bufferFull("The submission ingress buffer is full")) case t => toQueueSubmitFailure( - errorFactories.TrackerErrors.failedToEnqueueCommandSubmission("Failed")(t) + errorFactories.SubmissionQueueErrors.failedToEnqueueCommandSubmission("Failed")(t) ) } } diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/Conversions.scala b/ledger/participant-integration-api/src/main/scala/platform/store/Conversions.scala index 95de25d785a4..c58f0f42094e 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/Conversions.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/Conversions.scala @@ -6,6 +6,8 @@ package com.daml.platform.store import anorm.Column.nonNull import anorm._ import com.daml.error.ContextualizedErrorLogger +import com.daml.error.definitions.LedgerApiErrors +import com.daml.grpc.GrpcStatus import com.daml.ledger.api.domain import com.daml.ledger.offset.Offset import com.daml.ledger.participant.state.v2.Update.CommandRejected @@ -22,7 +24,6 @@ import scala.util.Try import java.io.BufferedReader import java.sql.{PreparedStatement, SQLNonTransientException, Types} import java.util.stream.Collectors - import scala.annotation.nowarn // TODO append-only: split this file on cleanup, and move anorm/db conversion related stuff to the right place @@ -426,6 +427,16 @@ private[platform] object Conversions { CommandRejected.FinalReason( errorFactories.CommandRejections.invalidLedgerTime(reason) ) + case domain.RejectionReason.LedgerConfigNotFound(description) => + // This rejection is returned only for V2 error codes already so we don't need to + // wrap it in ErrorFactories (see [[com.daml.platform.sandbox.stores.ledger.Rejection.NoLedgerConfiguration]] + CommandRejected.FinalReason( + GrpcStatus.toProto( + LedgerApiErrors.RequestValidation.NotFound.LedgerConfiguration + .RejectWithMessage(description) + .asGrpcStatusFromContext + ) + ) } } } diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/ACSReader.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/ACSReader.scala index bb0eac319003..19b93952a28a 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/ACSReader.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/ACSReader.scala @@ -4,9 +4,12 @@ package com.daml.platform.store.appendonlydao.events import akka.NotUsed -import akka.stream.{BoundedSourceQueue, Materializer, QueueOfferResult} import akka.stream.scaladsl.Source +import akka.stream.{BoundedSourceQueue, Materializer, QueueOfferResult} import com.daml.dec.DirectExecutionContext +import com.daml.error.definitions.LedgerApiErrors +import com.daml.error.definitions.LedgerApiErrors.ParticipantBackpressure +import com.daml.error.{ContextualizedErrorLogger, DamlContextualizedErrorLogger} import com.daml.ledger.offset.Offset import com.daml.lf.data.Ref import com.daml.logging.{ContextualizedLogger, LoggingContext} @@ -48,6 +51,9 @@ class FilterTableACSReader( )(implicit loggingContext: LoggingContext ): Source[Vector[EventsTable.Entry[Raw.FlatEvent]], NotUsed] = { + implicit val errorLogger: ContextualizedErrorLogger = + new DamlContextualizedErrorLogger(logger, loggingContext, None) + val allFilterParties = filter.keySet val tasks = filter.iterator .flatMap { @@ -163,7 +169,9 @@ private[events] object FilterTableACSReader { )( work: TASK => Future[(RESULT, Option[TASK])], initialTasks: Iterable[TASK], - ): Source[(TASK, RESULT), NotUsed] = if (initialTasks.isEmpty) Source.empty + )(implicit errorLogger: ContextualizedErrorLogger): Source[(TASK, RESULT), NotUsed] = if ( + initialTasks.isEmpty + ) Source.empty else { val (signalQueue, signalSource) = Source .queue[Unit](initialTasks.size) @@ -186,7 +194,7 @@ private[events] object FilterTableACSReader { class QueueState[TASK: Ordering]( signalQueue: BoundedSourceQueue[Unit], initialTasks: Iterable[TASK], - ) { + )(implicit errorLogger: ContextualizedErrorLogger) { private val priorityQueue = new mutable.PriorityQueue[TASK]()(implicitly[Ordering[TASK]].reverse) private var runningTasks: Int = 0 @@ -214,9 +222,19 @@ private[events] object FilterTableACSReader { signalQueue.offer(()) match { case QueueOfferResult.Enqueued => () case QueueOfferResult.Dropped => - throw new Exception("Cannot enqueue signal: dropped. Queue bufferSize not big enough.") - case QueueOfferResult.Failure(_) => () // stream already failed - case QueueOfferResult.QueueClosed => () // stream already closed + throw ParticipantBackpressure + .Rejection( + "Cannot enqueue signal: dropped. ACS reader queue bufferSize not big enough." + ) + .asGrpcError + case QueueOfferResult.Failure(f) => + throw LedgerApiErrors.InternalError + .Buffer("Failed to enqueue in ACS reader queue state: Internal failure", Some(f)) + .asGrpcError + case QueueOfferResult.QueueClosed => + throw LedgerApiErrors.InternalError + .Buffer("Failed to enqueue in ACS reader queue state: Queue closed", None) + .asGrpcError } } } diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/ContractsReader.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/ContractsReader.scala index fb381f15b23a..6efb6e701ebd 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/ContractsReader.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/ContractsReader.scala @@ -3,18 +3,20 @@ package com.daml.platform.store.appendonlydao.events -import java.io.ByteArrayInputStream import com.codahale.metrics.Timer +import com.daml.error.definitions.IndexErrors +import com.daml.error.{ContextualizedErrorLogger, DamlContextualizedErrorLogger} import com.daml.lf.data.Time.Timestamp -import com.daml.logging.LoggingContext +import com.daml.logging.{ContextualizedLogger, LoggingContext} import com.daml.metrics.{Metrics, Timed} -import com.daml.platform.store.interfaces.LedgerDaoContractsReader._ -import com.daml.platform.store.appendonlydao.events.ContractsReader._ import com.daml.platform.store.appendonlydao.DbDispatcher +import com.daml.platform.store.appendonlydao.events.ContractsReader._ import com.daml.platform.store.backend.ContractStorageBackend import com.daml.platform.store.interfaces.LedgerDaoContractsReader +import com.daml.platform.store.interfaces.LedgerDaoContractsReader._ import com.daml.platform.store.serialization.{Compression, ValueSerializer} +import java.io.ByteArrayInputStream import scala.concurrent.{ExecutionContext, Future} private[appendonlydao] sealed class ContractsReader( @@ -23,6 +25,7 @@ private[appendonlydao] sealed class ContractsReader( metrics: Metrics, )(implicit ec: ExecutionContext) extends LedgerDaoContractsReader { + private val logger = ContextualizedLogger.get(getClass) override def lookupMaximumLedgerTime(ids: Set[ContractId])(implicit loggingContext: LoggingContext @@ -54,7 +57,9 @@ private[appendonlydao] sealed class ContractsReader( override def lookupContractState(contractId: ContractId, before: Long)(implicit loggingContext: LoggingContext - ): Future[Option[ContractState]] = + ): Future[Option[ContractState]] = { + implicit val errorLogger: ContextualizedErrorLogger = + new DamlContextualizedErrorLogger(logger, loggingContext, None) Timed.future( metrics.daml.index.db.lookupActiveContract, dispatcher @@ -85,9 +90,13 @@ private[appendonlydao] sealed class ContractsReader( ), ) case raw if raw.eventKind == 20 => ArchivedContract(raw.flatEventWitnesses) - case raw => throw ContractsReaderError(s"Unexpected event kind ${raw.eventKind}") + case raw => + throw throw IndexErrors.DatabaseErrors.ResultSetError + .Reject(s"Unexpected event kind ${raw.eventKind}") + .asGrpcError }), ) + } /** Lookup of a contract in the case the contract value is not already known */ override def lookupActiveContractAndLoadArgument( @@ -208,8 +217,8 @@ private[appendonlydao] object ContractsReader { agreementText = "", ) - private def assertPresent[T](in: Option[T])(err: String): T = - in.getOrElse(throw ContractsReaderError(err)) - - case class ContractsReaderError(msg: String) extends RuntimeException(msg) + private def assertPresent[T](in: Option[T])(err: String)(implicit + errorLogger: ContextualizedErrorLogger + ): T = + in.getOrElse(throw IndexErrors.DatabaseErrors.ResultSetError.Reject(err).asGrpcError) } diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/EventsTable.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/EventsTable.scala index 9f3004ac9505..247c02293c82 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/EventsTable.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/EventsTable.scala @@ -4,6 +4,8 @@ package com.daml.platform.store.appendonlydao.events import com.daml.api.util.TimestampConversion +import com.daml.error.ContextualizedErrorLogger +import com.daml.error.definitions.IndexErrors import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse import com.daml.ledger.api.v1.event.Event import com.daml.ledger.api.v1.transaction.{ @@ -72,6 +74,8 @@ object EventsTable { def toGetActiveContractsResponse( events: Vector[Entry[Event]] + )(implicit + contextualizedErrorLogger: ContextualizedErrorLogger ): Vector[GetActiveContractsResponse] = { events.map { case entry if entry.event.isCreated => @@ -81,9 +85,11 @@ object EventsTable { activeContracts = Seq(entry.event.getCreated), ) case entry => - throw new IllegalStateException( - s"Non-create event ${entry.event.eventId} fetched as part of the active contracts" - ) + throw IndexErrors.DatabaseErrors.ResultSetError + .Reject( + s"Non-create event ${entry.event.eventId} fetched as part of the active contracts" + ) + .asGrpcError } } diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/TransactionsReader.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/TransactionsReader.scala index 6fd1e233632f..3063ee13e723 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/TransactionsReader.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/TransactionsReader.scala @@ -4,10 +4,10 @@ package com.daml.platform.store.appendonlydao.events import java.sql.Connection - import akka.stream.OverflowStrategy import akka.stream.scaladsl.Source import akka.{Done, NotUsed} +import com.daml.error.DamlContextualizedErrorLogger import com.daml.ledger.api.TraceIdentifiers import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse import com.daml.ledger.api.v1.event.Event @@ -361,6 +361,7 @@ private[appendonlydao] final class TransactionsReader( filter: FilterRelation, verbose: Boolean, )(implicit loggingContext: LoggingContext): Source[GetActiveContractsResponse, NotUsed] = { + val contextualizedErrorLogger = new DamlContextualizedErrorLogger(logger, loggingContext, None) val span = Telemetry.Transactions.createSpan(activeAt)(qualifiedNameOfCurrentFunc) logger.debug(s"getActiveContracts($activeAt, $filter, $verbose)") @@ -380,7 +381,7 @@ private[appendonlydao] final class TransactionsReader( timer = dbMetrics.getActiveContracts.translationTimer, ) } - .mapConcat(EventsTable.Entry.toGetActiveContractsResponse) + .mapConcat(EventsTable.Entry.toGetActiveContractsResponse(_)(contextualizedErrorLogger)) .buffer(outputStreamBufferSize, OverflowStrategy.backpressure) .wireTap(response => { Spans.addEventToSpan( diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/store/appendonlydao/events/ACSReaderSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/store/appendonlydao/events/ACSReaderSpec.scala index e2fd70583bc6..2a78333e8edd 100644 --- a/ledger/participant-integration-api/src/test/suite/scala/platform/store/appendonlydao/events/ACSReaderSpec.scala +++ b/ledger/participant-integration-api/src/test/suite/scala/platform/store/appendonlydao/events/ACSReaderSpec.scala @@ -7,6 +7,7 @@ import akka.actor.ActorSystem import akka.stream.Materializer import akka.stream.scaladsl.Sink import com.codahale.metrics.MetricRegistry +import com.daml.error.{ContextualizedErrorLogger, NoLogging} import com.daml.logging.LoggingContext import com.daml.metrics.Metrics import org.scalatest.BeforeAndAfterAll @@ -14,11 +15,11 @@ import org.scalatest.flatspec.AsyncFlatSpec import org.scalatest.matchers.should.Matchers import scala.collection.immutable -import scala.concurrent.{Await, ExecutionContext, Future, Promise} import scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContext, Future, Promise} class ACSReaderSpec extends AsyncFlatSpec with Matchers with BeforeAndAfterAll { - + private implicit val errorLogger: ContextualizedErrorLogger = NoLogging private val actorSystem = ActorSystem() private implicit val materializer: Materializer = Materializer(actorSystem) private implicit val ec: ExecutionContext = actorSystem.dispatcher @@ -115,7 +116,7 @@ class ACSReaderSpec extends AsyncFlatSpec with Matchers with BeforeAndAfterAll { )( work = simple4Worker, initialTasks = simple4Task, - )(Ordering.by[(Int, Int), Int](_._2)) + )(Ordering.by[(Int, Int), Int](_._2), errorLogger) .runWith(Sink.collection) .map( _.map(_._2) shouldBe simple4WorkerExpectedOrderedResult @@ -130,7 +131,7 @@ class ACSReaderSpec extends AsyncFlatSpec with Matchers with BeforeAndAfterAll { )( work = simple4Worker, initialTasks = simple4Task, - )(Ordering.by[(Int, Int), Int](_._2)) + )(Ordering.by[(Int, Int), Int](_._2), errorLogger) .runWith(Sink.collection) .map( _.map(_._2).toSet shouldBe simple4WorkerExpectedOrderedResult.toSet @@ -145,7 +146,7 @@ class ACSReaderSpec extends AsyncFlatSpec with Matchers with BeforeAndAfterAll { )( work = simple4Worker, initialTasks = simple4Task, - )(Ordering.by[(Int, Int), Int](_._2)) + )(Ordering.by[(Int, Int), Int](_._2), errorLogger) .runWith(Sink.collection) .map( _.map(_._2).toSet shouldBe simple4WorkerExpectedOrderedResult.toSet @@ -216,7 +217,7 @@ class ACSReaderSpec extends AsyncFlatSpec with Matchers with BeforeAndAfterAll { )( work = puppetWorker, initialTasks = List(puppetTask1, puppetTask6, puppetTask3, puppetTask4, puppetTask2), - )(Ordering.by[PuppetTask, Int](_.i)) + )(Ordering.by[PuppetTask, Int](_.i), errorLogger) .runWith(Sink.collection) info("As stream processing starts") for { diff --git a/ledger/sandbox-classic/BUILD.bazel b/ledger/sandbox-classic/BUILD.bazel index 0ff6bc36577c..d85d9671377a 100644 --- a/ledger/sandbox-classic/BUILD.bazel +++ b/ledger/sandbox-classic/BUILD.bazel @@ -80,12 +80,14 @@ alias( "//libs-scala/build-info", "//libs-scala/concurrent", "//libs-scala/contextualized-logging", + "//libs-scala/grpc-utils", "//libs-scala/logging-entries", "//libs-scala/ports", "//libs-scala/resources", "//libs-scala/resources-akka", "//libs-scala/resources-grpc", "@maven//:ch_qos_logback_logback_classic", + "@maven//:com_google_api_grpc_proto_google_common_protos", "@maven//:com_typesafe_config", "@maven//:io_dropwizard_metrics_metrics_core", "@maven//:org_slf4j_slf4j_api", @@ -151,6 +153,7 @@ da_scala_library( "//ledger-api/rs-grpc-bridge", "//ledger-api/testing-utils", "//ledger/caching", + "//ledger/error", "//ledger/ledger-api-auth", "//ledger/ledger-api-auth-client", "//ledger/ledger-api-client", diff --git a/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/SandboxIndexAndWriteService.scala b/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/SandboxIndexAndWriteService.scala index 426b19ea6f7d..791873667a6d 100644 --- a/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/SandboxIndexAndWriteService.scala +++ b/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/SandboxIndexAndWriteService.scala @@ -130,7 +130,7 @@ private[sandbox] object SandboxIndexAndWriteService { templateStore, ledgerEntries, engine, - ErrorFactories(new ErrorCodesVersionSwitcher(enableSelfServiceErrorCodes)), + new ErrorCodesVersionSwitcher(enableSelfServiceErrorCodes), ) owner( ledger = MeteredLedger(ledger, metrics), diff --git a/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/ledger/Rejection.scala b/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/ledger/Rejection.scala index c1a9864898a7..843316fd9a77 100644 --- a/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/ledger/Rejection.scala +++ b/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/ledger/Rejection.scala @@ -3,18 +3,24 @@ package com.daml.platform.sandbox.stores.ledger +import com.daml.error.{ContextualizedErrorLogger, ErrorCodesVersionSwitcher} import com.daml.ledger.api.domain import com.daml.ledger.configuration.LedgerTimeModel import com.daml.ledger.participant.state.{v2 => state} +import com.daml.platform.server.api.validation.ErrorFactories import com.google.protobuf.any.{Any => AnyProto} import com.google.rpc.error_details.ErrorInfo import com.google.rpc.status.{Status => RpcStatus} import io.grpc.Status sealed trait Rejection { - def toDomainRejectionReason: domain.RejectionReason + def toDomainRejectionReason( + errorCodesVersionSwitcher: ErrorCodesVersionSwitcher + ): domain.RejectionReason - def toStateRejectionReason: state.Update.CommandRejected.RejectionReasonTemplate + def toStateRejectionReason(errorFactories: ErrorFactories)(implicit + errorLogger: ContextualizedErrorLogger + ): state.Update.CommandRejected.RejectionReasonTemplate } object Rejection { @@ -24,49 +30,70 @@ object Rejection { private val description: String = "No ledger configuration available, cannot validate ledger time" - override lazy val toDomainRejectionReason: domain.RejectionReason = - domain.RejectionReason.InvalidLedgerTime(description) + override def toDomainRejectionReason( + errorCodesVersionSwitcher: ErrorCodesVersionSwitcher + ): domain.RejectionReason = + errorCodesVersionSwitcher.choose[domain.RejectionReason]( + // The V1 rejection reason is not precise enough but changing it in-place involves breaking compatibility. + // Instead use the error codes version switcher to correct the rejection from now on + v1 = domain.RejectionReason.InvalidLedgerTime(description), + v2 = domain.RejectionReason.LedgerConfigNotFound(description), + ) - override lazy val toStateRejectionReason: state.Update.CommandRejected.RejectionReasonTemplate = + override def toStateRejectionReason(errorFactories: ErrorFactories)(implicit + errorLogger: ContextualizedErrorLogger + ): state.Update.CommandRejected.RejectionReasonTemplate = state.Update.CommandRejected.FinalReason( - RpcStatus.of( - code = Status.Code.ABORTED.value(), - message = description, - details = Seq( - AnyProto.pack( - ErrorInfo.of( - reason = "NO_LEDGER_CONFIGURATION", - domain = ErrorDomain, - metadata = Map.empty, + errorFactories.missingLedgerConfig( + RpcStatus.of( + code = Status.Code.ABORTED.value(), + message = description, + details = Seq( + AnyProto.pack( + ErrorInfo.of( + reason = "NO_LEDGER_CONFIGURATION", + domain = ErrorDomain, + metadata = Map.empty, + ) ) - ) + ), ), + "Cannot validate ledger time", ) ) } final case class InvalidLedgerTime(outOfRange: LedgerTimeModel.OutOfRange) extends Rejection { - override lazy val toDomainRejectionReason: domain.RejectionReason = + override def toDomainRejectionReason( + errorCodesVersionSwitcher: ErrorCodesVersionSwitcher + ): domain.RejectionReason = domain.RejectionReason.InvalidLedgerTime(outOfRange.message) - override lazy val toStateRejectionReason: state.Update.CommandRejected.RejectionReasonTemplate = + override def toStateRejectionReason(errorFactories: ErrorFactories)(implicit + errorLogger: ContextualizedErrorLogger + ): state.Update.CommandRejected.RejectionReasonTemplate = state.Update.CommandRejected.FinalReason( - RpcStatus.of( - code = Status.Code.ABORTED.value(), - message = outOfRange.message, - details = Seq( - AnyProto.pack( - ErrorInfo.of( - reason = "INVALID_LEDGER_TIME", - domain = ErrorDomain, - metadata = Map( - "ledgerTime" -> outOfRange.ledgerTime.toString, - "lowerBound" -> outOfRange.lowerBound.toString, - "upperBound" -> outOfRange.upperBound.toString, - ), + errorFactories.CommandRejections.invalidLedgerTime( + RpcStatus.of( + code = Status.Code.ABORTED.value(), + message = outOfRange.message, + details = Seq( + AnyProto.pack( + ErrorInfo.of( + reason = "INVALID_LEDGER_TIME", + domain = ErrorDomain, + metadata = Map( + "ledgerTime" -> outOfRange.ledgerTime.toString, + "lowerBound" -> outOfRange.lowerBound.toString, + "upperBound" -> outOfRange.upperBound.toString, + ), + ) ) - ) + ), ), + outOfRange.ledgerTime.toInstant, + outOfRange.lowerBound.toInstant, + outOfRange.upperBound.toInstant, ) ) } diff --git a/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/ledger/inmemory/InMemoryLedger.scala b/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/ledger/inmemory/InMemoryLedger.scala index cae36fdff0b8..62185771f083 100644 --- a/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/ledger/inmemory/InMemoryLedger.scala +++ b/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/ledger/inmemory/InMemoryLedger.scala @@ -9,7 +9,7 @@ import akka.NotUsed import akka.stream.scaladsl.Source import com.daml.api.util.TimeProvider import com.daml.daml_lf_dev.DamlLf.Archive -import com.daml.error.DamlContextualizedErrorLogger +import com.daml.error.{DamlContextualizedErrorLogger, ErrorCodesVersionSwitcher} import com.daml.ledger.api.domain.{ ApplicationId, CommandId, @@ -88,10 +88,11 @@ private[sandbox] final class InMemoryLedger( packageStoreInit: InMemoryPackageStore, ledgerEntries: ImmArray[LedgerEntryOrBump], engine: Engine, - errorFactories: ErrorFactories, + errorCodesVersionSwitcher: ErrorCodesVersionSwitcher, ) extends Ledger { private val enricher = new ValueEnricher(engine) + private val errorFactories = ErrorFactories(errorCodesVersionSwitcher) private def consumeEnricherResult[V](res: Result[V]): V = { LfEngineToApi.assertOrRuntimeEx( @@ -393,7 +394,8 @@ private[sandbox] final class InMemoryLedger( val recordTime = timeProvider.getCurrentTimestamp checkTimeModel(ledgerTime, recordTime) .fold( - rejection => handleError(submitterInfo, rejection.toDomainRejectionReason), + rejection => + handleError(submitterInfo, rejection.toDomainRejectionReason(errorCodesVersionSwitcher)), _ => { val (committedTransaction, disclosureForIndex, divulgence) = Ledger @@ -762,5 +764,4 @@ private[sandbox] object InMemoryLedger { deduplicationKey: String, deduplicateUntil: Instant, ) - } diff --git a/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/ledger/sql/SqlLedger.scala b/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/ledger/sql/SqlLedger.scala index 7ed0db563ddf..e2e8537fd83b 100644 --- a/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/ledger/sql/SqlLedger.scala +++ b/ledger/sandbox-classic/src/main/scala/platform/sandbox/stores/ledger/sql/SqlLedger.scala @@ -3,8 +3,6 @@ package com.daml.platform.sandbox.stores.ledger.sql -import java.util.concurrent.atomic.AtomicReference - import akka.Done import akka.stream.QueueOfferResult.{Dropped, Enqueued, QueueClosed} import akka.stream.scaladsl.{Keep, Sink, Source, SourceQueueWithComplete} @@ -12,6 +10,8 @@ import akka.stream.{Materializer, OverflowStrategy, QueueOfferResult} import com.daml.api.util.TimeProvider import com.daml.daml_lf_dev.DamlLf.Archive import com.daml.dec.{DirectExecutionContext => DEC} +import com.daml.error.{ContextualizedErrorLogger, DamlContextualizedErrorLogger} +import com.daml.grpc.GrpcStatus import com.daml.ledger.api.domain import com.daml.ledger.api.domain.{LedgerId, ParticipantId, PartyDetails} import com.daml.ledger.api.health.HealthStatus @@ -39,7 +39,6 @@ import com.daml.platform.sandbox.stores.ledger.sql.SqlLedger._ import com.daml.platform.sandbox.stores.ledger.{Ledger, Rejection, SandboxOffset} import com.daml.platform.server.api.validation.ErrorFactories import com.daml.platform.store.appendonlydao.events.CompressionStrategy -import com.daml.platform.store.cache.{MutableLedgerEndCache, TranslationCacheBackedContractStore} import com.daml.platform.store.appendonlydao.{ DbDispatcher, LedgerDao, @@ -47,6 +46,7 @@ import com.daml.platform.store.appendonlydao.{ SequentialWriteDao, } import com.daml.platform.store.backend.StorageBackendFactory +import com.daml.platform.store.cache.{MutableLedgerEndCache, TranslationCacheBackedContractStore} import com.daml.platform.store.entries.{LedgerEntry, PackageLedgerEntry, PartyLedgerEntry} import com.daml.platform.store.interning.{ InternizingStringInterningView, @@ -54,9 +54,9 @@ import com.daml.platform.store.interning.{ StringInterningView, } import com.daml.platform.store.{BaseLedger, DbType, FlywayMigrations, LfValueTranslationCache} -import com.google.rpc.status.{Status => RpcStatus} -import io.grpc.Status +import io.grpc.protobuf +import java.util.concurrent.atomic.AtomicReference import scala.collection.immutable.Queue import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ExecutionContext, Future} @@ -168,6 +168,7 @@ private[sandbox] object SqlLedger { contractStore, dispatcher, persistenceQueue, + errorFactories, ).acquire() } yield ledger @@ -345,6 +346,7 @@ private[sandbox] object SqlLedger { contractStore: ContractStore, dispatcher: Dispatcher[Offset], persistenceQueue: PersistenceQueue, + errorFactories: ErrorFactories, ): ResourceOwner[SqlLedger] = ResourceOwner.forCloseable(() => new SqlLedger( @@ -356,6 +358,7 @@ private[sandbox] object SqlLedger { timeProvider, persistenceQueue, transactionCommitter, + errorFactories, ) ) @@ -419,6 +422,7 @@ private final class SqlLedger( timeProvider: TimeProvider, persistenceQueue: PersistenceQueue, transactionCommitter: TransactionCommitter, + errorFactories: ErrorFactories, ) extends BaseLedger( ledgerId, ledgerDao, @@ -458,7 +462,10 @@ private final class SqlLedger( submitterInfo: state.SubmitterInfo, transactionMeta: state.TransactionMeta, transaction: SubmittedTransaction, - )(implicit loggingContext: LoggingContext): Future[state.SubmissionResult] = + )(implicit loggingContext: LoggingContext): Future[state.SubmissionResult] = { + val errorLogger = + new DamlContextualizedErrorLogger(logger, loggingContext, submitterInfo.submissionId) + enqueue { offset => val transactionId = offset.toApiString @@ -472,7 +479,7 @@ private final class SqlLedger( completionInfo = Some(submitterInfo.toCompletionInfo), recordTime = recordTime, offset = offset, - reason = reason.toStateRejectionReason, + reason = reason.toStateRejectionReason(errorFactories)(errorLogger), ), _ => { val divulgedContracts = Nil @@ -498,10 +505,12 @@ private final class SqlLedger( logger.error(s"Failed to persist entry with offset: ${offset.toApiString}", t) } )(DEC) + }(errorLogger) + } - } - - private def enqueue(persist: Offset => Future[Unit]): Future[state.SubmissionResult] = + private def enqueue( + persist: Offset => Future[Unit] + )(implicit errorLogger: ContextualizedErrorLogger): Future[state.SubmissionResult] = persistenceQueue .offer(persist) .transform { @@ -510,17 +519,27 @@ private final class SqlLedger( case Success(Dropped) => Success( state.SubmissionResult.SynchronousError( - RpcStatus.of( - Status.Code.RESOURCE_EXHAUSTED.value(), - "System is overloaded, please try again later", - Seq.empty, + GrpcStatus.toProto( + errorFactories.bufferFull("The submission ingress buffer is full") ) ) ) case Success(QueueClosed) => - Failure(new IllegalStateException("queue closed")) - case Success(QueueOfferResult.Failure(e)) => Failure(e) - case Failure(f) => Failure(f) + val failedStatus = + errorFactories.SubmissionQueueErrors.queueClosed("SQL Ledger submission queue") + Failure(protobuf.StatusProto.toStatusRuntimeException(failedStatus)) + case Success(QueueOfferResult.Failure(e)) => + val failedStatus = + errorFactories.SubmissionQueueErrors.failedToEnqueueCommandSubmission( + "Failed to enqueue submission" + )(e) + Failure(protobuf.StatusProto.toStatusRuntimeException(failedStatus)) + case Failure(f) => + val failedStatus = + errorFactories.SubmissionQueueErrors.failedToEnqueueCommandSubmission( + "Failed to enqueue submission" + )(f) + Failure(protobuf.StatusProto.toStatusRuntimeException(failedStatus)) }(DEC) override def publishPartyAllocation( @@ -528,6 +547,7 @@ private final class SqlLedger( party: Ref.Party, displayName: Option[String], )(implicit loggingContext: LoggingContext): Future[state.SubmissionResult] = { + val errorLogger = new DamlContextualizedErrorLogger(logger, loggingContext, Some(submissionId)) enqueue { offset => ledgerDao .getParties(Seq(party)) @@ -558,7 +578,7 @@ private final class SqlLedger( ) Future.unit }(DEC) - } + }(errorLogger) } override def uploadPackages( @@ -567,6 +587,8 @@ private final class SqlLedger( sourceDescription: Option[String], payload: List[Archive], )(implicit loggingContext: LoggingContext): Future[state.SubmissionResult] = { + val errorLogger = new DamlContextualizedErrorLogger(logger, loggingContext, Some(submissionId)) + val packages = payload.map(archive => (archive, PackageDetails(archive.getPayload.size().toLong, knownSince, sourceDescription)) ) @@ -585,14 +607,15 @@ private final class SqlLedger( logger.error(s"Failed to persist packages with offset: ${offset.toApiString}", t) () }(DEC) - } + }(errorLogger) } override def publishConfiguration( maxRecordTime: Time.Timestamp, submissionId: String, config: Configuration, - )(implicit loggingContext: LoggingContext): Future[state.SubmissionResult] = + )(implicit loggingContext: LoggingContext): Future[state.SubmissionResult] = { + val errorLogger = new DamlContextualizedErrorLogger(logger, loggingContext, Some(submissionId)) enqueue { offset => val recordTime = timeProvider.getCurrentTimestamp @@ -636,5 +659,6 @@ private final class SqlLedger( logger.error(s"Failed to persist configuration with offset: $offset", t) () }(DEC) - } + }(errorLogger) + } } diff --git a/ledger/sandbox-classic/src/test/lib/scala/platform/sandbox/LedgerResource.scala b/ledger/sandbox-classic/src/test/lib/scala/platform/sandbox/LedgerResource.scala index c1e331fd1add..27ff1e8f8c66 100644 --- a/ledger/sandbox-classic/src/test/lib/scala/platform/sandbox/LedgerResource.scala +++ b/ledger/sandbox-classic/src/test/lib/scala/platform/sandbox/LedgerResource.scala @@ -6,6 +6,7 @@ package com.daml.platform.sandbox import akka.stream.Materializer import com.codahale.metrics.MetricRegistry import com.daml.api.util.TimeProvider +import com.daml.error.ErrorCodesVersionSwitcher import com.daml.ledger.api.domain import com.daml.ledger.api.domain.LedgerId import com.daml.ledger.api.testing.utils.{OwnedResource, Resource} @@ -37,7 +38,7 @@ private[sandbox] object LedgerResource { def inMemory( ledgerId: LedgerId, timeProvider: TimeProvider, - errorFactories: ErrorFactories, + errorCodesVersionSwitcher: ErrorCodesVersionSwitcher, acs: InMemoryActiveLedgerState = InMemoryActiveLedgerState.empty, packages: InMemoryPackageStore = InMemoryPackageStore.empty, entries: ImmArray[LedgerEntryOrBump] = ImmArray.Empty, @@ -52,7 +53,7 @@ private[sandbox] object LedgerResource { packageStoreInit = packages, ledgerEntries = entries, engine = new Engine(), - errorFactories, + errorCodesVersionSwitcher, ) ) ) diff --git a/ledger/sandbox-classic/src/test/lib/scala/platform/sandbox/stores/ledger/RejectionSpec.scala b/ledger/sandbox-classic/src/test/lib/scala/platform/sandbox/stores/ledger/RejectionSpec.scala index 2aa69afc2147..c2567d4bd96a 100644 --- a/ledger/sandbox-classic/src/test/lib/scala/platform/sandbox/stores/ledger/RejectionSpec.scala +++ b/ledger/sandbox-classic/src/test/lib/scala/platform/sandbox/stores/ledger/RejectionSpec.scala @@ -3,20 +3,34 @@ package com.daml.platform.sandbox.stores.ledger +import com.daml.error.{DamlContextualizedErrorLogger, ErrorCodesVersionSwitcher} import com.daml.ledger.configuration.LedgerTimeModel import com.daml.ledger.participant.state.{v2 => state} import com.daml.lf.data.Time.Timestamp +import com.daml.logging.{ContextualizedLogger, LoggingContext} +import com.daml.platform.server.api.validation.ErrorFactories import com.google.protobuf.any.{Any => AnyProto} -import com.google.rpc.error_details.ErrorInfo +import com.google.rpc.error_details.{ErrorInfo, RequestInfo} import com.google.rpc.status.{Status => StatusProto} import io.grpc.Status import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec class RejectionSpec extends AnyWordSpec with Matchers { + private val submissionId = "12345678" + private implicit val contextualizedErrorLogger: DamlContextualizedErrorLogger = + new DamlContextualizedErrorLogger( + ContextualizedLogger.get(getClass), + LoggingContext.ForTesting, + Some(submissionId), + ) + + private val errorFactoriesV1 = ErrorFactories(new ErrorCodesVersionSwitcher(false)) + private val errorFactoriesV2 = ErrorFactories(new ErrorCodesVersionSwitcher(true)) + "Rejection.NoLedgerConfiguration" should { - "convert to a state rejection reason" in { - Rejection.NoLedgerConfiguration.toStateRejectionReason should be( + "convert to a state rejection reason (V1 errors)" in { + Rejection.NoLedgerConfiguration.toStateRejectionReason(errorFactoriesV1) should be( state.Update.CommandRejected.FinalReason( StatusProto.of( code = Status.Code.ABORTED.value, @@ -34,16 +48,35 @@ class RejectionSpec extends AnyWordSpec with Matchers { ) ) } + + "convert to a state rejection reason (V2 errors)" in { + val actualRejectionReason = + Rejection.NoLedgerConfiguration.toStateRejectionReason(errorFactoriesV2) + actualRejectionReason.code shouldBe Status.Code.NOT_FOUND.value() + actualRejectionReason.message shouldBe "LEDGER_CONFIGURATION_NOT_FOUND(11,12345678): The ledger configuration could not be retrieved: Cannot validate ledger time." + + val (errorInfo, requestInfo) = extractDetails(actualRejectionReason.status.details) + errorInfo shouldBe com.google.rpc.error_details.ErrorInfo( + reason = "LEDGER_CONFIGURATION_NOT_FOUND", + metadata = Map( + "message" -> "Cannot validate ledger time", + "category" -> "11", + "definite_answer" -> "false", + ), + ) + + requestInfo shouldBe RequestInfo(requestId = submissionId) + } } "Rejection.InvalidLedgerTime" should { - "convert to a state rejection reason" in { - val ledgerTime = Timestamp.assertFromString("2021-07-20T09:30:00Z") - val lowerBound = Timestamp.assertFromString("2021-07-20T09:00:00Z") - val upperBound = Timestamp.assertFromString("2021-07-20T09:10:00Z") - val outOfRange = LedgerTimeModel.OutOfRange(ledgerTime, lowerBound, upperBound) + val ledgerTime = Timestamp.assertFromString("2021-07-20T09:30:00Z") + val lowerBound = Timestamp.assertFromString("2021-07-20T09:00:00Z") + val upperBound = Timestamp.assertFromString("2021-07-20T09:10:00Z") + val outOfRange = LedgerTimeModel.OutOfRange(ledgerTime, lowerBound, upperBound) - Rejection.InvalidLedgerTime(outOfRange).toStateRejectionReason should be( + "convert to a state rejection reason (V1 errors)" in { + Rejection.InvalidLedgerTime(outOfRange).toStateRejectionReason(errorFactoriesV1) should be( state.Update.CommandRejected.FinalReason( StatusProto.of( code = Status.Code.ABORTED.value, @@ -65,5 +98,40 @@ class RejectionSpec extends AnyWordSpec with Matchers { ) ) } + + "convert to a state rejection reason (V2 errors)" in { + val actualRejectionReason = + Rejection.InvalidLedgerTime(outOfRange).toStateRejectionReason(errorFactoriesV2) + + actualRejectionReason.code shouldBe Status.Code.FAILED_PRECONDITION.value() + actualRejectionReason.message shouldBe "INVALID_LEDGER_TIME(9,12345678): Invalid ledger time: Ledger time 2021-07-20T09:30:00Z outside of range [2021-07-20T09:00:00Z, 2021-07-20T09:10:00Z]" + + val (errorInfo, requestInfo) = extractDetails(actualRejectionReason.status.details) + + errorInfo shouldBe com.google.rpc.error_details.ErrorInfo( + reason = "INVALID_LEDGER_TIME", + metadata = Map( + "ledger_time" -> "2021-07-20T09:30:00Z", + "ledger_time_lower_bound" -> "2021-07-20T09:00:00Z", + "ledger_time_upper_bound" -> "2021-07-20T09:10:00Z", + "category" -> "9", + "message" -> "Ledger time 2021-07-20T09:30:00Z outside of range [2021-07-20T09:00:00Z, 2021-07-20T09:10:00Z]", + "definite_answer" -> "false", + ), + ) + + requestInfo shouldBe RequestInfo(requestId = submissionId) + } + } + + def extractDetails(details: Seq[AnyProto]): (ErrorInfo, RequestInfo) = { + details should have size 2L + details match { + case d1 :: d2 :: Nil if d1.is(ErrorInfo) && d2.is[RequestInfo] => + d1.unpack[ErrorInfo] -> d2.unpack[RequestInfo] + case d1 :: d2 :: Nil if d2.is(ErrorInfo) && d1.is[RequestInfo] => + d2.unpack[ErrorInfo] -> d1.unpack[RequestInfo] + case details => fail(s"Unexpected details: $details") + } } } diff --git a/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/stores/ledger/TransactionTimeModelComplianceIT.scala b/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/stores/ledger/TransactionTimeModelComplianceIT.scala index aacb82e38b6b..ba30eb2dcec8 100644 --- a/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/stores/ledger/TransactionTimeModelComplianceIT.scala +++ b/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/stores/ledger/TransactionTimeModelComplianceIT.scala @@ -26,7 +26,6 @@ import com.daml.lf.transaction.test.TransactionBuilder import com.daml.platform.sandbox.stores.ledger.TransactionTimeModelComplianceIT._ import com.daml.platform.sandbox.{LedgerResource, MetricsAround} import com.daml.platform.server.api.validation.ErrorFactories -import org.mockito.MockitoSugar import org.scalatest.concurrent.{AsyncTimeLimitedTests, ScalaFutures} import org.scalatest.matchers.should.Matchers import org.scalatest.time.Span @@ -45,8 +44,7 @@ class TransactionTimeModelComplianceIT with ScalaFutures with Matchers with OptionValues - with MetricsAround - with MockitoSugar { + with MetricsAround { override def timeLimit: Span = scaled(60.seconds) @@ -55,15 +53,18 @@ class TransactionTimeModelComplianceIT Set(BackendType.InMemory, BackendType.Postgres) override protected def constructResource(index: Int, fixtureId: BackendType): Resource[Ledger] = { + val errorCodesVersionSwitcher = new ErrorCodesVersionSwitcher( + enableSelfServiceErrorCodes = true + ) val errorFactories = ErrorFactories( - new ErrorCodesVersionSwitcher(enableSelfServiceErrorCodes = false) + errorCodesVersionSwitcher ) implicit val resourceContext: ResourceContext = ResourceContext(system.dispatcher) fixtureId match { case BackendType.InMemory => - LedgerResource.inMemory(ledgerId, timeProvider, errorFactories) + LedgerResource.inMemory(ledgerId, timeProvider, errorCodesVersionSwitcher) case BackendType.Postgres => - LedgerResource.postgres(getClass, ledgerId, timeProvider, metrics, mock[ErrorFactories]) + LedgerResource.postgres(getClass, ledgerId, timeProvider, metrics, errorFactories) } } @@ -139,9 +140,11 @@ class TransactionTimeModelComplianceIT } } - private[this] def expectInvalidLedgerTime(completion: Completion): Assertion = { - completion.status.value.code shouldBe aborted - } + private[this] def expectInvalidLedgerTime(completion: Completion): Assertion = + completion.status.value.code shouldBe failedPrecondition + + private[this] def expectLedgerConfigNotFound(completion: Completion): Assertion = + completion.status.value.code shouldBe notFound private[this] def expectValidTx(completion: Completion): Assertion = completion.status.value.code shouldBe ok @@ -158,7 +161,7 @@ class TransactionTimeModelComplianceIT configuration = null, ) } yield { - expectInvalidLedgerTime(r1) + expectLedgerConfigNotFound(r1) } } "accept transactions with ledger time that is right" in allFixtures { ledger => @@ -242,7 +245,6 @@ class TransactionTimeModelComplianceIT } } } - } object TransactionTimeModelComplianceIT { @@ -263,6 +265,6 @@ object TransactionTimeModelComplianceIT { } private val ok = io.grpc.Status.Code.OK.value() - private val aborted = io.grpc.Status.Code.ABORTED.value() - + private val failedPrecondition = io.grpc.Status.Code.FAILED_PRECONDITION.value() + private val notFound = io.grpc.Status.Code.NOT_FOUND.value() } diff --git a/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/stores/ledger/sql/SqlLedgerSpec.scala b/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/stores/ledger/sql/SqlLedgerSpec.scala index ceefedd3012b..2ef42767fdb7 100644 --- a/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/stores/ledger/sql/SqlLedgerSpec.scala +++ b/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/stores/ledger/sql/SqlLedgerSpec.scala @@ -3,14 +3,12 @@ package com.daml.platform.sandbox.stores.ledger.sql -import java.io.File -import java.time.{Duration, Instant} - import akka.stream.scaladsl.Sink import ch.qos.logback.classic.Level import com.daml.api.util.TimeProvider import com.daml.bazeltools.BazelRunfiles.rlocation import com.daml.daml_lf_dev.DamlLf +import com.daml.error.ErrorCodesVersionSwitcher import com.daml.ledger.api.domain.{LedgerId, ParticipantId} import com.daml.ledger.api.health.Healthy import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll @@ -41,8 +39,6 @@ import com.daml.platform.store.{IndexMetadata, LfValueTranslationCache} import com.daml.platform.testing.LogCollector import com.daml.testing.postgresql.PostgresAroundEach import com.daml.timer.RetryStrategy -import com.google.protobuf.any.{Any => AnyProto} -import com.google.rpc.error_details.ErrorInfo import io.grpc.Status import org.mockito.MockitoSugar import org.scalatest.Inside @@ -51,6 +47,8 @@ import org.scalatest.matchers.should.Matchers import org.scalatest.time.{Minute, Seconds, Span} import org.scalatest.wordspec.AsyncWordSpec +import java.io.File +import java.time.{Duration, Instant} import scala.collection.mutable import scala.concurrent.duration.DurationInt import scala.concurrent.{Await, Future} @@ -293,6 +291,7 @@ final class SqlLedgerSpec "reject a transaction if no configuration is found" in { val now = Time.Timestamp.now() + val submissionId = "12345678" for { sqlLedger <- createSqlLedger() start = sqlLedger.ledgerEnd() @@ -302,7 +301,7 @@ final class SqlLedgerSpec applicationId = applicationId, commandId = commandId1, deduplicationPeriod = DeduplicationPeriod.DeduplicationDuration(Duration.ofHours(1)), - submissionId = None, + submissionId = Some(Ref.SubmissionId.assertFromString(submissionId)), ledgerConfiguration = Configuration.reasonableInitialConfiguration, ), transactionMeta = emptyTransactionMeta(seedService, ledgerEffectiveTime = now), @@ -321,20 +320,9 @@ final class SqlLedgerSpec completion._1 should be > start inside(completion._2.completions) { case Seq(Completion(`commandId1`, Some(status), _, _, _, _, _)) => - status.code should be(Status.Code.ABORTED.value) + status.code should be(Status.Code.NOT_FOUND.value) status.message should be( - "No ledger configuration available, cannot validate ledger time" - ) - status.details should be( - Seq( - AnyProto.pack( - ErrorInfo.of( - reason = "NO_LEDGER_CONFIGURATION", - domain = "com.daml.on.sql", - metadata = Map.empty, - ) - ) - ) + s"LEDGER_CONFIGURATION_NOT_FOUND(11,$submissionId): The ledger configuration could not be retrieved: Cannot validate ledger time." ) } } @@ -351,6 +339,8 @@ final class SqlLedgerSpec timeModel = LedgerTimeModel.reasonableDefault.copy(minSkew = minSkew, maxSkew = maxSkew) ) + val submissionId = "12345678" + val transactionLedgerEffectiveTime = now.add(Duration.ofMinutes(5)) for { sqlLedger <- createSqlLedger( @@ -371,7 +361,7 @@ final class SqlLedgerSpec applicationId = applicationId, commandId = commandId1, deduplicationPeriod = DeduplicationPeriod.DeduplicationDuration(Duration.ofHours(1)), - submissionId = None, + submissionId = Some(Ref.SubmissionId.assertFromString(submissionId)), ledgerConfiguration = configuration, ), transactionMeta = @@ -391,24 +381,9 @@ final class SqlLedgerSpec completion._1 should be > start inside(completion._2.completions) { case Seq(Completion(`commandId1`, Some(status), _, _, _, _, _)) => - status.code should be(Status.Code.ABORTED.value) + status.code should be(Status.Code.FAILED_PRECONDITION.value) status.message should be( - s"Ledger time 2021-09-01T18:05:00Z outside of range [2021-09-01T17:59:50Z, 2021-09-01T18:00:30Z]" - ) - status.details should be( - Seq( - AnyProto.pack( - ErrorInfo.of( - reason = "INVALID_LEDGER_TIME", - domain = "com.daml.on.sql", - metadata = Map( - "ledgerTime" -> transactionLedgerEffectiveTime.toInstant.toString, - "lowerBound" -> nowInstant.minus(minSkew).toString, - "upperBound" -> nowInstant.plus(maxSkew).toString, - ), - ) - ) - ) + s"INVALID_LEDGER_TIME(9,$submissionId): Invalid ledger time: Ledger time 2021-09-01T18:05:00Z outside of range [2021-09-01T17:59:50Z, 2021-09-01T18:00:30Z]" ) } } @@ -472,7 +447,7 @@ final class SqlLedgerSpec engine = new Engine(), validatePartyAllocation = false, enableCompression = false, - errorFactories = mock[ErrorFactories], + errorFactories = ErrorFactories(new ErrorCodesVersionSwitcher(true)), ).acquire()(ResourceContext(system.dispatcher)) createdLedgers += ledger ledger.asFuture