Skip to content

Commit

Permalink
Pass submission id as correlation id to error codes in Ledger API
Browse files Browse the repository at this point in the history
CHANGELOG_BEGIN
CHANGELOG_END
  • Loading branch information
tudor-da committed Oct 27, 2021
1 parent 46d31d5 commit 4764882
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ trait ContextualizedErrorLogger {
def error(message: String, throwable: Throwable): Unit
}

/** Implementation of [[ContextualizedErrorLogger]] leveraging the //ledger/contextualized-logging as the
* logging tech stack.
*
* @param logger The logger.
* @param loggingContext The logging context.
* @param correlationId The correlation id, if present. The choice of the correlation id depends on the
* ledger integration. By default it should be the command submission id.
*/
class DamlContextualizedErrorLogger(
logger: ContextualizedLogger,
loggingContext: LoggingContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,7 @@

package com.daml.platform.server.api.services.grpc

import com.daml.error.{
ContextualizedErrorLogger,
DamlContextualizedErrorLogger,
ErrorCodesVersionSwitcher,
}
import com.daml.error.{DamlContextualizedErrorLogger, ErrorCodesVersionSwitcher}
import com.daml.ledger.api.SubmissionIdGenerator
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.v1.command_service.CommandServiceGrpc.CommandService
Expand Down Expand Up @@ -37,8 +33,6 @@ class GrpcCommandService(
with ProxyCloseable {

protected implicit val logger: ContextualizedLogger = ContextualizedLogger.get(getClass)
private implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(logger, loggingContext, None)

private[this] val validator = new SubmitAndWaitRequestValidator(
new CommandsValidator(ledgerId, errorCodesVersionSwitcher),
Expand All @@ -53,7 +47,7 @@ class GrpcCommandService(
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
)
)(contextualizedErrorLogger(requestWithSubmissionId))
.fold(
t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)),
_ => service.submitAndWait(requestWithSubmissionId),
Expand All @@ -70,7 +64,7 @@ class GrpcCommandService(
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
)
)(contextualizedErrorLogger(requestWithSubmissionId))
.fold(
t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)),
_ => service.submitAndWaitForTransactionId(requestWithSubmissionId),
Expand All @@ -87,7 +81,7 @@ class GrpcCommandService(
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
)
)(contextualizedErrorLogger(requestWithSubmissionId))
.fold(
t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)),
_ => service.submitAndWaitForTransaction(requestWithSubmissionId),
Expand All @@ -104,7 +98,7 @@ class GrpcCommandService(
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
)
)(contextualizedErrorLogger(requestWithSubmissionId))
.fold(
t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)),
_ => service.submitAndWaitForTransactionTree(requestWithSubmissionId),
Expand All @@ -123,4 +117,9 @@ class GrpcCommandService(
request
}
}

private def contextualizedErrorLogger(request: SubmitAndWaitRequest)(implicit
loggingContext: LoggingContext
) =
new DamlContextualizedErrorLogger(logger, loggingContext, request.commands.map(_.submissionId))
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,7 @@

package com.daml.platform.server.api.services.grpc

