Skip to content

Commit

Permalink
[DPP-647][Self-service error codes] Adopt ApiParticipantPruningService
Browse files Browse the repository at this point in the history
  • Loading branch information
pbatko-da committed Oct 20, 2021
1 parent da27a1e commit 9e88d1d
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,18 +129,43 @@ class ErrorFactories private (errorCodesVersionSwitcher: ErrorCodesVersionSwitch
): StatusRuntimeException =
errorCodesVersionSwitcher.choose(
v1 = {
val statusBuilder = Status
.newBuilder()
.setCode(Code.INVALID_ARGUMENT.value())
.setMessage(s"Invalid argument: $message")
addDefiniteAnswerDetails(definiteAnswer, statusBuilder)
grpcError(statusBuilder.build())
invalidArgumentV1(definiteAnswer, message)
},
v2 = LedgerApiErrors.CommandValidation.InvalidArgument
.Reject(message)
.asGrpcError,
)

def readingOffsetAfterLedgerEnd(definiteAnswer: Option[Boolean])(message: String)(implicit
contextualizedErrorLogger: ContextualizedErrorLogger
): StatusRuntimeException =
errorCodesVersionSwitcher.choose(
v1 = {
invalidArgumentV1(definiteAnswer, message)
},
v2 = LedgerApiErrors.ReadErrors.RequestedOffsetAfterLedgerEnd
.Reject(message)
.asGrpcError,
)

def nonHexOffset(
definiteAnswer: Option[Boolean]
)(fieldName: String, offsetValue: String, message: String)(implicit
contextualizedErrorLogger: ContextualizedErrorLogger
): StatusRuntimeException =
errorCodesVersionSwitcher.choose(
v1 = {
invalidArgumentV1(definiteAnswer, message)
},
v2 = LedgerApiErrors.NonHexOffset
.Error(
fieldName = fieldName,
offsetValue = offsetValue,
message = message,
)
.asGrpcError,
)

