Skip to content

Commit

Permalink
Events sub (paritytech#126)
Browse files Browse the repository at this point in the history
* Make event subscription logic more generic.

* Fix build.

* Add test-node.

* Update deps.

* Address review comments.
  • Loading branch information
dvc94ch authored Jun 25, 2020
1 parent 3080ec9 commit 7f08471
Show file tree
Hide file tree
Showing 22 changed files with 1,451 additions and 98 deletions.
10 changes: 3 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[workspace]
members = [".", "client", "proc-macro"]
members = [".", "client", "proc-macro", "test-node"]

[package]
name = "substrate-subxt"
Expand All @@ -25,7 +25,7 @@ thiserror = "1.0.20"
futures = "0.3.5"
jsonrpsee = { version = "0.1.0", features = ["ws"] }
num-traits = { version = "0.2.12", default-features = false }
serde = { version = "1.0.113", features = ["derive"] }
serde = { version = "1.0.114", features = ["derive"] }
serde_json = "1.0.55"
url = "2.1.1"
codec = { package = "parity-scale-codec", version = "1.3", default-features = false, features = ["derive", "full"] }
Expand All @@ -48,12 +48,8 @@ async-std = { version = "=1.5.0", features = ["attributes"] }
env_logger = "0.7.1"
wabt = "0.9.2"
frame-system = { version = "2.0.0-rc3", package = "frame-system" }
node-template = { git = "https://github.com/paritytech/substrate" }
pallet-balances = { version = "2.0.0-rc3", package = "pallet-balances" }
sp-keyring = { version = "2.0.0-rc3", package = "sp-keyring" }
substrate-subxt-client = { path = "client" }
tempdir = "0.3.7"

[patch.crates-io]
sc-network = { git = "https://github.com/paritytech/substrate" }
sc-service = { git = "https://github.com/paritytech/substrate" }
test-node = { path = "test-node" }
3 changes: 2 additions & 1 deletion client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ thiserror = "1.0.20"
[dev-dependencies]
async-std = { version = "=1.5.0", features = ["attributes"] }
env_logger = "0.7.1"
node-template = { git = "https://github.com/paritytech/substrate" }
substrate-subxt = { path = ".." }
tempdir = "0.3.7"
test-node = { path = "../test-node" }
2 changes: 1 addition & 1 deletion client/gen-chain-spec.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/bin/sh
NODE_TEMPLATE=../../substrate/target/release/node-template
NODE_TEMPLATE=../target/release/test-node
$NODE_TEMPLATE purge-chain --dev
$NODE_TEMPLATE build-spec --dev > dev-chain.json
rm -rf /tmp/subxt-light-client
2 changes: 1 addition & 1 deletion client/purge-chain.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/bin/sh
NODE_TEMPLATE=../../substrate/target/release/node-template
NODE_TEMPLATE=../target/release/test-node
$NODE_TEMPLATE purge-chain --chain=dev-chain.json
rm -rf /tmp/subxt-light-client
2 changes: 1 addition & 1 deletion client/run.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#!/bin/sh
NODE_TEMPLATE=../../substrate/target/release/node-template
NODE_TEMPLATE=../target/release/test-node
$NODE_TEMPLATE --chain=dev-chain.json --alice
21 changes: 10 additions & 11 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ use sc_service::{
use std::{
future::Future,
pin::Pin,
sync::Arc,
task::Poll,
};
use thiserror::Error;
Expand Down Expand Up @@ -209,7 +208,7 @@ fn start_subxt_client<C: ChainSpec + 'static, S: AbstractService>(
impl_version: config.impl_version,
chain_spec: Box::new(config.chain_spec),
role: config.role.into(),
task_executor: Arc::new(move |fut, ty| {
task_executor: std::sync::Arc::new(move |fut, ty| {
match ty {
TaskType::Async => task::spawn(fut),
TaskType::Blocking => task::spawn_blocking(|| task::block_on(fut)),
Expand All @@ -220,7 +219,6 @@ fn start_subxt_client<C: ChainSpec + 'static, S: AbstractService>(
max_runtime_instances: 8,
announce_block: true,
dev_key_seed: config.role.into(),
base_path: None,

telemetry_endpoints: Default::default(),
telemetry_external_transport: Default::default(),
Expand All @@ -233,7 +231,6 @@ fn start_subxt_client<C: ChainSpec + 'static, S: AbstractService>(
pruning: Default::default(),
rpc_cors: Default::default(),
rpc_http: Default::default(),
rpc_ipc: Default::default(),
rpc_ws: Default::default(),
rpc_ws_max_connections: Default::default(),
rpc_methods: Default::default(),
Expand Down Expand Up @@ -303,6 +300,7 @@ mod tests {
KusamaRuntime as NodeTemplateRuntime,
PairSigner,
};
use tempdir::TempDir;

#[async_std::test]
#[ignore]
Expand All @@ -328,17 +326,18 @@ mod tests {
Path::new(env!("CARGO_MANIFEST_DIR")).join("dev-chain.json");
let bytes = async_std::fs::read(chain_spec_path).await.unwrap();
let chain_spec =
node_template::chain_spec::ChainSpec::from_json_bytes(bytes).unwrap();
test_node::chain_spec::ChainSpec::from_json_bytes(bytes).unwrap();
let tmp = TempDir::new("subxt-").expect("failed to create tempdir");
let config = SubxtClientConfig {
impl_name: "substrate-subxt-light-client",
impl_version: "0.0.1",
author: "David Craven",
copyright_start_year: 2020,
db: DatabaseConfig::RocksDb {
path: "/tmp/subxt-light-client".into(),
path: tmp.path().into(),
cache_size: 64,
},
builder: node_template::service::new_light,
builder: test_node::service::new_light,
chain_spec,
role: Role::Light,
};
Expand All @@ -358,18 +357,18 @@ mod tests {
#[async_std::test]
async fn test_full_client() {
env_logger::try_init().ok();
let chain_spec = node_template::chain_spec::development_config();
let tmp = TempDir::new("subxt-").expect("failed to create tempdir");
let config = SubxtClientConfig {
impl_name: "substrate-subxt-full-client",
impl_version: "0.0.1",
author: "David Craven",
copyright_start_year: 2020,
db: DatabaseConfig::RocksDb {
path: "/tmp/subxt-full-client".into(),
path: tmp.path().into(),
cache_size: 128,
},
builder: node_template::service::new_full,
chain_spec,
builder: test_node::service::new_full,
chain_spec: test_node::chain_spec::development_config(),
role: Role::Authority(AccountKeyring::Alice),
};
let client = ClientBuilder::<NodeTemplateRuntime>::new()
Expand Down
2 changes: 1 addition & 1 deletion proc-macro/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ proc-macro2 = "1.0.18"
proc-macro-crate = "0.1.4"
proc-macro-error = "1.0.2"
quote = "1.0.7"
syn = "1.0.31"
syn = "1.0.33"
synstructure = "0.12.4"

[dev-dependencies]
Expand Down
26 changes: 26 additions & 0 deletions src/frame/balances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,12 @@ mod tests {
Error,
RuntimeError,
},
events::EventsDecoder,
signer::{
PairSigner,
Signer,
},
subscription::EventSubscription,
system::AccountStoreExt,
tests::{
test_client,
Expand Down Expand Up @@ -201,4 +203,28 @@ mod tests {
panic!("expected an error");
}
}

#[async_std::test]
async fn test_transfer_subscription() {
env_logger::try_init().ok();
let alice = PairSigner::new(AccountKeyring::Alice.pair());
let bob = AccountKeyring::Bob.to_account_id();
let (client, _) = test_client().await;
let sub = client.subscribe_events().await.unwrap();
let mut decoder = EventsDecoder::<TestRuntime>::new(client.metadata().clone());
decoder.with_balances();
let mut sub = EventSubscription::<TestRuntime>::new(sub, decoder);
sub.filter_event::<TransferEvent<_>>();
client.transfer(&alice, &bob, 10_000).await.unwrap();
let raw = sub.next().await.unwrap().unwrap();
let event = TransferEvent::<TestRuntime>::decode(&mut &raw.data[..]).unwrap();
assert_eq!(
event,
TransferEvent {
from: alice.account_id().clone(),
to: bob.clone(),
amount: 10_000,
}
);
}
}
8 changes: 5 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ mod metadata;
mod rpc;
mod runtimes;
mod signer;
mod subscription;

pub use crate::{
error::Error,
Expand All @@ -87,6 +88,7 @@ pub use crate::{
},
runtimes::*,
signer::*,
subscription::*,
substrate_subxt_proc_macro::*,
};
use crate::{
Expand Down Expand Up @@ -446,6 +448,7 @@ mod tests {
pub(crate) type TestRuntime = crate::NodeTemplateRuntime;

pub(crate) async fn test_client() -> (Client<TestRuntime>, TempDir) {
env_logger::try_init().ok();
let tmp = TempDir::new("subxt-").expect("failed to create tempdir");
let config = SubxtClientConfig {
impl_name: "substrate-subxt-full-client",
Expand All @@ -456,8 +459,8 @@ mod tests {
path: tmp.path().into(),
cache_size: 128,
},
builder: node_template::service::new_full,
chain_spec: node_template::chain_spec::development_config(),
builder: test_node::service::new_full,
chain_spec: test_node::chain_spec::development_config(),
role: Role::Authority(AccountKeyring::Alice),
};
let client = ClientBuilder::new()
Expand All @@ -470,7 +473,6 @@ mod tests {

#[async_std::test]
async fn test_tx_transfer_balance() {
env_logger::try_init().ok();
let mut signer = PairSigner::new(AccountKeyring::Alice.pair());
let dest = AccountKeyring::Bob.to_account_id().into();

Expand Down
104 changes: 32 additions & 72 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,12 @@ use crate::{
RawEvent,
},
frame::{
system::{
Phase,
System,
},
system::System,
Event,
},
metadata::Metadata,
runtimes::Runtime,
subscription::EventSubscription,
};

pub type ChainBlock<T> =
Expand Down Expand Up @@ -107,12 +106,12 @@ where
}

/// Client for substrate rpc interfaces
pub struct Rpc<T: System> {
pub struct Rpc<T: Runtime> {
client: Client,
marker: PhantomData<T>,
}

impl<T: System> Clone for Rpc<T> {
impl<T: Runtime> Clone for Rpc<T> {
fn clone(&self) -> Self {
Self {
client: self.client.clone(),
Expand All @@ -121,7 +120,7 @@ impl<T: System> Clone for Rpc<T> {
}
}

impl<T: System> Rpc<T> {
impl<T: Runtime> Rpc<T> {
pub fn new(client: Client) -> Self {
Self {
client,
Expand Down Expand Up @@ -256,7 +255,7 @@ impl<T: System> Rpc<T> {
/// Subscribe to substrate System Events
pub async fn subscribe_events(
&self,
) -> Result<Subscription<StorageChangeSet<<T as System>::Hash>>, Error> {
) -> Result<Subscription<StorageChangeSet<T::Hash>>, Error> {
let mut storage_key = twox_128(b"System").to_vec();
storage_key.extend(twox_128(b"Events").to_vec());
log::debug!("Events storage key {:?}", hex::encode(&storage_key));
Expand Down Expand Up @@ -360,14 +359,31 @@ impl<T: System> Rpc<T> {
block_hash,
signed_block.block.extrinsics.len()
);
wait_for_block_events(
decoder,
ext_hash,
signed_block,
block_hash,
events_sub,
)
.await
let ext_index = signed_block
.block
.extrinsics
.iter()
.position(|ext| {
let hash = T::Hashing::hash_of(ext);
hash == ext_hash
})
.ok_or_else(|| {
Error::Other(format!(
"Failed to find Extrinsic with hash {:?}",
ext_hash,
))
})?;
let mut sub = EventSubscription::new(events_sub, decoder);
sub.filter_extrinsic(block_hash, ext_index);
let mut events = vec![];
while let Some(event) = sub.next().await {
events.push(event?);
}
Ok(ExtrinsicSuccess {
block: block_hash,
extrinsic: ext_hash,
events,
})
}
None => {
Err(format!("Failed to find block {:?}", block_hash).into())
Expand Down Expand Up @@ -424,59 +440,3 @@ impl<T: System> ExtrinsicSuccess<T> {
}
}
}

/// Waits for events for the block triggered by the extrinsic
pub async fn wait_for_block_events<T: System>(
decoder: EventsDecoder<T>,
ext_hash: T::Hash,
signed_block: ChainBlock<T>,
block_hash: T::Hash,
events_subscription: Subscription<StorageChangeSet<T::Hash>>,
) -> Result<ExtrinsicSuccess<T>, Error> {
let ext_index = signed_block
.block
.extrinsics
.iter()
.position(|ext| {
let hash = T::Hashing::hash_of(ext);
hash == ext_hash
})
.ok_or_else(|| {
Error::Other(format!("Failed to find Extrinsic with hash {:?}", ext_hash))
})?;

let mut subscription = events_subscription;
while let change_set = subscription.next().await {
// only interested in events for the given block
if change_set.block != block_hash {
continue
}
let mut events = Vec::new();
for (_key, data) in change_set.changes {
if let Some(data) = data {
match decoder.decode_events(&mut &data.0[..]) {
Ok(raw_events) => {
for (phase, event) in raw_events {
if let Phase::ApplyExtrinsic(i) = phase {
if i as usize == ext_index {
events.push(event)
}
}
}
}
Err(err) => return Err(err),
}
}
}
return if !events.is_empty() {
Ok(ExtrinsicSuccess {
block: block_hash,
extrinsic: ext_hash,
events,
})
} else {
Err(format!("No events found for block {}", block_hash).into())
}
}
unreachable!()
}
Loading

0 comments on commit 7f08471

Please sign in to comment.