Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

participant-integration-api: Fill out stubs in ApiSubmissionServiceSpec. #10349

Merged
merged 3 commits into from
Jul 21, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@
package com.daml.platform.apiserver.services

import java.time.{Duration, Instant}
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletableFuture.completedFuture
import java.util.concurrent.atomic.AtomicInteger

import akka.stream.Materializer
import com.codahale.metrics.{Meter, MetricRegistry}
import akka.stream.scaladsl.Source
import com.codahale.metrics.MetricRegistry
import com.daml.api.util.TimeProvider
import com.daml.ledger.api.DomainMocks
import com.daml.ledger.api.domain.LedgerOffset.Absolute
import com.daml.ledger.api.domain.{CommandId, Commands, LedgerId, PartyDetails}
import com.daml.ledger.api.domain.{CommandId, Commands, LedgerId, LedgerOffset, PartyDetails}
import com.daml.ledger.api.messages.command.submission.SubmitRequest
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
import com.daml.ledger.configuration.{Configuration, LedgerTimeModel}
Expand Down Expand Up @@ -43,11 +43,10 @@ import com.daml.platform.configuration.LedgerConfiguration
import com.daml.platform.store.ErrorCause
import com.daml.telemetry.{NoOpTelemetryContext, TelemetryContext}
import io.grpc.Status
import org.mockito.captor.ArgCaptor
import org.mockito.{ArgumentMatchersSugar, Mockito, MockitoSugar}
import org.mockito.{ArgumentMatchersSugar, MockitoSugar}
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.{BeforeAndAfter, OneInstancePerTest, Succeeded}
import org.scalatest.{OneInstancePerTest, Succeeded}

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
Expand All @@ -58,7 +57,6 @@ class ApiSubmissionServiceSpec
with MockitoSugar
with ArgumentMatchersSugar
with OneInstancePerTest
with BeforeAndAfter
with AkkaBeforeAndAfterAll
with TestResourceContext {
private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting
Expand Down Expand Up @@ -88,80 +86,117 @@ class ApiSubmissionServiceSpec
}
private val transaction = builder.buildSubmitted()

private val partyManagementService = mock[IndexPartyManagementService]
private val writeService = mock[WriteService]
private val defaultSubmissionService =
submissionService(writeService, partyManagementService, implicitPartyAllocation = true)
behavior of "allocateMissingInformees"

it should "allocate missing informees" in {
val partyManagementService = mock[IndexPartyManagementService]
val writeService = mock[WriteService]

when(partyManagementService.getParties(any[Seq[Ref.Party]])(any[LoggingContext]))
.thenAnswer[Seq[Ref.Party]] { parties =>
Future.successful(
parties.view
.filter(knownPartiesSet)
.map(PartyDetails(_, Option.empty, isLocal = true))
.toList
)
}

before {
when(
writeService.allocateParty(
any[Option[Ref.Party]],
any[Option[Ref.Party]],
any[Ref.SubmissionId],
)(any[TelemetryContext])
).thenReturn(CompletableFuture.completedFuture(SubmissionResult.Acknowledged))
}

behavior of "allocateMissingInformees"

it should "allocate missing informees" in {
val argCaptor = ArgCaptor[Seq[Ref.Party]]

when(partyManagementService.getParties(argCaptor.capture)(any[LoggingContext])).thenAnswer(
Future.successful(
argCaptor.value
.filter(knownPartiesSet)
.map(PartyDetails(_, Option.empty, isLocal = true))
.toList
)
)
).thenReturn(completedFuture(SubmissionResult.Acknowledged))

for {
service <- defaultSubmissionService
service <- submissionService(
writeService,
partyManagementService,
implicitPartyAllocation = true,
)
results <- service.allocateMissingInformees(transaction)
} yield {
results.find(_ != SubmissionResult.Acknowledged) shouldBe None
results.size shouldBe 100
Mockito.mockingDetails(partyManagementService).getInvocations.size() shouldBe 1
Mockito.mockingDetails(writeService).getInvocations.size() shouldBe 100
missingParties.foreach(party =>
results should have size 100
all(results) should be(SubmissionResult.Acknowledged)
verify(writeService).submitConfiguration(
any[Timestamp],
any[Ref.SubmissionId],
any[Configuration],
)(any[TelemetryContext])
missingParties.foreach { party =>
verify(writeService).allocateParty(
eqTo(Some(Ref.Party.assertFromString(party))),
eqTo(Some(Ref.Party.assertFromString(party))),
eqTo(Some(party)),
any[Ref.SubmissionId],
)(any[TelemetryContext])
)
}
verifyNoMoreInteractions(writeService)
succeed
}
}

