Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: keep processing a block's events after encountering a dispatch error #310

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use sp_core::Bytes;

/// Raw bytes for an Event
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Clone))]
pub struct RawEvent {
/// The name of the pallet from whence the Event originated.
pub pallet: String,
Expand Down
236 changes: 214 additions & 22 deletions src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,24 +44,60 @@ use crate::{
/// Event subscription simplifies filtering a storage change set stream for
/// events of interest.
pub struct EventSubscription<'a, T: Config> {
subscription: EventStorageSubscription<T>,
decoder: &'a EventsDecoder<T>,
block_reader: BlockReader<'a, T>,
block: Option<T::Hash>,
extrinsic: Option<usize>,
event: Option<(&'static str, &'static str)>,
events: VecDeque<RawEvent>,
events: VecDeque<Raw>,
finished: bool,
}

enum BlockReader<'a, T: Config> {
Decoder {
subscription: EventStorageSubscription<T>,
decoder: &'a EventsDecoder<T>,
},
/// Mock event listener for unit tests
#[cfg(test)]
Mock(Box<dyn Iterator<Item = (T::Hash, Result<Vec<(Phase, Raw)>, Error>)>>),
}

impl<'a, T: Config> BlockReader<'a, T> {
async fn next(&mut self) -> Option<(T::Hash, Result<Vec<(Phase, Raw)>, Error>)> {
match self {
BlockReader::Decoder {
subscription,
decoder,
} => {
let change_set = subscription.next().await?;
let events: Result<Vec<_>, _> = change_set
.changes
.into_iter()
.filter_map(|(_key, change)| {
Some(decoder.decode_events(&mut change?.0.as_slice()))
})
.collect();

let flattened_events = events.map(|x| x.into_iter().flatten().collect());
Some((change_set.block, flattened_events))
}
#[cfg(test)]
BlockReader::Mock(it) => it.next(),
}
}
}

