Skip to content

Commit

Permalink
String interning read integration [DPP-706] (#11638)
Browse files Browse the repository at this point in the history
* Wire to read pipelines (initialization and update)
* Patch StringInterning through the appropriate StorageBackend-s
* Creating a StringInterning for tests (which interns everything it sees)

changelog_begin
changelog_end
  • Loading branch information
nmarton-da authored Nov 10, 2021
1 parent c5a1f0b commit fd61d0b
Show file tree
Hide file tree
Showing 22 changed files with 227 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ import com.daml.platform.store.appendonlydao.{
}
import com.daml.platform.store.backend.StorageBackendFactory
import com.daml.platform.store.cache.{LedgerEndCache, MutableLedgerEndCache}
import com.daml.platform.store.interning.{
StringInterning,
StringInterningView,
UpdatingStringInterningView,
}
import com.daml.platform.store.{BaseLedger, DbType, LfValueTranslationCache}
import com.daml.resources.ProgramResource.StartupException
import com.daml.timer.RetryStrategy
Expand Down Expand Up @@ -79,27 +84,41 @@ private[platform] object ReadOnlySqlLedger {
metrics = metrics,
)
.acquire()
ledgerDao = ledgerDaoOwner(
stringInterningStorageBackend = storageBackendFactory.createStringInterningStorageBackend
stringInterningView = new StringInterningView(
loadPrefixedEntries = (fromExclusive, toInclusive) =>
implicit loggingContext =>
dbDispatcher.executeSql(metrics.daml.index.db.loadStringInterningEntries) {
stringInterningStorageBackend.loadStringInterningEntries(
fromExclusive,
toInclusive,
)
}
)
ledgerDao = createLedgerDao(
servicesExecutionContext,
errorFactories,
ledgerEndCache,
stringInterningView,
dbDispatcher,
storageBackendFactory,
)
ledgerId <- Resource.fromFuture(verifyLedgerId(ledgerDao, initialLedgerId))
ledger <- ledgerOwner(ledgerDao, ledgerId, ledgerEndCache).acquire()
ledger <- ledgerOwner(ledgerDao, ledgerId, ledgerEndCache, stringInterningView).acquire()
} yield ledger
}

