Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Switch to relay_dispatch_queue_remaining_capacity (#2608)
Browse files Browse the repository at this point in the history
* Switch to `relay_dispatch_queue_remaining_capacity`

This switches the parachain runtimes to use `relay_dispatch_queue_remaining_capacity` when possible.
If the data is not yet available on the relay chain it falls back to `relay_dispatch_queue_size`. It
will require that all parachains migrate to `relay_dispatch_queue_remaining_capacity` before we can
start removing the call to `relay_dipatch_queue_size`.

Besides that the pr adapts the xcm exumulator to make it work with the message queue.

* Fix test and use correct types

* ".git/.scripts/commands/fmt/fmt.sh"

---------

Co-authored-by: command-bot <>
  • Loading branch information
bkchr authored May 21, 2023
1 parent d090ac0 commit 8cb913d
Show file tree
Hide file tree
Showing 9 changed files with 424 additions and 327 deletions.
554 changes: 296 additions & 258 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions pallets/parachain-system/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,13 +233,13 @@ pub mod pallet {
};

<PendingUpwardMessages<T>>::mutate(|up| {
let (count, size) = relevant_messaging_state.relay_dispatch_queue_size;
let queue_size = relevant_messaging_state.relay_dispatch_queue_size;

let available_capacity = cmp::min(
host_config.max_upward_queue_count.saturating_sub(count),
host_config.max_upward_message_num_per_candidate,
queue_size.remaining_count,
host_config.max_upward_message_num_per_candidate.into(),
);
let available_size = host_config.max_upward_queue_size.saturating_sub(size);
let available_size = queue_size.remaining_size;

// Count the number of messages we can possibly fit in the given constraints, i.e.
// available_capacity and available_size.
Expand Down Expand Up @@ -431,7 +431,7 @@ pub mod pallet {
.read_abridged_host_configuration()
.expect("Invalid host configuration in relay chain state proof");
let relevant_messaging_state = relay_state_proof
.read_messaging_state_snapshot()
.read_messaging_state_snapshot(&host_config)
.expect("Invalid messaging state in relay chain state proof");

<ValidationData<T>>::put(&vfp);
Expand Down
54 changes: 44 additions & 10 deletions pallets/parachain-system/src/relay_state_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,17 @@ use sp_state_machine::{Backend, TrieBackend, TrieBackendBuilder};
use sp_std::vec::Vec;
use sp_trie::{HashDBT, MemoryDB, StorageProof, EMPTY_PREFIX};

/// The capacity of the upward message queue of a parachain on the relay chain.
// The field order should stay the same as the data can be found in the proof to ensure both are
// have the same encoded representation.
#[derive(Clone, Encode, Decode, TypeInfo, Default)]
pub struct RelayDispachQueueSize {
/// The number of additional messages that can be enqueued.
pub remaining_count: u32,
/// The total size of additional messages that can be enqueued.
pub remaining_size: u32,
}

/// A snapshot of some messaging related state of relay chain pertaining to the current parachain.
///
/// This data is essential for making sure that the parachain is aware of current resource use on
Expand All @@ -37,10 +48,7 @@ pub struct MessagingStateSnapshot {
pub dmq_mqc_head: relay_chain::Hash,

/// The current capacity of the upward message queue of the current parachain on the relay chain.
///
/// The capacity is represented by a tuple that consist of the `count` of the messages and the
/// `total_size` expressed as the sum of byte sizes of all messages in the queue.
pub relay_dispatch_queue_size: (u32, u32),
pub relay_dispatch_queue_size: RelayDispachQueueSize,

/// Information about all the inbound HRMP channels.
///
Expand Down Expand Up @@ -164,20 +172,46 @@ impl RelayChainStateProof {
/// Read the [`MessagingStateSnapshot`] from the relay chain state proof.
///
/// Returns an error if anything failed at reading or decoding.
pub fn read_messaging_state_snapshot(&self) -> Result<MessagingStateSnapshot, Error> {
pub fn read_messaging_state_snapshot(
&self,
host_config: &AbridgedHostConfiguration,
) -> Result<MessagingStateSnapshot, Error> {
let dmq_mqc_head: relay_chain::Hash = read_entry(
&self.trie_backend,
&relay_chain::well_known_keys::dmq_mqc_head(self.para_id),
Some(Default::default()),
)
.map_err(Error::DmqMqcHead)?;

let relay_dispatch_queue_size: (u32, u32) = read_entry(
let relay_dispatch_queue_size = read_optional_entry::<RelayDispachQueueSize, _>(
&self.trie_backend,
&relay_chain::well_known_keys::relay_dispatch_queue_size(self.para_id),
Some((0, 0)),
)
.map_err(Error::RelayDispatchQueueSize)?;
&relay_chain::well_known_keys::relay_dispatch_queue_remaining_capacity(self.para_id)
.key,
);

// TODO paritytech/polkadot#6283: Remove all usages of `relay_dispatch_queue_size`
//
// When the relay chain and all parachains support `relay_dispatch_queue_remaining_capacity`,
// this code here needs to be removed and above needs to be changed to `read_entry` that
// returns an error if `relay_dispatch_queue_remaining_capacity` can not be found/decoded.
//
// For now we just fallback to the old dispatch queue size if there is an error.
let relay_dispatch_queue_size = match relay_dispatch_queue_size {
Ok(Some(r)) => r,
_ => {
let res = read_entry::<(u32, u32), _>(
&self.trie_backend,
#[allow(deprecated)]
&relay_chain::well_known_keys::relay_dispatch_queue_size(self.para_id),
Some((0, 0)),
)
.map_err(Error::RelayDispatchQueueSize)?;

let remaining_count = host_config.max_upward_queue_count.saturating_sub(res.0);
let remaining_size = host_config.max_upward_queue_size.saturating_sub(res.1);
RelayDispachQueueSize { remaining_count, remaining_size }
},
};

let ingress_channel_index: Vec<ParaId> = read_entry(
&self.trie_backend,
Expand Down
6 changes: 3 additions & 3 deletions pallets/parachain-system/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ fn send_upward_message_num_per_candidate() {
BlockTests::new()
.with_relay_sproof_builder(|_, _, sproof| {
sproof.host_config.max_upward_message_num_per_candidate = 1;
sproof.relay_dispatch_queue_size = None;
sproof.relay_dispatch_queue_remaining_capacity = None;
})
.add_with_post_test(
1,
Expand Down Expand Up @@ -544,8 +544,8 @@ fn send_upward_message_relay_bottleneck() {
sproof.host_config.max_upward_queue_count = 5;

match relay_block_num {
1 => sproof.relay_dispatch_queue_size = Some((5, 0)),
2 => sproof.relay_dispatch_queue_size = Some((4, 0)),
1 => sproof.relay_dispatch_queue_remaining_capacity = Some((0, 2048)),
2 => sproof.relay_dispatch_queue_remaining_capacity = Some((1, 2048)),
_ => unreachable!(),
}
})
Expand Down
2 changes: 2 additions & 0 deletions parachains/integration-tests/emulated/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ decl_test_relay_chains! {
RuntimeOrigin: polkadot_runtime::RuntimeOrigin,
RuntimeCall: polkadot_runtime::RuntimeCall,
RuntimeEvent: polkadot_runtime::RuntimeEvent,
MessageQueue: polkadot_runtime::MessageQueue,
XcmConfig: polkadot_runtime::xcm_config::XcmConfig,
SovereignAccountOf: polkadot_runtime::xcm_config::SovereignAccountOf,
System: polkadot_runtime::System,
Expand All @@ -41,6 +42,7 @@ decl_test_relay_chains! {
RuntimeOrigin: kusama_runtime::RuntimeOrigin,
RuntimeCall: polkadot_runtime::RuntimeCall,
RuntimeEvent: kusama_runtime::RuntimeEvent,
MessageQueue: polkadot_runtime::MessageQueue,
XcmConfig: kusama_runtime::xcm_config::XcmConfig,
SovereignAccountOf: kusama_runtime::xcm_config::SovereignAccountOf,
System: kusama_runtime::System,
Expand Down
4 changes: 4 additions & 0 deletions primitives/parachain-inherent/src/client_side.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,11 @@ async fn collect_relay_storage_proof(
relay_well_known_keys::CURRENT_SLOT.to_vec(),
relay_well_known_keys::ACTIVE_CONFIG.to_vec(),
relay_well_known_keys::dmq_mqc_head(para_id),
// TODO paritytech/polkadot#6283: Remove all usages of `relay_dispatch_queue_size`
// We need to keep this here until all parachains have migrated to `relay_dispatch_queue_remaining_capacity`.
#[allow(deprecated)]
relay_well_known_keys::relay_dispatch_queue_size(para_id),
relay_well_known_keys::relay_dispatch_queue_remaining_capacity(para_id).key,
relay_well_known_keys::hrmp_ingress_channel_index(para_id),
relay_well_known_keys::hrmp_egress_channel_index(para_id),
relay_well_known_keys::upgrade_go_ahead_signal(para_id),
Expand Down
11 changes: 7 additions & 4 deletions test/relay-sproof-builder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub struct RelayStateSproofBuilder {
pub host_config: AbridgedHostConfiguration,
pub dmq_mqc_head: Option<relay_chain::Hash>,
pub upgrade_go_ahead: Option<UpgradeGoAhead>,
pub relay_dispatch_queue_size: Option<(u32, u32)>,
pub relay_dispatch_queue_remaining_capacity: Option<(u32, u32)>,
pub hrmp_ingress_channel_index: Option<Vec<ParaId>>,
pub hrmp_egress_channel_index: Option<Vec<ParaId>>,
pub hrmp_channels: BTreeMap<relay_chain::HrmpChannelId, AbridgedHrmpChannel>,
Expand All @@ -65,7 +65,7 @@ impl Default for RelayStateSproofBuilder {
},
dmq_mqc_head: None,
upgrade_go_ahead: None,
relay_dispatch_queue_size: None,
relay_dispatch_queue_remaining_capacity: None,
hrmp_ingress_channel_index: None,
hrmp_egress_channel_index: None,
hrmp_channels: BTreeMap::new(),
Expand Down Expand Up @@ -124,9 +124,12 @@ impl RelayStateSproofBuilder {
dmq_mqc_head.encode(),
);
}
if let Some(relay_dispatch_queue_size) = self.relay_dispatch_queue_size {
if let Some(relay_dispatch_queue_size) = self.relay_dispatch_queue_remaining_capacity {
insert(
relay_chain::well_known_keys::relay_dispatch_queue_size(self.para_id),
relay_chain::well_known_keys::relay_dispatch_queue_remaining_capacity(
self.para_id,
)
.key,
relay_dispatch_queue_size.encode(),
);
}
Expand Down
1 change: 1 addition & 0 deletions xcm/xcm-emulator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ sp-std = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-arithmetic = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-trie = { git = "https://github.com/paritytech/substrate", branch = "master" }
pallet-balances = { git = "https://github.com/paritytech/substrate", branch = "master" }
pallet-message-queue = { git = "https://github.com/paritytech/substrate", branch = "master" }

cumulus-primitives-core = { path = "../../primitives/core"}
cumulus-pallet-xcmp-queue = { path = "../../pallets/xcmp-queue" }
Expand Down
109 changes: 62 additions & 47 deletions xcm/xcm-emulator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ pub use casey::pascal;
pub use codec::Encode;
pub use frame_support::{
sp_runtime::BuildStorage,
traits::{Get, Hooks},
weights::Weight,
traits::{EnqueueMessage, Get, Hooks, ProcessMessage, ProcessMessageError, ServiceQueues},
weights::{Weight, WeightMeter},
};
pub use frame_system::AccountInfo;
pub use log;
Expand All @@ -41,13 +41,14 @@ pub use cumulus_primitives_core::{
pub use cumulus_primitives_parachain_inherent::ParachainInherentData;
pub use cumulus_test_relay_sproof_builder::RelayStateSproofBuilder;
pub use cumulus_test_service::get_account_id_from_seed;
pub use pallet_message_queue;
pub use parachain_info;
pub use parachains_common::{AccountId, BlockNumber};

pub use polkadot_primitives;
pub use polkadot_runtime_parachains::{
dmp,
ump::{MessageId, UmpSink, XcmSink},
inclusion::{AggregateMessageOrigin, UmpQueueId},
};
pub use std::{collections::HashMap, thread::LocalKey};
pub use xcm::{v3::prelude::*, VersionedXcm};
Expand Down Expand Up @@ -164,7 +165,7 @@ pub trait NetworkComponent<N: Network> {
}
}

pub trait RelayChain: UmpSink {
pub trait RelayChain: ProcessMessage {
type Runtime;
type RuntimeOrigin;
type RuntimeCall;
Expand Down Expand Up @@ -198,14 +199,15 @@ macro_rules! decl_test_relay_chains {
genesis = $genesis:expr,
on_init = $on_init:expr,
runtime = {
Runtime: $($runtime:tt)::*,
RuntimeOrigin: $($runtime_origin:tt)::*,
RuntimeCall: $($runtime_call:tt)::*,
RuntimeEvent: $($runtime_event:tt)::*,
XcmConfig: $($xcm_config:tt)::*,
SovereignAccountOf: $($sovereign_acc_of:tt)::*,
System: $($system:tt)::*,
Balances: $($balances:tt)::*,
Runtime: $runtime:path,
RuntimeOrigin: $runtime_origin:path,
RuntimeCall: $runtime_call:path,
RuntimeEvent: $runtime_event:path,
MessageQueue: $mq:path,
XcmConfig: $xcm_config:path,
SovereignAccountOf: $sovereign_acc_of:path,
System: $system:path,
Balances: $balances:path,
},
pallets_extra = {
$($pallet_name:ident: $pallet_path:path,)*
Expand All @@ -218,14 +220,14 @@ macro_rules! decl_test_relay_chains {
pub struct $name;

impl RelayChain for $name {
type Runtime = $($runtime)::*;
type RuntimeOrigin = $($runtime_origin)::*;
type RuntimeCall = $($runtime_call)::*;
type RuntimeEvent = $($runtime_event)::*;
type XcmConfig = $($xcm_config)::*;
type SovereignAccountOf = $($sovereign_acc_of)::*;
type System = $($system)::*;
type Balances = $($balances)::*;
type Runtime = $runtime;
type RuntimeOrigin = $runtime_origin;
type RuntimeCall = $runtime_call;
type RuntimeEvent = $runtime_event;
type XcmConfig = $xcm_config;
type SovereignAccountOf = $sovereign_acc_of;
type System = $system;
type Balances = $balances;
}

$crate::paste::paste! {
Expand All @@ -242,31 +244,43 @@ macro_rules! decl_test_relay_chains {
}
}

$crate::__impl_xcm_handlers_for_relay_chain!($name);
$crate::__impl_test_ext_for_relay_chain!($name, $genesis, $on_init);
)+
};
}
impl $crate::ProcessMessage for $name {
type Origin = $crate::ParaId;

#[macro_export]
macro_rules! __impl_xcm_handlers_for_relay_chain {
($name:ident) => {
impl $crate::UmpSink for $name {
fn process_upward_message(
origin: $crate::ParaId,
msg: &[u8],
max_weight: $crate::Weight,
) -> Result<$crate::Weight, ($crate::MessageId, $crate::Weight)> {
use $crate::{TestExt, UmpSink};

Self::execute_with(|| {
$crate::XcmSink::<
$crate::XcmExecutor<<Self as RelayChain>::XcmConfig>,
<Self as RelayChain>::Runtime,
>::process_upward_message(origin, msg, max_weight)
})
fn process_message(
msg: &[u8],
para: Self::Origin,
meter: &mut $crate::WeightMeter,
) -> Result<bool, $crate::ProcessMessageError> {
use $crate::{Weight, AggregateMessageOrigin, UmpQueueId, ServiceQueues, EnqueueMessage};
use $mq as message_queue;
use $runtime_event as runtime_event;

Self::execute_with(|| {
<$mq as EnqueueMessage<AggregateMessageOrigin>>::enqueue_message(
msg.try_into().expect("Message too long"),
AggregateMessageOrigin::Ump(UmpQueueId::Para(para.clone()))
);

<$system>::reset_events();
<$mq as ServiceQueues>::service_queues(Weight::MAX);
let events = <$system>::events();
let event = events.last().expect("There must be at least one event");

match &event.event {
runtime_event::MessageQueue(
$crate::pallet_message_queue::Event::Processed {origin, ..}) => {
assert_eq!(origin, &AggregateMessageOrigin::Ump(UmpQueueId::Para(para)));
},
event => panic!("Unexpected event: {:#?}", event),
}
Ok(true)
})
}
}
}

$crate::__impl_test_ext_for_relay_chain!($name, $genesis, $on_init);
)+
};
}

Expand Down Expand Up @@ -800,12 +814,13 @@ macro_rules! decl_test_networks {
}

fn _process_upward_messages() {
use $crate::{UmpSink, Bounded};
use $crate::{Bounded, ProcessMessage, WeightMeter};
while let Some((from_para_id, msg)) = $crate::UPWARD_MESSAGES.with(|b| b.borrow_mut().get_mut(stringify!($name)).unwrap().pop_front()) {
let _ = <$relay_chain>::process_upward_message(
from_para_id.into(),
let mut weight_meter = WeightMeter::max_limit();
let _ = <$relay_chain>::process_message(
&msg[..],
$crate::Weight::max_value(),
from_para_id.into(),
&mut weight_meter,
);
}
}
Expand Down

0 comments on commit 8cb913d

Please sign in to comment.