Skip to content

Commit

Permalink
Wire LedgerEndCache to relevant StorageBackend factories [DPP-704] (#…
Browse files Browse the repository at this point in the history
…11549)

* Add ledgerEndCache dependency to relevant StorageBackend factory methods
* Wire ledgerEndCache to appropriate places
* REFACTORING: StorageBackendTests to instantiate TestBackend, wire ledgerEndCache update
* REFACTORING: RecoveringIndexerIntegrationSpec to simplify party existence, wire ledgerEndCache update
* REFACTORING: Remove StorageBackend super-trait, move StorageBackend DTO-s
* REFACTORING: Move common part of StorageBackendFactory to CommonStorageBackendFactory

changelog_begin
changelog_end
  • Loading branch information
nmarton-da authored Nov 4, 2021
1 parent 73cb42e commit 0e95ccb
Show file tree
Hide file tree
Showing 29 changed files with 491 additions and 520 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import com.daml.platform.store.appendonlydao.{
LedgerDaoTransactionsReader,
LedgerReadDao,
}
import com.daml.platform.store.cache.MutableLedgerEndCache
import com.daml.platform.store.cache.{LedgerEndCache, MutableLedgerEndCache}
import com.daml.platform.store.{BaseLedger, LfValueTranslationCache}
import com.daml.resources.ProgramResource.StartupException
import com.daml.timer.RetryStrategy
Expand Down Expand Up @@ -66,7 +66,8 @@ private[platform] object ReadOnlySqlLedger {
override def acquire()(implicit context: ResourceContext): Resource[ReadOnlySqlLedger] = {
val ledgerEndCache = MutableLedgerEndCache()
for {
ledgerDao <- ledgerDaoOwner(servicesExecutionContext, errorFactories).acquire()
ledgerDao <- ledgerDaoOwner(servicesExecutionContext, errorFactories, ledgerEndCache)
.acquire()
ledgerId <- Resource.fromFuture(verifyLedgerId(ledgerDao, initialLedgerId))
ledger <- ledgerOwner(ledgerDao, ledgerId, ledgerEndCache).acquire()
} yield ledger
Expand Down Expand Up @@ -142,6 +143,7 @@ private[platform] object ReadOnlySqlLedger {
private def ledgerDaoOwner(
servicesExecutionContext: ExecutionContext,
errorFactories: ErrorFactories,
ledgerEndCache: LedgerEndCache,
): ResourceOwner[LedgerReadDao] =
JdbcLedgerDao.readOwner(
serverRole,
Expand All @@ -156,6 +158,7 @@ private[platform] object ReadOnlySqlLedger {
Some(enricher),
participantId,
errorFactories,
ledgerEndCache,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import com.daml.metrics.Metrics
import com.daml.platform.ApiOffset
import com.daml.platform.configuration.ServerRole
import com.daml.platform.server.api.validation.ErrorFactories
import com.daml.platform.store.cache.MutableLedgerEndCache
import scalaz.Tag

import scala.concurrent.duration._
Expand Down Expand Up @@ -59,6 +60,7 @@ object IndexMetadata {
// and this property is not needed for the used ReadDao.
participantId = Ref.ParticipantId.assertFromString("1"),
errorFactories = errorFactories,
ledgerEndCache = MutableLedgerEndCache(), // not used
)

private val Empty = "<empty>"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import com.daml.platform.store.backend.{
StorageBackendFactory,
UpdateToDbDto,
}
import com.daml.platform.store.cache.MutableLedgerEndCache
import com.daml.platform.store.cache.{LedgerEndCache, MutableLedgerEndCache}
import com.daml.platform.store.entries.{
ConfigurationEntry,
LedgerEntry,
Expand Down Expand Up @@ -772,6 +772,7 @@ private[platform] object JdbcLedgerDao {
enricher: Option[ValueEnricher],
participantId: Ref.ParticipantId,
errorFactories: ErrorFactories,
ledgerEndCache: LedgerEndCache,
)(implicit loggingContext: LoggingContext): ResourceOwner[LedgerReadDao] =
owner(
serverRole,
Expand All @@ -788,6 +789,7 @@ private[platform] object JdbcLedgerDao {
participantId = participantId,
sequentialWriteDao = NoopSequentialWriteDao,
errorFactories = errorFactories,
ledgerEndCache = ledgerEndCache,
).map(new MeteredLedgerReadDao(_, metrics))

def writeOwner(
Expand Down Expand Up @@ -828,6 +830,7 @@ private[platform] object JdbcLedgerDao {
ledgerEndCache,
),
errorFactories = errorFactories,
ledgerEndCache = ledgerEndCache,
).map(new MeteredLedgerDao(_, metrics))
}

Expand Down Expand Up @@ -872,6 +875,7 @@ private[platform] object JdbcLedgerDao {
ledgerEndCache,
),
errorFactories = errorFactories,
ledgerEndCache = ledgerEndCache,
).map(new MeteredLedgerDao(_, metrics))
}

Expand Down Expand Up @@ -917,6 +921,7 @@ private[platform] object JdbcLedgerDao {
participantId: Ref.ParticipantId,
sequentialWriteDao: SequentialWriteDao,
errorFactories: ErrorFactories,
ledgerEndCache: LedgerEndCache,
)(implicit loggingContext: LoggingContext): ResourceOwner[LedgerDao] = {
val dbType = DbType.jdbcType(jdbcUrl)
val factory = StorageBackendFactory.of(dbType)
Expand All @@ -940,7 +945,7 @@ private[platform] object JdbcLedgerDao {
enricher,
sequentialWriteDao,
participantId,
StorageBackendFactory.readStorageBackendFor(dbType),
StorageBackendFactory.readStorageBackendFor(dbType, ledgerEndCache),
factory.createParameterStorageBackend,
factory.createDeduplicationStorageBackend,
factory.createResetStorageBackend,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import java.io.ByteArrayInputStream
import com.daml.platform.store.appendonlydao.events
import com.daml.platform.store.serialization.{Compression, ValueSerializer}
import com.daml.platform.store.LfValueTranslationCache
import com.daml.platform.store.backend.StorageBackend.RawContractStateEvent
import com.daml.platform.store.backend.ContractStorageBackend.RawContractStateEvent

import scala.util.control.NoStackTrace

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package com.daml.platform.store.appendonlydao.events
import java.io.ByteArrayInputStream

import com.daml.lf.data.Ref
import com.daml.platform.store.backend.StorageBackend.RawTransactionEvent
import com.daml.platform.store.backend.EventStorageBackend.RawTransactionEvent
import com.daml.platform.store.interfaces.TransactionLogUpdate
import com.daml.platform.store.serialization.{Compression, ValueSerializer}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import com.daml.platform
import com.daml.platform.store.EventSequentialId
import com.daml.platform.store.appendonlydao.events.{ContractId, EventsTable, Key, Raw}
import com.daml.platform.store.backend.EventStorageBackend.{FilterParams, RangeParams}
import com.daml.platform.store.backend.StorageBackend.RawTransactionEvent
import com.daml.platform.store.backend.postgresql.PostgresDataSourceConfig
import com.daml.platform.store.entries.{ConfigurationEntry, PackageLedgerEntry, PartyLedgerEntry}
import com.daml.platform.store.interfaces.LedgerDaoContractsReader.KeyState
Expand All @@ -31,24 +30,7 @@ import scala.util.Try
* Naming convention for the interface methods, which requiring Connection:
* - read operations are represented as nouns (plural, singular form indicates cardinality)
* - write operations are represented as verbs
*
* @tparam DB_BATCH Since parallel ingestion comes also with batching, this implementation specific type allows separation of the CPU intensive batching operation from the pure IO intensive insertBatch operation.
*/
trait StorageBackend[DB_BATCH]
extends IngestionStorageBackend[DB_BATCH]
with ParameterStorageBackend
with ConfigurationStorageBackend
with PartyStorageBackend
with PackageStorageBackend
with DeduplicationStorageBackend
with CompletionStorageBackend
with ContractStorageBackend
with EventStorageBackend
with DataSourceStorageBackend
with DBLockStorageBackend
with IntegrityStorageBackend
with ResetStorageBackend
with StringInterningStorageBackend

trait ResetStorageBackend {

Expand Down Expand Up @@ -224,10 +206,10 @@ trait ContractStorageBackend {
def keyState(key: Key, validAt: Long)(connection: Connection): KeyState
def contractState(contractId: ContractId, before: Long)(
connection: Connection
): Option[StorageBackend.RawContractState]
): Option[ContractStorageBackend.RawContractState]
def activeContractWithArgument(readers: Set[Ref.Party], contractId: ContractId)(
connection: Connection
): Option[StorageBackend.RawContract]
): Option[ContractStorageBackend.RawContract]
def activeContractWithoutArgument(readers: Set[Ref.Party], contractId: ContractId)(
connection: Connection
): Option[String]
Expand All @@ -236,7 +218,38 @@ trait ContractStorageBackend {
): Option[ContractId]
def contractStateEvents(startExclusive: Long, endInclusive: Long)(
connection: Connection
): Vector[StorageBackend.RawContractStateEvent]
): Vector[ContractStorageBackend.RawContractStateEvent]
}

object ContractStorageBackend {
case class RawContractState(
templateId: Option[String],
flatEventWitnesses: Set[Ref.Party],
createArgument: Option[Array[Byte]],
createArgumentCompression: Option[Int],
eventKind: Int,
ledgerEffectiveTime: Option[Timestamp],
)

class RawContract(
val templateId: String,
val createArgument: Array[Byte],
val createArgumentCompression: Option[Int],
)

case class RawContractStateEvent(
eventKind: Int,
contractId: ContractId,
templateId: Option[Ref.Identifier],
ledgerEffectiveTime: Option[Timestamp],
createKeyValue: Option[Array[Byte]],
createKeyCompression: Option[Int],
createArgument: Option[Array[Byte]],
createArgumentCompression: Option[Int],
flatEventWitnesses: Set[Ref.Party],
eventSequentialId: Long,
offset: Offset,
)
}

trait EventStorageBackend {
Expand Down Expand Up @@ -278,7 +291,7 @@ trait EventStorageBackend {
def maxEventSequentialIdOfAnObservableEvent(offset: Offset)(connection: Connection): Option[Long]
def rawEvents(startExclusive: Long, endInclusive: Long)(
connection: Connection
): Vector[RawTransactionEvent]
): Vector[EventStorageBackend.RawTransactionEvent]
}

object EventStorageBackend {
Expand All @@ -293,6 +306,37 @@ object EventStorageBackend {
wildCardParties: Set[Ref.Party],
partiesAndTemplates: Set[(Set[Ref.Party], Set[Ref.Identifier])],
)

case class RawTransactionEvent(
eventKind: Int,
transactionId: String,
nodeIndex: Int,
commandId: Option[String],
workflowId: Option[String],
eventId: EventId,
contractId: platform.store.appendonlydao.events.ContractId,
templateId: Option[platform.store.appendonlydao.events.Identifier],
ledgerEffectiveTime: Option[Timestamp],
createSignatories: Option[Array[String]],
createObservers: Option[Array[String]],
createAgreementText: Option[String],
createKeyValue: Option[Array[Byte]],
createKeyCompression: Option[Int],
createArgument: Option[Array[Byte]],
createArgumentCompression: Option[Int],
treeEventWitnesses: Set[String],
flatEventWitnesses: Set[String],
submitters: Set[String],
exerciseChoice: Option[String],
exerciseArgument: Option[Array[Byte]],
exerciseArgumentCompression: Option[Int],
exerciseResult: Option[Array[Byte]],
exerciseResultCompression: Option[Int],
exerciseActors: Option[Array[String]],
exerciseChildEventIds: Option[Array[String]],
eventSequentialId: Long,
offset: Offset,
) extends NeverEqualsOverride
}

trait DataSourceStorageBackend {
Expand Down Expand Up @@ -358,65 +402,3 @@ trait StringInterningStorageBackend {
connection: Connection
): Iterable[(Int, String)]
}

object StorageBackend {
case class RawContractState(
templateId: Option[String],
flatEventWitnesses: Set[Ref.Party],
createArgument: Option[Array[Byte]],
createArgumentCompression: Option[Int],
eventKind: Int,
ledgerEffectiveTime: Option[Timestamp],
)

class RawContract(
val templateId: String,
val createArgument: Array[Byte],
val createArgumentCompression: Option[Int],
)

case class RawContractStateEvent(
eventKind: Int,
contractId: ContractId,
templateId: Option[Ref.Identifier],
ledgerEffectiveTime: Option[Timestamp],
createKeyValue: Option[Array[Byte]],
createKeyCompression: Option[Int],
createArgument: Option[Array[Byte]],
createArgumentCompression: Option[Int],
flatEventWitnesses: Set[Ref.Party],
eventSequentialId: Long,
offset: Offset,
)

case class RawTransactionEvent(
eventKind: Int,
transactionId: String,
nodeIndex: Int,
commandId: Option[String],
workflowId: Option[String],
eventId: EventId,
contractId: platform.store.appendonlydao.events.ContractId,
templateId: Option[platform.store.appendonlydao.events.Identifier],
ledgerEffectiveTime: Option[Timestamp],
createSignatories: Option[Array[String]],
createObservers: Option[Array[String]],
createAgreementText: Option[String],
createKeyValue: Option[Array[Byte]],
createKeyCompression: Option[Int],
createArgument: Option[Array[Byte]],
createArgumentCompression: Option[Int],
treeEventWitnesses: Set[String],
flatEventWitnesses: Set[String],
submitters: Set[String],
exerciseChoice: Option[String],
exerciseArgument: Option[Array[Byte]],
exerciseArgumentCompression: Option[Int],
exerciseResult: Option[Array[Byte]],
exerciseResultCompression: Option[Int],
exerciseActors: Option[Array[String]],
exerciseChildEventIds: Option[Array[String]],
eventSequentialId: Long,
offset: Offset,
) extends NeverEqualsOverride
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,18 @@ import com.daml.platform.store.DbType
import com.daml.platform.store.backend.h2.H2StorageBackendFactory
import com.daml.platform.store.backend.oracle.OracleStorageBackendFactory
import com.daml.platform.store.backend.postgresql.PostgresStorageBackendFactory
import com.daml.platform.store.cache.LedgerEndCache

trait StorageBackendFactory {
def createIngestionStorageBackend: IngestionStorageBackend[_]
def createParameterStorageBackend: ParameterStorageBackend
def createConfigurationStorageBackend: ConfigurationStorageBackend
def createPartyStorageBackend: PartyStorageBackend
def createPackageStorageBackend: PackageStorageBackend
def createConfigurationStorageBackend(ledgerEndCache: LedgerEndCache): ConfigurationStorageBackend
def createPartyStorageBackend(ledgerEndCache: LedgerEndCache): PartyStorageBackend
def createPackageStorageBackend(ledgerEndCache: LedgerEndCache): PackageStorageBackend
def createDeduplicationStorageBackend: DeduplicationStorageBackend
def createCompletionStorageBackend: CompletionStorageBackend
def createContractStorageBackend: ContractStorageBackend
def createEventStorageBackend: EventStorageBackend
def createContractStorageBackend(ledgerEndCache: LedgerEndCache): ContractStorageBackend
def createEventStorageBackend(ledgerEndCache: LedgerEndCache): EventStorageBackend
def createDataSourceStorageBackend: DataSourceStorageBackend
def createDBLockStorageBackend: DBLockStorageBackend
def createIntegrityStorageBackend: IntegrityStorageBackend
Expand All @@ -33,15 +34,18 @@ object StorageBackendFactory {
case DbType.Oracle => OracleStorageBackendFactory
}

def readStorageBackendFor(dbType: DbType): ReadStorageBackend = {
def readStorageBackendFor(
dbType: DbType,
ledgerEndCache: LedgerEndCache,
): ReadStorageBackend = {
val factory = of(dbType)
ReadStorageBackend(
configurationStorageBackend = factory.createConfigurationStorageBackend,
partyStorageBackend = factory.createPartyStorageBackend,
packageStorageBackend = factory.createPackageStorageBackend,
configurationStorageBackend = factory.createConfigurationStorageBackend(ledgerEndCache),
partyStorageBackend = factory.createPartyStorageBackend(ledgerEndCache),
packageStorageBackend = factory.createPackageStorageBackend(ledgerEndCache),
completionStorageBackend = factory.createCompletionStorageBackend,
contractStorageBackend = factory.createContractStorageBackend,
eventStorageBackend = factory.createEventStorageBackend,
contractStorageBackend = factory.createContractStorageBackend(ledgerEndCache),
eventStorageBackend = factory.createEventStorageBackend(ledgerEndCache),
)
}
}
Expand Down
Loading

0 comments on commit 0e95ccb

Please sign in to comment.