Skip to content

Commit

Permalink
Fixed issue with onErrorDropped being called when using concatWith in…
Browse files Browse the repository at this point in the history
… QuorumReader (Azure#24180)
  • Loading branch information
kushagraThapar authored and srnagar committed Sep 17, 2021
1 parent cae878b commit 07ab715
Showing 1 changed file with 7 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,9 @@ public Mono<StoreResponse> readStrongAsync(
});
}).repeat(maxNumberOfReadQuorumRetries)
.takeUntil(dummy -> !shouldRetryOnSecondary.v)
.concatWith(Flux.defer(() -> {
// In case there is an empty response from above flatMap, it means we could not complete read quorum
// So we will throw an error, which will be eventually retried.
.switchIfEmpty(Flux.defer(() -> {
logger.warn("Could not complete read quorum with read quorum value of {}", readQuorumValue);

return Flux.error(new GoneException(
Expand Down Expand Up @@ -634,8 +636,10 @@ private Mono<Boolean> waitForReadBarrierAsync(
}

return Flux.empty();
})).
concatWith(
}))
// In case the above flux returns empty (which it will after all the retries have been exhausted),
// We will just return false
.switchIfEmpty(
Flux.defer(() -> {
logger.debug("QuorumReader: waitForReadBarrierAsync - TargetGlobalCommittedLsn: {}, MaxGlobalCommittedLsn: {}.", targetGlobalCommittedLSN, maxGlobalCommittedLsn);
return Flux.just(false);
Expand Down

0 comments on commit 07ab715

Please sign in to comment.