diff --git a/codegen/src/api/mod.rs b/codegen/src/api/mod.rs index c2cd719d3a..5b706443db 100644 --- a/codegen/src/api/mod.rs +++ b/codegen/src/api/mod.rs @@ -343,11 +343,11 @@ impl RuntimeGenerator { ::subxt::events::at::(self.client, block_hash).await } - pub async fn subscribe(&self) -> Result<::subxt::events::EventSubscription<'a, T, Event>, ::subxt::BasicError> { + pub async fn subscribe(&self) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::EventSub, T, Event>, ::subxt::BasicError> { ::subxt::events::subscribe::(self.client).await } - pub async fn subscribe_finalized(&self) -> Result<::subxt::events::EventSubscription<'a, T, Event>, ::subxt::BasicError> { + pub async fn subscribe_finalized(&self) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::FinalizedEventSub<'a, T::Header>, T, Event>, ::subxt::BasicError> { ::subxt::events::subscribe_finalized::(self.client).await } } diff --git a/examples/examples/rpc_call.rs b/examples/examples/rpc_call.rs index 2763ba97ba..71eafd1f34 100644 --- a/examples/examples/rpc_call.rs +++ b/examples/examples/rpc_call.rs @@ -41,7 +41,7 @@ async fn main() -> Result<(), Box> { .await? .to_runtime_api::>>(); - let block_number = 1; + let block_number = 1u32; let block_hash = api .client diff --git a/subxt/Cargo.toml b/subxt/Cargo.toml index eb92b76401..13d3801c74 100644 --- a/subxt/Cargo.toml +++ b/subxt/Cargo.toml @@ -22,7 +22,6 @@ futures = "0.3.13" hex = "0.4.3" jsonrpsee = { version = "0.8.0", features = ["async-client", "client-ws-transport"] } log = "0.4.14" -num-traits = { version = "0.2.14", default-features = false } serde = { version = "1.0.124", features = ["derive"] } serde_json = "1.0.64" thiserror = "1.0.24" diff --git a/subxt/src/config.rs b/subxt/src/config.rs index dd7b727790..09c363c2ac 100644 --- a/subxt/src/config.rs +++ b/subxt/src/config.rs @@ -46,7 +46,8 @@ pub trait Config: 'static { + Default + Copy + core::hash::Hash - + core::str::FromStr; + + core::str::FromStr + + Into; /// The output of the `Hashing` function. type Hash: Parameter diff --git a/subxt/src/events/event_subscription.rs b/subxt/src/events/event_subscription.rs index afd7fb3220..a19d3f6867 100644 --- a/subxt/src/events/event_subscription.rs +++ b/subxt/src/events/event_subscription.rs @@ -24,12 +24,18 @@ use crate::{ use codec::Decode; use derivative::Derivative; use futures::{ + future::Either, + stream::{ + self, + BoxStream, + }, Future, FutureExt, Stream, StreamExt, }; use jsonrpsee::core::client::Subscription; +use sp_runtime::traits::Header; use std::{ marker::Unpin, task::Poll, @@ -51,12 +57,12 @@ pub use super::{ /// [`Events::subscribe_finalized()`] if that is important. /// /// **Note:** This function is hidden from the documentation -/// and is exposed only to be called via the codegen. Thus, prefer to use -/// `api.events().subscribe()` over calling this directly. +/// and is exposed only to be called via the codegen. It may +/// break between minor releases. #[doc(hidden)] -pub async fn subscribe( - client: &'_ Client, -) -> Result, BasicError> { +pub async fn subscribe<'a, T: Config, Evs: Decode + 'static>( + client: &'a Client, +) -> Result, T, Evs>, BasicError> { let block_subscription = client.rpc().subscribe_blocks().await?; Ok(EventSubscription::new(client, block_subscription)) } @@ -64,23 +70,98 @@ pub async fn subscribe( /// Subscribe to events from finalized blocks. /// /// **Note:** This function is hidden from the documentation -/// and is exposed only to be called via the codegen. Thus, prefer to use -/// `api.events().subscribe_finalized()` over calling this directly. +/// and is exposed only to be called via the codegen. It may +/// break between minor releases. #[doc(hidden)] -pub async fn subscribe_finalized( - client: &'_ Client, -) -> Result, BasicError> { - let block_subscription = client.rpc().subscribe_finalized_blocks().await?; - Ok(EventSubscription::new(client, block_subscription)) +pub async fn subscribe_finalized<'a, T: Config, Evs: Decode + 'static>( + client: &'a Client, +) -> Result, T, Evs>, BasicError> { + // fetch the last finalised block details immediately, so that we'll get + // events for each block after this one. + let last_finalized_block_hash = client.rpc().finalized_head().await?; + let last_finalized_block_number = client + .rpc() + .header(Some(last_finalized_block_hash)) + .await? + .map(|h| (*h.number()).into()); + + // Fill in any gaps between the block above and the finalized blocks reported. + let block_subscription = subscribe_to_block_headers_filling_in_gaps( + client, + last_finalized_block_number, + client.rpc().subscribe_finalized_blocks().await?, + ); + + Ok(EventSubscription::new(client, Box::pin(block_subscription))) } +/// Take a subscription that returns block headers, and if any block numbers are missed out +/// betweem the block number provided and what's returned from the subscription, we fill in +/// the gaps and get hold of all intermediate block headers. +/// +/// **Note:** This is exposed so that we can run integration tests on it, but otherwise +/// should not be used directly and may break between minor releases. +#[doc(hidden)] +pub fn subscribe_to_block_headers_filling_in_gaps<'a, S, E, T: Config>( + client: &'a Client, + mut last_block_num: Option, + sub: S, +) -> impl Stream> + Send + 'a +where + S: Stream> + Send + 'a, + E: Into + Send + 'static, +{ + sub.flat_map(move |s| { + // Get the header, or return a stream containing just the error. Our EventSubscription + // stream will return `None` as soon as it hits an error like this. + let header = match s { + Ok(header) => header, + Err(e) => return Either::Left(stream::once(async { Err(e.into()) })), + }; + + // We want all previous details up to, but not including this current block num. + let end_block_num = (*header.number()).into(); + + // This is one after the last block we returned details for last time. + let start_block_num = last_block_num.map(|n| n + 1).unwrap_or(end_block_num); + + // Iterate over all of the previous blocks we need headers for, ignoring the current block + // (which we already have the header info for): + let previous_headers = stream::iter(start_block_num..end_block_num) + .then(move |n| { + async move { + let hash = client.rpc().block_hash(Some(n.into())).await?; + let header = client.rpc().header(hash).await?; + Ok::<_, BasicError>(header) + } + }) + .filter_map(|h| async { h.transpose() }); + + // On the next iteration, we'll get details starting just after this end block. + last_block_num = Some(end_block_num); + + // Return a combination of any previous headers plus the new header. + Either::Right(previous_headers.chain(stream::once(async { Ok(header) }))) + }) +} + +/// A `jsonrpsee` Subscription. This forms a part of the `EventSubscription` type handed back +/// in codegen from `subscribe_finalized`, and is exposed to be used in codegen. +#[doc(hidden)] +pub type FinalizedEventSub<'a, Header> = BoxStream<'a, Result>; + +/// A `jsonrpsee` Subscription. This forms a part of the `EventSubscription` type handed back +/// in codegen from `subscribe`, and is exposed to be used in codegen. +#[doc(hidden)] +pub type EventSub = Subscription; + /// A subscription to events that implements [`Stream`], and returns [`Events`] objects for each block. #[derive(Derivative)] -#[derivative(Debug(bound = ""))] -pub struct EventSubscription<'a, T: Config, Evs: 'static> { +#[derivative(Debug(bound = "Sub: std::fmt::Debug"))] +pub struct EventSubscription<'a, Sub, T: Config, Evs: 'static> { finished: bool, client: &'a Client, - block_header_subscription: Subscription, + block_header_subscription: Sub, #[derivative(Debug = "ignore")] at: Option< std::pin::Pin< @@ -90,11 +171,12 @@ pub struct EventSubscription<'a, T: Config, Evs: 'static> { _event_type: std::marker::PhantomData, } -impl<'a, T: Config, Evs: Decode> EventSubscription<'a, T, Evs> { - fn new( - client: &'a Client, - block_header_subscription: Subscription, - ) -> Self { +impl<'a, Sub, T: Config, Evs: Decode, E: Into> + EventSubscription<'a, Sub, T, Evs> +where + Sub: Stream> + Unpin + 'a, +{ + fn new(client: &'a Client, block_header_subscription: Sub) -> Self { EventSubscription { finished: false, client, @@ -111,7 +193,10 @@ impl<'a, T: Config, Evs: Decode> EventSubscription<'a, T, Evs> { } } -impl<'a, T: Config, Evs: Decode> Unpin for EventSubscription<'a, T, Evs> {} +impl<'a, T: Config, Sub: Unpin, Evs: Decode> Unpin + for EventSubscription<'a, Sub, T, Evs> +{ +} // We want `EventSubscription` to implement Stream. The below implementation is the rather verbose // way to roughly implement the following function: @@ -130,7 +215,13 @@ impl<'a, T: Config, Evs: Decode> Unpin for EventSubscription<'a, T, Evs> {} // // The advantage of this manual implementation is that we have a named type that we (and others) // can derive things on, store away, alias etc. -impl<'a, T: Config, Evs: Decode> Stream for EventSubscription<'a, T, Evs> { +impl<'a, Sub, T, Evs, E> Stream for EventSubscription<'a, Sub, T, Evs> +where + T: Config, + Evs: Decode, + Sub: Stream> + Unpin + 'a, + E: Into, +{ type Item = Result, BasicError>; fn poll_next( @@ -155,7 +246,6 @@ impl<'a, T: Config, Evs: Decode> Stream for EventSubscription<'a, T, Evs> { return Poll::Ready(Some(Err(e.into()))) } Some(Ok(block_header)) => { - use sp_runtime::traits::Header; // Note [jsdw]: We may be able to get rid of the per-item allocation // with https://github.com/oblique/reusable-box-future. self.at = Some(Box::pin(at(self.client, block_header.hash()))); @@ -181,9 +271,22 @@ mod test { use super::*; // Ensure `EventSubscription` can be sent; only actually a compile-time check. - #[test] + #[allow(unused)] fn check_sendability() { fn assert_send() {} - assert_send::>(); + assert_send::< + EventSubscription< + EventSub<::Header>, + crate::DefaultConfig, + (), + >, + >(); + assert_send::< + EventSubscription< + FinalizedEventSub<::Header>, + crate::DefaultConfig, + (), + >, + >(); } } diff --git a/subxt/src/events/mod.rs b/subxt/src/events/mod.rs index 846019563a..a6ed76b568 100644 --- a/subxt/src/events/mod.rs +++ b/subxt/src/events/mod.rs @@ -25,7 +25,10 @@ pub use decoding::EventsDecodingError; pub use event_subscription::{ subscribe, subscribe_finalized, + subscribe_to_block_headers_filling_in_gaps, + EventSub, EventSubscription, + FinalizedEventSub, }; pub use events_type::{ at, diff --git a/subxt/src/rpc.rs b/subxt/src/rpc.rs index 73e39e5495..5f062aca4e 100644 --- a/subxt/src/rpc.rs +++ b/subxt/src/rpc.rs @@ -96,16 +96,6 @@ pub enum NumberOrHex { Hex(U256), } -/// RPC list or value wrapper. -#[derive(Serialize, Deserialize, Debug, PartialEq)] -#[serde(untagged)] -pub enum ListOrValue { - /// A list of values of given type. - List(Vec), - /// A single value of given type. - Value(T), -} - /// Alias for the type of a block returned by `chain_getBlock` pub type ChainBlock = SignedBlock::Header, ::Extrinsic>>; @@ -120,11 +110,19 @@ impl From for BlockNumber { } } -impl From for BlockNumber { - fn from(x: u32) -> Self { - NumberOrHex::Number(x.into()).into() +// All unsigned ints can be converted into a BlockNumber: +macro_rules! into_block_number { + ($($t: ty)+) => { + $( + impl From<$t> for BlockNumber { + fn from(x: $t) -> Self { + NumberOrHex::Number(x.into()).into() + } + } + )+ } } +into_block_number!(u8 u16 u32 u64); /// Arbitrary properties defined in the chain spec as a JSON object. pub type SystemProperties = serde_json::Map; @@ -285,16 +283,11 @@ impl Rpc { /// Fetch the genesis hash pub async fn genesis_hash(&self) -> Result { - let block_zero = Some(ListOrValue::Value(NumberOrHex::Number(0))); + let block_zero = 0u32; let params = rpc_params![block_zero]; - let list_or_value: ListOrValue> = + let genesis_hash: Option = self.client.request("chain_getBlockHash", params).await?; - match list_or_value { - ListOrValue::Value(genesis_hash) => { - genesis_hash.ok_or_else(|| "Genesis hash not found".into()) - } - ListOrValue::List(_) => Err("Expected a Value, got a List".into()), - } + genesis_hash.ok_or_else(|| "Genesis hash not found".into()) } /// Fetch the metadata @@ -346,13 +339,9 @@ impl Rpc { &self, block_number: Option, ) -> Result, BasicError> { - let block_number = block_number.map(ListOrValue::Value); let params = rpc_params![block_number]; - let list_or_value = self.client.request("chain_getBlockHash", params).await?; - match list_or_value { - ListOrValue::Value(hash) => Ok(hash), - ListOrValue::List(_) => Err("Expected a Value, got a List".into()), - } + let block_hash = self.client.request("chain_getBlockHash", params).await?; + Ok(block_hash) } /// Get a block hash of the latest finalized block diff --git a/subxt/tests/integration/codegen/polkadot.rs b/subxt/tests/integration/codegen/polkadot.rs index 79c070decd..e158b897e7 100644 --- a/subxt/tests/integration/codegen/polkadot.rs +++ b/subxt/tests/integration/codegen/polkadot.rs @@ -27849,13 +27849,13 @@ pub mod api { } pub async fn subscribe( &self, - ) -> Result<::subxt::events::EventSubscription<'a, T, Event>, ::subxt::BasicError> + ) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::EventSub, T, Event>, ::subxt::BasicError> { ::subxt::events::subscribe::(self.client).await } pub async fn subscribe_finalized( &self, - ) -> Result<::subxt::events::EventSubscription<'a, T, Event>, ::subxt::BasicError> + ) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::FinalizedEventSub<'a, T::Header>, T, Event>, ::subxt::BasicError> { ::subxt::events::subscribe_finalized::(self.client).await } diff --git a/subxt/tests/integration/events.rs b/subxt/tests/integration/events.rs index fbeb7cb17e..44f00845e6 100644 --- a/subxt/tests/integration/events.rs +++ b/subxt/tests/integration/events.rs @@ -136,6 +136,58 @@ async fn balance_transfer_subscription() -> Result<(), subxt::BasicError> { Ok(()) } +#[async_std::test] +async fn missing_block_headers_will_be_filled_in() -> Result<(), subxt::BasicError> { + // This function is not publically available to use, but contains + // the key logic for filling in missing blocks, so we want to test it. + // This is used in `subscribe_finalized` to ensure no block headers are + // missed. + use subxt::events::subscribe_to_block_headers_filling_in_gaps; + + let ctx = test_context().await; + + // Manually subscribe to the next 6 finalized block headers, but deliberately + // filter out some in the middle so we get back b _ _ b _ b. This guarantees + // that there will be some gaps, even if there aren't any from the subscription. + let some_finalized_blocks = ctx + .api + .client + .rpc() + .subscribe_finalized_blocks() + .await? + .enumerate() + .take(6) + .filter(|(n, _)| { + let n = *n; + async move { n == 0 || n == 3 || n == 5 } + }) + .map(|(_, h)| h); + + // This should spot any gaps in the middle and fill them back in. + let all_finalized_blocks = subscribe_to_block_headers_filling_in_gaps( + &ctx.api.client, + None, + some_finalized_blocks, + ); + futures::pin_mut!(all_finalized_blocks); + + // Iterate the block headers, making sure we get them all in order. + let mut last_block_number = None; + while let Some(header) = all_finalized_blocks.next().await { + let header = header?; + + use sp_runtime::traits::Header; + let block_number: u128 = (*header.number()).into(); + + if let Some(last) = last_block_number { + assert_eq!(last + 1, block_number); + } + last_block_number = Some(block_number); + } + + Ok(()) +} + // This is just a compile-time check that we can subscribe to events in // a context that requires the event subscription/filtering to be Send-able. // We test a typical use of EventSubscription and FilterEvents. We don't need