Skip to content

Commit

Permalink
Implemented in-memory buffers prunning in BaseLedger (#9936)
Browse files Browse the repository at this point in the history
CHANGELOG_BEGIN
CHANGELOG_END
  • Loading branch information
tudor-da authored Jun 17, 2021
1 parent b96639e commit eee484b
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.lf.engine.ValueEnricher
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
import com.daml.platform.PruneBuffers
import com.daml.platform.akkastreams.dispatcher.Dispatcher
import com.daml.platform.common.{LedgerIdNotFoundException, MismatchException}
import com.daml.platform.configuration.ServerRole
Expand Down Expand Up @@ -161,13 +162,15 @@ private[index] abstract class ReadOnlySqlLedger(
ledgerDao: LedgerReadDao,
ledgerDaoTransactionsReader: LedgerDaoTransactionsReader,
contractStore: ContractStore,
pruneBuffers: PruneBuffers,
dispatcher: Dispatcher[Offset],
)(implicit mat: Materializer, loggingContext: LoggingContext)
extends BaseLedger(
ledgerId,
ledgerDao,
ledgerDaoTransactionsReader,
contractStore,
pruneBuffers,
dispatcher,
) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.lf.engine.ValueEnricher
import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics
import com.daml.platform.{PruneBuffers, PruneBuffersNoOp}
import com.daml.platform.akkastreams.dispatcher.Dispatcher
import com.daml.platform.akkastreams.dispatcher.SubSource.RangeSource
import com.daml.platform.index.ReadOnlySqlLedgerWithMutableCache.DispatcherLagMeter
Expand Down Expand Up @@ -137,6 +138,7 @@ private[index] object ReadOnlySqlLedgerWithMutableCache {
ledgerDao,
ledgerDao.transactionsReader,
contractStore,
PruneBuffersNoOp,
cacheUpdatesDispatcher,
generalDispatcher,
dispatcherLagMeter,
Expand All @@ -150,7 +152,6 @@ private[index] object ReadOnlySqlLedgerWithMutableCache {
generalDispatcher: Dispatcher[Offset],
dispatcherLagMeter: DispatcherLagMeter,
)(implicit resourceContext: ResourceContext) = {
// TODO: in-memory fan-out - wire-up prunning to transactionsBuffer
val transactionsBuffer = new EventsBuffer[Offset, TransactionLogUpdate](
maxBufferSize = maxTransactionsInMemoryFanOutBufferSize,
metrics = metrics,
Expand Down Expand Up @@ -185,9 +186,9 @@ private[index] object ReadOnlySqlLedgerWithMutableCache {
ledger <- ResourceOwner
.forCloseable(() =>
new ReadOnlySqlLedgerWithMutableCache(
ledgerId,
ledgerDao,
BufferedTransactionsReader(
ledgerId = ledgerId,
ledgerDao = ledgerDao,
ledgerDaoTransactionsReader = BufferedTransactionsReader(
delegate = ledgerDao.transactionsReader,
transactionsBuffer = transactionsBuffer,
lfValueTranslation = new LfValueTranslation(
Expand All @@ -199,10 +200,11 @@ private[index] object ReadOnlySqlLedgerWithMutableCache {
),
metrics = metrics,
),
contractStore,
cacheUpdatesDispatcher,
generalDispatcher,
dispatcherLagMeter,
pruneBuffers = transactionsBuffer.prune,
contractStore = contractStore,
contractStateEventsDispatcher = cacheUpdatesDispatcher,
dispatcher = generalDispatcher,
dispatcherLagger = dispatcherLagMeter,
)
)
.acquire()
Expand Down Expand Up @@ -251,6 +253,7 @@ private final class ReadOnlySqlLedgerWithMutableCache(
ledgerDao: LedgerReadDao,
ledgerDaoTransactionsReader: LedgerDaoTransactionsReader,
contractStore: MutableCacheBackedContractStore,
pruneBuffers: PruneBuffers,
contractStateEventsDispatcher: Dispatcher[(Offset, Long)],
dispatcher: Dispatcher[Offset],
dispatcherLagger: DispatcherLagMeter,
Expand All @@ -260,6 +263,7 @@ private final class ReadOnlySqlLedgerWithMutableCache(
ledgerDao,
ledgerDaoTransactionsReader,
contractStore,
pruneBuffers,
dispatcher,
) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ import com.daml.ledger.participant.state.index.v2.ContractStore
import com.daml.ledger.participant.state.v1.Offset
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.logging.LoggingContext
import com.daml.platform.PruneBuffersNoOp
import com.daml.platform.akkastreams.dispatcher.Dispatcher
import com.daml.platform.store.LfValueTranslationCache
import com.daml.platform.store.cache.TranslationCacheBackedContractStore
import com.daml.platform.store.dao.LedgerReadDao

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

private[index] object ReadOnlySqlLedgerWithTranslationCache {

Expand Down Expand Up @@ -73,6 +74,7 @@ private final class ReadOnlySqlLedgerWithTranslationCache(
ledgerDao,
ledgerDao.transactionsReader,
contractStore,
PruneBuffersNoOp,
dispatcher,
) {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml

import com.daml.ledger.participant.state.v1.Offset

/** Type aliases used throughout the package */
package object platform {
type PruneBuffers = Offset => Unit
val PruneBuffersNoOp: PruneBuffers = _ => ()
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import com.daml.lf.transaction.GlobalKey
import com.daml.lf.value.Value
import com.daml.lf.value.Value.{ContractId, ContractInst}
import com.daml.logging.LoggingContext
import com.daml.platform.PruneBuffers
import com.daml.platform.akkastreams.dispatcher.Dispatcher
import com.daml.platform.akkastreams.dispatcher.SubSource.RangeSource
import com.daml.platform.store.dao.{LedgerDaoTransactionsReader, LedgerReadDao}
Expand All @@ -46,6 +47,7 @@ private[platform] abstract class BaseLedger(
ledgerDao: LedgerReadDao,
transactionsReader: LedgerDaoTransactionsReader,
contractStore: ContractStore,
pruneBuffers: PruneBuffers,
dispatcher: Dispatcher[Offset],
) extends ReadOnlyLedger {

Expand Down Expand Up @@ -204,8 +206,10 @@ private[platform] abstract class BaseLedger(

override def prune(pruneUpToInclusive: Offset)(implicit
loggingContext: LoggingContext
): Future[Unit] =
): Future[Unit] = {
pruneBuffers(pruneUpToInclusive)
ledgerDao.prune(pruneUpToInclusive)
}

override def close(): Unit = ()
}
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ object MutableCacheBackedContractStore {

def owner(
contractsReader: LedgerDaoContractsReader,
signalNewLedgerHead: Offset => Unit,
signalNewLedgerHead: SignalNewLedgerHead,
metrics: Metrics,
maxContractsCacheSize: Long,
maxKeyCacheSize: Long,
Expand All @@ -325,7 +325,7 @@ object MutableCacheBackedContractStore {
def ownerWithSubscription(
subscribeToContractStateEvents: SubscribeToContractStateEvents,
contractsReader: LedgerDaoContractsReader,
signalNewLedgerHead: Offset => Unit,
signalNewLedgerHead: SignalNewLedgerHead,
metrics: Metrics,
maxContractsCacheSize: Long,
maxKeyCacheSize: Long,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.platform.store

import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.participant.state.v1.Offset
import com.daml.logging.LoggingContext
import com.daml.platform.PruneBuffers
import com.daml.platform.store.dao.LedgerReadDao
import org.mockito.MockitoSugar
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

class BaseLedgerSpec extends AnyWordSpec with MockitoSugar with Matchers {
private val ledgerDao = mock[LedgerReadDao]
private val pruneBuffers = mock[PruneBuffers]
private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting

private val baseLedger = new BaseLedger(
ledgerDao = ledgerDao,
pruneBuffers = pruneBuffers,
ledgerId = LedgerId("some ledger id"),
transactionsReader = null,
contractStore = null,
dispatcher = null,
) {}

classOf[BaseLedger].getSimpleName when {
"prune" should {
"trigger prune on the ledger read dao and the buffers" in {
val someOffset = Offset.fromByteArray(BigInt(1337L).toByteArray)
baseLedger.prune(someOffset)

verify(ledgerDao).prune(someOffset)
verify(pruneBuffers).apply(someOffset)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import com.daml.lf.transaction.TransactionCommitter
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
import com.daml.platform.ApiOffset.ApiOffsetConverter
import com.daml.platform.PruneBuffersNoOp
import com.daml.platform.akkastreams.dispatcher.Dispatcher
import com.daml.platform.common.{LedgerIdMode, MismatchException}
import com.daml.platform.configuration.ServerRole
Expand Down Expand Up @@ -362,7 +363,14 @@ private final class SqlLedger(
timeProvider: TimeProvider,
persistenceQueue: PersistenceQueue,
transactionCommitter: TransactionCommitter,
) extends BaseLedger(ledgerId, ledgerDao, ledgerDao.transactionsReader, contractStore, dispatcher)
) extends BaseLedger(
ledgerId,
ledgerDao,
ledgerDao.transactionsReader,
contractStore,
PruneBuffersNoOp,
dispatcher,
)
with Ledger {

private val logger = ContextualizedLogger.get(this.getClass)
Expand Down

0 comments on commit eee484b

Please sign in to comment.