impl<'a, T: Config> EventSubscription<'a, T> {
/// Creates a new event subscription.
pub fn new(
subscription: EventStorageSubscription<T>,
decoder: &'a EventsDecoder<T>,
) -> Self {
Self {
subscription,
decoder,
block_reader: BlockReader::Decoder {
subscription,
decoder,
},
block: None,
extrinsic: None,
event: None,
Expand Down Expand Up @@ -89,44 +125,44 @@ impl<'a, T: Config> EventSubscription<'a, T> {
/// Gets the next event.
pub async fn next(&mut self) -> Option<Result<RawEvent, Error>> {
loop {
if let Some(event) = self.events.pop_front() {
return Some(Ok(event))
if let Some(raw_event) = self.events.pop_front() {
match raw_event {
Raw::Event(event) => return Some(Ok(event)),
Raw::Error(err) => return Some(Err(err.into())),
};
}
if self.finished {
return None
}
// always return None if subscription has closed
let change_set = self.subscription.next().await?;
let (received_hash, events) = self.block_reader.next().await?;
if let Some(hash) = self.block.as_ref() {
if &change_set.block == hash {
if &received_hash == hash {
self.finished = true;
} else {
continue
}
}
for (_key, data) in change_set.changes {
if let Some(data) = data {
let raw_events = match self.decoder.decode_events(&mut &data.0[..]) {
Ok(events) => events,
Err(error) => return Some(Err(error)),
};

match events {
Err(err) => return Some(Err(err)),
Ok(raw_events) => {
for (phase, raw) in raw_events {
if let Phase::ApplyExtrinsic(i) = phase {
if let Some(ext_index) = self.extrinsic {
if i as usize != ext_index {
continue
}
}
let event = match raw {
Raw::Event(event) => event,
Raw::Error(err) => return Some(Err(err.into())),
};
if let Some((module, variant)) = self.event {
if event.pallet != module || event.variant != variant {
continue
if let Raw::Event(ref event) = raw {
if event.pallet != module || event.variant != variant
{
continue
}
}
}
self.events.push_back(event);
self.events.push_back(raw);
}
}
}
Expand Down Expand Up @@ -227,3 +263,159 @@ where
}
}
}

#[cfg(test)]
mod tests {
use crate::RuntimeError;

use super::*;
use sp_core::H256;
#[derive(Clone)]
struct MockConfig;

impl Config for MockConfig {
type Index = u32;
type BlockNumber = u32;
type Hash = sp_core::H256;
type Hashing = sp_runtime::traits::BlakeTwo256;
type AccountId = sp_runtime::AccountId32;
type Address = sp_runtime::MultiAddress<Self::AccountId, u32>;
type Header = sp_runtime::generic::Header<
Self::BlockNumber,
sp_runtime::traits::BlakeTwo256,
>;
type Signature = sp_runtime::MultiSignature;
type Extrinsic = sp_runtime::OpaqueExtrinsic;
}

fn named_event(event_name: &str) -> RawEvent {
RawEvent {
data: sp_core::Bytes::from(Vec::new()),
pallet: event_name.to_string(),
variant: event_name.to_string(),
pallet_index: 0,
variant_index: 0,
}
}

fn raw_event(id: u8) -> RawEvent {
RawEvent {
data: sp_core::Bytes::from(Vec::new()),
pallet: "SomePallet".to_string(),
variant: "SomeVariant".to_string(),
pallet_index: id,
variant_index: id,
}
}

fn event(id: u8) -> Raw {
Raw::Event(raw_event(id))
}

#[async_std::test]
async fn test_error_does_not_stop_subscription() {
let mut subscription: EventSubscription<MockConfig> = EventSubscription {
block_reader: BlockReader::Mock(Box::new(
vec![(
H256::from([0; 32]),
Ok(vec![
(
Phase::ApplyExtrinsic(0),
Raw::Error(RuntimeError::BadOrigin),
),
(Phase::ApplyExtrinsic(0), event(1)),
]),
)]
.into_iter(),
)),
block: None,
extrinsic: None,
event: None,
events: Default::default(),
finished: false,
};

assert!(matches!(
subscription.next().await.unwrap().unwrap_err(),
Error::Runtime(RuntimeError::BadOrigin)
));
assert_eq!(subscription.next().await.unwrap().unwrap(), raw_event(1));
assert!(subscription.next().await.is_none());
}

#[async_std::test]
/// test that filters work correctly, and are independent of each other
async fn test_filters() {
let mut events = vec![];
// create all events
for block_hash in [H256::from([0; 32]), H256::from([1; 32])] {
for phase in [Phase::ApplyExtrinsic(0), Phase::ApplyExtrinsic(1)] {
for event in [named_event("a"), named_event("b")] {
events.push((block_hash, phase.clone(), event))
}
}
}
// set variant index so we can uniquely identify the event
events.iter_mut().enumerate().for_each(|(idx, event)| {
event.2.variant_index = idx as u8;
});

for block_filter in [None, Some(H256::from([1; 32]))] {
for extrinsic_filter in [None, Some(1)] {
for event_filter in [None, Some(("b", "b"))] {
let mut subscription: EventSubscription<MockConfig> =
EventSubscription {
block_reader: BlockReader::Mock(Box::new(
vec![
(
events[0].0,
Ok(events
.iter()
.take(4)
.map(|(_, phase, event)| {
(phase.clone(), Raw::Event(event.clone()))
})
.collect()),
),
(
events[4].0,
Ok(events
.iter()
.skip(4)
.take(4)
.map(|(_, phase, event)| {
(phase.clone(), Raw::Event(event.clone()))
})
.collect()),
),
]
.into_iter(),
)),
block: block_filter.clone(),
extrinsic: extrinsic_filter.clone(),
event: event_filter.clone(),
events: Default::default(),
finished: false,
};
let mut expected_events = events.clone();
if let Some(hash) = block_filter {
expected_events.retain(|(h, _, _)| h == &hash);
}
if let Some(idx) = extrinsic_filter {
expected_events.retain(|(_, phase, _)| matches!(phase, Phase::ApplyExtrinsic(i) if *i as usize == idx));
}
if let Some(name) = event_filter {
expected_events.retain(|(_, _, event)| event.pallet == name.0);
}
for expected_event in expected_events {
assert_eq!(
subscription.next().await.unwrap().unwrap(),
expected_event.2
);
}
assert!(subscription.next().await.is_none());
}
}
}
}
}