Skip to content

Commit

Permalink
Minor changes based on review
Browse files Browse the repository at this point in the history
CHANGELOG_BEGIN
CHANGELOG_END
  • Loading branch information
nmarton-da committed Sep 15, 2021
1 parent aa061aa commit eecea8c
Showing 1 changed file with 26 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class HaCoordinatorSpec
with AkkaBeforeAndAfterAll
with Eventually {
implicit val ec: ExecutionContext =
system.dispatcher // we need this to not us the default EX which is coming from AsyncTestSuite, and which is serial
system.dispatcher // we need this to not use the default EC which is coming from AsyncTestSuite, and which is serial
private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting
private val logger = ContextualizedLogger.get(this.getClass)
private val timer = new Timer(true)
Expand Down Expand Up @@ -55,7 +55,7 @@ class HaCoordinatorSpec
info("As HA initialized")
protectedHandle.completed.isCompleted shouldBe false
info("Protected Handle is not completed")
executionHandlePromise.success(())
completeExecutionInitialization()
info("As finishing protected execution initialization")
executionCompletedPromise.success(())
info("As execution is completed")
Expand All @@ -77,7 +77,7 @@ class HaCoordinatorSpec
info("As HA initialized")
protectedHandle.completed.isCompleted shouldBe false
info("Protected Handle is not completed")
executionHandlePromise.success(())
completeExecutionInitialization()
info("As finishing protected execution initialization")
executionCompletedPromise.failure(new Exception("failed execution"))
info("As execution completes with failure")
Expand All @@ -101,7 +101,7 @@ class HaCoordinatorSpec
info("Protected Handle is not completed")
connectionInitializer.initialize(new TestConnection)
info("Connection initializer works")
executionHandlePromise.success(())
completeExecutionInitialization()
info("As finishing protected execution initialization")
connectionInitializer.initialize(new TestConnection)
info("Connection initializer works")
Expand Down Expand Up @@ -149,7 +149,7 @@ class HaCoordinatorSpec
info("Connection initializer works")
protectedHandle.killSwitch.shutdown()
info("As graceful shutdown started")
executionHandlePromise.success(())
completeExecutionInitialization()
info("And as finishing protected execution initialization")
connectionInitializer.initialize(new TestConnection)
info(
Expand Down Expand Up @@ -189,7 +189,7 @@ class HaCoordinatorSpec
info("Protected Handle is not completed")
connectionInitializer.initialize(new TestConnection)
info("Connection initializer works")
executionHandlePromise.success(())
completeExecutionInitialization()
info("As finishing protected execution initialization")
connectionInitializer.initialize(new TestConnection)
info("Connection initializer works")
Expand Down Expand Up @@ -253,7 +253,7 @@ class HaCoordinatorSpec
_ <- connectionInitializerFuture
_ = {
info("Initialisation should completed successfully")
executionHandlePromise.success(()) // cleanup
completeExecutionInitialization() // cleanup
executionCompletedPromise.success(()) // cleanup
}
_ <- protectedHandle.completed
Expand Down Expand Up @@ -305,7 +305,7 @@ class HaCoordinatorSpec
_ <- connectionInitializerFuture
_ = {
info("Initialisation should completed successfully")
executionHandlePromise.success(()) // cleanup
completeExecutionInitialization() // cleanup
executionCompletedPromise.success(()) // cleanup
}
_ <- protectedHandle.completed
Expand Down Expand Up @@ -335,7 +335,7 @@ class HaCoordinatorSpec
_ <- connectionInitializerFuture
_ = {
info("Initialisation should completed successfully")
executionHandlePromise.success(()) // cleanup
completeExecutionInitialization() // cleanup
executionCompletedPromise.success(()) // cleanup
}
_ <- protectedHandle.completed
Expand Down Expand Up @@ -396,7 +396,7 @@ class HaCoordinatorSpec
_ <- connectionInitializerFuture
_ = {
info("As execution initialization starts")
executionHandlePromise.failure(new Exception("failed as initializing"))
failDuringExecutionInitialization(new Exception("failed as initializing"))
info("Initialization fails")
}
failure <- protectedHandle.completed.failed
Expand All @@ -418,7 +418,7 @@ class HaCoordinatorSpec
info("As HA initialized")
protectedHandle.completed.isCompleted shouldBe false
info("Protected Handle is not completed")
executionHandlePromise.success(())
completeExecutionInitialization()
info("As finishing protected execution initialization")
connectionInitializer.initialize(new TestConnection)
info("Connection initializer works")
Expand Down Expand Up @@ -465,7 +465,7 @@ class HaCoordinatorSpec
info("As HA initialized")
protectedHandle.completed.isCompleted shouldBe false
info("Protected Handle is not completed")
executionHandlePromise.success(())
completeExecutionInitialization()
info("As finishing protected execution initialization")
connectionInitializer.initialize(new TestConnection)
info("Connection initializer works")
Expand Down Expand Up @@ -523,7 +523,7 @@ class HaCoordinatorSpec
node.connectionInitializerFuture
.foreach { _ =>
logger.info(s"execution started")
node.executionHandlePromise.success(())
node.completeExecutionInitialization()
nodeStartedExecutionProbe.send(nodeStartedExecutionProbe.ref, "started")
}
node.executionShutdownFuture
Expand Down Expand Up @@ -653,7 +653,7 @@ class HaCoordinatorSpec
node.connectionInitializerFuture
.foreach { connectionInitializer =>
logger.info(s"execution started")
node.executionHandlePromise.success(())
node.completeExecutionInitialization()
connectionInitializer.initialize(workerConnection)
concurrentWorkers.incrementAndGet()
nodeStartedExecutionProbe.send(nodeStartedExecutionProbe.ref, "started")
Expand Down Expand Up @@ -841,5 +841,16 @@ class HaCoordinatorSpec
executionAbortedFuture: Future[Throwable], // observe abort in execution
executionCompletedPromise: Promise[Unit], // trigger completion of execution
dbLock: TestDBLockStorageBackend, // the lock backend
)
) {

/** trigger end of execution initialization */
def completeExecutionInitialization(): Unit = {
executionHandlePromise.success(())
}

/** simulate a failure during execution initialization */
def failDuringExecutionInitialization(cause: Throwable): Unit = {
executionHandlePromise.failure(cause)
}
}
}

0 comments on commit eecea8c

Please sign in to comment.