From 95d758f793e63cb99f275a4b633038c3e0f4cb42 Mon Sep 17 00:00:00 2001 From: Tudor Voicu Date: Mon, 25 Oct 2021 12:35:54 +0200 Subject: [PATCH] Pass submission id as correlation id to error codes in Ledger API CHANGELOG_BEGIN CHANGELOG_END --- .../services/grpc/GrpcCommandService.scala | 20 +++++----- .../grpc/GrpcCommandSubmissionService.scala | 15 ++++---- .../services/ApiCommandService.scala | 23 ++++++++--- .../services/ApiSubmissionService.scala | 33 +++++++++------- .../admin/ApiConfigManagementService.scala | 19 ++++++---- .../admin/ApiPackageManagementService.scala | 24 ++++++------ .../admin/ApiParticipantPruningService.scala | 38 +++++++++++-------- 7 files changed, 101 insertions(+), 71 deletions(-) diff --git a/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/services/grpc/GrpcCommandService.scala b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/services/grpc/GrpcCommandService.scala index 4da59e653b4f..06b0e9fa533b 100644 --- a/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/services/grpc/GrpcCommandService.scala +++ b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/services/grpc/GrpcCommandService.scala @@ -3,7 +3,7 @@ package com.daml.platform.server.api.services.grpc -import com.daml.error.{DamlContextualizedErrorLogger, ContextualizedErrorLogger} +import com.daml.error.DamlContextualizedErrorLogger import com.daml.ledger.api.SubmissionIdGenerator import com.daml.ledger.api.domain.LedgerId import com.daml.ledger.api.v1.command_service.CommandServiceGrpc.CommandService @@ -31,12 +31,8 @@ class GrpcCommandService( with ProxyCloseable { protected implicit val logger: ContextualizedLogger = ContextualizedLogger.get(getClass) - private implicit val contextualizedErrorLogger: ContextualizedErrorLogger = - new DamlContextualizedErrorLogger(logger, loggingContext, None) - private[this] val validator = new SubmitAndWaitRequestValidator( - new CommandsValidator(ledgerId) - ) + private[this] val validator = new SubmitAndWaitRequestValidator(new CommandsValidator(ledgerId)) override def submitAndWait(request: SubmitAndWaitRequest): Future[Empty] = { val requestWithSubmissionId = generateSubmissionIdIfEmpty(request) @@ -46,7 +42,7 @@ class GrpcCommandService( currentLedgerTime(), currentUtcTime(), maxDeduplicationTime(), - ) + )(contextualizedErrorLogger(requestWithSubmissionId.commands.map(_.submissionId))) .fold( t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)), _ => service.submitAndWait(requestWithSubmissionId), @@ -63,7 +59,7 @@ class GrpcCommandService( currentLedgerTime(), currentUtcTime(), maxDeduplicationTime(), - ) + )(contextualizedErrorLogger(requestWithSubmissionId.commands.map(_.submissionId))) .fold( t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)), _ => service.submitAndWaitForTransactionId(requestWithSubmissionId), @@ -80,7 +76,7 @@ class GrpcCommandService( currentLedgerTime(), currentUtcTime(), maxDeduplicationTime(), - ) + )(contextualizedErrorLogger(requestWithSubmissionId.commands.map(_.submissionId))) .fold( t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)), _ => service.submitAndWaitForTransaction(requestWithSubmissionId), @@ -97,7 +93,7 @@ class GrpcCommandService( currentLedgerTime(), currentUtcTime(), maxDeduplicationTime(), - ) + )(contextualizedErrorLogger(requestWithSubmissionId.commands.map(_.submissionId))) .fold( t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)), _ => service.submitAndWaitForTransactionTree(requestWithSubmissionId), @@ -116,4 +112,8 @@ class GrpcCommandService( request } } + + private def contextualizedErrorLogger(submissionId: Option[String])(implicit + loggingContext: LoggingContext + ) = new DamlContextualizedErrorLogger(logger, loggingContext, submissionId) } diff --git a/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/services/grpc/GrpcCommandSubmissionService.scala b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/services/grpc/GrpcCommandSubmissionService.scala index 72baaa66ecb8..e68e96491627 100644 --- a/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/services/grpc/GrpcCommandSubmissionService.scala +++ b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/services/grpc/GrpcCommandSubmissionService.scala @@ -3,7 +3,7 @@ package com.daml.platform.server.api.services.grpc -import com.daml.error.{DamlContextualizedErrorLogger, ContextualizedErrorLogger} +import com.daml.error.DamlContextualizedErrorLogger import com.daml.ledger.api.SubmissionIdGenerator import com.daml.ledger.api.domain.LedgerId import com.daml.ledger.api.v1.command_submission_service.CommandSubmissionServiceGrpc.{ @@ -40,12 +40,6 @@ class GrpcCommandSubmissionService( with GrpcApiService { protected implicit val logger: ContextualizedLogger = ContextualizedLogger.get(getClass) - private implicit val contextualizedErrorLogger: ContextualizedErrorLogger = - new DamlContextualizedErrorLogger( - logger, - loggingContext, - None, - ) private val validator = new SubmitRequestValidator(new CommandsValidator(ledgerId)) override def submit(request: ApiSubmitRequest): Future[Empty] = { @@ -58,6 +52,11 @@ class GrpcCommandSubmissionService( telemetryContext.setAttribute(SpanAttribute.WorkflowId, commands.workflowId) } val requestWithSubmissionId = generateSubmissionIdIfEmpty(request) + val errorLogger = new DamlContextualizedErrorLogger( + logger = logger, + loggingContext = loggingContext, + correlationId = requestWithSubmissionId.commands.map(_.submissionId), + ) Timed.timedAndTrackedFuture( metrics.daml.commands.submissions, metrics.daml.commands.submissionsRunning, @@ -69,7 +68,7 @@ class GrpcCommandSubmissionService( currentLedgerTime(), currentUtcTime(), maxDeduplicationTime(), - ), + )(errorLogger), ) .fold( t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)), diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiCommandService.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiCommandService.scala index 85bdd4ae4048..2b82f6b582b5 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiCommandService.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiCommandService.scala @@ -81,7 +81,6 @@ private[apiserver] final class ApiCommandService private[services] ( private def submitAndWaitInternal(request: SubmitAndWaitRequest)(implicit loggingContext: LoggingContext ): Future[Either[TrackedCompletionFailure, CompletionSuccess]] = { - val contextualizedErrorLogger = new DamlContextualizedErrorLogger(logger, loggingContext, None) val commands = request.getCommands withEnrichedLoggingContext( logging.submissionId(commands.submissionId), @@ -95,10 +94,8 @@ private[apiserver] final class ApiCommandService private[services] ( .map(deadline => Duration.ofNanos(deadline.timeRemaining(TimeUnit.NANOSECONDS))) submissionTracker.track(CommandSubmission(commands, timeout)) } else { - Future.failed( - errorFactories.serviceNotRunning(definiteAnswer = Some(false))(contextualizedErrorLogger) - ) - }.andThen(logger.logErrorsOnCall[Completion]) + handleFailure(request, loggingContext) + } } } @@ -159,6 +156,22 @@ private[apiserver] final class ApiCommandService private[services] ( }, ) } + + private def handleFailure( + request: SubmitAndWaitRequest, + loggingContext: LoggingContext, + ): Future[Either[TrackedCompletionFailure, CompletionSuccess]] = + Future + .failed( + errorFactories.serviceNotRunning(definiteAnswer = Some(false))( + new DamlContextualizedErrorLogger( + logger, + loggingContext, + request.commands.map(_.submissionId), + ) + ) + ) + .andThen(logger.logErrorsOnCall[Completion](loggingContext)) } private[apiserver] object ApiCommandService { diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiSubmissionService.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiSubmissionService.scala index 2892eb28f637..392e7f4343cd 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiSubmissionService.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/ApiSubmissionService.scala @@ -11,7 +11,7 @@ import com.daml.error.{ ErrorCodesVersionSwitcher, } import com.daml.error.definitions.{ErrorCauseExport, RejectionGenerators} -import com.daml.ledger.api.domain.{LedgerId, Commands => ApiCommands} +import com.daml.ledger.api.domain.{LedgerId, SubmissionId, Commands => ApiCommands} import com.daml.ledger.api.messages.command.submission.SubmitRequest import com.daml.ledger.api.{DeduplicationPeriod, SubmissionIdGenerator} import com.daml.ledger.configuration.Configuration @@ -118,6 +118,14 @@ private[apiserver] final class ApiSubmissionService private[services] ( withEnrichedLoggingContext(logging.commands(request.commands)) { implicit loggingContext => logger.info("Submitting transaction") logger.trace(s"Commands: ${request.commands.commands.commands}") + + implicit val contextualizedErrorLogger: ContextualizedErrorLogger = + new DamlContextualizedErrorLogger( + logger, + loggingContext, + Some(SubmissionId.unwrap(request.commands.submissionId)), + ) + val evaluatedCommand = ledgerConfigurationSubscription .latestConfiguration() match { case Some(ledgerConfiguration) => @@ -133,9 +141,7 @@ private[apiserver] final class ApiSubmissionService private[services] ( } case None => Future.failed( - errorFactories.missingLedgerConfig(definiteAnswer = Some(false))( - new DamlContextualizedErrorLogger(logger, loggingContext, None) - ) + errorFactories.missingLedgerConfig(definiteAnswer = Some(false)) ) } evaluatedCommand.andThen(logger.logErrorsOnCall[Unit]) @@ -145,7 +151,11 @@ private[apiserver] final class ApiSubmissionService private[services] ( seed: crypto.Hash, commands: ApiCommands, ledgerConfig: Configuration, - )(implicit loggingContext: LoggingContext, telemetryContext: TelemetryContext): Future[Unit] = + )(implicit + loggingContext: LoggingContext, + telemetryContext: TelemetryContext, + contextualizedErrorLogger: ContextualizedErrorLogger, + ): Future[Unit] = submissionService .deduplicateCommand( commands.commandId, @@ -168,9 +178,7 @@ private[apiserver] final class ApiSubmissionService private[services] ( case _: CommandDeduplicationDuplicate => metrics.daml.commands.deduplicatedCommands.mark() Future.failed( - errorFactories.duplicateCommandException( - new DamlContextualizedErrorLogger(logger, loggingContext, None) - ) + errorFactories.duplicateCommandException ) } @@ -195,7 +203,7 @@ private[apiserver] final class ApiSubmissionService private[services] ( private def handleCommandExecutionResult( result: Either[ErrorCause, CommandExecutionResult] - ): Future[CommandExecutionResult] = + )(implicit contextualizedErrorLogger: ContextualizedErrorLogger): Future[CommandExecutionResult] = result.fold( error => { metrics.daml.commands.failedCommandInterpretations.mark() @@ -211,6 +219,7 @@ private[apiserver] final class ApiSubmissionService private[services] ( )(implicit loggingContext: LoggingContext, telemetryContext: TelemetryContext, + contextualizedErrorLogger: ContextualizedErrorLogger, ): Future[state.SubmissionResult] = for { result <- commandExecutor.execute(commands, submissionSeed, ledgerConfig) @@ -326,16 +335,12 @@ private[apiserver] final class ApiSubmissionService private[services] ( private def failedOnCommandExecution( error: ErrorCause - )(implicit loggingContext: LoggingContext): Future[CommandExecutionResult] = { - implicit val contextualizedErrorLogger: ContextualizedErrorLogger = - new DamlContextualizedErrorLogger(logger, loggingContext, None) - + )(implicit contextualizedErrorLogger: ContextualizedErrorLogger): Future[CommandExecutionResult] = errorCodesVersionSwitcher.chooseAsFailedFuture( v1 = toStatusExceptionV1(error), v2 = RejectionGenerators .commandExecutorError(cause = ErrorCauseExport.fromErrorCause(error)), ) - } override def close(): Unit = () diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/admin/ApiConfigManagementService.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/admin/ApiConfigManagementService.scala index 964248b894f8..b6060886f88d 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/admin/ApiConfigManagementService.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/admin/ApiConfigManagementService.scala @@ -48,9 +48,6 @@ private[apiserver] final class ApiConfigManagementService private ( ) extends ConfigManagementService with GrpcApiService { private implicit val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass) - private implicit val contextualizedErrorLogger: ContextualizedErrorLogger = - new DamlContextualizedErrorLogger(logger, loggingContext, None) - private val errorFactories = com.daml.platform.server.api.validation.ErrorFactories(errorCodesVersionSwitcher) @@ -68,7 +65,11 @@ private[apiserver] final class ApiConfigManagementService private ( Future.successful(configurationToResponse(configuration)) case None => // TODO error codes: Duplicate of missingLedgerConfig - Future.failed(errorFactories.missingLedgerConfigUponRequest) + Future.failed( + errorFactories.missingLedgerConfigUponRequest( + new DamlContextualizedErrorLogger(logger, loggingContext, None) + ) + ) } .andThen(logger.logErrorsOnCall[GetTimeModelResponse]) } @@ -95,7 +96,7 @@ private[apiserver] final class ApiConfigManagementService private ( implicit val telemetryContext: TelemetryContext = DefaultTelemetry.contextFromGrpcThreadLocalContext() implicit val contextualizedErrorLogger: ContextualizedErrorLogger = - new DamlContextualizedErrorLogger(logger, loggingContext, None) + new DamlContextualizedErrorLogger(logger, loggingContext, Some(request.submissionId)) val response = for { // Validate and convert the request parameters @@ -230,13 +231,15 @@ private[apiserver] object ApiConfigManagementService { configManagementService: IndexConfigManagementService, ledgerEnd: LedgerOffset.Absolute, errorFactories: ErrorFactories, - )(implicit loggingContext: LoggingContext, contextualizedErrorLogger: ContextualizedErrorLogger) + )(implicit loggingContext: LoggingContext) extends SynchronousResponse.Strategy[ (Time.Timestamp, Configuration), ConfigurationEntry, ConfigurationEntry.Accepted, ] { + private val logger = ContextualizedLogger.get(getClass) + override def currentLedgerEnd(): Future[Option[LedgerOffset.Absolute]] = Future.successful(Some(ledgerEnd)) @@ -264,7 +267,9 @@ private[apiserver] object ApiConfigManagementService { submissionId: Ref.SubmissionId ): PartialFunction[ConfigurationEntry, StatusRuntimeException] = { case domain.ConfigurationEntry.Rejected(`submissionId`, reason, _) => - errorFactories.configurationEntryRejected(reason, None) + errorFactories.configurationEntryRejected(reason, None)( + new DamlContextualizedErrorLogger(logger, loggingContext, Some(submissionId)) + ) } } diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/admin/ApiPackageManagementService.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/admin/ApiPackageManagementService.scala index 7ea831d39c45..3491feb6e5e8 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/admin/ApiPackageManagementService.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/admin/ApiPackageManagementService.scala @@ -8,11 +8,8 @@ import java.util.zip.ZipInputStream import akka.stream.Materializer import akka.stream.scaladsl.Source import com.daml.daml_lf_dev.DamlLf.Archive -import com.daml.error.{ - ContextualizedErrorLogger, - DamlContextualizedErrorLogger, - ErrorCodesVersionSwitcher, -} +import com.daml.error.ErrorCodesVersionSwitcher +import com.daml.error.DamlContextualizedErrorLogger import com.daml.ledger.api.domain.{LedgerOffset, PackageEntry} import com.daml.ledger.api.v1.admin.package_management_service.PackageManagementServiceGrpc.PackageManagementService import com.daml.ledger.api.v1.admin.package_management_service._ @@ -60,8 +57,6 @@ private[apiserver] final class ApiPackageManagementService private ( with GrpcApiService { private implicit val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass) - private implicit val contextualizedErrorLogger: ContextualizedErrorLogger = - new DamlContextualizedErrorLogger(logger, loggingContext, None) private val errorFactories = ErrorFactories(errorCodesVersionSwitcher) @@ -128,7 +123,13 @@ private[apiserver] final class ApiPackageManagementService private ( ValidationLogger .logFailureWithContext( request, - errorFactories.invalidArgument(None)(err.getMessage), + errorFactories.invalidArgument(None)(err.getMessage)( + new DamlContextualizedErrorLogger( + logger, + loggingContext, + Some(request.submissionId), + ) + ), ) ), Future.successful, @@ -143,7 +144,6 @@ private[apiserver] final class ApiPackageManagementService private ( response.andThen(logger.logErrorsOnCall[UploadDarFileResponse]) } - } private[apiserver] object ApiPackageManagementService { @@ -185,8 +185,6 @@ private[apiserver] object ApiPackageManagementService { PackageEntry.PackageUploadAccepted, ] { private implicit val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass) - private implicit val contextualizedErrorLogger: ContextualizedErrorLogger = - new DamlContextualizedErrorLogger(logger, loggingContext, None) override def currentLedgerEnd(): Future[Option[LedgerOffset.Absolute]] = ledgerEndService.currentLedgerEnd().map(Some(_)) @@ -209,7 +207,9 @@ private[apiserver] object ApiPackageManagementService { submissionId: Ref.SubmissionId ): PartialFunction[PackageEntry, StatusRuntimeException] = { case PackageEntry.PackageUploadRejected(`submissionId`, _, reason) => - errorFactories.packageUploadRejected(reason, definiteAnswer = None) + errorFactories.packageUploadRejected(reason, definiteAnswer = None)( + new DamlContextualizedErrorLogger(logger, loggingContext, Some(submissionId)) + ) } } diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/admin/ApiParticipantPruningService.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/admin/ApiParticipantPruningService.scala index b03fe151dd0e..94100bd8d3cf 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/admin/ApiParticipantPruningService.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/admin/ApiParticipantPruningService.scala @@ -54,7 +54,9 @@ final class ApiParticipantPruningService private ( ) .left .map(err => - errorFactories.invalidArgument(None)(s"submission_id $err")(contextualizedErrorLogger) + errorFactories.invalidArgument(None)(s"submission_id $err")( + contextualizedErrorLogger(request.submissionId) + ) ) submissionIdOrErr.fold( @@ -65,7 +67,10 @@ final class ApiParticipantPruningService private ( logger.info(s"Pruning up to ${request.pruneUpTo}") (for { - pruneUpTo <- validateRequest(request) + pruneUpTo <- validateRequest(request)( + loggingContext, + contextualizedErrorLogger(submissionId), + ) // If write service pruning succeeds but ledger api server index pruning fails, the user can bring the // systems back in sync by reissuing the prune request at the currently specified or later offset. @@ -83,7 +88,10 @@ final class ApiParticipantPruningService private ( private def validateRequest( request: PruneRequest - )(implicit loggingContext: LoggingContext): Future[Offset] = { + )(implicit + loggingContext: LoggingContext, + errorLogger: ContextualizedErrorLogger, + ): Future[Offset] = { (for { pruneUpToString <- checkOffsetIsSpecified(request.pruneUpTo) pruneUpTo <- checkOffsetIsHexadecimal(pruneUpToString) @@ -98,9 +106,7 @@ final class ApiParticipantPruningService private ( pruneUpTo: Offset, submissionId: Ref.SubmissionId, pruneAllDivulgedContracts: Boolean, - )(implicit - loggingContext: LoggingContext - ): Future[Unit] = { + )(implicit loggingContext: LoggingContext): Future[Unit] = { import state.PruningResult._ logger.info( s"About to prune participant ledger up to ${pruneUpTo.toApiString} inclusively starting with the write service" @@ -130,16 +136,16 @@ final class ApiParticipantPruningService private ( private def checkOffsetIsSpecified( offset: String - )(implicit loggingContext: LoggingContext): Either[StatusRuntimeException, String] = + )(implicit errorLogger: ContextualizedErrorLogger): Either[StatusRuntimeException, String] = Either.cond( offset.nonEmpty, offset, - errorFactories.invalidArgument(None)("prune_up_to not specified")(contextualizedErrorLogger), + errorFactories.invalidArgument(None)("prune_up_to not specified"), ) private def checkOffsetIsHexadecimal( pruneUpToString: String - )(implicit loggingContext: LoggingContext): Either[StatusRuntimeException, Offset] = + )(implicit errorLogger: ContextualizedErrorLogger): Either[StatusRuntimeException, Offset] = ApiOffset .fromString(pruneUpToString) .toEither @@ -150,13 +156,16 @@ final class ApiParticipantPruningService private ( offsetValue = pruneUpToString, message = s"prune_up_to needs to be a hexadecimal string and not $pruneUpToString: ${t.getMessage}", - )(contextualizedErrorLogger) + ) ) private def checkOffsetIsBeforeLedgerEnd( pruneUpToProto: Offset, pruneUpToString: String, - )(implicit loggingContext: LoggingContext): Future[Offset] = + )(implicit + loggingContext: LoggingContext, + errorLogger: ContextualizedErrorLogger, + ): Future[Offset] = for { ledgerEnd <- readBackend.currentLedgerEnd() _ <- @@ -165,15 +174,14 @@ final class ApiParticipantPruningService private ( Future.failed( errorFactories.readingOffsetAfterLedgerEnd_was_invalidArgument(None)( s"prune_up_to needs to be before ledger end ${ledgerEnd.value}" - )(contextualizedErrorLogger) + ) ) } yield pruneUpToProto - private def contextualizedErrorLogger(implicit + private def contextualizedErrorLogger(submissionId: String)(implicit loggingContext: LoggingContext ): ContextualizedErrorLogger = - new DamlContextualizedErrorLogger(logger, loggingContext, None) - + new DamlContextualizedErrorLogger(logger, loggingContext, Some(submissionId)) } object ApiParticipantPruningService {