Skip to content

Commit

Permalink
move reusable functions from ContractsFetch to new fetch-contracts li…
Browse files Browse the repository at this point in the history
…brary (#11057)

* split akka-streams and doobie utils from ContractsFetch to new fetch-contracts library

* move most stream components from ContractsFetch to new library

* fix packages

* make the fetchcontracts domain model work

* move transactionFilter to fetchcontracts

* lots of unused imports

* start incorporating fetch-contracts in http-json

* move toTerminates

* more unused imports; http-json compiles

* more fetch-contracts dep

* bring back HasTemplateId[ActiveContract]; integration tests compile

* whole ledger-service tree compiles

* fix oracle missing dep

* scoping some new library identifiers

* remove apiIdentifier aliases

* comment on Aliases

* remove toTerminates shim

* no changelog

CHANGELOG_BEGIN
CHANGELOG_END

* unused bazel imports

* remove already-addressed TODO

- suggested by @akshayshirahatti-da; thanks
  • Loading branch information
S11001001 authored Oct 5, 2021
1 parent c2084f6 commit 9fd8182
Show file tree
Hide file tree
Showing 27 changed files with 630 additions and 451 deletions.
75 changes: 75 additions & 0 deletions ledger-service/fetch-contracts/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

load(
"//bazel_tools:scala.bzl",
"da_scala_library",
"da_scala_test",
"lf_scalacopts",
"silencer_plugin",
)
load("@scala_version//:index.bzl", "scala_version_suffix")

hj_scalacopts = lf_scalacopts + [
"-P:wartremover:traverser:org.wartremover.warts.NonUnitStatements",
]

da_scala_library(
name = "fetch-contracts",
srcs = glob(["src/main/scala/**/*.scala"]),
plugins = [
"@maven//:org_typelevel_kind_projector_{}".format(scala_version_suffix),
silencer_plugin,
],
scala_deps = [
"@maven//:io_spray_spray_json",
"@maven//:org_scala_lang_modules_scala_collection_compat",
"@maven//:org_scalaz_scalaz_core",
"@maven//:org_tpolecat_doobie_core",
"@maven//:org_tpolecat_doobie_free",
"@maven//:org_typelevel_cats_core",
"@maven//:org_typelevel_cats_free",
],
scalacopts = hj_scalacopts,
tags = ["maven_coordinates=com.daml:fetch-contracts:__VERSION__"],
visibility = ["//visibility:public"],
runtime_deps = [
"@maven//:ch_qos_logback_logback_classic",
"@maven//:org_codehaus_janino_janino",
],
deps = [
"//daml-lf/data",
"//daml-lf/transaction",
"//language-support/scala/bindings-akka",
"//ledger-service/db-backend",
"//libs-scala/scala-utils",
],
)

da_scala_test(
name = "tests",
size = "medium",
srcs = glob(["src/test/scala/**/*.scala"]),
plugins = [
"@maven//:org_typelevel_kind_projector_{}".format(scala_version_suffix),
silencer_plugin,
],
scala_deps = [
"@maven//:org_scala_lang_modules_scala_collection_compat",
"@maven//:org_scalacheck_scalacheck",
"@maven//:org_scalatest_scalatest_core",
"@maven//:org_scalatest_scalatest_matchers_core",
"@maven//:org_scalatest_scalatest_shouldmatchers",
"@maven//:org_scalatestplus_scalacheck_1_15",
"@maven//:org_scalaz_scalaz_core",
"@maven//:org_scalaz_scalaz_scalacheck_binding",
"@maven//:org_tpolecat_doobie_core",
],
scalacopts = hj_scalacopts,
deps = [
":fetch-contracts",
"//ledger-service/db-backend",
"//libs-scala/scalatest-utils",
"@maven//:org_scalatest_scalatest_compatible",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.fetchcontracts

import akka.NotUsed
import akka.stream.scaladsl.{Broadcast, Concat, Flow, GraphDSL, Source}
import akka.stream.{FanOutShape2, Graph}
import com.daml.scalautil.Statement.discard
import domain.TemplateId
import util.{AbsoluteBookmark, BeginBookmark, ContractStreamStep, InsertDeleteStep, LedgerBegin}
import util.IdentifierConverters.apiIdentifier
import com.daml.ledger.api.v1.transaction.Transaction
import com.daml.ledger.api.{v1 => lav1}
import scalaz.OneAnd._
import scalaz.std.set._
import scalaz.syntax.tag._
import scalaz.syntax.foldable._
import scalaz.OneAnd

private[daml] object AcsTxStreams {
import util.AkkaStreamsDoobie.{last, max, project2}

/** Plan inserts, deletes from an in-order batch of create/archive events. */
private[this] def partitionInsertsDeletes(
txes: Iterable[lav1.event.Event]
): InsertDeleteStep.LAV1 = {
val csb = Vector.newBuilder[lav1.event.CreatedEvent]
val asb = Map.newBuilder[String, lav1.event.ArchivedEvent]
import lav1.event.Event
import Event.Event._
txes foreach {
case Event(Created(c)) => discard { csb += c }
case Event(Archived(a)) => discard { asb += ((a.contractId, a)) }
case Event(Empty) => () // nonsense
}
val as = asb.result()
InsertDeleteStep(csb.result() filter (ce => !as.contains(ce.contractId)), as)
}

/** Like `acsAndBoundary`, but also include the events produced by `transactionsSince`
* after the ACS's last offset, terminating with the last offset of the last transaction,
* or the ACS's last offset if there were no transactions.
*/
private[daml] def acsFollowingAndBoundary(
transactionsSince: lav1.ledger_offset.LedgerOffset => Source[Transaction, NotUsed]
): Graph[FanOutShape2[
lav1.active_contracts_service.GetActiveContractsResponse,
ContractStreamStep.LAV1,
BeginBookmark[String],
], NotUsed] =
GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
import ContractStreamStep.{LiveBegin, Acs}
type Off = BeginBookmark[String]
val acs = b add acsAndBoundary
val dupOff = b add Broadcast[Off](2)
val liveStart = Flow fromFunction { off: Off =>
LiveBegin(domain.Offset.tag.subst(off))
}
val txns = b add transactionsFollowingBoundary(transactionsSince)
val allSteps = b add Concat[ContractStreamStep.LAV1](3)
// format: off
discard { dupOff <~ acs.out1 }
discard { acs.out0.map(ces => Acs(ces.toVector)) ~> allSteps }
discard { dupOff ~> liveStart ~> allSteps }
discard { txns.out0 ~> allSteps }
discard { dupOff ~> txns.in }
// format: on
new FanOutShape2(acs.in, allSteps.out, txns.out1)
}

/** Split a series of ACS responses into two channels: one with contracts, the
* other with a single result, the last offset.
*/
private[this] def acsAndBoundary
: Graph[FanOutShape2[lav1.active_contracts_service.GetActiveContractsResponse, Seq[
lav1.event.CreatedEvent,
], BeginBookmark[String]], NotUsed] =
GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
import lav1.active_contracts_service.{GetActiveContractsResponse => GACR}
val dup = b add Broadcast[GACR](2)
val acs = b add (Flow fromFunction ((_: GACR).activeContracts))
val off = b add Flow[GACR]
.collect { case gacr if gacr.offset.nonEmpty => AbsoluteBookmark(gacr.offset) }
.via(last(LedgerBegin: BeginBookmark[String]))
discard { dup ~> acs }
discard { dup ~> off }
new FanOutShape2(dup.in, acs.out, off.out)
}

/** Interpreting the transaction stream so it conveniently depends on
* the ACS graph, if desired. Deliberately matching output shape
* to `acsFollowingAndBoundary`.
*/
private[daml] def transactionsFollowingBoundary(
transactionsSince: lav1.ledger_offset.LedgerOffset => Source[Transaction, NotUsed]
): Graph[FanOutShape2[
BeginBookmark[String],
ContractStreamStep.Txn.LAV1,
BeginBookmark[String],
], NotUsed] =
GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
type Off = BeginBookmark[String]
val dupOff = b add Broadcast[Off](2)
val mergeOff = b add Concat[Off](2)
val txns = Flow[Off]
.flatMapConcat(off => transactionsSince(domain.Offset.tag.subst(off).toLedgerApi))
.map(transactionToInsertsAndDeletes)
val txnSplit = b add project2[ContractStreamStep.Txn.LAV1, domain.Offset]
import domain.Offset.`Offset ordering`
val lastTxOff = b add last(LedgerBegin: Off)
type EndoBookmarkFlow[A] = Flow[BeginBookmark[A], BeginBookmark[A], NotUsed]
val maxOff = b add domain.Offset.tag.unsubst[EndoBookmarkFlow, String](
max(LedgerBegin: BeginBookmark[domain.Offset])
)
// format: off
discard { txnSplit.in <~ txns <~ dupOff }
discard { dupOff ~> mergeOff ~> maxOff }
discard { txnSplit.out1.map(off => AbsoluteBookmark(off.unwrap)) ~> lastTxOff ~> mergeOff }
// format: on
new FanOutShape2(dupOff.in, txnSplit.out0, maxOff.out)
}

private[this] def transactionToInsertsAndDeletes(
tx: lav1.transaction.Transaction
): (ContractStreamStep.Txn.LAV1, domain.Offset) = {
val offset = domain.Offset.fromLedgerApi(tx)
(ContractStreamStep.Txn(partitionInsertsDeletes(tx.events), offset), offset)
}

private[daml] def transactionFilter(
parties: OneAnd[Set, domain.Party],
templateIds: List[TemplateId.RequiredPkg],
): lav1.transaction_filter.TransactionFilter = {
import lav1.transaction_filter._

val filters =
if (templateIds.isEmpty) Filters.defaultInstance
else Filters(Some(lav1.transaction_filter.InclusiveFilters(templateIds.map(apiIdentifier))))

TransactionFilter(domain.Party.unsubst(parties.toVector).map(_ -> filters).toMap)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.fetchcontracts

import com.daml.lf
import lf.data.Ref
import util.ClientUtil.boxedRecord
import com.daml.ledger.api.{v1 => lav1}
import com.daml.ledger.api.refinements.{ApiTypes => lar}
import scalaz.std.list._
import scalaz.std.option._
import scalaz.std.string._
import scalaz.syntax.std.option._
import scalaz.syntax.traverse._
import scalaz.{@@, Applicative, Order, Semigroup, Show, Tag, Tags, Traverse, \/}

package object domain {
type LfValue = lf.value.Value

type ContractIdTag = lar.ContractIdTag
type ContractId = lar.ContractId
val ContractId = lar.ContractId

type PartyTag = lar.PartyTag
type Party = lar.Party
val Party = lar.Party

type Offset = String @@ OffsetTag

private[daml] implicit final class `fc domain ErrorOps`[A](private val o: Option[A])
extends AnyVal {
def required(label: String): Error \/ A =
o toRightDisjunction Error(Symbol("ErrorOps_required"), s"Missing required field $label")
}
}

package domain {
final case class Error(id: Symbol, message: String)

object Error {
implicit val errorShow: Show[Error] = Show shows { e =>
s"domain.Error, ${e.id: Symbol}: ${e.message: String}"
}
}

sealed trait OffsetTag

object Offset {
private[daml] val tag = Tag.of[OffsetTag]

def apply(s: String): Offset = tag(s)

def unwrap(x: Offset): String = tag.unwrap(x)

def fromLedgerApi(
gacr: lav1.active_contracts_service.GetActiveContractsResponse
): Option[Offset] =
Option(gacr.offset).filter(_.nonEmpty).map(x => Offset(x))

def fromLedgerApi(
gler: lav1.transaction_service.GetLedgerEndResponse
): Option[Offset] =
gler.offset.flatMap(_.value.absolute).filter(_.nonEmpty).map(x => Offset(x))

def fromLedgerApi(tx: lav1.transaction.Transaction): Offset = Offset(tx.offset)

def toLedgerApi(o: Offset): lav1.ledger_offset.LedgerOffset =
lav1.ledger_offset.LedgerOffset(lav1.ledger_offset.LedgerOffset.Value.Absolute(unwrap(o)))

implicit val semigroup: Semigroup[Offset] = Tag.unsubst(Semigroup[Offset @@ Tags.LastVal])
implicit val `Offset ordering`: Order[Offset] = Order.orderBy[Offset, String](Offset.unwrap(_))
}

final case class TemplateId[+PkgId](packageId: PkgId, moduleName: String, entityName: String)

object TemplateId {
type OptionalPkg = TemplateId[Option[String]]
type RequiredPkg = TemplateId[String]
type NoPkg = TemplateId[Unit]

def fromLedgerApi(in: lav1.value.Identifier): TemplateId.RequiredPkg =
TemplateId(in.packageId, in.moduleName, in.entityName)

def qualifiedName(a: TemplateId[_]): Ref.QualifiedName =
Ref.QualifiedName(
Ref.DottedName.assertFromString(a.moduleName),
Ref.DottedName.assertFromString(a.entityName),
)

def toLedgerApiValue(a: RequiredPkg): Ref.Identifier = {
val qfName = qualifiedName(a)
val packageId = Ref.PackageId.assertFromString(a.packageId)
Ref.Identifier(packageId, qfName)
}

}

final case class ActiveContract[+LfV](
contractId: ContractId,
templateId: TemplateId.RequiredPkg,
key: Option[LfV],
payload: LfV,
signatories: Seq[Party],
observers: Seq[Party],
agreementText: String,
)

object ActiveContract {

def matchesKey(k: LfValue)(a: domain.ActiveContract[LfValue]): Boolean =
a.key.fold(false)(_ == k)

def fromLedgerApi(
gacr: lav1.active_contracts_service.GetActiveContractsResponse
): Error \/ List[ActiveContract[lav1.value.Value]] = {
gacr.activeContracts.toList.traverse(fromLedgerApi(_))
}

def fromLedgerApi(in: lav1.event.CreatedEvent): Error \/ ActiveContract[lav1.value.Value] =
for {
templateId <- in.templateId required "templateId"
payload <- in.createArguments required "createArguments"
} yield ActiveContract(
contractId = ContractId(in.contractId),
templateId = TemplateId fromLedgerApi templateId,
key = in.contractKey,
payload = boxedRecord(payload),
signatories = Party.subst(in.signatories),
observers = Party.subst(in.observers),
agreementText = in.agreementText getOrElse "",
)

implicit val covariant: Traverse[ActiveContract] = new Traverse[ActiveContract] {

override def map[A, B](fa: ActiveContract[A])(f: A => B): ActiveContract[B] =
fa.copy(key = fa.key map f, payload = f(fa.payload))

override def traverseImpl[G[_]: Applicative, A, B](
fa: ActiveContract[A]
)(f: A => G[B]): G[ActiveContract[B]] = {
import scalaz.syntax.apply._
val gk: G[Option[B]] = fa.key traverse f
val ga: G[B] = f(fa.payload)
^(gk, ga)((k, a) => fa.copy(key = k, payload = a))
}
}
}

// This allows us to avoid rewriting all the imports and references
// to http.domain. We can snap the indirections and remove these
// as convenient
private[daml] trait Aliases {
import com.daml.fetchcontracts.{domain => here}
type Error = here.Error
final val Error = here.Error
type LfValue = here.LfValue
type TemplateId[+PkgId] = here.TemplateId[PkgId]
final val TemplateId = here.TemplateId
type ContractId = here.ContractId
final val ContractId = here.ContractId
type Party = here.Party
final val Party = here.Party
type Offset = here.Offset
final val Offset = here.Offset
type ActiveContract[+LfV] = here.ActiveContract[LfV]
final val ActiveContract = here.ActiveContract
}
}
Loading

0 comments on commit 9fd8182

Please sign in to comment.