Skip to content

Commit

Permalink
[participant-state] Add earliest_offset metadata to pruned data error…
Browse files Browse the repository at this point in the history
… [kvl-1270] (#12546)
  • Loading branch information
nicu-da authored Jan 31, 2022
1 parent ef18bf4 commit c1afabe
Show file tree
Hide file tree
Showing 11 changed files with 161 additions and 76 deletions.
17 changes: 12 additions & 5 deletions docs/source/app-dev/command-deduplication.rst
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ Independently of how the outcome is communicated, command deduplication generate
Neither deduplication durations up to the :ref:`maximum deduplication time <com.daml.ledger.api.v1.LedgerConfiguration.max_deduplication_time>` nor deduplication offsets published within that time SHOULD result in this error.
Participants may accept longer periods at their discretion.

- The gRPC status code ``FAILED_PRECONDITION`` with error code id :ref:`PARTICIPANT_PRUNED_DATA_ACCESSED <error_code_PARTICIPANT_PRUNED_DATA_ACCESSED>`, when specifying a deduplication period represented by an offset, indicates that the specified deduplication offset has been pruned.
The field ``earliest_offset`` in the metadata specifies the last pruned offset.

For deduplication to work as intended, all submissions for the same ledger change must be submitted via the same participant.
Whether a submission is considered a duplicate is determined by completion events, and by default a participant outputs only the completion events for submissions that were requested via the very same participant.
Expand Down Expand Up @@ -229,7 +231,16 @@ Fields in the error metadata are written as ``field`` in lowercase letters.
- Set the deduplication offset to ``earliest_offset`` or the deduplication duration to ``longest_duration`` and retry from :ref:`Step 2 <dedup-bounded-step-offset>`, obtaining the completion offset ``OFF1``.
This may lead to accepting the change twice within the originally intended deduplication period.



- * ``FAILED_PRECONDITION`` / :ref:`PARTICIPANT_PRUNED_DATA_ACCESSED <error_code_PARTICIPANT_PRUNED_DATA_ACCESSED>`

* The specified deduplication offset has been pruned by the participant.
``earliest_offset`` contains the last pruned offset.

Use the :ref:`Command Completion Service <command-completion-service>` by asking for the :ref:`completions <com.daml.ledger.api.v1.CompletionStreamRequest>`, starting from the last pruned offset by setting :ref:`offset <com.daml.ledger.api.v1.CompletionStreamRequest.offset>` to the value of
``earliest_offset``, and use the first received :ref:`offset <com.daml.ledger.api.v1.Checkpoint.offset>` as a deduplication offset.


- * ``ABORTED`` / :ref:`SUBMISSION_ALREADY_IN_FLIGHT <error_code_SUBMISSION_ALREADY_IN_FLIGHT>`

This error occurs only as an RPC response, not inside a completion event.
Expand Down Expand Up @@ -376,7 +387,3 @@ The above strategy can fail in the following scenarios:
..
Command deduplication on the JSON API
*************************************



Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import scala.concurrent.duration._
"Errors raised by or forwarded by the Ledger API."
)
object LedgerApiErrors extends LedgerApiErrorGroup {

val EarliestOffsetMetadataKey = "earliest_offset"

@Explanation(
"""This error occurs when a participant rejects a command due to excessive load.
|Load can be caused by the following factors:
Expand Down Expand Up @@ -456,9 +459,13 @@ object LedgerApiErrors extends LedgerApiErrorGroup {
id = "PARTICIPANT_PRUNED_DATA_ACCESSED",
ErrorCategory.InvalidGivenCurrentSystemStateOther,
) {
case class Reject(override val cause: String)(implicit
case class Reject(override val cause: String, _earliestOffset: String)(implicit
loggingContext: ContextualizedErrorLogger
) extends LoggingTransactionErrorImpl(cause = cause)
) extends LoggingTransactionErrorImpl(cause = cause) {

override def context: Map[String, String] =
super.context + (EarliestOffsetMetadataKey -> _earliestOffset)
}
}

@Explanation(
Expand Down
1 change: 1 addition & 0 deletions ledger/ledger-api-common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ da_scala_test_suite(
"//ledger/ledger-api-client",
"//ledger/ledger-api-domain",
"//ledger/ledger-api-health",
"//ledger/ledger-offset",
"//ledger/metrics",
"//ledger/metrics:metrics-test-lib",
"//ledger/test-common",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@

package com.daml.platform.server.api.validation

import java.sql.{SQLNonTransientException, SQLTransientException}
import java.time.{Duration, Instant}

import com.daml.error.ErrorCode.ApiException
import com.daml.error.definitions.{IndexErrors, LedgerApiErrors}
import com.daml.error.{ContextualizedErrorLogger, ErrorCodesVersionSwitcher}
import com.daml.grpc.GrpcStatus
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.grpc.GrpcStatuses
import com.daml.ledger.offset.Offset
import com.daml.lf.data.Ref.TransactionId
import com.daml.lf.transaction.GlobalKey
import com.daml.lf.value.Value
Expand All @@ -25,8 +29,6 @@ import io.grpc.protobuf.StatusProto
import io.grpc.{Metadata, StatusRuntimeException}
import scalaz.syntax.tag._

import java.sql.{SQLNonTransientException, SQLTransientException}
import java.time.{Duration, Instant}
import scala.annotation.nowarn

class ErrorFactories private (errorCodesVersionSwitcher: ErrorCodesVersionSwitcher) {
Expand Down Expand Up @@ -490,7 +492,7 @@ class ErrorFactories private (errorCodesVersionSwitcher: ErrorCodesVersionSwitch
),
)

def participantPrunedDataAccessed(message: String)(implicit
def participantPrunedDataAccessed(message: String, earliestOffset: Offset)(implicit
contextualizedErrorLogger: ContextualizedErrorLogger
): StatusRuntimeException =
errorCodesVersionSwitcher.choose(
Expand All @@ -502,7 +504,7 @@ class ErrorFactories private (errorCodesVersionSwitcher: ErrorCodesVersionSwitch
.build()
),
v2 = LedgerApiErrors.RequestValidation.ParticipantPrunedDataAccessed
.Reject(message)
.Reject(message, earliestOffset.toHexString)
.asGrpcError,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import java.time.Duration
import java.util.regex.Pattern

import ch.qos.logback.classic.Level
import com.daml.error.definitions.LedgerApiErrors
import com.daml.error.definitions.LedgerApiErrors.RequestValidation.InvalidDeduplicationPeriodField.ValidMaxDeduplicationFieldKey
import com.daml.error.utils.ErrorDetails
import com.daml.error.{
Expand All @@ -17,6 +18,7 @@ import com.daml.error.{
ErrorCodesVersionSwitcher,
}
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.offset.Offset
import com.daml.lf.data.Ref
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.server.api.validation.ErrorFactories._
Expand Down Expand Up @@ -683,7 +685,12 @@ class ErrorFactoriesSpec

"return a participantPrunedDataAccessed error" in {
val msg = s"PARTICIPANT_PRUNED_DATA_ACCESSED(9,$truncatedCorrelationId): my message"
assertVersionedError(_.participantPrunedDataAccessed("my message"))(
assertVersionedError(
_.participantPrunedDataAccessed(
"my message",
Offset.fromHexString(Ref.HexString.assertFromString("00")),
)
)(
v1_code = Code.NOT_FOUND,
v1_message = "my message",
v1_details = Seq.empty,
Expand All @@ -692,14 +699,18 @@ class ErrorFactoriesSpec
v2_details = Seq[ErrorDetails.ErrorDetail](
ErrorDetails.ErrorInfoDetail(
"PARTICIPANT_PRUNED_DATA_ACCESSED",
Map("category" -> "9", "definite_answer" -> "false"),
Map(
"category" -> "9",
"definite_answer" -> "false",
LedgerApiErrors.EarliestOffsetMetadataKey -> "00",
),
),
expectedCorrelationIdRequestInfo,
),
v2_logEntry = ExpectedLogEntry(
Level.INFO,
msg,
Some(expectedLocationLogMarkerRegex),
expectedMarkerRegex(s"${LedgerApiErrors.EarliestOffsetMetadataKey}=00"),
),
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,10 @@ object Assertions {
case Some(value) =>
value
case None =>
fail(s"The error metadata did not contain the key $key. Metadata was: [$metadata]")
fail(
s"The error metadata did not contain the key $key. Metadata was: [$metadata]",
exception,
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ import java.time.{Clock, Instant}
import com.daml.ledger.api.refinements.ApiTypes.TemplateId
import com.daml.ledger.api.testtool.infrastructure.Eventually.eventually
import com.daml.ledger.api.testtool.infrastructure.ProtobufConverters._
import com.daml.ledger.api.testtool.infrastructure.time.{
DelayMechanism,
StaticTimeDelayMechanism,
TimeDelayMechanism,
}
import com.daml.ledger.api.testtool.infrastructure.{
Endpoint,
Identification,
Expand Down Expand Up @@ -150,6 +155,11 @@ private[testtool] final class ParticipantTestContext private[participant] (
val nextKeyId: () => String = nextIdGenerator("key")
val nextUserId: () => String = nextIdGenerator("user", lowerCase = true)

lazy val delayMechanism: DelayMechanism = if (features.staticTime) {
new StaticTimeDelayMechanism(this)
} else
new TimeDelayMechanism()

override def toString: String = s"participant $endpointId"

/** Gets the absolute offset of the ledger end at a point in time. Use [[end]] if you need
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,18 @@ package com.daml.ledger.api.testtool.infrastructure.time
import com.daml.ledger.api.testtool.infrastructure.participant.ParticipantTestContext
import com.daml.timer.Delayed

import scala.concurrent.duration.Duration
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.{Duration, FiniteDuration}

trait DelayMechanism {
val skews: FiniteDuration
def delayBy(duration: Duration): Future[Unit]
}

class TimeDelayMechanism(val skews: FiniteDuration)(implicit ec: ExecutionContext)
extends DelayMechanism {
class TimeDelayMechanism()(implicit ec: ExecutionContext) extends DelayMechanism {
override def delayBy(duration: Duration): Future[Unit] = Delayed.by(duration)(())
}

class StaticTimeDelayMechanism(ledger: ParticipantTestContext, val skews: FiniteDuration)(implicit
class StaticTimeDelayMechanism(ledger: ParticipantTestContext)(implicit
ec: ExecutionContext
) extends DelayMechanism {
override def delayBy(duration: Duration): Future[Unit] =
Expand Down
Loading

0 comments on commit c1afabe

Please sign in to comment.