Skip to content

Commit

Permalink
Pass submission id as correlation id to error codes in Ledger API
Browse files Browse the repository at this point in the history
CHANGELOG_BEGIN
CHANGELOG_END
  • Loading branch information
tudor-da committed Oct 27, 2021
1 parent 503e391 commit d361e84
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,7 @@

package com.daml.platform.server.api.services.grpc

import com.daml.error.{
ContextualizedErrorLogger,
DamlContextualizedErrorLogger,
ErrorCodesVersionSwitcher,
}
import com.daml.error.{DamlContextualizedErrorLogger, ErrorCodesVersionSwitcher}
import com.daml.ledger.api.SubmissionIdGenerator
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.v1.command_service.CommandServiceGrpc.CommandService
Expand Down Expand Up @@ -37,8 +33,6 @@ class GrpcCommandService(
with ProxyCloseable {

protected implicit val logger: ContextualizedLogger = ContextualizedLogger.get(getClass)
private implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(logger, loggingContext, None)

private[this] val validator = new SubmitAndWaitRequestValidator(
new CommandsValidator(ledgerId, errorCodesVersionSwitcher),
Expand All @@ -53,7 +47,7 @@ class GrpcCommandService(
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
)
)(contextualizedErrorLogger(requestWithSubmissionId.commands.map(_.submissionId)))
.fold(
t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)),
_ => service.submitAndWait(requestWithSubmissionId),
Expand All @@ -70,7 +64,7 @@ class GrpcCommandService(
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
)
)(contextualizedErrorLogger(requestWithSubmissionId.commands.map(_.submissionId)))
.fold(
t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)),
_ => service.submitAndWaitForTransactionId(requestWithSubmissionId),
Expand All @@ -87,7 +81,7 @@ class GrpcCommandService(
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
)
)(contextualizedErrorLogger(requestWithSubmissionId.commands.map(_.submissionId)))
.fold(
t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)),
_ => service.submitAndWaitForTransaction(requestWithSubmissionId),
Expand All @@ -104,7 +98,7 @@ class GrpcCommandService(
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
)
)(contextualizedErrorLogger(requestWithSubmissionId.commands.map(_.submissionId)))
.fold(
t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)),
_ => service.submitAndWaitForTransactionTree(requestWithSubmissionId),
Expand All @@ -123,4 +117,8 @@ class GrpcCommandService(
request
}
}

private def contextualizedErrorLogger(submissionId: Option[String])(implicit
loggingContext: LoggingContext
) = new DamlContextualizedErrorLogger(logger, loggingContext, submissionId)
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,7 @@

package com.daml.platform.server.api.services.grpc

