Skip to content

Commit

Permalink
Add a best_tip_changed() method to trait ChainTip
Browse files Browse the repository at this point in the history
  • Loading branch information
teor2345 committed Dec 14, 2022
1 parent 5d7a967 commit 2ad7964
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 11 deletions.
54 changes: 50 additions & 4 deletions zebra-chain/src/chain_tip.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
//! Zebra interfaces for access to chain tip information.
use std::sync::Arc;
use std::{future, sync::Arc};

use chrono::{DateTime, Utc};
use futures::{future::BoxFuture, Future, FutureExt};

use crate::{block, parameters::Network, transaction};
use crate::{block, parameters::Network, transaction, BoxError};

mod network_chain_tip_height_estimator;

Expand All @@ -18,8 +19,8 @@ use network_chain_tip_height_estimator::NetworkChainTipHeightEstimator;
/// An interface for querying the chain tip.
///
/// This trait helps avoid dependencies between:
/// * zebra-chain and tokio
/// * zebra-network and zebra-state
/// * `zebra-chain` and `tokio`
/// * `zebra-network` and `zebra-state`
pub trait ChainTip {
/// Return the height of the best chain tip.
fn best_tip_height(&self) -> Option<block::Height>;
Expand All @@ -44,6 +45,20 @@ pub trait ChainTip {
/// even if their authorizing data is different.
fn best_tip_mined_transaction_ids(&self) -> Arc<[transaction::Hash]>;

/// A future that returns when the best chain tip changes.
/// Can return immediately if the latest value in this [`ChainTip`] has not been seen yet.
///
/// Returns an error if Zebra is shutting down, or the state has permanently failed.
///
/// See [`tokio::watch::Receiver::changed()`](https://docs.rs/tokio/latest/tokio/sync/watch/struct.Receiver.html#method.changed) for details.
//
// TODO:
// Use async_fn_in_trait or return_position_impl_trait_in_trait when one of them stabilises:
// https://github.com/rust-lang/rust/issues/91611
fn best_tip_changed(&self) -> BestTipChanged;

// Provided methods
//
/// Return an estimate of the network chain tip's height.
///
/// The estimate is calculated based on the current local time, the block time of the best tip
Expand Down Expand Up @@ -84,6 +99,33 @@ pub trait ChainTip {
}
}

/// A future for the [`ChainTip::best_tip_changed()`] method.
/// See that method for details.
pub struct BestTipChanged<'f> {
fut: BoxFuture<'f, Result<(), BoxError>>,
}

impl<'f> BestTipChanged<'f> {
/// Returns a new [`BestTipChanged`] containing `fut`.
pub fn new<Fut>(fut: Fut) -> Self
where
Fut: Future<Output = Result<(), BoxError>> + Send + 'f,
{
Self { fut: Box::pin(fut) }
}
}

impl<'f> Future for BestTipChanged<'f> {
type Output = Result<(), BoxError>;

fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
self.fut.poll_unpin(cx)
}
}

/// A chain tip that is always empty.
///
/// Used in production for isolated network connections,
Expand Down Expand Up @@ -115,4 +157,8 @@ impl ChainTip for NoChainTip {
fn best_tip_mined_transaction_ids(&self) -> Arc<[transaction::Hash]> {
Arc::new([])
}

fn best_tip_changed(&self) -> BestTipChanged {
BestTipChanged::new(future::pending())
}
}
41 changes: 40 additions & 1 deletion zebra-chain/src/chain_tip/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,19 @@
use std::sync::Arc;

use chrono::{DateTime, Utc};
use futures::{future, FutureExt, TryFutureExt};
use tokio::sync::watch;

use crate::{block, chain_tip::ChainTip, parameters::Network, transaction};
use crate::{
block,
chain_tip::{BestTipChanged, ChainTip},
parameters::Network,
transaction,
};

/// A sender to sets the values read by a [`MockChainTip`].
//
// Update `best_tip_changed()` for each new field that is added to MockChainTipSender.
pub struct MockChainTipSender {
/// A sender that sets the `best_tip_height` of a [`MockChainTip`].
best_tip_height: watch::Sender<Option<block::Height>>,
Expand Down Expand Up @@ -112,6 +120,37 @@ impl ChainTip for MockChainTip {
.map(|tip_height| (estimated_distance, tip_height))
})
}

/// Returns when any sender channel changes.
//
// Update this method when each new mock field is added.
fn best_tip_changed(&self) -> BestTipChanged {
// Clone all the watch channels
let mut best_tip_height = self.best_tip_height.clone();
let mut best_tip_hash = self.best_tip_hash.clone();
let mut best_tip_block_time = self.best_tip_block_time.clone();
let mut estimated_distance_to_network_chain_tip =
self.estimated_distance_to_network_chain_tip.clone();

// Move them into an async block, to manage lifetimes
let select_changed = async move {
// Get the first watch channel that has changed
future::select_all([
// Erase the differing future types for each channel, and map their error types
BestTipChanged::new(best_tip_height.changed().err_into()),
BestTipChanged::new(best_tip_hash.changed().err_into()),
BestTipChanged::new(best_tip_block_time.changed().err_into()),
BestTipChanged::new(estimated_distance_to_network_chain_tip.changed().err_into()),
])
// Map the select result to the expected type, dropping the unused channels,
// any removing any dependencies on their lifetimes
.map(|(changed_result, _changed_index, _remaining_futures)| changed_result)
.await
};

// Erase the un-nameable type of the async block
BestTipChanged::new(select_changed)
}
}

impl MockChainTipSender {
Expand Down
33 changes: 27 additions & 6 deletions zebra-state/src/service/chain_tip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,13 @@
use std::{fmt, sync::Arc};

use chrono::{DateTime, Utc};
use futures::TryFutureExt;
use tokio::sync::watch;
use tracing::{field, instrument};

#[cfg(any(test, feature = "proptest-impl"))]
use proptest_derive::Arbitrary;

#[cfg(any(test, feature = "proptest-impl"))]
use zebra_chain::serialization::arbitrary::datetime_full;
use zebra_chain::{
block,
chain_tip::ChainTip,
chain_tip::{BestTipChanged, ChainTip},
parameters::{Network, NetworkUpgrade},
transaction::{self, Transaction},
};
Expand All @@ -29,6 +25,12 @@ use crate::{

use TipAction::*;

#[cfg(any(test, feature = "proptest-impl"))]
use proptest_derive::Arbitrary;

#[cfg(any(test, feature = "proptest-impl"))]
use zebra_chain::serialization::arbitrary::datetime_full;

#[cfg(test)]
mod tests;

Expand Down Expand Up @@ -387,6 +389,25 @@ impl ChainTip for LatestChainTip {
self.with_chain_tip_block(|block| block.transaction_hashes.clone())
.unwrap_or_else(|| Arc::new([]))
}

/// Returns when the state tip changes.
#[instrument(skip(self))]
fn best_tip_changed(&self) -> BestTipChanged {
// The changed() future doesn't lock the value,
// so we don't need to use `with_chain_tip_block()` here.
//
// Clone the watch channel
let mut best_tip = self.receiver.clone();

// Move it into an async block, to manage lifetimes
let best_tip_changed = async move {
// Map its error type
best_tip.changed().err_into().await
};

// Erase the un-nameable type of the async block
BestTipChanged::new(best_tip_changed)
}
}

/// A chain tip change monitor.
Expand Down

0 comments on commit 2ad7964

Please sign in to comment.