Skip to content

Commit

Permalink
Subscribe to justifications in Millau->Rialto headers sync (paritytec…
Browse files Browse the repository at this point in the history
…h#394)

* maintain MillauHeadersToRialto sync by subscribing to Millau justifications

* more tracing in maintain

* Update relays/substrate/src/headers_maintain.rs

Co-authored-by: Hernando Castano <[email protected]>

* Update relays/substrate/src/headers_maintain.rs

Co-authored-by: Hernando Castano <[email protected]>

* -Please

* -TODO

* revert raise recursion limit

* updated comment

Co-authored-by: Hernando Castano <[email protected]>
  • Loading branch information
2 people authored and bkchr committed Apr 10, 2024
1 parent e2d9b63 commit 1b96e51
Show file tree
Hide file tree
Showing 19 changed files with 508 additions and 33 deletions.
3 changes: 2 additions & 1 deletion bridges/bin/millau/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,8 @@ impl pallet_sudo::Trait for Runtime {
}

parameter_types! {
pub const Period: BlockNumber = 4;
/// Authorities are changing every 5 minutes.
pub const Period: BlockNumber = 5 * MINUTES;
pub const Offset: BlockNumber = 0;
}

Expand Down
2 changes: 1 addition & 1 deletion bridges/bin/rialto/runtime/src/millau.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub fn initial_header() -> Header {
Header {
parent_hash: Default::default(),
number: Default::default(),
state_root: hex!("e901070e3bb061a6ae9ea8e4ba5417bf4c4642f9e75af9d372861c170ba7a9a3").into(),
state_root: hex!("234a17bbd3fbaff8f0a799a6c8f0bdba1979e242fb2ed66d15945acb84947cbd").into(),
extrinsics_root: hex!("03170a2e7597b7b7e3d84c05391d139a62b157e78786d8c082f29dcf4c111314").into(),
digest: Default::default(),
}
Expand Down
9 changes: 9 additions & 0 deletions bridges/modules/substrate/src/justification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ pub enum Error {
InvalidPrecommitAncestries,
}

/// Decode justification target.
pub fn decode_justification_target<Header: HeaderT>(
raw_justification: &[u8],
) -> Result<(Header::Hash, Header::Number), Error> {
GrandpaJustification::<Header>::decode(&mut &raw_justification[..])
.map(|justification| (justification.commit.target_hash, justification.commit.target_number))
.map_err(|_| Error::JustificationDecode)
}

/// Verify that justification, that is generated by given authority set, finalizes given header.
pub fn verify_justification<Header: HeaderT>(
finalized_target: (Header::Hash, Header::Number),
Expand Down
2 changes: 2 additions & 0 deletions bridges/modules/substrate/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ use sp_std::{marker::PhantomData, prelude::*};
// Re-export since the node uses these when configuring genesis
pub use storage::{AuthoritySet, ScheduledChange};

pub use justification::decode_justification_target;

mod justification;
mod storage;
mod storage_proof;
Expand Down
2 changes: 2 additions & 0 deletions bridges/primitives/millau/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ impl Chain for Millau {

/// Name of the `MillauHeaderApi::best_block` runtime method.
pub const BEST_MILLAU_BLOCK_METHOD: &str = "MillauHeaderApi_best_block";
/// Name of the `MillauHeaderApi::finalized_block` runtime method.
pub const FINALIZED_MILLAU_BLOCK_METHOD: &str = "MillauHeaderApi_finalized_block";
/// Name of the `MillauHeaderApi::is_known_block` runtime method.
pub const IS_KNOWN_MILLAU_BLOCK_METHOD: &str = "MillauHeaderApi_is_known_block";
/// Name of the `MillauHeaderApi::incomplete_headers` runtime method.
Expand Down
1 change: 1 addition & 0 deletions bridges/relays/ethereum/src/ethereum_sync_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ pub fn run(params: EthereumSyncParams) -> Result<(), RpcError> {
consts::ETHEREUM_TICK_INTERVAL,
target,
consts::SUBSTRATE_TICK_INTERVAL,
(),
sync_params,
metrics_params,
futures::future::pending(),
Expand Down
1 change: 1 addition & 0 deletions bridges/relays/ethereum/src/substrate_sync_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ pub fn run(params: SubstrateSyncParams) -> Result<(), RpcError> {
consts::SUBSTRATE_TICK_INTERVAL,
target,
consts::ETHEREUM_TICK_INTERVAL,
(),
sync_params,
metrics_params,
futures::future::pending(),
Expand Down
5 changes: 5 additions & 0 deletions bridges/relays/headers-relay/src/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,11 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
}
}

/// Returns true if given header requires completion data.
pub fn requires_completion_data(&self, id: &HeaderIdOf<P>) -> bool {
self.incomplete_headers.contains_key(id)
}

/// Returns id of the header for which we want to fetch completion data.
pub fn incomplete_header(&mut self) -> Option<HeaderIdOf<P>> {
queued_incomplete_header(&mut self.incomplete_headers, |last_fetch_time| {
Expand Down
42 changes: 35 additions & 7 deletions bridges/relays/headers-relay/src/sync_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

//! Entrypoint for running headers synchronization loop.
use crate::sync::HeadersSyncParams;
use crate::sync::{HeadersSync, HeadersSyncParams};
use crate::sync_loop_metrics::SyncLoopMetrics;
use crate::sync_types::{HeaderIdOf, HeaderStatus, HeadersSyncPipeline, QueuedHeader, SubmittedHeaders};

Expand Down Expand Up @@ -48,10 +48,12 @@ const STALL_SYNC_TIMEOUT: Duration = Duration::from_secs(5 * 60);
/// Delay after we have seen update of best source header at target node,
/// for us to treat sync stalled. ONLY when relay operates in backup mode.
const BACKUP_STALL_SYNC_TIMEOUT: Duration = Duration::from_secs(10 * 60);
/// Interval between calling sync maintain procedure.
const MAINTAIN_INTERVAL: Duration = Duration::from_secs(30);

/// Source client trait.
#[async_trait]
pub trait SourceClient<P: HeadersSyncPipeline>: Sized {
pub trait SourceClient<P: HeadersSyncPipeline> {
/// Type of error this clients returns.
type Error: std::fmt::Debug + MaybeConnectionError;

Expand All @@ -78,7 +80,7 @@ pub trait SourceClient<P: HeadersSyncPipeline>: Sized {

/// Target client trait.
#[async_trait]
pub trait TargetClient<P: HeadersSyncPipeline>: Sized {
pub trait TargetClient<P: HeadersSyncPipeline> {
/// Type of error this clients returns.
type Error: std::fmt::Debug + MaybeConnectionError;

Expand All @@ -102,12 +104,24 @@ pub trait TargetClient<P: HeadersSyncPipeline>: Sized {
async fn requires_extra(&self, header: QueuedHeader<P>) -> Result<(HeaderIdOf<P>, bool), Self::Error>;
}

/// Synchronization maintain procedure.
#[async_trait]
pub trait SyncMaintain<P: HeadersSyncPipeline>: Send + Sync {
/// Run custom maintain procedures. This is guaranteed to be called when both source and target
/// clients are unoccupied.
async fn maintain(&self, _sync: &mut HeadersSync<P>) {}
}

impl<P: HeadersSyncPipeline> SyncMaintain<P> for () {}

/// Run headers synchronization.
#[allow(clippy::too_many_arguments)]
pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
source_client: impl SourceClient<P>,
source_tick: Duration,
target_client: TC,
target_tick: Duration,
sync_maintain: impl SyncMaintain<P>,
sync_params: HeadersSyncParams,
metrics_params: Option<MetricsParams>,
exit_signal: impl Future<Output = ()>,
Expand All @@ -116,7 +130,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
let mut progress_context = (Instant::now(), None, None);

local_pool.run_until(async move {
let mut sync = crate::sync::HeadersSync::<P>::new(sync_params);
let mut sync = HeadersSync::<P>::new(sync_params);
let mut stall_countdown = None;
let mut last_update_time = Instant::now();

Expand Down Expand Up @@ -154,6 +168,9 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
let target_go_offline_future = futures::future::Fuse::terminated();
let target_tick_stream = interval(target_tick).fuse();

let mut maintain_required = false;
let maintain_stream = interval(MAINTAIN_INTERVAL).fuse();

let exit_signal = exit_signal.fuse();

futures::pin_mut!(
Expand All @@ -172,6 +189,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
target_complete_header_future,
target_go_offline_future,
target_tick_stream,
maintain_stream,
exit_signal
);

Expand Down Expand Up @@ -373,6 +391,9 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
target_incomplete_headers_required = true;
},

_ = maintain_stream.next() => {
maintain_required = true;
},
_ = exit_signal => {
return;
}
Expand All @@ -387,9 +408,16 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
// print progress
progress_context = print_sync_progress(progress_context, &sync);

// run maintain procedures
if maintain_required && source_client_is_online && target_client_is_online {
log::debug!(target: "bridge", "Maintaining headers sync loop");
maintain_required = false;
sync_maintain.maintain(&mut sync).await;
}

// If the target client is accepting requests we update the requests that
// we want it to run
if target_client_is_online {
if !maintain_required && target_client_is_online {
// NOTE: Is is important to reset this so that we only have one
// request being processed by the client at a time. This prevents
// race conditions like receiving two transactions with the same
Expand Down Expand Up @@ -476,7 +504,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(

// If the source client is accepting requests we update the requests that
// we want it to run
if source_client_is_online {
if !maintain_required && source_client_is_online {
// NOTE: Is is important to reset this so that we only have one
// request being processed by the client at a time. This prevents
// race conditions like receiving two transactions with the same
Expand Down Expand Up @@ -561,7 +589,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
/// Print synchronization progress.
fn print_sync_progress<P: HeadersSyncPipeline>(
progress_context: (Instant, Option<P::Number>, Option<P::Number>),
eth_sync: &crate::sync::HeadersSync<P>,
eth_sync: &HeadersSync<P>,
) -> (Instant, Option<P::Number>, Option<P::Number>) {
let (prev_time, prev_best_header, prev_target_header) = progress_context;
let now_time = Instant::now();
Expand Down
1 change: 1 addition & 0 deletions bridges/relays/headers-relay/src/sync_loop_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,7 @@ fn run_sync_loop_test(params: SyncLoopTestParams) {
test_tick(),
target,
test_tick(),
(),
crate::sync::tests::default_sync_params(),
None,
exit_receiver.into_future().map(|(_, _)| ()),
Expand Down
4 changes: 2 additions & 2 deletions bridges/relays/headers-relay/src/sync_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,15 @@ pub trait HeadersSyncPipeline: Clone + Send + Sync {
/// 4) header and extra data are submitted in single transaction.
///
/// Example: Ethereum transactions receipts.
type Extra: Clone + PartialEq + std::fmt::Debug;
type Extra: Clone + Send + Sync + PartialEq + std::fmt::Debug;
/// Type of data required to 'complete' header that we're receiving from the source node:
/// 1) completion data is required for some headers;
/// 2) target node can't answer if it'll require completion data before header is accepted;
/// 3) completion data may be generated after header generation;
/// 4) header and completion data are submitted in separate transactions.
///
/// Example: Substrate GRANDPA justifications.
type Completion: Clone + std::fmt::Debug;
type Completion: Clone + Send + Sync + std::fmt::Debug;

/// Function used to estimate size of target-encoded header.
fn estimate_size(source: &QueuedHeader<Self>) -> usize;
Expand Down
17 changes: 16 additions & 1 deletion bridges/relays/substrate-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@ use crate::{ConnectionParams, Result};
use jsonrpsee::common::DeserializeOwned;
use jsonrpsee::raw::RawClient;
use jsonrpsee::transport::ws::WsTransportClient;
use jsonrpsee::Client as RpcClient;
use jsonrpsee::{client::Subscription, Client as RpcClient};
use num_traits::Zero;
use sp_core::Bytes;

const SUB_API_GRANDPA_AUTHORITIES: &str = "GrandpaApi_grandpa_authorities";

/// Opaque justifications subscription type.
pub type JustificationsSubscription = Subscription<Bytes>;

/// Opaque GRANDPA authorities set.
pub type OpaqueGrandpaAuthoritiesSet = Vec<u8>;

Expand Down Expand Up @@ -135,4 +138,16 @@ where
.await
.map_err(Into::into)
}

/// Return new justifications stream.
pub async fn subscribe_justifications(self) -> Result<JustificationsSubscription> {
Ok(self
.client
.subscribe(
"grandpa_subscribeJustifications",
jsonrpsee::common::Params::None,
"grandpa_unsubscribeJustifications",
)
.await?)
}
}
2 changes: 1 addition & 1 deletion bridges/relays/substrate-client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub type Result<T> = std::result::Result<T, Error>;
pub enum Error {
/// Web socket connection error.
WsConnectionError(WsNewDnsError),
/// An error that can occur when making an HTTP request to
/// An error that can occur when making a request to
/// an JSON-RPC server.
Request(RequestError),
/// The response from the server could not be SCALE decoded.
Expand Down
2 changes: 1 addition & 1 deletion bridges/relays/substrate-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ mod rpc;
pub mod headers_source;

pub use crate::chain::{BlockWithJustification, Chain, TransactionSignScheme};
pub use crate::client::{Client, OpaqueGrandpaAuthoritiesSet};
pub use crate::client::{Client, JustificationsSubscription, OpaqueGrandpaAuthoritiesSet};
pub use crate::error::{Error, Result};
pub use bp_runtime::{BlockNumberOf, Chain as ChainBase, HashOf, HeaderOf};

Expand Down
1 change: 1 addition & 0 deletions bridges/relays/substrate/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ bp-millau = { path = "../../primitives/millau" }
bp-rialto = { path = "../../primitives/rialto" }
headers-relay = { path = "../headers-relay" }
messages-relay = { path = "../messages-relay" }
pallet-substrate-bridge = { path = "../../modules/substrate" }
relay-millau-client = { path = "../millau-client" }
relay-rialto-client = { path = "../rialto-client" }
relay-substrate-client = { path = "../substrate-client" }
Expand Down
Loading

0 comments on commit 1b96e51

Please sign in to comment.