From 8318c02cff95b428870fef8795a5d94dc4fa6cb7 Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 8 Sep 2022 16:36:31 +1000 Subject: [PATCH 01/16] Move AwaitUtxos next to the other shared writeable state requests --- zebra-state/src/service.rs | 55 +++++++++++++++++++------------------- 1 file changed, 28 insertions(+), 27 deletions(-) diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index 4a29f612a7b..57ee1e41333 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -704,7 +704,34 @@ impl Service for StateService { .boxed() } - // TODO: add a name() method to Request, and combine all the read requests + // Uses pending_utxos and queued_blocks in the StateService. + // Accesses shared writeable state in the StateService. + // Runs a StoredUtxo request concurrently using the ReadStateService. + Request::AwaitUtxo(outpoint) => { + metrics::counter!( + "state.requests", + 1, + "service" => "state", + "type" => "await_utxo", + ); + + let timer = CodeTimer::start(); + let span = Span::current(); + + let fut = self.pending_utxos.queue(outpoint); + + // TODO: move disk reads (in `any_utxo()`) to a blocking thread (#2188) + if let Some(utxo) = self.any_utxo(&outpoint) { + self.pending_utxos.respond(&outpoint, utxo); + } + + // The future waits on a channel for a response. + timer.finish(module_path!(), line!(), "AwaitUtxo"); + + fut.instrument(span).boxed() + } + + // TODO: add a name() method to Request, and combine all the generic read requests // // Runs concurrently using the ReadStateService Request::Depth(_) => { @@ -831,32 +858,6 @@ impl Service for StateService { .boxed() } - // Uses pending_utxos and queued_blocks in the StateService. - // Accesses shared writeable state in the StateService. - Request::AwaitUtxo(outpoint) => { - metrics::counter!( - "state.requests", - 1, - "service" => "state", - "type" => "await_utxo", - ); - - let timer = CodeTimer::start(); - let span = Span::current(); - - let fut = self.pending_utxos.queue(outpoint); - - // TODO: move disk reads (in `any_utxo()`) to a blocking thread (#2188) - if let Some(utxo) = self.any_utxo(&outpoint) { - self.pending_utxos.respond(&outpoint, utxo); - } - - // The future waits on a channel for a response. - timer.finish(module_path!(), line!(), "AwaitUtxo"); - - fut.instrument(span).boxed() - } - // Runs concurrently using the ReadStateService Request::FindBlockHashes { .. } => { metrics::counter!( From 8e3da9d4f9a8af010f0df76e1a9f316d1b46a1b5 Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 8 Sep 2022 16:41:18 +1000 Subject: [PATCH 02/16] Rename ReadResponse::Utxos to ReadResponse::AddressUtxos ```sh fastmod Utxos AddressUtxos zebra* ``` --- zebra-rpc/src/methods.rs | 2 +- zebra-state/src/response.rs | 5 ++--- zebra-state/src/service.rs | 2 +- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/zebra-rpc/src/methods.rs b/zebra-rpc/src/methods.rs index dcd8c91a835..c121e3c9a4a 100644 --- a/zebra-rpc/src/methods.rs +++ b/zebra-rpc/src/methods.rs @@ -916,7 +916,7 @@ where data: None, })?; let utxos = match response { - zebra_state::ReadResponse::Utxos(utxos) => utxos, + zebra_state::ReadResponse::AddressUtxos(utxos) => utxos, _ => unreachable!("unmatched response to a UtxosByAddresses request"), }; diff --git a/zebra-state/src/response.rs b/zebra-state/src/response.rs index f7933816623..f87a372e7a2 100644 --- a/zebra-state/src/response.rs +++ b/zebra-state/src/response.rs @@ -89,7 +89,7 @@ pub enum ReadResponse { AddressesTransactionIds(BTreeMap), /// Response to [`ReadRequest::UtxosByAddresses`] with found utxos and transaction data. - Utxos(AddressUtxos), + AddressUtxos(AddressUtxos), } /// Conversion from read-only [`ReadResponse`]s to read-write [`Response`]s. @@ -117,8 +117,7 @@ impl TryFrom for Response { ReadResponse::AddressBalance(_) => unimplemented!(), ReadResponse::AddressesTransactionIds(_) => unimplemented!(), - // TODO: Rename to AddressUtxos - ReadResponse::Utxos(_) => unimplemented!(), + ReadResponse::AddressUtxos(_) => unimplemented!(), } } } diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index 57ee1e41333..851e8513f8d 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -1309,7 +1309,7 @@ impl Service for ReadStateService { // The work is done in the future. timer.finish(module_path!(), line!(), "ReadRequest::UtxosByAddresses"); - utxos.map(ReadResponse::Utxos) + utxos.map(ReadResponse::AddressUtxos) }) }) .map(|join_result| join_result.expect("panic in ReadRequest::UtxosByAddresses")) From e1c4b1fca41a77e5f8b8b4e9850c4374373b12dc Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 8 Sep 2022 16:58:16 +1000 Subject: [PATCH 03/16] Rename an out_point variable to outpoint for consistency --- zebra-state/src/service/non_finalized_state/chain.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zebra-state/src/service/non_finalized_state/chain.rs b/zebra-state/src/service/non_finalized_state/chain.rs index c04d4517626..2b25732ff52 100644 --- a/zebra-state/src/service/non_finalized_state/chain.rs +++ b/zebra-state/src/service/non_finalized_state/chain.rs @@ -646,7 +646,7 @@ impl Chain { /// and removed from the relevant chain(s). pub fn unspent_utxos(&self) -> HashMap { let mut unspent_utxos = self.created_utxos.clone(); - unspent_utxos.retain(|out_point, _utxo| !self.spent_utxos.contains(out_point)); + unspent_utxos.retain(|outpoint, _utxo| !self.spent_utxos.contains(outpoint)); unspent_utxos } From 9e131dd712266ccbef04754e6fb25ff3964484c7 Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 8 Sep 2022 17:06:05 +1000 Subject: [PATCH 04/16] Rename transparent_utxos to address_utxos ```sh fastmod transparent_utxos address_utxos zebra* ``` --- zebra-state/src/service.rs | 2 +- .../src/service/finalized_state/zebra_db/transparent.rs | 2 +- zebra-state/src/service/read.rs | 2 +- zebra-state/src/service/read/address/utxo.rs | 8 ++++---- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index 851e8513f8d..9055a6a0bd5 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -1303,7 +1303,7 @@ impl Service for ReadStateService { tokio::task::spawn_blocking(move || { span.in_scope(move || { let utxos = state.best_chain_receiver.with_watch_data(|best_chain| { - read::transparent_utxos(state.network, best_chain, &state.db, addresses) + read::address_utxos(state.network, best_chain, &state.db, addresses) }); // The work is done in the future. diff --git a/zebra-state/src/service/finalized_state/zebra_db/transparent.rs b/zebra-state/src/service/finalized_state/zebra_db/transparent.rs index a54796d9aa3..6e2ac9808b4 100644 --- a/zebra-state/src/service/finalized_state/zebra_db/transparent.rs +++ b/zebra-state/src/service/finalized_state/zebra_db/transparent.rs @@ -313,7 +313,7 @@ impl ZebraDb { /// /// Specifically, a block in the partial chain must be a child block of the finalized tip. /// (But the child block does not have to be the partial chain root.) - pub fn partial_finalized_transparent_utxos( + pub fn partial_finalized_address_utxos( &self, addresses: &HashSet, ) -> BTreeMap { diff --git a/zebra-state/src/service/read.rs b/zebra-state/src/service/read.rs index 2649dc86a92..d30dd9d6172 100644 --- a/zebra-state/src/service/read.rs +++ b/zebra-state/src/service/read.rs @@ -21,7 +21,7 @@ mod tests; pub use address::{ balance::transparent_balance, tx_id::transparent_tx_ids, - utxo::{transparent_utxos, AddressUtxos, ADDRESS_HEIGHTS_FULL_RANGE}, + utxo::{address_utxos, AddressUtxos, ADDRESS_HEIGHTS_FULL_RANGE}, }; pub use block::{block, block_header, transaction}; pub use find::{ diff --git a/zebra-state/src/service/read/address/utxo.rs b/zebra-state/src/service/read/address/utxo.rs index 6ab5a446fbe..ea865cf7915 100644 --- a/zebra-state/src/service/read/address/utxo.rs +++ b/zebra-state/src/service/read/address/utxo.rs @@ -83,7 +83,7 @@ impl AddressUtxos { /// /// If the addresses do not exist in the non-finalized `chain` or finalized `db`, /// returns an empty list. -pub fn transparent_utxos( +pub fn address_utxos( network: Network, chain: Option, db: &ZebraDb, @@ -100,7 +100,7 @@ where for attempt in 0..=FINALIZED_ADDRESS_INDEX_RETRIES { debug!(?attempt, ?address_count, "starting address UTXO query"); - let (finalized_utxos, finalized_tip_range) = finalized_transparent_utxos(db, &addresses); + let (finalized_utxos, finalized_tip_range) = finalized_address_utxos(db, &addresses); debug!( finalized_utxo_count = ?finalized_utxos.len(), @@ -162,7 +162,7 @@ where /// If the addresses do not exist in the finalized `db`, returns an empty list. // // TODO: turn the return type into a struct? -fn finalized_transparent_utxos( +fn finalized_address_utxos( db: &ZebraDb, addresses: &HashSet, ) -> ( @@ -176,7 +176,7 @@ fn finalized_transparent_utxos( // Check if the finalized state changed while we were querying it let start_finalized_tip = db.finalized_tip_height(); - let finalized_utxos = db.partial_finalized_transparent_utxos(addresses); + let finalized_utxos = db.partial_finalized_address_utxos(addresses); let end_finalized_tip = db.finalized_tip_height(); From 3e757153c303f0e8b43bd42513f8048948408b23 Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 8 Sep 2022 17:49:20 +1000 Subject: [PATCH 05/16] Run AwaitUtxo without accessing shared mutable chain state --- zebra-state/src/request.rs | 26 ++++- zebra-state/src/response.rs | 26 +++-- zebra-state/src/service.rs | 109 +++++++++++++++--- .../src/service/non_finalized_state.rs | 13 +-- .../src/service/non_finalized_state/chain.rs | 12 ++ zebra-state/src/service/read.rs | 2 +- zebra-state/src/service/read/block.rs | 23 ++++ 7 files changed, 172 insertions(+), 39 deletions(-) diff --git a/zebra-state/src/request.rs b/zebra-state/src/request.rs index c507d2c07d7..910fdb3d62b 100644 --- a/zebra-state/src/request.rs +++ b/zebra-state/src/request.rs @@ -445,9 +445,11 @@ pub enum Request { /// [`block::Height`] using `.into()`. Block(HashOrHeight), - /// Request a UTXO identified by the given - /// [`OutPoint`](transparent::OutPoint), waiting until it becomes available - /// if it is unknown. + /// Request a UTXO identified by the given [`OutPoint`](transparent::OutPoint), + /// waiting until it becomes available if it is unknown. + /// + /// Checks the finalized chain, non-finalized chain, queued unverified blocks, + /// and any blocks that arrive at the state after the request future has been created. /// /// This request is purely informational, and there are no guarantees about /// whether the UTXO remains unspent or is on the best chain, or any chain. @@ -458,6 +460,8 @@ pub enum Request { /// UTXO requests should be wrapped in a timeout, so that /// out-of-order and invalid requests do not hang indefinitely. See the [`crate`] /// documentation for details. + /// + /// Outdated requests are pruned on a regular basis. AwaitUtxo(transparent::OutPoint), /// Finds the first hash that's in the peer's `known_blocks` and the local best chain. @@ -542,6 +546,15 @@ pub enum ReadRequest { /// * [`ReadResponse::Transaction(None)`](ReadResponse::Transaction) otherwise. Transaction(transaction::Hash), + /// Looks up a UTXO identified by the given [`OutPoint`](transparent::OutPoint), + /// returning `None` immediately if it is unknown. + /// + /// Checks verified blocks in the finalized chain and non-finalized chain. + /// + /// This request is purely informational, there is no guarantee that + /// the UTXO remains unspent in the best chain. + ChainUtxo(transparent::OutPoint), + /// Computes a block locator object based on the current best chain. /// /// Returns [`ReadResponse::BlockLocator`] with hashes starting @@ -662,8 +675,6 @@ impl TryFrom for ReadRequest { Request::Block(hash_or_height) => Ok(ReadRequest::Block(hash_or_height)), Request::Transaction(tx_hash) => Ok(ReadRequest::Transaction(tx_hash)), - Request::AwaitUtxo(_) => unimplemented!("use StoredUtxo here"), - Request::BlockLocator => Ok(ReadRequest::BlockLocator), Request::FindBlockHashes { known_blocks, stop } => { Ok(ReadRequest::FindBlockHashes { known_blocks, stop }) @@ -675,6 +686,11 @@ impl TryFrom for ReadRequest { Request::CommitBlock(_) | Request::CommitFinalizedBlock(_) => { Err("ReadService does not write blocks") } + + Request::AwaitUtxo(_) => { + Err("ReadService does not track pending UTXOs. \ + Manually convert the request to ReadRequest::ChainUtxo, and handle pending UTXOs.") + } } } } diff --git a/zebra-state/src/response.rs b/zebra-state/src/response.rs index f87a372e7a2..44bc0c923ac 100644 --- a/zebra-state/src/response.rs +++ b/zebra-state/src/response.rs @@ -39,7 +39,8 @@ pub enum Response { /// Response to [`Request::Block`] with the specified block. Block(Option>), - /// The response to a `AwaitUtxo` request. + /// The response to a `AwaitUtxo` request, from the non-finalized chain, finalized chain, + /// pending unverified blocks, or blocks received after the request was sent. Utxo(transparent::Utxo), /// The response to a `FindBlockHashes` request. @@ -75,6 +76,13 @@ pub enum ReadResponse { /// The response to a `FindBlockHeaders` request. BlockHeaders(Vec), + /// The response to a `ChainUtxo` request, from verified blocks in the + /// non-finalized chain or finalized chain. + /// + /// This response is purely informational, there is no guarantee that + /// the UTXO remains unspent in the best chain. + ChainUtxo(Option), + /// Response to [`ReadRequest::SaplingTree`] with the specified Sapling note commitment tree. SaplingTree(Option>), @@ -108,16 +116,20 @@ impl TryFrom for Response { Ok(Response::Transaction(tx_and_height.map(|(tx, _height)| tx))) } + ReadResponse::ChainUtxo(_) => Err("ReadService does not track pending UTXOs. \ + Manually unwrap the response, and handle pending UTXOs."), + ReadResponse::BlockLocator(hashes) => Ok(Response::BlockLocator(hashes)), ReadResponse::BlockHashes(hashes) => Ok(Response::BlockHashes(hashes)), ReadResponse::BlockHeaders(headers) => Ok(Response::BlockHeaders(headers)), - ReadResponse::SaplingTree(_) => unimplemented!(), - ReadResponse::OrchardTree(_) => unimplemented!(), - - ReadResponse::AddressBalance(_) => unimplemented!(), - ReadResponse::AddressesTransactionIds(_) => unimplemented!(), - ReadResponse::AddressUtxos(_) => unimplemented!(), + ReadResponse::SaplingTree(_) + | ReadResponse::OrchardTree(_) + | ReadResponse::AddressBalance(_) + | ReadResponse::AddressesTransactionIds(_) + | ReadResponse::AddressUtxos(_) => { + Err("there is no corresponding Response for this ReadResponse") + } } } } diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index 9055a6a0bd5..74d0771cf2e 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -495,23 +495,21 @@ impl StateService { } /// Return the [`transparent::Utxo`] pointed to by `outpoint`, if it exists - /// in any chain, or in any pending block. + /// in any non-finalized chain, or in any pending block. /// /// Some of the returned UTXOs may be invalid, because: + /// - they have already been spent, /// - they are not in the best chain, or /// - their block fails contextual validation. - pub fn any_utxo(&self, outpoint: &transparent::OutPoint) -> Option { - // We ignore any UTXOs in FinalizedState.queued_by_prev_hash, - // because it is only used during checkpoint verification. + /// + /// Finalized UTXOs are handled via [`ReadRequest::ChainUtxo`]. + pub fn any_non_finalized_utxo( + &self, + outpoint: &transparent::OutPoint, + ) -> Option { self.mem .any_utxo(outpoint) .or_else(|| self.queued_blocks.utxo(outpoint)) - .or_else(|| { - self.disk - .db() - .utxo(outpoint) - .map(|ordered_utxo| ordered_utxo.utxo) - }) } /// Return an iterator over the relevant chain of the block identified by @@ -706,7 +704,7 @@ impl Service for StateService { // Uses pending_utxos and queued_blocks in the StateService. // Accesses shared writeable state in the StateService. - // Runs a StoredUtxo request concurrently using the ReadStateService. + // Runs a ChainUtxo request using the ReadStateService, but without any concurrency. Request::AwaitUtxo(outpoint) => { metrics::counter!( "state.requests", @@ -716,19 +714,62 @@ impl Service for StateService { ); let timer = CodeTimer::start(); + + let response_fut = self.pending_utxos.queue(outpoint); let span = Span::current(); + let response_fut = response_fut.instrument(span).boxed(); + + if let Some(utxo) = self.any_non_finalized_utxo(&outpoint) { + self.pending_utxos.respond(&outpoint, utxo); + + // We're finished, the returned future gets the UTXO from the respond() channel. + timer.finish(module_path!(), line!(), "AwaitUtxo/any-non-finalized"); - let fut = self.pending_utxos.queue(outpoint); + return response_fut; + } + + // We ignore any UTXOs in FinalizedState.queued_by_prev_hash, + // because it is only used during checkpoint verification. + // + // This creates a rare race condition between concurrent: + // - pending blocks in the last few checkpoints, and + // - the first few blocks doing full validation using UTXOs. + // + // But it doesn't seem to happen much in practice. + // If it did, some blocks would temporarily fail full validation, + // until they get retried after the checkpointed blocks are committed. + + // Send a request to the ReadStateService, to get UTXOs from the finalized chain. + let read_service = self.read_service.clone(); - // TODO: move disk reads (in `any_utxo()`) to a blocking thread (#2188) - if let Some(utxo) = self.any_utxo(&outpoint) { + // Optional TODO: + // - make pending_utxos.respond() async using a channel, + // so we can use ReadRequest::ChainUtxo here, and avoid a block_in_place(). + + let span = Span::current(); + let utxo = tokio::task::block_in_place(move || { + span.in_scope(move || { + read_service + .best_chain_receiver + .with_watch_data(|best_chain| { + read::utxo(best_chain, &read_service.db, outpoint) + }) + }) + }); + + if let Some(utxo) = utxo { self.pending_utxos.respond(&outpoint, utxo); + + // We're finished, the returned future gets the UTXO from the respond() channel. + timer.finish(module_path!(), line!(), "AwaitUtxo/finalized"); + + return response_fut; } - // The future waits on a channel for a response. - timer.finish(module_path!(), line!(), "AwaitUtxo"); + // We're finished, but the returned future is waiting on the respond() channel. + timer.finish(module_path!(), line!(), "AwaitUtxo/waiting"); - fut.instrument(span).boxed() + response_fut } // TODO: add a name() method to Request, and combine all the generic read requests @@ -984,7 +1025,7 @@ impl Service for ReadStateService { .boxed() } - // Used by get_block RPC. + // Used by get_block RPC and the StateService. ReadRequest::Block(hash_or_height) => { metrics::counter!( "state.requests", @@ -1014,7 +1055,7 @@ impl Service for ReadStateService { .boxed() } - // For the get_raw_transaction RPC. + // For the get_raw_transaction RPC and the StateService. ReadRequest::Transaction(hash) => { metrics::counter!( "state.requests", @@ -1045,6 +1086,36 @@ impl Service for ReadStateService { .boxed() } + // Currently unused. + ReadRequest::ChainUtxo(outpoint) => { + metrics::counter!( + "state.requests", + 1, + "service" => "read_state", + "type" => "chain_utxo", + ); + + let timer = CodeTimer::start(); + + let state = self.clone(); + + let span = Span::current(); + tokio::task::spawn_blocking(move || { + span.in_scope(move || { + let utxo = state.best_chain_receiver.with_watch_data(|best_chain| { + read::utxo(best_chain, &state.db, outpoint) + }); + + // The work is done in the future. + timer.finish(module_path!(), line!(), "ReadRequest::ChainUtxo"); + + Ok(ReadResponse::ChainUtxo(utxo)) + }) + }) + .map(|join_result| join_result.expect("panic in ReadRequest::ChainUtxo")) + .boxed() + } + // Used by the StateService. ReadRequest::BlockLocator => { metrics::counter!( diff --git a/zebra-state/src/service/non_finalized_state.rs b/zebra-state/src/service/non_finalized_state.rs index 8c5e3644f1f..dfc16440434 100644 --- a/zebra-state/src/service/non_finalized_state.rs +++ b/zebra-state/src/service/non_finalized_state.rs @@ -340,14 +340,13 @@ impl NonFinalizedState { /// Returns the [`transparent::Utxo`] pointed to by the given /// [`transparent::OutPoint`] if it is present in any chain. + /// + /// UTXOs are returned regardless of whether they have been spent. pub fn any_utxo(&self, outpoint: &transparent::OutPoint) -> Option { - for chain in self.chain_set.iter().rev() { - if let Some(utxo) = chain.created_utxos.get(outpoint) { - return Some(utxo.utxo.clone()); - } - } - - None + self.chain_set + .iter() + .rev() + .find_map(|chain| chain.created_utxo(outpoint)) } /// Returns the `block` with the given hash in any chain. diff --git a/zebra-state/src/service/non_finalized_state/chain.rs b/zebra-state/src/service/non_finalized_state/chain.rs index 2b25732ff52..0d707b197e0 100644 --- a/zebra-state/src/service/non_finalized_state/chain.rs +++ b/zebra-state/src/service/non_finalized_state/chain.rs @@ -651,6 +651,18 @@ impl Chain { unspent_utxos } + /// Returns the [`transparent::Utxo`] pointed to by the given + /// [`transparent::OutPoint`] if it was created by this chain. + /// + /// UTXOs are returned regardless of whether they have been spent. + pub fn created_utxo(&self, outpoint: &transparent::OutPoint) -> Option { + if let Some(utxo) = self.created_utxos.get(outpoint) { + return Some(utxo.utxo.clone()); + } + + None + } + // Address index queries /// Returns the transparent transfers for `addresses` in this non-finalized chain. diff --git a/zebra-state/src/service/read.rs b/zebra-state/src/service/read.rs index d30dd9d6172..76553e383f2 100644 --- a/zebra-state/src/service/read.rs +++ b/zebra-state/src/service/read.rs @@ -23,7 +23,7 @@ pub use address::{ tx_id::transparent_tx_ids, utxo::{address_utxos, AddressUtxos, ADDRESS_HEIGHTS_FULL_RANGE}, }; -pub use block::{block, block_header, transaction}; +pub use block::{block, block_header, transaction, utxo}; pub use find::{ block_locator, chain_contains_hash, depth, find_chain_hashes, find_chain_headers, hash_by_height, height_by_hash, tip, tip_height, diff --git a/zebra-state/src/service/read/block.rs b/zebra-state/src/service/read/block.rs index 195d93d2a37..81df7d8be9d 100644 --- a/zebra-state/src/service/read/block.rs +++ b/zebra-state/src/service/read/block.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use zebra_chain::{ block::{self, Block, Height}, transaction::{self, Transaction}, + transparent::{self, Utxo}, }; use crate::{ @@ -88,3 +89,25 @@ where }) .or_else(|| db.transaction(hash)) } + +/// Returns the [`Utxo`] for [`transparent::OutPoint`], if it exists in the +/// non-finalized `chain` or finalized `db`. +/// +/// UTXOs may be returned regardless of whether they have been spent. +pub fn utxo(chain: Option, db: &ZebraDb, outpoint: transparent::OutPoint) -> Option +where + C: AsRef, +{ + // # Correctness + // + // The StateService commits blocks to the finalized state before updating + // the latest chain, and it can commit additional blocks after we've cloned + // this `chain` variable. + // + // Since UTXOs are the same in the finalized and non-finalized state, + // we check the most efficient alternative first. (`chain` is always in + // memory, but `db` stores transactions on disk, with a memory cache.) + chain + .and_then(|chain| chain.as_ref().created_utxo(&outpoint)) + .or_else(|| db.utxo(&outpoint).map(|utxo| utxo.utxo)) +} From 9a104544327cf94a9a6a0c91005d1692bb17ec55 Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 9 Sep 2022 09:38:53 +1000 Subject: [PATCH 06/16] Fix some incorrect comments --- zebra-state/src/request.rs | 2 +- zebra-state/src/response.rs | 2 +- zebra-state/src/service.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/zebra-state/src/request.rs b/zebra-state/src/request.rs index 910fdb3d62b..9c2de243185 100644 --- a/zebra-state/src/request.rs +++ b/zebra-state/src/request.rs @@ -448,7 +448,7 @@ pub enum Request { /// Request a UTXO identified by the given [`OutPoint`](transparent::OutPoint), /// waiting until it becomes available if it is unknown. /// - /// Checks the finalized chain, non-finalized chain, queued unverified blocks, + /// Checks the finalized chain, all non-finalized chains, queued unverified blocks, /// and any blocks that arrive at the state after the request future has been created. /// /// This request is purely informational, and there are no guarantees about diff --git a/zebra-state/src/response.rs b/zebra-state/src/response.rs index 44bc0c923ac..bce1dd2947f 100644 --- a/zebra-state/src/response.rs +++ b/zebra-state/src/response.rs @@ -39,7 +39,7 @@ pub enum Response { /// Response to [`Request::Block`] with the specified block. Block(Option>), - /// The response to a `AwaitUtxo` request, from the non-finalized chain, finalized chain, + /// The response to a `AwaitUtxo` request, from any non-finalized chains, finalized chain, /// pending unverified blocks, or blocks received after the request was sent. Utxo(transparent::Utxo), diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index 74d0771cf2e..55116e1c6ce 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -137,7 +137,7 @@ pub(crate) struct StateService { /// A cloneable [`ReadStateService`], used to answer concurrent read requests. /// - /// TODO: move concurrent read requests to [`ReadRequest`], and remove `read_service`. + /// TODO: move users of read [`Request`]s to [`ReadStateService`], and remove `read_service`. read_service: ReadStateService, } From 0f3e13c5c2c18a0aed3d041b50a44252bd9bbf90 Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 9 Sep 2022 09:56:33 +1000 Subject: [PATCH 07/16] Explain why some concurrent reads are ok --- zebra-state/src/service/read/find.rs | 83 ++++++++++++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/zebra-state/src/service/read/find.rs b/zebra-state/src/service/read/find.rs index b3e9855d4cc..7cd28713065 100644 --- a/zebra-state/src/service/read/find.rs +++ b/zebra-state/src/service/read/find.rs @@ -22,6 +22,16 @@ pub fn tip(chain: Option, db: &ZebraDb) -> Option<(Height, block::Hash)> where C: AsRef, { + // # Correctness + // + // The StateService commits blocks to the finalized state before updating + // the latest chain, and it can commit additional blocks after we've cloned + // this `chain` variable. + // + // If there is an overlap between the non-finalized and finalized states, + // where the finalized tip is above the non-finalized tip, + // Zebra is receiving a lot of blocks, or this request has been delayed for a long time, + // so it is acceptable to return either tip. chain .map(|chain| chain.as_ref().non_finalized_tip()) .or_else(|| db.tip()) @@ -54,6 +64,15 @@ where { let chain = chain.as_ref(); + // # Correctness + // + // The StateService commits blocks to the finalized state before updating + // the latest chain, and it can commit additional blocks after we've cloned + // this `chain` variable. + // + // It is ok to do this lookup in two different calls. Finalized state updates + // can only add overlapping blocks, and hashes are unique. + let tip = tip_height(chain, db)?; let height = height_by_hash(chain, db, hash)?; @@ -65,6 +84,14 @@ pub fn height_by_hash(chain: Option, db: &ZebraDb, hash: block::Hash) -> O where C: AsRef, { + // # Correctness + // + // The StateService commits blocks to the finalized state before updating + // the latest chain, and it can commit additional blocks after we've cloned + // this `chain` variable. + // + // Finalized state updates can only add overlapping blocks, and hashes are unique. + chain .and_then(|chain| chain.as_ref().height_by_hash(hash)) .or_else(|| db.height(hash)) @@ -75,6 +102,20 @@ pub fn hash_by_height(chain: Option, db: &ZebraDb, height: Height) -> Opti where C: AsRef, { + // # Correctness + // + // The StateService commits blocks to the finalized state before updating + // the latest chain, and it can commit additional blocks after we've cloned + // this `chain` variable. + // + // Finalized state updates can only add overlapping blocks, and heights are unique + // in the current `chain`. + // + // If there is an overlap between the non-finalized and finalized states, + // where the finalized tip is above the non-finalized tip, + // Zebra is receiving a lot of blocks, or this request has been delayed for a long time, + // so it is acceptable to return hashes from either chain. + chain .and_then(|chain| chain.as_ref().hash_by_height(height)) .or_else(|| db.hash(height)) @@ -85,6 +126,19 @@ pub fn chain_contains_hash(chain: Option, db: &ZebraDb, hash: block::Hash) where C: AsRef, { + // # Correctness + // + // The StateService commits blocks to the finalized state before updating + // the latest chain, and it can commit additional blocks after we've cloned + // this `chain` variable. + // + // Finalized state updates can only add overlapping blocks, and hashes are unique. + // + // If there is an overlap between the non-finalized and finalized states, + // where the finalized tip is above the non-finalized tip, + // Zebra is receiving a lot of blocks, or this request has been delayed for a long time, + // so it is acceptable to return hashes from either chain. + chain .map(|chain| chain.as_ref().height_by_hash.contains_key(&hash)) .unwrap_or(false) @@ -102,6 +156,23 @@ where { let chain = chain.as_ref(); + // # Correctness + // + // The StateService commits blocks to the finalized state before updating + // the latest chain, and it can commit additional blocks after we've cloned + // this `chain` variable. + // + // It is ok to do these lookups using multiple database calls. Finalized state updates + // can only add overlapping blocks, and hashes are unique. + // + // If there is an overlap between the non-finalized and finalized states, + // where the finalized tip is above the non-finalized tip, + // Zebra is receiving a lot of blocks, or this request has been delayed for a long time, + // so it is acceptable to return a set of hashes from multiple chains. + // + // Multiple heights can not map to the same hash, even in different chains, + // because the block height is covered by the block hash, + // via the transaction merkle tree commitments. let tip_height = tip_height(chain, db)?; let heights = block_locator_heights(tip_height); @@ -419,6 +490,10 @@ pub fn find_chain_hashes( where C: AsRef, { + // # Correctness + // + // See the note in `block_locator()`. + let chain = chain.as_ref(); let intersection = find_chain_intersection(chain, db, known_blocks); @@ -439,6 +514,14 @@ pub fn find_chain_headers( where C: AsRef, { + // # Correctness + // + // Headers are looked up by their hashes using a unique mapping, + // so it is not possible for multiple hashes to look up the same header, + // even across different chains. + // + // See also the note in `block_locator()`. + let chain = chain.as_ref(); let intersection = find_chain_intersection(chain, db, known_blocks); From 392b19afed31db82006533ac8c9c6765be89dd38 Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 9 Sep 2022 10:22:59 +1000 Subject: [PATCH 08/16] Add a TODO --- zebra-state/src/service.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index 55116e1c6ce..c5320e1c6b7 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -315,6 +315,9 @@ impl StateService { rsp_rx }; + // TODO: avoid a temporary verification failure that can happen + // if the first non-finalized block arrives before the last finalized block is committed + // (#5125) if !self.can_fork_chain_at(&parent_hash) { tracing::trace!("unready to verify, returning early"); return rsp_rx; From 882e9840845a94136c6d3b8fa2e36837c7b5481d Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 9 Sep 2022 10:55:38 +1000 Subject: [PATCH 09/16] Stop using self.mem in AwaitUtxo requests --- zebra-state/src/service.rs | 223 +++++++++++---------- zebra-state/src/service/check/nullifier.rs | 10 +- zebra-state/src/service/read.rs | 16 +- zebra-state/src/service/read/block.rs | 28 ++- 4 files changed, 166 insertions(+), 111 deletions(-) diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index c5320e1c6b7..42b60741e47 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -19,7 +19,6 @@ use std::{ convert, future::Future, pin::Pin, - sync::Arc, task::{Context, Poll}, time::{Duration, Instant}, }; @@ -36,7 +35,6 @@ use zebra_chain::{ block::{self, CountedHeader}, diagnostic::CodeTimer, parameters::{Network, NetworkUpgrade}, - transparent, }; use crate::{ @@ -44,7 +42,7 @@ use crate::{ service::{ chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip}, finalized_state::{FinalizedState, ZebraDb}, - non_finalized_state::{Chain, NonFinalizedState, QueuedBlocks}, + non_finalized_state::{NonFinalizedState, QueuedBlocks}, pending_utxos::PendingUtxos, watch_receiver::WatchReceiver, }, @@ -132,8 +130,8 @@ pub(crate) struct StateService { /// [`LatestChainTip`] and [`ChainTipChange`]. chain_tip_sender: ChainTipSender, - /// A sender channel used to update the current best non-finalized chain for [`ReadStateService`]. - best_chain_sender: watch::Sender>>, + /// A sender channel used to update the recent non-finalized state for the [`ReadStateService`]. + non_finalized_state_sender: watch::Sender, /// A cloneable [`ReadStateService`], used to answer concurrent read requests. /// @@ -170,11 +168,11 @@ pub struct ReadStateService { /// so it might include some block data that is also in `best_mem`. db: ZebraDb, - /// A watch channel for the current best in-memory chain. + /// A watch channel for a recent [`NonFinalizedState`]. /// - /// This chain is only updated between requests, + /// This state is only updated between requests, /// so it might include some block data that is also on `disk`. - best_chain_receiver: WatchReceiver>>, + non_finalized_state_receiver: WatchReceiver, } impl StateService { @@ -205,7 +203,7 @@ impl StateService { let mem = NonFinalizedState::new(network); - let (read_service, best_chain_sender) = ReadStateService::new(&disk); + let (read_service, non_finalized_state_sender) = ReadStateService::new(&disk); let queued_blocks = QueuedBlocks::default(); let pending_utxos = PendingUtxos::default(); @@ -218,7 +216,7 @@ impl StateService { pending_utxos, last_prune: Instant::now(), chain_tip_sender, - best_chain_sender, + non_finalized_state_sender, read_service: read_service.clone(), }; timer.finish(module_path!(), line!(), "initializing state service"); @@ -367,7 +365,7 @@ impl StateService { rsp_rx } - /// Update the [`LatestChainTip`], [`ChainTipChange`], and `best_chain_sender` + /// Update the [`LatestChainTip`], [`ChainTipChange`], and `non_finalized_state_sender` /// channels with the latest non-finalized [`ChainTipBlock`] and /// [`Chain`][1]. /// @@ -384,11 +382,8 @@ impl StateService { .map(ChainTipBlock::from); let tip_block_height = tip_block.as_ref().map(|block| block.height); - // The RPC service uses the ReadStateService, but it is not turned on by default. - if self.best_chain_sender.receiver_count() > 0 { - // If the final receiver was just dropped, ignore the error. - let _ = self.best_chain_sender.send(best_chain.cloned()); - } + // If the final receiver was just dropped, ignore the error. + let _ = self.non_finalized_state_sender.send(self.mem.clone()); self.chain_tip_sender.set_best_non_finalized_tip(tip_block); @@ -465,7 +460,7 @@ impl StateService { /// network, based on the committed finalized and non-finalized state. /// /// Note: some additional contextual validity checks are performed by the - /// non-finalized [`Chain`]. + /// non-finalized [`Chain`](non_finalized_state::Chain). fn check_contextual_validity( &mut self, prepared: &PreparedBlock, @@ -497,24 +492,6 @@ impl StateService { .or_else(|| self.disk.db().height(hash)) } - /// Return the [`transparent::Utxo`] pointed to by `outpoint`, if it exists - /// in any non-finalized chain, or in any pending block. - /// - /// Some of the returned UTXOs may be invalid, because: - /// - they have already been spent, - /// - they are not in the best chain, or - /// - their block fails contextual validation. - /// - /// Finalized UTXOs are handled via [`ReadRequest::ChainUtxo`]. - pub fn any_non_finalized_utxo( - &self, - outpoint: &transparent::OutPoint, - ) -> Option { - self.mem - .any_utxo(outpoint) - .or_else(|| self.queued_blocks.utxo(outpoint)) - } - /// Return an iterator over the relevant chain of the block identified by /// `hash`, in order from the largest height to the genesis block. /// @@ -543,19 +520,20 @@ impl ReadStateService { /// Creates a new read-only state service, using the provided finalized state. /// /// Returns the newly created service, - /// and a watch channel for updating its best non-finalized chain. - pub(crate) fn new(disk: &FinalizedState) -> (Self, watch::Sender>>) { - let (best_chain_sender, best_chain_receiver) = watch::channel(None); + /// and a watch channel for updating the shared recent non-finalized chain. + pub(crate) fn new(disk: &FinalizedState) -> (Self, watch::Sender) { + let (non_finalized_state_sender, non_finalized_state_receiver) = + watch::channel(NonFinalizedState::new(disk.network())); let read_service = Self { network: disk.network(), db: disk.db().clone(), - best_chain_receiver: WatchReceiver::new(best_chain_receiver), + non_finalized_state_receiver: WatchReceiver::new(non_finalized_state_receiver), }; tracing::info!("created new read-only state service"); - (read_service, best_chain_sender) + (read_service, non_finalized_state_sender) } } @@ -706,8 +684,7 @@ impl Service for StateService { } // Uses pending_utxos and queued_blocks in the StateService. - // Accesses shared writeable state in the StateService. - // Runs a ChainUtxo request using the ReadStateService, but without any concurrency. + // Directly uses the shared NonFinalizedState in the ReadStateService. Request::AwaitUtxo(outpoint) => { metrics::counter!( "state.requests", @@ -718,15 +695,17 @@ impl Service for StateService { let timer = CodeTimer::start(); + // Prepare the AwaitUtxo future. let response_fut = self.pending_utxos.queue(outpoint); let span = Span::current(); let response_fut = response_fut.instrument(span).boxed(); - if let Some(utxo) = self.any_non_finalized_utxo(&outpoint) { + // Check the non-finalized block queue. + if let Some(utxo) = self.queued_blocks.utxo(&outpoint) { self.pending_utxos.respond(&outpoint, utxo); // We're finished, the returned future gets the UTXO from the respond() channel. - timer.finish(module_path!(), line!(), "AwaitUtxo/any-non-finalized"); + timer.finish(module_path!(), line!(), "AwaitUtxo/queued-non-finalized"); return response_fut; } @@ -734,13 +713,8 @@ impl Service for StateService { // We ignore any UTXOs in FinalizedState.queued_by_prev_hash, // because it is only used during checkpoint verification. // - // This creates a rare race condition between concurrent: - // - pending blocks in the last few checkpoints, and - // - the first few blocks doing full validation using UTXOs. - // - // But it doesn't seem to happen much in practice. - // If it did, some blocks would temporarily fail full validation, - // until they get retried after the checkpointed blocks are committed. + // This creates a rare race condition, but it doesn't seem to happen much in practice. + // See #5126 for details. // Send a request to the ReadStateService, to get UTXOs from the finalized chain. let read_service = self.read_service.clone(); @@ -749,14 +723,15 @@ impl Service for StateService { // - make pending_utxos.respond() async using a channel, // so we can use ReadRequest::ChainUtxo here, and avoid a block_in_place(). + // Check any non-finalized chain, and the finalized chain. let span = Span::current(); let utxo = tokio::task::block_in_place(move || { span.in_scope(move || { - read_service - .best_chain_receiver - .with_watch_data(|best_chain| { - read::utxo(best_chain, &read_service.db, outpoint) - }) + read_service.non_finalized_state_receiver.with_watch_data( + |non_finalized_state| { + read::any_utxo(non_finalized_state, &read_service.db, outpoint) + }, + ) }) }); @@ -764,7 +739,7 @@ impl Service for StateService { self.pending_utxos.respond(&outpoint, utxo); // We're finished, the returned future gets the UTXO from the respond() channel. - timer.finish(module_path!(), line!(), "AwaitUtxo/finalized"); + timer.finish(module_path!(), line!(), "AwaitUtxo/any-chain"); return response_fut; } @@ -984,9 +959,11 @@ impl Service for ReadStateService { let span = Span::current(); tokio::task::spawn_blocking(move || { span.in_scope(move || { - let tip = state - .best_chain_receiver - .with_watch_data(|best_chain| read::tip(best_chain, &state.db)); + let tip = state.non_finalized_state_receiver.with_watch_data( + |non_finalized_state| { + read::tip(non_finalized_state.best_chain(), &state.db) + }, + ); // The work is done in the future. timer.finish(module_path!(), line!(), "ReadRequest::Tip"); @@ -1014,9 +991,11 @@ impl Service for ReadStateService { let span = Span::current(); tokio::task::spawn_blocking(move || { span.in_scope(move || { - let depth = state - .best_chain_receiver - .with_watch_data(|best_chain| read::depth(best_chain, &state.db, hash)); + let depth = state.non_finalized_state_receiver.with_watch_data( + |non_finalized_state| { + read::depth(non_finalized_state.best_chain(), &state.db, hash) + }, + ); // The work is done in the future. timer.finish(module_path!(), line!(), "ReadRequest::Depth"); @@ -1044,9 +1023,15 @@ impl Service for ReadStateService { let span = Span::current(); tokio::task::spawn_blocking(move || { span.in_scope(move || { - let block = state.best_chain_receiver.with_watch_data(|best_chain| { - read::block(best_chain, &state.db, hash_or_height) - }); + let block = state.non_finalized_state_receiver.with_watch_data( + |non_finalized_state| { + read::block( + non_finalized_state.best_chain(), + &state.db, + hash_or_height, + ) + }, + ); // The work is done in the future. timer.finish(module_path!(), line!(), "ReadRequest::Block"); @@ -1074,9 +1059,10 @@ impl Service for ReadStateService { let span = Span::current(); tokio::task::spawn_blocking(move || { span.in_scope(move || { - let transaction_and_height = - state.best_chain_receiver.with_watch_data(|best_chain| { - read::transaction(best_chain, &state.db, hash) + let transaction_and_height = state + .non_finalized_state_receiver + .with_watch_data(|non_finalized_state| { + read::transaction(non_finalized_state.best_chain(), &state.db, hash) }); // The work is done in the future. @@ -1105,9 +1091,11 @@ impl Service for ReadStateService { let span = Span::current(); tokio::task::spawn_blocking(move || { span.in_scope(move || { - let utxo = state.best_chain_receiver.with_watch_data(|best_chain| { - read::utxo(best_chain, &state.db, outpoint) - }); + let utxo = state.non_finalized_state_receiver.with_watch_data( + |non_finalized_state| { + read::utxo(non_finalized_state.best_chain(), &state.db, outpoint) + }, + ); // The work is done in the future. timer.finish(module_path!(), line!(), "ReadRequest::ChainUtxo"); @@ -1135,10 +1123,11 @@ impl Service for ReadStateService { let span = Span::current(); tokio::task::spawn_blocking(move || { span.in_scope(move || { - let block_locator = - state.best_chain_receiver.with_watch_data(|best_chain| { - read::block_locator(best_chain, &state.db) - }); + let block_locator = state.non_finalized_state_receiver.with_watch_data( + |non_finalized_state| { + read::block_locator(non_finalized_state.best_chain(), &state.db) + }, + ); // The work is done in the future. timer.finish(module_path!(), line!(), "ReadRequest::BlockLocator"); @@ -1168,16 +1157,17 @@ impl Service for ReadStateService { let span = Span::current(); tokio::task::spawn_blocking(move || { span.in_scope(move || { - let block_hashes = - state.best_chain_receiver.with_watch_data(|best_chain| { + let block_hashes = state.non_finalized_state_receiver.with_watch_data( + |non_finalized_state| { read::find_chain_hashes( - best_chain, + non_finalized_state.best_chain(), &state.db, known_blocks, stop, MAX_FIND_BLOCK_HASHES_RESULTS, ) - }); + }, + ); // The work is done in the future. timer.finish(module_path!(), line!(), "ReadRequest::FindBlockHashes"); @@ -1205,16 +1195,17 @@ impl Service for ReadStateService { let span = Span::current(); tokio::task::spawn_blocking(move || { span.in_scope(move || { - let block_headers = - state.best_chain_receiver.with_watch_data(|best_chain| { + let block_headers = state.non_finalized_state_receiver.with_watch_data( + |non_finalized_state| { read::find_chain_headers( - best_chain, + non_finalized_state.best_chain(), &state.db, known_blocks, stop, MAX_FIND_BLOCK_HEADERS_RESULTS_FOR_ZEBRA, ) - }); + }, + ); let block_headers = block_headers .into_iter() @@ -1246,10 +1237,15 @@ impl Service for ReadStateService { let span = Span::current(); tokio::task::spawn_blocking(move || { span.in_scope(move || { - let sapling_tree = - state.best_chain_receiver.with_watch_data(|best_chain| { - read::sapling_tree(best_chain, &state.db, hash_or_height) - }); + let sapling_tree = state.non_finalized_state_receiver.with_watch_data( + |non_finalized_state| { + read::sapling_tree( + non_finalized_state.best_chain(), + &state.db, + hash_or_height, + ) + }, + ); // The work is done in the future. timer.finish(module_path!(), line!(), "ReadRequest::SaplingTree"); @@ -1276,10 +1272,15 @@ impl Service for ReadStateService { let span = Span::current(); tokio::task::spawn_blocking(move || { span.in_scope(move || { - let orchard_tree = - state.best_chain_receiver.with_watch_data(|best_chain| { - read::orchard_tree(best_chain, &state.db, hash_or_height) - }); + let orchard_tree = state.non_finalized_state_receiver.with_watch_data( + |non_finalized_state| { + read::orchard_tree( + non_finalized_state.best_chain(), + &state.db, + hash_or_height, + ) + }, + ); // The work is done in the future. timer.finish(module_path!(), line!(), "ReadRequest::OrchardTree"); @@ -1307,9 +1308,15 @@ impl Service for ReadStateService { let span = Span::current(); tokio::task::spawn_blocking(move || { span.in_scope(move || { - let balance = state.best_chain_receiver.with_watch_data(|best_chain| { - read::transparent_balance(best_chain, &state.db, addresses) - })?; + let balance = state.non_finalized_state_receiver.with_watch_data( + |non_finalized_state| { + read::transparent_balance( + non_finalized_state.best_chain().cloned(), + &state.db, + addresses, + ) + }, + )?; // The work is done in the future. timer.finish(module_path!(), line!(), "ReadRequest::AddressBalance"); @@ -1340,9 +1347,16 @@ impl Service for ReadStateService { let span = Span::current(); tokio::task::spawn_blocking(move || { span.in_scope(move || { - let tx_ids = state.best_chain_receiver.with_watch_data(|best_chain| { - read::transparent_tx_ids(best_chain, &state.db, addresses, height_range) - }); + let tx_ids = state.non_finalized_state_receiver.with_watch_data( + |non_finalized_state| { + read::transparent_tx_ids( + non_finalized_state.best_chain(), + &state.db, + addresses, + height_range, + ) + }, + ); // The work is done in the future. timer.finish( @@ -1376,9 +1390,16 @@ impl Service for ReadStateService { let span = Span::current(); tokio::task::spawn_blocking(move || { span.in_scope(move || { - let utxos = state.best_chain_receiver.with_watch_data(|best_chain| { - read::address_utxos(state.network, best_chain, &state.db, addresses) - }); + let utxos = state.non_finalized_state_receiver.with_watch_data( + |non_finalized_state| { + read::address_utxos( + state.network, + non_finalized_state.best_chain(), + &state.db, + addresses, + ) + }, + ); // The work is done in the future. timer.finish(module_path!(), line!(), "ReadRequest::UtxosByAddresses"); diff --git a/zebra-state/src/service/check/nullifier.rs b/zebra-state/src/service/check/nullifier.rs index 4012d7ff967..86fdd0c4110 100644 --- a/zebra-state/src/service/check/nullifier.rs +++ b/zebra-state/src/service/check/nullifier.rs @@ -9,9 +9,13 @@ use crate::{ ValidateContextError, }; +// Tidy up some doc links +#[allow(unused_imports)] +use crate::service; + /// Reject double-spends of nullifers: /// - one from this [`PreparedBlock`], and the other already committed to the -/// [`FinalizedState`](super::super::FinalizedState). +/// [`FinalizedState`](service::FinalizedState). /// /// (Duplicate non-finalized nullifiers are rejected during the chain update, /// see [`add_to_non_finalized_chain_unique`] for details.) @@ -80,7 +84,7 @@ pub(crate) fn no_duplicates_in_finalized_chain( /// [2]: zebra_chain::sapling::Spend /// [3]: zebra_chain::orchard::Action /// [4]: zebra_chain::block::Block -/// [5]: super::super::Chain +/// [5]: service::non_finalized_state::Chain #[tracing::instrument(skip(chain_nullifiers, shielded_data_nullifiers))] pub(crate) fn add_to_non_finalized_chain_unique<'block, NullifierT>( chain_nullifiers: &mut HashSet, @@ -124,7 +128,7 @@ where /// [`add_to_non_finalized_chain_unique`], so this shielded data should be the /// only shielded data that added this nullifier to this [`Chain`][1]. /// -/// [1]: super::super::Chain +/// [1]: service::non_finalized_state::Chain #[tracing::instrument(skip(chain_nullifiers, shielded_data_nullifiers))] pub(crate) fn remove_from_non_finalized_chain<'block, NullifierT>( chain_nullifiers: &mut HashSet, diff --git a/zebra-state/src/service/read.rs b/zebra-state/src/service/read.rs index 76553e383f2..e2a0c01695d 100644 --- a/zebra-state/src/service/read.rs +++ b/zebra-state/src/service/read.rs @@ -4,11 +4,15 @@ //! best [`Chain`][5] in the [`NonFinalizedState`][3], and the database in the //! [`FinalizedState`][4]. //! -//! [1]: super::StateService -//! [2]: super::ReadStateService -//! [3]: super::non_finalized_state::NonFinalizedState -//! [4]: super::finalized_state::FinalizedState -//! [5]: super::Chain +//! [1]: service::StateService +//! [2]: service::ReadStateService +//! [3]: service::non_finalized_state::NonFinalizedState +//! [4]: service::finalized_state::FinalizedState +//! [5]: service::non_finalized_state::Chain + +// Tidy up some doc links +#[allow(unused_imports)] +use crate::service; pub mod address; pub mod block; @@ -23,7 +27,7 @@ pub use address::{ tx_id::transparent_tx_ids, utxo::{address_utxos, AddressUtxos, ADDRESS_HEIGHTS_FULL_RANGE}, }; -pub use block::{block, block_header, transaction, utxo}; +pub use block::{any_utxo, block, block_header, transaction, utxo}; pub use find::{ block_locator, chain_contains_hash, depth, find_chain_hashes, find_chain_headers, hash_by_height, height_by_hash, tip, tip_height, diff --git a/zebra-state/src/service/read/block.rs b/zebra-state/src/service/read/block.rs index 81df7d8be9d..9b45e29e3b0 100644 --- a/zebra-state/src/service/read/block.rs +++ b/zebra-state/src/service/read/block.rs @@ -9,7 +9,10 @@ use zebra_chain::{ }; use crate::{ - service::{finalized_state::ZebraDb, non_finalized_state::Chain}, + service::{ + finalized_state::ZebraDb, + non_finalized_state::{Chain, NonFinalizedState}, + }, HashOrHeight, }; @@ -111,3 +114,26 @@ where .and_then(|chain| chain.as_ref().created_utxo(&outpoint)) .or_else(|| db.utxo(&outpoint).map(|utxo| utxo.utxo)) } + +/// Returns the [`Utxo`] for [`transparent::OutPoint`], if it exists in the +/// `non_finalized_state` or finalized `db`. +/// +/// UTXOs may be returned regardless of whether they have been spent. +pub fn any_utxo( + non_finalized_state: NonFinalizedState, + db: &ZebraDb, + outpoint: transparent::OutPoint, +) -> Option { + // # Correctness + // + // The StateService commits blocks to the finalized state before updating + // the latest chain, and it can commit additional blocks after we've cloned + // the `non_finalized_state`. + // + // Since UTXOs are the same in the finalized and non-finalized state, + // we check the most efficient alternative first. (`non_finalized_state` is always in + // memory, but `db` stores transactions on disk, with a memory cache.) + non_finalized_state + .any_utxo(&outpoint) + .or_else(|| db.utxo(&outpoint).map(|utxo| utxo.utxo)) +} From 5b3be70e500683d837fdec633b3480d69653a4b3 Mon Sep 17 00:00:00 2001 From: teor Date: Mon, 12 Sep 2022 09:12:00 +1000 Subject: [PATCH 10/16] Update state service module documentation --- zebra-state/src/service.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index 42b60741e47..f3d3ff3d2dc 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -1,13 +1,12 @@ //! [`tower::Service`]s for Zebra's cached chain state. //! //! Zebra provides cached state access via two main services: -//! - [`StateService`]: a read-write service that waits for queued blocks. +//! - [`StateService`]: a read-write service that writes blocks to the state, +//! and redirects most read requests to the [`ReadStateService`]. //! - [`ReadStateService`]: a read-only service that answers from the most //! recent committed block. //! -//! Most users should prefer [`ReadStateService`], unless they need to wait for -//! verified blocks to be committed. (For example, the syncer and mempool -//! tasks.) +//! Most users should prefer [`ReadStateService`], unless they need to write blocks to the state. //! //! Zebra also provides access to the best chain tip via: //! - [`LatestChainTip`]: a read-only channel that contains the latest committed From 8591b44a30fcdb9b0aeba980c8debb7b53cf62cd Mon Sep 17 00:00:00 2001 From: teor Date: Mon, 12 Sep 2022 09:25:55 +1000 Subject: [PATCH 11/16] Move the QueuedBlock type into the queued_blocks module --- zebra-state/src/service.rs | 9 +++------ .../service/non_finalized_state/queued_blocks.rs | 16 +++++++++++++++- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index f3d3ff3d2dc..67c8fba9bdf 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -68,10 +68,6 @@ mod tests; pub use finalized_state::{OutputIndex, OutputLocation, TransactionLocation}; -pub type QueuedBlock = ( - PreparedBlock, - oneshot::Sender>, -); pub type QueuedFinalized = ( FinalizedBlock, oneshot::Sender>, @@ -110,9 +106,10 @@ pub(crate) struct StateService { /// The non-finalized chain state, including its in-memory chain forks. mem: NonFinalizedState, - // Queued Non-Finalized Blocks + // Queued Blocks // - /// Blocks awaiting their parent blocks for contextual verification. + /// Blocks for the [`NonFinalizedState`], which are awaiting their parent blocks + /// before they can do contextual verification. queued_blocks: QueuedBlocks, // Pending UTXO Request Tracking diff --git a/zebra-state/src/service/non_finalized_state/queued_blocks.rs b/zebra-state/src/service/non_finalized_state/queued_blocks.rs index b41cb32603c..292103a3bc3 100644 --- a/zebra-state/src/service/non_finalized_state/queued_blocks.rs +++ b/zebra-state/src/service/non_finalized_state/queued_blocks.rs @@ -1,12 +1,22 @@ +//! Queued blocks that are awaiting their parent block for verification. + use std::{ collections::{BTreeMap, HashMap, HashSet}, mem, }; +use tokio::sync::oneshot; use tracing::instrument; + use zebra_chain::{block, transparent}; -use crate::service::QueuedBlock; +use crate::{BoxError, PreparedBlock}; + +/// A queued non-finalized block, and its corresponding [`Result`] channel. +pub type QueuedBlock = ( + PreparedBlock, + oneshot::Sender>, +); /// A queue of blocks, awaiting the arrival of parent blocks. #[derive(Debug, Default)] @@ -27,6 +37,7 @@ impl QueuedBlocks { /// # Panics /// /// - if a block with the same `block::Hash` has already been queued. + #[instrument(skip(self), fields(height = ?new.0.height, hash = %new.0.hash))] pub fn queue(&mut self, new: QueuedBlock) { let new_hash = new.0.hash; let new_height = new.0.height; @@ -94,6 +105,7 @@ impl QueuedBlocks { /// Remove all queued blocks whose height is less than or equal to the given /// `finalized_tip_height`. + #[instrument(skip(self))] pub fn prune_by_height(&mut self, finalized_tip_height: block::Height) { // split_off returns the values _greater than or equal to_ the key. What // we need is the keys that are less than or equal to @@ -165,11 +177,13 @@ impl QueuedBlocks { } /// Try to look up this UTXO in any queued block. + #[instrument(skip(self))] pub fn utxo(&self, outpoint: &transparent::OutPoint) -> Option { self.known_utxos.get(outpoint).cloned() } } +// TODO: move these tests into their own `tests/vectors.rs` module #[cfg(test)] mod tests { use std::sync::Arc; From 896190086b2d7b3579c0c7992d32976b9d896f6b Mon Sep 17 00:00:00 2001 From: teor Date: Mon, 12 Sep 2022 11:04:15 +1000 Subject: [PATCH 12/16] Explain how spent UTXOs are treated by the state --- zebra-state/src/service/read/block.rs | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/zebra-state/src/service/read/block.rs b/zebra-state/src/service/read/block.rs index 9b45e29e3b0..8f1780ea37d 100644 --- a/zebra-state/src/service/read/block.rs +++ b/zebra-state/src/service/read/block.rs @@ -96,7 +96,12 @@ where /// Returns the [`Utxo`] for [`transparent::OutPoint`], if it exists in the /// non-finalized `chain` or finalized `db`. /// -/// UTXOs may be returned regardless of whether they have been spent. +/// Non-finalized UTXOs are returned regardless of whether they have been spent. +/// +/// Finalized UTXOs are only returned if they are unspent in the finalized chain. +/// They may have been spent in the non-finalized chain, +/// but this function returns them without checking for non-finalized spends, +/// because we don't know which non-finalized chain will be committed to the finalized state. pub fn utxo(chain: Option, db: &ZebraDb, outpoint: transparent::OutPoint) -> Option where C: AsRef, @@ -115,10 +120,18 @@ where .or_else(|| db.utxo(&outpoint).map(|utxo| utxo.utxo)) } -/// Returns the [`Utxo`] for [`transparent::OutPoint`], if it exists in the -/// `non_finalized_state` or finalized `db`. +/// Returns the [`Utxo`] for [`transparent::OutPoint`], if it exists in any chain +/// in the `non_finalized_state`, or in the finalized `db`. +/// +/// Non-finalized UTXOs are returned regardless of whether they have been spent. +/// +/// Finalized UTXOs are only returned if they are unspent in the finalized chain. +/// They may have been spent in one or more non-finalized chains, +/// but this function returns them without checking for non-finalized spends, +/// because we don't know which non-finalized chain the request belongs to. /// -/// UTXOs may be returned regardless of whether they have been spent. +/// UTXO spends are checked once the block reaches the non-finalized state, +/// by [`check::utxo::transparent_spend()`](crate::service::check::utxo::transparent_spend). pub fn any_utxo( non_finalized_state: NonFinalizedState, db: &ZebraDb, From 087995e87c42a5e9ec58a935d062f8407daf9128 Mon Sep 17 00:00:00 2001 From: teor Date: Mon, 12 Sep 2022 11:16:55 +1000 Subject: [PATCH 13/16] Clarify how cached Chains impact state read requests And move repeated comments to the module header. --- .../src/service/read/address/balance.rs | 13 +++++-- zebra-state/src/service/read/address/tx_id.rs | 15 +++++--- zebra-state/src/service/read/address/utxo.rs | 15 +++++--- zebra-state/src/service/read/block.rs | 31 ++++++----------- zebra-state/src/service/read/find.rs | 34 ++++++------------- zebra-state/src/service/read/tree.rs | 18 +++++----- 6 files changed, 64 insertions(+), 62 deletions(-) diff --git a/zebra-state/src/service/read/address/balance.rs b/zebra-state/src/service/read/address/balance.rs index 81f5955b3c0..a6a7c7facbf 100644 --- a/zebra-state/src/service/read/address/balance.rs +++ b/zebra-state/src/service/read/address/balance.rs @@ -1,4 +1,14 @@ //! Reading address balances. +//! +//! In the functions in this module: +//! +//! The StateService commits blocks to the finalized state before updating +//! `chain` from the latest chain. Then it can commit additional blocks to +//! the finalized state after we've cloned the `chain`. +//! +//! This means that some blocks can be in both: +//! - the cached [`Chain`], and +//! - the shared finalized [`ZebraDb`] reference. use std::{collections::HashSet, sync::Arc}; @@ -91,8 +101,7 @@ fn chain_transparent_balance_change( ) -> Amount { // # Correctness // - // The StateService commits blocks to the finalized state before updating the latest chain, - // and it can commit additional blocks after we've cloned this `chain` variable. + // Find the balance adjustment that corrects for overlapping finalized and non-finalized blocks. // Check if the finalized and non-finalized states match let required_chain_root = finalized_tip diff --git a/zebra-state/src/service/read/address/tx_id.rs b/zebra-state/src/service/read/address/tx_id.rs index 582227bf5f0..86d055eafd4 100644 --- a/zebra-state/src/service/read/address/tx_id.rs +++ b/zebra-state/src/service/read/address/tx_id.rs @@ -1,4 +1,14 @@ //! Reading address transaction IDs. +//! +//! In the functions in this module: +//! +//! The StateService commits blocks to the finalized state before updating +//! `chain` from the latest chain. Then it can commit additional blocks to +//! the finalized state after we've cloned the `chain`. +//! +//! This means that some blocks can be in both: +//! - the cached [`Chain`], and +//! - the shared finalized [`ZebraDb`] reference. use std::{ collections::{BTreeMap, HashSet}, @@ -134,10 +144,7 @@ where // # Correctness // - // The StateService commits blocks to the finalized state before updating the latest chain, - // and it can commit additional blocks after we've cloned this `chain` variable. - // - // But we can compensate for addresses with mismatching blocks, + // We can compensate for addresses with mismatching blocks, // by adding the overlapping non-finalized transaction IDs. // // If there is only one address, mismatches aren't possible, diff --git a/zebra-state/src/service/read/address/utxo.rs b/zebra-state/src/service/read/address/utxo.rs index ea865cf7915..886b3df6c87 100644 --- a/zebra-state/src/service/read/address/utxo.rs +++ b/zebra-state/src/service/read/address/utxo.rs @@ -1,4 +1,14 @@ //! Transparent address index UTXO queries. +//! +//! In the functions in this module: +//! +//! The StateService commits blocks to the finalized state before updating +//! `chain` from the latest chain. Then it can commit additional blocks to +//! the finalized state after we've cloned the `chain`. +//! +//! This means that some blocks can be in both: +//! - the cached [`Chain`], and +//! - the shared finalized [`ZebraDb`] reference. use std::{ collections::{BTreeMap, BTreeSet, HashSet}, @@ -234,10 +244,7 @@ where // # Correctness // - // The StateService commits blocks to the finalized state before updating the latest chain, - // and it can commit additional blocks after we've cloned this `chain` variable. - // - // But we can compensate for deleted UTXOs by applying the overlapping non-finalized UTXO changes. + // We can compensate for deleted UTXOs by applying the overlapping non-finalized UTXO changes. // Check if the finalized and non-finalized states match or overlap let required_min_non_finalized_root = finalized_tip_range.start().0 + 1; diff --git a/zebra-state/src/service/read/block.rs b/zebra-state/src/service/read/block.rs index 8f1780ea37d..942fe40f4fa 100644 --- a/zebra-state/src/service/read/block.rs +++ b/zebra-state/src/service/read/block.rs @@ -1,4 +1,15 @@ //! Shared block, header, and transaction reading code. +//! +//! In the functions in this module: +//! +//! The StateService commits blocks to the finalized state before updating +//! `chain` or `non_finalized_state` from the latest chains. Then it can +//! commit additional blocks to the finalized state after we've cloned the +//! `chain` or `non_finalized_state`. +//! +//! This means that some blocks can be in both: +//! - the cached [`Chain`] or [`NonFinalizedState`], and +//! - the shared finalized [`ZebraDb`] reference. use std::sync::Arc; @@ -24,10 +35,6 @@ where { // # Correctness // - // The StateService commits blocks to the finalized state before updating - // the latest chain, and it can commit additional blocks after we've cloned - // this `chain` variable. - // // Since blocks are the same in the finalized and non-finalized state, we // check the most efficient alternative first. (`chain` is always in memory, // but `db` stores blocks on disk, with a memory cache.) @@ -50,10 +57,6 @@ where { // # Correctness // - // The StateService commits blocks to the finalized state before updating - // the latest chain, and it can commit additional blocks after we've cloned - // this `chain` variable. - // // Since blocks are the same in the finalized and non-finalized state, we // check the most efficient alternative first. (`chain` is always in memory, // but `db` stores blocks on disk, with a memory cache.) @@ -76,10 +79,6 @@ where { // # Correctness // - // The StateService commits blocks to the finalized state before updating - // the latest chain, and it can commit additional blocks after we've cloned - // this `chain` variable. - // // Since transactions are the same in the finalized and non-finalized state, // we check the most efficient alternative first. (`chain` is always in // memory, but `db` stores transactions on disk, with a memory cache.) @@ -108,10 +107,6 @@ where { // # Correctness // - // The StateService commits blocks to the finalized state before updating - // the latest chain, and it can commit additional blocks after we've cloned - // this `chain` variable. - // // Since UTXOs are the same in the finalized and non-finalized state, // we check the most efficient alternative first. (`chain` is always in // memory, but `db` stores transactions on disk, with a memory cache.) @@ -139,10 +134,6 @@ pub fn any_utxo( ) -> Option { // # Correctness // - // The StateService commits blocks to the finalized state before updating - // the latest chain, and it can commit additional blocks after we've cloned - // the `non_finalized_state`. - // // Since UTXOs are the same in the finalized and non-finalized state, // we check the most efficient alternative first. (`non_finalized_state` is always in // memory, but `db` stores transactions on disk, with a memory cache.) diff --git a/zebra-state/src/service/read/find.rs b/zebra-state/src/service/read/find.rs index 7cd28713065..2459500d279 100644 --- a/zebra-state/src/service/read/find.rs +++ b/zebra-state/src/service/read/find.rs @@ -1,4 +1,14 @@ //! Finding and reading block hashes and headers, in response to peer requests. +//! +//! In the functions in this module: +//! +//! The StateService commits blocks to the finalized state before updating +//! `chain` from the latest chain. Then it can commit additional blocks to +//! the finalized state after we've cloned the `chain`. +//! +//! This means that some blocks can be in both: +//! - the cached [`Chain`], and +//! - the shared finalized [`ZebraDb`] reference. use std::{ iter, @@ -24,10 +34,6 @@ where { // # Correctness // - // The StateService commits blocks to the finalized state before updating - // the latest chain, and it can commit additional blocks after we've cloned - // this `chain` variable. - // // If there is an overlap between the non-finalized and finalized states, // where the finalized tip is above the non-finalized tip, // Zebra is receiving a lot of blocks, or this request has been delayed for a long time, @@ -66,10 +72,6 @@ where // # Correctness // - // The StateService commits blocks to the finalized state before updating - // the latest chain, and it can commit additional blocks after we've cloned - // this `chain` variable. - // // It is ok to do this lookup in two different calls. Finalized state updates // can only add overlapping blocks, and hashes are unique. @@ -86,10 +88,6 @@ where { // # Correctness // - // The StateService commits blocks to the finalized state before updating - // the latest chain, and it can commit additional blocks after we've cloned - // this `chain` variable. - // // Finalized state updates can only add overlapping blocks, and hashes are unique. chain @@ -104,10 +102,6 @@ where { // # Correctness // - // The StateService commits blocks to the finalized state before updating - // the latest chain, and it can commit additional blocks after we've cloned - // this `chain` variable. - // // Finalized state updates can only add overlapping blocks, and heights are unique // in the current `chain`. // @@ -128,10 +122,6 @@ where { // # Correctness // - // The StateService commits blocks to the finalized state before updating - // the latest chain, and it can commit additional blocks after we've cloned - // this `chain` variable. - // // Finalized state updates can only add overlapping blocks, and hashes are unique. // // If there is an overlap between the non-finalized and finalized states, @@ -158,10 +148,6 @@ where // # Correctness // - // The StateService commits blocks to the finalized state before updating - // the latest chain, and it can commit additional blocks after we've cloned - // this `chain` variable. - // // It is ok to do these lookups using multiple database calls. Finalized state updates // can only add overlapping blocks, and hashes are unique. // diff --git a/zebra-state/src/service/read/tree.rs b/zebra-state/src/service/read/tree.rs index ea2cbf4df5e..7c3e69da497 100644 --- a/zebra-state/src/service/read/tree.rs +++ b/zebra-state/src/service/read/tree.rs @@ -1,4 +1,14 @@ //! Reading note commitment trees. +//! +//! In the functions in this module: +//! +//! The StateService commits blocks to the finalized state before updating +//! `chain` from the latest chain. Then it can commit additional blocks to +//! the finalized state after we've cloned the `chain`. +//! +//! This means that some blocks can be in both: +//! - the cached [`Chain`], and +//! - the shared finalized [`ZebraDb`] reference. use std::sync::Arc; @@ -22,10 +32,6 @@ where { // # Correctness // - // The StateService commits blocks to the finalized state before updating - // the latest chain, and it can commit additional blocks after we've cloned - // this `chain` variable. - // // Since sapling treestates are the same in the finalized and non-finalized // state, we check the most efficient alternative first. (`chain` is always // in memory, but `db` stores blocks on disk, with a memory cache.) @@ -47,10 +53,6 @@ where { // # Correctness // - // The StateService commits blocks to the finalized state before updating - // the latest chain, and it can commit additional blocks after we've cloned - // this `chain` variable. - // // Since orchard treestates are the same in the finalized and non-finalized // state, we check the most efficient alternative first. (`chain` is always // in memory, but `db` stores blocks on disk, with a memory cache.) From 9ff6d8ed195bee31acac3a0a76f1cc54a06789fc Mon Sep 17 00:00:00 2001 From: teor Date: Mon, 12 Sep 2022 11:44:00 +1000 Subject: [PATCH 14/16] fastmod ChainUtxo BestChainUtxo zebra* --- zebra-state/src/request.rs | 2 +- zebra-state/src/response.rs | 4 ++-- zebra-state/src/service.rs | 8 ++++---- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/zebra-state/src/request.rs b/zebra-state/src/request.rs index 9c2de243185..dd304c7c8dd 100644 --- a/zebra-state/src/request.rs +++ b/zebra-state/src/request.rs @@ -553,7 +553,7 @@ pub enum ReadRequest { /// /// This request is purely informational, there is no guarantee that /// the UTXO remains unspent in the best chain. - ChainUtxo(transparent::OutPoint), + BestChainUtxo(transparent::OutPoint), /// Computes a block locator object based on the current best chain. /// diff --git a/zebra-state/src/response.rs b/zebra-state/src/response.rs index bce1dd2947f..0639a8afc15 100644 --- a/zebra-state/src/response.rs +++ b/zebra-state/src/response.rs @@ -76,12 +76,12 @@ pub enum ReadResponse { /// The response to a `FindBlockHeaders` request. BlockHeaders(Vec), - /// The response to a `ChainUtxo` request, from verified blocks in the + /// The response to a `BestChainUtxo` request, from verified blocks in the /// non-finalized chain or finalized chain. /// /// This response is purely informational, there is no guarantee that /// the UTXO remains unspent in the best chain. - ChainUtxo(Option), + BestChainUtxo(Option), /// Response to [`ReadRequest::SaplingTree`] with the specified Sapling note commitment tree. SaplingTree(Option>), diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index 67c8fba9bdf..eb2c1cf045d 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -1072,7 +1072,7 @@ impl Service for ReadStateService { } // Currently unused. - ReadRequest::ChainUtxo(outpoint) => { + ReadRequest::BestChainUtxo(outpoint) => { metrics::counter!( "state.requests", 1, @@ -1094,12 +1094,12 @@ impl Service for ReadStateService { ); // The work is done in the future. - timer.finish(module_path!(), line!(), "ReadRequest::ChainUtxo"); + timer.finish(module_path!(), line!(), "ReadRequest::BestChainUtxo"); - Ok(ReadResponse::ChainUtxo(utxo)) + Ok(ReadResponse::BestChainUtxo(utxo)) }) }) - .map(|join_result| join_result.expect("panic in ReadRequest::ChainUtxo")) + .map(|join_result| join_result.expect("panic in ReadRequest::BestChainUtxo")) .boxed() } From 2e4ff5485d03c95588658f9e736dc601b4638e62 Mon Sep 17 00:00:00 2001 From: teor Date: Mon, 12 Sep 2022 11:49:00 +1000 Subject: [PATCH 15/16] Add an AnyChainUtxo request --- zebra-state/src/request.rs | 18 +++++++++++++----- zebra-state/src/response.rs | 16 ++++++++++++---- zebra-state/src/service.rs | 36 ++++++++++++++++++++++++++++++++++-- 3 files changed, 59 insertions(+), 11 deletions(-) diff --git a/zebra-state/src/request.rs b/zebra-state/src/request.rs index dd304c7c8dd..87dac6a70e1 100644 --- a/zebra-state/src/request.rs +++ b/zebra-state/src/request.rs @@ -549,12 +549,21 @@ pub enum ReadRequest { /// Looks up a UTXO identified by the given [`OutPoint`](transparent::OutPoint), /// returning `None` immediately if it is unknown. /// - /// Checks verified blocks in the finalized chain and non-finalized chain. + /// Checks verified blocks in the finalized chain and the _best_ non-finalized chain. /// /// This request is purely informational, there is no guarantee that /// the UTXO remains unspent in the best chain. BestChainUtxo(transparent::OutPoint), + /// Looks up a UTXO identified by the given [`OutPoint`](transparent::OutPoint), + /// returning `None` immediately if it is unknown. + /// + /// Checks verified blocks in the finalized chain and _all_ non-finalized chains. + /// + /// This request is purely informational, there is no guarantee that + /// the UTXO remains unspent in the best chain. + AnyChainUtxo(transparent::OutPoint), + /// Computes a block locator object based on the current best chain. /// /// Returns [`ReadResponse::BlockLocator`] with hashes starting @@ -687,10 +696,9 @@ impl TryFrom for ReadRequest { Err("ReadService does not write blocks") } - Request::AwaitUtxo(_) => { - Err("ReadService does not track pending UTXOs. \ - Manually convert the request to ReadRequest::ChainUtxo, and handle pending UTXOs.") - } + Request::AwaitUtxo(_) => Err("ReadService does not track pending UTXOs. \ + Manually convert the request to ReadRequest::AnyChainUtxo, \ + and handle pending UTXOs"), } } } diff --git a/zebra-state/src/response.rs b/zebra-state/src/response.rs index 0639a8afc15..ca66938b20f 100644 --- a/zebra-state/src/response.rs +++ b/zebra-state/src/response.rs @@ -77,12 +77,19 @@ pub enum ReadResponse { BlockHeaders(Vec), /// The response to a `BestChainUtxo` request, from verified blocks in the - /// non-finalized chain or finalized chain. + /// _best_ non-finalized chain, or the finalized chain. /// /// This response is purely informational, there is no guarantee that /// the UTXO remains unspent in the best chain. BestChainUtxo(Option), + /// The response to an `AnyChainUtxo` request, from verified blocks in + /// _any_ non-finalized chain, or the finalized chain. + /// + /// This response is purely informational, there is no guarantee that + /// the UTXO remains unspent in the best chain. + AnyChainUtxo(Option), + /// Response to [`ReadRequest::SaplingTree`] with the specified Sapling note commitment tree. SaplingTree(Option>), @@ -116,14 +123,15 @@ impl TryFrom for Response { Ok(Response::Transaction(tx_and_height.map(|(tx, _height)| tx))) } - ReadResponse::ChainUtxo(_) => Err("ReadService does not track pending UTXOs. \ - Manually unwrap the response, and handle pending UTXOs."), + ReadResponse::AnyChainUtxo(_) => Err("ReadService does not track pending UTXOs. \ + Manually unwrap the response, and handle pending UTXOs."), ReadResponse::BlockLocator(hashes) => Ok(Response::BlockLocator(hashes)), ReadResponse::BlockHashes(hashes) => Ok(Response::BlockHashes(hashes)), ReadResponse::BlockHeaders(headers) => Ok(Response::BlockHeaders(headers)), - ReadResponse::SaplingTree(_) + ReadResponse::BestChainUtxo(_) + | ReadResponse::SaplingTree(_) | ReadResponse::OrchardTree(_) | ReadResponse::AddressBalance(_) | ReadResponse::AddressesTransactionIds(_) diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index eb2c1cf045d..c84de0b0d95 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -691,7 +691,7 @@ impl Service for StateService { let timer = CodeTimer::start(); - // Prepare the AwaitUtxo future. + // Prepare the AwaitUtxo future from PendingUxtos. let response_fut = self.pending_utxos.queue(outpoint); let span = Span::current(); let response_fut = response_fut.instrument(span).boxed(); @@ -1077,7 +1077,7 @@ impl Service for ReadStateService { "state.requests", 1, "service" => "read_state", - "type" => "chain_utxo", + "type" => "best_chain_utxo", ); let timer = CodeTimer::start(); @@ -1103,6 +1103,38 @@ impl Service for ReadStateService { .boxed() } + // Manually used by the StateService to implement part of AwaitUtxo. + ReadRequest::AnyChainUtxo(outpoint) => { + metrics::counter!( + "state.requests", + 1, + "service" => "read_state", + "type" => "any_chain_utxo", + ); + + let timer = CodeTimer::start(); + + let state = self.clone(); + + let span = Span::current(); + tokio::task::spawn_blocking(move || { + span.in_scope(move || { + let utxo = state.non_finalized_state_receiver.with_watch_data( + |non_finalized_state| { + read::any_utxo(non_finalized_state, &state.db, outpoint) + }, + ); + + // The work is done in the future. + timer.finish(module_path!(), line!(), "ReadRequest::AnyChainUtxo"); + + Ok(ReadResponse::AnyChainUtxo(utxo)) + }) + }) + .map(|join_result| join_result.expect("panic in ReadRequest::AnyChainUtxo")) + .boxed() + } + // Used by the StateService. ReadRequest::BlockLocator => { metrics::counter!( From 019a75aa737e086719e5902633256a3bf5b84d3e Mon Sep 17 00:00:00 2001 From: teor Date: Mon, 12 Sep 2022 12:19:09 +1000 Subject: [PATCH 16/16] Make AwaitUtxo non-blocking --- zebra-state/src/service.rs | 60 +++++++++++++++++++++----------------- 1 file changed, 33 insertions(+), 27 deletions(-) diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index c84de0b0d95..f7092282547 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -680,7 +680,7 @@ impl Service for StateService { } // Uses pending_utxos and queued_blocks in the StateService. - // Directly uses the shared NonFinalizedState in the ReadStateService. + // If the UTXO isn't in the queued blocks, runs concurrently using the ReadStateService. Request::AwaitUtxo(outpoint) => { metrics::counter!( "state.requests", @@ -693,10 +693,13 @@ impl Service for StateService { // Prepare the AwaitUtxo future from PendingUxtos. let response_fut = self.pending_utxos.queue(outpoint); + // Only instrument `response_fut`, the ReadStateService already + // instruments its requests with the same span. let span = Span::current(); let response_fut = response_fut.instrument(span).boxed(); - // Check the non-finalized block queue. + // Check the non-finalized block queue outside the returned future, + // so we can access mutable state fields. if let Some(utxo) = self.queued_blocks.utxo(&outpoint) { self.pending_utxos.respond(&outpoint, utxo); @@ -712,38 +715,41 @@ impl Service for StateService { // This creates a rare race condition, but it doesn't seem to happen much in practice. // See #5126 for details. - // Send a request to the ReadStateService, to get UTXOs from the finalized chain. + // Manually send a request to the ReadStateService, + // to get UTXOs from any non-finalized chain or the finalized chain. let read_service = self.read_service.clone(); - // Optional TODO: - // - make pending_utxos.respond() async using a channel, - // so we can use ReadRequest::ChainUtxo here, and avoid a block_in_place(). + // Run the request in an async block, so we can await the response. + async move { + let req = ReadRequest::AnyChainUtxo(outpoint); - // Check any non-finalized chain, and the finalized chain. - let span = Span::current(); - let utxo = tokio::task::block_in_place(move || { - span.in_scope(move || { - read_service.non_finalized_state_receiver.with_watch_data( - |non_finalized_state| { - read::any_utxo(non_finalized_state, &read_service.db, outpoint) - }, - ) - }) - }); + let rsp = read_service.oneshot(req).await?; - if let Some(utxo) = utxo { - self.pending_utxos.respond(&outpoint, utxo); + // Optional TODO: + // - make pending_utxos.respond() async using a channel, + // so we can respond to all waiting requests here + // + // This change is not required for correctness, because: + // - any waiting requests should have returned when the block was sent to the state + // - otherwise, the request returns immediately if: + // - the block is in the non-finalized queue, or + // - the block is in any non-finalized chain or the finalized state + // + // And if the block is in the finalized queue, + // that's rare enough that a retry is ok. + if let ReadResponse::AnyChainUtxo(Some(utxo)) = rsp { + // We got a UTXO, so we replace the response future with the result own. + timer.finish(module_path!(), line!(), "AwaitUtxo/any-chain"); + + return Ok(Response::Utxo(utxo)); + } - // We're finished, the returned future gets the UTXO from the respond() channel. - timer.finish(module_path!(), line!(), "AwaitUtxo/any-chain"); + // We're finished, but the returned future is waiting on the respond() channel. + timer.finish(module_path!(), line!(), "AwaitUtxo/waiting"); - return response_fut; + response_fut.await } - - // We're finished, but the returned future is waiting on the respond() channel. - timer.finish(module_path!(), line!(), "AwaitUtxo/waiting"); - - response_fut + .boxed() } // TODO: add a name() method to Request, and combine all the generic read requests