Skip to content

Commit

Permalink
Return events from blocks skipped over during Finalization, too (#473)
Browse files Browse the repository at this point in the history
* make subscription stream generic in EventSubscription

* rename to EventSub/FinalizedEventSub

* wip fix some lifetimes so that event sub can depend on client in stream

* Cargo fmt + comment tweaks

* Add another comment

* factor out prev block header fetching into a separate function to tidy

* add a comment

* remove ListOrValue as it's unused

* Into<u128> on BlockNumber to simplify things

* clippy

* Fix an example and clippy

* simplify iterator now we are Into<u128>

* Into<u64> instead because it needs serializing, and test core logic

* Tweak missing block test to fill in >=2 holes

* tweak a comment
  • Loading branch information
jsdw authored Mar 10, 2022
1 parent a091d2b commit 4144a76
Show file tree
Hide file tree
Showing 9 changed files with 206 additions and 59 deletions.
4 changes: 2 additions & 2 deletions codegen/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,11 +343,11 @@ impl RuntimeGenerator {
::subxt::events::at::<T, Event>(self.client, block_hash).await
}

pub async fn subscribe(&self) -> Result<::subxt::events::EventSubscription<'a, T, Event>, ::subxt::BasicError> {
pub async fn subscribe(&self) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::EventSub<T::Header>, T, Event>, ::subxt::BasicError> {
::subxt::events::subscribe::<T, Event>(self.client).await
}

pub async fn subscribe_finalized(&self) -> Result<::subxt::events::EventSubscription<'a, T, Event>, ::subxt::BasicError> {
pub async fn subscribe_finalized(&self) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::FinalizedEventSub<'a, T::Header>, T, Event>, ::subxt::BasicError> {
::subxt::events::subscribe_finalized::<T, Event>(self.client).await
}
}
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/rpc_call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.await?
.to_runtime_api::<polkadot::RuntimeApi<DefaultConfig, DefaultExtra<DefaultConfig>>>();

let block_number = 1;
let block_number = 1u32;

let block_hash = api
.client
Expand Down
1 change: 0 additions & 1 deletion subxt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ futures = "0.3.13"
hex = "0.4.3"
jsonrpsee = { version = "0.8.0", features = ["async-client", "client-ws-transport"] }
log = "0.4.14"
num-traits = { version = "0.2.14", default-features = false }
serde = { version = "1.0.124", features = ["derive"] }
serde_json = "1.0.64"
thiserror = "1.0.24"
Expand Down
3 changes: 2 additions & 1 deletion subxt/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ pub trait Config: 'static {
+ Default
+ Copy
+ core::hash::Hash
+ core::str::FromStr;
+ core::str::FromStr
+ Into<u64>;

