Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Add try_state check to Pallet MessageQueue #13502

8 changes: 4 additions & 4 deletions frame/message-queue/src/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@

use crate::{
mock::{
new_test_ext, CountingMessageProcessor, IntoWeight, MockedWeightInfo, NumMessagesProcessed,
YieldingQueues,
build_and_execute, CountingMessageProcessor, IntoWeight, MockedWeightInfo,
NumMessagesProcessed, YieldingQueues,
},
mock_helpers::MessageOrigin,
*,
Expand Down Expand Up @@ -123,7 +123,7 @@ fn stress_test_enqueue_and_service() {
let max_msg_len = MaxMessageLenOf::<Test>::get();
let mut rng = StdRng::seed_from_u64(42);

new_test_ext::<Test>().execute_with(|| {
build_and_execute::<Test>(|| {
ggwpez marked this conversation as resolved.
Show resolved Hide resolved
let mut msgs_remaining = 0;
for _ in 0..blocks {
// Start by enqueuing a large number of messages.
Expand Down Expand Up @@ -171,7 +171,7 @@ fn stress_test_queue_suspension() {
let max_msg_len = MaxMessageLenOf::<Test>::get();
let mut rng = StdRng::seed_from_u64(41);

new_test_ext::<Test>().execute_with(|| {
build_and_execute::<Test>(|| {
let mut suspended = BTreeSet::<u32>::new();
let mut msgs_remaining = 0;

Expand Down
107 changes: 106 additions & 1 deletion frame/message-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,12 @@ pub mod pallet {
}
}

/// Check all assumptions about [`crate::Config`].
#[cfg(feature = "try-runtime")]
fn try_state(_: BlockNumberFor<T>) -> Result<(), sp_runtime::TryRuntimeError> {
Self::do_try_state()
}

/// Check all compile-time assumptions about [`crate::Config`].
fn integrity_test() {
assert!(!MaxMessageLenOf::<T>::get().is_zero(), "HeapSize too low");
}
Expand Down Expand Up @@ -1105,6 +1110,106 @@ impl<T: Config> Pallet<T> {
ItemExecutionStatus::Executed(is_processed)
}

/// Ensure the correctness of state of this pallet.
///
/// # Assumptions-
ggwpez marked this conversation as resolved.
Show resolved Hide resolved
///
/// If `serviceHead` points to a ready Queue, then BookState of that Queue has:
///
/// * `message_count` > 0
/// * `size` > 0
/// * `end` > `begin`
/// * Some(ready_neighbours)
/// * If `ready_neighbours.next` == self.origin, then `ready_neighbours.prev` == self.origin
/// (only queue in ring)
///
/// For Pages(begin to end-1) in BookState:
///
/// * `remaining` > 0
/// * `remaining_size` > 0
/// * `first` <= `last`
/// * Every page can be decoded into peek_* functions
#[cfg(any(test, feature = "try-runtime"))]
pub fn do_try_state() -> Result<(), sp_runtime::TryRuntimeError> {
// Checking memory corruption for BookStateFor
ensure!(
BookStateFor::<T>::iter_keys().count() == BookStateFor::<T>::iter_values().count(),
"Memory Corruption in BookStateFor"
);
// Checking memory corruption for Pages
ensure!(
Pages::<T>::iter_keys().count() == Pages::<T>::iter_values().count(),
"Memory Corruption in Pages"
);

// No state to check
if ServiceHead::<T>::get().is_none() {
return Ok(())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there are still a few assumptions about books even if they are not in the ready ring, but it is okay for a first version.

}

//loop around this origin
let starting_origin = ServiceHead::<T>::get().unwrap();

while let Some(head) = Self::bump_service_head(&mut WeightMeter::max_limit()) {
ensure!(
BookStateFor::<T>::contains_key(&head),
"Service head must point to an existing book"
);

let head_book_state = BookStateFor::<T>::get(&head);
ensure!(
head_book_state.message_count > 0,
"There must be some messages if in ReadyRing"
);
ensure!(head_book_state.size > 0, "There must be some message size if in ReadyRing");
ensure!(
head_book_state.end > head_book_state.begin,
"End > Begin if unprocessed messages exists"
);
ensure!(
head_book_state.ready_neighbours.is_some(),
"There must be neighbours if in ReadyRing"
);

if head_book_state.ready_neighbours.as_ref().unwrap().next == head {
ensure!(
head_book_state.ready_neighbours.as_ref().unwrap().prev == head,
"Can only happen if only queue in ReadyRing"
);
}

for page_index in head_book_state.begin..head_book_state.end {
let page = Pages::<T>::get(&head, page_index).unwrap();
let remaining_messages = page.remaining;
let mut counted_remaining_messages = 0;
ensure!(
remaining_messages > 0.into(),
"These must be some messages that have not been processed yet!"
);

for i in 0..u32::MAX {
if let Some((_, processed, _)) = page.peek_index(i as usize) {
if !processed {
counted_remaining_messages += 1;
}
} else {
break
}
}

ensure!(
remaining_messages == counted_remaining_messages.into(),
"Memory Corruption"
);
}

if head_book_state.ready_neighbours.as_ref().unwrap().next == starting_origin {
break
}
}
Ok(())
}

/// Print the pages in each queue and the messages in each page.
///
/// Processed messages are prefixed with a `*` and the current `begin`ning page with a `>`.
Expand Down
13 changes: 9 additions & 4 deletions frame/message-queue/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,10 +295,15 @@ where
ext
}

/// Run this closure in test externalities.
pub fn test_closure<R>(f: impl FnOnce() -> R) -> R {
let mut ext = new_test_ext::<Test>();
ext.execute_with(f)
/// Run the function pointer inside externalities and asserts the try_state hook at the end.
pub fn build_and_execute<T: Config>(test: impl FnOnce() -> ())
gitofdeepanshu marked this conversation as resolved.
Show resolved Hide resolved
ggwpez marked this conversation as resolved.
Show resolved Hide resolved
where
BlockNumberFor<T>: From<u32>,
{
new_test_ext::<T>().execute_with(|| {
test();
MessageQueue::do_try_state().expect("All invariants must hold after a test");
});
}

/// Set the weight of a specific weight function.
Expand Down
15 changes: 5 additions & 10 deletions frame/message-queue/src/mock_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub fn page<T: Config>(msg: &[u8]) -> PageOf<T> {
}

pub fn single_page_book<T: Config>() -> BookStateOf<T> {
BookState { begin: 0, end: 1, count: 1, ..Default::default() }
BookState { begin: 0, end: 1, count: 1, message_count: 1, size: 1, ..Default::default() }
}

pub fn empty_book<T: Config>() -> BookStateOf<T> {
Expand Down Expand Up @@ -139,10 +139,8 @@ pub fn setup_bump_service_head<T: Config>(
current: <<T as Config>::MessageProcessor as ProcessMessage>::Origin,
next: <<T as Config>::MessageProcessor as ProcessMessage>::Origin,
) {
let mut book = single_page_book::<T>();
book.ready_neighbours = Some(Neighbours::<MessageOriginOf<T>> { prev: next.clone(), next });
ServiceHead::<T>::put(&current);
BookStateFor::<T>::insert(&current, &book);
crate::Pallet::<T>::enqueue_message(msg("1"), current);
crate::Pallet::<T>::enqueue_message(msg("1"), next);
}

/// Knit a queue into the ready-ring and write it back to storage.
Expand All @@ -164,11 +162,8 @@ pub fn unknit<T: Config>(o: &<<T as Config>::MessageProcessor as ProcessMessage>
pub fn build_ring<T: Config>(
queues: &[<<T as Config>::MessageProcessor as ProcessMessage>::Origin],
) {
for queue in queues {
BookStateFor::<T>::insert(queue, empty_book::<T>());
}
for queue in queues {
knit::<T>(queue);
for queue in queues.iter() {
crate::Pallet::<T>::enqueue_message(msg("1"), queue.clone());
}
assert_ring::<T>(queues);
}
Expand Down
Loading