Skip to content

Commit

Permalink
participant-integration-api: Switch to the v2 participant state API. …
Browse files Browse the repository at this point in the history
…[KVL-1002] (#10398)

* participant-state: Give `ChangeId` its own file.

* participant-state: Introduce `v1.CompletionInfo` for a while.

`CompletionInfo` is identical to `SubmitterInfo`. Its purpose is to make
the migration to v2 easier. It should not make it into the final
version.

* participant-integration-api: Switch to the v2 participant state API.

This means that:

  - the API server and indexer expect v2 API traits
    - adapters are provided if you need to elevate your v1 API usage
  - the indexer internally uses v2 Updates
  - rejections are switched over to use the v2 format
  - Sandbox Classic uses v2 as the internal representation too (because
    it interacts directly with the underlying indexer representation,
    and is therefore tightly coupled)

_kvutils_ and other users of the `StandaloneApiServer` and
`StandaloneIndexerServer` use the adapters.

CHANGELOG_BEGIN
- [Integration Kit] The API server and indexer have switched over to v2
  of the participant-state API. You can continue to use the v1 API, but
  you will need to wrap your ``ReadService`` and ``WriteService``
  objects in the ``AdaptedV1ReadService`` and ``AdaptedV1WriteSerivce``
  classes.
CHANGELOG_END

* participant-state: Remove v1.CompletionInfo.

It's served its purpose.

* kvutils: Remove an unnecessary line from `Runner`.

* ledger-api-common: Delete a TODO; we'll track it elsewhere.

* participant-integration-api: Use full words in `JdbcLedgerDao`.

Just a little bit of cleanup.

* ledger-api-common: Extract out the random submission ID generator.

And introduce a trait, because, well, this is still the JVM.
  • Loading branch information
SamirTalwar authored Jul 28, 2021
1 parent 8cf3076 commit d666f76
Show file tree
Hide file tree
Showing 94 changed files with 1,013 additions and 694 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.configuration.{Configuration, LedgerInitialConditions, LedgerTimeModel}
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.v1.{ReadService, Update}
import com.daml.ledger.participant.state.v2.AdaptedV1ReadService
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.lf.data.Time
import com.daml.logging.LoggingContext.newLoggingContext
Expand Down Expand Up @@ -67,7 +68,7 @@ class IndexerBenchmark() {
val updates = Await.result(createUpdates(config), Duration(10, "minute"))

println("Creating read service and indexer...")
val readService = createReadService(updates)
val readService = new AdaptedV1ReadService(createReadService(updates))
val indexerFactory = new JdbcIndexer.Factory(
ServerRole.Indexer,
config.indexerConfig,
Expand All @@ -87,7 +88,7 @@ class IndexerBenchmark() {

_ = println("Setting up the index database...")
indexer <- Await
.result(indexerFactory.migrateSchema(false), Duration(5, "minute"))
.result(indexerFactory.migrateSchema(allowExistingSchema = false), Duration(5, "minute"))
.acquire()

_ = println("Starting the indexing...")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.ledger.api

import java.util.UUID

import com.daml.lf.data.Ref
import com.daml.lf.data.Ref.SubmissionId

trait SubmissionIdGenerator {
def generate(): Ref.SubmissionId
}

object SubmissionIdGenerator {
object Random extends SubmissionIdGenerator {
override def generate(): SubmissionId =
Ref.SubmissionId.assertFromString(UUID.randomUUID().toString)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ package com.daml.ledger.api.validation
import java.time.{Duration, Instant}

import com.daml.api.util.{DurationConversion, TimestampConversion}
import com.daml.lf.command._
import com.daml.lf.data._
import com.daml.ledger.api.domain
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.v1.commands.Command.Command.{
Create => ProtoCreate,
CreateAndExercise => ProtoCreateAndExercise,
Expand All @@ -17,17 +15,19 @@ import com.daml.ledger.api.v1.commands.Command.Command.{
ExerciseByKey => ProtoExerciseByKey,
}
import com.daml.ledger.api.v1.commands.{Command => ProtoCommand, Commands => ProtoCommands}
import com.daml.ledger.api.{SubmissionIdGenerator, domain}
import com.daml.lf.command._
import com.daml.lf.data._
import com.daml.lf.value.{Value => Lf}
import com.daml.ledger.api.domain.LedgerId
import com.daml.platform.server.api.validation.ErrorFactories._
import com.daml.platform.server.api.validation.FieldValidations.{requirePresence, _}
import io.grpc.StatusRuntimeException
import scalaz.syntax.tag._

import scala.collection.immutable
import scala.Ordering.Implicits.infixOrderingOps
import scala.collection.immutable

final class CommandsValidator(ledgerId: LedgerId) {
final class CommandsValidator(ledgerId: LedgerId, submissionIdGenerator: SubmissionIdGenerator) {

import ValueValidator._

Expand All @@ -46,6 +46,7 @@ final class CommandsValidator(ledgerId: LedgerId) {
appId <- requireLedgerString(commands.applicationId, "application_id")
.map(domain.ApplicationId(_))
commandId <- requireLedgerString(commands.commandId, "command_id").map(domain.CommandId(_))
submissionId = domain.SubmissionId(submissionIdGenerator.generate())
submitters <- CommandsValidator.validateSubmitters(commands)
commandz <- requireNonEmpty(commands.commands, "commands")
validatedCommands <- validateInnerCommands(commandz)
Expand All @@ -68,6 +69,7 @@ final class CommandsValidator(ledgerId: LedgerId) {
workflowId = workflowId,
applicationId = appId,
commandId = commandId,
submissionId = submissionId,
actAs = submitters.actAs,
readAs = submitters.readAs,
submittedAt = currentUtcTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package com.daml.platform.server.api.services.grpc

import java.time.{Duration, Instant}

import com.daml.ledger.api.SubmissionIdGenerator
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.v1.command_service.CommandServiceGrpc.CommandService
import com.daml.ledger.api.v1.command_service._
Expand All @@ -23,6 +24,7 @@ class GrpcCommandService(
currentLedgerTime: () => Instant,
currentUtcTime: () => Instant,
maxDeduplicationTime: () => Option[Duration],
generateSubmissionId: SubmissionIdGenerator,
)(implicit executionContext: ExecutionContext)
extends CommandService
with GrpcApiService
Expand All @@ -31,7 +33,7 @@ class GrpcCommandService(
protected implicit val logger: Logger = LoggerFactory.getLogger(service.getClass)

private[this] val validator =
new SubmitAndWaitRequestValidator(new CommandsValidator(ledgerId))
new SubmitAndWaitRequestValidator(new CommandsValidator(ledgerId, generateSubmissionId))

override def submitAndWait(request: SubmitAndWaitRequest): Future[Empty] =
validator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package com.daml.platform.server.api.services.grpc

import java.time.{Duration, Instant}

import com.daml.ledger.api.SubmissionIdGenerator
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.v1.command_submission_service.CommandSubmissionServiceGrpc.{
CommandSubmissionService => ApiCommandSubmissionService
Expand All @@ -16,8 +17,8 @@ import com.daml.ledger.api.v1.command_submission_service.{
import com.daml.ledger.api.validation.{CommandsValidator, SubmitRequestValidator}
import com.daml.metrics.{Metrics, Timed}
import com.daml.platform.api.grpc.GrpcApiService
import com.daml.platform.server.api.{ProxyCloseable, ValidationLogger}
import com.daml.platform.server.api.services.domain.CommandSubmissionService
import com.daml.platform.server.api.{ProxyCloseable, ValidationLogger}
import com.daml.telemetry.{DefaultTelemetry, SpanAttribute, TelemetryContext}
import com.google.protobuf.empty.Empty
import io.grpc.ServerServiceDefinition
Expand All @@ -31,6 +32,7 @@ class GrpcCommandSubmissionService(
currentLedgerTime: () => Instant,
currentUtcTime: () => Instant,
maxDeduplicationTime: () => Option[Duration],
submissionIdGenerator: SubmissionIdGenerator,
metrics: Metrics,
)(implicit executionContext: ExecutionContext)
extends ApiCommandSubmissionService
Expand All @@ -39,7 +41,9 @@ class GrpcCommandSubmissionService(

protected implicit val logger: Logger = LoggerFactory.getLogger(service.getClass)

private val validator = new SubmitRequestValidator(new CommandsValidator(ledgerId))
private val validator = new SubmitRequestValidator(
new CommandsValidator(ledgerId, submissionIdGenerator)
)

override def submit(request: ApiSubmitRequest): Future[Empty] = {
implicit val telemetryContext: TelemetryContext =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,15 @@ trait ErrorFactories {
def invalidArgument(errorMsg: String): StatusRuntimeException =
grpcError(Status.INVALID_ARGUMENT.withDescription(s"Invalid argument: $errorMsg"))

def invalidField(fieldName: String, message: String) =
def invalidField(fieldName: String, message: String): StatusRuntimeException =
grpcError(Status.INVALID_ARGUMENT.withDescription(s"Invalid field $fieldName: $message"))

def outOfRange(description: String): StatusRuntimeException =
grpcError(Status.OUT_OF_RANGE.withDescription(description))

def notFound(target: String): StatusRuntimeException =
grpcError(Status.NOT_FOUND.withDescription(s"$target not found."))

def internal(description: String): StatusRuntimeException =
grpcError(Status.INTERNAL.withDescription(description))

def aborted(description: String): StatusRuntimeException =
grpcError(Status.ABORTED.withDescription(description))

def unimplemented(description: String): StatusRuntimeException =
grpcError(Status.UNIMPLEMENTED.withDescription(description))

// permission denied is intentionally without description to ensure we don't leak security relevant information by accident
def permissionDenied(): StatusRuntimeException =
grpcError(Status.PERMISSION_DENIED)
Expand All @@ -52,13 +43,11 @@ trait ErrorFactories {
def missingLedgerConfig(): StatusRuntimeException =
grpcError(Status.UNAVAILABLE.withDescription("The ledger configuration is not available."))

def resourceExhausted(description: String): StatusRuntimeException =
grpcError(Status.RESOURCE_EXHAUSTED.withDescription(description))

def participantPrunedDataAccessed(message: String): StatusRuntimeException =
grpcError(Status.NOT_FOUND.withDescription(message))

def grpcError(status: Status) = new ApiException(status)
def grpcError(status: Status): StatusRuntimeException =
new ApiException(status)

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ object DomainMocks {

val commandId = CommandId(Ref.LedgerString.assertFromString("commandId"))

val submissionId = SubmissionId(Ref.LedgerString.assertFromString("submissionId"))

val transactionId = TransactionId(Ref.LedgerString.assertFromString("deadbeef"))

val applicationId = ApplicationId(Ref.LedgerString.assertFromString("applicationId"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
package com.daml.ledger.api.validation

import java.time.Instant
import java.util.UUID

import com.daml.api.util.{DurationConversion, TimestampConversion}
import com.daml.ledger.api.DomainMocks
import com.daml.ledger.api.DomainMocks.{applicationId, commandId, workflowId}
import com.daml.ledger.api.{DomainMocks, SubmissionIdGenerator}
import com.daml.ledger.api.DomainMocks.{applicationId, commandId, submissionId, workflowId}
import com.daml.ledger.api.domain.{LedgerId, Commands => ApiCommands}
import com.daml.ledger.api.v1.commands.{Command, Commands, CreateCommand}
import com.daml.ledger.api.v1.value.Value.Sum
Expand Down Expand Up @@ -35,9 +36,6 @@ class SubmitRequestValidatorTest
val int64 = Sum.Int64(1)
val label = "label"
val constructor = "constructor"
val workflowId = "workflowId"
val applicationId = "applicationId"
val commandId = "commandId"
val submitter = "party"
val deduplicationTime = new Duration().withSeconds(10)
val command = Command.of(
Expand All @@ -56,9 +54,9 @@ class SubmitRequestValidatorTest

val commands = Commands(
ledgerId = ledgerId.unwrap,
workflowId = workflowId,
applicationId = applicationId,
commandId = commandId,
workflowId = workflowId.unwrap,
applicationId = applicationId.unwrap,
commandId = commandId.unwrap,
party = submitter,
commands = Seq(command),
deduplicationTime = Some(deduplicationTime),
Expand All @@ -82,6 +80,7 @@ class SubmitRequestValidatorTest
workflowId = Some(workflowId),
applicationId = applicationId,
commandId = commandId,
submissionId = submissionId,
actAs = Set(DomainMocks.party),
readAs = Set.empty,
submittedAt = submittedAt,
Expand Down Expand Up @@ -123,14 +122,17 @@ class SubmitRequestValidatorTest
)
)

val commandsValidator = new CommandsValidator(ledgerId)
import ValueValidator.validateValue

private def unexpectedError = sys.error("unexpected error")

private val generateRandomSubmissionId: SubmissionIdGenerator =
() => Ref.SubmissionId.assertFromString(UUID.randomUUID().toString)

"CommandSubmissionRequestValidator" when {
"validating command submission requests" should {
"reject requests with empty commands" in {
val commandsValidator = new CommandsValidator(ledgerId, generateRandomSubmissionId)
requestMustFailWith(
commandsValidator.validateCommands(
api.commands.withCommands(Seq.empty),
Expand All @@ -144,6 +146,7 @@ class SubmitRequestValidatorTest
}

"not allow missing ledgerId" in {
val commandsValidator = new CommandsValidator(ledgerId, generateRandomSubmissionId)
requestMustFailWith(
commandsValidator
.validateCommands(
Expand All @@ -158,6 +161,8 @@ class SubmitRequestValidatorTest
}

"tolerate a missing workflowId" in {
val generateSubmissionId: SubmissionIdGenerator = () => submissionId.unwrap
val commandsValidator = new CommandsValidator(ledgerId, generateSubmissionId)
commandsValidator.validateCommands(
api.commands.withWorkflowId(""),
internal.ledgerTime,
Expand All @@ -172,6 +177,7 @@ class SubmitRequestValidatorTest
}

"not allow missing applicationId" in {
val commandsValidator = new CommandsValidator(ledgerId, generateRandomSubmissionId)
requestMustFailWith(
commandsValidator.validateCommands(
api.commands.withApplicationId(""),
Expand All @@ -185,6 +191,7 @@ class SubmitRequestValidatorTest
}

"not allow missing commandId" in {
val commandsValidator = new CommandsValidator(ledgerId, generateRandomSubmissionId)
requestMustFailWith(
commandsValidator.validateCommands(
api.commands.withCommandId(""),
Expand All @@ -198,6 +205,7 @@ class SubmitRequestValidatorTest
}

"not allow missing submitter" in {
val commandsValidator = new CommandsValidator(ledgerId, generateRandomSubmissionId)
requestMustFailWith(
commandsValidator
.validateCommands(
Expand All @@ -212,6 +220,7 @@ class SubmitRequestValidatorTest
}

"correctly read and deduplicate multiple submitters" in {
val commandsValidator = new CommandsValidator(ledgerId, generateRandomSubmissionId)
val result = commandsValidator
.validateCommands(
api.commands
Expand All @@ -233,6 +242,8 @@ class SubmitRequestValidatorTest
}

"tolerate a single submitter specified in the actAs fields" in {
val generateSubmissionId: SubmissionIdGenerator = () => submissionId.unwrap
val commandsValidator = new CommandsValidator(ledgerId, generateSubmissionId)
commandsValidator
.validateCommands(
api.commands.withParty("").addActAs(api.submitter),
Expand All @@ -243,6 +254,8 @@ class SubmitRequestValidatorTest
}

"tolerate a single submitter specified in party, actAs, and readAs fields" in {
val generateSubmissionId: SubmissionIdGenerator = () => submissionId.unwrap
val commandsValidator = new CommandsValidator(ledgerId, generateSubmissionId)
commandsValidator
.validateCommands(
api.commands.withParty(api.submitter).addActAs(api.submitter).addReadAs(api.submitter),
Expand All @@ -254,6 +267,8 @@ class SubmitRequestValidatorTest

"advance ledger time if minLedgerTimeAbs is set" in {
val minLedgerTimeAbs = internal.ledgerTime.plus(internal.timeDelta)
val generateSubmissionId: SubmissionIdGenerator = () => submissionId.unwrap
val commandsValidator = new CommandsValidator(ledgerId, generateSubmissionId)
commandsValidator.validateCommands(
api.commands.copy(
minLedgerTimeAbs = Some(TimestampConversion.fromInstant(minLedgerTimeAbs))
Expand All @@ -266,6 +281,8 @@ class SubmitRequestValidatorTest

"advance ledger time if minLedgerTimeRel is set" in {
val minLedgerTimeAbs = internal.ledgerTime.plus(internal.timeDelta)
val generateSubmissionId: SubmissionIdGenerator = () => submissionId.unwrap
val commandsValidator = new CommandsValidator(ledgerId, generateSubmissionId)
commandsValidator.validateCommands(
api.commands.copy(
minLedgerTimeRel = Some(DurationConversion.toProto(internal.timeDelta))
Expand All @@ -277,6 +294,7 @@ class SubmitRequestValidatorTest
}

"not allow negative deduplication time" in {
val commandsValidator = new CommandsValidator(ledgerId, generateRandomSubmissionId)
requestMustFailWith(
commandsValidator.validateCommands(
api.commands.copy(deduplicationTime = Some(Duration.of(-1, 0))),
Expand All @@ -291,6 +309,7 @@ class SubmitRequestValidatorTest

"not allow deduplication time exceeding maximum deduplication time" in {
val manySeconds = 100000L
val commandsValidator = new CommandsValidator(ledgerId, generateRandomSubmissionId)
requestMustFailWith(
commandsValidator.validateCommands(
api.commands.copy(deduplicationTime = Some(Duration.of(manySeconds, 0))),
Expand All @@ -305,6 +324,8 @@ class SubmitRequestValidatorTest
}

"default to maximum deduplication time if deduplication time is missing" in {
val generateSubmissionId: SubmissionIdGenerator = () => submissionId.unwrap
val commandsValidator = new CommandsValidator(ledgerId, generateSubmissionId)
commandsValidator.validateCommands(
api.commands.copy(deduplicationTime = None),
internal.ledgerTime,
Expand All @@ -318,14 +339,14 @@ class SubmitRequestValidatorTest
}

"not allow missing ledger configuration" in {
val commandsValidator = new CommandsValidator(ledgerId, generateRandomSubmissionId)
requestMustFailWith(
commandsValidator
.validateCommands(api.commands, internal.ledgerTime, internal.submittedAt, None),
UNAVAILABLE,
"The ledger configuration is not available.",
)
}

}

"validating contractId values" should {
Expand Down
Loading

0 comments on commit d666f76

Please sign in to comment.