Skip to content

Commit

Permalink
Refactoring store factories [DPP-709] (#11572)
Browse files Browse the repository at this point in the history
* Pulling up DbDispatcher factory form JdbcLedgerDao
* Adapt usage
* Moving ReadStorageBackend factory
* Minor SQL fixing (parameter join remnants)

changelog_begin
changelog_end
  • Loading branch information
nmarton-da authored Nov 8, 2021
1 parent 41fb289 commit 5dee88b
Show file tree
Hide file tree
Showing 12 changed files with 341 additions and 277 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ import com.daml.platform.common.{LedgerIdNotFoundException, MismatchException}
import com.daml.platform.configuration.ServerRole
import com.daml.platform.server.api.validation.ErrorFactories
import com.daml.platform.store.appendonlydao.{
DbDispatcher,
JdbcLedgerDao,
LedgerDaoTransactionsReader,
LedgerReadDao,
}
import com.daml.platform.store.backend.StorageBackendFactory
import com.daml.platform.store.cache.{LedgerEndCache, MutableLedgerEndCache}
import com.daml.platform.store.{BaseLedger, LfValueTranslationCache}
import com.daml.platform.store.{BaseLedger, DbType, LfValueTranslationCache}
import com.daml.resources.ProgramResource.StartupException
import com.daml.timer.RetryStrategy

Expand Down Expand Up @@ -65,9 +67,25 @@ private[platform] object ReadOnlySqlLedger {

override def acquire()(implicit context: ResourceContext): Resource[ReadOnlySqlLedger] = {
val ledgerEndCache = MutableLedgerEndCache()
val storageBackendFactory = StorageBackendFactory.of(DbType.jdbcType(jdbcUrl))
for {
ledgerDao <- ledgerDaoOwner(servicesExecutionContext, errorFactories, ledgerEndCache)
dbDispatcher <- DbDispatcher
.owner(
dataSource =
storageBackendFactory.createDataSourceStorageBackend.createDataSource(jdbcUrl),
serverRole = serverRole,
connectionPoolSize = databaseConnectionPoolSize,
connectionTimeout = databaseConnectionTimeout,
metrics = metrics,
)
.acquire()
ledgerDao = ledgerDaoOwner(
servicesExecutionContext,
errorFactories,
ledgerEndCache,
dbDispatcher,
storageBackendFactory,
)
ledgerId <- Resource.fromFuture(verifyLedgerId(ledgerDao, initialLedgerId))
ledger <- ledgerOwner(ledgerDao, ledgerId, ledgerEndCache).acquire()
} yield ledger
Expand Down Expand Up @@ -144,21 +162,21 @@ private[platform] object ReadOnlySqlLedger {
servicesExecutionContext: ExecutionContext,
errorFactories: ErrorFactories,
ledgerEndCache: LedgerEndCache,
): ResourceOwner[LedgerReadDao] =
JdbcLedgerDao.readOwner(
serverRole,
jdbcUrl,
databaseConnectionPoolSize,
databaseConnectionTimeout,
eventsPageSize,
eventsProcessingParallelism,
servicesExecutionContext,
metrics,
lfValueTranslationCache,
Some(enricher),
participantId,
errorFactories,
ledgerEndCache,
dbDispatcher: DbDispatcher,
storageBackendFactory: StorageBackendFactory,
): LedgerReadDao =
JdbcLedgerDao.read(
dbDispatcher = dbDispatcher,
eventsPageSize = eventsPageSize,
eventsProcessingParallelism = eventsProcessingParallelism,
servicesExecutionContext = servicesExecutionContext,
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,
enricher = Some(enricher),
participantId = participantId,
storageBackendFactory = storageBackendFactory,
ledgerEndCache = ledgerEndCache,
errorFactories = errorFactories,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ private[platform] sealed abstract class DbType(
if (supportsParallelWrites) maxConnections else 1
}

private[platform] object DbType {
object DbType {
object Postgres
extends DbType(
"postgres",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import com.daml.metrics.Metrics
import com.daml.platform.ApiOffset
import com.daml.platform.configuration.ServerRole
import com.daml.platform.server.api.validation.ErrorFactories
import com.daml.platform.store.appendonlydao.{DbDispatcher, JdbcLedgerDao}
import com.daml.platform.store.backend.StorageBackendFactory
import com.daml.platform.store.cache.MutableLedgerEndCache
import scalaz.Tag

Expand Down Expand Up @@ -44,24 +46,33 @@ object IndexMetadata {
)(implicit
executionContext: ExecutionContext,
loggingContext: LoggingContext,
) =
com.daml.platform.store.appendonlydao.JdbcLedgerDao.readOwner(
serverRole = ServerRole.ReadIndexMetadata,
jdbcUrl = jdbcUrl,
connectionPoolSize = 1,
connectionTimeout = 250.millis,
eventsPageSize = 1000,
eventsProcessingParallelism = 8,
servicesExecutionContext = executionContext,
metrics = new Metrics(new MetricRegistry),
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
enricher = None,
// No participant ID is available for the dump index meta path,
// and this property is not needed for the used ReadDao.
participantId = Ref.ParticipantId.assertFromString("1"),
errorFactories = errorFactories,
ledgerEndCache = MutableLedgerEndCache(), // not used
)
) = {
val storageBackendFactory = StorageBackendFactory.of(DbType.jdbcType(jdbcUrl))
val metrics = new Metrics(new MetricRegistry)
DbDispatcher
.owner(
dataSource = storageBackendFactory.createDataSourceStorageBackend.createDataSource(jdbcUrl),
serverRole = ServerRole.ReadIndexMetadata,
connectionPoolSize = 1,
connectionTimeout = 250.millis,
metrics = metrics,
)
.map(dbDispatcher =>
JdbcLedgerDao.read(
dbDispatcher = dbDispatcher,
eventsPageSize = 1000,
eventsProcessingParallelism = 8,
servicesExecutionContext = executionContext,
metrics = metrics,
lfValueTranslationCache = LfValueTranslationCache.Cache.none,
enricher = None,
participantId = Ref.ParticipantId.assertFromString("1"),
storageBackendFactory = storageBackendFactory,
ledgerEndCache = MutableLedgerEndCache(), // not used
errorFactories = errorFactories,
)
)
}

private val Empty = "<empty>"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ private[platform] final class DbDispatcher private (
}
}

private[platform] object DbDispatcher {
object DbDispatcher {
private val logger = ContextualizedLogger.get(this.getClass)

def owner(
Expand Down
Loading

0 comments on commit 5dee88b

Please sign in to comment.