HA PoC behind a feature flag [DPP-426] (#10227)
* Removing previous Async commit features

The previous async commit features had:
- async commit configured by a config param
- special treatment to still force sync commit for certain thread pools
- special treatment to still force sync commit on the transaction level for certain transactions.

This is a preparation step to clear the path for a new approach to async commit treatment:
- only session/connection level async configuration
- no transaction level special treatments
- only enable async commit for specific Connection pools (where it is needed / is safe)

* Add DataSourceStorageBackend

- to spawn DataSources in a controlled fashion; these will be needed in upcoming commits for the HikariCP instantiation
- DataSources can have Connection init hooks defined with the help of the InitHookDataSourceProxy (this is needed for the HA implementation); see the sketch below
- added DataSourceConfig to capture the needed level of fine-tuning for DataSource creation

* Switches to DataSource wrapping in HikariCP instantiation
* Adds DBLockStorageBackend

- this is the abstraction and the implementation of database level locking
- with support for Oracle and Postgres

* Adds HaCoordinator and implementation
* Wiring of HaCoordinator in parallel indexer
* Adds feature flag

changelog_begin
changelog_end
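
The InitHookDataSourceProxy mentioned above does not appear in the hunks shown below, so here is a minimal sketch of the idea it describes, assuming nothing beyond the standard javax.sql.DataSource interface: a wrapper that runs a hook on every freshly created connection (for example to acquire a shared worker lock or tune session-level settings). The class name and shape are illustrative only, not the actual proxy added by this commit.

import java.io.PrintWriter
import java.sql.Connection
import java.util.logging.Logger
import javax.sql.DataSource

// Sketch only: a DataSource wrapper that runs an init hook on each new connection.
// The real InitHookDataSourceProxy in this commit may differ in name and shape.
class InitHookDataSourceSketch(delegate: DataSource, initHook: Connection => Unit)
    extends DataSource {

  override def getConnection: Connection = {
    val connection = delegate.getConnection
    initHook(connection) // e.g. acquire a shared worker lock, tune session settings
    connection
  }

  override def getConnection(username: String, password: String): Connection = {
    val connection = delegate.getConnection(username, password)
    initHook(connection)
    connection
  }

  // The remaining DataSource/CommonDataSource/Wrapper methods simply delegate.
  override def getLogWriter: PrintWriter = delegate.getLogWriter
  override def setLogWriter(out: PrintWriter): Unit = delegate.setLogWriter(out)
  override def setLoginTimeout(seconds: Int): Unit = delegate.setLoginTimeout(seconds)
  override def getLoginTimeout: Int = delegate.getLoginTimeout
  override def getParentLogger: Logger = delegate.getParentLogger
  override def unwrap[T](iface: Class[T]): T = delegate.unwrap(iface)
  override def isWrapperFor(iface: Class[_]): Boolean = delegate.isWrapperFor(iface)
}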
nmarton-da authored Jul 21, 2021
1 parent 63739fa commit c0a24fe
Showing 25 changed files with 1,043 additions and 267 deletions.
7 changes: 3 additions & 4 deletions ledger/participant-integration-api/BUILD.bazel
@@ -62,7 +62,9 @@ compile_deps = [
"@maven//:io_opentelemetry_opentelemetry_api",
"@maven//:io_opentelemetry_opentelemetry_context",
"@maven//:org_slf4j_slf4j_api",
# this oracle import is problematic for daml assistant build
"@maven//:com_h2database_h2",
"@maven//:org_postgresql_postgresql",
"@maven//:com_oracle_database_jdbc_ojdbc8",
]

scala_compile_deps = [
@@ -79,9 +81,6 @@ scala_compile_deps = [

runtime_deps = [
"@maven//:ch_qos_logback_logback_classic",
"@maven//:com_h2database_h2",
"@maven//:org_postgresql_postgresql",
"@maven//:com_oracle_database_jdbc_ojdbc8",
]

da_scala_library(
@@ -6,6 +6,7 @@ package com.daml.platform.indexer
import com.daml.lf.data.Ref
import com.daml.platform.configuration.IndexConfiguration
import com.daml.platform.indexer.IndexerConfig._
import com.daml.platform.indexer.ha.HaConfig
import com.daml.platform.store.DbType

import scala.concurrent.duration.{DurationInt, FiniteDuration}
@@ -32,6 +33,7 @@ case class IndexerConfig(
tailingRateLimitPerSecond: Int = DefaultTailingRateLimitPerSecond,
batchWithinMillis: Long = DefaultBatchWithinMillis,
enableCompression: Boolean = DefaultEnableCompression,
haConfig: HaConfig = HaConfig(),
)

object IndexerConfig {
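
Because haConfig defaults to HaConfig(), whose enable flag is false, the new code path stays off unless a deployment opts in explicitly. A small sketch of how an existing IndexerConfig value could be switched over, using only the fields visible in this diff:

import com.daml.platform.indexer.IndexerConfig
import com.daml.platform.indexer.ha.HaConfig

// Sketch: opt an existing indexer configuration into the HA PoC code path.
// `baseConfig` stands for however the IndexerConfig is normally constructed.
def withHaEnabled(baseConfig: IndexerConfig): IndexerConfig =
  baseConfig.copy(haConfig = HaConfig(enable = true))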
@@ -21,8 +21,15 @@ import com.daml.platform.common
import com.daml.platform.common.MismatchException
import com.daml.platform.configuration.ServerRole
import com.daml.platform.indexer.parallel.ParallelIndexerFactory
import com.daml.platform.store.DbType.{
AsynchronousCommit,
LocalSynchronousCommit,
SynchronousCommit,
}
import com.daml.platform.store.appendonlydao.events.{CompressionStrategy, LfValueTranslation}
import com.daml.platform.store.backend.DataSourceStorageBackend.DataSourceConfig
import com.daml.platform.store.backend.StorageBackend
import com.daml.platform.store.backend.postgresql.PostgresDataSourceConfig
import com.daml.platform.store.dao.LedgerDao
import com.daml.platform.store.{DbType, FlywayMigrations, LfValueTranslationCache}

@@ -135,7 +142,6 @@ object JdbcIndexer {
servicesExecutionContext,
metrics,
lfValueTranslationCache,
jdbcAsyncCommitMode = config.asyncCommitMode,
enricher = None,
participantId = config.participantId,
)
@@ -172,6 +178,16 @@
tailingRateLimitPerSecond = config.tailingRateLimitPerSecond,
batchWithinMillis = config.batchWithinMillis,
metrics = metrics,
dataSourceConfig = DataSourceConfig(
postgresConfig = PostgresDataSourceConfig(
synchronousCommit = Some(config.asyncCommitMode match {
case SynchronousCommit => PostgresDataSourceConfig.SynchronousCommitValue.On
case AsynchronousCommit => PostgresDataSourceConfig.SynchronousCommitValue.Off
case LocalSynchronousCommit => PostgresDataSourceConfig.SynchronousCommitValue.Local
})
)
),
haConfig = config.haConfig,
)
}
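
The SynchronousCommitValue derived above maps to a session-level Postgres setting. The actual handling of PostgresDataSourceConfig lives in the storage backend and is not part of this hunk; the following is only a plausible sketch of how a connection initializer could apply such a value via plain JDBC:

import java.sql.Connection

// Sketch: apply a session-level synchronous_commit setting to a freshly created
// Postgres connection. The value should come from a trusted enum (mirroring
// PostgresDataSourceConfig.SynchronousCommitValue), never from user input.
def setSynchronousCommit(connection: Connection, value: String): Unit = {
  val statement = connection.createStatement()
  try statement.execute(s"SET synchronous_commit TO $value") // e.g. "on", "off", "local"
  finally statement.close()
}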

@@ -0,0 +1,183 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.platform.indexer.ha

import java.sql.Connection

import akka.actor.Scheduler
import akka.stream.KillSwitch
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.store.backend.DBLockStorageBackend.{Lock, LockId, LockMode}
import com.daml.platform.store.backend.DBLockStorageBackend
import javax.sql.DataSource

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

/** A handle of a running program
  * @param completed will complete right after the program has completed
  *   - if no KillSwitch is used:
  *     - it completes successfully as the program successfully ends
  *     - it completes with the same failure that failed the program
  *   - if the KillSwitch is aborted, this completes with the same Throwable
  *   - if the KillSwitch is shut down, this completes successfully
  *   After signalling completion, the program has finished its execution and has released all resources it acquired.
  * @param killSwitch used to signal abort or shutdown
*/
case class Handle(completed: Future[Unit], killSwitch: KillSwitch)

/** This functionality initializes a worker Connection, and clears it for further usage.
  * This only needs to be done once, at the beginning of the Connection life-cycle.
* Initialization errors are signaled by throwing an exception.
*/
trait ConnectionInitializer {
def initialize(connection: Connection): Unit
}

/** Adds High Availability related features to a program that intends to use database connections to do its work.
* Features include:
* - Safety: mutual exclusion of these programs ensured by DB locking mechanisms
* - Availability: release of the exclusion is detected by idle programs, which start competing for the lock to do
* their work.
*/
trait HaCoordinator {

/** Execute in High Availability mode.
* Wraps around the Handle of the execution.
*
    * @param initializeExecution the HaCoordinator provides a ConnectionInitializer that must be used for all database connections during execution;
    *                            the Future[Handle] embodies the asynchronous initialization of the execution
    *                            (i.e. not the actual work; the work itself completes the `completed` Future of the Handle)
    * @return the new Handle, which is available immediately to observe and interact with the complete program
*/
def protectedExecution(initializeExecution: ConnectionInitializer => Future[Handle]): Handle
}

case class HaConfig(
mainLockAquireRetryMillis: Long = 500,
workerLockAquireRetryMillis: Long = 500,
workerLockAquireMaxRetry: Long = 1000,
mainLockCheckerPeriodMillis: Long = 1000,
indexerLockId: Int = 0x646d6c00, // note 0x646d6c equals ASCII encoded "dml"
indexerWorkerLockId: Int = 0x646d6c01,
enable: Boolean = false, // TODO ha: remove as stable
)

object HaCoordinator {

private val logger = ContextualizedLogger.get(this.getClass)

/** This implementation of the HaCoordinator
* - provides a database lock based isolation of the protected executions
* - will run the execution at-most once during the entire lifecycle
* - will wait infinitely to acquire the lock needed to start the protected execution
* - provides a ConnectionInitializer function which is mandatory to execute on all worker connections during execution
* - will spawn a polling-daemon to observe continuous presence of the main lock
*
* @param dataSource to spawn the main connection which keeps the Indexer Main Lock
* @param storageBackend is the database-independent abstraction of session/connection level database locking
    * @param executionContext used to execute the initialization; it will do blocking/IO work, so a dedicated execution context is recommended
*/
def databaseLockBasedHaCoordinator(
dataSource: DataSource,
storageBackend: DBLockStorageBackend,
executionContext: ExecutionContext,
scheduler: Scheduler,
haConfig: HaConfig,
)(implicit loggingContext: LoggingContext): HaCoordinator = {
implicit val ec: ExecutionContext = executionContext

val indexerLockId = storageBackend.lock(haConfig.indexerLockId)
val indexerWorkerLockId = storageBackend.lock(haConfig.indexerWorkerLockId)
val preemptableSequence = PreemptableSequence(scheduler)

new HaCoordinator {
override def protectedExecution(
initializeExecution: ConnectionInitializer => Future[Handle]
): Handle = {
def acquireLock(connection: Connection, lockId: LockId, lockMode: LockMode): Lock = {
logger.debug(s"Acquiring lock $lockId $lockMode")
storageBackend
.tryAcquire(lockId, lockMode)(connection)
.getOrElse(
throw new Exception(s"Cannot acquire lock $lockId in lock-mode $lockMode: lock busy")
)
}

def acquireMainLock(connection: Connection): Unit = {
acquireLock(connection, indexerLockId, LockMode.Exclusive)
()
}

preemptableSequence.executeSequence { sequenceHelper =>
import sequenceHelper._
logger.info("Starting databaseLockBasedHaCoordinator")
for {
mainConnection <- go[Connection](dataSource.getConnection)
_ = logger.info("Step 1: creating main-connection - DONE")
_ = registerRelease {
logger.info("Releasing main connection...")
mainConnection.close()
logger.info("Released main connection")
}
_ <- retry(haConfig.mainLockAquireRetryMillis)(acquireMainLock(mainConnection))
_ = logger.info("Step 2: acquire exclusive Indexer Main Lock on main-connection - DONE")
exclusiveWorkerLock <- retry[Lock](
haConfig.workerLockAquireRetryMillis,
haConfig.workerLockAquireMaxRetry,
)(
acquireLock(mainConnection, indexerWorkerLockId, LockMode.Exclusive)
)
_ = logger.info(
"Step 3: acquire exclusive Indexer Worker Lock on main-connection - DONE"
)
_ <- go(storageBackend.release(exclusiveWorkerLock)(mainConnection))
_ = logger.info(
"Step 4: release exclusive Indexer Worker Lock on main-connection - DONE"
)
mainLockChecker <- go[PollingChecker](
new PollingChecker(
periodMillis = haConfig.mainLockCheckerPeriodMillis,
checkBody = acquireMainLock(mainConnection),
killSwitch =
handle.killSwitch, // meaning: this PollingChecker will shut down the main preemptableSequence
)
)
_ = logger.info(
"Step 5: activate periodic checker of the exclusive Indexer Main Lock on the main connection - DONE"
)
_ = registerRelease {
logger.info(
"Releasing periodic checker of the exclusive Indexer Main Lock on the main connection..."
)
mainLockChecker.close()
logger.info(
"Released periodic checker of the exclusive Indexer Main Lock on the main connection"
)
}
protectedHandle <- goF(initializeExecution(workerConnection => {
// this is the checking routine on connection creation
// step 1: acquire shared worker-lock
logger.info(s"Preparing worker connection. Step 1: acquire lock.")
acquireLock(workerConnection, indexerWorkerLockId, LockMode.Shared)
// step 2: check if main connection still holds the lock
logger.info(s"Preparing worker connection. Step 2: checking main lock.")
mainLockChecker.check()
logger.info(s"Preparing worker connection DONE.")
}))
_ = logger.info("Step 6: initialize protected execution - DONE")
_ <- merge(protectedHandle)
} yield ()
}
}
}
}
}

object NoopHaCoordinator extends HaCoordinator {
override def protectedExecution(
initializeExecution: ConnectionInitializer => Future[Handle]
): Handle =
Await.result(initializeExecution(_ => ()), Duration.Inf)
}
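
A usage sketch for the coordinator above, based only on the signatures shown in this file: the caller hands protectedExecution a factory for its work that receives the mandatory ConnectionInitializer, and gets back a Handle to observe or kill. The data source, storage backend and actor system here are placeholders, and the protected "work" completes immediately.

import akka.actor.ActorSystem
import akka.stream.KillSwitches
import com.daml.logging.LoggingContext
import com.daml.platform.indexer.ha.{HaConfig, HaCoordinator, Handle}
import com.daml.platform.store.backend.DBLockStorageBackend
import javax.sql.DataSource

import scala.concurrent.{ExecutionContext, Future}

// Sketch: wiring a protected execution, assuming a DataSource, a DBLockStorageBackend
// and an ActorSystem are already available in the calling code.
def runProtected(
    dataSource: DataSource,
    storageBackend: DBLockStorageBackend,
    system: ActorSystem,
)(implicit loggingContext: LoggingContext): Handle = {
  val coordinator: HaCoordinator = HaCoordinator.databaseLockBasedHaCoordinator(
    dataSource = dataSource,
    storageBackend = storageBackend,
    executionContext = ExecutionContext.global, // a dedicated pool is recommended, see the scaladoc
    scheduler = system.scheduler,
    haConfig = HaConfig(enable = true),
  )
  coordinator.protectedExecution { connectionInitializer =>
    // A real indexer would call connectionInitializer.initialize on every pooled
    // worker connection before using it; here the protected work finishes right away.
    Future.successful(Handle(completed = Future.unit, killSwitch = KillSwitches.shared("ha-poc-worker")))
  }
}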
@@ -0,0 +1,70 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.platform.indexer.ha

import java.util.concurrent.atomic.AtomicReference

import akka.stream.KillSwitch
import com.daml.logging.{ContextualizedLogger, LoggingContext}

/** This KillSwitch captures its usage in its internal state, which can be queried.
* Captured state is available with the 'state' method.
*
* Rules of state transitions:
* - Shutdown is always the final state
* - From multiple aborts, the last abort wins
*
  * With setDelegate() we can set a delegate KillSwitch, to which usage will be replayed.
*/
class KillSwitchCaptor(implicit loggingContext: LoggingContext) extends KillSwitch {
import KillSwitchCaptor._
import State._

private val logger = ContextualizedLogger.get(this.getClass)

private val _state = new AtomicReference[State](Unused)
private val _delegate = new AtomicReference[Option[KillSwitch]](None)

private def updateState(newState: Used): Unit = {
_state.getAndAccumulate(
newState,
{
case (Shutdown, _) => Shutdown
case (_, used) => used
},
)
()
}

override def shutdown(): Unit = {
logger.info("Shutdown called!")
updateState(Shutdown)
_delegate.get.foreach { ks =>
logger.info("Shutdown call delegated!")
ks.shutdown()
}
}

override def abort(ex: Throwable): Unit = {
logger.info(s"Abort called! (${ex.getMessage})")
updateState(Aborted(ex))
_delegate.get.foreach { ks =>
logger.info(s"Abort call delegated! (${ex.getMessage})")
ks.abort(ex)
}
}

def state: State = _state.get()
def setDelegate(delegate: Option[KillSwitch]): Unit = _delegate.set(delegate)
}

object KillSwitchCaptor {
sealed trait State
object State {
case object Unused extends State
sealed trait Used extends State
case object Shutdown extends Used
final case class Aborted(t: Throwable) extends Used
}
}
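
A small sketch of the documented state rules, assuming a LoggingContext is already in implicit scope: once shutdown() has been recorded the state stays Shutdown, while multiple aborts keep only the last one.

import com.daml.logging.LoggingContext
import com.daml.platform.indexer.ha.KillSwitchCaptor
import com.daml.platform.indexer.ha.KillSwitchCaptor.State

// Sketch: exercising the state transition rules described in the scaladoc above.
def demonstrateStateRules()(implicit loggingContext: LoggingContext): Unit = {
  val captor = new KillSwitchCaptor

  captor.abort(new RuntimeException("first"))
  captor.abort(new RuntimeException("second"))
  captor.state match {
    case State.Aborted(t) => assert(t.getMessage == "second") // the last abort wins
    case other => sys.error(s"unexpected state: $other")
  }

  captor.shutdown()
  captor.abort(new RuntimeException("too late"))
  assert(captor.state == State.Shutdown) // Shutdown is final; the late abort is ignored
}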
@@ -0,0 +1,65 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.platform.indexer.ha

import java.util.{Timer, TimerTask}

import akka.stream.KillSwitch
import com.daml.logging.{ContextualizedLogger, LoggingContext}

import scala.util.{Failure, Success, Try}

/** A simple host for periodic and on-demand checking.
* - This will ensure that checkBody is accessed by only one caller at a time
* - Does periodic checking
* - Exposes check() for on-demand checking from the outside
  * - If any check() fails, it uses the killSwitch with an abort
* - It is also an AutoCloseable to release internal resources
*
  * @param periodMillis period of the checking; between scheduled checks there will be this much delay
* @param checkBody the check function, Exception signals failed check
* @param killSwitch to abort if a check fails
*/
class PollingChecker(
periodMillis: Long,
checkBody: => Unit,
killSwitch: KillSwitch,
)(implicit loggingContext: LoggingContext)
extends AutoCloseable {
private val logger = ContextualizedLogger.get(this.getClass)

private val timer = new Timer(true)

timer.schedule(
new TimerTask {
override def run(): Unit = {
Try(check())
()
}
},
periodMillis,
periodMillis,
)

  // This is a crude approach to ensure single-threaded usage of checkBody.
  // In theory this could be made much more efficient: not enqueueing a check of its own,
  // but collecting requests and replying in batches.
  // Current usage of this class does not necessarily motivate further optimization: the check of the
  // Indexer Main Lock issued from the HaCoordinator seems to be sufficiently fast even in the peak
  // scenario: the initialization of the complete pool.
def check(): Unit = synchronized {
logger.debug(s"Checking...")
Try(checkBody) match {
case Success(_) =>
logger.debug(s"Check successful.")

case Failure(ex) =>
logger.info(s"Check failed (${ex.getMessage}). KillSwitch/abort called.")
killSwitch.abort(new Exception("check failed, killSwitch aborted", ex))
throw ex
}
}

def close(): Unit = timer.cancel()
}
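
A usage sketch for the checker above; the lock-check body and kill switch are placeholders, while the real caller is the HaCoordinator (Step 5 of its sequence). The returned checker must eventually be close()d to cancel its internal timer.

import akka.stream.KillSwitches
import com.daml.logging.LoggingContext
import com.daml.platform.indexer.ha.PollingChecker

// Sketch: a checker that re-validates a condition every second and aborts the
// kill switch as soon as a check throws.
def startMainLockChecker(
    stillHoldsMainLock: () => Boolean
)(implicit loggingContext: LoggingContext): PollingChecker =
  new PollingChecker(
    periodMillis = 1000L,
    checkBody = if (!stillHoldsMainLock()) throw new Exception("main lock lost"),
    killSwitch = KillSwitches.shared("main-lock-lost"),
  )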