diff --git a/ledger/participant-integration-api/src/main/scala/platform/index/ReadOnlySqlLedger.scala b/ledger/participant-integration-api/src/main/scala/platform/index/ReadOnlySqlLedger.scala index 549cb7097e78..ee2e9dbd3084 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/index/ReadOnlySqlLedger.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/index/ReadOnlySqlLedger.scala @@ -28,7 +28,7 @@ import com.daml.platform.store.appendonlydao.{ LedgerDaoTransactionsReader, LedgerReadDao, } -import com.daml.platform.store.cache.MutableLedgerEndCache +import com.daml.platform.store.cache.{LedgerEndCache, MutableLedgerEndCache} import com.daml.platform.store.{BaseLedger, LfValueTranslationCache} import com.daml.resources.ProgramResource.StartupException import com.daml.timer.RetryStrategy @@ -66,7 +66,8 @@ private[platform] object ReadOnlySqlLedger { override def acquire()(implicit context: ResourceContext): Resource[ReadOnlySqlLedger] = { val ledgerEndCache = MutableLedgerEndCache() for { - ledgerDao <- ledgerDaoOwner(servicesExecutionContext, errorFactories).acquire() + ledgerDao <- ledgerDaoOwner(servicesExecutionContext, errorFactories, ledgerEndCache) + .acquire() ledgerId <- Resource.fromFuture(verifyLedgerId(ledgerDao, initialLedgerId)) ledger <- ledgerOwner(ledgerDao, ledgerId, ledgerEndCache).acquire() } yield ledger @@ -142,6 +143,7 @@ private[platform] object ReadOnlySqlLedger { private def ledgerDaoOwner( servicesExecutionContext: ExecutionContext, errorFactories: ErrorFactories, + ledgerEndCache: LedgerEndCache, ): ResourceOwner[LedgerReadDao] = JdbcLedgerDao.readOwner( serverRole, @@ -156,6 +158,7 @@ private[platform] object ReadOnlySqlLedger { Some(enricher), participantId, errorFactories, + ledgerEndCache, ) } diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/IndexMetadata.scala b/ledger/participant-integration-api/src/main/scala/platform/store/IndexMetadata.scala index e82baa798a2d..5dc41aca3c6c 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/IndexMetadata.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/IndexMetadata.scala @@ -14,6 +14,7 @@ import com.daml.metrics.Metrics import com.daml.platform.ApiOffset import com.daml.platform.configuration.ServerRole import com.daml.platform.server.api.validation.ErrorFactories +import com.daml.platform.store.cache.MutableLedgerEndCache import scalaz.Tag import scala.concurrent.duration._ @@ -59,6 +60,7 @@ object IndexMetadata { // and this property is not needed for the used ReadDao. 
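For orientation, the value being threaded through these constructors is a single mutable ledger-end holder that the ledger owner creates once and then exposes to the read path only through its read-only `LedgerEndCache` view. The sketch below reconstructs that contract from how this patch uses it (`MutableLedgerEndCache()`, `set(offset -> eventSeqId)` in the test provider, and passing the read view into `JdbcLedgerDao.readOwner` and `StorageBackendFactory.readStorageBackendFor`); the `apply()` accessor name, the initial value, and the package locations are assumptions rather than code quoted from this diff.

```scala
import com.daml.ledger.offset.Offset

// Read-only view handed to JdbcLedgerDao and the storage backends:
// "what is the ledger end the read path is currently allowed to see?"
trait LedgerEndCache {
  def apply(): (Offset, Long) // (last offset, last event sequential id) -- accessor name assumed
}

// Mutable side kept by the owner (e.g. ReadOnlySqlLedger above); set() is visible in this patch.
trait MutableLedgerEndCache extends LedgerEndCache {
  def set(ledgerEnd: (Offset, Long)): Unit
}

object MutableLedgerEndCache {
  def apply(): MutableLedgerEndCache =
    new MutableLedgerEndCache {
      @volatile private var ledgerEnd: (Offset, Long) = Offset.beforeBegin -> 0L
      override def set(value: (Offset, Long)): Unit = ledgerEnd = value
      override def apply(): (Offset, Long) = ledgerEnd
    }
}
```

With this shape the owner keeps the mutable handle and advances it as indexing progresses, while everything built via `readStorageBackendFor(dbType, ledgerEndCache)` only ever sees the immutable interface, which is also why `IndexMetadata` below can safely pass a fresh, never-updated cache.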
participantId = Ref.ParticipantId.assertFromString("1"), errorFactories = errorFactories, + ledgerEndCache = MutableLedgerEndCache(), // not used ) private val Empty = "" diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/JdbcLedgerDao.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/JdbcLedgerDao.scala index eba41e8aed3a..e69c843190d6 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/JdbcLedgerDao.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/JdbcLedgerDao.scala @@ -43,7 +43,7 @@ import com.daml.platform.store.backend.{ StorageBackendFactory, UpdateToDbDto, } -import com.daml.platform.store.cache.MutableLedgerEndCache +import com.daml.platform.store.cache.{LedgerEndCache, MutableLedgerEndCache} import com.daml.platform.store.entries.{ ConfigurationEntry, LedgerEntry, @@ -772,6 +772,7 @@ private[platform] object JdbcLedgerDao { enricher: Option[ValueEnricher], participantId: Ref.ParticipantId, errorFactories: ErrorFactories, + ledgerEndCache: LedgerEndCache, )(implicit loggingContext: LoggingContext): ResourceOwner[LedgerReadDao] = owner( serverRole, @@ -788,6 +789,7 @@ private[platform] object JdbcLedgerDao { participantId = participantId, sequentialWriteDao = NoopSequentialWriteDao, errorFactories = errorFactories, + ledgerEndCache = ledgerEndCache, ).map(new MeteredLedgerReadDao(_, metrics)) def writeOwner( @@ -828,6 +830,7 @@ private[platform] object JdbcLedgerDao { ledgerEndCache, ), errorFactories = errorFactories, + ledgerEndCache = ledgerEndCache, ).map(new MeteredLedgerDao(_, metrics)) } @@ -872,6 +875,7 @@ private[platform] object JdbcLedgerDao { ledgerEndCache, ), errorFactories = errorFactories, + ledgerEndCache = ledgerEndCache, ).map(new MeteredLedgerDao(_, metrics)) } @@ -917,6 +921,7 @@ private[platform] object JdbcLedgerDao { participantId: Ref.ParticipantId, sequentialWriteDao: SequentialWriteDao, errorFactories: ErrorFactories, + ledgerEndCache: LedgerEndCache, )(implicit loggingContext: LoggingContext): ResourceOwner[LedgerDao] = { val dbType = DbType.jdbcType(jdbcUrl) val factory = StorageBackendFactory.of(dbType) @@ -940,7 +945,7 @@ private[platform] object JdbcLedgerDao { enricher, sequentialWriteDao, participantId, - StorageBackendFactory.readStorageBackendFor(dbType), + StorageBackendFactory.readStorageBackendFor(dbType, ledgerEndCache), factory.createParameterStorageBackend, factory.createDeduplicationStorageBackend, factory.createResetStorageBackend, diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/ContractStateEventsReader.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/ContractStateEventsReader.scala index 07c529f9b6cd..0ce1402d2be4 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/ContractStateEventsReader.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/ContractStateEventsReader.scala @@ -7,7 +7,7 @@ import java.io.ByteArrayInputStream import com.daml.platform.store.appendonlydao.events import com.daml.platform.store.serialization.{Compression, ValueSerializer} import com.daml.platform.store.LfValueTranslationCache -import com.daml.platform.store.backend.StorageBackend.RawContractStateEvent +import com.daml.platform.store.backend.ContractStorageBackend.RawContractStateEvent import 
scala.util.control.NoStackTrace diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/TransactionLogUpdatesReader.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/TransactionLogUpdatesReader.scala index 71326e4690f2..fc6eee910f17 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/TransactionLogUpdatesReader.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/TransactionLogUpdatesReader.scala @@ -6,7 +6,7 @@ package com.daml.platform.store.appendonlydao.events import java.io.ByteArrayInputStream import com.daml.lf.data.Ref -import com.daml.platform.store.backend.StorageBackend.RawTransactionEvent +import com.daml.platform.store.backend.EventStorageBackend.RawTransactionEvent import com.daml.platform.store.interfaces.TransactionLogUpdate import com.daml.platform.store.serialization.{Compression, ValueSerializer} diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala index 06c13b92ffc7..09833231296a 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala @@ -17,7 +17,6 @@ import com.daml.platform import com.daml.platform.store.EventSequentialId import com.daml.platform.store.appendonlydao.events.{ContractId, EventsTable, Key, Raw} import com.daml.platform.store.backend.EventStorageBackend.{FilterParams, RangeParams} -import com.daml.platform.store.backend.StorageBackend.RawTransactionEvent import com.daml.platform.store.backend.postgresql.PostgresDataSourceConfig import com.daml.platform.store.entries.{ConfigurationEntry, PackageLedgerEntry, PartyLedgerEntry} import com.daml.platform.store.interfaces.LedgerDaoContractsReader.KeyState @@ -31,24 +30,7 @@ import scala.util.Try * Naming convention for the interface methods, which requiring Connection: * - read operations are represented as nouns (plural, singular form indicates cardinality) * - write operations are represented as verbs - * - * @tparam DB_BATCH Since parallel ingestion comes also with batching, this implementation specific type allows separation of the CPU intensive batching operation from the pure IO intensive insertBatch operation. 
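A compact sketch of that separation, modelled on the `typeBoundIngest` helper this patch adds to `StorageBackendProvider` (`insertBatch(connection, backend.batch(dbDtos))`); names other than `batch` and `insertBatch` are illustrative:

```scala
import java.sql.Connection
import com.daml.platform.store.backend.{DbDto, IngestionStorageBackend}

// DB_BATCH lets the CPU-intensive conversion of DbDtos into a backend-specific batch
// run separately from the IO-intensive write of that batch over a connection, so the
// parallel ingestion pipeline can schedule the two phases on different execution contexts.
def ingestExample[DbBatch](
    backend: IngestionStorageBackend[DbBatch],
    dbDtos: Vector[DbDto],
)(connection: Connection): Unit = {
  val dbBatch: DbBatch = backend.batch(dbDtos) // pure transformation, no database access
  backend.insertBatch(connection, dbBatch)     // single IO-bound write on the connection
}
```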
*/ -trait StorageBackend[DB_BATCH] - extends IngestionStorageBackend[DB_BATCH] - with ParameterStorageBackend - with ConfigurationStorageBackend - with PartyStorageBackend - with PackageStorageBackend - with DeduplicationStorageBackend - with CompletionStorageBackend - with ContractStorageBackend - with EventStorageBackend - with DataSourceStorageBackend - with DBLockStorageBackend - with IntegrityStorageBackend - with ResetStorageBackend - with StringInterningStorageBackend trait ResetStorageBackend { @@ -224,10 +206,10 @@ trait ContractStorageBackend { def keyState(key: Key, validAt: Long)(connection: Connection): KeyState def contractState(contractId: ContractId, before: Long)( connection: Connection - ): Option[StorageBackend.RawContractState] + ): Option[ContractStorageBackend.RawContractState] def activeContractWithArgument(readers: Set[Ref.Party], contractId: ContractId)( connection: Connection - ): Option[StorageBackend.RawContract] + ): Option[ContractStorageBackend.RawContract] def activeContractWithoutArgument(readers: Set[Ref.Party], contractId: ContractId)( connection: Connection ): Option[String] @@ -236,7 +218,38 @@ trait ContractStorageBackend { ): Option[ContractId] def contractStateEvents(startExclusive: Long, endInclusive: Long)( connection: Connection - ): Vector[StorageBackend.RawContractStateEvent] + ): Vector[ContractStorageBackend.RawContractStateEvent] +} + +object ContractStorageBackend { + case class RawContractState( + templateId: Option[String], + flatEventWitnesses: Set[Ref.Party], + createArgument: Option[Array[Byte]], + createArgumentCompression: Option[Int], + eventKind: Int, + ledgerEffectiveTime: Option[Timestamp], + ) + + class RawContract( + val templateId: String, + val createArgument: Array[Byte], + val createArgumentCompression: Option[Int], + ) + + case class RawContractStateEvent( + eventKind: Int, + contractId: ContractId, + templateId: Option[Ref.Identifier], + ledgerEffectiveTime: Option[Timestamp], + createKeyValue: Option[Array[Byte]], + createKeyCompression: Option[Int], + createArgument: Option[Array[Byte]], + createArgumentCompression: Option[Int], + flatEventWitnesses: Set[Ref.Party], + eventSequentialId: Long, + offset: Offset, + ) } trait EventStorageBackend { @@ -278,7 +291,7 @@ trait EventStorageBackend { def maxEventSequentialIdOfAnObservableEvent(offset: Offset)(connection: Connection): Option[Long] def rawEvents(startExclusive: Long, endInclusive: Long)( connection: Connection - ): Vector[RawTransactionEvent] + ): Vector[EventStorageBackend.RawTransactionEvent] } object EventStorageBackend { @@ -293,6 +306,37 @@ object EventStorageBackend { wildCardParties: Set[Ref.Party], partiesAndTemplates: Set[(Set[Ref.Party], Set[Ref.Identifier])], ) + + case class RawTransactionEvent( + eventKind: Int, + transactionId: String, + nodeIndex: Int, + commandId: Option[String], + workflowId: Option[String], + eventId: EventId, + contractId: platform.store.appendonlydao.events.ContractId, + templateId: Option[platform.store.appendonlydao.events.Identifier], + ledgerEffectiveTime: Option[Timestamp], + createSignatories: Option[Array[String]], + createObservers: Option[Array[String]], + createAgreementText: Option[String], + createKeyValue: Option[Array[Byte]], + createKeyCompression: Option[Int], + createArgument: Option[Array[Byte]], + createArgumentCompression: Option[Int], + treeEventWitnesses: Set[String], + flatEventWitnesses: Set[String], + submitters: Set[String], + exerciseChoice: Option[String], + exerciseArgument: Option[Array[Byte]], + 
exerciseArgumentCompression: Option[Int], + exerciseResult: Option[Array[Byte]], + exerciseResultCompression: Option[Int], + exerciseActors: Option[Array[String]], + exerciseChildEventIds: Option[Array[String]], + eventSequentialId: Long, + offset: Offset, + ) extends NeverEqualsOverride } trait DataSourceStorageBackend { @@ -358,65 +402,3 @@ trait StringInterningStorageBackend { connection: Connection ): Iterable[(Int, String)] } - -object StorageBackend { - case class RawContractState( - templateId: Option[String], - flatEventWitnesses: Set[Ref.Party], - createArgument: Option[Array[Byte]], - createArgumentCompression: Option[Int], - eventKind: Int, - ledgerEffectiveTime: Option[Timestamp], - ) - - class RawContract( - val templateId: String, - val createArgument: Array[Byte], - val createArgumentCompression: Option[Int], - ) - - case class RawContractStateEvent( - eventKind: Int, - contractId: ContractId, - templateId: Option[Ref.Identifier], - ledgerEffectiveTime: Option[Timestamp], - createKeyValue: Option[Array[Byte]], - createKeyCompression: Option[Int], - createArgument: Option[Array[Byte]], - createArgumentCompression: Option[Int], - flatEventWitnesses: Set[Ref.Party], - eventSequentialId: Long, - offset: Offset, - ) - - case class RawTransactionEvent( - eventKind: Int, - transactionId: String, - nodeIndex: Int, - commandId: Option[String], - workflowId: Option[String], - eventId: EventId, - contractId: platform.store.appendonlydao.events.ContractId, - templateId: Option[platform.store.appendonlydao.events.Identifier], - ledgerEffectiveTime: Option[Timestamp], - createSignatories: Option[Array[String]], - createObservers: Option[Array[String]], - createAgreementText: Option[String], - createKeyValue: Option[Array[Byte]], - createKeyCompression: Option[Int], - createArgument: Option[Array[Byte]], - createArgumentCompression: Option[Int], - treeEventWitnesses: Set[String], - flatEventWitnesses: Set[String], - submitters: Set[String], - exerciseChoice: Option[String], - exerciseArgument: Option[Array[Byte]], - exerciseArgumentCompression: Option[Int], - exerciseResult: Option[Array[Byte]], - exerciseResultCompression: Option[Int], - exerciseActors: Option[Array[String]], - exerciseChildEventIds: Option[Array[String]], - eventSequentialId: Long, - offset: Offset, - ) extends NeverEqualsOverride -} diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackendFactory.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackendFactory.scala index 868a8a844c1a..2741192fa2a8 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackendFactory.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackendFactory.scala @@ -7,17 +7,18 @@ import com.daml.platform.store.DbType import com.daml.platform.store.backend.h2.H2StorageBackendFactory import com.daml.platform.store.backend.oracle.OracleStorageBackendFactory import com.daml.platform.store.backend.postgresql.PostgresStorageBackendFactory +import com.daml.platform.store.cache.LedgerEndCache trait StorageBackendFactory { def createIngestionStorageBackend: IngestionStorageBackend[_] def createParameterStorageBackend: ParameterStorageBackend - def createConfigurationStorageBackend: ConfigurationStorageBackend - def createPartyStorageBackend: PartyStorageBackend - def createPackageStorageBackend: PackageStorageBackend + def createConfigurationStorageBackend(ledgerEndCache: LedgerEndCache): 
ConfigurationStorageBackend + def createPartyStorageBackend(ledgerEndCache: LedgerEndCache): PartyStorageBackend + def createPackageStorageBackend(ledgerEndCache: LedgerEndCache): PackageStorageBackend def createDeduplicationStorageBackend: DeduplicationStorageBackend def createCompletionStorageBackend: CompletionStorageBackend - def createContractStorageBackend: ContractStorageBackend - def createEventStorageBackend: EventStorageBackend + def createContractStorageBackend(ledgerEndCache: LedgerEndCache): ContractStorageBackend + def createEventStorageBackend(ledgerEndCache: LedgerEndCache): EventStorageBackend def createDataSourceStorageBackend: DataSourceStorageBackend def createDBLockStorageBackend: DBLockStorageBackend def createIntegrityStorageBackend: IntegrityStorageBackend @@ -33,15 +34,18 @@ object StorageBackendFactory { case DbType.Oracle => OracleStorageBackendFactory } - def readStorageBackendFor(dbType: DbType): ReadStorageBackend = { + def readStorageBackendFor( + dbType: DbType, + ledgerEndCache: LedgerEndCache, + ): ReadStorageBackend = { val factory = of(dbType) ReadStorageBackend( - configurationStorageBackend = factory.createConfigurationStorageBackend, - partyStorageBackend = factory.createPartyStorageBackend, - packageStorageBackend = factory.createPackageStorageBackend, + configurationStorageBackend = factory.createConfigurationStorageBackend(ledgerEndCache), + partyStorageBackend = factory.createPartyStorageBackend(ledgerEndCache), + packageStorageBackend = factory.createPackageStorageBackend(ledgerEndCache), completionStorageBackend = factory.createCompletionStorageBackend, - contractStorageBackend = factory.createContractStorageBackend, - eventStorageBackend = factory.createEventStorageBackend, + contractStorageBackend = factory.createContractStorageBackend(ledgerEndCache), + eventStorageBackend = factory.createEventStorageBackend(ledgerEndCache), ) } } diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CommonStorageBackendFactory.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CommonStorageBackendFactory.scala new file mode 100644 index 000000000000..50d30be39264 --- /dev/null +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CommonStorageBackendFactory.scala @@ -0,0 +1,34 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +package com.daml.platform.store.backend.common + +import com.daml.platform.store.backend.{ + ConfigurationStorageBackend, + IntegrityStorageBackend, + PackageStorageBackend, + ParameterStorageBackend, + StorageBackendFactory, + StringInterningStorageBackend, +} +import com.daml.platform.store.cache.LedgerEndCache + +trait CommonStorageBackendFactory extends StorageBackendFactory { + + override def createPackageStorageBackend(ledgerEndCache: LedgerEndCache): PackageStorageBackend = + PackageStorageBackendTemplate + + override val createParameterStorageBackend: ParameterStorageBackend = + ParameterStorageBackendTemplate + + override def createConfigurationStorageBackend( + ledgerEndCache: LedgerEndCache + ): ConfigurationStorageBackend = + ConfigurationStorageBackendTemplate + + override val createIntegrityStorageBackend: IntegrityStorageBackend = + IntegrityStorageBackendTemplate + + override val createStringInterningStorageBackend: StringInterningStorageBackend = + StringInterningStorageBackendTemplate +} diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/ContractStorageBackendTemplate.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/ContractStorageBackendTemplate.scala index eed38f81e529..6252bb4d070c 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/ContractStorageBackendTemplate.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/ContractStorageBackendTemplate.scala @@ -18,7 +18,7 @@ import com.daml.platform.store.Conversions.{ import com.daml.platform.store.SimpleSqlAsVectorOf.SimpleSqlAsVectorOf import com.daml.platform.store.appendonlydao.events.{ContractId, Key} import com.daml.platform.store.backend.common.ComposableQuery.{CompositeSql, SqlStringInterpolation} -import com.daml.platform.store.backend.{ContractStorageBackend, StorageBackend} +import com.daml.platform.store.backend.ContractStorageBackend import com.daml.platform.store.interfaces.LedgerDaoContractsReader.{ KeyAssigned, KeyState, @@ -135,7 +135,7 @@ class ContractStorageBackendTemplate(queryStrategy: QueryStrategy) extends Contr validAt = Some(validAt), )(connection).getOrElse(KeyUnassigned) - private val fullDetailsContractRowParser: RowParser[StorageBackend.RawContractState] = + private val fullDetailsContractRowParser: RowParser[ContractStorageBackend.RawContractState] = (str("template_id").? ~ flatEventWitnessesColumn("flat_event_witnesses") ~ byteArray("create_argument").? @@ -143,11 +143,11 @@ class ContractStorageBackendTemplate(queryStrategy: QueryStrategy) extends Contr ~ int("event_kind") ~ timestampFromMicros("ledger_effective_time").?) 
.map(SqlParser.flatten) - .map(StorageBackend.RawContractState.tupled) + .map(ContractStorageBackend.RawContractState.tupled) override def contractState(contractId: ContractId, before: Long)( connection: Connection - ): Option[StorageBackend.RawContractState] = { + ): Option[ContractStorageBackend.RawContractState] = { import com.daml.platform.store.Conversions.ContractIdToStatement SQL""" SELECT @@ -167,7 +167,7 @@ class ContractStorageBackendTemplate(queryStrategy: QueryStrategy) extends Contr .as(fullDetailsContractRowParser.singleOpt)(connection) } - private val contractStateRowParser: RowParser[StorageBackend.RawContractStateEvent] = + private val contractStateRowParser: RowParser[ContractStorageBackend.RawContractStateEvent] = (int("event_kind") ~ contractId("contract_id") ~ identifier("template_id").? ~ @@ -180,7 +180,7 @@ class ContractStorageBackendTemplate(queryStrategy: QueryStrategy) extends Contr flatEventWitnessesColumn("flat_event_witnesses") ~ offset("event_offset")).map { case eventKind ~ contractId ~ templateId ~ ledgerEffectiveTime ~ createKeyValue ~ createKeyCompression ~ createArgument ~ createArgumentCompression ~ eventSequentialId ~ flatEventWitnesses ~ offset => - StorageBackend.RawContractStateEvent( + ContractStorageBackend.RawContractStateEvent( eventKind, contractId, templateId, @@ -197,7 +197,7 @@ class ContractStorageBackendTemplate(queryStrategy: QueryStrategy) extends Contr override def contractStateEvents(startExclusive: Long, endInclusive: Long)( connection: Connection - ): Vector[StorageBackend.RawContractStateEvent] = + ): Vector[ContractStorageBackend.RawContractStateEvent] = SQL""" SELECT event_kind, @@ -220,13 +220,17 @@ class ContractStorageBackendTemplate(queryStrategy: QueryStrategy) extends Contr ORDER BY event_sequential_id ASC""" .asVectorOf(contractStateRowParser)(connection) - private val contractRowParser: RowParser[StorageBackend.RawContract] = + private val contractRowParser: RowParser[ContractStorageBackend.RawContract] = (str("template_id") ~ byteArray("create_argument") ~ int("create_argument_compression").?) 
.map(SqlParser.flatten) .map { case (templateId, createArgument, createArgumentCompression) => - new StorageBackend.RawContract(templateId, createArgument, createArgumentCompression) + new ContractStorageBackend.RawContract( + templateId, + createArgument, + createArgumentCompression, + ) } protected def activeContractSqlLiteral( @@ -320,7 +324,7 @@ class ContractStorageBackendTemplate(queryStrategy: QueryStrategy) extends Contr override def activeContractWithArgument( readers: Set[Ref.Party], contractId: ContractId, - )(connection: Connection): Option[StorageBackend.RawContract] = { + )(connection: Connection): Option[ContractStorageBackend.RawContract] = { activeContract( resultSetParser = contractRowParser.singleOpt, resultColumns = List("template_id", "create_argument", "create_argument_compression"), diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/EventStorageBackendTemplate.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/EventStorageBackendTemplate.scala index 2144f9ab7d8a..7b1407b4c25a 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/EventStorageBackendTemplate.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/EventStorageBackendTemplate.scala @@ -21,7 +21,7 @@ import com.daml.platform.store.SimpleSqlAsVectorOf.SimpleSqlAsVectorOf import com.daml.platform.store.appendonlydao.events.{EventsTable, Identifier, Raw} import com.daml.platform.store.backend.EventStorageBackend import com.daml.platform.store.backend.EventStorageBackend.{FilterParams, RangeParams} -import com.daml.platform.store.backend.StorageBackend.RawTransactionEvent +import com.daml.platform.store.backend.EventStorageBackend.RawTransactionEvent import com.daml.platform.store.backend.common.ComposableQuery.{CompositeSql, SqlStringInterpolation} import scala.collection.compat.immutable.ArraySeq diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/h2/H2StorageBackendFactory.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/h2/H2StorageBackendFactory.scala index bb014d545aa4..2748dc3d3586 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/h2/H2StorageBackendFactory.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/h2/H2StorageBackendFactory.scala @@ -4,59 +4,45 @@ package com.daml.platform.store.backend.h2 import com.daml.platform.store.backend.common.{ + CommonStorageBackendFactory, CompletionStorageBackendTemplate, - ConfigurationStorageBackendTemplate, IngestionStorageBackendTemplate, - IntegrityStorageBackendTemplate, - PackageStorageBackendTemplate, - ParameterStorageBackendTemplate, PartyStorageBackendTemplate, - StringInterningStorageBackendTemplate, } import com.daml.platform.store.backend.{ CompletionStorageBackend, - ConfigurationStorageBackend, ContractStorageBackend, DBLockStorageBackend, DataSourceStorageBackend, DeduplicationStorageBackend, EventStorageBackend, IngestionStorageBackend, - IntegrityStorageBackend, - PackageStorageBackend, - ParameterStorageBackend, PartyStorageBackend, ResetStorageBackend, StorageBackendFactory, - StringInterningStorageBackend, } +import com.daml.platform.store.cache.LedgerEndCache + +object H2StorageBackendFactory extends StorageBackendFactory with CommonStorageBackendFactory { -object H2StorageBackendFactory extends StorageBackendFactory { override val 
createIngestionStorageBackend: IngestionStorageBackend[_] = new IngestionStorageBackendTemplate(H2Schema.schema) - override val createParameterStorageBackend: ParameterStorageBackend = - ParameterStorageBackendTemplate - - override val createConfigurationStorageBackend: ConfigurationStorageBackend = - ConfigurationStorageBackendTemplate - - override val createPartyStorageBackend: PartyStorageBackend = + override def createPartyStorageBackend(ledgerEndCache: LedgerEndCache): PartyStorageBackend = new PartyStorageBackendTemplate(H2QueryStrategy) - override val createPackageStorageBackend: PackageStorageBackend = - PackageStorageBackendTemplate - override val createDeduplicationStorageBackend: DeduplicationStorageBackend = H2DeduplicationStorageBackend override val createCompletionStorageBackend: CompletionStorageBackend = new CompletionStorageBackendTemplate(H2QueryStrategy) - override val createContractStorageBackend: ContractStorageBackend = + override def createContractStorageBackend( + ledgerEndCache: LedgerEndCache + ): ContractStorageBackend = H2ContractStorageBackend - override val createEventStorageBackend: EventStorageBackend = + override def createEventStorageBackend(ledgerEndCache: LedgerEndCache): EventStorageBackend = H2EventStorageBackend override val createDataSourceStorageBackend: DataSourceStorageBackend = @@ -65,12 +51,6 @@ object H2StorageBackendFactory extends StorageBackendFactory { override val createDBLockStorageBackend: DBLockStorageBackend = H2DBLockStorageBackend - override val createIntegrityStorageBackend: IntegrityStorageBackend = - IntegrityStorageBackendTemplate - override val createResetStorageBackend: ResetStorageBackend = H2ResetStorageBackend - - override val createStringInterningStorageBackend: StringInterningStorageBackend = - StringInterningStorageBackendTemplate } diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/oracle/OracleStorageBackendFactory.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/oracle/OracleStorageBackendFactory.scala index a5558dcedabb..06a0bf6b4a63 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/oracle/OracleStorageBackendFactory.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/oracle/OracleStorageBackendFactory.scala @@ -4,60 +4,46 @@ package com.daml.platform.store.backend.oracle import com.daml.platform.store.backend.common.{ + CommonStorageBackendFactory, CompletionStorageBackendTemplate, - ConfigurationStorageBackendTemplate, ContractStorageBackendTemplate, IngestionStorageBackendTemplate, - IntegrityStorageBackendTemplate, - PackageStorageBackendTemplate, - ParameterStorageBackendTemplate, PartyStorageBackendTemplate, - StringInterningStorageBackendTemplate, } import com.daml.platform.store.backend.{ CompletionStorageBackend, - ConfigurationStorageBackend, ContractStorageBackend, DBLockStorageBackend, DataSourceStorageBackend, DeduplicationStorageBackend, EventStorageBackend, IngestionStorageBackend, - IntegrityStorageBackend, - PackageStorageBackend, - ParameterStorageBackend, PartyStorageBackend, ResetStorageBackend, StorageBackendFactory, - StringInterningStorageBackend, } +import com.daml.platform.store.cache.LedgerEndCache + +object OracleStorageBackendFactory extends StorageBackendFactory with CommonStorageBackendFactory { -object OracleStorageBackendFactory extends StorageBackendFactory { override val createIngestionStorageBackend: IngestionStorageBackend[_] = new 
IngestionStorageBackendTemplate(OracleSchema.schema) - override val createParameterStorageBackend: ParameterStorageBackend = - ParameterStorageBackendTemplate - - override val createConfigurationStorageBackend: ConfigurationStorageBackend = - ConfigurationStorageBackendTemplate - - override val createPartyStorageBackend: PartyStorageBackend = + override def createPartyStorageBackend(ledgerEndCache: LedgerEndCache): PartyStorageBackend = new PartyStorageBackendTemplate(OracleQueryStrategy) - override val createPackageStorageBackend: PackageStorageBackend = - PackageStorageBackendTemplate - override val createDeduplicationStorageBackend: DeduplicationStorageBackend = OracleDeduplicationStorageBackend - override val createCompletionStorageBackend: CompletionStorageBackend = + override def createCompletionStorageBackend: CompletionStorageBackend = new CompletionStorageBackendTemplate(OracleQueryStrategy) - override val createContractStorageBackend: ContractStorageBackend = + override def createContractStorageBackend( + ledgerEndCache: LedgerEndCache + ): ContractStorageBackend = new ContractStorageBackendTemplate(OracleQueryStrategy) - override val createEventStorageBackend: EventStorageBackend = + override def createEventStorageBackend(ledgerEndCache: LedgerEndCache): EventStorageBackend = OracleEventStorageBackend override val createDataSourceStorageBackend: DataSourceStorageBackend = @@ -66,12 +52,6 @@ object OracleStorageBackendFactory extends StorageBackendFactory { override val createDBLockStorageBackend: DBLockStorageBackend = OracleDBLockStorageBackend - override val createIntegrityStorageBackend: IntegrityStorageBackend = - IntegrityStorageBackendTemplate - override val createResetStorageBackend: ResetStorageBackend = OracleResetStorageBackend - - override val createStringInterningStorageBackend: StringInterningStorageBackend = - StringInterningStorageBackendTemplate } diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresStorageBackendFactory.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresStorageBackendFactory.scala index 25c8e9eaf29d..cedd72cb131d 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresStorageBackendFactory.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresStorageBackendFactory.scala @@ -4,60 +4,48 @@ package com.daml.platform.store.backend.postgresql import com.daml.platform.store.backend.common.{ + CommonStorageBackendFactory, CompletionStorageBackendTemplate, - ConfigurationStorageBackendTemplate, ContractStorageBackendTemplate, IngestionStorageBackendTemplate, - IntegrityStorageBackendTemplate, - PackageStorageBackendTemplate, - ParameterStorageBackendTemplate, PartyStorageBackendTemplate, - StringInterningStorageBackendTemplate, } import com.daml.platform.store.backend.{ CompletionStorageBackend, - ConfigurationStorageBackend, ContractStorageBackend, DBLockStorageBackend, DataSourceStorageBackend, DeduplicationStorageBackend, EventStorageBackend, IngestionStorageBackend, - IntegrityStorageBackend, - PackageStorageBackend, - ParameterStorageBackend, PartyStorageBackend, ResetStorageBackend, StorageBackendFactory, - StringInterningStorageBackend, } +import com.daml.platform.store.cache.LedgerEndCache + +object PostgresStorageBackendFactory + extends StorageBackendFactory + with CommonStorageBackendFactory { -object PostgresStorageBackendFactory extends 
StorageBackendFactory { override val createIngestionStorageBackend: IngestionStorageBackend[_] = new IngestionStorageBackendTemplate(PGSchema.schema) - override val createParameterStorageBackend: ParameterStorageBackend = - ParameterStorageBackendTemplate - - override val createConfigurationStorageBackend: ConfigurationStorageBackend = - ConfigurationStorageBackendTemplate - - override val createPartyStorageBackend: PartyStorageBackend = + override def createPartyStorageBackend(ledgerEndCache: LedgerEndCache): PartyStorageBackend = new PartyStorageBackendTemplate(PostgresQueryStrategy) - override val createPackageStorageBackend: PackageStorageBackend = - PackageStorageBackendTemplate - override val createDeduplicationStorageBackend: DeduplicationStorageBackend = PostgresDeduplicationStorageBackend override val createCompletionStorageBackend: CompletionStorageBackend = new CompletionStorageBackendTemplate(PostgresQueryStrategy) - override val createContractStorageBackend: ContractStorageBackend = + override def createContractStorageBackend( + ledgerEndCache: LedgerEndCache + ): ContractStorageBackend = new ContractStorageBackendTemplate(PostgresQueryStrategy) - override val createEventStorageBackend: EventStorageBackend = + override def createEventStorageBackend(ledgerEndCache: LedgerEndCache): EventStorageBackend = PostgresEventStorageBackend override val createDataSourceStorageBackend: DataSourceStorageBackend = @@ -66,12 +54,6 @@ object PostgresStorageBackendFactory extends StorageBackendFactory { override val createDBLockStorageBackend: DBLockStorageBackend = PostgresDBLockStorageBackend - override val createIntegrityStorageBackend: IntegrityStorageBackend = - IntegrityStorageBackendTemplate - override val createResetStorageBackend: ResetStorageBackend = PostgresResetStorageBackend - - override val createStringInterningStorageBackend: StringInterningStorageBackend = - StringInterningStorageBackendTemplate } diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendProvider.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendProvider.scala index dea7c7330373..b679196ff13f 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendProvider.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendProvider.scala @@ -5,24 +5,36 @@ package com.daml.platform.store.backend import java.sql.Connection +import com.daml.platform.store.backend.ParameterStorageBackend.LedgerEnd import com.daml.platform.store.backend.h2.H2StorageBackendFactory import com.daml.platform.store.backend.oracle.OracleStorageBackendFactory import com.daml.platform.store.backend.postgresql.PostgresStorageBackendFactory +import com.daml.platform.store.cache.MutableLedgerEndCache import com.daml.testing.oracle.OracleAroundAll import com.daml.testing.postgresql.PostgresAroundAll import org.scalatest.Suite -/** Creates a database and a [[StorageBackend]]. +/** Creates a database and a [[TestBackend]]. * Used by [[StorageBackendSpec]] to run all StorageBackend tests on different databases. 
*/ private[backend] trait StorageBackendProvider { protected def jdbcUrl: String - protected def backendFactory: StorageBackendFactory + protected def backend: TestBackend protected final def ingest(dbDtos: Vector[DbDto], connection: Connection): Unit = { def typeBoundIngest[T](ingestionStorageBackend: IngestionStorageBackend[T]): Unit = ingestionStorageBackend.insertBatch(connection, ingestionStorageBackend.batch(dbDtos)) - typeBoundIngest(backendFactory.createIngestionStorageBackend) + typeBoundIngest(backend.ingestion) + } + + protected final def updateLedgerEnd(ledgerEnd: LedgerEnd)(connection: Connection): Unit = { + backend.parameter.updateLedgerEnd(ledgerEnd)(connection) + updateLedgerEndCache(connection) + } + + protected final def updateLedgerEndCache(connection: Connection): Unit = { + val ledgerEnd = backend.parameter.ledgerEndOrBeforeBegin(connection) + backend.ledgerEndCache.set(ledgerEnd.lastOffset -> ledgerEnd.lastEventSeqId) } } @@ -30,12 +42,12 @@ private[backend] trait StorageBackendProviderPostgres extends StorageBackendProvider with PostgresAroundAll { this: Suite => override protected def jdbcUrl: String = postgresDatabase.url - override protected val backendFactory: StorageBackendFactory = PostgresStorageBackendFactory + override protected val backend: TestBackend = TestBackend(PostgresStorageBackendFactory) } private[backend] trait StorageBackendProviderH2 extends StorageBackendProvider { this: Suite => override protected def jdbcUrl: String = "jdbc:h2:mem:storage_backend_provider;db_close_delay=-1" - override protected val backendFactory: StorageBackendFactory = H2StorageBackendFactory + override protected val backend: TestBackend = TestBackend(H2StorageBackendFactory) } private[backend] trait StorageBackendProviderOracle @@ -43,5 +55,46 @@ private[backend] trait StorageBackendProviderOracle with OracleAroundAll { this: Suite => override protected def jdbcUrl: String = s"jdbc:oracle:thin:$oracleUser/$oraclePwd@localhost:$oraclePort/ORCLPDB1" - override protected val backendFactory: StorageBackendFactory = OracleStorageBackendFactory + override protected val backend: TestBackend = TestBackend(OracleStorageBackendFactory) +} + +case class TestBackend( + ingestion: IngestionStorageBackend[_], + parameter: ParameterStorageBackend, + configuration: ConfigurationStorageBackend, + party: PartyStorageBackend, + packageBackend: PackageStorageBackend, + deduplication: DeduplicationStorageBackend, + completion: CompletionStorageBackend, + contract: ContractStorageBackend, + event: EventStorageBackend, + dataSource: DataSourceStorageBackend, + dbLock: DBLockStorageBackend, + integrity: IntegrityStorageBackend, + reset: ResetStorageBackend, + stringInterning: StringInterningStorageBackend, + ledgerEndCache: MutableLedgerEndCache, +) + +object TestBackend { + def apply(storageBackendFactory: StorageBackendFactory): TestBackend = { + val ledgerEndCache = MutableLedgerEndCache() + TestBackend( + ingestion = storageBackendFactory.createIngestionStorageBackend, + parameter = storageBackendFactory.createParameterStorageBackend, + configuration = storageBackendFactory.createConfigurationStorageBackend(ledgerEndCache), + party = storageBackendFactory.createPartyStorageBackend(ledgerEndCache), + packageBackend = storageBackendFactory.createPackageStorageBackend(ledgerEndCache), + deduplication = storageBackendFactory.createDeduplicationStorageBackend, + completion = storageBackendFactory.createCompletionStorageBackend, + contract = 
storageBackendFactory.createContractStorageBackend(ledgerEndCache), + event = storageBackendFactory.createEventStorageBackend(ledgerEndCache), + dataSource = storageBackendFactory.createDataSourceStorageBackend, + dbLock = storageBackendFactory.createDBLockStorageBackend, + integrity = storageBackendFactory.createIntegrityStorageBackend, + reset = storageBackendFactory.createResetStorageBackend, + stringInterning = storageBackendFactory.createStringInterningStorageBackend, + ledgerEndCache = ledgerEndCache, + ) + } } diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendSpec.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendSpec.scala index cf831c5b3869..713e1ea70639 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendSpec.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendSpec.scala @@ -49,7 +49,7 @@ private[backend] trait StorageBackendSpec ) dispatcher <- DbDispatcher .owner( - dataSource = backendFactory.createDataSourceStorageBackend.createDataSource(jdbcUrl), + dataSource = backend.dataSource.createDataSource(jdbcUrl), serverRole = ServerRole.Testing(this.getClass), connectionPoolSize = connectionPoolSize, connectionTimeout = FiniteDuration(250, "millis"), @@ -81,7 +81,13 @@ private[backend] trait StorageBackendSpec runningTests.incrementAndGet() == 1, "StorageBackendSpec tests must not run in parallel, as they all run against the same database.", ) - Await.result(executeSql(backendFactory.createResetStorageBackend.resetAll), 60.seconds) + Await.result( + executeSql { c => + backend.reset.resetAll(c) + updateLedgerEndCache(c) + }, + 60.seconds, + ) } override protected def afterEach(): Unit = { diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsCompletions.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsCompletions.scala index 81d613d51f2d..e7ba3954e1c1 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsCompletions.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsCompletions.scala @@ -15,11 +15,6 @@ private[backend] trait StorageBackendTestsCompletions with StorageBackendSpec { this: AsyncFlatSpec => - private val parameterStorageBackend: ParameterStorageBackend = - backendFactory.createParameterStorageBackend - private val completionStorageBackend: CompletionStorageBackend = - backendFactory.createCompletionStorageBackend - behavior of "StorageBackend (completions)" import StorageBackendTestValues._ @@ -36,13 +31,13 @@ private[backend] trait StorageBackendTestsCompletions ) for { - _ <- executeSql(parameterStorageBackend.initializeParameters(someIdentityParams)) + _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) _ <- executeSql(ingest(dtos, _)) _ <- executeSql( - parameterStorageBackend.updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(4), 3L)) + updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(4), 3L)) ) completions0to3 <- executeSql( - completionStorageBackend.commandCompletions( + backend.completion.commandCompletions( Offset.beforeBegin, offset(3), applicationId, @@ -50,13 +45,13 @@ private[backend] trait StorageBackendTestsCompletions ) ) completions1to3 <- executeSql( - 
completionStorageBackend.commandCompletions(offset(1), offset(3), applicationId, Set(party)) + backend.completion.commandCompletions(offset(1), offset(3), applicationId, Set(party)) ) completions2to3 <- executeSql( - completionStorageBackend.commandCompletions(offset(2), offset(3), applicationId, Set(party)) + backend.completion.commandCompletions(offset(2), offset(3), applicationId, Set(party)) ) completions1to9 <- executeSql( - completionStorageBackend.commandCompletions(offset(1), offset(9), applicationId, Set(party)) + backend.completion.commandCompletions(offset(1), offset(9), applicationId, Set(party)) ) } yield { completions0to3 should have length 2 @@ -76,13 +71,13 @@ private[backend] trait StorageBackendTestsCompletions ) for { - _ <- executeSql(parameterStorageBackend.initializeParameters(someIdentityParams)) + _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) _ <- executeSql(ingest(dtos, _)) _ <- executeSql( - parameterStorageBackend.updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(2), 1L)) + updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(2), 1L)) ) completions <- executeSql( - completionStorageBackend.commandCompletions(offset(1), offset(2), applicationId, Set(party)) + backend.completion.commandCompletions(offset(1), offset(2), applicationId, Set(party)) ) } yield { completions should have length 1 @@ -102,13 +97,13 @@ private[backend] trait StorageBackendTestsCompletions ) for { - _ <- executeSql(parameterStorageBackend.initializeParameters(someIdentityParams)) + _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) _ <- executeSql(ingest(dtos, _)) _ <- executeSql( - parameterStorageBackend.updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(3), 2L)) + updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(3), 2L)) ) completions <- executeSql( - completionStorageBackend.commandCompletions( + backend.completion.commandCompletions( offset(1), offset(3), someApplicationId, @@ -140,13 +135,13 @@ private[backend] trait StorageBackendTestsCompletions ) for { - _ <- executeSql(parameterStorageBackend.initializeParameters(someIdentityParams)) + _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) _ <- executeSql(ingest(dtos, _)) _ <- executeSql( - parameterStorageBackend.updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(3), 2L)) + updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(3), 2L)) ) completions <- executeSql( - completionStorageBackend.commandCompletions( + backend.completion.commandCompletions( offset(1), offset(3), someApplicationId, @@ -189,13 +184,13 @@ private[backend] trait StorageBackendTestsCompletions ) for { - _ <- executeSql(parameterStorageBackend.initializeParameters(someIdentityParams)) + _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) _ <- executeSql(ingest(dtos, _)) _ <- executeSql( - parameterStorageBackend.updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(3), 2L)) + updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(3), 2L)) ) completions <- executeSql( - completionStorageBackend.commandCompletions( + backend.completion.commandCompletions( offset(1), offset(3), someApplicationId, @@ -235,13 +230,13 @@ private[backend] trait StorageBackendTestsCompletions ) for { - _ <- executeSql(parameterStorageBackend.initializeParameters(someIdentityParams)) + _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) _ <- executeSql(ingest(dtos1, _)) _ <- executeSql( - 
parameterStorageBackend.updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(2), 1L)) + updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(2), 1L)) ) result <- executeSql( - completionStorageBackend.commandCompletions( + backend.completion.commandCompletions( offset(1), offset(2), someApplicationId, @@ -265,10 +260,10 @@ private[backend] trait StorageBackendTestsCompletions for { _ <- executeSql(ingest(dtos2, _)) _ <- executeSql( - parameterStorageBackend.updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(3), 2L)) + updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(3), 2L)) ) result <- executeSql( - completionStorageBackend.commandCompletions( + backend.completion.commandCompletions( offset(2), offset(3), someApplicationId, diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsDBLock.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsDBLock.scala index ae0cf0476d5c..1461e29722ca 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsDBLock.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsDBLock.scala @@ -172,10 +172,10 @@ trait StorageBackendTestsDBLockForSuite with StorageBackendProvider { this: AsyncFlatSpec => - override val dbLock: DBLockStorageBackend = backendFactory.createDBLockStorageBackend + override val dbLock: DBLockStorageBackend = backend.dbLock override def getConnection: Connection = - backendFactory.createDataSourceStorageBackend + backend.dataSource .createDataSource(jdbcUrl)(LoggingContext.ForTesting) .getConnection } diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsDeduplication.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsDeduplication.scala index 1e2593ce95cc..8492aa9f9174 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsDeduplication.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsDeduplication.scala @@ -16,11 +16,6 @@ private[backend] trait StorageBackendTestsDeduplication with StorageBackendSpec { this: AsyncFlatSpec => - private val parameterStorageBackend: ParameterStorageBackend = - backendFactory.createParameterStorageBackend - private val deduplicationStorageBackend: DeduplicationStorageBackend = - backendFactory.createDeduplicationStorageBackend - behavior of "DeduplicationStorageBackend" import StorageBackendTestValues._ @@ -32,15 +27,15 @@ private[backend] trait StorageBackendTestsDeduplication val n = 8 for { - _ <- executeSql(parameterStorageBackend.initializeParameters(someIdentityParams)) + _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) insertedRows <- Future.sequence( Vector.fill(n)( executeSql( - deduplicationStorageBackend.upsertDeduplicationEntry(key, submittedAt, deduplicateUntil) + backend.deduplication.upsertDeduplicationEntry(key, submittedAt, deduplicateUntil) ) ) ) - foundDeduplicateUntil <- executeSql(deduplicationStorageBackend.deduplicatedUntil(key)) + foundDeduplicateUntil <- executeSql(backend.deduplication.deduplicatedUntil(key)) } yield { insertedRows.count(_ == 1) shouldBe 1 // One of the calls inserts a new row insertedRows.count(_ == 0) shouldBe (n - 1) // All other calls don't write anything @@ -59,15 +54,15 @@ private[backend] 
trait StorageBackendTestsDeduplication val n = 8 for { - _ <- executeSql(parameterStorageBackend.initializeParameters(someIdentityParams)) + _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) insertedRows <- executeSql( - deduplicationStorageBackend.upsertDeduplicationEntry(key, submittedAt, deduplicateUntil) + backend.deduplication.upsertDeduplicationEntry(key, submittedAt, deduplicateUntil) ) - foundDeduplicateUntil <- executeSql(deduplicationStorageBackend.deduplicatedUntil(key)) + foundDeduplicateUntil <- executeSql(backend.deduplication.deduplicatedUntil(key)) updatedRows <- Future.sequence( Vector.fill(n)( executeSql( - deduplicationStorageBackend.upsertDeduplicationEntry( + backend.deduplication.upsertDeduplicationEntry( key, submittedAt2, deduplicateUntil2, @@ -75,7 +70,7 @@ private[backend] trait StorageBackendTestsDeduplication ) ) ) - foundDeduplicateUntil2 <- executeSql(deduplicationStorageBackend.deduplicatedUntil(key)) + foundDeduplicateUntil2 <- executeSql(backend.deduplication.deduplicatedUntil(key)) } yield { insertedRows shouldBe 1 // First call inserts a new row updatedRows.count( @@ -97,15 +92,15 @@ private[backend] trait StorageBackendTestsDeduplication val deduplicateUntil2 = submittedAt2.addMicros(5000L) for { - _ <- executeSql(parameterStorageBackend.initializeParameters(someIdentityParams)) + _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) insertedRows <- executeSql( - deduplicationStorageBackend.upsertDeduplicationEntry(key, submittedAt, deduplicateUntil) + backend.deduplication.upsertDeduplicationEntry(key, submittedAt, deduplicateUntil) ) - foundDeduplicateUntil <- executeSql(deduplicationStorageBackend.deduplicatedUntil(key)) + foundDeduplicateUntil <- executeSql(backend.deduplication.deduplicatedUntil(key)) updatedRows <- executeSql( - deduplicationStorageBackend.upsertDeduplicationEntry(key, submittedAt2, deduplicateUntil2) + backend.deduplication.upsertDeduplicationEntry(key, submittedAt2, deduplicateUntil2) ) - foundDeduplicateUntil2 <- executeSql(deduplicationStorageBackend.deduplicatedUntil(key)) + foundDeduplicateUntil2 <- executeSql(backend.deduplication.deduplicatedUntil(key)) } yield { insertedRows shouldBe 1 // First call inserts a new row updatedRows shouldBe 0 // Second call doesn't write anything diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsIngestion.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsIngestion.scala index 1b01d111517b..82e00fe615a7 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsIngestion.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsIngestion.scala @@ -13,14 +13,6 @@ private[backend] trait StorageBackendTestsIngestion with StorageBackendSpec { this: AsyncFlatSpec => - private val parameterStorageBackend: ParameterStorageBackend = - backendFactory.createParameterStorageBackend - private val configurationStorageBackend: ConfigurationStorageBackend = - backendFactory.createConfigurationStorageBackend - private val partyStorageBackend: PartyStorageBackend = backendFactory.createPartyStorageBackend - private val packageStorageBackend: PackageStorageBackend = - backendFactory.createPackageStorageBackend - behavior of "StorageBackend (ingestion)" import StorageBackendTestValues._ @@ -32,13 +24,13 @@ private[backend] trait 
StorageBackendTestsIngestion ) for { - _ <- executeSql(parameterStorageBackend.initializeParameters(someIdentityParams)) + _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) _ <- executeSql(ingest(dtos, _)) - configBeforeLedgerEndUpdate <- executeSql(configurationStorageBackend.ledgerConfiguration) + configBeforeLedgerEndUpdate <- executeSql(backend.configuration.ledgerConfiguration) _ <- executeSql( - parameterStorageBackend.updateLedgerEnd(ParameterStorageBackend.LedgerEnd(someOffset, 0)) + updateLedgerEnd(ParameterStorageBackend.LedgerEnd(someOffset, 0)) ) - configAfterLedgerEndUpdate <- executeSql(configurationStorageBackend.ledgerConfiguration) + configAfterLedgerEndUpdate <- executeSql(backend.configuration.ledgerConfiguration) } yield { // The first query is executed before the ledger end is updated. // It should not see the already ingested configuration change. @@ -61,13 +53,13 @@ private[backend] trait StorageBackendTestsIngestion ) for { - _ <- executeSql(parameterStorageBackend.initializeParameters(someIdentityParams)) + _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) _ <- executeSql(ingest(dtos, _)) - packagesBeforeLedgerEndUpdate <- executeSql(packageStorageBackend.lfPackages) + packagesBeforeLedgerEndUpdate <- executeSql(backend.packageBackend.lfPackages) _ <- executeSql( - parameterStorageBackend.updateLedgerEnd(ParameterStorageBackend.LedgerEnd(someOffset, 0)) + updateLedgerEnd(ParameterStorageBackend.LedgerEnd(someOffset, 0)) ) - packagesAfterLedgerEndUpdate <- executeSql(packageStorageBackend.lfPackages) + packagesAfterLedgerEndUpdate <- executeSql(backend.packageBackend.lfPackages) } yield { // The first query is executed before the ledger end is updated. // It should not see the already ingested package upload. @@ -85,13 +77,13 @@ private[backend] trait StorageBackendTestsIngestion ) for { - _ <- executeSql(parameterStorageBackend.initializeParameters(someIdentityParams)) + _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) _ <- executeSql(ingest(dtos, _)) - partiesBeforeLedgerEndUpdate <- executeSql(partyStorageBackend.knownParties) + partiesBeforeLedgerEndUpdate <- executeSql(backend.party.knownParties) _ <- executeSql( - parameterStorageBackend.updateLedgerEnd(ParameterStorageBackend.LedgerEnd(someOffset, 0)) + updateLedgerEnd(ParameterStorageBackend.LedgerEnd(someOffset, 0)) ) - partiesAfterLedgerEndUpdate <- executeSql(partyStorageBackend.knownParties) + partiesAfterLedgerEndUpdate <- executeSql(backend.party.knownParties) } yield { // The first query is executed before the ledger end is updated. // It should not see the already ingested party allocation. 
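The ingestion tests above all follow the same pattern, which the new helper in `StorageBackendProvider` makes explicit: because the read-side backends are now gated by the `LedgerEndCache`, a test has to advance the parameters table and the cache together (via `updateLedgerEnd`) before ingested data becomes visible. Below is a condensed sketch of the party-allocation case, reusing the helpers defined in this patch; the concrete assertions are illustrative rather than copied:

```scala
// Data written by `ingest` stays invisible to the read backend until both the
// parameters table and the ledger-end cache have been advanced by `updateLedgerEnd`.
for {
  _      <- executeSql(backend.parameter.initializeParameters(someIdentityParams))
  _      <- executeSql(ingest(dtos, _))
  before <- executeSql(backend.party.knownParties) // cache still at the previous ledger end
  _      <- executeSql(updateLedgerEnd(ParameterStorageBackend.LedgerEnd(someOffset, 0)))
  after  <- executeSql(backend.party.knownParties) // cache and database advanced together
} yield {
  before shouldBe empty     // the ingested party allocation is not visible yet
  after should not be empty // ...and becomes visible once the ledger end moves
}
```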
diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsInitialization.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsInitialization.scala index ffacb066dd40..98b8d8eff6b9 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsInitialization.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsInitialization.scala @@ -12,9 +12,6 @@ import org.scalatest.matchers.should.Matchers private[backend] trait StorageBackendTestsInitialization extends Matchers with StorageBackendSpec { this: AsyncFlatSpec => - private val parameterStorageBackend: ParameterStorageBackend = - backendFactory.createParameterStorageBackend - behavior of "StorageBackend (initialization)" it should "correctly handle repeated initialization" in { @@ -25,7 +22,7 @@ private[backend] trait StorageBackendTestsInitialization extends Matchers with S for { _ <- executeSql( - parameterStorageBackend.initializeParameters( + backend.parameter.initializeParameters( ParameterStorageBackend.IdentityParams( ledgerId = ledgerId, participantId = participantId, @@ -33,7 +30,7 @@ private[backend] trait StorageBackendTestsInitialization extends Matchers with S ) ) error1 <- executeSql( - parameterStorageBackend.initializeParameters( + backend.parameter.initializeParameters( ParameterStorageBackend.IdentityParams( ledgerId = otherLedgerId, participantId = participantId, @@ -41,7 +38,7 @@ private[backend] trait StorageBackendTestsInitialization extends Matchers with S ) ).failed error2 <- executeSql( - parameterStorageBackend.initializeParameters( + backend.parameter.initializeParameters( ParameterStorageBackend.IdentityParams( ledgerId = ledgerId, participantId = otherParticipantId, @@ -49,7 +46,7 @@ private[backend] trait StorageBackendTestsInitialization extends Matchers with S ) ).failed error3 <- executeSql( - parameterStorageBackend.initializeParameters( + backend.parameter.initializeParameters( ParameterStorageBackend.IdentityParams( ledgerId = otherLedgerId, participantId = otherParticipantId, @@ -57,7 +54,7 @@ private[backend] trait StorageBackendTestsInitialization extends Matchers with S ) ).failed _ <- executeSql( - parameterStorageBackend.initializeParameters( + backend.parameter.initializeParameters( ParameterStorageBackend.IdentityParams( ledgerId = ledgerId, participantId = participantId, diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsInitializeIngestion.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsInitializeIngestion.scala index 8946e4077f0a..ae4cf71a8abb 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsInitializeIngestion.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsInitializeIngestion.scala @@ -15,18 +15,6 @@ private[backend] trait StorageBackendTestsInitializeIngestion with StorageBackendSpec { this: AsyncFlatSpec => - private val parameterStorageBackend: ParameterStorageBackend = - backendFactory.createParameterStorageBackend - private val configurationStorageBackend: ConfigurationStorageBackend = - backendFactory.createConfigurationStorageBackend - private val partyStorageBackend: PartyStorageBackend = backendFactory.createPartyStorageBackend - private val 
packageStorageBackend: PackageStorageBackend = - backendFactory.createPackageStorageBackend - private val ingestionStorageBackend: IngestionStorageBackend[_] = - backendFactory.createIngestionStorageBackend - private val contractStorageBackend: ContractStorageBackend = - backendFactory.createContractStorageBackend - behavior of "StorageBackend (initializeIngestion)" import StorageBackendTestValues._ @@ -71,55 +59,55 @@ private[backend] trait StorageBackendTestsInitializeIngestion for { // Initialize - _ <- executeSql(parameterStorageBackend.initializeParameters(someIdentityParams)) + _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) // Start the indexer (a no-op in this case) - end1 <- executeSql(parameterStorageBackend.ledgerEnd) - _ <- executeSql(ingestionStorageBackend.deletePartiallyIngestedData(end1)) + end1 <- executeSql(backend.parameter.ledgerEnd) + _ <- executeSql(backend.ingestion.deletePartiallyIngestedData(end1)) // Fully insert first batch of updates _ <- executeSql(ingest(dtos1, _)) - _ <- executeSql(parameterStorageBackend.updateLedgerEnd(ledgerEnd(5, 3L))) + _ <- executeSql(updateLedgerEnd(ledgerEnd(5, 3L))) // Partially insert second batch of updates (indexer crashes before updating ledger end) _ <- executeSql(ingest(dtos2, _)) // Check the contents - parties1 <- executeSql(partyStorageBackend.knownParties) - config1 <- executeSql(configurationStorageBackend.ledgerConfiguration) - packages1 <- executeSql(packageStorageBackend.lfPackages) + parties1 <- executeSql(backend.party.knownParties) + config1 <- executeSql(backend.configuration.ledgerConfiguration) + packages1 <- executeSql(backend.packageBackend.lfPackages) contract41 <- executeSql( - contractStorageBackend.activeContractWithoutArgument( + backend.contract.activeContractWithoutArgument( readers, ContractId.V0.assertFromString("#4"), ) ) contract91 <- executeSql( - contractStorageBackend.activeContractWithoutArgument( + backend.contract.activeContractWithoutArgument( readers, ContractId.V0.assertFromString("#9"), ) ) // Restart the indexer - should delete data from the partial insert above - end2 <- executeSql(parameterStorageBackend.ledgerEnd) - _ <- executeSql(ingestionStorageBackend.deletePartiallyIngestedData(end2)) + end2 <- executeSql(backend.parameter.ledgerEnd) + _ <- executeSql(backend.ingestion.deletePartiallyIngestedData(end2)) // Move the ledger end so that any non-deleted data would become visible - _ <- executeSql(parameterStorageBackend.updateLedgerEnd(ledgerEnd(10, 6L))) + _ <- executeSql(updateLedgerEnd(ledgerEnd(10, 6L))) // Check the contents - parties2 <- executeSql(partyStorageBackend.knownParties) - config2 <- executeSql(configurationStorageBackend.ledgerConfiguration) - packages2 <- executeSql(packageStorageBackend.lfPackages) + parties2 <- executeSql(backend.party.knownParties) + config2 <- executeSql(backend.configuration.ledgerConfiguration) + packages2 <- executeSql(backend.packageBackend.lfPackages) contract42 <- executeSql( - contractStorageBackend.activeContractWithoutArgument( + backend.contract.activeContractWithoutArgument( readers, ContractId.V0.assertFromString("#4"), ) ) contract92 <- executeSql( - contractStorageBackend.activeContractWithoutArgument( + backend.contract.activeContractWithoutArgument( readers, ContractId.V0.assertFromString("#9"), ) diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsIntegrity.scala 
b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsIntegrity.scala index 3779f4d7cc30..73383f9f6bb2 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsIntegrity.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsIntegrity.scala @@ -9,11 +9,6 @@ import org.scalatest.matchers.should.Matchers private[backend] trait StorageBackendTestsIntegrity extends Matchers with StorageBackendSpec { this: AsyncFlatSpec => - private val parameterStorageBackend: ParameterStorageBackend = - backendFactory.createParameterStorageBackend - private val integrityStorageBackend: IntegrityStorageBackend = - backendFactory.createIntegrityStorageBackend - import StorageBackendTestValues._ behavior of "IntegrityStorageBackend" @@ -25,12 +20,12 @@ private[backend] trait StorageBackendTestsIntegrity extends Matchers with Storag ) for { - _ <- executeSql(parameterStorageBackend.initializeParameters(someIdentityParams)) + _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) _ <- executeSql(ingest(updates, _)) _ <- executeSql( - parameterStorageBackend.updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(7), 7L)) + updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(7), 7L)) ) - failure <- executeSql(integrityStorageBackend.verifyIntegrity()).failed + failure <- executeSql(backend.integrity.verifyIntegrity()).failed } yield { // Error message should contain the duplicate event sequential id failure.getMessage should include("7") @@ -44,12 +39,12 @@ private[backend] trait StorageBackendTestsIntegrity extends Matchers with Storag ) for { - _ <- executeSql(parameterStorageBackend.initializeParameters(someIdentityParams)) + _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) _ <- executeSql(ingest(updates, _)) _ <- executeSql( - parameterStorageBackend.updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(3), 3L)) + updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(3), 3L)) ) - failure <- executeSql(integrityStorageBackend.verifyIntegrity()).failed + failure <- executeSql(backend.integrity.verifyIntegrity()).failed } yield { failure.getMessage should include("consecutive") } @@ -65,12 +60,12 @@ private[backend] trait StorageBackendTestsIntegrity extends Matchers with Storag ) for { - _ <- executeSql(parameterStorageBackend.initializeParameters(someIdentityParams)) + _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) _ <- executeSql(ingest(updates, _)) _ <- executeSql( - parameterStorageBackend.updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(2), 2L)) + updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(2), 2L)) ) - _ <- executeSql(integrityStorageBackend.verifyIntegrity()) + _ <- executeSql(backend.integrity.verifyIntegrity()) } yield { succeed } diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsMigrationPruning.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsMigrationPruning.scala index 100397ed1006..4f170651fb84 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsMigrationPruning.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsMigrationPruning.scala @@ -17,12 +17,6 @@ private[backend] trait StorageBackendTestsMigrationPruning with 
StorageBackendSpec { this: AsyncFlatSpec => - private val parameterStorageBackend: ParameterStorageBackend = - backendFactory.createParameterStorageBackend - private val contractStorageBackend: ContractStorageBackend = - backendFactory.createContractStorageBackend - private val eventStorageBackend: EventStorageBackend = backendFactory.createEventStorageBackend - import StorageBackendTestValues._ it should "prune all divulgence events if pruning offset is after migration offset" in { @@ -34,15 +28,15 @@ private[backend] trait StorageBackendTestsMigrationPruning val archive = dtoExercise(offset(2), 3L, consuming = true, "#1", submitter) for { - _ <- executeSql(parameterStorageBackend.initializeParameters(someIdentityParams)) + _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) _ <- executeSql(ingest(Vector(create, divulgence, archive), _)) _ <- executeSql( - parameterStorageBackend.updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(2), 3L)) + updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(2), 3L)) ) // Simulate that the archive happened after the migration to append-only schema _ <- executeSql(updateMigrationHistoryTable(ledgerSequentialIdBefore = 2)) beforePruning <- executeSql( - contractStorageBackend.activeContractWithoutArgument( + backend.contract.activeContractWithoutArgument( Set(divulgee), ContractId.assertFromString("#1"), ) @@ -52,7 +46,7 @@ private[backend] trait StorageBackendTestsMigrationPruning // Trying to prune all divulged contracts before the migration should fail _ <- executeSql( - eventStorageBackend.isPruningOffsetValidAgainstMigration( + backend.event.isPruningOffsetValidAgainstMigration( offset(1), pruneAllDivulgedContracts = true, _, @@ -60,21 +54,21 @@ private[backend] trait StorageBackendTestsMigrationPruning ).map(_ shouldBe false) // Validation passes the pruning offset for all divulged contracts is after the migration _ <- executeSql( - eventStorageBackend.isPruningOffsetValidAgainstMigration( + backend.event.isPruningOffsetValidAgainstMigration( offset(2), pruneAllDivulgedContracts = true, _, ) ).map(_ shouldBe true) _ <- executeSql( - eventStorageBackend.pruneEvents(offset(2), pruneAllDivulgedContracts = true)( + backend.event.pruneEvents(offset(2), pruneAllDivulgedContracts = true)( _, loggingContext, ) ) // Ensure the divulged contract is not visible anymore afterPruning <- executeSql( - contractStorageBackend.activeContractWithoutArgument( + backend.contract.activeContractWithoutArgument( Set(divulgee), ContractId.assertFromString("#1"), ) diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsPruning.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsPruning.scala index bf6be8ab25e9..4cf9e764f7ee 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsPruning.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsPruning.scala @@ -12,14 +12,6 @@ import org.scalatest.matchers.should.Matchers private[backend] trait StorageBackendTestsPruning extends Matchers with StorageBackendSpec { this: AsyncFlatSpec => - private val parameterStorageBackend: ParameterStorageBackend = - backendFactory.createParameterStorageBackend - private val contractStorageBackend: ContractStorageBackend = - backendFactory.createContractStorageBackend - private val eventStorageBackend: EventStorageBackend = 
backendFactory.createEventStorageBackend - private val completionStorageBackend: CompletionStorageBackend = - backendFactory.createCompletionStorageBackend - behavior of "StorageBackend (pruning)" import StorageBackendTestValues._ @@ -29,17 +21,17 @@ private[backend] trait StorageBackendTestsPruning extends Matchers with StorageB val offset_2 = offset(2) val offset_3 = offset(4) for { - _ <- executeSql(parameterStorageBackend.initializeParameters(someIdentityParams)) - initialPruningOffset <- executeSql(parameterStorageBackend.prunedUpToInclusive) + _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) + initialPruningOffset <- executeSql(backend.parameter.prunedUpToInclusive) - _ <- executeSql(parameterStorageBackend.updatePrunedUptoInclusive(offset_1)) - updatedPruningOffset_1 <- executeSql(parameterStorageBackend.prunedUpToInclusive) + _ <- executeSql(backend.parameter.updatePrunedUptoInclusive(offset_1)) + updatedPruningOffset_1 <- executeSql(backend.parameter.prunedUpToInclusive) - _ <- executeSql(parameterStorageBackend.updatePrunedUptoInclusive(offset_2)) - updatedPruningOffset_2 <- executeSql(parameterStorageBackend.prunedUpToInclusive) + _ <- executeSql(backend.parameter.updatePrunedUptoInclusive(offset_2)) + updatedPruningOffset_2 <- executeSql(backend.parameter.prunedUpToInclusive) - _ <- executeSql(parameterStorageBackend.updatePrunedUptoInclusive(offset_3)) - updatedPruningOffset_3 <- executeSql(parameterStorageBackend.prunedUpToInclusive) + _ <- executeSql(backend.parameter.updatePrunedUptoInclusive(offset_3)) + updatedPruningOffset_3 <- executeSql(backend.parameter.prunedUpToInclusive) } yield { initialPruningOffset shouldBe empty updatedPruningOffset_1 shouldBe Some(offset_1) @@ -54,30 +46,30 @@ private[backend] trait StorageBackendTestsPruning extends Matchers with StorageB val offset_2 = offset(2) val offset_3 = offset(4) for { - _ <- executeSql(parameterStorageBackend.initializeParameters(someIdentityParams)) + _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) initialPruningOffset <- executeSql( - parameterStorageBackend.participantAllDivulgedContractsPrunedUpToInclusive + backend.parameter.participantAllDivulgedContractsPrunedUpToInclusive ) _ <- executeSql( - parameterStorageBackend.updatePrunedAllDivulgedContractsUpToInclusive(offset_1) + backend.parameter.updatePrunedAllDivulgedContractsUpToInclusive(offset_1) ) updatedPruningOffset_1 <- executeSql( - parameterStorageBackend.participantAllDivulgedContractsPrunedUpToInclusive + backend.parameter.participantAllDivulgedContractsPrunedUpToInclusive ) _ <- executeSql( - parameterStorageBackend.updatePrunedAllDivulgedContractsUpToInclusive(offset_2) + backend.parameter.updatePrunedAllDivulgedContractsUpToInclusive(offset_2) ) updatedPruningOffset_2 <- executeSql( - parameterStorageBackend.participantAllDivulgedContractsPrunedUpToInclusive + backend.parameter.participantAllDivulgedContractsPrunedUpToInclusive ) _ <- executeSql( - parameterStorageBackend.updatePrunedAllDivulgedContractsUpToInclusive(offset_3) + backend.parameter.updatePrunedAllDivulgedContractsUpToInclusive(offset_3) ) updatedPruningOffset_3 <- executeSql( - parameterStorageBackend.participantAllDivulgedContractsPrunedUpToInclusive + backend.parameter.participantAllDivulgedContractsPrunedUpToInclusive ) } yield { initialPruningOffset shouldBe empty @@ -107,34 +99,34 @@ private[backend] trait StorageBackendTestsPruning extends Matchers with StorageB val range = RangeParams(0L, 2L, None, None) val filter = 
FilterParams(Set(someParty), Set.empty) for { - _ <- executeSql(parameterStorageBackend.initializeParameters(someIdentityParams)) + _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) // Ingest a create and archive event _ <- executeSql(ingest(Vector(create, archive), _)) _ <- executeSql( - parameterStorageBackend.updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(2), 2L)) + updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(2), 2L)) ) // Make sure the events are visible - before1 <- executeSql(eventStorageBackend.transactionEvents(range, filter)) - before2 <- executeSql(eventStorageBackend.activeContractEvents(range, filter, offset(1))) - before3 <- executeSql(eventStorageBackend.flatTransaction(createTransactionId, filter)) - before4 <- executeSql(eventStorageBackend.transactionTreeEvents(range, filter)) - before5 <- executeSql(eventStorageBackend.transactionTree(createTransactionId, filter)) - before6 <- executeSql(eventStorageBackend.rawEvents(0, 2L)) + before1 <- executeSql(backend.event.transactionEvents(range, filter)) + before2 <- executeSql(backend.event.activeContractEvents(range, filter, offset(1))) + before3 <- executeSql(backend.event.flatTransaction(createTransactionId, filter)) + before4 <- executeSql(backend.event.transactionTreeEvents(range, filter)) + before5 <- executeSql(backend.event.transactionTree(createTransactionId, filter)) + before6 <- executeSql(backend.event.rawEvents(0, 2L)) // Prune _ <- executeSql( - eventStorageBackend.pruneEvents(offset(2), pruneAllDivulgedContracts = true)( + backend.event.pruneEvents(offset(2), pruneAllDivulgedContracts = true)( _, loggingContext, ) ) - _ <- executeSql(parameterStorageBackend.updatePrunedUptoInclusive(offset(2))) + _ <- executeSql(backend.parameter.updatePrunedUptoInclusive(offset(2))) // Make sure the events are not visible anymore - after1 <- executeSql(eventStorageBackend.transactionEvents(range, filter)) - after2 <- executeSql(eventStorageBackend.activeContractEvents(range, filter, offset(1))) - after3 <- executeSql(eventStorageBackend.flatTransaction(createTransactionId, filter)) - after4 <- executeSql(eventStorageBackend.transactionTreeEvents(range, filter)) - after5 <- executeSql(eventStorageBackend.transactionTree(createTransactionId, filter)) - after6 <- executeSql(eventStorageBackend.rawEvents(0, 2L)) + after1 <- executeSql(backend.event.transactionEvents(range, filter)) + after2 <- executeSql(backend.event.activeContractEvents(range, filter, offset(1))) + after3 <- executeSql(backend.event.flatTransaction(createTransactionId, filter)) + after4 <- executeSql(backend.event.transactionTreeEvents(range, filter)) + after5 <- executeSql(backend.event.transactionTree(createTransactionId, filter)) + after6 <- executeSql(backend.event.rawEvents(0, 2L)) } yield { before1 should not be empty before2 should not be empty @@ -166,34 +158,34 @@ private[backend] trait StorageBackendTestsPruning extends Matchers with StorageB val range = RangeParams(0L, 1L, None, None) val filter = FilterParams(Set(someParty), Set.empty) for { - _ <- executeSql(parameterStorageBackend.initializeParameters(someIdentityParams)) + _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) // Ingest a create and archive event _ <- executeSql(ingest(Vector(partyEntry, create), _)) _ <- executeSql( - parameterStorageBackend.updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(2), 1L)) + updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(2), 1L)) ) // Make sure the events are visible - 
before1 <- executeSql(eventStorageBackend.transactionEvents(range, filter)) - before2 <- executeSql(eventStorageBackend.activeContractEvents(range, filter, offset(2))) - before3 <- executeSql(eventStorageBackend.flatTransaction(createTransactionId, filter)) - before4 <- executeSql(eventStorageBackend.transactionTreeEvents(range, filter)) - before5 <- executeSql(eventStorageBackend.transactionTree(createTransactionId, filter)) - before6 <- executeSql(eventStorageBackend.rawEvents(0, 1L)) + before1 <- executeSql(backend.event.transactionEvents(range, filter)) + before2 <- executeSql(backend.event.activeContractEvents(range, filter, offset(2))) + before3 <- executeSql(backend.event.flatTransaction(createTransactionId, filter)) + before4 <- executeSql(backend.event.transactionTreeEvents(range, filter)) + before5 <- executeSql(backend.event.transactionTree(createTransactionId, filter)) + before6 <- executeSql(backend.event.rawEvents(0, 1L)) // Prune _ <- executeSql( - eventStorageBackend.pruneEvents(offset(2), pruneAllDivulgedContracts = true)( + backend.event.pruneEvents(offset(2), pruneAllDivulgedContracts = true)( _, loggingContext, ) ) - _ <- executeSql(parameterStorageBackend.updatePrunedUptoInclusive(offset(2))) + _ <- executeSql(backend.parameter.updatePrunedUptoInclusive(offset(2))) // Make sure the events are still visible - active contracts should not be pruned - after1 <- executeSql(eventStorageBackend.transactionEvents(range, filter)) - after2 <- executeSql(eventStorageBackend.activeContractEvents(range, filter, offset(2))) - after3 <- executeSql(eventStorageBackend.flatTransaction(createTransactionId, filter)) - after4 <- executeSql(eventStorageBackend.transactionTreeEvents(range, filter)) - after5 <- executeSql(eventStorageBackend.transactionTree(createTransactionId, filter)) - after6 <- executeSql(eventStorageBackend.rawEvents(0, 1L)) + after1 <- executeSql(backend.event.transactionEvents(range, filter)) + after2 <- executeSql(backend.event.activeContractEvents(range, filter, offset(2))) + after3 <- executeSql(backend.event.flatTransaction(createTransactionId, filter)) + after4 <- executeSql(backend.event.transactionTreeEvents(range, filter)) + after5 <- executeSql(backend.event.transactionTree(createTransactionId, filter)) + after6 <- executeSql(backend.event.rawEvents(0, 1L)) } yield { before1 should not be empty before2 should not be empty @@ -240,7 +232,7 @@ private[backend] trait StorageBackendTestsPruning extends Matchers with StorageB ) for { - _ <- executeSql(parameterStorageBackend.initializeParameters(someIdentityParams)) + _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) // Ingest _ <- executeSql( ingest( @@ -254,34 +246,34 @@ private[backend] trait StorageBackendTestsPruning extends Matchers with StorageB ) ) _ <- executeSql( - parameterStorageBackend.updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(4), 4L)) + updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(4), 4L)) ) contract1_beforePruning <- executeSql( - contractStorageBackend.activeContractWithoutArgument( + backend.contract.activeContractWithoutArgument( Set(divulgee), ContractId.assertFromString(contract1_id), ) ) contract2_beforePruning <- executeSql( - contractStorageBackend.activeContractWithoutArgument( + backend.contract.activeContractWithoutArgument( Set(divulgee), ContractId.assertFromString(contract2_id), ) ) _ <- executeSql( - eventStorageBackend.pruneEvents(offset(3), pruneAllDivulgedContracts = true)( + backend.event.pruneEvents(offset(3), 
pruneAllDivulgedContracts = true)( _, loggingContext, ) ) contract1_afterPruning <- executeSql( - contractStorageBackend.activeContractWithoutArgument( + backend.contract.activeContractWithoutArgument( Set(divulgee), ContractId.assertFromString(contract1_id), ) ) contract2_afterPruning <- executeSql( - contractStorageBackend.activeContractWithoutArgument( + backend.contract.activeContractWithoutArgument( Set(divulgee), ContractId.assertFromString(contract2_id), ) @@ -329,7 +321,7 @@ private[backend] trait StorageBackendTestsPruning extends Matchers with StorageB ) for { - _ <- executeSql(parameterStorageBackend.initializeParameters(someIdentityParams)) + _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) // Ingest _ <- executeSql( ingest( @@ -344,34 +336,34 @@ private[backend] trait StorageBackendTestsPruning extends Matchers with StorageB ) // Set the ledger end past the last ingested event so we can prune up to it inclusively _ <- executeSql( - parameterStorageBackend.updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(5), 5L)) + updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(5), 5L)) ) contract1_beforePruning <- executeSql( - contractStorageBackend.activeContractWithoutArgument( + backend.contract.activeContractWithoutArgument( Set(divulgee), ContractId.assertFromString(contract1_id), ) ) contract2_beforePruning <- executeSql( - contractStorageBackend.activeContractWithoutArgument( + backend.contract.activeContractWithoutArgument( Set(divulgee), ContractId.assertFromString(contract2_id), ) ) _ <- executeSql( - eventStorageBackend.pruneEvents(offset(4), pruneAllDivulgedContracts = false)( + backend.event.pruneEvents(offset(4), pruneAllDivulgedContracts = false)( _, loggingContext, ) ) contract1_afterPruning <- executeSql( - contractStorageBackend.activeContractWithoutArgument( + backend.contract.activeContractWithoutArgument( Set(divulgee), ContractId.assertFromString(contract1_id), ) ) contract2_afterPruning <- executeSql( - contractStorageBackend.activeContractWithoutArgument( + backend.contract.activeContractWithoutArgument( Set(divulgee), ContractId.assertFromString(contract2_id), ) @@ -398,15 +390,15 @@ private[backend] trait StorageBackendTestsPruning extends Matchers with StorageB ) val applicationId = dtoApplicationId(completion) for { - _ <- executeSql(parameterStorageBackend.initializeParameters(someIdentityParams)) + _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) // Ingest a completion _ <- executeSql(ingest(Vector(completion), _)) _ <- executeSql( - parameterStorageBackend.updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(1), 1L)) + updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(1), 1L)) ) // Make sure the completion is visible before <- executeSql( - completionStorageBackend.commandCompletions( + backend.completion.commandCompletions( offset(0), offset(1), applicationId, @@ -414,11 +406,11 @@ private[backend] trait StorageBackendTestsPruning extends Matchers with StorageB ) ) // Prune - _ <- executeSql(completionStorageBackend.pruneCompletions(offset(1))(_, loggingContext)) - _ <- executeSql(parameterStorageBackend.updatePrunedUptoInclusive(offset(1))) + _ <- executeSql(backend.completion.pruneCompletions(offset(1))(_, loggingContext)) + _ <- executeSql(backend.parameter.updatePrunedUptoInclusive(offset(1))) // Make sure the completion is not visible anymore after <- executeSql( - completionStorageBackend.commandCompletions( + backend.completion.commandCompletions( offset(0), offset(1), 
applicationId, diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsReset.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsReset.scala index 80d371304ba3..11449398516c 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsReset.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsReset.scala @@ -11,33 +11,20 @@ import scala.concurrent.Future private[backend] trait StorageBackendTestsReset extends Matchers with StorageBackendSpec { this: AsyncFlatSpec => - private val parameterStorageBackend: ParameterStorageBackend = - backendFactory.createParameterStorageBackend - private val configurationStorageBackend: ConfigurationStorageBackend = - backendFactory.createConfigurationStorageBackend - private val partyStorageBackend: PartyStorageBackend = backendFactory.createPartyStorageBackend - private val packageStorageBackend: PackageStorageBackend = - backendFactory.createPackageStorageBackend - private val contractStorageBackend: ContractStorageBackend = - backendFactory.createContractStorageBackend - private val stringInterningStorageBackend: StringInterningStorageBackend = - backendFactory.createStringInterningStorageBackend - private val resetStorageBackend: ResetStorageBackend = backendFactory.createResetStorageBackend - behavior of "StorageBackend (reset)" import StorageBackendTestValues._ it should "start with an empty index" in { for { - identity <- executeSql(parameterStorageBackend.ledgerIdentity) - end <- executeSql(parameterStorageBackend.ledgerEnd) - parties <- executeSql(partyStorageBackend.knownParties) - config <- executeSql(configurationStorageBackend.ledgerConfiguration) - packages <- executeSql(packageStorageBackend.lfPackages) - events <- executeSql(contractStorageBackend.contractStateEvents(0, Long.MaxValue)) + identity <- executeSql(backend.parameter.ledgerIdentity) + end <- executeSql(backend.parameter.ledgerEnd) + parties <- executeSql(backend.party.knownParties) + config <- executeSql(backend.configuration.ledgerConfiguration) + packages <- executeSql(backend.packageBackend.lfPackages) + events <- executeSql(backend.contract.contractStateEvents(0, Long.MaxValue)) stringInterningEntries <- executeSql( - stringInterningStorageBackend.loadStringInterningEntries(0, 1000) + backend.stringInterning.loadStringInterningEntries(0, 1000) ) } yield { identity shouldBe None @@ -53,9 +40,9 @@ private[backend] trait StorageBackendTestsReset extends Matchers with StorageBac it should "not see any data after advancing the ledger end" in { for { _ <- advanceLedgerEndToMakeOldDataVisible() - parties <- executeSql(partyStorageBackend.knownParties) - config <- executeSql(configurationStorageBackend.ledgerConfiguration) - packages <- executeSql(packageStorageBackend.lfPackages) + parties <- executeSql(backend.party.knownParties) + config <- executeSql(backend.configuration.ledgerConfiguration) + packages <- executeSql(backend.packageBackend.lfPackages) } yield { parties shouldBe empty packages shouldBe empty @@ -84,25 +71,25 @@ private[backend] trait StorageBackendTestsReset extends Matchers with StorageBac for { // Initialize and insert some data - _ <- executeSql(parameterStorageBackend.initializeParameters(someIdentityParams)) + _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) _ <- executeSql(ingest(dtos, _)) - _ <- 
executeSql(parameterStorageBackend.updateLedgerEnd(ledgerEnd(5, 3L))) + _ <- executeSql(updateLedgerEnd(ledgerEnd(5, 3L))) // Reset - _ <- executeSql(resetStorageBackend.reset) + _ <- executeSql(backend.reset.reset) // Check the contents - identity <- executeSql(parameterStorageBackend.ledgerIdentity) - end <- executeSql(parameterStorageBackend.ledgerEnd) - events <- executeSql(contractStorageBackend.contractStateEvents(0, Long.MaxValue)) + identity <- executeSql(backend.parameter.ledgerIdentity) + end <- executeSql(backend.parameter.ledgerEnd) + events <- executeSql(backend.contract.contractStateEvents(0, Long.MaxValue)) // Check the contents (queries that don't read beyond ledger end) _ <- advanceLedgerEndToMakeOldDataVisible() - parties <- executeSql(partyStorageBackend.knownParties) - config <- executeSql(configurationStorageBackend.ledgerConfiguration) - packages <- executeSql(packageStorageBackend.lfPackages) + parties <- executeSql(backend.party.knownParties) + config <- executeSql(backend.configuration.ledgerConfiguration) + packages <- executeSql(backend.packageBackend.lfPackages) stringInterningEntries <- executeSql( - stringInterningStorageBackend.loadStringInterningEntries(0, 1000) + backend.stringInterning.loadStringInterningEntries(0, 1000) ) } yield { identity shouldBe None @@ -135,25 +122,25 @@ private[backend] trait StorageBackendTestsReset extends Matchers with StorageBac for { // Initialize and insert some data - _ <- executeSql(parameterStorageBackend.initializeParameters(someIdentityParams)) + _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) _ <- executeSql(ingest(dtos, _)) - _ <- executeSql(parameterStorageBackend.updateLedgerEnd(ledgerEnd(5, 3L))) + _ <- executeSql(updateLedgerEnd(ledgerEnd(5, 3L))) // Reset - _ <- executeSql(resetStorageBackend.resetAll) + _ <- executeSql(backend.reset.resetAll) // Check the contents (queries that do not depend on ledger end) - identity <- executeSql(parameterStorageBackend.ledgerIdentity) - end <- executeSql(parameterStorageBackend.ledgerEnd) - events <- executeSql(contractStorageBackend.contractStateEvents(0, Long.MaxValue)) + identity <- executeSql(backend.parameter.ledgerIdentity) + end <- executeSql(backend.parameter.ledgerEnd) + events <- executeSql(backend.contract.contractStateEvents(0, Long.MaxValue)) // Check the contents (queries that don't read beyond ledger end) _ <- advanceLedgerEndToMakeOldDataVisible() - parties <- executeSql(partyStorageBackend.knownParties) - config <- executeSql(configurationStorageBackend.ledgerConfiguration) - packages <- executeSql(packageStorageBackend.lfPackages) + parties <- executeSql(backend.party.knownParties) + config <- executeSql(backend.configuration.ledgerConfiguration) + packages <- executeSql(backend.packageBackend.lfPackages) stringInterningEntries <- executeSql( - stringInterningStorageBackend.loadStringInterningEntries(0, 1000) + backend.stringInterning.loadStringInterningEntries(0, 1000) ) } yield { identity shouldBe None @@ -171,8 +158,8 @@ private[backend] trait StorageBackendTestsReset extends Matchers with StorageBac // queries now find any left-over data not cleaned by reset. 
private def advanceLedgerEndToMakeOldDataVisible(): Future[Unit] = { for { - _ <- executeSql(parameterStorageBackend.initializeParameters(someIdentityParams)) - _ <- executeSql(parameterStorageBackend.updateLedgerEnd(ledgerEnd(10000, 10000))) + _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) + _ <- executeSql(updateLedgerEnd(ledgerEnd(10000, 10000))) } yield () } } diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsStringInterning.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsStringInterning.scala index 5f9887493eed..f6c41017eb60 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsStringInterning.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsStringInterning.scala @@ -13,8 +13,6 @@ private[backend] trait StorageBackendTestsStringInterning with StorageBackendSpec { this: AsyncFlatSpec => - private val stringInterningStorageBackend = backendFactory.createStringInterningStorageBackend - behavior of "StorageBackend (StringInterning)" it should "store and load string-interning entries" in { @@ -27,18 +25,18 @@ private[backend] trait StorageBackendTestsStringInterning for { interningIdsBeforeBegin <- executeSql( - stringInterningStorageBackend.loadStringInterningEntries(0, 5) + backend.stringInterning.loadStringInterningEntries(0, 5) ) _ <- executeSql(ingest(dtos, _)) - interningIdsFull <- executeSql(stringInterningStorageBackend.loadStringInterningEntries(0, 5)) + interningIdsFull <- executeSql(backend.stringInterning.loadStringInterningEntries(0, 5)) interningIdsOverFetch <- executeSql( - stringInterningStorageBackend.loadStringInterningEntries(0, 10) + backend.stringInterning.loadStringInterningEntries(0, 10) ) interningIdsEmpty <- executeSql( - stringInterningStorageBackend.loadStringInterningEntries(5, 10) + backend.stringInterning.loadStringInterningEntries(5, 10) ) interningIdsSubset <- executeSql( - stringInterningStorageBackend.loadStringInterningEntries(3, 10) + backend.stringInterning.loadStringInterningEntries(3, 10) ) } yield { val expectedFullList = List( diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsTimestamps.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsTimestamps.scala index d65c26976fda..0140abfefb66 100644 --- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsTimestamps.scala +++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/backend/StorageBackendTestsTimestamps.scala @@ -15,12 +15,6 @@ import scala.util.Success private[backend] trait StorageBackendTestsTimestamps extends Matchers with StorageBackendSpec { this: AsyncFlatSpec => - private val parameterStorageBackend: ParameterStorageBackend = - backendFactory.createParameterStorageBackend - private val eventStorageBackend: EventStorageBackend = backendFactory.createEventStorageBackend - private val contractStorageBackend: ContractStorageBackend = - backendFactory.createContractStorageBackend - behavior of "StorageBackend (timestamps)" import StorageBackendTestValues._ @@ -35,19 +29,19 @@ private[backend] trait StorageBackendTestsTimestamps extends Matchers with Stora ledgerEffectiveTime = Some(let), ) for { - _ <- 
executeSql(parameterStorageBackend.initializeParameters(someIdentityParams)) + _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) _ <- executeSql(ingest(Vector(create), _)) _ <- executeSql( - parameterStorageBackend.updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(1), 1L)) + updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(1), 1L)) ) - let1 <- executeSql(contractStorageBackend.maximumLedgerTime(Set(cid))) + let1 <- executeSql(backend.contract.maximumLedgerTime(Set(cid))) let2 <- executeSql( - withDefaultTimeZone("GMT-1")(contractStorageBackend.maximumLedgerTime(Set(cid))) + withDefaultTimeZone("GMT-1")(backend.contract.maximumLedgerTime(Set(cid))) ) let3 <- executeSql( - withDefaultTimeZone("GMT+1")(contractStorageBackend.maximumLedgerTime(Set(cid))) + withDefaultTimeZone("GMT+1")(backend.contract.maximumLedgerTime(Set(cid))) ) } yield { withClue("UTC") { let1 shouldBe Success(Some(let)) } @@ -66,16 +60,16 @@ private[backend] trait StorageBackendTestsTimestamps extends Matchers with Stora ledgerEffectiveTime = Some(let), ) for { - _ <- executeSql(parameterStorageBackend.initializeParameters(someIdentityParams)) + _ <- executeSql(backend.parameter.initializeParameters(someIdentityParams)) _ <- executeSql(ingest(Vector(create), _)) _ <- executeSql( - parameterStorageBackend.updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(1), 1L)) + updateLedgerEnd(ParameterStorageBackend.LedgerEnd(offset(1), 1L)) ) - events1 <- executeSql(eventStorageBackend.rawEvents(0L, 1L)) - events2 <- executeSql(withDefaultTimeZone("GMT-1")(eventStorageBackend.rawEvents(0L, 1L))) - events3 <- executeSql(withDefaultTimeZone("GMT+1")(eventStorageBackend.rawEvents(0L, 1L))) + events1 <- executeSql(backend.event.rawEvents(0L, 1L)) + events2 <- executeSql(withDefaultTimeZone("GMT-1")(backend.event.rawEvents(0L, 1L))) + events3 <- executeSql(withDefaultTimeZone("GMT+1")(backend.event.rawEvents(0L, 1L))) } yield { withClue("UTC") { events1.head.ledgerEffectiveTime shouldBe Some(let) } withClue("GMT-1") { events2.head.ledgerEffectiveTime shouldBe Some(let) } diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/store/appendonlydao/events/PostCommitValidationSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/store/appendonlydao/events/PostCommitValidationSpec.scala index 984c620b1502..4e28e97672e0 100644 --- a/ledger/participant-integration-api/src/test/suite/scala/platform/store/appendonlydao/events/PostCommitValidationSpec.scala +++ b/ledger/participant-integration-api/src/test/suite/scala/platform/store/appendonlydao/events/PostCommitValidationSpec.scala @@ -10,15 +10,15 @@ import com.daml.lf.data.Time.Timestamp import com.daml.lf.transaction.GlobalKey import com.daml.lf.transaction.test.{TransactionBuilder => TxBuilder} import com.daml.lf.value.Value.ValueText -import com.daml.platform.store.backend.{ContractStorageBackend, PartyStorageBackend, StorageBackend} +import com.daml.platform.store.backend.{ContractStorageBackend, PartyStorageBackend} import com.daml.platform.store.entries.PartyLedgerEntry import com.daml.platform.store.interfaces.LedgerDaoContractsReader.KeyState import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec - import java.sql.Connection import java.time.Instant import java.util.UUID + import scala.util.{Failure, Success, Try} final class PostCommitValidationSpec extends AnyWordSpec with Matchers { @@ -547,10 +547,10 @@ object PostCommitValidationSpec { notImplemented() 
override def contractState(contractId: ContractId, before: Long)( connection: Connection - ): Option[StorageBackend.RawContractState] = notImplemented() + ): Option[ContractStorageBackend.RawContractState] = notImplemented() override def activeContractWithArgument(readers: Set[Ref.Party], contractId: ContractId)( connection: Connection - ): Option[StorageBackend.RawContract] = notImplemented() + ): Option[ContractStorageBackend.RawContract] = notImplemented() override def activeContractWithoutArgument(readers: Set[Ref.Party], contractId: ContractId)( connection: Connection ): Option[String] = notImplemented() @@ -559,7 +559,7 @@ object PostCommitValidationSpec { ): Option[ContractId] = notImplemented() override def contractStateEvents(startExclusive: Long, endInclusive: Long)( connection: Connection - ): Vector[StorageBackend.RawContractStateEvent] = notImplemented() + ): Vector[ContractStorageBackend.RawContractStateEvent] = notImplemented() override def partyEntries( startExclusive: Offset, diff --git a/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala b/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala index dc2b5ef92b2d..1ef06c3afb7a 100644 --- a/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala +++ b/ledger/recovering-indexer-integration-tests/src/test/suite/scala/com/digitalasset/platform/indexer/RecoveringIndexerIntegrationSpec.scala @@ -30,6 +30,7 @@ import com.daml.platform.indexer.RecoveringIndexerIntegrationSpec._ import com.daml.platform.server.api.validation.ErrorFactories import com.daml.platform.store.appendonlydao.{JdbcLedgerDao, LedgerReadDao} import com.daml.platform.store.LfValueTranslationCache +import com.daml.platform.store.cache.MutableLedgerEndCache import com.daml.platform.testing.LogCollector import com.daml.telemetry.{NoOpTelemetryContext, TelemetryContext} import com.daml.timer.RetryStrategy @@ -74,14 +75,7 @@ class RecoveringIndexerIntegrationSpec submissionId = randomSubmissionId(), ) .toScala - _ <- index.use { ledgerDao => - eventually { (_, _) => - ledgerDao - .listKnownParties() - .map(parties => parties.map(_.displayName)) - .map(_ shouldBe Seq(Some("Alice"))) - } - } + _ <- eventuallyPartiesShouldBe("Alice") } yield () } .map { _ => @@ -121,14 +115,7 @@ class RecoveringIndexerIntegrationSpec submissionId = randomSubmissionId(), ) .toScala - _ <- index.use { ledgerDao => - eventually { (_, _) => - ledgerDao - .listKnownParties() - .map(parties => parties.map(_.displayName)) - .map(_ shouldBe Seq(Some("Alice"), Some("Bob"), Some("Carol"))) - } - } + _ <- eventuallyPartiesShouldBe("Alice", "Bob", "Carol") } yield () } .map { _ => @@ -222,24 +209,46 @@ class RecoveringIndexerIntegrationSpec } yield participantState } - private def index(implicit loggingContext: LoggingContext): ResourceOwner[LedgerReadDao] = { + private def eventuallyPartiesShouldBe(partyNames: String*)(implicit + loggingContext: LoggingContext + ): Future[Unit] = + dao.use { case (ledgerDao, ledgerEndCache) => + eventually { (_, _) => + for { + ledgerEnd <- ledgerDao.lookupLedgerEndOffsetAndSequentialId() + _ = ledgerEndCache.set(ledgerEnd) + knownParties <- ledgerDao.listKnownParties() + } yield { + knownParties.map(_.displayName) shouldBe partyNames.map(Some(_)) + () + } + } + } + + private def dao(implicit + loggingContext: 
LoggingContext + ): ResourceOwner[(LedgerReadDao, MutableLedgerEndCache)] = { + val mutableLedgerEndCache = MutableLedgerEndCache() val jdbcUrl = s"jdbc:h2:mem:${getClass.getSimpleName.toLowerCase}-$testId;db_close_delay=-1;db_close_on_exit=false" val errorFactories: ErrorFactories = mock[ErrorFactories] - JdbcLedgerDao.readOwner( - serverRole = ServerRole.Testing(getClass), - jdbcUrl = jdbcUrl, - connectionPoolSize = 16, - connectionTimeout = 250.millis, - eventsPageSize = 100, - eventsProcessingParallelism = 8, - servicesExecutionContext = executionContext, - metrics = new Metrics(new MetricRegistry), - lfValueTranslationCache = LfValueTranslationCache.Cache.none, - enricher = None, - participantId = Ref.ParticipantId.assertFromString("RecoveringIndexerIntegrationSpec"), - errorFactories = errorFactories, - ) + JdbcLedgerDao + .readOwner( + serverRole = ServerRole.Testing(getClass), + jdbcUrl = jdbcUrl, + connectionPoolSize = 16, + connectionTimeout = 250.millis, + eventsPageSize = 100, + eventsProcessingParallelism = 8, + servicesExecutionContext = executionContext, + metrics = new Metrics(new MetricRegistry), + lfValueTranslationCache = LfValueTranslationCache.Cache.none, + enricher = None, + participantId = Ref.ParticipantId.assertFromString("RecoveringIndexerIntegrationSpec"), + errorFactories = errorFactories, + ledgerEndCache = mutableLedgerEndCache, + ) + .map(_ -> mutableLedgerEndCache) } }
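A closing note on the cache threading visible in the RecoveringIndexerIntegrationSpec change: the `dao` owner now yields the `MutableLedgerEndCache` alongside the `LedgerReadDao` because reads are bounded by whatever the cache reports, so the test must refresh the cache (via `ledgerEndCache.set(ledgerEnd)`) before calling `listKnownParties()`. A rough sketch of the contract implied by those call sites follows; it is not the real `com.daml.platform.store.cache` code, and the import path and method shapes are assumptions.

// Illustrative only -- approximates the (Offset, Long) pair used by
// lookupLedgerEndOffsetAndSequentialId() and ledgerEndCache.set(...) above.
import com.daml.ledger.offset.Offset // assumed import path

trait LedgerEndCacheSketch {
  def apply(): (Offset, Long) // (ledger-end offset, last event sequential id)
}

final class MutableLedgerEndCacheSketch(initial: (Offset, Long))
    extends LedgerEndCacheSketch {
  @volatile private var ledgerEnd: (Offset, Long) = initial
  def set(value: (Offset, Long)): Unit = ledgerEnd = value // written by the indexer or test
  override def apply(): (Offset, Long) = ledgerEnd          // consulted by read-side queries
}

The design consequence this PR relies on is that any component owning a read-only DAO must also own (or share) the cache the DAO was constructed with and keep it in sync, otherwise ledger-end-bounded queries will not see newly ingested data.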