diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index c84de0b0d95..f7092282547 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -680,7 +680,7 @@ impl Service for StateService { } // Uses pending_utxos and queued_blocks in the StateService. - // Directly uses the shared NonFinalizedState in the ReadStateService. + // If the UTXO isn't in the queued blocks, runs concurrently using the ReadStateService. Request::AwaitUtxo(outpoint) => { metrics::counter!( "state.requests", @@ -693,10 +693,13 @@ impl Service for StateService { // Prepare the AwaitUtxo future from PendingUxtos. let response_fut = self.pending_utxos.queue(outpoint); + // Only instrument `response_fut`, the ReadStateService already + // instruments its requests with the same span. let span = Span::current(); let response_fut = response_fut.instrument(span).boxed(); - // Check the non-finalized block queue. + // Check the non-finalized block queue outside the returned future, + // so we can access mutable state fields. if let Some(utxo) = self.queued_blocks.utxo(&outpoint) { self.pending_utxos.respond(&outpoint, utxo); @@ -712,38 +715,41 @@ impl Service for StateService { // This creates a rare race condition, but it doesn't seem to happen much in practice. // See #5126 for details. - // Send a request to the ReadStateService, to get UTXOs from the finalized chain. + // Manually send a request to the ReadStateService, + // to get UTXOs from any non-finalized chain or the finalized chain. let read_service = self.read_service.clone(); - // Optional TODO: - // - make pending_utxos.respond() async using a channel, - // so we can use ReadRequest::ChainUtxo here, and avoid a block_in_place(). + // Run the request in an async block, so we can await the response. + async move { + let req = ReadRequest::AnyChainUtxo(outpoint); - // Check any non-finalized chain, and the finalized chain. - let span = Span::current(); - let utxo = tokio::task::block_in_place(move || { - span.in_scope(move || { - read_service.non_finalized_state_receiver.with_watch_data( - |non_finalized_state| { - read::any_utxo(non_finalized_state, &read_service.db, outpoint) - }, - ) - }) - }); + let rsp = read_service.oneshot(req).await?; - if let Some(utxo) = utxo { - self.pending_utxos.respond(&outpoint, utxo); + // Optional TODO: + // - make pending_utxos.respond() async using a channel, + // so we can respond to all waiting requests here + // + // This change is not required for correctness, because: + // - any waiting requests should have returned when the block was sent to the state + // - otherwise, the request returns immediately if: + // - the block is in the non-finalized queue, or + // - the block is in any non-finalized chain or the finalized state + // + // And if the block is in the finalized queue, + // that's rare enough that a retry is ok. + if let ReadResponse::AnyChainUtxo(Some(utxo)) = rsp { + // We got a UTXO, so we replace the response future with the result own. + timer.finish(module_path!(), line!(), "AwaitUtxo/any-chain"); + + return Ok(Response::Utxo(utxo)); + } - // We're finished, the returned future gets the UTXO from the respond() channel. - timer.finish(module_path!(), line!(), "AwaitUtxo/any-chain"); + // We're finished, but the returned future is waiting on the respond() channel. + timer.finish(module_path!(), line!(), "AwaitUtxo/waiting"); - return response_fut; + response_fut.await } - - // We're finished, but the returned future is waiting on the respond() channel. - timer.finish(module_path!(), line!(), "AwaitUtxo/waiting"); - - response_fut + .boxed() } // TODO: add a name() method to Request, and combine all the generic read requests