Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

dmp-queue: Store messages if already processed more than the maximum #2343

Merged
merged 2 commits into from
Mar 17, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 81 additions & 39 deletions pallets/dmp-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ pub mod pallet {
},
/// Downward message from the overweight queue was executed.
OverweightServiced { overweight_index: OverweightIndex, weight_used: Weight },
/// The maximum number of downward messages was.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// The maximum number of downward messages was.
/// The maximum number of downward messages was exhausted and the message enqueued for
/// later processing.

MaxMessagesExhausted { message_id: MessageId },
}

impl<T: Config> Pallet<T> {
Expand Down Expand Up @@ -306,46 +308,53 @@ pub mod pallet {
};

for (i, (sent_at, data)) in iter.enumerate() {
if messages_processed >= MAX_MESSAGES_PER_BLOCK {
break
}
if maybe_enqueue_page.is_none() {
// We're not currently enqueuing - try to execute inline.
let remaining_weight = limit.saturating_sub(used);
messages_processed += 1;
match Self::try_service_message(remaining_weight, sent_at, &data[..]) {
Ok(consumed) => used += consumed,
Err((message_id, required_weight)) =>
// Too much weight required right now.
{
let is_under_limit = Overweight::<T>::count() < MAX_OVERWEIGHT_MESSAGES;
used.saturating_accrue(T::DbWeight::get().reads(1));
if required_weight.any_gt(config.max_individual) && is_under_limit {
// overweight - add to overweight queue and continue with
// message execution.
let overweight_index = page_index.overweight_count;
Overweight::<T>::insert(overweight_index, (sent_at, data));
Self::deposit_event(Event::OverweightEnqueued {
message_id,
overweight_index,
required_weight,
});
page_index.overweight_count += 1;
// Not needed for control flow, but only to ensure that the compiler
// understands that we won't attempt to re-use `data` later.
continue
} else {
// not overweight. stop executing inline and enqueue normally
// from here on.
let item_count_left = item_count.saturating_sub(i);
maybe_enqueue_page = Some(Vec::with_capacity(item_count_left));
Self::deposit_event(Event::WeightExhausted {
message_id,
remaining_weight,
required_weight,
});
}
},
if messages_processed >= MAX_MESSAGES_PER_BLOCK {
let item_count_left = item_count.saturating_sub(i);
maybe_enqueue_page = Some(Vec::with_capacity(item_count_left));

Self::deposit_event(Event::MaxMessagesExhausted {
message_id: sp_io::hashing::blake2_256(&data),
});
} else {
// We're not currently enqueuing - try to execute inline.
let remaining_weight = limit.saturating_sub(used);
messages_processed += 1;
match Self::try_service_message(remaining_weight, sent_at, &data[..]) {
Ok(consumed) => used += consumed,
Err((message_id, required_weight)) =>
// Too much weight required right now.
{
let is_under_limit =
Overweight::<T>::count() < MAX_OVERWEIGHT_MESSAGES;
used.saturating_accrue(T::DbWeight::get().reads(1));
if required_weight.any_gt(config.max_individual) && is_under_limit {
// overweight - add to overweight queue and continue with
// message execution.
let overweight_index = page_index.overweight_count;
Overweight::<T>::insert(overweight_index, (sent_at, data));
Self::deposit_event(Event::OverweightEnqueued {
message_id,
overweight_index,
required_weight,
});
page_index.overweight_count += 1;
// Not needed for control flow, but only to ensure that the compiler
// understands that we won't attempt to re-use `data` later.
continue
} else {
// not overweight. stop executing inline and enqueue normally
// from here on.
let item_count_left = item_count.saturating_sub(i);
maybe_enqueue_page = Some(Vec::with_capacity(item_count_left));
Self::deposit_event(Event::WeightExhausted {
message_id,
remaining_weight,
required_weight,
});
}
},
}
}
}
// Cannot be an `else` here since the `maybe_enqueue_page` may have changed.
Expand Down Expand Up @@ -888,4 +897,37 @@ mod tests {
assert_eq!(pages_queued(), 1);
});
}

#[test]
fn handle_max_messages_per_block() {
new_test_ext().execute_with(|| {
enqueue(&vec![msg(1000), msg(1001)]);
enqueue(&vec![msg(1002), msg(1003)]);
enqueue(&vec![msg(1004), msg(1005)]);

let incoming = (0..MAX_MESSAGES_PER_BLOCK)
.into_iter()
.map(|i| msg(1006 + i as u64))
.collect::<Vec<_>>();
handle_messages(&incoming, Weight::from_parts(25000, 25000));

assert_eq!(
take_trace(),
(0..MAX_MESSAGES_PER_BLOCK)
.into_iter()
.map(|i| msg_complete(1000 + i as u64))
.collect::<Vec<_>>(),
);
assert_eq!(pages_queued(), 1);

handle_messages(&[], Weight::from_parts(25000, 25000));
assert_eq!(
take_trace(),
(MAX_MESSAGES_PER_BLOCK..MAX_MESSAGES_PER_BLOCK + 6)
.into_iter()
.map(|i| msg_complete(1000 + i as u64))
.collect::<Vec<_>>(),
);
});
}
}