Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Self-service error codes] Pass submission id as correlation id to error codes in Ledger API [DPP-684] #11381

Merged
merged 1 commit into from
Oct 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ trait ContextualizedErrorLogger {
def error(message: String, throwable: Throwable): Unit
}

/** Implementation of [[ContextualizedErrorLogger]] leveraging the //libs-scala/contextualized-logging
* as the logging stack.
*
* @param logger The logger.
* @param loggingContext The logging context.
* @param correlationId The correlation id, if present. The choice of the correlation id depends on the
* ledger integration. By default it should be the command submission id.
*/
class DamlContextualizedErrorLogger(
logger: ContextualizedLogger,
loggingContext: LoggingContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,7 @@

package com.daml.platform.server.api.services.grpc

import com.daml.error.{
ContextualizedErrorLogger,
DamlContextualizedErrorLogger,
ErrorCodesVersionSwitcher,
}
import com.daml.error.{DamlContextualizedErrorLogger, ErrorCodesVersionSwitcher}
import com.daml.ledger.api.SubmissionIdGenerator
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.v1.command_service.CommandServiceGrpc.CommandService
Expand Down Expand Up @@ -37,8 +33,6 @@ class GrpcCommandService(
with ProxyCloseable {

protected implicit val logger: ContextualizedLogger = ContextualizedLogger.get(getClass)
private implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(logger, loggingContext, None)

private[this] val validator = new SubmitAndWaitRequestValidator(
new CommandsValidator(ledgerId, errorCodesVersionSwitcher),
Expand All @@ -53,7 +47,7 @@ class GrpcCommandService(
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
)
)(contextualizedErrorLogger(requestWithSubmissionId))
.fold(
t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)),
_ => service.submitAndWait(requestWithSubmissionId),
Expand All @@ -70,7 +64,7 @@ class GrpcCommandService(
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
)
)(contextualizedErrorLogger(requestWithSubmissionId))
.fold(
t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)),
_ => service.submitAndWaitForTransactionId(requestWithSubmissionId),
Expand All @@ -87,7 +81,7 @@ class GrpcCommandService(
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
)
)(contextualizedErrorLogger(requestWithSubmissionId))
.fold(
t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)),
_ => service.submitAndWaitForTransaction(requestWithSubmissionId),
Expand All @@ -104,7 +98,7 @@ class GrpcCommandService(
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
)
)(contextualizedErrorLogger(requestWithSubmissionId))
.fold(
t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)),
_ => service.submitAndWaitForTransactionTree(requestWithSubmissionId),
Expand All @@ -123,4 +117,9 @@ class GrpcCommandService(
request
}
}

private def contextualizedErrorLogger(request: SubmitAndWaitRequest)(implicit
loggingContext: LoggingContext
) =
new DamlContextualizedErrorLogger(logger, loggingContext, request.commands.map(_.submissionId))
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,7 @@

package com.daml.platform.server.api.services.grpc

