Skip to content

Commit

Permalink
Integrate HRMP (paritytech#258)
Browse files Browse the repository at this point in the history
* HRMP message ingestion

* Plumb hrmp_watermark to build_collation

* Plumb hrmp_watermark to ValidationResult

* Plumb hrmp outbound messages

* Implement message-broker part of HRMP

* Kill UPWARD_MESSAGES as well

Otherwise, they will get resent each block

* Add sudo versions for easier testing

* Remove the xcmp module

Not useful for the moment

* Doc for HRMP message handler

* Estimate the weight upper bound for on_finalize

* Remove a redundant type annotation

* fix spelling of a method

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <[email protected]>

* Deabbreviate dmp and hrmp in the message ingestion type

* Don't use binary_search since it's broken by a following rotate

Instead use the linear search. We can afford linear search here since
due to limited scalability of HRMP we can only have at most a couple of
dozens of channels.

* Fix the watermark

Co-authored-by: Bastian Köcher <[email protected]>
  • Loading branch information
pepyakin and bkchr authored Dec 15, 2020
1 parent ea10fa8 commit aba8f46
Show file tree
Hide file tree
Showing 8 changed files with 293 additions and 81 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

90 changes: 80 additions & 10 deletions collator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
use cumulus_network::WaitToAnnounce;
use cumulus_primitives::{
inherents::{DownwardMessagesType, DOWNWARD_MESSAGES_IDENTIFIER, VALIDATION_DATA_IDENTIFIER},
well_known_keys, ValidationData,
inherents::{self, VALIDATION_DATA_IDENTIFIER},
well_known_keys, ValidationData, InboundHrmpMessage, OutboundHrmpMessage, InboundDownwardMessage,
};
use cumulus_runtime::ParachainBlockData;

Expand Down Expand Up @@ -52,7 +52,7 @@ use log::{debug, error, info, trace};

use futures::prelude::*;

use std::{marker::PhantomData, sync::Arc, time::Duration};
use std::{marker::PhantomData, sync::Arc, time::Duration, collections::BTreeMap};

use parking_lot::Mutex;

Expand Down Expand Up @@ -146,8 +146,9 @@ where
/// for.
///
/// Returns `None` in case of an error.
fn retrieve_dmq_contents(&self, relay_parent: PHash) -> Option<DownwardMessagesType> {
self.polkadot_client
fn retrieve_dmq_contents(&self, relay_parent: PHash) -> Option<Vec<InboundDownwardMessage>> {
self
.polkadot_client
.runtime_api()
.dmq_contents_with_context(
&BlockId::hash(relay_parent),
Expand All @@ -164,6 +165,31 @@ where
.ok()
}

/// Returns channels contents for each inbound HRMP channel addressed to the parachain we are
/// collating for.
///
/// Empty channels are also included.
fn retrieve_all_inbound_hrmp_channel_contents(&self, relay_parent: PHash)
-> Option<BTreeMap<ParaId, Vec<InboundHrmpMessage>>>
{
self
.polkadot_client
.runtime_api()
.inbound_hrmp_channels_contents_with_context(
&BlockId::hash(relay_parent),
sp_core::ExecutionContext::Importing,
self.para_id,
)
.map_err(|e| {
error!(
target: "cumulus-collator",
"An error occured during requesting the inbound HRMP messages for {}: {:?}",
relay_parent, e,
);
})
.ok()
}

/// Get the inherent data with validation function parameters injected
fn inherent_data(
&mut self,
Expand Down Expand Up @@ -193,9 +219,18 @@ where
})
.ok()?;

let downward_messages = self.retrieve_dmq_contents(relay_parent)?;
let message_ingestion_data = {
let downward_messages = self.retrieve_dmq_contents(relay_parent)?;
let horizontal_messages = self.retrieve_all_inbound_hrmp_channel_contents(relay_parent)?;

inherents::MessageIngestionType {
downward_messages,
horizontal_messages,
}
};

inherent_data
.put_data(DOWNWARD_MESSAGES_IDENTIFIER, &downward_messages)
.put_data(inherents::MESSAGE_INGESTION_IDENTIFIER, &message_ingestion_data)
.map_err(|e| {
error!(
target: "cumulus-collator",
Expand Down Expand Up @@ -296,15 +331,50 @@ where
None => 0,
};

let horizontal_messages = sp_io::storage::get(well_known_keys::HRMP_OUTBOUND_MESSAGES);
let horizontal_messages = match horizontal_messages
.map(|v| Vec::<OutboundHrmpMessage>::decode(&mut &v[..]))
{
Some(Ok(horizontal_messages)) => horizontal_messages,
Some(Err(e)) => {
error!(
target: "cumulus-collator",
"Failed to decode the horizontal messages: {:?}",
e
);
return None
}
None => Vec::new(),
};

let hrmp_watermark = sp_io::storage::get(well_known_keys::HRMP_WATERMARK);
let hrmp_watermark = match hrmp_watermark.map(|v| PBlockNumber::decode(&mut &v[..])) {
Some(Ok(hrmp_watermark)) => hrmp_watermark,
Some(Err(e)) => {
error!(
target: "cumulus-collator",
"Failed to decode the HRMP watermark: {:?}",
e
);
return None
}
None => {
// If the runtime didn't set `HRMP_WATERMARK`, then it means no messages were
// supplied via the message ingestion inherent. Assuming that the PVF/runtime
// checks that legitly there are no pending messages we can therefore move the
// watermark up to the relay-block number.
relay_block_number
}
};

Some(Collation {
upward_messages,
new_validation_code: new_validation_code.map(Into::into),
head_data,
proof_of_validity: PoV { block_data },
processed_downward_messages,
// TODO!
horizontal_messages: Vec::new(),
hrmp_watermark: relay_block_number,
horizontal_messages,
hrmp_watermark,
})
})
}
Expand Down
163 changes: 146 additions & 17 deletions message-broker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,57 +27,117 @@ use frame_support::{
weights::{DispatchClass, Weight},
StorageValue,
};
use frame_system::ensure_none;
use frame_system::{ensure_none, ensure_root};
use sp_inherents::{InherentData, InherentIdentifier, MakeFatalError, ProvideInherent};
use sp_std::{cmp, prelude::*};

use cumulus_primitives::{
inherents::{DownwardMessagesType, DOWNWARD_MESSAGES_IDENTIFIER},
well_known_keys, DownwardMessageHandler, InboundDownwardMessage, UpwardMessage,
inherents::{MessageIngestionType, MESSAGE_INGESTION_IDENTIFIER},
well_known_keys, DownwardMessageHandler, HrmpMessageHandler, OutboundHrmpMessage,
UpwardMessage, ParaId,
};

// TODO: these should be not a constant, but sourced from the relay-chain configuration.
const UMP_MSG_NUM_PER_CANDIDATE: usize = 5;
const HRMP_MSG_NUM_PER_CANDIDATE: usize = 5;

/// Configuration trait of the message broker pallet.
pub trait Config: frame_system::Config {
/// The downward message handlers that will be informed when a message is received.
type DownwardMessageHandlers: DownwardMessageHandler;
/// The HRMP message handlers that will be informed when a message is received.
type HrmpMessageHandlers: HrmpMessageHandler;
}

decl_storage! {
trait Store for Module<T: Config> as MessageBroker {
PendingUpwardMessages: Vec<UpwardMessage>;

/// Essentially `OutboundHrmpMessage`s grouped by the recipients.
OutboundHrmpMessages: map hasher(twox_64_concat) ParaId => Vec<Vec<u8>>;
/// HRMP channels with the given recipients are awaiting to be processed. If a `ParaId` is
/// present in this vector then `OutboundHrmpMessages` for it should be not empty.
NonEmptyHrmpChannels: Vec<ParaId>;
}
}

decl_module! {
pub struct Module<T: Config> for enum Call where origin: T::Origin {
/// An entrypoint for an inherent to deposit downward messages into the runtime. It accepts
/// and processes the list of downward messages.
/// and processes the list of downward messages and inbound HRMP messages.
#[weight = (10, DispatchClass::Mandatory)]
fn receive_downward_messages(origin, messages: Vec<InboundDownwardMessage>) {
fn ingest_inbound_messages(origin, messages: MessageIngestionType) {
ensure_none(origin)?;

let messages_len = messages.len() as u32;
for message in messages {
T::DownwardMessageHandlers::handle_downward_message(message);
let MessageIngestionType {
downward_messages,
horizontal_messages,
} = messages;

let dm_count = downward_messages.len() as u32;
for downward_message in downward_messages {
T::DownwardMessageHandlers::handle_downward_message(downward_message);
}

// Store the processed_downward_messages here so that it's will be accessible from
// PVF's `validate_block` wrapper and collation pipeline.
storage::unhashed::put(
well_known_keys::PROCESSED_DOWNWARD_MESSAGES,
&messages_len,
&dm_count,
);

let mut hrmp_watermark = None;
for (sender, channel_contents) in horizontal_messages {
for horizontal_message in channel_contents {
if hrmp_watermark
.map(|w| w < horizontal_message.sent_at)
.unwrap_or(true)
{
hrmp_watermark = Some(horizontal_message.sent_at);
}

T::HrmpMessageHandlers::handle_hrmp_message(sender, horizontal_message);
}
}

// If we processed at least one message, then advance watermark to that location.
if let Some(hrmp_watermark) = hrmp_watermark {
storage::unhashed::put(
well_known_keys::HRMP_WATERMARK,
&hrmp_watermark,
);
}
}

#[weight = (1_000, DispatchClass::Operational)]
fn sudo_send_upward_message(origin, message: UpwardMessage) {
ensure_root(origin)?;
let _ = Self::send_upward_message(message);
}

#[weight = (1_000, DispatchClass::Operational)]
fn sudo_send_hrmp_message(origin, message: OutboundHrmpMessage) {
ensure_root(origin)?;
let _ = Self::send_hrmp_message(message);
}

fn on_initialize() -> Weight {
// Reads and writes performed by `on_finalize`.
T::DbWeight::get().reads_writes(1, 2)
let mut weight = T::DbWeight::get().writes(3);
storage::unhashed::kill(well_known_keys::HRMP_WATERMARK);
storage::unhashed::kill(well_known_keys::UPWARD_MESSAGES);
storage::unhashed::kill(well_known_keys::HRMP_OUTBOUND_MESSAGES);

// Reads and writes performed by `on_finalize`. This may actually turn out to be lower,
// but we should err on the safe side.
weight += T::DbWeight::get().reads_writes(
2 + HRMP_MSG_NUM_PER_CANDIDATE as u64,
4 + HRMP_MSG_NUM_PER_CANDIDATE as u64,
);

weight
}

fn on_finalize() {
// TODO: this should be not a constant, but sourced from the relay-chain configuration.
const UMP_MSG_NUM_PER_CANDIDATE: usize = 5;

<Self as Store>::PendingUpwardMessages::mutate(|up| {
let num = cmp::min(UMP_MSG_NUM_PER_CANDIDATE, up.len());
storage::unhashed::put(
Expand All @@ -86,6 +146,50 @@ decl_module! {
);
*up = up.split_off(num);
});

// Sending HRMP messages is a little bit more involved. On top of the number of messages
// per block limit, there is also a constraint that it's possible to send only a single
// message to a given recipient per candidate.
let mut non_empty_hrmp_channels = NonEmptyHrmpChannels::get();
let outbound_hrmp_num = cmp::min(HRMP_MSG_NUM_PER_CANDIDATE, non_empty_hrmp_channels.len());
let mut outbound_hrmp_messages = Vec::with_capacity(outbound_hrmp_num);
let mut prune_empty = Vec::with_capacity(outbound_hrmp_num);

for &recipient in non_empty_hrmp_channels.iter().take(outbound_hrmp_num) {
let (message_payload, became_empty) =
<Self as Store>::OutboundHrmpMessages::mutate(&recipient, |v| {
// this panics if `v` is empty. However, we are iterating only once over non-empty
// channels, therefore it cannot panic.
let first = v.remove(0);
let became_empty = v.is_empty();
(first, became_empty)
});

outbound_hrmp_messages.push(OutboundHrmpMessage {
recipient,
data: message_payload,
});
if became_empty {
prune_empty.push(recipient);
}
}

// Prune hrmp channels that became empty. Additionally, because it may so happen that we
// only gave attention to some channels in `non_empty_hrmp_channels` it's important to
// change the order. Otherwise, the next `on_finalize` we will again give attention
// only to those channels that happen to be in the beginning, until they are emptied.
// This leads to "starvation" of the channels near to the end.
//
// To mitigate this we shift all processed elements towards the end of the vector using
// `rotate_left`. To get intution how it works see the examples in its rustdoc.
non_empty_hrmp_channels.retain(|x| !prune_empty.contains(x));
non_empty_hrmp_channels.rotate_left(outbound_hrmp_num - prune_empty.len());

<Self as Store>::NonEmptyHrmpChannels::put(non_empty_hrmp_channels);
storage::unhashed::put(
well_known_keys::HRMP_OUTBOUND_MESSAGES,
&outbound_hrmp_messages,
);
}
}
}
Expand All @@ -96,23 +200,48 @@ pub enum SendUpErr {
TooBig,
}

/// An error that can be raised upon sending a horizontal message.
pub enum SendHorizonalErr {
/// The message sent is too big.
TooBig,
/// There is no channel to the specified destination.
NoChannel,
}

impl<T: Config> Module<T> {
pub fn send_upward_message(message: UpwardMessage) -> Result<(), SendUpErr> {
// TODO: check the message against the limit. The limit should be sourced from the
// relay-chain configuration.
<Self as Store>::PendingUpwardMessages::append(message);
Ok(())
}

pub fn send_hrmp_message(message: OutboundHrmpMessage) -> Result<(), SendHorizonalErr> {
// TODO:
// (a) check against the size limit sourced from the relay-chain configuration
// (b) check if the channel to the recipient is actually opened.

let OutboundHrmpMessage { recipient, data } = message;
<Self as Store>::OutboundHrmpMessages::append(&recipient, data);

<Self as Store>::NonEmptyHrmpChannels::mutate(|v| {
if !v.contains(&recipient) {
v.push(recipient);
}
});

Ok(())
}
}

impl<T: Config> ProvideInherent for Module<T> {
type Call = Call<T>;
type Error = MakeFatalError<()>;
const INHERENT_IDENTIFIER: InherentIdentifier = DOWNWARD_MESSAGES_IDENTIFIER;
const INHERENT_IDENTIFIER: InherentIdentifier = MESSAGE_INGESTION_IDENTIFIER;

fn create_inherent(data: &InherentData) -> Option<Self::Call> {
data.get_data::<DownwardMessagesType>(&DOWNWARD_MESSAGES_IDENTIFIER)
data.get_data::<MessageIngestionType>(&MESSAGE_INGESTION_IDENTIFIER)
.expect("Downward messages inherent data failed to decode")
.map(|msgs| Call::receive_downward_messages(msgs))
.map(|msgs| Call::ingest_inbound_messages(msgs))
}
}
2 changes: 2 additions & 0 deletions primitives/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ sp-inherents = { git = "https://github.com/paritytech/substrate", default-featur
sp-std = { git = "https://github.com/paritytech/substrate", default-features = false, branch = "master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", default-features = false, branch = "master" }
sc-chain-spec = { git = "https://github.com/paritytech/substrate", optional = true, branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", default-features = false, branch = "master" }

# Polkadot dependencies
polkadot-parachain = { git = "https://github.com/paritytech/polkadot", default-features = false, branch = "master" }
Expand All @@ -33,4 +34,5 @@ std = [
"sp-inherents/std",
"polkadot-core-primitives/std",
"sp-runtime/std",
"sp-core/std",
]
Loading

0 comments on commit aba8f46

Please sign in to comment.