private def ledgerOwner(
ledgerDao: LedgerReadDao,
ledgerId: LedgerId,
ledgerEndCache: MutableLedgerEndCache,
updatingStringInterningView: UpdatingStringInterningView,
) =
if (enableMutableContractStateCache) {
new ReadOnlySqlLedgerWithMutableCache.Owner(
ledgerDao,
ledgerEndCache,
updatingStringInterningView,
enricher,
ledgerId,
metrics,
Expand All @@ -113,6 +132,7 @@ private[platform] object ReadOnlySqlLedger {
new ReadOnlySqlLedgerWithTranslationCache.Owner(
ledgerDao,
ledgerEndCache,
updatingStringInterningView,
ledgerId,
lfValueTranslationCache,
)
Expand Down Expand Up @@ -158,10 +178,11 @@ private[platform] object ReadOnlySqlLedger {
}
}

private def ledgerDaoOwner(
private def createLedgerDao(
servicesExecutionContext: ExecutionContext,
errorFactories: ErrorFactories,
ledgerEndCache: LedgerEndCache,
stringInterning: StringInterning,
dbDispatcher: DbDispatcher,
storageBackendFactory: StorageBackendFactory,
): LedgerReadDao =
Expand All @@ -176,6 +197,7 @@ private[platform] object ReadOnlySqlLedger {
participantId = participantId,
storageBackendFactory = storageBackendFactory,
ledgerEndCache = ledgerEndCache,
stringInterning = stringInterning,
errorFactories = errorFactories,
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import com.daml.platform.store.cache.{
MutableLedgerEndCache,
}
import com.daml.platform.store.interfaces.TransactionLogUpdate
import com.daml.platform.store.interning.UpdatingStringInterningView
import com.daml.scalautil.Statement.discard

import scala.collection.mutable
Expand All @@ -39,6 +40,7 @@ private[index] object ReadOnlySqlLedgerWithMutableCache {
final class Owner(
ledgerDao: LedgerReadDao,
ledgerEndCache: MutableLedgerEndCache,
updatingStringInterningView: UpdatingStringInterningView,
enricher: ValueEnricher,
ledgerId: LedgerId,
metrics: Metrics,
Expand All @@ -59,6 +61,9 @@ private[index] object ReadOnlySqlLedgerWithMutableCache {
ledgerDao.lookupLedgerEnd()
)
_ = ledgerEndCache.set((ledgerEnd.lastOffset, ledgerEnd.lastEventSeqId))
_ <- Resource.fromFuture(
updatingStringInterningView.update(ledgerEnd.lastStringInterningId)
)
prefetchingDispatcher <- dispatcherOffsetSeqIdOwner(
ledgerEnd.lastOffset,
ledgerEnd.lastEventSeqId,
Expand Down Expand Up @@ -154,6 +159,7 @@ private[index] object ReadOnlySqlLedgerWithMutableCache {
cacheUpdatesDispatcher,
generalDispatcher,
dispatcherLagMeter,
updatingStringInterningView,
)
)
} yield ledger
Expand Down Expand Up @@ -226,6 +232,7 @@ private[index] object ReadOnlySqlLedgerWithMutableCache {
contractStateEventsDispatcher = cacheUpdatesDispatcher,
dispatcher = generalDispatcher,
dispatcherLagger = dispatcherLagMeter,
updatingStringInterningView = updatingStringInterningView,
)
)
} yield ledger
Expand Down Expand Up @@ -277,6 +284,7 @@ private final class ReadOnlySqlLedgerWithMutableCache(
contractStateEventsDispatcher: Dispatcher[(Offset, Long)],
dispatcher: Dispatcher[Offset],
dispatcherLagger: DispatcherLagMeter,
updatingStringInterningView: UpdatingStringInterningView,
)(implicit mat: Materializer, loggingContext: LoggingContext)
extends ReadOnlySqlLedger(
ledgerId,
Expand All @@ -294,7 +302,14 @@ private final class ReadOnlySqlLedgerWithMutableCache(
)(() =>
Source
.tick(0.millis, 100.millis, ())
.mapAsync(1)(_ => ledgerDao.lookupLedgerEnd())
.mapAsync(1) {
implicit val ec: ExecutionContext = mat.executionContext
_ =>
for {
ledgerEnd <- ledgerDao.lookupLedgerEnd()
_ <- updatingStringInterningView.update(ledgerEnd.lastStringInterningId)
} yield ledgerEnd
}
)
.viaMat(KillSwitches.single)(Keep.right[NotUsed, UniqueKillSwitch])
.toMat(Sink.foreach { case newLedgerHead =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,17 @@ import com.daml.platform.akkastreams.dispatcher.Dispatcher
import com.daml.platform.store.LfValueTranslationCache
import com.daml.platform.store.appendonlydao.LedgerReadDao
import com.daml.platform.store.cache.{MutableLedgerEndCache, TranslationCacheBackedContractStore}
import com.daml.platform.store.interning.UpdatingStringInterningView

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.concurrent.{Await, ExecutionContext, Future}

private[index] object ReadOnlySqlLedgerWithTranslationCache {

final class Owner(
ledgerDao: LedgerReadDao,
ledgerEndCache: MutableLedgerEndCache,
updatingStringInterningView: UpdatingStringInterningView,
ledgerId: LedgerId,
lfValueTranslationCache: LfValueTranslationCache.Cache,
)(implicit mat: Materializer, loggingContext: LoggingContext)
Expand All @@ -50,6 +52,7 @@ private[index] object ReadOnlySqlLedgerWithTranslationCache {
ledgerEndCache,
contractsStore,
dispatcher,
updatingStringInterningView,
)
)

Expand All @@ -72,6 +75,7 @@ private final class ReadOnlySqlLedgerWithTranslationCache(
ledgerEndCache: MutableLedgerEndCache,
contractStore: ContractStore,
dispatcher: Dispatcher[Offset],
updatingStringInterningView: UpdatingStringInterningView,
)(implicit mat: Materializer, loggingContext: LoggingContext)
extends ReadOnlySqlLedger(
ledgerId,
Expand All @@ -89,7 +93,14 @@ private final class ReadOnlySqlLedgerWithTranslationCache(
)(() =>
Source
.tick(0.millis, 100.millis, ())
.mapAsync(1)(_ => ledgerDao.lookupLedgerEnd())
.mapAsync(1) {
implicit val ec: ExecutionContext = mat.executionContext
_ =>
for {
ledgerEnd <- ledgerDao.lookupLedgerEnd()
_ <- updatingStringInterningView.update(ledgerEnd.lastStringInterningId)
} yield ledgerEnd
}
)
.viaMat(KillSwitches.single)(Keep.right[NotUsed, UniqueKillSwitch])
.toMat(Sink.foreach { ledgerEnd =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import com.daml.platform.server.api.validation.ErrorFactories
import com.daml.platform.store.appendonlydao.{DbDispatcher, JdbcLedgerDao}
import com.daml.platform.store.backend.StorageBackendFactory
import com.daml.platform.store.cache.MutableLedgerEndCache
import com.daml.platform.store.interning.StringInterningView
import scalaz.Tag

import scala.concurrent.duration._
Expand Down Expand Up @@ -67,9 +68,11 @@ object IndexMetadata {
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
enricher = None,
participantId = Ref.ParticipantId.assertFromString("1"),
errorFactories = errorFactories,
storageBackendFactory = storageBackendFactory,
ledgerEndCache = MutableLedgerEndCache(), // not used
errorFactories = errorFactories,
stringInterning =
new StringInterningView((_, _) => _ => Future.successful(Nil)), // not used
)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import com.daml.platform.store.entries.{
PackageLedgerEntry,
PartyLedgerEntry,
}
import com.daml.platform.store.interning.StringInterning

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
Expand Down Expand Up @@ -759,6 +760,7 @@ private[platform] object JdbcLedgerDao {
errorFactories: ErrorFactories,
storageBackendFactory: StorageBackendFactory,
ledgerEndCache: LedgerEndCache,
stringInterning: StringInterning,
): LedgerReadDao =
new MeteredLedgerReadDao(
new JdbcLedgerDao(
Expand All @@ -773,7 +775,7 @@ private[platform] object JdbcLedgerDao {
enricher,
SequentialWriteDao.noop,
participantId,
storageBackendFactory.readStorageBackend(ledgerEndCache),
storageBackendFactory.readStorageBackend(ledgerEndCache, stringInterning),
storageBackendFactory.createParameterStorageBackend,
storageBackendFactory.createDeduplicationStorageBackend,
storageBackendFactory.createResetStorageBackend,
Expand All @@ -795,6 +797,7 @@ private[platform] object JdbcLedgerDao {
errorFactories: ErrorFactories,
storageBackendFactory: StorageBackendFactory,
ledgerEndCache: LedgerEndCache,
stringInterning: StringInterning,
): LedgerDao =
new MeteredLedgerDao(
new JdbcLedgerDao(
Expand All @@ -809,7 +812,7 @@ private[platform] object JdbcLedgerDao {
enricher,
sequentialWriteDao,
participantId,
storageBackendFactory.readStorageBackend(ledgerEndCache),
storageBackendFactory.readStorageBackend(ledgerEndCache, stringInterning),
storageBackendFactory.createParameterStorageBackend,
storageBackendFactory.createDeduplicationStorageBackend,
storageBackendFactory.createResetStorageBackend,
Expand All @@ -832,6 +835,7 @@ private[platform] object JdbcLedgerDao {
errorFactories: ErrorFactories,
storageBackendFactory: StorageBackendFactory,
ledgerEndCache: LedgerEndCache,
stringInterning: StringInterning,
): LedgerDao =
new MeteredLedgerDao(
new JdbcLedgerDao(
Expand All @@ -846,7 +850,7 @@ private[platform] object JdbcLedgerDao {
enricher,
sequentialWriteDao,
participantId,
storageBackendFactory.readStorageBackend(ledgerEndCache),
storageBackendFactory.readStorageBackend(ledgerEndCache, stringInterning),
storageBackendFactory.createParameterStorageBackend,
storageBackendFactory.createDeduplicationStorageBackend,
storageBackendFactory.createResetStorageBackend,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.daml.platform.store.backend.h2.H2StorageBackendFactory
import com.daml.platform.store.backend.oracle.OracleStorageBackendFactory
import com.daml.platform.store.backend.postgresql.PostgresStorageBackendFactory
import com.daml.platform.store.cache.LedgerEndCache
import com.daml.platform.store.interning.StringInterning

trait StorageBackendFactory {
def createIngestionStorageBackend: IngestionStorageBackend[_]
Expand All @@ -16,23 +17,32 @@ trait StorageBackendFactory {
def createPartyStorageBackend(ledgerEndCache: LedgerEndCache): PartyStorageBackend
def createPackageStorageBackend(ledgerEndCache: LedgerEndCache): PackageStorageBackend
def createDeduplicationStorageBackend: DeduplicationStorageBackend
def createCompletionStorageBackend: CompletionStorageBackend
def createContractStorageBackend(ledgerEndCache: LedgerEndCache): ContractStorageBackend
def createEventStorageBackend(ledgerEndCache: LedgerEndCache): EventStorageBackend
def createCompletionStorageBackend(stringInterning: StringInterning): CompletionStorageBackend
def createContractStorageBackend(
ledgerEndCache: LedgerEndCache,
stringInterning: StringInterning,
): ContractStorageBackend
def createEventStorageBackend(
ledgerEndCache: LedgerEndCache,
stringInterning: StringInterning,
): EventStorageBackend
def createDataSourceStorageBackend: DataSourceStorageBackend
def createDBLockStorageBackend: DBLockStorageBackend
def createIntegrityStorageBackend: IntegrityStorageBackend
def createResetStorageBackend: ResetStorageBackend
def createStringInterningStorageBackend: StringInterningStorageBackend

final def readStorageBackend(ledgerEndCache: LedgerEndCache): ReadStorageBackend =
final def readStorageBackend(
ledgerEndCache: LedgerEndCache,
stringInterning: StringInterning,
): ReadStorageBackend =
ReadStorageBackend(
configurationStorageBackend = createConfigurationStorageBackend(ledgerEndCache),
partyStorageBackend = createPartyStorageBackend(ledgerEndCache),
packageStorageBackend = createPackageStorageBackend(ledgerEndCache),
completionStorageBackend = createCompletionStorageBackend,
contractStorageBackend = createContractStorageBackend(ledgerEndCache),
eventStorageBackend = createEventStorageBackend(ledgerEndCache),
completionStorageBackend = createCompletionStorageBackend(stringInterning),
contractStorageBackend = createContractStorageBackend(ledgerEndCache, stringInterning),
eventStorageBackend = createEventStorageBackend(ledgerEndCache, stringInterning),
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.daml.platform.store.backend.common

import java.sql.Connection

import anorm.SqlParser.{byteArray, int, long, str}
import anorm.{Row, RowParser, SimpleSql, ~}
import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
Expand All @@ -17,12 +18,15 @@ import com.daml.platform.store.CompletionFromTransaction
import com.daml.platform.store.Conversions.{offset, timestampFromMicros}
import com.daml.platform.store.backend.CompletionStorageBackend
import com.daml.platform.store.backend.common.ComposableQuery.SqlStringInterpolation
import com.daml.platform.store.interning.StringInterning
import com.google.protobuf.any
import com.google.rpc.status.{Status => StatusProto}

class CompletionStorageBackendTemplate(queryStrategy: QueryStrategy)
extends CompletionStorageBackend {

class CompletionStorageBackendTemplate(
queryStrategy: QueryStrategy,
stringInterning: StringInterning,
) extends CompletionStorageBackend {
assert(stringInterning != null) // TODO remove
private val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass)

override def commandCompletions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@ import com.daml.platform.store.interfaces.LedgerDaoContractsReader.{
KeyState,
KeyUnassigned,
}
import com.daml.platform.store.interning.StringInterning

import scala.util.{Failure, Success, Try}

class ContractStorageBackendTemplate(
queryStrategy: QueryStrategy,
ledgerEndCache: LedgerEndCache,
stringInterning: StringInterning,
) extends ContractStorageBackend {

assert(stringInterning != null) // TODO remove
override def contractKeyGlobally(key: Key)(connection: Connection): Option[ContractId] =
contractKey(
resultColumns = List("contract_id"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,21 @@ import com.daml.platform.store.backend.EventStorageBackend.{FilterParams, RangeP
import com.daml.platform.store.backend.EventStorageBackend.RawTransactionEvent
import com.daml.platform.store.backend.common.ComposableQuery.{CompositeSql, SqlStringInterpolation}
import com.daml.platform.store.cache.LedgerEndCache
import com.daml.platform.store.interning.StringInterning

import scala.collection.compat.immutable.ArraySeq

abstract class EventStorageBackendTemplate(
eventStrategy: EventStrategy,
queryStrategy: QueryStrategy,
ledgerEndCache: LedgerEndCache,
stringInterning: StringInterning,
// TODO Refactoring: This method is needed in pruneEvents, but belongs to [[ParameterStorageBackend]].
// Remove with the break-out of pruneEvents.
participantAllDivulgedContractsPrunedUpToInclusive: Connection => Option[Offset],
) extends EventStorageBackend {
import com.daml.platform.store.Conversions.ArrayColumnToStringArray.arrayColumnToStringArray

assert(stringInterning != null) // TODO remove
private val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass)

private val selectColumnsForFlatTransactions =
Expand Down
Loading

0 comments on commit fd61d0b

Please sign in to comment.