Skip to content

Commit

Permalink
Fix Parallel Indexer initialization issue [DPP-542] (#10889)
Browse files Browse the repository at this point in the history
RCA: if at parallel indexer initialization some error happening, then a promise never completes, which causes an initialization future never complete
Expected: failure should be propagated, and recovering indexer should retry 10 seconds later
Actual: failure not propagated, a zombie future freezes initialization, preventing retries
Impact: this is a corner case - if no problems at indexer initialization, the issues is not surfacing

* Extracts critical logic into helper function initializeHandle
* Adds unit tests for initializeHandle
* Fixes issue by completing the promise in all cases

CHANGELOG_BEGIN
CHANGELOG_END
  • Loading branch information
nmarton-da authored Sep 15, 2021
1 parent b3e4975 commit 0c32e3b
Show file tree
Hide file tree
Showing 2 changed files with 381 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,55 +75,80 @@ object ParallelIndexerFactory {
)
else
ResourceOwner.successful(NoopHaCoordinator)
} yield toIndexer { resourceContext =>
implicit val rc: ResourceContext = resourceContext
} yield toIndexer { implicit resourceContext =>
implicit val ec: ExecutionContext = resourceContext.executionContext
haCoordinator.protectedExecution { connectionInitializer =>
val killSwitchPromise = Promise[KillSwitch]()

val completionFuture = DbDispatcher
.owner(
// this is the DataSource which will be wrapped by HikariCP, and which will drive the ingestion
// therefore this needs to be configured with the connection-init-hook, what we get from HaCoordinator
dataSource = storageBackend.createDataSource(
jdbcUrl = jdbcUrl,
dataSourceConfig = dataSourceConfig,
connectionInitHook = Some(connectionInitializer.initialize),
),
serverRole = ServerRole.Indexer,
connectionPoolSize = ingestionParallelism + 1, // + 1 for the tailing ledger_end updates
connectionTimeout = FiniteDuration(
250,
"millis",
), // 250 millis is the lowest possible value for this Hikari configuration (see HikariConfig JavaDoc)
metrics = metrics,
)
.use { dbDispatcher =>
initializeParallelIngestion(
haCoordinator.protectedExecution(connectionInitializer =>
initializeHandle(
DbDispatcher
.owner(
// this is the DataSource which will be wrapped by HikariCP, and which will drive the ingestion
// therefore this needs to be configured with the connection-init-hook, what we get from HaCoordinator
dataSource = storageBackend.createDataSource(
jdbcUrl = jdbcUrl,
dataSourceConfig = dataSourceConfig,
connectionInitHook = Some(connectionInitializer.initialize),
),
serverRole = ServerRole.Indexer,
connectionPoolSize =
ingestionParallelism + 1, // + 1 for the tailing ledger_end updates
connectionTimeout = FiniteDuration(
250,
"millis",
), // 250 millis is the lowest possible value for this Hikari configuration (see HikariConfig JavaDoc)
metrics = metrics,
)
) { dbDispatcher =>
initializeParallelIngestion(
dbDispatcher = dbDispatcher,
readService = readService,
ec = ec,
mat = mat,
).map(
parallelIndexerSubscription(
inputMapperExecutor = inputMapperExecutor,
batcherExecutor = batcherExecutor,
dbDispatcher = dbDispatcher,
readService = readService,
ec = ec,
mat = mat,
).map(
parallelIndexerSubscription(
inputMapperExecutor = inputMapperExecutor,
batcherExecutor = batcherExecutor,
dbDispatcher = dbDispatcher,
materializer = mat,
)
).andThen {
// the tricky bit:
// the future in the completion handler will be this one
// but the future for signaling for the HaCoordinator, that the protected execution is initialized, needs to complete precisely here
case Success(handle) => killSwitchPromise.success(handle.killSwitch)
case Failure(ex) => killSwitchPromise.failure(ex)
}.flatMap(_.completed)
}
materializer = mat,
)
)
}
)
}

killSwitchPromise.future
.map(Handle(completionFuture.map(_ => ()), _))
/** Helper function to combine a ResourceOwner and an initialization function to initialize a Handle.
*
* @param owner A ResourceOwner which needs to be used to spawn a resource needed by initHandle
* @param initHandle Asynchronous initialization function to create a Handle
* @return A Future of a Handle where Future encapsulates initialization (as completed initialization completed)
*/
def initializeHandle[T](
owner: ResourceOwner[T]
)(initHandle: T => Future[Handle])(implicit rc: ResourceContext): Future[Handle] = {
implicit val ec: ExecutionContext = rc.executionContext
val killSwitchPromise = Promise[KillSwitch]()
val completed = owner
.use(resource =>
initHandle(resource)
.andThen {
// the tricky bit:
// the future in the completion handler will be this one
// but the future for signaling completion of initialization (the Future of the result), needs to complete precisely here
case Success(handle) => killSwitchPromise.success(handle.killSwitch)
}
.flatMap(_.completed)
)
.andThen {
// if error happens:
// - at Resource initialization (inside ResourceOwner.acquire()): result should complete with a Failure
// - at initHandle: result should complete with a Failure
// - at the execution spawned by initHandle (represented by the result Handle's complete): result should be with a success
// In the last case it is already finished the promise with a success, and this tryFailure will not succeed (returning false).
// In the other two cases the promise was not completed, and we complete here successfully with a failure.
case Failure(ex) => killSwitchPromise.tryFailure(ex)
}
}
killSwitchPromise.future
.map(Handle(completed, _))
}

def toIndexer(subscription: ResourceContext => Handle): Indexer =
new Indexer {
Expand Down
Loading

0 comments on commit 0c32e3b

Please sign in to comment.