/** @param fieldName An invalid field's name.
* @param message A status' message.
* @param definiteAnswer A flag that says whether it is a definite answer. Provided only in the context of command deduplication.
Expand Down Expand Up @@ -319,6 +344,18 @@ class ErrorFactories private (errorCodesVersionSwitcher: ErrorCodesVersionSwitch
def grpcError(status: Status): StatusRuntimeException = new NoStackTraceApiException(
StatusProto.toStatusRuntimeException(status)
)

private def invalidArgumentV1(
definiteAnswer: Option[Boolean],
message: String,
): StatusRuntimeException = {
val statusBuilder = Status
.newBuilder()
.setCode(Code.INVALID_ARGUMENT.value())
.setMessage(s"Invalid argument: $message")
addDefiniteAnswerDetails(definiteAnswer, statusBuilder)
grpcError(statusBuilder.build())
}
}

/** Object exposing the legacy error factories.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,41 @@ class ErrorFactoriesSpec extends AnyWordSpec with Matchers with TableDrivenPrope
)
}

"return a nonHexOffset error" in {
assertVersionedError(
_.nonHexOffset(None)(
fieldName = "fieldName123",
offsetValue = "offsetValue123",
message = "message123",
)
)(
v1_code = Code.INVALID_ARGUMENT,
v1_message = "Invalid argument: message123",
v1_details = Seq.empty,
v2_code = Code.INVALID_ARGUMENT,
v2_message =
s"NON_HEXADECIMAL_OFFSET(8,$correlationId): Offset in fieldName123 not specified in hexadecimal: offsetValue123: message123",
v2_details = Seq[ErrorDetails.ErrorDetail](
ErrorDetails.ErrorInfoDetail("NON_HEXADECIMAL_OFFSET"),
DefaultTraceIdRequestInfo,
),
)
}

"return a readingOffsetAfterLedgerEnd error" in {
assertVersionedError(_.readingOffsetAfterLedgerEnd(None)("message123"))(
v1_code = Code.INVALID_ARGUMENT,
v1_message = "Invalid argument: message123",
v1_details = Seq.empty,
v2_code = Code.OUT_OF_RANGE,
v2_message = s"REQUESTED_OFFSET_OUT_OF_RANGE(12,$correlationId): message123",
v2_details = Seq[ErrorDetails.ErrorDetail](
ErrorDetails.ErrorInfoDetail("REQUESTED_OFFSET_OUT_OF_RANGE"),
DefaultTraceIdRequestInfo,
),
)
}

"return an unauthenticatedMissingJwtToken error" in {
assertVersionedError(_.unauthenticatedMissingJwtToken())(
v1_code = Code.UNAUTHENTICATED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,11 @@ private[daml] object ApiServices {
)

val apiParticipantPruningService =
ApiParticipantPruningService.createApiService(indexService, writeService)
ApiParticipantPruningService.createApiService(
indexService,
writeService,
errorsVersionsSwitcher,
)

List(
new CommandSubmissionServiceAuthorization(apiSubmissionService, authorizer),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@

package com.daml.platform.apiserver.services.admin

import com.daml.error.{DamlContextualizedErrorLogger, ContextualizedErrorLogger}
import com.daml.error.{
ContextualizedErrorLogger,
DamlContextualizedErrorLogger,
ErrorCodesVersionSwitcher,
}

import java.util.UUID
import com.daml.ledger.api.v1.admin.participant_pruning_service.{
Expand All @@ -30,30 +34,53 @@ import scala.concurrent.{ExecutionContext, Future}
final class ApiParticipantPruningService private (
readBackend: IndexParticipantPruningService with LedgerEndService,
writeBackend: state.WriteParticipantPruningService,
)(implicit executionContext: ExecutionContext, logCtx: LoggingContext)
errorCodesVersionSwitcher: ErrorCodesVersionSwitcher,
)(implicit executionContext: ExecutionContext, loggingContext: LoggingContext)
extends ParticipantPruningServiceGrpc.ParticipantPruningService
with GrpcApiService {

private implicit val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass)
private implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(logger, logCtx, None)
private val delegate = new ApiParticipantPruningServiceDelegate(
readBackend,
writeBackend,
errorCodesVersionSwitcher,
)

override def close(): Unit = ()

override def bindService(): ServerServiceDefinition =
ParticipantPruningServiceGrpc.bindService(this, executionContext)

override def prune(request: PruneRequest): Future[PruneResponse] = {
delegate.prune(request)
}
}

final class ApiParticipantPruningServiceDelegate(
readBackend: IndexParticipantPruningService with LedgerEndService,
writeBackend: state.WriteParticipantPruningService,
errorCodesVersionSwitcher: ErrorCodesVersionSwitcher,
)(implicit executionContext: ExecutionContext) {

private implicit val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass)
private val errorFactories = ErrorFactories(errorCodesVersionSwitcher)

def prune(
request: PruneRequest
)(implicit loggingContext: LoggingContext): Future[PruneResponse] = {
val submissionIdOrErr = Ref.SubmissionId
.fromString(
if (request.submissionId.nonEmpty) request.submissionId else UUID.randomUUID().toString
)
.left
.map(err => ErrorFactories.invalidArgument(None)(s"submission_id $err"))
.map(err =>
errorFactories.invalidArgument(None)(s"submission_id $err")(contextualizedErrorLogger)
)

submissionIdOrErr.fold(
t => Future.failed(ValidationLogger.logFailure(request, t)),
submissionId =>
LoggingContext.withEnrichedLoggingContext(logging.submissionId(submissionId)) {
implicit logCtx =>
implicit loggingContext =>
logger.info(s"Pruning up to ${request.pruneUpTo}")
(for {

Expand All @@ -75,7 +102,7 @@ final class ApiParticipantPruningService private (

private def validateRequest(
request: PruneRequest
)(implicit logCtx: LoggingContext): Future[Offset] = {
)(implicit loggingContext: LoggingContext): Future[Offset] = {
(for {
pruneUpToString <- checkOffsetIsSpecified(request.pruneUpTo)
pruneUpTo <- checkOffsetIsHexadecimal(pruneUpToString)
Expand All @@ -91,7 +118,7 @@ final class ApiParticipantPruningService private (
submissionId: Ref.SubmissionId,
pruneAllDivulgedContracts: Boolean,
)(implicit
logCtx: LoggingContext
loggingContext: LoggingContext
): Future[Unit] = {
import state.PruningResult._
logger.info(
Expand All @@ -110,7 +137,7 @@ final class ApiParticipantPruningService private (
private def pruneLedgerApiServerIndex(
pruneUpTo: Offset,
pruneAllDivulgedContracts: Boolean,
)(implicit logCtx: LoggingContext): Future[PruneResponse] = {
)(implicit loggingContext: LoggingContext): Future[PruneResponse] = {
logger.info(s"About to prune ledger api server index to ${pruneUpTo.toApiString} inclusively")
readBackend
.prune(pruneUpTo, pruneAllDivulgedContracts)
Expand All @@ -120,56 +147,63 @@ final class ApiParticipantPruningService private (
}
}

private def checkOffsetIsSpecified(offset: String): Either[StatusRuntimeException, String] =
private def checkOffsetIsSpecified(
offset: String
)(implicit loggingContext: LoggingContext): Either[StatusRuntimeException, String] =
Either.cond(
offset.nonEmpty,
offset,
ErrorFactories.invalidArgument(None)("prune_up_to not specified"),
errorFactories.invalidArgument(None)("prune_up_to not specified")(contextualizedErrorLogger),
)

private def checkOffsetIsHexadecimal(
pruneUpToString: String
): Either[StatusRuntimeException, Offset] =
)(implicit loggingContext: LoggingContext): Either[StatusRuntimeException, Offset] =
ApiOffset
.fromString(pruneUpToString)
.toEither
.left
.map(t =>
// TODO error codes: Use LedgerApiErrors.NonHexOffset
ErrorFactories.invalidArgument(None)(
s"prune_up_to needs to be a hexadecimal string and not $pruneUpToString: ${t.getMessage}"
)
errorFactories.nonHexOffset(None)(
fieldName = "prune_up_to",
offsetValue = pruneUpToString,
message =
s"prune_up_to needs to be a hexadecimal string and not $pruneUpToString: ${t.getMessage}",
)(contextualizedErrorLogger)
)

private def checkOffsetIsBeforeLedgerEnd(
pruneUpToProto: Offset,
pruneUpToString: String,
)(implicit logCtx: LoggingContext): Future[Offset] =
)(implicit loggingContext: LoggingContext): Future[Offset] =
for {
ledgerEnd <- readBackend.currentLedgerEnd()
_ <-
if (pruneUpToString < ledgerEnd.value) Future.successful(())
else
Future.failed(
// TODO error codes: Use LedgerApiErrors.ReadErrors.requestedOffsetAfterLedgerEnd
ErrorFactories.invalidArgument(None)(
errorFactories.readingOffsetAfterLedgerEnd(None)(
s"prune_up_to needs to be before ledger end ${ledgerEnd.value}"
)
)(contextualizedErrorLogger)
)
} yield pruneUpToProto

override def close(): Unit = ()
private def contextualizedErrorLogger(implicit
loggingContext: LoggingContext
): ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(logger, loggingContext, None)

}

object ApiParticipantPruningService {
def createApiService(
readBackend: IndexParticipantPruningService with LedgerEndService,
writeBackend: state.WriteParticipantPruningService,
errorCodesVersionSwitcher: ErrorCodesVersionSwitcher,
)(implicit
executionContext: ExecutionContext,
logCtx: LoggingContext,
loggingContext: LoggingContext,
): ParticipantPruningServiceGrpc.ParticipantPruningService with GrpcApiService =
new ApiParticipantPruningService(readBackend, writeBackend)
new ApiParticipantPruningService(readBackend, writeBackend, errorCodesVersionSwitcher)

}

0 comments on commit 9e88d1d

Please sign in to comment.