diff --git a/zebra-chain/src/chain_tip.rs b/zebra-chain/src/chain_tip.rs index ffb3dd61b31..0c8f2469020 100644 --- a/zebra-chain/src/chain_tip.rs +++ b/zebra-chain/src/chain_tip.rs @@ -1,10 +1,11 @@ //! Zebra interfaces for access to chain tip information. -use std::sync::Arc; +use std::{future, sync::Arc}; use chrono::{DateTime, Utc}; +use futures::{future::BoxFuture, Future, FutureExt}; -use crate::{block, parameters::Network, transaction}; +use crate::{block, parameters::Network, transaction, BoxError}; mod network_chain_tip_height_estimator; @@ -18,8 +19,8 @@ use network_chain_tip_height_estimator::NetworkChainTipHeightEstimator; /// An interface for querying the chain tip. /// /// This trait helps avoid dependencies between: -/// * zebra-chain and tokio -/// * zebra-network and zebra-state +/// * `zebra-chain` and `tokio` +/// * `zebra-network` and `zebra-state` pub trait ChainTip { /// Return the height of the best chain tip. fn best_tip_height(&self) -> Option; @@ -44,6 +45,20 @@ pub trait ChainTip { /// even if their authorizing data is different. fn best_tip_mined_transaction_ids(&self) -> Arc<[transaction::Hash]>; + /// A future that returns when the best chain tip changes. + /// Can return immediately if the latest value in this [`ChainTip`] has not been seen yet. + /// + /// Returns an error if Zebra is shutting down, or the state has permanently failed. + /// + /// See [`tokio::watch::Receiver::changed()`](https://docs.rs/tokio/latest/tokio/sync/watch/struct.Receiver.html#method.changed) for details. + // + // TODO: + // Use async_fn_in_trait or return_position_impl_trait_in_trait when one of them stabilises: + // https://github.com/rust-lang/rust/issues/91611 + fn best_tip_changed(&self) -> BestTipChanged; + + // Provided methods + // /// Return an estimate of the network chain tip's height. /// /// The estimate is calculated based on the current local time, the block time of the best tip @@ -84,6 +99,33 @@ pub trait ChainTip { } } +/// A future for the [`ChainTip::best_tip_changed()`] method. +/// See that method for details. +pub struct BestTipChanged<'f> { + fut: BoxFuture<'f, Result<(), BoxError>>, +} + +impl<'f> BestTipChanged<'f> { + /// Returns a new [`BestTipChanged`] containing `fut`. + pub fn new(fut: Fut) -> Self + where + Fut: Future> + Send + 'f, + { + Self { fut: Box::pin(fut) } + } +} + +impl<'f> Future for BestTipChanged<'f> { + type Output = Result<(), BoxError>; + + fn poll( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + self.fut.poll_unpin(cx) + } +} + /// A chain tip that is always empty. /// /// Used in production for isolated network connections, @@ -115,4 +157,8 @@ impl ChainTip for NoChainTip { fn best_tip_mined_transaction_ids(&self) -> Arc<[transaction::Hash]> { Arc::new([]) } + + fn best_tip_changed(&self) -> BestTipChanged { + BestTipChanged::new(future::pending()) + } } diff --git a/zebra-chain/src/chain_tip/mock.rs b/zebra-chain/src/chain_tip/mock.rs index fec056a3749..2453bcde172 100644 --- a/zebra-chain/src/chain_tip/mock.rs +++ b/zebra-chain/src/chain_tip/mock.rs @@ -3,11 +3,19 @@ use std::sync::Arc; use chrono::{DateTime, Utc}; +use futures::{future, FutureExt, TryFutureExt}; use tokio::sync::watch; -use crate::{block, chain_tip::ChainTip, parameters::Network, transaction}; +use crate::{ + block, + chain_tip::{BestTipChanged, ChainTip}, + parameters::Network, + transaction, +}; /// A sender to sets the values read by a [`MockChainTip`]. +// +// Update `best_tip_changed()` for each new field that is added to MockChainTipSender. pub struct MockChainTipSender { /// A sender that sets the `best_tip_height` of a [`MockChainTip`]. best_tip_height: watch::Sender>, @@ -112,6 +120,37 @@ impl ChainTip for MockChainTip { .map(|tip_height| (estimated_distance, tip_height)) }) } + + /// Returns when any sender channel changes. + // + // Update this method when each new mock field is added. + fn best_tip_changed(&self) -> BestTipChanged { + // Clone all the watch channels + let mut best_tip_height = self.best_tip_height.clone(); + let mut best_tip_hash = self.best_tip_hash.clone(); + let mut best_tip_block_time = self.best_tip_block_time.clone(); + let mut estimated_distance_to_network_chain_tip = + self.estimated_distance_to_network_chain_tip.clone(); + + // Move them into an async block, to manage lifetimes + let select_changed = async move { + // Get the first watch channel that has changed + future::select_all([ + // Erase the differing future types for each channel, and map their error types + BestTipChanged::new(best_tip_height.changed().err_into()), + BestTipChanged::new(best_tip_hash.changed().err_into()), + BestTipChanged::new(best_tip_block_time.changed().err_into()), + BestTipChanged::new(estimated_distance_to_network_chain_tip.changed().err_into()), + ]) + // Map the select result to the expected type, dropping the unused channels, + // any removing any dependencies on their lifetimes + .map(|(changed_result, _changed_index, _remaining_futures)| changed_result) + .await + }; + + // Erase the un-nameable type of the async block + BestTipChanged::new(select_changed) + } } impl MockChainTipSender { diff --git a/zebra-state/src/service/chain_tip.rs b/zebra-state/src/service/chain_tip.rs index db85536fbae..bece7670202 100644 --- a/zebra-state/src/service/chain_tip.rs +++ b/zebra-state/src/service/chain_tip.rs @@ -8,17 +8,13 @@ use std::{fmt, sync::Arc}; use chrono::{DateTime, Utc}; +use futures::TryFutureExt; use tokio::sync::watch; use tracing::{field, instrument}; -#[cfg(any(test, feature = "proptest-impl"))] -use proptest_derive::Arbitrary; - -#[cfg(any(test, feature = "proptest-impl"))] -use zebra_chain::serialization::arbitrary::datetime_full; use zebra_chain::{ block, - chain_tip::ChainTip, + chain_tip::{BestTipChanged, ChainTip}, parameters::{Network, NetworkUpgrade}, transaction::{self, Transaction}, }; @@ -29,6 +25,12 @@ use crate::{ use TipAction::*; +#[cfg(any(test, feature = "proptest-impl"))] +use proptest_derive::Arbitrary; + +#[cfg(any(test, feature = "proptest-impl"))] +use zebra_chain::serialization::arbitrary::datetime_full; + #[cfg(test)] mod tests; @@ -387,6 +389,25 @@ impl ChainTip for LatestChainTip { self.with_chain_tip_block(|block| block.transaction_hashes.clone()) .unwrap_or_else(|| Arc::new([])) } + + /// Returns when the state tip changes. + #[instrument(skip(self))] + fn best_tip_changed(&self) -> BestTipChanged { + // The changed() future doesn't lock the value, + // so we don't need to use `with_chain_tip_block()` here. + // + // Clone the watch channel + let mut best_tip = self.receiver.clone(); + + // Move it into an async block, to manage lifetimes + let best_tip_changed = async move { + // Map its error type + best_tip.changed().err_into().await + }; + + // Erase the un-nameable type of the async block + BestTipChanged::new(best_tip_changed) + } } /// A chain tip change monitor.