Skip to content

Commit

Permalink
Increase timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
rautenrieth-da committed Sep 27, 2021
1 parent 1c334b1 commit 4b4302c
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ trait IndexerStabilitySpec
implicit val ec: ExecutionContext = system.dispatcher
private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting

behavior of "redundant parallel indexers"
behavior of "concurrently running indexers"

it should "correctly work in high availability mode" in {
val updatesPerSecond = 10 // Number of updates per second produced by the read service
Expand All @@ -50,16 +50,11 @@ trait IndexerStabilitySpec
var abortedIndexer: Option[ReadServiceAndIndexer] = None
(1 to restartIterations).foreach(_ => {
// Assert that there is exactly one indexer running
val activeIndexer = eventuallyAfterRecovery { indexers.runningIndexer }
val activeIndexer = findActiveIndexer(indexers)
info(s"Indexer ${activeIndexer.readService.name} is running")

// The indexer should appear "healthy"
eventually {
assert(
activeIndexer.indexing.currentHealth() == HealthStatus.healthy,
"Running indexer should be healthy",
)
}
assertHealthStatus(activeIndexer, HealthStatus.healthy)
info(s"Indexer ${activeIndexer.readService.name} appears to be healthy")

// At this point, the indexer that was aborted by the previous iteration can be reset,
Expand All @@ -75,20 +70,13 @@ trait IndexerStabilitySpec
info(s"ReadService ${activeIndexer.readService.name} was aborted")

// The indexer should appear "unhealthy"
eventually {
assert(
activeIndexer.indexing.currentHealth() == HealthStatus.unhealthy,
"Aborted indexer should be unhealthy",
)
}
assertHealthStatus(activeIndexer, HealthStatus.unhealthy)
info(s"Indexer ${activeIndexer.readService.name} appears to be unhealthy")
})

// Stop all indexers, in order to stop all database operations
indexers.indexers.foreach(_.readService.abort(simulatedFailure()))
eventually {
indexers.indexers.foreach(_.indexing.currentHealth() == HealthStatus.unhealthy)
}
indexers.indexers.foreach(assertHealthStatus(_, HealthStatus.unhealthy))
info(s"All ReadServices were aborted")

// Verify the integrity of the index database
Expand All @@ -104,16 +92,33 @@ trait IndexerStabilitySpec
.map(_ => succeed)
}

// It takes some time until a new indexer takes over after a failure,
// the default ScalaTest timeout for eventually() is too short for this.
private def eventuallyAfterRecovery[T](
fun: => T
)(implicit pos: org.scalactic.source.Position): T = {
// Finds the only indexer that has subscribed to the ReadService stream
private def findActiveIndexer(indexers: Indexers): ReadServiceAndIndexer = {
// It takes some time until a new indexer takes over after a failure.
// The default ScalaTest timeout for eventually() is too short for this.
implicit val patienceConfig: PatienceConfig = PatienceConfig(
timeout = scaled(Span(10, Seconds)),
interval = scaled(Span(100, Millis)),
)
eventually(fun)
eventually {
indexers.runningIndexer
}
}

// Asserts that the given indexer changes its health status to the given value
private def assertHealthStatus(
indexer: ReadServiceAndIndexer,
status: HealthStatus,
)(implicit pos: org.scalactic.source.Position): Unit = {
// It takes some time until an indexer changes its health status after a failure.
// The default ScalaTest timeout for eventually() is too short for this.
implicit val patienceConfig: PatienceConfig = PatienceConfig(
timeout = scaled(Span(1, Seconds)),
interval = scaled(Span(100, Millis)),
)
eventually {
assert(indexer.indexing.currentHealth() == status)
}
}

private def simulatedFailure() = new RuntimeException("Simulated failure")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ object IndexerStabilityTestFixture {
.acquire()

// Start N indexers that all compete for the same database
_ = logger.info(s"Starting $indexerCount indexers")
_ = logger.info(s"Starting $indexerCount indexers for database $jdbcUrl")
indexers <- Resource
.sequence(
(1 to indexerCount).toList
Expand Down

0 comments on commit 4b4302c

Please sign in to comment.