Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
* Parachains source cosmetic changes

- Make `ParaHashAtSource` more generic
- Modify `on_chain_parachain_header` to return `HeaderId`
- Shortening variable names

Signed-off-by: Serban Iorga <[email protected]>

* Change ParachainsSource::max_head_id type

Change ParachainsSource::max_head_id to Arc<Mutex<NoopOption>>

Signed-off-by: Serban Iorga <[email protected]>

* code review changes
  • Loading branch information
serban300 committed Apr 10, 2024
1 parent 912c99d commit d8b25cc
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 101 deletions.
4 changes: 3 additions & 1 deletion bridges/primitives/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ pub const ACCOUNT_DERIVATION_PREFIX: &[u8] = b"pallet-bridge/account-derivation/
pub const ROOT_ACCOUNT_DERIVATION_PREFIX: &[u8] = b"pallet-bridge/account-derivation/root";

/// Generic header Id.
#[derive(RuntimeDebug, Default, Clone, Encode, Decode, Copy, Eq, Hash, PartialEq)]
#[derive(
RuntimeDebug, Default, Clone, Encode, Decode, Copy, Eq, Hash, PartialEq, PartialOrd, Ord,
)]
pub struct HeaderId<Hash, Number>(pub Number, pub Hash);

/// Generic header id provider.
Expand Down
11 changes: 9 additions & 2 deletions bridges/relays/bin-substrate/src/cli/relay_parachains.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@ use crate::chains::{
rialto_parachains_to_millau::RialtoParachainToMillauCliBridge,
westend_parachains_to_millau::WestmintToMillauCliBridge,
};
use async_std::sync::Mutex;
use async_trait::async_trait;
use bp_polkadot_core::parachains::ParaId;
use parachains_relay::parachains_loop::{ParachainSyncParams, SourceClient, TargetClient};
use parachains_relay::parachains_loop::{
AvailableHeader, ParachainSyncParams, SourceClient, TargetClient,
};
use relay_utils::metrics::{GlobalMetrics, StandaloneMetric};
use std::sync::Arc;
use structopt::StructOpt;
use strum::{EnumString, EnumVariantNames, VariantNames};
use substrate_relay_helper::{
Expand Down Expand Up @@ -65,7 +69,10 @@ where
{
async fn relay_headers(data: RelayParachains) -> anyhow::Result<()> {
let source_client = data.source.into_client::<Self::SourceRelay>().await?;
let source_client = ParachainsSource::<Self::ParachainFinality>::new(source_client, None);
let source_client = ParachainsSource::<Self::ParachainFinality>::new(
source_client,
Arc::new(Mutex::new(AvailableHeader::Missing)),
);

let target_transaction_params = TransactionParams {
signer: data.target_sign.to_keypair::<Self::Target>()?,
Expand Down
5 changes: 1 addition & 4 deletions bridges/relays/lib-substrate-relay/src/finality/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use relay_substrate_client::{
BlockNumberOf, BlockWithJustification, Chain, Client, Error, HeaderOf,
};
use relay_utils::relay_loop::Client as RelayClient;
use sp_runtime::traits::Header as HeaderT;
use std::pin::Pin;

/// Shared updatable reference to the maximal header number that we want to sync from the source.
Expand Down Expand Up @@ -76,9 +75,7 @@ impl<P: SubstrateFinalitySyncPipeline> SubstrateFinalitySource<P> {
) -> Result<BlockNumberOf<P::SourceChain>, Error> {
// we **CAN** continue to relay finality proofs if source node is out of sync, because
// target node may be missing proofs that are already available at the source
let finalized_header_hash = self.client.best_finalized_header_hash().await?;
let finalized_header = self.client.header_by_hash(finalized_header_hash).await?;
Ok(*finalized_header.number())
self.client.best_finalized_header_number().await
}
}

Expand Down
22 changes: 9 additions & 13 deletions bridges/relays/lib-substrate-relay/src/on_demand/parachains.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use bp_runtime::HeaderIdProvider;
use futures::{select, FutureExt};
use num_traits::Zero;
use pallet_bridge_parachains::{RelayBlockHash, RelayBlockHasher, RelayBlockNumber};
use parachains_relay::parachains_loop::{ParachainSyncParams, TargetClient};
use parachains_relay::parachains_loop::{AvailableHeader, ParachainSyncParams, TargetClient};
use relay_substrate_client::{
AccountIdOf, AccountKeyPairOf, BlockNumberOf, Chain, Client, Error as SubstrateError, HashOf,
TransactionSignScheme,
Expand Down Expand Up @@ -143,15 +143,15 @@ async fn background_task<P: SubstrateParachainsPipeline>(

let mut relay_state = RelayState::Idle;
let mut required_parachain_header_number = Zero::zero();
let required_para_header_number_ref = Arc::new(Mutex::new(None));
let required_para_header_number_ref = Arc::new(Mutex::new(AvailableHeader::Unavailable));

let mut restart_relay = true;
let parachains_relay_task = futures::future::Fuse::terminated();
futures::pin_mut!(parachains_relay_task);

let mut parachains_source = ParachainsSource::<P>::new(
source_relay_client.clone(),
Some(required_para_header_number_ref.clone()),
required_para_header_number_ref.clone(),
);
let mut parachains_target =
ParachainsTarget::<P>::new(target_client.clone(), target_transaction_params.clone());
Expand Down Expand Up @@ -253,7 +253,8 @@ async fn background_task<P: SubstrateParachainsPipeline>(
.await;
},
RelayState::RelayingParaHeader(required_para_header) => {
*required_para_header_number_ref.lock().await = Some(required_para_header);
*required_para_header_number_ref.lock().await =
AvailableHeader::Available(required_para_header);
},
}

Expand Down Expand Up @@ -389,13 +390,9 @@ where
source.client().best_finalized_header().await.map_err(map_source_err)?;
let best_finalized_relay_block_id = best_finalized_relay_header.id();
let para_header_at_source = source
.on_chain_parachain_header(
best_finalized_relay_block_id,
P::SOURCE_PARACHAIN_PARA_ID.into(),
)
.on_chain_para_head_id(best_finalized_relay_block_id, P::SOURCE_PARACHAIN_PARA_ID.into())
.await
.map_err(map_source_err)?
.map(|h| h.id());
.map_err(map_source_err)?;

let relay_header_at_source = best_finalized_relay_block_id.0;
let relay_header_at_target =
Expand All @@ -408,10 +405,9 @@ where
.map_err(map_target_err)?;

let para_header_at_relay_header_at_target = source
.on_chain_parachain_header(relay_header_at_target, P::SOURCE_PARACHAIN_PARA_ID.into())
.on_chain_para_head_id(relay_header_at_target, P::SOURCE_PARACHAIN_PARA_ID.into())
.await
.map_err(map_source_err)?
.map(|h| h.id());
.map_err(map_source_err)?;

Ok(RelayData {
required_para_header: required_header_number,
Expand Down
76 changes: 30 additions & 46 deletions bridges/relays/lib-substrate-relay/src/parachains/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,35 +22,35 @@ use async_std::sync::{Arc, Mutex};
use async_trait::async_trait;
use bp_parachains::parachain_head_storage_key_at_source;
use bp_polkadot_core::parachains::{ParaHash, ParaHead, ParaHeadsProof, ParaId};
use bp_runtime::HeaderIdProvider;
use codec::Decode;
use parachains_relay::{
parachains_loop::{ParaHashAtSource, SourceClient},
parachains_loop::{AvailableHeader, SourceClient},
parachains_loop_metrics::ParachainsLoopMetrics,
};
use relay_substrate_client::{
Chain, Client, Error as SubstrateError, HeaderIdOf, HeaderOf, RelayChain,
};
use relay_utils::relay_loop::Client as RelayClient;
use sp_runtime::traits::Header as HeaderT;

/// Shared updatable reference to the maximal parachain header id that we want to sync from the
/// source.
pub type RequiredHeaderIdRef<C> = Arc<Mutex<Option<HeaderIdOf<C>>>>;
pub type RequiredHeaderIdRef<C> = Arc<Mutex<AvailableHeader<HeaderIdOf<C>>>>;

/// Substrate client as parachain heads source.
#[derive(Clone)]
pub struct ParachainsSource<P: SubstrateParachainsPipeline> {
client: Client<P::SourceRelayChain>,
maximal_header_id: Option<RequiredHeaderIdRef<P::SourceParachain>>,
max_head_id: RequiredHeaderIdRef<P::SourceParachain>,
}

impl<P: SubstrateParachainsPipeline> ParachainsSource<P> {
/// Creates new parachains source client.
pub fn new(
client: Client<P::SourceRelayChain>,
maximal_header_id: Option<RequiredHeaderIdRef<P::SourceParachain>>,
max_head_id: RequiredHeaderIdRef<P::SourceParachain>,
) -> Self {
ParachainsSource { client, maximal_header_id }
ParachainsSource { client, max_head_id }
}

/// Returns reference to the underlying RPC client.
Expand All @@ -59,11 +59,11 @@ impl<P: SubstrateParachainsPipeline> ParachainsSource<P> {
}

/// Return decoded head of given parachain.
pub async fn on_chain_parachain_header(
pub async fn on_chain_para_head_id(
&self,
at_block: HeaderIdOf<P::SourceRelayChain>,
para_id: ParaId,
) -> Result<Option<HeaderOf<P::SourceParachain>>, SubstrateError> {
) -> Result<Option<HeaderIdOf<P::SourceParachain>>, SubstrateError> {
let storage_key =
parachain_head_storage_key_at_source(P::SourceRelayChain::PARAS_PALLET_NAME, para_id);
let para_head = self.client.raw_storage_value(storage_key, Some(at_block.1)).await?;
Expand All @@ -72,8 +72,8 @@ impl<P: SubstrateParachainsPipeline> ParachainsSource<P> {
Some(para_head) => para_head,
None => return Ok(None),
};

Ok(Some(Decode::decode(&mut &para_head.0[..])?))
let para_head: HeaderOf<P::SourceParachain> = Decode::decode(&mut &para_head.0[..])?;
Ok(Some(para_head.id()))
}
}

Expand Down Expand Up @@ -105,7 +105,7 @@ where
at_block: HeaderIdOf<P::SourceRelayChain>,
metrics: Option<&ParachainsLoopMetrics>,
para_id: ParaId,
) -> Result<ParaHashAtSource, Self::Error> {
) -> Result<AvailableHeader<ParaHash>, Self::Error> {
// we don't need to support many parachains now
if para_id.0 != P::SOURCE_PARACHAIN_PARA_ID {
return Err(SubstrateError::Custom(format!(
Expand All @@ -115,44 +115,28 @@ where
)))
}

let mut para_hash_at_source = ParaHashAtSource::None;
let mut para_header_number_at_source = None;
match self.on_chain_parachain_header(at_block, para_id).await? {
Some(parachain_header) => {
para_hash_at_source = ParaHashAtSource::Some(parachain_header.hash());
para_header_number_at_source = Some(*parachain_header.number());
// never return head that is larger than requested. This way we'll never sync
// headers past `maximal_header_id`
if let Some(ref maximal_header_id) = self.maximal_header_id {
let maximal_header_id = *maximal_header_id.lock().await;
match maximal_header_id {
Some(maximal_header_id)
if *parachain_header.number() > maximal_header_id.0 =>
{
// we don't want this header yet => let's report previously requested
// header
para_hash_at_source = ParaHashAtSource::Some(maximal_header_id.1);
para_header_number_at_source = Some(maximal_header_id.0);
},
Some(_) => (),
None => {
// on-demand relay has not yet asked us to sync anything let's do that
para_hash_at_source = ParaHashAtSource::Unavailable;
para_header_number_at_source = None;
},
}
}
},
None => {},
};
let mut para_head_id = AvailableHeader::Missing;
if let Some(on_chain_para_head_id) = self.on_chain_para_head_id(at_block, para_id).await? {
// Never return head that is larger than requested. This way we'll never sync
// headers past `max_header_id`.
para_head_id = match *self.max_head_id.lock().await {
AvailableHeader::Unavailable => AvailableHeader::Unavailable,
AvailableHeader::Missing => {
// `max_header_id` is not set. There is no limit.
AvailableHeader::Available(on_chain_para_head_id)
},
AvailableHeader::Available(max_head_id) => {
// We report at most `max_header_id`.
AvailableHeader::Available(std::cmp::min(on_chain_para_head_id, max_head_id))
},
}
}

if let (Some(metrics), Some(para_header_number_at_source)) =
(metrics, para_header_number_at_source)
{
metrics.update_best_parachain_block_at_source(para_id, para_header_number_at_source);
if let (Some(metrics), AvailableHeader::Available(para_head_id)) = (metrics, para_head_id) {
metrics.update_best_parachain_block_at_source(para_id, para_head_id.0);
}

Ok(para_hash_at_source)
Ok(para_head_id.map(|para_head_id| para_head_id.1))
}

async fn prove_parachain_heads(
Expand Down
Loading

0 comments on commit d8b25cc

Please sign in to comment.