Skip to content

Commit

Permalink
Log execution context throwables as error (#11702)
Browse files Browse the repository at this point in the history
* Log execution context throwables as error

changelog_begin
changelog_end

* Review feedback and reuse more logging contexts
  • Loading branch information
oliverse-da authored Nov 15, 2021
1 parent 69471d6 commit bf86ee4
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import java.util.concurrent.Executors

import com.codahale.metrics.{InstrumentedExecutorService, MetricRegistry}
import com.daml.ledger.resources.ResourceOwner
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.MetricName
import com.google.common.util.concurrent.ThreadFactoryBuilder

Expand All @@ -30,25 +31,33 @@ object AsyncSupport {
size: Int,
namePrefix: String,
withMetric: Option[(MetricName, MetricRegistry)] = None,
): ResourceOwner[Executor] =
)(implicit loggingContext: LoggingContext): ResourceOwner[Executor] =
ResourceOwner
.forExecutorService(() =>
ExecutionContext.fromExecutorService {
val executor = Executors.newFixedThreadPool(
size,
new ThreadFactoryBuilder().setNameFormat(s"$namePrefix-%d").build,
)
withMetric match {
case Some((metricName, metricRegistry)) =>
new InstrumentedExecutorService(
executor,
metricRegistry,
metricName,
)

case None => executor
}
}
ExecutionContext.fromExecutorService(
{
val executor = Executors.newFixedThreadPool(
size,
new ThreadFactoryBuilder()
.setNameFormat(s"$namePrefix-%d")
.build,
)
withMetric match {
case Some((metricName, metricRegistry)) =>
new InstrumentedExecutorService(
executor,
metricRegistry,
metricName,
)

case None => executor
}
},
throwable =>
ContextualizedLogger
.get(this.getClass)
.error(s"ExecutionContext ${namePrefix} has failed with an exception", throwable),
)
)
.map(Executor.forExecutionContext)
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import java.util.concurrent.Executors
import akka.stream.{KillSwitch, Materializer}
import com.daml.ledger.participant.state.v2.ReadService
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.logging.LoggingContext
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
import com.daml.platform.configuration.ServerRole
import com.daml.platform.indexer.ha.{HaConfig, HaCoordinator, Handle, NoopHaCoordinator}
Expand Down Expand Up @@ -68,7 +68,14 @@ object ParallelIndexerFactory {
Executors.newFixedThreadPool(
1,
new ThreadFactoryBuilder().setNameFormat(s"ha-coordinator-%d").build,
)
),
throwable =>
ContextualizedLogger
.get(this.getClass)
.error(
s"ExecutionContext has failed with an exception",
throwable,
),
)
)
timer <- ResourceOwner.forTimer(() => new Timer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,14 @@ private[platform] final class DbDispatcher private (
executor: Executor,
overallWaitTimer: Timer,
overallExecutionTimer: Timer,
) extends ReportsHealth {
)(implicit loggingContext: LoggingContext)
extends ReportsHealth {

private val logger = ContextualizedLogger.get(this.getClass)
private val executionContext = ExecutionContext.fromExecutor(executor)
private val executionContext = ExecutionContext.fromExecutor(
executor,
throwable => logger.error("ExecutionContext has failed with an exception", throwable),
)

override def currentHealth(): HealthStatus =
connectionProvider.currentHealth()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package com.daml.resources
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{Executors, TimeUnit}

import com.daml.logging.ContextualizedLogger
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.logging.LoggingContext.newLoggingContext
import com.daml.resources.ProgramResource._

Expand All @@ -31,7 +31,15 @@ final class ProgramResource[Context: HasExecutionContext, T](
def run(newContext: ExecutionContext => Context): Unit = {
newLoggingContext { implicit loggingContext =>
val resource = {
implicit val context: Context = newContext(ExecutionContext.fromExecutor(executorService))
implicit val context: Context = newContext(
ExecutionContext.fromExecutor(
executorService,
throwable =>
LoggingContext.newLoggingContext { implicit loggingContext =>
logger.error("ExecutionContext has failed with an exception", throwable)
},
)
)
Try(owner.acquire()).fold(exception => PureResource(Future.failed(exception)), identity)
}

Expand Down

0 comments on commit bf86ee4

Please sign in to comment.