diff --git a/ledger/participant-integration-api/BUILD.bazel b/ledger/participant-integration-api/BUILD.bazel index e10916cb5204..7afdbf5fb002 100644 --- a/ledger/participant-integration-api/BUILD.bazel +++ b/ledger/participant-integration-api/BUILD.bazel @@ -62,7 +62,9 @@ compile_deps = [ "@maven//:io_opentelemetry_opentelemetry_api", "@maven//:io_opentelemetry_opentelemetry_context", "@maven//:org_slf4j_slf4j_api", - # this oracle import is problematic for daml assistant build + "@maven//:com_h2database_h2", + "@maven//:org_postgresql_postgresql", + "@maven//:com_oracle_database_jdbc_ojdbc8", ] scala_compile_deps = [ @@ -79,9 +81,6 @@ scala_compile_deps = [ runtime_deps = [ "@maven//:ch_qos_logback_logback_classic", - "@maven//:com_h2database_h2", - "@maven//:org_postgresql_postgresql", - "@maven//:com_oracle_database_jdbc_ojdbc8", ] da_scala_library( diff --git a/ledger/participant-integration-api/src/main/scala/platform/indexer/IndexerConfig.scala b/ledger/participant-integration-api/src/main/scala/platform/indexer/IndexerConfig.scala index c005f8d42e2a..75d2bc828d99 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/indexer/IndexerConfig.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/indexer/IndexerConfig.scala @@ -6,6 +6,7 @@ package com.daml.platform.indexer import com.daml.lf.data.Ref import com.daml.platform.configuration.IndexConfiguration import com.daml.platform.indexer.IndexerConfig._ +import com.daml.platform.indexer.ha.HaConfig import com.daml.platform.store.DbType import scala.concurrent.duration.{DurationInt, FiniteDuration} @@ -32,6 +33,7 @@ case class IndexerConfig( tailingRateLimitPerSecond: Int = DefaultTailingRateLimitPerSecond, batchWithinMillis: Long = DefaultBatchWithinMillis, enableCompression: Boolean = DefaultEnableCompression, + haConfig: HaConfig = HaConfig(), ) object IndexerConfig { diff --git a/ledger/participant-integration-api/src/main/scala/platform/indexer/JdbcIndexer.scala b/ledger/participant-integration-api/src/main/scala/platform/indexer/JdbcIndexer.scala index a9c8ffd5f807..d33b6cca4adc 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/indexer/JdbcIndexer.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/indexer/JdbcIndexer.scala @@ -21,8 +21,15 @@ import com.daml.platform.common import com.daml.platform.common.MismatchException import com.daml.platform.configuration.ServerRole import com.daml.platform.indexer.parallel.ParallelIndexerFactory +import com.daml.platform.store.DbType.{ + AsynchronousCommit, + LocalSynchronousCommit, + SynchronousCommit, +} import com.daml.platform.store.appendonlydao.events.{CompressionStrategy, LfValueTranslation} +import com.daml.platform.store.backend.DataSourceStorageBackend.DataSourceConfig import com.daml.platform.store.backend.StorageBackend +import com.daml.platform.store.backend.postgresql.PostgresDataSourceConfig import com.daml.platform.store.dao.LedgerDao import com.daml.platform.store.{DbType, FlywayMigrations, LfValueTranslationCache} @@ -135,7 +142,6 @@ object JdbcIndexer { servicesExecutionContext, metrics, lfValueTranslationCache, - jdbcAsyncCommitMode = config.asyncCommitMode, enricher = None, participantId = config.participantId, ) @@ -172,6 +178,16 @@ object JdbcIndexer { tailingRateLimitPerSecond = config.tailingRateLimitPerSecond, batchWithinMillis = config.batchWithinMillis, metrics = metrics, + dataSourceConfig = DataSourceConfig( + postgresConfig = PostgresDataSourceConfig( + synchronousCommit = 
Some(config.asyncCommitMode match {
+            case SynchronousCommit => PostgresDataSourceConfig.SynchronousCommitValue.On
+            case AsynchronousCommit => PostgresDataSourceConfig.SynchronousCommitValue.Off
+            case LocalSynchronousCommit => PostgresDataSourceConfig.SynchronousCommitValue.Local
+          })
+        )
+      ),
+      haConfig = config.haConfig,
     )
   }
diff --git a/ledger/participant-integration-api/src/main/scala/platform/indexer/ha/HaCoordinator.scala b/ledger/participant-integration-api/src/main/scala/platform/indexer/ha/HaCoordinator.scala
new file mode 100644
index 000000000000..6fdc20822e6a
--- /dev/null
+++ b/ledger/participant-integration-api/src/main/scala/platform/indexer/ha/HaCoordinator.scala
@@ -0,0 +1,183 @@
+// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+package com.daml.platform.indexer.ha
+
+import java.sql.Connection
+
+import akka.actor.Scheduler
+import akka.stream.KillSwitch
+import com.daml.logging.{ContextualizedLogger, LoggingContext}
+import com.daml.platform.store.backend.DBLockStorageBackend.{Lock, LockId, LockMode}
+import com.daml.platform.store.backend.DBLockStorageBackend
+import javax.sql.DataSource
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext, Future}
+
+/** A handle of a running program.
+ * @param completed completes right after the program completes:
+ *                  - if no KillSwitch is used:
+ *                    - it completes successfully when the program ends successfully
+ *                    - it completes with the same failure that failed the program
+ *                  - if the KillSwitch is aborted, this completes with the same Throwable
+ *                  - if the KillSwitch is shut down, this completes successfully
+ *                  By the time completion is signalled, the program has finished its execution
+ *                  and has released all resources it acquired.
+ * @param killSwitch used to signal abort or shutdown
+ */
+case class Handle(completed: Future[Unit], killSwitch: KillSwitch)
+
+/** Initializes a worker Connection and clears it for further usage.
+ * This only needs to be done once at the beginning of the Connection life-cycle.
+ * Initialization errors are signaled by throwing an exception.
+ */
+trait ConnectionInitializer {
+  def initialize(connection: Connection): Unit
+}
+
+/** Adds High Availability features to a program which intends to use database connections to do its work.
+ * Features include:
+ * - Safety: mutual exclusion of these programs, ensured by DB locking mechanisms
+ * - Availability: release of the exclusion is detected by idle programs, which then start competing
+ *   for the lock to do their work.
+ */
+trait HaCoordinator {
+
+  /** Execute in High Availability mode.
+   * Wraps around the Handle of the execution.
+   *
+   * @param initializeExecution the HaCoordinator provides a ConnectionInitializer that must be used
+   *                            for all database connections during execution; the Future[Handle]
+   *                            embodies the asynchronous initialization of the execution, not the
+   *                            actual work (that asynchronous execution completes with the
+   *                            completed Future of the Handle)
+   * @return the new Handle, which is available immediately to observe and interact with the complete program
+   */
+  def protectedExecution(initializeExecution: ConnectionInitializer => Future[Handle]): Handle
+}
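+
+// Usage sketch (illustrative, not part of this change): a client wraps the asynchronous
+// start-up of its worker in protectedExecution and applies the provided
+// ConnectionInitializer to every database connection it opens. `startWorker` is a
+// hypothetical function returning a Future[Handle]:
+//
+//   val handle = haCoordinator.protectedExecution { connectionInitializer =>
+//     startWorker(onConnection = connectionInitializer.initialize)
+//   }
+//   handle.completed // completes once the worker stops and all resources are released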
+
+case class HaConfig(
+    mainLockAquireRetryMillis: Long = 500,
+    workerLockAquireRetryMillis: Long = 500,
+    workerLockAquireMaxRetry: Long = 1000,
+    mainLockCheckerPeriodMillis: Long = 1000,
+    indexerLockId: Int = 0x646d6c00, // note: 0x646d6c is the ASCII encoding of "dml"
+    indexerWorkerLockId: Int = 0x646d6c01,
+    enable: Boolean = false, // TODO ha: remove as stable
+)
+
+object HaCoordinator {
+
+  private val logger = ContextualizedLogger.get(this.getClass)
+
+  /** This implementation of the HaCoordinator
+   * - provides database-lock-based isolation of the protected executions
+   * - runs the execution at most once during its entire lifecycle
+   * - waits indefinitely to acquire the lock needed to start the protected execution
+   * - provides a ConnectionInitializer function which must be executed on all worker connections during execution
+   * - spawns a polling daemon to observe the continuous presence of the main lock
+   *
+   * @param dataSource used to spawn the main connection, which keeps the Indexer Main Lock
+   * @param storageBackend the database-independent abstraction of session/connection-level database locking
+   * @param executionContext used to execute the initialization; it will do blocking/IO work, so a dedicated execution context is recommended
+   */
+  def databaseLockBasedHaCoordinator(
+      dataSource: DataSource,
+      storageBackend: DBLockStorageBackend,
+      executionContext: ExecutionContext,
+      scheduler: Scheduler,
+      haConfig: HaConfig,
+  )(implicit loggingContext: LoggingContext): HaCoordinator = {
+    implicit val ec: ExecutionContext = executionContext
+
+    val indexerLockId = storageBackend.lock(haConfig.indexerLockId)
+    val indexerWorkerLockId = storageBackend.lock(haConfig.indexerWorkerLockId)
+    val preemptableSequence = PreemptableSequence(scheduler)
+
+    new HaCoordinator {
+      override def protectedExecution(
+          initializeExecution: ConnectionInitializer => Future[Handle]
+      ): Handle = {
+        def acquireLock(connection: Connection, lockId: LockId, lockMode: LockMode): Lock = {
+          logger.debug(s"Acquiring lock $lockId $lockMode")
+          storageBackend
+            .tryAcquire(lockId, lockMode)(connection)
+            .getOrElse(
+              throw new Exception(s"Cannot acquire lock $lockId in lock-mode $lockMode: lock busy")
+            )
+        }
+
+        def acquireMainLock(connection: Connection): Unit = {
+          acquireLock(connection, indexerLockId, LockMode.Exclusive)
+          ()
+        }
+
+        preemptableSequence.executeSequence { sequenceHelper =>
+          import sequenceHelper._
+          logger.info("Starting databaseLockBasedHaCoordinator")
+          for {
+            mainConnection <- go[Connection](dataSource.getConnection)
+            _ = logger.info("Step 1: creating main-connection - DONE")
+            _ = registerRelease {
+              logger.info("Releasing main connection...")
+              mainConnection.close()
+              logger.info("Released main connection")
+            }
+            _ <- retry(haConfig.mainLockAquireRetryMillis)(acquireMainLock(mainConnection))
+            _ = logger.info("Step 2: acquire exclusive Indexer Main Lock on main-connection - DONE")
+            exclusiveWorkerLock <- retry[Lock](
+              haConfig.workerLockAquireRetryMillis,
+              haConfig.workerLockAquireMaxRetry,
+            )(
+              acquireLock(mainConnection, indexerWorkerLockId, LockMode.Exclusive)
+            )
+            _ = logger.info(
+              "Step 3: acquire exclusive Indexer Worker Lock on main-connection - DONE"
+            )
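+            // Note on steps 3-4 (an inference from the locking semantics): acquiring and then
+            // immediately releasing the exclusive Indexer Worker Lock acts as a barrier, since
+            // worker connections of a previous incarnation hold it in shared mode; the exclusive
+            // acquisition only succeeds once they have all disconnected.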
+            _ <- go(storageBackend.release(exclusiveWorkerLock)(mainConnection))
+            _ = logger.info(
+              "Step 4: release exclusive Indexer Worker Lock on main-connection - DONE"
+            )
+            mainLockChecker <- go[PollingChecker](
+              new PollingChecker(
+                periodMillis = haConfig.mainLockCheckerPeriodMillis,
+                checkBody = acquireMainLock(mainConnection),
+                killSwitch =
+                  handle.killSwitch, // meaning: this PollingChecker will abort the main preemptableSequence if the main lock cannot be confirmed
+              )
+            )
+            _ = logger.info(
+              "Step 5: activate periodic checker of the exclusive Indexer Main Lock on the main connection - DONE"
+            )
+            _ = registerRelease {
+              logger.info(
+                "Releasing periodic checker of the exclusive Indexer Main Lock on the main connection..."
+              )
+              mainLockChecker.close()
+              logger.info(
+                "Released periodic checker of the exclusive Indexer Main Lock on the main connection"
+              )
+            }
+            protectedHandle <- goF(initializeExecution(workerConnection => {
+              // this is the checking routine on connection creation
+              // step 1: acquire the shared worker-lock
+              logger.info(s"Preparing worker connection. Step 1: acquire lock.")
+              acquireLock(workerConnection, indexerWorkerLockId, LockMode.Shared)
+              // step 2: check if the main connection still holds the lock
+              logger.info(s"Preparing worker connection. Step 2: checking main lock.")
+              mainLockChecker.check()
+              logger.info(s"Preparing worker connection DONE.")
+            }))
+            _ = logger.info("Step 6: initialize protected execution - DONE")
+            _ <- merge(protectedHandle)
+          } yield ()
+        }
+      }
+    }
+  }
+}
+
+object NoopHaCoordinator extends HaCoordinator {
+  override def protectedExecution(
+      initializeExecution: ConnectionInitializer => Future[Handle]
+  ): Handle =
+    Await.result(initializeExecution(_ => ()), Duration.Inf)
+}
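+
+// Configuration sketch (illustrative): HA is opt-in while marked experimental, and is
+// wired in through the indexer configuration, e.g.
+//
+//   indexerConfig.copy(haConfig = HaConfig(enable = true))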
diff --git a/ledger/participant-integration-api/src/main/scala/platform/indexer/ha/KillSwitchCaptor.scala b/ledger/participant-integration-api/src/main/scala/platform/indexer/ha/KillSwitchCaptor.scala
new file mode 100644
index 000000000000..ec7decb83cda
--- /dev/null
+++ b/ledger/participant-integration-api/src/main/scala/platform/indexer/ha/KillSwitchCaptor.scala
@@ -0,0 +1,70 @@
+// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+package com.daml.platform.indexer.ha
+
+import java.util.concurrent.atomic.AtomicReference
+
+import akka.stream.KillSwitch
+import com.daml.logging.{ContextualizedLogger, LoggingContext}
+
+/** This KillSwitch captures its usage in its internal state, which can be queried.
+ * The captured state is available with the 'state' method.
+ *
+ * Rules of state transitions:
+ * - Shutdown is always the final state
+ * - From multiple aborts, the last abort wins
+ *
+ * With setDelegate() we can set a delegate KillSwitch, to which each usage is replayed.
+ */
+class KillSwitchCaptor(implicit loggingContext: LoggingContext) extends KillSwitch {
+  import KillSwitchCaptor._
+  import State._
+
+  private val logger = ContextualizedLogger.get(this.getClass)
+
+  private val _state = new AtomicReference[State](Unused)
+  private val _delegate = new AtomicReference[Option[KillSwitch]](None)
+
+  private def updateState(newState: Used): Unit = {
+    _state.getAndAccumulate(
+      newState,
+      {
+        case (Shutdown, _) => Shutdown
+        case (_, used) => used
+      },
+    )
+    ()
+  }
+
+  override def shutdown(): Unit = {
+    logger.info("Shutdown called!")
+    updateState(Shutdown)
+    _delegate.get.foreach { ks =>
+      logger.info("Shutdown call delegated!")
+      ks.shutdown()
+    }
+  }
+
+  override def abort(ex: Throwable): Unit = {
+    logger.info(s"Abort called! (${ex.getMessage})")
+    updateState(Aborted(ex))
+    _delegate.get.foreach { ks =>
+      logger.info(s"Abort call delegated! (${ex.getMessage})")
+      ks.abort(ex)
+    }
+  }
+
+  def state: State = _state.get()
+  def setDelegate(delegate: Option[KillSwitch]): Unit = _delegate.set(delegate)
+}
+
+object KillSwitchCaptor {
+  sealed trait State
+  object State {
+    case object Unused extends State
+    sealed trait Used extends State
+    case object Shutdown extends Used
+    final case class Aborted(t: Throwable) extends Used
+  }
+}
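+
+// Behaviour sketch (illustrative): the last abort wins, until shutdown, which is final:
+//
+//   val captor = new KillSwitchCaptor
+//   captor.abort(e1); captor.abort(e2) // state == Aborted(e2)
+//   captor.shutdown()                  // state == Shutdown
+//   captor.abort(e3)                   // state remains Shutdown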
diff --git a/ledger/participant-integration-api/src/main/scala/platform/indexer/ha/PollingChecker.scala b/ledger/participant-integration-api/src/main/scala/platform/indexer/ha/PollingChecker.scala
new file mode 100644
index 000000000000..8294a7dcff98
--- /dev/null
+++ b/ledger/participant-integration-api/src/main/scala/platform/indexer/ha/PollingChecker.scala
@@ -0,0 +1,65 @@
+// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+package com.daml.platform.indexer.ha
+
+import java.util.{Timer, TimerTask}
+
+import akka.stream.KillSwitch
+import com.daml.logging.{ContextualizedLogger, LoggingContext}
+
+import scala.util.{Failure, Success, Try}
+
+/** A simple checking harness.
+ * - Ensures that checkBody is accessed by only one caller at a time
+ * - Does periodic checking
+ * - Exposes check() for on-demand checking from the outside
+ * - If any check() fails, it uses the killSwitch with an abort
+ * - It is also an AutoCloseable to release internal resources
+ *
+ * @param periodMillis period of the checking; this is the delay between scheduled checks
+ * @param checkBody the check function; an Exception signals a failed check
+ * @param killSwitch used to abort if a check fails
+ */
+class PollingChecker(
+    periodMillis: Long,
+    checkBody: => Unit,
+    killSwitch: KillSwitch,
+)(implicit loggingContext: LoggingContext)
+    extends AutoCloseable {
+  private val logger = ContextualizedLogger.get(this.getClass)
+
+  private val timer = new Timer(true)
+
+  timer.schedule(
+    new TimerTask {
+      override def run(): Unit = {
+        Try(check())
+        ()
+      }
+    },
+    periodMillis,
+    periodMillis,
+  )
+
+  // This is a crude approach to ensuring single-threaded usage of checkBody.
+  // In theory this could be made much more efficient: instead of enqueueing for a check of its
+  // own, collect requests and reply in batches.
+  // The current usage of this class does not necessarily motivate further optimization: checking
+  // the Indexer Main Lock from the HaCoordinator is sufficiently fast even in the peak scenario,
+  // the initialization of the complete pool.
+  def check(): Unit = synchronized {
+    logger.debug(s"Checking...")
+    Try(checkBody) match {
+      case Success(_) =>
+        logger.debug(s"Check successful.")
+
+      case Failure(ex) =>
+        logger.info(s"Check failed (${ex.getMessage}). KillSwitch/abort called.")
+        killSwitch.abort(new Exception("check failed, killSwitch aborted", ex))
+        throw ex
+    }
+  }
+
+  def close(): Unit = timer.cancel()
+}
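+
+// Usage sketch (illustrative, mirroring the HaCoordinator usage; the check body is a
+// hypothetical lock probe that throws when the lock is lost):
+//
+//   val checker = new PollingChecker(
+//     periodMillis = 1000,
+//     checkBody = assertMainLockStillHeld(mainConnection),
+//     killSwitch = handle.killSwitch, // aborted if a check fails
+//   )
+//   checker.check() // on-demand check, e.g. while preparing a new worker connection
+//   checker.close() // cancels the polling timer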
diff --git a/ledger/participant-integration-api/src/main/scala/platform/indexer/ha/PreemptableSequence.scala b/ledger/participant-integration-api/src/main/scala/platform/indexer/ha/PreemptableSequence.scala
new file mode 100644
index 000000000000..e353c4dbbe8b
--- /dev/null
+++ b/ledger/participant-integration-api/src/main/scala/platform/indexer/ha/PreemptableSequence.scala
@@ -0,0 +1,193 @@
+// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+package com.daml.platform.indexer.ha
+
+import akka.actor.Scheduler
+import com.daml.logging.{ContextualizedLogger, LoggingContext}
+
+import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.{Failure, Success}
+
+/** PreemptableSequence is a helper to
+ * - facilitate the execution of a sequence of Futures which can be stopped or aborted
+ * - provide a Handle for the client
+ * - manage the state to implement the above
+ */
+trait PreemptableSequence {
+
+  /** Execute the preemptable sequence
+   *
+   * @param sequence This Future sequence needs to be constructed with the help of the SequenceHelper functions.
+   * @return the Handle, to observe and to interact with the sequence.
+   *         - The completion future only completes once the sequence and all registered release functions have finished as well
+   *         - The Handle is available immediately
+   */
+  def executeSequence(sequence: SequenceHelper => Future[_]): Handle
+}
+
+/** A collection of helper functions to compose a preemptable sequence.
+ */
+trait SequenceHelper {
+
+  /** Register, at any point in time, a synchronous release function,
+   * which is guaranteed to run before the completion future of the handle completes.
+   *
+   * @param release the release lambda
+   */
+  def registerRelease(release: => Unit): Unit
+
+  /** Wrap a CBN (lazy) Future, so it is only started if the PreemptableSequence is not yet aborted/shut down.
+   *
+   * @param f the lazy Future
+   * @return the wrapped future
+   */
+  def goF[T](f: => Future[T]): Future[T]
+
+  /** Wrap a CBN (lazy) synchronous function in a Future, which is only started if the PreemptableSequence is not yet aborted/shut down.
+   *
+   * @param body the lazy synchronous body
+   * @return the wrapped future
+   */
+  def go[T](body: => T): Future[T]
+
+  /** Wrap a synchronous call into a Future sequence, which
+   * - will be preemptable
+   * - retries executing the body if an Exception is thrown
+   *
+   * @return the preemptable, retrying Future sequence
+   */
+  def retry[T](waitMillisBetweenRetries: Long, maxAmountOfRetries: Long = -1)(
+      body: => T
+  ): Future[T]
+
+  /** Delegate the preemptable-future sequence to another Handle
+   * - the completion Future of the PreemptableSequence only finishes after this Handle finishes,
+   *   and all previously registered release functions have completed
+   * - KillSwitch events will be replayed to this handle
+   * - in case of abort/shutdown the PreemptableSequence's completion result will conform to the
+   *   KillSwitch usage, not to the completion of this handle (although it will naturally wait for it)
+   *
+   * @param handle the handle to delegate to
+   * @return the completion of the Handle
+   */
+  def merge(handle: Handle): Future[Unit]
+
+  /** The handle of the PreemptableSequence. This handle is available for sequence construction as well.
+   * @return the Handle
+   */
+  def handle: Handle
+}
+
+object PreemptableSequence {
+  private val logger = ContextualizedLogger.get(this.getClass)
+
+  /** @param executionContext this execution context will be used to:
+   *                          - execute future transformations
+   *                          - encapsulate synchronous work in futures (which can be blocking)
+   *                          Because of the possibly blocking nature, a dedicated pool is recommended.
+   */
+  def apply(scheduler: Scheduler)(implicit
+      executionContext: ExecutionContext,
+      loggingContext: LoggingContext,
+  ): PreemptableSequence = { sequence =>
+    val resultCompleted = Promise[Unit]()
+    val killSwitchCaptor = new KillSwitchCaptor
+    val resultHandle = Handle(resultCompleted.future, killSwitchCaptor)
+    var releaseStack: List[() => Future[Unit]] = Nil
+
+    val helper: SequenceHelper = new SequenceHelper {
+      private def waitFor(delayMillis: Long): Future[Unit] =
+        goF(akka.pattern.after(FiniteDuration(delayMillis, "millis"), scheduler)(Future.unit))
+
+      override def registerRelease(release: => Unit): Unit = synchronized {
+        logger.info(s"Registered release function")
+        releaseStack = (() => Future(release)) :: releaseStack
+      }
+
+      override def goF[T](f: => Future[T]): Future[T] =
+        killSwitchCaptor.state match {
+          case _: KillSwitchCaptor.State.Used =>
+            // Failing the Future here means we interrupt the Future sequencing.
+            // The failure itself is not important, since the returned Handle's completion
+            // future's result is overridden in case the KillSwitch was used.
+            logger.info(s"KillSwitch already used, interrupting sequence!")
+            Future.failed(new Exception("UsedKillSwitch"))
+
+          case _ =>
+            f
+        }
+
+      override def go[T](body: => T): Future[T] = goF[T](Future(body))
+
+      override def retry[T](waitMillisBetweenRetries: Long, maxAmountOfRetries: Long)(
+          body: => T
+      ): Future[T] =
+        go(body).transformWith {
+          // since we count down to 0, starting from a negative number means unlimited retries
+          case Failure(ex) if maxAmountOfRetries == 0 =>
+            logger.warn(
+              s"Maximum number of retries reached ($maxAmountOfRetries), failing permanently.",
+              ex,
+            )
+            Future.failed(ex)
+          case Success(t) => Future.successful(t)
+          case Failure(ex) =>
+            logger.debug(s"Retrying (retries left: ${if (maxAmountOfRetries < 0) "unlimited"
+            else maxAmountOfRetries - 1}). Due to: ${ex.getMessage}")
+            waitFor(waitMillisBetweenRetries).flatMap(_ =>
+              // Note: this recursion happens off the stack, each retry being scheduled as a new Future
+              retry(waitMillisBetweenRetries, maxAmountOfRetries - 1)(body)
+            )
+        }
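+
+      // Retry semantics sketch (illustrative): with the default maxAmountOfRetries = -1 the
+      // body is retried indefinitely; e.g. retry(200)(mightFail()) never gives up, while
+      // retry(200, 3)(mightFail()) makes the initial attempt plus at most 3 retries before
+      // failing permanently. `mightFail()` is a hypothetical call that throws on failure.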
Due to: ${ex.getMessage}") + waitFor(waitMillisBetweenRetries).flatMap(_ => + // Note: this recursion is out of stack + retry(waitMillisBetweenRetries, maxAmountOfRetries - 1)(body) + ) + } + + override def merge(handle: Handle): Future[Unit] = { + logger.info(s"Delegating KillSwitch upon merge.") + killSwitchCaptor.setDelegate(Some(handle.killSwitch)) + // for safety reasons. if between creation of that killSwitch and delegation there was a usage, we replay that after delegation (worst case multiple calls) + killSwitchCaptor.state match { + case KillSwitchCaptor.State.Shutdown => + logger.info(s"Replying ShutDown after merge.") + handle.killSwitch.shutdown() + case KillSwitchCaptor.State.Aborted(ex) => + logger.info(s"Replaying abort (${ex.getMessage}) after merge.") + handle.killSwitch.abort(ex) + case _ => () + } + val result = handle.completed + // not strictly needed for this use case, but in theory multiple preemptable stages are possible after each other + // this is needed to remove the delegation of the killSwitch after stage is complete + result.onComplete(_ => killSwitchCaptor.setDelegate(None)) + result + } + + override def handle: Handle = resultHandle + } + + def release: Future[Unit] = synchronized { + releaseStack match { + case Nil => Future.unit + case x :: xs => + releaseStack = xs + x().transformWith(_ => release) + } + } + + sequence(helper).transformWith(fResult => release.transform(_ => fResult)).onComplete { + case Success(_) => + killSwitchCaptor.state match { + case KillSwitchCaptor.State.Shutdown => resultCompleted.success(()) + case KillSwitchCaptor.State.Aborted(ex) => resultCompleted.failure(ex) + case _ => resultCompleted.success(()) + } + case Failure(ex) => + killSwitchCaptor.state match { + case KillSwitchCaptor.State.Shutdown => resultCompleted.success(()) + case KillSwitchCaptor.State.Aborted(ex) => resultCompleted.failure(ex) + case _ => resultCompleted.failure(ex) + } + } + + resultHandle + } +} diff --git a/ledger/participant-integration-api/src/main/scala/platform/indexer/parallel/ParallelIndexerFactory.scala b/ledger/participant-integration-api/src/main/scala/platform/indexer/parallel/ParallelIndexerFactory.scala index c219cbbc68cf..6c8bf784e95d 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/indexer/parallel/ParallelIndexerFactory.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/indexer/parallel/ParallelIndexerFactory.scala @@ -4,7 +4,7 @@ package com.daml.platform.indexer.parallel import java.sql.Connection -import java.util.concurrent.TimeUnit +import java.util.concurrent.{Executors, TimeUnit} import akka.NotUsed import akka.stream.scaladsl.{Keep, Sink, Source} @@ -18,16 +18,20 @@ import com.daml.logging.LoggingContext.{withEnrichedLoggingContext, withEnriched import com.daml.logging.{ContextualizedLogger, LoggingContext} import com.daml.metrics.{InstrumentedSource, Metrics} import com.daml.platform.configuration.ServerRole +import com.daml.platform.indexer.ha.HaConfig +import com.daml.platform.indexer.ha.{HaCoordinator, Handle, NoopHaCoordinator} import com.daml.platform.indexer.parallel.AsyncSupport._ import com.daml.platform.indexer.{IndexFeedHandle, Indexer} import com.daml.platform.store.appendonlydao.DbDispatcher import com.daml.platform.store.appendonlydao.events.{CompressionStrategy, LfValueTranslation} +import com.daml.platform.store.backend +import com.daml.platform.store.backend.DataSourceStorageBackend.DataSourceConfig import com.daml.platform.store.backend.{DbDto, StorageBackend} -import 
 import com.daml.resources
+import com.google.common.util.concurrent.ThreadFactoryBuilder
 
-import scala.concurrent.Future
 import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.{ExecutionContext, Future, Promise}
 import scala.util.control.NonFatal
 
 object ParallelIndexerFactory {
@@ -51,6 +55,8 @@ object ParallelIndexerFactory {
       tailingRateLimitPerSecond: Int,
       batchWithinMillis: Long,
       metrics: Metrics,
+      dataSourceConfig: DataSourceConfig,
+      haConfig: HaConfig,
   )(implicit loggingContext: LoggingContext): ResourceOwner[Indexer] = {
     for {
       inputMapperExecutor <- asyncPool(
@@ -63,26 +69,39 @@ object ParallelIndexerFactory {
         "batching-pool",
         Some(metrics.daml.parallelIndexer.batching.executor -> metrics.registry),
       )
-      dbDispatcher <- DbDispatcher
-        .owner(
-          serverRole = ServerRole.Indexer,
-          jdbcUrl = jdbcUrl,
-          connectionPoolSize = ingestionParallelism + 1, // + 1 for the tailing ledger_end updates
-          connectionTimeout = FiniteDuration(
-            250,
-            "millis",
-          ), // 250 millis is the lowest possible value for this Hikari configuration (see HikariConfig JavaDoc)
-          metrics = metrics,
-          connectionAsyncCommitMode = DbType.AsynchronousCommit,
-        )
       toDbDto = backend.UpdateToDbDto(
         participantId = participantId,
         translation = translation,
        compressionStrategy = compressionStrategy,
      )
+      haCoordinator <-
+        if (storageBackend.dbLockSupported && haConfig.enable)
+          ResourceOwner
+            .forExecutorService(() =>
+              ExecutionContext.fromExecutorService(
+                Executors.newFixedThreadPool(
+                  1,
+                  new ThreadFactoryBuilder().setNameFormat(s"ha-coordinator-%d").build,
+                )
+              )
+            )
+            .map(
+              HaCoordinator.databaseLockBasedHaCoordinator(
+                // this DataSource will be used to spawn the main connection which keeps the Indexer
+                // Main Lock; the life-cycle of such connections matches the life-cycle of a protectedExecution
+                dataSource = storageBackend.createDataSource(jdbcUrl),
+                storageBackend = storageBackend,
+                _,
+                scheduler = mat.system.scheduler,
+                haConfig = haConfig,
+              )
+            )
+        else
+          ResourceOwner.successful(NoopHaCoordinator)
     } yield {
-      val ingest: Long => Source[(Offset, Update), NotUsed] => Source[Unit, NotUsed] =
-        initialSeqId =>
+      val ingest
+          : (Long, DbDispatcher) => Source[(Offset, Update), NotUsed] => Source[Unit, NotUsed] =
+        (initialSeqId, dbDispatcher) =>
          source =>
            BatchingParallelIngestionPipe(
              submissionBatchSize = submissionBatchSize,
@@ -108,12 +127,11 @@ object ParallelIndexerFactory {
              .map(_ -> System.nanoTime())
          )
          .map(_ => ())
-          .keepAlive(
+          .keepAlive( // TODO ha: remove as stable. This keepAlive approach was introduced for safety with async commit, and is still needed until HA is mandatory for Postgres to ensure safety with async commit. It will not be needed anymore once HA is enabled by default, since the HA mutual exclusion implementation with advisory locks makes it impossible for a db-shutdown to go undetected.
            keepAliveMaxIdleDuration,
            () =>
              if (dbDispatcher.currentHealth() == HealthStatus.healthy) {
                logger.debug("Indexer keep-alive: database connectivity OK")
-                ()
              } else {
                logger
                  .warn("Indexer keep-alive: database connectivity lost. Stopping indexing.")
                killSwitch.shutdown()
              }
          },
        )
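+      // Note: `ingest` now takes the DbDispatcher as a parameter because the connection pool is
+      // no longer owned for the whole indexer lifetime; a fresh pool is created inside each
+      // HA-protected execution in `subscribe` below.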
@@ -123,16 +141,56 @@ object ParallelIndexerFactory {
-    def subscribe(readService: ReadService): Future[Source[Unit, NotUsed]] =
-      dbDispatcher
-        .executeSql(metrics.daml.parallelIndexer.initialization)(storageBackend.initialize)
-        .map(initialized =>
-          ingest(initialized.lastEventSeqId.getOrElse(0L))(
-            readService.stateUpdates(beginAfter = initialized.lastOffset)
+    def subscribe(resourceContext: ResourceContext)(readService: ReadService): Handle = {
+      implicit val rc: ResourceContext = resourceContext
+      implicit val ec: ExecutionContext = resourceContext.executionContext
+      implicit val matImplicit: Materializer = mat
+      haCoordinator.protectedExecution { connectionInitializer =>
+        val killSwitchPromise = Promise[KillSwitch]()
+
+        val completionFuture = DbDispatcher
+          .owner(
+            // this is the DataSource which will be wrapped by HikariCP and which will drive the
+            // ingestion; therefore it needs to be configured with the connection-init hook we get
+            // from the HaCoordinator
+            dataSource = storageBackend.createDataSource(
+              jdbcUrl = jdbcUrl,
+              dataSourceConfig = dataSourceConfig,
+              connectionInitHook = Some(connectionInitializer.initialize),
+            ),
+            serverRole = ServerRole.Indexer,
+            connectionPoolSize =
+              ingestionParallelism + 1, // + 1 for the tailing ledger_end updates
+            connectionTimeout = FiniteDuration(
+              250,
+              "millis",
+            ), // 250 millis is the lowest possible value for this Hikari configuration (see HikariConfig JavaDoc)
+            metrics = metrics,
          )
-        )(scala.concurrent.ExecutionContext.global)
+          .use { dbDispatcher =>
+            dbDispatcher
+              .executeSql(metrics.daml.parallelIndexer.initialization)(storageBackend.initialize)
+              .flatMap { initialized =>
+                val (killSwitch, completionFuture) =
+                  ingest(initialized.lastEventSeqId.getOrElse(0L), dbDispatcher)(
+                    readService.stateUpdates(beginAfter = initialized.lastOffset)
+                  )
+                    .viaMat(KillSwitches.single)(Keep.right[NotUsed, UniqueKillSwitch])
+                    .toMat(Sink.ignore)(Keep.both)
+                    .run()
+                // The tricky bit: the future in the completion handler will be the one above, but
+                // the future signalling to the HaCoordinator that the protected execution is
+                // initialized needs to complete precisely here
+                killSwitchPromise.success(killSwitch)
+                completionFuture
+              }
+          }
+
+        killSwitchPromise.future
+          .map(Handle(completionFuture.map(_ => ()), _))
+      }
+    }
 
-    toIndexer(subscribe)(mat)
+    toIndexer(subscribe)
   }
 }
@@ -298,20 +356,17 @@ object ParallelIndexerFactory {
   }
 
   def toIndexer(
-      ingestionPipeOn: ReadService => Future[Source[Unit, NotUsed]]
-  )(implicit mat: Materializer): Indexer =
+      ingestionPipeOn: ResourceContext => ReadService => Handle
+  ): Indexer =
     readService =>
      new ResourceOwner[IndexFeedHandle] {
        override def acquire()(implicit
            context: ResourceContext
        ): resources.Resource[ResourceContext, IndexFeedHandle] = {
          Resource {
-            ingestionPipeOn(readService).map { pipe =>
-              val (killSwitch, completionFuture) = pipe
-                .viaMat(KillSwitches.single)(Keep.right[NotUsed, UniqueKillSwitch])
-                .toMat(Sink.ignore)(Keep.both)
-                .run()
-              new SubscriptionIndexFeedHandle(killSwitch, completionFuture.map(_ => ()))
+            Future {
+              val handle = ingestionPipeOn(context)(readService)
+              new SubscriptionIndexFeedHandle(handle.killSwitch, handle.completed)
            }
          } { handle =>
            handle.killSwitch.shutdown()
diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/DbType.scala b/ledger/participant-integration-api/src/main/scala/platform/store/DbType.scala
index 27c1c8985cf1..93bd1a2395d2 100644
--- 
a/ledger/participant-integration-api/src/main/scala/platform/store/DbType.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/DbType.scala @@ -51,6 +51,7 @@ private[platform] object DbType { sys.error(s"JDBC URL doesn't match any supported databases (h2, pg, oracle)") } + // TODO append-only: adapt AsyncCommit related configuration here sealed trait AsyncCommitMode { def setting: String } diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/DbDispatcher.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/DbDispatcher.scala index 89312d6e1134..283b3022d4f0 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/DbDispatcher.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/DbDispatcher.scala @@ -13,16 +13,15 @@ import com.daml.logging.LoggingContext.withEnrichedLoggingContext import com.daml.logging.{ContextualizedLogger, LoggingContext} import com.daml.metrics.{DatabaseMetrics, Metrics} import com.daml.platform.configuration.ServerRole -import com.daml.platform.store.DbType import com.google.common.util.concurrent.ThreadFactoryBuilder +import javax.sql.DataSource import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration.FiniteDuration import scala.util.control.NonFatal private[platform] final class DbDispatcher private ( - val maxConnections: Int, - connectionProvider: HikariJdbcConnectionProvider, + connectionProvider: JdbcConnectionProvider, executor: Executor, overallWaitTimer: Timer, overallExecutionTimer: Timer, @@ -83,22 +82,22 @@ private[platform] object DbDispatcher { private val logger = ContextualizedLogger.get(this.getClass) def owner( + dataSource: DataSource, serverRole: ServerRole, - jdbcUrl: String, connectionPoolSize: Int, connectionTimeout: FiniteDuration, metrics: Metrics, - connectionAsyncCommitMode: DbType.AsyncCommitMode, )(implicit loggingContext: LoggingContext): ResourceOwner[DbDispatcher] = for { - connectionProvider <- HikariJdbcConnectionProvider.owner( + hikariDataSource <- new HikariDataSourceOwner( + dataSource, serverRole, - jdbcUrl, + connectionPoolSize, connectionPoolSize, connectionTimeout, - metrics.registry, - connectionAsyncCommitMode, + Some(metrics.registry), ) + connectionProvider <- DataSourceConnectionProvider.owner(hikariDataSource) threadPoolName = s"daml.index.db.threadpool.connection.${serverRole.threadPoolSuffix}" executor <- ResourceOwner.forExecutorService(() => new InstrumentedExecutorService( @@ -116,7 +115,6 @@ private[platform] object DbDispatcher { ) ) } yield new DbDispatcher( - maxConnections = connectionPoolSize, connectionProvider = connectionProvider, executor = executor, overallWaitTimer = metrics.daml.index.db.waitAll, diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/HikariJdbcConnectionProvider.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/HikariJdbcConnectionProvider.scala index 6b3deff3d125..10698a799059 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/HikariJdbcConnectionProvider.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/HikariJdbcConnectionProvider.scala @@ -13,39 +13,31 @@ import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner} import com.daml.logging.{ContextualizedLogger, LoggingContext} import com.daml.metrics.{DatabaseMetrics, 
Timed} import com.daml.platform.configuration.ServerRole -import com.daml.platform.store.DbType -import com.daml.platform.store.appendonlydao.HikariJdbcConnectionProvider._ import com.daml.timer.RetryStrategy import com.zaxxer.hikari.{HikariConfig, HikariDataSource} +import javax.sql.DataSource import scala.concurrent.Future import scala.concurrent.duration.{DurationInt, FiniteDuration} import scala.util.control.NonFatal -private[platform] final class HikariConnection( +private[platform] final class HikariDataSourceOwner( + dataSource: DataSource, serverRole: ServerRole, - jdbcUrl: String, minimumIdle: Int, maxPoolSize: Int, connectionTimeout: FiniteDuration, metrics: Option[MetricRegistry], - connectionPoolPrefix: String, - maxInitialConnectRetryAttempts: Int, - connectionAsyncCommitMode: DbType.AsyncCommitMode, + connectionPoolPrefix: String = "daml.index.db.connection", + maxInitialConnectRetryAttempts: Int = 600, )(implicit loggingContext: LoggingContext) - extends ResourceOwner[HikariDataSource] { + extends ResourceOwner[DataSource] { private val logger = ContextualizedLogger.get(this.getClass) override def acquire()(implicit context: ResourceContext): Resource[HikariDataSource] = { val config = new HikariConfig - val dbType = DbType.jdbcType(jdbcUrl) - - config.setJdbcUrl(jdbcUrl) - config.setDriverClassName(dbType.driver) - config.addDataSourceProperty("cachePrepStmts", "true") - config.addDataSourceProperty("prepStmtCacheSize", "128") - config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048") + config.setDataSource(dataSource) config.setAutoCommit(false) config.setMaximumPoolSize(maxPoolSize) config.setMinimumIdle(minimumIdle) @@ -53,8 +45,6 @@ private[platform] final class HikariConnection( config.setPoolName(s"$connectionPoolPrefix.${serverRole.threadPoolSuffix}") metrics.foreach(config.setMetricRegistry) - configureAsyncCommit(config, dbType) - // Hikari dies if a database connection could not be opened almost immediately // regardless of any connection timeout settings. We retry connections so that // Postgres and Sandbox can be started in any order. 
@@ -72,124 +62,66 @@ private[platform] final class HikariConnection( } )(conn => Future { conn.close() }) } - - private def configureAsyncCommit(config: HikariConfig, dbType: DbType): Unit = - if (dbType.supportsAsynchronousCommits) { - logger.info( - s"Creating Hikari connections with synchronous commit ${connectionAsyncCommitMode.setting}" - ) - config.setConnectionInitSql(s"SET synchronous_commit=${connectionAsyncCommitMode.setting}") - } else if (connectionAsyncCommitMode != DbType.SynchronousCommit) { - logger.warn( - s"Asynchronous commit setting ${connectionAsyncCommitMode.setting} is not compatible with ${dbType.name} database backend" - ) - } -} - -private[platform] object HikariConnection { - private val MaxInitialConnectRetryAttempts = 600 - private val ConnectionPoolPrefix: String = "daml.index.db.connection" - - def owner( - serverRole: ServerRole, - jdbcUrl: String, - minimumIdle: Int, - maxPoolSize: Int, - connectionTimeout: FiniteDuration, - metrics: Option[MetricRegistry], - connectionAsyncCommitMode: DbType.AsyncCommitMode, - )(implicit loggingContext: LoggingContext): HikariConnection = - new HikariConnection( - serverRole, - jdbcUrl, - minimumIdle, - maxPoolSize, - connectionTimeout, - metrics, - ConnectionPoolPrefix, - MaxInitialConnectRetryAttempts, - connectionAsyncCommitMode, - ) -} - -private[platform] class HikariJdbcConnectionProvider( - dataSource: HikariDataSource, - healthPoller: Timer, -) extends JdbcConnectionProvider { - private val transientFailureCount = new AtomicInteger(0) - - private val checkHealth = new TimerTask { - override def run(): Unit = { - try { - dataSource.getConnection().close() - transientFailureCount.set(0) - } catch { - case _: SQLTransientConnectionException => - val _ = transientFailureCount.incrementAndGet() - } - } - } - - healthPoller.schedule(checkHealth, 0, HealthPollingSchedule.toMillis) - - override def currentHealth(): HealthStatus = - if (transientFailureCount.get() < MaxTransientFailureCount) - Healthy - else - Unhealthy - - override def runSQL[T](databaseMetrics: DatabaseMetrics)(block: Connection => T): T = { - val conn = dataSource.getConnection() - conn.setAutoCommit(false) - try { - val res = Timed.value( - databaseMetrics.queryTimer, - block(conn), - ) - Timed.value( - databaseMetrics.commitTimer, - conn.commit(), - ) - res - } catch { - case e: SQLTransientConnectionException => - transientFailureCount.incrementAndGet() - throw e - case NonFatal(t) => - // Log the error in the caller with access to more logging context (such as the sql statement description) - conn.rollback() - throw t - } finally { - conn.close() - } - } } -private[platform] object HikariJdbcConnectionProvider { +object DataSourceConnectionProvider { private val MaxTransientFailureCount: Int = 5 private val HealthPollingSchedule: FiniteDuration = 1.second - def owner( - serverRole: ServerRole, - jdbcUrl: String, - maxConnections: Int, - connectionTimeout: FiniteDuration, - metrics: MetricRegistry, - connectionAsyncCommitMode: DbType.AsyncCommitMode = DbType.SynchronousCommit, - )(implicit loggingContext: LoggingContext): ResourceOwner[HikariJdbcConnectionProvider] = + def owner(dataSource: DataSource): ResourceOwner[JdbcConnectionProvider] = for { - // these connections should never time out as we have the same number of threads as connections - dataSource <- HikariConnection.owner( - serverRole, - jdbcUrl, - maxConnections, - maxConnections, - connectionTimeout, - Some(metrics), - connectionAsyncCommitMode, - ) healthPoller <- 
ResourceOwner.forTimer(() => - new Timer(s"${classOf[HikariJdbcConnectionProvider].getName}#healthPoller") + new Timer("DataSourceConnectionProvider#healthPoller") ) - } yield new HikariJdbcConnectionProvider(dataSource, healthPoller) + } yield { + val transientFailureCount = new AtomicInteger(0) + + val checkHealth = new TimerTask { + override def run(): Unit = { + try { + dataSource.getConnection().close() + transientFailureCount.set(0) + } catch { + case _: SQLTransientConnectionException => + val _ = transientFailureCount.incrementAndGet() + } + } + } + + healthPoller.schedule(checkHealth, 0, HealthPollingSchedule.toMillis) + + new JdbcConnectionProvider { + override def runSQL[T](databaseMetrics: DatabaseMetrics)(block: Connection => T): T = { + val conn = dataSource.getConnection() + conn.setAutoCommit(false) + try { + val res = Timed.value( + databaseMetrics.queryTimer, + block(conn), + ) + Timed.value( + databaseMetrics.commitTimer, + conn.commit(), + ) + res + } catch { + case e: SQLTransientConnectionException => + transientFailureCount.incrementAndGet() + throw e + case NonFatal(t) => + // Log the error in the caller with access to more logging context (such as the sql statement description) + conn.rollback() + throw t + } finally { + conn.close() + } + } + + override def currentHealth(): HealthStatus = + if (transientFailureCount.get() < MaxTransientFailureCount) + Healthy + else + Unhealthy + } + } } diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/JdbcLedgerDao.scala b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/JdbcLedgerDao.scala index 123058922f3a..d77b5b0d31b2 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/JdbcLedgerDao.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/JdbcLedgerDao.scala @@ -124,7 +124,6 @@ private class JdbcLedgerDao( )(implicit loggingContext: LoggingContext): Future[Unit] = dbDispatcher.executeSql(metrics.daml.index.db.initializeLedgerParameters) { implicit connection => - storageBackend.enforceSynchronousCommit(connection) storageBackend.updateLedgerId(ledgerId.unwrap)(connection) } @@ -132,7 +131,6 @@ private class JdbcLedgerDao( participantId: ParticipantId )(implicit loggingContext: LoggingContext): Future[Unit] = dbDispatcher.executeSql(metrics.daml.index.db.initializeParticipantId) { implicit connection => - storageBackend.enforceSynchronousCommit(connection) storageBackend.updateParticipantId(participantId.unwrap)(connection) } @@ -387,7 +385,6 @@ private class JdbcLedgerDao( logger.info("Storing initial state") dbDispatcher.executeSql(metrics.daml.index.db.storeInitialStateFromScenario) { implicit connection => - storageBackend.enforceSynchronousCommit(connection) ledgerEntries.foreach { case (offset, entry) => entry match { case tx: LedgerEntry.Transaction => @@ -769,7 +766,6 @@ private[platform] object JdbcLedgerDao { servicesExecutionContext: ExecutionContext, metrics: Metrics, lfValueTranslationCache: LfValueTranslationCache.Cache, - jdbcAsyncCommitMode: DbType.AsyncCommitMode, enricher: Option[ValueEnricher], participantId: Ref.ParticipantId, )(implicit loggingContext: LoggingContext): ResourceOwner[LedgerDao] = { @@ -785,8 +781,6 @@ private[platform] object JdbcLedgerDao { servicesExecutionContext, metrics, lfValueTranslationCache, - jdbcAsyncCommitMode = - if (dbType.supportsAsynchronousCommits) jdbcAsyncCommitMode else DbType.SynchronousCommit, enricher = enricher, 
participantId = participantId, compressionStrategy = CompressionStrategy.none(metrics), // not needed @@ -860,22 +854,20 @@ private[platform] object JdbcLedgerDao { metrics: Metrics, lfValueTranslationCache: LfValueTranslationCache.Cache, validatePartyAllocation: Boolean = false, - jdbcAsyncCommitMode: DbType.AsyncCommitMode = DbType.SynchronousCommit, enricher: Option[ValueEnricher], participantId: Ref.ParticipantId, compressionStrategy: CompressionStrategy, - )(implicit loggingContext: LoggingContext): ResourceOwner[LedgerDao] = + )(implicit loggingContext: LoggingContext): ResourceOwner[LedgerDao] = { + val dbType = DbType.jdbcType(jdbcUrl) + val storageBackend = StorageBackend.of(dbType) for { dbDispatcher <- DbDispatcher.owner( + storageBackend.createDataSource(jdbcUrl), serverRole, - jdbcUrl, connectionPoolSize, connectionTimeout, metrics, - jdbcAsyncCommitMode, ) - dbType = DbType.jdbcType(jdbcUrl) - storageBackend = StorageBackend.of(dbType) } yield new JdbcLedgerDao( dbDispatcher, servicesExecutionContext, @@ -896,41 +888,7 @@ private[platform] object JdbcLedgerDao { participantId, storageBackend, ) - - // TODO H2 support -// object H2DatabaseQueries extends Queries { -// override protected[JdbcLedgerDao] val SQL_INSERT_COMMAND: String = -// """merge into participant_command_submissions pcs -// |using dual on deduplication_key = {deduplicationKey} -// |when not matched then -// | insert (deduplication_key, deduplicate_until) -// | values ({deduplicationKey}, {deduplicateUntil}) -// |when matched and pcs.deduplicate_until < {submittedAt} then -// | update set deduplicate_until={deduplicateUntil}""".stripMargin -// -// override protected[JdbcLedgerDao] val DUPLICATE_KEY_ERROR: String = -// "Unique index or primary key violation" -// -// override protected[JdbcLedgerDao] val SQL_TRUNCATE_TABLES: String = -// """set referential_integrity false; -// |truncate table configuration_entries; -// |truncate table package_entries; -// |truncate table parameters; -// |truncate table participant_command_completions; -// |truncate table participant_command_submissions; -// |truncate table participant_events; -// |truncate table participant_contracts; -// |truncate table participant_contract_witnesses; -// |truncate table parties; -// |truncate table party_entries; -// |set referential_integrity true; -// """.stripMargin -// -// /** H2 does not support asynchronous commits */ -// override protected[JdbcLedgerDao] def enforceSynchronousCommit(implicit -// conn: Connection -// ): Unit = () -// } + } val acceptType = "accept" val rejectType = "reject" diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala index d074a4b019e1..6b97269c0bc2 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/StorageBackend.scala @@ -14,6 +14,7 @@ import com.daml.ledger.offset.Offset import com.daml.ledger.participant.state.index.v2.PackageDetails import com.daml.lf.data.Ref import com.daml.lf.ledger.EventId +import com.daml.logging.LoggingContext import com.daml.platform import com.daml.platform.store.DbType import com.daml.platform.store.appendonlydao.events.{ContractId, EventsTable, Key, Raw} @@ -21,10 +22,11 @@ import com.daml.platform.store.backend.EventStorageBackend.{FilterParams, RangeP import 
com.daml.platform.store.backend.StorageBackend.RawTransactionEvent
 import com.daml.platform.store.backend.h2.H2StorageBackend
 import com.daml.platform.store.backend.oracle.OracleStorageBackend
-import com.daml.platform.store.backend.postgresql.PostgresStorageBackend
+import com.daml.platform.store.backend.postgresql.{PostgresDataSourceConfig, PostgresStorageBackend}
 import com.daml.platform.store.entries.{ConfigurationEntry, PackageLedgerEntry, PartyLedgerEntry}
 import com.daml.platform.store.interfaces.LedgerDaoContractsReader.KeyState
 import com.daml.scalautil.NeverEqualsOverride
+import javax.sql.DataSource
 
 import scala.util.Try
 
@@ -44,9 +46,10 @@ trait StorageBackend[DB_BATCH]
     with DeduplicationStorageBackend
     with CompletionStorageBackend
     with ContractStorageBackend
-    with EventStorageBackend {
+    with EventStorageBackend
+    with DataSourceStorageBackend
+    with DBLockStorageBackend {
   def reset(connection: Connection): Unit
-  def enforceSynchronousCommit(connection: Connection): Unit
   def duplicateKeyError: String // TODO: Avoid brittleness of error message checks
 }
 
@@ -236,6 +239,49 @@ object EventStorageBackend {
   )
 }
 
+trait DataSourceStorageBackend {
+  def createDataSource(
+      jdbcUrl: String,
+      dataSourceConfig: DataSourceStorageBackend.DataSourceConfig =
+        DataSourceStorageBackend.DataSourceConfig(),
+      connectionInitHook: Option[Connection => Unit] = None,
+  )(implicit loggingContext: LoggingContext): DataSource
+}
+
+object DataSourceStorageBackend {
+
+  /** @param postgresConfig configurations which apply only to the PostgreSQL backend
+   */
+  case class DataSourceConfig(
+      postgresConfig: PostgresDataSourceConfig = PostgresDataSourceConfig()
+  )
+}
+
+trait DBLockStorageBackend {
+  def tryAcquire(
+      lockId: DBLockStorageBackend.LockId,
+      lockMode: DBLockStorageBackend.LockMode,
+  )(connection: Connection): Option[DBLockStorageBackend.Lock]
+
+  def release(lock: DBLockStorageBackend.Lock)(connection: Connection): Boolean
+
+  def lock(id: Int): DBLockStorageBackend.LockId
+
+  def dbLockSupported: Boolean
+}
+
+object DBLockStorageBackend {
+  case class Lock(lockId: LockId, lockMode: LockMode)
+
+  trait LockId
+
+  trait LockMode
+  object LockMode {
+    case object Exclusive extends LockMode
+    case object Shared extends LockMode
+  }
+}
+
 object StorageBackend {
   case class Params(ledgerEnd: Offset, eventSeqId: Long)
diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CommonStorageBackend.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CommonStorageBackend.scala
index fb60644b7906..8f0ee597dc72 100644
--- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CommonStorageBackend.scala
+++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/CommonStorageBackend.scala
@@ -957,4 +957,14 @@ private[backend] trait CommonStorageBackend[DB_BATCH] extends StorageBackend[DB_
       and event_kind != 0
       ORDER BY event_sequential_id ASC"""
       .asVectorOf(rawTransactionEventParser)(connection)
+
+  // Executes a single SQL statement against the given connection, closing the statement afterwards.
+  protected def exe(statement: String): Connection => Unit = { connection =>
+    val stmnt = connection.createStatement()
+    try {
+      stmnt.execute(statement)
+      ()
+    } finally {
+      stmnt.close()
+    }
+  }
 }
diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/InitHookDataSourceProxy.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/InitHookDataSourceProxy.scala
new file mode 100644
index 000000000000..e25587ee886d
--- /dev/null +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/common/InitHookDataSourceProxy.scala @@ -0,0 +1,66 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.platform.store.backend.common + +import java.io.PrintWriter +import java.sql.Connection +import java.util.logging.Logger + +import com.daml.logging.{ContextualizedLogger, LoggingContext} +import javax.sql.DataSource + +private[backend] object InitHookDataSourceProxy { + val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass) + + def apply( + delegate: DataSource, + initHooks: List[Connection => Unit], + )(implicit loggingContext: LoggingContext): DataSource = + if (initHooks.isEmpty) delegate + else InitHookDataSourceProxy(delegate, c => initHooks.foreach(_(c))) +} + +import com.daml.platform.store.backend.common.InitHookDataSourceProxy._ + +private[backend] case class InitHookDataSourceProxy( + delegate: DataSource, + initHook: Connection => Unit, +)(implicit loggingContext: LoggingContext) + extends DataSource { + + private def getConnection(connectionBody: => Connection): Connection = { + logger.debug(s"Creating new connection") + val connection = connectionBody + try { + logger.debug(s"Applying connection init hook") + initHook(connection) + } catch { + case t: Throwable => + logger.warn(s"Init hook execution failed", t) + throw t + } + logger.info(s"Init hook execution finished successfully, connection ready") + connection + } + + override def getConnection: Connection = getConnection(delegate.getConnection) + + override def getConnection(s: String, s1: String): Connection = getConnection( + delegate.getConnection(s, s1) + ) + + override def getLogWriter: PrintWriter = delegate.getLogWriter + + override def setLogWriter(printWriter: PrintWriter): Unit = delegate.setLogWriter(printWriter) + + override def setLoginTimeout(i: Int): Unit = delegate.setLoginTimeout(i) + + override def getLoginTimeout: Int = delegate.getLoginTimeout + + override def getParentLogger: Logger = delegate.getParentLogger + + override def unwrap[T](aClass: Class[T]): T = delegate.unwrap(aClass) + + override def isWrapperFor(aClass: Class[_]): Boolean = delegate.isWrapperFor(aClass) +} diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/h2/H2StorageBackend.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/h2/H2StorageBackend.scala index f0cdf2c335d5..572c012ae93c 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/h2/H2StorageBackend.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/h2/H2StorageBackend.scala @@ -11,6 +11,7 @@ import anorm.SqlParser.get import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse import com.daml.ledger.offset.Offset import com.daml.lf.data.Ref +import com.daml.logging.LoggingContext import com.daml.platform.store.appendonlydao.events.{ContractId, Key} import com.daml.platform.store.backend.EventStorageBackend.FilterParams import com.daml.platform.store.backend.common.{ @@ -18,9 +19,17 @@ import com.daml.platform.store.backend.common.{ CommonStorageBackend, EventStorageBackendTemplate, EventStrategy, + InitHookDataSourceProxy, TemplatedStorageBackend, } -import com.daml.platform.store.backend.{DbDto, StorageBackend, common} +import com.daml.platform.store.backend.{ + DBLockStorageBackend, + 
DataSourceStorageBackend, + DbDto, + StorageBackend, + common, +} +import javax.sql.DataSource private[backend] object H2StorageBackend extends StorageBackend[AppendOnlySchema.Batch] @@ -45,8 +54,6 @@ private[backend] object H2StorageBackend () } - override def enforceSynchronousCommit(connection: Connection): Unit = () // Not supported - override def duplicateKeyError: String = "Unique index or primary key violation" val SQL_INSERT_COMMAND: String = @@ -183,4 +190,28 @@ private[backend] object H2StorageBackend "false" else parties.view.map(p => s"array_contains($arrayColumn, '$p')").mkString("(", " or ", ")") + + override def createDataSource( + jdbcUrl: String, + dataSourceConfig: DataSourceStorageBackend.DataSourceConfig, + connectionInitHook: Option[Connection => Unit], + )(implicit loggingContext: LoggingContext): DataSource = { + val h2DataSource = new org.h2.jdbcx.JdbcDataSource() + h2DataSource.setUrl(jdbcUrl) + InitHookDataSourceProxy(h2DataSource, connectionInitHook.toList) + } + + override def tryAcquire( + lockId: DBLockStorageBackend.LockId, + lockMode: DBLockStorageBackend.LockMode, + )(connection: Connection): Option[DBLockStorageBackend.Lock] = + throw new UnsupportedOperationException("db level locks are not supported for H2") + + override def release(lock: DBLockStorageBackend.Lock)(connection: Connection): Boolean = + throw new UnsupportedOperationException("db level locks are not supported for H2") + + override def lock(id: Int): DBLockStorageBackend.LockId = + throw new UnsupportedOperationException("db level locks are not supported for H2") + + override def dbLockSupported: Boolean = false } diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/oracle/OracleStorageBackend.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/oracle/OracleStorageBackend.scala index f92e2bacf0fa..563fb22c89f7 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/oracle/OracleStorageBackend.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/oracle/OracleStorageBackend.scala @@ -13,15 +13,25 @@ import com.daml.platform.store.backend.common.{ CommonStorageBackend, EventStorageBackendTemplate, EventStrategy, + InitHookDataSourceProxy, TemplatedStorageBackend, } -import com.daml.platform.store.backend.{DbDto, StorageBackend, common} +import com.daml.platform.store.backend.{ + DBLockStorageBackend, + DataSourceStorageBackend, + DbDto, + StorageBackend, + common, +} import java.sql.Connection import java.time.Instant import com.daml.ledger.offset.Offset import com.daml.platform.store.backend.EventStorageBackend.FilterParams +import com.daml.logging.LoggingContext +import javax.sql.DataSource + private[backend] object OracleStorageBackend extends StorageBackend[AppendOnlySchema.Batch] with CommonStorageBackend[AppendOnlySchema.Batch] @@ -42,8 +52,6 @@ private[backend] object OracleStorageBackend "truncate table party_entries cascade", ).map(SQL(_)).foreach(_.execute()(connection)) - override def enforceSynchronousCommit(connection: Connection): Unit = () // Not supported - override def duplicateKeyError: String = "unique constraint" val SQL_INSERT_COMMAND: String = @@ -236,4 +244,73 @@ private[backend] object OracleStorageBackend .mkString(" OR ") + ")" } + override def createDataSource( + jdbcUrl: String, + dataSourceConfig: DataSourceStorageBackend.DataSourceConfig, + connectionInitHook: Option[Connection => Unit], + )(implicit loggingContext: LoggingContext): 
+    val oracleDataSource = new oracle.jdbc.pool.OracleDataSource
+    oracleDataSource.setURL(jdbcUrl)
+    InitHookDataSourceProxy(oracleDataSource, connectionInitHook.toList)
+  }
+
+  override def tryAcquire(
+      lockId: DBLockStorageBackend.LockId,
+      lockMode: DBLockStorageBackend.LockMode,
+  )(connection: Connection): Option[DBLockStorageBackend.Lock] = {
+    val oracleLockMode = lockMode match {
+      case DBLockStorageBackend.LockMode.Exclusive => "6" // "DBMS_LOCK.x_mode"
+      case DBLockStorageBackend.LockMode.Shared => "4" // "DBMS_LOCK.s_mode"
+    }
+    SQL"""
+          SELECT DBMS_LOCK.REQUEST(
+            id => ${oracleIntLockId(lockId)},
+            lockmode => #$oracleLockMode,
+            timeout => 0
+          ) FROM DUAL"""
+      .as(get[Int](1).single)(connection) match {
+      case 0 => Some(DBLockStorageBackend.Lock(lockId, lockMode)) // lock granted
+      case 1 => None // timeout: a conflicting lock is held by another session
+      case 2 => throw new Exception("DBMS_LOCK.REQUEST Error 2: Acquiring lock caused a deadlock!")
+      case 3 => throw new Exception("DBMS_LOCK.REQUEST Error 3: Parameter error while acquiring lock")
+      case 4 => Some(DBLockStorageBackend.Lock(lockId, lockMode)) // this session already holds the lock
+      case 5 =>
+        throw new Exception("DBMS_LOCK.REQUEST Error 5: Illegal lock handle while acquiring lock")
+      case unknown => throw new Exception(s"Invalid result from DBMS_LOCK.REQUEST: $unknown")
+    }
+  }
+
+  override def release(lock: DBLockStorageBackend.Lock)(connection: Connection): Boolean = {
+    SQL"""
+          SELECT DBMS_LOCK.RELEASE(
+            id => ${oracleIntLockId(lock.lockId)}
+          ) FROM DUAL"""
+      .as(get[Int](1).single)(connection) match {
+      case 0 => true
+      case 3 => throw new Exception("DBMS_LOCK.RELEASE Error 3: Parameter error while releasing lock")
+      case 4 => throw new Exception("DBMS_LOCK.RELEASE Error 4: Trying to release a lock that is not owned")
+      case 5 =>
+        throw new Exception("DBMS_LOCK.RELEASE Error 5: Illegal lock handle while releasing lock")
+      case unknown => throw new Exception(s"Invalid result from DBMS_LOCK.RELEASE: $unknown")
+    }
+  }
+
+  case class OracleLockId(id: Int) extends DBLockStorageBackend.LockId {
+    // respecting Oracle limitations: https://docs.oracle.com/cd/B19306_01/appdev.102/b14258/d_lock.htm#ARPLS021
+    assert(id >= 0)
+    assert(id <= 1073741823)
+  }
+
+  private def oracleIntLockId(lockId: DBLockStorageBackend.LockId): Int =
+    lockId match {
+      case OracleLockId(id) => id
+      case unknown =>
+        throw new Exception(
+          s"LockId $unknown not supported. Probable cause: LockId was created by a different StorageBackend"
+        )
+    }
+
+  override def lock(id: Int): DBLockStorageBackend.LockId = OracleLockId(id)
+
+  override def dbLockSupported: Boolean = true
 }
diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresDataSourceConfig.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresDataSourceConfig.scala
new file mode 100644
index 000000000000..3cf5eac770dc
--- /dev/null
+++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresDataSourceConfig.scala
@@ -0,0 +1,21 @@
+// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+package com.daml.platform.store.backend.postgresql
+
+import com.daml.platform.store.backend.postgresql.PostgresDataSourceConfig.SynchronousCommitValue
+
+case class PostgresDataSourceConfig(
+    synchronousCommit: Option[SynchronousCommitValue] = None
+)
+
+object PostgresDataSourceConfig {
+  sealed abstract class SynchronousCommitValue(val pgSqlName: String)
+  object SynchronousCommitValue {
+    case object On extends SynchronousCommitValue("on")
+    case object Off extends SynchronousCommitValue("off")
+    case object RemoteWrite extends SynchronousCommitValue("remote_write")
+    case object RemoteApply extends SynchronousCommitValue("remote_apply")
+    case object Local extends SynchronousCommitValue("local")
+  }
+}
diff --git a/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresStorageBackend.scala b/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresStorageBackend.scala
index 8060fdf03122..99703b50481c 100644
--- a/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresStorageBackend.scala
+++ b/ledger/participant-integration-api/src/main/scala/platform/store/backend/postgresql/PostgresStorageBackend.scala
@@ -11,6 +11,7 @@ import anorm.SqlParser.get
 import com.daml.ledger.api.v1.command_completion_service.CompletionStreamResponse
 import com.daml.ledger.offset.Offset
 import com.daml.lf.data.Ref
+import com.daml.logging.LoggingContext
 import com.daml.platform.store.appendonlydao.events.{ContractId, Key, Party}
 import com.daml.platform.store.backend.EventStorageBackend.FilterParams
 import com.daml.platform.store.backend.common.{
@@ -18,9 +19,18 @@ import com.daml.platform.store.backend.common.{
   CommonStorageBackend,
   EventStorageBackendTemplate,
   EventStrategy,
+  InitHookDataSourceProxy,
   TemplatedStorageBackend,
 }
-import com.daml.platform.store.backend.{DbDto, StorageBackend, common}
+import com.daml.platform.store.backend.{
+  DBLockStorageBackend,
+  DataSourceStorageBackend,
+  DbDto,
+  StorageBackend,
+  common,
+}
+import javax.sql.DataSource
+import org.postgresql.ds.PGSimpleDataSource
 
 private[backend] object PostgresStorageBackend
     extends StorageBackend[AppendOnlySchema.Batch]
@@ -74,17 +84,6 @@ private[backend] object PostgresStorageBackend
     ()
   }
 
-  override def enforceSynchronousCommit(connnection: Connection): Unit = {
-    val statement =
-      connnection.prepareStatement("SET LOCAL synchronous_commit = 'on'")
-    try {
-      statement.execute()
-      ()
-    } finally {
-      statement.close()
-    }
-  }
-
   override val duplicateKeyError: String = "duplicate key"
 
   override def commandCompletions(
@@ -201,4 +200,58 @@
   // TODO append-only: remove as part of ContractStorageBackend consolidation
   private def arrayIntersectionWhereClause(arrayColumn: String, parties: Set[Ref.Party]): String =
     s"$arrayColumn::text[] && array[${format(parties)}]::text[]"
+
+  override def createDataSource(
+      jdbcUrl: String,
+      dataSourceConfig: DataSourceStorageBackend.DataSourceConfig,
+      connectionInitHook: Option[Connection => Unit],
+  )(implicit loggingContext: LoggingContext): DataSource = {
+    val pgSimpleDataSource = new PGSimpleDataSource()
+    pgSimpleDataSource.setUrl(jdbcUrl)
+    val hookFunctions = List(
+      dataSourceConfig.postgresConfig.synchronousCommit.toList
+        .map(synchCommitValue => exe(s"SET synchronous_commit TO ${synchCommitValue.pgSqlName}")),
+      connectionInitHook.toList,
+    ).flatten
+    InitHookDataSourceProxy(pgSimpleDataSource, hookFunctions)
+  }
+
+  override def tryAcquire(
+      lockId: DBLockStorageBackend.LockId,
+      lockMode: DBLockStorageBackend.LockMode,
+  )(connection: Connection): Option[DBLockStorageBackend.Lock] = {
+    val lockFunction = lockMode match {
+      case DBLockStorageBackend.LockMode.Exclusive => "pg_try_advisory_lock"
+      case DBLockStorageBackend.LockMode.Shared => "pg_try_advisory_lock_shared"
+    }
+    SQL"SELECT #$lockFunction(${pgBigintLockId(lockId)})"
+      .as(get[Boolean](1).single)(connection) match {
+      case true => Some(DBLockStorageBackend.Lock(lockId, lockMode))
+      case false => None
+    }
+  }
+
+  override def release(lock: DBLockStorageBackend.Lock)(connection: Connection): Boolean = {
+    val lockFunction = lock.lockMode match {
+      case DBLockStorageBackend.LockMode.Exclusive => "pg_advisory_unlock"
+      case DBLockStorageBackend.LockMode.Shared => "pg_advisory_unlock_shared"
+    }
+    SQL"SELECT #$lockFunction(${pgBigintLockId(lock.lockId)})"
+      .as(get[Boolean](1).single)(connection)
+  }
+
+  case class PGLockId(id: Long) extends DBLockStorageBackend.LockId
+
+  private def pgBigintLockId(lockId: DBLockStorageBackend.LockId): Long =
+    lockId match {
+      case PGLockId(id) => id
+      case unknown =>
+        throw new Exception(
+          s"LockId $unknown not supported. Probable cause: LockId was created by a different StorageBackend"
+        )
+    }
+
+  override def lock(id: Int): DBLockStorageBackend.LockId = PGLockId(id.toLong)
+
+  override def dbLockSupported: Boolean = true
 }
diff --git a/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoBackend.scala b/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoBackend.scala
index 680c6840604e..ccabfa56c753 100644
--- a/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoBackend.scala
+++ b/ledger/participant-integration-api/src/test/lib/scala/platform/store/dao/JdbcLedgerDaoBackend.scala
@@ -76,7 +76,6 @@ private[dao] trait JdbcLedgerDaoBackend extends AkkaBeforeAndAfterAll {
       servicesExecutionContext = executionContext,
       metrics = new Metrics(new MetricRegistry),
       lfValueTranslationCache = LfValueTranslationCache.Cache.none,
-      jdbcAsyncCommitMode = DbType.AsynchronousCommit,
      enricher = Some(new ValueEnricher(new Engine())),
       participantId = JdbcLedgerDaoBackend.TestParticipantIdRef,
     )
diff --git a/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Config.scala b/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Config.scala
index dd947d1e1a3b..47b453339205 100644
--- a/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Config.scala
+++ b/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/Config.scala
@@ -44,6 +44,7 @@ final case class Config[Extra](
     enableAppendOnlySchema: Boolean, // TODO append-only: remove after removing support for the current (mutating) schema
     enableMutableContractStateCache: Boolean,
     enableInMemoryFanOutForLedgerApi: Boolean,
+    enableHa: Boolean, // TODO ha: remove after stable
     extra: Extra,
 ) {
   def withTlsConfig(modify: TlsConfiguration => TlsConfiguration): Config[Extra] =
@@ -76,6 +77,7 @@ object Config {
       enableAppendOnlySchema = false,
       enableMutableContractStateCache = false,
       enableInMemoryFanOutForLedgerApi = false,
+      enableHa = false,
       extra = extra,
     )
 
@@ -535,6 +537,15 @@ object Config {
           )
         else success
       )
+
+      // TODO ha: remove after stable
+      opt[Unit]("index-ha-unsafe")
+        .optional()
+        .hidden()
+        .text(
+          "Use the experimental High Availability feature with the indexer. Should not be used in production."
+        )
+        .action((_, config) => config.copy(enableHa = true))
     }
     extraOptions(parser)
     parser
   }
diff --git a/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/LedgerFactory.scala b/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/LedgerFactory.scala
index 9f859b5e94bb..6a0823b76b97 100644
--- a/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/LedgerFactory.scala
+++ b/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/LedgerFactory.scala
@@ -51,6 +51,7 @@ trait ConfigProvider[ExtraConfig] {
       tailingRateLimitPerSecond = participantConfig.indexerConfig.tailingRateLimitPerSecond,
       batchWithinMillis = participantConfig.indexerConfig.batchWithinMillis,
       enableCompression = participantConfig.indexerConfig.enableCompression,
+      haConfig = participantConfig.indexerConfig.haConfig.copy(enable = config.enableHa),
     )
 
   def apiServerConfig(
diff --git a/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/ParticipantIndexerConfig.scala b/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/ParticipantIndexerConfig.scala
index 1761790f118f..458d4f11a1b8 100644
--- a/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/ParticipantIndexerConfig.scala
+++ b/ledger/participant-state/kvutils/app/src/main/scala/com/daml/ledger/participant/state/kvutils/app/ParticipantIndexerConfig.scala
@@ -4,6 +4,7 @@
 package com.daml.ledger.participant.state.kvutils.app
 
 import com.daml.platform.indexer.IndexerConfig
+import com.daml.platform.indexer.ha.HaConfig
 
 import scala.concurrent.duration.FiniteDuration
 
@@ -25,6 +26,7 @@ final case class ParticipantIndexerConfig(
     tailingRateLimitPerSecond: Int = ParticipantIndexerConfig.DefaultTailingRateLimitPerSecond,
     batchWithinMillis: Long = ParticipantIndexerConfig.DefaultBatchWithinMillis,
     enableCompression: Boolean = ParticipantIndexerConfig.DefaultEnableCompression,
+    haConfig: HaConfig = HaConfig(),
 )
 
 object ParticipantIndexerConfig {
diff --git a/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/stores/ledger/sql/SqlLedgerSpecAppendOnly.scala b/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/stores/ledger/sql/SqlLedgerSpecAppendOnly.scala
index 2511adc40350..19b042fa5c46 100644
--- a/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/stores/ledger/sql/SqlLedgerSpecAppendOnly.scala
+++ b/ledger/sandbox-classic/src/test/suite/scala/platform/sandbox/stores/ledger/sql/SqlLedgerSpecAppendOnly.scala
@@ -6,7 +6,6 @@ package com.daml.platform.sandbox.stores.ledger.sql
 import java.io.File
 import java.time.Instant
 
-import ch.qos.logback.classic.Level
 import com.daml.api.util.TimeProvider
 import com.daml.bazeltools.BazelRunfiles.rlocation
 import com.daml.daml_lf_dev.DamlLf
@@ -211,23 +210,6 @@ final class SqlLedgerSpecAppendOnly
       ledger.currentHealth() should be(Healthy)
     }
   }
-
-  /** Workaround test for asserting that PostgreSQL asynchronous commits are disabled in
-    * [[com.daml.platform.store.dao.JdbcLedgerDao]] transactions when used from [[SqlLedger]].
-    *
-    * NOTE: This is needed for ensuring durability guarantees of Daml-on-SQL.
-    */
-  "does not use async commit when building JdbcLedgerDao" in {
-    for {
-      _ <- createSqlLedger(validatePartyAllocation = false)
-    } yield {
-      val hikariDataSourceLogs =
-        LogCollector.read[this.type]("com.daml.platform.store.appendonlydao.HikariConnection")
-      hikariDataSourceLogs should contain(
-        Level.INFO -> "Creating Hikari connections with synchronous commit ON"
-      )
-    }
-  }
 }
 
 private def createSqlLedger(validatePartyAllocation: Boolean): Future[Ledger] =
diff --git a/libs-scala/oracle-testing/src/main/scala/testing/oracle/OracleAround.scala b/libs-scala/oracle-testing/src/main/scala/testing/oracle/OracleAround.scala
index 1c3c5bebb716..6bd684fccc6c 100644
--- a/libs-scala/oracle-testing/src/main/scala/testing/oracle/OracleAround.scala
+++ b/libs-scala/oracle-testing/src/main/scala/testing/oracle/OracleAround.scala
@@ -40,7 +40,7 @@ trait OracleAround {
     val con = use(
       DriverManager.getConnection(
         s"jdbc:oracle:thin:@localhost:$port/ORCLPDB1",
-        systemUser,
+        "sys as sysdba", // TODO: needed to be able to grant execute access on sys.dbms_lock below; consider making this configurable
         systemPwd,
       )
     )
@@ -51,6 +51,11 @@ trait OracleAround {
         s"""grant create table, create materialized view, create view, create procedure, create sequence, create type to $name"""
       )
       stmt.execute(s"""alter user $name quota unlimited on users""")
+
+      // for DBMS_LOCK access
+      stmt.execute(s"""GRANT EXECUTE ON SYS.DBMS_LOCK TO $name""")
+      stmt.execute(s"""GRANT SELECT ON V_$$MYSTAT TO $name""")
+      stmt.execute(s"""GRANT SELECT ON V_$$LOCK TO $name""")
     }.get
     User(name, pwd)
   }
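
Two pieces of the new plumbing are easiest to see in isolation. First, InitHookDataSourceProxy is the single seam where per-connection session setup happens: each backend's createDataSource composes zero or more Connection => Unit hooks and wraps its vendor DataSource, and the proxy applies the hooks to every connection it hands out. A minimal sketch of that composition, written as if it lived inside the backend package (InitHookDataSourceProxy is private[backend]); the exec helper and the SET statement are illustrative assumptions, not part of this change:

import java.sql.Connection

import com.daml.logging.LoggingContext
import com.daml.platform.store.backend.common.InitHookDataSourceProxy
import javax.sql.DataSource

object InitHookCompositionSketch {
  // Illustrative helper: runs one statement on the fresh connection and closes it.
  private def exec(sql: String)(connection: Connection): Unit = {
    val statement = connection.createStatement()
    try {
      statement.execute(sql)
      ()
    } finally statement.close()
  }

  def wrap(delegate: DataSource, extraHook: Option[Connection => Unit])(implicit
      loggingContext: LoggingContext
  ): DataSource =
    InitHookDataSourceProxy(
      delegate,
      // Hooks run in order on every new connection; with an empty list the
      // companion's apply returns the delegate unwrapped.
      exec("SET synchronous_commit TO off") _ :: extraHook.toList,
    )
}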
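
Second, the Postgres synchronous_commit setting now travels as data: a PostgresDataSourceConfig inside DataSourceStorageBackend.DataSourceConfig, which PostgresStorageBackend.createDataSource turns into exactly the kind of init hook shown above. A sketch of wiring it end to end, again written as if inside the backend package; the JDBC URL is a placeholder:

import com.daml.logging.LoggingContext
import com.daml.platform.store.backend.DataSourceStorageBackend.DataSourceConfig
import com.daml.platform.store.backend.postgresql.{PostgresDataSourceConfig, PostgresStorageBackend}

object SynchronousCommitWiringSketch {
  def dataSourceWithAsyncCommit(): Unit =
    LoggingContext.newLoggingContext { implicit loggingContext =>
      val dataSource = PostgresStorageBackend.createDataSource(
        jdbcUrl = "jdbc:postgresql://localhost:5432/ledger", // placeholder
        dataSourceConfig = DataSourceConfig(
          postgresConfig = PostgresDataSourceConfig(
            synchronousCommit = Some(PostgresDataSourceConfig.SynchronousCommitValue.Off)
          )
        ),
        connectionInitHook = None,
      )
      // Every connection handed out below has already executed
      // SET synchronous_commit TO off via the init hook proxy.
      dataSource.getConnection.close()
    }
}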
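
The Postgres tryAcquire and release above are thin wrappers over session-level advisory locks, so their semantics can be exercised with plain JDBC. A self-contained sketch (the URL is a placeholder, lock id 42 is arbitrary) showing that pg_try_advisory_lock never blocks and that ownership travels with the session:

import java.sql.{Connection, DriverManager}

object AdvisoryLockDemo {
  private val url = "jdbc:postgresql://localhost:5432/postgres?user=postgres" // placeholder

  private def queryBoolean(connection: Connection, sql: String): Boolean = {
    val statement = connection.createStatement()
    try {
      val resultSet = statement.executeQuery(sql)
      resultSet.next()
      resultSet.getBoolean(1)
    } finally statement.close()
  }

  def main(args: Array[String]): Unit = {
    val session1 = DriverManager.getConnection(url)
    val session2 = DriverManager.getConnection(url)
    try {
      // Session 1 takes the exclusive advisory lock for id 42.
      assert(queryBoolean(session1, "SELECT pg_try_advisory_lock(42)"))
      // Session 2 is refused immediately: the *_try_* variant returns false instead of blocking.
      assert(!queryBoolean(session2, "SELECT pg_try_advisory_lock(42)"))
      // Once session 1 releases, session 2 can acquire.
      assert(queryBoolean(session1, "SELECT pg_advisory_unlock(42)"))
      assert(queryBoolean(session2, "SELECT pg_try_advisory_lock(42)"))
    } finally {
      session1.close()
      session2.close()
    }
  }
}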
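
The Oracle variant goes through DBMS_LOCK.REQUEST, whose integer results tryAcquire maps as: 0 granted, 1 timeout (a conflicting lock is held elsewhere), 4 already held by this session, anything else an error. A plain-JDBC sketch of a single non-blocking request; URL and credentials are placeholders, and the connecting user needs the EXECUTE grant on SYS.DBMS_LOCK added to OracleAround above:

import java.sql.DriverManager

object OracleLockDemo {
  def main(args: Array[String]): Unit = {
    val connection =
      DriverManager.getConnection("jdbc:oracle:thin:@localhost:1521/ORCLPDB1", "user", "pwd") // placeholders
    try {
      val statement = connection.createStatement()
      try {
        // lockmode 6 is the exclusive mode, as in tryAcquire above; timeout => 0
        // makes the call return 1 immediately instead of waiting.
        val resultSet = statement.executeQuery(
          "SELECT DBMS_LOCK.REQUEST(id => 42, lockmode => 6, timeout => 0) FROM DUAL"
        )
        resultSet.next()
        resultSet.getInt(1) match {
          case 0 | 4 => println("lock is held by this session")
          case 1 => println("lock is held by another session")
          case error => println(s"DBMS_LOCK.REQUEST failed with code $error")
        }
      } finally statement.close()
    } finally connection.close()
  }
}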
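
Finally, because the H2 backend reports dbLockSupported = false and throws from every lock operation, callers are expected to branch on the capability flag rather than catch UnsupportedOperationException; this is also why the HA feature has to stay disabled on H2. A sketch of such a guard; treating lock id 0 as the main indexer lock is an assumption for illustration:

import java.sql.Connection

import com.daml.platform.store.backend.DBLockStorageBackend

object MainLockGuardSketch {
  // Some(lock): this node won the race for the exclusive lock.
  // None: another node holds it, or the backend cannot lock at all (H2),
  // in which case HA must not be enabled.
  def tryAcquireMainLock(
      backend: DBLockStorageBackend,
      connection: Connection,
  ): Option[DBLockStorageBackend.Lock] =
    if (backend.dbLockSupported)
      backend.tryAcquire(backend.lock(0), DBLockStorageBackend.LockMode.Exclusive)(connection)
    else
      None
}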