From 1b96e5167976a6d905942622981753bec246869a Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Mon, 26 Oct 2020 19:36:29 +0300 Subject: [PATCH] Subscribe to justifications in Millau->Rialto headers sync (#394) * maintain MillauHeadersToRialto sync by subscribing to Millau justifications * more tracing in maintain * Update relays/substrate/src/headers_maintain.rs Co-authored-by: Hernando Castano * Update relays/substrate/src/headers_maintain.rs Co-authored-by: Hernando Castano * -Please * -TODO * revert raise recursion limit * updated comment Co-authored-by: Hernando Castano --- bridges/bin/millau/runtime/src/lib.rs | 3 +- bridges/bin/rialto/runtime/src/millau.rs | 2 +- .../modules/substrate/src/justification.rs | 9 + bridges/modules/substrate/src/lib.rs | 2 + bridges/primitives/millau/src/lib.rs | 2 + .../relays/ethereum/src/ethereum_sync_loop.rs | 1 + .../ethereum/src/substrate_sync_loop.rs | 1 + bridges/relays/headers-relay/src/headers.rs | 5 + bridges/relays/headers-relay/src/sync_loop.rs | 42 +- .../headers-relay/src/sync_loop_tests.rs | 1 + .../relays/headers-relay/src/sync_types.rs | 4 +- bridges/relays/substrate-client/src/client.rs | 17 +- bridges/relays/substrate-client/src/error.rs | 2 +- bridges/relays/substrate-client/src/lib.rs | 2 +- bridges/relays/substrate/Cargo.toml | 1 + .../relays/substrate/src/headers_maintain.rs | 382 ++++++++++++++++++ .../relays/substrate/src/headers_target.rs | 2 + bridges/relays/substrate/src/main.rs | 3 +- .../substrate/src/millau_headers_to_rialto.rs | 60 ++- 19 files changed, 508 insertions(+), 33 deletions(-) create mode 100644 bridges/relays/substrate/src/headers_maintain.rs diff --git a/bridges/bin/millau/runtime/src/lib.rs b/bridges/bin/millau/runtime/src/lib.rs index dd8b0aa79528..9b5692647b42 100644 --- a/bridges/bin/millau/runtime/src/lib.rs +++ b/bridges/bin/millau/runtime/src/lib.rs @@ -292,7 +292,8 @@ impl pallet_sudo::Trait for Runtime { } parameter_types! { - pub const Period: BlockNumber = 4; + /// Authorities are changing every 5 minutes. + pub const Period: BlockNumber = 5 * MINUTES; pub const Offset: BlockNumber = 0; } diff --git a/bridges/bin/rialto/runtime/src/millau.rs b/bridges/bin/rialto/runtime/src/millau.rs index 57ca750e2406..2c0e9dfc87b0 100644 --- a/bridges/bin/rialto/runtime/src/millau.rs +++ b/bridges/bin/rialto/runtime/src/millau.rs @@ -36,7 +36,7 @@ pub fn initial_header() -> Header { Header { parent_hash: Default::default(), number: Default::default(), - state_root: hex!("e901070e3bb061a6ae9ea8e4ba5417bf4c4642f9e75af9d372861c170ba7a9a3").into(), + state_root: hex!("234a17bbd3fbaff8f0a799a6c8f0bdba1979e242fb2ed66d15945acb84947cbd").into(), extrinsics_root: hex!("03170a2e7597b7b7e3d84c05391d139a62b157e78786d8c082f29dcf4c111314").into(), digest: Default::default(), } diff --git a/bridges/modules/substrate/src/justification.rs b/bridges/modules/substrate/src/justification.rs index 100049f810ef..911ebf3f2c8c 100644 --- a/bridges/modules/substrate/src/justification.rs +++ b/bridges/modules/substrate/src/justification.rs @@ -44,6 +44,15 @@ pub enum Error { InvalidPrecommitAncestries, } +/// Decode justification target. +pub fn decode_justification_target( + raw_justification: &[u8], +) -> Result<(Header::Hash, Header::Number), Error> { + GrandpaJustification::
::decode(&mut &raw_justification[..]) + .map(|justification| (justification.commit.target_hash, justification.commit.target_number)) + .map_err(|_| Error::JustificationDecode) +} + /// Verify that justification, that is generated by given authority set, finalizes given header. pub fn verify_justification( finalized_target: (Header::Hash, Header::Number), diff --git a/bridges/modules/substrate/src/lib.rs b/bridges/modules/substrate/src/lib.rs index c4406b6a3ccb..6dfe27f2c1ab 100644 --- a/bridges/modules/substrate/src/lib.rs +++ b/bridges/modules/substrate/src/lib.rs @@ -41,6 +41,8 @@ use sp_std::{marker::PhantomData, prelude::*}; // Re-export since the node uses these when configuring genesis pub use storage::{AuthoritySet, ScheduledChange}; +pub use justification::decode_justification_target; + mod justification; mod storage; mod storage_proof; diff --git a/bridges/primitives/millau/src/lib.rs b/bridges/primitives/millau/src/lib.rs index 8571ba9499f9..6276b8db3a17 100644 --- a/bridges/primitives/millau/src/lib.rs +++ b/bridges/primitives/millau/src/lib.rs @@ -62,6 +62,8 @@ impl Chain for Millau { /// Name of the `MillauHeaderApi::best_block` runtime method. pub const BEST_MILLAU_BLOCK_METHOD: &str = "MillauHeaderApi_best_block"; +/// Name of the `MillauHeaderApi::finalized_block` runtime method. +pub const FINALIZED_MILLAU_BLOCK_METHOD: &str = "MillauHeaderApi_finalized_block"; /// Name of the `MillauHeaderApi::is_known_block` runtime method. pub const IS_KNOWN_MILLAU_BLOCK_METHOD: &str = "MillauHeaderApi_is_known_block"; /// Name of the `MillauHeaderApi::incomplete_headers` runtime method. diff --git a/bridges/relays/ethereum/src/ethereum_sync_loop.rs b/bridges/relays/ethereum/src/ethereum_sync_loop.rs index c9045c6974dc..c2dd0b2d1e19 100644 --- a/bridges/relays/ethereum/src/ethereum_sync_loop.rs +++ b/bridges/relays/ethereum/src/ethereum_sync_loop.rs @@ -254,6 +254,7 @@ pub fn run(params: EthereumSyncParams) -> Result<(), RpcError> { consts::ETHEREUM_TICK_INTERVAL, target, consts::SUBSTRATE_TICK_INTERVAL, + (), sync_params, metrics_params, futures::future::pending(), diff --git a/bridges/relays/ethereum/src/substrate_sync_loop.rs b/bridges/relays/ethereum/src/substrate_sync_loop.rs index 80d611a89c4f..fcc6e2786b56 100644 --- a/bridges/relays/ethereum/src/substrate_sync_loop.rs +++ b/bridges/relays/ethereum/src/substrate_sync_loop.rs @@ -177,6 +177,7 @@ pub fn run(params: SubstrateSyncParams) -> Result<(), RpcError> { consts::SUBSTRATE_TICK_INTERVAL, target, consts::ETHEREUM_TICK_INTERVAL, + (), sync_params, metrics_params, futures::future::pending(), diff --git a/bridges/relays/headers-relay/src/headers.rs b/bridges/relays/headers-relay/src/headers.rs index b8c51eec1b49..6081cb1c18d7 100644 --- a/bridges/relays/headers-relay/src/headers.rs +++ b/bridges/relays/headers-relay/src/headers.rs @@ -496,6 +496,11 @@ impl QueuedHeaders

