HA PoC behind a feature flag [DPP-426] #10227

Merged · 21 commits · Jul 21, 2021
7 changes: 3 additions & 4 deletions ledger/participant-integration-api/BUILD.bazel
@@ -62,7 +62,9 @@ compile_deps = [
"@maven//:io_opentelemetry_opentelemetry_api",
"@maven//:io_opentelemetry_opentelemetry_context",
"@maven//:org_slf4j_slf4j_api",
# this oracle import is problematic for daml assistant build
"@maven//:com_h2database_h2",
"@maven//:org_postgresql_postgresql",
"@maven//:com_oracle_database_jdbc_ojdbc8",
]

scala_compile_deps = [
@@ -79,9 +81,6 @@ scala_compile_deps = [

runtime_deps = [
"@maven//:ch_qos_logback_logback_classic",
"@maven//:com_h2database_h2",
"@maven//:org_postgresql_postgresql",
"@maven//:com_oracle_database_jdbc_ojdbc8",
]

da_scala_library(
IndexerConfig.scala
@@ -6,6 +6,7 @@ package com.daml.platform.indexer
import com.daml.lf.data.Ref
import com.daml.platform.configuration.IndexConfiguration
import com.daml.platform.indexer.IndexerConfig._
import com.daml.platform.indexer.ha.HaConfig
import com.daml.platform.store.DbType

import scala.concurrent.duration.{DurationInt, FiniteDuration}
@@ -32,6 +33,7 @@ case class IndexerConfig(
tailingRateLimitPerSecond: Int = DefaultTailingRateLimitPerSecond,
batchWithinMillis: Long = DefaultBatchWithinMillis,
enableCompression: Boolean = DefaultEnableCompression,
haConfig: HaConfig = HaConfig(),
)

object IndexerConfig {
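Since IndexerConfig is a plain case class, the new HA settings can be switched on via copy. A minimal, hypothetical sketch (the existing config value is an assumption, not part of this diff):

    // Hypothetical sketch: enabling the feature-flagged HA coordinator on an
    // existing IndexerConfig value; all other settings keep their defaults.
    val haEnabledConfig: IndexerConfig =
      config.copy(haConfig = HaConfig(enable = true))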
JdbcIndexer.scala
@@ -21,8 +21,15 @@ import com.daml.platform.common
import com.daml.platform.common.MismatchException
import com.daml.platform.configuration.ServerRole
import com.daml.platform.indexer.parallel.ParallelIndexerFactory
import com.daml.platform.store.DbType.{
AsynchronousCommit,
LocalSynchronousCommit,
SynchronousCommit,
}
import com.daml.platform.store.appendonlydao.events.{CompressionStrategy, LfValueTranslation}
import com.daml.platform.store.backend.DataSourceStorageBackend.DataSourceConfig
import com.daml.platform.store.backend.StorageBackend
import com.daml.platform.store.backend.postgresql.PostgresDataSourceConfig
import com.daml.platform.store.dao.LedgerDao
import com.daml.platform.store.{DbType, FlywayMigrations, LfValueTranslationCache}

@@ -135,7 +142,6 @@ object JdbcIndexer {
servicesExecutionContext,
metrics,
lfValueTranslationCache,
jdbcAsyncCommitMode = config.asyncCommitMode,
enricher = None,
participantId = config.participantId,
)
@@ -172,6 +178,16 @@
tailingRateLimitPerSecond = config.tailingRateLimitPerSecond,
batchWithinMillis = config.batchWithinMillis,
metrics = metrics,
dataSourceConfig = DataSourceConfig(
postgresConfig = PostgresDataSourceConfig(
synchronousCommit = Some(config.asyncCommitMode match {
case SynchronousCommit => PostgresDataSourceConfig.SynchronousCommitValue.On
case AsynchronousCommit => PostgresDataSourceConfig.SynchronousCommitValue.Off
case LocalSynchronousCommit => PostgresDataSourceConfig.SynchronousCommitValue.Local
})
)
),
haConfig = config.haConfig,
)
}

HaCoordinator.scala
@@ -0,0 +1,183 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.platform.indexer.ha

import java.sql.Connection

import akka.actor.Scheduler
import akka.stream.KillSwitch
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.store.backend.DBLockStorageBackend.{Lock, LockId, LockMode}
import com.daml.platform.store.backend.DBLockStorageBackend
import javax.sql.DataSource

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

/** A handle of a running program
* @param completed will complete right after the program completes:
* - if no KillSwitch was used:
* - it completes successfully as the program successfully ends
* - it completes with the same failure that failed the program
* - if the KillSwitch was aborted, this completes with the same Throwable
* - if the KillSwitch was shut down, this completes successfully
* After signalling completion, the program has finished its execution and has released all resources it acquired.
* @param killSwitch to signal abortion and shutdown
*/
case class Handle(completed: Future[Unit], killSwitch: KillSwitch)

/** Initializes a worker Connection and clears it for further use.
* This only needs to be done once, at the beginning of the Connection life-cycle.
* Initialization errors are signaled by throwing an exception.
*/
trait ConnectionInitializer {
def initialize(connection: Connection): Unit
}

/** Adds High Availability related features to a program that uses database connections to do its work.
* Features include:
* - Safety: mutual exclusion of these programs ensured by DB locking mechanisms
* - Availability: release of the exclusion is detected by idle programs, which start competing for the lock to do
* their work.
*/
trait HaCoordinator {

/** Execute in High Availability mode.
* Wraps around the Handle of the execution.
*
* @param initializeExecution the HaCoordinator provides a ConnectionInitializer that must be used for all database connections during execution.
* The Future[Handle] embodies the asynchronous initialization of the execution
* (not the actual work: the actual work completes with the completed Future of the Handle)
* @return the new Handle, which is available immediately to observe and interact with the complete program
*/
def protectedExecution(initializeExecution: ConnectionInitializer => Future[Handle]): Handle
}

case class HaConfig(
mainLockAquireRetryMillis: Long = 500,
workerLockAquireRetryMillis: Long = 500,
workerLockAquireMaxRetry: Long = 1000,
mainLockCheckerPeriodMillis: Long = 1000,
indexerLockId: Int = 0x646d6c00, // note 0x646d6c equals ASCII encoded "dml"
indexerWorkerLockId: Int = 0x646d6c01,
enable: Boolean = false, // TODO ha: remove as stable
)

object HaCoordinator {

private val logger = ContextualizedLogger.get(this.getClass)

/** This implementation of the HaCoordinator
* - provides a database lock based isolation of the protected executions
* - will run the execution at-most once during the entire lifecycle
* - will wait indefinitely to acquire the lock needed to start the protected execution
* - provides a ConnectionInitializer function which is mandatory to execute on all worker connections during execution
* - will spawn a polling-daemon to observe continuous presence of the main lock
*
* @param dataSource to spawn the main connection which keeps the Indexer Main Lock
* @param storageBackend is the database-independent abstraction of session/connection level database locking
* @param executionContext used to execute the initialisation; it will do blocking/IO work, so a dedicated execution context is recommended
*/
def databaseLockBasedHaCoordinator(
dataSource: DataSource,
storageBackend: DBLockStorageBackend,
executionContext: ExecutionContext,
scheduler: Scheduler,
haConfig: HaConfig,
)(implicit loggingContext: LoggingContext): HaCoordinator = {
implicit val ec: ExecutionContext = executionContext

val indexerLockId = storageBackend.lock(haConfig.indexerLockId)
val indexerWorkerLockId = storageBackend.lock(haConfig.indexerWorkerLockId)
val preemptableSequence = PreemptableSequence(scheduler)

new HaCoordinator {
override def protectedExecution(
initializeExecution: ConnectionInitializer => Future[Handle]
): Handle = {
def acquireLock(connection: Connection, lockId: LockId, lockMode: LockMode): Lock = {
logger.debug(s"Acquiring lock $lockId $lockMode")
storageBackend
.tryAcquire(lockId, lockMode)(connection)
.getOrElse(
throw new Exception(s"Cannot acquire lock $lockId in lock-mode $lockMode: lock busy")
)
}

def acquireMainLock(connection: Connection): Unit = {
acquireLock(connection, indexerLockId, LockMode.Exclusive)
()
}

preemptableSequence.executeSequence { sequenceHelper =>
import sequenceHelper._
logger.info("Starting databaseLockBasedHaCoordinator")
for {
mainConnection <- go[Connection](dataSource.getConnection)
_ = logger.info("Step 1: creating main-connection - DONE")
_ = registerRelease {
logger.info("Releasing main connection...")
mainConnection.close()
logger.info("Released main connection")
}
_ <- retry(haConfig.mainLockAquireRetryMillis)(acquireMainLock(mainConnection))
_ = logger.info("Step 2: acquire exclusive Indexer Main Lock on main-connection - DONE")
exclusiveWorkerLock <- retry[Lock](
haConfig.workerLockAquireRetryMillis,
haConfig.workerLockAquireMaxRetry,
)(
acquireLock(mainConnection, indexerWorkerLockId, LockMode.Exclusive)
)
_ = logger.info(
"Step 3: acquire exclusive Indexer Worker Lock on main-connection - DONE"
)
_ <- go(storageBackend.release(exclusiveWorkerLock)(mainConnection))
_ = logger.info(
"Step 4: release exclusive Indexer Worker Lock on main-connection - DONE"
)
mainLockChecker <- go[PollingChecker](
new PollingChecker(
periodMillis = haConfig.mainLockCheckerPeriodMillis,
checkBody = acquireMainLock(mainConnection),
killSwitch =
handle.killSwitch, // meaning: this PollingChecker will shut down the main preemptableSequence
)
)
_ = logger.info(
"Step 5: activate periodic checker of the exclusive Indexer Main Lock on the main connection - DONE"
)
_ = registerRelease {
logger.info(
"Releasing periodic checker of the exclusive Indexer Main Lock on the main connection..."
)
mainLockChecker.close()
logger.info(
"Released periodic checker of the exclusive Indexer Main Lock on the main connection"
)
}
protectedHandle <- goF(initializeExecution(workerConnection => {
// this is the checking routine on connection creation
// step 1: acquire shared worker-lock
logger.info(s"Preparing worker connection. Step 1: acquire lock.")
acquireLock(workerConnection, indexerWorkerLockId, LockMode.Shared)
// step 2: check if main connection still holds the lock
logger.info(s"Preparing worker connection. Step 2: checking main lock.")
mainLockChecker.check()
logger.info(s"Preparing worker connection DONE.")
}))
_ = logger.info("Step 6: initialize protected execution - DONE")
_ <- merge(protectedHandle)
} yield ()
}
}
}
}
}

object NoopHaCoordinator extends HaCoordinator {
override def protectedExecution(
initializeExecution: ConnectionInitializer => Future[Handle]
): Handle =
Await.result(initializeExecution(_ => ()), Duration.Inf)
}
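A hypothetical usage sketch of the coordinator above, not part of this diff: dataSource, dbLockStorageBackend, scheduler, haConfig, dedicatedExecutionContext, startIndexing, and the implicit LoggingContext are all assumed values.

    // Choose the database-lock-based coordinator behind the feature flag,
    // falling back to the no-op coordinator otherwise.
    val coordinator: HaCoordinator =
      if (haConfig.enable)
        HaCoordinator.databaseLockBasedHaCoordinator(
          dataSource = dataSource,
          storageBackend = dbLockStorageBackend,
          executionContext = dedicatedExecutionContext, // does blocking/IO work
          scheduler = scheduler,
          haConfig = haConfig,
        )
      else NoopHaCoordinator

    // startIndexing (assumed) asynchronously initializes the indexing work and
    // returns a Future[Handle]; every worker connection it opens must first go
    // through connectionInitializer.initialize.
    val handle: Handle = coordinator.protectedExecution { connectionInitializer =>
      startIndexing(connectionInitializer)
    }

    // Observe the overall outcome (assumes: import scala.util.{Failure, Success}).
    handle.completed.onComplete {
      case Success(_) => println("protected execution completed")
      case Failure(t) => println(s"protected execution failed or was aborted: $t")
    }(dedicatedExecutionContext)

    // A graceful stop of the whole protected execution:
    // handle.killSwitch.shutdown()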
KillSwitchCaptor.scala
@@ -0,0 +1,70 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.platform.indexer.ha

import java.util.concurrent.atomic.AtomicReference

import akka.stream.KillSwitch
import com.daml.logging.{ContextualizedLogger, LoggingContext}

/** This KillSwitch captures its usage in its internal state, which can be queried.
* Captured state is available with the 'state' method.
*
* Rules of state transitions:
* - Shutdown is always the final state
* - From multiple aborts, the last abort wins
*
* With setDelegate() we can set a delegate KillSwitch, to which usage will be replayed
*/
class KillSwitchCaptor(implicit loggingContext: LoggingContext) extends KillSwitch {
import KillSwitchCaptor._
import State._

private val logger = ContextualizedLogger.get(this.getClass)

private val _state = new AtomicReference[State](Unused)
private val _delegate = new AtomicReference[Option[KillSwitch]](None)

private def updateState(newState: Used): Unit = {
_state.getAndAccumulate(
newState,
{
case (Shutdown, _) => Shutdown
case (_, used) => used
},
)
()
}

override def shutdown(): Unit = {
logger.info("Shutdown called!")
updateState(Shutdown)
_delegate.get.foreach { ks =>
logger.info("Shutdown call delegated!")
ks.shutdown()
}
}

override def abort(ex: Throwable): Unit = {
logger.info(s"Abort called! (${ex.getMessage})")
updateState(Aborted(ex))
_delegate.get.foreach { ks =>
logger.info(s"Abort call delegated! (${ex.getMessage})")
ks.abort(ex)
}
}

def state: State = _state.get()
def setDelegate(delegate: Option[KillSwitch]): Unit = _delegate.set(delegate)
}

object KillSwitchCaptor {
sealed trait State
object State {
case object Unused extends State
sealed trait Used extends State
case object Shutdown extends Used
final case class Aborted(t: Throwable) extends Used
}
}
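A hypothetical interaction sketch, not part of this diff: streamKillSwitch stands for a real KillSwitch (e.g. of a started Akka stream), and an implicit LoggingContext is assumed in scope.

    val captor = new KillSwitchCaptor

    // No delegate set yet: the abort is only captured in the internal state.
    captor.abort(new RuntimeException("lost the Indexer Main Lock"))
    assert(captor.state.isInstanceOf[KillSwitchCaptor.State.Aborted])

    // Once a delegate is set, subsequent calls are forwarded to it as well.
    captor.setDelegate(Some(streamKillSwitch))
    captor.shutdown() // Shutdown supersedes earlier aborts and is the final state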
PollingChecker.scala
@@ -0,0 +1,65 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.platform.indexer.ha

import java.util.{Timer, TimerTask}

import akka.stream.KillSwitch
import com.daml.logging.{ContextualizedLogger, LoggingContext}

import scala.util.{Failure, Success, Try}

/** A simple host of a checking routine.
* - This will ensure that checkBody is accessed by only one caller at a time
* - Does periodic checking
* - Exposes check() for on-demand checking from the outside
* - If any check() fails, it uses the killSwitch with an abort
* - It is also an AutoCloseable to release internal resources
*
* @param periodMillis the period of the checking; this is the delay between scheduled checks
* @param checkBody the check function; an Exception signals a failed check
* @param killSwitch to abort if a check fails
*/
class PollingChecker(
periodMillis: Long,
checkBody: => Unit,
killSwitch: KillSwitch,
)(implicit loggingContext: LoggingContext)
extends AutoCloseable {
private val logger = ContextualizedLogger.get(this.getClass)

private val timer = new Timer(true)

timer.schedule(
new TimerTask {
override def run(): Unit = {
Try(check())
()
}
},
periodMillis,
periodMillis,
)

// This is a crude approach for ensuring single-threaded usage of checkBody.
// In theory this could be made much more efficient: instead of enqueueing for a check of its own,
// collecting requests and replying in batches.
// Current usage of this class does not necessarily motivate further optimization: checking the
// Indexer Main Lock from the HaCoordinator seems to be sufficiently fast even in the peak scenario:
// the initialization of the complete pool.
def check(): Unit = synchronized {
logger.debug(s"Checking...")
Try(checkBody) match {
case Success(_) =>
logger.debug(s"Check successful.")

case Failure(ex) =>
logger.info(s"Check failed (${ex.getMessage}). KillSwitch/abort called.")
killSwitch.abort(new Exception("check failed, killSwitch aborted", ex))
throw ex
}
}

def close(): Unit = timer.cancel()
}
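A hypothetical usage sketch, not part of this diff: mainConnection and killSwitchCaptor are assumed values, acquireMainLock is the check used by the HaCoordinator above, and an implicit LoggingContext is in scope.

    val mainLockChecker = new PollingChecker(
      periodMillis = 1000L,
      checkBody = acquireMainLock(mainConnection), // throws if the lock was lost
      killSwitch = killSwitchCaptor, // aborted as soon as any check fails
    )

    // On-demand check, e.g. while preparing a new worker connection; throws
    // (and aborts the kill switch) if the Indexer Main Lock is no longer held.
    mainLockChecker.check()

    // Cancels the internal timer once the protected execution is over.
    mainLockChecker.close()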