/// The output of the `Hashing` function.
type Hash: Parameter
Expand Down
153 changes: 128 additions & 25 deletions subxt/src/events/event_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,18 @@ use crate::{
use codec::Decode;
use derivative::Derivative;
use futures::{
future::Either,
stream::{
self,
BoxStream,
},
Future,
FutureExt,
Stream,
StreamExt,
};
use jsonrpsee::core::client::Subscription;
use sp_runtime::traits::Header;
use std::{
marker::Unpin,
task::Poll,
Expand All @@ -51,36 +57,111 @@ pub use super::{
/// [`Events::subscribe_finalized()`] if that is important.
///
/// **Note:** This function is hidden from the documentation
/// and is exposed only to be called via the codegen. Thus, prefer to use
/// `api.events().subscribe()` over calling this directly.
/// and is exposed only to be called via the codegen. It may
/// break between minor releases.
#[doc(hidden)]
pub async fn subscribe<T: Config, Evs: Decode + 'static>(
client: &'_ Client<T>,
) -> Result<EventSubscription<'_, T, Evs>, BasicError> {
pub async fn subscribe<'a, T: Config, Evs: Decode + 'static>(
client: &'a Client<T>,
) -> Result<EventSubscription<'a, EventSub<T::Header>, T, Evs>, BasicError> {
let block_subscription = client.rpc().subscribe_blocks().await?;
Ok(EventSubscription::new(client, block_subscription))
}

/// Subscribe to events from finalized blocks.
///
/// **Note:** This function is hidden from the documentation
/// and is exposed only to be called via the codegen. Thus, prefer to use
/// `api.events().subscribe_finalized()` over calling this directly.
/// and is exposed only to be called via the codegen. It may
/// break between minor releases.
#[doc(hidden)]
pub async fn subscribe_finalized<T: Config, Evs: Decode + 'static>(
client: &'_ Client<T>,
) -> Result<EventSubscription<'_, T, Evs>, BasicError> {
let block_subscription = client.rpc().subscribe_finalized_blocks().await?;
Ok(EventSubscription::new(client, block_subscription))
pub async fn subscribe_finalized<'a, T: Config, Evs: Decode + 'static>(
client: &'a Client<T>,
) -> Result<EventSubscription<'a, FinalizedEventSub<'a, T::Header>, T, Evs>, BasicError> {
// fetch the last finalised block details immediately, so that we'll get
// events for each block after this one.
let last_finalized_block_hash = client.rpc().finalized_head().await?;
let last_finalized_block_number = client
.rpc()
.header(Some(last_finalized_block_hash))
.await?
.map(|h| (*h.number()).into());

// Fill in any gaps between the block above and the finalized blocks reported.
let block_subscription = subscribe_to_block_headers_filling_in_gaps(
client,
last_finalized_block_number,
client.rpc().subscribe_finalized_blocks().await?,
);

Ok(EventSubscription::new(client, Box::pin(block_subscription)))
}

/// Take a subscription that returns block headers, and if any block numbers are missed out
/// betweem the block number provided and what's returned from the subscription, we fill in
/// the gaps and get hold of all intermediate block headers.
///
/// **Note:** This is exposed so that we can run integration tests on it, but otherwise
/// should not be used directly and may break between minor releases.
#[doc(hidden)]
pub fn subscribe_to_block_headers_filling_in_gaps<'a, S, E, T: Config>(
client: &'a Client<T>,
mut last_block_num: Option<u64>,
sub: S,
) -> impl Stream<Item = Result<T::Header, BasicError>> + Send + 'a
where
S: Stream<Item = Result<T::Header, E>> + Send + 'a,
E: Into<BasicError> + Send + 'static,
{
sub.flat_map(move |s| {
// Get the header, or return a stream containing just the error. Our EventSubscription
// stream will return `None` as soon as it hits an error like this.
let header = match s {
Ok(header) => header,
Err(e) => return Either::Left(stream::once(async { Err(e.into()) })),
};

// We want all previous details up to, but not including this current block num.
let end_block_num = (*header.number()).into();

// This is one after the last block we returned details for last time.
let start_block_num = last_block_num.map(|n| n + 1).unwrap_or(end_block_num);

// Iterate over all of the previous blocks we need headers for, ignoring the current block
// (which we already have the header info for):
let previous_headers = stream::iter(start_block_num..end_block_num)
.then(move |n| {
async move {
let hash = client.rpc().block_hash(Some(n.into())).await?;
let header = client.rpc().header(hash).await?;
Ok::<_, BasicError>(header)
}
})
.filter_map(|h| async { h.transpose() });

// On the next iteration, we'll get details starting just after this end block.
last_block_num = Some(end_block_num);

// Return a combination of any previous headers plus the new header.
Either::Right(previous_headers.chain(stream::once(async { Ok(header) })))
})
}

/// A `jsonrpsee` Subscription. This forms a part of the `EventSubscription` type handed back
/// in codegen from `subscribe_finalized`, and is exposed to be used in codegen.
#[doc(hidden)]
pub type FinalizedEventSub<'a, Header> = BoxStream<'a, Result<Header, BasicError>>;

