diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/ApiSubmissionServiceSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/ApiSubmissionServiceSpec.scala index 7a79bf48d859..98597f2603bb 100644 --- a/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/ApiSubmissionServiceSpec.scala +++ b/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/ApiSubmissionServiceSpec.scala @@ -4,15 +4,15 @@ package com.daml.platform.apiserver.services import java.time.{Duration, Instant} -import java.util.concurrent.CompletableFuture +import java.util.concurrent.CompletableFuture.completedFuture import java.util.concurrent.atomic.AtomicInteger import akka.stream.Materializer -import com.codahale.metrics.{Meter, MetricRegistry} +import akka.stream.scaladsl.Source +import com.codahale.metrics.MetricRegistry import com.daml.api.util.TimeProvider import com.daml.ledger.api.DomainMocks -import com.daml.ledger.api.domain.LedgerOffset.Absolute -import com.daml.ledger.api.domain.{CommandId, Commands, LedgerId, PartyDetails} +import com.daml.ledger.api.domain.{CommandId, Commands, LedgerId, LedgerOffset, PartyDetails} import com.daml.ledger.api.messages.command.submission.SubmitRequest import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll import com.daml.ledger.configuration.{Configuration, LedgerTimeModel} @@ -23,7 +23,7 @@ import com.daml.ledger.participant.state.index.v2.{ IndexSubmissionService, } import com.daml.ledger.participant.state.v1.{SubmissionResult, WriteService} -import com.daml.ledger.resources.TestResourceContext +import com.daml.ledger.resources.{ResourceOwner, TestResourceContext} import com.daml.lf import com.daml.lf.command.{Commands => LfCommands} import com.daml.lf.crypto.Hash @@ -43,11 +43,10 @@ import com.daml.platform.configuration.LedgerConfiguration import com.daml.platform.store.ErrorCause import com.daml.telemetry.{NoOpTelemetryContext, TelemetryContext} import io.grpc.Status -import org.mockito.captor.ArgCaptor -import org.mockito.{ArgumentMatchersSugar, Mockito, MockitoSugar} +import org.mockito.{ArgumentMatchersSugar, MockitoSugar} +import org.scalatest.Inside import org.scalatest.flatspec.AsyncFlatSpec import org.scalatest.matchers.should.Matchers -import org.scalatest.{BeforeAndAfter, OneInstancePerTest, Succeeded} import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} @@ -55,10 +54,9 @@ import scala.util.{Failure, Success} class ApiSubmissionServiceSpec extends AsyncFlatSpec with Matchers + with Inside with MockitoSugar with ArgumentMatchersSugar - with OneInstancePerTest - with BeforeAndAfter with AkkaBeforeAndAfterAll with TestResourceContext { private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting @@ -88,80 +86,115 @@ class ApiSubmissionServiceSpec } private val transaction = builder.buildSubmitted() - private val partyManagementService = mock[IndexPartyManagementService] - private val writeService = mock[WriteService] - private val defaultSubmissionService = - submissionService(writeService, partyManagementService, implicitPartyAllocation = true) + behavior of "allocateMissingInformees" + + it should "allocate missing informees" in { + val partyManagementService = mock[IndexPartyManagementService] + val writeService = mock[WriteService] + + when(partyManagementService.getParties(any[Seq[Ref.Party]])(any[LoggingContext])) + .thenAnswer[Seq[Ref.Party]] { parties => + Future.successful( + parties.view + .filter(knownPartiesSet) + .map(PartyDetails(_, Option.empty, isLocal = true)) + .toList + ) + } - before { when( writeService.allocateParty( any[Option[Ref.Party]], any[Option[Ref.Party]], any[Ref.SubmissionId], )(any[TelemetryContext]) - ).thenReturn(CompletableFuture.completedFuture(SubmissionResult.Acknowledged)) + ).thenReturn(completedFuture(SubmissionResult.Acknowledged)) + + submissionService( + writeService, + partyManagementService, + implicitPartyAllocation = true, + ).use { service => + for { + results <- service.allocateMissingInformees(transaction) + } yield { + results should have size 100 + all(results) should be(SubmissionResult.Acknowledged) + missingParties.foreach { party => + verify(writeService).allocateParty( + eqTo(Some(Ref.Party.assertFromString(party))), + eqTo(Some(party)), + any[Ref.SubmissionId], + )(any[TelemetryContext]) + } + verifyNoMoreInteractions(writeService) + succeed + } + } } - behavior of "allocateMissingInformees" + it should "not allocate if all parties are already known" in { + val partyManagementService = mock[IndexPartyManagementService] + val writeService = mock[WriteService] - it should "allocate missing informees" in { - val argCaptor = ArgCaptor[Seq[Ref.Party]] + when(partyManagementService.getParties(any[Seq[Ref.Party]])(any[LoggingContext])) + .thenAnswer[Seq[Ref.Party]] { parties => + Future.successful(parties.view.map(PartyDetails(_, Option.empty, isLocal = true)).toList) + } - when(partyManagementService.getParties(argCaptor.capture)(any[LoggingContext])).thenAnswer( - Future.successful( - argCaptor.value - .filter(knownPartiesSet) - .map(PartyDetails(_, Option.empty, isLocal = true)) - .toList - ) - ) + when( + writeService.allocateParty( + any[Option[Ref.Party]], + any[Option[Ref.Party]], + any[Ref.SubmissionId], + )(any[TelemetryContext]) + ).thenReturn(completedFuture(SubmissionResult.Acknowledged)) - for { - service <- defaultSubmissionService - results <- service.allocateMissingInformees(transaction) - } yield { - results.find(_ != SubmissionResult.Acknowledged) shouldBe None - results.size shouldBe 100 - Mockito.mockingDetails(partyManagementService).getInvocations.size() shouldBe 1 - Mockito.mockingDetails(writeService).getInvocations.size() shouldBe 100 - missingParties.foreach(party => - verify(writeService).allocateParty( - eqTo(Some(Ref.Party.assertFromString(party))), - eqTo(Some(Ref.Party.assertFromString(party))), + submissionService( + writeService, + partyManagementService, + implicitPartyAllocation = true, + ).use { service => + for { + result <- service.allocateMissingInformees(transaction) + } yield { + result shouldBe Seq.empty[SubmissionResult] + verify(writeService, never).allocateParty( + any[Option[Ref.Party]], + any[Option[String]], any[Ref.SubmissionId], )(any[TelemetryContext]) - ) - verifyNoMoreInteractions(writeService) - succeed - } - } - - it should "not allocate if all parties are already known" in { - val argCaptor = ArgCaptor[Seq[Ref.Party]] - when(partyManagementService.getParties(argCaptor.capture)(any[LoggingContext])).thenAnswer( - Future.successful(argCaptor.value.map(PartyDetails(_, Option.empty, isLocal = true)).toList) - ) - - for { - service <- defaultSubmissionService - result <- service.allocateMissingInformees(transaction) - } yield { - verifyZeroInteractions(writeService) - result shouldBe Seq.empty[SubmissionResult] + succeed + } } } it should "not allocate missing informees if implicit party allocation is disabled" in { - for { - service <- submissionService(null, null, implicitPartyAllocation = false) - result <- service.allocateMissingInformees(transaction) - } yield { - result shouldBe Seq.empty[SubmissionResult] + val writeService = mock[WriteService] + + submissionService( + mock[WriteService], + mock[IndexPartyManagementService], + implicitPartyAllocation = false, + ).use { service => + for { + result <- service.allocateMissingInformees(transaction) + } yield { + result shouldBe Seq.empty[SubmissionResult] + verify(writeService, never).allocateParty( + any[Option[Ref.Party]], + any[Option[String]], + any[Ref.SubmissionId], + )(any[TelemetryContext]) + succeed + } } } it should "forward SubmissionResult if it failed" in { + val partyManagementService = mock[IndexPartyManagementService] + val writeService = mock[WriteService] + val party = "party-1" val typedParty = Ref.Party.assertFromString(party) val submissionFailure = SubmissionResult.InternalError(s"failed to allocate $party") @@ -171,10 +204,9 @@ class ApiSubmissionServiceSpec eqTo(Some(party)), any[Ref.SubmissionId], )(any[TelemetryContext]) - ).thenReturn(CompletableFuture.completedFuture(submissionFailure)) - when(partyManagementService.getParties(eqTo(Seq(typedParty)))(any[LoggingContext])).thenAnswer( - Future(List.empty[PartyDetails]) - ) + ).thenReturn(completedFuture(submissionFailure)) + when(partyManagementService.getParties(Seq(typedParty))) + .thenReturn(Future(List.empty[PartyDetails])) val builder = TransactionBuilder() builder.add( builder.create( @@ -188,17 +220,25 @@ class ApiSubmissionServiceSpec ) val transaction = builder.buildSubmitted() - for { - service <- defaultSubmissionService - result <- service.allocateMissingInformees(transaction) - } yield { - result shouldBe Seq(submissionFailure) + submissionService( + writeService, + partyManagementService, + implicitPartyAllocation = true, + ).use { service => + for { + result <- service.allocateMissingInformees(transaction) + } yield { + result shouldBe Seq(submissionFailure) + } } } behavior of "submit" it should "return proper gRPC status codes for DamlLf errors" in { + val partyManagementService = mock[IndexPartyManagementService] + val writeService = mock[WriteService] + val tmplId = Ref.Identifier.assertFromString("pkgId:M:T") val errorsToStatuses = List( @@ -248,45 +288,46 @@ class ApiSubmissionServiceSpec val commandId = new AtomicInteger() val mockCommandExecutor = mock[CommandExecutor] - Future - .sequence(errorsToStatuses.map { case (error, code) => - val submitRequest = SubmitRequest( - Commands( - ledgerId = LedgerId("ledger-id"), - workflowId = None, - applicationId = DomainMocks.applicationId, - commandId = CommandId( - Ref.LedgerString.assertFromString(s"commandId-${commandId.incrementAndGet()}") - ), - actAs = Set.empty, - readAs = Set.empty, - submittedAt = Instant.MIN, - deduplicateUntil = Instant.MIN, - commands = LfCommands(ImmArray.empty, Timestamp.MinValue, ""), - ) - ) - when( - mockCommandExecutor.execute(eqTo(submitRequest.commands), any[Hash])( - any[ExecutionContext], - any[LoggingContext], + submissionService( + writeService, + partyManagementService, + implicitPartyAllocation = true, + commandExecutor = mockCommandExecutor, + ).use { service => + Future + .sequence(errorsToStatuses.map { case (error, code) => + val submitRequest = SubmitRequest( + Commands( + ledgerId = LedgerId("ledger-id"), + workflowId = None, + applicationId = DomainMocks.applicationId, + commandId = CommandId( + Ref.LedgerString.assertFromString(s"commandId-${commandId.incrementAndGet()}") + ), + actAs = Set.empty, + readAs = Set.empty, + submittedAt = Instant.MIN, + deduplicateUntil = Instant.MIN, + commands = LfCommands(ImmArray.empty, Timestamp.MinValue, ""), + ) ) - ).thenReturn(Future.successful(Left(error))) + when( + mockCommandExecutor.execute(eqTo(submitRequest.commands), any[Hash])( + any[ExecutionContext], + any[LoggingContext], + ) + ).thenReturn(Future.successful(Left(error))) - for { - service <- submissionService( - writeService, - partyManagementService, - implicitPartyAllocation = true, - commandExecutor = mockCommandExecutor, - ) - assertions <- service.submit(submitRequest).transform { - case Success(_) => fail() - case Failure(e) => Success(e.getMessage should startWith(code.getCode.toString)) - } - } yield assertions - }) - .map(_.forall(_ == Succeeded)) - .map(assert(_)) + service.submit(submitRequest).transform(result => Success(code -> result)) + }) + }.map { results => + results.foreach { case (code, result) => + inside(result) { case Failure(exception) => + exception.getMessage should startWith(code.getCode.toString) + } + } + succeed + } } private def submissionService( @@ -294,14 +335,18 @@ class ApiSubmissionServiceSpec partyManagementService: IndexPartyManagementService, implicitPartyAllocation: Boolean, commandExecutor: CommandExecutor = null, - )(implicit materializer: Materializer) = { - val mockMetricRegistry = mock[MetricRegistry] - val mockIndexSubmissionService = mock[IndexSubmissionService] - val mockConfigManagementService = mock[IndexConfigManagementService] + )(implicit materializer: Materializer): ResourceOwner[ApiSubmissionService] = { + val configManagementService = mock[IndexConfigManagementService] + val offset = LedgerOffset.Absolute(Ref.LedgerString.assertFromString("offset")) val configuration = Configuration(0L, LedgerTimeModel.reasonableDefault, Duration.ZERO) - when(mockMetricRegistry.meter(any[String])).thenReturn(new Meter()) + when(configManagementService.lookupConfiguration()) + .thenReturn(Future.successful(Some((offset, configuration)))) + when(configManagementService.configurationEntries(Some(offset))) + .thenReturn(Source.empty) + + val indexSubmissionService = mock[IndexSubmissionService] when( - mockIndexSubmissionService.deduplicateCommand( + indexSubmissionService.deduplicateCommand( any[CommandId], anyList[Ref.Party], any[Instant], @@ -309,46 +354,36 @@ class ApiSubmissionServiceSpec )(any[LoggingContext]) ).thenReturn(Future.successful(CommandDeduplicationNew)) when( - mockIndexSubmissionService.stopDeduplicatingCommand(any[CommandId], anyList[Ref.Party])( - any[LoggingContext] - ) + indexSubmissionService.stopDeduplicatingCommand( + any[CommandId], + anyList[Ref.Party], + )(any[LoggingContext]) ).thenReturn(Future.unit) - when(mockConfigManagementService.lookupConfiguration()) - .thenReturn( - Future.successful( - Some((Absolute(Ref.LedgerString.assertFromString("offset")), configuration)) - ) - ) - val configProviderResource = LedgerConfigProvider + LedgerConfigProvider .owner( - mockConfigManagementService, - optWriteService = Some(writeService), - timeProvider = mock[TimeProvider], + configManagementService, + optWriteService = None, + timeProvider = TimeProvider.Constant(Instant.EPOCH), config = LedgerConfiguration( initialConfiguration = configuration, initialConfigurationSubmitDelay = Duration.ZERO, configurationLoadTimeout = Duration.ZERO, ), ) - .acquire() - - for { - configProvider <- configProviderResource.asFuture - _ <- configProviderResource.release() - } yield { - new ApiSubmissionService( - writeService = writeService, - submissionService = mockIndexSubmissionService, - partyManagementService = partyManagementService, - timeProvider = null, - timeProviderType = null, - ledgerConfigProvider = configProvider, - seedService = SeedService.WeakRandom, - commandExecutor = commandExecutor, - configuration = ApiSubmissionService.Configuration(implicitPartyAllocation), - metrics = new Metrics(mockMetricRegistry), + .map(configProvider => + new ApiSubmissionService( + writeService = writeService, + submissionService = indexSubmissionService, + partyManagementService = partyManagementService, + timeProvider = null, + timeProviderType = null, + ledgerConfigProvider = configProvider, + seedService = SeedService.WeakRandom, + commandExecutor = commandExecutor, + configuration = ApiSubmissionService.Configuration(implicitPartyAllocation), + metrics = new Metrics(new MetricRegistry), + ) ) - } } }