{ } } + /// Returns true if given header requires completion data. + pub fn requires_completion_data(&self, id: &HeaderIdOf

) -> bool { + self.incomplete_headers.contains_key(id) + } + /// Returns id of the header for which we want to fetch completion data. pub fn incomplete_header(&mut self) -> Option> { queued_incomplete_header(&mut self.incomplete_headers, |last_fetch_time| { diff --git a/bridges/relays/headers-relay/src/sync_loop.rs b/bridges/relays/headers-relay/src/sync_loop.rs index c53a1ab0f082..4a09e3518df9 100644 --- a/bridges/relays/headers-relay/src/sync_loop.rs +++ b/bridges/relays/headers-relay/src/sync_loop.rs @@ -16,7 +16,7 @@ //! Entrypoint for running headers synchronization loop. -use crate::sync::HeadersSyncParams; +use crate::sync::{HeadersSync, HeadersSyncParams}; use crate::sync_loop_metrics::SyncLoopMetrics; use crate::sync_types::{HeaderIdOf, HeaderStatus, HeadersSyncPipeline, QueuedHeader, SubmittedHeaders}; @@ -48,10 +48,12 @@ const STALL_SYNC_TIMEOUT: Duration = Duration::from_secs(5 * 60); /// Delay after we have seen update of best source header at target node, /// for us to treat sync stalled. ONLY when relay operates in backup mode. const BACKUP_STALL_SYNC_TIMEOUT: Duration = Duration::from_secs(10 * 60); +/// Interval between calling sync maintain procedure. +const MAINTAIN_INTERVAL: Duration = Duration::from_secs(30); /// Source client trait. #[async_trait] -pub trait SourceClient: Sized { +pub trait SourceClient { /// Type of error this clients returns. type Error: std::fmt::Debug + MaybeConnectionError; @@ -78,7 +80,7 @@ pub trait SourceClient: Sized { /// Target client trait. #[async_trait] -pub trait TargetClient: Sized { +pub trait TargetClient { /// Type of error this clients returns. type Error: std::fmt::Debug + MaybeConnectionError; @@ -102,12 +104,24 @@ pub trait TargetClient: Sized { async fn requires_extra(&self, header: QueuedHeader

) -> Result<(HeaderIdOf

, bool), Self::Error>; } +/// Synchronization maintain procedure. +#[async_trait] +pub trait SyncMaintain: Send + Sync { + /// Run custom maintain procedures. This is guaranteed to be called when both source and target + /// clients are unoccupied. + async fn maintain(&self, _sync: &mut HeadersSync

) {} +} + +impl SyncMaintain