/// A `jsonrpsee` Subscription. This forms a part of the `EventSubscription` type handed back
/// in codegen from `subscribe`, and is exposed to be used in codegen.
#[doc(hidden)]
pub type EventSub<Item> = Subscription<Item>;

/// A subscription to events that implements [`Stream`], and returns [`Events`] objects for each block.
#[derive(Derivative)]
#[derivative(Debug(bound = ""))]
pub struct EventSubscription<'a, T: Config, Evs: 'static> {
#[derivative(Debug(bound = "Sub: std::fmt::Debug"))]
pub struct EventSubscription<'a, Sub, T: Config, Evs: 'static> {
finished: bool,
client: &'a Client<T>,
block_header_subscription: Subscription<T::Header>,
block_header_subscription: Sub,
#[derivative(Debug = "ignore")]
at: Option<
std::pin::Pin<
Expand All @@ -90,11 +171,12 @@ pub struct EventSubscription<'a, T: Config, Evs: 'static> {
_event_type: std::marker::PhantomData<Evs>,
}

impl<'a, T: Config, Evs: Decode> EventSubscription<'a, T, Evs> {
fn new(
client: &'a Client<T>,
block_header_subscription: Subscription<T::Header>,
) -> Self {
impl<'a, Sub, T: Config, Evs: Decode, E: Into<BasicError>>
EventSubscription<'a, Sub, T, Evs>
where
Sub: Stream<Item = Result<T::Header, E>> + Unpin + 'a,
{
fn new(client: &'a Client<T>, block_header_subscription: Sub) -> Self {
EventSubscription {
finished: false,
client,
Expand All @@ -111,7 +193,10 @@ impl<'a, T: Config, Evs: Decode> EventSubscription<'a, T, Evs> {
}
}

impl<'a, T: Config, Evs: Decode> Unpin for EventSubscription<'a, T, Evs> {}
impl<'a, T: Config, Sub: Unpin, Evs: Decode> Unpin
for EventSubscription<'a, Sub, T, Evs>
{
}

// We want `EventSubscription` to implement Stream. The below implementation is the rather verbose
// way to roughly implement the following function:
Expand All @@ -130,7 +215,13 @@ impl<'a, T: Config, Evs: Decode> Unpin for EventSubscription<'a, T, Evs> {}
//
// The advantage of this manual implementation is that we have a named type that we (and others)
// can derive things on, store away, alias etc.
impl<'a, T: Config, Evs: Decode> Stream for EventSubscription<'a, T, Evs> {
impl<'a, Sub, T, Evs, E> Stream for EventSubscription<'a, Sub, T, Evs>
where
T: Config,
Evs: Decode,
Sub: Stream<Item = Result<T::Header, E>> + Unpin + 'a,
E: Into<BasicError>,
{
type Item = Result<Events<'a, T, Evs>, BasicError>;

fn poll_next(
Expand All @@ -155,7 +246,6 @@ impl<'a, T: Config, Evs: Decode> Stream for EventSubscription<'a, T, Evs> {
return Poll::Ready(Some(Err(e.into())))
}
Some(Ok(block_header)) => {
use sp_runtime::traits::Header;
// Note [jsdw]: We may be able to get rid of the per-item allocation
// with https://github.com/oblique/reusable-box-future.
self.at = Some(Box::pin(at(self.client, block_header.hash())));
Expand All @@ -181,9 +271,22 @@ mod test {
use super::*;

// Ensure `EventSubscription` can be sent; only actually a compile-time check.
#[test]
#[allow(unused)]
fn check_sendability() {
fn assert_send<T: Send>() {}
assert_send::<EventSubscription<crate::DefaultConfig, ()>>();
assert_send::<
EventSubscription<
EventSub<<crate::DefaultConfig as Config>::Header>,
crate::DefaultConfig,
(),
>,
>();
assert_send::<
EventSubscription<
FinalizedEventSub<<crate::DefaultConfig as Config>::Header>,
crate::DefaultConfig,
(),
>,
>();
}
}
3 changes: 3 additions & 0 deletions subxt/src/events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ pub use decoding::EventsDecodingError;
pub use event_subscription::{
subscribe,
subscribe_finalized,
subscribe_to_block_headers_filling_in_gaps,
EventSub,
EventSubscription,
FinalizedEventSub,
};
pub use events_type::{
at,
Expand Down
43 changes: 16 additions & 27 deletions subxt/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,16 +96,6 @@ pub enum NumberOrHex {
Hex(U256),
}

/// RPC list or value wrapper.
#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[serde(untagged)]
pub enum ListOrValue<T> {
/// A list of values of given type.
List(Vec<T>),
/// A single value of given type.
Value(T),
}

/// Alias for the type of a block returned by `chain_getBlock`
pub type ChainBlock<T> =
SignedBlock<Block<<T as Config>::Header, <T as Config>::Extrinsic>>;
Expand All @@ -120,11 +110,19 @@ impl From<NumberOrHex> for BlockNumber {
}
}

impl From<u32> for BlockNumber {
fn from(x: u32) -> Self {
NumberOrHex::Number(x.into()).into()
// All unsigned ints can be converted into a BlockNumber:
macro_rules! into_block_number {
($($t: ty)+) => {
$(
impl From<$t> for BlockNumber {
fn from(x: $t) -> Self {
NumberOrHex::Number(x.into()).into()
}
}
)+
}
}
into_block_number!(u8 u16 u32 u64);

/// Arbitrary properties defined in the chain spec as a JSON object.
pub type SystemProperties = serde_json::Map<String, serde_json::Value>;
Expand Down Expand Up @@ -285,16 +283,11 @@ impl<T: Config> Rpc<T> {

/// Fetch the genesis hash
pub async fn genesis_hash(&self) -> Result<T::Hash, BasicError> {
let block_zero = Some(ListOrValue::Value(NumberOrHex::Number(0)));
let block_zero = 0u32;
let params = rpc_params![block_zero];
let list_or_value: ListOrValue<Option<T::Hash>> =
let genesis_hash: Option<T::Hash> =
self.client.request("chain_getBlockHash", params).await?;
match list_or_value {
ListOrValue::Value(genesis_hash) => {
genesis_hash.ok_or_else(|| "Genesis hash not found".into())
}
ListOrValue::List(_) => Err("Expected a Value, got a List".into()),
}
genesis_hash.ok_or_else(|| "Genesis hash not found".into())
}

/// Fetch the metadata
Expand Down Expand Up @@ -346,13 +339,9 @@ impl<T: Config> Rpc<T> {
&self,
block_number: Option<BlockNumber>,
) -> Result<Option<T::Hash>, BasicError> {
let block_number = block_number.map(ListOrValue::Value);
let params = rpc_params![block_number];
let list_or_value = self.client.request("chain_getBlockHash", params).await?;
match list_or_value {
ListOrValue::Value(hash) => Ok(hash),
ListOrValue::List(_) => Err("Expected a Value, got a List".into()),
}
let block_hash = self.client.request("chain_getBlockHash", params).await?;
Ok(block_hash)
}

/// Get a block hash of the latest finalized block
Expand Down
4 changes: 2 additions & 2 deletions subxt/tests/integration/codegen/polkadot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27849,13 +27849,13 @@ pub mod api {
}
pub async fn subscribe(
&self,
) -> Result<::subxt::events::EventSubscription<'a, T, Event>, ::subxt::BasicError>
) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::EventSub<T::Header>, T, Event>, ::subxt::BasicError>
{
::subxt::events::subscribe::<T, Event>(self.client).await
}
pub async fn subscribe_finalized(
&self,
) -> Result<::subxt::events::EventSubscription<'a, T, Event>, ::subxt::BasicError>
) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::FinalizedEventSub<'a, T::Header>, T, Event>, ::subxt::BasicError>
{
::subxt::events::subscribe_finalized::<T, Event>(self.client).await
}
Expand Down
Loading

0 comments on commit 4144a76

Please sign in to comment.