Skip to content

Commit

Permalink
Make AwaitUtxo non-blocking
Browse files Browse the repository at this point in the history
  • Loading branch information
teor2345 committed Sep 13, 2022
1 parent 0b8184b commit 1cd837d
Showing 1 changed file with 33 additions and 27 deletions.
60 changes: 33 additions & 27 deletions zebra-state/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ impl Service<Request> for StateService {
}

// Uses pending_utxos and queued_blocks in the StateService.
// Directly uses the shared NonFinalizedState in the ReadStateService.
// If the UTXO isn't in the queued blocks, runs concurrently using the ReadStateService.
Request::AwaitUtxo(outpoint) => {
metrics::counter!(
"state.requests",
Expand All @@ -693,10 +693,13 @@ impl Service<Request> for StateService {

// Prepare the AwaitUtxo future from PendingUxtos.
let response_fut = self.pending_utxos.queue(outpoint);
// Only instrument `response_fut`, the ReadStateService already
// instruments its requests with the same span.
let span = Span::current();
let response_fut = response_fut.instrument(span).boxed();

// Check the non-finalized block queue.
// Check the non-finalized block queue outside the returned future,
// so we can access mutable state fields.
if let Some(utxo) = self.queued_blocks.utxo(&outpoint) {
self.pending_utxos.respond(&outpoint, utxo);

Expand All @@ -712,38 +715,41 @@ impl Service<Request> for StateService {
// This creates a rare race condition, but it doesn't seem to happen much in practice.
// See #5126 for details.

// Send a request to the ReadStateService, to get UTXOs from the finalized chain.
// Manually send a request to the ReadStateService,
// to get UTXOs from any non-finalized chain or the finalized chain.
let read_service = self.read_service.clone();

// Optional TODO:
// - make pending_utxos.respond() async using a channel,
// so we can use ReadRequest::ChainUtxo here, and avoid a block_in_place().
// Run the request in an async block, so we can await the response.
async move {
let req = ReadRequest::AnyChainUtxo(outpoint);

// Check any non-finalized chain, and the finalized chain.
let span = Span::current();
let utxo = tokio::task::block_in_place(move || {
span.in_scope(move || {
read_service.non_finalized_state_receiver.with_watch_data(
|non_finalized_state| {
read::any_utxo(non_finalized_state, &read_service.db, outpoint)
},
)
})
});
let rsp = read_service.oneshot(req).await?;

if let Some(utxo) = utxo {
self.pending_utxos.respond(&outpoint, utxo);
// Optional TODO:
// - make pending_utxos.respond() async using a channel,
// so we can respond to all waiting requests here
//
// This change is not required for correctness, because:
// - any waiting requests should have returned when the block was sent to the state
// - otherwise, the request returns immediately if:
// - the block is in the non-finalized queue, or
// - the block is in any non-finalized chain or the finalized state
//
// And if the block is in the finalized queue,
// that's rare enough that a retry is ok.
if let ReadResponse::AnyChainUtxo(Some(utxo)) = rsp {
// We got a UTXO, so we replace the response future with the result own.
timer.finish(module_path!(), line!(), "AwaitUtxo/any-chain");

return Ok(Response::Utxo(utxo));
}

// We're finished, the returned future gets the UTXO from the respond() channel.
timer.finish(module_path!(), line!(), "AwaitUtxo/any-chain");
// We're finished, but the returned future is waiting on the respond() channel.
timer.finish(module_path!(), line!(), "AwaitUtxo/waiting");

return response_fut;
response_fut.await
}

// We're finished, but the returned future is waiting on the respond() channel.
timer.finish(module_path!(), line!(), "AwaitUtxo/waiting");

response_fut
.boxed()
}

// TODO: add a name() method to Request, and combine all the generic read requests
Expand Down

0 comments on commit 1cd837d

Please sign in to comment.