import com.daml.error.{
ContextualizedErrorLogger,
DamlContextualizedErrorLogger,
ErrorCodesVersionSwitcher,
}
import com.daml.error.{DamlContextualizedErrorLogger, ErrorCodesVersionSwitcher}
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.v1.command_submission_service.CommandSubmissionServiceGrpc.{
CommandSubmissionService => ApiCommandSubmissionService
Expand Down Expand Up @@ -44,12 +40,6 @@ class GrpcCommandSubmissionService(
with GrpcApiService {

protected implicit val logger: ContextualizedLogger = ContextualizedLogger.get(getClass)
private implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(
logger,
loggingContext,
None,
)
private val validator = new SubmitRequestValidator(
new CommandsValidator(ledgerId, errorCodesVersionSwitcher),
FieldValidations(ErrorFactories(errorCodesVersionSwitcher)),
Expand All @@ -64,6 +54,11 @@ class GrpcCommandSubmissionService(
telemetryContext.setAttribute(SpanAttribute.Submitter, commands.party)
telemetryContext.setAttribute(SpanAttribute.WorkflowId, commands.workflowId)
}
val errorLogger = new DamlContextualizedErrorLogger(
logger = logger,
loggingContext = loggingContext,
correlationId = request.commands.map(_.submissionId),
)
Timed.timedAndTrackedFuture(
metrics.daml.commands.submissions,
metrics.daml.commands.submissionsRunning,
Expand All @@ -75,7 +70,7 @@ class GrpcCommandSubmissionService(
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
),
)(errorLogger),
)
.fold(
t => Future.failed(ValidationLogger.logFailure(request, t)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ private[apiserver] final class ApiCommandService private[services] (
private val logger = ContextualizedLogger.get(this.getClass)

private val errorFactories = ErrorFactories(errorCodesVersionSwitcher)
import errorFactories.serviceNotRunning

@volatile private var running = true

Expand All @@ -82,7 +81,6 @@ private[apiserver] final class ApiCommandService private[services] (
private def submitAndWaitInternal(request: SubmitAndWaitRequest)(implicit
loggingContext: LoggingContext
): Future[Either[TrackedCompletionFailure, CompletionSuccess]] = {
val contextualizedErrorLogger = new DamlContextualizedErrorLogger(logger, loggingContext, None)
val commands = request.getCommands
withEnrichedLoggingContext(
logging.submissionId(commands.submissionId),
Expand All @@ -96,10 +94,8 @@ private[apiserver] final class ApiCommandService private[services] (
.map(deadline => Duration.ofNanos(deadline.timeRemaining(TimeUnit.NANOSECONDS)))
submissionTracker.track(CommandSubmission(commands, timeout))
} else {
Future.failed(
serviceNotRunning(definiteAnswer = Some(false))(contextualizedErrorLogger)
)
}.andThen(logger.logErrorsOnCall[Completion])
handleFailure(request, loggingContext)
}
}
}

Expand Down Expand Up @@ -160,6 +156,22 @@ private[apiserver] final class ApiCommandService private[services] (
},
)
}

private def handleFailure(
request: SubmitAndWaitRequest,
loggingContext: LoggingContext,
): Future[Either[TrackedCompletionFailure, CompletionSuccess]] =
Future
.failed(
errorFactories.serviceNotRunning(definiteAnswer = Some(false))(
new DamlContextualizedErrorLogger(
logger,
loggingContext,
request.commands.map(_.submissionId),
)
)
)
.andThen(logger.logErrorsOnCall[Completion](loggingContext))
}

private[apiserver] object ApiCommandService {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import com.daml.error.{
}
import com.daml.error.definitions.{ErrorCauseExport, RejectionGenerators}
import com.daml.ledger.api.DeduplicationPeriod
import com.daml.ledger.api.domain.{LedgerId, Commands => ApiCommands}
import com.daml.ledger.api.domain.{LedgerId, SubmissionId, Commands => ApiCommands}
import com.daml.ledger.api.messages.command.submission.SubmitRequest
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.participant.state.index.v2._
Expand Down Expand Up @@ -118,6 +118,14 @@ private[apiserver] final class ApiSubmissionService private[services] (
withEnrichedLoggingContext(logging.commands(request.commands)) { implicit loggingContext =>
logger.info("Submitting transaction")
logger.trace(s"Commands: ${request.commands.commands.commands}")

implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(
logger,
loggingContext,
request.commands.submissionId.map(SubmissionId.unwrap),
)

val evaluatedCommand = ledgerConfigurationSubscription
.latestConfiguration() match {
case Some(ledgerConfiguration) =>
Expand All @@ -133,9 +141,7 @@ private[apiserver] final class ApiSubmissionService private[services] (
}
case None =>
Future.failed(
errorFactories.missingLedgerConfig(definiteAnswer = Some(false))(
new DamlContextualizedErrorLogger(logger, loggingContext, None)
)
errorFactories.missingLedgerConfig(definiteAnswer = Some(false))
)
}
evaluatedCommand.andThen(logger.logErrorsOnCall[Unit])
Expand All @@ -145,7 +151,11 @@ private[apiserver] final class ApiSubmissionService private[services] (
seed: crypto.Hash,
commands: ApiCommands,
ledgerConfig: Configuration,
)(implicit loggingContext: LoggingContext, telemetryContext: TelemetryContext): Future[Unit] =
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
contextualizedErrorLogger: ContextualizedErrorLogger,
): Future[Unit] =
submissionService
.deduplicateCommand(
commands.commandId,
Expand All @@ -168,9 +178,7 @@ private[apiserver] final class ApiSubmissionService private[services] (
case _: CommandDeduplicationDuplicate =>
metrics.daml.commands.deduplicatedCommands.mark()
Future.failed(
errorFactories.duplicateCommandException(
new DamlContextualizedErrorLogger(logger, loggingContext, None)
)
errorFactories.duplicateCommandException
)
}

Expand All @@ -195,7 +203,7 @@ private[apiserver] final class ApiSubmissionService private[services] (

private def handleCommandExecutionResult(
result: Either[ErrorCause, CommandExecutionResult]
): Future[CommandExecutionResult] =
)(implicit contextualizedErrorLogger: ContextualizedErrorLogger): Future[CommandExecutionResult] =
result.fold(
error => {
metrics.daml.commands.failedCommandInterpretations.mark()
Expand All @@ -211,6 +219,7 @@ private[apiserver] final class ApiSubmissionService private[services] (
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
contextualizedErrorLogger: ContextualizedErrorLogger,
): Future[state.SubmissionResult] =
for {
result <- commandExecutor.execute(commands, submissionSeed, ledgerConfig)
Expand Down Expand Up @@ -326,16 +335,12 @@ private[apiserver] final class ApiSubmissionService private[services] (

private def failedOnCommandExecution(
error: ErrorCause
)(implicit loggingContext: LoggingContext): Future[CommandExecutionResult] = {
implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(logger, loggingContext, None)

)(implicit contextualizedErrorLogger: ContextualizedErrorLogger): Future[CommandExecutionResult] =
errorCodesVersionSwitcher.chooseAsFailedFuture(
v1 = toStatusExceptionV1(error),
v2 = RejectionGenerators
.commandExecutorError(cause = ErrorCauseExport.fromErrorCause(error)),
)
}

override def close(): Unit = ()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,7 @@ private[apiserver] final class ApiConfigManagementService private (
) extends ConfigManagementService
with GrpcApiService {
private implicit val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass)
private implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(logger, loggingContext, None)

private val errorFactories = ErrorFactories(errorCodesVersionSwitcher)

import errorFactories._

override def close(): Unit = ()
Expand All @@ -69,7 +65,11 @@ private[apiserver] final class ApiConfigManagementService private (
Future.successful(configurationToResponse(configuration))
case None =>
// TODO error codes: Duplicate of missingLedgerConfig
Future.failed(missingLedgerConfigUponRequest)
Future.failed(
missingLedgerConfigUponRequest(
new DamlContextualizedErrorLogger(logger, loggingContext, None)
)
)
}
.andThen(logger.logErrorsOnCall[GetTimeModelResponse])
}
Expand All @@ -96,7 +96,7 @@ private[apiserver] final class ApiConfigManagementService private (
implicit val telemetryContext: TelemetryContext =
DefaultTelemetry.contextFromGrpcThreadLocalContext()
implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(logger, loggingContext, None)
new DamlContextualizedErrorLogger(logger, loggingContext, Some(request.submissionId))

val response = for {
// Validate and convert the request parameters
Expand Down Expand Up @@ -231,13 +231,15 @@ private[apiserver] object ApiConfigManagementService {
configManagementService: IndexConfigManagementService,
ledgerEnd: LedgerOffset.Absolute,
errorFactories: ErrorFactories,
)(implicit loggingContext: LoggingContext, contextualizedErrorLogger: ContextualizedErrorLogger)
)(implicit loggingContext: LoggingContext)
extends SynchronousResponse.Strategy[
(Time.Timestamp, Configuration),
ConfigurationEntry,
ConfigurationEntry.Accepted,
] {

private val logger = ContextualizedLogger.get(getClass)

override def currentLedgerEnd(): Future[Option[LedgerOffset.Absolute]] =
Future.successful(Some(ledgerEnd))

Expand Down Expand Up @@ -265,7 +267,9 @@ private[apiserver] object ApiConfigManagementService {
submissionId: Ref.SubmissionId
): PartialFunction[ConfigurationEntry, StatusRuntimeException] = {
case domain.ConfigurationEntry.Rejected(`submissionId`, reason, _) =>
errorFactories.configurationEntryRejected(reason, None)
errorFactories.configurationEntryRejected(reason, None)(
new DamlContextualizedErrorLogger(logger, loggingContext, Some(submissionId))
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,8 @@ import akka.stream.Materializer
import akka.stream.scaladsl.Source
import com.daml.api.util.TimestampConversion
import com.daml.daml_lf_dev.DamlLf.Archive
import com.daml.error.{
ContextualizedErrorLogger,
DamlContextualizedErrorLogger,
ErrorCodesVersionSwitcher,
}
import com.daml.error.ErrorCodesVersionSwitcher
import com.daml.error.DamlContextualizedErrorLogger
import com.daml.ledger.api.domain.{LedgerOffset, PackageEntry}
import com.daml.ledger.api.v1.admin.package_management_service.PackageManagementServiceGrpc.PackageManagementService
import com.daml.ledger.api.v1.admin.package_management_service._
Expand Down Expand Up @@ -60,8 +57,6 @@ private[apiserver] final class ApiPackageManagementService private (
with GrpcApiService {

private implicit val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass)
private implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(logger, loggingContext, None)

private val errorFactories = ErrorFactories(errorCodesVersionSwitcher)

Expand Down Expand Up @@ -128,7 +123,13 @@ private[apiserver] final class ApiPackageManagementService private (
ValidationLogger
.logFailureWithContext(
request,
errorFactories.invalidArgument(None)(err.getMessage),
errorFactories.invalidArgument(None)(err.getMessage)(
new DamlContextualizedErrorLogger(
logger,
loggingContext,
Some(submissionId),
)
),
)
),
Future.successful,
Expand All @@ -143,7 +144,6 @@ private[apiserver] final class ApiPackageManagementService private (

response.andThen(logger.logErrorsOnCall[UploadDarFileResponse])
}

}

private[apiserver] object ApiPackageManagementService {
Expand Down Expand Up @@ -185,8 +185,6 @@ private[apiserver] object ApiPackageManagementService {
PackageEntry.PackageUploadAccepted,
] {
private implicit val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass)
private implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(logger, loggingContext, None)

override def currentLedgerEnd(): Future[Option[LedgerOffset.Absolute]] =
ledgerEndService.currentLedgerEnd().map(Some(_))
Expand All @@ -209,7 +207,9 @@ private[apiserver] object ApiPackageManagementService {
submissionId: Ref.SubmissionId
): PartialFunction[PackageEntry, StatusRuntimeException] = {
case PackageEntry.PackageUploadRejected(`submissionId`, _, reason) =>
errorFactories.packageUploadRejected(reason, definiteAnswer = None)
errorFactories.packageUploadRejected(reason, definiteAnswer = None)(
new DamlContextualizedErrorLogger(logger, loggingContext, Some(submissionId))
)
}
}

Expand Down
Loading

0 comments on commit d361e84

Please sign in to comment.