Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test TransactionsReader.getContractStateEvents within the JDBC DAO suite #9849

Merged
merged 1 commit into from
May 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ private[dao] trait JdbcLedgerDaoActiveContractsSpec
(_, t1) <- store(singleCreate)
(_, t2) <- store(singleCreate)
(_, _) <- store(singleExercise(nonTransient(t2).loneElement))
(_, _) <- store(fullyTransient)
(_, _) <- store(fullyTransient())
(_, t5) <- store(singleCreate)
(_, t6) <- store(singleCreate)
after <- ledgerDao.lookupLedgerEnd()
Expand Down Expand Up @@ -77,7 +77,7 @@ private[dao] trait JdbcLedgerDaoActiveContractsSpec
(_, _) <- store(singleCreate)
(_, c) <- store(singleCreate)
(_, _) <- store(singleExercise(nonTransient(c).loneElement))
(_, _) <- store(fullyTransient)
(_, _) <- store(fullyTransient())
(_, _) <- store(singleCreate)
(_, _) <- store(singleCreate)
activeContractsAfter <- activeContractsOf(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.platform.store.dao

import java.util.concurrent.atomic.AtomicLong

import akka.NotUsed
import akka.stream.scaladsl.{Sink, Source}
import com.daml.ledger.participant.state.v1.Offset
import com.daml.lf.data.ImmArray
import com.daml.lf.value.{Value => LfValue}
import com.daml.platform.store.appendonlydao.events.{Contract, ContractId}
import com.daml.platform.store.dao.events.ContractStateEvent
import com.daml.platform.store.dao.events.ContractStateEvent.{Archived, Created, LedgerEndMarker}
import org.scalatest.LoneElement
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should.Matchers

import scala.collection.immutable
import scala.concurrent.Future

/** This test can only be run successfully against the append-only schema on PostgreSQL.
*
* The asserted logic in this test, the [[LedgerDaoTransactionsReader.getContractStateEvents]] event stream,
* uses the `event_kind` column for SQL events filtering, which is not available in the old mutating schema.
*/
trait JdbcLedgerDaoContractEventsStreamSpec extends LoneElement {
this: AsyncFlatSpec with Matchers with JdbcLedgerDaoSuite =>

behavior of "JdbcLedgerDao (contract stream events)"

it should "return the expected contracts event stream for the specified offset range" in {
val contractArg = (arg: String) =>
LfValue.ValueRecord(
Some(someTemplateId),
ImmArray(
recordFieldName("text") -> LfValue.ValueText(arg)
),
)

for {
before <- ledgerDao.lookupLedgerEndOffsetAndSequentialId()
(offset1, t1) <- store(
singleCreate(cid => create(absCid = cid, contractArgument = contractArg("t1")))
)

(key2, globalKey2) = createTestKey(Set(alice, bob))
(offset2, t2) <- createAndStoreContract(
submittingParties = Set(alice),
signatories = Set(alice, bob),
stakeholders = Set(alice, bob),
key = Some(key2),
contractArgument = contractArg("t2"),
)

(offset3, _) <- store(singleExercise(nonTransient(t2).loneElement, Some(key2)))

(offset4, t4) <- store(
fullyTransient(cid => create(absCid = cid, contractArgument = contractArg("t4")))
)

(offset5, t5) <- store(
singleCreate(cid => create(absCid = cid, contractArgument = contractArg("t5")))
)

(offset6, t6) <- store(
singleCreate(cid => create(absCid = cid, contractArgument = contractArg("t6")))
)

after <- ledgerDao.lookupLedgerEndOffsetAndSequentialId()

contractStateEvents <- contractEventsOf(
ledgerDao.transactionsReader.getContractStateEvents(
startExclusive = before,
endInclusive = after,
)
)
} yield {
val first = contractStateEvents.head
val sequentialIdState = new AtomicLong(first.eventSequentialId)

contractStateEvents should contain theSameElementsInOrderAs Seq(
Created(
nonTransient(t1).loneElement,
contract(created(t1).loneElement, contractArg("t1")),
None,
t1.ledgerEffectiveTime,
Set(alice, bob),
offset1,
sequentialIdState.getAndIncrement(),
),
Created(
nonTransient(t2).loneElement,
contract(created(t2).loneElement, contractArg("t2")),
Some(globalKey2),
t2.ledgerEffectiveTime,
Set(alice, bob),
offset2,
sequentialIdState.getAndIncrement(),
),
Archived(
nonTransient(t2).loneElement,
Some(globalKey2),
Set(alice, bob),
offset3,
sequentialIdState.getAndIncrement(),
),
Created(
created(t4).loneElement,
contract(created(t4).loneElement, contractArg("t4")),
None,
t4.ledgerEffectiveTime,
Set(alice, bob),
offset4,
sequentialIdState.getAndIncrement(),
),
Archived(
created(t4).loneElement,
None,
Set(alice, bob),
offset4,
sequentialIdState.getAndIncrement(),
),
Created(
nonTransient(t5).loneElement,
contract(created(t5).loneElement, contractArg("t5")),
None,
t5.ledgerEffectiveTime,
Set(alice, bob),
offset5,
sequentialIdState.getAndIncrement(),
),
Created(
nonTransient(t6).loneElement,
contract(created(t6).loneElement, contractArg("t6")),
None,
t6.ledgerEffectiveTime,
Set(alice, bob),
offset6,
sequentialIdState.get(),
),
LedgerEndMarker(offset6, sequentialIdState.get()),
)
}
}

private def contractEventsOf(
source: Source[((Offset, Long), ContractStateEvent), NotUsed]
): Future[immutable.Seq[ContractStateEvent]] =
source
.runWith(Sink.seq)
.map(_.map(_._2))

private def contract(cid: ContractId, contractArgument: LfValue[ContractId]): Contract =
createNode(cid, Set.empty, Set.empty, contractArgument = contractArgument)
.copy(agreementText = "")
.versionedCoinst
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import com.daml.lf.transaction.Node._
import com.daml.lf.transaction.test.TransactionBuilder
import com.daml.lf.transaction._
import com.daml.lf.value.{Value => LfValue}
import com.daml.lf.value.Value.{ContractId, ContractInst}
import com.daml.lf.value.Value.{ContractId, ContractInst, ValueText}
import com.daml.logging.LoggingContext
import com.daml.platform.indexer.OffsetStep
import com.daml.platform.store.dao.events.TransactionsWriter
Expand Down Expand Up @@ -89,7 +89,7 @@ private[dao] trait JdbcLedgerDaoSuite extends JdbcLedgerDaoBackend {
Ref.DottedName.assertFromString(name),
),
)
private def recordFieldName(name: String) = Some(Ref.Name.assertFromString(name))
protected def recordFieldName(name: String) = Some(Ref.Name.assertFromString(name))
protected final val someTemplateId = testIdentifier("ParameterShowcase")
protected final val someValueText = LfValue.ValueText("some text")
protected final val someValueInt = LfValue.ValueInt64(1)
Expand Down Expand Up @@ -206,7 +206,8 @@ private[dao] trait JdbcLedgerDaoSuite extends JdbcLedgerDaoBackend {
)

private def exercise(
targetCid: ContractId
targetCid: ContractId,
key: Option[KeyWithMaintainers[LfValue[ContractId]]] = None,
): NodeExercises[NodeId, ContractId] =
NodeExercises(
targetCoid = targetCid,
Expand All @@ -221,11 +222,20 @@ private[dao] trait JdbcLedgerDaoSuite extends JdbcLedgerDaoBackend {
choiceObservers = Set.empty,
children = ImmArray.empty,
exerciseResult = Some(someChoiceResult),
key = None,
key = key,
byKey = false,
version = TransactionVersion.minVersion,
)

// Ids of all contracts created in a transaction - both transient and non-transient
protected def created(tx: LedgerEntry.Transaction): Set[ContractId] =
tx.transaction.fold(Set.empty[ContractId]) {
case (set, (_, create: NodeCreate[ContractId])) =>
set + create.coid
case (set, _) =>
set
}

// All non-transient contracts created in a transaction
protected def nonTransient(tx: LedgerEntry.Transaction): Set[ContractId] =
tx.transaction.fold(Set.empty[ContractId]) {
Expand Down Expand Up @@ -267,11 +277,21 @@ private[dao] trait JdbcLedgerDaoSuite extends JdbcLedgerDaoBackend {
)
}

protected final def createTestKey(
maintainers: Set[Party]
): (KeyWithMaintainers[ValueText], GlobalKey) = {
val aTextValue = ValueText(scala.util.Random.nextString(10))
val keyWithMaintainers = KeyWithMaintainers(aTextValue, maintainers)
val globalKey = GlobalKey.assertBuild(someTemplateId, aTextValue)
(keyWithMaintainers, globalKey)
}

protected final def createAndStoreContract(
submittingParties: Set[Party],
signatories: Set[Party],
stakeholders: Set[Party],
key: Option[KeyWithMaintainers[LfValue[ContractId]]],
contractArgument: LfValue[ContractId] = someContractArgument,
): Future[(Offset, LedgerEntry.Transaction)] =
store(
singleCreate(
Expand All @@ -281,6 +301,7 @@ private[dao] trait JdbcLedgerDaoSuite extends JdbcLedgerDaoBackend {
signatories = signatories,
stakeholders = stakeholders,
key = key,
contractArgument = contractArgument,
)
},
actAs = submittingParties.toList,
Expand Down Expand Up @@ -350,10 +371,11 @@ private[dao] trait JdbcLedgerDaoSuite extends JdbcLedgerDaoBackend {
}

protected def singleExercise(
targetCid: ContractId
targetCid: ContractId,
key: Option[KeyWithMaintainers[LfValue[ContractId]]] = None,
): (Offset, LedgerEntry.Transaction) = {
val txBuilder = TransactionBuilder()
val nid = txBuilder.add(exercise(targetCid))
val nid = txBuilder.add(exercise(targetCid, key))
val offset = nextOffset()
val id = offset.toLong
val let = Instant.now
Expand Down Expand Up @@ -436,7 +458,9 @@ private[dao] trait JdbcLedgerDaoSuite extends JdbcLedgerDaoBackend {
)
}

protected def fullyTransient: (Offset, LedgerEntry.Transaction) = {
protected def fullyTransient(
create: ContractId => NodeCreate[ContractId] = create(_)
): (Offset, LedgerEntry.Transaction) = {
val txBuilder = TransactionBuilder()
val cid = txBuilder.newCid
val createId = txBuilder.add(create(cid))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ private[dao] trait JdbcLedgerDaoTransactionTreesSpec

it should "return the expected transaction tree for a correct request (create, exercise)" in {
for {
(offset, tx) <- store(fullyTransient)
(offset, tx) <- store(fullyTransient())
result <- ledgerDao.transactionsReader
.lookupTransactionTreeById(tx.transactionId, tx.actAs.toSet)
} yield {
Expand Down Expand Up @@ -281,7 +281,7 @@ private[dao] trait JdbcLedgerDaoTransactionTreesSpec
(_, t1) <- store(singleCreate)
(_, t2) <- store(singleCreate)
(_, t3) <- store(singleExercise(nonTransient(t2).loneElement))
(_, t4) <- store(fullyTransient)
(_, t4) <- store(fullyTransient())
to <- ledgerDao.lookupLedgerEnd()
} yield (from, to, Seq(t1, t2, t3, t4))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ private[dao] trait JdbcLedgerDaoTransactionsSpec extends OptionValues with Insid

it should "hide events on transient contracts to the original submitter" in {
for {
(offset, tx) <- store(fullyTransient)
(offset, tx) <- store(fullyTransient())
result <- ledgerDao.transactionsReader
.lookupFlatTransactionById(tx.transactionId, tx.actAs.toSet)
} yield {
Expand Down Expand Up @@ -589,7 +589,7 @@ private[dao] trait JdbcLedgerDaoTransactionsSpec extends OptionValues with Insid
(_, t1) <- store(singleCreate)
(_, t2) <- store(singleCreate)
(_, t3) <- store(singleExercise(nonTransient(t2).loneElement))
(_, t4) <- store(fullyTransient)
(_, t4) <- store(fullyTransient())
to <- ledgerDao.lookupLedgerEnd()
} yield (from, to, Seq(t1, t2, t3, t4))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ final class JdbcLedgerDaoPostgresqlAppendOnlySpec
with JdbcLedgerDaoPartiesSpec
with JdbcLedgerDaoTransactionsSpec
with JdbcLedgerDaoTransactionTreesSpec
with JdbcLedgerDaoContractEventsStreamSpec
with JdbcLedgerDaoTransactionsWriterSpec
with JdbcAppendOnlyTransactionInsertion