it should "not allocate if all parties are already known" in {
val argCaptor = ArgCaptor[Seq[Ref.Party]]
when(partyManagementService.getParties(argCaptor.capture)(any[LoggingContext])).thenAnswer(
Future.successful(argCaptor.value.map(PartyDetails(_, Option.empty, isLocal = true)).toList)
)
val partyManagementService = mock[IndexPartyManagementService]
val writeService = mock[WriteService]

when(partyManagementService.getParties(any[Seq[Ref.Party]])(any[LoggingContext]))
.thenAnswer[Seq[Ref.Party]] { parties =>
Future.successful(parties.view.map(PartyDetails(_, Option.empty, isLocal = true)).toList)
}

when(
writeService.allocateParty(
any[Option[Ref.Party]],
any[Option[Ref.Party]],
any[Ref.SubmissionId],
)(any[TelemetryContext])
).thenReturn(completedFuture(SubmissionResult.Acknowledged))

for {
service <- defaultSubmissionService
service <- submissionService(
writeService,
partyManagementService,
implicitPartyAllocation = true,
)
result <- service.allocateMissingInformees(transaction)
} yield {
verifyZeroInteractions(writeService)
result shouldBe Seq.empty[SubmissionResult]
verify(writeService, never).allocateParty(
any[Option[Ref.Party]],
any[Option[String]],
any[Ref.SubmissionId],
)(any[TelemetryContext])
succeed
}
}

it should "not allocate missing informees if implicit party allocation is disabled" in {
val writeService = mock[WriteService]

for {
service <- submissionService(null, null, implicitPartyAllocation = false)
service <- submissionService(
mock[WriteService],
mock[IndexPartyManagementService],
implicitPartyAllocation = false,
)
result <- service.allocateMissingInformees(transaction)
} yield {
result shouldBe Seq.empty[SubmissionResult]
verify(writeService, never).allocateParty(
any[Option[Ref.Party]],
any[Option[String]],
any[Ref.SubmissionId],
)(any[TelemetryContext])
succeed
}
}

