Skip to content

Commit

Permalink
Adds Indexer state to GRPC health checks [DPP-434] (#9951) (#9961)
Browse files Browse the repository at this point in the history
* Adds Indexer state to GRPC health checks [DPP-434]

CHANGELOG_BEGIN
[Integration Kit] The state of the participant indexer can now be checked via the gRPC health endpoint.
CHANGELOG_END

* Addressed review comments
  • Loading branch information
tudor-da authored Jun 11, 2021
1 parent a08f6ea commit a9d0473
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import java.util.concurrent.atomic.AtomicReference

import akka.actor.Scheduler
import akka.pattern.after
import com.daml.ledger.api.health.{HealthStatus, ReportsHealth}
import com.daml.ledger.resources.{Resource, ResourceContext}
import com.daml.logging.{ContextualizedLogger, LoggingContext}

Expand All @@ -25,6 +26,8 @@ private[indexer] final class RecoveringIndexer(
scheduler: Scheduler,
executionContext: ExecutionContext,
restartDelay: FiniteDuration,
updateHealthStatus: HealthStatus => Unit,
healthReporter: ReportsHealth,
)(implicit loggingContext: LoggingContext) {
private implicit val ec: ExecutionContext = executionContext
private implicit val resourceContext: ResourceContext = ResourceContext(executionContext)
Expand All @@ -36,14 +39,15 @@ private[indexer] final class RecoveringIndexer(
* @param subscribe A function that creates a new indexer and calls subscribe() on it.
* @return A resource holding the indexer's [[ReportsHealth]] reporter paired with a future that completes when the indexer finishes processing all read service updates.
*/
def start(subscribe: () => Resource[IndexFeedHandle]): Resource[Future[Unit]] = {
def start(subscribe: () => Resource[IndexFeedHandle]): Resource[(ReportsHealth, Future[Unit])] = {
val complete = Promise[Unit]()

logger.info("Starting Indexer Server")
val subscription = new AtomicReference[Resource[IndexFeedHandle]](null)

val firstSubscription = subscribe().map(handle => {
logger.info("Started Indexer Server")
updateHealthStatus(HealthStatus.healthy)
handle
})
subscription.set(firstSubscription)
Expand Down Expand Up @@ -81,6 +85,7 @@ private[indexer] final class RecoveringIndexer(
if (subscription.compareAndSet(oldSubscription, newSubscription)) {
resubscribeOnFailure(newSubscription)
newSubscription.asFuture.map { _ =>
updateHealthStatus(HealthStatus.healthy)
logger.info("Restarted Indexer Server")
}
} else { // we must have stopped the server during the restart
Expand All @@ -107,21 +112,21 @@ private[indexer] final class RecoveringIndexer(
complete.future

case Failure(exception) =>
logger.error(
reportErrorState(
s"Error while running indexer, restart scheduled after $restartDelay",
exception,
)

currentSubscription
.release()
.recover { case _ => () } // releasing may yield the same error as above
.flatMap(_ => resubscribe(currentSubscription))
}
case Failure(exception) =>
logger
.error(
s"Error while starting indexer, restart scheduled after $restartDelay",
exception,
)
reportErrorState(
s"Error while starting indexer, restart scheduled after $restartDelay",
exception,
)
resubscribe(currentSubscription)
()
}
Expand All @@ -130,16 +135,40 @@ private[indexer] final class RecoveringIndexer(
subscription
.get()
.asFuture
.transform(_ => Success(complete.future))
.transform(_ => Success(healthReporter -> complete.future))
)(_ => {
logger.info("Stopping Indexer Server")
subscription
.getAndSet(null)
.release()
.flatMap(_ => complete.future)
.map(_ => {
updateHealthStatus(HealthStatus.unhealthy)
logger.info("Stopped Indexer Server")
})
})
}

/** Records an indexer failure: sets the reported health status to unhealthy and logs the error.
  *
  * @param errorMessage the message handed to the logger
  * @param exception    the failure handed to the logger together with the message
  */
private def reportErrorState(errorMessage: String, exception: Throwable): Unit = {
// Mark the indexer unhealthy first, then emit the error log entry.
updateHealthStatus(HealthStatus.unhealthy)
logger.error(errorMessage, exception)
}
}

private[indexer] object RecoveringIndexer {

  /** Builds a [[RecoveringIndexer]] backed by its own health status holder.
    *
    * The status is kept in an `AtomicReference` that starts out as `HealthStatus.unhealthy`.
    * The indexer receives a setter for that reference as `updateHealthStatus`, and a
    * [[ReportsHealth]] that reads the same reference as `healthReporter`.
    *
    * @param scheduler        passed through to the indexer unchanged
    * @param executionContext passed through to the indexer unchanged
    * @param restartDelay     passed through to the indexer unchanged
    * @return a new indexer whose reported health is initially unhealthy
    */
  def apply(scheduler: Scheduler, executionContext: ExecutionContext, restartDelay: FiniteDuration)(
      implicit loggingContext: LoggingContext
  ): RecoveringIndexer = {
    // Single shared cell: written through `updateHealthStatus`, read through `healthReporter`.
    val currentStatus = new AtomicReference[HealthStatus](HealthStatus.unhealthy)

    new RecoveringIndexer(
      scheduler,
      executionContext,
      restartDelay,
      updateHealthStatus = status => currentStatus.set(status),
      healthReporter = () => currentStatus.get(),
    )
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.daml.platform.indexer

import akka.stream.Materializer
import com.daml.ledger.api.health.{HealthStatus, ReportsHealth}
import com.daml.ledger.participant.state.v1.ReadService
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.logging.{ContextualizedLogger, LoggingContext}
Expand All @@ -20,11 +21,11 @@ final class StandaloneIndexerServer(
metrics: Metrics,
lfValueTranslationCache: LfValueTranslation.Cache,
)(implicit materializer: Materializer, loggingContext: LoggingContext)
extends ResourceOwner[Unit] {
extends ResourceOwner[ReportsHealth] {

private val logger = ContextualizedLogger.get(this.getClass)

override def acquire()(implicit context: ResourceContext): Resource[Unit] = {
override def acquire()(implicit context: ResourceContext): Resource[ReportsHealth] = {
val indexerFactory = new JdbcIndexer.Factory(
ServerRole.Indexer,
config,
Expand All @@ -33,43 +34,46 @@ final class StandaloneIndexerServer(
metrics,
lfValueTranslationCache,
)
val indexer = new RecoveringIndexer(
val indexer = RecoveringIndexer(
materializer.system.scheduler,
materializer.executionContext,
config.restartDelay,
)
config.startupMode match {
case IndexerStartupMode.MigrateOnly =>
Resource.unit
Resource.successful(() => HealthStatus.healthy)
case IndexerStartupMode.MigrateAndStart =>
Resource
.fromFuture(indexerFactory.migrateSchema(config.allowExistingSchema))
.flatMap(startIndexer(indexer, _))
.map { _ =>
.map { healthReporter =>
logger.debug("Waiting for the indexer to initialize the database.")
healthReporter
}
case IndexerStartupMode.ResetAndStart =>
Resource
.fromFuture(indexerFactory.resetSchema())
.flatMap(startIndexer(indexer, _))
.map { _ =>
.map { healthReporter =>
logger.debug("Waiting for the indexer to initialize the database.")
healthReporter
}
case IndexerStartupMode.ValidateAndStart =>
Resource
.fromFuture(indexerFactory.validateSchema())
.flatMap(startIndexer(indexer, _))
.map { _ =>
.map { healthReporter =>
logger.debug("Waiting for the indexer to initialize the database.")
healthReporter
}
}
}

private def startIndexer(
indexer: RecoveringIndexer,
initializedIndexerFactory: ResourceOwner[JdbcIndexer],
)(implicit context: ResourceContext): Resource[Unit] =
)(implicit context: ResourceContext): Resource[ReportsHealth] =
indexer
.start(() => initializedIndexerFactory.flatMap(_.subscription(readService)).acquire())
.map(_ => ())
.map { case (indexerHealthReporter, _) => indexerHealthReporter }
}
Loading

0 comments on commit a9d0473

Please sign in to comment.