Skip to content

Commit

Permalink
make number of buckets processed configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
apopiak committed Nov 17, 2023
1 parent e6fb7a7 commit bfc317c
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 42 deletions.
5 changes: 3 additions & 2 deletions cumulus/pallets/xcmp-queue/src/benchmarking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ benchmarks! {
set_config_with_weight {}: update_weight_restrict_decay(RawOrigin::Root, Weight::from_parts(3_000_000, 0))
service_deferred {
let m in 1..T::MaxDeferredMessages::get();
let b in 1..T::MaxBucketsProcessed::get();

let max_messages = m as usize;
let max_processed = b as u16;
let para_id = ParaId::from(999);

let xcm = construct_xcm::<T::RuntimeCall>();
Expand All @@ -56,7 +58,6 @@ benchmarks! {
// We set `deferred_to` to the current relay block number to make sure that the messages are serviced.
let deferred_message = DeferredMessage { sent_at: relay_block, deferred_to: relay_block, sender: para_id, xcm };
let deferred_xcm_messages = vec![deferred_message.clone(); max_messages];
let max_processed = T::MaxBucketsProcessed::get() as u16;
for i in 0..max_processed {
crate::Pallet::<T>::inject_deferred_messages(para_id, (relay_block, i), deferred_xcm_messages.clone().try_into().unwrap());
assert_eq!(crate::Pallet::<T>::messages_deferred_to(para_id, (relay_block, i)).len(), max_messages);
Expand All @@ -70,7 +71,7 @@ benchmarks! {
assert!(crate::Pallet::<T>::update_xcmp_max_individual_weight(RawOrigin::Root.into(), weight).is_ok());
// account for the reads induced by trying to execute all `max_messages`
let weight_limit = weight.saturating_add(T::DbWeight::get().reads_writes((max_processed as usize * max_messages) as u64, 0));
} :_(RawOrigin::Root, weight_limit, para_id)
} :_(RawOrigin::Root, weight_limit, para_id, max_processed as u32)
verify
{
assert_eq!(crate::Pallet::<T>::messages_deferred_to(para_id, (relay_block, 0)).len(), 0);
Expand Down
76 changes: 45 additions & 31 deletions cumulus/pallets/xcmp-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,31 +330,7 @@ pub mod pallet {
Ok(())
}

/// This extrinsic executes deferred messages up to the specified `weight_limit` and the current relay chain block number.
///
/// - `origin`: Must pass `ExecuteDeferredOrigin`.
/// - `weight_limit`: Maximum weight budget for deferred message execution.
/// - `para_id`: The queue to service.
#[pallet::call_index(9)]
#[pallet::weight((weight_limit.saturating_add(T::WeightInfo::service_deferred(T::MaxDeferredMessages::get())), DispatchClass::Operational))]
pub fn service_deferred(
origin: OriginFor<T>,
weight_limit: Weight,
para_id: ParaId,
) -> DispatchResultWithPostInfo {
T::ExecuteDeferredOrigin::ensure_origin(origin)?;

let relay_block_number = T::RelayChainBlockNumberProvider::current_block_number();
let QueueConfigData { xcmp_max_individual_weight, .. } = QueueConfig::<T>::get();

let weight_used = Self::service_deferred_queue(
para_id,
weight_limit,
relay_block_number,
xcmp_max_individual_weight,
);
Ok(Some(weight_used.saturating_add(T::WeightInfo::service_deferred(T::MaxDeferredMessages::get()))).into())
}
// 9 and 10 are deprecated

/// This extrinsic discards all deferred messages that match the given parameters.
///
Expand Down Expand Up @@ -458,6 +434,35 @@ pub mod pallet {

Ok(())
}

/// This extrinsic executes deferred messages up to the specified `weight_limit` and the current relay chain block number.
///
/// - `origin`: Must pass `ExecuteDeferredOrigin`.
/// - `weight_limit`: Maximum weight budget for deferred message execution.
/// - `para_id`: The queue to service.
/// - `max_processed`: The maximum number of buckets to process.
#[pallet::call_index(15)]
#[pallet::weight((weight_limit.saturating_add(T::WeightInfo::service_deferred(T::MaxDeferredMessages::get(), *max_processed)), DispatchClass::Operational))]
pub fn service_deferred(
origin: OriginFor<T>,
weight_limit: Weight,
para_id: ParaId,
max_processed: u32,
) -> DispatchResultWithPostInfo {
T::ExecuteDeferredOrigin::ensure_origin(origin)?;

let relay_block_number = T::RelayChainBlockNumberProvider::current_block_number();
let QueueConfigData { xcmp_max_individual_weight, .. } = QueueConfig::<T>::get();

let weight_used = Self::service_deferred_queue(
para_id,
weight_limit,
relay_block_number,
xcmp_max_individual_weight,
max_processed as usize,
);
Ok(Some(weight_used.saturating_add(T::WeightInfo::service_deferred(T::MaxDeferredMessages::get(), max_processed))).into())
}
}

#[pallet::event]
Expand Down Expand Up @@ -1175,12 +1180,18 @@ impl<T: Config> Pallet<T> {

let (sent_at, format) = status[index].message_metadata[0];


let weight_used_for_queue = if T::WeightInfo::service_deferred(T::MaxDeferredMessages::get()).any_gte(weight_remaining) {
let max_processed = T::MaxBucketsProcessed::get();
let weight_used_for_queue = if T::WeightInfo::service_deferred(T::MaxDeferredMessages::get(), max_processed).any_gte(weight_remaining) {
Weight::zero()
} else {
T::WeightInfo::service_deferred(T::MaxDeferredMessages::get()).saturating_add(
Self::service_deferred_queue(sender, weight_remaining, sent_at, xcmp_max_individual_weight))
T::WeightInfo::service_deferred(T::MaxDeferredMessages::get(), max_processed).saturating_add(
Self::service_deferred_queue(
sender,
weight_remaining,
sent_at,
xcmp_max_individual_weight,
max_processed as usize,
))
};
let weight_remaining = weight_remaining.saturating_sub(weight_used_for_queue);

Expand Down Expand Up @@ -1241,7 +1252,8 @@ impl<T: Config> Pallet<T> {
}
let mut keys = DeferredIndices::<T>::iter_keys();
let mut processed_all_queues = false;
let service_queue_weight = T::WeightInfo::service_deferred(T::MaxDeferredMessages::get());
let service_queue_weight = T::WeightInfo::service_deferred(
T::MaxDeferredMessages::get(), T::MaxBucketsProcessed::get());
while !processed_all_queues && max_weight.all_gt(weight_used.saturating_add(service_queue_weight)) {
if let Some(sender) = keys.next() {
weight_used.saturating_accrue(service_queue_weight);
Expand All @@ -1250,6 +1262,7 @@ impl<T: Config> Pallet<T> {
max_weight.saturating_sub(weight_used),
relay_chain_block_number,
max_individual_weight,
T::MaxBucketsProcessed::get() as usize,
));
} else {
processed_all_queues = true;
Expand All @@ -1265,6 +1278,7 @@ impl<T: Config> Pallet<T> {
max_weight: Weight,
up_to_relay_block_number: RelayBlockNumber,
max_individual_weight: Weight,
max_processed: usize,
) -> Weight {
if QueueSuspended::<T>::get() || DeferredQueueSuspended::<T>::get() {
return Weight::zero();
Expand All @@ -1275,7 +1289,7 @@ impl<T: Config> Pallet<T> {
let indices_to_process =
indices.range((Included(&(0, 0)), Included(&(up_to_relay_block_number, u16::MAX))));
let mut processed = BTreeSet::new();
for index in indices_to_process.take(T::MaxBucketsProcessed::get() as usize) {
for index in indices_to_process.take(max_processed) {
if weight_used.any_gte(max_weight) {
break;
}
Expand Down
21 changes: 15 additions & 6 deletions cumulus/pallets/xcmp-queue/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ fn service_deferred_should_execute_deferred_messages() {
RelayBlockNumberProviderMock::set(7);

//Act
assert_ok!(XcmpQueue::service_deferred(RuntimeOrigin::root(), Weight::MAX, para_id));
assert_ok!(XcmpQueue::service_deferred(RuntimeOrigin::root(), Weight::MAX, para_id, <Test as Config>::MaxBucketsProcessed::get()));

//Assert
assert_eq!(create_bounded_btreeset([].into_iter()), DeferredIndices::<Test>::get(para_id));
Expand Down Expand Up @@ -531,7 +531,7 @@ fn service_deferred_should_store_unprocessed_messages() {
RelayBlockNumberProviderMock::set(7);

//Act
assert_ok!(XcmpQueue::service_deferred(RuntimeOrigin::root(), Weight::MAX, para_id));
assert_ok!(XcmpQueue::service_deferred(RuntimeOrigin::root(), Weight::MAX, para_id, <Test as Config>::MaxBucketsProcessed::get()));

//Assert
assert_deferred_messages!(para_id, (8, 0), vec![Some(msg_not_to_process)]);
Expand Down Expand Up @@ -566,7 +566,7 @@ fn service_deferred_should_fail_when_called_with_wrong_origin() {

//Act and assert
assert_noop!(
XcmpQueue::service_deferred(RuntimeOrigin::signed(100), Weight::MAX, para_id),
XcmpQueue::service_deferred(RuntimeOrigin::signed(100), Weight::MAX, para_id, <Test as Config>::MaxBucketsProcessed::get()),
BadOrigin
);
});
Expand All @@ -584,7 +584,10 @@ fn service_deferred_queues_should_pass_overweight_messages_to_overweight_queue()
let low_max_individual_weight = Weight::from_parts(100, 1);
let low_max_weight =
low_max_individual_weight.saturating_add(
<Test as Config>::WeightInfo::service_deferred(<Test as Config>::MaxDeferredMessages::get()));
<Test as Config>::WeightInfo::service_deferred(
<Test as Config>::MaxDeferredMessages::get(),
<Test as Config>::MaxBucketsProcessed::get(),
));
assert!(FixedWeigher::weight(&mut xcm).unwrap().any_gt(low_max_weight));
let versioned_xcm = VersionedXcm::from(xcm);
let para_id = ParaId::from(999);
Expand Down Expand Up @@ -628,7 +631,10 @@ fn service_deferred_queues_should_stop_processing_when_weight_limit_is_reached_f
let low_max_weight = FixedWeigher::weight(&mut xcm)
.unwrap()
.saturating_add(
<Test as Config>::WeightInfo::service_deferred(<Test as Config>::MaxDeferredMessages::get()));
<Test as Config>::WeightInfo::service_deferred(
<Test as Config>::MaxDeferredMessages::get(),
<Test as Config>::MaxBucketsProcessed::get(),
));
let versioned_xcm = VersionedXcm::from(xcm);
let para_id = ParaId::from(999);
let second_para_id = ParaId::from(1000);
Expand Down Expand Up @@ -687,7 +693,10 @@ fn service_deferred_queues_should_stop_processing_when_weight_limit_is_reached_f
let low_max_weight = FixedWeigher::weight(&mut xcm)
.unwrap()
.saturating_add(
<Test as Config>::WeightInfo::service_deferred(<Test as Config>::MaxDeferredMessages::get()));
<Test as Config>::WeightInfo::service_deferred(
<Test as Config>::MaxDeferredMessages::get(),
<Test as Config>::MaxBucketsProcessed::get(),
));
let versioned_xcm = VersionedXcm::from(xcm);
let para_id = ParaId::from(999);
let mut xcmp_message = Vec::new();
Expand Down
6 changes: 3 additions & 3 deletions cumulus/pallets/xcmp-queue/src/weights.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use sp_std::marker::PhantomData;
pub trait WeightInfo {
fn set_config_with_u32() -> Weight;
fn set_config_with_weight() -> Weight;
fn service_deferred(m: u32) -> Weight;
fn service_deferred(m: u32, b: u32) -> Weight;
fn discard_deferred_bucket(m: u32) -> Weight;
fn discard_deferred_individual(m: u32) -> Weight;
fn try_place_in_deferred_queue(m: u32) -> Weight;
Expand Down Expand Up @@ -65,7 +65,7 @@ impl<T: frame_system::Config> WeightInfo for SubstrateWeight<T> {
// Proof Skipped: XcmpQueue OverweightCount (max_values: Some(1), max_size: None, mode: Measured)
// Storage: XcmpQueue Overweight (r:100 w:100)
// Proof Skipped: XcmpQueue Overweight (max_values: None, max_size: None, mode: Measured)
fn service_deferred(m: u32) -> Weight {
fn service_deferred(m: u32, b: u32) -> Weight {
Weight::from_parts(221_105_465_000 as u64, 0)
.saturating_add(T::DbWeight::get().reads(107 as u64))
.saturating_add(T::DbWeight::get().writes(104 as u64))
Expand Down Expand Up @@ -126,7 +126,7 @@ impl WeightInfo for () {
// Proof Skipped: XcmpQueue OverweightCount (max_values: Some(1), max_size: None, mode: Measured)
// Storage: XcmpQueue Overweight (r:100 w:100)
// Proof Skipped: XcmpQueue Overweight (max_values: None, max_size: None, mode: Measured)
fn service_deferred(m: u32) -> Weight {
fn service_deferred(m: u32, b: u32) -> Weight {
Weight::from_parts(221_105_465_000 as u64, 0)
.saturating_add(RocksDbWeight::get().reads(107 as u64))
.saturating_add(RocksDbWeight::get().writes(104 as u64))
Expand Down

0 comments on commit bfc317c

Please sign in to comment.