diff --git a/docs/source/app-dev/command-deduplication.rst b/docs/source/app-dev/command-deduplication.rst index 0fe359c76925..ee527a0a5c0c 100644 --- a/docs/source/app-dev/command-deduplication.rst +++ b/docs/source/app-dev/command-deduplication.rst @@ -85,6 +85,8 @@ Independently of how the outcome is communicated, command deduplication generate Neither deduplication durations up to the :ref:`maximum deduplication time ` nor deduplication offsets published within that time SHOULD result in this error. Participants may accept longer periods at their discretion. +- The gRPC status code ``FAILED_PRECONDITION`` with error code id :ref:`PARTICIPANT_PRUNED_DATA_ACCESSED `, when specifying a deduplication period represented by an offset, indicates that the specified deduplication offset has been pruned. + The field ``earliest_offset`` in the metadata specifies the last pruned offset. For deduplication to work as intended, all submissions for the same ledger change must be submitted via the same participant. Whether a submission is considered a duplicate is determined by completion events, and by default a participant outputs only the completion events for submissions that were requested via the very same participant. @@ -229,7 +231,16 @@ Fields in the error metadata are written as ``field`` in lowercase letters. - Set the deduplication offset to ``earliest_offset`` or the deduplication duration to ``longest_duration`` and retry from :ref:`Step 2 `, obtaining the completion offset ``OFF1``. This may lead to accepting the change twice within the originally intended deduplication period. - + + - * ``FAILED_PRECONDITION`` / :ref:`PARTICIPANT_PRUNED_DATA_ACCESSED ` + + * The specified deduplication offset has been pruned by the participant. + ``earliest_offset`` contains the last pruned offset. + + Use the :ref:`Command Completion Service ` by asking for the :ref:`completions `, starting from the last pruned offset by setting :ref:`offset ` to the value of + ``earliest_offset``, and use the first received :ref:`offset ` as a deduplication offset. + + - * ``ABORTED`` / :ref:`SUBMISSION_ALREADY_IN_FLIGHT ` This error occurs only as an RPC response, not inside a completion event. @@ -376,7 +387,3 @@ The above strategy can fail in the following scenarios: .. Command deduplication on the JSON API ************************************* - - - - diff --git a/ledger/error/src/main/scala/com/daml/error/definitions/LedgerApiErrors.scala b/ledger/error/src/main/scala/com/daml/error/definitions/LedgerApiErrors.scala index 9f212217fe99..17a6ccc2c23c 100644 --- a/ledger/error/src/main/scala/com/daml/error/definitions/LedgerApiErrors.scala +++ b/ledger/error/src/main/scala/com/daml/error/definitions/LedgerApiErrors.scala @@ -24,6 +24,9 @@ import scala.concurrent.duration._ "Errors raised by or forwarded by the Ledger API." ) object LedgerApiErrors extends LedgerApiErrorGroup { + + val EarliestOffsetMetadataKey = "earliest_offset" + @Explanation( """This error occurs when a participant rejects a command due to excessive load. |Load can be caused by the following factors: @@ -456,9 +459,13 @@ object LedgerApiErrors extends LedgerApiErrorGroup { id = "PARTICIPANT_PRUNED_DATA_ACCESSED", ErrorCategory.InvalidGivenCurrentSystemStateOther, ) { - case class Reject(override val cause: String)(implicit + case class Reject(override val cause: String, _earliestOffset: String)(implicit loggingContext: ContextualizedErrorLogger - ) extends LoggingTransactionErrorImpl(cause = cause) + ) extends LoggingTransactionErrorImpl(cause = cause) { + + override def context: Map[String, String] = + super.context + (EarliestOffsetMetadataKey -> _earliestOffset) + } } @Explanation( diff --git a/ledger/ledger-api-common/BUILD.bazel b/ledger/ledger-api-common/BUILD.bazel index 4d2c0c7b43fc..1a876619b8cd 100644 --- a/ledger/ledger-api-common/BUILD.bazel +++ b/ledger/ledger-api-common/BUILD.bazel @@ -127,6 +127,7 @@ da_scala_test_suite( "//ledger/ledger-api-client", "//ledger/ledger-api-domain", "//ledger/ledger-api-health", + "//ledger/ledger-offset", "//ledger/metrics", "//ledger/metrics:metrics-test-lib", "//ledger/test-common", diff --git a/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/ErrorFactories.scala b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/ErrorFactories.scala index 8cb307bc6943..45eef1f4c87f 100644 --- a/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/ErrorFactories.scala +++ b/ledger/ledger-api-common/src/main/scala/com/digitalasset/platform/server/api/validation/ErrorFactories.scala @@ -3,12 +3,16 @@ package com.daml.platform.server.api.validation +import java.sql.{SQLNonTransientException, SQLTransientException} +import java.time.{Duration, Instant} + import com.daml.error.ErrorCode.ApiException import com.daml.error.definitions.{IndexErrors, LedgerApiErrors} import com.daml.error.{ContextualizedErrorLogger, ErrorCodesVersionSwitcher} import com.daml.grpc.GrpcStatus import com.daml.ledger.api.domain.LedgerId import com.daml.ledger.grpc.GrpcStatuses +import com.daml.ledger.offset.Offset import com.daml.lf.data.Ref.TransactionId import com.daml.lf.transaction.GlobalKey import com.daml.lf.value.Value @@ -25,8 +29,6 @@ import io.grpc.protobuf.StatusProto import io.grpc.{Metadata, StatusRuntimeException} import scalaz.syntax.tag._ -import java.sql.{SQLNonTransientException, SQLTransientException} -import java.time.{Duration, Instant} import scala.annotation.nowarn class ErrorFactories private (errorCodesVersionSwitcher: ErrorCodesVersionSwitcher) { @@ -490,7 +492,7 @@ class ErrorFactories private (errorCodesVersionSwitcher: ErrorCodesVersionSwitch ), ) - def participantPrunedDataAccessed(message: String)(implicit + def participantPrunedDataAccessed(message: String, earliestOffset: Offset)(implicit contextualizedErrorLogger: ContextualizedErrorLogger ): StatusRuntimeException = errorCodesVersionSwitcher.choose( @@ -502,7 +504,7 @@ class ErrorFactories private (errorCodesVersionSwitcher: ErrorCodesVersionSwitch .build() ), v2 = LedgerApiErrors.RequestValidation.ParticipantPrunedDataAccessed - .Reject(message) + .Reject(message, earliestOffset.toHexString) .asGrpcError, ) diff --git a/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/validation/ErrorFactoriesSpec.scala b/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/validation/ErrorFactoriesSpec.scala index eef8a800cf1a..02f648ecf512 100644 --- a/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/validation/ErrorFactoriesSpec.scala +++ b/ledger/ledger-api-common/src/test/suite/scala/com/digitalasset/platform/server/api/validation/ErrorFactoriesSpec.scala @@ -8,6 +8,7 @@ import java.time.Duration import java.util.regex.Pattern import ch.qos.logback.classic.Level +import com.daml.error.definitions.LedgerApiErrors import com.daml.error.definitions.LedgerApiErrors.RequestValidation.InvalidDeduplicationPeriodField.ValidMaxDeduplicationFieldKey import com.daml.error.utils.ErrorDetails import com.daml.error.{ @@ -17,6 +18,7 @@ import com.daml.error.{ ErrorCodesVersionSwitcher, } import com.daml.ledger.api.domain.LedgerId +import com.daml.ledger.offset.Offset import com.daml.lf.data.Ref import com.daml.logging.{ContextualizedLogger, LoggingContext} import com.daml.platform.server.api.validation.ErrorFactories._ @@ -683,7 +685,12 @@ class ErrorFactoriesSpec "return a participantPrunedDataAccessed error" in { val msg = s"PARTICIPANT_PRUNED_DATA_ACCESSED(9,$truncatedCorrelationId): my message" - assertVersionedError(_.participantPrunedDataAccessed("my message"))( + assertVersionedError( + _.participantPrunedDataAccessed( + "my message", + Offset.fromHexString(Ref.HexString.assertFromString("00")), + ) + )( v1_code = Code.NOT_FOUND, v1_message = "my message", v1_details = Seq.empty, @@ -692,14 +699,18 @@ class ErrorFactoriesSpec v2_details = Seq[ErrorDetails.ErrorDetail]( ErrorDetails.ErrorInfoDetail( "PARTICIPANT_PRUNED_DATA_ACCESSED", - Map("category" -> "9", "definite_answer" -> "false"), + Map( + "category" -> "9", + "definite_answer" -> "false", + LedgerApiErrors.EarliestOffsetMetadataKey -> "00", + ), ), expectedCorrelationIdRequestInfo, ), v2_logEntry = ExpectedLogEntry( Level.INFO, msg, - Some(expectedLocationLogMarkerRegex), + expectedMarkerRegex(s"${LedgerApiErrors.EarliestOffsetMetadataKey}=00"), ), ) } diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/Assertions.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/Assertions.scala index b09d75b2ff48..e81cf37cd13b 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/Assertions.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/Assertions.scala @@ -153,7 +153,10 @@ object Assertions { case Some(value) => value case None => - fail(s"The error metadata did not contain the key $key. Metadata was: [$metadata]") + fail( + s"The error metadata did not contain the key $key. Metadata was: [$metadata]", + exception, + ) } } diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/participant/ParticipantTestContext.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/participant/ParticipantTestContext.scala index 97903d51a614..ab3aef20aff6 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/participant/ParticipantTestContext.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/participant/ParticipantTestContext.scala @@ -8,6 +8,11 @@ import java.time.{Clock, Instant} import com.daml.ledger.api.refinements.ApiTypes.TemplateId import com.daml.ledger.api.testtool.infrastructure.Eventually.eventually import com.daml.ledger.api.testtool.infrastructure.ProtobufConverters._ +import com.daml.ledger.api.testtool.infrastructure.time.{ + DelayMechanism, + StaticTimeDelayMechanism, + TimeDelayMechanism, +} import com.daml.ledger.api.testtool.infrastructure.{ Endpoint, Identification, @@ -150,6 +155,11 @@ private[testtool] final class ParticipantTestContext private[participant] ( val nextKeyId: () => String = nextIdGenerator("key") val nextUserId: () => String = nextIdGenerator("user", lowerCase = true) + lazy val delayMechanism: DelayMechanism = if (features.staticTime) { + new StaticTimeDelayMechanism(this) + } else + new TimeDelayMechanism() + override def toString: String = s"participant $endpointId" /** Gets the absolute offset of the ledger end at a point in time. Use [[end]] if you need diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/time/DelayMechanism.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/time/DelayMechanism.scala index 8ab8d89b2a19..b124c916dfd4 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/time/DelayMechanism.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/infrastructure/time/DelayMechanism.scala @@ -6,20 +6,18 @@ package com.daml.ledger.api.testtool.infrastructure.time import com.daml.ledger.api.testtool.infrastructure.participant.ParticipantTestContext import com.daml.timer.Delayed +import scala.concurrent.duration.Duration import scala.concurrent.{ExecutionContext, Future} -import scala.concurrent.duration.{Duration, FiniteDuration} trait DelayMechanism { - val skews: FiniteDuration def delayBy(duration: Duration): Future[Unit] } -class TimeDelayMechanism(val skews: FiniteDuration)(implicit ec: ExecutionContext) - extends DelayMechanism { +class TimeDelayMechanism()(implicit ec: ExecutionContext) extends DelayMechanism { override def delayBy(duration: Duration): Future[Unit] = Delayed.by(duration)(()) } -class StaticTimeDelayMechanism(ledger: ParticipantTestContext, val skews: FiniteDuration)(implicit +class StaticTimeDelayMechanism(ledger: ParticipantTestContext)(implicit ec: ExecutionContext ) extends DelayMechanism { override def delayBy(duration: Duration): Future[Unit] = diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationIT.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationIT.scala index aae18c346df8..cbf088674e72 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationIT.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationIT.scala @@ -20,14 +20,9 @@ import com.daml.ledger.api.testtool.infrastructure.assertions.CommandDeduplicati } import com.daml.ledger.api.testtool.infrastructure.participant.{ CompletionResponse, - Features, ParticipantTestContext, } -import com.daml.ledger.api.testtool.infrastructure.time.{ - DelayMechanism, - StaticTimeDelayMechanism, - TimeDelayMechanism, -} +import com.daml.ledger.api.testtool.infrastructure.time.DelayMechanism import com.daml.ledger.api.v1.admin.config_management_service.TimeModel import com.daml.ledger.api.v1.command_service.SubmitAndWaitRequest import com.daml.ledger.api.v1.command_submission_service.SubmitRequest @@ -224,14 +219,19 @@ final class CommandDeduplicationIT( currentElement.flatMap(value => generateVariations(tail).map(value :: _)) } - runWithTimeModel(configuredParticipants) { delay => + runWithTimeModel(configuredParticipants) { minMaxSkewSum => val numberOfCalls = 4 // cover all the different generated variations of submit and submitAndWait val allGeneratedVariations = generateVariations(List.fill(numberOfCalls)(List(true, false))).zip(parties) forAllParallel(allGeneratedVariations) { case (firstCall :: secondCall :: thirdCall :: fourthCall :: Nil, party) => - mixedClientsCommandDeduplicationTestCase(ledger, party, delay)( + mixedClientsCommandDeduplicationTestCase( + ledger, + party, + ledger.delayMechanism, + minMaxSkewSum, + )( firstCall, secondCall, thirdCall, @@ -248,6 +248,7 @@ final class CommandDeduplicationIT( ledger: ParticipantTestContext, party: Party, delay: DelayMechanism, + skews: FiniteDuration, )(firstCall: Boolean, secondCall: Boolean, thirdCall: Boolean, fourthCall: Boolean)(implicit ec: ExecutionContext ) = { @@ -313,10 +314,10 @@ final class CommandDeduplicationIT( deduplicationDurationFromPeriod = extractDurationFromDeduplicationPeriod( deduplicationCompletionResponse = duplicateResponse, defaultDuration = deduplicationDuration, - delayMechanism = delay, + skews = skews, ) eventuallyAccepted <- succeedsEventually( - maxRetryDuration = deduplicationDurationFromPeriod + delay.skews + 10.seconds, + maxRetryDuration = deduplicationDurationFromPeriod + skews + 10.seconds, description = s"Deduplication period expires and request is accepted for command ${submitRequest.getCommands}.", delayMechanism = delay, @@ -468,7 +469,7 @@ final class CommandDeduplicationIT( DeduplicationPeriod.DeduplicationDuration(deduplicationDuration.asProtobuf) ) val firstAcceptedSubmissionId = newSubmissionId() - runWithTimeModel(configuredParticipants) { delay => + runWithTimeModel(configuredParticipants) { minMaxSkewSum => for { completionResponse <- submitRequestAndAssertCompletionAccepted( ledger, @@ -485,13 +486,13 @@ final class CommandDeduplicationIT( deduplicationDurationFromPeriod = extractDurationFromDeduplicationPeriod( deduplicationCompletionResponse = optDeduplicationCompletionResponse, defaultDuration = deduplicationDuration, - delayMechanism = delay, + skews = minMaxSkewSum, ) eventuallyAcceptedCompletionResponse <- succeedsEventually( - maxRetryDuration = deduplicationDurationFromPeriod + delay.skews + 10.seconds, + maxRetryDuration = deduplicationDurationFromPeriod + minMaxSkewSum + 10.seconds, description = s"The deduplication period expires and the request is accepted for the commands ${request.getCommands}.", - delayMechanism = delay, + delayMechanism = ledger.delayMechanism, ) { submitRequestAndAssertCompletionAccepted( ledger, @@ -543,7 +544,7 @@ final class CommandDeduplicationIT( val request = ledger .submitRequest(party, DummyWithAnnotation(party, "Duplicate command").create.command) val acceptedSubmissionId = newSubmissionId() - runWithTimeModel(configuredParticipants) { delay => + runWithTimeModel(configuredParticipants) { _ => val dummyRequest = ledger.submitRequest( party, DummyWithAnnotation(party, "Dummy command to generate a completion offset").create.command, @@ -565,7 +566,7 @@ final class CommandDeduplicationIT( ) // Wait for any ledgers that might adjust based on time skews // This is done so that we can validate that the third command is accepted - _ <- delayForOffsetIfRequired(ledger, delay, ledger.features) + _ <- delayForOffsetIfRequired(ledger) // Submit command again using the first offset as the deduplication offset response2 <- submitRequestAndAssertAsyncDeduplication( ledger, @@ -614,11 +615,9 @@ final class CommandDeduplicationIT( ) private def delayForOffsetIfRequired( - participantTestContext: ParticipantTestContext, - delayMechanism: DelayMechanism, - features: Features, + ledger: ParticipantTestContext )(implicit ec: ExecutionContext): Future[Unit] = - features.commandDeduplicationFeatures.getDeduplicationPeriodSupport.offsetSupport match { + ledger.features.commandDeduplicationFeatures.getDeduplicationPeriodSupport.offsetSupport match { case OffsetSupport.OFFSET_NATIVE_SUPPORT => Future.unit case OffsetSupport.OFFSET_CONVERT_TO_DURATION => @@ -627,10 +626,10 @@ final class CommandDeduplicationIT( // // the duration is extended with up to minSkew + maxSkew when using pre-execution, // as we use maxRecordTime and minRecordTime to calculate the interval between the two commands - participantTestContext + ledger .getTimeModel() .flatMap(response => { - delayMechanism.delayBy( + ledger.delayMechanism.delayBy( response.getTimeModel.getMaxSkew.asScala + 2 * response.getTimeModel.getMinSkew.asScala ) @@ -961,7 +960,7 @@ final class CommandDeduplicationIT( private def newSubmissionId(): SubmissionId = SubmissionIdGenerator.Random.generate() private def runWithTimeModel(participants: Seq[ParticipantTestContext])( - testWithDelayMechanism: DelayMechanism => Future[Unit] + testWithDelayMechanism: FiniteDuration => Future[Unit] )(implicit ec: ExecutionContext): Future[Unit] = { // deduplication duration is adjusted by min skew and max skew when running using pre-execution // to account for this we adjust the time model @@ -970,19 +969,9 @@ final class CommandDeduplicationIT( participants, _.update(_.minSkew := skew, _.maxSkew := skew), ) { timeModel => - val anyParticipant = participants.head - val skews = asFiniteDuration(timeModel.getMinSkew.asScala + timeModel.getMaxSkew.asScala) - testWithDelayMechanism(delayMechanism(anyParticipant, skews)) - } - } - - private def delayMechanism(ledger: ParticipantTestContext, skews: FiniteDuration)(implicit - ec: ExecutionContext - ) = { - if (staticTime) { - new StaticTimeDelayMechanism(ledger, skews) - } else { - new TimeDelayMechanism(skews) + val minMaxSkewSum = + asFiniteDuration(timeModel.getMinSkew.asScala + timeModel.getMaxSkew.asScala) + testWithDelayMechanism(minMaxSkewSum) } } @@ -1062,7 +1051,7 @@ final class CommandDeduplicationIT( private def extractDurationFromDeduplicationPeriod( deduplicationCompletionResponse: Option[CompletionResponse], defaultDuration: FiniteDuration, - delayMechanism: DelayMechanism, + skews: FiniteDuration, ): FiniteDuration = deduplicationCompletionResponse .map(_.completion.deduplicationPeriod) @@ -1074,7 +1063,7 @@ final class CommandDeduplicationIT( case CompletionDeduplicationPeriod.DeduplicationDuration(value) => value.asScala } - .getOrElse(defaultDuration + delayMechanism.skews) + .getOrElse(defaultDuration + skews) .asInstanceOf[FiniteDuration] } diff --git a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationPeriodValidationIT.scala b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationPeriodValidationIT.scala index a5042a22bd0f..07bd8a195c98 100644 --- a/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationPeriodValidationIT.scala +++ b/ledger/ledger-api-test-tool/src/main/scala/com/daml/ledger/api/testtool/suites/CommandDeduplicationPeriodValidationIT.scala @@ -14,6 +14,7 @@ import com.daml.ledger.api.testtool.infrastructure.Assertions._ import com.daml.ledger.api.testtool.infrastructure.LedgerTestSuite import com.daml.ledger.api.testtool.infrastructure.participant.ParticipantTestContext import com.daml.ledger.api.v1.commands.Commands.DeduplicationPeriod +import com.daml.ledger.api.v1.ledger_offset.LedgerOffset import com.daml.ledger.client.binding.Primitive import com.daml.ledger.test.model.Test.Dummy import com.daml.lf.data.Ref @@ -47,7 +48,7 @@ class CommandDeduplicationPeriodValidationIT extends LedgerTestSuite { val deduplicationPeriod = DeduplicationPeriod.DeduplicationDuration( DurationConversion.toProto(Duration.ofSeconds(-1)) ) - assertFailedRequest( + assertSyncFailedRequest( ledger, party, deduplicationPeriod, @@ -67,7 +68,7 @@ class CommandDeduplicationPeriodValidationIT extends LedgerTestSuite { val deduplicationPeriod = DeduplicationPeriod.DeduplicationOffset( "invalid_offset" ) - assertFailedRequest( + assertSyncFailedRequest( ledger, party, deduplicationPeriod, @@ -139,7 +140,7 @@ class CommandDeduplicationPeriodValidationIT extends LedgerTestSuite { deduplicationPeriod = DeduplicationPeriod.DeduplicationOffset( Ref.HexString.assertFromString(end.getAbsolute.toLowerCase) ) - _ <- assertFailedRequest( + _ <- assertSyncFailedRequest( ledger, party, deduplicationPeriod, @@ -161,30 +162,85 @@ class CommandDeduplicationPeriodValidationIT extends LedgerTestSuite { disabledReason = "The ledger does not support deduplication periods represented by offsets", runConcurrently = false, // Pruning is involved )(implicit ec => { case Participants(Participant(ledger, party)) => + def submitAndWaitWithDeduplication( + deduplicationPeriod: DeduplicationPeriod.DeduplicationOffset + ) = { + ledger + .submitAndWait( + ledger + .submitAndWaitRequest(party, Dummy(party).create.command) + .update( + _.commands.deduplicationPeriod := deduplicationPeriod + ) + ) + } + val isOffsetNativelySupported = + ledger.features.commandDeduplicationFeatures.getDeduplicationPeriodSupport.offsetSupport.isOffsetNativeSupport for { + start <- ledger.currentEnd() firstCreate <- ledger.create(party, Dummy(party)) - firstExercise <- ledger.exercise(party, firstCreate.exerciseDummyChoice1) - end <- ledger.currentEnd() + _ <- ledger.exercise(party, firstCreate.exerciseDummyChoice1) secondCreate <- ledger.create(party, Dummy(party)) + _ <- ledger.submitAndWait(ledger.submitAndWaitRequest(party, Dummy(party).create.command)) + end <- ledger.currentEnd() _ <- ledger.exercise(party, secondCreate.exerciseDummyChoice1) _ <- ledger.create(party, Dummy(party)) // move ledger end + _ <- ledger.submitAndWait(ledger.submitAndWaitRequest(party, Dummy(party).create.command)) _ <- ledger.prune(pruneUpTo = end) - deduplicationPeriod = DeduplicationPeriod.DeduplicationOffset( - Ref.HexString.assertFromString(firstExercise.offset) - ) - _ <- assertFailedRequest( + failure <- submitAndWaitWithDeduplication( + DeduplicationPeriod.DeduplicationOffset( + start.getAbsolute + ) + ).mustFail("using an offset which was pruned") + _ = assertGrpcErrorRegex( ledger, - party, - deduplicationPeriod, - failReason = "Submitting a command with a pruned offset", - expectedMessage = ".*", - expectedCode = Status.Code.INVALID_ARGUMENT, - expectedError = LedgerApiErrors.RequestValidation.ParticipantPrunedDataAccessed, + failure, + expectedCode = Status.Code.FAILED_PRECONDITION, + // Canton returns INVALID_DEDUPLICATION_PERIOD with earliest_offset metadata + // KV returns PARTICIPANT_PRUNED_DATA_ACCESSED with earliest_offset metadata + selfServiceErrorCode = + if (isOffsetNativelySupported) + LedgerApiErrors.RequestValidation.InvalidDeduplicationPeriodField + else LedgerApiErrors.RequestValidation.ParticipantPrunedDataAccessed, + None, + ) + earliestOffset = extractErrorInfoMetadataValue( + failure, + LedgerApiErrors.EarliestOffsetMetadataKey, ) + _ <- + // Because KV treats deduplication offsets as inclusive, and because the participant pruning offset is inclusive + // we cannot simply use the received offset as a deduplication period, but we have to find the first completion after the given offset + if (isOffsetNativelySupported) { + submitAndWaitWithDeduplication( + DeduplicationPeriod.DeduplicationOffset(earliestOffset) + ) + } else { + findFirstOffsetAfterGivenOffset(ledger, earliestOffset)(party).flatMap(offset => + submitAndWaitWithDeduplication(DeduplicationPeriod.DeduplicationOffset(offset)) + ) + } } yield {} }) - private def assertFailedRequest( + private def findFirstOffsetAfterGivenOffset(ledger: ParticipantTestContext, offset: String)( + party: Primitive.Party + )(implicit ec: ExecutionContext) = { + ledger + .findCompletion( + ledger.completionStreamRequest( + LedgerOffset.of(LedgerOffset.Value.Absolute(offset)) + )(party) + )(_ => true) + .map { completionOpt => + { + val completion = assertDefined(completionOpt, "No completion found") + completion.offset.getAbsolute + } + } + } + + private def assertSyncFailedRequest( ledger: ParticipantTestContext, party: Primitive.Party, deduplicationPeriod: DeduplicationPeriod, diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/QueryNonPruned.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/QueryNonPruned.scala index 0b7a9c9fa412..d719cc7dbf97 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/QueryNonPruned.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/QueryNonPruned.scala @@ -59,8 +59,9 @@ case class QueryNonPrunedImpl( result case Some(pruningOffsetUpToInclusive) => - throw errorFactories.participantPrunedDataAccessed(message = - error(pruningOffsetUpToInclusive) + throw errorFactories.participantPrunedDataAccessed( + message = error(pruningOffsetUpToInclusive), + pruningOffsetUpToInclusive, )( new DamlContextualizedErrorLogger(logger, loggingContext, None) )