Wiring of HaCoordinator in parallel indexer
changelog_begin
changelog_end
nmarton-da committed Jul 10, 2021 · 1 parent b0553c6 · commit 81b0f1f
Showing 3 changed files with 106 additions and 50 deletions.
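In short, the commit threads an HaConfig from IndexerConfig through JdbcIndexer into ParallelIndexerFactory, which now runs ingestion inside an HaCoordinator-protected block: database-lock based on backends that support it, a no-op otherwise. The sketch below shows the contract this wiring relies on; the trait and member names are inferred from the call sites in this diff and may differ from the real definitions in com.daml.platform.indexer.ha.

import java.sql.Connection
import akka.stream.KillSwitch
import scala.concurrent.Future

// A leader-elected block: the coordinator runs `block` only while this node
// holds the database lock (NoopHaCoordinator simply runs it unconditionally).
trait HaCoordinator {
  def protectedBlock(block: ConnectionInitializer => Future[Handle]): Handle
}

// Passed into the protected block so every pooled connection can be marked
// ("signed") as belonging to the current leader.
trait ConnectionInitializer {
  def sign(connection: Connection): Unit
}

// What the protected block hands back: a way to stop the work, and a future
// that completes once the work has terminated.
case class Handle(completed: Future[Unit], killSwitch: KillSwitch)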
IndexerConfig.scala
@@ -6,6 +6,7 @@ package com.daml.platform.indexer
import com.daml.ledger.participant.state.v1.ParticipantId
import com.daml.platform.configuration.IndexConfiguration
import com.daml.platform.indexer.IndexerConfig._
import com.daml.platform.indexer.ha.HaConfig
import com.daml.platform.store.DbType

import scala.concurrent.duration.{DurationInt, FiniteDuration}
@@ -23,6 +24,7 @@ case class IndexerConfig(
allowExistingSchema: Boolean = false,
// TODO append-only: remove after removing support for the current (mutating) schema
enableAppendOnlySchema: Boolean = false,
// TODO append-only: this is now configuring only the append-only pool
asyncCommitMode: DbType.AsyncCommitMode = DefaultAsyncCommitMode,
maxInputBufferSize: Int = DefaultMaxInputBufferSize,
inputMappingParallelism: Int = DefaultInputMappingParallelism,
@@ -32,6 +34,7 @@
tailingRateLimitPerSecond: Int = DefaultTailingRateLimitPerSecond,
batchWithinMillis: Long = DefaultBatchWithinMillis,
enableCompression: Boolean = DefaultEnableCompression,
haConfig: HaConfig = HaConfig(),
)

object IndexerConfig {
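For illustration, a minimal configuration enabling the new field might look as follows. The participantId and jdbcUrl fields are assumptions about the parts of the case class not shown in this hunk, and HaConfig() falls back to its defaults:

val indexerConfig = IndexerConfig(
  participantId = ParticipantId.assertFromString("example-participant"), // assumed field
  jdbcUrl = "jdbc:postgresql://localhost:5432/ledger",                   // assumed field
  enableAppendOnlySchema = true, // the HA wiring applies to the append-only pipeline
  haConfig = HaConfig(),         // new in this commit; defaults shown
)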
JdbcIndexer.scala
@@ -20,7 +20,16 @@ import com.daml.platform.common
import com.daml.platform.common.MismatchException
import com.daml.platform.configuration.ServerRole
import com.daml.platform.indexer.parallel.ParallelIndexerFactory
import com.daml.platform.store.DbType.{
AsynchronousCommit,
LocalSynchronousCommit,
SynchronousCommit,
}
import com.daml.platform.store.appendonlydao.events.{CompressionStrategy, LfValueTranslation}
import com.daml.platform.store.backend.DataSourceStorageBackend.{
DataSourceConfig,
PgSynchronousCommitValue,
}
import com.daml.platform.store.backend.StorageBackend
import com.daml.platform.store.dao.LedgerDao
import com.daml.platform.store.{DbType, FlywayMigrations, LfValueTranslationCache}
@@ -170,6 +179,14 @@ object JdbcIndexer {
tailingRateLimitPerSecond = config.tailingRateLimitPerSecond,
batchWithinMillis = config.batchWithinMillis,
metrics = metrics,
dataSourceConfig = DataSourceConfig(
pgSynchronousCommit = Some(config.asyncCommitMode match {
case SynchronousCommit => PgSynchronousCommitValue.On
case AsynchronousCommit => PgSynchronousCommitValue.Off
case LocalSynchronousCommit => PgSynchronousCommitValue.Local
})
),
haConfig = config.haConfig,
)
}

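The new DataSourceConfig block translates the indexer's asyncCommitMode into PostgreSQL's synchronous_commit setting, moving that concern from per-connection setup into data-source configuration. A sketch restating the mapping, with PostgreSQL's documented semantics as comments:

def toPgSynchronousCommit(mode: DbType.AsyncCommitMode): String = mode match {
  case SynchronousCommit      => "on"    // commit waits until the WAL is flushed to disk
  case AsynchronousCommit     => "off"   // commit returns before the flush; a crash may lose the latest transactions
  case LocalSynchronousCommit => "local" // flush locally, but do not wait for synchronous standbys
}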
ParallelIndexerFactory.scala
@@ -4,34 +4,35 @@
package com.daml.platform.indexer.parallel

import java.sql.Connection
import java.util.concurrent.TimeUnit
import java.util.concurrent.{Executors, TimeUnit}

import akka.NotUsed
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{KillSwitch, KillSwitches, Materializer, UniqueKillSwitch}
import com.daml.ledger.api.health.HealthStatus
import com.daml.ledger.participant.state.v1.{Offset, ParticipantId, ReadService, Update}
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.logging.LoggingContext.{withEnrichedLoggingContext, withEnrichedLoggingContextFrom}
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.{InstrumentedSource, Metrics}
import com.daml.platform.configuration.ServerRole
import com.daml.platform.indexer.ha.HaConfig
import com.daml.platform.indexer.ha.{HaCoordinator, Handle, NoopHaCoordinator}
import com.daml.platform.indexer.parallel.AsyncSupport._
import com.daml.platform.indexer.{IndexFeedHandle, Indexer}
import com.daml.platform.store.appendonlydao.DbDispatcher
import com.daml.platform.store.appendonlydao.events.{CompressionStrategy, LfValueTranslation}
import com.daml.platform.store.backend
import com.daml.platform.store.backend.DataSourceStorageBackend.DataSourceConfig
import com.daml.platform.store.backend.{DbDto, StorageBackend}
import com.daml.platform.store.{DbType, backend}
import com.daml.resources
import com.google.common.util.concurrent.ThreadFactoryBuilder

import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.control.NonFatal

object ParallelIndexerFactory {

private val keepAliveMaxIdleDuration = FiniteDuration(200, "millis")

private val logger = ContextualizedLogger.get(this.getClass)

def apply[DB_BATCH](
@@ -49,6 +50,8 @@
tailingRateLimitPerSecond: Int,
batchWithinMillis: Long,
metrics: Metrics,
dataSourceConfig: DataSourceConfig,
haConfig: HaConfig,
)(implicit loggingContext: LoggingContext): ResourceOwner[Indexer] = {
for {
inputMapperExecutor <- asyncPool(
@@ -61,26 +64,37 @@
"batching-pool",
Some(metrics.daml.parallelIndexer.batching.executor -> metrics.registry),
)
dbDispatcher <- DbDispatcher
.owner(
serverRole = ServerRole.Indexer,
jdbcUrl = jdbcUrl,
connectionPoolSize = ingestionParallelism + 1, // + 1 for the tailing ledger_end updates
connectionTimeout = FiniteDuration(
250,
"millis",
), // 250 millis is the lowest possible value for this Hikari configuration (see HikariConfig JavaDoc)
metrics = metrics,
connectionAsyncCommitMode = DbType.AsynchronousCommit,
)
toDbDto = backend.UpdateToDbDto(
participantId = participantId,
translation = translation,
compressionStrategy = compressionStrategy,
)
haCoordinator <-
if (storageBackend.dbLockSupported) // TODO feature flag comes here
ResourceOwner
.forExecutorService(() =>
ExecutionContext.fromExecutorService(
Executors.newFixedThreadPool(
1,
new ThreadFactoryBuilder().setNameFormat(s"ha-coordinator-%d").build,
)
)
)
.map(
HaCoordinator.databaseLockBasedHaCoordinator(
dataSource = storageBackend.createDataSource(jdbcUrl),
storageBackend = storageBackend,
_,
scheduler = mat.system.scheduler,
haConfig = haConfig,
)
)
else
ResourceOwner.successful(NoopHaCoordinator)
} yield {
val ingest: Long => Source[(Offset, Update), NotUsed] => Source[Unit, NotUsed] =
initialSeqId =>
val ingest
: (Long, DbDispatcher) => Source[(Offset, Update), NotUsed] => Source[Unit, NotUsed] =
(initialSeqId, dbDispatcher) =>
source =>
BatchingParallelIngestionPipe(
submissionBatchSize = submissionBatchSize,
@@ -106,31 +120,56 @@
.map(_ -> System.nanoTime())
)
.map(_ => ())
.keepAlive(
keepAliveMaxIdleDuration,
() =>
if (dbDispatcher.currentHealth() == HealthStatus.healthy) {
logger.debug("Indexer keep-alive: database connectivity OK")
()
} else {
logger
.warn("Indexer keep-alive: database connectivity lost. Stopping indexing.")
throw new Exception(
"Connectivity issue to the index-database detected. Stopping indexing."
)
},
)

def subscribe(readService: ReadService): Future[Source[Unit, NotUsed]] =
dbDispatcher
.executeSql(metrics.daml.parallelIndexer.initialization)(storageBackend.initialize)
.map(initialized =>
ingest(initialized.lastEventSeqId.getOrElse(0L))(
readService.stateUpdates(beginAfter = initialized.lastOffset)
def subscribe(resourceContext: ResourceContext)(readService: ReadService): Handle = {
implicit val rc: ResourceContext = resourceContext
implicit val ec: ExecutionContext = resourceContext.executionContext
implicit val matImplicit: Materializer = mat
haCoordinator.protectedBlock { signConnection =>
val killSwitchCaptor = Promise[KillSwitch]()

val completionFuture = DbDispatcher
.owner(
dataSource = storageBackend.createDataSource(
jdbcUrl = jdbcUrl,
dataSourceConfig = dataSourceConfig,
connectionInitHook = Some(signConnection.sign),
),
serverRole = ServerRole.Indexer,
jdbcUrl = jdbcUrl,
connectionPoolSize =
ingestionParallelism + 1, // + 1 for the tailing ledger_end updates
connectionTimeout = FiniteDuration(
250,
"millis",
), // 250 millis is the lowest possible value for this Hikari configuration (see HikariConfig JavaDoc)
metrics = metrics,
)
)(scala.concurrent.ExecutionContext.global)
.use { dbDispatcher =>
dbDispatcher
.executeSql(metrics.daml.parallelIndexer.initialization)(storageBackend.initialize)
.flatMap { initialized =>
val (killSwitch, completionFuture) =
ingest(initialized.lastEventSeqId.getOrElse(0L), dbDispatcher)(
readService.stateUpdates(beginAfter = initialized.lastOffset)
)
.viaMat(KillSwitches.single)(Keep.right[NotUsed, UniqueKillSwitch])
.toMat(Sink.ignore)(Keep.both)
.run()
// The tricky bit: the completion handler will see this outer future,
// but the future that signals to the HaCoordinator that the protected
// block has been initialized must complete exactly at this point.
killSwitchCaptor.success(killSwitch)
completionFuture
}
}

killSwitchCaptor.future
.map(Handle(completionFuture.map(_ => ()), _))
}
}

toIndexer(subscribe)(mat)
toIndexer(subscribe)
}
}

@@ -295,20 +334,17 @@
}

def toIndexer(
ingestionPipeOn: ReadService => Future[Source[Unit, NotUsed]]
)(implicit mat: Materializer): Indexer =
ingestionPipeOn: ResourceContext => ReadService => Handle
): Indexer =
readService =>
new ResourceOwner[IndexFeedHandle] {
override def acquire()(implicit
context: ResourceContext
): resources.Resource[ResourceContext, IndexFeedHandle] = {
Resource {
ingestionPipeOn(readService).map { pipe =>
val (killSwitch, completionFuture) = pipe
.viaMat(KillSwitches.single)(Keep.right[NotUsed, UniqueKillSwitch])
.toMat(Sink.ignore)(Keep.both)
.run()
new SubscriptionIndexFeedHandle(killSwitch, completionFuture.map(_ => ()))
Future {
val handle = ingestionPipeOn(context)(readService)
new SubscriptionIndexFeedHandle(handle.killSwitch, handle.completed)
}
} { handle =>
handle.killSwitch.shutdown()
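The Promise in subscribe is worth spelling out: the Handle returned to the HaCoordinator must carry the kill switch as soon as the stream has materialized, while the completion future spans the entire DbDispatcher.use block. A condensed, self-contained sketch of that hand-off pattern (the names here are illustrative, not the indexer's):

import akka.stream.KillSwitch
import scala.concurrent.{ExecutionContext, Future, Promise}

// `start` materializes a stream, yielding its kill switch and completion.
// We surface the kill switch the moment it exists, while the returned
// completion future tracks the whole run.
def handOff(start: () => (KillSwitch, Future[Unit]))(implicit
    ec: ExecutionContext
): Future[(KillSwitch, Future[Unit])] = {
  val killSwitchCaptor = Promise[KillSwitch]()
  val completion = Future {
    val (killSwitch, done) = start()
    killSwitchCaptor.success(killSwitch) // signal "initialized" precisely here
    done
  }.flatten
  killSwitchCaptor.future.map(killSwitch => (killSwitch, completion))
}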
