Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move all Api...Service classes to Sandbox package #3995

Merged
merged 1 commit into from
Jan 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions ledger/ledger-api-common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@ da_scala_library(
"//language-support/scala/bindings",
"//ledger-api/rs-grpc-bridge",
"//ledger/ledger-api-akka",
"//ledger/ledger-api-client",
"//ledger/ledger-api-domain",
"//ledger/ledger-api-health",
"//ledger/ledger-api-scala-logging",
"//ledger/ledger-api-scala-logging:ledger-api-scala-logging-base",
"//ledger/participant-state",
"//ledger/participant-state-index",
Expand Down
3 changes: 3 additions & 0 deletions ledger/sandbox/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -314,13 +314,16 @@ da_scala_test_suite(
"@maven//:ch_qos_logback_logback_classic",
"@maven//:com_typesafe_akka_akka_actor_2_12",
"@maven//:com_typesafe_akka_akka_stream_2_12",
"@maven//:com_typesafe_akka_akka_stream_testkit_2_12",
"@maven//:com_typesafe_akka_akka_testkit_2_12",
"@maven//:com_typesafe_config",
"@maven//:commons_io_commons_io",
"@maven//:io_dropwizard_metrics_metrics_core",
"@maven//:io_grpc_grpc_services",
"@maven//:io_netty_netty_handler",
"@maven//:org_awaitility_awaitility",
"@maven//:org_flywaydb_flyway_core",
"@maven//:org_reactivestreams_reactive_streams",
"@maven//:org_scalactic_scalactic_2_12",
"@maven//:org_scalatest_scalatest_2_12",
"@maven//:org_scalaz_scalaz_core_2_12",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ import com.daml.ledger.participant.state.index.v2.{
IdentityProvider,
IndexActiveContractsService,
IndexCompletionsService,
IndexConfigManagementService,
IndexConfigurationService,
IndexPackagesService,
IndexPartyManagementService,
IndexService,
IndexTransactionsService,
IndexConfigManagementService
IndexTransactionsService
}
import com.daml.ledger.participant.state.v1.{Configuration, WriteService}
import com.digitalasset.api.util.TimeProvider
Expand All @@ -27,25 +27,25 @@ import com.digitalasset.ledger.api.health.HealthChecks
import com.digitalasset.ledger.api.v1.command_completion_service.CompletionEndRequest
import com.digitalasset.ledger.client.services.commands.CommandSubmissionFlow
import com.digitalasset.platform.apiserver.services.admin.{
ApiConfigManagementService,
ApiPackageManagementService,
ApiPartyManagementService,
ApiConfigManagementService
ApiPartyManagementService
}
import com.digitalasset.platform.apiserver.services.transaction.ApiTransactionService
import com.digitalasset.platform.apiserver.services.{
ApiActiveContractsService,
ApiCommandCompletionService,
ApiLedgerConfigurationService,
ApiLedgerIdentityService,
ApiPackageService,
ApiSubmissionService
}
import com.digitalasset.platform.common.logging.NamedLoggerFactory
import com.digitalasset.platform.sandbox.config.CommandConfiguration
import com.digitalasset.platform.sandbox.stores.ledger.CommandExecutorImpl
import com.digitalasset.platform.server.api.services.grpc.GrpcHealthService
import com.digitalasset.platform.server.services.command.ApiCommandService
import com.digitalasset.platform.server.services.identity.ApiLedgerIdentityService
import com.digitalasset.platform.server.services.testing.{ApiTimeService, TimeServiceBackend}
import com.digitalasset.platform.apiserver.services.ApiCommandService
import com.digitalasset.platform.apiserver.services.ApiTimeService
import io.grpc.BindableService
import io.grpc.protobuf.services.ProtoReflectionService
import scalaz.syntax.tag._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import com.digitalasset.platform.resources.{Resource, ResourceOwner}
import com.digitalasset.platform.sandbox.BuildInfo
import com.digitalasset.platform.sandbox.config.SandboxConfig
import com.digitalasset.platform.sandbox.stores.InMemoryPackageStore
import com.digitalasset.platform.server.services.testing.TimeServiceBackend

import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.digitalasset.platform.server.services.testing
package com.digitalasset.platform.apiserver

import java.time.Instant
import java.util.concurrent.atomic.AtomicReference
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.digitalasset.platform.server.services.command
package com.digitalasset.platform.apiserver.services

import akka.NotUsed
import akka.actor.Cancellable
Expand All @@ -26,21 +26,21 @@ import com.digitalasset.ledger.client.services.commands.{
CommandCompletionSource,
CommandTrackerFlow
}
import com.digitalasset.platform.common.logging.NamedLoggerFactory
import com.digitalasset.platform.api.grpc.GrpcApiService
import com.digitalasset.platform.apiserver.services.tracking.{TrackerImpl, TrackerMap}
import com.digitalasset.platform.apiserver.services.ApiCommandService.LowLevelCommandServiceAccess
import com.digitalasset.platform.common.logging.NamedLoggerFactory
import com.digitalasset.platform.server.api.ApiException
import com.digitalasset.platform.server.api.services.grpc.GrpcCommandService
import com.digitalasset.platform.server.services.command.ApiCommandService.LowLevelCommandServiceAccess
import com.digitalasset.util.Ctx
import com.digitalasset.util.akkastreams.MaxInFlight
import com.google.protobuf.empty.Empty

import io.grpc._
import scalaz.syntax.tag._

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Try
import scalaz.syntax.tag._

class ApiCommandService private (
lowLevelCommandServiceAccess: LowLevelCommandServiceAccess,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.digitalasset.platform.server.services.identity
package com.digitalasset.platform.apiserver.services

import com.digitalasset.dec.DirectExecutionContext
import com.digitalasset.ledger.api.domain.LedgerId
import com.digitalasset.ledger.api.v1.ledger_identity_service.LedgerIdentityServiceGrpc.{
LedgerIdentityService => GrpcLedgerIdentityService
Expand All @@ -16,8 +17,6 @@ import com.digitalasset.ledger.api.v1.ledger_identity_service.{
import com.digitalasset.platform.api.grpc.GrpcApiService
import com.digitalasset.platform.common.logging.NamedLoggerFactory
import com.digitalasset.platform.server.api.ApiException
import com.digitalasset.dec.DirectExecutionContext

import io.grpc.{BindableService, ServerServiceDefinition, Status}
import org.slf4j.Logger
import scalaz.syntax.tag._
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.digitalasset.platform.server.services.testing
package com.digitalasset.platform.apiserver.services

import java.time.Instant

import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import com.digitalasset.api.util.TimestampConversion._
import com.digitalasset.dec.DirectExecutionContext
import com.digitalasset.grpc.adapter.ExecutionSequencerFactory
import com.digitalasset.ledger.api.domain.LedgerId
import com.digitalasset.ledger.api.v1.testing.time_service.TimeServiceGrpc.TimeService
import com.digitalasset.ledger.api.v1.testing.time_service._
import com.digitalasset.platform.akkastreams.dispatcher.SignalDispatcher
import com.digitalasset.platform.api.grpc.GrpcApiService
import com.digitalasset.platform.apiserver.TimeServiceBackend
import com.digitalasset.platform.common.logging.NamedLoggerFactory
import com.digitalasset.dec.DirectExecutionContext
import com.digitalasset.platform.server.api.validation.FieldValidations
import com.google.protobuf.empty.Empty

import io.grpc.{ServerServiceDefinition, Status, StatusRuntimeException}
import org.slf4j.Logger
import scalaz.syntax.tag._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import com.digitalasset.dec.{DirectExecutionContext => DE}
import scala.concurrent.Future
import scala.concurrent.duration.{DurationInt, FiniteDuration}

object PollingUtils {
// TODO Remove this once PartyManagement is made async
private[admin] object PollingUtils {

/**
* Continuously polls the given service to check if the given item has been persisted.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.digitalasset.platform.server.services.command
package com.digitalasset.platform.apiserver.services.tracking

import akka.stream.QueueOfferResult
import com.digitalasset.platform.server.api.ApiException
Expand All @@ -11,7 +11,7 @@ import io.grpc.{Status => GrpcStatus}
import scala.concurrent.Promise
import scala.util.{Failure, Success, Try}

object HandleOfferResult {
private[tracking] object HandleOfferResult {
val toGrpcStatus: PartialFunction[Try[QueueOfferResult], Option[GrpcStatus]] = {
case Failure(t) =>
t match {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.digitalasset.platform.server.services.command
package com.digitalasset.platform.apiserver.services.tracking

import java.util

private class SizeCappedMap[K, V](initialCapacity: Int, maxCapacity: Int)
private[tracking] class SizeCappedMap[K, V](initialCapacity: Int, maxCapacity: Int)
extends util.LinkedHashMap[K, V](initialCapacity) {

override def removeEldestEntry(eldest: util.Map.Entry[K, V]): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.digitalasset.platform.server.services.command
package com.digitalasset.platform.apiserver.services.tracking

import com.digitalasset.ledger.api.v1.command_service.SubmitAndWaitRequest
import com.digitalasset.ledger.api.v1.completion.Completion

import scala.concurrent.{ExecutionContext, Future}

trait Tracker extends AutoCloseable {
private[tracking] trait Tracker extends AutoCloseable {

def track(request: SubmitAndWaitRequest)(implicit ec: ExecutionContext): Future[Completion]
}

object Tracker {
private[tracking] object Tracker {

class WithLastSubmission(delegate: Tracker) extends Tracker {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.digitalasset.platform.server.services.command
package com.digitalasset.platform.apiserver.services.tracking

import java.util
import java.util.Collections

import akka.NotUsed
import akka.stream.scaladsl.{Flow, Keep, Sink, Source, SourceQueueWithComplete}
import akka.stream.{Materializer, OverflowStrategy}
import com.digitalasset.dec.DirectExecutionContext
import com.digitalasset.ledger.api.v1.command_service.SubmitAndWaitRequest
import com.digitalasset.ledger.api.v1.command_submission_service.SubmitRequest
import com.digitalasset.ledger.api.v1.completion.Completion
import com.digitalasset.ledger.client.services.commands.CommandTrackerFlow.Materialized
import com.digitalasset.dec.DirectExecutionContext
import com.digitalasset.platform.server.api.ApiException
import com.digitalasset.util.Ctx
import com.google.rpc.code.Code
Expand All @@ -28,14 +28,14 @@ import scala.util.{Failure, Success}
/**
* Tracks SubmitAndWaitRequests.
* @param queue The input queue to the tracking flow.
* @param historySize The number of command IDs to remember for deduplicating tracked futures and results.
* @param historySize The number of command IDs to remember for de-duplicating tracked futures and results.
*/
class TrackerImpl(queue: SourceQueueWithComplete[TrackerImpl.QueueInput], historySize: Int)
final class TrackerImpl(queue: SourceQueueWithComplete[TrackerImpl.QueueInput], historySize: Int)
extends Tracker {

private val logger = LoggerFactory.getLogger(this.getClass)

require(historySize > 0, " History size must be a positive integer.")
require(historySize > 0, "History size must be a positive integer.")

private val knownResults: util.Map[(String, String), Future[Completion]] =
Collections.synchronizedMap(new SizeCappedMap(historySize / 2, historySize))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
// Copyright (c) 2020 The DAML Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.digitalasset.platform.server.services.command
package com.digitalasset.platform.apiserver.services.tracking

import java.util.concurrent.atomic.AtomicReference

import com.digitalasset.ledger.api.v1.command_service.SubmitAndWaitRequest
import com.digitalasset.ledger.api.v1.completion.Completion
import com.digitalasset.platform.common.logging.NamedLoggerFactory
import com.digitalasset.dec.DirectExecutionContext
import com.digitalasset.platform.server.services.command.TrackerMap.{AsyncResource, Key}
import com.github.ghik.silencer.silent

import scala.collection.immutable.HashMap
import scala.concurrent.duration.{FiniteDuration, _}
Expand All @@ -21,15 +19,15 @@ import scala.util.{Failure, Success}
* A map for [[Tracker]]s with thread-safe tracking methods and automatic cleanup. A tracker tracker, if you will.
* @param retentionPeriod The minimum finite duration for which to retain idle trackers.
*/
class TrackerMap(retentionPeriod: FiniteDuration, loggerFactory: NamedLoggerFactory)
final class TrackerMap(retentionPeriod: FiniteDuration, loggerFactory: NamedLoggerFactory)
extends AutoCloseable {

private val logger = loggerFactory.getLogger(this.getClass)

private val lock = new Object()

@volatile private var trackerBySubmitter =
HashMap.empty[Key, AsyncResource[Tracker.WithLastSubmission]]
HashMap.empty[TrackerMap.Key, TrackerMap.AsyncResource[Tracker.WithLastSubmission]]

val cleanup: Runnable = {
require(
Expand All @@ -56,16 +54,16 @@ class TrackerMap(retentionPeriod: FiniteDuration, loggerFactory: NamedLoggerFact
}
}

def track(submitter: Key, request: SubmitAndWaitRequest)(newTracker: => Future[Tracker])(
implicit ec: ExecutionContext): Future[Completion] =
def track(submitter: TrackerMap.Key, request: SubmitAndWaitRequest)(
newTracker: => Future[Tracker])(implicit ec: ExecutionContext): Future[Completion] =
// double-checked locking
trackerBySubmitter
.getOrElse(
submitter,
lock.synchronized {
trackerBySubmitter.getOrElse(
submitter, {
val r = new AsyncResource(newTracker.map { t =>
val r = new TrackerMap.AsyncResource(newTracker.map { t =>
logger.info("Registered tracker for submitter {}", submitter)
Tracker.WithLastSubmission(t)
}, loggerFactory)
Expand All @@ -79,7 +77,7 @@ class TrackerMap(retentionPeriod: FiniteDuration, loggerFactory: NamedLoggerFact
)
.flatMap(_.track(request))

private def remove(submitter: Key): Unit = lock.synchronized {
private def remove(submitter: TrackerMap.Key): Unit = lock.synchronized {
trackerBySubmitter -= submitter
}

Expand All @@ -93,24 +91,25 @@ class TrackerMap(retentionPeriod: FiniteDuration, loggerFactory: NamedLoggerFact
}

object TrackerMap {
case class Key(application: String, party: String)

final case class Key(application: String, party: String)

sealed trait AsyncResourceState[+T <: AutoCloseable]
stefanobaghino-da marked this conversation as resolved.
Show resolved Hide resolved
final case object Waiting extends AsyncResourceState[Nothing]
final case object Closed extends AsyncResourceState[Nothing]
final case class Ready[T <: AutoCloseable](t: T) extends AsyncResourceState[T]

/**
* A holder for an AutoCloseable that can be opened and closed async.
* If closed before the underlying Future completes, will close the resource on completion.
*/
class AsyncResource[T <: AutoCloseable](future: Future[T], loggerFactory: NamedLoggerFactory) {
final class AsyncResource[T <: AutoCloseable](
future: Future[T],
loggerFactory: NamedLoggerFactory) {
private val logger = loggerFactory.getLogger(this.getClass)
sealed trait AsnycResourceState
final case object Waiting extends AsnycResourceState
// the following silent is due to
// <https://github.com/scala/bug/issues/4440>
@silent
final case class Ready(t: T) extends AsnycResourceState
final case object Closed extends AsnycResourceState

// Must progress Waiting => Ready => Closed or Waiting => Closed.
val state: AtomicReference[AsnycResourceState] = new AtomicReference(Waiting)
val state: AtomicReference[AsyncResourceState[T]] = new AtomicReference(Waiting)

future.andThen({
case Success(t) =>
Expand All @@ -131,8 +130,8 @@ object TrackerMap {
def flatMap[U](f: T => Future[U])(implicit ex: ExecutionContext): Future[U] = {
state.get() match {
case Waiting => future.flatMap(f)
case Ready(t) => f(t)
case Closed => throw new IllegalStateException()
case Ready(t) => f(t)
}
}

Expand Down
Loading