Skip to content

Commit

Permalink
Pass submission id as correlation id to error codes in Ledger API
Browse files Browse the repository at this point in the history
CHANGELOG_BEGIN
CHANGELOG_END
  • Loading branch information
tudor-da committed Oct 25, 2021
1 parent 03cfd12 commit 95d758f
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

package com.daml.platform.server.api.services.grpc

import com.daml.error.{DamlContextualizedErrorLogger, ContextualizedErrorLogger}
import com.daml.error.DamlContextualizedErrorLogger
import com.daml.ledger.api.SubmissionIdGenerator
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.v1.command_service.CommandServiceGrpc.CommandService
Expand Down Expand Up @@ -31,12 +31,8 @@ class GrpcCommandService(
with ProxyCloseable {

protected implicit val logger: ContextualizedLogger = ContextualizedLogger.get(getClass)
private implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(logger, loggingContext, None)

private[this] val validator = new SubmitAndWaitRequestValidator(
new CommandsValidator(ledgerId)
)
private[this] val validator = new SubmitAndWaitRequestValidator(new CommandsValidator(ledgerId))

override def submitAndWait(request: SubmitAndWaitRequest): Future[Empty] = {
val requestWithSubmissionId = generateSubmissionIdIfEmpty(request)
Expand All @@ -46,7 +42,7 @@ class GrpcCommandService(
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
)
)(contextualizedErrorLogger(requestWithSubmissionId.commands.map(_.submissionId)))
.fold(
t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)),
_ => service.submitAndWait(requestWithSubmissionId),
Expand All @@ -63,7 +59,7 @@ class GrpcCommandService(
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
)
)(contextualizedErrorLogger(requestWithSubmissionId.commands.map(_.submissionId)))
.fold(
t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)),
_ => service.submitAndWaitForTransactionId(requestWithSubmissionId),
Expand All @@ -80,7 +76,7 @@ class GrpcCommandService(
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
)
)(contextualizedErrorLogger(requestWithSubmissionId.commands.map(_.submissionId)))
.fold(
t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)),
_ => service.submitAndWaitForTransaction(requestWithSubmissionId),
Expand All @@ -97,7 +93,7 @@ class GrpcCommandService(
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
)
)(contextualizedErrorLogger(requestWithSubmissionId.commands.map(_.submissionId)))
.fold(
t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)),
_ => service.submitAndWaitForTransactionTree(requestWithSubmissionId),
Expand All @@ -116,4 +112,8 @@ class GrpcCommandService(
request
}
}

private def contextualizedErrorLogger(submissionId: Option[String])(implicit
loggingContext: LoggingContext
) = new DamlContextualizedErrorLogger(logger, loggingContext, submissionId)
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

package com.daml.platform.server.api.services.grpc