it should "forward SubmissionResult if it failed" in {
val partyManagementService = mock[IndexPartyManagementService]
val writeService = mock[WriteService]

val party = "party-1"
val typedParty = Ref.Party.assertFromString(party)
val submissionFailure = SubmissionResult.InternalError(s"failed to allocate $party")
Expand All @@ -171,10 +206,9 @@ class ApiSubmissionServiceSpec
eqTo(Some(party)),
any[Ref.SubmissionId],
)(any[TelemetryContext])
).thenReturn(CompletableFuture.completedFuture(submissionFailure))
when(partyManagementService.getParties(eqTo(Seq(typedParty)))(any[LoggingContext])).thenAnswer(
Future(List.empty[PartyDetails])
)
).thenReturn(completedFuture(submissionFailure))
when(partyManagementService.getParties(Seq(typedParty)))
.thenReturn(Future(List.empty[PartyDetails]))
val builder = TransactionBuilder()
builder.add(
builder.create(
Expand All @@ -189,7 +223,11 @@ class ApiSubmissionServiceSpec
val transaction = builder.buildSubmitted()

for {
service <- defaultSubmissionService
service <- submissionService(
writeService,
partyManagementService,
implicitPartyAllocation = true,
)
result <- service.allocateMissingInformees(transaction)
} yield {
result shouldBe Seq(submissionFailure)
Expand All @@ -199,6 +237,9 @@ class ApiSubmissionServiceSpec
behavior of "submit"

it should "return proper gRPC status codes for DamlLf errors" in {
val partyManagementService = mock[IndexPartyManagementService]
val writeService = mock[WriteService]

val tmplId = Ref.Identifier.assertFromString("pkgId:M:T")

val errorsToStatuses = List(
Expand Down Expand Up @@ -294,37 +335,43 @@ class ApiSubmissionServiceSpec
partyManagementService: IndexPartyManagementService,
implicitPartyAllocation: Boolean,
commandExecutor: CommandExecutor = null,
)(implicit materializer: Materializer) = {
val mockMetricRegistry = mock[MetricRegistry]
val mockIndexSubmissionService = mock[IndexSubmissionService]
val mockConfigManagementService = mock[IndexConfigManagementService]
)(implicit materializer: Materializer): Future[ApiSubmissionService] = {
val configManagementService = mock[IndexConfigManagementService]
val offset = LedgerOffset.Absolute(Ref.LedgerString.assertFromString("offset"))
val configuration = Configuration(0L, LedgerTimeModel.reasonableDefault, Duration.ZERO)
when(mockMetricRegistry.meter(any[String])).thenReturn(new Meter())
when(configManagementService.lookupConfiguration())
.thenReturn(Future.successful(Some((offset, configuration))))
when(configManagementService.configurationEntries(Some(offset)))
.thenReturn(Source.empty)
when(
mockIndexSubmissionService.deduplicateCommand(
writeService.submitConfiguration(
any[Timestamp],
any[Ref.SubmissionId],
eqTo(configuration),
)(any[TelemetryContext])
).thenReturn(completedFuture(SubmissionResult.Acknowledged))

val indexSubmissionService = mock[IndexSubmissionService]
when(
indexSubmissionService.deduplicateCommand(
any[CommandId],
anyList[Ref.Party],
any[Instant],
any[Instant],
)(any[LoggingContext])
).thenReturn(Future.successful(CommandDeduplicationNew))
when(
mockIndexSubmissionService.stopDeduplicatingCommand(any[CommandId], anyList[Ref.Party])(
any[LoggingContext]
)
indexSubmissionService.stopDeduplicatingCommand(
any[CommandId],
anyList[Ref.Party],
)(any[LoggingContext])
).thenReturn(Future.unit)
when(mockConfigManagementService.lookupConfiguration())
.thenReturn(
Future.successful(
Some((Absolute(Ref.LedgerString.assertFromString("offset")), configuration))
)
)

val configProviderResource = LedgerConfigProvider
.owner(
mockConfigManagementService,
configManagementService,
optWriteService = Some(writeService),
timeProvider = mock[TimeProvider],
timeProvider = TimeProvider.Constant(Instant.EPOCH),
config = LedgerConfiguration(
initialConfiguration = configuration,
initialConfigurationSubmitDelay = Duration.ZERO,
Expand All @@ -336,19 +383,17 @@ class ApiSubmissionServiceSpec
for {
configProvider <- configProviderResource.asFuture
_ <- configProviderResource.release()
} yield {
new ApiSubmissionService(
writeService = writeService,
submissionService = mockIndexSubmissionService,
partyManagementService = partyManagementService,
timeProvider = null,
timeProviderType = null,
ledgerConfigProvider = configProvider,
seedService = SeedService.WeakRandom,
commandExecutor = commandExecutor,
configuration = ApiSubmissionService.Configuration(implicitPartyAllocation),
metrics = new Metrics(mockMetricRegistry),
)
}
} yield new ApiSubmissionService(
writeService = writeService,
submissionService = indexSubmissionService,
partyManagementService = partyManagementService,
timeProvider = null,
timeProviderType = null,
ledgerConfigProvider = configProvider,
seedService = SeedService.WeakRandom,
commandExecutor = commandExecutor,
configuration = ApiSubmissionService.Configuration(implicitPartyAllocation),
metrics = new Metrics(new MetricRegistry),
)
}
}