Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

6. refactor(state): prepare finalized state for shared read-only access #3810

Merged
merged 7 commits into from
Mar 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion zebra-state/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ pub use service::{

#[cfg(any(test, feature = "proptest-impl"))]
pub use service::{
arbitrary::populated_state,
chain_tip::{ChainTipBlock, ChainTipSender},
init_test, populated_state,
init_test,
};

pub(crate) use request::ContextuallyValidBlock;
200 changes: 21 additions & 179 deletions zebra-state/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ use std::{
time::{Duration, Instant},
};

use futures::{future::FutureExt, stream::FuturesUnordered};
use futures::future::FutureExt;
use tokio::sync::oneshot;
use tower::{util::BoxService, Service, ServiceExt};
use tower::{util::BoxService, Service};
use tracing::instrument;

#[cfg(any(test, feature = "proptest-impl"))]
Expand All @@ -24,18 +24,23 @@ use zebra_chain::{
};

use crate::{
constants, request::HashOrHeight, service::chain_tip::ChainTipBlock, BoxError, CloneError,
request::HashOrHeight, service::chain_tip::ChainTipBlock, BoxError, CloneError,
CommitBlockError, Config, FinalizedBlock, PreparedBlock, Request, Response,
ValidateContextError,
};

use self::{
chain_tip::{ChainTipChange, ChainTipSender, LatestChainTip},
finalized_state::FinalizedState,
non_finalized_state::{NonFinalizedState, QueuedBlocks},
pending_utxos::PendingUtxos,
};

pub mod block_iter;
pub mod chain_tip;

pub(crate) mod check;

mod finalized_state;
mod non_finalized_state;
mod pending_utxos;
Expand All @@ -46,8 +51,6 @@ pub mod arbitrary;
#[cfg(test)]
mod tests;

use self::{finalized_state::FinalizedState, pending_utxos::PendingUtxos};

pub type QueuedBlock = (
PreparedBlock,
oneshot::Sender<Result<block::Hash, BoxError>>,
Expand All @@ -57,6 +60,15 @@ pub type QueuedFinalized = (
oneshot::Sender<Result<block::Hash, BoxError>>,
);

/// A read-write service for Zebra's cached blockchain state.
///
/// This service modifies and provides access to:
/// - the non-finalized state: the ~100 most recent blocks.
/// Zebra allows chain forks in the non-finalized state,
/// stores it in memory, and re-downloads it when restarted.
/// - the finalized state: older blocks that have many confirmations.
/// Zebra stores the single best chain in the finalized state,
/// and re-loads it from disk when restarted.
pub(crate) struct StateService {
/// Holds data relating to finalized chain state.
pub(crate) disk: FinalizedState,
Expand Down Expand Up @@ -103,7 +115,7 @@ impl StateService {
tracing::info!("starting legacy chain check");
if let Some(tip) = state.best_tip() {
if let Some(nu5_activation_height) = NetworkUpgrade::Nu5.activation_height(network) {
if legacy_chain_check(
if check::legacy_chain(
nu5_activation_height,
state.any_ancestor_blocks(tip.1),
state.network,
Expand Down Expand Up @@ -401,10 +413,10 @@ impl StateService {
///
/// The block identified by `hash` is included in the chain of blocks yielded
/// by the iterator. `hash` can come from any chain.
pub fn any_ancestor_blocks(&self, hash: block::Hash) -> Iter<'_> {
Iter {
pub fn any_ancestor_blocks(&self, hash: block::Hash) -> block_iter::Iter<'_> {
block_iter::Iter {
service: self,
state: IterState::NonFinalized(hash),
state: block_iter::IterState::NonFinalized(hash),
}
}

Expand Down Expand Up @@ -569,98 +581,6 @@ impl StateService {
}
}

pub(crate) struct Iter<'a> {
service: &'a StateService,
state: IterState,
}

enum IterState {
NonFinalized(block::Hash),
Finalized(block::Height),
Finished,
}

impl Iter<'_> {
fn next_non_finalized_block(&mut self) -> Option<Arc<Block>> {
let Iter { service, state } = self;

let hash = match state {
IterState::NonFinalized(hash) => *hash,
IterState::Finalized(_) | IterState::Finished => unreachable!(),
};

if let Some(block) = service.mem.any_block_by_hash(hash) {
let hash = block.header.previous_block_hash;
self.state = IterState::NonFinalized(hash);
Some(block)
} else {
None
}
}

fn next_finalized_block(&mut self) -> Option<Arc<Block>> {
let Iter { service, state } = self;

let hash_or_height: HashOrHeight = match *state {
IterState::Finalized(height) => height.into(),
IterState::NonFinalized(hash) => hash.into(),
IterState::Finished => unreachable!(),
};

if let Some(block) = service.disk.block(hash_or_height) {
let height = block
.coinbase_height()
.expect("valid blocks have a coinbase height");

if let Some(next_height) = height - 1 {
self.state = IterState::Finalized(next_height);
} else {
self.state = IterState::Finished;
}

Some(block)
} else {
self.state = IterState::Finished;
None
}
}
}

impl Iterator for Iter<'_> {
type Item = Arc<Block>;

fn next(&mut self) -> Option<Self::Item> {
match self.state {
IterState::NonFinalized(_) => self
.next_non_finalized_block()
.or_else(|| self.next_finalized_block()),
IterState::Finalized(_) => self.next_finalized_block(),
IterState::Finished => None,
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
let len = self.len();
(len, Some(len))
}
}

impl std::iter::FusedIterator for Iter<'_> {}

impl ExactSizeIterator for Iter<'_> {
fn len(&self) -> usize {
match self.state {
IterState::NonFinalized(hash) => self
.service
.any_height_by_hash(hash)
.map(|height| (height.0 + 1) as _)
.unwrap_or(0),
IterState::Finalized(height) => (height.0 + 1) as _,
IterState::Finished => 0,
}
}
}

impl Service<Request> for StateService {
type Response = Response;
type Error = BoxError;
Expand Down Expand Up @@ -842,81 +762,3 @@ pub fn init_test(network: Network) -> Buffer<BoxService<Request, Response, BoxEr

Buffer::new(BoxService::new(state_service), 1)
}

/// Initialize a state service with blocks.
#[cfg(any(test, feature = "proptest-impl"))]
pub async fn populated_state(
blocks: impl IntoIterator<Item = Arc<Block>>,
network: Network,
) -> Buffer<BoxService<Request, Response, BoxError>, Request> {
let requests = blocks
.into_iter()
.map(|block| Request::CommitFinalizedBlock(block.into()));

let mut state = init_test(network);

let mut responses = FuturesUnordered::new();

for request in requests {
let rsp = state.ready().await.unwrap().call(request);
responses.push(rsp);
}

use futures::StreamExt;
while let Some(rsp) = responses.next().await {
rsp.expect("blocks should commit just fine");
}

state
}

/// Check if zebra is following a legacy chain and return an error if so.
fn legacy_chain_check<I>(
nu5_activation_height: block::Height,
ancestors: I,
network: Network,
) -> Result<(), BoxError>
where
I: Iterator<Item = Arc<Block>>,
{
for (count, block) in ancestors.enumerate() {
// Stop checking if the chain reaches Canopy. We won't find any more V5 transactions,
// so the rest of our checks are useless.
//
// If the cached tip is close to NU5 activation, but there aren't any V5 transactions in the
// chain yet, we could reach MAX_BLOCKS_TO_CHECK in Canopy, and incorrectly return an error.
if block
.coinbase_height()
.expect("valid blocks have coinbase heights")
< nu5_activation_height
{
return Ok(());
}

// If we are past our NU5 activation height, but there are no V5 transactions in recent blocks,
// the Zebra instance that verified those blocks had no NU5 activation height.
if count >= constants::MAX_LEGACY_CHAIN_BLOCKS {
return Err("giving up after checking too many blocks".into());
}

// If a transaction `network_upgrade` field is different from the network upgrade calculated
// using our activation heights, the Zebra instance that verified those blocks had different
// network upgrade heights.
block
.check_transaction_network_upgrade_consistency(network)
.map_err(|_| "inconsistent network upgrade found in transaction")?;

// If we find at least one transaction with a valid `network_upgrade` field, the Zebra instance that
// verified those blocks used the same network upgrade heights. (Up to this point in the chain.)
let has_network_upgrade = block
.transactions
.iter()
.find_map(|trans| trans.network_upgrade())
.is_some();
if has_network_upgrade {
return Ok(());
}
}

Ok(())
}
40 changes: 36 additions & 4 deletions zebra-state/src/service/arbitrary.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
//! Arbitrary data generation and test setup for Zebra's state.

use std::sync::Arc;

use futures::{stream::FuturesUnordered, StreamExt};
use proptest::{
num::usize::BinarySearch,
prelude::*,
strategy::{NewTree, ValueTree},
test_runner::TestRunner,
};
use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt};

use zebra_chain::{
block::Block, fmt::SummaryDebug, history_tree::HistoryTree, parameters::NetworkUpgrade,
block::Block,
fmt::SummaryDebug,
history_tree::HistoryTree,
parameters::{Network, NetworkUpgrade},
LedgerState,
};

use crate::arbitrary::Prepare;

use super::*;
use crate::{
arbitrary::Prepare, init_test, service::check, BoxError, PreparedBlock, Request, Response,
};

pub use zebra_chain::block::arbitrary::MAX_PARTIAL_CHAIN_BLOCKS;

Expand Down Expand Up @@ -158,3 +165,28 @@ impl Strategy for PreparedChain {
})
}
}

/// Initialize a state service with blocks.
pub async fn populated_state(
blocks: impl IntoIterator<Item = Arc<Block>>,
network: Network,
) -> Buffer<BoxService<Request, Response, BoxError>, Request> {
let requests = blocks
.into_iter()
.map(|block| Request::CommitFinalizedBlock(block.into()));

let mut state = init_test(network);

let mut responses = FuturesUnordered::new();

for request in requests {
let rsp = state.ready().await.unwrap().call(request);
responses.push(rsp);
}

while let Some(rsp) = responses.next().await {
rsp.expect("blocks should commit just fine");
}

state
}
Loading