import com.daml.error.{DamlContextualizedErrorLogger, ContextualizedErrorLogger}
import com.daml.error.DamlContextualizedErrorLogger
import com.daml.ledger.api.SubmissionIdGenerator
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.v1.command_submission_service.CommandSubmissionServiceGrpc.{
Expand Down Expand Up @@ -40,12 +40,6 @@ class GrpcCommandSubmissionService(
with GrpcApiService {

protected implicit val logger: ContextualizedLogger = ContextualizedLogger.get(getClass)
private implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(
logger,
loggingContext,
None,
)
private val validator = new SubmitRequestValidator(new CommandsValidator(ledgerId))

override def submit(request: ApiSubmitRequest): Future[Empty] = {
Expand All @@ -58,6 +52,11 @@ class GrpcCommandSubmissionService(
telemetryContext.setAttribute(SpanAttribute.WorkflowId, commands.workflowId)
}
val requestWithSubmissionId = generateSubmissionIdIfEmpty(request)
val errorLogger = new DamlContextualizedErrorLogger(
logger = logger,
loggingContext = loggingContext,
correlationId = requestWithSubmissionId.commands.map(_.submissionId),
)
Timed.timedAndTrackedFuture(
metrics.daml.commands.submissions,
metrics.daml.commands.submissionsRunning,
Expand All @@ -69,7 +68,7 @@ class GrpcCommandSubmissionService(
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
),
)(errorLogger),
)
.fold(
t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ private[apiserver] final class ApiCommandService private[services] (
private def submitAndWaitInternal(request: SubmitAndWaitRequest)(implicit
loggingContext: LoggingContext
): Future[Either[TrackedCompletionFailure, CompletionSuccess]] = {
val contextualizedErrorLogger = new DamlContextualizedErrorLogger(logger, loggingContext, None)
val commands = request.getCommands
withEnrichedLoggingContext(
logging.submissionId(commands.submissionId),
Expand All @@ -95,10 +94,8 @@ private[apiserver] final class ApiCommandService private[services] (
.map(deadline => Duration.ofNanos(deadline.timeRemaining(TimeUnit.NANOSECONDS)))
submissionTracker.track(CommandSubmission(commands, timeout))
} else {
Future.failed(
errorFactories.serviceNotRunning(definiteAnswer = Some(false))(contextualizedErrorLogger)
)
}.andThen(logger.logErrorsOnCall[Completion])
handleFailure(request, loggingContext)
}
}
}

Expand Down Expand Up @@ -159,6 +156,22 @@ private[apiserver] final class ApiCommandService private[services] (
},
)
}

private def handleFailure(
request: SubmitAndWaitRequest,
loggingContext: LoggingContext,
): Future[Either[TrackedCompletionFailure, CompletionSuccess]] =
Future
.failed(
errorFactories.serviceNotRunning(definiteAnswer = Some(false))(
new DamlContextualizedErrorLogger(
logger,
loggingContext,
request.commands.map(_.submissionId),
)
)
)
.andThen(logger.logErrorsOnCall[Completion](loggingContext))
}

private[apiserver] object ApiCommandService {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import com.daml.error.{
ErrorCodesVersionSwitcher,
}
import com.daml.error.definitions.{ErrorCauseExport, RejectionGenerators}
import com.daml.ledger.api.domain.{LedgerId, Commands => ApiCommands}
import com.daml.ledger.api.domain.{LedgerId, SubmissionId, Commands => ApiCommands}
import com.daml.ledger.api.messages.command.submission.SubmitRequest
import com.daml.ledger.api.{DeduplicationPeriod, SubmissionIdGenerator}
import com.daml.ledger.configuration.Configuration
Expand Down Expand Up @@ -118,6 +118,14 @@ private[apiserver] final class ApiSubmissionService private[services] (
withEnrichedLoggingContext(logging.commands(request.commands)) { implicit loggingContext =>
logger.info("Submitting transaction")
logger.trace(s"Commands: ${request.commands.commands.commands}")

implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(
logger,
loggingContext,
Some(SubmissionId.unwrap(request.commands.submissionId)),
)

val evaluatedCommand = ledgerConfigurationSubscription
.latestConfiguration() match {
case Some(ledgerConfiguration) =>
Expand All @@ -133,9 +141,7 @@ private[apiserver] final class ApiSubmissionService private[services] (
}
case None =>
Future.failed(
errorFactories.missingLedgerConfig(definiteAnswer = Some(false))(
new DamlContextualizedErrorLogger(logger, loggingContext, None)
)
errorFactories.missingLedgerConfig(definiteAnswer = Some(false))
)
}
evaluatedCommand.andThen(logger.logErrorsOnCall[Unit])
Expand All @@ -145,7 +151,11 @@ private[apiserver] final class ApiSubmissionService private[services] (
seed: crypto.Hash,
commands: ApiCommands,
ledgerConfig: Configuration,
)(implicit loggingContext: LoggingContext, telemetryContext: TelemetryContext): Future[Unit] =
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
contextualizedErrorLogger: ContextualizedErrorLogger,
): Future[Unit] =
submissionService
.deduplicateCommand(
commands.commandId,
Expand All @@ -168,9 +178,7 @@ private[apiserver] final class ApiSubmissionService private[services] (
case _: CommandDeduplicationDuplicate =>
metrics.daml.commands.deduplicatedCommands.mark()
Future.failed(
errorFactories.duplicateCommandException(
new DamlContextualizedErrorLogger(logger, loggingContext, None)
)
errorFactories.duplicateCommandException
)
}

Expand All @@ -195,7 +203,7 @@ private[apiserver] final class ApiSubmissionService private[services] (

private def handleCommandExecutionResult(
result: Either[ErrorCause, CommandExecutionResult]
): Future[CommandExecutionResult] =
)(implicit contextualizedErrorLogger: ContextualizedErrorLogger): Future[CommandExecutionResult] =
result.fold(
error => {
metrics.daml.commands.failedCommandInterpretations.mark()
Expand All @@ -211,6 +219,7 @@ private[apiserver] final class ApiSubmissionService private[services] (
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
contextualizedErrorLogger: ContextualizedErrorLogger,
): Future[state.SubmissionResult] =
for {
result <- commandExecutor.execute(commands, submissionSeed, ledgerConfig)
Expand Down Expand Up @@ -326,16 +335,12 @@ private[apiserver] final class ApiSubmissionService private[services] (

private def failedOnCommandExecution(
error: ErrorCause
)(implicit loggingContext: LoggingContext): Future[CommandExecutionResult] = {
implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(logger, loggingContext, None)

)(implicit contextualizedErrorLogger: ContextualizedErrorLogger): Future[CommandExecutionResult] =
errorCodesVersionSwitcher.chooseAsFailedFuture(
v1 = toStatusExceptionV1(error),
v2 = RejectionGenerators
.commandExecutorError(cause = ErrorCauseExport.fromErrorCause(error)),
)
}

override def close(): Unit = ()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ private[apiserver] final class ApiConfigManagementService private (
) extends ConfigManagementService
with GrpcApiService {
private implicit val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass)
private implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(logger, loggingContext, None)

private val errorFactories =
com.daml.platform.server.api.validation.ErrorFactories(errorCodesVersionSwitcher)

Expand All @@ -68,7 +65,11 @@ private[apiserver] final class ApiConfigManagementService private (
Future.successful(configurationToResponse(configuration))
case None =>
// TODO error codes: Duplicate of missingLedgerConfig
Future.failed(errorFactories.missingLedgerConfigUponRequest)
Future.failed(
errorFactories.missingLedgerConfigUponRequest(
new DamlContextualizedErrorLogger(logger, loggingContext, None)
)
)
}
.andThen(logger.logErrorsOnCall[GetTimeModelResponse])
}
Expand All @@ -95,7 +96,7 @@ private[apiserver] final class ApiConfigManagementService private (
implicit val telemetryContext: TelemetryContext =
DefaultTelemetry.contextFromGrpcThreadLocalContext()
implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(logger, loggingContext, None)
new DamlContextualizedErrorLogger(logger, loggingContext, Some(request.submissionId))

val response = for {
// Validate and convert the request parameters
Expand Down Expand Up @@ -230,13 +231,15 @@ private[apiserver] object ApiConfigManagementService {
configManagementService: IndexConfigManagementService,
ledgerEnd: LedgerOffset.Absolute,
errorFactories: ErrorFactories,
)(implicit loggingContext: LoggingContext, contextualizedErrorLogger: ContextualizedErrorLogger)
)(implicit loggingContext: LoggingContext)
extends SynchronousResponse.Strategy[
(Time.Timestamp, Configuration),
ConfigurationEntry,
ConfigurationEntry.Accepted,
] {

private val logger = ContextualizedLogger.get(getClass)

override def currentLedgerEnd(): Future[Option[LedgerOffset.Absolute]] =
Future.successful(Some(ledgerEnd))

Expand Down Expand Up @@ -264,7 +267,9 @@ private[apiserver] object ApiConfigManagementService {
submissionId: Ref.SubmissionId
): PartialFunction[ConfigurationEntry, StatusRuntimeException] = {
case domain.ConfigurationEntry.Rejected(`submissionId`, reason, _) =>
errorFactories.configurationEntryRejected(reason, None)
errorFactories.configurationEntryRejected(reason, None)(
new DamlContextualizedErrorLogger(logger, loggingContext, Some(submissionId))
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,8 @@ import java.util.zip.ZipInputStream
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import com.daml.daml_lf_dev.DamlLf.Archive
import com.daml.error.{
ContextualizedErrorLogger,
DamlContextualizedErrorLogger,
ErrorCodesVersionSwitcher,
}
import com.daml.error.ErrorCodesVersionSwitcher
import com.daml.error.DamlContextualizedErrorLogger
import com.daml.ledger.api.domain.{LedgerOffset, PackageEntry}
import com.daml.ledger.api.v1.admin.package_management_service.PackageManagementServiceGrpc.PackageManagementService
import com.daml.ledger.api.v1.admin.package_management_service._
Expand Down Expand Up @@ -60,8 +57,6 @@ private[apiserver] final class ApiPackageManagementService private (
with GrpcApiService {

private implicit val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass)
private implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(logger, loggingContext, None)

private val errorFactories = ErrorFactories(errorCodesVersionSwitcher)

Expand Down Expand Up @@ -128,7 +123,13 @@ private[apiserver] final class ApiPackageManagementService private (
ValidationLogger
.logFailureWithContext(
request,
errorFactories.invalidArgument(None)(err.getMessage),
errorFactories.invalidArgument(None)(err.getMessage)(
new DamlContextualizedErrorLogger(
logger,
loggingContext,
Some(request.submissionId),
)
),
)
),
Future.successful,
Expand All @@ -143,7 +144,6 @@ private[apiserver] final class ApiPackageManagementService private (

response.andThen(logger.logErrorsOnCall[UploadDarFileResponse])
}

}

private[apiserver] object ApiPackageManagementService {
Expand Down Expand Up @@ -185,8 +185,6 @@ private[apiserver] object ApiPackageManagementService {
PackageEntry.PackageUploadAccepted,
] {
private implicit val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass)
private implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(logger, loggingContext, None)

override def currentLedgerEnd(): Future[Option[LedgerOffset.Absolute]] =
ledgerEndService.currentLedgerEnd().map(Some(_))
Expand All @@ -209,7 +207,9 @@ private[apiserver] object ApiPackageManagementService {
submissionId: Ref.SubmissionId
): PartialFunction[PackageEntry, StatusRuntimeException] = {
case PackageEntry.PackageUploadRejected(`submissionId`, _, reason) =>
errorFactories.packageUploadRejected(reason, definiteAnswer = None)
errorFactories.packageUploadRejected(reason, definiteAnswer = None)(
new DamlContextualizedErrorLogger(logger, loggingContext, Some(submissionId))
)
}
}

Expand Down
Loading

0 comments on commit 95d758f

Please sign in to comment.