Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

pallet-message-queue: add queue pausing #14318

Merged
merged 3 commits into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bin/node/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1180,6 +1180,7 @@ impl pallet_message_queue::Config for Runtime {
type MessageProcessor = pallet_message_queue::mock_helpers::NoopMessageProcessor<u32>;
type Size = u32;
type QueueChangeHandler = ();
type QueuePausedQuery = ();
type HeapSize = ConstU32<{ 64 * 1024 }>;
type MaxStale = ConstU32<128>;
type ServiceWeight = MessageQueueServiceWeight;
Expand Down
7 changes: 4 additions & 3 deletions frame/message-queue/src/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
use crate::{
mock::{
new_test_ext, CountingMessageProcessor, IntoWeight, MockedWeightInfo, NumMessagesProcessed,
SuspendedQueues,
YieldingQueues,
},
mock_helpers::MessageOrigin,
*,
Expand Down Expand Up @@ -96,6 +96,7 @@ impl Config for Test {
type MessageProcessor = CountingMessageProcessor;
type Size = u32;
type QueueChangeHandler = ();
type QueuePausedQuery = ();
type HeapSize = HeapSize;
type MaxStale = MaxStale;
type ServiceWeight = ServiceWeight;
Expand Down Expand Up @@ -207,7 +208,7 @@ fn stress_test_queue_suspension() {
to_resume,
per_queue.len()
);
SuspendedQueues::set(suspended.iter().map(|q| MessageOrigin::Everywhere(*q)).collect());
YieldingQueues::set(suspended.iter().map(|q| MessageOrigin::Everywhere(*q)).collect());

// Pick a fraction of all messages currently in queue and process them.
let resumed_messages =
Expand All @@ -229,7 +230,7 @@ fn stress_test_queue_suspension() {
process_all_messages(resumed_messages);
msgs_remaining -= resumed_messages;

let resumed = SuspendedQueues::take();
let resumed = YieldingQueues::take();
log::info!("Resumed all {} suspended queues", resumed.len());
log::info!("Processing all remaining {} messages", msgs_remaining);
process_all_messages(msgs_remaining);
Expand Down
29 changes: 26 additions & 3 deletions frame/message-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ use frame_support::{
pallet_prelude::*,
traits::{
DefensiveTruncateFrom, EnqueueMessage, ExecuteOverweightError, Footprint, ProcessMessage,
ProcessMessageError, ServiceQueues,
ProcessMessageError, QueuePausedQuery, ServiceQueues,
},
BoundedSlice, CloneNoBound, DefaultNoBound,
};
Expand Down Expand Up @@ -473,6 +473,13 @@ pub mod pallet {
/// removed.
type QueueChangeHandler: OnQueueChanged<<Self::MessageProcessor as ProcessMessage>::Origin>;

/// Queried by the pallet to check whether a queue can be serviced.
///
/// This also applies to manual servicing via `execute_overweight` and `service_queues`. The
/// value of this is only polled once before servicing the queue. This means that changes to
/// it that happen *within* the servicing will not be reflected.
type QueuePausedQuery: QueuePausedQuery<<Self::MessageProcessor as ProcessMessage>::Origin>;
ggwpez marked this conversation as resolved.
Show resolved Hide resolved

/// The size of the page; this implies the maximum message size which can be sent.
///
/// A good value depends on the expected message sizes, their weights, the weight that is
Expand Down Expand Up @@ -534,6 +541,10 @@ pub mod pallet {
/// Such errors are expected, but not guaranteed, to resolve themselves eventually through
/// retrying.
TemporarilyUnprocessable,
/// The queue is paused and no message can be executed from it.
///
/// This can change at any time and may resolve in the future by re-trying.
QueuePaused,
}

/// The index of the first and last (non-empty) pages.
Expand Down Expand Up @@ -694,6 +705,8 @@ impl<T: Config> Pallet<T> {
}

fn ready_ring_unknit(origin: &MessageOriginOf<T>, neighbours: Neighbours<MessageOriginOf<T>>) {
debug_assert!(!T::QueuePausedQuery::is_paused(origin));

if origin == &neighbours.next {
debug_assert!(
origin == &neighbours.prev,
Expand Down Expand Up @@ -770,7 +783,7 @@ impl<T: Config> Pallet<T> {
BookStateFor::<T>::insert(origin, book_state);
return
}
} else {
} else if !T::QueuePausedQuery::is_paused(origin) {
debug_assert!(
book_state.ready_neighbours.is_none(),
"Must not be in ready ring if not ready"
Expand Down Expand Up @@ -803,6 +816,8 @@ impl<T: Config> Pallet<T> {
weight_limit: Weight,
) -> Result<Weight, Error<T>> {
let mut book_state = BookStateFor::<T>::get(&origin);
ensure!(!T::QueuePausedQuery::is_paused(&origin), Error::<T>::QueuePaused);

let mut page = Pages::<T>::get(&origin, page_index).ok_or(Error::<T>::NoPage)?;
let (pos, is_processed, payload) =
page.peek_index(index.into() as usize).ok_or(Error::<T>::NoMessage)?;
Expand Down Expand Up @@ -943,6 +958,10 @@ impl<T: Config> Pallet<T> {

let mut book_state = BookStateFor::<T>::get(&origin);
let mut total_processed = 0;
if T::QueuePausedQuery::is_paused(&origin) {
let next_ready = book_state.ready_neighbours.as_ref().map(|x| x.next.clone());
return (false, next_ready)
}

while book_state.end > book_state.begin {
let (processed, status) =
Expand Down Expand Up @@ -1284,7 +1303,11 @@ impl<T: Config> ServiceQueues for Pallet<T> {
Pallet::<T>::do_execute_overweight(message_origin, page, index, weight.remaining()).map_err(
|e| match e {
Error::<T>::InsufficientWeight => ExecuteOverweightError::InsufficientWeight,
_ => ExecuteOverweightError::NotFound,
Error::<T>::AlreadyProcessed => ExecuteOverweightError::AlreadyProcessed,
Error::<T>::QueuePaused => ExecuteOverweightError::QueuePaused,
Error::<T>::NoPage | Error::<T>::NoMessage | Error::<T>::Queued =>
ExecuteOverweightError::NotFound,
_ => ExecuteOverweightError::Other,
},
)
}
Expand Down
23 changes: 21 additions & 2 deletions frame/message-queue/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ impl Config for Test {
type MessageProcessor = RecordingMessageProcessor;
type Size = u32;
type QueueChangeHandler = RecordingQueueChangeHandler;
type QueuePausedQuery = MockedQueuePauser;
type HeapSize = HeapSize;
type MaxStale = MaxStale;
type ServiceWeight = ServiceWeight;
Expand Down Expand Up @@ -154,7 +155,8 @@ impl crate::weights::WeightInfo for MockedWeightInfo {

parameter_types! {
pub static MessagesProcessed: Vec<(Vec<u8>, MessageOrigin)> = vec![];
pub static SuspendedQueues: Vec<MessageOrigin> = vec![];
/// Queues that should return `Yield` upon being processed.
pub static YieldingQueues: Vec<MessageOrigin> = vec![];
}

/// A message processor which records all processed messages into [`MessagesProcessed`].
Expand Down Expand Up @@ -205,7 +207,7 @@ impl ProcessMessage for RecordingMessageProcessor {
/// Processed a mocked message. Messages that end with `badformat`, `corrupt`, `unsupported` or
/// `yield` will fail with an error respectively.
fn processing_message(msg: &[u8], origin: &MessageOrigin) -> Result<(), ProcessMessageError> {
if SuspendedQueues::get().contains(&origin) {
if YieldingQueues::get().contains(&origin) {
return Err(ProcessMessageError::Yield)
}

Expand Down Expand Up @@ -270,6 +272,17 @@ impl OnQueueChanged<MessageOrigin> for RecordingQueueChangeHandler {
}
}

parameter_types! {
pub static PausedQueues: Vec<MessageOrigin> = vec![];
}

pub struct MockedQueuePauser;
impl QueuePausedQuery<MessageOrigin> for MockedQueuePauser {
fn is_paused(id: &MessageOrigin) -> bool {
PausedQueues::get().contains(id)
}
}

/// Create new test externalities.
///
/// Is generic since it is used by the unit test, integration tests and benchmarks.
Expand All @@ -287,6 +300,12 @@ where
ext
}

/// Run this closure in test externalities.
pub fn test_closure<R>(f: impl FnOnce() -> R) -> R {
let mut ext = new_test_ext::<Test>();
ext.execute_with(f)
}

/// Set the weight of a specific weight function.
pub fn set_weight(name: &str, w: Weight) {
MockedWeightInfo::set_weight::<Test>(name, w);
Expand Down
6 changes: 3 additions & 3 deletions frame/message-queue/src/mock_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ pub fn page<T: Config>(msg: &[u8]) -> PageOf<T> {
}

pub fn single_page_book<T: Config>() -> BookStateOf<T> {
BookState { begin: 0, end: 1, count: 1, ready_neighbours: None, message_count: 0, size: 0 }
BookState { begin: 0, end: 1, count: 1, ..Default::default() }
}

pub fn empty_book<T: Config>() -> BookStateOf<T> {
BookState { begin: 0, end: 1, count: 1, ready_neighbours: None, message_count: 0, size: 0 }
BookState { begin: 0, end: 1, count: 1, ..Default::default() }
}

/// Returns a full page of messages with their index as payload and the number of messages.
Expand All @@ -118,9 +118,9 @@ pub fn book_for<T: Config>(page: &PageOf<T>) -> BookStateOf<T> {
count: 1,
begin: 0,
end: 1,
ready_neighbours: None,
message_count: page.remaining.into() as u64,
size: page.remaining_size.into() as u64,
..Default::default()
}
}

Expand Down
Loading