import com.daml.error.{
ContextualizedErrorLogger,
DamlContextualizedErrorLogger,
ErrorCodesVersionSwitcher,
}
import com.daml.error.{DamlContextualizedErrorLogger, ErrorCodesVersionSwitcher}
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.v1.command_submission_service.CommandSubmissionServiceGrpc.{
CommandSubmissionService => ApiCommandSubmissionService
Expand Down Expand Up @@ -44,12 +40,6 @@ class GrpcCommandSubmissionService(
with GrpcApiService {

protected implicit val logger: ContextualizedLogger = ContextualizedLogger.get(getClass)
private implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(
logger,
loggingContext,
None,
)
private val validator = new SubmitRequestValidator(
new CommandsValidator(ledgerId, errorCodesVersionSwitcher),
FieldValidations(ErrorFactories(errorCodesVersionSwitcher)),
Expand All @@ -64,6 +54,11 @@ class GrpcCommandSubmissionService(
telemetryContext.setAttribute(SpanAttribute.Submitter, commands.party)
telemetryContext.setAttribute(SpanAttribute.WorkflowId, commands.workflowId)
}
val errorLogger = new DamlContextualizedErrorLogger(
logger = logger,
loggingContext = loggingContext,
correlationId = request.commands.map(_.submissionId),
)
Timed.timedAndTrackedFuture(
metrics.daml.commands.submissions,
metrics.daml.commands.submissionsRunning,
Expand All @@ -75,7 +70,7 @@ class GrpcCommandSubmissionService(
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
),
)(errorLogger),
)
.fold(
t => Future.failed(ValidationLogger.logFailure(request, t)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ private[apiserver] final class ApiCommandService private[services] (
private val logger = ContextualizedLogger.get(this.getClass)

private val errorFactories = ErrorFactories(errorCodesVersionSwitcher)
import errorFactories.serviceNotRunning

@volatile private var running = true

Expand All @@ -82,7 +81,6 @@ private[apiserver] final class ApiCommandService private[services] (
private def submitAndWaitInternal(request: SubmitAndWaitRequest)(implicit
loggingContext: LoggingContext
): Future[Either[TrackedCompletionFailure, CompletionSuccess]] = {
val contextualizedErrorLogger = new DamlContextualizedErrorLogger(logger, loggingContext, None)
val commands = request.getCommands
withEnrichedLoggingContext(
logging.submissionId(commands.submissionId),
Expand All @@ -96,10 +94,8 @@ private[apiserver] final class ApiCommandService private[services] (
.map(deadline => Duration.ofNanos(deadline.timeRemaining(TimeUnit.NANOSECONDS)))
submissionTracker.track(CommandSubmission(commands, timeout))
} else {
Future.failed(
serviceNotRunning(definiteAnswer = Some(false))(contextualizedErrorLogger)
)
}.andThen(logger.logErrorsOnCall[Completion])
handleFailure(request, loggingContext)
}
}
}

Expand Down Expand Up @@ -160,6 +156,22 @@ private[apiserver] final class ApiCommandService private[services] (
},
)
}

private def handleFailure(
request: SubmitAndWaitRequest,
loggingContext: LoggingContext,
): Future[Either[TrackedCompletionFailure, CompletionSuccess]] =
Future
.failed(
errorFactories.serviceNotRunning(definiteAnswer = Some(false))(
new DamlContextualizedErrorLogger(
logger,
loggingContext,
request.commands.map(_.submissionId),
)
)
)
.andThen(logger.logErrorsOnCall[Completion](loggingContext))
}

private[apiserver] object ApiCommandService {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import com.daml.error.{
}
import com.daml.error.definitions.{ErrorCauseExport, RejectionGenerators}
import com.daml.ledger.api.DeduplicationPeriod
import com.daml.ledger.api.domain.{LedgerId, Commands => ApiCommands}
import com.daml.ledger.api.domain.{LedgerId, SubmissionId, Commands => ApiCommands}
import com.daml.ledger.api.messages.command.submission.SubmitRequest
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.participant.state.index.v2._
Expand Down Expand Up @@ -118,6 +118,14 @@ private[apiserver] final class ApiSubmissionService private[services] (
withEnrichedLoggingContext(logging.commands(request.commands)) { implicit loggingContext =>
logger.info("Submitting transaction")
logger.trace(s"Commands: ${request.commands.commands.commands}")

implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(
logger,
loggingContext,
request.commands.submissionId.map(SubmissionId.unwrap),
)

val evaluatedCommand = ledgerConfigurationSubscription
.latestConfiguration() match {
case Some(ledgerConfiguration) =>
Expand All @@ -133,9 +141,7 @@ private[apiserver] final class ApiSubmissionService private[services] (
}
case None =>
Future.failed(
errorFactories.missingLedgerConfig(definiteAnswer = Some(false))(
new DamlContextualizedErrorLogger(logger, loggingContext, None)
)
errorFactories.missingLedgerConfig(definiteAnswer = Some(false))
)
}
evaluatedCommand.andThen(logger.logErrorsOnCall[Unit])
Expand All @@ -145,7 +151,11 @@ private[apiserver] final class ApiSubmissionService private[services] (
seed: crypto.Hash,
commands: ApiCommands,
ledgerConfig: Configuration,
)(implicit loggingContext: LoggingContext, telemetryContext: TelemetryContext): Future[Unit] =
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
contextualizedErrorLogger: ContextualizedErrorLogger,
): Future[Unit] =
submissionService
.deduplicateCommand(
commands.commandId,
Expand All @@ -168,9 +178,7 @@ private[apiserver] final class ApiSubmissionService private[services] (
case _: CommandDeduplicationDuplicate =>
metrics.daml.commands.deduplicatedCommands.mark()
Future.failed(
errorFactories.duplicateCommandException(
new DamlContextualizedErrorLogger(logger, loggingContext, None)
)
errorFactories.duplicateCommandException
)
}

Expand All @@ -195,7 +203,7 @@ private[apiserver] final class ApiSubmissionService private[services] (

private def handleCommandExecutionResult(
result: Either[ErrorCause, CommandExecutionResult]
): Future[CommandExecutionResult] =
)(implicit contextualizedErrorLogger: ContextualizedErrorLogger): Future[CommandExecutionResult] =
result.fold(
error => {
metrics.daml.commands.failedCommandInterpretations.mark()
Expand All @@ -211,6 +219,7 @@ private[apiserver] final class ApiSubmissionService private[services] (
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
contextualizedErrorLogger: ContextualizedErrorLogger,
): Future[state.SubmissionResult] =
for {
result <- commandExecutor.execute(commands, submissionSeed, ledgerConfig)
Expand Down Expand Up @@ -326,16 +335,12 @@ private[apiserver] final class ApiSubmissionService private[services] (

private def failedOnCommandExecution(
error: ErrorCause
)(implicit loggingContext: LoggingContext): Future[CommandExecutionResult] = {
implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(logger, loggingContext, None)

)(implicit contextualizedErrorLogger: ContextualizedErrorLogger): Future[CommandExecutionResult] =
errorCodesVersionSwitcher.chooseAsFailedFuture(
v1 = toStatusExceptionV1(error),
v2 = RejectionGenerators
.commandExecutorError(cause = ErrorCauseExport.fromErrorCause(error)),
)
}

override def close(): Unit = ()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ private[apiserver] final class ApiConfigManagementService private (
) extends ConfigManagementService
with GrpcApiService {
private implicit val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass)
private implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(logger, loggingContext, None)

private val errorFactories = ErrorFactories(errorCodesVersionSwitcher)
private val fieldValidations = FieldValidations(errorFactories)

Expand All @@ -70,7 +67,11 @@ private[apiserver] final class ApiConfigManagementService private (
Future.successful(configurationToResponse(configuration))
case None =>
// TODO error codes: Duplicate of missingLedgerConfig
Future.failed(missingLedgerConfigUponRequest)
Future.failed(
missingLedgerConfigUponRequest(
new DamlContextualizedErrorLogger(logger, loggingContext, None)
)
)
}
.andThen(logger.logErrorsOnCall[GetTimeModelResponse])
}
Expand All @@ -97,7 +98,7 @@ private[apiserver] final class ApiConfigManagementService private (
implicit val telemetryContext: TelemetryContext =
DefaultTelemetry.contextFromGrpcThreadLocalContext()
implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(logger, loggingContext, None)
new DamlContextualizedErrorLogger(logger, loggingContext, Some(request.submissionId))

val response = for {
// Validate and convert the request parameters
Expand Down Expand Up @@ -232,13 +233,15 @@ private[apiserver] object ApiConfigManagementService {
configManagementService: IndexConfigManagementService,
ledgerEnd: LedgerOffset.Absolute,
errorFactories: ErrorFactories,
)(implicit loggingContext: LoggingContext, contextualizedErrorLogger: ContextualizedErrorLogger)
)(implicit loggingContext: LoggingContext)
extends SynchronousResponse.Strategy[
(Time.Timestamp, Configuration),
ConfigurationEntry,
ConfigurationEntry.Accepted,
] {

private val logger = ContextualizedLogger.get(getClass)

override def currentLedgerEnd(): Future[Option[LedgerOffset.Absolute]] =
Future.successful(Some(ledgerEnd))

Expand Down Expand Up @@ -266,7 +269,9 @@ private[apiserver] object ApiConfigManagementService {
submissionId: Ref.SubmissionId
): PartialFunction[ConfigurationEntry, StatusRuntimeException] = {
case domain.ConfigurationEntry.Rejected(`submissionId`, reason, _) =>
errorFactories.configurationEntryRejected(reason, None)
errorFactories.configurationEntryRejected(reason, None)(
new DamlContextualizedErrorLogger(logger, loggingContext, Some(submissionId))
)
}
}

Expand Down
Loading