import com.daml.error.{
ContextualizedErrorLogger,
DamlContextualizedErrorLogger,
ErrorCodesVersionSwitcher,
}
import com.daml.error.{DamlContextualizedErrorLogger, ErrorCodesVersionSwitcher}
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.v1.command_submission_service.CommandSubmissionServiceGrpc.{
CommandSubmissionService => ApiCommandSubmissionService
Expand Down Expand Up @@ -44,12 +40,6 @@ class GrpcCommandSubmissionService(
with GrpcApiService {

protected implicit val logger: ContextualizedLogger = ContextualizedLogger.get(getClass)
private implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(
logger,
loggingContext,
None,
)
private val validator = new SubmitRequestValidator(
new CommandsValidator(ledgerId, errorCodesVersionSwitcher),
FieldValidations(ErrorFactories(errorCodesVersionSwitcher)),
Expand All @@ -64,6 +54,11 @@ class GrpcCommandSubmissionService(
telemetryContext.setAttribute(SpanAttribute.Submitter, commands.party)
telemetryContext.setAttribute(SpanAttribute.WorkflowId, commands.workflowId)
}
val errorLogger = new DamlContextualizedErrorLogger(
logger = logger,
loggingContext = loggingContext,
correlationId = request.commands.map(_.submissionId),
)
Timed.timedAndTrackedFuture(
metrics.daml.commands.submissions,
metrics.daml.commands.submissionsRunning,
Expand All @@ -75,7 +70,7 @@ class GrpcCommandSubmissionService(
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
),
)(errorLogger),
)
.fold(
t => Future.failed(ValidationLogger.logFailure(request, t)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ private[apiserver] final class ApiCommandService private[services] (
private val logger = ContextualizedLogger.get(this.getClass)

private val errorFactories = ErrorFactories(errorCodesVersionSwitcher)
import errorFactories.serviceNotRunning

@volatile private var running = true

Expand All @@ -82,7 +81,6 @@ private[apiserver] final class ApiCommandService private[services] (
private def submitAndWaitInternal(request: SubmitAndWaitRequest)(implicit
loggingContext: LoggingContext
): Future[Either[TrackedCompletionFailure, CompletionSuccess]] = {
val contextualizedErrorLogger = new DamlContextualizedErrorLogger(logger, loggingContext, None)
val commands = request.getCommands
withEnrichedLoggingContext(
logging.submissionId(commands.submissionId),
Expand All @@ -96,10 +94,8 @@ private[apiserver] final class ApiCommandService private[services] (
.map(deadline => Duration.ofNanos(deadline.timeRemaining(TimeUnit.NANOSECONDS)))
submissionTracker.track(CommandSubmission(commands, timeout))
} else {
Future.failed(
serviceNotRunning(definiteAnswer = Some(false))(contextualizedErrorLogger)
)
}.andThen(logger.logErrorsOnCall[Completion])
handleFailure(request, loggingContext)
}
}
}

Expand Down Expand Up @@ -160,6 +156,22 @@ private[apiserver] final class ApiCommandService private[services] (
},
)
}

private def handleFailure(
request: SubmitAndWaitRequest,
loggingContext: LoggingContext,
): Future[Either[TrackedCompletionFailure, CompletionSuccess]] =
Future
.failed(
errorFactories.serviceNotRunning(definiteAnswer = Some(false))(
new DamlContextualizedErrorLogger(
logger,
loggingContext,
request.commands.map(_.submissionId),
)
)
)
.andThen(logger.logErrorsOnCall[Completion](loggingContext))
}

private[apiserver] object ApiCommandService {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import com.daml.error.{
}
import com.daml.error.definitions.{ErrorCauseExport, RejectionGenerators}
import com.daml.ledger.api.DeduplicationPeriod
import com.daml.ledger.api.domain.{LedgerId, Commands => ApiCommands}
import com.daml.ledger.api.domain.{LedgerId, SubmissionId, Commands => ApiCommands}
import com.daml.ledger.api.messages.command.submission.SubmitRequest
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.participant.state.index.v2._
Expand Down Expand Up @@ -118,6 +118,14 @@ private[apiserver] final class ApiSubmissionService private[services] (
withEnrichedLoggingContext(logging.commands(request.commands)) { implicit loggingContext =>
logger.info("Submitting transaction")
logger.trace(s"Commands: ${request.commands.commands.commands}")

implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(
logger,
loggingContext,
request.commands.submissionId.map(SubmissionId.unwrap),
)

val evaluatedCommand = ledgerConfigurationSubscription
.latestConfiguration() match {
case Some(ledgerConfiguration) =>
Expand All @@ -133,9 +141,7 @@ private[apiserver] final class ApiSubmissionService private[services] (
}
case None =>
Future.failed(
errorFactories.missingLedgerConfig(definiteAnswer = Some(false))(
new DamlContextualizedErrorLogger(logger, loggingContext, None)
)
errorFactories.missingLedgerConfig(definiteAnswer = Some(false))
)
}
evaluatedCommand.andThen(logger.logErrorsOnCall[Unit])
Expand All @@ -145,7 +151,11 @@ private[apiserver] final class ApiSubmissionService private[services] (
seed: crypto.Hash,
commands: ApiCommands,
ledgerConfig: Configuration,
)(implicit loggingContext: LoggingContext, telemetryContext: TelemetryContext): Future[Unit] =
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
contextualizedErrorLogger: ContextualizedErrorLogger,
): Future[Unit] =
submissionService
.deduplicateCommand(
commands.commandId,
Expand All @@ -168,9 +178,7 @@ private[apiserver] final class ApiSubmissionService private[services] (
case _: CommandDeduplicationDuplicate =>
metrics.daml.commands.deduplicatedCommands.mark()
Future.failed(
errorFactories.duplicateCommandException(
new DamlContextualizedErrorLogger(logger, loggingContext, None)
)
errorFactories.duplicateCommandException
)
}

Expand All @@ -195,7 +203,7 @@ private[apiserver] final class ApiSubmissionService private[services] (

private def handleCommandExecutionResult(
result: Either[ErrorCause, CommandExecutionResult]
): Future[CommandExecutionResult] =
)(implicit contextualizedErrorLogger: ContextualizedErrorLogger): Future[CommandExecutionResult] =
result.fold(
error => {
metrics.daml.commands.failedCommandInterpretations.mark()
Expand All @@ -211,6 +219,7 @@ private[apiserver] final class ApiSubmissionService private[services] (
)(implicit
loggingContext: LoggingContext,
telemetryContext: TelemetryContext,
contextualizedErrorLogger: ContextualizedErrorLogger,
): Future[state.SubmissionResult] =
for {
result <- commandExecutor.execute(commands, submissionSeed, ledgerConfig)
Expand Down Expand Up @@ -326,16 +335,12 @@ private[apiserver] final class ApiSubmissionService private[services] (

private def failedOnCommandExecution(
error: ErrorCause
)(implicit loggingContext: LoggingContext): Future[CommandExecutionResult] = {
implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(logger, loggingContext, None)

)(implicit contextualizedErrorLogger: ContextualizedErrorLogger): Future[CommandExecutionResult] =
errorCodesVersionSwitcher.chooseAsFailedFuture(
v1 = toStatusExceptionV1(error),
v2 = RejectionGenerators
.commandExecutorError(cause = ErrorCauseExport.fromErrorCause(error)),
)
}

override def close(): Unit = ()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ private[apiserver] final class ApiConfigManagementService private (
) extends ConfigManagementService
with GrpcApiService {
private implicit val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass)
private implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(logger, loggingContext, None)

private val errorFactories = ErrorFactories(errorCodesVersionSwitcher)
private val fieldValidations = FieldValidations(errorFactories)

Expand All @@ -70,7 +67,11 @@ private[apiserver] final class ApiConfigManagementService private (
Future.successful(configurationToResponse(configuration))
case None =>
// TODO error codes: Duplicate of missingLedgerConfig
Future.failed(missingLedgerConfigUponRequest)
Future.failed(
missingLedgerConfigUponRequest(
new DamlContextualizedErrorLogger(logger, loggingContext, None)
)
)
}
.andThen(logger.logErrorsOnCall[GetTimeModelResponse])
}
Expand All @@ -97,7 +98,7 @@ private[apiserver] final class ApiConfigManagementService private (
implicit val telemetryContext: TelemetryContext =
DefaultTelemetry.contextFromGrpcThreadLocalContext()
implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
new DamlContextualizedErrorLogger(logger, loggingContext, None)
new DamlContextualizedErrorLogger(logger, loggingContext, Some(request.submissionId))

val response = for {
// Validate and convert the request parameters
Expand Down Expand Up @@ -232,13 +233,15 @@ private[apiserver] object ApiConfigManagementService {
configManagementService: IndexConfigManagementService,
ledgerEnd: LedgerOffset.Absolute,
errorFactories: ErrorFactories,
)(implicit loggingContext: LoggingContext, contextualizedErrorLogger: ContextualizedErrorLogger)
)(implicit loggingContext: LoggingContext)
extends SynchronousResponse.Strategy[
(Time.Timestamp, Configuration),
ConfigurationEntry,
ConfigurationEntry.Accepted,
] {

private val logger = ContextualizedLogger.get(getClass)

override def currentLedgerEnd(): Future[Option[LedgerOffset.Absolute]] =
Future.successful(Some(ledgerEnd))

Expand Down Expand Up @@ -266,7 +269,9 @@ private[apiserver] object ApiConfigManagementService {
submissionId: Ref.SubmissionId
): PartialFunction[ConfigurationEntry, StatusRuntimeException] = {
case domain.ConfigurationEntry.Rejected(`submissionId`, reason, _) =>
errorFactories.configurationEntryRejected(reason, None)
errorFactories.configurationEntryRejected(reason, None)(
new DamlContextualizedErrorLogger(logger, loggingContext, Some(submissionId))
)
}
}

Expand Down
Loading

0 comments on commit 4764882

Please sign in to comment.