for () {} + /// Run headers synchronization. +#[allow(clippy::too_many_arguments)] pub fn run>( source_client: impl SourceClient

, source_tick: Duration, target_client: TC, target_tick: Duration, + sync_maintain: impl SyncMaintain

, sync_params: HeadersSyncParams, metrics_params: Option, exit_signal: impl Future, @@ -116,7 +130,7 @@ pub fn run>( let mut progress_context = (Instant::now(), None, None); local_pool.run_until(async move { - let mut sync = crate::sync::HeadersSync::

::new(sync_params); + let mut sync = HeadersSync::

::new(sync_params); let mut stall_countdown = None; let mut last_update_time = Instant::now(); @@ -154,6 +168,9 @@ pub fn run>( let target_go_offline_future = futures::future::Fuse::terminated(); let target_tick_stream = interval(target_tick).fuse(); + let mut maintain_required = false; + let maintain_stream = interval(MAINTAIN_INTERVAL).fuse(); + let exit_signal = exit_signal.fuse(); futures::pin_mut!( @@ -172,6 +189,7 @@ pub fn run>( target_complete_header_future, target_go_offline_future, target_tick_stream, + maintain_stream, exit_signal ); @@ -373,6 +391,9 @@ pub fn run>( target_incomplete_headers_required = true; }, + _ = maintain_stream.next() => { + maintain_required = true; + }, _ = exit_signal => { return; } @@ -387,9 +408,16 @@ pub fn run>( // print progress progress_context = print_sync_progress(progress_context, &sync); + // run maintain procedures + if maintain_required && source_client_is_online && target_client_is_online { + log::debug!(target: "bridge", "Maintaining headers sync loop"); + maintain_required = false; + sync_maintain.maintain(&mut sync).await; + } + // If the target client is accepting requests we update the requests that // we want it to run - if target_client_is_online { + if !maintain_required && target_client_is_online { // NOTE: Is is important to reset this so that we only have one // request being processed by the client at a time. This prevents // race conditions like receiving two transactions with the same @@ -476,7 +504,7 @@ pub fn run>( // If the source client is accepting requests we update the requests that // we want it to run - if source_client_is_online { + if !maintain_required && source_client_is_online { // NOTE: Is is important to reset this so that we only have one // request being processed by the client at a time. This prevents // race conditions like receiving two transactions with the same @@ -561,7 +589,7 @@ pub fn run>( /// Print synchronization progress. fn print_sync_progress( progress_context: (Instant, Option, Option), - eth_sync: &crate::sync::HeadersSync

, + eth_sync: &HeadersSync

, ) -> (Instant, Option, Option) { let (prev_time, prev_best_header, prev_target_header) = progress_context; let now_time = Instant::now(); diff --git a/bridges/relays/headers-relay/src/sync_loop_tests.rs b/bridges/relays/headers-relay/src/sync_loop_tests.rs index 84b2082ce91d..f41d9708cd61 100644 --- a/bridges/relays/headers-relay/src/sync_loop_tests.rs +++ b/bridges/relays/headers-relay/src/sync_loop_tests.rs @@ -479,6 +479,7 @@ fn run_sync_loop_test(params: SyncLoopTestParams) { test_tick(), target, test_tick(), + (), crate::sync::tests::default_sync_params(), None, exit_receiver.into_future().map(|(_, _)| ()), diff --git a/bridges/relays/headers-relay/src/sync_types.rs b/bridges/relays/headers-relay/src/sync_types.rs index a910ce581c7e..54a41a8aaa57 100644 --- a/bridges/relays/headers-relay/src/sync_types.rs +++ b/bridges/relays/headers-relay/src/sync_types.rs @@ -76,7 +76,7 @@ pub trait HeadersSyncPipeline: Clone + Send + Sync { /// 4) header and extra data are submitted in single transaction. /// /// Example: Ethereum transactions receipts. - type Extra: Clone + PartialEq + std::fmt::Debug; + type Extra: Clone + Send + Sync + PartialEq + std::fmt::Debug; /// Type of data required to 'complete' header that we're receiving from the source node: /// 1) completion data is required for some headers; /// 2) target node can't answer if it'll require completion data before header is accepted; @@ -84,7 +84,7 @@ pub trait HeadersSyncPipeline: Clone + Send + Sync { /// 4) header and completion data are submitted in separate transactions. /// /// Example: Substrate GRANDPA justifications. - type Completion: Clone + std::fmt::Debug; + type Completion: Clone + Send + Sync + std::fmt::Debug; /// Function used to estimate size of target-encoded header. fn estimate_size(source: &QueuedHeader) -> usize; diff --git a/bridges/relays/substrate-client/src/client.rs b/bridges/relays/substrate-client/src/client.rs index 896b01477590..64994505ab07 100644 --- a/bridges/relays/substrate-client/src/client.rs +++ b/bridges/relays/substrate-client/src/client.rs @@ -23,12 +23,15 @@ use crate::{ConnectionParams, Result}; use jsonrpsee::common::DeserializeOwned; use jsonrpsee::raw::RawClient; use jsonrpsee::transport::ws::WsTransportClient; -use jsonrpsee::Client as RpcClient; +use jsonrpsee::{client::Subscription, Client as RpcClient}; use num_traits::Zero; use sp_core::Bytes; const SUB_API_GRANDPA_AUTHORITIES: &str = "GrandpaApi_grandpa_authorities"; +/// Opaque justifications subscription type. +pub type JustificationsSubscription = Subscription; + /// Opaque GRANDPA authorities set. pub type OpaqueGrandpaAuthoritiesSet = Vec; @@ -135,4 +138,16 @@ where .await .map_err(Into::into) } + + /// Return new justifications stream. + pub async fn subscribe_justifications(self) -> Result { + Ok(self + .client + .subscribe( + "grandpa_subscribeJustifications", + jsonrpsee::common::Params::None, + "grandpa_unsubscribeJustifications", + ) + .await?) + } } diff --git a/bridges/relays/substrate-client/src/error.rs b/bridges/relays/substrate-client/src/error.rs index 9dff9e02d2f6..319027440b18 100644 --- a/bridges/relays/substrate-client/src/error.rs +++ b/bridges/relays/substrate-client/src/error.rs @@ -29,7 +29,7 @@ pub type Result = std::result::Result; pub enum Error { /// Web socket connection error. WsConnectionError(WsNewDnsError), - /// An error that can occur when making an HTTP request to + /// An error that can occur when making a request to /// an JSON-RPC server. Request(RequestError), /// The response from the server could not be SCALE decoded. diff --git a/bridges/relays/substrate-client/src/lib.rs b/bridges/relays/substrate-client/src/lib.rs index 9bc1cf164245..adee027d51e5 100644 --- a/bridges/relays/substrate-client/src/lib.rs +++ b/bridges/relays/substrate-client/src/lib.rs @@ -26,7 +26,7 @@ mod rpc; pub mod headers_source; pub use crate::chain::{BlockWithJustification, Chain, TransactionSignScheme}; -pub use crate::client::{Client, OpaqueGrandpaAuthoritiesSet}; +pub use crate::client::{Client, JustificationsSubscription, OpaqueGrandpaAuthoritiesSet}; pub use crate::error::{Error, Result}; pub use bp_runtime::{BlockNumberOf, Chain as ChainBase, HashOf, HeaderOf}; diff --git a/bridges/relays/substrate/Cargo.toml b/bridges/relays/substrate/Cargo.toml index d58b9d6f37e9..a3f6ca6e435c 100644 --- a/bridges/relays/substrate/Cargo.toml +++ b/bridges/relays/substrate/Cargo.toml @@ -20,6 +20,7 @@ bp-millau = { path = "../../primitives/millau" } bp-rialto = { path = "../../primitives/rialto" } headers-relay = { path = "../headers-relay" } messages-relay = { path = "../messages-relay" } +pallet-substrate-bridge = { path = "../../modules/substrate" } relay-millau-client = { path = "../millau-client" } relay-rialto-client = { path = "../rialto-client" } relay-substrate-client = { path = "../substrate-client" } diff --git a/bridges/relays/substrate/src/headers_maintain.rs b/bridges/relays/substrate/src/headers_maintain.rs new file mode 100644 index 000000000000..e5e7c7023cda --- /dev/null +++ b/bridges/relays/substrate/src/headers_maintain.rs @@ -0,0 +1,382 @@ +// Copyright 2019-2020 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +//! Substrate-to-Substrate headers synchronization maintain procedure. +//! +//! Regular headers synchronization only depends on persistent justifications +//! that are generated when authorities set changes. This happens rarely on +//! real-word chains. So some other way to finalize headers is required. +//! +//! Full nodes are listening to GRANDPA messages, so they may have track authorities +//! votes on their own. They're returning both persistent and ephemeral justifications +//! (justifications that are not stored in the database and not broadcasted over network) +//! throught `grandpa_subscribeJustifications` RPC subscription. +//! +//! The idea of this maintain procedure is that when we see justification that 'improves' +//! best finalized header on the target chain, we submit this justification to the target +//! node. + +use crate::headers_target::SubstrateHeadersSyncPipeline; + +use async_std::sync::{Arc, Mutex}; +use async_trait::async_trait; +use codec::{Decode, Encode}; +use futures::future::{poll_fn, FutureExt, TryFutureExt}; +use headers_relay::{ + sync::HeadersSync, + sync_loop::SyncMaintain, + sync_types::{HeaderIdOf, HeaderStatus}, +}; +use relay_substrate_client::{Chain, Client, Error as SubstrateError, JustificationsSubscription}; +use relay_utils::HeaderId; +use sp_core::Bytes; +use sp_runtime::{traits::Header as HeaderT, DeserializeOwned, Justification}; +use std::{collections::VecDeque, task::Poll}; + +/// Substrate-to-Substrate headers synchronization maintain procedure. +pub struct SubstrateHeadersToSubstrateMaintain { + pipeline: P, + target_client: Client, + justifications: Arc>>, +} + +/// Future and already received justifications from the source chain. +struct Justifications { + /// Justifications stream. + stream: JustificationsSubscription, + /// Justifications that we have read from the stream but have not sent to the + /// target node, because their targets were still not synced. + queue: VecDeque<(HeaderIdOf

, Justification)>, +} + +impl SubstrateHeadersToSubstrateMaintain { + /// Create new maintain procedure. + pub fn new(pipeline: P, target_client: Client, justifications: JustificationsSubscription) -> Self { + SubstrateHeadersToSubstrateMaintain { + pipeline, + target_client, + justifications: Arc::new(Mutex::new(Justifications { + stream: justifications, + queue: VecDeque::new(), + })), + } + } +} + +#[async_trait] +impl SyncMaintain

for SubstrateHeadersToSubstrateMaintain +where + C: Chain, + C::Header: DeserializeOwned, + C::Index: DeserializeOwned, + P::Number: Decode + From, + P::Hash: Decode + From, + P: SubstrateHeadersSyncPipeline, +{ + async fn maintain(&self, sync: &mut HeadersSync

) { + // lock justifications before doing anything else + let mut justifications = match self.justifications.try_lock() { + Some(justifications) => justifications, + None => { + // this should never happen, as we use single-thread executor + log::warn!(target: "bridge", "Failed to acquire {} justifications lock", P::SOURCE_NAME); + return; + } + }; + + // we need to read best finalized header from the target node to be able to + // choose justification to submit + let best_finalized = match best_finalized_header_id::(&self.target_client).await { + Ok(best_finalized) => best_finalized, + Err(error) => { + log::warn!( + target: "bridge", + "Failed to read best finalized {} block from maintain: {:?}", + P::SOURCE_NAME, + error, + ); + return; + } + }; + + log::debug!( + target: "bridge", + "Read best finalized {} block from {}: {:?}", + P::SOURCE_NAME, + P::TARGET_NAME, + best_finalized, + ); + + // Select justification to submit to the target node. We're submitting at most one justification + // on every maintain call. So maintain rate directly affects finalization rate. + let justification_to_submit = poll_fn(|context| { + // read justifications from the stream and push to the queue + justifications.read_from_stream::(context); + + // remove all obsolete justifications from the queue + remove_obsolete::

(&mut justifications.queue, best_finalized); + + // select justification to submit + Poll::Ready(select_justification(&mut justifications.queue, sync)) + }) + .await; + + // finally - submit selected justification + if let Some((target, justification)) = justification_to_submit { + let submit_result = self + .pipeline + .make_complete_header_transaction(target, justification) + .and_then(|tx| self.target_client.submit_extrinsic(Bytes(tx.encode()))) + .await; + + match submit_result { + Ok(_) => log::debug!( + target: "bridge", + "Submitted justification received over {} subscription. Target: {:?}", + P::SOURCE_NAME, + target, + ), + Err(error) => log::warn!( + target: "bridge", + "Failed to submit justification received over {} subscription for {:?}: {:?}", + P::SOURCE_NAME, + target, + error, + ), + } + } + } +} + +impl

Justifications

+where + P::Number: Decode, + P::Hash: Decode, + P: SubstrateHeadersSyncPipeline, +{ + /// Read justifications from the subscription stream without blocking. + fn read_from_stream<'a, Header>(&mut self, context: &mut std::task::Context<'a>) + where + Header: HeaderT, + Header::Number: Into, + Header::Hash: Into, + { + loop { + let maybe_next_justification = self.stream.next(); + futures::pin_mut!(maybe_next_justification); + + let maybe_next_justification = maybe_next_justification.poll_unpin(context); + let justification = match maybe_next_justification { + Poll::Ready(justification) => justification, + Poll::Pending => return, + }; + + // decode justification target + let target = pallet_substrate_bridge::decode_justification_target::

(&justification); + let target = match target { + Ok((target_hash, target_number)) => HeaderId(target_number.into(), target_hash.into()), + Err(error) => { + log::warn!( + target: "bridge", + "Failed to decode justification from {} subscription: {:?}", + P::SOURCE_NAME, + error, + ); + continue; + } + }; + + log::debug!( + target: "bridge", + "Received {} justification over subscription. Target: {:?}", + P::SOURCE_NAME, + target, + ); + + self.queue.push_back((target, justification.0)); + } + } +} + +/// Clean queue of all justifications that are justifying already finalized blocks. +fn remove_obsolete( + queue: &mut VecDeque<(HeaderIdOf

, Justification)>, + best_finalized: HeaderIdOf

, +) { + while queue + .front() + .map(|(target, _)| target.0 <= best_finalized.0) + .unwrap_or(false) + { + queue.pop_front(); + } +} + +/// Select appropriate justification that would improve best finalized block on target node. +/// +/// It is assumed that the selected justification will be submitted to the target node. The +/// justification itself and all preceeding justifications are removed from the queue. +fn select_justification

( + queue: &mut VecDeque<(HeaderIdOf

, Justification)>, + sync: &mut HeadersSync

, +) -> Option<(HeaderIdOf

, Justification)> +where + P: SubstrateHeadersSyncPipeline, +{ + let mut selected_justification = None; + while let Some((target, justification)) = queue.pop_front() { + // if we're waiting for this justification, report it + if sync.headers().requires_completion_data(&target) { + sync.headers_mut().completion_response(&target, Some(justification)); + // we won't submit previous justifications as we going to submit justification for + // next header + selected_justification = None; + // we won't submit next justifications as we need to submit previous justifications + // first + break; + } + + // if we know that the header is already synced (it is known to the target node), let's + // select it for submission. We still may select better justification on the next iteration. + if sync.headers().status(&target) == HeaderStatus::Synced { + selected_justification = Some((target, justification)); + continue; + } + + // finally - return justification back to the queue + queue.push_back((target, justification)); + break; + } + + selected_justification +} + +/// Returns best finalized source header on the target chain. +async fn best_finalized_header_id(client: &Client) -> Result, SubstrateError> +where + P: SubstrateHeadersSyncPipeline, + P::Number: Decode + From, + P::Hash: Decode + From, + C: Chain, + C::Header: DeserializeOwned, + C::Index: DeserializeOwned, +{ + let call = P::FINALIZED_BLOCK_METHOD.into(); + let data = Bytes(Vec::new()); + + let encoded_response = client.state_call(call, data, None).await?; + let decoded_response: (C::BlockNumber, C::Hash) = + Decode::decode(&mut &encoded_response.0[..]).map_err(SubstrateError::ResponseParseFailed)?; + + let best_header_id = HeaderId(decoded_response.0.into(), decoded_response.1.into()); + Ok(best_header_id) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::millau_headers_to_rialto::{sync_params, MillauHeadersToRialto}; + + fn parent_hash(index: u8) -> bp_millau::Hash { + if index == 1 { + Default::default() + } else { + header(index - 1).hash() + } + } + + fn header_hash(index: u8) -> bp_millau::Hash { + header(index).hash() + } + + fn header(index: u8) -> bp_millau::Header { + bp_millau::Header::new( + index as _, + Default::default(), + Default::default(), + parent_hash(index), + Default::default(), + ) + } + + #[test] + fn obsolete_justifications_are_removed() { + let mut queue = vec![ + (HeaderId(1, header_hash(1)), vec![1]), + (HeaderId(2, header_hash(2)), vec![2]), + (HeaderId(3, header_hash(3)), vec![3]), + ] + .into_iter() + .collect(); + + remove_obsolete::(&mut queue, HeaderId(2, header_hash(2))); + + assert_eq!( + queue, + vec![(HeaderId(3, header_hash(3)), vec![3])] + .into_iter() + .collect::>(), + ); + } + + #[test] + fn latest_justification_is_selected() { + let mut queue = vec![ + (HeaderId(1, header_hash(1)), vec![1]), + (HeaderId(2, header_hash(2)), vec![2]), + (HeaderId(3, header_hash(3)), vec![3]), + ] + .into_iter() + .collect(); + let mut sync = HeadersSync::::new(sync_params()); + sync.headers_mut().header_response(header(1).into()); + sync.headers_mut().header_response(header(2).into()); + sync.headers_mut().header_response(header(3).into()); + sync.target_best_header_response(HeaderId(2, header_hash(2))); + + assert_eq!( + select_justification(&mut queue, &mut sync), + Some((HeaderId(2, header_hash(2)), vec![2])), + ); + } + + #[test] + fn required_justification_is_reported() { + let mut queue = vec![ + (HeaderId(1, header_hash(1)), vec![1]), + (HeaderId(2, header_hash(2)), vec![2]), + (HeaderId(3, header_hash(3)), vec![3]), + ] + .into_iter() + .collect(); + let mut sync = HeadersSync::::new(sync_params()); + sync.headers_mut().header_response(header(1).into()); + sync.headers_mut().header_response(header(2).into()); + sync.headers_mut().header_response(header(3).into()); + sync.headers_mut() + .incomplete_headers_response(vec![HeaderId(2, header_hash(2))].into_iter().collect()); + sync.target_best_header_response(HeaderId(2, header_hash(2))); + + assert_eq!(sync.headers_mut().header_to_complete(), None,); + + assert_eq!(select_justification(&mut queue, &mut sync), None,); + + assert_eq!( + sync.headers_mut().header_to_complete(), + Some((HeaderId(2, header_hash(2)), &vec![2])), + ); + } +} diff --git a/bridges/relays/substrate/src/headers_target.rs b/bridges/relays/substrate/src/headers_target.rs index 92bc017a72eb..546fb805eb63 100644 --- a/bridges/relays/substrate/src/headers_target.rs +++ b/bridges/relays/substrate/src/headers_target.rs @@ -36,6 +36,8 @@ use std::collections::HashSet; pub trait SubstrateHeadersSyncPipeline: HeadersSyncPipeline { /// Name of the `best_block` runtime method. const BEST_BLOCK_METHOD: &'static str; + /// Name of the `finalized_block` runtime method. + const FINALIZED_BLOCK_METHOD: &'static str; /// Name of the `is_known_block` runtime method. const IS_KNOWN_BLOCK_METHOD: &'static str; /// Name of the `incomplete_headers` runtime method. diff --git a/bridges/relays/substrate/src/main.rs b/bridges/relays/substrate/src/main.rs index ad77a13eea9b..53f10dab86d6 100644 --- a/bridges/relays/substrate/src/main.rs +++ b/bridges/relays/substrate/src/main.rs @@ -28,6 +28,7 @@ pub type MillauClient = relay_substrate_client::Client; mod cli; +mod headers_maintain; mod headers_target; mod millau_headers_to_rialto; @@ -63,7 +64,7 @@ async fn run_command(command: cli::Command) -> Result<(), String> { rialto_sign.rialto_signer_password.as_deref(), ) .map_err(|e| format!("Failed to parse rialto-signer: {:?}", e))?; - millau_headers_to_rialto::run(millau_client, rialto_client, rialto_sign, prometheus_params.into()); + millau_headers_to_rialto::run(millau_client, rialto_client, rialto_sign, prometheus_params.into()).await; } } diff --git a/bridges/relays/substrate/src/millau_headers_to_rialto.rs b/bridges/relays/substrate/src/millau_headers_to_rialto.rs index 67ac6d6b94e7..9f63614cde37 100644 --- a/bridges/relays/substrate/src/millau_headers_to_rialto.rs +++ b/bridges/relays/substrate/src/millau_headers_to_rialto.rs @@ -17,12 +17,16 @@ //! Millau-to-Rialto headers sync entrypoint. use crate::{ + headers_maintain::SubstrateHeadersToSubstrateMaintain, headers_target::{SubstrateHeadersSyncPipeline, SubstrateHeadersTarget}, MillauClient, RialtoClient, }; use async_trait::async_trait; -use bp_millau::{BEST_MILLAU_BLOCK_METHOD, INCOMPLETE_MILLAU_HEADERS_METHOD, IS_KNOWN_MILLAU_BLOCK_METHOD}; +use bp_millau::{ + BEST_MILLAU_BLOCK_METHOD, FINALIZED_MILLAU_BLOCK_METHOD, INCOMPLETE_MILLAU_HEADERS_METHOD, + IS_KNOWN_MILLAU_BLOCK_METHOD, +}; use codec::Encode; use headers_relay::{ sync::{HeadersSyncParams, TargetTransactionMode}, @@ -39,7 +43,7 @@ use std::time::Duration; /// Millau-to-Rialto headers pipeline. #[derive(Debug, Clone)] -struct MillauHeadersToRialto { +pub struct MillauHeadersToRialto { client: RialtoClient, sign: RialtoSigningParams, } @@ -62,6 +66,7 @@ impl HeadersSyncPipeline for MillauHeadersToRialto { #[async_trait] impl SubstrateHeadersSyncPipeline for MillauHeadersToRialto { const BEST_BLOCK_METHOD: &'static str = BEST_MILLAU_BLOCK_METHOD; + const FINALIZED_BLOCK_METHOD: &'static str = FINALIZED_MILLAU_BLOCK_METHOD; const IS_KNOWN_BLOCK_METHOD: &'static str = IS_KNOWN_MILLAU_BLOCK_METHOD; const INCOMPLETE_HEADERS_METHOD: &'static str = INCOMPLETE_MILLAU_HEADERS_METHOD; @@ -100,8 +105,20 @@ type MillauSourceClient = HeadersSource; /// Rialto node as headers target. type RialtoTargetClient = SubstrateHeadersTarget; +/// Return sync parameters for Millau-to-Rialto headers sync. +pub fn sync_params() -> HeadersSyncParams { + HeadersSyncParams { + max_future_headers_to_download: 32, + max_headers_in_submitted_status: 8, + max_headers_in_single_submit: 1, + max_headers_size_in_single_submit: 1024 * 1024, + prune_depth: 256, + target_tx_mode: TargetTransactionMode::Signed, + } +} + /// Run Millau-to-Rialto headers sync. -pub fn run( +pub async fn run( millau_client: MillauClient, rialto_client: RialtoClient, rialto_sign: RialtoSigningParams, @@ -109,27 +126,34 @@ pub fn run( ) { let millau_tick = Duration::from_secs(5); let rialto_tick = Duration::from_secs(5); - let sync_params = HeadersSyncParams { - max_future_headers_to_download: 32, - max_headers_in_submitted_status: 8, - max_headers_in_single_submit: 1, - max_headers_size_in_single_submit: 1024 * 1024, - prune_depth: 256, - target_tx_mode: TargetTransactionMode::Signed, + + let millau_justifications = match millau_client.clone().subscribe_justifications().await { + Ok(millau_justifications) => millau_justifications, + Err(error) => { + log::warn!( + target: "bridge", + "Failed to subscribe to Millau justifications: {:?}", + error, + ); + + return; + } + }; + + let pipeline = MillauHeadersToRialto { + client: rialto_client.clone(), + sign: rialto_sign, }; + let sync_maintain = + SubstrateHeadersToSubstrateMaintain::new(pipeline.clone(), rialto_client.clone(), millau_justifications); headers_relay::sync_loop::run( MillauSourceClient::new(millau_client), millau_tick, - RialtoTargetClient::new( - rialto_client.clone(), - MillauHeadersToRialto { - client: rialto_client, - sign: rialto_sign, - }, - ), + RialtoTargetClient::new(rialto_client, pipeline), rialto_tick, - sync_params, + sync_maintain, + sync_params(), metrics_params, futures::future::pending(), );