Skip to content

Commit

Permalink
Create IndexDbException as a specialization
Browse files Browse the repository at this point in the history
* Used to globally define error codes returned by the persistence layer
that are logging
  • Loading branch information
tudor-da committed Nov 25, 2021
1 parent ed9bb70 commit 1758539
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@

package com.daml.error.definitions

import com.daml.error.ErrorCode.LoggingApiException
import com.daml.error._
import com.daml.error.definitions.ErrorGroups.ParticipantErrorGroup.IndexErrorGroup
import com.daml.error.utils.ErrorDetails
import io.grpc.StatusRuntimeException

@Explanation("Errors raised by the Participant Index persistence layer.")
object IndexErrors extends IndexErrorGroup {
Expand All @@ -19,11 +18,10 @@ object IndexErrors extends IndexErrorGroup {
extends ErrorCode(
id = "INDEX_DB_SQL_TRANSIENT_ERROR",
ErrorCategory.TransientServerFailure,
)
with HasUnapply {
) {
case class Reject(throwable: Throwable)(implicit
val loggingContext: ContextualizedErrorLogger
) extends LoggingTransactionErrorImpl(
) extends DbError(
cause =
s"Processing the request failed due to a transient database error: ${throwable.getMessage}",
throwableO = Some(throwable),
Expand All @@ -38,11 +36,10 @@ object IndexErrors extends IndexErrorGroup {
extends ErrorCode(
id = "INDEX_DB_SQL_NON_TRANSIENT_ERROR",
ErrorCategory.SystemInternalAssumptionViolated,
)
with HasUnapply {
) {
case class Reject(throwable: Throwable)(implicit
val loggingContext: ContextualizedErrorLogger
) extends LoggingTransactionErrorImpl(
) extends DbError(
cause =
s"Processing the request failed due to a non-transient database error: ${throwable.getMessage}",
throwableO = Some(throwable),
Expand All @@ -57,21 +54,34 @@ object IndexErrors extends IndexErrorGroup {
extends ErrorCode(
id = "INDEX_DB_INVALID_RESULT_SET",
ErrorCategory.SystemInternalAssumptionViolated,
)
with HasUnapply {
) {
case class Reject(message: String)(implicit
val loggingContext: ContextualizedErrorLogger
) extends LoggingTransactionErrorImpl(
) extends DbError(
cause = message
)
}
}

trait HasUnapply {
this: ErrorCode =>
// TODO error codes: Create a generic unapply for ErrorCode that returns the ErrorCode instance
// and match against that one.
def unapply(exception: StatusRuntimeException): Option[Unit] =
if (ErrorDetails.isErrorCode(exception)(errorCode = this)) Some(()) else None
// Decorator that returns a specialized StatusRuntimeException (IndexDbException)
// that can be used for precise matching of persistence exceptions (e.g. for index initialization failures that need retrying).
// Without this specialization, internal errors just appear as StatusRuntimeExceptions (see INDEX_DB_SQL_NON_TRANSIENT_ERROR)
// without any marker, impeding us to assert whether they are emitted by the persistence layer or not.
abstract class DbError(
override val cause: String,
override val throwableO: Option[Throwable] = None,
)(implicit
code: ErrorCode,
loggingContext: ContextualizedErrorLogger,
) extends LoggingTransactionErrorImpl(cause, throwableO) {
override def asGrpcErrorFromContext(implicit
contextualizedErrorLogger: ContextualizedErrorLogger
): IndexDbException = {
val err = super.asGrpcErrorFromContext(contextualizedErrorLogger)
IndexDbException(err.getStatus, err.getTrailers)
}
}

case class IndexDbException(status: io.grpc.Status, metadata: io.grpc.Metadata)
extends LoggingApiException(status, metadata)
}
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,7 @@ object LedgerApiErrors extends LedgerApiErrorGroup {
loggingContext: ContextualizedErrorLogger
) extends LoggingTransactionErrorImpl(
cause = "A command with the given command id has already been successfully processed",
definiteAnswer = _definiteAnswer
definiteAnswer = _definiteAnswer,
) {
override def context: Map[String, String] =
super.context ++ _existingCommandSubmissionId.map("existing_submission_id" -> _).toList
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@

package com.daml.error.utils

import com.daml.error.ErrorCode
import com.google.protobuf
import com.google.rpc.{ErrorInfo, RequestInfo, ResourceInfo, RetryInfo}
import io.grpc.StatusRuntimeException
import io.grpc.protobuf.StatusProto

import scala.jdk.CollectionConverters._

Expand Down Expand Up @@ -39,16 +36,4 @@ object ErrorDetails {

case any => throw new IllegalStateException(s"Could not unpack value of: |$any|")
}

def isErrorCode(exception: StatusRuntimeException)(errorCode: ErrorCode): Boolean = {
val rpcStatus =
StatusProto.fromStatusAndTrailers(exception.getStatus, exception.getTrailers)

ErrorDetails
.from(rpcStatus.getDetailsList.asScala.toSeq)
.exists {
case ErrorInfoDetail(reason, _) => reason == errorCode.id
case _ => false
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ class ErrorFactories private (errorCodesVersionSwitcher: ErrorCodesVersionSwitch
exception
},
v2 = LedgerApiErrors.ConsistencyErrors.DuplicateCommand
.Reject(existingCommandSubmissionId = existingSubmissionId)
.Reject(_existingCommandSubmissionId = existingSubmissionId)
.asGrpcError,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import akka.Done
import akka.actor.Cancellable
import akka.stream._
import akka.stream.scaladsl.{Keep, Sink, Source}
import com.daml.error.definitions.IndexErrors
import com.daml.error.definitions.IndexErrors.IndexDbException
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.offset.Offset
Expand Down Expand Up @@ -147,12 +147,11 @@ private[platform] object ReadOnlySqlLedger {
executionContext: ExecutionContext,
loggingContext: LoggingContext,
): Future[LedgerId] = {
// If the index database is not yet fully initialized,
// querying for the ledger ID will throw different errors,
// depending on the database, and how far the initialization is.
val isRetryable: PartialFunction[Throwable, Boolean] = {
// If the index database is not yet fully initialized,
// querying for the ledger ID will throw different errors,
// depending on the database, and how far the initialization is.
case IndexErrors.DatabaseErrors.SqlTransientError(_) => true
case IndexErrors.DatabaseErrors.SqlNonTransientError(_) => true
case _: IndexDbException => true
case _: LedgerIdNotFoundException => true
case _: MismatchException.LedgerId => false
case _ => false
Expand Down

0 comments on commit 1758539

Please sign in to comment.